mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-27 23:27:27 +00:00
feat: Added proper per shard bandwidth metric calculation (#2851)
* Added proper per shard bandwidth metric calculation and proper logging of in/out messages Changed rate limit metrics for dashboard Updated monitoring dashboard for bw and rate metrics
This commit is contained in:
parent
76d5b2642d
commit
8f14c04735
File diff suppressed because it is too large
Load Diff
@ -43,10 +43,10 @@ template checkUsageLimit*(
|
||||
bodyWithinLimit, bodyRejected: untyped,
|
||||
) =
|
||||
if t.checkUsage(proto, conn):
|
||||
waku_service_requests.inc(labelValues = [proto])
|
||||
waku_service_requests.inc(labelValues = [proto, "served"])
|
||||
bodyWithinLimit
|
||||
else:
|
||||
waku_service_requests_rejected.inc(labelValues = [proto])
|
||||
waku_service_requests.inc(labelValues = [proto, "rejected"])
|
||||
bodyRejected
|
||||
|
||||
func `$`*(ob: Option[TokenBucket]): string {.inline.} =
|
||||
|
@ -6,13 +6,7 @@ else:
|
||||
import metrics
|
||||
|
||||
declarePublicCounter waku_service_requests,
|
||||
"number of non-relay service requests received", ["service"]
|
||||
declarePublicCounter waku_service_requests_rejected,
|
||||
"number of non-relay service requests received being rejected due to limit overdue",
|
||||
["service"]
|
||||
"number of non-relay service requests received", ["service", "state"]
|
||||
|
||||
declarePublicCounter waku_service_inbound_network_bytes,
|
||||
"total incoming traffic of specific waku services", labels = ["service"]
|
||||
|
||||
declarePublicCounter waku_service_outbound_network_bytes,
|
||||
"total outgoing traffic of specific waku services", labels = ["service"]
|
||||
declarePublicCounter waku_service_network_bytes,
|
||||
"total incoming traffic of specific waku services", labels = ["service", "direction"]
|
||||
|
@ -73,8 +73,8 @@ proc initProtocolHandler(wl: WakuLightPush) =
|
||||
wl.requestRateLimiter.checkUsageLimit(WakuLightPushCodec, conn):
|
||||
let buffer = await conn.readLp(DefaultMaxRpcSize)
|
||||
|
||||
waku_service_inbound_network_bytes.inc(
|
||||
amount = buffer.len().int64, labelValues = [WakuLightPushCodec]
|
||||
waku_service_network_bytes.inc(
|
||||
amount = buffer.len().int64, labelValues = [WakuLightPushCodec, "in"]
|
||||
)
|
||||
|
||||
rpc = await handleRequest(wl, conn.peerId, buffer)
|
||||
|
@ -59,6 +59,9 @@ const TopicParameters = TopicParams(
|
||||
invalidMessageDeliveriesDecay: 0.5,
|
||||
)
|
||||
|
||||
declareCounter waku_relay_network_bytes,
|
||||
"total traffic per topic", labels = ["topic", "direction"]
|
||||
|
||||
# see: https://rfc.vac.dev/spec/29/#gossipsub-v10-parameters
|
||||
const GossipsubParameters = GossipSubParams.init(
|
||||
pruneBackoff = chronos.minutes(1),
|
||||
@ -150,6 +153,78 @@ proc initProtocolHandler(w: WakuRelay) =
|
||||
w.handler = handler
|
||||
w.codec = WakuRelayCodec
|
||||
|
||||
proc initRelayMetricObserver(w: WakuRelay) =
|
||||
proc decodeRpcMessageInfo(
|
||||
peer: PubSubPeer, msg: Message
|
||||
): Result[
|
||||
tuple[msgId: string, topic: string, wakuMessage: WakuMessage, msgSize: int], void
|
||||
] =
|
||||
let msg_id = w.msgIdProvider(msg).valueOr:
|
||||
warn "Error generating message id",
|
||||
my_peer_id = w.switch.peerInfo.peerId,
|
||||
from_peer_id = peer.peerId,
|
||||
pubsub_topic = msg.topic,
|
||||
error = $error
|
||||
return err()
|
||||
|
||||
let msg_id_short = shortLog(msg_id)
|
||||
|
||||
let wakuMessage = WakuMessage.decode(msg.data).valueOr:
|
||||
warn "Error decoding to Waku Message",
|
||||
my_peer_id = w.switch.peerInfo.peerId,
|
||||
msg_id = msg_id_short,
|
||||
from_peer_id = peer.peerId,
|
||||
pubsub_topic = msg.topic,
|
||||
error = $error
|
||||
return err()
|
||||
|
||||
let msgSize = msg.data.len + msg.topic.len
|
||||
return ok((msg_id_short, msg.topic, wakuMessage, msgSize))
|
||||
|
||||
proc logMessageInfo(
|
||||
peer: PubSubPeer, topic: string, msg_id_short: string, msg: WakuMessage
|
||||
) =
|
||||
let msg_hash = computeMessageHash(topic, msg).to0xHex()
|
||||
|
||||
notice "sent relay message",
|
||||
my_peer_id = w.switch.peerInfo.peerId,
|
||||
msg_hash = msg_hash,
|
||||
msg_id = msg_id_short,
|
||||
to_peer_id = peer.peerId,
|
||||
topic = topic,
|
||||
sentTime = getNowInNanosecondTime(),
|
||||
payloadSizeBytes = msg.payload.len
|
||||
|
||||
proc updateMetrics(
|
||||
peer: PubSubPeer,
|
||||
pubsub_topic: string,
|
||||
msg: WakuMessage,
|
||||
msgSize: int,
|
||||
onRecv: bool,
|
||||
) =
|
||||
waku_relay_network_bytes.inc(
|
||||
msgSize.int64, labelValues = [pubsub_topic, if onRecv: "in" else: "out"]
|
||||
)
|
||||
|
||||
proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
|
||||
for msg in msgs.messages:
|
||||
let (msg_id_short, topic, wakuMessage, msgSize) = decodeRpcMessageInfo(peer, msg).valueOr:
|
||||
continue
|
||||
# message receive log happens in treaceHandler as this one is called before checks
|
||||
updateMetrics(peer, topic, wakuMessage, msgSize, onRecv = true)
|
||||
discard
|
||||
|
||||
proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
|
||||
for msg in msgs.messages:
|
||||
let (msg_id_short, topic, wakuMessage, msgSize) = decodeRpcMessageInfo(peer, msg).valueOr:
|
||||
continue
|
||||
logMessageInfo(peer, topic, msg_id_short, wakuMessage)
|
||||
updateMetrics(peer, topic, wakuMessage, msgSize, onRecv = false)
|
||||
|
||||
let administrativeObserver = PubSubObserver(onRecv: onRecv, onSend: onSend)
|
||||
|
||||
w.addObserver(administrativeObserver)
|
||||
|
||||
proc new*(
|
||||
T: type WakuRelay, switch: Switch, maxMessageSize = int(DefaultMaxWakuMessageSize)
|
||||
): WakuRelayResult[T] =
|
||||
@ -170,6 +245,7 @@ proc new*(
|
||||
|
||||
procCall GossipSub(w).initPubSub()
|
||||
w.initProtocolHandler()
|
||||
w.initRelayMetricObserver()
|
||||
except InitializationError:
|
||||
return err("initialization error: " & getCurrentExceptionMsg())
|
||||
|
||||
@ -180,6 +256,9 @@ proc addValidator*(
|
||||
) {.gcsafe.} =
|
||||
w.wakuValidators.add((handler, errorMessage))
|
||||
|
||||
proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
|
||||
procCall GossipSub(w).addObserver(observer)
|
||||
|
||||
method start*(w: WakuRelay) {.async, base.} =
|
||||
debug "start"
|
||||
await procCall GossipSub(w).start()
|
||||
@ -311,4 +390,6 @@ proc publish*(
|
||||
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
|
||||
notice "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic
|
||||
|
||||
return await procCall GossipSub(w).publish(pubsubTopic, data)
|
||||
let relayedPeerCount = await procCall GossipSub(w).publish(pubsubTopic, data)
|
||||
|
||||
return relayedPeerCount
|
||||
|
@ -103,8 +103,8 @@ proc initProtocolHandler(self: WakuStore) =
|
||||
error "Connection read error", error = error.msg
|
||||
return
|
||||
|
||||
waku_service_inbound_network_bytes.inc(
|
||||
amount = reqBuf.len().int64, labelValues = [WakuStoreCodec]
|
||||
waku_service_network_bytes.inc(
|
||||
amount = reqBuf.len().int64, labelValues = [WakuStoreCodec, "in"]
|
||||
)
|
||||
|
||||
resBuf = await self.handleQueryRequest(conn.peerId, reqBuf)
|
||||
@ -120,8 +120,8 @@ proc initProtocolHandler(self: WakuStore) =
|
||||
error "Connection write error", error = writeRes.error.msg
|
||||
return
|
||||
|
||||
waku_service_outbound_network_bytes.inc(
|
||||
amount = resBuf.len().int64, labelValues = [WakuStoreCodec]
|
||||
waku_service_network_bytes.inc(
|
||||
amount = resBuf.len().int64, labelValues = [WakuStoreCodec, "out"]
|
||||
)
|
||||
|
||||
self.handler = handler
|
||||
|
@ -118,8 +118,8 @@ proc initProtocolHandler(ws: WakuStore) =
|
||||
error "Connection read error", error = error.msg
|
||||
return
|
||||
|
||||
waku_service_inbound_network_bytes.inc(
|
||||
amount = reqBuf.len().int64, labelValues = [WakuLegacyStoreCodec]
|
||||
waku_service_network_bytes.inc(
|
||||
amount = reqBuf.len().int64, labelValues = [WakuLegacyStoreCodec, "in"]
|
||||
)
|
||||
|
||||
resBuf = await ws.handleLegacyQueryRequest(conn.peerId, reqBuf)
|
||||
@ -135,8 +135,8 @@ proc initProtocolHandler(ws: WakuStore) =
|
||||
error "Connection write error", error = writeRes.error.msg
|
||||
return
|
||||
|
||||
waku_service_outbound_network_bytes.inc(
|
||||
amount = resBuf.len().int64, labelValues = [WakuLegacyStoreCodec]
|
||||
waku_service_network_bytes.inc(
|
||||
amount = resBuf.len().int64, labelValues = [WakuLegacyStoreCodec, "out"]
|
||||
)
|
||||
|
||||
ws.handler = handler
|
||||
|
Loading…
x
Reference in New Issue
Block a user