From dacc28ab02e891cb53bbf42d21dbf5a83a7971d3 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 26 Sep 2023 19:21:09 -0700 Subject: [PATCH] fix occasional deadlock --- datastore/threads/threadproxyds.nim | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index ed11f18..d7bcd18 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -245,6 +245,8 @@ method close*(self: ThreadDatastore): Future[?!void] {.async.} = type QResult = DbQueryResponse[KeyId, DataBuffer] +import os + proc queryTask[DB]( ctx: TaskCtx[QResult], ds: DB, @@ -265,6 +267,8 @@ proc queryTask[DB]( ctx[].res.ok (KeyId.none, DataBuffer(), ) echo "\tqueryTask:query:fireSync " discard ctx[].signal.fireSync() + echo "\tqueryTask:query:nextSignal:wait " + discard nextSignal.waitSync().get() var handle = handleRes.get() for item in handle.iter(): @@ -313,6 +317,10 @@ method query*( dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal): echo "query:init:dispatch:queryTask" self.tp.spawn queryTask(ctx, ds, query, nextSignal) + + # await wait(ctx[].signal) + await nextSignal.fire() + # await wait(ctx[].signal) echo "query:init:dispatch:res: ", ctx[].res @@ -328,9 +336,6 @@ method query*( proc next(): Future[?!QueryResponse] {.async.} = echo "\n\nquery:next:exec: " let ctx = ctx - # defer: - # if lock.locked: - # lock.release() trace "About to query" if lock.locked: @@ -340,9 +345,6 @@ method query*( echo "query:next:iter:finished" return failure (ref QueryEndedError)(msg: "Calling next on a finished query!") - # echo "query:next:acquire:lock" - # await lock.acquire() - echo "query:next:wait:signal" await wait(ctx[].signal)