From c96ae7c016c1060fd9742dcc72dcbf45828e1735 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Mon, 18 Sep 2023 13:19:07 -0700 Subject: [PATCH] setup flags --- datastore/threads/threadproxyds.nim | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 9f145de..e18e64d 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -37,7 +37,8 @@ type TaskCtx[T: ThreadTypes] = ref object ds: ptr Datastore res: ThreadResult[T] - cancelled: bool + cancelled: Atomic[bool] + hasStarted: Atomic[bool] semaphore: AsyncSemaphore signal: ThreadSignalPtr @@ -85,7 +86,6 @@ template dispatchTask( withLocks(self, ctx, key, fut): try: - GC_ref(ctx) runTask() await fut @@ -93,7 +93,12 @@ template dispatchTask( result = failure(ctx.res.error()) # TODO: fix this, result shouldn't be accessed except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg - ctx.cancelled = true + if ctx.hasStarted.load(moAcquireRelease): + # could do a spinlock here, but for now + # it'd be better to leak than possibly corrupt memory + # since it's easier to detect and fix leaks + GC_ref(ctx) + ctx.cancelled.store(true, moAcquireRelease) await ctx.signal.fire() raise exc finally: @@ -105,10 +110,11 @@ proc signalMonitor[T](ctx: ptr TaskCtx, fut: Future[T]) {.async.} = ## try: + ctx[].hasStarted.store(true, moAcquireRelease) await ctx[].signal.wait() trace "Received signal" - if ctx[].cancelled: # there could eventually be other flags + if ctx[].cancelled.load(moAcquireRelease): # there could eventually be other flags trace "Cancelling future" if not fut.finished: await fut.cancelAndWait() # cancel the `has` future @@ -337,10 +343,10 @@ method get*( self.tp.spawn getTask(addr ctx, unsafeAddr key) self.dispatchTask(ctx, key.some, runTask) - if err =? res.errorOption: + if err =? ctx.res.errorOption: return failure err - return success(@(res.get())) + return success(@(ctx.res.get())) method close*(self: ThreadDatastore): Future[?!void] {.async.} = for fut in self.tasks.values.toSeq: