From 97d884c86cf649359271b4bb99d3fc7c01e6cf4d Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 5 Sep 2023 16:15:32 -0700 Subject: [PATCH] adding threadsignalptr pool --- datastore/threadproxyds.nim | 15 +++++---------- datastore/threadresults.nim | 14 +++++--------- 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/datastore/threadproxyds.nim b/datastore/threadproxyds.nim index 6f94899..07960d6 100644 --- a/datastore/threadproxyds.nim +++ b/datastore/threadproxyds.nim @@ -28,8 +28,7 @@ method has*( key: Key ): Future[?!bool] {.async.} = - without ret =? newThreadResult(bool), err: - return failure(err) + let ret = await newThreadResult(bool) try: has(ret, self.tds, key) @@ -44,8 +43,7 @@ method delete*( key: Key ): Future[?!void] {.async.} = - without ret =? newThreadResult(void), err: - return failure(err) + let ret = await newThreadResult(void) try: delete(ret, self.tds, key) @@ -76,8 +74,7 @@ method get*( ## probably be switched to use a single ThreadSignal ## for the entire batch - without ret =? newThreadResult(ValueBuffer), err: - return failure(err) + let ret = await newThreadResult(ValueBuffer) try: get(ret, self.tds, key) @@ -93,8 +90,7 @@ method put*( data: seq[byte] ): Future[?!void] {.async.} = - without ret =? newThreadResult(void), err: - return failure(err) + let ret = await newThreadResult(void) try: put(ret, self.tds, key, data) @@ -127,8 +123,7 @@ method query*( query: Query ): Future[?!QueryIter] {.async.} = - without ret =? newThreadResult(QueryResponseBuffer), err: - return failure(err) + let ret = await newThreadResult(QueryResponseBuffer) # echo "\n\n=== Query Start === " diff --git a/datastore/threadresults.nim b/datastore/threadresults.nim index 39b2181..211d7e1 100644 --- a/datastore/threadresults.nim +++ b/datastore/threadresults.nim @@ -54,7 +54,7 @@ proc initSignalPool() = initSignalPool() -proc getSignal*(): Future[ThreadSignalPtr] {.async, raises: [].} = +proc getThreadSignal*(): Future[ThreadSignalPtr] {.async, raises: [].} = ## Get's a ThreadSignalPtr from the pool in a thread-safe way. ## ## This provides a simple backpressue mechanism for the @@ -84,7 +84,7 @@ proc getSignal*(): Future[ThreadSignalPtr] {.async, raises: [].} = await sleepAsync(10.milliseconds) raise newException(DeadThreadDefect, "reached limit trying to acquire a ThreadSignalPtr") -proc releaseSignal*(sig: ThreadSignalPtr) = +proc release*(sig: ThreadSignalPtr) = ## Release ThreadSignalPtr back to the pool in a thread-safe way. withLock(signalPoolLock): signalPoolUsed.excl(sig) @@ -99,7 +99,7 @@ proc threadSafeType*[T: ThreadSafeTypes](tp: typedesc[T]) = proc newThreadResult*[T]( tp: typedesc[T] -): Result[TResult[T], ref CatchableError] = +): Future[TResult[T]] {.async.} = ## Creates a new TResult including allocating ## a new ThreadSignalPtr. ## @@ -110,12 +110,8 @@ proc newThreadResult*[T]( {.error: "only thread safe types can be used".} let res = newSharedPtr(ThreadResult[T]) - - if signal.isErr: - return err((ref CatchableError)(msg: signal.error())) - else: - res[].signal = signal.get() - ok res + res[].signal = await getThreadSignal() + res proc success*[T](ret: TResult[T], value: T) = ## convenience wrapper for `TResult` to replicate