From 8d61f5972fdec4e83af9641145e701796f393b1c Mon Sep 17 00:00:00 2001 From: shash256 <111925100+shash256@users.noreply.github.com> Date: Tue, 6 Jan 2026 02:28:49 +0700 Subject: [PATCH] chore: address review comments --- library/libsds.nim | 7 +++- .../requests/sds_message_request.nim | 2 +- src/protobuf.nim | 5 +-- src/reliability_utils.nim | 38 ++++++++----------- tests/test_reliability.nim | 24 ++++++++++-- 5 files changed, 45 insertions(+), 31 deletions(-) diff --git a/library/libsds.nim b/library/libsds.nim index cf7793a..bd77db2 100644 --- a/library/libsds.nim +++ b/library/libsds.nim @@ -126,9 +126,12 @@ proc onRetrievalHint(ctx: ptr SdsContext): RetrievalHintProvider = ) if not isNil(hint) and hintLen > 0: - result = newSeq[byte](hintLen) - copyMem(addr result[0], hint, hintLen) + var hintBytes = newSeq[byte](hintLen) + copyMem(addr hintBytes[0], hint, hintLen) deallocShared(hint) + return hintBytes + + return @[] ### End of not-exported components ################################################################################ diff --git a/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim index 3c4353a..d42ee5a 100644 --- a/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim +++ b/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim @@ -65,7 +65,7 @@ proc process*( let (unwrappedMessage, missingDeps, extractedChannelId) = unwrapReceivedMessage(rm[], messageBytes).valueOr: return err("error processing UNWRAP_MESSAGE request: " & $error) - let res = SdsUnwrapResponse(message: unwrappedMessage, missingDeps: missingDeps.getMessageIds()) + let res = SdsUnwrapResponse(message: unwrappedMessage, missingDeps: missingDeps, channelId: extractedChannelId) # return the result as a json string var node = newJObject() diff --git a/src/protobuf.nim b/src/protobuf.nim index 77a551a..33eba0d 100644 --- a/src/protobuf.nim +++ b/src/protobuf.nim @@ -1,5 +1,4 @@ import libp2p/protobuf/minprotobuf -import std/options import endians import ../src/[message, protobufutil, bloom, reliability_utils] @@ -38,7 +37,7 @@ proc decode*(T: type SdsMessage, buffer: seq[byte]): ProtobufResult[T] = # Handle both old and new causal history formats var historyBuffers: seq[seq[byte]] - if pb.getRepeatedField(3, historyBuffers).isOk: + if pb.getRepeatedField(3, historyBuffers).isOk(): # New format: repeated HistoryEntry for histBuffer in historyBuffers: let entryPb = initProtoBuffer(histBuffer) @@ -52,7 +51,7 @@ proc decode*(T: type SdsMessage, buffer: seq[byte]): ProtobufResult[T] = # Try old format: repeated string var causalHistory: seq[SdsMessageID] let histResult = pb.getRepeatedField(3, causalHistory) - if histResult.isOk: + if histResult.isOk(): msg.causalHistory = toCausalHistory(causalHistory) if not ?pb.getField(4, msg.channelId): diff --git a/src/reliability_utils.nim b/src/reliability_utils.nim index 90055bc..0f12d24 100644 --- a/src/reliability_utils.nim +++ b/src/reliability_utils.nim @@ -135,19 +135,23 @@ proc newHistoryEntry*(messageId: SdsMessageID, retrievalHint: string): HistoryEn proc toCausalHistory*(messageIds: seq[SdsMessageID]): seq[HistoryEntry] = ## Converts a sequence of message IDs to HistoryEntry sequence - result = newSeq[HistoryEntry](messageIds.len) + var entries = newSeq[HistoryEntry](messageIds.len) for i, msgId in messageIds: - result[i] = newHistoryEntry(msgId) + entries[i] = newHistoryEntry(msgId) + return entries proc getMessageIds*(causalHistory: seq[HistoryEntry]): seq[SdsMessageID] = ## Extracts message IDs from HistoryEntry sequence - result = newSeq[SdsMessageID](causalHistory.len) + var messageIds = newSeq[SdsMessageID](causalHistory.len) for i, entry in causalHistory: - result[i] = entry.messageId + messageIds[i] = entry.messageId + return messageIds proc getRecentHistoryEntries*( rm: ReliabilityManager, n: int, channelId: SdsChannelID ): seq[HistoryEntry] = + ## Get recent history entries for sending in causal history. + ## Populates retrieval hints for our own messages using the provider callback. try: if channelId in rm.channels: let channel = rm.channels[channelId] @@ -155,42 +159,32 @@ proc getRecentHistoryEntries*( if rm.onRetrievalHint.isNil(): return toCausalHistory(recentMessageIds) else: + var entries: seq[HistoryEntry] = @[] for msgId in recentMessageIds: let hint = rm.onRetrievalHint(msgId) - result.add(newHistoryEntry(msgId, hint)) + entries.add(newHistoryEntry(msgId, hint)) + return entries else: - result = @[] + return @[] except Exception: error "Failed to get recent history entries", channelId = channelId, n = n, error = getCurrentExceptionMsg() - result = @[] + return @[] proc checkDependencies*( rm: ReliabilityManager, deps: seq[HistoryEntry], channelId: SdsChannelID ): seq[HistoryEntry] = + ## Check which dependencies are missing from our message history. var missingDeps: seq[HistoryEntry] = @[] try: if channelId in rm.channels: let channel = rm.channels[channelId] for dep in deps: if dep.messageId notin channel.messageHistory: - # If we have a retrieval hint provider and the original dep has no hint, get one - if not rm.onRetrievalHint.isNil() and dep.retrievalHint.len == 0: - let hint = rm.onRetrievalHint(dep.messageId) - missingDeps.add(newHistoryEntry(dep.messageId, hint)) - else: - missingDeps.add(dep) + missingDeps.add(dep) else: # Channel doesn't exist, all deps are missing - if not rm.onRetrievalHint.isNil(): - for dep in deps: - if dep.retrievalHint.len == 0: - let hint = rm.onRetrievalHint(dep.messageId) - missingDeps.add(newHistoryEntry(dep.messageId, hint)) - else: - missingDeps.add(dep) - else: - missingDeps = deps + missingDeps = deps except Exception: error "Failed to check dependencies", channelId = channelId, error = getCurrentExceptionMsg() diff --git a/tests/test_reliability.nim b/tests/test_reliability.nim index e9d44dc..bd9b21c 100644 --- a/tests/test_reliability.nim +++ b/tests/test_reliability.nim @@ -286,7 +286,7 @@ suite "Reliability Mechanisms": check unwrappedMsg2.causalHistory[0].messageId == id1 check unwrappedMsg2.causalHistory[0].retrievalHint == cast[seq[byte]]("hint:" & id1) - # Create a message with a missing dependency + # Create a message with a missing dependency (no retrieval hint) let msg3 = SdsMessage( messageId: "msg3", lamportTimestamp: 3, @@ -301,8 +301,26 @@ suite "Reliability Mechanisms": let (_, missingDeps3, _) = unwrapResult3.get() check missingDeps3.len == 1 check missingDeps3[0].messageId == "missing-dep" - # The hint should be populated by the retrieval hint provider for missing dependencies - check missingDeps3[0].retrievalHint == cast[seq[byte]]("hint:missing-dep") + # The hint is empty because it was not provided by the remote sender + check missingDeps3[0].retrievalHint.len == 0 + + # Test with a message that HAS a retrieval hint from remote + let msg4 = SdsMessage( + messageId: "msg4", + lamportTimestamp: 4, + causalHistory: @[newHistoryEntry("another-missing", cast[seq[byte]]("remote-hint"))], + channelId: testChannel, + content: @[byte(4)], + bloomFilter: @[], + ) + let serialized4 = serializeMessage(msg4).get() + let unwrapResult4 = rm.unwrapReceivedMessage(serialized4) + check unwrapResult4.isOk() + let (_, missingDeps4, _) = unwrapResult4.get() + check missingDeps4.len == 1 + check missingDeps4[0].messageId == "another-missing" + # The hint should be preserved from the remote sender + check missingDeps4[0].retrievalHint == cast[seq[byte]]("remote-hint") # Periodic task & Buffer management tests suite "Periodic Tasks & Buffer Management":