Merge 9267f067e7441869d4d698ef14193eed2f0b7a5f into 47757bacea4bfeab362b4335a7cdc35c46c1e744

This commit is contained in:
Ivan FB 2026-02-09 12:07:05 +01:00 committed by GitHub
commit 3ace1b32b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 86 additions and 45 deletions

View File

@ -5,7 +5,7 @@
when defined(linux):
{.passl: "-Wl,-soname,libsds.so".}
import std/[typetraits, tables, atomics, locks], chronos, chronicles
import std/[typetraits, tables, atomics], chronos, chronicles
import
./sds_thread/sds_thread,
./alloc,
@ -57,29 +57,6 @@ template callEventCallback(ctx: ptr SdsContext, eventName: string, body: untyped
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
)
var
ctxPool: seq[ptr SdsContext]
ctxPoolLock: Lock
proc acquireCtx(callback: SdsCallBack, userData: pointer): ptr SdsContext =
ctxPoolLock.acquire()
defer: ctxPoolLock.release()
if ctxPool.len > 0:
result = ctxPool.pop()
else:
result = sds_thread.createSdsThread().valueOr:
let msg = "Error in createSdsThread: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil
proc releaseCtx(ctx: ptr SdsContext) =
ctxPoolLock.acquire()
defer: ctxPoolLock.release()
ctx.userData = nil
ctx.eventCallback = nil
ctx.eventUserData = nil
ctxPool.add(ctx)
proc handleRequest(
ctx: ptr SdsContext,
requestType: RequestType,
@ -159,7 +136,6 @@ proc initializeLibrary() {.exported.} =
## Every Nim library needs to call `<yourprefix>NimMain` once exactly, to initialize the Nim runtime.
## Being `<yourprefix>` the value given in the optional compilation flag --nimMainPrefix:yourprefix
libsdsNimMain()
ctxPoolLock.initLock() # ensure the lock is initialized once (fix Windows crash)
when declared(setupForeignThreadGc):
setupForeignThreadGc()
when declared(nimGC_setStackBottom):
@ -183,9 +159,10 @@ proc SdsNewReliabilityManager(
echo "error: missing callback in NewReliabilityManager"
return nil
## Create or reuse the SDS thread that will keep waiting for req from the main thread.
var ctx = acquireCtx(callback, userData)
if ctx.isNil():
## Create the SDS thread that will keep waiting for req from the main thread.
var ctx = sds_thread.createSdsThread().valueOr:
let msg = "Error in createSdsThread: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil
ctx.userData = userData
@ -233,20 +210,14 @@ proc SdsCleanupReliabilityManager(
initializeLibrary()
checkLibsdsParams(ctx, callback, userData)
let resetRes = handleRequest(
ctx,
RequestType.LIFECYCLE,
SdsLifecycleRequest.createShared(SdsLifecycleMsgType.RESET_RELIABILITY_MANAGER),
callback,
userData,
)
if resetRes == RET_ERR:
sds_thread.destroySdsThread(ctx).isOkOr:
let msg = "libsds error: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
releaseCtx(ctx)
## always need to invoke the callback although we don't retrieve value to the caller
callback(RET_OK, nil, 0, userData)
# handleRequest already invoked the callback; nothing else to signal here.
return RET_OK
proc SdsResetReliabilityManager(
@ -379,4 +350,4 @@ proc SdsStartPeriodicTasks(
)
### End of exported procs
################################################################################
################################################################################

View File

@ -7,7 +7,9 @@ import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single,
import
../ffi_types,
./inter_thread_communication/sds_thread_request,
../../src/[reliability_utils]
../alloc,
../../src/[reliability_utils],
./shutdown
type SdsContext* = object
thread: Thread[(ptr SdsContext)]
@ -23,6 +25,7 @@ type SdsContext* = object
retrievalHintProvider*: pointer
retrievalHintUserData*: pointer
running: Atomic[bool] # To control when the thread is running
threadErrorMsg: cstring # to store any error message from the thread
proc runSds(ctx: ptr SdsContext) {.async.} =
## This is the worker body. This runs the SDS instance
@ -54,6 +57,18 @@ proc run(ctx: ptr SdsContext) {.thread.} =
## Launch sds worker
waitFor runSds(ctx)
ctx.reqSignal.close().isOkOr:
ctx.threadErrorMsg = alloc("error closing reqSignal: " & $error)
return
ctx.reqReceivedSignal.close().isOkOr:
ctx.threadErrorMsg = alloc("error closing reqReceivedSignal: " & $error)
return
shutdown().isOkOr:
ctx.threadErrorMsg = alloc("error calling shutdown: " & $error)
return
proc createSdsThread*(): Result[ptr SdsContext, string] =
## This proc is called from the main thread and it creates
## the SDS working thread.
@ -85,9 +100,13 @@ proc destroySdsThread*(ctx: ptr SdsContext): Result[void, string] =
return err("failed to signal reqSignal on time in destroySdsThread")
joinThread(ctx.thread)
if ctx.threadErrorMsg.isNil() == false and ctx.threadErrorMsg.len > 0:
let errorMsg = $ctx.threadErrorMsg
dealloc(ctx.threadErrorMsg)
return err("SDS thread error: " & errorMsg)
ctx.lock.deinitLock()
?ctx.reqSignal.close()
?ctx.reqReceivedSignal.close()
freeShared(ctx)
return ok()
@ -131,4 +150,4 @@ proc sendRequestToSdsThread*(
## Notice that in case of "ok", the deallocShared(req) is performed by the SDS Thread in the
## process proc.
ok()
ok()

View File

@ -0,0 +1,51 @@
import chronos, chronos/selectors2
## Notice that this module extends current nim-chronos functionality to provide
## proper shutdown of the thread's dispatcher.
##
## This is necessary because nim-chronos does not provide a way to close
## the selector associated with a thread's dispatcher, which may lead to
## resource leaks.
##
## Therefore, this ideally should be contributed back to nim-chronos.
when defined(windows):
proc safeCloseHandle(h: HANDLE): Result[void, string] =
let res = closeHandle(h)
if res == 0: # WINBOOL FALSE
return err("Failed to close handle error code: " & osErrorMsg(osLastError()))
return ok()
proc closeDispatcher*(loop: PDispatcher): Result[void, string] =
? safeCloseHandle(loop.ioPort)
for i in loop.handles.items:
closeHandle(i)
loop.handles.clear()
return ok()
elif defined(macosx) or defined(freebsd) or defined(netbsd) or
defined(openbsd) or defined(dragonfly) or defined(macos) or
defined(linux) or defined(android) or defined(solaris):
proc closeDispatcher*(loop: PDispatcher): Result[void, string] =
## Close selector associated with current thread's dispatcher.
try:
loop.getIoHandler().close()
except IOSelectorsException as e:
return err("Exception in closeDispatcher: " & e.msg)
return ok()
proc shutdown*(): Result[void, string] {.raises: [].} =
## Performs final cleanup of all dispatcher resources.
## Notice that this should be called only when sure that no new async tasks will be scheduled.
##
## This routine shall be called only after `pollFor` has completed. Upon
## invocation, all streams are assumed to have been closed.
##
## Then, it assumes the thread's dispatcher has explicitly been stopped, destroyed and will never
## be used again.
let disp = getThreadDispatcher()
? closeDispatcher(disp)
return ok()

2
vendor/nim-chronos vendored

@ -1 +1 @@
Subproject commit b55e2816eb45f698ddaca8d8473e401502562db2
Subproject commit 60d64317e66f245958a819a6ceb1b20db3d239a9