diff --git a/CHANGELOG.md b/CHANGELOG.md index 40adcc0..52ce175 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project are documented in this file. ## [Unreleased] ### Changed +- Enforce `-d:noSignalHandler` at compile time: the build now fails with an + actionable message unless the consumer sets `-d:noSignalHandler` (libraries + embedded in a foreign host such as Go/Rust) or `-d:ffiAllowSignalHandler` + (standalone Nim binaries). Prevents the Nim runtime from installing its own + OS signal handlers and clobbering the host's — the cause of a real status-go + regression. - User event callbacks now run on a dedicated event thread fed by a bounded SPSC queue (default capacity 1024), so a slow listener can no longer block the FFI thread or concurrent `add_event_listener` / @@ -14,15 +20,6 @@ All notable changes to this project are documented in this file. runs on the event thread. The FFI thread advances an atomic heartbeat each loop iteration; if it stalls for more than 1s past the start-up grace window, the event thread emits the `not_responding` event. -- Removed the redundant `ffiType` macro; the `ffi` macro is now the single - authoring entry point - ([#22](https://github.com/logos-messaging/nim-ffi/pull/22)). -- Generated C++ avoids move constructors and assignment operators - ([#36](https://github.com/logos-messaging/nim-ffi/pull/36)) and no longer - throws exceptions across the binding boundary - ([#46](https://github.com/logos-messaging/nim-ffi/pull/46)). -- Removed the wildcard event listener; event dispatch is now strictly - per-event ([#70](https://github.com/logos-messaging/nim-ffi/pull/70)). ### Added - Queue-overflow handling: when the bounded event queue is full, the @@ -30,6 +27,15 @@ All notable changes to this project are documented in this file. `not_responding` from the event thread, and rejects subsequent `sendRequestToFFIThread` calls with `event queue stuck - library cannot accept new requests`. + +## [0.2.0] - 2026-06-04 + +Major release introducing the CBOR-based wire format, CBOR-backed FFI events +with a multi-listener registry, multi-language binding generation (C++, Rust, +CDDL), CI hardening with sanitizers, and several robustness fixes around +context lifetime and memory safety. + +### Added - **CBOR serialization** as the FFI wire format, replacing the previous JSON/string-based `serial.nim` ([#23](https://github.com/logos-messaging/nim-ffi/pull/23)). @@ -63,6 +69,17 @@ All notable changes to this project are documented in this file. - CBOR type-coverage tests ([#41](https://github.com/logos-messaging/nim-ffi/pull/41)). +### Changed +- Removed the redundant `ffiType` macro; the `ffi` macro is now the single + authoring entry point + ([#22](https://github.com/logos-messaging/nim-ffi/pull/22)). +- Generated C++ avoids move constructors and assignment operators + ([#36](https://github.com/logos-messaging/nim-ffi/pull/36)) and no longer + throws exceptions across the binding boundary + ([#46](https://github.com/logos-messaging/nim-ffi/pull/46)). +- Removed the wildcard event listener; event dispatch is now strictly + per-event ([#70](https://github.com/logos-messaging/nim-ffi/pull/70)). + ### Fixed - Use-after-free in the event/context lifetime path ([#47](https://github.com/logos-messaging/nim-ffi/pull/47)). @@ -121,176 +138,3 @@ Initial tagged release. watchdog with configurable timeout ([#7](https://github.com/logos-messaging/nim-ffi/pull/7)). - License files updated to comply with Logos licensing requirements. -# Changelog - -All notable changes to this project are documented in this file. - -## [Unreleased] - -### Changed -- User event callbacks now run on a dedicated event thread fed by a - bounded SPSC queue (default capacity 1024), so a slow listener can no - longer block the FFI thread or concurrent `add_event_listener` / - `remove_event_listener` calls - ([#6](https://github.com/logos-messaging/nim-ffi/issues/6)). -- Replaced the dedicated watchdog thread with a heartbeat check that - runs on the event thread. The FFI thread advances an atomic heartbeat - each loop iteration; if it stalls for more than 1s past the start-up - grace window, the event thread emits the `not_responding` event. -- Removed the redundant `ffiType` macro; the `ffi` macro is now the single - authoring entry point - ([#22](https://github.com/logos-messaging/nim-ffi/pull/22)). -- Generated C++ avoids move constructors and assignment operators - ([#36](https://github.com/logos-messaging/nim-ffi/pull/36)) and no longer - throws exceptions across the binding boundary - ([#46](https://github.com/logos-messaging/nim-ffi/pull/46)). -- Removed the wildcard event listener; event dispatch is now strictly - per-event ([#70](https://github.com/logos-messaging/nim-ffi/pull/70)). - -### Added -- Queue-overflow handling: when the bounded event queue is full, the - library sets a sticky "stuck" flag, logs an error, fires - `not_responding` from the event thread, and rejects subsequent - `sendRequestToFFIThread` calls with `event queue stuck - library - cannot accept new requests`. -- **CBOR serialization** as the FFI wire format, replacing the previous - JSON/string-based `serial.nim` - ([#23](https://github.com/logos-messaging/nim-ffi/pull/23)). -- **CBOR-backed FFI events**: event payloads are now serialized with CBOR - ([#39](https://github.com/logos-messaging/nim-ffi/pull/39)). -- **Multi-listener event registry** (`FFIEventRegistry`) and its wiring into - `FFIContext` - ([#45](https://github.com/logos-messaging/nim-ffi/pull/45), - [#49](https://github.com/logos-messaging/nim-ffi/pull/49)). -- **Event-listener ABI** with per-event typed listeners - ([#50](https://github.com/logos-messaging/nim-ffi/pull/50)). -- **C++ typed per-event listeners** in the generated bindings - ([#51](https://github.com/logos-messaging/nim-ffi/pull/51)). -- **Rust per-event typed listeners** (`add_on__listener` + wildcard - `add_event_listener`) - ([#52](https://github.com/logos-messaging/nim-ffi/pull/52)) and Rust event - example bindings/clients - ([#53](https://github.com/logos-messaging/nim-ffi/pull/53)). -- **C++ binding generator** with end-to-end tests driven by CMake/CTest - ([#27](https://github.com/logos-messaging/nim-ffi/pull/27)), later expanded - with multi-context, cross-library, pipeline, and stress tests - ([#42](https://github.com/logos-messaging/nim-ffi/pull/42)). -- **CDDL schema generator** for the FFI types - ([#24](https://github.com/logos-messaging/nim-ffi/pull/24)). -- **CI pipeline**: parallel test execution - ([#26](https://github.com/logos-messaging/nim-ffi/pull/26)), - AddressSanitizer / UndefinedBehaviorSanitizer / ThreadSanitizer jobs - ([#34](https://github.com/logos-messaging/nim-ffi/pull/34)), and a - cross-platform OS matrix for the C++ e2e suite - ([#38](https://github.com/logos-messaging/nim-ffi/pull/38)). -- CBOR type-coverage tests - ([#41](https://github.com/logos-messaging/nim-ffi/pull/41)). - -### Fixed -- Use-after-free in the event/context lifetime path - ([#47](https://github.com/logos-messaging/nim-ffi/pull/47)). - -## [0.1.5] - 2026-06-08 - -[Full changelog](https://github.com/logos-messaging/nim-ffi/compare/v0.1.4...v0.1.5) - -### Fixed - -- Recycle FFI contexts in the pool instead of tearing them down per cycle, - stopping a per-cycle file-descriptor leak (#74). - -## [0.1.4] - 2026-05-13 - -[Full changelog](https://github.com/logos-messaging/nim-ffi/compare/v0.1.3...v0.1.4) - -### Added - -- Simplified FFI authoring with auto-generated C++ and Rust language bindings, - including new `ffi/codegen/cpp.nim`, `ffi/codegen/rust.nim` and shared - `ffi/codegen/meta.nim` helpers (#15). -- Rust example bindings and clients under `examples/nim_timer/` (`rust_bindings` - and `rust_client`, the latter with a Tokio async variant) (#15). -- JSON/string-based FFI (de)serialization via `ffi/serial.nim` - (`ffiSerialize`/`ffiDeserialize`), with `tests/test_serial.nim` coverage. - (CBOR replaced this layer later, in 0.2.0.) -- FFI context pool (`ffi/ffi_context_pool.nim`) using a fixed array of contexts. -- Test suite expansion: `test_alloc.nim`, `test_ctx_validation.nim`, - `test_ffi_context.nim`, `test_gc_compat.nim`. -- Continuous integration pipeline (#12). - -### Fixed - -- Context buffer overflow (#21). -- Use a fixed array of contexts to avoid consuming all file descriptors (#14). -- Memory leaks (#11). -- Add `install_name` for macOS shared libraries (#8). - -### Changed - -- Run tests with the `refc` garbage collector (#20). -- Remove `CatchableError` usage (#19). -- Update license files to comply with Logos licensing requirements. - -## [0.1.3] - 2026-01-23 - -### Fixed -- Properly import and re-export `chronicles` so downstream packages get the - logging macros transitively. - -## [0.1.2] - 2026-01-23 - -### Fixed -- Re-export `chronicles` and `std/tables` when the `ffi` module is imported, - so generated code resolves these symbols at the call site. - -## [0.1.1] - 2026-01-23 - -Initial tagged release. - -### Added -- Core `ffi` macro for declaring procs exposed across the FFI boundary. -- `FFIContext` with a dedicated worker thread, request dispatch, and a - watchdog with configurable timeout - ([#7](https://github.com/logos-messaging/nim-ffi/pull/7)). -- License files updated to comply with Logos licensing requirements. -# Changelog - -## [0.1.5] - 2026-06-08 - -[Full changelog](https://github.com/logos-messaging/nim-ffi/compare/v0.1.4...v0.1.5) - -### Fixed - -- Recycle FFI contexts in the pool instead of tearing them down per cycle, - stopping a per-cycle file-descriptor leak (#74). - -## [0.1.4] - 2026-06-02 - -[Full changelog](https://github.com/logos-messaging/nim-ffi/compare/v0.1.3...v0.1.4) - -### Added - -- Simplified FFI authoring with auto-generated C++ and Rust language bindings, - including new `ffi/codegen/cpp.nim`, `ffi/codegen/rust.nim` and shared - `ffi/codegen/meta.nim` helpers (#15). -- Rust example bindings and clients under `examples/nim_timer/` (`rust_bindings` - and `rust_client`, the latter with a Tokio async variant) (#15). -- CBOR serialization support via `ffi/serial.nim`, with `tests/test_serial.nim` - coverage. -- FFI context pool (`ffi/ffi_context_pool.nim`) using a fixed array of contexts. -- Test suite expansion: `test_alloc.nim`, `test_ctx_validation.nim`, - `test_ffi_context.nim`, `test_gc_compat.nim`. -- Continuous integration pipeline (#12). - -### Fixed - -- Context buffer overflow (#21). -- Use a fixed array of contexts to avoid consuming all file descriptors (#14). -- Memory leaks (#11). -- Add `install_name` for macOS shared libraries (#8). - -### Changed - -- Run tests with the `refc` garbage collector (#20). -- Remove `CatchableError` usage (#19). -- Update license files to comply with Logos licensing requirements. diff --git a/examples/echo/cpp_bindings/CMakeLists.txt b/examples/echo/cpp_bindings/CMakeLists.txt index 77dd7aa..6b6bd51 100644 --- a/examples/echo/cpp_bindings/CMakeLists.txt +++ b/examples/echo/cpp_bindings/CMakeLists.txt @@ -58,6 +58,7 @@ add_custom_command( COMMAND "${NIM_EXECUTABLE}" c --mm:orc -d:chronicles_log_level=WARN + -d:noSignalHandler --app:lib --noMain "--nimMainPrefix:libecho" diff --git a/examples/timer/cpp_bindings/CMakeLists.txt b/examples/timer/cpp_bindings/CMakeLists.txt index 162d844..11713cb 100644 --- a/examples/timer/cpp_bindings/CMakeLists.txt +++ b/examples/timer/cpp_bindings/CMakeLists.txt @@ -58,6 +58,7 @@ add_custom_command( COMMAND "${NIM_EXECUTABLE}" c --mm:orc -d:chronicles_log_level=WARN + -d:noSignalHandler --app:lib --noMain "--nimMainPrefix:libmy_timer" diff --git a/examples/timer/rust_bindings/build.rs b/examples/timer/rust_bindings/build.rs index ce9c76f..4d3b9b4 100644 --- a/examples/timer/rust_bindings/build.rs +++ b/examples/timer/rust_bindings/build.rs @@ -32,6 +32,7 @@ fn main() { cmd.arg("c") .arg("--mm:orc") .arg("-d:chronicles_log_level=WARN") + .arg("-d:noSignalHandler") .arg("--app:lib") .arg("--noMain") .arg(format!("--nimMainPrefix:libmy_timer")) diff --git a/ffi.nimble b/ffi.nimble index 9e195a6..edd9558 100644 --- a/ffi.nimble +++ b/ffi.nimble @@ -1,6 +1,6 @@ # ffi.nimble -version = "0.1.6" +version = "0.2.0" author = "Institute of Free Technology" description = "FFI framework with custom header generation" license = "MIT or Apache License 2.0" diff --git a/ffi/codegen/rust.nim b/ffi/codegen/rust.nim index a2acb17..f989c7e 100644 --- a/ffi/codegen/rust.nim +++ b/ffi/codegen/rust.nim @@ -128,6 +128,7 @@ fn main() { cmd.arg("c") .arg("--mm:orc") .arg("-d:chronicles_log_level=WARN") + .arg("-d:noSignalHandler") .arg("--app:lib") .arg("--noMain") .arg(format!("--nimMainPrefix:lib$2")) diff --git a/ffi/codegen/templates/cpp/CMakeLists.txt.tpl b/ffi/codegen/templates/cpp/CMakeLists.txt.tpl index b204d4e..76fe8ef 100644 --- a/ffi/codegen/templates/cpp/CMakeLists.txt.tpl +++ b/ffi/codegen/templates/cpp/CMakeLists.txt.tpl @@ -58,6 +58,7 @@ add_custom_command( COMMAND "${NIM_EXECUTABLE}" c --mm:orc -d:chronicles_log_level=WARN + -d:noSignalHandler --app:lib --noMain "--nimMainPrefix:lib{{LIB}}" diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index c05b301..340476d 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -19,22 +19,22 @@ when not defined(noSignalHandler) and not defined(ffiAllowSignalHandler): "its own process, build with -d:ffiAllowSignalHandler." .} -import std/[atomics, locks, json, tables, sequtils] +import std/[atomics, locks, options, sequtils, tables] import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results -import ./ffi_types, ./ffi_thread_request, ./internal/ffi_macro, ./logging +import ./ffi_types, ./ffi_events, ./ffi_thread_request, ./logging, ./cbor_serial -type FFICallbackState* = object - ## Holds the C event callback and its associated user-data pointer. - ## Embedded in FFIContext and referenced from the FFI thread via a thread-local. - callback*: pointer - userData*: pointer +export ffi_events type CtxLifecycle {.pure.} = enum ## State machine guarding a pooled FFI context, held as an Atomic on FFIContext. + ## The threads, signals and dispatcher kqueues are created once per slot and + ## REUSED across acquire/release — chronos never frees a dispatcher's kqueue fd + ## (design decision; freed only at process exit), so spawning a thread per + ## context would leak fds unboundedly. Recycling parks the context instead. ## Transitions: - ## Active -> RecyclePending when ffiDtor is invoked - ## RecyclePending -> Recycling The process completed the in-flight processes and is ready for lib cleanup and release - ## Recycling -> Active When the FFI thread is ready again to attend to requests + ## Active -> RecyclePending when the destructor is invoked + ## RecyclePending -> Recycling FFI loop drains handlers, frees lib, releases slot + ## Recycling -> Active next createFFIContext reuses the slot (markAsActive) Active ## accepting and serving requests RecyclePending ## recycle requested; FFI thread loop hasn't claimed it yet Recycling ## FFI loop draining handlers, then frees lib + returns to pool @@ -59,15 +59,15 @@ type FFIContext*[T] = object # advanced each FFI-thread loop; event thread reads for liveness eventQueueStuck*: Atomic[bool] # sticky overflow flag running: Atomic[bool] # To control when the threads are running - lifecycle: Atomic[CtxLifecycle] + lifecycle: Atomic[CtxLifecycle] # Active / RecyclePending / Recycling recycleCallback: FFICallBack - # The destructor's callback, fired by the recycle handler with the outcome: + # destructor's callback, fired by the recycle handler with the outcome: # RET_OK once drained, RET_ERR if it timed out. Set by requestRecycle. recycleUserData: pointer inUse: Atomic[bool] - # Whether the context is claimed. createFFIContext claims it (false -> true); the - # recycle handler clears it once drained. On the context so the owning thread can - # release it without reaching into the pool. + # whether the slot is claimed; createFFIContext claims it, the recycle + # handler clears it once drained so the owning thread can release without + # reaching into the pool. registeredRequests: ptr Table[cstring, FFIRequestProc] var onFFIThread* {.threadvar.}: bool @@ -80,6 +80,21 @@ const FFIHeartbeatStartDelay* = 10.seconds # grace window for library startup FFIHeartbeatStaleThreshold* = 1.seconds +proc tryClaim*[T](ctx: ptr FFIContext[T]): bool = + ## Returns true if the slot was free and is now claimed, false if already in use. + var expected = false + ctx.inUse.compareExchange(expected, true) + +proc release*[T](ctx: ptr FFIContext[T]) = + ctx.inUse.store(false) + +proc isInUse*[T](ctx: ptr FFIContext[T]): bool = + ctx.inUse.load() + +proc markAsActive*[T](ctx: ptr FFIContext[T]) = + ## Re-arms a reused (recycled) slot to accept requests again. + ctx.lifecycle.store(CtxLifecycle.Active) + include ./event_thread include ./ffi_thread @@ -91,285 +106,6 @@ template closeAndNil(field: untyped) = proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ## Mirror of `initContextResources`. Threads MUST be joined first; ## fields are nil'd after close so re-init on the same slot is safe. -template callEventCallback*(ctx: ptr FFIContext, eventName: string, body: untyped) = - if isNil(ctx[].callbackState.callback): - chronicles.error eventName & " - eventCallback is nil" - return - - foreignThreadGc: - try: - let event = body - cast[FFICallBack](ctx[].callbackState.callback)( - RET_OK, - unsafeAddr event[0], - cast[csize_t](len(event)), - ctx[].callbackState.userData, - ) - except Exception, CatchableError: - let msg = - "Exception " & eventName & " when calling 'eventCallBack': " & - getCurrentExceptionMsg() - cast[FFICallBack](ctx[].callbackState.callback)( - RET_ERR, - unsafeAddr msg[0], - cast[csize_t](len(msg)), - ctx[].callbackState.userData, - ) - -template dispatchFfiEvent*(eventName: string, body: untyped) = - ## Dispatches an FFI event to the callback registered via `{libName}_set_event_callback`. - ## `body` is evaluated lazily — only when a callback is registered. - ## Valid only on the FFI thread (i.e., inside {.ffi.} proc bodies and their async closures). - let ffiState = ffiCurrentCallbackState - if isNil(ffiState) or isNil(ffiState[].callback): - chronicles.error eventName & " - event callback not set" - return - foreignThreadGc: - try: - let event = body - cast[FFICallBack](ffiState[].callback)( - RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ffiState[].userData - ) - except Exception, CatchableError: - let msg = "Exception dispatching " & eventName & ": " & getCurrentExceptionMsg() - cast[FFICallBack](ffiState[].callback)( - RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ffiState[].userData - ) - -proc sendRequestToFFIThread*( - ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration -): Result[void, string] = - ctx.lock.acquire() - defer: - ctx.lock.release() - - if ctx.lifecycle.load() != CtxLifecycle.Active: - deleteRequest(ffiRequest) - return err("FFI context is not accepting requests (being recycled)") - - ## Sending the request to the FFI thread - let sentOk = ctx.reqChannel.trySend(ffiRequest) - if not sentOk: - deleteRequest(ffiRequest) - return err("Couldn't send a request to the ffi thread") - - let fireSyncRes = ctx.reqSignal.fireSync() - if fireSyncRes.isErr(): - deleteRequest(ffiRequest) - return err("failed fireSync: " & $fireSyncRes.error) - - if fireSyncRes.get() == false: - deleteRequest(ffiRequest) - return err("Couldn't fireSync in time") - - ## wait until the FFI working thread properly received the request - let res = ctx.reqReceivedSignal.waitSync(timeout) - if res.isErr(): - return err("Couldn't receive reqReceivedSignal signal") - - return ok() - -type Foo = object -registerReqFFI(WatchdogReq, foo: ptr Foo): - proc(): Future[Result[string, string]] {.async.} = - return ok("FFI thread is not blocked") - -type JsonNotRespondingEvent = object - eventType: string - -proc init(T: type JsonNotRespondingEvent): T = - return JsonNotRespondingEvent(eventType: "not_responding") - -proc `$`(event: JsonNotRespondingEvent): string = - $(%*event) - -proc onNotResponding*(ctx: ptr FFIContext) = - callEventCallback(ctx, "onNotResponding"): - $JsonNotRespondingEvent.init() - -proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = - ## Watchdog thread that monitors the FFI thread and notifies the library user if it hangs. - ## This thread never blocks. - - let watchdogRun = proc(ctx: ptr FFIContext) {.async.} = - const WatchdogStartDelay = 10.seconds - const WatchdogTimeinterval = 1.seconds - const WatchdogTimeout = 20.seconds - - # Give time for the node to be created and up before sending watchdog requests - let initialStop = await ctx.stopSignal.wait().withTimeout(WatchdogStartDelay) - if initialStop or ctx.running.load == false: - return - - while true: - let intervalStop = await ctx.stopSignal.wait().withTimeout(WatchdogTimeinterval) - - if intervalStop or ctx.running.load == false: - debug "Watchdog thread exiting because FFIContext is not running" - break - - if ctx.lifecycle.load() != CtxLifecycle.Active: - continue - - let callback = proc( - callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer - ) {.cdecl, gcsafe, raises: [].} = - discard ## Don't do anything. Just respecting the callback signature. - const nilUserData = nil - - trace "Sending watchdog request to FFI thread" - - try: - sendRequestToFFIThread( - ctx, WatchdogReq.ffiNewReq(callback, nilUserData), WatchdogTimeout - ).isOkOr: - error "Failed to send watchdog request to FFI thread", error = $error - onNotResponding(ctx) - except Exception as exc: - error "Exception sending watchdog request", exc = exc.msg - onNotResponding(ctx) - - waitFor watchdogRun(ctx) - -proc processRequest[T]( - request: ptr FFIThreadRequest, ctx: ptr FFIContext[T] -) {.async.} = - ## Invoked within the FFI thread to process a request coming from the FFI API consumer thread. - - let reqId = $request[].reqId - ## The reqId determines which proc will handle the request. - ## The registeredRequests represents a table defined at compile time. - ## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously] - - let reqIdCs = reqId.cstring - # keep `reqId` alive and avoid the implicit string→cstring warning. - - let retFut = - if not ctx[].registeredRequests[].contains(reqIdCs): - ## That shouldn't happen because only registered requests should be sent to the FFI thread. - nilProcess(request[].reqId) - else: - ctx[].registeredRequests[][reqIdCs](request[].reqContent, ctx) - - let res = - try: - await retFut - except CancelledError as exc: - Result[string, string].err("Request cancelled during destroy: " & exc.msg) - except AsyncError as exc: - Result[string, string].err( - "Async error in processRequest for " & reqId & ": " & exc.msg - ) - - ## handleRes may raise (OOM, GC setup) even though it is rare. - try: - handleRes(res, request) - except Exception as exc: - error "Unexpected exception in handleRes", error = exc.msg - -proc freeLib[T](ctx: ptr FFIContext[T]) {.gcsafe.} = - if ctx.myLib.isNil(): - return - - when not defined(gcRefc): - {.cast(gcsafe).}: - `=destroy`(ctx.myLib[]) - else: - discard - freeShared(ctx.myLib) - ctx.myLib = nil - -var RecycleTimeout* = 1500.milliseconds - ## Upper bound the recycle handler waits for in-flight handlers before it - ## cancels them and reports the ctx as stuck. The drain returns as soon as they - ## finish, so this only bounds a *stuck* handler. A `var` so tests can shorten it. - -proc recycleContext[T]( - ctx: ptr FFIContext[T], ongoingProcessReq: ptr seq[Future[void]] -) {.async.} = - ## Drain the in-flight handlers, free the lib object, release the context for reuse, - ## and fire the callback with the outcome. Never blocks the caller. - - ongoingProcessReq[].keepItIf(not it.finished()) - - ## 1. Let the in-flight handlers finish on their own, bounded by RecycleTimeout. - var naturallyDrained = ongoingProcessReq[].len == 0 - if not naturallyDrained: - naturallyDrained = await allFutures(ongoingProcessReq[]).withTimeout(RecycleTimeout) - - ## 2. If any are wedged, cancel them and give the cancellations a bounded moment - ## to unwind, so the context can be reclaimed rather than leaked. - var safeToRecycle = naturallyDrained - if not naturallyDrained: - for fut in ongoingProcessReq[]: - if not fut.finished(): - fut.cancelSoon() - safeToRecycle = await allFutures(ongoingProcessReq[]).withTimeout(RecycleTimeout) - - let cb = ctx.recycleCallback - let ud = ctx.recycleUserData - ctx.recycleCallback = nil - - if safeToRecycle: - freeLib(ctx) - ctx.callbackState = default(FFICallbackState) - ongoingProcessReq[].setLen(0) - ctx.release() - - if not cb.isNil(): - foreignThreadGc: - let msg = - if naturallyDrained: - "" - else: - "recycle: in-flight requests did not finish in time" - let cmsg = msg.cstring - let retCode = if naturallyDrained: RET_OK else: RET_ERR - cb(retCode, unsafeAddr cmsg[0], cast[csize_t](msg.len), ud) - -proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = - ## FFI thread body that attends library user API requests - ffiCurrentCallbackState = addr ctx[].callbackState - - logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT) - - defer: - let fireRes = ctx.threadExitSignal.fireSync() - if fireRes.isErr(): - error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error - - let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} = - - var ongoingProcessReq: seq[Future[void]] - - while ctx.running.load(): - var expected = CtxLifecycle.RecyclePending - if ctx.lifecycle.compareExchange(expected, CtxLifecycle.Recycling): - await recycleContext(ctx, addr ongoingProcessReq) - continue - - let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds) - if not gotSignal: - continue - - ## Wait for a request from the ffi consumer thread - var request: ptr FFIThreadRequest - if not ctx.reqChannel.tryRecv(request): - continue - - ongoingProcessReq.keepItIf(not it.finished()) - ongoingProcessReq.add(processRequest(request, ctx)) - - let fireRes = ctx.reqReceivedSignal.fireSync() - if fireRes.isErr(): - error "could not fireSync back to requester thread", error = fireRes.error - - waitFor ffiRun(ctx) - -proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = - ## Full cleanup for heap-allocated contexts: closes all resources and frees memory. - defer: - freeShared(ctx) ctx.lock.deinitLock() deinitEventRegistry(ctx[].eventRegistry) deinitEventQueue(ctx[].eventQueue) @@ -460,14 +196,21 @@ proc fireOrErr(sig: ThreadSignalPtr, name: string): Result[void, string] = return err("failed to signal: " & name & " on time") ok() -proc waitExitOrErr( - sig: ThreadSignalPtr, name: string, timeout: Duration -): Result[void, string] = - let exited = sig.waitSync(timeout).valueOr: - return err("error waiting for exit: " & name & ": " & $error) - if not exited: - return err("did not exit in time: " & name & " (leaking ctx to avoid hang)") - ok() +proc reachedExitOrTimedOut(sig: ThreadSignalPtr, timeout: Duration): bool = + ## Best-effort bounded pre-check before joining a stopping thread. + ## Returns false ONLY on a genuine timeout (the exit signal was not observed + ## within `timeout`, so the thread may be wedged and the caller should skip + ## the join to avoid hanging). Returns true otherwise — including when + ## `waitSync` itself errors: it uses `select()`, which returns EINVAL once a + ## signal fd exceeds FD_SETSIZE under load. That error is NOT evidence the + ## thread is stuck (it was already signaled to stop and the async event loop + ## that drives its exit is unaffected), so we proceed to the authoritative, + ## fd-free joinThread rather than spuriously failing teardown and leaking the + ## pool slot. + let waited = sig.waitSync(timeout) + if waited.isOk() and not waited.get(): + return false # genuine timeout + true proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] = # Skip onNotResponding on error: it takes reg.lock, which a back-pressuring @@ -490,26 +233,34 @@ proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] = ctx.signalStop().isOkOr: return err("signalStop failed: " & $error) - ?ctx.threadExitSignal.waitExitOrErr("FFI thread", ThreadExitTimeout) + if not ctx.threadExitSignal.reachedExitOrTimedOut(ThreadExitTimeout): + return err("FFI thread did not exit in time (leaking ctx to avoid hang)") joinThread(ctx.ffiThread) - ?ctx.eventThreadExitSignal.waitExitOrErr("event thread", ThreadExitTimeout) + if not ctx.eventThreadExitSignal.reachedExitOrTimedOut(ThreadExitTimeout): + return err("event thread did not exit in time (leaking ctx to avoid hang)") joinThread(ctx.eventThread) ok() +proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] = + ## Stops a heap-allocated FFI context. + ctx.stopAndJoinThreads().isOkOr: + return err("clearContext: " & $error) + ctx.cleanUpResources().isOkOr: + return err("cleanUpResources failed: " & $error) + ok() + proc requestRecycle*[T]( ctx: ptr FFIContext[T], callback: FFICallBack, userData: pointer ): Result[void, string] = - ## Starts ctx recycle process without stopping its worker, so the next - ## createFFIContext reuses the same threads and fds. - ## - ## During recycling, the FFI thread drains the handlers, frees the lib and releases - ## the context, then fires `callback` (RET_OK drained, RET_ERR stuck). - + ## Starts the context's recycle WITHOUT stopping its worker threads, so the + ## next createFFIContext reuses the same threads, signals and kqueue fds. + ## The FFI thread loop drains the in-flight handlers, frees the lib, clears the + ## per-context state and releases the slot, then fires `callback` + ## (RET_OK drained, RET_ERR stuck). Non-blocking. ctx.lock.acquire() if ctx.lifecycle.load() != CtxLifecycle.Active: ctx.lock.release() return err("requestRecycle: context is not Active (already recycling)") - ctx.recycleCallback = callback ctx.recycleUserData = userData ctx.lifecycle.store(CtxLifecycle.RecyclePending) @@ -519,18 +270,4 @@ proc requestRecycle*[T]( return err("requestRecycle: failed to signal the FFI thread: " & $error) if not fired: return err("requestRecycle: failed to signal the FFI thread in time") - return ok() - -proc markAsActive*[T](ctx: ptr FFIContext[T]) = - ctx.lifecycle.store(CtxLifecycle.Active) - -proc tryClaim*[T](ctx: ptr FFIContext[T]): bool = - ## Returns true if acquired the contex, false if it was already claimed. - var expected = false - ctx.inUse.compareExchange(expected, true) - -proc release*[T](ctx: ptr FFIContext[T]) = - ctx.inUse.store(false) - -proc isInUse*[T](ctx: ptr FFIContext[T]): bool = - ctx.inUse.load() + ok() diff --git a/ffi/ffi_context_pool.nim b/ffi/ffi_context_pool.nim index 421a636..179fb8f 100644 --- a/ffi/ffi_context_pool.nim +++ b/ffi/ffi_context_pool.nim @@ -4,23 +4,28 @@ import ./ffi_context, ./ffi_types const MaxFFIContexts* = 32 ## Maximum number of concurrently live FFI contexts when using FFIContextPool. + ## Each slot's threads, signals and dispatcher kqueue fds are created once and + ## reused, so this also caps the process's steady-state fd usage. type FFIContextPool*[T] = object contexts: array[MaxFFIContexts, FFIContext[T]] initialized: array[MaxFFIContexts, Atomic[bool]] + ## Whether the slot's worker has been built. Once true it stays true for the + ## process lifetime — destroy recycles the context rather than tearing it down. proc createFFIContext*[T]( pool: var FFIContextPool[T] ): Result[ptr FFIContext[T], string] = - ## Acquires a context from the fixed pool. The context's worker is built once on - ## first use and reused on every later acquisition. - + ## Acquires a context from the fixed pool. The worker (threads + signals + + ## dispatcher kqueues) is built once on first use and REUSED on every later + ## acquisition — chronos never frees a dispatcher's kqueue fd, so a + ## thread-per-context model would leak fds unboundedly. for i in 0 ..< MaxFFIContexts: let ctx = pool.contexts[i].addr if not ctx.tryClaim(): continue if pool.initialized[i].load(): - ## Reused context: a prior destroy drained and released it, worker still alive. + ## Reused slot: a prior destroy drained and parked it; worker still alive. ctx.markAsActive() return ok(ctx) initContextResources(ctx).isOkOr: @@ -33,13 +38,16 @@ proc createFFIContext*[T]( proc releaseFFIContext*[T]( ctx: ptr FFIContext[T], callback: FFICallBack, userData: pointer ): Result[void, string] = - return ctx.requestRecycle(callback, userData) + ## Recycle/park the context for reuse without stopping its worker threads. + ## `callback` fires once the FFI thread has drained and parked it. + ctx.requestRecycle(callback, userData) proc destroyFFIContext*[T]( pool: var FFIContextPool[T], ctx: ptr FFIContext[T] ): Result[void, string] = ## Full teardown: stops/joins the worker threads and returns the context to the - ## pool, marking it uninitialised so a later createFFIContext rebuilds it. + ## pool, marking it uninitialised so a later createFFIContext rebuilds it. Only + ## for process/pool shutdown — normal destruction uses releaseFFIContext. ctx.stopAndJoinThreads().isOkOr: return err("destroyFFIContext(pool): " & $error) for i in 0 ..< MaxFFIContexts: @@ -50,8 +58,8 @@ proc destroyFFIContext*[T]( return ok() proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool = - ## Returns true only if ctx points to one of the pool's contexts that is - ## currently in use. + ## True only if ctx points to one of the pool's contexts that is currently + ## claimed (in use). Rejects nil / dangling / parked contexts at the API boundary. if ctx.isNil(): return false for i in 0 ..< MaxFFIContexts: diff --git a/ffi/ffi_thread.nim b/ffi/ffi_thread.nim index 9685a45..5de7684 100644 --- a/ffi/ffi_thread.nim +++ b/ffi/ffi_thread.nim @@ -28,6 +28,13 @@ proc sendRequestToFFIThread*( defer: ctx.lock.release() + # Reject once recycling has been requested: the slot is being drained/parked + # and its handler table/lib are about to be torn down. requestRecycle flips + # this under the same lock, so the check is race-free. + if ctx.lifecycle.load() != CtxLifecycle.Active: + deleteRequest(ffiRequest) + return err("FFI context is not accepting requests (being recycled)") + let sentOk = ctx.reqChannel.trySend(ffiRequest) if not sentOk: deleteRequest(ffiRequest) @@ -42,10 +49,15 @@ proc sendRequestToFFIThread*( deleteRequest(ffiRequest) return err("Couldn't fireSync in time") + # The request is now owned by the FFI thread: it has been queued and the FFI + # thread signaled, so it WILL be received, processed, and answered through the + # callback via handleRes. The waitSync below only confirms prompt receipt; its + # failure (e.g. EINVAL/EINTR under load or signal teardown) must NOT be + # surfaced as an error, or the caller would fire the callback a SECOND time + # while handleRes also fires it — a double callback onto a freed response. let res = ctx.reqReceivedSignal.waitSync(timeout) if res.isErr(): - # FFI thread was signaled and owns the request; don't double-free. - return err("Couldn't receive reqReceivedSignal signal") + return ok() # On ok the FFI thread's processRequest deallocShared(req)'s. ok() @@ -79,6 +91,75 @@ proc processRequest[T]( except Exception as e: error "Unexpected exception in handleRes", error = e.msg +proc freeLib[T](ctx: ptr FFIContext[T]) {.gcsafe.} = + ## Frees the createShared'd library object built by the ctor. Logical resource + ## cleanup is the destructor body's job; this only releases the Nim storage. + if ctx.myLib.isNil(): + return + when not defined(gcRefc): + {.cast(gcsafe).}: + `=destroy`(ctx.myLib[]) + else: + discard + freeShared(ctx.myLib) + ctx.myLib = nil + +var RecycleTimeout* = 1500.milliseconds + ## Upper bound the recycle handler waits for in-flight handlers before it + ## cancels them and reports the ctx as stuck. Returns as soon as they finish, + ## so this only bounds a *stuck* handler. A `var` so tests can shorten it. + +proc recycleContext[T]( + ctx: ptr FFIContext[T], ongoingProcessReq: ptr seq[Future[void]] +) {.async.} = + ## Runs on the FFI thread. Drains in-flight handlers, frees the lib, clears the + ## per-context event state, releases the slot for reuse, and fires the recycle + ## callback. The worker threads (and their kqueue fds) stay alive for reuse. + ongoingProcessReq[].keepItIf(not it.finished()) + + ## 1. Let in-flight handlers finish on their own, bounded by RecycleTimeout. + var naturallyDrained = ongoingProcessReq[].len == 0 + if not naturallyDrained: + naturallyDrained = await allFutures(ongoingProcessReq[]).withTimeout(RecycleTimeout) + + ## 2. If any are wedged, cancel them and give the cancellations a bounded moment. + var safeToRecycle = naturallyDrained + if not naturallyDrained: + for fut in ongoingProcessReq[]: + if not fut.finished(): + fut.cancelSoon() + safeToRecycle = await allFutures(ongoingProcessReq[]).withTimeout(RecycleTimeout) + + let cb = ctx.recycleCallback + let ud = ctx.recycleUserData + ctx.recycleCallback = nil + + if safeToRecycle: + freeLib(ctx) + # Reset per-context event state so a reused slot starts clean: drop listeners + # and drain/free any queued events so they aren't delivered to the next owner. + removeAllEventListeners(ctx[].eventRegistry) + while true: + let opt = ctx.eventQueue.tryDequeueEvent() + if opt.isNone(): + break + let qe = opt.get() + freeEventBuffers(qe.name, qe.data) + ctx.eventQueueStuck.store(false) + ongoingProcessReq[].setLen(0) + ctx.release() + + if not cb.isNil(): + foreignThreadGc: + let msg = + if naturallyDrained: + "" + else: + "recycle: in-flight requests did not finish in time" + let cmsg = msg.cstring + let retCode = if naturallyDrained: RET_OK else: RET_ERR + cb(retCode, unsafeAddr cmsg[0], cast[csize_t](msg.len), ud) + var ffiEventQueueSignalPtr {.threadvar.}: ThreadSignalPtr # Stashed so the hook has no closure env. @@ -121,6 +202,14 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = pending.del(i) while ctx.running.load(): + # A destructor requested recycle: drain + free + park this slot for reuse, + # keeping the threads (and their kqueue fds) alive. Claim it atomically so + # only this loop runs the recycle. + var expectedRecycle = CtxLifecycle.RecyclePending + if ctx.lifecycle.compareExchange(expectedRecycle, CtxLifecycle.Recycling): + await recycleContext(ctx, addr pending) + continue + # Freezes if a sync handler blocks the dispatcher; event thread reads to detect wedged FFI thread. discard ctx.ffiHeartbeat.fetchAdd(1) diff --git a/ffi/internal/ffi_library.nim b/ffi/internal/ffi_library.nim index ed04d9e..7506fbf 100644 --- a/ffi/internal/ffi_library.nim +++ b/ffi/internal/ffi_library.nim @@ -139,6 +139,12 @@ macro declareLibrary*(libraryName: static[string], libType: untyped): untyped = let addName = libraryName & "_add_event_listener" let addErr = "error: invalid context in " & addName let addBody = quote: + # Sets up the calling (foreign) thread's GC before any Nim allocation + # below ($eventName / registry table+seq ops). Request entry procs do the + # same; without it a foreign thread with no Nim heap segfaults in the + # allocator under GC pressure. + when declared(initializeLibrary): + initializeLibrary() var ret: uint64 = 0 if isNil(ctx): echo `addErr` @@ -173,6 +179,10 @@ macro declareLibrary*(libraryName: static[string], libType: untyped): untyped = let removeName = libraryName & "_remove_event_listener" let removeErr = "error: invalid context in " & removeName let removeBody = quote: + # See add_event_listener: removeEventListener mutates the registry's + # GC-allocated table/seqs, so the calling foreign thread needs a GC. + when declared(initializeLibrary): + initializeLibrary() var ret: cint = 1 if isNil(ctx): echo `removeErr` diff --git a/ffi/internal/ffi_macro.nim b/ffi/internal/ffi_macro.nim index 714abd0..a1b74fb 100644 --- a/ffi/internal/ffi_macro.nim +++ b/ffi/internal/ffi_macro.nim @@ -750,6 +750,14 @@ macro ffi*(prc: untyped): untyped = let ffiBody = newStmtList() + # Set up the calling (foreign) thread's GC before any Nim allocation in + # this wrapper (e.g. `$reqTypeName` below). Without it a foreign thread + # with no Nim heap segfaults in the allocator under GC pressure. `ffiRaw` + # already does this; the `.ffi.` path must too. + ffiBody.add quote do: + when declared(initializeLibrary): + initializeLibrary() + ffiBody.add quote do: if callback.isNil: return RET_MISSING_CALLBACK @@ -1246,22 +1254,25 @@ macro ffiCtor*(prc: untyped): untyped = return stmts macro ffiDtor*(prc: untyped): untyped = - ## Defines a C-exported destructor that tears down the FFIContext after the - ## body runs. + ## Defines a C-exported destructor that RECYCLES the FFIContext after the + ## body runs — it parks the context for reuse instead of tearing down its + ## worker threads, because chronos never frees a dispatcher's kqueue fd, so a + ## thread-per-context teardown would leak fds unboundedly. ## ## The annotated proc must have exactly one parameter of the library type. - ## The body contains any library-level cleanup to run before context teardown. + ## The body contains any library-level cleanup to run before recycle. ## ## Example: - ## proc mylibobj_destroy*(obj: MyLibObj) {.ffiDtor.} = - ## obj.cleanup() + ## proc waku_destroy*(w: Waku) {.ffiDtor.} = + ## w.cleanup() ## ## The generated C-exported proc has the signature: - ## cint mylibobj_destroy(void* ctx, FfiCallback callback, void* userData) + ## int waku_destroy(void* ctx, FfiCallback callback, void* userData) ## - ## Recycle the context for reuse to keep fd usage bounded. - ## NON-BLOCKING: returns RET_OK once accepted; - ## the real outcome arrives via `callback`. + ## NON-BLOCKING: it runs the body, requests recycle, and returns RET_OK once + ## accepted; the real outcome (RET_OK drained / RET_ERR stuck) arrives via + ## `callback`. Returns RET_ERR synchronously only for a null/invalid ctx or a + ## rejected recycle request. let procName = prc[0] let formalParams = prc[3] @@ -1270,8 +1281,8 @@ macro ffiDtor*(prc: untyped): untyped = if formalParams.len < 2: error("ffiDtor: proc must have exactly one parameter (w: LibType)") - let libParamName = formalParams[1][0] # e.g. w - let libTypeName = formalParams[1][1] # e.g. MyLibObj + let libParamName = formalParams[1][0] + let libTypeName = formalParams[1][1] let procNameStr = block: let raw = $procName @@ -1288,7 +1299,7 @@ macro ffiDtor*(prc: untyped): untyped = if procName.kind == nnkPostfix: cExportProcName = procName[1] - let releaseResIdent = genSym(nskLet, "destroyRes") + let releaseResIdent = genSym(nskLet, "releaseRes") let ffiBody = newStmtList() @@ -1298,6 +1309,9 @@ macro ffiDtor*(prc: untyped): untyped = ffiBody.add quote do: if ctx.isNil or cast[ptr FFIContext[`libTypeName`]](ctx)[].myLib.isNil: + if not callback.isNil: + let errStr = "destroy: invalid context" + callback(RET_ERR, unsafeAddr errStr[0], cast[csize_t](errStr.len), userData) return RET_ERR ffiBody.add quote do: @@ -1311,21 +1325,28 @@ macro ffiDtor*(prc: untyped): untyped = if not isNoop: ffiBody.add(bodyNode) - let poolIdent = ident($libTypeName & "FFIPool") ffiBody.add quote do: - let `releaseResIdent` = releaseFFIContext( - cast[ptr FFIContext[`libTypeName`]](ctx), callback, userData - ) + # Recycle (park) the context for reuse instead of tearing down its threads. + # The callback fires from the recycle handler once drained. + let `releaseResIdent` = + releaseFFIContext(cast[ptr FFIContext[`libTypeName`]](ctx), callback, userData) if `releaseResIdent`.isErr(): - if not callback.isNil(): - let errStr = "release failed: " & $`releaseResIdent`.error + if not callback.isNil: + let errStr = "destroy: " & `releaseResIdent`.error callback(RET_ERR, unsafeAddr errStr[0], cast[csize_t](errStr.len), userData) return RET_ERR return RET_OK + let poolIdent = ident($libTypeName & "FFIPool") + let ffiProc = newProc( name = postfix(cExportProcName, "*"), - params = @[ident("cint"), newIdentDefs(ident("ctx"), ident("pointer"))], + params = @[ + ident("cint"), + newIdentDefs(ident("ctx"), ident("pointer")), + newIdentDefs(ident("callback"), ident("FFICallBack")), + newIdentDefs(ident("userData"), ident("pointer")), + ], body = ffiBody, pragmas = newTree( nnkPragma, diff --git a/tests/test_ffi_context.nim b/tests/test_ffi_context.nim deleted file mode 100644 index bf6989c..0000000 --- a/tests/test_ffi_context.nim +++ /dev/null @@ -1,892 +0,0 @@ -import std/[locks, strutils, os, osproc, sequtils] -import unittest2 -import results -import ../ffi - -type TestLib = object - -## Per-request callback state. The test thread blocks on `cond` until the -## FFI thread signals it — no polling, no CPU waste. -type CallbackData = object - lock: Lock - cond: Cond - called: bool - retCode: cint - msg: array[512, char] - msgLen: int - -proc initCallbackData(d: var CallbackData) = - d.lock.initLock() - d.cond.initCond() - -proc deinitCallbackData(d: var CallbackData) = - d.cond.deinitCond() - d.lock.deinitLock() - -proc testCallback( - retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer -) {.cdecl, gcsafe, raises: [].} = - let d = cast[ptr CallbackData](userData) - acquire(d[].lock) - d[].retCode = retCode - let n = min(int(len), d[].msg.high) - if n > 0 and not msg.isNil: - copyMem(addr d[].msg[0], msg, n) - d[].msg[n] = '\0' - d[].msgLen = n - d[].called = true - signal(d[].cond) - release(d[].lock) - -proc waitCallback(d: var CallbackData) = - acquire(d.lock) - while not d.called: - wait(d.cond, d.lock) - release(d.lock) - -proc callbackMsg(d: var CallbackData): string = - result = newString(d.msgLen) - if d.msgLen > 0: - copyMem(addr result[0], addr d.msg[0], d.msgLen) - -registerReqFFI(PingRequest, lib: ptr TestLib): - proc(message: cstring): Future[Result[string, string]] {.async.} = - return ok("pong:" & $message) - -registerReqFFI(FailRequest, lib: ptr TestLib): - proc(): Future[Result[string, string]] {.async.} = - return err("intentional failure") - -registerReqFFI(EmptyOkRequest, lib: ptr TestLib): - proc(): Future[Result[string, string]] {.async.} = - return ok("") - -registerReqFFI(SlowRequest, lib: ptr TestLib): - proc(): Future[Result[string, string]] {.async.} = - await sleepAsync(500.milliseconds) - return ok("slow-done") - -# Coordination channel: the FFI handler signals the test thread the instant -# it is about to block the event loop, so the test can call destroyFFIContext -# while the event loop is truly frozen. -var gSyncBlockStarted: Channel[bool] -gSyncBlockStarted.open() - -registerReqFFI(SyncBlockingRequest, lib: ptr TestLib): - proc(): Future[Result[string, string]] {.async.} = - # Yield first so that reqReceivedSignal fires and sendRequestToFFIThread - # returns on the calling thread before we start the synchronous block. - await sleepAsync(0.milliseconds) - # Signal the test thread: the event loop is about to be frozen. - # Channel.send is annotated as raising under refc, so wrap. - try: - gSyncBlockStarted.send(true) - except Exception as exc: - return err("gSyncBlockStarted.send raised: " & exc.msg) - # Simulates a request that blocks the event-loop thread synchronously - # (e.g. w.stop() -> switch.stop() -> connManager.close() with blocking I/O). - # Unlike sleepAsync, os.sleep holds the OS thread and prevents Chronos from - # processing any callbacks -- including the reqSignal fired by destroyFFIContext. - os.sleep(5_000) - return ok("sync-blocking-done") - -# Approximates the heavy ref-object workload that libwaku/libp2p performs on -# the FFI thread. The exact cell count is large enough to force several refc -# GC cycles; under refc this stresses the heap state that, when later combined -# with a chronos Selector allocation on the main thread (via close()), used to -# trip the rawNewObj → signal-handler infinite recursion. -type RefCell = ref object - next: RefCell - payload: array[64, byte] - -registerReqFFI(HeavyRefAllocRequest, lib: ptr TestLib): - proc(): Future[Result[string, string]] {.async.} = - var head: RefCell - for i in 0 ..< 50_000: - let n = RefCell(next: head) - head = n - if i mod 1000 == 0: - await sleepAsync(0.milliseconds) - # Break the chain iteratively before releasing head. - # ORC's =destroy for RefCell recurses through .next, so a 50k-node chain - # would produce ~50k nested =destroy calls and overflow the stack. - # Walking the list and unlinking each node first keeps destruction O(n) - # iterative instead of O(n) recursive. - var node = head - head = nil - while not node.isNil(): - let nxt = node.next - node.next = nil # unlink before the refcount of `node` can drop to zero - node = nxt - await sleepAsync(10.milliseconds) - return ok("heavy-done") - -suite "FFIContextPool": - test "create and destroy via pool succeeds": - var pool: FFIContextPool[TestLib] - let ctx = pool.createFFIContext().valueOr: - assert false, "createFFIContext(pool) failed: " & $error - return - check pool.destroyFFIContext(ctx).isOk() - - test "context is reused after destroy": - var pool: FFIContextPool[TestLib] - let ctx1 = pool.createFFIContext().valueOr: - assert false, "createFFIContext(pool) failed: " & $error - return - check pool.destroyFFIContext(ctx1).isOk() - # After destroying, the same context must be available again - let ctx2 = pool.createFFIContext().valueOr: - assert false, "createFFIContext(pool) failed after context release: " & $error - return - check pool.destroyFFIContext(ctx2).isOk() - check ctx1 == ctx2 # same context reused - - test "pool exhaustion returns error": - var pool: FFIContextPool[TestLib] - var ctxs: array[MaxFFIContexts, ptr FFIContext[TestLib]] - for i in 0 ..< MaxFFIContexts: - ctxs[i] = pool.createFFIContext().valueOr: - for j in 0 ..< i: - discard pool.destroyFFIContext(ctxs[j]) - assert false, "createFFIContext(pool) failed at context " & $i & ": " & $error - return - # Pool is now full — next create must fail - check pool.createFFIContext().isErr() - for i in 0 ..< MaxFFIContexts: - discard pool.destroyFFIContext(ctxs[i]) - - test "requests are processed via pool context": - var pool: FFIContextPool[TestLib] - var d: CallbackData - initCallbackData(d) - defer: - deinitCallbackData(d) - - let ctx = pool.createFFIContext().valueOr: - assert false, "createFFIContext(pool) failed: " & $error - return - defer: - discard pool.destroyFFIContext(ctx) - - check sendRequestToFFIThread( - ctx, PingRequest.ffiNewReq(testCallback, addr d, "pool".cstring) - ) - .isOk() - waitCallback(d) - check d.retCode == RET_OK - check callbackMsg(d) == "pong:pool" - -suite "createFFIContext / destroyFFIContext": - test "create and destroy succeeds": - var pool: FFIContextPool[TestLib] - let ctx = pool.createFFIContext().valueOr: - checkpoint "createFFIContext failed: " & $error - check false - return - check pool.destroyFFIContext(ctx).isOk() - - test "double destroy is safe via running flag": - var pool: FFIContextPool[TestLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - check pool.destroyFFIContext(ctx).isOk() - -suite "destroyFFIContext does not hang": - test "destroy while a slow async request is still in-flight": - ## Reproduces the race where destroyFFIContext was called while a long- - ## running async request (e.g. stop_node / w.stop()) was still executing. - ## The destroy must return well within 2 seconds; before the fix it would - ## block forever on joinThread(ffiThread). - var pool: FFIContextPool[TestLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - - var d: CallbackData - initCallbackData(d) - defer: deinitCallbackData(d) - - # sendRequestToFFIThread returns as soon as the FFI thread ACKs receipt; - # the 500 ms work continues asynchronously on the FFI thread. - check sendRequestToFFIThread( - ctx, SlowRequest.ffiNewReq(testCallback, addr d) - ).isOk() - - # Destroy immediately while SlowRequest is still running. - let t0 = Moment.now() - check pool.destroyFFIContext(ctx).isOk() - check (Moment.now() - t0) < 2.seconds - -suite "destroyFFIContext does not hang when event loop is blocked": - test "destroy while sync-blocking request is in-flight": - ## Reproduces the hang seen in logosdelivery_example.c: - ## logosdelivery_stop_node(...) -- triggers w.stop() on the FFI thread - ## sleep(1) - ## logosdelivery_destroy(...) -- hangs forever - ## - ## Root cause: w.stop() (and similar tear-down calls) can execute a - ## synchronous blocking section that holds the OS thread, preventing - ## the Chronos event loop from processing the reqSignal fired by - ## destroyFFIContext. The result is joinThread(ffiThread) never returns. - ## - ## With the fix, destroyFFIContext must complete well within the 5 s that - ## SyncBlockingRequest holds the event loop. - var pool: FFIContextPool[TestLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - - # CallbackData and ctx are kept alive past destroyFFIContext: the leaked - # FFI thread is still inside os.sleep(5_000) and will eventually wake, - # run handleRes, fire testCallback, and exit normally. We wait for that - # to happen at the end of the test so the leaked thread cannot race with - # subsequent tests' createFFIContext on Linux/Windows. Heap allocation - # ensures the late callback's userData is still valid when it fires. - let d = createShared(CallbackData) - initCallbackData(d[]) - - check sendRequestToFFIThread( - ctx, SyncBlockingRequest.ffiNewReq(testCallback, d) - ).isOk() - - # Block until the FFI handler has signalled that os.sleep is about to start. - # This guarantees destroyFFIContext is called while the event loop is frozen. - discard gSyncBlockStarted.recv() - - # Destroy must return promptly even though the event loop is frozen for 5s. - # It deliberately returns err and leaks ctx in this scenario rather than - # hanging on joinThread. - let t0 = Moment.now() - check pool.destroyFFIContext(ctx).isErr() - check (Moment.now() - t0) < 3.seconds - - # Drain the leaked thread before the test scope ends. - # 1. waitCallback blocks until os.sleep(5_000) returns and handleRes - # invokes testCallback (~3.5s after destroy returned), which proves - # the leaked thread has reached the end of processRequest. - # 2. Yield briefly so the thread can finish iterating its while loop, - # fire threadExitSignal in its defer, and return. Without this, on - # Linux/Windows the still-live thread can race with the next test's - # createFFIContext under --mm:orc and segfault. - # ctx.cleanUpResources is intentionally NOT called: destroyFFIContext - # skipped it for a reason, and the signal fds are reclaimed by the OS - # at process exit. - waitCallback(d[]) - os.sleep(200) - deinitCallbackData(d[]) - freeShared(d) - -suite "destroyFFIContext refc workaround": - ## Documents the refc-specific workaround in cleanUpResources. - ## - ## Background: when the FFI thread does heavy ref-object work (the workload - ## that triggered the libwaku hang in production), the refc GC heap reaches - ## a state where the very first chronos Selector allocation on the *main* - ## thread — which happens lazily inside ThreadSignalPtr.close() through - ## getThreadDispatcher() — traps in rawNewObj. The refc signal handler - ## itself re-enters the same allocator and the process never returns. - ## Captured stack: - ## close → safeUnregisterAndCloseFd → getThreadDispatcher → - ## newDispatcher → Selector.new → newObj (gc.nim:488) → rawNewObj → - ## _sigtramp → signalHandler → newObjNoInit → addNewObjToZCT (loop) - ## - ## The workaround in cleanUpResources is `when defined(gcRefc): discard`, - ## i.e. skip the close() calls under refc only. orc is unaffected and - ## still cleans up the signal fds normally. - ## - ## NOTE: this test is documentation more than regression: a synthetic - ## ref-allocation workload of ~50k cells does NOT corrupt the refc heap - ## the way the real libwaku/libp2p teardown does, so this test passes - ## even when the workaround is disabled. Reproducing the actual hang - ## requires the full libwaku workload (logosdelivery_example.c). - ## Verification of the workaround was done end-to-end against that - ## example: with `--mm:refc` and close() enabled it hangs forever in - ## the captured stack above; with `when defined(gcRefc): discard` it - ## returns immediately. Under `--mm:orc` it returns immediately either - ## way. - test "destroy after heavy ref-allocation workload returns promptly": - var pool: FFIContextPool[TestLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - - var d: CallbackData - initCallbackData(d) - defer: deinitCallbackData(d) - - check sendRequestToFFIThread( - ctx, HeavyRefAllocRequest.ffiNewReq(testCallback, addr d) - ).isOk() - waitCallback(d) - check d.retCode == RET_OK - - let t0 = Moment.now() - check pool.destroyFFIContext(ctx).isOk() - check (Moment.now() - t0) < 3.seconds - -suite "sendRequestToFFIThread": - test "successful request triggers RET_OK callback": - var d: CallbackData - initCallbackData(d) - defer: - deinitCallbackData(d) - - var pool: FFIContextPool[TestLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - defer: - discard pool.destroyFFIContext(ctx) - - check sendRequestToFFIThread( - ctx, PingRequest.ffiNewReq(testCallback, addr d, "hello".cstring) - ) - .isOk() - waitCallback(d) - check d.retCode == RET_OK - check callbackMsg(d) == "pong:hello" - - test "failing request triggers RET_ERR callback": - var d: CallbackData - initCallbackData(d) - defer: - deinitCallbackData(d) - - var pool: FFIContextPool[TestLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - defer: - discard pool.destroyFFIContext(ctx) - - check sendRequestToFFIThread(ctx, FailRequest.ffiNewReq(testCallback, addr d)).isOk() - waitCallback(d) - check d.retCode == RET_ERR - - test "empty ok response delivers empty message": - var d: CallbackData - initCallbackData(d) - defer: - deinitCallbackData(d) - - var pool: FFIContextPool[TestLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - defer: - discard pool.destroyFFIContext(ctx) - - check sendRequestToFFIThread(ctx, EmptyOkRequest.ffiNewReq(testCallback, addr d)) - .isOk() - waitCallback(d) - check d.retCode == RET_OK - check d.msgLen == 0 - - test "sequential requests are all processed": - var pool: FFIContextPool[TestLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - defer: - discard pool.destroyFFIContext(ctx) - - for i in 1 .. 5: - var d: CallbackData - initCallbackData(d) - let msg = "msg" & $i - check sendRequestToFFIThread( - ctx, PingRequest.ffiNewReq(testCallback, addr d, msg.cstring) - ) - .isOk() - waitCallback(d) - deinitCallbackData(d) - check d.retCode == RET_OK - check callbackMsg(d) == "pong:" & msg - -# --------------------------------------------------------------------------- -# ffiCtor macro integration test -# --------------------------------------------------------------------------- - -type SimpleLib = object - value: int - -ffiType: - type SimpleConfig = object - initialValue: int - -proc testlib_create*( - config: SimpleConfig -): Future[Result[SimpleLib, string]] {.ffiCtor.} = - return ok(SimpleLib(value: config.initialValue)) - -# Records the value of the library object the destructor body saw, so a test can -# confirm the user cleanup body ran with the right lib state before teardown. -var gDestroyedValue {.threadvar.}: int -proc testlib_destroy*(lib: SimpleLib) {.ffiDtor.} = - gDestroyedValue = lib.value - -suite "ffiCtor macro": - test "creates context and returns pointer via callback": - var d: CallbackData - initCallbackData(d) - defer: deinitCallbackData(d) - - let configJson = ffiSerialize(SimpleConfig(initialValue: 42)) - let ret = testlib_create(configJson.cstring, testCallback, addr d) - - check not ret.isNil() - - waitCallback(d) - - check d.retCode == RET_OK - - # The callback message is the ctx address as a decimal string - let addrStr = callbackMsg(d) - check addrStr.len > 0 - - let ctxAddr = cast[uint](parseBiggestUInt(addrStr)) - check ctxAddr != 0 - let ctx = cast[ptr FFIContext[SimpleLib]](ctxAddr) - - # Verify the library was properly initialized - check not ctx[].myLib.isNil - check ctx[].myLib[].value == 42 - - check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() - -proc createSimpleLib(initialValue: int): ptr FFIContext[SimpleLib] = - ## Helper: run the generated async ctor and return the live ctx. - var d: CallbackData - initCallbackData(d) - defer: - deinitCallbackData(d) - let ret = - testlib_create(ffiSerialize(SimpleConfig(initialValue: initialValue)).cstring, - testCallback, addr d) - doAssert not ret.isNil() - waitCallback(d) - doAssert d.retCode == RET_OK - return cast[ptr FFIContext[SimpleLib]](cast[uint](parseBiggestUInt(callbackMsg(d)))) - -suite "ffiDtor macro (async destroy + reuse)": - test "destroy fires RET_OK after teardown, frees myLib, and frees the context": - let ctx = createSimpleLib(5) - check not ctx[].myLib.isNil - check ctx[].myLib[].value == 5 - - var dD: CallbackData - initCallbackData(dD) - defer: - deinitCallbackData(dD) - - # Async destroy: the C return is just "accepted"; the real outcome arrives - # via the callback once the FFI thread has finished tearing the lib down. - check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK - waitCallback(dD) - check dD.retCode == RET_OK - check gDestroyedValue == 5 # the user cleanup body saw the live lib - check ctx[].myLib.isNil() # freed on the FFI thread - - # The context was freed from the FFI thread, so a fresh create reclaims it. - let ctx2 = createSimpleLib(9) - check ctx2 == ctx # same context, reused worker + fds - check ctx2[].myLib[].value == 9 - check SimpleLibFFIPool.destroyFFIContext(ctx2).isOk() - - test "destroy waits for an in-flight request before reporting RET_OK": - let ctx = createSimpleLib(1) - - # Dispatch a 500 ms handler and do NOT wait — it is in flight at destroy time. - var slow: CallbackData - initCallbackData(slow) - defer: - deinitCallbackData(slow) - check sendRequestToFFIThread(ctx, SlowRequest.ffiNewReq(testCallback, addr slow)).isOk() - - var dD: CallbackData - initCallbackData(dD) - defer: - deinitCallbackData(dD) - check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK - waitCallback(dD) - check dD.retCode == RET_OK - # Drained: the in-flight handler ran to completion before destroy reported OK. - check slow.called - check callbackMsg(slow) == "slow-done" - - let ctx2 = createSimpleLib(2) - check ctx2 == ctx - check SimpleLibFFIPool.destroyFFIContext(ctx2).isOk() - - test "requests are rejected once a destroy closes the gate": - let ctx = createSimpleLib(3) - - var dD: CallbackData - initCallbackData(dD) - defer: - deinitCallbackData(dD) - check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK - waitCallback(dD) - check dD.retCode == RET_OK - - # Gate stays closed until the context is reacquired: a late request must not - # dispatch onto a context about to be (or already) reused. - var d: CallbackData - initCallbackData(d) - defer: - deinitCallbackData(d) - check sendRequestToFFIThread( - ctx, SlowRequest.ffiNewReq(testCallback, addr d) - ).isErr() - - let ctx2 = createSimpleLib(4) - check ctx2 == ctx - check SimpleLibFFIPool.destroyFFIContext(ctx2).isOk() - - test "a stuck context is reported as RET_ERR rather than hanging": - let ctx = createSimpleLib(8) - - let savedTimeout = RecycleTimeout - RecycleTimeout = 150.milliseconds - defer: - RecycleTimeout = savedTimeout - - # In-flight handler outlasts the (shortened) drain timeout. - var slow: CallbackData - initCallbackData(slow) - defer: - deinitCallbackData(slow) - check sendRequestToFFIThread(ctx, SlowRequest.ffiNewReq(testCallback, addr slow)).isOk() - - var dD: CallbackData - initCallbackData(dD) - defer: - deinitCallbackData(dD) - check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK - waitCallback(dD) - check dD.retCode == RET_ERR # drain timed out -> ctx reported stuck - - # The stuck context is leaked (not reused); the handler still finishes on its - # own. Wait for it, then fully tear the leaked context down. - waitCallback(slow) - check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() - -# --------------------------------------------------------------------------- -# Simplified .ffi. macro integration test -# --------------------------------------------------------------------------- - -ffiType: - type SendConfig = object - message: string - -proc testlib_send*( - lib: SimpleLib, cfg: SendConfig -): Future[Result[string, string]] {.ffi.} = - return ok("echo:" & cfg.message & ":" & $lib.value) - -suite "simplified .ffi. macro": - test "sends request and gets serialized response via callback": - # First create a context using ffiCtor - var ctorD: CallbackData - initCallbackData(ctorD) - defer: deinitCallbackData(ctorD) - - let configJson = ffiSerialize(SimpleConfig(initialValue: 7)) - let ctorRet = testlib_create(configJson.cstring, testCallback, addr ctorD) - check not ctorRet.isNil() - - waitCallback(ctorD) - check ctorD.retCode == RET_OK - - let addrStr = callbackMsg(ctorD) - check addrStr.len > 0 - - let ctxAddr = cast[uint](parseBiggestUInt(addrStr)) - check ctxAddr != 0 - let ctx = cast[ptr FFIContext[SimpleLib]](ctxAddr) - defer: check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() - - # Now call the .ffi. proc - var d: CallbackData - initCallbackData(d) - defer: deinitCallbackData(d) - - let cfgJson = ffiSerialize(SendConfig(message: "hello")) - let ret = testlib_send(ctx, testCallback, addr d, cfgJson.cstring) - check ret == RET_OK - - waitCallback(d) - check d.retCode == RET_OK - - let receivedMsg = callbackMsg(d) - let decoded = ffiDeserialize(receivedMsg.cstring, string).valueOr: - check false - "" - check decoded == "echo:hello:7" - -# --------------------------------------------------------------------------- -# async/sync detection in .ffi. macro integration test -# --------------------------------------------------------------------------- - -# Sync proc (no await in body) — macro detects this and bypasses thread machinery -proc testlib_version*( - lib: SimpleLib -): Future[Result[string, string]] {.ffi.} = - return ok("v" & $lib.value) - -suite "async/sync detection in .ffi.": - test "sync proc invokes callback without thread hop": - # Create a context using ffiCtor - var ctorD: CallbackData - initCallbackData(ctorD) - defer: deinitCallbackData(ctorD) - - let configJson = ffiSerialize(SimpleConfig(initialValue: 3)) - let ctorRet = testlib_create(configJson.cstring, testCallback, addr ctorD) - check not ctorRet.isNil() - - waitCallback(ctorD) - check ctorD.retCode == RET_OK - - let addrStr = callbackMsg(ctorD) - check addrStr.len > 0 - - let ctxAddr = cast[uint](parseBiggestUInt(addrStr)) - check ctxAddr != 0 - let ctx = cast[ptr FFIContext[SimpleLib]](ctxAddr) - defer: check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() - - var d2: CallbackData - initCallbackData(d2) - defer: deinitCallbackData(d2) - - # Call sync proc — callback should fire before the proc returns (no thread hop) - let ret = testlib_version(ctx, testCallback, addr d2) - # No sleep needed: sync path fires callback inline before returning - check ret == RET_OK - check d2.called # fires synchronously — no waitCallback needed - check d2.retCode == RET_OK - let receivedMsg = callbackMsg(d2) - let decoded = ffiDeserialize(receivedMsg.cstring, string).valueOr: - check false - "" - check decoded == "v3" - -# --------------------------------------------------------------------------- -# ptr T return type in .ffi. macro integration test -# --------------------------------------------------------------------------- - -type Handle = object - data: string - -ffiType: - type NameParam = object - name: string - -proc testlib_alloc_handle*( - lib: SimpleLib, np: NameParam -): Future[Result[ptr Handle, string]] {.ffi.} = - let h = createShared(Handle) - h[] = Handle(data: np.name & ":" & $lib.value) - return ok(h) - -proc testlib_read_handle*( - lib: SimpleLib, handle: pointer -): Future[Result[string, string]] {.ffi.} = - let h = cast[ptr Handle](handle) - return ok(h[].data) - -proc testlib_free_handle*( - lib: SimpleLib, handle: pointer -): Future[Result[string, string]] {.ffi.} = - let h = cast[ptr Handle](handle) - deallocShared(h) - return ok("freed") - -suite "ptr return type in .ffi.": - test "returns a heap-allocated handle and reads it back": - # Create context via ffiCtor - var ctorD: CallbackData - initCallbackData(ctorD) - defer: deinitCallbackData(ctorD) - - let configJson = ffiSerialize(SimpleConfig(initialValue: 5)) - let ctorRet = testlib_create(configJson.cstring, testCallback, addr ctorD) - check not ctorRet.isNil() - - waitCallback(ctorD) - check ctorD.retCode == RET_OK - - let ctxAddrStr = callbackMsg(ctorD) - check ctxAddrStr.len > 0 - let ctxAddr = cast[uint](parseBiggestUInt(ctxAddrStr)) - check ctxAddr != 0 - let ctx = cast[ptr FFIContext[SimpleLib]](ctxAddr) - defer: check SimpleLibFFIPool.destroyFFIContext(ctx).isOk() - - # Alloc a handle - var allocD: CallbackData - initCallbackData(allocD) - defer: deinitCallbackData(allocD) - - let npJson = ffiSerialize(NameParam(name: "test")) - let allocRet = testlib_alloc_handle(ctx, testCallback, addr allocD, npJson.cstring) - check allocRet == RET_OK - - waitCallback(allocD) - check allocD.retCode == RET_OK - - let handleAddrStr = callbackMsg(allocD) - check handleAddrStr.len > 0 - let handleAddr = parseBiggestUInt(handleAddrStr) - check handleAddr != 0 - - # Read the handle back - var readD: CallbackData - initCallbackData(readD) - defer: deinitCallbackData(readD) - - let handleJson = ffiSerialize(cast[pointer](handleAddr)) - let readRet = testlib_read_handle(ctx, testCallback, addr readD, handleJson.cstring) - check readRet == RET_OK - - waitCallback(readD) - check readD.retCode == RET_OK - - let readMsg = callbackMsg(readD) - let decodedStr = ffiDeserialize(readMsg.cstring, string).valueOr: - check false - "" - check decodedStr == "test:5" - - # Free the handle - var freeD: CallbackData - initCallbackData(freeD) - defer: deinitCallbackData(freeD) - - let freeRet = testlib_free_handle(ctx, testCallback, addr freeD, handleJson.cstring) - check freeRet == RET_OK - - waitCallback(freeD) - check freeD.retCode == RET_OK - -# --------------------------------------------------------------------------- -# releaseFFIContext: park & reuse (fd-leak regression) -# --------------------------------------------------------------------------- - -proc countOpenFds(): int = - ## Number of open fds for this process, or -1 if not determinable on this - ## platform. On Linux we count /proc/self/fd; elsewhere we shell out to lsof - ## (skipped if lsof is unavailable, e.g. Windows). - when defined(linux): - var n = 0 - for _ in walkDir("/proc/self/fd"): - inc n - return n - else: - if findExe("lsof").len == 0: - return -1 - try: - let output = - execProcess("lsof", args = ["-p", $getCurrentProcessId()], options = {poUsePath}) - return output.splitLines().countIt(it.len > 0) - except CatchableError: - return -1 - -proc releaseAndWait[T](ctx: ptr FFIContext[T]): cint = - ## Test helper mirroring how a C consumer destroys a context: kick off the - ## (non-blocking) teardown and block on the callback, returning its retCode. - ## RET_OK means the lib's in-flight tasks finished and the context was parked. - var d: CallbackData - initCallbackData(d) - defer: - deinitCallbackData(d) - if ctx.releaseFFIContext(testCallback, addr d).isErr(): - return RET_ERR - waitCallback(d) - return d.retCode - -suite "releaseFFIContext (park & reuse)": - test "park returns the context and reuses the same live worker": - var pool: FFIContextPool[TestLib] - let ctx1 = pool.createFFIContext().valueOr: - check false - return - check ctx1.releaseAndWait() == RET_OK - - # Reacquire: must be the same context, with its worker still running. - let ctx2 = pool.createFFIContext().valueOr: - check false - return - check ctx1 == ctx2 - - var d: CallbackData - initCallbackData(d) - defer: - deinitCallbackData(d) - check sendRequestToFFIThread( - ctx2, PingRequest.ffiNewReq(testCallback, addr d, "reuse".cstring) - ).isOk() - waitCallback(d) - check d.retCode == RET_OK - check callbackMsg(d) == "pong:reuse" # reused worker still processes requests - - check pool.destroyFFIContext(ctx2).isOk() - - test "park drops the stale event callback and library pointer": - var pool: FFIContextPool[TestLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - ctx.callbackState.callback = cast[pointer](testCallback) - ctx.callbackState.userData = cast[pointer](0xDEAD) - - check ctx.releaseAndWait() == RET_OK - check ctx.callbackState.callback.isNil() # a watchdog tick can't call a freed cb - check ctx.callbackState.userData.isNil() - check ctx.myLib.isNil() - - check pool.destroyFFIContext(ctx).isOk() - - test "fd usage stays bounded across many park/reuse cycles": - if countOpenFds() < 0: - skip() # no fd-counting facility on this platform - else: - var pool: FFIContextPool[TestLib] - - # Warm up: the first create builds the context's worker (its fds are allocated - # once here); parking keeps them open for reuse. - block: - let ctx = pool.createFFIContext().valueOr: - check false - return - check ctx.releaseAndWait() == RET_OK - - let baseline = countOpenFds() - - for _ in 0 ..< 20: - let ctx = pool.createFFIContext().valueOr: - check false - return - var d: CallbackData - initCallbackData(d) - check sendRequestToFFIThread( - ctx, PingRequest.ffiNewReq(testCallback, addr d, "x".cstring) - ).isOk() - waitCallback(d) - deinitCallbackData(d) - check ctx.releaseAndWait() == RET_OK - - let afterCycles = countOpenFds() - # Reuse must not grow fds. Before the fix each cycle leaked ~10 fds (4 - # ThreadSignalPtr socketpairs + 2 dispatcher kqueues); the small slack - # only tolerates unrelated runtime fd noise, not a per-cycle leak. - check afterCycles <= baseline + 5 - - # Tear the (still parked) context's worker down so the test leaves no threads. - let last = pool.createFFIContext().valueOr: - check false - return - check pool.destroyFFIContext(last).isOk()