diff --git a/CHANGELOG.md b/CHANGELOG.md index 16277fa..805b8fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,26 @@ All notable changes to this project are documented in this file. +## [Unreleased] + +### Changed +- User event callbacks now run on a dedicated event thread fed by a + bounded SPSC queue (default capacity 1024), so a slow listener can no + longer block the FFI thread or concurrent `add_event_listener` / + `remove_event_listener` calls + ([#6](https://github.com/logos-messaging/nim-ffi/issues/6)). +- Replaced the dedicated watchdog thread with a heartbeat check that + runs on the event thread. The FFI thread advances an atomic heartbeat + each loop iteration; if it stalls for more than 1s past the start-up + grace window, the event thread emits the `not_responding` event. + +### Added +- Queue-overflow handling: when the bounded event queue is full, the + library sets a sticky "stuck" flag, logs an error, fires + `not_responding` from the event thread, and rejects subsequent + `sendRequestToFFIThread` calls with `event queue stuck - library + cannot accept new requests`. + ## [0.2.0] - 2026-06-04 Major release introducing the CBOR-based wire format, CBOR-backed FFI events diff --git a/ffi/event_thread.nim b/ffi/event_thread.nim index abe173a..e1037f5 100644 --- a/ffi/event_thread.nim +++ b/ffi/event_thread.nim @@ -5,7 +5,7 @@ ## so each thread's machinery is readable on its own. ## ## Responsibilities: -## - Drain queued events into listener callbacks (queue producer lands in PR #69). +## - Drain queued events into listener callbacks. ## - Watch `ctx.ffiHeartbeat` and emit `NotRespondingEvent` / `RespondingEvent` ## on FFI-thread stall and recovery transitions. @@ -37,14 +37,13 @@ proc dispatchToListeners[T]( ) proc emitLivenessEvent[T, P](ctx: ptr FFIContext[T], name: string, payload: P) = - ## Encodes a zero-field liveness event (`NotRespondingEvent`, - ## `RespondingEvent`) and dispatches it directly to listeners, bypassing - ## the event queue (which may itself be wedged). Runs on the event thread. + ## Encodes a liveness event and dispatches directly to listeners (bypassing + ## the queue, which may be wedged). Runs on the event thread. let event = try: EventEnvelope[P](eventType: name, payload: payload).cborEncode() - except CatchableError as exc: - chronicles.error "liveness event encode failed", name = name, err = exc.msg + except CatchableError as e: + chronicles.error "liveness event encode failed", name = name, err = e.msg return let dataPtr: pointer = if event.len > 0: @@ -57,18 +56,14 @@ proc onNotResponding*(ctx: ptr FFIContext) = emitLivenessEvent(ctx, NotRespondingEventName, NotRespondingEvent()) proc onResponding*(ctx: ptr FFIContext) = - ## Fired once when the FFI thread's heartbeat starts advancing again - ## after a `NotRespondingEvent`. Lets consumers clear any "library - ## hung" UI state without polling. + ## Fired once when the heartbeat resumes after a NotRespondingEvent. + ## Lets consumers clear any "library hung" UI state without polling. emitLivenessEvent(ctx, RespondingEventName, RespondingEvent()) proc dispatchQueuedEvent[T](ctx: ptr FFIContext[T], qe: QueuedEvent) = ## Frees `qe`'s c_malloc buffers on exit. defer: - if not qe.name.isNil(): - c_free(cast[pointer](qe.name)) - if not qe.data.isNil(): - c_free(qe.data) + freeEventBuffers(qe.name, qe.data) ctx.dispatchToListeners($qe.name, qe.data, qe.dataLen) proc drainEventQueue[T](ctx: ptr FFIContext[T]) = @@ -94,10 +89,8 @@ proc init(T: type HeartbeatMonitor, ctx: ptr FFIContext): T = ) proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) = - ## Fires `onNotResponding` once the FFI thread's heartbeat counter stops - ## advancing past the stale threshold, and fires `onResponding` once it - ## starts advancing again. Both transitions latch so each is emitted at - ## most once per stall episode. + ## Fires onNotResponding / onResponding on heartbeat stall / recovery. + ## Both transitions latch — each fires at most once per stall episode. if Moment.now() - hb.startedAt <= FFIHeartbeatStartDelay: return let cur = ctx.ffiHeartbeat.load() @@ -113,14 +106,19 @@ proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) = proc eventRun[T](ctx: ptr FFIContext[T]) {.async.} = var hb = HeartbeatMonitor.init(ctx) + var notifiedStuck = false # latched forever — eventQueueStuck is sticky terminal. while ctx.running.load(): - # Wake on enqueue or tick — whichever first. The enqueue path lands in PR #69; - # until then the wait always times out and we fall through to the heartbeat check. + # Wake on enqueue or tick — whichever first. discard await ctx.eventQueueSignal.wait().withTimeout(EventThreadTickInterval) ctx.drainEventQueue() + # Fire after drain so reg.lock is free — FFI-thread would deadlock here. + if not notifiedStuck and ctx.eventQueueStuck.load(): + onNotResponding(ctx) + notifiedStuck = true + if not ctx.running.load(): break hb.check(ctx) @@ -135,5 +133,5 @@ proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = try: waitFor eventRun(ctx) - except CatchableError as exc: - error "event thread exited with exception", error = exc.msg + except CatchableError as e: + error "event thread exited with exception", error = e.msg diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index a4ecaef..3fa50cf 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -6,122 +6,87 @@ {.passc: "-fPIC".} -import system/ansi_c import std/[atomics, locks, options, tables] import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results -import - ./ffi_types, - ./ffi_events, - ./ffi_thread_request, - ./internal/ffi_macro, - ./logging, - ./cbor_serial +import ./ffi_types, ./ffi_events, ./ffi_thread_request, ./logging, ./cbor_serial export ffi_events type FFIContext*[T] = object - myLib*: ptr T - # main library object (e.g., Waku, LibP2P, SDS, the one to be exposed as a library) + myLib*: ptr T # main library object (Waku, LibP2P, SDS, …) ffiThread: Thread[(ptr FFIContext[T])] - # represents the main FFI thread in charge of attending API consumer actions eventThread: Thread[(ptr FFIContext[T])] - # drains the event queue and runs the FFI-thread heartbeat check lock: Lock reqChannel: ChannelSPSCSingle[ptr FFIThreadRequest] - reqSignal: ThreadSignalPtr # to notify the FFI Thread that a new request is sent + reqSignal: ThreadSignalPtr reqReceivedSignal: ThreadSignalPtr - # to signal main thread, interfacing with the FFI thread, that FFI thread received the request stopSignal: ThreadSignalPtr threadExitSignal: ThreadSignalPtr # bounds destroyFFIContext's wait so a blocked loop cannot hang the caller - eventQueueSignal: ThreadSignalPtr - # wakes the event thread on enqueue (used once dispatch is rewired in PR #69) + eventQueueSignal: ThreadSignalPtr # wakes the event thread on enqueue eventThreadExitSignal: ThreadSignalPtr # mirrors threadExitSignal for the event thread userData*: pointer eventRegistry*: FFIEventRegistry eventQueue*: EventQueue ffiHeartbeat*: Atomic[int64] # advanced each FFI-thread loop; event thread reads for liveness + eventQueueStuck*: Atomic[bool] # sticky overflow flag running: Atomic[bool] # To control when the threads are running registeredRequests: ptr Table[cstring, FFIRequestProc] - # Pointer to with the registered requests at compile time var onFFIThread* {.threadvar.}: bool - ## True while executing inside `ffiThreadBody`. Used by - ## `sendRequestToFFIThread` to detect re-entrant dispatch from a handler - ## (which would self-deadlock on `reqReceivedSignal`). + # Re-entrant dispatch guard for `sendRequestToFFIThread`. const git_version* {.strdefine.} = "n/a" const - EventThreadTickInterval* = 1.seconds # bounds idle heartbeat check latency + EventThreadTickInterval* = 1.seconds FFIHeartbeatStartDelay* = 10.seconds # grace window for library startup FFIHeartbeatStaleThreshold* = 1.seconds include ./event_thread include ./ffi_thread +template closeAndNil(field: untyped) = + if not field.isNil(): + ?field.close() + field = nil + proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = - ## Mirror of `initContextResources`: tears down lock, registry, queue, - ## and signal fds in place. Threads MUST already be joined. Caller owns - ## the memory holding `ctx`. Fields are nil'd after close so a re-init - ## on the same slot doesn't double-close. + ## Mirror of `initContextResources`. Threads MUST be joined first; + ## fields are nil'd after close so re-init on the same slot is safe. ctx.lock.deinitLock() deinitEventRegistry(ctx[].eventRegistry) deinitEventQueue(ctx[].eventQueue) when defined(gcRefc): - ## ThreadSignalPtr.close() is intentionally skipped under --mm:refc. - ## - ## close() goes through chronos's safeUnregisterAndCloseFd, which calls - ## getThreadDispatcher() and lazily allocates a new Selector for the - ## main thread. With refc and a heavy ref-object graph torn down by the - ## FFI thread (libwaku/libp2p), that allocation traps inside rawNewObj - ## and the refc signal handler re-enters the same allocator — the - ## process never returns. Captured stack from a hung process: - ## close → safeUnregisterAndCloseFd → getThreadDispatcher → - ## newDispatcher → Selector.new → newObj (gc.nim:488) → - ## rawNewObj (gc.nim:470) → rawNewObj → _sigtramp → signalHandler → - ## newObjNoInit → addNewObjToZCT (infinite re-entry) - ## - ## --mm:orc does NOT exhibit this bug; see the - ## "destroyFFIContext refc workaround" suite in tests/test_ffi_context.nim - ## (test "destroy after heavy ref-allocation workload returns promptly"). - ## The signal fds (a few per ctx) are reclaimed by the OS at process - ## exit; destroyFFIContext is called once per process lifetime, so the - ## leak is bounded. + # ThreadSignalPtr.close() under refc traps in safeUnregisterAndCloseFd + # → newDispatcher → rawNewObj → signal-handler re-entry (process hangs). + # See tests/test_ffi_context.nim "destroyFFIContext refc workaround". + # Fd leak is bounded — destroy runs once per process lifetime. discard else: - if not ctx.reqSignal.isNil(): - ?ctx.reqSignal.close() - ctx.reqSignal = nil - if not ctx.reqReceivedSignal.isNil(): - ?ctx.reqReceivedSignal.close() - ctx.reqReceivedSignal = nil - if not ctx.stopSignal.isNil(): - ?ctx.stopSignal.close() - ctx.stopSignal = nil - if not ctx.threadExitSignal.isNil(): - ?ctx.threadExitSignal.close() - ctx.threadExitSignal = nil - if not ctx.eventQueueSignal.isNil(): - ?ctx.eventQueueSignal.close() - ctx.eventQueueSignal = nil - if not ctx.eventThreadExitSignal.isNil(): - ?ctx.eventThreadExitSignal.close() - ctx.eventThreadExitSignal = nil - return ok() + closeAndNil(ctx.reqSignal) + closeAndNil(ctx.reqReceivedSignal) + closeAndNil(ctx.stopSignal) + closeAndNil(ctx.threadExitSignal) + closeAndNil(ctx.eventQueueSignal) + closeAndNil(ctx.eventThreadExitSignal) + ok() proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = - ## Full cleanup for heap-allocated contexts: closes all resources and frees memory. + ## Deinit + free for heap-allocated contexts. defer: freeShared(ctx) ctx.deinitContextResources() +template newSignalOrErr(field: untyped, name: string) = + field = ThreadSignalPtr.new().valueOr: + return err("couldn't create ThreadSignalPtr: " & name & ": " & $error) + proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = - ## Initialises all resources inside an already-allocated FFIContext slot. - ## On failure every partially-initialised resource is closed; the caller - ## is responsible for releasing the slot (freeShared or pool.releaseSlot). - # Defensive nil: deferred cleanup must never double-close stale pointers on a reused pool slot. + ## On failure, the deferred cleanup closes partial state; caller releases + ## the slot (freeShared or pool.releaseSlot). + # Nil first so deferred cleanup can't double-close a reused pool slot. ctx.reqSignal = nil ctx.reqReceivedSignal = nil ctx.stopSignal = nil @@ -132,6 +97,7 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = initEventRegistry(ctx[].eventRegistry) initEventQueue(ctx[].eventQueue) ctx.ffiHeartbeat.store(0) + ctx.eventQueueStuck.store(false) var success = false defer: @@ -140,23 +106,12 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = error "failed to clean up resources after createFFIContext failure", error = error - ctx.reqSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create reqSignal ThreadSignalPtr: " & $error) - - ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create reqReceivedSignal ThreadSignalPtr: " & $error) - - ctx.stopSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create stopSignal ThreadSignalPtr: " & $error) - - ctx.threadExitSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create threadExitSignal ThreadSignalPtr: " & $error) - - ctx.eventQueueSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create eventQueueSignal ThreadSignalPtr: " & $error) - - ctx.eventThreadExitSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create eventThreadExitSignal ThreadSignalPtr: " & $error) + newSignalOrErr(ctx.reqSignal, "reqSignal") + newSignalOrErr(ctx.reqReceivedSignal, "reqReceivedSignal") + newSignalOrErr(ctx.stopSignal, "stopSignal") + newSignalOrErr(ctx.threadExitSignal, "threadExitSignal") + newSignalOrErr(ctx.eventQueueSignal, "eventQueueSignal") + newSignalOrErr(ctx.eventThreadExitSignal, "eventThreadExitSignal") ctx.registeredRequests = addr ffi_types.registeredRequests @@ -170,8 +125,7 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = try: createThread(ctx.eventThread, eventThreadBody[T], ctx) except ValueError, ResourceExhaustedError: - ## ffiThread is already running; signal it to exit and join before the - ## deferred cleanUpResources closes the signals it's waiting on. + # Join ffiThread before deferred cleanup closes signals it's waiting on. ctx.running.store(false) let fireRes = ctx.reqSignal.fireSync() if fireRes.isErr(): @@ -181,67 +135,55 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = return err("failed to create the event thread: " & getCurrentExceptionMsg()) success = true - return ok() + ok() + +proc fireOrErr(sig: ThreadSignalPtr, name: string): Result[void, string] = + let fired = sig.fireSync().valueOr: + return err("error signaling: " & name & ": " & $error) + if not fired: + return err("failed to signal: " & name & " on time") + ok() + +proc waitExitOrErr( + sig: ThreadSignalPtr, name: string, timeout: Duration +): Result[void, string] = + let exited = sig.waitSync(timeout).valueOr: + return err("error waiting for exit: " & name & ": " & $error) + if not exited: + return err("did not exit in time: " & name & " (leaking ctx to avoid hang)") + ok() proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] = - # Error paths intentionally skip onNotResponding: a back-pressuring - # listener may hold reg.lock, and onNotResponding takes it — would - # amplify the stuck state into a deadlock instead of escaping it. + # Skip onNotResponding on error: it takes reg.lock, which a back-pressuring + # listener may hold — would deepen the stuck state into a deadlock. ctx.running.store(false) - let reqSignaled = ctx.reqSignal.fireSync().valueOr: - return err("error signaling reqSignal in signalStop: " & $error) - if not reqSignaled: - return err("failed to signal reqSignal on time in signalStop") - let stopSignaled = ctx.stopSignal.fireSync().valueOr: - return err("error signaling stopSignal in signalStop: " & $error) - if not stopSignaled: - return err("failed to signal stopSignal on time in signalStop") - # Non-fatal: event thread will see running==false on the next tick. - let evtSignaled = ctx.eventQueueSignal.fireSync() - if evtSignaled.isErr(): - error "failed to signal eventQueueSignal in signalStop", error = evtSignaled.error - elif evtSignaled.get() == false: - error "failed to signal eventQueueSignal on time in signalStop" - return ok() + ?ctx.reqSignal.fireOrErr("reqSignal") + ?ctx.stopSignal.fireOrErr("stopSignal") + # Non-fatal: event thread sees running==false on the next tick anyway. + ctx.eventQueueSignal.fireOrErr("eventQueueSignal").isOkOr: + error "failed to signal eventQueueSignal in signalStop", error = error + ok() -## If the FFI thread's event loop is blocked by a synchronous handler -## (e.g. blocking I/O), it cannot process reqSignal in time to exit. -## clearContext waits on threadExitSignal up to this bound; on timeout it -## returns err and skips joinThread/cleanup (leaking the thread + ctx slot) -## rather than hanging the caller forever. +## Bound on how long clearContext waits for the FFI thread to exit before +## leaking ctx rather than hanging the caller. const ThreadExitTimeout* = 1500.milliseconds proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] = - ## Signals both threads to stop, waits up to ThreadExitTimeout per thread, - ## and joins them. On timeout returns err and skips remaining joins - ## (leaving the threads live) rather than hanging the caller. Resource - ## cleanup is the caller's responsibility. - ## - ## Timeout paths skip onNotResponding for the same reason signalStop does. + ## On timeout, returns err and skips remaining joins (leaves threads live). + ## Caller owns resource cleanup. Skips onNotResponding (same reason as signalStop). ctx.signalStop().isOkOr: return err("signalStop failed: " & $error) - let ffiExitedOnTime = ctx.threadExitSignal.waitSync(ThreadExitTimeout).valueOr: - return err("error waiting for FFI thread exit: " & $error) - - if not ffiExitedOnTime: - return err("FFI thread did not exit in time; leaking ctx to avoid hang") - + ?ctx.threadExitSignal.waitExitOrErr("FFI thread", ThreadExitTimeout) joinThread(ctx.ffiThread) - - let evtExitedOnTime = ctx.eventThreadExitSignal.waitSync(ThreadExitTimeout).valueOr: - return err("error waiting for event thread exit: " & $error) - - if not evtExitedOnTime: - return err("event thread did not exit in time; leaking ctx to avoid hang") - + ?ctx.eventThreadExitSignal.waitExitOrErr("event thread", ThreadExitTimeout) joinThread(ctx.eventThread) - return ok() + ok() proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] = - ## Stops the FFI context that was created via createFFIContext[T]() (heap). + ## Stops a heap-allocated FFI context. ctx.stopAndJoinThreads().isOkOr: return err("clearContext: " & $error) ctx.cleanUpResources().isOkOr: return err("cleanUpResources failed: " & $error) - return ok() + ok() diff --git a/ffi/ffi_context_pool.nim b/ffi/ffi_context_pool.nim index a92ef6e..5e2d2cb 100644 --- a/ffi/ffi_context_pool.nim +++ b/ffi/ffi_context_pool.nim @@ -3,14 +3,10 @@ import results import ./ffi_context const MaxFFIContexts* = 32 - ## Maximum number of concurrently live FFI contexts when using FFIContextPool. - ## Fds and threads are only consumed for slots that are actually acquired, - ## so this value only affects the upfront memory of the pool array. + # Only affects upfront pool memory; fds/threads consumed per acquired slot. type FFIContextPool*[T] = object - ## Fixed-size pool of FFI contexts. Avoids dynamic heap allocation per context - ## and bounds the total number of file descriptors consumed by ThreadSignalPtrs - ## to at most MaxFFIContexts * 2. + ## Fixed pool. Bounds ThreadSignalPtr fds at MaxFFIContexts * 2. slots: array[MaxFFIContexts, FFIContext[T]] inUse: array[MaxFFIContexts, Atomic[bool]] @@ -19,7 +15,7 @@ proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], stri var expected = false if pool.inUse[i].compareExchange(expected, true): return ok(pool.slots[i].addr) - return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") + err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) = for i in 0 ..< MaxFFIContexts: @@ -30,39 +26,31 @@ proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) = proc createFFIContext*[T]( pool: var FFIContextPool[T] ): Result[ptr FFIContext[T], string] = - ## Acquires a slot from the fixed pool and initialises it as an FFI context. - ## Bounded fd usage: at most MaxFFIContexts * 2 ThreadSignalPtr fds are ever open. let ctx = pool.acquireSlot().valueOr: return err("createFFIContext: acquireSlot failed: " & $error) initContextResources(ctx).isOkOr: pool.releaseSlot(ctx) return err("createFFIContext: initContextResources failed: " & $error) - return ok(ctx) + ok(ctx) proc destroyFFIContext*[T]( pool: var FFIContextPool[T], ctx: ptr FFIContext[T] ): Result[void, string] = - ## Stops the FFI context and returns its slot to the pool. If the FFI thread - ## is blocked and does not exit in time, the slot is leaked rather than - ## reclaimed — closing its resources while the thread is still live would be - ## unsafe. + ## On thread-exit timeout the slot is leaked — closing live-thread resources is unsafe. ctx.stopAndJoinThreads().isOkOr: return err("destroyFFIContext(pool): " & $error) - # Without this, the next acquisition would re-init an already-initialised - # lock (UB) and leak the previous signal fds. + # Required: next acquisition would otherwise re-init a live lock (UB). let deinitRes = ctx.deinitContextResources() pool.releaseSlot(ctx) deinitRes.isOkOr: return err("destroyFFIContext(pool): " & $error) - return ok() + ok() proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool = - ## Returns true only if ctx points to one of the pool's slots that is - ## currently in use. Rejects nil, offset-invalid, and dangling pointers - ## at the API boundary, preventing use-after-free dereferences. + ## Rejects nil / offset-invalid / dangling pointers at the API boundary. if ctx.isNil(): return false for i in 0 ..< MaxFFIContexts: if cast[pointer](pool.slots[i].addr) == ctx: return pool.inUse[i].load() - return false + false diff --git a/ffi/ffi_events.nim b/ffi/ffi_events.nim index 529a6f3..ba5b3d0 100644 --- a/ffi/ffi_events.nim +++ b/ffi/ffi_events.nim @@ -1,19 +1,15 @@ -## Event registry, bounded SPSC event queue, and dispatch templates for -## FFI library-initiated events. Listeners receive only the event name -## they subscribed to. Queue payloads travel via `c_malloc` so transfer -## across Nim heaps is safe under both `--mm:orc` and `--mm:refc`. +## Per-context event registry + bounded SPSC queue. FFI thread enqueues, +## event thread drains; payloads travel via `c_malloc` so they survive +## pool-slot reuse across thread heaps. {.pragma: callback, cdecl, raises: [], gcsafe.} import system/ansi_c -import std/[locks, sequtils, options, tables] +import std/[atomics, locks, sequtils, options, tables] import chronicles -import ./ffi_types, ./cbor_serial +import ./ffi_types, ./cbor_serial, ./alloc -type EventEnvelope*[T] = object - ## Standard wire shape for CBOR-encoded FFI events: - ## { eventType: tstr, payload: } - ## Pair with `dispatchFFIEventCbor` (or call `cborEncode` directly). +type EventEnvelope*[T] = object ## CBOR wire shape: { eventType: tstr, payload: }. eventType*: string payload*: T @@ -24,32 +20,20 @@ type userData*: pointer FFIEventRegistry* = object - ## Per-context multi-listener registry. `lock` guards every mutation; - ## readers (dispatch path) acquire it only long enough to copy out the - ## listener slice for the event being dispatched. lock*: Lock - nextId*: uint64 ## Monotonic id source. 0 is reserved as "invalid"; ids start at 1. + nextId*: uint64 # 0 is reserved as "invalid"; ids start at 1. byEvent*: Table[string, seq[FFIEventListener]] proc initEventRegistry*(reg: var FFIEventRegistry) = - ## Must be called exactly once on the owning thread before the registry - ## is shared. The embedded `Lock` wraps a platform primitive that cannot - ## be safely double-initialised, so concurrent callers would hit UB at - ## the OS layer — the lock itself can't defend against its own init. + ## Must run once on the owning thread before sharing — `initLock` on a + ## live primitive is UB at the OS layer. reg.lock.initLock() reg.nextId = 0'u64 reg.byEvent = initTable[string, seq[FFIEventListener]]() proc deinitEventRegistry*(reg: var FFIEventRegistry) = - ## Mirror of `initEventRegistry`: must be called exactly once, by the - ## same thread that owns the registry, after all other threads have - ## stopped using it. `deinitLock` on a platform primitive that any - ## thread might still be holding or about to acquire is UB at the OS - ## layer. - ## - ## Resets the GC-managed fields to default so `FFIContextPool`'s - ## slot reuse on a *different* thread doesn't trigger Nim's hidden - ## assignment destructor against this thread's heap allocations. + ## Mirror of `initEventRegistry`; same single-thread constraint. Resets GC + ## fields so pool-slot reuse on another thread sees no hidden dtor. reg.lock.deinitLock() reg.byEvent = default(Table[string, seq[FFIEventListener]]) reg.nextId = 0'u64 @@ -60,11 +44,7 @@ proc addEventListener*( callback: FFICallBack, userData: pointer, ): uint64 {.raises: [].} = - ## Registers `callback` for `eventName` and returns the listener's stable - ## id (always non-zero on success). A listener only receives events - ## dispatched under its own `eventName` — subscribe to each event - ## separately. Returns 0 if `callback` is nil — the only documented - ## failure mode. + ## Returns the listener id (>0), or 0 if `callback` is nil. if callback.isNil(): return 0 @@ -79,10 +59,8 @@ proc addEventListener*( assigned proc removeEventListener*(reg: var FFIEventRegistry, id: uint64): bool {.raises: [].} = - ## Removes the listener with `id`. Returns true on success, false if no - ## listener with that id exists. Safe to call from inside a dispatch: - ## the in-flight snapshot still delivers exactly once to the listener - ## being removed. + ## Safe to call from inside a dispatch — the in-flight snapshot still + ## delivers exactly once to the removed listener. if id == 0'u64: return false @@ -106,41 +84,33 @@ proc removeEventListener*(reg: var FFIEventRegistry, id: uint64): bool {.raises: removed proc removeAllEventListeners*(reg: var FFIEventRegistry) {.raises: [].} = - ## Drops every registered listener. Does not reset the listener-id - ## counter — subsequent `addEventListener` calls still return strictly - ## increasing ids. + ## Does not reset the id counter. withLock reg.lock: reg.byEvent.clear() proc snapshotListeners*( reg: var FFIEventRegistry, eventName: string ): seq[FFIEventListener] {.raises: [].} = - ## Returns a copy of the listener slice for `eventName`. The copy is what - ## makes re-entrant add/remove from inside a handler deadlock-free: - ## dispatch holds the lock only for the duration of the copy, then - ## iterates the copy outside the lock. + ## Lock held only across the copy — keeps re-entrant add/remove + ## from a handler deadlock-free. var listeners: seq[FFIEventListener] = @[] withLock reg.lock: - # `getOrDefault` avoids the raising `[]` path; returns empty when absent. for l in reg.byEvent.getOrDefault(eventName): listeners.add(l) listeners const EventQueueCapacity* = 1024 - ## ~24 KiB per context. Sustained backlog at this depth means a - ## listener is wedged — what the stuck flag exists to surface. + # Sustained backlog at this depth means a listener is wedged. type QueuedEvent* = object - ## All fields are raw `c_malloc` pointers so the buffer survives - ## pool-slot reuse across thread heaps without an assignment dtor. + # Raw `c_malloc` pointers so the buffer survives pool-slot reuse + # across thread heaps without an assignment dtor. name*: cstring data*: ptr UncheckedArray[byte] dataLen*: int - EventQueue* = object - ## SPSC ring: FFI thread enqueues, event thread dequeues. Plain lock - ## (no atomic indices) — operations are short and uncontended. + EventQueue* = object # SPSC ring; plain lock since ops are short and uncontended. lock*: Lock head*: int tail*: int @@ -148,7 +118,6 @@ type buf*: array[EventQueueCapacity, QueuedEvent] proc initEventQueue*(q: var EventQueue) {.raises: [].} = - ## Same single-owning-thread constraint as `initEventRegistry`. q.lock.initLock() q.head = 0 q.tail = 0 @@ -156,14 +125,18 @@ proc initEventQueue*(q: var EventQueue) {.raises: [].} = for i in 0 ..< EventQueueCapacity: q.buf[i] = QueuedEvent(name: nil, data: nil, dataLen: 0) +proc freeEventBuffers*( + name: cstring, data: ptr UncheckedArray[byte] +) {.raises: [], gcsafe.} = + if not name.isNil(): + c_free(cast[pointer](name)) + if not data.isNil(): + c_free(data) + proc deinitEventQueue*(q: var EventQueue) {.raises: [].} = - ## Both producer and consumer must have stopped before calling. + ## Both producer and consumer must have stopped. for i in 0 ..< EventQueueCapacity: - let e = q.buf[i] - if not e.name.isNil: - c_free(cast[pointer](e.name)) - if not e.data.isNil: - c_free(e.data) + freeEventBuffers(q.buf[i].name, q.buf[i].data) q.buf[i] = QueuedEvent(name: nil, data: nil, dataLen: 0) q.head = 0 q.tail = 0 @@ -173,8 +146,7 @@ proc deinitEventQueue*(q: var EventQueue) {.raises: [].} = proc tryEnqueueEvent*( q: var EventQueue, name: cstring, data: ptr UncheckedArray[byte], dataLen: int ): bool {.raises: [], gcsafe.} = - ## Both `name` and `data` must be `c_malloc`'d; on success the queue - ## takes ownership. On false the caller still owns and must free them. + ## On true the queue owns `name`/`data`; on false the caller still does. withLock q.lock: if q.count >= EventQueueCapacity: return false @@ -184,31 +156,29 @@ proc tryEnqueueEvent*( true proc tryDequeueEvent*(q: var EventQueue): Option[QueuedEvent] {.raises: [], gcsafe.} = - ## Transfers buffer ownership to the caller, who must `c_free` both. + ## Caller takes ownership and must `c_free` both buffers. withLock q.lock: if q.count == 0: return none(QueuedEvent) - let e = q.buf[q.head] + let dequeued = q.buf[q.head] q.buf[q.head] = QueuedEvent(name: nil, data: nil, dataLen: 0) q.head = (q.head + 1) mod EventQueueCapacity q.count.dec() - return some(e) + return some(dequeued) proc eventQueueLen*(q: var EventQueue): int {.raises: [], gcsafe.} = withLock q.lock: return q.count const emptyListenerPayload*: cstring = "" - ## Non-nil zero-length buffer handed to listeners when a payload is - ## empty, so a consumer doing `std::string(data, len)` / `memcpy` never - ## receives a nil pointer (which is UB even at len 0). + ## Non-nil zero-length buffer handed to listeners when the payload is empty + ## (a nil pointer would be UB for consumers doing `memcpy` even at len 0). proc notifyListeners*( listeners: seq[FFIEventListener], retCode: cint, data: pointer, dataLen: int ) = - ## Fans out a payload to every listener in the snapshot. Empty payloads - ## are delivered as the non-nil `emptyListenerPayload` sentinel so a - ## consumer doing `std::string(data, len)` / `memcpy` never receives nil. + ## Empty payloads go through `emptyListenerPayload` so consumers doing + ## `std::string(data, len)` / `memcpy` never see a nil pointer. let n = max(dataLen, 0) let dataPtr = if n > 0 and not data.isNil(): @@ -219,8 +189,6 @@ proc notifyListeners*( listener.callback(retCode, dataPtr, cast[csize_t](n), listener.userData) proc notifyListenersErr*(listeners: seq[FFIEventListener], msg: string) = - ## Error fan-out: adapts the message string to `notifyListeners`, which - ## supplies the non-nil pointer for the empty-message case. let p = if msg.len > 0: cast[pointer](unsafeAddr msg[0]) @@ -229,63 +197,62 @@ proc notifyListenersErr*(listeners: seq[FFIEventListener], msg: string) = notifyListeners(listeners, RET_ERR, p, msg.len) var ffiCurrentEventRegistry* {.threadvar.}: ptr FFIEventRegistry - ## Set by the FFI thread at startup so dispatchFFIEvent / dispatchFFIEventCbor - ## can find their registry without taking a context pointer per call site. + # Kept for tests that drive the registry directly. -template withFFIEventDispatch(eventName: string, listeners, body: untyped) = - ## Shared scaffold for `dispatchFFIEvent` / `dispatchFFIEventCbor`: - ## resolves the thread-local registry, snapshots listeners under - ## `reg.lock` into the caller-named `listeners` binding, then runs - ## `body` inside `foreignThreadGc` + try/except. - let regPtr = ffiCurrentEventRegistry - if regPtr.isNil(): - chronicles.error eventName & " - event registry not set on this thread" - return +var ffiCurrentEventQueue* {.threadvar.}: ptr EventQueue + # Installed by the FFI thread so dispatch templates need no `ctx`. - withLock regPtr[].lock: - let listeners = regPtr[].byEvent.getOrDefault(eventName) - if listeners.len == 0: - chronicles.debug eventName & " - no listener registered" - else: - foreignThreadGc: - try: - body - except Exception, CatchableError: - notifyListenersErr( - listeners, - "Exception dispatching " & eventName & ": " & getCurrentExceptionMsg(), - ) +var ffiCurrentEventQueueStuck* {.threadvar.}: ptr Atomic[bool] + # Sticky overflow flag; FFI request entry point reads it to reject. -template dispatchFFIEvent*(eventName: string, body: untyped) = - ## Dispatches an FFI event to every listener subscribed to `eventName`. - ## `body` must yield a `string` or `seq[byte]`. - ## - ## Valid only on the FFI thread (where `ffiCurrentEventRegistry` is set). - ## Holds `reg.lock` for the entire snapshot + invocation so a concurrent - ## `removeEventListener` from a foreign thread blocks until dispatch - ## returns. Handlers must not call addEventListener / removeEventListener - ## on the same registry (would self-deadlock). - withFFIEventDispatch(eventName, listeners): - let event = body - let dataPtr: pointer = - if event.len > 0: - cast[pointer](unsafeAddr event[0]) - else: - cast[pointer](emptyListenerPayload) - notifyListeners(listeners, RET_OK, dataPtr, event.len) +var ffiCurrentNotifyEventEnqueued* {.threadvar.}: proc() {.gcsafe, raises: [].} + # Hook so this module doesn't depend on chronos's ThreadSignalPtr. + # Nil-safe; tick-driven tests leave it unset. + +template enqueueOrMarkStuck( + eventName: string, namePtr: cstring, dataPtr: ptr UncheckedArray[byte], dataLen: int +) = + ## Takes ownership of `namePtr`/`dataPtr`. On queue-full sets the sticky + ## stuck flag and wakes the event thread (firing onNotResponding from here + ## would risk deadlock against a back-pressuring listener). + block enqueueBlock: + let q = ffiCurrentEventQueue + if q.isNil(): + chronicles.error "event queue not set on this thread", event = eventName + freeEventBuffers(namePtr, dataPtr) + break enqueueBlock + if not q[].tryEnqueueEvent(namePtr, dataPtr, dataLen): + chronicles.error "event queue full; library marked stuck", + event = eventName, capacity = EventQueueCapacity + freeEventBuffers(namePtr, dataPtr) + if not ffiCurrentEventQueueStuck.isNil(): + ffiCurrentEventQueueStuck[].store(true) + if not ffiCurrentNotifyEventEnqueued.isNil(): + ffiCurrentNotifyEventEnqueued() + break enqueueBlock + if not ffiCurrentNotifyEventEnqueued.isNil(): + ffiCurrentNotifyEventEnqueued() + + ## `body` must yield `string` / `seq[byte]`. FFI thread only: encodes into + ## a `c_malloc` buffer and enqueues; the event thread fans out to listeners. + block: + let evtName: string = eventName + let bodyVal = body + var dataPtr: ptr UncheckedArray[byte] = nil + let dataLen = bodyVal.len + if dataLen > 0: + dataPtr = cast[ptr UncheckedArray[byte]](c_malloc(csize_t(dataLen))) + copyMem(dataPtr, unsafeAddr bodyVal[0], dataLen) + let namePtr = alloc(evtName) + enqueueOrMarkStuck(evtName, namePtr, dataPtr, dataLen) template dispatchFFIEventCbor*(eventName: string, eventPayload: typed) = - ## Typed CBOR variant of `dispatchFFIEvent`. Wraps `eventPayload` in an - ## `EventEnvelope`, CBOR-encodes it into a `c_malloc` buffer once, and - ## fans the same buffer out to every registered listener. - ## - ## NB: parameter is `eventPayload`, not `payload` — Nim's template - ## substitution would otherwise also rewrite the `payload:` field inside - ## `EventEnvelope`. - withFFIEventDispatch(eventName, listeners): - var (data, dataLen) = cborEncodeShared( - EventEnvelope[typeof(eventPayload)](eventType: eventName, payload: eventPayload) + ## Typed CBOR variant of `dispatchFFIEvent`. The param is `eventPayload` + ## (not `payload`) to avoid clobbering `EventEnvelope.payload` substitution. + block: + let evtName: string = eventName + var (dataPtr, dataLen) = cborEncodeShared( + EventEnvelope[typeof(eventPayload)](eventType: evtName, payload: eventPayload) ) - defer: - cborFreeShared(data) - notifyListeners(listeners, RET_OK, data, dataLen) + let namePtr = alloc(evtName) + enqueueOrMarkStuck(evtName, namePtr, dataPtr, dataLen) diff --git a/ffi/ffi_thread.nim b/ffi/ffi_thread.nim index 1589c4a..94f561d 100644 --- a/ffi/ffi_thread.nim +++ b/ffi/ffi_thread.nim @@ -12,22 +12,22 @@ proc sendRequestToFFIThread*( ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration ): Result[void, string] = - # Reentrancy guard: only this thread can fire `reqReceivedSignal`, so a handler dispatching back would self-deadlock. + if ctx.eventQueueStuck.load(): + deleteRequest(ffiRequest) + return err("event queue stuck - library cannot accept new requests") + if onFFIThread: + # Re-entrant dispatch from a handler would self-deadlock on `reqReceivedSignal`. deleteRequest(ffiRequest) return err( "reentrant ffi call: a handler invoked sendRequestToFFIThread on its own context" ) - # All async submissions serialise on `ctx.lock` for the full - # trySend + fireSync + waitSync sequence because `reqChannel` is - # single-producer and `reqReceivedSignal` is shared across callers. - # Multi-producer redesign is tracked as PR #23 review item 7. + # Serialise the trySend + fireSync + waitSync — reqChannel is SP and reqReceivedSignal is shared. ctx.lock.acquire() defer: ctx.lock.release() - ## Sending the request let sentOk = ctx.reqChannel.trySend(ffiRequest) if not sentOk: deleteRequest(ffiRequest) @@ -42,15 +42,12 @@ proc sendRequestToFFIThread*( deleteRequest(ffiRequest) return err("Couldn't fireSync in time") - ## wait until the FFI working thread properly received the request let res = ctx.reqReceivedSignal.waitSync(timeout) if res.isErr(): - ## Do not free ffiRequest here: the FFI thread was already signaled and - ## will process (and free) it. + # FFI thread was signaled and owns the request; don't double-free. return err("Couldn't receive reqReceivedSignal signal") - ## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the - ## process proc. + # On ok the FFI thread's processRequest deallocShared(req)'s. ok() proc processRequest[T]( @@ -59,44 +56,44 @@ proc processRequest[T]( ## Invoked within the FFI thread to process a request coming from the FFI API consumer thread. let reqId = $request[].reqId - ## The reqId determines which proc will handle the request. - ## The registeredRequests represents a table defined at compile time. - ## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously] - - ## Explicit conversion keeps `reqId` alive as the backing string, - ## avoiding the implicit string→cstring warning that will become an error. - let reqIdCs = reqId.cstring + let reqIdCs = reqId.cstring # keeps reqId alive; implicit string→cstring is a warning. let retFut = if not ctx[].registeredRequests[].contains(reqIdCs): - ## That shouldn't happen because only registered requests should be sent to the FFI thread. nilProcess(request[].reqId) else: ctx[].registeredRequests[][reqIdCs](cast[pointer](request), ctx) - ## Catch every catchable exception (including CancelledError raised by - ## the shutdown drain in ffiRun) so handleRes — and its `deleteRequest` - ## defer — always runs. Otherwise an abandoned in-flight handler would - ## leak its request envelope, reqId copy, and CBOR payload. + # CatchableError covers CancelledError from the shutdown drain; handleRes must still run. let res = try: await retFut - except CatchableError as exc: + except CatchableError as e: Result[seq[byte], string].err( - "Error in processRequest for " & reqId & ": " & exc.msg + "Error in processRequest for " & reqId & ": " & e.msg ) - ## handleRes may raise (OOM, GC setup) even though it is rare. Catching here - ## keeps the async proc raises:[] compatible. The defer inside handleRes - ## guarantees request is freed before the exception propagates. try: handleRes(res, request) - except Exception as exc: - error "Unexpected exception in handleRes", error = exc.msg + except Exception as e: + error "Unexpected exception in handleRes", error = e.msg + +var ffiEventQueueSignalPtr {.threadvar.}: ThreadSignalPtr + # Stashed so the hook has no closure env. + +proc ffiNotifyEventEnqueuedHook() {.gcsafe, raises: [].} = + if not ffiEventQueueSignalPtr.isNil(): + let res = ffiEventQueueSignalPtr.fireSync() + if res.isErr(): + error "failed to fire eventQueueSignal after enqueue", err = res.error proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = ## FFI thread body that attends library user API requests ffiCurrentEventRegistry = addr ctx[].eventRegistry + ffiCurrentEventQueue = addr ctx[].eventQueue + ffiCurrentEventQueueStuck = addr ctx[].eventQueueStuck + ffiEventQueueSignalPtr = ctx.eventQueueSignal + ffiCurrentNotifyEventEnqueued = ffiNotifyEventEnqueuedHook onFFIThread = true logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT) @@ -109,23 +106,18 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} = - var ffiReqHandler: T - ## Holds the main library object, i.e., in charge of handling the ffi requests. - ## e.g., Waku, LibP2P, SDS, etc. + var ffiReqHandler: T # main library object (Waku, LibP2P, SDS, …) - ## In-flight processRequest futures. Tracked so they can be drained on - ## shutdown — otherwise destroying the context while a handler is - ## awaiting (e.g. sleepAsync) abandons the future and leaks the - ## request's envelope/reqId/payload allocations. + # Tracked so shutdown can drain them; abandoning a mid-await future leaks the request. var pending: seq[Future[void]] = @[] proc reapCompleted() = var i = 0 while i < pending.len: - if pending[i].finished(): - pending.del(i) - else: + if not pending[i].finished(): inc i + continue + pending.del(i) while ctx.running.load(): # Freezes if a sync handler blocks the dispatcher; event thread reads to detect wedged FFI thread. @@ -137,7 +129,6 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = if not gotSignal: continue - ## Wait for a request from the ffi consumer thread var request: ptr FFIThreadRequest if not ctx.reqChannel.tryRecv(request): continue @@ -145,22 +136,18 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = if ctx.myLib.isNil(): ctx.myLib = addr ffiReqHandler - ## Handle the request pending.add processRequest(request, ctx) let fireRes = ctx.reqReceivedSignal.fireSync() if fireRes.isErr(): error "could not fireSync back to requester thread", error = fireRes.error - ## Drain in-flight handlers so each request's `deleteRequest` runs - ## before we exit. Without this, abandoning a future mid-await would - ## leak the request allocations (visible to LSan; previously hidden - ## because Nim's pool allocator kept the chunks alive in the process). + # Drain so each pending handler's deleteRequest defer runs before exit. reapCompleted() if pending.len > 0: try: await allFutures(pending) - except CatchableError as exc: - error "draining pending FFI requests on shutdown raised", error = exc.msg + except CatchableError as e: + error "draining pending FFI requests on shutdown raised", error = e.msg waitFor ffiRun(ctx) diff --git a/tests/unit/test_event_dispatch.nim b/tests/unit/test_event_dispatch.nim index eab9a22..05005fc 100644 --- a/tests/unit/test_event_dispatch.nim +++ b/tests/unit/test_event_dispatch.nim @@ -1,11 +1,5 @@ -## Tests for the CBOR-style FFI event dispatch path: -## - `dispatchFFIEvent` accepts both `string` and `seq[byte]` bodies -## - `dispatchFFIEventCbor` wraps a typed payload in `EventEnvelope[T]`, -## CBOR-encodes it, and dispatches via the event callback -## -## Tests run end-to-end against a real FFI thread (via FFIContextPool + -## sendRequestToFFIThread) so we exercise the threadvar-backed -## ffiCurrentEventRegistry wiring, not just the templates in isolation. +## End-to-end tests for `dispatchFFIEvent` / `dispatchFFIEventCbor`, +## driven through a real `FFIContext` so the threadvar wiring is exercised. import std/[locks, os] import unittest2 @@ -14,16 +8,10 @@ import ffi type TestEvtLib = object -## Event payload type (would be `{.ffi.}` in production so the codec gen -## emits a matching struct on the foreign side; the test only needs CBOR -## round-trip, which `cborEncode`/`cborDecode` provide via cbor_serial's -## generic overloads). type MessageSentBody* {.ffi.} = object requestId*: string messageHash*: string -## Same callback-state helper as test_ffi_context.nim, duplicated here so -## this file stays a self-contained test binary. type CallbackData = object lock: Lock cond: Cond @@ -40,6 +28,13 @@ proc deinitCallbackData(d: var CallbackData) = d.cond.deinitCond() d.lock.deinitLock() +template setupCallbackData(name: untyped) = + ## Declares `name`, inits it, and defers its deinit in the caller's scope. + var name: CallbackData + initCallbackData(name) + defer: + deinitCallbackData(name) + proc captureCb( retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer ) {.cdecl, gcsafe, raises: [].} = @@ -60,15 +55,27 @@ proc waitCallback(d: var CallbackData) = wait(d.cond, d.lock) release(d.lock) +proc resetCalled(d: var CallbackData) = + acquire(d.lock) + d.called = false + release(d.lock) + proc callbackBytes(d: var CallbackData): seq[byte] = var bytes = newSeq[byte](d.msgLen) if d.msgLen > 0: copyMem(addr bytes[0], addr d.msg[0], d.msgLen) bytes -## A request that dispatches a typed CBOR event from inside the FFI -## thread and then returns ok — so the response callback can be used to -## synchronize the test. +template withPool(ctxIdent: untyped, body: untyped) = + ## Sets up pool + ctx, runs body, destroys on exit. + var pool: FFIContextPool[TestEvtLib] + let ctxIdent = pool.createFFIContext().valueOr: + check false + return + defer: + discard pool.destroyFFIContext(ctxIdent) + body + registerReqFFI(EmitCborEventRequest, lib: ptr TestEvtLib): proc(): Future[Result[string, string]] {.async.} = dispatchFFIEventCbor( @@ -76,19 +83,13 @@ registerReqFFI(EmitCborEventRequest, lib: ptr TestEvtLib): ) return ok("emitted") -## A request that uses the lower-level `dispatchFFIEvent` with a raw -## `seq[byte]` body — the path that previously rejected non-string bodies. registerReqFFI(EmitRawBytesEventRequest, lib: ptr TestEvtLib): proc(): Future[Result[string, string]] {.async.} = dispatchFFIEvent("raw_bytes"): @[byte 0x01, 0x02, 0x03] return ok("emitted") -## Setter-thread worker for the registry race regression test. Each -## iteration adds then immediately removes a listener for the dispatched -## event so a TSan-instrumented build can confirm `FFIEventRegistry.lock` -## serialises the cross-thread mutation against dispatch-time -## `snapshotListeners` reads from the FFI thread. +# Add/remove worker for the registry-race regression test. type SetterArgs = tuple[ ctx: ptr FFIContext[TestEvtLib], stop: ptr Atomic[bool], target: ptr CallbackData @@ -102,144 +103,83 @@ proc setterThreadBody(args: SetterArgs) {.thread.} = suite "dispatchFFIEventCbor": test "delivers EventEnvelope-shaped CBOR payload to event callback": - var pool: FFIContextPool[TestEvtLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - defer: - discard pool.destroyFFIContext(ctx) + # CallbackData defers declared first run last (LIFO), AFTER pool destroy + # joins the event thread — otherwise TSan flags captureCb on a destroyed mutex. + setupCallbackData(evt) + setupCallbackData(rsp) - var evt: CallbackData - initCallbackData(evt) - defer: - deinitCallbackData(evt) + withPool(ctx): + discard addEventListener(ctx[].eventRegistry, "message_sent", captureCb, addr evt) - # Subscribe to the specific event the request below dispatches. - discard addEventListener(ctx[].eventRegistry, "message_sent", captureCb, addr evt) + check sendRequestToFFIThread( + ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp) + ) + .isOk() + waitCallback(rsp) + waitCallback(evt) - # Trigger the dispatch from the FFI thread; the response callback is - # ignored (we only care that the request completed so we know the event - # has fired). - var rsp: CallbackData - initCallbackData(rsp) - defer: - deinitCallbackData(rsp) - - check sendRequestToFFIThread( - ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp) - ) - .isOk() - waitCallback(rsp) - waitCallback(evt) - - check evt.retCode == RET_OK - let decoded = cborDecode(callbackBytes(evt), EventEnvelope[MessageSentBody]) - check decoded.isOk() - check decoded.value.eventType == "message_sent" - check decoded.value.payload.requestId == "req-1" - check decoded.value.payload.messageHash == "0xdeadbeef" + check evt.retCode == RET_OK + let decoded = cborDecode(callbackBytes(evt), EventEnvelope[MessageSentBody]) + check decoded.isOk() + check decoded.value.eventType == "message_sent" + check decoded.value.payload.requestId == "req-1" + check decoded.value.payload.messageHash == "0xdeadbeef" suite "dispatchFFIEvent with seq[byte]": test "accepts a raw seq[byte] body": - var pool: FFIContextPool[TestEvtLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - defer: - discard pool.destroyFFIContext(ctx) + setupCallbackData(evt) + setupCallbackData(rsp) - var evt: CallbackData - initCallbackData(evt) - defer: - deinitCallbackData(evt) + withPool(ctx): + discard addEventListener(ctx[].eventRegistry, "raw_bytes", captureCb, addr evt) - discard addEventListener(ctx[].eventRegistry, "raw_bytes", captureCb, addr evt) + check sendRequestToFFIThread( + ctx, EmitRawBytesEventRequest.ffiNewReq(captureCb, addr rsp) + ) + .isOk() + waitCallback(rsp) + waitCallback(evt) - var rsp: CallbackData - initCallbackData(rsp) - defer: - deinitCallbackData(rsp) - - check sendRequestToFFIThread( - ctx, EmitRawBytesEventRequest.ffiNewReq(captureCb, addr rsp) - ) - .isOk() - waitCallback(rsp) - waitCallback(evt) - - check evt.retCode == RET_OK - check callbackBytes(evt) == @[byte 0x01, 0x02, 0x03] + check evt.retCode == RET_OK + check callbackBytes(evt) == @[byte 0x01, 0x02, 0x03] when not defined(gcRefc): - ## Skipped under `--mm:refc`: each setter thread grows / shrinks the - ## per-event listener `seq[FFIEventListener]` via `addEventListener`, - ## and refc's per-thread GC heap ownership makes cross-thread seq - ## buffer reallocation unsafe even when the surrounding lock is held. - ## ORC + the FFI thread + tsan (the combo this test was written for) - ## does not have that limitation. + ## Skipped under refc: setter threads grow/shrink the per-event listener + ## seq, and refc's per-thread GC heap makes that unsafe cross-thread. suite "FFIEventRegistry concurrent access": - ## Regression for PR #39 review comments r3288220895 / r3289285387. - ## Run under tsan to actually validate the fix: - ## NIM_FFI_SAN=tsan NIM_FFI_MM=orc nimble test_sanitized + ## Regression for PR #39 (r3288220895 / r3289285387). + ## Validate with: NIM_FFI_SAN=tsan NIM_FFI_MM=orc nimble test_sanitized test "concurrent add/remove writers vs dispatch reads stay race-free": - var pool: FFIContextPool[TestEvtLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - defer: - discard pool.destroyFFIContext(ctx) + setupCallbackData(evt) + setupCallbackData(rsp) - var evt: CallbackData - initCallbackData(evt) - defer: - deinitCallbackData(evt) + withPool(ctx): + # Seed an initial listener so the first dispatch has a target. + discard + addEventListener(ctx[].eventRegistry, "message_sent", captureCb, addr evt) - # Seed an initial callback so the FFI thread's first dispatch has a - # target. The setter threads will then repeatedly re-install the same - # (callback, userData) pair — what matters is the cross-thread write - # racing the FFI thread's read, not which pair "wins". - discard addEventListener(ctx[].eventRegistry, "message_sent", captureCb, addr evt) + const NumSetterThreads = 4 + const NumDispatchIters = 200 - const NumSetterThreads = 4 - const NumDispatchIters = 200 + var stop: Atomic[bool] + stop.store(false) + var setters: array[NumSetterThreads, Thread[SetterArgs]] + for i in 0 ..< NumSetterThreads: + createThread(setters[i], setterThreadBody, (ctx, addr stop, addr evt)) - var stop: Atomic[bool] - stop.store(false) - var setters: array[NumSetterThreads, Thread[SetterArgs]] - for i in 0 ..< NumSetterThreads: - createThread(setters[i], setterThreadBody, (ctx, addr stop, addr evt)) + for _ in 0 ..< NumDispatchIters: + resetCalled(rsp) + check sendRequestToFFIThread( + ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp) + ) + .isOk() + waitCallback(rsp) - var rsp: CallbackData - initCallbackData(rsp) - defer: - deinitCallbackData(rsp) + stop.store(true) + for i in 0 ..< NumSetterThreads: + joinThread(setters[i]) - for _ in 0 ..< NumDispatchIters: - # Reset rsp so each iteration's `waitCallback` blocks until the - # FFI thread fires the response — keeps the loop synchronous. - acquire(rsp.lock) - rsp.called = false - release(rsp.lock) - - check sendRequestToFFIThread( - ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp) - ) - .isOk() - waitCallback(rsp) - - stop.store(true) - for i in 0 ..< NumSetterThreads: - joinThread(setters[i]) - - # `evt` got hit by every dispatch above; just confirm at least one - # actually landed so a silently-broken dispatch loop is caught. - check evt.called - -## A foreign-thread mutation must not be able to invalidate the -## listener's `userData` while an in-flight dispatch is mid-invocation. -## The dispatch templates hold `reg.lock` for the entire snapshot + -## invocation, so foreign `removeEventListener` blocks until dispatch -## returns. + check evt.called type SlowState = object entered: Atomic[bool] @@ -248,54 +188,41 @@ type SlowState = object proc slowEventCb( retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer ) {.cdecl, gcsafe, raises: [].} = - ## Signal entry, sleep briefly (the window during which the main - ## thread must call removeEventListener and block), signal exit. let st = cast[ptr SlowState](userData) st[].entered.store(true) - os.sleep(15) + os.sleep(60) st[].exited.store(true) -type DispatcherArgs = tuple[reg: ptr FFIEventRegistry, done: ptr Atomic[bool]] - -proc dispatcherBody(args: DispatcherArgs) {.thread.} = - ffiCurrentEventRegistry = args.reg - dispatchFFIEvent("evt"): - "payload" - args.done[].store(true) - suite "registry lock held during invocation": test "removeEventListener blocks until in-flight dispatch finishes": - var reg: FFIEventRegistry - initEventRegistry(reg) - defer: - deinitEventRegistry(reg) + setupCallbackData(rsp) - var st: SlowState - st.entered.store(false) - st.exited.store(false) + withPool(ctx): + var st: SlowState + st.entered.store(false) + st.exited.store(false) - let id = addEventListener(reg, "evt", slowEventCb, addr st) - check id != 0'u64 + let id = + addEventListener(ctx[].eventRegistry, "message_sent", slowEventCb, addr st) + check id != 0'u64 - var done: Atomic[bool] - done.store(false) - var thr: Thread[DispatcherArgs] - createThread(thr, dispatcherBody, (addr reg, addr done)) + check sendRequestToFFIThread( + ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp) + ) + .isOk() + waitCallback(rsp) - # Wait until the worker thread is inside slowEventCb. - for _ in 0 ..< 200: - if st.entered.load(): - break - os.sleep(1) - check st.entered.load() - check not st.exited.load() + for _ in 0 ..< 500: + if st.entered.load(): + break + os.sleep(1) + check st.entered.load() + check not st.exited.load() - # Lock-during-invocation contract: remove blocks until dispatch - # finishes; by the time it returns, slowEventCb has set exited=true. - check removeEventListener(reg, id) - check st.exited.load() - joinThread(thr) - check done.load() + # Lock-during-invocation: remove blocks until dispatch finishes, + # by which time slowEventCb has set exited=true. + check removeEventListener(ctx[].eventRegistry, id) + check st.exited.load() suite "liveness events": ## `onNotResponding` / `onResponding` bypass the event queue and dispatch @@ -304,63 +231,41 @@ suite "liveness events": ## name + CBOR-encoded `EventEnvelope[…]`) so a future refactor can't ## silently break consumers polling for the "library hung" signal. test "onNotResponding delivers EventEnvelope[NotRespondingEvent] to subscribers": - var pool: FFIContextPool[TestEvtLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - defer: - discard pool.destroyFFIContext(ctx) + setupCallbackData(evt) - var evt: CallbackData - initCallbackData(evt) - defer: - deinitCallbackData(evt) + withPool(ctx): + discard addEventListener( + ctx[].eventRegistry, NotRespondingEventName, captureCb, addr evt + ) - discard - addEventListener(ctx[].eventRegistry, NotRespondingEventName, captureCb, addr evt) + onNotResponding(ctx) - onNotResponding(ctx) - - waitCallback(evt) - check evt.retCode == RET_OK - let decoded = cborDecode(callbackBytes(evt), EventEnvelope[NotRespondingEvent]) - check decoded.isOk() - check decoded.value.eventType == NotRespondingEventName + waitCallback(evt) + check evt.retCode == RET_OK + let decoded = cborDecode(callbackBytes(evt), EventEnvelope[NotRespondingEvent]) + check decoded.isOk() + check decoded.value.eventType == NotRespondingEventName test "onResponding delivers EventEnvelope[RespondingEvent] to subscribers": - var pool: FFIContextPool[TestEvtLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - defer: - discard pool.destroyFFIContext(ctx) + setupCallbackData(evt) - var evt: CallbackData - initCallbackData(evt) - defer: - deinitCallbackData(evt) + withPool(ctx): + discard + addEventListener(ctx[].eventRegistry, RespondingEventName, captureCb, addr evt) - discard - addEventListener(ctx[].eventRegistry, RespondingEventName, captureCb, addr evt) + onResponding(ctx) - onResponding(ctx) - - waitCallback(evt) - check evt.retCode == RET_OK - let decoded = cborDecode(callbackBytes(evt), EventEnvelope[RespondingEvent]) - check decoded.isOk() - check decoded.value.eventType == RespondingEventName + waitCallback(evt) + check evt.retCode == RET_OK + let decoded = cborDecode(callbackBytes(evt), EventEnvelope[RespondingEvent]) + check decoded.isOk() + check decoded.value.eventType == RespondingEventName test "liveness events with no subscriber are a no-op": - var pool: FFIContextPool[TestEvtLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - defer: - discard pool.destroyFFIContext(ctx) - # No listener registered — must not crash, must not block. - onNotResponding(ctx) - onResponding(ctx) + withPool(ctx): + # No listener registered — must not crash, must not block. + onNotResponding(ctx) + onResponding(ctx) suite "event thread drains queued events": ## The event thread wakes every `EventThreadTickInterval` (or on @@ -370,28 +275,19 @@ suite "event thread drains queued events": ## deliver it — exercises the `tryEnqueueEvent` → `drainEventQueue` → ## `dispatchQueuedEvent` → listener path end-to-end. test "enqueued event is delivered to subscriber within a tick": - var pool: FFIContextPool[TestEvtLib] - let ctx = pool.createFFIContext().valueOr: - check false - return - defer: - discard pool.destroyFFIContext(ctx) + setupCallbackData(evt) - var evt: CallbackData - initCallbackData(evt) - defer: - deinitCallbackData(evt) + withPool(ctx): + const QueuedEvtName = "queued_evt" + discard addEventListener(ctx[].eventRegistry, QueuedEvtName, captureCb, addr evt) - const QueuedEvtName = "queued_evt" - discard addEventListener(ctx[].eventRegistry, QueuedEvtName, captureCb, addr evt) + # `tryEnqueueEvent` takes ownership of both buffers on success; the + # event thread c_frees them after dispatch returns. + let nameBuf = alloc(QueuedEvtName) + let payload = @[byte 0xDE, 0xAD, 0xBE, 0xEF] + var shared = allocSharedSeq(payload) + check tryEnqueueEvent(ctx[].eventQueue, nameBuf, shared.data, shared.len) - # `tryEnqueueEvent` takes ownership of both buffers on success; the - # event thread c_frees them after dispatch returns. - let nameBuf = alloc(QueuedEvtName) - let payload = @[byte 0xDE, 0xAD, 0xBE, 0xEF] - var shared = allocSharedSeq(payload) - check tryEnqueueEvent(ctx[].eventQueue, nameBuf, shared.data, shared.len) - - waitCallback(evt) - check evt.retCode == RET_OK - check callbackBytes(evt) == payload + waitCallback(evt) + check evt.retCode == RET_OK + check callbackBytes(evt) == payload diff --git a/tests/unit/test_event_listener.nim b/tests/unit/test_event_listener.nim index 3a74bd3..f63e860 100644 --- a/tests/unit/test_event_listener.nim +++ b/tests/unit/test_event_listener.nim @@ -1,23 +1,12 @@ -## Unit tests for the `FFIEventRegistry` primitive — the multi-listener -## data structure that will back `_add_event_listener` / -## `_remove_event_listener` once the dispatch wiring lands. -## -## These tests exercise the registry directly (no FFI thread, no dispatch -## templates) so they stay fast and pin down the registry's mutation and -## snapshot semantics in isolation. +## Unit tests for the `FFIEventRegistry` primitive (no FFI thread, no dispatch). import std/locks import unittest2 import ffi -# Tiny helpers — a thread-safe sink each listener writes into so we can -# assert which callbacks would fire and in what order once dispatch lands. -# Today only `tagCb`'s presence is exercised; the recorder is also used to -# make sure listener bookkeeping doesn't accidentally invoke callbacks. - type Recorder = object lock: Lock - hits: seq[string] # tag captured from `userData` per invocation + hits: seq[string] retCodes: seq[cint] payloads: seq[string] @@ -34,7 +23,6 @@ proc record(r: var Recorder, tag: string, retCode: cint, payload: string) = r.payloads.add(payload) release(r.lock) -# Each listener is identified by a `Tag` passed through `userData`. type Tag = object name: string rec: ptr Recorder @@ -48,16 +36,22 @@ proc tagCb( copyMem(addr payload[0], msg, int(len)) record(t[].rec[], t[].name, retCode, payload) +template setupRegistry(regIdent: untyped) = + var regIdent: FFIEventRegistry + initEventRegistry(regIdent) + defer: + deinitEventRegistry(regIdent) + +template setupRecorder(recIdent: untyped) = + var recIdent: Recorder + initRecorder(recIdent) + defer: + deinitRecorder(recIdent) + suite "FFIEventRegistry mutation": test "addEventListener assigns monotonically increasing non-zero ids": - var reg: FFIEventRegistry - initEventRegistry(reg) - defer: - deinitEventRegistry(reg) - var rec: Recorder - initRecorder(rec) - defer: - deinitRecorder(rec) + setupRegistry(reg) + setupRecorder(rec) var t = Tag(name: "a", rec: addr rec) let id1 = addEventListener(reg, "evt", tagCb, addr t) @@ -68,30 +62,18 @@ suite "FFIEventRegistry mutation": check id3 == 3'u64 test "addEventListener returns 0 when callback is nil": - var reg: FFIEventRegistry - initEventRegistry(reg) - defer: - deinitEventRegistry(reg) + setupRegistry(reg) let id = addEventListener(reg, "evt", nil, nil) check id == 0'u64 test "removeEventListener returns false for unknown ids": - var reg: FFIEventRegistry - initEventRegistry(reg) - defer: - deinitEventRegistry(reg) + setupRegistry(reg) check not removeEventListener(reg, 0'u64) check not removeEventListener(reg, 99'u64) test "removeEventListener removes listeners across distinct events": - var reg: FFIEventRegistry - initEventRegistry(reg) - defer: - deinitEventRegistry(reg) - var rec: Recorder - initRecorder(rec) - defer: - deinitRecorder(rec) + setupRegistry(reg) + setupRecorder(rec) var t = Tag(name: "a", rec: addr rec) let id1 = addEventListener(reg, "evt", tagCb, addr t) @@ -99,7 +81,6 @@ suite "FFIEventRegistry mutation": check removeEventListener(reg, id1) check removeEventListener(reg, id2) - # Second remove of the same id is a no-op. check not removeEventListener(reg, id1) check snapshotListeners(reg, "evt").len == 0 @@ -107,14 +88,8 @@ suite "FFIEventRegistry mutation": suite "FFIEventRegistry snapshot semantics": test "snapshot returns only the listeners for the requested event": - var reg: FFIEventRegistry - initEventRegistry(reg) - defer: - deinitEventRegistry(reg) - var rec: Recorder - initRecorder(rec) - defer: - deinitRecorder(rec) + setupRegistry(reg) + setupRecorder(rec) var a = Tag(name: "a", rec: addr rec) var b = Tag(name: "b", rec: addr rec) var c = Tag(name: "c", rec: addr rec) @@ -123,32 +98,19 @@ suite "FFIEventRegistry snapshot semantics": discard addEventListener(reg, "evt", tagCb, addr b) discard addEventListener(reg, "other", tagCb, addr c) - let snapEvt = snapshotListeners(reg, "evt") - check snapEvt.len == 2 # both listeners for "evt" - - let snapOther = snapshotListeners(reg, "other") - check snapOther.len == 1 # only the listener for "other" - - let snapUnknown = snapshotListeners(reg, "no-subscriber") - check snapUnknown.len == 0 # no listener for this event + check snapshotListeners(reg, "evt").len == 2 + check snapshotListeners(reg, "other").len == 1 + check snapshotListeners(reg, "no-subscriber").len == 0 test "snapshot is a copy: post-snapshot mutation does not affect it": - var reg: FFIEventRegistry - initEventRegistry(reg) - defer: - deinitEventRegistry(reg) - var rec: Recorder - initRecorder(rec) - defer: - deinitRecorder(rec) + setupRegistry(reg) + setupRecorder(rec) var t = Tag(name: "a", rec: addr rec) let id1 = addEventListener(reg, "evt", tagCb, addr t) let snap = snapshotListeners(reg, "evt") check snap.len == 1 - # Mutating the registry after the snapshot must not retroactively - # shrink or grow the snapshot we already captured. check removeEventListener(reg, id1) discard addEventListener(reg, "evt", tagCb, addr t) check snap.len == 1 @@ -156,14 +118,8 @@ suite "FFIEventRegistry snapshot semantics": suite "removeAllEventListeners": test "drops every registered listener": - var reg: FFIEventRegistry - initEventRegistry(reg) - defer: - deinitEventRegistry(reg) - var rec: Recorder - initRecorder(rec) - defer: - deinitRecorder(rec) + setupRegistry(reg) + setupRecorder(rec) var a = Tag(name: "a", rec: addr rec) var b = Tag(name: "b", rec: addr rec) diff --git a/tests/unit/test_event_thread.nim b/tests/unit/test_event_thread.nim new file mode 100644 index 0000000..51f8829 --- /dev/null +++ b/tests/unit/test_event_thread.nim @@ -0,0 +1,322 @@ +## Integration tests for the dedicated event thread (issue #6). + +import std/[atomics, locks, os, strutils] +import unittest2 +import results +import ffi + +type TestEvtLib = object + +type LatchPayload* {.ffi.} = object + iter*: int + +type CallbackData = object + lock: Lock + cond: Cond + called: bool + retCode: cint + msg: array[1024, byte] + msgLen: int + +proc initCallbackData(d: var CallbackData) = + d.lock.initLock() + d.cond.initCond() + +proc deinitCallbackData(d: var CallbackData) = + d.cond.deinitCond() + d.lock.deinitLock() + +template setupCallbackData(name: untyped) = + ## Declares `name`, inits it, and defers its deinit in the caller's scope. + var name: CallbackData + initCallbackData(name) + defer: + deinitCallbackData(name) + +proc captureCb( + retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer +) {.cdecl, gcsafe, raises: [].} = + let d = cast[ptr CallbackData](userData) + acquire(d[].lock) + d[].retCode = retCode + let n = min(int(len), d[].msg.len) + if n > 0 and not msg.isNil(): + copyMem(addr d[].msg[0], msg, n) + d[].msgLen = n + d[].called = true + signal(d[].cond) + release(d[].lock) + +proc waitCallback(d: var CallbackData) = + acquire(d.lock) + while not d.called: + wait(d.cond, d.lock) + release(d.lock) + +proc resetCalled(d: var CallbackData) = + acquire(d.lock) + d.called = false + release(d.lock) + +proc waitCallbackTimeout(d: var CallbackData, timeoutMs: int): bool = + ## Polls under `d.lock` so the load syncs with the `captureCb` writer. + let deadline = Moment.now() + timeoutMs.milliseconds + while true: + acquire(d.lock) + let done = d.called + release(d.lock) + if done: + return true + if Moment.now() >= deadline: + return false + os.sleep(10) + +template withPool(ctxIdent: untyped, body: untyped) = + ## Sets up a pool + ctx, runs body, destroys on exit. + var pool: FFIContextPool[TestEvtLib] + let ctxIdent = pool.createFFIContext().valueOr: + check false + return + defer: + discard pool.destroyFFIContext(ctxIdent) + body + +registerReqFFI(EmitLatchEvent, lib: ptr TestEvtLib): + proc(iter: int): Future[Result[string, string]] {.async.} = + dispatchFFIEventCbor("latch", LatchPayload(iter: iter)) + return ok("emitted") + +registerReqFFI(PingEvent, lib: ptr TestEvtLib): + proc(): Future[Result[string, string]] {.async.} = + return ok("pong") + +# Atomic switch so the wedge fires deterministically per test. +var gBlockingEnabled: Atomic[bool] +gBlockingEnabled.store(false) + +registerReqFFI(BlockingRequest, lib: ptr TestEvtLib): + proc(milliseconds: int): Future[Result[string, string]] {.async.} = + if gBlockingEnabled.load(): + os.sleep(milliseconds) + return ok("done") + +var gListenerThreadId: Atomic[int] +gListenerThreadId.store(-1) + +proc captureThreadIdCb( + retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer +) {.cdecl, gcsafe, raises: [].} = + gListenerThreadId.store(getThreadId()) + let d = cast[ptr CallbackData](userData) + acquire(d[].lock) + d[].called = true + signal(d[].cond) + release(d[].lock) + +var gFfiThreadId: Atomic[int] +gFfiThreadId.store(-1) + +registerReqFFI(CaptureFfiTidRequest, lib: ptr TestEvtLib): + proc(): Future[Result[string, string]] {.async.} = + gFfiThreadId.store(getThreadId()) + return ok("captured") + +suite "event delivery is asynchronous": + test "listener runs on the event thread, not the FFI thread": + # CallbackData defers declared first run last (LIFO): pool-destroy joins + # the event thread before any still-held mutex is torn down. TSan otherwise + # flags `captureCb` on a destroyed mutex. + setupCallbackData(evt) + setupCallbackData(rsp) + + withPool(ctx): + discard addEventListener( + ctx[].eventRegistry, "latch", captureThreadIdCb, addr evt + ) + + check sendRequestToFFIThread( + ctx, CaptureFfiTidRequest.ffiNewReq(captureCb, addr rsp) + ) + .isOk() + waitCallback(rsp) + + resetCalled(rsp) + check sendRequestToFFIThread( + ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, 0) + ) + .isOk() + waitCallback(rsp) + waitCallback(evt) + + let ffiTid = gFfiThreadId.load() + let listenerTid = gListenerThreadId.load() + check ffiTid >= 0 + check listenerTid >= 0 + check ffiTid != listenerTid + +proc slowSleepCb( + retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer +) {.cdecl, gcsafe, raises: [].} = + os.sleep(150) + +suite "FFI thread independence": + test "slow listener does not block FFI thread request round-trip": + setupCallbackData(rsp) + + withPool(ctx): + discard addEventListener( + ctx[].eventRegistry, "latch", slowSleepCb, nil + ) + + check sendRequestToFFIThread( + ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, 0) + ) + .isOk() + waitCallback(rsp) + resetCalled(rsp) + + # chronos's `Moment` — std/times exports a `milliseconds` that + # shadows chronos's at this generic-instantiation site. + let started = Moment.now() + check sendRequestToFFIThread(ctx, PingEvent.ffiNewReq(captureCb, addr rsp)) + .isOk() + waitCallback(rsp) + let elapsed = Moment.now() - started + + check elapsed < 100.milliseconds # under the 150 ms slow-listener sleep + +when not defined(gcRefc): + ## Skipped under refc: sleeping the FFI thread inside a sync handler + ## interacts badly with refc + existing destroy-on-time policies. + suite "FFI heartbeat staleness": + test "wedged FFI thread triggers onNotResponding via heartbeat": + setupCallbackData(notif) + setupCallbackData(rsp) + + var pool: FFIContextPool[TestEvtLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + defer: + # Disable wedge first so destroy isn't blocked by the still-sleeping handler. + gBlockingEnabled.store(false) + discard pool.destroyFFIContext(ctx) + + discard addEventListener( + ctx[].eventRegistry, NotRespondingEventName, captureCb, addr notif + ) + + # Wait out the start-delay so the heartbeat check is armed. + os.sleep(FFIHeartbeatStartDelay.milliseconds.int + 200) + + # Wedge long enough to cross at least one tick boundary. + gBlockingEnabled.store(true) + let wedgeMs = + (EventThreadTickInterval + FFIHeartbeatStaleThreshold).milliseconds.int + + 1500 + check sendRequestToFFIThread( + ctx, BlockingRequest.ffiNewReq(captureCb, addr rsp, wedgeMs) + ) + .isOk() + waitCallback(rsp) + gBlockingEnabled.store(false) + + check waitCallbackTimeout(notif, 1500) + +type BackpressureState = object + enteredLock: Lock + enteredCond: Cond + entered: Atomic[bool] + releaseLock: Lock + releaseCond: Cond + release: Atomic[bool] + +proc initBackpressure(b: var BackpressureState) = + b.enteredLock.initLock() + b.enteredCond.initCond() + b.releaseLock.initLock() + b.releaseCond.initCond() + +proc deinitBackpressure(b: var BackpressureState) = + b.enteredCond.deinitCond() + b.enteredLock.deinitLock() + b.releaseCond.deinitCond() + b.releaseLock.deinitLock() + +proc backpressureCb( + retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer +) {.cdecl, gcsafe, raises: [].} = + ## First call signals entered then blocks under reg.lock to back-pressure + ## subsequent dispatches — gives a deterministic way to fill the queue. + let b = cast[ptr BackpressureState](userData) + if not b[].entered.exchange(true): + acquire(b[].enteredLock) + signal(b[].enteredCond) + release(b[].enteredLock) + + acquire(b[].releaseLock) + while not b[].release.load(): + wait(b[].releaseCond, b[].releaseLock) + release(b[].releaseLock) + +registerReqFFI(BurstEmit, lib: ptr TestEvtLib): + proc(count: int): Future[Result[string, string]] {.async.} = + for i in 0 ..< count: + dispatchFFIEventCbor("latch", LatchPayload(iter: i)) + return ok("bursted") + +suite "queue overflow": + test "overflow sets stuck flag, fires onNotResponding, rejects new requests": + var bp: BackpressureState + initBackpressure(bp) + defer: + deinitBackpressure(bp) + + setupCallbackData(notif) + setupCallbackData(rsp) + setupCallbackData(rejected) + + withPool(ctx): + discard addEventListener( + ctx[].eventRegistry, "latch", backpressureCb, addr bp + ) + discard addEventListener( + ctx[].eventRegistry, NotRespondingEventName, captureCb, addr notif + ) + + # Kick one event so the listener holds reg.lock; subsequent enqueues + # pile up undrained. + check sendRequestToFFIThread( + ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, -1) + ) + .isOk() + waitCallback(rsp) + + acquire(bp.enteredLock) + while not bp.entered.load(): + wait(bp.enteredCond, bp.enteredLock) + release(bp.enteredLock) + + # Burst > capacity in one request; tail enqueues flip the stuck flag. + resetCalled(rsp) + check sendRequestToFFIThread( + ctx, BurstEmit.ffiNewReq(captureCb, addr rsp, EventQueueCapacity + 8) + ) + .isOk() + waitCallback(rsp) + + check ctx.eventQueueStuck.load() + + let res = + sendRequestToFFIThread(ctx, PingEvent.ffiNewReq(captureCb, addr rejected)) + check res.isErr() + check res.error.contains("stuck") + + # Release backpressure so drain advances and the stuck flag fires + # not_responding. + acquire(bp.releaseLock) + bp.release.store(true) + signal(bp.releaseCond) + release(bp.releaseLock) + + check waitCallbackTimeout(notif, 2000)