chore: Added extra debug helper via getting peer statistics (#3443)

* Added extra debug helper via getting peer statistics on new /admin/v1/peers/stats endpoint

* Add /admin/v1/peers/stats client part

* Address review, change protocol names to codec string

* fix formatting
This commit is contained in:
NagyZoltanPeter 2025-06-10 02:10:06 +02:00 committed by GitHub
parent 5132510bc6
commit 895e202265
3 changed files with 147 additions and 27 deletions

View File

@ -62,6 +62,10 @@ proc getMeshPeersByShard*(
rest, endpoint: "/admin/v1/peers/mesh/on/{shardId}", meth: HttpMethod.MethodGet
.}
proc getPeersStats*(): RestResponse[PeerStats] {.
rest, endpoint: "/admin/v1/peers/stats", meth: HttpMethod.MethodGet
.}
proc getFilterSubscriptions*(): RestResponse[seq[FilterSubscription]] {.
rest, endpoint: "/admin/v1/filter/subscriptions", meth: HttpMethod.MethodGet
.}

View File

@ -31,6 +31,8 @@ export types
logScope:
topics = "waku node rest admin api"
const ROUTE_ADMIN_V1_PEERS_STATS* = "/admin/v1/peers/stats" # provides peer statistics
const ROUTE_ADMIN_V1_PEERS* = "/admin/v1/peers" # returns all peers
const ROUTE_ADMIN_V1_SINGLE_PEER* = "/admin/v1/peer/{peerId}"
@ -94,6 +96,40 @@ proc populateAdminPeerInfoForCodecs(node: WakuNode, codecs: seq[string]): WakuPe
return peers
proc getRelayPeers(node: WakuNode): PeersOfShards =
var relayPeers: PeersOfShards = @[]
if not node.wakuRelay.isNil():
for topic in node.wakuRelay.getSubscribedTopics():
let relayShard = RelayShard.parse(topic).valueOr:
error "Invalid subscribed topic", error = error, topic = topic
continue
let pubsubPeers =
node.wakuRelay.getConnectedPubSubPeers(topic).get(initHashSet[PubSubPeer](0))
relayPeers.add(
PeersOfShard(
shard: relayShard.shardId,
peers: toSeq(pubsubPeers).mapIt(WakuPeer.init(it, node.peerManager)),
)
)
return relayPeers
proc getMeshPeers(node: WakuNode): PeersOfShards =
var meshPeers: PeersOfShards = @[]
if not node.wakuRelay.isNil():
for topic in node.wakuRelay.getSubscribedTopics():
let relayShard = RelayShard.parse(topic).valueOr:
error "Invalid subscribed topic", error = error, topic = topic
continue
let peers =
node.wakuRelay.getPubSubPeersInMesh(topic).get(initHashSet[PubSubPeer](0))
meshPeers.add(
PeersOfShard(
shard: relayShard.shardId,
peers: toSeq(peers).mapIt(WakuPeer.init(it, node.peerManager)),
)
)
return meshPeers
proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
router.api(MethodGet, ROUTE_ADMIN_V1_PEERS) do() -> RestApiResponse:
let peers = populateAdminPeerInfoForAll(node)
@ -185,19 +221,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
"Error: Relay Protocol is not mounted to the node"
)
var relayPeers: PeersOfShards = @[]
for topic in node.wakuRelay.getSubscribedTopics():
let relayShard = RelayShard.parse(topic).valueOr:
error "Invalid subscribed topic", error = error, topic = topic
continue
let pubsubPeers =
node.wakuRelay.getConnectedPubSubPeers(topic).get(initHashSet[PubSubPeer](0))
relayPeers.add(
PeersOfShard(
shard: relayShard.shardId,
peers: toSeq(pubsubPeers).mapIt(WakuPeer.init(it, node.peerManager)),
)
)
var relayPeers: PeersOfShards = getRelayPeers(node)
let resp = RestApiResponse.jsonResponse(relayPeers, status = Http200).valueOr:
error "An error occurred while building the json response: ", error = error
@ -240,21 +264,9 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
"Error: Relay Protocol is not mounted to the node"
)
var relayPeers: PeersOfShards = @[]
for topic in node.wakuRelay.getSubscribedTopics():
let relayShard = RelayShard.parse(topic).valueOr:
error "Invalid subscribed topic", error = error, topic = topic
continue
let peers =
node.wakuRelay.getPubSubPeersInMesh(topic).get(initHashSet[PubSubPeer](0))
relayPeers.add(
PeersOfShard(
shard: relayShard.shardId,
peers: toSeq(peers).mapIt(WakuPeer.init(it, node.peerManager)),
)
)
var meshPeers: PeersOfShards = getMeshPeers(node)
let resp = RestApiResponse.jsonResponse(relayPeers, status = Http200).valueOr:
let resp = RestApiResponse.jsonResponse(meshPeers, status = Http200).valueOr:
error "An error occurred while building the json response: ", error = error
return RestApiResponse.internalServerError(
fmt("An error occurred while building the json response: {error}")
@ -289,6 +301,75 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
return resp
router.api(MethodGet, ROUTE_ADMIN_V1_PEERS_STATS) do() -> RestApiResponse:
let peers = populateAdminPeerInfoForAll(node)
var stats: PeerStats = initOrderedTable[string, OrderedTable[string, int]]()
stats["Sum"] = {"Total peers": peers.len()}.toOrderedTable()
# stats of connectedness
var connectednessStats = initOrderedTable[string, int]()
connectednessStats[$Connectedness.Connected] =
peers.countIt(it.connected == Connectedness.Connected)
connectednessStats[$Connectedness.NotConnected] =
peers.countIt(it.connected == Connectedness.NotConnected)
connectednessStats[$Connectedness.CannotConnect] =
peers.countIt(it.connected == Connectedness.CannotConnect)
connectednessStats[$Connectedness.CanConnect] =
peers.countIt(it.connected == Connectedness.CanConnect)
stats["By Connectedness"] = connectednessStats
# stats of relay peers
var totalRelayPeers = 0
stats["Relay peers"] = block:
let relayPeers = getRelayPeers(node)
var stat = initOrderedTable[string, int]()
for ps in relayPeers:
totalRelayPeers += ps.peers.len
stat[$ps.shard] = ps.peers.len
stat["Total relay peers"] = relayPeers.len
stat
# stats of mesh peers
stats["Mesh peers"] = block:
let meshPeers = getMeshPeers(node)
var totalMeshPeers = 0
var stat = initOrderedTable[string, int]()
for ps in meshPeers:
totalMeshPeers += ps.peers.len
stat[$ps.shard] = ps.peers.len
stat["Total mesh peers"] = meshPeers.len
stat
var protoStats = initOrderedTable[string, int]()
protoStats[WakuRelayCodec] = peers.countIt(it.protocols.contains(WakuRelayCodec))
protoStats[WakuFilterSubscribeCodec] =
peers.countIt(it.protocols.contains(WakuFilterSubscribeCodec))
protoStats[WakuFilterPushCodec] =
peers.countIt(it.protocols.contains(WakuFilterPushCodec))
protoStats[WakuStoreCodec] = peers.countIt(it.protocols.contains(WakuStoreCodec))
protoStats[WakuLegacyStoreCodec] =
peers.countIt(it.protocols.contains(WakuLegacyStoreCodec))
protoStats[WakuLightPushCodec] =
peers.countIt(it.protocols.contains(WakuLightPushCodec))
protoStats[WakuLegacyLightPushCodec] =
peers.countIt(it.protocols.contains(WakuLegacyLightPushCodec))
protoStats[WakuPeerExchangeCodec] =
peers.countIt(it.protocols.contains(WakuPeerExchangeCodec))
protoStats[WakuReconciliationCodec] =
peers.countIt(it.protocols.contains(WakuReconciliationCodec))
stats["By Protocols"] = protoStats
let resp = RestApiResponse.jsonResponse(stats, status = Http200).valueOr:
error "An error occurred while building the json response: ", error = error
return RestApiResponse.internalServerError(
fmt("An error occurred while building the json response: {error}")
)
return resp
proc installAdminV1PostPeersHandler(router: var RestRouter, node: WakuNode) =
router.api(MethodPost, ROUTE_ADMIN_V1_PEERS) do(
contentBody: Option[ContentBody]

View File

@ -35,6 +35,9 @@ type FilterSubscription* = object
peerId*: string
filterCriteria*: seq[FilterTopic]
type PeerStats* = OrderedTable[string, OrderedTable[string, int]]
# maps high level grouping to low level grouping of counters
#### Serialization and deserialization
proc writeValue*(
writer: var JsonWriter[RestJson], value: WakuPeer
@ -73,6 +76,23 @@ proc writeValue*(
writer.writeField("filterCriteria", value.filterCriteria)
writer.endRecord()
proc writeValue*(
writer: var JsonWriter[RestJson], value: OrderedTable[string, int]
) {.raises: [IOError].} =
writer.beginRecord()
for key, value in value.pairs:
writer.writeField(key, value)
writer.endRecord()
proc writeValue*(
writer: var JsonWriter[RestJson],
value: OrderedTable[string, OrderedTable[string, int]],
) {.raises: [IOError].} =
writer.beginRecord()
for group, subTab in value.pairs:
writer.writeField(group, subTab)
writer.endRecord()
proc readValue*(
reader: var JsonReader[RestJson], value: var WakuPeer
) {.gcsafe, raises: [SerializationError, IOError].} =
@ -238,6 +258,21 @@ proc readValue*(
value = FilterSubscription(peerId: peerId.get(), filterCriteria: filterCriteria.get())
proc readValue*(
reader: var JsonReader[RestJson], value: var OrderedTable[string, int]
) {.gcsafe, raises: [SerializationError, IOError].} =
for fieldName in readObjectFields(reader):
let fieldValue = reader.readValue(int)
value[fieldName] = fieldValue
proc readValue*(
reader: var JsonReader[RestJson],
value: var OrderedTable[string, OrderedTable[string, int]],
) {.gcsafe, raises: [SerializationError, IOError].} =
for fieldName in readObjectFields(reader):
let fieldValue = reader.readValue(OrderedTable[string, int])
value[fieldName] = fieldValue
func `==`*(a, b: WakuPeer): bool {.inline.} =
return a.multiaddr == b.multiaddr