From dca235f46979fa231ed21b2251c96261bcf9b18f Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 20 Feb 2024 19:26:35 -0700 Subject: [PATCH] trying to setup a jobs flag --- apatheia.nimble | 2 +- src/apatheia/jobs.nim | 6 ++++++ tests/tjobs.nim | 16 ++++++++++++++++ 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/apatheia.nimble b/apatheia.nimble index da2af8c..b15ec81 100644 --- a/apatheia.nimble +++ b/apatheia.nimble @@ -9,7 +9,7 @@ srcDir = "src" # Dependencies requires "chronos >= 4.0.0" -requires "threading" +requires "threading#head" requires "taskpools >= 0.0.5" requires "chronicles" diff --git a/src/apatheia/jobs.nim b/src/apatheia/jobs.nim index a92d6f4..b326b53 100644 --- a/src/apatheia/jobs.nim +++ b/src/apatheia/jobs.nim @@ -61,6 +61,11 @@ template toOpenArray*[T](arr: OpenArrayHolder[T]): auto = func jobId*[T](fut: Future[T]): JobId = JobId fut.id() +proc cancelled*[T](jobResult: JobResult[T]): bool = + # acquire jobResult.flags[].lock + # result = jobResult.flags[].isCancelled + discard + proc processJobs*[T](jobs: JobQueue[T]) {.async.} = ## Starts a "detached" async processor for a given job queue. ## @@ -87,6 +92,7 @@ proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Fu let fut = newFuture[T](name) let id = fut.jobId() let flags = newSharedPtr(JobFlags) + flags[].lock.initLock() jobs.futures[id] = (fut, flags) trace "job added: ", numberJobs = jobs.futures.len() return (JobResult[T](id: id, queue: jobs.queue, flags: flags), fut) diff --git a/tests/tjobs.nim b/tests/tjobs.nim index de6e35c..49ab705 100644 --- a/tests/tjobs.nim +++ b/tests/tjobs.nim @@ -34,6 +34,17 @@ proc strCompute(jobResult: JobResult[int], vals: OpenArrayHolder[char]) = proc addStrings(jobResult: JobResult[float], vals: OpenArrayHolder[string]) = discard +proc cancelTest(jobResult: JobResult[float], base: float, vals: OpenArrayHolder[float]) = + os.sleep(300) + if jobResult.cancelled(): + discard + # discard jobResult.queue.send((jobResult.id, res,)) + + var res = base + for x in vals.toOpenArray(): + res += x + discard jobResult.queue.send((jobResult.id, res,)) + suite "async tests": var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads. @@ -75,3 +86,8 @@ suite "async tests": var jobs = newJobQueue[float](taskpool = tp) let job = jobs.submit(addStrings(@["a", "b", "c"])) ) + + asyncTest "testing cancel": + var jobs = newJobQueue[float](taskpool = tp) + let res = await jobs.submit(addNumValues(10.0, @[1.0.float, 2.0])) + check res == 13.0