# nim-datastore/datastore/threads/threadresults.nim
import pkg/chronos/threadsync
import pkg/threading/smartptrs
# 2023-09-05 16:10:32 -07:00
import pkg/chronos
# 2023-09-05 15:34:40 -07:00
import std/locks
# 2023-09-05 16:10:32 -07:00
import std/sets
# 2023-09-05 13:39:13 -07:00
import ./databuffer
export databuffer
export smartptrs
export threadsync
type
  ThreadSafeTypes* = DataBuffer | void | bool | SharedPtr ##\
    ## Whitelist of types that may be carried in a `ThreadResult`.
    ## Every member must be thread safe under `refc`, which means no
    ## GC'ed types are allowed.

  ThreadResult*[T] = object
    ## Couples the result produced on a worker thread with the
    ## cross-thread signalling mechanism, so the two always travel
    ## together.
    signal*: ThreadSignalPtr
    results*: Result[T, CatchableErrorBuffer]

  TResult*[T] = SharedPtr[ThreadResult[T]] ##\
    ## SharedPtr that allocates a shared buffer and keeps the memory
    ## alive until every reference to it is gone.
    ##
    ## Important:
    ## On `refc` the internal destructors for `ThreadResult[T]` are
    ## *not* called, effectively limiting this to one level of
    ## destructors — hence the `threadSafeType` marker below.
    ##
    ## Because `ThreadResult` is a plain object its lifetime could
    ## otherwise be tied to an async proc, in which case it could be
    ## freed before the background thread is finished with it.
    ##
    ## For example, `myFuture.cancel()` can end an async proc early.
    ## If the `ThreadResult` were stored in the async's memory it would
    ## be freed along with the rest of the async env, likely resulting
    ## in memory corruption (use-after-free).
const
  SignalPoolSize {.intdefine.} = 10    ## number of pooled ThreadSignalPtr's
  SignalPoolRetries {.intdefine.} = 10 ## acquisition attempts before giving up

var
  signalPoolLock: Lock                      # guards both sets below
  signalPoolFree: HashSet[ThreadSignalPtr]  # signals available for checkout
  signalPoolUsed: HashSet[ThreadSignalPtr]  # signals currently checked out

proc initSignalPool() =
  ## Pre-allocates the fixed pool of ThreadSignalPtr's guarded by
  ## `signalPoolLock`. Runs once at module load; `get()` raises if a
  ## signal cannot be created (e.g. fd exhaustion).
  signalPoolLock.initLock()
  for _ in 1 .. SignalPoolSize:
    signalPoolFree.incl(ThreadSignalPtr.new().get())

initSignalPool()
proc getThreadSignal*(): Future[ThreadSignalPtr] {.async, raises: [].} =
  ## Gets a ThreadSignalPtr from the pool in a thread-safe way.
  ##
  ## This provides a simple backpressure mechanism for the number of
  ## requests in flight (not for the file operations themselves).
  ##
  ## This setup provides two benefits:
  ## - backpressure on the number of disk IO requests
  ## - prevents leaks of ThreadSignalPtr's from exhausting the
  ##   process's IO descriptor limit, which results in bad and
  ##   unpredictable failure modes.
  ##
  ## This could be put onto its own thread with its own set of
  ## ThreadSignalPtr's, but the sleepAsync should prove whether this is
  ## useful or not.
  {.cast(gcsafe).}:
    for _ in 1 .. SignalPoolRetries:
      signalPoolLock.acquire()
      try:
        if signalPoolFree.len() > 0:
          let sig = signalPoolFree.pop()
          signalPoolUsed.incl(sig)
          # echo "get:signalPoolUsed:size: ", signalPoolUsed.len()
          return sig
      except KeyError:
        # pop() of an empty set; cannot happen after the len() check,
        # but keeps the `raises: []` contract explicit.
        discard
      finally:
        signalPoolLock.release()
      # pool exhausted: back off briefly before retrying
      # echo "wait:signalPoolUsed: "
      await sleepAsync(10.milliseconds)
    raise newException(DeadThreadDefect, "reached limit trying to acquire a ThreadSignalPtr")
proc release*(sig: ThreadSignalPtr) {.raises: [].} =
  ## Returns `sig` to the pool in a thread-safe way so that a later
  ## `getThreadSignal` call can reuse it.
  {.cast(gcsafe).}:
    signalPoolLock.acquire()
    try:
      signalPoolUsed.excl(sig)
      signalPoolFree.incl(sig)
      # echo "free:signalPoolUsed:size: ", signalPoolUsed.len()
    finally:
      signalPoolLock.release()
proc threadSafeType*[T: ThreadSafeTypes](tp: typedesc[T]) =
  ## Compile-time marker declaring a type safe to cross threads; the
  ## marker is checked at compile time in `newThreadResult`.
  ##
  ## Warning! Only non-GC types should be used!
  discard
proc newThreadResult*[T](
    tp: typedesc[T]
): Future[TResult[T]] {.async.} =
  ## Creates a new TResult, acquiring its ThreadSignalPtr from the
  ## shared pool (which may suspend until a signal becomes free, and
  ## raises DeadThreadDefect if the pool stays exhausted).
  ##
  ## `T` must be whitelisted via a `threadSafeType` overload; any other
  ## type is rejected at compile time.
  mixin threadSafeType
  # Fix: the previous check `compiles(threadSafeType)` only tested that
  # the bare *symbol* resolves, which is always true here, so the
  # whitelist was never enforced. Instantiating the call with `T`
  # makes the check actually fire for non-whitelisted types.
  when not compiles(threadSafeType(T)):
    {.error: "only thread safe types can be used".}
  let res = newSharedPtr(ThreadResult[T])
  res[].signal = await getThreadSignal()
  return res
proc success*[T](ret: TResult[T], value: T) =
  ## Convenience wrapper for `TResult` replicating the normal
  ## questionable api: stores `value` as the successful result.
  ret[].results.ok(value)
proc success*[T: void](ret: TResult[T]) =
  ## Convenience wrapper for `TResult` replicating the normal
  ## questionable api for `void` results: marks the result successful.
  ret[].results.ok()
proc failure*[T](ret: TResult[T], exc: ref Exception) =
  ## Convenience wrapper for `TResult` replicating the normal
  ## questionable api: records `exc` (serialized into a buffer so it can
  ## cross threads) as the error result.
  ret[].results.err(exc.toBuffer())
proc convert*[T, S](ret: TResult[T],
                    tp: typedesc[S]
): Result[S, ref CatchableError] =
  ## Convenience wrapper making it easier to fetch results out of a
  ## `TResult`: unwraps the buffered payload/error into an ordinary
  ## `Result[S, ref CatchableError]`, converting the payload to `S`.
  if ret[].results.isErr():
    # rebuild a catchable exception from the buffered error
    let err: ref CatchableError = ret[].results.error().toCatchable()
    result.err(err)
  else:
    # branches are mutually exclusive compile-time type tests
    when S is void:
      result.ok()
    elif S is string:
      result.ok(ret[].results.get().toString())
    elif S is seq[byte]:
      result.ok(ret[].results.get().toSeq(byte))
    else:
      result.ok(ret[].results.get())