mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-06-04 05:00:02 +00:00
persistency: follow nim-sds 0.3.0 snapshot persistence contract
nim-sds 0.3.0 replaced the ~14 fine-grained per-row Persistence callbacks with a 5-proc snapshot model (saveChannelMeta / updateHistory / loadChannel / dropChannel / setRetrievalHint), all returning Future[Result[...]]. Rewrite waku/persistency/sds_persistency.nim accordingly: - ChannelMeta is stored as one blob per channel; the message log as append/evict rows. Categories collapse from 7 to 2 (sds.meta, sds.log). - Blob transform uses nim-sds' own codecs: snapshot_codec (schema-versioned protobuf) for ChannelMeta, the SDS wire codec for SdsMessage log rows. The generic payload_codec/BlobCodec path is retired (removed payload_codec.nim and test_blob_codec.nim). - setRetrievalHint is a deliberate no-op: persisted hints are never read back (loadChannel/ChannelMeta carry none; hints are supplied live via the onRetrievalHint provider). The closure stays because the field is required. - Fix the module import spelling (srcDir="sds" => bare module paths), which the previous adapter got wrong and never compiled against the locked deps. Add tests/persistency/test_sds_persistency.nim (round-trip, empty-load, evict, drop) replacing test_blob_codec in test_all. Full persistency suite passes 74/74 under both refc and ORC. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
5b3bce6334
commit
fbdff090b1
@ -5,6 +5,6 @@ import ./test_backend
|
||||
import ./test_lifecycle
|
||||
import ./test_facade
|
||||
import ./test_encoding
|
||||
import ./test_blob_codec
|
||||
import ./test_sds_persistency
|
||||
import ./test_string_lookup
|
||||
import ./test_singleton
|
||||
|
||||
@ -1,94 +0,0 @@
|
||||
{.used.}
|
||||
|
||||
## Round-trip tests for the generic payload blob codec (payload_codec.nim)
|
||||
## and its `BlobCodec` application to the SDS persistence types.
|
||||
##
|
||||
## Importing `sds_persistency` forces the real adapter (and its six
|
||||
## `BlobCodec` calls + call sites) to compile. The local derivations
|
||||
## below live in this module's own scope and drive the round-trip checks.
|
||||
|
||||
import std/[sets, times]
|
||||
import testutils/unittests
|
||||
import waku/persistency/payload_codec
|
||||
import waku/persistency/sds_persistency # compile-checks the real adapter
|
||||
import sds/types/persistence
|
||||
|
||||
# Same order constraint as the adapter: a field's type before its container.
|
||||
BlobCodec(HistoryEntry)
|
||||
BlobCodec(SdsMessage)
|
||||
BlobCodec(UnacknowledgedMessage)
|
||||
BlobCodec(IncomingMessage)
|
||||
BlobCodec(OutgoingRepairEntry)
|
||||
BlobCodec(IncomingRepairEntry)
|
||||
|
||||
proc sampleHistory(): HistoryEntry =
|
||||
HistoryEntry.init("mid-1", @[0xAA'u8, 0xBB, 0xCC], "sender-1".SdsParticipantID)
|
||||
|
||||
proc sampleMessage(content = @[1'u8, 2, 3]): SdsMessage =
|
||||
SdsMessage.init(
|
||||
messageId = "msg-42",
|
||||
lamportTimestamp = 7'i64,
|
||||
causalHistory = @[sampleHistory(), HistoryEntry.init("mid-2")],
|
||||
channelId = "channel-9",
|
||||
content = content,
|
||||
bloomFilter = @[0xDE'u8, 0xAD, 0xBE, 0xEF],
|
||||
senderId = "alice".SdsParticipantID,
|
||||
repairRequest = @[sampleHistory()],
|
||||
)
|
||||
|
||||
suite "Persistency blob codec":
|
||||
test "primitive round-trips":
|
||||
check fromBlob(toBlob(7'i64), int64) == 7'i64
|
||||
check fromBlob(toBlob(int64.low), int64) == int64.low
|
||||
check fromBlob(toBlob("héllo"), string) == "héllo"
|
||||
check fromBlob(toBlob(@[0'u8, 255, 7]), seq[byte]) == @[0'u8, 255, 7]
|
||||
|
||||
test "HistoryEntry round-trips":
|
||||
let h = sampleHistory()
|
||||
check fromBlob(toBlob(h), HistoryEntry) == h
|
||||
|
||||
test "SdsMessage round-trips (seqs, distinct, nested)":
|
||||
let m = sampleMessage()
|
||||
check fromBlob(toBlob(m), SdsMessage) == m
|
||||
|
||||
test "UnacknowledgedMessage round-trips (Time, int)":
|
||||
let u = UnacknowledgedMessage.init(sampleMessage(), initTime(1_700_000_000, 123), 4)
|
||||
check fromBlob(toBlob(u), UnacknowledgedMessage) == u
|
||||
|
||||
test "IncomingMessage round-trips (HashSet)":
|
||||
let inc =
|
||||
IncomingMessage.init(sampleMessage(), toHashSet(["dep-a", "dep-b", "dep-c"]))
|
||||
check fromBlob(toBlob(inc), IncomingMessage) == inc
|
||||
|
||||
test "repair tuples round-trip":
|
||||
let outPair = (
|
||||
"msg-42".SdsMessageID, OutgoingRepairEntry.init(sampleHistory(), initTime(10, 0))
|
||||
)
|
||||
check fromBlob(toBlob(outPair), (SdsMessageID, OutgoingRepairEntry)) == outPair
|
||||
|
||||
let inPair = (
|
||||
"msg-42".SdsMessageID,
|
||||
IncomingRepairEntry.init(sampleHistory(), @[9'u8, 8, 7], initTime(20, 500)),
|
||||
)
|
||||
check fromBlob(toBlob(inPair), (SdsMessageID, IncomingRepairEntry)) == inPair
|
||||
|
||||
test "payload exceeds the old 64 KiB key cap (4-byte length)":
|
||||
var big = newSeq[byte](70_000)
|
||||
for i in 0 ..< big.len:
|
||||
big[i] = byte(i and 0xFF)
|
||||
let m = sampleMessage(content = big)
|
||||
let decoded = fromBlob(toBlob(m), SdsMessage)
|
||||
check decoded.content.len == 70_000
|
||||
check decoded == m
|
||||
|
||||
test "truncated input raises ValueError":
|
||||
let bytes = toBlob(sampleMessage())
|
||||
expect ValueError:
|
||||
discard fromBlob(bytes[0 ..< bytes.len - 5], SdsMessage)
|
||||
expect ValueError:
|
||||
discard fromBlob(@[0xFF'u8, 0xFF, 0xFF, 0xFF], string) # claims 4 GiB
|
||||
|
||||
test "trailing input raises ValueError":
|
||||
let bytes = toBlob(sampleMessage())
|
||||
expect ValueError:
|
||||
discard fromBlob(bytes & @[0x00'u8], SdsMessage)
|
||||
155
tests/persistency/test_sds_persistency.nim
Normal file
155
tests/persistency/test_sds_persistency.nim
Normal file
@ -0,0 +1,155 @@
|
||||
{.used.}
|
||||
|
||||
## Behavioural tests for the SDS Persistence adapter (nim-sds 0.3.0 snapshot
|
||||
## model). Importing `sds_persistency` also compile-checks the real adapter.
|
||||
##
|
||||
## Writes go through the fire-and-forget Job path (the Future resolves when
|
||||
## the op is queued, not applied — Persistency v1), so every read-back polls
|
||||
## until the row appears/disappears.
|
||||
|
||||
import std/[options, os, times]
|
||||
import chronos, results
|
||||
import testutils/unittests
|
||||
import waku/persistency/persistency
|
||||
import waku/persistency/keys
|
||||
import waku/persistency/sds_persistency
|
||||
|
||||
proc tmpRoot(label: string): string =
|
||||
let p = getTempDir() / ("sds_persistency_test_" & label & "_" & $epochTime().int)
|
||||
removeDir(p)
|
||||
p
|
||||
|
||||
proc pollExists(
|
||||
t: Job, category: string, k: Key, timeoutMs = 1000
|
||||
): Future[bool] {.async.} =
|
||||
let deadline = epochTime() + (timeoutMs.float / 1000.0)
|
||||
while epochTime() < deadline:
|
||||
let r = await t.exists(category, k)
|
||||
if r.isOk and r.get():
|
||||
return true
|
||||
await sleepAsync(chronos.milliseconds(2))
|
||||
return false
|
||||
|
||||
proc pollGone(
|
||||
t: Job, category: string, k: Key, timeoutMs = 1000
|
||||
): Future[bool] {.async.} =
|
||||
let deadline = epochTime() + (timeoutMs.float / 1000.0)
|
||||
while epochTime() < deadline:
|
||||
let r = await t.exists(category, k)
|
||||
if r.isOk and not r.get():
|
||||
return true
|
||||
await sleepAsync(chronos.milliseconds(2))
|
||||
return false
|
||||
|
||||
proc mkMsg(channelId: SdsChannelID, msgId: SdsMessageID, lamport: int64): SdsMessage =
|
||||
SdsMessage.init(
|
||||
messageId = msgId,
|
||||
lamportTimestamp = lamport,
|
||||
causalHistory = @[],
|
||||
channelId = channelId,
|
||||
content = @[byte(1), byte(2)],
|
||||
bloomFilter = @[],
|
||||
)
|
||||
|
||||
suite "SDS persistency adapter (0.3.0 snapshot model)":
|
||||
asyncTest "saveChannelMeta + updateHistory round-trip via loadChannel":
|
||||
let root = tmpRoot("roundtrip")
|
||||
defer:
|
||||
removeDir(root)
|
||||
let p = Persistency.instance(root).get()
|
||||
defer:
|
||||
Persistency.reset()
|
||||
let job = p.openJob("sds").get()
|
||||
let persistence = newSdsPersistence(job)
|
||||
let channelId = "chan-1".SdsChannelID
|
||||
|
||||
var meta = ChannelMeta.init()
|
||||
meta.lamportTimestamp = 42
|
||||
check (await persistence.saveChannelMeta(channelId, meta)).isOk
|
||||
check (await job.pollExists(CatMeta, toKey(channelId)))
|
||||
|
||||
# append out of (lamport) order on purpose; loadChannel must sort.
|
||||
var upd = HistoryUpdate.init()
|
||||
upd.append = @[mkMsg(channelId, "m2", 2), mkMsg(channelId, "m1", 1)]
|
||||
check (await persistence.updateHistory(channelId, upd)).isOk
|
||||
check (await job.pollExists(CatLog, key(channelId, "m2")))
|
||||
|
||||
let data = (await persistence.loadChannel(channelId)).valueOr:
|
||||
check false
|
||||
return
|
||||
check data.meta.lamportTimestamp == 42
|
||||
check data.messageHistory.len == 2
|
||||
check data.messageHistory[0].messageId == "m1"
|
||||
check data.messageHistory[1].messageId == "m2"
|
||||
|
||||
asyncTest "loadChannel on a fresh channel returns empty ChannelData":
|
||||
let root = tmpRoot("empty")
|
||||
defer:
|
||||
removeDir(root)
|
||||
let p = Persistency.instance(root).get()
|
||||
defer:
|
||||
Persistency.reset()
|
||||
let job = p.openJob("sds").get()
|
||||
let persistence = newSdsPersistence(job)
|
||||
|
||||
let data = (await persistence.loadChannel("nope".SdsChannelID)).valueOr:
|
||||
check false
|
||||
return
|
||||
check data.meta.lamportTimestamp == 0
|
||||
check data.messageHistory.len == 0
|
||||
|
||||
asyncTest "updateHistory evict removes a log row":
|
||||
let root = tmpRoot("evict")
|
||||
defer:
|
||||
removeDir(root)
|
||||
let p = Persistency.instance(root).get()
|
||||
defer:
|
||||
Persistency.reset()
|
||||
let job = p.openJob("sds").get()
|
||||
let persistence = newSdsPersistence(job)
|
||||
let channelId = "c".SdsChannelID
|
||||
|
||||
var upd = HistoryUpdate.init()
|
||||
upd.append = @[mkMsg(channelId, "a", 1), mkMsg(channelId, "b", 2)]
|
||||
check (await persistence.updateHistory(channelId, upd)).isOk
|
||||
check (await job.pollExists(CatLog, key(channelId, "b")))
|
||||
|
||||
var ev = HistoryUpdate.init()
|
||||
ev.evict = @["a".SdsMessageID]
|
||||
check (await persistence.updateHistory(channelId, ev)).isOk
|
||||
check (await job.pollGone(CatLog, key(channelId, "a")))
|
||||
|
||||
let data = (await persistence.loadChannel(channelId)).valueOr:
|
||||
check false
|
||||
return
|
||||
check data.messageHistory.len == 1
|
||||
check data.messageHistory[0].messageId == "b"
|
||||
|
||||
asyncTest "dropChannel wipes meta and log":
|
||||
let root = tmpRoot("drop")
|
||||
defer:
|
||||
removeDir(root)
|
||||
let p = Persistency.instance(root).get()
|
||||
defer:
|
||||
Persistency.reset()
|
||||
let job = p.openJob("sds").get()
|
||||
let persistence = newSdsPersistence(job)
|
||||
let channelId = "d".SdsChannelID
|
||||
|
||||
var meta = ChannelMeta.init()
|
||||
meta.lamportTimestamp = 7
|
||||
check (await persistence.saveChannelMeta(channelId, meta)).isOk
|
||||
var upd = HistoryUpdate.init()
|
||||
upd.append = @[mkMsg(channelId, "x", 1)]
|
||||
check (await persistence.updateHistory(channelId, upd)).isOk
|
||||
check (await job.pollExists(CatMeta, toKey(channelId)))
|
||||
check (await job.pollExists(CatLog, key(channelId, "x")))
|
||||
|
||||
check (await persistence.dropChannel(channelId)).isOk
|
||||
check (await job.pollGone(CatMeta, toKey(channelId)))
|
||||
|
||||
let data = (await persistence.loadChannel(channelId)).valueOr:
|
||||
check false
|
||||
return
|
||||
check data.meta.lamportTimestamp == 0
|
||||
check data.messageHistory.len == 0
|
||||
@ -1,331 +0,0 @@
|
||||
## Generic length-prefixed blob codec for persistency payloads.
|
||||
##
|
||||
## Symmetric counterpart to `keys.nim`'s `encodePart`: every persisted value
|
||||
## round-trips through `writePart`/`readPart` over a `ReadCtx` cursor. Unlike
|
||||
## keys, payloads are not byte-wise sort-stable, so strings and byte blobs use
|
||||
## a **4-byte BE length prefix** (4 GiB ceiling) instead of keys.nim's 2-byte
|
||||
## (64 KiB) prefix — the cap that originally forced SDS to hand-roll its codec.
|
||||
##
|
||||
## ## How a type opts in
|
||||
##
|
||||
## Primitives, `string`, `seq[byte]`, `enum`, `distinct`, `Time`, `seq[T]`,
|
||||
## `HashSet[T]` and tuples already have codecs here. A **named struct** opts in
|
||||
## with a single line:
|
||||
##
|
||||
## ```nim
|
||||
## BlobCodec(MyType)
|
||||
## ```
|
||||
##
|
||||
## which emits both `writePart`/`readPart` for `MyType` from its fields, in
|
||||
## declaration order, and reconstructs the value via `MyType.init(...)`
|
||||
## (positional). This is the *only* mechanism for structs — there is
|
||||
## deliberately no `fieldPairs`/default-construct path, so `{.requiresInit.}`
|
||||
## types (which cannot be zero-initialised) work unchanged. The contract:
|
||||
##
|
||||
## * the type is a value object whose `init` takes its fields positionally in
|
||||
## declaration order;
|
||||
## * only public fields participate (private fields are invisible to the
|
||||
## macro and would be dropped — don't persist such types);
|
||||
## * `BlobCodec` must be called for a field's type *before* the struct
|
||||
## that contains it (Nim resolves the concrete `writePart`/`readPart`
|
||||
## top-down).
|
||||
##
|
||||
## ## Entry points
|
||||
##
|
||||
## `toBlob(v)` → `seq[byte]`, `fromBlob(bytes, T)` → `T` (raises `ValueError`
|
||||
## on truncated/corrupt input).
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[macros, sets, times, typetraits]
|
||||
|
||||
type ReadCtx* = object
|
||||
buf*: seq[byte]
|
||||
pos*: int
|
||||
|
||||
proc initReadCtx*(bytes: openArray[byte]): ReadCtx =
|
||||
ReadCtx(buf: @bytes, pos: 0)
|
||||
|
||||
proc need(r: ReadCtx, n: int) {.raises: [ValueError].} =
|
||||
if n < 0 or r.pos + n > r.buf.len:
|
||||
raise newException(ValueError, "truncated payload: need " & $n & " more bytes")
|
||||
|
||||
# ── Fixed-width integers ────────────────────────────────────────────────
|
||||
|
||||
proc writePart*(buf: var seq[byte], v: uint32) =
|
||||
buf.add(byte((v shr 24) and 0xFF'u32))
|
||||
buf.add(byte((v shr 16) and 0xFF'u32))
|
||||
buf.add(byte((v shr 8) and 0xFF'u32))
|
||||
buf.add(byte(v and 0xFF'u32))
|
||||
|
||||
proc readPart*(r: var ReadCtx, _: typedesc[uint32]): uint32 {.raises: [ValueError].} =
|
||||
r.need(4)
|
||||
result =
|
||||
(uint32(r.buf[r.pos]) shl 24) or (uint32(r.buf[r.pos + 1]) shl 16) or
|
||||
(uint32(r.buf[r.pos + 2]) shl 8) or uint32(r.buf[r.pos + 3])
|
||||
r.pos += 4
|
||||
|
||||
proc writePart*(buf: var seq[byte], v: int64) =
|
||||
let u = cast[uint64](v)
|
||||
for shift in countdown(56, 0, 8):
|
||||
buf.add(byte((u shr shift) and 0xFF'u64))
|
||||
|
||||
proc readPart*(r: var ReadCtx, _: typedesc[int64]): int64 {.raises: [ValueError].} =
|
||||
r.need(8)
|
||||
var u: uint64 = 0
|
||||
for i in 0 ..< 8:
|
||||
u = (u shl 8) or uint64(r.buf[r.pos + i])
|
||||
r.pos += 8
|
||||
cast[int64](u)
|
||||
|
||||
proc writePart*(buf: var seq[byte], v: int) =
|
||||
writePart(buf, int64(v))
|
||||
|
||||
proc readPart*(r: var ReadCtx, _: typedesc[int]): int {.raises: [ValueError].} =
|
||||
let x = readPart(r, int64)
|
||||
when sizeof(int) < sizeof(int64):
|
||||
if x < int64(low(int)) or x > int64(high(int)):
|
||||
raise newException(ValueError, "int out of range: " & $x)
|
||||
result = int(x)
|
||||
|
||||
# ── Small scalars ───────────────────────────────────────────────────────
|
||||
|
||||
proc writePart*(buf: var seq[byte], v: bool) =
|
||||
buf.add(if v: 1'u8 else: 0'u8)
|
||||
|
||||
proc readPart*(r: var ReadCtx, _: typedesc[bool]): bool {.raises: [ValueError].} =
|
||||
r.need(1)
|
||||
result = r.buf[r.pos] != 0'u8
|
||||
r.pos += 1
|
||||
|
||||
proc writePart*(buf: var seq[byte], v: byte) =
|
||||
buf.add(v)
|
||||
|
||||
proc readPart*(r: var ReadCtx, _: typedesc[byte]): byte {.raises: [ValueError].} =
|
||||
r.need(1)
|
||||
result = r.buf[r.pos]
|
||||
r.pos += 1
|
||||
|
||||
proc writePart*(buf: var seq[byte], v: char) =
|
||||
buf.add(byte(v))
|
||||
|
||||
proc readPart*(r: var ReadCtx, _: typedesc[char]): char {.raises: [ValueError].} =
|
||||
r.need(1)
|
||||
result = char(r.buf[r.pos])
|
||||
r.pos += 1
|
||||
|
||||
proc writePart*[E: enum](buf: var seq[byte], v: E) =
|
||||
writePart(buf, int64(ord(v)))
|
||||
|
||||
proc readPart*[E: enum](r: var ReadCtx, _: typedesc[E]): E {.raises: [ValueError].} =
|
||||
let x = readPart(r, int64)
|
||||
let lo = int64(ord(low(E)))
|
||||
let hi = int64(ord(high(E)))
|
||||
if x < lo or x > hi:
|
||||
raise newException(ValueError, "enum value out of range: " & $x)
|
||||
result = E(x)
|
||||
|
||||
# ── string / seq[byte] (4-byte length) ──────────────────────────────────
|
||||
|
||||
proc writePart*(buf: var seq[byte], s: string) =
|
||||
writePart(buf, uint32(s.len))
|
||||
for c in s:
|
||||
buf.add(byte(c))
|
||||
|
||||
proc readPart*(r: var ReadCtx, _: typedesc[string]): string {.raises: [ValueError].} =
|
||||
let nU = readPart(r, uint32)
|
||||
if nU > uint32(high(int)):
|
||||
raise newException(ValueError, "string length out of range: " & $nU)
|
||||
let n = int(nU)
|
||||
r.need(n)
|
||||
result = newString(n)
|
||||
for i in 0 ..< n:
|
||||
result[i] = char(r.buf[r.pos + i])
|
||||
r.pos += n
|
||||
|
||||
proc writePart*(buf: var seq[byte], b: seq[byte]) =
|
||||
writePart(buf, uint32(b.len))
|
||||
for x in b:
|
||||
buf.add(x)
|
||||
|
||||
proc readPart*(
|
||||
r: var ReadCtx, _: typedesc[seq[byte]]
|
||||
): seq[byte] {.raises: [ValueError].} =
|
||||
let nU = readPart(r, uint32)
|
||||
if nU > uint32(high(int)):
|
||||
raise newException(ValueError, "blob length out of range: " & $nU)
|
||||
let n = int(nU)
|
||||
r.need(n)
|
||||
result = newSeq[byte](n)
|
||||
for i in 0 ..< n:
|
||||
result[i] = r.buf[r.pos + i]
|
||||
r.pos += n
|
||||
|
||||
# ── distinct (e.g. SdsParticipantID = distinct string) ──────────────────
|
||||
|
||||
proc writePart*[T: distinct](buf: var seq[byte], v: T) =
|
||||
mixin writePart
|
||||
writePart(buf, distinctBase(T)(v))
|
||||
|
||||
proc readPart*[T: distinct](
|
||||
r: var ReadCtx, _: typedesc[T]
|
||||
): T {.raises: [ValueError].} =
|
||||
mixin readPart
|
||||
T(readPart(r, distinctBase(T)))
|
||||
|
||||
# ── Time ────────────────────────────────────────────────────────────────
|
||||
|
||||
proc writePart*(buf: var seq[byte], t: Time) =
|
||||
writePart(buf, t.toUnix())
|
||||
writePart(buf, uint32(t.nanosecond))
|
||||
|
||||
proc readPart*(r: var ReadCtx, _: typedesc[Time]): Time {.raises: [ValueError].} =
|
||||
let secs = readPart(r, int64)
|
||||
let nanos = int(readPart(r, uint32))
|
||||
if nanos < 0 or nanos > 999_999_999:
|
||||
raise newException(ValueError, "nanosecond out of range: " & $nanos)
|
||||
initTime(secs, nanos)
|
||||
|
||||
# ── Containers ──────────────────────────────────────────────────────────
|
||||
|
||||
proc writePart*[T](buf: var seq[byte], xs: seq[T]) =
|
||||
mixin writePart
|
||||
writePart(buf, uint32(xs.len))
|
||||
for x in xs:
|
||||
writePart(buf, x)
|
||||
|
||||
proc readPart*[T](
|
||||
r: var ReadCtx, _: typedesc[seq[T]]
|
||||
): seq[T] {.raises: [ValueError].} =
|
||||
mixin readPart
|
||||
let nU = readPart(r, uint32)
|
||||
if nU > uint32(high(int)):
|
||||
raise newException(ValueError, "sequence length out of range: " & $nU)
|
||||
let n = int(nU)
|
||||
result = newSeqOfCap[T](n)
|
||||
for _ in 0 ..< n:
|
||||
result.add(readPart(r, T))
|
||||
proc writePart*[T](buf: var seq[byte], s: HashSet[T]) =
|
||||
mixin writePart
|
||||
writePart(buf, uint32(s.len))
|
||||
for x in s:
|
||||
writePart(buf, x)
|
||||
|
||||
proc readPart*[T](
|
||||
r: var ReadCtx, _: typedesc[HashSet[T]]
|
||||
): HashSet[T] {.raises: [ValueError].} =
|
||||
mixin readPart
|
||||
let nU = readPart(r, uint32)
|
||||
if nU > uint32(high(int)):
|
||||
raise newException(ValueError, "set length out of range: " & $nU)
|
||||
let n = int(nU)
|
||||
result = initHashSet[T](max(n, 2))
|
||||
for _ in 0 ..< n:
|
||||
result.incl(readPart(r, T))
|
||||
proc writePart*[T: tuple](buf: var seq[byte], v: T) =
|
||||
mixin writePart
|
||||
for f in fields(v):
|
||||
writePart(buf, f)
|
||||
|
||||
proc readPart*[T: tuple](r: var ReadCtx, _: typedesc[T]): T {.raises: [ValueError].} =
|
||||
mixin readPart
|
||||
for f in fields(result):
|
||||
f = readPart(r, typeof(f))
|
||||
|
||||
# ── Named-struct derivation ─────────────────────────────────────────────
|
||||
|
||||
proc objectRecList(tSym: NimNode): NimNode {.compileTime.} =
|
||||
## Resolve a type symbol to its object's RecList, preserving field types
|
||||
## exactly as written (getImpl, not getTypeImpl, so `HashSet[SdsMessageID]`
|
||||
## and friends stay named rather than being expanded to their structure).
|
||||
var body = tSym.getImpl[2]
|
||||
while body.kind in {nnkRefTy, nnkPtrTy, nnkDistinctTy}:
|
||||
body = body[0]
|
||||
doAssert body.kind == nnkObjectTy,
|
||||
"BlobCodec: expected an object type, got " & treeRepr(body)
|
||||
body[2]
|
||||
|
||||
macro BlobCodec*(T: typedesc): untyped =
|
||||
## Emit `writePart`/`readPart` for a named value object `T`, encoding each
|
||||
## public field in declaration order and rebuilding via `T.init(...)`.
|
||||
let tSym = getTypeInst(T)[1]
|
||||
let recList = objectRecList(tSym)
|
||||
|
||||
var fieldNames: seq[NimNode]
|
||||
var fieldTypes: seq[NimNode]
|
||||
for defs in recList:
|
||||
if defs.kind != nnkIdentDefs:
|
||||
continue
|
||||
# Rebuild the field type from its textual form rather than splicing the
|
||||
# resolved symbol: a spliced *alias* type symbol (e.g. `SdsMessageID =
|
||||
# string`) is mis-resolved as a value in `readPart(r, T)`, breaking
|
||||
# typedesc overload resolution. A fresh ident/expr behaves like literal
|
||||
# source and resolves to a typedesc correctly.
|
||||
let ftype = parseExpr(repr(defs[^2]))
|
||||
for i in 0 ..< defs.len - 2:
|
||||
var nameNode = defs[i]
|
||||
if nameNode.kind == nnkPragmaExpr:
|
||||
nameNode = nameNode[0]
|
||||
if nameNode.kind == nnkPostfix:
|
||||
nameNode = nameNode[1]
|
||||
fieldNames.add(ident($nameNode))
|
||||
fieldTypes.add(ftype.copyNimTree)
|
||||
|
||||
let bufId = ident "buf"
|
||||
let vId = ident "v"
|
||||
let rId = ident "r"
|
||||
|
||||
# writePart(buf: var seq[byte], v: T)
|
||||
var writeBody = newStmtList()
|
||||
for fn in fieldNames:
|
||||
writeBody.add(newCall(ident "writePart", bufId, newDotExpr(vId, fn)))
|
||||
let writeProc = newProc(
|
||||
name = ident "writePart",
|
||||
params = [
|
||||
newEmptyNode(),
|
||||
newIdentDefs(
|
||||
bufId, nnkVarTy.newTree(nnkBracketExpr.newTree(ident "seq", ident "byte"))
|
||||
),
|
||||
newIdentDefs(vId, tSym),
|
||||
],
|
||||
body = writeBody,
|
||||
)
|
||||
|
||||
# readPart(r: var ReadCtx, _: typedesc[T]): T {.raises: [ValueError].}
|
||||
var readBody = newStmtList()
|
||||
var tmps: seq[NimNode]
|
||||
for i, ft in fieldTypes:
|
||||
let tmp = genSym(nskLet, "f" & $i)
|
||||
tmps.add(tmp)
|
||||
readBody.add(newLetStmt(tmp, newCall(ident "readPart", rId, ft)))
|
||||
readBody.add(newCall(newDotExpr(tSym, ident "init"), tmps))
|
||||
let readProc = newProc(
|
||||
name = ident "readPart",
|
||||
params = [
|
||||
tSym,
|
||||
newIdentDefs(rId, nnkVarTy.newTree(ident "ReadCtx")),
|
||||
newIdentDefs(ident "_", nnkBracketExpr.newTree(ident "typedesc", tSym)),
|
||||
],
|
||||
body = readBody,
|
||||
)
|
||||
readProc.addPragma(
|
||||
nnkExprColonExpr.newTree(ident "raises", nnkBracket.newTree(ident "ValueError"))
|
||||
)
|
||||
|
||||
result = newStmtList(writeProc, readProc)
|
||||
|
||||
# ── Public entry points ─────────────────────────────────────────────────
|
||||
|
||||
proc toBlob*[T](v: T): seq[byte] =
|
||||
mixin writePart
|
||||
result = @[]
|
||||
writePart(result, v)
|
||||
|
||||
proc fromBlob*[T](bytes: openArray[byte], _: typedesc[T]): T {.raises: [ValueError].} =
|
||||
mixin readPart
|
||||
var r = initReadCtx(bytes)
|
||||
result = readPart(r, T)
|
||||
if r.pos != r.buf.len:
|
||||
raise newException(ValueError, "trailing payload bytes: " & $(r.buf.len - r.pos))
|
||||
|
||||
{.pop.}
|
||||
@ -1,49 +1,67 @@
|
||||
## Adapter that materialises the SDS `Persistence` contract on top of a
|
||||
## waku-persistency `Job`. One `Job` (== one SQLite file, one worker thread)
|
||||
## services all channels for a given SDS context; rows are namespaced by
|
||||
## category and the channelId is the first key component so per-channel
|
||||
## prefix scans stay cheap.
|
||||
## Adapter that materialises the SDS `Persistence` contract (nim-sds 0.3.0,
|
||||
## snapshot model) on top of a waku-persistency `Job`. One `Job` (== one
|
||||
## SQLite file, one worker thread) services all channels for a given SDS
|
||||
## context; rows are namespaced by category and the channelId is the first
|
||||
## key component so per-channel prefix scans stay cheap.
|
||||
##
|
||||
## ## Async contract
|
||||
## ## Snapshot contract (nim-sds 0.3.0)
|
||||
##
|
||||
## Every `Persistence` proc field is async (`proc(..): Future[void]
|
||||
## {.async: (raises: []), gcsafe.}`) — SDS awaits them on its own chronos
|
||||
## loop. We map each onto the matching async `Job` op:
|
||||
## The fine-grained per-row callbacks of 0.2.4 are gone. SDS now persists via
|
||||
## five procs, all `Future[Result[void, string]]` (load returns
|
||||
## `Result[ChannelData, string]`), `{.async: (raises: []), gcsafe.}`:
|
||||
##
|
||||
## * **Writes (save*/remove*)** — call the fire-and-forget `Job.persist*` ops
|
||||
## through the `safePut`/`safeDelete` helpers, which trap any backend error
|
||||
## and log it rather than raising (the contract forbids raising). Note that
|
||||
## Persistency v1 only guarantees the event has been queued when the Future
|
||||
## resolves — reads immediately after an awaited write can still be racy.
|
||||
## * **`dropChannel`** — awaits `doDropChannel`, which batches every row of
|
||||
## the channel into one transactional `persist` (atomic when applied).
|
||||
## * **`loadAllForChannel`** — awaits `doLoadAll` and returns the snapshot
|
||||
## the SDS bootstrap path needs.
|
||||
## * **`saveChannelMeta`** — the complete fast-changing per-channel state
|
||||
## (lamport clock, outgoing/incoming buffers, both SDS-R repair buffers)
|
||||
## as ONE blob. Idempotent; a missed write self-heals on the next save.
|
||||
## * **`updateHistory`** — append newly-delivered messages / evict the
|
||||
## oldest past the cap, applied as one transactional batch.
|
||||
## * **`loadChannel`** — bootstrap: returns the prior `ChannelData`
|
||||
## (meta + ordered message history) or an empty one. Surfaces errors.
|
||||
## * **`dropChannel`** — wipe all state for a channel. Surfaces errors.
|
||||
## * **`setRetrievalHint`** — see below; deliberate no-op here.
|
||||
##
|
||||
## Failure policy mirrors the interface docs: save/update/hint are non-fatal
|
||||
## (we log and still return the error string); load/drop are durability-intent
|
||||
## and propagate their error to the caller.
|
||||
##
|
||||
## ## Codec
|
||||
##
|
||||
## The blob transform is owned by nim-sds: `ChannelMeta` round-trips through
|
||||
## `sds/snapshot_codec` (protobuf, schema-versioned — refuses unknown
|
||||
## versions), and each persisted `SdsMessage` log row through the SDS wire
|
||||
## codec in `sds/protobuf`. We do not maintain a second codec for these
|
||||
## shapes (the previous `payload_codec`/`BlobCodec` path is retired).
|
||||
##
|
||||
## ## Retrieval hints
|
||||
##
|
||||
## `setRetrievalHint` is intentionally a no-op: persisted hints are never read
|
||||
## back — `loadChannel` returns `ChannelData` (meta + messageHistory) with no
|
||||
## hint field, and `ChannelMeta` carries none. Hints are supplied live via the
|
||||
## `onRetrievalHint` provider, so persisting them would be write-only dead
|
||||
## data. The closure still exists because the field is required by the
|
||||
## `Persistence` object (SDS calls it from `getRecentHistoryEntries`).
|
||||
##
|
||||
## ## Storage layout
|
||||
##
|
||||
## | Category | Key | Value |
|
||||
## |------------------|------------------------------|-----------------------------------------|
|
||||
## | `sds.lamport` | `key(channelId)` | 8 BE bytes (int64) |
|
||||
## | `sds.log` | `key(channelId, msgId)` | encoded `SdsMessage` |
|
||||
## | `sds.hint` | `key(msgId)` | raw hint bytes |
|
||||
## | `sds.outgoing` | `key(channelId, msgId)` | encoded `UnacknowledgedMessage` |
|
||||
## | `sds.incoming` | `key(channelId, msgId)` | encoded `IncomingMessage` |
|
||||
## | `sds.outRepair` | `key(channelId, msgId)` | `(msgId, OutgoingRepairEntry)` encoded |
|
||||
## | `sds.inRepair` | `key(channelId, msgId)` | `(msgId, IncomingRepairEntry)` encoded |
|
||||
## | Category | Key | Value |
|
||||
## |---------------|--------------------------|----------------------------------------|
|
||||
## | `sds.meta` | `key(channelId)` | `ChannelMeta` (snapshot_codec protobuf)|
|
||||
## | `sds.log` | `key(channelId, msgId)` | `SdsMessage` (sds wire protobuf) |
|
||||
##
|
||||
## `messageHistory` is reconstructed in memory by sorting on
|
||||
## `(lamportTimestamp, messageId)` — the same total order SDS uses for
|
||||
## delivery (see sds/sds_utils.nim). Insertion order is not relied upon, so
|
||||
## `removeLogEntry` works with the natural `(channelId, msgId)` key.
|
||||
## delivery (see sds/sds_utils.nim).
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[algorithm, options, sets, times]
|
||||
import std/[algorithm, options]
|
||||
import chronos, chronicles, results
|
||||
import libp2p/protobuf/minprotobuf
|
||||
import ./persistency
|
||||
import ./payload_codec
|
||||
import sds/types/persistence
|
||||
import ./keys
|
||||
import types/persistence
|
||||
import snapshot_codec
|
||||
import protobuf
|
||||
|
||||
export persistence, persistency
|
||||
|
||||
@ -51,155 +69,8 @@ logScope:
|
||||
topics = "sds-persistency"
|
||||
|
||||
const
|
||||
CatLamport* = "sds.lamport"
|
||||
CatMeta* = "sds.meta"
|
||||
CatLog* = "sds.log"
|
||||
CatHint* = "sds.hint"
|
||||
CatOutgoing* = "sds.outgoing"
|
||||
CatIncoming* = "sds.incoming"
|
||||
CatOutRepair* = "sds.outRepair"
|
||||
CatInRepair* = "sds.inRepair"
|
||||
|
||||
# ── Blob codecs ─────────────────────────────────────────────────────────
|
||||
#
|
||||
# All SDS payload types round-trip through the generic, length-prefixed
|
||||
# codec in payload_codec.nim. Each `BlobCodec(T)` emits writePart/
|
||||
# readPart for `T` from its public fields (declaration order) and rebuilds
|
||||
# via `T.init(...)`. Order matters: a field's type must be derived before
|
||||
# the struct that contains it. The repair buffers store `(msgId, entry)`
|
||||
# tuples, handled by the generic tuple codec. Lamport is a bare int64.
|
||||
|
||||
BlobCodec(HistoryEntry)
|
||||
BlobCodec(SdsMessage)
|
||||
BlobCodec(UnacknowledgedMessage)
|
||||
BlobCodec(IncomingMessage)
|
||||
BlobCodec(OutgoingRepairEntry)
|
||||
BlobCodec(IncomingRepairEntry)
|
||||
|
||||
# ── Write helpers ───────────────────────────────────────────────────────
|
||||
#
|
||||
# The Persistence write fields are async with `raises: []`, but the Job ops
|
||||
# raise `CatchableError`. These wrappers trap and log so the closures stay
|
||||
# raise-free, preserving the "errors are logged, never raised" contract.
|
||||
|
||||
proc safePut(
|
||||
job: Job, category: string, k: Key, payload: seq[byte]
|
||||
) {.async: (raises: []).} =
|
||||
try:
|
||||
await job.persistPut(category, k, payload)
|
||||
except CatchableError as e:
|
||||
warn "sds-persistency: put failed", category, err = e.msg
|
||||
|
||||
proc safeDelete(job: Job, category: string, k: Key) {.async: (raises: []).} =
|
||||
try:
|
||||
await job.persistDelete(category, k)
|
||||
except CatchableError as e:
|
||||
warn "sds-persistency: delete failed", category, err = e.msg
|
||||
|
||||
proc safePersist(job: Job, ops: seq[TxOp]) {.async: (raises: []).} =
|
||||
try:
|
||||
await job.persist(ops)
|
||||
except CatchableError as e:
|
||||
warn "sds-persistency: persist batch failed", opCount = ops.len, err = e.msg
|
||||
|
||||
# ── Async backing procs ─────────────────────────────────────────────────
|
||||
|
||||
proc doLoadAll(job: Job, channelId: SdsChannelID): Future[ChannelSnapshot] {.async.} =
|
||||
var snap = ChannelSnapshot()
|
||||
let chanKey = toKey(channelId)
|
||||
|
||||
block lamport:
|
||||
let opt = (await job.get(CatLamport, chanKey)).valueOr:
|
||||
warn "sds-persistency: get lamport failed", channelId, err = $error
|
||||
break lamport
|
||||
if opt.isSome:
|
||||
try:
|
||||
snap.lamportTimestamp = fromBlob(opt.get, int64)
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid lamport bytes", channelId, err = e.msg
|
||||
|
||||
block log:
|
||||
let rows = (await job.scanPrefix(CatLog, chanKey)).valueOr:
|
||||
warn "sds-persistency: scan log failed", channelId, err = $error
|
||||
break log
|
||||
var msgs = newSeq[SdsMessage]()
|
||||
for row in rows:
|
||||
try:
|
||||
msgs.add(fromBlob(row.payload, SdsMessage))
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid log row", channelId, err = e.msg
|
||||
msgs.sort do(a, b: SdsMessage) -> int:
|
||||
result = cmp(a.lamportTimestamp, b.lamportTimestamp)
|
||||
if result == 0:
|
||||
result = cmp(a.messageId, b.messageId)
|
||||
snap.messageHistory = msgs
|
||||
|
||||
block outgoing:
|
||||
let rows = (await job.scanPrefix(CatOutgoing, chanKey)).valueOr:
|
||||
warn "sds-persistency: scan outgoing failed", channelId, err = $error
|
||||
break outgoing
|
||||
for row in rows:
|
||||
try:
|
||||
snap.outgoingBuffer.add(fromBlob(row.payload, UnacknowledgedMessage))
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid outgoing row", channelId, err = e.msg
|
||||
|
||||
block incoming:
|
||||
let rows = (await job.scanPrefix(CatIncoming, chanKey)).valueOr:
|
||||
warn "sds-persistency: scan incoming failed", channelId, err = $error
|
||||
break incoming
|
||||
for row in rows:
|
||||
try:
|
||||
snap.incomingBuffer.add(fromBlob(row.payload, IncomingMessage))
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid incoming row", channelId, err = e.msg
|
||||
|
||||
block outRepair:
|
||||
let rows = (await job.scanPrefix(CatOutRepair, chanKey)).valueOr:
|
||||
warn "sds-persistency: scan out-repair failed", channelId, err = $error
|
||||
break outRepair
|
||||
for row in rows:
|
||||
try:
|
||||
snap.outgoingRepairBuffer.add(
|
||||
fromBlob(row.payload, (SdsMessageID, OutgoingRepairEntry))
|
||||
)
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid out-repair row", channelId, err = e.msg
|
||||
|
||||
block inRepair:
|
||||
let rows = (await job.scanPrefix(CatInRepair, chanKey)).valueOr:
|
||||
warn "sds-persistency: scan in-repair failed", channelId, err = $error
|
||||
break inRepair
|
||||
for row in rows:
|
||||
try:
|
||||
snap.incomingRepairBuffer.add(
|
||||
fromBlob(row.payload, (SdsMessageID, IncomingRepairEntry))
|
||||
)
|
||||
except ValueError as e:
|
||||
warn "sds-persistency: invalid in-repair row", channelId, err = e.msg
|
||||
|
||||
return snap
|
||||
|
||||
proc doDropChannel(job: Job, channelId: SdsChannelID): Future[void] {.async.} =
|
||||
## Delete every row belonging to the channel in one transactional batch.
|
||||
## Uses txDeletePrefix to push bulk deletes to the worker thread — no
|
||||
## caller-side scans needed. Hint rows (keyed by msgId, not channelId)
|
||||
## are not cleaned here; they are cascade-deleted by removeLogEntry during
|
||||
## normal rolling-history eviction, so by the time a channel is dropped
|
||||
## the only remaining hints belong to the still-live log tail. Those
|
||||
## become harmless orphans (never reloaded — hints are re-derived on
|
||||
## demand from the onRetrievalHint callback).
|
||||
let chanKey = toKey(channelId)
|
||||
await safePersist(
|
||||
job,
|
||||
@[
|
||||
TxOp(category: CatLog, key: chanKey, kind: txDeletePrefix),
|
||||
TxOp(category: CatOutgoing, key: chanKey, kind: txDeletePrefix),
|
||||
TxOp(category: CatIncoming, key: chanKey, kind: txDeletePrefix),
|
||||
TxOp(category: CatOutRepair, key: chanKey, kind: txDeletePrefix),
|
||||
TxOp(category: CatInRepair, key: chanKey, kind: txDeletePrefix),
|
||||
TxOp(category: CatLamport, key: chanKey, kind: txDelete),
|
||||
],
|
||||
)
|
||||
|
||||
# ── Public factory ──────────────────────────────────────────────────────
|
||||
|
||||
@ -207,102 +78,106 @@ proc newSdsPersistence*(job: Job): Persistence {.gcsafe, raises: [].} =
|
||||
## Build an SDS `Persistence` value backed by ``job``. One Job services
|
||||
## all channels — channelId is part of every key.
|
||||
##
|
||||
## The closures capture ``job`` by ref. They must be invoked from a
|
||||
## thread that owns a running chronos loop (the SDS context's worker
|
||||
## thread satisfies this).
|
||||
## The closures capture ``job`` by ref. They must be invoked from a thread
|
||||
## that owns a running chronos loop (the SDS context's worker thread
|
||||
## satisfies this).
|
||||
doAssert not job.isNil, "newSdsPersistence: job is nil"
|
||||
|
||||
# Built field-by-field via assignment rather than an object literal: every
|
||||
# field is an async closure whose body is an `await`/`return await` command
|
||||
# call, which cannot be followed by the `,` field separator a `Persistence(
|
||||
# ..)` literal would require (the parser cannot tell the comma from another
|
||||
# command argument). Assignments have no separator, so the bodies stay plain.
|
||||
# field is an async closure whose body uses `await`/`return` statements,
|
||||
# which cannot be followed by the `,` field separator a `Persistence(..)`
|
||||
# literal would require. Assignments have no separator, so bodies stay plain.
|
||||
var persistence = Persistence()
|
||||
|
||||
persistence.saveLamport = proc(
|
||||
channelId: SdsChannelID, lamport: int64
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safePut(job, CatLamport, toKey(channelId), toBlob(lamport))
|
||||
persistence.saveChannelMeta = proc(
|
||||
channelId: SdsChannelID, meta: ChannelMeta
|
||||
): Future[Result[void, string]] {.async: (raises: []), gcsafe.} =
|
||||
try:
|
||||
await job.persistPut(CatMeta, toKey(channelId), encode(meta).buffer)
|
||||
return ok()
|
||||
except CatchableError as e:
|
||||
warn "sds-persistency: saveChannelMeta failed", channelId, err = e.msg
|
||||
return err(e.msg)
|
||||
|
||||
persistence.appendLogEntry = proc(
|
||||
channelId: SdsChannelID, msg: SdsMessage
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safePut(job, CatLog, key(channelId, msg.messageId), toBlob(msg))
|
||||
persistence.updateHistory = proc(
|
||||
channelId: SdsChannelID, update: HistoryUpdate
|
||||
): Future[Result[void, string]] {.async: (raises: []), gcsafe.} =
|
||||
if update.isEmpty:
|
||||
return ok()
|
||||
# One transactional batch: append rows (txPut) and evictions (txDelete).
|
||||
var ops = newSeq[TxOp]()
|
||||
for m in update.append:
|
||||
ops.add TxOp(
|
||||
category: CatLog,
|
||||
key: key(channelId, m.messageId),
|
||||
kind: txPut,
|
||||
payload: encode(m).buffer,
|
||||
)
|
||||
for id in update.evict:
|
||||
ops.add TxOp(category: CatLog, key: key(channelId, id), kind: txDelete)
|
||||
try:
|
||||
await job.persist(ops)
|
||||
return ok()
|
||||
except CatchableError as e:
|
||||
warn "sds-persistency: updateHistory failed",
|
||||
channelId, appended = update.append.len, evicted = update.evict.len, err = e.msg
|
||||
return err(e.msg)
|
||||
|
||||
persistence.removeLogEntry = proc(
|
||||
channelId: SdsChannelID, msgId: SdsMessageID
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
# Atomic batch: delete the log row and its associated retrieval hint in
|
||||
# one transaction so they can't diverge.
|
||||
await safePersist(
|
||||
job,
|
||||
@[
|
||||
TxOp(category: CatLog, key: key(channelId, msgId), kind: txDelete),
|
||||
TxOp(category: CatHint, key: toKey(msgId), kind: txDelete),
|
||||
],
|
||||
)
|
||||
persistence.loadChannel = proc(
|
||||
channelId: SdsChannelID
|
||||
): Future[Result[ChannelData, string]] {.async: (raises: []), gcsafe.} =
|
||||
let chanKey = toKey(channelId)
|
||||
var data = ChannelData.init()
|
||||
try:
|
||||
block meta:
|
||||
let opt = (await job.get(CatMeta, chanKey)).valueOr:
|
||||
return err("loadChannel: get meta: " & $error)
|
||||
if opt.isSome:
|
||||
# schema-versioned decode; refuses unknown versions loudly.
|
||||
data.meta = ChannelMeta.decode(opt.get).valueOr:
|
||||
return err("loadChannel: corrupt or unsupported ChannelMeta blob")
|
||||
|
||||
persistence.setRetrievalHint = proc(
|
||||
msgId: SdsMessageID, hint: seq[byte]
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safePut(job, CatHint, toKey(msgId), hint)
|
||||
block history:
|
||||
let rows = (await job.scanPrefix(CatLog, chanKey)).valueOr:
|
||||
return err("loadChannel: scan log: " & $error)
|
||||
var msgs = newSeq[SdsMessage]()
|
||||
for row in rows:
|
||||
let m = SdsMessage.decode(row.payload).valueOr:
|
||||
warn "sds-persistency: skipping undecodable log row", channelId
|
||||
continue
|
||||
msgs.add(m)
|
||||
msgs.sort do(a, b: SdsMessage) -> int:
|
||||
result = cmp(a.lamportTimestamp, b.lamportTimestamp)
|
||||
if result == 0:
|
||||
result = cmp(a.messageId, b.messageId)
|
||||
data.messageHistory = msgs
|
||||
|
||||
persistence.saveOutgoing = proc(
|
||||
channelId: SdsChannelID, msg: UnacknowledgedMessage
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safePut(job, CatOutgoing, key(channelId, msg.message.messageId), toBlob(msg))
|
||||
|
||||
persistence.removeOutgoing = proc(
|
||||
channelId: SdsChannelID, msgId: SdsMessageID
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safeDelete(job, CatOutgoing, key(channelId, msgId))
|
||||
|
||||
persistence.saveIncoming = proc(
|
||||
channelId: SdsChannelID, msg: IncomingMessage
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safePut(job, CatIncoming, key(channelId, msg.message.messageId), toBlob(msg))
|
||||
|
||||
persistence.removeIncoming = proc(
|
||||
channelId: SdsChannelID, msgId: SdsMessageID
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safeDelete(job, CatIncoming, key(channelId, msgId))
|
||||
|
||||
persistence.saveOutgoingRepair = proc(
|
||||
channelId: SdsChannelID, msgId: SdsMessageID, entry: OutgoingRepairEntry
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safePut(job, CatOutRepair, key(channelId, msgId), toBlob((msgId, entry)))
|
||||
|
||||
persistence.removeOutgoingRepair = proc(
|
||||
channelId: SdsChannelID, msgId: SdsMessageID
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safeDelete(job, CatOutRepair, key(channelId, msgId))
|
||||
|
||||
persistence.saveIncomingRepair = proc(
|
||||
channelId: SdsChannelID, msgId: SdsMessageID, entry: IncomingRepairEntry
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safePut(job, CatInRepair, key(channelId, msgId), toBlob((msgId, entry)))
|
||||
|
||||
persistence.removeIncomingRepair = proc(
|
||||
channelId: SdsChannelID, msgId: SdsMessageID
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
await safeDelete(job, CatInRepair, key(channelId, msgId))
|
||||
return ok(data)
|
||||
except CatchableError as e:
|
||||
return err("loadChannel: " & e.msg)
|
||||
|
||||
persistence.dropChannel = proc(
|
||||
channelId: SdsChannelID
|
||||
): Future[void] {.async: (raises: []), gcsafe.} =
|
||||
): Future[Result[void, string]] {.async: (raises: []), gcsafe.} =
|
||||
let chanKey = toKey(channelId)
|
||||
try:
|
||||
await doDropChannel(job, channelId)
|
||||
await job.persist(
|
||||
@[
|
||||
TxOp(category: CatLog, key: chanKey, kind: txDeletePrefix),
|
||||
TxOp(category: CatMeta, key: chanKey, kind: txDelete),
|
||||
]
|
||||
)
|
||||
return ok()
|
||||
except CatchableError as e:
|
||||
error "sds-persistency: dropChannel failed", channelId, err = e.msg
|
||||
return err(e.msg)
|
||||
|
||||
persistence.loadAllForChannel = proc(
|
||||
channelId: SdsChannelID
|
||||
): Future[ChannelSnapshot] {.async: (raises: []), gcsafe.} =
|
||||
try:
|
||||
return await doLoadAll(job, channelId)
|
||||
except CatchableError as e:
|
||||
error "sds-persistency: loadAllForChannel failed", channelId, err = e.msg
|
||||
return ChannelSnapshot()
|
||||
persistence.setRetrievalHint = proc(
|
||||
msgId: SdsMessageID, hint: seq[byte]
|
||||
): Future[Result[void, string]] {.async: (raises: []), gcsafe.} =
|
||||
## Deliberate no-op — persisted hints are never read back (see module
|
||||
## header). Hints are supplied live via the onRetrievalHint provider.
|
||||
return ok()
|
||||
|
||||
return persistence
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user