From cb35b59f951a8b6ba3815fd7f0cfb95c99192753 Mon Sep 17 00:00:00 2001 From: Fabiana Cecin Date: Wed, 13 May 2026 12:09:56 -0300 Subject: [PATCH] stop recv_service from delivering messages on unsubscribed topics for store-recovered messages (#3874) * fix/harden recv_service so it won't deliver messages on unsubscribed content topics * fix SubscrptionManager's subscribed-content-topics iterator * fix broken store-message-receive test * misc cleanups --- tests/api/test_api_receive.nim | 26 +++++++----- .../recv_service/recv_service.nim | 41 ++++++++++--------- .../delivery_service/subscription_manager.nim | 9 ++++ 3 files changed, 47 insertions(+), 29 deletions(-) diff --git a/tests/api/test_api_receive.nim b/tests/api/test_api_receive.nim index 52f8713f9..24251f161 100644 --- a/tests/api/test_api_receive.nim +++ b/tests/api/test_api_receive.nim @@ -138,7 +138,20 @@ suite "Messaging API, Receive Service (store recovery)": break await sleepAsync(100.milliseconds) - # publish before subscriber exists, gets archived + # create the subscriber before publishing. + # RecvService captures startTimeToCheck at construction time; the + # message's timestamp must land after that point to fall inside + # checkStore's time window. + var subscriber: Waku + lockNewGlobalBrokerContext: + subscriber = (await createNode(createApiNodeConf(numShards))).expect( + "Failed to create subscriber" + ) + (await startWaku(addr subscriber)).expect("Failed to start subscriber") + + # publish after the subscriber exists but before it connects to the + # store; the message reaches the archive but the subscriber doesn't + # see it via live relay. let missedPayload = "This message was missed".toBytes() let missedMsg = WakuMessage( payload: missedPayload, contentTopic: testTopic, version: 0, timestamp: now() @@ -159,15 +172,8 @@ suite "Messaging API, Receive Service (store recovery)": await sleepAsync(100.milliseconds) raiseAssert "Message was not archived in time" - # create subscriber - var subscriber: Waku - lockNewGlobalBrokerContext: - subscriber = (await createNode(createApiNodeConf(numShards))).expect( - "Failed to create subscriber" - ) - (await startWaku(addr subscriber)).expect("Failed to start subscriber") - - # connect subscriber to store (not publisher, so msg won't come via relay to it) + # connect subscriber to store after the message is already archived so + # gossipsub doesn't replay it via the live path await subscriber.node.connectToNodes(@[storeNodePeerInfo]) # subscribe to content topic diff --git a/waku/node/delivery_service/recv_service/recv_service.nim b/waku/node/delivery_service/recv_service/recv_service.nim index 64f4d683d..0f077a289 100644 --- a/waku/node/delivery_service/recv_service/recv_service.nim +++ b/waku/node/delivery_service/recv_service/recv_service.nim @@ -70,20 +70,30 @@ proc getMissingMsgsFromStore( ) ) -proc processIncomingMessageOfInterest( +proc processIncomingMessage( self: RecvService, pubsubTopic: string, message: WakuMessage ): bool = - ## Deduplicate (by hash), store (saves in recently-seen messages) and emit - ## the MAPI MessageReceivedEvent for every unique incoming message. - ## Returns true if the message was new and the MessageReceivedEvent was properly emitted. + ## Return false if the incoming message is from a non-subscribed topic, + ## or if the message is a duplicate (recently-seen). Otherwise, save it as + ## recently-seen, emit a MessageReceivedEvent, and return true. + + if not self.subscriptionManager.isSubscribed(pubsubTopic, message.contentTopic): + trace "skipping message as I am not subscribed", + shard = pubsubTopic, contentTopic = message.contentTopic + return false let msgHash = computeMessageHash(pubsubTopic, message) - if not self.recentReceivedMsgs.anyIt(it.msgHash == msgHash): - let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) - self.recentReceivedMsgs.add(rxMsg) - MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message) - return true - return false + if self.recentReceivedMsgs.anyIt(it.msgHash == msgHash): + trace "skipping duplicate message", + shard = pubsubTopic, + contentTopic = message.contentTopic, + msg_hash = msgHash.to0xHex() + return false + + let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) + self.recentReceivedMsgs.add(rxMsg) + MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message) + return true proc checkStore*(self: RecvService) {.async.} = ## Checks the store for messages that were not received directly and @@ -121,7 +131,7 @@ proc checkStore*(self: RecvService) {.async.} = let missingMsgsRet = await self.getMissingMsgsFromStore(missedHashes) if missingMsgsRet.isOk(): for msgTuple in missingMsgsRet.get(): - if self.processIncomingMessageOfInterest(msgTuple.pubsubTopic, msgTuple.msg): + if self.processIncomingMessage(msgTuple.pubsubTopic, msgTuple.msg): info "recv service store-recovered message", msg_hash = shortLog(msgTuple.hash), pubsubTopic = msgTuple.pubsubTopic else: @@ -163,14 +173,7 @@ proc startRecvService*(self: RecvService) = self.seenMsgListener = MessageSeenEvent.listen( self.brokerCtx, proc(event: MessageSeenEvent) {.async: (raises: []).} = - if not self.subscriptionManager.isSubscribed( - event.topic, event.message.contentTopic - ): - trace "skipping message as I am not subscribed", - shard = event.topic, contenttopic = event.message.contentTopic - return - - discard self.processIncomingMessageOfInterest(event.topic, event.message), + discard self.processIncomingMessage(event.topic, event.message), ).valueOr: error "Failed to set MessageSeenEvent listener", error = error quit(QuitFailure) diff --git a/waku/node/delivery_service/subscription_manager.nim b/waku/node/delivery_service/subscription_manager.nim index f00d9024c..c34335057 100644 --- a/waku/node/delivery_service/subscription_manager.nim +++ b/waku/node/delivery_service/subscription_manager.nim @@ -61,7 +61,16 @@ type SubscriptionManager* = ref object of RootObj iterator subscribedTopics*( self: SubscriptionManager ): (PubsubTopic, HashSet[ContentTopic]) = + ## Iterate over all subscribed content topics, batched per shard. + ## This is guaranteed to return a non-empty `topics` (content topics) list on iteration. + for pubsub, topics in self.contentTopicSubs.pairs: + # We are iterating over subscribed content topics; if we are subscribed to + # a shard but have no subscription (interest) for any content topic in that + # shard, then avoid triggering an iteration that doesn't advance the intent + # to iterate over content topic subscriptions. + if topics.len == 0: + continue yield (pubsub, topics) proc edgeFilterPeerCount*(sm: SubscriptionManager, shard: PubsubTopic): int =