From 151e7e5a7d4a01e74d4f32dd6e8c4e680f7083c2 Mon Sep 17 00:00:00 2001 From: Fabiana Cecin Date: Wed, 8 Apr 2026 17:23:04 -0300 Subject: [PATCH] fix: recv_service delivers store-recovered messages * recv_service fixed to deliver incoming store messages * add regression test_api_receive * fixed wrong/verbose log message --- tests/api/test_all.nim | 1 + tests/api/test_api_receive.nim | 193 ++++++++++++++++++ waku/events/delivery_events.nim | 16 -- .../delivery_service/delivery_service.nim | 2 +- .../recv_service/recv_service.nim | 135 ++++++------ 5 files changed, 258 insertions(+), 89 deletions(-) create mode 100644 tests/api/test_api_receive.nim diff --git a/tests/api/test_all.nim b/tests/api/test_all.nim index 4617c8cdb..56be19c27 100644 --- a/tests/api/test_all.nim +++ b/tests/api/test_all.nim @@ -5,4 +5,5 @@ import ./test_node_conf, ./test_api_send, ./test_api_subscription, + ./test_api_receive, ./test_api_health diff --git a/tests/api/test_api_receive.nim b/tests/api/test_api_receive.nim new file mode 100644 index 000000000..52f8713f9 --- /dev/null +++ b/tests/api/test_api_receive.nim @@ -0,0 +1,193 @@ +{.used.} + +import std/[options, sequtils, net, sets] +import chronos, testutils/unittests, stew/byteutils +import libp2p/[peerid, peerinfo, crypto/crypto] +import ../testlib/[common, wakucore, wakunode, testasync] +import ../waku_archive/archive_utils + +import + waku, + waku/[ + waku_node, + waku_core, + common/broker/broker_context, + events/message_events, + waku_relay/protocol, + waku_archive, + waku_archive/common as archive_common, + node/delivery_service/delivery_service, + node/delivery_service/recv_service, + ] +import waku/factory/waku_conf +import tools/confutils/cli_args + +const TestTimeout = chronos.seconds(60) + +type ReceiveEventListenerManager = ref object + brokerCtx: BrokerContext + receivedListener: MessageReceivedEventListener + receivedEvent: AsyncEvent + receivedMessages: seq[WakuMessage] + targetCount: int + +proc newReceiveEventListenerManager( + brokerCtx: BrokerContext, expectedCount: int = 1 +): ReceiveEventListenerManager = + let manager = ReceiveEventListenerManager( + brokerCtx: brokerCtx, receivedMessages: @[], targetCount: expectedCount + ) + manager.receivedEvent = newAsyncEvent() + + manager.receivedListener = MessageReceivedEvent + .listen( + brokerCtx, + proc(event: MessageReceivedEvent) {.async: (raises: []).} = + manager.receivedMessages.add(event.message) + if manager.receivedMessages.len >= manager.targetCount: + manager.receivedEvent.fire() + , + ) + .expect("Failed to listen to MessageReceivedEvent") + + return manager + +proc teardown(manager: ReceiveEventListenerManager) = + MessageReceivedEvent.dropListener(manager.brokerCtx, manager.receivedListener) + +proc waitForEvents( + manager: ReceiveEventListenerManager, timeout: Duration +): Future[bool] {.async.} = + return await manager.receivedEvent.wait().withTimeout(timeout) + +proc createApiNodeConf(numShards: uint16 = 1): WakuNodeConf = + var conf = defaultWakuNodeConf().valueOr: + raiseAssert error + conf.mode = cli_args.WakuMode.Core + conf.listenAddress = parseIpAddress("0.0.0.0") + conf.tcpPort = Port(0) + conf.discv5UdpPort = Port(0) + conf.clusterId = 3'u16 + conf.numShardsInNetwork = numShards + conf.reliabilityEnabled = true + conf.rest = false + result = conf + +suite "Messaging API, Receive Service (store recovery)": + asyncTest "recv_service delivers store-recovered messages via MessageReceivedEvent": + ## Message gets archived before subscriber exists, checkStore() recovers it. + ## This is a regression test: it proves that messages recovered via store by + ## the RecvService (instead of receiving via a live relay sub) are actually + ## delivered via the MessageReceivedEvent API. + + let numShards: uint16 = 1 + let shards = @[PubsubTopic("/waku/2/rs/3/0")] + let shard = shards[0] + let testTopic = ContentTopic("/waku/2/recv-test/proto") + + proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + discard + + # store node has archive, store, relay + # it archives messages from relay and serves them to the + # subscriber's store client when it comes up (later) + var storeNode: WakuNode + lockNewGlobalBrokerContext: + storeNode = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + storeNode.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect( + "Failed to mount metadata on storeNode" + ) + (await storeNode.mountRelay()).expect("Failed to mount relay on storeNode") + let archiveDriver = newSqliteArchiveDriver() + storeNode.mountArchive(archiveDriver).expect("Failed to mount archive") + await storeNode.mountStore() + await storeNode.mountLibp2pPing() + await storeNode.start() + + for s in shards: + storeNode.subscribe((kind: PubsubSub, topic: s), dummyHandler).expect( + "Failed to sub storeNode" + ) + + let storeNodePeerInfo = storeNode.peerInfo.toRemotePeerInfo() + + # publisher node (relay) + var publisher: WakuNode + lockNewGlobalBrokerContext: + publisher = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + publisher.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect( + "Failed to mount metadata on publisher" + ) + (await publisher.mountRelay()).expect("Failed to mount relay on publisher") + await publisher.mountLibp2pPing() + await publisher.start() + + for s in shards: + publisher.subscribe((kind: PubsubSub, topic: s), dummyHandler).expect( + "Failed to sub publisher" + ) + + # connect publisher to store so messages get archived + await publisher.connectToNodes(@[storeNodePeerInfo]) + + # wait for relay mesh + for _ in 0 ..< 50: + if publisher.wakuRelay.getNumPeersInMesh(shard).valueOr(0) > 0: + break + await sleepAsync(100.milliseconds) + + # publish before subscriber exists, gets archived + let missedPayload = "This message was missed".toBytes() + let missedMsg = WakuMessage( + payload: missedPayload, contentTopic: testTopic, version: 0, timestamp: now() + ) + discard (await publisher.publish(some(shard), missedMsg)).expect( + "Publish missed msg failed" + ) + + # wait for archive + block waitArchive: + for _ in 0 ..< 50: + let query = archive_common.ArchiveQuery( + includeData: false, contentTopics: @[testTopic], pubsubTopic: some(shard) + ) + let res = await storeNode.wakuArchive.findMessages(query) + if res.isOk() and res.get().hashes.len > 0: + break waitArchive + await sleepAsync(100.milliseconds) + raiseAssert "Message was not archived in time" + + # create subscriber + var subscriber: Waku + lockNewGlobalBrokerContext: + subscriber = (await createNode(createApiNodeConf(numShards))).expect( + "Failed to create subscriber" + ) + (await startWaku(addr subscriber)).expect("Failed to start subscriber") + + # connect subscriber to store (not publisher, so msg won't come via relay to it) + await subscriber.node.connectToNodes(@[storeNodePeerInfo]) + + # subscribe to content topic + (await subscriber.subscribe(testTopic)).expect("Failed to subscribe") + + # listen before triggering store check + let eventManager = newReceiveEventListenerManager(subscriber.brokerCtx, 1) + defer: + eventManager.teardown() + + # trigger store check, should recover and deliver via MessageReceivedEvent + await subscriber.deliveryService.recvService.checkStore() + + let received = await eventManager.waitForEvents(TestTimeout) + check received + check eventManager.receivedMessages.len == 1 + if eventManager.receivedMessages.len > 0: + check eventManager.receivedMessages[0].payload == missedPayload + + # cleanup + (await subscriber.stop()).expect("Failed to stop subscriber") + await publisher.stop() + await storeNode.stop() diff --git a/waku/events/delivery_events.nim b/waku/events/delivery_events.nim index f8eb0f48d..f27f02721 100644 --- a/waku/events/delivery_events.nim +++ b/waku/events/delivery_events.nim @@ -1,21 +1,5 @@ import waku/waku_core/[message/message, message/digest], waku/common/broker/event_broker -type DeliveryDirection* {.pure.} = enum - PUBLISHING - RECEIVING - -type DeliverySuccess* {.pure.} = enum - SUCCESSFUL - UNSUCCESSFUL - -EventBroker: - type DeliveryFeedbackEvent* = ref object - success*: DeliverySuccess - dir*: DeliveryDirection - comment*: string - msgHash*: WakuMessageHash - msg*: WakuMessage - EventBroker: type OnFilterSubscribeEvent* = object pubsubTopic*: string diff --git a/waku/node/delivery_service/delivery_service.nim b/waku/node/delivery_service/delivery_service.nim index fd728d048..f3d78d98e 100644 --- a/waku/node/delivery_service/delivery_service.nim +++ b/waku/node/delivery_service/delivery_service.nim @@ -12,7 +12,7 @@ import type DeliveryService* = ref object sendService*: SendService - recvService: RecvService + recvService*: RecvService subscriptionManager*: SubscriptionManager proc new*( diff --git a/waku/node/delivery_service/recv_service/recv_service.nim b/waku/node/delivery_service/recv_service/recv_service.nim index 9a85df2f9..c72267b5d 100644 --- a/waku/node/delivery_service/recv_service/recv_service.nim +++ b/waku/node/delivery_service/recv_service/recv_service.nim @@ -12,7 +12,6 @@ import waku_store/common, waku_filter_v2/client, waku_core/topics, - events/delivery_events, events/message_events, waku_node, common/broker/broker_context, @@ -27,7 +26,8 @@ const PruneOldMsgsPeriod = chronos.minutes(1) const DelayExtra* = chronos.seconds(5) ## Additional security time to overlap the missing messages queries -type TupleHashAndMsg = tuple[hash: WakuMessageHash, msg: WakuMessage] +type TupleHashAndMsg = + tuple[hash: WakuMessageHash, msg: WakuMessage, pubsubTopic: PubsubTopic] type RecvMessage = object msgHash: WakuMessageHash @@ -59,88 +59,79 @@ proc getMissingMsgsFromStore( return err("getMissingMsgsFromStore: " & $error) let otherwiseMsg = WakuMessage() - ## message to be returned if the Option message is none + let otherwiseTopic = PubsubTopic("") return ok( - storeResp.messages.mapIt((hash: it.messageHash, msg: it.message.get(otherwiseMsg))) + storeResp.messages.mapIt( + ( + hash: it.messageHash, + msg: it.message.get(otherwiseMsg), + pubsubTopic: it.pubsubTopic.get(otherwiseTopic), + ) + ) ) -proc performDeliveryFeedback( - self: RecvService, - success: DeliverySuccess, - dir: DeliveryDirection, - comment: string, - msgHash: WakuMessageHash, - msg: WakuMessage, -) {.gcsafe, raises: [].} = - info "recv monitor performDeliveryFeedback", - success, dir, comment, msg_hash = shortLog(msgHash) - - DeliveryFeedbackEvent.emit( - brokerCtx = self.brokerCtx, - success = success, - dir = dir, - comment = comment, - msgHash = msgHash, - msg = msg, - ) - -proc msgChecker(self: RecvService) {.async.} = - ## Continuously checks if a message has been received - while true: - await sleepAsync(StoreCheckPeriod) - self.endTimeToCheck = getNowInNanosecondTime() - - var msgHashesInStore = newSeq[WakuMessageHash](0) - for pubsubTopic, contentTopics in self.subscriptionManager.subscribedTopics: - let storeResp: StoreQueryResponse = ( - await self.node.wakuStoreClient.queryToAny( - StoreQueryRequest( - includeData: false, - pubsubTopic: some(pubsubTopic), - contentTopics: toSeq(contentTopics), - startTime: some(self.startTimeToCheck - DelayExtra.nanos), - endTime: some(self.endTimeToCheck + DelayExtra.nanos), - ) - ) - ).valueOr: - error "msgChecker failed to get remote msgHashes", - pubsubTopic = pubsubTopic, cTopics = toSeq(contentTopics), error = $error - continue - - msgHashesInStore.add(storeResp.messages.mapIt(it.messageHash)) - - ## compare the msgHashes seen from the store vs the ones received directly - let rxMsgHashes = self.recentReceivedMsgs.mapIt(it.msgHash) - let missedHashes: seq[WakuMessageHash] = - msgHashesInStore.filterIt(not rxMsgHashes.contains(it)) - - ## Now retrieve the missed WakuMessages - let missingMsgsRet = await self.getMissingMsgsFromStore(missedHashes) - if missingMsgsRet.isOk(): - ## Give feedback so that the api client can perfom any action with the missed messages - for msgTuple in missingMsgsRet.get(): - self.performDeliveryFeedback( - DeliverySuccess.UNSUCCESSFUL, RECEIVING, "Missed message", msgTuple.hash, - msgTuple.msg, - ) - else: - error "failed to retrieve missing messages: ", error = $missingMsgsRet.error - - ## update next check times - self.startTimeToCheck = self.endTimeToCheck - proc processIncomingMessageOfInterest( self: RecvService, pubsubTopic: string, message: WakuMessage -) = - ## Resolve an incoming network message that was already filtered by topic. +): bool = ## Deduplicate (by hash), store (saves in recently-seen messages) and emit ## the MAPI MessageReceivedEvent for every unique incoming message. + ## Returns true if the message was new and delivered. let msgHash = computeMessageHash(pubsubTopic, message) if not self.recentReceivedMsgs.anyIt(it.msgHash == msgHash): let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) self.recentReceivedMsgs.add(rxMsg) MessageReceivedEvent.emit(self.brokerCtx, msgHash.to0xHex(), message) + return true + return false + +proc checkStore*(self: RecvService) {.async.} = + ## Checks the store for messages that were not received directly and + ## delivers them via MessageReceivedEvent. + self.endTimeToCheck = getNowInNanosecondTime() + + var msgHashesInStore = newSeq[WakuMessageHash](0) + for pubsubTopic, contentTopics in self.subscriptionManager.subscribedTopics: + let storeResp: StoreQueryResponse = ( + await self.node.wakuStoreClient.queryToAny( + StoreQueryRequest( + includeData: false, + pubsubTopic: some(pubsubTopic), + contentTopics: toSeq(contentTopics), + startTime: some(self.startTimeToCheck - DelayExtra.nanos), + endTime: some(self.endTimeToCheck + DelayExtra.nanos), + ) + ) + ).valueOr: + error "msgChecker failed to get remote msgHashes", + pubsubTopic = pubsubTopic, cTopics = toSeq(contentTopics), error = $error + continue + + msgHashesInStore.add(storeResp.messages.mapIt(it.messageHash)) + + ## compare the msgHashes seen from the store vs the ones received directly + let rxMsgHashes = self.recentReceivedMsgs.mapIt(it.msgHash) + let missedHashes: seq[WakuMessageHash] = + msgHashesInStore.filterIt(not rxMsgHashes.contains(it)) + + ## Now retrieve the missing WakuMessages and deliver them + let missingMsgsRet = await self.getMissingMsgsFromStore(missedHashes) + if missingMsgsRet.isOk(): + for msgTuple in missingMsgsRet.get(): + if self.processIncomingMessageOfInterest(msgTuple.pubsubTopic, msgTuple.msg): + info "recv service store-recovered message", + msg_hash = shortLog(msgTuple.hash), pubsubTopic = msgTuple.pubsubTopic + else: + error "failed to retrieve missing messages: ", error = $missingMsgsRet.error + + ## update next check times + self.startTimeToCheck = self.endTimeToCheck + +proc msgChecker(self: RecvService) {.async.} = + ## Continuously checks if a message has been received + while true: + await sleepAsync(StoreCheckPeriod) + await self.checkStore() proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionManager): T = ## The storeClient will help to acquire any possible missed messages @@ -176,7 +167,7 @@ proc startRecvService*(self: RecvService) = shard = event.topic, contenttopic = event.message.contentTopic return - self.processIncomingMessageOfInterest(event.topic, event.message), + discard self.processIncomingMessageOfInterest(event.topic, event.message), ).valueOr: error "Failed to set MessageSeenEvent listener", error = error quit(QuitFailure)