diff --git a/src/apatheia/queues.nim b/src/apatheia/queues.nim index 85707d6..f7aa638 100644 --- a/src/apatheia/queues.nim +++ b/src/apatheia/queues.nim @@ -1,4 +1,4 @@ - +import std/options import ./types import chronos @@ -6,6 +6,7 @@ import results import chronos/threadsync export types +export options type ChanPtr[T] = ptr Channel[T] @@ -24,5 +25,19 @@ proc new*[T](tp: typedesc[AsyncQueue[T]]): AsyncQueue[T] {.raises: [ApatheiaSign raise newException(ApatheiaSignalErr, msg: res.err()) result.signal = res.get() result.chan = allocSharedChannel() - + +proc send*[T](c: AsyncQueue[T], msg: sink T) {.inline.} = + ## Sends a message to a thread. `msg` is copied. + c.chan.send(msg) + +proc trySend*[T](c: AsyncQueue[T], msg: sink T): bool {.inline.} = + c.chan.trySend(msg) + +proc recv*[T](c: AsyncQueue[T]): T = + c.chan.recv() + +proc tryRecv*[T](c: AsyncQueue[T]): Option[T] = + let res = c.chan.recv() + if res.dataAvailable: + some res.msg