mirror of
https://github.com/logos-messaging/nim-sds.git
synced 2026-02-16 12:03:07 +00:00
adopt self shutdown implementation without changing nim-chronos
This commit is contained in:
parent
239f619625
commit
cc0034f119
@ -5,7 +5,7 @@
|
||||
when defined(linux):
|
||||
{.passl: "-Wl,-soname,libsds.so".}
|
||||
|
||||
import std/[typetraits, tables, atomics, locks], chronos, chronicles
|
||||
import std/[typetraits, tables, atomics], chronos, chronicles
|
||||
import
|
||||
./sds_thread/sds_thread,
|
||||
./alloc,
|
||||
@ -57,29 +57,6 @@ template callEventCallback(ctx: ptr SdsContext, eventName: string, body: untyped
|
||||
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
|
||||
)
|
||||
|
||||
var
|
||||
ctxPool: seq[ptr SdsContext]
|
||||
ctxPoolLock: Lock
|
||||
|
||||
proc acquireCtx(callback: SdsCallBack, userData: pointer): ptr SdsContext =
|
||||
ctxPoolLock.acquire()
|
||||
defer: ctxPoolLock.release()
|
||||
if ctxPool.len > 0:
|
||||
result = ctxPool.pop()
|
||||
else:
|
||||
result = sds_thread.createSdsThread().valueOr:
|
||||
let msg = "Error in createSdsThread: " & $error
|
||||
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||
return nil
|
||||
|
||||
proc releaseCtx(ctx: ptr SdsContext) =
|
||||
ctxPoolLock.acquire()
|
||||
defer: ctxPoolLock.release()
|
||||
ctx.userData = nil
|
||||
ctx.eventCallback = nil
|
||||
ctx.eventUserData = nil
|
||||
ctxPool.add(ctx)
|
||||
|
||||
proc handleRequest(
|
||||
ctx: ptr SdsContext,
|
||||
requestType: RequestType,
|
||||
@ -140,7 +117,6 @@ proc initializeLibrary() {.exported.} =
|
||||
## Every Nim library needs to call `<yourprefix>NimMain` once exactly, to initialize the Nim runtime.
|
||||
## Being `<yourprefix>` the value given in the optional compilation flag --nimMainPrefix:yourprefix
|
||||
libsdsNimMain()
|
||||
ctxPoolLock.initLock() # ensure the lock is initialized once (fix Windows crash)
|
||||
when declared(setupForeignThreadGc):
|
||||
setupForeignThreadGc()
|
||||
when declared(nimGC_setStackBottom):
|
||||
@ -164,9 +140,10 @@ proc SdsNewReliabilityManager(
|
||||
echo "error: missing callback in NewReliabilityManager"
|
||||
return nil
|
||||
|
||||
## Create or reuse the SDS thread that will keep waiting for req from the main thread.
|
||||
var ctx = acquireCtx(callback, userData)
|
||||
if ctx.isNil():
|
||||
## Create the SDS thread that will keep waiting for req from the main thread.
|
||||
var ctx = sds_thread.createSdsThread().valueOr:
|
||||
let msg = "Error in createSdsThread: " & $error
|
||||
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||
return nil
|
||||
|
||||
ctx.userData = userData
|
||||
@ -206,20 +183,14 @@ proc SdsCleanupReliabilityManager(
|
||||
initializeLibrary()
|
||||
checkLibsdsParams(ctx, callback, userData)
|
||||
|
||||
let resetRes = handleRequest(
|
||||
ctx,
|
||||
RequestType.LIFECYCLE,
|
||||
SdsLifecycleRequest.createShared(SdsLifecycleMsgType.RESET_RELIABILITY_MANAGER),
|
||||
callback,
|
||||
userData,
|
||||
)
|
||||
|
||||
if resetRes == RET_ERR:
|
||||
sds_thread.destroySdsThread(ctx).isOkOr:
|
||||
let msg = "libsds error: " & $error
|
||||
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
|
||||
return RET_ERR
|
||||
|
||||
releaseCtx(ctx)
|
||||
## always need to invoke the callback although we don't retrieve value to the caller
|
||||
callback(RET_OK, nil, 0, userData)
|
||||
|
||||
# handleRequest already invoked the callback; nothing else to signal here.
|
||||
return RET_OK
|
||||
|
||||
proc SdsResetReliabilityManager(
|
||||
@ -352,4 +323,4 @@ proc SdsStartPeriodicTasks(
|
||||
)
|
||||
|
||||
### End of exported procs
|
||||
################################################################################
|
||||
################################################################################
|
||||
@ -7,7 +7,9 @@ import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single,
|
||||
import
|
||||
../ffi_types,
|
||||
./inter_thread_communication/sds_thread_request,
|
||||
../../src/[reliability_utils]
|
||||
../alloc,
|
||||
../../src/[reliability_utils],
|
||||
./shutdown
|
||||
|
||||
type SdsContext* = object
|
||||
thread: Thread[(ptr SdsContext)]
|
||||
@ -21,6 +23,7 @@ type SdsContext* = object
|
||||
eventCallback*: pointer
|
||||
eventUserdata*: pointer
|
||||
running: Atomic[bool] # To control when the thread is running
|
||||
threadErrorMsg: cstring # to store any error message from the thread
|
||||
|
||||
proc runSds(ctx: ptr SdsContext) {.async.} =
|
||||
## This is the worker body. This runs the SDS instance
|
||||
@ -52,6 +55,18 @@ proc run(ctx: ptr SdsContext) {.thread.} =
|
||||
## Launch sds worker
|
||||
waitFor runSds(ctx)
|
||||
|
||||
ctx.reqSignal.close().isOkOr:
|
||||
ctx.threadErrorMsg = alloc("error closing reqSignal: " & $error)
|
||||
return
|
||||
|
||||
ctx.reqReceivedSignal.close().isOkOr:
|
||||
ctx.threadErrorMsg = alloc("error closing reqReceivedSignal: " & $error)
|
||||
return
|
||||
|
||||
shutdown().isOkOr:
|
||||
ctx.threadErrorMsg = alloc("error calling shutdown: " & $error)
|
||||
return
|
||||
|
||||
proc createSdsThread*(): Result[ptr SdsContext, string] =
|
||||
## This proc is called from the main thread and it creates
|
||||
## the SDS working thread.
|
||||
@ -83,9 +98,13 @@ proc destroySdsThread*(ctx: ptr SdsContext): Result[void, string] =
|
||||
return err("failed to signal reqSignal on time in destroySdsThread")
|
||||
|
||||
joinThread(ctx.thread)
|
||||
|
||||
if ctx.threadErrorMsg.isNil() == false and ctx.threadErrorMsg.len > 0:
|
||||
let errorMsg = $ctx.threadErrorMsg
|
||||
dealloc(ctx.threadErrorMsg)
|
||||
return err("SDS thread error: " & errorMsg)
|
||||
|
||||
ctx.lock.deinitLock()
|
||||
?ctx.reqSignal.close()
|
||||
?ctx.reqReceivedSignal.close()
|
||||
freeShared(ctx)
|
||||
|
||||
return ok()
|
||||
@ -129,4 +148,4 @@ proc sendRequestToSdsThread*(
|
||||
|
||||
## Notice that in case of "ok", the deallocShared(req) is performed by the SDS Thread in the
|
||||
## process proc.
|
||||
ok()
|
||||
ok()
|
||||
50
library/sds_thread/shutdown.nim
Normal file
50
library/sds_thread/shutdown.nim
Normal file
@ -0,0 +1,50 @@
|
||||
|
||||
import chronos, chronos/selectors2
|
||||
|
||||
## Notice that this module extends current nim-chronos functionality to provide
|
||||
## proper shutdown of the thread's dispatcher.
|
||||
##
|
||||
## This is necessary because nim-chronos does not provide a way to close
|
||||
## the selector associated with a thread's dispatcher, which may lead to
|
||||
## resource leaks.
|
||||
##
|
||||
## Therefore, this ideally should be contributed back to nim-chronos.
|
||||
|
||||
when defined(windows):
|
||||
proc safeCloseHandle(h: HANDLE): Result[void, string] =
|
||||
let res = closeHandle(h)
|
||||
if res == 0: # WINBOOL FALSE
|
||||
return err("Failed to close handle error code: " & osErrorMsg(osLastError()))
|
||||
return ok()
|
||||
|
||||
proc closeDispatcher*(loop: PDispatcher): Result[void, string] =
|
||||
? safeCloseHandle(loop.ioPort)
|
||||
for i in loop.handles.items:
|
||||
closeHandle(i)
|
||||
loop.handles.clear()
|
||||
return ok()
|
||||
|
||||
elif defined(macosx) or defined(freebsd) or defined(netbsd) or
|
||||
defined(openbsd) or defined(dragonfly) or defined(macos) or
|
||||
defined(linux) or defined(android) or defined(solaris):
|
||||
|
||||
proc closeDispatcher*(loop: PDispatcher): Result[void, string] =
|
||||
## Close selector associated with current thread's dispatcher.
|
||||
try:
|
||||
loop.getIoHandler().close()
|
||||
except IOSelectorsException as e:
|
||||
return err("Exception in closeDispatcher: " & e.msg)
|
||||
return ok()
|
||||
|
||||
proc shutdown*(): Result[void, string] {.raises: [].} =
|
||||
## Performs final cleanup of all dispatcher resources.
|
||||
## Notice that this should be called only when sure that no new async tasks will be scheduled.
|
||||
##
|
||||
## This routine shall be called only after `pollFor` has completed. Upon
|
||||
## invocation, all streams are assumed to have been closed.
|
||||
##
|
||||
## Then, it assumes the thread's dispatcher has explicitly been stopped, destroyed and will never
|
||||
## be used again.
|
||||
|
||||
let disp = getThreadDispatcher()
|
||||
return ok()
|
||||
Loading…
x
Reference in New Issue
Block a user