mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-08 00:23:10 +00:00
test query
This commit is contained in:
parent
d0ee284d90
commit
f59cf06419
@ -250,21 +250,28 @@ proc queryTask[DB](
|
|||||||
dq: DbQuery[KeyId]
|
dq: DbQuery[KeyId]
|
||||||
) {.gcsafe, nimcall.} =
|
) {.gcsafe, nimcall.} =
|
||||||
## run query command
|
## run query command
|
||||||
|
echo "\n\tqueryTask:init"
|
||||||
executeTask(ctx):
|
executeTask(ctx):
|
||||||
|
echo "\tqueryTask:exec:"
|
||||||
# we execute this all inside `executeTask`
|
# we execute this all inside `executeTask`
|
||||||
# so we need to return a final result
|
# so we need to return a final result
|
||||||
let qh = ds.query(dq)
|
let qh = ds.query(dq)
|
||||||
|
echo "\tqueryTask:query: ", qh
|
||||||
if qh.isErr():
|
if qh.isErr():
|
||||||
# set error and exit executeTask, which will fire final signal
|
# set error and exit executeTask, which will fire final signal
|
||||||
|
echo "\tqueryTask:query:err "
|
||||||
(?!QResult).err(qh.error())
|
(?!QResult).err(qh.error())
|
||||||
else:
|
else:
|
||||||
|
echo "\tqueryTask:query:ok "
|
||||||
# otherwise manually an set empty ok result
|
# otherwise manually an set empty ok result
|
||||||
ctx[].res.ok (KeyId.none, DataBuffer(), )
|
ctx[].res.ok (KeyId.none, DataBuffer(), )
|
||||||
discard ctx[].signal.fireSync()
|
discard ctx[].signal.fireSync()
|
||||||
|
echo "\tqueryTask:query:fireSync "
|
||||||
|
|
||||||
var handle = qh.get()
|
var handle = qh.get()
|
||||||
for item in handle.iter():
|
for item in handle.iter():
|
||||||
# wait for next request from async thread
|
# wait for next request from async thread
|
||||||
|
echo "\tqueryTask:query:iter:wait "
|
||||||
discard ctx[].signal.waitSync().get()
|
discard ctx[].signal.waitSync().get()
|
||||||
|
|
||||||
if ctx[].cancelled:
|
if ctx[].cancelled:
|
||||||
@ -274,6 +281,7 @@ proc queryTask[DB](
|
|||||||
else:
|
else:
|
||||||
ctx[].res = item.mapErr() do(exc: ref CatchableError) -> ThreadResErr:
|
ctx[].res = item.mapErr() do(exc: ref CatchableError) -> ThreadResErr:
|
||||||
exc
|
exc
|
||||||
|
echo "\tqueryTask:query:iter:fireSync "
|
||||||
discard ctx[].signal.fireSync()
|
discard ctx[].signal.fireSync()
|
||||||
|
|
||||||
# set final result
|
# set final result
|
||||||
@ -283,22 +291,31 @@ method query*(
|
|||||||
self: ThreadDatastore,
|
self: ThreadDatastore,
|
||||||
q: Query): Future[?!QueryIter] {.async.} =
|
q: Query): Future[?!QueryIter] {.async.} =
|
||||||
|
|
||||||
|
echo "\nquery:"
|
||||||
await self.semaphore.acquire()
|
await self.semaphore.acquire()
|
||||||
without signal =? acquireSignal(), err:
|
without signal =? acquireSignal(), err:
|
||||||
return failure err
|
return failure err
|
||||||
|
|
||||||
|
echo "query:dbQuery:"
|
||||||
let dq = dbQuery(
|
let dq = dbQuery(
|
||||||
key= KeyId.new q.key.id(),
|
key= KeyId.new q.key.id(),
|
||||||
value=q.value, limit=q.limit, offset=q.offset, sort=q.sort)
|
value=q.value, limit=q.limit, offset=q.offset, sort=q.sort)
|
||||||
|
|
||||||
|
echo "query:init:dispatch:"
|
||||||
dispatchTask[DbQueryResponse[KeyId, DataBuffer]](self, signal):
|
dispatchTask[DbQueryResponse[KeyId, DataBuffer]](self, signal):
|
||||||
|
echo "query:init:dispatch:queryTask"
|
||||||
self.tp.spawn queryTask(ctx, ds, dq)
|
self.tp.spawn queryTask(ctx, ds, dq)
|
||||||
|
|
||||||
|
echo "query:init:dispatch:queryTask:done"
|
||||||
var
|
var
|
||||||
lock = newAsyncLock() # serialize querying under threads
|
lock = newAsyncLock() # serialize querying under threads
|
||||||
iter = QueryIter.new()
|
iter = QueryIter.new()
|
||||||
|
|
||||||
|
echo "query:asyncLock:done"
|
||||||
|
echo "query:next:ready: "
|
||||||
|
|
||||||
proc next(): Future[?!QueryResponse] {.async.} =
|
proc next(): Future[?!QueryResponse] {.async.} =
|
||||||
|
echo "query:next:exec: "
|
||||||
let ctx = ctx
|
let ctx = ctx
|
||||||
defer:
|
defer:
|
||||||
if lock.locked:
|
if lock.locked:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user