Mirror of https://github.com/logos-storage/apatheia.git (synced 2026-01-02 04:53:10 +00:00)
Feature: handle seq[byte] and strings using openArrays and memory retention (#7)

Limited support for passing seqs of bytes and strings to tasklets via openArray, retaining the backing memory until the job completes.
This commit is contained in:
parent
44d54bd4b3
commit
61ff2594d3
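
In practice the feature lets callers hand a seq or string straight to `submit`. The following is a rough usage sketch distilled from the tests added in this commit; the worker proc name `addNumValues` comes from those tests and is not part of the public API:

    import apatheia/jobs
    import taskpools

    # worker proc: receives the JobResult handle plus the converted OpenArrayHolder
    proc addNumValues(jobResult: JobResult[float], base: float,
        vals: OpenArrayHolder[float]) =
      var res = base
      for x in vals.toOpenArray():
        res += x
      discard jobResult.queue.send((jobResult.id, res))

    # inside an async proc:
    var jobs = newJobQueue[float](taskpool = Taskpool.new(num_threads = 2))
    # the seq literal is retained until the job completes, then handed over as an openArray
    let res = await jobs.submit(addNumValues(10.0, @[1.0, 2.0]))
    assert res == 13.0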
@@ -13,3 +13,4 @@ requires "threading"
 requires "taskpools >= 0.0.5"
 requires "chronicles"
 
+include "build.nims"
build.nims  (new file, 8 lines)
@@ -0,0 +1,8 @@
+
+import std/[os, strutils]
+
+task test, "unit tests":
+  for file in listFiles("tests"):
+    let name = file.splitPath().tail
+    if name.startsWith("t") and name.endsWith(".nim"):
+      exec "nim c -r " & file
@@ -1,2 +1,4 @@
 
 --threads:on
+
+include "build.nims"
@@ -1,3 +1,2 @@
-
 import apatheia/tasks
 export tasks
@@ -2,6 +2,7 @@ import std/tables
 import std/macros
 
 import ./queues
+import ./memretainers
 
 import taskpools
 import chronos
@@ -16,75 +17,172 @@ logScope:
 
 ## This module provides a simple way to submit jobs to taskpools
 ## and getting a result returned via an async future.
 ##
+## Any compatible arguments of `seq[T]` or `string` args passed
+## via the `submit` macro will be converted into the special `OpenArrayHolder[T]` type.
+## The `submit` macro converts these arguments in this object and retains the
+## memory associated with the original `seq[T]` or `string` object.
+## This greatly simplifies the passing of these types in `refc`.
+##
+## Note, for `arc` or `orc` GC's this setup will be replaced with a move operation in the future.
+## These GC's also allow greater support for moving GC types across thread boundaries.
+##
+## Currently this module limits support for GC types to ensure `refc` safety.
 ##
 
 type
-  JobId* = uint ## job id, should match `future.id()`
-
-  JobQueue*[T] = ref object
-    ## job queue object
+  JobQueue*[T] = ref object ## job queue object
     queue*: SignalQueue[(JobId, T)]
     futures*: Table[JobId, Future[T]]
     taskpool*: Taskpool
     running*: bool
 
-  JobResult*[T] = object
-    ## hold a job result to be returned by jobs
+  JobResult*[T] = object ## hold the result of a job after it finishes
    id*: JobId
    queue*: SignalQueue[(JobId, T)]
 
+  OpenArrayHolder*[T] = object
+    data*: ptr UncheckedArray[T]
+    size*: int
+
+  SupportedSeqTypes* = byte | SomeInteger | SomeFloat
+
+template toOpenArray*[T](arr: OpenArrayHolder[T]): auto =
+  system.toOpenArray(arr.data, 0, arr.size)
+
+func jobId*[T](fut: Future[T]): JobId =
+  JobId fut.id()
 
 proc processJobs*[T](jobs: JobQueue[T]) {.async.} =
   ## Starts a "detached" async processor for a given job queue.
   ##
   ## This processor waits for events from the queue in the JobQueue
-  ## and complete the associated futures.
+  ## and completes the associated future.
   const tn: string = $(JobQueue[T])
-  info "Processing jobs in job queue for type ", type=tn
+  info "Processing jobs in job queue for type ", type = tn
   while jobs.running:
-    let res = await(jobs.queue.wait()).get()
-    trace "got job result", jobResult = $res
-    let (id, ret) = res
-    var fut: Future[T]
-    if jobs.futures.pop(id, fut):
-      fut.complete(ret)
+    let (id, ret) = await(jobs.queue.wait()).get()
+    trace "got job result", jobId = id
+    releaseMemory(id) # always release any retained memory
+    if (var fut: Future[T]; jobs.futures.pop(id, fut)):
+      if not fut.finished():
+        fut.complete(ret)
     else:
       raise newException(IndexDefect, "missing future: " & $id)
-  info "Finishing processing jobs for type ", type=tn
+  info "Finishing processing jobs for type ", type = tn
 
 proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Future[T]) =
   ## Creates a future that returns the result of the associated job.
   let fut = newFuture[T](name)
-  let id = JobId fut.id()
+  let id = fut.jobId()
   jobs.futures[id] = fut
-  trace "jobs added: ", numberJobs = jobs.futures.len()
-  return (JobResult[T](id: id, queue: jobs.queue), fut, )
+  trace "job added: ", numberJobs = jobs.futures.len()
+  return (JobResult[T](id: id, queue: jobs.queue), fut)
 
-proc newJobQueue*[T](maxItems: int = 0, taskpool: Taskpool = Taskpool.new()): JobQueue[T] {.raises: [ApatheiaSignalErr].} =
+proc newJobQueue*[T](
+    maxItems: int = 0, taskpool: Taskpool = Taskpool.new()
+): JobQueue[T] {.raises: [ApatheiaSignalErr].} =
   ## Creates a new async-compatible threaded job queue.
-  result = JobQueue[T](queue: newSignalQueue[(uint, T)](maxItems), taskpool: taskpool, running: true)
+  result = JobQueue[T](
+    queue: newSignalQueue[(uint, T)](maxItems), taskpool: taskpool, running: true
+  )
   asyncSpawn(processJobs(result))
 
+template checkJobArgs*[T](exp: seq[T], fut: untyped): OpenArrayHolder[T] =
+  when T is SupportedSeqTypes:
+    let rval = SeqRetainer[T](data: exp)
+    retainMemory(fut.jobId(), rval)
+    let expPtr = OpenArrayHolder[T](
+      data: cast[ptr UncheckedArray[T]](unsafeAddr(rval.data[0])), size: rval.data.len()
+    )
+    expPtr
+  else:
+    {.error: "unsupported sequence type for job argument: " & $typeof(seq[T]).}
+
+template checkJobArgs*(exp: string, fut: untyped): OpenArrayHolder[char] =
+  let rval = StrRetainer(data: exp)
+  retainMemory(fut.jobId(), rval)
+  let expPtr = OpenArrayHolder[char](
+    data: cast[ptr UncheckedArray[char]](unsafeAddr(rval.data[0])), size: rval.data.len()
+  )
+  expPtr
+
+template checkJobArgs*(exp: typed, fut: untyped): auto =
+  exp
 
 macro submitMacro(tp: untyped, jobs: untyped, exp: untyped): untyped =
   ## modifies the call expression to include the job queue and
   ## the job id parameters
-  let jobRes = genSym(nskLet, "jobRes")
-  let futName = genSym(nskLet, "fut")
+  let jobRes = ident("jobRes")
+  let futName = ident("fut")
   let nm = newLit(repr(exp))
+
+  var argids = newSeq[NimNode]()
+  var letargs = nnkLetSection.newTree()
+  for i, p in exp[1 ..^ 1]:
+    let id = ident "arg" & $i
+    argids.add(id)
+    let pn = nnkCall.newTree(ident"checkJobArgs", p, `futName`)
+    letargs.add nnkIdentDefs.newTree(id, newEmptyNode(), pn)
+
   var fncall = nnkCall.newTree(exp[0])
   fncall.add(jobRes)
-  for p in exp[1..^1]: fncall.add(p)
+  for p in argids:
+    fncall.add(p)
 
-  result = quote do:
-    let (`jobRes`, `futName`) = createFuture(`jobs`, `nm`)
-    `jobs`.taskpool.spawn(`fncall`)
-    `futName`
-
-  # echo "submit: res:\n", result.repr
-  # echo ""
+  result = quote:
+    block:
+      let (`jobRes`, `futName`) = createFuture(`jobs`, `nm`)
+      `letargs`
+      when typeof(`fncall`) isnot void:
+        {.
+          error:
+            "Apatheia jobs cannot return values. The given proc returns type: " &
+            $(typeof(`fncall`)) & " for call " & astToStr(`fncall`)
+        .}
+      `jobs`.taskpool.spawn(`fncall`)
+      `futName`
+
+  when isMainModule:
+    echo "\nSUBMIT MACRO::\n", result.repr
+    echo ""
 
 template submit*[T](jobs: JobQueue[T], exp: untyped): Future[T] =
   submitMacro(T, jobs, exp)
 
+when isMainModule:
+  import os
+  import chronos/threadsync
+  import chronos/unittest2/asynctests
+
+  proc addNumValues(
+      jobResult: JobResult[float], base: float, vals: OpenArrayHolder[float]
+  ) =
+    os.sleep(100)
+    var res = base
+    for x in vals.toOpenArray():
+      res += x
+    discard jobResult.queue.send((jobResult.id, res))
+
+  proc addStrings(jobResult: JobResult[float], vals: OpenArrayHolder[string]) =
+    discard
+
+  suite "async tests":
+    var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads.
+
+    asyncTest "basic openarray":
+      var jobs = newJobQueue[float](taskpool = tp)
+
+      let job = jobs.submit(addNumValues(10.0, @[1.0.float, 2.0]))
+      let res = await job
+
+      check res == 13.0
+
+    asyncTest "don't compile":
+      check not compiles(
+        block:
+          var jobs = newJobQueue[float](taskpool = tp)
+          let job = jobs.submit(addStrings(@["a", "b", "c"]))
+      )
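
For reference, the code that `submit` now generates looks roughly like the following. This is reconstructed by hand from `submitMacro` above, with a hypothetical call `jobs.submit(addNumValues(10.0, myseq))`; the exact expansion is produced by the macro:

    block:
      let (jobRes, fut) = createFuture(jobs, "addNumValues(10.0, myseq)")
      let
        arg0 = checkJobArgs(10.0, fut)   # non-seq args pass through unchanged
        arg1 = checkJobArgs(myseq, fut)  # seq/string args become OpenArrayHolder and are retained
      when typeof(addNumValues(jobRes, arg0, arg1)) isnot void:
        {.error: "Apatheia jobs cannot return values. ...".}
      jobs.taskpool.spawn(addNumValues(jobRes, arg0, arg1))
      fut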
@@ -3,22 +3,19 @@ import std/[tables, strutils, typetraits, macros]
 proc makeProcName*(s: string): string =
   result = ""
   for c in s:
-    if c.isAlphaNumeric: result.add c
+    if c.isAlphaNumeric:
+      result.add c
 
 proc hasReturnType*(params: NimNode): bool =
-  if params != nil and params.len > 0 and params[0] != nil and
-      params[0].kind != nnkEmpty:
+  if params != nil and params.len > 0 and params[0] != nil and params[0].kind != nnkEmpty:
     result = true
 
 proc getReturnType*(params: NimNode): NimNode =
-  if params != nil and params.len > 0 and params[0] != nil and
-      params[0].kind != nnkEmpty:
+  if params != nil and params.len > 0 and params[0] != nil and params[0].kind != nnkEmpty:
     result = params[0]
 
 proc firstArgument*(params: NimNode): (NimNode, NimNode) =
-  if params != nil and
-      params.len > 0 and
-      params[1] != nil and
+  if params != nil and params.len > 0 and params[1] != nil and
       params[1].kind == nnkIdentDefs:
     result = (ident params[1][0].strVal, params[1][1])
   else:

@@ -29,15 +26,13 @@ iterator paramsIter*(params: NimNode): tuple[name, ntype: NimNode] =
   for i in 1 ..< params.len:
     let arg = params[i]
     let argType = arg[^2]
-    for j in 0 ..< arg.len-2:
+    for j in 0 ..< arg.len - 2:
       yield (arg[j], argType)
 
 proc signalTuple*(sig: NimNode): NimNode =
   let otp = nnkEmpty.newTree()
   # echo "signalObjRaw:sig1: ", sig.treeRepr
-  let sigTyp =
-    if sig.kind == nnkSym: sig.getTypeInst
-    else: sig.getTypeInst
+  let sigTyp = if sig.kind == nnkSym: sig.getTypeInst else: sig.getTypeInst
   # echo "signalObjRaw:sig2: ", sigTyp.treeRepr
   let stp =
     if sigTyp.kind == nnkProcTy:

@@ -52,14 +47,16 @@ proc signalTuple*(sig: NimNode): NimNode =
   # echo "signalObjRaw:sig: ", stp.repr
 
   var args: seq[NimNode]
-  for i in 2..<stp.len:
+  for i in 2 ..< stp.len:
     args.add stp[i]
 
   result = nnkTupleConstr.newTree()
   if isGeneric:
-    template genArgs(n): auto = n[1][1]
+    template genArgs(n): auto =
+      n[1][1]
+
     var genKinds: Table[string, NimNode]
-    for i in 1..<stp.genArgs.len:
+    for i in 1 ..< stp.genArgs.len:
       genKinds[repr stp.genArgs[i]] = otp[i]
     for arg in args:
       result.add genKinds[arg[1].repr]

@@ -72,19 +69,20 @@ proc signalTuple*(sig: NimNode): NimNode =
   # echo ""
   if result.len == 0:
     # result = bindSym"void"
-    result = quote do:
+    result = quote:
       tuple[]
 
 proc mkParamsVars*(paramsIdent, paramsType, params: NimNode): NimNode =
   ## Create local variables for each parameter in the actual RPC call proc
-  if params.isNil: return
+  if params.isNil:
+    return
+
   result = newStmtList()
   var varList = newSeq[NimNode]()
   var cnt = 0
   for paramid, paramType in paramsIter(params):
     let idx = newIntLitNode(cnt)
-    let vars = quote do:
+    let vars = quote:
       var `paramid`: `paramType` = `paramsIdent`[`idx`]
     varList.add vars
     cnt.inc()

@@ -106,9 +104,10 @@ proc mkParamsType*(paramsIdent, paramsType, params, genericParams: NimNode): Nim
   ##
   ##   proc multiplyrpc(params: RpcType_multiplyrpc): int =
   ##
-  if params.isNil: return
-  var tup = quote do:
+  if params.isNil:
+    return
+
+  var tup = quote:
     type `paramsType` = tuple[]
   for paramIdent, paramType in paramsIter(params):
     # processing multiple variables of one type

@@ -129,7 +128,8 @@ proc procIdentAppend*(id: NimNode, name: string): NimNode =
 
 proc mkCall*(callName, params: NimNode): NimNode =
   ## Create local variables for each parameter in the actual RPC call proc
-  if params.isNil: return
+  if params.isNil:
+    return
   var argList = newSeq[NimNode]()
   for paramId, paramType in paramsIter(params):
     argList.add paramId

@@ -138,9 +138,10 @@ proc mkCall*(callName, params: NimNode): NimNode =
 
 proc mkProc*(name, params, body: NimNode): NimNode =
   let args = params.copyNimTree()
-  result = quote do:
+  result = quote:
     proc `name`() {.nimcall.} =
       `body`
 
   result[3].del(0)
   for arg in args:
     result.params.add arg
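
A quick illustration of one of these helpers, with a hypothetical input string that is not taken from the repository: `makeProcName` keeps only alphanumeric characters, which makes it safe to splice user-facing names into generated identifiers. The tiny re-implementation below simply mirrors the proc shown in the diff so the snippet is self-contained:

    import std/strutils

    proc makeProcName(s: string): string =
      for c in s:
        if c.isAlphaNumeric:
          result.add c

    assert makeProcName("do-hashes (v2)") == "dohashesv2"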
src/apatheia/memretainers.nim  (new file, 30 lines)
@@ -0,0 +1,30 @@
+import std/tables
+
+import ./types
+export types
+
+type
+  Retainer* = ref object of RootObj
+
+  SeqRetainer*[T] = ref object of Retainer
+    data*: seq[T]
+
+  StrRetainer* = ref object of Retainer
+    data*: string
+
+var memoryRetainerTable = newTable[uint, seq[Retainer]]()
+
+proc retainMemory*(id: JobId, mem: Retainer) {.gcsafe, raises: [].} =
+  {.cast(gcsafe).}:
+    memoryRetainerTable[].withValue(id, value):
+      value[].add(mem)
+    do:
+      memoryRetainerTable[id] = @[mem]
+
+proc releaseMemory*(id: JobId) {.gcsafe, raises: [].} =
+  {.cast(gcsafe).}:
+    memoryRetainerTable.del(id)
+
+proc retainedMemoryCount*(): int {.gcsafe, raises: [].} =
+  {.cast(gcsafe).}:
+    memoryRetainerTable.len()
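
A minimal sketch of the retain/release lifecycle this module provides, based on how the jobs module drives it (the literal job id and data here are illustrative, not from the diff):

    import apatheia/memretainers

    let id: JobId = 42                    # normally obtained via fut.jobId()
    retainMemory(id, SeqRetainer[float](data: @[1.0, 2.0]))
    assert retainedMemoryCount() == 1
    # ... a worker thread reads the data through an OpenArrayHolder while it stays retained ...
    releaseMemory(id)                     # processJobs calls this once the job completes
    assert retainedMemoryCount() == 0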
@@ -10,23 +10,23 @@ export options
 export threadsync
 export chronos
 
-type
-  ChanPtr[T] = ptr Channel[T]
+type ChanPtr[T] = ptr Channel[T]
 
 proc allocSharedChannel[T](): ChanPtr[T] =
   cast[ChanPtr[T]](allocShared0(sizeof(Channel[T])))
 
-type
-  SignalQueue*[T] = object
-    signal: ThreadSignalPtr
-    chan*: ChanPtr[T]
+type SignalQueue*[T] = object
+  signal: ThreadSignalPtr
+  chan*: ChanPtr[T]
 
 proc dispose*[T](val: SignalQueue[T]) =
   ## Call to properly dispose of a SignalQueue.
   deallocShared(val.chan)
   discard val.signal.close()
 
-proc newSignalQueue*[T](maxItems: int = 0): SignalQueue[T] {.raises: [ApatheiaSignalErr].} =
+proc newSignalQueue*[T](
+    maxItems: int = 0
+): SignalQueue[T] {.raises: [ApatheiaSignalErr].} =
   ## Create a signal queue compatible with Chronos async.
   let res = ThreadSignalPtr.new()
   if res.isErr():
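
A rough usage sketch of the queue, based on how jobs.nim drives it; the `send` and `wait` signatures are inferred from those call sites and are not part of this hunk:

    # producer side, typically on a taskpool thread:
    let queue = newSignalQueue[(uint, float)](maxItems = 0)
    discard queue.send((1'u, 3.14))

    # consumer side, inside an async proc:
    let (id, value) = await(queue.wait()).get()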
@@ -1,4 +1,3 @@
-
 import std/[macros, strutils]
 
 import macroutils

@@ -6,14 +5,38 @@ import macroutils
 import jobs
 export jobs
 
-# TODO: make these do something useful or remove them
-template checkParamType*(obj: object): auto =
-  # for name, field in obj.fieldPairs():
-  #   echo "field name: ", name
-  obj
-
-template checkParamType*(obj: typed): auto =
+## Tasks provide a convenience wrapper for using the jobs module. It also
+## provides some extra conveniences like handling a subset of `openArray[T]`
+## types in a safe manner using the `OpenArrayHolder[T]` type.
+##
+## The `asyncTask` macro works by creating a wrapper proc around the
+## annotated user proc. The transformation looks similar to:
+##
+## .. code-block::
+##   proc doHashes*(data: openArray[byte], opts: HashOptions): float {.asyncTask.} =
+##     result = 10.0
+##
+##
+## .. code-block::
+##   proc doHashesTasklet*(data: openArray[byte]; opts: HashOptions): float {.nimcall.} =
+##     result = 10.0
+##
+##   proc doHashes*(jobResult: JobResult[float]; data: OpenArrayHolder[byte];
+##       opts: HashOptions) {.nimcall.} =
+##     let val {.inject.} = doHashesTasklet(convertParamType(data),
+##         convertParamType(opts))
+##     discard jobResult.queue.send((jobResult.id, val))
+##
+## Parameters with type of `openArray[T]` have special support and are converted
+## into the `OpenArrayHolder[T]` type from the jobs module. See the jobs module
+## for more information.
+##
+template convertParamType*[T](obj: OpenArrayHolder[T]): auto =
+  static:
+    echo "CONVERTPARAMTYPE:: ", $typeof(obj)
+  obj.toOpenArray()
+
+template convertParamType*(obj: typed): auto =
   obj
 
 macro asyncTask*(p: untyped): untyped =

@@ -30,33 +53,43 @@ macro asyncTask*(p: untyped): untyped =
     # pragmas = p[4]
     body = p[6]
     name = repr(procId).strip(false, true, {'*'})
 
   if not hasReturnType(params):
     error("tasklet definition must have return type", p)
 
   # setup inner tasklet proc
-  let tp = mkProc(procId.procIdentAppend("Tasklet"),
-                  params, body)
+  let tp = mkProc(procId.procIdentAppend("Tasklet"), params, body)
 
   # setup async wrapper code
   var asyncBody = newStmtList()
   let tcall = newCall(ident(name & "Tasklet"))
   for paramId, paramType in paramsIter(params):
-    tcall.add newCall("checkParamType", paramId)
-  asyncBody = quote do:
+    tcall.add newCall("convertParamType", paramId)
+  asyncBody = quote:
     let val {.inject.} = `tcall`
-    discard jobResult.queue.send((jobResult.id, val,))
+    discard jobResult.queue.send((jobResult.id, val))
 
-  var asyncParams = params.copyNimTree()
-  let retType = if not hasReturnType(params): ident"void"
-                else: params.getReturnType()
+  let retType =
+    if not hasReturnType(params):
+      ident"void"
+    else:
+      params.getReturnType()
+
   let jobArg = nnkIdentDefs.newTree(
-    ident"jobResult",
-    nnkBracketExpr.newTree(ident"JobResult", retType),
-    newEmptyNode()
+    ident"jobResult", nnkBracketExpr.newTree(ident"JobResult", retType), newEmptyNode()
   )
-  asyncParams[0] = newEmptyNode()
-  asyncParams.insert(1, jobArg)
+  var asyncParams = nnkFormalParams.newTree()
+  asyncParams.add newEmptyNode()
+  asyncParams.add jobArg
+  for i, p in params[1 ..^ 1]:
+    let pt = p[1]
+    if pt.kind == nnkBracketExpr and pt[0].repr == "openArray":
+      # special case openArray to support special OpenArrayHolder from jobs module
+      p[1] = nnkBracketExpr.newTree(ident"OpenArrayHolder", pt[1])
+      asyncParams.add p
+    else:
+      asyncParams.add p
 
   let fn = mkProc(procId, asyncParams, asyncBody)
 
   result = newStmtList()

@@ -67,13 +100,9 @@ macro asyncTask*(p: untyped): untyped =
   echo "asyncTask:body:\n", result.repr
 
 when isMainModule:
-  type
-    HashOptions* = object
-      striped*: bool
-
-  proc doHashes2*(data: openArray[byte],
-                  opts: HashOptions): float {.asyncTask.} =
+  type HashOptions* = object
+    striped*: bool
+
+  proc doHashes*(data: openArray[byte], opts: HashOptions): float {.asyncTask.} =
     echo "hashing"
+    result = 10.0
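
Putting the pieces together, a tasklet defined with `asyncTask` is submitted exactly like a hand-written job proc. This sketch mirrors the tests added below; the proc name `sumValues` is illustrative (the tests use `addNumValues`):

    import apatheia/tasks
    import taskpools

    # the macro rewrites this to take an OpenArrayHolder[float] plus a JobResult handle
    proc sumValues(vals: openArray[float]): float {.asyncTask.} =
      for x in vals:
        result += x

    # inside an async proc:
    var jobs = newJobQueue[float](taskpool = Taskpool.new(num_threads = 2))
    let res = await jobs.submit(sumValues(@[1.0, 2.0, 3.0]))
    assert res == 6.0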
@@ -1,4 +1,5 @@
 type
   ApatheiaException* = object of CatchableError
   ApatheiaSignalErr* = object of ApatheiaException
 
+  JobId* = uint ## job id, should match `future.id()`
@@ -8,6 +8,7 @@ import taskpools
 
 import apatheia/queues
 import apatheia/jobs
+import apatheia/memretainers
 
 proc addNumsRaw(a, b: float): float =
   os.sleep(50)

@@ -17,10 +18,29 @@ proc addNums(jobResult: JobResult[float], a, b: float) =
   let res = addNumsRaw(a, b)
   discard jobResult.queue.send((jobResult.id, res,))
 
+proc addNumsIncorrect(jobResult: JobResult[float], vals: openArray[float]): float =
+  discard
+
+proc addNumValues(jobResult: JobResult[float], base: float, vals: OpenArrayHolder[float]) =
+  os.sleep(100)
+  var res = base
+  for x in vals.toOpenArray():
+    res += x
+  discard jobResult.queue.send((jobResult.id, res,))
+
+proc strCompute(jobResult: JobResult[int], vals: OpenArrayHolder[char]) =
+  discard jobResult.queue.send((jobResult.id, vals.size,))
+
+proc addStrings(jobResult: JobResult[float], vals: OpenArrayHolder[string]) =
+  discard
+
 suite "async tests":
 
   var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads.
 
+  asyncTest "cannot return value":
+    check not compiles(await jobs.submit(addNums(1.0, 2.0,)))
+
   asyncTest "test":
     var jobs = newJobQueue[float](taskpool = tp)
 

@@ -28,3 +48,30 @@ suite "async tests":
 
     check res == 3.0
+
+  asyncTest "testing seq":
+    var jobs = newJobQueue[float](taskpool = tp)
+    let res = await jobs.submit(addNumValues(10.0, @[1.0.float, 2.0]))
+    check res == 13.0
+
+  asyncTest "testing string":
+    var jobs = newJobQueue[int](taskpool = tp)
+    let res = await jobs.submit(strCompute("hello world!"))
+    check res == 12
+
+  asyncTest "testing arrays":
+    var jobs = newJobQueue[float](taskpool = tp)
+    let fut1 = jobs.submit(addNumValues(10.0, @[1.0.float, 2.0]))
+    let fut2 = jobs.submit(addNumValues(20.0, @[3.0.float, 4.0]))
+    check retainedMemoryCount() == 2
+    let res1 = await fut1
+    let res2 = await fut2
+    check res1 == 13.0
+    check res2 == 27.0
+    check retainedMemoryCount() == 0
+
+  asyncTest "don't compile":
+    check not compiles(
+      block:
+        var jobs = newJobQueue[float](taskpool = tp)
+        let job = jobs.submit(addStrings(@["a", "b", "c"]))
+    )
@@ -6,6 +6,7 @@ import chronos/unittest2/asynctests
 import taskpools
 
 import apatheia/tasks
+import apatheia/memretainers
 
 proc addNums(a, b: float): float {.asyncTask.} =
   os.sleep(50)

@@ -17,6 +18,11 @@ proc addNumValues(vals: openArray[float]): float {.asyncTask.} =
   for x in vals:
     result += x
 
+proc strCompute(val: openArray[char]): int {.asyncTask.} =
+  ## note includes null terminator!
+  return val.len()
+
+
 suite "async tests":
   var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads.
   var jobsVar = newJobQueue[float](taskpool = tp)

@@ -35,3 +41,19 @@ suite "async tests":
     let args = @[1.0, 2.0, 3.0]
     let res = await jobs.submit(addNumValues(args))
     check res == 6.0
+
+  asyncTest "test strCompute":
+    var jobs = newJobQueue[int](taskpool = tp)
+    let res = await jobs.submit(strCompute("hello world!"))
+    check res == 13 # note includes cstring null terminator
+
+  asyncTest "testing openArrays":
+    var jobs = newJobQueue[float](taskpool = tp)
+    let fut1 = jobs.submit(addNumValues(@[1.0.float, 2.0]))
+    let fut2 = jobs.submit(addNumValues(@[3.0.float, 4.0]))
+    check retainedMemoryCount() == 2
+    let res1 = await fut1
+    let res2 = await fut2
+    check res1 == 3.0
+    check res2 == 7.0
+    check retainedMemoryCount() == 0