diff --git a/sds.nim b/sds.nim index 2a7674d..d859954 100644 --- a/sds.nim +++ b/sds.nim @@ -153,19 +153,18 @@ proc wrapOutgoingMessage*( # Phase 2B: in-memory append only; op-end trySaveMeta covers it. channel.bloomFilter.add(msg.messageId) - # The full SdsMessage carries senderId and content, so a single - # addToHistory replaces the old triple-write to messageHistory, - # messageCache, and messageSenders. + # addToHistory mutates in-memory state and queues the append/evict + # on the channel's pending-history queue; persistence happens + # ONCE at op end via tryUpdateHistory. (await rm.addToHistory(msg, channelId)).isOkOr: return err(error) - # Phase 2.5 (PLAN §2): one V2 meta snapshot at the end of the op - # captures lamport + outgoing buffer + repair buffer mutations as - # a single atomic blob. Non-fatal: failure is logged, the op still - # succeeds, the next op re-issues a fresh snapshot. Legacy - # fine-grained persistence calls above remain in place during the - # additive phase; they will be stripped in phase 2.B. + # Op end: one meta snapshot + one history flush, paired under the + # lock per the Persistence atomicity contract. tryUpdateHistory + # flushes the channel's pending queue (this op's mutations PLUS + # any leftovers from a prior failed write — R2 retry). await rm.trySaveMeta(channelId, channel) + await rm.tryUpdateHistory(channelId) return serializeMessage(msg) except CatchableError: @@ -182,6 +181,11 @@ proc wrapOutgoingMessage*( proc processIncomingBuffer( rm: ReliabilityManager, channelId: SdsChannelID ): Future[Result[void, ReliabilityError]] {.async: (raises: []).} = + ## Cascade-deliver any buffered messages whose dependencies are now met. + ## Each `addToHistory` call queues its append/evict on the channel's + ## pending-history queue; the *caller* (a public protocol op) issues + ## ONE `tryUpdateHistory` at op end to flush the whole cascade in a + ## single round-trip. 
try: await rm.lock.acquire() try: @@ -261,10 +265,11 @@ proc unwrapReceivedMessage*( channel.incomingRepairBuffer.del(msg.messageId) if msg.messageId in channel.messageHistory: - # Duplicate: no state change beyond the repair-buffer cleanup above. - # Per ANALYSIS_SNAPSHOT_SAVE_POINTS, still save (those buffer dels - # are mutations). Persist the meta and return. + # Duplicate: no history change. Still flush the meta (repair-buffer + # dels above are mutations) and the history queue (any pending + # entries from a prior failed write get retried here too). await rm.trySaveMeta(channelId, channel) + await rm.tryUpdateHistory(channelId) return ok((msg.content, @[], channelId)) channel.bloomFilter.add(msg.messageId) @@ -321,13 +326,12 @@ proc unwrapReceivedMessage*( (await rm.addToHistory(msg, channelId)).isOkOr: return err(error) # Unblock any buffered messages that were waiting on this one. - # Phase 2B: in-memory dep-set shrink only; op-end trySaveMeta and - # the subsequent processIncomingBuffer cascade (which is also - # in-memory only) leave the final state on the in-memory side, and - # the op-end trySaveMeta snapshots it. for pendingId, entry in channel.incomingBuffer: if msg.messageId in entry.missingDeps: channel.incomingBuffer[pendingId].missingDeps.excl(msg.messageId) + # Cascade — addToHistory calls within processIncomingBuffer queue + # their entries on the channel's pending-history queue, flushed + # by the single op-end tryUpdateHistory below. (await rm.processIncomingBuffer(channelId)).isOkOr: return err(error) if not rm.onMessageReady.isNil(): @@ -356,11 +360,11 @@ proc unwrapReceivedMessage*( # Phase 2B: in-memory insert only; op-end trySaveMeta covers it. channel.outgoingRepairBuffer[dep.messageId] = outEntry - # Phase 2.5: single V2 meta snapshot covers ALL three paths - # (deps-met-fresh, deps-met-buffered, missing-deps). 
Buffer mutations, - # lamport update, repair-buffer cleanup, and ack-driven outgoing buffer - # shrinkage all land atomically on disk. Non-fatal per PLAN §8. + # Op end: one meta snapshot + one history flush, paired under the + # lock. The flush is the single point where any cascade-driven + # appends/evicts hit disk (R2 queue absorbs failures). await rm.trySaveMeta(channelId, channel) + await rm.tryUpdateHistory(channelId) return ok((msg.content, missingDeps, channelId)) except CatchableError: @@ -394,11 +398,12 @@ proc markDependenciesMet*( (await rm.processIncomingBuffer(channelId)).isOkOr: return err(error) - # Phase 2.7: single V2 meta snapshot at op end. processIncomingBuffer - # may have cascaded through several unblocked deliveries; all those - # incoming/repair-buffer mutations land in this one save. + # Op end: one meta snapshot + one history flush, paired under the lock. + # The flush covers any cascade-driven appends/evicts queued during + # processIncomingBuffer. if channelId in rm.channels: await rm.trySaveMeta(channelId, rm.channels[channelId]) + await rm.tryUpdateHistory(channelId) return ok() except CatchableError: error "Failed to mark dependencies as met", @@ -599,6 +604,8 @@ proc resetReliabilityManager*( channel.incomingBuffer.clear() channel.outgoingRepairBuffer.clear() channel.incomingRepairBuffer.clear() + channel.pendingHistoryAppends.clear() + channel.pendingHistoryEvicts.clear() channel.bloomFilter = RollingBloomFilter.init( rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate ) diff --git a/sds/sds_utils.nim b/sds/sds_utils.nim index ac5829d..6459a4b 100644 --- a/sds/sds_utils.nim +++ b/sds/sds_utils.nim @@ -1,4 +1,4 @@ -import std/[times, tables, sequtils, hashes] +import std/[times, tables, sequtils, sets, hashes] import chronos, chronicles, results import ./rolling_bloom_filter import @@ -64,22 +64,95 @@ proc trySaveMeta*( warn "snapshot save failed; in-memory state authoritative, next op will retry", channelId = channelId, 
detail = res.error +proc queueHistoryAppend*(channel: ChannelContext, msgId: SdsMessageID) = + ## Push an append onto the pending history queue. Only the id is + ## stored — the full SdsMessage is looked up from `messageHistory` at + ## flush time (invariant: every queued id is present in messageHistory). + ## + ## Merge rule: **latest operation wins.** Cancels any pending evict for + ## the same id, then adds. Handles the evict-then-re-add sequence + ## correctly (e.g. SDS-R repair re-delivers a previously-evicted + ## message while the backend is unreachable). + channel.pendingHistoryEvicts.excl(msgId) + channel.pendingHistoryAppends.incl(msgId) + +proc queueHistoryEvict*(channel: ChannelContext, msgId: SdsMessageID) = + ## Push an evict onto the pending history queue. Merge rule symmetric + ## with `queueHistoryAppend`: cancels any pending append for the same + ## id (the just-evicted message no longer needs to be persisted as an + ## addition), then adds to the evict set. + channel.pendingHistoryAppends.excl(msgId) + channel.pendingHistoryEvicts.incl(msgId) + proc tryUpdateHistory*( - rm: ReliabilityManager, - channelId: SdsChannelID, - append: seq[SdsMessage], - evict: seq[SdsMessageID], + rm: ReliabilityManager, channelId: SdsChannelID ) {.async: (raises: []).} = - ## Best-effort history append/evict. Skips the call entirely when both - ## lists are empty (see HistoryUpdate contract). Non-fatal on error, - ## same rationale as `trySaveMeta`. - if append.len == 0 and evict.len == 0: - return - let update = HistoryUpdate(append: append, evict: evict) - let res = await rm.persistence.updateHistory(channelId, update) - if res.isErr: - warn "history update failed; in-memory log authoritative, next op will retry", - channelId = channelId, detail = res.error + ## Flush the channel's pending history queue to disk. + ## + ## The pending queue (`channel.pendingHistoryAppends` / + ## `pendingHistoryEvicts`) plays a DUAL role — and that's deliberate: + ## 1. 
**Per-op accumulator.** Every `addToHistory` call pushes its + ## mutation into this queue but does NOT persist. A protocol op + ## that invokes `addToHistory` N times (e.g. a + ## `processIncomingBuffer` cascade) leaves N entries queued and + ## issues exactly ONE `tryUpdateHistory` at op end — one + ## round-trip per op regardless of cascade depth. This fixes PR + ## #72 review comments #2 and #3. + ## 2. **R2 retry queue.** If the flush fails, the queue is NOT + ## cleared. The next op's `addToHistory` calls add to it; the + ## next op's `tryUpdateHistory` retries the merged batch. This + ## fixes PR #72 review comment #1 (delta loss). + ## + ## Both roles share the same data structure because they want the same + ## semantics: "merge everything pending into one batch and try to + ## flush". Failure is non-fatal at the FFI boundary (PLAN §8) — the + ## in-memory state is the source of truth. + ## + ## Callers MUST invoke this once at the end of every protocol op (even + ## when this op had no history changes) — otherwise a previously-failed + ## batch could sit on the queue indefinitely. + var channel: ChannelContext + try: + if channelId notin rm.channels: + return + channel = rm.channels[channelId] + except KeyError: + return # checked `in` above; unreachable, but tables can raise per spec + + if channel.pendingHistoryAppends.len == 0 and + channel.pendingHistoryEvicts.len == 0: + return # nothing to flush — no round-trip cost + + var batch = HistoryUpdate.init() + # Look up each queued id in messageHistory (source of truth). The + # invariant on pendingHistoryAppends guarantees the id is present; + # the defensive check below logs any violation rather than crashing. 
+ for id in channel.pendingHistoryAppends: + try: + if id in channel.messageHistory: + batch.append.add(channel.messageHistory[id]) + else: + warn "queued append id missing from messageHistory; invariant violated, skipping", + channelId = channelId, msgId = id + except KeyError: + discard # unreachable — `in` was true + for id in channel.pendingHistoryEvicts: + batch.evict.add(id) + + let res = await rm.persistence.updateHistory(channelId, batch) + if res.isOk: + channel.pendingHistoryAppends.clear() + channel.pendingHistoryEvicts.clear() + else: + warn "history update failed; queued for retry on next op", + channelId = channelId, + pendingAppends = channel.pendingHistoryAppends.len, + pendingEvicts = channel.pendingHistoryEvicts.len, + detail = res.error + if max(channel.pendingHistoryAppends.len, channel.pendingHistoryEvicts.len) > rm.config.maxMessageHistory: # appends are capped by messageHistory (each eviction cancels its pending append), so the evict set is the side that actually grows during an outage + warn "pending history queue exceeds maxMessageHistory; backend may be stuck", + channelId = channelId, pendingAppends = channel.pendingHistoryAppends.len, + pendingEvicts = channel.pendingHistoryEvicts.len proc dropChannelFromPersistence*( rm: ReliabilityManager, channelId: SdsChannelID @@ -119,6 +192,8 @@ proc cleanup*(rm: ReliabilityManager) {.async: (raises: []).} = channel.messageHistory.clear() channel.outgoingRepairBuffer.clear() channel.incomingRepairBuffer.clear() + channel.pendingHistoryAppends.clear() + channel.pendingHistoryEvicts.clear() rm.channels.clear() finally: rm.lock.release() @@ -142,29 +217,30 @@ proc cleanBloomFilter*( proc addToHistory*( rm: ReliabilityManager, msg: SdsMessage, channelId: SdsChannelID ): Future[Result[void, ReliabilityError]] {.async: (raises: []).} = - ## Inserts a delivered message into the channel's history map and evicts the - ## eldest entries when the bound is exceeded. The full SdsMessage is kept so - ## senderId is available for downstream causal-history population and the - ## bytes can be re-serialized on demand to answer SDS-R repair requests.
- ## Persistence (phase 2B): mutations are batched into ONE V2 - ## `tryUpdateHistory` call at the end of this proc (append the new - ## message + evict whatever rolled past `maxMessageHistory`). Failure is - ## non-fatal: in-memory state is the source of truth, the next op's - ## history update re-synchronises disk. Legacy per-row `appendLogEntry` - ## / `removeLogEntry` calls are removed. + ## Inserts a delivered message into the channel's history map, evicts + ## the eldest entries past `maxMessageHistory`, and queues the resulting + ## append+evict on the channel's pending-history queue. Does NOT issue + ## a persistence call — the caller's op-end `tryUpdateHistory` flushes + ## the queue in one round-trip. + ## + ## A cascade of N unblocked messages (e.g. `processIncomingBuffer`) + ## therefore leaves N entries queued and triggers ONE persistence call + ## at op end, not N. Fixes PR #72 review #2/#3. + ## + ## Direct callers (tests, ad-hoc) that want the disk write to land + ## immediately should follow this with `await rm.tryUpdateHistory(channelId)`. 
try: if channelId in rm.channels: let channel = rm.channels[channelId] channel.messageHistory[msg.messageId] = msg - var evicted: seq[SdsMessageID] = @[] + queueHistoryAppend(channel, msg.messageId) while channel.messageHistory.len > rm.config.maxMessageHistory: var firstKey: SdsMessageID for k in channel.messageHistory.keys: firstKey = k break channel.messageHistory.del(firstKey) - evicted.add(firstKey) - await rm.tryUpdateHistory(channelId, @[msg], evicted) + queueHistoryEvict(channel, firstKey) ok() except CatchableError: error "Failed to add to history", @@ -434,6 +510,8 @@ proc removeChannel*( channel.messageHistory.clear() channel.outgoingRepairBuffer.clear() channel.incomingRepairBuffer.clear() + channel.pendingHistoryAppends.clear() + channel.pendingHistoryEvicts.clear() rm.channels.del(channelId) return ok() except CatchableError: diff --git a/sds/types/channel_context.nim b/sds/types/channel_context.nim index 3f7bee6..b5455e7 100644 --- a/sds/types/channel_context.nim +++ b/sds/types/channel_context.nim @@ -1,4 +1,4 @@ -import std/tables +import std/[sets, tables] import ./sds_message_id import ./sds_message import ./rolling_bloom_filter @@ -23,6 +23,31 @@ type ChannelContext* = ref object ## SDS-R buffers outgoingRepairBuffer*: Table[SdsMessageID, OutgoingRepairEntry] incomingRepairBuffer*: Table[SdsMessageID, IncomingRepairEntry] + ## R2 pending-write queue for history (see PLAN §8 + PR #72 review). + ## Every `addToHistory` queues its append/evict here; the op-end + ## `tryUpdateHistory` flushes them, and a failed flush leaves them queued + ## for retry. Cleared on success. NOT persisted — runtime-only state; + ## on a crash the in-memory `messageHistory` is also lost and the next + ## `loadChannel` brings whatever made it to disk. + ## + ## INVARIANT (relied on by the flush): every id in `pendingHistoryAppends` + ## is also present in `messageHistory`.
The full `SdsMessage` is NOT + ## stored here — it is looked up from `messageHistory` at flush time. + ## Storing only the id avoids the ~1 KB-per-entry duplication of + ## SdsMessage that an OrderedTable would carry. + pendingHistoryAppends*: OrderedSet[SdsMessageID] + ## Pending appends, in insertion order so the on-disk log stays + ## oldest-first across retries. + pendingHistoryEvicts*: HashSet[SdsMessageID] + ## Pending evictions. Set semantics — evicting the same id twice is a + ## no-op. + ## + ## Merge rule with `pendingHistoryAppends`: **latest operation wins.** + ## Queuing an append cancels any pending evict for the same id; + ## queuing an evict cancels any pending append. This handles the + ## "evict-then-re-add" sequence correctly (e.g. SDS-R repair + ## re-delivers a message that was previously evicted while the + ## backend was unreachable). proc new*(T: type ChannelContext, bloomFilter: RollingBloomFilter): T = return T( @@ -33,4 +58,6 @@ proc new*(T: type ChannelContext, bloomFilter: RollingBloomFilter): T = incomingBuffer: initTable[SdsMessageID, IncomingMessage](), outgoingRepairBuffer: initTable[SdsMessageID, OutgoingRepairEntry](), incomingRepairBuffer: initTable[SdsMessageID, IncomingRepairEntry](), + pendingHistoryAppends: initOrderedSet[SdsMessageID](), + pendingHistoryEvicts: initHashSet[SdsMessageID](), ) diff --git a/tests/test_persistence.nim b/tests/test_persistence.nim index bc62c07..613e174 100644 --- a/tests/test_persistence.nim +++ b/tests/test_persistence.nim @@ -72,6 +72,10 @@ suite "Persistence: write → restart → read-back": senderId = "alice", ) check (await rm1.addToHistory(msg, testChannel)).isOk() + # New design: addToHistory queues; tryUpdateHistory flushes. Tests + # that drive addToHistory directly must follow with an explicit flush + # (in production, the public protocol op issues the flush at op end). 
+ await rm1.tryUpdateHistory(testChannel) check store.log[testChannel].len == 1 await rm1.cleanup() @@ -274,6 +278,7 @@ suite "Persistence: write → restart → read-back": senderId = "alice", ) check (await rm1.addToHistory(m, testChannel)).isOk() + await rm1.tryUpdateHistory(testChannel) check store.log[testChannel].len == 3 check "m1" notin store.log[testChannel] check "m2" notin store.log[testChannel] @@ -299,6 +304,7 @@ suite "Persistence: write → restart → read-back": senderId = "alice", ) check (await rm2.addToHistory(m6, testChannel)).isOk() + await rm2.tryUpdateHistory(testChannel) check "m3" notin store.log[testChannel] check "m6" in store.log[testChannel] await rm2.cleanup() @@ -416,6 +422,124 @@ suite "Persistence: failure policy": check res.isOk() check rm.channels[testChannel].messageHistory.len == 1 + asyncTest "updateHistory failure is retried via R2 pending-write queue": + # Fix for PR #72 review comment #1: a failed history write must not + # silently drop the delta. The pending-write queue parks failed + # entries and retries them on the next op end. Once the backend + # recovers, the disk catches up automatically — no caller action + # needed, no err surfaced. + let store = newInMemoryStore() + let rm = newReliabilityManager( + participantId = "alice", persistence = newInMemoryPersistence(store) + ) + .get() + check (await rm.ensureChannel(testChannel)).isOk() + + # Failure 1: send m1 while updateHistory is broken. + store.failingOps.incl("updateHistory") + discard await rm.wrapOutgoingMessage(@[byte(1)], "m1", testChannel) + # In-memory state is correct; disk has no log entry for m1 yet. + check rm.channels[testChannel].messageHistory.len == 1 + check testChannel notin store.log or "m1" notin store.log[testChannel] + # Pending queue should be holding m1 for retry. + check rm.channels[testChannel].pendingHistoryAppends.len == 1 + check "m1" in rm.channels[testChannel].pendingHistoryAppends + + # Failure 2: send m2 while still broken. 
Pending should now hold both. + discard await rm.wrapOutgoingMessage(@[byte(2)], "m2", testChannel) + check rm.channels[testChannel].pendingHistoryAppends.len == 2 + check "m1" in rm.channels[testChannel].pendingHistoryAppends + check "m2" in rm.channels[testChannel].pendingHistoryAppends + # Still nothing on disk. + check testChannel notin store.log or store.log[testChannel].len == 0 + + # Recovery: clear the backend failure, send m3. The op-end flush + # should drain ALL pending entries plus the new one in a single call. + store.failingOps.excl("updateHistory") + discard await rm.wrapOutgoingMessage(@[byte(3)], "m3", testChannel) + check rm.channels[testChannel].pendingHistoryAppends.len == 0 + check "m1" in store.log[testChannel] + check "m2" in store.log[testChannel] + check "m3" in store.log[testChannel] + + asyncTest "evict-then-re-add merge rule preserves the re-added message on disk": + # Regression: with the original "evict-wins" merge rule, a message + # re-added (e.g. via SDS-R repair) after being evicted during a + # backend outage would have its append silently dropped because the + # id was still in pendingHistoryEvicts. The "latest-wins" rule fixes + # this — the re-add cancels the pending evict. + let store = newInMemoryStore() + var smallCfg = defaultConfig() + smallCfg.maxMessageHistory = 2 + smallCfg.bloomFilterCapacity = 2 + let rm = newReliabilityManager( + participantId = "alice", + config = smallCfg, + persistence = newInMemoryPersistence(store), + ) + .get() + check (await rm.ensureChannel(testChannel)).isOk() + + proc mkMsg(id: string, ts: int64): SdsMessage = + SdsMessage.init( + messageId = id, + lamportTimestamp = ts, + causalHistory = @[], + channelId = testChannel, + content = @[byte(ts)], + bloomFilter = @[], + senderId = "alice", + ) + + # Break the backend, then fill the channel past maxMessageHistory so + # m1 gets evicted while we have no successful flush yet. 
+ store.failingOps.incl("updateHistory") + check (await rm.addToHistory(mkMsg("m1", 1), testChannel)).isOk() + await rm.tryUpdateHistory(testChannel) # fails — m1 queued + check (await rm.addToHistory(mkMsg("m2", 2), testChannel)).isOk() + check (await rm.addToHistory(mkMsg("m3", 3), testChannel)).isOk() + # m1 evicted by FIFO; pending should now have m2,m3 as appends and m1 as evict. + check "m1" notin rm.channels[testChannel].messageHistory + check "m1" in rm.channels[testChannel].pendingHistoryEvicts + check "m1" notin rm.channels[testChannel].pendingHistoryAppends + + # SDS-R-style re-delivery of m1. With latest-wins, this MUST cancel + # the pending evict and re-queue the append. + check (await rm.addToHistory(mkMsg("m1", 4), testChannel)).isOk() + check "m1" in rm.channels[testChannel].messageHistory + check "m1" notin rm.channels[testChannel].pendingHistoryEvicts + check "m1" in rm.channels[testChannel].pendingHistoryAppends + + # Recover and flush. m1 must land on disk. + store.failingOps.excl("updateHistory") + await rm.tryUpdateHistory(testChannel) + check "m1" in store.log[testChannel] + + asyncTest "pending queue survives idle ops (flush on next op without history changes)": + # Even if the next op makes no history changes of its own, it must + # still flush the pending queue at op end — otherwise a failed write + # could sit indefinitely if the application only ever does + # mark-deps-met-style ops after a failure. + let store = newInMemoryStore() + let rm = newReliabilityManager( + participantId = "alice", persistence = newInMemoryPersistence(store) + ) + .get() + check (await rm.ensureChannel(testChannel)).isOk() + + # Stage a pending entry by failing one send. 
+ store.failingOps.incl("updateHistory") + check (await rm.wrapOutgoingMessage(@[byte(1)], "m1", testChannel)).isOk() # history-write failure is non-fatal: the send itself must still succeed + check rm.channels[testChannel].pendingHistoryAppends.len == 1 + + # Now clear the failure and drive a markDependenciesMet on a no-op + # input — it has no history changes of its own but its op-end flush + # must still retry the queue. + store.failingOps.excl("updateHistory") + check (await rm.markDependenciesMet(@["nonexistent"], testChannel)).isOk() + check rm.channels[testChannel].pendingHistoryAppends.len == 0 + check "m1" in store.log[testChannel] + asyncTest "dropChannel failure during removeChannel surfaces as rePersistenceError": # Durability is the semantic intent of removeChannel — the caller # asked us to confirm a disk wipe. We cannot silently lie. So this op