Initial Setup (#1)

* asyncTaskMacro
* checks
* taskpools examples
* setup examples
* queues
* example workers future complete
* fire signal
* setup jobs
* logging
* cleanup
* docs
This commit is contained in:
Jaremy Creechley 2024-02-14 22:14:20 -07:00 committed by GitHub
parent ca276ef47a
commit 5c3b379885
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 586 additions and 33 deletions

4
.gitignore vendored
View File

@ -1 +1,5 @@
*
!*/
!*.*
.tool-versions
nim.cfg

20
README.md Normal file
View File

@ -0,0 +1,20 @@
# Apatheia
> *Apatheia* (*Greek: ἀπάθεια; from a- "without" and pathos "suffering" or "passion"*), in Stoicism, refers to a state of mind in which one is not disturbed by the passions. It might better be translated by the word equanimity than the word indifference.
WIP utilities for using Chronos async with threading. The desire is to provide safe, pre-tested constructs for using threads with async.
Goals:
- support orc and refc
+ refc may require extra copying for data
- use event queues (e.g. channels) to/from thread pool
+ make it easy to monitor and debug queue capacity
+ only use minimal AsyncFD handles
+ lessen pressure on the main chronos futures pending queue
- support backpressure at futures level
- benchmarking overhead
- special support for seq[byte]'s and strings with zero-copy
+ implement special but limited support zero-copy arguments on refc

View File

@ -6,7 +6,10 @@ description = "Async support for threading primitives"
license = "MIT"
srcDir = "src"
# Dependencies
requires "nim >= 1.6.18"
requires "chronos >= 4.0.0"
requires "threading"
requires "taskpools >= 0.0.5"
requires "chronicles"

2
config.nims Normal file
View File

@ -0,0 +1,2 @@
--threads:on

View File

@ -1,7 +1,3 @@
# This is just an example to get you started. A typical library package
# exports the main API in this file. Note that you cannot rename this file
# but you can remove it if you wish.
proc add*(x, y: int): int =
## Adds two numbers together.
return x + y
import apatheia/tasks
export tasks

90
src/apatheia/jobs.nim Normal file
View File

@ -0,0 +1,90 @@
import std/tables
import std/macros
import ./queues
import taskpools
import chronos
import chronicles
export queues
export chronicles
logScope:
# Lexical properties are typically assigned to a constant:
topics = "apatheia jobs"
## This module provides a simple way to submit jobs to taskpools
## and getting a result returned via an async future.
##
##
type
JobId* = uint ## job id, should match `future.id()`
JobQueue*[T] = ref object
## job queue object
queue*: SignalQueue[(JobId, T)]
futures*: Table[JobId, Future[T]]
taskpool*: Taskpool
running*: bool
JobResult*[T] = object
## hold a job result to be returned by jobs
id*: JobId
queue*: SignalQueue[(JobId, T)]
proc processJobs*[T](jobs: JobQueue[T]) {.async.} =
## Starts a "detached" async processor for a given job queue.
##
## This processor waits for events from the queue in the JobQueue
## and complete the associated futures.
const tn: string = $(JobQueue[T])
info "Processing jobs in job queue for type ", type=tn
while jobs.running:
let res = await(jobs.queue.wait()).get()
trace "got job result", jobResult = $res
let (id, ret) = res
var fut: Future[T]
if jobs.futures.pop(id, fut):
fut.complete(ret)
else:
raise newException(IndexDefect, "missing future: " & $id)
info "Finishing processing jobs for type ", type=tn
proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Future[T]) =
## Creates a future that returns the result of the associated job.
let fut = newFuture[T](name)
let id = JobId fut.id()
jobs.futures[id] = fut
trace "jobs added: ", numberJobs = jobs.futures.len()
return (JobResult[T](id: id, queue: jobs.queue), fut, )
proc newJobQueue*[T](maxItems: int = 0, taskpool: Taskpool = Taskpool.new()): JobQueue[T] {.raises: [ApatheiaSignalErr].} =
## Creates a new async-compatible threaded job queue.
result = JobQueue[T](queue: newSignalQueue[(uint, T)](maxItems), taskpool: taskpool, running: true)
asyncSpawn(processJobs(result))
macro submitMacro(tp: untyped, jobs: untyped, exp: untyped): untyped =
## modifies the call expression to include the job queue and
## the job id parameters
let jobRes = genSym(nskLet, "jobRes")
let futName = genSym(nskLet, "fut")
let nm = newLit(repr(exp))
var fncall = nnkCall.newTree(exp[0])
fncall.add(jobRes)
for p in exp[1..^1]: fncall.add(p)
result = quote do:
let (`jobRes`, `futName`) = createFuture(`jobs`, `nm`)
`jobs`.taskpool.spawn(`fncall`)
`futName`
# echo "submit: res:\n", result.repr
# echo ""
template submit*[T](jobs: JobQueue[T], exp: untyped): Future[T] =
submitMacro(T, jobs, exp)

