diff --git a/datastore/memoryds.nim b/datastore/memoryds.nim
index dd3192f..4a85e11 100644
--- a/datastore/memoryds.nim
+++ b/datastore/memoryds.nim
@@ -11,7 +11,7 @@ import pkg/upraises
 import ./key
 import ./query
 import ./datastore
-import ./databuffer
+import ./threads/databuffer
 
 export key, query
 
diff --git a/datastore/threadproxyds.nim b/datastore/threadproxyds.nim
index 4c8434a..8fa2bee 100644
--- a/datastore/threadproxyds.nim
+++ b/datastore/threadproxyds.nim
@@ -33,7 +33,7 @@ method has*(
     has(ret, self.tds, key)
     await wait(ret[].signal)
   finally:
-    ret[].signal.release()
+    ret.release()
 
   return ret.convert(bool)
 
@@ -48,7 +48,7 @@ method delete*(
     delete(ret, self.tds, key)
     await wait(ret[].signal)
   finally:
-    ret[].signal.release()
+    ret.release()
 
   return ret.convert(void)
 
@@ -79,7 +79,7 @@ method get*(
     get(ret, self.tds, key)
     await wait(ret[].signal)
   finally:
-    ret[].signal.release()
+    ret.release()
 
   return ret.convert(seq[byte])
 
@@ -95,7 +95,7 @@ method put*(
     put(ret, self.tds, key, data)
     await wait(ret[].signal)
   finally:
-    ret[].signal.release()
+    ret.release()
 
   return ret.convert(void)
 
@@ -154,7 +154,7 @@ method query*(
 
   proc dispose(): Future[?!void] {.async.} =
     iter[].it = nil # ensure our sharedptr doesn't try and dealloc
-    ret[].signal.release()
+    ret.release()
     return success()
 
   iterWrapper.next = next
diff --git a/datastore/threads/threadresults.nim b/datastore/threads/threadresults.nim
index 2afddf1..d8ebd7d 100644
--- a/datastore/threads/threadresults.nim
+++ b/datastore/threads/threadresults.nim
@@ -6,6 +6,7 @@ import std/locks
 import std/sets
 
 import ./databuffer
+import ./threadsignalpool
 
 export databuffer
 export smartptrs
@@ -43,65 +44,6 @@ type
     ## result in likely memory corruption (use-after-free).
     ##
 
-const
-  SignalPoolSize {.intdefine.} = 10
-  SignalPoolRetries {.intdefine.} = 10
-
-var
-  signalPoolLock: Lock
-  signalPoolFree: HashSet[ThreadSignalPtr]
-  signalPoolUsed: HashSet[ThreadSignalPtr]
-
-proc initSignalPool() =
-  signalPoolLock.initLock()
-  for i in 1..SignalPoolSize:
-    let signal = ThreadSignalPtr.new().get()
-    signalPoolFree.incl(signal)
-
-initSignalPool()
-
-proc getThreadSignal*(): Future[ThreadSignalPtr] {.async, raises: [].} =
-  ## Get's a ThreadSignalPtr from the pool in a thread-safe way.
-  ##
-  ## This provides a simple backpressue mechanism for the
-  ## number of requests in flight (not for the file operations themselves).
-  ##
-  ## This setup provides two benefits:
-  ## - backpressure on the number of disk IO requests
-  ## - prevents leaks in ThreadSignalPtr's from exhausting the
-  ##   processes IO descriptor limit, which results in bad
-  ##   and unpredictable failure modes.
-  ##
-  ## This could be put onto its own thread and use it's own set ThreadSignalPtr,
-  ## but the sleepAsync should prove if this is useful for not.
-  ##
-  {.cast(gcsafe).}:
-    var cnt = SignalPoolRetries
-    while cnt > 0:
-      cnt.dec()
-      signalPoolLock.acquire()
-      try:
-        if signalPoolFree.len() > 0:
-          let res = signalPoolFree.pop()
-          signalPoolUsed.incl(res)
-          # echo "get:signalPoolUsed:size: ", signalPoolUsed.len()
-          return res
-      except KeyError:
-        discard
-      finally:
-        signalPoolLock.release()
-      # echo "wait:signalPoolUsed: "
-      await sleepAsync(10.milliseconds)
-    raise newException(DeadThreadDefect, "reached limit trying to acquire a ThreadSignalPtr")
-
-proc release*(sig: ThreadSignalPtr) {.raises: [].} =
-  ## Release ThreadSignalPtr back to the pool in a thread-safe way.
-  {.cast(gcsafe).}:
-    withLock(signalPoolLock):
-      signalPoolUsed.excl(sig)
-      signalPoolFree.incl(sig)
-      # echo "free:signalPoolUsed:size: ", signalPoolUsed.len()
-
 proc threadSafeType*[T: ThreadSafeTypes](tp: typedesc[T]) =
   ## Used to explicitly mark a type as threadsafe. It's checked
   ## at compile time in `newThreadResult`.
@@ -125,6 +67,9 @@ proc newThreadResult*[T](
   res[].signal = await getThreadSignal()
   res
 
+proc release*[T](res: TResult[T]) {.raises: [].} =
+  res[].signal.release()
+
 proc success*[T](ret: TResult[T], value: T) =
   ## convenience wrapper for `TResult` to replicate
   ## normal questionable api
diff --git a/datastore/threads/threadsignalpool.nim b/datastore/threads/threadsignalpool.nim
new file mode 100644
index 0000000..e050ada
--- /dev/null
+++ b/datastore/threads/threadsignalpool.nim
@@ -0,0 +1,70 @@
+import pkg/chronos/threadsync
+import pkg/threading/smartptrs
+import pkg/chronos
+import std/locks
+import std/sets
+
+import ./databuffer
+
+export databuffer
+export smartptrs
+export threadsync
+
+const
+  SignalPoolSize {.intdefine.} = 1024
+  SignalPoolRetries {.intdefine.} = 100
+
+var
+  signalPoolLock: Lock
+  signalPoolFree: HashSet[ThreadSignalPtr]
+  signalPoolUsed: HashSet[ThreadSignalPtr]
+
+proc initSignalPool() =
+  signalPoolLock.initLock()
+  for i in 1..SignalPoolSize:
+    let signal = ThreadSignalPtr.new().get()
+    signalPoolFree.incl(signal)
+
+initSignalPool()
+
+proc getThreadSignal*(): Future[ThreadSignalPtr] {.async, raises: [].} =
+  ## Gets a ThreadSignalPtr from the pool in a thread-safe way.
+  ##
+  ## This provides a simple backpressure mechanism for the
+  ## number of requests in flight (not for the file operations themselves).
+  ##
+  ## This setup provides two benefits:
+  ## - backpressure on the number of disk IO requests
+  ## - prevents leaked ThreadSignalPtrs from exhausting the
+  ##   process's IO descriptor limit, which results in bad
+  ##   and unpredictable failure modes.
+  ##
+  ## This could be put onto its own thread with its own set of ThreadSignalPtrs,
+  ## but the sleepAsync should prove whether this is useful or not.
+  ##
+  {.cast(gcsafe).}:
+    var cnt = SignalPoolRetries
+    while cnt > 0:
+      cnt.dec()
+      signalPoolLock.acquire()
+      try:
+        if signalPoolFree.len() > 0:
+          let res = signalPoolFree.pop()
+          signalPoolUsed.incl(res)
+          # echo "get:signalPoolUsed:size: ", signalPoolUsed.len()
+          return res
+      except KeyError:
+        discard
+      finally:
+        signalPoolLock.release()
+      # echo "wait:signalPoolUsed: "
+      await sleepAsync(10.milliseconds)
+    raise newException(DeadThreadDefect, "reached limit trying to acquire a ThreadSignalPtr")
+
+proc release*(sig: ThreadSignalPtr) {.raises: [].} =
+  ## Release a ThreadSignalPtr back to the pool in a thread-safe way.
+  {.cast(gcsafe).}:
+    withLock(signalPoolLock):
+      signalPoolUsed.excl(sig)
+      signalPoolFree.incl(sig)
+      # echo "free:signalPoolUsed:size: ", signalPoolUsed.len()
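
For reviewers, a minimal usage sketch (not part of the diff) of the acquire/wait/release pattern the pool enforces. Only `getThreadSignal`, `wait`, and `release` come from the code above; the import path, the `doRequest` proc, and the worker hand-off described in the comments are illustrative assumptions.

import pkg/chronos
import datastore/threads/threadsignalpool   # module added by this diff; path assumes the package layout

proc doRequest() {.async.} =
  # Take a signal from the shared pool; this may sleep and retry while the
  # pool is empty, and raises once SignalPoolRetries is exhausted.
  let signal = await getThreadSignal()
  try:
    # A worker thread would receive `signal`, do its work, and fire it;
    # the caller would then `await wait(signal)`, exactly as threadproxyds
    # does with `ret[].signal`.
    discard
  finally:
    # Always hand the signal back, otherwise the pool (and with it the
    # process's descriptor budget) drains over time.
    signal.release()

when isMainModule:
  waitFor doRequest()

The new `release*[T](res: TResult[T])` helper in threadresults.nim simply forwards to `res[].signal.release()`, which is what lets the call sites in threadproxyds.nim shrink from `ret[].signal.release()` to `ret.release()`.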