From c4bff9de0c6a18572a4a017287136f53c74b4df6 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Mon, 25 May 2026 23:05:32 +0200 Subject: [PATCH 01/10] bump sds (now async) and brokers version --- nimble.lock | 12 ++++++------ waku.nimble | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/nimble.lock b/nimble.lock index cd533001e..e03a3db29 100644 --- a/nimble.lock +++ b/nimble.lock @@ -328,8 +328,8 @@ } }, "brokers": { - "version": "#v2.0.1", - "vcsRevision": "2093ca4d50e581adda73fee7fd16231f990f4cbe", + "version": "#v3.0.0", + "vcsRevision": "8199b236db409cbaf6102dcc2431ebf33446ae26", "url": "https://github.com/NagyZoltanPeter/nim-brokers.git", "downloadMethod": "git", "dependencies": [ @@ -341,7 +341,7 @@ "cbor_serialization" ], "checksums": { - "sha1": "cc74c987af94537e9d44d1b0143aa417299040c5" + "sha1": "b3a48e23540c0f26f905a7516830559f71c89003" } }, "stint": { @@ -620,8 +620,8 @@ } }, "sds": { - "version": "#2e9a7683f0e180bf112135fae3a3803eed8490d4", - "vcsRevision": "2e9a7683f0e180bf112135fae3a3803eed8490d4", + "version": "#35a33adc9808a053f4ad7af8d07ff92075ba3462", + "vcsRevision": "35a33adc9808a053f4ad7af8d07ff92075ba3462", "url": "https://github.com/logos-messaging/nim-sds.git", "downloadMethod": "git", "dependencies": [ @@ -636,7 +636,7 @@ "taskpools" ], "checksums": { - "sha1": "d13f1bf8d1b90b27e9edfc063b043831242cda19" + "sha1": "136296859b324403486b93d62ae9154e53fb15d4" } }, "ffi": { diff --git a/waku.nimble b/waku.nimble index 99f649758..066a33018 100644 --- a/waku.nimble +++ b/waku.nimble @@ -61,7 +61,7 @@ requires "nim >= 2.2.4", # Packages not on nimble (use git URLs) requires "https://github.com/logos-messaging/nim-ffi" -requires "https://github.com/logos-messaging/nim-sds.git#2e9a7683f0e180bf112135fae3a3803eed8490d4" +requires "https://github.com/logos-messaging/nim-sds.git#35a33adc9808a053f4ad7af8d07ff92075ba3462" # brokers: pinned by URL+commit rather than the bare `brokers >= 2.0.1` # form because the nim-lang/packages registry entry for `brokers` only @@ -71,7 +71,7 @@ requires "https://github.com/logos-messaging/nim-sds.git#2e9a7683f0e180bf112135f # pin below bypasses the registry and locks the exact commit of the # v2.0.1 tag. Revert to the bare form once nim-lang/packages is # updated. -requires "https://github.com/NagyZoltanPeter/nim-brokers.git#v2.0.1" +requires "https://github.com/NagyZoltanPeter/nim-brokers.git#v3.0.0" requires "https://github.com/vacp2p/nim-lsquic" requires "https://github.com/vacp2p/nim-jwt.git#057ec95eb5af0eea9c49bfe9025b3312c95dc5f2" From 4a8e9fe55344dc7cc8c3ace38d717b0bc6afa7ee Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Tue, 26 May 2026 11:14:26 +0200 Subject: [PATCH 02/10] Utilizing async SDS, enhancement on persistency to simplify buffern serde --- tests/persistency/test_all.nim | 1 + tests/persistency/test_blob_codec.nim | 87 +++++++ waku/persistency/payload_codec.nim | 308 ++++++++++++++++++++++++ waku/persistency/sds_persistency.nim | 323 ++++++++++++++++++++++++++ waku/waku_persistency.nim | 3 + 5 files changed, 722 insertions(+) create mode 100644 tests/persistency/test_blob_codec.nim create mode 100644 waku/persistency/payload_codec.nim create mode 100644 waku/persistency/sds_persistency.nim create mode 100644 waku/waku_persistency.nim diff --git a/tests/persistency/test_all.nim b/tests/persistency/test_all.nim index 194977692..0909850a4 100644 --- a/tests/persistency/test_all.nim +++ b/tests/persistency/test_all.nim @@ -5,5 +5,6 @@ import ./test_backend import ./test_lifecycle import ./test_facade import ./test_encoding +import ./test_blob_codec import ./test_string_lookup import ./test_singleton diff --git a/tests/persistency/test_blob_codec.nim b/tests/persistency/test_blob_codec.nim new file mode 100644 index 000000000..884e94d09 --- /dev/null +++ b/tests/persistency/test_blob_codec.nim @@ -0,0 +1,87 @@ +{.used.} + +## Round-trip tests for the generic payload blob codec (payload_codec.nim) +## and its `BlobCodec` application to the SDS persistence types. +## +## Importing `sds_persistency` forces the real adapter (and its six +## `BlobCodec` calls + call sites) to compile. The local derivations +## below live in this module's own scope and drive the round-trip checks. + +import std/[sets, times] +import testutils/unittests +import waku/persistency/payload_codec +import waku/persistency/sds_persistency # compile-checks the real adapter +import sds/types/persistence + +# Same order constraint as the adapter: a field's type before its container. +BlobCodec(HistoryEntry) +BlobCodec(SdsMessage) +BlobCodec(UnacknowledgedMessage) +BlobCodec(IncomingMessage) +BlobCodec(OutgoingRepairEntry) +BlobCodec(IncomingRepairEntry) + +proc sampleHistory(): HistoryEntry = + HistoryEntry.init("mid-1", @[0xAA'u8, 0xBB, 0xCC], "sender-1".SdsParticipantID) + +proc sampleMessage(content = @[1'u8, 2, 3]): SdsMessage = + SdsMessage.init( + messageId = "msg-42", + lamportTimestamp = 7'i64, + causalHistory = @[sampleHistory(), HistoryEntry.init("mid-2")], + channelId = "channel-9", + content = content, + bloomFilter = @[0xDE'u8, 0xAD, 0xBE, 0xEF], + senderId = "alice".SdsParticipantID, + repairRequest = @[sampleHistory()], + ) + +suite "Persistency blob codec": + test "primitive round-trips": + check fromBlob(toBlob(7'i64), int64) == 7'i64 + check fromBlob(toBlob(int64.low), int64) == int64.low + check fromBlob(toBlob("héllo"), string) == "héllo" + check fromBlob(toBlob(@[0'u8, 255, 7]), seq[byte]) == @[0'u8, 255, 7] + + test "HistoryEntry round-trips": + let h = sampleHistory() + check fromBlob(toBlob(h), HistoryEntry) == h + + test "SdsMessage round-trips (seqs, distinct, nested)": + let m = sampleMessage() + check fromBlob(toBlob(m), SdsMessage) == m + + test "UnacknowledgedMessage round-trips (Time, int)": + let u = UnacknowledgedMessage.init(sampleMessage(), initTime(1_700_000_000, 123), 4) + check fromBlob(toBlob(u), UnacknowledgedMessage) == u + + test "IncomingMessage round-trips (HashSet)": + let inc = IncomingMessage.init(sampleMessage(), toHashSet(["dep-a", "dep-b", "dep-c"])) + check fromBlob(toBlob(inc), IncomingMessage) == inc + + test "repair tuples round-trip": + let outPair = + ("msg-42".SdsMessageID, OutgoingRepairEntry.init(sampleHistory(), initTime(10, 0))) + check fromBlob(toBlob(outPair), (SdsMessageID, OutgoingRepairEntry)) == outPair + + let inPair = ( + "msg-42".SdsMessageID, + IncomingRepairEntry.init(sampleHistory(), @[9'u8, 8, 7], initTime(20, 500)), + ) + check fromBlob(toBlob(inPair), (SdsMessageID, IncomingRepairEntry)) == inPair + + test "payload exceeds the old 64 KiB key cap (4-byte length)": + var big = newSeq[byte](70_000) + for i in 0 ..< big.len: + big[i] = byte(i and 0xFF) + let m = sampleMessage(content = big) + let decoded = fromBlob(toBlob(m), SdsMessage) + check decoded.content.len == 70_000 + check decoded == m + + test "truncated input raises ValueError": + let bytes = toBlob(sampleMessage()) + expect ValueError: + discard fromBlob(bytes[0 ..< bytes.len - 5], SdsMessage) + expect ValueError: + discard fromBlob(@[0xFF'u8, 0xFF, 0xFF, 0xFF], string) # claims 4 GiB diff --git a/waku/persistency/payload_codec.nim b/waku/persistency/payload_codec.nim new file mode 100644 index 000000000..361582b02 --- /dev/null +++ b/waku/persistency/payload_codec.nim @@ -0,0 +1,308 @@ +## Generic length-prefixed blob codec for persistency payloads. +## +## Symmetric counterpart to `keys.nim`'s `encodePart`: every persisted value +## round-trips through `writePart`/`readPart` over a `ReadCtx` cursor. Unlike +## keys, payloads are not byte-wise sort-stable, so strings and byte blobs use +## a **4-byte BE length prefix** (4 GiB ceiling) instead of keys.nim's 2-byte +## (64 KiB) prefix — the cap that originally forced SDS to hand-roll its codec. +## +## ## How a type opts in +## +## Primitives, `string`, `seq[byte]`, `enum`, `distinct`, `Time`, `seq[T]`, +## `HashSet[T]` and tuples already have codecs here. A **named struct** opts in +## with a single line: +## +## ```nim +## BlobCodec(MyType) +## ``` +## +## which emits both `writePart`/`readPart` for `MyType` from its fields, in +## declaration order, and reconstructs the value via `MyType.init(...)` +## (positional). This is the *only* mechanism for structs — there is +## deliberately no `fieldPairs`/default-construct path, so `{.requiresInit.}` +## types (which cannot be zero-initialised) work unchanged. The contract: +## +## * the type is a value object whose `init` takes its fields positionally in +## declaration order; +## * only public fields participate (private fields are invisible to the +## macro and would be dropped — don't persist such types); +## * `BlobCodec` must be called for a field's type *before* the struct +## that contains it (Nim resolves the concrete `writePart`/`readPart` +## top-down). +## +## ## Entry points +## +## `toBlob(v)` → `seq[byte]`, `fromBlob(bytes, T)` → `T` (raises `ValueError` +## on truncated/corrupt input). + +{.push raises: [].} + +import std/[macros, sets, times, typetraits] + +type ReadCtx* = object + buf*: seq[byte] + pos*: int + +proc initReadCtx*(bytes: openArray[byte]): ReadCtx = + ReadCtx(buf: @bytes, pos: 0) + +proc need(r: ReadCtx, n: int) {.raises: [ValueError].} = + if n < 0 or r.pos + n > r.buf.len: + raise newException(ValueError, "truncated payload: need " & $n & " more bytes") + +# ── Fixed-width integers ──────────────────────────────────────────────── + +proc writePart*(buf: var seq[byte], v: uint32) = + buf.add(byte((v shr 24) and 0xFF'u32)) + buf.add(byte((v shr 16) and 0xFF'u32)) + buf.add(byte((v shr 8) and 0xFF'u32)) + buf.add(byte(v and 0xFF'u32)) + +proc readPart*(r: var ReadCtx, _: typedesc[uint32]): uint32 {.raises: [ValueError].} = + r.need(4) + result = + (uint32(r.buf[r.pos]) shl 24) or (uint32(r.buf[r.pos + 1]) shl 16) or + (uint32(r.buf[r.pos + 2]) shl 8) or uint32(r.buf[r.pos + 3]) + r.pos += 4 + +proc writePart*(buf: var seq[byte], v: int64) = + let u = cast[uint64](v) + for shift in countdown(56, 0, 8): + buf.add(byte((u shr shift) and 0xFF'u64)) + +proc readPart*(r: var ReadCtx, _: typedesc[int64]): int64 {.raises: [ValueError].} = + r.need(8) + var u: uint64 = 0 + for i in 0 ..< 8: + u = (u shl 8) or uint64(r.buf[r.pos + i]) + r.pos += 8 + cast[int64](u) + +proc writePart*(buf: var seq[byte], v: int) = + writePart(buf, int64(v)) + +proc readPart*(r: var ReadCtx, _: typedesc[int]): int {.raises: [ValueError].} = + int(readPart(r, int64)) + +# ── Small scalars ─────────────────────────────────────────────────────── + +proc writePart*(buf: var seq[byte], v: bool) = + buf.add(if v: 1'u8 else: 0'u8) + +proc readPart*(r: var ReadCtx, _: typedesc[bool]): bool {.raises: [ValueError].} = + r.need(1) + result = r.buf[r.pos] != 0'u8 + r.pos += 1 + +proc writePart*(buf: var seq[byte], v: byte) = + buf.add(v) + +proc readPart*(r: var ReadCtx, _: typedesc[byte]): byte {.raises: [ValueError].} = + r.need(1) + result = r.buf[r.pos] + r.pos += 1 + +proc writePart*(buf: var seq[byte], v: char) = + buf.add(byte(v)) + +proc readPart*(r: var ReadCtx, _: typedesc[char]): char {.raises: [ValueError].} = + r.need(1) + result = char(r.buf[r.pos]) + r.pos += 1 + +proc writePart*[E: enum](buf: var seq[byte], v: E) = + writePart(buf, int64(ord(v))) + +proc readPart*[E: enum](r: var ReadCtx, _: typedesc[E]): E {.raises: [ValueError].} = + E(readPart(r, int64)) + +# ── string / seq[byte] (4-byte length) ────────────────────────────────── + +proc writePart*(buf: var seq[byte], s: string) = + writePart(buf, uint32(s.len)) + for c in s: + buf.add(byte(c)) + +proc readPart*(r: var ReadCtx, _: typedesc[string]): string {.raises: [ValueError].} = + let n = int(readPart(r, uint32)) + r.need(n) + result = newString(n) + for i in 0 ..< n: + result[i] = char(r.buf[r.pos + i]) + r.pos += n + +proc writePart*(buf: var seq[byte], b: seq[byte]) = + writePart(buf, uint32(b.len)) + for x in b: + buf.add(x) + +proc readPart*(r: var ReadCtx, _: typedesc[seq[byte]]): seq[byte] {.raises: [ValueError].} = + let n = int(readPart(r, uint32)) + r.need(n) + result = newSeq[byte](n) + for i in 0 ..< n: + result[i] = r.buf[r.pos + i] + r.pos += n + +# ── distinct (e.g. SdsParticipantID = distinct string) ────────────────── + +proc writePart*[T: distinct](buf: var seq[byte], v: T) = + mixin writePart + writePart(buf, distinctBase(T)(v)) + +proc readPart*[T: distinct]( + r: var ReadCtx, _: typedesc[T] +): T {.raises: [ValueError].} = + mixin readPart + T(readPart(r, distinctBase(T))) + +# ── Time ──────────────────────────────────────────────────────────────── + +proc writePart*(buf: var seq[byte], t: Time) = + writePart(buf, t.toUnix()) + writePart(buf, uint32(t.nanosecond)) + +proc readPart*(r: var ReadCtx, _: typedesc[Time]): Time {.raises: [ValueError].} = + let secs = readPart(r, int64) + let nanos = int(readPart(r, uint32)) + if nanos < 0 or nanos > 999_999_999: + raise newException(ValueError, "nanosecond out of range: " & $nanos) + initTime(secs, nanos) + +# ── Containers ────────────────────────────────────────────────────────── + +proc writePart*[T](buf: var seq[byte], xs: seq[T]) = + mixin writePart + writePart(buf, uint32(xs.len)) + for x in xs: + writePart(buf, x) + +proc readPart*[T]( + r: var ReadCtx, _: typedesc[seq[T]] +): seq[T] {.raises: [ValueError].} = + mixin readPart + let n = int(readPart(r, uint32)) + result = newSeqOfCap[T](n) + for _ in 0 ..< n: + result.add(readPart(r, T)) + +proc writePart*[T](buf: var seq[byte], s: HashSet[T]) = + mixin writePart + writePart(buf, uint32(s.len)) + for x in s: + writePart(buf, x) + +proc readPart*[T]( + r: var ReadCtx, _: typedesc[HashSet[T]] +): HashSet[T] {.raises: [ValueError].} = + mixin readPart + let n = int(readPart(r, uint32)) + result = initHashSet[T](max(n, 2)) + for _ in 0 ..< n: + result.incl(readPart(r, T)) + +proc writePart*[T: tuple](buf: var seq[byte], v: T) = + mixin writePart + for f in fields(v): + writePart(buf, f) + +proc readPart*[T: tuple]( + r: var ReadCtx, _: typedesc[T] +): T {.raises: [ValueError].} = + mixin readPart + for f in fields(result): + f = readPart(r, typeof(f)) + +# ── Named-struct derivation ───────────────────────────────────────────── + +proc objectRecList(tSym: NimNode): NimNode {.compileTime.} = + ## Resolve a type symbol to its object's RecList, preserving field types + ## exactly as written (getImpl, not getTypeImpl, so `HashSet[SdsMessageID]` + ## and friends stay named rather than being expanded to their structure). + var body = tSym.getImpl[2] + while body.kind in {nnkRefTy, nnkPtrTy, nnkDistinctTy}: + body = body[0] + doAssert body.kind == nnkObjectTy, + "BlobCodec: expected an object type, got " & treeRepr(body) + body[2] + +macro BlobCodec*(T: typedesc): untyped = + ## Emit `writePart`/`readPart` for a named value object `T`, encoding each + ## public field in declaration order and rebuilding via `T.init(...)`. + let tSym = getTypeInst(T)[1] + let recList = objectRecList(tSym) + + var fieldNames: seq[NimNode] + var fieldTypes: seq[NimNode] + for defs in recList: + if defs.kind != nnkIdentDefs: + continue + # Rebuild the field type from its textual form rather than splicing the + # resolved symbol: a spliced *alias* type symbol (e.g. `SdsMessageID = + # string`) is mis-resolved as a value in `readPart(r, T)`, breaking + # typedesc overload resolution. A fresh ident/expr behaves like literal + # source and resolves to a typedesc correctly. + let ftype = parseExpr(repr(defs[^2])) + for i in 0 ..< defs.len - 2: + var nameNode = defs[i] + if nameNode.kind == nnkPragmaExpr: + nameNode = nameNode[0] + if nameNode.kind == nnkPostfix: + nameNode = nameNode[1] + fieldNames.add(ident($nameNode)) + fieldTypes.add(ftype.copyNimTree) + + let bufId = ident "buf" + let vId = ident "v" + let rId = ident "r" + + # writePart(buf: var seq[byte], v: T) + var writeBody = newStmtList() + for fn in fieldNames: + writeBody.add(newCall(ident "writePart", bufId, newDotExpr(vId, fn))) + let writeProc = newProc( + name = ident "writePart", + params = [ + newEmptyNode(), + newIdentDefs(bufId, nnkVarTy.newTree(nnkBracketExpr.newTree(ident "seq", ident "byte"))), + newIdentDefs(vId, tSym), + ], + body = writeBody, + ) + + # readPart(r: var ReadCtx, _: typedesc[T]): T {.raises: [ValueError].} + var readBody = newStmtList() + var tmps: seq[NimNode] + for i, ft in fieldTypes: + let tmp = genSym(nskLet, "f" & $i) + tmps.add(tmp) + readBody.add(newLetStmt(tmp, newCall(ident "readPart", rId, ft))) + readBody.add(newCall(newDotExpr(tSym, ident "init"), tmps)) + let readProc = newProc( + name = ident "readPart", + params = [ + tSym, + newIdentDefs(rId, nnkVarTy.newTree(ident "ReadCtx")), + newIdentDefs(ident "_", nnkBracketExpr.newTree(ident "typedesc", tSym)), + ], + body = readBody, + ) + readProc.addPragma(nnkExprColonExpr.newTree( + ident "raises", nnkBracket.newTree(ident "ValueError") + )) + + result = newStmtList(writeProc, readProc) + +# ── Public entry points ───────────────────────────────────────────────── + +proc toBlob*[T](v: T): seq[byte] = + mixin writePart + result = @[] + writePart(result, v) + +proc fromBlob*[T](bytes: openArray[byte], _: typedesc[T]): T {.raises: [ValueError].} = + mixin readPart + var r = initReadCtx(bytes) + readPart(r, T) + +{.pop.} diff --git a/waku/persistency/sds_persistency.nim b/waku/persistency/sds_persistency.nim new file mode 100644 index 000000000..eef687ec0 --- /dev/null +++ b/waku/persistency/sds_persistency.nim @@ -0,0 +1,323 @@ +## Adapter that materialises the SDS `Persistence` contract on top of a +## waku-persistency `Job`. One `Job` (== one SQLite file, one worker thread) +## services all channels for a given SDS context; rows are namespaced by +## category and the channelId is the first key component so per-channel +## prefix scans stay cheap. +## +## ## Async contract +## +## Every `Persistence` proc field is async (`proc(..): Future[void] +## {.async: (raises: []), gcsafe.}`) — SDS awaits them on its own chronos +## loop. We map each onto the matching async `Job` op: +## +## * **Writes (save*/remove*)** — `await` the Job op through the `safePut`/ +## `safeDelete` helpers, which trap any backend error and log it rather +## than raising (the contract forbids raising). +## * **`dropChannel`** — awaits `doDropChannel`, which batches every row of +## the channel into one transactional `persist` so the wipe is atomic +## (called from removeChannel / resetReliabilityManager). +## * **`loadAllForChannel`** — awaits `doLoadAll` and returns the snapshot +## the SDS bootstrap path needs. +## +## ## Storage layout +## +## | Category | Key | Value | +## |------------------|------------------------------|-----------------------------------------| +## | `sds.lamport` | `key(channelId)` | 8 BE bytes (int64) | +## | `sds.log` | `key(channelId, msgId)` | encoded `SdsMessage` | +## | `sds.hint` | `key(msgId)` | raw hint bytes | +## | `sds.outgoing` | `key(channelId, msgId)` | encoded `UnacknowledgedMessage` | +## | `sds.incoming` | `key(channelId, msgId)` | encoded `IncomingMessage` | +## | `sds.outRepair` | `key(channelId, msgId)` | `(msgId, OutgoingRepairEntry)` encoded | +## | `sds.inRepair` | `key(channelId, msgId)` | `(msgId, IncomingRepairEntry)` encoded | +## +## `messageHistory` is reconstructed in memory by sorting on +## `(lamportTimestamp, messageId)` — the same total order SDS uses for +## delivery (see sds/sds_utils.nim). Insertion order is not relied upon, so +## `removeLogEntry` works with the natural `(channelId, msgId)` key. + +{.push raises: [].} + +import std/[algorithm, options, sets, times] +import chronos, chronicles, results +import ./persistency +import ./payload_codec +import sds/types/persistence + +export persistence, persistency + +logScope: + topics = "sds-persistency" + +const + CatLamport* = "sds.lamport" + CatLog* = "sds.log" + CatHint* = "sds.hint" + CatOutgoing* = "sds.outgoing" + CatIncoming* = "sds.incoming" + CatOutRepair* = "sds.outRepair" + CatInRepair* = "sds.inRepair" + +# ── Blob codecs ───────────────────────────────────────────────────────── +# +# All SDS payload types round-trip through the generic, length-prefixed +# codec in payload_codec.nim. Each `BlobCodec(T)` emits writePart/ +# readPart for `T` from its public fields (declaration order) and rebuilds +# via `T.init(...)`. Order matters: a field's type must be derived before +# the struct that contains it. The repair buffers store `(msgId, entry)` +# tuples, handled by the generic tuple codec. Lamport is a bare int64. + +BlobCodec(HistoryEntry) +BlobCodec(SdsMessage) +BlobCodec(UnacknowledgedMessage) +BlobCodec(IncomingMessage) +BlobCodec(OutgoingRepairEntry) +BlobCodec(IncomingRepairEntry) + +# ── Async backing procs ───────────────────────────────────────────────── + +proc doLoadAll( + job: Job, channelId: SdsChannelID +): Future[ChannelSnapshot] {.async.} = + var snap = ChannelSnapshot() + let chanKey = toKey(channelId) + + block lamport: + let r = await job.get(CatLamport, chanKey) + if r.isOk: + let opt = r.get + if opt.isSome: + try: + snap.lamportTimestamp = fromBlob(opt.get, int64) + except ValueError as e: + warn "sds-persistency: invalid lamport bytes", + channelId, err = e.msg + else: + warn "sds-persistency: get lamport failed", + channelId, err = $r.error + + block log: + let r = await job.scanPrefix(CatLog, chanKey) + if r.isOk: + var msgs = newSeq[SdsMessage]() + for row in r.get: + try: + msgs.add(fromBlob(row.payload, SdsMessage)) + except ValueError as e: + warn "sds-persistency: invalid log row", channelId, err = e.msg + msgs.sort do(a, b: SdsMessage) -> int: + result = cmp(a.lamportTimestamp, b.lamportTimestamp) + if result == 0: + result = cmp(a.messageId, b.messageId) + snap.messageHistory = msgs + else: + warn "sds-persistency: scan log failed", + channelId, err = $r.error + + block outgoing: + let r = await job.scanPrefix(CatOutgoing, chanKey) + if r.isOk: + for row in r.get: + try: + snap.outgoingBuffer.add(fromBlob(row.payload, UnacknowledgedMessage)) + except ValueError as e: + warn "sds-persistency: invalid outgoing row", + channelId, err = e.msg + else: + warn "sds-persistency: scan outgoing failed", + channelId, err = $r.error + + block incoming: + let r = await job.scanPrefix(CatIncoming, chanKey) + if r.isOk: + for row in r.get: + try: + snap.incomingBuffer.add(fromBlob(row.payload, IncomingMessage)) + except ValueError as e: + warn "sds-persistency: invalid incoming row", + channelId, err = e.msg + else: + warn "sds-persistency: scan incoming failed", + channelId, err = $r.error + + block outRepair: + let r = await job.scanPrefix(CatOutRepair, chanKey) + if r.isOk: + for row in r.get: + try: + snap.outgoingRepairBuffer.add( + fromBlob(row.payload, (SdsMessageID, OutgoingRepairEntry)) + ) + except ValueError as e: + warn "sds-persistency: invalid out-repair row", + channelId, err = e.msg + else: + warn "sds-persistency: scan out-repair failed", + channelId, err = $r.error + + block inRepair: + let r = await job.scanPrefix(CatInRepair, chanKey) + if r.isOk: + for row in r.get: + try: + snap.incomingRepairBuffer.add( + fromBlob(row.payload, (SdsMessageID, IncomingRepairEntry)) + ) + except ValueError as e: + warn "sds-persistency: invalid in-repair row", + channelId, err = e.msg + else: + warn "sds-persistency: scan in-repair failed", + channelId, err = $r.error + + return snap + +proc doDropChannel(job: Job, channelId: SdsChannelID): Future[void] {.async.} = + ## Collect every row belonging to the channel and submit them as a single + ## TxOp batch — the backend applies that as one BEGIN IMMEDIATE/COMMIT, + ## which is the atomicity the SDS contract asks for. + let chanKey = toKey(channelId) + var ops: seq[TxOp] = @[] + var hintIds: seq[SdsMessageID] = @[] + + let cats = + [CatLog, CatOutgoing, CatIncoming, CatOutRepair, CatInRepair] + for cat in cats: + let r = await job.scanPrefix(cat, chanKey) + if r.isOk: + for row in r.get: + ops.add(TxOp(category: cat, key: row.key, kind: txDelete)) + if cat == CatLog: + try: + hintIds.add(fromBlob(row.payload, SdsMessage).messageId) + except ValueError: + discard + else: + warn "sds-persistency: scan during drop failed", + channelId, category = cat, err = $r.error + + ops.add(TxOp(category: CatLamport, key: chanKey, kind: txDelete)) + for id in hintIds: + ops.add(TxOp(category: CatHint, key: toKey(id), kind: txDelete)) + + if ops.len > 0: + await job.persist(ops) + +# ── Write helpers ─────────────────────────────────────────────────────── +# +# The Persistence write fields are async with `raises: []`, but the Job ops +# raise `CatchableError`. These wrappers trap and log so the closures stay +# raise-free, preserving the "errors are logged, never raised" contract. + +proc safePut( + job: Job, category: string, k: Key, payload: seq[byte] +) {.async: (raises: []).} = + try: + await job.persistPut(category, k, payload) + except CatchableError as e: + warn "sds-persistency: put failed", category, err = e.msg + +proc safeDelete(job: Job, category: string, k: Key) {.async: (raises: []).} = + try: + await job.persistDelete(category, k) + except CatchableError as e: + warn "sds-persistency: delete failed", category, err = e.msg + +# ── Public factory ────────────────────────────────────────────────────── + +proc newSdsPersistence*(job: Job): Persistence {.gcsafe, raises: [].} = + ## Build an SDS `Persistence` value backed by ``job``. One Job services + ## all channels — channelId is part of every key. + ## + ## The closures capture ``job`` by ref. They must be invoked from a + ## thread that owns a running chronos loop (the SDS context's worker + ## thread satisfies this). + doAssert not job.isNil, "newSdsPersistence: job is nil" + + # Built field-by-field via assignment rather than an object literal: every + # field is an async closure whose body is an `await`/`return await` command + # call, which cannot be followed by the `,` field separator a `Persistence( + # ..)` literal would require (the parser cannot tell the comma from another + # command argument). Assignments have no separator, so the bodies stay plain. + var persistence = Persistence() + + persistence.saveLamport = proc( + channelId: SdsChannelID, lamport: int64 + ): Future[void] {.async: (raises: []), gcsafe.} = + await safePut(job, CatLamport, toKey(channelId), toBlob(lamport)) + + persistence.appendLogEntry = proc( + channelId: SdsChannelID, msg: SdsMessage + ): Future[void] {.async: (raises: []), gcsafe.} = + await safePut(job, CatLog, key(channelId, msg.messageId), toBlob(msg)) + + persistence.removeLogEntry = proc( + channelId: SdsChannelID, msgId: SdsMessageID + ): Future[void] {.async: (raises: []), gcsafe.} = + await safeDelete(job, CatLog, key(channelId, msgId)) + + persistence.setRetrievalHint = proc( + msgId: SdsMessageID, hint: seq[byte] + ): Future[void] {.async: (raises: []), gcsafe.} = + await safePut(job, CatHint, toKey(msgId), hint) + + persistence.saveOutgoing = proc( + channelId: SdsChannelID, msg: UnacknowledgedMessage + ): Future[void] {.async: (raises: []), gcsafe.} = + await safePut(job, CatOutgoing, key(channelId, msg.message.messageId), toBlob(msg)) + + persistence.removeOutgoing = proc( + channelId: SdsChannelID, msgId: SdsMessageID + ): Future[void] {.async: (raises: []), gcsafe.} = + await safeDelete(job, CatOutgoing, key(channelId, msgId)) + + persistence.saveIncoming = proc( + channelId: SdsChannelID, msg: IncomingMessage + ): Future[void] {.async: (raises: []), gcsafe.} = + await safePut(job, CatIncoming, key(channelId, msg.message.messageId), toBlob(msg)) + + persistence.removeIncoming = proc( + channelId: SdsChannelID, msgId: SdsMessageID + ): Future[void] {.async: (raises: []), gcsafe.} = + await safeDelete(job, CatIncoming, key(channelId, msgId)) + + persistence.saveOutgoingRepair = proc( + channelId: SdsChannelID, msgId: SdsMessageID, entry: OutgoingRepairEntry + ): Future[void] {.async: (raises: []), gcsafe.} = + await safePut(job, CatOutRepair, key(channelId, msgId), toBlob((msgId, entry))) + + persistence.removeOutgoingRepair = proc( + channelId: SdsChannelID, msgId: SdsMessageID + ): Future[void] {.async: (raises: []), gcsafe.} = + await safeDelete(job, CatOutRepair, key(channelId, msgId)) + + persistence.saveIncomingRepair = proc( + channelId: SdsChannelID, msgId: SdsMessageID, entry: IncomingRepairEntry + ): Future[void] {.async: (raises: []), gcsafe.} = + await safePut(job, CatInRepair, key(channelId, msgId), toBlob((msgId, entry))) + + persistence.removeIncomingRepair = proc( + channelId: SdsChannelID, msgId: SdsMessageID + ): Future[void] {.async: (raises: []), gcsafe.} = + await safeDelete(job, CatInRepair, key(channelId, msgId)) + + persistence.dropChannel = proc( + channelId: SdsChannelID + ): Future[void] {.async: (raises: []), gcsafe.} = + try: + await doDropChannel(job, channelId) + except CatchableError as e: + error "sds-persistency: dropChannel failed", channelId, err = e.msg + + persistence.loadAllForChannel = proc( + channelId: SdsChannelID + ): Future[ChannelSnapshot] {.async: (raises: []), gcsafe.} = + try: + return await doLoadAll(job, channelId) + except CatchableError as e: + error "sds-persistency: loadAllForChannel failed", channelId, err = e.msg + return ChannelSnapshot() + + return persistence + +{.pop.} diff --git a/waku/waku_persistency.nim b/waku/waku_persistency.nim new file mode 100644 index 000000000..5eb94e3f0 --- /dev/null +++ b/waku/waku_persistency.nim @@ -0,0 +1,3 @@ +import waku/persistency/persistency + +export persistency From 2939ad941e30a50633d3ec05888f49b720825297 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Tue, 26 May 2026 11:17:55 +0200 Subject: [PATCH 03/10] nph formatting --- tests/persistency/test_blob_codec.nim | 8 ++++-- waku/persistency/payload_codec.nim | 18 ++++++------ waku/persistency/sds_persistency.nim | 40 +++++++++------------------ 3 files changed, 28 insertions(+), 38 deletions(-) diff --git a/tests/persistency/test_blob_codec.nim b/tests/persistency/test_blob_codec.nim index 884e94d09..d1806a9ed 100644 --- a/tests/persistency/test_blob_codec.nim +++ b/tests/persistency/test_blob_codec.nim @@ -56,12 +56,14 @@ suite "Persistency blob codec": check fromBlob(toBlob(u), UnacknowledgedMessage) == u test "IncomingMessage round-trips (HashSet)": - let inc = IncomingMessage.init(sampleMessage(), toHashSet(["dep-a", "dep-b", "dep-c"])) + let inc = + IncomingMessage.init(sampleMessage(), toHashSet(["dep-a", "dep-b", "dep-c"])) check fromBlob(toBlob(inc), IncomingMessage) == inc test "repair tuples round-trip": - let outPair = - ("msg-42".SdsMessageID, OutgoingRepairEntry.init(sampleHistory(), initTime(10, 0))) + let outPair = ( + "msg-42".SdsMessageID, OutgoingRepairEntry.init(sampleHistory(), initTime(10, 0)) + ) check fromBlob(toBlob(outPair), (SdsMessageID, OutgoingRepairEntry)) == outPair let inPair = ( diff --git a/waku/persistency/payload_codec.nim b/waku/persistency/payload_codec.nim index 361582b02..03805c5d1 100644 --- a/waku/persistency/payload_codec.nim +++ b/waku/persistency/payload_codec.nim @@ -136,7 +136,9 @@ proc writePart*(buf: var seq[byte], b: seq[byte]) = for x in b: buf.add(x) -proc readPart*(r: var ReadCtx, _: typedesc[seq[byte]]): seq[byte] {.raises: [ValueError].} = +proc readPart*( + r: var ReadCtx, _: typedesc[seq[byte]] +): seq[byte] {.raises: [ValueError].} = let n = int(readPart(r, uint32)) r.need(n) result = newSeq[byte](n) @@ -206,9 +208,7 @@ proc writePart*[T: tuple](buf: var seq[byte], v: T) = for f in fields(v): writePart(buf, f) -proc readPart*[T: tuple]( - r: var ReadCtx, _: typedesc[T] -): T {.raises: [ValueError].} = +proc readPart*[T: tuple](r: var ReadCtx, _: typedesc[T]): T {.raises: [ValueError].} = mixin readPart for f in fields(result): f = readPart(r, typeof(f)) @@ -264,7 +264,9 @@ macro BlobCodec*(T: typedesc): untyped = name = ident "writePart", params = [ newEmptyNode(), - newIdentDefs(bufId, nnkVarTy.newTree(nnkBracketExpr.newTree(ident "seq", ident "byte"))), + newIdentDefs( + bufId, nnkVarTy.newTree(nnkBracketExpr.newTree(ident "seq", ident "byte")) + ), newIdentDefs(vId, tSym), ], body = writeBody, @@ -287,9 +289,9 @@ macro BlobCodec*(T: typedesc): untyped = ], body = readBody, ) - readProc.addPragma(nnkExprColonExpr.newTree( - ident "raises", nnkBracket.newTree(ident "ValueError") - )) + readProc.addPragma( + nnkExprColonExpr.newTree(ident "raises", nnkBracket.newTree(ident "ValueError")) + ) result = newStmtList(writeProc, readProc) diff --git a/waku/persistency/sds_persistency.nim b/waku/persistency/sds_persistency.nim index eef687ec0..2f0e5ad50 100644 --- a/waku/persistency/sds_persistency.nim +++ b/waku/persistency/sds_persistency.nim @@ -76,9 +76,7 @@ BlobCodec(IncomingRepairEntry) # ── Async backing procs ───────────────────────────────────────────────── -proc doLoadAll( - job: Job, channelId: SdsChannelID -): Future[ChannelSnapshot] {.async.} = +proc doLoadAll(job: Job, channelId: SdsChannelID): Future[ChannelSnapshot] {.async.} = var snap = ChannelSnapshot() let chanKey = toKey(channelId) @@ -90,11 +88,9 @@ proc doLoadAll( try: snap.lamportTimestamp = fromBlob(opt.get, int64) except ValueError as e: - warn "sds-persistency: invalid lamport bytes", - channelId, err = e.msg + warn "sds-persistency: invalid lamport bytes", channelId, err = e.msg else: - warn "sds-persistency: get lamport failed", - channelId, err = $r.error + warn "sds-persistency: get lamport failed", channelId, err = $r.error block log: let r = await job.scanPrefix(CatLog, chanKey) @@ -111,8 +107,7 @@ proc doLoadAll( result = cmp(a.messageId, b.messageId) snap.messageHistory = msgs else: - warn "sds-persistency: scan log failed", - channelId, err = $r.error + warn "sds-persistency: scan log failed", channelId, err = $r.error block outgoing: let r = await job.scanPrefix(CatOutgoing, chanKey) @@ -121,11 +116,9 @@ proc doLoadAll( try: snap.outgoingBuffer.add(fromBlob(row.payload, UnacknowledgedMessage)) except ValueError as e: - warn "sds-persistency: invalid outgoing row", - channelId, err = e.msg + warn "sds-persistency: invalid outgoing row", channelId, err = e.msg else: - warn "sds-persistency: scan outgoing failed", - channelId, err = $r.error + warn "sds-persistency: scan outgoing failed", channelId, err = $r.error block incoming: let r = await job.scanPrefix(CatIncoming, chanKey) @@ -134,11 +127,9 @@ proc doLoadAll( try: snap.incomingBuffer.add(fromBlob(row.payload, IncomingMessage)) except ValueError as e: - warn "sds-persistency: invalid incoming row", - channelId, err = e.msg + warn "sds-persistency: invalid incoming row", channelId, err = e.msg else: - warn "sds-persistency: scan incoming failed", - channelId, err = $r.error + warn "sds-persistency: scan incoming failed", channelId, err = $r.error block outRepair: let r = await job.scanPrefix(CatOutRepair, chanKey) @@ -149,11 +140,9 @@ proc doLoadAll( fromBlob(row.payload, (SdsMessageID, OutgoingRepairEntry)) ) except ValueError as e: - warn "sds-persistency: invalid out-repair row", - channelId, err = e.msg + warn "sds-persistency: invalid out-repair row", channelId, err = e.msg else: - warn "sds-persistency: scan out-repair failed", - channelId, err = $r.error + warn "sds-persistency: scan out-repair failed", channelId, err = $r.error block inRepair: let r = await job.scanPrefix(CatInRepair, chanKey) @@ -164,11 +153,9 @@ proc doLoadAll( fromBlob(row.payload, (SdsMessageID, IncomingRepairEntry)) ) except ValueError as e: - warn "sds-persistency: invalid in-repair row", - channelId, err = e.msg + warn "sds-persistency: invalid in-repair row", channelId, err = e.msg else: - warn "sds-persistency: scan in-repair failed", - channelId, err = $r.error + warn "sds-persistency: scan in-repair failed", channelId, err = $r.error return snap @@ -180,8 +167,7 @@ proc doDropChannel(job: Job, channelId: SdsChannelID): Future[void] {.async.} = var ops: seq[TxOp] = @[] var hintIds: seq[SdsMessageID] = @[] - let cats = - [CatLog, CatOutgoing, CatIncoming, CatOutRepair, CatInRepair] + let cats = [CatLog, CatOutgoing, CatIncoming, CatOutRepair, CatInRepair] for cat in cats: let r = await job.scanPrefix(cat, chanKey) if r.isOk: From ef36c2f5134a95689b091e02ea1a06b8e6edda36 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Wed, 27 May 2026 01:37:59 +0200 Subject: [PATCH 04/10] Apply suggestions from code review Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- tests/persistency/test_blob_codec.nim | 5 ++++ waku/persistency/payload_codec.nim | 39 ++++++++++++++++++++------- waku/persistency/sds_persistency.nim | 11 ++++---- 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/tests/persistency/test_blob_codec.nim b/tests/persistency/test_blob_codec.nim index d1806a9ed..f30cf1d4f 100644 --- a/tests/persistency/test_blob_codec.nim +++ b/tests/persistency/test_blob_codec.nim @@ -87,3 +87,8 @@ suite "Persistency blob codec": discard fromBlob(bytes[0 ..< bytes.len - 5], SdsMessage) expect ValueError: discard fromBlob(@[0xFF'u8, 0xFF, 0xFF, 0xFF], string) # claims 4 GiB + + test "trailing input raises ValueError": + let bytes = toBlob(sampleMessage()) + expect ValueError: + discard fromBlob(bytes & @[0x00'u8], SdsMessage) diff --git a/waku/persistency/payload_codec.nim b/waku/persistency/payload_codec.nim index 03805c5d1..04a92bc6e 100644 --- a/waku/persistency/payload_codec.nim +++ b/waku/persistency/payload_codec.nim @@ -82,7 +82,11 @@ proc writePart*(buf: var seq[byte], v: int) = writePart(buf, int64(v)) proc readPart*(r: var ReadCtx, _: typedesc[int]): int {.raises: [ValueError].} = - int(readPart(r, int64)) + let x = readPart(r, int64) + when sizeof(int) < sizeof(int64): + if x < int64(low(int)) or x > int64(high(int)): + raise newException(ValueError, "int out of range: " & $x) + result = int(x) # ── Small scalars ─────────────────────────────────────────────────────── @@ -114,7 +118,12 @@ proc writePart*[E: enum](buf: var seq[byte], v: E) = writePart(buf, int64(ord(v))) proc readPart*[E: enum](r: var ReadCtx, _: typedesc[E]): E {.raises: [ValueError].} = - E(readPart(r, int64)) + let x = readPart(r, int64) + let lo = int64(ord(low(E))) + let hi = int64(ord(high(E))) + if x < lo or x > hi: + raise newException(ValueError, "enum value out of range: " & $x) + result = E(x) # ── string / seq[byte] (4-byte length) ────────────────────────────────── @@ -124,7 +133,10 @@ proc writePart*(buf: var seq[byte], s: string) = buf.add(byte(c)) proc readPart*(r: var ReadCtx, _: typedesc[string]): string {.raises: [ValueError].} = - let n = int(readPart(r, uint32)) + let nU = readPart(r, uint32) + if nU > uint32(high(int)): + raise newException(ValueError, "string length out of range: " & $nU) + let n = int(nU) r.need(n) result = newString(n) for i in 0 ..< n: @@ -139,7 +151,10 @@ proc writePart*(buf: var seq[byte], b: seq[byte]) = proc readPart*( r: var ReadCtx, _: typedesc[seq[byte]] ): seq[byte] {.raises: [ValueError].} = - let n = int(readPart(r, uint32)) + let nU = readPart(r, uint32) + if nU > uint32(high(int)): + raise newException(ValueError, "blob length out of range: " & $nU) + let n = int(nU) r.need(n) result = newSeq[byte](n) for i in 0 ..< n: @@ -183,11 +198,13 @@ proc readPart*[T]( r: var ReadCtx, _: typedesc[seq[T]] ): seq[T] {.raises: [ValueError].} = mixin readPart - let n = int(readPart(r, uint32)) + let nU = readPart(r, uint32) + if nU > uint32(high(int)): + raise newException(ValueError, "sequence length out of range: " & $nU) + let n = int(nU) result = newSeqOfCap[T](n) for _ in 0 ..< n: result.add(readPart(r, T)) - proc writePart*[T](buf: var seq[byte], s: HashSet[T]) = mixin writePart writePart(buf, uint32(s.len)) @@ -198,11 +215,13 @@ proc readPart*[T]( r: var ReadCtx, _: typedesc[HashSet[T]] ): HashSet[T] {.raises: [ValueError].} = mixin readPart - let n = int(readPart(r, uint32)) + let nU = readPart(r, uint32) + if nU > uint32(high(int)): + raise newException(ValueError, "set length out of range: " & $nU) + let n = int(nU) result = initHashSet[T](max(n, 2)) for _ in 0 ..< n: result.incl(readPart(r, T)) - proc writePart*[T: tuple](buf: var seq[byte], v: T) = mixin writePart for f in fields(v): @@ -305,6 +324,8 @@ proc toBlob*[T](v: T): seq[byte] = proc fromBlob*[T](bytes: openArray[byte], _: typedesc[T]): T {.raises: [ValueError].} = mixin readPart var r = initReadCtx(bytes) - readPart(r, T) + result = readPart(r, T) + if r.pos != r.buf.len: + raise newException(ValueError, "trailing payload bytes: " & $(r.buf.len - r.pos)) {.pop.} diff --git a/waku/persistency/sds_persistency.nim b/waku/persistency/sds_persistency.nim index 2f0e5ad50..41fbf1cf0 100644 --- a/waku/persistency/sds_persistency.nim +++ b/waku/persistency/sds_persistency.nim @@ -10,12 +10,13 @@ ## {.async: (raises: []), gcsafe.}`) — SDS awaits them on its own chronos ## loop. We map each onto the matching async `Job` op: ## -## * **Writes (save*/remove*)** — `await` the Job op through the `safePut`/ -## `safeDelete` helpers, which trap any backend error and log it rather -## than raising (the contract forbids raising). +## * **Writes (save*/remove*)** — call the fire-and-forget `Job.persist*` ops +## through the `safePut`/`safeDelete` helpers, which trap any backend error +## and log it rather than raising (the contract forbids raising). Note that +## Persistency v1 only guarantees the event has been queued when the Future +## resolves — reads immediately after an awaited write can still be racy. ## * **`dropChannel`** — awaits `doDropChannel`, which batches every row of -## the channel into one transactional `persist` so the wipe is atomic -## (called from removeChannel / resetReliabilityManager). +## the channel into one transactional `persist` (atomic when applied). ## * **`loadAllForChannel`** — awaits `doLoadAll` and returns the snapshot ## the SDS bootstrap path needs. ## From 5b80f459223d901a122c64152d5d2efff3244463 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Wed, 27 May 2026 02:49:25 +0200 Subject: [PATCH 05/10] use valueOr --- waku/persistency/sds_persistency.nim | 144 +++++++++++++-------------- 1 file changed, 68 insertions(+), 76 deletions(-) diff --git a/waku/persistency/sds_persistency.nim b/waku/persistency/sds_persistency.nim index 41fbf1cf0..a52b7b4e9 100644 --- a/waku/persistency/sds_persistency.nim +++ b/waku/persistency/sds_persistency.nim @@ -82,81 +82,74 @@ proc doLoadAll(job: Job, channelId: SdsChannelID): Future[ChannelSnapshot] {.asy let chanKey = toKey(channelId) block lamport: - let r = await job.get(CatLamport, chanKey) - if r.isOk: - let opt = r.get - if opt.isSome: - try: - snap.lamportTimestamp = fromBlob(opt.get, int64) - except ValueError as e: - warn "sds-persistency: invalid lamport bytes", channelId, err = e.msg - else: - warn "sds-persistency: get lamport failed", channelId, err = $r.error + let opt = (await job.get(CatLamport, chanKey)).valueOr: + warn "sds-persistency: get lamport failed", channelId, err = $error + break lamport + if opt.isSome: + try: + snap.lamportTimestamp = fromBlob(opt.get, int64) + except ValueError as e: + warn "sds-persistency: invalid lamport bytes", channelId, err = e.msg block log: - let r = await job.scanPrefix(CatLog, chanKey) - if r.isOk: - var msgs = newSeq[SdsMessage]() - for row in r.get: - try: - msgs.add(fromBlob(row.payload, SdsMessage)) - except ValueError as e: - warn "sds-persistency: invalid log row", channelId, err = e.msg - msgs.sort do(a, b: SdsMessage) -> int: - result = cmp(a.lamportTimestamp, b.lamportTimestamp) - if result == 0: - result = cmp(a.messageId, b.messageId) - snap.messageHistory = msgs - else: - warn "sds-persistency: scan log failed", channelId, err = $r.error + let rows = (await job.scanPrefix(CatLog, chanKey)).valueOr: + warn "sds-persistency: scan log failed", channelId, err = $error + break log + var msgs = newSeq[SdsMessage]() + for row in rows: + try: + msgs.add(fromBlob(row.payload, SdsMessage)) + except ValueError as e: + warn "sds-persistency: invalid log row", channelId, err = e.msg + msgs.sort do(a, b: SdsMessage) -> int: + result = cmp(a.lamportTimestamp, b.lamportTimestamp) + if result == 0: + result = cmp(a.messageId, b.messageId) + snap.messageHistory = msgs block outgoing: - let r = await job.scanPrefix(CatOutgoing, chanKey) - if r.isOk: - for row in r.get: - try: - snap.outgoingBuffer.add(fromBlob(row.payload, UnacknowledgedMessage)) - except ValueError as e: - warn "sds-persistency: invalid outgoing row", channelId, err = e.msg - else: - warn "sds-persistency: scan outgoing failed", channelId, err = $r.error + let rows = (await job.scanPrefix(CatOutgoing, chanKey)).valueOr: + warn "sds-persistency: scan outgoing failed", channelId, err = $error + break outgoing + for row in rows: + try: + snap.outgoingBuffer.add(fromBlob(row.payload, UnacknowledgedMessage)) + except ValueError as e: + warn "sds-persistency: invalid outgoing row", channelId, err = e.msg block incoming: - let r = await job.scanPrefix(CatIncoming, chanKey) - if r.isOk: - for row in r.get: - try: - snap.incomingBuffer.add(fromBlob(row.payload, IncomingMessage)) - except ValueError as e: - warn "sds-persistency: invalid incoming row", channelId, err = e.msg - else: - warn "sds-persistency: scan incoming failed", channelId, err = $r.error + let rows = (await job.scanPrefix(CatIncoming, chanKey)).valueOr: + warn "sds-persistency: scan incoming failed", channelId, err = $error + break incoming + for row in rows: + try: + snap.incomingBuffer.add(fromBlob(row.payload, IncomingMessage)) + except ValueError as e: + warn "sds-persistency: invalid incoming row", channelId, err = e.msg block outRepair: - let r = await job.scanPrefix(CatOutRepair, chanKey) - if r.isOk: - for row in r.get: - try: - snap.outgoingRepairBuffer.add( - fromBlob(row.payload, (SdsMessageID, OutgoingRepairEntry)) - ) - except ValueError as e: - warn "sds-persistency: invalid out-repair row", channelId, err = e.msg - else: - warn "sds-persistency: scan out-repair failed", channelId, err = $r.error + let rows = (await job.scanPrefix(CatOutRepair, chanKey)).valueOr: + warn "sds-persistency: scan out-repair failed", channelId, err = $error + break outRepair + for row in rows: + try: + snap.outgoingRepairBuffer.add( + fromBlob(row.payload, (SdsMessageID, OutgoingRepairEntry)) + ) + except ValueError as e: + warn "sds-persistency: invalid out-repair row", channelId, err = e.msg block inRepair: - let r = await job.scanPrefix(CatInRepair, chanKey) - if r.isOk: - for row in r.get: - try: - snap.incomingRepairBuffer.add( - fromBlob(row.payload, (SdsMessageID, IncomingRepairEntry)) - ) - except ValueError as e: - warn "sds-persistency: invalid in-repair row", channelId, err = e.msg - else: - warn "sds-persistency: scan in-repair failed", channelId, err = $r.error + let rows = (await job.scanPrefix(CatInRepair, chanKey)).valueOr: + warn "sds-persistency: scan in-repair failed", channelId, err = $error + break inRepair + for row in rows: + try: + snap.incomingRepairBuffer.add( + fromBlob(row.payload, (SdsMessageID, IncomingRepairEntry)) + ) + except ValueError as e: + warn "sds-persistency: invalid in-repair row", channelId, err = e.msg return snap @@ -170,18 +163,17 @@ proc doDropChannel(job: Job, channelId: SdsChannelID): Future[void] {.async.} = let cats = [CatLog, CatOutgoing, CatIncoming, CatOutRepair, CatInRepair] for cat in cats: - let r = await job.scanPrefix(cat, chanKey) - if r.isOk: - for row in r.get: - ops.add(TxOp(category: cat, key: row.key, kind: txDelete)) - if cat == CatLog: - try: - hintIds.add(fromBlob(row.payload, SdsMessage).messageId) - except ValueError: - discard - else: + let rows = (await job.scanPrefix(cat, chanKey)).valueOr: warn "sds-persistency: scan during drop failed", - channelId, category = cat, err = $r.error + channelId, category = cat, err = $error + continue + for row in rows: + ops.add(TxOp(category: cat, key: row.key, kind: txDelete)) + if cat == CatLog: + try: + hintIds.add(fromBlob(row.payload, SdsMessage).messageId) + except ValueError: + discard ops.add(TxOp(category: CatLamport, key: chanKey, kind: txDelete)) for id in hintIds: From 83179f4ba559efc9c6aa56cbbe34c1499bf36682 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Wed, 27 May 2026 10:03:23 +0200 Subject: [PATCH 06/10] Allow sub key delete (optimize by no need to pre-polpulate full key ahead of deletes), refactored channel-drop and hint maintenance now tied to log entry removal --- waku/persistency/backend_comm.nim | 4 +- waku/persistency/backend_sqlite.nim | 35 +++++++++- waku/persistency/persistency.nim | 12 ++++ waku/persistency/sds_persistency.nim | 101 ++++++++++++++------------- waku/persistency/types.nim | 3 + 5 files changed, 106 insertions(+), 49 deletions(-) diff --git a/waku/persistency/backend_comm.nim b/waku/persistency/backend_comm.nim index dd7e71297..193e52825 100644 --- a/waku/persistency/backend_comm.nim +++ b/waku/persistency/backend_comm.nim @@ -68,7 +68,7 @@ proc mtMarshalValue*( of txPut: if not mtMarshalValue(buf, cap, value.payload, pos): return false - of txDelete: + of txDelete, txDeletePrefix: discard return true @@ -93,6 +93,8 @@ proc mtUnmarshalValue*( value = TxOp(category: category, key: key, kind: txPut, payload: payload) of txDelete: value = TxOp(category: category, key: key, kind: txDelete) + of txDeletePrefix: + value = TxOp(category: category, key: key, kind: txDeletePrefix) return true EventBroker(mt): diff --git a/waku/persistency/backend_sqlite.nim b/waku/persistency/backend_sqlite.nim index 6851febc1..95757bc2c 100644 --- a/waku/persistency/backend_sqlite.nim +++ b/waku/persistency/backend_sqlite.nim @@ -7,7 +7,7 @@ import std/options import results, sqlite3_abi import ../common/databases/[common, db_sqlite] -import ./[types, schema] +import ./[types, keys, schema] type KvBackend* = ref object @@ -121,6 +121,37 @@ proc close*(b: KvBackend) = b.db.close() b.db = nil +proc deletePrefix( + b: KvBackend, category: string, prefix: Key +): Result[void, PersistencyError] = + let rng = prefixRange(prefix) + let openEnded = bytes(rng.stop).len == 0 + let sql = + if openEnded: + "DELETE FROM kv WHERE category = ? AND key >= ?;" + else: + "DELETE FROM kv WHERE category = ? AND key >= ? AND key < ?;" + var s: ptr sqlite3_stmt + let rc = sqlite3_prepare_v2(b.db.env, sql.cstring, sql.len.cint, addr s, nil) + if rc != SQLITE_OK: + return err(toErr("deletePrefix prepare: " & $sqlite3_errstr(rc))) + defer: + discard sqlite3_finalize(s) + var bc = bindBlob(s, 1.cint, catBytes(category)) + if bc != SQLITE_OK: + return err(toErr("deletePrefix bind cat: " & $sqlite3_errstr(bc))) + bc = bindBlob(s, 2.cint, keyBytes(rng.start)) + if bc != SQLITE_OK: + return err(toErr("deletePrefix bind start: " & $sqlite3_errstr(bc))) + if not openEnded: + bc = bindBlob(s, 3.cint, keyBytes(rng.stop)) + if bc != SQLITE_OK: + return err(toErr("deletePrefix bind stop: " & $sqlite3_errstr(bc))) + let v = sqlite3_step(s) + if v != SQLITE_DONE: + return err(toErr("deletePrefix step: " & $sqlite3_errstr(v))) + return ok() + proc applyOne(b: KvBackend, op: TxOp): Result[void, PersistencyError] = case op.kind of txPut: @@ -131,6 +162,8 @@ proc applyOne(b: KvBackend, op: TxOp): Result[void, PersistencyError] = let r = b.deleteStmt.exec((catBytes(op.category), keyBytes(op.key))) if r.isErr: return err(toErr("delete failed: " & r.error)) + of txDeletePrefix: + ?b.deletePrefix(op.category, op.key) return ok() proc execSql(b: KvBackend, sql: string): Result[void, PersistencyError] = diff --git a/waku/persistency/persistency.nim b/waku/persistency/persistency.nim index 916f3ac8b..1e070dbb5 100644 --- a/waku/persistency/persistency.nim +++ b/waku/persistency/persistency.nim @@ -284,6 +284,11 @@ proc persistPut*( proc persistDelete*(t: Job, category: string, key: Key): Future[void] {.async.} = await persist(t, TxOp(category: category, key: key, kind: txDelete)) +proc persistDeletePrefix*( + t: Job, category: string, prefix: Key +): Future[void] {.async.} = + await persist(t, TxOp(category: category, key: prefix, kind: txDeletePrefix)) + proc persistEncoded*[T]( t: Job, category: string, key: Key, value: T ): Future[void] {.async.} = @@ -335,6 +340,13 @@ proc persistDelete*( if not j.isNil(): await j.persistDelete(category, key) +proc persistDeletePrefix*( + p: Persistency, jobId: string, category: string, prefix: Key +): Future[void] {.async.} = + let j = p.jobOrWarn(jobId) + if not j.isNil(): + await j.persistDeletePrefix(category, prefix) + proc persistEncoded*[T]( p: Persistency, jobId: string, category: string, key: Key, value: T ): Future[void] {.async.} = diff --git a/waku/persistency/sds_persistency.nim b/waku/persistency/sds_persistency.nim index a52b7b4e9..44d551814 100644 --- a/waku/persistency/sds_persistency.nim +++ b/waku/persistency/sds_persistency.nim @@ -75,6 +75,32 @@ BlobCodec(IncomingMessage) BlobCodec(OutgoingRepairEntry) BlobCodec(IncomingRepairEntry) +# ── Write helpers ─────────────────────────────────────────────────────── +# +# The Persistence write fields are async with `raises: []`, but the Job ops +# raise `CatchableError`. These wrappers trap and log so the closures stay +# raise-free, preserving the "errors are logged, never raised" contract. + +proc safePut( + job: Job, category: string, k: Key, payload: seq[byte] +) {.async: (raises: []).} = + try: + await job.persistPut(category, k, payload) + except CatchableError as e: + warn "sds-persistency: put failed", category, err = e.msg + +proc safeDelete(job: Job, category: string, k: Key) {.async: (raises: []).} = + try: + await job.persistDelete(category, k) + except CatchableError as e: + warn "sds-persistency: delete failed", category, err = e.msg + +proc safePersist(job: Job, ops: seq[TxOp]) {.async: (raises: []).} = + try: + await job.persist(ops) + except CatchableError as e: + warn "sds-persistency: persist batch failed", opCount = ops.len, err = e.msg + # ── Async backing procs ───────────────────────────────────────────────── proc doLoadAll(job: Job, channelId: SdsChannelID): Future[ChannelSnapshot] {.async.} = @@ -154,53 +180,26 @@ proc doLoadAll(job: Job, channelId: SdsChannelID): Future[ChannelSnapshot] {.asy return snap proc doDropChannel(job: Job, channelId: SdsChannelID): Future[void] {.async.} = - ## Collect every row belonging to the channel and submit them as a single - ## TxOp batch — the backend applies that as one BEGIN IMMEDIATE/COMMIT, - ## which is the atomicity the SDS contract asks for. + ## Delete every row belonging to the channel in one transactional batch. + ## Uses txDeletePrefix to push bulk deletes to the worker thread — no + ## caller-side scans needed. Hint rows (keyed by msgId, not channelId) + ## are not cleaned here; they are cascade-deleted by removeLogEntry during + ## normal rolling-history eviction, so by the time a channel is dropped + ## the only remaining hints belong to the still-live log tail. Those + ## become harmless orphans (never reloaded — hints are re-derived on + ## demand from the onRetrievalHint callback). let chanKey = toKey(channelId) - var ops: seq[TxOp] = @[] - var hintIds: seq[SdsMessageID] = @[] - - let cats = [CatLog, CatOutgoing, CatIncoming, CatOutRepair, CatInRepair] - for cat in cats: - let rows = (await job.scanPrefix(cat, chanKey)).valueOr: - warn "sds-persistency: scan during drop failed", - channelId, category = cat, err = $error - continue - for row in rows: - ops.add(TxOp(category: cat, key: row.key, kind: txDelete)) - if cat == CatLog: - try: - hintIds.add(fromBlob(row.payload, SdsMessage).messageId) - except ValueError: - discard - - ops.add(TxOp(category: CatLamport, key: chanKey, kind: txDelete)) - for id in hintIds: - ops.add(TxOp(category: CatHint, key: toKey(id), kind: txDelete)) - - if ops.len > 0: - await job.persist(ops) - -# ── Write helpers ─────────────────────────────────────────────────────── -# -# The Persistence write fields are async with `raises: []`, but the Job ops -# raise `CatchableError`. These wrappers trap and log so the closures stay -# raise-free, preserving the "errors are logged, never raised" contract. - -proc safePut( - job: Job, category: string, k: Key, payload: seq[byte] -) {.async: (raises: []).} = - try: - await job.persistPut(category, k, payload) - except CatchableError as e: - warn "sds-persistency: put failed", category, err = e.msg - -proc safeDelete(job: Job, category: string, k: Key) {.async: (raises: []).} = - try: - await job.persistDelete(category, k) - except CatchableError as e: - warn "sds-persistency: delete failed", category, err = e.msg + await safePersist( + job, + @[ + TxOp(category: CatLog, key: chanKey, kind: txDeletePrefix), + TxOp(category: CatOutgoing, key: chanKey, kind: txDeletePrefix), + TxOp(category: CatIncoming, key: chanKey, kind: txDeletePrefix), + TxOp(category: CatOutRepair, key: chanKey, kind: txDeletePrefix), + TxOp(category: CatInRepair, key: chanKey, kind: txDeletePrefix), + TxOp(category: CatLamport, key: chanKey, kind: txDelete), + ], + ) # ── Public factory ────────────────────────────────────────────────────── @@ -233,7 +232,15 @@ proc newSdsPersistence*(job: Job): Persistence {.gcsafe, raises: [].} = persistence.removeLogEntry = proc( channelId: SdsChannelID, msgId: SdsMessageID ): Future[void] {.async: (raises: []), gcsafe.} = - await safeDelete(job, CatLog, key(channelId, msgId)) + # Atomic batch: delete the log row and its associated retrieval hint in + # one transaction so they can't diverge. + await safePersist( + job, + @[ + TxOp(category: CatLog, key: key(channelId, msgId), kind: txDelete), + TxOp(category: CatHint, key: toKey(msgId), kind: txDelete), + ], + ) persistence.setRetrievalHint = proc( msgId: SdsMessageID, hint: seq[byte] diff --git a/waku/persistency/types.nim b/waku/persistency/types.nim index 4c4c2de3f..0fdf12af0 100644 --- a/waku/persistency/types.nim +++ b/waku/persistency/types.nim @@ -19,6 +19,7 @@ type TxOpKind* = enum txPut txDelete + txDeletePrefix TxOp* = object category*: string @@ -28,6 +29,8 @@ type payload*: seq[byte] of txDelete: discard + of txDeletePrefix: + discard PersistencyErrorKind* = enum peBackend From 5b3bce6334e78d68515db410d6d11bfc80c26272 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Tue, 2 Jun 2026 01:12:35 +0200 Subject: [PATCH 07/10] deps: bump nim-sds to 4ccdd122 (0.3.0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Update the nim-sds pin in waku.nimble and follow it through in nimble.lock: version, vcsRevision and the sha1 checksum (cee8c7e2…). Co-Authored-By: Claude Opus 4.8 --- nimble.lock | 6 +++--- waku.nimble | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/nimble.lock b/nimble.lock index e03a3db29..92651e58c 100644 --- a/nimble.lock +++ b/nimble.lock @@ -620,8 +620,8 @@ } }, "sds": { - "version": "#35a33adc9808a053f4ad7af8d07ff92075ba3462", - "vcsRevision": "35a33adc9808a053f4ad7af8d07ff92075ba3462", + "version": "#4ccdd122fc4fa82f9ef69eef5dedd24ca2d9f420", + "vcsRevision": "4ccdd122fc4fa82f9ef69eef5dedd24ca2d9f420", "url": "https://github.com/logos-messaging/nim-sds.git", "downloadMethod": "git", "dependencies": [ @@ -636,7 +636,7 @@ "taskpools" ], "checksums": { - "sha1": "136296859b324403486b93d62ae9154e53fb15d4" + "sha1": "cee8c7e2e7b869da0be23d383f11ddda5b3524d2" } }, "ffi": { diff --git a/waku.nimble b/waku.nimble index 066a33018..fcbeaa18d 100644 --- a/waku.nimble +++ b/waku.nimble @@ -61,7 +61,7 @@ requires "nim >= 2.2.4", # Packages not on nimble (use git URLs) requires "https://github.com/logos-messaging/nim-ffi" -requires "https://github.com/logos-messaging/nim-sds.git#35a33adc9808a053f4ad7af8d07ff92075ba3462" +requires "https://github.com/logos-messaging/nim-sds.git#4ccdd122fc4fa82f9ef69eef5dedd24ca2d9f420" # brokers: pinned by URL+commit rather than the bare `brokers >= 2.0.1` # form because the nim-lang/packages registry entry for `brokers` only From fbdff090b1581088b9c4c75df7709fc7da20524e Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Tue, 2 Jun 2026 02:59:55 +0200 Subject: [PATCH 08/10] persistency: follow nim-sds 0.3.0 snapshot persistence contract nim-sds 0.3.0 replaced the ~14 fine-grained per-row Persistence callbacks with a 5-proc snapshot model (saveChannelMeta / updateHistory / loadChannel / dropChannel / setRetrievalHint), all returning Future[Result[...]]. Rewrite waku/persistency/sds_persistency.nim accordingly: - ChannelMeta is stored as one blob per channel; the message log as append/evict rows. Categories collapse from 7 to 2 (sds.meta, sds.log). - Blob transform uses nim-sds' own codecs: snapshot_codec (schema-versioned protobuf) for ChannelMeta, the SDS wire codec for SdsMessage log rows. The generic payload_codec/BlobCodec path is retired (removed payload_codec.nim and test_blob_codec.nim). - setRetrievalHint is a deliberate no-op: persisted hints are never read back (loadChannel/ChannelMeta carry none; hints are supplied live via the onRetrievalHint provider). The closure stays because the field is required. - Fix the module import spelling (srcDir="sds" => bare module paths), which the previous adapter got wrong and never compiled against the locked deps. Add tests/persistency/test_sds_persistency.nim (round-trip, empty-load, evict, drop) replacing test_blob_codec in test_all. Full persistency suite passes 74/74 under both refc and ORC. Co-Authored-By: Claude Opus 4.8 --- tests/persistency/test_all.nim | 2 +- tests/persistency/test_blob_codec.nim | 94 ----- tests/persistency/test_sds_persistency.nim | 155 ++++++++ waku/persistency/payload_codec.nim | 331 ----------------- waku/persistency/sds_persistency.nim | 395 +++++++-------------- 5 files changed, 291 insertions(+), 686 deletions(-) delete mode 100644 tests/persistency/test_blob_codec.nim create mode 100644 tests/persistency/test_sds_persistency.nim delete mode 100644 waku/persistency/payload_codec.nim diff --git a/tests/persistency/test_all.nim b/tests/persistency/test_all.nim index 0909850a4..5b0cfdbb5 100644 --- a/tests/persistency/test_all.nim +++ b/tests/persistency/test_all.nim @@ -5,6 +5,6 @@ import ./test_backend import ./test_lifecycle import ./test_facade import ./test_encoding -import ./test_blob_codec +import ./test_sds_persistency import ./test_string_lookup import ./test_singleton diff --git a/tests/persistency/test_blob_codec.nim b/tests/persistency/test_blob_codec.nim deleted file mode 100644 index f30cf1d4f..000000000 --- a/tests/persistency/test_blob_codec.nim +++ /dev/null @@ -1,94 +0,0 @@ -{.used.} - -## Round-trip tests for the generic payload blob codec (payload_codec.nim) -## and its `BlobCodec` application to the SDS persistence types. -## -## Importing `sds_persistency` forces the real adapter (and its six -## `BlobCodec` calls + call sites) to compile. The local derivations -## below live in this module's own scope and drive the round-trip checks. - -import std/[sets, times] -import testutils/unittests -import waku/persistency/payload_codec -import waku/persistency/sds_persistency # compile-checks the real adapter -import sds/types/persistence - -# Same order constraint as the adapter: a field's type before its container. -BlobCodec(HistoryEntry) -BlobCodec(SdsMessage) -BlobCodec(UnacknowledgedMessage) -BlobCodec(IncomingMessage) -BlobCodec(OutgoingRepairEntry) -BlobCodec(IncomingRepairEntry) - -proc sampleHistory(): HistoryEntry = - HistoryEntry.init("mid-1", @[0xAA'u8, 0xBB, 0xCC], "sender-1".SdsParticipantID) - -proc sampleMessage(content = @[1'u8, 2, 3]): SdsMessage = - SdsMessage.init( - messageId = "msg-42", - lamportTimestamp = 7'i64, - causalHistory = @[sampleHistory(), HistoryEntry.init("mid-2")], - channelId = "channel-9", - content = content, - bloomFilter = @[0xDE'u8, 0xAD, 0xBE, 0xEF], - senderId = "alice".SdsParticipantID, - repairRequest = @[sampleHistory()], - ) - -suite "Persistency blob codec": - test "primitive round-trips": - check fromBlob(toBlob(7'i64), int64) == 7'i64 - check fromBlob(toBlob(int64.low), int64) == int64.low - check fromBlob(toBlob("héllo"), string) == "héllo" - check fromBlob(toBlob(@[0'u8, 255, 7]), seq[byte]) == @[0'u8, 255, 7] - - test "HistoryEntry round-trips": - let h = sampleHistory() - check fromBlob(toBlob(h), HistoryEntry) == h - - test "SdsMessage round-trips (seqs, distinct, nested)": - let m = sampleMessage() - check fromBlob(toBlob(m), SdsMessage) == m - - test "UnacknowledgedMessage round-trips (Time, int)": - let u = UnacknowledgedMessage.init(sampleMessage(), initTime(1_700_000_000, 123), 4) - check fromBlob(toBlob(u), UnacknowledgedMessage) == u - - test "IncomingMessage round-trips (HashSet)": - let inc = - IncomingMessage.init(sampleMessage(), toHashSet(["dep-a", "dep-b", "dep-c"])) - check fromBlob(toBlob(inc), IncomingMessage) == inc - - test "repair tuples round-trip": - let outPair = ( - "msg-42".SdsMessageID, OutgoingRepairEntry.init(sampleHistory(), initTime(10, 0)) - ) - check fromBlob(toBlob(outPair), (SdsMessageID, OutgoingRepairEntry)) == outPair - - let inPair = ( - "msg-42".SdsMessageID, - IncomingRepairEntry.init(sampleHistory(), @[9'u8, 8, 7], initTime(20, 500)), - ) - check fromBlob(toBlob(inPair), (SdsMessageID, IncomingRepairEntry)) == inPair - - test "payload exceeds the old 64 KiB key cap (4-byte length)": - var big = newSeq[byte](70_000) - for i in 0 ..< big.len: - big[i] = byte(i and 0xFF) - let m = sampleMessage(content = big) - let decoded = fromBlob(toBlob(m), SdsMessage) - check decoded.content.len == 70_000 - check decoded == m - - test "truncated input raises ValueError": - let bytes = toBlob(sampleMessage()) - expect ValueError: - discard fromBlob(bytes[0 ..< bytes.len - 5], SdsMessage) - expect ValueError: - discard fromBlob(@[0xFF'u8, 0xFF, 0xFF, 0xFF], string) # claims 4 GiB - - test "trailing input raises ValueError": - let bytes = toBlob(sampleMessage()) - expect ValueError: - discard fromBlob(bytes & @[0x00'u8], SdsMessage) diff --git a/tests/persistency/test_sds_persistency.nim b/tests/persistency/test_sds_persistency.nim new file mode 100644 index 000000000..ed14f904b --- /dev/null +++ b/tests/persistency/test_sds_persistency.nim @@ -0,0 +1,155 @@ +{.used.} + +## Behavioural tests for the SDS Persistence adapter (nim-sds 0.3.0 snapshot +## model). Importing `sds_persistency` also compile-checks the real adapter. +## +## Writes go through the fire-and-forget Job path (the Future resolves when +## the op is queued, not applied — Persistency v1), so every read-back polls +## until the row appears/disappears. + +import std/[options, os, times] +import chronos, results +import testutils/unittests +import waku/persistency/persistency +import waku/persistency/keys +import waku/persistency/sds_persistency + +proc tmpRoot(label: string): string = + let p = getTempDir() / ("sds_persistency_test_" & label & "_" & $epochTime().int) + removeDir(p) + p + +proc pollExists( + t: Job, category: string, k: Key, timeoutMs = 1000 +): Future[bool] {.async.} = + let deadline = epochTime() + (timeoutMs.float / 1000.0) + while epochTime() < deadline: + let r = await t.exists(category, k) + if r.isOk and r.get(): + return true + await sleepAsync(chronos.milliseconds(2)) + return false + +proc pollGone( + t: Job, category: string, k: Key, timeoutMs = 1000 +): Future[bool] {.async.} = + let deadline = epochTime() + (timeoutMs.float / 1000.0) + while epochTime() < deadline: + let r = await t.exists(category, k) + if r.isOk and not r.get(): + return true + await sleepAsync(chronos.milliseconds(2)) + return false + +proc mkMsg(channelId: SdsChannelID, msgId: SdsMessageID, lamport: int64): SdsMessage = + SdsMessage.init( + messageId = msgId, + lamportTimestamp = lamport, + causalHistory = @[], + channelId = channelId, + content = @[byte(1), byte(2)], + bloomFilter = @[], + ) + +suite "SDS persistency adapter (0.3.0 snapshot model)": + asyncTest "saveChannelMeta + updateHistory round-trip via loadChannel": + let root = tmpRoot("roundtrip") + defer: + removeDir(root) + let p = Persistency.instance(root).get() + defer: + Persistency.reset() + let job = p.openJob("sds").get() + let persistence = newSdsPersistence(job) + let channelId = "chan-1".SdsChannelID + + var meta = ChannelMeta.init() + meta.lamportTimestamp = 42 + check (await persistence.saveChannelMeta(channelId, meta)).isOk + check (await job.pollExists(CatMeta, toKey(channelId))) + + # append out of (lamport) order on purpose; loadChannel must sort. + var upd = HistoryUpdate.init() + upd.append = @[mkMsg(channelId, "m2", 2), mkMsg(channelId, "m1", 1)] + check (await persistence.updateHistory(channelId, upd)).isOk + check (await job.pollExists(CatLog, key(channelId, "m2"))) + + let data = (await persistence.loadChannel(channelId)).valueOr: + check false + return + check data.meta.lamportTimestamp == 42 + check data.messageHistory.len == 2 + check data.messageHistory[0].messageId == "m1" + check data.messageHistory[1].messageId == "m2" + + asyncTest "loadChannel on a fresh channel returns empty ChannelData": + let root = tmpRoot("empty") + defer: + removeDir(root) + let p = Persistency.instance(root).get() + defer: + Persistency.reset() + let job = p.openJob("sds").get() + let persistence = newSdsPersistence(job) + + let data = (await persistence.loadChannel("nope".SdsChannelID)).valueOr: + check false + return + check data.meta.lamportTimestamp == 0 + check data.messageHistory.len == 0 + + asyncTest "updateHistory evict removes a log row": + let root = tmpRoot("evict") + defer: + removeDir(root) + let p = Persistency.instance(root).get() + defer: + Persistency.reset() + let job = p.openJob("sds").get() + let persistence = newSdsPersistence(job) + let channelId = "c".SdsChannelID + + var upd = HistoryUpdate.init() + upd.append = @[mkMsg(channelId, "a", 1), mkMsg(channelId, "b", 2)] + check (await persistence.updateHistory(channelId, upd)).isOk + check (await job.pollExists(CatLog, key(channelId, "b"))) + + var ev = HistoryUpdate.init() + ev.evict = @["a".SdsMessageID] + check (await persistence.updateHistory(channelId, ev)).isOk + check (await job.pollGone(CatLog, key(channelId, "a"))) + + let data = (await persistence.loadChannel(channelId)).valueOr: + check false + return + check data.messageHistory.len == 1 + check data.messageHistory[0].messageId == "b" + + asyncTest "dropChannel wipes meta and log": + let root = tmpRoot("drop") + defer: + removeDir(root) + let p = Persistency.instance(root).get() + defer: + Persistency.reset() + let job = p.openJob("sds").get() + let persistence = newSdsPersistence(job) + let channelId = "d".SdsChannelID + + var meta = ChannelMeta.init() + meta.lamportTimestamp = 7 + check (await persistence.saveChannelMeta(channelId, meta)).isOk + var upd = HistoryUpdate.init() + upd.append = @[mkMsg(channelId, "x", 1)] + check (await persistence.updateHistory(channelId, upd)).isOk + check (await job.pollExists(CatMeta, toKey(channelId))) + check (await job.pollExists(CatLog, key(channelId, "x"))) + + check (await persistence.dropChannel(channelId)).isOk + check (await job.pollGone(CatMeta, toKey(channelId))) + + let data = (await persistence.loadChannel(channelId)).valueOr: + check false + return + check data.meta.lamportTimestamp == 0 + check data.messageHistory.len == 0 diff --git a/waku/persistency/payload_codec.nim b/waku/persistency/payload_codec.nim deleted file mode 100644 index 04a92bc6e..000000000 --- a/waku/persistency/payload_codec.nim +++ /dev/null @@ -1,331 +0,0 @@ -## Generic length-prefixed blob codec for persistency payloads. -## -## Symmetric counterpart to `keys.nim`'s `encodePart`: every persisted value -## round-trips through `writePart`/`readPart` over a `ReadCtx` cursor. Unlike -## keys, payloads are not byte-wise sort-stable, so strings and byte blobs use -## a **4-byte BE length prefix** (4 GiB ceiling) instead of keys.nim's 2-byte -## (64 KiB) prefix — the cap that originally forced SDS to hand-roll its codec. -## -## ## How a type opts in -## -## Primitives, `string`, `seq[byte]`, `enum`, `distinct`, `Time`, `seq[T]`, -## `HashSet[T]` and tuples already have codecs here. A **named struct** opts in -## with a single line: -## -## ```nim -## BlobCodec(MyType) -## ``` -## -## which emits both `writePart`/`readPart` for `MyType` from its fields, in -## declaration order, and reconstructs the value via `MyType.init(...)` -## (positional). This is the *only* mechanism for structs — there is -## deliberately no `fieldPairs`/default-construct path, so `{.requiresInit.}` -## types (which cannot be zero-initialised) work unchanged. The contract: -## -## * the type is a value object whose `init` takes its fields positionally in -## declaration order; -## * only public fields participate (private fields are invisible to the -## macro and would be dropped — don't persist such types); -## * `BlobCodec` must be called for a field's type *before* the struct -## that contains it (Nim resolves the concrete `writePart`/`readPart` -## top-down). -## -## ## Entry points -## -## `toBlob(v)` → `seq[byte]`, `fromBlob(bytes, T)` → `T` (raises `ValueError` -## on truncated/corrupt input). - -{.push raises: [].} - -import std/[macros, sets, times, typetraits] - -type ReadCtx* = object - buf*: seq[byte] - pos*: int - -proc initReadCtx*(bytes: openArray[byte]): ReadCtx = - ReadCtx(buf: @bytes, pos: 0) - -proc need(r: ReadCtx, n: int) {.raises: [ValueError].} = - if n < 0 or r.pos + n > r.buf.len: - raise newException(ValueError, "truncated payload: need " & $n & " more bytes") - -# ── Fixed-width integers ──────────────────────────────────────────────── - -proc writePart*(buf: var seq[byte], v: uint32) = - buf.add(byte((v shr 24) and 0xFF'u32)) - buf.add(byte((v shr 16) and 0xFF'u32)) - buf.add(byte((v shr 8) and 0xFF'u32)) - buf.add(byte(v and 0xFF'u32)) - -proc readPart*(r: var ReadCtx, _: typedesc[uint32]): uint32 {.raises: [ValueError].} = - r.need(4) - result = - (uint32(r.buf[r.pos]) shl 24) or (uint32(r.buf[r.pos + 1]) shl 16) or - (uint32(r.buf[r.pos + 2]) shl 8) or uint32(r.buf[r.pos + 3]) - r.pos += 4 - -proc writePart*(buf: var seq[byte], v: int64) = - let u = cast[uint64](v) - for shift in countdown(56, 0, 8): - buf.add(byte((u shr shift) and 0xFF'u64)) - -proc readPart*(r: var ReadCtx, _: typedesc[int64]): int64 {.raises: [ValueError].} = - r.need(8) - var u: uint64 = 0 - for i in 0 ..< 8: - u = (u shl 8) or uint64(r.buf[r.pos + i]) - r.pos += 8 - cast[int64](u) - -proc writePart*(buf: var seq[byte], v: int) = - writePart(buf, int64(v)) - -proc readPart*(r: var ReadCtx, _: typedesc[int]): int {.raises: [ValueError].} = - let x = readPart(r, int64) - when sizeof(int) < sizeof(int64): - if x < int64(low(int)) or x > int64(high(int)): - raise newException(ValueError, "int out of range: " & $x) - result = int(x) - -# ── Small scalars ─────────────────────────────────────────────────────── - -proc writePart*(buf: var seq[byte], v: bool) = - buf.add(if v: 1'u8 else: 0'u8) - -proc readPart*(r: var ReadCtx, _: typedesc[bool]): bool {.raises: [ValueError].} = - r.need(1) - result = r.buf[r.pos] != 0'u8 - r.pos += 1 - -proc writePart*(buf: var seq[byte], v: byte) = - buf.add(v) - -proc readPart*(r: var ReadCtx, _: typedesc[byte]): byte {.raises: [ValueError].} = - r.need(1) - result = r.buf[r.pos] - r.pos += 1 - -proc writePart*(buf: var seq[byte], v: char) = - buf.add(byte(v)) - -proc readPart*(r: var ReadCtx, _: typedesc[char]): char {.raises: [ValueError].} = - r.need(1) - result = char(r.buf[r.pos]) - r.pos += 1 - -proc writePart*[E: enum](buf: var seq[byte], v: E) = - writePart(buf, int64(ord(v))) - -proc readPart*[E: enum](r: var ReadCtx, _: typedesc[E]): E {.raises: [ValueError].} = - let x = readPart(r, int64) - let lo = int64(ord(low(E))) - let hi = int64(ord(high(E))) - if x < lo or x > hi: - raise newException(ValueError, "enum value out of range: " & $x) - result = E(x) - -# ── string / seq[byte] (4-byte length) ────────────────────────────────── - -proc writePart*(buf: var seq[byte], s: string) = - writePart(buf, uint32(s.len)) - for c in s: - buf.add(byte(c)) - -proc readPart*(r: var ReadCtx, _: typedesc[string]): string {.raises: [ValueError].} = - let nU = readPart(r, uint32) - if nU > uint32(high(int)): - raise newException(ValueError, "string length out of range: " & $nU) - let n = int(nU) - r.need(n) - result = newString(n) - for i in 0 ..< n: - result[i] = char(r.buf[r.pos + i]) - r.pos += n - -proc writePart*(buf: var seq[byte], b: seq[byte]) = - writePart(buf, uint32(b.len)) - for x in b: - buf.add(x) - -proc readPart*( - r: var ReadCtx, _: typedesc[seq[byte]] -): seq[byte] {.raises: [ValueError].} = - let nU = readPart(r, uint32) - if nU > uint32(high(int)): - raise newException(ValueError, "blob length out of range: " & $nU) - let n = int(nU) - r.need(n) - result = newSeq[byte](n) - for i in 0 ..< n: - result[i] = r.buf[r.pos + i] - r.pos += n - -# ── distinct (e.g. SdsParticipantID = distinct string) ────────────────── - -proc writePart*[T: distinct](buf: var seq[byte], v: T) = - mixin writePart - writePart(buf, distinctBase(T)(v)) - -proc readPart*[T: distinct]( - r: var ReadCtx, _: typedesc[T] -): T {.raises: [ValueError].} = - mixin readPart - T(readPart(r, distinctBase(T))) - -# ── Time ──────────────────────────────────────────────────────────────── - -proc writePart*(buf: var seq[byte], t: Time) = - writePart(buf, t.toUnix()) - writePart(buf, uint32(t.nanosecond)) - -proc readPart*(r: var ReadCtx, _: typedesc[Time]): Time {.raises: [ValueError].} = - let secs = readPart(r, int64) - let nanos = int(readPart(r, uint32)) - if nanos < 0 or nanos > 999_999_999: - raise newException(ValueError, "nanosecond out of range: " & $nanos) - initTime(secs, nanos) - -# ── Containers ────────────────────────────────────────────────────────── - -proc writePart*[T](buf: var seq[byte], xs: seq[T]) = - mixin writePart - writePart(buf, uint32(xs.len)) - for x in xs: - writePart(buf, x) - -proc readPart*[T]( - r: var ReadCtx, _: typedesc[seq[T]] -): seq[T] {.raises: [ValueError].} = - mixin readPart - let nU = readPart(r, uint32) - if nU > uint32(high(int)): - raise newException(ValueError, "sequence length out of range: " & $nU) - let n = int(nU) - result = newSeqOfCap[T](n) - for _ in 0 ..< n: - result.add(readPart(r, T)) -proc writePart*[T](buf: var seq[byte], s: HashSet[T]) = - mixin writePart - writePart(buf, uint32(s.len)) - for x in s: - writePart(buf, x) - -proc readPart*[T]( - r: var ReadCtx, _: typedesc[HashSet[T]] -): HashSet[T] {.raises: [ValueError].} = - mixin readPart - let nU = readPart(r, uint32) - if nU > uint32(high(int)): - raise newException(ValueError, "set length out of range: " & $nU) - let n = int(nU) - result = initHashSet[T](max(n, 2)) - for _ in 0 ..< n: - result.incl(readPart(r, T)) -proc writePart*[T: tuple](buf: var seq[byte], v: T) = - mixin writePart - for f in fields(v): - writePart(buf, f) - -proc readPart*[T: tuple](r: var ReadCtx, _: typedesc[T]): T {.raises: [ValueError].} = - mixin readPart - for f in fields(result): - f = readPart(r, typeof(f)) - -# ── Named-struct derivation ───────────────────────────────────────────── - -proc objectRecList(tSym: NimNode): NimNode {.compileTime.} = - ## Resolve a type symbol to its object's RecList, preserving field types - ## exactly as written (getImpl, not getTypeImpl, so `HashSet[SdsMessageID]` - ## and friends stay named rather than being expanded to their structure). - var body = tSym.getImpl[2] - while body.kind in {nnkRefTy, nnkPtrTy, nnkDistinctTy}: - body = body[0] - doAssert body.kind == nnkObjectTy, - "BlobCodec: expected an object type, got " & treeRepr(body) - body[2] - -macro BlobCodec*(T: typedesc): untyped = - ## Emit `writePart`/`readPart` for a named value object `T`, encoding each - ## public field in declaration order and rebuilding via `T.init(...)`. - let tSym = getTypeInst(T)[1] - let recList = objectRecList(tSym) - - var fieldNames: seq[NimNode] - var fieldTypes: seq[NimNode] - for defs in recList: - if defs.kind != nnkIdentDefs: - continue - # Rebuild the field type from its textual form rather than splicing the - # resolved symbol: a spliced *alias* type symbol (e.g. `SdsMessageID = - # string`) is mis-resolved as a value in `readPart(r, T)`, breaking - # typedesc overload resolution. A fresh ident/expr behaves like literal - # source and resolves to a typedesc correctly. - let ftype = parseExpr(repr(defs[^2])) - for i in 0 ..< defs.len - 2: - var nameNode = defs[i] - if nameNode.kind == nnkPragmaExpr: - nameNode = nameNode[0] - if nameNode.kind == nnkPostfix: - nameNode = nameNode[1] - fieldNames.add(ident($nameNode)) - fieldTypes.add(ftype.copyNimTree) - - let bufId = ident "buf" - let vId = ident "v" - let rId = ident "r" - - # writePart(buf: var seq[byte], v: T) - var writeBody = newStmtList() - for fn in fieldNames: - writeBody.add(newCall(ident "writePart", bufId, newDotExpr(vId, fn))) - let writeProc = newProc( - name = ident "writePart", - params = [ - newEmptyNode(), - newIdentDefs( - bufId, nnkVarTy.newTree(nnkBracketExpr.newTree(ident "seq", ident "byte")) - ), - newIdentDefs(vId, tSym), - ], - body = writeBody, - ) - - # readPart(r: var ReadCtx, _: typedesc[T]): T {.raises: [ValueError].} - var readBody = newStmtList() - var tmps: seq[NimNode] - for i, ft in fieldTypes: - let tmp = genSym(nskLet, "f" & $i) - tmps.add(tmp) - readBody.add(newLetStmt(tmp, newCall(ident "readPart", rId, ft))) - readBody.add(newCall(newDotExpr(tSym, ident "init"), tmps)) - let readProc = newProc( - name = ident "readPart", - params = [ - tSym, - newIdentDefs(rId, nnkVarTy.newTree(ident "ReadCtx")), - newIdentDefs(ident "_", nnkBracketExpr.newTree(ident "typedesc", tSym)), - ], - body = readBody, - ) - readProc.addPragma( - nnkExprColonExpr.newTree(ident "raises", nnkBracket.newTree(ident "ValueError")) - ) - - result = newStmtList(writeProc, readProc) - -# ── Public entry points ───────────────────────────────────────────────── - -proc toBlob*[T](v: T): seq[byte] = - mixin writePart - result = @[] - writePart(result, v) - -proc fromBlob*[T](bytes: openArray[byte], _: typedesc[T]): T {.raises: [ValueError].} = - mixin readPart - var r = initReadCtx(bytes) - result = readPart(r, T) - if r.pos != r.buf.len: - raise newException(ValueError, "trailing payload bytes: " & $(r.buf.len - r.pos)) - -{.pop.} diff --git a/waku/persistency/sds_persistency.nim b/waku/persistency/sds_persistency.nim index 44d551814..a476171dc 100644 --- a/waku/persistency/sds_persistency.nim +++ b/waku/persistency/sds_persistency.nim @@ -1,49 +1,67 @@ -## Adapter that materialises the SDS `Persistence` contract on top of a -## waku-persistency `Job`. One `Job` (== one SQLite file, one worker thread) -## services all channels for a given SDS context; rows are namespaced by -## category and the channelId is the first key component so per-channel -## prefix scans stay cheap. +## Adapter that materialises the SDS `Persistence` contract (nim-sds 0.3.0, +## snapshot model) on top of a waku-persistency `Job`. One `Job` (== one +## SQLite file, one worker thread) services all channels for a given SDS +## context; rows are namespaced by category and the channelId is the first +## key component so per-channel prefix scans stay cheap. ## -## ## Async contract +## ## Snapshot contract (nim-sds 0.3.0) ## -## Every `Persistence` proc field is async (`proc(..): Future[void] -## {.async: (raises: []), gcsafe.}`) — SDS awaits them on its own chronos -## loop. We map each onto the matching async `Job` op: +## The fine-grained per-row callbacks of 0.2.4 are gone. SDS now persists via +## five procs, all `Future[Result[void, string]]` (load returns +## `Result[ChannelData, string]`), `{.async: (raises: []), gcsafe.}`: ## -## * **Writes (save*/remove*)** — call the fire-and-forget `Job.persist*` ops -## through the `safePut`/`safeDelete` helpers, which trap any backend error -## and log it rather than raising (the contract forbids raising). Note that -## Persistency v1 only guarantees the event has been queued when the Future -## resolves — reads immediately after an awaited write can still be racy. -## * **`dropChannel`** — awaits `doDropChannel`, which batches every row of -## the channel into one transactional `persist` (atomic when applied). -## * **`loadAllForChannel`** — awaits `doLoadAll` and returns the snapshot -## the SDS bootstrap path needs. +## * **`saveChannelMeta`** — the complete fast-changing per-channel state +## (lamport clock, outgoing/incoming buffers, both SDS-R repair buffers) +## as ONE blob. Idempotent; a missed write self-heals on the next save. +## * **`updateHistory`** — append newly-delivered messages / evict the +## oldest past the cap, applied as one transactional batch. +## * **`loadChannel`** — bootstrap: returns the prior `ChannelData` +## (meta + ordered message history) or an empty one. Surfaces errors. +## * **`dropChannel`** — wipe all state for a channel. Surfaces errors. +## * **`setRetrievalHint`** — see below; deliberate no-op here. +## +## Failure policy mirrors the interface docs: save/update/hint are non-fatal +## (we log and still return the error string); load/drop are durability-intent +## and propagate their error to the caller. +## +## ## Codec +## +## The blob transform is owned by nim-sds: `ChannelMeta` round-trips through +## `sds/snapshot_codec` (protobuf, schema-versioned — refuses unknown +## versions), and each persisted `SdsMessage` log row through the SDS wire +## codec in `sds/protobuf`. We do not maintain a second codec for these +## shapes (the previous `payload_codec`/`BlobCodec` path is retired). +## +## ## Retrieval hints +## +## `setRetrievalHint` is intentionally a no-op: persisted hints are never read +## back — `loadChannel` returns `ChannelData` (meta + messageHistory) with no +## hint field, and `ChannelMeta` carries none. Hints are supplied live via the +## `onRetrievalHint` provider, so persisting them would be write-only dead +## data. The closure still exists because the field is required by the +## `Persistence` object (SDS calls it from `getRecentHistoryEntries`). ## ## ## Storage layout ## -## | Category | Key | Value | -## |------------------|------------------------------|-----------------------------------------| -## | `sds.lamport` | `key(channelId)` | 8 BE bytes (int64) | -## | `sds.log` | `key(channelId, msgId)` | encoded `SdsMessage` | -## | `sds.hint` | `key(msgId)` | raw hint bytes | -## | `sds.outgoing` | `key(channelId, msgId)` | encoded `UnacknowledgedMessage` | -## | `sds.incoming` | `key(channelId, msgId)` | encoded `IncomingMessage` | -## | `sds.outRepair` | `key(channelId, msgId)` | `(msgId, OutgoingRepairEntry)` encoded | -## | `sds.inRepair` | `key(channelId, msgId)` | `(msgId, IncomingRepairEntry)` encoded | +## | Category | Key | Value | +## |---------------|--------------------------|----------------------------------------| +## | `sds.meta` | `key(channelId)` | `ChannelMeta` (snapshot_codec protobuf)| +## | `sds.log` | `key(channelId, msgId)` | `SdsMessage` (sds wire protobuf) | ## ## `messageHistory` is reconstructed in memory by sorting on ## `(lamportTimestamp, messageId)` — the same total order SDS uses for -## delivery (see sds/sds_utils.nim). Insertion order is not relied upon, so -## `removeLogEntry` works with the natural `(channelId, msgId)` key. +## delivery (see sds/sds_utils.nim). {.push raises: [].} -import std/[algorithm, options, sets, times] +import std/[algorithm, options] import chronos, chronicles, results +import libp2p/protobuf/minprotobuf import ./persistency -import ./payload_codec -import sds/types/persistence +import ./keys +import types/persistence +import snapshot_codec +import protobuf export persistence, persistency @@ -51,155 +69,8 @@ logScope: topics = "sds-persistency" const - CatLamport* = "sds.lamport" + CatMeta* = "sds.meta" CatLog* = "sds.log" - CatHint* = "sds.hint" - CatOutgoing* = "sds.outgoing" - CatIncoming* = "sds.incoming" - CatOutRepair* = "sds.outRepair" - CatInRepair* = "sds.inRepair" - -# ── Blob codecs ───────────────────────────────────────────────────────── -# -# All SDS payload types round-trip through the generic, length-prefixed -# codec in payload_codec.nim. Each `BlobCodec(T)` emits writePart/ -# readPart for `T` from its public fields (declaration order) and rebuilds -# via `T.init(...)`. Order matters: a field's type must be derived before -# the struct that contains it. The repair buffers store `(msgId, entry)` -# tuples, handled by the generic tuple codec. Lamport is a bare int64. - -BlobCodec(HistoryEntry) -BlobCodec(SdsMessage) -BlobCodec(UnacknowledgedMessage) -BlobCodec(IncomingMessage) -BlobCodec(OutgoingRepairEntry) -BlobCodec(IncomingRepairEntry) - -# ── Write helpers ─────────────────────────────────────────────────────── -# -# The Persistence write fields are async with `raises: []`, but the Job ops -# raise `CatchableError`. These wrappers trap and log so the closures stay -# raise-free, preserving the "errors are logged, never raised" contract. - -proc safePut( - job: Job, category: string, k: Key, payload: seq[byte] -) {.async: (raises: []).} = - try: - await job.persistPut(category, k, payload) - except CatchableError as e: - warn "sds-persistency: put failed", category, err = e.msg - -proc safeDelete(job: Job, category: string, k: Key) {.async: (raises: []).} = - try: - await job.persistDelete(category, k) - except CatchableError as e: - warn "sds-persistency: delete failed", category, err = e.msg - -proc safePersist(job: Job, ops: seq[TxOp]) {.async: (raises: []).} = - try: - await job.persist(ops) - except CatchableError as e: - warn "sds-persistency: persist batch failed", opCount = ops.len, err = e.msg - -# ── Async backing procs ───────────────────────────────────────────────── - -proc doLoadAll(job: Job, channelId: SdsChannelID): Future[ChannelSnapshot] {.async.} = - var snap = ChannelSnapshot() - let chanKey = toKey(channelId) - - block lamport: - let opt = (await job.get(CatLamport, chanKey)).valueOr: - warn "sds-persistency: get lamport failed", channelId, err = $error - break lamport - if opt.isSome: - try: - snap.lamportTimestamp = fromBlob(opt.get, int64) - except ValueError as e: - warn "sds-persistency: invalid lamport bytes", channelId, err = e.msg - - block log: - let rows = (await job.scanPrefix(CatLog, chanKey)).valueOr: - warn "sds-persistency: scan log failed", channelId, err = $error - break log - var msgs = newSeq[SdsMessage]() - for row in rows: - try: - msgs.add(fromBlob(row.payload, SdsMessage)) - except ValueError as e: - warn "sds-persistency: invalid log row", channelId, err = e.msg - msgs.sort do(a, b: SdsMessage) -> int: - result = cmp(a.lamportTimestamp, b.lamportTimestamp) - if result == 0: - result = cmp(a.messageId, b.messageId) - snap.messageHistory = msgs - - block outgoing: - let rows = (await job.scanPrefix(CatOutgoing, chanKey)).valueOr: - warn "sds-persistency: scan outgoing failed", channelId, err = $error - break outgoing - for row in rows: - try: - snap.outgoingBuffer.add(fromBlob(row.payload, UnacknowledgedMessage)) - except ValueError as e: - warn "sds-persistency: invalid outgoing row", channelId, err = e.msg - - block incoming: - let rows = (await job.scanPrefix(CatIncoming, chanKey)).valueOr: - warn "sds-persistency: scan incoming failed", channelId, err = $error - break incoming - for row in rows: - try: - snap.incomingBuffer.add(fromBlob(row.payload, IncomingMessage)) - except ValueError as e: - warn "sds-persistency: invalid incoming row", channelId, err = e.msg - - block outRepair: - let rows = (await job.scanPrefix(CatOutRepair, chanKey)).valueOr: - warn "sds-persistency: scan out-repair failed", channelId, err = $error - break outRepair - for row in rows: - try: - snap.outgoingRepairBuffer.add( - fromBlob(row.payload, (SdsMessageID, OutgoingRepairEntry)) - ) - except ValueError as e: - warn "sds-persistency: invalid out-repair row", channelId, err = e.msg - - block inRepair: - let rows = (await job.scanPrefix(CatInRepair, chanKey)).valueOr: - warn "sds-persistency: scan in-repair failed", channelId, err = $error - break inRepair - for row in rows: - try: - snap.incomingRepairBuffer.add( - fromBlob(row.payload, (SdsMessageID, IncomingRepairEntry)) - ) - except ValueError as e: - warn "sds-persistency: invalid in-repair row", channelId, err = e.msg - - return snap - -proc doDropChannel(job: Job, channelId: SdsChannelID): Future[void] {.async.} = - ## Delete every row belonging to the channel in one transactional batch. - ## Uses txDeletePrefix to push bulk deletes to the worker thread — no - ## caller-side scans needed. Hint rows (keyed by msgId, not channelId) - ## are not cleaned here; they are cascade-deleted by removeLogEntry during - ## normal rolling-history eviction, so by the time a channel is dropped - ## the only remaining hints belong to the still-live log tail. Those - ## become harmless orphans (never reloaded — hints are re-derived on - ## demand from the onRetrievalHint callback). - let chanKey = toKey(channelId) - await safePersist( - job, - @[ - TxOp(category: CatLog, key: chanKey, kind: txDeletePrefix), - TxOp(category: CatOutgoing, key: chanKey, kind: txDeletePrefix), - TxOp(category: CatIncoming, key: chanKey, kind: txDeletePrefix), - TxOp(category: CatOutRepair, key: chanKey, kind: txDeletePrefix), - TxOp(category: CatInRepair, key: chanKey, kind: txDeletePrefix), - TxOp(category: CatLamport, key: chanKey, kind: txDelete), - ], - ) # ── Public factory ────────────────────────────────────────────────────── @@ -207,102 +78,106 @@ proc newSdsPersistence*(job: Job): Persistence {.gcsafe, raises: [].} = ## Build an SDS `Persistence` value backed by ``job``. One Job services ## all channels — channelId is part of every key. ## - ## The closures capture ``job`` by ref. They must be invoked from a - ## thread that owns a running chronos loop (the SDS context's worker - ## thread satisfies this). + ## The closures capture ``job`` by ref. They must be invoked from a thread + ## that owns a running chronos loop (the SDS context's worker thread + ## satisfies this). doAssert not job.isNil, "newSdsPersistence: job is nil" # Built field-by-field via assignment rather than an object literal: every - # field is an async closure whose body is an `await`/`return await` command - # call, which cannot be followed by the `,` field separator a `Persistence( - # ..)` literal would require (the parser cannot tell the comma from another - # command argument). Assignments have no separator, so the bodies stay plain. + # field is an async closure whose body uses `await`/`return` statements, + # which cannot be followed by the `,` field separator a `Persistence(..)` + # literal would require. Assignments have no separator, so bodies stay plain. var persistence = Persistence() - persistence.saveLamport = proc( - channelId: SdsChannelID, lamport: int64 - ): Future[void] {.async: (raises: []), gcsafe.} = - await safePut(job, CatLamport, toKey(channelId), toBlob(lamport)) + persistence.saveChannelMeta = proc( + channelId: SdsChannelID, meta: ChannelMeta + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} = + try: + await job.persistPut(CatMeta, toKey(channelId), encode(meta).buffer) + return ok() + except CatchableError as e: + warn "sds-persistency: saveChannelMeta failed", channelId, err = e.msg + return err(e.msg) - persistence.appendLogEntry = proc( - channelId: SdsChannelID, msg: SdsMessage - ): Future[void] {.async: (raises: []), gcsafe.} = - await safePut(job, CatLog, key(channelId, msg.messageId), toBlob(msg)) + persistence.updateHistory = proc( + channelId: SdsChannelID, update: HistoryUpdate + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} = + if update.isEmpty: + return ok() + # One transactional batch: append rows (txPut) and evictions (txDelete). + var ops = newSeq[TxOp]() + for m in update.append: + ops.add TxOp( + category: CatLog, + key: key(channelId, m.messageId), + kind: txPut, + payload: encode(m).buffer, + ) + for id in update.evict: + ops.add TxOp(category: CatLog, key: key(channelId, id), kind: txDelete) + try: + await job.persist(ops) + return ok() + except CatchableError as e: + warn "sds-persistency: updateHistory failed", + channelId, appended = update.append.len, evicted = update.evict.len, err = e.msg + return err(e.msg) - persistence.removeLogEntry = proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[void] {.async: (raises: []), gcsafe.} = - # Atomic batch: delete the log row and its associated retrieval hint in - # one transaction so they can't diverge. - await safePersist( - job, - @[ - TxOp(category: CatLog, key: key(channelId, msgId), kind: txDelete), - TxOp(category: CatHint, key: toKey(msgId), kind: txDelete), - ], - ) + persistence.loadChannel = proc( + channelId: SdsChannelID + ): Future[Result[ChannelData, string]] {.async: (raises: []), gcsafe.} = + let chanKey = toKey(channelId) + var data = ChannelData.init() + try: + block meta: + let opt = (await job.get(CatMeta, chanKey)).valueOr: + return err("loadChannel: get meta: " & $error) + if opt.isSome: + # schema-versioned decode; refuses unknown versions loudly. + data.meta = ChannelMeta.decode(opt.get).valueOr: + return err("loadChannel: corrupt or unsupported ChannelMeta blob") - persistence.setRetrievalHint = proc( - msgId: SdsMessageID, hint: seq[byte] - ): Future[void] {.async: (raises: []), gcsafe.} = - await safePut(job, CatHint, toKey(msgId), hint) + block history: + let rows = (await job.scanPrefix(CatLog, chanKey)).valueOr: + return err("loadChannel: scan log: " & $error) + var msgs = newSeq[SdsMessage]() + for row in rows: + let m = SdsMessage.decode(row.payload).valueOr: + warn "sds-persistency: skipping undecodable log row", channelId + continue + msgs.add(m) + msgs.sort do(a, b: SdsMessage) -> int: + result = cmp(a.lamportTimestamp, b.lamportTimestamp) + if result == 0: + result = cmp(a.messageId, b.messageId) + data.messageHistory = msgs - persistence.saveOutgoing = proc( - channelId: SdsChannelID, msg: UnacknowledgedMessage - ): Future[void] {.async: (raises: []), gcsafe.} = - await safePut(job, CatOutgoing, key(channelId, msg.message.messageId), toBlob(msg)) - - persistence.removeOutgoing = proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[void] {.async: (raises: []), gcsafe.} = - await safeDelete(job, CatOutgoing, key(channelId, msgId)) - - persistence.saveIncoming = proc( - channelId: SdsChannelID, msg: IncomingMessage - ): Future[void] {.async: (raises: []), gcsafe.} = - await safePut(job, CatIncoming, key(channelId, msg.message.messageId), toBlob(msg)) - - persistence.removeIncoming = proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[void] {.async: (raises: []), gcsafe.} = - await safeDelete(job, CatIncoming, key(channelId, msgId)) - - persistence.saveOutgoingRepair = proc( - channelId: SdsChannelID, msgId: SdsMessageID, entry: OutgoingRepairEntry - ): Future[void] {.async: (raises: []), gcsafe.} = - await safePut(job, CatOutRepair, key(channelId, msgId), toBlob((msgId, entry))) - - persistence.removeOutgoingRepair = proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[void] {.async: (raises: []), gcsafe.} = - await safeDelete(job, CatOutRepair, key(channelId, msgId)) - - persistence.saveIncomingRepair = proc( - channelId: SdsChannelID, msgId: SdsMessageID, entry: IncomingRepairEntry - ): Future[void] {.async: (raises: []), gcsafe.} = - await safePut(job, CatInRepair, key(channelId, msgId), toBlob((msgId, entry))) - - persistence.removeIncomingRepair = proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[void] {.async: (raises: []), gcsafe.} = - await safeDelete(job, CatInRepair, key(channelId, msgId)) + return ok(data) + except CatchableError as e: + return err("loadChannel: " & e.msg) persistence.dropChannel = proc( channelId: SdsChannelID - ): Future[void] {.async: (raises: []), gcsafe.} = + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} = + let chanKey = toKey(channelId) try: - await doDropChannel(job, channelId) + await job.persist( + @[ + TxOp(category: CatLog, key: chanKey, kind: txDeletePrefix), + TxOp(category: CatMeta, key: chanKey, kind: txDelete), + ] + ) + return ok() except CatchableError as e: error "sds-persistency: dropChannel failed", channelId, err = e.msg + return err(e.msg) - persistence.loadAllForChannel = proc( - channelId: SdsChannelID - ): Future[ChannelSnapshot] {.async: (raises: []), gcsafe.} = - try: - return await doLoadAll(job, channelId) - except CatchableError as e: - error "sds-persistency: loadAllForChannel failed", channelId, err = e.msg - return ChannelSnapshot() + persistence.setRetrievalHint = proc( + msgId: SdsMessageID, hint: seq[byte] + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} = + ## Deliberate no-op — persisted hints are never read back (see module + ## header). Hints are supplied live via the onRetrievalHint provider. + return ok() return persistence From 4e1ec5a717a49ead6cb51164f9a3c23aaa787627 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Tue, 2 Jun 2026 14:44:26 +0200 Subject: [PATCH 09/10] Bump to latest nim-sds and nim-brokers 3.1.1 --- nimble.lock | 12 ++++++------ waku.nimble | 12 ++---------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/nimble.lock b/nimble.lock index 92651e58c..4bdb8bb82 100644 --- a/nimble.lock +++ b/nimble.lock @@ -328,8 +328,8 @@ } }, "brokers": { - "version": "#v3.0.0", - "vcsRevision": "8199b236db409cbaf6102dcc2431ebf33446ae26", + "version": "#v3.1.1", + "vcsRevision": "a7316a35f1b62e3497ae8ee0fc1aace74df0beb2", "url": "https://github.com/NagyZoltanPeter/nim-brokers.git", "downloadMethod": "git", "dependencies": [ @@ -341,7 +341,7 @@ "cbor_serialization" ], "checksums": { - "sha1": "b3a48e23540c0f26f905a7516830559f71c89003" + "sha1": "4447d7c1f9da14ae439afb23aee45116ce2ecb40" } }, "stint": { @@ -620,8 +620,8 @@ } }, "sds": { - "version": "#4ccdd122fc4fa82f9ef69eef5dedd24ca2d9f420", - "vcsRevision": "4ccdd122fc4fa82f9ef69eef5dedd24ca2d9f420", + "version": "#abdd40cc645f1b024c3ee99cced7e287c4e4c441", + "vcsRevision": "abdd40cc645f1b024c3ee99cced7e287c4e4c441", "url": "https://github.com/logos-messaging/nim-sds.git", "downloadMethod": "git", "dependencies": [ @@ -636,7 +636,7 @@ "taskpools" ], "checksums": { - "sha1": "cee8c7e2e7b869da0be23d383f11ddda5b3524d2" + "sha1": "61c4ae13c6896bfa70e662520e8660a78c7f438c" } }, "ffi": { diff --git a/waku.nimble b/waku.nimble index fcbeaa18d..b88f79a33 100644 --- a/waku.nimble +++ b/waku.nimble @@ -61,17 +61,9 @@ requires "nim >= 2.2.4", # Packages not on nimble (use git URLs) requires "https://github.com/logos-messaging/nim-ffi" -requires "https://github.com/logos-messaging/nim-sds.git#4ccdd122fc4fa82f9ef69eef5dedd24ca2d9f420" +requires "https://github.com/logos-messaging/nim-sds.git#abdd40cc645f1b024c3ee99cced7e287c4e4c441" -# brokers: pinned by URL+commit rather than the bare `brokers >= 2.0.1` -# form because the nim-lang/packages registry entry for `brokers` only -# carries metadata for the original v0.1.0 publication. Until that -# registry entry is refreshed, the local SAT solver enumerates "0.1.0" -# as the only available version and cannot satisfy `>= 2.0.1`. The URL -# pin below bypasses the registry and locks the exact commit of the -# v2.0.1 tag. Revert to the bare form once nim-lang/packages is -# updated. -requires "https://github.com/NagyZoltanPeter/nim-brokers.git#v3.0.0" +requires "https://github.com/NagyZoltanPeter/nim-brokers.git#v3.1.1" requires "https://github.com/vacp2p/nim-lsquic" requires "https://github.com/vacp2p/nim-jwt.git#057ec95eb5af0eea9c49bfe9025b3312c95dc5f2" From 7d99833ceef7caf352e5d507c87f9a1295c8d3f7 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Tue, 2 Jun 2026 15:31:21 +0200 Subject: [PATCH 10/10] Update with latest nim-sds changes - removal of setRetrievalHints - not needed --- waku/persistency/sds_persistency.nim | 7 ------- 1 file changed, 7 deletions(-) diff --git a/waku/persistency/sds_persistency.nim b/waku/persistency/sds_persistency.nim index a476171dc..a8da0c034 100644 --- a/waku/persistency/sds_persistency.nim +++ b/waku/persistency/sds_persistency.nim @@ -172,13 +172,6 @@ proc newSdsPersistence*(job: Job): Persistence {.gcsafe, raises: [].} = error "sds-persistency: dropChannel failed", channelId, err = e.msg return err(e.msg) - persistence.setRetrievalHint = proc( - msgId: SdsMessageID, hint: seq[byte] - ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} = - ## Deliberate no-op — persisted hints are never read back (see module - ## header). Hints are supplied live via the onRetrievalHint provider. - return ok() - return persistence {.pop.}