This commit is contained in:
Jaremy Creechley 2024-02-16 14:05:57 -07:00
parent 32111ed466
commit 8becb6ab5b
2 changed files with 7 additions and 4 deletions

View File

@ -42,6 +42,8 @@ type
template toOpenArray*[T](arr: OpenArrayHolder[T]): auto = template toOpenArray*[T](arr: OpenArrayHolder[T]): auto =
system.toOpenArray(arr.data, 0, arr.size) system.toOpenArray(arr.data, 0, arr.size)
func jobId*[T](fut: Future[T]): JobId = JobId fut.id()
proc processJobs*[T](jobs: JobQueue[T]) {.async.} = proc processJobs*[T](jobs: JobQueue[T]) {.async.} =
## Starts a "detached" async processor for a given job queue. ## Starts a "detached" async processor for a given job queue.
## ##
@ -54,6 +56,7 @@ proc processJobs*[T](jobs: JobQueue[T]) {.async.} =
let res = await(jobs.queue.wait()).get() let res = await(jobs.queue.wait()).get()
trace "got job result", jobResult = $res trace "got job result", jobResult = $res
let (id, ret) = res let (id, ret) = res
releaseMemory(id) # release any retained memory
var fut: Future[T] var fut: Future[T]
if jobs.futures.pop(id, fut): if jobs.futures.pop(id, fut):
fut.complete(ret) fut.complete(ret)
@ -64,7 +67,7 @@ proc processJobs*[T](jobs: JobQueue[T]) {.async.} =
proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Future[T]) = proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Future[T]) =
## Creates a future that returns the result of the associated job. ## Creates a future that returns the result of the associated job.
let fut = newFuture[T](name) let fut = newFuture[T](name)
let id = JobId fut.id() let id = fut.jobId()
jobs.futures[id] = fut jobs.futures[id] = fut
trace "jobs added: ", numberJobs = jobs.futures.len() trace "jobs added: ", numberJobs = jobs.futures.len()
return (JobResult[T](id: id, queue: jobs.queue), fut, ) return (JobResult[T](id: id, queue: jobs.queue), fut, )
@ -78,7 +81,7 @@ template checkJobArgs*[T](exp: seq[T], fut: untyped): OpenArrayHolder[T] =
# static: # static:
# echo "checkJobArgs::SEQ: ", $typeof(exp) # echo "checkJobArgs::SEQ: ", $typeof(exp)
let rval = SeqHolder[T](data: exp) let rval = SeqHolder[T](data: exp)
storeMemoryHolder(fut.id().JobId, rval) fut.jobId().retainMemory(rval)
let expPtr = OpenArrayHolder[T](data: cast[ptr UncheckedArray[T]](unsafeAddr(rval.data[0])), size: rval.data.len()) let expPtr = OpenArrayHolder[T](data: cast[ptr UncheckedArray[T]](unsafeAddr(rval.data[0])), size: rval.data.len())
expPtr expPtr

View File

@ -12,13 +12,13 @@ type
var memHolderTable = newTable[uint, seq[MemHolder]]() var memHolderTable = newTable[uint, seq[MemHolder]]()
proc storeMemoryHolder*[T: uint](id: T, mem: MemHolder) {.gcsafe, raises: [].} = proc retainMemory*[T: uint](id: T, mem: MemHolder) {.gcsafe, raises: [].} =
{.cast(gcsafe).}: {.cast(gcsafe).}:
memHolderTable[].withValue(id, value): memHolderTable[].withValue(id, value):
value[].add(mem) value[].add(mem)
do: do:
memHolderTable[id] = @[mem] memHolderTable[id] = @[mem]
proc releaseMemoryHolder*[T: uint](id: T) {.gcsafe, raises: [].} = proc releaseMemory*[T: uint](id: T) {.gcsafe, raises: [].} =
{.cast(gcsafe).}: {.cast(gcsafe).}:
memHolderTable.del(id) memHolderTable.del(id)