feat: Update tests and add new tests for multi channels (#14)

This commit is contained in:
Akhil 2025-07-11 16:31:22 +05:30 committed by GitHub
parent 99121098cc
commit 810b62896f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 250 additions and 80 deletions

View File

@ -1,21 +1,15 @@
mode = ScriptMode.Verbose
# Package
version = "0.1.0"
author = "Waku Team"
description = "E2E Reliability Protocol API"
license = "MIT"
srcDir = "src"
version = "0.1.0"
author = "Waku Team"
description = "E2E Reliability Protocol API"
license = "MIT"
srcDir = "src"
# Dependencies
requires "nim >= 2.2.4",
"chronicles",
"chronos",
"stew",
"stint",
"metrics",
"libp2p",
"results"
"chronicles", "chronos", "stew", "stint", "metrics", "libp2p", "results"
proc buildLibrary(name: string, srcDir = "./", params = "", `type` = "static") =
if not dirExists "build":
@ -49,4 +43,4 @@ task libsdsDynamic, "Generate bindings":
--warning:Deprecated:off \
--warning:UnusedImport:on \
-d:chronicles_log_level=TRACE """,
"dynamic"
"dynamic"

View File

@ -1,14 +1,17 @@
import unittest, results, chronos, std/[times, options, tables]
import ../src/[reliability, message, protobuf, reliability_utils, rolling_bloom_filter]
const testChannel = "testChannel"
# Core functionality tests
suite "Core Operations":
var rm: ReliabilityManager
setup:
let rmResult = newReliabilityManager(some("testChannel"))
let rmResult = newReliabilityManager()
check rmResult.isOk()
rm = rmResult.get()
check rm.ensureChannel(testChannel).isOk()
teardown:
if not rm.isNil:
@ -25,17 +28,18 @@ suite "Core Operations":
let msg = @[byte(1), 2, 3]
let msgId = "test-msg-1"
let wrappedResult = rm.wrapOutgoingMessage(msg, msgId)
let wrappedResult = rm.wrapOutgoingMessage(msg, msgId, testChannel)
check wrappedResult.isOk()
let wrapped = wrappedResult.get()
check wrapped.len > 0
let unwrapResult = rm.unwrapReceivedMessage(wrapped)
check unwrapResult.isOk()
let (unwrapped, missingDeps) = unwrapResult.get()
let (unwrapped, missingDeps, channelId) = unwrapResult.get()
check:
unwrapped == msg
missingDeps.len == 0
channelId == testChannel
test "message ordering":
# Create messages with different timestamps
@ -43,7 +47,7 @@ suite "Core Operations":
messageId: "msg1",
lamportTimestamp: 1,
causalHistory: @[],
channelId: some("testChannel"),
channelId: testChannel,
content: @[byte(1)],
bloomFilter: @[],
)
@ -52,7 +56,7 @@ suite "Core Operations":
messageId: "msg2",
lamportTimestamp: 5,
causalHistory: @[],
channelId: some("testChannel"),
channelId: testChannel,
content: @[byte(2)],
bloomFilter: @[],
)
@ -65,9 +69,9 @@ suite "Core Operations":
# Process out of order
discard rm.unwrapReceivedMessage(serialized2.get())
let timestamp1 = rm.lamportTimestamp
let timestamp1 = rm.channels[testChannel].lamportTimestamp
discard rm.unwrapReceivedMessage(serialized1.get())
let timestamp2 = rm.lamportTimestamp
let timestamp2 = rm.channels[testChannel].lamportTimestamp
check timestamp2 > timestamp1
@ -76,9 +80,10 @@ suite "Reliability Mechanisms":
var rm: ReliabilityManager
setup:
let rmResult = newReliabilityManager(some("testChannel"))
let rmResult = newReliabilityManager()
check rmResult.isOk()
rm = rmResult.get()
check rm.ensureChannel(testChannel).isOk()
teardown:
if not rm.isNil:
@ -90,11 +95,11 @@ suite "Reliability Mechanisms":
var missingDepsCount = 0
rm.setCallbacks(
proc(messageId: SdsMessageID) {.gcsafe.} =
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageReadyCount += 1,
proc(messageId: SdsMessageID) {.gcsafe.} =
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
missingDepsCount += 1,
)
@ -108,7 +113,7 @@ suite "Reliability Mechanisms":
messageId: id2,
lamportTimestamp: 2,
causalHistory: @[id1], # msg2 depends on msg1
channelId: some("testChannel"),
channelId: testChannel,
content: @[byte(2)],
bloomFilter: @[],
)
@ -117,7 +122,7 @@ suite "Reliability Mechanisms":
messageId: id3,
lamportTimestamp: 3,
causalHistory: @[id1, id2], # msg3 depends on both msg1 and msg2
channelId: some("testChannel"),
channelId: testChannel,
content: @[byte(3)],
bloomFilter: @[],
)
@ -131,7 +136,7 @@ suite "Reliability Mechanisms":
# First try processing msg3 (which depends on msg2 which depends on msg1)
let unwrapResult3 = rm.unwrapReceivedMessage(serialized3.get())
check unwrapResult3.isOk()
let (_, missingDeps3) = unwrapResult3.get()
let (_, missingDeps3, _) = unwrapResult3.get()
check:
missingDepsCount == 1 # Should trigger missing deps callback
@ -142,7 +147,7 @@ suite "Reliability Mechanisms":
# Then try processing msg2 (which only depends on msg1)
let unwrapResult2 = rm.unwrapReceivedMessage(serialized2.get())
check unwrapResult2.isOk()
let (_, missingDeps2) = unwrapResult2.get()
let (_, missingDeps2, _) = unwrapResult2.get()
check:
missingDepsCount == 2 # Should have triggered another missing deps callback
@ -151,10 +156,10 @@ suite "Reliability Mechanisms":
messageReadyCount == 0 # No messages should be ready yet
# Mark first dependency (msg1) as met
let markResult1 = rm.markDependenciesMet(@[id1])
let markResult1 = rm.markDependenciesMet(@[id1], testChannel)
check markResult1.isOk()
let incomingBuffer = rm.getIncomingBuffer()
let incomingBuffer = rm.getIncomingBuffer(testChannel)
check:
incomingBuffer.len == 0
@ -167,26 +172,26 @@ suite "Reliability Mechanisms":
var missingDepsCount = 0
rm.setCallbacks(
proc(messageId: SdsMessageID) {.gcsafe.} =
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageReadyCount += 1,
proc(messageId: SdsMessageID) {.gcsafe.} =
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
missingDepsCount += 1,
)
# Send our message
let msg1 = @[byte(1)]
let id1 = "msg1"
let wrap1 = rm.wrapOutgoingMessage(msg1, id1)
let wrap1 = rm.wrapOutgoingMessage(msg1, id1, testChannel)
check wrap1.isOk()
# Create a message that has our message in causal history
let msg2 = SdsMessage(
messageId: "msg2",
lamportTimestamp: rm.lamportTimestamp + 1,
lamportTimestamp: rm.channels[testChannel].lamportTimestamp + 1,
causalHistory: @[id1], # Include our message in causal history
channelId: some("testChannel"),
channelId: testChannel,
content: @[byte(2)],
bloomFilter: @[] # Test with an empty bloom filter
,
@ -207,18 +212,18 @@ suite "Reliability Mechanisms":
var messageSentCount = 0
rm.setCallbacks(
proc(messageId: SdsMessageID) {.gcsafe.} =
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(messageId: SdsMessageID) {.gcsafe.} =
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
discard,
)
# Send our message
let msg1 = @[byte(1)]
let id1 = "msg1"
let wrap1 = rm.wrapOutgoingMessage(msg1, id1)
let wrap1 = rm.wrapOutgoingMessage(msg1, id1, testChannel)
check wrap1.isOk()
# Create a message with bloom filter containing our message
@ -231,9 +236,9 @@ suite "Reliability Mechanisms":
let msg2 = SdsMessage(
messageId: "msg2",
lamportTimestamp: rm.lamportTimestamp + 1,
lamportTimestamp: rm.channels[testChannel].lamportTimestamp + 1,
causalHistory: @[], # Empty causal history as we're using bloom filter
channelId: some("testChannel"),
channelId: testChannel,
content: @[byte(2)],
bloomFilter: bfResult.get(),
)
@ -251,9 +256,10 @@ suite "Periodic Tasks & Buffer Management":
var rm: ReliabilityManager
setup:
let rmResult = newReliabilityManager(some("testChannel"))
let rmResult = newReliabilityManager()
check rmResult.isOk()
rm = rmResult.get()
check rm.ensureChannel(testChannel).isOk()
teardown:
if not rm.isNil:
@ -263,11 +269,11 @@ suite "Periodic Tasks & Buffer Management":
var messageSentCount = 0
rm.setCallbacks(
proc(messageId: SdsMessageID) {.gcsafe.} =
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(messageId: SdsMessageID) {.gcsafe.} =
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
discard,
)
@ -275,18 +281,18 @@ suite "Periodic Tasks & Buffer Management":
for i in 0 .. 5:
let msg = @[byte(i)]
let id = "msg" & $i
let wrap = rm.wrapOutgoingMessage(msg, id)
let wrap = rm.wrapOutgoingMessage(msg, id, testChannel)
check wrap.isOk()
let outBuffer = rm.getOutgoingBuffer()
let outBuffer = rm.getOutgoingBuffer(testChannel)
check outBuffer.len == 6
# Create message that acknowledges some messages
let ackMsg = SdsMessage(
messageId: "ack1",
lamportTimestamp: rm.lamportTimestamp + 1,
lamportTimestamp: rm.channels[testChannel].lamportTimestamp + 1,
causalHistory: @["msg0", "msg2", "msg4"],
channelId: some("testChannel"),
channelId: testChannel,
content: @[byte(100)],
bloomFilter: @[],
)
@ -297,7 +303,7 @@ suite "Periodic Tasks & Buffer Management":
# Process the acknowledgment
discard rm.unwrapReceivedMessage(serializedAck.get())
let finalBuffer = rm.getOutgoingBuffer()
let finalBuffer = rm.getOutgoingBuffer(testChannel)
check:
finalBuffer.len == 3 # Should have removed acknowledged messages
messageSentCount == 3
@ -312,29 +318,30 @@ suite "Periodic Tasks & Buffer Management":
config.bloomFilterCapacity = 2 # Small capacity for testing
config.maxResendAttempts = 3 # Set a low number of max attempts
let rmResultP = newReliabilityManager(some("testChannel"), config)
let rmResultP = newReliabilityManager(config)
check rmResultP.isOk()
let rm = rmResultP.get()
check rm.ensureChannel(testChannel).isOk()
rm.setCallbacks(
proc(messageId: SdsMessageID) {.gcsafe.} =
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(messageId: SdsMessageID) {.gcsafe.} =
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
discard,
)
# First message - should be cleaned from bloom filter later
let msg1 = @[byte(1)]
let id1 = "msg1"
let wrap1 = rm.wrapOutgoingMessage(msg1, id1)
let wrap1 = rm.wrapOutgoingMessage(msg1, id1, testChannel)
check wrap1.isOk()
let initialBuffer = rm.getOutgoingBuffer()
let initialBuffer = rm.getOutgoingBuffer(testChannel)
check:
initialBuffer[0].resendAttempts == 0
rm.bloomFilter.contains(id1)
rm.channels[testChannel].bloomFilter.contains(id1)
rm.startPeriodicTasks()
@ -344,33 +351,33 @@ suite "Periodic Tasks & Buffer Management":
# Add new messages
let msg2 = @[byte(2)]
let id2 = "msg2"
let wrap2 = rm.wrapOutgoingMessage(msg2, id2)
let wrap2 = rm.wrapOutgoingMessage(msg2, id2, testChannel)
check wrap2.isOk()
let msg3 = @[byte(3)]
let id3 = "msg3"
let wrap3 = rm.wrapOutgoingMessage(msg3, id3)
let wrap3 = rm.wrapOutgoingMessage(msg3, id3, testChannel)
check wrap3.isOk()
let finalBuffer = rm.getOutgoingBuffer()
let finalBuffer = rm.getOutgoingBuffer(testChannel)
check:
finalBuffer.len == 2
# Only msg2 and msg3 should be in buffer, msg1 should be removed after max retries
finalBuffer[0].message.messageId == id2 # Verify it's the second message
finalBuffer[0].resendAttempts == 0 # New message should have 0 attempts
not rm.bloomFilter.contains(id1) # Bloom filter cleaning check
rm.bloomFilter.contains(id3) # New message still in filter
not rm.channels[testChannel].bloomFilter.contains(id1) # Bloom filter cleaning check
rm.channels[testChannel].bloomFilter.contains(id3) # New message still in filter
rm.cleanup()
test "periodic sync callback":
var syncCallCount = 0
rm.setCallbacks(
proc(messageId: SdsMessageID) {.gcsafe.} =
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(messageId: SdsMessageID) {.gcsafe.} =
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
discard,
proc() {.gcsafe.} =
syncCallCount += 1,
@ -387,9 +394,10 @@ suite "Special Cases Handling":
var rm: ReliabilityManager
setup:
let rmResult = newReliabilityManager(some("testChannel"))
let rmResult = newReliabilityManager()
check rmResult.isOk()
rm = rmResult.get()
check rm.ensureChannel(testChannel).isOk()
teardown:
if not rm.isNil:
@ -400,10 +408,10 @@ suite "Special Cases Handling":
for i in 0 .. rm.config.maxMessageHistory + 5:
let msg = @[byte(i)]
let id = "msg" & $i
let wrap = rm.wrapOutgoingMessage(msg, id)
let wrap = rm.wrapOutgoingMessage(msg, id, testChannel)
check wrap.isOk()
let history = rm.getMessageHistory()
let history = rm.getMessageHistory(testChannel)
check:
history.len <= rm.config.maxMessageHistory
history[^1] == "msg" & $(rm.config.maxMessageHistory + 5)
@ -413,7 +421,7 @@ suite "Special Cases Handling":
messageId: "invalid-bf",
lamportTimestamp: 1,
causalHistory: @[],
channelId: some("testChannel"),
channelId: testChannel,
content: @[byte(1)],
bloomFilter: @[1.byte, 2.byte, 3.byte] # Invalid filter data
,
@ -431,11 +439,11 @@ suite "Special Cases Handling":
test "duplicate message handling":
var messageReadyCount = 0
rm.setCallbacks(
proc(messageId: SdsMessageID) {.gcsafe.} =
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageReadyCount += 1,
proc(messageId: SdsMessageID) {.gcsafe.} =
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} =
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
discard,
)
@ -444,7 +452,7 @@ suite "Special Cases Handling":
messageId: "dup-msg",
lamportTimestamp: 1,
causalHistory: @[],
channelId: some("testChannel"),
channelId: testChannel,
content: @[byte(1)],
bloomFilter: @[],
)
@ -464,33 +472,201 @@ suite "Special Cases Handling":
test "error handling":
# Empty message
let emptyMsg: seq[byte] = @[]
let emptyResult = rm.wrapOutgoingMessage(emptyMsg, "empty")
let emptyResult = rm.wrapOutgoingMessage(emptyMsg, "empty", testChannel)
check:
not emptyResult.isOk()
emptyResult.error == reInvalidArgument
# Oversized message
let largeMsg = newSeq[byte](MaxMessageSize + 1)
let largeResult = rm.wrapOutgoingMessage(largeMsg, "large")
let largeResult = rm.wrapOutgoingMessage(largeMsg, "large", testChannel)
check:
not largeResult.isOk()
largeResult.error == reMessageTooLarge
suite "cleanup":
test "cleanup works correctly":
let rmResult = newReliabilityManager(some("testChannel"))
let rmResult = newReliabilityManager()
check rmResult.isOk()
let rm = rmResult.get()
check rm.ensureChannel(testChannel).isOk()
# Add some messages
let msg = @[byte(1), 2, 3]
let msgId = "test-msg-1"
discard rm.wrapOutgoingMessage(msg, msgId)
discard rm.wrapOutgoingMessage(msg, msgId, testChannel)
rm.cleanup()
let outBuffer = rm.getOutgoingBuffer()
let history = rm.getMessageHistory()
let outBuffer = rm.getOutgoingBuffer(testChannel)
let history = rm.getMessageHistory(testChannel)
check:
outBuffer.len == 0
history.len == 0
suite "Multi-Channel ReliabilityManager Tests":
var rm: ReliabilityManager
setup:
let rmResult = newReliabilityManager()
check rmResult.isOk()
rm = rmResult.get()
teardown:
if not rm.isNil:
rm.cleanup()
test "can create multi-channel manager without channel ID":
check rm.channels.len == 0
test "channel management":
let channel1 = "channel1"
let channel2 = "channel2"
# Ensure channels
check rm.ensureChannel(channel1).isOk()
check rm.ensureChannel(channel2).isOk()
check rm.channels.len == 2
# Remove channel
check rm.removeChannel(channel1).isOk()
check rm.channels.len == 1
check channel1 notin rm.channels
check channel2 in rm.channels
test "stateless message unwrapping with channel extraction":
let channel1 = "test-channel-1"
let channel2 = "test-channel-2"
# Create and wrap messages for different channels
let msg1 = @[byte(1), 2, 3]
let msgId1 = "msg1"
let wrapped1 = rm.wrapOutgoingMessage(msg1, msgId1, channel1)
check wrapped1.isOk()
let msg2 = @[byte(4), 5, 6]
let msgId2 = "msg2"
let wrapped2 = rm.wrapOutgoingMessage(msg2, msgId2, channel2)
check wrapped2.isOk()
# Unwrap messages - should extract channel ID and route correctly
let unwrap1 = rm.unwrapReceivedMessage(wrapped1.get())
check unwrap1.isOk()
let (content1, deps1, extractedChannel1) = unwrap1.get()
check:
content1 == msg1
deps1.len == 0
extractedChannel1 == channel1
let unwrap2 = rm.unwrapReceivedMessage(wrapped2.get())
check unwrap2.isOk()
let (content2, deps2, extractedChannel2) = unwrap2.get()
check:
content2 == msg2
deps2.len == 0
extractedChannel2 == channel2
test "channel isolation":
let channel1 = "isolated-channel-1"
let channel2 = "isolated-channel-2"
# Add messages to different channels
let msg1 = @[byte(1)]
let msgId1 = "isolated-msg1"
discard rm.wrapOutgoingMessage(msg1, msgId1, channel1)
let msg2 = @[byte(2)]
let msgId2 = "isolated-msg2"
discard rm.wrapOutgoingMessage(msg2, msgId2, channel2)
# Check channel-specific data is isolated
let history1 = rm.getMessageHistory(channel1)
let history2 = rm.getMessageHistory(channel2)
check:
history1.len == 1
history2.len == 1
msgId1 in history1
msgId2 in history2
msgId1 notin history2
msgId2 notin history1
test "multi-channel callbacks":
var readyMessageCount = 0
var sentMessageCount = 0
var missingDepsCount = 0
rm.setCallbacks(
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
readyMessageCount += 1,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
sentMessageCount += 1,
proc(messageId: SdsMessageID, deps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
missingDepsCount += 1
)
let channel1 = "callback-channel-1"
let channel2 = "callback-channel-2"
# Send messages from both channels
let msg1 = @[byte(1)]
let msgId1 = "callback-msg1"
let wrapped1 = rm.wrapOutgoingMessage(msg1, msgId1, channel1)
check wrapped1.isOk()
let msg2 = @[byte(2)]
let msgId2 = "callback-msg2"
let wrapped2 = rm.wrapOutgoingMessage(msg2, msgId2, channel2)
check wrapped2.isOk()
# Create acknowledgment messages that include our message IDs in causal history
# to trigger sent callbacks
let ackMsg1 = SdsMessage(
messageId: "ack1",
lamportTimestamp: rm.channels[channel1].lamportTimestamp + 1,
causalHistory: @[msgId1], # Acknowledge msg1
channelId: channel1,
content: @[byte(100)],
bloomFilter: @[],
)
let ackMsg2 = SdsMessage(
messageId: "ack2",
lamportTimestamp: rm.channels[channel2].lamportTimestamp + 1,
causalHistory: @[msgId2], # Acknowledge msg2
channelId: channel2,
content: @[byte(101)],
bloomFilter: @[],
)
let serializedAck1 = serializeMessage(ackMsg1)
let serializedAck2 = serializeMessage(ackMsg2)
check:
serializedAck1.isOk()
serializedAck2.isOk()
# Process acknowledgment messages - should trigger callbacks
discard rm.unwrapReceivedMessage(serializedAck1.get())
discard rm.unwrapReceivedMessage(serializedAck2.get())
check:
readyMessageCount == 2 # Both ack messages should trigger ready callbacks
sentMessageCount == 2 # Both original messages should be marked as sent
missingDepsCount == 0 # No missing dependencies
test "channel-specific dependency management":
let channel1 = "dep-channel-1"
let channel2 = "dep-channel-2"
let depIds = @["dep1", "dep2", "dep3"]
# Ensure both channels exist first
check rm.ensureChannel(channel1).isOk()
check rm.ensureChannel(channel2).isOk()
# Mark dependencies as met for specific channel
check rm.markDependenciesMet(depIds, channel1).isOk()
# Dependencies should only affect the specified channel
# Dependencies in channel1 should not affect channel2
check rm.channels[channel1].bloomFilter.contains("dep1")
check not rm.channels[channel2].bloomFilter.contains("dep1")