146
src/apatheia/macroutils.nim Normal file
View File

@ -0,0 +1,146 @@
import std/[tables, strutils, typetraits, macros]
proc makeProcName*(s: string): string =
result = ""
for c in s:
if c.isAlphaNumeric: result.add c
proc hasReturnType*(params: NimNode): bool =
if params != nil and params.len > 0 and params[0] != nil and
params[0].kind != nnkEmpty:
result = true
proc getReturnType*(params: NimNode): NimNode =
if params != nil and params.len > 0 and params[0] != nil and
params[0].kind != nnkEmpty:
result = params[0]
proc firstArgument*(params: NimNode): (NimNode, NimNode) =
if params != nil and
params.len > 0 and
params[1] != nil and
params[1].kind == nnkIdentDefs:
result = (ident params[1][0].strVal, params[1][1])
else:
result = (ident "", newNimNode(nnkEmpty))
iterator paramsIter*(params: NimNode): tuple[name, ntype: NimNode] =
## iterators through the parameters
for i in 1 ..< params.len:
let arg = params[i]
let argType = arg[^2]
for j in 0 ..< arg.len-2:
yield (arg[j], argType)
proc signalTuple*(sig: NimNode): NimNode =
let otp = nnkEmpty.newTree()
# echo "signalObjRaw:sig1: ", sig.treeRepr
let sigTyp =
if sig.kind == nnkSym: sig.getTypeInst
else: sig.getTypeInst
# echo "signalObjRaw:sig2: ", sigTyp.treeRepr
let stp =
if sigTyp.kind == nnkProcTy:
sig.getTypeInst[0]
else:
sigTyp.params()
let isGeneric = false
# echo "signalObjRaw:obj: ", otp.repr
# echo "signalObjRaw:obj:tr: ", otp.treeRepr
# echo "signalObjRaw:obj:isGen: ", otp.kind == nnkBracketExpr
# echo "signalObjRaw:sig: ", stp.repr
var args: seq[NimNode]
for i in 2..<stp.len:
args.add stp[i]
result = nnkTupleConstr.newTree()
if isGeneric:
template genArgs(n): auto = n[1][1]
var genKinds: Table[string, NimNode]
for i in 1..<stp.genArgs.len:
genKinds[repr stp.genArgs[i]] = otp[i]
for arg in args:
result.add genKinds[arg[1].repr]
else:
# genKinds
# echo "ARGS: ", args.repr
for arg in args:
result.add arg[1]
# echo "ARG: ", result.repr
# echo ""
if result.len == 0:
# result = bindSym"void"
result = quote do:
tuple[]
proc mkParamsVars*(paramsIdent, paramsType, params: NimNode): NimNode =
## Create local variables for each parameter in the actual RPC call proc
if params.isNil: return
result = newStmtList()
var varList = newSeq[NimNode]()
var cnt = 0
for paramid, paramType in paramsIter(params):
let idx = newIntLitNode(cnt)
let vars = quote do:
var `paramid`: `paramType` = `paramsIdent`[`idx`]
varList.add vars
cnt.inc()
result.add varList
# echo "paramsSetup return:\n", treeRepr result
proc mkParamsType*(paramsIdent, paramsType, params, genericParams: NimNode): NimNode =
## Create a type that represents the arguments for this rpc call
##
## Example:
##
## proc multiplyrpc(a, b: int): int {.rpc.} =
## result = a * b
##
## Becomes:
## proc multiplyrpc(params: RpcType_multiplyrpc): int =
## var a = params.a
## var b = params.b
##
## proc multiplyrpc(params: RpcType_multiplyrpc): int =
##
if params.isNil: return
var tup = quote do:
type `paramsType` = tuple[]
for paramIdent, paramType in paramsIter(params):
# processing multiple variables of one type
tup[0][2].add newIdentDefs(paramIdent, paramType)
result = tup
result[0][1] = genericParams.copyNimTree()
# echo "mkParamsType: ", genericParams.treeRepr
proc identPub*(name: string): NimNode =
result = nnkPostfix.newTree(newIdentNode("*"), ident name)
proc procIdentAppend*(id: NimNode, name: string): NimNode =
result = id.copyNimTree()
if id.kind == nnkPostfix:
result[1] = ident(result[1].strVal & name)
else:
result = ident(id.strVal & name)
proc mkCall*(callName, params: NimNode): NimNode =
## Create local variables for each parameter in the actual RPC call proc
if params.isNil: return
var argList = newSeq[NimNode]()
for paramId, paramType in paramsIter(params):
argList.add paramId
result = newCall(callName, argList)
# echo "mkCall return:\n", treeRepr result
proc mkProc*(name, params, body: NimNode): NimNode =
let args = params.copyNimTree()
result = quote do:
proc `name`() {.nimcall.} =
`body`
result[3].del(0)
for arg in args:
result.params.add arg

