mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-09 00:53:08 +00:00
adding threadsignalptr pool
This commit is contained in:
parent
20c01996b7
commit
97d884c86c
@ -28,8 +28,7 @@ method has*(
|
||||
key: Key
|
||||
): Future[?!bool] {.async.} =
|
||||
|
||||
without ret =? newThreadResult(bool), err:
|
||||
return failure(err)
|
||||
let ret = await newThreadResult(bool)
|
||||
|
||||
try:
|
||||
has(ret, self.tds, key)
|
||||
@ -44,8 +43,7 @@ method delete*(
|
||||
key: Key
|
||||
): Future[?!void] {.async.} =
|
||||
|
||||
without ret =? newThreadResult(void), err:
|
||||
return failure(err)
|
||||
let ret = await newThreadResult(void)
|
||||
|
||||
try:
|
||||
delete(ret, self.tds, key)
|
||||
@ -76,8 +74,7 @@ method get*(
|
||||
## probably be switched to use a single ThreadSignal
|
||||
## for the entire batch
|
||||
|
||||
without ret =? newThreadResult(ValueBuffer), err:
|
||||
return failure(err)
|
||||
let ret = await newThreadResult(ValueBuffer)
|
||||
|
||||
try:
|
||||
get(ret, self.tds, key)
|
||||
@ -93,8 +90,7 @@ method put*(
|
||||
data: seq[byte]
|
||||
): Future[?!void] {.async.} =
|
||||
|
||||
without ret =? newThreadResult(void), err:
|
||||
return failure(err)
|
||||
let ret = await newThreadResult(void)
|
||||
|
||||
try:
|
||||
put(ret, self.tds, key, data)
|
||||
@ -127,8 +123,7 @@ method query*(
|
||||
query: Query
|
||||
): Future[?!QueryIter] {.async.} =
|
||||
|
||||
without ret =? newThreadResult(QueryResponseBuffer), err:
|
||||
return failure(err)
|
||||
let ret = await newThreadResult(QueryResponseBuffer)
|
||||
|
||||
# echo "\n\n=== Query Start === "
|
||||
|
||||
|
||||
@ -54,7 +54,7 @@ proc initSignalPool() =
|
||||
|
||||
initSignalPool()
|
||||
|
||||
proc getSignal*(): Future[ThreadSignalPtr] {.async, raises: [].} =
|
||||
proc getThreadSignal*(): Future[ThreadSignalPtr] {.async, raises: [].} =
|
||||
## Get's a ThreadSignalPtr from the pool in a thread-safe way.
|
||||
##
|
||||
## This provides a simple backpressue mechanism for the
|
||||
@ -84,7 +84,7 @@ proc getSignal*(): Future[ThreadSignalPtr] {.async, raises: [].} =
|
||||
await sleepAsync(10.milliseconds)
|
||||
raise newException(DeadThreadDefect, "reached limit trying to acquire a ThreadSignalPtr")
|
||||
|
||||
proc releaseSignal*(sig: ThreadSignalPtr) =
|
||||
proc release*(sig: ThreadSignalPtr) =
|
||||
## Release ThreadSignalPtr back to the pool in a thread-safe way.
|
||||
withLock(signalPoolLock):
|
||||
signalPoolUsed.excl(sig)
|
||||
@ -99,7 +99,7 @@ proc threadSafeType*[T: ThreadSafeTypes](tp: typedesc[T]) =
|
||||
|
||||
proc newThreadResult*[T](
|
||||
tp: typedesc[T]
|
||||
): Result[TResult[T], ref CatchableError] =
|
||||
): Future[TResult[T]] {.async.} =
|
||||
## Creates a new TResult including allocating
|
||||
## a new ThreadSignalPtr.
|
||||
##
|
||||
@ -110,12 +110,8 @@ proc newThreadResult*[T](
|
||||
{.error: "only thread safe types can be used".}
|
||||
|
||||
let res = newSharedPtr(ThreadResult[T])
|
||||
|
||||
if signal.isErr:
|
||||
return err((ref CatchableError)(msg: signal.error()))
|
||||
else:
|
||||
res[].signal = signal.get()
|
||||
ok res
|
||||
res[].signal = await getThreadSignal()
|
||||
res
|
||||
|
||||
proc success*[T](ret: TResult[T], value: T) =
|
||||
## convenience wrapper for `TResult` to replicate
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user