mirror of
https://github.com/logos-storage/apatheia.git
synced 2026-01-02 13:03:11 +00:00
format all
This commit is contained in:
parent
d417cdec04
commit
1dfdbe8e1f
@ -1,3 +1,2 @@
|
||||
|
||||
import apatheia/tasks
|
||||
export tasks
|
||||
|
||||
@ -159,8 +159,7 @@ when isMainModule:
|
||||
var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads.
|
||||
|
||||
asyncTest "basic openarray":
|
||||
var
|
||||
jobs = newJobQueue[float](taskpool = tp)
|
||||
var jobs = newJobQueue[float](taskpool = tp)
|
||||
|
||||
let job = jobs.submit(addNumValues(10.0, @[1.0.float, 2.0]))
|
||||
let res = await job
|
||||
|
||||
@ -3,22 +3,19 @@ import std/[tables, strutils, typetraits, macros]
|
||||
proc makeProcName*(s: string): string =
|
||||
result = ""
|
||||
for c in s:
|
||||
if c.isAlphaNumeric: result.add c
|
||||
if c.isAlphaNumeric:
|
||||
result.add c
|
||||
|
||||
proc hasReturnType*(params: NimNode): bool =
|
||||
if params != nil and params.len > 0 and params[0] != nil and
|
||||
params[0].kind != nnkEmpty:
|
||||
if params != nil and params.len > 0 and params[0] != nil and params[0].kind != nnkEmpty:
|
||||
result = true
|
||||
|
||||
proc getReturnType*(params: NimNode): NimNode =
|
||||
if params != nil and params.len > 0 and params[0] != nil and
|
||||
params[0].kind != nnkEmpty:
|
||||
if params != nil and params.len > 0 and params[0] != nil and params[0].kind != nnkEmpty:
|
||||
result = params[0]
|
||||
|
||||
proc firstArgument*(params: NimNode): (NimNode, NimNode) =
|
||||
if params != nil and
|
||||
params.len > 0 and
|
||||
params[1] != nil and
|
||||
if params != nil and params.len > 0 and params[1] != nil and
|
||||
params[1].kind == nnkIdentDefs:
|
||||
result = (ident params[1][0].strVal, params[1][1])
|
||||
else:
|
||||
@ -29,15 +26,13 @@ iterator paramsIter*(params: NimNode): tuple[name, ntype: NimNode] =
|
||||
for i in 1 ..< params.len:
|
||||
let arg = params[i]
|
||||
let argType = arg[^2]
|
||||
for j in 0 ..< arg.len-2:
|
||||
for j in 0 ..< arg.len - 2:
|
||||
yield (arg[j], argType)
|
||||
|
||||
proc signalTuple*(sig: NimNode): NimNode =
|
||||
let otp = nnkEmpty.newTree()
|
||||
# echo "signalObjRaw:sig1: ", sig.treeRepr
|
||||
let sigTyp =
|
||||
if sig.kind == nnkSym: sig.getTypeInst
|
||||
else: sig.getTypeInst
|
||||
let sigTyp = if sig.kind == nnkSym: sig.getTypeInst else: sig.getTypeInst
|
||||
# echo "signalObjRaw:sig2: ", sigTyp.treeRepr
|
||||
let stp =
|
||||
if sigTyp.kind == nnkProcTy:
|
||||
@ -52,14 +47,16 @@ proc signalTuple*(sig: NimNode): NimNode =
|
||||
# echo "signalObjRaw:sig: ", stp.repr
|
||||
|
||||
var args: seq[NimNode]
|
||||
for i in 2..<stp.len:
|
||||
for i in 2 ..< stp.len:
|
||||
args.add stp[i]
|
||||
|
||||
result = nnkTupleConstr.newTree()
|
||||
if isGeneric:
|
||||
template genArgs(n): auto = n[1][1]
|
||||
template genArgs(n): auto =
|
||||
n[1][1]
|
||||
|
||||
var genKinds: Table[string, NimNode]
|
||||
for i in 1..<stp.genArgs.len:
|
||||
for i in 1 ..< stp.genArgs.len:
|
||||
genKinds[repr stp.genArgs[i]] = otp[i]
|
||||
for arg in args:
|
||||
result.add genKinds[arg[1].repr]
|
||||
@ -72,19 +69,20 @@ proc signalTuple*(sig: NimNode): NimNode =
|
||||
# echo ""
|
||||
if result.len == 0:
|
||||
# result = bindSym"void"
|
||||
result = quote do:
|
||||
result = quote:
|
||||
tuple[]
|
||||
|
||||
proc mkParamsVars*(paramsIdent, paramsType, params: NimNode): NimNode =
|
||||
## Create local variables for each parameter in the actual RPC call proc
|
||||
if params.isNil: return
|
||||
if params.isNil:
|
||||
return
|
||||
|
||||
result = newStmtList()
|
||||
var varList = newSeq[NimNode]()
|
||||
var cnt = 0
|
||||
for paramid, paramType in paramsIter(params):
|
||||
let idx = newIntLitNode(cnt)
|
||||
let vars = quote do:
|
||||
let vars = quote:
|
||||
var `paramid`: `paramType` = `paramsIdent`[`idx`]
|
||||
varList.add vars
|
||||
cnt.inc()
|
||||
@ -106,9 +104,10 @@ proc mkParamsType*(paramsIdent, paramsType, params, genericParams: NimNode): Nim
|
||||
##
|
||||
## proc multiplyrpc(params: RpcType_multiplyrpc): int =
|
||||
##
|
||||
if params.isNil: return
|
||||
if params.isNil:
|
||||
return
|
||||
|
||||
var tup = quote do:
|
||||
var tup = quote:
|
||||
type `paramsType` = tuple[]
|
||||
for paramIdent, paramType in paramsIter(params):
|
||||
# processing multiple variables of one type
|
||||
@ -129,7 +128,8 @@ proc procIdentAppend*(id: NimNode, name: string): NimNode =
|
||||
|
||||
proc mkCall*(callName, params: NimNode): NimNode =
|
||||
## Create local variables for each parameter in the actual RPC call proc
|
||||
if params.isNil: return
|
||||
if params.isNil:
|
||||
return
|
||||
var argList = newSeq[NimNode]()
|
||||
for paramId, paramType in paramsIter(params):
|
||||
argList.add paramId
|
||||
@ -138,9 +138,10 @@ proc mkCall*(callName, params: NimNode): NimNode =
|
||||
|
||||
proc mkProc*(name, params, body: NimNode): NimNode =
|
||||
let args = params.copyNimTree()
|
||||
result = quote do:
|
||||
result = quote:
|
||||
proc `name`() {.nimcall.} =
|
||||
`body`
|
||||
|
||||
result[3].del(0)
|
||||
for arg in args:
|
||||
result.params.add arg
|
||||
|
||||
@ -1,4 +1,3 @@
|
||||
|
||||
import std/tables
|
||||
|
||||
import ./types
|
||||
|
||||
@ -10,23 +10,23 @@ export options
|
||||
export threadsync
|
||||
export chronos
|
||||
|
||||
type
|
||||
ChanPtr[T] = ptr Channel[T]
|
||||
type ChanPtr[T] = ptr Channel[T]
|
||||
|
||||
proc allocSharedChannel[T](): ChanPtr[T] =
|
||||
cast[ChanPtr[T]](allocShared0(sizeof(Channel[T])))
|
||||
|
||||
type
|
||||
SignalQueue*[T] = object
|
||||
signal: ThreadSignalPtr
|
||||
chan*: ChanPtr[T]
|
||||
type SignalQueue*[T] = object
|
||||
signal: ThreadSignalPtr
|
||||
chan*: ChanPtr[T]
|
||||
|
||||
proc dispose*[T](val: SignalQueue[T]) =
|
||||
## Call to properly dispose of a SignalQueue.
|
||||
deallocShared(val.chan)
|
||||
discard val.signal.close()
|
||||
|
||||
proc newSignalQueue*[T](maxItems: int = 0): SignalQueue[T] {.raises: [ApatheiaSignalErr].} =
|
||||
proc newSignalQueue*[T](
|
||||
maxItems: int = 0
|
||||
): SignalQueue[T] {.raises: [ApatheiaSignalErr].} =
|
||||
## Create a signal queue compatible with Chronos async.
|
||||
let res = ThreadSignalPtr.new()
|
||||
if res.isErr():
|
||||
|
||||
@ -1,4 +1,3 @@
|
||||
|
||||
import std/[macros, strutils]
|
||||
|
||||
import macroutils
|
||||
@ -30,30 +29,30 @@ macro asyncTask*(p: untyped): untyped =
|
||||
# pragmas = p[4]
|
||||
body = p[6]
|
||||
name = repr(procId).strip(false, true, {'*'})
|
||||
|
||||
|
||||
if not hasReturnType(params):
|
||||
error("tasklet definition must have return type", p)
|
||||
|
||||
# setup inner tasklet proc
|
||||
let tp = mkProc(procId.procIdentAppend("Tasklet"),
|
||||
params, body)
|
||||
let tp = mkProc(procId.procIdentAppend("Tasklet"), params, body)
|
||||
|
||||
# setup async wrapper code
|
||||
var asyncBody = newStmtList()
|
||||
let tcall = newCall(ident(name & "Tasklet"))
|
||||
for paramId, paramType in paramsIter(params):
|
||||
tcall.add newCall("checkParamType", paramId)
|
||||
asyncBody = quote do:
|
||||
asyncBody = quote:
|
||||
let val {.inject.} = `tcall`
|
||||
discard jobResult.queue.send((jobResult.id, val,))
|
||||
discard jobResult.queue.send((jobResult.id, val))
|
||||
|
||||
var asyncParams = params.copyNimTree()
|
||||
let retType = if not hasReturnType(params): ident"void"
|
||||
else: params.getReturnType()
|
||||
let retType =
|
||||
if not hasReturnType(params):
|
||||
ident"void"
|
||||
else:
|
||||
params.getReturnType()
|
||||
let jobArg = nnkIdentDefs.newTree(
|
||||
ident"jobResult",
|
||||
nnkBracketExpr.newTree(ident"JobResult", retType),
|
||||
newEmptyNode()
|
||||
ident"jobResult", nnkBracketExpr.newTree(ident"JobResult", retType), newEmptyNode()
|
||||
)
|
||||
asyncParams[0] = newEmptyNode()
|
||||
asyncParams.insert(1, jobArg)
|
||||
@ -67,13 +66,8 @@ macro asyncTask*(p: untyped): untyped =
|
||||
echo "asyncTask:body:\n", result.repr
|
||||
|
||||
when isMainModule:
|
||||
type HashOptions* = object
|
||||
striped*: bool
|
||||
|
||||
type
|
||||
HashOptions* = object
|
||||
striped*: bool
|
||||
|
||||
proc doHashes2*(data: openArray[byte],
|
||||
opts: HashOptions): float {.asyncTask.} =
|
||||
proc doHashes2*(data: openArray[byte], opts: HashOptions): float {.asyncTask.} =
|
||||
echo "hashing"
|
||||
|
||||
|
||||
|
||||
@ -1,6 +1,5 @@
|
||||
|
||||
type
|
||||
ApatheiaException* = object of CatchableError
|
||||
ApatheiaSignalErr* = object of ApatheiaException
|
||||
|
||||
JobId* = uint ## job id, should match `future.id()`
|
||||
JobId* = uint ## job id, should match `future.id()`
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user