fix: peer selection by shard and rendezvous/metadata sharding initialization (#3718)

* Fix peer selection for cases where the ENR is not yet advertised but the metadata exchange has already adjusted the supported shards, as sketched below. Fix rendezvous protocol initialization with configured shards and autosharding so that nodes can connect to relay nodes without already having a valid subscribed shard. This lets autoshard nodes connect ahead of subscribing.
* Extend peer selection, rendezvous and metadata tests
* Fix rendezvous test, fix metadata test failing due to wrong setup, and add it to all_tests
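Below is a minimal sketch of the selection fallback described above, using hypothetical simplified types (SimplePeer, supportsShard) rather than the actual nwaku RemotePeerInfo/selectPeer API: a peer qualifies for a shard if its ENR advertises that shard, or if the shards field populated by the metadata exchange contains it.

import std/sequtils

type
  SimplePeer = object
    hasEnr: bool            # whether the peer has advertised an ENR
    enrShards: seq[uint16]  # shards listed in the ENR, if any
    shards: seq[uint16]     # shards learned via the metadata exchange

proc supportsShard(p: SimplePeer, shardId: uint16): bool =
  # A peer qualifies if its ENR advertises the shard, or if the shards
  # field filled in by the metadata exchange contains it.
  (p.hasEnr and shardId in p.enrShards) or
    (p.shards.len > 0 and shardId in p.shards)

when isMainModule:
  let peers = @[
    SimplePeer(hasEnr: true, enrShards: @[0'u16]),  # shard known only from the ENR
    SimplePeer(hasEnr: false, shards: @[1'u16]),    # shard known only via metadata
  ]
  # Before this change only ENR-advertised shards were considered; with the
  # fallback both peers are now selectable for their respective shards.
  echo peers.filterIt(it.supportsShard(0'u16)).len  # -> 1
  echo peers.filterIt(it.supportsShard(1'u16)).len  # -> 1

The same configured-shards fallback applies to metadata and rendezvous initialization: when relay is not mounted or has no subscriptions yet, the shards getter returns the configured shards instead of an empty list.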
NagyZoltanPeter 2026-02-13 11:23:21 +01:00 committed by GitHub
parent 1fb4d1eab0
commit 84f791100f
7 changed files with 401 additions and 22 deletions

@@ -89,6 +89,7 @@ import
./test_waku_netconfig,
./test_waku_switch,
./test_waku_rendezvous,
./test_waku_metadata,
./waku_discv5/test_waku_discv5
# Waku Keystore test suite

@@ -1207,3 +1207,233 @@ procSuite "Peer Manager":
r = node1.peerManager.selectPeer(WakuPeerExchangeCodec)
assert r.isSome(), "could not retrieve peer mounting WakuPeerExchangeCodec"
asyncTest "selectPeer() filters peers by shard using ENR":
## Given: A peer manager with 3 peers having different shards in their ENRs
let
clusterId = 0.uint16
shardId0 = 0.uint16
shardId1 = 1.uint16
# Create 3 nodes with different shards
let nodes =
@[
newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = clusterId,
subscribeShards = @[shardId0],
),
newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = clusterId,
subscribeShards = @[shardId1],
),
newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = clusterId,
subscribeShards = @[shardId0],
),
]
await allFutures(nodes.mapIt(it.start()))
for node in nodes:
discard await node.mountRelay()
# Get peer infos with ENRs
let peerInfos = collect:
for node in nodes:
var peerInfo = node.switch.peerInfo.toRemotePeerInfo()
peerInfo.enr = some(node.enr)
peerInfo
# Add all peers to node 0's peer manager and peerstore
for i in 1 .. 2:
nodes[0].peerManager.addPeer(peerInfos[i])
nodes[0].peerManager.switch.peerStore[AddressBook][peerInfos[i].peerId] =
peerInfos[i].addrs
nodes[0].peerManager.switch.peerStore[ProtoBook][peerInfos[i].peerId] =
@[WakuRelayCodec]
## When: We select a peer for shard 0
let shard0Topic = some(PubsubTopic("/waku/2/rs/0/0"))
let selectedPeer0 = nodes[0].peerManager.selectPeer(WakuRelayCodec, shard0Topic)
## Then: Only peers supporting shard 0 are considered (node 2, not node 1)
check:
selectedPeer0.isSome()
selectedPeer0.get().peerId != peerInfos[1].peerId # node1 has shard 1
selectedPeer0.get().peerId == peerInfos[2].peerId # node2 has shard 0
## When: We select a peer for shard 1
let shard1Topic = some(PubsubTopic("/waku/2/rs/0/1"))
let selectedPeer1 = nodes[0].peerManager.selectPeer(WakuRelayCodec, shard1Topic)
## Then: Only peer with shard 1 is selected
check:
selectedPeer1.isSome()
selectedPeer1.get().peerId == peerInfos[1].peerId # node1 has shard 1
await allFutures(nodes.mapIt(it.stop()))
asyncTest "selectPeer() filters peers by shard using shards field":
## Given: A peer manager with peers having shards in RemotePeerInfo (no ENR)
let
clusterId = 0.uint16
shardId0 = 0.uint16
shardId1 = 1.uint16
# Create peer manager
let pm = PeerManager.new(
switch = SwitchBuilder.new().withRng(rng()).withMplex().withNoise().build(),
storage = nil,
)
# Create peer infos with shards field populated (simulating metadata exchange)
let basePeerId = "16Uiu2HAm7QGEZKujdSbbo1aaQyfDPQ6Bw3ybQnj6fruH5Dxwd7D"
let peers = toSeq(1 .. 3)
.mapIt(parsePeerInfo("/ip4/0.0.0.0/tcp/30300/p2p/" & basePeerId & $it))
.filterIt(it.isOk())
.mapIt(it.value)
require:
peers.len == 3
# Manually populate the shards field (ENR is not available)
var peerInfos: seq[RemotePeerInfo] = @[]
for i, peer in peers:
var peerInfo = RemotePeerInfo.init(peer.peerId, peer.addrs)
# Peer 0 and 2 have shard 0, peer 1 has shard 1
peerInfo.shards =
if i == 1:
@[shardId1]
else:
@[shardId0]
# Note: ENR is intentionally left as none
peerInfos.add(peerInfo)
# Add peers to peerstore
for peerInfo in peerInfos:
pm.switch.peerStore[AddressBook][peerInfo.peerId] = peerInfo.addrs
pm.switch.peerStore[ProtoBook][peerInfo.peerId] = @[WakuRelayCodec]
# simulate metadata exchange by setting shards field in peerstore
pm.switch.peerStore.setShardInfo(peerInfo.peerId, peerInfo.shards)
## When: We select a peer for shard 0
let shard0Topic = some(PubsubTopic("/waku/2/rs/0/0"))
let selectedPeer0 = pm.selectPeer(WakuRelayCodec, shard0Topic)
## Then: Peers with shard 0 in shards field are selected
check:
selectedPeer0.isSome()
selectedPeer0.get().peerId in [peerInfos[0].peerId, peerInfos[2].peerId]
## When: We select a peer for shard 1
let shard1Topic = some(PubsubTopic("/waku/2/rs/0/1"))
let selectedPeer1 = pm.selectPeer(WakuRelayCodec, shard1Topic)
## Then: Peer with shard 1 in shards field is selected
check:
selectedPeer1.isSome()
selectedPeer1.get().peerId == peerInfos[1].peerId
asyncTest "selectPeer() handles invalid pubsub topic gracefully":
## Given: A peer manager with valid peers
let node = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = 0,
subscribeShards = @[0'u16],
)
await node.start()
# Add a peer
let peer =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
await peer.start()
discard await peer.mountRelay()
var peerInfo = peer.switch.peerInfo.toRemotePeerInfo()
peerInfo.enr = some(peer.enr)
node.peerManager.addPeer(peerInfo)
node.peerManager.switch.peerStore[ProtoBook][peerInfo.peerId] = @[WakuRelayCodec]
## When: selectPeer is called with malformed pubsub topic
let invalidTopics =
@[
some(PubsubTopic("invalid-topic")),
some(PubsubTopic("/waku/2/invalid")),
some(PubsubTopic("/waku/2/rs/abc/0")), # non-numeric cluster
some(PubsubTopic("")), # empty topic
]
## Then: Returns none(RemotePeerInfo) without crashing
for invalidTopic in invalidTopics:
let result = node.peerManager.selectPeer(WakuRelayCodec, invalidTopic)
check:
result.isNone()
await allFutures(node.stop(), peer.stop())
asyncTest "selectPeer() prioritizes ENR over shards field":
## Given: A peer with both ENR and shards field populated
let
clusterId = 0.uint16
shardId0 = 0.uint16
shardId1 = 1.uint16
let node = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = clusterId,
subscribeShards = @[shardId0],
)
await node.start()
discard await node.mountRelay()
# Create peer with ENR containing shard 0
let peer = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = clusterId,
subscribeShards = @[shardId0],
)
await peer.start()
discard await peer.mountRelay()
# Create peer info with ENR (shard 0) but set shards field to shard 1
var peerInfo = peer.switch.peerInfo.toRemotePeerInfo()
peerInfo.enr = some(peer.enr) # ENR has shard 0
peerInfo.shards = @[shardId1] # shards field has shard 1
node.peerManager.addPeer(peerInfo)
node.peerManager.switch.peerStore[ProtoBook][peerInfo.peerId] = @[WakuRelayCodec]
# simulate metadata exchange by setting shards field in peerstore
node.peerManager.switch.peerStore.setShardInfo(peerInfo.peerId, peerInfo.shards)
## When: We select for shard 0
let shard0Topic = some(PubsubTopic("/waku/2/rs/0/0"))
let selectedPeer = node.peerManager.selectPeer(WakuRelayCodec, shard0Topic)
## Then: Peer is selected because ENR (shard 0) takes precedence
check:
selectedPeer.isSome()
selectedPeer.get().peerId == peerInfo.peerId
## When: We select for shard 1
let shard1Topic = some(PubsubTopic("/waku/2/rs/0/1"))
let selectedPeer1 = node.peerManager.selectPeer(WakuRelayCodec, shard1Topic)
## Then: Peer is still selected because shards field is checked as fallback
check:
selectedPeer1.isSome()
selectedPeer1.get().peerId == peerInfo.peerId
await allFutures(node.stop(), peer.stop())

@@ -13,14 +13,15 @@ import
eth/keys,
eth/p2p/discoveryv5/enr
import
waku/
[
waku_node,
waku_core/topics,
node/peer_manager,
discovery/waku_discv5,
waku_metadata,
],
waku/[
waku_node,
waku_core/topics,
waku_core,
node/peer_manager,
discovery/waku_discv5,
waku_metadata,
waku_relay/protocol,
],
./testlib/wakucore,
./testlib/wakunode
@@ -41,26 +42,86 @@ procSuite "Waku Metadata Protocol":
clusterId = clusterId,
)
# Mount metadata protocol on both nodes before starting
discard node1.mountMetadata(clusterId, @[])
discard node2.mountMetadata(clusterId, @[])
# Mount relay so metadata can track subscriptions
discard await node1.mountRelay()
discard await node2.mountRelay()
# Start nodes
await allFutures([node1.start(), node2.start()])
node1.topicSubscriptionQueue.emit((kind: PubsubSub, topic: "/waku/2/rs/10/7"))
node1.topicSubscriptionQueue.emit((kind: PubsubSub, topic: "/waku/2/rs/10/6"))
# Subscribe to topics on node1 - relay will track these and metadata will report them
let noOpHandler: WakuRelayHandler = proc(
pubsubTopic: PubsubTopic, message: WakuMessage
): Future[void] {.async.} =
discard
node1.wakuRelay.subscribe("/waku/2/rs/10/7", noOpHandler)
node1.wakuRelay.subscribe("/waku/2/rs/10/6", noOpHandler)
# Create connection
let connOpt = await node2.peerManager.dialPeer(
node1.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec
)
require:
connOpt.isSome
connOpt.isSome()
# Request metadata
let response1 = await node2.wakuMetadata.request(connOpt.get())
# Check the response or don't even continue
require:
response1.isOk
response1.isOk()
check:
response1.get().clusterId.get() == clusterId
response1.get().shards == @[uint32(6), uint32(7)]
await allFutures([node1.stop(), node2.stop()])
asyncTest "Metadata reports configured shards before relay subscription":
## Given: Node with configured shards but no relay subscriptions yet
let
clusterId = 10.uint16
configuredShards = @[uint16(0), uint16(1)]
let node1 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = clusterId,
subscribeShards = configuredShards,
)
let node2 = newTestWakuNode(
generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0), clusterId = clusterId
)
# Mount metadata with configured shards on node1
discard node1.mountMetadata(clusterId, configuredShards)
# Mount metadata on node2 so it can make requests
discard node2.mountMetadata(clusterId, @[])
# Start nodes (relay is NOT mounted yet on node1)
await allFutures([node1.start(), node2.start()])
## When: Node2 requests metadata from Node1 before relay is active
let connOpt = await node2.peerManager.dialPeer(
node1.switch.peerInfo.toRemotePeerInfo(), WakuMetadataCodec
)
require:
connOpt.isSome
let response = await node2.wakuMetadata.request(connOpt.get())
## Then: Response contains configured shards even without relay subscriptions
require:
response.isOk()
check:
response.get().clusterId.get() == clusterId
response.get().shards == @[uint32(0), uint32(1)]
await allFutures([node1.stop(), node2.stop()])

@@ -10,6 +10,7 @@ import
import
waku/waku_core/peers,
waku/waku_core/codecs,
waku/waku_core,
waku/node/waku_node,
waku/node/peer_manager/peer_manager,
waku/waku_rendezvous/protocol,
@@ -81,3 +82,65 @@ procSuite "Waku Rendezvous":
records.len == 1
records[0].peerId == peerInfo1.peerId
#records[0].mixPubKey == $node1.wakuMix.pubKey
asyncTest "Rendezvous advertises configured shards before relay is active":
## Given: A node with configured shards but no relay subscriptions yet
let
clusterId = 10.uint16
configuredShards = @[RelayShard(clusterId: clusterId, shardId: 0)]
let node = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = clusterId,
subscribeShards = @[0'u16],
)
## When: Node mounts rendezvous with configured shards (before relay)
await node.mountRendezvous(clusterId, configuredShards)
await node.start()
## Then: The rendezvous protocol should be mounted successfully
check:
node.wakuRendezvous != nil
# Verify that the protocol is running without errors
# (shards are used internally by the getShardsGetter closure)
let namespace = computeMixNamespace(clusterId)
check:
namespace.len > 0
await node.stop()
asyncTest "Rendezvous uses configured shards when relay not mounted":
## Given: A light client node with no relay protocol
let
clusterId = 10.uint16
configuredShards =
@[
RelayShard(clusterId: clusterId, shardId: 0),
RelayShard(clusterId: clusterId, shardId: 1),
]
let lightClient = newTestWakuNode(
generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0), clusterId = clusterId
)
## When: Node mounts rendezvous with configured shards (no relay mounted)
await lightClient.mountRendezvous(clusterId, configuredShards)
await lightClient.start()
## Then: Rendezvous should be mounted successfully without relay
check:
lightClient.wakuRendezvous != nil
lightClient.wakuRelay == nil # Verify relay is not mounted
# Verify the protocol is working (doesn't fail immediately)
# advertiseAll requires peers, so we just check the protocol is initialized
await sleepAsync(100.milliseconds)
check:
lightClient.wakuRendezvous != nil
await lightClient.stop()

@@ -337,7 +337,7 @@ proc setupProtocols(
node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId)
if conf.rendezvous:
await node.mountRendezvous(conf.clusterId)
await node.mountRendezvous(conf.clusterId, shards)
await node.mountRendezvousClient(conf.clusterId)
# Keepalive mounted on all nodes

@@ -227,7 +227,19 @@ proc selectPeer*(
protocol = proto, peers, address = cast[uint](pm.switch.peerStore)
if shard.isSome():
peers.keepItIf((it.enr.isSome() and it.enr.get().containsShard(shard.get())))
# Parse the shard from the pubsub topic to get cluster and shard ID
let shardInfo = RelayShard.parse(shard.get()).valueOr:
trace "Failed to parse shard from pubsub topic", topic = shard.get()
return none(RemotePeerInfo)
# Filter peers that support the requested shard
# Check both ENR (if present) and the shards field on RemotePeerInfo
peers.keepItIf(
# Check ENR if available
(it.enr.isSome() and it.enr.get().containsShard(shard.get())) or
# Otherwise check the shards field directly
(it.shards.len > 0 and it.shards.contains(shardInfo.shardId))
)
shuffle(peers)

@@ -167,20 +167,28 @@ proc deduceRelayShard(
return err("Invalid topic:" & pubsubTopic & " " & $error)
return ok(shard)
proc getShardsGetter(node: WakuNode): GetShards =
proc getShardsGetter(node: WakuNode, configuredShards: seq[uint16]): GetShards =
return proc(): seq[uint16] {.closure, gcsafe, raises: [].} =
# fetch pubsubTopics subscribed to relay and convert them to shards
if node.wakuRelay.isNil():
return @[]
# If relay is not mounted, return configured shards
return configuredShards
let subscribedTopics = node.wakuRelay.subscribedTopics()
# If relay hasn't subscribed to any topics yet, return configured shards
if subscribedTopics.len == 0:
return configuredShards
let relayShards = topicsToRelayShards(subscribedTopics).valueOr:
error "could not convert relay topics to shards",
error = $error, topics = subscribedTopics
return @[]
# Fall back to configured shards on error
return configuredShards
if relayShards.isSome():
let shards = relayShards.get().shardIds
return shards
return @[]
return configuredShards
proc getCapabilitiesGetter(node: WakuNode): GetCapabilities =
return proc(): seq[Capabilities] {.closure, gcsafe, raises: [].} =
@@ -227,7 +235,7 @@ proc new*(
rateLimitSettings: rateLimitSettings,
)
peerManager.setShardGetter(node.getShardsGetter())
peerManager.setShardGetter(node.getShardsGetter(@[]))
return node
@@ -272,7 +280,7 @@ proc mountMetadata*(
if not node.wakuMetadata.isNil():
return err("Waku metadata already mounted, skipping")
let metadata = WakuMetadata.new(clusterId, node.getShardsGetter())
let metadata = WakuMetadata.new(clusterId, node.getShardsGetter(shards))
node.wakuMetadata = metadata
node.peerManager.wakuMetadata = metadata
@@ -413,14 +421,18 @@ proc mountRendezvousClient*(node: WakuNode, clusterId: uint16) {.async: (raises: []).} =
if node.started:
await node.wakuRendezvousClient.start()
proc mountRendezvous*(node: WakuNode, clusterId: uint16) {.async: (raises: []).} =
proc mountRendezvous*(
node: WakuNode, clusterId: uint16, shards: seq[RelayShard] = @[]
) {.async: (raises: []).} =
info "mounting rendezvous discovery protocol"
let configuredShards = shards.mapIt(it.shardId)
node.wakuRendezvous = WakuRendezVous.new(
node.switch,
node.peerManager,
clusterId,
node.getShardsGetter(),
node.getShardsGetter(configuredShards),
node.getCapabilitiesGetter(),
node.getWakuPeerRecordGetter(),
).valueOr: