rename queue

This commit is contained in:
Jaremy Creechley 2024-02-09 21:44:07 -07:00
parent ac83bb60e6
commit a7465c753f
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
3 changed files with 28 additions and 19 deletions

View File

@ -1,12 +1,15 @@
import std/options
import ./types
import results
import chronos
import results
import chronos/threadsync
export types
export options
export threadsync
export chronos
type
ChanPtr[T] = ptr Channel[T]
@ -22,16 +25,24 @@ type
proc newSignalQueue*[T](): SignalQueue[T] {.raises: [ApatheiaSignalErr].} =
let res = ThreadSignalPtr.new()
if res.isErr():
raise newException(ApatheiaSignalErr, msg: res.err())
raise newException(ApatheiaSignalErr, res.error())
result.signal = res.get()
result.chan = allocSharedChannel()
result.chan = allocSharedChannel[T]()
proc send*[T](c: SignalQueue[T], msg: sink T) {.inline.} =
proc send*[T](c: SignalQueue[T], msg: sink T): Result[void, string] {.raises: [].} =
## Sends a message to a thread. `msg` is copied.
c.chan.send(msg)
c.signal.fireSync()
try:
c.chan[].send(msg)
except Exception as exc:
result = err exc.msg
proc trySend*[T](c: SignalQueue[T], msg: sink T): bool {.inline.} =
let res = c.signal.fireSync()
if res.isErr():
let msg: string = res.error()
result = err msg
result = ok()
proc trySend*[T](c: SignalQueue[T], msg: sink T): bool =
result = c.chan.trySend(msg)
if result:
c.signal.fireSync()
@ -44,3 +55,5 @@ proc tryRecv*[T](c: SignalQueue[T]): Option[T] =
if res.dataAvailable:
some res.msg
proc wait*[T](c: SignalQueue[T]) {.async.} =
await wait(c.signal)

View File

@ -1,4 +1,4 @@
type
ApatheiaException* = ref object of CatchableError
ApatheiaSignalErr* = ref object of ApatheiaException
ApatheiaException* = object of CatchableError
ApatheiaSignalErr* = object of ApatheiaException

View File

@ -15,27 +15,23 @@ type
doneSig: ThreadSignalPtr
value: float
proc addNums(a, b: float, ret: AsyncQueue[float]) =
ret.value = a + b
proc addNums(a, b: float, queue: SignalQueue[float]) =
os.sleep(500)
let res = ret.doneSig.fireSync().get()
if not res:
echo "ERROR FIRING!"
discard queue.send(a + b)
suite "async tests":
var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads.
var queue = newAsyncQueue[float]()
var queue = newSignalQueue[float]()
asyncTest "test":
var args = ThreadArg()
args.doneSig = ThreadSignalPtr.new().get()
tp.spawn addNums(1, 2, addr args)
tp.spawn addNums(1.0, 2.0, queue)
# await sleepAsync(100.milliseconds)
await wait(args.doneSig).wait(1500.milliseconds)
await wait(queue).wait(1500.milliseconds)
echo "\nRES: ", args.value
# echo "\nRES: ", args.value
check true