rename queue

This commit is contained in:
Jaremy Creechley 2024-02-09 21:25:34 -07:00
parent b6b77a686c
commit ac83bb60e6
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
3 changed files with 22 additions and 54 deletions

View File

@ -15,31 +15,31 @@ proc allocSharedChannel[T](): ChanPtr[T] =
cast[ChanPtr[T]](allocShared0(sizeof(Channel[T])))
type
AsyncQueue*[T] = object
SignalQueue*[T] = object
signal: ThreadSignalPtr
chan*: ChanPtr[T]
proc new*[T](tp: typedesc[AsyncQueue[T]]): AsyncQueue[T] {.raises: [ApatheiaSignalErr].} =
proc newSignalQueue*[T](): SignalQueue[T] {.raises: [ApatheiaSignalErr].} =
let res = ThreadSignalPtr.new()
if res.isErr():
raise newException(ApatheiaSignalErr, msg: res.err())
result.signal = res.get()
result.chan = allocSharedChannel()
proc send*[T](c: AsyncQueue[T], msg: sink T) {.inline.} =
proc send*[T](c: SignalQueue[T], msg: sink T) {.inline.} =
## Sends a message to a thread. `msg` is copied.
c.chan.send(msg)
c.signal.fireSync()
proc trySend*[T](c: AsyncQueue[T], msg: sink T): bool {.inline.} =
proc trySend*[T](c: SignalQueue[T], msg: sink T): bool {.inline.} =
result = c.chan.trySend(msg)
if result:
c.signal.fireSync()
proc recv*[T](c: AsyncQueue[T]): T =
proc recv*[T](c: SignalQueue[T]): T =
c.chan.recv()
proc tryRecv*[T](c: AsyncQueue[T]): Option[T] =
proc tryRecv*[T](c: SignalQueue[T]): Option[T] =
let res = c.chan.recv()
if res.dataAvailable:
some res.msg

View File

@ -5,6 +5,8 @@ import chronos/threadsync
import chronos/unittest2/asynctests
import taskpools
import apatheia/queues
## todo: setup basic async + threadsignal + taskpools example here
##
@ -13,7 +15,7 @@ type
doneSig: ThreadSignalPtr
value: float
proc addNums(a, b: float, ret: ptr ThreadArg) =
proc addNums(a, b: float, ret: AsyncQueue[float]) =
ret.value = a + b
os.sleep(500)
let res = ret.doneSig.fireSync().get()
@ -23,6 +25,7 @@ proc addNums(a, b: float, ret: ptr ThreadArg) =
suite "async tests":
var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads.
var queue = newAsyncQueue[float]()
asyncTest "test":
var args = ThreadArg()

View File

@ -5,64 +5,29 @@ import taskpools
# From https://github.com/nim-lang/Nim/blob/v1.6.2/tests/parallel/tpi.nim
# Leibniz Formula https://en.wikipedia.org/wiki/Leibniz_formula_for_%CF%80
proc term(k: openArray[float]): float =
for n in k:
result += n
proc term(k: int): float =
if k mod 2 == 1:
-4'f / float(2*k + 1)
else:
4'f / float(2*k + 1)
proc piApprox(tp: Taskpool, n: int): float =
var args = newSeq[float]()
for i in 0..n:
args[i] = i.toFloat()
result = tp.spawn term(args) # Schedule a task on the threadpool a return a handle to retrieve the result.
var pendingFuts = newSeq[FlowVar[float]](n)
for k in 0 ..< pendingFuts.len:
pendingFuts[k] = tp.spawn term(k) # Schedule a task on the threadpool a return a handle to retrieve the result.
for k in 0 ..< pendingFuts.len:
result += sync pendingFuts[k] # Block until the result is available.
proc main() =
var n = 10_000
var n = 1_000
var nthreads = countProcessors()
var tp = Taskpool.new(num_threads = nthreads) # Default to the number of hardware threads.
echo tp.piApprox(n)
echo formatFloat(tp.piApprox(n))
tp.syncAll() # Block until all pending tasks are processed (implied in tp.shutdown())
tp.shutdown()
# Compile with nim c -r -d:release --threads:on --outdir:build example.nim
main()
when false:
let fut = newFlowVar(typeof(float))
proc taskpool_term(k: openArray[float]; fut: Flowvar[float]) {.nimcall.} =
let res = term(k)
readyWith(fut, res)
let taskNode = new(TaskNode, workerContext.currentTask) do:
type
ScratchObj_486539473 = object
k: seq[float]
fut: Flowvar[float]
let scratch_486539466 = cast[ptr ScratchObj_486539473](c_calloc(csize_t(1), csize_t(16)))
if isNil(scratch_486539466):
raise (ref OutOfMemDefect)(msg: "Could not allocate memory", parent: nil)
block:
var isoTemp_486539469 = isolate(`@`(args))
scratch_486539466.k = extract(isoTemp_486539469)
var isoTemp_486539471 = isolate(fut)
scratch_486539466.fut = extract(isoTemp_486539471)
proc taskpool_term_486539474(args: pointer) {.gcsafe, nimcall, raises: [].} =
let objTemp_486539468 = cast[ptr ScratchObj_486539473](args)
let k_486539470 = objTemp_486539468.k
let fut_486539472 = objTemp_486539468.fut
taskpool_term(k_486539470, fut_486539472)
proc destroyScratch_486539475(args: pointer) {.gcsafe, nimcall,
raises: [].} =
let obj_486539476 = cast[ptr ScratchObj_486539473](args)
`=destroy`(obj_486539476[])
Task(callback: taskpool_term_486539474,
args: scratch_486539466,
destroy: destroyScratch_486539475)
schedule(workerContext, taskNode)