diff --git a/tests/persistency/test_all.nim b/tests/persistency/test_all.nim index 194977692..0909850a4 100644 --- a/tests/persistency/test_all.nim +++ b/tests/persistency/test_all.nim @@ -5,5 +5,6 @@ import ./test_backend import ./test_lifecycle import ./test_facade import ./test_encoding +import ./test_blob_codec import ./test_string_lookup import ./test_singleton diff --git a/tests/persistency/test_blob_codec.nim b/tests/persistency/test_blob_codec.nim new file mode 100644 index 000000000..884e94d09 --- /dev/null +++ b/tests/persistency/test_blob_codec.nim @@ -0,0 +1,87 @@ +{.used.} + +## Round-trip tests for the generic payload blob codec (payload_codec.nim) +## and its `BlobCodec` application to the SDS persistence types. +## +## Importing `sds_persistency` forces the real adapter (and its six +## `BlobCodec` calls + call sites) to compile. The local derivations +## below live in this module's own scope and drive the round-trip checks. + +import std/[sets, times] +import testutils/unittests +import waku/persistency/payload_codec +import waku/persistency/sds_persistency # compile-checks the real adapter +import sds/types/persistence + +# Same order constraint as the adapter: a field's type before its container. +BlobCodec(HistoryEntry) +BlobCodec(SdsMessage) +BlobCodec(UnacknowledgedMessage) +BlobCodec(IncomingMessage) +BlobCodec(OutgoingRepairEntry) +BlobCodec(IncomingRepairEntry) + +proc sampleHistory(): HistoryEntry = + HistoryEntry.init("mid-1", @[0xAA'u8, 0xBB, 0xCC], "sender-1".SdsParticipantID) + +proc sampleMessage(content = @[1'u8, 2, 3]): SdsMessage = + SdsMessage.init( + messageId = "msg-42", + lamportTimestamp = 7'i64, + causalHistory = @[sampleHistory(), HistoryEntry.init("mid-2")], + channelId = "channel-9", + content = content, + bloomFilter = @[0xDE'u8, 0xAD, 0xBE, 0xEF], + senderId = "alice".SdsParticipantID, + repairRequest = @[sampleHistory()], + ) + +suite "Persistency blob codec": + test "primitive round-trips": + check fromBlob(toBlob(7'i64), int64) == 7'i64 + check fromBlob(toBlob(int64.low), int64) == int64.low + check fromBlob(toBlob("héllo"), string) == "héllo" + check fromBlob(toBlob(@[0'u8, 255, 7]), seq[byte]) == @[0'u8, 255, 7] + + test "HistoryEntry round-trips": + let h = sampleHistory() + check fromBlob(toBlob(h), HistoryEntry) == h + + test "SdsMessage round-trips (seqs, distinct, nested)": + let m = sampleMessage() + check fromBlob(toBlob(m), SdsMessage) == m + + test "UnacknowledgedMessage round-trips (Time, int)": + let u = UnacknowledgedMessage.init(sampleMessage(), initTime(1_700_000_000, 123), 4) + check fromBlob(toBlob(u), UnacknowledgedMessage) == u + + test "IncomingMessage round-trips (HashSet)": + let inc = IncomingMessage.init(sampleMessage(), toHashSet(["dep-a", "dep-b", "dep-c"])) + check fromBlob(toBlob(inc), IncomingMessage) == inc + + test "repair tuples round-trip": + let outPair = + ("msg-42".SdsMessageID, OutgoingRepairEntry.init(sampleHistory(), initTime(10, 0))) + check fromBlob(toBlob(outPair), (SdsMessageID, OutgoingRepairEntry)) == outPair + + let inPair = ( + "msg-42".SdsMessageID, + IncomingRepairEntry.init(sampleHistory(), @[9'u8, 8, 7], initTime(20, 500)), + ) + check fromBlob(toBlob(inPair), (SdsMessageID, IncomingRepairEntry)) == inPair + + test "payload exceeds the old 64 KiB key cap (4-byte length)": + var big = newSeq[byte](70_000) + for i in 0 ..< big.len: + big[i] = byte(i and 0xFF) + let m = sampleMessage(content = big) + let decoded = fromBlob(toBlob(m), SdsMessage) + check decoded.content.len == 70_000 + check decoded == m + + test "truncated input raises ValueError": + let bytes = toBlob(sampleMessage()) + expect ValueError: + discard fromBlob(bytes[0 ..< bytes.len - 5], SdsMessage) + expect ValueError: + discard fromBlob(@[0xFF'u8, 0xFF, 0xFF, 0xFF], string) # claims 4 GiB diff --git a/waku/persistency/payload_codec.nim b/waku/persistency/payload_codec.nim new file mode 100644 index 000000000..361582b02 --- /dev/null +++ b/waku/persistency/payload_codec.nim @@ -0,0 +1,308 @@ +## Generic length-prefixed blob codec for persistency payloads. +## +## Symmetric counterpart to `keys.nim`'s `encodePart`: every persisted value +## round-trips through `writePart`/`readPart` over a `ReadCtx` cursor. Unlike +## keys, payloads are not byte-wise sort-stable, so strings and byte blobs use +## a **4-byte BE length prefix** (4 GiB ceiling) instead of keys.nim's 2-byte +## (64 KiB) prefix — the cap that originally forced SDS to hand-roll its codec. +## +## ## How a type opts in +## +## Primitives, `string`, `seq[byte]`, `enum`, `distinct`, `Time`, `seq[T]`, +## `HashSet[T]` and tuples already have codecs here. A **named struct** opts in +## with a single line: +## +## ```nim +## BlobCodec(MyType) +## ``` +## +## which emits both `writePart`/`readPart` for `MyType` from its fields, in +## declaration order, and reconstructs the value via `MyType.init(...)` +## (positional). This is the *only* mechanism for structs — there is +## deliberately no `fieldPairs`/default-construct path, so `{.requiresInit.}` +## types (which cannot be zero-initialised) work unchanged. The contract: +## +## * the type is a value object whose `init` takes its fields positionally in +## declaration order; +## * only public fields participate (private fields are invisible to the +## macro and would be dropped — don't persist such types); +## * `BlobCodec` must be called for a field's type *before* the struct +## that contains it (Nim resolves the concrete `writePart`/`readPart` +## top-down). +## +## ## Entry points +## +## `toBlob(v)` → `seq[byte]`, `fromBlob(bytes, T)` → `T` (raises `ValueError` +## on truncated/corrupt input). + +{.push raises: [].} + +import std/[macros, sets, times, typetraits] + +type ReadCtx* = object + buf*: seq[byte] + pos*: int + +proc initReadCtx*(bytes: openArray[byte]): ReadCtx = + ReadCtx(buf: @bytes, pos: 0) + +proc need(r: ReadCtx, n: int) {.raises: [ValueError].} = + if n < 0 or r.pos + n > r.buf.len: + raise newException(ValueError, "truncated payload: need " & $n & " more bytes") + +# ── Fixed-width integers ──────────────────────────────────────────────── + +proc writePart*(buf: var seq[byte], v: uint32) = + buf.add(byte((v shr 24) and 0xFF'u32)) + buf.add(byte((v shr 16) and 0xFF'u32)) + buf.add(byte((v shr 8) and 0xFF'u32)) + buf.add(byte(v and 0xFF'u32)) + +proc readPart*(r: var ReadCtx, _: typedesc[uint32]): uint32 {.raises: [ValueError].} = + r.need(4) + result = + (uint32(r.buf[r.pos]) shl 24) or (uint32(r.buf[r.pos + 1]) shl 16) or + (uint32(r.buf[r.pos + 2]) shl 8) or uint32(r.buf[r.pos + 3]) + r.pos += 4 + +proc writePart*(buf: var seq[byte], v: int64) = + let u = cast[uint64](v) + for shift in countdown(56, 0, 8): + buf.add(byte((u shr shift) and 0xFF'u64)) + +proc readPart*(r: var ReadCtx, _: typedesc[int64]): int64 {.raises: [ValueError].} = + r.need(8) + var u: uint64 = 0 + for i in 0 ..< 8: + u = (u shl 8) or uint64(r.buf[r.pos + i]) + r.pos += 8 + cast[int64](u) + +proc writePart*(buf: var seq[byte], v: int) = + writePart(buf, int64(v)) + +proc readPart*(r: var ReadCtx, _: typedesc[int]): int {.raises: [ValueError].} = + int(readPart(r, int64)) + +# ── Small scalars ─────────────────────────────────────────────────────── + +proc writePart*(buf: var seq[byte], v: bool) = + buf.add(if v: 1'u8 else: 0'u8) + +proc readPart*(r: var ReadCtx, _: typedesc[bool]): bool {.raises: [ValueError].} = + r.need(1) + result = r.buf[r.pos] != 0'u8 + r.pos += 1 + +proc writePart*(buf: var seq[byte], v: byte) = + buf.add(v) + +proc readPart*(r: var ReadCtx, _: typedesc[byte]): byte {.raises: [ValueError].} = + r.need(1) + result = r.buf[r.pos] + r.pos += 1 + +proc writePart*(buf: var seq[byte], v: char) = + buf.add(byte(v)) + +proc readPart*(r: var ReadCtx, _: typedesc[char]): char {.raises: [ValueError].} = + r.need(1) + result = char(r.buf[r.pos]) + r.pos += 1 + +proc writePart*[E: enum](buf: var seq[byte], v: E) = + writePart(buf, int64(ord(v))) + +proc readPart*[E: enum](r: var ReadCtx, _: typedesc[E]): E {.raises: [ValueError].} = + E(readPart(r, int64)) + +# ── string / seq[byte] (4-byte length) ────────────────────────────────── + +proc writePart*(buf: var seq[byte], s: string) = + writePart(buf, uint32(s.len)) + for c in s: + buf.add(byte(c)) + +proc readPart*(r: var ReadCtx, _: typedesc[string]): string {.raises: [ValueError].} = + let n = int(readPart(r, uint32)) + r.need(n) + result = newString(n) + for i in 0 ..< n: + result[i] = char(r.buf[r.pos + i]) + r.pos += n + +proc writePart*(buf: var seq[byte], b: seq[byte]) = + writePart(buf, uint32(b.len)) + for x in b: + buf.add(x) + +proc readPart*(r: var ReadCtx, _: typedesc[seq[byte]]): seq[byte] {.raises: [ValueError].} = + let n = int(readPart(r, uint32)) + r.need(n) + result = newSeq[byte](n) + for i in 0 ..< n: + result[i] = r.buf[r.pos + i] + r.pos += n + +# ── distinct (e.g. SdsParticipantID = distinct string) ────────────────── + +proc writePart*[T: distinct](buf: var seq[byte], v: T) = + mixin writePart + writePart(buf, distinctBase(T)(v)) + +proc readPart*[T: distinct]( + r: var ReadCtx, _: typedesc[T] +): T {.raises: [ValueError].} = + mixin readPart + T(readPart(r, distinctBase(T))) + +# ── Time ──────────────────────────────────────────────────────────────── + +proc writePart*(buf: var seq[byte], t: Time) = + writePart(buf, t.toUnix()) + writePart(buf, uint32(t.nanosecond)) + +proc readPart*(r: var ReadCtx, _: typedesc[Time]): Time {.raises: [ValueError].} = + let secs = readPart(r, int64) + let nanos = int(readPart(r, uint32)) + if nanos < 0 or nanos > 999_999_999: + raise newException(ValueError, "nanosecond out of range: " & $nanos) + initTime(secs, nanos) + +# ── Containers ────────────────────────────────────────────────────────── + +proc writePart*[T](buf: var seq[byte], xs: seq[T]) = + mixin writePart + writePart(buf, uint32(xs.len)) + for x in xs: + writePart(buf, x) + +proc readPart*[T]( + r: var ReadCtx, _: typedesc[seq[T]] +): seq[T] {.raises: [ValueError].} = + mixin readPart + let n = int(readPart(r, uint32)) + result = newSeqOfCap[T](n) + for _ in 0 ..< n: + result.add(readPart(r, T)) + +proc writePart*[T](buf: var seq[byte], s: HashSet[T]) = + mixin writePart + writePart(buf, uint32(s.len)) + for x in s: + writePart(buf, x) + +proc readPart*[T]( + r: var ReadCtx, _: typedesc[HashSet[T]] +): HashSet[T] {.raises: [ValueError].} = + mixin readPart + let n = int(readPart(r, uint32)) + result = initHashSet[T](max(n, 2)) + for _ in 0 ..< n: + result.incl(readPart(r, T)) + +proc writePart*[T: tuple](buf: var seq[byte], v: T) = + mixin writePart + for f in fields(v): + writePart(buf, f) + +proc readPart*[T: tuple]( + r: var ReadCtx, _: typedesc[T] +): T {.raises: [ValueError].} = + mixin readPart + for f in fields(result): + f = readPart(r, typeof(f)) + +# ── Named-struct derivation ───────────────────────────────────────────── + +proc objectRecList(tSym: NimNode): NimNode {.compileTime.} = + ## Resolve a type symbol to its object's RecList, preserving field types + ## exactly as written (getImpl, not getTypeImpl, so `HashSet[SdsMessageID]` + ## and friends stay named rather than being expanded to their structure). + var body = tSym.getImpl[2] + while body.kind in {nnkRefTy, nnkPtrTy, nnkDistinctTy}: + body = body[0] + doAssert body.kind == nnkObjectTy, + "BlobCodec: expected an object type, got " & treeRepr(body) + body[2] + +macro BlobCodec*(T: typedesc): untyped = + ## Emit `writePart`/`readPart` for a named value object `T`, encoding each + ## public field in declaration order and rebuilding via `T.init(...)`. + let tSym = getTypeInst(T)[1] + let recList = objectRecList(tSym) + + var fieldNames: seq[NimNode] + var fieldTypes: seq[NimNode] + for defs in recList: + if defs.kind != nnkIdentDefs: + continue + # Rebuild the field type from its textual form rather than splicing the + # resolved symbol: a spliced *alias* type symbol (e.g. `SdsMessageID = + # string`) is mis-resolved as a value in `readPart(r, T)`, breaking + # typedesc overload resolution. A fresh ident/expr behaves like literal + # source and resolves to a typedesc correctly. + let ftype = parseExpr(repr(defs[^2])) + for i in 0 ..< defs.len - 2: + var nameNode = defs[i] + if nameNode.kind == nnkPragmaExpr: + nameNode = nameNode[0] + if nameNode.kind == nnkPostfix: + nameNode = nameNode[1] + fieldNames.add(ident($nameNode)) + fieldTypes.add(ftype.copyNimTree) + + let bufId = ident "buf" + let vId = ident "v" + let rId = ident "r" + + # writePart(buf: var seq[byte], v: T) + var writeBody = newStmtList() + for fn in fieldNames: + writeBody.add(newCall(ident "writePart", bufId, newDotExpr(vId, fn))) + let writeProc = newProc( + name = ident "writePart", + params = [ + newEmptyNode(), + newIdentDefs(bufId, nnkVarTy.newTree(nnkBracketExpr.newTree(ident "seq", ident "byte"))), + newIdentDefs(vId, tSym), + ], + body = writeBody, + ) + + # readPart(r: var ReadCtx, _: typedesc[T]): T {.raises: [ValueError].} + var readBody = newStmtList() + var tmps: seq[NimNode] + for i, ft in fieldTypes: + let tmp = genSym(nskLet, "f" & $i) + tmps.add(tmp) + readBody.add(newLetStmt(tmp, newCall(ident "readPart", rId, ft))) + readBody.add(newCall(newDotExpr(tSym, ident "init"), tmps)) + let readProc = newProc( + name = ident "readPart", + params = [ + tSym, + newIdentDefs(rId, nnkVarTy.newTree(ident "ReadCtx")), + newIdentDefs(ident "_", nnkBracketExpr.newTree(ident "typedesc", tSym)), + ], + body = readBody, + ) + readProc.addPragma(nnkExprColonExpr.newTree( + ident "raises", nnkBracket.newTree(ident "ValueError") + )) + + result = newStmtList(writeProc, readProc) + +# ── Public entry points ───────────────────────────────────────────────── + +proc toBlob*[T](v: T): seq[byte] = + mixin writePart + result = @[] + writePart(result, v) + +proc fromBlob*[T](bytes: openArray[byte], _: typedesc[T]): T {.raises: [ValueError].} = + mixin readPart + var r = initReadCtx(bytes) + readPart(r, T) + +{.pop.} diff --git a/waku/persistency/sds_persistency.nim b/waku/persistency/sds_persistency.nim new file mode 100644 index 000000000..eef687ec0 --- /dev/null +++ b/waku/persistency/sds_persistency.nim @@ -0,0 +1,323 @@ +## Adapter that materialises the SDS `Persistence` contract on top of a +## waku-persistency `Job`. One `Job` (== one SQLite file, one worker thread) +## services all channels for a given SDS context; rows are namespaced by +## category and the channelId is the first key component so per-channel +## prefix scans stay cheap. +## +## ## Async contract +## +## Every `Persistence` proc field is async (`proc(..): Future[void] +## {.async: (raises: []), gcsafe.}`) — SDS awaits them on its own chronos +## loop. We map each onto the matching async `Job` op: +## +## * **Writes (save*/remove*)** — `await` the Job op through the `safePut`/ +## `safeDelete` helpers, which trap any backend error and log it rather +## than raising (the contract forbids raising). +## * **`dropChannel`** — awaits `doDropChannel`, which batches every row of +## the channel into one transactional `persist` so the wipe is atomic +## (called from removeChannel / resetReliabilityManager). +## * **`loadAllForChannel`** — awaits `doLoadAll` and returns the snapshot +## the SDS bootstrap path needs. +## +## ## Storage layout +## +## | Category | Key | Value | +## |------------------|------------------------------|-----------------------------------------| +## | `sds.lamport` | `key(channelId)` | 8 BE bytes (int64) | +## | `sds.log` | `key(channelId, msgId)` | encoded `SdsMessage` | +## | `sds.hint` | `key(msgId)` | raw hint bytes | +## | `sds.outgoing` | `key(channelId, msgId)` | encoded `UnacknowledgedMessage` | +## | `sds.incoming` | `key(channelId, msgId)` | encoded `IncomingMessage` | +## | `sds.outRepair` | `key(channelId, msgId)` | `(msgId, OutgoingRepairEntry)` encoded | +## | `sds.inRepair` | `key(channelId, msgId)` | `(msgId, IncomingRepairEntry)` encoded | +## +## `messageHistory` is reconstructed in memory by sorting on +## `(lamportTimestamp, messageId)` — the same total order SDS uses for +## delivery (see sds/sds_utils.nim). Insertion order is not relied upon, so +## `removeLogEntry` works with the natural `(channelId, msgId)` key. + +{.push raises: [].} + +import std/[algorithm, options, sets, times] +import chronos, chronicles, results +import ./persistency +import ./payload_codec +import sds/types/persistence + +export persistence, persistency + +logScope: + topics = "sds-persistency" + +const + CatLamport* = "sds.lamport" + CatLog* = "sds.log" + CatHint* = "sds.hint" + CatOutgoing* = "sds.outgoing" + CatIncoming* = "sds.incoming" + CatOutRepair* = "sds.outRepair" + CatInRepair* = "sds.inRepair" + +# ── Blob codecs ───────────────────────────────────────────────────────── +# +# All SDS payload types round-trip through the generic, length-prefixed +# codec in payload_codec.nim. Each `BlobCodec(T)` emits writePart/ +# readPart for `T` from its public fields (declaration order) and rebuilds +# via `T.init(...)`. Order matters: a field's type must be derived before +# the struct that contains it. The repair buffers store `(msgId, entry)` +# tuples, handled by the generic tuple codec. Lamport is a bare int64. + +BlobCodec(HistoryEntry) +BlobCodec(SdsMessage) +BlobCodec(UnacknowledgedMessage) +BlobCodec(IncomingMessage) +BlobCodec(OutgoingRepairEntry) +BlobCodec(IncomingRepairEntry) + +# ── Async backing procs ───────────────────────────────────────────────── + +proc doLoadAll( + job: Job, channelId: SdsChannelID +): Future[ChannelSnapshot] {.async.} = + var snap = ChannelSnapshot() + let chanKey = toKey(channelId) + + block lamport: + let r = await job.get(CatLamport, chanKey) + if r.isOk: + let opt = r.get + if opt.isSome: + try: + snap.lamportTimestamp = fromBlob(opt.get, int64) + except ValueError as e: + warn "sds-persistency: invalid lamport bytes", + channelId, err = e.msg + else: + warn "sds-persistency: get lamport failed", + channelId, err = $r.error + + block log: + let r = await job.scanPrefix(CatLog, chanKey) + if r.isOk: + var msgs = newSeq[SdsMessage]() + for row in r.get: + try: + msgs.add(fromBlob(row.payload, SdsMessage)) + except ValueError as e: + warn "sds-persistency: invalid log row", channelId, err = e.msg + msgs.sort do(a, b: SdsMessage) -> int: + result = cmp(a.lamportTimestamp, b.lamportTimestamp) + if result == 0: + result = cmp(a.messageId, b.messageId) + snap.messageHistory = msgs + else: + warn "sds-persistency: scan log failed", + channelId, err = $r.error + + block outgoing: + let r = await job.scanPrefix(CatOutgoing, chanKey) + if r.isOk: + for row in r.get: + try: + snap.outgoingBuffer.add(fromBlob(row.payload, UnacknowledgedMessage)) + except ValueError as e: + warn "sds-persistency: invalid outgoing row", + channelId, err = e.msg + else: + warn "sds-persistency: scan outgoing failed", + channelId, err = $r.error + + block incoming: + let r = await job.scanPrefix(CatIncoming, chanKey) + if r.isOk: + for row in r.get: + try: + snap.incomingBuffer.add(fromBlob(row.payload, IncomingMessage)) + except ValueError as e: + warn "sds-persistency: invalid incoming row", + channelId, err = e.msg + else: + warn "sds-persistency: scan incoming failed", + channelId, err = $r.error + + block outRepair: + let r = await job.scanPrefix(CatOutRepair, chanKey) + if r.isOk: + for row in r.get: + try: + snap.outgoingRepairBuffer.add( + fromBlob(row.payload, (SdsMessageID, OutgoingRepairEntry)) + ) + except ValueError as e: + warn "sds-persistency: invalid out-repair row", + channelId, err = e.msg + else: + warn "sds-persistency: scan out-repair failed", + channelId, err = $r.error + + block inRepair: + let r = await job.scanPrefix(CatInRepair, chanKey) + if r.isOk: + for row in r.get: + try: + snap.incomingRepairBuffer.add( + fromBlob(row.payload, (SdsMessageID, IncomingRepairEntry)) + ) + except ValueError as e: + warn "sds-persistency: invalid in-repair row", + channelId, err = e.msg + else: + warn "sds-persistency: scan in-repair failed", + channelId, err = $r.error + + return snap + +proc doDropChannel(job: Job, channelId: SdsChannelID): Future[void] {.async.} = + ## Collect every row belonging to the channel and submit them as a single + ## TxOp batch — the backend applies that as one BEGIN IMMEDIATE/COMMIT, + ## which is the atomicity the SDS contract asks for. + let chanKey = toKey(channelId) + var ops: seq[TxOp] = @[] + var hintIds: seq[SdsMessageID] = @[] + + let cats = + [CatLog, CatOutgoing, CatIncoming, CatOutRepair, CatInRepair] + for cat in cats: + let r = await job.scanPrefix(cat, chanKey) + if r.isOk: + for row in r.get: + ops.add(TxOp(category: cat, key: row.key, kind: txDelete)) + if cat == CatLog: + try: + hintIds.add(fromBlob(row.payload, SdsMessage).messageId) + except ValueError: + discard + else: + warn "sds-persistency: scan during drop failed", + channelId, category = cat, err = $r.error + + ops.add(TxOp(category: CatLamport, key: chanKey, kind: txDelete)) + for id in hintIds: + ops.add(TxOp(category: CatHint, key: toKey(id), kind: txDelete)) + + if ops.len > 0: + await job.persist(ops) + +# ── Write helpers ─────────────────────────────────────────────────────── +# +# The Persistence write fields are async with `raises: []`, but the Job ops +# raise `CatchableError`. These wrappers trap and log so the closures stay +# raise-free, preserving the "errors are logged, never raised" contract. + +proc safePut( + job: Job, category: string, k: Key, payload: seq[byte] +) {.async: (raises: []).} = + try: + await job.persistPut(category, k, payload) + except CatchableError as e: + warn "sds-persistency: put failed", category, err = e.msg + +proc safeDelete(job: Job, category: string, k: Key) {.async: (raises: []).} = + try: + await job.persistDelete(category, k) + except CatchableError as e: + warn "sds-persistency: delete failed", category, err = e.msg + +# ── Public factory ────────────────────────────────────────────────────── + +proc newSdsPersistence*(job: Job): Persistence {.gcsafe, raises: [].} = + ## Build an SDS `Persistence` value backed by ``job``. One Job services + ## all channels — channelId is part of every key. + ## + ## The closures capture ``job`` by ref. They must be invoked from a + ## thread that owns a running chronos loop (the SDS context's worker + ## thread satisfies this). + doAssert not job.isNil, "newSdsPersistence: job is nil" + + # Built field-by-field via assignment rather than an object literal: every + # field is an async closure whose body is an `await`/`return await` command + # call, which cannot be followed by the `,` field separator a `Persistence( + # ..)` literal would require (the parser cannot tell the comma from another + # command argument). Assignments have no separator, so the bodies stay plain. + var persistence = Persistence() + + persistence.saveLamport = proc( + channelId: SdsChannelID, lamport: int64 + ): Future[void] {.async: (raises: []), gcsafe.} = + await safePut(job, CatLamport, toKey(channelId), toBlob(lamport)) + + persistence.appendLogEntry = proc( + channelId: SdsChannelID, msg: SdsMessage + ): Future[void] {.async: (raises: []), gcsafe.} = + await safePut(job, CatLog, key(channelId, msg.messageId), toBlob(msg)) + + persistence.removeLogEntry = proc( + channelId: SdsChannelID, msgId: SdsMessageID + ): Future[void] {.async: (raises: []), gcsafe.} = + await safeDelete(job, CatLog, key(channelId, msgId)) + + persistence.setRetrievalHint = proc( + msgId: SdsMessageID, hint: seq[byte] + ): Future[void] {.async: (raises: []), gcsafe.} = + await safePut(job, CatHint, toKey(msgId), hint) + + persistence.saveOutgoing = proc( + channelId: SdsChannelID, msg: UnacknowledgedMessage + ): Future[void] {.async: (raises: []), gcsafe.} = + await safePut(job, CatOutgoing, key(channelId, msg.message.messageId), toBlob(msg)) + + persistence.removeOutgoing = proc( + channelId: SdsChannelID, msgId: SdsMessageID + ): Future[void] {.async: (raises: []), gcsafe.} = + await safeDelete(job, CatOutgoing, key(channelId, msgId)) + + persistence.saveIncoming = proc( + channelId: SdsChannelID, msg: IncomingMessage + ): Future[void] {.async: (raises: []), gcsafe.} = + await safePut(job, CatIncoming, key(channelId, msg.message.messageId), toBlob(msg)) + + persistence.removeIncoming = proc( + channelId: SdsChannelID, msgId: SdsMessageID + ): Future[void] {.async: (raises: []), gcsafe.} = + await safeDelete(job, CatIncoming, key(channelId, msgId)) + + persistence.saveOutgoingRepair = proc( + channelId: SdsChannelID, msgId: SdsMessageID, entry: OutgoingRepairEntry + ): Future[void] {.async: (raises: []), gcsafe.} = + await safePut(job, CatOutRepair, key(channelId, msgId), toBlob((msgId, entry))) + + persistence.removeOutgoingRepair = proc( + channelId: SdsChannelID, msgId: SdsMessageID + ): Future[void] {.async: (raises: []), gcsafe.} = + await safeDelete(job, CatOutRepair, key(channelId, msgId)) + + persistence.saveIncomingRepair = proc( + channelId: SdsChannelID, msgId: SdsMessageID, entry: IncomingRepairEntry + ): Future[void] {.async: (raises: []), gcsafe.} = + await safePut(job, CatInRepair, key(channelId, msgId), toBlob((msgId, entry))) + + persistence.removeIncomingRepair = proc( + channelId: SdsChannelID, msgId: SdsMessageID + ): Future[void] {.async: (raises: []), gcsafe.} = + await safeDelete(job, CatInRepair, key(channelId, msgId)) + + persistence.dropChannel = proc( + channelId: SdsChannelID + ): Future[void] {.async: (raises: []), gcsafe.} = + try: + await doDropChannel(job, channelId) + except CatchableError as e: + error "sds-persistency: dropChannel failed", channelId, err = e.msg + + persistence.loadAllForChannel = proc( + channelId: SdsChannelID + ): Future[ChannelSnapshot] {.async: (raises: []), gcsafe.} = + try: + return await doLoadAll(job, channelId) + except CatchableError as e: + error "sds-persistency: loadAllForChannel failed", channelId, err = e.msg + return ChannelSnapshot() + + return persistence + +{.pop.} diff --git a/waku/waku_persistency.nim b/waku/waku_persistency.nim new file mode 100644 index 000000000..5eb94e3f0 --- /dev/null +++ b/waku/waku_persistency.nim @@ -0,0 +1,3 @@ +import waku/persistency/persistency + +export persistency