From 4da0ec3cf1d4976161f099ed63c6ff760a1dc19b Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Sat, 2 May 2026 00:20:33 +0200 Subject: [PATCH] use fixed array of ctx to avoid consuming all fds --- ffi/ffi_context.nim | 214 ++++++++++++++++++++++++++----------- tests/test_ffi_context.nim | 90 ++++++++++++++-- 2 files changed, 229 insertions(+), 75 deletions(-) diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index 48d33e4..dc25dd7 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -30,6 +30,8 @@ type FFIContext*[T] = object # fired by ffiThread just before it exits; destroyFFIContext waits on # this with a bounded timeout instead of joining unconditionally, so a # blocked event loop cannot hang the caller forever + watchdogStopSignal: ThreadSignalPtr + # fired by destroyFFIContext so the watchdog exits immediately instead of waiting out its sleep userData*: pointer callbackState*: FFICallbackState running: Atomic[bool] # To control when the threads are running @@ -39,6 +41,18 @@ type FFIContext*[T] = object var ffiCurrentCallbackState* {.threadvar.}: ptr FFICallbackState ## Set by ffiThreadBody at thread startup; read by dispatchFfiEvent. +const MaxFFIContexts* = 32 + ## Maximum number of concurrently live FFI contexts when using FFIContextPool. + ## Fds and threads are only consumed for slots that are actually acquired, + ## so this value only affects the upfront memory of the pool array. + +type FFIContextPool*[T] = object + ## Fixed-size pool of FFI contexts. Avoids dynamic heap allocation per context + ## and bounds the total number of ThreadSignalPtrs held by live contexts + ## to at most MaxFFIContexts * 5 (each context owns five signals). 
+ slots: array[MaxFFIContexts, FFIContext[T]] + inUse: array[MaxFFIContexts, Atomic[bool]] + const git_version* {.strdefine.} = "n/a" var contextRegistry = initHashSet[pointer]() @@ -172,16 +186,19 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = const WatchdogTimeinterval = 1.seconds const WatchdogTimeout = 20.seconds - # Give time for the node to be created and up before sending watchdog requests - let initialStop = await ctx.stopSignal.wait().withTimeout(WatchdogStartDelay) - if initialStop or ctx.running.load == false: - return + # Give time for the node to be created and up before sending watchdog requests. + # waitSync returns early if watchdogStopSignal fires (i.e. on destroy). + let startWait = ctx.watchdogStopSignal.waitSync(WatchdogStartDelay) + if startWait.isErr(): + error "watchdog: start-delay waitSync failed", err = startWait.error + elif startWait.get(): + return # stop signal fired during start delay - while true: - let intervalStop = await ctx.stopSignal.wait().withTimeout(WatchdogTimeinterval) - - if intervalStop or ctx.running.load == false: - debug "Watchdog thread exiting because FFIContext is not running" + while ctx.running.load: + let intervalWait = ctx.watchdogStopSignal.waitSync(WatchdogTimeinterval) + if intervalWait.isErr(): + error "watchdog: interval waitSync failed", err = intervalWait.error + elif intervalWait.get() or not ctx.running.load: break let callback = proc( @@ -280,46 +297,33 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = waitFor ffiRun(ctx) -proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = - defer: - freeShared(ctx) +proc closeResources[T](ctx: ptr FFIContext[T]): Result[void, string] = + ## Closes file descriptors and deinits the lock. Does NOT free ctx memory. + ## Used by initContextResources error paths and pool destroy, where ctx is + ## not heap-allocated (pool slots live in a fixed array, not on the heap). 
ctx.lock.deinitLock() - when defined(gcRefc): - ## ThreadSignalPtr.close() is intentionally skipped under --mm:refc. - ## - ## close() goes through chronos's safeUnregisterAndCloseFd, which calls - ## getThreadDispatcher() and lazily allocates a new Selector for the - ## main thread. With refc and a heavy ref-object graph torn down by the - ## FFI thread (libwaku/libp2p), that allocation traps inside rawNewObj - ## and the refc signal handler re-enters the same allocator — the - ## process never returns. Captured stack from a hung process: - ## close → safeUnregisterAndCloseFd → getThreadDispatcher → - ## newDispatcher → Selector.new → newObj (gc.nim:488) → - ## rawNewObj (gc.nim:470) → rawNewObj → _sigtramp → signalHandler → - ## newObjNoInit → addNewObjToZCT (infinite re-entry) - ## - ## --mm:orc does NOT exhibit this bug; see the - ## "destroyFFIContext refc workaround" suite in tests/test_ffi_context.nim - ## (test "destroy after heavy ref-allocation workload returns promptly"). - ## The signal fds (a few per ctx) are reclaimed by the OS at process - ## exit; destroyFFIContext is called once per process lifetime, so the - ## leak is bounded. - discard - else: - if not ctx.reqSignal.isNil(): - ?ctx.reqSignal.close() - if not ctx.reqReceivedSignal.isNil(): - ?ctx.reqReceivedSignal.close() - if not ctx.stopSignal.isNil(): - ?ctx.stopSignal.close() - if not ctx.threadExitSignal.isNil(): - ?ctx.threadExitSignal.close() + if not ctx.reqSignal.isNil(): + ?ctx.reqSignal.close() + if not ctx.reqReceivedSignal.isNil(): + ?ctx.reqReceivedSignal.close() + if not ctx.stopSignal.isNil(): + ?ctx.stopSignal.close() + if not ctx.threadExitSignal.isNil(): + ?ctx.threadExitSignal.close() + if not ctx.watchdogStopSignal.isNil(): + ?ctx.watchdogStopSignal.close() return ok() -proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = - ## This proc is called from the main thread and it creates - ## the FFI working thread. 
- var ctx = createShared(FFIContext[T], 1) +proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = +  ## Full cleanup for heap-allocated contexts: closes all resources and frees memory. +  defer: +    freeShared(ctx) +  return ctx.closeResources() + +proc initContextResources[T](ctx: ptr FFIContext[T]): Result[void, string] = +  ## Initialises all resources inside an already-allocated FFIContext slot. +  ## On failure every partially-initialised resource is closed; the caller +  ## is responsible for releasing the slot (freeShared or pool.releaseSlot). ctx.lock.initLock() var success = false @@ -330,6 +334,8 @@ proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = err = error ctx.reqSignal = ThreadSignalPtr.new().valueOr: +    ctx.closeResources().isOkOr: +      return err("could not close resources after reqSignal failure: " & $error) return err("couldn't create reqSignal ThreadSignalPtr: " & $error) ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: @@ -341,6 +347,12 @@ proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = ctx.threadExitSignal = ThreadSignalPtr.new().valueOr: return err("couldn't create threadExitSignal ThreadSignalPtr: " & $error) +  ctx.watchdogStopSignal = ThreadSignalPtr.new().valueOr: +    ctx.closeResources().isOkOr: +      return +        err("could not close resources after watchdogStopSignal failure: " & $error) +    return err("couldn't create watchdogStopSignal ThreadSignalPtr: " & $error) + ctx.registeredRequests = addr ffi_types.registeredRequests ctx.running.store(true) @@ -348,6 +360,8 @@ proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = try: createThread(ctx.ffiThread, ffiThreadBody[T], ctx) except ValueError, ResourceExhaustedError: +    ctx.closeResources().isOkOr: +      error "failed to close resources after ffiThread creation failure", err = error return err("failed to create the FFI thread: " & getCurrentExceptionMsg()) try: @@ -361,51 +375,121 @@ proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = error 
"failed to signal ffiThread during watchdog cleanup", err = fireRes.error joinThread(ctx.ffiThread) + ctx.closeResources().isOkOr: + error "failed to close resources after watchdogThread creation failure", + err = error return err("failed to create the watchdog thread: " & getCurrentExceptionMsg()) registerCtx(cast[pointer](ctx)) success = true + return ok() + +# ── Pool helpers ───────────────────────────────────────────────────────────── + +proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] = + for i in 0 ..< MaxFFIContexts: + var expected = false + if pool.inUse[i].compareExchange(expected, true): + return ok(pool.slots[i].addr) + return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") + +proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) = + for i in 0 ..< MaxFFIContexts: + if pool.slots[i].addr == ctx: + pool.inUse[i].store(false) + return + +# ── Public API ──────────────────────────────────────────────────────────────── + +proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = + ## Creates a heap-allocated FFI context. The caller must call destroyFFIContext(ctx) + ## to release it. Prefer the pool overload when the maximum context count is known. + var ctx = createShared(FFIContext[T], 1) + initContextResources(ctx).isOkOr: + freeShared(ctx) + return err(error) return ok(ctx) -proc destroyFFIContext*[T](ctx: ptr FFIContext[T]): Result[void, string] = - ## If the FFI thread's event loop is blocked by a synchronous handler - ## (e.g. blocking I/O), it cannot process reqSignal in time to exit. - ## In that case we leak ctx and the thread rather than hanging forever: - ## the thread will eventually exit on its own, but cleanup is skipped - ## because the thread may still be touching ctx fields. 
- const ThreadExitTimeout = 1500.milliseconds unregisterCtx(cast[pointer](ctx)) +proc createFFIContext*[T]( +  pool: var FFIContextPool[T] +): Result[ptr FFIContext[T], string] = +  ## Acquires a slot from the fixed pool and initialises it as an FFI context. +  ## Bounded fd usage: at most MaxFFIContexts * 5 ThreadSignalPtrs are ever open. +  let ctx = pool.acquireSlot().valueOr: +    return err(error) +  initContextResources(ctx).isOkOr: +    pool.releaseSlot(ctx) +    return err(error) +  return ok(ctx) +proc signalStop[T](ctx: ptr FFIContext[T]): Result[void, string] = ctx.running.store(false) - -  let signaledOnTime = ctx.reqSignal.fireSync().valueOr: +  let ffiSignaled = ctx.reqSignal.fireSync().valueOr: ctx.onNotResponding() -    return err("error in destroyFFIContext: " & $error) -  if not signaledOnTime: +    return err("error signaling reqSignal in destroyFFIContext: " & $error) +  if not ffiSignaled: ctx.onNotResponding() return err("failed to signal reqSignal on time in destroyFFIContext") +  let wdSignaled = ctx.watchdogStopSignal.fireSync().valueOr: +    return err("error signaling watchdogStopSignal in destroyFFIContext: " & $error) +  if not wdSignaled: +    return err("failed to signal watchdogStopSignal on time in destroyFFIContext") +  return ok() -  ctx.stopSignal.fireSync().isOkOr: -    error "failed to fire stopSignal in destroyFFIContext", err = $error +## If the FFI thread's event loop is blocked by a synchronous handler +## (e.g. blocking I/O), it cannot process reqSignal in time to exit. +## destroyFFIContext waits on threadExitSignal up to this bound; on timeout it +## returns err and skips joinThread/cleanup (leaking the thread + ctx slot) +## rather than hanging the caller forever. +const ThreadExitTimeout = 1500.milliseconds + +proc destroyFFIContext*[T](ctx: ptr FFIContext[T]): Result[void, string] = +  ## Stops the FFI context that was created via createFFIContext[T]() (heap). 
+ unregisterCtx(cast[pointer](ctx)) + + ctx.signalStop().isOkOr: + return err(error) - ## Bounded wait for ffiThread to exit. waitSync blocks the calling thread - ## up to the timeout; ffiThread fires threadExitSignal in its defer block. let exitedOnTime = ctx.threadExitSignal.waitSync(ThreadExitTimeout).valueOr: ctx.onNotResponding() return err("error waiting for FFI thread exit: " & $error) if not exitedOnTime: - ## Event loop is blocked by a synchronous handler. Leak the thread and - ## ctx to avoid hanging the caller forever. ctx.onNotResponding() return err("FFI thread did not exit in time; leaking ctx to avoid hang") joinThread(ctx.ffiThread) joinThread(ctx.watchdogThread) ctx.cleanUpResources().isOkOr: - error "failed to clean up resources in destroyFFIContext", err = error return err("cleanUpResources failed: " & $error) + return ok() +proc destroyFFIContext*[T]( + pool: var FFIContextPool[T], ctx: ptr FFIContext[T] +): Result[void, string] = + ## Stops the FFI context and returns its slot to the pool. If the FFI thread + ## is blocked and does not exit in time, the slot is leaked rather than + ## reclaimed — closing its resources while the thread is still live would be + ## unsafe. 
+ unregisterCtx(cast[pointer](ctx)) + + ctx.signalStop().isOkOr: + return err(error) + + let exitedOnTime = ctx.threadExitSignal.waitSync(ThreadExitTimeout).valueOr: + ctx.onNotResponding() + return err("error waiting for FFI thread exit: " & $error) + + if not exitedOnTime: + ctx.onNotResponding() + return err("FFI thread did not exit in time; leaking pool slot to avoid hang") + + joinThread(ctx.ffiThread) + joinThread(ctx.watchdogThread) + ctx.closeResources().isOkOr: + pool.releaseSlot(ctx) + return err("closeResources failed: " & $error) + pool.releaseSlot(ctx) return ok() template checkParams*(ctx: ptr FFIContext, callback: FFICallBack, userData: pointer) = diff --git a/tests/test_ffi_context.nim b/tests/test_ffi_context.nim index 3ce05a8..9c30c80 100644 --- a/tests/test_ffi_context.nim +++ b/tests/test_ffi_context.nim @@ -121,6 +121,62 @@ registerReqFFI(HeavyRefAllocRequest, lib: ptr TestLib): await sleepAsync(10.milliseconds) return ok("heavy-done") +suite "FFIContextPool": + test "create and destroy via pool succeeds": + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: + assert false, "createFFIContext(pool) failed: " & $error + return + check pool.destroyFFIContext(ctx).isOk() + + test "slot is reused after destroy": + var pool: FFIContextPool[TestLib] + let ctx1 = pool.createFFIContext().valueOr: + assert false, "createFFIContext(pool) failed: " & $error + return + check pool.destroyFFIContext(ctx1).isOk() + # After destroying, the same slot must be available again + let ctx2 = pool.createFFIContext().valueOr: + assert false, "createFFIContext(pool) failed after slot release: " & $error + return + check pool.destroyFFIContext(ctx2).isOk() + check ctx1 == ctx2 # same array slot reused + + test "pool exhaustion returns error": + var pool: FFIContextPool[TestLib] + var ctxs: array[MaxFFIContexts, ptr FFIContext[TestLib]] + for i in 0 ..< MaxFFIContexts: + ctxs[i] = pool.createFFIContext().valueOr: + for j in 0 ..< i: + discard 
pool.destroyFFIContext(ctxs[j]) + assert false, "createFFIContext(pool) failed at slot " & $i & ": " & $error + return + # Pool is now full — next create must fail + check pool.createFFIContext().isErr() + for i in 0 ..< MaxFFIContexts: + discard pool.destroyFFIContext(ctxs[i]) + + test "requests are processed via pool context": + var pool: FFIContextPool[TestLib] + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + + let ctx = pool.createFFIContext().valueOr: + assert false, "createFFIContext(pool) failed: " & $error + return + defer: + discard pool.destroyFFIContext(ctx) + + check sendRequestToFFIThread( + ctx, PingRequest.ffiNewReq(testCallback, addr d, "pool".cstring) + ) + .isOk() + waitCallback(d) + check d.retCode == RET_OK + check callbackMsg(d) == "pong:pool" + suite "createFFIContext / destroyFFIContext": test "create and destroy succeeds": let ctx = createFFIContext[TestLib]().valueOr: @@ -269,14 +325,19 @@ suite "sendRequestToFFIThread": test "successful request triggers RET_OK callback": var d: CallbackData initCallbackData(d) - defer: deinitCallbackData(d) + defer: + deinitCallbackData(d) let ctx = createFFIContext[TestLib]().valueOr: check false return - defer: discard destroyFFIContext(ctx) + defer: + discard destroyFFIContext(ctx) - check sendRequestToFFIThread(ctx, PingRequest.ffiNewReq(testCallback, addr d, "hello".cstring)).isOk() + check sendRequestToFFIThread( + ctx, PingRequest.ffiNewReq(testCallback, addr d, "hello".cstring) + ) + .isOk() waitCallback(d) check d.retCode == RET_OK check callbackMsg(d) == "pong:hello" @@ -284,12 +345,14 @@ suite "sendRequestToFFIThread": test "failing request triggers RET_ERR callback": var d: CallbackData initCallbackData(d) - defer: deinitCallbackData(d) + defer: + deinitCallbackData(d) let ctx = createFFIContext[TestLib]().valueOr: check false return - defer: discard destroyFFIContext(ctx) + defer: + discard destroyFFIContext(ctx) check sendRequestToFFIThread(ctx, 
FailRequest.ffiNewReq(testCallback, addr d)).isOk() waitCallback(d) @@ -298,14 +361,17 @@ suite "sendRequestToFFIThread": test "empty ok response delivers empty message": var d: CallbackData initCallbackData(d) - defer: deinitCallbackData(d) + defer: + deinitCallbackData(d) let ctx = createFFIContext[TestLib]().valueOr: check false return - defer: discard destroyFFIContext(ctx) + defer: + discard destroyFFIContext(ctx) - check sendRequestToFFIThread(ctx, EmptyOkRequest.ffiNewReq(testCallback, addr d)).isOk() + check sendRequestToFFIThread(ctx, EmptyOkRequest.ffiNewReq(testCallback, addr d)) + .isOk() waitCallback(d) check d.retCode == RET_OK check d.msgLen == 0 @@ -314,13 +380,17 @@ suite "sendRequestToFFIThread": let ctx = createFFIContext[TestLib]().valueOr: check false return - defer: discard destroyFFIContext(ctx) + defer: + discard destroyFFIContext(ctx) for i in 1 .. 5: var d: CallbackData initCallbackData(d) let msg = "msg" & $i - check sendRequestToFFIThread(ctx, PingRequest.ffiNewReq(testCallback, addr d, msg.cstring)).isOk() + check sendRequestToFFIThread( + ctx, PingRequest.ffiNewReq(testCallback, addr d, msg.cstring) + ) + .isOk() waitCallback(d) deinitCallbackData(d) check d.retCode == RET_OK