mirror of
https://github.com/logos-storage/apatheia.git
synced 2026-01-07 15:33:09 +00:00
refactor
This commit is contained in:
parent
a7f443c957
commit
32111ed466
@ -2,6 +2,7 @@ import std/tables
|
|||||||
import std/macros
|
import std/macros
|
||||||
|
|
||||||
import ./queues
|
import ./queues
|
||||||
|
import ./memholders
|
||||||
|
|
||||||
import taskpools
|
import taskpools
|
||||||
import chronos
|
import chronos
|
||||||
@ -38,15 +39,6 @@ type
|
|||||||
data*: ptr UncheckedArray[T]
|
data*: ptr UncheckedArray[T]
|
||||||
size*: int
|
size*: int
|
||||||
|
|
||||||
type
|
|
||||||
MemoryHolder* = ref object of RootObj
|
|
||||||
|
|
||||||
SeqHolder*[T] = ref object of MemoryHolder
|
|
||||||
data*: seq[T]
|
|
||||||
|
|
||||||
StrHolder*[T] = ref object of MemoryHolder
|
|
||||||
data*: string
|
|
||||||
|
|
||||||
template toOpenArray*[T](arr: OpenArrayHolder[T]): auto =
|
template toOpenArray*[T](arr: OpenArrayHolder[T]): auto =
|
||||||
system.toOpenArray(arr.data, 0, arr.size)
|
system.toOpenArray(arr.data, 0, arr.size)
|
||||||
|
|
||||||
@ -77,19 +69,6 @@ proc createFuture*[T](jobs: JobQueue[T], name: static string): (JobResult[T], Fu
|
|||||||
trace "jobs added: ", numberJobs = jobs.futures.len()
|
trace "jobs added: ", numberJobs = jobs.futures.len()
|
||||||
return (JobResult[T](id: id, queue: jobs.queue), fut, )
|
return (JobResult[T](id: id, queue: jobs.queue), fut, )
|
||||||
|
|
||||||
var memoryHolder = newTable[JobId, seq[MemoryHolder]]()
|
|
||||||
|
|
||||||
proc storeMemoryHolder*(id: JobId, mem: MemoryHolder) {.gcsafe, raises: [].} =
|
|
||||||
{.cast(gcsafe).}:
|
|
||||||
memoryHolder[].withValue(id, value):
|
|
||||||
value[].add(mem)
|
|
||||||
do:
|
|
||||||
memoryHolder[id] = @[mem]
|
|
||||||
|
|
||||||
proc releaseMemoryHolder*(id: JobId) {.gcsafe, raises: [].} =
|
|
||||||
{.cast(gcsafe).}:
|
|
||||||
memoryHolder.del(id)
|
|
||||||
|
|
||||||
proc newJobQueue*[T](maxItems: int = 0, taskpool: Taskpool = Taskpool.new()): JobQueue[T] {.raises: [ApatheiaSignalErr].} =
|
proc newJobQueue*[T](maxItems: int = 0, taskpool: Taskpool = Taskpool.new()): JobQueue[T] {.raises: [ApatheiaSignalErr].} =
|
||||||
## Creates a new async-compatible threaded job queue.
|
## Creates a new async-compatible threaded job queue.
|
||||||
result = JobQueue[T](queue: newSignalQueue[(uint, T)](maxItems), taskpool: taskpool, running: true)
|
result = JobQueue[T](queue: newSignalQueue[(uint, T)](maxItems), taskpool: taskpool, running: true)
|
||||||
@ -99,13 +78,8 @@ template checkJobArgs*[T](exp: seq[T], fut: untyped): OpenArrayHolder[T] =
|
|||||||
# static:
|
# static:
|
||||||
# echo "checkJobArgs::SEQ: ", $typeof(exp)
|
# echo "checkJobArgs::SEQ: ", $typeof(exp)
|
||||||
let rval = SeqHolder[T](data: exp)
|
let rval = SeqHolder[T](data: exp)
|
||||||
storeMemoryHolder(jobId, rval)
|
storeMemoryHolder(fut.id().JobId, rval)
|
||||||
let expPtr = OpenArrayHolder[T](data: cast[ptr UncheckedArray[T]](unsafeAddr(rval.data[0])), size: rval.data.len())
|
let expPtr = OpenArrayHolder[T](data: cast[ptr UncheckedArray[T]](unsafeAddr(rval.data[0])), size: rval.data.len())
|
||||||
# fut.addCallback proc(data: pointer) =
|
|
||||||
# ## keep the rval GC object alive for duration of the job
|
|
||||||
# discard rval.data.len()
|
|
||||||
# echo "FREE RVaL: ", rval.data.len()
|
|
||||||
## TODO: how to handle cancellations?
|
|
||||||
expPtr
|
expPtr
|
||||||
|
|
||||||
|
|
||||||
@ -129,7 +103,7 @@ macro submitMacro(tp: untyped, jobs: untyped, exp: untyped): untyped =
|
|||||||
echo "CHECK ARGS: ", p.treeRepr
|
echo "CHECK ARGS: ", p.treeRepr
|
||||||
let id = ident "arg" & $i
|
let id = ident "arg" & $i
|
||||||
argids.add(id)
|
argids.add(id)
|
||||||
let pn = nnkCall.newTree(ident"checkJobArgs", p, nil)
|
let pn = nnkCall.newTree(ident"checkJobArgs", p, `futName`)
|
||||||
letargs.add nnkIdentDefs.newTree( id, newEmptyNode(), pn)
|
letargs.add nnkIdentDefs.newTree( id, newEmptyNode(), pn)
|
||||||
# fncall.add(nnkCall.newTree(ident"checkJobArgs", p, `futName`))
|
# fncall.add(nnkCall.newTree(ident"checkJobArgs", p, `futName`))
|
||||||
echo "\nSUBMIT: ARGS: LET:\n", letargs.repr
|
echo "\nSUBMIT: ARGS: LET:\n", letargs.repr
|
||||||
@ -145,8 +119,8 @@ macro submitMacro(tp: untyped, jobs: untyped, exp: untyped): untyped =
|
|||||||
|
|
||||||
result = quote do:
|
result = quote do:
|
||||||
block:
|
block:
|
||||||
`letargs`
|
|
||||||
let (`jobRes`, `futName`) = createFuture(`jobs`, `nm`)
|
let (`jobRes`, `futName`) = createFuture(`jobs`, `nm`)
|
||||||
|
`letargs`
|
||||||
when typeof(`fncall`) isnot void:
|
when typeof(`fncall`) isnot void:
|
||||||
{.error: "Apatheia jobs cannot return values. The given proc returns type: " & $(typeof(`fncall`)) &
|
{.error: "Apatheia jobs cannot return values. The given proc returns type: " & $(typeof(`fncall`)) &
|
||||||
" for call " & astToStr(`fncall`).}
|
" for call " & astToStr(`fncall`).}
|
||||||
|
|||||||
24
src/apatheia/memholders.nim
Normal file
24
src/apatheia/memholders.nim
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
|
||||||
|
import std/tables
|
||||||
|
|
||||||
|
type
|
||||||
|
MemHolder* = ref object of RootObj
|
||||||
|
|
||||||
|
SeqHolder*[T] = ref object of MemHolder
|
||||||
|
data*: seq[T]
|
||||||
|
|
||||||
|
StrHolder*[T] = ref object of MemHolder
|
||||||
|
data*: string
|
||||||
|
|
||||||
|
var memHolderTable = newTable[uint, seq[MemHolder]]()
|
||||||
|
|
||||||
|
proc storeMemoryHolder*[T: uint](id: T, mem: MemHolder) {.gcsafe, raises: [].} =
|
||||||
|
{.cast(gcsafe).}:
|
||||||
|
memHolderTable[].withValue(id, value):
|
||||||
|
value[].add(mem)
|
||||||
|
do:
|
||||||
|
memHolderTable[id] = @[mem]
|
||||||
|
|
||||||
|
proc releaseMemoryHolder*[T: uint](id: T) {.gcsafe, raises: [].} =
|
||||||
|
{.cast(gcsafe).}:
|
||||||
|
memHolderTable.del(id)
|
||||||
Loading…
x
Reference in New Issue
Block a user