diff --git a/library/libsds.nim b/library/libsds.nim index ec0527f..467c056 100644 --- a/library/libsds.nim +++ b/library/libsds.nim @@ -5,7 +5,7 @@ when defined(linux): {.passl: "-Wl,-soname,libsds.so".} -import std/[typetraits, tables, atomics, locks], chronos, chronicles +import std/[typetraits, tables, atomics], chronos, chronicles import ./sds_thread/sds_thread, ./alloc, @@ -57,29 +57,6 @@ template callEventCallback(ctx: ptr SdsContext, eventName: string, body: untyped RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData ) -var - ctxPool: seq[ptr SdsContext] - ctxPoolLock: Lock - -proc acquireCtx(callback: SdsCallBack, userData: pointer): ptr SdsContext = - ctxPoolLock.acquire() - defer: ctxPoolLock.release() - if ctxPool.len > 0: - result = ctxPool.pop() - else: - result = sds_thread.createSdsThread().valueOr: - let msg = "Error in createSdsThread: " & $error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) - return nil - -proc releaseCtx(ctx: ptr SdsContext) = - ctxPoolLock.acquire() - defer: ctxPoolLock.release() - ctx.userData = nil - ctx.eventCallback = nil - ctx.eventUserData = nil - ctxPool.add(ctx) - proc handleRequest( ctx: ptr SdsContext, requestType: RequestType, @@ -159,7 +136,6 @@ proc initializeLibrary() {.exported.} = ## Every Nim library needs to call `NimMain` once exactly, to initialize the Nim runtime. ## Being `` the value given in the optional compilation flag --nimMainPrefix:yourprefix libsdsNimMain() - ctxPoolLock.initLock() # ensure the lock is initialized once (fix Windows crash) when declared(setupForeignThreadGc): setupForeignThreadGc() when declared(nimGC_setStackBottom): @@ -183,9 +159,10 @@ proc SdsNewReliabilityManager( echo "error: missing callback in NewReliabilityManager" return nil - ## Create or reuse the SDS thread that will keep waiting for req from the main thread. - var ctx = acquireCtx(callback, userData) - if ctx.isNil(): + ## Create the SDS thread that will keep waiting for req from the main thread. + var ctx = sds_thread.createSdsThread().valueOr: + let msg = "Error in createSdsThread: " & $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return nil ctx.userData = userData @@ -233,20 +210,14 @@ proc SdsCleanupReliabilityManager( initializeLibrary() checkLibsdsParams(ctx, callback, userData) - let resetRes = handleRequest( - ctx, - RequestType.LIFECYCLE, - SdsLifecycleRequest.createShared(SdsLifecycleMsgType.RESET_RELIABILITY_MANAGER), - callback, - userData, - ) - - if resetRes == RET_ERR: + sds_thread.destroySdsThread(ctx).isOkOr: + let msg = "libsds error: " & $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR - releaseCtx(ctx) + ## always need to invoke the callback although we don't retrieve value to the caller + callback(RET_OK, nil, 0, userData) - # handleRequest already invoked the callback; nothing else to signal here. return RET_OK proc SdsResetReliabilityManager( @@ -379,4 +350,4 @@ proc SdsStartPeriodicTasks( ) ### End of exported procs -################################################################################ +################################################################################ \ No newline at end of file diff --git a/library/sds_thread/sds_thread.nim b/library/sds_thread/sds_thread.nim index f54b327..946ce49 100644 --- a/library/sds_thread/sds_thread.nim +++ b/library/sds_thread/sds_thread.nim @@ -7,7 +7,9 @@ import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, import ../ffi_types, ./inter_thread_communication/sds_thread_request, - ../../src/[reliability_utils] + ../alloc, + ../../src/[reliability_utils], + ./shutdown type SdsContext* = object thread: Thread[(ptr SdsContext)] @@ -23,6 +25,7 @@ type SdsContext* = object retrievalHintProvider*: pointer retrievalHintUserData*: pointer running: Atomic[bool] # To control when the thread is running + threadErrorMsg: cstring # to store any error message from the thread proc runSds(ctx: ptr SdsContext) {.async.} = ## This is the worker body. This runs the SDS instance @@ -54,6 +57,18 @@ proc run(ctx: ptr SdsContext) {.thread.} = ## Launch sds worker waitFor runSds(ctx) + ctx.reqSignal.close().isOkOr: + ctx.threadErrorMsg = alloc("error closing reqSignal: " & $error) + return + + ctx.reqReceivedSignal.close().isOkOr: + ctx.threadErrorMsg = alloc("error closing reqReceivedSignal: " & $error) + return + + shutdown().isOkOr: + ctx.threadErrorMsg = alloc("error calling shutdown: " & $error) + return + proc createSdsThread*(): Result[ptr SdsContext, string] = ## This proc is called from the main thread and it creates ## the SDS working thread. @@ -85,9 +100,13 @@ proc destroySdsThread*(ctx: ptr SdsContext): Result[void, string] = return err("failed to signal reqSignal on time in destroySdsThread") joinThread(ctx.thread) + + if ctx.threadErrorMsg.isNil() == false and ctx.threadErrorMsg.len > 0: + let errorMsg = $ctx.threadErrorMsg + dealloc(ctx.threadErrorMsg) + return err("SDS thread error: " & errorMsg) + ctx.lock.deinitLock() - ?ctx.reqSignal.close() - ?ctx.reqReceivedSignal.close() freeShared(ctx) return ok() @@ -131,4 +150,4 @@ proc sendRequestToSdsThread*( ## Notice that in case of "ok", the deallocShared(req) is performed by the SDS Thread in the ## process proc. - ok() + ok() \ No newline at end of file diff --git a/library/sds_thread/shutdown.nim b/library/sds_thread/shutdown.nim new file mode 100644 index 0000000..468bf53 --- /dev/null +++ b/library/sds_thread/shutdown.nim @@ -0,0 +1,51 @@ + +import chronos, chronos/selectors2 + +## Notice that this module extends current nim-chronos functionality to provide +## proper shutdown of the thread's dispatcher. +## +## This is necessary because nim-chronos does not provide a way to close +## the selector associated with a thread's dispatcher, which may lead to +## resource leaks. +## +## Therefore, this ideally should be contributed back to nim-chronos. + +when defined(windows): + proc safeCloseHandle(h: HANDLE): Result[void, string] = + let res = closeHandle(h) + if res == 0: # WINBOOL FALSE + return err("Failed to close handle error code: " & osErrorMsg(osLastError())) + return ok() + + proc closeDispatcher*(loop: PDispatcher): Result[void, string] = + ? safeCloseHandle(loop.ioPort) + for i in loop.handles.items: + closeHandle(i) + loop.handles.clear() + return ok() + +elif defined(macosx) or defined(freebsd) or defined(netbsd) or + defined(openbsd) or defined(dragonfly) or defined(macos) or + defined(linux) or defined(android) or defined(solaris): + + proc closeDispatcher*(loop: PDispatcher): Result[void, string] = + ## Close selector associated with current thread's dispatcher. + try: + loop.getIoHandler().close() + except IOSelectorsException as e: + return err("Exception in closeDispatcher: " & e.msg) + return ok() + +proc shutdown*(): Result[void, string] {.raises: [].} = + ## Performs final cleanup of all dispatcher resources. + ## Notice that this should be called only when sure that no new async tasks will be scheduled. + ## + ## This routine shall be called only after `pollFor` has completed. Upon + ## invocation, all streams are assumed to have been closed. + ## + ## Then, it assumes the thread's dispatcher has explicitly been stopped, destroyed and will never + ## be used again. + + let disp = getThreadDispatcher() + ? closeDispatcher(disp) + return ok() diff --git a/vendor/nim-chronos b/vendor/nim-chronos index b55e281..60d6431 160000 --- a/vendor/nim-chronos +++ b/vendor/nim-chronos @@ -1 +1 @@ -Subproject commit b55e2816eb45f698ddaca8d8473e401502562db2 +Subproject commit 60d64317e66f245958a819a6ceb1b20db3d239a9