mirror of
https://github.com/logos-messaging/nim-sds.git
synced 2026-05-18 07:59:54 +00:00
feat: update test for bloom filter + reliability manager
This commit is contained in:
parent
5966998946
commit
3cf816da76
@ -7,6 +7,16 @@ converter toParticipantID(s: string): SdsParticipantID = s.SdsParticipantID
|
||||
|
||||
const testChannel = "testChannel"
|
||||
|
||||
proc seedBloom(
|
||||
rm: ReliabilityManager, channel: SdsChannelID, n: int, prefix = "noise"
|
||||
) =
|
||||
## Pre-populate a channel's bloom filter with n unrelated ids so the test
|
||||
## exercises the manager against a realistic, non-empty filter rather than
|
||||
## the implicit empty one a fresh ReliabilityManager would produce.
|
||||
let ch = rm.channels[channel]
|
||||
for i in 0 ..< n:
|
||||
ch.bloomFilter.add(prefix & $i)
|
||||
|
||||
# Core functionality tests
|
||||
suite "Core Operations":
|
||||
var rm: ReliabilityManager
|
||||
@ -45,6 +55,31 @@ suite "Core Operations":
|
||||
missingDeps.len == 0
|
||||
channelId == testChannel
|
||||
|
||||
test "basic message wrapping and unwrapping (non-empty bloom)":
|
||||
rm.seedBloom(testChannel, 50)
|
||||
|
||||
let msg = @[byte(1), 2, 3]
|
||||
let msgId = "test-msg-1"
|
||||
|
||||
let wrappedResult = rm.wrapOutgoingMessage(msg, msgId, testChannel)
|
||||
check wrappedResult.isOk()
|
||||
let wrapped = wrappedResult.get()
|
||||
check wrapped.len > 0
|
||||
|
||||
# The outgoing message must carry the populated bloom snapshot, not an
|
||||
# empty one — this is the path that was never exercised before.
|
||||
let decoded = deserializeMessage(wrapped)
|
||||
check decoded.isOk()
|
||||
check decoded.get().bloomFilter.len > 0
|
||||
|
||||
let unwrapResult = rm.unwrapReceivedMessage(wrapped)
|
||||
check unwrapResult.isOk()
|
||||
let (unwrapped, missingDeps, channelId) = unwrapResult.get()
|
||||
check:
|
||||
unwrapped == msg
|
||||
missingDeps.len == 0
|
||||
channelId == testChannel
|
||||
|
||||
test "message ordering":
|
||||
# Create messages with different timestamps
|
||||
let msg1 = SdsMessage(
|
||||
@ -170,6 +205,52 @@ suite "Reliability Mechanisms":
|
||||
messageReadyCount == 2 # Both msg2 and msg3 should be ready
|
||||
missingDepsCount == 2 # Should still be 2 from the initial missing deps
|
||||
|
||||
test "dependency detection and resolution (non-empty bloom)":
|
||||
# A populated bloom filter must not short-circuit the dependency check.
|
||||
# Dependency resolution reads messageHistory, not the bloom — but a future
|
||||
# "optimisation" could regress this. Seed the bloom with the dep id so a
|
||||
# bloom-based shortcut would mistakenly mark the dep as satisfied.
|
||||
var missingDepsCount = 0
|
||||
var messageReadyCount = 0
|
||||
|
||||
rm.setCallbacks(
|
||||
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
|
||||
messageReadyCount += 1,
|
||||
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
|
||||
discard,
|
||||
proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
|
||||
missingDepsCount += 1,
|
||||
)
|
||||
|
||||
let id1 = "msg1"
|
||||
let id2 = "msg2"
|
||||
|
||||
rm.seedBloom(testChannel, 30)
|
||||
# Crucially, also seed the bloom with id1 itself — the dep we will be
|
||||
# missing from messageHistory. The manager must still report it missing.
|
||||
rm.channels[testChannel].bloomFilter.add(id1)
|
||||
|
||||
let msg2 = SdsMessage(
|
||||
messageId: id2,
|
||||
lamportTimestamp: 2,
|
||||
causalHistory: toCausalHistory(@[id1]),
|
||||
channelId: testChannel,
|
||||
content: @[byte(2)],
|
||||
bloomFilter: @[],
|
||||
)
|
||||
let serialized2 = serializeMessage(msg2)
|
||||
check serialized2.isOk()
|
||||
|
||||
let unwrapResult = rm.unwrapReceivedMessage(serialized2.get())
|
||||
check unwrapResult.isOk()
|
||||
let (_, missingDeps, _) = unwrapResult.get()
|
||||
|
||||
check:
|
||||
missingDepsCount == 1
|
||||
missingDeps.len == 1
|
||||
id1 in missingDeps.getMessageIds()
|
||||
messageReadyCount == 0
|
||||
|
||||
test "acknowledgment via causal history":
|
||||
var messageReadyCount = 0
|
||||
var messageSentCount = 0
|
||||
@ -212,6 +293,47 @@ suite "Reliability Mechanisms":
|
||||
messageReadyCount == 1 # For msg2 which we "received"
|
||||
messageSentCount == 1 # For msg1 which was acknowledged via causal history
|
||||
|
||||
test "acknowledgment via causal history (non-empty bloom)":
|
||||
# The causal-history ack path must not be perturbed by the local channel
|
||||
# bloom carrying unrelated ids, and the empty bloom on the incoming
|
||||
# message must not spuriously ack any of them.
|
||||
var messageReadyCount = 0
|
||||
var messageSentCount = 0
|
||||
|
||||
rm.setCallbacks(
|
||||
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
|
||||
messageReadyCount += 1,
|
||||
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
|
||||
messageSentCount += 1,
|
||||
proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
|
||||
discard,
|
||||
)
|
||||
|
||||
rm.seedBloom(testChannel, 50)
|
||||
|
||||
let msg1 = @[byte(1)]
|
||||
let id1 = "msg1"
|
||||
let wrap1 = rm.wrapOutgoingMessage(msg1, id1, testChannel)
|
||||
check wrap1.isOk()
|
||||
|
||||
let msg2 = SdsMessage(
|
||||
messageId: "msg2",
|
||||
lamportTimestamp: rm.channels[testChannel].lamportTimestamp + 1,
|
||||
causalHistory: toCausalHistory(@[id1]),
|
||||
channelId: testChannel,
|
||||
content: @[byte(2)],
|
||||
bloomFilter: @[],
|
||||
)
|
||||
let serializedMsg2 = serializeMessage(msg2)
|
||||
check serializedMsg2.isOk()
|
||||
|
||||
let unwrapResult = rm.unwrapReceivedMessage(serializedMsg2.get())
|
||||
check unwrapResult.isOk()
|
||||
|
||||
check:
|
||||
messageReadyCount == 1
|
||||
messageSentCount == 1 # exactly id1; no spurious acks for the seeded ids
|
||||
|
||||
test "acknowledgment via bloom filter":
|
||||
var messageSentCount = 0
|
||||
|
||||
@ -255,6 +377,90 @@ suite "Reliability Mechanisms":
|
||||
|
||||
check messageSentCount == 1 # Our message should be acknowledged via bloom filter
|
||||
|
||||
test "acknowledgment via bloom filter (non-empty bloom)":
|
||||
# The peer's bloom contains both our outgoing id and a pile of unrelated
|
||||
# ids. The manager must still ack our message exactly once, and unrelated
|
||||
# ids in the peer's bloom must not produce spurious sent callbacks.
|
||||
var messageSentCount = 0
|
||||
|
||||
rm.setCallbacks(
|
||||
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
|
||||
discard,
|
||||
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
|
||||
messageSentCount += 1,
|
||||
proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
|
||||
discard,
|
||||
)
|
||||
|
||||
let msg1 = @[byte(1)]
|
||||
let id1 = "msg1"
|
||||
let wrap1 = rm.wrapOutgoingMessage(msg1, id1, testChannel)
|
||||
check wrap1.isOk()
|
||||
|
||||
var otherPartyBloomFilter =
|
||||
RollingBloomFilter.init(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate)
|
||||
for i in 0 ..< 100:
|
||||
otherPartyBloomFilter.add("peer-noise-" & $i)
|
||||
otherPartyBloomFilter.add(id1)
|
||||
|
||||
let bfResult = serializeBloomFilter(otherPartyBloomFilter.filter)
|
||||
check bfResult.isOk()
|
||||
|
||||
let msg2 = SdsMessage(
|
||||
messageId: "msg2",
|
||||
lamportTimestamp: rm.channels[testChannel].lamportTimestamp + 1,
|
||||
causalHistory: @[],
|
||||
channelId: testChannel,
|
||||
content: @[byte(2)],
|
||||
bloomFilter: bfResult.get(),
|
||||
)
|
||||
let serializedMsg2 = serializeMessage(msg2)
|
||||
check serializedMsg2.isOk()
|
||||
|
||||
let unwrapResult = rm.unwrapReceivedMessage(serializedMsg2.get())
|
||||
check unwrapResult.isOk()
|
||||
|
||||
check messageSentCount == 1
|
||||
|
||||
test "outgoing message bloom snapshot reflects channel state":
|
||||
# Until now nothing asserts that wrapOutgoingMessage actually attaches
|
||||
# the current bloom snapshot — every other test runs against an empty
|
||||
# filter where the field is empty either way.
|
||||
rm.seedBloom(testChannel, 40, prefix = "delivered-")
|
||||
|
||||
# Plus a real delivery so we exercise the bloom-on-delivery path too.
|
||||
let incoming = SdsMessage(
|
||||
messageId: "incoming-1",
|
||||
lamportTimestamp: 1,
|
||||
causalHistory: @[],
|
||||
channelId: testChannel,
|
||||
content: @[byte(9)],
|
||||
bloomFilter: @[],
|
||||
)
|
||||
let serIncoming = serializeMessage(incoming)
|
||||
check serIncoming.isOk()
|
||||
discard rm.unwrapReceivedMessage(serIncoming.get())
|
||||
|
||||
let outId = "outgoing-1"
|
||||
let wrapped = rm.wrapOutgoingMessage(@[byte(1)], outId, testChannel)
|
||||
check wrapped.isOk()
|
||||
|
||||
let decoded = deserializeMessage(wrapped.get())
|
||||
check decoded.isOk()
|
||||
let attachedFilter = deserializeBloomFilter(decoded.get().bloomFilter)
|
||||
check attachedFilter.isOk()
|
||||
|
||||
var snapshot = RollingBloomFilter.init(
|
||||
filter = attachedFilter.get(),
|
||||
capacity = DefaultBloomFilterCapacity,
|
||||
minCapacity = 0,
|
||||
maxCapacity = DefaultBloomFilterCapacity,
|
||||
)
|
||||
check:
|
||||
snapshot.contains("delivered-0")
|
||||
snapshot.contains("delivered-39")
|
||||
snapshot.contains("incoming-1")
|
||||
|
||||
test "retrieval hints":
|
||||
var messageReadyCount = 0
|
||||
var messageSentCount = 0
|
||||
@ -666,6 +872,34 @@ suite "Multi-Channel ReliabilityManager Tests":
|
||||
msgId1 notin history2
|
||||
msgId2 notin history1
|
||||
|
||||
test "channel isolation (non-empty bloom)":
|
||||
# With both channels carrying populated blooms, ids on one channel must
|
||||
# not appear in the other's filter. An empty-bloom test cannot observe
|
||||
# this — there is nothing to bleed across.
|
||||
let channel1 = "iso-bloom-1"
|
||||
let channel2 = "iso-bloom-2"
|
||||
check rm.ensureChannel(channel1).isOk()
|
||||
check rm.ensureChannel(channel2).isOk()
|
||||
|
||||
rm.seedBloom(channel1, 25, prefix = "ch1-")
|
||||
rm.seedBloom(channel2, 25, prefix = "ch2-")
|
||||
|
||||
let wrap1 = rm.wrapOutgoingMessage(@[byte(1)], "iso-msg-1", channel1)
|
||||
let wrap2 = rm.wrapOutgoingMessage(@[byte(2)], "iso-msg-2", channel2)
|
||||
check wrap1.isOk() and wrap2.isOk()
|
||||
|
||||
let bf1 = rm.channels[channel1].bloomFilter
|
||||
let bf2 = rm.channels[channel2].bloomFilter
|
||||
check:
|
||||
bf1.contains("ch1-0")
|
||||
bf1.contains("iso-msg-1")
|
||||
not bf1.contains("ch2-0")
|
||||
not bf1.contains("iso-msg-2")
|
||||
bf2.contains("ch2-0")
|
||||
bf2.contains("iso-msg-2")
|
||||
not bf2.contains("ch1-0")
|
||||
not bf2.contains("iso-msg-1")
|
||||
|
||||
test "multi-channel callbacks":
|
||||
var readyMessageCount = 0
|
||||
var sentMessageCount = 0
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user