Merge dca235f46979fa231ed21b2251c96261bcf9b18f into 61ff2594d3e204b2ef597b17207094ada2fd7c5c

This commit is contained in:
Jaremy Creechley 2024-05-24 16:18:19 +02:00 committed by GitHub
commit 19c92f5f93
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 42 additions and 12 deletions

View File

@ -5,6 +5,8 @@
The goal of the apatheia library is to provide a painless, suffering free way of using multi-threading with async in Nim.
It's essentially just a wrapper around best patterns for integrating nim-taskpools with Chronos multithread signalling.
The main modules are:
- queues - queues with support for async signals
- jobs - macro and utilities for submitting jobs to a taskpool

View File

@ -9,7 +9,7 @@ srcDir = "src"
# Dependencies
requires "chronos >= 4.0.0"
requires "threading"
requires "threading#head"
requires "taskpools >= 0.0.5"
requires "chronicles"

View File

@ -1,10 +1,12 @@
import std/tables
import std/macros
import std/locks
import ./queues
import ./memretainers
import taskpools
import threading/smartptrs
import chronos
import chronicles
@ -33,13 +35,19 @@ logScope:
type
JobQueue*[T] = ref object ## job queue object
queue*: SignalQueue[(JobId, T)]
futures*: Table[JobId, Future[T]]
futures*: Table[JobId, (Future[T], SharedPtr[JobFlags])]
taskpool*: Taskpool
running*: bool
JobFlags* = object ## hold the result of a job after it finishes
lock*: Lock
isRunning*: bool
isCancelled*: bool
JobResult*[T] = object ## hold the result of a job after it finishes
id*: JobId
queue*: SignalQueue[(JobId, T)]
flags*: SharedPtr[JobFlags]
OpenArrayHolder*[T] = object
data*: ptr UncheckedArray[T]
@ -53,6 +61,11 @@ template toOpenArray*[T](arr: OpenArrayHolder[T]): auto =
func jobId*[T](fut: Future[T]): JobId =
JobId fut.id()
proc cancelled*[T](jobResult: JobResult[T]): bool =
# acquire jobResult.flags[].lock
# result = jobResult.flags[].isCancelled
discard
proc processJobs*[T](jobs: JobQueue[T]) {.async.} =
## Starts a "detached" async processor for a given job queue.
##
@ -65,7 +78,9 @@ proc processJobs*[T](jobs: JobQueue[T]) {.async.} =
let (id, ret) = await(jobs.queue.wait()).get()
trace "got job result", jobId = id
releaseMemory(id) # always release any retained memory
if (var fut: Future[T]; jobs.futures.pop(id, fut)):
var futTuple: (Future[T], SharedPtr[JobFlags])
if jobs.futures.pop(id, futTuple):
let fut = futTuple[0]
if not fut.finished():
fut.complete(ret)
else:
@ -76,9 +91,11 @@ proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Fu
## Creates a future that returns the result of the associated job.
let fut = newFuture[T](name)
let id = fut.jobId()
jobs.futures[id] = fut
let flags = newSharedPtr(JobFlags)
flags[].lock.initLock()
jobs.futures[id] = (fut, flags)
trace "job added: ", numberJobs = jobs.futures.len()
return (JobResult[T](id: id, queue: jobs.queue), fut)
return (JobResult[T](id: id, queue: jobs.queue, flags: flags), fut)
proc newJobQueue*[T](
maxItems: int = 0, taskpool: Taskpool = Taskpool.new()

View File

@ -10,14 +10,9 @@ export options
export threadsync
export chronos
type ChanPtr[T] = ptr Channel[T]
proc allocSharedChannel[T](): ChanPtr[T] =
cast[ChanPtr[T]](allocShared0(sizeof(Channel[T])))
type SignalQueue*[T] = object
signal: ThreadSignalPtr
chan*: ChanPtr[T]
chan*: ptr Channel[T]
proc dispose*[T](val: SignalQueue[T]) =
## Call to properly dispose of a SignalQueue.
@ -32,7 +27,7 @@ proc newSignalQueue*[T](
if res.isErr():
raise newException(ApatheiaSignalErr, res.error())
result.signal = res.get()
result.chan = allocSharedChannel[T]()
result.chan = cast[ptr Channel[T]](allocShared0(sizeof(Channel[T])))
result.chan[].open(maxItems)
proc send*[T](c: SignalQueue[T], msg: sink T): Result[void, string] {.raises: [].} =

View File

@ -34,6 +34,17 @@ proc strCompute(jobResult: JobResult[int], vals: OpenArrayHolder[char]) =
proc addStrings(jobResult: JobResult[float], vals: OpenArrayHolder[string]) =
discard
proc cancelTest(jobResult: JobResult[float], base: float, vals: OpenArrayHolder[float]) =
os.sleep(300)
if jobResult.cancelled():
discard
# discard jobResult.queue.send((jobResult.id, res,))
var res = base
for x in vals.toOpenArray():
res += x
discard jobResult.queue.send((jobResult.id, res,))
suite "async tests":
var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads.
@ -75,3 +86,8 @@ suite "async tests":
var jobs = newJobQueue[float](taskpool = tp)
let job = jobs.submit(addStrings(@["a", "b", "c"]))
)
asyncTest "testing cancel":
var jobs = newJobQueue[float](taskpool = tp)
let res = await jobs.submit(addNumValues(10.0, @[1.0.float, 2.0]))
check res == 13.0