diff --git a/AGENTS.md b/AGENTS.md index 80c1843..ff1cb75 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,7 +1,7 @@ # GitNexus — Code Intelligence -This project is indexed by GitNexus as **nim-sds** (1079 symbols, 1770 relationships, 61 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely. +This project is indexed by GitNexus as **nim-sds** (1100 symbols, 1820 relationships, 66 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely. > If any GitNexus tool warns the index is stale, run `npx gitnexus analyze` in terminal first. diff --git a/CLAUDE.md b/CLAUDE.md index 3e35e35..94736b6 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -166,7 +166,7 @@ If using Nix, also recalculate the fixed-output hash in `nix/deps.nix` after upd # GitNexus — Code Intelligence -This project is indexed by GitNexus as **nim-sds** (1079 symbols, 1770 relationships, 61 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely. +This project is indexed by GitNexus as **nim-sds** (1100 symbols, 1820 relationships, 66 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely. > If any GitNexus tool warns the index is stale, run `npx gitnexus analyze` in terminal first. diff --git a/sds.nim b/sds.nim index 06d9f58..2a7674d 100644 --- a/sds.nim +++ b/sds.nim @@ -8,17 +8,13 @@ proc newReliabilityManager*( participantId: SdsParticipantID, config: ReliabilityConfig = defaultConfig(), persistence: Persistence = noOpPersistence(), - persistenceV2: PersistenceV2 = noOpPersistenceV2(), ): Result[ReliabilityManager, ReliabilityError] = ## Creates a new multi-channel ReliabilityManager. ## `participantId` is REQUIRED (see `ReliabilityManager.new`). - ## `persistence` is the legacy fine-grained backend; defaults to a no-op. - ## `persistenceV2` is the snapshot-based backend (target interface; see - ## PLAN_SNAPSHOT_PERSISTENCE.md). Also defaults to a no-op. During the - ## phased refactor both backends coexist; phase 3 deletes the legacy - ## interface and renames `persistenceV2` to `persistence`. + ## `persistence` defaults to a no-op backend; supply a real one to durably + ## store SDS state across restarts. try: - let rm = ReliabilityManager.new(participantId, config, persistence, persistenceV2) + let rm = ReliabilityManager.new(participantId, config, persistence) return ok(rm) except Exception: error "Failed to create ReliabilityManager", msg = getCurrentExceptionMsg() diff --git a/sds/sds_utils.nim b/sds/sds_utils.nim index 10329dc..ac5829d 100644 --- a/sds/sds_utils.nim +++ b/sds/sds_utils.nim @@ -22,18 +22,17 @@ proc reliabilityErr*(detail: string): ReliabilityError {.gcsafe, raises: [].} = ## persistence failure is recorded, while the enum value travels up the ## `Result` chain to the public API caller, who decides what to do. ## - ## NOTE (refactor in progress): with the snapshot-based PersistenceV2 - ## interface, most protocol ops no longer propagate persistence errors at - ## all — they log and continue (see PLAN §8). This helper is still used - ## by the legacy `Persistence` call sites during phase 2 migration, and - ## by the durability-intent ops (removeChannel, resetReliabilityManager) - ## that retain the err-on-failure semantics in the V2 world too. + ## With the snapshot-based Persistence interface, most protocol ops no + ## longer propagate persistence errors at all — they log and continue + ## (see PLAN_SNAPSHOT_PERSISTENCE.md §8). This helper is still used by + ## the durability-intent ops (removeChannel, resetReliabilityManager, + ## getOrCreateChannel) that retain err-on-failure semantics. warn "persistence operation failed", detail = detail ReliabilityError.rePersistenceError proc snapshotMeta*(channel: ChannelContext): ChannelMeta {.gcsafe, raises: [].} = ## Captures the current in-memory state of a `ChannelContext` as a - ## `ChannelMeta` blob, suitable for `PersistenceV2.saveChannelMeta`. + ## `ChannelMeta` blob, suitable for `Persistence.saveChannelMeta`. ## ## The in-memory shape uses `Table`-keyed buffers for fast lookup; ## `ChannelMeta` flattens them to `seq`s for stable wire serialization @@ -60,7 +59,7 @@ proc trySaveMeta*( ## ## This helper is the single point where snapshot-save failures are ## logged; callers do not need to handle the Result. - let res = await rm.persistenceV2.saveChannelMeta(channelId, snapshotMeta(channel)) + let res = await rm.persistence.saveChannelMeta(channelId, snapshotMeta(channel)) if res.isErr: warn "snapshot save failed; in-memory state authoritative, next op will retry", channelId = channelId, detail = res.error @@ -77,7 +76,7 @@ proc tryUpdateHistory*( if append.len == 0 and evict.len == 0: return let update = HistoryUpdate(append: append, evict: evict) - let res = await rm.persistenceV2.updateHistory(channelId, update) + let res = await rm.persistence.updateHistory(channelId, update) if res.isErr: warn "history update failed; in-memory log authoritative, next op will retry", channelId = channelId, detail = res.error @@ -92,7 +91,7 @@ proc dropChannelFromPersistence*( ## Phase 2D: uses `persistenceV2.dropChannel`. This op DOES propagate ## err on failure (durability is the semantic intent — the caller asked ## us to confirm a disk wipe; we cannot silently lie). See PLAN §8. - (await rm.persistenceV2.dropChannel(channelId)).isOkOr: + (await rm.persistence.dropChannel(channelId)).isOkOr: return err(reliabilityErr(error)) ok() @@ -273,7 +272,7 @@ proc getRecentHistoryEntries*( # Phase 2B: best-effort hint persistence via V2. Non-fatal — # hints are an optimisation; a missing hint just means the # peer falls back to slower retrieval. - let hintRes = await rm.persistenceV2.setRetrievalHint( + let hintRes = await rm.persistence.setRetrievalHint( msgId, entry.retrievalHint ) if hintRes.isErr: @@ -380,7 +379,7 @@ proc getOrCreateChannel*( rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate ) ) - let data = (await rm.persistenceV2.loadChannel(channelId)).valueOr: + let data = (await rm.persistence.loadChannel(channelId)).valueOr: return err(reliabilityErr(error)) channel.lamportTimestamp = data.meta.lamportTimestamp # Backend contract: messageHistory MUST be ordered oldest-first. diff --git a/sds/types/persistence.nim b/sds/types/persistence.nim index 6517d18..6f5e459 100644 --- a/sds/types/persistence.nim +++ b/sds/types/persistence.nim @@ -1,177 +1,94 @@ +## Snapshot-based persistence interface (5 procs). +## +## Each protocol op issues AT MOST one `saveChannelMeta` and one +## `updateHistory` call at the end of the op, under the channel lock. The +## meta blob is the complete current per-channel state (lamport clock, +## outgoing/incoming buffers, SDS-R repair buffers); the history update +## carries (append, evict) for the message log. Bloom filter is rebuilt +## from history on bootstrap, never persisted. +## +## Atomicity expectation: nim-sds issues `saveChannelMeta` and (when +## non-empty) `updateHistory` back-to-back with NO intervening +## `await`-of-other-work. The backend MAY treat the pair as one +## transaction. The pair is keyed on the same `channelId`. +## +## Failure policy: a failed `saveChannelMeta` or `updateHistory` MUST NOT +## abort the protocol op. The next op's save is fully self-contained and +## will re-synchronise on-disk state. See PLAN_SNAPSHOT_PERSISTENCE.md §8. +## `loadChannel` and `dropChannel` DO surface errors — they're the +## durability-intent ops. + import chronos, results import ./sds_message_id -import ./sds_message -import ./unacknowledged_message -import ./incoming_message -import ./repair_entry -export - results, sds_message_id, sds_message, unacknowledged_message, incoming_message, - repair_entry +import ./channel_meta +import ./history_update +export results, sds_message_id, channel_meta, history_update -## SDS state persistence interface (issue #64). -## -## Defines WHAT operations a persistence backend must provide. The actual -## storage technology (SQLite, encrypted file, in-memory) is supplied by the -## caller — nim-sds knows nothing about it. Every state-mutating proc in the -## protocol calls into one of these procs immediately after the in-memory -## change, so on-disk state stays in lockstep with in-memory state. -## -## All proc fields are async (return `Future`) so backends can do real I/O -## without blocking the Chronos event loop the manager runs on. -## -## Every field returns a `Result` so backend failures are propagated to nim-sds -## rather than swallowed by the backend. Mutating ops return -## `Result[void, string]`; the getter (`loadAllForChannel`) returns -## `Result[ChannelSnapshot, string]`. The error is a backend-supplied message; -## nim-sds maps it to `ReliabilityError.rePersistenceError` and surfaces it on -## the corresponding public API call. The contract still forbids raising -## (`raises: []`): failure must travel through the `Result`, not an exception. -## -## Bloom filter is intentionally not persisted: it is rebuilt from the local -## history log on bootstrap. Async timers are likewise recomputed from the -## absolute timestamps stored in the repair buffer entries. +type Persistence* = object + ## Pluggable durability backend. Supplied at `newReliabilityManager` + ## construction time; defaults to `noOpPersistence()` when not given. -type - ChannelSnapshot* = object - ## Returned by `loadAllForChannel` on bootstrap. Carries the entire - ## per-channel state needed to repopulate a `ChannelContext`. The bloom - ## filter is NOT in the snapshot — callers rebuild it from `messageHistory`. - lamportTimestamp*: int64 - messageHistory*: seq[SdsMessage] - ## MUST be ordered oldest-first. FIFO eviction relies on insertion order; - ## skipping ORDER BY corrupts the log across restarts. - outgoingBuffer*: seq[UnacknowledgedMessage] - incomingBuffer*: seq[IncomingMessage] - outgoingRepairBuffer*: seq[(SdsMessageID, OutgoingRepairEntry)] - incomingRepairBuffer*: seq[(SdsMessageID, IncomingRepairEntry)] + saveChannelMeta*: proc( + channelId: SdsChannelID, meta: ChannelMeta + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} + ## Persist the complete current per-channel snapshot. Idempotent: the + ## blob is the full state, so a missed write is recovered by any later + ## successful write. - Persistence* = object - ## Pluggable persistence contract. The caller supplies an instance of this - ## type at `newReliabilityManager` construction time. Each proc field is - ## invoked by nim-sds at the corresponding state-mutation point. - ## All fields are async; nim-sds awaits each call to keep on-disk and - ## in-memory state in lockstep without blocking the event loop. + updateHistory*: proc( + channelId: SdsChannelID, update: HistoryUpdate + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} + ## Append newly-delivered messages and evict oldest ones past the + ## maxMessageHistory cap. Callers SHOULD skip this call entirely when + ## `update.isEmpty`. - # Per-channel lamport clock - saveLamport*: proc( - channelId: SdsChannelID, lamport: int64 - ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} + loadChannel*: proc( + channelId: SdsChannelID + ): Future[Result[ChannelData, string]] {.async: (raises: []), gcsafe.} + ## Bootstrap on `getOrCreateChannel`. Returns the full prior state, or + ## an empty `ChannelData` if the channel is new on disk. Failure + ## propagates to the caller — bootstrap is a durability-intent op. - # Local log (delivered messages) - appendLogEntry*: proc( - channelId: SdsChannelID, msg: SdsMessage - ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} - removeLogEntry*: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} - setRetrievalHint*: proc( - msgId: SdsMessageID, hint: seq[byte] - ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} + dropChannel*: proc( + channelId: SdsChannelID + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} + ## Wipe all persisted state for a channel. Called by `removeChannel` / + ## `resetReliabilityManager`. Backends SHOULD execute atomically. + ## Failure propagates to the caller — the caller asked us to confirm a + ## disk wipe and we cannot silently lie. - # Outgoing unacknowledged buffer - saveOutgoing*: proc( - channelId: SdsChannelID, msg: UnacknowledgedMessage - ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} - removeOutgoing*: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} - - # Incoming dependency-waiting buffer - saveIncoming*: proc( - channelId: SdsChannelID, msg: IncomingMessage - ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} - removeIncoming*: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} - - # SDS-R outgoing repair buffer - saveOutgoingRepair*: proc( - channelId: SdsChannelID, msgId: SdsMessageID, entry: OutgoingRepairEntry - ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} - removeOutgoingRepair*: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} - - # SDS-R incoming repair buffer - saveIncomingRepair*: proc( - channelId: SdsChannelID, msgId: SdsMessageID, entry: IncomingRepairEntry - ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} - removeIncomingRepair*: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} - - # Wipe all persisted state for a channel in one transactional call. - # Called by removeChannel / resetReliabilityManager. Backends should - # implement this atomically (e.g. one BEGIN/COMMIT) — a per-row loop on - # the nim-sds side would mean N fsyncs per drop. - dropChannel*: proc(channelId: SdsChannelID): Future[Result[void, string]] {. - async: (raises: []), gcsafe - .} - - # Bootstrap on `addChannel` / `getOrCreateChannel`. - loadAllForChannel*: proc( - channelId: SdsChannelID - ): Future[Result[ChannelSnapshot, string]] {.async: (raises: []), gcsafe.} + setRetrievalHint*: proc( + msgId: SdsMessageID, hint: seq[byte] + ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} + ## Record a retrieval hint for a message id. Called from + ## `getRecentHistoryEntries` when an application-supplied hint + ## provider returns a non-empty hint. Out-of-band from the + ## snapshot/history write path because hints are populated lazily + ## during read. Non-fatal on failure. proc noOpPersistence*(): Persistence = - ## Default backend that discards every write and returns an empty snapshot. - ## Used so existing callers (and tests) that don't care about durability - ## keep working without supplying a real backend. + ## Default backend: discards all writes, returns an empty snapshot on + ## load. Used when no real backend is supplied (existing tests and + ## non-durability-needing callers). Persistence( - saveLamport: proc( - channelId: SdsChannelID, lamport: int64 + saveChannelMeta: proc( + channelId: SdsChannelID, meta: ChannelMeta ): Future[Result[void, string]] {.async: (raises: []).} = ok(), - appendLogEntry: proc( - channelId: SdsChannelID, msg: SdsMessage + updateHistory: proc( + channelId: SdsChannelID, update: HistoryUpdate ): Future[Result[void, string]] {.async: (raises: []).} = ok(), - removeLogEntry: proc( - channelId: SdsChannelID, msgId: SdsMessageID + loadChannel: proc( + channelId: SdsChannelID + ): Future[Result[ChannelData, string]] {.async: (raises: []).} = + ok(ChannelData.init()), + dropChannel: proc( + channelId: SdsChannelID ): Future[Result[void, string]] {.async: (raises: []).} = ok(), setRetrievalHint: proc( msgId: SdsMessageID, hint: seq[byte] ): Future[Result[void, string]] {.async: (raises: []).} = ok(), - saveOutgoing: proc( - channelId: SdsChannelID, msg: UnacknowledgedMessage - ): Future[Result[void, string]] {.async: (raises: []).} = - ok(), - removeOutgoing: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[Result[void, string]] {.async: (raises: []).} = - ok(), - saveIncoming: proc( - channelId: SdsChannelID, msg: IncomingMessage - ): Future[Result[void, string]] {.async: (raises: []).} = - ok(), - removeIncoming: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[Result[void, string]] {.async: (raises: []).} = - ok(), - saveOutgoingRepair: proc( - channelId: SdsChannelID, msgId: SdsMessageID, entry: OutgoingRepairEntry - ): Future[Result[void, string]] {.async: (raises: []).} = - ok(), - removeOutgoingRepair: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[Result[void, string]] {.async: (raises: []).} = - ok(), - saveIncomingRepair: proc( - channelId: SdsChannelID, msgId: SdsMessageID, entry: IncomingRepairEntry - ): Future[Result[void, string]] {.async: (raises: []).} = - ok(), - removeIncomingRepair: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[Result[void, string]] {.async: (raises: []).} = - ok(), - dropChannel: proc( - channelId: SdsChannelID - ): Future[Result[void, string]] {.async: (raises: []).} = - ok(), - loadAllForChannel: proc( - channelId: SdsChannelID - ): Future[Result[ChannelSnapshot, string]] {.async: (raises: []).} = - ok(ChannelSnapshot()), ) diff --git a/sds/types/persistence_v2.nim b/sds/types/persistence_v2.nim deleted file mode 100644 index aed4635..0000000 --- a/sds/types/persistence_v2.nim +++ /dev/null @@ -1,94 +0,0 @@ -## Snapshot-based persistence interface (5 procs). -## -## This is the target interface for the refactor described in -## PLAN_SNAPSHOT_PERSISTENCE.md. It coexists with the legacy 13-proc -## `Persistence` (in `./persistence.nim`) during phase 2 of the refactor: -## protocol ops are migrated one at a time. Phase 3 deletes the old -## interface and renames `PersistenceV2` to `Persistence`. -## -## Why 5 procs instead of 13: every protocol op now issues at most ONE -## meta save + ONE history update at the end of the op, eliminating -## per-mutation persistence calls and the partial-write divergence they -## made unavoidable. See PLAN_SNAPSHOT_PERSISTENCE.md §2 and §8. - -import chronos, results -import ./sds_message_id -import ./channel_meta -import ./history_update -export results, sds_message_id, channel_meta, history_update - -type PersistenceV2* = object - ## Snapshot-based persistence contract. Supplied at - ## `newReliabilityManager` construction time. Each proc field is invoked - ## by nim-sds AT MOST ONCE per protocol op, at the end of the op, under - ## the channel lock. - ## - ## Atomicity expectation: nim-sds issues `saveChannelMeta` and (when - ## non-empty) `updateHistory` back-to-back with NO intervening - ## `await`-of-other-work. The backend MAY treat the pair as one - ## transaction. The pair is keyed on the same `channelId`. - ## - ## Failure policy: a failed `saveChannelMeta` or `updateHistory` MUST NOT - ## abort the protocol op. The next op's save is fully self-contained and - ## will re-synchronise on-disk state. See PLAN §8. - - saveChannelMeta*: proc( - channelId: SdsChannelID, meta: ChannelMeta - ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} - ## Persist the complete current per-channel snapshot. Idempotent: the - ## blob is the full state, so a missed write is recovered by any later - ## successful write. - - updateHistory*: proc( - channelId: SdsChannelID, update: HistoryUpdate - ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} - ## Append newly-delivered messages and evict oldest ones past the - ## maxMessageHistory cap. Callers SHOULD skip this call entirely when - ## `update.isEmpty`. - - loadChannel*: proc( - channelId: SdsChannelID - ): Future[Result[ChannelData, string]] {.async: (raises: []), gcsafe.} - ## Bootstrap on `getOrCreateChannel`. Returns the full prior state, or - ## an empty `ChannelData` if the channel is new on disk. - - dropChannel*: proc( - channelId: SdsChannelID - ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} - ## Wipe all persisted state for a channel. Called by `removeChannel` / - ## `resetReliabilityManager`. Backends SHOULD execute atomically. - - setRetrievalHint*: proc( - msgId: SdsMessageID, hint: seq[byte] - ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} - ## Record a retrieval hint for a message id. Called from - ## `getRecentHistoryEntries` when an application-supplied hint provider - ## returns a non-empty hint. Out-of-band from the snapshot/history - ## write path because hints are populated lazily during read. - -proc noOpPersistenceV2*(): PersistenceV2 = - ## Default backend: discards all writes, returns an empty snapshot on - ## load. Used when no real backend is supplied (existing tests and - ## non-durability-needing callers). - PersistenceV2( - saveChannelMeta: proc( - channelId: SdsChannelID, meta: ChannelMeta - ): Future[Result[void, string]] {.async: (raises: []).} = - ok(), - updateHistory: proc( - channelId: SdsChannelID, update: HistoryUpdate - ): Future[Result[void, string]] {.async: (raises: []).} = - ok(), - loadChannel: proc( - channelId: SdsChannelID - ): Future[Result[ChannelData, string]] {.async: (raises: []).} = - ok(ChannelData.init()), - dropChannel: proc( - channelId: SdsChannelID - ): Future[Result[void, string]] {.async: (raises: []).} = - ok(), - setRetrievalHint: proc( - msgId: SdsMessageID, hint: seq[byte] - ): Future[Result[void, string]] {.async: (raises: []).} = - ok(), - ) diff --git a/sds/types/reliability_manager.nim b/sds/types/reliability_manager.nim index 8bbfbfe..6b4cc7e 100644 --- a/sds/types/reliability_manager.nim +++ b/sds/types/reliability_manager.nim @@ -6,23 +6,16 @@ import ./callbacks import ./reliability_config import ./channel_context import ./persistence -import ./persistence_v2 export sds_message_id, history_entry, callbacks, reliability_config, channel_context, - persistence, persistence_v2 + persistence type ReliabilityManager* = ref object channels*: Table[SdsChannelID, ChannelContext] config*: ReliabilityConfig participantId*: SdsParticipantID persistence*: Persistence - ## Legacy fine-grained persistence interface. Phase 1 of the refactor - ## (see PLAN_SNAPSHOT_PERSISTENCE.md) keeps this alongside `persistenceV2` - ## so protocol ops can be migrated one at a time. - persistenceV2*: PersistenceV2 - ## Snapshot-based persistence interface. Defaults to a no-op when not - ## supplied. During phase 2 of the refactor, individual protocol ops - ## are migrated from `persistence.X` to `persistenceV2.X`. + ## Pluggable durability backend; defaults to a no-op when not supplied. lock*: AsyncLock ## Single-threaded Chronos cooperative lock. Serializes mutators against ## one another at await points; the manager assumes all calls come from @@ -45,7 +38,6 @@ proc new*( participantId: SdsParticipantID, config: ReliabilityConfig, persistence: Persistence = noOpPersistence(), - persistenceV2: PersistenceV2 = noOpPersistenceV2(), ): T = ## `participantId` is REQUIRED — it is the per-manager identity SDS-R uses ## to populate response groups and decide which incoming repair requests @@ -58,7 +50,6 @@ proc new*( config: config, participantId: participantId, persistence: persistence, - persistenceV2: persistenceV2, lock: newAsyncLock(), periodicTasks: @[], ) diff --git a/tests/in_memory_persistence.nim b/tests/in_memory_persistence.nim index ddf68fc..087545e 100644 --- a/tests/in_memory_persistence.nim +++ b/tests/in_memory_persistence.nim @@ -1,17 +1,17 @@ +## Test-only Persistence backend backed by Nim tables. Adapts the +## snapshot-based `Persistence` interface onto a denormalised +## `InMemoryStore` shape so test assertions can inspect individual buffers +## (`store.outgoing`, `store.log`, etc.) directly. The adapter +## decomposes the meta blob on save and reconstructs it on load. +## +## `failingOps` injects backend failures. Op names match the `Persistence` +## field names: "saveChannelMeta", "updateHistory", "loadChannel", +## "dropChannel", "setRetrievalHint". + import std/[tables, sets] import chronos import sds -## Test-only Persistence backend backed by Nim tables. Lets tests verify the -## full write → restart → read-back loop without depending on SQLite (or any -## real storage technology). Exposes the underlying store so tests can assert -## on what got saved. -## -## `failingOps` injects backend failures: any op whose name is in the set -## returns `err(...)` instead of performing the operation, so tests can verify -## that nim-sds propagates the failure as `rePersistenceError`. Op names match -## the `Persistence` field names (e.g. "appendLogEntry", "loadAllForChannel"). - type InMemoryStore* = ref object lamports*: Table[SdsChannelID, int64] log*: Table[SdsChannelID, OrderedTable[SdsMessageID, SdsMessage]] @@ -21,126 +21,95 @@ type InMemoryStore* = ref object outgoingRepair*: Table[SdsChannelID, OrderedTable[SdsMessageID, OutgoingRepairEntry]] incomingRepair*: Table[SdsChannelID, OrderedTable[SdsMessageID, IncomingRepairEntry]] dropChannelCalls*: Table[SdsChannelID, int] - ## Per-channel counter; lets tests assert dropChannel is invoked exactly - ## once per logical drop (not N times — see PR #66 review). + ## Per-channel counter; lets tests assert dropChannel is invoked + ## exactly once per logical drop. failingOps*: HashSet[string] - ## Op names that should return an injected backend error. See module doc. + ## Op names that should return an injected backend error. proc newInMemoryStore*(): InMemoryStore = InMemoryStore(failingOps: initHashSet[string]()) -proc failInjected(store: InMemoryStore, op: string): Result[void, string] = - ## Returns err(...) when `op` is registered in `failingOps`, ok() otherwise. - if op in store.failingOps: - return err("injected backend failure: " & op) - ok() - proc newInMemoryPersistence*(store: InMemoryStore): Persistence = Persistence( - saveLamport: proc( - channelId: SdsChannelID, lamport: int64 + saveChannelMeta: proc( + channelId: SdsChannelID, meta: ChannelMeta ): Future[Result[void, string]] {.async: (raises: []).} = - ?store.failInjected("saveLamport") - store.lamports[channelId] = lamport + if "saveChannelMeta" in store.failingOps: + return err("injected backend failure: saveChannelMeta") + {.cast(raises: []).}: + # Lamport. + store.lamports[channelId] = meta.lamportTimestamp + + # Outgoing buffer — replace existing rows wholesale (snapshot is + # the complete state, not a delta). + store.outgoing[channelId] = + initOrderedTable[SdsMessageID, UnacknowledgedMessage]() + for u in meta.outgoingBuffer: + store.outgoing[channelId][u.message.messageId] = u + + # Incoming buffer. + store.incoming[channelId] = + initOrderedTable[SdsMessageID, IncomingMessage]() + for m in meta.incomingBuffer: + store.incoming[channelId][m.message.messageId] = m + + # Repair buffers. + store.outgoingRepair[channelId] = + initOrderedTable[SdsMessageID, OutgoingRepairEntry]() + for kv in meta.outgoingRepairBuffer: + store.outgoingRepair[channelId][kv.messageId] = kv.entry + store.incomingRepair[channelId] = + initOrderedTable[SdsMessageID, IncomingRepairEntry]() + for kv in meta.incomingRepairBuffer: + store.incomingRepair[channelId][kv.messageId] = kv.entry ok(), - appendLogEntry: proc( - channelId: SdsChannelID, msg: SdsMessage + updateHistory: proc( + channelId: SdsChannelID, update: HistoryUpdate ): Future[Result[void, string]] {.async: (raises: []).} = - ?store.failInjected("appendLogEntry") + if "updateHistory" in store.failingOps: + return err("injected backend failure: updateHistory") {.cast(raises: []).}: if channelId notin store.log: store.log[channelId] = initOrderedTable[SdsMessageID, SdsMessage]() - store.log[channelId][msg.messageId] = msg + for m in update.append: + store.log[channelId][m.messageId] = m + for id in update.evict: + store.log[channelId].del(id) ok(), - removeLogEntry: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[Result[void, string]] {.async: (raises: []).} = - ?store.failInjected("removeLogEntry") - {.cast(raises: []).}: - if channelId in store.log: - store.log[channelId].del(msgId) - ok(), - setRetrievalHint: proc( - msgId: SdsMessageID, hint: seq[byte] - ): Future[Result[void, string]] {.async: (raises: []).} = - ?store.failInjected("setRetrievalHint") - store.hints[msgId] = hint - ok(), - saveOutgoing: proc( - channelId: SdsChannelID, msg: UnacknowledgedMessage - ): Future[Result[void, string]] {.async: (raises: []).} = - ?store.failInjected("saveOutgoing") - {.cast(raises: []).}: - if channelId notin store.outgoing: - store.outgoing[channelId] = - initOrderedTable[SdsMessageID, UnacknowledgedMessage]() - store.outgoing[channelId][msg.message.messageId] = msg - ok(), - removeOutgoing: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[Result[void, string]] {.async: (raises: []).} = - ?store.failInjected("removeOutgoing") + loadChannel: proc( + channelId: SdsChannelID + ): Future[Result[ChannelData, string]] {.async: (raises: []).} = + if "loadChannel" in store.failingOps: + return err("injected backend failure: loadChannel") {.cast(raises: []).}: + var data = ChannelData.init() + if channelId in store.lamports: + data.meta.lamportTimestamp = store.lamports[channelId] if channelId in store.outgoing: - store.outgoing[channelId].del(msgId) - ok(), - saveIncoming: proc( - channelId: SdsChannelID, msg: IncomingMessage - ): Future[Result[void, string]] {.async: (raises: []).} = - ?store.failInjected("saveIncoming") - {.cast(raises: []).}: - if channelId notin store.incoming: - store.incoming[channelId] = initOrderedTable[SdsMessageID, IncomingMessage]() - store.incoming[channelId][msg.message.messageId] = msg - ok(), - removeIncoming: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[Result[void, string]] {.async: (raises: []).} = - ?store.failInjected("removeIncoming") - {.cast(raises: []).}: + for u in store.outgoing[channelId].values: + data.meta.outgoingBuffer.add(u) if channelId in store.incoming: - store.incoming[channelId].del(msgId) - ok(), - saveOutgoingRepair: proc( - channelId: SdsChannelID, msgId: SdsMessageID, entry: OutgoingRepairEntry - ): Future[Result[void, string]] {.async: (raises: []).} = - ?store.failInjected("saveOutgoingRepair") - {.cast(raises: []).}: - if channelId notin store.outgoingRepair: - store.outgoingRepair[channelId] = - initOrderedTable[SdsMessageID, OutgoingRepairEntry]() - store.outgoingRepair[channelId][msgId] = entry - ok(), - removeOutgoingRepair: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[Result[void, string]] {.async: (raises: []).} = - ?store.failInjected("removeOutgoingRepair") - {.cast(raises: []).}: + for m in store.incoming[channelId].values: + data.meta.incomingBuffer.add(m) if channelId in store.outgoingRepair: - store.outgoingRepair[channelId].del(msgId) - ok(), - saveIncomingRepair: proc( - channelId: SdsChannelID, msgId: SdsMessageID, entry: IncomingRepairEntry - ): Future[Result[void, string]] {.async: (raises: []).} = - ?store.failInjected("saveIncomingRepair") - {.cast(raises: []).}: - if channelId notin store.incomingRepair: - store.incomingRepair[channelId] = - initOrderedTable[SdsMessageID, IncomingRepairEntry]() - store.incomingRepair[channelId][msgId] = entry - ok(), - removeIncomingRepair: proc( - channelId: SdsChannelID, msgId: SdsMessageID - ): Future[Result[void, string]] {.async: (raises: []).} = - ?store.failInjected("removeIncomingRepair") - {.cast(raises: []).}: + for id, e in store.outgoingRepair[channelId].pairs: + data.meta.outgoingRepairBuffer.add( + OutgoingRepairKV(messageId: id, entry: e) + ) if channelId in store.incomingRepair: - store.incomingRepair[channelId].del(msgId) - ok(), + for id, e in store.incomingRepair[channelId].pairs: + data.meta.incomingRepairBuffer.add( + IncomingRepairKV(messageId: id, entry: e) + ) + if channelId in store.log: + for m in store.log[channelId].values: + data.messageHistory.add(m) + return ok(data), dropChannel: proc( channelId: SdsChannelID ): Future[Result[void, string]] {.async: (raises: []).} = - ?store.failInjected("dropChannel") + if "dropChannel" in store.failingOps: + return err("injected backend failure: dropChannel") {.cast(raises: []).}: store.lamports.del(channelId) store.log.del(channelId) @@ -151,29 +120,11 @@ proc newInMemoryPersistence*(store: InMemoryStore): Persistence = store.dropChannelCalls[channelId] = store.dropChannelCalls.getOrDefault(channelId) + 1 ok(), - loadAllForChannel: proc( - channelId: SdsChannelID - ): Future[Result[ChannelSnapshot, string]] {.async: (raises: []).} = - if "loadAllForChannel" in store.failingOps: - return err("injected backend failure: loadAllForChannel") - {.cast(raises: []).}: - var snap = ChannelSnapshot() - if channelId in store.lamports: - snap.lamportTimestamp = store.lamports[channelId] - if channelId in store.log: - for msg in store.log[channelId].values: - snap.messageHistory.add(msg) - if channelId in store.outgoing: - for unack in store.outgoing[channelId].values: - snap.outgoingBuffer.add(unack) - if channelId in store.incoming: - for incoming in store.incoming[channelId].values: - snap.incomingBuffer.add(incoming) - if channelId in store.outgoingRepair: - for msgId, entry in store.outgoingRepair[channelId]: - snap.outgoingRepairBuffer.add((msgId, entry)) - if channelId in store.incomingRepair: - for msgId, entry in store.incomingRepair[channelId]: - snap.incomingRepairBuffer.add((msgId, entry)) - return ok(snap), + setRetrievalHint: proc( + msgId: SdsMessageID, hint: seq[byte] + ): Future[Result[void, string]] {.async: (raises: []).} = + if "setRetrievalHint" in store.failingOps: + return err("injected backend failure: setRetrievalHint") + store.hints[msgId] = hint + ok(), ) diff --git a/tests/in_memory_persistence_v2.nim b/tests/in_memory_persistence_v2.nim deleted file mode 100644 index c52057e..0000000 --- a/tests/in_memory_persistence_v2.nim +++ /dev/null @@ -1,119 +0,0 @@ -## V2 test-only backend that adapts the snapshot-based `PersistenceV2` -## interface onto the same `InMemoryStore` shape used by the legacy mock. -## -## Tests can assert against `store.outgoing`, `store.log`, etc. exactly as -## they did against the legacy backend; this adapter decomposes the -## snapshot blob into the same denormalised tables. That keeps the -## state-survival tests in tests/test_persistence.nim portable across the -## migration without rewriting every assertion. -## -## `failingOps` injects backend failures. Op names match the `PersistenceV2` -## field names: "saveChannelMeta", "updateHistory", "loadChannel", -## "dropChannel", "setRetrievalHint". - -import std/[tables, sets] -import chronos -import sds -import ./in_memory_persistence - -export in_memory_persistence - -proc newInMemoryPersistenceV2*(store: InMemoryStore): PersistenceV2 = - PersistenceV2( - saveChannelMeta: proc( - channelId: SdsChannelID, meta: ChannelMeta - ): Future[Result[void, string]] {.async: (raises: []).} = - if "saveChannelMeta" in store.failingOps: - return err("injected backend failure: saveChannelMeta") - {.cast(raises: []).}: - # Lamport. - store.lamports[channelId] = meta.lamportTimestamp - - # Outgoing buffer — replace existing rows wholesale (snapshot is - # the complete state, not a delta). - store.outgoing[channelId] = - initOrderedTable[SdsMessageID, UnacknowledgedMessage]() - for u in meta.outgoingBuffer: - store.outgoing[channelId][u.message.messageId] = u - - # Incoming buffer. - store.incoming[channelId] = - initOrderedTable[SdsMessageID, IncomingMessage]() - for m in meta.incomingBuffer: - store.incoming[channelId][m.message.messageId] = m - - # Repair buffers. - store.outgoingRepair[channelId] = - initOrderedTable[SdsMessageID, OutgoingRepairEntry]() - for kv in meta.outgoingRepairBuffer: - store.outgoingRepair[channelId][kv.messageId] = kv.entry - store.incomingRepair[channelId] = - initOrderedTable[SdsMessageID, IncomingRepairEntry]() - for kv in meta.incomingRepairBuffer: - store.incomingRepair[channelId][kv.messageId] = kv.entry - ok(), - updateHistory: proc( - channelId: SdsChannelID, update: HistoryUpdate - ): Future[Result[void, string]] {.async: (raises: []).} = - if "updateHistory" in store.failingOps: - return err("injected backend failure: updateHistory") - {.cast(raises: []).}: - if channelId notin store.log: - store.log[channelId] = initOrderedTable[SdsMessageID, SdsMessage]() - for m in update.append: - store.log[channelId][m.messageId] = m - for id in update.evict: - store.log[channelId].del(id) - ok(), - loadChannel: proc( - channelId: SdsChannelID - ): Future[Result[ChannelData, string]] {.async: (raises: []).} = - if "loadChannel" in store.failingOps: - return err("injected backend failure: loadChannel") - {.cast(raises: []).}: - var data = ChannelData.init() - if channelId in store.lamports: - data.meta.lamportTimestamp = store.lamports[channelId] - if channelId in store.outgoing: - for u in store.outgoing[channelId].values: - data.meta.outgoingBuffer.add(u) - if channelId in store.incoming: - for m in store.incoming[channelId].values: - data.meta.incomingBuffer.add(m) - if channelId in store.outgoingRepair: - for id, e in store.outgoingRepair[channelId].pairs: - data.meta.outgoingRepairBuffer.add( - OutgoingRepairKV(messageId: id, entry: e) - ) - if channelId in store.incomingRepair: - for id, e in store.incomingRepair[channelId].pairs: - data.meta.incomingRepairBuffer.add( - IncomingRepairKV(messageId: id, entry: e) - ) - if channelId in store.log: - for m in store.log[channelId].values: - data.messageHistory.add(m) - return ok(data), - dropChannel: proc( - channelId: SdsChannelID - ): Future[Result[void, string]] {.async: (raises: []).} = - if "dropChannel" in store.failingOps: - return err("injected backend failure: dropChannel") - {.cast(raises: []).}: - store.lamports.del(channelId) - store.log.del(channelId) - store.outgoing.del(channelId) - store.incoming.del(channelId) - store.outgoingRepair.del(channelId) - store.incomingRepair.del(channelId) - store.dropChannelCalls[channelId] = - store.dropChannelCalls.getOrDefault(channelId) + 1 - ok(), - setRetrievalHint: proc( - msgId: SdsMessageID, hint: seq[byte] - ): Future[Result[void, string]] {.async: (raises: []).} = - if "setRetrievalHint" in store.failingOps: - return err("injected backend failure: setRetrievalHint") - store.hints[msgId] = hint - ok(), - ) diff --git a/tests/test_persistence.nim b/tests/test_persistence.nim index 863751a..bc62c07 100644 --- a/tests/test_persistence.nim +++ b/tests/test_persistence.nim @@ -1,7 +1,7 @@ import results, std/[tables, sets, times] import sds import ./async_unittest -import ./in_memory_persistence_v2 +import ./in_memory_persistence converter toParticipantID(s: string): SdsParticipantID = s.SdsParticipantID @@ -17,11 +17,11 @@ proc newV2Manager( newReliabilityManager( participantId = "alice", config = config, - persistenceV2 = newInMemoryPersistenceV2(store), + persistence = newInMemoryPersistence(store), ) .get() -suite "Persistence (V2): write → restart → read-back": +suite "Persistence: write → restart → read-back": asyncTest "outgoing buffer survives restart": let store = newInMemoryStore() let rm1 = newV2Manager(store) @@ -125,7 +125,7 @@ suite "Persistence (V2): write → restart → read-back": check testChannel notin store.incomingRepair await rm.cleanup() - asyncTest "noOpPersistenceV2 keeps existing manager working": + asyncTest "noOpPersistence keeps existing manager working": let rm = newReliabilityManager(participantId = "alice").get() # default no-op persistence (both legacy and V2) check (await rm.ensureChannel(testChannel)).isOk() @@ -359,7 +359,7 @@ suite "Persistence (V2): write → restart → read-back": check inbufFinal.len == 0 await rm2.cleanup() -suite "Persistence (V2): failure policy": +suite "Persistence: failure policy": asyncTest "loadChannel failure surfaces as rePersistenceError on bootstrap": # Bootstrap durability is the semantic intent of getOrCreateChannel — # the caller asked us to materialise a channel and we can't do that @@ -368,7 +368,7 @@ suite "Persistence (V2): failure policy": let store = newInMemoryStore() store.failingOps.incl("loadChannel") let rm = newReliabilityManager( - participantId = "alice", persistenceV2 = newInMemoryPersistenceV2(store) + participantId = "alice", persistence = newInMemoryPersistence(store) ) .get() let res = await rm.ensureChannel(testChannel) @@ -383,7 +383,7 @@ suite "Persistence (V2): failure policy": # the new policy is deliberate. let store = newInMemoryStore() let rm = newReliabilityManager( - participantId = "alice", persistenceV2 = newInMemoryPersistenceV2(store) + participantId = "alice", persistence = newInMemoryPersistence(store) ) .get() check (await rm.ensureChannel(testChannel)).isOk() @@ -407,7 +407,7 @@ suite "Persistence (V2): failure policy": # Same policy applied to the history-update path. let store = newInMemoryStore() let rm = newReliabilityManager( - participantId = "alice", persistenceV2 = newInMemoryPersistenceV2(store) + participantId = "alice", persistence = newInMemoryPersistence(store) ) .get() check (await rm.ensureChannel(testChannel)).isOk() @@ -422,7 +422,7 @@ suite "Persistence (V2): failure policy": # DOES propagate err on failure (PLAN §8). let store = newInMemoryStore() let rm = newReliabilityManager( - participantId = "alice", persistenceV2 = newInMemoryPersistenceV2(store) + participantId = "alice", persistence = newInMemoryPersistence(store) ) .get() check (await rm.ensureChannel(testChannel)).isOk()