From dc026bbff19050155f0ef1abb3e03a64c8bba81e Mon Sep 17 00:00:00 2001
From: Fabiana Cecin
Date: Mon, 30 Mar 2026 08:30:34 -0300
Subject: [PATCH] feat: active filter subscription management for edge nodes
 (#3773)

feat: active filter subscription management for edge nodes

## Subscription Manager

* edgeFilterSubLoop reconciles desired vs actual filter subscriptions
* edgeFilterHealthLoop pings filter peers, evicts stale ones
* EdgeFilterSubState per-shard tracking of confirmed peers and health
* best-effort unsubscribe on peer removal
* RequestEdgeShardHealth and RequestEdgeFilterPeerCount broker providers

## WakuNode

* Remove old edge health loop (loopEdgeHealth, edgeHealthEvent, calculateEdgeTopicHealth)
* Register MessageSeenEvent push handler on filter client during start
* startDeliveryService now returns `Result[void, string]` and propagates errors

## Health Monitor

* getFilterClientHealth queries RequestEdgeFilterPeerCount via broker
* Shard/content health providers fall back to RequestEdgeShardHealth when relay inactive
* Listen to EventShardTopicHealthChange for health recalculation
* Add missing return p.notReady() on failed edge filter peer count request
* HealthyThreshold constant moved to `connection_status.nim`

## Broker types

* RequestEdgeShardHealth, RequestEdgeFilterPeerCount request types
* EventShardTopicHealthChange event type

## Filter Client

* Add timeout parameter to ping proc

## Tests

* Health monitor event tests with per-node lockNewGlobalBrokerContext
* Edge (light client) health update test
* Edge health driven by confirmed filter subscriptions test
* API subscription tests: sub/receive, failover, peer replacement

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
Co-authored-by: Zoltan Nagy
---
 tests/api/test_api_subscription.nim           | 433 +++++++++++++++-
 tests/node/test_wakunode_health_monitor.nim   | 209 ++++++--
 waku/events/peer_events.nim                   |   2 +-
 waku/factory/waku.nim                         |   3 +-
 .../delivery_service/delivery_service.nim     |  14 +-
 .../recv_service/recv_service.nim             |  12 +-
 .../delivery_service/subscription_manager.nim | 464 ++++++++++++++++--
 .../node/health_monitor/connection_status.nim |   3 +
 .../health_monitor/node_health_monitor.nim    |  35 +-
 waku/node/peer_manager/peer_manager.nim       |  37 +-
 waku/node/waku_node.nim                       |  93 +---
 waku/requests/health_requests.nim             |  12 +
 waku/waku_core/subscription.nim               |   4 +-
 .../subscription/subscription_manager.nim     |  52 --
 waku/waku_filter_v2/client.nim                |  10 +-
 waku/waku_relay/protocol.nim                  |   9 +-
 16 files changed, 1126 insertions(+), 266 deletions(-)
 delete mode 100644 waku/waku_core/subscription/subscription_manager.nim

diff --git a/tests/api/test_api_subscription.nim b/tests/api/test_api_subscription.nim
index 6639e3dea..e0ceb9226 100644
--- a/tests/api/test_api_subscription.nim
+++ b/tests/api/test_api_subscription.nim
@@ -1,6 +1,6 @@
 {.used.}
 
-import std/[strutils, net, options, sets]
+import std/[strutils, sequtils, net, options, sets, tables]
 import chronos, testutils/unittests, stew/byteutils
 import libp2p/[peerid, peerinfo, multiaddress, crypto/crypto]
 import ../testlib/[common, wakucore, wakunode, testasync]
 import
   waku/[
     common/broker/broker_context,
     events/message_events,
     waku_relay/protocol,
+    node/kernel_api/filter,
+    node/delivery_service/subscription_manager,
   ]
 import waku/factory/waku_conf
 import tools/confutils/cli_args
 
-# TODO: Edge testing (after MAPI edge support is completed)
-
 const TestTimeout = chronos.seconds(10)
 const NegativeTestTimeout = chronos.seconds(2)
 
@@ -60,8 +60,10 @@ proc 
waitForEvents( return await manager.receivedEvent.wait().withTimeout(timeout) type TestNetwork = ref object - publisher: WakuNode + publisher: WakuNode # Relay node that publishes messages in tests. + meshBuddy: WakuNode # Extra relay peer for publisher's mesh (Edge tests only). subscriber: Waku + # The receiver node in tests. Edge node in edge tests, Core node in relay tests. publisherPeerInfo: RemotePeerInfo proc createApiNodeConf( @@ -94,8 +96,12 @@ proc setupNetwork( lockNewGlobalBrokerContext: net.publisher = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - net.publisher.mountMetadata(3, @[0'u16]).expect("Failed to mount metadata") + net.publisher.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect( + "Failed to mount metadata" + ) (await net.publisher.mountRelay()).expect("Failed to mount relay") + if mode == cli_args.WakuMode.Edge: + await net.publisher.mountFilter() await net.publisher.mountLibp2pPing() await net.publisher.start() @@ -104,16 +110,32 @@ proc setupNetwork( proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = discard - # Subscribe the publisher to all shards to guarantee a GossipSub mesh with the subscriber. - # Currently, Core/Relay nodes auto-subscribe to all network shards on boot, but if - # that changes, this will be needed to cause the publisher to have shard interest - # for any shards the subscriber may want to use, which is required for waitForMesh to work. + var shards: seq[PubsubTopic] for i in 0 ..< numShards.int: - let shard = PubsubTopic("/waku/2/rs/3/" & $i) + shards.add(PubsubTopic("/waku/2/rs/3/" & $i)) + + for shard in shards: net.publisher.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect( "Failed to sub publisher" ) + if mode == cli_args.WakuMode.Edge: + lockNewGlobalBrokerContext: + net.meshBuddy = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + net.meshBuddy.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect( + "Failed to mount metadata on meshBuddy" + ) + (await net.meshBuddy.mountRelay()).expect("Failed to mount relay on meshBuddy") + await net.meshBuddy.start() + + for shard in shards: + net.meshBuddy.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect( + "Failed to sub meshBuddy" + ) + + await net.meshBuddy.connectToNodes(@[net.publisherPeerInfo]) + net.subscriber = await setupSubscriberNode(createApiNodeConf(mode, numShards)) await net.subscriber.node.connectToNodes(@[net.publisherPeerInfo]) @@ -125,6 +147,10 @@ proc teardown(net: TestNetwork) {.async.} = (await net.subscriber.stop()).expect("Failed to stop subscriber node") net.subscriber = nil + if not isNil(net.meshBuddy): + await net.meshBuddy.stop() + net.meshBuddy = nil + if not isNil(net.publisher): await net.publisher.stop() net.publisher = nil @@ -141,18 +167,34 @@ proc waitForMesh(node: WakuNode, shard: PubsubTopic) {.async.} = await sleepAsync(100.milliseconds) raise newException(ValueError, "GossipSub Mesh failed to stabilize on " & shard) +proc waitForEdgeSubs(w: Waku, shard: PubsubTopic) {.async.} = + let sm = w.deliveryService.subscriptionManager + for _ in 0 ..< 50: + if sm.edgeFilterPeerCount(shard) > 0: + return + await sleepAsync(100.milliseconds) + raise newException(ValueError, "Edge filter subscription failed on " & shard) + proc publishToMesh( net: TestNetwork, contentTopic: ContentTopic, payload: seq[byte] ): Future[Result[int, string]] {.async.} = + # Publishes a message from "publisher" via relay into the gossipsub mesh. 
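+  # Unlike publishToMeshAfterEdgeReady below, this helper does not wait for the
+  # edge subscriber's filter subscription to be confirmed before publishing.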
let shard = net.subscriber.node.getRelayShard(contentTopic) - await waitForMesh(net.publisher, shard) - let msg = WakuMessage( payload: payload, contentTopic: contentTopic, version: 0, timestamp: now() ) return await net.publisher.publish(some(shard), msg) +proc publishToMeshAfterEdgeReady( + net: TestNetwork, contentTopic: ContentTopic, payload: seq[byte] +): Future[Result[int, string]] {.async.} = + # First, ensure "subscriber" node (an edge node) is subscribed and ready to receive. + # Afterwards, "publisher" (relay node) sends the message in the gossipsub network. + let shard = net.subscriber.node.getRelayShard(contentTopic) + await waitForEdgeSubs(net.subscriber, shard) + return await net.publishToMesh(contentTopic, payload) + suite "Messaging API, SubscriptionManager": asyncTest "Subscription API, relay node auto subscribe and receive message": let net = await setupNetwork(1) @@ -398,3 +440,370 @@ suite "Messaging API, SubscriptionManager": activeSubs.add(t) await verifyNetworkState(activeSubs) + + asyncTest "Subscription API, edge node subscribe and receive message": + let net = await setupNetwork(1, cli_args.WakuMode.Edge) + defer: + await net.teardown() + + let testTopic = ContentTopic("/waku/2/test-content/proto") + (await net.subscriber.subscribe(testTopic)).expect("failed to subscribe") + + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + defer: + eventManager.teardown() + + discard (await net.publishToMeshAfterEdgeReady(testTopic, "Hello, edge!".toBytes())).expect( + "Publish failed" + ) + + require await eventManager.waitForEvents(TestTimeout) + require eventManager.receivedMessages.len == 1 + check eventManager.receivedMessages[0].contentTopic == testTopic + + asyncTest "Subscription API, edge node ignores unsubscribed content topics": + let net = await setupNetwork(1, cli_args.WakuMode.Edge) + defer: + await net.teardown() + + let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto") + let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto") + (await net.subscriber.subscribe(subbedTopic)).expect("failed to subscribe") + + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + defer: + eventManager.teardown() + + discard (await net.publishToMesh(ignoredTopic, "Ghost Msg".toBytes())).expect( + "Publish failed" + ) + + check not await eventManager.waitForEvents(NegativeTestTimeout) + check eventManager.receivedMessages.len == 0 + + asyncTest "Subscription API, edge node unsubscribe stops message receipt": + let net = await setupNetwork(1, cli_args.WakuMode.Edge) + defer: + await net.teardown() + + let testTopic = ContentTopic("/waku/2/unsub-test/proto") + + (await net.subscriber.subscribe(testTopic)).expect("failed to subscribe") + net.subscriber.unsubscribe(testTopic).expect("failed to unsubscribe") + + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + defer: + eventManager.teardown() + + discard (await net.publishToMesh(testTopic, "Should be dropped".toBytes())).expect( + "Publish failed" + ) + + check not await eventManager.waitForEvents(NegativeTestTimeout) + check eventManager.receivedMessages.len == 0 + + asyncTest "Subscription API, edge node overlapping topics isolation": + let net = await setupNetwork(1, cli_args.WakuMode.Edge) + defer: + await net.teardown() + + let topicA = ContentTopic("/waku/2/topic-a/proto") + let topicB = ContentTopic("/waku/2/topic-b/proto") + (await net.subscriber.subscribe(topicA)).expect("failed to sub A") + (await 
net.subscriber.subscribe(topicB)).expect("failed to sub B") + + let shard = net.subscriber.node.getRelayShard(topicA) + await waitForEdgeSubs(net.subscriber, shard) + + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + defer: + eventManager.teardown() + + net.subscriber.unsubscribe(topicA).expect("failed to unsub A") + + discard (await net.publishToMesh(topicA, "Dropped Message".toBytes())).expect( + "Publish A failed" + ) + discard + (await net.publishToMesh(topicB, "Kept Msg".toBytes())).expect("Publish B failed") + + require await eventManager.waitForEvents(TestTimeout) + require eventManager.receivedMessages.len == 1 + check eventManager.receivedMessages[0].contentTopic == topicB + + asyncTest "Subscription API, edge node resubscribe after unsubscribe": + let net = await setupNetwork(1, cli_args.WakuMode.Edge) + defer: + await net.teardown() + + let testTopic = ContentTopic("/waku/2/resub-test/proto") + + (await net.subscriber.subscribe(testTopic)).expect("Initial sub failed") + + var eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + discard (await net.publishToMeshAfterEdgeReady(testTopic, "Msg 1".toBytes())).expect( + "Pub 1 failed" + ) + + require await eventManager.waitForEvents(TestTimeout) + eventManager.teardown() + + net.subscriber.unsubscribe(testTopic).expect("Unsub failed") + eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + + discard + (await net.publishToMesh(testTopic, "Ghost".toBytes())).expect("Ghost pub failed") + + check not await eventManager.waitForEvents(NegativeTestTimeout) + eventManager.teardown() + + (await net.subscriber.subscribe(testTopic)).expect("Resub failed") + eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + + discard (await net.publishToMeshAfterEdgeReady(testTopic, "Msg 2".toBytes())).expect( + "Pub 2 failed" + ) + + require await eventManager.waitForEvents(TestTimeout) + check eventManager.receivedMessages[0].payload == "Msg 2".toBytes() + + asyncTest "Subscription API, edge node failover after service peer dies": + # NOTE: This test is a bit more verbose because it defines a custom topology. + # It doesn't use the shared TestNetwork helper. + # This mounts two service peers for the edge node then fails one. 
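+    # Sketch of the topology this test builds (all links are libp2p dials):
+    #
+    #   publisher (relay + filter) <--mesh--> meshBuddy (relay + filter)
+    #        ^                                     ^
+    #        '------------ edge subscriber -------'
+    #
+    # The edge node confirms filter subscriptions on both servers; meshBuddy is
+    # then disconnected and delivery must continue via publisher alone.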
+ let numShards: uint16 = 1 + let shards = @[PubsubTopic("/waku/2/rs/3/0")] + + proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + discard + + var publisher: WakuNode + lockNewGlobalBrokerContext: + publisher = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + publisher.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect( + "Failed to mount metadata on publisher" + ) + (await publisher.mountRelay()).expect("Failed to mount relay on publisher") + await publisher.mountFilter() + await publisher.mountLibp2pPing() + await publisher.start() + + for shard in shards: + publisher.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect( + "Failed to sub publisher" + ) + + let publisherPeerInfo = publisher.peerInfo.toRemotePeerInfo() + + var meshBuddy: WakuNode + lockNewGlobalBrokerContext: + meshBuddy = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + meshBuddy.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect( + "Failed to mount metadata on meshBuddy" + ) + (await meshBuddy.mountRelay()).expect("Failed to mount relay on meshBuddy") + await meshBuddy.mountFilter() + await meshBuddy.mountLibp2pPing() + await meshBuddy.start() + + for shard in shards: + meshBuddy.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect( + "Failed to sub meshBuddy" + ) + + let meshBuddyPeerInfo = meshBuddy.peerInfo.toRemotePeerInfo() + + await meshBuddy.connectToNodes(@[publisherPeerInfo]) + + let conf = createApiNodeConf(cli_args.WakuMode.Edge, numShards) + var subscriber: Waku + lockNewGlobalBrokerContext: + subscriber = (await createNode(conf)).expect("Failed to create edge subscriber") + (await startWaku(addr subscriber)).expect("Failed to start edge subscriber") + + # Connect edge subscriber to both filter servers so selectPeers finds both + await subscriber.node.connectToNodes(@[publisherPeerInfo, meshBuddyPeerInfo]) + + let testTopic = ContentTopic("/waku/2/failover-test/proto") + let shard = subscriber.node.getRelayShard(testTopic) + + (await subscriber.subscribe(testTopic)).expect("Failed to subscribe") + + # Wait for dialing both filter servers (HealthyThreshold = 2) + for _ in 0 ..< 100: + if subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2: + break + await sleepAsync(100.milliseconds) + + check subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2 + + # Verify message delivery with both servers alive + await waitForMesh(publisher, shard) + + var eventManager = newReceiveEventListenerManager(subscriber.brokerCtx, 1) + let msg1 = WakuMessage( + payload: "Before failover".toBytes(), + contentTopic: testTopic, + version: 0, + timestamp: now(), + ) + discard (await publisher.publish(some(shard), msg1)).expect("Publish 1 failed") + + require await eventManager.waitForEvents(TestTimeout) + check eventManager.receivedMessages[0].payload == "Before failover".toBytes() + eventManager.teardown() + + # Disconnect meshBuddy from edge (keeps relay mesh alive for publishing) + await subscriber.node.disconnectNode(meshBuddyPeerInfo) + + # Wait for the dead peer to be pruned + for _ in 0 ..< 50: + if subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) < 2: + break + await sleepAsync(100.milliseconds) + + check subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 1 + + # Verify messages still arrive through the surviving filter server (publisher) + eventManager = newReceiveEventListenerManager(subscriber.brokerCtx, 1) + 
let msg2 = WakuMessage( + payload: "After failover".toBytes(), + contentTopic: testTopic, + version: 0, + timestamp: now(), + ) + discard (await publisher.publish(some(shard), msg2)).expect("Publish 2 failed") + + require await eventManager.waitForEvents(TestTimeout) + check eventManager.receivedMessages[0].payload == "After failover".toBytes() + eventManager.teardown() + + (await subscriber.stop()).expect("Failed to stop subscriber") + await meshBuddy.stop() + await publisher.stop() + + asyncTest "Subscription API, edge node dials replacement after peer eviction": + # 3 service peers: publisher, meshBuddy, sparePeer. Edge subscribes and + # confirms 2 (HealthyThreshold). After one is disconnected, the sub loop + # should detect the loss and dial the spare to recover back to threshold. + let numShards: uint16 = 1 + let shards = @[PubsubTopic("/waku/2/rs/3/0")] + + proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + discard + + var publisher: WakuNode + lockNewGlobalBrokerContext: + publisher = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + publisher.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect( + "Failed to mount metadata on publisher" + ) + (await publisher.mountRelay()).expect("Failed to mount relay on publisher") + await publisher.mountFilter() + await publisher.mountLibp2pPing() + await publisher.start() + + for shard in shards: + publisher.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect( + "Failed to sub publisher" + ) + + let publisherPeerInfo = publisher.peerInfo.toRemotePeerInfo() + + var meshBuddy: WakuNode + lockNewGlobalBrokerContext: + meshBuddy = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + meshBuddy.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect( + "Failed to mount metadata on meshBuddy" + ) + (await meshBuddy.mountRelay()).expect("Failed to mount relay on meshBuddy") + await meshBuddy.mountFilter() + await meshBuddy.mountLibp2pPing() + await meshBuddy.start() + + for shard in shards: + meshBuddy.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect( + "Failed to sub meshBuddy" + ) + + let meshBuddyPeerInfo = meshBuddy.peerInfo.toRemotePeerInfo() + + var sparePeer: WakuNode + lockNewGlobalBrokerContext: + sparePeer = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + sparePeer.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect( + "Failed to mount metadata on sparePeer" + ) + (await sparePeer.mountRelay()).expect("Failed to mount relay on sparePeer") + await sparePeer.mountFilter() + await sparePeer.mountLibp2pPing() + await sparePeer.start() + + for shard in shards: + sparePeer.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect( + "Failed to sub sparePeer" + ) + + let sparePeerInfo = sparePeer.peerInfo.toRemotePeerInfo() + + await meshBuddy.connectToNodes(@[publisherPeerInfo]) + await sparePeer.connectToNodes(@[publisherPeerInfo]) + + let conf = createApiNodeConf(cli_args.WakuMode.Edge, numShards) + var subscriber: Waku + lockNewGlobalBrokerContext: + subscriber = (await createNode(conf)).expect("Failed to create edge subscriber") + (await startWaku(addr subscriber)).expect("Failed to start edge subscriber") + + await subscriber.node.connectToNodes( + @[publisherPeerInfo, meshBuddyPeerInfo, sparePeerInfo] + ) + + let testTopic = ContentTopic("/waku/2/replacement-test/proto") + let shard = subscriber.node.getRelayShard(testTopic) + + (await subscriber.subscribe(testTopic)).expect("Failed to subscribe") + 
+ # Wait for 2 confirmed peers (HealthyThreshold). The 3rd is available but not dialed. + for _ in 0 ..< 100: + if subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2: + break + await sleepAsync(100.milliseconds) + + require subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) == + 2 + + await subscriber.node.disconnectNode(meshBuddyPeerInfo) + + # Wait for the sub loop to detect the loss and dial a replacement + for _ in 0 ..< 100: + if subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2: + break + await sleepAsync(100.milliseconds) + + check subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2 + + await waitForMesh(publisher, shard) + + var eventManager = newReceiveEventListenerManager(subscriber.brokerCtx, 1) + let msg = WakuMessage( + payload: "After replacement".toBytes(), + contentTopic: testTopic, + version: 0, + timestamp: now(), + ) + discard (await publisher.publish(some(shard), msg)).expect("Publish failed") + + require await eventManager.waitForEvents(TestTimeout) + check eventManager.receivedMessages[0].payload == "After replacement".toBytes() + eventManager.teardown() + + (await subscriber.stop()).expect("Failed to stop subscriber") + await sparePeer.stop() + await meshBuddy.stop() + await publisher.stop() diff --git a/tests/node/test_wakunode_health_monitor.nim b/tests/node/test_wakunode_health_monitor.nim index 416dc9dda..8a3ddd104 100644 --- a/tests/node/test_wakunode_health_monitor.nim +++ b/tests/node/test_wakunode_health_monitor.nim @@ -12,12 +12,18 @@ import node/health_monitor/health_status, node/health_monitor/connection_status, node/health_monitor/protocol_health, + node/health_monitor/topic_health, node/health_monitor/node_health_monitor, + node/delivery_service/delivery_service, + node/delivery_service/subscription_manager, node/kernel_api/relay, node/kernel_api/store, node/kernel_api/lightpush, node/kernel_api/filter, + events/health_events, + events/peer_events, waku_archive, + common/broker/broker_context, ] import ../testlib/[wakunode, wakucore], ../waku_archive/archive_utils @@ -129,13 +135,12 @@ suite "Health Monitor - health state calculation": suite "Health Monitor - events": asyncTest "Core (relay) health update": - let - nodeAKey = generateSecp256k1Key() + var nodeA: WakuNode + lockNewGlobalBrokerContext: + let nodeAKey = generateSecp256k1Key() nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0)) - - (await nodeA.mountRelay()).expect("Node A failed to mount Relay") - - await nodeA.start() + (await nodeA.mountRelay()).expect("Node A failed to mount Relay") + await nodeA.start() let monitorA = NodeHealthMonitor.new(nodeA) @@ -151,17 +156,15 @@ suite "Health Monitor - events": monitorA.startHealthMonitor().expect("Health monitor failed to start") - let - nodeBKey = generateSecp256k1Key() + var nodeB: WakuNode + lockNewGlobalBrokerContext: + let nodeBKey = generateSecp256k1Key() nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0)) - - let driver = newSqliteArchiveDriver() - nodeB.mountArchive(driver).expect("Node B failed to mount archive") - - (await nodeB.mountRelay()).expect("Node B failed to mount relay") - await nodeB.mountStore() - - await nodeB.start() + let driver = newSqliteArchiveDriver() + nodeB.mountArchive(driver).expect("Node B failed to mount archive") + (await nodeB.mountRelay()).expect("Node B failed to mount relay") + await nodeB.mountStore() + await nodeB.start() await 
nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()]) @@ -214,15 +217,20 @@ suite "Health Monitor - events": await nodeA.stop() asyncTest "Edge (light client) health update": - let - nodeAKey = generateSecp256k1Key() + var nodeA: WakuNode + lockNewGlobalBrokerContext: + let nodeAKey = generateSecp256k1Key() nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0)) + nodeA.mountLightpushClient() + await nodeA.mountFilterClient() + nodeA.mountStoreClient() + require nodeA.mountAutoSharding(1, 8).isOk + nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata") + await nodeA.start() - nodeA.mountLightpushClient() - await nodeA.mountFilterClient() - nodeA.mountStoreClient() - - await nodeA.start() + let ds = + DeliveryService.new(false, nodeA).expect("Failed to create DeliveryService") + ds.startDeliveryService().expect("Failed to start DeliveryService") let monitorA = NodeHealthMonitor.new(nodeA) @@ -238,23 +246,40 @@ suite "Health Monitor - events": monitorA.startHealthMonitor().expect("Health monitor failed to start") - let - nodeBKey = generateSecp256k1Key() + var nodeB: WakuNode + lockNewGlobalBrokerContext: + let nodeBKey = generateSecp256k1Key() nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0)) + let driver = newSqliteArchiveDriver() + nodeB.mountArchive(driver).expect("Node B failed to mount archive") + (await nodeB.mountRelay()).expect("Node B failed to mount relay") + (await nodeB.mountLightpush()).expect("Node B failed to mount lightpush") + await nodeB.mountFilter() + await nodeB.mountStore() + require nodeB.mountAutoSharding(1, 8).isOk + nodeB.mountMetadata(1, toSeq(0'u16 ..< 8'u16)).expect( + "Node B failed to mount metadata" + ) + await nodeB.start() - let driver = newSqliteArchiveDriver() - nodeB.mountArchive(driver).expect("Node B failed to mount archive") - - (await nodeB.mountRelay()).expect("Node B failed to mount relay") - - (await nodeB.mountLightpush()).expect("Node B failed to mount lightpush") - await nodeB.mountFilter() - await nodeB.mountStore() - - await nodeB.start() + var metadataFut = newFuture[void]("waitForMetadata") + let metadataLis = WakuPeerEvent + .listen( + nodeA.brokerCtx, + proc(evt: WakuPeerEvent): Future[void] {.async: (raises: []), gcsafe.} = + if not metadataFut.finished and + evt.kind == WakuPeerEventKind.EventMetadataUpdated: + metadataFut.complete() + , + ) + .expect("Failed to listen for metadata") await nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()]) + let metadataOk = await metadataFut.withTimeout(TestConnectivityTimeLimit) + WakuPeerEvent.dropListener(nodeA.brokerCtx, metadataLis) + require metadataOk + let connectTimeLimit = Moment.now() + TestConnectivityTimeLimit var gotConnected = false @@ -292,4 +317,118 @@ suite "Health Monitor - events": lastStatus == ConnectionStatus.Disconnected await monitorA.stopHealthMonitor() + await ds.stopDeliveryService() + await nodeA.stop() + + asyncTest "Edge health driven by confirmed filter subscriptions": + var nodeA: WakuNode + lockNewGlobalBrokerContext: + let nodeAKey = generateSecp256k1Key() + nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0)) + await nodeA.mountFilterClient() + nodeA.mountLightpushClient() + nodeA.mountStoreClient() + require nodeA.mountAutoSharding(1, 8).isOk + nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata") + await nodeA.start() + + let ds = + DeliveryService.new(false, nodeA).expect("Failed to create DeliveryService") + 
ds.startDeliveryService().expect("Failed to start DeliveryService") + let subMgr = ds.subscriptionManager + + var nodeB: WakuNode + lockNewGlobalBrokerContext: + let nodeBKey = generateSecp256k1Key() + nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0)) + let driver = newSqliteArchiveDriver() + nodeB.mountArchive(driver).expect("Node B failed to mount archive") + (await nodeB.mountRelay()).expect("Node B failed to mount relay") + (await nodeB.mountLightpush()).expect("Node B failed to mount lightpush") + await nodeB.mountFilter() + await nodeB.mountStore() + require nodeB.mountAutoSharding(1, 8).isOk + nodeB.mountMetadata(1, toSeq(0'u16 ..< 8'u16)).expect( + "Node B failed to mount metadata" + ) + await nodeB.start() + + let monitorA = NodeHealthMonitor.new(nodeA) + + var + lastStatus = ConnectionStatus.Disconnected + healthSignal = newAsyncEvent() + + monitorA.onConnectionStatusChange = proc(status: ConnectionStatus) {.async.} = + lastStatus = status + healthSignal.fire() + + monitorA.startHealthMonitor().expect("Health monitor failed to start") + + var metadataFut = newFuture[void]("waitForMetadata") + let metadataLis = WakuPeerEvent + .listen( + nodeA.brokerCtx, + proc(evt: WakuPeerEvent): Future[void] {.async: (raises: []), gcsafe.} = + if not metadataFut.finished and + evt.kind == WakuPeerEventKind.EventMetadataUpdated: + metadataFut.complete() + , + ) + .expect("Failed to listen for metadata") + + await nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()]) + + let metadataOk = await metadataFut.withTimeout(TestConnectivityTimeLimit) + WakuPeerEvent.dropListener(nodeA.brokerCtx, metadataLis) + require metadataOk + + var deadline = Moment.now() + TestConnectivityTimeLimit + while Moment.now() < deadline: + if lastStatus == ConnectionStatus.PartiallyConnected: + break + if await healthSignal.wait().withTimeout(deadline - Moment.now()): + healthSignal.clear() + + check lastStatus == ConnectionStatus.PartiallyConnected + + var shardHealthFut = newFuture[EventShardTopicHealthChange]("waitForShardHealth") + + let shardHealthLis = EventShardTopicHealthChange + .listen( + nodeA.brokerCtx, + proc( + evt: EventShardTopicHealthChange + ): Future[void] {.async: (raises: []), gcsafe.} = + if not shardHealthFut.finished and ( + evt.health == TopicHealth.MINIMALLY_HEALTHY or + evt.health == TopicHealth.SUFFICIENTLY_HEALTHY + ): + shardHealthFut.complete(evt) + , + ) + .expect("Failed to listen for shard health") + + let contentTopic = ContentTopic("/waku/2/default-content/proto") + subMgr.subscribe(contentTopic).expect("Failed to subscribe") + + let shardHealthOk = await shardHealthFut.withTimeout(TestConnectivityTimeLimit) + EventShardTopicHealthChange.dropListener(nodeA.brokerCtx, shardHealthLis) + + check shardHealthOk == true + check subMgr.edgeFilterSubStates.len > 0 + + healthSignal.clear() + deadline = Moment.now() + TestConnectivityTimeLimit + while Moment.now() < deadline: + if lastStatus == ConnectionStatus.PartiallyConnected: + break + if await healthSignal.wait().withTimeout(deadline - Moment.now()): + healthSignal.clear() + + check lastStatus == ConnectionStatus.PartiallyConnected + + await ds.stopDeliveryService() + await monitorA.stopHealthMonitor() + await nodeB.stop() await nodeA.stop() diff --git a/waku/events/peer_events.nim b/waku/events/peer_events.nim index 49dfa9f9a..dd02841f7 100644 --- a/waku/events/peer_events.nim +++ b/waku/events/peer_events.nim @@ -8,6 +8,6 @@ type WakuPeerEventKind* {.pure.} = enum EventMetadataUpdated EventBroker: - type 
EventWakuPeer* = object + type WakuPeerEvent* = object peerId*: PeerId kind*: WakuPeerEventKind diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index dbee8d093..45e0edee0 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -416,7 +416,8 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: ## Reliability if not waku[].deliveryService.isNil(): - waku[].deliveryService.startDeliveryService() + waku[].deliveryService.startDeliveryService().isOkOr: + return err("failed to start delivery service: " & $error) ## Health Monitor waku[].healthMonitor.startHealthMonitor().isOkOr: diff --git a/waku/node/delivery_service/delivery_service.nim b/waku/node/delivery_service/delivery_service.nim index 258c01e95..fd728d048 100644 --- a/waku/node/delivery_service/delivery_service.nim +++ b/waku/node/delivery_service/delivery_service.nim @@ -1,18 +1,13 @@ ## This module helps to ensure the correct transmission and reception of messages import results -import chronos +import chronos, chronicles import ./recv_service, ./send_service, ./subscription_manager, waku/[ - waku_core, - waku_node, - waku_store/client, - waku_relay/protocol, - waku_lightpush/client, - waku_filter_v2/client, + waku_core, waku_node, waku_store/client, waku_relay/protocol, waku_lightpush/client ] type DeliveryService* = ref object @@ -37,10 +32,11 @@ proc new*( ) ) -proc startDeliveryService*(self: DeliveryService) = - self.subscriptionManager.startSubscriptionManager() +proc startDeliveryService*(self: DeliveryService): Result[void, string] = + ?self.subscriptionManager.startSubscriptionManager() self.recvService.startRecvService() self.sendService.startSendService() + return ok() proc stopDeliveryService*(self: DeliveryService) {.async.} = await self.sendService.stopSendService() diff --git a/waku/node/delivery_service/recv_service/recv_service.nim b/waku/node/delivery_service/recv_service/recv_service.nim index 0eba2c450..9a85df2f9 100644 --- a/waku/node/delivery_service/recv_service/recv_service.nim +++ b/waku/node/delivery_service/recv_service/recv_service.nim @@ -91,20 +91,20 @@ proc msgChecker(self: RecvService) {.async.} = self.endTimeToCheck = getNowInNanosecondTime() var msgHashesInStore = newSeq[WakuMessageHash](0) - for sub in self.subscriptionManager.getActiveSubscriptions(): + for pubsubTopic, contentTopics in self.subscriptionManager.subscribedTopics: let storeResp: StoreQueryResponse = ( await self.node.wakuStoreClient.queryToAny( StoreQueryRequest( includeData: false, - pubsubTopic: some(PubsubTopic(sub.pubsubTopic)), - contentTopics: sub.contentTopics, + pubsubTopic: some(pubsubTopic), + contentTopics: toSeq(contentTopics), startTime: some(self.startTimeToCheck - DelayExtra.nanos), endTime: some(self.endTimeToCheck + DelayExtra.nanos), ) ) ).valueOr: error "msgChecker failed to get remote msgHashes", - pubsubTopic = sub.pubsubTopic, cTopics = sub.contentTopics, error = $error + pubsubTopic = pubsubTopic, cTopics = toSeq(contentTopics), error = $error continue msgHashesInStore.add(storeResp.messages.mapIt(it.messageHash)) @@ -154,10 +154,6 @@ proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionManager): T = recentReceivedMsgs: @[], ) - # TODO: For MAPI Edge support, either call node.wakuFilterClient.registerPushHandler - # so that the RecvService listens to incoming filter messages, - # or have the filter client emit MessageSeenEvent. 
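+  # Filter-pushed messages now reach the RecvService indirectly: per this PR,
+  # waku_node registers a MessageSeenEvent push handler on the filter client at
+  # start, so no filter wiring is needed here.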
-
 
   return recvService
 
 proc loopPruneOldMessages(self: RecvService) {.async.} =
diff --git a/waku/node/delivery_service/subscription_manager.nim b/waku/node/delivery_service/subscription_manager.nim
index 22df47413..70d8df7f0 100644
--- a/waku/node/delivery_service/subscription_manager.nim
+++ b/waku/node/delivery_service/subscription_manager.nim
@@ -1,4 +1,5 @@
-import std/[sets, tables, options, strutils], chronos, chronicles, results
+import std/[sequtils, sets, tables, options, strutils], chronos, chronicles, results
+import libp2p/[peerid, peerinfo]
 import
   waku/[
     waku_core,
@@ -6,16 +7,67 @@ import
     waku_core/topics/sharding,
     waku_node,
     waku_relay,
+    waku_filter_v2/common as filter_common,
+    waku_filter_v2/client as filter_client,
+    waku_filter_v2/protocol as filter_protocol,
     common/broker/broker_context,
-    events/delivery_events,
+    events/health_events,
+    events/peer_events,
+    requests/health_requests,
+    node/peer_manager,
+    node/health_monitor/topic_health,
+    node/health_monitor/connection_status,
   ]
 
+# ---------------------------------------------------------------------------
+# Logos Messaging API SubscriptionManager
+#
+# Tracks all topic subscription intent and centralizes consistency
+# maintenance of the pubsub/content-topic subscription model across the
+# network drivers that handle topics (Edge/Filter and Core/Relay).
+# ---------------------------------------------------------------------------
+
+type EdgeFilterSubState* = object
+  peers: seq[RemotePeerInfo]
+    ## Filter service peers with confirmed subscriptions on this shard.
+  pending: seq[Future[void]] ## In-flight dial futures for peers not yet confirmed.
+  pendingPeers: HashSet[PeerId] ## PeerIds of peers currently being dialed.
+  currentHealth: TopicHealth
+    ## Cached health derived from peers.len; updated on every peer set change.
+
+func toTopicHealth*(peersCount: int): TopicHealth =
+  if peersCount >= HealthyThreshold:
+    TopicHealth.SUFFICIENTLY_HEALTHY
+  elif peersCount > 0:
+    TopicHealth.MINIMALLY_HEALTHY
+  else:
+    TopicHealth.UNHEALTHY
+
 type SubscriptionManager* = ref object of RootObj
   node: WakuNode
   contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]]
   ## Map of Shard to ContentTopic needed because e.g. WakuRelay is PubsubTopic only.
   ## A present key with an empty HashSet value means pubsubtopic already subscribed
   ## (via subscribePubsubTopics()) but there's no specific content topic interest yet.
+  edgeFilterSubStates*: Table[PubsubTopic, EdgeFilterSubState]
+    ## Per-shard filter subscription state for edge mode.
+  edgeFilterWakeup: AsyncEvent
+    ## Signalled when the edge filter sub loop should re-reconcile.
+  edgeFilterSubLoopFut: Future[void]
+  edgeFilterHealthLoopFut: Future[void]
+  peerEventListener: WakuPeerEventListener
+    ## Listener for peer disconnect and metadata-update events (edge filter wakeup). 
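+    ## Both event kinds fire edgeFilterWakeup so edgeFilterSubLoop can prune
+    ## lost peers and dial replacements (wired up in startEdgeFilterLoops).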
+ +iterator subscribedTopics*( + self: SubscriptionManager +): (PubsubTopic, HashSet[ContentTopic]) = + for pubsub, topics in self.contentTopicSubs.pairs: + yield (pubsub, topics) + +proc edgeFilterPeerCount*(sm: SubscriptionManager, shard: PubsubTopic): int = + sm.edgeFilterSubStates.withValue(shard, state): + return state.peers.len + return 0 proc new*(T: typedesc[SubscriptionManager], node: WakuNode): T = SubscriptionManager( @@ -25,30 +77,35 @@ proc new*(T: typedesc[SubscriptionManager], node: WakuNode): T = proc addContentTopicInterest( self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic ): Result[void, string] = + var changed = false if not self.contentTopicSubs.hasKey(shard): self.contentTopicSubs[shard] = initHashSet[ContentTopic]() + changed = true self.contentTopicSubs.withValue(shard, cTopics): if not cTopics[].contains(topic): cTopics[].incl(topic) + changed = true - # TODO: Call a "subscribe(shard, topic)" on filter client here, - # so the filter client can know that subscriptions changed. + if changed and not isNil(self.edgeFilterWakeup): + self.edgeFilterWakeup.fire() return ok() proc removeContentTopicInterest( self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic ): Result[void, string] = + var changed = false self.contentTopicSubs.withValue(shard, cTopics): if cTopics[].contains(topic): cTopics[].excl(topic) + changed = true if cTopics[].len == 0 and isNil(self.node.wakuRelay): self.contentTopicSubs.del(shard) # We're done with cTopics here - # TODO: Call a "unsubscribe(shard, topic)" on filter client here, - # so the filter client can know that subscriptions changed. + if changed and not isNil(self.edgeFilterWakeup): + self.edgeFilterWakeup.fire() return ok() @@ -73,46 +130,6 @@ proc subscribePubsubTopics( return ok() -proc startSubscriptionManager*(self: SubscriptionManager) = - if isNil(self.node.wakuRelay): - return - - if self.node.wakuAutoSharding.isSome(): - # Subscribe relay to all shards in autosharding. - let autoSharding = self.node.wakuAutoSharding.get() - let clusterId = autoSharding.clusterId - let numShards = autoSharding.shardCountGenZero - - if numShards > 0: - var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards) - - for i in 0 ..< numShards: - let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i)) - clusterPubsubTopics.add(PubsubTopic($shardObj)) - - self.subscribePubsubTopics(clusterPubsubTopics).isOkOr: - error "Failed to auto-subscribe Relay to cluster shards: ", error = error - else: - info "SubscriptionManager has no AutoSharding configured; skipping auto-subscribe." 
-
-proc stopSubscriptionManager*(self: SubscriptionManager) {.async.} =
-  discard
-
-proc getActiveSubscriptions*(
-    self: SubscriptionManager
-): seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] =
-  var activeSubs: seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] =
-    @[]
-
-  for pubsub, cTopicSet in self.contentTopicSubs.pairs:
-    if cTopicSet.len > 0:
-      var cTopicSeq = newSeqOfCap[ContentTopic](cTopicSet.len)
-      for t in cTopicSet:
-        cTopicSeq.add(t)
-      activeSubs.add((pubsub, cTopicSeq))
-
-  return activeSubs
-
 proc getShardForContentTopic(
     self: SubscriptionManager, topic: ContentTopic
 ): Result[PubsubTopic, string] =
@@ -162,3 +179,358 @@ proc unsubscribe*(
   ?self.removeContentTopicInterest(shard, topic)
 
   return ok()
+
+# ---------------------------------------------------------------------------
+# Edge Filter driver for the Logos Messaging API
+#
+# The SubscriptionManager natively takes on the responsibility of driving the
+# Filter protocol to maintain subscriptions and message receipt for edge nodes.
+# ---------------------------------------------------------------------------
+
+const EdgeFilterSubscribeTimeout = chronos.seconds(15)
+  ## Timeout for a single filter subscribe/unsubscribe RPC to a service peer.
+const EdgeFilterPingTimeout = chronos.seconds(5)
+  ## Timeout for a filter ping health check.
+const EdgeFilterLoopInterval = chronos.seconds(30)
+  ## Interval for the edge filter health ping loop.
+const EdgeFilterSubLoopDebounce = chronos.seconds(1)
+  ## Debounce delay to coalesce rapid-fire wakeups into a single reconciliation pass.
+
+proc updateShardHealth(
+    self: SubscriptionManager, shard: PubsubTopic, state: var EdgeFilterSubState
+) =
+  ## Recompute and emit health for a shard after its peer set changed.
+  let newHealth = toTopicHealth(state.peers.len)
+  if newHealth != state.currentHealth:
+    state.currentHealth = newHealth
+    EventShardTopicHealthChange.emit(self.node.brokerCtx, shard, newHealth)
+
+proc removePeer(self: SubscriptionManager, shard: PubsubTopic, peerId: PeerId) =
+  ## Remove a peer from edgeFilterSubStates for the given shard,
+  ## update health, and wake the sub loop to dial a replacement.
+  ## Best-effort unsubscribe so the service peer stops pushing to us.
+  self.edgeFilterSubStates.withValue(shard, state):
+    var peer: RemotePeerInfo
+    var found = false
+    for p in state.peers:
+      if p.peerId == peerId:
+        peer = p
+        found = true
+        break
+    if not found:
+      return
+
+    state.peers.keepItIf(it.peerId != peerId)
+    self.updateShardHealth(shard, state[])
+    self.edgeFilterWakeup.fire()
+
+    if not self.node.wakuFilterClient.isNil():
+      self.contentTopicSubs.withValue(shard, topics):
+        let ct = toSeq(topics[])
+        if ct.len > 0:
+          proc doUnsubscribe() {.async.} =
+            discard await self.node.wakuFilterClient.unsubscribe(peer, shard, ct)
+
+          asyncSpawn doUnsubscribe()
+
+type SendChunkedFilterRpcKind = enum
+  FilterSubscribe
+  FilterUnsubscribe
+
+proc sendChunkedFilterRpc(
+    self: SubscriptionManager,
+    peer: RemotePeerInfo,
+    shard: PubsubTopic,
+    topics: seq[ContentTopic],
+    kind: SendChunkedFilterRpcKind,
+): Future[bool] {.async.} =
+  ## Send a chunked filter subscribe or unsubscribe RPC. Returns true on
+  ## success. On failure the peer is removed and false is returned. 
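+  # Illustration (assumed values; the real constant lives in
+  # waku_filter_v2/protocol): if MaxContentTopicsPerRequest were 30, a
+  # 70-topic subscribe would go out as three RPCs of 30, 30 and 10 topics,
+  # and the first failed chunk aborts the remainder.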
+ try: + var i = 0 + while i < topics.len: + let chunk = + topics[i ..< min(i + filter_protocol.MaxContentTopicsPerRequest, topics.len)] + let fut = + case kind + of FilterSubscribe: + self.node.wakuFilterClient.subscribe(peer, shard, chunk) + of FilterUnsubscribe: + self.node.wakuFilterClient.unsubscribe(peer, shard, chunk) + if not (await fut.withTimeout(EdgeFilterSubscribeTimeout)) or fut.read().isErr(): + trace "sendChunkedFilterRpc: chunk failed", + op = kind, shard = shard, peer = peer.peerId + self.removePeer(shard, peer.peerId) + return false + i += filter_protocol.MaxContentTopicsPerRequest + except CatchableError as exc: + debug "sendChunkedFilterRpc: failed", + op = kind, shard = shard, peer = peer.peerId, err = exc.msg + self.removePeer(shard, peer.peerId) + return false + return true + +proc syncFilterDeltas( + self: SubscriptionManager, + peer: RemotePeerInfo, + shard: PubsubTopic, + added: seq[ContentTopic], + removed: seq[ContentTopic], +) {.async.} = + ## Push content topic changes (adds/removes) to an already-tracked peer. + if added.len > 0: + if not await self.sendChunkedFilterRpc(peer, shard, added, FilterSubscribe): + return + + if removed.len > 0: + discard await self.sendChunkedFilterRpc(peer, shard, removed, FilterUnsubscribe) + +proc dialFilterPeer( + self: SubscriptionManager, + peer: RemotePeerInfo, + shard: PubsubTopic, + contentTopics: seq[ContentTopic], +) {.async.} = + ## Subscribe a new peer to all content topics on a shard and start tracking it. + self.edgeFilterSubStates.withValue(shard, state): + state.pendingPeers.incl(peer.peerId) + + try: + if not await self.sendChunkedFilterRpc(peer, shard, contentTopics, FilterSubscribe): + return + + self.edgeFilterSubStates.withValue(shard, state): + if state.peers.anyIt(it.peerId == peer.peerId): + trace "dialFilterPeer: peer already tracked, skipping duplicate", + shard = shard, peer = peer.peerId + return + + state.peers.add(peer) + self.updateShardHealth(shard, state[]) + trace "dialFilterPeer: successfully subscribed to all chunks", + shard = shard, peer = peer.peerId, totalPeers = state.peers.len + do: + trace "dialFilterPeer: shard removed while subscribing, discarding result", + shard = shard, peer = peer.peerId + finally: + self.edgeFilterSubStates.withValue(shard, state): + state.pendingPeers.excl(peer.peerId) + +proc edgeFilterHealthLoop*(self: SubscriptionManager) {.async.} = + ## Periodically pings all connected filter service peers to verify they are + ## still alive at the application layer. Peers that fail the ping are removed. 
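+  # Tracked peers that are currently disconnected are not pinged and are kept
+  # here; pruning them (and dialing replacements) is edgeFilterSubLoop's job.
+  # Cadence per the constants above: one ping round every EdgeFilterLoopInterval,
+  # each ping bounded by EdgeFilterPingTimeout.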
+ while true: + await sleepAsync(EdgeFilterLoopInterval) + + if self.node.wakuFilterClient.isNil(): + warn "filter client is nil within edge filter health loop" + continue + + var connected = initTable[PeerId, RemotePeerInfo]() + for state in self.edgeFilterSubStates.values: + for peer in state.peers: + if self.node.peerManager.switch.peerStore.isConnected(peer.peerId): + connected[peer.peerId] = peer + + var alive = initHashSet[PeerId]() + + if connected.len > 0: + var pingTasks: seq[(PeerId, Future[FilterSubscribeResult])] = @[] + for peer in connected.values: + pingTasks.add( + (peer.peerId, self.node.wakuFilterClient.ping(peer, EdgeFilterPingTimeout)) + ) + + # extract future tasks from (PeerId, Future) tuples and await them + await allFutures(pingTasks.mapIt(it[1])) + + for (peerId, task) in pingTasks: + if task.read().isOk(): + alive.incl(peerId) + + var changed = false + for shard, state in self.edgeFilterSubStates.mpairs: + let oldLen = state.peers.len + state.peers.keepItIf(it.peerId notin connected or alive.contains(it.peerId)) + + if state.peers.len < oldLen: + changed = true + self.updateShardHealth(shard, state) + trace "Edge Filter health degraded by Ping failure", + shard = shard, new = state.currentHealth + + if changed: + self.edgeFilterWakeup.fire() + +proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} = + ## Reconciles filter subscriptions with the desired state from SubscriptionManager. + var lastSynced = initTable[PubsubTopic, HashSet[ContentTopic]]() + + while true: + await self.edgeFilterWakeup.wait() + await sleepAsync(EdgeFilterSubLoopDebounce) + self.edgeFilterWakeup.clear() + trace "edgeFilterSubLoop: woke up" + + if isNil(self.node.wakuFilterClient): + trace "edgeFilterSubLoop: wakuFilterClient is nil, skipping" + continue + + let desired = self.contentTopicSubs + + trace "edgeFilterSubLoop: desired state", numShards = desired.len + + let allShards = toHashSet(toSeq(desired.keys)) + toHashSet(toSeq(lastSynced.keys)) + + for shard in allShards: + let currTopics = desired.getOrDefault(shard) + let prevTopics = lastSynced.getOrDefault(shard) + + if shard notin self.edgeFilterSubStates: + self.edgeFilterSubStates[shard] = + EdgeFilterSubState(currentHealth: TopicHealth.UNHEALTHY) + + let addedTopics = toSeq(currTopics - prevTopics) + let removedTopics = toSeq(prevTopics - currTopics) + + self.edgeFilterSubStates.withValue(shard, state): + state.peers.keepItIf( + self.node.peerManager.switch.peerStore.isConnected(it.peerId) + ) + state.pending.keepItIf(not it.finished) + + if addedTopics.len > 0 or removedTopics.len > 0: + for peer in state.peers: + asyncSpawn self.syncFilterDeltas(peer, shard, addedTopics, removedTopics) + + if currTopics.len == 0: + for fut in state.pending: + if not fut.finished: + await fut.cancelAndWait() + self.edgeFilterSubStates.del(shard) + # invalidates `state` — do not use after this + else: + self.updateShardHealth(shard, state[]) + + let needed = max(0, HealthyThreshold - state.peers.len - state.pending.len) + + if needed > 0: + let tracked = state.peers.mapIt(it.peerId).toHashSet() + state.pendingPeers + var candidates = self.node.peerManager.selectPeers( + filter_common.WakuFilterSubscribeCodec, some(shard) + ) + candidates.keepItIf(it.peerId notin tracked) + + let toDial = min(needed, candidates.len) + + trace "edgeFilterSubLoop: shard reconciliation", + shard = shard, + num_peers = state.peers.len, + num_pending = state.pending.len, + num_needed = needed, + num_available = candidates.len, + toDial = toDial + + for i in 0 
..< toDial: + let fut = self.dialFilterPeer(candidates[i], shard, toSeq(currTopics)) + state.pending.add(fut) + + lastSynced = desired + +proc startEdgeFilterLoops(self: SubscriptionManager): Result[void, string] = + ## Start the edge filter orchestration loops. + ## Caller must ensure this is only called in edge mode (relay nil, filter client present). + self.edgeFilterWakeup = newAsyncEvent() + + self.peerEventListener = WakuPeerEvent.listen( + self.node.brokerCtx, + proc(evt: WakuPeerEvent) {.async: (raises: []), gcsafe.} = + if evt.kind == WakuPeerEventKind.EventDisconnected or + evt.kind == WakuPeerEventKind.EventMetadataUpdated: + self.edgeFilterWakeup.fire() + , + ).valueOr: + return err("Failed to listen to peer events for edge filter: " & error) + + self.edgeFilterSubLoopFut = self.edgeFilterSubLoop() + self.edgeFilterHealthLoopFut = self.edgeFilterHealthLoop() + return ok() + +proc stopEdgeFilterLoops(self: SubscriptionManager) {.async: (raises: []).} = + ## Stop the edge filter orchestration loops and clean up pending futures. + if not isNil(self.edgeFilterSubLoopFut): + await self.edgeFilterSubLoopFut.cancelAndWait() + self.edgeFilterSubLoopFut = nil + + if not isNil(self.edgeFilterHealthLoopFut): + await self.edgeFilterHealthLoopFut.cancelAndWait() + self.edgeFilterHealthLoopFut = nil + + for shard, state in self.edgeFilterSubStates: + for fut in state.pending: + if not fut.finished: + await fut.cancelAndWait() + + WakuPeerEvent.dropListener(self.node.brokerCtx, self.peerEventListener) + +# --------------------------------------------------------------------------- +# SubscriptionManager Lifecycle (calls Edge behavior above) +# +# startSubscriptionManager and stopSubscriptionManager orchestrate both the +# core (relay) and edge (filter) paths, and register/clear broker providers. +# --------------------------------------------------------------------------- + +proc startSubscriptionManager*(self: SubscriptionManager): Result[void, string] = + # Register edge filter broker providers. The shard/content health providers + # in WakuNode query these via the broker as a fallback when relay health is + # not available. If edge mode is not active, these providers simply return + # NOT_SUBSCRIBED / strength 0, which is harmless. + RequestEdgeShardHealth.setProvider( + self.node.brokerCtx, + proc(shard: PubsubTopic): Result[RequestEdgeShardHealth, string] = + self.edgeFilterSubStates.withValue(shard, state): + return ok(RequestEdgeShardHealth(health: state.currentHealth)) + return ok(RequestEdgeShardHealth(health: TopicHealth.NOT_SUBSCRIBED)), + ).isOkOr: + error "Can't set provider for RequestEdgeShardHealth", error = error + + RequestEdgeFilterPeerCount.setProvider( + self.node.brokerCtx, + proc(): Result[RequestEdgeFilterPeerCount, string] = + var minPeers = high(int) + for state in self.edgeFilterSubStates.values: + minPeers = min(minPeers, state.peers.len) + if minPeers == high(int): + minPeers = 0 + return ok(RequestEdgeFilterPeerCount(peerCount: minPeers)), + ).isOkOr: + error "Can't set provider for RequestEdgeFilterPeerCount", error = error + + if self.node.wakuRelay.isNil(): + return self.startEdgeFilterLoops() + + # Core mode: auto-subscribe relay to all shards in autosharding. 
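+  # Example: clusterId = 3 with numShards = 2 yields subscriptions to
+  # "/waku/2/rs/3/0" and "/waku/2/rs/3/1", the pubsub topic format used in
+  # the tests above.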
+ if self.node.wakuAutoSharding.isSome(): + let autoSharding = self.node.wakuAutoSharding.get() + let clusterId = autoSharding.clusterId + let numShards = autoSharding.shardCountGenZero + + if numShards > 0: + var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards) + + for i in 0 ..< numShards: + let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i)) + clusterPubsubTopics.add(PubsubTopic($shardObj)) + + self.subscribePubsubTopics(clusterPubsubTopics).isOkOr: + error "Failed to auto-subscribe Relay to cluster shards: ", error = error + else: + info "SubscriptionManager has no AutoSharding configured; skipping auto-subscribe." + + return ok() + +proc stopSubscriptionManager*(self: SubscriptionManager) {.async: (raises: []).} = + if self.node.wakuRelay.isNil(): + await self.stopEdgeFilterLoops() + RequestEdgeShardHealth.clearProvider(self.node.brokerCtx) + RequestEdgeFilterPeerCount.clearProvider(self.node.brokerCtx) diff --git a/waku/node/health_monitor/connection_status.nim b/waku/node/health_monitor/connection_status.nim index 77696130a..68ec9d4be 100644 --- a/waku/node/health_monitor/connection_status.nim +++ b/waku/node/health_monitor/connection_status.nim @@ -2,6 +2,9 @@ import chronos, results, std/strutils, ../../api/types export ConnectionStatus +const HealthyThreshold* = 2 + ## Minimum peers required per service protocol for a "Connected" status (excluding Relay). + proc init*( t: typedesc[ConnectionStatus], strRep: string ): Result[ConnectionStatus, string] = diff --git a/waku/node/health_monitor/node_health_monitor.nim b/waku/node/health_monitor/node_health_monitor.nim index 79bf9f92a..066e7776a 100644 --- a/waku/node/health_monitor/node_health_monitor.nim +++ b/waku/node/health_monitor/node_health_monitor.nim @@ -21,6 +21,7 @@ import node/health_monitor/health_report, node/health_monitor/connection_status, node/health_monitor/protocol_health, + requests/health_requests, ] ## This module is aimed to check the state of the "self" Waku Node @@ -29,9 +30,6 @@ import # if not called, the outcome of randomization procedures will be the same in every run random.randomize() -const HealthyThreshold* = 2 - ## minimum peers required for all services for a Connected status, excluding Relay - type NodeHealthMonitor* = ref object nodeHealth: HealthStatus node: WakuNode @@ -48,7 +46,8 @@ type NodeHealthMonitor* = ref object ## latest known connectivity strength (e.g. connected peer count) metric for each protocol. ## if it doesn't make sense for the protocol in question, this is set to zero. 
relayObserver: PubSubObserver - peerEventListener: EventWakuPeerListener + peerEventListener: WakuPeerEventListener + shardHealthListener: EventShardTopicHealthChangeListener func getHealth*(report: HealthReport, kind: WakuProtocol): ProtocolHealth = for h in report.protocolsHealth: @@ -198,6 +197,17 @@ proc getFilterClientHealth(hm: NodeHealthMonitor): ProtocolHealth = hm.strength[WakuProtocol.FilterClientProtocol] = 0 return p.notMounted() + if isNil(hm.node.wakuRelay): + let edgeRes = RequestEdgeFilterPeerCount.request(hm.node.brokerCtx) + if edgeRes.isOk(): + let peerCount = edgeRes.get().peerCount + if peerCount > 0: + hm.strength[WakuProtocol.FilterClientProtocol] = peerCount + return p.ready() + else: + error "Failed to request edge filter peer count", error = edgeRes.error + return p.notReady("Failed to request edge filter peer count: " & edgeRes.error) + let peerCount = countCapablePeers(hm, WakuFilterSubscribeCodec) hm.strength[WakuProtocol.FilterClientProtocol] = peerCount @@ -663,14 +673,23 @@ proc startHealthMonitor*(hm: NodeHealthMonitor): Result[void, string] = ) hm.node.wakuRelay.addObserver(hm.relayObserver) - hm.peerEventListener = EventWakuPeer.listen( + hm.peerEventListener = WakuPeerEvent.listen( hm.node.brokerCtx, - proc(evt: EventWakuPeer): Future[void] {.async: (raises: []), gcsafe.} = + proc(evt: WakuPeerEvent): Future[void] {.async: (raises: []), gcsafe.} = ## Recompute health on any peer changing anything (join, leave, identify, metadata update) hm.healthUpdateEvent.fire(), ).valueOr: return err("Failed to subscribe to peer events: " & error) + hm.shardHealthListener = EventShardTopicHealthChange.listen( + hm.node.brokerCtx, + proc( + evt: EventShardTopicHealthChange + ): Future[void] {.async: (raises: []), gcsafe.} = + hm.healthUpdateEvent.fire(), + ).valueOr: + return err("Failed to subscribe to shard health events: " & error) + hm.healthUpdateEvent = newAsyncEvent() hm.healthUpdateEvent.fire() @@ -690,8 +709,8 @@ proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} = if not isNil(hm.healthLoopFut): await hm.healthLoopFut.cancelAndWait() - if hm.peerEventListener.id != 0: - EventWakuPeer.dropListener(hm.node.brokerCtx, hm.peerEventListener) + WakuPeerEvent.dropListener(hm.node.brokerCtx, hm.peerEventListener) + EventShardTopicHealthChange.dropListener(hm.node.brokerCtx, hm.shardHealthListener) if not isNil(hm.node.wakuRelay) and not isNil(hm.relayObserver): hm.node.wakuRelay.removeObserver(hm.relayObserver) diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index dc0da9624..e3eb8d75b 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -215,31 +215,34 @@ proc loadFromStorage(pm: PeerManager) {.gcsafe.} = trace "recovered peers from storage", amount = amount -proc selectPeer*( +proc selectPeers*( pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic) -): Option[RemotePeerInfo] = - # Selects the best peer for a given protocol - +): seq[RemotePeerInfo] = + ## Returns all peers that support the given protocol (and optionally shard), + ## shuffled randomly. Callers can further filter or pick from this list. 
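+  # Example use (mirrors edgeFilterSubLoop): per-shard candidates come from
+  #   pm.selectPeers(WakuFilterSubscribeCodec, some(shard))
+  # after which the caller excludes peers it already tracks or is dialing.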
diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim
index dc0da9624..e3eb8d75b 100644
--- a/waku/node/peer_manager/peer_manager.nim
+++ b/waku/node/peer_manager/peer_manager.nim
@@ -215,31 +215,34 @@ proc loadFromStorage(pm: PeerManager) {.gcsafe.} =
 
   trace "recovered peers from storage", amount = amount
 
-proc selectPeer*(
+proc selectPeers*(
     pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)
-): Option[RemotePeerInfo] =
-  # Selects the best peer for a given protocol
-
+): seq[RemotePeerInfo] =
+  ## Returns all peers that support the given protocol (and optionally shard),
+  ## shuffled randomly. Callers can further filter or pick from this list.
   var peers = pm.switch.peerStore.getPeersByProtocol(proto)
 
-  trace "Selecting peer from peerstore",
-    protocol = proto, peers, address = cast[uint](pm.switch.peerStore)
+  trace "Selecting peers from peerstore",
+    protocol = proto, num_peers = peers.len, address = cast[uint](pm.switch.peerStore)
 
   if shard.isSome():
-    # Parse the shard from the pubsub topic to get cluster and shard ID
     let shardInfo = RelayShard.parse(shard.get()).valueOr:
       trace "Failed to parse shard from pubsub topic", topic = shard.get()
-      return none(RemotePeerInfo)
+      return @[]
 
-    # Filter peers that support the requested shard
-    # Check both ENR (if present) and the shards field on RemotePeerInfo
     peers.keepItIf(
-      # Check ENR if available
       (it.enr.isSome() and it.enr.get().containsShard(shard.get())) or
-      # Otherwise check the shards field directly
-      (it.shards.len > 0 and it.shards.contains(shardInfo.shardId))
+        (it.shards.len > 0 and it.shards.contains(shardInfo.shardId))
     )
 
   shuffle(peers)
+  return peers
+
+proc selectPeer*(
+    pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)
+): Option[RemotePeerInfo] =
+  ## Selects a single peer for a given protocol, checking service slots first
+  ## (for non-relay protocols).
+  let peers = pm.selectPeers(proto, shard)
 
   # No criteria for selecting a peer for WakuRelay, random one
   if proto == WakuRelayCodec:
@@ -742,7 +745,7 @@ proc refreshPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
       # TODO: should only trigger an event if metadata actually changed
       #       should include the shard subscription delta in the event when
       #       it is a MetadataUpdated event
-      EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventMetadataUpdated)
+      WakuPeerEvent.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventMetadataUpdated)
       return
 
   info "disconnecting from peer", peerId = peerId, reason = reason
@@ -787,7 +790,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
         asyncSpawn(pm.switch.disconnect(peerId))
         peerStore.delete(peerId)
 
-    EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventConnected)
+    WakuPeerEvent.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventConnected)
 
     if not pm.onConnectionChange.isNil():
       # we don't want to await for the callback to finish
@@ -804,7 +807,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
           pm.ipTable.del(ip)
          break
 
-    EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventDisconnected)
+    WakuPeerEvent.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventDisconnected)
 
     if not pm.onConnectionChange.isNil():
       # we don't want to await for the callback to finish
@@ -812,7 +815,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
   of PeerEventKind.Identified:
     info "event identified", peerId = peerId
-    EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventIdentified)
+    WakuPeerEvent.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventIdentified)
 
   peerStore[ConnectionBook][peerId] = connectedness
   peerStore[DirectionBook][peerId] = direction
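With selectPeer split on top of selectPeers, a caller that needs several service peers can sample directly from the shuffled list. A hypothetical caller in that spirit (names are illustrative, not the DeliveryService code), assuming the same waku imports as this module:

import std/options

proc pickFilterPeers(
    pm: PeerManager, shard: PubsubTopic, k: int
): seq[RemotePeerInfo] =
  ## Take up to `k` filter service peers for `shard`.
  let candidates = pm.selectPeers(WakuFilterSubscribeCodec, some(shard))
  # selectPeers already shuffles, so a prefix is an unbiased random sample
  candidates[0 ..< min(k, candidates.len)]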
diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim
index 92528c7b9..506a3e592 100644
--- a/waku/node/waku_node.nim
+++ b/waku/node/waku_node.nim
@@ -1,7 +1,7 @@
 {.push raises: [].}
 
 import
-  std/[options, tables, strutils, sequtils, os, net, random],
+  std/[options, tables, strutils, sequtils, os, net, random, sets],
   chronos,
   chronicles,
   metrics,
@@ -38,7 +38,6 @@ import
     waku_store/resume,
     waku_store_sync,
     waku_filter_v2,
-    waku_filter_v2/common as filter_common,
     waku_filter_v2/client as filter_client,
     waku_metadata,
     waku_rendezvous/protocol,
@@ -60,7 +59,7 @@ import
     requests/node_requests,
     requests/health_requests,
     events/health_events,
-    events/peer_events,
+    events/message_events,
   ],
   waku/discovery/waku_kademlia,
   ./net_config,
@@ -95,9 +94,6 @@ const clientId* = "Nimbus Waku v2 node"
 
 const WakuNodeVersionString* = "version / git commit hash: " & git_version
 
-const EdgeTopicHealthyThreshold = 2
-  ## Lightpush server and filter server requirement for a healthy topic in edge mode
-
 # key and crypto modules different
 type
   # TODO: Move to application instance (e.g., `WakuNode2`)
@@ -142,10 +138,6 @@ type
     legacyAppHandlers*: Table[PubsubTopic, WakuRelayHandler]
      ## Kernel API Relay appHandlers (if any)
     wakuMix*: WakuMix
-    edgeTopicsHealth*: Table[PubsubTopic, TopicHealth]
-    edgeHealthEvent*: AsyncEvent
-    edgeHealthLoop: Future[void]
-    peerEventListener*: EventWakuPeerListener
     kademliaDiscoveryLoop*: Future[void]
     wakuKademlia*: WakuKademlia
 
@@ -498,52 +490,7 @@ proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string]
 
   return ok()
 
-proc calculateEdgeTopicHealth(node: WakuNode, shard: PubsubTopic): TopicHealth =
-  let filterPeers =
-    node.peerManager.getPeersForShard(filter_common.WakuFilterSubscribeCodec, shard)
-  let lightpushPeers =
-    node.peerManager.getPeersForShard(lightpush_protocol.WakuLightPushCodec, shard)
-
-  if filterPeers >= EdgeTopicHealthyThreshold and
-      lightpushPeers >= EdgeTopicHealthyThreshold:
-    return TopicHealth.SUFFICIENTLY_HEALTHY
-  elif filterPeers > 0 and lightpushPeers > 0:
-    return TopicHealth.MINIMALLY_HEALTHY
-
-  return TopicHealth.UNHEALTHY
-
-proc loopEdgeHealth(node: WakuNode) {.async.} =
-  while node.started:
-    await node.edgeHealthEvent.wait()
-    node.edgeHealthEvent.clear()
-
-    try:
-      for shard in node.edgeTopicsHealth.keys:
-        if not node.wakuRelay.isNil and node.wakuRelay.isSubscribed(shard):
-          continue
-
-        let oldHealth = node.edgeTopicsHealth.getOrDefault(shard, TopicHealth.UNHEALTHY)
-        let newHealth = node.calculateEdgeTopicHealth(shard)
-        if newHealth != oldHealth:
-          node.edgeTopicsHealth[shard] = newHealth
-          EventShardTopicHealthChange.emit(node.brokerCtx, shard, newHealth)
-    except CancelledError:
-      break
-    except CatchableError as e:
-      warn "Error in edge health check", error = e.msg
-
-    # safety cooldown to protect from edge cases
-    await sleepAsync(100.milliseconds)
-
 proc startProvidersAndListeners*(node: WakuNode) =
-  node.peerEventListener = EventWakuPeer.listen(
-    node.brokerCtx,
-    proc(evt: EventWakuPeer) {.async: (raises: []), gcsafe.} =
-      node.edgeHealthEvent.fire(),
-  ).valueOr:
-    error "Failed to listen to peer events", error = error
-    return
-
   RequestRelayShard.setProvider(
     node.brokerCtx,
     proc(
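The deleted calculateEdgeTopicHealth above graded a shard by peers that merely support filter and lightpush; per the subscription manager, edge health is now derived from confirmed filter subscriptions. A pure sketch of such a grading, assuming the same two-peer bar as HealthyThreshold (the scale here is an assumption, not the exact EdgeFilterSubState logic):

# Hypothetical confirmed-subscription grading for an edge shard.
type TopicHealthSketch = enum
  Unhealthy
  MinimallyHealthy
  SufficientlyHealthy

proc gradeShard(confirmedFilterPeers: int, healthyThreshold = 2): TopicHealthSketch =
  if confirmedFilterPeers >= healthyThreshold:
    SufficientlyHealthy
  elif confirmedFilterPeers > 0:
    MinimallyHealthy
  else:
    Unhealthy

when isMainModule:
  doAssert gradeShard(0) == Unhealthy
  doAssert gradeShard(1) == MinimallyHealthy
  doAssert gradeShard(2) == SufficientlyHealthy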
@@ -561,14 +508,23 @@ proc startProvidersAndListeners*(node: WakuNode) =
 
       var response: RequestShardTopicsHealth
       for shard in topics:
-        var healthStatus = TopicHealth.UNHEALTHY
+        # Health resolution order:
+        #   1. Relay topicsHealth (computed from gossipsub mesh state)
+        #   2. If relay is subscribed but topicsHealth hasn't computed yet, UNHEALTHY
+        #   3. Otherwise, ask the edge filter via broker (stays NOT_SUBSCRIBED if no provider is set)
+        var healthStatus = TopicHealth.NOT_SUBSCRIBED
 
         if not node.wakuRelay.isNil:
           healthStatus =
             node.wakuRelay.topicsHealth.getOrDefault(shard, TopicHealth.NOT_SUBSCRIBED)
 
         if healthStatus == TopicHealth.NOT_SUBSCRIBED:
-          healthStatus = node.calculateEdgeTopicHealth(shard)
+          if not node.wakuRelay.isNil and node.wakuRelay.isSubscribed(shard):
+            healthStatus = TopicHealth.UNHEALTHY
+          else:
+            let edgeRes = RequestEdgeShardHealth.request(node.brokerCtx, shard)
+            if edgeRes.isOk():
+              healthStatus = edgeRes.get().health
 
         response.topicHealth.add((shard, healthStatus))
 
@@ -594,9 +550,10 @@ proc startProvidersAndListeners*(node: WakuNode) =
           pubsubTopic, TopicHealth.NOT_SUBSCRIBED
         )
 
-        if topicHealth == TopicHealth.NOT_SUBSCRIBED and
-            pubsubTopic in node.edgeTopicsHealth:
-          topicHealth = node.calculateEdgeTopicHealth(pubsubTopic)
+        if topicHealth == TopicHealth.NOT_SUBSCRIBED:
+          let edgeRes = RequestEdgeShardHealth.request(node.brokerCtx, pubsubTopic)
+          if edgeRes.isOk():
+            topicHealth = edgeRes.get().health
 
         response.contentTopicHealth.add((topic: contentTopic, health: topicHealth))
 
@@ -605,7 +562,6 @@ proc startProvidersAndListeners*(node: WakuNode) =
     error "Can't set provider for RequestContentTopicsHealth", error = error
 
 proc stopProvidersAndListeners*(node: WakuNode) =
-  EventWakuPeer.dropListener(node.brokerCtx, node.peerEventListener)
   RequestRelayShard.clearProvider(node.brokerCtx)
   RequestContentTopicsHealth.clearProvider(node.brokerCtx)
   RequestShardTopicsHealth.clearProvider(node.brokerCtx)
@@ -658,13 +614,16 @@ proc start*(node: WakuNode) {.async.} =
   ## The switch will update addresses after start using the addressMapper
   await node.switch.start()
 
-  node.edgeHealthEvent = newAsyncEvent()
-  node.edgeHealthLoop = loopEdgeHealth(node)
+  node.started = true
+
+  if not node.wakuFilterClient.isNil():
+    node.wakuFilterClient.registerPushHandler(
+      proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
+        MessageSeenEvent.emit(node.brokerCtx, pubsubTopic, msg)
+    )
 
   node.startProvidersAndListeners()
 
-  node.started = true
-
   if not zeroPortPresent:
     updateAnnouncedAddrWithPrimaryIpAddr(node).isOkOr:
       error "failed update announced addr", error = $error
@@ -678,10 +637,6 @@ proc stop*(node: WakuNode) {.async.} =
 
   node.stopProvidersAndListeners()
 
-  if not node.edgeHealthLoop.isNil:
-    await node.edgeHealthLoop.cancelAndWait()
-    node.edgeHealthLoop = nil
-
   await node.switch.stop()
 
   node.peerManager.stop()
diff --git a/waku/requests/health_requests.nim b/waku/requests/health_requests.nim
index 3554922b3..c3a0ce286 100644
--- a/waku/requests/health_requests.nim
+++ b/waku/requests/health_requests.nim
@@ -37,3 +37,15 @@ RequestBroker:
     healthStatus*: ProtocolHealth
 
   proc signature(protocol: WakuProtocol): Future[Result[RequestProtocolHealth, string]]
+
+# Get edge filter health for a single shard (set by DeliveryService when edge mode is active)
+RequestBroker(sync):
+  type RequestEdgeShardHealth* = object
+    health*: TopicHealth
+
+  proc signature(shard: PubsubTopic): Result[RequestEdgeShardHealth, string]
+
+# Get edge filter confirmed peer count (set by DeliveryService when edge mode is active)
+RequestBroker(sync):
+  type RequestEdgeFilterPeerCount* = object
+    peerCount*: int
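The two RequestBroker(sync) blocks above generate request/setProvider/clearProvider pairs; a request simply fails while no provider is registered, which is what lets relay-only nodes skip edge health entirely. A dependency-free stand-in for that contract (the real wiring goes through BrokerContext and the macro-generated procs):

# Minimal provider/consumer sketch of a sync broker request.
import std/options
import results

type ShardHealthProvider = proc(shard: string): Result[int, string]

var provider = none(ShardHealthProvider)

proc setProvider(p: ShardHealthProvider) =
  provider = some(p)

proc clearProvider() =
  provider = none(ShardHealthProvider)

proc request(shard: string): Result[int, string] =
  ## Fails, rather than blocks, when edge mode never registered a provider.
  if provider.isSome():
    provider.get()(shard)
  else:
    Result[int, string].err("no provider set")

when isMainModule:
  doAssert request("/waku/2/rs/1/0").isErr() # edge mode not active
  setProvider(proc(shard: string): Result[int, string] = Result[int, string].ok(2))
  doAssert request("/waku/2/rs/1/0").get() == 2
  clearProvider()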
diff --git a/waku/waku_core/subscription.nim b/waku/waku_core/subscription.nim
index 19f3386ef..8694efa1b 100644
--- a/waku/waku_core/subscription.nim
+++ b/waku/waku_core/subscription.nim
@@ -1,3 +1,3 @@
-import ./subscription/subscription_manager, ./subscription/push_handler
+import ./subscription/push_handler
 
-export subscription_manager, push_handler
+export push_handler
diff --git a/waku/waku_core/subscription/subscription_manager.nim b/waku/waku_core/subscription/subscription_manager.nim
deleted file mode 100644
index ccade763b..000000000
--- a/waku/waku_core/subscription/subscription_manager.nim
+++ /dev/null
@@ -1,52 +0,0 @@
-{.push raises: [].}
-
-import std/tables, results, chronicles, chronos
-
-import ./push_handler, ../topics, ../message
-
-## Subscription manager
-type LegacySubscriptionManager* = object
-  subscriptions: TableRef[(string, ContentTopic), FilterPushHandler]
-
-proc init*(T: type LegacySubscriptionManager): T =
-  LegacySubscriptionManager(
-    subscriptions: newTable[(string, ContentTopic), FilterPushHandler]()
-  )
-
-proc clear*(m: var LegacySubscriptionManager) =
-  m.subscriptions.clear()
-
-proc registerSubscription*(
-    m: LegacySubscriptionManager,
-    pubsubTopic: PubsubTopic,
-    contentTopic: ContentTopic,
-    handler: FilterPushHandler,
-) =
-  try:
-    # TODO: Handle over subscription surprises
-    m.subscriptions[(pubsubTopic, contentTopic)] = handler
-  except CatchableError:
-    error "failed to register filter subscription", error = getCurrentExceptionMsg()
-
-proc removeSubscription*(
-    m: LegacySubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic
-) =
-  m.subscriptions.del((pubsubTopic, contentTopic))
-
-proc notifySubscriptionHandler*(
-    m: LegacySubscriptionManager,
-    pubsubTopic: PubsubTopic,
-    contentTopic: ContentTopic,
-    message: WakuMessage,
-) =
-  if not m.subscriptions.hasKey((pubsubTopic, contentTopic)):
-    return
-
-  try:
-    let handler = m.subscriptions[(pubsubTopic, contentTopic)]
-    asyncSpawn handler(pubsubTopic, message)
-  except CatchableError:
-    discard
-
-proc getSubscriptionsCount*(m: LegacySubscriptionManager): int =
-  m.subscriptions.len()
diff --git a/waku/waku_filter_v2/client.nim b/waku/waku_filter_v2/client.nim
index ba8cd3d0c..265bf5e7b 100644
--- a/waku/waku_filter_v2/client.nim
+++ b/waku/waku_filter_v2/client.nim
@@ -101,12 +101,22 @@ proc sendSubscribeRequest(
 
   return ok()
 
 proc ping*(
-    wfc: WakuFilterClient, servicePeer: RemotePeerInfo
+    wfc: WakuFilterClient, servicePeer: RemotePeerInfo, timeout = chronos.seconds(0)
 ): Future[FilterSubscribeResult] {.async.} =
   info "sending ping", servicePeer = shortLog($servicePeer)
   let requestId = generateRequestId(wfc.rng)
   let filterSubscribeRequest = FilterSubscribeRequest.ping(requestId)
 
+  if timeout > chronos.seconds(0):
+    let fut = wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
+    if not await fut.withTimeout(timeout):
+      # don't leak the in-flight request; report the timed-out ping as a dial failure
+      await fut.cancelAndWait()
+      return err(
+        FilterSubscribeError.parse(uint32(FilterSubscribeErrorKind.PEER_DIAL_FAILURE))
+      )
+    return fut.read()
+
   return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
 
 proc subscribe*(
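The zero default keeps existing ping call sites on the old unbounded behaviour, while callers that pass a deadline can classify an unresponsive peer without waiting on the transport. A hypothetical probe in the spirit of edgeFilterHealthLoop (not the actual loop), assuming this module's imports:

proc probeFilterPeer(
    wfc: WakuFilterClient, peer: RemotePeerInfo
): Future[bool] {.async.} =
  ## false marks the peer stale: a candidate for eviction and replacement
  let res = await wfc.ping(peer, timeout = chronos.seconds(5))
  return res.isOk()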
diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim
index 17470af29..490feae87 100644
--- a/waku/waku_relay/protocol.nim
+++ b/waku/waku_relay/protocol.nim
@@ -157,7 +157,7 @@ type
     ): Future[ValidationResult] {.gcsafe, raises: [Defect].}
 
   WakuRelay* = ref object of GossipSub
     brokerCtx: BrokerContext
-    peerEventListener: EventWakuPeerListener
+    peerEventListener: WakuPeerEventListener
    # seq of tuples: the first entry in each tuple contains the validators that are called for every topic,
    # the second entry contains the error messages to be returned when the validator fails
     wakuValidators: seq[tuple[handler: WakuValidatorHandler, errorMessage: string]]
@@ -376,9 +376,9 @@ proc new*(
   w.initProtocolHandler()
   w.initRelayObservers()
 
-  w.peerEventListener = EventWakuPeer.listen(
+  w.peerEventListener = WakuPeerEvent.listen(
     w.brokerCtx,
-    proc(evt: EventWakuPeer): Future[void] {.async: (raises: []), gcsafe.} =
+    proc(evt: WakuPeerEvent): Future[void] {.async: (raises: []), gcsafe.} =
       if evt.kind == WakuPeerEventKind.EventDisconnected:
         w.topicHealthCheckAll = true
         w.topicHealthUpdateEvent.fire()
@@ -524,8 +524,7 @@ method stop*(w: WakuRelay) {.async, base.} =
   info "stop"
   await procCall GossipSub(w).stop()
 
-  if w.peerEventListener.id != 0:
-    EventWakuPeer.dropListener(w.brokerCtx, w.peerEventListener)
+  WakuPeerEvent.dropListener(w.brokerCtx, w.peerEventListener)
 
   if not w.topicHealthLoopHandle.isNil():
     await w.topicHealthLoopHandle.cancelAndWait()
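The listener registered in new* above only fires topicHealthUpdateEvent; the expensive recheck runs in a coalescing loop, so a burst of disconnects costs one pass rather than one pass per event. The fire/wait/clear pattern in isolation (a sketch; the caller is expected to cancel the returned future on shutdown):

import chronos

proc coalescingLoop(ev: AsyncEvent, recompute: proc() {.gcsafe.}) {.async.} =
  while true:
    await ev.wait()
    ev.clear() # every fire() since wait() returned collapses into one pass
    recompute()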