diff --git a/src/apatheia/jobs.nim b/src/apatheia/jobs.nim index 35a47f3..df83117 100644 --- a/src/apatheia/jobs.nim +++ b/src/apatheia/jobs.nim @@ -15,27 +15,34 @@ export queues ## type + JobId* = uint ## job id, should match `future.id()` + JobQueue*[T] = ref object - queue*: SignalQueue[(uint, T)] - futures*: Table[uint, Future[T]] + queue*: SignalQueue[(JobId, T)] + futures*: Table[JobId, Future[T]] taskpool*: Taskpool running*: bool + JobResult*[T] = object + id*: JobId + queue*: SignalQueue[(JobId, T)] + proc processJobs*(jobs: JobQueue) {.async.} = while jobs.running: echo "jobs running..." - let res = get await jobs.queue.wait() + let res = await(jobs.queue.wait()).get() echo "jobs result: ", res.repr echo "jobs futes: ", jobs.futures.unsafeAddr.pointer.repr, " => ", jobs.futures.keys().toSeq() let (id, ret) = res let fut = jobs.futures[id] fut.complete(ret) -proc createFuture*[T](jobs: JobQueue[T], name: static string): (uint, Future[T]) = +proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Future[T]) = let fut = newFuture[T](name) - jobs.futures[fut.id()] = fut + let id = JobId fut.id() + jobs.futures[id] = fut echo "jobs added: ", jobs.futures.unsafeAddr.pointer.repr, " => ", jobs.futures.keys().toSeq() - return (fut.id(), fut, ) + return (JobResult[T](id: id, queue: jobs.queue), fut, ) proc newJobQueue*[T](maxItems: int = 0, taskpool: Taskpool = Taskpool.new()): JobQueue[T] {.raises: [ApatheiaSignalErr].} = result = JobQueue[T](queue: newSignalQueue[(uint, T)](maxItems), taskpool: taskpool, running: true) @@ -45,17 +52,15 @@ macro submitMacro*(tp: untyped, jobs: untyped, exp: untyped): untyped = ## modifies the call expression to include the job queue and ## the job id parameters + let jobRes = genSym(nskLet, "jobRes") let futName = genSym(nskLet, "fut") - let idName = genSym(nskLet, "id") let nm = newLit(repr(exp)) - let queueExpr = quote do: `jobs`.queue var fncall = nnkCall.newTree(exp[0]) - fncall.add(queueExpr) - fncall.add(idName) + fncall.add(jobRes) for p in exp[1..^1]: fncall.add(p) result = quote do: - let (`idName`, `futName`) = createFuture(`jobs`, `nm`) + let (`jobRes`, `futName`) = createFuture(`jobs`, `nm`) `jobs`.taskpool.spawn(`fncall`) `futName` diff --git a/tests/tjobs.nim b/tests/tjobs.nim index 16572f9..8888d8e 100644 --- a/tests/tjobs.nim +++ b/tests/tjobs.nim @@ -16,9 +16,9 @@ proc addNumsRaw(a, b: float): float = echo "adding: ", a, " + ", b return a + b -proc addNums(queue: SignalQueue[(uint, float)], id: uint, a, b: float) = +proc addNums(jobResult: JobResult[float], a, b: float) = let res = addNumsRaw(a, b) - discard queue.send((id, res,)) + discard jobResult.queue.send((jobResult.id, res,)) suite "async tests":