From e54851d9d63fd9e7e5e70844364a4464c4dba8f4 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Thu, 20 Nov 2025 13:12:16 +0100 Subject: [PATCH] fix: admin API peer shards field from metadata protocol (#3594) * fix: admin API peer shards field from metadata protocol Store and return peer shard info from metadata protocol exchange instead of only checking ENR records. * peer_manager set shard info and extend rest test to validate it Co-authored-by: MorganaFuture --- tests/wakunode_rest/test_rest_admin.nim | 14 +++++++++++++- waku/node/peer_manager/peer_manager.nim | 5 +++++ waku/node/peer_manager/waku_peer_store.nim | 8 ++++++++ waku/waku_core/peers.nim | 12 +++++++++++- 4 files changed, 37 insertions(+), 2 deletions(-) diff --git a/tests/wakunode_rest/test_rest_admin.nim b/tests/wakunode_rest/test_rest_admin.nim index 6de886f74..ef82b8dfc 100644 --- a/tests/wakunode_rest/test_rest_admin.nim +++ b/tests/wakunode_rest/test_rest_admin.nim @@ -65,7 +65,7 @@ suite "Waku v2 Rest API - Admin": ): Future[void] {.async, gcsafe.} = await sleepAsync(0.milliseconds) - let shard = RelayShard(clusterId: clusterId, shardId: 0) + let shard = RelayShard(clusterId: clusterId, shardId: 5) node1.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr: assert false, "Failed to subscribe to topic: " & $error node2.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr: @@ -212,6 +212,18 @@ suite "Waku v2 Rest API - Admin": let conn2 = await node1.peerManager.connectPeer(peerInfo2) let conn3 = await node1.peerManager.connectPeer(peerInfo3) + var count = 0 + while count < 20: + ## Wait ~1s at most for the peer store to update shard info + let getRes = await client.getPeers() + if getRes.data.allIt(it.shards == @[5.uint16]): + break + + count.inc() + await sleepAsync(50.milliseconds) + + assert count < 20, "Timeout waiting for shards to be updated in peer store" + # Check successful connections check: conn2 == true diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 72b526aca..1abcc1ac0 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -658,6 +658,11 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = $clusterId break guardClauses + # Store the shard information from metadata in the peer store + if pm.switch.peerStore.peerExists(peerId): + let shards = metadata.shards.mapIt(it.uint16) + pm.switch.peerStore.setShardInfo(peerId, shards) + return info "disconnecting from peer", peerId = peerId, reason = reason diff --git a/waku/node/peer_manager/waku_peer_store.nim b/waku/node/peer_manager/waku_peer_store.nim index 0098c1687..c9e2d4817 100644 --- a/waku/node/peer_manager/waku_peer_store.nim +++ b/waku/node/peer_manager/waku_peer_store.nim @@ -39,6 +39,9 @@ type # Keeps track of the ENR (Ethereum Node Record) of a peer ENRBook* = ref object of PeerBook[enr.Record] + # Keeps track of peer shards + ShardBook* = ref object of PeerBook[seq[uint16]] + proc getPeer*(peerStore: PeerStore, peerId: PeerId): RemotePeerInfo = let addresses = if peerStore[LastSeenBook][peerId].isSome(): @@ -55,6 +58,7 @@ proc getPeer*(peerStore: PeerStore, peerId: PeerId): RemotePeerInfo = else: none(enr.Record), protocols: peerStore[ProtoBook][peerId], + shards: peerStore[ShardBook][peerId], agent: peerStore[AgentBook][peerId], protoVersion: peerStore[ProtoVersionBook][peerId], publicKey: peerStore[KeyBook][peerId], @@ -76,6 +80,7 @@ proc peers*(peerStore: PeerStore): seq[RemotePeerInfo] = toSeq(peerStore[AddressBook].book.keys()), toSeq(peerStore[ProtoBook].book.keys()), toSeq(peerStore[KeyBook].book.keys()), + toSeq(peerStore[ShardBook].book.keys()), ) .toHashSet() @@ -127,6 +132,9 @@ proc addPeer*(peerStore: PeerStore, peer: RemotePeerInfo, origin = UnknownOrigin if peer.enr.isSome(): peerStore[ENRBook][peer.peerId] = peer.enr.get() +proc setShardInfo*(peerStore: PeerStore, peerId: PeerID, shards: seq[uint16]) = + peerStore[ShardBook][peerId] = shards + proc peers*(peerStore: PeerStore, proto: string): seq[RemotePeerInfo] = peerStore.peers().filterIt(it.protocols.contains(proto)) diff --git a/waku/waku_core/peers.nim b/waku/waku_core/peers.nim index 5591699c6..76ff29aa0 100644 --- a/waku/waku_core/peers.nim +++ b/waku/waku_core/peers.nim @@ -48,6 +48,7 @@ type RemotePeerInfo* = ref object addrs*: seq[MultiAddress] enr*: Option[enr.Record] protocols*: seq[string] + shards*: seq[uint16] agent*: string protoVersion*: string @@ -73,6 +74,7 @@ proc init*( addrs: seq[MultiAddress] = @[], enr: Option[enr.Record] = none(enr.Record), protocols: seq[string] = @[], + shards: seq[uint16] = @[], publicKey: crypto.PublicKey = crypto.PublicKey(), agent: string = "", protoVersion: string = "", @@ -88,6 +90,7 @@ proc init*( addrs: addrs, enr: enr, protocols: protocols, + shards: shards, publicKey: publicKey, agent: agent, protoVersion: protoVersion, @@ -105,9 +108,12 @@ proc init*( addrs: seq[MultiAddress] = @[], enr: Option[enr.Record] = none(enr.Record), protocols: seq[string] = @[], + shards: seq[uint16] = @[], ): T {.raises: [Defect, ResultError[cstring], LPError].} = let peerId = PeerID.init(peerId).tryGet() - RemotePeerInfo(peerId: peerId, addrs: addrs, enr: enr, protocols: protocols) + RemotePeerInfo( + peerId: peerId, addrs: addrs, enr: enr, protocols: protocols, shards: shards + ) ## Parse @@ -326,6 +332,7 @@ converter toRemotePeerInfo*(peerInfo: PeerInfo): RemotePeerInfo = addrs: peerInfo.listenAddrs, enr: none(enr.Record), protocols: peerInfo.protocols, + shards: @[], agent: peerInfo.agentVersion, protoVersion: peerInfo.protoVersion, publicKey: peerInfo.publicKey, @@ -361,6 +368,9 @@ proc getAgent*(peer: RemotePeerInfo): string = return peer.agent proc getShards*(peer: RemotePeerInfo): seq[uint16] = + if peer.shards.len > 0: + return peer.shards + if peer.enr.isNone(): return @[]