From 3cf816da76684be5996e2f6a988266d16d505d2e Mon Sep 17 00:00:00 2001 From: darshankabariya Date: Thu, 30 Apr 2026 02:32:43 +0530 Subject: [PATCH] feat: update test for bloom filter + reliability manager --- tests/test_reliability.nim | 234 +++++++++++++++++++++++++++++++++++++ 1 file changed, 234 insertions(+) diff --git a/tests/test_reliability.nim b/tests/test_reliability.nim index ec0d40f..5528951 100644 --- a/tests/test_reliability.nim +++ b/tests/test_reliability.nim @@ -7,6 +7,16 @@ converter toParticipantID(s: string): SdsParticipantID = s.SdsParticipantID const testChannel = "testChannel" +proc seedBloom( + rm: ReliabilityManager, channel: SdsChannelID, n: int, prefix = "noise" +) = + ## Pre-populate a channel's bloom filter with n unrelated ids so the test + ## exercises the manager against a realistic, non-empty filter rather than + ## the implicit empty one a fresh ReliabilityManager would produce. + let ch = rm.channels[channel] + for i in 0 ..< n: + ch.bloomFilter.add(prefix & $i) + # Core functionality tests suite "Core Operations": var rm: ReliabilityManager @@ -45,6 +55,31 @@ suite "Core Operations": missingDeps.len == 0 channelId == testChannel + test "basic message wrapping and unwrapping (non-empty bloom)": + rm.seedBloom(testChannel, 50) + + let msg = @[byte(1), 2, 3] + let msgId = "test-msg-1" + + let wrappedResult = rm.wrapOutgoingMessage(msg, msgId, testChannel) + check wrappedResult.isOk() + let wrapped = wrappedResult.get() + check wrapped.len > 0 + + # The outgoing message must carry the populated bloom snapshot, not an + # empty one — this is the path that was never exercised before. + let decoded = deserializeMessage(wrapped) + check decoded.isOk() + check decoded.get().bloomFilter.len > 0 + + let unwrapResult = rm.unwrapReceivedMessage(wrapped) + check unwrapResult.isOk() + let (unwrapped, missingDeps, channelId) = unwrapResult.get() + check: + unwrapped == msg + missingDeps.len == 0 + channelId == testChannel + test "message ordering": # Create messages with different timestamps let msg1 = SdsMessage( @@ -170,6 +205,52 @@ suite "Reliability Mechanisms": messageReadyCount == 2 # Both msg2 and msg3 should be ready missingDepsCount == 2 # Should still be 2 from the initial missing deps + test "dependency detection and resolution (non-empty bloom)": + # A populated bloom filter must not short-circuit the dependency check. + # Dependency resolution reads messageHistory, not the bloom — but a future + # "optimisation" could regress this. Seed the bloom with the dep id so a + # bloom-based shortcut would mistakenly mark the dep as satisfied. + var missingDepsCount = 0 + var messageReadyCount = 0 + + rm.setCallbacks( + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = + messageReadyCount += 1, + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = + discard, + proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} = + missingDepsCount += 1, + ) + + let id1 = "msg1" + let id2 = "msg2" + + rm.seedBloom(testChannel, 30) + # Crucially, also seed the bloom with id1 itself — the dep we will be + # missing from messageHistory. The manager must still report it missing. + rm.channels[testChannel].bloomFilter.add(id1) + + let msg2 = SdsMessage( + messageId: id2, + lamportTimestamp: 2, + causalHistory: toCausalHistory(@[id1]), + channelId: testChannel, + content: @[byte(2)], + bloomFilter: @[], + ) + let serialized2 = serializeMessage(msg2) + check serialized2.isOk() + + let unwrapResult = rm.unwrapReceivedMessage(serialized2.get()) + check unwrapResult.isOk() + let (_, missingDeps, _) = unwrapResult.get() + + check: + missingDepsCount == 1 + missingDeps.len == 1 + id1 in missingDeps.getMessageIds() + messageReadyCount == 0 + test "acknowledgment via causal history": var messageReadyCount = 0 var messageSentCount = 0 @@ -212,6 +293,47 @@ suite "Reliability Mechanisms": messageReadyCount == 1 # For msg2 which we "received" messageSentCount == 1 # For msg1 which was acknowledged via causal history + test "acknowledgment via causal history (non-empty bloom)": + # The causal-history ack path must not be perturbed by the local channel + # bloom carrying unrelated ids, and the empty bloom on the incoming + # message must not spuriously ack any of them. + var messageReadyCount = 0 + var messageSentCount = 0 + + rm.setCallbacks( + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = + messageReadyCount += 1, + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = + messageSentCount += 1, + proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} = + discard, + ) + + rm.seedBloom(testChannel, 50) + + let msg1 = @[byte(1)] + let id1 = "msg1" + let wrap1 = rm.wrapOutgoingMessage(msg1, id1, testChannel) + check wrap1.isOk() + + let msg2 = SdsMessage( + messageId: "msg2", + lamportTimestamp: rm.channels[testChannel].lamportTimestamp + 1, + causalHistory: toCausalHistory(@[id1]), + channelId: testChannel, + content: @[byte(2)], + bloomFilter: @[], + ) + let serializedMsg2 = serializeMessage(msg2) + check serializedMsg2.isOk() + + let unwrapResult = rm.unwrapReceivedMessage(serializedMsg2.get()) + check unwrapResult.isOk() + + check: + messageReadyCount == 1 + messageSentCount == 1 # exactly id1; no spurious acks for the seeded ids + test "acknowledgment via bloom filter": var messageSentCount = 0 @@ -255,6 +377,90 @@ suite "Reliability Mechanisms": check messageSentCount == 1 # Our message should be acknowledged via bloom filter + test "acknowledgment via bloom filter (non-empty bloom)": + # The peer's bloom contains both our outgoing id and a pile of unrelated + # ids. The manager must still ack our message exactly once, and unrelated + # ids in the peer's bloom must not produce spurious sent callbacks. + var messageSentCount = 0 + + rm.setCallbacks( + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = + discard, + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = + messageSentCount += 1, + proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} = + discard, + ) + + let msg1 = @[byte(1)] + let id1 = "msg1" + let wrap1 = rm.wrapOutgoingMessage(msg1, id1, testChannel) + check wrap1.isOk() + + var otherPartyBloomFilter = + RollingBloomFilter.init(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate) + for i in 0 ..< 100: + otherPartyBloomFilter.add("peer-noise-" & $i) + otherPartyBloomFilter.add(id1) + + let bfResult = serializeBloomFilter(otherPartyBloomFilter.filter) + check bfResult.isOk() + + let msg2 = SdsMessage( + messageId: "msg2", + lamportTimestamp: rm.channels[testChannel].lamportTimestamp + 1, + causalHistory: @[], + channelId: testChannel, + content: @[byte(2)], + bloomFilter: bfResult.get(), + ) + let serializedMsg2 = serializeMessage(msg2) + check serializedMsg2.isOk() + + let unwrapResult = rm.unwrapReceivedMessage(serializedMsg2.get()) + check unwrapResult.isOk() + + check messageSentCount == 1 + + test "outgoing message bloom snapshot reflects channel state": + # Until now nothing asserts that wrapOutgoingMessage actually attaches + # the current bloom snapshot — every other test runs against an empty + # filter where the field is empty either way. + rm.seedBloom(testChannel, 40, prefix = "delivered-") + + # Plus a real delivery so we exercise the bloom-on-delivery path too. + let incoming = SdsMessage( + messageId: "incoming-1", + lamportTimestamp: 1, + causalHistory: @[], + channelId: testChannel, + content: @[byte(9)], + bloomFilter: @[], + ) + let serIncoming = serializeMessage(incoming) + check serIncoming.isOk() + discard rm.unwrapReceivedMessage(serIncoming.get()) + + let outId = "outgoing-1" + let wrapped = rm.wrapOutgoingMessage(@[byte(1)], outId, testChannel) + check wrapped.isOk() + + let decoded = deserializeMessage(wrapped.get()) + check decoded.isOk() + let attachedFilter = deserializeBloomFilter(decoded.get().bloomFilter) + check attachedFilter.isOk() + + var snapshot = RollingBloomFilter.init( + filter = attachedFilter.get(), + capacity = DefaultBloomFilterCapacity, + minCapacity = 0, + maxCapacity = DefaultBloomFilterCapacity, + ) + check: + snapshot.contains("delivered-0") + snapshot.contains("delivered-39") + snapshot.contains("incoming-1") + test "retrieval hints": var messageReadyCount = 0 var messageSentCount = 0 @@ -666,6 +872,34 @@ suite "Multi-Channel ReliabilityManager Tests": msgId1 notin history2 msgId2 notin history1 + test "channel isolation (non-empty bloom)": + # With both channels carrying populated blooms, ids on one channel must + # not appear in the other's filter. An empty-bloom test cannot observe + # this — there is nothing to bleed across. + let channel1 = "iso-bloom-1" + let channel2 = "iso-bloom-2" + check rm.ensureChannel(channel1).isOk() + check rm.ensureChannel(channel2).isOk() + + rm.seedBloom(channel1, 25, prefix = "ch1-") + rm.seedBloom(channel2, 25, prefix = "ch2-") + + let wrap1 = rm.wrapOutgoingMessage(@[byte(1)], "iso-msg-1", channel1) + let wrap2 = rm.wrapOutgoingMessage(@[byte(2)], "iso-msg-2", channel2) + check wrap1.isOk() and wrap2.isOk() + + let bf1 = rm.channels[channel1].bloomFilter + let bf2 = rm.channels[channel2].bloomFilter + check: + bf1.contains("ch1-0") + bf1.contains("iso-msg-1") + not bf1.contains("ch2-0") + not bf1.contains("iso-msg-2") + bf2.contains("ch2-0") + bf2.contains("iso-msg-2") + not bf2.contains("ch1-0") + not bf2.contains("iso-msg-1") + test "multi-channel callbacks": var readyMessageCount = 0 var sentMessageCount = 0