From 8a0c6757ec151bacf47271486dce478a121a3958 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Fri, 16 Feb 2024 14:16:17 -0700 Subject: [PATCH] format --- src/apatheia/jobs.nim | 72 +++++++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 33 deletions(-) diff --git a/src/apatheia/jobs.nim b/src/apatheia/jobs.nim index d9f255d..7199fd0 100644 --- a/src/apatheia/jobs.nim +++ b/src/apatheia/jobs.nim @@ -21,16 +21,13 @@ logScope: ## type - - JobQueue*[T] = ref object - ## job queue object + JobQueue*[T] = ref object ## job queue object queue*: SignalQueue[(JobId, T)] futures*: Table[JobId, Future[T]] taskpool*: Taskpool running*: bool - JobResult*[T] = object - ## hold a job result to be returned by jobs + JobResult*[T] = object ## hold a job result to be returned by jobs id*: JobId queue*: SignalQueue[(JobId, T)] @@ -41,7 +38,8 @@ type template toOpenArray*[T](arr: OpenArrayHolder[T]): auto = system.toOpenArray(arr.data, 0, arr.size) -func jobId*[T](fut: Future[T]): JobId = JobId fut.id() +func jobId*[T](fut: Future[T]): JobId = + JobId fut.id() proc processJobs*[T](jobs: JobQueue[T]) {.async.} = ## Starts a "detached" async processor for a given job queue. @@ -50,7 +48,7 @@ proc processJobs*[T](jobs: JobQueue[T]) {.async.} = ## and complete the associated futures. const tn: string = $(JobQueue[T]) - info "Processing jobs in job queue for type ", type=tn + info "Processing jobs in job queue for type ", type = tn while jobs.running: let res = await(jobs.queue.wait()).get() trace "got job result", jobResult = $res @@ -61,7 +59,7 @@ proc processJobs*[T](jobs: JobQueue[T]) {.async.} = fut.complete(ret) else: raise newException(IndexDefect, "missing future: " & $id) - info "Finishing processing jobs for type ", type=tn + info "Finishing processing jobs for type ", type = tn proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Future[T]) = ## Creates a future that returns the result of the associated job. @@ -69,27 +67,31 @@ proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Fu let id = fut.jobId() jobs.futures[id] = fut trace "jobs added: ", numberJobs = jobs.futures.len() - return (JobResult[T](id: id, queue: jobs.queue), fut, ) + return (JobResult[T](id: id, queue: jobs.queue), fut) -proc newJobQueue*[T](maxItems: int = 0, taskpool: Taskpool = Taskpool.new()): JobQueue[T] {.raises: [ApatheiaSignalErr].} = +proc newJobQueue*[T]( + maxItems: int = 0, taskpool: Taskpool = Taskpool.new() +): JobQueue[T] {.raises: [ApatheiaSignalErr].} = ## Creates a new async-compatible threaded job queue. - result = JobQueue[T](queue: newSignalQueue[(uint, T)](maxItems), taskpool: taskpool, running: true) + result = JobQueue[T]( + queue: newSignalQueue[(uint, T)](maxItems), taskpool: taskpool, running: true + ) asyncSpawn(processJobs(result)) template checkJobArgs*[T](exp: seq[T], fut: untyped): OpenArrayHolder[T] = when T is byte | SomeInteger | SomeFloat: let rval = SeqHolder[T](data: exp) fut.jobId().retainMemory(rval) - let expPtr = OpenArrayHolder[T](data: cast[ptr UncheckedArray[T]](unsafeAddr(rval.data[0])), size: rval.data.len()) + let expPtr = OpenArrayHolder[T]( + data: cast[ptr UncheckedArray[T]](unsafeAddr(rval.data[0])), size: rval.data.len() + ) expPtr else: {.error: "unsupported sequence type for job argument: " & $typeof(seq[T]).} - template checkJobArgs*(exp: typed, fut: untyped): auto = exp - macro submitMacro(tp: untyped, jobs: untyped, exp: untyped): untyped = ## modifies the call expression to include the job queue and ## the job id parameters @@ -100,28 +102,31 @@ macro submitMacro(tp: untyped, jobs: untyped, exp: untyped): untyped = var argids = newSeq[NimNode]() var letargs = nnkLetSection.newTree() - for i, p in exp[1..^1]: + for i, p in exp[1 ..^ 1]: echo "CHECK ARGS: ", p.treeRepr let id = ident "arg" & $i argids.add(id) let pn = nnkCall.newTree(ident"checkJobArgs", p, `futName`) - letargs.add nnkIdentDefs.newTree( id, newEmptyNode(), pn) + letargs.add nnkIdentDefs.newTree(id, newEmptyNode(), pn) # fncall.add(nnkCall.newTree(ident"checkJobArgs", p, `futName`)) echo "\nSUBMIT: ARGS: LET:\n", letargs.repr echo "" - + var fncall = nnkCall.newTree(exp[0]) fncall.add(jobRes) for p in argids: fncall.add(p) - result = quote do: + result = quote: block: let (`jobRes`, `futName`) = createFuture(`jobs`, `nm`) `letargs` when typeof(`fncall`) isnot void: - {.error: "Apatheia jobs cannot return values. The given proc returns type: " & $(typeof(`fncall`)) & - " for call " & astToStr(`fncall`).} + {. + error: + "Apatheia jobs cannot return values. The given proc returns type: " & + $(typeof(`fncall`)) & " for call " & astToStr(`fncall`) + .} `jobs`.taskpool.spawn(`fncall`) `futName` @@ -138,34 +143,35 @@ when isMainModule: import chronos/unittest2/asynctests import std/macros - proc addNumValues(jobResult: JobResult[float], base: float, vals: OpenArrayHolder[float]) = + proc addNumValues( + jobResult: JobResult[float], base: float, vals: OpenArrayHolder[float] + ) = os.sleep(100) var res = base for x in vals.toOpenArray(): res += x - discard jobResult.queue.send((jobResult.id, res,)) + discard jobResult.queue.send((jobResult.id, res)) proc addStrings(jobResult: JobResult[float], vals: OpenArrayHolder[string]) = discard suite "async tests": - var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads. asyncTest "basic openarray": - # expandMacros: - var jobs = newJobQueue[float](taskpool = tp) + var # expandMacros: + jobs = newJobQueue[float](taskpool = tp) - let job = jobs.submit(addNumValues(10.0, @[1.0.float, 2.0])) - let res = await job + let job = jobs.submit(addNumValues(10.0, @[1.0.float, 2.0])) + let res = await job - check res == 13.0 + check res == 13.0 asyncTest "don't compile": - # expandMacros: - var jobs = newJobQueue[float](taskpool = tp) + var # expandMacros: + jobs = newJobQueue[float](taskpool = tp) - let job = jobs.submit(addStrings(@["a", "b", "c"])) - let res = await job + let job = jobs.submit(addStrings(@["a", "b", "c"])) + let res = await job - check res == 13.0 + check res == 13.0