mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-04 06:53:12 +00:00
feat: Added proper per shard bandwidth metric calculation (#2851)
* Added proper per shard bandwidth metric calculation and proper logging of in/out messages Changed rate limit metrics for dashboard Updated monitoring dashboard for bw and rate metrics
This commit is contained in:
parent
9362948a02
commit
35509ed4fd
File diff suppressed because it is too large
Load Diff
@ -43,10 +43,10 @@ template checkUsageLimit*(
|
|||||||
bodyWithinLimit, bodyRejected: untyped,
|
bodyWithinLimit, bodyRejected: untyped,
|
||||||
) =
|
) =
|
||||||
if t.checkUsage(proto, conn):
|
if t.checkUsage(proto, conn):
|
||||||
waku_service_requests.inc(labelValues = [proto])
|
waku_service_requests.inc(labelValues = [proto, "served"])
|
||||||
bodyWithinLimit
|
bodyWithinLimit
|
||||||
else:
|
else:
|
||||||
waku_service_requests_rejected.inc(labelValues = [proto])
|
waku_service_requests.inc(labelValues = [proto, "rejected"])
|
||||||
bodyRejected
|
bodyRejected
|
||||||
|
|
||||||
func `$`*(ob: Option[TokenBucket]): string {.inline.} =
|
func `$`*(ob: Option[TokenBucket]): string {.inline.} =
|
||||||
|
|||||||
@ -6,13 +6,7 @@ else:
|
|||||||
import metrics
|
import metrics
|
||||||
|
|
||||||
declarePublicCounter waku_service_requests,
|
declarePublicCounter waku_service_requests,
|
||||||
"number of non-relay service requests received", ["service"]
|
"number of non-relay service requests received", ["service", "state"]
|
||||||
declarePublicCounter waku_service_requests_rejected,
|
|
||||||
"number of non-relay service requests received being rejected due to limit overdue",
|
|
||||||
["service"]
|
|
||||||
|
|
||||||
declarePublicCounter waku_service_inbound_network_bytes,
|
declarePublicCounter waku_service_network_bytes,
|
||||||
"total incoming traffic of specific waku services", labels = ["service"]
|
"total incoming traffic of specific waku services", labels = ["service", "direction"]
|
||||||
|
|
||||||
declarePublicCounter waku_service_outbound_network_bytes,
|
|
||||||
"total outgoing traffic of specific waku services", labels = ["service"]
|
|
||||||
|
|||||||
@ -73,8 +73,8 @@ proc initProtocolHandler(wl: WakuLightPush) =
|
|||||||
wl.requestRateLimiter.checkUsageLimit(WakuLightPushCodec, conn):
|
wl.requestRateLimiter.checkUsageLimit(WakuLightPushCodec, conn):
|
||||||
let buffer = await conn.readLp(DefaultMaxRpcSize)
|
let buffer = await conn.readLp(DefaultMaxRpcSize)
|
||||||
|
|
||||||
waku_service_inbound_network_bytes.inc(
|
waku_service_network_bytes.inc(
|
||||||
amount = buffer.len().int64, labelValues = [WakuLightPushCodec]
|
amount = buffer.len().int64, labelValues = [WakuLightPushCodec, "in"]
|
||||||
)
|
)
|
||||||
|
|
||||||
rpc = await handleRequest(wl, conn.peerId, buffer)
|
rpc = await handleRequest(wl, conn.peerId, buffer)
|
||||||
@ -115,4 +115,4 @@ proc new*(
|
|||||||
requestRateLimiter: newTokenBucket(rateLimitSetting),
|
requestRateLimiter: newTokenBucket(rateLimitSetting),
|
||||||
)
|
)
|
||||||
wl.initProtocolHandler()
|
wl.initProtocolHandler()
|
||||||
return wl
|
return wl
|
||||||
|
|||||||
@ -59,6 +59,9 @@ const TopicParameters = TopicParams(
|
|||||||
invalidMessageDeliveriesDecay: 0.5,
|
invalidMessageDeliveriesDecay: 0.5,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
declareCounter waku_relay_network_bytes,
|
||||||
|
"total traffic per topic", labels = ["topic", "direction"]
|
||||||
|
|
||||||
# see: https://rfc.vac.dev/spec/29/#gossipsub-v10-parameters
|
# see: https://rfc.vac.dev/spec/29/#gossipsub-v10-parameters
|
||||||
const GossipsubParameters = GossipSubParams.init(
|
const GossipsubParameters = GossipSubParams.init(
|
||||||
pruneBackoff = chronos.minutes(1),
|
pruneBackoff = chronos.minutes(1),
|
||||||
@ -150,6 +153,78 @@ proc initProtocolHandler(w: WakuRelay) =
|
|||||||
w.handler = handler
|
w.handler = handler
|
||||||
w.codec = WakuRelayCodec
|
w.codec = WakuRelayCodec
|
||||||
|
|
||||||
|
proc initRelayMetricObserver(w: WakuRelay) =
|
||||||
|
proc decodeRpcMessageInfo(
|
||||||
|
peer: PubSubPeer, msg: Message
|
||||||
|
): Result[
|
||||||
|
tuple[msgId: string, topic: string, wakuMessage: WakuMessage, msgSize: int], void
|
||||||
|
] =
|
||||||
|
let msg_id = w.msgIdProvider(msg).valueOr:
|
||||||
|
warn "Error generating message id",
|
||||||
|
my_peer_id = w.switch.peerInfo.peerId,
|
||||||
|
from_peer_id = peer.peerId,
|
||||||
|
pubsub_topic = msg.topic,
|
||||||
|
error = $error
|
||||||
|
return err()
|
||||||
|
|
||||||
|
let msg_id_short = shortLog(msg_id)
|
||||||
|
|
||||||
|
let wakuMessage = WakuMessage.decode(msg.data).valueOr:
|
||||||
|
warn "Error decoding to Waku Message",
|
||||||
|
my_peer_id = w.switch.peerInfo.peerId,
|
||||||
|
msg_id = msg_id_short,
|
||||||
|
from_peer_id = peer.peerId,
|
||||||
|
pubsub_topic = msg.topic,
|
||||||
|
error = $error
|
||||||
|
return err()
|
||||||
|
|
||||||
|
let msgSize = msg.data.len + msg.topic.len
|
||||||
|
return ok((msg_id_short, msg.topic, wakuMessage, msgSize))
|
||||||
|
|
||||||
|
proc logMessageInfo(
|
||||||
|
peer: PubSubPeer, topic: string, msg_id_short: string, msg: WakuMessage
|
||||||
|
) =
|
||||||
|
let msg_hash = computeMessageHash(topic, msg).to0xHex()
|
||||||
|
|
||||||
|
notice "sent relay message",
|
||||||
|
my_peer_id = w.switch.peerInfo.peerId,
|
||||||
|
msg_hash = msg_hash,
|
||||||
|
msg_id = msg_id_short,
|
||||||
|
to_peer_id = peer.peerId,
|
||||||
|
topic = topic,
|
||||||
|
sentTime = getNowInNanosecondTime(),
|
||||||
|
payloadSizeBytes = msg.payload.len
|
||||||
|
|
||||||
|
proc updateMetrics(
|
||||||
|
peer: PubSubPeer,
|
||||||
|
pubsub_topic: string,
|
||||||
|
msg: WakuMessage,
|
||||||
|
msgSize: int,
|
||||||
|
onRecv: bool,
|
||||||
|
) =
|
||||||
|
waku_relay_network_bytes.inc(
|
||||||
|
msgSize.int64, labelValues = [pubsub_topic, if onRecv: "in" else: "out"]
|
||||||
|
)
|
||||||
|
|
||||||
|
proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
|
||||||
|
for msg in msgs.messages:
|
||||||
|
let (msg_id_short, topic, wakuMessage, msgSize) = decodeRpcMessageInfo(peer, msg).valueOr:
|
||||||
|
continue
|
||||||
|
# message receive log happens in treaceHandler as this one is called before checks
|
||||||
|
updateMetrics(peer, topic, wakuMessage, msgSize, onRecv = true)
|
||||||
|
discard
|
||||||
|
|
||||||
|
proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
|
||||||
|
for msg in msgs.messages:
|
||||||
|
let (msg_id_short, topic, wakuMessage, msgSize) = decodeRpcMessageInfo(peer, msg).valueOr:
|
||||||
|
continue
|
||||||
|
logMessageInfo(peer, topic, msg_id_short, wakuMessage)
|
||||||
|
updateMetrics(peer, topic, wakuMessage, msgSize, onRecv = false)
|
||||||
|
|
||||||
|
let administrativeObserver = PubSubObserver(onRecv: onRecv, onSend: onSend)
|
||||||
|
|
||||||
|
w.addObserver(administrativeObserver)
|
||||||
|
|
||||||
proc new*(
|
proc new*(
|
||||||
T: type WakuRelay, switch: Switch, maxMessageSize = int(DefaultMaxWakuMessageSize)
|
T: type WakuRelay, switch: Switch, maxMessageSize = int(DefaultMaxWakuMessageSize)
|
||||||
): WakuRelayResult[T] =
|
): WakuRelayResult[T] =
|
||||||
@ -170,6 +245,7 @@ proc new*(
|
|||||||
|
|
||||||
procCall GossipSub(w).initPubSub()
|
procCall GossipSub(w).initPubSub()
|
||||||
w.initProtocolHandler()
|
w.initProtocolHandler()
|
||||||
|
w.initRelayMetricObserver()
|
||||||
except InitializationError:
|
except InitializationError:
|
||||||
return err("initialization error: " & getCurrentExceptionMsg())
|
return err("initialization error: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
@ -180,6 +256,9 @@ proc addValidator*(
|
|||||||
) {.gcsafe.} =
|
) {.gcsafe.} =
|
||||||
w.wakuValidators.add((handler, errorMessage))
|
w.wakuValidators.add((handler, errorMessage))
|
||||||
|
|
||||||
|
proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
|
||||||
|
procCall GossipSub(w).addObserver(observer)
|
||||||
|
|
||||||
method start*(w: WakuRelay) {.async, base.} =
|
method start*(w: WakuRelay) {.async, base.} =
|
||||||
debug "start"
|
debug "start"
|
||||||
await procCall GossipSub(w).start()
|
await procCall GossipSub(w).start()
|
||||||
@ -311,4 +390,6 @@ proc publish*(
|
|||||||
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
|
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
|
||||||
notice "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic
|
notice "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic
|
||||||
|
|
||||||
return await procCall GossipSub(w).publish(pubsubTopic, data)
|
let relayedPeerCount = await procCall GossipSub(w).publish(pubsubTopic, data)
|
||||||
|
|
||||||
|
return relayedPeerCount
|
||||||
|
|||||||
@ -103,8 +103,8 @@ proc initProtocolHandler(self: WakuStore) =
|
|||||||
error "Connection read error", error = error.msg
|
error "Connection read error", error = error.msg
|
||||||
return
|
return
|
||||||
|
|
||||||
waku_service_inbound_network_bytes.inc(
|
waku_service_network_bytes.inc(
|
||||||
amount = reqBuf.len().int64, labelValues = [WakuStoreCodec]
|
amount = reqBuf.len().int64, labelValues = [WakuStoreCodec, "in"]
|
||||||
)
|
)
|
||||||
|
|
||||||
resBuf = await self.handleQueryRequest(conn.peerId, reqBuf)
|
resBuf = await self.handleQueryRequest(conn.peerId, reqBuf)
|
||||||
@ -120,8 +120,8 @@ proc initProtocolHandler(self: WakuStore) =
|
|||||||
error "Connection write error", error = writeRes.error.msg
|
error "Connection write error", error = writeRes.error.msg
|
||||||
return
|
return
|
||||||
|
|
||||||
waku_service_outbound_network_bytes.inc(
|
waku_service_network_bytes.inc(
|
||||||
amount = resBuf.len().int64, labelValues = [WakuStoreCodec]
|
amount = resBuf.len().int64, labelValues = [WakuStoreCodec, "out"]
|
||||||
)
|
)
|
||||||
|
|
||||||
self.handler = handler
|
self.handler = handler
|
||||||
|
|||||||
@ -118,8 +118,8 @@ proc initProtocolHandler(ws: WakuStore) =
|
|||||||
error "Connection read error", error = error.msg
|
error "Connection read error", error = error.msg
|
||||||
return
|
return
|
||||||
|
|
||||||
waku_service_inbound_network_bytes.inc(
|
waku_service_network_bytes.inc(
|
||||||
amount = reqBuf.len().int64, labelValues = [WakuLegacyStoreCodec]
|
amount = reqBuf.len().int64, labelValues = [WakuLegacyStoreCodec, "in"]
|
||||||
)
|
)
|
||||||
|
|
||||||
resBuf = await ws.handleLegacyQueryRequest(conn.peerId, reqBuf)
|
resBuf = await ws.handleLegacyQueryRequest(conn.peerId, reqBuf)
|
||||||
@ -135,8 +135,8 @@ proc initProtocolHandler(ws: WakuStore) =
|
|||||||
error "Connection write error", error = writeRes.error.msg
|
error "Connection write error", error = writeRes.error.msg
|
||||||
return
|
return
|
||||||
|
|
||||||
waku_service_outbound_network_bytes.inc(
|
waku_service_network_bytes.inc(
|
||||||
amount = resBuf.len().int64, labelValues = [WakuLegacyStoreCodec]
|
amount = resBuf.len().int64, labelValues = [WakuLegacyStoreCodec, "out"]
|
||||||
)
|
)
|
||||||
|
|
||||||
ws.handler = handler
|
ws.handler = handler
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user