From 4f2ecba4b6009914d7921c35582e803d7512b35e Mon Sep 17 00:00:00 2001 From: darshankabariya Date: Wed, 11 Jun 2025 12:29:06 +0530 Subject: [PATCH] chore: add waku_relay_avg_msg_bytes_per_shard and waku_relay_max_msg_bytes_per_shard --- waku/waku_relay/protocol.nim | 49 ++++++++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 5 deletions(-) diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index f6223bb63..a957eb3fd 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -27,8 +27,26 @@ logScope: topics = "waku relay" declarePublicGauge( - waku_relay_messages_per_shard, - "number of unique messages seen, grouped by shard", + waku_relay_msg_count_per_shard, + "Number of messages seen, grouped by shard", + labels = ["shard"], +) + +declarePublicGauge( + waku_relay_max_msg_bytes_per_shard, + "Maximum length of messages seen per shard", + labels = ["shard"], +) + +declarePublicGauge( + waku_relay_avg_msg_bytes_per_shard, + "Average length of messages seen per shard", + labels = ["shard"], +) + +declarePublicGauge( + waku_relay_sum_msg_bytes_per_shard, + "Sum of message lengths seen per shard", labels = ["shard"], ) @@ -182,6 +200,7 @@ proc logMessageInfo*( onRecv: bool, ) = let msg_hash = computeMessageHash(topic, msg).to0xHex() + let payloadSize = msg.payload.len if onRecv: notice "received relay message", @@ -191,7 +210,7 @@ proc logMessageInfo*( from_peer_id = remotePeerId, topic = topic, receivedTime = getNowInNanosecondTime(), - payloadSizeBytes = msg.payload.len + payloadSizeBytes = payloadSize else: notice "sent relay message", my_peer_id = w.switch.peerInfo.peerId, @@ -200,9 +219,29 @@ proc logMessageInfo*( to_peer_id = remotePeerId, topic = topic, sentTime = getNowInNanosecondTime(), - payloadSizeBytes = msg.payload.len + payloadSizeBytes = payloadSize - waku_relay_messages_per_shard.inc(labelValues = [topic]) + # Increment message count + waku_relay_msg_count_per_shard.inc(labelValues = [topic]) + + # Add to sum of message sizes + waku_relay_sum_msg_bytes_per_shard.inc(payloadSize, labelValues = [topic]) + + # Calculate and set average (sum/count) + try: + let count = waku_relay_msg_count_per_shard.value(labelValuesParam = [topic]) + let sum = waku_relay_sum_msg_bytes_per_shard.value(labelValuesParam = [topic]) + let avg = sum / count + waku_relay_avg_msg_bytes_per_shard.set(avg, labelValues = [topic]) + except CatchableError: + warn "Error calculating average message size", error = getCurrentExceptionMsg() + + try: + let currentMax = waku_relay_max_msg_bytes_per_shard.value(labelValuesParam = [topic]) + if float64(payloadSize) > currentMax: + waku_relay_max_msg_bytes_per_shard.set(float64(payloadSize), labelValues = [topic]) + except CatchableError: + warn "Error updating max message size", error = getCurrentExceptionMsg() proc initRelayObservers(w: WakuRelay) = proc decodeRpcMessageInfo(