From 5afec2b3d8cfefb74216ee8ac3c9cad21ed3b011 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Wed, 27 Sep 2023 16:16:18 -0700 Subject: [PATCH] change nextSignal back to ThreadSignalPtr for timeouts --- datastore/threads/threadproxyds.nim | 38 ++++++++++++++++------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index c192b06..8be8a8a 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -41,7 +41,7 @@ type signal: ThreadSignalPtr running: bool ## used to mark when a task worker is running cancelled: bool ## used to cancel a task before it's started - nextSignal: MutexSignal + nextSignal: ThreadSignalPtr TaskCtx*[T] = SharedPtr[TaskCtxObj[T]] ## Task context object. @@ -53,8 +53,10 @@ type semaphore: AsyncSemaphore # semaphore is used for backpressure \ # to avoid exhausting file descriptors -proc newTaskCtx*[T](tp: typedesc[T], signal: ThreadSignalPtr): TaskCtx[T] = - newSharedPtr(TaskCtxObj[T](signal: signal)) +proc newTaskCtx*[T](tp: typedesc[T], + signal: ThreadSignalPtr, + nextSignal: ThreadSignalPtr = nil): TaskCtx[T] = + newSharedPtr(TaskCtxObj[T](signal: signal, nextSignal: nextSignal)) proc setCancelled[T](ctx: TaskCtx[T]) = ctx[].cancelled = true @@ -137,7 +139,7 @@ proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} = has(ds, key) method has*[BT](self: ThreadDatastore[BT], - key: Key): Future[?!bool] {.async.} = + key: Key): Future[?!bool] {.async.} = await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err @@ -256,7 +258,8 @@ method queryTask[DB]( # otherwise manually an set empty ok result ctx[].res.ok (KeyId.none, DataBuffer(), ) discard ctx[].signal.fireSync() - ctx[].nextSignal.wait() + if not ctx[].nextSignal.waitSync(10.seconds).get(): + raise newException(DeadThreadDefect, "queryTask timed out") var handle = handleRes.get() for item in handle.iter(): @@ -271,7 +274,8 @@ method queryTask[DB]( exc discard ctx[].signal.fireSync() - ctx[].nextSignal.wait() + if not ctx[].nextSignal.waitSync(10.seconds).get(): + raise newException(DeadThreadDefect, "queryTask timed out") # set final result (?!QResult).ok((KeyId.none, DataBuffer())) @@ -285,17 +289,17 @@ method query*[BT](self: ThreadDatastore[BT], await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err - let ctx = newTaskCtx(QResult, signal=signal) - # echo "nextSignal:OPEN!" - ctx[].nextSignal.init() + without nextSignal =? acquireSignal(), err: + return failure err + let ctx = newTaskCtx(QResult, signal=signal, nextSignal=nextSignal) - proc iterDispose() = + proc iterDispose() {.async.} = # echo "signal:CLOSE!" ctx.setCancelled() - ctx[].nextSignal.fire() - discard signal.close() + await ctx[].nextSignal.fire() + discard ctx[].signal.close() # echo "nextSignal:CLOSE!" - ctx[].nextSignal.close() + discard ctx[].nextSignal.close() self.semaphore.release() try: @@ -306,7 +310,7 @@ method query*[BT](self: ThreadDatastore[BT], # setup initial queryTask dispatchTaskWrap(self, signal): self.tp.spawn queryTask(ctx, ds, query) - ctx[].nextSignal.fire() + await ctx[].nextSignal.fire() var lock = newAsyncLock() # serialize querying under threads var iter = QueryIter.new() @@ -328,7 +332,7 @@ method query*[BT](self: ThreadDatastore[BT], iter.finished = true defer: - ctx[].nextSignal.fire() + await ctx[].nextSignal.fire() if ctx[].res.isErr(): return err(ctx[].res.error()) @@ -340,13 +344,13 @@ method query*[BT](self: ThreadDatastore[BT], except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg ctx.setCancelled() - iterDispose() + await iterDispose() # todo: is this valid? raise exc return success iter except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg - iterDispose() + await iterDispose() raise exc proc new*[DB](self: type ThreadDatastore,