refactoring

This commit is contained in:
Jaremy Creechley 2024-02-12 13:44:13 -07:00
parent a6b82b47e8
commit 889a8267c3
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
2 changed files with 18 additions and 13 deletions

View File

@ -15,27 +15,34 @@ export queues
## ##
type type
JobId* = uint ## job id, should match `future.id()`
JobQueue*[T] = ref object JobQueue*[T] = ref object
queue*: SignalQueue[(uint, T)] queue*: SignalQueue[(JobId, T)]
futures*: Table[uint, Future[T]] futures*: Table[JobId, Future[T]]
taskpool*: Taskpool taskpool*: Taskpool
running*: bool running*: bool
JobResult*[T] = object
id*: JobId
queue*: SignalQueue[(JobId, T)]
proc processJobs*(jobs: JobQueue) {.async.} = proc processJobs*(jobs: JobQueue) {.async.} =
while jobs.running: while jobs.running:
echo "jobs running..." echo "jobs running..."
let res = get await jobs.queue.wait() let res = await(jobs.queue.wait()).get()
echo "jobs result: ", res.repr echo "jobs result: ", res.repr
echo "jobs futes: ", jobs.futures.unsafeAddr.pointer.repr, " => ", jobs.futures.keys().toSeq() echo "jobs futes: ", jobs.futures.unsafeAddr.pointer.repr, " => ", jobs.futures.keys().toSeq()
let (id, ret) = res let (id, ret) = res
let fut = jobs.futures[id] let fut = jobs.futures[id]
fut.complete(ret) fut.complete(ret)
proc createFuture*[T](jobs: JobQueue[T], name: static string): (uint, Future[T]) = proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Future[T]) =
let fut = newFuture[T](name) let fut = newFuture[T](name)
jobs.futures[fut.id()] = fut let id = JobId fut.id()
jobs.futures[id] = fut
echo "jobs added: ", jobs.futures.unsafeAddr.pointer.repr, " => ", jobs.futures.keys().toSeq() echo "jobs added: ", jobs.futures.unsafeAddr.pointer.repr, " => ", jobs.futures.keys().toSeq()
return (fut.id(), fut, ) return (JobResult[T](id: id, queue: jobs.queue), fut, )
proc newJobQueue*[T](maxItems: int = 0, taskpool: Taskpool = Taskpool.new()): JobQueue[T] {.raises: [ApatheiaSignalErr].} = proc newJobQueue*[T](maxItems: int = 0, taskpool: Taskpool = Taskpool.new()): JobQueue[T] {.raises: [ApatheiaSignalErr].} =
result = JobQueue[T](queue: newSignalQueue[(uint, T)](maxItems), taskpool: taskpool, running: true) result = JobQueue[T](queue: newSignalQueue[(uint, T)](maxItems), taskpool: taskpool, running: true)
@ -45,17 +52,15 @@ macro submitMacro*(tp: untyped, jobs: untyped, exp: untyped): untyped =
## modifies the call expression to include the job queue and ## modifies the call expression to include the job queue and
## the job id parameters ## the job id parameters
let jobRes = genSym(nskLet, "jobRes")
let futName = genSym(nskLet, "fut") let futName = genSym(nskLet, "fut")
let idName = genSym(nskLet, "id")
let nm = newLit(repr(exp)) let nm = newLit(repr(exp))
let queueExpr = quote do: `jobs`.queue
var fncall = nnkCall.newTree(exp[0]) var fncall = nnkCall.newTree(exp[0])
fncall.add(queueExpr) fncall.add(jobRes)
fncall.add(idName)
for p in exp[1..^1]: fncall.add(p) for p in exp[1..^1]: fncall.add(p)
result = quote do: result = quote do:
let (`idName`, `futName`) = createFuture(`jobs`, `nm`) let (`jobRes`, `futName`) = createFuture(`jobs`, `nm`)
`jobs`.taskpool.spawn(`fncall`) `jobs`.taskpool.spawn(`fncall`)
`futName` `futName`

View File

@ -16,9 +16,9 @@ proc addNumsRaw(a, b: float): float =
echo "adding: ", a, " + ", b echo "adding: ", a, " + ", b
return a + b return a + b
proc addNums(queue: SignalQueue[(uint, float)], id: uint, a, b: float) = proc addNums(jobResult: JobResult[float], a, b: float) =
let res = addNumsRaw(a, b) let res = addNumsRaw(a, b)
discard queue.send((id, res,)) discard jobResult.queue.send((jobResult.id, res,))
suite "async tests": suite "async tests":