From d8d6ad771a7d0b3446dc392c618f010fa9e86c60 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Mon, 25 Sep 2023 23:05:26 -0700 Subject: [PATCH] integrate setups --- datastore/threads/threadproxyds.nim | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 0060d78..1da13d3 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -91,25 +91,38 @@ template dispatchTask(self: ThreadDatastore, except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg while not ctx.setCancelled(): + warn "waiting to cancel thread future!", fn = astToStr(fn), key = $ctx.key await sleepAsync(10.milliseconds) raise exc finally: discard ctx.signal.close() self.semaphore.release() -proc hasTask[D](ctx: ptr TaskCtx, ds: D) = - defer: - if not ctx.isNil: - discard ctx[].signal.fireSync() - +proc runTask[D, T](ctx: ptr TaskCtx, ds: D, cb: proc(ctx: ptr TaskCtx): T {.gcsafe.}) {.gcsafe.} = try: - let res = has(ds, ctx.key) withLock(ctxLock): + if ctx.cancelled: + return + ctx.running = true + + ## run backend command + let res = cb(ctx) + # let res = has(ds, ctx.key) + + withLock(ctxLock): + ctx.running = false ctx.res = res.mapErr() do(e: ref CatchableError) -> ThreadResErr: e.toThreadErr() except CatchableError as exc: trace "Unexpected exception thrown in asyncHasTask", exc = exc.msg raiseAssert exc.msg + finally: + discard ctx[].signal.fireSync() + +proc hasTask[D](ctx: ptr TaskCtx, ds: D) {.gcsafe.} = + ## run backend command + runTask(ctx, ds) do(ctx: ptr TaskCtx) -> ?!bool: + has(ds, ctx.key) method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} = var key = KeyId.new key.id()