This commit is contained in:
Jaremy Creechley 2024-02-16 14:16:17 -07:00
parent 898c7bce8c
commit 8a0c6757ec

View File

@ -21,16 +21,13 @@ logScope:
## ##
type type
JobQueue*[T] = ref object ## job queue object
JobQueue*[T] = ref object
## job queue object
queue*: SignalQueue[(JobId, T)] queue*: SignalQueue[(JobId, T)]
futures*: Table[JobId, Future[T]] futures*: Table[JobId, Future[T]]
taskpool*: Taskpool taskpool*: Taskpool
running*: bool running*: bool
JobResult*[T] = object JobResult*[T] = object ## hold a job result to be returned by jobs
## hold a job result to be returned by jobs
id*: JobId id*: JobId
queue*: SignalQueue[(JobId, T)] queue*: SignalQueue[(JobId, T)]
@ -41,7 +38,8 @@ type
template toOpenArray*[T](arr: OpenArrayHolder[T]): auto = template toOpenArray*[T](arr: OpenArrayHolder[T]): auto =
system.toOpenArray(arr.data, 0, arr.size) system.toOpenArray(arr.data, 0, arr.size)
func jobId*[T](fut: Future[T]): JobId = JobId fut.id() func jobId*[T](fut: Future[T]): JobId =
JobId fut.id()
proc processJobs*[T](jobs: JobQueue[T]) {.async.} = proc processJobs*[T](jobs: JobQueue[T]) {.async.} =
## Starts a "detached" async processor for a given job queue. ## Starts a "detached" async processor for a given job queue.
@ -50,7 +48,7 @@ proc processJobs*[T](jobs: JobQueue[T]) {.async.} =
## and complete the associated futures. ## and complete the associated futures.
const tn: string = $(JobQueue[T]) const tn: string = $(JobQueue[T])
info "Processing jobs in job queue for type ", type=tn info "Processing jobs in job queue for type ", type = tn
while jobs.running: while jobs.running:
let res = await(jobs.queue.wait()).get() let res = await(jobs.queue.wait()).get()
trace "got job result", jobResult = $res trace "got job result", jobResult = $res
@ -61,7 +59,7 @@ proc processJobs*[T](jobs: JobQueue[T]) {.async.} =
fut.complete(ret) fut.complete(ret)
else: else:
raise newException(IndexDefect, "missing future: " & $id) raise newException(IndexDefect, "missing future: " & $id)
info "Finishing processing jobs for type ", type=tn info "Finishing processing jobs for type ", type = tn
proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Future[T]) = proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Future[T]) =
## Creates a future that returns the result of the associated job. ## Creates a future that returns the result of the associated job.
@ -69,27 +67,31 @@ proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Fu
let id = fut.jobId() let id = fut.jobId()
jobs.futures[id] = fut jobs.futures[id] = fut
trace "jobs added: ", numberJobs = jobs.futures.len() trace "jobs added: ", numberJobs = jobs.futures.len()
return (JobResult[T](id: id, queue: jobs.queue), fut, ) return (JobResult[T](id: id, queue: jobs.queue), fut)
proc newJobQueue*[T](maxItems: int = 0, taskpool: Taskpool = Taskpool.new()): JobQueue[T] {.raises: [ApatheiaSignalErr].} = proc newJobQueue*[T](
maxItems: int = 0, taskpool: Taskpool = Taskpool.new()
): JobQueue[T] {.raises: [ApatheiaSignalErr].} =
## Creates a new async-compatible threaded job queue. ## Creates a new async-compatible threaded job queue.
result = JobQueue[T](queue: newSignalQueue[(uint, T)](maxItems), taskpool: taskpool, running: true) result = JobQueue[T](
queue: newSignalQueue[(uint, T)](maxItems), taskpool: taskpool, running: true
)
asyncSpawn(processJobs(result)) asyncSpawn(processJobs(result))
template checkJobArgs*[T](exp: seq[T], fut: untyped): OpenArrayHolder[T] = template checkJobArgs*[T](exp: seq[T], fut: untyped): OpenArrayHolder[T] =
when T is byte | SomeInteger | SomeFloat: when T is byte | SomeInteger | SomeFloat:
let rval = SeqHolder[T](data: exp) let rval = SeqHolder[T](data: exp)
fut.jobId().retainMemory(rval) fut.jobId().retainMemory(rval)
let expPtr = OpenArrayHolder[T](data: cast[ptr UncheckedArray[T]](unsafeAddr(rval.data[0])), size: rval.data.len()) let expPtr = OpenArrayHolder[T](
data: cast[ptr UncheckedArray[T]](unsafeAddr(rval.data[0])), size: rval.data.len()
)
expPtr expPtr
else: else:
{.error: "unsupported sequence type for job argument: " & $typeof(seq[T]).} {.error: "unsupported sequence type for job argument: " & $typeof(seq[T]).}
template checkJobArgs*(exp: typed, fut: untyped): auto = template checkJobArgs*(exp: typed, fut: untyped): auto =
exp exp
macro submitMacro(tp: untyped, jobs: untyped, exp: untyped): untyped = macro submitMacro(tp: untyped, jobs: untyped, exp: untyped): untyped =
## modifies the call expression to include the job queue and ## modifies the call expression to include the job queue and
## the job id parameters ## the job id parameters
@ -100,28 +102,31 @@ macro submitMacro(tp: untyped, jobs: untyped, exp: untyped): untyped =
var argids = newSeq[NimNode]() var argids = newSeq[NimNode]()
var letargs = nnkLetSection.newTree() var letargs = nnkLetSection.newTree()
for i, p in exp[1..^1]: for i, p in exp[1 ..^ 1]:
echo "CHECK ARGS: ", p.treeRepr echo "CHECK ARGS: ", p.treeRepr
let id = ident "arg" & $i let id = ident "arg" & $i
argids.add(id) argids.add(id)
let pn = nnkCall.newTree(ident"checkJobArgs", p, `futName`) let pn = nnkCall.newTree(ident"checkJobArgs", p, `futName`)
letargs.add nnkIdentDefs.newTree( id, newEmptyNode(), pn) letargs.add nnkIdentDefs.newTree(id, newEmptyNode(), pn)
# fncall.add(nnkCall.newTree(ident"checkJobArgs", p, `futName`)) # fncall.add(nnkCall.newTree(ident"checkJobArgs", p, `futName`))
echo "\nSUBMIT: ARGS: LET:\n", letargs.repr echo "\nSUBMIT: ARGS: LET:\n", letargs.repr
echo "" echo ""
var fncall = nnkCall.newTree(exp[0]) var fncall = nnkCall.newTree(exp[0])
fncall.add(jobRes) fncall.add(jobRes)
for p in argids: for p in argids:
fncall.add(p) fncall.add(p)
result = quote do: result = quote:
block: block:
let (`jobRes`, `futName`) = createFuture(`jobs`, `nm`) let (`jobRes`, `futName`) = createFuture(`jobs`, `nm`)
`letargs` `letargs`
when typeof(`fncall`) isnot void: when typeof(`fncall`) isnot void:
{.error: "Apatheia jobs cannot return values. The given proc returns type: " & $(typeof(`fncall`)) & {.
" for call " & astToStr(`fncall`).} error:
"Apatheia jobs cannot return values. The given proc returns type: " &
$(typeof(`fncall`)) & " for call " & astToStr(`fncall`)
.}
`jobs`.taskpool.spawn(`fncall`) `jobs`.taskpool.spawn(`fncall`)
`futName` `futName`
@ -138,34 +143,35 @@ when isMainModule:
import chronos/unittest2/asynctests import chronos/unittest2/asynctests
import std/macros import std/macros
proc addNumValues(jobResult: JobResult[float], base: float, vals: OpenArrayHolder[float]) = proc addNumValues(
jobResult: JobResult[float], base: float, vals: OpenArrayHolder[float]
) =
os.sleep(100) os.sleep(100)
var res = base var res = base
for x in vals.toOpenArray(): for x in vals.toOpenArray():
res += x res += x
discard jobResult.queue.send((jobResult.id, res,)) discard jobResult.queue.send((jobResult.id, res))
proc addStrings(jobResult: JobResult[float], vals: OpenArrayHolder[string]) = proc addStrings(jobResult: JobResult[float], vals: OpenArrayHolder[string]) =
discard discard
suite "async tests": suite "async tests":
var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads. var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads.
asyncTest "basic openarray": asyncTest "basic openarray":
# expandMacros: var # expandMacros:
var jobs = newJobQueue[float](taskpool = tp) jobs = newJobQueue[float](taskpool = tp)
let job = jobs.submit(addNumValues(10.0, @[1.0.float, 2.0])) let job = jobs.submit(addNumValues(10.0, @[1.0.float, 2.0]))
let res = await job let res = await job
check res == 13.0 check res == 13.0
asyncTest "don't compile": asyncTest "don't compile":
# expandMacros: var # expandMacros:
var jobs = newJobQueue[float](taskpool = tp) jobs = newJobQueue[float](taskpool = tp)
let job = jobs.submit(addStrings(@["a", "b", "c"])) let job = jobs.submit(addStrings(@["a", "b", "c"]))
let res = await job let res = await job
check res == 13.0 check res == 13.0