diff --git a/CHANGELOG.md b/CHANGELOG.md index ac701fc..011f9d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,26 @@ All notable changes to this project are documented in this file. +## [Unreleased] + +### Changed +- User event callbacks now run on a dedicated event thread fed by a + bounded SPSC queue (default capacity 1024), so a slow listener can no + longer block the FFI thread or concurrent `add_event_listener` / + `remove_event_listener` calls + ([#6](https://github.com/logos-messaging/nim-ffi/issues/6)). +- Replaced the dedicated watchdog thread with a heartbeat check that + runs on the event thread. The FFI thread advances an atomic heartbeat + each loop iteration; if it stalls for more than 1s past the start-up + grace window, the event thread emits the `not_responding` event. + +### Added +- Queue-overflow handling: when the bounded event queue is full, the + library sets a sticky "stuck" flag, logs an error, fires + `not_responding` from the event thread, and rejects subsequent + `sendRequestToFFIThread` calls with `event queue stuck - library + cannot accept new requests`. + ## [0.2.0] - 2026-05-20 Major release introducing CBOR-based wire format, multi-language binding diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index f9284d7..17a4412 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -4,11 +4,7 @@ import system/ansi_c import std/[atomics, locks, options, tables] import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results import - ./ffi_types, - ./ffi_events, - ./ffi_thread_request, - ./internal/ffi_macro, - ./logging, + ./ffi_types, ./ffi_events, ./ffi_thread_request, ./internal/ffi_macro, ./logging, ./cbor_serial export ffi_events @@ -98,7 +94,8 @@ proc sendRequestToFFIThread*( # Event-queue overflow refuses further requests; the event thread fires onNotResponding to avoid deadlocking on reg.lock here. if ctx.eventQueueStuck.load(): deleteRequest(ffiRequest) - return err("event queue stuck - library cannot accept new requests") + return + err("event queue stuck - library cannot accept new requests") # Reentrancy guard: only this thread can fire `reqReceivedSignal`, so a handler dispatching back would self-deadlock. if onFFIThread: @@ -262,7 +259,8 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = try: await allFutures(pending) except CatchableError as exc: - error "draining pending FFI requests on shutdown raised", error = exc.msg + error "draining pending FFI requests on shutdown raised", + error = exc.msg waitFor ffiRun(ctx) @@ -486,7 +484,8 @@ proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] = # Non-fatal: event thread will see running==false on the next tick. let evtSignaled = ctx.eventQueueSignal.fireSync() if evtSignaled.isErr(): - error "failed to signal eventQueueSignal in signalStop", error = evtSignaled.error + error "failed to signal eventQueueSignal in signalStop", + error = evtSignaled.error elif evtSignaled.get() == false: error "failed to signal eventQueueSignal on time in signalStop" ok() diff --git a/ffi/ffi_events.nim b/ffi/ffi_events.nim index 96cc0cd..cfcd0ff 100644 --- a/ffi/ffi_events.nim +++ b/ffi/ffi_events.nim @@ -2,13 +2,21 @@ ## FFI library-initiated events. Listeners receive only the event name ## they subscribed to. Queue payloads travel via `c_malloc` so transfer ## across Nim heaps is safe under both `--mm:orc` and `--mm:refc`. +## +## Dispatch templates enqueue and return on the FFI thread; a dedicated +## event thread (owned by `FFIContext`) drains the queue and invokes +## listeners, so a slow handler can never block the FFI event loop. On +## queue overflow the templates log, set a sticky stuck flag, and wake +## the event thread — which fires the global "not responding" +## notification from its own loop (firing it from the FFI thread would +## risk deadlocking against a listener back-pressuring under `reg.lock`). {.pragma: callback, cdecl, raises: [], gcsafe.} import system/ansi_c import std/[atomics, locks, sequtils, options, tables] import chronicles -import ./ffi_types, ./cbor_serial +import ./ffi_types, ./cbor_serial, ./alloc type EventEnvelope*[T] = object @@ -226,62 +234,86 @@ proc notifyListenersErr*(listeners: seq[FFIEventListener], msg: string) = ) var ffiCurrentEventRegistry* {.threadvar.}: ptr FFIEventRegistry + ## Kept for tests that drive the registry directly. Dispatch no longer + ## reads it — invocation has moved to the event thread. + var ffiCurrentEventQueue* {.threadvar.}: ptr EventQueue + ## Installed by the FFI thread so dispatch templates enqueue without + ## threading a `ctx` parameter through every call site. + var ffiCurrentEventQueueStuck* {.threadvar.}: ptr Atomic[bool] + ## Sticky overflow flag on the owning `FFIContext`. Set by dispatch + ## templates when enqueue fails; read by the FFI request entry point + ## to reject further calls. + var ffiCurrentNotifyEventEnqueued* {.threadvar.}: proc() {.gcsafe, raises: [].} ## Hook (not a queue field) so this module doesn't depend on chronos's - ## ThreadSignalPtr. Nil-safe. + ## ThreadSignalPtr. Nil-safe — tests that drive the queue directly leave + ## it unset and the event thread picks up enqueued events on the next tick. -template withFFIEventDispatch(eventName: string, listeners, body: untyped) = - ## Resolves the thread-local registry, snapshots listeners under - ## `reg.lock`, then runs `body` inside `foreignThreadGc` + try/except. - let regPtr = ffiCurrentEventRegistry - if regPtr.isNil(): - chronicles.error eventName & " - event registry not set on this thread" - return - - withLock regPtr[].lock: - let listeners = regPtr[].byEvent.getOrDefault(eventName) - if listeners.len == 0: - chronicles.debug eventName & " - no listener registered" - else: - foreignThreadGc: - try: - body - except Exception, CatchableError: - notifyListenersErr( - listeners, - "Exception dispatching " & eventName & ": " & getCurrentExceptionMsg(), - ) +template enqueueOrMarkStuck( + eventName: string, + namePtr: cstring, + dataPtr: ptr UncheckedArray[byte], + dataLen: int, +) = + ## Common tail for both dispatch templates. Takes ownership of `namePtr` + ## and `dataPtr` (both `c_malloc`'d). On queue-full, frees the buffers, + ## sets the sticky stuck flag, and wakes the event thread — which fires + ## onNotResponding from its loop (firing it here would risk deadlocking + ## against a listener back-pressuring under `reg.lock`). + let q = ffiCurrentEventQueue + if q.isNil(): + chronicles.error "event queue not set on this thread", event = eventName + if not namePtr.isNil: + c_free(cast[pointer](namePtr)) + if not dataPtr.isNil: + c_free(dataPtr) + elif not q[].tryEnqueueEvent(namePtr, dataPtr, dataLen): + chronicles.error "event queue full; library marked stuck", + event = eventName, capacity = EventQueueCapacity + if not namePtr.isNil: + c_free(cast[pointer](namePtr)) + if not dataPtr.isNil: + c_free(dataPtr) + if not ffiCurrentEventQueueStuck.isNil(): + ffiCurrentEventQueueStuck[].store(true) + if not ffiCurrentNotifyEventEnqueued.isNil(): + ffiCurrentNotifyEventEnqueued() + else: + if not ffiCurrentNotifyEventEnqueued.isNil(): + ffiCurrentNotifyEventEnqueued() template dispatchFFIEvent*(eventName: string, body: untyped) = ## Dispatches an FFI event to every listener subscribed to `eventName`. ## `body` must yield a `string` or `seq[byte]`. ## - ## Valid only on the FFI thread (where `ffiCurrentEventRegistry` is set). - ## Holds `reg.lock` for the entire snapshot + invocation so a concurrent - ## `removeEventListener` from a foreign thread blocks until dispatch - ## returns. Handlers must not call addEventListener / removeEventListener - ## on the same registry (would self-deadlock). - withFFIEventDispatch(eventName, listeners): - let event = body - let dataPtr: pointer = - if event.len > 0: unsafeAddr event[0] - else: nil - notifyListenersOk(listeners, dataPtr, event.len) + ## Runs on the FFI thread: encodes the body into a fresh `c_malloc` + ## buffer and enqueues it. Listener invocation happens later on the + ## dedicated event thread, so user code can never block the FFI loop. + block: + let evtName: string = eventName + let bodyVal = body + var dataPtr: ptr UncheckedArray[byte] = nil + let dataLen = bodyVal.len + if dataLen > 0: + dataPtr = cast[ptr UncheckedArray[byte]](c_malloc(csize_t(dataLen))) + copyMem(dataPtr, unsafeAddr bodyVal[0], dataLen) + let namePtr = alloc(evtName) + enqueueOrMarkStuck(evtName, namePtr, dataPtr, dataLen) template dispatchFFIEventCbor*(eventName: string, eventPayload: typed) = ## Typed CBOR variant of `dispatchFFIEvent`. Wraps `eventPayload` in an ## `EventEnvelope`, CBOR-encodes it into a `c_malloc` buffer once, and - ## fans the same buffer out to every registered listener. + ## queues it for the event thread to fan out to listeners. ## ## NB: parameter is `eventPayload`, not `payload` — Nim's template ## substitution would otherwise also rewrite the `payload:` field inside ## `EventEnvelope`. - withFFIEventDispatch(eventName, listeners): - var (data, dataLen) = cborEncodeShared( - EventEnvelope[typeof(eventPayload)](eventType: eventName, payload: eventPayload) + block: + let evtName: string = eventName + var (dataPtr, dataLen) = cborEncodeShared( + EventEnvelope[typeof(eventPayload)](eventType: evtName, payload: eventPayload) ) - defer: - cborFreeShared(data) - notifyListenersOk(listeners, data, dataLen) + let namePtr = alloc(evtName) + enqueueOrMarkStuck(evtName, namePtr, dataPtr, dataLen) diff --git a/tests/unit/test_event_dispatch.nim b/tests/unit/test_event_dispatch.nim index dfc2cac..deb6724 100644 --- a/tests/unit/test_event_dispatch.nim +++ b/tests/unit/test_event_dispatch.nim @@ -104,6 +104,20 @@ proc setterThreadBody(args: SetterArgs) {.thread.} = suite "dispatchFFIEventCbor": test "delivers EventEnvelope-shaped CBOR payload to event callback": + # CallbackData defers declared first so they run LAST (LIFO), + # AFTER pool.destroyFFIContext joins the event thread. Otherwise + # TSan flags `captureCb` accessing an already-destroyed mutex + # whose memory got reused by the next test's stack frame. + var evt: CallbackData + initCallbackData(evt) + defer: + deinitCallbackData(evt) + + var rsp: CallbackData + initCallbackData(rsp) + defer: + deinitCallbackData(rsp) + var pool: FFIContextPool[TestEvtLib] let ctx = pool.createFFIContext().valueOr: check false @@ -111,24 +125,11 @@ suite "dispatchFFIEventCbor": defer: discard pool.destroyFFIContext(ctx) - var evt: CallbackData - initCallbackData(evt) - defer: - deinitCallbackData(evt) - # Subscribe to the specific event the request below dispatches. discard addEventListener( ctx[].eventRegistry, "message_sent", captureCb, addr evt ) - # Trigger the dispatch from the FFI thread; the response callback is - # ignored (we only care that the request completed so we know the event - # has fired). - var rsp: CallbackData - initCallbackData(rsp) - defer: - deinitCallbackData(rsp) - check sendRequestToFFIThread( ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp) ) @@ -145,6 +146,16 @@ suite "dispatchFFIEventCbor": suite "dispatchFFIEvent with seq[byte]": test "accepts a raw seq[byte] body": + var evt: CallbackData + initCallbackData(evt) + defer: + deinitCallbackData(evt) + + var rsp: CallbackData + initCallbackData(rsp) + defer: + deinitCallbackData(rsp) + var pool: FFIContextPool[TestEvtLib] let ctx = pool.createFFIContext().valueOr: check false @@ -152,20 +163,10 @@ suite "dispatchFFIEvent with seq[byte]": defer: discard pool.destroyFFIContext(ctx) - var evt: CallbackData - initCallbackData(evt) - defer: - deinitCallbackData(evt) - discard addEventListener( ctx[].eventRegistry, "raw_bytes", captureCb, addr evt ) - var rsp: CallbackData - initCallbackData(rsp) - defer: - deinitCallbackData(rsp) - check sendRequestToFFIThread( ctx, EmitRawBytesEventRequest.ffiNewReq(captureCb, addr rsp) ) @@ -188,6 +189,16 @@ when not defined(gcRefc): ## Run under tsan to actually validate the fix: ## NIM_FFI_SAN=tsan NIM_FFI_MM=orc nimble test_sanitized test "concurrent add/remove writers vs dispatch reads stay race-free": + var evt: CallbackData + initCallbackData(evt) + defer: + deinitCallbackData(evt) + + var rsp: CallbackData + initCallbackData(rsp) + defer: + deinitCallbackData(rsp) + var pool: FFIContextPool[TestEvtLib] let ctx = pool.createFFIContext().valueOr: check false @@ -195,11 +206,6 @@ when not defined(gcRefc): defer: discard pool.destroyFFIContext(ctx) - var evt: CallbackData - initCallbackData(evt) - defer: - deinitCallbackData(evt) - # Seed an initial callback so the FFI thread's first dispatch has a # target. The setter threads will then repeatedly re-install the same # (callback, userData) pair — what matters is the cross-thread write @@ -217,11 +223,6 @@ when not defined(gcRefc): for i in 0 ..< NumSetterThreads: createThread(setters[i], setterThreadBody, (ctx, addr stop, addr evt)) - var rsp: CallbackData - initCallbackData(rsp) - defer: - deinitCallbackData(rsp) - for _ in 0 ..< NumDispatchIters: # Reset rsp so each iteration's `waitCallback` blocks until the # FFI thread fires the response — keeps the loop synchronous. @@ -243,12 +244,13 @@ when not defined(gcRefc): # actually landed so a silently-broken dispatch loop is caught. check evt.called - -## A foreign-thread mutation must not be able to invalidate the -## listener's `userData` while an in-flight dispatch is mid-invocation. -## The dispatch templates hold `reg.lock` for the entire snapshot + -## invocation, so foreign `removeEventListener` blocks until dispatch -## returns. +## A foreign-thread mutation must not be able to invalidate a listener's +## `userData` while an in-flight dispatch is mid-invocation. Dispatch +## now lives on the dedicated event thread, which still holds +## `reg.lock` across the listener fan-out — so foreign +## `removeEventListener` blocks until dispatch returns. This test +## drives a real `FFIContext` (FFI thread enqueues, event thread +## dispatches) and asserts the same contract end-to-end. type SlowState = object entered: Atomic[bool] @@ -257,44 +259,49 @@ type SlowState = object proc slowEventCb( retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer ) {.cdecl, gcsafe, raises: [].} = - ## Signal entry, sleep briefly (the window during which the main - ## thread must call removeEventListener and block), signal exit. + ## Signal entry, sleep long enough that the test thread can race in + ## with removeEventListener and observe the block, then signal exit. let st = cast[ptr SlowState](userData) st[].entered.store(true) - os.sleep(15) + os.sleep(60) st[].exited.store(true) -type DispatcherArgs = tuple - reg: ptr FFIEventRegistry - done: ptr Atomic[bool] - -proc dispatcherBody(args: DispatcherArgs) {.thread.} = - ffiCurrentEventRegistry = args.reg - dispatchFFIEvent("evt"): - "payload" - args.done[].store(true) - suite "registry lock held during invocation": test "removeEventListener blocks until in-flight dispatch finishes": - var reg: FFIEventRegistry - initEventRegistry(reg) + var rsp: CallbackData + initCallbackData(rsp) defer: - deinitEventRegistry(reg) + deinitCallbackData(rsp) + + var pool: FFIContextPool[TestEvtLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + defer: + discard pool.destroyFFIContext(ctx) var st: SlowState st.entered.store(false) st.exited.store(false) - let id = addEventListener(reg, "evt", slowEventCb, addr st) + # Register the slow callback under the same event the dispatch fires. + let id = addEventListener( + ctx[].eventRegistry, "message_sent", slowEventCb, addr st + ) check id != 0'u64 - var done: Atomic[bool] - done.store(false) - var thr: Thread[DispatcherArgs] - createThread(thr, dispatcherBody, (addr reg, addr done)) + # Fire an event from the FFI thread; the request-response callback + # only confirms the request completed — the slow listener is invoked + # asynchronously by the event thread. - # Wait until the worker thread is inside slowEventCb. - for _ in 0 ..< 200: + check sendRequestToFFIThread( + ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp) + ) + .isOk() + waitCallback(rsp) + + # Wait until the event thread is inside slowEventCb. + for _ in 0 ..< 500: if st.entered.load(): break os.sleep(1) @@ -303,7 +310,5 @@ suite "registry lock held during invocation": # Lock-during-invocation contract: remove blocks until dispatch # finishes; by the time it returns, slowEventCb has set exited=true. - check removeEventListener(reg, id) + check removeEventListener(ctx[].eventRegistry, id) check st.exited.load() - joinThread(thr) - check done.load() diff --git a/tests/unit/test_event_thread.nim b/tests/unit/test_event_thread.nim new file mode 100644 index 0000000..a459a24 --- /dev/null +++ b/tests/unit/test_event_thread.nim @@ -0,0 +1,441 @@ +## Integration tests for the dedicated event thread introduced by +## issue #6. Each test stands up a real `FFIContext` (via +## `FFIContextPool`) and exercises the FFI thread → bounded queue → +## event thread → listener pipeline end to end. + +import std/[atomics, locks, os, strutils] +import unittest2 +import results +import ffi + +type TestEvtLib = object + +type LatchPayload* {.ffi.} = object + iter*: int + +## Captured-state callback identical to the helpers in +## `test_event_dispatch.nim`, repeated here so this file stays a +## self-contained test binary. +type CallbackData = object + lock: Lock + cond: Cond + called: bool + retCode: cint + msg: array[1024, byte] + msgLen: int + +proc initCallbackData(d: var CallbackData) = + d.lock.initLock() + d.cond.initCond() + +proc deinitCallbackData(d: var CallbackData) = + d.cond.deinitCond() + d.lock.deinitLock() + +proc captureCb( + retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer +) {.cdecl, gcsafe, raises: [].} = + let d = cast[ptr CallbackData](userData) + acquire(d[].lock) + d[].retCode = retCode + let n = min(int(len), d[].msg.len) + if n > 0 and not msg.isNil: + copyMem(addr d[].msg[0], msg, n) + d[].msgLen = n + d[].called = true + signal(d[].cond) + release(d[].lock) + +proc waitCallback(d: var CallbackData) = + acquire(d.lock) + while not d.called: + wait(d.cond, d.lock) + release(d.lock) + +proc waitCallbackTimeout(d: var CallbackData, timeoutMs: int): bool = + ## Polling variant for tests where the callback may legitimately + ## never fire — returns true if observed `called` within the budget, + ## false on timeout. Polls in 10 ms increments under `d.lock` so the + ## load is synchronised with the `captureCb` writer. + let deadline = Moment.now() + timeoutMs.milliseconds + while true: + acquire(d.lock) + let done = d.called + release(d.lock) + if done: + return true + if Moment.now() >= deadline: + return false + os.sleep(10) + +# --------------------------------------------------------------------------- +# Request helpers +# --------------------------------------------------------------------------- + +registerReqFFI(EmitLatchEvent, lib: ptr TestEvtLib): + proc(iter: int): Future[Result[string, string]] {.async.} = + dispatchFFIEventCbor("latch", LatchPayload(iter: iter)) + return ok("emitted") + +## A request whose async body completes immediately — useful for +## probing FFI-thread round-trip latency under load. +registerReqFFI(PingEvent, lib: ptr TestEvtLib): + proc(): Future[Result[string, string]] {.async.} = + return ok("pong") + +## A request whose async body blocks the FFI thread synchronously +## (no await) so the heartbeat freezes. Used to exercise the new +## heartbeat-staleness path. Guarded by an Atomic switch so we can +## enable it deterministically per test rather than turning every +## test of this request type into a watchdog probe. +var gBlockingEnabled: Atomic[bool] +gBlockingEnabled.store(false) + +registerReqFFI(BlockingRequest, lib: ptr TestEvtLib): + proc(milliseconds: int): Future[Result[string, string]] {.async.} = + if gBlockingEnabled.load(): + os.sleep(milliseconds) + return ok("done") + +# --------------------------------------------------------------------------- +# Thread-id capture +# --------------------------------------------------------------------------- + +var gListenerThreadId: Atomic[int] +gListenerThreadId.store(-1) + +proc captureThreadIdCb( + retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer +) {.cdecl, gcsafe, raises: [].} = + ## Records the OS thread id of the listener invocation, so the test + ## can assert it differs from the FFI thread's id. + gListenerThreadId.store(getThreadId()) + let d = cast[ptr CallbackData](userData) + acquire(d[].lock) + d[].called = true + signal(d[].cond) + release(d[].lock) + +## Returns the FFI thread's id by running a no-op request and reading +## `getThreadId()` from inside the handler. Used to compare against +## the listener's thread id below. +var gFfiThreadId: Atomic[int] +gFfiThreadId.store(-1) + +registerReqFFI(CaptureFfiTidRequest, lib: ptr TestEvtLib): + proc(): Future[Result[string, string]] {.async.} = + gFfiThreadId.store(getThreadId()) + return ok("captured") + +suite "event delivery is asynchronous": + test "listener runs on the event thread, not the FFI thread": + # CallbackData defers come BEFORE the pool-destroy defer so they run + # AFTER it (LIFO): the event thread is joined before any lock the + # event thread might still be holding is torn down — otherwise TSan + # flags `captureCb` accessing an already-destroyed mutex. + var evt: CallbackData + initCallbackData(evt) + defer: + deinitCallbackData(evt) + + var rsp: CallbackData + initCallbackData(rsp) + defer: + deinitCallbackData(rsp) + + var pool: FFIContextPool[TestEvtLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + defer: + discard pool.destroyFFIContext(ctx) + + discard addEventListener( + ctx[].eventRegistry, "latch", captureThreadIdCb, addr evt + ) + + # Capture the FFI thread id. + check sendRequestToFFIThread( + ctx, CaptureFfiTidRequest.ffiNewReq(captureCb, addr rsp) + ) + .isOk() + waitCallback(rsp) + + # Now emit an event from the FFI thread and wait for the listener. + acquire(rsp.lock) + rsp.called = false + release(rsp.lock) + check sendRequestToFFIThread( + ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, 0) + ) + .isOk() + waitCallback(rsp) + waitCallback(evt) + + let ffiTid = gFfiThreadId.load() + let listenerTid = gListenerThreadId.load() + check ffiTid >= 0 + check listenerTid >= 0 + check ffiTid != listenerTid + +# --------------------------------------------------------------------------- +# Slow listener does not block the FFI thread +# --------------------------------------------------------------------------- + +proc slowSleepCb( + retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer +) {.cdecl, gcsafe, raises: [].} = + ## Sleeps long enough that we'd notice if the FFI thread were waiting + ## on us before accepting the next request. + os.sleep(150) + +suite "FFI thread independence": + test "slow listener does not block FFI thread request round-trip": + var rsp: CallbackData + initCallbackData(rsp) + defer: + deinitCallbackData(rsp) + + var pool: FFIContextPool[TestEvtLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + defer: + discard pool.destroyFFIContext(ctx) + + discard addEventListener( + ctx[].eventRegistry, "latch", slowSleepCb, nil + ) + + # Fire an event, then immediately fire a ping request. With dispatch + # off the FFI thread, the ping must complete in well under 150 ms + # even though the prior event is still in flight on the event thread. + check sendRequestToFFIThread( + ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, 0) + ) + .isOk() + waitCallback(rsp) + + acquire(rsp.lock) + rsp.called = false + release(rsp.lock) + + # Use chronos's Moment for timing so we don't pull std/times into + # scope — std/times exports a `milliseconds` proc that shadows the + # chronos one used inside the FFI thread body, breaking compilation + # of generic instantiations at this call site. + let started = Moment.now() + check sendRequestToFFIThread(ctx, PingEvent.ffiNewReq(captureCb, addr rsp)) + .isOk() + waitCallback(rsp) + let elapsed = Moment.now() - started + + check elapsed < 100.milliseconds # ample margin under the 150 ms slow-listener sleep + +# --------------------------------------------------------------------------- +# Heartbeat staleness fires onNotResponding +# --------------------------------------------------------------------------- + +when not defined(gcRefc): + ## Skipped under `--mm:refc`: this test relies on `os.sleep`ing the + ## FFI thread for several seconds inside a synchronous handler. + ## refc plus the existing destroy-on-time policies make that + ## combination flaky in CI; the orc path is the contract we care + ## about here. + suite "FFI heartbeat staleness": + test "wedged FFI thread triggers onNotResponding via heartbeat": + # Lock-bearing CallbackData defers are declared FIRST so they run + # LAST (LIFO); pool-destroy runs FIRST and joins the event thread + # before any mutex it might still be holding is destroyed. + var notif: CallbackData + initCallbackData(notif) + defer: + deinitCallbackData(notif) + + var rsp: CallbackData + initCallbackData(rsp) + defer: + deinitCallbackData(rsp) + + var pool: FFIContextPool[TestEvtLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + defer: + # Disable the wedge before tearing down so destroy isn't blocked + # by the still-sleeping handler. + gBlockingEnabled.store(false) + discard pool.destroyFFIContext(ctx) + + # Subscribe to the exact event name `onNotResponding` looks up so + # the captured signal can't be ambiguously satisfied by an unrelated + # event the test happens to dispatch. + discard addEventListener( + ctx[].eventRegistry, NotRespondingEventName, captureCb, addr notif + ) + + # Wait out the start-delay grace window first so the heartbeat + # check is actually armed. + os.sleep(FFIHeartbeatStartDelay.milliseconds.int + 200) + + # Wedge the FFI thread for longer than EventThreadTickInterval + + # FFIHeartbeatStaleThreshold so the event thread observes a + # frozen heartbeat across at least one tick boundary. + gBlockingEnabled.store(true) + let wedgeMs = + (EventThreadTickInterval + FFIHeartbeatStaleThreshold).milliseconds.int + + 1500 + check sendRequestToFFIThread( + ctx, BlockingRequest.ffiNewReq(captureCb, addr rsp, wedgeMs) + ) + .isOk() + waitCallback(rsp) + gBlockingEnabled.store(false) + + # The not_responding event should have been delivered while the + # FFI thread was wedged. captureCb writes `called` under + # `notif.lock`, so wait through the cond rather than reading it + # raw — avoids both the data race and the just-missed-it window. + check waitCallbackTimeout(notif, 1500) + +# --------------------------------------------------------------------------- +# Queue overflow sets the stuck flag and rejects further requests +# --------------------------------------------------------------------------- + +type BackpressureState = object + enteredLock: Lock + enteredCond: Cond + entered: Atomic[bool] + releaseLock: Lock + releaseCond: Cond + release: Atomic[bool] + +proc initBackpressure(b: var BackpressureState) = + b.enteredLock.initLock() + b.enteredCond.initCond() + b.releaseLock.initLock() + b.releaseCond.initCond() + +proc deinitBackpressure(b: var BackpressureState) = + b.enteredCond.deinitCond() + b.enteredLock.deinitLock() + b.releaseCond.deinitCond() + b.releaseLock.deinitLock() + +proc backpressureCb( + retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer +) {.cdecl, gcsafe, raises: [].} = + ## First invocation signals entered and then blocks until released — + ## holds the event thread inside `withLock reg.lock`, which back-pressures + ## subsequent dispatches and gives us a deterministic way to fill the + ## queue. + let b = cast[ptr BackpressureState](userData) + if not b[].entered.exchange(true): + acquire(b[].enteredLock) + signal(b[].enteredCond) + release(b[].enteredLock) + + acquire(b[].releaseLock) + while not b[].release.load(): + wait(b[].releaseCond, b[].releaseLock) + release(b[].releaseLock) + +## A request that does N back-to-back dispatches from the FFI thread. +## Used to push enough events at once that the queue saturates while +## the event thread is stuck inside the backpressure listener above. +registerReqFFI(BurstEmit, lib: ptr TestEvtLib): + proc(count: int): Future[Result[string, string]] {.async.} = + for i in 0 ..< count: + dispatchFFIEventCbor("latch", LatchPayload(iter: i)) + return ok("bursted") + +suite "queue overflow": + test "overflow sets stuck flag, fires onNotResponding, rejects new requests": + # Lock-bearing state defers come FIRST so they run LAST (LIFO); + # pool destroy joins the event thread before any mutex still + # referenced from a listener is torn down. + var bp: BackpressureState + initBackpressure(bp) + defer: + deinitBackpressure(bp) + + var notif: CallbackData + initCallbackData(notif) + defer: + deinitCallbackData(notif) + + var rsp: CallbackData + initCallbackData(rsp) + defer: + deinitCallbackData(rsp) + + var rejected: CallbackData + initCallbackData(rejected) + defer: + deinitCallbackData(rejected) + + var pool: FFIContextPool[TestEvtLib] + let ctx = pool.createFFIContext().valueOr: + check false + return + defer: + discard pool.destroyFFIContext(ctx) + + discard addEventListener( + ctx[].eventRegistry, "latch", backpressureCb, addr bp + ) + # Subscribe to the exact not_responding event name so the wait below + # can't be falsely satisfied by a "latch" payload from the burst. + discard addEventListener( + ctx[].eventRegistry, NotRespondingEventName, captureCb, addr notif + ) + + # Kick off one event so the event thread enters backpressureCb and + # holds reg.lock. Once that's confirmed, any subsequent enqueues + # pile up in the queue without being drained. + check sendRequestToFFIThread( + ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, -1) + ) + .isOk() + waitCallback(rsp) + + acquire(bp.enteredLock) + while not bp.entered.load(): + wait(bp.enteredCond, bp.enteredLock) + release(bp.enteredLock) + + # Now flood the queue. EventQueueCapacity+8 dispatches in one + # FFI-thread request, atomically from the queue's perspective: the + # event thread can't drain anything because it's stuck inside the + # first callback. The last several enqueues must hit the + # queue-full path and flip the stuck flag. + acquire(rsp.lock) + rsp.called = false + release(rsp.lock) + check sendRequestToFFIThread( + ctx, BurstEmit.ffiNewReq(captureCb, addr rsp, EventQueueCapacity + 8) + ) + .isOk() + waitCallback(rsp) + + # The stuck flag is set as soon as the first overflow happens; + # subsequent sendRequestToFFIThread calls must short-circuit. + check ctx.eventQueueStuck.load() + + let res = + sendRequestToFFIThread(ctx, PingEvent.ffiNewReq(captureCb, addr rejected)) + check res.isErr() + check res.error.contains("stuck") + + # Release backpressure so the event thread can drain, advance past + # the backpressure listener, observe the stuck flag, and fire the + # not_responding notification. + acquire(bp.releaseLock) + bp.release.store(true) + signal(bp.releaseCond) + release(bp.releaseLock) + + # The whole point of this test is that overflow surfaces the + # not_responding signal — assert it actually fires within a + # bounded window rather than letting the test silently pass. + check waitCallbackTimeout(notif, 2000)