From 61ff2594d3e204b2ef597b17207094ada2fd7c5c Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Fri, 16 Feb 2024 17:13:59 -0700 Subject: [PATCH] Feature: handle seq[byte] and strings using openArray's and memory retention (#7) Limited support for passing seq's of bytes and strings to tasklets using openArray --- apatheia.nimble | 1 + build.nims | 8 ++ config.nims | 2 + src/apatheia.nim | 1 - src/apatheia/jobs.nim | 158 +++++++++++++++++++++++++++------- src/apatheia/macroutils.nim | 45 +++++----- src/apatheia/memretainers.nim | 30 +++++++ src/apatheia/queues.nim | 14 +-- src/apatheia/tasks.nim | 89 ++++++++++++------- src/apatheia/types.nim | 3 +- tests/tjobs.nim | 47 ++++++++++ tests/ttasks.nim | 22 +++++ 12 files changed, 329 insertions(+), 91 deletions(-) create mode 100644 build.nims create mode 100644 src/apatheia/memretainers.nim diff --git a/apatheia.nimble b/apatheia.nimble index 7bda637..da2af8c 100644 --- a/apatheia.nimble +++ b/apatheia.nimble @@ -13,3 +13,4 @@ requires "threading" requires "taskpools >= 0.0.5" requires "chronicles" +include "build.nims" diff --git a/build.nims b/build.nims new file mode 100644 index 0000000..8fe8fb2 --- /dev/null +++ b/build.nims @@ -0,0 +1,8 @@ + +import std/[os, strutils] + +task test, "unit tests": + for file in listFiles("tests"): + let name = file.splitPath().tail + if name.startsWith("t") and name.endsWith(".nim"): + exec "nim c -r " & file diff --git a/config.nims b/config.nims index bae2200..60f0c73 100644 --- a/config.nims +++ b/config.nims @@ -1,2 +1,4 @@ --threads:on + +include "build.nims" diff --git a/src/apatheia.nim b/src/apatheia.nim index 569c129..4cd8c96 100644 --- a/src/apatheia.nim +++ b/src/apatheia.nim @@ -1,3 +1,2 @@ - import apatheia/tasks export tasks diff --git a/src/apatheia/jobs.nim b/src/apatheia/jobs.nim index d4a1898..74a677e 100644 --- a/src/apatheia/jobs.nim +++ b/src/apatheia/jobs.nim @@ -2,6 +2,7 @@ import std/tables import std/macros import ./queues +import ./memretainers import taskpools import chronos @@ -16,75 +17,172 @@ logScope: ## This module provides a simple way to submit jobs to taskpools ## and getting a result returned via an async future. -## +## +## Any compatible arguments of `seq[T]` or `string` args passed +## via the `submit` macro will be converted into the special `OpenArrayHolder[T]` type. +## The `submit` macro converts these arguments in this object and retains the +## memory associated with the original `seq[T]` or `string` object. +## This greatly simplifies the passing of these these types in `refc`. +## +## Note, for `arc` or `orc` GC's this setup will be replaced with a move operation in the future. +## These GC's also allow greater support for moving GC types across thread boundaries. +## +## Currently this module limits support for GC types to ensure `refc` safety. ## type - JobId* = uint ## job id, should match `future.id()` - - JobQueue*[T] = ref object - ## job queue object + JobQueue*[T] = ref object ## job queue object queue*: SignalQueue[(JobId, T)] futures*: Table[JobId, Future[T]] taskpool*: Taskpool running*: bool - JobResult*[T] = object - ## hold a job result to be returned by jobs + JobResult*[T] = object ## hold the result of a job after it finishes id*: JobId queue*: SignalQueue[(JobId, T)] + OpenArrayHolder*[T] = object + data*: ptr UncheckedArray[T] + size*: int + + SupportedSeqTypes* = byte | SomeInteger | SomeFloat + +template toOpenArray*[T](arr: OpenArrayHolder[T]): auto = + system.toOpenArray(arr.data, 0, arr.size) + +func jobId*[T](fut: Future[T]): JobId = + JobId fut.id() + proc processJobs*[T](jobs: JobQueue[T]) {.async.} = ## Starts a "detached" async processor for a given job queue. ## ## This processor waits for events from the queue in the JobQueue - ## and complete the associated futures. + ## and completes the associated future. const tn: string = $(JobQueue[T]) - info "Processing jobs in job queue for type ", type=tn + info "Processing jobs in job queue for type ", type = tn while jobs.running: - let res = await(jobs.queue.wait()).get() - trace "got job result", jobResult = $res - let (id, ret) = res - var fut: Future[T] - if jobs.futures.pop(id, fut): - fut.complete(ret) + let (id, ret) = await(jobs.queue.wait()).get() + trace "got job result", jobId = id + releaseMemory(id) # always release any retained memory + if (var fut: Future[T]; jobs.futures.pop(id, fut)): + if not fut.finished(): + fut.complete(ret) else: raise newException(IndexDefect, "missing future: " & $id) - info "Finishing processing jobs for type ", type=tn + info "Finishing processing jobs for type ", type = tn proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Future[T]) = ## Creates a future that returns the result of the associated job. let fut = newFuture[T](name) - let id = JobId fut.id() + let id = fut.jobId() jobs.futures[id] = fut - trace "jobs added: ", numberJobs = jobs.futures.len() - return (JobResult[T](id: id, queue: jobs.queue), fut, ) + trace "job added: ", numberJobs = jobs.futures.len() + return (JobResult[T](id: id, queue: jobs.queue), fut) -proc newJobQueue*[T](maxItems: int = 0, taskpool: Taskpool = Taskpool.new()): JobQueue[T] {.raises: [ApatheiaSignalErr].} = +proc newJobQueue*[T]( + maxItems: int = 0, taskpool: Taskpool = Taskpool.new() +): JobQueue[T] {.raises: [ApatheiaSignalErr].} = ## Creates a new async-compatible threaded job queue. - result = JobQueue[T](queue: newSignalQueue[(uint, T)](maxItems), taskpool: taskpool, running: true) + result = JobQueue[T]( + queue: newSignalQueue[(uint, T)](maxItems), taskpool: taskpool, running: true + ) asyncSpawn(processJobs(result)) +template checkJobArgs*[T](exp: seq[T], fut: untyped): OpenArrayHolder[T] = + when T is SupportedSeqTypes: + let rval = SeqRetainer[T](data: exp) + retainMemory(fut.jobId(), rval) + let expPtr = OpenArrayHolder[T]( + data: cast[ptr UncheckedArray[T]](unsafeAddr(rval.data[0])), size: rval.data.len() + ) + expPtr + else: + {.error: "unsupported sequence type for job argument: " & $typeof(seq[T]).} + +template checkJobArgs*(exp: string, fut: untyped): OpenArrayHolder[char] = + let rval = StrRetainer(data: exp) + retainMemory(fut.jobId(), rval) + let expPtr = OpenArrayHolder[char]( + data: cast[ptr UncheckedArray[char]](unsafeAddr(rval.data[0])), size: rval.data.len() + ) + expPtr + +template checkJobArgs*(exp: typed, fut: untyped): auto = + exp + macro submitMacro(tp: untyped, jobs: untyped, exp: untyped): untyped = ## modifies the call expression to include the job queue and ## the job id parameters - let jobRes = genSym(nskLet, "jobRes") - let futName = genSym(nskLet, "fut") + let jobRes = ident("jobRes") + let futName = ident("fut") let nm = newLit(repr(exp)) + + var argids = newSeq[NimNode]() + var letargs = nnkLetSection.newTree() + for i, p in exp[1 ..^ 1]: + let id = ident "arg" & $i + argids.add(id) + let pn = nnkCall.newTree(ident"checkJobArgs", p, `futName`) + letargs.add nnkIdentDefs.newTree(id, newEmptyNode(), pn) + var fncall = nnkCall.newTree(exp[0]) fncall.add(jobRes) - for p in exp[1..^1]: fncall.add(p) + for p in argids: + fncall.add(p) - result = quote do: - let (`jobRes`, `futName`) = createFuture(`jobs`, `nm`) - `jobs`.taskpool.spawn(`fncall`) - `futName` + result = quote: + block: + let (`jobRes`, `futName`) = createFuture(`jobs`, `nm`) + `letargs` + when typeof(`fncall`) isnot void: + {. + error: + "Apatheia jobs cannot return values. The given proc returns type: " & + $(typeof(`fncall`)) & " for call " & astToStr(`fncall`) + .} + `jobs`.taskpool.spawn(`fncall`) + `futName` - # echo "submit: res:\n", result.repr - # echo "" + when isMainModule: + echo "\nSUBMIT MACRO::\n", result.repr + echo "" template submit*[T](jobs: JobQueue[T], exp: untyped): Future[T] = submitMacro(T, jobs, exp) +when isMainModule: + import os + import chronos/threadsync + import chronos/unittest2/asynctests + + proc addNumValues( + jobResult: JobResult[float], base: float, vals: OpenArrayHolder[float] + ) = + os.sleep(100) + var res = base + for x in vals.toOpenArray(): + res += x + discard jobResult.queue.send((jobResult.id, res)) + + proc addStrings(jobResult: JobResult[float], vals: OpenArrayHolder[string]) = + discard + + suite "async tests": + var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads. + + asyncTest "basic openarray": + var jobs = newJobQueue[float](taskpool = tp) + + let job = jobs.submit(addNumValues(10.0, @[1.0.float, 2.0])) + let res = await job + + check res == 13.0 + + asyncTest "don't compile": + check not compiles( + block: + var jobs = newJobQueue[float](taskpool = tp) + let job = jobs.submit(addStrings(@["a", "b", "c"])) + ) diff --git a/src/apatheia/macroutils.nim b/src/apatheia/macroutils.nim index 8d615fe..077ca51 100644 --- a/src/apatheia/macroutils.nim +++ b/src/apatheia/macroutils.nim @@ -3,22 +3,19 @@ import std/[tables, strutils, typetraits, macros] proc makeProcName*(s: string): string = result = "" for c in s: - if c.isAlphaNumeric: result.add c + if c.isAlphaNumeric: + result.add c proc hasReturnType*(params: NimNode): bool = - if params != nil and params.len > 0 and params[0] != nil and - params[0].kind != nnkEmpty: + if params != nil and params.len > 0 and params[0] != nil and params[0].kind != nnkEmpty: result = true proc getReturnType*(params: NimNode): NimNode = - if params != nil and params.len > 0 and params[0] != nil and - params[0].kind != nnkEmpty: + if params != nil and params.len > 0 and params[0] != nil and params[0].kind != nnkEmpty: result = params[0] proc firstArgument*(params: NimNode): (NimNode, NimNode) = - if params != nil and - params.len > 0 and - params[1] != nil and + if params != nil and params.len > 0 and params[1] != nil and params[1].kind == nnkIdentDefs: result = (ident params[1][0].strVal, params[1][1]) else: @@ -29,15 +26,13 @@ iterator paramsIter*(params: NimNode): tuple[name, ntype: NimNode] = for i in 1 ..< params.len: let arg = params[i] let argType = arg[^2] - for j in 0 ..< arg.len-2: + for j in 0 ..< arg.len - 2: yield (arg[j], argType) proc signalTuple*(sig: NimNode): NimNode = let otp = nnkEmpty.newTree() # echo "signalObjRaw:sig1: ", sig.treeRepr - let sigTyp = - if sig.kind == nnkSym: sig.getTypeInst - else: sig.getTypeInst + let sigTyp = if sig.kind == nnkSym: sig.getTypeInst else: sig.getTypeInst # echo "signalObjRaw:sig2: ", sigTyp.treeRepr let stp = if sigTyp.kind == nnkProcTy: @@ -52,14 +47,16 @@ proc signalTuple*(sig: NimNode): NimNode = # echo "signalObjRaw:sig: ", stp.repr var args: seq[NimNode] - for i in 2..