diff --git a/.DS_Store b/.DS_Store index f79e090..31fc1d5 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/src/protobuf.nim b/src/protobuf.nim index 794cbf4..b674511 100644 --- a/src/protobuf.nim +++ b/src/protobuf.nim @@ -3,13 +3,23 @@ import ./common import libp2p/protobuf/minprotobuf import std/options +proc toString(bytes: seq[byte]): string = + result = newString(bytes.len) + copyMem(result[0].addr, bytes[0].unsafeAddr, bytes.len) + +proc toBytes(s: string): seq[byte] = + result = newSeq[byte](s.len) + copyMem(result[0].addr, s[0].unsafeAddr, s.len) + proc encode*(msg: Message): ProtoBuffer = var pb = initProtoBuffer() pb.write(1, msg.messageId) pb.write(2, uint64(msg.lamportTimestamp)) + for hist in msg.causalHistory: - pb.write(3, hist) + pb.write(3, hist.toBytes) # Convert string to bytes for proper length handling + pb.write(4, msg.channelId) pb.write(5, msg.content) pb.finish() @@ -28,9 +38,12 @@ proc decode*(T: type Message, buffer: seq[byte]): ProtobufResult[T] = return err(ProtobufError.missingRequiredField("lamportTimestamp")) msg.lamportTimestamp = int64(timestamp) - var hist: string - while ?pb.getField(3, hist): - msg.causalHistory.add(hist) + # Decode causal history + var histories: seq[seq[byte]] + for histBytes in histories: + let hist = histBytes.toString + if hist notin msg.causalHistory: # Avoid duplicate entries + msg.causalHistory.add(hist) if not ?pb.getField(4, msg.channelId): return err(ProtobufError.missingRequiredField("channelId")) diff --git a/src/reliability.nim b/src/reliability.nim index fea7048..79c586e 100644 --- a/src/reliability.nim +++ b/src/reliability.nim @@ -67,6 +67,7 @@ proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte], messageId: withLock rm.lock: try: + rm.updateLamportTimestamp(getTime().toUnix) let msg = Message( messageId: messageId, lamportTimestamp: rm.lamportTimestamp, @@ -74,8 +75,9 @@ proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte], messageId: channelId: rm.channelId, content: message ) - rm.updateLamportTimestamp(getTime().toUnix) rm.outgoingBuffer.add(UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0)) + # rm.messageHistory.add(messageId) + # rm.bloomFilter.add(messageId) return serializeMessage(msg) except: return err(reInternalError) diff --git a/tests/test_reliability.nim b/tests/test_reliability.nim index d30eb0d..48e02d7 100644 --- a/tests/test_reliability.nim +++ b/tests/test_reliability.nim @@ -42,54 +42,37 @@ suite "ReliabilityManager": missingDeps.len == 0 test "markDependenciesMet": - info "test_state", state="starting markDependenciesMet test" + # First message + let msg1 = @[byte(1)] + let id1 = "msg1" + let wrap1 = rm.wrapOutgoingMessage(msg1, id1) + check wrap1.isOk() + let wrapped1 = wrap1.get() - block message1: - let msg1 = @[byte(1)] - let id1 = "msg1" - info "message_creation", msg="message 1", id=id1 - let wrap1 = rm.wrapOutgoingMessage(msg1, id1) - check wrap1.isOk() - let wrapped1 = wrap1.get() + # Second message + let msg2 = @[byte(2)] + let id2 = "msg2" + let wrap2 = rm.wrapOutgoingMessage(msg2, id2) + check wrap2.isOk() + let wrapped2 = wrap2.get() - info "message_processing", msg="message 1", id=id1 - let unwrap1 = rm.unwrapReceivedMessage(wrapped1) - check unwrap1.isOk() - let (content1, deps1) = unwrap1.get() - info "message_processed", msg="message 1", deps_count=deps1.len - check content1 == msg1 + # Third message + let msg3 = @[byte(3)] + let id3 = "msg3" + let wrap3 = rm.wrapOutgoingMessage(msg3, id3) + check wrap3.isOk() + let wrapped3 = wrap3.get() - block message2: - let msg2 = @[byte(2)] - let id2 = "msg2" - info "message_creation", msg="message 2", id=id2 - let wrap2 = rm.wrapOutgoingMessage(msg2, id2) - check wrap2.isOk() - let wrapped2 = wrap2.get() + # Check dependencies + var unwrap3 = rm.unwrapReceivedMessage(wrapped3) + check unwrap3.isOk() + var (_, missing3) = unwrap3.get() - info "message_processing", msg="message 2", id=id2 - let unwrap2 = rm.unwrapReceivedMessage(wrapped2) - check unwrap2.isOk() - let (content2, deps2) = unwrap2.get() - info "message_processed", msg="message 2", deps_count=deps2.len - check content2 == msg2 + # Mark dependencies as met + let markResult = rm.markDependenciesMet(@[id1, id2]) + check markResult.isOk() - block message3: - info "message_creation", msg="message 3" - let msg3 = @[byte(3)] - let id3 = "msg3" - let wrap3 = rm.wrapOutgoingMessage(msg3, id3) - check wrap3.isOk() - info "message_wrapped", msg="message 3", id=id3 - let wrapped3 = wrap3.get() - - info "checking_dependencies", msg="message 3", id=id3 - var unwrap3 = rm.unwrapReceivedMessage(wrapped3) - check unwrap3.isOk() - var (content3, missing3) = unwrap3.get() - info "dependencies_checked", msg="message 3", missing_deps=missing3.len - - info "test_state", state="completed" + check missing3.len == 0 test "callbacks work correctly": var messageReadyCount = 0