mirror of
https://github.com/logos-storage/apatheia.git
synced 2026-01-03 05:23:09 +00:00
trying to setup a jobs flag
This commit is contained in:
parent
9f984b21d1
commit
2de80356b2
@ -1,10 +1,12 @@
|
||||
import std/tables
|
||||
import std/macros
|
||||
import std/locks
|
||||
|
||||
import ./queues
|
||||
import ./memretainers
|
||||
|
||||
import taskpools
|
||||
import threading/smartptrs
|
||||
import chronos
|
||||
import chronicles
|
||||
|
||||
@ -33,13 +35,19 @@ logScope:
|
||||
type
|
||||
JobQueue*[T] = ref object ## job queue object
|
||||
queue*: SignalQueue[(JobId, T)]
|
||||
futures*: Table[JobId, Future[T]]
|
||||
futures*: Table[JobId, (Future[T], SharedPtr[JobFlags])]
|
||||
taskpool*: Taskpool
|
||||
running*: bool
|
||||
|
||||
JobFlags* = object ## hold the result of a job after it finishes
|
||||
lock*: Lock
|
||||
isRunning*: bool
|
||||
isCancelled*: bool
|
||||
|
||||
JobResult*[T] = object ## hold the result of a job after it finishes
|
||||
id*: JobId
|
||||
queue*: SignalQueue[(JobId, T)]
|
||||
flags*: SharedPtr[JobFlags]
|
||||
|
||||
OpenArrayHolder*[T] = object
|
||||
data*: ptr UncheckedArray[T]
|
||||
@ -65,7 +73,9 @@ proc processJobs*[T](jobs: JobQueue[T]) {.async.} =
|
||||
let (id, ret) = await(jobs.queue.wait()).get()
|
||||
trace "got job result", jobId = id
|
||||
releaseMemory(id) # always release any retained memory
|
||||
if (var fut: Future[T]; jobs.futures.pop(id, fut)):
|
||||
var futTuple: (Future[T], SharedPtr[JobFlags])
|
||||
if jobs.futures.pop(id, futTuple):
|
||||
let fut = futTuple[0]
|
||||
if not fut.finished():
|
||||
fut.complete(ret)
|
||||
else:
|
||||
@ -76,9 +86,10 @@ proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Fu
|
||||
## Creates a future that returns the result of the associated job.
|
||||
let fut = newFuture[T](name)
|
||||
let id = fut.jobId()
|
||||
jobs.futures[id] = fut
|
||||
let flags = newSharedPtr(JobFlags)
|
||||
jobs.futures[id] = (fut, flags)
|
||||
trace "job added: ", numberJobs = jobs.futures.len()
|
||||
return (JobResult[T](id: id, queue: jobs.queue), fut)
|
||||
return (JobResult[T](id: id, queue: jobs.queue, flags: flags), fut)
|
||||
|
||||
proc newJobQueue*[T](
|
||||
maxItems: int = 0, taskpool: Taskpool = Taskpool.new()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user