mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-05 23:23:10 +00:00
adding back async semaphore
This commit is contained in:
parent
20d7234d50
commit
1713c7674c
96
datastore/threads/asyncsemaphore.nim
Normal file
96
datastore/threads/asyncsemaphore.nim
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
# Nim-Libp2p
|
||||||
|
# Copyright (c) 2023 Status Research & Development GmbH
|
||||||
|
# Licensed under either of
|
||||||
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||||
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||||
|
# at your option.
|
||||||
|
# This file may not be copied, modified, or distributed except according to
|
||||||
|
# those terms.
|
||||||
|
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import sequtils
|
||||||
|
import chronos, chronicles
|
||||||
|
|
||||||
|
# TODO: this should probably go in chronos
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "libp2p semaphore"
|
||||||
|
|
||||||
|
type
|
||||||
|
AsyncSemaphore* = ref object of RootObj
|
||||||
|
size*: int
|
||||||
|
count: int
|
||||||
|
queue: seq[Future[void]]
|
||||||
|
|
||||||
|
func new*(_: type AsyncSemaphore, size: int): AsyncSemaphore =
|
||||||
|
AsyncSemaphore(size: size, count: size)
|
||||||
|
|
||||||
|
proc `count`*(s: AsyncSemaphore): int = s.count
|
||||||
|
|
||||||
|
proc tryAcquire*(s: AsyncSemaphore): bool =
|
||||||
|
## Attempts to acquire a resource, if successful
|
||||||
|
## returns true, otherwise false
|
||||||
|
##
|
||||||
|
|
||||||
|
if s.count > 0 and s.queue.len == 0:
|
||||||
|
s.count.dec
|
||||||
|
trace "Acquired slot", available = s.count, queue = s.queue.len
|
||||||
|
return true
|
||||||
|
|
||||||
|
proc acquire*(s: AsyncSemaphore): Future[void] =
|
||||||
|
## Acquire a resource and decrement the resource
|
||||||
|
## counter. If no more resources are available,
|
||||||
|
## the returned future will not complete until
|
||||||
|
## the resource count goes above 0.
|
||||||
|
##
|
||||||
|
|
||||||
|
let fut = newFuture[void]("AsyncSemaphore.acquire")
|
||||||
|
if s.tryAcquire():
|
||||||
|
fut.complete()
|
||||||
|
return fut
|
||||||
|
|
||||||
|
proc cancellation(udata: pointer) {.gcsafe.} =
|
||||||
|
fut.cancelCallback = nil
|
||||||
|
if not fut.finished:
|
||||||
|
s.queue.keepItIf( it != fut )
|
||||||
|
|
||||||
|
fut.cancelCallback = cancellation
|
||||||
|
|
||||||
|
s.queue.add(fut)
|
||||||
|
|
||||||
|
trace "Queued slot", available = s.count, queue = s.queue.len
|
||||||
|
return fut
|
||||||
|
|
||||||
|
proc forceAcquire*(s: AsyncSemaphore) =
|
||||||
|
## ForceAcquire will always succeed,
|
||||||
|
## creating a temporary slot if required.
|
||||||
|
## This temporary slot will stay usable until
|
||||||
|
## there is less `acquire`s than `release`s
|
||||||
|
s.count.dec
|
||||||
|
|
||||||
|
proc release*(s: AsyncSemaphore) =
|
||||||
|
## Release a resource from the semaphore,
|
||||||
|
## by picking the first future from the queue
|
||||||
|
## and completing it and incrementing the
|
||||||
|
## internal resource count
|
||||||
|
##
|
||||||
|
|
||||||
|
doAssert(s.count <= s.size)
|
||||||
|
|
||||||
|
if s.count < s.size:
|
||||||
|
trace "Releasing slot", available = s.count,
|
||||||
|
queue = s.queue.len
|
||||||
|
|
||||||
|
s.count.inc
|
||||||
|
while s.queue.len > 0:
|
||||||
|
var fut = s.queue[0]
|
||||||
|
s.queue.delete(0)
|
||||||
|
if not fut.finished():
|
||||||
|
s.count.dec
|
||||||
|
fut.complete()
|
||||||
|
break
|
||||||
|
|
||||||
|
trace "Released slot", available = s.count,
|
||||||
|
queue = s.queue.len
|
||||||
|
return
|
||||||
206
tests/datastore/testasyncsemaphore.nim
Normal file
206
tests/datastore/testasyncsemaphore.nim
Normal file
@ -0,0 +1,206 @@
|
|||||||
|
{.used.}
|
||||||
|
|
||||||
|
# Nim-Libp2p
|
||||||
|
# Copyright (c) 2023 Status Research & Development GmbH
|
||||||
|
# Licensed under either of
|
||||||
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||||
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||||
|
# at your option.
|
||||||
|
# This file may not be copied, modified, or distributed except according to
|
||||||
|
# those terms.
|
||||||
|
|
||||||
|
import random
|
||||||
|
import chronos
|
||||||
|
|
||||||
|
import ../libp2p/utils/semaphore
|
||||||
|
|
||||||
|
import ./helpers
|
||||||
|
|
||||||
|
randomize()
|
||||||
|
|
||||||
|
suite "AsyncSemaphore":
|
||||||
|
asyncTest "should acquire":
|
||||||
|
let sema = newAsyncSemaphore(3)
|
||||||
|
|
||||||
|
await sema.acquire()
|
||||||
|
await sema.acquire()
|
||||||
|
await sema.acquire()
|
||||||
|
|
||||||
|
check sema.count == 0
|
||||||
|
|
||||||
|
asyncTest "should release":
|
||||||
|
let sema = newAsyncSemaphore(3)
|
||||||
|
|
||||||
|
await sema.acquire()
|
||||||
|
await sema.acquire()
|
||||||
|
await sema.acquire()
|
||||||
|
|
||||||
|
check sema.count == 0
|
||||||
|
sema.release()
|
||||||
|
sema.release()
|
||||||
|
sema.release()
|
||||||
|
check sema.count == 3
|
||||||
|
|
||||||
|
asyncTest "should queue acquire":
|
||||||
|
let sema = newAsyncSemaphore(1)
|
||||||
|
|
||||||
|
await sema.acquire()
|
||||||
|
let fut = sema.acquire()
|
||||||
|
|
||||||
|
check sema.count == 0
|
||||||
|
sema.release()
|
||||||
|
sema.release()
|
||||||
|
check sema.count == 1
|
||||||
|
|
||||||
|
await sleepAsync(10.millis)
|
||||||
|
check fut.finished()
|
||||||
|
|
||||||
|
asyncTest "should keep count == size":
|
||||||
|
let sema = newAsyncSemaphore(1)
|
||||||
|
sema.release()
|
||||||
|
sema.release()
|
||||||
|
sema.release()
|
||||||
|
check sema.count == 1
|
||||||
|
|
||||||
|
asyncTest "should tryAcquire":
|
||||||
|
let sema = newAsyncSemaphore(1)
|
||||||
|
await sema.acquire()
|
||||||
|
check sema.tryAcquire() == false
|
||||||
|
|
||||||
|
asyncTest "should tryAcquire and acquire":
|
||||||
|
let sema = newAsyncSemaphore(4)
|
||||||
|
check sema.tryAcquire() == true
|
||||||
|
check sema.tryAcquire() == true
|
||||||
|
check sema.tryAcquire() == true
|
||||||
|
check sema.tryAcquire() == true
|
||||||
|
check sema.count == 0
|
||||||
|
|
||||||
|
let fut = sema.acquire()
|
||||||
|
check fut.finished == false
|
||||||
|
check sema.count == 0
|
||||||
|
|
||||||
|
sema.release()
|
||||||
|
sema.release()
|
||||||
|
sema.release()
|
||||||
|
sema.release()
|
||||||
|
sema.release()
|
||||||
|
|
||||||
|
check fut.finished == true
|
||||||
|
check sema.count == 4
|
||||||
|
|
||||||
|
asyncTest "should restrict resource access":
|
||||||
|
let sema = newAsyncSemaphore(3)
|
||||||
|
var resource = 0
|
||||||
|
|
||||||
|
proc task() {.async.} =
|
||||||
|
try:
|
||||||
|
await sema.acquire()
|
||||||
|
resource.inc()
|
||||||
|
check resource > 0 and resource <= 3
|
||||||
|
let sleep = rand(0..10).millis
|
||||||
|
# echo sleep
|
||||||
|
await sleepAsync(sleep)
|
||||||
|
finally:
|
||||||
|
resource.dec()
|
||||||
|
sema.release()
|
||||||
|
|
||||||
|
var tasks: seq[Future[void]]
|
||||||
|
for i in 0..<10:
|
||||||
|
tasks.add(task())
|
||||||
|
|
||||||
|
await allFutures(tasks)
|
||||||
|
|
||||||
|
asyncTest "should cancel sequential semaphore slot":
|
||||||
|
let sema = newAsyncSemaphore(1)
|
||||||
|
|
||||||
|
await sema.acquire()
|
||||||
|
|
||||||
|
let
|
||||||
|
tmp = sema.acquire()
|
||||||
|
tmp2 = sema.acquire()
|
||||||
|
check:
|
||||||
|
not tmp.finished()
|
||||||
|
not tmp2.finished()
|
||||||
|
|
||||||
|
tmp.cancel()
|
||||||
|
sema.release()
|
||||||
|
|
||||||
|
check tmp2.finished()
|
||||||
|
|
||||||
|
sema.release()
|
||||||
|
|
||||||
|
check await sema.acquire().withTimeout(10.millis)
|
||||||
|
|
||||||
|
asyncTest "should handle out of order cancellations":
|
||||||
|
let sema = newAsyncSemaphore(1)
|
||||||
|
|
||||||
|
await sema.acquire() # 1st acquire
|
||||||
|
let tmp1 = sema.acquire() # 2nd acquire
|
||||||
|
check not tmp1.finished()
|
||||||
|
|
||||||
|
let tmp2 = sema.acquire() # 3rd acquire
|
||||||
|
check not tmp2.finished()
|
||||||
|
|
||||||
|
let tmp3 = sema.acquire() # 4th acquire
|
||||||
|
check not tmp3.finished()
|
||||||
|
|
||||||
|
# up to this point, we've called acquire 4 times
|
||||||
|
tmp1.cancel() # 1st release (implicit)
|
||||||
|
tmp2.cancel() # 2nd release (implicit)
|
||||||
|
|
||||||
|
check not tmp3.finished() # check that we didn't release the wrong slot
|
||||||
|
|
||||||
|
sema.release() # 3rd release (explicit)
|
||||||
|
check tmp3.finished()
|
||||||
|
|
||||||
|
sema.release() # 4th release
|
||||||
|
check await sema.acquire().withTimeout(10.millis)
|
||||||
|
|
||||||
|
asyncTest "should properly handle timeouts and cancellations":
|
||||||
|
let sema = newAsyncSemaphore(1)
|
||||||
|
|
||||||
|
await sema.acquire()
|
||||||
|
check not(await sema.acquire().withTimeout(1.millis)) # should not acquire but cancel
|
||||||
|
sema.release()
|
||||||
|
|
||||||
|
check await sema.acquire().withTimeout(10.millis)
|
||||||
|
|
||||||
|
asyncTest "should handle forceAcquire properly":
|
||||||
|
let sema = newAsyncSemaphore(1)
|
||||||
|
|
||||||
|
await sema.acquire()
|
||||||
|
check not(await sema.acquire().withTimeout(1.millis)) # should not acquire but cancel
|
||||||
|
|
||||||
|
let
|
||||||
|
fut1 = sema.acquire()
|
||||||
|
fut2 = sema.acquire()
|
||||||
|
|
||||||
|
sema.forceAcquire()
|
||||||
|
sema.release()
|
||||||
|
|
||||||
|
await fut1 or fut2 or sleepAsync(1.millis)
|
||||||
|
check:
|
||||||
|
fut1.finished()
|
||||||
|
not fut2.finished()
|
||||||
|
|
||||||
|
sema.release()
|
||||||
|
await fut1 or fut2 or sleepAsync(1.millis)
|
||||||
|
check:
|
||||||
|
fut1.finished()
|
||||||
|
fut2.finished()
|
||||||
|
|
||||||
|
|
||||||
|
sema.forceAcquire()
|
||||||
|
sema.forceAcquire()
|
||||||
|
|
||||||
|
let
|
||||||
|
fut3 = sema.acquire()
|
||||||
|
fut4 = sema.acquire()
|
||||||
|
fut5 = sema.acquire()
|
||||||
|
sema.release()
|
||||||
|
sema.release()
|
||||||
|
await sleepAsync(1.millis)
|
||||||
|
check:
|
||||||
|
fut3.finished()
|
||||||
|
fut4.finished()
|
||||||
|
not fut5.finished()
|
||||||
Loading…
x
Reference in New Issue
Block a user