change nextSignal back to ThreadSignalPtr for timeouts

This commit is contained in:
Jaremy Creechley 2023-09-27 16:16:18 -07:00
parent 78ea3b117e
commit 5afec2b3d8
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300

View File

@ -41,7 +41,7 @@ type
signal: ThreadSignalPtr signal: ThreadSignalPtr
running: bool ## used to mark when a task worker is running running: bool ## used to mark when a task worker is running
cancelled: bool ## used to cancel a task before it's started cancelled: bool ## used to cancel a task before it's started
nextSignal: MutexSignal nextSignal: ThreadSignalPtr
TaskCtx*[T] = SharedPtr[TaskCtxObj[T]] TaskCtx*[T] = SharedPtr[TaskCtxObj[T]]
## Task context object. ## Task context object.
@ -53,8 +53,10 @@ type
semaphore: AsyncSemaphore # semaphore is used for backpressure \ semaphore: AsyncSemaphore # semaphore is used for backpressure \
# to avoid exhausting file descriptors # to avoid exhausting file descriptors
proc newTaskCtx*[T](tp: typedesc[T], signal: ThreadSignalPtr): TaskCtx[T] = proc newTaskCtx*[T](tp: typedesc[T],
newSharedPtr(TaskCtxObj[T](signal: signal)) signal: ThreadSignalPtr,
nextSignal: ThreadSignalPtr = nil): TaskCtx[T] =
newSharedPtr(TaskCtxObj[T](signal: signal, nextSignal: nextSignal))
proc setCancelled[T](ctx: TaskCtx[T]) = proc setCancelled[T](ctx: TaskCtx[T]) =
ctx[].cancelled = true ctx[].cancelled = true
@ -137,7 +139,7 @@ proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} =
has(ds, key) has(ds, key)
method has*[BT](self: ThreadDatastore[BT], method has*[BT](self: ThreadDatastore[BT],
key: Key): Future[?!bool] {.async.} = key: Key): Future[?!bool] {.async.} =
await self.semaphore.acquire() await self.semaphore.acquire()
without signal =? acquireSignal(), err: without signal =? acquireSignal(), err:
return failure err return failure err
@ -256,7 +258,8 @@ method queryTask[DB](
# otherwise manually an set empty ok result # otherwise manually an set empty ok result
ctx[].res.ok (KeyId.none, DataBuffer(), ) ctx[].res.ok (KeyId.none, DataBuffer(), )
discard ctx[].signal.fireSync() discard ctx[].signal.fireSync()
ctx[].nextSignal.wait() if not ctx[].nextSignal.waitSync(10.seconds).get():
raise newException(DeadThreadDefect, "queryTask timed out")
var handle = handleRes.get() var handle = handleRes.get()
for item in handle.iter(): for item in handle.iter():
@ -271,7 +274,8 @@ method queryTask[DB](
exc exc
discard ctx[].signal.fireSync() discard ctx[].signal.fireSync()
ctx[].nextSignal.wait() if not ctx[].nextSignal.waitSync(10.seconds).get():
raise newException(DeadThreadDefect, "queryTask timed out")
# set final result # set final result
(?!QResult).ok((KeyId.none, DataBuffer())) (?!QResult).ok((KeyId.none, DataBuffer()))
@ -285,17 +289,17 @@ method query*[BT](self: ThreadDatastore[BT],
await self.semaphore.acquire() await self.semaphore.acquire()
without signal =? acquireSignal(), err: without signal =? acquireSignal(), err:
return failure err return failure err
let ctx = newTaskCtx(QResult, signal=signal) without nextSignal =? acquireSignal(), err:
# echo "nextSignal:OPEN!" return failure err
ctx[].nextSignal.init() let ctx = newTaskCtx(QResult, signal=signal, nextSignal=nextSignal)
proc iterDispose() = proc iterDispose() {.async.} =
# echo "signal:CLOSE!" # echo "signal:CLOSE!"
ctx.setCancelled() ctx.setCancelled()
ctx[].nextSignal.fire() await ctx[].nextSignal.fire()
discard signal.close() discard ctx[].signal.close()
# echo "nextSignal:CLOSE!" # echo "nextSignal:CLOSE!"
ctx[].nextSignal.close() discard ctx[].nextSignal.close()
self.semaphore.release() self.semaphore.release()
try: try:
@ -306,7 +310,7 @@ method query*[BT](self: ThreadDatastore[BT],
# setup initial queryTask # setup initial queryTask
dispatchTaskWrap(self, signal): dispatchTaskWrap(self, signal):
self.tp.spawn queryTask(ctx, ds, query) self.tp.spawn queryTask(ctx, ds, query)
ctx[].nextSignal.fire() await ctx[].nextSignal.fire()
var lock = newAsyncLock() # serialize querying under threads var lock = newAsyncLock() # serialize querying under threads
var iter = QueryIter.new() var iter = QueryIter.new()
@ -328,7 +332,7 @@ method query*[BT](self: ThreadDatastore[BT],
iter.finished = true iter.finished = true
defer: defer:
ctx[].nextSignal.fire() await ctx[].nextSignal.fire()
if ctx[].res.isErr(): if ctx[].res.isErr():
return err(ctx[].res.error()) return err(ctx[].res.error())
@ -340,13 +344,13 @@ method query*[BT](self: ThreadDatastore[BT],
except CancelledError as exc: except CancelledError as exc:
trace "Cancelling thread future!", exc = exc.msg trace "Cancelling thread future!", exc = exc.msg
ctx.setCancelled() ctx.setCancelled()
iterDispose() await iterDispose() # todo: is this valid?
raise exc raise exc
return success iter return success iter
except CancelledError as exc: except CancelledError as exc:
trace "Cancelling thread future!", exc = exc.msg trace "Cancelling thread future!", exc = exc.msg
iterDispose() await iterDispose()
raise exc raise exc
proc new*[DB](self: type ThreadDatastore, proc new*[DB](self: type ThreadDatastore,