mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-03 22:23:10 +00:00
setup query end
This commit is contained in:
parent
77a147cad2
commit
b6baefc19c
@ -60,13 +60,9 @@ type
|
|||||||
var ctxLock: Lock
|
var ctxLock: Lock
|
||||||
ctxLock.initLock()
|
ctxLock.initLock()
|
||||||
|
|
||||||
proc setCancelled[T](ctx: TaskCtx[T]): bool =
|
proc setCancelled[T](ctx: TaskCtx[T]) =
|
||||||
withLock(ctxLock):
|
withLock(ctxLock):
|
||||||
if ctx[].running:
|
ctx[].cancelled = true
|
||||||
return false
|
|
||||||
else:
|
|
||||||
ctx[].cancelled = true
|
|
||||||
return true
|
|
||||||
|
|
||||||
proc setRunning[T](ctx: TaskCtx[T]): bool =
|
proc setRunning[T](ctx: TaskCtx[T]): bool =
|
||||||
withLock(ctxLock):
|
withLock(ctxLock):
|
||||||
@ -125,9 +121,7 @@ template dispatchTask[T](self: ThreadDatastore,
|
|||||||
await wait(ctx[].signal)
|
await wait(ctx[].signal)
|
||||||
except CancelledError as exc:
|
except CancelledError as exc:
|
||||||
trace "Cancelling thread future!", exc = exc.msg
|
trace "Cancelling thread future!", exc = exc.msg
|
||||||
while not ctx.setCancelled():
|
ctx.setCancelled()
|
||||||
warn "waiting to cancel thread future!", fn = astToStr(fn)
|
|
||||||
await sleepAsync(10.milliseconds)
|
|
||||||
raise exc
|
raise exc
|
||||||
finally:
|
finally:
|
||||||
discard ctx[].signal.close()
|
discard ctx[].signal.close()
|
||||||
@ -234,21 +228,28 @@ proc queryTask[DB](
|
|||||||
dq: DbQuery[KeyId]
|
dq: DbQuery[KeyId]
|
||||||
) {.gcsafe, nimcall.} =
|
) {.gcsafe, nimcall.} =
|
||||||
## run query command
|
## run query command
|
||||||
var qh: typeof(ds.query(dq))
|
|
||||||
executeTask(ctx):
|
executeTask(ctx):
|
||||||
qh = ds.query(dq)
|
let qh = ds.query(dq)
|
||||||
if qh.isOk(): (?!QResult).ok(default(QResult))
|
if qh.isOk():
|
||||||
else: (?!QResult).err(qh.error())
|
ctx[].res.ok default(QResult)
|
||||||
if qh.isErr():
|
else:
|
||||||
return
|
ctx[].res.err qh.error().toThreadErr()
|
||||||
|
return
|
||||||
|
|
||||||
var handle = qh.get()
|
var handle = qh.get()
|
||||||
for item in handle.iter():
|
|
||||||
executeTask(ctx):
|
for item in handle.iter():
|
||||||
discard ctx[].signal.waitSync().get()
|
if ctx[].cancelled:
|
||||||
item
|
# cancel iter, then run next cycle so it'll finish and close
|
||||||
|
handle.cancel = true
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
# wait for next request from async thread
|
||||||
|
discard ctx[].signal.waitSync().get()
|
||||||
|
ctx[].res = item.mapErr() do(exc: ref CatchableError) -> ThreadResErr:
|
||||||
|
exc
|
||||||
|
discard ctx[].signal.fireSync()
|
||||||
|
|
||||||
executeTask(ctx):
|
|
||||||
(?!QResult).err((ref QueryEndedError)(msg: "done").toThreadErr())
|
(?!QResult).err((ref QueryEndedError)(msg: "done").toThreadErr())
|
||||||
|
|
||||||
method query*(
|
method query*(
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user