diff --git a/src/apatheia/queues.nim b/src/apatheia/queues.nim index 102d782..a083897 100644 --- a/src/apatheia/queues.nim +++ b/src/apatheia/queues.nim @@ -1,12 +1,15 @@ import std/options import ./types +import results import chronos import results import chronos/threadsync export types export options +export threadsync +export chronos type ChanPtr[T] = ptr Channel[T] @@ -22,16 +25,24 @@ type proc newSignalQueue*[T](): SignalQueue[T] {.raises: [ApatheiaSignalErr].} = let res = ThreadSignalPtr.new() if res.isErr(): - raise newException(ApatheiaSignalErr, msg: res.err()) + raise newException(ApatheiaSignalErr, res.error()) result.signal = res.get() - result.chan = allocSharedChannel() + result.chan = allocSharedChannel[T]() -proc send*[T](c: SignalQueue[T], msg: sink T) {.inline.} = +proc send*[T](c: SignalQueue[T], msg: sink T): Result[void, string] {.raises: [].} = ## Sends a message to a thread. `msg` is copied. - c.chan.send(msg) - c.signal.fireSync() + try: + c.chan[].send(msg) + except Exception as exc: + result = err exc.msg -proc trySend*[T](c: SignalQueue[T], msg: sink T): bool {.inline.} = + let res = c.signal.fireSync() + if res.isErr(): + let msg: string = res.error() + result = err msg + result = ok() + +proc trySend*[T](c: SignalQueue[T], msg: sink T): bool = result = c.chan.trySend(msg) if result: c.signal.fireSync() @@ -44,3 +55,5 @@ proc tryRecv*[T](c: SignalQueue[T]): Option[T] = if res.dataAvailable: some res.msg +proc wait*[T](c: SignalQueue[T]) {.async.} = + await wait(c.signal) diff --git a/src/apatheia/types.nim b/src/apatheia/types.nim index a2a845b..6e156fd 100644 --- a/src/apatheia/types.nim +++ b/src/apatheia/types.nim @@ -1,4 +1,4 @@ type - ApatheiaException* = ref object of CatchableError - ApatheiaSignalErr* = ref object of ApatheiaException + ApatheiaException* = object of CatchableError + ApatheiaSignalErr* = object of ApatheiaException diff --git a/tests/tasyncsEx2.nim b/tests/tasyncsEx2.nim index 4694cc5..edf70c1 100644 --- a/tests/tasyncsEx2.nim +++ b/tests/tasyncsEx2.nim @@ -15,27 +15,23 @@ type doneSig: ThreadSignalPtr value: float -proc addNums(a, b: float, ret: AsyncQueue[float]) = - ret.value = a + b +proc addNums(a, b: float, queue: SignalQueue[float]) = os.sleep(500) - let res = ret.doneSig.fireSync().get() - if not res: - echo "ERROR FIRING!" + discard queue.send(a + b) suite "async tests": var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads. - var queue = newAsyncQueue[float]() + var queue = newSignalQueue[float]() asyncTest "test": - var args = ThreadArg() - args.doneSig = ThreadSignalPtr.new().get() - tp.spawn addNums(1, 2, addr args) + tp.spawn addNums(1.0, 2.0, queue) + # await sleepAsync(100.milliseconds) - await wait(args.doneSig).wait(1500.milliseconds) + await wait(queue).wait(1500.milliseconds) - echo "\nRES: ", args.value + # echo "\nRES: ", args.value check true