add main loop lag monitor (#3803)

* add loop lagging as health status
This commit is contained in:
Ivan FB 2026-04-09 16:51:46 +02:00 committed by GitHub
parent 5503529531
commit ca7ec3de05
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 77 additions and 2 deletions

View File

@@ -0,0 +1,58 @@
## Chronos event-loop lag monitor: measures how late periodic wake-ups fire,
## exposes the lag as a metrics gauge, and notifies a callback on threshold
## transitions.
{.push raises: [].}

import chronos, chronicles, metrics

logScope:
  topics = "waku event_loop_monitor"

# Scheduling period of the monitor task; also serves as the severe-lag
# threshold (a wake-up more than one full interval late is "too high").
const CheckInterval = 5.seconds

declarePublicGauge event_loop_lag_seconds,
  "chronos event loop lag in seconds: difference between actual and expected wake-up interval"

# Invoked each time the "lag too high" state toggles (true = lag exceeded
# the severe threshold, false = lag recovered).
type OnLagChange* = proc(lagTooHigh: bool) {.gcsafe, raises: [].}
proc eventLoopMonitorLoop*(onLagChange: OnLagChange = nil) {.async.} =
  ## Monitors chronos event loop responsiveness.
  ##
  ## Schedules a task every `CheckInterval`. Because chronos is single-threaded
  ## and cooperative, the task can only resume after all previously queued work
  ## completes. The actual elapsed time between iterations therefore reflects
  ## how saturated the event loop is:
  ##
  ##   actual_elapsed ≈ CheckInterval  → loop is healthy
  ##   actual_elapsed >> CheckInterval → tasks are accumulating / loop is stalling
  ##
  ## The lag (actual - expected) is exposed via `event_loop_lag_seconds`.
  ## When the lag crosses the severe threshold (`CheckInterval`) in either
  ## direction, `onLagChange` is called (if provided) with the new state.

  # Convert a chronos Duration to fractional seconds for metrics/logging.
  template toSecs(d: Duration): float64 =
    d.nanoseconds.float64 / 1_000_000_000.0

  var lastWakeup = Moment.now()
  var lagWasHigh = false

  while true:
    await sleepAsync(CheckInterval)
    let now = Moment.now()
    let actualElapsed = now - lastWakeup
    let lag = actualElapsed - CheckInterval
    let lagSecs = lag.toSecs
    event_loop_lag_seconds.set(lagSecs)

    # Severe: the task resumed more than a full interval late.
    let lagIsHigh = lag > CheckInterval
    if lagIsHigh:
      warn "chronos event loop severely lagging, many tasks may be accumulating",
        expected_secs = CheckInterval.seconds,
        actual_secs = actualElapsed.toSecs,
        lag_secs = lagSecs
    elif lag > (CheckInterval div 2):
      # Moderate: over half an interval late — worth surfacing in the logs,
      # but not enough to flip the high-lag state.
      info "chronos event loop lag detected",
        expected_secs = CheckInterval.seconds,
        actual_secs = actualElapsed.toSecs,
        lag_secs = lagSecs

    # Notify only on state transitions to avoid spamming the callback.
    if not isNil(onLagChange) and lagIsHigh != lagWasHigh:
      lagWasHigh = lagIsHigh
      onLagChange(lagIsHigh)

    lastWakeup = now

View File

@@ -7,6 +7,7 @@ type HealthStatus* {.pure.} = enum
NOT_READY
NOT_MOUNTED
SHUTTING_DOWN
EVENT_LOOP_LAGGING
proc init*(t: typedesc[HealthStatus], strRep: string): Result[HealthStatus, string] =
try:

View File

@@ -21,6 +21,7 @@ import
node/health_monitor/health_report,
node/health_monitor/connection_status,
node/health_monitor/protocol_health,
node/health_monitor/event_loop_monitor,
requests/health_requests,
]
@@ -36,6 +37,7 @@ type NodeHealthMonitor* = ref object
onlineMonitor*: OnlineMonitor
keepAliveFut: Future[void]
healthLoopFut: Future[void]
eventLoopMonitorFut: Future[void]
healthUpdateEvent: AsyncEvent
connectionStatus: ConnectionStatus
onConnectionStatusChange*: ConnectionStatusChangeHandler
@@ -48,6 +50,9 @@ type NodeHealthMonitor* = ref object
relayObserver: PubSubObserver
peerEventListener: WakuPeerEventListener
shardHealthListener: EventShardTopicHealthChangeListener
eventLoopLagExceeded: bool
## set to true when the chronos event loop lag exceeds the severe threshold,
## causing the node health to be reported as EVENT_LOOP_LAGGING until lag recovers.
func getHealth*(report: HealthReport, kind: WakuProtocol): ProtocolHealth =
for h in report.protocolsHealth:
@@ -441,7 +446,8 @@ proc getNodeHealthReport*(hm: NodeHealthMonitor): Future[HealthReport] {.async.}
hm.cachedProtocols = await hm.getAllProtocolHealthInfo()
hm.connectionStatus = hm.calculateConnectionState()
report.nodeHealth = HealthStatus.READY
report.nodeHealth =
if hm.eventLoopLagExceeded: HealthStatus.EVENT_LOOP_LAGGING else: HealthStatus.READY
report.connectionStatus = hm.connectionStatus
report.protocolsHealth = hm.cachedProtocols
return report
@@ -461,7 +467,8 @@ proc getSyncNodeHealthReport*(hm: NodeHealthMonitor): HealthReport =
hm.cachedProtocols = hm.getSyncAllProtocolHealthInfo()
hm.connectionStatus = hm.calculateConnectionState()
report.nodeHealth = HealthStatus.READY
report.nodeHealth =
if hm.eventLoopLagExceeded: HealthStatus.EVENT_LOOP_LAGGING else: HealthStatus.READY
report.connectionStatus = hm.connectionStatus
report.protocolsHealth = hm.cachedProtocols
return report
@@ -694,9 +701,15 @@ proc startHealthMonitor*(hm: NodeHealthMonitor): Result[void, string] =
hm.healthUpdateEvent.fire()
hm.healthLoopFut = hm.healthLoop()
hm.eventLoopMonitorFut = eventLoopMonitorLoop(
proc(lagTooHigh: bool) {.gcsafe, raises: [].} =
hm.eventLoopLagExceeded = lagTooHigh
hm.healthUpdateEvent.fire()
)
hm.startKeepalive().isOkOr:
return err("startHealthMonitor: failed starting keep alive: " & error)
return ok()
proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} =
@@ -709,6 +722,9 @@ proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} =
if not isNil(hm.healthLoopFut):
await hm.healthLoopFut.cancelAndWait()
if not isNil(hm.eventLoopMonitorFut):
await hm.eventLoopMonitorFut.cancelAndWait()
WakuPeerEvent.dropListener(hm.node.brokerCtx, hm.peerEventListener)
EventShardTopicHealthChange.dropListener(hm.node.brokerCtx, hm.shardHealthListener)