diff --git a/src/message.nim b/src/message.nim index f9c68c0..4f6640c 100644 --- a/src/message.nim +++ b/src/message.nim @@ -1,8 +1,8 @@ import std/[times, options, sets] type - SdsMessageID* = seq[byte] - SdsChannelID* = seq[byte] + SdsMessageID* = string + SdsChannelID* = string SdsMessage* = object messageId*: SdsMessageID diff --git a/src/protobuf.nim b/src/protobuf.nim index 4689da2..6a147c8 100644 --- a/src/protobuf.nim +++ b/src/protobuf.nim @@ -32,12 +32,12 @@ proc decode*(T: type SdsMessage, buffer: seq[byte]): ProtobufResult[T] = return err(ProtobufError.missingRequiredField("lamportTimestamp")) msg.lamportTimestamp = int64(timestamp) - var causalHistory: seq[seq[byte]] + var causalHistory: seq[SdsMessageID] let histResult = pb.getRepeatedField(3, causalHistory) if histResult.isOk: msg.causalHistory = causalHistory - var channelId: seq[byte] + var channelId: SdsChannelID if ?pb.getField(4, channelId): msg.channelId = some(channelId) else: diff --git a/src/reliability_utils.nim b/src/reliability_utils.nim index d9bf316..3cc23fa 100644 --- a/src/reliability_utils.nim +++ b/src/reliability_utils.nim @@ -12,6 +12,12 @@ type PeriodicSyncCallback* = proc() {.gcsafe, raises: [].} + AppCallbacks* = ref object + messageReadyCb*: MessageReadyCallback + messageSentCb*: MessageSentCallback + missingDependenciesCb*: MissingDependenciesCallback + periodicSyncCb*: PeriodicSyncCallback + ReliabilityConfig* = object bloomFilterCapacity*: int bloomFilterErrorRate*: float diff --git a/tests/test_reliability.nim b/tests/test_reliability.nim index 6f17aaa..a0b2135 100644 --- a/tests/test_reliability.nim +++ b/tests/test_reliability.nim @@ -1,4 +1,4 @@ -import unittest, results, chronos, std/times +import unittest, results, chronos, std/[times, options, tables] import ../src/[reliability, message, protobuf, reliability_utils, rolling_bloom_filter] # Core functionality tests @@ -6,7 +6,7 @@ suite "Core Operations": var rm: ReliabilityManager setup: - let rmResult = newReliabilityManager("testChannel") + let rmResult = newReliabilityManager(some("testChannel")) check rmResult.isOk() rm = rmResult.get() @@ -19,7 +19,6 @@ suite "Core Operations": check: config.bloomFilterCapacity == DefaultBloomFilterCapacity config.bloomFilterErrorRate == DefaultBloomFilterErrorRate - config.bloomFilterWindow == DefaultBloomFilterWindow config.maxMessageHistory == DefaultMaxMessageHistory test "basic message wrapping and unwrapping": @@ -40,20 +39,20 @@ suite "Core Operations": test "message ordering": # Create messages with different timestamps - let msg1 = Message( + let msg1 = SdsMessage( messageId: "msg1", lamportTimestamp: 1, causalHistory: @[], - channelId: "testChannel", + channelId: some("testChannel"), content: @[byte(1)], bloomFilter: @[], ) - let msg2 = Message( + let msg2 = SdsMessage( messageId: "msg2", lamportTimestamp: 5, causalHistory: @[], - channelId: "testChannel", + channelId: some("testChannel"), content: @[byte(2)], bloomFilter: @[], ) @@ -77,7 +76,7 @@ suite "Reliability Mechanisms": var rm: ReliabilityManager setup: - let rmResult = newReliabilityManager("testChannel") + let rmResult = newReliabilityManager(some("testChannel")) check rmResult.isOk() rm = rmResult.get() @@ -91,11 +90,11 @@ suite "Reliability Mechanisms": var missingDepsCount = 0 rm.setCallbacks( - proc(messageId: MessageID) {.gcsafe.} = + proc(messageId: SdsMessageID) {.gcsafe.} = messageReadyCount += 1, - proc(messageId: MessageID) {.gcsafe.} = + proc(messageId: SdsMessageID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = missingDepsCount += 1, ) @@ -105,20 +104,20 @@ suite "Reliability Mechanisms": let id3 = "msg3" # Create messages with dependencies - let msg2 = Message( + let msg2 = SdsMessage( messageId: id2, lamportTimestamp: 2, causalHistory: @[id1], # msg2 depends on msg1 - channelId: "testChannel", + channelId: some("testChannel"), content: @[byte(2)], bloomFilter: @[], ) - let msg3 = Message( + let msg3 = SdsMessage( messageId: id3, lamportTimestamp: 3, causalHistory: @[id1, id2], # msg3 depends on both msg1 and msg2 - channelId: "testChannel", + channelId: some("testChannel"), content: @[byte(3)], bloomFilter: @[], ) @@ -168,11 +167,11 @@ suite "Reliability Mechanisms": var missingDepsCount = 0 rm.setCallbacks( - proc(messageId: MessageID) {.gcsafe.} = + proc(messageId: SdsMessageID) {.gcsafe.} = messageReadyCount += 1, - proc(messageId: MessageID) {.gcsafe.} = + proc(messageId: SdsMessageID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = missingDepsCount += 1, ) @@ -183,11 +182,11 @@ suite "Reliability Mechanisms": check wrap1.isOk() # Create a message that has our message in causal history - let msg2 = Message( + let msg2 = SdsMessage( messageId: "msg2", lamportTimestamp: rm.lamportTimestamp + 1, causalHistory: @[id1], # Include our message in causal history - channelId: "testChannel", + channelId: some("testChannel"), content: @[byte(2)], bloomFilter: @[] # Test with an empty bloom filter , @@ -208,11 +207,11 @@ suite "Reliability Mechanisms": var messageSentCount = 0 rm.setCallbacks( - proc(messageId: MessageID) {.gcsafe.} = + proc(messageId: SdsMessageID) {.gcsafe.} = discard, - proc(messageId: MessageID) {.gcsafe.} = + proc(messageId: SdsMessageID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = discard, ) @@ -223,19 +222,18 @@ suite "Reliability Mechanisms": check wrap1.isOk() # Create a message with bloom filter containing our message - var otherPartyBloomFilter = newRollingBloomFilter( - DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate, DefaultBloomFilterWindow - ) + var otherPartyBloomFilter = + newRollingBloomFilter(DefaultBloomFilterCapacity, DefaultBloomFilterErrorRate) otherPartyBloomFilter.add(id1) let bfResult = serializeBloomFilter(otherPartyBloomFilter.filter) check bfResult.isOk() - let msg2 = Message( + let msg2 = SdsMessage( messageId: "msg2", lamportTimestamp: rm.lamportTimestamp + 1, causalHistory: @[], # Empty causal history as we're using bloom filter - channelId: "testChannel", + channelId: some("testChannel"), content: @[byte(2)], bloomFilter: bfResult.get(), ) @@ -253,7 +251,7 @@ suite "Periodic Tasks & Buffer Management": var rm: ReliabilityManager setup: - let rmResult = newReliabilityManager("testChannel") + let rmResult = newReliabilityManager(some("testChannel")) check rmResult.isOk() rm = rmResult.get() @@ -265,11 +263,11 @@ suite "Periodic Tasks & Buffer Management": var messageSentCount = 0 rm.setCallbacks( - proc(messageId: MessageID) {.gcsafe.} = + proc(messageId: SdsMessageID) {.gcsafe.} = discard, - proc(messageId: MessageID) {.gcsafe.} = + proc(messageId: SdsMessageID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = discard, ) @@ -284,11 +282,11 @@ suite "Periodic Tasks & Buffer Management": check outBuffer.len == 6 # Create message that acknowledges some messages - let ackMsg = Message( + let ackMsg = SdsMessage( messageId: "ack1", lamportTimestamp: rm.lamportTimestamp + 1, causalHistory: @["msg0", "msg2", "msg4"], - channelId: "testChannel", + channelId: some("testChannel"), content: @[byte(100)], bloomFilter: @[], ) @@ -311,19 +309,19 @@ suite "Periodic Tasks & Buffer Management": var config = defaultConfig() config.resendInterval = initDuration(milliseconds = 100) # Short for testing config.bufferSweepInterval = initDuration(milliseconds = 50) # Frequent sweeps - config.bloomFilterWindow = initDuration(milliseconds = 150) # Short window + config.bloomFilterCapacity = 2 # Small capacity for testing config.maxResendAttempts = 3 # Set a low number of max attempts - let rmResultP = newReliabilityManager("testChannel", config) + let rmResultP = newReliabilityManager(some("testChannel"), config) check rmResultP.isOk() let rm = rmResultP.get() rm.setCallbacks( - proc(messageId: MessageID) {.gcsafe.} = + proc(messageId: SdsMessageID) {.gcsafe.} = discard, - proc(messageId: MessageID) {.gcsafe.} = + proc(messageId: SdsMessageID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = discard, ) @@ -340,34 +338,39 @@ suite "Periodic Tasks & Buffer Management": rm.startPeriodicTasks() - # Wait long enough for bloom filter window to pass and first message to exceed max retries + # Wait long enough for bloom filter waitFor sleepAsync(chronos.milliseconds(500)) - # Add new message + # Add new messages let msg2 = @[byte(2)] let id2 = "msg2" let wrap2 = rm.wrapOutgoingMessage(msg2, id2) check wrap2.isOk() + let msg3 = @[byte(3)] + let id3 = "msg3" + let wrap3 = rm.wrapOutgoingMessage(msg3, id3) + check wrap3.isOk() + let finalBuffer = rm.getOutgoingBuffer() check: - finalBuffer.len == 1 - # Only msg2 should be in buffer, msg1 should be removed after max retries + finalBuffer.len == 2 + # Only msg2 and msg3 should be in buffer, msg1 should be removed after max retries finalBuffer[0].message.messageId == id2 # Verify it's the second message finalBuffer[0].resendAttempts == 0 # New message should have 0 attempts not rm.bloomFilter.contains(id1) # Bloom filter cleaning check - rm.bloomFilter.contains(id2) # New message still in filter + rm.bloomFilter.contains(id3) # New message still in filter rm.cleanup() test "periodic sync callback": var syncCallCount = 0 rm.setCallbacks( - proc(messageId: MessageID) {.gcsafe.} = + proc(messageId: SdsMessageID) {.gcsafe.} = discard, - proc(messageId: MessageID) {.gcsafe.} = + proc(messageId: SdsMessageID) {.gcsafe.} = discard, - proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = discard, proc() {.gcsafe.} = syncCallCount += 1, @@ -384,7 +387,7 @@ suite "Special Cases Handling": var rm: ReliabilityManager setup: - let rmResult = newReliabilityManager("testChannel") + let rmResult = newReliabilityManager(some("testChannel")) check rmResult.isOk() rm = rmResult.get() @@ -406,11 +409,11 @@ suite "Special Cases Handling": history[^1] == "msg" & $(rm.config.maxMessageHistory + 5) test "invalid bloom filter handling": - let msgInvalid = Message( + let msgInvalid = SdsMessage( messageId: "invalid-bf", lamportTimestamp: 1, causalHistory: @[], - channelId: "testChannel", + channelId: some("testChannel"), content: @[byte(1)], bloomFilter: @[1.byte, 2.byte, 3.byte] # Invalid filter data , @@ -428,20 +431,20 @@ suite "Special Cases Handling": test "duplicate message handling": var messageReadyCount = 0 rm.setCallbacks( - proc(messageId: MessageID) {.gcsafe.} = + proc(messageId: SdsMessageID) {.gcsafe.} = messageReadyCount += 1, - proc(messageId: MessageID) {.gcsafe.} = + proc(messageId: SdsMessageID) {.gcsafe.} = discard, - proc(messageId: MessageID, missingDeps: seq[MessageID]) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.} = discard, ) # Create and process a message - let msg = Message( + let msg = SdsMessage( messageId: "dup-msg", lamportTimestamp: 1, causalHistory: @[], - channelId: "testChannel", + channelId: some("testChannel"), content: @[byte(1)], bloomFilter: @[], ) @@ -475,7 +478,7 @@ suite "Special Cases Handling": suite "cleanup": test "cleanup works correctly": - let rmResult = newReliabilityManager("testChannel") + let rmResult = newReliabilityManager(some("testChannel")) check rmResult.isOk() let rm = rmResult.get()