This commit is contained in:
Jaremy Creechley 2023-09-05 17:14:03 -07:00
parent 2f3449e92c
commit 82e003b048
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
4 changed files with 80 additions and 65 deletions

View File

@ -11,7 +11,7 @@ import pkg/upraises
import ./key
import ./query
import ./datastore
import ./databuffer
import ./threads/databuffer
export key, query

View File

@ -33,7 +33,7 @@ method has*(
has(ret, self.tds, key)
await wait(ret[].signal)
finally:
ret[].signal.release()
ret.release()
return ret.convert(bool)
@ -48,7 +48,7 @@ method delete*(
delete(ret, self.tds, key)
await wait(ret[].signal)
finally:
ret[].signal.release()
ret.release()
return ret.convert(void)
@ -79,7 +79,7 @@ method get*(
get(ret, self.tds, key)
await wait(ret[].signal)
finally:
ret[].signal.release()
ret.release()
return ret.convert(seq[byte])
@ -95,7 +95,7 @@ method put*(
put(ret, self.tds, key, data)
await wait(ret[].signal)
finally:
ret[].signal.release()
ret.release()
return ret.convert(void)
@ -154,7 +154,7 @@ method query*(
proc dispose(): Future[?!void] {.async.} =
iter[].it = nil # ensure our sharedptr doesn't try and dealloc
ret[].signal.release()
ret.release()
return success()
iterWrapper.next = next

View File

@ -6,6 +6,7 @@ import std/locks
import std/sets
import ./databuffer
import ./threadsignalpool
export databuffer
export smartptrs
@ -43,65 +44,6 @@ type
## result in likely memory corruption (use-after-free).
##
const
SignalPoolSize {.intdefine.} = 10
SignalPoolRetries {.intdefine.} = 10
var
signalPoolLock: Lock
signalPoolFree: HashSet[ThreadSignalPtr]
signalPoolUsed: HashSet[ThreadSignalPtr]
proc initSignalPool() =
signalPoolLock.initLock()
for i in 1..SignalPoolSize:
let signal = ThreadSignalPtr.new().get()
signalPoolFree.incl(signal)
initSignalPool()
proc getThreadSignal*(): Future[ThreadSignalPtr] {.async, raises: [].} =
## Get's a ThreadSignalPtr from the pool in a thread-safe way.
##
## This provides a simple backpressue mechanism for the
## number of requests in flight (not for the file operations themselves).
##
## This setup provides two benefits:
## - backpressure on the number of disk IO requests
## - prevents leaks in ThreadSignalPtr's from exhausting the
## processes IO descriptor limit, which results in bad
## and unpredictable failure modes.
##
## This could be put onto its own thread and use it's own set ThreadSignalPtr,
## but the sleepAsync should prove if this is useful for not.
##
{.cast(gcsafe).}:
var cnt = SignalPoolRetries
while cnt > 0:
cnt.dec()
signalPoolLock.acquire()
try:
if signalPoolFree.len() > 0:
let res = signalPoolFree.pop()
signalPoolUsed.incl(res)
# echo "get:signalPoolUsed:size: ", signalPoolUsed.len()
return res
except KeyError:
discard
finally:
signalPoolLock.release()
# echo "wait:signalPoolUsed: "
await sleepAsync(10.milliseconds)
raise newException(DeadThreadDefect, "reached limit trying to acquire a ThreadSignalPtr")
proc release*(sig: ThreadSignalPtr) {.raises: [].} =
## Release ThreadSignalPtr back to the pool in a thread-safe way.
{.cast(gcsafe).}:
withLock(signalPoolLock):
signalPoolUsed.excl(sig)
signalPoolFree.incl(sig)
# echo "free:signalPoolUsed:size: ", signalPoolUsed.len()
proc threadSafeType*[T: ThreadSafeTypes](tp: typedesc[T]) =
## Used to explicitly mark a type as threadsafe. It's checked
## at compile time in `newThreadResult`.
@ -125,6 +67,9 @@ proc newThreadResult*[T](
res[].signal = await getThreadSignal()
res
proc release*[T](res: TResult[T]) {.raises: [].} =
res[].signal.release()
proc success*[T](ret: TResult[T], value: T) =
## convenience wrapper for `TResult` to replicate
## normal questionable api

View File

@ -0,0 +1,70 @@
import pkg/chronos/threadsync
import pkg/threading/smartptrs
import pkg/chronos
import std/locks
import std/sets
import ./databuffer
export databuffer
export smartptrs
export threadsync
const
SignalPoolSize {.intdefine.} = 1024
SignalPoolRetries {.intdefine.} = 100
var
signalPoolLock: Lock
signalPoolFree: HashSet[ThreadSignalPtr]
signalPoolUsed: HashSet[ThreadSignalPtr]
proc initSignalPool() =
signalPoolLock.initLock()
for i in 1..SignalPoolSize:
let signal = ThreadSignalPtr.new().get()
signalPoolFree.incl(signal)
initSignalPool()
proc getThreadSignal*(): Future[ThreadSignalPtr] {.async, raises: [].} =
## Get's a ThreadSignalPtr from the pool in a thread-safe way.
##
## This provides a simple backpressue mechanism for the
## number of requests in flight (not for the file operations themselves).
##
## This setup provides two benefits:
## - backpressure on the number of disk IO requests
## - prevents leaks in ThreadSignalPtr's from exhausting the
## processes IO descriptor limit, which results in bad
## and unpredictable failure modes.
##
## This could be put onto its own thread and use it's own set ThreadSignalPtr,
## but the sleepAsync should prove if this is useful for not.
##
{.cast(gcsafe).}:
var cnt = SignalPoolRetries
while cnt > 0:
cnt.dec()
signalPoolLock.acquire()
try:
if signalPoolFree.len() > 0:
let res = signalPoolFree.pop()
signalPoolUsed.incl(res)
# echo "get:signalPoolUsed:size: ", signalPoolUsed.len()
return res
except KeyError:
discard
finally:
signalPoolLock.release()
# echo "wait:signalPoolUsed: "
await sleepAsync(10.milliseconds)
raise newException(DeadThreadDefect, "reached limit trying to acquire a ThreadSignalPtr")
proc release*(sig: ThreadSignalPtr) {.raises: [].} =
## Release ThreadSignalPtr back to the pool in a thread-safe way.
{.cast(gcsafe).}:
withLock(signalPoolLock):
signalPoolUsed.excl(sig)
signalPoolFree.incl(sig)
# echo "free:signalPoolUsed:size: ", signalPoolUsed.len()