mirror of
https://github.com/logos-messaging/nim-sds.git
synced 2026-01-27 02:13:09 +00:00
chore: address review comments
This commit is contained in:
parent
521b76fef1
commit
8d61f5972f
@ -126,9 +126,12 @@ proc onRetrievalHint(ctx: ptr SdsContext): RetrievalHintProvider =
|
||||
)
|
||||
|
||||
if not isNil(hint) and hintLen > 0:
|
||||
result = newSeq[byte](hintLen)
|
||||
copyMem(addr result[0], hint, hintLen)
|
||||
var hintBytes = newSeq[byte](hintLen)
|
||||
copyMem(addr hintBytes[0], hint, hintLen)
|
||||
deallocShared(hint)
|
||||
return hintBytes
|
||||
|
||||
return @[]
|
||||
|
||||
### End of not-exported components
|
||||
################################################################################
|
||||
|
||||
@ -65,7 +65,7 @@ proc process*(
|
||||
let (unwrappedMessage, missingDeps, extractedChannelId) = unwrapReceivedMessage(rm[], messageBytes).valueOr:
|
||||
return err("error processing UNWRAP_MESSAGE request: " & $error)
|
||||
|
||||
let res = SdsUnwrapResponse(message: unwrappedMessage, missingDeps: missingDeps.getMessageIds())
|
||||
let res = SdsUnwrapResponse(message: unwrappedMessage, missingDeps: missingDeps, channelId: extractedChannelId)
|
||||
|
||||
# return the result as a json string
|
||||
var node = newJObject()
|
||||
|
||||
@ -1,5 +1,4 @@
|
||||
import libp2p/protobuf/minprotobuf
|
||||
import std/options
|
||||
import endians
|
||||
import ../src/[message, protobufutil, bloom, reliability_utils]
|
||||
|
||||
@ -38,7 +37,7 @@ proc decode*(T: type SdsMessage, buffer: seq[byte]): ProtobufResult[T] =
|
||||
|
||||
# Handle both old and new causal history formats
|
||||
var historyBuffers: seq[seq[byte]]
|
||||
if pb.getRepeatedField(3, historyBuffers).isOk:
|
||||
if pb.getRepeatedField(3, historyBuffers).isOk():
|
||||
# New format: repeated HistoryEntry
|
||||
for histBuffer in historyBuffers:
|
||||
let entryPb = initProtoBuffer(histBuffer)
|
||||
@ -52,7 +51,7 @@ proc decode*(T: type SdsMessage, buffer: seq[byte]): ProtobufResult[T] =
|
||||
# Try old format: repeated string
|
||||
var causalHistory: seq[SdsMessageID]
|
||||
let histResult = pb.getRepeatedField(3, causalHistory)
|
||||
if histResult.isOk:
|
||||
if histResult.isOk():
|
||||
msg.causalHistory = toCausalHistory(causalHistory)
|
||||
|
||||
if not ?pb.getField(4, msg.channelId):
|
||||
|
||||
@ -135,19 +135,23 @@ proc newHistoryEntry*(messageId: SdsMessageID, retrievalHint: string): HistoryEn
|
||||
|
||||
proc toCausalHistory*(messageIds: seq[SdsMessageID]): seq[HistoryEntry] =
|
||||
## Converts a sequence of message IDs to HistoryEntry sequence
|
||||
result = newSeq[HistoryEntry](messageIds.len)
|
||||
var entries = newSeq[HistoryEntry](messageIds.len)
|
||||
for i, msgId in messageIds:
|
||||
result[i] = newHistoryEntry(msgId)
|
||||
entries[i] = newHistoryEntry(msgId)
|
||||
return entries
|
||||
|
||||
proc getMessageIds*(causalHistory: seq[HistoryEntry]): seq[SdsMessageID] =
|
||||
## Extracts message IDs from HistoryEntry sequence
|
||||
result = newSeq[SdsMessageID](causalHistory.len)
|
||||
var messageIds = newSeq[SdsMessageID](causalHistory.len)
|
||||
for i, entry in causalHistory:
|
||||
result[i] = entry.messageId
|
||||
messageIds[i] = entry.messageId
|
||||
return messageIds
|
||||
|
||||
proc getRecentHistoryEntries*(
|
||||
rm: ReliabilityManager, n: int, channelId: SdsChannelID
|
||||
): seq[HistoryEntry] =
|
||||
## Get recent history entries for sending in causal history.
|
||||
## Populates retrieval hints for our own messages using the provider callback.
|
||||
try:
|
||||
if channelId in rm.channels:
|
||||
let channel = rm.channels[channelId]
|
||||
@ -155,42 +159,32 @@ proc getRecentHistoryEntries*(
|
||||
if rm.onRetrievalHint.isNil():
|
||||
return toCausalHistory(recentMessageIds)
|
||||
else:
|
||||
var entries: seq[HistoryEntry] = @[]
|
||||
for msgId in recentMessageIds:
|
||||
let hint = rm.onRetrievalHint(msgId)
|
||||
result.add(newHistoryEntry(msgId, hint))
|
||||
entries.add(newHistoryEntry(msgId, hint))
|
||||
return entries
|
||||
else:
|
||||
result = @[]
|
||||
return @[]
|
||||
except Exception:
|
||||
error "Failed to get recent history entries",
|
||||
channelId = channelId, n = n, error = getCurrentExceptionMsg()
|
||||
result = @[]
|
||||
return @[]
|
||||
|
||||
proc checkDependencies*(
|
||||
rm: ReliabilityManager, deps: seq[HistoryEntry], channelId: SdsChannelID
|
||||
): seq[HistoryEntry] =
|
||||
## Check which dependencies are missing from our message history.
|
||||
var missingDeps: seq[HistoryEntry] = @[]
|
||||
try:
|
||||
if channelId in rm.channels:
|
||||
let channel = rm.channels[channelId]
|
||||
for dep in deps:
|
||||
if dep.messageId notin channel.messageHistory:
|
||||
# If we have a retrieval hint provider and the original dep has no hint, get one
|
||||
if not rm.onRetrievalHint.isNil() and dep.retrievalHint.len == 0:
|
||||
let hint = rm.onRetrievalHint(dep.messageId)
|
||||
missingDeps.add(newHistoryEntry(dep.messageId, hint))
|
||||
else:
|
||||
missingDeps.add(dep)
|
||||
missingDeps.add(dep)
|
||||
else:
|
||||
# Channel doesn't exist, all deps are missing
|
||||
if not rm.onRetrievalHint.isNil():
|
||||
for dep in deps:
|
||||
if dep.retrievalHint.len == 0:
|
||||
let hint = rm.onRetrievalHint(dep.messageId)
|
||||
missingDeps.add(newHistoryEntry(dep.messageId, hint))
|
||||
else:
|
||||
missingDeps.add(dep)
|
||||
else:
|
||||
missingDeps = deps
|
||||
missingDeps = deps
|
||||
except Exception:
|
||||
error "Failed to check dependencies",
|
||||
channelId = channelId, error = getCurrentExceptionMsg()
|
||||
|
||||
@ -286,7 +286,7 @@ suite "Reliability Mechanisms":
|
||||
check unwrappedMsg2.causalHistory[0].messageId == id1
|
||||
check unwrappedMsg2.causalHistory[0].retrievalHint == cast[seq[byte]]("hint:" & id1)
|
||||
|
||||
# Create a message with a missing dependency
|
||||
# Create a message with a missing dependency (no retrieval hint)
|
||||
let msg3 = SdsMessage(
|
||||
messageId: "msg3",
|
||||
lamportTimestamp: 3,
|
||||
@ -301,8 +301,26 @@ suite "Reliability Mechanisms":
|
||||
let (_, missingDeps3, _) = unwrapResult3.get()
|
||||
check missingDeps3.len == 1
|
||||
check missingDeps3[0].messageId == "missing-dep"
|
||||
# The hint should be populated by the retrieval hint provider for missing dependencies
|
||||
check missingDeps3[0].retrievalHint == cast[seq[byte]]("hint:missing-dep")
|
||||
# The hint is empty because it was not provided by the remote sender
|
||||
check missingDeps3[0].retrievalHint.len == 0
|
||||
|
||||
# Test with a message that HAS a retrieval hint from remote
|
||||
let msg4 = SdsMessage(
|
||||
messageId: "msg4",
|
||||
lamportTimestamp: 4,
|
||||
causalHistory: @[newHistoryEntry("another-missing", cast[seq[byte]]("remote-hint"))],
|
||||
channelId: testChannel,
|
||||
content: @[byte(4)],
|
||||
bloomFilter: @[],
|
||||
)
|
||||
let serialized4 = serializeMessage(msg4).get()
|
||||
let unwrapResult4 = rm.unwrapReceivedMessage(serialized4)
|
||||
check unwrapResult4.isOk()
|
||||
let (_, missingDeps4, _) = unwrapResult4.get()
|
||||
check missingDeps4.len == 1
|
||||
check missingDeps4[0].messageId == "another-missing"
|
||||
# The hint should be preserved from the remote sender
|
||||
check missingDeps4[0].retrievalHint == cast[seq[byte]]("remote-hint")
|
||||
|
||||
# Periodic task & Buffer management tests
|
||||
suite "Periodic Tasks & Buffer Management":
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user