chore: separate event threadghub pr edit 69 --base chore/ffi-context-lifecycle

This commit is contained in:
Gabriel Cruz 2026-06-02 15:33:25 -03:00
parent 20e29fd454
commit c7c9329abd
No known key found for this signature in database
GPG Key ID: 2E467754A6BA9BA5
5 changed files with 611 additions and 114 deletions

View File

@ -2,6 +2,26 @@
All notable changes to this project are documented in this file.
## [Unreleased]
### Changed
- User event callbacks now run on a dedicated event thread fed by a
bounded SPSC queue (default capacity 1024), so a slow listener can no
longer block the FFI thread or concurrent `add_event_listener` /
`remove_event_listener` calls
([#6](https://github.com/logos-messaging/nim-ffi/issues/6)).
- Replaced the dedicated watchdog thread with a heartbeat check that
runs on the event thread. The FFI thread advances an atomic heartbeat
each loop iteration; if it stalls for more than 1s past the start-up
grace window, the event thread emits the `not_responding` event.
### Added
- Queue-overflow handling: when the bounded event queue is full, the
library sets a sticky "stuck" flag, logs an error, fires
`not_responding` from the event thread, and rejects subsequent
`sendRequestToFFIThread` calls with `event queue stuck - library
cannot accept new requests`.
## [0.2.0] - 2026-05-20
Major release introducing CBOR-based wire format, multi-language binding

View File

@ -4,11 +4,7 @@ import system/ansi_c
import std/[atomics, locks, options, tables]
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
import
./ffi_types,
./ffi_events,
./ffi_thread_request,
./internal/ffi_macro,
./logging,
./ffi_types, ./ffi_events, ./ffi_thread_request, ./internal/ffi_macro, ./logging,
./cbor_serial
export ffi_events
@ -98,7 +94,8 @@ proc sendRequestToFFIThread*(
# Event-queue overflow refuses further requests; the event thread fires onNotResponding to avoid deadlocking on reg.lock here.
if ctx.eventQueueStuck.load():
deleteRequest(ffiRequest)
return err("event queue stuck - library cannot accept new requests")
return
err("event queue stuck - library cannot accept new requests")
# Reentrancy guard: only this thread can fire `reqReceivedSignal`, so a handler dispatching back would self-deadlock.
if onFFIThread:
@ -262,7 +259,8 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
try:
await allFutures(pending)
except CatchableError as exc:
error "draining pending FFI requests on shutdown raised", error = exc.msg
error "draining pending FFI requests on shutdown raised",
error = exc.msg
waitFor ffiRun(ctx)
@ -486,7 +484,8 @@ proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] =
# Non-fatal: event thread will see running==false on the next tick.
let evtSignaled = ctx.eventQueueSignal.fireSync()
if evtSignaled.isErr():
error "failed to signal eventQueueSignal in signalStop", error = evtSignaled.error
error "failed to signal eventQueueSignal in signalStop",
error = evtSignaled.error
elif evtSignaled.get() == false:
error "failed to signal eventQueueSignal on time in signalStop"
ok()

View File

@ -2,13 +2,21 @@
## FFI library-initiated events. Listeners receive only the event name
## they subscribed to. Queue payloads travel via `c_malloc` so transfer
## across Nim heaps is safe under both `--mm:orc` and `--mm:refc`.
##
## Dispatch templates enqueue and return on the FFI thread; a dedicated
## event thread (owned by `FFIContext`) drains the queue and invokes
## listeners, so a slow handler can never block the FFI event loop. On
## queue overflow the templates log, set a sticky stuck flag, and wake
## the event thread — which fires the global "not responding"
## notification from its own loop (firing it from the FFI thread would
## risk deadlocking against a listener back-pressuring under `reg.lock`).
{.pragma: callback, cdecl, raises: [], gcsafe.}
import system/ansi_c
import std/[atomics, locks, sequtils, options, tables]
import chronicles
import ./ffi_types, ./cbor_serial
import ./ffi_types, ./cbor_serial, ./alloc
type EventEnvelope*[T] = object
@ -226,62 +234,86 @@ proc notifyListenersErr*(listeners: seq[FFIEventListener], msg: string) =
)
var ffiCurrentEventRegistry* {.threadvar.}: ptr FFIEventRegistry
## Kept for tests that drive the registry directly. Dispatch no longer
## reads it — invocation has moved to the event thread.
var ffiCurrentEventQueue* {.threadvar.}: ptr EventQueue
## Installed by the FFI thread so dispatch templates enqueue without
## threading a `ctx` parameter through every call site.
var ffiCurrentEventQueueStuck* {.threadvar.}: ptr Atomic[bool]
## Sticky overflow flag on the owning `FFIContext`. Set by dispatch
## templates when enqueue fails; read by the FFI request entry point
## to reject further calls.
var ffiCurrentNotifyEventEnqueued* {.threadvar.}: proc() {.gcsafe, raises: [].}
## Hook (not a queue field) so this module doesn't depend on chronos's
## ThreadSignalPtr. Nil-safe.
## ThreadSignalPtr. Nil-safe — tests that drive the queue directly leave
## it unset and the event thread picks up enqueued events on the next tick.
template withFFIEventDispatch(eventName: string, listeners, body: untyped) =
## Resolves the thread-local registry, snapshots listeners under
## `reg.lock`, then runs `body` inside `foreignThreadGc` + try/except.
let regPtr = ffiCurrentEventRegistry
if regPtr.isNil():
chronicles.error eventName & " - event registry not set on this thread"
return
withLock regPtr[].lock:
let listeners = regPtr[].byEvent.getOrDefault(eventName)
if listeners.len == 0:
chronicles.debug eventName & " - no listener registered"
else:
foreignThreadGc:
try:
body
except Exception, CatchableError:
notifyListenersErr(
listeners,
"Exception dispatching " & eventName & ": " & getCurrentExceptionMsg(),
)
template enqueueOrMarkStuck(
eventName: string,
namePtr: cstring,
dataPtr: ptr UncheckedArray[byte],
dataLen: int,
) =
## Common tail for both dispatch templates. Takes ownership of `namePtr`
## and `dataPtr` (both `c_malloc`'d). On queue-full, frees the buffers,
## sets the sticky stuck flag, and wakes the event thread — which fires
## onNotResponding from its loop (firing it here would risk deadlocking
## against a listener back-pressuring under `reg.lock`).
let q = ffiCurrentEventQueue
if q.isNil():
chronicles.error "event queue not set on this thread", event = eventName
if not namePtr.isNil:
c_free(cast[pointer](namePtr))
if not dataPtr.isNil:
c_free(dataPtr)
elif not q[].tryEnqueueEvent(namePtr, dataPtr, dataLen):
chronicles.error "event queue full; library marked stuck",
event = eventName, capacity = EventQueueCapacity
if not namePtr.isNil:
c_free(cast[pointer](namePtr))
if not dataPtr.isNil:
c_free(dataPtr)
if not ffiCurrentEventQueueStuck.isNil():
ffiCurrentEventQueueStuck[].store(true)
if not ffiCurrentNotifyEventEnqueued.isNil():
ffiCurrentNotifyEventEnqueued()
else:
if not ffiCurrentNotifyEventEnqueued.isNil():
ffiCurrentNotifyEventEnqueued()
template dispatchFFIEvent*(eventName: string, body: untyped) =
## Dispatches an FFI event to every listener subscribed to `eventName`.
## `body` must yield a `string` or `seq[byte]`.
##
## Valid only on the FFI thread (where `ffiCurrentEventRegistry` is set).
## Holds `reg.lock` for the entire snapshot + invocation so a concurrent
## `removeEventListener` from a foreign thread blocks until dispatch
## returns. Handlers must not call addEventListener / removeEventListener
## on the same registry (would self-deadlock).
withFFIEventDispatch(eventName, listeners):
let event = body
let dataPtr: pointer =
if event.len > 0: unsafeAddr event[0]
else: nil
notifyListenersOk(listeners, dataPtr, event.len)
## Runs on the FFI thread: encodes the body into a fresh `c_malloc`
## buffer and enqueues it. Listener invocation happens later on the
## dedicated event thread, so user code can never block the FFI loop.
block:
let evtName: string = eventName
let bodyVal = body
var dataPtr: ptr UncheckedArray[byte] = nil
let dataLen = bodyVal.len
if dataLen > 0:
dataPtr = cast[ptr UncheckedArray[byte]](c_malloc(csize_t(dataLen)))
copyMem(dataPtr, unsafeAddr bodyVal[0], dataLen)
let namePtr = alloc(evtName)
enqueueOrMarkStuck(evtName, namePtr, dataPtr, dataLen)
template dispatchFFIEventCbor*(eventName: string, eventPayload: typed) =
## Typed CBOR variant of `dispatchFFIEvent`. Wraps `eventPayload` in an
## `EventEnvelope`, CBOR-encodes it into a `c_malloc` buffer once, and
## fans the same buffer out to every registered listener.
## queues it for the event thread to fan out to listeners.
##
## NB: parameter is `eventPayload`, not `payload` — Nim's template
## substitution would otherwise also rewrite the `payload:` field inside
## `EventEnvelope`.
withFFIEventDispatch(eventName, listeners):
var (data, dataLen) = cborEncodeShared(
EventEnvelope[typeof(eventPayload)](eventType: eventName, payload: eventPayload)
block:
let evtName: string = eventName
var (dataPtr, dataLen) = cborEncodeShared(
EventEnvelope[typeof(eventPayload)](eventType: evtName, payload: eventPayload)
)
defer:
cborFreeShared(data)
notifyListenersOk(listeners, data, dataLen)
let namePtr = alloc(evtName)
enqueueOrMarkStuck(evtName, namePtr, dataPtr, dataLen)

View File

@ -104,6 +104,20 @@ proc setterThreadBody(args: SetterArgs) {.thread.} =
suite "dispatchFFIEventCbor":
test "delivers EventEnvelope-shaped CBOR payload to event callback":
# CallbackData defers declared first so they run LAST (LIFO),
# AFTER pool.destroyFFIContext joins the event thread. Otherwise
# TSan flags `captureCb` accessing an already-destroyed mutex
# whose memory got reused by the next test's stack frame.
var evt: CallbackData
initCallbackData(evt)
defer:
deinitCallbackData(evt)
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
@ -111,24 +125,11 @@ suite "dispatchFFIEventCbor":
defer:
discard pool.destroyFFIContext(ctx)
var evt: CallbackData
initCallbackData(evt)
defer:
deinitCallbackData(evt)
# Subscribe to the specific event the request below dispatches.
discard addEventListener(
ctx[].eventRegistry, "message_sent", captureCb, addr evt
)
# Trigger the dispatch from the FFI thread; the response callback is
# ignored (we only care that the request completed so we know the event
# has fired).
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
check sendRequestToFFIThread(
ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
)
@ -145,6 +146,16 @@ suite "dispatchFFIEventCbor":
suite "dispatchFFIEvent with seq[byte]":
test "accepts a raw seq[byte] body":
var evt: CallbackData
initCallbackData(evt)
defer:
deinitCallbackData(evt)
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
@ -152,20 +163,10 @@ suite "dispatchFFIEvent with seq[byte]":
defer:
discard pool.destroyFFIContext(ctx)
var evt: CallbackData
initCallbackData(evt)
defer:
deinitCallbackData(evt)
discard addEventListener(
ctx[].eventRegistry, "raw_bytes", captureCb, addr evt
)
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
check sendRequestToFFIThread(
ctx, EmitRawBytesEventRequest.ffiNewReq(captureCb, addr rsp)
)
@ -188,6 +189,16 @@ when not defined(gcRefc):
## Run under tsan to actually validate the fix:
## NIM_FFI_SAN=tsan NIM_FFI_MM=orc nimble test_sanitized
test "concurrent add/remove writers vs dispatch reads stay race-free":
var evt: CallbackData
initCallbackData(evt)
defer:
deinitCallbackData(evt)
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
@ -195,11 +206,6 @@ when not defined(gcRefc):
defer:
discard pool.destroyFFIContext(ctx)
var evt: CallbackData
initCallbackData(evt)
defer:
deinitCallbackData(evt)
# Seed an initial callback so the FFI thread's first dispatch has a
# target. The setter threads will then repeatedly re-install the same
# (callback, userData) pair — what matters is the cross-thread write
@ -217,11 +223,6 @@ when not defined(gcRefc):
for i in 0 ..< NumSetterThreads:
createThread(setters[i], setterThreadBody, (ctx, addr stop, addr evt))
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
for _ in 0 ..< NumDispatchIters:
# Reset rsp so each iteration's `waitCallback` blocks until the
# FFI thread fires the response — keeps the loop synchronous.
@ -243,12 +244,13 @@ when not defined(gcRefc):
# actually landed so a silently-broken dispatch loop is caught.
check evt.called
## A foreign-thread mutation must not be able to invalidate the
## listener's `userData` while an in-flight dispatch is mid-invocation.
## The dispatch templates hold `reg.lock` for the entire snapshot +
## invocation, so foreign `removeEventListener` blocks until dispatch
## returns.
## A foreign-thread mutation must not be able to invalidate a listener's
## `userData` while an in-flight dispatch is mid-invocation. Dispatch
## now lives on the dedicated event thread, which still holds
## `reg.lock` across the listener fan-out — so foreign
## `removeEventListener` blocks until dispatch returns. This test
## drives a real `FFIContext` (FFI thread enqueues, event thread
## dispatches) and asserts the same contract end-to-end.
type SlowState = object
entered: Atomic[bool]
@ -257,44 +259,49 @@ type SlowState = object
proc slowEventCb(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
## Signal entry, sleep briefly (the window during which the main
## thread must call removeEventListener and block), signal exit.
## Signal entry, sleep long enough that the test thread can race in
## with removeEventListener and observe the block, then signal exit.
let st = cast[ptr SlowState](userData)
st[].entered.store(true)
os.sleep(15)
os.sleep(60)
st[].exited.store(true)
type DispatcherArgs = tuple
reg: ptr FFIEventRegistry
done: ptr Atomic[bool]
proc dispatcherBody(args: DispatcherArgs) {.thread.} =
ffiCurrentEventRegistry = args.reg
dispatchFFIEvent("evt"):
"payload"
args.done[].store(true)
suite "registry lock held during invocation":
test "removeEventListener blocks until in-flight dispatch finishes":
var reg: FFIEventRegistry
initEventRegistry(reg)
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitEventRegistry(reg)
deinitCallbackData(rsp)
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
var st: SlowState
st.entered.store(false)
st.exited.store(false)
let id = addEventListener(reg, "evt", slowEventCb, addr st)
# Register the slow callback under the same event the dispatch fires.
let id = addEventListener(
ctx[].eventRegistry, "message_sent", slowEventCb, addr st
)
check id != 0'u64
var done: Atomic[bool]
done.store(false)
var thr: Thread[DispatcherArgs]
createThread(thr, dispatcherBody, (addr reg, addr done))
# Fire an event from the FFI thread; the request-response callback
# only confirms the request completed — the slow listener is invoked
# asynchronously by the event thread.
# Wait until the worker thread is inside slowEventCb.
for _ in 0 ..< 200:
check sendRequestToFFIThread(
ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
waitCallback(rsp)
# Wait until the event thread is inside slowEventCb.
for _ in 0 ..< 500:
if st.entered.load():
break
os.sleep(1)
@ -303,7 +310,5 @@ suite "registry lock held during invocation":
# Lock-during-invocation contract: remove blocks until dispatch
# finishes; by the time it returns, slowEventCb has set exited=true.
check removeEventListener(reg, id)
check removeEventListener(ctx[].eventRegistry, id)
check st.exited.load()
joinThread(thr)
check done.load()

View File

@ -0,0 +1,441 @@
## Integration tests for the dedicated event thread introduced by
## issue #6. Each test stands up a real `FFIContext` (via
## `FFIContextPool`) and exercises the FFI thread → bounded queue →
## event thread → listener pipeline end to end.
import std/[atomics, locks, os, strutils]
import unittest2
import results
import ffi
type TestEvtLib = object
type LatchPayload* {.ffi.} = object
iter*: int
## Captured-state callback identical to the helpers in
## `test_event_dispatch.nim`, repeated here so this file stays a
## self-contained test binary.
type CallbackData = object
lock: Lock
cond: Cond
called: bool
retCode: cint
msg: array[1024, byte]
msgLen: int
proc initCallbackData(d: var CallbackData) =
d.lock.initLock()
d.cond.initCond()
proc deinitCallbackData(d: var CallbackData) =
d.cond.deinitCond()
d.lock.deinitLock()
proc captureCb(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
let d = cast[ptr CallbackData](userData)
acquire(d[].lock)
d[].retCode = retCode
let n = min(int(len), d[].msg.len)
if n > 0 and not msg.isNil:
copyMem(addr d[].msg[0], msg, n)
d[].msgLen = n
d[].called = true
signal(d[].cond)
release(d[].lock)
proc waitCallback(d: var CallbackData) =
acquire(d.lock)
while not d.called:
wait(d.cond, d.lock)
release(d.lock)
proc waitCallbackTimeout(d: var CallbackData, timeoutMs: int): bool =
## Polling variant for tests where the callback may legitimately
## never fire — returns true if observed `called` within the budget,
## false on timeout. Polls in 10 ms increments under `d.lock` so the
## load is synchronised with the `captureCb` writer.
let deadline = Moment.now() + timeoutMs.milliseconds
while true:
acquire(d.lock)
let done = d.called
release(d.lock)
if done:
return true
if Moment.now() >= deadline:
return false
os.sleep(10)
# ---------------------------------------------------------------------------
# Request helpers
# ---------------------------------------------------------------------------
registerReqFFI(EmitLatchEvent, lib: ptr TestEvtLib):
proc(iter: int): Future[Result[string, string]] {.async.} =
dispatchFFIEventCbor("latch", LatchPayload(iter: iter))
return ok("emitted")
## A request whose async body completes immediately — useful for
## probing FFI-thread round-trip latency under load.
registerReqFFI(PingEvent, lib: ptr TestEvtLib):
proc(): Future[Result[string, string]] {.async.} =
return ok("pong")
## A request whose async body blocks the FFI thread synchronously
## (no await) so the heartbeat freezes. Used to exercise the new
## heartbeat-staleness path. Guarded by an Atomic switch so we can
## enable it deterministically per test rather than turning every
## test of this request type into a watchdog probe.
var gBlockingEnabled: Atomic[bool]
gBlockingEnabled.store(false)
registerReqFFI(BlockingRequest, lib: ptr TestEvtLib):
proc(milliseconds: int): Future[Result[string, string]] {.async.} =
if gBlockingEnabled.load():
os.sleep(milliseconds)
return ok("done")
# ---------------------------------------------------------------------------
# Thread-id capture
# ---------------------------------------------------------------------------
var gListenerThreadId: Atomic[int]
gListenerThreadId.store(-1)
proc captureThreadIdCb(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
## Records the OS thread id of the listener invocation, so the test
## can assert it differs from the FFI thread's id.
gListenerThreadId.store(getThreadId())
let d = cast[ptr CallbackData](userData)
acquire(d[].lock)
d[].called = true
signal(d[].cond)
release(d[].lock)
## Returns the FFI thread's id by running a no-op request and reading
## `getThreadId()` from inside the handler. Used to compare against
## the listener's thread id below.
var gFfiThreadId: Atomic[int]
gFfiThreadId.store(-1)
registerReqFFI(CaptureFfiTidRequest, lib: ptr TestEvtLib):
proc(): Future[Result[string, string]] {.async.} =
gFfiThreadId.store(getThreadId())
return ok("captured")
suite "event delivery is asynchronous":
test "listener runs on the event thread, not the FFI thread":
# CallbackData defers come BEFORE the pool-destroy defer so they run
# AFTER it (LIFO): the event thread is joined before any lock the
# event thread might still be holding is torn down — otherwise TSan
# flags `captureCb` accessing an already-destroyed mutex.
var evt: CallbackData
initCallbackData(evt)
defer:
deinitCallbackData(evt)
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
discard addEventListener(
ctx[].eventRegistry, "latch", captureThreadIdCb, addr evt
)
# Capture the FFI thread id.
check sendRequestToFFIThread(
ctx, CaptureFfiTidRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
waitCallback(rsp)
# Now emit an event from the FFI thread and wait for the listener.
acquire(rsp.lock)
rsp.called = false
release(rsp.lock)
check sendRequestToFFIThread(
ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, 0)
)
.isOk()
waitCallback(rsp)
waitCallback(evt)
let ffiTid = gFfiThreadId.load()
let listenerTid = gListenerThreadId.load()
check ffiTid >= 0
check listenerTid >= 0
check ffiTid != listenerTid
# ---------------------------------------------------------------------------
# Slow listener does not block the FFI thread
# ---------------------------------------------------------------------------
proc slowSleepCb(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
## Sleeps long enough that we'd notice if the FFI thread were waiting
## on us before accepting the next request.
os.sleep(150)
suite "FFI thread independence":
test "slow listener does not block FFI thread request round-trip":
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
discard addEventListener(
ctx[].eventRegistry, "latch", slowSleepCb, nil
)
# Fire an event, then immediately fire a ping request. With dispatch
# off the FFI thread, the ping must complete in well under 150 ms
# even though the prior event is still in flight on the event thread.
check sendRequestToFFIThread(
ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, 0)
)
.isOk()
waitCallback(rsp)
acquire(rsp.lock)
rsp.called = false
release(rsp.lock)
# Use chronos's Moment for timing so we don't pull std/times into
# scope — std/times exports a `milliseconds` proc that shadows the
# chronos one used inside the FFI thread body, breaking compilation
# of generic instantiations at this call site.
let started = Moment.now()
check sendRequestToFFIThread(ctx, PingEvent.ffiNewReq(captureCb, addr rsp))
.isOk()
waitCallback(rsp)
let elapsed = Moment.now() - started
check elapsed < 100.milliseconds # ample margin under the 150 ms slow-listener sleep
# ---------------------------------------------------------------------------
# Heartbeat staleness fires onNotResponding
# ---------------------------------------------------------------------------
when not defined(gcRefc):
## Skipped under `--mm:refc`: this test relies on `os.sleep`ing the
## FFI thread for several seconds inside a synchronous handler.
## refc plus the existing destroy-on-time policies make that
## combination flaky in CI; the orc path is the contract we care
## about here.
suite "FFI heartbeat staleness":
test "wedged FFI thread triggers onNotResponding via heartbeat":
# Lock-bearing CallbackData defers are declared FIRST so they run
# LAST (LIFO); pool-destroy runs FIRST and joins the event thread
# before any mutex it might still be holding is destroyed.
var notif: CallbackData
initCallbackData(notif)
defer:
deinitCallbackData(notif)
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
# Disable the wedge before tearing down so destroy isn't blocked
# by the still-sleeping handler.
gBlockingEnabled.store(false)
discard pool.destroyFFIContext(ctx)
# Subscribe to the exact event name `onNotResponding` looks up so
# the captured signal can't be ambiguously satisfied by an unrelated
# event the test happens to dispatch.
discard addEventListener(
ctx[].eventRegistry, NotRespondingEventName, captureCb, addr notif
)
# Wait out the start-delay grace window first so the heartbeat
# check is actually armed.
os.sleep(FFIHeartbeatStartDelay.milliseconds.int + 200)
# Wedge the FFI thread for longer than EventThreadTickInterval +
# FFIHeartbeatStaleThreshold so the event thread observes a
# frozen heartbeat across at least one tick boundary.
gBlockingEnabled.store(true)
let wedgeMs =
(EventThreadTickInterval + FFIHeartbeatStaleThreshold).milliseconds.int +
1500
check sendRequestToFFIThread(
ctx, BlockingRequest.ffiNewReq(captureCb, addr rsp, wedgeMs)
)
.isOk()
waitCallback(rsp)
gBlockingEnabled.store(false)
# The not_responding event should have been delivered while the
# FFI thread was wedged. captureCb writes `called` under
# `notif.lock`, so wait through the cond rather than reading it
# raw — avoids both the data race and the just-missed-it window.
check waitCallbackTimeout(notif, 1500)
# ---------------------------------------------------------------------------
# Queue overflow sets the stuck flag and rejects further requests
# ---------------------------------------------------------------------------
type BackpressureState = object
enteredLock: Lock
enteredCond: Cond
entered: Atomic[bool]
releaseLock: Lock
releaseCond: Cond
release: Atomic[bool]
proc initBackpressure(b: var BackpressureState) =
b.enteredLock.initLock()
b.enteredCond.initCond()
b.releaseLock.initLock()
b.releaseCond.initCond()
proc deinitBackpressure(b: var BackpressureState) =
b.enteredCond.deinitCond()
b.enteredLock.deinitLock()
b.releaseCond.deinitCond()
b.releaseLock.deinitLock()
proc backpressureCb(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
## First invocation signals entered and then blocks until released —
## holds the event thread inside `withLock reg.lock`, which back-pressures
## subsequent dispatches and gives us a deterministic way to fill the
## queue.
let b = cast[ptr BackpressureState](userData)
if not b[].entered.exchange(true):
acquire(b[].enteredLock)
signal(b[].enteredCond)
release(b[].enteredLock)
acquire(b[].releaseLock)
while not b[].release.load():
wait(b[].releaseCond, b[].releaseLock)
release(b[].releaseLock)
## A request that does N back-to-back dispatches from the FFI thread.
## Used to push enough events at once that the queue saturates while
## the event thread is stuck inside the backpressure listener above.
registerReqFFI(BurstEmit, lib: ptr TestEvtLib):
proc(count: int): Future[Result[string, string]] {.async.} =
for i in 0 ..< count:
dispatchFFIEventCbor("latch", LatchPayload(iter: i))
return ok("bursted")
suite "queue overflow":
test "overflow sets stuck flag, fires onNotResponding, rejects new requests":
# Lock-bearing state defers come FIRST so they run LAST (LIFO);
# pool destroy joins the event thread before any mutex still
# referenced from a listener is torn down.
var bp: BackpressureState
initBackpressure(bp)
defer:
deinitBackpressure(bp)
var notif: CallbackData
initCallbackData(notif)
defer:
deinitCallbackData(notif)
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
var rejected: CallbackData
initCallbackData(rejected)
defer:
deinitCallbackData(rejected)
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
discard addEventListener(
ctx[].eventRegistry, "latch", backpressureCb, addr bp
)
# Subscribe to the exact not_responding event name so the wait below
# can't be falsely satisfied by a "latch" payload from the burst.
discard addEventListener(
ctx[].eventRegistry, NotRespondingEventName, captureCb, addr notif
)
# Kick off one event so the event thread enters backpressureCb and
# holds reg.lock. Once that's confirmed, any subsequent enqueues
# pile up in the queue without being drained.
check sendRequestToFFIThread(
ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, -1)
)
.isOk()
waitCallback(rsp)
acquire(bp.enteredLock)
while not bp.entered.load():
wait(bp.enteredCond, bp.enteredLock)
release(bp.enteredLock)
# Now flood the queue. EventQueueCapacity+8 dispatches in one
# FFI-thread request, atomically from the queue's perspective: the
# event thread can't drain anything because it's stuck inside the
# first callback. The last several enqueues must hit the
# queue-full path and flip the stuck flag.
acquire(rsp.lock)
rsp.called = false
release(rsp.lock)
check sendRequestToFFIThread(
ctx, BurstEmit.ffiNewReq(captureCb, addr rsp, EventQueueCapacity + 8)
)
.isOk()
waitCallback(rsp)
# The stuck flag is set as soon as the first overflow happens;
# subsequent sendRequestToFFIThread calls must short-circuit.
check ctx.eventQueueStuck.load()
let res =
sendRequestToFFIThread(ctx, PingEvent.ffiNewReq(captureCb, addr rejected))
check res.isErr()
check res.error.contains("stuck")
# Release backpressure so the event thread can drain, advance past
# the backpressure listener, observe the stuck flag, and fire the
# not_responding notification.
acquire(bp.releaseLock)
bp.release.store(true)
signal(bp.releaseCond)
release(bp.releaseLock)
# The whole point of this test is that overflow surfaces the
# not_responding signal — assert it actually fires within a
# bounded window rather than letting the test silently pass.
check waitCallbackTimeout(notif, 2000)