From f0038127c83a7191970fe186d7d701dbec182f62 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Thu, 14 Sep 2023 17:56:02 -0600 Subject: [PATCH] remove asyncsemaphore --- datastore/threads/asyncsemaphore.nim | 96 ---------------------------- datastore/threads/threadproxyds.nim | 16 ++--- 2 files changed, 7 insertions(+), 105 deletions(-) delete mode 100644 datastore/threads/asyncsemaphore.nim diff --git a/datastore/threads/asyncsemaphore.nim b/datastore/threads/asyncsemaphore.nim deleted file mode 100644 index ee57ee0..0000000 --- a/datastore/threads/asyncsemaphore.nim +++ /dev/null @@ -1,96 +0,0 @@ -# Nim-datastore -# Copyright (c) 2023 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -# * MIT license ([LICENSE-MIT](LICENSE-MIT)) -# at your option. -# This file may not be copied, modified, or distributed except according to -# those terms. - -{.push raises: [].} - -import sequtils -import chronos, chronicles - -# TODO: this should probably go in chronos - -logScope: - topics = "datastore semaphore" - -type - AsyncSemaphore* = ref object of RootObj - size*: int - count: int - queue: seq[Future[void]] - -proc new*(_: type AsyncSemaphore, size: int): AsyncSemaphore = - AsyncSemaphore(size: size, count: size) - -proc `count`*(s: AsyncSemaphore): int = s.count - -proc tryAcquire*(s: AsyncSemaphore): bool = - ## Attempts to acquire a resource, if successful - ## returns true, otherwise false - ## - - if s.count > 0 and s.queue.len == 0: - s.count.dec - trace "Acquired slot", available = s.count, queue = s.queue.len - return true - -proc acquire*(s: AsyncSemaphore): Future[void] = - ## Acquire a resource and decrement the resource - ## counter. If no more resources are available, - ## the returned future will not complete until - ## the resource count goes above 0. - ## - - let fut = newFuture[void]("AsyncSemaphore.acquire") - if s.tryAcquire(): - fut.complete() - return fut - - proc cancellation(udata: pointer) {.gcsafe.} = - fut.cancelCallback = nil - if not fut.finished: - s.queue.keepItIf( it != fut ) - - fut.cancelCallback = cancellation - - s.queue.add(fut) - - trace "Queued slot", available = s.count, queue = s.queue.len - return fut - -proc forceAcquire*(s: AsyncSemaphore) = - ## ForceAcquire will always succeed, - ## creating a temporary slot if required. - ## This temporary slot will stay usable until - ## there is less `acquire`s than `release`s - s.count.dec - -proc release*(s: AsyncSemaphore) = - ## Release a resource from the semaphore, - ## by picking the first future from the queue - ## and completing it and incrementing the - ## internal resource count - ## - - doAssert(s.count <= s.size) - - if s.count < s.size: - trace "Releasing slot", available = s.count, - queue = s.queue.len - - s.count.inc - while s.queue.len > 0: - var fut = s.queue[0] - s.queue.delete(0) - if not fut.finished(): - s.count.dec - fut.complete() - break - - trace "Released slot", available = s.count, - queue = s.queue.len - return diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index d3ae21a..09f448f 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -21,7 +21,6 @@ import ../key import ../query import ../datastore -import ./asyncsemaphore import ./databuffer type @@ -36,15 +35,17 @@ type ThreadDatastore* = ref object of Datastore tp: Taskpool ds: Datastore - semaphore: AsyncSemaphore tasks: seq[Future[void]] -template dispatchTask(self: ThreadDatastore, ctx: TaskCtx, runTask: proc): untyped = +template dispatchTask( + self: ThreadDatastore, + ctx: TaskCtx, + runTask: proc): untyped = + let fut = wait(ctx.signal) try: - await self.semaphore.acquire() runTask() self.tasks.add(fut) await fut @@ -58,8 +59,6 @@ template dispatchTask(self: ThreadDatastore, ctx: TaskCtx, runTask: proc): untyp idx != -1): self.tasks.del(idx) - self.semaphore.release() - proc hasTask( ctx: ptr TaskCtx, key: ptr Key) = @@ -223,7 +222,7 @@ method close*(self: ThreadDatastore): Future[?!void] {.async.} = await self.ds.close() -proc queryTask*( +proc queryTask( ctx: ptr TaskCtx, iter: ptr QueryIter) = @@ -303,5 +302,4 @@ func new*( success ThreadDatastore( tp: tp, - ds: ds, - semaphore: AsyncSemaphore.new(tp.numThreads - 1)) # one thread is needed for the task dispatcher + ds: ds)