diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 5e11472..e5d2265 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -50,7 +50,6 @@ type ds: D res: ThreadResult[T] cancelled: bool - semaphore: AsyncSemaphore signal: ThreadSignalPtr ThreadDatastore* = ref object of Datastore @@ -58,59 +57,18 @@ type backend: ThreadBackend semaphore: AsyncSemaphore # semaphore is used for backpressure \ # to avoid exhausting file descriptors - withLocks: bool - tasks: Table[KeyId, Future[void]] - queryLock: AsyncLock # global query lock, this is only really \ - # needed for the fsds, but it is expensive! -template withLocks( - self: ThreadDatastore, - ctx: TaskCtx, - key: ?KeyId = KeyId.none, - body: untyped): untyped = + +proc dispatchTask[D, T]( + self: ThreadDatastore, + ctx: var TaskCtx[D, T], + key: ?KeyId = KeyId.none, + runTask: proc +): Future[?!T] {.async.} = try: - if key.isSome and key.get in self.tasks: - if self.withLocks: - await self.tasks[key.get] + runTask() + let fut = wait(ctx.signal) - if self.withLocks: - await self.queryLock.acquire() # only lock if it's required (fsds) - - block: - body - finally: - if self.withLocks: - if key.isSome and key.get in self.tasks: - self.tasks.del(key.get) - if self.queryLock.locked: - self.queryLock.release() - -# TODO: needs rework, we can't use `result` with async -template dispatchTask[D, T]( - self: ThreadDatastore, - ctx: TaskCtx[D, T], - key: ?KeyId = KeyId.none, - runTask: proc): auto = - try: - await self.semaphore.acquire() - let signal = ThreadSignalPtr.new() - if signal.isErr: - failure(signal.error) - else: - ctx.signal = signal.get() - let - fut = wait(ctx.signal) - - withLocks(self, ctx, key): - runTask() - await fut - if ctx.res.isErr: - failure ctx.res.error - else: - when result.T isnot void: - success result.T(ctx.res.get) - else: - success() except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg ctx.cancelled = true @@ -120,29 +78,12 @@ template dispatchTask[D, T]( discard ctx.signal.close() self.semaphore.release() -proc signalMonitor[T](ctx: ptr TaskCtx, fut: Future[T]) {.async.} = - ## Monitor the signal and cancel the future if - ## the cancellation flag is set - ## - - if ctx.isNil: - trace "ctx is nil" - return - - try: - await ctx[].signal.wait() - trace "Received signal" - - if ctx[].cancelled: # there could eventually be other flags - trace "Cancelling future" - if not fut.finished: - await fut.cancelAndWait() # cancel the `has` future - - discard ctx[].signal.fireSync() - except CatchableError as exc: - trace "Exception in thread signal monitor", exc = exc.msg - ctx[].res[].err(exc) - discard ctx[].signal.fireSync() +proc acquireSignal(): ?!ThreadSignalPtr = + let signal = ThreadSignalPtr.new() + if signal.isErr(): + failure (ref CatchableError)(msg: "failed to aquire ThreadSignalPtr: " & signal.error()) + else: + success signal.get() proc hasTask(ctx: ptr TaskCtx, key: KeyId) = defer: @@ -159,11 +100,14 @@ method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} = var key = KeyId.new key.id() + await self.semaphore.acquire() + let signal = ? acquireSignal() + case self.backend.kind: of Sqlite: var ds = self.backend.sql - ctx = TaskCtx[typeof(ds), bool](ds: ds) + ctx = TaskCtx[typeof(ds), bool](ds: ds, signal: signal) proc runTask() = self.tp.spawn hasTask(addr ctx, key)