From 8eef3e644204a1a79b50b88dba278424538acd26 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Mon, 25 Sep 2023 23:26:59 -0700 Subject: [PATCH] integrate setups --- datastore/threads/threadproxyds.nim | 76 +++++++++-------------------- 1 file changed, 24 insertions(+), 52 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index c2d0286..56f8490 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -98,7 +98,7 @@ template dispatchTask(self: ThreadDatastore, discard ctx.signal.close() self.semaphore.release() -template runTask(ctx: ptr TaskCtx, blk: untyped) = +template executeTask(ctx: ptr TaskCtx, blk: untyped) = try: withLock(ctxLock): if ctx.cancelled: @@ -120,71 +120,43 @@ template runTask(ctx: ptr TaskCtx, blk: untyped) = proc hasTask[DB](ctx: ptr TaskCtx, ds: DB, key: KeyId) {.gcsafe.} = ## run backend command - runTask(ctx): + executeTask(ctx): has(ds, key) -method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} = +method has*(self: ThreadDatastore, + key: Key): Future[?!bool] {.async.} = await self.semaphore.acquire() without signal =? acquireSignal(), err: - return bool.failure err + return failure err let key = KeyId.new key.id() dispatchTask(self, signal): self.tp.spawn hasTask(addr ctx, ds, key) -# proc asyncDelTask(ctx: ptr TaskCtx[void], key: ptr Key) {.async.} = -# if ctx.isNil: -# trace "ctx is nil" -# return +proc deleteTask[DB](ctx: ptr TaskCtx, ds: DB; + key: KeyId) {.gcsafe.} = + ## run backend command + executeTask(ctx): + delete(ds, key) -# let -# key = key[] -# fut = ctx[].ds.delete(key) +method delete*(self: ThreadDatastore, + key: Key): Future[?!void] {.async.} = + await self.semaphore.acquire() + without signal =? acquireSignal(), err: + return failure err -# asyncSpawn signalMonitor(ctx, fut) -# without res =? (await fut).catch, error: -# trace "Error in asyncDelTask", error = error.msg -# ctx[].res[].err(error) -# return + let key = KeyId.new key.id() + dispatchTask(self, signal): + self.tp.spawn deleteTask(addr ctx, ds, key) -# ctx[].res[].ok() -# return +method delete*(self: ThreadDatastore, + keys: seq[Key]): Future[?!void] {.async.} = -# proc delTask(ctx: ptr TaskCtx, key: ptr Key) = -# defer: -# if not ctx.isNil: -# discard ctx[].signal.fireSync() + for key in keys: + if err =? (await self.delete(key)).errorOption: + return failure err -# try: -# waitFor asyncDelTask(ctx, key) -# except CatchableError as exc: -# trace "Unexpected exception thrown in asyncDelTask", exc = exc.msg -# raiseAssert exc.msg - -# method delete*( -# self: ThreadDatastore, -# key: Key): Future[?!void] {.async.} = -# var -# key = key -# res = ThreadResult[void]() -# ctx = TaskCtx[void]( -# ds: self.ds, -# res: addr res) - -# proc runTask() = -# self.tp.spawn delTask(addr ctx, addr key) - -# return self.dispatchTask(ctx, key.some, runTask) - -# method delete*( -# self: ThreadDatastore, -# keys: seq[Key]): Future[?!void] {.async.} = - -# for key in keys: -# if err =? (await self.delete(key)).errorOption: -# return failure err - -# return success() + return success() # proc asyncPutTask( # ctx: ptr TaskCtx[void],