From 455d4884ac371b95c201efdb6d6f97ae30847870 Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Wed, 6 May 2026 16:27:31 +0200 Subject: [PATCH] Revert "add temporary debug logs recv_monitor" This reverts commit cd898d0f72265123be5e5aa621c1363277dc6d67. --- .../recv_service/recv_service.nim | 74 +++---------------- waku/node/kernel_api/relay.nim | 6 -- 2 files changed, 9 insertions(+), 71 deletions(-) diff --git a/waku/node/delivery_service/recv_service/recv_service.nim b/waku/node/delivery_service/recv_service/recv_service.nim index 57b87a38f..9a85df2f9 100644 --- a/waku/node/delivery_service/recv_service/recv_service.nim +++ b/waku/node/delivery_service/recv_service/recv_service.nim @@ -114,15 +114,6 @@ proc msgChecker(self: RecvService) {.async.} = let missedHashes: seq[WakuMessageHash] = msgHashesInStore.filterIt(not rxMsgHashes.contains(it)) - debug "AAAA msgChecker delivery discrepancy", - store_hashes = msgHashesInStore.len, - rx_hashes = rxMsgHashes.len, - missed_hashes = missedHashes.len - - if missedHashes.len > 0: - debug "AAAA msgChecker detected missed hashes", - missed_hashes = missedHashes.len, missed_hash_list = missedHashes - ## Now retrieve the missed WakuMessages let missingMsgsRet = await self.getMissingMsgsFromStore(missedHashes) if missingMsgsRet.isOk(): @@ -146,30 +137,10 @@ proc processIncomingMessageOfInterest( ## the MAPI MessageReceivedEvent for every unique incoming message. let msgHash = computeMessageHash(pubsubTopic, message) - debug "AAAA processIncomingMessageOfInterest received event", - topic = pubsubTopic, - contenttopic = message.contentTopic, - msg_hash = shortLog(msgHash) - - if self.recentReceivedMsgs.anyIt(it.msgHash == msgHash): - debug "AAAA processIncomingMessageOfInterest duplicate message skipped", - topic = pubsubTopic, - contenttopic = message.contentTopic, - msg_hash = shortLog(msgHash) - return - - let prevHashes = self.recentReceivedMsgs.mapIt(it.msgHash) - let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) - self.recentReceivedMsgs.add(rxMsg) - debug "AAAA recentReceivedMsgs added new message", - topic = pubsubTopic, - contenttopic = message.contentTopic, - added_msg_hash = shortLog(msgHash), - prev_entries = prevHashes.len, - now_entries = self.recentReceivedMsgs.len, - recent_hashes = self.recentReceivedMsgs.mapIt(it.msgHash) - - MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message) + if not self.recentReceivedMsgs.anyIt(it.msgHash == msgHash): + let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) + self.recentReceivedMsgs.add(rxMsg) + MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message) proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionManager): T = ## The storeClient will help to acquire any possible missed messages @@ -188,16 +159,7 @@ proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionManager): T = proc loopPruneOldMessages(self: RecvService) {.async.} = while true: let oldestAllowedTime = getNowInNanosecondTime() - MaxMessageLife.nanos - let prevHashes = self.recentReceivedMsgs.mapIt(it.msgHash) - let prevEntries = prevHashes.len - let hashesBeforePrune = self.recentReceivedMsgs.mapIt(it.msgHash) self.recentReceivedMsgs.keepItIf(it.rxTime > oldestAllowedTime) - debug "AAAA loopPruneOldMessages", - before_entries = prevEntries, - after_entries = self.recentReceivedMsgs.len, - pruned_entries = prevEntries - self.recentReceivedMsgs.len, - remaining_hashes = self.recentReceivedMsgs.mapIt(it.msgHash), - hashesBeforePrune = hashesBeforePrune await sleepAsync(PruneOldMsgsPeriod) proc startRecvService*(self: RecvService) = @@ -207,31 +169,13 @@ proc startRecvService*(self: RecvService) = self.seenMsgListener = MessageSeenEvent.listen( self.brokerCtx, proc(event: MessageSeenEvent) {.async: (raises: []).} = - let eventHash = computeMessageHash(event.topic, event.message) - debug "AAAA MessageSeenEvent listener received event", - topic = event.topic, - contenttopic = event.message.contentTopic, - msg_hash = shortLog(eventHash), - timestamp = event.message.timestamp - - let subscribed = - self.subscriptionManager.isSubscribed(event.topic, event.message.contentTopic) - debug "AAAA MessageSeenEvent subscription check", - topic = event.topic, - contenttopic = event.message.contentTopic, - subscribed = subscribed - - if not subscribed: - debug "AAAA skipping MessageSeenEvent because not subscribed", - shard = event.topic, - contenttopic = event.message.contentTopic, - msg_hash = shortLog(eventHash) + if not self.subscriptionManager.isSubscribed( + event.topic, event.message.contentTopic + ): + trace "skipping message as I am not subscribed", + shard = event.topic, contenttopic = event.message.contentTopic return - debug "AAAA MessageSeenEvent accepted for processing", - topic = event.topic, - contenttopic = event.message.contentTopic, - msg_hash = shortLog(eventHash) self.processIncomingMessageOfInterest(event.topic, event.message), ).valueOr: error "Failed to set MessageSeenEvent listener", error = error diff --git a/waku/node/kernel_api/relay.nim b/waku/node/kernel_api/relay.nim index da61a9640..c5a11ff02 100644 --- a/waku/node/kernel_api/relay.nim +++ b/waku/node/kernel_api/relay.nim @@ -92,12 +92,6 @@ proc registerRelayHandler( node.wakuStoreReconciliation.messageIngress(topic, msg) proc internalHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = - let msgHash = topic.computeMessageHash(msg) - debug "AAAA relay internalHandler emitting MessageSeenEvent", - topic = topic, - contenttopic = msg.contentTopic, - msg_hash = shortLog(msgHash), - payload_len = msg.payload.len MessageSeenEvent.emit(node.brokerCtx, topic, msg) let uniqueTopicHandler = proc(