nim-sds/tests/test_reliability.nim
NagyZoltanPeter 4ccdd122fc
refactor(persistence): snapshot-based interface (5 procs, atomic per-op) (#72)
* feat: propagate persistence backend errors via Result

The Persistence contract previously returned `Future[void]` for writes and
`Future[ChannelSnapshot]` for the loader, with `raises: []`. Backends had no
way to report a failure, so a failed write or a failed/partial read was
silently swallowed — and on the read path a mid-scan failure could bootstrap
a *truncated* channel snapshot, corrupting the rebuilt bloom filter and
lamport clock across a restart.

Make every contract field Result-returning:
  * mutating ops  -> Future[Result[void, string]]
  * loadAllForChannel -> Future[Result[ChannelSnapshot, string]]

The backend-supplied error string is mapped to a new
`ReliabilityError.rePersistenceError` (logged once at the boundary via
`reliabilityErr`) and threaded up through every persistence-touching proc to
the public API, where the caller decides what to do. Request-driven paths
(wrap/unwrap/markDependenciesMet/ensureChannel/removeChannel/reset) propagate
the error; background maintenance loops (periodicBufferSweep,
periodicRepairSweep) log and retry on the next tick, since they have no
synchronous caller.

Tests: in-memory backend gains a `failingOps` injection hook; new
"Persistence: error propagation" suite asserts read/write/drop failures
surface as `rePersistenceError`. Full suite passes (90 OK).

BREAKING CHANGE: the `Persistence` contract signature changed; custom
backends must return `Result` and `ok()` on success. Bumped to 0.3.0.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(persistence): add snapshot types and codec (phase 0)

Introduce atomic-snapshot persistence types that will replace the current
fine-grained 13-proc Persistence interface. This commit is purely additive:
no existing call site changes, no behaviour change.

New types (sds/types/):
- channel_meta.nim — ChannelMeta (atomic per-channel snapshot blob),
  ChannelData (bootstrap payload), OutgoingRepairKV / IncomingRepairKV
  (flattened map entries for protobuf wire shape).
- history_update.nim — HistoryUpdate (combined append/evict payload for
  the message log).

New codec (sds/snapshot_codec.nim):
- Protobuf encode/decode for all new types, reusing the existing
  SdsMessage and HistoryEntry encoders from sds/protobuf.nim.
- Explicit schemaVersion=1 on ChannelMeta; decoder rejects unknown
  versions loudly rather than silently truncating.
- Time encoded as int64 unix milliseconds.

Tests (tests/test_snapshot_codec.nim):
- 13 round-trip cases covering empty, single-entry, full-buffer, and
  repair-heavy snapshots; ChannelData ordering; HistoryUpdate variants;
  schemaVersion rejection.

Planning artefacts:
- ANALYSIS_SDS_PERSISTENCE.md — problem statement (partial-write
  divergence, chatty call rate, non-fatal-error policy gap).
- ANALYSIS_SNAPSHOT_SAVE_POINTS.md — exact save points per protocol op
  and projected call rates.
- PLAN_SNAPSHOT_PERSISTENCE.md — phased refactor plan; this commit
  implements phase 0.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat(persistence): add PersistenceV2 interface alongside legacy (phase 1)

Introduce the 5-proc snapshot-based Persistence interface that will
replace the legacy 13-proc one. Both coexist on `ReliabilityManager` so
phase 2 can migrate protocol ops one at a time without breaking existing
callers.

New file:
- sds/types/persistence_v2.nim — `PersistenceV2` type with
  saveChannelMeta / updateHistory / loadChannel / dropChannel /
  setRetrievalHint. `noOpPersistenceV2()` default. Doc-comments capture
  the atomicity pairing (meta save + history update issued back-to-back
  under the channel lock) and the non-fatal failure policy from PLAN §8.

Modified:
- sds/types/reliability_manager.nim — adds `persistenceV2: PersistenceV2`
  field alongside `persistence`; constructor takes both, both default to
  no-op.
- sds.nim — `newReliabilityManager` plumbs the new optional parameter.
- AGENTS.md / CLAUDE.md — GitNexus index re-indexed after phase 0 +
  phase 1 additions; symbol counts updated by `npx gitnexus analyze`.

No call site uses the new interface yet — that's phase 2. All existing
tests still pass against the legacy interface.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* refactor(persistence): migrate runRepairSweep to PersistenceV2 (phase 2.1)

Per-entry removeIncomingRepair / removeOutgoingRepair calls are replaced
by a single trySaveMeta per *dirty* channel at the end of that channel's
sweep. Failure is logged but does NOT abort the sweep — in-memory state
is the source of truth (PLAN_SNAPSHOT_PERSISTENCE.md §8).

Helpers added in sds/sds_utils.nim:
- snapshotMeta(channel) — capture current ChannelContext as ChannelMeta
  blob (flattens Table-keyed buffers to seqs for the wire shape).
- trySaveMeta(rm, channelId, channel) — best-effort meta snapshot save;
  logs on failure, never propagates.
- tryUpdateHistory(rm, channelId, append, evict) — best-effort history
  update; skips the call entirely when both lists are empty (HistoryUpdate
  contract).

Call-rate impact for runRepairSweep:
- Before: N persistence calls per expired entry per channel.
- After:  at most 1 saveChannelMeta per dirty channel; 0 on idle channels
  (matches the dirty-flag floor in ANALYSIS_SNAPSHOT_SAVE_POINTS).

All existing tests pass — including the 3 SDS-R Repair Sweep tests that
directly exercise this proc.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* refactor(persistence): migrate checkUnacknowledgedMessages to PersistenceV2 (phase 2.2)

Per-entry saveOutgoing / removeOutgoing calls are replaced by one
trySaveMeta at the end of the pass, conditional on a dirty flag (resend
attempt incremented, or entry expired). Pass succeeds even if the save
fails — next tick reissues the snapshot.

Call-rate impact:
- Before: N persistence calls per affected entry per pass.
- After:  at most 1 saveChannelMeta per pass; 0 when nothing aged out.

All existing tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* refactor(persistence): add V2 meta snapshot saves to foreground ops (phase 2A)

Wires `trySaveMeta` into the three public protocol ops that mutate
per-channel state — wrapOutgoingMessage, unwrapReceivedMessage, and
markDependenciesMet — at the operation's end, under the channel lock.

Legacy fine-grained persistence calls REMAIN in place; this commit is
additive. Both interfaces persist the same state simultaneously, so all
existing tests pass and a real backend wired to either interface
continues to work. Phase 2B will strip the legacy calls.

Save points match the §"Save Points" table in
ANALYSIS_SNAPSHOT_SAVE_POINTS.md exactly:
- wrapOutgoingMessage: 1 save (always)
- unwrapReceivedMessage: 1 save on every path including duplicate
  (the duplicate path still mutates the repair buffers)
- markDependenciesMet: 1 save after the processIncomingBuffer cascade

Non-fatal failure policy (PLAN §8): trySaveMeta logs and continues;
the protocol op never returns rePersistenceError for snapshot failures.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* refactor(persistence): strip legacy interface from protocol path; migrate tests to V2 (phase 2B+2C+2D)

End-state of phase 2: the protocol code no longer issues any legacy
fine-grained Persistence calls. All state survives via the snapshot-based
PersistenceV2 interface — one trySaveMeta per op end, plus tryUpdateHistory
batched inside addToHistory. The legacy Persistence field on
ReliabilityManager remains for backwards compatibility; phase 3 deletes it.

Protocol changes (sds.nim, sds/sds_utils.nim):
- reviewAckStatus, processIncomingBuffer, updateLamportTimestamp →
  pure in-memory; no per-mutation persistence.
- addToHistory: replaces appendLogEntry+removeLogEntry with a single
  tryUpdateHistory call carrying (append, evict) atomically.
- getRecentHistoryEntries: setRetrievalHint switched to V2; non-fatal.
- wrapOutgoingMessage, unwrapReceivedMessage, markDependenciesMet:
  all per-row saveOutgoing / removeOutgoing / saveIncoming /
  removeIncoming / saveOutgoingRepair / removeOutgoingRepair /
  saveIncomingRepair / removeIncomingRepair calls removed (16 call
  sites in total). State is captured by the op-end trySaveMeta added
  in phase 2A.
- getOrCreateChannel: bootstraps from persistenceV2.loadChannel.
- dropChannelFromPersistence: uses persistenceV2.dropChannel.

Failure policy (PLAN_SNAPSHOT_PERSISTENCE.md §8):
- Foreground ops (wrap, unwrap, markDeps, sweeps): non-fatal —
  trySaveMeta / tryUpdateHistory log and continue; the protocol op
  returns ok regardless of disk failure. In-memory state is the source
  of truth; the next op re-issues a complete snapshot and disk catches
  up automatically.
- Durability-intent ops (removeChannel, resetReliabilityManager via
  dropChannelFromPersistence; getOrCreateChannel via loadChannel):
  still propagate rePersistenceError, because the caller asked us to
  confirm a disk operation and we cannot silently lie.

Test infrastructure:
- tests/in_memory_persistence_v2.nim: new V2 adapter mock that
  decomposes the meta blob into the existing InMemoryStore shape so
  test assertions on store.outgoing / store.incoming / etc. continue to
  work without change.
- tests/test_persistence.nim: 17 tests, all rewritten against V2.
  - 13 state-survival tests carry over with identical assertions.
  - "loadChannel failure surfaces as err on bootstrap" — bootstrap
    keeps durability-intent semantics.
  - "saveChannelMeta failure during send does NOT surface" — deliberate
    inversion of the legacy "write failure surfaces as err" test. Asserts
    the new non-fatal policy: op returns ok, in-memory state correct,
    disk re-syncs on the next op.
  - "updateHistory failure during send does NOT surface" — same policy
    applied to the history path.
  - "dropChannel failure during removeChannel surfaces as err" — kept.
- All 17 tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* refactor(persistence): delete legacy interface; rename PersistenceV2 -> Persistence (phase 3)

End-state of the snapshot-persistence refactor. The legacy 13-proc
Persistence interface and its noOpPersistence are gone; the 5-proc
snapshot-based interface (formerly PersistenceV2) takes their place under
the canonical name.

Source:
- sds/types/persistence.nim: replaced 13-proc contract with the 5-proc
  snapshot interface (saveChannelMeta, updateHistory, loadChannel,
  dropChannel, setRetrievalHint). noOpPersistence returns ok everywhere
  and an empty ChannelData on load.
- sds/types/persistence_v2.nim: removed.
- sds/types/reliability_manager.nim: dropped the second persistenceV2
  field; constructor takes a single `persistence: Persistence`.
- sds/sds_utils.nim: rm.persistenceV2.X -> rm.persistence.X; doc-comments
  updated.
- sds.nim: dropped the persistenceV2 parameter from newReliabilityManager.

Tests:
- tests/in_memory_persistence_v2.nim: removed; its content moved to...
- tests/in_memory_persistence.nim: replaces the old legacy mock with the
  snapshot adapter under the canonical filename. Same InMemoryStore
  shape so test assertions stay unchanged.
- tests/test_persistence.nim: ctor param renamed, suite name de-prefixed.

FFI smoke (`nimble libsdsDynamicMac`, refc/threads:on): builds clean.
All 4 test suites pass:
- test_bloom
- test_reliability
- test_persistence (17 V2 tests)
- test_snapshot_codec (13 codec round-trip tests)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* Persisting persistence redesign plan for reference

* refactor(persistence): R2 pending-write queue + per-op accumulator (PR #72 review fix)

Addresses all three substantive review findings on PR #72 in one
structural change: fold the per-op accumulator and the R2 retry buffer
into a single queue on `ChannelContext`, flushed once at op end.

Changes:

- sds/types/channel_context.nim: add `pendingHistoryAppends`
  (`OrderedSet[SdsMessageID]`) and `pendingHistoryEvicts`
  (`HashSet[SdsMessageID]`) fields. Only ids are stored — the full
  SdsMessage is looked up from `messageHistory` at flush time. Documented
  invariant: every id in pendingHistoryAppends is also in messageHistory,
  upheld by the merge rule.

- sds/sds_utils.nim:
  * `queueHistoryAppend(channel, msgId)` / `queueHistoryEvict(channel,
    msgId)` — "latest-wins" merge: append cancels any pending evict
    and vice versa. Symmetric, simple, handles the evict-then-re-add
    sequence correctly (SDS-R repair re-delivering an evicted message
    while the backend is unreachable).
  * `tryUpdateHistory(rm, channelId)` — no more list params; flushes the
    channel's pending queue. Dual role: per-op accumulator (multiple
    `addToHistory` calls within one op queue together and flush as one
    round-trip) AND R2 retry buffer (a failed flush leaves the queue
    populated for the next op to retry).
  * `addToHistory` queues via the helpers; does not call persistence.
  * Pending queue cleared on `cleanup` and `removeChannel`.

- sds.nim:
  * `processIncomingBuffer` returns to its single-arg signature — the
    queue lives on the channel, no parameter threading needed.
  * `wrapOutgoingMessage`, `unwrapReceivedMessage` (all three paths),
    `markDependenciesMet` issue exactly one `trySaveMeta` +
    `tryUpdateHistory` pair at op end, under the lock, with no
    intervening `await`-of-other-work. Matches the Persistence atomicity
    contract documented in `sds/types/persistence.nim`.
  * Pending queue cleared in `resetReliabilityManager`.

- tests/test_persistence.nim:
  * Direct `addToHistory` callers (state-survival setup) now follow with
    explicit `tryUpdateHistory(channelId)` to flush. Reflects the
    production op-end flush pattern.
  * New: `updateHistory failure is retried via R2 pending-write queue` —
    verifies that two failed sends leave both messages on the queue,
    and a third successful send drains the whole queue in one call.
  * New: `pending queue survives idle ops` — verifies that an op with
    no history changes of its own still flushes a previously-failed
    batch at op end.
  * New: `evict-then-re-add merge rule preserves the re-added message
    on disk` — regression for the "latest-wins" merge rule. The original
    "evict-wins" rule would silently drop the re-add and leave the
    message permanently absent from disk; this test would fail under
    that rule and passes under the corrected one.

Resolves PR #72 review comments:
- #1 (delta loss on failed updateHistory) — R2 retry queue.
- #2 (cascade chattiness — N updateHistory calls per op) — queue collects
  cascaded entries, flushed as one batch.
- #3 (atomicity contract mismatch) — implementation now matches the
  documented "saveChannelMeta then updateHistory back-to-back" pairing.

Test summary: 50 tests pass (47 prior + 3 new R2/merge-rule tests).
FFI dylib (`nimble libsdsDynamicMac`, refc + threads:on): clean.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 12:24:38 +02:00

2068 lines
72 KiB
Nim

import results, std/[times, options, tables]
import sds
import ./async_unittest
# Test-only convenience: implicit string → SdsParticipantID so test fixtures
# can use string literals. Production code retains the distinct-type safety.
converter toParticipantID(s: string): SdsParticipantID =
s.SdsParticipantID
const testChannel = "testChannel"
proc seedBloom(
rm: ReliabilityManager, channel: SdsChannelID, n: int, prefix = "noise"
) =
## Pre-populate a channel's bloom filter with n unrelated ids so the test
## exercises the manager against a realistic, non-empty filter rather than
## the implicit empty one a fresh ReliabilityManager would produce.
let ch = rm.channels[channel]
for i in 0 ..< n:
ch.bloomFilter.add(prefix & $i)
# Core functionality tests
suite "Core Operations":
var rm: ReliabilityManager
asyncSetup:
let rmResult = newReliabilityManager(participantId = "alice")
check rmResult.isOk()
rm = rmResult.get()
check (await rm.ensureChannel(testChannel)).isOk()
asyncTeardown:
if not rm.isNil:
await rm.cleanup()
asyncTest "can create with default config":
let config = defaultConfig()
check:
config.bloomFilterCapacity == DefaultBloomFilterCapacity
config.bloomFilterErrorRate == DefaultBloomFilterErrorRate
config.maxMessageHistory == DefaultMaxMessageHistory
asyncTest "basic message wrapping and unwrapping":
let msg = @[byte(1), 2, 3]
let msgId = "test-msg-1"
let wrappedResult = await rm.wrapOutgoingMessage(msg, msgId, testChannel)
check wrappedResult.isOk()
let wrapped = wrappedResult.get()
check wrapped.len > 0
let unwrapResult = await rm.unwrapReceivedMessage(wrapped)
check unwrapResult.isOk()
let (unwrapped, missingDeps, channelId) = unwrapResult.get()
check:
unwrapped == msg
missingDeps.len == 0
channelId == testChannel
asyncTest "basic message wrapping and unwrapping (non-empty bloom)":
rm.seedBloom(testChannel, 50)
let msg = @[byte(1), 2, 3]
let msgId = "test-msg-1"
let wrappedResult = await rm.wrapOutgoingMessage(msg, msgId, testChannel)
check wrappedResult.isOk()
let wrapped = wrappedResult.get()
check wrapped.len > 0
# The outgoing message must carry the populated bloom snapshot, not an
# empty one — this is the path that was never exercised before.
let decoded = deserializeMessage(wrapped)
check decoded.isOk()
check decoded.get().bloomFilter.len > 0
let unwrapResult = await rm.unwrapReceivedMessage(wrapped)
check unwrapResult.isOk()
let (unwrapped, missingDeps, channelId) = unwrapResult.get()
check:
unwrapped == msg
missingDeps.len == 0
channelId == testChannel
asyncTest "message ordering":
# Create messages with different timestamps
let msg1 = SdsMessage.init(
messageId = "msg1",
lamportTimestamp = 1,
causalHistory = @[],
channelId = testChannel,
content = @[byte(1)],
bloomFilter = @[],
)
let msg2 = SdsMessage.init(
messageId = "msg2",
lamportTimestamp = 5,
causalHistory = @[],
channelId = testChannel,
content = @[byte(2)],
bloomFilter = @[],
)
let serialized1 = serializeMessage(msg1)
let serialized2 = serializeMessage(msg2)
check:
serialized1.isOk()
serialized2.isOk()
# Process out of order
discard await rm.unwrapReceivedMessage(serialized2.get())
let timestamp1 = rm.channels[testChannel].lamportTimestamp
discard await rm.unwrapReceivedMessage(serialized1.get())
let timestamp2 = rm.channels[testChannel].lamportTimestamp
check timestamp2 > timestamp1
# Reliability mechanism tests
suite "Reliability Mechanisms":
var rm: ReliabilityManager
asyncSetup:
let rmResult = newReliabilityManager(participantId = "alice")
check rmResult.isOk()
rm = rmResult.get()
check (await rm.ensureChannel(testChannel)).isOk()
asyncTeardown:
if not rm.isNil:
await rm.cleanup()
asyncTest "dependency detection and resolution":
var messageReadyCount = 0
var messageSentCount = 0
var missingDepsCount = 0
await rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageReadyCount += 1,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(
messageId: SdsMessageID,
missingDeps: seq[HistoryEntry],
channelId: SdsChannelID,
) {.gcsafe.} =
missingDepsCount += 1,
)
# Create dependency chain: msg3 -> msg2 -> msg1
let id1 = "msg1"
let id2 = "msg2"
let id3 = "msg3"
# Create messages with dependencies
let msg2 = SdsMessage.init(
messageId = id2,
lamportTimestamp = 2,
causalHistory = toCausalHistory(@[id1]), # msg2 depends on msg1
channelId = testChannel,
content = @[byte(2)],
bloomFilter = @[],
)
let msg3 = SdsMessage.init(
messageId = id3,
lamportTimestamp = 3,
causalHistory = toCausalHistory(@[id1, id2]), # msg3 depends on both msg1 and msg2
channelId = testChannel,
content = @[byte(3)],
bloomFilter = @[],
)
let serialized2 = serializeMessage(msg2)
let serialized3 = serializeMessage(msg3)
check:
serialized2.isOk()
serialized3.isOk()
# First try processing msg3 (which depends on msg2 which depends on msg1)
let unwrapResult3 = await rm.unwrapReceivedMessage(serialized3.get())
check unwrapResult3.isOk()
let (_, missingDeps3, _) = unwrapResult3.get()
check:
missingDepsCount == 1 # Should trigger missing deps callback
missingDeps3.len == 2 # Should be missing both msg1 and msg2
id1 in missingDeps3.getMessageIds()
id2 in missingDeps3.getMessageIds()
# Then try processing msg2 (which only depends on msg1)
let unwrapResult2 = await rm.unwrapReceivedMessage(serialized2.get())
check unwrapResult2.isOk()
let (_, missingDeps2, _) = unwrapResult2.get()
check:
missingDepsCount == 2 # Should have triggered another missing deps callback
missingDeps2.len == 1 # Should only be missing msg1
id1 in missingDeps2.getMessageIds()
messageReadyCount == 0 # No messages should be ready yet
# Mark first dependency (msg1) as met
let markResult1 = await rm.markDependenciesMet(@[id1], testChannel)
check markResult1.isOk()
let incomingBuffer = await rm.getIncomingBuffer(testChannel)
check:
incomingBuffer.len == 0
messageReadyCount == 2 # Both msg2 and msg3 should be ready
missingDepsCount == 2 # Should still be 2 from the initial missing deps
asyncTest "dependency detection and resolution (non-empty bloom)":
# A populated bloom filter must not short-circuit the dependency check.
# Dependency resolution reads messageHistory, not the bloom — but a future
# "optimisation" could regress this. Seed the bloom with the dep id so a
# bloom-based shortcut would mistakenly mark the dep as satisfied.
var missingDepsCount = 0
var messageReadyCount = 0
await rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageReadyCount += 1,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(
messageId: SdsMessageID,
missingDeps: seq[HistoryEntry],
channelId: SdsChannelID,
) {.gcsafe.} =
missingDepsCount += 1,
)
let id1 = "msg1"
let id2 = "msg2"
rm.seedBloom(testChannel, 30)
# Crucially, also seed the bloom with id1 itself — the dep we will be
# missing from messageHistory. The manager must still report it missing.
rm.channels[testChannel].bloomFilter.add(id1)
let msg2 = SdsMessage.init(
messageId = id2,
lamportTimestamp = 2,
causalHistory = toCausalHistory(@[id1]),
channelId = testChannel,
content = @[byte(2)],
bloomFilter = @[],
)
let serialized2 = serializeMessage(msg2)
check serialized2.isOk()
let unwrapResult = await rm.unwrapReceivedMessage(serialized2.get())
check unwrapResult.isOk()
let (_, missingDeps, _) = unwrapResult.get()
check:
missingDepsCount == 1
missingDeps.len == 1
id1 in missingDeps.getMessageIds()
messageReadyCount == 0
asyncTest "acknowledgment via causal history":
var messageReadyCount = 0
var messageSentCount = 0
var missingDepsCount = 0
await rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageReadyCount += 1,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(
messageId: SdsMessageID,
missingDeps: seq[HistoryEntry],
channelId: SdsChannelID,
) {.gcsafe.} =
missingDepsCount += 1,
)
# Send our message
let msg1 = @[byte(1)]
let id1 = "msg1"
let wrap1 = await rm.wrapOutgoingMessage(msg1, id1, testChannel)
check wrap1.isOk()
# Create a message that has our message in causal history
let msg2 = SdsMessage.init(
messageId = "msg2",
lamportTimestamp = rm.channels[testChannel].lamportTimestamp + 1,
causalHistory = toCausalHistory(@[id1]), # Include our message in causal history
channelId = testChannel,
content = @[byte(2)],
bloomFilter = @[] # Test with an empty bloom filter
,
)
let serializedMsg2 = serializeMessage(msg2)
check serializedMsg2.isOk()
# Process the "received" message - should trigger callbacks
let unwrapResult = await rm.unwrapReceivedMessage(serializedMsg2.get())
check unwrapResult.isOk()
check:
messageReadyCount == 1 # For msg2 which we "received"
messageSentCount == 1 # For msg1 which was acknowledged via causal history
asyncTest "acknowledgment via causal history (non-empty bloom)":
# The causal-history ack path must not be perturbed by the local channel
# bloom carrying unrelated ids, and the empty bloom on the incoming
# message must not spuriously ack any of them.
var messageReadyCount = 0
var messageSentCount = 0
await rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageReadyCount += 1,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(
messageId: SdsMessageID,
missingDeps: seq[HistoryEntry],
channelId: SdsChannelID,
) {.gcsafe.} =
discard,
)
rm.seedBloom(testChannel, 50)
let msg1 = @[byte(1)]
let id1 = "msg1"
let wrap1 = await rm.wrapOutgoingMessage(msg1, id1, testChannel)
check wrap1.isOk()
let msg2 = SdsMessage.init(
messageId = "msg2",
lamportTimestamp = rm.channels[testChannel].lamportTimestamp + 1,
causalHistory = toCausalHistory(@[id1]),
channelId = testChannel,
content = @[byte(2)],
bloomFilter = @[],
)
let serializedMsg2 = serializeMessage(msg2)
check serializedMsg2.isOk()
let unwrapResult = await rm.unwrapReceivedMessage(serializedMsg2.get())
check unwrapResult.isOk()
check:
messageReadyCount == 1
messageSentCount == 1 # exactly id1; no spurious acks for the seeded ids
asyncTest "acknowledgment via bloom filter":
var messageSentCount = 0
await rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(
messageId: SdsMessageID,
missingDeps: seq[HistoryEntry],
channelId: SdsChannelID,
) {.gcsafe.} =
discard,
)
# Send our message
let msg1 = @[byte(1)]
let id1 = "msg1"
let wrap1 = await rm.wrapOutgoingMessage(msg1, id1, testChannel)
check wrap1.isOk()
# Create a message with bloom filter containing our message
var otherPartyBloomFilter =
RollingBloomFilter.init(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate)
otherPartyBloomFilter.add(id1)
let bfResult = serializeBloomFilter(otherPartyBloomFilter.filter)
check bfResult.isOk()
let msg2 = SdsMessage.init(
messageId = "msg2",
lamportTimestamp = rm.channels[testChannel].lamportTimestamp + 1,
causalHistory = @[], # Empty causal history as we're using bloom filter
channelId = testChannel,
content = @[byte(2)],
bloomFilter = bfResult.get(),
)
let serializedMsg2 = serializeMessage(msg2)
check serializedMsg2.isOk()
let unwrapResult = await rm.unwrapReceivedMessage(serializedMsg2.get())
check unwrapResult.isOk()
check messageSentCount == 1 # Our message should be acknowledged via bloom filter
asyncTest "acknowledgment via bloom filter (non-empty bloom)":
# The peer's bloom contains both our outgoing id and a pile of unrelated
# ids. The manager must still ack our message exactly once, and unrelated
# ids in the peer's bloom must not produce spurious sent callbacks.
var messageSentCount = 0
await rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(
messageId: SdsMessageID,
missingDeps: seq[HistoryEntry],
channelId: SdsChannelID,
) {.gcsafe.} =
discard,
)
let msg1 = @[byte(1)]
let id1 = "msg1"
let wrap1 = await rm.wrapOutgoingMessage(msg1, id1, testChannel)
check wrap1.isOk()
var otherPartyBloomFilter =
RollingBloomFilter.init(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate)
for i in 0 ..< 100:
otherPartyBloomFilter.add("peer-noise-" & $i)
otherPartyBloomFilter.add(id1)
let bfResult = serializeBloomFilter(otherPartyBloomFilter.filter)
check bfResult.isOk()
let msg2 = SdsMessage.init(
messageId = "msg2",
lamportTimestamp = rm.channels[testChannel].lamportTimestamp + 1,
causalHistory = @[],
channelId = testChannel,
content = @[byte(2)],
bloomFilter = bfResult.get(),
)
let serializedMsg2 = serializeMessage(msg2)
check serializedMsg2.isOk()
let unwrapResult = await rm.unwrapReceivedMessage(serializedMsg2.get())
check unwrapResult.isOk()
check messageSentCount == 1
asyncTest "outgoing message bloom snapshot reflects channel state":
# Until now nothing asserts that wrapOutgoingMessage actually attaches
# the current bloom snapshot — every other test runs against an empty
# filter where the field is empty either way.
rm.seedBloom(testChannel, 40, prefix = "delivered-")
# Plus a real delivery so we exercise the bloom-on-delivery path too.
let incoming = SdsMessage.init(
messageId = "incoming-1",
lamportTimestamp = 1,
causalHistory = @[],
channelId = testChannel,
content = @[byte(9)],
bloomFilter = @[],
)
let serIncoming = serializeMessage(incoming)
check serIncoming.isOk()
discard await rm.unwrapReceivedMessage(serIncoming.get())
let outId = "outgoing-1"
let wrapped = await rm.wrapOutgoingMessage(@[byte(1)], outId, testChannel)
check wrapped.isOk()
let decoded = deserializeMessage(wrapped.get())
check decoded.isOk()
let attachedFilter = deserializeBloomFilter(decoded.get().bloomFilter)
check attachedFilter.isOk()
var snapshot = RollingBloomFilter.init(
filter = attachedFilter.get(),
capacity = DefaultBloomFilterCapacity,
minCapacity = 0,
maxCapacity = DefaultBloomFilterCapacity,
)
check:
snapshot.contains("delivered-0")
snapshot.contains("delivered-39")
snapshot.contains("incoming-1")
asyncTest "retrieval hints":
var messageReadyCount = 0
var messageSentCount = 0
var missingDepsCount = 0
await rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageReadyCount += 1,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(
messageId: SdsMessageID,
missingDeps: seq[HistoryEntry],
channelId: SdsChannelID,
) {.gcsafe.} =
missingDepsCount += 1,
nil,
proc(messageId: SdsMessageID): seq[byte] =
return cast[seq[byte]]("hint:" & messageId),
)
# Send a first message to populate history
let msg1 = @[byte(1)]
let id1 = "msg1"
let wrap1 = await rm.wrapOutgoingMessage(msg1, id1, testChannel)
check wrap1.isOk()
# Send a second message, which should have the first in its causal history
let msg2 = @[byte(2)]
let id2 = "msg2"
let wrap2 = await rm.wrapOutgoingMessage(msg2, id2, testChannel)
check wrap2.isOk()
# Check that the wrapped message contains the hint
let unwrappedMsg2 = deserializeMessage(wrap2.get()).get()
check unwrappedMsg2.causalHistory.len > 0
check unwrappedMsg2.causalHistory[0].messageId == id1
check unwrappedMsg2.causalHistory[0].retrievalHint == cast[seq[byte]]("hint:" & id1)
# Create a message with a missing dependency (no retrieval hint)
let msg3 = SdsMessage.init(
messageId = "msg3",
lamportTimestamp = 3,
causalHistory = toCausalHistory(@["missing-dep"]),
channelId = testChannel,
content = @[byte(3)],
bloomFilter = @[],
)
let serialized3 = serializeMessage(msg3).get()
let unwrapResult3 = await rm.unwrapReceivedMessage(serialized3)
check unwrapResult3.isOk()
let (_, missingDeps3, _) = unwrapResult3.get()
check missingDeps3.len == 1
check missingDeps3[0].messageId == "missing-dep"
# The hint is empty because it was not provided by the remote sender
check missingDeps3[0].retrievalHint.len == 0
# Test with a message that HAS a retrieval hint from remote
let msg4 = SdsMessage.init(
messageId = "msg4",
lamportTimestamp = 4,
causalHistory =
@[newHistoryEntry("another-missing", cast[seq[byte]]("remote-hint"))],
channelId = testChannel,
content = @[byte(4)],
bloomFilter = @[],
)
let serialized4 = serializeMessage(msg4).get()
let unwrapResult4 = await rm.unwrapReceivedMessage(serialized4)
check unwrapResult4.isOk()
let (_, missingDeps4, _) = unwrapResult4.get()
check missingDeps4.len == 1
check missingDeps4[0].messageId == "another-missing"
# The hint should be preserved from the remote sender
check missingDeps4[0].retrievalHint == cast[seq[byte]]("remote-hint")
# Periodic task & Buffer management tests
suite "Periodic Tasks & Buffer Management":
var rm: ReliabilityManager
asyncSetup:
let rmResult = newReliabilityManager(participantId = "alice")
check rmResult.isOk()
rm = rmResult.get()
check (await rm.ensureChannel(testChannel)).isOk()
asyncTeardown:
if not rm.isNil:
await rm.cleanup()
asyncTest "outgoing buffer management":
var messageSentCount = 0
await rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(
messageId: SdsMessageID,
missingDeps: seq[HistoryEntry],
channelId: SdsChannelID,
) {.gcsafe.} =
discard,
)
# Add multiple messages
for i in 0 .. 5:
let msg = @[byte(i)]
let id = "msg" & $i
let wrap = await rm.wrapOutgoingMessage(msg, id, testChannel)
check wrap.isOk()
let outBuffer = await rm.getOutgoingBuffer(testChannel)
check outBuffer.len == 6
# Create message that acknowledges some messages
let ackMsg = SdsMessage.init(
messageId = "ack1",
lamportTimestamp = rm.channels[testChannel].lamportTimestamp + 1,
causalHistory = toCausalHistory(@["msg0", "msg2", "msg4"]),
channelId = testChannel,
content = @[byte(100)],
bloomFilter = @[],
)
let serializedAck = serializeMessage(ackMsg)
check serializedAck.isOk()
# Process the acknowledgment
discard await rm.unwrapReceivedMessage(serializedAck.get())
let finalBuffer = await rm.getOutgoingBuffer(testChannel)
check:
finalBuffer.len == 3 # Should have removed acknowledged messages
messageSentCount == 3
# Should have triggered sent callback for acknowledged messages
asyncTest "periodic buffer sweep and bloom clean":
var messageSentCount = 0
var config = defaultConfig()
config.resendInterval = initDuration(milliseconds = 100) # Short for testing
config.bufferSweepInterval = initDuration(milliseconds = 50) # Frequent sweeps
config.bloomFilterCapacity = 2 # Small capacity for testing
config.maxResendAttempts = 3 # Set a low number of max attempts
let rmResultP = newReliabilityManager(participantId = "alice", config = config)
check rmResultP.isOk()
let rm = rmResultP.get()
check (await rm.ensureChannel(testChannel)).isOk()
await rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(
messageId: SdsMessageID,
missingDeps: seq[HistoryEntry],
channelId: SdsChannelID,
) {.gcsafe.} =
discard,
)
# First message - should be cleaned from bloom filter later
let msg1 = @[byte(1)]
let id1 = "msg1"
let wrap1 = await rm.wrapOutgoingMessage(msg1, id1, testChannel)
check wrap1.isOk()
let initialBuffer = await rm.getOutgoingBuffer(testChannel)
check:
initialBuffer[0].resendAttempts == 0
rm.channels[testChannel].bloomFilter.contains(id1)
rm.startPeriodicTasks()
# Wait long enough for bloom filter
await sleepAsync(chronos.milliseconds(500))
# Add new messages
let msg2 = @[byte(2)]
let id2 = "msg2"
let wrap2 = await rm.wrapOutgoingMessage(msg2, id2, testChannel)
check wrap2.isOk()
let msg3 = @[byte(3)]
let id3 = "msg3"
let wrap3 = await rm.wrapOutgoingMessage(msg3, id3, testChannel)
check wrap3.isOk()
let finalBuffer = await rm.getOutgoingBuffer(testChannel)
check:
finalBuffer.len == 2
# Only msg2 and msg3 should be in buffer, msg1 should be removed after max retries
finalBuffer[0].message.messageId == id2 # Verify it's the second message
finalBuffer[0].resendAttempts == 0 # New message should have 0 attempts
not rm.channels[testChannel].bloomFilter.contains(id1)
# Bloom filter cleaning check
rm.channels[testChannel].bloomFilter.contains(id3) # New message still in filter
await rm.cleanup()
asyncTest "periodic sync callback":
var syncCallCount = 0
await rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(
messageId: SdsMessageID,
missingDeps: seq[HistoryEntry],
channelId: SdsChannelID,
) {.gcsafe.} =
discard,
proc() {.gcsafe.} =
syncCallCount += 1,
)
rm.startPeriodicTasks()
await sleepAsync(chronos.seconds(1))
await rm.cleanup()
check syncCallCount > 0
# Special cases handling
suite "Special Cases Handling":
var rm: ReliabilityManager
asyncSetup:
let rmResult = newReliabilityManager(participantId = "alice")
check rmResult.isOk()
rm = rmResult.get()
check (await rm.ensureChannel(testChannel)).isOk()
asyncTeardown:
if not rm.isNil:
await rm.cleanup()
asyncTest "message history limits":
# Add messages up to max history size
for i in 0 .. rm.config.maxMessageHistory + 5:
let msg = @[byte(i)]
let id = "msg" & $i
let wrap = await rm.wrapOutgoingMessage(msg, id, testChannel)
check wrap.isOk()
let history = await rm.getMessageHistory(testChannel)
check:
history.len <= rm.config.maxMessageHistory
history[^1] == "msg" & $(rm.config.maxMessageHistory + 5)
asyncTest "invalid bloom filter handling":
let msgInvalid = SdsMessage.init(
messageId = "invalid-bf",
lamportTimestamp = 1,
causalHistory = toCausalHistory(@[]),
channelId = testChannel,
content = @[byte(1)],
bloomFilter = @[1.byte, 2.byte, 3.byte] # Invalid filter data
,
)
let serializedInvalid = serializeMessage(msgInvalid)
check serializedInvalid.isOk()
# Should handle invalid bloom filter gracefully
let result = await rm.unwrapReceivedMessage(serializedInvalid.get())
check:
result.isOk()
result.get()[1].len == 0 # No missing dependencies
asyncTest "duplicate message handling":
var messageReadyCount = 0
await rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageReadyCount += 1,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(
messageId: SdsMessageID,
missingDeps: seq[HistoryEntry],
channelId: SdsChannelID,
) {.gcsafe.} =
discard,
)
# Create and process a message
let msg = SdsMessage.init(
messageId = "dup-msg",
lamportTimestamp = 1,
causalHistory = toCausalHistory(@[]),
channelId = testChannel,
content = @[byte(1)],
bloomFilter = @[],
)
let serialized = serializeMessage(msg)
check serialized.isOk()
# Process same message twice
let result1 = await rm.unwrapReceivedMessage(serialized.get())
check result1.isOk()
let result2 = await rm.unwrapReceivedMessage(serialized.get())
check:
result2.isOk()
result2.get()[1].len == 0 # No missing deps on second process
messageReadyCount == 1 # Message should only be processed once
asyncTest "error handling":
# Empty message
let emptyMsg: seq[byte] = @[]
let emptyResult = await rm.wrapOutgoingMessage(emptyMsg, "empty", testChannel)
check:
not emptyResult.isOk()
emptyResult.error == reInvalidArgument
# Oversized message
let largeMsg = newSeq[byte](MaxMessageSize + 1)
let largeResult = await rm.wrapOutgoingMessage(largeMsg, "large", testChannel)
check:
not largeResult.isOk()
largeResult.error == reMessageTooLarge
suite "cleanup":
asyncTest "cleanup works correctly":
let rmResult = newReliabilityManager(participantId = "alice")
check rmResult.isOk()
let rm = rmResult.get()
check (await rm.ensureChannel(testChannel)).isOk()
# Add some messages
let msg = @[byte(1), 2, 3]
let msgId = "test-msg-1"
discard await rm.wrapOutgoingMessage(msg, msgId, testChannel)
await rm.cleanup()
let outBuffer = await rm.getOutgoingBuffer(testChannel)
let history = await rm.getMessageHistory(testChannel)
check:
outBuffer.len == 0
history.len == 0
suite "Multi-Channel ReliabilityManager Tests":
var rm: ReliabilityManager
asyncSetup:
let rmResult = newReliabilityManager(participantId = "alice")
check rmResult.isOk()
rm = rmResult.get()
asyncTeardown:
if not rm.isNil:
await rm.cleanup()
asyncTest "can create multi-channel manager without channel ID":
check rm.channels.len == 0
asyncTest "channel management":
let channel1 = "channel1"
let channel2 = "channel2"
# Ensure channels
check (await rm.ensureChannel(channel1)).isOk()
check (await rm.ensureChannel(channel2)).isOk()
check rm.channels.len == 2
# Remove channel
check (await rm.removeChannel(channel1)).isOk()
check rm.channels.len == 1
check channel1 notin rm.channels
check channel2 in rm.channels
asyncTest "stateless message unwrapping with channel extraction":
let channel1 = "test-channel-1"
let channel2 = "test-channel-2"
# Create and wrap messages for different channels
let msg1 = @[byte(1), 2, 3]
let msgId1 = "msg1"
let wrapped1 = await rm.wrapOutgoingMessage(msg1, msgId1, channel1)
check wrapped1.isOk()
let msg2 = @[byte(4), 5, 6]
let msgId2 = "msg2"
let wrapped2 = await rm.wrapOutgoingMessage(msg2, msgId2, channel2)
check wrapped2.isOk()
# Unwrap messages - should extract channel ID and route correctly
let unwrap1 = await rm.unwrapReceivedMessage(wrapped1.get())
check unwrap1.isOk()
let (content1, deps1, extractedChannel1) = unwrap1.get()
check:
content1 == msg1
deps1.len == 0
extractedChannel1 == channel1
let unwrap2 = await rm.unwrapReceivedMessage(wrapped2.get())
check unwrap2.isOk()
let (content2, deps2, extractedChannel2) = unwrap2.get()
check:
content2 == msg2
deps2.len == 0
extractedChannel2 == channel2
asyncTest "channel isolation":
let channel1 = "isolated-channel-1"
let channel2 = "isolated-channel-2"
# Add messages to different channels
let msg1 = @[byte(1)]
let msgId1 = "isolated-msg1"
discard await rm.wrapOutgoingMessage(msg1, msgId1, channel1)
let msg2 = @[byte(2)]
let msgId2 = "isolated-msg2"
discard await rm.wrapOutgoingMessage(msg2, msgId2, channel2)
# Check channel-specific data is isolated
let history1 = await rm.getMessageHistory(channel1)
let history2 = await rm.getMessageHistory(channel2)
check:
history1.len == 1
history2.len == 1
msgId1 in history1
msgId2 in history2
msgId1 notin history2
msgId2 notin history1
asyncTest "channel isolation (non-empty bloom)":
# With both channels carrying populated blooms, ids on one channel must
# not appear in the other's filter. An empty-bloom test cannot observe
# this — there is nothing to bleed across.
let channel1 = "iso-bloom-1"
let channel2 = "iso-bloom-2"
check (await rm.ensureChannel(channel1)).isOk()
check (await rm.ensureChannel(channel2)).isOk()
rm.seedBloom(channel1, 25, prefix = "ch1-")
rm.seedBloom(channel2, 25, prefix = "ch2-")
let wrap1 = await rm.wrapOutgoingMessage(@[byte(1)], "iso-msg-1", channel1)
let wrap2 = await rm.wrapOutgoingMessage(@[byte(2)], "iso-msg-2", channel2)
check wrap1.isOk() and wrap2.isOk()
let bf1 = rm.channels[channel1].bloomFilter
let bf2 = rm.channels[channel2].bloomFilter
check:
bf1.contains("ch1-0")
bf1.contains("iso-msg-1")
not bf1.contains("ch2-0")
not bf1.contains("iso-msg-2")
bf2.contains("ch2-0")
bf2.contains("iso-msg-2")
not bf2.contains("ch1-0")
not bf2.contains("iso-msg-1")
asyncTest "multi-channel callbacks":
var readyMessageCount = 0
var sentMessageCount = 0
var missingDepsCount = 0
await rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
readyMessageCount += 1,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
sentMessageCount += 1,
proc(
messageId: SdsMessageID, deps: seq[HistoryEntry], channelId: SdsChannelID
) {.gcsafe.} =
missingDepsCount += 1,
)
let channel1 = "callback-channel-1"
let channel2 = "callback-channel-2"
# Send messages from both channels
let msg1 = @[byte(1)]
let msgId1 = "callback-msg1"
let wrapped1 = await rm.wrapOutgoingMessage(msg1, msgId1, channel1)
check wrapped1.isOk()
let msg2 = @[byte(2)]
let msgId2 = "callback-msg2"
let wrapped2 = await rm.wrapOutgoingMessage(msg2, msgId2, channel2)
check wrapped2.isOk()
# Create acknowledgment messages that include our message IDs in causal history
# to trigger sent callbacks
let ackMsg1 = SdsMessage.init(
messageId = "ack1",
lamportTimestamp = rm.channels[channel1].lamportTimestamp + 1,
causalHistory = toCausalHistory(@[msgId1]), # Acknowledge msg1
channelId = channel1,
content = @[byte(100)],
bloomFilter = @[],
)
let ackMsg2 = SdsMessage.init(
messageId = "ack2",
lamportTimestamp = rm.channels[channel2].lamportTimestamp + 1,
causalHistory = toCausalHistory(@[msgId2]), # Acknowledge msg2
channelId = channel2,
content = @[byte(101)],
bloomFilter = @[],
)
let serializedAck1 = serializeMessage(ackMsg1)
let serializedAck2 = serializeMessage(ackMsg2)
check:
serializedAck1.isOk()
serializedAck2.isOk()
# Process acknowledgment messages - should trigger callbacks
discard await rm.unwrapReceivedMessage(serializedAck1.get())
discard await rm.unwrapReceivedMessage(serializedAck2.get())
check:
readyMessageCount == 2 # Both ack messages should trigger ready callbacks
sentMessageCount == 2 # Both original messages should be marked as sent
missingDepsCount == 0 # No missing dependencies
asyncTest "channel-specific dependency management":
let channel1 = "dep-channel-1"
let channel2 = "dep-channel-2"
let depIds = @["dep1", "dep2", "dep3"]
# Ensure both channels exist first
check (await rm.ensureChannel(channel1)).isOk()
check (await rm.ensureChannel(channel2)).isOk()
# Mark dependencies as met for specific channel
check (await rm.markDependenciesMet(depIds, channel1)).isOk()
# Dependencies should only affect the specified channel
# Dependencies in channel1 should not affect channel2
check rm.channels[channel1].bloomFilter.contains("dep1")
check not rm.channels[channel2].bloomFilter.contains("dep1")
# SDS-R Repair tests
suite "SDS-R: Computation Functions":
test "computeTReq returns duration in [tMin, tMax)":
let tMin = initDuration(seconds = 30)
let tMax = initDuration(seconds = 300)
let d = computeTReq("participant1", "msg1", tMin, tMax)
check:
d.inMilliseconds >= tMin.inMilliseconds
d.inMilliseconds < tMax.inMilliseconds
test "computeTReq is deterministic for same inputs":
let tMin = initDuration(seconds = 30)
let tMax = initDuration(seconds = 300)
let d1 = computeTReq("p1", "m1", tMin, tMax)
let d2 = computeTReq("p1", "m1", tMin, tMax)
check d1 == d2
test "computeTReq varies with different participants":
let tMin = initDuration(seconds = 30)
let tMax = initDuration(seconds = 300)
let d1 = computeTReq("participant-A", "msg1", tMin, tMax)
let d2 = computeTReq("participant-B", "msg1", tMin, tMax)
# Different participants should generally get different backoff (not guaranteed but highly likely)
# Just check both are in valid range
check:
d1.inMilliseconds >= tMin.inMilliseconds
d2.inMilliseconds >= tMin.inMilliseconds
test "computeTResp original sender has zero distance":
let d = computeTResp("sender1", "sender1", "msg1", initDuration(seconds = 300))
check d.inMilliseconds == 0
test "computeTResp non-sender has positive backoff":
let d = computeTResp("other-node", "sender1", "msg1", initDuration(seconds = 300))
check d.inMilliseconds >= 0
test "isInResponseGroup all in same group when numGroups =1":
check isInResponseGroup("p1", "sender1", "msg1", 1) == true
check isInResponseGroup("p2", "sender1", "msg1", 1) == true
test "isInResponseGroup sender always in own group":
# Original sender must always be in their own response group
for groups in 1 .. 10:
check isInResponseGroup("sender1", "sender1", "msg1", groups) == true
suite "SDS-R: Repair Buffer Management":
var rm: ReliabilityManager
asyncSetup:
let rmResult = newReliabilityManager(participantId = "test-participant")
check rmResult.isOk()
rm = rmResult.get()
check (await rm.ensureChannel(testChannel)).isOk()
asyncTeardown:
if not rm.isNil:
await rm.cleanup()
asyncTest "missing deps added to outgoing repair buffer":
var missingDepsCount = 0
await rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(
messageId: SdsMessageID,
missingDeps: seq[HistoryEntry],
channelId: SdsChannelID,
) {.gcsafe.} =
missingDepsCount += 1,
)
# Create a message with a missing dependency
let msg = SdsMessage.init(
messageId = "msg2",
lamportTimestamp = 2,
causalHistory = @[HistoryEntry(messageId: "msg1", senderId: "sender-A")],
channelId = testChannel,
content = @[byte(2)],
bloomFilter = @[],
)
let serialized = serializeMessage(msg).get()
let result = await rm.unwrapReceivedMessage(serialized)
check result.isOk()
# msg1 should be in the outgoing repair buffer
let channel = rm.channels[testChannel]
check:
missingDepsCount == 1
"msg1" in channel.outgoingRepairBuffer
asyncTest "receiving message clears it from repair buffers":
await rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(
messageId: SdsMessageID,
missingDeps: seq[HistoryEntry],
channelId: SdsChannelID,
) {.gcsafe.} =
discard,
)
# First, create the missing dep scenario
let msg2 = SdsMessage.init(
messageId = "msg2",
lamportTimestamp = 2,
causalHistory = @[HistoryEntry(messageId: "msg1", senderId: "sender-A")],
channelId = testChannel,
content = @[byte(2)],
bloomFilter = @[],
)
discard await rm.unwrapReceivedMessage(serializeMessage(msg2).get())
check "msg1" in rm.channels[testChannel].outgoingRepairBuffer
# Now receive msg1 — should clear from repair buffer
let msg1 = SdsMessage.init(
messageId = "msg1",
lamportTimestamp = 1,
causalHistory = @[],
channelId = testChannel,
content = @[byte(1)],
bloomFilter = @[],
)
discard await rm.unwrapReceivedMessage(serializeMessage(msg1).get())
check "msg1" notin rm.channels[testChannel].outgoingRepairBuffer
asyncTest "markDependenciesMet clears repair buffers":
await rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(
messageId: SdsMessageID,
missingDeps: seq[HistoryEntry],
channelId: SdsChannelID,
) {.gcsafe.} =
discard,
)
let msg2 = SdsMessage.init(
messageId = "msg2",
lamportTimestamp = 2,
causalHistory = @[HistoryEntry(messageId: "msg1", senderId: "sender-A")],
channelId = testChannel,
content = @[byte(2)],
bloomFilter = @[],
)
discard await rm.unwrapReceivedMessage(serializeMessage(msg2).get())
check "msg1" in rm.channels[testChannel].outgoingRepairBuffer
# Mark as met via store retrieval
check (await rm.markDependenciesMet(@["msg1"], testChannel)).isOk()
check "msg1" notin rm.channels[testChannel].outgoingRepairBuffer
asyncTest "expired repair requests attached to outgoing messages":
await rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(
messageId: SdsMessageID,
missingDeps: seq[HistoryEntry],
channelId: SdsChannelID,
) {.gcsafe.} =
discard,
)
# Manually add an expired repair entry
let channel = rm.channels[testChannel]
channel.outgoingRepairBuffer["missing-msg"] = OutgoingRepairEntry(
outHistEntry: HistoryEntry(messageId: "missing-msg", senderId: "orig-sender"),
minTimeRepairReq: getTime() - initDuration(seconds = 10), # Already expired
)
# Send a message — should pick up the expired repair request
let wrapped = await rm.wrapOutgoingMessage(@[byte(1)], "new-msg", testChannel)
check wrapped.isOk()
let unwrapped = deserializeMessage(wrapped.get()).get()
check:
unwrapped.repairRequest.len == 1
unwrapped.repairRequest[0].messageId == "missing-msg"
# Should be removed from buffer after attaching
"missing-msg" notin channel.outgoingRepairBuffer
asyncTest "expired repair requests attach the most-overdue first when capped":
# Per spec (sds-r-send-message, RECOMMENDED): when more entries are
# eligible than maxRepairRequests, attach the ones with the smallest
# minTimeRepairReq — i.e. the most overdue.
await rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(
messageId: SdsMessageID,
missingDeps: seq[HistoryEntry],
channelId: SdsChannelID,
) {.gcsafe.} =
discard,
)
let channel = rm.channels[testChannel]
let now = getTime()
# Five eligible entries with strictly ordered minTimeRepairReq (most-overdue first).
# All are expired; the cap is the default 3, so two should be left behind.
let expected = ["oldest", "second", "third", "fourth", "newest"]
for i, id in expected:
channel.outgoingRepairBuffer[id] = OutgoingRepairEntry(
outHistEntry: HistoryEntry(messageId: id, senderId: "sender"),
minTimeRepairReq: now - initDuration(seconds = 50 - i * 10),
)
let wrapped = await rm.wrapOutgoingMessage(@[byte(1)], "outbound", testChannel)
check wrapped.isOk()
let attached = deserializeMessage(wrapped.get()).get().repairRequest
check:
attached.len == rm.config.maxRepairRequests
attached[0].messageId == "oldest"
attached[1].messageId == "second"
attached[2].messageId == "third"
# Two least-overdue remain in the buffer for next time.
"fourth" in channel.outgoingRepairBuffer
"newest" in channel.outgoingRepairBuffer
"oldest" notin channel.outgoingRepairBuffer
"second" notin channel.outgoingRepairBuffer
"third" notin channel.outgoingRepairBuffer
asyncTest "incoming repair request adds to incoming repair buffer when eligible":
await rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(
messageId: SdsMessageID,
missingDeps: seq[HistoryEntry],
channelId: SdsChannelID,
) {.gcsafe.} =
discard,
)
let channel = rm.channels[testChannel]
# First, seed delivered history so we can respond to a repair request for it
let cachedMsg = SdsMessage.init(
messageId = "cached-msg",
lamportTimestamp = 1,
causalHistory = @[],
channelId = testChannel,
content = @[byte(99)],
bloomFilter = @[],
)
channel.messageHistory["cached-msg"] = cachedMsg
# Receive a message with a repair request for "cached-msg"
let msgWithRepair = SdsMessage.init(
messageId = "requester-msg",
lamportTimestamp = 5,
causalHistory = @[],
channelId = testChannel,
content = @[byte(3)],
bloomFilter = @[],
repairRequest = @[
HistoryEntry(
messageId: "cached-msg",
senderId: "test-participant",
# Same as our participantId so we're in response group
)
],
)
discard await rm.unwrapReceivedMessage(serializeMessage(msgWithRepair).get())
# We should have added it to the incoming repair buffer (we have the message and are in response group)
check "cached-msg" in channel.incomingRepairBuffer
suite "SDS-R: Protobuf Roundtrip":
test "senderId in HistoryEntry roundtrips through protobuf":
let msg = SdsMessage.init(
messageId = "msg1",
lamportTimestamp = 100,
causalHistory = @[
HistoryEntry(
messageId: "dep1", retrievalHint: @[byte(1), 2], senderId: "sender-A"
),
HistoryEntry(messageId: "dep2", senderId: "sender-B"),
],
channelId = "ch1",
content = @[byte(42)],
bloomFilter = @[],
)
let serialized = serializeMessage(msg).get()
let decoded = deserializeMessage(serialized).get()
check:
decoded.causalHistory.len == 2
decoded.causalHistory[0].messageId == "dep1"
decoded.causalHistory[0].senderId == "sender-A"
decoded.causalHistory[0].retrievalHint == @[byte(1), 2]
decoded.causalHistory[1].messageId == "dep2"
decoded.causalHistory[1].senderId == "sender-B"
test "repairRequest field roundtrips through protobuf":
let msg = SdsMessage.init(
messageId = "msg1",
lamportTimestamp = 100,
causalHistory = @[],
channelId = "ch1",
content = @[byte(42)],
bloomFilter = @[],
repairRequest = @[
HistoryEntry(messageId: "missing1", senderId: "sender-X"),
HistoryEntry(
messageId: "missing2", senderId: "sender-Y", retrievalHint: @[byte(5)]
),
],
)
let serialized = serializeMessage(msg).get()
let decoded = deserializeMessage(serialized).get()
check:
decoded.repairRequest.len == 2
decoded.repairRequest[0].messageId == "missing1"
decoded.repairRequest[0].senderId == "sender-X"
decoded.repairRequest[1].messageId == "missing2"
decoded.repairRequest[1].senderId == "sender-Y"
decoded.repairRequest[1].retrievalHint == @[byte(5)]
test "backward compat: message without repairRequest decodes fine":
let msg = SdsMessage.init(
messageId = "msg1",
lamportTimestamp = 100,
causalHistory = @[HistoryEntry(messageId: "dep1")],
channelId = "ch1",
content = @[byte(42)],
bloomFilter = @[],
)
let serialized = serializeMessage(msg).get()
let decoded = deserializeMessage(serialized).get()
check:
decoded.repairRequest.len == 0
decoded.causalHistory[0].senderId == ""
test "SdsMessage.senderId roundtrips through protobuf":
let msg = SdsMessage.init(
messageId = "m1",
lamportTimestamp = 1,
causalHistory = @[],
channelId = "ch1",
content = @[byte(1)],
bloomFilter = @[],
senderId = "alice",
)
let decoded = deserializeMessage(serializeMessage(msg).get()).get()
check decoded.senderId == "alice"
# ---------------------------------------------------------------------------
# SDS-R Phase 2 tests: edge cases, lifecycle, sweep, and multi-participant flows
# ---------------------------------------------------------------------------
suite "SDS-R: Edge Cases and Defensive Branches":
test "computeTReq returns tMin when range is degenerate":
let tMin = initDuration(seconds = 30)
# tMax == tMin
let d1 = computeTReq("p", "m", tMin, tMin)
check d1 == tMin
# tMax < tMin (rangeMs < 0)
let d2 = computeTReq("p", "m", tMin, initDuration(seconds = 10))
check d2 == tMin
test "computeTResp returns 0 when tMax is 0":
let d = computeTResp("p", "other", "m", initDuration(milliseconds = 0))
check d.inMilliseconds == 0
test "computeTResp always stays within [0, tMax)":
# Adversarial sweep — result must never wrap negative nor exceed tMax
let tMax = initDuration(seconds = 300)
for i in 0 ..< 500:
let d = computeTResp(
"participant-" & $i, "sender-" & $(i * 13), "msg-" & $(i * 31), tMax
)
check:
d.inMilliseconds >= 0
d.inMilliseconds < tMax.inMilliseconds
test "isInResponseGroup returns true for non-positive numGroups":
check isInResponseGroup("p", "sender", "m", 0) == true
check isInResponseGroup("p", "sender", "m", -1) == true
test "computeTReq bounds across many random inputs":
let tMin = initDuration(seconds = 30)
let tMax = initDuration(seconds = 300)
for i in 0 ..< 200:
let d = computeTReq("p-" & $i, "m-" & $i, tMin, tMax)
check:
d.inMilliseconds >= tMin.inMilliseconds
d.inMilliseconds < tMax.inMilliseconds
test "response group distribution is roughly uniform":
# With numGroups =10, ~10% of random participants should share sender's group.
const numGroups = 10
const totalParticipants = 1000
let senderId = "alice"
let msgId = "msg-xyz"
var sameGroup = 0
for i in 0 ..< totalParticipants:
if isInResponseGroup("participant-" & $i, senderId, msgId, numGroups):
sameGroup += 1
# Expected ~100 (1/N), allow [50, 200] band for hash quirks
check:
sameGroup >= 50
sameGroup <= 200
test "computeTResp monotonicity: self always fastest":
# The original sender (distance =0) must always be first to respond.
let tMax = initDuration(seconds = 300)
let selfD = computeTResp("alice", "alice", "msg-xyz", tMax)
check selfD.inMilliseconds == 0
for i in 0 ..< 50:
let other = computeTResp("other-" & $i, "alice", "msg-xyz", tMax)
check other.inMilliseconds >= selfD.inMilliseconds
suite "SDS-R: Lifecycle and State":
asyncTest "empty participantId disables outgoing repair creation":
# Explicitly pass empty id to exercise the SDS-R no-op branch. Required-arg
# signature means callers can no longer accidentally land here.
let rm = newReliabilityManager(participantId = "".SdsParticipantID).get()
check (await rm.ensureChannel(testChannel)).isOk()
await rm.setCallbacks(
proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
discard,
proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
discard,
proc(msgId: SdsMessageID, deps: seq[HistoryEntry], ch: SdsChannelID) {.gcsafe.} =
discard,
)
let msg = SdsMessage.init(
messageId = "m2",
lamportTimestamp = 2,
causalHistory = @[HistoryEntry(messageId: "m1-missing", senderId: "alice")],
channelId = testChannel,
content = @[byte(2)],
bloomFilter = @[],
)
discard await rm.unwrapReceivedMessage(serializeMessage(msg).get())
check rm.channels[testChannel].outgoingRepairBuffer.len == 0
await rm.cleanup()
asyncTest "empty senderId in incoming repair request is ignored":
let rm = newReliabilityManager(participantId = "bob").get()
check (await rm.ensureChannel(testChannel)).isOk()
let channel = rm.channels[testChannel]
channel.messageHistory["m-wanted"] = SdsMessage.init(
messageId = "m-wanted",
lamportTimestamp = 1,
causalHistory = @[],
channelId = testChannel,
content = @[byte(99), 99, 99],
bloomFilter = @[],
)
await rm.setCallbacks(
proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
discard,
proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
discard,
proc(msgId: SdsMessageID, deps: seq[HistoryEntry], ch: SdsChannelID) {.gcsafe.} =
discard,
)
let msg = SdsMessage.init(
messageId = "req-msg",
lamportTimestamp = 5,
causalHistory = @[],
channelId = testChannel,
content = @[byte(1)],
bloomFilter = @[],
repairRequest = @[HistoryEntry(messageId: "m-wanted", senderId: "")],
)
discard await rm.unwrapReceivedMessage(serializeMessage(msg).get())
check "m-wanted" notin channel.incomingRepairBuffer
await rm.cleanup()
asyncTest "wrapOutgoingMessage records the message in history with our senderId":
# Proves Bug 1 is fixed — the original sender can serve her own message.
# In the consolidated history model, the SdsMessage itself carries senderId
# and can be re-serialized on demand for repair, so a single membership
# check + senderId read covers both halves of the original assertion.
let rm = newReliabilityManager(participantId = "alice").get()
check (await rm.ensureChannel(testChannel)).isOk()
discard await rm.wrapOutgoingMessage(@[byte(1), 2, 3], "m1", testChannel)
let channel = rm.channels[testChannel]
check:
"m1" in channel.messageHistory
channel.messageHistory["m1"].senderId == "alice"
channel.messageHistory["m1"].content == @[byte(1), 2, 3]
await rm.cleanup()
asyncTest "getRecentHistoryEntries carries senderId for own messages":
let rm = newReliabilityManager(participantId = "alice").get()
check (await rm.ensureChannel(testChannel)).isOk()
discard await rm.wrapOutgoingMessage(@[byte(1)], "m1", testChannel)
discard await rm.wrapOutgoingMessage(@[byte(2)], "m2", testChannel)
let entries = (await rm.getRecentHistoryEntries(10, testChannel)).get()
check:
entries.len == 2
entries[0].senderId == "alice"
entries[1].senderId == "alice"
await rm.cleanup()
asyncTest "resetReliabilityManager clears all SDS-R state":
let rm = newReliabilityManager(participantId = "alice").get()
check (await rm.ensureChannel(testChannel)).isOk()
let channel = rm.channels[testChannel]
channel.outgoingRepairBuffer["a"] = OutgoingRepairEntry(
outHistEntry: HistoryEntry(messageId: "a", senderId: "x"),
minTimeRepairReq: getTime(),
)
channel.incomingRepairBuffer["b"] = IncomingRepairEntry(
inHistEntry: HistoryEntry(messageId: "b", senderId: "y"),
cachedMessage: @[byte(1)],
minTimeRepairResp: getTime(),
)
channel.messageHistory["c"] = SdsMessage.init(
messageId = "c",
lamportTimestamp = 1,
causalHistory = @[],
channelId = testChannel,
content = @[byte(2)],
bloomFilter = @[],
senderId = "someone",
)
check (await rm.resetReliabilityManager()).isOk()
check (await rm.ensureChannel(testChannel)).isOk()
let ch2 = rm.channels[testChannel]
check:
ch2.outgoingRepairBuffer.len == 0
ch2.incomingRepairBuffer.len == 0
ch2.messageHistory.len == 0
await rm.cleanup()
asyncTest "SDS-R state is isolated per channel":
let rm = newReliabilityManager(participantId = "alice").get()
check (await rm.ensureChannel("ch-A")).isOk()
check (await rm.ensureChannel("ch-B")).isOk()
await rm.setCallbacks(
proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
discard,
proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
discard,
proc(msgId: SdsMessageID, deps: seq[HistoryEntry], ch: SdsChannelID) {.gcsafe.} =
discard,
)
let msg = SdsMessage.init(
messageId = "m2",
lamportTimestamp = 2,
causalHistory = @[HistoryEntry(messageId: "m1-missing", senderId: "bob")],
channelId = "ch-A",
content = @[byte(2)],
bloomFilter = @[],
)
discard await rm.unwrapReceivedMessage(serializeMessage(msg).get())
check:
rm.channels["ch-A"].outgoingRepairBuffer.len == 1
rm.channels["ch-B"].outgoingRepairBuffer.len == 0
await rm.cleanup()
asyncTest "duplicate message arrival cancels pending incoming repair entry":
# Covers the dedup-before-cleanup fix: a rebroadcast arriving at a peer who
# already has the message must clear that peer's incomingRepairBuffer entry.
let rm = newReliabilityManager(participantId = "carol").get()
check (await rm.ensureChannel(testChannel)).isOk()
let channel = rm.channels[testChannel]
await rm.setCallbacks(
proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
discard,
proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
discard,
proc(msgId: SdsMessageID, deps: seq[HistoryEntry], ch: SdsChannelID) {.gcsafe.} =
discard,
)
# Carol already has M1 in history and has a pending incomingRepairBuffer entry
channel.messageHistory["m1"] = SdsMessage.init(
messageId = "m1",
lamportTimestamp = 1,
causalHistory = @[],
channelId = testChannel,
content = @[byte(1)],
bloomFilter = @[],
)
channel.incomingRepairBuffer["m1"] = IncomingRepairEntry(
inHistEntry: HistoryEntry(messageId: "m1", senderId: "alice"),
cachedMessage: @[byte(1)],
minTimeRepairResp: getTime() + initDuration(seconds = 10),
)
# A rebroadcast of M1 arrives
let msg = SdsMessage.init(
messageId = "m1",
lamportTimestamp = 1,
causalHistory = @[],
channelId = testChannel,
content = @[byte(1)],
bloomFilter = @[],
senderId = "alice",
)
discard await rm.unwrapReceivedMessage(serializeMessage(msg).get())
check "m1" notin channel.incomingRepairBuffer
await rm.cleanup()
suite "SDS-R: Repair Sweep":
var rm: ReliabilityManager
asyncSetup:
rm = newReliabilityManager(participantId = "bob").get()
check (await rm.ensureChannel(testChannel)).isOk()
asyncTeardown:
if not rm.isNil:
await rm.cleanup()
asyncTest "runRepairSweep fires onRepairReady for expired tResp":
var fireCount = 0
var firstBytes: seq[byte] = @[]
await rm.setCallbacks(
proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
discard,
proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
discard,
proc(msgId: SdsMessageID, deps: seq[HistoryEntry], ch: SdsChannelID) {.gcsafe.} =
discard,
onRepairReady = proc(bytes: seq[byte], ch: SdsChannelID) {.gcsafe.} =
{.cast(gcsafe).}:
fireCount += 1
if fireCount == 1:
firstBytes = bytes
,
)
let channel = rm.channels[testChannel]
channel.incomingRepairBuffer["m-ready"] = IncomingRepairEntry(
inHistEntry: HistoryEntry(messageId: "m-ready", senderId: "alice"),
cachedMessage: @[byte(1), 2, 3],
minTimeRepairResp: getTime() - initDuration(seconds = 1), # expired
)
channel.incomingRepairBuffer["m-not-ready"] = IncomingRepairEntry(
inHistEntry: HistoryEntry(messageId: "m-not-ready", senderId: "alice"),
cachedMessage: @[byte(9), 9, 9],
minTimeRepairResp: getTime() + initDuration(minutes = 10), # far future
)
check (await rm.runRepairSweep()).isOk()
check:
fireCount == 1
firstBytes == @[byte(1), 2, 3]
"m-ready" notin channel.incomingRepairBuffer
"m-not-ready" in channel.incomingRepairBuffer
asyncTest "runRepairSweep drops outgoing entries past T_max window":
await rm.setCallbacks(
proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
discard,
proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
discard,
proc(msgId: SdsMessageID, deps: seq[HistoryEntry], ch: SdsChannelID) {.gcsafe.} =
discard,
)
let channel = rm.channels[testChannel]
let tMax = rm.config.repairTMax
channel.outgoingRepairBuffer["m-stale"] = OutgoingRepairEntry(
outHistEntry: HistoryEntry(messageId: "m-stale", senderId: "alice"),
minTimeRepairReq: getTime() - (tMax + tMax), # now - 2*T_max, past drop window
)
channel.outgoingRepairBuffer["m-fresh"] = OutgoingRepairEntry(
outHistEntry: HistoryEntry(messageId: "m-fresh", senderId: "alice"),
minTimeRepairReq: getTime(),
)
check (await rm.runRepairSweep()).isOk()
check:
"m-stale" notin channel.outgoingRepairBuffer
"m-fresh" in channel.outgoingRepairBuffer
asyncTest "runRepairSweep no-op when buffers are empty":
var fireCount = 0
await rm.setCallbacks(
proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
discard,
proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
discard,
proc(msgId: SdsMessageID, deps: seq[HistoryEntry], ch: SdsChannelID) {.gcsafe.} =
discard,
onRepairReady = proc(bytes: seq[byte], ch: SdsChannelID) {.gcsafe.} =
fireCount += 1,
)
check (await rm.runRepairSweep()).isOk()
check fireCount == 0
# --- Multi-participant in-process bus for integration tests ---------------
type TestBus = ref object
peers: OrderedTable[SdsParticipantID, ReliabilityManager]
delivered: Table[SdsParticipantID, seq[SdsMessageID]]
# Log of raw message-ids placed on the wire, tagged with the source peer.
wireLog: seq[tuple[senderId: SdsParticipantID, messageId: SdsMessageID]]
# Queue of (sender, bytes) the repair callback would have delivered if it
# could await. Drained explicitly by `bus.drain()` from the test body.
pending: seq[(SdsParticipantID, seq[byte])]
proc newTestBus(): TestBus =
TestBus(
peers: initOrderedTable[SdsParticipantID, ReliabilityManager](),
delivered: initTable[SdsParticipantID, seq[SdsMessageID]](),
wireLog: @[],
pending: @[],
)
proc recordWire(bus: TestBus, senderId: SdsParticipantID, bytes: seq[byte]) {.gcsafe.} =
let decoded = deserializeMessage(bytes)
if decoded.isOk():
bus.wireLog.add((senderId, decoded.get().messageId))
proc deliverExcept(
bus: TestBus,
senderId: SdsParticipantID,
bytes: seq[byte],
exclude: seq[SdsParticipantID],
) {.async: (raises: []).} =
for pid, peer in bus.peers:
if pid == senderId or pid in exclude:
continue
discard await peer.unwrapReceivedMessage(bytes)
proc drain(bus: TestBus): Future[void] {.async.} =
## Delivers every (sender, bytes) the repair callback enqueued. Loops until
## the queue stays empty across one full pass — a delivery may trigger a
## new repair-ready callback that re-enqueues.
while bus.pending.len > 0:
let batch = move bus.pending
bus.pending = @[]
for entry in batch:
await bus.deliverExcept(entry[0], entry[1], @[])
proc addPeer(
bus: TestBus,
participantId: SdsParticipantID,
config: ReliabilityConfig = defaultConfig(),
): Future[ReliabilityManager] {.async.} =
let rm = newReliabilityManager(participantId, config).get()
doAssert (await rm.ensureChannel(testChannel)).isOk()
bus.peers[participantId] = rm
bus.delivered[participantId] = @[]
let pid = participantId
let busRef = bus
await rm.setCallbacks(
proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
{.cast(gcsafe).}:
busRef.delivered[pid].add(msgId),
proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
discard,
proc(msgId: SdsMessageID, deps: seq[HistoryEntry], ch: SdsChannelID) {.gcsafe.} =
discard,
onRepairReady = proc(bytes: seq[byte], ch: SdsChannelID) {.gcsafe.} =
# The callback contract is sync, so we cannot `await` here. Enqueue the
# delivery and let the test drive it via `bus.drain()` instead.
{.cast(gcsafe).}:
busRef.recordWire(pid, bytes)
busRef.pending.add((pid, bytes)),
)
return rm
proc broadcast(
bus: TestBus,
senderId: SdsParticipantID,
content: seq[byte],
messageId: SdsMessageID,
dropAt: seq[SdsParticipantID] = @[],
): Future[void] {.async.} =
let rm = bus.peers[senderId]
let wrapped = await rm.wrapOutgoingMessage(content, messageId, testChannel)
doAssert wrapped.isOk()
bus.recordWire(senderId, wrapped.get())
await bus.deliverExcept(senderId, wrapped.get(), dropAt)
proc forceOutgoingExpired(rm: ReliabilityManager, messageId: SdsMessageID) =
## Push a specific outgoingRepairBuffer entry's minTimeRepairReq into the past so the
## next wrapOutgoingMessage will pick it up.
let channel = rm.channels[testChannel]
if messageId in channel.outgoingRepairBuffer:
channel.outgoingRepairBuffer[messageId].minTimeRepairReq =
getTime() - initDuration(seconds = 1)
proc forceIncomingExpired(rm: ReliabilityManager, messageId: SdsMessageID) =
## Push an incomingRepairBuffer entry's minTimeRepairResp into the past so runRepairSweep fires it.
let channel = rm.channels[testChannel]
if messageId in channel.incomingRepairBuffer:
channel.incomingRepairBuffer[messageId].minTimeRepairResp =
getTime() - initDuration(seconds = 1)
suite "SDS-R: Multi-Participant Integration":
asyncTest "basic single-gap repair (Alice -> Bob misses -> Carol's message triggers repair)":
let bus = newTestBus()
let alice = await bus.addPeer("alice")
let bob = await bus.addPeer("bob")
let carol = await bus.addPeer("carol")
# Alice sends M1, but Bob is offline for this one.
await bus.broadcast("alice", @[byte(1)], "m1", dropAt = @["bob".SdsParticipantID])
# Carol now has M1; Bob does not.
check "m1" in carol.channels[testChannel].messageHistory
check "m1" notin bob.channels[testChannel].messageHistory
# Carol sends M2 with causal history referencing M1.
await bus.broadcast("carol", @[byte(2)], "m2")
# Bob detects M1 missing and populates his outgoingRepairBuffer.
check "m1" in bob.channels[testChannel].outgoingRepairBuffer
# Bob should have buffered M2.
check "m2" in bob.channels[testChannel].incomingBuffer
check "m2" notin bus.delivered["bob"]
# Force Bob's T_req so the next wrap attaches the repair request.
bob.forceOutgoingExpired("m1")
# Bob sends M3 — it must carry repair_request =[M1, sender =alice].
await bus.broadcast("bob", @[byte(3)], "m3")
# Alice received M3, saw the repair_request, cached-bypass and response-group
# checks pass, so she has an incomingRepairBuffer entry for M1 with tResp =0.
check "m1" in alice.channels[testChannel].incomingRepairBuffer
# Force alice's tResp to past just to be safe (it's already 0 for self),
# then run her sweep. She rebroadcasts M1.
alice.forceIncomingExpired("m1")
check (await alice.runRepairSweep()).isOk()
await bus.drain()
# Bob now has M1 and M2 delivered.
check:
"m1" in bus.delivered["bob"]
"m2" in bus.delivered["bob"]
asyncTest "response cancellation: only one rebroadcast on the wire":
let bus = newTestBus()
let alice = await bus.addPeer("alice")
let bob = await bus.addPeer("bob")
let carol = await bus.addPeer("carol")
# Alice sends M1, Bob offline.
await bus.broadcast("alice", @[byte(1)], "m1", dropAt = @["bob".SdsParticipantID])
# Carol sends M2; Bob sees M1 missing.
await bus.broadcast("carol", @[byte(2)], "m2")
check "m1" in bob.channels[testChannel].outgoingRepairBuffer
# Bob requests repair.
bob.forceOutgoingExpired("m1")
await bus.broadcast("bob", @[byte(3)], "m3")
# Both Alice and Carol now have an incomingRepairBuffer entry for M1.
check:
"m1" in alice.channels[testChannel].incomingRepairBuffer
"m1" in carol.channels[testChannel].incomingRepairBuffer
# Alice fires first (T_resp =0 for self). Her rebroadcast should cancel Carol's
# pending entry when Carol receives the rebroadcast.
alice.forceIncomingExpired("m1")
check (await alice.runRepairSweep()).isOk()
await bus.drain()
# Carol's pending response must have been cleared by the dedup-path cleanup.
check "m1" notin carol.channels[testChannel].incomingRepairBuffer
# Even if we now force-run Carol's sweep, nothing should fire.
let wireCountBefore = bus.wireLog.len
check (await carol.runRepairSweep()).isOk()
await bus.drain()
check bus.wireLog.len == wireCountBefore
# Bob received exactly one rebroadcast of M1.
var m1RebroadcastCount = 0
for entry in bus.wireLog:
if entry.messageId == "m1" and entry.senderId != "alice":
discard # only the original Alice->all broadcast had senderId ="alice"
if entry.messageId == "m1":
m1RebroadcastCount += 1
# Two "m1" entries total on wire: (1) Alice's original broadcast, (2) Alice's rebroadcast.
check m1RebroadcastCount == 2
asyncTest "cancellation on incoming repair request: peer drops its own pending request":
let bus = newTestBus()
let alice = await bus.addPeer("alice")
let bob = await bus.addPeer("bob")
let carol = await bus.addPeer("carol")
# Alice sends M1 — drop at both Bob and Carol, so both miss it.
await bus.broadcast(
"alice",
@[byte(1)],
"m1",
dropAt = @["bob".SdsParticipantID, "carol".SdsParticipantID],
)
# Alice sends M2 referencing M1 — both Bob and Carol see M1 missing.
await bus.broadcast("alice", @[byte(2)], "m2")
check:
"m1" in bob.channels[testChannel].outgoingRepairBuffer
"m1" in carol.channels[testChannel].outgoingRepairBuffer
# Bob's T_req fires first. He sends a repair request for M1.
bob.forceOutgoingExpired("m1")
await bus.broadcast("bob", @[byte(3)], "m3")
# Carol, on receiving Bob's repair request, must have dropped her own
# pending outgoingRepairBuffer entry for M1 (cancellation).
check "m1" notin carol.channels[testChannel].outgoingRepairBuffer
asyncTest "response group filtering: only group members respond":
# With numGroups =10, roughly 1/10 of receivers will be in the group.
# Construct a sender+message where a specific non-sender is NOT in the group.
var cfg = defaultConfig()
cfg.numResponseGroups = 10
# Pick a msgId where carol is not in the group and bob is
# We probe deterministically because computeTReq/isInResponseGroup are pure.
var chosenMsg = ""
for i in 0 ..< 1000:
let candidate = "probe-" & $i
let bobIn = isInResponseGroup("bob", "alice", candidate, 10)
let carolIn = isInResponseGroup("carol", "alice", candidate, 10)
if bobIn and not carolIn:
chosenMsg = candidate
break
check chosenMsg.len > 0
let bus = newTestBus()
discard await bus.addPeer("alice", cfg)
let bob = await bus.addPeer("bob", cfg)
let carol = await bus.addPeer("carol", cfg)
# Both Bob and Carol receive the original M1 (so both have it in messageHistory).
await bus.broadcast("alice", @[byte(1)], chosenMsg)
# Now Dave arrives: build a fake requester message manually so its repair_request
# names Alice as senderId for chosenMsg.
# We inject directly by calling unwrapReceivedMessage on bob/carol.
discard await bus.addPeer("dave", cfg)
# Dave has no messages, but we can hand-craft a repair request he would send.
let reqMsg = SdsMessage.init(
messageId = "req-from-dave",
lamportTimestamp = 10,
causalHistory = @[],
channelId = testChannel,
content = @[byte(9)],
bloomFilter = @[],
senderId = "dave",
repairRequest = @[HistoryEntry(messageId: chosenMsg, senderId: "alice")],
)
let bytes = serializeMessage(reqMsg).get()
discard await bob.unwrapReceivedMessage(bytes)
discard await carol.unwrapReceivedMessage(bytes)
check:
chosenMsg in bob.channels[testChannel].incomingRepairBuffer
chosenMsg notin carol.channels[testChannel].incomingRepairBuffer
asyncTest "multi-gap batch repair: many missing deps split across requests":
let bus = newTestBus()
discard await bus.addPeer("alice")
let bob = await bus.addPeer("bob")
# Alice sends 5 messages while Bob is offline.
let drops = @["bob".SdsParticipantID]
await bus.broadcast("alice", @[byte(1)], "m1", dropAt = drops)
await bus.broadcast("alice", @[byte(2)], "m2", dropAt = drops)
await bus.broadcast("alice", @[byte(3)], "m3", dropAt = drops)
await bus.broadcast("alice", @[byte(4)], "m4", dropAt = drops)
await bus.broadcast("alice", @[byte(5)], "m5", dropAt = drops)
# Bob comes online and receives M6 which depends on m1..m5.
await bus.broadcast("alice", @[byte(6)], "m6")
# Bob should have 5 outgoing repair entries.
let channel = bob.channels[testChannel]
check channel.outgoingRepairBuffer.len == 5
# Force all to expired and wrap one message — only maxRepairRequests
# (default 3) should attach to a single outgoing message.
for id in ["m1", "m2", "m3", "m4", "m5"]:
bob.forceOutgoingExpired(id)
let wrapped =
(await bob.wrapOutgoingMessage(@[byte(99)], "bob-msg-1", testChannel)).get()
let decoded = deserializeMessage(wrapped).get()
check decoded.repairRequest.len <= bob.config.maxRepairRequests
# The attached entries should be removed from the outgoing buffer.
check channel.outgoingRepairBuffer.len == 5 - decoded.repairRequest.len
asyncTest "markDependenciesMet externally clears pending repair entry":
let bus = newTestBus()
discard await bus.addPeer("alice")
let bob = await bus.addPeer("bob")
await bus.broadcast("alice", @[byte(1)], "m1", dropAt = @["bob".SdsParticipantID])
await bus.broadcast("alice", @[byte(2)], "m2")
check "m1" in bob.channels[testChannel].outgoingRepairBuffer
# Simulate Bob fetching M1 via an out-of-band store query.
check (await bob.markDependenciesMet(@["m1"], testChannel)).isOk()
check "m1" notin bob.channels[testChannel].outgoingRepairBuffer