From 2de80356b2a3d7269793698ff20dd62654dc4c01 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 20 Feb 2024 16:41:12 -0700 Subject: [PATCH] trying to setup a jobs flag --- src/apatheia/jobs.nim | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/apatheia/jobs.nim b/src/apatheia/jobs.nim index 74a677e..a92d6f4 100644 --- a/src/apatheia/jobs.nim +++ b/src/apatheia/jobs.nim @@ -1,10 +1,12 @@ import std/tables import std/macros +import std/locks import ./queues import ./memretainers import taskpools +import threading/smartptrs import chronos import chronicles @@ -33,13 +35,19 @@ logScope: type JobQueue*[T] = ref object ## job queue object queue*: SignalQueue[(JobId, T)] - futures*: Table[JobId, Future[T]] + futures*: Table[JobId, (Future[T], SharedPtr[JobFlags])] taskpool*: Taskpool running*: bool + JobFlags* = object ## hold the result of a job after it finishes + lock*: Lock + isRunning*: bool + isCancelled*: bool + JobResult*[T] = object ## hold the result of a job after it finishes id*: JobId queue*: SignalQueue[(JobId, T)] + flags*: SharedPtr[JobFlags] OpenArrayHolder*[T] = object data*: ptr UncheckedArray[T] @@ -65,7 +73,9 @@ proc processJobs*[T](jobs: JobQueue[T]) {.async.} = let (id, ret) = await(jobs.queue.wait()).get() trace "got job result", jobId = id releaseMemory(id) # always release any retained memory - if (var fut: Future[T]; jobs.futures.pop(id, fut)): + var futTuple: (Future[T], SharedPtr[JobFlags]) + if jobs.futures.pop(id, futTuple): + let fut = futTuple[0] if not fut.finished(): fut.complete(ret) else: @@ -76,9 +86,10 @@ proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Fu ## Creates a future that returns the result of the associated job. let fut = newFuture[T](name) let id = fut.jobId() - jobs.futures[id] = fut + let flags = newSharedPtr(JobFlags) + jobs.futures[id] = (fut, flags) trace "job added: ", numberJobs = jobs.futures.len() - return (JobResult[T](id: id, queue: jobs.queue), fut) + return (JobResult[T](id: id, queue: jobs.queue, flags: flags), fut) proc newJobQueue*[T]( maxItems: int = 0, taskpool: Taskpool = Taskpool.new()