From 4b4403bf4d3b3141adff8271230970b0d25c8a3f Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 26 Sep 2023 17:59:05 -0700 Subject: [PATCH] test query --- datastore/threads/threadproxyds.nim | 36 ++++++++++++++--------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index d21b152..7e4e6bf 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -113,16 +113,22 @@ template dispatchTaskWrap[T](self: ThreadDatastore, signal: ThreadSignalPtr, blk: untyped ): auto = - try: - case self.backend.kind: - of Sqlite: - var ds {.used, inject.} = self.backend.sql - proc runTask() = - `blk` - runTask() - # echo "\t\tdispatchTask:wait!" - await wait(ctx[].signal) + case self.backend.kind: + of Sqlite: + var ds {.used, inject.} = self.backend.sql + proc runTask() = + `blk` + runTask() + # echo "\t\tdispatchTask:wait!" + await wait(ctx[].signal) +template dispatchTask[T](self: ThreadDatastore, + signal: ThreadSignalPtr, + blk: untyped + ): auto = + let ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal)) + try: + dispatchTaskWrap[T](self, signal, blk) except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg ctx.setCancelled() @@ -131,13 +137,6 @@ template dispatchTaskWrap[T](self: ThreadDatastore, discard ctx[].signal.close() self.semaphore.release() -template dispatchTask[T](self: ThreadDatastore, - signal: ThreadSignalPtr, - blk: untyped - ): auto = - let ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal)) - dispatchTaskWrap[T](self, signal, blk) - proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} = ## run backend command @@ -307,18 +306,17 @@ method query*( value=q.value, limit=q.limit, offset=q.offset, sort=q.sort) echo "query:init:dispatch:" - dispatchTask[DbQueryResponse[KeyId, DataBuffer]](self, signal): + let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal)) + dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal): echo "query:init:dispatch:queryTask" self.tp.spawn queryTask(ctx, ds, dq) - echo "query:init:dispatch:started" echo "query:init:dispatch:res: ", ctx[].res var lock = newAsyncLock() # serialize querying under threads iter = QueryIter.new() - echo "query:asyncLock:done" echo "query:next:ready: " proc next(): Future[?!QueryResponse] {.async.} =