use HistoryEntry for deps

This commit is contained in:
shash256 2025-07-25 12:01:01 +05:30
parent b7b24c6747
commit b69164da17
7 changed files with 62 additions and 26 deletions

View File

@ -1,15 +1,15 @@
import std/json
import ./json_base_event, ../../src/[message]
import ./json_base_event, ../../src/[message], std/base64
type JsonMissingDependenciesEvent* = ref object of JsonEvent
messageId*: SdsMessageID
missingDeps: seq[SdsMessageID]
missingDeps*: seq[HistoryEntry]
channelId*: SdsChannelID
proc new*(
T: type JsonMissingDependenciesEvent,
messageId: SdsMessageID,
missingDeps: seq[SdsMessageID],
missingDeps: seq[HistoryEntry],
channelId: SdsChannelID,
): T =
return JsonMissingDependenciesEvent(
@ -17,4 +17,15 @@ proc new*(
)
method `$`*(jsonMissingDependencies: JsonMissingDependenciesEvent): string =
$(%*jsonMissingDependencies)
var node = newJObject()
node["eventType"] = %*jsonMissingDependencies.eventType
node["messageId"] = %*jsonMissingDependencies.messageId
node["channelId"] = %*jsonMissingDependencies.channelId
var missingDepsNode = newJArray()
for dep in jsonMissingDependencies.missingDeps:
var depNode = newJObject()
depNode["messageId"] = %*dep.messageId
depNode["retrievalHint"] = %*encode(dep.retrievalHint)
missingDepsNode.add(depNode)
node["missingDeps"] = missingDepsNode
$node

View File

@ -82,7 +82,7 @@ proc onMessageSent(ctx: ptr SdsContext): MessageSentCallback =
$JsonMessageSentEvent.new(messageId, channelId)
proc onMissingDependencies(ctx: ptr SdsContext): MissingDependenciesCallback =
return proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
return proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
callEventCallback(ctx, "onMissingDependencies"):
$JsonMissingDependenciesEvent.new(messageId, missingDeps, channelId)

View File

@ -40,6 +40,7 @@ proc createReliabilityManager(
rm.setCallbacks(
appCallbacks.messageReadyCb, appCallbacks.messageSentCb,
appCallbacks.missingDependenciesCb, appCallbacks.periodicSyncCb,
appCallbacks.retrievalHintProvider,
)
return ok(rm)

View File

@ -1,4 +1,4 @@
import std/[json, strutils, net, sequtils]
import std/[json, strutils, net, sequtils, base64]
import chronos, chronicles, results
import ../../../alloc
@ -17,7 +17,7 @@ type SdsMessageRequest* = object
type SdsUnwrapResponse* = object
message*: seq[byte]
missingDeps*: seq[SdsMessageID]
missingDeps*: seq[HistoryEntry]
proc createShared*(
T: type SdsMessageRequest,
@ -61,13 +61,23 @@ proc process*(
of UNWRAP_MESSAGE:
let messageBytes = self.message.toSeq()
let (unwrappedMessage, missingDeps, _) = unwrapReceivedMessage(rm[], messageBytes).valueOr:
let (unwrappedMessage, missingDeps, extractedChannelId) = unwrapReceivedMessage(rm[], messageBytes).valueOr:
error "UNWRAP_MESSAGE failed", error = error
return err("error processing UNWRAP_MESSAGE request: " & $error)
let res = SdsUnwrapResponse(message: unwrappedMessage, missingDeps: missingDeps.getMessageIds())
let res = SdsUnwrapResponse(message: unwrappedMessage, missingDeps: missingDeps)
# return the result as a json string
return ok($(%*(res)))
var node = newJObject()
node["message"] = %*res.message
node["channelId"] = %*extractedChannelId
var missingDepsNode = newJArray()
for dep in res.missingDeps:
var depNode = newJObject()
depNode["messageId"] = %*dep.messageId
depNode["retrievalHint"] = %*encode(dep.retrievalHint)
missingDepsNode.add(depNode)
node["missingDeps"] = missingDepsNode
return ok($node)
return ok("")

View File

@ -226,7 +226,7 @@ proc unwrapReceivedMessage*(
channel.incomingBuffer[msg.messageId] =
IncomingMessage(message: msg, missingDeps: missingDeps.getMessageIds().toHashSet())
if not rm.onMissingDependencies.isNil():
rm.onMissingDependencies(msg.messageId, missingDeps.getMessageIds(), channelId)
rm.onMissingDependencies(msg.messageId, missingDeps, channelId)
return ok((msg.content, missingDeps, channelId))
except Exception:

View File

@ -10,7 +10,7 @@ type
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.}
MissingDependenciesCallback* = proc(
messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID
messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID
) {.gcsafe.}
RetrievalHintProvider* = proc(messageId: SdsMessageID): seq[byte] {.gcsafe.}
@ -48,7 +48,7 @@ type
onMessageReady*: proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.}
onMessageSent*: proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.}
onMissingDependencies*: proc(
messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID
messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID
) {.gcsafe.}
onPeriodicSync*: PeriodicSyncCallback
onRetrievalHint*: RetrievalHintProvider
@ -153,9 +153,23 @@ proc checkDependencies*(
let channel = rm.channels[channelId]
for dep in deps:
if dep.messageId notin channel.messageHistory:
missingDeps.add(dep)
# If we have a retrieval hint provider and the original dep has no hint, get one
if not rm.onRetrievalHint.isNil() and dep.retrievalHint.len == 0:
let hint = rm.onRetrievalHint(dep.messageId)
missingDeps.add(newHistoryEntry(dep.messageId, hint))
else:
missingDeps.add(dep)
else:
missingDeps = deps
# Channel doesn't exist, all deps are missing
if not rm.onRetrievalHint.isNil():
for dep in deps:
if dep.retrievalHint.len == 0:
let hint = rm.onRetrievalHint(dep.messageId)
missingDeps.add(newHistoryEntry(dep.messageId, hint))
else:
missingDeps.add(dep)
else:
missingDeps = deps
except Exception:
error "Failed to check dependencies",
channelId = channelId, error = getCurrentExceptionMsg()

View File

@ -99,7 +99,7 @@ suite "Reliability Mechanisms":
messageReadyCount += 1,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
missingDepsCount += 1,
)
@ -176,7 +176,7 @@ suite "Reliability Mechanisms":
messageReadyCount += 1,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
missingDepsCount += 1,
)
@ -216,7 +216,7 @@ suite "Reliability Mechanisms":
discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
discard,
)
@ -261,7 +261,7 @@ suite "Reliability Mechanisms":
messageReadyCount += 1,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
missingDepsCount += 1,
nil,
proc(messageId: SdsMessageID): seq[byte] =
@ -301,8 +301,8 @@ suite "Reliability Mechanisms":
let (_, missingDeps3, _) = unwrapResult3.get()
check missingDeps3.len == 1
check missingDeps3[0].messageId == "missing-dep"
# The hint is empty because it was not in our history, so the provider was not called
check missingDeps3[0].retrievalHint.len == 0
# The hint should be populated by the retrieval hint provider for missing dependencies
check missingDeps3[0].retrievalHint == cast[seq[byte]]("hint:missing-dep")
# Periodic task & Buffer management tests
suite "Periodic Tasks & Buffer Management":
@ -326,7 +326,7 @@ suite "Periodic Tasks & Buffer Management":
discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
discard,
)
@ -381,7 +381,7 @@ suite "Periodic Tasks & Buffer Management":
discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
messageSentCount += 1,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
discard,
)
@ -430,7 +430,7 @@ suite "Periodic Tasks & Buffer Management":
discard,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
discard,
proc() {.gcsafe.} =
syncCallCount += 1,
@ -496,7 +496,7 @@ suite "Special Cases Handling":
messageReadyCount += 1,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
discard,
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
discard,
)
@ -654,7 +654,7 @@ suite "Multi-Channel ReliabilityManager Tests":
readyMessageCount += 1,
proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} =
sentMessageCount += 1,
proc(messageId: SdsMessageID, deps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
proc(messageId: SdsMessageID, deps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} =
missingDepsCount += 1
)