chore: extended /admin/v1 RESP API with different option to look at current connected/relay/mesh state of the node (#3382)

* Extended /admin/v1 RESP API with different option to look at current connected/relay/mesh state of the node
* Added score information for peer info retrievals
This commit is contained in:
NagyZoltanPeter 2025-04-24 08:36:02 +02:00 committed by GitHub
parent 0304f063b8
commit ab8a30d3d6
9 changed files with 589 additions and 186 deletions

View File

@ -1,11 +1,11 @@
{.used.}
import
std/[sequtils, net],
stew/shims/net,
std/[sequtils, strformat, net],
testutils/unittests,
presto,
presto/client as presto_client,
presto /../ tests/helpers,
libp2p/crypto/crypto
import
@ -43,10 +43,11 @@ suite "Waku v2 Rest API - Admin":
node3 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60604))
await allFutures(node1.start(), node2.start(), node3.start())
let shards = @[RelayShard(clusterId: 1, shardId: 0)]
await allFutures(
node1.mountRelay(),
node2.mountRelay(),
node3.mountRelay(),
node1.mountRelay(shards = shards),
node2.mountRelay(shards = shards),
node3.mountRelay(shards = shards),
node3.mountPeerExchange(),
)
@ -203,3 +204,96 @@ suite "Waku v2 Rest API - Admin":
getRes.data.anyIt(it.origin == Discv5)
# Check peer 3
getRes.data.anyIt(it.origin == PeerExchange)
asyncTest "get peers by id":
# Connect to nodes 2 and 3 using the Admin API
let postRes = await client.postPeers(
@[constructMultiaddrStr(peerInfo2), constructMultiaddrStr(peerInfo3)]
)
check:
postRes.status == 200
let getRes = await client.getPeerById($peerInfo2.peerId)
check:
getRes.status == 200
$getRes.contentType == $MIMETYPE_JSON
getRes.data.protocols.find(WakuRelayCodec) >= 0
getRes.data.multiaddr == constructMultiaddrStr(peerInfo2)
## nim-presto library's RestClient does not support text error case decode if
## the RestResponse expects a JSON with complex type
# let getRes2 = await client.getPeerById("bad peer id")
let getRes2 = await httpClient(
restServer.httpServer.address, MethodGet, "/admin/v1/peer/bad+peer+id", ""
)
check:
getRes2.status == 400
getRes2.data == "Invalid argument:peerid: incorrect PeerId string"
asyncTest "get connected peers":
# Connect to nodes 2 and 3 using the Admin API
let postRes = await client.postPeers(
@[constructMultiaddrStr(peerInfo2), constructMultiaddrStr(peerInfo3)]
)
check:
postRes.status == 200
let getRes = await client.getConnectedPeers()
check:
getRes.status == 200
$getRes.contentType == $MIMETYPE_JSON
getRes.data.len() == 2
# Check peer 2
getRes.data.anyIt(it.multiaddr == constructMultiaddrStr(peerInfo2))
# Check peer 3
getRes.data.anyIt(it.multiaddr == constructMultiaddrStr(peerInfo3))
# Seems shard info is not available in the peer manager
# let getRes2 = await client.getConnectedPeersByShard(0)
# check:
# getRes2.status == 200
# $getRes2.contentType == $MIMETYPE_JSON
# getRes2.data.len() == 2
let getRes3 = await client.getConnectedPeersByShard(99)
check:
getRes3.status == 200
$getRes3.contentType == $MIMETYPE_JSON
getRes3.data.len() == 0
asyncTest "get relay peers":
# Connect to nodes 2 and 3 using the Admin API
let postRes = await client.postPeers(
@[constructMultiaddrStr(peerInfo2), constructMultiaddrStr(peerInfo3)]
)
check:
postRes.status == 200
let getRes = await client.getConnectedRelayPeers()
check:
getRes.status == 200
$getRes.contentType == $MIMETYPE_JSON
require getRes.data.len() == 1 # Check peer 2
check getRes.data[0].peers.anyIt(it.multiaddr == constructMultiaddrStr(peerInfo2))
# Check peer 2
check getRes.data[0].peers.anyIt(it.multiaddr == constructMultiaddrStr(peerInfo3))
# Check peer 3
# Todo: investigate why the test setup missing remote peer's shard info
# let getRes2 = await client.getConnectedRelayPeersByShard(0)
# check:
# getRes2.status == 200
# $getRes2.contentType == $MIMETYPE_JSON
# getRes2.data.peers.len() == 2
let getRes3 = await client.getConnectedRelayPeersByShard(99)
check:
getRes3.status == 200
$getRes3.contentType == $MIMETYPE_JSON
getRes3.data.peers.len() == 0

View File

