mirror of
https://github.com/logos-messaging/nim-sds.git
synced 2026-01-02 14:13:07 +00:00
feat: thread pool
This commit is contained in:
parent
e67639ee08
commit
2be27d01f0
@ -5,7 +5,7 @@
|
||||
when defined(linux):
|
||||
{.passl: "-Wl,-soname,libsds.so".}
|
||||
|
||||
import std/[typetraits, tables, atomics], chronos, chronicles
|
||||
import std/[typetraits, tables, atomics, locks], chronos, chronicles
|
||||
import
|
||||
./sds_thread/sds_thread,
|
||||
./alloc,
|
||||
@ -57,6 +57,29 @@ template callEventCallback(ctx: ptr SdsContext, eventName: string, body: untyped
|
||||
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
|
||||
)
|
||||
|
||||
var
|
||||
ctxPool: seq[ptr SdsContext]
|
||||
ctxPoolLock: Lock
|
||||
|
||||
proc acquireCtx(callback: SdsCallBack, userData: pointer): ptr SdsContext =
|
||||
ctxPoolLock.acquire()
|
||||
defer: ctxPoolLock.release()
|
||||
if ctxPool.len > 0:
|
||||
result = ctxPool.pop()
|
||||
else:
|
||||
result = sds_thread.createSdsThread().valueOr:
|
||||
let msg = "Error in createSdsThread: " & $error
|
||||
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||
return nil
|
||||
|
||||
proc releaseCtx(ctx: ptr SdsContext) =
|
||||
ctxPoolLock.acquire()
|
||||
defer: ctxPoolLock.release()
|
||||
ctx.userData = nil
|
||||
ctx.eventCallback = nil
|
||||
ctx.eventUserData = nil
|
||||
ctxPool.add(ctx)
|
||||
|
||||
proc handleRequest(
|
||||
ctx: ptr SdsContext,
|
||||
requestType: RequestType,
|
||||
@ -140,10 +163,9 @@ proc SdsNewReliabilityManager(
|
||||
echo "error: missing callback in NewReliabilityManager"
|
||||
return nil
|
||||
|
||||
## Create the SDS thread that will keep waiting for req from the main thread.
|
||||
var ctx = sds_thread.createSdsThread().valueOr:
|
||||
let msg = "Error in createSdsThread: " & $error
|
||||
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||
## Create or reuse the SDS thread that will keep waiting for req from the main thread.
|
||||
var ctx = acquireCtx(callback, userData)
|
||||
if ctx.isNil():
|
||||
return nil
|
||||
|
||||
ctx.userData = userData
|
||||
@ -183,14 +205,20 @@ proc SdsCleanupReliabilityManager(
|
||||
initializeLibrary()
|
||||
checkLibsdsParams(ctx, callback, userData)
|
||||
|
||||
sds_thread.destroySdsThread(ctx).isOkOr:
|
||||
let msg = "libsds error: " & $error
|
||||
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||
let resetRes = handleRequest(
|
||||
ctx,
|
||||
RequestType.LIFECYCLE,
|
||||
SdsLifecycleRequest.createShared(SdsLifecycleMsgType.RESET_RELIABILITY_MANAGER),
|
||||
callback,
|
||||
userData,
|
||||
)
|
||||
|
||||
if resetRes == RET_ERR:
|
||||
return RET_ERR
|
||||
|
||||
## always need to invoke the callback although we don't retrieve value to the caller
|
||||
callback(RET_OK, nil, 0, userData)
|
||||
releaseCtx(ctx)
|
||||
|
||||
# handleRequest already invoked the callback; nothing else to signal here.
|
||||
return RET_OK
|
||||
|
||||
proc SdsResetReliabilityManager(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user