Mirror of https://github.com/logos-messaging/logos-delivery.git
(synced 2026-04-14 04:03:20 +00:00)
Commit: add main loop lag monitor
This commit is contained in:
parent
f5762af4c4
commit
36e77c5d89
48
waku/node/health_monitor/event_loop_monitor.nim
Normal file
48
waku/node/health_monitor/event_loop_monitor.nim
Normal file
@ -0,0 +1,48 @@
|
||||
# Strict exception tracking for the whole module: every proc below must be
# annotated if it can raise.
{.push raises: [].}

import chronos, chronicles, metrics

logScope:
  topics = "waku event_loop_monitor"

## Scheduling period of the monitor task; also the baseline ("expected"
## elapsed time) that measured lag is computed against.
const CheckInterval = 5.seconds

declarePublicGauge event_loop_lag_seconds,
  "chronos event loop lag in seconds: difference between actual and expected wake-up interval"
||||
proc eventLoopMonitorLoop*() {.async.} =
  ## Monitors chronos event loop responsiveness.
  ##
  ## Schedules a task every `CheckInterval`. Because chronos is single-threaded
  ## and cooperative, the task can only resume after all previously queued work
  ## completes. The actual elapsed time between iterations therefore reflects
  ## how saturated the event loop is:
  ##
  ##   actual_elapsed ≈ CheckInterval  → loop is healthy
  ##   actual_elapsed >> CheckInterval → tasks are accumulating / loop is stalling
  ##
  ## The lag (actual - expected) is exposed via `event_loop_lag_seconds`.
  ##
  ## Runs forever; intended to be started as a background future and stopped
  ## via cancellation (`cancelAndWait`).

  # Single definition of the Duration -> fractional-seconds conversion, so the
  # metric value and both log sites cannot drift apart (the original repeated
  # the `/ 1_000_000_000.0` constant three times).
  func toSecondsFloat(d: Duration): float64 =
    d.nanoseconds.float64 / 1_000_000_000.0

  var lastWakeup = Moment.now()
  while true:
    await sleepAsync(CheckInterval)

    let now = Moment.now()
    let actualElapsed = now - lastWakeup
    # Lag is how much later than scheduled we actually woke up; chronos timers
    # never fire early, so this is non-negative in practice.
    let lag = actualElapsed - CheckInterval
    let lagSecs = lag.toSecondsFloat()

    event_loop_lag_seconds.set(lagSecs)

    if lag > CheckInterval:
      # More than 2x the expected interval elapsed: the loop is badly starved.
      warn "chronos event loop severely lagging, many tasks may be accumulating",
        expected_secs = CheckInterval.seconds,
        actual_secs = actualElapsed.toSecondsFloat(),
        lag_secs = lagSecs
    elif lag > (CheckInterval div 2):
      # Noticeable (but not yet severe) saturation: lag above half the period.
      info "chronos event loop lag detected",
        expected_secs = CheckInterval.seconds,
        actual_secs = actualElapsed.toSecondsFloat(),
        lag_secs = lagSecs

    lastWakeup = now
|
||||
@ -21,6 +21,7 @@ import
|
||||
node/health_monitor/health_report,
|
||||
node/health_monitor/connection_status,
|
||||
node/health_monitor/protocol_health,
|
||||
node/health_monitor/event_loop_monitor,
|
||||
requests/health_requests,
|
||||
]
|
||||
|
||||
@ -36,6 +37,7 @@ type NodeHealthMonitor* = ref object
|
||||
onlineMonitor*: OnlineMonitor
|
||||
keepAliveFut: Future[void]
|
||||
healthLoopFut: Future[void]
|
||||
eventLoopMonitorFut: Future[void]
|
||||
healthUpdateEvent: AsyncEvent
|
||||
connectionStatus: ConnectionStatus
|
||||
onConnectionStatusChange*: ConnectionStatusChangeHandler
|
||||
@ -694,9 +696,11 @@ proc startHealthMonitor*(hm: NodeHealthMonitor): Result[void, string] =
|
||||
hm.healthUpdateEvent.fire()
|
||||
|
||||
hm.healthLoopFut = hm.healthLoop()
|
||||
hm.eventLoopMonitorFut = eventLoopMonitorLoop()
|
||||
|
||||
hm.startKeepalive().isOkOr:
|
||||
return err("startHealthMonitor: failed starting keep alive: " & error)
|
||||
|
||||
return ok()
|
||||
|
||||
proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} =
|
||||
@ -709,6 +713,9 @@ proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} =
|
||||
if not isNil(hm.healthLoopFut):
|
||||
await hm.healthLoopFut.cancelAndWait()
|
||||
|
||||
if not isNil(hm.eventLoopMonitorFut):
|
||||
await hm.eventLoopMonitorFut.cancelAndWait()
|
||||
|
||||
WakuPeerEvent.dropListener(hm.node.brokerCtx, hm.peerEventListener)
|
||||
EventShardTopicHealthChange.dropListener(hm.node.brokerCtx, hm.shardHealthListener)
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user