mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-06-04 05:00:02 +00:00
Utilizing async SDS, enhancement on persistency to simplify buffern serde
This commit is contained in:
parent
c4bff9de0c
commit
4a8e9fe553
@ -5,5 +5,6 @@ import ./test_backend
|
||||
import ./test_lifecycle
|
||||
import ./test_facade
|
||||
import ./test_encoding
|
||||
import ./test_blob_codec
|
||||
import ./test_string_lookup
|
||||
import ./test_singleton
|
||||
|
||||
87
tests/persistency/test_blob_codec.nim
Normal file
87
tests/persistency/test_blob_codec.nim
Normal file
@ -0,0 +1,87 @@
|
||||
{.used.}
|
||||
|
||||
## Round-trip tests for the generic payload blob codec (payload_codec.nim)
|
||||
## and its `BlobCodec` application to the SDS persistence types.
|
||||
##
|
||||
## Importing `sds_persistency` forces the real adapter (and its six
|
||||
## `BlobCodec` calls + call sites) to compile. The local derivations
|
||||
## below live in this module's own scope and drive the round-trip checks.
|
||||
|
||||
import std/[sets, times]
|
||||
import testutils/unittests
|
||||
import waku/persistency/payload_codec
|
||||
import waku/persistency/sds_persistency # compile-checks the real adapter
|
||||
import sds/types/persistence
|
||||
|
||||
# Same order constraint as the adapter: a field's type before its container.
|
||||
BlobCodec(HistoryEntry)
|
||||
BlobCodec(SdsMessage)
|
||||
BlobCodec(UnacknowledgedMessage)
|
||||
BlobCodec(IncomingMessage)
|
||||
BlobCodec(OutgoingRepairEntry)
|
||||
BlobCodec(IncomingRepairEntry)
|
||||
|
||||
proc sampleHistory(): HistoryEntry =
|
||||
HistoryEntry.init("mid-1", @[0xAA'u8, 0xBB, 0xCC], "sender-1".SdsParticipantID)
|
||||
|
||||
proc sampleMessage(content = @[1'u8, 2, 3]): SdsMessage =
|
||||
SdsMessage.init(
|
||||
messageId = "msg-42",
|
||||
lamportTimestamp = 7'i64,
|
||||
causalHistory = @[sampleHistory(), HistoryEntry.init("mid-2")],
|
||||
channelId = "channel-9",
|
||||
content = content,
|
||||
bloomFilter = @[0xDE'u8, 0xAD, 0xBE, 0xEF],
|
||||
senderId = "alice".SdsParticipantID,
|
||||
repairRequest = @[sampleHistory()],
|
||||
)
|
||||
|
||||
suite "Persistency blob codec":
|
||||
test "primitive round-trips":
|
||||
check fromBlob(toBlob(7'i64), int64) == 7'i64
|
||||
check fromBlob(toBlob(int64.low), int64) == int64.low
|
||||
check fromBlob(toBlob("héllo"), string) == "héllo"
|
||||
check fromBlob(toBlob(@[0'u8, 255, 7]), seq[byte]) == @[0'u8, 255, 7]
|
||||
|
||||
test "HistoryEntry round-trips":
|
||||
let h = sampleHistory()
|
||||
check fromBlob(toBlob(h), HistoryEntry) == h
|
||||
|
||||
test "SdsMessage round-trips (seqs, distinct, nested)":
|
||||
let m = sampleMessage()
|
||||
check fromBlob(toBlob(m), SdsMessage) == m
|
||||
|
||||
test "UnacknowledgedMessage round-trips (Time, int)":
|
||||
let u = UnacknowledgedMessage.init(sampleMessage(), initTime(1_700_000_000, 123), 4)
|
||||
check fromBlob(toBlob(u), UnacknowledgedMessage) == u
|
||||
|
||||
test "IncomingMessage round-trips (HashSet)":
|
||||
let inc = IncomingMessage.init(sampleMessage(), toHashSet(["dep-a", "dep-b", "dep-c"]))
|
||||
check fromBlob(toBlob(inc), IncomingMessage) == inc
|
||||
|
||||
test "repair tuples round-trip":
|
||||
let outPair =
|
||||
("msg-42".SdsMessageID, OutgoingRepairEntry.init(sampleHistory(), initTime(10, 0)))
|
||||
check fromBlob(toBlob(outPair), (SdsMessageID, OutgoingRepairEntry)) == outPair
|
||||
|
||||
let inPair = (
|
||||
"msg-42".SdsMessageID,
|
||||
IncomingRepairEntry.init(sampleHistory(), @[9'u8, 8, 7], initTime(20, 500)),
|
||||
)
|
||||
check fromBlob(toBlob(inPair), (SdsMessageID, IncomingRepairEntry)) == inPair
|
||||
|
||||
test "payload exceeds the old 64 KiB key cap (4-byte length)":
|
||||
var big = newSeq[byte](70_000)
|
||||
for i in 0 ..< big.len:
|
||||
big[i] = byte(i and 0xFF)
|
||||
let m = sampleMessage(content = big)
|
||||
let decoded = fromBlob(toBlob(m), SdsMessage)
|
||||
check decoded.content.len == 70_000
|
||||
check decoded == m
|
||||
|
||||
test "truncated input raises ValueError":
|
||||
let bytes = toBlob(sampleMessage())
|
||||
expect ValueError:
|
||||
discard fromBlob(bytes[0 ..< bytes.len - 5], SdsMessage)
|
||||
expect ValueError:
|
||||
discard fromBlob(@[0xFF'u8, 0xFF, 0xFF, 0xFF], string) # claims 4 GiB
|
||||
308
waku/persistency/payload_codec.nim
Normal file
308
waku/persistency/payload_codec.nim
Normal file
@ -0,0 +1,308 @@
|
||||
## Generic length-prefixed blob codec for persistency payloads.
|
||||
##
|
||||
## Symmetric counterpart to `keys.nim`'s `encodePart`: every persisted value
|
||||
## round-trips through `writePart`/`readPart` over a `ReadCtx` cursor. Unlike
|
||||
## keys, payloads are not byte-wise sort-stable, so strings and byte blobs use
|
||||
## a **4-byte BE length prefix** (4 GiB ceiling) instead of keys.nim's 2-byte
|
||||
## (64 KiB) prefix — the cap that originally forced SDS to hand-roll its codec.
|
||||
##
|
||||
## ## How a type opts in
|
||||
##
|
||||
## Primitives, `string`, `seq[byte]`, `enum`, `distinct`, `Time`, `seq[T]`,
|
||||
## `HashSet[T]` and tuples already have codecs here. A **named struct** opts in
|
||||
## with a single line:
|
||||
##
|
||||
## ```nim
|
||||
## BlobCodec(MyType)
|
||||
## ```
|
||||
##
|
||||
## which emits both `writePart`/`readPart` for `MyType` from its fields, in
|
||||
## declaration order, and reconstructs the value via `MyType.init(...)`
|
||||
## (positional). This is the *only* mechanism for structs — there is
|
||||
## deliberately no `fieldPairs`/default-construct path, so `{.requiresInit.}`
|
||||
## types (which cannot be zero-initialised) work unchanged. The contract:
|
||||
##
|
||||
## * the type is a value object whose `init` takes its fields positionally in
|
||||
## declaration order;
|
||||
## * only public fields participate (private fields are invisible to the
|
||||
## macro and would be dropped — don't persist such types);
|
||||
## * `BlobCodec` must be called for a field's type *before* the struct
|
||||
## that contains it (Nim resolves the concrete `writePart`/`readPart`
|
||||
## top-down).
|
||||
##
|
||||
## ## Entry points
|
||||
##
|
||||
## `toBlob(v)` → `seq[byte]`, `fromBlob(bytes, T)` → `T` (raises `ValueError`
|
||||
## on truncated/corrupt input).
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[macros, sets, times, typetraits]
|
||||
|
||||
type ReadCtx* = object
|
||||
buf*: seq[byte]
|
||||
pos*: int
|
||||
|
||||
proc initReadCtx*(bytes: openArray[byte]): ReadCtx =
|
||||
ReadCtx(buf: @bytes, pos: 0)
|
||||
|
||||
proc need(r: ReadCtx, n: int) {.raises: [ValueError].} =
|
||||
if n < 0 or r.pos + n > r.buf.len:
|
||||
raise newException(ValueError, "truncated payload: need " & $n & " more bytes")
|
||||
|
||||
# ── Fixed-width integers ────────────────────────────────────────────────
|
||||
|
||||
proc writePart*(buf: var seq[byte], v: uint32) =
|
||||
buf.add(byte((v shr 24) and 0xFF'u32))
|
||||
buf.add(byte((v shr 16) and 0xFF'u32))
|
||||
buf.add(byte((v shr 8) and 0xFF'u32))
|
||||
buf.add(byte(v and 0xFF'u32))
|
||||
|
||||
proc readPart*(r: var ReadCtx, _: typedesc[uint32]): uint32 {.raises: [ValueError].} =
|
||||
r.need(4)
|
||||
result =
|
||||
(uint32(r.buf[r.pos]) shl 24) or (uint32(r.buf[r.pos + 1]) shl 16) or
|
||||
(uint32(r.buf[r.pos + 2]) shl 8) or uint32(r.buf[r.pos + 3])
|
||||
r.pos += 4
|
||||
|
||||
proc writePart*(buf: var seq[byte], v: int64) =
|
||||
let u = cast[uint64](v)
|
||||
for shift in countdown(56, 0, 8):
|
||||
buf.add(byte((u shr shift) and 0xFF'u64))
|
||||
|
||||
proc readPart*(r: var ReadCtx, _: typedesc[int64]): int64 {.raises: [ValueError].} =
|
||||
r.need(8)
|
||||
var u: uint64 = 0
|
||||
for i in 0 ..< 8:
|
||||
u = (u shl 8) or uint64(r.buf[r.pos + i])
|
||||
r.pos += 8
|
||||
cast[int64](u)
|
||||
|
||||
proc writePart*(buf: var seq[byte], v: int) =
|
||||
writePart(buf, int64(v))
|
||||
|
||||
proc readPart*(r: var ReadCtx, _: typedesc[int]): int {.raises: [ValueError].} =
|
||||
int(readPart(r, int64))
|
||||
|
||||
# ── Small scalars ───────────────────────────────────────────────────────
|
||||
|
||||
proc writePart*(buf: var seq[byte], v: bool) =
|
||||
buf.add(if v: 1'u8 else: 0'u8)
|
||||
|
||||
proc readPart*(r: var ReadCtx, _: typedesc[bool]): bool {.raises: [ValueError].} =
|
||||
r.need(1)
|
||||
result = r.buf[r.pos] != 0'u8
|
||||
r.pos += 1
|
||||
|
||||
proc writePart*(buf: var seq[byte], v: byte) =
|
||||
buf.add(v)
|
||||
|
||||
proc readPart*(r: var ReadCtx, _: typedesc[byte]): byte {.raises: [ValueError].} =
|
||||
r.need(1)
|
||||
result = r.buf[r.pos]
|
||||
r.pos += 1
|
||||
|
||||
proc writePart*(buf: var seq[byte], v: char) =
|
||||
buf.add(byte(v))
|
||||
|
||||
proc readPart*(r: var ReadCtx, _: typedesc[char]): char {.raises: [ValueError].} =
|
||||
r.need(1)
|
||||
result = char(r.buf[r.pos])
|
||||
r.pos += 1
|
||||
|
||||
proc writePart*[E: enum](buf: var seq[byte], v: E) =
|
||||
writePart(buf, int64(ord(v)))
|
||||
|
||||
proc readPart*[E: enum](r: var ReadCtx, _: typedesc[E]): E {.raises: [ValueError].} =
|
||||
E(readPart(r, int64))
|
||||
|
||||
# ── string / seq[byte] (4-byte length) ──────────────────────────────────
|
||||
|
||||
proc writePart*(buf: var seq[byte], s: string) =
|
||||
writePart(buf, uint32(s.len))
|
||||
for c in s:
|
||||
buf.add(byte(c))
|
||||
|
||||
proc readPart*(r: var ReadCtx, _: typedesc[string]): string {.raises: [ValueError].} =
|
||||
let n = int(readPart(r, uint32))
|
||||
r.need(n)
|
||||
result = newString(n)
|
||||
for i in 0 ..< n:
|
||||
result[i] = char(r.buf[r.pos + i])
|
||||
r.pos += n
|
||||
|
||||
proc writePart*(buf: var seq[byte], b: seq[byte]) =
|
||||
writePart(buf, uint32(b.len))
|
||||
for x in b:
|
||||
buf.add(x)
|
||||
|
||||
proc readPart*(r: var ReadCtx, _: typedesc[seq[byte]]): seq[byte] {.raises: [ValueError].} =
|
||||
let n = int(readPart(r, uint32))
|
||||
r.need(n)
|
||||
result = newSeq[byte](n)
|
||||
for i in 0 ..< n:
|
||||
result[i] = r.buf[r.pos + i]
|
||||
r.pos += n
|
||||
|
||||
# ── distinct (e.g. SdsParticipantID = distinct string) ──────────────────
|
||||
|
||||
proc writePart*[T: distinct](buf: var seq[byte], v: T) =
|
||||
mixin writePart
|
||||
writePart(buf, distinctBase(T)(v))
|
||||
|
||||
proc readPart*[T: distinct](
|
||||
r: var ReadCtx, _: typedesc[T]
|
||||
): T {.raises: [ValueError].} =
|
||||
mixin readPart
|
||||
T(readPart(r, distinctBase(T)))
|
||||
|
||||
# ── Time ────────────────────────────────────────────────────────────────
|
||||
|
||||
proc writePart*(buf: var seq[byte], t: Time) =
|
||||
writePart(buf, t.toUnix())
|
||||
writePart(buf, uint32(t.nanosecond))
|
||||
|
||||
proc readPart*(r: var ReadCtx, _: typedesc[Time]): Time {.raises: [ValueError].} =
|
||||
let secs = readPart(r, int64)
|
||||
let nanos = int(readPart(r, uint32))
|
||||
if nanos < 0 or nanos > 999_999_999:
|
||||
raise newException(ValueError, "nanosecond out of range: " & $nanos)
|
||||
initTime(secs, nanos)
|
||||
|
||||
# ── Containers ──────────────────────────────────────────────────────────
|
||||
|
||||
proc writePart*[T](buf: var seq[byte], xs: seq[T]) =
|
||||
mixin writePart
|
||||
writePart(buf, uint32(xs.len))
|
||||
for x in xs:
|
||||
writePart(buf, x)
|
||||
|
||||
proc readPart*[T](
|
||||
r: var ReadCtx, _: typedesc[seq[T]]
|
||||
): seq[T] {.raises: [ValueError].} =
|
||||
mixin readPart
|
||||
let n = int(readPart(r, uint32))
|
||||
result = newSeqOfCap[T](n)
|
||||
for _ in 0 ..< n:
|
||||
result.add(readPart(r, T))
|
||||
|
||||
proc writePart*[T](buf: var seq[byte], s: HashSet[T]) =
|
||||
mixin writePart
|
||||
writePart(buf, uint32(s.len))
|
||||
for x in s:
|
||||
writePart(buf, x)
|
||||
|
||||
proc readPart*[T](
|
||||
r: var ReadCtx, _: typedesc[HashSet[T]]
|
||||
): HashSet[T] {.raises: [ValueError].} =
|
||||
mixin readPart
|
||||
let n = int(readPart(r, uint32))
|
||||
result = initHashSet[T](max(n, 2))
|
||||
for _ in 0 ..< n:
|
||||
result.incl(readPart(r, T))
|
||||
|
||||
proc writePart*[T: tuple](buf: var seq[byte], v: T) =
|
||||
mixin writePart
|
||||
for f in fields(v):
|
||||
writePart(buf, f)
|
||||
|
||||
proc readPart*[T: tuple](
|
||||
r: var ReadCtx, _: typedesc[T]
|
||||
): T {.raises: [ValueError].} =
|
||||
mixin readPart
|
||||
for f in fields(result):
|
||||
f = readPart(r, typeof(f))
|
||||
|
||||
# ── Named-struct derivation ─────────────────────────────────────────────
|
||||
|
||||
proc objectRecList(tSym: NimNode): NimNode {.compileTime.} =
|
||||
## Resolve a type symbol to its object's RecList, preserving field types
|
||||
## exactly as written (getImpl, not getTypeImpl, so `HashSet[SdsMessageID]`
|
||||
## and friends stay named rather than being expanded to their structure).
|
||||
var body = tSym.getImpl[2]
|
||||
while body.kind in {nnkRefTy, nnkPtrTy, nnkDistinctTy}:
|
||||
body = body[0]
|
||||
doAssert body.kind == nnkObjectTy,
|
||||
"BlobCodec: expected an object type, got " & treeRepr(body)
|
||||
body[2]
|
||||
|
||||
macro BlobCodec*(T: typedesc): untyped =
|
||||
## Emit `writePart`/`readPart` for a named value object `T`, encoding each
|
||||
## public field in declaration order and rebuilding via `T.init(...)`.
|
||||
let tSym = getTypeInst(T)[1]
|
||||
let recList = objectRecList(tSym)
|
||||
|
||||
var fieldNames: seq[NimNode]
|
||||
var fieldTypes: seq[NimNode]
|
||||
for defs in recList:
|
||||
if defs.kind != nnkIdentDefs:
|
||||
continue
|
||||
# Rebuild the field type from its textual form rather than splicing the
|
||||
# resolved symbol: a spliced *alias* type symbol (e.g. `SdsMessageID =
|
||||
# string`) is mis-resolved as a value in `readPart(r, T)`, breaking
|
||||
# typedesc overload resolution. A fresh ident/expr behaves like literal
|
||||
# source and resolves to a typedesc correctly.
|
||||
let ftype = parseExpr(repr(defs[^2]))
|
||||
for i in 0 ..< defs.len - 2:
|
||||
var nameNode = defs[i]
|
||||
if nameNode.kind == nnkPragmaExpr:
|
||||
nameNode = nameNode[0]
|
||||
if nameNode.kind == nnkPostfix:
|
||||
nameNode = nameNode[1]
|
||||
fieldNames.add(ident($nameNode))
|
||||
fieldTypes.add(ftype.copyNimTree)
|
||||
|
||||
let bufId = ident "buf"
|
||||
let vId = ident "v"
|
||||
let rId = ident "r"
|
||||
|
||||
# writePart(buf: var seq[byte], v: T)
|
||||
var writeBody = newStmtList()
|
||||
for fn in fieldNames:
|
||||
writeBody.add(newCall(ident "writePart", bufId, newDotExpr(vId, fn)))
|
||||
let writeProc = newProc(
|
||||
name = ident "writePart",
|
||||
params = [
|
||||
newEmptyNode(),
|
||||
newIdentDefs(bufId, nnkVarTy.newTree(nnkBracketExpr.newTree(ident "seq", ident "byte"))),
|
||||
newIdentDefs(vId, tSym),
|
||||
],
|
||||
body = writeBody,
|
||||
)
|
||||
|
||||
# readPart(r: var ReadCtx, _: typedesc[T]): T {.raises: [ValueError].}
|
||||
var readBody = newStmtList()
|
||||
var tmps: seq[NimNode]
|
||||
for i, ft in fieldTypes:
|
||||
let tmp = genSym(nskLet, "f" & $i)
|
||||
tmps.add(tmp)
|
||||
readBody.add(newLetStmt(tmp, newCall(ident "readPart", rId, ft)))
|
||||
readBody.add(newCall(newDotExpr(tSym, ident "init"), tmps))
|
||||
let readProc = newProc(
|
||||
name = ident "readPart",
|
||||
params = [
|
||||
tSym,
|
||||
newIdentDefs(rId, nnkVarTy.newTree(ident "ReadCtx")),
|
||||
newIdentDefs(ident "_", nnkBracketExpr.newTree(ident "typedesc", tSym)),
|
||||
],
|
||||
body = readBody,
|
||||
)
|
||||
readProc.addPragma(nnkExprColonExpr.newTree(
|
||||
ident "raises", nnkBracket.newTree(ident "ValueError")
|
||||
))
|
||||
|
||||
result = newStmtList(writeProc, readProc)
|
||||
|
||||
# ── Public entry points ─────────────────────────────────────────────────
|
||||
|
||||
proc toBlob*[T](v: T): seq[byte] =
|
||||
mixin writePart
|
||||
result = @[]
|
||||
writePart(result, v)
|
||||
|
||||
proc fromBlob*[T](bytes: openArray[byte], _: typedesc[T]): T {.raises: [ValueError].} =
|
||||
mixin readPart
|
||||
var r = initReadCtx(bytes)
|
||||
readPart(r, T)
|
||||
|
||||
{.pop.}
|
||||
323
waku/persistency/sds_persistency.nim
Normal file
323
waku/persistency/sds_persistency.nim
Normal file
@ -0,0 +1,323 @@
|
||||
## Adapter that materialises the SDS `Persistence` contract on top of a
|
||||
## waku-persistency `Job`. One `Job` (== one SQLite file, one worker thread)
|
||||
## services all channels for a given SDS context; rows are namespaced by
|
||||
## category and the channelId is the first key component so per-channel
|
||||
## prefix scans stay cheap.
|
||||
##
|
||||
## ## Async contract
|
||||
##
|
||||
## Every `Persistence` proc field is async (`proc(..): Future[void]
|
||||
## {.async: (raises: []), gcsafe.}`) — SDS awaits them on its own chronos
|
||||
## loop. We map each onto the matching async `Job` op:
|
||||
##
|
||||
## * **Writes (save*/remove*)** — `await` the Job op through the `safePut`/
|
||||
## `safeDelete` helpers, which trap any backend error and log it rather
|
||||
## than raising (the contract forbids raising).
|
||||
## * **`dropChannel`** — awaits `doDropChannel`, which batches every row of
|
||||
## the channel into one transactional `persist` so the wipe is atomic
|
||||
## (called from removeChannel / resetReliabilityManager).
|
||||
## * **`loadAllForChannel`** — awaits `doLoadAll` and returns the snapshot
|
||||
## the SDS bootstrap path needs.
|
||||
##
|
||||
## ## Storage layout
|
||||
##
|
||||
## | Category | Key | Value |
|
||||
## |------------------|------------------------------|-----------------------------------------|
|
||||
## | `sds.lamport` | `key(channelId)` | 8 BE bytes (int64) |
|
||||
## | `sds.log` | `key(channelId, msgId)` | encoded `SdsMessage` |
|
||||
## | `sds.hint` | `key(msgId)` | raw hint bytes |
|
||||
## | `sds.outgoing` | `key(channelId, msgId)` | encoded `UnacknowledgedMessage` |
|
||||
## | `sds.incoming` | `key(channelId, msgId)` | encoded `IncomingMessage` |
|
||||
## | `sds.outRepair` | `key(channelId, msgId)` | `(msgId, OutgoingRepairEntry)` encoded |
|
||||
## | `sds.inRepair` | `key(channelId, msgId)` | `(msgId, IncomingRepairEntry)` encoded |
|
||||
##
|
||||
## `messageHistory` is reconstructed in memory by sorting on
|
||||
## `(lamportTimestamp, messageId)` — the same total order SDS uses for
|
||||
## delivery (see sds/sds_utils.nim). Insertion order is not relied upon, so
|
||||
## `removeLogEntry` works with the natural `(channelId, msgId)` key.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[algorithm, options, sets, times]
|
||||
import chronos, chronicles, results
|
||||
import ./persistency
|
||||
import ./payload_codec
|
||||
import sds/types/persistence
|
||||
|
||||
export persistence, persistency
|
||||
|
||||
logScope:
|
||||
topics = "sds-persistency"
|
||||
|
||||
const
|
||||
CatLamport* = "sds.lamport"
|
||||
CatLog* = "sds.log"
|
||||
CatHint* = "sds.hint"
|
||||
CatOutgoing* = "sds.outgoing"
|
||||
CatIncoming* = "sds.incoming"
|
||||
CatOutRepair* = "sds.outRepair"
|
||||
CatInRepair* = "sds.inRepair"
|
||||
|
||||
# ── Blob codecs ─────────────────────────────────────────────────────────
|
||||
#
|
||||
# All SDS payload types round-trip through the generic, length-prefixed
|
||||
# codec in payload_codec.nim. Each `BlobCodec(T)` emits writePart/
|
||||
# readPart for `T` from its public fields (declaration order) and rebuilds
|
||||
# via `T.init(...)`. Order matters: a field's type must be derived before
|
||||
# the struct that contains it. The repair buffers store `(msgId, entry)`
|
||||
# tuples, handled by the generic tuple codec. Lamport is a bare int64.
|
||||
|
||||
BlobCodec(HistoryEntry)
|
||||
BlobCodec(SdsMessage)
|
||||
BlobCodec(UnacknowledgedMessage)
|
||||
BlobCodec(IncomingMessage)
|
||||
BlobCodec(OutgoingRepairEntry)
|
||||
BlobCodec(IncomingRepairEntry)
|
||||
|
||||
# ── Async backing procs ─────────────────────────────────────────────────
|
||||
|
||||
proc doLoadAll(
|
||||
job: Job, channelId: SdsChannelID
|
||||
): Future[ChannelSnapshot] {.async.} =
|
||||
var snap = ChannelSnapshot()
|
||||
let chanKey = toKey(channelId)
|
||||
|
||||
block lamport:
|
||||
let r = await job.get(CatLamport, chanKey)
|
||||
if r.isOk:
|
||||
let opt = r.get
|
||||
if opt.isSome:
|
||||
try:
|
||||
snap.lamportTimestamp = fromBlob(opt.get, int64)
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid lamport bytes",
|
||||
channelId, err = e.msg
|
||||
else:
|
||||
warn "sds-persistency: get lamport failed",
|
||||
channelId, err = $r.error
|
||||
|
||||
block log:
|
||||
let r = await job.scanPrefix(CatLog, chanKey)
|
||||
if r.isOk:
|
||||
var msgs = newSeq[SdsMessage]()
|
||||
for row in r.get:
|
||||
try:
|
||||
msgs.add(fromBlob(row.payload, SdsMessage))
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid log row", channelId, err = e.msg
|
||||
msgs.sort do(a, b: SdsMessage) -> int:
|
||||
result = cmp(a.lamportTimestamp, b.lamportTimestamp)
|
||||
if result == 0:
|
||||
result = cmp(a.messageId, b.messageId)
|
||||
snap.messageHistory = msgs
|
||||
else:
|
||||
warn "sds-persistency: scan log failed",
|
||||
channelId, err = $r.error
|
||||
|
||||
block outgoing:
|
||||
let r = await job.scanPrefix(CatOutgoing, chanKey)
|
||||
if r.isOk:
|
||||
for row in r.get:
|
||||
try:
|
||||
snap.outgoingBuffer.add(fromBlob(row.payload, UnacknowledgedMessage))
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid outgoing row",
|
||||
channelId, err = e.msg
|
||||
else:
|
||||
warn "sds-persistency: scan outgoing failed",
|
||||
channelId, err = $r.error
|
||||
|
||||
block incoming:
|
||||
let r = await job.scanPrefix(CatIncoming, chanKey)
|
||||
if r.isOk:
|
||||
for row in r.get:
|
||||
try:
|
||||
snap.incomingBuffer.add(fromBlob(row.payload, IncomingMessage))
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid incoming row",
|
||||
channelId, err = e.msg
|
||||
else:
|
||||
warn "sds-persistency: scan incoming failed",
|
||||
channelId, err = $r.error
|
||||
|
||||
block outRepair:
|
||||
let r = await job.scanPrefix(CatOutRepair, chanKey)
|
||||
if r.isOk:
|
||||
for row in r.get:
|
||||
try:
|
||||
snap.outgoingRepairBuffer.add(
|
||||
fromBlob(row.payload, (SdsMessageID, OutgoingRepairEntry))
|
||||
)
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid out-repair row",
|
||||
channelId, err = e.msg
|
||||
else:
|
||||
warn "sds-persistency: scan out-repair failed",
|
||||
channelId, err = $r.error
|
||||
|
||||
block inRepair:
|
||||
let r = await job.scanPrefix(CatInRepair, chanKey)
|
||||
if r.isOk:
|
||||
for row in r.get:
|
||||
try:
|
||||
snap.incomingRepairBuffer.add(
|
||||
fromBlob(row.payload, (SdsMessageID, IncomingRepairEntry))
|
||||
)
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid in-repair row",
|
||||
channelId, err = e.msg
|
||||
else:
|
||||
warn "sds-persistency: scan in-repair failed",
|
||||
channelId, err = $r.error
|
||||
|
||||
return snap
|
||||
|
||||
proc doDropChannel(job: Job, channelId: SdsChannelID): Future[void] {.async.} =
|
||||
## Collect every row belonging to the channel and submit them as a single
|
||||
## TxOp batch — the backend applies that as one BEGIN IMMEDIATE/COMMIT,
|
||||
## which is the atomicity the SDS contract asks for.
|
||||
let chanKey = toKey(channelId)
|
||||
var ops: seq[TxOp] = @[]
|
||||
var hintIds: seq[SdsMessageID] = @[]
|
||||
|
||||
let cats =
|
||||
[CatLog, CatOutgoing, CatIncoming, CatOutRepair, CatInRepair]
|
||||
for cat in cats:
|
||||
let r = await job.scanPrefix(cat, chanKey)
|
||||
if r.isOk:
|
||||
for row in r.get:
|
||||
ops.add(TxOp(category: cat, key: row.key, kind: txDelete))
|
||||
if cat == CatLog:
|
||||
try:
|
||||
hintIds.add(fromBlob(row.payload, SdsMessage).messageId)
|
||||
except ValueError:
|
||||
discard
|
||||
else:
|
||||
warn "sds-persistency: scan during drop failed",
|
||||
channelId, category = cat, err = $r.error
|
||||
|
||||
ops.add(TxOp(category: CatLamport, key: chanKey, kind: txDelete))
|
||||
for id in hintIds:
|
||||
ops.add(TxOp(category: CatHint, key: toKey(id), kind: txDelete))
|
||||
|
||||
if ops.len > 0:
|
||||
await job.persist(ops)
|
||||
|
||||
# ── Write helpers ───────────────────────────────────────────────────────
|
||||
#
|
||||
# The Persistence write fields are async with `raises: []`, but the Job ops
|
||||
# raise `CatchableError`. These wrappers trap and log so the closures stay
|
||||
# raise-free, preserving the "errors are logged, never raised" contract.
|
||||
|
||||
proc safePut(
|
||||
job: Job, category: string, k: Key, payload: seq[byte]
|
||||
) {.async: (raises: []).} =
|
||||
try:
|
||||
await job.persistPut(category, k, payload)
|
||||
except CatchableError as e:
|
||||
warn "sds-persistency: put failed", category, err = e.msg
|
||||
|
||||
proc safeDelete(job: Job, category: string, k: Key) {.async: (raises: []).} =
|
||||
try:
|
||||
await job.persistDelete(category, k)
|
||||
except CatchableError as e:
|
||||
warn "sds-persistency: delete failed", category, err = e.msg
|
||||
|
||||
# ── Public factory ──────────────────────────────────────────────────────
|
||||
|
||||
proc newSdsPersistence*(job: Job): Persistence {.gcsafe, raises: [].} =
|
||||
## Build an SDS `Persistence` value backed by ``job``. One Job services
|
||||
## all channels — channelId is part of every key.
|
||||
##
|
||||
## The closures capture ``job`` by ref. They must be invoked from a
|
||||
## thread that owns a running chronos loop (the SDS context's worker
|
||||
## thread satisfies this).
|
||||
doAssert not job.isNil, "newSdsPersistence: job is nil"
|
||||
|
||||
# Built field-by-field via assignment rather than an object literal: every
|
||||
# field is an async closure whose body is an `await`/`return await` command
|
||||
# call, which cannot be followed by the `,` field separator a `Persistence(
|
||||
# ..)` literal would require (the parser cannot tell the comma from another
|
||||
# command argument). Assignments have no separator, so the bodies stay plain.
|
||||
var persistence = Persistence()
|
||||
|
||||
persistence.saveLamport = proc(
|
||||
channelId: SdsChannelID, lamport: int64
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safePut(job, CatLamport, toKey(channelId), toBlob(lamport))
|
||||
|
||||
persistence.appendLogEntry = proc(
|
||||
channelId: SdsChannelID, msg: SdsMessage
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safePut(job, CatLog, key(channelId, msg.messageId), toBlob(msg))
|
||||
|
||||
persistence.removeLogEntry = proc(
|
||||
channelId: SdsChannelID, msgId: SdsMessageID
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safeDelete(job, CatLog, key(channelId, msgId))
|
||||
|
||||
persistence.setRetrievalHint = proc(
|
||||
msgId: SdsMessageID, hint: seq[byte]
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safePut(job, CatHint, toKey(msgId), hint)
|
||||
|
||||
persistence.saveOutgoing = proc(
|
||||
channelId: SdsChannelID, msg: UnacknowledgedMessage
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safePut(job, CatOutgoing, key(channelId, msg.message.messageId), toBlob(msg))
|
||||
|
||||
persistence.removeOutgoing = proc(
|
||||
channelId: SdsChannelID, msgId: SdsMessageID
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safeDelete(job, CatOutgoing, key(channelId, msgId))
|
||||
|
||||
persistence.saveIncoming = proc(
|
||||
channelId: SdsChannelID, msg: IncomingMessage
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safePut(job, CatIncoming, key(channelId, msg.message.messageId), toBlob(msg))
|
||||
|
||||
persistence.removeIncoming = proc(
|
||||
channelId: SdsChannelID, msgId: SdsMessageID
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safeDelete(job, CatIncoming, key(channelId, msgId))
|
||||
|
||||
persistence.saveOutgoingRepair = proc(
|
||||
channelId: SdsChannelID, msgId: SdsMessageID, entry: OutgoingRepairEntry
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safePut(job, CatOutRepair, key(channelId, msgId), toBlob((msgId, entry)))
|
||||
|
||||
persistence.removeOutgoingRepair = proc(
|
||||
channelId: SdsChannelID, msgId: SdsMessageID
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safeDelete(job, CatOutRepair, key(channelId, msgId))
|
||||
|
||||
persistence.saveIncomingRepair = proc(
|
||||
channelId: SdsChannelID, msgId: SdsMessageID, entry: IncomingRepairEntry
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safePut(job, CatInRepair, key(channelId, msgId), toBlob((msgId, entry)))
|
||||
|
||||
persistence.removeIncomingRepair = proc(
|
||||
channelId: SdsChannelID, msgId: SdsMessageID
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safeDelete(job, CatInRepair, key(channelId, msgId))
|
||||
|
||||
persistence.dropChannel = proc(
|
||||
channelId: SdsChannelID
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
try:
|
||||
await doDropChannel(job, channelId)
|
||||
except CatchableError as e:
|
||||
error "sds-persistency: dropChannel failed", channelId, err = e.msg
|
||||
|
||||
persistence.loadAllForChannel = proc(
|
||||
channelId: SdsChannelID
|
||||
): Future[ChannelSnapshot] {.async: (raises: []), gcsafe.} =
|
||||
try:
|
||||
return await doLoadAll(job, channelId)
|
||||
except CatchableError as e:
|
||||
error "sds-persistency: loadAllForChannel failed", channelId, err = e.msg
|
||||
return ChannelSnapshot()
|
||||
|
||||
return persistence
|
||||
|
||||
{.pop.}
|
||||
3
waku/waku_persistency.nim
Normal file
3
waku/waku_persistency.nim
Normal file
@ -0,0 +1,3 @@
|
||||
import waku/persistency/persistency
|
||||
|
||||
export persistency
|
||||
Loading…
x
Reference in New Issue
Block a user