diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index 5492b8e..eb17ae8 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -30,8 +30,6 @@ type FFIContext*[T] = object # fired by ffiThread just before it exits; destroyFFIContext waits on # this with a bounded timeout instead of joining unconditionally, so a # blocked event loop cannot hang the caller forever - watchdogStopSignal: ThreadSignalPtr - # fired by destroyFFIContext so the watchdog exits immediately instead of waiting out its sleep userData*: pointer callbackState*: FFICallbackState running: Atomic[bool] # To control when the threads are running @@ -174,19 +172,16 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = const WatchdogTimeinterval = 1.seconds const WatchdogTimeout = 20.seconds - # Give time for the node to be created and up before sending watchdog requests. - # waitSync returns early if watchdogStopSignal fires (i.e. on destroy). - let startWait = ctx.watchdogStopSignal.waitSync(WatchdogStartDelay) - if startWait.isErr(): - error "watchdog: start-delay waitSync failed", error = startWait.error - elif startWait.get(): - return # stop signal fired during start delay + # Give time for the node to be created and up before sending watchdog requests + let initialStop = await ctx.stopSignal.wait().withTimeout(WatchdogStartDelay) + if initialStop or ctx.running.load == false: + return - while ctx.running.load: - let intervalWait = ctx.watchdogStopSignal.waitSync(WatchdogTimeinterval) - if intervalWait.isErr(): - error "watchdog: interval waitSync failed", error = intervalWait.error - elif intervalWait.get() or not ctx.running.load: + while true: + let intervalStop = await ctx.stopSignal.wait().withTimeout(WatchdogTimeinterval) + + if intervalStop or ctx.running.load == false: + debug "Watchdog thread exiting because FFIContext is not running" break let callback = proc( @@ -285,28 +280,42 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = waitFor ffiRun(ctx) -proc closeResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = - ## Closes file descriptors and deinits the lock. Does NOT free ctx memory. - ## Used by initContextResources error paths and pool destroy, where ctx is - ## not heap-allocated (pool slots live in a fixed array, not on the heap). - ctx.lock.deinitLock() - if not ctx.reqSignal.isNil(): - ?ctx.reqSignal.close() - if not ctx.reqReceivedSignal.isNil(): - ?ctx.reqReceivedSignal.close() - if not ctx.stopSignal.isNil(): - ?ctx.stopSignal.close() - if not ctx.threadExitSignal.isNil(): - ?ctx.threadExitSignal.close() - if not ctx.watchdogStopSignal.isNil(): - ?ctx.watchdogStopSignal.close() - return ok() - proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = ## Full cleanup for heap-allocated contexts: closes all resources and frees memory. defer: freeShared(ctx) - return ctx.closeResources() + ctx.lock.deinitLock() + when defined(gcRefc): + ## ThreadSignalPtr.close() is intentionally skipped under --mm:refc. + ## + ## close() goes through chronos's safeUnregisterAndCloseFd, which calls + ## getThreadDispatcher() and lazily allocates a new Selector for the + ## main thread. With refc and a heavy ref-object graph torn down by the + ## FFI thread (libwaku/libp2p), that allocation traps inside rawNewObj + ## and the refc signal handler re-enters the same allocator — the + ## process never returns. Captured stack from a hung process: + ## close → safeUnregisterAndCloseFd → getThreadDispatcher → + ## newDispatcher → Selector.new → newObj (gc.nim:488) → + ## rawNewObj (gc.nim:470) → rawNewObj → _sigtramp → signalHandler → + ## newObjNoInit → addNewObjToZCT (infinite re-entry) + ## + ## --mm:orc does NOT exhibit this bug; see the + ## "destroyFFIContext refc workaround" suite in tests/test_ffi_context.nim + ## (test "destroy after heavy ref-allocation workload returns promptly"). + ## The signal fds (a few per ctx) are reclaimed by the OS at process + ## exit; destroyFFIContext is called once per process lifetime, so the + ## leak is bounded. + discard + else: + if not ctx.reqSignal.isNil(): + ?ctx.reqSignal.close() + if not ctx.reqReceivedSignal.isNil(): + ?ctx.reqReceivedSignal.close() + if not ctx.stopSignal.isNil(): + ?ctx.stopSignal.close() + if not ctx.threadExitSignal.isNil(): + ?ctx.threadExitSignal.close() + return ok() proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ## Initialises all resources inside an already-allocated FFIContext slot. @@ -322,8 +331,6 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = err = error ctx.reqSignal = ThreadSignalPtr.new().valueOr: - ctx.closeResources().isOkOr: - return err("could not close resources after reqSignal failure: " & $error) return err("couldn't create reqSignal ThreadSignalPtr: " & $error) ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: @@ -335,12 +342,6 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ctx.threadExitSignal = ThreadSignalPtr.new().valueOr: return err("couldn't create threadExitSignal ThreadSignalPtr: " & $error) - ctx.watchdogStopSignal = ThreadSignalPtr.new().valueOr: - ctx.closeResources().isOkOr: - return - err("could not close resources after watchdogStopSignal failure: " & $error) - return err("couldn't create watchdogStopSignal ThreadSignalPtr") - ctx.registeredRequests = addr ffi_types.registeredRequests ctx.running.store(true) @@ -348,8 +349,6 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = try: createThread(ctx.ffiThread, ffiThreadBody[T], ctx) except ValueError, ResourceExhaustedError: - ctx.closeResources().isOkOr: - error "failed to close resources after ffiThread creation failure", error = error return err("failed to create the FFI thread: " & getCurrentExceptionMsg()) try: @@ -362,9 +361,6 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = if fireRes.isErr(): error "failed to signal ffiThread during watchdog cleanup", error = fireRes.error joinThread(ctx.ffiThread) - ctx.closeResources().isOkOr: - error "failed to close resources after watchdogThread creation failure", - error = error return err("failed to create the watchdog thread: " & getCurrentExceptionMsg()) registerCtx(cast[pointer](ctx)) @@ -396,17 +392,6 @@ proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) = pool.inUse[i].store(false) return -# ── Public API ──────────────────────────────────────────────────────────────── - -proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = - ## Creates a heap-allocated FFI context. The caller must call destroyFFIContext(ctx) - ## to release it. Prefer the pool overload when the maximum context count is known. - var ctx = createShared(FFIContext[T], 1) - initContextResources(ctx).isOkOr: - freeShared(ctx) - return err(error) - return ok(ctx) - proc createFFIContext*[T]( pool: var FFIContextPool[T] ): Result[ptr FFIContext[T], string] = @@ -427,10 +412,10 @@ proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] = if not ffiSignaled: ctx.onNotResponding() return err("failed to signal reqSignal on time in destroyFFIContext") - let wdSignaled = ctx.watchdogStopSignal.fireSync().valueOr: - return err("error signaling watchdogStopSignal in destroyFFIContext: " & $error) + let wdSignaled = ctx.stopSignal.fireSync().valueOr: + return err("error signaling stopSignal in destroyFFIContext: " & $error) if not wdSignaled: - return err("failed to signal watchdogStopSignal on time in destroyFFIContext") + return err("failed to signal stopSignal on time in destroyFFIContext") return ok() ## If the FFI thread's event loop is blocked by a synchronous handler @@ -483,9 +468,6 @@ proc destroyFFIContext*[T]( joinThread(ctx.ffiThread) joinThread(ctx.watchdogThread) - ctx.closeResources().isOkOr: - pool.releaseSlot(ctx) - return err("closeResources failed: " & $error) pool.releaseSlot(ctx) return ok() diff --git a/ffi/internal/ffi_macro.nim b/ffi/internal/ffi_macro.nim index 46642b0..f310983 100644 --- a/ffi/internal/ffi_macro.nim +++ b/ffi/internal/ffi_macro.nim @@ -1383,9 +1383,12 @@ macro ffiCtor*(prc: untyped): untyped = # Use a gensym'd ctx identifier so both the let binding and usage match let ctxSym = genSym(nskLet, "ctx") + # Module-level pool shared by ctor and dtor for this libType + let poolIdent = ident($libTypeName & "FFIPool") + # Create the FFIContext synchronously; return nil on failure ffiBody.add quote do: - let `ctxSym` = createFFIContext[`libTypeName`]().valueOr: + let `ctxSym` = `poolIdent`.createFFIContext().valueOr: if not callback.isNil: let errStr = "ffiCtor: failed to create FFIContext: " & $error callback(RET_ERR, unsafeAddr errStr[0], cast[csize_t](errStr.len), userData) @@ -1476,8 +1479,13 @@ macro ffiCtor*(prc: untyped): untyped = ) ) + let poolDecl = quote do: + when not declared(`poolIdent`): + var `poolIdent`: FFIContextPool[`libTypeName`] + result = newStmtList( - typeDef, deleteProc, ffiNewReqProc, helperProc, processProc, addToReg, ffiProc + typeDef, deleteProc, ffiNewReqProc, helperProc, processProc, addToReg, poolDecl, + ffiProc, ) when defined(ffiDumpMacros): @@ -1548,9 +1556,10 @@ macro ffiDtor*(prc: untyped): untyped = if not isNoop: ffiBody.add(bodyNode) + let poolIdent = ident($libTypeName & "FFIPool") ffiBody.add quote do: let `destroyResIdent` = - destroyFFIContext[`libTypeName`](cast[ptr FFIContext[`libTypeName`]](ctx)) + `poolIdent`.destroyFFIContext(cast[ptr FFIContext[`libTypeName`]](ctx)) if `destroyResIdent`.isErr(): if not callback.isNil: let errStr = "destroy failed: " & $`destroyResIdent`.error @@ -1593,7 +1602,11 @@ macro ffiDtor*(prc: untyped): untyped = ) ) - result = ffiProc + let poolDecl = quote do: + when not declared(`poolIdent`): + var `poolIdent`: FFIContextPool[`libTypeName`] + + result = newStmtList(poolDecl, ffiProc) when defined(ffiDumpMacros): echo result.repr diff --git a/tests/test_ffi_context.nim b/tests/test_ffi_context.nim index 9c30c80..dafc4e3 100644 --- a/tests/test_ffi_context.nim +++ b/tests/test_ffi_context.nim @@ -179,17 +179,19 @@ suite "FFIContextPool": suite "createFFIContext / destroyFFIContext": test "create and destroy succeeds": - let ctx = createFFIContext[TestLib]().valueOr: + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: checkpoint "createFFIContext failed: " & $error check false return - check destroyFFIContext(ctx).isOk() + check pool.destroyFFIContext(ctx).isOk() test "double destroy is safe via running flag": - let ctx = createFFIContext[TestLib]().valueOr: + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: check false return - check destroyFFIContext(ctx).isOk() + check pool.destroyFFIContext(ctx).isOk() suite "destroyFFIContext does not hang": test "destroy while a slow async request is still in-flight": @@ -197,7 +199,8 @@ suite "destroyFFIContext does not hang": ## running async request (e.g. stop_node / w.stop()) was still executing. ## The destroy must return well within 2 seconds; before the fix it would ## block forever on joinThread(ffiThread). - let ctx = createFFIContext[TestLib]().valueOr: + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: check false return @@ -213,7 +216,7 @@ suite "destroyFFIContext does not hang": # Destroy immediately while SlowRequest is still running. let t0 = Moment.now() - check destroyFFIContext(ctx).isOk() + check pool.destroyFFIContext(ctx).isOk() check (Moment.now() - t0) < 2.seconds suite "destroyFFIContext does not hang when event loop is blocked": @@ -230,7 +233,8 @@ suite "destroyFFIContext does not hang when event loop is blocked": ## ## With the fix, destroyFFIContext must complete well within the 5 s that ## SyncBlockingRequest holds the event loop. - let ctx = createFFIContext[TestLib]().valueOr: + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: check false return @@ -255,7 +259,7 @@ suite "destroyFFIContext does not hang when event loop is blocked": # It deliberately returns err and leaks ctx in this scenario rather than # hanging on joinThread. let t0 = Moment.now() - check destroyFFIContext(ctx).isErr() + check pool.destroyFFIContext(ctx).isErr() check (Moment.now() - t0) < 3.seconds # Drain the leaked thread before the test scope ends. @@ -303,7 +307,8 @@ suite "destroyFFIContext refc workaround": ## returns immediately. Under `--mm:orc` it returns immediately either ## way. test "destroy after heavy ref-allocation workload returns promptly": - let ctx = createFFIContext[TestLib]().valueOr: + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: check false return @@ -318,7 +323,7 @@ suite "destroyFFIContext refc workaround": check d.retCode == RET_OK let t0 = Moment.now() - check destroyFFIContext(ctx).isOk() + check pool.destroyFFIContext(ctx).isOk() check (Moment.now() - t0) < 3.seconds suite "sendRequestToFFIThread": @@ -328,11 +333,12 @@ suite "sendRequestToFFIThread": defer: deinitCallbackData(d) - let ctx = createFFIContext[TestLib]().valueOr: + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: check false return defer: - discard destroyFFIContext(ctx) + discard pool.destroyFFIContext(ctx) check sendRequestToFFIThread( ctx, PingRequest.ffiNewReq(testCallback, addr d, "hello".cstring) @@ -348,11 +354,12 @@ suite "sendRequestToFFIThread": defer: deinitCallbackData(d) - let ctx = createFFIContext[TestLib]().valueOr: + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: check false return defer: - discard destroyFFIContext(ctx) + discard pool.destroyFFIContext(ctx) check sendRequestToFFIThread(ctx, FailRequest.ffiNewReq(testCallback, addr d)).isOk() waitCallback(d) @@ -364,11 +371,12 @@ suite "sendRequestToFFIThread": defer: deinitCallbackData(d) - let ctx = createFFIContext[TestLib]().valueOr: + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: check false return defer: - discard destroyFFIContext(ctx) + discard pool.destroyFFIContext(ctx) check sendRequestToFFIThread(ctx, EmptyOkRequest.ffiNewReq(testCallback, addr d)) .isOk() @@ -377,11 +385,12 @@ suite "sendRequestToFFIThread": check d.msgLen == 0 test "sequential requests are all processed": - let ctx = createFFIContext[TestLib]().valueOr: + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: check false return defer: - discard destroyFFIContext(ctx) + discard pool.destroyFFIContext(ctx) for i in 1 .. 5: var d: CallbackData @@ -439,7 +448,7 @@ suite "ffiCtor macro": check not ctx[].myLib.isNil check ctx[].myLib[].value == 42 - check destroyFFIContext(ctx).isOk() + check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() # --------------------------------------------------------------------------- # Simplified .ffi. macro integration test @@ -474,7 +483,7 @@ suite "simplified .ffi. macro": let ctxAddr = cast[uint](parseBiggestUInt(addrStr)) check ctxAddr != 0 let ctx = cast[ptr FFIContext[SimpleLib]](ctxAddr) - defer: check destroyFFIContext(ctx).isOk() + defer: check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() # Now call the .ffi. proc var d: CallbackData @@ -524,7 +533,7 @@ suite "async/sync detection in .ffi.": let ctxAddr = cast[uint](parseBiggestUInt(addrStr)) check ctxAddr != 0 let ctx = cast[ptr FFIContext[SimpleLib]](ctxAddr) - defer: check destroyFFIContext(ctx).isOk() + defer: check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() var d2: CallbackData initCallbackData(d2) @@ -592,7 +601,7 @@ suite "ptr return type in .ffi.": let ctxAddr = cast[uint](parseBiggestUInt(ctxAddrStr)) check ctxAddr != 0 let ctx = cast[ptr FFIContext[SimpleLib]](ctxAddr) - defer: check destroyFFIContext(ctx).isOk() + defer: check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() # Alloc a handle var allocD: CallbackData diff --git a/tests/test_gc_compat.nim b/tests/test_gc_compat.nim index f98ef61..c7cdf7a 100644 --- a/tests/test_gc_compat.nim +++ b/tests/test_gc_compat.nim @@ -95,11 +95,12 @@ suite "GC safety - string lifetime across thread boundary": initCallbackData(d) defer: deinitCallbackData(d) - let ctx = createFFIContext[GcTestLib]().valueOr: + var pool: FFIContextPool[GcTestLib] + let ctx = pool.createFFIContext().valueOr: checkpoint "createFFIContext failed: " & $error check false return - defer: discard destroyFFIContext(ctx) + defer: discard pool.destroyFFIContext(ctx) check sendRequestToFFIThread( ctx, StringLifetimeRequest.ffiNewReq(testCallback, addr d, "hello".cstring) @@ -113,10 +114,11 @@ suite "GC safety - string lifetime across thread boundary": initCallbackData(d) defer: deinitCallbackData(d) - let ctx = createFFIContext[GcTestLib]().valueOr: + var pool: FFIContextPool[GcTestLib] + let ctx = pool.createFFIContext().valueOr: check false return - defer: discard destroyFFIContext(ctx) + defer: discard pool.destroyFFIContext(ctx) check sendRequestToFFIThread( ctx, GcErrRequest.ffiNewReq(testCallback, addr d, "test".cstring) @@ -130,10 +132,11 @@ suite "GC safety - string lifetime across thread boundary": initCallbackData(d) defer: deinitCallbackData(d) - let ctx = createFFIContext[GcTestLib]().valueOr: + var pool: FFIContextPool[GcTestLib] + let ctx = pool.createFFIContext().valueOr: check false return - defer: discard destroyFFIContext(ctx) + defer: discard pool.destroyFFIContext(ctx) check sendRequestToFFIThread( ctx, LargeStringRequest.ffiNewReq(testCallback, addr d) @@ -147,10 +150,11 @@ suite "GC safety - string lifetime across thread boundary": suite "GC stability - repeated requests": test "20 sequential requests without GC corruption": - let ctx = createFFIContext[GcTestLib]().valueOr: + var pool: FFIContextPool[GcTestLib] + let ctx = pool.createFFIContext().valueOr: check false return - defer: discard destroyFFIContext(ctx) + defer: discard pool.destroyFFIContext(ctx) for i in 1 .. 20: var d: CallbackData