From d75c2d8a1942039fb076d93575c9fdb3f4c1bbc2 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 13 Feb 2024 23:18:22 -0700 Subject: [PATCH] cleanup futures --- src/apatheia/jobs.nim | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/src/apatheia/jobs.nim b/src/apatheia/jobs.nim index 4170740..911c9a6 100644 --- a/src/apatheia/jobs.nim +++ b/src/apatheia/jobs.nim @@ -9,33 +9,43 @@ import chronos export queues -## TODO: -## setup queue to tie together future and result for a specific instance -## this setup will result in out-of-order results +## This module provides a simple way to submit jobs to taskpools +## and getting a result returned via an async future. +## ## type JobId* = uint ## job id, should match `future.id()` JobQueue*[T] = ref object + ## job queue object queue*: SignalQueue[(JobId, T)] futures*: Table[JobId, Future[T]] taskpool*: Taskpool running*: bool JobResult*[T] = object + ## hold a job result to be returned by jobs id*: JobId queue*: SignalQueue[(JobId, T)] -proc processJobs*(jobs: JobQueue) {.async.} = +proc processJobs*[T](jobs: JobQueue[T]) {.async.} = + ## Starts a "detached" async processor for a given job queue. + ## + ## This processor waits for events from the queue in the JobQueue + ## and complete the associated futures. + while jobs.running: echo "jobs running..." let res = await(jobs.queue.wait()).get() echo "jobs result: ", res.repr echo "jobs futes: ", jobs.futures.unsafeAddr.pointer.repr, " => ", jobs.futures.keys().toSeq() let (id, ret) = res - let fut = jobs.futures[id] - fut.complete(ret) + var fut: Future[T] + if jobs.futures.pop(id, fut): + fut.complete(ret) + else: + raise newException(IndexDefect, "missing future: " & $id) proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Future[T]) = let fut = newFuture[T](name)