From f4ad7a332e0012c7d3432b5b9eab56e0afa7a305 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Tue, 10 Jun 2025 02:10:06 +0200 Subject: [PATCH] chore: Added extra debug helper via getting peer statistics (#3443) * Added extra debug helper via getting peer statistics on new /admin/v1/peers/stats endpoint * Add /admin/v1/peers/stats client part * Address review, change protocol names to codec string * fix formatting --- waku/waku_api/rest/admin/client.nim | 4 + waku/waku_api/rest/admin/handlers.nim | 135 ++++++++++++++++++++------ waku/waku_api/rest/admin/types.nim | 35 +++++++ 3 files changed, 147 insertions(+), 27 deletions(-) diff --git a/waku/waku_api/rest/admin/client.nim b/waku/waku_api/rest/admin/client.nim index 7d45544e2..87d46dd3d 100644 --- a/waku/waku_api/rest/admin/client.nim +++ b/waku/waku_api/rest/admin/client.nim @@ -62,6 +62,10 @@ proc getMeshPeersByShard*( rest, endpoint: "/admin/v1/peers/mesh/on/{shardId}", meth: HttpMethod.MethodGet .} +proc getPeersStats*(): RestResponse[PeerStats] {. + rest, endpoint: "/admin/v1/peers/stats", meth: HttpMethod.MethodGet +.} + proc getFilterSubscriptions*(): RestResponse[seq[FilterSubscription]] {. rest, endpoint: "/admin/v1/filter/subscriptions", meth: HttpMethod.MethodGet .} diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim index 9cf6ec131..ba401de25 100644 --- a/waku/waku_api/rest/admin/handlers.nim +++ b/waku/waku_api/rest/admin/handlers.nim @@ -31,6 +31,8 @@ export types logScope: topics = "waku node rest admin api" +const ROUTE_ADMIN_V1_PEERS_STATS* = "/admin/v1/peers/stats" # provides peer statistics + const ROUTE_ADMIN_V1_PEERS* = "/admin/v1/peers" # returns all peers const ROUTE_ADMIN_V1_SINGLE_PEER* = "/admin/v1/peer/{peerId}" @@ -94,6 +96,40 @@ proc populateAdminPeerInfoForCodecs(node: WakuNode, codecs: seq[string]): WakuPe return peers +proc getRelayPeers(node: WakuNode): PeersOfShards = + var relayPeers: PeersOfShards = @[] + if not node.wakuRelay.isNil(): + for topic in node.wakuRelay.getSubscribedTopics(): + let relayShard = RelayShard.parse(topic).valueOr: + error "Invalid subscribed topic", error = error, topic = topic + continue + let pubsubPeers = + node.wakuRelay.getConnectedPubSubPeers(topic).get(initHashSet[PubSubPeer](0)) + relayPeers.add( + PeersOfShard( + shard: relayShard.shardId, + peers: toSeq(pubsubPeers).mapIt(WakuPeer.init(it, node.peerManager)), + ) + ) + return relayPeers + +proc getMeshPeers(node: WakuNode): PeersOfShards = + var meshPeers: PeersOfShards = @[] + if not node.wakuRelay.isNil(): + for topic in node.wakuRelay.getSubscribedTopics(): + let relayShard = RelayShard.parse(topic).valueOr: + error "Invalid subscribed topic", error = error, topic = topic + continue + let peers = + node.wakuRelay.getPubSubPeersInMesh(topic).get(initHashSet[PubSubPeer](0)) + meshPeers.add( + PeersOfShard( + shard: relayShard.shardId, + peers: toSeq(peers).mapIt(WakuPeer.init(it, node.peerManager)), + ) + ) + return meshPeers + proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = router.api(MethodGet, ROUTE_ADMIN_V1_PEERS) do() -> RestApiResponse: let peers = populateAdminPeerInfoForAll(node) @@ -185,19 +221,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = "Error: Relay Protocol is not mounted to the node" ) - var relayPeers: PeersOfShards = @[] - for topic in node.wakuRelay.getSubscribedTopics(): - let relayShard = RelayShard.parse(topic).valueOr: - error "Invalid subscribed topic", error = error, topic = topic - continue - let pubsubPeers = - node.wakuRelay.getConnectedPubSubPeers(topic).get(initHashSet[PubSubPeer](0)) - relayPeers.add( - PeersOfShard( - shard: relayShard.shardId, - peers: toSeq(pubsubPeers).mapIt(WakuPeer.init(it, node.peerManager)), - ) - ) + var relayPeers: PeersOfShards = getRelayPeers(node) let resp = RestApiResponse.jsonResponse(relayPeers, status = Http200).valueOr: error "An error occurred while building the json response: ", error = error @@ -240,21 +264,9 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = "Error: Relay Protocol is not mounted to the node" ) - var relayPeers: PeersOfShards = @[] - for topic in node.wakuRelay.getSubscribedTopics(): - let relayShard = RelayShard.parse(topic).valueOr: - error "Invalid subscribed topic", error = error, topic = topic - continue - let peers = - node.wakuRelay.getPubSubPeersInMesh(topic).get(initHashSet[PubSubPeer](0)) - relayPeers.add( - PeersOfShard( - shard: relayShard.shardId, - peers: toSeq(peers).mapIt(WakuPeer.init(it, node.peerManager)), - ) - ) + var meshPeers: PeersOfShards = getMeshPeers(node) - let resp = RestApiResponse.jsonResponse(relayPeers, status = Http200).valueOr: + let resp = RestApiResponse.jsonResponse(meshPeers, status = Http200).valueOr: error "An error occurred while building the json response: ", error = error return RestApiResponse.internalServerError( fmt("An error occurred while building the json response: {error}") @@ -289,6 +301,75 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = return resp + router.api(MethodGet, ROUTE_ADMIN_V1_PEERS_STATS) do() -> RestApiResponse: + let peers = populateAdminPeerInfoForAll(node) + + var stats: PeerStats = initOrderedTable[string, OrderedTable[string, int]]() + + stats["Sum"] = {"Total peers": peers.len()}.toOrderedTable() + + # stats of connectedness + var connectednessStats = initOrderedTable[string, int]() + connectednessStats[$Connectedness.Connected] = + peers.countIt(it.connected == Connectedness.Connected) + connectednessStats[$Connectedness.NotConnected] = + peers.countIt(it.connected == Connectedness.NotConnected) + connectednessStats[$Connectedness.CannotConnect] = + peers.countIt(it.connected == Connectedness.CannotConnect) + connectednessStats[$Connectedness.CanConnect] = + peers.countIt(it.connected == Connectedness.CanConnect) + stats["By Connectedness"] = connectednessStats + + # stats of relay peers + var totalRelayPeers = 0 + stats["Relay peers"] = block: + let relayPeers = getRelayPeers(node) + var stat = initOrderedTable[string, int]() + for ps in relayPeers: + totalRelayPeers += ps.peers.len + stat[$ps.shard] = ps.peers.len + stat["Total relay peers"] = relayPeers.len + stat + + # stats of mesh peers + stats["Mesh peers"] = block: + let meshPeers = getMeshPeers(node) + var totalMeshPeers = 0 + var stat = initOrderedTable[string, int]() + for ps in meshPeers: + totalMeshPeers += ps.peers.len + stat[$ps.shard] = ps.peers.len + stat["Total mesh peers"] = meshPeers.len + stat + + var protoStats = initOrderedTable[string, int]() + protoStats[WakuRelayCodec] = peers.countIt(it.protocols.contains(WakuRelayCodec)) + protoStats[WakuFilterSubscribeCodec] = + peers.countIt(it.protocols.contains(WakuFilterSubscribeCodec)) + protoStats[WakuFilterPushCodec] = + peers.countIt(it.protocols.contains(WakuFilterPushCodec)) + protoStats[WakuStoreCodec] = peers.countIt(it.protocols.contains(WakuStoreCodec)) + protoStats[WakuLegacyStoreCodec] = + peers.countIt(it.protocols.contains(WakuLegacyStoreCodec)) + protoStats[WakuLightPushCodec] = + peers.countIt(it.protocols.contains(WakuLightPushCodec)) + protoStats[WakuLegacyLightPushCodec] = + peers.countIt(it.protocols.contains(WakuLegacyLightPushCodec)) + protoStats[WakuPeerExchangeCodec] = + peers.countIt(it.protocols.contains(WakuPeerExchangeCodec)) + protoStats[WakuReconciliationCodec] = + peers.countIt(it.protocols.contains(WakuReconciliationCodec)) + + stats["By Protocols"] = protoStats + + let resp = RestApiResponse.jsonResponse(stats, status = Http200).valueOr: + error "An error occurred while building the json response: ", error = error + return RestApiResponse.internalServerError( + fmt("An error occurred while building the json response: {error}") + ) + + return resp + proc installAdminV1PostPeersHandler(router: var RestRouter, node: WakuNode) = router.api(MethodPost, ROUTE_ADMIN_V1_PEERS) do( contentBody: Option[ContentBody] diff --git a/waku/waku_api/rest/admin/types.nim b/waku/waku_api/rest/admin/types.nim index 0c0786e3d..483acf8b8 100644 --- a/waku/waku_api/rest/admin/types.nim +++ b/waku/waku_api/rest/admin/types.nim @@ -35,6 +35,9 @@ type FilterSubscription* = object peerId*: string filterCriteria*: seq[FilterTopic] +type PeerStats* = OrderedTable[string, OrderedTable[string, int]] + # maps high level grouping to low level grouping of counters + #### Serialization and deserialization proc writeValue*( writer: var JsonWriter[RestJson], value: WakuPeer @@ -73,6 +76,23 @@ proc writeValue*( writer.writeField("filterCriteria", value.filterCriteria) writer.endRecord() +proc writeValue*( + writer: var JsonWriter[RestJson], value: OrderedTable[string, int] +) {.raises: [IOError].} = + writer.beginRecord() + for key, value in value.pairs: + writer.writeField(key, value) + writer.endRecord() + +proc writeValue*( + writer: var JsonWriter[RestJson], + value: OrderedTable[string, OrderedTable[string, int]], +) {.raises: [IOError].} = + writer.beginRecord() + for group, subTab in value.pairs: + writer.writeField(group, subTab) + writer.endRecord() + proc readValue*( reader: var JsonReader[RestJson], value: var WakuPeer ) {.gcsafe, raises: [SerializationError, IOError].} = @@ -238,6 +258,21 @@ proc readValue*( value = FilterSubscription(peerId: peerId.get(), filterCriteria: filterCriteria.get()) +proc readValue*( + reader: var JsonReader[RestJson], value: var OrderedTable[string, int] +) {.gcsafe, raises: [SerializationError, IOError].} = + for fieldName in readObjectFields(reader): + let fieldValue = reader.readValue(int) + value[fieldName] = fieldValue + +proc readValue*( + reader: var JsonReader[RestJson], + value: var OrderedTable[string, OrderedTable[string, int]], +) {.gcsafe, raises: [SerializationError, IOError].} = + for fieldName in readObjectFields(reader): + let fieldValue = reader.readValue(OrderedTable[string, int]) + value[fieldName] = fieldValue + func `==`*(a, b: WakuPeer): bool {.inline.} = return a.multiaddr == b.multiaddr