mirror of
https://github.com/logos-storage/apatheia.git
synced 2026-05-03 15:43:35 +00:00
setup jobs
This commit is contained in:
parent
8a36e252fa
commit
98075da2b2
@ -1,6 +1,10 @@
|
|||||||
|
import std/tables
|
||||||
|
import std/macros
|
||||||
|
|
||||||
import ./queues
|
import ./queues
|
||||||
|
|
||||||
import taskpools
|
import taskpools
|
||||||
|
import chronos
|
||||||
|
|
||||||
export queues
|
export queues
|
||||||
|
|
||||||
@ -11,12 +15,47 @@ export queues
|
|||||||
|
|
||||||
type
|
type
|
||||||
JobQueue*[T] = object
|
JobQueue*[T] = object
|
||||||
queue*: SignalQueue[T]
|
queue*: SignalQueue[(uint, T)]
|
||||||
|
futures*: Table[uint, Future[T]]
|
||||||
taskpool*: Taskpool
|
taskpool*: Taskpool
|
||||||
|
running*: bool
|
||||||
|
|
||||||
|
proc processJobs*(jobs: JobQueue) {.async.} =
|
||||||
|
while jobs.running:
|
||||||
|
echo "jobs running..."
|
||||||
|
let res = jobs.queue.wait()
|
||||||
|
echo "jobs result: ", res.repr
|
||||||
|
|
||||||
proc newJobQueue*[T](maxItems: int = 0, taskpool: Taskpool = Taskpool.new()): JobQueue[T] {.raises: [ApatheiaSignalErr].} =
|
proc newJobQueue*[T](maxItems: int = 0, taskpool: Taskpool = Taskpool.new()): JobQueue[T] {.raises: [ApatheiaSignalErr].} =
|
||||||
JobQueue[T](queue: newSignalQueue[T](maxItems), taskpool: taskpool)
|
result = JobQueue[T](queue: newSignalQueue[(uint, T)](maxItems), taskpool: taskpool)
|
||||||
|
asyncSpawn(processJobs(result))
|
||||||
|
|
||||||
template submit*[T](jobs: JobQueue[T], exp: typed): Future[T] =
|
macro submitMacro*(tp: untyped, jobs: untyped, exp: untyped): untyped =
|
||||||
jobs.taskpool.spawn(exp)
|
|
||||||
await wait(jobs.queue)
|
echo "submit:::"
|
||||||
|
echo "submit:T: ", tp.treeRepr
|
||||||
|
echo "submit: ", exp.treerepr
|
||||||
|
|
||||||
|
let futName = genSym(nskLet, "fut")
|
||||||
|
let idName = genSym(nskLet, "id")
|
||||||
|
let queueName = genSym(nskLet, "queue")
|
||||||
|
var fncall = nnkCall.newTree(exp[0])
|
||||||
|
fncall.add(queueName)
|
||||||
|
fncall.add(idName)
|
||||||
|
for p in exp[1..^1]:
|
||||||
|
fncall.add(p)
|
||||||
|
echo "submit: ", fncall.treeRepr
|
||||||
|
|
||||||
|
result = quote do:
|
||||||
|
let `queueName` = jobs.queue
|
||||||
|
let `futName` = newFuture[`tp`](astToStr(`exp`))
|
||||||
|
let `idName` = `futName`.id()
|
||||||
|
`jobs`.futures[`idName`] = `futName`
|
||||||
|
`jobs`.taskpool.spawn(`fncall`)
|
||||||
|
`futName`
|
||||||
|
|
||||||
|
echo "submit: res:\n", result.repr
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
template submit*[T](jobs: JobQueue[T], exp: untyped): Future[T] =
|
||||||
|
submitMacro(T, jobs, exp)
|
||||||
@ -16,9 +16,9 @@ proc addNumsRaw(a, b: float): float =
|
|||||||
echo "adding: ", a, " + ", b
|
echo "adding: ", a, " + ", b
|
||||||
return a + b
|
return a + b
|
||||||
|
|
||||||
proc addNums(queue: SignalQueue[float], a, b: float) =
|
proc addNums(queue: SignalQueue[(uint, float)], id: uint, a, b: float) =
|
||||||
let res = addNumsRaw(a, b)
|
let res = addNumsRaw(a, b)
|
||||||
discard queue.send(res)
|
discard queue.send((id, res,))
|
||||||
|
|
||||||
suite "async tests":
|
suite "async tests":
|
||||||
|
|
||||||
@ -29,7 +29,7 @@ suite "async tests":
|
|||||||
asyncTest "test":
|
asyncTest "test":
|
||||||
|
|
||||||
echo "\nstart"
|
echo "\nstart"
|
||||||
let res = await jobs.submit(addNums(jobs.queue, 1.0, 2.0))
|
let res = await jobs.submit(addNums(1.0, 2.0,))
|
||||||
|
|
||||||
# await sleepAsync(100.milliseconds)
|
# await sleepAsync(100.milliseconds)
|
||||||
echo "result: ", res.repr
|
echo "result: ", res.repr
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user