diff --git a/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim b/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim index 55446c2a9..90bdb0839 100644 --- a/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim +++ b/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim @@ -14,12 +14,11 @@ import waku_store/common, waku_filter_v2/client, events/message_events, + events/health_events, waku_node, node/subscription_manager, ] -const StoreCheckPeriod = chronos.minutes(5) ## How often to perform store queries - const MaxMessageLife = chronos.minutes(7) ## Max time we will keep track of rx messages const PruneOldMsgsPeriod = chronos.minutes(1) @@ -39,10 +38,17 @@ type RecvService* = ref object of RootObj brokerCtx: BrokerContext node: WakuNode seenMsgListener: MessageSeenEventListener + connStatusListener: EventConnectionStatusChangeListener recentReceivedMsgs: seq[RecvMessage] - msgCheckerHandler: Future[void] ## allows to stop the msgChecker async task + online: bool + ## Whether we currently have connectivity (ConnectionStatus != Disconnected). + ## Status events carry only the new state, so this remembers the previous one + ## to act on edges, not every event: `PartiallyConnected`/`Connected` flicker + ## while still online, and the bool collapses that — backfill once when we come + ## online, stamp the gap start when we go offline. + backfillHandler: Future[void] ## in-flight store backfill task msgPrunerHandler: Future[void] ## removes too old messages startTimeToCheck: Timestamp @@ -100,6 +106,10 @@ proc processIncomingMessage( proc checkStore*(self: RecvService) {.async.} = ## Checks the store for messages that were not received directly and ## delivers them via MessageReceivedEvent. + if self.node.wakuStoreClient.isNil(): + error "recv service has no store client mounted, skipping store check" + return + self.endTimeToCheck = getNowInNanosecondTime() ## query store and deliver new recovered messages per subscribed topic @@ -115,7 +125,7 @@ proc checkStore*(self: RecvService) {.async.} = ) ) ).valueOr: - error "msgChecker failed to get remote msgHashes", + error "checkStore failed to get remote msgHashes", pubsubTopic = pubsubTopic, cTopics = toSeq(contentTopics), error = $error continue @@ -142,11 +152,22 @@ proc checkStore*(self: RecvService) {.async.} = ## update next check times self.startTimeToCheck = self.endTimeToCheck -proc msgChecker(self: RecvService) {.async.} = - ## Continuously checks if a message has been received - while true: - await sleepAsync(StoreCheckPeriod) - await self.checkStore() +proc onConnectionStatusChange(self: RecvService, status: ConnectionStatus) = + ## Backfill the store over the window we were offline (`Disconnected`). + let nowOnline = status != ConnectionStatus.Disconnected + if nowOnline == self.online: + return + self.online = nowOnline + + if not nowOnline: + self.startTimeToCheck = getNowInNanosecondTime() + return + + # At most one backfill in flight; skip if the previous is still running. + # Triggers are paced by health-monitor status changes, so overlap is unlikely. + if self.backfillHandler.isNil() or self.backfillHandler.finished(): + info "recv service backfilling missed messages after coming back online" + self.backfillHandler = self.checkStore() proc new*(T: typedesc[RecvService], node: WakuNode): T = ## The storeClient will help to acquire any possible missed messages @@ -168,7 +189,6 @@ proc loopPruneOldMessages(self: RecvService) {.async.} = await sleepAsync(PruneOldMsgsPeriod) proc startRecvService*(self: RecvService) = - self.msgCheckerHandler = self.msgChecker() self.msgPrunerHandler = self.loopPruneOldMessages() self.seenMsgListener = MessageSeenEvent.listen( @@ -179,11 +199,22 @@ proc startRecvService*(self: RecvService) = error "Failed to set MessageSeenEvent listener", error = error quit(QuitFailure) + self.connStatusListener = EventConnectionStatusChange.listen( + self.brokerCtx, + proc(event: EventConnectionStatusChange) {.async: (raises: []).} = + self.onConnectionStatusChange(event.connectionStatus), + ).valueOr: + error "Failed to set EventConnectionStatusChange listener", error = error + quit(QuitFailure) + proc stopRecvService*(self: RecvService) {.async.} = await MessageSeenEvent.dropListener(self.brokerCtx, self.seenMsgListener) - if not self.msgCheckerHandler.isNil(): - await self.msgCheckerHandler.cancelAndWait() - self.msgCheckerHandler = nil + await EventConnectionStatusChange.dropListener( + self.brokerCtx, self.connStatusListener + ) + if not self.backfillHandler.isNil(): + await self.backfillHandler.cancelAndWait() + self.backfillHandler = nil if not self.msgPrunerHandler.isNil(): await self.msgPrunerHandler.cancelAndWait() self.msgPrunerHandler = nil diff --git a/tests/api/test_api_receive.nim b/tests/api/test_api_receive.nim index d7b6e3d7b..362ee1847 100644 --- a/tests/api/test_api_receive.nim +++ b/tests/api/test_api_receive.nim @@ -15,6 +15,7 @@ import waku_node, waku_core, events/message_events, + events/health_events, waku_relay/protocol, waku_archive, waku_archive/common as archive_common, @@ -60,6 +61,27 @@ proc waitForEvents( ): Future[bool] {.async.} = return await manager.receivedEvent.wait().withTimeout(timeout) +proc waitForConnectionStatus( + brokerCtx: BrokerContext, expected: ConnectionStatus +) {.async.} = + ## Completes when the node reports `expected`. + var future = newFuture[void]("waitForConnectionStatus") + + let handler: EventConnectionStatusChangeListenerProc = proc( + e: EventConnectionStatusChange + ) {.async: (raises: []), gcsafe.} = + if not future.finished and e.connectionStatus == expected: + future.complete() + + let handle = EventConnectionStatusChange.listen(brokerCtx, handler).valueOr: + raiseAssert error + + try: + if not await future.withTimeout(TestTimeout): + raiseAssert "Timeout waiting for status: " & $expected + finally: + await EventConnectionStatusChange.dropListener(brokerCtx, handle) + proc createApiNodeConf(numShards: uint16 = 1): WakuNodeConf = var conf = defaultWakuNodeConf().valueOr: raiseAssert error @@ -73,128 +95,159 @@ proc createApiNodeConf(numShards: uint16 = 1): WakuNodeConf = conf.rest = false result = conf +type TestNetwork = ref object + storeNode: WakuNode + publisher: WakuNode + subscriber: Waku + storeNodePeerInfo: RemotePeerInfo + missedPayload: seq[byte] + +proc setupNetwork(testTopic: ContentTopic): Future[TestNetwork] {.async.} = + ## Returns a started subscriber subscribed to `testTopic` but not yet connected + ## to the store, with a message sitting in the store it never saw live. + const numShards: uint16 = 1 + let shard = PubsubTopic("/waku/2/rs/3/0") + + proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + discard + + # store node: archive + store + relay, subscribed to the shard + var storeNode: WakuNode + lockNewGlobalBrokerContext: + storeNode = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + storeNode.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect( + "Failed to mount metadata on storeNode" + ) + (await storeNode.mountRelay()).expect("Failed to mount relay on storeNode") + storeNode.mountArchive(newSqliteArchiveDriver()).expect("Failed to mount archive") + await storeNode.mountStore() + await storeNode.mountLibp2pPing() + await storeNode.start() + storeNode.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect( + "Failed to sub storeNode" + ) + + let storeNodePeerInfo = storeNode.peerInfo.toRemotePeerInfo() + + # publisher: relay, connected to the store so its messages get archived + var publisher: WakuNode + lockNewGlobalBrokerContext: + publisher = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + publisher.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect( + "Failed to mount metadata on publisher" + ) + (await publisher.mountRelay()).expect("Failed to mount relay on publisher") + await publisher.mountLibp2pPing() + await publisher.start() + publisher.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect( + "Failed to sub publisher" + ) + + await publisher.connectToNodes(@[storeNodePeerInfo]) + + var meshFormed = false + for _ in 0 ..< 50: + if publisher.wakuRelay.getNumPeersInMesh(shard).valueOr(0) > 0: + meshFormed = true + break + await sleepAsync(100.milliseconds) + if not meshFormed: + raiseAssert "publisher<->store relay mesh did not form in time" + + # subscriber: created before the publish so the message timestamp lands after + # its RecvService startTimeToCheck watermark + var subscriber: Waku + lockNewGlobalBrokerContext: + subscriber = (await createNode(createApiNodeConf(numShards))).expect( + "Failed to create subscriber" + ) + subscriber.mountMessagingClient().expect("Failed to mount messaging") + (await subscriber.start()).expect("Failed to start subscriber") + + # publish while the subscriber is offline: the message reaches the archive but + # the subscriber never sees it via live relay + let missedPayload = "This message was missed".toBytes() + let missedMsg = WakuMessage( + payload: missedPayload, contentTopic: testTopic, version: 0, timestamp: now() + ) + discard (await publisher.publish(some(shard), missedMsg)).expect( + "Publish missed msg failed" + ) + + block waitArchive: + for _ in 0 ..< 50: + let query = archive_common.ArchiveQuery( + includeData: false, contentTopics: @[testTopic], pubsubTopic: some(shard) + ) + let res = await storeNode.wakuArchive.findMessages(query) + if res.isOk() and res.get().hashes.len > 0: + break waitArchive + await sleepAsync(100.milliseconds) + raiseAssert "Message was not archived in time" + + # subscribe to the content topic; with no peers yet the subscriber stays offline + (await subscriber.subscribe(testTopic)).expect("Failed to subscribe") + + return TestNetwork( + storeNode: storeNode, + publisher: publisher, + subscriber: subscriber, + storeNodePeerInfo: storeNodePeerInfo, + missedPayload: missedPayload, + ) + +proc teardown(net: TestNetwork) {.async.} = + if not isNil(net.subscriber): + (await net.subscriber.stop()).expect("Failed to stop subscriber") + net.subscriber = nil + if not isNil(net.publisher): + await net.publisher.stop() + net.publisher = nil + if not isNil(net.storeNode): + await net.storeNode.stop() + net.storeNode = nil + suite "Messaging API, Receive Service (store recovery)": asyncTest "recv_service delivers store-recovered messages via MessageReceivedEvent": - ## Message gets archived before subscriber exists, checkStore() recovers it. - ## This is a regression test: it proves that messages recovered via store by - ## the RecvService (instead of receiving via a live relay sub) are actually - ## delivered via the MessageReceivedEvent API. + ## Regression: a message archived before the subscriber connects is recovered + ## by an explicit checkStore() and delivered via MessageReceivedEvent. + let net = await setupNetwork(ContentTopic("/waku/2/recv-test/proto")) + defer: + await net.teardown() - let numShards: uint16 = 1 - let shards = @[PubsubTopic("/waku/2/rs/3/0")] - let shard = shards[0] - let testTopic = ContentTopic("/waku/2/recv-test/proto") - - proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = - discard - - # store node has archive, store, relay - # it archives messages from relay and serves them to the - # subscriber's store client when it comes up (later) - var storeNode: WakuNode - lockNewGlobalBrokerContext: - storeNode = - newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - storeNode.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect( - "Failed to mount metadata on storeNode" - ) - (await storeNode.mountRelay()).expect("Failed to mount relay on storeNode") - let archiveDriver = newSqliteArchiveDriver() - storeNode.mountArchive(archiveDriver).expect("Failed to mount archive") - await storeNode.mountStore() - await storeNode.mountLibp2pPing() - await storeNode.start() - - for s in shards: - storeNode.subscribe((kind: PubsubSub, topic: s), dummyHandler).expect( - "Failed to sub storeNode" - ) - - let storeNodePeerInfo = storeNode.peerInfo.toRemotePeerInfo() - - # publisher node (relay) - var publisher: WakuNode - lockNewGlobalBrokerContext: - publisher = - newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - publisher.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect( - "Failed to mount metadata on publisher" - ) - (await publisher.mountRelay()).expect("Failed to mount relay on publisher") - await publisher.mountLibp2pPing() - await publisher.start() - - for s in shards: - publisher.subscribe((kind: PubsubSub, topic: s), dummyHandler).expect( - "Failed to sub publisher" - ) - - # connect publisher to store so messages get archived - await publisher.connectToNodes(@[storeNodePeerInfo]) - - # wait for relay mesh - for _ in 0 ..< 50: - if publisher.wakuRelay.getNumPeersInMesh(shard).valueOr(0) > 0: - break - await sleepAsync(100.milliseconds) - - # create the subscriber before publishing. - # RecvService captures startTimeToCheck at construction time; the - # message's timestamp must land after that point to fall inside - # checkStore's time window. - var subscriber: Waku - lockNewGlobalBrokerContext: - subscriber = (await createNode(createApiNodeConf(numShards))).expect( - "Failed to create subscriber" - ) - subscriber.mountMessagingClient().expect("Failed to mount messaging") - (await subscriber.start()).expect("Failed to start subscriber") - - # publish after the subscriber exists but before it connects to the - # store; the message reaches the archive but the subscriber doesn't - # see it via live relay. - let missedPayload = "This message was missed".toBytes() - let missedMsg = WakuMessage( - payload: missedPayload, contentTopic: testTopic, version: 0, timestamp: now() - ) - discard (await publisher.publish(some(shard), missedMsg)).expect( - "Publish missed msg failed" - ) - - # wait for archive - block waitArchive: - for _ in 0 ..< 50: - let query = archive_common.ArchiveQuery( - includeData: false, contentTopics: @[testTopic], pubsubTopic: some(shard) - ) - let res = await storeNode.wakuArchive.findMessages(query) - if res.isOk() and res.get().hashes.len > 0: - break waitArchive - await sleepAsync(100.milliseconds) - raiseAssert "Message was not archived in time" - - # connect subscriber to store after the message is already archived so - # gossipsub doesn't replay it via the live path - await subscriber.node.connectToNodes(@[storeNodePeerInfo]) - - # subscribe to content topic - (await subscriber.subscribe(testTopic)).expect("Failed to subscribe") - - # listen before triggering store check - let eventManager = newReceiveEventListenerManager(subscriber.brokerCtx, 1) + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) defer: await eventManager.teardown() - # trigger store check, should recover and deliver via MessageReceivedEvent - await subscriber.messagingClient.recvService.checkStore() + await net.subscriber.node.connectToNodes(@[net.storeNodePeerInfo]) + await net.subscriber.messagingClient.recvService.checkStore() - let received = await eventManager.waitForEvents(TestTimeout) - check received + check await eventManager.waitForEvents(TestTimeout) check eventManager.receivedMessages.len == 1 if eventManager.receivedMessages.len > 0: - check eventManager.receivedMessages[0].payload == missedPayload + check eventManager.receivedMessages[0].payload == net.missedPayload - # cleanup - (await subscriber.stop()).expect("Failed to stop subscriber") - await publisher.stop() - await storeNode.stop() + asyncTest "recv_service backfills missed messages when it comes back online": + ## Connecting a peer brings the subscriber online, firing the backfill that + ## recovers a message archived while it was offline. + let net = await setupNetwork(ContentTopic("/waku/2/recv-reconnect-test/proto")) + defer: + await net.teardown() + + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + defer: + await eventManager.teardown() + + # sync on coming online (the transition that fires the backfill) before asserting + let onlineFut = waitForConnectionStatus( + net.subscriber.brokerCtx, ConnectionStatus.PartiallyConnected + ) + await net.subscriber.node.connectToNodes(@[net.storeNodePeerInfo]) + await onlineFut + + check await eventManager.waitForEvents(TestTimeout) + check eventManager.receivedMessages.len == 1 + if eventManager.receivedMessages.len > 0: + check eventManager.receivedMessages[0].payload == net.missedPayload