From 8b72f83995c4edd0551f6a1422e38a32b52b2150 Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Tue, 5 Sep 2023 16:20:56 -0700 Subject: [PATCH] adding threadsignalptr pool --- datastore/threadproxyds.nim | 10 +++++----- datastore/threadresults.nim | 25 +++++++++++++++---------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/datastore/threadproxyds.nim b/datastore/threadproxyds.nim index 07960d6..adafb1e 100644 --- a/datastore/threadproxyds.nim +++ b/datastore/threadproxyds.nim @@ -34,7 +34,7 @@ method has*( has(ret, self.tds, key) await wait(ret[].signal) finally: - ret[].signal.close() + ret[].signal.release() return ret.convert(bool) @@ -49,7 +49,7 @@ method delete*( delete(ret, self.tds, key) await wait(ret[].signal) finally: - ret[].signal.close() + ret[].signal.release() return ret.convert(void) @@ -80,7 +80,7 @@ method get*( get(ret, self.tds, key) await wait(ret[].signal) finally: - ret[].signal.close() + ret[].signal.release() return ret.convert(seq[byte]) @@ -96,7 +96,7 @@ method put*( put(ret, self.tds, key, data) await wait(ret[].signal) finally: - ret[].signal.close() + ret[].signal.release() return ret.convert(void) @@ -155,7 +155,7 @@ method query*( proc dispose(): Future[?!void] {.async.} = iter[].it = nil # ensure our sharedptr doesn't try and dealloc - ret[].signal.close() + ret[].signal.release() return success() iterWrapper.next = next diff --git a/datastore/threadresults.nim b/datastore/threadresults.nim index 211d7e1..d9b32dc 100644 --- a/datastore/threadresults.nim +++ b/datastore/threadresults.nim @@ -73,22 +73,27 @@ proc getThreadSignal*(): Future[ThreadSignalPtr] {.async, raises: [].} = var cnt = SignalPoolRetries while cnt > 0: cnt.dec() - withLock(signalPoolLock): + signalPoolLock.acquire() + try: if signalPoolFree.len() > 0: - try: - let res = signalPoolFree.pop() - signalPoolUsed.incl(res) - return res - except KeyError: - discard + let res = signalPoolFree.pop() + signalPoolUsed.incl(res) + echo "get:signalPoolUsed:size: ", signalPoolUsed.len() + return res + except KeyError: + discard + finally: + signalPoolLock.release() await sleepAsync(10.milliseconds) raise newException(DeadThreadDefect, "reached limit trying to acquire a ThreadSignalPtr") proc release*(sig: ThreadSignalPtr) = ## Release ThreadSignalPtr back to the pool in a thread-safe way. - withLock(signalPoolLock): - signalPoolUsed.excl(sig) - signalPoolFree.incl(sig) + {.cast(gcsafe).}: + withLock(signalPoolLock): + signalPoolUsed.excl(sig) + signalPoolFree.incl(sig) + echo "free:signalPoolUsed:size: ", signalPoolUsed.len() proc threadSafeType*[T: ThreadSafeTypes](tp: typedesc[T]) = ## Used to explicitly mark a type as threadsafe. It's checked