diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 7e4e6bf..c79a181 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -248,7 +248,8 @@ type proc queryTask[DB]( ctx: TaskCtx[QResult], ds: DB, - dq: DbQuery[KeyId] + query: DbQuery[KeyId], + nextSignal: ThreadSignalPtr ) {.gcsafe, nimcall.} = ## run query command echo "\n\tqueryTask:init" @@ -256,12 +257,12 @@ proc queryTask[DB]( echo "\tqueryTask:exec:" # we execute this all inside `executeTask` # so we need to return a final result - let qh = ds.query(dq) - echo "\tqueryTask:query: ", qh - if qh.isErr(): + let handleRes = ds.query(query) + echo "\tqueryTask:query: ", handleRes + if handleRes.isErr(): # set error and exit executeTask, which will fire final signal echo "\tqueryTask:query:err " - (?!QResult).err(qh.error()) + (?!QResult).err(handleRes.error()) else: echo "\tqueryTask:query:ok " # otherwise manually an set empty ok result @@ -269,7 +270,7 @@ proc queryTask[DB]( discard ctx[].signal.fireSync() echo "\tqueryTask:query:fireSync " - var handle = qh.get() + var handle = handleRes.get() for item in handle.iter(): # wait for next request from async thread echo "\tqueryTask:query:iter:wait! " @@ -282,13 +283,14 @@ proc queryTask[DB]( handle.cancel = true continue else: - echo "\tqueryTask:query:iter:done" + echo "\tqueryTask:query:iter:result:" ctx[].res = item.mapErr() do(exc: ref CatchableError) -> ThreadResErr: exc echo "\tqueryTask:query:iter:fireSync " discard ctx[].signal.fireSync() # set final result + echo "\tqueryTask:query:iter:done " (?!QResult).ok((KeyId.none, DataBuffer())) method query*( @@ -299,9 +301,11 @@ method query*( await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err + without nextSignal =? acquireSignal(), err: + return failure err echo "query:dbQuery:" - let dq = dbQuery( + let query = dbQuery( key= KeyId.new q.key.id(), value=q.value, limit=q.limit, offset=q.offset, sort=q.sort) @@ -309,7 +313,7 @@ method query*( let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal)) dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal): echo "query:init:dispatch:queryTask" - self.tp.spawn queryTask(ctx, ds, dq) + self.tp.spawn queryTask(ctx, ds, query, nextSignal) echo "query:init:dispatch:res: ", ctx[].res @@ -339,12 +343,11 @@ method query*( echo "query:next:iter:dispatch" await ctx[].signal.fire() - dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal): - # trigger query task to iterate then wait for new result! - echo "query:next:iter:dispatch:fireSync" - echo "query:next:iter:dispatch:wait" + echo "query:next:iter:dispatch:wait" + await wait(ctx[].signal) + + echo "query:next:iter:res: ", ctx[].res - echo "query:next:iter:res: " if not ctx[].running: echo "query:next:iter:finished " iter.finished = true