diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim
index eef965983..8d4d357ab 100644
--- a/waku/node/peer_manager/peer_manager.nim
+++ b/waku/node/peer_manager/peer_manager.nim
@@ -813,7 +813,7 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
         protoStreamsOut.float64, labelValues = [$Direction.Out, proto]
       )
 
-    for shard in 0 .. 63: # Assuming shards are 0-63 as per Waku spec
+    for shard in pm.wakuMetadata.shards.items:
       waku_connected_peers_per_shard.set(0.0, labelValues = [$shard])
 
     for shard in pm.wakuMetadata.shards.items:
diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim
index a957eb3fd..9e2b46afe 100644
--- a/waku/waku_relay/protocol.nim
+++ b/waku/waku_relay/protocol.nim
@@ -20,17 +20,18 @@ import
 
 import ../waku_core, ./message_id, ./topic_health, ../node/delivery_monitor/publish_observer
 
+var msgCountPerShard {.global, threadvar.}: Table[string, int64]
+var msgSizeSumPerShard {.global, threadvar.}: Table[string, int64]
+
 from ../waku_core/codecs import WakuRelayCodec
 export WakuRelayCodec
 
 logScope:
   topics = "waku relay"
 
-declarePublicGauge(
-  waku_relay_msg_count_per_shard,
-  "Number of messages seen, grouped by shard",
-  labels = ["shard"],
-)
+declareCounter waku_relay_network_bytes,
+  "total traffic per topic, distinct gross/net and direction",
+  labels = ["topic", "type", "direction"]
 
 declarePublicGauge(
   waku_relay_max_msg_bytes_per_shard,
@@ -44,12 +45,6 @@ declarePublicGauge(
   labels = ["shard"],
 )
 
-declarePublicGauge(
-  waku_relay_sum_msg_bytes_per_shard,
-  "Sum of message lengths seen per shard",
-  labels = ["shard"],
-)
-
 # see: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#overview-of-new-parameters
 const TopicParameters = TopicParams(
   topicWeight: 1,
@@ -82,10 +77,6 @@ const TopicParameters = TopicParams(
   invalidMessageDeliveriesDecay: 0.5,
 )
 
-declareCounter waku_relay_network_bytes,
-  "total traffic per topic, distinct gross/net and direction",
-  labels = ["topic", "type", "direction"]
-
 # see: https://rfc.vac.dev/spec/29/#gossipsub-v10-parameters
 const GossipsubParameters = GossipSubParams.init(
   pruneBackoff = chronos.minutes(1),
@@ -221,25 +212,27 @@ proc logMessageInfo*(
     sentTime = getNowInNanosecondTime(),
     payloadSizeBytes = payloadSize
 
-  # Increment message count
-  waku_relay_msg_count_per_shard.inc(labelValues = [topic])
-
-  # Add to sum of message sizes
-  waku_relay_sum_msg_bytes_per_shard.inc(payloadSize, labelValues = [topic])
-
-  # Calculate and set average (sum/count)
   try:
-    let count = waku_relay_msg_count_per_shard.value(labelValuesParam = [topic])
-    let sum = waku_relay_sum_msg_bytes_per_shard.value(labelValuesParam = [topic])
+    msgCountPerShard[topic] += 1
+    msgSizeSumPerShard[topic] += payloadSize
+  except KeyError:
+    warn "Error updating message metrics", error = getCurrentExceptionMsg()
+
+  try:
+    let count = msgCountPerShard[topic]
+    let sum = msgSizeSumPerShard[topic]
     let avg = sum / count
     waku_relay_avg_msg_bytes_per_shard.set(avg, labelValues = [topic])
   except CatchableError:
     warn "Error calculating average message size", error = getCurrentExceptionMsg()
 
   try:
-    let currentMax = waku_relay_max_msg_bytes_per_shard.value(labelValuesParam = [topic])
+    let currentMax =
+      waku_relay_max_msg_bytes_per_shard.value(labelValuesParam = [topic])
     if float64(payloadSize) > currentMax:
-      waku_relay_max_msg_bytes_per_shard.set(float64(payloadSize), labelValues = [topic])
+      waku_relay_max_msg_bytes_per_shard.set(
+        float64(payloadSize), labelValues = [topic]
+      )
   except CatchableError:
     warn "Error updating max message size", error = getCurrentExceptionMsg()