diff --git a/README.md b/README.md index eb1930c..db71190 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,8 @@ The goal of the apatheia library is to provide a painless, suffering free way of using multi-threading with async in Nim. +It's essentially just a wrapper around best patterns for integrating nim-taskpools with Chronos multithread signalling. + The main modules are: - queues - queues with support for async signals - jobs - macro and utilities for submitting jobs to a taskpool diff --git a/apatheia.nimble b/apatheia.nimble index da2af8c..b15ec81 100644 --- a/apatheia.nimble +++ b/apatheia.nimble @@ -9,7 +9,7 @@ srcDir = "src" # Dependencies requires "chronos >= 4.0.0" -requires "threading" +requires "threading#head" requires "taskpools >= 0.0.5" requires "chronicles" diff --git a/src/apatheia/jobs.nim b/src/apatheia/jobs.nim index 74a677e..b326b53 100644 --- a/src/apatheia/jobs.nim +++ b/src/apatheia/jobs.nim @@ -1,10 +1,12 @@ import std/tables import std/macros +import std/locks import ./queues import ./memretainers import taskpools +import threading/smartptrs import chronos import chronicles @@ -33,13 +35,19 @@ logScope: type JobQueue*[T] = ref object ## job queue object queue*: SignalQueue[(JobId, T)] - futures*: Table[JobId, Future[T]] + futures*: Table[JobId, (Future[T], SharedPtr[JobFlags])] taskpool*: Taskpool running*: bool + JobFlags* = object ## hold the result of a job after it finishes + lock*: Lock + isRunning*: bool + isCancelled*: bool + JobResult*[T] = object ## hold the result of a job after it finishes id*: JobId queue*: SignalQueue[(JobId, T)] + flags*: SharedPtr[JobFlags] OpenArrayHolder*[T] = object data*: ptr UncheckedArray[T] @@ -53,6 +61,11 @@ template toOpenArray*[T](arr: OpenArrayHolder[T]): auto = func jobId*[T](fut: Future[T]): JobId = JobId fut.id() +proc cancelled*[T](jobResult: JobResult[T]): bool = + # acquire jobResult.flags[].lock + # result = jobResult.flags[].isCancelled + discard + proc processJobs*[T](jobs: JobQueue[T]) {.async.} = ## Starts a "detached" async processor for a given job queue. ## @@ -65,7 +78,9 @@ proc processJobs*[T](jobs: JobQueue[T]) {.async.} = let (id, ret) = await(jobs.queue.wait()).get() trace "got job result", jobId = id releaseMemory(id) # always release any retained memory - if (var fut: Future[T]; jobs.futures.pop(id, fut)): + var futTuple: (Future[T], SharedPtr[JobFlags]) + if jobs.futures.pop(id, futTuple): + let fut = futTuple[0] if not fut.finished(): fut.complete(ret) else: @@ -76,9 +91,11 @@ proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Fu ## Creates a future that returns the result of the associated job. let fut = newFuture[T](name) let id = fut.jobId() - jobs.futures[id] = fut + let flags = newSharedPtr(JobFlags) + flags[].lock.initLock() + jobs.futures[id] = (fut, flags) trace "job added: ", numberJobs = jobs.futures.len() - return (JobResult[T](id: id, queue: jobs.queue), fut) + return (JobResult[T](id: id, queue: jobs.queue, flags: flags), fut) proc newJobQueue*[T]( maxItems: int = 0, taskpool: Taskpool = Taskpool.new() diff --git a/src/apatheia/queues.nim b/src/apatheia/queues.nim index f7e7643..e63830b 100644 --- a/src/apatheia/queues.nim +++ b/src/apatheia/queues.nim @@ -10,14 +10,9 @@ export options export threadsync export chronos -type ChanPtr[T] = ptr Channel[T] - -proc allocSharedChannel[T](): ChanPtr[T] = - cast[ChanPtr[T]](allocShared0(sizeof(Channel[T]))) - type SignalQueue*[T] = object signal: ThreadSignalPtr - chan*: ChanPtr[T] + chan*: ptr Channel[T] proc dispose*[T](val: SignalQueue[T]) = ## Call to properly dispose of a SignalQueue. @@ -32,7 +27,7 @@ proc newSignalQueue*[T]( if res.isErr(): raise newException(ApatheiaSignalErr, res.error()) result.signal = res.get() - result.chan = allocSharedChannel[T]() + result.chan = cast[ptr Channel[T]](allocShared0(sizeof(Channel[T]))) result.chan[].open(maxItems) proc send*[T](c: SignalQueue[T], msg: sink T): Result[void, string] {.raises: [].} = diff --git a/tests/tjobs.nim b/tests/tjobs.nim index de6e35c..49ab705 100644 --- a/tests/tjobs.nim +++ b/tests/tjobs.nim @@ -34,6 +34,17 @@ proc strCompute(jobResult: JobResult[int], vals: OpenArrayHolder[char]) = proc addStrings(jobResult: JobResult[float], vals: OpenArrayHolder[string]) = discard +proc cancelTest(jobResult: JobResult[float], base: float, vals: OpenArrayHolder[float]) = + os.sleep(300) + if jobResult.cancelled(): + discard + # discard jobResult.queue.send((jobResult.id, res,)) + + var res = base + for x in vals.toOpenArray(): + res += x + discard jobResult.queue.send((jobResult.id, res,)) + suite "async tests": var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads. @@ -75,3 +86,8 @@ suite "async tests": var jobs = newJobQueue[float](taskpool = tp) let job = jobs.submit(addStrings(@["a", "b", "c"])) ) + + asyncTest "testing cancel": + var jobs = newJobQueue[float](taskpool = tp) + let res = await jobs.submit(addNumValues(10.0, @[1.0.float, 2.0])) + check res == 13.0