test query

This commit is contained in:
Jaremy Creechley 2023-09-26 18:33:36 -07:00
parent 70b5956773
commit 02de983dbd
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300

View File

@ -252,28 +252,23 @@ proc queryTask[DB](
nextSignal: ThreadSignalPtr nextSignal: ThreadSignalPtr
) {.gcsafe, nimcall.} = ) {.gcsafe, nimcall.} =
## run query command ## run query command
echo "\n\tqueryTask:init"
executeTask(ctx): executeTask(ctx):
echo "\tqueryTask:exec:"
# we execute this all inside `executeTask` # we execute this all inside `executeTask`
# so we need to return a final result # so we need to return a final result
let handleRes = ds.query(query) let handleRes = ds.query(query)
echo "\tqueryTask:query: ", handleRes
if handleRes.isErr(): if handleRes.isErr():
# set error and exit executeTask, which will fire final signal # set error and exit executeTask, which will fire final signal
echo "\tqueryTask:query:err " echo "\tqueryTask:query:err "
(?!QResult).err(handleRes.error()) (?!QResult).err(handleRes.error())
else: else:
echo "\tqueryTask:query:ok "
# otherwise manually an set empty ok result # otherwise manually an set empty ok result
ctx[].res.ok (KeyId.none, DataBuffer(), ) ctx[].res.ok (KeyId.none, DataBuffer(), )
discard ctx[].signal.fireSync()
echo "\tqueryTask:query:fireSync " echo "\tqueryTask:query:fireSync "
discard ctx[].signal.fireSync()
var handle = handleRes.get() var handle = handleRes.get()
for item in handle.iter(): for item in handle.iter():
# wait for next request from async thread # wait for next request from async thread
echo "\tqueryTask:query:iter:wait! "
if ctx[].cancelled: if ctx[].cancelled:
echo "\tqueryTask:query:iter:cancelled" echo "\tqueryTask:query:iter:cancelled"
@ -286,6 +281,8 @@ proc queryTask[DB](
echo "\tqueryTask:query:iter:result: ", ctx[].res echo "\tqueryTask:query:iter:result: ", ctx[].res
echo "\tqueryTask:query:iter:fireSync " echo "\tqueryTask:query:iter:fireSync "
discard ctx[].signal.fireSync() discard ctx[].signal.fireSync()
echo "\tqueryTask:query:iter:nextSignal:wait! "
discard nextSignal.waitSync().get() discard nextSignal.waitSync().get()
# set final result # set final result
@ -320,13 +317,13 @@ method query*(
lock = newAsyncLock() # serialize querying under threads lock = newAsyncLock() # serialize querying under threads
iter = QueryIter.new() iter = QueryIter.new()
echo "query:first:iter:dispatch" # echo "query:first:iter:dispatch"
await nextSignal.fire() # await nextSignal.fire()
echo "query:next:ready: " echo "query:first:ready!\n"
proc next(): Future[?!QueryResponse] {.async.} = proc next(): Future[?!QueryResponse] {.async.} =
echo "query:next:exec: " echo "\n\nquery:next:exec: "
let ctx = ctx let ctx = ctx
defer: defer:
if lock.locked: if lock.locked:
@ -351,6 +348,9 @@ method query*(
# return # return
echo "query:next:iter:res: ", ctx[].res, "\n" echo "query:next:iter:res: ", ctx[].res, "\n"
defer:
echo "query:iter:nextSignal:fire!"
await nextSignal.fire()
if ctx[].res.isErr(): if ctx[].res.isErr():
return err(ctx[].res.error()) return err(ctx[].res.error())
@ -360,8 +360,6 @@ method query*(
let data = qres.data.toSeq() let data = qres.data.toSeq()
return (?!QueryResponse).ok((key: key, data: data)) return (?!QueryResponse).ok((key: key, data: data))
echo "query:next:iter:dispatch"
await nextSignal.fire()
iter.next = next iter.next = next
return success iter return success iter