From 4bf4c176b17f858f6ec4df1bc119d1a88fa73e8d Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 6 Feb 2025 13:11:38 +0200 Subject: [PATCH] initial integration of mp channel --- library/waku_thread/waku_thread.nim | 62 +++++++---------------------- 1 file changed, 15 insertions(+), 47 deletions(-) diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index 4e8019b08..705e56477 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -6,13 +6,11 @@ import std/[options, atomics, os, net] import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results import waku/factory/waku, ./inter_thread_communication/waku_thread_request, ../ffi_types +const MaxChannelItems = 1000 + type WakuContext* = object thread: Thread[(ptr WakuContext)] - reqChannel: ChannelSPSCSingle[ptr WakuThreadRequest] - reqSignal: ThreadSignalPtr - # to inform The Waku Thread (a.k.a TWT) that a new request is sent - reqReceivedSignal: ThreadSignalPtr - # to inform the main thread that the request is rx by TWT + reqChannel: Channel[ptr WakuThreadRequest] userData*: pointer eventCallback*: pointer eventUserdata*: pointer @@ -28,21 +26,16 @@ proc runWaku(ctx: ptr WakuContext) {.async.} = var waku: Waku while true: - await ctx.reqSignal.wait() - if ctx.running.load == false: break - ## Trying to get a request from the libwaku requestor thread var request: ptr WakuThreadRequest - let recvOk = ctx.reqChannel.tryRecv(request) - if not recvOk: - error "waku thread could not receive a request" - continue - - let fireRes = ctx.reqReceivedSignal.fireSync() - if fireRes.isErr(): - error "could not fireSync back to requester thread", error = fireRes.error + try: + echo "----------- before receive" + request = ctx.reqChannel.recv() + echo "----------- after receive" + except Exception: + error "exception trying to receive a request" ## Handle the request asyncSpawn WakuThreadRequest.process(request, addr waku) @@ -55,10 +48,7 @@ proc createWakuThread*(): Result[ptr WakuContext, string] = ## This proc is called from the main thread and it creates ## the Waku working thread. var ctx = createShared(WakuContext, 1) - ctx.reqSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create reqSignal ThreadSignalPtr") - ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create reqReceivedSignal ThreadSignalPtr") + ctx.reqChannel.open(MaxChannelItems) ctx.running.store(true) @@ -75,14 +65,8 @@ proc createWakuThread*(): Result[ptr WakuContext, string] = proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] = ctx.running.store(false) - let signaledOnTime = ctx.reqSignal.fireSync().valueOr: - return err("error in destroyWakuThread: " & $error) - if not signaledOnTime: - return err("failed to signal reqSignal on time in destroyWakuThread") - joinThread(ctx.thread) - ?ctx.reqSignal.close() - ?ctx.reqReceivedSignal.close() + ctx.reqChannel.close() freeShared(ctx) return ok() @@ -95,26 +79,10 @@ proc sendRequestToWakuThread*( userData: pointer, ): Result[void, string] = let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData) - ## Sending the request - let sentOk = ctx.reqChannel.trySend(req) - if not sentOk: - deallocShared(req) - return err("Couldn't send a request to the waku thread: " & $req[]) - - let fireSyncRes = ctx.reqSignal.fireSync() - if fireSyncRes.isErr(): - deallocShared(req) - return err("failed fireSync: " & $fireSyncRes.error) - - if fireSyncRes.get() == false: - deallocShared(req) - return err("Couldn't fireSync in time") - - ## wait until the Waku Thread properly received the request - let res = ctx.reqReceivedSignal.waitSync() - if res.isErr(): - deallocShared(req) - return err("Couldn't receive reqReceivedSignal signal") + ## Sending the request, blocks if the channel is full + echo "--------- before send reqType: ", $RequestType + ctx.reqChannel.send(req) + echo "--------- after reqType: ", $RequestType ## Notice that in case of "ok", the deallocShared(req) is performed by the Waku Thread in the ## process proc.