add nextSignal using mutex

This commit is contained in:
Jaremy Creechley 2023-09-27 13:28:58 -07:00
parent 30728c1d0f
commit e571d2116a
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
2 changed files with 9 additions and 11 deletions

View File

@ -260,8 +260,7 @@ method queryTask[DB](
# otherwise manually an set empty ok result # otherwise manually an set empty ok result
ctx[].res.ok (KeyId.none, DataBuffer(), ) ctx[].res.ok (KeyId.none, DataBuffer(), )
discard ctx[].signal.fireSync() discard ctx[].signal.fireSync()
if not nextSignal.waitSync(10.seconds).get(): ctx[].nextSignal.wait()
raise newException(DeadThreadDefect, "query task timeout; possible deadlock!")
var handle = handleRes.get() var handle = handleRes.get()
for item in handle.iter(): for item in handle.iter():
@ -276,8 +275,7 @@ method queryTask[DB](
exc exc
discard ctx[].signal.fireSync() discard ctx[].signal.fireSync()
ctx[].nextSignal.wait()
discard nextSignal.waitSync().get()
# set final result # set final result
(?!QResult).ok((KeyId.none, DataBuffer())) (?!QResult).ok((KeyId.none, DataBuffer()))
@ -302,7 +300,7 @@ method query*[BT](self: ThreadDatastore[BT],
# setup initial queryTask # setup initial queryTask
dispatchTaskWrap(self, signal): dispatchTaskWrap(self, signal):
self.tp.spawn queryTask(ctx, ds, query) self.tp.spawn queryTask(ctx, ds, query)
await nextSignal.fire() ctx[].nextSignal.fire()
var var
lock = newAsyncLock() # serialize querying under threads lock = newAsyncLock() # serialize querying under threads
@ -323,7 +321,7 @@ method query*[BT](self: ThreadDatastore[BT],
iter.finished = true iter.finished = true
defer: defer:
await nextSignal.fire() ctx[].nextSignal.fire()
if ctx[].res.isErr(): if ctx[].res.isErr():
return err(ctx[].res.error()) return err(ctx[].res.error())
@ -337,7 +335,7 @@ method query*[BT](self: ThreadDatastore[BT],
ctx.setCancelled() ctx.setCancelled()
discard ctx[].signal.close() discard ctx[].signal.close()
echo "nextSignal:CLOSE!" echo "nextSignal:CLOSE!"
discard nextSignal.close() ctx[].nextSignal.close()
self.semaphore.release() self.semaphore.release()
raise exc raise exc
@ -347,7 +345,7 @@ method query*[BT](self: ThreadDatastore[BT],
trace "Cancelling thread future!", exc = exc.msg trace "Cancelling thread future!", exc = exc.msg
discard signal.close() discard signal.close()
echo "nextSignal:CLOSE!" echo "nextSignal:CLOSE!"
discard nextSignal.close() ctx[].nextSignal.close()
self.semaphore.release() self.semaphore.release()
raise exc raise exc

View File

@ -56,16 +56,16 @@ proc toRes*[T,S](res: ThreadResult[T],
type type
MutexSignal* = tuple[lock: Lock, cond: Cond, open: bool] MutexSignal* = tuple[lock: Lock, cond: Cond, open: bool]
proc open*(sig: var MutexSignal) = proc init*(sig: var MutexSignal) =
sig.lock.initLock() sig.lock.initLock()
sig.cond.initCond() sig.cond.initCond()
sig.open = true sig.open = true
proc waitSync*(sig: var MutexSignal) = proc wait*(sig: var MutexSignal) =
withLock(sig.lock): withLock(sig.lock):
wait(sig.cond, sig.lock) wait(sig.cond, sig.lock)
proc fireSync*(sig: var MutexSignal) = proc fire*(sig: var MutexSignal) =
withLock(sig.lock): withLock(sig.lock):
signal(sig.cond) signal(sig.cond)