diff --git a/src/apatheia/jobs.nim b/src/apatheia/jobs.nim index 4d02438..6c56490 100644 --- a/src/apatheia/jobs.nim +++ b/src/apatheia/jobs.nim @@ -38,9 +38,15 @@ type data*: ptr UncheckedArray[T] size*: int - SeqHolder*[T] = ref object +type + MemoryHolder* = ref object of RootObj + + SeqHolder*[T] = ref object of MemoryHolder data*: seq[T] + StrHolder*[T] = ref object of MemoryHolder + data*: string + template toOpenArray*[T](arr: OpenArrayHolder[T]): auto = system.toOpenArray(arr.data, 0, arr.size) @@ -71,6 +77,19 @@ proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Fu trace "jobs added: ", numberJobs = jobs.futures.len() return (JobResult[T](id: id, queue: jobs.queue), fut, ) +var memoryHolder = newTable[JobId, seq[MemoryHolder]]() + +proc storeMemoryHolder*(id: JobId, mem: MemoryHolder) {.gcsafe, raises: [].} = + {.cast(gcsafe).}: + memoryHolder[].withValue(id, value): + value[].add(mem) + do: + memoryHolder[id] = @[mem] + +proc releaseMemoryHolder*(id: JobId) {.gcsafe, raises: [].} = + {.cast(gcsafe).}: + memoryHolder.del(id) + proc newJobQueue*[T](maxItems: int = 0, taskpool: Taskpool = Taskpool.new()): JobQueue[T] {.raises: [ApatheiaSignalErr].} = ## Creates a new async-compatible threaded job queue. result = JobQueue[T](queue: newSignalQueue[(uint, T)](maxItems), taskpool: taskpool, running: true) @@ -80,6 +99,7 @@ template checkJobArgs*[T](exp: seq[T], fut: untyped): OpenArrayHolder[T] = # static: # echo "checkJobArgs::SEQ: ", $typeof(exp) let rval = SeqHolder[T](data: exp) + storeMemoryHolder(jobId, rval) let expPtr = OpenArrayHolder[T](data: cast[ptr UncheckedArray[T]](unsafeAddr(rval.data[0])), size: rval.data.len()) # fut.addCallback proc(data: pointer) = # ## keep the rval GC object alive for duration of the job