diff --git a/src/apatheia/queues.nim b/src/apatheia/queues.nim index 95a2914..102d782 100644 --- a/src/apatheia/queues.nim +++ b/src/apatheia/queues.nim @@ -15,31 +15,31 @@ proc allocSharedChannel[T](): ChanPtr[T] = cast[ChanPtr[T]](allocShared0(sizeof(Channel[T]))) type - AsyncQueue*[T] = object + SignalQueue*[T] = object signal: ThreadSignalPtr chan*: ChanPtr[T] -proc new*[T](tp: typedesc[AsyncQueue[T]]): AsyncQueue[T] {.raises: [ApatheiaSignalErr].} = +proc newSignalQueue*[T](): SignalQueue[T] {.raises: [ApatheiaSignalErr].} = let res = ThreadSignalPtr.new() if res.isErr(): raise newException(ApatheiaSignalErr, msg: res.err()) result.signal = res.get() result.chan = allocSharedChannel() -proc send*[T](c: AsyncQueue[T], msg: sink T) {.inline.} = +proc send*[T](c: SignalQueue[T], msg: sink T) {.inline.} = ## Sends a message to a thread. `msg` is copied. c.chan.send(msg) c.signal.fireSync() -proc trySend*[T](c: AsyncQueue[T], msg: sink T): bool {.inline.} = +proc trySend*[T](c: SignalQueue[T], msg: sink T): bool {.inline.} = result = c.chan.trySend(msg) if result: c.signal.fireSync() -proc recv*[T](c: AsyncQueue[T]): T = +proc recv*[T](c: SignalQueue[T]): T = c.chan.recv() -proc tryRecv*[T](c: AsyncQueue[T]): Option[T] = +proc tryRecv*[T](c: SignalQueue[T]): Option[T] = let res = c.chan.recv() if res.dataAvailable: some res.msg diff --git a/tests/tasyncsEx2.nim b/tests/tasyncsEx2.nim index d5a866c..4694cc5 100644 --- a/tests/tasyncsEx2.nim +++ b/tests/tasyncsEx2.nim @@ -5,6 +5,8 @@ import chronos/threadsync import chronos/unittest2/asynctests import taskpools +import apatheia/queues + ## todo: setup basic async + threadsignal + taskpools example here ## @@ -13,7 +15,7 @@ type doneSig: ThreadSignalPtr value: float -proc addNums(a, b: float, ret: ptr ThreadArg) = +proc addNums(a, b: float, ret: AsyncQueue[float]) = ret.value = a + b os.sleep(500) let res = ret.doneSig.fireSync().get() @@ -23,6 +25,7 @@ proc addNums(a, b: float, ret: ptr ThreadArg) = suite "async tests": var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads. + var queue = newAsyncQueue[float]() asyncTest "test": var args = ThreadArg() diff --git a/tests/tpoolsEx1.nim b/tests/tpoolsEx1.nim index e7b54c7..9678de4 100644 --- a/tests/tpoolsEx1.nim +++ b/tests/tpoolsEx1.nim @@ -5,64 +5,29 @@ import taskpools # From https://github.com/nim-lang/Nim/blob/v1.6.2/tests/parallel/tpi.nim # Leibniz Formula https://en.wikipedia.org/wiki/Leibniz_formula_for_%CF%80 -proc term(k: openArray[float]): float = - for n in k: - result += n +proc term(k: int): float = + if k mod 2 == 1: + -4'f / float(2*k + 1) + else: + 4'f / float(2*k + 1) proc piApprox(tp: Taskpool, n: int): float = - var args = newSeq[float]() - for i in 0..n: - args[i] = i.toFloat() - result = tp.spawn term(args) # Schedule a task on the threadpool a return a handle to retrieve the result. + var pendingFuts = newSeq[FlowVar[float]](n) + for k in 0 ..< pendingFuts.len: + pendingFuts[k] = tp.spawn term(k) # Schedule a task on the threadpool a return a handle to retrieve the result. + for k in 0 ..< pendingFuts.len: + result += sync pendingFuts[k] # Block until the result is available. proc main() = - var n = 10_000 + var n = 1_000 var nthreads = countProcessors() var tp = Taskpool.new(num_threads = nthreads) # Default to the number of hardware threads. - echo tp.piApprox(n) + echo formatFloat(tp.piApprox(n)) tp.syncAll() # Block until all pending tasks are processed (implied in tp.shutdown()) tp.shutdown() # Compile with nim c -r -d:release --threads:on --outdir:build example.nim main() - -when false: - let fut = newFlowVar(typeof(float)) - proc taskpool_term(k: openArray[float]; fut: Flowvar[float]) {.nimcall.} = - let res = term(k) - readyWith(fut, res) - - let taskNode = new(TaskNode, workerContext.currentTask) do: - type - ScratchObj_486539473 = object - k: seq[float] - fut: Flowvar[float] - - let scratch_486539466 = cast[ptr ScratchObj_486539473](c_calloc(csize_t(1), csize_t(16))) - if isNil(scratch_486539466): - raise (ref OutOfMemDefect)(msg: "Could not allocate memory", parent: nil) - block: - var isoTemp_486539469 = isolate(`@`(args)) - scratch_486539466.k = extract(isoTemp_486539469) - var isoTemp_486539471 = isolate(fut) - scratch_486539466.fut = extract(isoTemp_486539471) - - proc taskpool_term_486539474(args: pointer) {.gcsafe, nimcall, raises: [].} = - let objTemp_486539468 = cast[ptr ScratchObj_486539473](args) - let k_486539470 = objTemp_486539468.k - let fut_486539472 = objTemp_486539468.fut - taskpool_term(k_486539470, fut_486539472) - - proc destroyScratch_486539475(args: pointer) {.gcsafe, nimcall, - raises: [].} = - let obj_486539476 = cast[ptr ScratchObj_486539473](args) - `=destroy`(obj_486539476[]) - - Task(callback: taskpool_term_486539474, - args: scratch_486539466, - destroy: destroyScratch_486539475) - - schedule(workerContext, taskNode) \ No newline at end of file