mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-08 16:43:10 +00:00
remove asyncsemaphore
This commit is contained in:
parent
6c86d2b10e
commit
f0038127c8
@ -1,96 +0,0 @@
|
||||
# Nim-datastore
|
||||
# Copyright (c) 2023 Status Research & Development GmbH
|
||||
# Licensed under either of
|
||||
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
# at your option.
|
||||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import sequtils
|
||||
import chronos, chronicles
|
||||
|
||||
# TODO: this should probably go in chronos
|
||||
|
||||
logScope:
|
||||
topics = "datastore semaphore"
|
||||
|
||||
type
|
||||
AsyncSemaphore* = ref object of RootObj
|
||||
size*: int
|
||||
count: int
|
||||
queue: seq[Future[void]]
|
||||
|
||||
proc new*(_: type AsyncSemaphore, size: int): AsyncSemaphore =
|
||||
AsyncSemaphore(size: size, count: size)
|
||||
|
||||
proc `count`*(s: AsyncSemaphore): int = s.count
|
||||
|
||||
proc tryAcquire*(s: AsyncSemaphore): bool =
|
||||
## Attempts to acquire a resource, if successful
|
||||
## returns true, otherwise false
|
||||
##
|
||||
|
||||
if s.count > 0 and s.queue.len == 0:
|
||||
s.count.dec
|
||||
trace "Acquired slot", available = s.count, queue = s.queue.len
|
||||
return true
|
||||
|
||||
proc acquire*(s: AsyncSemaphore): Future[void] =
|
||||
## Acquire a resource and decrement the resource
|
||||
## counter. If no more resources are available,
|
||||
## the returned future will not complete until
|
||||
## the resource count goes above 0.
|
||||
##
|
||||
|
||||
let fut = newFuture[void]("AsyncSemaphore.acquire")
|
||||
if s.tryAcquire():
|
||||
fut.complete()
|
||||
return fut
|
||||
|
||||
proc cancellation(udata: pointer) {.gcsafe.} =
|
||||
fut.cancelCallback = nil
|
||||
if not fut.finished:
|
||||
s.queue.keepItIf( it != fut )
|
||||
|
||||
fut.cancelCallback = cancellation
|
||||
|
||||
s.queue.add(fut)
|
||||
|
||||
trace "Queued slot", available = s.count, queue = s.queue.len
|
||||
return fut
|
||||
|
||||
proc forceAcquire*(s: AsyncSemaphore) =
|
||||
## ForceAcquire will always succeed,
|
||||
## creating a temporary slot if required.
|
||||
## This temporary slot will stay usable until
|
||||
## there is less `acquire`s than `release`s
|
||||
s.count.dec
|
||||
|
||||
proc release*(s: AsyncSemaphore) =
|
||||
## Release a resource from the semaphore,
|
||||
## by picking the first future from the queue
|
||||
## and completing it and incrementing the
|
||||
## internal resource count
|
||||
##
|
||||
|
||||
doAssert(s.count <= s.size)
|
||||
|
||||
if s.count < s.size:
|
||||
trace "Releasing slot", available = s.count,
|
||||
queue = s.queue.len
|
||||
|
||||
s.count.inc
|
||||
while s.queue.len > 0:
|
||||
var fut = s.queue[0]
|
||||
s.queue.delete(0)
|
||||
if not fut.finished():
|
||||
s.count.dec
|
||||
fut.complete()
|
||||
break
|
||||
|
||||
trace "Released slot", available = s.count,
|
||||
queue = s.queue.len
|
||||
return
|
||||
@ -21,7 +21,6 @@ import ../key
|
||||
import ../query
|
||||
import ../datastore
|
||||
|
||||
import ./asyncsemaphore
|
||||
import ./databuffer
|
||||
|
||||
type
|
||||
@ -36,15 +35,17 @@ type
|
||||
ThreadDatastore* = ref object of Datastore
|
||||
tp: Taskpool
|
||||
ds: Datastore
|
||||
semaphore: AsyncSemaphore
|
||||
tasks: seq[Future[void]]
|
||||
|
||||
template dispatchTask(self: ThreadDatastore, ctx: TaskCtx, runTask: proc): untyped =
|
||||
template dispatchTask(
|
||||
self: ThreadDatastore,
|
||||
ctx: TaskCtx,
|
||||
runTask: proc): untyped =
|
||||
|
||||
let
|
||||
fut = wait(ctx.signal)
|
||||
|
||||
try:
|
||||
await self.semaphore.acquire()
|
||||
runTask()
|
||||
self.tasks.add(fut)
|
||||
await fut
|
||||
@ -58,8 +59,6 @@ template dispatchTask(self: ThreadDatastore, ctx: TaskCtx, runTask: proc): untyp
|
||||
idx != -1):
|
||||
self.tasks.del(idx)
|
||||
|
||||
self.semaphore.release()
|
||||
|
||||
proc hasTask(
|
||||
ctx: ptr TaskCtx,
|
||||
key: ptr Key) =
|
||||
@ -223,7 +222,7 @@ method close*(self: ThreadDatastore): Future[?!void] {.async.} =
|
||||
|
||||
await self.ds.close()
|
||||
|
||||
proc queryTask*(
|
||||
proc queryTask(
|
||||
ctx: ptr TaskCtx,
|
||||
iter: ptr QueryIter) =
|
||||
|
||||
@ -303,5 +302,4 @@ func new*(
|
||||
|
||||
success ThreadDatastore(
|
||||
tp: tp,
|
||||
ds: ds,
|
||||
semaphore: AsyncSemaphore.new(tp.numThreads - 1)) # one thread is needed for the task dispatcher
|
||||
ds: ds)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user