From abdd40cc645f1b024c3ee99cced7e287c4e4c441 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Tue, 2 Jun 2026 14:15:03 +0200 Subject: [PATCH] Refactor persistency - follow up - review fixes (#73) removal of persistency of retrievalHints because its never read. The PR also covers some leftovers of #72 - small interface and code style changes. --- AGENTS.md | 1 + sds.nim | 2 +- sds/sds_utils.nim | 62 ++++++++++----------------------- sds/types/persistence.nim | 26 ++++---------- tests/in_memory_persistence.nim | 16 ++------- 5 files changed, 30 insertions(+), 77 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index ff1cb75..f0d2f39 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,6 +1,7 @@ # GitNexus — Code Intelligence + This project is indexed by GitNexus as **nim-sds** (1100 symbols, 1820 relationships, 66 execution flows). Use the GitNexus MCP tools to understand code, assess impact, and navigate safely. > If any GitNexus tool warns the index is stale, run `npx gitnexus analyze` in terminal first. diff --git a/sds.nim b/sds.nim index d859954..d2e4907 100644 --- a/sds.nim +++ b/sds.nim @@ -106,7 +106,7 @@ proc wrapOutgoingMessage*( return err(error) let bfResult = serializeBloomFilter(channel.bloomFilter.filter) - if bfResult.isErr: + if bfResult.isErr(): error "Failed to serialize bloom filter", channelId = channelId return err(ReliabilityError.reSerializationError) diff --git a/sds/sds_utils.nim b/sds/sds_utils.nim index 6459a4b..445dd35 100644 --- a/sds/sds_utils.nim +++ b/sds/sds_utils.nim @@ -15,22 +15,7 @@ export proc defaultConfig*(): ReliabilityConfig = return ReliabilityConfig.init() -proc reliabilityErr*(detail: string): ReliabilityError {.gcsafe, raises: [].} = - ## Maps a backend-supplied persistence error string onto the - ## `rePersistenceError` enum value. The enum carries no payload, so the - ## original detail is logged here — this is the single point where a - ## persistence failure is recorded, while the enum value travels up the - ## `Result` chain to the public API caller, who decides what to do. - ## - ## With the snapshot-based Persistence interface, most protocol ops no - ## longer propagate persistence errors at all — they log and continue - ## (see PLAN_SNAPSHOT_PERSISTENCE.md §8). This helper is still used by - ## the durability-intent ops (removeChannel, resetReliabilityManager, - ## getOrCreateChannel) that retain err-on-failure semantics. - warn "persistence operation failed", detail = detail - ReliabilityError.rePersistenceError - -proc snapshotMeta*(channel: ChannelContext): ChannelMeta {.gcsafe, raises: [].} = +proc snapshotMeta(channel: ChannelContext): ChannelMeta {.gcsafe, raises: [].} = ## Captures the current in-memory state of a `ChannelContext` as a ## `ChannelMeta` blob, suitable for `Persistence.saveChannelMeta`. ## @@ -39,16 +24,17 @@ proc snapshotMeta*(channel: ChannelContext): ChannelMeta {.gcsafe, raises: [].} ## (see PLAN §6). The bloom filter and message history are intentionally ## excluded — the former is rebuilt from the latter on bootstrap, and ## the latter is persisted separately via `updateHistory`. - result = ChannelMeta.init() - result.lamportTimestamp = channel.lamportTimestamp + var meta = ChannelMeta.init() + meta.lamportTimestamp = channel.lamportTimestamp for u in channel.outgoingBuffer: - result.outgoingBuffer.add(u) + meta.outgoingBuffer.add(u) for _, m in channel.incomingBuffer.pairs: - result.incomingBuffer.add(m) + meta.incomingBuffer.add(m) for id, e in channel.outgoingRepairBuffer.pairs: - result.outgoingRepairBuffer.add(OutgoingRepairKV(messageId: id, entry: e)) + meta.outgoingRepairBuffer.add(OutgoingRepairKV(messageId: id, entry: e)) for id, e in channel.incomingRepairBuffer.pairs: - result.incomingRepairBuffer.add(IncomingRepairKV(messageId: id, entry: e)) + meta.incomingRepairBuffer.add(IncomingRepairKV(messageId: id, entry: e)) + return meta proc trySaveMeta*( rm: ReliabilityManager, channelId: SdsChannelID, channel: ChannelContext @@ -59,12 +45,11 @@ proc trySaveMeta*( ## ## This helper is the single point where snapshot-save failures are ## logged; callers do not need to handle the Result. - let res = await rm.persistence.saveChannelMeta(channelId, snapshotMeta(channel)) - if res.isErr: + (await rm.persistence.saveChannelMeta(channelId, snapshotMeta(channel))).isOkOr: warn "snapshot save failed; in-memory state authoritative, next op will retry", - channelId = channelId, detail = res.error + channelId = channelId, detail = error -proc queueHistoryAppend*(channel: ChannelContext, msgId: SdsMessageID) = +proc queueHistoryAppend(channel: ChannelContext, msgId: SdsMessageID) = ## Push an append onto the pending history queue. Only the id is ## stored — the full SdsMessage is looked up from `messageHistory` at ## flush time (invariant: every queued id is present in messageHistory). @@ -76,7 +61,7 @@ proc queueHistoryAppend*(channel: ChannelContext, msgId: SdsMessageID) = channel.pendingHistoryEvicts.excl(msgId) channel.pendingHistoryAppends.incl(msgId) -proc queueHistoryEvict*(channel: ChannelContext, msgId: SdsMessageID) = +proc queueHistoryEvict(channel: ChannelContext, msgId: SdsMessageID) = ## Push an evict onto the pending history queue. Merge rule symmetric ## with `queueHistoryAppend`: cancels any pending append for the same ## id (the just-evicted message no longer needs to be persisted as an @@ -119,8 +104,7 @@ proc tryUpdateHistory*( except KeyError: return # checked `in` above; unreachable, but tables can raise per spec - if channel.pendingHistoryAppends.len == 0 and - channel.pendingHistoryEvicts.len == 0: + if channel.pendingHistoryAppends.len == 0 and channel.pendingHistoryEvicts.len == 0: return # nothing to flush — no round-trip cost var batch = HistoryUpdate.init() @@ -151,8 +135,7 @@ proc tryUpdateHistory*( detail = res.error if channel.pendingHistoryAppends.len > rm.config.maxMessageHistory: warn "pending history queue exceeds maxMessageHistory; backend may be stuck", - channelId = channelId, - pendingAppends = channel.pendingHistoryAppends.len + channelId = channelId, pendingAppends = channel.pendingHistoryAppends.len proc dropChannelFromPersistence*( rm: ReliabilityManager, channelId: SdsChannelID @@ -165,7 +148,8 @@ proc dropChannelFromPersistence*( ## err on failure (durability is the semantic intent — the caller asked ## us to confirm a disk wipe; we cannot silently lie). See PLAN §8. (await rm.persistence.dropChannel(channelId)).isOkOr: - return err(reliabilityErr(error)) + warn "persistence operation failed", cause = error + return err(ReliabilityError.rePersistenceError) ok() proc cleanup*(rm: ReliabilityManager) {.async: (raises: []).} = @@ -344,16 +328,6 @@ proc getRecentHistoryEntries*( if not rm.onRetrievalHint.isNil(): {.cast(raises: []).}: entry.retrievalHint = rm.onRetrievalHint(msgId) - if entry.retrievalHint.len > 0: - # Phase 2B: best-effort hint persistence via V2. Non-fatal — - # hints are an optimisation; a missing hint just means the - # peer falls back to slower retrieval. - let hintRes = await rm.persistence.setRetrievalHint( - msgId, entry.retrievalHint - ) - if hintRes.isErr: - warn "retrieval hint save failed; continuing", - msgId = msgId, detail = hintRes.error entry.senderId = channel.messageHistory[msgId].senderId entries.add(entry) ok(entries) @@ -456,7 +430,9 @@ proc getOrCreateChannel*( ) ) let data = (await rm.persistence.loadChannel(channelId)).valueOr: - return err(reliabilityErr(error)) + warn "persistence operation failed", cause = error + return err(ReliabilityError.rePersistenceError) + channel.lamportTimestamp = data.meta.lamportTimestamp # Backend contract: messageHistory MUST be ordered oldest-first. # If a backend violates this, FIFO eviction breaks across restarts. diff --git a/sds/types/persistence.nim b/sds/types/persistence.nim index 6f5e459..b077e3f 100644 --- a/sds/types/persistence.nim +++ b/sds/types/persistence.nim @@ -27,7 +27,6 @@ export results, sds_message_id, channel_meta, history_update type Persistence* = object ## Pluggable durability backend. Supplied at `newReliabilityManager` ## construction time; defaults to `noOpPersistence()` when not given. - saveChannelMeta*: proc( channelId: SdsChannelID, meta: ChannelMeta ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} @@ -42,30 +41,21 @@ type Persistence* = object ## maxMessageHistory cap. Callers SHOULD skip this call entirely when ## `update.isEmpty`. - loadChannel*: proc( - channelId: SdsChannelID - ): Future[Result[ChannelData, string]] {.async: (raises: []), gcsafe.} + loadChannel*: proc(channelId: SdsChannelID): Future[Result[ChannelData, string]] {. + async: (raises: []), gcsafe + .} ## Bootstrap on `getOrCreateChannel`. Returns the full prior state, or ## an empty `ChannelData` if the channel is new on disk. Failure ## propagates to the caller — bootstrap is a durability-intent op. - dropChannel*: proc( - channelId: SdsChannelID - ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} + dropChannel*: proc(channelId: SdsChannelID): Future[Result[void, string]] {. + async: (raises: []), gcsafe + .} ## Wipe all persisted state for a channel. Called by `removeChannel` / ## `resetReliabilityManager`. Backends SHOULD execute atomically. ## Failure propagates to the caller — the caller asked us to confirm a ## disk wipe and we cannot silently lie. - setRetrievalHint*: proc( - msgId: SdsMessageID, hint: seq[byte] - ): Future[Result[void, string]] {.async: (raises: []), gcsafe.} - ## Record a retrieval hint for a message id. Called from - ## `getRecentHistoryEntries` when an application-supplied hint - ## provider returns a non-empty hint. Out-of-band from the - ## snapshot/history write path because hints are populated lazily - ## during read. Non-fatal on failure. - proc noOpPersistence*(): Persistence = ## Default backend: discards all writes, returns an empty snapshot on ## load. Used when no real backend is supplied (existing tests and @@ -87,8 +77,4 @@ proc noOpPersistence*(): Persistence = channelId: SdsChannelID ): Future[Result[void, string]] {.async: (raises: []).} = ok(), - setRetrievalHint: proc( - msgId: SdsMessageID, hint: seq[byte] - ): Future[Result[void, string]] {.async: (raises: []).} = - ok(), ) diff --git a/tests/in_memory_persistence.nim b/tests/in_memory_persistence.nim index 087545e..17a4775 100644 --- a/tests/in_memory_persistence.nim +++ b/tests/in_memory_persistence.nim @@ -6,7 +6,7 @@ ## ## `failingOps` injects backend failures. Op names match the `Persistence` ## field names: "saveChannelMeta", "updateHistory", "loadChannel", -## "dropChannel", "setRetrievalHint". +## "dropChannel". import std/[tables, sets] import chronos @@ -15,7 +15,6 @@ import sds type InMemoryStore* = ref object lamports*: Table[SdsChannelID, int64] log*: Table[SdsChannelID, OrderedTable[SdsMessageID, SdsMessage]] - hints*: Table[SdsMessageID, seq[byte]] outgoing*: Table[SdsChannelID, OrderedTable[SdsMessageID, UnacknowledgedMessage]] incoming*: Table[SdsChannelID, OrderedTable[SdsMessageID, IncomingMessage]] outgoingRepair*: Table[SdsChannelID, OrderedTable[SdsMessageID, OutgoingRepairEntry]] @@ -23,8 +22,7 @@ type InMemoryStore* = ref object dropChannelCalls*: Table[SdsChannelID, int] ## Per-channel counter; lets tests assert dropChannel is invoked ## exactly once per logical drop. - failingOps*: HashSet[string] - ## Op names that should return an injected backend error. + failingOps*: HashSet[string] ## Op names that should return an injected backend error. proc newInMemoryStore*(): InMemoryStore = InMemoryStore(failingOps: initHashSet[string]()) @@ -48,8 +46,7 @@ proc newInMemoryPersistence*(store: InMemoryStore): Persistence = store.outgoing[channelId][u.message.messageId] = u # Incoming buffer. - store.incoming[channelId] = - initOrderedTable[SdsMessageID, IncomingMessage]() + store.incoming[channelId] = initOrderedTable[SdsMessageID, IncomingMessage]() for m in meta.incomingBuffer: store.incoming[channelId][m.message.messageId] = m @@ -120,11 +117,4 @@ proc newInMemoryPersistence*(store: InMemoryStore): Persistence = store.dropChannelCalls[channelId] = store.dropChannelCalls.getOrDefault(channelId) + 1 ok(), - setRetrievalHint: proc( - msgId: SdsMessageID, hint: seq[byte] - ): Future[Result[void, string]] {.async: (raises: []).} = - if "setRetrievalHint" in store.failingOps: - return err("injected backend failure: setRetrievalHint") - store.hints[msgId] = hint - ok(), )