diff --git a/tests/persistency/test_all.nim b/tests/persistency/test_all.nim index 0909850a4..5b0cfdbb5 100644 --- a/tests/persistency/test_all.nim +++ b/tests/persistency/test_all.nim @@ -5,6 +5,6 @@ import ./test_backend import ./test_lifecycle import ./test_facade import ./test_encoding -import ./test_blob_codec +import ./test_sds_persistency import ./test_string_lookup import ./test_singleton diff --git a/tests/persistency/test_blob_codec.nim b/tests/persistency/test_blob_codec.nim deleted file mode 100644 index f30cf1d4f..000000000 --- a/tests/persistency/test_blob_codec.nim +++ /dev/null @@ -1,94 +0,0 @@ -{.used.} - -## Round-trip tests for the generic payload blob codec (payload_codec.nim) -## and its `BlobCodec` application to the SDS persistence types. -## -## Importing `sds_persistency` forces the real adapter (and its six -## `BlobCodec` calls + call sites) to compile. The local derivations -## below live in this module's own scope and drive the round-trip checks. - -import std/[sets, times] -import testutils/unittests -import waku/persistency/payload_codec -import waku/persistency/sds_persistency # compile-checks the real adapter -import sds/types/persistence - -# Same order constraint as the adapter: a field's type before its container. -BlobCodec(HistoryEntry) -BlobCodec(SdsMessage) -BlobCodec(UnacknowledgedMessage) -BlobCodec(IncomingMessage) -BlobCodec(OutgoingRepairEntry) -BlobCodec(IncomingRepairEntry) - -proc sampleHistory(): HistoryEntry = - HistoryEntry.init("mid-1", @[0xAA'u8, 0xBB, 0xCC], "sender-1".SdsParticipantID) - -proc sampleMessage(content = @[1'u8, 2, 3]): SdsMessage = - SdsMessage.init( - messageId = "msg-42", - lamportTimestamp = 7'i64, - causalHistory = @[sampleHistory(), HistoryEntry.init("mid-2")], - channelId = "channel-9", - content = content, - bloomFilter = @[0xDE'u8, 0xAD, 0xBE, 0xEF], - senderId = "alice".SdsParticipantID, - repairRequest = @[sampleHistory()], - ) - -suite "Persistency blob codec": - test "primitive round-trips": - check fromBlob(toBlob(7'i64), int64) == 7'i64 - check fromBlob(toBlob(int64.low), int64) == int64.low - check fromBlob(toBlob("héllo"), string) == "héllo" - check fromBlob(toBlob(@[0'u8, 255, 7]), seq[byte]) == @[0'u8, 255, 7] - - test "HistoryEntry round-trips": - let h = sampleHistory() - check fromBlob(toBlob(h), HistoryEntry) == h - - test "SdsMessage round-trips (seqs, distinct, nested)": - let m = sampleMessage() - check fromBlob(toBlob(m), SdsMessage) == m - - test "UnacknowledgedMessage round-trips (Time, int)": - let u = UnacknowledgedMessage.init(sampleMessage(), initTime(1_700_000_000, 123), 4) - check fromBlob(toBlob(u), UnacknowledgedMessage) == u - - test "IncomingMessage round-trips (HashSet)": - let inc = - IncomingMessage.init(sampleMessage(), toHashSet(["dep-a", "dep-b", "dep-c"])) - check fromBlob(toBlob(inc), IncomingMessage) == inc - - test "repair tuples round-trip": - let outPair = ( - "msg-42".SdsMessageID, OutgoingRepairEntry.init(sampleHistory(), initTime(10, 0)) - ) - check fromBlob(toBlob(outPair), (SdsMessageID, OutgoingRepairEntry)) == outPair - - let inPair = ( - "msg-42".SdsMessageID, - IncomingRepairEntry.init(sampleHistory(), @[9'u8, 8, 7], initTime(20, 500)), - ) - check fromBlob(toBlob(inPair), (SdsMessageID, IncomingRepairEntry)) == inPair - - test "payload exceeds the old 64 KiB key cap (4-byte length)": - var big = newSeq[byte](70_000) - for i in 0 ..< big.len: - big[i] = byte(i and 0xFF) - let m = sampleMessage(content = big) - let decoded = fromBlob(toBlob(m), SdsMessage) - check decoded.content.len == 70_000 - check decoded == m - - test "truncated input raises ValueError": - let bytes = toBlob(sampleMessage()) - expect ValueError: - discard fromBlob(bytes[0 ..< bytes.len - 5], SdsMessage) - expect ValueError: - discard fromBlob(@[0xFF'u8, 0xFF, 0xFF, 0xFF], string) # claims 4 GiB - - test "trailing input raises ValueError": - let bytes = toBlob(sampleMessage()) - expect ValueError: - discard fromBlob(bytes & @[0x00'u8], SdsMessage) diff --git a/tests/persistency/test_sds_persistency.nim b/tests/persistency/test_sds_persistency.nim new file mode 100644 index 000000000..ed14f904b --- /dev/null +++ b/tests/persistency/test_sds_persistency.nim @@ -0,0 +1,155 @@ +{.used.} + +## Behavioural tests for the SDS Persistence adapter (nim-sds 0.3.0 snapshot +## model). Importing `sds_persistency` also compile-checks the real adapter. +## +## Writes go through the fire-and-forget Job path (the Future resolves when +## the op is queued, not applied — Persistency v1), so every read-back polls +## until the row appears/disappears. + +import std/[options, os, times] +import chronos, results +import testutils/unittests +import waku/persistency/persistency +import waku/persistency/keys +import waku/persistency/sds_persistency + +proc tmpRoot(label: string): string = + let p = getTempDir() / ("sds_persistency_test_" & label & "_" & $epochTime().int) + removeDir(p) + p + +proc pollExists( + t: Job, category: string, k: Key, timeoutMs = 1000 +): Future[bool] {.async.} = + let deadline = epochTime() + (timeoutMs.float / 1000.0) + while epochTime() < deadline: + let r = await t.exists(category, k) + if r.isOk and r.get(): + return true + await sleepAsync(chronos.milliseconds(2)) + return false + +proc pollGone( + t: Job, category: string, k: Key, timeoutMs = 1000 +): Future[bool] {.async.} = + let deadline = epochTime() + (timeoutMs.float / 1000.0) + while epochTime() < deadline: + let r = await t.exists(category, k) + if r.isOk and not r.get(): + return true + await sleepAsync(chronos.milliseconds(2)) + return false + +proc mkMsg(channelId: SdsChannelID, msgId: SdsMessageID, lamport: int64): SdsMessage = + SdsMessage.init( + messageId = msgId, + lamportTimestamp = lamport, + causalHistory = @[], + channelId = channelId, + content = @[byte(1), byte(2)], + bloomFilter = @[], + ) + +suite "SDS persistency adapter (0.3.0 snapshot model)": + asyncTest "saveChannelMeta + updateHistory round-trip via loadChannel": + let root = tmpRoot("roundtrip") + defer: + removeDir(root) + let p = Persistency.instance(root).get() + defer: + Persistency.reset() + let job = p.openJob("sds").get() + let persistence = newSdsPersistence(job) + let channelId = "chan-1".SdsChannelID + + var meta = ChannelMeta.init() + meta.lamportTimestamp = 42 + check (await persistence.saveChannelMeta(channelId, meta)).isOk + check (await job.pollExists(CatMeta, toKey(channelId))) + + # append out of (lamport) order on purpose; loadChannel must sort. + var upd = HistoryUpdate.init() + upd.append = @[mkMsg(channelId, "m2", 2), mkMsg(channelId, "m1", 1)] + check (await persistence.updateHistory(channelId, upd)).isOk + check (await job.pollExists(CatLog, key(channelId, "m2"))) + + let data = (await persistence.loadChannel(channelId)).valueOr: + check false + return + check data.meta.lamportTimestamp == 42 + check data.messageHistory.len == 2 + check data.messageHistory[0].messageId == "m1" + check data.messageHistory[1].messageId == "m2" + + asyncTest "loadChannel on a fresh channel returns empty ChannelData": + let root = tmpRoot("empty") + defer: + removeDir(root) + let p = Persistency.instance(root).get() + defer: + Persistency.reset() + let job = p.openJob("sds").get() + let persistence = newSdsPersistence(job) + + let data = (await persistence.loadChannel("nope".SdsChannelID)).valueOr: + check false + return + check data.meta.lamportTimestamp == 0 + check data.messageHistory.len == 0 + + asyncTest "updateHistory evict removes a log row": + let root = tmpRoot("evict") + defer: + removeDir(root) + let p = Persistency.instance(root).get() + defer: + Persistency.reset() + let job = p.openJob("sds").get() + let persistence = newSdsPersistence(job) + let channelId = "c".SdsChannelID + + var upd = HistoryUpdate.init() + upd.append = @[mkMsg(channelId, "a", 1), mkMsg(channelId, "b", 2)] + check (await persistence.updateHistory(channelId, upd)).isOk + check (await job.pollExists(CatLog, key(channelId, "b"))) + + var ev = HistoryUpdate.init() + ev.evict = @["a".SdsMessageID] + check (await persistence.updateHistory(channelId, ev)).isOk + check (await job.pollGone(CatLog, key(channelId, "a"))) + + let data = (await persistence.loadChannel(channelId)).valueOr: + check false + return + check data.messageHistory.len == 1 + check data.messageHistory[0].messageId == "b" + + asyncTest "dropChannel wipes meta and log": + let root = tmpRoot("drop") + defer: + removeDir(root) + let p = Persistency.instance(root).get() + defer: + Persistency.reset() + let job = p.openJob("sds").get() + let persistence = newSdsPersistence(job) + let channelId = "d".SdsChannelID + + var meta = ChannelMeta.init() + meta.lamportTimestamp = 7 + check (await persistence.saveChannelMeta(channelId, meta)).isOk + var upd = HistoryUpdate.init() + upd.append = @[mkMsg(channelId, "x", 1)] + check (await persistence.updateHistory(channelId, upd)).isOk + check (await job.pollExists(CatMeta, toKey(channelId))) + check (await job.pollExists(CatLog, key(channelId, "x"))) + + check (await persistence.dropChannel(channelId)).isOk + check (await job.pollGone(CatMeta, toKey(channelId))) + + let data = (await persistence.loadChannel(channelId)).valueOr: + check false + return + check data.meta.lamportTimestamp == 0 + check data.messageHistory.len == 0 diff --git a/waku/persistency/payload_codec.nim b/waku/persistency/payload_codec.nim deleted file mode 100644 index 04a92bc6e..000000000 --- a/waku/persistency/payload_codec.nim +++ /dev/null @@ -1,331 +0,0 @@ -## Generic length-prefixed blob codec for persistency payloads. -## -## Symmetric counterpart to `keys.nim`'s `encodePart`: every persisted value -## round-trips through `writePart`/`readPart` over a `ReadCtx` cursor. Unlike -## keys, payloads are not byte-wise sort-stable, so strings and byte blobs use -## a **4-byte BE length prefix** (4 GiB ceiling) instead of keys.nim's 2-byte -## (64 KiB) prefix — the cap that originally forced SDS to hand-roll its codec. -## -## ## How a type opts in -## -## Primitives, `string`, `seq[byte]`, `enum`, `distinct`, `Time`, `seq[T]`, -## `HashSet[T]` and tuples already have codecs here. A **named struct** opts in -## with a single line: -## -## ```nim -## BlobCodec(MyType) -## ``` -## -## which emits both `writePart`/`readPart` for `MyType` from its fields, in -## declaration order, and reconstructs the value via `MyType.init(...)` -## (positional). This is the *only* mechanism for structs — there is -## deliberately no `fieldPairs`/default-construct path, so `{.requiresInit.}` -## types (which cannot be zero-initialised) work unchanged. The contract: -## -## * the type is a value object whose `init` takes its fields positionally in -## declaration order; -## * only public fields participate (private fields are invisible to the -## macro and would be dropped — don't persist such types); -## * `BlobCodec` must be called for a field's type *before* the struct -## that contains it (Nim resolves the concrete `writePart`/`readPart` -## top-down). -## -## ## Entry points -## -## `toBlob(v)` → `seq[byte]`, `fromBlob(bytes, T)` → `T` (raises `ValueError` -## on truncated/corrupt input). - -{.push raises: [].} - -import std/[macros, sets, times, typetraits] - -type ReadCtx* = object - buf*: seq[byte] - pos*: int - -proc initReadCtx*(bytes: openArray[byte]): ReadCtx = - ReadCtx(buf: @bytes, pos: 0) - -proc need(r: ReadCtx, n: int) {.raises: [ValueError].} = - if n < 0 or r.pos + n > r.buf.len: - raise newException(ValueError, "truncated payload: need " & $n & " more bytes") - -# ── Fixed-width integers ──────────────────────────────────────────────── - -proc writePart*(buf: var seq[byte], v: uint32) = - buf.add(byte((v shr 24) and 0xFF'u32)) - buf.add(byte((v shr 16) and 0xFF'u32)) - buf.add(byte((v shr 8) and 0xFF'u32)) - buf.add(byte(v and 0xFF'u32)) - -proc readPart*(r: var ReadCtx, _: typedesc[uint32]): uint32 {.raises: [ValueError].} = - r.need(4) - result = - (uint32(r.buf[r.pos]) shl 24) or (uint32(r.buf[r.pos + 1]) shl 16) or - (uint32(r.buf[r.pos + 2]) shl 8) or uint32(r.buf[r.pos + 3]) - r.pos += 4 - -proc writePart*(buf: var seq[byte], v: int64) = - let u = cast[uint64](v) - for shift in countdown(56, 0, 8): - buf.add(byte((u shr shift) and 0xFF'u64)) - -proc readPart*(r: var ReadCtx, _: typedesc[int64]): int64 {.raises: [ValueError].} = - r.need(8) - var u: uint64 = 0 - for i in 0 ..< 8: - u = (u shl 8) or uint64(r.buf[r.pos + i]) - r.pos += 8 - cast[int64](u) - -proc writePart*(buf: var seq[byte], v: int) = - writePart(buf, int64(v)) - -proc readPart*(r: var ReadCtx, _: typedesc[int]): int {.raises: [ValueError].} = - let x = readPart(r, int64) - when sizeof(int) < sizeof(int64): - if x < int64(low(int)) or x > int64(high(int)): - raise newException(ValueError, "int out of range: " & $x) - result = int(x) - -# ── Small scalars ─────────────────────────────────────────────────────── - -proc writePart*(buf: var seq[byte], v: bool) = - buf.add(if v: 1'u8 else: 0'u8) - -proc readPart*(r: var ReadCtx, _: typedesc[bool]): bool {.raises: [ValueError].} = - r.need(1) - result = r.buf[r.pos] != 0'u8 - r.pos += 1 - -proc writePart*(buf: var seq[byte], v: byte) = - buf.add(v) - -proc readPart*(r: var ReadCtx, _: typedesc[byte]): byte {.raises: [ValueError].} = - r.need(1) - result = r.buf[r.pos] - r.pos += 1 - -proc writePart*(buf: var seq[byte], v: char) = - buf.add(byte(v)) - -proc readPart*(r: var ReadCtx, _: typedesc[char]): char {.raises: [ValueError].} = - r.need(1) - result = char(r.buf[r.pos]) - r.pos += 1 - -proc writePart*[E: enum](buf: var seq[byte], v: E) = - writePart(buf, int64(ord(v))) - -proc readPart*[E: enum](r: var ReadCtx, _: typedesc[E]): E {.raises: [ValueError].} = - let x = readPart(r, int64) - let lo = int64(ord(low(E))) - let hi = int64(ord(high(E))) - if x < lo or x > hi: - raise newException(ValueError, "enum value out of range: " & $x) - result = E(x) - -# ── string / seq[byte] (4-byte length) ────────────────────────────────── - -proc writePart*(buf: var seq[byte], s: string) = - writePart(buf, uint32(s.len)) - for c in s: - buf.add(byte(c)) - -proc readPart*(r: var ReadCtx, _: typedesc[string]): string {.raises: [ValueError].} = - let nU = readPart(r, uint32) - if nU > uint32(high(int)): - raise newException(ValueError, "string length out of range: " & $nU) - let n = int(nU) - r.need(n) - result = newString(n) - for i in 0 ..< n: - result[i] = char(r.buf[r.pos + i]) - r.pos += n - -proc writePart*(buf: var seq[byte], b: seq[byte]) = - writePart(buf, uint32(b.len)) - for x in b: - buf.add(x) - -proc readPart*( - r: var ReadCtx, _: typedesc[seq[byte]] -): seq[byte] {.raises: [ValueError].} = - let nU = readPart(r, uint32) - if nU > uint32(high(int)): - raise newException(ValueError, "blob length out of range: " & $nU) - let n = int(nU) - r.need(n) - result = newSeq[byte](n) - for i in 0 ..< n: - result[i] = r.buf[r.pos + i] - r.pos += n - -# ── distinct (e.g. SdsParticipantID = distinct string) ────────────────── - -proc writePart*[T: distinct](buf: var seq[byte], v: T) = - mixin writePart - writePart(buf, distinctBase(T)(v)) - -proc readPart*[T: distinct]( - r: var ReadCtx, _: typedesc[T] -): T {.raises: [ValueError].} = - mixin readPart - T(readPart(r, distinctBase(T))) - -# ── Time ──────────────────────────────────────────────────────────────── - -proc writePart*(buf: var seq[byte], t: Time) = - writePart(buf, t.toUnix()) - writePart(buf, uint32(t.nanosecond)) - -proc readPart*(r: var ReadCtx, _: typedesc[Time]): Time {.raises: [ValueError].} = - let secs = readPart(r, int64) - let nanos = int(readPart(r, uint32)) - if nanos < 0 or nanos > 999_999_999: - raise newException(ValueError, "nanosecond out of range: " & $nanos) - initTime(secs, nanos) - -# ── Containers ────────────────────────────────────────────────────────── - -proc writePart*[T](buf: var seq[byte], xs: seq[T]) = - mixin writePart - writePart(buf, uint32(xs.len)) - for x in xs: - writePart(buf, x) - -proc readPart*[T]( - r: var ReadCtx, _: typedesc[seq[T]] -): seq[T] {.raises: [ValueError].} = - mixin readPart - let nU = readPart(r, uint32) - if nU > uint32(high(int)): - raise newException(ValueError, "sequence length out of range: " & $nU) - let n = int(nU) - result = newSeqOfCap[T](n) - for _ in 0 ..< n: - result.add(readPart(r, T)) -proc writePart*[T](buf: var seq[byte], s: HashSet[T]) = - mixin writePart - writePart(buf, uint32(s.len)) - for x in s: - writePart(buf, x) - -proc readPart*[T]( - r: var ReadCtx, _: typedesc[HashSet[T]] -): HashSet[T] {.raises: [ValueError].} = - mixin readPart - let nU = readPart(r, uint32) - if nU > uint32(high(int)): - raise newException(ValueError, "set length out of range: " & $nU) - let n = int(nU) - result = initHashSet[T](max(n, 2)) - for _ in 0 ..< n: - result.incl(readPart(r, T)) -proc writePart*[T: tuple](buf: var seq[byte], v: T) = - mixin writePart - for f in fields(v): - writePart(buf, f) - -proc readPart*[T: tuple](r: var ReadCtx, _: typedesc[T]): T {.raises: [ValueError].} = - mixin readPart - for f in fields(result): - f = readPart(r, typeof(f)) - -# ── Named-struct derivation ───────────────────────────────────────────── - -proc objectRecList(tSym: NimNode): NimNode {.compileTime.} = - ## Resolve a type symbol to its object's RecList, preserving field types - ## exactly as written (getImpl, not getTypeImpl, so `HashSet[SdsMessageID]` - ## and friends stay named rather than being expanded to their structure). - var body = tSym.getImpl[2] - while body.kind in {nnkRefTy, nnkPtrTy, nnkDistinctTy}: - body = body[0] - doAssert body.kind == nnkObjectTy, - "BlobCodec: expected an object type, got " & treeRepr(body) - body[2] - -macro BlobCodec*(T: typedesc): untyped = - ## Emit `writePart`/`readPart` for a named value object `T`, encoding each - ## public field in declaration order and rebuilding via `T.init(...)`. - let tSym = getTypeInst(T)[1] - let recList = objectRecList(tSym) - - var fieldNames: seq[NimNode] - var fieldTypes: seq[NimNode] - for defs in recList: - if defs.kind != nnkIdentDefs: - continue - # Rebuild the field type from its textual form rather than splicing the - # resolved symbol: a spliced *alias* type symbol (e.g. `SdsMessageID = - # string`) is mis-resolved as a value in `readPart(r, T)`, breaking - # typedesc overload resolution. A fresh ident/expr behaves like literal - # source and resolves to a typedesc correctly. - let ftype = parseExpr(repr(defs[^2])) - for i in 0 ..< defs.len - 2: - var nameNode = defs[i] - if nameNode.kind == nnkPragmaExpr: - nameNode = nameNode[0] - if nameNode.kind == nnkPostfix: - nameNode = nameNode[1] - fieldNames.add(ident($nameNode)) - fieldTypes.add(ftype.copyNimTree) - - let bufId = ident "buf" - let vId = ident "v" - let rId = ident "r" - - # writePart(buf: var seq[byte], v: T) - var writeBody = newStmtList() - for fn in fieldNames: - writeBody.add(newCall(ident "writePart", bufId, newDotExpr(vId, fn))) - let writeProc = newProc( - name = ident "writePart", - params = [ - newEmptyNode(), - newIdentDefs( - bufId, nnkVarTy.newTree(nnkBracketExpr.newTree(ident "seq", ident "byte")) - ), - newIdentDefs(vId, tSym), - ], - body = writeBody, - ) - - # readPart(r: var ReadCtx, _: typedesc[T]): T {.raises: [ValueError].} - var readBody = newStmtList() - var tmps: seq[NimNode] - for i, ft in fieldTypes: - let tmp = genSym(nskLet, "f" & $i) - tmps.add(tmp) - readBody.add(newLetStmt(tmp, newCall(ident "readPart", rId, ft))) - readBody.add(newCall(newDotExpr(tSym, ident "init"), tmps)) - let readProc = newProc( - name = ident "readPart", - params = [ - tSym, - newIdentDefs(rId, nnkVarTy.newTree(ident "ReadCtx")), - newIdentDefs(ident "_", nnkBracketExpr.newTree(ident "typedesc", tSym)), - ], - body = readBody, - ) - readProc.addPragma( - nnkExprColonExpr.newTree(ident "raises", nnkBracket.newTree(ident "ValueError")) - ) - - result = newStmtList(writeProc, readProc) - -# ── Public entry points ───────────────────────────────────────────────── - -proc toBlob*[T](v: T): seq[byte] = - mixin writePart - result = @[] - writePart(result, v) - -proc fromBlob*[T](bytes: openArray[byte], _: typedesc[T]): T {.raises: [ValueError].} = - mixin readPart - var r = initReadCtx(bytes) - result = readPart(r, T) - if r.pos != r.buf.len: - raise newException(ValueError, "trailing payload bytes: " & $(r.buf.len - r.pos)) - -{.pop.} diff --git a/waku/persistency/sds_persistency.nim b/waku/persistency/sds_persistency.nim index 44d551814..a476171dc 100644 --- a/waku/persistency/sds_persistency.nim +++ b/waku/persistency/sds_persistency.nim @@ -1,49 +1,67 @@ -## Adapter that materialises the SDS `Persistence` contract on top of a -## waku-persistency `Job`. One `Job` (== one SQLite file, one worker thread) -## services all channels for a given SDS context; rows are namespaced by -## category and the channelId is the first key component so per-channel -## prefix scans stay cheap. +## Adapter that materialises the SDS `Persistence` contract (nim-sds 0.3.0, +## snapshot model) on top of a waku-persistency `Job`. One `Job` (== one +## SQLite file, one worker thread) services all channels for a given SDS +## context; rows are namespaced by category and the channelId is the first +## key component so per-channel prefix scans stay cheap. ## -## ## Async contract +## ## Snapshot contract (nim-sds 0.3.0) ## -## Every `Persistence` proc field is async (`proc(..): Future[void] -## {.async: (raises: []), gcsafe.}`) — SDS awaits them on its own chronos -## loop. We map each onto the matching async `Job` op: +## The fine-grained per-row callbacks of 0.2.4 are gone. SDS now persists via +## five procs, all `Future[Result[void, string]]` (load returns +## `Result[ChannelData, string]`), `{.async: (raises: []), gcsafe.}`: ## -## * **Writes (save*/remove*)** — call the fire-and-forget `Job.persist*` ops -## through the `safePut`/`safeDelete` helpers, which trap any backend error -## and log it rather than raising (the contract forbids raising). Note that -## Persistency v1 only guarantees the event has been queued when the Future -## resolves — reads immediately after an awaited write can still be racy. -## * **`dropChannel`** — awaits `doDropChannel`, which batches every row of -## the channel into one transactional `persist` (atomic when applied). -## * **`loadAllForChannel`** — awaits `doLoadAll` and returns the snapshot -## the SDS bootstrap path needs. +## * **`saveChannelMeta`** — the complete fast-changing per-channel state +## (lamport clock, outgoing/incoming buffers, both SDS-R repair buffers) +## as ONE blob. Idempotent; a missed write self-heals on the next save. +## * **`updateHistory`** — append newly-delivered messages / evict the +## oldest past the cap, applied as one transactional batch. +## * **`loadChannel`** — bootstrap: returns the prior `ChannelData` +## (meta + ordered message history) or an empty one. Surfaces errors. +## * **`dropChannel`** — wipe all state for a channel. Surfaces errors. +## * **`setRetrievalHint`** — see below; deliberate no-op here. +## +## Failure policy mirrors the interface docs: save/update/hint are non-fatal +## (we log and still return the error string); load/drop are durability-intent +## and propagate their error to the caller. +## +## ## Codec +## +## The blob transform is owned by nim-sds: `ChannelMeta` round-trips through +## `sds/snapshot_codec` (protobuf, schema-versioned — refuses unknown +## versions), and each persisted `SdsMessage` log row through the SDS wire +## codec in `sds/protobuf`. We do not maintain a second codec for these +## shapes (the previous `payload_codec`/`BlobCodec` path is retired). +## +## ## Retrieval hints +## +## `setRetrievalHint` is intentionally a no-op: persisted hints are never read +## back — `loadChannel` returns `ChannelData` (meta + messageHistory) with no +## hint field, and `ChannelMeta` carries none. Hints are supplied live via the +## `onRetrievalHint` provider, so persisting them would be write-only dead +## data. The closure still exists because the field is required by the +## `Persistence` object (SDS calls it from `getRecentHistoryEntries`). ## ## ## Storage layout ## -## | Category | Key | Value | -## |------------------|------------------------------|-----------------------------------------| -## | `sds.lamport` | `key(channelId)` | 8 BE bytes (int64) | -## | `sds.log` | `key(channelId, msgId)` | encoded `SdsMessage` | -## | `sds.hint` | `key(msgId)` | raw hint bytes | -## | `sds.outgoing` | `key(channelId, msgId)` | encoded `UnacknowledgedMessage` | -## | `sds.incoming` | `key(channelId, msgId)` | encoded `IncomingMessage` | -## | `sds.outRepair` | `key(channelId, msgId)` | `(msgId, OutgoingRepairEntry)` encoded | -## | `sds.inRepair` | `key(channelId, msgId)` | `(msgId, IncomingRepairEntry)` encoded | +## | Category | Key | Value | +## |---------------|--------------------------|----------------------------------------| +## | `sds.meta` | `key(channelId)` | `ChannelMeta` (snapshot_codec protobuf)| +## | `sds.log` | `key(channelId, msgId)` | `SdsMessage` (sds wire protobuf) | ## ## `messageHistory` is reconstructed in memory by sorting on ## `(lamportTimestamp, messageId)` — the same total order SDS uses for -## delivery (see sds/sds_utils.nim). Insertion order is not relied upon, so -## `removeLogEntry` works with the natural `(channelId, msgId)` key. +## delivery (see sds/sds_utils.nim). {.push raises: [].} -import std/[algorithm, options, sets, times] +import std/[algorithm, options] import chronos, chronicles, results +import libp2p/protobuf/minprotobuf import ./persistency -import ./payload_codec -import sds/types/persistence +import ./keys +import types/persistence +import snapshot_codec +import protobuf export persistence, persistency @@ -51,155 +69,8 @@ logScope: topics = "sds-persistency" const - CatLamport* = "sds.lamport" + CatMeta* = "sds.meta" CatLog* = "sds.log" - CatHint* = "sds.hint" - CatOutgoing* = "sds.outgoing" - CatIncoming* = "sds.incoming" - CatOutRepair* = "sds.outRepair" - CatInRepair* = "sds.inRepair" - -# ── Blob codecs ───────────────────────────────────────────────────────── -# -# All SDS payload types round-trip through the generic, length-prefixed -# codec in payload_codec.nim. Each `BlobCodec(T)` emits writePart/ -# readPart for `T` from its public fields (declaration order) and rebuilds -# via `T.init(...)`. Order matters: a field's type must be derived before -# the struct that contains it. The repair buffers store `(msgId, entry)` -# tuples, handled by the generic tuple codec. Lamport is a bare int64. - -BlobCodec(HistoryEntry) -BlobCodec(SdsMessage) -BlobCodec(UnacknowledgedMessage) -BlobCodec(IncomingMessage) -BlobCodec(OutgoingRepairEntry) -BlobCodec(IncomingRepairEntry) - -# ── Write helpers ─────────────────────────────────────────────────────── -# -# The Persistence write fields are async with `raises: []`, but the Job ops -# raise `CatchableError`. These wrappers trap and log so the closures stay -# raise-free, preserving the "errors are logged, never raised" contract. - -proc safePut( - job: Job, category: string, k: Key, payload: seq[byte] -) {.async: (raises: []).} = - try: - await job.persistPut(category, k, payload) - except CatchableError as e: - warn "sds-persistency: put failed", category, err = e.msg - -proc safeDelete(job: Job, category: string, k: Key) {.async: (raises: []).} = - try: - await job.persistDelete(category, k) - except CatchableError as e: - warn "sds-persistency: delete failed", category, err = e.msg - -proc safePersist(job: Job, ops: seq[TxOp]) {.async: (raises: []).} = - try: - await job.persist(ops) - except CatchableError as e: - warn "sds-persistency: persist batch failed", opCount = ops.len, err = e.msg - -# ── Async backing procs ───────────────────────────────────────────────── - -proc doLoadAll(job: Job, channelId: SdsChannelID): Future[ChannelSnapshot] {.async.} = - var snap = ChannelSnapshot() - let chanKey = toKey(channelId) - - block lamport: - let opt = (await job.get(CatLamport, chanKey)).valueOr: - warn "sds-persistency: get lamport failed", channelId, err = $error - break lamport - if opt.isSome: - try: - snap.lamportTimestamp = fromBlob(opt.get, int64) - except ValueError as e: - warn "sds-persistency: invalid lamport bytes", channelId, err = e.msg - - block log: - let rows = (await job.scanPrefix(CatLog, chanKey)).valueOr: - warn "sds-persistency: scan log failed", channelId, err = $error - break log - var msgs = newSeq[SdsMessage]() - for row in rows: - try: - msgs.add(fromBlob(row.payload, SdsMessage)) - except ValueError as e: - warn "sds-persistency: invalid log row", channelId, err = e.msg - msgs.sort do(a, b: SdsMessage) -> int: - result = cmp(a.lamportTimestamp, b.lamportTimestamp) - if result == 0: - result = cmp(a.messageId, b.messageId) - snap.messageHistory = msgs - - block outgoing: - let rows = (await job.scanPrefix(CatOutgoing, chanKey)).valueOr: - warn "sds-persistency: scan outgoing failed", channelId, err = $error - break outgoing - for row in rows: - try: - snap.outgoingBuffer.add(fromBlob(row.payload, UnacknowledgedMessage)) - except ValueError as e: - warn "sds-persistency: invalid outgoing row", channelId, err = e.msg - - block incoming: - let rows = (await job.scanPrefix(CatIncoming, chanKey)).valueOr: - warn "sds-persistency: scan incoming failed", channelId, err = $error - break incoming - for row in rows: - try: - snap.incomingBuffer.add(fromBlob(row.payload, IncomingMessage)) - except ValueError as e: - warn "sds-persistency: invalid incoming row", channelId, err = e.msg - - block outRepair: - let rows = (await job.scanPrefix(CatOutRepair, chanKey)).valueOr: - warn "sds-persistency: scan out-repair failed", channelId, err = $error - break outRepair - for row in rows: - try: - snap.outgoingRepairBuffer.add( - fromBlob(row.payload, (SdsMessageID, OutgoingRepairEntry)) - ) - except ValueError as e: - warn "sds-persistency: invalid out-repair row", channelId, err = e.msg - - block inRepair: - let rows = (await job.scanPrefix(CatInRepair, chanKey)).valueOr: - warn "sds-persistency: scan in-repair failed", channelId, err = $error - break inRepair - for row in rows: - try: - snap.incomingRepairBuffer.add( - fromBlob(row.payload, (SdsMessageID, IncomingRepairEntry)) - ) - except ValueError as e: - warn "sds-persistency: invalid in-repair row", channelId, err = e.msg - - return snap - -proc doDropChannel(job: Job, channelId: SdsChannelID): Future[void] {.async.} = - ## Delete every row belonging to the channel in one transactional batch. - ## Uses txDeletePrefix to push bulk deletes to the worker thread — no - ## caller-side scans needed. Hint rows (keyed by msgId, not channelId) - ## are not cleaned here; they are cascade-deleted by removeLogEntry during - ## normal rolling-history eviction, so by the time a channel is dropped - ## the only remaining hints belong to the still-live log tail. Those - ## become harmless orphans (never reloaded — hints are re-derived on - ## demand from the onRetrievalHint callback). - let chanKey = toKey(channelId) - await safePersist( - job, - @[ - TxOp(category: CatLog, key: chanKey, kind: txDeletePrefix), - TxOp(category: CatOutgoing, key: chanKey, kind: txDeletePrefix), - TxOp(category: CatIncoming, key: chanKey, kind: txDeletePrefix), - TxOp(category: CatOutRepair, key: chanKey, kind: txDeletePrefix), - TxOp(category: CatInRepair, key: chanKey, kind: txDeletePrefix), - TxOp(category: CatLamport, key: chanKey, kind: txDelete), - ], - ) # ── Public factory ────────────────────────────────────────────────────── @@ -207,102 +78,106 @@ proc newSdsPersistence*(job: Job): Persistence {.gcsafe, raises: [].} = ## Build an SDS `Persistence` value backed by ``job``. One Job services ## all channels — channelId is part of every key. ## - ## The closures capture ``job`` by ref. They must be invoked from a - ## thread that owns a running chronos loop (the SDS context's worker - ## thread satisfies this). + ## The closures capture ``job`` by ref. They must be invoked from a thread + ## that owns a running chronos loop (the SDS context's worker thread + ## satisfies this). doAssert not job.isNil, "newSdsPersistence: job is nil" # Built field-by-field via assignment rather than an object literal: every - # field is an async closure whose body is an `await`/`return await` command - # call, which cannot be followed by the `,` field separator a `Persistence( - # ..)` literal would require (the parser cannot tell the comma from another - # command argument). Assignments have no separator, so the bodies stay plain. + # field is an async closure whose body uses `await`/`return` statements, + # which cannot be followed by the `,` field separator a `Persistence(..)` + # literal would require. Assignments have no separator, so bodies stay plain. var persistence = Persistence() - persistence.saveLamport = proc( - channelId: SdsChannelID, lamport: int64 - ): Future[void] {.async: (raises: []), gcsafe.} = - await safePut(job, CatLamport, toKey(channelId), toBlob(lamport)) + persistence.saveChannelMeta = proc( + channelId: SdsChannelID, meta: ChannelMeta + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} = + try: + await job.persistPut(CatMeta, toKey(channelId), encode(meta).buffer) + return ok() + except CatchableError as e: + warn "sds-persistency: saveChannelMeta failed", channelId, err = e.msg + return err(e.msg) - persistence.appendLogEntry = proc( - channelId: SdsChannelID, msg: SdsMessage - ): Future[void] {.async: (raises: []), gcsafe.} = - await safePut(job, CatLog, key(channelId, msg.messageId), toBlob(msg)) + persistence.updateHistory = proc( + channelId: SdsChannelID, update: HistoryUpdate + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} = + if update.isEmpty: + return ok() + # One transactional batch: append rows (txPut) and evictions (txDelete). + var ops = newSeq[TxOp]() + for m in update.append: + ops.add TxOp( + category: CatLog, + key: key(channelId, m.messageId), + kind: txPut, + payload: encode(m).buffer, + ) + for id in update.evict: + ops.add TxOp(category: CatLog, key: key(channelId, id), kind: txDelete) + try: + await job.persist(ops) + return ok() + except CatchableError as e: + warn "sds-persistency: updateHistory failed", + channelId, appended = update.append.len, evicted = update.evict.len, err = e.msg + return err(e.msg) - persistence.removeLogEntry = proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[void] {.async: (raises: []), gcsafe.} = - # Atomic batch: delete the log row and its associated retrieval hint in - # one transaction so they can't diverge. - await safePersist( - job, - @[ - TxOp(category: CatLog, key: key(channelId, msgId), kind: txDelete), - TxOp(category: CatHint, key: toKey(msgId), kind: txDelete), - ], - ) + persistence.loadChannel = proc( + channelId: SdsChannelID + ): Future[Result[ChannelData, string]] {.async: (raises: []), gcsafe.} = + let chanKey = toKey(channelId) + var data = ChannelData.init() + try: + block meta: + let opt = (await job.get(CatMeta, chanKey)).valueOr: + return err("loadChannel: get meta: " & $error) + if opt.isSome: + # schema-versioned decode; refuses unknown versions loudly. + data.meta = ChannelMeta.decode(opt.get).valueOr: + return err("loadChannel: corrupt or unsupported ChannelMeta blob") - persistence.setRetrievalHint = proc( - msgId: SdsMessageID, hint: seq[byte] - ): Future[void] {.async: (raises: []), gcsafe.} = - await safePut(job, CatHint, toKey(msgId), hint) + block history: + let rows = (await job.scanPrefix(CatLog, chanKey)).valueOr: + return err("loadChannel: scan log: " & $error) + var msgs = newSeq[SdsMessage]() + for row in rows: + let m = SdsMessage.decode(row.payload).valueOr: + warn "sds-persistency: skipping undecodable log row", channelId + continue + msgs.add(m) + msgs.sort do(a, b: SdsMessage) -> int: + result = cmp(a.lamportTimestamp, b.lamportTimestamp) + if result == 0: + result = cmp(a.messageId, b.messageId) + data.messageHistory = msgs - persistence.saveOutgoing = proc( - channelId: SdsChannelID, msg: UnacknowledgedMessage - ): Future[void] {.async: (raises: []), gcsafe.} = - await safePut(job, CatOutgoing, key(channelId, msg.message.messageId), toBlob(msg)) - - persistence.removeOutgoing = proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[void] {.async: (raises: []), gcsafe.} = - await safeDelete(job, CatOutgoing, key(channelId, msgId)) - - persistence.saveIncoming = proc( - channelId: SdsChannelID, msg: IncomingMessage - ): Future[void] {.async: (raises: []), gcsafe.} = - await safePut(job, CatIncoming, key(channelId, msg.message.messageId), toBlob(msg)) - - persistence.removeIncoming = proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[void] {.async: (raises: []), gcsafe.} = - await safeDelete(job, CatIncoming, key(channelId, msgId)) - - persistence.saveOutgoingRepair = proc( - channelId: SdsChannelID, msgId: SdsMessageID, entry: OutgoingRepairEntry - ): Future[void] {.async: (raises: []), gcsafe.} = - await safePut(job, CatOutRepair, key(channelId, msgId), toBlob((msgId, entry))) - - persistence.removeOutgoingRepair = proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[void] {.async: (raises: []), gcsafe.} = - await safeDelete(job, CatOutRepair, key(channelId, msgId)) - - persistence.saveIncomingRepair = proc( - channelId: SdsChannelID, msgId: SdsMessageID, entry: IncomingRepairEntry - ): Future[void] {.async: (raises: []), gcsafe.} = - await safePut(job, CatInRepair, key(channelId, msgId), toBlob((msgId, entry))) - - persistence.removeIncomingRepair = proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[void] {.async: (raises: []), gcsafe.} = - await safeDelete(job, CatInRepair, key(channelId, msgId)) + return ok(data) + except CatchableError as e: + return err("loadChannel: " & e.msg) persistence.dropChannel = proc( channelId: SdsChannelID - ): Future[void] {.async: (raises: []), gcsafe.} = + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} = + let chanKey = toKey(channelId) try: - await doDropChannel(job, channelId) + await job.persist( + @[ + TxOp(category: CatLog, key: chanKey, kind: txDeletePrefix), + TxOp(category: CatMeta, key: chanKey, kind: txDelete), + ] + ) + return ok() except CatchableError as e: error "sds-persistency: dropChannel failed", channelId, err = e.msg + return err(e.msg) - persistence.loadAllForChannel = proc( - channelId: SdsChannelID - ): Future[ChannelSnapshot] {.async: (raises: []), gcsafe.} = - try: - return await doLoadAll(job, channelId) - except CatchableError as e: - error "sds-persistency: loadAllForChannel failed", channelId, err = e.msg - return ChannelSnapshot() + persistence.setRetrievalHint = proc( + msgId: SdsMessageID, hint: seq[byte] + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} = + ## Deliberate no-op — persisted hints are never read back (see module + ## header). Hints are supplied live via the onRetrievalHint provider. + return ok() return persistence