From e571d2116a986fb7f2233e084170a182afee9b05 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 13:28:58 -0700 Subject: [PATCH] add nextSignal using mutex --- datastore/threads/threadproxyds.nim | 14 ++++++-------- datastore/threads/threadresult.nim | 6 +++--- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 1c5528b..8a543b8 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -260,8 +260,7 @@ method queryTask[DB]( # otherwise manually an set empty ok result ctx[].res.ok (KeyId.none, DataBuffer(), ) discard ctx[].signal.fireSync() - if not nextSignal.waitSync(10.seconds).get(): - raise newException(DeadThreadDefect, "query task timeout; possible deadlock!") + ctx[].nextSignal.wait() var handle = handleRes.get() for item in handle.iter(): @@ -276,8 +275,7 @@ method queryTask[DB]( exc discard ctx[].signal.fireSync() - - discard nextSignal.waitSync().get() + ctx[].nextSignal.wait() # set final result (?!QResult).ok((KeyId.none, DataBuffer())) @@ -302,7 +300,7 @@ method query*[BT](self: ThreadDatastore[BT], # setup initial queryTask dispatchTaskWrap(self, signal): self.tp.spawn queryTask(ctx, ds, query) - await nextSignal.fire() + ctx[].nextSignal.fire() var lock = newAsyncLock() # serialize querying under threads @@ -323,7 +321,7 @@ method query*[BT](self: ThreadDatastore[BT], iter.finished = true defer: - await nextSignal.fire() + ctx[].nextSignal.fire() if ctx[].res.isErr(): return err(ctx[].res.error()) @@ -337,7 +335,7 @@ method query*[BT](self: ThreadDatastore[BT], ctx.setCancelled() discard ctx[].signal.close() echo "nextSignal:CLOSE!" - discard nextSignal.close() + ctx[].nextSignal.close() self.semaphore.release() raise exc @@ -347,7 +345,7 @@ method query*[BT](self: ThreadDatastore[BT], trace "Cancelling thread future!", exc = exc.msg discard signal.close() echo "nextSignal:CLOSE!" - discard nextSignal.close() + ctx[].nextSignal.close() self.semaphore.release() raise exc diff --git a/datastore/threads/threadresult.nim b/datastore/threads/threadresult.nim index 9caa70c..3508e9e 100644 --- a/datastore/threads/threadresult.nim +++ b/datastore/threads/threadresult.nim @@ -56,16 +56,16 @@ proc toRes*[T,S](res: ThreadResult[T], type MutexSignal* = tuple[lock: Lock, cond: Cond, open: bool] -proc open*(sig: var MutexSignal) = +proc init*(sig: var MutexSignal) = sig.lock.initLock() sig.cond.initCond() sig.open = true -proc waitSync*(sig: var MutexSignal) = +proc wait*(sig: var MutexSignal) = withLock(sig.lock): wait(sig.cond, sig.lock) -proc fireSync*(sig: var MutexSignal) = +proc fire*(sig: var MutexSignal) = withLock(sig.lock): signal(sig.cond)