76
src/apatheia/queues.nim Normal file
View File

@ -0,0 +1,76 @@
import std/options
import ./types
import results
import chronos
import chronos/threadsync
export types
export options
export threadsync
export chronos
type
ChanPtr[T] = ptr Channel[T]
proc allocSharedChannel[T](): ChanPtr[T] =
cast[ChanPtr[T]](allocShared0(sizeof(Channel[T])))
type
SignalQueue*[T] = object
signal: ThreadSignalPtr
chan*: ChanPtr[T]
proc dispose*[T](val: SignalQueue[T]) =
## Call to properly dispose of a SignalQueue.
deallocShared(val.chan)
discard val.signal.close()
proc newSignalQueue*[T](maxItems: int = 0): SignalQueue[T] {.raises: [ApatheiaSignalErr].} =
## Create a signal queue compatible with Chronos async.
let res = ThreadSignalPtr.new()
if res.isErr():
raise newException(ApatheiaSignalErr, res.error())
result.signal = res.get()
result.chan = allocSharedChannel[T]()
result.chan[].open(maxItems)
proc send*[T](c: SignalQueue[T], msg: sink T): Result[void, string] {.raises: [].} =
## Sends a message to a thread. `msg` is copied.
## Note: currently non-blocking but future iterations may become blocking.
##
try:
c.chan[].send(msg)
except Exception as exc:
result = err exc.msg
let res = c.signal.fireSync()
if res.isErr():
let msg: string = res.error()
result = err msg
result = ok()
proc trySend*[T](c: SignalQueue[T], msg: sink T): bool =
## Trys to sends a message to a thread. `msg` is copied. Non-blocking.
result = c.chan.trySend(msg)
if result:
c.signal.fireSync()
proc recv*[T](c: SignalQueue[T]): Result[T, string] =
## Receive item from queue, blocking.
try:
result = ok c.chan[].recv()
except Exception as exc:
result = err exc.msg
proc tryRecv*[T](c: SignalQueue[T]): Option[T] =
## Try to receive item from queue, non-blocking.
let res = c.chan.recv()
if res.dataAvailable:
some res.msg
proc wait*[T](c: SignalQueue[T]): Future[Result[T, string]] {.async.} =
## Async compatible receive from queue. Pauses async execution until
## an item is received from the queue
await wait(c.signal)
return c.recv()

