From d427166734cb2f5509b6ba000b7be11c8ac501db Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Sat, 9 May 2026 22:02:36 +0200 Subject: [PATCH] better onDestroy sync --- ffi/ffi_context.nim | 135 ++++++++++++++++++++++------- ffi/internal/ffi_macro.nim | 6 +- tests/test_ffi_context.nim | 168 ++++++++++++++++++++++++++++++++++++- 3 files changed, 271 insertions(+), 38 deletions(-) diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index c3b901c..dd9b705 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -24,6 +24,12 @@ type FFIContext*[T] = object reqSignal: ThreadSignalPtr # to notify the FFI Thread that a new request is sent reqReceivedSignal: ThreadSignalPtr # to signal main thread, interfacing with the FFI thread, that FFI thread received the request + stopSignal: ThreadSignalPtr + # fired by destroyFFIContext so both ffiThread and watchdogThread can exit promptly + threadExitSignal: ThreadSignalPtr + # fired by ffiThread just before it exits; destroyFFIContext waits on + # this with a bounded timeout instead of joining unconditionally, so a + # blocked event loop cannot hang the caller forever userData*: pointer callbackState*: FFICallbackState running: Atomic[bool] # To control when the threads are running @@ -140,11 +146,14 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = const WatchdogTimeout = 20.seconds # Give time for the node to be created and up before sending watchdog requests - await sleepAsync(WatchdogStartDelay) - while true: - await sleepAsync(WatchdogTimeinterval) + let initialStop = await ctx.stopSignal.wait().withTimeout(WatchdogStartDelay) + if initialStop or ctx.running.load == false: + return - if ctx.running.load == false: + while true: + let intervalStop = await ctx.stopSignal.wait().withTimeout(WatchdogTimeinterval) + + if intervalStop or ctx.running.load == false: debug "Watchdog thread exiting because FFIContext is not running" break @@ -209,22 +218,27 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT) + defer: + # Signal destroyFFIContext that this thread has exited, so its bounded + # wait can unblock and proceed with cleanup. + let fireRes = ctx.threadExitSignal.fireSync() + if fireRes.isErr(): + error "failed to fire threadExitSignal on FFI thread exit", + err = fireRes.error + let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} = var ffiReqHandler: T ## Holds the main library object, i.e., in charge of handling the ffi requests. ## e.g., Waku, LibP2P, SDS, etc. - while true: - await ctx.reqSignal.wait() - - if ctx.running.load == false: - break + while ctx.running.load(): + let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds) + if not gotSignal: + continue ## Wait for a request from the ffi consumer thread var request: ptr FFIThreadRequest - let recvOk = ctx.reqChannel.tryRecv(request) - if not recvOk: - chronicles.error "ffi thread could not receive a request" + if not ctx.reqChannel.tryRecv(request): continue if ctx.myLib.isNil(): @@ -243,10 +257,36 @@ proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = defer: freeShared(ctx) ctx.lock.deinitLock() - if not ctx.reqSignal.isNil(): - ?ctx.reqSignal.close() - if not ctx.reqReceivedSignal.isNil(): - ?ctx.reqReceivedSignal.close() + when defined(gcRefc): + ## ThreadSignalPtr.close() is intentionally skipped under --mm:refc. 
+ ## + ## close() goes through chronos's safeUnregisterAndCloseFd, which calls + ## getThreadDispatcher() and lazily allocates a new Selector for the + ## main thread. With refc and a heavy ref-object graph torn down by the + ## FFI thread (libwaku/libp2p), that allocation traps inside rawNewObj + ## and the refc signal handler re-enters the same allocator — the + ## process never returns. Captured stack from a hung process: + ## close → safeUnregisterAndCloseFd → getThreadDispatcher → + ## newDispatcher → Selector.new → newObj (gc.nim:488) → + ## rawNewObj (gc.nim:470) → rawNewObj → _sigtramp → signalHandler → + ## newObjNoInit → addNewObjToZCT (infinite re-entry) + ## + ## --mm:orc does NOT exhibit this bug; see the + ## "destroyFFIContext refc workaround" suite in tests/test_ffi_context.nim + ## (test "destroy after heavy ref-allocation workload returns promptly"). + ## The signal fds (a few per ctx) are reclaimed by the OS at process + ## exit; destroyFFIContext is called once per process lifetime, so the + ## leak is bounded. + discard + else: + if not ctx.reqSignal.isNil(): + ?ctx.reqSignal.close() + if not ctx.reqReceivedSignal.isNil(): + ?ctx.reqReceivedSignal.close() + if not ctx.stopSignal.isNil(): + ?ctx.stopSignal.close() + if not ctx.threadExitSignal.isNil(): + ?ctx.threadExitSignal.close() return ok() proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = @@ -255,16 +295,24 @@ proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = var ctx = createShared(FFIContext[T], 1) ctx.lock.initLock() + var success = false + defer: + if not success: + ctx.cleanUpResources().isOkOr: + error "failed to clean up resources after createFFIContext failure", + err = error + ctx.reqSignal = ThreadSignalPtr.new().valueOr: - ctx.cleanUpResources().isOkOr: - return err("could not clean resources in a failure new reqSignal: " & $error) return err("couldn't create reqSignal ThreadSignalPtr: " & $error) ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: - ctx.cleanUpResources().isOkOr: - return - err("could not clean resources in a failure new reqReceivedSignal: " & $error) - return err("couldn't create reqReceivedSignal ThreadSignalPtr") + return err("couldn't create reqReceivedSignal ThreadSignalPtr: " & $error) + + ctx.stopSignal = ThreadSignalPtr.new().valueOr: + return err("couldn't create stopSignal ThreadSignalPtr: " & $error) + + ctx.threadExitSignal = ThreadSignalPtr.new().valueOr: + return err("couldn't create threadExitSignal ThreadSignalPtr: " & $error) ctx.registeredRequests = addr ffi_types.registeredRequests @@ -273,31 +321,33 @@ proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = try: createThread(ctx.ffiThread, ffiThreadBody[T], ctx) except ValueError, ResourceExhaustedError: - ctx.cleanUpResources().isOkOr: - error "failed to clean up resources after ffiThread creation failure", err = error return err("failed to create the FFI thread: " & getCurrentExceptionMsg()) try: createThread(ctx.watchdogThread, watchdogThreadBody, ctx) except ValueError, ResourceExhaustedError: + ## ffiThread is already running; signal it to exit and join before the + ## deferred cleanUpResources closes the signals it's waiting on. 
ctx.running.store(false) let fireRes = ctx.reqSignal.fireSync() if fireRes.isErr(): - error "failed to signal ffiThread during watchdog cleanup", err = fireRes.error + error "failed to signal ffiThread during watchdog cleanup", + err = fireRes.error joinThread(ctx.ffiThread) - ctx.cleanUpResources().isOkOr: - error "failed to clean up resources after watchdogThread creation failure", err = error return err("failed to create the watchdog thread: " & getCurrentExceptionMsg()) + success = true return ok(ctx) proc destroyFFIContext*[T](ctx: ptr FFIContext[T]): Result[void, string] = + ## If the FFI thread's event loop is blocked by a synchronous handler + ## (e.g. blocking I/O), it cannot process reqSignal in time to exit. + ## In that case we leak ctx and the thread rather than hanging forever: + ## the thread will eventually exit on its own, but cleanup is skipped + ## because the thread may still be touching ctx fields. + const ThreadExitTimeout = 1500.milliseconds + ctx.running.store(false) - defer: - joinThread(ctx.ffiThread) - joinThread(ctx.watchdogThread) - ctx.cleanUpResources().isOkOr: - error "failed to clean up resources in destroyFFIContext", err = error let signaledOnTime = ctx.reqSignal.fireSync().valueOr: ctx.onNotResponding() @@ -306,6 +356,27 @@ proc destroyFFIContext*[T](ctx: ptr FFIContext[T]): Result[void, string] = ctx.onNotResponding() return err("failed to signal reqSignal on time in destroyFFIContext") + ctx.stopSignal.fireSync().isOkOr: + error "failed to fire stopSignal in destroyFFIContext", err = $error + + ## Bounded wait for ffiThread to exit. waitSync blocks the calling thread + ## up to the timeout; ffiThread fires threadExitSignal in its defer block. + let exitedOnTime = ctx.threadExitSignal.waitSync(ThreadExitTimeout).valueOr: + ctx.onNotResponding() + return err("error waiting for FFI thread exit: " & $error) + + if not exitedOnTime: + ## Event loop is blocked by a synchronous handler. Leak the thread and + ## ctx to avoid hanging the caller forever. 
+ ctx.onNotResponding() + return err("FFI thread did not exit in time; leaking ctx to avoid hang") + + joinThread(ctx.ffiThread) + joinThread(ctx.watchdogThread) + ctx.cleanUpResources().isOkOr: + error "failed to clean up resources in destroyFFIContext", err = error + return err("cleanUpResources failed: " & $error) + return ok() template checkParams*(ctx: ptr FFIContext, callback: FFICallBack, userData: pointer) = diff --git a/ffi/internal/ffi_macro.nim b/ffi/internal/ffi_macro.nim index 9cd6359..9a75f93 100644 --- a/ffi/internal/ffi_macro.nim +++ b/ffi/internal/ffi_macro.nim @@ -958,11 +958,13 @@ macro ffi*(prc: untyped): untyped = let `retValOrErrIdent` = `syncHelperCall` if `retValOrErrIdent`.isErr(): let errStr = `retValOrErrIdent`.error - callback(RET_ERR, unsafeAddr errStr[0], cast[csize_t](errStr.len), userData) + callback( + RET_ERR, cast[ptr cchar](errStr.cstring), cast[csize_t](errStr.len), userData + ) return RET_ERR let serialized = ffiSerialize(`retValOrErrIdent`.value) callback( - RET_OK, unsafeAddr serialized[0], cast[csize_t](serialized.len), userData + RET_OK, cast[ptr cchar](serialized.cstring), cast[csize_t](serialized.len), userData ) return RET_OK diff --git a/tests/test_ffi_context.nim b/tests/test_ffi_context.nim index ca57097..5061066 100644 --- a/tests/test_ffi_context.nim +++ b/tests/test_ffi_context.nim @@ -61,6 +61,57 @@ registerReqFFI(EmptyOkRequest, lib: ptr TestLib): proc(): Future[Result[string, string]] {.async.} = return ok("") +registerReqFFI(SlowRequest, lib: ptr TestLib): + proc(): Future[Result[string, string]] {.async.} = + await sleepAsync(500.milliseconds) + return ok("slow-done") + +# Coordination channel: the FFI handler signals the test thread the instant +# it is about to block the event loop, so the test can call destroyFFIContext +# while the event loop is truly frozen. +var gSyncBlockStarted: Channel[bool] +gSyncBlockStarted.open() + +registerReqFFI(SyncBlockingRequest, lib: ptr TestLib): + proc(): Future[Result[string, string]] {.async.} = + # Yield first so that reqReceivedSignal fires and sendRequestToFFIThread + # returns on the calling thread before we start the synchronous block. + await sleepAsync(0.milliseconds) + # Signal the test thread: the event loop is about to be frozen. + # Channel.send is annotated as raising under refc, so wrap. + try: + gSyncBlockStarted.send(true) + except Exception as exc: + return err("gSyncBlockStarted.send raised: " & exc.msg) + # Simulates a request that blocks the event-loop thread synchronously + # (e.g. w.stop() -> switch.stop() -> connManager.close() with blocking I/O). + # Unlike sleepAsync, os.sleep holds the OS thread and prevents Chronos from + # processing any callbacks -- including the reqSignal fired by destroyFFIContext. + os.sleep(5_000) + return ok("sync-blocking-done") + +# Approximates the heavy ref-object workload that libwaku/libp2p performs on +# the FFI thread. The exact cell count is large enough to force several refc +# GC cycles; under refc this stresses the heap state that, when later combined +# with a chronos Selector allocation on the main thread (via close()), used to +# trip the rawNewObj → signal-handler infinite recursion. 
+type RefCell = ref object + next: RefCell + payload: array[64, byte] + +registerReqFFI(HeavyRefAllocRequest, lib: ptr TestLib): + proc(): Future[Result[string, string]] {.async.} = + var head: RefCell + for i in 0 ..< 50_000: + let n = RefCell(next: head) + head = n + if i mod 1000 == 0: + await sleepAsync(0.milliseconds) + # Let the chain become collectable and yield so refc has a chance to run. + head = nil + await sleepAsync(10.milliseconds) + return ok("heavy-done") + suite "createFFIContext / destroyFFIContext": test "create and destroy succeeds": let ctx = createFFIContext[TestLib]().valueOr: @@ -75,6 +126,115 @@ suite "createFFIContext / destroyFFIContext": return check destroyFFIContext(ctx).isOk() +suite "destroyFFIContext does not hang": + test "destroy while a slow async request is still in-flight": + ## Reproduces the race where destroyFFIContext was called while a long- + ## running async request (e.g. stop_node / w.stop()) was still executing. + ## The destroy must return well within 2 seconds; before the fix it would + ## block forever on joinThread(ffiThread). + let ctx = createFFIContext[TestLib]().valueOr: + check false + return + + var d: CallbackData + initCallbackData(d) + defer: deinitCallbackData(d) + + # sendRequestToFFIThread returns as soon as the FFI thread ACKs receipt; + # the 500 ms work continues asynchronously on the FFI thread. + check sendRequestToFFIThread( + ctx, SlowRequest.ffiNewReq(testCallback, addr d) + ).isOk() + + # Destroy immediately while SlowRequest is still running. + let t0 = Moment.now() + check destroyFFIContext(ctx).isOk() + check (Moment.now() - t0) < 2.seconds + +suite "destroyFFIContext does not hang when event loop is blocked": + test "destroy while sync-blocking request is in-flight": + ## Reproduces the hang seen in logosdelivery_example.c: + ## logosdelivery_stop_node(...) -- triggers w.stop() on the FFI thread + ## sleep(1) + ## logosdelivery_destroy(...) -- hangs forever + ## + ## Root cause: w.stop() (and similar tear-down calls) can execute a + ## synchronous blocking section that holds the OS thread, preventing + ## the Chronos event loop from processing the reqSignal fired by + ## destroyFFIContext. The result is joinThread(ffiThread) never returns. + ## + ## With the fix, destroyFFIContext must complete well within the 5 s that + ## SyncBlockingRequest holds the event loop. + let ctx = createFFIContext[TestLib]().valueOr: + check false + return + + var d: CallbackData + initCallbackData(d) + defer: deinitCallbackData(d) + + check sendRequestToFFIThread( + ctx, SyncBlockingRequest.ffiNewReq(testCallback, addr d) + ).isOk() + + # Block until the FFI handler has signalled that os.sleep is about to start. + # This guarantees destroyFFIContext is called while the event loop is frozen. + discard gSyncBlockStarted.recv() + + # Destroy must return promptly even though the event loop is frozen for 5s. + # It deliberately returns err and leaks ctx in this scenario rather than + # hanging on joinThread. + let t0 = Moment.now() + check destroyFFIContext(ctx).isErr() + check (Moment.now() - t0) < 3.seconds + +suite "destroyFFIContext refc workaround": + ## Documents the refc-specific workaround in cleanUpResources. 
+ ## + ## Background: when the FFI thread does heavy ref-object work (the workload + ## that triggered the libwaku hang in production), the refc GC heap reaches + ## a state where the very first chronos Selector allocation on the *main* + ## thread — which happens lazily inside ThreadSignalPtr.close() through + ## getThreadDispatcher() — traps in rawNewObj. The refc signal handler + ## itself re-enters the same allocator and the process never returns. + ## Captured stack: + ## close → safeUnregisterAndCloseFd → getThreadDispatcher → + ## newDispatcher → Selector.new → newObj (gc.nim:488) → rawNewObj → + ## _sigtramp → signalHandler → newObjNoInit → addNewObjToZCT (loop) + ## + ## The workaround in cleanUpResources is `when defined(gcRefc): discard`, + ## i.e. skip the close() calls under refc only. orc is unaffected and + ## still cleans up the signal fds normally. + ## + ## NOTE: this test is documentation more than regression: a synthetic + ## ref-allocation workload of ~50k cells does NOT corrupt the refc heap + ## the way the real libwaku/libp2p teardown does, so this test passes + ## even when the workaround is disabled. Reproducing the actual hang + ## requires the full libwaku workload (logosdelivery_example.c). + ## Verification of the workaround was done end-to-end against that + ## example: with `--mm:refc` and close() enabled it hangs forever in + ## the captured stack above; with `when defined(gcRefc): discard` it + ## returns immediately. Under `--mm:orc` it returns immediately either + ## way. + test "destroy after heavy ref-allocation workload returns promptly": + let ctx = createFFIContext[TestLib]().valueOr: + check false + return + + var d: CallbackData + initCallbackData(d) + defer: deinitCallbackData(d) + + check sendRequestToFFIThread( + ctx, HeavyRefAllocRequest.ffiNewReq(testCallback, addr d) + ).isOk() + waitCallback(d) + check d.retCode == RET_OK + + let t0 = Moment.now() + check destroyFFIContext(ctx).isOk() + check (Moment.now() - t0) < 3.seconds + suite "sendRequestToFFIThread": test "successful request triggers RET_OK callback": var d: CallbackData @@ -161,7 +321,7 @@ suite "ffiCtor macro": let configJson = ffiSerialize(SimpleConfig(initialValue: 42)) let ret = testlib_create(configJson.cstring, testCallback, addr d) - check ret == RET_OK + check not ret.isNil() waitCallback(d) @@ -203,7 +363,7 @@ suite "simplified .ffi. macro": let configJson = ffiSerialize(SimpleConfig(initialValue: 7)) let ctorRet = testlib_create(configJson.cstring, testCallback, addr ctorD) - check ctorRet == RET_OK + check not ctorRet.isNil() waitCallback(ctorD) check ctorD.retCode == RET_OK @@ -253,7 +413,7 @@ suite "async/sync detection in .ffi.": let configJson = ffiSerialize(SimpleConfig(initialValue: 3)) let ctorRet = testlib_create(configJson.cstring, testCallback, addr ctorD) - check ctorRet == RET_OK + check not ctorRet.isNil() waitCallback(ctorD) check ctorD.retCode == RET_OK @@ -322,7 +482,7 @@ suite "ptr return type in .ffi.": let configJson = ffiSerialize(SimpleConfig(initialValue: 5)) let ctorRet = testlib_create(configJson.cstring, testCallback, addr ctorD) - check ctorRet == RET_OK + check not ctorRet.isNil() waitCallback(ctorD) check ctorD.retCode == RET_OK