setup query end

This commit is contained in:
Jaremy Creechley 2023-09-26 14:53:12 -07:00
parent 33ca4346e4
commit 173d42631a
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300

View File

@ -15,6 +15,7 @@ import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/taskpools import pkg/taskpools
import pkg/chronicles import pkg/chronicles
import pkg/threading/smartptrs
import ../key import ../key
import ../query import ../query
@ -42,12 +43,14 @@ type
of Sqlite: of Sqlite:
sql*: SQLiteBackend[KeyId,DataBuffer] sql*: SQLiteBackend[KeyId,DataBuffer]
TaskCtx[T: ThreadTypes] = object TaskCtxObj[T: ThreadTypes] = object
res: ThreadResult[T] res: ThreadResult[T]
signal: ThreadSignalPtr signal: ThreadSignalPtr
running: bool running: bool
cancelled: bool cancelled: bool
TaskCtx[T] = SharedPtr[TaskCtxObj[T]]
ThreadDatastore* = ref object of Datastore ThreadDatastore* = ref object of Datastore
tp: Taskpool tp: Taskpool
backend: ThreadBackend backend: ThreadBackend
@ -57,22 +60,22 @@ type
var ctxLock: Lock var ctxLock: Lock
ctxLock.initLock() ctxLock.initLock()
proc setCancelled(ctx: var TaskCtx): bool = proc setCancelled[T](ctx: TaskCtx[T]): bool =
withLock(ctxLock): withLock(ctxLock):
if ctx.running: if ctx[].running:
return false return false
else: else:
ctx.cancelled = true ctx[].cancelled = true
return true return true
proc setRunning[T](ctx: ptr TaskCtx[T]): bool = proc setRunning[T](ctx: TaskCtx[T]): bool =
withLock(ctxLock): withLock(ctxLock):
if ctx.cancelled: if ctx[].cancelled:
return return
ctx.running = true ctx[].running = true
proc setDone[T](ctx: ptr TaskCtx[T]) = proc setDone[T](ctx: TaskCtx[T]) =
withLock(ctxLock): withLock(ctxLock):
ctx.running = false ctx[].running = false
proc acquireSignal(): ?!ThreadSignalPtr = proc acquireSignal(): ?!ThreadSignalPtr =
let signal = ThreadSignalPtr.new() let signal = ThreadSignalPtr.new()
@ -81,7 +84,7 @@ proc acquireSignal(): ?!ThreadSignalPtr =
else: else:
success signal.get() success signal.get()
template executeTask[T](ctx: ptr TaskCtx[T], blk: untyped) = template executeTask[T](ctx: TaskCtx[T], blk: untyped) =
try: try:
if not ctx.setRunning(): if not ctx.setRunning():
return return
@ -90,11 +93,11 @@ template executeTask[T](ctx: ptr TaskCtx[T], blk: untyped) =
let res = `blk` let res = `blk`
if res.isOk(): if res.isOk():
when T is void: when T is void:
ctx.res.ok() ctx[].res.ok()
else: else:
ctx.res.ok(res.get()) ctx[].res.ok(res.get())
else: else:
ctx.res.err res.error().toThreadErr() ctx[].res.err res.error().toThreadErr()
except CatchableError as exc: except CatchableError as exc:
trace "Unexpected exception thrown in async task", exc = exc.msg trace "Unexpected exception thrown in async task", exc = exc.msg
ctx[].res.err exc.toThreadErr() ctx[].res.err exc.toThreadErr()
@ -110,7 +113,7 @@ template dispatchTask[T](self: ThreadDatastore,
blk: untyped blk: untyped
): auto = ): auto =
var var
ctx {.inject.} = TaskCtx[T](signal: signal) ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal))
try: try:
case self.backend.kind: case self.backend.kind:
of Sqlite: of Sqlite:
@ -119,7 +122,7 @@ template dispatchTask[T](self: ThreadDatastore,
`blk` `blk`
runTask() runTask()
await wait(ctx.signal) await wait(ctx[].signal)
except CancelledError as exc: except CancelledError as exc:
trace "Cancelling thread future!", exc = exc.msg trace "Cancelling thread future!", exc = exc.msg
while not ctx.setCancelled(): while not ctx.setCancelled():
@ -127,10 +130,10 @@ template dispatchTask[T](self: ThreadDatastore,
await sleepAsync(10.milliseconds) await sleepAsync(10.milliseconds)
raise exc raise exc
finally: finally:
discard ctx.signal.close() discard ctx[].signal.close()
self.semaphore.release() self.semaphore.release()
proc hasTask[T, DB](ctx: ptr TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} = proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} =
## run backend command ## run backend command
executeTask(ctx): executeTask(ctx):
has(ds, key) has(ds, key)
@ -143,9 +146,9 @@ method has*(self: ThreadDatastore,
let key = KeyId.new key.id() let key = KeyId.new key.id()
dispatchTask[bool](self, signal): dispatchTask[bool](self, signal):
self.tp.spawn hasTask(addr ctx, ds, key) self.tp.spawn hasTask(ctx, ds, key)
proc deleteTask[T, DB](ctx: ptr TaskCtx[T], ds: DB; proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB;
key: KeyId) {.gcsafe.} = key: KeyId) {.gcsafe.} =
## run backend command ## run backend command
executeTask(ctx): executeTask(ctx):
@ -159,7 +162,7 @@ method delete*(self: ThreadDatastore,
let key = KeyId.new key.id() let key = KeyId.new key.id()
dispatchTask[void](self, signal): dispatchTask[void](self, signal):
self.tp.spawn deleteTask(addr ctx, ds, key) self.tp.spawn deleteTask(ctx, ds, key)
method delete*(self: ThreadDatastore, method delete*(self: ThreadDatastore,
keys: seq[Key]): Future[?!void] {.async.} = keys: seq[Key]): Future[?!void] {.async.} =
@ -170,7 +173,7 @@ method delete*(self: ThreadDatastore,
return success() return success()
proc putTask[T, DB](ctx: ptr TaskCtx[T], ds: DB; proc putTask[T, DB](ctx: TaskCtx[T], ds: DB;
key: KeyId, key: KeyId,
data: DataBuffer) {.gcsafe, nimcall.} = data: DataBuffer) {.gcsafe, nimcall.} =
## run backend command ## run backend command
@ -187,7 +190,7 @@ method put*(self: ThreadDatastore,
let key = KeyId.new key.id() let key = KeyId.new key.id()
let data = DataBuffer.new data let data = DataBuffer.new data
dispatchTask[void](self, signal): dispatchTask[void](self, signal):
self.tp.spawn putTask(addr ctx, ds, key, data) self.tp.spawn putTask(ctx, ds, key, data)
method put*( method put*(
self: ThreadDatastore, self: ThreadDatastore,
@ -199,7 +202,7 @@ method put*(
return success() return success()
proc getTask[T, DB](ctx: ptr TaskCtx[T], ds: DB; proc getTask[T, DB](ctx: TaskCtx[T], ds: DB;
key: KeyId) {.gcsafe, nimcall.} = key: KeyId) {.gcsafe, nimcall.} =
## run backend command ## run backend command
executeTask(ctx): executeTask(ctx):
@ -214,7 +217,7 @@ method get*(self: ThreadDatastore,
let key = KeyId.new key.id() let key = KeyId.new key.id()
dispatchTask[void](self, signal): dispatchTask[void](self, signal):
self.tp.spawn getTask(addr ctx, ds, key) self.tp.spawn getTask(ctx, ds, key)
method close*(self: ThreadDatastore): Future[?!void] {.async.} = method close*(self: ThreadDatastore): Future[?!void] {.async.} =
await self.semaphore.closeAll() await self.semaphore.closeAll()
@ -226,7 +229,7 @@ type
QResult = DbQueryResponse[KeyId, DataBuffer] QResult = DbQueryResponse[KeyId, DataBuffer]
proc queryTask[DB]( proc queryTask[DB](
ctx: ptr TaskCtx[QResult], ctx: TaskCtx[QResult],
ds: DB, ds: DB,
dq: DbQuery[KeyId] dq: DbQuery[KeyId]
) {.gcsafe, nimcall.} = ) {.gcsafe, nimcall.} =
@ -266,7 +269,7 @@ method query*(
) )
dispatchTask[DbQueryResponse[KeyId, DataBuffer]](self, signal): dispatchTask[DbQueryResponse[KeyId, DataBuffer]](self, signal):
self.tp.spawn queryTask(addr ctx, ds, dq) self.tp.spawn queryTask(ctx, ds, dq)
var var
lock = newAsyncLock() # serialize querying under threads lock = newAsyncLock() # serialize querying under threads
@ -286,16 +289,16 @@ method query*(
await lock.acquire() await lock.acquire()
dispatchTask[void](self, signal): dispatchTask[void](self, signal):
discard ctx.signal.fireSync() discard ctx[].signal.fireSync()
if ctx.res.isErr() and ctx.res.error()[0] == ErrorEnum.QueryEndedErr: if ctx[].res.isErr() and ctx[].res.error()[0] == ErrorEnum.QueryEndedErr:
iter.finished = true iter.finished = true
return return
elif ctx.res.isErr(): elif ctx[].res.isErr():
return err(ctx.res.error()) return err(ctx[].res.error())
else: else:
let qres = ctx.res.get() # let qres = ctx[].res.get()
return ok(default) return (?!QueryResponse).ok(default(QueryResponse))
iter.next = next iter.next = next
return success iter return success iter