From 575d97311837464efa2c1792ec4892287b35e328 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Mon, 25 Sep 2023 22:35:34 -0700 Subject: [PATCH] simplifying --- datastore/threads/threadproxyds.nim | 47 +++++++++++++++++++---------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index e5d2265..715a051 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -10,6 +10,7 @@ import std/atomics import std/strutils import std/tables import std/sequtils +import std/locks import pkg/chronos import pkg/chronos/threadsync @@ -36,6 +37,8 @@ logScope: topics = "datastore threadproxyds" type + SqliteDB = SQLiteBackend[KeyId,DataBuffer] + ThreadBackendKinds* = enum Sqlite # Filesystem @@ -49,8 +52,9 @@ type TaskCtx[D; T: ThreadTypes] = object ds: D res: ThreadResult[T] - cancelled: bool signal: ThreadSignalPtr + running: bool + cancelled: bool ThreadDatastore* = ref object of Datastore tp: Taskpool @@ -58,21 +62,30 @@ type semaphore: AsyncSemaphore # semaphore is used for backpressure \ # to avoid exhausting file descriptors +var finishLock: Lock -proc dispatchTask[D, T]( - self: ThreadDatastore, - ctx: var TaskCtx[D, T], - key: ?KeyId = KeyId.none, - runTask: proc -): Future[?!T] {.async.} = +finishLock.initLock() + +proc setCancelled(ctx: var TaskCtx): bool = + withLock(finishLock): + if ctx.running: + return false + else: + ctx.cancelled = true + return true + +proc dispatchTask[D, T](self: ThreadDatastore, + ctx: var TaskCtx[D, T], + key: ?KeyId = KeyId.none, + runTask: proc(): void + ): Future[?!T] {.async.} = try: runTask() - let fut = wait(ctx.signal) - + await wait(ctx.signal) except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg - ctx.cancelled = true - await ctx.signal.fire() + while not ctx.setCancelled(): + await sleepAsync(10.milliseconds) raise exc finally: discard ctx.signal.close() @@ -92,27 +105,29 @@ proc hasTask(ctx: ptr TaskCtx, key: KeyId) = try: let res = has(ctx.ds, key) + ctx.res = res.mapErr() do(e: ref CatchableError) -> ThreadResErr: + e.toThreadErr() except CatchableError as exc: trace "Unexpected exception thrown in asyncHasTask", exc = exc.msg raiseAssert exc.msg method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} = - var - key = KeyId.new key.id() + var key = KeyId.new key.id() await self.semaphore.acquire() - let signal = ? acquireSignal() + without signal =? acquireSignal(), err: + return bool.failure err case self.backend.kind: of Sqlite: var ds = self.backend.sql - ctx = TaskCtx[typeof(ds), bool](ds: ds, signal: signal) + ctx = TaskCtx[SqliteDB, bool](ds: self.backend.sql, signal: signal) proc runTask() = self.tp.spawn hasTask(addr ctx, key) - return self.dispatchTask(ctx, key.some, runTask) + return await self.dispatchTask(ctx, key.some, runTask) # proc asyncDelTask(ctx: ptr TaskCtx[void], key: ptr Key) {.async.} = # if ctx.isNil: