nwaku/library/waku_thread/waku_thread.nim
Ivan FB 1e8f577104
chore(cbindings): avoid using global var in libwaku.nim (#2118)
* libwaku: Avoid global variable and changing callback signature

* Better signature for the callback. Two new parameters have been added:
  one aimed to allow passing the caller result code; the other
  param is to pass an optional userData pointer that might need
  to be linked locally with the Context object. For example, this is needed
  in Rust to make the passed closures live as
  long as the Context.

* waku_example.c: adaptation to the latest changes

* libwaku.h: removing 'waku_set_user_data' function

* libwaku.nim: renaming parameter in WakuCallBack (isOk -> callerRet)
2023-10-23 08:37:28 +02:00

131 lines
4.0 KiB
Nim

{.pragma: exported, exportc, cdecl, raises: [].}
{.pragma: callback, cdecl, raises: [], gcsafe.}
{.passc: "-fPIC".}
import
std/[json,sequtils,times,strformat,options,atomics,strutils,os]
import
chronicles,
chronos,
chronos/threadsync,
taskpools/channels_spsc_single,
stew/results,
stew/shims/net
import
../../../waku/node/waku_node,
../events/[json_error_event,json_message_event,json_base_event],
./inter_thread_communication/waku_thread_request,
./inter_thread_communication/waku_thread_response
type
Context* = object
thread: Thread[(ptr Context)]
reqChannel: ChannelSPSCSingle[ptr InterThreadRequest]
reqSignal: ThreadSignalPtr
respChannel: ChannelSPSCSingle[ptr InterThreadResponse]
respSignal: ThreadSignalPtr
userData*: pointer
# To control when the thread is running
var running: Atomic[bool]
# Every Nim library must have this function called - the name is derived from
# the `--nimMainPrefix` command line option
proc NimMain() {.importc.}
var initialized: Atomic[bool]
proc waku_init() =
if not initialized.exchange(true):
NimMain() # Every Nim library needs to call `NimMain` once exactly
when declared(setupForeignThreadGc): setupForeignThreadGc()
when declared(nimGC_setStackBottom):
var locals {.volatile, noinit.}: pointer
locals = addr(locals)
nimGC_setStackBottom(locals)
proc run(ctx: ptr Context) {.thread.} =
## This is the worker thread body. This thread runs the Waku node
## and attends library user requests (stop, connect_to, etc.)
var node: WakuNode
while running.load == true:
## Trying to get a request from the libwaku main thread
var request: ptr InterThreadRequest
waitFor ctx.reqSignal.wait()
let recvOk = ctx.reqChannel.tryRecv(request)
if recvOk == true:
let resultResponse =
waitFor InterThreadRequest.process(request, addr node)
## Converting a `Result` into a thread-safe transferable response type
let threadSafeResp = InterThreadResponse.createShared(resultResponse)
## The error-handling is performed in the main thread
discard ctx.respChannel.trySend(threadSafeResp)
discard ctx.respSignal.fireSync()
tearDownForeignThreadGc()
proc createWakuThread*(): Result[ptr Context, string] =
## This proc is called from the main thread and it creates
## the Waku working thread.
waku_init()
var ctx = createShared(Context, 1)
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqSignal ThreadSignalPtr")
ctx.respSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create respSignal ThreadSignalPtr")
running.store(true)
try:
createThread(ctx.thread, run, ctx)
except ValueError, ResourceExhaustedError:
# and freeShared for typed allocations!
freeShared(ctx)
return err("failed to create the Waku thread: " & getCurrentExceptionMsg())
return ok(ctx)
proc stopWakuNodeThread*(ctx: ptr Context) =
running.store(false)
joinThread(ctx.thread)
discard ctx.reqSignal.close()
discard ctx.respSignal.close()
freeShared(ctx)
proc sendRequestToWakuThread*(ctx: ptr Context,
reqType: RequestType,
reqContent: pointer): Result[string, string] =
let req = InterThreadRequest.createShared(reqType, reqContent)
## Sending the request
let sentOk = ctx.reqChannel.trySend(req)
if not sentOk:
return err("Couldn't send a request to the waku thread: " & $req[])
let fireSyncRes = ctx.reqSignal.fireSync()
if fireSyncRes.isErr():
return err("failed fireSync: " & $fireSyncRes.error)
if fireSyncRes.get() == false:
return err("Couldn't fireSync in time")
## Waiting for the response
waitFor ctx.respSignal.wait()
var response: ptr InterThreadResponse
var recvOk = ctx.respChannel.tryRecv(response)
if recvOk == false:
return err("Couldn't receive response from the waku thread: " & $req[])
## Converting the thread-safe response into a managed/CG'ed `Result`
return InterThreadResponse.process(response)