From 8501d051a102f5844803c376fa19951b9cb2cffa Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Tue, 23 Jun 2026 12:22:16 +0200 Subject: [PATCH] fix - Cap store checks on propagated messages by MessagingClient (#3965) * Frame send service store checks and cap to 1 min per task * Add test for store not verify case --- .../send_service/delivery_task.nim | 11 ++++ .../send_service/lightpush_processor.nim | 2 + .../send_service/relay_processor.nim | 2 + .../send_service/send_service.nim | 39 ++++++++++++- tests/api/test_api_send.nim | 57 +++++++++++++++++++ 5 files changed, 108 insertions(+), 3 deletions(-) diff --git a/logos_delivery/messaging/delivery_service/send_service/delivery_task.nim b/logos_delivery/messaging/delivery_service/send_service/delivery_task.nim index 1e38fcee1..685f9afce 100644 --- a/logos_delivery/messaging/delivery_service/send_service/delivery_task.nim +++ b/logos_delivery/messaging/delivery_service/send_service/delivery_task.nim @@ -24,6 +24,9 @@ type DeliveryTask* = ref object tryCount*: int state*: DeliveryState deliveryTime*: Moment + firstPropagatedTime*: Option[Moment] + ## Set once on the first successful propagation; never reset on re-publish. + ## Anchors the store-validation time cap (see propagationAge). propagateEventEmitted*: bool errorDesc*: string @@ -74,5 +77,13 @@ proc deliveryAge*(self: DeliveryTask): timer.Duration = else: ZeroDuration +proc propagationAge*(self: DeliveryTask): timer.Duration = + ## Time elapsed since the message was first successfully propagated. + ## Stable across re-publishes; ZeroDuration until first propagation. + if self.firstPropagatedTime.isSome(): + timer.Moment.now() - self.firstPropagatedTime.get() + else: + ZeroDuration + proc isEphemeral*(self: DeliveryTask): bool = return self.msg.ephemeral diff --git a/logos_delivery/messaging/delivery_service/send_service/lightpush_processor.nim b/logos_delivery/messaging/delivery_service/send_service/lightpush_processor.nim index 4105ba48d..ae5e775aa 100644 --- a/logos_delivery/messaging/delivery_service/send_service/lightpush_processor.nim +++ b/logos_delivery/messaging/delivery_service/send_service/lightpush_processor.nim @@ -71,6 +71,8 @@ method sendImpl*( requestId = task.requestId, msgHash = task.msgHash.to0xHex() task.state = DeliveryState.SuccessfullyPropagated task.deliveryTime = Moment.now() + if task.firstPropagatedTime.isNone(): + task.firstPropagatedTime = some(Moment.now()) # TODO: with a simple retry processor it might be more accurate to say `Sent` else: # Controversial state, publish says ok but no peer. It should not happen. diff --git a/logos_delivery/messaging/delivery_service/send_service/relay_processor.nim b/logos_delivery/messaging/delivery_service/send_service/relay_processor.nim index 23b5b846c..3acc44bb4 100644 --- a/logos_delivery/messaging/delivery_service/send_service/relay_processor.nim +++ b/logos_delivery/messaging/delivery_service/send_service/relay_processor.nim @@ -76,6 +76,8 @@ method sendImpl*(self: RelaySendProcessor, task: DeliveryTask) {.async.} = noOfPeers = noOfPublishedPeers task.state = DeliveryState.SuccessfullyPropagated task.deliveryTime = Moment.now() + if task.firstPropagatedTime.isNone(): + task.firstPropagatedTime = some(Moment.now()) else: # It shall not happen, but still covering it task.state = self.fallbackStateToSet diff --git a/logos_delivery/messaging/delivery_service/send_service/send_service.nim b/logos_delivery/messaging/delivery_service/send_service/send_service.nim index 92026be5d..00f2ff672 100644 --- a/logos_delivery/messaging/delivery_service/send_service/send_service.nim +++ b/logos_delivery/messaging/delivery_service/send_service/send_service.nim @@ -59,6 +59,7 @@ type SendService* = ref object of RootObj node: WakuNode checkStoreForMessages: bool + lastStoreCheckTime: Moment ## throttles store validation queries to ArchiveTime cadence proc setupSendProcessorChain( peerManager: PeerManager, @@ -117,6 +118,7 @@ proc new*( sendProcessor: sendProcessorChain, node: w, checkStoreForMessages: checkStoreForMessages, + lastStoreCheckTime: Moment.now(), ) return ok(sendService) @@ -163,11 +165,20 @@ proc checkStoredMessages(self: SendService) {.async.} = if not self.checkStoreForMessages: return + # Throttle store queries so they run at most every ArchiveTime (3s), regardless + # of the 1s service loop cadence. + if Moment.now() - self.lastStoreCheckTime < ArchiveTime: + return + let tasksToValidate = self.taskCache.filterIt( - it.state == DeliveryState.SuccessfullyPropagated and it.deliveryAge() > ArchiveTime and - not it.isEphemeral() + it.state == DeliveryState.SuccessfullyPropagated and + it.propagationAge() > ArchiveTime and not it.isEphemeral() ) + if tasksToValidate.len() == 0: + return + + self.lastStoreCheckTime = Moment.now() await self.checkMsgsInStore(tasksToValidate) proc reportTaskResult(self: SendService, task: DeliveryTask) = @@ -200,7 +211,10 @@ proc reportTaskResult(self: SendService, task: DeliveryTask) = # rest of the states are intermediate and does not translate to event discard - if task.messageAge() > MaxTimeInCache: + # Only tasks that never propagated are reported as hard send failures here. + # Propagated-but-not-store-validated tasks are handled (warn + drop, no event) + # in evaluateAndCleanUp. + if task.firstPropagatedTime.isNone() and task.messageAge() > MaxTimeInCache: error "Failed to send message", requestId = task.requestId, msgHash = task.msgHash.to0xHex(), @@ -229,6 +243,25 @@ proc evaluateAndCleanUp(self: SendService) = ) ) + # Store validation timed out: the message was propagated but never confirmed in a + # store node within MaxTimeInCache (measured from first propagation). Warn and drop + # without emitting an app event. + for task in self.taskCache: + if task.firstPropagatedTime.isSome() and + task.state != DeliveryState.SuccessfullyValidated and + task.propagationAge() > MaxTimeInCache: + warn "Message propagated but not validated by a store node within time window; stop trying.", + requestId = task.requestId, + msgHash = task.msgHash.to0xHex(), + propagationAge = task.propagationAge() + + self.taskCache.keepItIf( + not ( + it.firstPropagatedTime.isSome() and it.state != DeliveryState.SuccessfullyValidated and + it.propagationAge() > MaxTimeInCache + ) + ) + proc trySendMessages(self: SendService) {.async.} = let tasksToSend = self.taskCache.filterIt(it.state == DeliveryState.NextRoundRetry) diff --git a/tests/api/test_api_send.nim b/tests/api/test_api_send.nim index 0585998b2..13ec57c83 100644 --- a/tests/api/test_api_send.nim +++ b/tests/api/test_api_send.nim @@ -472,3 +472,60 @@ suite "Waku API - Send": eventManager.validate({SendEventOutcome.Error}, requestId) (await node.stop()).isOkOr: raiseAssert "Failed to stop node: " & error + + asyncTest "Store validation times out without event": + ## The message propagates successfully, but the only reachable store peer never + ## receives/archives it (it is outside the relay propagation path), so store + ## validation never confirms. After MaxTimeInCache the task must be dropped with a + ## warn log and NO app event: Propagated fires, but neither Sent nor Error - the + ## missing Sent event is the signal that delivery could not be validated. + var isolatedStoreNode: WakuNode + lockNewGlobalBrokerContext: + isolatedStoreNode = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + isolatedStoreNode.mountMetadata(3, @[0'u16]).isOkOr: + raiseAssert "Failed to mount metadata: " & error + (await isolatedStoreNode.mountRelay()).isOkOr: + raiseAssert "Failed to mount relay" + let archiveDriver = newSqliteArchiveDriver() + isolatedStoreNode.mountArchive(archiveDriver).isOkOr: + raiseAssert "Failed to mount archive: " & error + await isolatedStoreNode.mountStore() + await isolatedStoreNode.mountLibp2pPing() + await isolatedStoreNode.start() + # Deliberately NOT subscribed to the topic and NOT wired into the relay mesh, so + # it can answer store queries but never holds the published message. + let isolatedStoreNodePeerInfo = isolatedStoreNode.peerInfo.toRemotePeerInfo() + + var node: Waku + lockNewGlobalBrokerContext: + node = (await createNode(createApiNodeConf())).valueOr: + raiseAssert error + node.mountMessagingClient().isOkOr: + raiseAssert "Failed to mount messaging: " & error + (await node.start()).isOkOr: + raiseAssert "Failed to start Waku node: " & error + + # Propagate via relayNode1; store queries can only reach the isolated store node. + await node.node.connectToNodes(@[relayNode1PeerInfo, isolatedStoreNodePeerInfo]) + + let eventManager = newSendEventListenerManager(node.brokerCtx) + defer: + await eventManager.teardown() + + let envelope = MessageEnvelope.init( + ContentTopic("/waku/2/default-content/proto"), "test payload" + ) + + let requestId = (await node.send(envelope)).valueOr: + raiseAssert error + + # Must outlive MaxTimeInCache (1 min) so the store-validation timeout drop fires. + const eventTimeout = 65.seconds + discard await eventManager.waitForEvents(eventTimeout) + + eventManager.validate({SendEventOutcome.Propagated}, requestId) + + await isolatedStoreNode.stop() + (await node.stop()).isOkOr: + raiseAssert "Failed to stop node: " & error