Mirror of https://github.com/logos-messaging/logos-messaging-nim.git (synced 2026-01-07 16:33:08 +00:00)
feat: shard-specific metrics tracking (#3520)
parent 5640232085
commit c3da29fd63
@@ -14,7 +14,8 @@ import ../node/peer_manager/peer_manager, ../waku_core, ../waku_enr
 export protocol, waku_enr

 declarePublicGauge waku_discv5_discovered, "number of nodes discovered"
+declarePublicGauge waku_discv5_discovered_per_shard,
+  "number of nodes discovered by each shard", labels = ["shard"]
 declarePublicGauge waku_discv5_errors, "number of waku discv5 errors", ["type"]

 logScope:
@@ -231,7 +232,20 @@ proc findRandomPeers*(
   elif wd.predicate.isSome():
     discoveredRecords = discoveredRecords.filter(wd.predicate.get())

   waku_discv5_discovered.inc(discoveredRecords.len)
+  # Increment metric for each discovered record's shards
+  for record in discoveredRecords:
+    let typedRecord = record.toTyped().valueOr:
+      # If we can't parse the record, skip it
+      waku_discv5_errors.inc(labelValues = ["ParseFailure"])
+      continue
+
+    let relayShards = typedRecord.relaySharding().valueOr:
+      # If no relay sharding info, skip it
+      waku_discv5_errors.inc(labelValues = ["NoShardInfo"])
+      continue
+
+    for shardId in relayShards.shardIds:
+      waku_discv5_discovered_per_shard.inc(labelValues = [$shardId])
+
   return discoveredRecords
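The loop above skips records it cannot use but still counts why they were skipped, so waku_discv5_errors stays consistent with what discovery actually dropped. A minimal, dependency-free sketch of that skip-and-count accounting, with a hypothetical `DiscRecord` type standing in for a real ENR and a `CountTable` standing in for the labelled gauges:

```nim
import std/[options, tables]

type DiscRecord = object
  shardIds: Option[seq[uint16]] # none() models a record without shard info

proc countPerShard(records: seq[DiscRecord]): (CountTable[uint16], int) =
  ## Returns per-shard discovery counts and the number of skipped records.
  var perShard = initCountTable[uint16]()
  var skipped = 0
  for r in records:
    if r.shardIds.isNone:
      inc skipped # stands in for waku_discv5_errors.inc(...)
      continue
    for shardId in r.shardIds.get():
      perShard.inc(shardId) # stands in for waku_discv5_discovered_per_shard.inc(...)
  return (perShard, skipped)

when isMainModule:
  let recs = @[
    DiscRecord(shardIds: some(@[0'u16, 3'u16])),
    DiscRecord(shardIds: none(seq[uint16])),
    DiscRecord(shardIds: some(@[3'u16])),
  ]
  let (perShard, skipped) = countPerShard(recs)
  doAssert perShard[3'u16] == 2
  doAssert skipped == 1
```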
@@ -34,6 +34,8 @@ declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]
 declarePublicGauge waku_connected_peers,
   "Number of physical connections per direction and protocol",
   labels = ["direction", "protocol"]
+declarePublicGauge waku_connected_peers_per_shard,
+  "Number of physical connections per shard", labels = ["shard"]
 declarePublicGauge waku_streams_peers,
   "Number of streams per direction and protocol", labels = ["direction", "protocol"]
 declarePublicGauge waku_peer_store_size, "Number of peers managed by the peer store"
@@ -778,6 +780,16 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
         protoStreamsOut.float64, labelValues = [$Direction.Out, proto]
       )

+    for shard in pm.wakuMetadata.shards.items:
+      waku_connected_peers_per_shard.set(0.0, labelValues = [$shard])
+
+    for shard in pm.wakuMetadata.shards.items:
+      let connectedPeers =
+        peerStore.getPeersByShard(uint16(pm.wakuMetadata.clusterId), uint16(shard))
+      waku_connected_peers_per_shard.set(
+        connectedPeers.len.float64, labelValues = [$shard]
+      )
+
 proc getOnlineStateObserver*(pm: PeerManager): OnOnlineStateChange =
   return proc(online: bool) {.gcsafe, raises: [].} =
     pm.online = online
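Note that the gauge is zeroed for every known shard before the per-shard counts are written. A labelled gauge keeps reporting its last value for a label that is never touched again, so without the reset pass a shard that lost all its peers would keep exporting a stale non-zero count. A small sketch of the same two-pass idea, with a plain table standing in for the gauge and made-up shard and peer data:

```nim
import std/[sets, tables]

proc recountPerShard(
    shards: HashSet[uint16], peersByShard: Table[uint16, int]
): Table[string, float64] =
  var gauge = initTable[string, float64]()
  # Pass 1: zero every known shard so no stale value can survive.
  for shard in shards:
    gauge[$shard] = 0.0
  # Pass 2: write the current peer count for each shard.
  for shard in shards:
    gauge[$shard] = peersByShard.getOrDefault(shard, 0).float64
  return gauge

when isMainModule:
  let shards = toHashSet([0'u16, 1'u16])
  let peers = {0'u16: 4}.toTable
  let g = recountPerShard(shards, peers)
  doAssert g["0"] == 4.0
  doAssert g["1"] == 0.0 # a shard with no peers reports 0, not a stale count
```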
@@ -119,6 +119,11 @@ proc handleMessage*(
   let insertDuration = getTime().toUnixFloat() - insertStartTime
   waku_archive_insert_duration_seconds.observe(insertDuration)

+  let shard = RelayShard.parseStaticSharding(pubsubTopic).valueOr:
+    DefaultRelayShard
+
+  waku_archive_messages_per_shard.inc(labelValues = [$shard.shardId])
+
   trace "message archived",
     msg_hash = msgHashHex,
     pubsubTopic = pubsubTopic,
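`parseStaticSharding` extracts the shard from a static-shard pubsub topic, and the fallback to `DefaultRelayShard` keeps the metric well-defined for non-sharded topics. A self-contained sketch of what that parse amounts to, assuming the `/waku/2/rs/<cluster>/<shard>` static-shard topic format (the `parseShardId` helper is made up for illustration):

```nim
import std/[options, strutils]

proc parseShardId(pubsubTopic: string): Option[uint16] =
  ## Extracts the shard id from a static-shard topic, or none() otherwise.
  let parts = pubsubTopic.split('/')
  # A static-shard topic splits into ["", "waku", "2", "rs", cluster, shard].
  if parts.len == 6 and parts[1] == "waku" and parts[2] == "2" and parts[3] == "rs":
    try:
      return some(parts[5].parseUInt().uint16)
    except ValueError:
      discard # malformed shard id: fall through to the default case
  return none(uint16)

when isMainModule:
  doAssert parseShardId("/waku/2/rs/1/7") == some(7'u16)
  doAssert parseShardId("/waku/2/default-waku/proto").isNone
```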
@@ -3,6 +3,8 @@
 import metrics

 declarePublicGauge waku_archive_messages, "number of historical messages", ["type"]
+declarePublicGauge waku_archive_messages_per_shard,
+  "number of historical messages per shard", ["shard"]
 declarePublicGauge waku_archive_errors, "number of store protocol errors", ["type"]
 declarePublicGauge waku_archive_queries, "number of store queries received"
 declarePublicHistogram waku_archive_insert_duration_seconds,
@@ -23,9 +23,31 @@ import
 from ../waku_core/codecs import WakuRelayCodec
 export WakuRelayCodec

+type ShardMetrics = object
+  count: float64
+  sizeSum: float64
+  avgSize: float64
+  maxSize: float64
+
 logScope:
   topics = "waku relay"

+declareCounter waku_relay_network_bytes,
+  "total traffic per topic, distinct gross/net and direction",
+  labels = ["topic", "type", "direction"]
+
+declarePublicGauge(
+  waku_relay_max_msg_bytes_per_shard,
+  "Maximum length of messages seen per shard",
+  labels = ["shard"],
+)
+
+declarePublicGauge(
+  waku_relay_avg_msg_bytes_per_shard,
+  "Average length of messages seen per shard",
+  labels = ["shard"],
+)
+
 # see: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#overview-of-new-parameters
 const TopicParameters = TopicParams(
   topicWeight: 1,
@@ -58,10 +80,6 @@ const TopicParameters = TopicParams(
   invalidMessageDeliveriesDecay: 0.5,
 )

-declareCounter waku_relay_network_bytes,
-  "total traffic per topic, distinct gross/net and direction",
-  labels = ["topic", "type", "direction"]
-
 # see: https://rfc.vac.dev/spec/29/#gossipsub-v10-parameters
 const GossipsubParameters = GossipSubParams.init(
   pruneBackoff = chronos.minutes(1),
@@ -137,6 +155,7 @@ type
     topicsHealth*: Table[string, TopicHealth]
     onTopicHealthChange*: TopicHealthChangeHandler
     topicHealthLoopHandle*: Future[void]
+    msgMetricsPerShard*: Table[string, ShardMetrics]

 # predefinition for more detailed results from publishing new message
 type PublishOutcome* {.pure.} = enum
@@ -176,6 +195,7 @@ proc logMessageInfo*(
     onRecv: bool,
 ) =
   let msg_hash = computeMessageHash(topic, msg).to0xHex()
+  let payloadSize = float64(msg.payload.len)

   if onRecv:
     notice "received relay message",
@@ -185,7 +205,7 @@ proc logMessageInfo*(
       from_peer_id = remotePeerId,
       topic = topic,
       receivedTime = getNowInNanosecondTime(),
-      payloadSizeBytes = msg.payload.len
+      payloadSizeBytes = payloadSize
   else:
     notice "sent relay message",
       my_peer_id = w.switch.peerInfo.peerId,
@@ -194,7 +214,19 @@ proc logMessageInfo*(
       to_peer_id = remotePeerId,
       topic = topic,
       sentTime = getNowInNanosecondTime(),
-      payloadSizeBytes = msg.payload.len
+      payloadSizeBytes = payloadSize
+
+  var shardMetrics = w.msgMetricsPerShard.getOrDefault(topic, ShardMetrics())
+  shardMetrics.count += 1
+  shardMetrics.sizeSum += payloadSize
+  if payloadSize > shardMetrics.maxSize:
+    shardMetrics.maxSize = payloadSize
+  shardMetrics.avgSize = shardMetrics.sizeSum / shardMetrics.count
+  w.msgMetricsPerShard[topic] = shardMetrics
+
+  waku_relay_max_msg_bytes_per_shard.set(shardMetrics.maxSize, labelValues = [topic])
+  waku_relay_avg_msg_bytes_per_shard.set(shardMetrics.avgSize, labelValues = [topic])

 proc initRelayObservers(w: WakuRelay) =
   proc decodeRpcMessageInfo(
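The `ShardMetrics` update above maintains count, byte sum, running average and maximum in O(1) per message, so no per-message history is needed to feed the two gauges; note they are labelled by the full pubsub topic string, so each static-shard topic maps to one label value. The same bookkeeping in isolation, with a plain table in place of `w.msgMetricsPerShard`:

```nim
import std/tables

type ShardMetrics = object
  count, sizeSum, avgSize, maxSize: float64

proc observe(metrics: var Table[string, ShardMetrics], topic: string, size: float64) =
  var m = metrics.getOrDefault(topic, ShardMetrics())
  m.count += 1
  m.sizeSum += size
  if size > m.maxSize:
    m.maxSize = size
  m.avgSize = m.sizeSum / m.count # running mean; no stored samples required
  metrics[topic] = m

when isMainModule:
  var metrics = initTable[string, ShardMetrics]()
  for size in [100.0, 300.0, 200.0]:
    metrics.observe("/waku/2/rs/1/0", size)
  doAssert metrics["/waku/2/rs/1/0"].avgSize == 200.0
  doAssert metrics["/waku/2/rs/1/0"].maxSize == 300.0
```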
@@ -17,8 +17,9 @@ func generateBucketsForHistogram*(length: int): seq[float64] =
   return buckets

 declarePublicCounter(
-  waku_rln_messages_total, "number of messages published on the rln content topic"
+  waku_rln_messages_total, "number of messages seen by the rln relay"
 )

 declarePublicCounter(waku_rln_spam_messages_total, "number of spam messages detected")
 declarePublicCounter(
   waku_rln_invalid_messages_total, "number of invalid messages detected", ["type"]
@@ -13,6 +13,7 @@ const DefaultPageSize*: uint = 20
 type WakuStoreClient* = ref object
   peerManager: PeerManager
   rng: ref rand.HmacDrbgContext
+  storeMsgMetricsPerShard*: Table[string, float64]

 proc new*(
     T: type WakuStoreClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext
@@ -46,6 +47,17 @@ proc sendStoreRequest(
     waku_store_errors.inc(labelValues = [NoSuccessStatusCode])
     return err(StoreError.new(res.statusCode, res.statusDesc))

+  if req.pubsubTopic.isSome():
+    let topic = req.pubsubTopic.get()
+    if not self.storeMsgMetricsPerShard.hasKey(topic):
+      self.storeMsgMetricsPerShard[topic] = 0
+    self.storeMsgMetricsPerShard[topic] += float64(req.encode().buffer.len)
+
+    waku_relay_fleet_store_msg_size_bytes.inc(
+      self.storeMsgMetricsPerShard[topic], labelValues = [topic]
+    )
+    waku_relay_fleet_store_msg_count.inc(1.0, labelValues = [topic])
+
   return ok(res)

 proc query*(
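A minor style note on the block above: the `hasKey` check followed by an insert-and-accumulate can be collapsed with `mgetOrPut` from `std/tables`, which inserts the default on first use and returns a mutable slot. A sketch of the equivalent accumulation (names hypothetical):

```nim
import std/tables

var bytesPerShard = initTable[string, float64]()

proc recordRequest(topic: string, requestBytes: int) =
  # Inserts 0.0 the first time `topic` is seen, then accumulates in place.
  bytesPerShard.mgetOrPut(topic, 0.0) += float64(requestBytes)

when isMainModule:
  recordRequest("/waku/2/rs/1/0", 120)
  recordRequest("/waku/2/rs/1/0", 80)
  doAssert bytesPerShard["/waku/2/rs/1/0"] == 200.0
```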
@@ -10,6 +10,18 @@ declarePublicGauge waku_store_queries, "number of store queries received"
 declarePublicGauge waku_store_time_seconds,
   "Time in seconds spent by each store phase", labels = ["phase"]

+declarePublicGauge(
+  waku_relay_fleet_store_msg_size_bytes,
+  "Total size of messages stored by fleet store nodes per shard",
+  labels = ["shard"],
+)
+
+declarePublicGauge(
+  waku_relay_fleet_store_msg_count,
+  "Number of messages stored by fleet store nodes per shard",
+  labels = ["shard"],
+)
+
 # Error types (metric label values)
 const
   DialFailure* = "dial_failure"