From ca143227c63267418b3f41b7b6b427a85d08a0f2 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 26 Sep 2023 13:25:15 -0700 Subject: [PATCH] setup query --- datastore/threads/threadproxyds.nim | 51 ++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index c1a72b1..4d068b7 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -65,6 +65,15 @@ proc setCancelled(ctx: var TaskCtx): bool = ctx.cancelled = true return true +proc setRunning[T](ctx: ptr TaskCtx[T]): bool = + withLock(ctxLock): + if ctx.cancelled: + return + ctx.running = true +proc setDone[T](ctx: ptr TaskCtx[T]) = + withLock(ctxLock): + ctx.running = false + proc acquireSignal(): ?!ThreadSignalPtr = let signal = ThreadSignalPtr.new() if signal.isErr(): @@ -74,10 +83,8 @@ proc acquireSignal(): ?!ThreadSignalPtr = template executeTask[T](ctx: ptr TaskCtx[T], blk: untyped) = try: - withLock(ctxLock): - if ctx.cancelled: - return - ctx.running = true + if not ctx.setRunning(): + return ## run backend command let res = `blk` @@ -88,13 +95,11 @@ template executeTask[T](ctx: ptr TaskCtx[T], blk: untyped) = ctx.res.ok(res.get()) else: ctx.res.err res.error().toThreadErr() - - withLock(ctxLock): - ctx.running = false except CatchableError as exc: - trace "Unexpected exception thrown in asyncHasTask", exc = exc.msg - raiseAssert exc.msg + trace "Unexpected exception thrown in async task", exc = exc.msg + ctx[].res.err exc.toThreadErr() finally: + ctx.setDone() discard ctx[].signal.fireSync() template dispatchTask[T](self: ThreadDatastore, @@ -214,13 +219,27 @@ method close*(self: ThreadDatastore): Future[?!void] {.async.} = of Sqlite: self.backend.sql.close() -proc queryTask[T, DB](ctx: ptr TaskCtx[T], ds: DB; - query: DbQuery[KeyId]) {.gcsafe, nimcall.} = - ## run backend command - var handle = ds.query(query) - executeTask(ctx): - handle - +type + QResult = DbQueryResponse[KeyId, DataBuffer] + +proc queryTask[DB]( + ctx: ptr TaskCtx[QResult], + ds: DB, + dq: DbQuery[KeyId] +) {.gcsafe, nimcall.} = + ## run query command + if not ctx.setRunning(): + return + + var qh = ds.query(dq) + if qh .isOk(): + (?!QResult).ok(default(QResult)) + else: + (?!QResult).err(qh.error()) + + var handle = qh.get() + + for item in executeTask(ctx): query(ds, key)