From ab8a30d3d61eb6bc83f69b759b0d5fc0b0fe5da2 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Thu, 24 Apr 2025 08:36:02 +0200 Subject: [PATCH] chore: extended /admin/v1 RESP API with different option to look at current connected/relay/mesh state of the node (#3382) * Extended /admin/v1 RESP API with different option to look at current connected/relay/mesh state of the node * Added score information for peer info retrievals --- tests/wakunode_rest/test_rest_admin.nim | 104 ++++++- waku/node/peer_manager/peer_manager.nim | 2 +- waku/waku_api/rest/admin/client.nim | 38 +++ waku/waku_api/rest/admin/handlers.nim | 351 +++++++++++++++++------- waku/waku_api/rest/admin/types.nim | 184 +++++++++---- waku/waku_api/rest/serdes.nim | 14 +- waku/waku_core/peers.nim | 31 ++- waku/waku_enr/sharding.nim | 2 +- waku/waku_relay/protocol.nim | 49 +++- 9 files changed, 589 insertions(+), 186 deletions(-) diff --git a/tests/wakunode_rest/test_rest_admin.nim b/tests/wakunode_rest/test_rest_admin.nim index 99ddacd8c..bdab61a75 100644 --- a/tests/wakunode_rest/test_rest_admin.nim +++ b/tests/wakunode_rest/test_rest_admin.nim @@ -1,11 +1,11 @@ {.used.} import - std/[sequtils, net], - stew/shims/net, + std/[sequtils, strformat, net], testutils/unittests, presto, presto/client as presto_client, + presto /../ tests/helpers, libp2p/crypto/crypto import @@ -43,10 +43,11 @@ suite "Waku v2 Rest API - Admin": node3 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60604)) await allFutures(node1.start(), node2.start(), node3.start()) + let shards = @[RelayShard(clusterId: 1, shardId: 0)] await allFutures( - node1.mountRelay(), - node2.mountRelay(), - node3.mountRelay(), + node1.mountRelay(shards = shards), + node2.mountRelay(shards = shards), + node3.mountRelay(shards = shards), node3.mountPeerExchange(), ) @@ -203,3 +204,96 @@ suite "Waku v2 Rest API - Admin": getRes.data.anyIt(it.origin == Discv5) # Check peer 3 getRes.data.anyIt(it.origin == PeerExchange) + + asyncTest "get peers by id": + # Connect to nodes 2 and 3 using the Admin API + let postRes = await client.postPeers( + @[constructMultiaddrStr(peerInfo2), constructMultiaddrStr(peerInfo3)] + ) + + check: + postRes.status == 200 + + let getRes = await client.getPeerById($peerInfo2.peerId) + + check: + getRes.status == 200 + $getRes.contentType == $MIMETYPE_JSON + getRes.data.protocols.find(WakuRelayCodec) >= 0 + getRes.data.multiaddr == constructMultiaddrStr(peerInfo2) + + ## nim-presto library's RestClient does not support text error case decode if + ## the RestResponse expects a JSON with complex type + # let getRes2 = await client.getPeerById("bad peer id") + let getRes2 = await httpClient( + restServer.httpServer.address, MethodGet, "/admin/v1/peer/bad+peer+id", "" + ) + check: + getRes2.status == 400 + getRes2.data == "Invalid argument:peerid: incorrect PeerId string" + + asyncTest "get connected peers": + # Connect to nodes 2 and 3 using the Admin API + let postRes = await client.postPeers( + @[constructMultiaddrStr(peerInfo2), constructMultiaddrStr(peerInfo3)] + ) + + check: + postRes.status == 200 + + let getRes = await client.getConnectedPeers() + + check: + getRes.status == 200 + $getRes.contentType == $MIMETYPE_JSON + getRes.data.len() == 2 + # Check peer 2 + getRes.data.anyIt(it.multiaddr == constructMultiaddrStr(peerInfo2)) + # Check peer 3 + getRes.data.anyIt(it.multiaddr == constructMultiaddrStr(peerInfo3)) + + # Seems shard info is not available in the peer manager + # let getRes2 = await client.getConnectedPeersByShard(0) + # check: + # getRes2.status == 200 + # $getRes2.contentType == $MIMETYPE_JSON + # getRes2.data.len() == 2 + + let getRes3 = await client.getConnectedPeersByShard(99) + check: + getRes3.status == 200 + $getRes3.contentType == $MIMETYPE_JSON + getRes3.data.len() == 0 + + asyncTest "get relay peers": + # Connect to nodes 2 and 3 using the Admin API + let postRes = await client.postPeers( + @[constructMultiaddrStr(peerInfo2), constructMultiaddrStr(peerInfo3)] + ) + + check: + postRes.status == 200 + + let getRes = await client.getConnectedRelayPeers() + + check: + getRes.status == 200 + $getRes.contentType == $MIMETYPE_JSON + require getRes.data.len() == 1 # Check peer 2 + check getRes.data[0].peers.anyIt(it.multiaddr == constructMultiaddrStr(peerInfo2)) + # Check peer 2 + check getRes.data[0].peers.anyIt(it.multiaddr == constructMultiaddrStr(peerInfo3)) + # Check peer 3 + + # Todo: investigate why the test setup missing remote peer's shard info + # let getRes2 = await client.getConnectedRelayPeersByShard(0) + # check: + # getRes2.status == 200 + # $getRes2.contentType == $MIMETYPE_JSON + # getRes2.data.peers.len() == 2 + + let getRes3 = await client.getConnectedRelayPeersByShard(99) + check: + getRes3.status == 200 + $getRes3.contentType == $MIMETYPE_JSON + getRes3.data.peers.len() == 0 diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 39baeea3e..602718d5d 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -154,7 +154,7 @@ proc addPeer*( pm.storage.insertOrReplace(remotePeerInfo) -proc getPeer(pm: PeerManager, peerId: PeerId): RemotePeerInfo = +proc getPeer*(pm: PeerManager, peerId: PeerId): RemotePeerInfo = return pm.switch.peerStore.getPeer(peerId) proc loadFromStorage(pm: PeerManager) {.gcsafe.} = diff --git a/waku/waku_api/rest/admin/client.nim b/waku/waku_api/rest/admin/client.nim index ebcebe965..4b46ca136 100644 --- a/waku/waku_api/rest/admin/client.nim +++ b/waku/waku_api/rest/admin/client.nim @@ -22,6 +22,44 @@ proc postPeers*( rest, endpoint: "/admin/v1/peers", meth: HttpMethod.MethodPost .} +proc getPeerById*( + peerId: string +): RestResponse[WakuPeer] {. + rest, endpoint: "/admin/v1/peer/{peerId}", meth: HttpMethod.MethodGet +.} + +proc getConnectedPeers*(): RestResponse[seq[WakuPeer]] {. + rest, endpoint: "/admin/v1/peers/connected", meth: HttpMethod.MethodGet +.} + +proc getConnectedPeersByShard*( + shardId: uint16 +): RestResponse[seq[WakuPeer]] {. + rest, endpoint: "/admin/v1/peers/connected/on/{shardId}", meth: HttpMethod.MethodGet +.} + +proc getConnectedRelayPeers*(): RestResponse[PeersOfShards] {. + rest, endpoint: "/admin/v1/peers/connected/relay", meth: HttpMethod.MethodGet +.} + +proc getConnectedRelayPeersByShard*( + shardId: uint16 +): RestResponse[PeersOfShard] {. + rest, + endpoint: "/admin/v1/peers/connected/relay/on/{shardId}", + meth: HttpMethod.MethodGet +.} + +proc getMeshPeers*(): RestResponse[PeersOfShards] {. + rest, endpoint: "/admin/v1/peers/mesh", meth: HttpMethod.MethodGet +.} + +proc getMeshPeersByShard*( + shardId: uint16 +): RestResponse[PeersOfShard] {. + rest, endpoint: "/admin/v1/peers/mesh/on/{shardId}", meth: HttpMethod.MethodGet +.} + proc getFilterSubscriptions*(): RestResponse[seq[FilterSubscription]] {. rest, endpoint: "/admin/v1/filter/subscriptions", meth: HttpMethod.MethodGet .} diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim index f2eb4a8ba..ada60e870 100644 --- a/waku/waku_api/rest/admin/handlers.nim +++ b/waku/waku_api/rest/admin/handlers.nim @@ -1,22 +1,26 @@ {.push raises: [].} import - std/[strformat, sequtils, tables], + std/[sets, strformat, sequtils, tables], chronicles, json_serialization, presto/route, - libp2p/[peerinfo, switch] + libp2p/[peerinfo, switch, peerid, protocols/pubsub/pubsubpeer] import - ../../../waku_core, - ../../../waku_store_legacy/common, - ../../../waku_store/common, - ../../../waku_filter_v2, - ../../../waku_lightpush_legacy/common, - ../../../waku_relay, - ../../../waku_peer_exchange, - ../../../waku_node, - ../../../node/peer_manager, + waku/[ + waku_core, + waku_core/topics/pubsub_topic, + waku_store_legacy/common, + waku_store/common, + waku_filter_v2, + waku_lightpush_legacy/common, + waku_relay, + waku_peer_exchange, + waku_node, + node/peer_manager, + waku_enr/sharding, + ], ../responses, ../serdes, ../rest_serdes, @@ -27,103 +31,260 @@ export types logScope: topics = "waku node rest admin api" -const ROUTE_ADMIN_V1_PEERS* = "/admin/v1/peers" +const ROUTE_ADMIN_V1_PEERS* = "/admin/v1/peers" # returns all peers +const ROUTE_ADMIN_V1_SINGLE_PEER* = "/admin/v1/peer/{peerId}" + +const ROUTE_ADMIN_V1_CONNECTED_PEERS* = "/admin/v1/peers/connected" +const ROUTE_ADMIN_V1_CONNECTED_PEERS_ON_SHARD* = + "/admin/v1/peers/connected/on/{shardId}" +const ROUTE_ADMIN_V1_CONNECTED_RELAY_PEERS* = "/admin/v1/peers/connected/relay" +const ROUTE_ADMIN_V1_CONNECTED_RELAY_PEERS_ON_SHARD* = + "/admin/v1/peers/connected/relay/on/{shardId}" +const ROUTE_ADMIN_V1_MESH_PEERS* = "/admin/v1/peers/mesh" +const ROUTE_ADMIN_V1_MESH_PEERS_ON_SHARD* = "/admin/v1/peers/mesh/on/{shardId}" + const ROUTE_ADMIN_V1_FILTER_SUBS* = "/admin/v1/filter/subscriptions" type PeerProtocolTuple = - tuple[multiaddr: string, protocol: string, connected: bool, origin: PeerOrigin] + tuple[ + multiaddr: string, + protocol: string, + shards: seq[uint16], + connected: Connectedness, + agent: string, + origin: PeerOrigin, + ] proc tuplesToWakuPeers(peers: var WakuPeers, peersTup: seq[PeerProtocolTuple]) = for peer in peersTup: - peers.add(peer.multiaddr, peer.protocol, peer.connected, peer.origin) + peers.add( + peer.multiaddr, peer.protocol, peer.shards, peer.connected, peer.agent, + peer.origin, + ) + +proc populateAdminPeerInfo(peers: var WakuPeers, node: WakuNode, codec: string) = + let peersForCodec = node.peerManager.switch.peerStore.peers(codec).mapIt( + ( + multiaddr: constructMultiaddrStr(it), + protocol: codec, + shards: it.getShards(), + connected: it.connectedness, + agent: it.agent, + origin: it.origin, + ) + ) + tuplesToWakuPeers(peers, peersForCodec) + +proc populateAdminPeerInfoForCodecs(node: WakuNode, codecs: seq[string]): WakuPeers = + var peers: WakuPeers = @[] + + for codec in codecs: + populateAdminPeerInfo(peers, node, codec) + + return peers proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = router.api(MethodGet, ROUTE_ADMIN_V1_PEERS) do() -> RestApiResponse: - var peers: WakuPeers = @[] - - let relayPeers = node.peerManager.switch.peerStore.peers(WakuRelayCodec).mapIt( - ( - multiaddr: constructMultiaddrStr(it), - protocol: WakuRelayCodec, - connected: it.connectedness == Connectedness.Connected, - origin: it.origin, - ) - ) - tuplesToWakuPeers(peers, relayPeers) - - let filterV2Peers = node.peerManager.switch.peerStore - .peers(WakuFilterSubscribeCodec) - .mapIt( - ( - multiaddr: constructMultiaddrStr(it), - protocol: WakuFilterSubscribeCodec, - connected: it.connectedness == Connectedness.Connected, - origin: it.origin, - ) - ) - tuplesToWakuPeers(peers, filterV2Peers) - - let storePeers = node.peerManager.switch.peerStore.peers(WakuStoreCodec).mapIt( - ( - multiaddr: constructMultiaddrStr(it), - protocol: WakuStoreCodec, - connected: it.connectedness == Connectedness.Connected, - origin: it.origin, - ) - ) - tuplesToWakuPeers(peers, storePeers) - - let legacyStorePeers = node.peerManager.switch.peerStore - .peers(WakuLegacyStoreCodec) - .mapIt( - ( - multiaddr: constructMultiaddrStr(it), - protocol: WakuLegacyStoreCodec, - connected: it.connectedness == Connectedness.Connected, - origin: it.origin, - ) - ) - tuplesToWakuPeers(peers, legacyStorePeers) - - let legacyLightpushPeers = node.peerManager.switch.peerStore - .peers(WakuLegacyLightPushCodec) - .mapIt( - ( - multiaddr: constructMultiaddrStr(it), - protocol: WakuLegacyLightPushCodec, - connected: it.connectedness == Connectedness.Connected, - origin: it.origin, - ) - ) - tuplesToWakuPeers(peers, legacyLightpushPeers) - - let lightpushPeers = node.peerManager.switch.peerStore - .peers(WakuLightPushCodec) - .mapIt( - ( - multiaddr: constructMultiaddrStr(it), - protocol: WakuLightPushCodec, - connected: it.connectedness == Connectedness.Connected, - origin: it.origin, - ) - ) - tuplesToWakuPeers(peers, lightpushPeers) - - let pxPeers = node.peerManager.switch.peerStore.peers(WakuPeerExchangeCodec).mapIt( - ( - multiaddr: constructMultiaddrStr(it), - protocol: WakuPeerExchangeCodec, - connected: it.connectedness == Connectedness.Connected, - origin: it.origin, - ) - ) - tuplesToWakuPeers(peers, pxPeers) + let peers = populateAdminPeerInfoForCodecs( + node, + @[ + WakuRelayCodec, WakuFilterSubscribeCodec, WakuStoreCodec, WakuLegacyStoreCodec, + WakuLegacyLightPushCodec, WakuLightPushCodec, WakuPeerExchangeCodec, + WakuReconciliationCodec, + ], + ) let resp = RestApiResponse.jsonResponse(peers, status = Http200) if resp.isErr(): - error "An error ocurred while building the json respose: ", error = resp.error + error "An error occurred while building the json response: ", error = resp.error return RestApiResponse.internalServerError( - fmt("An error ocurred while building the json respose: {resp.error}") + fmt("An error occurred while building the json response: {resp.error}") + ) + + return resp.get() + + router.api(MethodGet, ROUTE_ADMIN_V1_SINGLE_PEER) do( + peerId: string + ) -> RestApiResponse: + let peerIdString = peerId.valueOr: + return RestApiResponse.badRequest("Invalid argument:" & $error) + + let peerIdVal: PeerId = PeerId.init(peerIdString).valueOr: + return RestApiResponse.badRequest("Invalid argument:" & $error) + + if node.peerManager.switch.peerStore.peerExists(peerIdVal): + let peerInfo = node.peerManager.switch.peerStore.getPeer(peerIdVal) + let peer = WakuPeer.init(peerInfo) + let resp = RestApiResponse.jsonResponse(peer, status = Http200) + if resp.isErr(): + error "An error occurred while building the json response: ", error = resp.error + return RestApiResponse.internalServerError( + fmt("An error occurred while building the json response: {resp.error}") + ) + + return resp.get() + else: + return RestApiResponse.notFound(fmt("Peer with ID {peerId} not found")) + + router.api(MethodGet, ROUTE_ADMIN_V1_CONNECTED_PEERS) do() -> RestApiResponse: + let allPeers = populateAdminPeerInfoForCodecs( + node, + @[ + WakuRelayCodec, WakuFilterSubscribeCodec, WakuStoreCodec, WakuLegacyStoreCodec, + WakuLegacyLightPushCodec, WakuLightPushCodec, WakuPeerExchangeCodec, + WakuReconciliationCodec, + ], + ) + + let connectedPeers = allPeers.filterIt(it.connected == Connectedness.Connected) + + let resp = RestApiResponse.jsonResponse(connectedPeers, status = Http200) + if resp.isErr(): + error "An error occurred while building the json response: ", error = resp.error + return RestApiResponse.internalServerError( + fmt("An error occurred while building the json response: {resp.error}") + ) + + return resp.get() + + router.api(MethodGet, ROUTE_ADMIN_V1_CONNECTED_PEERS_ON_SHARD) do( + shardId: uint16 + ) -> RestApiResponse: + let shard = shardId.valueOr: + return RestApiResponse.badRequest(fmt("Invalid shardId: {error}")) + + let allPeers = populateAdminPeerInfoForCodecs( + node, + @[ + WakuRelayCodec, WakuFilterSubscribeCodec, WakuStoreCodec, WakuLegacyStoreCodec, + WakuLegacyLightPushCodec, WakuLightPushCodec, WakuPeerExchangeCodec, + WakuReconciliationCodec, + ], + ) + + let connectedPeers = allPeers.filterIt( + it.connected == Connectedness.Connected and it.shards.contains(shard) + ) + + let resp = RestApiResponse.jsonResponse(connectedPeers, status = Http200) + if resp.isErr(): + error "An error occurred while building the json response: ", error = resp.error + return RestApiResponse.internalServerError( + fmt("An error occurred while building the json response: {resp.error}") + ) + + return resp.get() + + router.api(MethodGet, ROUTE_ADMIN_V1_CONNECTED_RELAY_PEERS) do() -> RestApiResponse: + if node.wakuRelay.isNil(): + return RestApiResponse.serviceUnavailable( + "Error: Relay Protocol is not mounted to the node" + ) + + var relayPeers: PeersOfShards = @[] + for topic in node.wakuRelay.getSubscribedTopics(): + let relayShard = RelayShard.parse(topic).valueOr: + error "Invalid subscribed topic", error = error, topic = topic + continue + let pubsubPeers = + node.wakuRelay.getConnectedPubSubPeers(topic).get(initHashSet[PubSubPeer](0)) + relayPeers.add( + PeersOfShard( + shard: relayShard.shardId, + peers: toSeq(pubsubPeers).mapIt(WakuPeer.init(it, node.peerManager)), + ) + ) + + let resp = RestApiResponse.jsonResponse(relayPeers, status = Http200) + if resp.isErr(): + error "An error occurred while building the json response: ", error = resp.error + return RestApiResponse.internalServerError( + fmt("An error occurred while building the json response: {resp.error}") + ) + + return resp.get() + + router.api(MethodGet, ROUTE_ADMIN_V1_CONNECTED_RELAY_PEERS_ON_SHARD) do( + shardId: uint16 + ) -> RestApiResponse: + let shard = shardId.valueOr: + return RestApiResponse.badRequest(fmt("Invalid shardId: {error}")) + + if node.wakuRelay.isNil(): + return RestApiResponse.serviceUnavailable( + "Error: Relay Protocol is not mounted to the node" + ) + + let topic = + toPubsubTopic(RelayShard(clusterId: node.wakuSharding.clusterId, shardId: shard)) + let pubsubPeers = + node.wakuRelay.getConnectedPubSubPeers(topic).get(initHashSet[PubSubPeer](0)) + let relayPeer = PeersOfShard( + shard: shard, peers: toSeq(pubsubPeers).mapIt(WakuPeer.init(it, node.peerManager)) + ) + + let resp = RestApiResponse.jsonResponse(relayPeer, status = Http200) + if resp.isErr(): + error "An error occurred while building the json response: ", error = resp.error + return RestApiResponse.internalServerError( + fmt("An error occurred while building the json response: {resp.error}") + ) + + return resp.get() + + router.api(MethodGet, ROUTE_ADMIN_V1_MESH_PEERS) do() -> RestApiResponse: + if node.wakuRelay.isNil(): + return RestApiResponse.serviceUnavailable( + "Error: Relay Protocol is not mounted to the node" + ) + + var relayPeers: PeersOfShards = @[] + for topic in node.wakuRelay.getSubscribedTopics(): + let relayShard = RelayShard.parse(topic).valueOr: + error "Invalid subscribed topic", error = error, topic = topic + continue + let peers = + node.wakuRelay.getPubSubPeersInMesh(topic).get(initHashSet[PubSubPeer](0)) + relayPeers.add( + PeersOfShard( + shard: relayShard.shardId, + peers: toSeq(peers).mapIt(WakuPeer.init(it, node.peerManager)), + ) + ) + + let resp = RestApiResponse.jsonResponse(relayPeers, status = Http200) + if resp.isErr(): + error "An error occurred while building the json response: ", error = resp.error + return RestApiResponse.internalServerError( + fmt("An error occurred while building the json response: {resp.error}") + ) + + return resp.get() + + router.api(MethodGet, ROUTE_ADMIN_V1_MESH_PEERS_ON_SHARD) do( + shardId: uint16 + ) -> RestApiResponse: + let shard = shardId.valueOr: + return RestApiResponse.badRequest(fmt("Invalid shardId: {error}")) + + if node.wakuRelay.isNil(): + return RestApiResponse.serviceUnavailable( + "Error: Relay Protocol is not mounted to the node" + ) + + let topic = + toPubsubTopic(RelayShard(clusterId: node.wakuSharding.clusterId, shardId: shard)) + let peers = + node.wakuRelay.getPubSubPeersInMesh(topic).get(initHashSet[PubSubPeer](0)) + let relayPeer = PeersOfShard( + shard: shard, peers: toSeq(peers).mapIt(WakuPeer.init(it, node.peerManager)) + ) + + let resp = RestApiResponse.jsonResponse(relayPeer, status = Http200) + if resp.isErr(): + error "An error occurred while building the json response: ", error = resp.error + return RestApiResponse.internalServerError( + fmt("An error occurred while building the json response: {resp.error}") ) return resp.get() diff --git a/waku/waku_api/rest/admin/types.nim b/waku/waku_api/rest/admin/types.nim index bb7dd2b0c..0c0786e3d 100644 --- a/waku/waku_api/rest/admin/types.nim +++ b/waku/waku_api/rest/admin/types.nim @@ -4,22 +4,29 @@ import chronicles, json_serialization, json_serialization/std/options, - json_serialization/lexer -import ../serdes, ../../../waku_core + json_serialization/lexer, + results, + libp2p/protocols/pubsub/pubsubpeer +import waku/[waku_core, node/peer_manager], ../serdes #### Types - -type ProtocolState* = object - protocol*: string - connected*: bool - type WakuPeer* = object multiaddr*: string - protocols*: seq[ProtocolState] + protocols*: seq[string] + shards*: seq[uint16] + connected*: Connectedness + agent*: string origin*: PeerOrigin + score*: Option[float64] type WakuPeers* = seq[WakuPeer] +type PeersOfShard* = object + shard*: uint16 + peers*: WakuPeers + +type PeersOfShards* = seq[PeersOfShard] + type FilterTopic* = object pubsubTopic*: string contentTopic*: string @@ -29,22 +36,25 @@ type FilterSubscription* = object filterCriteria*: seq[FilterTopic] #### Serialization and deserialization - -proc writeValue*( - writer: var JsonWriter[RestJson], value: ProtocolState -) {.raises: [IOError].} = - writer.beginRecord() - writer.writeField("protocol", value.protocol) - writer.writeField("connected", value.connected) - writer.endRecord() - proc writeValue*( writer: var JsonWriter[RestJson], value: WakuPeer ) {.raises: [IOError].} = writer.beginRecord() writer.writeField("multiaddr", value.multiaddr) writer.writeField("protocols", value.protocols) + writer.writeField("shards", value.shards) + writer.writeField("connected", value.connected) + writer.writeField("agent", value.agent) writer.writeField("origin", value.origin) + writer.writeField("score", value.score) + writer.endRecord() + +proc writeValue*( + writer: var JsonWriter[RestJson], value: PeersOfShard +) {.raises: [IOError].} = + writer.beginRecord() + writer.writeField("shard", value.shard) + writer.writeField("peers", value.peers) writer.endRecord() proc writeValue*( @@ -63,43 +73,17 @@ proc writeValue*( writer.writeField("filterCriteria", value.filterCriteria) writer.endRecord() -proc readValue*( - reader: var JsonReader[RestJson], value: var ProtocolState -) {.gcsafe, raises: [SerializationError, IOError].} = - var - protocol: Option[string] - connected: Option[bool] - - for fieldName in readObjectFields(reader): - case fieldName - of "protocol": - if protocol.isSome(): - reader.raiseUnexpectedField("Multiple `protocol` fields found", "ProtocolState") - protocol = some(reader.readValue(string)) - of "connected": - if connected.isSome(): - reader.raiseUnexpectedField( - "Multiple `connected` fields found", "ProtocolState" - ) - connected = some(reader.readValue(bool)) - else: - unrecognizedFieldWarning(value) - - if connected.isNone(): - reader.raiseUnexpectedValue("Field `connected` is missing") - - if protocol.isNone(): - reader.raiseUnexpectedValue("Field `protocol` is missing") - - value = ProtocolState(protocol: protocol.get(), connected: connected.get()) - proc readValue*( reader: var JsonReader[RestJson], value: var WakuPeer ) {.gcsafe, raises: [SerializationError, IOError].} = var multiaddr: Option[string] - protocols: Option[seq[ProtocolState]] + protocols: Option[seq[string]] + shards: Option[seq[uint16]] + connected: Option[Connectedness] + agent: Option[string] origin: Option[PeerOrigin] + score: Option[float64] for fieldName in readObjectFields(reader): case fieldName @@ -110,11 +94,27 @@ proc readValue*( of "protocols": if protocols.isSome(): reader.raiseUnexpectedField("Multiple `protocols` fields found", "WakuPeer") - protocols = some(reader.readValue(seq[ProtocolState])) + protocols = some(reader.readValue(seq[string])) + of "shards": + if shards.isSome(): + reader.raiseUnexpectedField("Multiple `shards` fields found", "WakuPeer") + shards = some(reader.readValue(seq[uint16])) + of "connected": + if connected.isSome(): + reader.raiseUnexpectedField("Multiple `connected` fields found", "WakuPeer") + connected = some(reader.readValue(Connectedness)) + of "agent": + if agent.isSome(): + reader.raiseUnexpectedField("Multiple `agent` fields found", "WakuPeer") + agent = some(reader.readValue(string)) of "origin": if origin.isSome(): reader.raiseUnexpectedField("Multiple `origin` fields found", "WakuPeer") origin = some(reader.readValue(PeerOrigin)) + of "score": + if score.isSome(): + reader.raiseUnexpectedField("Multiple `score` fields found", "WakuPeer") + score = some(reader.readValue(float64)) else: unrecognizedFieldWarning(value) @@ -124,13 +124,56 @@ proc readValue*( if protocols.isNone(): reader.raiseUnexpectedValue("Field `protocols` are missing") + if shards.isNone(): + reader.raiseUnexpectedValue("Field `shards` is missing") + + if connected.isNone(): + reader.raiseUnexpectedValue("Field `connected` is missing") + + if agent.isNone(): + reader.raiseUnexpectedValue("Field `agent` is missing") + if origin.isNone(): reader.raiseUnexpectedValue("Field `origin` is missing") value = WakuPeer( - multiaddr: multiaddr.get(), protocols: protocols.get(), origin: origin.get() + multiaddr: multiaddr.get(), + protocols: protocols.get(), + shards: shards.get(), + connected: connected.get(), + agent: agent.get(), + origin: origin.get(), + score: score, ) +proc readValue*( + reader: var JsonReader[RestJson], value: var PeersOfShard +) {.gcsafe, raises: [SerializationError, IOError].} = + var + shard: Option[uint16] + peers: Option[WakuPeers] + + for fieldName in readObjectFields(reader): + case fieldName + of "shard": + if shard.isSome(): + reader.raiseUnexpectedField("Multiple `shard` fields found", "PeersOfShard") + shard = some(reader.readValue(uint16)) + of "peers": + if peers.isSome(): + reader.raiseUnexpectedField("Multiple `peers` fields found", "PeersOfShard") + peers = some(reader.readValue(WakuPeers)) + else: + unrecognizedFieldWarning(value) + + if shard.isNone(): + reader.raiseUnexpectedValue("Field `shard` is missing") + + if peers.isNone(): + reader.raiseUnexpectedValue("Field `peers` are missing") + + value = PeersOfShard(shard: shard.get(), peers: peers.get()) + proc readValue*( reader: var JsonReader[RestJson], value: var FilterTopic ) {.gcsafe, raises: [SerializationError, IOError].} = @@ -195,26 +238,47 @@ proc readValue*( value = FilterSubscription(peerId: peerId.get(), filterCriteria: filterCriteria.get()) -## Utility for populating WakuPeers and ProtocolState -func `==`*(a, b: ProtocolState): bool {.inline.} = - return a.protocol == b.protocol - -func `==`*(a: ProtocolState, b: string): bool {.inline.} = - return a.protocol == b - func `==`*(a, b: WakuPeer): bool {.inline.} = return a.multiaddr == b.multiaddr +proc init*(T: type WakuPeer, peerInfo: RemotePeerInfo): WakuPeer = + result = WakuPeer( + multiaddr: constructMultiaddrStr(peerInfo), + protocols: peerInfo.protocols, + shards: peerInfo.getShards(), + connected: peerInfo.connectedness, + agent: peerInfo.agent, + origin: peerInfo.origin, + score: none(float64), + ) + +proc init*(T: type WakuPeer, pubsubPeer: PubSubPeer, pm: PeerManager): WakuPeer = + let peerInfo = pm.getPeer(pubsubPeer.peerId) + result = WakuPeer( + multiaddr: constructMultiaddrStr(peerInfo), + protocols: peerInfo.protocols, + shards: peerInfo.getShards(), + connected: peerInfo.connectedness, + agent: peerInfo.agent, + origin: peerInfo.origin, + score: some(pubsubPeer.score), + ) + proc add*( peers: var WakuPeers, multiaddr: string, protocol: string, - connected: bool, + shards: seq[uint16], + connected: Connectedness, + agent: string, origin: PeerOrigin, ) = var peer: WakuPeer = WakuPeer( multiaddr: multiaddr, - protocols: @[ProtocolState(protocol: protocol, connected: connected)], + protocols: @[protocol], + shards: shards, + connected: connected, + agent: agent, origin: origin, ) let idx = peers.find(peer) @@ -222,4 +286,4 @@ proc add*( if idx < 0: peers.add(peer) else: - peers[idx].protocols.add(ProtocolState(protocol: protocol, connected: connected)) + peers[idx].protocols.add(protocol) diff --git a/waku/waku_api/rest/serdes.nim b/waku/waku_api/rest/serdes.nim index eb6bc1545..d54d17e78 100644 --- a/waku/waku_api/rest/serdes.nim +++ b/waku/waku_api/rest/serdes.nim @@ -1,9 +1,9 @@ {.push raises: [].} import - std/typetraits, + std/[typetraits, parseutils], results, - stew/byteutils, + stew/[byteutils, base10], chronicles, serialization, json_serialization, @@ -100,3 +100,13 @@ proc encodeString*(value: string): RestResult[string] = proc decodeString*(t: typedesc[string], value: string): RestResult[string] = ok(value) + +proc encodeString*(value: SomeUnsignedInt): RestResult[string] = + ok(Base10.toString(value)) + +proc decodeString*(T: typedesc[SomeUnsignedInt], value: string): RestResult[T] = + let v = Base10.decode(T, value) + if v.isErr(): + return err(v.error()) + else: + return ok(v.get()) diff --git a/waku/waku_core/peers.nim b/waku/waku_core/peers.nim index fdd3d7948..883f266bd 100644 --- a/waku/waku_core/peers.nim +++ b/waku/waku_core/peers.nim @@ -18,7 +18,7 @@ import libp2p/routing_record, regex, json_serialization -import ../waku_enr/capabilities +import ../waku_enr type Connectedness* = enum @@ -231,7 +231,7 @@ proc parsePeerInfo*(maddrs: varargs[string]): Result[RemotePeerInfo, string] = parsePeerInfo(multiAddresses) -func getTransportProtocol(typedR: TypedRecord): Option[IpTransportProtocol] = +func getTransportProtocol(typedR: enr.TypedRecord): Option[IpTransportProtocol] = if typedR.tcp6.isSome() or typedR.tcp.isSome(): return some(IpTransportProtocol.tcpProtocol) @@ -255,9 +255,9 @@ proc parseUrlPeerAddr*( return ok(some(parsedPeerInfo.value)) -proc toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] = +proc toRemotePeerInfo*(enrRec: enr.Record): Result[RemotePeerInfo, cstring] = ## Converts an ENR to dialable RemotePeerInfo - let typedR = TypedRecord.fromRecord(enr) + let typedR = enr.TypedRecord.fromRecord(enrRec) if not typedR.secp256k1.isSome(): return err("enr: no secp256k1 key in record") @@ -303,7 +303,7 @@ proc toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] = return err("enr: no addresses in record") let protocolsRes = catch: - enr.getCapabilitiesCodecs() + enrRec.getCapabilitiesCodecs() var protocols: seq[string] if not protocolsRes.isErr(): @@ -312,7 +312,7 @@ proc toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] = error "Could not retrieve supported protocols from enr", peerId = peerId, msg = protocolsRes.error.msg - return ok(RemotePeerInfo.init(peerId, addrs, some(enr), protocols)) + return ok(RemotePeerInfo.init(peerId, addrs, some(enrRec), protocols)) converter toRemotePeerInfo*(peerRecord: PeerRecord): RemotePeerInfo = ## Converts peer records to dialable RemotePeerInfo @@ -350,8 +350,8 @@ func hasUdpPort*(peer: RemotePeerInfo): bool = return false let - enr = peer.enr.get() - typedEnr = TypedRecord.fromRecord(enr) + enrRec = peer.enr.get() + typedEnr = enr.TypedRecord.fromRecord(enrRec) typedEnr.udp.isSome() or typedEnr.udp6.isSome() @@ -361,3 +361,18 @@ proc getAgent*(peer: RemotePeerInfo): string = return "unknown" return peer.agent + +proc getShards*(peer: RemotePeerInfo): seq[uint16] = + if peer.enr.isNone(): + return @[] + + let enrRec = peer.enr.get() + let typedRecord = enrRec.toTyped().valueOr: + trace "invalid ENR record", error = error + return @[] + + let shards = typedRecord.relaySharding() + if shards.isSome(): + return shards.get().shardIds + + return @[] diff --git a/waku/waku_enr/sharding.nim b/waku/waku_enr/sharding.nim index 88dc4e200..4ee77bf96 100644 --- a/waku/waku_enr/sharding.nim +++ b/waku/waku_enr/sharding.nim @@ -8,7 +8,7 @@ import eth/keys, libp2p/[multiaddress, multicodec], libp2p/crypto/crypto -import ../common/enr, ../waku_core +import ../common/enr, ../waku_core/topics/pubsub_topic logScope: topics = "waku enr sharding" diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 1698fac70..4eeaf4607 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -323,31 +323,42 @@ proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} = proc getDHigh*(T: type WakuRelay): int = return GossipsubParameters.dHigh -proc getPeersInMesh*( +proc getPubSubPeersInMesh*( w: WakuRelay, pubsubTopic: PubsubTopic -): Result[seq[PeerId], string] = - ## Returns the list of peerIds in a mesh defined by the passed pubsub topic. +): Result[HashSet[PubSubPeer], string] = + ## Returns the list of PubSubPeers in a mesh defined by the passed pubsub topic. ## The 'mesh' atribute is defined in the GossipSub ref object. if not w.mesh.hasKey(pubsubTopic): - debug "getPeersInMesh - there is no mesh peer for the given pubsub topic", + debug "getPubSubPeersInMesh - there is no mesh peer for the given pubsub topic", pubsubTopic = pubsubTopic - return ok(newSeq[PeerId]()) + return ok(initHashSet[PubSubPeer]()) let peersRes = catch: w.mesh[pubsubTopic] let peers: HashSet[PubSubPeer] = peersRes.valueOr: - return err("getPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg) + return err( + "getPubSubPeersInMesh - exception accessing " & pubsubTopic & ": " & error.msg + ) - let peerIds = toSeq(peers).mapIt(it.peerId) + return ok(peers) + +proc getPeersInMesh*( + w: WakuRelay, pubsubTopic: PubsubTopic +): Result[seq[PeerId], string] = + ## Returns the list of peerIds in a mesh defined by the passed pubsub topic. + ## The 'mesh' atribute is defined in the GossipSub ref object. + let pubSubPeers = w.getPubSubPeersInMesh(pubsubTopic).valueOr: + return err(error) + let peerIds = toSeq(pubSubPeers).mapIt(it.peerId) return ok(peerIds) proc getNumPeersInMesh*(w: WakuRelay, pubsubTopic: PubsubTopic): Result[int, string] = ## Returns the number of peers in a mesh defined by the passed pubsub topic. - let peers = w.getPeersInMesh(pubsubTopic).valueOr: + let peers = w.getPubSubPeersInMesh(pubsubTopic).valueOr: return err( "getNumPeersInMesh - failed retrieving peers in mesh: " & pubsubTopic & ": " & error @@ -557,18 +568,17 @@ proc publish*( return ok(relayedPeerCount) -proc getConnectedPeers*( +proc getConnectedPubSubPeers*( w: WakuRelay, pubsubTopic: PubsubTopic -): Result[seq[PeerId], string] = +): Result[HashSet[PubsubPeer], string] = ## Returns the list of peerIds of connected peers and subscribed to the passed pubsub topic. ## The 'gossipsub' atribute is defined in the GossipSub ref object. if pubsubTopic == "": ## Return all the connected peers - var peerIds = newSeq[PeerId]() + var peerIds = initHashSet[PubsubPeer]() for k, v in w.gossipsub: - peerIds.add(toSeq(v).mapIt(it.peerId)) - # alternatively: peerIds &= toSeq(v).mapIt(it.peerId) + peerIds = peerIds + v return ok(peerIds) if not w.gossipsub.hasKey(pubsubTopic): @@ -584,6 +594,17 @@ proc getConnectedPeers*( return err("getConnectedPeers - exception accessing " & pubsubTopic & ": " & error.msg) + return ok(peers) + +proc getConnectedPeers*( + w: WakuRelay, pubsubTopic: PubsubTopic +): Result[seq[PeerId], string] = + ## Returns the list of peerIds of connected peers and subscribed to the passed pubsub topic. + ## The 'gossipsub' atribute is defined in the GossipSub ref object. + + let peers = w.getConnectedPubSubPeers(pubsubTopic).valueOr: + return err(error) + let peerIds = toSeq(peers).mapIt(it.peerId) return ok(peerIds) @@ -593,7 +614,7 @@ proc getNumConnectedPeers*( ## Returns the number of connected peers and subscribed to the passed pubsub topic. ## Return all the connected peers - let peers = w.getConnectedPeers(pubsubTopic).valueOr: + let peers = w.getConnectedPubSubPeers(pubsubTopic).valueOr: return err( "getNumConnectedPeers - failed retrieving peers in mesh: " & pubsubTopic & ": " & error