fix occasional deadlock

This commit is contained in:
Jaremy Creechley 2023-09-26 19:23:13 -07:00
parent dacc28ab02
commit 5c2cc57ccc
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300

View File

@ -260,14 +260,11 @@ proc queryTask[DB](
let handleRes = ds.query(query)
if handleRes.isErr():
# set error and exit executeTask, which will fire final signal
echo "\tqueryTask:query:err "
(?!QResult).err(handleRes.error())
else:
# otherwise manually an set empty ok result
ctx[].res.ok (KeyId.none, DataBuffer(), )
echo "\tqueryTask:query:fireSync "
discard ctx[].signal.fireSync()
echo "\tqueryTask:query:nextSignal:wait "
discard nextSignal.waitSync().get()
var handle = handleRes.get()
@ -275,87 +272,59 @@ proc queryTask[DB](
# wait for next request from async thread
if ctx[].cancelled:
echo "\tqueryTask:query:iter:cancelled"
# cancel iter, then run next cycle so it'll finish and close
handle.cancel = true
continue
else:
if item.isOk:
echo "\tqueryTask:query:iter:result:data: ", $item.get().data
ctx[].res = item.mapErr() do(exc: ref CatchableError) -> ThreadResErr:
exc
echo "\tqueryTask:query:iter:result: ", ctx[].res
echo "\tqueryTask:query:iter:fireSync "
discard ctx[].signal.fireSync()
echo "\tqueryTask:query:iter:nextSignal:wait! "
discard nextSignal.waitSync().get()
# set final result
echo "\tqueryTask:query:iter:done "
(?!QResult).ok((KeyId.none, DataBuffer()))
method query*(
self: ThreadDatastore,
q: Query): Future[?!QueryIter] {.async.} =
echo "\nquery:"
await self.semaphore.acquire()
without signal =? acquireSignal(), err:
return failure err
without nextSignal =? acquireSignal(), err:
return failure err
echo "query:dbQuery:"
let query = dbQuery(
key= KeyId.new q.key.id(),
value=q.value, limit=q.limit, offset=q.offset, sort=q.sort)
echo "query:init:dispatch:"
# setup initial queryTask
let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal))
dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal):
echo "query:init:dispatch:queryTask"
self.tp.spawn queryTask(ctx, ds, query, nextSignal)
# await wait(ctx[].signal)
await nextSignal.fire()
# await wait(ctx[].signal)
echo "query:init:dispatch:res: ", ctx[].res
var
lock = newAsyncLock() # serialize querying under threads
iter = QueryIter.new()
# echo "query:first:iter:dispatch"
# await nextSignal.fire()
echo "query:first:ready!\n"
proc next(): Future[?!QueryResponse] {.async.} =
echo "\n\nquery:next:exec: "
let ctx = ctx
trace "About to query"
if lock.locked:
echo "query:next:lock:fail:alreadyLock "
return failure (ref DatastoreError)(msg: "Should always await query features")
if iter.finished == true:
echo "query:next:iter:finished"
return failure (ref QueryEndedError)(msg: "Calling next on a finished query!")
echo "query:next:wait:signal"
await wait(ctx[].signal)
if not ctx[].running:
echo "query:next:iter:finished "
iter.finished = true
# return
echo "query:next:iter:res: ", ctx[].res, "\n"
defer:
echo "query:iter:nextSignal:fire!"
await nextSignal.fire()
if ctx[].res.isErr():