From d7a3a85db9dbea35f865c623b6d22f1e7af78076 Mon Sep 17 00:00:00 2001
From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
Date: Tue, 24 Jun 2025 23:20:08 +0200
Subject: [PATCH] chore: Libwaku watchdog that can potentially raise a
 WakuNotResponding event if Waku is blocked (#3466)

* refactor add waku not responding event to libwaku

Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
---
Review amendments on top of the original commit (this section is not part
of the commit message):
 - sendRequestToWakuThread: a waitSync() timeout is now reported as an error.
 - createWakuContext: the Waku thread is stopped and joined before freeing
   ctx when the watchdog thread cannot be created.
 - onWakuNotResponding: event name aligned with the proc name.

 .../events/json_waku_not_responding_event.nim |   9 +
 library/libwaku.h                             |   2 +
 library/libwaku.nim                           |  43 +---
 .../requests/debug_node_request.nim           |   3 +
 library/waku_thread/waku_thread.nim           | 242 ++++++++++++------
 5 files changed, 190 insertions(+), 109 deletions(-)
 create mode 100644 library/events/json_waku_not_responding_event.nim

diff --git a/library/events/json_waku_not_responding_event.nim b/library/events/json_waku_not_responding_event.nim
new file mode 100644
index 000000000..1e1d5fcc5
--- /dev/null
+++ b/library/events/json_waku_not_responding_event.nim
@@ -0,0 +1,9 @@
+import system, std/json, ./json_base_event
+
+type JsonWakuNotRespondingEvent* = ref object of JsonEvent
+
+proc new*(T: type JsonWakuNotRespondingEvent): T =
+  return JsonWakuNotRespondingEvent(eventType: "waku_not_responding")
+
+method `$`*(event: JsonWakuNotRespondingEvent): string =
+  $(%*event)
diff --git a/library/libwaku.h b/library/libwaku.h
index 525fec69a..b5d6c9bab 100644
--- a/library/libwaku.h
+++ b/library/libwaku.h
@@ -45,6 +45,8 @@ int waku_version(void* ctx,
                  WakuCallBack callback,
                  void* userData);
 
+// Sets a callback that will be invoked whenever an event occurs.
+// It is crucial that the passed callback is fast, non-blocking and thread-safe, because it is invoked from library-internal threads.
 void waku_set_event_callback(void* ctx,
                              WakuCallBack callback,
                              void* userData);
diff --git a/library/libwaku.nim b/library/libwaku.nim
index 3e4431411..bc1614af8 100644
--- a/library/libwaku.nim
+++ b/library/libwaku.nim
@@ -15,8 +15,7 @@ import
   waku/waku_core/topics/pubsub_topic,
   waku/waku_core/subscription/push_handler,
   waku/waku_relay,
-  ./events/
-    [json_message_event, json_topic_health_change_event, json_connection_change_event],
+  ./events/json_message_event,
   ./waku_thread/waku_thread,
   ./waku_thread/inter_thread_communication/requests/node_lifecycle_request,
   ./waku_thread/inter_thread_communication/requests/peer_manager_request,
@@ -48,25 +47,6 @@ template checkLibwakuParams*(
   if isNil(callback):
     return RET_MISSING_CALLBACK
 
-template callEventCallback(ctx: ptr WakuContext, eventName: string, body: untyped) =
-  if isNil(ctx[].eventCallback):
-    error eventName & " - eventCallback is nil"
-    return
-
-  foreignThreadGc:
-    try:
-      let event = body
-      cast[WakuCallBack](ctx[].eventCallback)(
-        RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
-      )
-    except Exception, CatchableError:
-      let msg =
-        "Exception " & eventName & " when calling 'eventCallBack': " &
-        getCurrentExceptionMsg()
-      cast[WakuCallBack](ctx[].eventCallback)(
-        RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
-      )
-
 proc handleRequest(
     ctx: ptr WakuContext,
     requestType: RequestType,
@@ -81,21 +61,6 @@ proc handleRequest(
 
   return RET_OK
 
-proc onConnectionChange(ctx: ptr WakuContext): ConnectionChangeHandler =
-  return proc(peerId: PeerId, peerEvent: PeerEventKind) {.async.} =
-    callEventCallback(ctx, "onConnectionChange"):
-      $JsonConnectionChangeEvent.new($peerId, peerEvent)
-
-proc onReceivedMessage(ctx: ptr WakuContext): WakuRelayHandler =
-  return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
-    callEventCallback(ctx, "onReceivedMessage"):
-      $JsonMessageEvent.new(pubsubTopic, msg)
-
-proc onTopicHealthChange(ctx: ptr WakuContext): TopicHealthChangeHandler =
-  return proc(pubsubTopic: PubsubTopic, topicHealth: TopicHealth) {.async.} =
-    callEventCallback(ctx, "onTopicHealthChange"):
-      $JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth)
-
 ### End of not-exported components
 ################################################################################
 
@@ -146,8 +111,8 @@ proc waku_new(
     return nil
 
   ## Create the Waku thread that will keep waiting for req from the main thread.
-  var ctx = waku_thread.createWakuThread().valueOr:
-    let msg = "Error in createWakuThread: " & $error
+  var ctx = waku_thread.createWakuContext().valueOr:
+    let msg = "Error in createWakuContext: " & $error
     callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
     return nil
 
@@ -180,7 +145,7 @@ proc waku_destroy(
   initializeLibrary()
   checkLibwakuParams(ctx, callback, userData)
 
-  waku_thread.destroyWakuThread(ctx).isOkOr:
+  waku_thread.destroyWakuContext(ctx).isOkOr:
     let msg = "libwaku error: " & $error
     callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
     return RET_ERR
diff --git a/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim b/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim
index 4ab8914ee..0bd9235b6 100644
--- a/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim
+++ b/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim
@@ -18,6 +18,7 @@ type DebugNodeMsgType* = enum
   RETRIEVE_MY_PEER_ID
   RETRIEVE_METRICS
   RETRIEVE_ONLINE_STATE
+  CHECK_WAKU_NOT_BLOCKED
 
 type DebugNodeRequest* = object
   operation: DebugNodeMsgType
@@ -55,6 +56,8 @@ proc process*(
     return ok(getMetrics())
   of RETRIEVE_ONLINE_STATE:
     return ok($waku.healthMonitor.onlineMonitor.amIOnline())
+  of CHECK_WAKU_NOT_BLOCKED:
+    return ok("waku thread is not blocked")
 
   error "unsupported operation in DebugNodeRequest"
   return err("unsupported operation in DebugNodeRequest")
diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim
index 640389e32..37c37e6df 100644
--- a/library/waku_thread/waku_thread.nim
+++ b/library/waku_thread/waku_thread.nim
@@ -4,10 +4,22 @@
 
 import std/[options, atomics, os, net, locks]
 import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
-import waku/factory/waku, ./inter_thread_communication/waku_thread_request, ../ffi_types
+import
+  waku/factory/waku,
+  waku/node/peer_manager,
+  waku/waku_relay/[protocol, topic_health],
+  waku/waku_core/[topics/pubsub_topic, message],
+  ./inter_thread_communication/[waku_thread_request, requests/debug_node_request],
+  ../ffi_types,
+  ../events/[
+    json_message_event, json_topic_health_change_event, json_connection_change_event,
+    json_waku_not_responding_event,
+  ]
 
 type WakuContext* = object
-  thread: Thread[(ptr WakuContext)]
+  wakuThread: Thread[(ptr WakuContext)]
+  watchdogThread: Thread[(ptr WakuContext)]
+    # monitors the Waku thread and notifies the Waku SDK consumer if it hangs
   lock: Lock
   reqChannel: ChannelSPSCSingle[ptr WakuThreadRequest]
   reqSignal: ThreadSignalPtr
@@ -17,78 +29,48 @@ type WakuContext* = object
   userData*: pointer
   eventCallback*: pointer
   eventUserdata*: pointer
-  running: Atomic[bool] # To control when the thread is running
+  running: Atomic[bool] # To control when the threads are running
 
 const git_version* {.strdefine.} = "n/a"
 const versionString = "version / git commit hash: " & waku.git_version
 
-proc runWaku(ctx: ptr WakuContext) {.async.} =
-  ## This is the worker body. This runs the Waku node
-  ## and attends library user requests (stop, connect_to, etc.)
+template callEventCallback(ctx: ptr WakuContext, eventName: string, body: untyped) =
+  if isNil(ctx[].eventCallback):
+    error eventName & " - eventCallback is nil"
+    return
 
-  var waku: Waku
+  foreignThreadGc:
+    try:
+      let event = body
+      cast[WakuCallBack](ctx[].eventCallback)(
+        RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
+      )
+    except Exception, CatchableError:
+      let msg =
+        "Exception " & eventName & " when calling 'eventCallBack': " &
+        getCurrentExceptionMsg()
+      cast[WakuCallBack](ctx[].eventCallback)(
+        RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
+      )
 
-  while true:
-    await ctx.reqSignal.wait()
+proc onConnectionChange*(ctx: ptr WakuContext): ConnectionChangeHandler =
+  return proc(peerId: PeerId, peerEvent: PeerEventKind) {.async.} =
+    callEventCallback(ctx, "onConnectionChange"):
+      $JsonConnectionChangeEvent.new($peerId, peerEvent)
 
-    if ctx.running.load == false:
-      break
+proc onReceivedMessage*(ctx: ptr WakuContext): WakuRelayHandler =
+  return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
+    callEventCallback(ctx, "onReceivedMessage"):
+      $JsonMessageEvent.new(pubsubTopic, msg)
 
-    ## Trying to get a request from the libwaku requestor thread
-    var request: ptr WakuThreadRequest
-    let recvOk = ctx.reqChannel.tryRecv(request)
-    if not recvOk:
-      error "waku thread could not receive a request"
-      continue
+proc onTopicHealthChange*(ctx: ptr WakuContext): TopicHealthChangeHandler =
+  return proc(pubsubTopic: PubsubTopic, topicHealth: TopicHealth) {.async.} =
+    callEventCallback(ctx, "onTopicHealthChange"):
+      $JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth)
 
-    let fireRes = ctx.reqReceivedSignal.fireSync()
-    if fireRes.isErr():
-      error "could not fireSync back to requester thread", error = fireRes.error
-
-    ## Handle the request
-    asyncSpawn WakuThreadRequest.process(request, addr waku)
-
-proc run(ctx: ptr WakuContext) {.thread.} =
-  ## Launch waku worker
-  waitFor runWaku(ctx)
-
-proc createWakuThread*(): Result[ptr WakuContext, string] =
-  ## This proc is called from the main thread and it creates
-  ## the Waku working thread.
-  var ctx = createShared(WakuContext, 1)
-  ctx.reqSignal = ThreadSignalPtr.new().valueOr:
-    return err("couldn't create reqSignal ThreadSignalPtr")
-  ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
-    return err("couldn't create reqReceivedSignal ThreadSignalPtr")
-  ctx.lock.initLock()
-
-  ctx.running.store(true)
-
-  try:
-    createThread(ctx.thread, run, ctx)
-  except ValueError, ResourceExhaustedError:
-    # and freeShared for typed allocations!
-    freeShared(ctx)
-
-    return err("failed to create the Waku thread: " & getCurrentExceptionMsg())
-
-  return ok(ctx)
-
-proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] =
-  ctx.running.store(false)
-
-  let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
-    return err("error in destroyWakuThread: " & $error)
-  if not signaledOnTime:
-    return err("failed to signal reqSignal on time in destroyWakuThread")
-
-  joinThread(ctx.thread)
-  ctx.lock.deinitLock()
-  ?ctx.reqSignal.close()
-  ?ctx.reqReceivedSignal.close()
-  freeShared(ctx)
-
-  return ok()
+proc onWakuNotResponding*(ctx: ptr WakuContext) =
+  callEventCallback(ctx, "onWakuNotResponding"):
+    $JsonWakuNotRespondingEvent.new()
 
 proc sendRequestToWakuThread*(
     ctx: ptr WakuContext,
@@ -96,16 +78,17 @@ proc sendRequestToWakuThread*(
     reqContent: pointer,
     callback: WakuCallBack,
     userData: pointer,
+    timeout = InfiniteDuration,
 ): Result[void, string] =
-  let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData)
-
+  ctx.lock.acquire()
   # This lock is only necessary while we use a SP Channel and while the signalling
   # between threads assumes that there aren't concurrent requests.
   # Rearchitecting the signaling + migrating to a MP Channel will allow us to receive
   # requests concurrently and spare us the need of locks
-  ctx.lock.acquire()
   defer:
     ctx.lock.release()
+
+  let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData)
   ## Sending the request
   let sentOk = ctx.reqChannel.trySend(req)
   if not sentOk:
@@ -122,11 +105,130 @@ proc sendRequestToWakuThread*(
     return err("Couldn't fireSync in time")
 
   ## wait until the Waku Thread properly received the request
-  let res = ctx.reqReceivedSignal.waitSync()
+  let res = ctx.reqReceivedSignal.waitSync(timeout)
   if res.isErr():
     deallocShared(req)
     return err("Couldn't receive reqReceivedSignal signal")
+  if not res.get():
+    # Timed out. The request stays queued in the channel and is released by the
+    # Waku thread once it processes it, so it must not be freed here.
+    return err("Timed out waiting for the Waku thread to receive the request")
 
   ## Notice that in case of "ok", the deallocShared(req) is performed by the Waku Thread in the
-  ## process proc.
+  ## process proc. See the 'waku_thread_request.nim' module for more details.
   ok()
+
+proc watchdogThreadBody(ctx: ptr WakuContext) {.thread.} =
+  ## Watchdog thread that monitors the Waku thread and notifies the library user if it hangs.
+
+  let watchdogRun = proc(ctx: ptr WakuContext) {.async.} =
+    const WatchdogTimeinterval = 1.seconds
+    const WakuNotRespondingTimeout = 3.seconds
+    while true:
+      await sleepAsync(WatchdogTimeinterval)
+
+      if ctx.running.load == false:
+        debug "Watchdog thread exiting because WakuContext is not running"
+        break
+
+      let wakuCallback = proc(
+          callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
+      ) {.cdecl, gcsafe, raises: [].} =
+        discard ## Don't do anything. Just respecting the callback signature.
+      const nilUserData = nil
+
+      trace "Sending watchdog request to Waku thread"
+
+      sendRequestToWakuThread(
+        ctx,
+        RequestType.DEBUG,
+        DebugNodeRequest.createShared(DebugNodeMsgType.CHECK_WAKU_NOT_BLOCKED),
+        wakuCallback,
+        nilUserData,
+        WakuNotRespondingTimeout,
+      ).isOkOr:
+        error "Failed to send watchdog request to Waku thread", error = $error
+        onWakuNotResponding(ctx)
+
+  waitFor watchdogRun(ctx)
+
+proc wakuThreadBody(ctx: ptr WakuContext) {.thread.} =
+  ## Waku thread that attends library user requests (stop, connect_to, etc.)
+
+  let wakuRun = proc(ctx: ptr WakuContext) {.async.} =
+    var waku: Waku
+    while true:
+      await ctx.reqSignal.wait()
+
+      if ctx.running.load == false:
+        break
+
+      ## Trying to get a request from the libwaku requestor thread
+      var request: ptr WakuThreadRequest
+      let recvOk = ctx.reqChannel.tryRecv(request)
+      if not recvOk:
+        error "waku thread could not receive a request"
+        continue
+
+      let fireRes = ctx.reqReceivedSignal.fireSync()
+      if fireRes.isErr():
+        error "could not fireSync back to requester thread", error = fireRes.error
+
+      ## Handle the request
+      asyncSpawn WakuThreadRequest.process(request, addr waku)
+
+  waitFor wakuRun(ctx)
+
+proc createWakuContext*(): Result[ptr WakuContext, string] =
+  ## This proc is called from the main thread and it creates the Waku
+  ## working thread plus the watchdog thread that monitors it.
+  var ctx = createShared(WakuContext, 1)
+  ctx.reqSignal = ThreadSignalPtr.new().valueOr:
+    return err("couldn't create reqSignal ThreadSignalPtr")
+  ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
+    return err("couldn't create reqReceivedSignal ThreadSignalPtr")
+  ctx.lock.initLock()
+
+  ctx.running.store(true)
+
+  try:
+    createThread(ctx.wakuThread, wakuThreadBody, ctx)
+  except ValueError, ResourceExhaustedError:
+    freeShared(ctx)
+    return err("failed to create the Waku thread: " & getCurrentExceptionMsg())
+
+  try:
+    createThread(ctx.watchdogThread, watchdogThreadBody, ctx)
+  except ValueError, ResourceExhaustedError:
+    let errMsg = getCurrentExceptionMsg()
+    # The Waku thread is already running and uses `ctx`. Stop and join it before
+    # releasing the shared memory, otherwise it would access freed memory.
+    ctx.running.store(false)
+    let wakuThreadSignaled = ctx.reqSignal.fireSync().valueOr:
+      false
+    if wakuThreadSignaled:
+      joinThread(ctx.wakuThread)
+      freeShared(ctx)
+    # If the Waku thread could not be signaled, `ctx` is leaked on purpose.
+    return err("failed to create the watchdog thread: " & errMsg)
+
+  return ok(ctx)
+
+proc destroyWakuContext*(ctx: ptr WakuContext): Result[void, string] =
+  ## Signals both threads to stop, waits for them to finish and releases
+  ## the resources owned by `ctx`. `ctx` must not be used afterwards.
+  ctx.running.store(false)
+
+  let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
+    return err("error in destroyWakuContext: " & $error)
+  if not signaledOnTime:
+    return err("failed to signal reqSignal on time in destroyWakuContext")
+
+  joinThread(ctx.wakuThread)
+  joinThread(ctx.watchdogThread)
+  ctx.lock.deinitLock()
+  ?ctx.reqSignal.close()
+  ?ctx.reqReceivedSignal.close()
+  freeShared(ctx)
+
+  return ok()