From e4358c9718d0c9f0e47f9e9d577fbee02cb67aa9 Mon Sep 17 00:00:00 2001
From: Prem Chaitanya Prathi
Date: Wed, 13 Aug 2025 10:48:56 +0530
Subject: [PATCH] chore: remove metadata protocol dependency on enr, relax
 check when nwaku is edge node (#3519)

* remove metadata protocol dep on enr, do not disconnect peers based on shards mismatch
---
 apps/networkmonitor/networkmonitor.nim        |  2 +-
 apps/wakucanary/wakucanary.nim                |  2 +-
 examples/filter_subscriber.nim                |  4 +-
 examples/lightpush_publisher.nim              |  4 +-
 tests/node/peer_manager/test_peer_manager.nim | 18 ++++--
 tests/test_peer_manager.nim                   | 22 +++----
 tests/wakunode_rest/test_rest_admin.nim       |  7 ++-
 waku/factory/node_factory.nim                 |  3 +-
 waku/node/peer_manager/peer_manager.nim       | 12 +---
 waku/node/waku_node.nim                       |  9 ++-
 waku/waku_metadata/protocol.nim               | 57 ++++++++-----------
 11 files changed, 67 insertions(+), 73 deletions(-)

diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim
index f391b3d20..e585a0e15 100644
--- a/apps/networkmonitor/networkmonitor.nim
+++ b/apps/networkmonitor/networkmonitor.nim
@@ -652,7 +652,7 @@ when isMainModule:
       error "failed to setup RLN", err = getCurrentExceptionMsg()
       quit 1
 
-  node.mountMetadata(conf.clusterId).isOkOr:
+  node.mountMetadata(conf.clusterId, conf.shards).isOkOr:
     error "failed to mount waku metadata protocol: ", err = error
     quit 1
 
diff --git a/apps/wakucanary/wakucanary.nim b/apps/wakucanary/wakucanary.nim
index 84ac6350c..b2dd4d331 100644
--- a/apps/wakucanary/wakucanary.nim
+++ b/apps/wakucanary/wakucanary.nim
@@ -256,7 +256,7 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} =
     error "failed to mount libp2p ping protocol: " & getCurrentExceptionMsg()
     quit(QuitFailure)
 
-  node.mountMetadata(conf.clusterId).isOkOr:
+  node.mountMetadata(conf.clusterId, conf.shards).isOkOr:
     error "failed to mount metadata protocol", error
     quit(QuitFailure)
 
diff --git a/examples/filter_subscriber.nim b/examples/filter_subscriber.nim
index af40d21c3..e4e26bdb7 100644
--- a/examples/filter_subscriber.nim
+++ b/examples/filter_subscriber.nim
@@ -76,7 +76,9 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
   builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet()
   let node = builder.build().tryGet()
 
-  node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol")
+  node.mountMetadata(clusterId, shardId).expect(
+    "failed to mount waku metadata protocol"
+  )
   await node.mountFilterClient()
   await node.start()
 
diff --git a/examples/lightpush_publisher.nim b/examples/lightpush_publisher.nim
index 9c7499695..e9fa2174d 100644
--- a/examples/lightpush_publisher.nim
+++ b/examples/lightpush_publisher.nim
@@ -68,7 +68,9 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
   builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet()
   let node = builder.build().tryGet()
 
-  node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol")
+  node.mountMetadata(clusterId, shardId).expect(
+    "failed to mount waku metadata protocol"
+  )
   node.mountLegacyLightPushClient()
   await node.start()
 
diff --git a/tests/node/peer_manager/test_peer_manager.nim b/tests/node/peer_manager/test_peer_manager.nim
index ad1f1bf0e..7697e0af7 100644
--- a/tests/node/peer_manager/test_peer_manager.nim
+++ b/tests/node/peer_manager/test_peer_manager.nim
@@ -34,8 +34,10 @@ suite "Peer Manager":
       )
 
     # And both mount metadata and filter
-    discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic
-    discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic
+    discard
+      client.mountMetadata(0, @[1'u16]) # clusterId irrelevant, overridden by topic
+    discard
+      server.mountMetadata(0, @[0'u16]) # clusterId irrelevant, overridden by topic
     await client.mountFilterClient()
     await server.mountFilter()
 
@@ -69,8 +71,10 @@ suite "Peer Manager":
       )
 
    # And both mount metadata and relay
-    discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic
-    discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic
+    discard
+      client.mountMetadata(0, @[1'u16]) # clusterId irrelevant, overridden by topic
+    discard
+      server.mountMetadata(0, @[0'u16]) # clusterId irrelevant, overridden by topic
     (await client.mountRelay()).isOkOr:
       assert false, "Failed to mount relay"
     (await server.mountRelay()).isOkOr:
@@ -105,8 +109,10 @@ suite "Peer Manager":
      )
 
    # And both mount metadata and relay
-    discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic
-    discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic
+    discard
+      client.mountMetadata(0, @[1'u16]) # clusterId irrelevant, overridden by topic
+    discard
+      server.mountMetadata(0, @[0'u16]) # clusterId irrelevant, overridden by topic
     (await client.mountRelay()).isOkOr:
       assert false, "Failed to mount relay"
     (await server.mountRelay()).isOkOr:
diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim
index e36f2a819..c2639a7c1 100644
--- a/tests/test_peer_manager.nim
+++ b/tests/test_peer_manager.nim
@@ -274,8 +274,8 @@ procSuite "Peer Manager":
       )
       node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023))
 
-    node1.mountMetadata(0).expect("Mounted Waku Metadata")
-    node2.mountMetadata(0).expect("Mounted Waku Metadata")
+    node1.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata")
+    node2.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata")
 
     await node1.start()
     await node2.start()
@@ -313,7 +313,7 @@ procSuite "Peer Manager":
        peerStorage = storage,
      )
 
-    node3.mountMetadata(0).expect("Mounted Waku Metadata")
+    node3.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata")
 
     await node3.start()
 
@@ -347,8 +347,8 @@ procSuite "Peer Manager":
       )
       node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023))
 
-    node1.mountMetadata(0).expect("Mounted Waku Metadata")
-    node2.mountMetadata(0).expect("Mounted Waku Metadata")
+    node1.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata")
+    node2.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata")
 
     await node1.start()
     await node2.start()
@@ -386,7 +386,7 @@ procSuite "Peer Manager":
        peerStorage = storage,
      )
 
-    node3.mountMetadata(0).expect("Mounted Waku Metadata")
+    node3.mountMetadata(0, @[0'u16]).expect("Mounted Waku Metadata")
 
     await node3.start()
 
@@ -439,9 +439,9 @@ procSuite "Peer Manager":
        subscribeShards = @[uint16(0)],
      )
 
-    node1.mountMetadata(3).expect("Mounted Waku Metadata")
-    node2.mountMetadata(4).expect("Mounted Waku Metadata")
-    node3.mountMetadata(4).expect("Mounted Waku Metadata")
+    node1.mountMetadata(3, @[0'u16]).expect("Mounted Waku Metadata")
+    node2.mountMetadata(4, @[0'u16]).expect("Mounted Waku Metadata")
+    node3.mountMetadata(4, @[0'u16]).expect("Mounted Waku Metadata")
 
     # Start nodes
     await allFutures([node1.start(), node2.start(), node3.start()])
@@ -548,7 +548,7 @@ procSuite "Peer Manager":
     )
 
     # Start them
-    discard nodes.mapIt(it.mountMetadata(0))
+    discard nodes.mapIt(it.mountMetadata(0, @[0'u16]))
     await allFutures(nodes.mapIt(it.mountRelay()))
     await allFutures(nodes.mapIt(it.start()))
 
@@ -621,7 +621,7 @@ procSuite "Peer Manager":
     )
 
     # Start them
-    discard nodes.mapIt(it.mountMetadata(0))
+    discard nodes.mapIt(it.mountMetadata(0, @[0'u16]))
     await allFutures(nodes.mapIt(it.mountRelay()))
     await allFutures(nodes.mapIt(it.start()))
 
diff --git a/tests/wakunode_rest/test_rest_admin.nim b/tests/wakunode_rest/test_rest_admin.nim
index c928140e1..e47207a42 100644
--- a/tests/wakunode_rest/test_rest_admin.nim
+++ b/tests/wakunode_rest/test_rest_admin.nim
@@ -43,11 +43,12 @@ suite "Waku v2 Rest API - Admin":
     node3 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60604))
 
     let clusterId = 1.uint16
-    node1.mountMetadata(clusterId).isOkOr:
+    let shards: seq[uint16] = @[0]
+    node1.mountMetadata(clusterId, shards).isOkOr:
       assert false, "Failed to mount metadata: " & $error
-    node2.mountMetadata(clusterId).isOkOr:
+    node2.mountMetadata(clusterId, shards).isOkOr:
       assert false, "Failed to mount metadata: " & $error
-    node3.mountMetadata(clusterId).isOkOr:
+    node3.mountMetadata(clusterId, shards).isOkOr:
       assert false, "Failed to mount metadata: " & $error
 
     await allFutures(node1.start(), node2.start(), node3.start())
diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim
index 5b0133288..a944379e9 100644
--- a/waku/factory/node_factory.nim
+++ b/waku/factory/node_factory.nim
@@ -154,7 +154,8 @@ proc setupProtocols(
   ## Optionally include persistent message storage.
   ## No protocols are started yet.
 
-  node.mountMetadata(conf.clusterId).isOkOr:
+  var allShards = conf.subscribeShards
+  node.mountMetadata(conf.clusterId, allShards).isOkOr:
     return err("failed to mount waku metadata protocol: " & error)
 
   var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim
index 81f4206cf..7fcd34a60 100644
--- a/waku/node/peer_manager/peer_manager.nim
+++ b/waku/node/peer_manager/peer_manager.nim
@@ -654,17 +654,6 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
         $clusterId
       break guardClauses
 
-    if (
-      pm.switch.peerStore.hasPeer(peerId, WakuRelayCodec) and
-      not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it))
-    ):
-      let myShardsString = "[ " & toSeq(pm.wakuMetadata.shards).join(", ") & " ]"
-      let otherShardsString = "[ " & metadata.shards.join(", ") & " ]"
-      reason =
-        "no shards in common: my_shards = " & myShardsString & " others_shards = " &
-        otherShardsString
-      break guardClauses
-
     return
 
   info "disconnecting from peer", peerId = peerId, reason = reason
@@ -799,6 +788,7 @@ proc getOnlineStateObserver*(pm: PeerManager): OnOnlineStateChange =
 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
 
 proc manageRelayPeers*(pm: PeerManager) {.async.} =
+  #TODO: this check should not be based on whether shards are present, but rather if relay is mounted
   if pm.wakuMetadata.shards.len == 0:
     return
 
diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim
index 5cb97972c..7fddd7d63 100644
--- a/waku/node/waku_node.nim
+++ b/waku/node/waku_node.nim
@@ -183,11 +183,14 @@ proc disconnectNode*(node: WakuNode, remotePeer: RemotePeerInfo) {.async.} =
 
 ## Waku Metadata
 
-proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] =
+proc mountMetadata*(
+    node: WakuNode, clusterId: uint32, shards: seq[uint16]
+): Result[void, string] =
   if not node.wakuMetadata.isNil():
     return err("Waku metadata already mounted, skipping")
-
-  let metadata = WakuMetadata.new(clusterId, node.enr, node.topicSubscriptionQueue)
+  let shards32 = shards.mapIt(it.uint32)
+  let metadata =
+    WakuMetadata.new(clusterId, shards32.toHashSet(), some(node.topicSubscriptionQueue))
 
   node.wakuMetadata = metadata
   node.peerManager.wakuMetadata = metadata
diff --git a/waku/waku_metadata/protocol.nim b/waku/waku_metadata/protocol.nim
index 75f021dbe..0112fd45e 100644
--- a/waku/waku_metadata/protocol.nim
+++ b/waku/waku_metadata/protocol.nim
@@ -10,7 +10,7 @@ import
   libp2p/stream/connection,
   libp2p/crypto/crypto,
   eth/p2p/discoveryv5/enr
-import ../common/nimchronos, ../common/enr, ../waku_core, ../waku_enr, ./rpc
+import ../common/nimchronos, ../waku_core, ./rpc
 from ../waku_core/codecs import WakuMetadataCodec
 export WakuMetadataCodec
 
@@ -23,7 +23,7 @@ const RpcResponseMaxBytes* = 1024
 type WakuMetadata* = ref object of LPProtocol
   clusterId*: uint32
   shards*: HashSet[uint32]
-  topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent]
+  topicSubscriptionQueue: Option[AsyncEventQueue[SubscriptionEvent]]
 
 proc respond(
     m: WakuMetadata, conn: Connection
@@ -49,7 +49,7 @@ proc request*(
   let readRes = catch:
     await conn.readLp(RpcResponseMaxBytes)
 
-  # close no watter what
+  # close no matter what
   let closeRes = catch:
     await conn.closeWithEof()
   if closeRes.isErr():
@@ -104,21 +104,11 @@ proc initProtocolHandler(m: WakuMetadata) =
 proc new*(
     T: type WakuMetadata,
     clusterId: uint32,
-    enr: Record,
-    queue: AsyncEventQueue[SubscriptionEvent],
+    shards: HashSet[uint32],
+    queue: Option[AsyncEventQueue[SubscriptionEvent]],
 ): T =
-  var (cluster, shards) = (clusterId, initHashSet[uint32]())
-
-  let enrRes = enr.toTyped()
-  if enrRes.isOk():
-    let shardingRes = enrRes.get().relaySharding()
-    if shardingRes.isSome():
-      let relayShard = shardingRes.get()
-      cluster = uint32(relayShard.clusterId)
-      shards = toHashSet(relayShard.shardIds.mapIt(uint32(it)))
-
   let wm =
-    WakuMetadata(clusterId: cluster, shards: shards, topicSubscriptionQueue: queue)
+    WakuMetadata(clusterId: clusterId, shards: shards, topicSubscriptionQueue: queue)
 
   wm.initProtocolHandler()
 
@@ -128,32 +118,31 @@ proc new*(
 proc subscriptionsListener(wm: WakuMetadata) {.async.} =
   ## Listen for pubsub topics subscriptions changes
+  if wm.topicSubscriptionQueue.isSome():
+    let key = wm.topicSubscriptionQueue.get().register()
 
-  let key = wm.topicSubscriptionQueue.register()
+    while wm.started:
+      let events = await wm.topicSubscriptionQueue.get().waitEvents(key)
 
-  while wm.started:
-    let events = await wm.topicSubscriptionQueue.waitEvents(key)
+      for event in events:
+        let parsedShard = RelayShard.parse(event.topic).valueOr:
+          continue
 
-    for event in events:
-      let parsedShard = RelayShard.parse(event.topic).valueOr:
-        continue
+        if parsedShard.clusterId != wm.clusterId:
+          continue
 
-      if parsedShard.clusterId != wm.clusterId:
-        continue
+        case event.kind
+        of PubsubSub:
+          wm.shards.incl(parsedShard.shardId)
+        of PubsubUnsub:
+          wm.shards.excl(parsedShard.shardId)
+        else:
+          continue
 
-      case event.kind
-      of PubsubSub:
-        wm.shards.incl(parsedShard.shardId)
-      of PubsubUnsub:
-        wm.shards.excl(parsedShard.shardId)
-      else:
-        continue
-
-  wm.topicSubscriptionQueue.unregister(key)
+    wm.topicSubscriptionQueue.get().unregister(key)
 
 proc start*(wm: WakuMetadata) =
   wm.started = true
-
   asyncSpawn wm.subscriptionsListener()
 
 proc stop*(wm: WakuMetadata) =
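
Usage sketch (not part of the patch): with this change, callers pass the shard list to mountMetadata explicitly instead of having waku_metadata derive it from the node's ENR. A minimal Nim sketch of the new call shape, assuming a built node: WakuNode as in the updated examples above; the clusterId and shard values below are illustrative:

    # Shards this node advertises through the metadata protocol;
    # after this patch they are no longer read from the node's ENR.
    let
      clusterId = 1'u32
      shards: seq[uint16] = @[0'u16]

    # New signature: cluster id plus an explicit shard list.
    node.mountMetadata(clusterId, shards).isOkOr:
      error "failed to mount waku metadata protocol", error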