adding threadsignalptr pool

This commit is contained in:
Jaremy Creechley 2023-09-05 16:20:56 -07:00
parent 97d884c86c
commit 8b72f83995
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
2 changed files with 20 additions and 15 deletions

View File

@ -34,7 +34,7 @@ method has*(
has(ret, self.tds, key)
await wait(ret[].signal)
finally:
ret[].signal.close()
ret[].signal.release()
return ret.convert(bool)
@ -49,7 +49,7 @@ method delete*(
delete(ret, self.tds, key)
await wait(ret[].signal)
finally:
ret[].signal.close()
ret[].signal.release()
return ret.convert(void)
@ -80,7 +80,7 @@ method get*(
get(ret, self.tds, key)
await wait(ret[].signal)
finally:
ret[].signal.close()
ret[].signal.release()
return ret.convert(seq[byte])
@ -96,7 +96,7 @@ method put*(
put(ret, self.tds, key, data)
await wait(ret[].signal)
finally:
ret[].signal.close()
ret[].signal.release()
return ret.convert(void)
@ -155,7 +155,7 @@ method query*(
proc dispose(): Future[?!void] {.async.} =
iter[].it = nil # ensure our sharedptr doesn't try and dealloc
ret[].signal.close()
ret[].signal.release()
return success()
iterWrapper.next = next

View File

@ -73,22 +73,27 @@ proc getThreadSignal*(): Future[ThreadSignalPtr] {.async, raises: [].} =
var cnt = SignalPoolRetries
while cnt > 0:
cnt.dec()
withLock(signalPoolLock):
signalPoolLock.acquire()
try:
if signalPoolFree.len() > 0:
try:
let res = signalPoolFree.pop()
signalPoolUsed.incl(res)
return res
except KeyError:
discard
let res = signalPoolFree.pop()
signalPoolUsed.incl(res)
echo "get:signalPoolUsed:size: ", signalPoolUsed.len()
return res
except KeyError:
discard
finally:
signalPoolLock.release()
await sleepAsync(10.milliseconds)
raise newException(DeadThreadDefect, "reached limit trying to acquire a ThreadSignalPtr")
proc release*(sig: ThreadSignalPtr) =
## Release ThreadSignalPtr back to the pool in a thread-safe way.
withLock(signalPoolLock):
signalPoolUsed.excl(sig)
signalPoolFree.incl(sig)
{.cast(gcsafe).}:
withLock(signalPoolLock):
signalPoolUsed.excl(sig)
signalPoolFree.incl(sig)
echo "free:signalPoolUsed:size: ", signalPoolUsed.len()
proc threadSafeType*[T: ThreadSafeTypes](tp: typedesc[T]) =
## Used to explicitly mark a type as threadsafe. It's checked