This commit is contained in:
Jaremy Creechley 2024-02-14 22:07:35 -07:00
parent 78562d367a
commit dbaebc26fe
2 changed files with 7 additions and 5 deletions

View File

@ -35,6 +35,8 @@ proc newSignalQueue*[T](maxItems: int = 0): SignalQueue[T] {.raises: [ApatheiaSi
proc send*[T](c: SignalQueue[T], msg: sink T): Result[void, string] {.raises: [].} =
## Sends a message to a thread. `msg` is copied.
## Note: currently non-blocking but future iterations may become blocking.
##
try:
c.chan[].send(msg)
except Exception as exc:
@ -47,21 +49,26 @@ proc send*[T](c: SignalQueue[T], msg: sink T): Result[void, string] {.raises: []
result = ok()
proc trySend*[T](c: SignalQueue[T], msg: sink T): bool =
## Trys to sends a message to a thread. `msg` is copied. Non-blocking.
result = c.chan.trySend(msg)
if result:
c.signal.fireSync()
proc recv*[T](c: SignalQueue[T]): Result[T, string] =
## Receive item from queue, blocking.
try:
result = ok c.chan[].recv()
except Exception as exc:
result = err exc.msg
proc tryRecv*[T](c: SignalQueue[T]): Option[T] =
## Try to receive item from queue, non-blocking.
let res = c.chan.recv()
if res.dataAvailable:
some res.msg
proc wait*[T](c: SignalQueue[T]): Future[Result[T, string]] {.async.} =
## Async compatible receive from queue. Pauses async execution until
## an item is received from the queue
await wait(c.signal)
return c.recv()

View File

@ -9,11 +9,6 @@ import taskpools
import apatheia/queues
import apatheia/jobs
## todo: setup basic async + threadsignal + taskpools example here
##
import std/macros
proc addNumsRaw(a, b: float): float =
os.sleep(50)
return a + b