From b6baefc19c85214cdbd48cf44272f909a464c56b Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 26 Sep 2023 15:29:35 -0700 Subject: [PATCH] setup query end --- datastore/threads/threadproxyds.nim | 43 +++++++++++++++-------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index a95b9cd..f4b6b59 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -60,13 +60,9 @@ type var ctxLock: Lock ctxLock.initLock() -proc setCancelled[T](ctx: TaskCtx[T]): bool = +proc setCancelled[T](ctx: TaskCtx[T]) = withLock(ctxLock): - if ctx[].running: - return false - else: - ctx[].cancelled = true - return true + ctx[].cancelled = true proc setRunning[T](ctx: TaskCtx[T]): bool = withLock(ctxLock): @@ -125,9 +121,7 @@ template dispatchTask[T](self: ThreadDatastore, await wait(ctx[].signal) except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg - while not ctx.setCancelled(): - warn "waiting to cancel thread future!", fn = astToStr(fn) - await sleepAsync(10.milliseconds) + ctx.setCancelled() raise exc finally: discard ctx[].signal.close() @@ -234,21 +228,28 @@ proc queryTask[DB]( dq: DbQuery[KeyId] ) {.gcsafe, nimcall.} = ## run query command - var qh: typeof(ds.query(dq)) executeTask(ctx): - qh = ds.query(dq) - if qh.isOk(): (?!QResult).ok(default(QResult)) - else: (?!QResult).err(qh.error()) - if qh.isErr(): - return + let qh = ds.query(dq) + if qh.isOk(): + ctx[].res.ok default(QResult) + else: + ctx[].res.err qh.error().toThreadErr() + return - var handle = qh.get() - for item in handle.iter(): - executeTask(ctx): - discard ctx[].signal.waitSync().get() - item + var handle = qh.get() + + for item in handle.iter(): + if ctx[].cancelled: + # cancel iter, then run next cycle so it'll finish and close + handle.cancel = true + continue + else: + # wait for next request from async thread + discard ctx[].signal.waitSync().get() + ctx[].res = item.mapErr() do(exc: ref CatchableError) -> ThreadResErr: + exc + discard ctx[].signal.fireSync() - executeTask(ctx): (?!QResult).err((ref QueryEndedError)(msg: "done").toThreadErr()) method query*(