adding threadsignalptr pool

This commit is contained in:
Jaremy Creechley 2023-09-05 16:10:32 -07:00
parent 13eaf47c16
commit 20c01996b7
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300

View File

@ -1,7 +1,9 @@
import pkg/chronos/threadsync
import pkg/threading/smartptrs
import pkg/chronos
import std/locks
import std/sets
import ./databuffer
@ -35,6 +37,59 @@ type
## be free'ed along with the rest of the async env. This would
## result in likely memory corruption (use-after-free).
const
SignalPoolSize {.intdefine.} = 1024
SignalPoolRetries {.intdefine.} = 1_000
var
signalPoolLock: Lock
signalPoolFree: HashSet[ThreadSignalPtr]
signalPoolUsed: HashSet[ThreadSignalPtr]
proc initSignalPool() =
signalPoolLock.initLock()
for i in 1..SignalPoolSize:
let signal = ThreadSignalPtr.new().get()
signalPoolFree.incl(signal)
initSignalPool()
proc getSignal*(): Future[ThreadSignalPtr] {.async, raises: [].} =
## Get's a ThreadSignalPtr from the pool in a thread-safe way.
##
## This provides a simple backpressue mechanism for the
## number of requests in flight (not for the file operations themselves).
##
## This setup provides two benefits:
## - backpressure on the number of disk IO requests
## - prevents leaks in ThreadSignalPtr's from exhausting the
## processes IO descriptor limit, which results in bad
## and unpredictable failure modes.
##
## This could be put onto its own thread and use it's own set ThreadSignalPtr,
## but the sleepAsync should prove if this is useful for not.
##
{.cast(gcsafe).}:
var cnt = SignalPoolRetries
while cnt > 0:
cnt.dec()
withLock(signalPoolLock):
if signalPoolFree.len() > 0:
try:
let res = signalPoolFree.pop()
signalPoolUsed.incl(res)
return res
except KeyError:
discard
await sleepAsync(10.milliseconds)
raise newException(DeadThreadDefect, "reached limit trying to acquire a ThreadSignalPtr")
proc releaseSignal*(sig: ThreadSignalPtr) =
## Release ThreadSignalPtr back to the pool in a thread-safe way.
withLock(signalPoolLock):
signalPoolUsed.excl(sig)
signalPoolFree.incl(sig)
proc threadSafeType*[T: ThreadSafeTypes](tp: typedesc[T]) =
## Used to explicitly mark a type as threadsafe. It's checked
## at compile time in `newThreadResult`.
@ -55,7 +110,7 @@ proc newThreadResult*[T](
{.error: "only thread safe types can be used".}
let res = newSharedPtr(ThreadResult[T])
let signal = ThreadSignalPtr.new()
if signal.isErr:
return err((ref CatchableError)(msg: signal.error()))
else: