From cd898d0f72265123be5e5aa621c1363277dc6d67 Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Wed, 6 May 2026 14:12:02 +0200 Subject: [PATCH] add temporary debug logs recv_monitor --- .../recv_service/recv_service.nim | 74 ++++++++++++++++--- waku/node/kernel_api/relay.nim | 6 ++ 2 files changed, 71 insertions(+), 9 deletions(-) diff --git a/waku/node/delivery_service/recv_service/recv_service.nim b/waku/node/delivery_service/recv_service/recv_service.nim index 9a85df2f9..57b87a38f 100644 --- a/waku/node/delivery_service/recv_service/recv_service.nim +++ b/waku/node/delivery_service/recv_service/recv_service.nim @@ -114,6 +114,15 @@ proc msgChecker(self: RecvService) {.async.} = let missedHashes: seq[WakuMessageHash] = msgHashesInStore.filterIt(not rxMsgHashes.contains(it)) + debug "AAAA msgChecker delivery discrepancy", + store_hashes = msgHashesInStore.len, + rx_hashes = rxMsgHashes.len, + missed_hashes = missedHashes.len + + if missedHashes.len > 0: + debug "AAAA msgChecker detected missed hashes", + missed_hashes = missedHashes.len, missed_hash_list = missedHashes + ## Now retrieve the missed WakuMessages let missingMsgsRet = await self.getMissingMsgsFromStore(missedHashes) if missingMsgsRet.isOk(): @@ -137,10 +146,30 @@ proc processIncomingMessageOfInterest( ## the MAPI MessageReceivedEvent for every unique incoming message. let msgHash = computeMessageHash(pubsubTopic, message) - if not self.recentReceivedMsgs.anyIt(it.msgHash == msgHash): - let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) - self.recentReceivedMsgs.add(rxMsg) - MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message) + debug "AAAA processIncomingMessageOfInterest received event", + topic = pubsubTopic, + contenttopic = message.contentTopic, + msg_hash = shortLog(msgHash) + + if self.recentReceivedMsgs.anyIt(it.msgHash == msgHash): + debug "AAAA processIncomingMessageOfInterest duplicate message skipped", + topic = pubsubTopic, + contenttopic = message.contentTopic, + msg_hash = shortLog(msgHash) + return + + let prevHashes = self.recentReceivedMsgs.mapIt(it.msgHash) + let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) + self.recentReceivedMsgs.add(rxMsg) + debug "AAAA recentReceivedMsgs added new message", + topic = pubsubTopic, + contenttopic = message.contentTopic, + added_msg_hash = shortLog(msgHash), + prev_entries = prevHashes.len, + now_entries = self.recentReceivedMsgs.len, + recent_hashes = self.recentReceivedMsgs.mapIt(it.msgHash) + + MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message) proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionManager): T = ## The storeClient will help to acquire any possible missed messages @@ -159,7 +188,16 @@ proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionManager): T = proc loopPruneOldMessages(self: RecvService) {.async.} = while true: let oldestAllowedTime = getNowInNanosecondTime() - MaxMessageLife.nanos + let prevHashes = self.recentReceivedMsgs.mapIt(it.msgHash) + let prevEntries = prevHashes.len + let hashesBeforePrune = self.recentReceivedMsgs.mapIt(it.msgHash) self.recentReceivedMsgs.keepItIf(it.rxTime > oldestAllowedTime) + debug "AAAA loopPruneOldMessages", + before_entries = prevEntries, + after_entries = self.recentReceivedMsgs.len, + pruned_entries = prevEntries - self.recentReceivedMsgs.len, + remaining_hashes = self.recentReceivedMsgs.mapIt(it.msgHash), + hashesBeforePrune = hashesBeforePrune await sleepAsync(PruneOldMsgsPeriod) proc startRecvService*(self: RecvService) = @@ -169,13 +207,31 @@ proc startRecvService*(self: RecvService) = self.seenMsgListener = MessageSeenEvent.listen( self.brokerCtx, proc(event: MessageSeenEvent) {.async: (raises: []).} = - if not self.subscriptionManager.isSubscribed( - event.topic, event.message.contentTopic - ): - trace "skipping message as I am not subscribed", - shard = event.topic, contenttopic = event.message.contentTopic + let eventHash = computeMessageHash(event.topic, event.message) + debug "AAAA MessageSeenEvent listener received event", + topic = event.topic, + contenttopic = event.message.contentTopic, + msg_hash = shortLog(eventHash), + timestamp = event.message.timestamp + + let subscribed = + self.subscriptionManager.isSubscribed(event.topic, event.message.contentTopic) + debug "AAAA MessageSeenEvent subscription check", + topic = event.topic, + contenttopic = event.message.contentTopic, + subscribed = subscribed + + if not subscribed: + debug "AAAA skipping MessageSeenEvent because not subscribed", + shard = event.topic, + contenttopic = event.message.contentTopic, + msg_hash = shortLog(eventHash) return + debug "AAAA MessageSeenEvent accepted for processing", + topic = event.topic, + contenttopic = event.message.contentTopic, + msg_hash = shortLog(eventHash) self.processIncomingMessageOfInterest(event.topic, event.message), ).valueOr: error "Failed to set MessageSeenEvent listener", error = error diff --git a/waku/node/kernel_api/relay.nim b/waku/node/kernel_api/relay.nim index c5a11ff02..da61a9640 100644 --- a/waku/node/kernel_api/relay.nim +++ b/waku/node/kernel_api/relay.nim @@ -92,6 +92,12 @@ proc registerRelayHandler( node.wakuStoreReconciliation.messageIngress(topic, msg) proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + let msgHash = topic.computeMessageHash(msg) + debug "AAAA relay internalHandler emitting MessageSeenEvent", + topic = topic, + contenttopic = msg.contentTopic, + msg_hash = shortLog(msgHash), + payload_len = msg.payload.len MessageSeenEvent.emit(node.brokerCtx, topic, msg) let uniqueTopicHandler = proc(