diff --git a/.gitignore b/.gitignore index f494b1b..ea03a9e 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,5 @@ +* +!*/ +!*.* .tool-versions +nim.cfg diff --git a/README.md b/README.md new file mode 100644 index 0000000..82623a6 --- /dev/null +++ b/README.md @@ -0,0 +1,20 @@ + +# Apatheia + +> *Apatheia* (*Greek: ἀπάθεια; from a- "without" and pathos "suffering" or "passion"*), in Stoicism, refers to a state of mind in which one is not disturbed by the passions. It might better be translated by the word equanimity than the word indifference. + +WIP utilities for using Chronos async with threading. The desire is to provide safe, pre-tested constructs for using threads with async. + +Goals: + +- support orc and refc + + refc may require extra copying for data +- use event queues (e.g. channels) to/from thread pool + + make it easy to monitor and debug queue capacity + + only use minimal AsyncFD handles + + lessen pressure on the main chronos futures pending queue +- support backpressure at futures level +- benchmarking overhead +- special support for seq[byte]'s and strings with zero-copy + + implement special but limited support zero-copy arguments on refc + diff --git a/apatheia.nimble b/apatheia.nimble index a06a038..7bda637 100644 --- a/apatheia.nimble +++ b/apatheia.nimble @@ -6,7 +6,10 @@ description = "Async support for threading primitives" license = "MIT" srcDir = "src" - # Dependencies -requires "nim >= 1.6.18" +requires "chronos >= 4.0.0" +requires "threading" +requires "taskpools >= 0.0.5" +requires "chronicles" + diff --git a/config.nims b/config.nims new file mode 100644 index 0000000..bae2200 --- /dev/null +++ b/config.nims @@ -0,0 +1,2 @@ + +--threads:on diff --git a/src/apatheia.nim b/src/apatheia.nim index b7a2480..569c129 100644 --- a/src/apatheia.nim +++ b/src/apatheia.nim @@ -1,7 +1,3 @@ -# This is just an example to get you started. A typical library package -# exports the main API in this file. Note that you cannot rename this file -# but you can remove it if you wish. -proc add*(x, y: int): int = - ## Adds two numbers together. - return x + y +import apatheia/tasks +export tasks diff --git a/src/apatheia/jobs.nim b/src/apatheia/jobs.nim new file mode 100644 index 0000000..d4a1898 --- /dev/null +++ b/src/apatheia/jobs.nim @@ -0,0 +1,90 @@ +import std/tables +import std/macros + +import ./queues + +import taskpools +import chronos +import chronicles + +export queues +export chronicles + +logScope: + # Lexical properties are typically assigned to a constant: + topics = "apatheia jobs" + +## This module provides a simple way to submit jobs to taskpools +## and getting a result returned via an async future. +## +## + +type + JobId* = uint ## job id, should match `future.id()` + + JobQueue*[T] = ref object + ## job queue object + queue*: SignalQueue[(JobId, T)] + futures*: Table[JobId, Future[T]] + taskpool*: Taskpool + running*: bool + + JobResult*[T] = object + ## hold a job result to be returned by jobs + id*: JobId + queue*: SignalQueue[(JobId, T)] + +proc processJobs*[T](jobs: JobQueue[T]) {.async.} = + ## Starts a "detached" async processor for a given job queue. + ## + ## This processor waits for events from the queue in the JobQueue + ## and complete the associated futures. + + const tn: string = $(JobQueue[T]) + info "Processing jobs in job queue for type ", type=tn + while jobs.running: + let res = await(jobs.queue.wait()).get() + trace "got job result", jobResult = $res + let (id, ret) = res + var fut: Future[T] + if jobs.futures.pop(id, fut): + fut.complete(ret) + else: + raise newException(IndexDefect, "missing future: " & $id) + info "Finishing processing jobs for type ", type=tn + +proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Future[T]) = + ## Creates a future that returns the result of the associated job. + let fut = newFuture[T](name) + let id = JobId fut.id() + jobs.futures[id] = fut + trace "jobs added: ", numberJobs = jobs.futures.len() + return (JobResult[T](id: id, queue: jobs.queue), fut, ) + +proc newJobQueue*[T](maxItems: int = 0, taskpool: Taskpool = Taskpool.new()): JobQueue[T] {.raises: [ApatheiaSignalErr].} = + ## Creates a new async-compatible threaded job queue. + result = JobQueue[T](queue: newSignalQueue[(uint, T)](maxItems), taskpool: taskpool, running: true) + asyncSpawn(processJobs(result)) + +macro submitMacro(tp: untyped, jobs: untyped, exp: untyped): untyped = + ## modifies the call expression to include the job queue and + ## the job id parameters + + let jobRes = genSym(nskLet, "jobRes") + let futName = genSym(nskLet, "fut") + let nm = newLit(repr(exp)) + var fncall = nnkCall.newTree(exp[0]) + fncall.add(jobRes) + for p in exp[1..^1]: fncall.add(p) + + result = quote do: + let (`jobRes`, `futName`) = createFuture(`jobs`, `nm`) + `jobs`.taskpool.spawn(`fncall`) + `futName` + + # echo "submit: res:\n", result.repr + # echo "" + +template submit*[T](jobs: JobQueue[T], exp: untyped): Future[T] = + submitMacro(T, jobs, exp) + diff --git a/src/apatheia/macroutils.nim b/src/apatheia/macroutils.nim new file mode 100644 index 0000000..8d615fe --- /dev/null +++ b/src/apatheia/macroutils.nim @@ -0,0 +1,146 @@ +import std/[tables, strutils, typetraits, macros] + +proc makeProcName*(s: string): string = + result = "" + for c in s: + if c.isAlphaNumeric: result.add c + +proc hasReturnType*(params: NimNode): bool = + if params != nil and params.len > 0 and params[0] != nil and + params[0].kind != nnkEmpty: + result = true + +proc getReturnType*(params: NimNode): NimNode = + if params != nil and params.len > 0 and params[0] != nil and + params[0].kind != nnkEmpty: + result = params[0] + +proc firstArgument*(params: NimNode): (NimNode, NimNode) = + if params != nil and + params.len > 0 and + params[1] != nil and + params[1].kind == nnkIdentDefs: + result = (ident params[1][0].strVal, params[1][1]) + else: + result = (ident "", newNimNode(nnkEmpty)) + +iterator paramsIter*(params: NimNode): tuple[name, ntype: NimNode] = + ## iterators through the parameters + for i in 1 ..< params.len: + let arg = params[i] + let argType = arg[^2] + for j in 0 ..< arg.len-2: + yield (arg[j], argType) + +proc signalTuple*(sig: NimNode): NimNode = + let otp = nnkEmpty.newTree() + # echo "signalObjRaw:sig1: ", sig.treeRepr + let sigTyp = + if sig.kind == nnkSym: sig.getTypeInst + else: sig.getTypeInst + # echo "signalObjRaw:sig2: ", sigTyp.treeRepr + let stp = + if sigTyp.kind == nnkProcTy: + sig.getTypeInst[0] + else: + sigTyp.params() + let isGeneric = false + + # echo "signalObjRaw:obj: ", otp.repr + # echo "signalObjRaw:obj:tr: ", otp.treeRepr + # echo "signalObjRaw:obj:isGen: ", otp.kind == nnkBracketExpr + # echo "signalObjRaw:sig: ", stp.repr + + var args: seq[NimNode] + for i in 2..