From d2a8a72a7d4825b035caab3d2c46469e4ababc30 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Mon, 18 Sep 2023 13:23:05 -0700 Subject: [PATCH] setup flags --- datastore/threads/threadproxyds.nim | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index e18e64d..23e7f9d 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -38,7 +38,7 @@ type ds: ptr Datastore res: ThreadResult[T] cancelled: Atomic[bool] - hasStarted: Atomic[bool] + isActive: Atomic[bool] semaphore: AsyncSemaphore signal: ThreadSignalPtr @@ -93,11 +93,12 @@ template dispatchTask( result = failure(ctx.res.error()) # TODO: fix this, result shouldn't be accessed except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg - if ctx.hasStarted.load(moAcquireRelease): - # could do a spinlock here, but for now - # it'd be better to leak than possibly corrupt memory - # since it's easier to detect and fix leaks + if ctx.isActive.load(moAcquireRelease): + # could do a spinlock here until the other side cancels, + # but for now it'd at least be better to leak than possibly + # corrupt memory since it's easier to detect and fix leaks GC_ref(ctx) + warn "request was cancelled while thread task is running", exc = exc.msg ctx.cancelled.store(true, moAcquireRelease) await ctx.signal.fire() raise exc @@ -110,7 +111,7 @@ proc signalMonitor[T](ctx: ptr TaskCtx, fut: Future[T]) {.async.} = ## try: - ctx[].hasStarted.store(true, moAcquireRelease) + ctx[].isActive.store(true, moAcquireRelease) await ctx[].signal.wait() trace "Received signal" @@ -124,6 +125,8 @@ proc signalMonitor[T](ctx: ptr TaskCtx, fut: Future[T]) {.async.} = trace "Exception in thread signal monitor", exc = exc.msg ctx.res.err(exc) discard ctx[].signal.fireSync() + finally: + ctx[].isActive.store(false, moAcquireRelease) proc asyncHasTask( ctx: ptr TaskCtx[bool],