View File

@ -1,12 +0,0 @@
# This is just an example to get you started. Users of your library will
# import this file by writing ``import apatheia/submodule``. Feel free to rename or
# remove this file altogether. You may create additional modules alongside
# this file as required.
type
Submodule* = object
name*: string
proc initSubmodule*(): Submodule =
## Initialises a new ``Submodule`` object.
Submodule(name: "Anonymous")

79
src/apatheia/tasks.nim Normal file
View File

@ -0,0 +1,79 @@
import std/[macros, strutils]
import macroutils
import jobs
export jobs
# TODO: make these do something useful or remove them
template checkParamType*(obj: object): auto =
# for name, field in obj.fieldPairs():
# echo "field name: ", name
obj
template checkParamType*(obj: typed): auto =
obj
macro asyncTask*(p: untyped): untyped =
## Pragma to transfer a proc into a "tasklet" which runs
## the proc body in a separate thread and returns the result
## in an async compatible manner.
##
let
procId = p[0]
# procLineInfo = p.lineInfoObj
# genericParams = p[2]
params = p[3]
# pragmas = p[4]
body = p[6]
name = repr(procId).strip(false, true, {'*'})
if not hasReturnType(params):
error("tasklet definition must have return type", p)
# setup inner tasklet proc
let tp = mkProc(procId.procIdentAppend("Tasklet"),
params, body)
# setup async wrapper code
var asyncBody = newStmtList()
let tcall = newCall(ident(name & "Tasklet"))
for paramId, paramType in paramsIter(params):
tcall.add newCall("checkParamType", paramId)
asyncBody = quote do:
let val {.inject.} = `tcall`
discard jobResult.queue.send((jobResult.id, val,))
var asyncParams = params.copyNimTree()
let retType = if not hasReturnType(params): ident"void"
else: params.getReturnType()
let jobArg = nnkIdentDefs.newTree(
ident"jobResult",
nnkBracketExpr.newTree(ident"JobResult", retType),
newEmptyNode()
)
asyncParams[0] = newEmptyNode()
asyncParams.insert(1, jobArg)
let fn = mkProc(procId, asyncParams, asyncBody)
result = newStmtList()
result.add tp
result.add fn
when isMainModule:
echo "asyncTask:body:\n", result.repr
when isMainModule:
type
HashOptions* = object
striped*: bool
proc doHashes2*(data: openArray[byte],
opts: HashOptions): float {.asyncTask.} =
echo "hashing"

4
src/apatheia/types.nim Normal file
View File

@ -0,0 +1,4 @@
type
ApatheiaException* = object of CatchableError
ApatheiaSignalErr* = object of ApatheiaException

View File

@ -1 +1,4 @@
switch("path", "$projectDir/../src")
switch("path", "$projectDir/../src")
--threads:on
--mm:refc

63
tests/exPools.nim Normal file
View File

