diff --git a/waku/node/health_monitor/event_loop_monitor.nim b/waku/node/health_monitor/event_loop_monitor.nim new file mode 100644 index 000000000..d4b8d98d2 --- /dev/null +++ b/waku/node/health_monitor/event_loop_monitor.nim @@ -0,0 +1,58 @@ +{.push raises: [].} + +import chronos, chronicles, metrics + +logScope: + topics = "waku event_loop_monitor" + +const CheckInterval = 5.seconds + +declarePublicGauge event_loop_lag_seconds, + "chronos event loop lag in seconds: difference between actual and expected wake-up interval" + +type OnLagChange* = proc(lagTooHigh: bool) {.gcsafe, raises: [].} + +proc eventLoopMonitorLoop*(onLagChange: OnLagChange = nil) {.async.} = + ## Monitors chronos event loop responsiveness. + ## + ## Schedules a task every `CheckInterval`. Because chronos is single-threaded + ## and cooperative, the task can only resume after all previously queued work + ## completes. The actual elapsed time between iterations therefore reflects + ## how saturated the event loop is: + ## + ## actual_elapsed ≈ CheckInterval → loop is healthy + ## actual_elapsed >> CheckInterval → tasks are accumulating / loop is stalling + ## + ## The lag (actual - expected) is exposed via `event_loop_lag_seconds`. + ## When lag transitions above or below `CheckInterval`, `onLagChange` is called. 
+
+  var lastWakeup = Moment.now()
+  var lagWasHigh = false
+  while true:
+    await sleepAsync(CheckInterval)
+
+    let now = Moment.now()
+    let actualElapsed = now - lastWakeup
+    let lag = actualElapsed - CheckInterval
+    let lagSecs = lag.nanoseconds.float64 / 1_000_000_000.0
+
+    event_loop_lag_seconds.set(lagSecs)
+
+    let lagIsHigh = lag > CheckInterval
+
+    if lagIsHigh:
+      warn "chronos event loop severely lagging, many tasks may be accumulating",
+        expected_secs = CheckInterval.seconds,
+        actual_secs = actualElapsed.nanoseconds.float64 / 1_000_000_000.0,
+        lag_secs = lagSecs
+    elif lag > (CheckInterval div 2):
+      info "chronos event loop lag detected",
+        expected_secs = CheckInterval.seconds,
+        actual_secs = actualElapsed.nanoseconds.float64 / 1_000_000_000.0,
+        lag_secs = lagSecs
+
+    if not isNil(onLagChange) and lagIsHigh != lagWasHigh:
+      lagWasHigh = lagIsHigh
+      onLagChange(lagIsHigh)
+
+    lastWakeup = now
diff --git a/waku/node/health_monitor/health_status.nim b/waku/node/health_monitor/health_status.nim
index 4dd2bdd9a..91663a507 100644
--- a/waku/node/health_monitor/health_status.nim
+++ b/waku/node/health_monitor/health_status.nim
@@ -7,6 +7,7 @@ type HealthStatus* {.pure.} = enum
   NOT_READY
   NOT_MOUNTED
   SHUTTING_DOWN
+  EVENT_LOOP_LAGGING
 
 proc init*(t: typedesc[HealthStatus], strRep: string): Result[HealthStatus, string] =
   try:
diff --git a/waku/node/health_monitor/node_health_monitor.nim b/waku/node/health_monitor/node_health_monitor.nim
index 066e7776a..c92dc1aaf 100644
--- a/waku/node/health_monitor/node_health_monitor.nim
+++ b/waku/node/health_monitor/node_health_monitor.nim
@@ -21,6 +21,7 @@ import
     node/health_monitor/health_report,
     node/health_monitor/connection_status,
     node/health_monitor/protocol_health,
+    node/health_monitor/event_loop_monitor,
     requests/health_requests,
   ]
 
@@ -36,6 +37,7 @@ type NodeHealthMonitor* = ref object
   onlineMonitor*: OnlineMonitor
   keepAliveFut: Future[void]
   healthLoopFut: Future[void]
+  eventLoopMonitorFut: Future[void]
healthUpdateEvent: AsyncEvent connectionStatus: ConnectionStatus onConnectionStatusChange*: ConnectionStatusChangeHandler @@ -48,6 +50,9 @@ type NodeHealthMonitor* = ref object relayObserver: PubSubObserver peerEventListener: WakuPeerEventListener shardHealthListener: EventShardTopicHealthChangeListener + eventLoopLagExceeded: bool + ## set to true when the chronos event loop lag exceeds the severe threshold, + ## causing the node health to be reported as EVENT_LOOP_LAGGING until lag recovers. func getHealth*(report: HealthReport, kind: WakuProtocol): ProtocolHealth = for h in report.protocolsHealth: @@ -441,7 +446,8 @@ proc getNodeHealthReport*(hm: NodeHealthMonitor): Future[HealthReport] {.async.} hm.cachedProtocols = await hm.getAllProtocolHealthInfo() hm.connectionStatus = hm.calculateConnectionState() - report.nodeHealth = HealthStatus.READY + report.nodeHealth = + if hm.eventLoopLagExceeded: HealthStatus.EVENT_LOOP_LAGGING else: HealthStatus.READY report.connectionStatus = hm.connectionStatus report.protocolsHealth = hm.cachedProtocols return report @@ -461,7 +467,8 @@ proc getSyncNodeHealthReport*(hm: NodeHealthMonitor): HealthReport = hm.cachedProtocols = hm.getSyncAllProtocolHealthInfo() hm.connectionStatus = hm.calculateConnectionState() - report.nodeHealth = HealthStatus.READY + report.nodeHealth = + if hm.eventLoopLagExceeded: HealthStatus.EVENT_LOOP_LAGGING else: HealthStatus.READY report.connectionStatus = hm.connectionStatus report.protocolsHealth = hm.cachedProtocols return report @@ -694,9 +701,15 @@ proc startHealthMonitor*(hm: NodeHealthMonitor): Result[void, string] = hm.healthUpdateEvent.fire() hm.healthLoopFut = hm.healthLoop() + hm.eventLoopMonitorFut = eventLoopMonitorLoop( + proc(lagTooHigh: bool) {.gcsafe, raises: [].} = + hm.eventLoopLagExceeded = lagTooHigh + hm.healthUpdateEvent.fire() + ) hm.startKeepalive().isOkOr: return err("startHealthMonitor: failed starting keep alive: " & error) + return ok() proc stopHealthMonitor*(hm: 
NodeHealthMonitor) {.async.} = @@ -709,6 +722,9 @@ proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} = if not isNil(hm.healthLoopFut): await hm.healthLoopFut.cancelAndWait() + if not isNil(hm.eventLoopMonitorFut): + await hm.eventLoopMonitorFut.cancelAndWait() + WakuPeerEvent.dropListener(hm.node.brokerCtx, hm.peerEventListener) EventShardTopicHealthChange.dropListener(hm.node.brokerCtx, hm.shardHealthListener)