Mirror of https://github.com/logos-messaging/logos-delivery.git, synced 2026-04-16 13:13:08 +00:00.
add more health metrics and benchmarks
This commit is contained in:
parent
79bc9fd421
commit
d4fa393be8
37
waku/common/benchmark_metrics.nim
Normal file
37
waku/common/benchmark_metrics.nim
Normal file
@ -0,0 +1,37 @@
|
||||
{.push raises: [].}

import std/monotimes, std/times

import metrics

export metrics

declarePublicSummary benchmark_duration_seconds,
  "duration in seconds", ["module", "proc"]

## Sets up a deferred timer that observes elapsed seconds into
## `benchmark_duration_seconds` when the enclosing scope exits.
## The summary's `_count` field tracks the number of calls.
##
## Both `module` and `procName` are static strings resolved at compile time,
## ensuring labels are always reliable regardless of build flags.
##
## Usage:
##   import waku/common/benchmark_metrics
##
##   proc myProc*() =
##     benchmarkPoint("waku_relay", "myProc")
##     # ... rest of the proc

when defined(metrics):
  proc recordBenchmark(
      startTime: MonoTime, module: string, procName: string
  ) {.gcsafe, raises: [].} =
    ## Observes the time elapsed since `startTime` into the summary.
    ## Measured with the monotonic clock so readings are immune to wall-clock
    ## adjustments (NTP slews, manual changes) that could otherwise produce
    ## negative or wildly wrong durations.
    let elapsed = getMonoTime() - startTime
    benchmark_duration_seconds.observe(
      elapsed.inNanoseconds.float64 / 1_000_000_000.0,
      labelValues = [module, procName],
    )

template benchmarkPoint*(module: static string, procName: static string) =
  ## Records the duration of the enclosing scope under the given labels.
  ## Compiles away entirely when the build is not made with `-d:metrics`.
  when defined(metrics):
    let bpStartTime = getMonoTime()
    defer:
      recordBenchmark(bpStartTime, module, procName)
