mirror of
https://github.com/logos-storage/apatheia.git
synced 2026-01-04 05:53:11 +00:00
cleanup futures
This commit is contained in:
parent
e86a7f6727
commit
d75c2d8a19
@ -9,33 +9,43 @@ import chronos
|
|||||||
|
|
||||||
export queues
|
export queues
|
||||||
|
|
||||||
## TODO:
|
## This module provides a simple way to submit jobs to taskpools
|
||||||
## setup queue to tie together future and result for a specific instance
|
## and getting a result returned via an async future.
|
||||||
## this setup will result in out-of-order results
|
##
|
||||||
##
|
##
|
||||||
|
|
||||||
type
|
type
|
||||||
JobId* = uint ## job id, should match `future.id()`
|
JobId* = uint ## job id, should match `future.id()`
|
||||||
|
|
||||||
JobQueue*[T] = ref object
|
JobQueue*[T] = ref object
|
||||||
|
## job queue object
|
||||||
queue*: SignalQueue[(JobId, T)]
|
queue*: SignalQueue[(JobId, T)]
|
||||||
futures*: Table[JobId, Future[T]]
|
futures*: Table[JobId, Future[T]]
|
||||||
taskpool*: Taskpool
|
taskpool*: Taskpool
|
||||||
running*: bool
|
running*: bool
|
||||||
|
|
||||||
JobResult*[T] = object
|
JobResult*[T] = object
|
||||||
|
## hold a job result to be returned by jobs
|
||||||
id*: JobId
|
id*: JobId
|
||||||
queue*: SignalQueue[(JobId, T)]
|
queue*: SignalQueue[(JobId, T)]
|
||||||
|
|
||||||
proc processJobs*(jobs: JobQueue) {.async.} =
|
proc processJobs*[T](jobs: JobQueue[T]) {.async.} =
|
||||||
|
## Starts a "detached" async processor for a given job queue.
|
||||||
|
##
|
||||||
|
## This processor waits for events from the queue in the JobQueue
|
||||||
|
## and complete the associated futures.
|
||||||
|
|
||||||
while jobs.running:
|
while jobs.running:
|
||||||
echo "jobs running..."
|
echo "jobs running..."
|
||||||
let res = await(jobs.queue.wait()).get()
|
let res = await(jobs.queue.wait()).get()
|
||||||
echo "jobs result: ", res.repr
|
echo "jobs result: ", res.repr
|
||||||
echo "jobs futes: ", jobs.futures.unsafeAddr.pointer.repr, " => ", jobs.futures.keys().toSeq()
|
echo "jobs futes: ", jobs.futures.unsafeAddr.pointer.repr, " => ", jobs.futures.keys().toSeq()
|
||||||
let (id, ret) = res
|
let (id, ret) = res
|
||||||
let fut = jobs.futures[id]
|
var fut: Future[T]
|
||||||
fut.complete(ret)
|
if jobs.futures.pop(id, fut):
|
||||||
|
fut.complete(ret)
|
||||||
|
else:
|
||||||
|
raise newException(IndexDefect, "missing future: " & $id)
|
||||||
|
|
||||||
proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Future[T]) =
|
proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Future[T]) =
|
||||||
let fut = newFuture[T](name)
|
let fut = newFuture[T](name)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user