From c3da29fd63e4b0fdd72a032a38088749c0c57ebb Mon Sep 17 00:00:00 2001 From: Darshan K <35736874+darshankabariya@users.noreply.github.com> Date: Thu, 31 Jul 2025 22:53:38 +0530 Subject: [PATCH] feat: shard-specific metrics tracking (#3520) --- waku/discovery/waku_discv5.nim | 18 ++++++++-- waku/node/peer_manager/peer_manager.nim | 12 +++++++ waku/waku_archive/archive.nim | 5 +++ waku/waku_archive/archive_metrics.nim | 2 ++ waku/waku_relay/protocol.nim | 44 ++++++++++++++++++++---- waku/waku_rln_relay/protocol_metrics.nim | 3 +- waku/waku_store/client.nim | 12 +++++++ waku/waku_store/protocol_metrics.nim | 12 +++++++ 8 files changed, 99 insertions(+), 9 deletions(-) diff --git a/waku/discovery/waku_discv5.nim b/waku/discovery/waku_discv5.nim index 5bdb91a2e..58c50df52 100644 --- a/waku/discovery/waku_discv5.nim +++ b/waku/discovery/waku_discv5.nim @@ -14,7 +14,8 @@ import ../node/peer_manager/peer_manager, ../waku_core, ../waku_enr export protocol, waku_enr -declarePublicGauge waku_discv5_discovered, "number of nodes discovered" +declarePublicGauge waku_discv5_discovered_per_shard, + "number of nodes discovered by each shard", labels = ["shard"] declarePublicGauge waku_discv5_errors, "number of waku discv5 errors", ["type"] logScope: @@ -231,7 +232,20 @@ proc findRandomPeers*( elif wd.predicate.isSome(): discoveredRecords = discoveredRecords.filter(wd.predicate.get()) - waku_discv5_discovered.inc(discoveredRecords.len) + # Increment metric for each discovered record's shards + for record in discoveredRecords: + let typedRecord = record.toTyped().valueOr: + # If we can't parse the record, skip it + waku_discv5_errors.inc(labelValues = ["ParseFailure"]) + continue + + let relayShards = typedRecord.relaySharding().valueOr: + # If no relay sharding info, skip it + waku_discv5_errors.inc(labelValues = ["NoShardInfo"]) + continue + + for shardId in relayShards.shardIds: + waku_discv5_discovered_per_shard.inc(labelValues = [$shardId]) return discoveredRecords diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 0df7c672d..bb9716205 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -34,6 +34,8 @@ declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"] declarePublicGauge waku_connected_peers, "Number of physical connections per direction and protocol", labels = ["direction", "protocol"] +declarePublicGauge waku_connected_peers_per_shard, + "Number of physical connections per shard", labels = ["shard"] declarePublicGauge waku_streams_peers, "Number of streams per direction and protocol", labels = ["direction", "protocol"] declarePublicGauge waku_peer_store_size, "Number of peers managed by the peer store" @@ -778,6 +780,16 @@ proc logAndMetrics(pm: PeerManager) {.async.} = protoStreamsOut.float64, labelValues = [$Direction.Out, proto] ) + for shard in pm.wakuMetadata.shards.items: + waku_connected_peers_per_shard.set(0.0, labelValues = [$shard]) + + for shard in pm.wakuMetadata.shards.items: + let connectedPeers = + peerStore.getPeersByShard(uint16(pm.wakuMetadata.clusterId), uint16(shard)) + waku_connected_peers_per_shard.set( + connectedPeers.len.float64, labelValues = [$shard] + ) + proc getOnlineStateObserver*(pm: PeerManager): OnOnlineStateChange = return proc(online: bool) {.gcsafe, raises: [].} = pm.online = online diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index c296e2a6f..f3112c7b1 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -119,6 +119,11 @@ proc handleMessage*( let insertDuration = getTime().toUnixFloat() - insertStartTime waku_archive_insert_duration_seconds.observe(insertDuration) + let shard = RelayShard.parseStaticSharding(pubsubTopic).valueOr: + DefaultRelayShard + + waku_archive_messages_per_shard.inc(labelValues = [$shard.shardId]) + trace "message archived", msg_hash = msgHashHex, pubsubTopic = pubsubTopic, diff --git a/waku/waku_archive/archive_metrics.nim b/waku/waku_archive/archive_metrics.nim index fd39f923e..918c2933e 100644 --- a/waku/waku_archive/archive_metrics.nim +++ b/waku/waku_archive/archive_metrics.nim @@ -3,6 +3,8 @@ import metrics declarePublicGauge waku_archive_messages, "number of historical messages", ["type"] +declarePublicGauge waku_archive_messages_per_shard, + "number of historical messages per shard ", ["shard"] declarePublicGauge waku_archive_errors, "number of store protocol errors", ["type"] declarePublicGauge waku_archive_queries, "number of store queries received" declarePublicHistogram waku_archive_insert_duration_seconds, diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 18d60dcef..070d6992f 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -23,9 +23,31 @@ import from ../waku_core/codecs import WakuRelayCodec export WakuRelayCodec +type ShardMetrics = object + count: float64 + sizeSum: float64 + avgSize: float64 + maxSize: float64 + logScope: topics = "waku relay" +declareCounter waku_relay_network_bytes, + "total traffic per topic, distinct gross/net and direction", + labels = ["topic", "type", "direction"] + +declarePublicGauge( + waku_relay_max_msg_bytes_per_shard, + "Maximum length of messages seen per shard", + labels = ["shard"], +) + +declarePublicGauge( + waku_relay_avg_msg_bytes_per_shard, + "Average length of messages seen per shard", + labels = ["shard"], +) + # see: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#overview-of-new-parameters const TopicParameters = TopicParams( topicWeight: 1, @@ -58,10 +80,6 @@ const TopicParameters = TopicParams( invalidMessageDeliveriesDecay: 0.5, ) -declareCounter waku_relay_network_bytes, - "total traffic per topic, distinct gross/net and direction", - labels = ["topic", "type", "direction"] - # see: https://rfc.vac.dev/spec/29/#gossipsub-v10-parameters const GossipsubParameters = GossipSubParams.init( pruneBackoff = chronos.minutes(1), @@ -137,6 +155,7 @@ type topicsHealth*: Table[string, TopicHealth] onTopicHealthChange*: TopicHealthChangeHandler topicHealthLoopHandle*: Future[void] + msgMetricsPerShard*: Table[string, ShardMetrics] # predefinition for more detailed results from publishing new message type PublishOutcome* {.pure.} = enum @@ -176,6 +195,7 @@ proc logMessageInfo*( onRecv: bool, ) = let msg_hash = computeMessageHash(topic, msg).to0xHex() + let payloadSize = float64(msg.payload.len) if onRecv: notice "received relay message", @@ -185,7 +205,7 @@ proc logMessageInfo*( from_peer_id = remotePeerId, topic = topic, receivedTime = getNowInNanosecondTime(), - payloadSizeBytes = msg.payload.len + payloadSizeBytes = payloadSize else: notice "sent relay message", my_peer_id = w.switch.peerInfo.peerId, @@ -194,7 +214,19 @@ proc logMessageInfo*( to_peer_id = remotePeerId, topic = topic, sentTime = getNowInNanosecondTime(), - payloadSizeBytes = msg.payload.len + payloadSizeBytes = payloadSize + + var shardMetrics = w.msgMetricsPerShard.getOrDefault(topic, ShardMetrics()) + shardMetrics.count += 1 + shardMetrics.sizeSum += payloadSize + if payloadSize > shardMetrics.maxSize: + shardMetrics.maxSize = payloadSize + shardMetrics.avgSize = shardMetrics.sizeSum / shardMetrics.count + w.msgMetricsPerShard[topic] = shardMetrics + + waku_relay_max_msg_bytes_per_shard.set(shardMetrics.maxSize, labelValues = [topic]) + + waku_relay_avg_msg_bytes_per_shard.set(shardMetrics.avgSize, labelValues = [topic]) proc initRelayObservers(w: WakuRelay) = proc decodeRpcMessageInfo( diff --git a/waku/waku_rln_relay/protocol_metrics.nim b/waku/waku_rln_relay/protocol_metrics.nim index 2210328f4..55bafc1fa 100644 --- a/waku/waku_rln_relay/protocol_metrics.nim +++ b/waku/waku_rln_relay/protocol_metrics.nim @@ -17,8 +17,9 @@ func generateBucketsForHistogram*(length: int): seq[float64] = return buckets declarePublicCounter( - waku_rln_messages_total, "number of messages published on the rln content topic" + waku_rln_messages_total, "number of messages seen by the rln relay" ) + declarePublicCounter(waku_rln_spam_messages_total, "number of spam messages detected") declarePublicCounter( waku_rln_invalid_messages_total, "number of invalid messages detected", ["type"] diff --git a/waku/waku_store/client.nim b/waku/waku_store/client.nim index 082120823..c5885e8d5 100644 --- a/waku/waku_store/client.nim +++ b/waku/waku_store/client.nim @@ -13,6 +13,7 @@ const DefaultPageSize*: uint = 20 type WakuStoreClient* = ref object peerManager: PeerManager rng: ref rand.HmacDrbgContext + storeMsgMetricsPerShard*: Table[string, float64] proc new*( T: type WakuStoreClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext @@ -46,6 +47,17 @@ proc sendStoreRequest( waku_store_errors.inc(labelValues = [NoSuccessStatusCode]) return err(StoreError.new(res.statusCode, res.statusDesc)) + if req.pubsubTopic.isSome(): + let topic = req.pubsubTopic.get() + if not self.storeMsgMetricsPerShard.hasKey(topic): + self.storeMsgMetricsPerShard[topic] = 0 + self.storeMsgMetricsPerShard[topic] += float64(req.encode().buffer.len) + + waku_relay_fleet_store_msg_size_bytes.inc( + self.storeMsgMetricsPerShard[topic], labelValues = [topic] + ) + waku_relay_fleet_store_msg_count.inc(1.0, labelValues = [topic]) + return ok(res) proc query*( diff --git a/waku/waku_store/protocol_metrics.nim b/waku/waku_store/protocol_metrics.nim index 5d9e69420..1c7bb5ec8 100644 --- a/waku/waku_store/protocol_metrics.nim +++ b/waku/waku_store/protocol_metrics.nim @@ -10,6 +10,18 @@ declarePublicGauge waku_store_queries, "number of store queries received" declarePublicGauge waku_store_time_seconds, "Time in seconds spent by each store phase", labels = ["phase"] +declarePublicGauge( + waku_relay_fleet_store_msg_size_bytes, + "Total size of messages stored by fleet store nodes per shard", + labels = ["shard"], +) + +declarePublicGauge( + waku_relay_fleet_store_msg_count, + "Number of messages stored by fleet store nodes per shard", + labels = ["shard"], +) + # Error types (metric label values) const DialFailure* = "dial_failure"