From 5c2cc57ccc720d916f2d94a2569d0b94b31a019c Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 26 Sep 2023 19:23:13 -0700 Subject: [PATCH] fix occasional deadlock --- datastore/threads/threadproxyds.nim | 33 +---------------------------- 1 file changed, 1 insertion(+), 32 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index d7bcd18..4b46795 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -260,14 +260,11 @@ proc queryTask[DB]( let handleRes = ds.query(query) if handleRes.isErr(): # set error and exit executeTask, which will fire final signal - echo "\tqueryTask:query:err " (?!QResult).err(handleRes.error()) else: # otherwise manually an set empty ok result ctx[].res.ok (KeyId.none, DataBuffer(), ) - echo "\tqueryTask:query:fireSync " discard ctx[].signal.fireSync() - echo "\tqueryTask:query:nextSignal:wait " discard nextSignal.waitSync().get() var handle = handleRes.get() @@ -275,87 +272,59 @@ proc queryTask[DB]( # wait for next request from async thread if ctx[].cancelled: - echo "\tqueryTask:query:iter:cancelled" # cancel iter, then run next cycle so it'll finish and close handle.cancel = true continue else: - if item.isOk: - echo "\tqueryTask:query:iter:result:data: ", $item.get().data ctx[].res = item.mapErr() do(exc: ref CatchableError) -> ThreadResErr: exc - echo "\tqueryTask:query:iter:result: ", ctx[].res - echo "\tqueryTask:query:iter:fireSync " discard ctx[].signal.fireSync() - echo "\tqueryTask:query:iter:nextSignal:wait! " discard nextSignal.waitSync().get() # set final result - echo "\tqueryTask:query:iter:done " (?!QResult).ok((KeyId.none, DataBuffer())) method query*( self: ThreadDatastore, q: Query): Future[?!QueryIter] {.async.} = - echo "\nquery:" await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err without nextSignal =? acquireSignal(), err: return failure err - echo "query:dbQuery:" let query = dbQuery( key= KeyId.new q.key.id(), value=q.value, limit=q.limit, offset=q.offset, sort=q.sort) - echo "query:init:dispatch:" + # setup initial queryTask let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal)) dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal): - echo "query:init:dispatch:queryTask" self.tp.spawn queryTask(ctx, ds, query, nextSignal) - - # await wait(ctx[].signal) await nextSignal.fire() - # await wait(ctx[].signal) - - echo "query:init:dispatch:res: ", ctx[].res var lock = newAsyncLock() # serialize querying under threads iter = QueryIter.new() - # echo "query:first:iter:dispatch" - # await nextSignal.fire() - - echo "query:first:ready!\n" - proc next(): Future[?!QueryResponse] {.async.} = - echo "\n\nquery:next:exec: " let ctx = ctx trace "About to query" if lock.locked: - echo "query:next:lock:fail:alreadyLock " return failure (ref DatastoreError)(msg: "Should always await query features") if iter.finished == true: - echo "query:next:iter:finished" return failure (ref QueryEndedError)(msg: "Calling next on a finished query!") - echo "query:next:wait:signal" await wait(ctx[].signal) if not ctx[].running: - echo "query:next:iter:finished " iter.finished = true - # return - echo "query:next:iter:res: ", ctx[].res, "\n" defer: - echo "query:iter:nextSignal:fire!" await nextSignal.fire() if ctx[].res.isErr():