mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-05 23:43:07 +00:00
chore: Added extra debug helper via getting peer statistics (#3443)
* Added extra debug helper via getting peer statistics on new /admin/v1/peers/stats endpoint * Add /admin/v1/peers/stats client part * Address review, change protocol names to codec string * fix formatting
This commit is contained in:
parent
e42e28cc6f
commit
f4ad7a332e
@ -62,6 +62,10 @@ proc getMeshPeersByShard*(
|
|||||||
rest, endpoint: "/admin/v1/peers/mesh/on/{shardId}", meth: HttpMethod.MethodGet
|
rest, endpoint: "/admin/v1/peers/mesh/on/{shardId}", meth: HttpMethod.MethodGet
|
||||||
.}
|
.}
|
||||||
|
|
||||||
|
proc getPeersStats*(): RestResponse[PeerStats] {.
|
||||||
|
rest, endpoint: "/admin/v1/peers/stats", meth: HttpMethod.MethodGet
|
||||||
|
.}
|
||||||
|
|
||||||
proc getFilterSubscriptions*(): RestResponse[seq[FilterSubscription]] {.
|
proc getFilterSubscriptions*(): RestResponse[seq[FilterSubscription]] {.
|
||||||
rest, endpoint: "/admin/v1/filter/subscriptions", meth: HttpMethod.MethodGet
|
rest, endpoint: "/admin/v1/filter/subscriptions", meth: HttpMethod.MethodGet
|
||||||
.}
|
.}
|
||||||
|
|||||||
@ -31,6 +31,8 @@ export types
|
|||||||
logScope:
|
logScope:
|
||||||
topics = "waku node rest admin api"
|
topics = "waku node rest admin api"
|
||||||
|
|
||||||
|
const ROUTE_ADMIN_V1_PEERS_STATS* = "/admin/v1/peers/stats" # provides peer statistics
|
||||||
|
|
||||||
const ROUTE_ADMIN_V1_PEERS* = "/admin/v1/peers" # returns all peers
|
const ROUTE_ADMIN_V1_PEERS* = "/admin/v1/peers" # returns all peers
|
||||||
const ROUTE_ADMIN_V1_SINGLE_PEER* = "/admin/v1/peer/{peerId}"
|
const ROUTE_ADMIN_V1_SINGLE_PEER* = "/admin/v1/peer/{peerId}"
|
||||||
|
|
||||||
@ -94,6 +96,40 @@ proc populateAdminPeerInfoForCodecs(node: WakuNode, codecs: seq[string]): WakuPe
|
|||||||
|
|
||||||
return peers
|
return peers
|
||||||
|
|
||||||
|
proc getRelayPeers(node: WakuNode): PeersOfShards =
|
||||||
|
var relayPeers: PeersOfShards = @[]
|
||||||
|
if not node.wakuRelay.isNil():
|
||||||
|
for topic in node.wakuRelay.getSubscribedTopics():
|
||||||
|
let relayShard = RelayShard.parse(topic).valueOr:
|
||||||
|
error "Invalid subscribed topic", error = error, topic = topic
|
||||||
|
continue
|
||||||
|
let pubsubPeers =
|
||||||
|
node.wakuRelay.getConnectedPubSubPeers(topic).get(initHashSet[PubSubPeer](0))
|
||||||
|
relayPeers.add(
|
||||||
|
PeersOfShard(
|
||||||
|
shard: relayShard.shardId,
|
||||||
|
peers: toSeq(pubsubPeers).mapIt(WakuPeer.init(it, node.peerManager)),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return relayPeers
|
||||||
|
|
||||||
|
proc getMeshPeers(node: WakuNode): PeersOfShards =
|
||||||
|
var meshPeers: PeersOfShards = @[]
|
||||||
|
if not node.wakuRelay.isNil():
|
||||||
|
for topic in node.wakuRelay.getSubscribedTopics():
|
||||||
|
let relayShard = RelayShard.parse(topic).valueOr:
|
||||||
|
error "Invalid subscribed topic", error = error, topic = topic
|
||||||
|
continue
|
||||||
|
let peers =
|
||||||
|
node.wakuRelay.getPubSubPeersInMesh(topic).get(initHashSet[PubSubPeer](0))
|
||||||
|
meshPeers.add(
|
||||||
|
PeersOfShard(
|
||||||
|
shard: relayShard.shardId,
|
||||||
|
peers: toSeq(peers).mapIt(WakuPeer.init(it, node.peerManager)),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return meshPeers
|
||||||
|
|
||||||
proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
|
proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
|
||||||
router.api(MethodGet, ROUTE_ADMIN_V1_PEERS) do() -> RestApiResponse:
|
router.api(MethodGet, ROUTE_ADMIN_V1_PEERS) do() -> RestApiResponse:
|
||||||
let peers = populateAdminPeerInfoForAll(node)
|
let peers = populateAdminPeerInfoForAll(node)
|
||||||
@ -185,19 +221,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
|
|||||||
"Error: Relay Protocol is not mounted to the node"
|
"Error: Relay Protocol is not mounted to the node"
|
||||||
)
|
)
|
||||||
|
|
||||||
var relayPeers: PeersOfShards = @[]
|
var relayPeers: PeersOfShards = getRelayPeers(node)
|
||||||
for topic in node.wakuRelay.getSubscribedTopics():
|
|
||||||
let relayShard = RelayShard.parse(topic).valueOr:
|
|
||||||
error "Invalid subscribed topic", error = error, topic = topic
|
|
||||||
continue
|
|
||||||
let pubsubPeers =
|
|
||||||
node.wakuRelay.getConnectedPubSubPeers(topic).get(initHashSet[PubSubPeer](0))
|
|
||||||
relayPeers.add(
|
|
||||||
PeersOfShard(
|
|
||||||
shard: relayShard.shardId,
|
|
||||||
peers: toSeq(pubsubPeers).mapIt(WakuPeer.init(it, node.peerManager)),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
let resp = RestApiResponse.jsonResponse(relayPeers, status = Http200).valueOr:
|
let resp = RestApiResponse.jsonResponse(relayPeers, status = Http200).valueOr:
|
||||||
error "An error occurred while building the json response: ", error = error
|
error "An error occurred while building the json response: ", error = error
|
||||||
@ -240,21 +264,9 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
|
|||||||
"Error: Relay Protocol is not mounted to the node"
|
"Error: Relay Protocol is not mounted to the node"
|
||||||
)
|
)
|
||||||
|
|
||||||
var relayPeers: PeersOfShards = @[]
|
var meshPeers: PeersOfShards = getMeshPeers(node)
|
||||||
for topic in node.wakuRelay.getSubscribedTopics():
|
|
||||||
let relayShard = RelayShard.parse(topic).valueOr:
|
|
||||||
error "Invalid subscribed topic", error = error, topic = topic
|
|
||||||
continue
|
|
||||||
let peers =
|
|
||||||
node.wakuRelay.getPubSubPeersInMesh(topic).get(initHashSet[PubSubPeer](0))
|
|
||||||
relayPeers.add(
|
|
||||||
PeersOfShard(
|
|
||||||
shard: relayShard.shardId,
|
|
||||||
peers: toSeq(peers).mapIt(WakuPeer.init(it, node.peerManager)),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
let resp = RestApiResponse.jsonResponse(relayPeers, status = Http200).valueOr:
|
let resp = RestApiResponse.jsonResponse(meshPeers, status = Http200).valueOr:
|
||||||
error "An error occurred while building the json response: ", error = error
|
error "An error occurred while building the json response: ", error = error
|
||||||
return RestApiResponse.internalServerError(
|
return RestApiResponse.internalServerError(
|
||||||
fmt("An error occurred while building the json response: {error}")
|
fmt("An error occurred while building the json response: {error}")
|
||||||
@ -289,6 +301,75 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
|
|||||||
|
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
|
router.api(MethodGet, ROUTE_ADMIN_V1_PEERS_STATS) do() -> RestApiResponse:
|
||||||
|
let peers = populateAdminPeerInfoForAll(node)
|
||||||
|
|
||||||
|
var stats: PeerStats = initOrderedTable[string, OrderedTable[string, int]]()
|
||||||
|
|
||||||
|
stats["Sum"] = {"Total peers": peers.len()}.toOrderedTable()
|
||||||
|
|
||||||
|
# stats of connectedness
|
||||||
|
var connectednessStats = initOrderedTable[string, int]()
|
||||||
|
connectednessStats[$Connectedness.Connected] =
|
||||||
|
peers.countIt(it.connected == Connectedness.Connected)
|
||||||
|
connectednessStats[$Connectedness.NotConnected] =
|
||||||
|
peers.countIt(it.connected == Connectedness.NotConnected)
|
||||||
|
connectednessStats[$Connectedness.CannotConnect] =
|
||||||
|
peers.countIt(it.connected == Connectedness.CannotConnect)
|
||||||
|
connectednessStats[$Connectedness.CanConnect] =
|
||||||
|
peers.countIt(it.connected == Connectedness.CanConnect)
|
||||||
|
stats["By Connectedness"] = connectednessStats
|
||||||
|
|
||||||
|
# stats of relay peers
|
||||||
|
var totalRelayPeers = 0
|
||||||
|
stats["Relay peers"] = block:
|
||||||
|
let relayPeers = getRelayPeers(node)
|
||||||
|
var stat = initOrderedTable[string, int]()
|
||||||
|
for ps in relayPeers:
|
||||||
|
totalRelayPeers += ps.peers.len
|
||||||
|
stat[$ps.shard] = ps.peers.len
|
||||||
|
stat["Total relay peers"] = relayPeers.len
|
||||||
|
stat
|
||||||
|
|
||||||
|
# stats of mesh peers
|
||||||
|
stats["Mesh peers"] = block:
|
||||||
|
let meshPeers = getMeshPeers(node)
|
||||||
|
var totalMeshPeers = 0
|
||||||
|
var stat = initOrderedTable[string, int]()
|
||||||
|
for ps in meshPeers:
|
||||||
|
totalMeshPeers += ps.peers.len
|
||||||
|
stat[$ps.shard] = ps.peers.len
|
||||||
|
stat["Total mesh peers"] = meshPeers.len
|
||||||
|
stat
|
||||||
|
|
||||||
|
var protoStats = initOrderedTable[string, int]()
|
||||||
|
protoStats[WakuRelayCodec] = peers.countIt(it.protocols.contains(WakuRelayCodec))
|
||||||
|
protoStats[WakuFilterSubscribeCodec] =
|
||||||
|
peers.countIt(it.protocols.contains(WakuFilterSubscribeCodec))
|
||||||
|
protoStats[WakuFilterPushCodec] =
|
||||||
|
peers.countIt(it.protocols.contains(WakuFilterPushCodec))
|
||||||
|
protoStats[WakuStoreCodec] = peers.countIt(it.protocols.contains(WakuStoreCodec))
|
||||||
|
protoStats[WakuLegacyStoreCodec] =
|
||||||
|
peers.countIt(it.protocols.contains(WakuLegacyStoreCodec))
|
||||||
|
protoStats[WakuLightPushCodec] =
|
||||||
|
peers.countIt(it.protocols.contains(WakuLightPushCodec))
|
||||||
|
protoStats[WakuLegacyLightPushCodec] =
|
||||||
|
peers.countIt(it.protocols.contains(WakuLegacyLightPushCodec))
|
||||||
|
protoStats[WakuPeerExchangeCodec] =
|
||||||
|
peers.countIt(it.protocols.contains(WakuPeerExchangeCodec))
|
||||||
|
protoStats[WakuReconciliationCodec] =
|
||||||
|
peers.countIt(it.protocols.contains(WakuReconciliationCodec))
|
||||||
|
|
||||||
|
stats["By Protocols"] = protoStats
|
||||||
|
|
||||||
|
let resp = RestApiResponse.jsonResponse(stats, status = Http200).valueOr:
|
||||||
|
error "An error occurred while building the json response: ", error = error
|
||||||
|
return RestApiResponse.internalServerError(
|
||||||
|
fmt("An error occurred while building the json response: {error}")
|
||||||
|
)
|
||||||
|
|
||||||
|
return resp
|
||||||
|
|
||||||
proc installAdminV1PostPeersHandler(router: var RestRouter, node: WakuNode) =
|
proc installAdminV1PostPeersHandler(router: var RestRouter, node: WakuNode) =
|
||||||
router.api(MethodPost, ROUTE_ADMIN_V1_PEERS) do(
|
router.api(MethodPost, ROUTE_ADMIN_V1_PEERS) do(
|
||||||
contentBody: Option[ContentBody]
|
contentBody: Option[ContentBody]
|
||||||
|
|||||||
@ -35,6 +35,9 @@ type FilterSubscription* = object
|
|||||||
peerId*: string
|
peerId*: string
|
||||||
filterCriteria*: seq[FilterTopic]
|
filterCriteria*: seq[FilterTopic]
|
||||||
|
|
||||||
|
type PeerStats* = OrderedTable[string, OrderedTable[string, int]]
|
||||||
|
# maps high level grouping to low level grouping of counters
|
||||||
|
|
||||||
#### Serialization and deserialization
|
#### Serialization and deserialization
|
||||||
proc writeValue*(
|
proc writeValue*(
|
||||||
writer: var JsonWriter[RestJson], value: WakuPeer
|
writer: var JsonWriter[RestJson], value: WakuPeer
|
||||||
@ -73,6 +76,23 @@ proc writeValue*(
|
|||||||
writer.writeField("filterCriteria", value.filterCriteria)
|
writer.writeField("filterCriteria", value.filterCriteria)
|
||||||
writer.endRecord()
|
writer.endRecord()
|
||||||
|
|
||||||
|
proc writeValue*(
|
||||||
|
writer: var JsonWriter[RestJson], value: OrderedTable[string, int]
|
||||||
|
) {.raises: [IOError].} =
|
||||||
|
writer.beginRecord()
|
||||||
|
for key, value in value.pairs:
|
||||||
|
writer.writeField(key, value)
|
||||||
|
writer.endRecord()
|
||||||
|
|
||||||
|
proc writeValue*(
|
||||||
|
writer: var JsonWriter[RestJson],
|
||||||
|
value: OrderedTable[string, OrderedTable[string, int]],
|
||||||
|
) {.raises: [IOError].} =
|
||||||
|
writer.beginRecord()
|
||||||
|
for group, subTab in value.pairs:
|
||||||
|
writer.writeField(group, subTab)
|
||||||
|
writer.endRecord()
|
||||||
|
|
||||||
proc readValue*(
|
proc readValue*(
|
||||||
reader: var JsonReader[RestJson], value: var WakuPeer
|
reader: var JsonReader[RestJson], value: var WakuPeer
|
||||||
) {.gcsafe, raises: [SerializationError, IOError].} =
|
) {.gcsafe, raises: [SerializationError, IOError].} =
|
||||||
@ -238,6 +258,21 @@ proc readValue*(
|
|||||||
|
|
||||||
value = FilterSubscription(peerId: peerId.get(), filterCriteria: filterCriteria.get())
|
value = FilterSubscription(peerId: peerId.get(), filterCriteria: filterCriteria.get())
|
||||||
|
|
||||||
|
proc readValue*(
|
||||||
|
reader: var JsonReader[RestJson], value: var OrderedTable[string, int]
|
||||||
|
) {.gcsafe, raises: [SerializationError, IOError].} =
|
||||||
|
for fieldName in readObjectFields(reader):
|
||||||
|
let fieldValue = reader.readValue(int)
|
||||||
|
value[fieldName] = fieldValue
|
||||||
|
|
||||||
|
proc readValue*(
|
||||||
|
reader: var JsonReader[RestJson],
|
||||||
|
value: var OrderedTable[string, OrderedTable[string, int]],
|
||||||
|
) {.gcsafe, raises: [SerializationError, IOError].} =
|
||||||
|
for fieldName in readObjectFields(reader):
|
||||||
|
let fieldValue = reader.readValue(OrderedTable[string, int])
|
||||||
|
value[fieldName] = fieldValue
|
||||||
|
|
||||||
func `==`*(a, b: WakuPeer): bool {.inline.} =
|
func `==`*(a, b: WakuPeer): bool {.inline.} =
|
||||||
return a.multiaddr == b.multiaddr
|
return a.multiaddr == b.multiaddr
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user