diff --git a/datastore/threads/keylocks.nim b/datastore/threads/keylocks.nim new file mode 100644 index 0000000..ba647e2 --- /dev/null +++ b/datastore/threads/keylocks.nim @@ -0,0 +1,57 @@ +import pkg/chronos/threadsync +import pkg/chronos +import std/locks +import std/sets + +import ./databuffer +import ./sharedptr + +export databuffer +export sharedptr +export threadsync + +const + KeyLocksSize {.intdefine.} = 100 + KeyLocksRetries {.intdefine.} = 1000 + +var + keyLocksLock: Lock + keyLocksUsed: HashSet[KeyBuffer] + +type + KeyLock* = object + ## TODO: anything here? + +proc initKeyLocks() = + keyLocksLock.initLock() + keyLocksUsed = initHashSet[KeyBuffer](2*KeyLocksSize) ## avoid re-allocating this + +initKeyLocks() + +proc acquireKeyLock*(key: KeyBuffer): Future[KeyLock] {.async, raises: [].} = + ## Simple locking table for Datastore keys with async backpressure + ## + {.cast(gcsafe).}: + var cnt = KeyLocksRetries + while cnt > 0: + cnt.dec() + keyLocksLock.acquire() + try: + if key notin keyLocksUsed: + keyLocksUsed.incl(key) + return + except KeyError: + discard + finally: + keyLocksLock.release() + # echo "wait:KeyLocksUsed: " + await sleepAsync(1.milliseconds) + raise newException(DeadThreadDefect, "reached limit trying to acquire a KeyBuffer") + +proc release*(key: KeyBuffer) {.raises: [].} = + ## Release KeyBuffer back to the pool in a thread-safe way. + {.cast(gcsafe).}: + withLock(keyLocksLock): + keyLocksUsed.excl(key) + # echo "free:KeyLocksUsed:size: ", KeyLocksUsed.len() + diff --git a/datastore/threads/then.nim b/datastore/threads/then.nim deleted file mode 100644 index d98d461..0000000 --- a/datastore/threads/then.nim +++ /dev/null @@ -1,209 +0,0 @@ -import pkg/chronos -import pkg/questionable -import pkg/questionable/results -import pkg/upraises - -## duplicate of codex/utils/then.nim - -# Similar to JavaScript's Promise API, `.then` and `.catch` can be used to -# handle results and errors of async `Futures` within a synchronous closure. -# They can be used as an alternative to `asyncSpawn` which does not return a -# value and will raise a `FutureDefect` if there are unhandled errors -# encountered. Both `.then` and `.catch` act as callbacks that do not block the -# synchronous closure's flow. - -# `.then` is called when the `Future` is successfully completed and can be -# chained as many times as desired, calling each `.then` callback in order. When -# the `Future` returns `Result[T, ref CatchableError]` (or `?!T`), the value -# called in the `.then` callback will be unpacked from the `Result` as a -# convenience. In other words, for `Future[?!T]`, the `.then` callback will take -# a single parameter `T`. See `tests/utils/testthen.nim` for more examples. To -# allow for chaining, `.then` returns its future. If the future is already -# complete, the `.then` callback will be executed immediately. - -# `.catch` is called when the `Future` fails. In the case when the `Future` -# returns a `Result[T, ref CatchableError` (or `?!T`), `.catch` will be called -# if the `Result` contains an error. If the `Future` is already failed (or -# `Future[?!T]` contains an error), the `.catch` callback will be executed -# immediately. - -# `.cancelled` is called when the `Future` is cancelled. If the `Future` is -# already cancelled, the `.cancelled` callback will be executed immediately. - -# More info on JavaScript's Promise API can be found at: -# https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise - -runnableExamples: - proc asyncProc(): Future[int] {.async.} = - await sleepAsync(1.millis) - return 1 - - asyncProc() - .then(proc(i: int) = echo "returned ", i) - .catch(proc(e: ref CatchableError) = doAssert false, "will not be triggered") - - # outputs "returned 1" - - proc asyncProcWithError(): Future[int] {.async.} = - await sleepAsync(1.millis) - raise newException(ValueError, "some error") - - asyncProcWithError() - .then(proc(i: int) = doAssert false, "will not be triggered") - .catch(proc(e: ref CatchableError) = echo "errored: ", e.msg) - - # outputs "errored: some error" - -type - OnSuccess*[T] = proc(val: T) {.gcsafe, upraises: [].} - OnError* = proc(err: ref CatchableError) {.gcsafe, upraises: [].} - OnCancelled* = proc() {.gcsafe, upraises: [].} - -proc ignoreError(err: ref CatchableError) = discard -proc ignoreCancelled() = discard - -template handleFinished(future: FutureBase, - onError: OnError, - onCancelled: OnCancelled) = - - if not future.finished: - return - - if future.cancelled: - onCancelled() - return - - if future.failed: - onError(future.error) - return - -proc then*(future: Future[void], onSuccess: OnSuccess[void]): Future[void] = - - proc cb(udata: pointer) = - future.handleFinished(ignoreError, ignoreCancelled) - onSuccess() - - proc cancellation(udata: pointer) = - if not future.finished(): - future.removeCallback(cb) - - future.addCallback(cb) - future.cancelCallback = cancellation - return future - -proc then*[T](future: Future[T], onSuccess: OnSuccess[T]): Future[T] = - - proc cb(udata: pointer) = - future.handleFinished(ignoreError, ignoreCancelled) - - if val =? future.read.catch: - onSuccess(val) - - proc cancellation(udata: pointer) = - if not future.finished(): - future.removeCallback(cb) - - future.addCallback(cb) - future.cancelCallback = cancellation - return future - -proc then*[T](future: Future[?!T], onSuccess: OnSuccess[T]): Future[?!T] = - - proc cb(udata: pointer) = - future.handleFinished(ignoreError, ignoreCancelled) - - try: - if val =? future.read: - onSuccess(val) - except CatchableError as e: - ignoreError(e) - - proc cancellation(udata: pointer) = - if not future.finished(): - future.removeCallback(cb) - - future.addCallback(cb) - future.cancelCallback = cancellation - return future - -proc then*(future: Future[?!void], onSuccess: OnSuccess[void]): Future[?!void] = - - proc cb(udata: pointer) = - future.handleFinished(ignoreError, ignoreCancelled) - - try: - if future.read.isOk: - onSuccess() - except CatchableError as e: - ignoreError(e) - return - - proc cancellation(udata: pointer) = - if not future.finished(): - future.removeCallback(cb) - - future.addCallback(cb) - future.cancelCallback = cancellation - return future - -proc catch*[T](future: Future[T], onError: OnError) = - - if future.isNil: return - - proc cb(udata: pointer) = - future.handleFinished(onError, ignoreCancelled) - - proc cancellation(udata: pointer) = - if not future.finished(): - future.removeCallback(cb) - - future.addCallback(cb) - future.cancelCallback = cancellation - -proc catch*[T](future: Future[?!T], onError: OnError) = - - if future.isNil: return - - proc cb(udata: pointer) = - future.handleFinished(onError, ignoreCancelled) - - try: - if err =? future.read.errorOption: - onError(err) - except CatchableError as e: - onError(e) - - proc cancellation(udata: pointer) = - if not future.finished(): - future.removeCallback(cb) - - future.addCallback(cb) - future.cancelCallback = cancellation - -proc cancelled*[T](future: Future[T], onCancelled: OnCancelled): Future[T] = - - proc cb(udata: pointer) = - future.handleFinished(ignoreError, onCancelled) - - proc cancellation(udata: pointer) = - if not future.finished(): - future.removeCallback(cb) - onCancelled() - - future.addCallback(cb) - future.cancelCallback = cancellation - return future - -proc cancelled*[T](future: Future[?!T], onCancelled: OnCancelled): Future[?!T] = - - proc cb(udata: pointer) = - future.handleFinished(ignoreError, onCancelled) - - proc cancellation(udata: pointer) = - if not future.finished(): - future.removeCallback(cb) - onCancelled() - - future.addCallback(cb) - future.cancelCallback = cancellation - return future \ No newline at end of file