diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d9ec05f..761ee53 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,9 +2,9 @@ name: CI on: push: - branches: [master, main] + branches: [master, main, 'release/*'] pull_request: - branches: [master, main] + branches: [master, main, 'release/*'] jobs: # Single source of truth for Nim / Nimble versions used by every job and diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c009658..7729732 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -83,4 +83,4 @@ jobs: if [ "$RUNNER_OS" == "Windows" ]; then export PATH="$GITHUB_WORKSPACE/.nim_runtime/bin:$HOME/.nimble/bin:$PATH" fi - nim c -r --mm:${{ matrix.mm }} -d:chronicles_log_level=WARN tests/unit/${{ inputs.test }}.nim + nim c -r --mm:${{ matrix.mm }} -d:chronicles_log_level=WARN -d:ffiAllowSignalHandler tests/unit/${{ inputs.test }}.nim diff --git a/CHANGELOG.md b/CHANGELOG.md index 805b8fa..dada7b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,32 +4,12 @@ All notable changes to this project are documented in this file. ## [Unreleased] -### Changed -- User event callbacks now run on a dedicated event thread fed by a - bounded SPSC queue (default capacity 1024), so a slow listener can no - longer block the FFI thread or concurrent `add_event_listener` / - `remove_event_listener` calls - ([#6](https://github.com/logos-messaging/nim-ffi/issues/6)). -- Replaced the dedicated watchdog thread with a heartbeat check that - runs on the event thread. The FFI thread advances an atomic heartbeat - each loop iteration; if it stalls for more than 1s past the start-up - grace window, the event thread emits the `not_responding` event. - ### Added - Queue-overflow handling: when the bounded event queue is full, the library sets a sticky "stuck" flag, logs an error, fires `not_responding` from the event thread, and rejects subsequent `sendRequestToFFIThread` calls with `event queue stuck - library cannot accept new requests`. - -## [0.2.0] - 2026-06-04 - -Major release introducing the CBOR-based wire format, CBOR-backed FFI events -with a multi-listener registry, multi-language binding generation (C++, Rust, -CDDL), CI hardening with sanitizers, and several robustness fixes around -context lifetime and memory safety. - -### Added - **CBOR serialization** as the FFI wire format, replacing the previous JSON/string-based `serial.nim` ([#23](https://github.com/logos-messaging/nim-ffi/pull/23)). @@ -64,6 +44,21 @@ context lifetime and memory safety. ([#41](https://github.com/logos-messaging/nim-ffi/pull/41)). ### Changed +- Enforce `-d:noSignalHandler` at compile time: the build now fails with an + actionable message unless the consumer sets `-d:noSignalHandler` (libraries + embedded in a foreign host such as Go/Rust) or `-d:ffiAllowSignalHandler` + (standalone Nim binaries). Prevents the Nim runtime from installing its own + OS signal handlers and clobbering the host's — the cause of a real status-go + regression. +- User event callbacks now run on a dedicated event thread fed by a + bounded SPSC queue (default capacity 1024), so a slow listener can no + longer block the FFI thread or concurrent `add_event_listener` / + `remove_event_listener` calls + ([#6](https://github.com/logos-messaging/nim-ffi/issues/6)). +- Replaced the dedicated watchdog thread with a heartbeat check that + runs on the event thread. The FFI thread advances an atomic heartbeat + each loop iteration; if it stalls for more than 1s past the start-up + grace window, the event thread emits the `not_responding` event. - Removed the redundant `ffiType` macro; the `ffi` macro is now the single authoring entry point ([#22](https://github.com/logos-messaging/nim-ffi/pull/22)). @@ -91,7 +86,7 @@ context lifetime and memory safety. and `rust_client`, the latter with a Tokio async variant) (#15). - JSON/string-based FFI (de)serialization via `ffi/serial.nim` (`ffiSerialize`/`ffiDeserialize`), with `tests/test_serial.nim` coverage. - (CBOR replaced this layer later, in 0.2.0.) + (CBOR replaced this layer later.) - FFI context pool (`ffi/ffi_context_pool.nim`) using a fixed array of contexts. - Test suite expansion: `test_alloc.nim`, `test_ctx_validation.nim`, `test_ffi_context.nim`, `test_gc_compat.nim`. diff --git a/examples/echo/cpp_bindings/CMakeLists.txt b/examples/echo/cpp_bindings/CMakeLists.txt index 77dd7aa..6b6bd51 100644 --- a/examples/echo/cpp_bindings/CMakeLists.txt +++ b/examples/echo/cpp_bindings/CMakeLists.txt @@ -58,6 +58,7 @@ add_custom_command( COMMAND "${NIM_EXECUTABLE}" c --mm:orc -d:chronicles_log_level=WARN + -d:noSignalHandler --app:lib --noMain "--nimMainPrefix:libecho" diff --git a/examples/echo/cpp_bindings/echo.hpp b/examples/echo/cpp_bindings/echo.hpp index efcfa2e..39824e7 100644 --- a/examples/echo/cpp_bindings/echo.hpp +++ b/examples/echo/cpp_bindings/echo.hpp @@ -406,7 +406,7 @@ typedef void (*FFICallback)(int ret, const char* msg, size_t len, void* user_dat void* echo_create(const uint8_t* req_cbor, size_t req_cbor_len, FFICallback callback, void* user_data); int echo_shout(void* ctx, FFICallback callback, void* user_data, const uint8_t* req_cbor, size_t req_cbor_len); int echo_version(void* ctx, FFICallback callback, void* user_data, const uint8_t* req_cbor, size_t req_cbor_len); -int echo_destroy(void* ctx); +int echo_destroy(void* ctx, FFICallback callback, void* user_data); uint64_t echo_add_event_listener(void* ctx, const char* event_name, FFICallback callback, void* user_data); int echo_remove_event_listener(void* ctx, uint64_t listener_id); } // extern "C" @@ -516,7 +516,14 @@ public: // context. ~EchoCtx() { if (ptr_) { - echo_destroy(ptr_); + // `echo_destroy` is non-blocking at the C ABI: it parks the + // context for reuse and reports the outcome via the callback. Block + // here until that callback fires so the pool slot is fully drained + // and parked before this object goes away — otherwise a rapid + // create/destroy churn could outrun the recycle and exhaust the pool. + (void)ffi_call_([this](FFICallback cb, void* ud) { + return echo_destroy(ptr_, cb, ud); + }, timeout_); ptr_ = nullptr; } } diff --git a/examples/echo/echo.nimble b/examples/echo/echo.nimble index 013a49e..8243d43 100644 --- a/examples/echo/echo.nimble +++ b/examples/echo/echo.nimble @@ -9,7 +9,7 @@ requires "nim >= 2.2.4" requires "chronos" requires "chronicles" requires "taskpools" -requires "https://github.com/logos-messaging/nim-ffi >= 0.2.0" +requires "https://github.com/logos-messaging/nim-ffi >= 0.1.4" const nimFlags = "--mm:orc -d:chronicles_log_level=WARN" diff --git a/examples/timer/cpp_bindings/CMakeLists.txt b/examples/timer/cpp_bindings/CMakeLists.txt index 162d844..11713cb 100644 --- a/examples/timer/cpp_bindings/CMakeLists.txt +++ b/examples/timer/cpp_bindings/CMakeLists.txt @@ -58,6 +58,7 @@ add_custom_command( COMMAND "${NIM_EXECUTABLE}" c --mm:orc -d:chronicles_log_level=WARN + -d:noSignalHandler --app:lib --noMain "--nimMainPrefix:libmy_timer" diff --git a/examples/timer/cpp_bindings/my_timer.hpp b/examples/timer/cpp_bindings/my_timer.hpp index 8fa440b..a4a6a3a 100644 --- a/examples/timer/cpp_bindings/my_timer.hpp +++ b/examples/timer/cpp_bindings/my_timer.hpp @@ -706,7 +706,7 @@ int my_timer_echo(void* ctx, FFICallback callback, void* user_data, const uint8_ int my_timer_version(void* ctx, FFICallback callback, void* user_data, const uint8_t* req_cbor, size_t req_cbor_len); int my_timer_complex(void* ctx, FFICallback callback, void* user_data, const uint8_t* req_cbor, size_t req_cbor_len); int my_timer_schedule(void* ctx, FFICallback callback, void* user_data, const uint8_t* req_cbor, size_t req_cbor_len); -int my_timer_destroy(void* ctx); +int my_timer_destroy(void* ctx, FFICallback callback, void* user_data); uint64_t my_timer_add_event_listener(void* ctx, const char* event_name, FFICallback callback, void* user_data); int my_timer_remove_event_listener(void* ctx, uint64_t listener_id); } // extern "C" @@ -816,7 +816,14 @@ public: // context. ~MyTimerCtx() { if (ptr_) { - my_timer_destroy(ptr_); + // `my_timer_destroy` is non-blocking at the C ABI: it parks the + // context for reuse and reports the outcome via the callback. Block + // here until that callback fires so the pool slot is fully drained + // and parked before this object goes away — otherwise a rapid + // create/destroy churn could outrun the recycle and exhaust the pool. + (void)ffi_call_([this](FFICallback cb, void* ud) { + return my_timer_destroy(ptr_, cb, ud); + }, timeout_); ptr_ = nullptr; } } diff --git a/examples/timer/rust_bindings/build.rs b/examples/timer/rust_bindings/build.rs index ce9c76f..4d3b9b4 100644 --- a/examples/timer/rust_bindings/build.rs +++ b/examples/timer/rust_bindings/build.rs @@ -32,6 +32,7 @@ fn main() { cmd.arg("c") .arg("--mm:orc") .arg("-d:chronicles_log_level=WARN") + .arg("-d:noSignalHandler") .arg("--app:lib") .arg("--noMain") .arg(format!("--nimMainPrefix:libmy_timer")) diff --git a/examples/timer/rust_bindings/src/api.rs b/examples/timer/rust_bindings/src/api.rs index 709f9c1..8ad86ca 100644 --- a/examples/timer/rust_bindings/src/api.rs +++ b/examples/timer/rust_bindings/src/api.rs @@ -141,7 +141,9 @@ unsafe impl Sync for MyTimerCtx {} impl Drop for MyTimerCtx { fn drop(&mut self) { if !self.ptr.is_null() { - unsafe { ffi::my_timer_destroy(self.ptr); } + let _ = ffi_call_sync(self.timeout, |cb, ud| unsafe { + ffi::my_timer_destroy(self.ptr, cb, ud) + }); self.ptr = std::ptr::null_mut(); } } diff --git a/examples/timer/rust_bindings/src/ffi.rs b/examples/timer/rust_bindings/src/ffi.rs index 905acf4..6626b92 100644 --- a/examples/timer/rust_bindings/src/ffi.rs +++ b/examples/timer/rust_bindings/src/ffi.rs @@ -14,7 +14,7 @@ extern "C" { pub fn my_timer_version(ctx: *mut c_void, callback: FFICallback, user_data: *mut c_void, req_cbor: *const u8, req_cbor_len: usize) -> c_int; pub fn my_timer_complex(ctx: *mut c_void, callback: FFICallback, user_data: *mut c_void, req_cbor: *const u8, req_cbor_len: usize) -> c_int; pub fn my_timer_schedule(ctx: *mut c_void, callback: FFICallback, user_data: *mut c_void, req_cbor: *const u8, req_cbor_len: usize) -> c_int; - pub fn my_timer_destroy(ctx: *mut c_void) -> c_int; + pub fn my_timer_destroy(ctx: *mut c_void, callback: FFICallback, user_data: *mut c_void) -> c_int; pub fn my_timer_add_event_listener(ctx: *mut c_void, event_name: *const c_char, callback: FFICallback, user_data: *mut c_void) -> u64; pub fn my_timer_remove_event_listener(ctx: *mut c_void, listener_id: u64) -> c_int; } diff --git a/examples/timer/timer.nimble b/examples/timer/timer.nimble index 0b1966d..7c23f00 100644 --- a/examples/timer/timer.nimble +++ b/examples/timer/timer.nimble @@ -8,7 +8,7 @@ requires "nim >= 2.2.4" requires "chronos" requires "chronicles" requires "taskpools" -requires "https://github.com/logos-messaging/nim-ffi >= 0.2.0" +requires "https://github.com/logos-messaging/nim-ffi >= 0.1.4" const nimFlags = "--mm:orc -d:chronicles_log_level=WARN" diff --git a/ffi.nimble b/ffi.nimble index dc168db..5878338 100644 --- a/ffi.nimble +++ b/ffi.nimble @@ -1,6 +1,6 @@ # ffi.nimble -version = "0.2.0" +version = "0.1.4" author = "Institute of Free Technology" description = "FFI framework with custom header generation" license = "MIT or Apache License 2.0" @@ -13,8 +13,8 @@ requires "chronicles" requires "taskpools" requires "cbor_serialization == 0.3.0" -const nimFlagsOrc = "--mm:orc -d:chronicles_log_level=WARN" -const nimFlagsRefc = "--mm:refc -d:chronicles_log_level=WARN" +const nimFlagsOrc = "--mm:orc -d:chronicles_log_level=WARN -d:ffiAllowSignalHandler" +const nimFlagsRefc = "--mm:refc -d:chronicles_log_level=WARN -d:ffiAllowSignalHandler" import std/[algorithm, os, strutils] diff --git a/ffi/codegen/cpp.nim b/ffi/codegen/cpp.nim index d0454c2..02b8410 100644 --- a/ffi/codegen/cpp.nim +++ b/ffi/codegen/cpp.nim @@ -339,7 +339,9 @@ proc generateCppHeader*( [p.procName] ) of FFIKind.DTOR: - lines.add("int $1(void* ctx);" % [p.procName]) + lines.add( + "int $1(void* ctx, FFICallback callback, void* user_data);" % [p.procName] + ) # `declareLibrary` always exports the listener-registration ABI. Declare # it here so the typed event-handler wiring below can call into it. lines.add( @@ -570,8 +572,8 @@ proc generateCppHeader*( lines.add(" std::chrono::milliseconds timeout_;") if events.len > 0: # One owning entry per live listener, keyed by id. Destroyed after - # the destructor body runs `_destroy(ptr_)`, by which point the - # FFI side has joined its threads so no callback is mid-flight. + # the destructor blocks on `_destroy`'s recycle callback, by which + # point the FFI side has drained/parked the slot so no callback is mid-flight. lines.add( " std::unordered_map> listeners_;" ) diff --git a/ffi/codegen/rust.nim b/ffi/codegen/rust.nim index aaedd35..656398b 100644 --- a/ffi/codegen/rust.nim +++ b/ffi/codegen/rust.nim @@ -128,6 +128,7 @@ fn main() { cmd.arg("c") .arg("--mm:orc") .arg("-d:chronicles_log_level=WARN") + .arg("-d:noSignalHandler") .arg("--app:lib") .arg("--noMain") .arg(format!("--nimMainPrefix:lib$2")) @@ -206,6 +207,8 @@ proc generateFFIRs*(procs: seq[FFIProcMeta]): string = lines.add(" pub fn $1($2) -> *mut c_void;" % [p.procName, params.join(", ")]) of FFIKind.DTOR: params.add("ctx: *mut c_void") + params.add("callback: FFICallback") + params.add("user_data: *mut c_void") lines.add(" pub fn $1($2) -> c_int;" % [p.procName, params.join(", ")]) # Listener-registration ABI — emitted on the Nim side by `declareLibrary`, @@ -530,7 +533,14 @@ proc generateApiRs*( lines.add("impl Drop for $1 {" % [ctxTypeName]) lines.add(" fn drop(&mut self) {") lines.add(" if !self.ptr.is_null() {") - lines.add(" unsafe { ffi::$1(self.ptr); }" % [dtorProcName]) + # `_destroy` is non-blocking at the C ABI: it parks the context for + # reuse and reports the outcome via the callback. Block until that callback + # fires so the pool slot is fully drained and parked before this handle goes + # away — otherwise rapid create/destroy churn could outrun the recycle and + # exhaust the pool. The recycle outcome is best-effort on drop, so discard it. + lines.add(" let _ = ffi_call_sync(self.timeout, |cb, ud| unsafe {") + lines.add(" ffi::$1(self.ptr, cb, ud)" % [dtorProcName]) + lines.add(" });") lines.add(" self.ptr = std::ptr::null_mut();") lines.add(" }") # `listeners` is dropped automatically after this body returns. By diff --git a/ffi/codegen/templates/cpp/CMakeLists.txt.tpl b/ffi/codegen/templates/cpp/CMakeLists.txt.tpl index b204d4e..76fe8ef 100644 --- a/ffi/codegen/templates/cpp/CMakeLists.txt.tpl +++ b/ffi/codegen/templates/cpp/CMakeLists.txt.tpl @@ -58,6 +58,7 @@ add_custom_command( COMMAND "${NIM_EXECUTABLE}" c --mm:orc -d:chronicles_log_level=WARN + -d:noSignalHandler --app:lib --noMain "--nimMainPrefix:lib{{LIB}}" diff --git a/ffi/codegen/templates/cpp/context_rule_of_5.hpp.tpl b/ffi/codegen/templates/cpp/context_rule_of_5.hpp.tpl index 22ce19d..e93aa2c 100644 --- a/ffi/codegen/templates/cpp/context_rule_of_5.hpp.tpl +++ b/ffi/codegen/templates/cpp/context_rule_of_5.hpp.tpl @@ -9,7 +9,14 @@ // context. ~{{CTX}}() { if (ptr_) { - {{LIB}}_destroy(ptr_); + // `{{LIB}}_destroy` is non-blocking at the C ABI: it parks the + // context for reuse and reports the outcome via the callback. Block + // here until that callback fires so the pool slot is fully drained + // and parked before this object goes away — otherwise a rapid + // create/destroy churn could outrun the recycle and exhaust the pool. + (void)ffi_call_([this](FFICallback cb, void* ud) { + return {{LIB}}_destroy(ptr_, cb, ud); + }, timeout_); ptr_ = nullptr; } } diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index ef1b6f5..9be1791 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -6,7 +6,20 @@ {.passc: "-fPIC".} -import std/[atomics, locks, options, tables] +# Embedded in a foreign host (Go/Rust/...) the host must own OS signal handling; +# Nim installing its own handlers clobbers it (e.g. Go's SIGSEGV -> sigpanic). +# Enforce -d:noSignalHandler; standalone Nim binaries opt out via -d:ffiAllowSignalHandler. +when not defined(noSignalHandler) and not defined(ffiAllowSignalHandler): + {. + error: + "nim-ffi: missing required compile flag. If this library is embedded in a " & + "host process (Go/Rust/...), build with -d:noSignalHandler so the host keeps " & + "ownership of OS signal handlers (it needs SIGSEGV for crash recovery, stack " & + "growth and preemption). If instead this is a standalone Nim program that owns " & + "its own process, build with -d:ffiAllowSignalHandler." + .} + +import std/[atomics, locks, options, sequtils, tables] import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results import ./ffi_types, @@ -18,6 +31,20 @@ import export ffi_events, ffi_handles +type CtxLifecycle {.pure.} = enum + ## State machine guarding a pooled FFI context, held as an Atomic on FFIContext. + ## The threads, signals and dispatcher kqueues are created once per slot and + ## REUSED across acquire/release — chronos never frees a dispatcher's kqueue fd + ## (design decision; freed only at process exit), so spawning a thread per + ## context would leak fds unboundedly. Recycling parks the context instead. + ## Transitions: + ## Active -> RecyclePending when the destructor is invoked + ## RecyclePending -> Recycling FFI loop drains handlers, frees lib, releases slot + ## Recycling -> Active next createFFIContext reuses the slot (markAsActive) + Active ## accepting and serving requests + RecyclePending ## recycle requested; FFI thread loop hasn't claimed it yet + Recycling ## FFI loop draining handlers, then frees lib + returns to pool + type FFIContext*[T] = object myLib*: ptr T # main library object (Waku, LibP2P, SDS, …) ffiThread: Thread[(ptr FFIContext[T])] @@ -39,6 +66,15 @@ type FFIContext*[T] = object # advanced each FFI-thread loop; event thread reads for liveness eventQueueStuck*: Atomic[bool] # sticky overflow flag running: Atomic[bool] # To control when the threads are running + lifecycle: Atomic[CtxLifecycle] # Active / RecyclePending / Recycling + recycleCallback: FFICallBack + # destructor's callback, fired by the recycle handler with the outcome: + # RET_OK once drained, RET_ERR if it timed out. Set by requestRecycle. + recycleUserData: pointer + inUse: Atomic[bool] + # whether the slot is claimed; createFFIContext claims it, the recycle + # handler clears it once drained so the owning thread can release without + # reaching into the pool. registeredRequests: ptr Table[cstring, FFIRequestProc] var onFFIThread* {.threadvar.}: bool @@ -51,6 +87,21 @@ const FFIHeartbeatStartDelay* = 10.seconds # grace window for library startup FFIHeartbeatStaleThreshold* = 1.seconds +proc tryClaim*[T](ctx: ptr FFIContext[T]): bool = + ## Returns true if the slot was free and is now claimed, false if already in use. + var expected = false + ctx.inUse.compareExchange(expected, true) + +proc release*[T](ctx: ptr FFIContext[T]) = + ctx.inUse.store(false) + +proc isInUse*[T](ctx: ptr FFIContext[T]): bool = + ctx.inUse.load() + +proc markAsActive*[T](ctx: ptr FFIContext[T]) = + ## Re-arms a reused (recycled) slot to accept requests again. + ctx.lifecycle.store(CtxLifecycle.Active) + include ./event_thread include ./ffi_thread @@ -124,6 +175,7 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ctx.registeredRequests = addr ffi_types.registeredRequests + ctx.lifecycle.store(CtxLifecycle.Active) ctx.running.store(true) try: @@ -153,14 +205,21 @@ proc fireOrErr(sig: ThreadSignalPtr, name: string): Result[void, string] = return err("failed to signal: " & name & " on time") ok() -proc waitExitOrErr( - sig: ThreadSignalPtr, name: string, timeout: Duration -): Result[void, string] = - let exited = sig.waitSync(timeout).valueOr: - return err("error waiting for exit: " & name & ": " & $error) - if not exited: - return err("did not exit in time: " & name & " (leaking ctx to avoid hang)") - ok() +proc reachedExitOrTimedOut(sig: ThreadSignalPtr, timeout: Duration): bool = + ## Best-effort bounded pre-check before joining a stopping thread. + ## Returns false ONLY on a genuine timeout (the exit signal was not observed + ## within `timeout`, so the thread may be wedged and the caller should skip + ## the join to avoid hanging). Returns true otherwise — including when + ## `waitSync` itself errors: it uses `select()`, which returns EINVAL once a + ## signal fd exceeds FD_SETSIZE under load. That error is NOT evidence the + ## thread is stuck (it was already signaled to stop and the async event loop + ## that drives its exit is unaffected), so we proceed to the authoritative, + ## fd-free joinThread rather than spuriously failing teardown and leaking the + ## pool slot. + let waited = sig.waitSync(timeout) + if waited.isOk() and not waited.get(): + return false # genuine timeout + true proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] = # Skip onNotResponding on error: it takes reg.lock, which a back-pressuring @@ -183,9 +242,11 @@ proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] = ctx.signalStop().isOkOr: return err("signalStop failed: " & $error) - ?ctx.threadExitSignal.waitExitOrErr("FFI thread", ThreadExitTimeout) + if not ctx.threadExitSignal.reachedExitOrTimedOut(ThreadExitTimeout): + return err("FFI thread did not exit in time (leaking ctx to avoid hang)") joinThread(ctx.ffiThread) - ?ctx.eventThreadExitSignal.waitExitOrErr("event thread", ThreadExitTimeout) + if not ctx.eventThreadExitSignal.reachedExitOrTimedOut(ThreadExitTimeout): + return err("event thread did not exit in time (leaking ctx to avoid hang)") joinThread(ctx.eventThread) ok() @@ -196,3 +257,26 @@ proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] = ctx.cleanUpResources().isOkOr: return err("cleanUpResources failed: " & $error) ok() + +proc requestRecycle*[T]( + ctx: ptr FFIContext[T], callback: FFICallBack, userData: pointer +): Result[void, string] = + ## Starts the context's recycle WITHOUT stopping its worker threads, so the + ## next createFFIContext reuses the same threads, signals and kqueue fds. + ## The FFI thread loop drains the in-flight handlers, frees the lib, clears the + ## per-context state and releases the slot, then fires `callback` + ## (RET_OK drained, RET_ERR stuck). Non-blocking. + ctx.lock.acquire() + if ctx.lifecycle.load() != CtxLifecycle.Active: + ctx.lock.release() + return err("requestRecycle: context is not Active (already recycling)") + ctx.recycleCallback = callback + ctx.recycleUserData = userData + ctx.lifecycle.store(CtxLifecycle.RecyclePending) + ctx.lock.release() + + let fired = ctx.reqSignal.fireSync().valueOr: + return err("requestRecycle: failed to signal the FFI thread: " & $error) + if not fired: + return err("requestRecycle: failed to signal the FFI thread in time") + ok() diff --git a/ffi/ffi_context_pool.nim b/ffi/ffi_context_pool.nim index 5e2d2cb..e39f077 100644 --- a/ffi/ffi_context_pool.nim +++ b/ffi/ffi_context_pool.nim @@ -1,56 +1,76 @@ import std/atomics import results -import ./ffi_context +import ./ffi_context, ./ffi_types const MaxFFIContexts* = 32 - # Only affects upfront pool memory; fds/threads consumed per acquired slot. + ## Maximum number of concurrently live FFI contexts when using FFIContextPool. + ## Each slot's threads, signals and dispatcher kqueue fds are created once and + ## reused, so this also caps the process's steady-state fd usage. type FFIContextPool*[T] = object - ## Fixed pool. Bounds ThreadSignalPtr fds at MaxFFIContexts * 2. - slots: array[MaxFFIContexts, FFIContext[T]] - inUse: array[MaxFFIContexts, Atomic[bool]] - -proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] = - for i in 0 ..< MaxFFIContexts: - var expected = false - if pool.inUse[i].compareExchange(expected, true): - return ok(pool.slots[i].addr) - err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") - -proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) = - for i in 0 ..< MaxFFIContexts: - if pool.slots[i].addr == ctx: - pool.inUse[i].store(false) - return + contexts: array[MaxFFIContexts, FFIContext[T]] + initialized: array[MaxFFIContexts, Atomic[bool]] + ## Whether the slot's worker has been built. Once true it stays true for the + ## process lifetime — destroy recycles the context rather than tearing it down. proc createFFIContext*[T]( pool: var FFIContextPool[T] ): Result[ptr FFIContext[T], string] = - let ctx = pool.acquireSlot().valueOr: - return err("createFFIContext: acquireSlot failed: " & $error) - initContextResources(ctx).isOkOr: - pool.releaseSlot(ctx) - return err("createFFIContext: initContextResources failed: " & $error) - ok(ctx) + ## Acquires a context from the fixed pool. The worker (threads + signals + + ## dispatcher kqueues) is built once on first use and REUSED on every later + ## acquisition — chronos never frees a dispatcher's kqueue fd, so a + ## thread-per-context model would leak fds unboundedly. + for i in 0 ..< MaxFFIContexts: + let ctx = pool.contexts[i].addr + if not ctx.tryClaim(): + continue + if pool.initialized[i].load(): + ## Reused slot: a prior destroy drained and parked it; worker still alive. + ctx.markAsActive() + return ok(ctx) + initContextResources(ctx).isOkOr: + ctx.release() + return err("createFFIContext: initContextResources failed: " & $error) + pool.initialized[i].store(true) + return ok(ctx) + return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") + +proc releaseFFIContext*[T]( + ctx: ptr FFIContext[T], callback: FFICallBack, userData: pointer +): Result[void, string] = + ## Recycle/park the context for reuse without stopping its worker threads. + ## `callback` fires once the FFI thread has drained and parked it. + ctx.requestRecycle(callback, userData) proc destroyFFIContext*[T]( pool: var FFIContextPool[T], ctx: ptr FFIContext[T] ): Result[void, string] = - ## On thread-exit timeout the slot is leaked — closing live-thread resources is unsafe. + ## Full teardown: stops/joins the worker threads and returns the context to the + ## pool, marking it uninitialised so a later createFFIContext rebuilds it. Only + ## for process/pool shutdown — normal destruction uses releaseFFIContext. ctx.stopAndJoinThreads().isOkOr: return err("destroyFFIContext(pool): " & $error) - # Required: next acquisition would otherwise re-init a live lock (UB). + # Close the ThreadSignalPtr fds and deinit the lock + event registry/queue + # BEFORE the slot is marked for rebuild. createFFIContext's rebuild path reruns + # initContextResources (initLock / initEventRegistry / initEventQueue + fresh + # signals); skipping deinit here would re-init still-live locks (UB) and orphan + # the old fds — the very leak this teardown exists to prevent. let deinitRes = ctx.deinitContextResources() - pool.releaseSlot(ctx) + for i in 0 ..< MaxFFIContexts: + if pool.contexts[i].addr == ctx: + pool.initialized[i].store(false) + break + ctx.release() deinitRes.isOkOr: return err("destroyFFIContext(pool): " & $error) - ok() + return ok() proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool = - ## Rejects nil / offset-invalid / dangling pointers at the API boundary. + ## True only if ctx points to one of the pool's contexts that is currently + ## claimed (in use). Rejects nil / dangling / parked contexts at the API boundary. if ctx.isNil(): return false for i in 0 ..< MaxFFIContexts: - if cast[pointer](pool.slots[i].addr) == ctx: - return pool.inUse[i].load() - false + if cast[pointer](pool.contexts[i].addr) == ctx: + return cast[ptr FFIContext[T]](ctx).isInUse() + return false diff --git a/ffi/ffi_thread.nim b/ffi/ffi_thread.nim index ae5b6d8..7f11054 100644 --- a/ffi/ffi_thread.nim +++ b/ffi/ffi_thread.nim @@ -28,6 +28,13 @@ proc sendRequestToFFIThread*( defer: ctx.lock.release() + # Reject once recycling has been requested: the slot is being drained/parked + # and its handler table/lib are about to be torn down. requestRecycle flips + # this under the same lock, so the check is race-free. + if ctx.lifecycle.load() != CtxLifecycle.Active: + deleteRequest(ffiRequest) + return err("FFI context is not accepting requests (being recycled)") + let sentOk = ctx.reqChannel.trySend(ffiRequest) if not sentOk: deleteRequest(ffiRequest) @@ -42,10 +49,15 @@ proc sendRequestToFFIThread*( deleteRequest(ffiRequest) return err("Couldn't fireSync in time") + # The request is now owned by the FFI thread: it has been queued and the FFI + # thread signaled, so it WILL be received, processed, and answered through the + # callback via handleRes. The waitSync below only confirms prompt receipt; its + # failure (e.g. EINVAL/EINTR under load or signal teardown) must NOT be + # surfaced as an error, or the caller would fire the callback a SECOND time + # while handleRes also fires it — a double callback onto a freed response. let res = ctx.reqReceivedSignal.waitSync(timeout) if res.isErr(): - # FFI thread was signaled and owns the request; don't double-free. - return err("Couldn't receive reqReceivedSignal signal") + return ok() # On ok the FFI thread's processRequest deallocShared(req)'s. ok() @@ -79,6 +91,75 @@ proc processRequest[T]( except Exception as e: error "Unexpected exception in handleRes", error = e.msg +proc freeLib[T](ctx: ptr FFIContext[T]) {.gcsafe.} = + ## Frees the createShared'd library object built by the ctor. Logical resource + ## cleanup is the destructor body's job; this only releases the Nim storage. + if ctx.myLib.isNil(): + return + when not defined(gcRefc): + {.cast(gcsafe).}: + `=destroy`(ctx.myLib[]) + else: + discard + freeShared(ctx.myLib) + ctx.myLib = nil + +var RecycleTimeout* = 1500.milliseconds + ## Upper bound the recycle handler waits for in-flight handlers before it + ## cancels them and reports the ctx as stuck. Returns as soon as they finish, + ## so this only bounds a *stuck* handler. A `var` so tests can shorten it. + +proc recycleContext[T]( + ctx: ptr FFIContext[T], ongoingProcessReq: ptr seq[Future[void]] +) {.async.} = + ## Runs on the FFI thread. Drains in-flight handlers, frees the lib, clears the + ## per-context event state, releases the slot for reuse, and fires the recycle + ## callback. The worker threads (and their kqueue fds) stay alive for reuse. + ongoingProcessReq[].keepItIf(not it.finished()) + + ## 1. Let in-flight handlers finish on their own, bounded by RecycleTimeout. + var naturallyDrained = ongoingProcessReq[].len == 0 + if not naturallyDrained: + naturallyDrained = await allFutures(ongoingProcessReq[]).withTimeout(RecycleTimeout) + + ## 2. If any are wedged, cancel them and give the cancellations a bounded moment. + var safeToRecycle = naturallyDrained + if not naturallyDrained: + for fut in ongoingProcessReq[]: + if not fut.finished(): + fut.cancelSoon() + safeToRecycle = await allFutures(ongoingProcessReq[]).withTimeout(RecycleTimeout) + + let cb = ctx.recycleCallback + let ud = ctx.recycleUserData + ctx.recycleCallback = nil + + if safeToRecycle: + freeLib(ctx) + # Reset per-context event state so a reused slot starts clean: drop listeners + # and drain/free any queued events so they aren't delivered to the next owner. + removeAllEventListeners(ctx[].eventRegistry) + while true: + let opt = ctx.eventQueue.tryDequeueEvent() + if opt.isNone(): + break + let qe = opt.get() + freeEventBuffers(qe.name, qe.data) + ctx.eventQueueStuck.store(false) + ongoingProcessReq[].setLen(0) + ctx.release() + + if not cb.isNil(): + foreignThreadGc: + let msg = + if naturallyDrained: + "" + else: + "recycle: in-flight requests did not finish in time" + let cmsg = msg.cstring + let retCode = if naturallyDrained: RET_OK else: RET_ERR + cb(retCode, unsafeAddr cmsg[0], cast[csize_t](msg.len), ud) + var ffiEventQueueSignalPtr {.threadvar.}: ThreadSignalPtr # Stashed so the hook has no closure env. @@ -123,6 +204,14 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = pending.del(i) while ctx.running.load(): + # A destructor requested recycle: drain + free + park this slot for reuse, + # keeping the threads (and their kqueue fds) alive. Claim it atomically so + # only this loop runs the recycle. + var expectedRecycle = CtxLifecycle.RecyclePending + if ctx.lifecycle.compareExchange(expectedRecycle, CtxLifecycle.Recycling): + await recycleContext(ctx, addr pending) + continue + # Freezes if a sync handler blocks the dispatcher; event thread reads to detect wedged FFI thread. discard ctx.ffiHeartbeat.fetchAdd(1) diff --git a/ffi/internal/ffi_library.nim b/ffi/internal/ffi_library.nim index e0e00e8..663dbea 100644 --- a/ffi/internal/ffi_library.nim +++ b/ffi/internal/ffi_library.nim @@ -147,6 +147,12 @@ macro declareLibrary*(libraryName: static[string], libType: untyped): untyped = let addName = libraryName & "_add_event_listener" let addErr = "error: invalid context in " & addName let addBody = quote: + # Sets up the calling (foreign) thread's GC before any Nim allocation + # below ($eventName / registry table+seq ops). Request entry procs do the + # same; without it a foreign thread with no Nim heap segfaults in the + # allocator under GC pressure. + when declared(initializeLibrary): + initializeLibrary() var ret: uint64 = 0 if isNil(ctx): echo `addErr` @@ -181,6 +187,10 @@ macro declareLibrary*(libraryName: static[string], libType: untyped): untyped = let removeName = libraryName & "_remove_event_listener" let removeErr = "error: invalid context in " & removeName let removeBody = quote: + # See add_event_listener: removeEventListener mutates the registry's + # GC-allocated table/seqs, so the calling foreign thread needs a GC. + when declared(initializeLibrary): + initializeLibrary() var ret: cint = 1 if isNil(ctx): echo `removeErr` diff --git a/ffi/internal/ffi_macro.nim b/ffi/internal/ffi_macro.nim index 7ec84fe..05304a1 100644 --- a/ffi/internal/ffi_macro.nim +++ b/ffi/internal/ffi_macro.nim @@ -835,6 +835,14 @@ macro ffi*(prc: untyped): untyped = let ffiBody = newStmtList() + # Set up the calling (foreign) thread's GC before any Nim allocation in + # this wrapper (e.g. `$reqTypeName` below). Without it a foreign thread + # with no Nim heap segfaults in the allocator under GC pressure. `ffiRaw` + # already does this; the `.ffi.` path must too. + ffiBody.add quote do: + when declared(initializeLibrary): + initializeLibrary() + ffiBody.add quote do: if callback.isNil: return RET_MISSING_CALLBACK @@ -1336,23 +1344,25 @@ macro ffiCtor*(prc: untyped): untyped = return stmts macro ffiDtor*(prc: untyped): untyped = - ## Defines a C-exported destructor that tears down the FFIContext after the - ## body runs. + ## Defines a C-exported destructor that RECYCLES the FFIContext after the + ## body runs — it parks the context for reuse instead of tearing down its + ## worker threads, because chronos never frees a dispatcher's kqueue fd, so a + ## thread-per-context teardown would leak fds unboundedly. ## ## The annotated proc must have exactly one parameter of the library type. - ## The body contains any library-level cleanup to run before context teardown. + ## The body contains any library-level cleanup to run before recycle. ## ## Example: ## proc waku_destroy*(w: Waku) {.ffiDtor.} = ## w.cleanup() ## ## The generated C-exported proc has the signature: - ## int waku_destroy(void* ctx) + ## int waku_destroy(void* ctx, FfiCallback callback, void* userData) ## - ## It extracts the library value from ctx, runs the body, then calls - ## destroyFFIContext to tear down the FFI thread and free the context. - ## Returns RET_OK on success, RET_ERR on failure (null/invalid ctx, or - ## destroyFFIContext failure). + ## NON-BLOCKING: it runs the body, requests recycle, and returns RET_OK once + ## accepted; the real outcome (RET_OK drained / RET_ERR stuck) arrives via + ## `callback`. Returns RET_ERR synchronously only for a null/invalid ctx or a + ## rejected recycle request. let procName = prc[0] let formalParams = prc[3] @@ -1379,7 +1389,7 @@ macro ffiDtor*(prc: untyped): untyped = if procName.kind == nnkPostfix: cExportProcName = procName[1] - let destroyResIdent = genSym(nskLet, "destroyRes") + let releaseResIdent = genSym(nskLet, "releaseRes") let ffiBody = newStmtList() @@ -1389,6 +1399,9 @@ macro ffiDtor*(prc: untyped): untyped = ffiBody.add quote do: if ctx.isNil or cast[ptr FFIContext[`libTypeName`]](ctx)[].myLib.isNil: + if not callback.isNil: + let errStr = "destroy: invalid context" + callback(RET_ERR, unsafeAddr errStr[0], cast[csize_t](errStr.len), userData) return RET_ERR ffiBody.add quote do: @@ -1402,19 +1415,28 @@ macro ffiDtor*(prc: untyped): untyped = if not isNoop: ffiBody.add(bodyNode) - let poolIdent = ident($libTypeName & "FFIPool") ffiBody.add quote do: - let `destroyResIdent` = - `poolIdent`.destroyFFIContext(cast[ptr FFIContext[`libTypeName`]](ctx)) - if `destroyResIdent`.isErr(): + # Recycle (park) the context for reuse instead of tearing down its threads. + # The callback fires from the recycle handler once drained. + let `releaseResIdent` = + releaseFFIContext(cast[ptr FFIContext[`libTypeName`]](ctx), callback, userData) + if `releaseResIdent`.isErr(): + if not callback.isNil: + let errStr = "destroy: " & `releaseResIdent`.error + callback(RET_ERR, unsafeAddr errStr[0], cast[csize_t](errStr.len), userData) return RET_ERR - - ffiBody.add quote do: return RET_OK + let poolIdent = ident($libTypeName & "FFIPool") + let ffiProc = newProc( name = postfix(cExportProcName, "*"), - params = @[ident("cint"), newIdentDefs(ident("ctx"), ident("pointer"))], + params = @[ + ident("cint"), + newIdentDefs(ident("ctx"), ident("pointer")), + newIdentDefs(ident("callback"), ident("FFICallBack")), + newIdentDefs(ident("userData"), ident("pointer")), + ], body = ffiBody, pragmas = newTree( nnkPragma,