diff --git a/src/apatheia/jobs.nim b/src/apatheia/jobs.nim index a3bc3e0..f5e4ae2 100644 --- a/src/apatheia/jobs.nim +++ b/src/apatheia/jobs.nim @@ -1,6 +1,10 @@ +import std/tables +import std/macros + import ./queues import taskpools +import chronos export queues @@ -11,12 +15,47 @@ export queues type JobQueue*[T] = object - queue*: SignalQueue[T] + queue*: SignalQueue[(uint, T)] + futures*: Table[uint, Future[T]] taskpool*: Taskpool + running*: bool + +proc processJobs*(jobs: JobQueue) {.async.} = + while jobs.running: + echo "jobs running..." + let res = jobs.queue.wait() + echo "jobs result: ", res.repr proc newJobQueue*[T](maxItems: int = 0, taskpool: Taskpool = Taskpool.new()): JobQueue[T] {.raises: [ApatheiaSignalErr].} = - JobQueue[T](queue: newSignalQueue[T](maxItems), taskpool: taskpool) + result = JobQueue[T](queue: newSignalQueue[(uint, T)](maxItems), taskpool: taskpool) + asyncSpawn(processJobs(result)) -template submit*[T](jobs: JobQueue[T], exp: typed): Future[T] = - jobs.taskpool.spawn(exp) - await wait(jobs.queue) +macro submitMacro*(tp: untyped, jobs: untyped, exp: untyped): untyped = + + echo "submit:::" + echo "submit:T: ", tp.treeRepr + echo "submit: ", exp.treerepr + + let futName = genSym(nskLet, "fut") + let idName = genSym(nskLet, "id") + let queueName = genSym(nskLet, "queue") + var fncall = nnkCall.newTree(exp[0]) + fncall.add(queueName) + fncall.add(idName) + for p in exp[1..^1]: + fncall.add(p) + echo "submit: ", fncall.treeRepr + + result = quote do: + let `queueName` = jobs.queue + let `futName` = newFuture[`tp`](astToStr(`exp`)) + let `idName` = `futName`.id() + `jobs`.futures[`idName`] = `futName` + `jobs`.taskpool.spawn(`fncall`) + `futName` + + echo "submit: res:\n", result.repr + echo "" + +template submit*[T](jobs: JobQueue[T], exp: untyped): Future[T] = + submitMacro(T, jobs, exp) \ No newline at end of file diff --git a/tests/tjobs.nim b/tests/tjobs.nim index 6bcab8a..c559e44 100644 --- a/tests/tjobs.nim +++ b/tests/tjobs.nim @@ -16,9 +16,9 @@ proc addNumsRaw(a, b: float): float = echo "adding: ", a, " + ", b return a + b -proc addNums(queue: SignalQueue[float], a, b: float) = +proc addNums(queue: SignalQueue[(uint, float)], id: uint, a, b: float) = let res = addNumsRaw(a, b) - discard queue.send(res) + discard queue.send((id, res,)) suite "async tests": @@ -29,7 +29,7 @@ suite "async tests": asyncTest "test": echo "\nstart" - let res = await jobs.submit(addNums(jobs.queue, 1.0, 2.0)) + let res = await jobs.submit(addNums(1.0, 2.0,)) # await sleepAsync(100.milliseconds) echo "result: ", res.repr