mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-02 14:03:06 +00:00
fix: admin API peer shards field from metadata protocol (#3594)
* fix: admin API peer shards field from metadata protocol Store and return peer shard info from metadata protocol exchange instead of only checking ENR records. * peer_manager set shard info and extend rest test to validate it Co-authored-by: MorganaFuture <andrewmochalskyi@gmail.com>
This commit is contained in:
parent
adeb1a928e
commit
e54851d9d6
@ -65,7 +65,7 @@ suite "Waku v2 Rest API - Admin":
|
||||
): Future[void] {.async, gcsafe.} =
|
||||
await sleepAsync(0.milliseconds)
|
||||
|
||||
let shard = RelayShard(clusterId: clusterId, shardId: 0)
|
||||
let shard = RelayShard(clusterId: clusterId, shardId: 5)
|
||||
node1.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr:
|
||||
assert false, "Failed to subscribe to topic: " & $error
|
||||
node2.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr:
|
||||
@ -212,6 +212,18 @@ suite "Waku v2 Rest API - Admin":
|
||||
let conn2 = await node1.peerManager.connectPeer(peerInfo2)
|
||||
let conn3 = await node1.peerManager.connectPeer(peerInfo3)
|
||||
|
||||
var count = 0
|
||||
while count < 20:
|
||||
## Wait ~1s at most for the peer store to update shard info
|
||||
let getRes = await client.getPeers()
|
||||
if getRes.data.allIt(it.shards == @[5.uint16]):
|
||||
break
|
||||
|
||||
count.inc()
|
||||
await sleepAsync(50.milliseconds)
|
||||
|
||||
assert count < 20, "Timeout waiting for shards to be updated in peer store"
|
||||
|
||||
# Check successful connections
|
||||
check:
|
||||
conn2 == true
|
||||
|
||||
@ -658,6 +658,11 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
|
||||
$clusterId
|
||||
break guardClauses
|
||||
|
||||
# Store the shard information from metadata in the peer store
|
||||
if pm.switch.peerStore.peerExists(peerId):
|
||||
let shards = metadata.shards.mapIt(it.uint16)
|
||||
pm.switch.peerStore.setShardInfo(peerId, shards)
|
||||
|
||||
return
|
||||
|
||||
info "disconnecting from peer", peerId = peerId, reason = reason
|
||||
|
||||
@ -39,6 +39,9 @@ type
|
||||
# Keeps track of the ENR (Ethereum Node Record) of a peer
|
||||
ENRBook* = ref object of PeerBook[enr.Record]
|
||||
|
||||
# Keeps track of peer shards
|
||||
ShardBook* = ref object of PeerBook[seq[uint16]]
|
||||
|
||||
proc getPeer*(peerStore: PeerStore, peerId: PeerId): RemotePeerInfo =
|
||||
let addresses =
|
||||
if peerStore[LastSeenBook][peerId].isSome():
|
||||
@ -55,6 +58,7 @@ proc getPeer*(peerStore: PeerStore, peerId: PeerId): RemotePeerInfo =
|
||||
else:
|
||||
none(enr.Record),
|
||||
protocols: peerStore[ProtoBook][peerId],
|
||||
shards: peerStore[ShardBook][peerId],
|
||||
agent: peerStore[AgentBook][peerId],
|
||||
protoVersion: peerStore[ProtoVersionBook][peerId],
|
||||
publicKey: peerStore[KeyBook][peerId],
|
||||
@ -76,6 +80,7 @@ proc peers*(peerStore: PeerStore): seq[RemotePeerInfo] =
|
||||
toSeq(peerStore[AddressBook].book.keys()),
|
||||
toSeq(peerStore[ProtoBook].book.keys()),
|
||||
toSeq(peerStore[KeyBook].book.keys()),
|
||||
toSeq(peerStore[ShardBook].book.keys()),
|
||||
)
|
||||
.toHashSet()
|
||||
|
||||
@ -127,6 +132,9 @@ proc addPeer*(peerStore: PeerStore, peer: RemotePeerInfo, origin = UnknownOrigin
|
||||
if peer.enr.isSome():
|
||||
peerStore[ENRBook][peer.peerId] = peer.enr.get()
|
||||
|
||||
proc setShardInfo*(peerStore: PeerStore, peerId: PeerID, shards: seq[uint16]) =
|
||||
peerStore[ShardBook][peerId] = shards
|
||||
|
||||
proc peers*(peerStore: PeerStore, proto: string): seq[RemotePeerInfo] =
|
||||
peerStore.peers().filterIt(it.protocols.contains(proto))
|
||||
|
||||
|
||||
@ -48,6 +48,7 @@ type RemotePeerInfo* = ref object
|
||||
addrs*: seq[MultiAddress]
|
||||
enr*: Option[enr.Record]
|
||||
protocols*: seq[string]
|
||||
shards*: seq[uint16]
|
||||
|
||||
agent*: string
|
||||
protoVersion*: string
|
||||
@ -73,6 +74,7 @@ proc init*(
|
||||
addrs: seq[MultiAddress] = @[],
|
||||
enr: Option[enr.Record] = none(enr.Record),
|
||||
protocols: seq[string] = @[],
|
||||
shards: seq[uint16] = @[],
|
||||
publicKey: crypto.PublicKey = crypto.PublicKey(),
|
||||
agent: string = "",
|
||||
protoVersion: string = "",
|
||||
@ -88,6 +90,7 @@ proc init*(
|
||||
addrs: addrs,
|
||||
enr: enr,
|
||||
protocols: protocols,
|
||||
shards: shards,
|
||||
publicKey: publicKey,
|
||||
agent: agent,
|
||||
protoVersion: protoVersion,
|
||||
@ -105,9 +108,12 @@ proc init*(
|
||||
addrs: seq[MultiAddress] = @[],
|
||||
enr: Option[enr.Record] = none(enr.Record),
|
||||
protocols: seq[string] = @[],
|
||||
shards: seq[uint16] = @[],
|
||||
): T {.raises: [Defect, ResultError[cstring], LPError].} =
|
||||
let peerId = PeerID.init(peerId).tryGet()
|
||||
RemotePeerInfo(peerId: peerId, addrs: addrs, enr: enr, protocols: protocols)
|
||||
RemotePeerInfo(
|
||||
peerId: peerId, addrs: addrs, enr: enr, protocols: protocols, shards: shards
|
||||
)
|
||||
|
||||
## Parse
|
||||
|
||||
@ -326,6 +332,7 @@ converter toRemotePeerInfo*(peerInfo: PeerInfo): RemotePeerInfo =
|
||||
addrs: peerInfo.listenAddrs,
|
||||
enr: none(enr.Record),
|
||||
protocols: peerInfo.protocols,
|
||||
shards: @[],
|
||||
agent: peerInfo.agentVersion,
|
||||
protoVersion: peerInfo.protoVersion,
|
||||
publicKey: peerInfo.publicKey,
|
||||
@ -361,6 +368,9 @@ proc getAgent*(peer: RemotePeerInfo): string =
|
||||
return peer.agent
|
||||
|
||||
proc getShards*(peer: RemotePeerInfo): seq[uint16] =
|
||||
if peer.shards.len > 0:
|
||||
return peer.shards
|
||||
|
||||
if peer.enr.isNone():
|
||||
return @[]
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user