mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-03 06:03:06 +00:00
test query
This commit is contained in:
parent
4b4403bf4d
commit
a4c3574ff2
@ -248,7 +248,8 @@ type
|
||||
proc queryTask[DB](
|
||||
ctx: TaskCtx[QResult],
|
||||
ds: DB,
|
||||
dq: DbQuery[KeyId]
|
||||
query: DbQuery[KeyId],
|
||||
nextSignal: ThreadSignalPtr
|
||||
) {.gcsafe, nimcall.} =
|
||||
## run query command
|
||||
echo "\n\tqueryTask:init"
|
||||
@ -256,12 +257,12 @@ proc queryTask[DB](
|
||||
echo "\tqueryTask:exec:"
|
||||
# we execute this all inside `executeTask`
|
||||
# so we need to return a final result
|
||||
let qh = ds.query(dq)
|
||||
echo "\tqueryTask:query: ", qh
|
||||
if qh.isErr():
|
||||
let handleRes = ds.query(query)
|
||||
echo "\tqueryTask:query: ", handleRes
|
||||
if handleRes.isErr():
|
||||
# set error and exit executeTask, which will fire final signal
|
||||
echo "\tqueryTask:query:err "
|
||||
(?!QResult).err(qh.error())
|
||||
(?!QResult).err(handleRes.error())
|
||||
else:
|
||||
echo "\tqueryTask:query:ok "
|
||||
# otherwise manually an set empty ok result
|
||||
@ -269,7 +270,7 @@ proc queryTask[DB](
|
||||
discard ctx[].signal.fireSync()
|
||||
echo "\tqueryTask:query:fireSync "
|
||||
|
||||
var handle = qh.get()
|
||||
var handle = handleRes.get()
|
||||
for item in handle.iter():
|
||||
# wait for next request from async thread
|
||||
echo "\tqueryTask:query:iter:wait! "
|
||||
@ -282,13 +283,14 @@ proc queryTask[DB](
|
||||
handle.cancel = true
|
||||
continue
|
||||
else:
|
||||
echo "\tqueryTask:query:iter:done"
|
||||
echo "\tqueryTask:query:iter:result:"
|
||||
ctx[].res = item.mapErr() do(exc: ref CatchableError) -> ThreadResErr:
|
||||
exc
|
||||
echo "\tqueryTask:query:iter:fireSync "
|
||||
discard ctx[].signal.fireSync()
|
||||
|
||||
# set final result
|
||||
echo "\tqueryTask:query:iter:done "
|
||||
(?!QResult).ok((KeyId.none, DataBuffer()))
|
||||
|
||||
method query*(
|
||||
@ -299,9 +301,11 @@ method query*(
|
||||
await self.semaphore.acquire()
|
||||
without signal =? acquireSignal(), err:
|
||||
return failure err
|
||||
without nextSignal =? acquireSignal(), err:
|
||||
return failure err
|
||||
|
||||
echo "query:dbQuery:"
|
||||
let dq = dbQuery(
|
||||
let query = dbQuery(
|
||||
key= KeyId.new q.key.id(),
|
||||
value=q.value, limit=q.limit, offset=q.offset, sort=q.sort)
|
||||
|
||||
@ -309,7 +313,7 @@ method query*(
|
||||
let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal))
|
||||
dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal):
|
||||
echo "query:init:dispatch:queryTask"
|
||||
self.tp.spawn queryTask(ctx, ds, dq)
|
||||
self.tp.spawn queryTask(ctx, ds, query, nextSignal)
|
||||
|
||||
echo "query:init:dispatch:res: ", ctx[].res
|
||||
|
||||
@ -339,12 +343,11 @@ method query*(
|
||||
|
||||
echo "query:next:iter:dispatch"
|
||||
await ctx[].signal.fire()
|
||||
dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal):
|
||||
# trigger query task to iterate then wait for new result!
|
||||
echo "query:next:iter:dispatch:fireSync"
|
||||
echo "query:next:iter:dispatch:wait"
|
||||
echo "query:next:iter:dispatch:wait"
|
||||
await wait(ctx[].signal)
|
||||
|
||||
echo "query:next:iter:res: ", ctx[].res
|
||||
|
||||
echo "query:next:iter:res: "
|
||||
if not ctx[].running:
|
||||
echo "query:next:iter:finished "
|
||||
iter.finished = true
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user