Merge 7d99833ceef7caf352e5d507c87f9a1295c8d3f7 into 64a0ed7d967454d9c3b345023719e6ca5d73f129

This commit is contained in:
NagyZoltanPeter 2026-06-02 20:02:17 +02:00 committed by GitHub
commit a8055a0955
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 396 additions and 18 deletions

View File

@ -328,8 +328,8 @@
}
},
"brokers": {
"version": "#v2.0.1",
"vcsRevision": "2093ca4d50e581adda73fee7fd16231f990f4cbe",
"version": "#v3.1.1",
"vcsRevision": "a7316a35f1b62e3497ae8ee0fc1aace74df0beb2",
"url": "https://github.com/NagyZoltanPeter/nim-brokers.git",
"downloadMethod": "git",
"dependencies": [
@ -341,7 +341,7 @@
"cbor_serialization"
],
"checksums": {
"sha1": "cc74c987af94537e9d44d1b0143aa417299040c5"
"sha1": "4447d7c1f9da14ae439afb23aee45116ce2ecb40"
}
},
"stint": {
@ -620,8 +620,8 @@
}
},
"sds": {
"version": "#2e9a7683f0e180bf112135fae3a3803eed8490d4",
"vcsRevision": "2e9a7683f0e180bf112135fae3a3803eed8490d4",
"version": "#abdd40cc645f1b024c3ee99cced7e287c4e4c441",
"vcsRevision": "abdd40cc645f1b024c3ee99cced7e287c4e4c441",
"url": "https://github.com/logos-messaging/nim-sds.git",
"downloadMethod": "git",
"dependencies": [
@ -636,7 +636,7 @@
"taskpools"
],
"checksums": {
"sha1": "d13f1bf8d1b90b27e9edfc063b043831242cda19"
"sha1": "61c4ae13c6896bfa70e662520e8660a78c7f438c"
}
},
"ffi": {

View File

@ -5,5 +5,6 @@ import ./test_backend
import ./test_lifecycle
import ./test_facade
import ./test_encoding
import ./test_sds_persistency
import ./test_string_lookup
import ./test_singleton

View File

@ -0,0 +1,155 @@
{.used.}
## Behavioural tests for the SDS Persistence adapter (nim-sds 0.3.0 snapshot
## model). Importing `sds_persistency` also compile-checks the real adapter.
##
## Writes go through the fire-and-forget Job path (the Future resolves when
## the op is queued, not applied — Persistency v1), so every read-back polls
## until the row appears/disappears.
import std/[options, os, times]
import chronos, results
import testutils/unittests
import waku/persistency/persistency
import waku/persistency/keys
import waku/persistency/sds_persistency
proc tmpRoot(label: string): string =
let p = getTempDir() / ("sds_persistency_test_" & label & "_" & $epochTime().int)
removeDir(p)
p
proc pollExists(
t: Job, category: string, k: Key, timeoutMs = 1000
): Future[bool] {.async.} =
let deadline = epochTime() + (timeoutMs.float / 1000.0)
while epochTime() < deadline:
let r = await t.exists(category, k)
if r.isOk and r.get():
return true
await sleepAsync(chronos.milliseconds(2))
return false
proc pollGone(
t: Job, category: string, k: Key, timeoutMs = 1000
): Future[bool] {.async.} =
let deadline = epochTime() + (timeoutMs.float / 1000.0)
while epochTime() < deadline:
let r = await t.exists(category, k)
if r.isOk and not r.get():
return true
await sleepAsync(chronos.milliseconds(2))
return false
proc mkMsg(channelId: SdsChannelID, msgId: SdsMessageID, lamport: int64): SdsMessage =
SdsMessage.init(
messageId = msgId,
lamportTimestamp = lamport,
causalHistory = @[],
channelId = channelId,
content = @[byte(1), byte(2)],
bloomFilter = @[],
)
suite "SDS persistency adapter (0.3.0 snapshot model)":
asyncTest "saveChannelMeta + updateHistory round-trip via loadChannel":
let root = tmpRoot("roundtrip")
defer:
removeDir(root)
let p = Persistency.instance(root).get()
defer:
Persistency.reset()
let job = p.openJob("sds").get()
let persistence = newSdsPersistence(job)
let channelId = "chan-1".SdsChannelID
var meta = ChannelMeta.init()
meta.lamportTimestamp = 42
check (await persistence.saveChannelMeta(channelId, meta)).isOk
check (await job.pollExists(CatMeta, toKey(channelId)))
# append out of (lamport) order on purpose; loadChannel must sort.
var upd = HistoryUpdate.init()
upd.append = @[mkMsg(channelId, "m2", 2), mkMsg(channelId, "m1", 1)]
check (await persistence.updateHistory(channelId, upd)).isOk
check (await job.pollExists(CatLog, key(channelId, "m2")))
let data = (await persistence.loadChannel(channelId)).valueOr:
check false
return
check data.meta.lamportTimestamp == 42
check data.messageHistory.len == 2
check data.messageHistory[0].messageId == "m1"
check data.messageHistory[1].messageId == "m2"
asyncTest "loadChannel on a fresh channel returns empty ChannelData":
let root = tmpRoot("empty")
defer:
removeDir(root)
let p = Persistency.instance(root).get()
defer:
Persistency.reset()
let job = p.openJob("sds").get()
let persistence = newSdsPersistence(job)
let data = (await persistence.loadChannel("nope".SdsChannelID)).valueOr:
check false
return
check data.meta.lamportTimestamp == 0
check data.messageHistory.len == 0
asyncTest "updateHistory evict removes a log row":
let root = tmpRoot("evict")
defer:
removeDir(root)
let p = Persistency.instance(root).get()
defer:
Persistency.reset()
let job = p.openJob("sds").get()
let persistence = newSdsPersistence(job)
let channelId = "c".SdsChannelID
var upd = HistoryUpdate.init()
upd.append = @[mkMsg(channelId, "a", 1), mkMsg(channelId, "b", 2)]
check (await persistence.updateHistory(channelId, upd)).isOk
check (await job.pollExists(CatLog, key(channelId, "b")))
var ev = HistoryUpdate.init()
ev.evict = @["a".SdsMessageID]
check (await persistence.updateHistory(channelId, ev)).isOk
check (await job.pollGone(CatLog, key(channelId, "a")))
let data = (await persistence.loadChannel(channelId)).valueOr:
check false
return
check data.messageHistory.len == 1
check data.messageHistory[0].messageId == "b"
asyncTest "dropChannel wipes meta and log":
let root = tmpRoot("drop")
defer:
removeDir(root)
let p = Persistency.instance(root).get()
defer:
Persistency.reset()
let job = p.openJob("sds").get()
let persistence = newSdsPersistence(job)
let channelId = "d".SdsChannelID
var meta = ChannelMeta.init()
meta.lamportTimestamp = 7
check (await persistence.saveChannelMeta(channelId, meta)).isOk
var upd = HistoryUpdate.init()
upd.append = @[mkMsg(channelId, "x", 1)]
check (await persistence.updateHistory(channelId, upd)).isOk
check (await job.pollExists(CatMeta, toKey(channelId)))
check (await job.pollExists(CatLog, key(channelId, "x")))
check (await persistence.dropChannel(channelId)).isOk
check (await job.pollGone(CatMeta, toKey(channelId)))
let data = (await persistence.loadChannel(channelId)).valueOr:
check false
return
check data.meta.lamportTimestamp == 0
check data.messageHistory.len == 0

View File

@ -61,17 +61,9 @@ requires "nim >= 2.2.4",
# Packages not on nimble (use git URLs)
requires "https://github.com/logos-messaging/nim-ffi"
requires "https://github.com/logos-messaging/nim-sds.git#2e9a7683f0e180bf112135fae3a3803eed8490d4"
requires "https://github.com/logos-messaging/nim-sds.git#abdd40cc645f1b024c3ee99cced7e287c4e4c441"
# brokers: pinned by URL+commit rather than the bare `brokers >= 2.0.1`
# form because the nim-lang/packages registry entry for `brokers` only
# carries metadata for the original v0.1.0 publication. Until that
# registry entry is refreshed, the local SAT solver enumerates "0.1.0"
# as the only available version and cannot satisfy `>= 2.0.1`. The URL
# pin below bypasses the registry and locks the exact commit of the
# v2.0.1 tag. Revert to the bare form once nim-lang/packages is
# updated.
requires "https://github.com/NagyZoltanPeter/nim-brokers.git#v2.0.1"
requires "https://github.com/NagyZoltanPeter/nim-brokers.git#v3.1.1"
requires "https://github.com/vacp2p/nim-lsquic"
requires "https://github.com/vacp2p/nim-jwt.git#057ec95eb5af0eea9c49bfe9025b3312c95dc5f2"

View File

@ -68,7 +68,7 @@ proc mtMarshalValue*(
of txPut:
if not mtMarshalValue(buf, cap, value.payload, pos):
return false
of txDelete:
of txDelete, txDeletePrefix:
discard
return true
@ -93,6 +93,8 @@ proc mtUnmarshalValue*(
value = TxOp(category: category, key: key, kind: txPut, payload: payload)
of txDelete:
value = TxOp(category: category, key: key, kind: txDelete)
of txDeletePrefix:
value = TxOp(category: category, key: key, kind: txDeletePrefix)
return true
EventBroker(mt):

View File

@ -7,7 +7,7 @@
import std/options
import results, sqlite3_abi
import ../common/databases/[common, db_sqlite]
import ./[types, schema]
import ./[types, keys, schema]
type
KvBackend* = ref object
@ -121,6 +121,37 @@ proc close*(b: KvBackend) =
b.db.close()
b.db = nil
proc deletePrefix(
b: KvBackend, category: string, prefix: Key
): Result[void, PersistencyError] =
let rng = prefixRange(prefix)
let openEnded = bytes(rng.stop).len == 0
let sql =
if openEnded:
"DELETE FROM kv WHERE category = ? AND key >= ?;"
else:
"DELETE FROM kv WHERE category = ? AND key >= ? AND key < ?;"
var s: ptr sqlite3_stmt
let rc = sqlite3_prepare_v2(b.db.env, sql.cstring, sql.len.cint, addr s, nil)
if rc != SQLITE_OK:
return err(toErr("deletePrefix prepare: " & $sqlite3_errstr(rc)))
defer:
discard sqlite3_finalize(s)
var bc = bindBlob(s, 1.cint, catBytes(category))
if bc != SQLITE_OK:
return err(toErr("deletePrefix bind cat: " & $sqlite3_errstr(bc)))
bc = bindBlob(s, 2.cint, keyBytes(rng.start))
if bc != SQLITE_OK:
return err(toErr("deletePrefix bind start: " & $sqlite3_errstr(bc)))
if not openEnded:
bc = bindBlob(s, 3.cint, keyBytes(rng.stop))
if bc != SQLITE_OK:
return err(toErr("deletePrefix bind stop: " & $sqlite3_errstr(bc)))
let v = sqlite3_step(s)
if v != SQLITE_DONE:
return err(toErr("deletePrefix step: " & $sqlite3_errstr(v)))
return ok()
proc applyOne(b: KvBackend, op: TxOp): Result[void, PersistencyError] =
case op.kind
of txPut:
@ -131,6 +162,8 @@ proc applyOne(b: KvBackend, op: TxOp): Result[void, PersistencyError] =
let r = b.deleteStmt.exec((catBytes(op.category), keyBytes(op.key)))
if r.isErr:
return err(toErr("delete failed: " & r.error))
of txDeletePrefix:
?b.deletePrefix(op.category, op.key)
return ok()
proc execSql(b: KvBackend, sql: string): Result[void, PersistencyError] =

View File

@ -284,6 +284,11 @@ proc persistPut*(
proc persistDelete*(t: Job, category: string, key: Key): Future[void] {.async.} =
await persist(t, TxOp(category: category, key: key, kind: txDelete))
proc persistDeletePrefix*(
t: Job, category: string, prefix: Key
): Future[void] {.async.} =
await persist(t, TxOp(category: category, key: prefix, kind: txDeletePrefix))
proc persistEncoded*[T](
t: Job, category: string, key: Key, value: T
): Future[void] {.async.} =
@ -335,6 +340,13 @@ proc persistDelete*(
if not j.isNil():
await j.persistDelete(category, key)
proc persistDeletePrefix*(
p: Persistency, jobId: string, category: string, prefix: Key
): Future[void] {.async.} =
let j = p.jobOrWarn(jobId)
if not j.isNil():
await j.persistDeletePrefix(category, prefix)
proc persistEncoded*[T](
p: Persistency, jobId: string, category: string, key: Key, value: T
): Future[void] {.async.} =

View File

@ -0,0 +1,177 @@
## Adapter that materialises the SDS `Persistence` contract (nim-sds 0.3.0,
## snapshot model) on top of a waku-persistency `Job`. One `Job` (== one
## SQLite file, one worker thread) services all channels for a given SDS
## context; rows are namespaced by category and the channelId is the first
## key component so per-channel prefix scans stay cheap.
##
## ## Snapshot contract (nim-sds 0.3.0)
##
## The fine-grained per-row callbacks of 0.2.4 are gone. SDS now persists via
## five procs, all `Future[Result[void, string]]` (load returns
## `Result[ChannelData, string]`), `{.async: (raises: []), gcsafe.}`:
##
## * **`saveChannelMeta`** — the complete fast-changing per-channel state
## (lamport clock, outgoing/incoming buffers, both SDS-R repair buffers)
## as ONE blob. Idempotent; a missed write self-heals on the next save.
## * **`updateHistory`** — append newly-delivered messages / evict the
## oldest past the cap, applied as one transactional batch.
## * **`loadChannel`** — bootstrap: returns the prior `ChannelData`
## (meta + ordered message history) or an empty one. Surfaces errors.
## * **`dropChannel`** — wipe all state for a channel. Surfaces errors.
## * **`setRetrievalHint`** — see below; deliberate no-op here.
##
## Failure policy mirrors the interface docs: save/update/hint are non-fatal
## (we log and still return the error string); load/drop are durability-intent
## and propagate their error to the caller.
##
## ## Codec
##
## The blob transform is owned by nim-sds: `ChannelMeta` round-trips through
## `sds/snapshot_codec` (protobuf, schema-versioned — refuses unknown
## versions), and each persisted `SdsMessage` log row through the SDS wire
## codec in `sds/protobuf`. We do not maintain a second codec for these
## shapes (the previous `payload_codec`/`BlobCodec` path is retired).
##
## ## Retrieval hints
##
## `setRetrievalHint` is intentionally a no-op: persisted hints are never read
## back — `loadChannel` returns `ChannelData` (meta + messageHistory) with no
## hint field, and `ChannelMeta` carries none. Hints are supplied live via the
## `onRetrievalHint` provider, so persisting them would be write-only dead
## data. The closure still exists because the field is required by the
## `Persistence` object (SDS calls it from `getRecentHistoryEntries`).
##
## ## Storage layout
##
## | Category | Key | Value |
## |---------------|--------------------------|----------------------------------------|
## | `sds.meta` | `key(channelId)` | `ChannelMeta` (snapshot_codec protobuf)|
## | `sds.log` | `key(channelId, msgId)` | `SdsMessage` (sds wire protobuf) |
##
## `messageHistory` is reconstructed in memory by sorting on
## `(lamportTimestamp, messageId)` — the same total order SDS uses for
## delivery (see sds/sds_utils.nim).
{.push raises: [].}
import std/[algorithm, options]
import chronos, chronicles, results
import libp2p/protobuf/minprotobuf
import ./persistency
import ./keys
import types/persistence
import snapshot_codec
import protobuf
export persistence, persistency
logScope:
topics = "sds-persistency"
const
CatMeta* = "sds.meta"
CatLog* = "sds.log"
# ── Public factory ──────────────────────────────────────────────────────
proc newSdsPersistence*(job: Job): Persistence {.gcsafe, raises: [].} =
## Build an SDS `Persistence` value backed by ``job``. One Job services
## all channels — channelId is part of every key.
##
## The closures capture ``job`` by ref. They must be invoked from a thread
## that owns a running chronos loop (the SDS context's worker thread
## satisfies this).
doAssert not job.isNil, "newSdsPersistence: job is nil"
# Built field-by-field via assignment rather than an object literal: every
# field is an async closure whose body uses `await`/`return` statements,
# which cannot be followed by the `,` field separator a `Persistence(..)`
# literal would require. Assignments have no separator, so bodies stay plain.
var persistence = Persistence()
persistence.saveChannelMeta = proc(
channelId: SdsChannelID, meta: ChannelMeta
): Future[Result[void, string]] {.async: (raises: []), gcsafe.} =
try:
await job.persistPut(CatMeta, toKey(channelId), encode(meta).buffer)
return ok()
except CatchableError as e:
warn "sds-persistency: saveChannelMeta failed", channelId, err = e.msg
return err(e.msg)
persistence.updateHistory = proc(
channelId: SdsChannelID, update: HistoryUpdate
): Future[Result[void, string]] {.async: (raises: []), gcsafe.} =
if update.isEmpty:
return ok()
# One transactional batch: append rows (txPut) and evictions (txDelete).
var ops = newSeq[TxOp]()
for m in update.append:
ops.add TxOp(
category: CatLog,
key: key(channelId, m.messageId),
kind: txPut,
payload: encode(m).buffer,
)
for id in update.evict:
ops.add TxOp(category: CatLog, key: key(channelId, id), kind: txDelete)
try:
await job.persist(ops)
return ok()
except CatchableError as e:
warn "sds-persistency: updateHistory failed",
channelId, appended = update.append.len, evicted = update.evict.len, err = e.msg
return err(e.msg)
persistence.loadChannel = proc(
channelId: SdsChannelID
): Future[Result[ChannelData, string]] {.async: (raises: []), gcsafe.} =
let chanKey = toKey(channelId)
var data = ChannelData.init()
try:
block meta:
let opt = (await job.get(CatMeta, chanKey)).valueOr:
return err("loadChannel: get meta: " & $error)
if opt.isSome:
# schema-versioned decode; refuses unknown versions loudly.
data.meta = ChannelMeta.decode(opt.get).valueOr:
return err("loadChannel: corrupt or unsupported ChannelMeta blob")
block history:
let rows = (await job.scanPrefix(CatLog, chanKey)).valueOr:
return err("loadChannel: scan log: " & $error)
var msgs = newSeq[SdsMessage]()
for row in rows:
let m = SdsMessage.decode(row.payload).valueOr:
warn "sds-persistency: skipping undecodable log row", channelId
continue
msgs.add(m)
msgs.sort do(a, b: SdsMessage) -> int:
result = cmp(a.lamportTimestamp, b.lamportTimestamp)
if result == 0:
result = cmp(a.messageId, b.messageId)
data.messageHistory = msgs
return ok(data)
except CatchableError as e:
return err("loadChannel: " & e.msg)
persistence.dropChannel = proc(
channelId: SdsChannelID
): Future[Result[void, string]] {.async: (raises: []), gcsafe.} =
let chanKey = toKey(channelId)
try:
await job.persist(
@[
TxOp(category: CatLog, key: chanKey, kind: txDeletePrefix),
TxOp(category: CatMeta, key: chanKey, kind: txDelete),
]
)
return ok()
except CatchableError as e:
error "sds-persistency: dropChannel failed", channelId, err = e.msg
return err(e.msg)
return persistence
{.pop.}

View File

@ -19,6 +19,7 @@ type
TxOpKind* = enum
txPut
txDelete
txDeletePrefix
TxOp* = object
category*: string
@ -28,6 +29,8 @@ type
payload*: seq[byte]
of txDelete:
discard
of txDeletePrefix:
discard
PersistencyErrorKind* = enum
peBackend

View File

@ -0,0 +1,3 @@
import waku/persistency/persistency
export persistency