diff --git a/src/apatheia/jobs.nim b/src/apatheia/jobs.nim index 5905167..dbbbe31 100644 --- a/src/apatheia/jobs.nim +++ b/src/apatheia/jobs.nim @@ -43,7 +43,7 @@ template toOpenArray*[T](arr: OpenArrayHolder[T]): auto = func jobId*[T](fut: Future[T]): JobId = JobId fut.id() -proc processJobs*[T](jobs: JobQueue[T]) {.async.} = +proc processJobs*[T](jobs: JobQueue[T]) {.async, raises: [].} = ## Starts a "detached" async processor for a given job queue. ## ## This processor waits for events from the queue in the JobQueue @@ -52,13 +52,13 @@ proc processJobs*[T](jobs: JobQueue[T]) {.async.} = const tn: string = $(JobQueue[T]) info "Processing jobs in job queue for type ", type = tn while jobs.running: - let res = await(jobs.queue.wait()).get() - trace "got job result", jobResult = $res - let (id, ret) = res - releaseMemory(id) # release any retained memory + let (id, ret) = await(jobs.queue.wait()).get() + trace "got job result", jobId = id + releaseMemory(id) # always release any retained memory var fut: Future[T] if jobs.futures.pop(id, fut): - fut.complete(ret) + if not fut.finished(): + fut.complete(ret) else: raise newException(IndexDefect, "missing future: " & $id) info "Finishing processing jobs for type ", type = tn