From 84f791100fcc4367ca17a9d999e9aa943359dd51 Mon Sep 17 00:00:00 2001
From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
Date: Fri, 13 Feb 2026 11:23:21 +0100
Subject: [PATCH] fix: peer selection by shard and rendezvous/metadata sharding initialization (#3718)

* Fix peer selection for cases where the ENR is not yet advertised but
  metadata exchange has already adjusted the supported shards.
  Fix initialization of the rendezvous protocol with configured shards and
  autoshards, allowing connections to relay nodes before a valid shard
  subscription exists. This solves the issue of autoshard nodes needing to
  connect ahead of subscribing.

* Extend peer selection, rendezvous and metadata tests

* Fix rendezvous test, fix metadata test that failed due to wrong setup,
  and add it to all_tests
---
 tests/all_tests_waku.nim                |   1 +
 tests/test_peer_manager.nim             | 230 ++++++++++++++++++++++++
 tests/test_waku_metadata.nim            |  85 +++++++--
 tests/test_waku_rendezvous.nim          |  63 +++++++
 waku/factory/node_factory.nim           |   2 +-
 waku/node/peer_manager/peer_manager.nim |  14 +-
 waku/node/waku_node.nim                 |  28 ++-
 7 files changed, 401 insertions(+), 22 deletions(-)

diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim
index 3d22cd9c2..4d4225f9f 100644
--- a/tests/all_tests_waku.nim
+++ b/tests/all_tests_waku.nim
@@ -89,6 +89,7 @@ import
   ./test_waku_netconfig,
   ./test_waku_switch,
   ./test_waku_rendezvous,
+  ./test_waku_metadata,
   ./waku_discv5/test_waku_discv5

 # Waku Keystore test suite

diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim
index 97df39582..c96f21b6e 100644
--- a/tests/test_peer_manager.nim
+++ b/tests/test_peer_manager.nim
@@ -1207,3 +1207,233 @@ procSuite "Peer Manager":
     r = node1.peerManager.selectPeer(WakuPeerExchangeCodec)
     assert r.isSome(), "could not retrieve peer mounting WakuPeerExchangeCodec"
+
+  asyncTest "selectPeer() filters peers by shard using ENR":
+    ## Given: A peer manager with 3 peers having different shards in their ENRs
+    let
+      clusterId = 0.uint16
+      shardId0 = 0.uint16
+      shardId1 = 1.uint16
+
+    # Create 3 nodes with different shards
+    let nodes =
+      @[
+        newTestWakuNode(
+          generateSecp256k1Key(),
+          parseIpAddress("0.0.0.0"),
+          Port(0),
+          clusterId = clusterId,
+          subscribeShards = @[shardId0],
+        ),
+        newTestWakuNode(
+          generateSecp256k1Key(),
+          parseIpAddress("0.0.0.0"),
+          Port(0),
+          clusterId = clusterId,
+          subscribeShards = @[shardId1],
+        ),
+        newTestWakuNode(
+          generateSecp256k1Key(),
+          parseIpAddress("0.0.0.0"),
+          Port(0),
+          clusterId = clusterId,
+          subscribeShards = @[shardId0],
+        ),
+      ]
+
+    await allFutures(nodes.mapIt(it.start()))
+    for node in nodes:
+      discard await node.mountRelay()
+
+    # Get peer infos with ENRs
+    let peerInfos = collect:
+      for node in nodes:
+        var peerInfo = node.switch.peerInfo.toRemotePeerInfo()
+        peerInfo.enr = some(node.enr)
+        peerInfo
+
+    # Add all peers to node 0's peer manager and peerstore
+    for i in 1 .. 2:
+      nodes[0].peerManager.addPeer(peerInfos[i])
+      nodes[0].peerManager.switch.peerStore[AddressBook][peerInfos[i].peerId] =
+        peerInfos[i].addrs
+      nodes[0].peerManager.switch.peerStore[ProtoBook][peerInfos[i].peerId] =
+        @[WakuRelayCodec]
+
+    ## When: We select a peer for shard 0
+    let shard0Topic = some(PubsubTopic("/waku/2/rs/0/0"))
+    let selectedPeer0 = nodes[0].peerManager.selectPeer(WakuRelayCodec, shard0Topic)
+
+    ## Then: Only peers supporting shard 0 are considered (node 2, not node 1)
+    check:
+      selectedPeer0.isSome()
+      selectedPeer0.get().peerId != peerInfos[1].peerId # node1 has shard 1
+      selectedPeer0.get().peerId == peerInfos[2].peerId # node2 has shard 0
+
+    ## When: We select a peer for shard 1
+    let shard1Topic = some(PubsubTopic("/waku/2/rs/0/1"))
+    let selectedPeer1 = nodes[0].peerManager.selectPeer(WakuRelayCodec, shard1Topic)
+
+    ## Then: Only the peer with shard 1 is selected
+    check:
+      selectedPeer1.isSome()
+      selectedPeer1.get().peerId == peerInfos[1].peerId # node1 has shard 1
+
+    await allFutures(nodes.mapIt(it.stop()))
+
+  asyncTest "selectPeer() filters peers by shard using shards field":
+    ## Given: A peer manager with peers having shards in RemotePeerInfo (no ENR)
+    let
+      clusterId = 0.uint16
+      shardId0 = 0.uint16
+      shardId1 = 1.uint16
+
+    # Create peer manager
+    let pm = PeerManager.new(
+      switch = SwitchBuilder.new().withRng(rng()).withMplex().withNoise().build(),
+      storage = nil,
+    )
+
+    # Create peer infos with the shards field populated (simulating metadata exchange)
+    let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D"
+    let peers = toSeq(1 .. 3)
+      .mapIt(parsePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & $it))
+      .filterIt(it.isOk())
+      .mapIt(it.value)
+    require:
+      peers.len == 3
+
+    # Manually populate the shards field (ENR is not available)
+    var peerInfos: seq[RemotePeerInfo] = @[]
+    for i, peer in peers:
+      var peerInfo = RemotePeerInfo.init(peer.peerId, peer.addrs)
+      # Peers 0 and 2 have shard 0, peer 1 has shard 1
+      peerInfo.shards =
+        if i == 1:
+          @[shardId1]
+        else:
+          @[shardId0]
+      # Note: ENR is intentionally left as none
+      peerInfos.add(peerInfo)
+
+    # Add peers to peerstore
+    for peerInfo in peerInfos:
+      pm.switch.peerStore[AddressBook][peerInfo.peerId] = peerInfo.addrs
+      pm.switch.peerStore[ProtoBook][peerInfo.peerId] = @[WakuRelayCodec]
+      # Simulate metadata exchange by setting the shards field in the peerstore
+      pm.switch.peerStore.setShardInfo(peerInfo.peerId, peerInfo.shards)
+
+    ## When: We select a peer for shard 0
+    let shard0Topic = some(PubsubTopic("/waku/2/rs/0/0"))
+    let selectedPeer0 = pm.selectPeer(WakuRelayCodec, shard0Topic)
+
+    ## Then: Peers with shard 0 in the shards field are selected
+    check:
+      selectedPeer0.isSome()
+      selectedPeer0.get().peerId in [peerInfos[0].peerId, peerInfos[2].peerId]
+
+    ## When: We select a peer for shard 1
+    let shard1Topic = some(PubsubTopic("/waku/2/rs/0/1"))
+    let selectedPeer1 = pm.selectPeer(WakuRelayCodec, shard1Topic)
+
+    ## Then: The peer with shard 1 in the shards field is selected
+    check:
+      selectedPeer1.isSome()
+      selectedPeer1.get().peerId == peerInfos[1].peerId
+
+  asyncTest "selectPeer() handles invalid pubsub topic gracefully":
+    ## Given: A peer manager with valid peers
+    let node = newTestWakuNode(
+      generateSecp256k1Key(),
+      parseIpAddress("0.0.0.0"),
+      Port(0),
+      clusterId = 0,
+      subscribeShards = @[0'u16],
+    )
+    await node.start()
+
+    # Add a peer
+    let peer =
+      newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
+    await peer.start()
+    discard await peer.mountRelay()
+
+    var peerInfo = peer.switch.peerInfo.toRemotePeerInfo()
+    peerInfo.enr = some(peer.enr)
+    node.peerManager.addPeer(peerInfo)
+    node.peerManager.switch.peerStore[ProtoBook][peerInfo.peerId] = @[WakuRelayCodec]
+
+    ## When: selectPeer is called with a malformed pubsub topic
+    let invalidTopics =
+      @[
+        some(PubsubTopic("invalid-topic")),
+        some(PubsubTopic("/waku/2/invalid")),
+        some(PubsubTopic("/waku/2/rs/abc/0")), # non-numeric cluster
+        some(PubsubTopic("")), # empty topic
+      ]
+
+    ## Then: Returns none(RemotePeerInfo) without crashing
+    for invalidTopic in invalidTopics:
+      let selected = node.peerManager.selectPeer(WakuRelayCodec, invalidTopic)
+      check:
+        selected.isNone()
+
+    await allFutures(node.stop(), peer.stop())
+
+  asyncTest "selectPeer() prioritizes ENR over shards field":
+    ## Given: A peer with both ENR and shards field populated
+    let
+      clusterId = 0.uint16
+      shardId0 = 0.uint16
+      shardId1 = 1.uint16
+
+    let node = newTestWakuNode(
+      generateSecp256k1Key(),
+      parseIpAddress("0.0.0.0"),
+      Port(0),
+      clusterId = clusterId,
+      subscribeShards = @[shardId0],
+    )
+    await node.start()
+    discard await node.mountRelay()
+
+    # Create a peer whose ENR contains shard 0
+    let peer = newTestWakuNode(
+      generateSecp256k1Key(),
+      parseIpAddress("0.0.0.0"),
+      Port(0),
+      clusterId = clusterId,
+      subscribeShards = @[shardId0],
+    )
+    await peer.start()
+    discard await peer.mountRelay()
+
+    # Create peer info with an ENR (shard 0) but set the shards field to shard 1
+    var peerInfo = peer.switch.peerInfo.toRemotePeerInfo()
+    peerInfo.enr = some(peer.enr) # ENR has shard 0
+    peerInfo.shards = @[shardId1] # shards field has shard 1
+
+    node.peerManager.addPeer(peerInfo)
+    node.peerManager.switch.peerStore[ProtoBook][peerInfo.peerId] = @[WakuRelayCodec]
+    # Simulate metadata exchange by setting the shards field in the peerstore
+    node.peerManager.switch.peerStore.setShardInfo(peerInfo.peerId, peerInfo.shards)
+
+    ## When: We select for shard 0
+    let shard0Topic = some(PubsubTopic("/waku/2/rs/0/0"))
+    let selectedPeer = node.peerManager.selectPeer(WakuRelayCodec, shard0Topic)
+
+    ## Then: The peer is selected because the ENR (shard 0) takes precedence
+    check:
+      selectedPeer.isSome()
+      selectedPeer.get().peerId == peerInfo.peerId
+
+    ## When: We select for shard 1
+    let shard1Topic = some(PubsubTopic("/waku/2/rs/0/1"))
+    let selectedPeer1 = node.peerManager.selectPeer(WakuRelayCodec, shard1Topic)
+
+    ## Then: The peer is still selected because the shards field is checked as a fallback
+    check:
+      selectedPeer1.isSome()
+      selectedPeer1.get().peerId == peerInfo.peerId
+
+    await allFutures(node.stop(), peer.stop())

diff --git a/tests/test_waku_metadata.nim b/tests/test_waku_metadata.nim
index b30fd1712..cfceb89b5 100644
--- a/tests/test_waku_metadata.nim
+++ b/tests/test_waku_metadata.nim
@@ -13,14 +13,15 @@ import
   eth/keys, eth/p2p/discoveryv5/enr

 import
-  waku/
-    [
-      waku_node,
-      waku_core/topics,
-      node/peer_manager,
-      discovery/waku_discv5,
-      waku_metadata,
-    ],
+  waku/[
+    waku_node,
+    waku_core/topics,
+    waku_core,
+    node/peer_manager,
+    discovery/waku_discv5,
+    waku_metadata,
+    waku_relay/protocol,
+  ],
   ./testlib/wakucore,
   ./testlib/wakunode

@@ -41,26 +42,86 @@ procSuite "Waku Metadata Protocol":
         clusterId = clusterId,
       )

+    # Mount metadata protocol on both nodes before starting
+    discard node1.mountMetadata(clusterId, @[])
+    discard node2.mountMetadata(clusterId, @[])
+
+    # Mount relay so metadata can track subscriptions
+    discard await node1.mountRelay()
+    discard await node2.mountRelay()
+
     # Start nodes
     await allFutures([node1.start(), node2.start()])

-    node1.topicSubscriptionQueue.emit((kind: PubsubSub, topic: "/waku/2/rs/10/7"))
-    node1.topicSubscriptionQueue.emit((kind: PubsubSub, topic: "/waku/2/rs/10/6"))
+    # Subscribe to topics on node1 - relay will track these and metadata will report them
+    let noOpHandler: WakuRelayHandler = proc(
+        pubsubTopic: PubsubTopic, message: WakuMessage
+    ): Future[void] {.async.} =
+      discard
+
+    node1.wakuRelay.subscribe("/waku/2/rs/10/7", noOpHandler)
+    node1.wakuRelay.subscribe("/waku/2/rs/10/6", noOpHandler)

     # Create connection
     let connOpt = await node2.peerManager.dialPeer(
       node1.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec
     )
     require:
-      connOpt.isSome
+      connOpt.isSome()

     # Request metadata
     let response1 = await node2.wakuMetadata.request(connOpt.get())

     # Check the response or dont even continue
     require:
-      response1.isOk
+      response1.isOk()

     check:
       response1.get().clusterId.get() == clusterId
       response1.get().shards == @[uint32(6), uint32(7)]
+
+    await allFutures([node1.stop(), node2.stop()])
+
+  asyncTest "Metadata reports configured shards before relay subscription":
+    ## Given: A node with configured shards but no relay subscriptions yet
+    let
+      clusterId = 10.uint16
+      configuredShards = @[uint16(0), uint16(1)]
+
+    let node1 = newTestWakuNode(
+      generateSecp256k1Key(),
+      parseIpAddress("0.0.0.0"),
+      Port(0),
+      clusterId = clusterId,
+      subscribeShards = configuredShards,
+    )
+    let node2 = newTestWakuNode(
+      generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0), clusterId = clusterId
+    )
+
+    # Mount metadata with configured shards on node1
+    discard node1.mountMetadata(clusterId, configuredShards)
+    # Mount metadata on node2 so it can make requests
+    discard node2.mountMetadata(clusterId, @[])
+
+    # Start nodes (relay is NOT mounted yet on node1)
+    await allFutures([node1.start(), node2.start()])
+
+    ## When: Node2 requests metadata from Node1 before relay is active
+    let connOpt = await node2.peerManager.dialPeer(
+      node1.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec
+    )
+    require:
+      connOpt.isSome()
+
+    let response = await node2.wakuMetadata.request(connOpt.get())
+
+    ## Then: The response contains the configured shards even without relay subscriptions
+    require:
+      response.isOk()
+
+    check:
+      response.get().clusterId.get() == clusterId
+      response.get().shards == @[uint32(0), uint32(1)]
+
+    await allFutures([node1.stop(), node2.stop()])

diff --git a/tests/test_waku_rendezvous.nim b/tests/test_waku_rendezvous.nim
index d3dd6f920..07113ca4a 100644
--- a/tests/test_waku_rendezvous.nim
+++ b/tests/test_waku_rendezvous.nim
@@ -10,6 +10,7 @@ import
 import
   waku/waku_core/peers,
   waku/waku_core/codecs,
+  waku/waku_core,
   waku/node/waku_node,
   waku/node/peer_manager/peer_manager,
   waku/waku_rendezvous/protocol,
@@ -81,3 +82,65 @@ procSuite "Waku Rendezvous":
       records.len == 1
       records[0].peerId == peerInfo1.peerId
       #records[0].mixPubKey == $node1.wakuMix.pubKey
+
+  asyncTest "Rendezvous advertises configured shards before relay is active":
+    ## Given: A node with configured shards but no relay subscriptions yet
+    let
+      clusterId = 10.uint16
+      configuredShards = @[RelayShard(clusterId: clusterId, shardId: 0)]
+
+    let node = newTestWakuNode(
+      generateSecp256k1Key(),
+      parseIpAddress("0.0.0.0"),
+      Port(0),
+      clusterId = clusterId,
+      subscribeShards = @[0'u16],
+    )
+
+    ## When: Node mounts rendezvous with configured shards (before relay)
+    await node.mountRendezvous(clusterId, configuredShards)
+    await node.start()
+
+    ## Then: The rendezvous protocol should be mounted successfully
+    check:
+      node.wakuRendezvous != nil
+
+    # Verify that the protocol is running without errors
+    # (shards are used internally by the getShardsGetter closure)
+    let namespace = computeMixNamespace(clusterId)
+    check:
+      namespace.len > 0
+
+    await node.stop()
+
+  asyncTest "Rendezvous uses configured shards when relay not mounted":
+    ## Given: A light client node with no relay protocol
+    let
+      clusterId = 10.uint16
+      configuredShards =
+        @[
+          RelayShard(clusterId: clusterId, shardId: 0),
+          RelayShard(clusterId: clusterId, shardId: 1),
+        ]
+
+    let lightClient = newTestWakuNode(
+      generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0), clusterId = clusterId
+    )
+
+    ## When: Node mounts rendezvous with configured shards (no relay mounted)
+    await lightClient.mountRendezvous(clusterId, configuredShards)
+    await lightClient.start()
+
+    ## Then: Rendezvous should be mounted successfully without relay
+    check:
+      lightClient.wakuRendezvous != nil
+      lightClient.wakuRelay == nil # Verify relay is not mounted
+
+    # Verify the protocol is working (doesn't fail immediately)
+    # advertiseAll requires peers, so we just check that the protocol is initialized
+    await sleepAsync(100.milliseconds)
+
+    check:
+      lightClient.wakuRendezvous != nil
+
+    await lightClient.stop()

diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim
index 2cdfdb0d2..dc383e89d 100644
--- a/waku/factory/node_factory.nim
+++ b/waku/factory/node_factory.nim
@@ -337,7 +337,7 @@ proc setupProtocols(
       node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId)

   if conf.rendezvous:
-    await node.mountRendezvous(conf.clusterId)
+    await node.mountRendezvous(conf.clusterId, shards)
     await node.mountRendezvousClient(conf.clusterId)

   # Keepalive mounted on all nodes

diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim
index 834fb19cf..0c435468f 100644
--- a/waku/node/peer_manager/peer_manager.nim
+++ b/waku/node/peer_manager/peer_manager.nim
@@ -227,7 +227,19 @@ proc selectPeer*(
       protocol = proto, peers, address = cast[uint](pm.switch.peerStore)

   if shard.isSome():
-    peers.keepItIf((it.enr.isSome() and it.enr.get().containsShard(shard.get())))
+    # Parse the shard from the pubsub topic to get cluster and shard ID
+    let shardInfo = RelayShard.parse(shard.get()).valueOr:
+      trace "Failed to parse shard from pubsub topic", topic = shard.get()
+      return none(RemotePeerInfo)
+
+    # Filter peers that support the requested shard
+    # Check both the ENR (if present) and the shards field on RemotePeerInfo
+    peers.keepItIf(
+      # Check the ENR if available
+      (it.enr.isSome() and it.enr.get().containsShard(shard.get())) or
+        # Otherwise check the shards field directly
+        (it.shards.len > 0 and it.shards.contains(shardInfo.shardId))
+    )

   shuffle(peers)

diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim
index cb3d81c7c..53ce0349a 100644
--- a/waku/node/waku_node.nim
+++ b/waku/node/waku_node.nim
@@ -167,20 +167,28 @@ proc deduceRelayShard(
     return err("Invalid topic:" & pubsubTopic & " " & $error)
   return ok(shard)

-proc getShardsGetter(node: WakuNode): GetShards =
+proc getShardsGetter(node: WakuNode, configuredShards: seq[uint16]): GetShards =
   return proc(): seq[uint16] {.closure, gcsafe, raises: [].} =
     # fetch pubsubTopics subscribed to relay and convert them to shards
     if node.wakuRelay.isNil():
-      return @[]
+      # If relay is not mounted, return configured shards
+      return configuredShards
+
     let subscribedTopics = node.wakuRelay.subscribedTopics()
+
+    # If relay hasn't subscribed to any topics yet, return configured shards
+    if subscribedTopics.len == 0:
+      return configuredShards
+
     let relayShards = topicsToRelayShards(subscribedTopics).valueOr:
       error "could not convert relay topics to shards",
         error = $error, topics = subscribedTopics
-      return @[]
+      # Fall back to configured shards on error
+      return configuredShards
     if relayShards.isSome():
       let shards = relayShards.get().shardIds
       return shards
-    return @[]
+    return configuredShards

 proc getCapabilitiesGetter(node: WakuNode): GetCapabilities =
   return proc(): seq[Capabilities] {.closure, gcsafe, raises: [].} =
@@ -227,7 +235,7 @@ proc new*(
     rateLimitSettings: rateLimitSettings,
   )

-  peerManager.setShardGetter(node.getShardsGetter())
+  peerManager.setShardGetter(node.getShardsGetter(@[]))

   return node

@@ -272,7 +280,7 @@ proc mountMetadata*(
   if not node.wakuMetadata.isNil():
     return err("Waku metadata already mounted, skipping")

-  let metadata = WakuMetadata.new(clusterId, node.getShardsGetter())
+  let metadata = WakuMetadata.new(clusterId, node.getShardsGetter(shards))

   node.wakuMetadata = metadata
   node.peerManager.wakuMetadata = metadata

@@ -413,14 +421,18 @@ proc mountRendezvousClient*(node: WakuNode, clusterId: uint16) {.async: (raises:
   if node.started:
     await node.wakuRendezvousClient.start()

-proc mountRendezvous*(node: WakuNode, clusterId: uint16) {.async: (raises: []).} =
+proc mountRendezvous*(
+    node: WakuNode, clusterId: uint16, shards: seq[RelayShard] = @[]
+) {.async: (raises: []).} =
   info "mounting rendezvous discovery protocol"

+  let configuredShards = shards.mapIt(it.shardId)
+
   node.wakuRendezvous = WakuRendezVous.new(
     node.switch,
     node.peerManager,
     clusterId,
-    node.getShardsGetter(),
+    node.getShardsGetter(configuredShards),
     node.getCapabilitiesGetter(),
     node.getWakuPeerRecordGetter(),
   ).valueOr:
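
Reviewer note (illustration only, not part of the patch): the new filter in selectPeer() amounts to a two-tier shard check, the peer's ENR when present, with the metadata-learned shards field as a fallback. The self-contained Nim sketch below models that logic under simplified assumptions; Peer, enrShards, and metaShards are hypothetical stand-ins for nwaku's RemotePeerInfo/ENR machinery, and shard membership is reduced to plain shard IDs rather than pubsub-topic parsing.

import std/[options, sequtils, random]

type Peer = object
  id: string
  enrShards: Option[seq[uint16]] # shards advertised via ENR, if any (hypothetical field)
  metaShards: seq[uint16] # shards learned via metadata exchange (hypothetical field)

proc supportsShard(p: Peer, shardId: uint16): bool =
  # Mirrors the patched keepItIf predicate: check the ENR when present,
  # otherwise fall back to the metadata-derived shards field.
  if p.enrShards.isSome() and shardId in p.enrShards.get():
    return true
  p.metaShards.len > 0 and shardId in p.metaShards

proc selectPeer(peers: seq[Peer], shardId: uint16): Option[Peer] =
  # Keep only peers that support the shard, then pick one at random,
  # analogous to keepItIf + shuffle in the real implementation.
  let candidates = peers.filterIt(it.supportsShard(shardId))
  if candidates.len == 0:
    return none(Peer)
  some(candidates[rand(candidates.high)])

when isMainModule:
  let peers =
    @[
      Peer(id: "a", enrShards: some(@[0'u16]), metaShards: @[]),
      Peer(id: "b", enrShards: none(seq[uint16]), metaShards: @[1'u16]),
    ]
  doAssert selectPeer(peers, 0).get().id == "a" # matched via ENR
  doAssert selectPeer(peers, 1).get().id == "b" # matched via metadata fallback

The OR between the two checks is what keeps a peer selectable during the window where metadata exchange has completed but its updated ENR has not yet been advertised, which is exactly the gap the commit message describes.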