|
||||
@ -10,7 +10,7 @@ import
|
||||
chronos/threadsync,
|
||||
chronicles,
|
||||
strutils
|
||||
import ./dbconn, ../common, ../../../waku_core/time
|
||||
import ./dbconn, ../common, ../../../waku_core/time, waku/common/benchmark_metrics
|
||||
|
||||
type
|
||||
# Database connection pool
|
||||
@ -120,6 +120,7 @@ proc pgQuery*(
|
||||
rowCallback: DataProc = nil,
|
||||
requestId: string = "",
|
||||
): Future[DatabaseResult[void]] {.async.} =
|
||||
benchmarkPoint("postgres", "pgQuery")
|
||||
let connIndex = (await pool.getConnIndex()).valueOr:
|
||||
return err("connRes.isErr in query: " & $error)
|
||||
|
||||
@ -152,6 +153,7 @@ proc runStmt*(
|
||||
##
|
||||
## rowCallback != nil when it is expected to retrieve info from the database.
|
||||
## rowCallback == nil for queries that change the database state.
|
||||
benchmarkPoint("postgres", "runStmt")
|
||||
|
||||
let connIndex = (await pool.getConnIndex()).valueOr:
|
||||
return err("Error in runStmt: " & $error)
|
||||
|
||||
48
waku/node/health_monitor/event_loop_monitor.nim
Normal file
48
waku/node/health_monitor/event_loop_monitor.nim
Normal file
@ -0,0 +1,48 @@
|
||||
{.push raises: [].}

## Monitors chronos event loop responsiveness.
##
## Schedules a task every `CheckInterval`. Because chronos is single-threaded
## and cooperative, the task can only resume after all previously queued work
## completes. The actual elapsed time between iterations therefore reflects
## how saturated the event loop is:
##
##   actual_elapsed ≈ CheckInterval  → loop is healthy
##   actual_elapsed >> CheckInterval → tasks are accumulating / loop is stalling
##
## The lag (actual - expected) is exposed via `event_loop_lag_seconds`.

import chronos, chronicles, metrics

logScope:
  topics = "waku event_loop_monitor"

const CheckInterval = 5.seconds

declarePublicGauge event_loop_lag_seconds,
  "chronos event loop lag in seconds: difference between actual and expected wake-up interval"

func toSecondsFloat(d: Duration): float64 =
  ## Converts a chronos `Duration` to fractional seconds.
  d.nanoseconds.float64 / 1_000_000_000.0

proc eventLoopMonitorLoop*() {.async.} =
  ## Runs until cancelled, sampling the loop's wake-up lag every
  ## `CheckInterval` and publishing it to `event_loop_lag_seconds`.
  var lastWakeup = Moment.now()
  while true:
    await sleepAsync(CheckInterval)

    let now = Moment.now()
    let actualElapsed = now - lastWakeup
    let lag = actualElapsed - CheckInterval
    let lagSecs = lag.toSecondsFloat()

    event_loop_lag_seconds.set(lagSecs)

    # Severe: a full extra interval's worth of work was queued ahead of us.
    if lag > CheckInterval:
      warn "chronos event loop severely lagging, many tasks may be accumulating",
        expected_secs = CheckInterval.seconds,
        actual_secs = actualElapsed.toSecondsFloat(),
        lag_secs = lagSecs
    elif lag > (CheckInterval div 2):
      info "chronos event loop lag detected",
        expected_secs = CheckInterval.seconds,
        actual_secs = actualElapsed.toSecondsFloat(),
        lag_secs = lagSecs

    lastWakeup = now
|
||||
@ -14,7 +14,8 @@ import
|
||||
../peer_manager,
|
||||
./online_monitor,
|
||||
./health_status,
|
||||
./protocol_health
|
||||
./protocol_health,
|
||||
./event_loop_monitor
|
||||
|
||||
## This module is aimed to check the state of the "self" Waku Node
|
||||
|
||||
@ -32,6 +33,7 @@ type
|
||||
node: WakuNode
|
||||
onlineMonitor*: OnlineMonitor
|
||||
keepAliveFut: Future[void]
|
||||
eventLoopMonitorFut: Future[void]
|
||||
|
||||
template checkWakuNodeNotNil(node: WakuNode, p: ProtocolHealth): untyped =
|
||||
if node.isNil():
|
||||
@ -416,6 +418,7 @@ proc startHealthMonitor*(hm: NodeHealthMonitor): Result[void, string] =
|
||||
hm.onlineMonitor.startOnlineMonitor()
|
||||
hm.startKeepalive().isOkOr:
|
||||
return err("startHealthMonitor: failed starting keep alive: " & error)
|
||||
hm.eventLoopMonitorFut = eventLoopMonitorLoop()
|
||||
return ok()
|
||||
|
||||
proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} =
|
||||
@ -425,6 +428,9 @@ proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} =
|
||||
if not hm.keepAliveFut.isNil():
|
||||
await hm.keepAliveFut.cancelAndWait()
|
||||
|
||||
if not hm.eventLoopMonitorFut.isNil():
|
||||
await hm.eventLoopMonitorFut.cancelAndWait()
|
||||
|
||||
proc new*(
|
||||
T: type NodeHealthMonitor,
|
||||
dnsNameServers = @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
|
||||
|
||||
@ -11,7 +11,8 @@ import
|
||||
./common,
|
||||
./protocol_metrics,
|
||||
./rpc,
|
||||
./rpc_codec
|
||||
./rpc_codec,
|
||||
../common/benchmark_metrics
|
||||
|
||||
logScope:
|
||||
topics = "waku lightpush client"
|
||||
@ -76,6 +77,7 @@ proc publish*(
|
||||
wakuMessage: WakuMessage,
|
||||
peer: PeerId | RemotePeerInfo,
|
||||
): Future[WakuLightPushResult] {.async, gcsafe.} =
|
||||
benchmarkPoint("waku_lightpush_client", "publish")
|
||||
var message = wakuMessage
|
||||
if message.timestamp == 0:
|
||||
message.timestamp = getNowInNanosecondTime()
|
||||
@ -104,6 +106,7 @@ proc publishToAny*(
|
||||
): Future[WakuLightPushResult] {.async, gcsafe.} =
|
||||
## This proc is similar to the publish one but in this case
|
||||
## we don't specify a particular peer and instead we get it from peer manager
|
||||
benchmarkPoint("waku_lightpush_client", "publishToAny")
|
||||
|
||||
var message = wakuMessage
|
||||
if message.timestamp == 0:
|
||||
@ -140,6 +143,7 @@ proc publishWithConn*(
|
||||
conn: Connection,
|
||||
destPeer: PeerId,
|
||||
): Future[WakuLightPushResult] {.async, gcsafe.} =
|
||||
benchmarkPoint("waku_lightpush_client", "publishWithConn")
|
||||
info "publishWithConn",
|
||||
my_peer_id = wl.peerManager.switch.peerInfo.peerId,
|
||||
peer_id = destPeer,
|
||||
|
||||
@ -16,7 +16,8 @@ import
|
||||
./rpc,
|
||||
./rpc_codec,
|
||||
./protocol_metrics,
|
||||
../common/rate_limit/request_limiter
|
||||
../common/rate_limit/request_limiter,
|
||||
../common/benchmark_metrics
|
||||
|
||||
logScope:
|
||||
topics = "waku lightpush"
|
||||
@ -31,6 +32,7 @@ type WakuLightPush* = ref object of LPProtocol
|
||||
proc handleRequest(
|
||||
wl: WakuLightPush, peerId: PeerId, pushRequest: LightpushRequest
|
||||
): Future[WakuLightPushResult] {.async.} =
|
||||
benchmarkPoint("waku_lightpush", "handleRequest")
|
||||
let pubsubTopic = pushRequest.pubSubTopic.valueOr:
|
||||
if wl.autoSharding.isNone():
|
||||
let msg = "Pubsub topic must be specified when static sharding is enabled"
|
||||
@ -78,6 +80,7 @@ proc handleRequest(
|
||||
proc handleRequest*(
|
||||
wl: WakuLightPush, peerId: PeerId, buffer: seq[byte]
|
||||
): Future[LightPushResponse] {.async.} =
|
||||
benchmarkPoint("waku_lightpush", "handleRequest*")
|
||||
let pushRequest = LightPushRequest.decode(buffer).valueOr:
|
||||
let desc = decodeRpcFailure & ": " & $error
|
||||
error "failed to push message", error = desc
|
||||
|
||||
@ -15,7 +15,8 @@ import
|
||||
./rpc,
|
||||
./rpc_codec,
|
||||
../common/rate_limit/request_limiter,
|
||||
./common
|
||||
./common,
|
||||
waku/common/benchmark_metrics
|
||||
|
||||
from ../waku_core/codecs import WakuPeerExchangeCodec
|
||||
export WakuPeerExchangeCodec
|
||||
@ -38,6 +39,7 @@ type WakuPeerExchange* = ref object of LPProtocol
|
||||
proc respond(
|
||||
wpx: WakuPeerExchange, enrs: seq[enr.Record], conn: Connection
|
||||
): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
|
||||
benchmarkPoint("waku_peer_exchange", "respond")
|
||||
let rpc = PeerExchangeRpc.makeResponse(enrs.mapIt(PeerExchangePeerInfo(enr: it.raw)))
|
||||
|
||||
try:
|
||||
@ -60,6 +62,7 @@ proc respondError(
|
||||
status_desc: Option[string],
|
||||
conn: Connection,
|
||||
): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
|
||||
benchmarkPoint("waku_peer_exchange", "respondError")
|
||||
let rpc = PeerExchangeRpc.makeErrorResponse(status_code, status_desc)
|
||||
|
||||
try:
|
||||
@ -123,6 +126,7 @@ proc getEnrsFromStore(
|
||||
|
||||
proc initProtocolHandler(wpx: WakuPeerExchange) =
|
||||
proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} =
|
||||
benchmarkPoint("waku_peer_exchange", "handler")
|
||||
var buffer: seq[byte]
|
||||
wpx.requestRateLimiter.checkUsageLimit(WakuPeerExchangeCodec, conn):
|
||||
try:
|
||||
|
||||
@ -18,7 +18,8 @@ import
|
||||
libp2p/stream/connection,
|
||||
libp2p/switch
|
||||
import
|
||||
../waku_core, ./message_id, ./topic_health, ../node/delivery_monitor/publish_observer
|
||||
../waku_core, ./message_id, ./topic_health, ../node/delivery_monitor/publish_observer,
|
||||
../common/benchmark_metrics
|
||||
|
||||
from ../waku_core/codecs import WakuRelayCodec
|
||||
export WakuRelayCodec
|
||||
@ -510,6 +511,7 @@ proc generateOrderedValidator(w: WakuRelay): ValidatorHandler {.gcsafe.} =
|
||||
proc validateMessage*(
|
||||
w: WakuRelay, pubsubTopic: string, msg: WakuMessage
|
||||
): Future[Result[void, string]] {.async.} =
|
||||
benchmarkPoint("waku_relay", "validateMessage")
|
||||
let messageSizeBytes = msg.encode().buffer.len
|
||||
let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex()
|
||||
|
||||
@ -611,6 +613,7 @@ proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) =
|
||||
proc publish*(
|
||||
w: WakuRelay, pubsubTopic: PubsubTopic, wakuMessage: WakuMessage
|
||||
): Future[Result[int, PublishOutcome]] {.async.} =
|
||||
benchmarkPoint("waku_relay", "publish")
|
||||
if pubsubTopic.isEmptyOrWhitespace():
|
||||
return err(NoTopicSpecified)
|
||||
|
||||
|
||||
@ -17,7 +17,8 @@ import
|
||||
../waku_core/peers,
|
||||
../waku_core/topics,
|
||||
../waku_core/topics/pubsub_topic,
|
||||
./common
|
||||
./common,
|
||||
waku/common/benchmark_metrics
|
||||
|
||||
logScope:
|
||||
topics = "waku rendezvous"
|
||||
@ -93,6 +94,8 @@ proc batchRequest*(
|
||||
): Future[Result[seq[PeerRecord], string]] {.async: (raises: []).} =
|
||||
## Request all records from all rendezvous peers matching a namespace
|
||||
|
||||
benchmarkPoint("waku_rendezvous", "batchRequest")
|
||||
|
||||
# rendezvous.request expects already opened connections
|
||||
# must dial first
|
||||
var futs = collect(newSeq):
|
||||
@ -137,6 +140,7 @@ proc advertiseAll(
|
||||
): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
info "waku rendezvous advertisements started"
|
||||
|
||||
benchmarkPoint("waku_rendezvous", "advertiseAll")
|
||||
let shards = self.getShards()
|
||||
|
||||
let futs = collect(newSeq):
|
||||
@ -176,6 +180,7 @@ proc initialRequestAll*(
|
||||
): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
info "waku rendezvous initial requests started"
|
||||
|
||||
benchmarkPoint("waku_rendezvous", "initialRequestAll")
|
||||
let shards = self.getShards()
|
||||
|
||||
let futs = collect(newSeq):
|
||||
|
||||
@ -21,7 +21,8 @@ import
|
||||
./constants,
|
||||
./protocol_types,
|
||||
./protocol_metrics,
|
||||
./nonce_manager
|
||||
./nonce_manager,
|
||||
waku/common/benchmark_metrics
|
||||
|
||||
import
|
||||
../common/error_handling,
|
||||
@ -178,6 +179,8 @@ proc validateMessage*(
|
||||
## `timeOption` indicates Unix epoch time (fractional part holds sub-seconds)
|
||||
## if `timeOption` is supplied, then the current epoch is calculated based on that
|
||||
|
||||
benchmarkPoint("waku_rln_relay", "validateMessage")
|
||||
|
||||
let proof = RateLimitProof.init(msg.proof).valueOr:
|
||||
return MessageValidationResult.Invalid
|
||||
|
||||
@ -260,6 +263,8 @@ proc validateMessageAndUpdateLog*(
|
||||
## validates the message and updates the log to prevent double messaging
|
||||
## in future messages
|
||||
|
||||
benchmarkPoint("waku_rln_relay", "validateMessageAndUpdateLog")
|
||||
|
||||
let isValidMessage = rlnPeer.validateMessage(msg)
|
||||
|
||||
let msgProof = RateLimitProof.init(msg.proof).valueOr:
|
||||
@ -283,6 +288,7 @@ proc appendRLNProof*(
|
||||
## `senderEpochTime` indicates the number of seconds passed since Unix epoch. The fractional part holds sub-seconds.
|
||||
## The `epoch` field of `RateLimitProof` is derived from the provided `senderEpochTime` (using `calcEpoch()`)
|
||||
|
||||
benchmarkPoint("waku_rln_relay", "appendRLNProof")
|
||||
let input = msg.toRLNSignal()
|
||||
let epoch = rlnPeer.calcEpoch(senderEpochTime)
|
||||
|
||||
@ -297,6 +303,7 @@ proc appendRLNProof*(
|
||||
proc clearNullifierLog*(rlnPeer: WakuRlnRelay) =
|
||||
# clear the first MaxEpochGap epochs of the nullifer log
|
||||
# if more than MaxEpochGap epochs are in the log
|
||||
benchmarkPoint("waku_rln_relay", "clearNullifierLog")
|
||||
let currentEpoch = fromEpoch(rlnPeer.getCurrentEpoch())
|
||||
|
||||
var epochsToRemove: seq[Epoch] = @[]
|
||||
@ -323,6 +330,8 @@ proc generateRlnValidator*(
|
||||
topic: string, message: WakuMessage
|
||||
): Future[pubsub.ValidationResult] {.async.} =
|
||||
trace "rln-relay topic validator is called"
|
||||
benchmarkPoint("waku_rln_relay", "generateRlnValidator.validator")
|
||||
|
||||
wakuRlnRelay.clearNullifierLog()
|
||||
|
||||
let msgProof = RateLimitProof.init(message.proof).valueOr:
|
||||
|
||||
@ -2,7 +2,12 @@
|
||||
|
||||
import std/[options, tables], results, chronicles, chronos, metrics, bearssl/rand
|
||||
import
|
||||
../node/peer_manager, ../utils/requests, ./protocol_metrics, ./common, ./rpc_codec
|
||||
../node/peer_manager,
|
||||
../utils/requests,
|
||||
./protocol_metrics,
|
||||
./common,
|
||||
./rpc_codec,
|
||||
../common/benchmark_metrics
|
||||
|
||||
logScope:
|
||||
topics = "waku store client"
|
||||
@ -66,6 +71,7 @@ proc sendStoreRequest(
|
||||
proc query*(
|
||||
self: WakuStoreClient, request: StoreQueryRequest, peer: RemotePeerInfo | PeerId
|
||||
): Future[StoreQueryResult] {.async, gcsafe.} =
|
||||
benchmarkPoint("waku_store_client", "query")
|
||||
if request.paginationCursor.isSome() and request.paginationCursor.get() == EmptyCursor:
|
||||
return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: "invalid cursor"))
|
||||
|
||||
@ -81,6 +87,7 @@ proc queryToAny*(
|
||||
): Future[StoreQueryResult] {.async.} =
|
||||
## This proc is similar to the query one but in this case
|
||||
## we don't specify a particular peer and instead we get it from peer manager
|
||||
benchmarkPoint("waku_store_client", "queryToAny")
|
||||
|
||||
if request.paginationCursor.isSome() and request.paginationCursor.get() == EmptyCursor:
|
||||
return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: "invalid cursor"))
|
||||
|
||||
@ -20,7 +20,8 @@ import
|
||||
./common,
|
||||
./rpc_codec,
|
||||
./protocol_metrics,
|
||||
../common/rate_limit/request_limiter
|
||||
../common/rate_limit/request_limiter,
|
||||
../common/benchmark_metrics
|
||||
|
||||
logScope:
|
||||
topics = "waku store"
|
||||
@ -41,6 +42,7 @@ type StoreResp = tuple[resp: seq[byte], requestId: string]
|
||||
proc handleQueryRequest(
|
||||
self: WakuStore, requestor: PeerId, raw_request: seq[byte]
|
||||
): Future[StoreResp] {.async.} =
|
||||
benchmarkPoint("waku_store", "handleQueryRequest")
|
||||
var res = StoreQueryResponse()
|
||||
|
||||
let req = StoreQueryRequest.decode(raw_request).valueOr:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user