From b69164da17da0b0e9e6b0b8478790f7ae93b4777 Mon Sep 17 00:00:00 2001 From: shash256 <111925100+shash256@users.noreply.github.com> Date: Fri, 25 Jul 2025 12:01:01 +0530 Subject: [PATCH] use HistoryEntry for deps --- .../json_missing_dependencies_event.nim | 19 ++++++++++++---- library/libsds.nim | 2 +- .../requests/sds_lifecycle_request.nim | 1 + .../requests/sds_message_request.nim | 20 ++++++++++++----- src/reliability.nim | 2 +- src/reliability_utils.nim | 22 +++++++++++++++---- tests/test_reliability.nim | 22 +++++++++---------- 7 files changed, 62 insertions(+), 26 deletions(-) diff --git a/library/events/json_missing_dependencies_event.nim b/library/events/json_missing_dependencies_event.nim index ef7ae57..2b4a094 100644 --- a/library/events/json_missing_dependencies_event.nim +++ b/library/events/json_missing_dependencies_event.nim @@ -1,15 +1,15 @@ import std/json -import ./json_base_event, ../../src/[message] +import ./json_base_event, ../../src/[message], std/base64 type JsonMissingDependenciesEvent* = ref object of JsonEvent messageId*: SdsMessageID - missingDeps: seq[SdsMessageID] + missingDeps*: seq[HistoryEntry] channelId*: SdsChannelID proc new*( T: type JsonMissingDependenciesEvent, messageId: SdsMessageID, - missingDeps: seq[SdsMessageID], + missingDeps: seq[HistoryEntry], channelId: SdsChannelID, ): T = return JsonMissingDependenciesEvent( @@ -17,4 +17,15 @@ proc new*( ) method `$`*(jsonMissingDependencies: JsonMissingDependenciesEvent): string = - $(%*jsonMissingDependencies) + var node = newJObject() + node["eventType"] = %*jsonMissingDependencies.eventType + node["messageId"] = %*jsonMissingDependencies.messageId + node["channelId"] = %*jsonMissingDependencies.channelId + var missingDepsNode = newJArray() + for dep in jsonMissingDependencies.missingDeps: + var depNode = newJObject() + depNode["messageId"] = %*dep.messageId + depNode["retrievalHint"] = %*encode(dep.retrievalHint) + missingDepsNode.add(depNode) + node["missingDeps"] = missingDepsNode + $node diff --git a/library/libsds.nim b/library/libsds.nim index d13290a..cd2c49b 100644 --- a/library/libsds.nim +++ b/library/libsds.nim @@ -82,7 +82,7 @@ proc onMessageSent(ctx: ptr SdsContext): MessageSentCallback = $JsonMessageSentEvent.new(messageId, channelId) proc onMissingDependencies(ctx: ptr SdsContext): MissingDependenciesCallback = - return proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = + return proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} = callEventCallback(ctx, "onMissingDependencies"): $JsonMissingDependenciesEvent.new(messageId, missingDeps, channelId) diff --git a/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim index fd5a615..19b9b3f 100644 --- a/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim +++ b/library/sds_thread/inter_thread_communication/requests/sds_lifecycle_request.nim @@ -40,6 +40,7 @@ proc createReliabilityManager( rm.setCallbacks( appCallbacks.messageReadyCb, appCallbacks.messageSentCb, appCallbacks.missingDependenciesCb, appCallbacks.periodicSyncCb, + appCallbacks.retrievalHintProvider, ) return ok(rm) diff --git a/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim b/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim index 3a07fa3..63e0704 100644 --- a/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim +++ b/library/sds_thread/inter_thread_communication/requests/sds_message_request.nim @@ -1,4 +1,4 @@ -import std/[json, strutils, net, sequtils] +import std/[json, strutils, net, sequtils, base64] import chronos, chronicles, results import ../../../alloc @@ -17,7 +17,7 @@ type SdsMessageRequest* = object type SdsUnwrapResponse* = object message*: seq[byte] - missingDeps*: seq[SdsMessageID] + missingDeps*: seq[HistoryEntry] proc createShared*( T: type SdsMessageRequest, @@ -61,13 +61,23 @@ proc process*( of UNWRAP_MESSAGE: let messageBytes = self.message.toSeq() - let (unwrappedMessage, missingDeps, _) = unwrapReceivedMessage(rm[], messageBytes).valueOr: + let (unwrappedMessage, missingDeps, extractedChannelId) = unwrapReceivedMessage(rm[], messageBytes).valueOr: error "UNWRAP_MESSAGE failed", error = error return err("error processing UNWRAP_MESSAGE request: " & $error) - let res = SdsUnwrapResponse(message: unwrappedMessage, missingDeps: missingDeps.getMessageIds()) + let res = SdsUnwrapResponse(message: unwrappedMessage, missingDeps: missingDeps) # return the result as a json string - return ok($(%*(res))) + var node = newJObject() + node["message"] = %*res.message + node["channelId"] = %*extractedChannelId + var missingDepsNode = newJArray() + for dep in res.missingDeps: + var depNode = newJObject() + depNode["messageId"] = %*dep.messageId + depNode["retrievalHint"] = %*encode(dep.retrievalHint) + missingDepsNode.add(depNode) + node["missingDeps"] = missingDepsNode + return ok($node) return ok("") diff --git a/src/reliability.nim b/src/reliability.nim index 2ce37fc..f73d5de 100644 --- a/src/reliability.nim +++ b/src/reliability.nim @@ -226,7 +226,7 @@ proc unwrapReceivedMessage*( channel.incomingBuffer[msg.messageId] = IncomingMessage(message: msg, missingDeps: missingDeps.getMessageIds().toHashSet()) if not rm.onMissingDependencies.isNil(): - rm.onMissingDependencies(msg.messageId, missingDeps.getMessageIds(), channelId) + rm.onMissingDependencies(msg.messageId, missingDeps, channelId) return ok((msg.content, missingDeps, channelId)) except Exception: diff --git a/src/reliability_utils.nim b/src/reliability_utils.nim index 83b73bd..5e88fc9 100644 --- a/src/reliability_utils.nim +++ b/src/reliability_utils.nim @@ -10,7 +10,7 @@ type proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} MissingDependenciesCallback* = proc( - messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID + messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID ) {.gcsafe.} RetrievalHintProvider* = proc(messageId: SdsMessageID): seq[byte] {.gcsafe.} @@ -48,7 +48,7 @@ type onMessageReady*: proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} onMessageSent*: proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} onMissingDependencies*: proc( - messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID + messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID ) {.gcsafe.} onPeriodicSync*: PeriodicSyncCallback onRetrievalHint*: RetrievalHintProvider @@ -153,9 +153,23 @@ proc checkDependencies*( let channel = rm.channels[channelId] for dep in deps: if dep.messageId notin channel.messageHistory: - missingDeps.add(dep) + # If we have a retrieval hint provider and the original dep has no hint, get one + if not rm.onRetrievalHint.isNil() and dep.retrievalHint.len == 0: + let hint = rm.onRetrievalHint(dep.messageId) + missingDeps.add(newHistoryEntry(dep.messageId, hint)) + else: + missingDeps.add(dep) else: - missingDeps = deps + # Channel doesn't exist, all deps are missing + if not rm.onRetrievalHint.isNil(): + for dep in deps: + if dep.retrievalHint.len == 0: + let hint = rm.onRetrievalHint(dep.messageId) + missingDeps.add(newHistoryEntry(dep.messageId, hint)) + else: + missingDeps.add(dep) + else: + missingDeps = deps except Exception: error "Failed to check dependencies", channelId = channelId, error = getCurrentExceptionMsg() diff --git a/tests/test_reliability.nim b/tests/test_reliability.nim index 4126f35..e9d44dc 100644 --- a/tests/test_reliability.nim +++ b/tests/test_reliability.nim @@ -99,7 +99,7 @@ suite "Reliability Mechanisms": messageReadyCount += 1, proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} = missingDepsCount += 1, ) @@ -176,7 +176,7 @@ suite "Reliability Mechanisms": messageReadyCount += 1, proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} = missingDepsCount += 1, ) @@ -216,7 +216,7 @@ suite "Reliability Mechanisms": discard, proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} = discard, ) @@ -261,7 +261,7 @@ suite "Reliability Mechanisms": messageReadyCount += 1, proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} = missingDepsCount += 1, nil, proc(messageId: SdsMessageID): seq[byte] = @@ -301,8 +301,8 @@ suite "Reliability Mechanisms": let (_, missingDeps3, _) = unwrapResult3.get() check missingDeps3.len == 1 check missingDeps3[0].messageId == "missing-dep" - # The hint is empty because it was not in our history, so the provider was not called - check missingDeps3[0].retrievalHint.len == 0 + # The hint should be populated by the retrieval hint provider for missing dependencies + check missingDeps3[0].retrievalHint == cast[seq[byte]]("hint:missing-dep") # Periodic task & Buffer management tests suite "Periodic Tasks & Buffer Management": @@ -326,7 +326,7 @@ suite "Periodic Tasks & Buffer Management": discard, proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} = discard, ) @@ -381,7 +381,7 @@ suite "Periodic Tasks & Buffer Management": discard, proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = messageSentCount += 1, - proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} = discard, ) @@ -430,7 +430,7 @@ suite "Periodic Tasks & Buffer Management": discard, proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = discard, - proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} = discard, proc() {.gcsafe.} = syncCallCount += 1, @@ -496,7 +496,7 @@ suite "Special Cases Handling": messageReadyCount += 1, proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = discard, - proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = + proc(messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} = discard, ) @@ -654,7 +654,7 @@ suite "Multi-Channel ReliabilityManager Tests": readyMessageCount += 1, proc(messageId: SdsMessageID, channelId: SdsChannelID) {.gcsafe.} = sentMessageCount += 1, - proc(messageId: SdsMessageID, deps: seq[SdsMessageID], channelId: SdsChannelID) {.gcsafe.} = + proc(messageId: SdsMessageID, deps: seq[HistoryEntry], channelId: SdsChannelID) {.gcsafe.} = missingDepsCount += 1 )