feat: active filter subscription management for edge nodes (#3773)

## Subscription Manager
* edgeFilterSubLoop reconciles desired vs actual filter subscriptions (see the sketch after this list)
* edgeFilterHealthLoop pings filter peers, evicts stale ones
* EdgeFilterSubState per-shard tracking of confirmed peers and health
* Best-effort unsubscribe on peer removal
* RequestEdgeShardHealth and RequestEdgeFilterPeerCount broker providers
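
A condensed, self-contained sketch of the per-shard arithmetic behind the reconciliation and health tracking (mirroring `toTopicHealth` and the `needed` computation in the subscription_manager.nim hunks further down; `peersNeeded` is a hypothetical helper name and the `TopicHealth` enum here is a stand-in, not the real waku module):

```nim
const HealthyThreshold = 2

type TopicHealth = enum
  UNHEALTHY, MINIMALLY_HEALTHY, SUFFICIENTLY_HEALTHY

func toTopicHealth(peersCount: int): TopicHealth =
  # Health is derived purely from the number of confirmed filter peers.
  if peersCount >= HealthyThreshold: SUFFICIENTLY_HEALTHY
  elif peersCount > 0: MINIMALLY_HEALTHY
  else: UNHEALTHY

func peersNeeded(confirmed, pending: int): int =
  # How many extra filter service peers the sub loop should dial.
  max(0, HealthyThreshold - confirmed - pending)

assert toTopicHealth(2) == SUFFICIENTLY_HEALTHY
assert peersNeeded(confirmed = 1, pending = 0) == 1  # dial one replacement
```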

## WakuNode
* Remove old edge health loop (loopEdgeHealth, edgeHealthEvent, calculateEdgeTopicHealth)
* Register MessageSeenEvent push handler on filter client during start
* startDeliveryService now returns `Result[void, string]` and propagates errors
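
A minimal sketch of the new call-site pattern (mirroring the `startWaku` hunk below; `startWakuSketch` and the stub body are stand-ins, `isOkOr` comes from the `results` package):

```nim
import results

proc startDeliveryService(): Result[void, string] =
  # Stand-in for the real start sequence (subscription manager,
  # recv service, send service).
  return ok()

proc startWakuSketch(): Result[void, string] =
  # Previously the returned Result was discarded; now a failure aborts startup.
  startDeliveryService().isOkOr:
    return err("failed to start delivery service: " & $error)
  return ok()

assert startWakuSketch().isOk()
```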

## Health Monitor
* getFilterClientHealth queries RequestEdgeFilterPeerCount via broker
* Shard/content health providers fall back to RequestEdgeShardHealth when relay is inactive (see the resolution-order sketch after this list)
* Listen to EventShardTopicHealthChange for health recalculation
* Add missing return p.notReady() on failed edge filter peer count request
* HealthyThreshold constant moved to `connection_status.nim`
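
A toy model of the resolution order those providers implement (condensed from the waku_node.nim hunk at the end of this diff; `resolveShardHealth` is a hypothetical name and the enum is a stand-in for the real `TopicHealth`):

```nim
type TopicHealth = enum
  NOT_SUBSCRIBED, UNHEALTHY, MINIMALLY_HEALTHY, SUFFICIENTLY_HEALTHY

proc resolveShardHealth(relayActive, relaySubscribed: bool,
                        relayHealth, edgeHealth: TopicHealth): TopicHealth =
  # 1. Prefer relay's mesh-derived health when relay has computed one.
  if relayActive and relayHealth != NOT_SUBSCRIBED:
    return relayHealth
  # 2. Relay is subscribed but its health is not computed yet: UNHEALTHY.
  if relayActive and relaySubscribed:
    return UNHEALTHY
  # 3. Otherwise fall back to RequestEdgeShardHealth via the broker.
  edgeHealth

assert resolveShardHealth(false, false, NOT_SUBSCRIBED, MINIMALLY_HEALTHY) == MINIMALLY_HEALTHY
```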

## Broker types
* RequestEdgeShardHealth, RequestEdgeFilterPeerCount request types
* EventShardTopicHealthChange event type
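
Consumer-side wiring, excerpted from the node_health_monitor.nim hunk below (not self-contained):

```nim
hm.shardHealthListener = EventShardTopicHealthChange.listen(
  hm.node.brokerCtx,
  proc(
    evt: EventShardTopicHealthChange
  ): Future[void] {.async: (raises: []), gcsafe.} =
    hm.healthUpdateEvent.fire(),
).valueOr:
  return err("Failed to subscribe to shard health events: " & error)
```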

## Filter Client
* Add timeout parameter to ping proc
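
An excerpt showing the timeout in use by `edgeFilterHealthLoop` (from the subscription_manager.nim hunk below, where `EdgeFilterPingTimeout` is 5 seconds):

```nim
for peer in connected.values:
  pingTasks.add(
    (peer.peerId, self.node.wakuFilterClient.ping(peer, EdgeFilterPingTimeout))
  )
# Await all pings, then keep only the peers that answered.
await allFutures(pingTasks.mapIt(it[1]))
for (peerId, task) in pingTasks:
  if task.read().isOk():
    alive.incl(peerId)
```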

## Tests
* Health monitor event tests with per-node lockNewGlobalBrokerContext
* Edge (light client) health update test
* Edge health driven by confirmed filter subscriptions test
* API subscription tests: sub/receive, failover, peer replacement
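
The edge tests gate publishing on a confirmed filter subscription via a small polling helper, reproduced from the test diff below:

```nim
proc waitForEdgeSubs(w: Waku, shard: PubsubTopic) {.async.} =
  let sm = w.deliveryService.subscriptionManager
  for _ in 0 ..< 50:
    if sm.edgeFilterPeerCount(shard) > 0:
      return
    await sleepAsync(100.milliseconds)
  raise newException(ValueError, "Edge filter subscription failed on " & shard)
```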

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
Co-authored-by: Zoltan Nagy


@ -1,6 +1,6 @@
{.used.}
import std/[strutils, net, options, sets]
import std/[strutils, sequtils, net, options, sets, tables]
import chronos, testutils/unittests, stew/byteutils
import libp2p/[peerid, peerinfo, multiaddress, crypto/crypto]
import ../testlib/[common, wakucore, wakunode, testasync]
@ -13,12 +13,12 @@ import
common/broker/broker_context,
events/message_events,
waku_relay/protocol,
node/kernel_api/filter,
node/delivery_service/subscription_manager,
]
import waku/factory/waku_conf
import tools/confutils/cli_args
# TODO: Edge testing (after MAPI edge support is completed)
const TestTimeout = chronos.seconds(10)
const NegativeTestTimeout = chronos.seconds(2)
@ -60,8 +60,10 @@ proc waitForEvents(
return await manager.receivedEvent.wait().withTimeout(timeout)
type TestNetwork = ref object
publisher: WakuNode
publisher: WakuNode # Relay node that publishes messages in tests.
meshBuddy: WakuNode # Extra relay peer for publisher's mesh (Edge tests only).
subscriber: Waku
# The receiver node in tests. Edge node in edge tests, Core node in relay tests.
publisherPeerInfo: RemotePeerInfo
proc createApiNodeConf(
@ -94,8 +96,12 @@ proc setupNetwork(
lockNewGlobalBrokerContext:
net.publisher =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
net.publisher.mountMetadata(3, @[0'u16]).expect("Failed to mount metadata")
net.publisher.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect(
"Failed to mount metadata"
)
(await net.publisher.mountRelay()).expect("Failed to mount relay")
if mode == cli_args.WakuMode.Edge:
await net.publisher.mountFilter()
await net.publisher.mountLibp2pPing()
await net.publisher.start()
@ -104,16 +110,32 @@ proc setupNetwork(
proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
discard
# Subscribe the publisher to all shards to guarantee a GossipSub mesh with the subscriber.
# Core/Relay nodes currently auto-subscribe to all network shards on boot, but if that
# ever changes, this keeps the publisher interested in every shard the subscriber may
# use, which is what waitForMesh relies on.
var shards: seq[PubsubTopic]
for i in 0 ..< numShards.int:
let shard = PubsubTopic("/waku/2/rs/3/" & $i)
shards.add(PubsubTopic("/waku/2/rs/3/" & $i))
for shard in shards:
net.publisher.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect(
"Failed to sub publisher"
)
if mode == cli_args.WakuMode.Edge:
lockNewGlobalBrokerContext:
net.meshBuddy =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
net.meshBuddy.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect(
"Failed to mount metadata on meshBuddy"
)
(await net.meshBuddy.mountRelay()).expect("Failed to mount relay on meshBuddy")
await net.meshBuddy.start()
for shard in shards:
net.meshBuddy.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect(
"Failed to sub meshBuddy"
)
await net.meshBuddy.connectToNodes(@[net.publisherPeerInfo])
net.subscriber = await setupSubscriberNode(createApiNodeConf(mode, numShards))
await net.subscriber.node.connectToNodes(@[net.publisherPeerInfo])
@ -125,6 +147,10 @@ proc teardown(net: TestNetwork) {.async.} =
(await net.subscriber.stop()).expect("Failed to stop subscriber node")
net.subscriber = nil
if not isNil(net.meshBuddy):
await net.meshBuddy.stop()
net.meshBuddy = nil
if not isNil(net.publisher):
await net.publisher.stop()
net.publisher = nil
@ -141,18 +167,34 @@ proc waitForMesh(node: WakuNode, shard: PubsubTopic) {.async.} =
await sleepAsync(100.milliseconds)
raise newException(ValueError, "GossipSub Mesh failed to stabilize on " & shard)
proc waitForEdgeSubs(w: Waku, shard: PubsubTopic) {.async.} =
let sm = w.deliveryService.subscriptionManager
for _ in 0 ..< 50:
if sm.edgeFilterPeerCount(shard) > 0:
return
await sleepAsync(100.milliseconds)
raise newException(ValueError, "Edge filter subscription failed on " & shard)
proc publishToMesh(
net: TestNetwork, contentTopic: ContentTopic, payload: seq[byte]
): Future[Result[int, string]] {.async.} =
# Publishes a message from "publisher" via relay into the gossipsub mesh.
let shard = net.subscriber.node.getRelayShard(contentTopic)
await waitForMesh(net.publisher, shard)
let msg = WakuMessage(
payload: payload, contentTopic: contentTopic, version: 0, timestamp: now()
)
return await net.publisher.publish(some(shard), msg)
proc publishToMeshAfterEdgeReady(
net: TestNetwork, contentTopic: ContentTopic, payload: seq[byte]
): Future[Result[int, string]] {.async.} =
# First, ensure "subscriber" node (an edge node) is subscribed and ready to receive.
# Afterwards, "publisher" (relay node) sends the message in the gossipsub network.
let shard = net.subscriber.node.getRelayShard(contentTopic)
await waitForEdgeSubs(net.subscriber, shard)
return await net.publishToMesh(contentTopic, payload)
suite "Messaging API, SubscriptionManager":
asyncTest "Subscription API, relay node auto subscribe and receive message":
let net = await setupNetwork(1)
@ -398,3 +440,370 @@ suite "Messaging API, SubscriptionManager":
activeSubs.add(t)
await verifyNetworkState(activeSubs)
asyncTest "Subscription API, edge node subscribe and receive message":
let net = await setupNetwork(1, cli_args.WakuMode.Edge)
defer:
await net.teardown()
let testTopic = ContentTopic("/waku/2/test-content/proto")
(await net.subscriber.subscribe(testTopic)).expect("failed to subscribe")
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
defer:
eventManager.teardown()
discard (await net.publishToMeshAfterEdgeReady(testTopic, "Hello, edge!".toBytes())).expect(
"Publish failed"
)
require await eventManager.waitForEvents(TestTimeout)
require eventManager.receivedMessages.len == 1
check eventManager.receivedMessages[0].contentTopic == testTopic
asyncTest "Subscription API, edge node ignores unsubscribed content topics":
let net = await setupNetwork(1, cli_args.WakuMode.Edge)
defer:
await net.teardown()
let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto")
let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto")
(await net.subscriber.subscribe(subbedTopic)).expect("failed to subscribe")
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
defer:
eventManager.teardown()
discard (await net.publishToMesh(ignoredTopic, "Ghost Msg".toBytes())).expect(
"Publish failed"
)
check not await eventManager.waitForEvents(NegativeTestTimeout)
check eventManager.receivedMessages.len == 0
asyncTest "Subscription API, edge node unsubscribe stops message receipt":
let net = await setupNetwork(1, cli_args.WakuMode.Edge)
defer:
await net.teardown()
let testTopic = ContentTopic("/waku/2/unsub-test/proto")
(await net.subscriber.subscribe(testTopic)).expect("failed to subscribe")
net.subscriber.unsubscribe(testTopic).expect("failed to unsubscribe")
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
defer:
eventManager.teardown()
discard (await net.publishToMesh(testTopic, "Should be dropped".toBytes())).expect(
"Publish failed"
)
check not await eventManager.waitForEvents(NegativeTestTimeout)
check eventManager.receivedMessages.len == 0
asyncTest "Subscription API, edge node overlapping topics isolation":
let net = await setupNetwork(1, cli_args.WakuMode.Edge)
defer:
await net.teardown()
let topicA = ContentTopic("/waku/2/topic-a/proto")
let topicB = ContentTopic("/waku/2/topic-b/proto")
(await net.subscriber.subscribe(topicA)).expect("failed to sub A")
(await net.subscriber.subscribe(topicB)).expect("failed to sub B")
let shard = net.subscriber.node.getRelayShard(topicA)
await waitForEdgeSubs(net.subscriber, shard)
let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
defer:
eventManager.teardown()
net.subscriber.unsubscribe(topicA).expect("failed to unsub A")
discard (await net.publishToMesh(topicA, "Dropped Message".toBytes())).expect(
"Publish A failed"
)
discard
(await net.publishToMesh(topicB, "Kept Msg".toBytes())).expect("Publish B failed")
require await eventManager.waitForEvents(TestTimeout)
require eventManager.receivedMessages.len == 1
check eventManager.receivedMessages[0].contentTopic == topicB
asyncTest "Subscription API, edge node resubscribe after unsubscribe":
let net = await setupNetwork(1, cli_args.WakuMode.Edge)
defer:
await net.teardown()
let testTopic = ContentTopic("/waku/2/resub-test/proto")
(await net.subscriber.subscribe(testTopic)).expect("Initial sub failed")
var eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
discard (await net.publishToMeshAfterEdgeReady(testTopic, "Msg 1".toBytes())).expect(
"Pub 1 failed"
)
require await eventManager.waitForEvents(TestTimeout)
eventManager.teardown()
net.subscriber.unsubscribe(testTopic).expect("Unsub failed")
eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
discard
(await net.publishToMesh(testTopic, "Ghost".toBytes())).expect("Ghost pub failed")
check not await eventManager.waitForEvents(NegativeTestTimeout)
eventManager.teardown()
(await net.subscriber.subscribe(testTopic)).expect("Resub failed")
eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1)
discard (await net.publishToMeshAfterEdgeReady(testTopic, "Msg 2".toBytes())).expect(
"Pub 2 failed"
)
require await eventManager.waitForEvents(TestTimeout)
check eventManager.receivedMessages[0].payload == "Msg 2".toBytes()
asyncTest "Subscription API, edge node failover after service peer dies":
# NOTE: This test is a bit more verbose because it defines a custom topology.
# It doesn't use the shared TestNetwork helper.
# This mounts two service peers for the edge node then fails one.
let numShards: uint16 = 1
let shards = @[PubsubTopic("/waku/2/rs/3/0")]
proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
discard
var publisher: WakuNode
lockNewGlobalBrokerContext:
publisher =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
publisher.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect(
"Failed to mount metadata on publisher"
)
(await publisher.mountRelay()).expect("Failed to mount relay on publisher")
await publisher.mountFilter()
await publisher.mountLibp2pPing()
await publisher.start()
for shard in shards:
publisher.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect(
"Failed to sub publisher"
)
let publisherPeerInfo = publisher.peerInfo.toRemotePeerInfo()
var meshBuddy: WakuNode
lockNewGlobalBrokerContext:
meshBuddy =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
meshBuddy.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect(
"Failed to mount metadata on meshBuddy"
)
(await meshBuddy.mountRelay()).expect("Failed to mount relay on meshBuddy")
await meshBuddy.mountFilter()
await meshBuddy.mountLibp2pPing()
await meshBuddy.start()
for shard in shards:
meshBuddy.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect(
"Failed to sub meshBuddy"
)
let meshBuddyPeerInfo = meshBuddy.peerInfo.toRemotePeerInfo()
await meshBuddy.connectToNodes(@[publisherPeerInfo])
let conf = createApiNodeConf(cli_args.WakuMode.Edge, numShards)
var subscriber: Waku
lockNewGlobalBrokerContext:
subscriber = (await createNode(conf)).expect("Failed to create edge subscriber")
(await startWaku(addr subscriber)).expect("Failed to start edge subscriber")
# Connect edge subscriber to both filter servers so selectPeers finds both
await subscriber.node.connectToNodes(@[publisherPeerInfo, meshBuddyPeerInfo])
let testTopic = ContentTopic("/waku/2/failover-test/proto")
let shard = subscriber.node.getRelayShard(testTopic)
(await subscriber.subscribe(testTopic)).expect("Failed to subscribe")
# Wait for dialing both filter servers (HealthyThreshold = 2)
for _ in 0 ..< 100:
if subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2:
break
await sleepAsync(100.milliseconds)
check subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2
# Verify message delivery with both servers alive
await waitForMesh(publisher, shard)
var eventManager = newReceiveEventListenerManager(subscriber.brokerCtx, 1)
let msg1 = WakuMessage(
payload: "Before failover".toBytes(),
contentTopic: testTopic,
version: 0,
timestamp: now(),
)
discard (await publisher.publish(some(shard), msg1)).expect("Publish 1 failed")
require await eventManager.waitForEvents(TestTimeout)
check eventManager.receivedMessages[0].payload == "Before failover".toBytes()
eventManager.teardown()
# Disconnect meshBuddy from edge (keeps relay mesh alive for publishing)
await subscriber.node.disconnectNode(meshBuddyPeerInfo)
# Wait for the dead peer to be pruned
for _ in 0 ..< 50:
if subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) < 2:
break
await sleepAsync(100.milliseconds)
check subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 1
# Verify messages still arrive through the surviving filter server (publisher)
eventManager = newReceiveEventListenerManager(subscriber.brokerCtx, 1)
let msg2 = WakuMessage(
payload: "After failover".toBytes(),
contentTopic: testTopic,
version: 0,
timestamp: now(),
)
discard (await publisher.publish(some(shard), msg2)).expect("Publish 2 failed")
require await eventManager.waitForEvents(TestTimeout)
check eventManager.receivedMessages[0].payload == "After failover".toBytes()
eventManager.teardown()
(await subscriber.stop()).expect("Failed to stop subscriber")
await meshBuddy.stop()
await publisher.stop()
asyncTest "Subscription API, edge node dials replacement after peer eviction":
# 3 service peers: publisher, meshBuddy, sparePeer. Edge subscribes and
# confirms 2 (HealthyThreshold). After one is disconnected, the sub loop
# should detect the loss and dial the spare to recover back to threshold.
let numShards: uint16 = 1
let shards = @[PubsubTopic("/waku/2/rs/3/0")]
proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
discard
var publisher: WakuNode
lockNewGlobalBrokerContext:
publisher =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
publisher.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect(
"Failed to mount metadata on publisher"
)
(await publisher.mountRelay()).expect("Failed to mount relay on publisher")
await publisher.mountFilter()
await publisher.mountLibp2pPing()
await publisher.start()
for shard in shards:
publisher.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect(
"Failed to sub publisher"
)
let publisherPeerInfo = publisher.peerInfo.toRemotePeerInfo()
var meshBuddy: WakuNode
lockNewGlobalBrokerContext:
meshBuddy =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
meshBuddy.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect(
"Failed to mount metadata on meshBuddy"
)
(await meshBuddy.mountRelay()).expect("Failed to mount relay on meshBuddy")
await meshBuddy.mountFilter()
await meshBuddy.mountLibp2pPing()
await meshBuddy.start()
for shard in shards:
meshBuddy.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect(
"Failed to sub meshBuddy"
)
let meshBuddyPeerInfo = meshBuddy.peerInfo.toRemotePeerInfo()
var sparePeer: WakuNode
lockNewGlobalBrokerContext:
sparePeer =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
sparePeer.mountMetadata(3, toSeq(0'u16 ..< numShards)).expect(
"Failed to mount metadata on sparePeer"
)
(await sparePeer.mountRelay()).expect("Failed to mount relay on sparePeer")
await sparePeer.mountFilter()
await sparePeer.mountLibp2pPing()
await sparePeer.start()
for shard in shards:
sparePeer.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect(
"Failed to sub sparePeer"
)
let sparePeerInfo = sparePeer.peerInfo.toRemotePeerInfo()
await meshBuddy.connectToNodes(@[publisherPeerInfo])
await sparePeer.connectToNodes(@[publisherPeerInfo])
let conf = createApiNodeConf(cli_args.WakuMode.Edge, numShards)
var subscriber: Waku
lockNewGlobalBrokerContext:
subscriber = (await createNode(conf)).expect("Failed to create edge subscriber")
(await startWaku(addr subscriber)).expect("Failed to start edge subscriber")
await subscriber.node.connectToNodes(
@[publisherPeerInfo, meshBuddyPeerInfo, sparePeerInfo]
)
let testTopic = ContentTopic("/waku/2/replacement-test/proto")
let shard = subscriber.node.getRelayShard(testTopic)
(await subscriber.subscribe(testTopic)).expect("Failed to subscribe")
# Wait for 2 confirmed peers (HealthyThreshold). The 3rd is available but not dialed.
for _ in 0 ..< 100:
if subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2:
break
await sleepAsync(100.milliseconds)
require subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) ==
2
await subscriber.node.disconnectNode(meshBuddyPeerInfo)
# Wait for the sub loop to detect the loss and dial a replacement
for _ in 0 ..< 100:
if subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2:
break
await sleepAsync(100.milliseconds)
check subscriber.deliveryService.subscriptionManager.edgeFilterPeerCount(shard) >= 2
await waitForMesh(publisher, shard)
var eventManager = newReceiveEventListenerManager(subscriber.brokerCtx, 1)
let msg = WakuMessage(
payload: "After replacement".toBytes(),
contentTopic: testTopic,
version: 0,
timestamp: now(),
)
discard (await publisher.publish(some(shard), msg)).expect("Publish failed")
require await eventManager.waitForEvents(TestTimeout)
check eventManager.receivedMessages[0].payload == "After replacement".toBytes()
eventManager.teardown()
(await subscriber.stop()).expect("Failed to stop subscriber")
await sparePeer.stop()
await meshBuddy.stop()
await publisher.stop()


@ -12,12 +12,18 @@ import
node/health_monitor/health_status,
node/health_monitor/connection_status,
node/health_monitor/protocol_health,
node/health_monitor/topic_health,
node/health_monitor/node_health_monitor,
node/delivery_service/delivery_service,
node/delivery_service/subscription_manager,
node/kernel_api/relay,
node/kernel_api/store,
node/kernel_api/lightpush,
node/kernel_api/filter,
events/health_events,
events/peer_events,
waku_archive,
common/broker/broker_context,
]
import ../testlib/[wakunode, wakucore], ../waku_archive/archive_utils
@ -129,13 +135,12 @@ suite "Health Monitor - health state calculation":
suite "Health Monitor - events":
asyncTest "Core (relay) health update":
let
nodeAKey = generateSecp256k1Key()
var nodeA: WakuNode
lockNewGlobalBrokerContext:
let nodeAKey = generateSecp256k1Key()
nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0))
(await nodeA.mountRelay()).expect("Node A failed to mount Relay")
await nodeA.start()
(await nodeA.mountRelay()).expect("Node A failed to mount Relay")
await nodeA.start()
let monitorA = NodeHealthMonitor.new(nodeA)
@ -151,17 +156,15 @@ suite "Health Monitor - events":
monitorA.startHealthMonitor().expect("Health monitor failed to start")
let
nodeBKey = generateSecp256k1Key()
var nodeB: WakuNode
lockNewGlobalBrokerContext:
let nodeBKey = generateSecp256k1Key()
nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0))
let driver = newSqliteArchiveDriver()
nodeB.mountArchive(driver).expect("Node B failed to mount archive")
(await nodeB.mountRelay()).expect("Node B failed to mount relay")
await nodeB.mountStore()
await nodeB.start()
let driver = newSqliteArchiveDriver()
nodeB.mountArchive(driver).expect("Node B failed to mount archive")
(await nodeB.mountRelay()).expect("Node B failed to mount relay")
await nodeB.mountStore()
await nodeB.start()
await nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()])
@ -214,15 +217,20 @@ suite "Health Monitor - events":
await nodeA.stop()
asyncTest "Edge (light client) health update":
let
nodeAKey = generateSecp256k1Key()
var nodeA: WakuNode
lockNewGlobalBrokerContext:
let nodeAKey = generateSecp256k1Key()
nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0))
nodeA.mountLightpushClient()
await nodeA.mountFilterClient()
nodeA.mountStoreClient()
require nodeA.mountAutoSharding(1, 8).isOk
nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata")
await nodeA.start()
nodeA.mountLightpushClient()
await nodeA.mountFilterClient()
nodeA.mountStoreClient()
await nodeA.start()
let ds =
DeliveryService.new(false, nodeA).expect("Failed to create DeliveryService")
ds.startDeliveryService().expect("Failed to start DeliveryService")
let monitorA = NodeHealthMonitor.new(nodeA)
@ -238,23 +246,40 @@ suite "Health Monitor - events":
monitorA.startHealthMonitor().expect("Health monitor failed to start")
let
nodeBKey = generateSecp256k1Key()
var nodeB: WakuNode
lockNewGlobalBrokerContext:
let nodeBKey = generateSecp256k1Key()
nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0))
let driver = newSqliteArchiveDriver()
nodeB.mountArchive(driver).expect("Node B failed to mount archive")
(await nodeB.mountRelay()).expect("Node B failed to mount relay")
(await nodeB.mountLightpush()).expect("Node B failed to mount lightpush")
await nodeB.mountFilter()
await nodeB.mountStore()
require nodeB.mountAutoSharding(1, 8).isOk
nodeB.mountMetadata(1, toSeq(0'u16 ..< 8'u16)).expect(
"Node B failed to mount metadata"
)
await nodeB.start()
let driver = newSqliteArchiveDriver()
nodeB.mountArchive(driver).expect("Node B failed to mount archive")
(await nodeB.mountRelay()).expect("Node B failed to mount relay")
(await nodeB.mountLightpush()).expect("Node B failed to mount lightpush")
await nodeB.mountFilter()
await nodeB.mountStore()
await nodeB.start()
var metadataFut = newFuture[void]("waitForMetadata")
let metadataLis = WakuPeerEvent
.listen(
nodeA.brokerCtx,
proc(evt: WakuPeerEvent): Future[void] {.async: (raises: []), gcsafe.} =
if not metadataFut.finished and
evt.kind == WakuPeerEventKind.EventMetadataUpdated:
metadataFut.complete()
,
)
.expect("Failed to listen for metadata")
await nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()])
let metadataOk = await metadataFut.withTimeout(TestConnectivityTimeLimit)
WakuPeerEvent.dropListener(nodeA.brokerCtx, metadataLis)
require metadataOk
let connectTimeLimit = Moment.now() + TestConnectivityTimeLimit
var gotConnected = false
@ -292,4 +317,118 @@ suite "Health Monitor - events":
lastStatus == ConnectionStatus.Disconnected
await monitorA.stopHealthMonitor()
await ds.stopDeliveryService()
await nodeA.stop()
asyncTest "Edge health driven by confirmed filter subscriptions":
var nodeA: WakuNode
lockNewGlobalBrokerContext:
let nodeAKey = generateSecp256k1Key()
nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0))
await nodeA.mountFilterClient()
nodeA.mountLightpushClient()
nodeA.mountStoreClient()
require nodeA.mountAutoSharding(1, 8).isOk
nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata")
await nodeA.start()
let ds =
DeliveryService.new(false, nodeA).expect("Failed to create DeliveryService")
ds.startDeliveryService().expect("Failed to start DeliveryService")
let subMgr = ds.subscriptionManager
var nodeB: WakuNode
lockNewGlobalBrokerContext:
let nodeBKey = generateSecp256k1Key()
nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0))
let driver = newSqliteArchiveDriver()
nodeB.mountArchive(driver).expect("Node B failed to mount archive")
(await nodeB.mountRelay()).expect("Node B failed to mount relay")
(await nodeB.mountLightpush()).expect("Node B failed to mount lightpush")
await nodeB.mountFilter()
await nodeB.mountStore()
require nodeB.mountAutoSharding(1, 8).isOk
nodeB.mountMetadata(1, toSeq(0'u16 ..< 8'u16)).expect(
"Node B failed to mount metadata"
)
await nodeB.start()
let monitorA = NodeHealthMonitor.new(nodeA)
var
lastStatus = ConnectionStatus.Disconnected
healthSignal = newAsyncEvent()
monitorA.onConnectionStatusChange = proc(status: ConnectionStatus) {.async.} =
lastStatus = status
healthSignal.fire()
monitorA.startHealthMonitor().expect("Health monitor failed to start")
var metadataFut = newFuture[void]("waitForMetadata")
let metadataLis = WakuPeerEvent
.listen(
nodeA.brokerCtx,
proc(evt: WakuPeerEvent): Future[void] {.async: (raises: []), gcsafe.} =
if not metadataFut.finished and
evt.kind == WakuPeerEventKind.EventMetadataUpdated:
metadataFut.complete()
,
)
.expect("Failed to listen for metadata")
await nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()])
let metadataOk = await metadataFut.withTimeout(TestConnectivityTimeLimit)
WakuPeerEvent.dropListener(nodeA.brokerCtx, metadataLis)
require metadataOk
var deadline = Moment.now() + TestConnectivityTimeLimit
while Moment.now() < deadline:
if lastStatus == ConnectionStatus.PartiallyConnected:
break
if await healthSignal.wait().withTimeout(deadline - Moment.now()):
healthSignal.clear()
check lastStatus == ConnectionStatus.PartiallyConnected
var shardHealthFut = newFuture[EventShardTopicHealthChange]("waitForShardHealth")
let shardHealthLis = EventShardTopicHealthChange
.listen(
nodeA.brokerCtx,
proc(
evt: EventShardTopicHealthChange
): Future[void] {.async: (raises: []), gcsafe.} =
if not shardHealthFut.finished and (
evt.health == TopicHealth.MINIMALLY_HEALTHY or
evt.health == TopicHealth.SUFFICIENTLY_HEALTHY
):
shardHealthFut.complete(evt)
,
)
.expect("Failed to listen for shard health")
let contentTopic = ContentTopic("/waku/2/default-content/proto")
subMgr.subscribe(contentTopic).expect("Failed to subscribe")
let shardHealthOk = await shardHealthFut.withTimeout(TestConnectivityTimeLimit)
EventShardTopicHealthChange.dropListener(nodeA.brokerCtx, shardHealthLis)
check shardHealthOk == true
check subMgr.edgeFilterSubStates.len > 0
healthSignal.clear()
deadline = Moment.now() + TestConnectivityTimeLimit
while Moment.now() < deadline:
if lastStatus == ConnectionStatus.PartiallyConnected:
break
if await healthSignal.wait().withTimeout(deadline - Moment.now()):
healthSignal.clear()
check lastStatus == ConnectionStatus.PartiallyConnected
await ds.stopDeliveryService()
await monitorA.stopHealthMonitor()
await nodeB.stop()
await nodeA.stop()


@ -8,6 +8,6 @@ type WakuPeerEventKind* {.pure.} = enum
EventMetadataUpdated
EventBroker:
type EventWakuPeer* = object
type WakuPeerEvent* = object
peerId*: PeerId
kind*: WakuPeerEventKind


@ -416,7 +416,8 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
## Reliability
if not waku[].deliveryService.isNil():
waku[].deliveryService.startDeliveryService()
waku[].deliveryService.startDeliveryService().isOkOr:
return err("failed to start delivery service: " & $error)
## Health Monitor
waku[].healthMonitor.startHealthMonitor().isOkOr:


@ -1,18 +1,13 @@
## This module helps to ensure the correct transmission and reception of messages
import results
import chronos
import chronos, chronicles
import
./recv_service,
./send_service,
./subscription_manager,
waku/[
waku_core,
waku_node,
waku_store/client,
waku_relay/protocol,
waku_lightpush/client,
waku_filter_v2/client,
waku_core, waku_node, waku_store/client, waku_relay/protocol, waku_lightpush/client
]
type DeliveryService* = ref object
@ -37,10 +32,11 @@ proc new*(
)
)
proc startDeliveryService*(self: DeliveryService) =
self.subscriptionManager.startSubscriptionManager()
proc startDeliveryService*(self: DeliveryService): Result[void, string] =
?self.subscriptionManager.startSubscriptionManager()
self.recvService.startRecvService()
self.sendService.startSendService()
return ok()
proc stopDeliveryService*(self: DeliveryService) {.async.} =
await self.sendService.stopSendService()


@ -91,20 +91,20 @@ proc msgChecker(self: RecvService) {.async.} =
self.endTimeToCheck = getNowInNanosecondTime()
var msgHashesInStore = newSeq[WakuMessageHash](0)
for sub in self.subscriptionManager.getActiveSubscriptions():
for pubsubTopic, contentTopics in self.subscriptionManager.subscribedTopics:
let storeResp: StoreQueryResponse = (
await self.node.wakuStoreClient.queryToAny(
StoreQueryRequest(
includeData: false,
pubsubTopic: some(PubsubTopic(sub.pubsubTopic)),
contentTopics: sub.contentTopics,
pubsubTopic: some(pubsubTopic),
contentTopics: toSeq(contentTopics),
startTime: some(self.startTimeToCheck - DelayExtra.nanos),
endTime: some(self.endTimeToCheck + DelayExtra.nanos),
)
)
).valueOr:
error "msgChecker failed to get remote msgHashes",
pubsubTopic = sub.pubsubTopic, cTopics = sub.contentTopics, error = $error
pubsubTopic = pubsubTopic, cTopics = toSeq(contentTopics), error = $error
continue
msgHashesInStore.add(storeResp.messages.mapIt(it.messageHash))
@ -154,10 +154,6 @@ proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionManager): T =
recentReceivedMsgs: @[],
)
# TODO: For MAPI Edge support, either call node.wakuFilterClient.registerPushHandler
# so that the RecvService listens to incoming filter messages,
# or have the filter client emit MessageSeenEvent.
return recvService
proc loopPruneOldMessages(self: RecvService) {.async.} =


@ -1,4 +1,5 @@
import std/[sets, tables, options, strutils], chronos, chronicles, results
import std/[sequtils, sets, tables, options, strutils], chronos, chronicles, results
import libp2p/[peerid, peerinfo]
import
waku/[
waku_core,
@ -6,16 +7,67 @@ import
waku_core/topics/sharding,
waku_node,
waku_relay,
waku_filter_v2/common as filter_common,
waku_filter_v2/client as filter_client,
waku_filter_v2/protocol as filter_protocol,
common/broker/broker_context,
events/delivery_events,
events/health_events,
events/peer_events,
requests/health_requests,
node/peer_manager,
node/health_monitor/topic_health,
node/health_monitor/connection_status,
]
# ---------------------------------------------------------------------------
# Logos Messaging API SubscriptionManager
#
# Tracks all topic subscription intent and centralizes consistency
# maintenance of the pubsub and content topic subscription model across
# the network drivers that handle topics (Edge/Filter and Core/Relay).
# ---------------------------------------------------------------------------
type EdgeFilterSubState* = object
peers: seq[RemotePeerInfo]
## Filter service peers with confirmed subscriptions on this shard.
pending: seq[Future[void]] ## In-flight dial futures for peers not yet confirmed.
pendingPeers: HashSet[PeerId] ## PeerIds of peers currently being dialed.
currentHealth: TopicHealth
## Cached health derived from peers.len; updated on every peer set change.
func toTopicHealth*(peersCount: int): TopicHealth =
if peersCount >= HealthyThreshold:
TopicHealth.SUFFICIENTLY_HEALTHY
elif peersCount > 0:
TopicHealth.MINIMALLY_HEALTHY
else:
TopicHealth.UNHEALTHY
type SubscriptionManager* = ref object of RootObj
node: WakuNode
contentTopicSubs: Table[PubsubTopic, HashSet[ContentTopic]]
## Map of Shard to ContentTopic needed because e.g. WakuRelay is PubsubTopic only.
## A present key with an empty HashSet value means pubsubtopic already subscribed
## (via subscribePubsubTopics()) but there's no specific content topic interest yet.
edgeFilterSubStates*: Table[PubsubTopic, EdgeFilterSubState]
## Per-shard filter subscription state for edge mode.
edgeFilterWakeup: AsyncEvent
## Signalled when the edge filter sub loop should re-reconcile.
edgeFilterSubLoopFut: Future[void]
edgeFilterHealthLoopFut: Future[void]
peerEventListener: WakuPeerEventListener
## Listener for peer connect/disconnect events (edge filter wakeup).
iterator subscribedTopics*(
self: SubscriptionManager
): (PubsubTopic, HashSet[ContentTopic]) =
for pubsub, topics in self.contentTopicSubs.pairs:
yield (pubsub, topics)
proc edgeFilterPeerCount*(sm: SubscriptionManager, shard: PubsubTopic): int =
sm.edgeFilterSubStates.withValue(shard, state):
return state.peers.len
return 0
proc new*(T: typedesc[SubscriptionManager], node: WakuNode): T =
SubscriptionManager(
@ -25,30 +77,35 @@ proc new*(T: typedesc[SubscriptionManager], node: WakuNode): T =
proc addContentTopicInterest(
self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic
): Result[void, string] =
var changed = false
if not self.contentTopicSubs.hasKey(shard):
self.contentTopicSubs[shard] = initHashSet[ContentTopic]()
changed = true
self.contentTopicSubs.withValue(shard, cTopics):
if not cTopics[].contains(topic):
cTopics[].incl(topic)
changed = true
# TODO: Call a "subscribe(shard, topic)" on filter client here,
# so the filter client can know that subscriptions changed.
if changed and not isNil(self.edgeFilterWakeup):
self.edgeFilterWakeup.fire()
return ok()
proc removeContentTopicInterest(
self: SubscriptionManager, shard: PubsubTopic, topic: ContentTopic
): Result[void, string] =
var changed = false
self.contentTopicSubs.withValue(shard, cTopics):
if cTopics[].contains(topic):
cTopics[].excl(topic)
changed = true
if cTopics[].len == 0 and isNil(self.node.wakuRelay):
self.contentTopicSubs.del(shard) # We're done with cTopics here
# TODO: Call a "unsubscribe(shard, topic)" on filter client here,
# so the filter client can know that subscriptions changed.
if changed and not isNil(self.edgeFilterWakeup):
self.edgeFilterWakeup.fire()
return ok()
@ -73,46 +130,6 @@ proc subscribePubsubTopics(
return ok()
proc startSubscriptionManager*(self: SubscriptionManager) =
if isNil(self.node.wakuRelay):
return
if self.node.wakuAutoSharding.isSome():
# Subscribe relay to all shards in autosharding.
let autoSharding = self.node.wakuAutoSharding.get()
let clusterId = autoSharding.clusterId
let numShards = autoSharding.shardCountGenZero
if numShards > 0:
var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards)
for i in 0 ..< numShards:
let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i))
clusterPubsubTopics.add(PubsubTopic($shardObj))
self.subscribePubsubTopics(clusterPubsubTopics).isOkOr:
error "Failed to auto-subscribe Relay to cluster shards: ", error = error
else:
info "SubscriptionManager has no AutoSharding configured; skipping auto-subscribe."
proc stopSubscriptionManager*(self: SubscriptionManager) {.async.} =
discard
proc getActiveSubscriptions*(
self: SubscriptionManager
): seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] =
var activeSubs: seq[tuple[pubsubTopic: string, contentTopics: seq[ContentTopic]]] =
@[]
for pubsub, cTopicSet in self.contentTopicSubs.pairs:
if cTopicSet.len > 0:
var cTopicSeq = newSeqOfCap[ContentTopic](cTopicSet.len)
for t in cTopicSet:
cTopicSeq.add(t)
activeSubs.add((pubsub, cTopicSeq))
return activeSubs
proc getShardForContentTopic(
self: SubscriptionManager, topic: ContentTopic
): Result[PubsubTopic, string] =
@ -162,3 +179,358 @@ proc unsubscribe*(
?self.removeContentTopicInterest(shard, topic)
return ok()
# ---------------------------------------------------------------------------
# Edge Filter driver for the Logos Messaging API
#
# In edge mode, the SubscriptionManager itself takes on the responsibility of
# using the Filter protocol to effect subscriptions and message receipt.
# ---------------------------------------------------------------------------
const EdgeFilterSubscribeTimeout = chronos.seconds(15)
## Timeout for a single filter subscribe/unsubscribe RPC to a service peer.
const EdgeFilterPingTimeout = chronos.seconds(5)
## Timeout for a filter ping health check.
const EdgeFilterLoopInterval = chronos.seconds(30)
## Interval for the edge filter health ping loop.
const EdgeFilterSubLoopDebounce = chronos.seconds(1)
## Debounce delay to coalesce rapid-fire wakeups into a single reconciliation pass.
proc updateShardHealth(
self: SubscriptionManager, shard: PubsubTopic, state: var EdgeFilterSubState
) =
## Recompute and emit health for a shard after its peer set changed.
let newHealth = toTopicHealth(state.peers.len)
if newHealth != state.currentHealth:
state.currentHealth = newHealth
EventShardTopicHealthChange.emit(self.node.brokerCtx, shard, newHealth)
proc removePeer(self: SubscriptionManager, shard: PubsubTopic, peerId: PeerId) =
## Remove a peer from edgeFilterSubStates for the given shard,
## update health, and wake the sub loop to dial a replacement.
## Best-effort unsubscribe so the service peer stops pushing to us.
self.edgeFilterSubStates.withValue(shard, state):
var peer: RemotePeerInfo
var found = false
for p in state.peers:
if p.peerId == peerId:
peer = p
found = true
break
if not found:
return
state.peers.keepItIf(it.peerId != peerId)
self.updateShardHealth(shard, state[])
self.edgeFilterWakeup.fire()
if not self.node.wakuFilterClient.isNil():
self.contentTopicSubs.withValue(shard, topics):
let ct = toSeq(topics[])
if ct.len > 0:
proc doUnsubscribe() {.async.} =
discard await self.node.wakuFilterClient.unsubscribe(peer, shard, ct)
asyncSpawn doUnsubscribe()
type SendChunkedFilterRpcKind = enum
FilterSubscribe
FilterUnsubscribe
proc sendChunkedFilterRpc(
self: SubscriptionManager,
peer: RemotePeerInfo,
shard: PubsubTopic,
topics: seq[ContentTopic],
kind: SendChunkedFilterRpcKind,
): Future[bool] {.async.} =
## Send a chunked filter subscribe or unsubscribe RPC. Returns true on
## success. On failure the peer is removed and false is returned.
try:
var i = 0
while i < topics.len:
let chunk =
topics[i ..< min(i + filter_protocol.MaxContentTopicsPerRequest, topics.len)]
let fut =
case kind
of FilterSubscribe:
self.node.wakuFilterClient.subscribe(peer, shard, chunk)
of FilterUnsubscribe:
self.node.wakuFilterClient.unsubscribe(peer, shard, chunk)
if not (await fut.withTimeout(EdgeFilterSubscribeTimeout)) or fut.read().isErr():
trace "sendChunkedFilterRpc: chunk failed",
op = kind, shard = shard, peer = peer.peerId
self.removePeer(shard, peer.peerId)
return false
i += filter_protocol.MaxContentTopicsPerRequest
except CatchableError as exc:
debug "sendChunkedFilterRpc: failed",
op = kind, shard = shard, peer = peer.peerId, err = exc.msg
self.removePeer(shard, peer.peerId)
return false
return true
proc syncFilterDeltas(
self: SubscriptionManager,
peer: RemotePeerInfo,
shard: PubsubTopic,
added: seq[ContentTopic],
removed: seq[ContentTopic],
) {.async.} =
## Push content topic changes (adds/removes) to an already-tracked peer.
if added.len > 0:
if not await self.sendChunkedFilterRpc(peer, shard, added, FilterSubscribe):
return
if removed.len > 0:
discard await self.sendChunkedFilterRpc(peer, shard, removed, FilterUnsubscribe)
proc dialFilterPeer(
self: SubscriptionManager,
peer: RemotePeerInfo,
shard: PubsubTopic,
contentTopics: seq[ContentTopic],
) {.async.} =
## Subscribe a new peer to all content topics on a shard and start tracking it.
self.edgeFilterSubStates.withValue(shard, state):
state.pendingPeers.incl(peer.peerId)
try:
if not await self.sendChunkedFilterRpc(peer, shard, contentTopics, FilterSubscribe):
return
self.edgeFilterSubStates.withValue(shard, state):
if state.peers.anyIt(it.peerId == peer.peerId):
trace "dialFilterPeer: peer already tracked, skipping duplicate",
shard = shard, peer = peer.peerId
return
state.peers.add(peer)
self.updateShardHealth(shard, state[])
trace "dialFilterPeer: successfully subscribed to all chunks",
shard = shard, peer = peer.peerId, totalPeers = state.peers.len
do:
trace "dialFilterPeer: shard removed while subscribing, discarding result",
shard = shard, peer = peer.peerId
finally:
self.edgeFilterSubStates.withValue(shard, state):
state.pendingPeers.excl(peer.peerId)
proc edgeFilterHealthLoop*(self: SubscriptionManager) {.async.} =
## Periodically pings all connected filter service peers to verify they are
## still alive at the application layer. Peers that fail the ping are removed.
while true:
await sleepAsync(EdgeFilterLoopInterval)
if self.node.wakuFilterClient.isNil():
warn "filter client is nil within edge filter health loop"
continue
var connected = initTable[PeerId, RemotePeerInfo]()
for state in self.edgeFilterSubStates.values:
for peer in state.peers:
if self.node.peerManager.switch.peerStore.isConnected(peer.peerId):
connected[peer.peerId] = peer
var alive = initHashSet[PeerId]()
if connected.len > 0:
var pingTasks: seq[(PeerId, Future[FilterSubscribeResult])] = @[]
for peer in connected.values:
pingTasks.add(
(peer.peerId, self.node.wakuFilterClient.ping(peer, EdgeFilterPingTimeout))
)
# extract future tasks from (PeerId, Future) tuples and await them
await allFutures(pingTasks.mapIt(it[1]))
for (peerId, task) in pingTasks:
if task.read().isOk():
alive.incl(peerId)
var changed = false
for shard, state in self.edgeFilterSubStates.mpairs:
let oldLen = state.peers.len
state.peers.keepItIf(it.peerId notin connected or alive.contains(it.peerId))
if state.peers.len < oldLen:
changed = true
self.updateShardHealth(shard, state)
trace "Edge Filter health degraded by Ping failure",
shard = shard, new = state.currentHealth
if changed:
self.edgeFilterWakeup.fire()
proc edgeFilterSubLoop*(self: SubscriptionManager) {.async.} =
## Reconciles filter subscriptions with the desired state from SubscriptionManager.
var lastSynced = initTable[PubsubTopic, HashSet[ContentTopic]]()
while true:
await self.edgeFilterWakeup.wait()
await sleepAsync(EdgeFilterSubLoopDebounce)
self.edgeFilterWakeup.clear()
trace "edgeFilterSubLoop: woke up"
if isNil(self.node.wakuFilterClient):
trace "edgeFilterSubLoop: wakuFilterClient is nil, skipping"
continue
let desired = self.contentTopicSubs
trace "edgeFilterSubLoop: desired state", numShards = desired.len
let allShards = toHashSet(toSeq(desired.keys)) + toHashSet(toSeq(lastSynced.keys))
for shard in allShards:
let currTopics = desired.getOrDefault(shard)
let prevTopics = lastSynced.getOrDefault(shard)
if shard notin self.edgeFilterSubStates:
self.edgeFilterSubStates[shard] =
EdgeFilterSubState(currentHealth: TopicHealth.UNHEALTHY)
let addedTopics = toSeq(currTopics - prevTopics)
let removedTopics = toSeq(prevTopics - currTopics)
self.edgeFilterSubStates.withValue(shard, state):
state.peers.keepItIf(
self.node.peerManager.switch.peerStore.isConnected(it.peerId)
)
state.pending.keepItIf(not it.finished)
if addedTopics.len > 0 or removedTopics.len > 0:
for peer in state.peers:
asyncSpawn self.syncFilterDeltas(peer, shard, addedTopics, removedTopics)
if currTopics.len == 0:
for fut in state.pending:
if not fut.finished:
await fut.cancelAndWait()
self.edgeFilterSubStates.del(shard)
# invalidates `state` — do not use after this
else:
self.updateShardHealth(shard, state[])
let needed = max(0, HealthyThreshold - state.peers.len - state.pending.len)
if needed > 0:
let tracked = state.peers.mapIt(it.peerId).toHashSet() + state.pendingPeers
var candidates = self.node.peerManager.selectPeers(
filter_common.WakuFilterSubscribeCodec, some(shard)
)
candidates.keepItIf(it.peerId notin tracked)
let toDial = min(needed, candidates.len)
trace "edgeFilterSubLoop: shard reconciliation",
shard = shard,
num_peers = state.peers.len,
num_pending = state.pending.len,
num_needed = needed,
num_available = candidates.len,
toDial = toDial
for i in 0 ..< toDial:
let fut = self.dialFilterPeer(candidates[i], shard, toSeq(currTopics))
state.pending.add(fut)
lastSynced = desired
proc startEdgeFilterLoops(self: SubscriptionManager): Result[void, string] =
## Start the edge filter orchestration loops.
## Caller must ensure this is only called in edge mode (relay nil, filter client present).
self.edgeFilterWakeup = newAsyncEvent()
self.peerEventListener = WakuPeerEvent.listen(
self.node.brokerCtx,
proc(evt: WakuPeerEvent) {.async: (raises: []), gcsafe.} =
if evt.kind == WakuPeerEventKind.EventDisconnected or
evt.kind == WakuPeerEventKind.EventMetadataUpdated:
self.edgeFilterWakeup.fire()
,
).valueOr:
return err("Failed to listen to peer events for edge filter: " & error)
self.edgeFilterSubLoopFut = self.edgeFilterSubLoop()
self.edgeFilterHealthLoopFut = self.edgeFilterHealthLoop()
return ok()
proc stopEdgeFilterLoops(self: SubscriptionManager) {.async: (raises: []).} =
## Stop the edge filter orchestration loops and clean up pending futures.
if not isNil(self.edgeFilterSubLoopFut):
await self.edgeFilterSubLoopFut.cancelAndWait()
self.edgeFilterSubLoopFut = nil
if not isNil(self.edgeFilterHealthLoopFut):
await self.edgeFilterHealthLoopFut.cancelAndWait()
self.edgeFilterHealthLoopFut = nil
for shard, state in self.edgeFilterSubStates:
for fut in state.pending:
if not fut.finished:
await fut.cancelAndWait()
WakuPeerEvent.dropListener(self.node.brokerCtx, self.peerEventListener)
# ---------------------------------------------------------------------------
# SubscriptionManager Lifecycle (calls Edge behavior above)
#
# startSubscriptionManager and stopSubscriptionManager orchestrate both the
# core (relay) and edge (filter) paths, and register/clear broker providers.
# ---------------------------------------------------------------------------
proc startSubscriptionManager*(self: SubscriptionManager): Result[void, string] =
# Register edge filter broker providers. The shard/content health providers
# in WakuNode query these via the broker as a fallback when relay health is
# not available. If edge mode is not active, these providers simply return
# NOT_SUBSCRIBED / strength 0, which is harmless.
RequestEdgeShardHealth.setProvider(
self.node.brokerCtx,
proc(shard: PubsubTopic): Result[RequestEdgeShardHealth, string] =
self.edgeFilterSubStates.withValue(shard, state):
return ok(RequestEdgeShardHealth(health: state.currentHealth))
return ok(RequestEdgeShardHealth(health: TopicHealth.NOT_SUBSCRIBED)),
).isOkOr:
error "Can't set provider for RequestEdgeShardHealth", error = error
RequestEdgeFilterPeerCount.setProvider(
self.node.brokerCtx,
proc(): Result[RequestEdgeFilterPeerCount, string] =
var minPeers = high(int)
for state in self.edgeFilterSubStates.values:
minPeers = min(minPeers, state.peers.len)
if minPeers == high(int):
minPeers = 0
return ok(RequestEdgeFilterPeerCount(peerCount: minPeers)),
).isOkOr:
error "Can't set provider for RequestEdgeFilterPeerCount", error = error
if self.node.wakuRelay.isNil():
return self.startEdgeFilterLoops()
# Core mode: auto-subscribe relay to all shards in autosharding.
if self.node.wakuAutoSharding.isSome():
let autoSharding = self.node.wakuAutoSharding.get()
let clusterId = autoSharding.clusterId
let numShards = autoSharding.shardCountGenZero
if numShards > 0:
var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards)
for i in 0 ..< numShards:
let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i))
clusterPubsubTopics.add(PubsubTopic($shardObj))
self.subscribePubsubTopics(clusterPubsubTopics).isOkOr:
error "Failed to auto-subscribe Relay to cluster shards: ", error = error
else:
info "SubscriptionManager has no AutoSharding configured; skipping auto-subscribe."
return ok()
proc stopSubscriptionManager*(self: SubscriptionManager) {.async: (raises: []).} =
if self.node.wakuRelay.isNil():
await self.stopEdgeFilterLoops()
RequestEdgeShardHealth.clearProvider(self.node.brokerCtx)
RequestEdgeFilterPeerCount.clearProvider(self.node.brokerCtx)


@ -2,6 +2,9 @@ import chronos, results, std/strutils, ../../api/types
export ConnectionStatus
const HealthyThreshold* = 2
## Minimum peers required per service protocol for a "Connected" status (excluding Relay).
proc init*(
t: typedesc[ConnectionStatus], strRep: string
): Result[ConnectionStatus, string] =


@ -21,6 +21,7 @@ import
node/health_monitor/health_report,
node/health_monitor/connection_status,
node/health_monitor/protocol_health,
requests/health_requests,
]
## This module aims to check the state of the "self" Waku Node
@ -29,9 +30,6 @@ import
# if not called, the outcome of randomization procedures will be the same in every run
random.randomize()
const HealthyThreshold* = 2
## minimum peers required for all services for a Connected status, excluding Relay
type NodeHealthMonitor* = ref object
nodeHealth: HealthStatus
node: WakuNode
@ -48,7 +46,8 @@ type NodeHealthMonitor* = ref object
## latest known connectivity strength (e.g. connected peer count) metric for each protocol.
## if it doesn't make sense for the protocol in question, this is set to zero.
relayObserver: PubSubObserver
peerEventListener: EventWakuPeerListener
peerEventListener: WakuPeerEventListener
shardHealthListener: EventShardTopicHealthChangeListener
func getHealth*(report: HealthReport, kind: WakuProtocol): ProtocolHealth =
for h in report.protocolsHealth:
@ -198,6 +197,17 @@ proc getFilterClientHealth(hm: NodeHealthMonitor): ProtocolHealth =
hm.strength[WakuProtocol.FilterClientProtocol] = 0
return p.notMounted()
if isNil(hm.node.wakuRelay):
let edgeRes = RequestEdgeFilterPeerCount.request(hm.node.brokerCtx)
if edgeRes.isOk():
let peerCount = edgeRes.get().peerCount
if peerCount > 0:
hm.strength[WakuProtocol.FilterClientProtocol] = peerCount
return p.ready()
else:
error "Failed to request edge filter peer count", error = edgeRes.error
return p.notReady("Failed to request edge filter peer count: " & edgeRes.error)
let peerCount = countCapablePeers(hm, WakuFilterSubscribeCodec)
hm.strength[WakuProtocol.FilterClientProtocol] = peerCount
@ -663,14 +673,23 @@ proc startHealthMonitor*(hm: NodeHealthMonitor): Result[void, string] =
)
hm.node.wakuRelay.addObserver(hm.relayObserver)
hm.peerEventListener = EventWakuPeer.listen(
hm.peerEventListener = WakuPeerEvent.listen(
hm.node.brokerCtx,
proc(evt: EventWakuPeer): Future[void] {.async: (raises: []), gcsafe.} =
proc(evt: WakuPeerEvent): Future[void] {.async: (raises: []), gcsafe.} =
## Recompute health on any peer changing anything (join, leave, identify, metadata update)
hm.healthUpdateEvent.fire(),
).valueOr:
return err("Failed to subscribe to peer events: " & error)
hm.shardHealthListener = EventShardTopicHealthChange.listen(
hm.node.brokerCtx,
proc(
evt: EventShardTopicHealthChange
): Future[void] {.async: (raises: []), gcsafe.} =
hm.healthUpdateEvent.fire(),
).valueOr:
return err("Failed to subscribe to shard health events: " & error)
hm.healthUpdateEvent = newAsyncEvent()
hm.healthUpdateEvent.fire()
@ -690,8 +709,8 @@ proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} =
if not isNil(hm.healthLoopFut):
await hm.healthLoopFut.cancelAndWait()
if hm.peerEventListener.id != 0:
EventWakuPeer.dropListener(hm.node.brokerCtx, hm.peerEventListener)
WakuPeerEvent.dropListener(hm.node.brokerCtx, hm.peerEventListener)
EventShardTopicHealthChange.dropListener(hm.node.brokerCtx, hm.shardHealthListener)
if not isNil(hm.node.wakuRelay) and not isNil(hm.relayObserver):
hm.node.wakuRelay.removeObserver(hm.relayObserver)


@ -215,31 +215,34 @@ proc loadFromStorage(pm: PeerManager) {.gcsafe.} =
trace "recovered peers from storage", amount = amount
proc selectPeer*(
proc selectPeers*(
pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)
): Option[RemotePeerInfo] =
# Selects the best peer for a given protocol
): seq[RemotePeerInfo] =
## Returns all peers that support the given protocol (and optionally shard),
## shuffled randomly. Callers can further filter or pick from this list.
var peers = pm.switch.peerStore.getPeersByProtocol(proto)
trace "Selecting peer from peerstore",
protocol = proto, peers, address = cast[uint](pm.switch.peerStore)
trace "Selecting peers from peerstore",
protocol = proto, num_peers = peers.len, address = cast[uint](pm.switch.peerStore)
if shard.isSome():
# Parse the shard from the pubsub topic to get cluster and shard ID
let shardInfo = RelayShard.parse(shard.get()).valueOr:
trace "Failed to parse shard from pubsub topic", topic = shard.get()
return none(RemotePeerInfo)
return @[]
# Filter peers that support the requested shard
# Check both ENR (if present) and the shards field on RemotePeerInfo
peers.keepItIf(
# Check ENR if available
(it.enr.isSome() and it.enr.get().containsShard(shard.get())) or
# Otherwise check the shards field directly
(it.shards.len > 0 and it.shards.contains(shardInfo.shardId))
(it.shards.len > 0 and it.shards.contains(shardInfo.shardId))
)
shuffle(peers)
return peers
proc selectPeer*(
pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)
): Option[RemotePeerInfo] =
## Selects a single peer for a given protocol, checking service slots first
## (for non-relay protocols).
let peers = pm.selectPeers(proto, shard)
# No criteria for selecting a peer for WakuRelay, random one
if proto == WakuRelayCodec:
@ -742,7 +745,7 @@ proc refreshPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
# TODO: should only trigger an event if metadata actually changed
# should include the shard subscription delta in the event when
# it is a MetadataUpdated event
EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventMetadataUpdated)
WakuPeerEvent.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventMetadataUpdated)
return
info "disconnecting from peer", peerId = peerId, reason = reason
@ -787,7 +790,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
asyncSpawn(pm.switch.disconnect(peerId))
peerStore.delete(peerId)
EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventConnected)
WakuPeerEvent.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventConnected)
if not pm.onConnectionChange.isNil():
# we don't want to await for the callback to finish
@ -804,7 +807,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
pm.ipTable.del(ip)
break
EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventDisconnected)
WakuPeerEvent.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventDisconnected)
if not pm.onConnectionChange.isNil():
# we don't want to await for the callback to finish
@ -812,7 +815,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
of PeerEventKind.Identified:
info "event identified", peerId = peerId
EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventIdentified)
WakuPeerEvent.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventIdentified)
peerStore[ConnectionBook][peerId] = connectedness
peerStore[DirectionBook][peerId] = direction


@ -1,7 +1,7 @@
{.push raises: [].}
import
std/[options, tables, strutils, sequtils, os, net, random],
std/[options, tables, strutils, sequtils, os, net, random, sets],
chronos,
chronicles,
metrics,
@ -38,7 +38,6 @@ import
waku_store/resume,
waku_store_sync,
waku_filter_v2,
waku_filter_v2/common as filter_common,
waku_filter_v2/client as filter_client,
waku_metadata,
waku_rendezvous/protocol,
@ -60,7 +59,7 @@ import
requests/node_requests,
requests/health_requests,
events/health_events,
events/peer_events,
events/message_events,
],
waku/discovery/waku_kademlia,
./net_config,
@ -95,9 +94,6 @@ const clientId* = "Nimbus Waku v2 node"
const WakuNodeVersionString* = "version / git commit hash: " & git_version
const EdgeTopicHealthyThreshold = 2
## Lightpush server and filter server requirement for a healthy topic in edge mode
# key and crypto modules different
type
# TODO: Move to application instance (e.g., `WakuNode2`)
@ -142,10 +138,6 @@ type
legacyAppHandlers*: Table[PubsubTopic, WakuRelayHandler]
## Kernel API Relay appHandlers (if any)
wakuMix*: WakuMix
edgeTopicsHealth*: Table[PubsubTopic, TopicHealth]
edgeHealthEvent*: AsyncEvent
edgeHealthLoop: Future[void]
peerEventListener*: EventWakuPeerListener
kademliaDiscoveryLoop*: Future[void]
wakuKademlia*: WakuKademlia
@ -498,52 +490,7 @@ proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string]
return ok()
proc calculateEdgeTopicHealth(node: WakuNode, shard: PubsubTopic): TopicHealth =
let filterPeers =
node.peerManager.getPeersForShard(filter_common.WakuFilterSubscribeCodec, shard)
let lightpushPeers =
node.peerManager.getPeersForShard(lightpush_protocol.WakuLightPushCodec, shard)
if filterPeers >= EdgeTopicHealthyThreshold and
lightpushPeers >= EdgeTopicHealthyThreshold:
return TopicHealth.SUFFICIENTLY_HEALTHY
elif filterPeers > 0 and lightpushPeers > 0:
return TopicHealth.MINIMALLY_HEALTHY
return TopicHealth.UNHEALTHY
proc loopEdgeHealth(node: WakuNode) {.async.} =
while node.started:
await node.edgeHealthEvent.wait()
node.edgeHealthEvent.clear()
try:
for shard in node.edgeTopicsHealth.keys:
if not node.wakuRelay.isNil and node.wakuRelay.isSubscribed(shard):
continue
let oldHealth = node.edgeTopicsHealth.getOrDefault(shard, TopicHealth.UNHEALTHY)
let newHealth = node.calculateEdgeTopicHealth(shard)
if newHealth != oldHealth:
node.edgeTopicsHealth[shard] = newHealth
EventShardTopicHealthChange.emit(node.brokerCtx, shard, newHealth)
except CancelledError:
break
except CatchableError as e:
warn "Error in edge health check", error = e.msg
# safety cooldown to protect from edge cases
await sleepAsync(100.milliseconds)
proc startProvidersAndListeners*(node: WakuNode) =
node.peerEventListener = EventWakuPeer.listen(
node.brokerCtx,
proc(evt: EventWakuPeer) {.async: (raises: []), gcsafe.} =
node.edgeHealthEvent.fire(),
).valueOr:
error "Failed to listen to peer events", error = error
return
RequestRelayShard.setProvider(
node.brokerCtx,
proc(
@ -561,14 +508,23 @@ proc startProvidersAndListeners*(node: WakuNode) =
var response: RequestShardTopicsHealth
for shard in topics:
var healthStatus = TopicHealth.UNHEALTHY
# Health resolution order:
# 1. Relay topicsHealth (computed from gossipsub mesh state)
# 2. If relay is subscribed but its health hasn't been computed yet, UNHEALTHY
# 3. Otherwise, ask edge filter (via broker; no-op if no provider set)
var healthStatus = TopicHealth.NOT_SUBSCRIBED
if not node.wakuRelay.isNil:
healthStatus =
node.wakuRelay.topicsHealth.getOrDefault(shard, TopicHealth.NOT_SUBSCRIBED)
if healthStatus == TopicHealth.NOT_SUBSCRIBED:
healthStatus = node.calculateEdgeTopicHealth(shard)
if not node.wakuRelay.isNil and node.wakuRelay.isSubscribed(shard):
healthStatus = TopicHealth.UNHEALTHY
else:
let edgeRes = RequestEdgeShardHealth.request(node.brokerCtx, shard)
if edgeRes.isOk():
healthStatus = edgeRes.get().health
response.topicHealth.add((shard, healthStatus))
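
Spelled out as a standalone helper, the resolution order in the comment above looks roughly like this (a sketch assuming the relay and broker APIs shown in this diff, not the literal provider body):

proc resolveShardHealth(node: WakuNode, shard: PubsubTopic): TopicHealth =
  # 1. Prefer the relay's computed mesh health when it knows the topic.
  if not node.wakuRelay.isNil:
    let relayHealth =
      node.wakuRelay.topicsHealth.getOrDefault(shard, TopicHealth.NOT_SUBSCRIBED)
    if relayHealth != TopicHealth.NOT_SUBSCRIBED:
      return relayHealth
    # 2. Subscribed, but mesh health not computed yet: UNHEALTHY for now.
    if node.wakuRelay.isSubscribed(shard):
      return TopicHealth.UNHEALTHY
  # 3. No relay coverage: defer to the edge filter provider via the broker.
  #    With no provider registered, fall through to NOT_SUBSCRIBED.
  let edgeRes = RequestEdgeShardHealth.request(node.brokerCtx, shard)
  if edgeRes.isOk():
    return edgeRes.get().health
  TopicHealth.NOT_SUBSCRIBED
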
@ -594,9 +550,10 @@ proc startProvidersAndListeners*(node: WakuNode) =
pubsubTopic, TopicHealth.NOT_SUBSCRIBED
)
if topicHealth == TopicHealth.NOT_SUBSCRIBED and
pubsubTopic in node.edgeTopicsHealth:
topicHealth = node.calculateEdgeTopicHealth(pubsubTopic)
if topicHealth == TopicHealth.NOT_SUBSCRIBED:
let edgeRes = RequestEdgeShardHealth.request(node.brokerCtx, pubsubTopic)
if edgeRes.isOk():
topicHealth = edgeRes.get().health
response.contentTopicHealth.add((topic: contentTopic, health: topicHealth))
@ -605,7 +562,6 @@ proc startProvidersAndListeners*(node: WakuNode) =
error "Can't set provider for RequestContentTopicsHealth", error = error
proc stopProvidersAndListeners*(node: WakuNode) =
EventWakuPeer.dropListener(node.brokerCtx, node.peerEventListener)
RequestRelayShard.clearProvider(node.brokerCtx)
RequestContentTopicsHealth.clearProvider(node.brokerCtx)
RequestShardTopicsHealth.clearProvider(node.brokerCtx)
@ -658,13 +614,16 @@ proc start*(node: WakuNode) {.async.} =
## The switch will update addresses after start using the addressMapper
await node.switch.start()
node.edgeHealthEvent = newAsyncEvent()
node.edgeHealthLoop = loopEdgeHealth(node)
node.started = true
if not node.wakuFilterClient.isNil():
node.wakuFilterClient.registerPushHandler(
proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
MessageSeenEvent.emit(node.brokerCtx, pubsubTopic, msg)
)
node.startProvidersAndListeners()
node.started = true
if not zeroPortPresent:
updateAnnouncedAddrWithPrimaryIpAddr(node).isOkOr:
error "failed update announced addr", error = $error
@ -678,10 +637,6 @@ proc stop*(node: WakuNode) {.async.} =
node.stopProvidersAndListeners()
if not node.edgeHealthLoop.isNil:
await node.edgeHealthLoop.cancelAndWait()
node.edgeHealthLoop = nil
await node.switch.stop()
node.peerManager.stop()

View File

@ -37,3 +37,15 @@ RequestBroker:
healthStatus*: ProtocolHealth
proc signature(protocol: WakuProtocol): Future[Result[RequestProtocolHealth, string]]
# Get edge filter health for a single shard (set by DeliveryService when edge mode is active)
RequestBroker(sync):
type RequestEdgeShardHealth* = object
health*: TopicHealth
proc signature(shard: PubsubTopic): Result[RequestEdgeShardHealth, string]
# Get edge filter confirmed peer count (set by DeliveryService when edge mode is active)
RequestBroker(sync):
type RequestEdgeFilterPeerCount* = object
peerCount*: int
proc signature(): Result[RequestEdgeFilterPeerCount, string]
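
Calling convention for the two sync requests above, sketched for a hypothetical caller (with no provider registered, e.g. outside edge mode, request() is expected to return an error and the caller should degrade gracefully):

proc logEdgeFilterHealth(brokerCtx: BrokerContext) =
  # Illustrative only; brokerCtx is whatever BrokerContext the caller holds.
  let countRes = RequestEdgeFilterPeerCount.request(brokerCtx)
  if countRes.isErr():
    warn "edge filter peer count unavailable", error = countRes.error
  elif countRes.get().peerCount >= 1:
    info "confirmed filter subscriptions", peers = countRes.get().peerCount
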

View File

@ -1,3 +1,3 @@
import ./subscription/subscription_manager, ./subscription/push_handler
import ./subscription/push_handler
export subscription_manager, push_handler
export push_handler

View File

@ -1,52 +0,0 @@
{.push raises: [].}
import std/tables, results, chronicles, chronos
import ./push_handler, ../topics, ../message
## Subscription manager
type LegacySubscriptionManager* = object
subscriptions: TableRef[(string, ContentTopic), FilterPushHandler]
proc init*(T: type LegacySubscriptionManager): T =
LegacySubscriptionManager(
subscriptions: newTable[(string, ContentTopic), FilterPushHandler]()
)
proc clear*(m: var LegacySubscriptionManager) =
m.subscriptions.clear()
proc registerSubscription*(
m: LegacySubscriptionManager,
pubsubTopic: PubsubTopic,
contentTopic: ContentTopic,
handler: FilterPushHandler,
) =
try:
# TODO: Handle over subscription surprises
m.subscriptions[(pubsubTopic, contentTopic)] = handler
except CatchableError:
error "failed to register filter subscription", error = getCurrentExceptionMsg()
proc removeSubscription*(
m: LegacySubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic
) =
m.subscriptions.del((pubsubTopic, contentTopic))
proc notifySubscriptionHandler*(
m: LegacySubscriptionManager,
pubsubTopic: PubsubTopic,
contentTopic: ContentTopic,
message: WakuMessage,
) =
if not m.subscriptions.hasKey((pubsubTopic, contentTopic)):
return
try:
let handler = m.subscriptions[(pubsubTopic, contentTopic)]
asyncSpawn handler(pubsubTopic, message)
except CatchableError:
discard
proc getSubscriptionsCount*(m: LegacySubscriptionManager): int =
m.subscriptions.len()

View File

@ -101,12 +101,20 @@ proc sendSubscribeRequest(
return ok()
proc ping*(
wfc: WakuFilterClient, servicePeer: RemotePeerInfo
wfc: WakuFilterClient, servicePeer: RemotePeerInfo, timeout = chronos.seconds(0)
): Future[FilterSubscribeResult] {.async.} =
info "sending ping", servicePeer = shortLog($servicePeer)
let requestId = generateRequestId(wfc.rng)
let filterSubscribeRequest = FilterSubscribeRequest.ping(requestId)
if timeout > chronos.seconds(0):
let fut = wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
if not await fut.withTimeout(timeout):
return err(
FilterSubscribeError.parse(uint32(FilterSubscribeErrorKind.PEER_DIAL_FAILURE))
)
return fut.read()
return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
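
A hedged sketch of a caller using the new timeout to decide whether a service peer is still responsive; the proc name and the five-second budget are illustrative:

proc checkFilterPeer(
    wfc: WakuFilterClient, servicePeer: RemotePeerInfo
): Future[bool] {.async.} =
  # A ping that errors out (including the timeout path above, which
  # surfaces as PEER_DIAL_FAILURE) marks the peer as stale.
  let pingRes = await wfc.ping(servicePeer, timeout = chronos.seconds(5))
  if pingRes.isErr():
    warn "filter ping failed", peer = shortLog($servicePeer)
    return false
  return true
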
proc subscribe*(

View File

@ -157,7 +157,7 @@ type
): Future[ValidationResult] {.gcsafe, raises: [Defect].}
WakuRelay* = ref object of GossipSub
brokerCtx: BrokerContext
peerEventListener: EventWakuPeerListener
peerEventListener: WakuPeerEventListener
# seq of tuples: the first entry in each tuple holds the validators that are called for every topic,
# the second holds the error message returned when the corresponding validator fails
wakuValidators: seq[tuple[handler: WakuValidatorHandler, errorMessage: string]]
@ -376,9 +376,9 @@ proc new*(
w.initProtocolHandler()
w.initRelayObservers()
w.peerEventListener = EventWakuPeer.listen(
w.peerEventListener = WakuPeerEvent.listen(
w.brokerCtx,
proc(evt: EventWakuPeer): Future[void] {.async: (raises: []), gcsafe.} =
proc(evt: WakuPeerEvent): Future[void] {.async: (raises: []), gcsafe.} =
if evt.kind == WakuPeerEventKind.EventDisconnected:
w.topicHealthCheckAll = true
w.topicHealthUpdateEvent.fire()
@ -524,8 +524,7 @@ method stop*(w: WakuRelay) {.async, base.} =
info "stop"
await procCall GossipSub(w).stop()
if w.peerEventListener.id != 0:
EventWakuPeer.dropListener(w.brokerCtx, w.peerEventListener)
WakuPeerEvent.dropListener(w.brokerCtx, w.peerEventListener)
if not w.topicHealthLoopHandle.isNil():
await w.topicHealthLoopHandle.cancelAndWait()
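
A note on the guard above: dropListener is only called when the listener id is non-zero, because a failed listen() leaves the field default-initialized. A hypothetical helper making that guard explicit, assuming ids are non-zero once listen() succeeds:

proc dropPeerListenerIfRegistered(w: WakuRelay) =
  # listen() failures leave peerEventListener with id == 0, so stop()
  # must not try to drop a listener that was never registered.
  if w.peerEventListener.id != 0:
    WakuPeerEvent.dropListener(w.brokerCtx, w.peerEventListener)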