From 6cea737109223bbd2900519d8d958593086b9798 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com> Date: Fri, 22 Sep 2023 16:36:31 +0200 Subject: [PATCH] waku_thread.nim: Using 'ThreadSignalPtr' instead of loop to handle req/resp (#2045) --- library/waku_thread/waku_thread.nim | 33 ++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index c9a405117..a2965e4dd 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -8,9 +8,10 @@ import import chronicles, chronos, + chronos/threadsync, + taskpools/channels_spsc_single, stew/results, - stew/shims/net, - taskpools/channels_spsc_single + stew/shims/net import ../../../waku/node/waku_node, ../events/[json_error_event,json_message_event,json_base_event], @@ -21,7 +22,9 @@ type Context* = object thread: Thread[(ptr Context)] reqChannel: ChannelSPSCSingle[ptr InterThreadRequest] + reqSignal: ThreadSignalPtr respChannel: ChannelSPSCSingle[ptr InterThreadResponse] + respSignal: ThreadSignalPtr var ctx {.threadvar.}: ptr Context @@ -52,6 +55,7 @@ proc run(ctx: ptr Context) {.thread.} = ## Trying to get a request from the libwaku main thread var request: ptr InterThreadRequest + waitFor ctx.reqSignal.wait() let recvOk = ctx.reqChannel.tryRecv(request) if recvOk == true: let resultResponse = @@ -61,9 +65,8 @@ proc run(ctx: ptr Context) {.thread.} = let threadSafeResp = InterThreadResponse.createShared(resultResponse) ## The error-handling is performed in the main thread - discard ctx.respChannel.trySend( threadSafeResp ) - - waitFor sleepAsync(1) + discard ctx.respChannel.trySend(threadSafeResp) + discard ctx.respSignal.fireSync() tearDownForeignThreadGc() @@ -74,6 +77,10 @@ proc createWakuThread*(): Result[void, string] = waku_init() ctx = createShared(Context, 1) + ctx.reqSignal = ThreadSignalPtr.new().valueOr: + return err("couldn't create reqSignal ThreadSignalPtr") + ctx.respSignal = ThreadSignalPtr.new().valueOr: + return err("couldn't create respSignal ThreadSignalPtr") running.store(true) @@ -90,6 +97,8 @@ proc createWakuThread*(): Result[void, string] = proc stopWakuNodeThread*() = running.store(false) joinThread(ctx.thread) + discard ctx.reqSignal.close() + discard ctx.respSignal.close() freeShared(ctx) proc sendRequestToWakuThread*(reqType: RequestType, @@ -102,12 +111,20 @@ proc sendRequestToWakuThread*(reqType: RequestType, if not sentOk: return err("Couldn't send a request to the waku thread: " & $req[]) + let fireSyncRes = ctx.reqSignal.fireSync() + if fireSyncRes.isErr(): + return err("failed fireSync: " & $fireSyncRes.error) + + if fireSyncRes.get() == false: + return err("Couldn't fireSync in time") + ## Waiting for the response + waitFor ctx.respSignal.wait() + var response: ptr InterThreadResponse var recvOk = ctx.respChannel.tryRecv(response) - while recvOk == false: - recvOk = ctx.respChannel.tryRecv(response) - os.sleep(1) + if recvOk == false: + return err("Couldn't receive response from the waku thread: " & $req[]) ## Converting the thread-safe response into a managed/CG'ed `Result` return InterThreadResponse.process(response)