waku_thread.nim: Using 'ThreadSignalPtr' instead of loop to handle req/resp (#2045)

This commit is contained in:
Ivan Folgueira Bande 2023-09-22 16:36:31 +02:00 committed by GitHub
parent 3e72e83067
commit 6cea737109
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 25 additions and 8 deletions

View File

@ -8,9 +8,10 @@ import
import import
chronicles, chronicles,
chronos, chronos,
chronos/threadsync,
taskpools/channels_spsc_single,
stew/results, stew/results,
stew/shims/net, stew/shims/net
taskpools/channels_spsc_single
import import
../../../waku/node/waku_node, ../../../waku/node/waku_node,
../events/[json_error_event,json_message_event,json_base_event], ../events/[json_error_event,json_message_event,json_base_event],
@ -21,7 +22,9 @@ type
Context* = object Context* = object
thread: Thread[(ptr Context)] thread: Thread[(ptr Context)]
reqChannel: ChannelSPSCSingle[ptr InterThreadRequest] reqChannel: ChannelSPSCSingle[ptr InterThreadRequest]
reqSignal: ThreadSignalPtr
respChannel: ChannelSPSCSingle[ptr InterThreadResponse] respChannel: ChannelSPSCSingle[ptr InterThreadResponse]
respSignal: ThreadSignalPtr
var ctx {.threadvar.}: ptr Context var ctx {.threadvar.}: ptr Context
@ -52,6 +55,7 @@ proc run(ctx: ptr Context) {.thread.} =
## Trying to get a request from the libwaku main thread ## Trying to get a request from the libwaku main thread
var request: ptr InterThreadRequest var request: ptr InterThreadRequest
waitFor ctx.reqSignal.wait()
let recvOk = ctx.reqChannel.tryRecv(request) let recvOk = ctx.reqChannel.tryRecv(request)
if recvOk == true: if recvOk == true:
let resultResponse = let resultResponse =
@ -61,9 +65,8 @@ proc run(ctx: ptr Context) {.thread.} =
let threadSafeResp = InterThreadResponse.createShared(resultResponse) let threadSafeResp = InterThreadResponse.createShared(resultResponse)
## The error-handling is performed in the main thread ## The error-handling is performed in the main thread
discard ctx.respChannel.trySend( threadSafeResp ) discard ctx.respChannel.trySend(threadSafeResp)
discard ctx.respSignal.fireSync()
waitFor sleepAsync(1)
tearDownForeignThreadGc() tearDownForeignThreadGc()
@ -74,6 +77,10 @@ proc createWakuThread*(): Result[void, string] =
waku_init() waku_init()
ctx = createShared(Context, 1) ctx = createShared(Context, 1)
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqSignal ThreadSignalPtr")
ctx.respSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create respSignal ThreadSignalPtr")
running.store(true) running.store(true)
@ -90,6 +97,8 @@ proc createWakuThread*(): Result[void, string] =
proc stopWakuNodeThread*() = proc stopWakuNodeThread*() =
running.store(false) running.store(false)
joinThread(ctx.thread) joinThread(ctx.thread)
discard ctx.reqSignal.close()
discard ctx.respSignal.close()
freeShared(ctx) freeShared(ctx)
proc sendRequestToWakuThread*(reqType: RequestType, proc sendRequestToWakuThread*(reqType: RequestType,
@ -102,12 +111,20 @@ proc sendRequestToWakuThread*(reqType: RequestType,
if not sentOk: if not sentOk:
return err("Couldn't send a request to the waku thread: " & $req[]) return err("Couldn't send a request to the waku thread: " & $req[])
let fireSyncRes = ctx.reqSignal.fireSync()
if fireSyncRes.isErr():
return err("failed fireSync: " & $fireSyncRes.error)
if fireSyncRes.get() == false:
return err("Couldn't fireSync in time")
## Waiting for the response ## Waiting for the response
waitFor ctx.respSignal.wait()
var response: ptr InterThreadResponse var response: ptr InterThreadResponse
var recvOk = ctx.respChannel.tryRecv(response) var recvOk = ctx.respChannel.tryRecv(response)
while recvOk == false: if recvOk == false:
recvOk = ctx.respChannel.tryRecv(response) return err("Couldn't receive response from the waku thread: " & $req[])
os.sleep(1)
## Converting the thread-safe response into a managed/CG'ed `Result` ## Converting the thread-safe response into a managed/CG'ed `Result`
return InterThreadResponse.process(response) return InterThreadResponse.process(response)