diff --git a/sds.nimble b/sds.nimble index ab3e48e..d8294b3 100644 --- a/sds.nimble +++ b/sds.nimble @@ -1,21 +1,15 @@ mode = ScriptMode.Verbose # Package -version = "0.1.0" -author = "Waku Team" -description = "E2E Reliability Protocol API" -license = "MIT" -srcDir = "src" +version = "0.1.0" +author = "Waku Team" +description = "E2E Reliability Protocol API" +license = "MIT" +srcDir = "src" # Dependencies requires "nim >= 2.2.4", - "chronicles", - "chronos", - "stew", - "stint", - "metrics", - "libp2p", - "results" + "chronicles", "chronos", "stew", "stint", "metrics", "libp2p", "results" proc buildLibrary(name: string, srcDir = "./", params = "", `type` = "static") = if not dirExists "build": @@ -49,4 +43,4 @@ task libsdsDynamic, "Generate bindings": --warning:Deprecated:off \ --warning:UnusedImport:on \ -d:chronicles_log_level=TRACE """, - "dynamic" \ No newline at end of file + "dynamic" diff --git a/tests/test_reliability.nim b/tests/test_reliability.nim index a0b2135..7b68a86 100644 --- a/tests/test_reliability.nim +++ b/tests/test_reliability.nim @@ -1,14 +1,17 @@ import unittest, results, chronos, std/[times, options, tables] import ../src/[reliability, message, protobuf, reliability_utils, rolling_bloom_filter] +const testChannel = "testChannel" + # Core functionality tests suite "Core Operations": var rm: ReliabilityManager setup: - let rmResult = newReliabilityManager(some("testChannel")) + let rmResult = newReliabilityManager() check rmResult.isOk() rm = rmResult.get() + check rm.ensureChannel(testChannel).isOk() teardown: if not rm.isNil: @@ -25,17 +28,18 @@ suite "Core Operations": let msg = @[byte(1), 2, 3] let msgId = "test-msg-1" - let wrappedResult = rm.wrapOutgoingMessage(msg, msgId) + let wrappedResult = rm.wrapOutgoingMessage(msg, msgId, testChannel) check wrappedResult.isOk() let wrapped = wrappedResult.get() check wrapped.len > 0 let unwrapResult = rm.unwrapReceivedMessage(wrapped) check unwrapResult.isOk() - let (unwrapped, missingDeps) = unwrapResult.get() + let (unwrapped, missingDeps, channelId) = unwrapResult.get() check: unwrapped == msg missingDeps.len == 0 + channelId == testChannel test "message ordering": # Create messages with different timestamps @@ -43,7 +47,7 @@ suite "Core Operations": messageId: "msg1", lamportTimestamp: 1, causalHistory: @[], - channelId: some("testChannel"), + channelId: testChannel, content: @[byte(1)], bloomFilter: @[], ) @@ -52,7 +56,7 @@ suite "Core Operations": messageId: "msg2", lamportTimestamp: 5, causalHistory: @[], - channelId: some("testChannel"), + channelId: testChannel, content: @[byte(2)], bloomFilter: @[], ) @@ -65,9 +69,9 @@ suite "Core Operations": # Process out of order discard rm.unwrapReceivedMessage(serialized2.get()) - let timestamp1 = rm.lamportTimestamp + let timestamp1 = rm.channels[testChannel].lamportTimestamp discard rm.unwrapReceivedMessage(serialized1.get()) - let timestamp2 = rm.lamportTimestamp + let timestamp2 = rm.channels[testChannel].lamportTimestamp check timestamp2 > timestamp1 @@ -76,9 +80,10 @@ suite "Reliability Mechanisms": var rm: ReliabilityManager setup: - let rmResult = newReliabilityManager(some("testChannel")) + let rmResult = newReliabilityManager() check rmResult.isOk() rm = rmResult.get() + check rm.ensureChannel(testChannel).isOk() teardown: if not rm.isNil: @@ -90,11 +95,11 @@ suite "Reliability Mechanisms": var missingDepsCount = 0 rm.setCallbacks( - proc(messageId: SdsMessageID) {.gcsafe.} = + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = messageReadyCount += 1, - proc(messageId: SdsMessageID) {.gcsafe.} = + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = missingDepsCount += 1, ) @@ -108,7 +113,7 @@ suite "Reliability Mechanisms": messageId: id2, lamportTimestamp: 2, causalHistory: @[id1], # msg2 depends on msg1 - channelId: some("testChannel"), + channelId: testChannel, content: @[byte(2)], bloomFilter: @[], ) @@ -117,7 +122,7 @@ suite "Reliability Mechanisms": messageId: id3, lamportTimestamp: 3, causalHistory: @[id1, id2], # msg3 depends on both msg1 and msg2 - channelId: some("testChannel"), + channelId: testChannel, content: @[byte(3)], bloomFilter: @[], ) @@ -131,7 +136,7 @@ suite "Reliability Mechanisms": # First try processing msg3 (which depends on msg2 which depends on msg1) let unwrapResult3 = rm.unwrapReceivedMessage(serialized3.get()) check unwrapResult3.isOk() - let (_, missingDeps3) = unwrapResult3.get() + let (_, missingDeps3, _) = unwrapResult3.get() check: missingDepsCount == 1 # Should trigger missing deps callback @@ -142,7 +147,7 @@ suite "Reliability Mechanisms": # Then try processing msg2 (which only depends on msg1) let unwrapResult2 = rm.unwrapReceivedMessage(serialized2.get()) check unwrapResult2.isOk() - let (_, missingDeps2) = unwrapResult2.get() + let (_, missingDeps2, _) = unwrapResult2.get() check: missingDepsCount == 2 # Should have triggered another missing deps callback @@ -151,10 +156,10 @@ suite "Reliability Mechanisms": messageReadyCount == 0 # No messages should be ready yet # Mark first dependency (msg1) as met - let markResult1 = rm.markDependenciesMet(@[id1]) + let markResult1 = rm.markDependenciesMet(@[id1], testChannel) check markResult1.isOk() - let incomingBuffer = rm.getIncomingBuffer() + let incomingBuffer = rm.getIncomingBuffer(testChannel) check: incomingBuffer.len == 0 @@ -167,26 +172,26 @@ suite "Reliability Mechanisms": var missingDepsCount = 0 rm.setCallbacks( - proc(messageId: SdsMessageID) {.gcsafe.} = + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = messageReadyCount += 1, - proc(messageId: SdsMessageID) {.gcsafe.} = + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = missingDepsCount += 1, ) # Send our message let msg1 = @[byte(1)] let id1 = "msg1" - let wrap1 = rm.wrapOutgoingMessage(msg1, id1) + let wrap1 = rm.wrapOutgoingMessage(msg1, id1, testChannel) check wrap1.isOk() # Create a message that has our message in causal history let msg2 = SdsMessage( messageId: "msg2", - lamportTimestamp: rm.lamportTimestamp + 1, + lamportTimestamp: rm.channels[testChannel].lamportTimestamp + 1, causalHistory: @[id1], # Include our message in causal history - channelId: some("testChannel"), + channelId: testChannel, content: @[byte(2)], bloomFilter: @[] # Test with an empty bloom filter , @@ -207,18 +212,18 @@ suite "Reliability Mechanisms": var messageSentCount = 0 rm.setCallbacks( - proc(messageId: SdsMessageID) {.gcsafe.} = + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = discard, - proc(messageId: SdsMessageID) {.gcsafe.} = + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = discard, ) # Send our message let msg1 = @[byte(1)] let id1 = "msg1" - let wrap1 = rm.wrapOutgoingMessage(msg1, id1) + let wrap1 = rm.wrapOutgoingMessage(msg1, id1, testChannel) check wrap1.isOk() # Create a message with bloom filter containing our message @@ -231,9 +236,9 @@ suite "Reliability Mechanisms": let msg2 = SdsMessage( messageId: "msg2", - lamportTimestamp: rm.lamportTimestamp + 1, + lamportTimestamp: rm.channels[testChannel].lamportTimestamp + 1, causalHistory: @[], # Empty causal history as we're using bloom filter - channelId: some("testChannel"), + channelId: testChannel, content: @[byte(2)], bloomFilter: bfResult.get(), ) @@ -251,9 +256,10 @@ suite "Periodic Tasks & Buffer Management": var rm: ReliabilityManager setup: - let rmResult = newReliabilityManager(some("testChannel")) + let rmResult = newReliabilityManager() check rmResult.isOk() rm = rmResult.get() + check rm.ensureChannel(testChannel).isOk() teardown: if not rm.isNil: @@ -263,11 +269,11 @@ suite "Periodic Tasks & Buffer Management": var messageSentCount = 0 rm.setCallbacks( - proc(messageId: SdsMessageID) {.gcsafe.} = + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = discard, - proc(messageId: SdsMessageID) {.gcsafe.} = + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = discard, ) @@ -275,18 +281,18 @@ suite "Periodic Tasks & Buffer Management": for i in 0 .. 5: let msg = @[byte(i)] let id = "msg" & $i - let wrap = rm.wrapOutgoingMessage(msg, id) + let wrap = rm.wrapOutgoingMessage(msg, id, testChannel) check wrap.isOk() - let outBuffer = rm.getOutgoingBuffer() + let outBuffer = rm.getOutgoingBuffer(testChannel) check outBuffer.len == 6 # Create message that acknowledges some messages let ackMsg = SdsMessage( messageId: "ack1", - lamportTimestamp: rm.lamportTimestamp + 1, + lamportTimestamp: rm.channels[testChannel].lamportTimestamp + 1, causalHistory: @["msg0", "msg2", "msg4"], - channelId: some("testChannel"), + channelId: testChannel, content: @[byte(100)], bloomFilter: @[], ) @@ -297,7 +303,7 @@ suite "Periodic Tasks & Buffer Management": # Process the acknowledgment discard rm.unwrapReceivedMessage(serializedAck.get()) - let finalBuffer = rm.getOutgoingBuffer() + let finalBuffer = rm.getOutgoingBuffer(testChannel) check: finalBuffer.len == 3 # Should have removed acknowledged messages messageSentCount == 3 @@ -312,29 +318,30 @@ suite "Periodic Tasks & Buffer Management": config.bloomFilterCapacity = 2 # Small capacity for testing config.maxResendAttempts = 3 # Set a low number of max attempts - let rmResultP = newReliabilityManager(some("testChannel"), config) + let rmResultP = newReliabilityManager(config) check rmResultP.isOk() let rm = rmResultP.get() + check rm.ensureChannel(testChannel).isOk() rm.setCallbacks( - proc(messageId: SdsMessageID) {.gcsafe.} = + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = discard, - proc(messageId: SdsMessageID) {.gcsafe.} = + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = discard, ) # First message - should be cleaned from bloom filter later let msg1 = @[byte(1)] let id1 = "msg1" - let wrap1 = rm.wrapOutgoingMessage(msg1, id1) + let wrap1 = rm.wrapOutgoingMessage(msg1, id1, testChannel) check wrap1.isOk() - let initialBuffer = rm.getOutgoingBuffer() + let initialBuffer = rm.getOutgoingBuffer(testChannel) check: initialBuffer[0].resendAttempts == 0 - rm.bloomFilter.contains(id1) + rm.channels[testChannel].bloomFilter.contains(id1) rm.startPeriodicTasks() @@ -344,33 +351,33 @@ suite "Periodic Tasks & Buffer Management": # Add new messages let msg2 = @[byte(2)] let id2 = "msg2" - let wrap2 = rm.wrapOutgoingMessage(msg2, id2) + let wrap2 = rm.wrapOutgoingMessage(msg2, id2, testChannel) check wrap2.isOk() let msg3 = @[byte(3)] let id3 = "msg3" - let wrap3 = rm.wrapOutgoingMessage(msg3, id3) + let wrap3 = rm.wrapOutgoingMessage(msg3, id3, testChannel) check wrap3.isOk() - let finalBuffer = rm.getOutgoingBuffer() + let finalBuffer = rm.getOutgoingBuffer(testChannel) check: finalBuffer.len == 2 # Only msg2 and msg3 should be in buffer, msg1 should be removed after max retries finalBuffer[0].message.messageId == id2 # Verify it's the second message finalBuffer[0].resendAttempts == 0 # New message should have 0 attempts - not rm.bloomFilter.contains(id1) # Bloom filter cleaning check - rm.bloomFilter.contains(id3) # New message still in filter + not rm.channels[testChannel].bloomFilter.contains(id1) # Bloom filter cleaning check + rm.channels[testChannel].bloomFilter.contains(id3) # New message still in filter rm.cleanup() test "periodic sync callback": var syncCallCount = 0 rm.setCallbacks( - proc(messageId: SdsMessageID) {.gcsafe.} = + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = discard, - proc(messageId: SdsMessageID) {.gcsafe.} = + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = discard, - proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = discard, proc() {.gcsafe.} = syncCallCount += 1, @@ -387,9 +394,10 @@ suite "Special Cases Handling": var rm: ReliabilityManager setup: - let rmResult = newReliabilityManager(some("testChannel")) + let rmResult = newReliabilityManager() check rmResult.isOk() rm = rmResult.get() + check rm.ensureChannel(testChannel).isOk() teardown: if not rm.isNil: @@ -400,10 +408,10 @@ suite "Special Cases Handling": for i in 0 .. rm.config.maxMessageHistory + 5: let msg = @[byte(i)] let id = "msg" & $i - let wrap = rm.wrapOutgoingMessage(msg, id) + let wrap = rm.wrapOutgoingMessage(msg, id, testChannel) check wrap.isOk() - let history = rm.getMessageHistory() + let history = rm.getMessageHistory(testChannel) check: history.len <= rm.config.maxMessageHistory history[^1] == "msg" & $(rm.config.maxMessageHistory + 5) @@ -413,7 +421,7 @@ suite "Special Cases Handling": messageId: "invalid-bf", lamportTimestamp: 1, causalHistory: @[], - channelId: some("testChannel"), + channelId: testChannel, content: @[byte(1)], bloomFilter: @[1.byte, 2.byte, 3.byte] # Invalid filter data , @@ -431,11 +439,11 @@ suite "Special Cases Handling": test "duplicate message handling": var messageReadyCount = 0 rm.setCallbacks( - proc(messageId: SdsMessageID) {.gcsafe.} = + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = messageReadyCount += 1, - proc(messageId: SdsMessageID) {.gcsafe.} = + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = discard, - proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = discard, ) @@ -444,7 +452,7 @@ suite "Special Cases Handling": messageId: "dup-msg", lamportTimestamp: 1, causalHistory: @[], - channelId: some("testChannel"), + channelId: testChannel, content: @[byte(1)], bloomFilter: @[], ) @@ -464,33 +472,201 @@ suite "Special Cases Handling": test "error handling": # Empty message let emptyMsg: seq[byte] = @[] - let emptyResult = rm.wrapOutgoingMessage(emptyMsg, "empty") + let emptyResult = rm.wrapOutgoingMessage(emptyMsg, "empty", testChannel) check: not emptyResult.isOk() emptyResult.error == reInvalidArgument # Oversized message let largeMsg = newSeq[byte](MaxMessageSize + 1) - let largeResult = rm.wrapOutgoingMessage(largeMsg, "large") + let largeResult = rm.wrapOutgoingMessage(largeMsg, "large", testChannel) check: not largeResult.isOk() largeResult.error == reMessageTooLarge suite "cleanup": test "cleanup works correctly": - let rmResult = newReliabilityManager(some("testChannel")) + let rmResult = newReliabilityManager() check rmResult.isOk() let rm = rmResult.get() + check rm.ensureChannel(testChannel).isOk() # Add some messages let msg = @[byte(1), 2, 3] let msgId = "test-msg-1" - discard rm.wrapOutgoingMessage(msg, msgId) + discard rm.wrapOutgoingMessage(msg, msgId, testChannel) rm.cleanup() - let outBuffer = rm.getOutgoingBuffer() - let history = rm.getMessageHistory() + let outBuffer = rm.getOutgoingBuffer(testChannel) + let history = rm.getMessageHistory(testChannel) check: outBuffer.len == 0 history.len == 0 + +suite "Multi-Channel ReliabilityManager Tests": + var rm: ReliabilityManager + + setup: + let rmResult = newReliabilityManager() + check rmResult.isOk() + rm = rmResult.get() + + teardown: + if not rm.isNil: + rm.cleanup() + + test "can create multi-channel manager without channel ID": + check rm.channels.len == 0 + + test "channel management": + let channel1 = "channel1" + let channel2 = "channel2" + + # Ensure channels + check rm.ensureChannel(channel1).isOk() + check rm.ensureChannel(channel2).isOk() + check rm.channels.len == 2 + + # Remove channel + check rm.removeChannel(channel1).isOk() + check rm.channels.len == 1 + check channel1 notin rm.channels + check channel2 in rm.channels + + test "stateless message unwrapping with channel extraction": + let channel1 = "test-channel-1" + let channel2 = "test-channel-2" + + # Create and wrap messages for different channels + let msg1 = @[byte(1), 2, 3] + let msgId1 = "msg1" + let wrapped1 = rm.wrapOutgoingMessage(msg1, msgId1, channel1) + check wrapped1.isOk() + + let msg2 = @[byte(4), 5, 6] + let msgId2 = "msg2" + let wrapped2 = rm.wrapOutgoingMessage(msg2, msgId2, channel2) + check wrapped2.isOk() + + # Unwrap messages - should extract channel ID and route correctly + let unwrap1 = rm.unwrapReceivedMessage(wrapped1.get()) + check unwrap1.isOk() + let (content1, deps1, extractedChannel1) = unwrap1.get() + check: + content1 == msg1 + deps1.len == 0 + extractedChannel1 == channel1 + + let unwrap2 = rm.unwrapReceivedMessage(wrapped2.get()) + check unwrap2.isOk() + let (content2, deps2, extractedChannel2) = unwrap2.get() + check: + content2 == msg2 + deps2.len == 0 + extractedChannel2 == channel2 + + test "channel isolation": + let channel1 = "isolated-channel-1" + let channel2 = "isolated-channel-2" + + # Add messages to different channels + let msg1 = @[byte(1)] + let msgId1 = "isolated-msg1" + discard rm.wrapOutgoingMessage(msg1, msgId1, channel1) + + let msg2 = @[byte(2)] + let msgId2 = "isolated-msg2" + discard rm.wrapOutgoingMessage(msg2, msgId2, channel2) + + # Check channel-specific data is isolated + let history1 = rm.getMessageHistory(channel1) + let history2 = rm.getMessageHistory(channel2) + + check: + history1.len == 1 + history2.len == 1 + msgId1 in history1 + msgId2 in history2 + msgId1 notin history2 + msgId2 notin history1 + + test "multi-channel callbacks": + var readyMessageCount = 0 + var sentMessageCount = 0 + var missingDepsCount = 0 + + rm.setCallbacks( + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = + readyMessageCount += 1, + proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = + sentMessageCount += 1, + proc(messageId: SdsMessageID, deps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = + missingDepsCount += 1 + ) + + let channel1 = "callback-channel-1" + let channel2 = "callback-channel-2" + + # Send messages from both channels + let msg1 = @[byte(1)] + let msgId1 = "callback-msg1" + let wrapped1 = rm.wrapOutgoingMessage(msg1, msgId1, channel1) + check wrapped1.isOk() + + let msg2 = @[byte(2)] + let msgId2 = "callback-msg2" + let wrapped2 = rm.wrapOutgoingMessage(msg2, msgId2, channel2) + check wrapped2.isOk() + + # Create acknowledgment messages that include our message IDs in causal history + # to trigger sent callbacks + let ackMsg1 = SdsMessage( + messageId: "ack1", + lamportTimestamp: rm.channels[channel1].lamportTimestamp + 1, + causalHistory: @[msgId1], # Acknowledge msg1 + channelId: channel1, + content: @[byte(100)], + bloomFilter: @[], + ) + + let ackMsg2 = SdsMessage( + messageId: "ack2", + lamportTimestamp: rm.channels[channel2].lamportTimestamp + 1, + causalHistory: @[msgId2], # Acknowledge msg2 + channelId: channel2, + content: @[byte(101)], + bloomFilter: @[], + ) + + let serializedAck1 = serializeMessage(ackMsg1) + let serializedAck2 = serializeMessage(ackMsg2) + check: + serializedAck1.isOk() + serializedAck2.isOk() + + # Process acknowledgment messages - should trigger callbacks + discard rm.unwrapReceivedMessage(serializedAck1.get()) + discard rm.unwrapReceivedMessage(serializedAck2.get()) + + check: + readyMessageCount == 2 # Both ack messages should trigger ready callbacks + sentMessageCount == 2 # Both original messages should be marked as sent + missingDepsCount == 0 # No missing dependencies + + test "channel-specific dependency management": + let channel1 = "dep-channel-1" + let channel2 = "dep-channel-2" + let depIds = @["dep1", "dep2", "dep3"] + + # Ensure both channels exist first + check rm.ensureChannel(channel1).isOk() + check rm.ensureChannel(channel2).isOk() + + # Mark dependencies as met for specific channel + check rm.markDependenciesMet(depIds, channel1).isOk() + + # Dependencies should only affect the specified channel + # Dependencies in channel1 should not affect channel2 + check rm.channels[channel1].bloomFilter.contains("dep1") + check not rm.channels[channel2].bloomFilter.contains("dep1")