@ -0,0 +1,63 @@
import std/[strutils, math, cpuinfo]
import taskpools
# From https://github.com/nim-lang/Nim/blob/v1.6.2/tests/parallel/tpi.nim
# Leibniz Formula https://en.wikipedia.org/wiki/Leibniz_formula_for_%CF%80
proc term(k: int): float =
if k mod 2 == 1:
-4'f / float(2*k + 1)
else:
4'f / float(2*k + 1)
proc piApprox(tp: Taskpool, n: int): float =
var pendingFuts = newSeq[FlowVar[float]](n)
for k in 0 ..< pendingFuts.len:
pendingFuts[k] = tp.spawn term(k) # Schedule a task on the threadpool a return a handle to retrieve the result.
for k in 0 ..< pendingFuts.len:
result += sync pendingFuts[k] # Block until the result is available.
proc main() =
var n = 1_000
var nthreads = countProcessors()
var tp = Taskpool.new(num_threads = nthreads) # Default to the number of hardware threads.
echo formatFloat(tp.piApprox(n))
tp.syncAll() # Block until all pending tasks are processed (implied in tp.shutdown())
tp.shutdown()
# Compile with nim c -r -d:release --threads:on --outdir:build example.nim
main()
when false:
type
ScratchObj_486539477 = object
k: int
fut: Flowvar[float]
let scratch_486539455 = cast[ptr ScratchObj_486539477](c_calloc(csize_t 1,
csize_t sizeof(ScratchObj_486539477)))
if scratch_486539455.isNil:
raise newException(OutOfMemDefect, "Could not allocate memory")
block:
var isoTemp_486539473 = isolate(k)
scratch_486539455.k = extract(isoTemp_486539473)
var isoTemp_486539475 = isolate(fut)
scratch_486539455.fut = extract(isoTemp_486539475)
proc taskpool_term_486539478(args: pointer) {.gcsafe, nimcall,
raises: [].} =
let objTemp_486539472 = cast[ptr ScratchObj_486539477](args)
let k_486539474 = objTemp_486539472.k
let fut_486539476 = objTemp_486539472.fut
taskpool_term(k = k_486539474, fut = fut_486539476)
proc destroyScratch_486539479(args: pointer) {.gcsafe, nimcall,
raises: [].} =
let obj_486539480 = cast[ptr ScratchObj_486539477](args)
`=destroy`(obj_486539480[])
Task(callback: taskpool_term_486539478, args: scratch_486539455,
destroy: destroyScratch_486539479)

View File

@ -1,12 +0,0 @@
# This is just an example to get you started. You may wish to put all of your
# tests into a single file, or separate them into multiple `test1`, `test2`
# etc. files (better names are recommended, just make sure the name starts with
# the letter 't').
#
# To run these tests, simply execute `nimble test`.
import unittest
import apatheia
test "can add":
check add(5, 5) == 10

30
tests/tjobs.nim Normal file
View File

@ -0,0 +1,30 @@
import std/os
import chronicles
import chronos
import chronos/threadsync
import chronos/unittest2/asynctests
import taskpools
import apatheia/queues
import apatheia/jobs
proc addNumsRaw(a, b: float): float =
os.sleep(50)
return a + b
proc addNums(jobResult: JobResult[float], a, b: float) =
let res = addNumsRaw(a, b)
discard jobResult.queue.send((jobResult.id, res,))
suite "async tests":
var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads.
asyncTest "test":
var jobs = newJobQueue[float](taskpool = tp)
let res = await jobs.submit(addNums(1.0, 2.0,))
check res == 3.0

28
tests/tqueues.nim Normal file
View File

@ -0,0 +1,28 @@
import std/os
import chronos
import chronos/threadsync
import chronos/unittest2/asynctests
import taskpools
import apatheia/queues
proc addNums(a, b: float, queue: SignalQueue[float]) =
os.sleep(50)
discard queue.send(a + b)
suite "async tests":
var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads.
var queue = newSignalQueue[float]()
asyncTest "test":
## init
tp.spawn addNums(1.0, 2.0, queue)
let res = await wait(queue).wait(1500.milliseconds)
check res.get() == 3.0

33
tests/ttasks.nim Normal file
View File

@ -0,0 +1,33 @@
import std/os
import chronos
import chronos/threadsync
import chronos/unittest2/asynctests
import taskpools
import apatheia/queues
import apatheia/tasks
proc addNums(a, b: float): float {.asyncTask.} =
os.sleep(50)
return a + b
proc addNumValues(vals: openArray[float]): float {.asyncTask.} =
os.sleep(100)
result = 0.0
for x in vals:
result += x
suite "async tests":
var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads.
asyncTest "test addNums":
var jobs = newJobQueue[float](taskpool = tp)
let res = await jobs.submit(addNums(1.0, 2.0,))
check res == 3.0
asyncTest "test addNumValues":
var jobs = newJobQueue[float](taskpool = tp)
let args = @[1.0, 2.0, 3.0]
let res = await jobs.submit(addNumValues(args))
check res == 6.0