diff --git a/waku/common/benchmark_metrics.nim b/waku/common/benchmark_metrics.nim
new file mode 100644
index 000000000..fb4c382ae
--- /dev/null
+++ b/waku/common/benchmark_metrics.nim
@@ -0,0 +1,54 @@
+{.push raises: [].}
+
+## Scope timers reported through the `benchmark_duration_seconds` summary.
+##
+## Usage:
+##   import waku/common/benchmark_metrics
+##
+##   proc myProc*() =
+##     benchmarkPoint("waku_relay", "myProc")
+##     # ... rest of the proc
+
+import std/[monotimes, times]
+import metrics
+
+export metrics
+
+declarePublicSummary benchmark_duration_seconds,
+  "duration in seconds", ["module", "proc"]
+
+when defined(metrics):
+  const NanosPerSecond = 1_000_000_000.0
+
+  proc benchmarkStart(): MonoTime {.gcsafe, raises: [].} =
+    ## Reads the monotonic clock; the result is only meaningful as the
+    ## `startTime` argument of `recordBenchmark`.
+    getMonoTime()
+
+  proc recordBenchmark(
+      startTime: MonoTime, module: string, procName: string
+  ) {.gcsafe, raises: [].} =
+    ## Observes the seconds elapsed since `startTime` into
+    ## `benchmark_duration_seconds`, labelled with `module` and `procName`.
+    # The monotonic clock is used on purpose: the wall clock can be stepped
+    # (NTP, manual changes) while a scope is being timed, which would skew the
+    # observation or even make it negative.
+    let elapsed = getMonoTime() - startTime
+    benchmark_duration_seconds.observe(
+      elapsed.inNanoseconds.float64 / NanosPerSecond,
+      labelValues = [module, procName]
+    )
+
+template benchmarkPoint*(module: static string, procName: static string) =
+  ## Sets up a deferred timer that observes elapsed seconds into
+  ## `benchmark_duration_seconds` when the enclosing scope exits.
+  ## The summary's `_count` field tracks the number of calls.
+  ##
+  ## `module` and `procName` are `static string`s, so the label values are
+  ## fixed at compile time.
+  ##
+  ## Expands to nothing unless `metrics` is defined at compile time.
+  when defined(metrics):
+    let bpStartTime = benchmarkStart()
+    defer:
+      recordBenchmark(bpStartTime, module, procName)
diff --git a/waku/common/databases/db_postgres/pgasyncpool.nim b/waku/common/databases/db_postgres/pgasyncpool.nim index 0b298084e..286d42364 100644 --- a/waku/common/databases/db_postgres/pgasyncpool.nim +++ b/waku/common/databases/db_postgres/pgasyncpool.nim @@ -10,7 +10,7 @@ import chronos/threadsync, chronicles, strutils -import ./dbconn, ../common, ../../../waku_core/time +import ./dbconn, ../common, ../../../waku_core/time, waku/common/benchmark_metrics type # Database connection pool @@ -120,6 +120,7 @@ proc pgQuery*( rowCallback: DataProc = nil, requestId: string = "", ): Future[DatabaseResult[void]] {.async.} = + benchmarkPoint("postgres", "pgQuery") let connIndex 
= (await pool.getConnIndex()).valueOr: return err("connRes.isErr in query: " & $error) @@ -152,6 +153,7 @@ proc runStmt*( ## ## rowCallback != nil when it is expected to retrieve info from the database. ## rowCallback == nil for queries that change the database state. + benchmarkPoint("postgres", "runStmt") let connIndex = (await pool.getConnIndex()).valueOr: return err("Error in runStmt: " & $error)
diff --git a/waku/node/health_monitor/event_loop_monitor.nim b/waku/node/health_monitor/event_loop_monitor.nim
new file mode 100644
index 000000000..c0f51905c
--- /dev/null
+++ b/waku/node/health_monitor/event_loop_monitor.nim
@@ -0,0 +1,59 @@
+{.push raises: [].}
+
+## Event loop responsiveness monitor for chronos.
+##
+## A task sleeps for `CheckInterval` over and over. Chronos is single-threaded
+## and cooperative, so the task only resumes once all previously queued work
+## has completed; the time actually spent between two wake-ups therefore shows
+## how saturated the event loop is:
+##
+##   actual_elapsed ≈ CheckInterval  → loop is healthy
+##   actual_elapsed >> CheckInterval → tasks are accumulating / loop is stalling
+##
+## The lag (actual - expected) is published as `event_loop_lag_seconds`.
+
+import chronos, chronicles, metrics
+
+logScope:
+  topics = "waku event_loop_monitor"
+
+const
+  CheckInterval = 5.seconds
+  NanosPerSecond = 1_000_000_000.0
+
+declarePublicGauge event_loop_lag_seconds,
+  "chronos event loop lag in seconds: difference between actual and expected wake-up interval"
+
+func inFloatSeconds(d: Duration): float64 =
+  ## Converts a chronos `Duration` to fractional seconds.
+  d.nanoseconds.float64 / NanosPerSecond
+
+proc eventLoopMonitorLoop*() {.async.} =
+  ## Never returns on its own: after every `CheckInterval` sleep it sets
+  ## `event_loop_lag_seconds` to the wake-up lag, logging at `warn` level when
+  ## the lag exceeds `CheckInterval` and at `info` level when it exceeds half
+  ## of it.
+  var previousWakeup = Moment.now()
+  while true:
+    await sleepAsync(CheckInterval)
+
+    let
+      wakeup = Moment.now()
+      elapsed = wakeup - previousWakeup
+      lag = elapsed - CheckInterval
+      lagSecs = lag.inFloatSeconds()
+    # The next measurement starts from this wake-up.
+    previousWakeup = wakeup
+
+    event_loop_lag_seconds.set(lagSecs)
+
+    if lag > CheckInterval:
+      warn "chronos event loop severely lagging, many tasks may be accumulating",
+        expected_secs = CheckInterval.seconds,
+        actual_secs = elapsed.inFloatSeconds(),
+        lag_secs = lagSecs
+    elif lag > (CheckInterval div 2):
+      info "chronos event loop lag detected",
+        expected_secs = CheckInterval.seconds,
+        actual_secs = elapsed.inFloatSeconds(),
+        lag_secs = lagSecs
diff --git a/waku/node/health_monitor/node_health_monitor.nim b/waku/node/health_monitor/node_health_monitor.nim index eb5d0ed8c..1ffdfe081 100644 --- a/waku/node/health_monitor/node_health_monitor.nim +++ b/waku/node/health_monitor/node_health_monitor.nim @@ -14,7 +14,8 @@ import ../peer_manager, ./online_monitor, ./health_status, - ./protocol_health + ./protocol_health, + ./event_loop_monitor ## This module is aimed to check the state of the "self" Waku Node @@ -32,6 +33,7 @@ type node: WakuNode onlineMonitor*: OnlineMonitor keepAliveFut: Future[void] + eventLoopMonitorFut: Future[void] template checkWakuNodeNotNil(node: WakuNode, p: ProtocolHealth): untyped = if node.isNil(): @@ -416,6 +418,7 @@ proc startHealthMonitor*(hm: NodeHealthMonitor): Result[void, string] = hm.onlineMonitor.startOnlineMonitor() hm.startKeepalive().isOkOr: return err("startHealthMonitor: failed starting keep alive: " & error) + 
hm.eventLoopMonitorFut = eventLoopMonitorLoop() return ok() proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} = @@ -425,6 +428,9 @@ proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} = if not hm.keepAliveFut.isNil(): await hm.keepAliveFut.cancelAndWait() + if not hm.eventLoopMonitorFut.isNil(): + await hm.eventLoopMonitorFut.cancelAndWait() + proc new*( T: type NodeHealthMonitor, dnsNameServers = @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")], diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim index 4d0c49a84..94a71bec4 100644 --- a/waku/waku_lightpush/client.nim +++ b/waku/waku_lightpush/client.nim @@ -11,7 +11,8 @@ import ./common, ./protocol_metrics, ./rpc, - ./rpc_codec + ./rpc_codec, + ../common/benchmark_metrics logScope: topics = "waku lightpush client" @@ -76,6 +77,7 @@ proc publish*( wakuMessage: WakuMessage, peer: PeerId | RemotePeerInfo, ): Future[WakuLightPushResult] {.async, gcsafe.} = + benchmarkPoint("waku_lightpush_client", "publish") var message = wakuMessage if message.timestamp == 0: message.timestamp = getNowInNanosecondTime() @@ -104,6 +106,7 @@ proc publishToAny*( ): Future[WakuLightPushResult] {.async, gcsafe.} = ## This proc is similar to the publish one but in this case ## we don't specify a particular peer and instead we get it from peer manager + benchmarkPoint("waku_lightpush_client", "publishToAny") var message = wakuMessage if message.timestamp == 0: @@ -140,6 +143,7 @@ proc publishWithConn*( conn: Connection, destPeer: PeerId, ): Future[WakuLightPushResult] {.async, gcsafe.} = + benchmarkPoint("waku_lightpush_client", "publishWithConn") info "publishWithConn", my_peer_id = wl.peerManager.switch.peerInfo.peerId, peer_id = destPeer, diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index 2e8c9c2f1..ac816ccc3 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -16,7 +16,8 @@ import ./rpc, ./rpc_codec, ./protocol_metrics, - 
../common/rate_limit/request_limiter + ../common/rate_limit/request_limiter, + ../common/benchmark_metrics logScope: topics = "waku lightpush" @@ -31,6 +32,7 @@ type WakuLightPush* = ref object of LPProtocol proc handleRequest( wl: WakuLightPush, peerId: PeerId, pushRequest: LightpushRequest ): Future[WakuLightPushResult] {.async.} = + benchmarkPoint("waku_lightpush", "handleRequest") let pubsubTopic = pushRequest.pubSubTopic.valueOr: if wl.autoSharding.isNone(): let msg = "Pubsub topic must be specified when static sharding is enabled" @@ -78,6 +80,7 @@ proc handleRequest( proc handleRequest*( wl: WakuLightPush, peerId: PeerId, buffer: seq[byte] ): Future[LightPushResponse] {.async.} = + benchmarkPoint("waku_lightpush", "handleRequest*") let pushRequest = LightPushRequest.decode(buffer).valueOr: let desc = decodeRpcFailure & ": " & $error error "failed to push message", error = desc diff --git a/waku/waku_peer_exchange/protocol.nim b/waku/waku_peer_exchange/protocol.nim index b99f5eabf..2d6595982 100644 --- a/waku/waku_peer_exchange/protocol.nim +++ b/waku/waku_peer_exchange/protocol.nim @@ -15,7 +15,8 @@ import ./rpc, ./rpc_codec, ../common/rate_limit/request_limiter, - ./common + ./common, + waku/common/benchmark_metrics from ../waku_core/codecs import WakuPeerExchangeCodec export WakuPeerExchangeCodec @@ -38,6 +39,7 @@ type WakuPeerExchange* = ref object of LPProtocol proc respond( wpx: WakuPeerExchange, enrs: seq[enr.Record], conn: Connection ): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = + benchmarkPoint("waku_peer_exchange", "respond") let rpc = PeerExchangeRpc.makeResponse(enrs.mapIt(PeerExchangePeerInfo(enr: it.raw))) try: @@ -60,6 +62,7 @@ proc respondError( status_desc: Option[string], conn: Connection, ): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = + benchmarkPoint("waku_peer_exchange", "respondError") let rpc = PeerExchangeRpc.makeErrorResponse(status_code, status_desc) try: @@ -123,6 +126,7 @@ proc getEnrsFromStore( proc 
initProtocolHandler(wpx: WakuPeerExchange) = proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} = + benchmarkPoint("waku_peer_exchange", "handler") var buffer: seq[byte] wpx.requestRateLimiter.checkUsageLimit(WakuPeerExchangeCodec, conn): try: diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index cbf9123dd..4f3b70d3f 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -18,7 +18,8 @@ import libp2p/stream/connection, libp2p/switch import - ../waku_core, ./message_id, ./topic_health, ../node/delivery_monitor/publish_observer + ../waku_core, ./message_id, ./topic_health, ../node/delivery_monitor/publish_observer, + ../common/benchmark_metrics from ../waku_core/codecs import WakuRelayCodec export WakuRelayCodec @@ -510,6 +511,7 @@ proc generateOrderedValidator(w: WakuRelay): ValidatorHandler {.gcsafe.} = proc validateMessage*( w: WakuRelay, pubsubTopic: string, msg: WakuMessage ): Future[Result[void, string]] {.async.} = + benchmarkPoint("waku_relay", "validateMessage") let messageSizeBytes = msg.encode().buffer.len let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex() @@ -611,6 +613,7 @@ proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) = proc publish*( w: WakuRelay, pubsubTopic: PubsubTopic, wakuMessage: WakuMessage ): Future[Result[int, PublishOutcome]] {.async.} = + benchmarkPoint("waku_relay", "publish") if pubsubTopic.isEmptyOrWhitespace(): return err(NoTopicSpecified) diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim index 0eb55d350..7737f12b9 100644 --- a/waku/waku_rendezvous/protocol.nim +++ b/waku/waku_rendezvous/protocol.nim @@ -17,7 +17,8 @@ import ../waku_core/peers, ../waku_core/topics, ../waku_core/topics/pubsub_topic, - ./common + ./common, + waku/common/benchmark_metrics logScope: topics = "waku rendezvous" @@ -93,6 +94,8 @@ proc batchRequest*( ): Future[Result[seq[PeerRecord], string]] {.async: (raises: []).} = ## 
Request all records from all rendezvous peers matching a namespace + benchmarkPoint("waku_rendezvous", "batchRequest") + # rendezvous.request expects already opened connections # must dial first var futs = collect(newSeq): @@ -137,6 +140,7 @@ proc advertiseAll( ): Future[Result[void, string]] {.async: (raises: []).} = info "waku rendezvous advertisements started" + benchmarkPoint("waku_rendezvous", "advertiseAll") let shards = self.getShards() let futs = collect(newSeq): @@ -176,6 +180,7 @@ proc initialRequestAll*( ): Future[Result[void, string]] {.async: (raises: []).} = info "waku rendezvous initial requests started" + benchmarkPoint("waku_rendezvous", "initialRequestAll") let shards = self.getShards() let futs = collect(newSeq): diff --git a/waku/waku_rln_relay/rln_relay.nim b/waku/waku_rln_relay/rln_relay.nim index 6a8fea2b5..d02058fa3 100644 --- a/waku/waku_rln_relay/rln_relay.nim +++ b/waku/waku_rln_relay/rln_relay.nim @@ -21,7 +21,8 @@ import ./constants, ./protocol_types, ./protocol_metrics, - ./nonce_manager + ./nonce_manager, + waku/common/benchmark_metrics import ../common/error_handling, @@ -178,6 +179,8 @@ proc validateMessage*( ## `timeOption` indicates Unix epoch time (fractional part holds sub-seconds) ## if `timeOption` is supplied, then the current epoch is calculated based on that + benchmarkPoint("waku_rln_relay", "validateMessage") + let proof = RateLimitProof.init(msg.proof).valueOr: return MessageValidationResult.Invalid @@ -260,6 +263,8 @@ proc validateMessageAndUpdateLog*( ## validates the message and updates the log to prevent double messaging ## in future messages + benchmarkPoint("waku_rln_relay", "validateMessageAndUpdateLog") + let isValidMessage = rlnPeer.validateMessage(msg) let msgProof = RateLimitProof.init(msg.proof).valueOr: @@ -283,6 +288,7 @@ proc appendRLNProof*( ## `senderEpochTime` indicates the number of seconds passed since Unix epoch. The fractional part holds sub-seconds. 
## The `epoch` field of `RateLimitProof` is derived from the provided `senderEpochTime` (using `calcEpoch()`) + benchmarkPoint("waku_rln_relay", "appendRLNProof") let input = msg.toRLNSignal() let epoch = rlnPeer.calcEpoch(senderEpochTime) @@ -297,6 +303,7 @@ proc appendRLNProof*( proc clearNullifierLog*(rlnPeer: WakuRlnRelay) = # clear the first MaxEpochGap epochs of the nullifer log # if more than MaxEpochGap epochs are in the log + benchmarkPoint("waku_rln_relay", "clearNullifierLog") let currentEpoch = fromEpoch(rlnPeer.getCurrentEpoch()) var epochsToRemove: seq[Epoch] = @[] @@ -323,6 +330,8 @@ proc generateRlnValidator*( topic: string, message: WakuMessage ): Future[pubsub.ValidationResult] {.async.} = trace "rln-relay topic validator is called" + benchmarkPoint("waku_rln_relay", "generateRlnValidator.validator") + wakuRlnRelay.clearNullifierLog() let msgProof = RateLimitProof.init(message.proof).valueOr: diff --git a/waku/waku_store/client.nim b/waku/waku_store/client.nim index 308d7f98e..7daa816ae 100644 --- a/waku/waku_store/client.nim +++ b/waku/waku_store/client.nim @@ -2,7 +2,12 @@ import std/[options, tables], results, chronicles, chronos, metrics, bearssl/rand import - ../node/peer_manager, ../utils/requests, ./protocol_metrics, ./common, ./rpc_codec + ../node/peer_manager, + ../utils/requests, + ./protocol_metrics, + ./common, + ./rpc_codec, + ../common/benchmark_metrics logScope: topics = "waku store client" @@ -66,6 +71,7 @@ proc sendStoreRequest( proc query*( self: WakuStoreClient, request: StoreQueryRequest, peer: RemotePeerInfo | PeerId ): Future[StoreQueryResult] {.async, gcsafe.} = + benchmarkPoint("waku_store_client", "query") if request.paginationCursor.isSome() and request.paginationCursor.get() == EmptyCursor: return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: "invalid cursor")) @@ -81,6 +87,7 @@ proc queryToAny*( ): Future[StoreQueryResult] {.async.} = ## This proc is similar to the query one but in this case ## we don't specify a 
particular peer and instead we get it from peer manager + benchmarkPoint("waku_store_client", "queryToAny") if request.paginationCursor.isSome() and request.paginationCursor.get() == EmptyCursor: return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: "invalid cursor")) diff --git a/waku/waku_store/protocol.nim b/waku/waku_store/protocol.nim index 891c6a93c..e9bbc4288 100644 --- a/waku/waku_store/protocol.nim +++ b/waku/waku_store/protocol.nim @@ -20,7 +20,8 @@ import ./common, ./rpc_codec, ./protocol_metrics, - ../common/rate_limit/request_limiter + ../common/rate_limit/request_limiter, + ../common/benchmark_metrics logScope: topics = "waku store" @@ -41,6 +42,7 @@ type StoreResp = tuple[resp: seq[byte], requestId: string] proc handleQueryRequest( self: WakuStore, requestor: PeerId, raw_request: seq[byte] ): Future[StoreResp] {.async.} = + benchmarkPoint("waku_store", "handleQueryRequest") var res = StoreQueryResponse() let req = StoreQueryRequest.decode(raw_request).valueOr: