mirror of
https://github.com/logos-messaging/nim-sds.git
synced 2026-07-02 13:59:41 +00:00
refactor(persistence): R2 pending-write queue + per-op accumulator (PR #72 review fix)
Addresses all three substantive review findings on PR #72 in one structural change: fold the per-op accumulator and the R2 retry buffer into a single queue on `ChannelContext`, flushed once at op end. Changes: - sds/types/channel_context.nim: add `pendingHistoryAppends` (`OrderedSet[SdsMessageID]`) and `pendingHistoryEvicts` (`HashSet[SdsMessageID]`) fields. Only ids are stored — the full SdsMessage is looked up from `messageHistory` at flush time. Documented invariant: every id in pendingHistoryAppends is also in messageHistory, upheld by the merge rule. - sds/sds_utils.nim: * `queueHistoryAppend(channel, msgId)` / `queueHistoryEvict(channel, msgId)` — "latest-wins" merge: append cancels any pending evict and vice versa. Symmetric, simple, handles the evict-then-re-add sequence correctly (SDS-R repair re-delivering an evicted message while the backend is unreachable). * `tryUpdateHistory(rm, channelId)` — no more list params; flushes the channel's pending queue. Dual role: per-op accumulator (multiple `addToHistory` calls within one op queue together and flush as one round-trip) AND R2 retry buffer (a failed flush leaves the queue populated for the next op to retry). * `addToHistory` queues via the helpers; does not call persistence. * Pending queue cleared on `cleanup` and `removeChannel`. - sds.nim: * `processIncomingBuffer` returns to its single-arg signature — the queue lives on the channel, no parameter threading needed. * `wrapOutgoingMessage`, `unwrapReceivedMessage` (all three paths), `markDependenciesMet` issue exactly one `trySaveMeta` + `tryUpdateHistory` pair at op end, under the lock, with no intervening `await`-of-other-work. Matches the Persistence atomicity contract documented in `sds/types/persistence.nim`. * Pending queue cleared in `resetReliabilityManager`. - tests/test_persistence.nim: * Direct `addToHistory` callers (state-survival setup) now follow with explicit `tryUpdateHistory(channelId)` to flush. Reflects the production op-end flush pattern. * New: `updateHistory failure is retried via R2 pending-write queue` — verifies that two failed sends leave both messages on the queue, and a third successful send drains the whole queue in one call. * New: `pending queue survives idle ops` — verifies that an op with no history changes of its own still flushes a previously-failed batch at op end. * New: `evict-then-re-add merge rule preserves the re-added message on disk` — regression for the "latest-wins" merge rule. The original "evict-wins" rule would silently drop the re-add and leave the message permanently absent from disk; this test would fail under that rule and passes under the corrected one. Resolves PR #72 review comments: - #1 (delta loss on failed updateHistory) — R2 retry queue. - #2 (cascade chattiness — N updateHistory calls per op) — queue collects cascaded entries, flushed as one batch. - #3 (atomicity contract mismatch) — implementation now matches the documented "saveChannelMeta then updateHistory back-to-back" pairing. Test summary: 50 tests pass (47 prior + 3 new R2/merge-rule tests). FFI dylib (`nimble libsdsDynamicMac`, refc + threads:on): clean. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
f058594270
commit
abc36f61aa
53
sds.nim
53
sds.nim
@ -153,19 +153,18 @@ proc wrapOutgoingMessage*(
|
||||
# Phase 2B: in-memory append only; op-end trySaveMeta covers it.
|
||||
|
||||
channel.bloomFilter.add(msg.messageId)
|
||||
# The full SdsMessage carries senderId and content, so a single
|
||||
# addToHistory replaces the old triple-write to messageHistory,
|
||||
# messageCache, and messageSenders.
|
||||
# addToHistory mutates in-memory state and queues the append/evict
|
||||
# on the channel's pending-history queue; persistence happens
|
||||
# ONCE at op end via tryUpdateHistory.
|
||||
(await rm.addToHistory(msg, channelId)).isOkOr:
|
||||
return err(error)
|
||||
|
||||
# Phase 2.5 (PLAN §2): one V2 meta snapshot at the end of the op
|
||||
# captures lamport + outgoing buffer + repair buffer mutations as
|
||||
# a single atomic blob. Non-fatal: failure is logged, the op still
|
||||
# succeeds, the next op re-issues a fresh snapshot. Legacy
|
||||
# fine-grained persistence calls above remain in place during the
|
||||
# additive phase; they will be stripped in phase 2.B.
|
||||
# Op end: one meta snapshot + one history flush, paired under the
|
||||
# lock per the Persistence atomicity contract. tryUpdateHistory
|
||||
# flushes the channel's pending queue (this op's mutations PLUS
|
||||
# any leftovers from a prior failed write — R2 retry).
|
||||
await rm.trySaveMeta(channelId, channel)
|
||||
await rm.tryUpdateHistory(channelId)
|
||||
|
||||
return serializeMessage(msg)
|
||||
except CatchableError:
|
||||
@ -182,6 +181,11 @@ proc wrapOutgoingMessage*(
|
||||
proc processIncomingBuffer(
|
||||
rm: ReliabilityManager, channelId: SdsChannelID
|
||||
): Future[Result[void, ReliabilityError]] {.async: (raises: []).} =
|
||||
## Cascade-deliver any buffered messages whose dependencies are now met.
|
||||
## Each `addToHistory` call queues its append/evict on the channel's
|
||||
## pending-history queue; the *caller* (a public protocol op) issues
|
||||
## ONE `tryUpdateHistory` at op end to flush the whole cascade in a
|
||||
## single round-trip.
|
||||
try:
|
||||
await rm.lock.acquire()
|
||||
try:
|
||||
@ -261,10 +265,11 @@ proc unwrapReceivedMessage*(
|
||||
channel.incomingRepairBuffer.del(msg.messageId)
|
||||
|
||||
if msg.messageId in channel.messageHistory:
|
||||
# Duplicate: no state change beyond the repair-buffer cleanup above.
|
||||
# Per ANALYSIS_SNAPSHOT_SAVE_POINTS, still save (those buffer dels
|
||||
# are mutations). Persist the meta and return.
|
||||
# Duplicate: no history change. Still flush the meta (repair-buffer
|
||||
# dels above are mutations) and the history queue (any pending
|
||||
# entries from a prior failed write get retried here too).
|
||||
await rm.trySaveMeta(channelId, channel)
|
||||
await rm.tryUpdateHistory(channelId)
|
||||
return ok((msg.content, @[], channelId))
|
||||
|
||||
channel.bloomFilter.add(msg.messageId)
|
||||
@ -321,13 +326,12 @@ proc unwrapReceivedMessage*(
|
||||
(await rm.addToHistory(msg, channelId)).isOkOr:
|
||||
return err(error)
|
||||
# Unblock any buffered messages that were waiting on this one.
|
||||
# Phase 2B: in-memory dep-set shrink only; op-end trySaveMeta and
|
||||
# the subsequent processIncomingBuffer cascade (which is also
|
||||
# in-memory only) leave the final state on the in-memory side, and
|
||||
# the op-end trySaveMeta snapshots it.
|
||||
for pendingId, entry in channel.incomingBuffer:
|
||||
if msg.messageId in entry.missingDeps:
|
||||
channel.incomingBuffer[pendingId].missingDeps.excl(msg.messageId)
|
||||
# Cascade — addToHistory calls within processIncomingBuffer queue
|
||||
# their entries on the channel's pending-history queue, flushed
|
||||
# by the single op-end tryUpdateHistory below.
|
||||
(await rm.processIncomingBuffer(channelId)).isOkOr:
|
||||
return err(error)
|
||||
if not rm.onMessageReady.isNil():
|
||||
@ -356,11 +360,11 @@ proc unwrapReceivedMessage*(
|
||||
# Phase 2B: in-memory insert only; op-end trySaveMeta covers it.
|
||||
channel.outgoingRepairBuffer[dep.messageId] = outEntry
|
||||
|
||||
# Phase 2.5: single V2 meta snapshot covers ALL three paths
|
||||
# (deps-met-fresh, deps-met-buffered, missing-deps). Buffer mutations,
|
||||
# lamport update, repair-buffer cleanup, and ack-driven outgoing buffer
|
||||
# shrinkage all land atomically on disk. Non-fatal per PLAN §8.
|
||||
# Op end: one meta snapshot + one history flush, paired under the
|
||||
# lock. The flush is the single point where any cascade-driven
|
||||
# appends/evicts hit disk (R2 queue absorbs failures).
|
||||
await rm.trySaveMeta(channelId, channel)
|
||||
await rm.tryUpdateHistory(channelId)
|
||||
|
||||
return ok((msg.content, missingDeps, channelId))
|
||||
except CatchableError:
|
||||
@ -394,11 +398,12 @@ proc markDependenciesMet*(
|
||||
(await rm.processIncomingBuffer(channelId)).isOkOr:
|
||||
return err(error)
|
||||
|
||||
# Phase 2.7: single V2 meta snapshot at op end. processIncomingBuffer
|
||||
# may have cascaded through several unblocked deliveries; all those
|
||||
# incoming/repair-buffer mutations land in this one save.
|
||||
# Op end: one meta snapshot + one history flush, paired under the lock.
|
||||
# The flush covers any cascade-driven appends/evicts queued during
|
||||
# processIncomingBuffer.
|
||||
if channelId in rm.channels:
|
||||
await rm.trySaveMeta(channelId, rm.channels[channelId])
|
||||
await rm.tryUpdateHistory(channelId)
|
||||
return ok()
|
||||
except CatchableError:
|
||||
error "Failed to mark dependencies as met",
|
||||
@ -599,6 +604,8 @@ proc resetReliabilityManager*(
|
||||
channel.incomingBuffer.clear()
|
||||
channel.outgoingRepairBuffer.clear()
|
||||
channel.incomingRepairBuffer.clear()
|
||||
channel.pendingHistoryAppends.clear()
|
||||
channel.pendingHistoryEvicts.clear()
|
||||
channel.bloomFilter = RollingBloomFilter.init(
|
||||
rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate
|
||||
)
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import std/[times, tables, sequtils, hashes]
|
||||
import std/[times, tables, sequtils, sets, hashes]
|
||||
import chronos, chronicles, results
|
||||
import ./rolling_bloom_filter
|
||||
import
|
||||
@ -64,22 +64,95 @@ proc trySaveMeta*(
|
||||
warn "snapshot save failed; in-memory state authoritative, next op will retry",
|
||||
channelId = channelId, detail = res.error
|
||||
|
||||
proc queueHistoryAppend*(channel: ChannelContext, msgId: SdsMessageID) =
|
||||
## Push an append onto the pending history queue. Only the id is
|
||||
## stored — the full SdsMessage is looked up from `messageHistory` at
|
||||
## flush time (invariant: every queued id is present in messageHistory).
|
||||
##
|
||||
## Merge rule: **latest operation wins.** Cancels any pending evict for
|
||||
## the same id, then adds. Handles the evict-then-re-add sequence
|
||||
## correctly (e.g. SDS-R repair re-delivers a previously-evicted
|
||||
## message while the backend is unreachable).
|
||||
channel.pendingHistoryEvicts.excl(msgId)
|
||||
channel.pendingHistoryAppends.incl(msgId)
|
||||
|
||||
proc queueHistoryEvict*(channel: ChannelContext, msgId: SdsMessageID) =
|
||||
## Push an evict onto the pending history queue. Merge rule symmetric
|
||||
## with `queueHistoryAppend`: cancels any pending append for the same
|
||||
## id (the just-evicted message no longer needs to be persisted as an
|
||||
## addition), then adds to the evict set.
|
||||
channel.pendingHistoryAppends.excl(msgId)
|
||||
channel.pendingHistoryEvicts.incl(msgId)
|
||||
|
||||
proc tryUpdateHistory*(
|
||||
rm: ReliabilityManager,
|
||||
channelId: SdsChannelID,
|
||||
append: seq[SdsMessage],
|
||||
evict: seq[SdsMessageID],
|
||||
rm: ReliabilityManager, channelId: SdsChannelID
|
||||
) {.async: (raises: []).} =
|
||||
## Best-effort history append/evict. Skips the call entirely when both
|
||||
## lists are empty (see HistoryUpdate contract). Non-fatal on error,
|
||||
## same rationale as `trySaveMeta`.
|
||||
if append.len == 0 and evict.len == 0:
|
||||
return
|
||||
let update = HistoryUpdate(append: append, evict: evict)
|
||||
let res = await rm.persistence.updateHistory(channelId, update)
|
||||
if res.isErr:
|
||||
warn "history update failed; in-memory log authoritative, next op will retry",
|
||||
channelId = channelId, detail = res.error
|
||||
## Flush the channel's pending history queue to disk.
|
||||
##
|
||||
## The pending queue (`channel.pendingHistoryAppends` /
|
||||
## `pendingHistoryEvicts`) plays a DUAL role — and that's deliberate:
|
||||
## 1. **Per-op accumulator.** Every `addToHistory` call pushes its
|
||||
## mutation into this queue but does NOT persist. A protocol op
|
||||
## that invokes `addToHistory` N times (e.g. a
|
||||
## `processIncomingBuffer` cascade) leaves N entries queued and
|
||||
## issues exactly ONE `tryUpdateHistory` at op end — one
|
||||
## round-trip per op regardless of cascade depth. This fixes PR
|
||||
## #72 review comments #2 and #3.
|
||||
## 2. **R2 retry queue.** If the flush fails, the queue is NOT
|
||||
## cleared. The next op's `addToHistory` calls add to it; the
|
||||
## next op's `tryUpdateHistory` retries the merged batch. This
|
||||
## fixes PR #72 review comment #1 (delta loss).
|
||||
##
|
||||
## Both roles share the same data structure because they want the same
|
||||
## semantics: "merge everything pending into one batch and try to
|
||||
## flush". Failure is non-fatal at the FFI boundary (PLAN §8) — the
|
||||
## in-memory state is the source of truth.
|
||||
##
|
||||
## Callers MUST invoke this once at the end of every protocol op (even
|
||||
## when this op had no history changes) — otherwise a previously-failed
|
||||
## batch could sit on the queue indefinitely.
|
||||
var channel: ChannelContext
|
||||
try:
|
||||
if channelId notin rm.channels:
|
||||
return
|
||||
channel = rm.channels[channelId]
|
||||
except KeyError:
|
||||
return # checked `in` above; unreachable, but tables can raise per spec
|
||||
|
||||
if channel.pendingHistoryAppends.len == 0 and
|
||||
channel.pendingHistoryEvicts.len == 0:
|
||||
return # nothing to flush — no round-trip cost
|
||||
|
||||
var batch = HistoryUpdate.init()
|
||||
# Look up each queued id in messageHistory (source of truth). The
|
||||
# invariant on pendingHistoryAppends guarantees the id is present;
|
||||
# the defensive check below logs any violation rather than crashing.
|
||||
for id in channel.pendingHistoryAppends:
|
||||
try:
|
||||
if id in channel.messageHistory:
|
||||
batch.append.add(channel.messageHistory[id])
|
||||
else:
|
||||
warn "queued append id missing from messageHistory; invariant violated, skipping",
|
||||
channelId = channelId, msgId = id
|
||||
except KeyError:
|
||||
discard # unreachable — `in` was true
|
||||
for id in channel.pendingHistoryEvicts:
|
||||
batch.evict.add(id)
|
||||
|
||||
let res = await rm.persistence.updateHistory(channelId, batch)
|
||||
if res.isOk:
|
||||
channel.pendingHistoryAppends.clear()
|
||||
channel.pendingHistoryEvicts.clear()
|
||||
else:
|
||||
warn "history update failed; queued for retry on next op",
|
||||
channelId = channelId,
|
||||
pendingAppends = channel.pendingHistoryAppends.len,
|
||||
pendingEvicts = channel.pendingHistoryEvicts.len,
|
||||
detail = res.error
|
||||
if channel.pendingHistoryAppends.len > rm.config.maxMessageHistory:
|
||||
warn "pending history queue exceeds maxMessageHistory; backend may be stuck",
|
||||
channelId = channelId,
|
||||
pendingAppends = channel.pendingHistoryAppends.len
|
||||
|
||||
proc dropChannelFromPersistence*(
|
||||
rm: ReliabilityManager, channelId: SdsChannelID
|
||||
@ -119,6 +192,8 @@ proc cleanup*(rm: ReliabilityManager) {.async: (raises: []).} =
|
||||
channel.messageHistory.clear()
|
||||
channel.outgoingRepairBuffer.clear()
|
||||
channel.incomingRepairBuffer.clear()
|
||||
channel.pendingHistoryAppends.clear()
|
||||
channel.pendingHistoryEvicts.clear()
|
||||
rm.channels.clear()
|
||||
finally:
|
||||
rm.lock.release()
|
||||
@ -142,29 +217,30 @@ proc cleanBloomFilter*(
|
||||
proc addToHistory*(
|
||||
rm: ReliabilityManager, msg: SdsMessage, channelId: SdsChannelID
|
||||
): Future[Result[void, ReliabilityError]] {.async: (raises: []).} =
|
||||
## Inserts a delivered message into the channel's history map and evicts the
|
||||
## eldest entries when the bound is exceeded. The full SdsMessage is kept so
|
||||
## senderId is available for downstream causal-history population and the
|
||||
## bytes can be re-serialized on demand to answer SDS-R repair requests.
|
||||
## Persistence (phase 2B): mutations are batched into ONE V2
|
||||
## `tryUpdateHistory` call at the end of this proc (append the new
|
||||
## message + evict whatever rolled past `maxMessageHistory`). Failure is
|
||||
## non-fatal: in-memory state is the source of truth, the next op's
|
||||
## history update re-synchronises disk. Legacy per-row `appendLogEntry`
|
||||
## / `removeLogEntry` calls are removed.
|
||||
## Inserts a delivered message into the channel's history map, evicts
|
||||
## the eldest entries past `maxMessageHistory`, and queues the resulting
|
||||
## append+evict on the channel's pending-history queue. Does NOT issue
|
||||
## a persistence call — the caller's op-end `tryUpdateHistory` flushes
|
||||
## the queue in one round-trip.
|
||||
##
|
||||
## A cascade of N unblocked messages (e.g. `processIncomingBuffer`)
|
||||
## therefore leaves N entries queued and triggers ONE persistence call
|
||||
## at op end, not N. Fixes PR #72 review #2/#3.
|
||||
##
|
||||
## Direct callers (tests, ad-hoc) that want the disk write to land
|
||||
## immediately should follow this with `await rm.tryUpdateHistory(channelId)`.
|
||||
try:
|
||||
if channelId in rm.channels:
|
||||
let channel = rm.channels[channelId]
|
||||
channel.messageHistory[msg.messageId] = msg
|
||||
var evicted: seq[SdsMessageID] = @[]
|
||||
queueHistoryAppend(channel, msg.messageId)
|
||||
while channel.messageHistory.len > rm.config.maxMessageHistory:
|
||||
var firstKey: SdsMessageID
|
||||
for k in channel.messageHistory.keys:
|
||||
firstKey = k
|
||||
break
|
||||
channel.messageHistory.del(firstKey)
|
||||
evicted.add(firstKey)
|
||||
await rm.tryUpdateHistory(channelId, @[msg], evicted)
|
||||
queueHistoryEvict(channel, firstKey)
|
||||
ok()
|
||||
except CatchableError:
|
||||
error "Failed to add to history",
|
||||
@ -434,6 +510,8 @@ proc removeChannel*(
|
||||
channel.messageHistory.clear()
|
||||
channel.outgoingRepairBuffer.clear()
|
||||
channel.incomingRepairBuffer.clear()
|
||||
channel.pendingHistoryAppends.clear()
|
||||
channel.pendingHistoryEvicts.clear()
|
||||
rm.channels.del(channelId)
|
||||
return ok()
|
||||
except CatchableError:
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import std/tables
|
||||
import std/[sets, tables]
|
||||
import ./sds_message_id
|
||||
import ./sds_message
|
||||
import ./rolling_bloom_filter
|
||||
@ -23,6 +23,31 @@ type ChannelContext* = ref object
|
||||
## SDS-R buffers
|
||||
outgoingRepairBuffer*: Table[SdsMessageID, OutgoingRepairEntry]
|
||||
incomingRepairBuffer*: Table[SdsMessageID, IncomingRepairEntry]
|
||||
## R2 pending-write queue for history (see PLAN §8 + PR #72 review).
|
||||
## When `updateHistory` fails, the failed (append, evict) batch is parked
|
||||
## here and merged with the next op's batch on the next `tryUpdateHistory`
|
||||
## call. Cleared on successful flush. NOT persisted — runtime-only state;
|
||||
## on a crash the in-memory `messageHistory` is also lost and the next
|
||||
## `loadChannel` brings whatever made it to disk.
|
||||
##
|
||||
## INVARIANT (relied on by the flush): every id in `pendingHistoryAppends`
|
||||
## is also present in `messageHistory`. The full `SdsMessage` is NOT
|
||||
## stored here — it is looked up from `messageHistory` at flush time.
|
||||
## Storing only the id avoids the ~1 KB-per-entry duplication of
|
||||
## SdsMessage that an OrderedTable would carry.
|
||||
pendingHistoryAppends*: OrderedSet[SdsMessageID]
|
||||
## Pending appends, in insertion order so the on-disk log stays
|
||||
## oldest-first across retries.
|
||||
pendingHistoryEvicts*: HashSet[SdsMessageID]
|
||||
## Pending evictions. Set semantics — evicting the same id twice is a
|
||||
## no-op.
|
||||
##
|
||||
## Merge rule with `pendingHistoryAppends`: **latest operation wins.**
|
||||
## Queuing an append cancels any pending evict for the same id;
|
||||
## queuing an evict cancels any pending append. This handles the
|
||||
## "evict-then-re-add" sequence correctly (e.g. SDS-R repair
|
||||
## re-delivers a message that was previously evicted while the
|
||||
## backend was unreachable).
|
||||
|
||||
proc new*(T: type ChannelContext, bloomFilter: RollingBloomFilter): T =
|
||||
return T(
|
||||
@ -33,4 +58,6 @@ proc new*(T: type ChannelContext, bloomFilter: RollingBloomFilter): T =
|
||||
incomingBuffer: initTable[SdsMessageID, IncomingMessage](),
|
||||
outgoingRepairBuffer: initTable[SdsMessageID, OutgoingRepairEntry](),
|
||||
incomingRepairBuffer: initTable[SdsMessageID, IncomingRepairEntry](),
|
||||
pendingHistoryAppends: initOrderedSet[SdsMessageID](),
|
||||
pendingHistoryEvicts: initHashSet[SdsMessageID](),
|
||||
)
|
||||
|
||||
@ -72,6 +72,10 @@ suite "Persistence: write → restart → read-back":
|
||||
senderId = "alice",
|
||||
)
|
||||
check (await rm1.addToHistory(msg, testChannel)).isOk()
|
||||
# New design: addToHistory queues; tryUpdateHistory flushes. Tests
|
||||
# that drive addToHistory directly must follow with an explicit flush
|
||||
# (in production, the public protocol op issues the flush at op end).
|
||||
await rm1.tryUpdateHistory(testChannel)
|
||||
check store.log[testChannel].len == 1
|
||||
await rm1.cleanup()
|
||||
|
||||
@ -274,6 +278,7 @@ suite "Persistence: write → restart → read-back":
|
||||
senderId = "alice",
|
||||
)
|
||||
check (await rm1.addToHistory(m, testChannel)).isOk()
|
||||
await rm1.tryUpdateHistory(testChannel)
|
||||
check store.log[testChannel].len == 3
|
||||
check "m1" notin store.log[testChannel]
|
||||
check "m2" notin store.log[testChannel]
|
||||
@ -299,6 +304,7 @@ suite "Persistence: write → restart → read-back":
|
||||
senderId = "alice",
|
||||
)
|
||||
check (await rm2.addToHistory(m6, testChannel)).isOk()
|
||||
await rm2.tryUpdateHistory(testChannel)
|
||||
check "m3" notin store.log[testChannel]
|
||||
check "m6" in store.log[testChannel]
|
||||
await rm2.cleanup()
|
||||
@ -416,6 +422,124 @@ suite "Persistence: failure policy":
|
||||
check res.isOk()
|
||||
check rm.channels[testChannel].messageHistory.len == 1
|
||||
|
||||
asyncTest "updateHistory failure is retried via R2 pending-write queue":
|
||||
# Fix for PR #72 review comment #1: a failed history write must not
|
||||
# silently drop the delta. The pending-write queue parks failed
|
||||
# entries and retries them on the next op end. Once the backend
|
||||
# recovers, the disk catches up automatically — no caller action
|
||||
# needed, no err surfaced.
|
||||
let store = newInMemoryStore()
|
||||
let rm = newReliabilityManager(
|
||||
participantId = "alice", persistence = newInMemoryPersistence(store)
|
||||
)
|
||||
.get()
|
||||
check (await rm.ensureChannel(testChannel)).isOk()
|
||||
|
||||
# Failure 1: send m1 while updateHistory is broken.
|
||||
store.failingOps.incl("updateHistory")
|
||||
discard await rm.wrapOutgoingMessage(@[byte(1)], "m1", testChannel)
|
||||
# In-memory state is correct; disk has no log entry for m1 yet.
|
||||
check rm.channels[testChannel].messageHistory.len == 1
|
||||
check testChannel notin store.log or "m1" notin store.log[testChannel]
|
||||
# Pending queue should be holding m1 for retry.
|
||||
check rm.channels[testChannel].pendingHistoryAppends.len == 1
|
||||
check "m1" in rm.channels[testChannel].pendingHistoryAppends
|
||||
|
||||
# Failure 2: send m2 while still broken. Pending should now hold both.
|
||||
discard await rm.wrapOutgoingMessage(@[byte(2)], "m2", testChannel)
|
||||
check rm.channels[testChannel].pendingHistoryAppends.len == 2
|
||||
check "m1" in rm.channels[testChannel].pendingHistoryAppends
|
||||
check "m2" in rm.channels[testChannel].pendingHistoryAppends
|
||||
# Still nothing on disk.
|
||||
check testChannel notin store.log or store.log[testChannel].len == 0
|
||||
|
||||
# Recovery: clear the backend failure, send m3. The op-end flush
|
||||
# should drain ALL pending entries plus the new one in a single call.
|
||||
store.failingOps.excl("updateHistory")
|
||||
discard await rm.wrapOutgoingMessage(@[byte(3)], "m3", testChannel)
|
||||
check rm.channels[testChannel].pendingHistoryAppends.len == 0
|
||||
check "m1" in store.log[testChannel]
|
||||
check "m2" in store.log[testChannel]
|
||||
check "m3" in store.log[testChannel]
|
||||
|
||||
asyncTest "evict-then-re-add merge rule preserves the re-added message on disk":
|
||||
# Regression: with the original "evict-wins" merge rule, a message
|
||||
# re-added (e.g. via SDS-R repair) after being evicted during a
|
||||
# backend outage would have its append silently dropped because the
|
||||
# id was still in pendingHistoryEvicts. The "latest-wins" rule fixes
|
||||
# this — the re-add cancels the pending evict.
|
||||
let store = newInMemoryStore()
|
||||
var smallCfg = defaultConfig()
|
||||
smallCfg.maxMessageHistory = 2
|
||||
smallCfg.bloomFilterCapacity = 2
|
||||
let rm = newReliabilityManager(
|
||||
participantId = "alice",
|
||||
config = smallCfg,
|
||||
persistence = newInMemoryPersistence(store),
|
||||
)
|
||||
.get()
|
||||
check (await rm.ensureChannel(testChannel)).isOk()
|
||||
|
||||
proc mkMsg(id: string, ts: int64): SdsMessage =
|
||||
SdsMessage.init(
|
||||
messageId = id,
|
||||
lamportTimestamp = ts,
|
||||
causalHistory = @[],
|
||||
channelId = testChannel,
|
||||
content = @[byte(ts)],
|
||||
bloomFilter = @[],
|
||||
senderId = "alice",
|
||||
)
|
||||
|
||||
# Break the backend, then fill the channel past maxMessageHistory so
|
||||
# m1 gets evicted while we have no successful flush yet.
|
||||
store.failingOps.incl("updateHistory")
|
||||
check (await rm.addToHistory(mkMsg("m1", 1), testChannel)).isOk()
|
||||
await rm.tryUpdateHistory(testChannel) # fails — m1 queued
|
||||
check (await rm.addToHistory(mkMsg("m2", 2), testChannel)).isOk()
|
||||
check (await rm.addToHistory(mkMsg("m3", 3), testChannel)).isOk()
|
||||
# m1 evicted by FIFO; pending should now have m2,m3 as appends and m1 as evict.
|
||||
check "m1" notin rm.channels[testChannel].messageHistory
|
||||
check "m1" in rm.channels[testChannel].pendingHistoryEvicts
|
||||
check "m1" notin rm.channels[testChannel].pendingHistoryAppends
|
||||
|
||||
# SDS-R-style re-delivery of m1. With latest-wins, this MUST cancel
|
||||
# the pending evict and re-queue the append.
|
||||
check (await rm.addToHistory(mkMsg("m1", 4), testChannel)).isOk()
|
||||
check "m1" in rm.channels[testChannel].messageHistory
|
||||
check "m1" notin rm.channels[testChannel].pendingHistoryEvicts
|
||||
check "m1" in rm.channels[testChannel].pendingHistoryAppends
|
||||
|
||||
# Recover and flush. m1 must land on disk.
|
||||
store.failingOps.excl("updateHistory")
|
||||
await rm.tryUpdateHistory(testChannel)
|
||||
check "m1" in store.log[testChannel]
|
||||
|
||||
asyncTest "pending queue survives idle ops (flush on next op without history changes)":
|
||||
# Even if the next op makes no history changes of its own, it must
|
||||
# still flush the pending queue at op end — otherwise a failed write
|
||||
# could sit indefinitely if the application only ever does
|
||||
# mark-deps-met-style ops after a failure.
|
||||
let store = newInMemoryStore()
|
||||
let rm = newReliabilityManager(
|
||||
participantId = "alice", persistence = newInMemoryPersistence(store)
|
||||
)
|
||||
.get()
|
||||
check (await rm.ensureChannel(testChannel)).isOk()
|
||||
|
||||
# Stage a pending entry by failing one send.
|
||||
store.failingOps.incl("updateHistory")
|
||||
discard await rm.wrapOutgoingMessage(@[byte(1)], "m1", testChannel)
|
||||
check rm.channels[testChannel].pendingHistoryAppends.len == 1
|
||||
|
||||
# Now clear the failure and drive a markDependenciesMet on a no-op
|
||||
# input — it has no history changes of its own but its op-end flush
|
||||
# must still retry the queue.
|
||||
store.failingOps.excl("updateHistory")
|
||||
check (await rm.markDependenciesMet(@["nonexistent"], testChannel)).isOk()
|
||||
check rm.channels[testChannel].pendingHistoryAppends.len == 0
|
||||
check "m1" in store.log[testChannel]
|
||||
|
||||
asyncTest "dropChannel failure during removeChannel surfaces as rePersistenceError":
|
||||
# Durability is the semantic intent of removeChannel — the caller
|
||||
# asked us to confirm a disk wipe. We cannot silently lie. So this op
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user