nim-sds/tests/test_reliability.nim
NagyZoltanPeter 35a33adc98
feat: make Persistence interface async (#69)
* feat: make Persistence interface async

The 14 Persistence proc fields now return Future[...] with
{.async: (raises: []), gcsafe.}, allowing real I/O backends (SQLite,
encrypted file, network) to suspend rather than block the Chronos event
loop the manager runs on.

Propagates through:
- ReliabilityManager.lock: system.Lock -> chronos.AsyncLock. Acquired
  across awaits cleanly; matches the single-threaded Chronos worker the
  FFI uses. Multi-OS-thread use is now explicitly the caller's
  responsibility.
- sds_utils + sds.nim public API procs (wrapOutgoingMessage,
  unwrapReceivedMessage, markDependenciesMet, setCallbacks,
  resetReliabilityManager, cleanup, ensureChannel, removeChannel, the
  getter snapshots, etc.) are now async.
- FFI request handlers in library/sds_thread/... await the new API.
- Tests converted via an asyncTest template that wraps each test body
  in an async proc; setup/teardown use waitFor for their single async
  call (ensureChannel / cleanup).

Lock scope is preserved exactly: the same call sites that held the
kernel Lock today hold AsyncLock now -- no new locking added.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* refactor: drop asyncSpawn, add asyncSetup/asyncTeardown

Three asyncSpawn usages removed:

- sds.nim startPeriodicTasks: stored the periodic-task futures on
  ReliabilityManager (new field `periodicTasks: seq[FutureBase]`) so
  cleanup can cancel them on shutdown instead of leaking the loops
  against a cleared manager.
- library/sds_thread/sds_thread.nim: fireSync moved BEFORE processing,
  then `await SdsThreadRequest.process(...)` instead of asyncSpawn'ing
  it. Aligns the worker with the SP-channel + lock assumption that
  there are no concurrent requests; caller throughput is unchanged
  because the caller only waits for receipt (fireSync), not processing.
- tests TestBus repair callback: replaced asyncSpawn(deliverExcept...)
  with an explicit pending-delivery queue drained by `bus.drain()`.
  Integration tests no longer rely on `sleepAsync(10ms)` to let
  spawned deliveries finish — they await drain instead.

Tests also pick up an asyncSetup/asyncTeardown pair (tests/async_unittest.nim)
so suite fixtures can `await` directly. All `waitFor` in setup/teardown
blocks is gone; only the top-level asyncTest wrapper still uses waitFor
(once, to drive the async proc to completion).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* Correctly propagate error hidden by new async move

* Correctly handle future cancellation exceptions, +some housekeeping

* Apply suggestion from @Ivansete-status

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>

* Stylistics, async default implication addressed, nph style run

* Remove leaking CancelledFuture from the public-facing API; as a consequence it is tunneled into CatchableError handling everywhere

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
2026-05-25 22:30:15 +02:00

2068 lines
71 KiB
Nim

import results, std/[times, options, tables]
import sds
import ./async_unittest
# Test-only shortcut: lets fixtures pass plain string literals wherever an
# SdsParticipantID is expected. The converter is not exported, so production
# code retains the distinct-type safety.
converter toParticipantID(s: string): SdsParticipantID =
  ## Wraps `s` as an `SdsParticipantID` without modifying its contents.
  SdsParticipantID(s)
# Default channel id used by most suites in this file.
const testChannel = "testChannel"
proc seedBloom(
    rm: ReliabilityManager, channel: SdsChannelID, n: int, prefix = "noise"
) =
  ## Adds `n` synthetic ids (`prefix & "0"` up to `prefix & $(n - 1)`) to the
  ## bloom filter of `channel`, so a test runs the manager against a realistic,
  ## populated filter rather than the implicit empty one a fresh
  ## ReliabilityManager would produce.
  ##
  ## The channel must already be present in `rm.channels`; the `[]` lookup
  ## fails otherwise (presumably with a KeyError -- confirm against the
  ## container type).
  let target = rm.channels[channel]
  for idx in countup(0, n - 1):
    target.bloomFilter.add(prefix & $idx)
# Core functionality tests
suite "Core Operations":
  var rm: ReliabilityManager

  asyncSetup:
    # Every test starts from a fresh manager with the shared channel registered.
    let created = newReliabilityManager(participantId = "alice")
    check created.isOk()
    rm = created.get()
    let ensured = await rm.ensureChannel(testChannel)
    check ensured.isOk()

  asyncTeardown:
    if not rm.isNil:
      await rm.cleanup()

  asyncTest "can create with default config":
    # The default config must expose the published default constants.
    let cfg = defaultConfig()
    check cfg.bloomFilterCapacity == DefaultBloomFilterCapacity
    check cfg.bloomFilterErrorRate == DefaultBloomFilterErrorRate
    check cfg.maxMessageHistory == DefaultMaxMessageHistory

  asyncTest "basic message wrapping and unwrapping":
    # Round trip: wrap a payload, then unwrap the resulting bytes.
    let payload = @[byte(1), 2, 3]
    let payloadId = "test-msg-1"

    let wrapRes = await rm.wrapOutgoingMessage(payload, payloadId, testChannel)
    check wrapRes.isOk()
    let wire = wrapRes.get()
    check wire.len > 0

    let unwrapRes = await rm.unwrapReceivedMessage(wire)
    check unwrapRes.isOk()
    let (content, missing, channel) = unwrapRes.get()
    check content == payload
    check missing.len == 0
    check channel == testChannel

  asyncTest "basic message wrapping and unwrapping (non-empty bloom)":
    # Same round trip, but with the channel bloom pre-populated.
    rm.seedBloom(testChannel, 50)
    let payload = @[byte(1), 2, 3]
    let payloadId = "test-msg-1"

    let wrapRes = await rm.wrapOutgoingMessage(payload, payloadId, testChannel)
    check wrapRes.isOk()
    let wire = wrapRes.get()
    check wire.len > 0

    # The outgoing message must carry the populated bloom snapshot, not an
    # empty one — this is the path that was never exercised before.
    let parsed = deserializeMessage(wire)
    check parsed.isOk()
    check parsed.get().bloomFilter.len > 0

    let unwrapRes = await rm.unwrapReceivedMessage(wire)
    check unwrapRes.isOk()
    let (content, missing, channel) = unwrapRes.get()
    check content == payload
    check missing.len == 0
    check channel == testChannel

  asyncTest "message ordering":
    # Two standalone messages that differ only in id, timestamp and content.
    let earlier = SdsMessage.init(
      messageId = "msg1",
      lamportTimestamp = 1,
      causalHistory = @[],
      channelId = testChannel,
      content = @[byte(1)],
      bloomFilter = @[],
    )
    let later = SdsMessage.init(
      messageId = "msg2",
      lamportTimestamp = 5,
      causalHistory = @[],
      channelId = testChannel,
      content = @[byte(2)],
      bloomFilter = @[],
    )
    let earlierWire = serializeMessage(earlier)
    let laterWire = serializeMessage(later)
    check earlierWire.isOk()
    check laterWire.isOk()

    # Deliver out of order, sampling the channel clock after each delivery.
    discard await rm.unwrapReceivedMessage(laterWire.get())
    let clockAfterLater = rm.channels[testChannel].lamportTimestamp
    discard await rm.unwrapReceivedMessage(earlierWire.get())
    let clockAfterEarlier = rm.channels[testChannel].lamportTimestamp
    check clockAfterEarlier > clockAfterLater
func bytesOf(s: string): seq[byte] =
  ## Copies the raw bytes of `s` into a fresh `seq[byte]`.
  ##
  ## Used in place of `cast[seq[byte]](s)`: the cast reinterprets the string's
  ## internal representation instead of copying it, which should stay confined
  ## to FFI boundaries.
  @(s.toOpenArrayByte(0, s.high))

# Reliability mechanism tests
suite "Reliability Mechanisms":
  var rm: ReliabilityManager

  asyncSetup:
    # Fresh manager with the shared channel registered before every test.
    let rmResult = newReliabilityManager(participantId = "alice")
    check rmResult.isOk()
    rm = rmResult.get()
    check (await rm.ensureChannel(testChannel)).isOk()

  asyncTeardown:
    if not rm.isNil:
      await rm.cleanup()

  asyncTest "dependency detection and resolution":
    var messageReadyCount = 0
    var messageSentCount = 0
    var missingDepsCount = 0
    await rm.setCallbacks(
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        messageReadyCount += 1,
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        messageSentCount += 1,
      proc(
          messageId: SdsMessageID,
          missingDeps: seq[HistoryEntry],
          channelId: SdsChannelID,
      ) {.gcsafe.} =
        missingDepsCount += 1,
    )

    # Create dependency chain: msg3 -> msg2 -> msg1
    let id1 = "msg1"
    let id2 = "msg2"
    let id3 = "msg3"

    # Create messages with dependencies
    let msg2 = SdsMessage.init(
      messageId = id2,
      lamportTimestamp = 2,
      causalHistory = toCausalHistory(@[id1]), # msg2 depends on msg1
      channelId = testChannel,
      content = @[byte(2)],
      bloomFilter = @[],
    )
    let msg3 = SdsMessage.init(
      messageId = id3,
      lamportTimestamp = 3,
      causalHistory = toCausalHistory(@[id1, id2]), # msg3 depends on both msg1 and msg2
      channelId = testChannel,
      content = @[byte(3)],
      bloomFilter = @[],
    )
    let serialized2 = serializeMessage(msg2)
    let serialized3 = serializeMessage(msg3)
    check:
      serialized2.isOk()
      serialized3.isOk()

    # First try processing msg3 (which depends on msg2 which depends on msg1)
    let unwrapResult3 = await rm.unwrapReceivedMessage(serialized3.get())
    check unwrapResult3.isOk()
    let (_, missingDeps3, _) = unwrapResult3.get()
    check:
      missingDepsCount == 1 # Should trigger missing deps callback
      missingDeps3.len == 2 # Should be missing both msg1 and msg2
      id1 in missingDeps3.getMessageIds()
      id2 in missingDeps3.getMessageIds()

    # Then try processing msg2 (which only depends on msg1)
    let unwrapResult2 = await rm.unwrapReceivedMessage(serialized2.get())
    check unwrapResult2.isOk()
    let (_, missingDeps2, _) = unwrapResult2.get()
    check:
      missingDepsCount == 2 # Should have triggered another missing deps callback
      missingDeps2.len == 1 # Should only be missing msg1
      id1 in missingDeps2.getMessageIds()
      messageReadyCount == 0 # No messages should be ready yet

    # Mark first dependency (msg1) as met
    let markResult1 = await rm.markDependenciesMet(@[id1], testChannel)
    check markResult1.isOk()
    let incomingBuffer = await rm.getIncomingBuffer(testChannel)
    check:
      incomingBuffer.len == 0
      messageReadyCount == 2 # Both msg2 and msg3 should be ready
      missingDepsCount == 2 # Should still be 2 from the initial missing deps

  asyncTest "dependency detection and resolution (non-empty bloom)":
    # A populated bloom filter must not short-circuit the dependency check.
    # Dependency resolution reads messageHistory, not the bloom — but a future
    # "optimisation" could regress this. Seed the bloom with the dep id so a
    # bloom-based shortcut would mistakenly mark the dep as satisfied.
    var missingDepsCount = 0
    var messageReadyCount = 0
    await rm.setCallbacks(
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        messageReadyCount += 1,
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        discard,
      proc(
          messageId: SdsMessageID,
          missingDeps: seq[HistoryEntry],
          channelId: SdsChannelID,
      ) {.gcsafe.} =
        missingDepsCount += 1,
    )
    let id1 = "msg1"
    let id2 = "msg2"
    rm.seedBloom(testChannel, 30)
    # Crucially, also seed the bloom with id1 itself — the dep we will be
    # missing from messageHistory. The manager must still report it missing.
    rm.channels[testChannel].bloomFilter.add(id1)

    let msg2 = SdsMessage.init(
      messageId = id2,
      lamportTimestamp = 2,
      causalHistory = toCausalHistory(@[id1]),
      channelId = testChannel,
      content = @[byte(2)],
      bloomFilter = @[],
    )
    let serialized2 = serializeMessage(msg2)
    check serialized2.isOk()

    let unwrapResult = await rm.unwrapReceivedMessage(serialized2.get())
    check unwrapResult.isOk()
    let (_, missingDeps, _) = unwrapResult.get()
    check:
      missingDepsCount == 1
      missingDeps.len == 1
      id1 in missingDeps.getMessageIds()
      messageReadyCount == 0

  asyncTest "acknowledgment via causal history":
    var messageReadyCount = 0
    var messageSentCount = 0
    var missingDepsCount = 0
    await rm.setCallbacks(
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        messageReadyCount += 1,
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        messageSentCount += 1,
      proc(
          messageId: SdsMessageID,
          missingDeps: seq[HistoryEntry],
          channelId: SdsChannelID,
      ) {.gcsafe.} =
        missingDepsCount += 1,
    )

    # Send our message
    let msg1 = @[byte(1)]
    let id1 = "msg1"
    let wrap1 = await rm.wrapOutgoingMessage(msg1, id1, testChannel)
    check wrap1.isOk()

    # Create a message that has our message in causal history
    let msg2 = SdsMessage.init(
      messageId = "msg2",
      lamportTimestamp = rm.channels[testChannel].lamportTimestamp + 1,
      causalHistory = toCausalHistory(@[id1]), # Include our message in causal history
      channelId = testChannel,
      content = @[byte(2)],
      bloomFilter = @[], # Test with an empty bloom filter
    )
    let serializedMsg2 = serializeMessage(msg2)
    check serializedMsg2.isOk()

    # Process the "received" message - should trigger callbacks
    let unwrapResult = await rm.unwrapReceivedMessage(serializedMsg2.get())
    check unwrapResult.isOk()
    check:
      messageReadyCount == 1 # For msg2 which we "received"
      messageSentCount == 1 # For msg1 which was acknowledged via causal history

  asyncTest "acknowledgment via causal history (non-empty bloom)":
    # The causal-history ack path must not be perturbed by the local channel
    # bloom carrying unrelated ids, and the empty bloom on the incoming
    # message must not spuriously ack any of them.
    var messageReadyCount = 0
    var messageSentCount = 0
    await rm.setCallbacks(
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        messageReadyCount += 1,
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        messageSentCount += 1,
      proc(
          messageId: SdsMessageID,
          missingDeps: seq[HistoryEntry],
          channelId: SdsChannelID,
      ) {.gcsafe.} =
        discard,
    )
    rm.seedBloom(testChannel, 50)

    let msg1 = @[byte(1)]
    let id1 = "msg1"
    let wrap1 = await rm.wrapOutgoingMessage(msg1, id1, testChannel)
    check wrap1.isOk()

    let msg2 = SdsMessage.init(
      messageId = "msg2",
      lamportTimestamp = rm.channels[testChannel].lamportTimestamp + 1,
      causalHistory = toCausalHistory(@[id1]),
      channelId = testChannel,
      content = @[byte(2)],
      bloomFilter = @[],
    )
    let serializedMsg2 = serializeMessage(msg2)
    check serializedMsg2.isOk()

    let unwrapResult = await rm.unwrapReceivedMessage(serializedMsg2.get())
    check unwrapResult.isOk()
    check:
      messageReadyCount == 1
      messageSentCount == 1 # exactly id1; no spurious acks for the seeded ids

  asyncTest "acknowledgment via bloom filter":
    var messageSentCount = 0
    await rm.setCallbacks(
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        discard,
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        messageSentCount += 1,
      proc(
          messageId: SdsMessageID,
          missingDeps: seq[HistoryEntry],
          channelId: SdsChannelID,
      ) {.gcsafe.} =
        discard,
    )

    # Send our message
    let msg1 = @[byte(1)]
    let id1 = "msg1"
    let wrap1 = await rm.wrapOutgoingMessage(msg1, id1, testChannel)
    check wrap1.isOk()

    # Create a message with bloom filter containing our message
    var otherPartyBloomFilter =
      RollingBloomFilter.init(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate)
    otherPartyBloomFilter.add(id1)
    let bfResult = serializeBloomFilter(otherPartyBloomFilter.filter)
    check bfResult.isOk()

    let msg2 = SdsMessage.init(
      messageId = "msg2",
      lamportTimestamp = rm.channels[testChannel].lamportTimestamp + 1,
      causalHistory = @[], # Empty causal history as we're using bloom filter
      channelId = testChannel,
      content = @[byte(2)],
      bloomFilter = bfResult.get(),
    )
    let serializedMsg2 = serializeMessage(msg2)
    check serializedMsg2.isOk()

    let unwrapResult = await rm.unwrapReceivedMessage(serializedMsg2.get())
    check unwrapResult.isOk()
    check messageSentCount == 1 # Our message should be acknowledged via bloom filter

  asyncTest "acknowledgment via bloom filter (non-empty bloom)":
    # The peer's bloom contains both our outgoing id and a pile of unrelated
    # ids. The manager must still ack our message exactly once, and unrelated
    # ids in the peer's bloom must not produce spurious sent callbacks.
    var messageSentCount = 0
    await rm.setCallbacks(
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        discard,
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        messageSentCount += 1,
      proc(
          messageId: SdsMessageID,
          missingDeps: seq[HistoryEntry],
          channelId: SdsChannelID,
      ) {.gcsafe.} =
        discard,
    )

    let msg1 = @[byte(1)]
    let id1 = "msg1"
    let wrap1 = await rm.wrapOutgoingMessage(msg1, id1, testChannel)
    check wrap1.isOk()

    var otherPartyBloomFilter =
      RollingBloomFilter.init(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate)
    for i in 0 ..< 100:
      otherPartyBloomFilter.add("peer-noise-" & $i)
    otherPartyBloomFilter.add(id1)
    let bfResult = serializeBloomFilter(otherPartyBloomFilter.filter)
    check bfResult.isOk()

    let msg2 = SdsMessage.init(
      messageId = "msg2",
      lamportTimestamp = rm.channels[testChannel].lamportTimestamp + 1,
      causalHistory = @[],
      channelId = testChannel,
      content = @[byte(2)],
      bloomFilter = bfResult.get(),
    )
    let serializedMsg2 = serializeMessage(msg2)
    check serializedMsg2.isOk()

    let unwrapResult = await rm.unwrapReceivedMessage(serializedMsg2.get())
    check unwrapResult.isOk()
    check messageSentCount == 1

  asyncTest "outgoing message bloom snapshot reflects channel state":
    # Until now nothing asserts that wrapOutgoingMessage actually attaches
    # the current bloom snapshot — every other test runs against an empty
    # filter where the field is empty either way.
    rm.seedBloom(testChannel, 40, prefix = "delivered-")

    # Plus a real delivery so we exercise the bloom-on-delivery path too.
    let incoming = SdsMessage.init(
      messageId = "incoming-1",
      lamportTimestamp = 1,
      causalHistory = @[],
      channelId = testChannel,
      content = @[byte(9)],
      bloomFilter = @[],
    )
    let serIncoming = serializeMessage(incoming)
    check serIncoming.isOk()
    # Surface a failed delivery here rather than as a puzzling bloom miss below.
    let deliveryResult = await rm.unwrapReceivedMessage(serIncoming.get())
    check deliveryResult.isOk()

    let outId = "outgoing-1"
    let wrapped = await rm.wrapOutgoingMessage(@[byte(1)], outId, testChannel)
    check wrapped.isOk()
    let decoded = deserializeMessage(wrapped.get())
    check decoded.isOk()
    let attachedFilter = deserializeBloomFilter(decoded.get().bloomFilter)
    check attachedFilter.isOk()

    var snapshot = RollingBloomFilter.init(
      filter = attachedFilter.get(),
      capacity = DefaultBloomFilterCapacity,
      minCapacity = 0,
      maxCapacity = DefaultBloomFilterCapacity,
    )
    check:
      snapshot.contains("delivered-0")
      snapshot.contains("delivered-39")
      snapshot.contains("incoming-1")

  asyncTest "retrieval hints":
    var messageReadyCount = 0
    var messageSentCount = 0
    var missingDepsCount = 0
    await rm.setCallbacks(
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        messageReadyCount += 1,
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        messageSentCount += 1,
      proc(
          messageId: SdsMessageID,
          missingDeps: seq[HistoryEntry],
          channelId: SdsChannelID,
      ) {.gcsafe.} =
        missingDepsCount += 1,
      nil,
      proc(messageId: SdsMessageID): seq[byte] =
        return bytesOf("hint:" & messageId),
    )

    # Send a first message to populate history
    let msg1 = @[byte(1)]
    let id1 = "msg1"
    let wrap1 = await rm.wrapOutgoingMessage(msg1, id1, testChannel)
    check wrap1.isOk()

    # Send a second message, which should have the first in its causal history
    let msg2 = @[byte(2)]
    let id2 = "msg2"
    let wrap2 = await rm.wrapOutgoingMessage(msg2, id2, testChannel)
    check wrap2.isOk()

    # Check that the wrapped message contains the hint
    let decoded2 = deserializeMessage(wrap2.get())
    check decoded2.isOk()
    let unwrappedMsg2 = decoded2.get()
    check unwrappedMsg2.causalHistory.len > 0
    # `check` does not abort the test, so guard the index explicitly.
    if unwrappedMsg2.causalHistory.len > 0:
      check unwrappedMsg2.causalHistory[0].messageId == id1
      check unwrappedMsg2.causalHistory[0].retrievalHint == bytesOf("hint:" & id1)

    # Create a message with a missing dependency (no retrieval hint)
    let msg3 = SdsMessage.init(
      messageId = "msg3",
      lamportTimestamp = 3,
      causalHistory = toCausalHistory(@["missing-dep"]),
      channelId = testChannel,
      content = @[byte(3)],
      bloomFilter = @[],
    )
    let serialized3 = serializeMessage(msg3)
    check serialized3.isOk()
    let unwrapResult3 = await rm.unwrapReceivedMessage(serialized3.get())
    check unwrapResult3.isOk()
    let (_, missingDeps3, _) = unwrapResult3.get()
    check missingDeps3.len == 1
    if missingDeps3.len > 0:
      check missingDeps3[0].messageId == "missing-dep"
      # The hint is empty because it was not provided by the remote sender
      check missingDeps3[0].retrievalHint.len == 0

    # Test with a message that HAS a retrieval hint from remote
    let msg4 = SdsMessage.init(
      messageId = "msg4",
      lamportTimestamp = 4,
      causalHistory = @[newHistoryEntry("another-missing", bytesOf("remote-hint"))],
      channelId = testChannel,
      content = @[byte(4)],
      bloomFilter = @[],
    )
    let serialized4 = serializeMessage(msg4)
    check serialized4.isOk()
    let unwrapResult4 = await rm.unwrapReceivedMessage(serialized4.get())
    check unwrapResult4.isOk()
    let (_, missingDeps4, _) = unwrapResult4.get()
    check missingDeps4.len == 1
    if missingDeps4.len > 0:
      check missingDeps4[0].messageId == "another-missing"
      # The hint should be preserved from the remote sender
      check missingDeps4[0].retrievalHint == bytesOf("remote-hint")
# Periodic task & Buffer management tests
suite "Periodic Tasks & Buffer Management":
  var rm: ReliabilityManager

  asyncSetup:
    # Fresh default-config manager with the shared channel registered.
    let rmResult = newReliabilityManager(participantId = "alice")
    check rmResult.isOk()
    rm = rmResult.get()
    check (await rm.ensureChannel(testChannel)).isOk()

  asyncTeardown:
    if not rm.isNil:
      await rm.cleanup()

  asyncTest "outgoing buffer management":
    var messageSentCount = 0
    await rm.setCallbacks(
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        discard,
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        messageSentCount += 1,
      proc(
          messageId: SdsMessageID,
          missingDeps: seq[HistoryEntry],
          channelId: SdsChannelID,
      ) {.gcsafe.} =
        discard,
    )

    # Add multiple messages
    for i in 0 .. 5:
      let msg = @[byte(i)]
      let id = "msg" & $i
      let wrap = await rm.wrapOutgoingMessage(msg, id, testChannel)
      check wrap.isOk()
    let outBuffer = await rm.getOutgoingBuffer(testChannel)
    check outBuffer.len == 6

    # Create message that acknowledges some messages
    let ackMsg = SdsMessage.init(
      messageId = "ack1",
      lamportTimestamp = rm.channels[testChannel].lamportTimestamp + 1,
      causalHistory = toCausalHistory(@["msg0", "msg2", "msg4"]),
      channelId = testChannel,
      content = @[byte(100)],
      bloomFilter = @[],
    )
    let serializedAck = serializeMessage(ackMsg)
    check serializedAck.isOk()

    # Process the acknowledgment; report a failed unwrap directly instead of
    # only through the buffer-size mismatch below.
    let ackResult = await rm.unwrapReceivedMessage(serializedAck.get())
    check ackResult.isOk()
    let finalBuffer = await rm.getOutgoingBuffer(testChannel)
    check:
      finalBuffer.len == 3 # Should have removed acknowledged messages
      messageSentCount == 3
      # Should have triggered sent callback for acknowledged messages

  asyncTest "periodic buffer sweep and bloom clean":
    var messageSentCount = 0
    var config = defaultConfig()
    config.resendInterval = initDuration(milliseconds = 100) # Short for testing
    config.bufferSweepInterval = initDuration(milliseconds = 50) # Frequent sweeps
    config.bloomFilterCapacity = 2 # Small capacity for testing
    config.maxResendAttempts = 3 # Set a low number of max attempts

    # This test needs its own manager with the custom config. It gets a
    # distinct name so it does not shadow the suite fixture `rm`.
    let rmResultP = newReliabilityManager(participantId = "alice", config = config)
    check rmResultP.isOk()
    let sweepRm = rmResultP.get()
    check (await sweepRm.ensureChannel(testChannel)).isOk()
    await sweepRm.setCallbacks(
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        discard,
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        messageSentCount += 1,
      proc(
          messageId: SdsMessageID,
          missingDeps: seq[HistoryEntry],
          channelId: SdsChannelID,
      ) {.gcsafe.} =
        discard,
    )

    # First message - should be cleaned from bloom filter later
    let msg1 = @[byte(1)]
    let id1 = "msg1"
    let wrap1 = await sweepRm.wrapOutgoingMessage(msg1, id1, testChannel)
    check wrap1.isOk()
    let initialBuffer = await sweepRm.getOutgoingBuffer(testChannel)
    # `check` does not abort the test, so guard the index: an empty buffer must
    # be reported as a failed check, not crash with an IndexDefect.
    check initialBuffer.len > 0
    if initialBuffer.len > 0:
      check initialBuffer[0].resendAttempts == 0
    check sweepRm.channels[testChannel].bloomFilter.contains(id1)

    sweepRm.startPeriodicTasks()
    # Wait long enough for bloom filter
    await sleepAsync(chronos.milliseconds(500))

    # Add new messages
    let msg2 = @[byte(2)]
    let id2 = "msg2"
    let wrap2 = await sweepRm.wrapOutgoingMessage(msg2, id2, testChannel)
    check wrap2.isOk()
    let msg3 = @[byte(3)]
    let id3 = "msg3"
    let wrap3 = await sweepRm.wrapOutgoingMessage(msg3, id3, testChannel)
    check wrap3.isOk()

    let finalBuffer = await sweepRm.getOutgoingBuffer(testChannel)
    # Only msg2 and msg3 should be in buffer, msg1 should be removed after max retries
    check finalBuffer.len == 2
    if finalBuffer.len > 0:
      check:
        finalBuffer[0].message.messageId == id2 # Verify it's the second message
        finalBuffer[0].resendAttempts == 0 # New message should have 0 attempts
    check:
      not sweepRm.channels[testChannel].bloomFilter.contains(id1)
      # Bloom filter cleaning check
      sweepRm.channels[testChannel].bloomFilter.contains(id3) # New message still in filter
    await sweepRm.cleanup()

  asyncTest "periodic sync callback":
    var syncCallCount = 0
    await rm.setCallbacks(
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        discard,
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        discard,
      proc(
          messageId: SdsMessageID,
          missingDeps: seq[HistoryEntry],
          channelId: SdsChannelID,
      ) {.gcsafe.} =
        discard,
      proc() {.gcsafe.} =
        syncCallCount += 1,
    )
    rm.startPeriodicTasks()
    await sleepAsync(chronos.seconds(1))
    # Stop the periodic tasks before reading the counter.
    await rm.cleanup()
    check syncCallCount > 0
# Special cases handling
suite "Special Cases Handling":
  var rm: ReliabilityManager

  asyncSetup:
    # Fresh default-config manager with the shared channel registered.
    let rmResult = newReliabilityManager(participantId = "alice")
    check rmResult.isOk()
    rm = rmResult.get()
    check (await rm.ensureChannel(testChannel)).isOk()

  asyncTeardown:
    if not rm.isNil:
      await rm.cleanup()

  asyncTest "message history limits":
    # Add messages up to max history size
    for i in 0 .. rm.config.maxMessageHistory + 5:
      # Only the id matters here; mask the counter so the payload byte wraps
      # explicitly instead of relying on an implicit narrowing conversion.
      let msg = @[byte(i and 0xFF)]
      let id = "msg" & $i
      let wrap = await rm.wrapOutgoingMessage(msg, id, testChannel)
      check wrap.isOk()
    let history = await rm.getMessageHistory(testChannel)
    check history.len <= rm.config.maxMessageHistory
    # `check` does not abort the test, so guard the index: an empty history
    # must be a failed check rather than an IndexDefect.
    check history.len > 0
    if history.len > 0:
      check history[^1] == "msg" & $(rm.config.maxMessageHistory + 5)

  asyncTest "invalid bloom filter handling":
    let msgInvalid = SdsMessage.init(
      messageId = "invalid-bf",
      lamportTimestamp = 1,
      causalHistory = toCausalHistory(@[]),
      channelId = testChannel,
      content = @[byte(1)],
      bloomFilter = @[1.byte, 2.byte, 3.byte], # Invalid filter data
    )
    let serializedInvalid = serializeMessage(msgInvalid)
    check serializedInvalid.isOk()

    # Should handle invalid bloom filter gracefully
    let unwrapResult = await rm.unwrapReceivedMessage(serializedInvalid.get())
    check unwrapResult.isOk()
    if unwrapResult.isOk():
      let (_, missingDeps, _) = unwrapResult.get()
      check missingDeps.len == 0 # No missing dependencies

  asyncTest "duplicate message handling":
    var messageReadyCount = 0
    await rm.setCallbacks(
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        messageReadyCount += 1,
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        discard,
      proc(
          messageId: SdsMessageID,
          missingDeps: seq[HistoryEntry],
          channelId: SdsChannelID,
      ) {.gcsafe.} =
        discard,
    )

    # Create and process a message
    let msg = SdsMessage.init(
      messageId = "dup-msg",
      lamportTimestamp = 1,
      causalHistory = toCausalHistory(@[]),
      channelId = testChannel,
      content = @[byte(1)],
      bloomFilter = @[],
    )
    let serialized = serializeMessage(msg)
    check serialized.isOk()

    # Process same message twice
    let result1 = await rm.unwrapReceivedMessage(serialized.get())
    check result1.isOk()
    let result2 = await rm.unwrapReceivedMessage(serialized.get())
    check result2.isOk()
    if result2.isOk():
      let (_, missingDeps2, _) = result2.get()
      check missingDeps2.len == 0 # No missing deps on second process
    check messageReadyCount == 1 # Message should only be processed once

  asyncTest "error handling":
    # Empty message
    let emptyMsg: seq[byte] = @[]
    let emptyResult = await rm.wrapOutgoingMessage(emptyMsg, "empty", testChannel)
    check not emptyResult.isOk()
    # Read `.error` only on an actual error; on an ok result the access itself
    # would fail instead of reporting a failed check.
    if emptyResult.isErr():
      check emptyResult.error == reInvalidArgument

    # Oversized message
    let largeMsg = newSeq[byte](MaxMessageSize + 1)
    let largeResult = await rm.wrapOutgoingMessage(largeMsg, "large", testChannel)
    check not largeResult.isOk()
    if largeResult.isErr():
      check largeResult.error == reMessageTooLarge
suite "cleanup":
  asyncTest "cleanup works correctly":
    # This suite has no shared fixture: the test owns its manager so it can
    # inspect it after `cleanup` has run.
    let rmResult = newReliabilityManager(participantId = "alice")
    check rmResult.isOk()
    let rm = rmResult.get()
    check (await rm.ensureChannel(testChannel)).isOk()

    # Add some messages
    let msg = @[byte(1), 2, 3]
    let msgId = "test-msg-1"
    let wrapped = await rm.wrapOutgoingMessage(msg, msgId, testChannel)
    check wrapped.isOk()

    # Confirm there is state to clear; otherwise the post-cleanup assertions
    # would pass vacuously if the wrap above had failed.
    let bufferBefore = await rm.getOutgoingBuffer(testChannel)
    let historyBefore = await rm.getMessageHistory(testChannel)
    check:
      bufferBefore.len == 1
      historyBefore.len == 1

    await rm.cleanup()

    let outBuffer = await rm.getOutgoingBuffer(testChannel)
    let history = await rm.getMessageHistory(testChannel)
    check:
      outBuffer.len == 0
      history.len == 0
suite "Multi-Channel ReliabilityManager Tests":
var rm: ReliabilityManager
asyncSetup:
let rmResult = newReliabilityManager(participantId = "alice")
check rmResult.isOk()
rm = rmResult.get()
asyncTeardown:
if not rm.isNil:
await rm.cleanup()
asyncTest "can create multi-channel manager without channel ID":
check rm.channels.len == 0
asyncTest "channel management":
let channel1 = "channel1"
let channel2 = "channel2"
# Ensure channels
check (await rm.ensureChannel(channel1)).isOk()
check (await rm.ensureChannel(channel2)).isOk()
check rm.channels.len == 2
# Remove channel
check (await rm.removeChannel(channel1)).isOk()
check rm.channels.len == 1
check channel1 notin rm.channels
check channel2 in rm.channels
asyncTest "stateless message unwrapping with channel extraction":
let channel1 = "test-channel-1"
let channel2 = "test-channel-2"
# Create and wrap messages for different channels
let msg1 = @[byte(1), 2, 3]
let msgId1 = "msg1"
let wrapped1 = await rm.wrapOutgoingMessage(msg1, msgId1, channel1)
check wrapped1.isOk()
let msg2 = @[byte(4), 5, 6]
let msgId2 = "msg2"
let wrapped2 = await rm.wrapOutgoingMessage(msg2, msgId2, channel2)
check wrapped2.isOk()
# Unwrap messages - should extract channel ID and route correctly
let unwrap1 = await rm.unwrapReceivedMessage(wrapped1.get())
check unwrap1.isOk()
let (content1, deps1, extractedChannel1) = unwrap1.get()
check:
content1 == msg1
deps1.len == 0
extractedChannel1 == channel1
let unwrap2 = await rm.unwrapReceivedMessage(wrapped2.get())
check unwrap2.isOk()
let (content2, deps2, extractedChannel2) = unwrap2.get()
check:
content2 == msg2
deps2.len == 0
extractedChannel2 == channel2
asyncTest "channel isolation":
let channel1 = "isolated-channel-1"
let channel2 = "isolated-channel-2"
# Add messages to different channels
let msg1 = @[byte(1)]
let msgId1 = "isolated-msg1"
discard await rm.wrapOutgoingMessage(msg1, msgId1, channel1)
let msg2 = @[byte(2)]
let msgId2 = "isolated-msg2"
discard await rm.wrapOutgoingMessage(msg2, msgId2, channel2)
# Check channel-specific data is isolated
let history1 = await rm.getMessageHistory(channel1)
let history2 = await rm.getMessageHistory(channel2)
check:
history1.len == 1
history2.len == 1
msgId1 in history1
msgId2 in history2
msgId1 notin history2
msgId2 notin history1
asyncTest "channel isolation (non-empty bloom)":
# With both channels carrying populated blooms, ids on one channel must
# not appear in the other's filter. An empty-bloom test cannot observe
# this — there is nothing to bleed across.
let channel1 = "iso-bloom-1"
let channel2 = "iso-bloom-2"
check (await rm.ensureChannel(channel1)).isOk()
check (await rm.ensureChannel(channel2)).isOk()
rm.seedBloom(channel1, 25, prefix = "ch1-")
rm.seedBloom(channel2, 25, prefix = "ch2-")
let wrap1 = await rm.wrapOutgoingMessage(@[byte(1)], "iso-msg-1", channel1)
let wrap2 = await rm.wrapOutgoingMessage(@[byte(2)], "iso-msg-2", channel2)
check wrap1.isOk() and wrap2.isOk()
let bf1 = rm.channels[channel1].bloomFilter
let bf2 = rm.channels[channel2].bloomFilter
check:
bf1.contains("ch1-0")
bf1.contains("iso-msg-1")
not bf1.contains("ch2-0")
not bf1.contains("iso-msg-2")
bf2.contains("ch2-0")
bf2.contains("iso-msg-2")
not bf2.contains("ch1-0")
not bf2.contains("iso-msg-1")
asyncTest "multi-channel callbacks":
  # Tallies of how often each callback kind fires, summed over both channels.
  var readyCount = 0
  var sentCount = 0
  var missingCount = 0
  await rm.setCallbacks(
    proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
      inc(readyCount),
    proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
      inc(sentCount),
    proc(
        messageId: SdsMessageID, deps: seq[HistoryEntry], channelId: SdsChannelID
    ) {.gcsafe.} =
      inc(missingCount),
  )

  let firstChannel = "callback-channel-1"
  let secondChannel = "callback-channel-2"

  # Publish one message per channel; the acks below refer back to these ids.
  let firstId = "callback-msg1"
  let firstWrap = await rm.wrapOutgoingMessage(@[byte(1)], firstId, firstChannel)
  check firstWrap.isOk()
  let secondId = "callback-msg2"
  let secondWrap =
    await rm.wrapOutgoingMessage(@[byte(2)], secondId, secondChannel)
  check secondWrap.isOk()

  # Inbound messages whose causal history names our own message act as
  # acknowledgments and are expected to trigger the "sent" callback.
  let firstAck = SdsMessage.init(
    messageId = "ack1",
    lamportTimestamp = rm.channels[firstChannel].lamportTimestamp + 1,
    causalHistory = toCausalHistory(@[firstId]),
    channelId = firstChannel,
    content = @[byte(100)],
    bloomFilter = @[],
  )
  let secondAck = SdsMessage.init(
    messageId = "ack2",
    lamportTimestamp = rm.channels[secondChannel].lamportTimestamp + 1,
    causalHistory = toCausalHistory(@[secondId]),
    channelId = secondChannel,
    content = @[byte(101)],
    bloomFilter = @[],
  )
  let firstAckBytes = serializeMessage(firstAck)
  let secondAckBytes = serializeMessage(secondAck)
  check firstAckBytes.isOk()
  check secondAckBytes.isOk()

  # Feed both acks through the receive path; the callbacks fire from there.
  discard await rm.unwrapReceivedMessage(firstAckBytes.get())
  discard await rm.unwrapReceivedMessage(secondAckBytes.get())

  check:
    # Each ack is itself delivered, so "ready" fires once per ack.
    readyCount == 2
    # Each of our two outgoing messages is acknowledged exactly once.
    sentCount == 2
    # Every referenced dependency is known locally.
    missingCount == 0
asyncTest "channel-specific dependency management":
  let touched = "dep-channel-1"
  let untouched = "dep-channel-2"

  # Both channels have to exist before either can be inspected.
  check (await rm.ensureChannel(touched)).isOk()
  check (await rm.ensureChannel(untouched)).isOk()

  # Resolve the dependencies on the first channel only.
  let resolved = @["dep1", "dep2", "dep3"]
  check (await rm.markDependenciesMet(resolved, touched)).isOk()

  # The ids must show up in the first channel's bloom filter and must not
  # leak into the second channel's filter.
  check:
    rm.channels[touched].bloomFilter.contains("dep1")
    not rm.channels[untouched].bloomFilter.contains("dep1")
# SDS-R Repair tests
suite "SDS-R: Computation Functions":
  # Checks for the SDS-R timing and response-group helpers. None of these
  # tests needs a ReliabilityManager; they call the helpers directly.

  test "computeTReq returns duration in [tMin, tMax)":
    let tMin = initDuration(seconds = 30)
    let tMax = initDuration(seconds = 300)
    let d = computeTReq("participant1", "msg1", tMin, tMax)
    check:
      d.inMilliseconds >= tMin.inMilliseconds
      d.inMilliseconds < tMax.inMilliseconds

  test "computeTReq is deterministic for same inputs":
    let tMin = initDuration(seconds = 30)
    let tMax = initDuration(seconds = 300)
    let d1 = computeTReq("p1", "m1", tMin, tMax)
    let d2 = computeTReq("p1", "m1", tMin, tMax)
    check d1 == d2

  test "computeTReq varies with different participants":
    let tMin = initDuration(seconds = 30)
    let tMax = initDuration(seconds = 300)
    let d1 = computeTReq("participant-A", "msg1", tMin, tMax)
    let d2 = computeTReq("participant-B", "msg1", tMin, tMax)
    # Distinct participants usually get distinct backoffs, but that is not
    # guaranteed, so only the full [tMin, tMax) range is asserted for each.
    check:
      d1.inMilliseconds >= tMin.inMilliseconds
      d1.inMilliseconds < tMax.inMilliseconds
      d2.inMilliseconds >= tMin.inMilliseconds
      d2.inMilliseconds < tMax.inMilliseconds

  test "computeTResp original sender has zero distance":
    let d = computeTResp("sender1", "sender1", "msg1", initDuration(seconds = 300))
    check d.inMilliseconds == 0

  test "computeTResp non-sender has positive backoff":
    let tMax = initDuration(seconds = 300)
    let d = computeTResp("other-node", "sender1", "msg1", tMax)
    # A zero backoff cannot be excluded for an arbitrary non-sender, so the
    # assertion is the [0, tMax) range rather than strict positivity.
    check:
      d.inMilliseconds >= 0
      d.inMilliseconds < tMax.inMilliseconds

  test "isInResponseGroup all in same group when numGroups =1":
    check isInResponseGroup("p1", "sender1", "msg1", 1) == true
    check isInResponseGroup("p2", "sender1", "msg1", 1) == true

  test "isInResponseGroup sender always in own group":
    # The original sender must belong to its own response group for every
    # group count.
    for groups in 1 .. 10:
      check isInResponseGroup("sender1", "sender1", "msg1", groups) == true
suite "SDS-R: Repair Buffer Management":
  # Exercises how the outgoing and incoming repair buffers of one manager are
  # filled and cleared. Each test runs against a fresh manager on testChannel.
  var rm: ReliabilityManager

  proc installNoopCallbacks(mgr: ReliabilityManager): Future[void] {.async.} =
    # Registers do-nothing ready / sent / missing-deps callbacks, for tests
    # that inspect the buffers directly and ignore callback delivery.
    await mgr.setCallbacks(
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        discard,
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        discard,
      proc(
          messageId: SdsMessageID,
          missingDeps: seq[HistoryEntry],
          channelId: SdsChannelID,
      ) {.gcsafe.} =
        discard,
    )

  asyncSetup:
    let rmResult = newReliabilityManager(participantId = "test-participant")
    check rmResult.isOk()
    rm = rmResult.get()
    check (await rm.ensureChannel(testChannel)).isOk()

  asyncTeardown:
    if not rm.isNil:
      await rm.cleanup()

  asyncTest "missing deps added to outgoing repair buffer":
    var missingDepsCount = 0
    await rm.setCallbacks(
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        discard,
      proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
        discard,
      proc(
          messageId: SdsMessageID,
          missingDeps: seq[HistoryEntry],
          channelId: SdsChannelID,
      ) {.gcsafe.} =
        missingDepsCount += 1,
    )
    # msg2 depends on msg1, which this manager has never seen.
    let msg = SdsMessage.init(
      messageId = "msg2",
      lamportTimestamp = 2,
      causalHistory = @[HistoryEntry(messageId: "msg1", senderId: "sender-A")],
      channelId = testChannel,
      content = @[byte(2)],
      bloomFilter = @[],
    )
    let serialized = serializeMessage(msg).get()
    # Deliberately not named `result`, to avoid shadowing that identifier
    # inside the async test body.
    let unwrapRes = await rm.unwrapReceivedMessage(serialized)
    check unwrapRes.isOk()
    # The missing dependency must be reported once and queued for repair.
    let channel = rm.channels[testChannel]
    check:
      missingDepsCount == 1
      "msg1" in channel.outgoingRepairBuffer

  asyncTest "receiving message clears it from repair buffers":
    await rm.installNoopCallbacks()
    # Create the missing-dependency situation first.
    let msg2 = SdsMessage.init(
      messageId = "msg2",
      lamportTimestamp = 2,
      causalHistory = @[HistoryEntry(messageId: "msg1", senderId: "sender-A")],
      channelId = testChannel,
      content = @[byte(2)],
      bloomFilter = @[],
    )
    discard await rm.unwrapReceivedMessage(serializeMessage(msg2).get())
    check "msg1" in rm.channels[testChannel].outgoingRepairBuffer
    # Once msg1 itself arrives, its repair entry must disappear.
    let msg1 = SdsMessage.init(
      messageId = "msg1",
      lamportTimestamp = 1,
      causalHistory = @[],
      channelId = testChannel,
      content = @[byte(1)],
      bloomFilter = @[],
    )
    discard await rm.unwrapReceivedMessage(serializeMessage(msg1).get())
    check "msg1" notin rm.channels[testChannel].outgoingRepairBuffer

  asyncTest "markDependenciesMet clears repair buffers":
    await rm.installNoopCallbacks()
    let msg2 = SdsMessage.init(
      messageId = "msg2",
      lamportTimestamp = 2,
      causalHistory = @[HistoryEntry(messageId: "msg1", senderId: "sender-A")],
      channelId = testChannel,
      content = @[byte(2)],
      bloomFilter = @[],
    )
    discard await rm.unwrapReceivedMessage(serializeMessage(msg2).get())
    check "msg1" in rm.channels[testChannel].outgoingRepairBuffer
    # Marking the dependency as met (e.g. after a store retrieval) must also
    # drop the pending repair entry.
    check (await rm.markDependenciesMet(@["msg1"], testChannel)).isOk()
    check "msg1" notin rm.channels[testChannel].outgoingRepairBuffer

  asyncTest "expired repair requests attached to outgoing messages":
    await rm.installNoopCallbacks()
    # Seed an entry whose request time is already ten seconds in the past.
    let channel = rm.channels[testChannel]
    channel.outgoingRepairBuffer["missing-msg"] = OutgoingRepairEntry(
      outHistEntry: HistoryEntry(messageId: "missing-msg", senderId: "orig-sender"),
      minTimeRepairReq: getTime() - initDuration(seconds = 10),
    )
    # The next outgoing message should carry that repair request.
    let wrapped = await rm.wrapOutgoingMessage(@[byte(1)], "new-msg", testChannel)
    check wrapped.isOk()
    let unwrapped = deserializeMessage(wrapped.get()).get()
    check:
      unwrapped.repairRequest.len == 1
      unwrapped.repairRequest[0].messageId == "missing-msg"
      # Attaching the request consumes the buffer entry.
      "missing-msg" notin channel.outgoingRepairBuffer

  asyncTest "expired repair requests attach the most-overdue first when capped":
    # Per spec (sds-r-send-message, RECOMMENDED): when more entries are
    # eligible than maxRepairRequests, attach the ones with the smallest
    # minTimeRepairReq, i.e. the most overdue.
    await rm.installNoopCallbacks()
    let channel = rm.channels[testChannel]
    let now = getTime()
    # Five expired entries, listed most-overdue first (50s, 40s, ... 10s ago).
    # The cap is the default 3, so two entries should be left behind.
    let expected = ["oldest", "second", "third", "fourth", "newest"]
    for i, id in expected:
      channel.outgoingRepairBuffer[id] = OutgoingRepairEntry(
        outHistEntry: HistoryEntry(messageId: id, senderId: "sender"),
        minTimeRepairReq: now - initDuration(seconds = 50 - i * 10),
      )
    let wrapped = await rm.wrapOutgoingMessage(@[byte(1)], "outbound", testChannel)
    check wrapped.isOk()
    let attached = deserializeMessage(wrapped.get()).get().repairRequest
    check:
      attached.len == rm.config.maxRepairRequests
      attached[0].messageId == "oldest"
      attached[1].messageId == "second"
      attached[2].messageId == "third"
      # The two least-overdue entries stay queued for a later message.
      "fourth" in channel.outgoingRepairBuffer
      "newest" in channel.outgoingRepairBuffer
      "oldest" notin channel.outgoingRepairBuffer
      "second" notin channel.outgoingRepairBuffer
      "third" notin channel.outgoingRepairBuffer

  asyncTest "incoming repair request adds to incoming repair buffer when eligible":
    await rm.installNoopCallbacks()
    let channel = rm.channels[testChannel]
    # Seed the history so this manager holds the message that is requested.
    let cachedMsg = SdsMessage.init(
      messageId = "cached-msg",
      lamportTimestamp = 1,
      causalHistory = @[],
      channelId = testChannel,
      content = @[byte(99)],
      bloomFilter = @[],
    )
    channel.messageHistory["cached-msg"] = cachedMsg
    # The repair request names "test-participant" as the original sender,
    # which is our own participantId, so we are in the response group.
    let msgWithRepair = SdsMessage.init(
      messageId = "requester-msg",
      lamportTimestamp = 5,
      causalHistory = @[],
      channelId = testChannel,
      content = @[byte(3)],
      bloomFilter = @[],
      repairRequest = @[
        HistoryEntry(messageId: "cached-msg", senderId: "test-participant")
      ],
    )
    discard await rm.unwrapReceivedMessage(serializeMessage(msgWithRepair).get())
    # We hold the message and are eligible, so a response must be queued.
    check "cached-msg" in channel.incomingRepairBuffer
suite "SDS-R: Protobuf Roundtrip":
  # Each test serializes an SdsMessage, deserializes the bytes again, and
  # compares the SDS-R specific fields of the copy with the input.

  test "senderId in HistoryEntry roundtrips through protobuf":
    let withHint = HistoryEntry(
      messageId: "dep1", retrievalHint: @[byte(1), 2], senderId: "sender-A"
    )
    let withoutHint = HistoryEntry(messageId: "dep2", senderId: "sender-B")
    let original = SdsMessage.init(
      messageId = "msg1",
      lamportTimestamp = 100,
      causalHistory = @[withHint, withoutHint],
      channelId = "ch1",
      content = @[byte(42)],
      bloomFilter = @[],
    )
    let wire = serializeMessage(original).get()
    let copy = deserializeMessage(wire).get()
    check copy.causalHistory.len == 2
    check:
      copy.causalHistory[0].messageId == "dep1"
      copy.causalHistory[0].senderId == "sender-A"
      copy.causalHistory[0].retrievalHint == @[byte(1), 2]
    check:
      copy.causalHistory[1].messageId == "dep2"
      copy.causalHistory[1].senderId == "sender-B"

  test "repairRequest field roundtrips through protobuf":
    let original = SdsMessage.init(
      messageId = "msg1",
      lamportTimestamp = 100,
      causalHistory = @[],
      channelId = "ch1",
      content = @[byte(42)],
      bloomFilter = @[],
      repairRequest = @[
        HistoryEntry(messageId: "missing1", senderId: "sender-X"),
        HistoryEntry(
          messageId: "missing2", senderId: "sender-Y", retrievalHint: @[byte(5)]
        ),
      ],
    )
    let wire = serializeMessage(original).get()
    let copy = deserializeMessage(wire).get()
    check copy.repairRequest.len == 2
    check:
      copy.repairRequest[0].messageId == "missing1"
      copy.repairRequest[0].senderId == "sender-X"
    check:
      copy.repairRequest[1].messageId == "missing2"
      copy.repairRequest[1].senderId == "sender-Y"
      copy.repairRequest[1].retrievalHint == @[byte(5)]

  test "backward compat: message without repairRequest decodes fine":
    # No repairRequest and no senderId are set on the input; both must come
    # back as empty values.
    let original = SdsMessage.init(
      messageId = "msg1",
      lamportTimestamp = 100,
      causalHistory = @[HistoryEntry(messageId: "dep1")],
      channelId = "ch1",
      content = @[byte(42)],
      bloomFilter = @[],
    )
    let wire = serializeMessage(original).get()
    let copy = deserializeMessage(wire).get()
    check copy.repairRequest.len == 0
    check copy.causalHistory[0].senderId == ""

  test "SdsMessage.senderId roundtrips through protobuf":
    let original = SdsMessage.init(
      messageId = "m1",
      lamportTimestamp = 1,
      causalHistory = @[],
      channelId = "ch1",
      content = @[byte(1)],
      bloomFilter = @[],
      senderId = "alice",
    )
    let wire = serializeMessage(original).get()
    let copy = deserializeMessage(wire).get()
    check copy.senderId == "alice"
# ---------------------------------------------------------------------------
# SDS-R Phase 2 tests: edge cases, lifecycle, sweep, and multi-participant flows
# ---------------------------------------------------------------------------
suite "SDS-R: Edge Cases and Defensive Branches":
  test "computeTReq returns tMin when range is degenerate":
    let lower = initDuration(seconds = 30)
    # Empty range: tMax equals tMin.
    check computeTReq("p", "m", lower, lower) == lower
    # Inverted range: tMax is below tMin.
    check computeTReq("p", "m", lower, initDuration(seconds = 10)) == lower

  test "computeTResp returns 0 when tMax is 0":
    let noWindow = initDuration(milliseconds = 0)
    check computeTResp("p", "other", "m", noWindow).inMilliseconds == 0

  test "computeTResp always stays within [0, tMax)":
    # Sweep over many id combinations: the result must never be negative and
    # never reach tMax.
    let upper = initDuration(seconds = 300)
    for i in 0 .. 499:
      let participant = "participant-" & $i
      let sender = "sender-" & $(i * 13)
      let message = "msg-" & $(i * 31)
      let backoff = computeTResp(participant, sender, message, upper)
      check:
        backoff.inMilliseconds >= 0
        backoff.inMilliseconds < upper.inMilliseconds

  test "isInResponseGroup returns true for non-positive numGroups":
    check:
      isInResponseGroup("p", "sender", "m", 0) == true
      isInResponseGroup("p", "sender", "m", -1) == true

  test "computeTReq bounds across many random inputs":
    let lower = initDuration(seconds = 30)
    let upper = initDuration(seconds = 300)
    for i in 0 .. 199:
      let backoff = computeTReq("p-" & $i, "m-" & $i, lower, upper)
      check:
        backoff.inMilliseconds >= lower.inMilliseconds
        backoff.inMilliseconds < upper.inMilliseconds

  test "response group distribution is roughly uniform":
    # With 10 groups, about a tenth of the participants should land in the
    # sender's group.
    const numGroups = 10
    const totalParticipants = 1000
    let senderId = "alice"
    let msgId = "msg-xyz"
    var members = 0
    for i in 0 ..< totalParticipants:
      let participant = "participant-" & $i
      if isInResponseGroup(participant, senderId, msgId, numGroups):
        inc(members)
    # Expected around 100; the [50, 200] band tolerates hash unevenness.
    check members >= 50
    check members <= 200

  test "computeTResp monotonicity: self always fastest":
    # The original sender has distance 0 and so must never respond later
    # than any other participant.
    let upper = initDuration(seconds = 300)
    let ownBackoff = computeTResp("alice", "alice", "msg-xyz", upper)
    check ownBackoff.inMilliseconds == 0
    for i in 0 .. 49:
      let peerBackoff = computeTResp("other-" & $i, "alice", "msg-xyz", upper)
      check peerBackoff.inMilliseconds >= ownBackoff.inMilliseconds
suite "SDS-R: Lifecycle and State":
  # Every test builds its own manager and cleans it up at the end, because
  # the participant id under test differs from case to case.

  proc useNoopCallbacks(mgr: ReliabilityManager): Future[void] {.async.} =
    # Installs ready / sent / missing-deps callbacks that do nothing.
    await mgr.setCallbacks(
      proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
        discard,
      proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
        discard,
      proc(msgId: SdsMessageID, deps: seq[HistoryEntry], ch: SdsChannelID) {.gcsafe.} =
        discard,
    )

  asyncTest "empty participantId disables outgoing repair creation":
    # An empty id is passed on purpose to reach the SDS-R no-op branch; the
    # required-argument signature keeps callers from landing here by accident.
    let rm = newReliabilityManager(participantId = "".SdsParticipantID).get()
    check (await rm.ensureChannel(testChannel)).isOk()
    await rm.useNoopCallbacks()
    # A message whose single dependency is unknown to this manager.
    let incoming = SdsMessage.init(
      messageId = "m2",
      lamportTimestamp = 2,
      causalHistory = @[HistoryEntry(messageId: "m1-missing", senderId: "alice")],
      channelId = testChannel,
      content = @[byte(2)],
      bloomFilter = @[],
    )
    let wire = serializeMessage(incoming).get()
    discard await rm.unwrapReceivedMessage(wire)
    check rm.channels[testChannel].outgoingRepairBuffer.len == 0
    await rm.cleanup()

  asyncTest "empty senderId in incoming repair request is ignored":
    let rm = newReliabilityManager(participantId = "bob").get()
    check (await rm.ensureChannel(testChannel)).isOk()
    let channel = rm.channels[testChannel]
    # Bob holds the requested message, so only the empty senderId can be the
    # reason no response gets queued.
    channel.messageHistory["m-wanted"] = SdsMessage.init(
      messageId = "m-wanted",
      lamportTimestamp = 1,
      causalHistory = @[],
      channelId = testChannel,
      content = @[byte(99), 99, 99],
      bloomFilter = @[],
    )
    await rm.useNoopCallbacks()
    let request = SdsMessage.init(
      messageId = "req-msg",
      lamportTimestamp = 5,
      causalHistory = @[],
      channelId = testChannel,
      content = @[byte(1)],
      bloomFilter = @[],
      repairRequest = @[HistoryEntry(messageId: "m-wanted", senderId: "")],
    )
    let wire = serializeMessage(request).get()
    discard await rm.unwrapReceivedMessage(wire)
    check "m-wanted" notin channel.incomingRepairBuffer
    await rm.cleanup()

  asyncTest "wrapOutgoingMessage records the message in history with our senderId":
    # Guards the Bug 1 fix: the original sender can serve her own message.
    # In the consolidated history model the SdsMessage itself carries the
    # senderId and can be re-serialized on demand for repair, so membership
    # plus a senderId read covers both halves of the original assertion.
    let rm = newReliabilityManager(participantId = "alice").get()
    check (await rm.ensureChannel(testChannel)).isOk()
    discard await rm.wrapOutgoingMessage(@[byte(1), 2, 3], "m1", testChannel)
    let history = rm.channels[testChannel].messageHistory
    check "m1" in history
    check:
      history["m1"].senderId == "alice"
      history["m1"].content == @[byte(1), 2, 3]
    await rm.cleanup()

  asyncTest "getRecentHistoryEntries carries senderId for own messages":
    let rm = newReliabilityManager(participantId = "alice").get()
    check (await rm.ensureChannel(testChannel)).isOk()
    discard await rm.wrapOutgoingMessage(@[byte(1)], "m1", testChannel)
    discard await rm.wrapOutgoingMessage(@[byte(2)], "m2", testChannel)
    let recent = await rm.getRecentHistoryEntries(10, testChannel)
    check recent.len == 2
    check:
      recent[0].senderId == "alice"
      recent[1].senderId == "alice"
    await rm.cleanup()

  asyncTest "resetReliabilityManager clears all SDS-R state":
    let rm = newReliabilityManager(participantId = "alice").get()
    check (await rm.ensureChannel(testChannel)).isOk()
    # Put one entry into each of the three per-channel SDS-R containers.
    let before = rm.channels[testChannel]
    before.outgoingRepairBuffer["a"] = OutgoingRepairEntry(
      outHistEntry: HistoryEntry(messageId: "a", senderId: "x"),
      minTimeRepairReq: getTime(),
    )
    before.incomingRepairBuffer["b"] = IncomingRepairEntry(
      inHistEntry: HistoryEntry(messageId: "b", senderId: "y"),
      cachedMessage: @[byte(1)],
      minTimeRepairResp: getTime(),
    )
    before.messageHistory["c"] = SdsMessage.init(
      messageId = "c",
      lamportTimestamp = 1,
      causalHistory = @[],
      channelId = testChannel,
      content = @[byte(2)],
      bloomFilter = @[],
      senderId = "someone",
    )
    check (await rm.resetReliabilityManager()).isOk()
    # Ensure the channel exists again, then look at its current state.
    check (await rm.ensureChannel(testChannel)).isOk()
    let after = rm.channels[testChannel]
    check after.outgoingRepairBuffer.len == 0
    check after.incomingRepairBuffer.len == 0
    check after.messageHistory.len == 0
    await rm.cleanup()

  asyncTest "SDS-R state is isolated per channel":
    let rm = newReliabilityManager(participantId = "alice").get()
    check (await rm.ensureChannel("ch-A")).isOk()
    check (await rm.ensureChannel("ch-B")).isOk()
    await rm.useNoopCallbacks()
    # The message with the missing dependency arrives on ch-A only.
    let incoming = SdsMessage.init(
      messageId = "m2",
      lamportTimestamp = 2,
      causalHistory = @[HistoryEntry(messageId: "m1-missing", senderId: "bob")],
      channelId = "ch-A",
      content = @[byte(2)],
      bloomFilter = @[],
    )
    let wire = serializeMessage(incoming).get()
    discard await rm.unwrapReceivedMessage(wire)
    check rm.channels["ch-A"].outgoingRepairBuffer.len == 1
    check rm.channels["ch-B"].outgoingRepairBuffer.len == 0
    await rm.cleanup()

  asyncTest "duplicate message arrival cancels pending incoming repair entry":
    # Covers the dedup-before-cleanup fix: a rebroadcast reaching a peer that
    # already has the message must clear that peer's incomingRepairBuffer entry.
    let rm = newReliabilityManager(participantId = "carol").get()
    check (await rm.ensureChannel(testChannel)).isOk()
    let channel = rm.channels[testChannel]
    await rm.useNoopCallbacks()
    # Carol already holds m1 and has a response for it scheduled 10s ahead.
    channel.messageHistory["m1"] = SdsMessage.init(
      messageId = "m1",
      lamportTimestamp = 1,
      causalHistory = @[],
      channelId = testChannel,
      content = @[byte(1)],
      bloomFilter = @[],
    )
    channel.incomingRepairBuffer["m1"] = IncomingRepairEntry(
      inHistEntry: HistoryEntry(messageId: "m1", senderId: "alice"),
      cachedMessage: @[byte(1)],
      minTimeRepairResp: getTime() + initDuration(seconds = 10),
    )
    # Somebody else's rebroadcast of m1 arrives.
    let rebroadcast = SdsMessage.init(
      messageId = "m1",
      lamportTimestamp = 1,
      causalHistory = @[],
      channelId = testChannel,
      content = @[byte(1)],
      bloomFilter = @[],
      senderId = "alice",
    )
    let wire = serializeMessage(rebroadcast).get()
    discard await rm.unwrapReceivedMessage(wire)
    check "m1" notin channel.incomingRepairBuffer
    await rm.cleanup()
suite "SDS-R: Repair Sweep":
  # Tests for runRepairSweep against a manager owned by participant "bob".
  var rm: ReliabilityManager

  asyncSetup:
    rm = newReliabilityManager(participantId = "bob").get()
    check (await rm.ensureChannel(testChannel)).isOk()

  asyncTeardown:
    if not rm.isNil:
      await rm.cleanup()

  asyncTest "runRepairSweep fires onRepairReady for expired tResp":
    # Number of onRepairReady invocations and the payload of the first one.
    var firedTimes = 0
    var firstPayload: seq[byte] = @[]
    await rm.setCallbacks(
      proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
        discard,
      proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
        discard,
      proc(msgId: SdsMessageID, deps: seq[HistoryEntry], ch: SdsChannelID) {.gcsafe.} =
        discard,
      onRepairReady = proc(bytes: seq[byte], ch: SdsChannelID) {.gcsafe.} =
        {.cast(gcsafe).}:
          firedTimes += 1
          if firedTimes == 1:
            firstPayload = bytes
      ,
    )
    let buffers = rm.channels[testChannel]
    # One response that is already due (one second ago) ...
    buffers.incomingRepairBuffer["m-ready"] = IncomingRepairEntry(
      inHistEntry: HistoryEntry(messageId: "m-ready", senderId: "alice"),
      cachedMessage: @[byte(1), 2, 3],
      minTimeRepairResp: getTime() - initDuration(seconds = 1),
    )
    # ... and one that only becomes due ten minutes from now.
    buffers.incomingRepairBuffer["m-not-ready"] = IncomingRepairEntry(
      inHistEntry: HistoryEntry(messageId: "m-not-ready", senderId: "alice"),
      cachedMessage: @[byte(9), 9, 9],
      minTimeRepairResp: getTime() + initDuration(minutes = 10),
    )
    await rm.runRepairSweep()
    check firedTimes == 1
    check firstPayload == @[byte(1), 2, 3]
    check:
      "m-ready" notin buffers.incomingRepairBuffer
      "m-not-ready" in buffers.incomingRepairBuffer

  asyncTest "runRepairSweep drops outgoing entries past T_max window":
    await rm.setCallbacks(
      proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
        discard,
      proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
        discard,
      proc(msgId: SdsMessageID, deps: seq[HistoryEntry], ch: SdsChannelID) {.gcsafe.} =
        discard,
    )
    let buffers = rm.channels[testChannel]
    let window = rm.config.repairTMax
    # Request time of now - 2*T_max, which is past the drop window.
    buffers.outgoingRepairBuffer["m-stale"] = OutgoingRepairEntry(
      outHistEntry: HistoryEntry(messageId: "m-stale", senderId: "alice"),
      minTimeRepairReq: getTime() - (window + window),
    )
    # Request time of right now, which must be kept.
    buffers.outgoingRepairBuffer["m-fresh"] = OutgoingRepairEntry(
      outHistEntry: HistoryEntry(messageId: "m-fresh", senderId: "alice"),
      minTimeRepairReq: getTime(),
    )
    await rm.runRepairSweep()
    check "m-stale" notin buffers.outgoingRepairBuffer
    check "m-fresh" in buffers.outgoingRepairBuffer

  asyncTest "runRepairSweep no-op when buffers are empty":
    var firedTimes = 0
    await rm.setCallbacks(
      proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
        discard,
      proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
        discard,
      proc(msgId: SdsMessageID, deps: seq[HistoryEntry], ch: SdsChannelID) {.gcsafe.} =
        discard,
      onRepairReady = proc(bytes: seq[byte], ch: SdsChannelID) {.gcsafe.} =
        firedTimes += 1,
    )
    # Nothing was queued, so the sweep must not call onRepairReady at all.
    await rm.runRepairSweep()
    check firedTimes == 0
# --- Multi-participant in-process bus for integration tests ---------------
type
  TestBus = ref object
    ## In-process stand-in for a network, shared by the integration tests.
    peers: OrderedTable[SdsParticipantID, ReliabilityManager]
      ## Registered managers, keyed by participant id.
    delivered: Table[SdsParticipantID, seq[SdsMessageID]]
      ## Per participant, the message ids recorded as delivered.
    wireLog: seq[tuple[senderId: SdsParticipantID, messageId: SdsMessageID]]
      ## Message ids put on the wire, each tagged with the source peer.
    pending: seq[(SdsParticipantID, seq[byte])]
      ## (sender, bytes) pairs the repair callback would have delivered if it
      ## could await; the test body drains them explicitly via `bus.drain()`.
proc newTestBus(): TestBus =
  ## Creates a bus with no peers, no deliveries, an empty wire log and an
  ## empty pending queue.
  new(result)
  result.peers = initOrderedTable[SdsParticipantID, ReliabilityManager]()
  result.delivered = initTable[SdsParticipantID, seq[SdsMessageID]]()
  result.wireLog = @[]
  result.pending = @[]
proc recordWire(bus: TestBus, senderId: SdsParticipantID, bytes: seq[byte]) {.gcsafe.} =
  ## Appends the message id found in `bytes` to the wire log under
  ## `senderId`. Bytes that fail to deserialize are ignored.
  let parsed = deserializeMessage(bytes)
  if not parsed.isOk():
    return
  bus.wireLog.add((senderId: senderId, messageId: parsed.get().messageId))
proc deliverExcept(
    bus: TestBus,
    senderId: SdsParticipantID,
    bytes: seq[byte],
    exclude: seq[SdsParticipantID],
) {.async: (raises: []).} =
  ## Feeds `bytes` into every registered peer except the sender itself and
  ## the peers listed in `exclude`. Each peer's unwrap result is discarded.
  for pid, peer in bus.peers:
    if pid != senderId and pid notin exclude:
      discard await peer.unwrapReceivedMessage(bytes)
proc drain(bus: TestBus): Future[void] {.async.} =
  ## Delivers everything the repair callback queued in `bus.pending`.
  ## A delivery can queue further entries, so the queue is swapped out and
  ## processed batch by batch until a batch leaves it empty.
  while bus.pending.len > 0:
    var batch: seq[(SdsParticipantID, seq[byte])]
    # Leaves `bus.pending` empty and hands its former content to `batch`.
    swap(batch, bus.pending)
    for entry in batch:
      let (sender, payload) = entry
      await bus.deliverExcept(sender, payload, @[])
proc addPeer(
    bus: TestBus,
    participantId: SdsParticipantID,
    config: ReliabilityConfig = defaultConfig(),
): Future[ReliabilityManager] {.async.} =
  ## Creates a manager for `participantId`, makes sure `testChannel` exists on
  ## it, registers it on the bus and wires its callbacks to the bus state.
  ## Returns the new manager.
  let manager = newReliabilityManager(participantId, config).get()
  doAssert (await manager.ensureChannel(testChannel)).isOk()
  bus.peers[participantId] = manager
  bus.delivered[participantId] = @[]
  # Plain local copies for the callbacks below to capture.
  let owner = participantId
  let sharedBus = bus
  await manager.setCallbacks(
    proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
      {.cast(gcsafe).}:
        sharedBus.delivered[owner].add(msgId),
    proc(msgId: SdsMessageID, ch: SdsChannelID) {.gcsafe.} =
      discard,
    proc(msgId: SdsMessageID, deps: seq[HistoryEntry], ch: SdsChannelID) {.gcsafe.} =
      discard,
    onRepairReady = proc(bytes: seq[byte], ch: SdsChannelID) {.gcsafe.} =
      # This callback is synchronous and cannot `await`, so the rebroadcast
      # is only logged and queued here; the test delivers it later through
      # `bus.drain()`.
      {.cast(gcsafe).}:
        sharedBus.recordWire(owner, bytes)
        sharedBus.pending.add((owner, bytes)),
  )
  return manager
proc broadcast(
    bus: TestBus,
    senderId: SdsParticipantID,
    content: seq[byte],
    messageId: SdsMessageID,
    dropAt: seq[SdsParticipantID] = @[],
): Future[void] {.async.} =
  ## Wraps `content` as `messageId` on the sender's manager, logs it on the
  ## wire, and delivers it to every other peer not listed in `dropAt`.
  ## Asserts that wrapping succeeded.
  let sender = bus.peers[senderId]
  let outcome = await sender.wrapOutgoingMessage(content, messageId, testChannel)
  doAssert outcome.isOk()
  let wireBytes = outcome.get()
  bus.recordWire(senderId, wireBytes)
  await bus.deliverExcept(senderId, wireBytes, dropAt)
proc forceOutgoingExpired(rm: ReliabilityManager, messageId: SdsMessageID) =
  ## Moves the `minTimeRepairReq` of an existing outgoingRepairBuffer entry
  ## one second into the past, so the next wrapOutgoingMessage picks it up.
  ## Does nothing when `messageId` has no entry.
  let buffers = rm.channels[testChannel]
  if messageId notin buffers.outgoingRepairBuffer:
    return
  let alreadyDue = getTime() - initDuration(seconds = 1)
  buffers.outgoingRepairBuffer[messageId].minTimeRepairReq = alreadyDue
proc forceIncomingExpired(rm: ReliabilityManager, messageId: SdsMessageID) =
  ## Moves the `minTimeRepairResp` of an existing incomingRepairBuffer entry
  ## one second into the past, so runRepairSweep fires it.
  ## Does nothing when `messageId` has no entry.
  let buffers = rm.channels[testChannel]
  if messageId notin buffers.incomingRepairBuffer:
    return
  let alreadyDue = getTime() - initDuration(seconds = 1)
  buffers.incomingRepairBuffer[messageId].minTimeRepairResp = alreadyDue
suite "SDS-R: Multi-Participant Integration":
asyncTest "basic single-gap repair (Alice -> Bob misses -> Carol's message triggers repair)":
  let bus = newTestBus()
  let alice = await bus.addPeer("alice")
  let bob = await bus.addPeer("bob")
  let carol = await bus.addPeer("carol")

  # Step 1: Alice publishes M1 while Bob is unreachable.
  await bus.broadcast("alice", @[byte(1)], "m1", dropAt = @["bob".SdsParticipantID])
  check:
    # Carol received M1, Bob did not.
    "m1" in carol.channels[testChannel].messageHistory
    "m1" notin bob.channels[testChannel].messageHistory

  # Step 2: Carol publishes M2, whose causal history references M1.
  await bus.broadcast("carol", @[byte(2)], "m2")
  # Bob notices the gap and queues a repair for M1 ...
  check "m1" in bob.channels[testChannel].outgoingRepairBuffer
  # ... while M2 is held back instead of being delivered.
  check:
    "m2" in bob.channels[testChannel].incomingBuffer
    "m2" notin bus.delivered["bob"]

  # Step 3: make Bob's T_req due so his next message carries the request,
  # then let him send M3 with the repair request for M1 (sender alice).
  bob.forceOutgoingExpired("m1")
  await bus.broadcast("bob", @[byte(3)], "m3")

  # Step 4: Alice saw the request in M3 and queued a response for M1.
  check "m1" in alice.channels[testChannel].incomingRepairBuffer

  # Step 5: force Alice's T_resp due (a safety measure; it should already be
  # zero for the original sender), run her sweep so she rebroadcasts M1, and
  # deliver what the sweep queued.
  alice.forceIncomingExpired("m1")
  await alice.runRepairSweep()
  await bus.drain()

  # Bob has now had both M1 and M2 delivered.
  check "m1" in bus.delivered["bob"]
  check "m2" in bus.delivered["bob"]
asyncTest "response cancellation: only one rebroadcast on the wire":
  let bus = newTestBus()
  let alice = await bus.addPeer("alice")
  let bob = await bus.addPeer("bob")
  let carol = await bus.addPeer("carol")
  # Alice sends M1 while Bob is offline.
  await bus.broadcast("alice", @[byte(1)], "m1", dropAt = @["bob".SdsParticipantID])
  # Carol sends M2; Bob sees that M1 is missing.
  await bus.broadcast("carol", @[byte(2)], "m2")
  check "m1" in bob.channels[testChannel].outgoingRepairBuffer
  # Bob requests repair with his next message.
  bob.forceOutgoingExpired("m1")
  await bus.broadcast("bob", @[byte(3)], "m3")
  # Both Alice and Carol now hold an incomingRepairBuffer entry for M1.
  check:
    "m1" in alice.channels[testChannel].incomingRepairBuffer
    "m1" in carol.channels[testChannel].incomingRepairBuffer
  # Alice responds first. When her rebroadcast reaches Carol, it should
  # cancel Carol's pending response.
  alice.forceIncomingExpired("m1")
  await alice.runRepairSweep()
  await bus.drain()
  # Carol's pending response must have been cleared by the dedup-path cleanup.
  check "m1" notin carol.channels[testChannel].incomingRepairBuffer
  # Running Carol's sweep now must not put anything new on the wire.
  let wireCountBefore = bus.wireLog.len
  await carol.runRepairSweep()
  await bus.drain()
  check bus.wireLog.len == wireCountBefore
  # Count every wire entry for M1. Both the original broadcast and the
  # rebroadcast are logged under "alice", so they cannot be told apart by
  # senderId; the total is what proves there was exactly one rebroadcast.
  var m1WireCount = 0
  for entry in bus.wireLog:
    if entry.messageId == "m1":
      inc(m1WireCount)
  # Expected: (1) Alice's original broadcast and (2) Alice's rebroadcast.
  check m1WireCount == 2
asyncTest "cancellation on incoming repair request: peer drops its own pending request":
  let bus = newTestBus()
  let alice = await bus.addPeer("alice")
  let bob = await bus.addPeer("bob")
  let carol = await bus.addPeer("carol")

  # M1 reaches neither Bob nor Carol.
  let everyoneElse = @["bob".SdsParticipantID, "carol".SdsParticipantID]
  await bus.broadcast("alice", @[byte(1)], "m1", dropAt = everyoneElse)

  # M2 references M1, so both receivers queue a repair for it.
  await bus.broadcast("alice", @[byte(2)], "m2")
  check "m1" in bob.channels[testChannel].outgoingRepairBuffer
  check "m1" in carol.channels[testChannel].outgoingRepairBuffer

  # Bob's T_req becomes due first, so his next message asks for M1.
  bob.forceOutgoingExpired("m1")
  await bus.broadcast("bob", @[byte(3)], "m3")

  # Having seen Bob's request, Carol must have cancelled her own pending
  # request for the same message.
  check "m1" notin carol.channels[testChannel].outgoingRepairBuffer
asyncTest "response group filtering: only group members respond":
  # With 10 response groups, roughly 1/10 of receivers are in the group.
  # The test needs a message id for which bob is in alice's group and carol
  # is not.
  var cfg = defaultConfig()
  cfg.numResponseGroups = 10
  # Probe candidate ids using the group count from the config, so the probe
  # cannot drift from what the peers are configured with. Probing works
  # because isInResponseGroup is deterministic.
  var chosenMsg = ""
  for i in 0 ..< 1000:
    let candidate = "probe-" & $i
    let bobIn = isInResponseGroup("bob", "alice", candidate, cfg.numResponseGroups)
    let carolIn =
      isInResponseGroup("carol", "alice", candidate, cfg.numResponseGroups)
    if bobIn and not carolIn:
      chosenMsg = candidate
      break
  check chosenMsg.len > 0
  let bus = newTestBus()
  discard await bus.addPeer("alice", cfg)
  let bob = await bus.addPeer("bob", cfg)
  let carol = await bus.addPeer("carol", cfg)
  # Both Bob and Carol receive the original message, so both have it in
  # messageHistory.
  await bus.broadcast("alice", @[byte(1)], chosenMsg)
  # Dave joins with no messages. His repair request is hand-crafted so that
  # it names alice as senderId for chosenMsg, and it is injected directly
  # into bob and carol through unwrapReceivedMessage.
  discard await bus.addPeer("dave", cfg)
  let reqMsg = SdsMessage.init(
    messageId = "req-from-dave",
    lamportTimestamp = 10,
    causalHistory = @[],
    channelId = testChannel,
    content = @[byte(9)],
    bloomFilter = @[],
    senderId = "dave",
    repairRequest = @[HistoryEntry(messageId: chosenMsg, senderId: "alice")],
  )
  let bytes = serializeMessage(reqMsg).get()
  discard await bob.unwrapReceivedMessage(bytes)
  discard await carol.unwrapReceivedMessage(bytes)
  # Only the group member (bob) may queue a response.
  check:
    chosenMsg in bob.channels[testChannel].incomingRepairBuffer
    chosenMsg notin carol.channels[testChannel].incomingRepairBuffer
asyncTest "multi-gap batch repair: many missing deps split across requests":
  # Scenario: Bob misses several messages in a row; once all of his repair
  # entries are forced expired, a single outgoing message may carry at most
  # config.maxRepairRequests of them, and the attached ones leave the buffer.
  let bus = newTestBus()
  discard await bus.addPeer("alice")
  let bob = await bus.addPeer("bob")
  # Alice sends these messages while Bob is offline (each is dropped at Bob).
  const missingIds = ["m1", "m2", "m3", "m4", "m5"]
  let drops = @["bob".SdsParticipantID]
  for i, id in missingIds:
    # Payload bytes are 1..5, one per missed message.
    await bus.broadcast("alice", @[byte(i + 1)], id, dropAt = drops)
  # Bob comes online and receives M6 which depends on m1..m5.
  await bus.broadcast("alice", @[byte(6)], "m6")
  # Bob should have one outgoing repair entry per missed message.
  let channel = bob.channels[testChannel]
  check channel.outgoingRepairBuffer.len == missingIds.len
  # Force all to expired and wrap one message — only maxRepairRequests
  # (default 3) should attach to a single outgoing message.
  for id in missingIds:
    bob.forceOutgoingExpired(id)
  let wrapped =
    (await bob.wrapOutgoingMessage(@[byte(99)], "bob-msg-1", testChannel)).get()
  let decoded = deserializeMessage(wrapped).get()
  let attached = decoded.repairRequest.len
  check:
    # Guard against a vacuous pass: with every entry expired, at least one
    # repair request has to be attached for the batching bound to mean anything.
    attached > 0
    attached <= bob.config.maxRepairRequests
  # The attached entries should be removed from the outgoing buffer.
  check channel.outgoingRepairBuffer.len == missingIds.len - attached
asyncTest "markDependenciesMet externally clears pending repair entry":
  # Scenario: Bob misses M1, learns about it through M2, and then has the
  # dependency marked as met from outside the repair protocol.
  let bus = newTestBus()
  discard await bus.addPeer("alice")
  let bob = await bus.addPeer("bob")
  # M1 is dropped at Bob; M2 is delivered and leaves M1 pending repair.
  let bobOnly = @["bob".SdsParticipantID]
  await bus.broadcast("alice", @[byte(1)], "m1", dropAt = bobOnly)
  await bus.broadcast("alice", @[byte(2)], "m2")
  check "m1" in bob.channels[testChannel].outgoingRepairBuffer
  # Simulate Bob fetching M1 via an out-of-band store query.
  let marked = await bob.markDependenciesMet(@["m1"], testChannel)
  check marked.isOk()
  # The pending repair entry must be gone once the dependency is marked met.
  check "m1" notin bob.channels[testChannel].outgoingRepairBuffer