mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-05 23:43:07 +00:00
initial integration of mp channel
This commit is contained in:
parent
a117143ca1
commit
4bf4c176b1
@ -6,13 +6,11 @@ import std/[options, atomics, os, net]
|
|||||||
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
|
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
|
||||||
import waku/factory/waku, ./inter_thread_communication/waku_thread_request, ../ffi_types
|
import waku/factory/waku, ./inter_thread_communication/waku_thread_request, ../ffi_types
|
||||||
|
|
||||||
|
const MaxChannelItems = 1000
|
||||||
|
|
||||||
type WakuContext* = object
|
type WakuContext* = object
|
||||||
thread: Thread[(ptr WakuContext)]
|
thread: Thread[(ptr WakuContext)]
|
||||||
reqChannel: ChannelSPSCSingle[ptr WakuThreadRequest]
|
reqChannel: Channel[ptr WakuThreadRequest]
|
||||||
reqSignal: ThreadSignalPtr
|
|
||||||
# to inform The Waku Thread (a.k.a TWT) that a new request is sent
|
|
||||||
reqReceivedSignal: ThreadSignalPtr
|
|
||||||
# to inform the main thread that the request is rx by TWT
|
|
||||||
userData*: pointer
|
userData*: pointer
|
||||||
eventCallback*: pointer
|
eventCallback*: pointer
|
||||||
eventUserdata*: pointer
|
eventUserdata*: pointer
|
||||||
@ -28,21 +26,16 @@ proc runWaku(ctx: ptr WakuContext) {.async.} =
|
|||||||
var waku: Waku
|
var waku: Waku
|
||||||
|
|
||||||
while true:
|
while true:
|
||||||
await ctx.reqSignal.wait()
|
|
||||||
|
|
||||||
if ctx.running.load == false:
|
if ctx.running.load == false:
|
||||||
break
|
break
|
||||||
|
|
||||||
## Trying to get a request from the libwaku requestor thread
|
|
||||||
var request: ptr WakuThreadRequest
|
var request: ptr WakuThreadRequest
|
||||||
let recvOk = ctx.reqChannel.tryRecv(request)
|
try:
|
||||||
if not recvOk:
|
echo "----------- before receive"
|
||||||
error "waku thread could not receive a request"
|
request = ctx.reqChannel.recv()
|
||||||
continue
|
echo "----------- after receive"
|
||||||
|
except Exception:
|
||||||
let fireRes = ctx.reqReceivedSignal.fireSync()
|
error "exception trying to receive a request"
|
||||||
if fireRes.isErr():
|
|
||||||
error "could not fireSync back to requester thread", error = fireRes.error
|
|
||||||
|
|
||||||
## Handle the request
|
## Handle the request
|
||||||
asyncSpawn WakuThreadRequest.process(request, addr waku)
|
asyncSpawn WakuThreadRequest.process(request, addr waku)
|
||||||
@ -55,10 +48,7 @@ proc createWakuThread*(): Result[ptr WakuContext, string] =
|
|||||||
## This proc is called from the main thread and it creates
|
## This proc is called from the main thread and it creates
|
||||||
## the Waku working thread.
|
## the Waku working thread.
|
||||||
var ctx = createShared(WakuContext, 1)
|
var ctx = createShared(WakuContext, 1)
|
||||||
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
|
ctx.reqChannel.open(MaxChannelItems)
|
||||||
return err("couldn't create reqSignal ThreadSignalPtr")
|
|
||||||
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
|
|
||||||
return err("couldn't create reqReceivedSignal ThreadSignalPtr")
|
|
||||||
|
|
||||||
ctx.running.store(true)
|
ctx.running.store(true)
|
||||||
|
|
||||||
@ -75,14 +65,8 @@ proc createWakuThread*(): Result[ptr WakuContext, string] =
|
|||||||
proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] =
|
proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] =
|
||||||
ctx.running.store(false)
|
ctx.running.store(false)
|
||||||
|
|
||||||
let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
|
|
||||||
return err("error in destroyWakuThread: " & $error)
|
|
||||||
if not signaledOnTime:
|
|
||||||
return err("failed to signal reqSignal on time in destroyWakuThread")
|
|
||||||
|
|
||||||
joinThread(ctx.thread)
|
joinThread(ctx.thread)
|
||||||
?ctx.reqSignal.close()
|
ctx.reqChannel.close()
|
||||||
?ctx.reqReceivedSignal.close()
|
|
||||||
freeShared(ctx)
|
freeShared(ctx)
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
@ -95,26 +79,10 @@ proc sendRequestToWakuThread*(
|
|||||||
userData: pointer,
|
userData: pointer,
|
||||||
): Result[void, string] =
|
): Result[void, string] =
|
||||||
let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData)
|
let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData)
|
||||||
## Sending the request
|
## Sending the request, blocks if the channel is full
|
||||||
let sentOk = ctx.reqChannel.trySend(req)
|
echo "--------- before send reqType: ", $RequestType
|
||||||
if not sentOk:
|
ctx.reqChannel.send(req)
|
||||||
deallocShared(req)
|
echo "--------- after reqType: ", $RequestType
|
||||||
return err("Couldn't send a request to the waku thread: " & $req[])
|
|
||||||
|
|
||||||
let fireSyncRes = ctx.reqSignal.fireSync()
|
|
||||||
if fireSyncRes.isErr():
|
|
||||||
deallocShared(req)
|
|
||||||
return err("failed fireSync: " & $fireSyncRes.error)
|
|
||||||
|
|
||||||
if fireSyncRes.get() == false:
|
|
||||||
deallocShared(req)
|
|
||||||
return err("Couldn't fireSync in time")
|
|
||||||
|
|
||||||
## wait until the Waku Thread properly received the request
|
|
||||||
let res = ctx.reqReceivedSignal.waitSync()
|
|
||||||
if res.isErr():
|
|
||||||
deallocShared(req)
|
|
||||||
return err("Couldn't receive reqReceivedSignal signal")
|
|
||||||
|
|
||||||
## Notice that in case of "ok", the deallocShared(req) is performed by the Waku Thread in the
|
## Notice that in case of "ok", the deallocShared(req) is performed by the Waku Thread in the
|
||||||
## process proc.
|
## process proc.
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user