diff --git a/waku/persistency/backend_comm.nim b/waku/persistency/backend_comm.nim index dd7e71297..193e52825 100644 --- a/waku/persistency/backend_comm.nim +++ b/waku/persistency/backend_comm.nim @@ -68,7 +68,7 @@ proc mtMarshalValue*( of txPut: if not mtMarshalValue(buf, cap, value.payload, pos): return false - of txDelete: + of txDelete, txDeletePrefix: discard return true @@ -93,6 +93,8 @@ proc mtUnmarshalValue*( value = TxOp(category: category, key: key, kind: txPut, payload: payload) of txDelete: value = TxOp(category: category, key: key, kind: txDelete) + of txDeletePrefix: + value = TxOp(category: category, key: key, kind: txDeletePrefix) return true EventBroker(mt): diff --git a/waku/persistency/backend_sqlite.nim b/waku/persistency/backend_sqlite.nim index 6851febc1..95757bc2c 100644 --- a/waku/persistency/backend_sqlite.nim +++ b/waku/persistency/backend_sqlite.nim @@ -7,7 +7,7 @@ import std/options import results, sqlite3_abi import ../common/databases/[common, db_sqlite] -import ./[types, schema] +import ./[types, keys, schema] type KvBackend* = ref object @@ -121,6 +121,37 @@ proc close*(b: KvBackend) = b.db.close() b.db = nil +proc deletePrefix( + b: KvBackend, category: string, prefix: Key +): Result[void, PersistencyError] = + let rng = prefixRange(prefix) + let openEnded = bytes(rng.stop).len == 0 + let sql = + if openEnded: + "DELETE FROM kv WHERE category = ? AND key >= ?;" + else: + "DELETE FROM kv WHERE category = ? AND key >= ? AND key < ?;" + var s: ptr sqlite3_stmt + let rc = sqlite3_prepare_v2(b.db.env, sql.cstring, sql.len.cint, addr s, nil) + if rc != SQLITE_OK: + return err(toErr("deletePrefix prepare: " & $sqlite3_errstr(rc))) + defer: + discard sqlite3_finalize(s) + var bc = bindBlob(s, 1.cint, catBytes(category)) + if bc != SQLITE_OK: + return err(toErr("deletePrefix bind cat: " & $sqlite3_errstr(bc))) + bc = bindBlob(s, 2.cint, keyBytes(rng.start)) + if bc != SQLITE_OK: + return err(toErr("deletePrefix bind start: " & $sqlite3_errstr(bc))) + if not openEnded: + bc = bindBlob(s, 3.cint, keyBytes(rng.stop)) + if bc != SQLITE_OK: + return err(toErr("deletePrefix bind stop: " & $sqlite3_errstr(bc))) + let v = sqlite3_step(s) + if v != SQLITE_DONE: + return err(toErr("deletePrefix step: " & $sqlite3_errstr(v))) + return ok() + proc applyOne(b: KvBackend, op: TxOp): Result[void, PersistencyError] = case op.kind of txPut: @@ -131,6 +162,8 @@ proc applyOne(b: KvBackend, op: TxOp): Result[void, PersistencyError] = let r = b.deleteStmt.exec((catBytes(op.category), keyBytes(op.key))) if r.isErr: return err(toErr("delete failed: " & r.error)) + of txDeletePrefix: + ?b.deletePrefix(op.category, op.key) return ok() proc execSql(b: KvBackend, sql: string): Result[void, PersistencyError] = diff --git a/waku/persistency/persistency.nim b/waku/persistency/persistency.nim index 916f3ac8b..1e070dbb5 100644 --- a/waku/persistency/persistency.nim +++ b/waku/persistency/persistency.nim @@ -284,6 +284,11 @@ proc persistPut*( proc persistDelete*(t: Job, category: string, key: Key): Future[void] {.async.} = await persist(t, TxOp(category: category, key: key, kind: txDelete)) +proc persistDeletePrefix*( + t: Job, category: string, prefix: Key +): Future[void] {.async.} = + await persist(t, TxOp(category: category, key: prefix, kind: txDeletePrefix)) + proc persistEncoded*[T]( t: Job, category: string, key: Key, value: T ): Future[void] {.async.} = @@ -335,6 +340,13 @@ proc persistDelete*( if not j.isNil(): await j.persistDelete(category, key) +proc persistDeletePrefix*( + p: Persistency, jobId: string, category: string, prefix: Key +): Future[void] {.async.} = + let j = p.jobOrWarn(jobId) + if not j.isNil(): + await j.persistDeletePrefix(category, prefix) + proc persistEncoded*[T]( p: Persistency, jobId: string, category: string, key: Key, value: T ): Future[void] {.async.} = diff --git a/waku/persistency/sds_persistency.nim b/waku/persistency/sds_persistency.nim index a52b7b4e9..44d551814 100644 --- a/waku/persistency/sds_persistency.nim +++ b/waku/persistency/sds_persistency.nim @@ -75,6 +75,32 @@ BlobCodec(IncomingMessage) BlobCodec(OutgoingRepairEntry) BlobCodec(IncomingRepairEntry) +# ── Write helpers ─────────────────────────────────────────────────────── +# +# The Persistence write fields are async with `raises: []`, but the Job ops +# raise `CatchableError`. These wrappers trap and log so the closures stay +# raise-free, preserving the "errors are logged, never raised" contract. + +proc safePut( + job: Job, category: string, k: Key, payload: seq[byte] +) {.async: (raises: []).} = + try: + await job.persistPut(category, k, payload) + except CatchableError as e: + warn "sds-persistency: put failed", category, err = e.msg + +proc safeDelete(job: Job, category: string, k: Key) {.async: (raises: []).} = + try: + await job.persistDelete(category, k) + except CatchableError as e: + warn "sds-persistency: delete failed", category, err = e.msg + +proc safePersist(job: Job, ops: seq[TxOp]) {.async: (raises: []).} = + try: + await job.persist(ops) + except CatchableError as e: + warn "sds-persistency: persist batch failed", opCount = ops.len, err = e.msg + # ── Async backing procs ───────────────────────────────────────────────── proc doLoadAll(job: Job, channelId: SdsChannelID): Future[ChannelSnapshot] {.async.} = @@ -154,53 +180,26 @@ proc doLoadAll(job: Job, channelId: SdsChannelID): Future[ChannelSnapshot] {.asy return snap proc doDropChannel(job: Job, channelId: SdsChannelID): Future[void] {.async.} = - ## Collect every row belonging to the channel and submit them as a single - ## TxOp batch — the backend applies that as one BEGIN IMMEDIATE/COMMIT, - ## which is the atomicity the SDS contract asks for. + ## Delete every row belonging to the channel in one transactional batch. + ## Uses txDeletePrefix to push bulk deletes to the worker thread — no + ## caller-side scans needed. Hint rows (keyed by msgId, not channelId) + ## are not cleaned here; they are cascade-deleted by removeLogEntry during + ## normal rolling-history eviction, so by the time a channel is dropped + ## the only remaining hints belong to the still-live log tail. Those + ## become harmless orphans (never reloaded — hints are re-derived on + ## demand from the onRetrievalHint callback). let chanKey = toKey(channelId) - var ops: seq[TxOp] = @[] - var hintIds: seq[SdsMessageID] = @[] - - let cats = [CatLog, CatOutgoing, CatIncoming, CatOutRepair, CatInRepair] - for cat in cats: - let rows = (await job.scanPrefix(cat, chanKey)).valueOr: - warn "sds-persistency: scan during drop failed", - channelId, category = cat, err = $error - continue - for row in rows: - ops.add(TxOp(category: cat, key: row.key, kind: txDelete)) - if cat == CatLog: - try: - hintIds.add(fromBlob(row.payload, SdsMessage).messageId) - except ValueError: - discard - - ops.add(TxOp(category: CatLamport, key: chanKey, kind: txDelete)) - for id in hintIds: - ops.add(TxOp(category: CatHint, key: toKey(id), kind: txDelete)) - - if ops.len > 0: - await job.persist(ops) - -# ── Write helpers ─────────────────────────────────────────────────────── -# -# The Persistence write fields are async with `raises: []`, but the Job ops -# raise `CatchableError`. These wrappers trap and log so the closures stay -# raise-free, preserving the "errors are logged, never raised" contract. - -proc safePut( - job: Job, category: string, k: Key, payload: seq[byte] -) {.async: (raises: []).} = - try: - await job.persistPut(category, k, payload) - except CatchableError as e: - warn "sds-persistency: put failed", category, err = e.msg - -proc safeDelete(job: Job, category: string, k: Key) {.async: (raises: []).} = - try: - await job.persistDelete(category, k) - except CatchableError as e: - warn "sds-persistency: delete failed", category, err = e.msg + await safePersist( + job, + @[ + TxOp(category: CatLog, key: chanKey, kind: txDeletePrefix), + TxOp(category: CatOutgoing, key: chanKey, kind: txDeletePrefix), + TxOp(category: CatIncoming, key: chanKey, kind: txDeletePrefix), + TxOp(category: CatOutRepair, key: chanKey, kind: txDeletePrefix), + TxOp(category: CatInRepair, key: chanKey, kind: txDeletePrefix), + TxOp(category: CatLamport, key: chanKey, kind: txDelete), + ], + ) # ── Public factory ────────────────────────────────────────────────────── @@ -233,7 +232,15 @@ proc newSdsPersistence*(job: Job): Persistence {.gcsafe, raises: [].} = persistence.removeLogEntry = proc( channelId: SdsChannelID, msgId: SdsMessageID ): Future[void] {.async: (raises: []), gcsafe.} = - await safeDelete(job, CatLog, key(channelId, msgId)) + # Atomic batch: delete the log row and its associated retrieval hint in + # one transaction so they can't diverge. + await safePersist( + job, + @[ + TxOp(category: CatLog, key: key(channelId, msgId), kind: txDelete), + TxOp(category: CatHint, key: toKey(msgId), kind: txDelete), + ], + ) persistence.setRetrievalHint = proc( msgId: SdsMessageID, hint: seq[byte] diff --git a/waku/persistency/types.nim b/waku/persistency/types.nim index 4c4c2de3f..0fdf12af0 100644 --- a/waku/persistency/types.nim +++ b/waku/persistency/types.nim @@ -19,6 +19,7 @@ type TxOpKind* = enum txPut txDelete + txDeletePrefix TxOp* = object category*: string @@ -28,6 +29,8 @@ type payload*: seq[byte] of txDelete: discard + of txDeletePrefix: + discard PersistencyErrorKind* = enum peBackend