fix: protobuf causal history

This commit is contained in:
shash256 2024-12-09 17:32:50 +04:00
parent db190d914e
commit d92d2f733a
4 changed files with 46 additions and 48 deletions

BIN
.DS_Store vendored

Binary file not shown.

View File

@ -3,13 +3,23 @@ import ./common
import libp2p/protobuf/minprotobuf import libp2p/protobuf/minprotobuf
import std/options import std/options
proc toString(bytes: seq[byte]): string =
result = newString(bytes.len)
copyMem(result[0].addr, bytes[0].unsafeAddr, bytes.len)
proc toBytes(s: string): seq[byte] =
result = newSeq[byte](s.len)
copyMem(result[0].addr, s[0].unsafeAddr, s.len)
proc encode*(msg: Message): ProtoBuffer = proc encode*(msg: Message): ProtoBuffer =
var pb = initProtoBuffer() var pb = initProtoBuffer()
pb.write(1, msg.messageId) pb.write(1, msg.messageId)
pb.write(2, uint64(msg.lamportTimestamp)) pb.write(2, uint64(msg.lamportTimestamp))
for hist in msg.causalHistory: for hist in msg.causalHistory:
pb.write(3, hist) pb.write(3, hist.toBytes) # Convert string to bytes for proper length handling
pb.write(4, msg.channelId) pb.write(4, msg.channelId)
pb.write(5, msg.content) pb.write(5, msg.content)
pb.finish() pb.finish()
@ -28,9 +38,12 @@ proc decode*(T: type Message, buffer: seq[byte]): ProtobufResult[T] =
return err(ProtobufError.missingRequiredField("lamportTimestamp")) return err(ProtobufError.missingRequiredField("lamportTimestamp"))
msg.lamportTimestamp = int64(timestamp) msg.lamportTimestamp = int64(timestamp)
var hist: string # Decode causal history
while ?pb.getField(3, hist): var histories: seq[seq[byte]]
msg.causalHistory.add(hist) for histBytes in histories:
let hist = histBytes.toString
if hist notin msg.causalHistory: # Avoid duplicate entries
msg.causalHistory.add(hist)
if not ?pb.getField(4, msg.channelId): if not ?pb.getField(4, msg.channelId):
return err(ProtobufError.missingRequiredField("channelId")) return err(ProtobufError.missingRequiredField("channelId"))

View File

@ -67,6 +67,7 @@ proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte], messageId:
withLock rm.lock: withLock rm.lock:
try: try:
rm.updateLamportTimestamp(getTime().toUnix)
let msg = Message( let msg = Message(
messageId: messageId, messageId: messageId,
lamportTimestamp: rm.lamportTimestamp, lamportTimestamp: rm.lamportTimestamp,
@ -74,8 +75,9 @@ proc wrapOutgoingMessage*(rm: ReliabilityManager, message: seq[byte], messageId:
channelId: rm.channelId, channelId: rm.channelId,
content: message content: message
) )
rm.updateLamportTimestamp(getTime().toUnix)
rm.outgoingBuffer.add(UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0)) rm.outgoingBuffer.add(UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0))
# rm.messageHistory.add(messageId)
# rm.bloomFilter.add(messageId)
return serializeMessage(msg) return serializeMessage(msg)
except: except:
return err(reInternalError) return err(reInternalError)

View File

@ -42,54 +42,37 @@ suite "ReliabilityManager":
missingDeps.len == 0 missingDeps.len == 0
test "markDependenciesMet": test "markDependenciesMet":
info "test_state", state="starting markDependenciesMet test" # First message
let msg1 = @[byte(1)]
let id1 = "msg1"
let wrap1 = rm.wrapOutgoingMessage(msg1, id1)
check wrap1.isOk()
let wrapped1 = wrap1.get()
block message1: # Second message
let msg1 = @[byte(1)] let msg2 = @[byte(2)]
let id1 = "msg1" let id2 = "msg2"
info "message_creation", msg="message 1", id=id1 let wrap2 = rm.wrapOutgoingMessage(msg2, id2)
let wrap1 = rm.wrapOutgoingMessage(msg1, id1) check wrap2.isOk()
check wrap1.isOk() let wrapped2 = wrap2.get()
let wrapped1 = wrap1.get()
info "message_processing", msg="message 1", id=id1 # Third message
let unwrap1 = rm.unwrapReceivedMessage(wrapped1) let msg3 = @[byte(3)]
check unwrap1.isOk() let id3 = "msg3"
let (content1, deps1) = unwrap1.get() let wrap3 = rm.wrapOutgoingMessage(msg3, id3)
info "message_processed", msg="message 1", deps_count=deps1.len check wrap3.isOk()
check content1 == msg1 let wrapped3 = wrap3.get()
block message2: # Check dependencies
let msg2 = @[byte(2)] var unwrap3 = rm.unwrapReceivedMessage(wrapped3)
let id2 = "msg2" check unwrap3.isOk()
info "message_creation", msg="message 2", id=id2 var (_, missing3) = unwrap3.get()
let wrap2 = rm.wrapOutgoingMessage(msg2, id2)
check wrap2.isOk()
let wrapped2 = wrap2.get()
info "message_processing", msg="message 2", id=id2 # Mark dependencies as met
let unwrap2 = rm.unwrapReceivedMessage(wrapped2) let markResult = rm.markDependenciesMet(@[id1, id2])
check unwrap2.isOk() check markResult.isOk()
let (content2, deps2) = unwrap2.get()
info "message_processed", msg="message 2", deps_count=deps2.len
check content2 == msg2
block message3: check missing3.len == 0
info "message_creation", msg="message 3"
let msg3 = @[byte(3)]
let id3 = "msg3"
let wrap3 = rm.wrapOutgoingMessage(msg3, id3)
check wrap3.isOk()
info "message_wrapped", msg="message 3", id=id3
let wrapped3 = wrap3.get()
info "checking_dependencies", msg="message 3", id=id3
var unwrap3 = rm.unwrapReceivedMessage(wrapped3)
check unwrap3.isOk()
var (content3, missing3) = unwrap3.get()
info "dependencies_checked", msg="message 3", missing_deps=missing3.len
info "test_state", state="completed"
test "callbacks work correctly": test "callbacks work correctly":
var messageReadyCount = 0 var messageReadyCount = 0