mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-03 22:43:09 +00:00
chore: supporting parallel libwaku requests (#3296)
This commit is contained in:
parent
9b55665f41
commit
6b00684ad1
@ -2,12 +2,13 @@
|
||||
{.pragma: callback, cdecl, raises: [], gcsafe.}
|
||||
{.passc: "-fPIC".}
|
||||
|
||||
import std/[options, atomics, os, net]
|
||||
import std/[options, atomics, os, net, locks]
|
||||
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
|
||||
import waku/factory/waku, ./inter_thread_communication/waku_thread_request, ../ffi_types
|
||||
|
||||
type WakuContext* = object
|
||||
thread: Thread[(ptr WakuContext)]
|
||||
lock: Lock
|
||||
reqChannel: ChannelSPSCSingle[ptr WakuThreadRequest]
|
||||
reqSignal: ThreadSignalPtr
|
||||
# to inform The Waku Thread (a.k.a TWT) that a new request is sent
|
||||
@ -59,6 +60,7 @@ proc createWakuThread*(): Result[ptr WakuContext, string] =
|
||||
return err("couldn't create reqSignal ThreadSignalPtr")
|
||||
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
|
||||
return err("couldn't create reqReceivedSignal ThreadSignalPtr")
|
||||
ctx.lock.initLock()
|
||||
|
||||
ctx.running.store(true)
|
||||
|
||||
@ -81,6 +83,7 @@ proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] =
|
||||
return err("failed to signal reqSignal on time in destroyWakuThread")
|
||||
|
||||
joinThread(ctx.thread)
|
||||
ctx.lock.deinitLock()
|
||||
?ctx.reqSignal.close()
|
||||
?ctx.reqReceivedSignal.close()
|
||||
freeShared(ctx)
|
||||
@ -95,6 +98,14 @@ proc sendRequestToWakuThread*(
|
||||
userData: pointer,
|
||||
): Result[void, string] =
|
||||
let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData)
|
||||
|
||||
# This lock is only necessary while we use a SP Channel and while the signalling
|
||||
# between threads assumes that there aren't concurrent requests.
|
||||
# Rearchitecting the signaling + migrating to a MP Channel will allow us to receive
|
||||
# requests concurrently and spare us the need of locks
|
||||
ctx.lock.acquire()
|
||||
defer:
|
||||
ctx.lock.release()
|
||||
## Sending the request
|
||||
let sentOk = ctx.reqChannel.trySend(req)
|
||||
if not sentOk:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user