From 294dd03c457ce35e073fb576177a81afa3353d75 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Fri, 8 Nov 2024 14:59:02 +0700 Subject: [PATCH] chore: libwaku - better error handling and better waku thread destroy handling (#3167) --- .../networkmonitor/networkmonitor_metrics.nim | 3 +- library/libwaku.nim | 2 +- library/waku_thread/waku_thread.nim | 57 ++++++++++++------- 3 files changed, 37 insertions(+), 25 deletions(-) diff --git a/apps/networkmonitor/networkmonitor_metrics.nim b/apps/networkmonitor/networkmonitor_metrics.nim index 0dcd02298..dda3e57c7 100644 --- a/apps/networkmonitor/networkmonitor_metrics.nim +++ b/apps/networkmonitor/networkmonitor_metrics.nim @@ -37,8 +37,7 @@ declarePublicGauge networkmonitor_peer_user_agents, declarePublicHistogram networkmonitor_peer_ping, "Histogram tracking ping durations for discovered peers", - buckets = - [10.0, 20.0, 50.0, 100.0, 200.0, 300.0, 500.0, 800.0, 1000.0, 2000.0, Inf] + buckets = [10.0, 20.0, 50.0, 100.0, 200.0, 300.0, 500.0, 800.0, 1000.0, 2000.0, Inf] declarePublicGauge networkmonitor_peer_count, "Number of discovered peers", labels = ["connected"] diff --git a/library/libwaku.nim b/library/libwaku.nim index 2feb44302..36286a386 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -168,7 +168,7 @@ proc waku_destroy( ): cint {.dynlib, exportc.} = checkLibwakuParams(ctx, callback, userData) - waku_thread.stopWakuThread(ctx).handleRes(callback, userData) + waku_thread.destroyWakuThread(ctx).handleRes(callback, userData) proc waku_version( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index 5967cbfdd..5babbf380 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -18,36 +18,45 @@ type WakuContext* = object userData*: pointer eventCallback*: pointer eventUserdata*: pointer + running: Atomic[bool] # To control when the thread is running const git_version* {.strdefine.} = "n/a" const versionString = "version / git commit hash: " & waku.git_version -# To control when the thread is running -# TODO: this should be part of the context so multiple instances can be executed -var running: Atomic[bool] - proc runWaku(ctx: ptr WakuContext) {.async.} = ## This is the worker body. This runs the Waku node ## and attends library user requests (stop, connect_to, etc.) - info "Starting Waku", version = versionString var waku: Waku - while running.load == true: + while true: await ctx.reqSignal.wait() - # Trying to get a request from the libwaku main thread + if ctx.running.load == false: + break + + ## Trying to get a request from the libwaku requestor thread var request: ptr InterThreadRequest let recvOk = ctx.reqChannel.tryRecv(request) - if recvOk == true: - let resultResponse = waitFor InterThreadRequest.process(request, addr waku) + if not recvOk: + error "waku thread could not receive a request" + continue - ## Converting a `Result` into a thread-safe transferable response type - let threadSafeResp = InterThreadResponse.createShared(resultResponse) + ## Handle the request + let resultResponse = waitFor InterThreadRequest.process(request, addr waku) - ## The error-handling is performed in the main thread - discard ctx.respChannel.trySend(threadSafeResp) - discard ctx.respSignal.fireSync() + ## Converting a `Result` into a thread-safe transferable response type + let threadSafeResp = InterThreadResponse.createShared(resultResponse) + + ## Send the response back to the thread that sent the request + let sentOk = ctx.respChannel.trySend(threadSafeResp) + if not sentOk: + error "could not send a request to the requester thread", + original_request = $request[] + + let fireRes = ctx.respSignal.fireSync() + if fireRes.isErr(): + error "could not fireSync back to requester thread", error = fireRes.error proc run(ctx: ptr WakuContext) {.thread.} = ## Launch waku worker @@ -62,7 +71,7 @@ proc createWakuThread*(): Result[ptr WakuContext, string] = ctx.respSignal = ThreadSignalPtr.new().valueOr: return err("couldn't create respSignal ThreadSignalPtr") - running.store(true) + ctx.running.store(true) try: createThread(ctx.thread, run, ctx) @@ -74,15 +83,19 @@ proc createWakuThread*(): Result[ptr WakuContext, string] = return ok(ctx) -proc stopWakuThread*(ctx: ptr WakuContext): Result[void, string] = - running.store(false) - let fireRes = ctx.reqSignal.fireSync() - if fireRes.isErr(): - return err("error in stopWakuThread: " & $fireRes.error) - discard ctx.reqSignal.close() - discard ctx.respSignal.close() +proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] = + ctx.running.store(false) + + let signaledOnTime = ctx.reqSignal.fireSync().valueOr: + return err("error in destroyWakuThread: " & $error) + if not signaledOnTime: + return err("failed to signal reqSignal on time in destroyWakuThread") + joinThread(ctx.thread) + ?ctx.reqSignal.close() + ?ctx.respSignal.close() freeShared(ctx) + return ok() proc sendRequestToWakuThread*(