mirror of https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-02 14:03:06 +00:00
chore: add missing metrics (#3565)
parent c07537705e
commit 1a0891dba9
@@ -37,6 +37,8 @@ declarePublicGauge waku_connected_peers,
   labels = ["direction", "protocol"]
 declarePublicGauge waku_connected_peers_per_shard,
   "Number of physical connections per shard", labels = ["shard"]
+declarePublicGauge waku_connected_peers_per_agent,
+  "Number of physical connections per agent", labels = ["agent"]
 declarePublicGauge waku_streams_peers,
   "Number of streams per direction and protocol", labels = ["direction", "protocol"]
 declarePublicGauge waku_peer_store_size, "Number of peers managed by the peer store"
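Note: the new waku_connected_peers_per_agent gauge follows the same nim-metrics pattern as its neighbours. A minimal standalone sketch of how such a labelled gauge behaves (the gauge name below is illustrative, and metrics are only recorded when compiled with -d:metrics):

import metrics

declarePublicGauge example_peers_per_agent,
  "Number of physical connections per agent", labels = ["agent"]

# each distinct label value becomes its own time series
example_peers_per_agent.set(3.0, labelValues = ["nwaku"])
example_peers_per_agent.set(1.0, labelValues = ["go-waku"])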
@@ -72,7 +74,7 @@ const
   PrunePeerStoreInterval = chronos.minutes(10)

   # How often metrics and logs are shown/updated
-  LogAndMetricsInterval = chronos.minutes(3)
+  LogAndMetricsInterval = chronos.minutes(5)

   # Max peers that we allow from the same IP
   DefaultColocationLimit* = 5
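The interval change only affects how often the metrics/log pass below runs. A sketch of the kind of chronos loop such a constant typically paces (the proc name and loop body are illustrative, not the repository's code):

import chronos

const LogAndMetricsInterval = chronos.minutes(5)

proc logAndMetricsLoop() {.async.} =
  while true:
    # refresh gauges and emit the periodic info log here
    await sleepAsync(LogAndMetricsInterval)

# started once at node startup, e.g. with asyncSpawn logAndMetricsLoop()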
@@ -745,7 +747,8 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
     let notConnectedPeers =
       peerStore.getDisconnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
     let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
-    let totalConnections = pm.switch.connManager.getConnections().len
+    let connections = pm.switch.connManager.getConnections()
+    let totalConnections = connections.len

     info "Relay peer connections",
       inRelayConns = $inRelayPeers.len & "/" & $pm.inRelayPeersTarget,
@@ -771,14 +774,26 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
         protoStreamsOut.float64, labelValues = [$Direction.Out, proto]
       )

     for shard in pm.getShards().items:
-      let connectedPeers =
+      waku_connected_peers_per_shard.set(0.0, labelValues = [$shard])
+
+    var agentCounts = initTable[string, int]()
+    var connectedPeerIds: HashSet[PeerId]
+    for peerId, muxers in connections:
+      connectedPeerIds.incl(peerId)
+      if peerStore[AgentBook].contains(peerId):
+        let agent = peerStore[AgentBook][peerId]
+        agentCounts[agent] = agentCounts.getOrDefault(agent, 0) + 1
+    for agent, count in agentCounts:
+      waku_connected_peers_per_agent.set(count.float64, labelValues = [$agent])
+
+    for shard in pm.getShards().items:
+      # peers known for this shard
+      let shardPeers =
         peerStore.getPeersByShard(uint16(pm.wakuMetadata.clusterId), shard)
+
+      # keep only those that are physically connected right now
+      let connectedInShard = shardPeers.filterIt(connectedPeerIds.contains(it.peerId))
+
       waku_connected_peers_per_shard.set(
-        connectedPeers.len.float64, labelValues = [$shard]
+        connectedInShard.len.float64, labelValues = [$shard]
       )

 proc getOnlineStateObserver*(pm: PeerManager): OnOnlineStateChange =
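The reworked loop fixes the per-shard gauge so it counts only peers that are physically connected right now: gauges are first reset to 0, per-agent counts are taken from the live connection table, and shard membership is intersected with the connected set. A self-contained sketch of that counting pattern, with plain std containers standing in for the peer store and connection manager (all names and data below are illustrative):

import std/[tables, sets, sequtils]

type Peer = object
  id: string
  agent: string
  shards: seq[uint16]

let known = @[
  Peer(id: "a", agent: "nwaku", shards: @[0'u16]),
  Peer(id: "b", agent: "go-waku", shards: @[0'u16, 1]),
  Peer(id: "c", agent: "nwaku", shards: @[1'u16]),  # known but not connected
]
let connectedIds = toHashSet(["a", "b"])

# per-agent counts over live connections only
var agentCounts = initTable[string, int]()
for p in known.filterIt(it.id in connectedIds):
  agentCounts[p.agent] = agentCounts.getOrDefault(p.agent, 0) + 1
echo agentCounts  # e.g. nwaku -> 1, go-waku -> 1

# per-shard counts: known shard members intersected with live connections
for shard in [0'u16, 1]:
  let inShard = known.filterIt(shard in it.shards and it.id in connectedIds)
  echo "shard ", shard, ": ", inShard.len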
@@ -36,6 +36,12 @@ declareCounter waku_relay_network_bytes,
   "total traffic per topic, distinct gross/net and direction",
   labels = ["topic", "type", "direction"]

+declarePublicGauge(
+  waku_relay_total_msg_bytes_per_shard,
+  "total length of messages seen per shard",
+  labels = ["shard"],
+)
+
 declarePublicGauge(
   waku_relay_max_msg_bytes_per_shard,
   "Maximum length of messages seen per shard",
@@ -228,6 +234,8 @@ proc logMessageInfo*(

   waku_relay_avg_msg_bytes_per_shard.set(shardMetrics.avgSize, labelValues = [topic])

+  waku_relay_total_msg_bytes_per_shard.set(shardMetrics.sizeSum, labelValues = [topic])
+
 proc initRelayObservers(w: WakuRelay) =
   proc decodeRpcMessageInfo(
     peer: PubSubPeer, msg: Message
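waku_relay_total_msg_bytes_per_shard is fed from the same shardMetrics record as the existing avg/max gauges. A hedged sketch of the running aggregate that could sit behind those fields (only avgSize and sizeSum appear in the diff; the record shape and update proc here are assumptions):

type ShardMetrics = object
  count: int
  sizeSum: float64   # feeds waku_relay_total_msg_bytes_per_shard
  avgSize: float64   # feeds waku_relay_avg_msg_bytes_per_shard
  maxSize: float64   # feeds waku_relay_max_msg_bytes_per_shard

proc update(m: var ShardMetrics, msgBytes: int) =
  # fold one observed message into the per-shard aggregates
  inc m.count
  m.sizeSum += msgBytes.float64
  m.avgSize = m.sizeSum / m.count.float64
  m.maxSize = max(m.maxSize, msgBytes.float64)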
@@ -25,10 +25,10 @@ declarePublicCounter(
   waku_rln_invalid_messages_total, "number of invalid messages detected", ["type"]
 )
-# This metric will be useful in detecting the index of the root in the acceptable window of roots
-declarePublicHistogram(
-  identifier = waku_rln_valid_messages_total,
-  help = "number of valid messages with their roots tracked",
-  buckets = generateBucketsForHistogram(AcceptableRootWindowSize),
+declarePublicCounter(
+  waku_rln_valid_messages_total,
+  "number of valid messages with their roots tracked",
+  ["shard"],
 )
 declarePublicCounter(
   waku_rln_errors_total,
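The old histogram tracked the index of a message's Merkle root inside the acceptable root window; the replacement is a plain counter keyed by shard, matching how the metric is now incremented in the validator (see the last hunk). A usage sketch (the counter shape is taken from the diff; the declared name and label value below are illustrative):

import metrics

declarePublicCounter example_rln_valid_messages_total,
  "number of valid messages with their roots tracked", ["shard"]

# one increment per accepted message, labelled by the pubsub topic/shard
example_rln_valid_messages_total.inc(labelValues = ["/waku/2/rs/1/0"])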
@@ -256,8 +256,7 @@ proc validateMessage*(
     return MessageValidationResult.Spam

   trace "message is valid", payloadLen = msg.payload.len
-  let rootIndex = rlnPeer.groupManager.indexOfRoot(proof.merkleRoot)
-  waku_rln_valid_messages_total.observe(rootIndex.toFloat())
+  # Metric increment moved to validator to include shard label
   return MessageValidationResult.Valid

 proc validateMessageAndUpdateLog*(
@@ -356,15 +355,16 @@ proc generateRlnValidator*(
       payload = string.fromBytes(message.payload)
     case validationRes
     of Valid:
-      trace "message validity is verified, relaying:",
+      trace "message validity is verified, relaying",
         proof = proof,
         root = root,
         shareX = shareX,
         shareY = shareY,
         nullifier = nullifier
+      waku_rln_valid_messages_total.inc(labelValues = [topic])
       return pubsub.ValidationResult.Accept
     of Invalid:
-      trace "message validity could not be verified, discarding:",
+      trace "message validity could not be verified, discarding",
         proof = proof,
         root = root,
         shareX = shareX,
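Moving the increment from validateMessage into the gossipsub validator is what makes the shard label possible: the validator is the first place where both the validation verdict and the pubsub topic are in scope together. A self-contained sketch of that pattern with deliberately simplified types (names illustrative; a CountTable stands in for the labelled counter):

import std/tables

type ValidationResult = enum
  Accept
  Reject

var validPerShard = initCountTable[string]()

proc validate(topic: string, valid: bool): ValidationResult =
  if valid:
    validPerShard.inc(topic)  # labelled increment happens here, per topic/shard
    return Accept
  Reject

discard validate("/waku/2/rs/1/0", true)
echo validPerShard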