@ -154,7 +154,7 @@ proc addPeer*(
pm.storage.insertOrReplace(remotePeerInfo)
proc getPeer(pm: PeerManager, peerId: PeerId): RemotePeerInfo =
proc getPeer*(pm: PeerManager, peerId: PeerId): RemotePeerInfo =
return pm.switch.peerStore.getPeer(peerId)
proc loadFromStorage(pm: PeerManager) {.gcsafe.} =

View File

@ -22,6 +22,44 @@ proc postPeers*(
rest, endpoint: "/admin/v1/peers", meth: HttpMethod.MethodPost
.}
proc getPeerById*(
peerId: string
): RestResponse[WakuPeer] {.
rest, endpoint: "/admin/v1/peer/{peerId}", meth: HttpMethod.MethodGet
.}
proc getConnectedPeers*(): RestResponse[seq[WakuPeer]] {.
rest, endpoint: "/admin/v1/peers/connected", meth: HttpMethod.MethodGet
.}
proc getConnectedPeersByShard*(
shardId: uint16
): RestResponse[seq[WakuPeer]] {.
rest, endpoint: "/admin/v1/peers/connected/on/{shardId}", meth: HttpMethod.MethodGet
.}
proc getConnectedRelayPeers*(): RestResponse[PeersOfShards] {.
rest, endpoint: "/admin/v1/peers/connected/relay", meth: HttpMethod.MethodGet
.}
proc getConnectedRelayPeersByShard*(
shardId: uint16
): RestResponse[PeersOfShard] {.
rest,
endpoint: "/admin/v1/peers/connected/relay/on/{shardId}",
meth: HttpMethod.MethodGet
.}
proc getMeshPeers*(): RestResponse[PeersOfShards] {.
rest, endpoint: "/admin/v1/peers/mesh", meth: HttpMethod.MethodGet
.}
proc getMeshPeersByShard*(
shardId: uint16
): RestResponse[PeersOfShard] {.
rest, endpoint: "/admin/v1/peers/mesh/on/{shardId}", meth: HttpMethod.MethodGet
.}
proc getFilterSubscriptions*(): RestResponse[seq[FilterSubscription]] {.
rest, endpoint: "/admin/v1/filter/subscriptions", meth: HttpMethod.MethodGet
.}

View File

@ -1,22 +1,26 @@
{.push raises: [].}
import
std/[strformat, sequtils, tables],
std/[sets, strformat, sequtils, tables],
chronicles,
json_serialization,
presto/route,
libp2p/[peerinfo, switch]
libp2p/[peerinfo, switch, peerid, protocols/pubsub/pubsubpeer]
import
../../../waku_core,
../../../waku_store_legacy/common,
../../../waku_store/common,
../../../waku_filter_v2,
../../../waku_lightpush_legacy/common,
../../../waku_relay,
../../../waku_peer_exchange,
../../../waku_node,
../../../node/peer_manager,
waku/[
waku_core,
waku_core/topics/pubsub_topic,
waku_store_legacy/common,
waku_store/common,
waku_filter_v2,
waku_lightpush_legacy/common,
waku_relay,
waku_peer_exchange,
waku_node,
node/peer_manager,
waku_enr/sharding,
],
../responses,
../serdes,
../rest_serdes,
@ -27,103 +31,260 @@ export types
logScope:
topics = "waku node rest admin api"
const ROUTE_ADMIN_V1_PEERS* = "/admin/v1/peers"
const ROUTE_ADMIN_V1_PEERS* = "/admin/v1/peers" # returns all peers
const ROUTE_ADMIN_V1_SINGLE_PEER* = "/admin/v1/peer/{peerId}"
const ROUTE_ADMIN_V1_CONNECTED_PEERS* = "/admin/v1/peers/connected"
const ROUTE_ADMIN_V1_CONNECTED_PEERS_ON_SHARD* =
"/admin/v1/peers/connected/on/{shardId}"
const ROUTE_ADMIN_V1_CONNECTED_RELAY_PEERS* = "/admin/v1/peers/connected/relay"
const ROUTE_ADMIN_V1_CONNECTED_RELAY_PEERS_ON_SHARD* =
"/admin/v1/peers/connected/relay/on/{shardId}"
const ROUTE_ADMIN_V1_MESH_PEERS* = "/admin/v1/peers/mesh"
const ROUTE_ADMIN_V1_MESH_PEERS_ON_SHARD* = "/admin/v1/peers/mesh/on/{shardId}"
const ROUTE_ADMIN_V1_FILTER_SUBS* = "/admin/v1/filter/subscriptions"
type PeerProtocolTuple =
tuple[multiaddr: string, protocol: string, connected: bool, origin: PeerOrigin]
tuple[
multiaddr: string,
protocol: string,
shards: seq[uint16],
connected: Connectedness,
agent: string,
origin: PeerOrigin,
]
proc tuplesToWakuPeers(peers: var WakuPeers, peersTup: seq[PeerProtocolTuple]) =
for peer in peersTup:
peers.add(peer.multiaddr, peer.protocol, peer.connected, peer.origin)
peers.add(
peer.multiaddr, peer.protocol, peer.shards, peer.connected, peer.agent,
peer.origin,
)
proc populateAdminPeerInfo(peers: var WakuPeers, node: WakuNode, codec: string) =
let peersForCodec = node.peerManager.switch.peerStore.peers(codec).mapIt(
(
multiaddr: constructMultiaddrStr(it),
protocol: codec,
shards: it.getShards(),
connected: it.connectedness,
agent: it.agent,
origin: it.origin,
)
)
tuplesToWakuPeers(peers, peersForCodec)
proc populateAdminPeerInfoForCodecs(node: WakuNode, codecs: seq[string]): WakuPeers =
var peers: WakuPeers = @[]
for codec in codecs:
populateAdminPeerInfo(peers, node, codec)
return peers
proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
router.api(MethodGet, ROUTE_ADMIN_V1_PEERS) do() -> RestApiResponse:
var peers: WakuPeers = @[]
let relayPeers = node.peerManager.switch.peerStore.peers(WakuRelayCodec).mapIt(
(
multiaddr: constructMultiaddrStr(it),
protocol: WakuRelayCodec,
connected: it.connectedness == Connectedness.Connected,
origin: it.origin,
)
)
tuplesToWakuPeers(peers, relayPeers)
let filterV2Peers = node.peerManager.switch.peerStore
.peers(WakuFilterSubscribeCodec)
.mapIt(
(
multiaddr: constructMultiaddrStr(it),
protocol: WakuFilterSubscribeCodec,
connected: it.connectedness == Connectedness.Connected,
origin: it.origin,
)
)
tuplesToWakuPeers(peers, filterV2Peers)
let storePeers = node.peerManager.switch.peerStore.peers(WakuStoreCodec).mapIt(
(
multiaddr: constructMultiaddrStr(it),
protocol: WakuStoreCodec,
connected: it.connectedness == Connectedness.Connected,
origin: it.origin,
)
)
tuplesToWakuPeers(peers, storePeers)
let legacyStorePeers = node.peerManager.switch.peerStore
.peers(WakuLegacyStoreCodec)
.mapIt(
(
multiaddr: constructMultiaddrStr(it),
protocol: WakuLegacyStoreCodec,
connected: it.connectedness == Connectedness.Connected,
origin: it.origin,
)
)
tuplesToWakuPeers(peers, legacyStorePeers)
let legacyLightpushPeers = node.peerManager.switch.peerStore
.peers(WakuLegacyLightPushCodec)
.mapIt(
(
multiaddr: constructMultiaddrStr(it),
protocol: WakuLegacyLightPushCodec,
connected: it.connectedness == Connectedness.Connected,
origin: it.origin,
)
)
tuplesToWakuPeers(peers, legacyLightpushPeers)
let lightpushPeers = node.peerManager.switch.peerStore
.peers(WakuLightPushCodec)
.mapIt(
(
multiaddr: constructMultiaddrStr(it),
protocol: WakuLightPushCodec,
connected: it.connectedness == Connectedness.Connected,
origin: it.origin,
)
)
tuplesToWakuPeers(peers, lightpushPeers)
let pxPeers = node.peerManager.switch.peerStore.peers(WakuPeerExchangeCodec).mapIt(
(
multiaddr: constructMultiaddrStr(it),
protocol: WakuPeerExchangeCodec,
connected: it.connectedness == Connectedness.Connected,
origin: it.origin,
)
)
tuplesToWakuPeers(peers, pxPeers)
let peers = populateAdminPeerInfoForCodecs(
node,
@[
WakuRelayCodec, WakuFilterSubscribeCodec, WakuStoreCodec, WakuLegacyStoreCodec,
WakuLegacyLightPushCodec, WakuLightPushCodec, WakuPeerExchangeCodec,
WakuReconciliationCodec,
],
)
let resp = RestApiResponse.jsonResponse(peers, status = Http200)
if resp.isErr():
error "An error ocurred while building the json respose: ", error = resp.error
error "An error occurred while building the json response: ", error = resp.error
return RestApiResponse.internalServerError(
fmt("An error ocurred while building the json respose: {resp.error}")
fmt("An error occurred while building the json response: {resp.error}")
)
return resp.get()
router.api(MethodGet, ROUTE_ADMIN_V1_SINGLE_PEER) do(
peerId: string
) -> RestApiResponse:
let peerIdString = peerId.valueOr:
return RestApiResponse.badRequest("Invalid argument:" & $error)
let peerIdVal: PeerId = PeerId.init(peerIdString).valueOr:
return RestApiResponse.badRequest("Invalid argument:" & $error)
if node.peerManager.switch.peerStore.peerExists(peerIdVal):
let peerInfo = node.peerManager.switch.peerStore.getPeer(peerIdVal)
let peer = WakuPeer.init(peerInfo)
let resp = RestApiResponse.jsonResponse(peer, status = Http200)
if resp.isErr():
error "An error occurred while building the json response: ", error = resp.error
return RestApiResponse.internalServerError(
fmt("An error occurred while building the json response: {resp.error}")
)
return resp.get()
else:
return RestApiResponse.notFound(fmt("Peer with ID {peerId} not found"))
router.api(MethodGet, ROUTE_ADMIN_V1_CONNECTED_PEERS) do() -> RestApiResponse:
let allPeers = populateAdminPeerInfoForCodecs(
node,
@[
WakuRelayCodec, WakuFilterSubscribeCodec, WakuStoreCodec, WakuLegacyStoreCodec,
WakuLegacyLightPushCodec, WakuLightPushCodec, WakuPeerExchangeCodec,
WakuReconciliationCodec,
],
)
let connectedPeers = allPeers.filterIt(it.connected == Connectedness.Connected)
let resp = RestApiResponse.jsonResponse(connectedPeers, status = Http200)
if resp.isErr():
error "An error occurred while building the json response: ", error = resp.error
return RestApiResponse.internalServerError(
fmt("An error occurred while building the json response: {resp.error}")
)
return resp.get()
router.api(MethodGet, ROUTE_ADMIN_V1_CONNECTED_PEERS_ON_SHARD) do(
shardId: uint16
) -> RestApiResponse:
let shard = shardId.valueOr:
return RestApiResponse.badRequest(fmt("Invalid shardId: {error}"))
let allPeers = populateAdminPeerInfoForCodecs(
node,
@[
WakuRelayCodec, WakuFilterSubscribeCodec, WakuStoreCodec, WakuLegacyStoreCodec,
WakuLegacyLightPushCodec, WakuLightPushCodec, WakuPeerExchangeCodec,
WakuReconciliationCodec,
],
)
let connectedPeers = allPeers.filterIt(
it.connected == Connectedness.Connected and it.shards.contains(shard)
)
let resp = RestApiResponse.jsonResponse(connectedPeers, status = Http200)
if resp.isErr():
error "An error occurred while building the json response: ", error = resp.error
return RestApiResponse.internalServerError(
fmt("An error occurred while building the json response: {resp.error}")
)
return resp.get()
router.api(MethodGet, ROUTE_ADMIN_V1_CONNECTED_RELAY_PEERS) do() -> RestApiResponse:
if node.wakuRelay.isNil():
return RestApiResponse.serviceUnavailable(
"Error: Relay Protocol is not mounted to the node"
)
var relayPeers: PeersOfShards = @[]
for topic in node.wakuRelay.getSubscribedTopics():
let relayShard = RelayShard.parse(topic).valueOr:
error "Invalid subscribed topic", error = error, topic = topic
continue
let pubsubPeers =
node.wakuRelay.getConnectedPubSubPeers(topic).get(initHashSet[PubSubPeer](0))
relayPeers.add(
PeersOfShard(
shard: relayShard.shardId,
peers: toSeq(pubsubPeers).mapIt(WakuPeer.init(it, node.peerManager)),
)
)
let resp = RestApiResponse.jsonResponse(relayPeers, status = Http200)
if resp.isErr():
error "An error occurred while building the json response: ", error = resp.error
return RestApiResponse.internalServerError(
fmt("An error occurred while building the json response: {resp.error}")
)
return resp.get()
router.api(MethodGet, ROUTE_ADMIN_V1_CONNECTED_RELAY_PEERS_ON_SHARD) do(
shardId: uint16
) -> RestApiResponse:
let shard = shardId.valueOr:
return RestApiResponse.badRequest(fmt("Invalid shardId: {error}"))
if node.wakuRelay.isNil():
return RestApiResponse.serviceUnavailable(
"Error: Relay Protocol is not mounted to the node"
)
let topic =
toPubsubTopic(RelayShard(clusterId: node.wakuSharding.clusterId, shardId: shard))
let pubsubPeers =
node.wakuRelay.getConnectedPubSubPeers(topic).get(initHashSet[PubSubPeer](0))
let relayPeer = PeersOfShard(
shard: shard, peers: toSeq(pubsubPeers).mapIt(WakuPeer.init(it, node.peerManager))
)
let resp = RestApiResponse.jsonResponse(relayPeer, status = Http200)
if resp.isErr():
error "An error occurred while building the json response: ", error = resp.error
return RestApiResponse.internalServerError(
fmt("An error occurred while building the json response: {resp.error}")
)
return resp.get()
router.api(MethodGet, ROUTE_ADMIN_V1_MESH_PEERS) do() -> RestApiResponse:
if node.wakuRelay.isNil():
return RestApiResponse.serviceUnavailable(
"Error: Relay Protocol is not mounted to the node"
)
var relayPeers: PeersOfShards = @[]
for topic in node.wakuRelay.getSubscribedTopics():
let relayShard = RelayShard.parse(topic).valueOr:
error "Invalid subscribed topic", error = error, topic = topic
continue
let peers =
node.wakuRelay.getPubSubPeersInMesh(topic).get(initHashSet[PubSubPeer](0))
relayPeers.add(
PeersOfShard(
shard: relayShard.shardId,
peers: toSeq(peers).mapIt(WakuPeer.init(it, node.peerManager)),
)
)
let resp = RestApiResponse.jsonResponse(relayPeers, status = Http200)
if resp.isErr():
error "An error occurred while building the json response: ", error = resp.error
return RestApiResponse.internalServerError(
fmt("An error occurred while building the json response: {resp.error}")
)
return resp.get()
router.api(MethodGet, ROUTE_ADMIN_V1_MESH_PEERS_ON_SHARD) do(
shardId: uint16
) -> RestApiResponse:
let shard = shardId.valueOr:
return RestApiResponse.badRequest(fmt("Invalid shardId: {error}"))
if node.wakuRelay.isNil():
return RestApiResponse.serviceUnavailable(
"Error: Relay Protocol is not mounted to the node"
)
let topic =
toPubsubTopic(RelayShard(clusterId: node.wakuSharding.clusterId, shardId: shard))
let peers =
node.wakuRelay.getPubSubPeersInMesh(topic).get(initHashSet[PubSubPeer](0))
let relayPeer = PeersOfShard(
shard: shard, peers: toSeq(peers).mapIt(WakuPeer.init(it, node.peerManager))
)
let resp = RestApiResponse.jsonResponse(relayPeer, status = Http200)
if resp.isErr():
error "An error occurred while building the json response: ", error = resp.error
return RestApiResponse.internalServerError(
fmt("An error occurred while building the json response: {resp.error}")
)
return resp.get()

View File

@ -4,22 +4,29 @@ import
chronicles,
json_serialization,
json_serialization/std/options,
json_serialization/lexer
import ../serdes, ../../../waku_core
json_serialization/lexer,
results,
libp2p/protocols/pubsub/pubsubpeer
import waku/[waku_core, node/peer_manager], ../serdes
#### Types
type ProtocolState* = object
protocol*: string
connected*: bool
type WakuPeer* = object
multiaddr*: string
protocols*: seq[ProtocolState]
protocols*: seq[string]
shards*: seq[uint16]
connected*: Connectedness
agent*: string
origin*: PeerOrigin
score*: Option[float64]
type WakuPeers* = seq[WakuPeer]
type PeersOfShard* = object
shard*: uint16
peers*: WakuPeers
type PeersOfShards* = seq[PeersOfShard]
type FilterTopic* = object
pubsubTopic*: string
contentTopic*: string
@ -29,22 +36,25 @@ type FilterSubscription* = object
filterCriteria*: seq[FilterTopic]
#### Serialization and deserialization
proc writeValue*(
writer: var JsonWriter[RestJson], value: ProtocolState
) {.raises: [IOError].} =
writer.beginRecord()
writer.writeField("protocol", value.protocol)
writer.writeField("connected", value.connected)
writer.endRecord()
proc writeValue*(
writer: var JsonWriter[RestJson], value: WakuPeer
) {.raises: [IOError].} =
writer.beginRecord()
writer.writeField("multiaddr", value.multiaddr)
writer.writeField("protocols", value.protocols)
writer.writeField("shards", value.shards)
writer.writeField("connected", value.connected)
writer.writeField("agent", value.agent)
writer.writeField("origin", value.origin)
writer.writeField("score", value.score)
writer.endRecord()
proc writeValue*(
writer: var JsonWriter[RestJson], value: PeersOfShard
) {.raises: [IOError].} =
writer.beginRecord()
writer.writeField("shard", value.shard)
writer.writeField("peers", value.peers)
writer.endRecord()
proc writeValue*(
@ -63,43 +73,17 @@ proc writeValue*(
writer.writeField("filterCriteria", value.filterCriteria)
writer.endRecord()
proc readValue*(
reader: var JsonReader[RestJson], value: var ProtocolState
) {.gcsafe, raises: [SerializationError, IOError].} =
var
protocol: Option[string]
connected: Option[bool]
for fieldName in readObjectFields(reader):
case fieldName
of "protocol":
if protocol.isSome():
reader.raiseUnexpectedField("Multiple `protocol` fields found", "ProtocolState")
protocol = some(reader.readValue(string))
of "connected":
if connected.isSome():
reader.raiseUnexpectedField(
"Multiple `connected` fields found", "ProtocolState"
)
connected = some(reader.readValue(bool))
else:
unrecognizedFieldWarning(value)
if connected.isNone():
reader.raiseUnexpectedValue("Field `connected` is missing")
if protocol.isNone():
reader.raiseUnexpectedValue("Field `protocol` is missing")
value = ProtocolState(protocol: protocol.get(), connected: connected.get())
proc readValue*(
reader: var JsonReader[RestJson], value: var WakuPeer
) {.gcsafe, raises: [SerializationError, IOError].} =
var
multiaddr: Option[string]
protocols: Option[seq[ProtocolState]]
protocols: Option[seq[string]]
shards: Option[seq[uint16]]
connected: Option[Connectedness]
agent: Option[string]
origin: Option[PeerOrigin]
score: Option[float64]
for fieldName in readObjectFields(reader):
case fieldName
@ -110,11 +94,27 @@ proc readValue*(
of "protocols":
if protocols.isSome():
reader.raiseUnexpectedField("Multiple `protocols` fields found", "WakuPeer")
protocols = some(reader.readValue(seq[ProtocolState]))
protocols = some(reader.readValue(seq[string]))
of "shards":
if shards.isSome():
reader.raiseUnexpectedField("Multiple `shards` fields found", "WakuPeer")
shards = some(reader.readValue(seq[uint16]))
of "connected":
if connected.isSome():
reader.raiseUnexpectedField("Multiple `connected` fields found", "WakuPeer")
connected = some(reader.readValue(Connectedness))
of "agent":
if agent.isSome():
reader.raiseUnexpectedField("Multiple `agent` fields found", "WakuPeer")
agent = some(reader.readValue(string))
of "origin":
if origin.isSome():
reader.raiseUnexpectedField("Multiple `origin` fields found", "WakuPeer")
origin = some(reader.readValue(PeerOrigin))
of "score":
if score.isSome():
reader.raiseUnexpectedField("Multiple `score` fields found", "WakuPeer")
score = some(reader.readValue(float64))
else:
unrecognizedFieldWarning(value)
@ -124,13 +124,56 @@ proc readValue*(
if protocols.isNone():
reader.raiseUnexpectedValue("Field `protocols` are missing")
if shards.isNone():
reader.raiseUnexpectedValue("Field `shards` is missing")
if connected.isNone():
reader.raiseUnexpectedValue("Field `connected` is missing")
if agent.isNone():
reader.raiseUnexpectedValue("Field `agent` is missing")
if origin.isNone():
reader.raiseUnexpectedValue("Field `origin` is missing")
value = WakuPeer(
multiaddr: multiaddr.get(), protocols: protocols.get(), origin: origin.get()
multiaddr: multiaddr.get(),
protocols: protocols.get(),
shards: shards.get(),
connected: connected.get(),
agent: agent.get(),
origin: origin.get(),
score: score,
)
proc readValue*(
reader: var JsonReader[RestJson], value: var PeersOfShard
) {.gcsafe, raises: [SerializationError, IOError].} =
var
shard: Option[uint16]
peers: Option[WakuPeers]
for fieldName in readObjectFields(reader):
case fieldName
of "shard":
if shard.isSome():
reader.raiseUnexpectedField("Multiple `shard` fields found", "PeersOfShard")
shard = some(reader.readValue(uint16))
of "peers":
if peers.isSome():
reader.raiseUnexpectedField("Multiple `peers` fields found", "PeersOfShard")
peers = some(reader.readValue(WakuPeers))
else:
unrecognizedFieldWarning(value)
if shard.isNone():
reader.raiseUnexpectedValue("Field `shard` is missing")
if peers.isNone():
reader.raiseUnexpectedValue("Field `peers` are missing")
value = PeersOfShard(shard: shard.get(), peers: peers.get())
proc readValue*(
reader: var JsonReader[RestJson], value: var FilterTopic
) {.gcsafe, raises: [SerializationError, IOError].} =
@ -195,26 +238,47 @@ proc readValue*(
value = FilterSubscription(peerId: peerId.get(), filterCriteria: filterCriteria.get())
## Utility for populating WakuPeers and ProtocolState
func `==`*(a, b: ProtocolState): bool {.inline.} =
return a.protocol == b.protocol
func `==`*(a: ProtocolState, b: string): bool {.inline.} =
return a.protocol == b
func `==`*(a, b: WakuPeer): bool {.inline.} =
return a.multiaddr == b.multiaddr
proc init*(T: type WakuPeer, peerInfo: RemotePeerInfo): WakuPeer =
result = WakuPeer(
multiaddr: constructMultiaddrStr(peerInfo),
protocols: peerInfo.protocols,
shards: peerInfo.getShards(),
connected: peerInfo.connectedness,
agent: peerInfo.agent,
origin: peerInfo.origin,
score: none(float64),
)
proc init*(T: type WakuPeer, pubsubPeer: PubSubPeer, pm: PeerManager): WakuPeer =
let peerInfo = pm.getPeer(pubsubPeer.peerId)
result = WakuPeer(
multiaddr: constructMultiaddrStr(peerInfo),
protocols: peerInfo.protocols,
shards: peerInfo.getShards(),
connected: peerInfo.connectedness,
agent: peerInfo.agent,
origin: peerInfo.origin,
score: some(pubsubPeer.score),
)
proc add*(
peers: var WakuPeers,
multiaddr: string,
protocol: string,
connected: bool,
shards: seq[uint16],
connected: Connectedness,
agent: string,
origin: PeerOrigin,
) =
var peer: WakuPeer = WakuPeer(
multiaddr: multiaddr,
protocols: @[ProtocolState(protocol: protocol, connected: connected)],
protocols: @[protocol],
shards: shards,
connected: connected,
agent: agent,
origin: origin,
)
let idx = peers.find(peer)
@ -222,4 +286,4 @@ proc add*(
if idx < 0:
peers.add(peer)
else:
peers[idx].protocols.add(ProtocolState(protocol: protocol, connected: connected))
peers[idx].protocols.add(protocol)

View File

@ -1,9 +1,9 @@
{.push raises: [].}
import
std/typetraits,
std/[typetraits, parseutils],
results,
stew/byteutils,
stew/[byteutils, base10],
chronicles,
serialization,
json_serialization,
@ -100,3 +100,13 @@ proc encodeString*(value: string): RestResult[string] =
proc decodeString*(t: typedesc[string], value: string): RestResult[string] =
ok(value)
proc encodeString*(value: SomeUnsignedInt): RestResult[string] =
ok(Base10.toString(value))
proc decodeString*(T: typedesc[SomeUnsignedInt], value: string): RestResult[T] =
let v = Base10.decode(T, value)
if v.isErr():
return err(v.error())
else:
return ok(v.get())

View File

@ -18,7 +18,7 @@ import
libp2p/routing_record,
regex,
json_serialization
import ../waku_enr/capabilities
import ../waku_enr
type
Connectedness* = enum
@ -231,7 +231,7 @@ proc parsePeerInfo*(maddrs: varargs[string]): Result[RemotePeerInfo, string] =
parsePeerInfo(multiAddresses)
func getTransportProtocol(typedR: TypedRecord): Option[IpTransportProtocol] =
func getTransportProtocol(typedR: enr.TypedRecord): Option[IpTransportProtocol] =
if typedR.tcp6.isSome() or typedR.tcp.isSome():
return some(IpTransportProtocol.tcpProtocol)
@ -255,9 +255,9 @@ proc parseUrlPeerAddr*(
return ok(some(parsedPeerInfo.value))
proc toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] =
proc toRemotePeerInfo*(enrRec: enr.Record): Result[RemotePeerInfo, cstring] =
## Converts an ENR to dialable RemotePeerInfo
let typedR = TypedRecord.fromRecord(enr)
let typedR = enr.TypedRecord.fromRecord(enrRec)
if not typedR.secp256k1.isSome():
return err("enr: no secp256k1 key in record")
@ -303,7 +303,7 @@ proc toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] =
return err("enr: no addresses in record")
let protocolsRes = catch:
enr.getCapabilitiesCodecs()
enrRec.getCapabilitiesCodecs()
var protocols: seq[string]
if not protocolsRes.isErr():
@ -312,7 +312,7 @@ proc toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] =
error "Could not retrieve supported protocols from enr",
peerId = peerId, msg = protocolsRes.error.msg
return ok(RemotePeerInfo.init(peerId, addrs, some(enr), protocols))
return ok(RemotePeerInfo.init(peerId, addrs, some(enrRec), protocols))
converter toRemotePeerInfo*(peerRecord: PeerRecord): RemotePeerInfo =
## Converts peer records to dialable RemotePeerInfo
@ -350,8 +350,8 @@ func hasUdpPort*(peer: RemotePeerInfo): bool =
return false
let
enr = peer.enr.get()
typedEnr = TypedRecord.fromRecord(enr)
enrRec = peer.enr.get()
typedEnr = enr.TypedRecord.fromRecord(enrRec)
typedEnr.udp.isSome() or typedEnr.udp6.isSome()
@ -361,3 +361,18 @@ proc getAgent*(peer: RemotePeerInfo): string =
return "unknown"
return peer.agent
proc getShards*(peer: RemotePeerInfo): seq[uint16] =
if peer.enr.isNone():
return @[]
let enrRec = peer.enr.get()
let typedRecord = enrRec.toTyped().valueOr:
trace "invalid ENR record", error = error
return @[]
let shards = typedRecord.relaySharding()
if shards.isSome():
return shards.get().shardIds
return @[]

View File

@ -8,7 +8,7 @@ import
eth/keys,
libp2p/[multiaddress, multicodec],
libp2p/crypto/crypto
import ../common/enr, ../waku_core
import ../common/enr, ../waku_core/topics/pubsub_topic
logScope:
topics = "waku enr sharding"

View File

@ -323,31 +323,42 @@ proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
proc getDHigh*(T: type WakuRelay): int =
return GossipsubParameters.dHigh
proc getPeersInMesh*(
proc getPubSubPeersInMesh*(
w: WakuRelay, pubsubTopic: PubsubTopic
): Result[seq[PeerId], string] =
## Returns the list of peerIds in a mesh defined by the passed pubsub topic.
): Result[HashSet[PubSubPeer], string] =
## Returns the list of PubSubPeers in a mesh defined by the passed pubsub topic.
## The 'mesh' atribute is defined in the GossipSub ref object.
if not w.mesh.hasKey(pubsubTopic):
debug "getPeersInMesh - there is no mesh peer for the given pubsub topic",
debug "getPubSubPeersInMesh - there is no mesh peer for the given pubsub topic",
pubsubTopic = pubsubTopic
return ok(newSeq[PeerId]())
return ok(initHashSet[PubSubPeer]())
let peersRes = catch:
w.mesh[pubsubTopic]
let peers: HashSet[PubSubPeer] = peersRes.valueOr:
return err("getPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg)
return err(
"getPubSubPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg
)
let peerIds = toSeq(peers).mapIt(it.peerId)
return ok(peers)
proc getPeersInMesh*(
w: WakuRelay, pubsubTopic: PubsubTopic
): Result[seq[PeerId], string] =
## Returns the list of peerIds in a mesh defined by the passed pubsub topic.
## The 'mesh' atribute is defined in the GossipSub ref object.
let pubSubPeers = w.getPubSubPeersInMesh(pubsubTopic).valueOr:
return err(error)
let peerIds = toSeq(pubSubPeers).mapIt(it.peerId)
return ok(peerIds)
proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] =
## Returns the number of peers in a mesh defined by the passed pubsub topic.
let peers = w.getPeersInMesh(pubsubTopic).valueOr:
let peers = w.getPubSubPeersInMesh(pubsubTopic).valueOr:
return err(
"getNumPeersInMesh - failed retrieving peers in mesh: " & pubsubTopic & ": " &
error
@ -557,18 +568,17 @@ proc publish*(
return ok(relayedPeerCount)
proc getConnectedPeers*(
proc getConnectedPubSubPeers*(
w: WakuRelay, pubsubTopic: PubsubTopic
): Result[seq[PeerId], string] =
): Result[HashSet[PubsubPeer], string] =
## Returns the list of peerIds of connected peers and subscribed to the passed pubsub topic.
## The 'gossipsub' atribute is defined in the GossipSub ref object.
if pubsubTopic == "":
## Return all the connected peers
var peerIds = newSeq[PeerId]()
var peerIds = initHashSet[PubsubPeer]()
for k, v in w.gossipsub:
peerIds.add(toSeq(v).mapIt(it.peerId))
# alternatively: peerIds &= toSeq(v).mapIt(it.peerId)
peerIds = peerIds + v
return ok(peerIds)
if not w.gossipsub.hasKey(pubsubTopic):
@ -584,6 +594,17 @@ proc getConnectedPeers*(
return
err("getConnectedPeers - exception accessing " & pubsubTopic & ": " & error.msg)
return ok(peers)
proc getConnectedPeers*(
w: WakuRelay, pubsubTopic: PubsubTopic
): Result[seq[PeerId], string] =
## Returns the list of peerIds of connected peers and subscribed to the passed pubsub topic.
## The 'gossipsub' atribute is defined in the GossipSub ref object.
let peers = w.getConnectedPubSubPeers(pubsubTopic).valueOr:
return err(error)
let peerIds = toSeq(peers).mapIt(it.peerId)
return ok(peerIds)
@ -593,7 +614,7 @@ proc getNumConnectedPeers*(
## Returns the number of connected peers and subscribed to the passed pubsub topic.
## Return all the connected peers
let peers = w.getConnectedPeers(pubsubTopic).valueOr:
let peers = w.getConnectedPubSubPeers(pubsubTopic).valueOr:
return err(
"getNumConnectedPeers - failed retrieving peers in mesh: " & pubsubTopic & ": " &
error