integrate setups

This commit is contained in:
Jaremy Creechley 2023-09-26 12:51:28 -07:00
parent ca9edb2798
commit d34cbd7df5
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300

View File

@ -72,31 +72,6 @@ proc acquireSignal(): ?!ThreadSignalPtr =
else:
success signal.get()
template dispatchTask[T](self: ThreadDatastore,
signal: ThreadSignalPtr,
blk: untyped
): auto =
var
ctx {.inject.} = TaskCtx[T](signal: signal)
try:
case self.backend.kind:
of Sqlite:
var ds {.inject.} = self.backend.sql
proc runTask() =
`blk`
runTask()
await wait(ctx.signal)
except CancelledError as exc:
trace "Cancelling thread future!", exc = exc.msg
while not ctx.setCancelled():
warn "waiting to cancel thread future!", fn = astToStr(fn)
await sleepAsync(10.milliseconds)
raise exc
finally:
discard ctx.signal.close()
self.semaphore.release()
template executeTask[T](ctx: ptr TaskCtx[T], blk: untyped) =
try:
withLock(ctxLock):
@ -122,6 +97,31 @@ template executeTask[T](ctx: ptr TaskCtx[T], blk: untyped) =
finally:
discard ctx[].signal.fireSync()
template dispatchTask[T](self: ThreadDatastore,
signal: ThreadSignalPtr,
blk: untyped
): auto =
var
ctx {.inject.} = TaskCtx[T](signal: signal)
try:
case self.backend.kind:
of Sqlite:
var ds {.inject.} = self.backend.sql
proc runTask() =
`blk`
runTask()
await wait(ctx.signal)
except CancelledError as exc:
trace "Cancelling thread future!", exc = exc.msg
while not ctx.setCancelled():
warn "waiting to cancel thread future!", fn = astToStr(fn)
await sleepAsync(10.milliseconds)
raise exc
finally:
discard ctx.signal.close()
self.semaphore.release()
proc hasTask[T, DB](ctx: ptr TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} =
## run backend command
executeTask(ctx):
@ -214,15 +214,18 @@ method close*(self: ThreadDatastore): Future[?!void] {.async.} =
of Sqlite:
self.backend.sql.close()
proc queryTask[DB](ctx: ptr TaskCtx, ds: DB;
key: DbQuery[KeyId]) {.gcsafe, nimcall.} =
## run backend command
executeTask(ctx):
let handle = ds.query(q)
query(ds, key)
method query*(
self: ThreadDatastore,
query: Query): Future[?!QueryIter] {.async.} =
without var childIter =? await self.ds.query(query), error:
return failure error
var
iter = QueryIter.new()
lock = newAsyncLock() # serialize querying under threads
proc next(): Future[?!QueryResponse] {.async.} =
@ -242,9 +245,6 @@ method query*(
iter.finished = childIter.finished
var
res = ThreadResult[QueryResponse]()
ctx = TaskCtx[QueryResponse](
ds: self.ds,
res: addr res)
proc runTask() =
self.tp.spawn queryTask(addr ctx, addr childIter)