diff --git a/datastore/threadresults.nim b/datastore/threadresults.nim index 3afdaad..39b2181 100644 --- a/datastore/threadresults.nim +++ b/datastore/threadresults.nim @@ -1,7 +1,9 @@ import pkg/chronos/threadsync import pkg/threading/smartptrs +import pkg/chronos import std/locks +import std/sets import ./databuffer @@ -35,6 +37,59 @@ type ## be free'ed along with the rest of the async env. This would ## result in likely memory corruption (use-after-free). +const + SignalPoolSize {.intdefine.} = 1024 + SignalPoolRetries {.intdefine.} = 1_000 + +var + signalPoolLock: Lock + signalPoolFree: HashSet[ThreadSignalPtr] + signalPoolUsed: HashSet[ThreadSignalPtr] + +proc initSignalPool() = + signalPoolLock.initLock() + for i in 1..SignalPoolSize: + let signal = ThreadSignalPtr.new().get() + signalPoolFree.incl(signal) + +initSignalPool() + +proc getSignal*(): Future[ThreadSignalPtr] {.async, raises: [].} = + ## Get's a ThreadSignalPtr from the pool in a thread-safe way. + ## + ## This provides a simple backpressue mechanism for the + ## number of requests in flight (not for the file operations themselves). + ## + ## This setup provides two benefits: + ## - backpressure on the number of disk IO requests + ## - prevents leaks in ThreadSignalPtr's from exhausting the + ## processes IO descriptor limit, which results in bad + ## and unpredictable failure modes. + ## + ## This could be put onto its own thread and use it's own set ThreadSignalPtr, + ## but the sleepAsync should prove if this is useful for not. + ## + {.cast(gcsafe).}: + var cnt = SignalPoolRetries + while cnt > 0: + cnt.dec() + withLock(signalPoolLock): + if signalPoolFree.len() > 0: + try: + let res = signalPoolFree.pop() + signalPoolUsed.incl(res) + return res + except KeyError: + discard + await sleepAsync(10.milliseconds) + raise newException(DeadThreadDefect, "reached limit trying to acquire a ThreadSignalPtr") + +proc releaseSignal*(sig: ThreadSignalPtr) = + ## Release ThreadSignalPtr back to the pool in a thread-safe way. + withLock(signalPoolLock): + signalPoolUsed.excl(sig) + signalPoolFree.incl(sig) + proc threadSafeType*[T: ThreadSafeTypes](tp: typedesc[T]) = ## Used to explicitly mark a type as threadsafe. It's checked ## at compile time in `newThreadResult`. @@ -55,7 +110,7 @@ proc newThreadResult*[T]( {.error: "only thread safe types can be used".} let res = newSharedPtr(ThreadResult[T]) - let signal = ThreadSignalPtr.new() + if signal.isErr: return err((ref CatchableError)(msg: signal.error())) else: