Merge branch 'master' into chore/ci/nph

This commit is contained in:
Gabriel Cruz 2026-06-09 11:38:37 -03:00
commit e0ef097ea6
No known key found for this signature in database
GPG Key ID: 2E467754A6BA9BA5
9 changed files with 746 additions and 670 deletions

View File

@ -2,6 +2,26 @@
All notable changes to this project are documented in this file.
## [Unreleased]
### Changed
- User event callbacks now run on a dedicated event thread fed by a
bounded SPSC queue (default capacity 1024), so a slow listener can no
longer block the FFI thread or concurrent `add_event_listener` /
`remove_event_listener` calls
([#6](https://github.com/logos-messaging/nim-ffi/issues/6)).
- Replaced the dedicated watchdog thread with a heartbeat check that
runs on the event thread. The FFI thread advances an atomic heartbeat
each loop iteration; if it stalls for more than 1s past the start-up
grace window, the event thread emits the `not_responding` event.
### Added
- Queue-overflow handling: when the bounded event queue is full, the
library sets a sticky "stuck" flag, logs an error, fires
`not_responding` from the event thread, and rejects subsequent
`sendRequestToFFIThread` calls with `event queue stuck - library
cannot accept new requests`.
## [0.2.0] - 2026-06-04
Major release introducing the CBOR-based wire format, CBOR-backed FFI events

View File

@ -5,7 +5,7 @@
## so each thread's machinery is readable on its own.
##
## Responsibilities:
## - Drain queued events into listener callbacks (queue producer lands in PR #69).
## - Drain queued events into listener callbacks.
## - Watch `ctx.ffiHeartbeat` and emit `NotRespondingEvent` / `RespondingEvent`
## on FFI-thread stall and recovery transitions.
@ -37,14 +37,13 @@ proc dispatchToListeners[T](
)
proc emitLivenessEvent[T, P](ctx: ptr FFIContext[T], name: string, payload: P) =
## Encodes a zero-field liveness event (`NotRespondingEvent`,
## `RespondingEvent`) and dispatches it directly to listeners, bypassing
## the event queue (which may itself be wedged). Runs on the event thread.
## Encodes a liveness event and dispatches directly to listeners (bypassing
## the queue, which may be wedged). Runs on the event thread.
let event =
try:
EventEnvelope[P](eventType: name, payload: payload).cborEncode()
except CatchableError as exc:
chronicles.error "liveness event encode failed", name = name, err = exc.msg
except CatchableError as e:
chronicles.error "liveness event encode failed", name = name, err = e.msg
return
let dataPtr: pointer =
if event.len > 0:
@ -57,18 +56,14 @@ proc onNotResponding*(ctx: ptr FFIContext) =
emitLivenessEvent(ctx, NotRespondingEventName, NotRespondingEvent())
proc onResponding*(ctx: ptr FFIContext) =
## Fired once when the FFI thread's heartbeat starts advancing again
## after a `NotRespondingEvent`. Lets consumers clear any "library
## hung" UI state without polling.
## Fired once when the heartbeat resumes after a NotRespondingEvent.
## Lets consumers clear any "library hung" UI state without polling.
emitLivenessEvent(ctx, RespondingEventName, RespondingEvent())
proc dispatchQueuedEvent[T](ctx: ptr FFIContext[T], qe: QueuedEvent) =
## Frees `qe`'s c_malloc buffers on exit.
defer:
if not qe.name.isNil():
c_free(cast[pointer](qe.name))
if not qe.data.isNil():
c_free(qe.data)
freeEventBuffers(qe.name, qe.data)
ctx.dispatchToListeners($qe.name, qe.data, qe.dataLen)
proc drainEventQueue[T](ctx: ptr FFIContext[T]) =
@ -94,10 +89,8 @@ proc init(T: type HeartbeatMonitor, ctx: ptr FFIContext): T =
)
proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) =
## Fires `onNotResponding` once the FFI thread's heartbeat counter stops
## advancing past the stale threshold, and fires `onResponding` once it
## starts advancing again. Both transitions latch so each is emitted at
## most once per stall episode.
## Fires onNotResponding / onResponding on heartbeat stall / recovery.
## Both transitions latch — each fires at most once per stall episode.
if Moment.now() - hb.startedAt <= FFIHeartbeatStartDelay:
return
let cur = ctx.ffiHeartbeat.load()
@ -113,14 +106,19 @@ proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) =
proc eventRun[T](ctx: ptr FFIContext[T]) {.async.} =
var hb = HeartbeatMonitor.init(ctx)
var notifiedStuck = false # latched forever — eventQueueStuck is sticky terminal.
while ctx.running.load():
# Wake on enqueue or tick — whichever first. The enqueue path lands in PR #69;
# until then the wait always times out and we fall through to the heartbeat check.
# Wake on enqueue or tick — whichever first.
discard await ctx.eventQueueSignal.wait().withTimeout(EventThreadTickInterval)
ctx.drainEventQueue()
# Fire after drain so reg.lock is free — FFI-thread would deadlock here.
if not notifiedStuck and ctx.eventQueueStuck.load():
onNotResponding(ctx)
notifiedStuck = true
if not ctx.running.load():
break
hb.check(ctx)
@ -135,5 +133,5 @@ proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
try:
waitFor eventRun(ctx)
except CatchableError as exc:
error "event thread exited with exception", error = exc.msg
except CatchableError as e:
error "event thread exited with exception", error = e.msg

View File

@ -6,122 +6,87 @@
{.passc: "-fPIC".}
import system/ansi_c
import std/[atomics, locks, options, tables]
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
import
./ffi_types,
./ffi_events,
./ffi_thread_request,
./internal/ffi_macro,
./logging,
./cbor_serial
import ./ffi_types, ./ffi_events, ./ffi_thread_request, ./logging, ./cbor_serial
export ffi_events
type FFIContext*[T] = object
myLib*: ptr T
# main library object (e.g., Waku, LibP2P, SDS, the one to be exposed as a library)
myLib*: ptr T # main library object (Waku, LibP2P, SDS, …)
ffiThread: Thread[(ptr FFIContext[T])]
# represents the main FFI thread in charge of attending API consumer actions
eventThread: Thread[(ptr FFIContext[T])]
# drains the event queue and runs the FFI-thread heartbeat check
lock: Lock
reqChannel: ChannelSPSCSingle[ptr FFIThreadRequest]
reqSignal: ThreadSignalPtr # to notify the FFI Thread that a new request is sent
reqSignal: ThreadSignalPtr
reqReceivedSignal: ThreadSignalPtr
# to signal main thread, interfacing with the FFI thread, that FFI thread received the request
stopSignal: ThreadSignalPtr
threadExitSignal: ThreadSignalPtr
# bounds destroyFFIContext's wait so a blocked loop cannot hang the caller
eventQueueSignal: ThreadSignalPtr
# wakes the event thread on enqueue (used once dispatch is rewired in PR #69)
eventQueueSignal: ThreadSignalPtr # wakes the event thread on enqueue
eventThreadExitSignal: ThreadSignalPtr # mirrors threadExitSignal for the event thread
userData*: pointer
eventRegistry*: FFIEventRegistry
eventQueue*: EventQueue
ffiHeartbeat*: Atomic[int64]
# advanced each FFI-thread loop; event thread reads for liveness
eventQueueStuck*: Atomic[bool] # sticky overflow flag
running: Atomic[bool] # To control when the threads are running
registeredRequests: ptr Table[cstring, FFIRequestProc]
# Pointer to with the registered requests at compile time
var onFFIThread* {.threadvar.}: bool
## True while executing inside `ffiThreadBody`. Used by
## `sendRequestToFFIThread` to detect re-entrant dispatch from a handler
## (which would self-deadlock on `reqReceivedSignal`).
# Re-entrant dispatch guard for `sendRequestToFFIThread`.
const git_version* {.strdefine.} = "n/a"
const
EventThreadTickInterval* = 1.seconds # bounds idle heartbeat check latency
EventThreadTickInterval* = 1.seconds
FFIHeartbeatStartDelay* = 10.seconds # grace window for library startup
FFIHeartbeatStaleThreshold* = 1.seconds
include ./event_thread
include ./ffi_thread
template closeAndNil(field: untyped) =
if not field.isNil():
?field.close()
field = nil
proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Mirror of `initContextResources`: tears down lock, registry, queue,
## and signal fds in place. Threads MUST already be joined. Caller owns
## the memory holding `ctx`. Fields are nil'd after close so a re-init
## on the same slot doesn't double-close.
## Mirror of `initContextResources`. Threads MUST be joined first;
## fields are nil'd after close so re-init on the same slot is safe.
ctx.lock.deinitLock()
deinitEventRegistry(ctx[].eventRegistry)
deinitEventQueue(ctx[].eventQueue)
when defined(gcRefc):
## ThreadSignalPtr.close() is intentionally skipped under --mm:refc.
##
## close() goes through chronos's safeUnregisterAndCloseFd, which calls
## getThreadDispatcher() and lazily allocates a new Selector for the
## main thread. With refc and a heavy ref-object graph torn down by the
## FFI thread (libwaku/libp2p), that allocation traps inside rawNewObj
## and the refc signal handler re-enters the same allocator — the
## process never returns. Captured stack from a hung process:
## close → safeUnregisterAndCloseFd → getThreadDispatcher →
## newDispatcher → Selector.new → newObj (gc.nim:488) →
## rawNewObj (gc.nim:470) → rawNewObj → _sigtramp → signalHandler →
## newObjNoInit → addNewObjToZCT (infinite re-entry)
##
## --mm:orc does NOT exhibit this bug; see the
## "destroyFFIContext refc workaround" suite in tests/test_ffi_context.nim
## (test "destroy after heavy ref-allocation workload returns promptly").
## The signal fds (a few per ctx) are reclaimed by the OS at process
## exit; destroyFFIContext is called once per process lifetime, so the
## leak is bounded.
# ThreadSignalPtr.close() under refc traps in safeUnregisterAndCloseFd
# → newDispatcher → rawNewObj → signal-handler re-entry (process hangs).
# See tests/test_ffi_context.nim "destroyFFIContext refc workaround".
# Fd leak is bounded — destroy runs once per process lifetime.
discard
else:
if not ctx.reqSignal.isNil():
?ctx.reqSignal.close()
ctx.reqSignal = nil
if not ctx.reqReceivedSignal.isNil():
?ctx.reqReceivedSignal.close()
ctx.reqReceivedSignal = nil
if not ctx.stopSignal.isNil():
?ctx.stopSignal.close()
ctx.stopSignal = nil
if not ctx.threadExitSignal.isNil():
?ctx.threadExitSignal.close()
ctx.threadExitSignal = nil
if not ctx.eventQueueSignal.isNil():
?ctx.eventQueueSignal.close()
ctx.eventQueueSignal = nil
if not ctx.eventThreadExitSignal.isNil():
?ctx.eventThreadExitSignal.close()
ctx.eventThreadExitSignal = nil
return ok()
closeAndNil(ctx.reqSignal)
closeAndNil(ctx.reqReceivedSignal)
closeAndNil(ctx.stopSignal)
closeAndNil(ctx.threadExitSignal)
closeAndNil(ctx.eventQueueSignal)
closeAndNil(ctx.eventThreadExitSignal)
ok()
proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Full cleanup for heap-allocated contexts: closes all resources and frees memory.
## Deinit + free for heap-allocated contexts.
defer:
freeShared(ctx)
ctx.deinitContextResources()
template newSignalOrErr(field: untyped, name: string) =
field = ThreadSignalPtr.new().valueOr:
return err("couldn't create ThreadSignalPtr: " & name & ": " & $error)
proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Initialises all resources inside an already-allocated FFIContext slot.
## On failure every partially-initialised resource is closed; the caller
## is responsible for releasing the slot (freeShared or pool.releaseSlot).
# Defensive nil: deferred cleanup must never double-close stale pointers on a reused pool slot.
## On failure, the deferred cleanup closes partial state; caller releases
## the slot (freeShared or pool.releaseSlot).
# Nil first so deferred cleanup can't double-close a reused pool slot.
ctx.reqSignal = nil
ctx.reqReceivedSignal = nil
ctx.stopSignal = nil
@ -132,6 +97,7 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
initEventRegistry(ctx[].eventRegistry)
initEventQueue(ctx[].eventQueue)
ctx.ffiHeartbeat.store(0)
ctx.eventQueueStuck.store(false)
var success = false
defer:
@ -140,23 +106,12 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
error "failed to clean up resources after createFFIContext failure",
error = error
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqSignal ThreadSignalPtr: " & $error)
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqReceivedSignal ThreadSignalPtr: " & $error)
ctx.stopSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create stopSignal ThreadSignalPtr: " & $error)
ctx.threadExitSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create threadExitSignal ThreadSignalPtr: " & $error)
ctx.eventQueueSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create eventQueueSignal ThreadSignalPtr: " & $error)
ctx.eventThreadExitSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create eventThreadExitSignal ThreadSignalPtr: " & $error)
newSignalOrErr(ctx.reqSignal, "reqSignal")
newSignalOrErr(ctx.reqReceivedSignal, "reqReceivedSignal")
newSignalOrErr(ctx.stopSignal, "stopSignal")
newSignalOrErr(ctx.threadExitSignal, "threadExitSignal")
newSignalOrErr(ctx.eventQueueSignal, "eventQueueSignal")
newSignalOrErr(ctx.eventThreadExitSignal, "eventThreadExitSignal")
ctx.registeredRequests = addr ffi_types.registeredRequests
@ -170,8 +125,7 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
try:
createThread(ctx.eventThread, eventThreadBody[T], ctx)
except ValueError, ResourceExhaustedError:
## ffiThread is already running; signal it to exit and join before the
## deferred cleanUpResources closes the signals it's waiting on.
# Join ffiThread before deferred cleanup closes signals it's waiting on.
ctx.running.store(false)
let fireRes = ctx.reqSignal.fireSync()
if fireRes.isErr():
@ -181,67 +135,55 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
return err("failed to create the event thread: " & getCurrentExceptionMsg())
success = true
return ok()
ok()
proc fireOrErr(sig: ThreadSignalPtr, name: string): Result[void, string] =
let fired = sig.fireSync().valueOr:
return err("error signaling: " & name & ": " & $error)
if not fired:
return err("failed to signal: " & name & " on time")
ok()
proc waitExitOrErr(
sig: ThreadSignalPtr, name: string, timeout: Duration
): Result[void, string] =
let exited = sig.waitSync(timeout).valueOr:
return err("error waiting for exit: " & name & ": " & $error)
if not exited:
return err("did not exit in time: " & name & " (leaking ctx to avoid hang)")
ok()
proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] =
# Error paths intentionally skip onNotResponding: a back-pressuring
# listener may hold reg.lock, and onNotResponding takes it — would
# amplify the stuck state into a deadlock instead of escaping it.
# Skip onNotResponding on error: it takes reg.lock, which a back-pressuring
# listener may hold — would deepen the stuck state into a deadlock.
ctx.running.store(false)
let reqSignaled = ctx.reqSignal.fireSync().valueOr:
return err("error signaling reqSignal in signalStop: " & $error)
if not reqSignaled:
return err("failed to signal reqSignal on time in signalStop")
let stopSignaled = ctx.stopSignal.fireSync().valueOr:
return err("error signaling stopSignal in signalStop: " & $error)
if not stopSignaled:
return err("failed to signal stopSignal on time in signalStop")
# Non-fatal: event thread will see running==false on the next tick.
let evtSignaled = ctx.eventQueueSignal.fireSync()
if evtSignaled.isErr():
error "failed to signal eventQueueSignal in signalStop", error = evtSignaled.error
elif evtSignaled.get() == false:
error "failed to signal eventQueueSignal on time in signalStop"
return ok()
?ctx.reqSignal.fireOrErr("reqSignal")
?ctx.stopSignal.fireOrErr("stopSignal")
# Non-fatal: event thread sees running==false on the next tick anyway.
ctx.eventQueueSignal.fireOrErr("eventQueueSignal").isOkOr:
error "failed to signal eventQueueSignal in signalStop", error = error
ok()
## If the FFI thread's event loop is blocked by a synchronous handler
## (e.g. blocking I/O), it cannot process reqSignal in time to exit.
## clearContext waits on threadExitSignal up to this bound; on timeout it
## returns err and skips joinThread/cleanup (leaking the thread + ctx slot)
## rather than hanging the caller forever.
## Bound on how long clearContext waits for the FFI thread to exit before
## leaking ctx rather than hanging the caller.
const ThreadExitTimeout* = 1500.milliseconds
proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Signals both threads to stop, waits up to ThreadExitTimeout per thread,
## and joins them. On timeout returns err and skips remaining joins
## (leaving the threads live) rather than hanging the caller. Resource
## cleanup is the caller's responsibility.
##
## Timeout paths skip onNotResponding for the same reason signalStop does.
## On timeout, returns err and skips remaining joins (leaves threads live).
## Caller owns resource cleanup. Skips onNotResponding (same reason as signalStop).
ctx.signalStop().isOkOr:
return err("signalStop failed: " & $error)
let ffiExitedOnTime = ctx.threadExitSignal.waitSync(ThreadExitTimeout).valueOr:
return err("error waiting for FFI thread exit: " & $error)
if not ffiExitedOnTime:
return err("FFI thread did not exit in time; leaking ctx to avoid hang")
?ctx.threadExitSignal.waitExitOrErr("FFI thread", ThreadExitTimeout)
joinThread(ctx.ffiThread)
let evtExitedOnTime = ctx.eventThreadExitSignal.waitSync(ThreadExitTimeout).valueOr:
return err("error waiting for event thread exit: " & $error)
if not evtExitedOnTime:
return err("event thread did not exit in time; leaking ctx to avoid hang")
?ctx.eventThreadExitSignal.waitExitOrErr("event thread", ThreadExitTimeout)
joinThread(ctx.eventThread)
return ok()
ok()
proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Stops the FFI context that was created via createFFIContext[T]() (heap).
## Stops a heap-allocated FFI context.
ctx.stopAndJoinThreads().isOkOr:
return err("clearContext: " & $error)
ctx.cleanUpResources().isOkOr:
return err("cleanUpResources failed: " & $error)
return ok()
ok()

View File

@ -3,14 +3,10 @@ import results
import ./ffi_context
const MaxFFIContexts* = 32
## Maximum number of concurrently live FFI contexts when using FFIContextPool.
## Fds and threads are only consumed for slots that are actually acquired,
## so this value only affects the upfront memory of the pool array.
# Only affects upfront pool memory; fds/threads consumed per acquired slot.
type FFIContextPool*[T] = object
## Fixed-size pool of FFI contexts. Avoids dynamic heap allocation per context
## and bounds the total number of file descriptors consumed by ThreadSignalPtrs
## to at most MaxFFIContexts * 2.
## Fixed pool. Bounds ThreadSignalPtr fds at MaxFFIContexts * 2.
slots: array[MaxFFIContexts, FFIContext[T]]
inUse: array[MaxFFIContexts, Atomic[bool]]
@ -19,7 +15,7 @@ proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], stri
var expected = false
if pool.inUse[i].compareExchange(expected, true):
return ok(pool.slots[i].addr)
return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)")
err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)")
proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) =
for i in 0 ..< MaxFFIContexts:
@ -30,39 +26,31 @@ proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) =
proc createFFIContext*[T](
pool: var FFIContextPool[T]
): Result[ptr FFIContext[T], string] =
## Acquires a slot from the fixed pool and initialises it as an FFI context.
## Bounded fd usage: at most MaxFFIContexts * 2 ThreadSignalPtr fds are ever open.
let ctx = pool.acquireSlot().valueOr:
return err("createFFIContext: acquireSlot failed: " & $error)
initContextResources(ctx).isOkOr:
pool.releaseSlot(ctx)
return err("createFFIContext: initContextResources failed: " & $error)
return ok(ctx)
ok(ctx)
proc destroyFFIContext*[T](
pool: var FFIContextPool[T], ctx: ptr FFIContext[T]
): Result[void, string] =
## Stops the FFI context and returns its slot to the pool. If the FFI thread
## is blocked and does not exit in time, the slot is leaked rather than
## reclaimed — closing its resources while the thread is still live would be
## unsafe.
## On thread-exit timeout the slot is leaked — closing live-thread resources is unsafe.
ctx.stopAndJoinThreads().isOkOr:
return err("destroyFFIContext(pool): " & $error)
# Without this, the next acquisition would re-init an already-initialised
# lock (UB) and leak the previous signal fds.
# Required: next acquisition would otherwise re-init a live lock (UB).
let deinitRes = ctx.deinitContextResources()
pool.releaseSlot(ctx)
deinitRes.isOkOr:
return err("destroyFFIContext(pool): " & $error)
return ok()
ok()
proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool =
## Returns true only if ctx points to one of the pool's slots that is
## currently in use. Rejects nil, offset-invalid, and dangling pointers
## at the API boundary, preventing use-after-free dereferences.
## Rejects nil / offset-invalid / dangling pointers at the API boundary.
if ctx.isNil():
return false
for i in 0 ..< MaxFFIContexts:
if cast[pointer](pool.slots[i].addr) == ctx:
return pool.inUse[i].load()
return false
false

View File

@ -1,19 +1,15 @@
## Event registry, bounded SPSC event queue, and dispatch templates for
## FFI library-initiated events. Listeners receive only the event name
## they subscribed to. Queue payloads travel via `c_malloc` so transfer
## across Nim heaps is safe under both `--mm:orc` and `--mm:refc`.
## Per-context event registry + bounded SPSC queue. FFI thread enqueues,
## event thread drains; payloads travel via `c_malloc` so they survive
## pool-slot reuse across thread heaps.
{.pragma: callback, cdecl, raises: [], gcsafe.}
import system/ansi_c
import std/[locks, sequtils, options, tables]
import std/[atomics, locks, sequtils, options, tables]
import chronicles
import ./ffi_types, ./cbor_serial
import ./ffi_types, ./cbor_serial, ./alloc
type EventEnvelope*[T] = object
## Standard wire shape for CBOR-encoded FFI events:
## { eventType: tstr, payload: <T> }
## Pair with `dispatchFFIEventCbor` (or call `cborEncode` directly).
type EventEnvelope*[T] = object ## CBOR wire shape: { eventType: tstr, payload: <T> }.
eventType*: string
payload*: T
@ -24,32 +20,20 @@ type
userData*: pointer
FFIEventRegistry* = object
## Per-context multi-listener registry. `lock` guards every mutation;
## readers (dispatch path) acquire it only long enough to copy out the
## listener slice for the event being dispatched.
lock*: Lock
nextId*: uint64 ## Monotonic id source. 0 is reserved as "invalid"; ids start at 1.
nextId*: uint64 # 0 is reserved as "invalid"; ids start at 1.
byEvent*: Table[string, seq[FFIEventListener]]
proc initEventRegistry*(reg: var FFIEventRegistry) =
## Must be called exactly once on the owning thread before the registry
## is shared. The embedded `Lock` wraps a platform primitive that cannot
## be safely double-initialised, so concurrent callers would hit UB at
## the OS layer — the lock itself can't defend against its own init.
## Must run once on the owning thread before sharing — `initLock` on a
## live primitive is UB at the OS layer.
reg.lock.initLock()
reg.nextId = 0'u64
reg.byEvent = initTable[string, seq[FFIEventListener]]()
proc deinitEventRegistry*(reg: var FFIEventRegistry) =
## Mirror of `initEventRegistry`: must be called exactly once, by the
## same thread that owns the registry, after all other threads have
## stopped using it. `deinitLock` on a platform primitive that any
## thread might still be holding or about to acquire is UB at the OS
## layer.
##
## Resets the GC-managed fields to default so `FFIContextPool`'s
## slot reuse on a *different* thread doesn't trigger Nim's hidden
## assignment destructor against this thread's heap allocations.
## Mirror of `initEventRegistry`; same single-thread constraint. Resets GC
## fields so pool-slot reuse on another thread sees no hidden dtor.
reg.lock.deinitLock()
reg.byEvent = default(Table[string, seq[FFIEventListener]])
reg.nextId = 0'u64
@ -60,11 +44,7 @@ proc addEventListener*(
callback: FFICallBack,
userData: pointer,
): uint64 {.raises: [].} =
## Registers `callback` for `eventName` and returns the listener's stable
## id (always non-zero on success). A listener only receives events
## dispatched under its own `eventName` — subscribe to each event
## separately. Returns 0 if `callback` is nil — the only documented
## failure mode.
## Returns the listener id (>0), or 0 if `callback` is nil.
if callback.isNil():
return 0
@ -79,10 +59,8 @@ proc addEventListener*(
assigned
proc removeEventListener*(reg: var FFIEventRegistry, id: uint64): bool {.raises: [].} =
## Removes the listener with `id`. Returns true on success, false if no
## listener with that id exists. Safe to call from inside a dispatch:
## the in-flight snapshot still delivers exactly once to the listener
## being removed.
## Safe to call from inside a dispatch — the in-flight snapshot still
## delivers exactly once to the removed listener.
if id == 0'u64:
return false
@ -106,41 +84,33 @@ proc removeEventListener*(reg: var FFIEventRegistry, id: uint64): bool {.raises:
removed
proc removeAllEventListeners*(reg: var FFIEventRegistry) {.raises: [].} =
## Drops every registered listener. Does not reset the listener-id
## counter — subsequent `addEventListener` calls still return strictly
## increasing ids.
## Does not reset the id counter.
withLock reg.lock:
reg.byEvent.clear()
proc snapshotListeners*(
reg: var FFIEventRegistry, eventName: string
): seq[FFIEventListener] {.raises: [].} =
## Returns a copy of the listener slice for `eventName`. The copy is what
## makes re-entrant add/remove from inside a handler deadlock-free:
## dispatch holds the lock only for the duration of the copy, then
## iterates the copy outside the lock.
## Lock held only across the copy — keeps re-entrant add/remove
## from a handler deadlock-free.
var listeners: seq[FFIEventListener] = @[]
withLock reg.lock:
# `getOrDefault` avoids the raising `[]` path; returns empty when absent.
for l in reg.byEvent.getOrDefault(eventName):
listeners.add(l)
listeners
const EventQueueCapacity* = 1024
## ~24 KiB per context. Sustained backlog at this depth means a
## listener is wedged — what the stuck flag exists to surface.
# Sustained backlog at this depth means a listener is wedged.
type
QueuedEvent* = object
## All fields are raw `c_malloc` pointers so the buffer survives
## pool-slot reuse across thread heaps without an assignment dtor.
# Raw `c_malloc` pointers so the buffer survives pool-slot reuse
# across thread heaps without an assignment dtor.
name*: cstring
data*: ptr UncheckedArray[byte]
dataLen*: int
EventQueue* = object
## SPSC ring: FFI thread enqueues, event thread dequeues. Plain lock
## (no atomic indices) — operations are short and uncontended.
EventQueue* = object # SPSC ring; plain lock since ops are short and uncontended.
lock*: Lock
head*: int
tail*: int
@ -148,7 +118,6 @@ type
buf*: array[EventQueueCapacity, QueuedEvent]
proc initEventQueue*(q: var EventQueue) {.raises: [].} =
## Same single-owning-thread constraint as `initEventRegistry`.
q.lock.initLock()
q.head = 0
q.tail = 0
@ -156,14 +125,18 @@ proc initEventQueue*(q: var EventQueue) {.raises: [].} =
for i in 0 ..< EventQueueCapacity:
q.buf[i] = QueuedEvent(name: nil, data: nil, dataLen: 0)
proc freeEventBuffers*(
name: cstring, data: ptr UncheckedArray[byte]
) {.raises: [], gcsafe.} =
if not name.isNil():
c_free(cast[pointer](name))
if not data.isNil():
c_free(data)
proc deinitEventQueue*(q: var EventQueue) {.raises: [].} =
## Both producer and consumer must have stopped before calling.
## Both producer and consumer must have stopped.
for i in 0 ..< EventQueueCapacity:
let e = q.buf[i]
if not e.name.isNil:
c_free(cast[pointer](e.name))
if not e.data.isNil:
c_free(e.data)
freeEventBuffers(q.buf[i].name, q.buf[i].data)
q.buf[i] = QueuedEvent(name: nil, data: nil, dataLen: 0)
q.head = 0
q.tail = 0
@ -173,8 +146,7 @@ proc deinitEventQueue*(q: var EventQueue) {.raises: [].} =
proc tryEnqueueEvent*(
q: var EventQueue, name: cstring, data: ptr UncheckedArray[byte], dataLen: int
): bool {.raises: [], gcsafe.} =
## Both `name` and `data` must be `c_malloc`'d; on success the queue
## takes ownership. On false the caller still owns and must free them.
## On true the queue owns `name`/`data`; on false the caller still does.
withLock q.lock:
if q.count >= EventQueueCapacity:
return false
@ -184,31 +156,29 @@ proc tryEnqueueEvent*(
true
proc tryDequeueEvent*(q: var EventQueue): Option[QueuedEvent] {.raises: [], gcsafe.} =
## Transfers buffer ownership to the caller, who must `c_free` both.
## Caller takes ownership and must `c_free` both buffers.
withLock q.lock:
if q.count == 0:
return none(QueuedEvent)
let e = q.buf[q.head]
let dequeued = q.buf[q.head]
q.buf[q.head] = QueuedEvent(name: nil, data: nil, dataLen: 0)
q.head = (q.head + 1) mod EventQueueCapacity
q.count.dec()
return some(e)
return some(dequeued)
proc eventQueueLen*(q: var EventQueue): int {.raises: [], gcsafe.} =
withLock q.lock:
return q.count
const emptyListenerPayload*: cstring = ""
## Non-nil zero-length buffer handed to listeners when a payload is
## empty, so a consumer doing `std::string(data, len)` / `memcpy` never
## receives a nil pointer (which is UB even at len 0).
## Non-nil zero-length buffer handed to listeners when the payload is empty
## (a nil pointer would be UB for consumers doing `memcpy` even at len 0).
proc notifyListeners*(
listeners: seq[FFIEventListener], retCode: cint, data: pointer, dataLen: int
) =
## Fans out a payload to every listener in the snapshot. Empty payloads
## are delivered as the non-nil `emptyListenerPayload` sentinel so a
## consumer doing `std::string(data, len)` / `memcpy` never receives nil.
## Empty payloads go through `emptyListenerPayload` so consumers doing
## `std::string(data, len)` / `memcpy` never see a nil pointer.
let n = max(dataLen, 0)
let dataPtr =
if n > 0 and not data.isNil():
@ -219,8 +189,6 @@ proc notifyListeners*(
listener.callback(retCode, dataPtr, cast[csize_t](n), listener.userData)
proc notifyListenersErr*(listeners: seq[FFIEventListener], msg: string) =
## Error fan-out: adapts the message string to `notifyListeners`, which
## supplies the non-nil pointer for the empty-message case.
let p =
if msg.len > 0:
cast[pointer](unsafeAddr msg[0])
@ -229,63 +197,62 @@ proc notifyListenersErr*(listeners: seq[FFIEventListener], msg: string) =
notifyListeners(listeners, RET_ERR, p, msg.len)
var ffiCurrentEventRegistry* {.threadvar.}: ptr FFIEventRegistry
## Set by the FFI thread at startup so dispatchFFIEvent / dispatchFFIEventCbor
## can find their registry without taking a context pointer per call site.
# Kept for tests that drive the registry directly.
template withFFIEventDispatch(eventName: string, listeners, body: untyped) =
## Shared scaffold for `dispatchFFIEvent` / `dispatchFFIEventCbor`:
## resolves the thread-local registry, snapshots listeners under
## `reg.lock` into the caller-named `listeners` binding, then runs
## `body` inside `foreignThreadGc` + try/except.
let regPtr = ffiCurrentEventRegistry
if regPtr.isNil():
chronicles.error eventName & " - event registry not set on this thread"
return
var ffiCurrentEventQueue* {.threadvar.}: ptr EventQueue
# Installed by the FFI thread so dispatch templates need no `ctx`.
withLock regPtr[].lock:
let listeners = regPtr[].byEvent.getOrDefault(eventName)
if listeners.len == 0:
chronicles.debug eventName & " - no listener registered"
else:
foreignThreadGc:
try:
body
except Exception, CatchableError:
notifyListenersErr(
listeners,
"Exception dispatching " & eventName & ": " & getCurrentExceptionMsg(),
)
var ffiCurrentEventQueueStuck* {.threadvar.}: ptr Atomic[bool]
# Sticky overflow flag; FFI request entry point reads it to reject.
template dispatchFFIEvent*(eventName: string, body: untyped) =
## Dispatches an FFI event to every listener subscribed to `eventName`.
## `body` must yield a `string` or `seq[byte]`.
##
## Valid only on the FFI thread (where `ffiCurrentEventRegistry` is set).
## Holds `reg.lock` for the entire snapshot + invocation so a concurrent
## `removeEventListener` from a foreign thread blocks until dispatch
## returns. Handlers must not call addEventListener / removeEventListener
## on the same registry (would self-deadlock).
withFFIEventDispatch(eventName, listeners):
let event = body
let dataPtr: pointer =
if event.len > 0:
cast[pointer](unsafeAddr event[0])
else:
cast[pointer](emptyListenerPayload)
notifyListeners(listeners, RET_OK, dataPtr, event.len)
var ffiCurrentNotifyEventEnqueued* {.threadvar.}: proc() {.gcsafe, raises: [].}
# Hook so this module doesn't depend on chronos's ThreadSignalPtr.
# Nil-safe; tick-driven tests leave it unset.
template enqueueOrMarkStuck(
eventName: string, namePtr: cstring, dataPtr: ptr UncheckedArray[byte], dataLen: int
) =
## Takes ownership of `namePtr`/`dataPtr`. On queue-full sets the sticky
## stuck flag and wakes the event thread (firing onNotResponding from here
## would risk deadlock against a back-pressuring listener).
block enqueueBlock:
let q = ffiCurrentEventQueue
if q.isNil():
chronicles.error "event queue not set on this thread", event = eventName
freeEventBuffers(namePtr, dataPtr)
break enqueueBlock
if not q[].tryEnqueueEvent(namePtr, dataPtr, dataLen):
chronicles.error "event queue full; library marked stuck",
event = eventName, capacity = EventQueueCapacity
freeEventBuffers(namePtr, dataPtr)
if not ffiCurrentEventQueueStuck.isNil():
ffiCurrentEventQueueStuck[].store(true)
if not ffiCurrentNotifyEventEnqueued.isNil():
ffiCurrentNotifyEventEnqueued()
break enqueueBlock
if not ffiCurrentNotifyEventEnqueued.isNil():
ffiCurrentNotifyEventEnqueued()
## `body` must yield `string` / `seq[byte]`. FFI thread only: encodes into
## a `c_malloc` buffer and enqueues; the event thread fans out to listeners.
block:
let evtName: string = eventName
let bodyVal = body
var dataPtr: ptr UncheckedArray[byte] = nil
let dataLen = bodyVal.len
if dataLen > 0:
dataPtr = cast[ptr UncheckedArray[byte]](c_malloc(csize_t(dataLen)))
copyMem(dataPtr, unsafeAddr bodyVal[0], dataLen)
let namePtr = alloc(evtName)
enqueueOrMarkStuck(evtName, namePtr, dataPtr, dataLen)
template dispatchFFIEventCbor*(eventName: string, eventPayload: typed) =
## Typed CBOR variant of `dispatchFFIEvent`. Wraps `eventPayload` in an
## `EventEnvelope`, CBOR-encodes it into a `c_malloc` buffer once, and
## fans the same buffer out to every registered listener.
##
## NB: parameter is `eventPayload`, not `payload` — Nim's template
## substitution would otherwise also rewrite the `payload:` field inside
## `EventEnvelope`.
withFFIEventDispatch(eventName, listeners):
var (data, dataLen) = cborEncodeShared(
EventEnvelope[typeof(eventPayload)](eventType: eventName, payload: eventPayload)
## Typed CBOR variant of `dispatchFFIEvent`. The param is `eventPayload`
## (not `payload`) to avoid clobbering `EventEnvelope.payload` substitution.
block:
let evtName: string = eventName
var (dataPtr, dataLen) = cborEncodeShared(
EventEnvelope[typeof(eventPayload)](eventType: evtName, payload: eventPayload)
)
defer:
cborFreeShared(data)
notifyListeners(listeners, RET_OK, data, dataLen)
let namePtr = alloc(evtName)
enqueueOrMarkStuck(evtName, namePtr, dataPtr, dataLen)

View File

@ -12,22 +12,22 @@
proc sendRequestToFFIThread*(
ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration
): Result[void, string] =
# Reentrancy guard: only this thread can fire `reqReceivedSignal`, so a handler dispatching back would self-deadlock.
if ctx.eventQueueStuck.load():
deleteRequest(ffiRequest)
return err("event queue stuck - library cannot accept new requests")
if onFFIThread:
# Re-entrant dispatch from a handler would self-deadlock on `reqReceivedSignal`.
deleteRequest(ffiRequest)
return err(
"reentrant ffi call: a handler invoked sendRequestToFFIThread on its own context"
)
# All async submissions serialise on `ctx.lock` for the full
# trySend + fireSync + waitSync sequence because `reqChannel` is
# single-producer and `reqReceivedSignal` is shared across callers.
# Multi-producer redesign is tracked as PR #23 review item 7.
# Serialise the trySend + fireSync + waitSync — reqChannel is SP and reqReceivedSignal is shared.
ctx.lock.acquire()
defer:
ctx.lock.release()
## Sending the request
let sentOk = ctx.reqChannel.trySend(ffiRequest)
if not sentOk:
deleteRequest(ffiRequest)
@ -42,15 +42,12 @@ proc sendRequestToFFIThread*(
deleteRequest(ffiRequest)
return err("Couldn't fireSync in time")
## wait until the FFI working thread properly received the request
let res = ctx.reqReceivedSignal.waitSync(timeout)
if res.isErr():
## Do not free ffiRequest here: the FFI thread was already signaled and
## will process (and free) it.
# FFI thread was signaled and owns the request; don't double-free.
return err("Couldn't receive reqReceivedSignal signal")
## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the
## process proc.
# On ok the FFI thread's processRequest deallocShared(req)'s.
ok()
proc processRequest[T](
@ -59,44 +56,44 @@ proc processRequest[T](
## Invoked within the FFI thread to process a request coming from the FFI API consumer thread.
let reqId = $request[].reqId
## The reqId determines which proc will handle the request.
## The registeredRequests represents a table defined at compile time.
## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously]
## Explicit conversion keeps `reqId` alive as the backing string,
## avoiding the implicit string→cstring warning that will become an error.
let reqIdCs = reqId.cstring
let reqIdCs = reqId.cstring # keeps reqId alive; implicit string→cstring is a warning.
let retFut =
if not ctx[].registeredRequests[].contains(reqIdCs):
## That shouldn't happen because only registered requests should be sent to the FFI thread.
nilProcess(request[].reqId)
else:
ctx[].registeredRequests[][reqIdCs](cast[pointer](request), ctx)
## Catch every catchable exception (including CancelledError raised by
## the shutdown drain in ffiRun) so handleRes — and its `deleteRequest`
## defer — always runs. Otherwise an abandoned in-flight handler would
## leak its request envelope, reqId copy, and CBOR payload.
# CatchableError covers CancelledError from the shutdown drain; handleRes must still run.
let res =
try:
await retFut
except CatchableError as exc:
except CatchableError as e:
Result[seq[byte], string].err(
"Error in processRequest for " & reqId & ": " & exc.msg
"Error in processRequest for " & reqId & ": " & e.msg
)
## handleRes may raise (OOM, GC setup) even though it is rare. Catching here
## keeps the async proc raises:[] compatible. The defer inside handleRes
## guarantees request is freed before the exception propagates.
try:
handleRes(res, request)
except Exception as exc:
error "Unexpected exception in handleRes", error = exc.msg
except Exception as e:
error "Unexpected exception in handleRes", error = e.msg
var ffiEventQueueSignalPtr {.threadvar.}: ThreadSignalPtr
# Stashed so the hook has no closure env.
proc ffiNotifyEventEnqueuedHook() {.gcsafe, raises: [].} =
if not ffiEventQueueSignalPtr.isNil():
let res = ffiEventQueueSignalPtr.fireSync()
if res.isErr():
error "failed to fire eventQueueSignal after enqueue", err = res.error
proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
## FFI thread body that attends library user API requests
ffiCurrentEventRegistry = addr ctx[].eventRegistry
ffiCurrentEventQueue = addr ctx[].eventQueue
ffiCurrentEventQueueStuck = addr ctx[].eventQueueStuck
ffiEventQueueSignalPtr = ctx.eventQueueSignal
ffiCurrentNotifyEventEnqueued = ffiNotifyEventEnqueuedHook
onFFIThread = true
logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
@ -109,23 +106,18 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error
let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} =
var ffiReqHandler: T
## Holds the main library object, i.e., in charge of handling the ffi requests.
## e.g., Waku, LibP2P, SDS, etc.
var ffiReqHandler: T # main library object (Waku, LibP2P, SDS, …)
## In-flight processRequest futures. Tracked so they can be drained on
## shutdown — otherwise destroying the context while a handler is
## awaiting (e.g. sleepAsync) abandons the future and leaks the
## request's envelope/reqId/payload allocations.
# Tracked so shutdown can drain them; abandoning a mid-await future leaks the request.
var pending: seq[Future[void]] = @[]
proc reapCompleted() =
var i = 0
while i < pending.len:
if pending[i].finished():
pending.del(i)
else:
if not pending[i].finished():
inc i
continue
pending.del(i)
while ctx.running.load():
# Freezes if a sync handler blocks the dispatcher; event thread reads to detect wedged FFI thread.
@ -137,7 +129,6 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
if not gotSignal:
continue
## Wait for a request from the ffi consumer thread
var request: ptr FFIThreadRequest
if not ctx.reqChannel.tryRecv(request):
continue
@ -145,22 +136,18 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
if ctx.myLib.isNil():
ctx.myLib = addr ffiReqHandler
## Handle the request
pending.add processRequest(request, ctx)
let fireRes = ctx.reqReceivedSignal.fireSync()
if fireRes.isErr():
error "could not fireSync back to requester thread", error = fireRes.error
## Drain in-flight handlers so each request's `deleteRequest` runs
## before we exit. Without this, abandoning a future mid-await would
## leak the request allocations (visible to LSan; previously hidden
## because Nim's pool allocator kept the chunks alive in the process).
# Drain so each pending handler's deleteRequest defer runs before exit.
reapCompleted()
if pending.len > 0:
try:
await allFutures(pending)
except CatchableError as exc:
error "draining pending FFI requests on shutdown raised", error = exc.msg
except CatchableError as e:
error "draining pending FFI requests on shutdown raised", error = e.msg
waitFor ffiRun(ctx)

View File

@ -1,11 +1,5 @@
## Tests for the CBOR-style FFI event dispatch path:
## - `dispatchFFIEvent` accepts both `string` and `seq[byte]` bodies
## - `dispatchFFIEventCbor` wraps a typed payload in `EventEnvelope[T]`,
## CBOR-encodes it, and dispatches via the event callback
##
## Tests run end-to-end against a real FFI thread (via FFIContextPool +
## sendRequestToFFIThread) so we exercise the threadvar-backed
## ffiCurrentEventRegistry wiring, not just the templates in isolation.
## End-to-end tests for `dispatchFFIEvent` / `dispatchFFIEventCbor`,
## driven through a real `FFIContext` so the threadvar wiring is exercised.
import std/[locks, os]
import unittest2
@ -14,16 +8,10 @@ import ffi
type TestEvtLib = object
## Event payload type (would be `{.ffi.}` in production so the codec gen
## emits a matching struct on the foreign side; the test only needs CBOR
## round-trip, which `cborEncode`/`cborDecode` provide via cbor_serial's
## generic overloads).
type MessageSentBody* {.ffi.} = object
requestId*: string
messageHash*: string
## Same callback-state helper as test_ffi_context.nim, duplicated here so
## this file stays a self-contained test binary.
type CallbackData = object
lock: Lock
cond: Cond
@ -40,6 +28,13 @@ proc deinitCallbackData(d: var CallbackData) =
d.cond.deinitCond()
d.lock.deinitLock()
template setupCallbackData(name: untyped) =
## Declares `name`, inits it, and defers its deinit in the caller's scope.
var name: CallbackData
initCallbackData(name)
defer:
deinitCallbackData(name)
proc captureCb(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
@ -60,15 +55,27 @@ proc waitCallback(d: var CallbackData) =
wait(d.cond, d.lock)
release(d.lock)
proc resetCalled(d: var CallbackData) =
acquire(d.lock)
d.called = false
release(d.lock)
proc callbackBytes(d: var CallbackData): seq[byte] =
var bytes = newSeq[byte](d.msgLen)
if d.msgLen > 0:
copyMem(addr bytes[0], addr d.msg[0], d.msgLen)
bytes
## A request that dispatches a typed CBOR event from inside the FFI
## thread and then returns ok — so the response callback can be used to
## synchronize the test.
template withPool(ctxIdent: untyped, body: untyped) =
## Sets up pool + ctx, runs body, destroys on exit.
var pool: FFIContextPool[TestEvtLib]
let ctxIdent = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctxIdent)
body
registerReqFFI(EmitCborEventRequest, lib: ptr TestEvtLib):
proc(): Future[Result[string, string]] {.async.} =
dispatchFFIEventCbor(
@ -76,19 +83,13 @@ registerReqFFI(EmitCborEventRequest, lib: ptr TestEvtLib):
)
return ok("emitted")
## A request that uses the lower-level `dispatchFFIEvent` with a raw
## `seq[byte]` body — the path that previously rejected non-string bodies.
registerReqFFI(EmitRawBytesEventRequest, lib: ptr TestEvtLib):
proc(): Future[Result[string, string]] {.async.} =
dispatchFFIEvent("raw_bytes"):
@[byte 0x01, 0x02, 0x03]
return ok("emitted")
## Setter-thread worker for the registry race regression test. Each
## iteration adds then immediately removes a listener for the dispatched
## event so a TSan-instrumented build can confirm `FFIEventRegistry.lock`
## serialises the cross-thread mutation against dispatch-time
## `snapshotListeners` reads from the FFI thread.
# Add/remove worker for the registry-race regression test.
type SetterArgs =
tuple[
ctx: ptr FFIContext[TestEvtLib], stop: ptr Atomic[bool], target: ptr CallbackData
@ -102,144 +103,83 @@ proc setterThreadBody(args: SetterArgs) {.thread.} =
suite "dispatchFFIEventCbor":
test "delivers EventEnvelope-shaped CBOR payload to event callback":
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
# CallbackData defers declared first run last (LIFO), AFTER pool destroy
# joins the event thread — otherwise TSan flags captureCb on a destroyed mutex.
setupCallbackData(evt)
setupCallbackData(rsp)
var evt: CallbackData
initCallbackData(evt)
defer:
deinitCallbackData(evt)
withPool(ctx):
discard addEventListener(ctx[].eventRegistry, "message_sent", captureCb, addr evt)
# Subscribe to the specific event the request below dispatches.
discard addEventListener(ctx[].eventRegistry, "message_sent", captureCb, addr evt)
check sendRequestToFFIThread(
ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
waitCallback(rsp)
waitCallback(evt)
# Trigger the dispatch from the FFI thread; the response callback is
# ignored (we only care that the request completed so we know the event
# has fired).
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
check sendRequestToFFIThread(
ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
waitCallback(rsp)
waitCallback(evt)
check evt.retCode == RET_OK
let decoded = cborDecode(callbackBytes(evt), EventEnvelope[MessageSentBody])
check decoded.isOk()
check decoded.value.eventType == "message_sent"
check decoded.value.payload.requestId == "req-1"
check decoded.value.payload.messageHash == "0xdeadbeef"
check evt.retCode == RET_OK
let decoded = cborDecode(callbackBytes(evt), EventEnvelope[MessageSentBody])
check decoded.isOk()
check decoded.value.eventType == "message_sent"
check decoded.value.payload.requestId == "req-1"
check decoded.value.payload.messageHash == "0xdeadbeef"
suite "dispatchFFIEvent with seq[byte]":
test "accepts a raw seq[byte] body":
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
setupCallbackData(evt)
setupCallbackData(rsp)
var evt: CallbackData
initCallbackData(evt)
defer:
deinitCallbackData(evt)
withPool(ctx):
discard addEventListener(ctx[].eventRegistry, "raw_bytes", captureCb, addr evt)
discard addEventListener(ctx[].eventRegistry, "raw_bytes", captureCb, addr evt)
check sendRequestToFFIThread(
ctx, EmitRawBytesEventRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
waitCallback(rsp)
waitCallback(evt)
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
check sendRequestToFFIThread(
ctx, EmitRawBytesEventRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
waitCallback(rsp)
waitCallback(evt)
check evt.retCode == RET_OK
check callbackBytes(evt) == @[byte 0x01, 0x02, 0x03]
check evt.retCode == RET_OK
check callbackBytes(evt) == @[byte 0x01, 0x02, 0x03]
when not defined(gcRefc):
## Skipped under `--mm:refc`: each setter thread grows / shrinks the
## per-event listener `seq[FFIEventListener]` via `addEventListener`,
## and refc's per-thread GC heap ownership makes cross-thread seq
## buffer reallocation unsafe even when the surrounding lock is held.
## ORC + the FFI thread + tsan (the combo this test was written for)
## does not have that limitation.
## Skipped under refc: setter threads grow/shrink the per-event listener
## seq, and refc's per-thread GC heap makes that unsafe cross-thread.
suite "FFIEventRegistry concurrent access":
## Regression for PR #39 review comments r3288220895 / r3289285387.
## Run under tsan to actually validate the fix:
## NIM_FFI_SAN=tsan NIM_FFI_MM=orc nimble test_sanitized
## Regression for PR #39 (r3288220895 / r3289285387).
## Validate with: NIM_FFI_SAN=tsan NIM_FFI_MM=orc nimble test_sanitized
test "concurrent add/remove writers vs dispatch reads stay race-free":
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
setupCallbackData(evt)
setupCallbackData(rsp)
var evt: CallbackData
initCallbackData(evt)
defer:
deinitCallbackData(evt)
withPool(ctx):
# Seed an initial listener so the first dispatch has a target.
discard
addEventListener(ctx[].eventRegistry, "message_sent", captureCb, addr evt)
# Seed an initial callback so the FFI thread's first dispatch has a
# target. The setter threads will then repeatedly re-install the same
# (callback, userData) pair — what matters is the cross-thread write
# racing the FFI thread's read, not which pair "wins".
discard addEventListener(ctx[].eventRegistry, "message_sent", captureCb, addr evt)
const NumSetterThreads = 4
const NumDispatchIters = 200
const NumSetterThreads = 4
const NumDispatchIters = 200
var stop: Atomic[bool]
stop.store(false)
var setters: array[NumSetterThreads, Thread[SetterArgs]]
for i in 0 ..< NumSetterThreads:
createThread(setters[i], setterThreadBody, (ctx, addr stop, addr evt))
var stop: Atomic[bool]
stop.store(false)
var setters: array[NumSetterThreads, Thread[SetterArgs]]
for i in 0 ..< NumSetterThreads:
createThread(setters[i], setterThreadBody, (ctx, addr stop, addr evt))
for _ in 0 ..< NumDispatchIters:
resetCalled(rsp)
check sendRequestToFFIThread(
ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
waitCallback(rsp)
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
stop.store(true)
for i in 0 ..< NumSetterThreads:
joinThread(setters[i])
for _ in 0 ..< NumDispatchIters:
# Reset rsp so each iteration's `waitCallback` blocks until the
# FFI thread fires the response — keeps the loop synchronous.
acquire(rsp.lock)
rsp.called = false
release(rsp.lock)
check sendRequestToFFIThread(
ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
waitCallback(rsp)
stop.store(true)
for i in 0 ..< NumSetterThreads:
joinThread(setters[i])
# `evt` got hit by every dispatch above; just confirm at least one
# actually landed so a silently-broken dispatch loop is caught.
check evt.called
## A foreign-thread mutation must not be able to invalidate the
## listener's `userData` while an in-flight dispatch is mid-invocation.
## The dispatch templates hold `reg.lock` for the entire snapshot +
## invocation, so foreign `removeEventListener` blocks until dispatch
## returns.
check evt.called
type SlowState = object
entered: Atomic[bool]
@ -248,54 +188,41 @@ type SlowState = object
proc slowEventCb(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
## Signal entry, sleep briefly (the window during which the main
## thread must call removeEventListener and block), signal exit.
let st = cast[ptr SlowState](userData)
st[].entered.store(true)
os.sleep(15)
os.sleep(60)
st[].exited.store(true)
type DispatcherArgs = tuple[reg: ptr FFIEventRegistry, done: ptr Atomic[bool]]
proc dispatcherBody(args: DispatcherArgs) {.thread.} =
ffiCurrentEventRegistry = args.reg
dispatchFFIEvent("evt"):
"payload"
args.done[].store(true)
suite "registry lock held during invocation":
test "removeEventListener blocks until in-flight dispatch finishes":
var reg: FFIEventRegistry
initEventRegistry(reg)
defer:
deinitEventRegistry(reg)
setupCallbackData(rsp)
var st: SlowState
st.entered.store(false)
st.exited.store(false)
withPool(ctx):
var st: SlowState
st.entered.store(false)
st.exited.store(false)
let id = addEventListener(reg, "evt", slowEventCb, addr st)
check id != 0'u64
let id =
addEventListener(ctx[].eventRegistry, "message_sent", slowEventCb, addr st)
check id != 0'u64
var done: Atomic[bool]
done.store(false)
var thr: Thread[DispatcherArgs]
createThread(thr, dispatcherBody, (addr reg, addr done))
check sendRequestToFFIThread(
ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
waitCallback(rsp)
# Wait until the worker thread is inside slowEventCb.
for _ in 0 ..< 200:
if st.entered.load():
break
os.sleep(1)
check st.entered.load()
check not st.exited.load()
for _ in 0 ..< 500:
if st.entered.load():
break
os.sleep(1)
check st.entered.load()
check not st.exited.load()
# Lock-during-invocation contract: remove blocks until dispatch
# finishes; by the time it returns, slowEventCb has set exited=true.
check removeEventListener(reg, id)
check st.exited.load()
joinThread(thr)
check done.load()
# Lock-during-invocation: remove blocks until dispatch finishes,
# by which time slowEventCb has set exited=true.
check removeEventListener(ctx[].eventRegistry, id)
check st.exited.load()
suite "liveness events":
## `onNotResponding` / `onResponding` bypass the event queue and dispatch
@ -304,63 +231,41 @@ suite "liveness events":
## name + CBOR-encoded `EventEnvelope[…]`) so a future refactor can't
## silently break consumers polling for the "library hung" signal.
test "onNotResponding delivers EventEnvelope[NotRespondingEvent] to subscribers":
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
setupCallbackData(evt)
var evt: CallbackData
initCallbackData(evt)
defer:
deinitCallbackData(evt)
withPool(ctx):
discard addEventListener(
ctx[].eventRegistry, NotRespondingEventName, captureCb, addr evt
)
discard
addEventListener(ctx[].eventRegistry, NotRespondingEventName, captureCb, addr evt)
onNotResponding(ctx)
onNotResponding(ctx)
waitCallback(evt)
check evt.retCode == RET_OK
let decoded = cborDecode(callbackBytes(evt), EventEnvelope[NotRespondingEvent])
check decoded.isOk()
check decoded.value.eventType == NotRespondingEventName
waitCallback(evt)
check evt.retCode == RET_OK
let decoded = cborDecode(callbackBytes(evt), EventEnvelope[NotRespondingEvent])
check decoded.isOk()
check decoded.value.eventType == NotRespondingEventName
test "onResponding delivers EventEnvelope[RespondingEvent] to subscribers":
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
setupCallbackData(evt)
var evt: CallbackData
initCallbackData(evt)
defer:
deinitCallbackData(evt)
withPool(ctx):
discard
addEventListener(ctx[].eventRegistry, RespondingEventName, captureCb, addr evt)
discard
addEventListener(ctx[].eventRegistry, RespondingEventName, captureCb, addr evt)
onResponding(ctx)
onResponding(ctx)
waitCallback(evt)
check evt.retCode == RET_OK
let decoded = cborDecode(callbackBytes(evt), EventEnvelope[RespondingEvent])
check decoded.isOk()
check decoded.value.eventType == RespondingEventName
waitCallback(evt)
check evt.retCode == RET_OK
let decoded = cborDecode(callbackBytes(evt), EventEnvelope[RespondingEvent])
check decoded.isOk()
check decoded.value.eventType == RespondingEventName
test "liveness events with no subscriber are a no-op":
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
# No listener registered — must not crash, must not block.
onNotResponding(ctx)
onResponding(ctx)
withPool(ctx):
# No listener registered — must not crash, must not block.
onNotResponding(ctx)
onResponding(ctx)
suite "event thread drains queued events":
## The event thread wakes every `EventThreadTickInterval` (or on
@ -370,28 +275,19 @@ suite "event thread drains queued events":
## deliver it — exercises the `tryEnqueueEvent` → `drainEventQueue` →
## `dispatchQueuedEvent` → listener path end-to-end.
test "enqueued event is delivered to subscriber within a tick":
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
setupCallbackData(evt)
var evt: CallbackData
initCallbackData(evt)
defer:
deinitCallbackData(evt)
withPool(ctx):
const QueuedEvtName = "queued_evt"
discard addEventListener(ctx[].eventRegistry, QueuedEvtName, captureCb, addr evt)
const QueuedEvtName = "queued_evt"
discard addEventListener(ctx[].eventRegistry, QueuedEvtName, captureCb, addr evt)
# `tryEnqueueEvent` takes ownership of both buffers on success; the
# event thread c_frees them after dispatch returns.
let nameBuf = alloc(QueuedEvtName)
let payload = @[byte 0xDE, 0xAD, 0xBE, 0xEF]
var shared = allocSharedSeq(payload)
check tryEnqueueEvent(ctx[].eventQueue, nameBuf, shared.data, shared.len)
# `tryEnqueueEvent` takes ownership of both buffers on success; the
# event thread c_frees them after dispatch returns.
let nameBuf = alloc(QueuedEvtName)
let payload = @[byte 0xDE, 0xAD, 0xBE, 0xEF]
var shared = allocSharedSeq(payload)
check tryEnqueueEvent(ctx[].eventQueue, nameBuf, shared.data, shared.len)
waitCallback(evt)
check evt.retCode == RET_OK
check callbackBytes(evt) == payload
waitCallback(evt)
check evt.retCode == RET_OK
check callbackBytes(evt) == payload

View File

@ -1,23 +1,12 @@
## Unit tests for the `FFIEventRegistry` primitive — the multi-listener
## data structure that will back `<lib>_add_event_listener` /
## `<lib>_remove_event_listener` once the dispatch wiring lands.
##
## These tests exercise the registry directly (no FFI thread, no dispatch
## templates) so they stay fast and pin down the registry's mutation and
## snapshot semantics in isolation.
## Unit tests for the `FFIEventRegistry` primitive (no FFI thread, no dispatch).
import std/locks
import unittest2
import ffi
# Tiny helpers — a thread-safe sink each listener writes into so we can
# assert which callbacks would fire and in what order once dispatch lands.
# Today only `tagCb`'s presence is exercised; the recorder is also used to
# make sure listener bookkeeping doesn't accidentally invoke callbacks.
type Recorder = object
lock: Lock
hits: seq[string] # tag captured from `userData` per invocation
hits: seq[string]
retCodes: seq[cint]
payloads: seq[string]
@ -34,7 +23,6 @@ proc record(r: var Recorder, tag: string, retCode: cint, payload: string) =
r.payloads.add(payload)
release(r.lock)
# Each listener is identified by a `Tag` passed through `userData`.
type Tag = object
name: string
rec: ptr Recorder
@ -48,16 +36,22 @@ proc tagCb(
copyMem(addr payload[0], msg, int(len))
record(t[].rec[], t[].name, retCode, payload)
template setupRegistry(regIdent: untyped) =
var regIdent: FFIEventRegistry
initEventRegistry(regIdent)
defer:
deinitEventRegistry(regIdent)
template setupRecorder(recIdent: untyped) =
var recIdent: Recorder
initRecorder(recIdent)
defer:
deinitRecorder(recIdent)
suite "FFIEventRegistry mutation":
test "addEventListener assigns monotonically increasing non-zero ids":
var reg: FFIEventRegistry
initEventRegistry(reg)
defer:
deinitEventRegistry(reg)
var rec: Recorder
initRecorder(rec)
defer:
deinitRecorder(rec)
setupRegistry(reg)
setupRecorder(rec)
var t = Tag(name: "a", rec: addr rec)
let id1 = addEventListener(reg, "evt", tagCb, addr t)
@ -68,30 +62,18 @@ suite "FFIEventRegistry mutation":
check id3 == 3'u64
test "addEventListener returns 0 when callback is nil":
var reg: FFIEventRegistry
initEventRegistry(reg)
defer:
deinitEventRegistry(reg)
setupRegistry(reg)
let id = addEventListener(reg, "evt", nil, nil)
check id == 0'u64
test "removeEventListener returns false for unknown ids":
var reg: FFIEventRegistry
initEventRegistry(reg)
defer:
deinitEventRegistry(reg)
setupRegistry(reg)
check not removeEventListener(reg, 0'u64)
check not removeEventListener(reg, 99'u64)
test "removeEventListener removes listeners across distinct events":
var reg: FFIEventRegistry
initEventRegistry(reg)
defer:
deinitEventRegistry(reg)
var rec: Recorder
initRecorder(rec)
defer:
deinitRecorder(rec)
setupRegistry(reg)
setupRecorder(rec)
var t = Tag(name: "a", rec: addr rec)
let id1 = addEventListener(reg, "evt", tagCb, addr t)
@ -99,7 +81,6 @@ suite "FFIEventRegistry mutation":
check removeEventListener(reg, id1)
check removeEventListener(reg, id2)
# Second remove of the same id is a no-op.
check not removeEventListener(reg, id1)
check snapshotListeners(reg, "evt").len == 0
@ -107,14 +88,8 @@ suite "FFIEventRegistry mutation":
suite "FFIEventRegistry snapshot semantics":
test "snapshot returns only the listeners for the requested event":
var reg: FFIEventRegistry
initEventRegistry(reg)
defer:
deinitEventRegistry(reg)
var rec: Recorder
initRecorder(rec)
defer:
deinitRecorder(rec)
setupRegistry(reg)
setupRecorder(rec)
var a = Tag(name: "a", rec: addr rec)
var b = Tag(name: "b", rec: addr rec)
var c = Tag(name: "c", rec: addr rec)
@ -123,32 +98,19 @@ suite "FFIEventRegistry snapshot semantics":
discard addEventListener(reg, "evt", tagCb, addr b)
discard addEventListener(reg, "other", tagCb, addr c)
let snapEvt = snapshotListeners(reg, "evt")
check snapEvt.len == 2 # both listeners for "evt"
let snapOther = snapshotListeners(reg, "other")
check snapOther.len == 1 # only the listener for "other"
let snapUnknown = snapshotListeners(reg, "no-subscriber")
check snapUnknown.len == 0 # no listener for this event
check snapshotListeners(reg, "evt").len == 2
check snapshotListeners(reg, "other").len == 1
check snapshotListeners(reg, "no-subscriber").len == 0
test "snapshot is a copy: post-snapshot mutation does not affect it":
var reg: FFIEventRegistry
initEventRegistry(reg)
defer:
deinitEventRegistry(reg)
var rec: Recorder
initRecorder(rec)
defer:
deinitRecorder(rec)
setupRegistry(reg)
setupRecorder(rec)
var t = Tag(name: "a", rec: addr rec)
let id1 = addEventListener(reg, "evt", tagCb, addr t)
let snap = snapshotListeners(reg, "evt")
check snap.len == 1
# Mutating the registry after the snapshot must not retroactively
# shrink or grow the snapshot we already captured.
check removeEventListener(reg, id1)
discard addEventListener(reg, "evt", tagCb, addr t)
check snap.len == 1
@ -156,14 +118,8 @@ suite "FFIEventRegistry snapshot semantics":
suite "removeAllEventListeners":
test "drops every registered listener":
var reg: FFIEventRegistry
initEventRegistry(reg)
defer:
deinitEventRegistry(reg)
var rec: Recorder
initRecorder(rec)
defer:
deinitRecorder(rec)
setupRegistry(reg)
setupRecorder(rec)
var a = Tag(name: "a", rec: addr rec)
var b = Tag(name: "b", rec: addr rec)

View File

@ -0,0 +1,322 @@
## Integration tests for the dedicated event thread (issue #6).
import std/[atomics, locks, os, strutils]
import unittest2
import results
import ffi
type TestEvtLib = object
type LatchPayload* {.ffi.} = object
iter*: int
type CallbackData = object
lock: Lock
cond: Cond
called: bool
retCode: cint
msg: array[1024, byte]
msgLen: int
proc initCallbackData(d: var CallbackData) =
d.lock.initLock()
d.cond.initCond()
proc deinitCallbackData(d: var CallbackData) =
d.cond.deinitCond()
d.lock.deinitLock()
template setupCallbackData(name: untyped) =
## Declares `name`, inits it, and defers its deinit in the caller's scope.
var name: CallbackData
initCallbackData(name)
defer:
deinitCallbackData(name)
proc captureCb(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
let d = cast[ptr CallbackData](userData)
acquire(d[].lock)
d[].retCode = retCode
let n = min(int(len), d[].msg.len)
if n > 0 and not msg.isNil():
copyMem(addr d[].msg[0], msg, n)
d[].msgLen = n
d[].called = true
signal(d[].cond)
release(d[].lock)
proc waitCallback(d: var CallbackData) =
acquire(d.lock)
while not d.called:
wait(d.cond, d.lock)
release(d.lock)
proc resetCalled(d: var CallbackData) =
acquire(d.lock)
d.called = false
release(d.lock)
proc waitCallbackTimeout(d: var CallbackData, timeoutMs: int): bool =
## Polls under `d.lock` so the load syncs with the `captureCb` writer.
let deadline = Moment.now() + timeoutMs.milliseconds
while true:
acquire(d.lock)
let done = d.called
release(d.lock)
if done:
return true
if Moment.now() >= deadline:
return false
os.sleep(10)
template withPool(ctxIdent: untyped, body: untyped) =
## Sets up a pool + ctx, runs body, destroys on exit.
var pool: FFIContextPool[TestEvtLib]
let ctxIdent = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctxIdent)
body
registerReqFFI(EmitLatchEvent, lib: ptr TestEvtLib):
proc(iter: int): Future[Result[string, string]] {.async.} =
dispatchFFIEventCbor("latch", LatchPayload(iter: iter))
return ok("emitted")
registerReqFFI(PingEvent, lib: ptr TestEvtLib):
proc(): Future[Result[string, string]] {.async.} =
return ok("pong")
# Atomic switch so the wedge fires deterministically per test.
var gBlockingEnabled: Atomic[bool]
gBlockingEnabled.store(false)
registerReqFFI(BlockingRequest, lib: ptr TestEvtLib):
proc(milliseconds: int): Future[Result[string, string]] {.async.} =
if gBlockingEnabled.load():
os.sleep(milliseconds)
return ok("done")
var gListenerThreadId: Atomic[int]
gListenerThreadId.store(-1)
proc captureThreadIdCb(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
gListenerThreadId.store(getThreadId())
let d = cast[ptr CallbackData](userData)
acquire(d[].lock)
d[].called = true
signal(d[].cond)
release(d[].lock)
var gFfiThreadId: Atomic[int]
gFfiThreadId.store(-1)
registerReqFFI(CaptureFfiTidRequest, lib: ptr TestEvtLib):
proc(): Future[Result[string, string]] {.async.} =
gFfiThreadId.store(getThreadId())
return ok("captured")
suite "event delivery is asynchronous":
test "listener runs on the event thread, not the FFI thread":
# CallbackData defers declared first run last (LIFO): pool-destroy joins
# the event thread before any still-held mutex is torn down. TSan otherwise
# flags `captureCb` on a destroyed mutex.
setupCallbackData(evt)
setupCallbackData(rsp)
withPool(ctx):
discard addEventListener(
ctx[].eventRegistry, "latch", captureThreadIdCb, addr evt
)
check sendRequestToFFIThread(
ctx, CaptureFfiTidRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
waitCallback(rsp)
resetCalled(rsp)
check sendRequestToFFIThread(
ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, 0)
)
.isOk()
waitCallback(rsp)
waitCallback(evt)
let ffiTid = gFfiThreadId.load()
let listenerTid = gListenerThreadId.load()
check ffiTid >= 0
check listenerTid >= 0
check ffiTid != listenerTid
proc slowSleepCb(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
os.sleep(150)
suite "FFI thread independence":
test "slow listener does not block FFI thread request round-trip":
setupCallbackData(rsp)
withPool(ctx):
discard addEventListener(
ctx[].eventRegistry, "latch", slowSleepCb, nil
)
check sendRequestToFFIThread(
ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, 0)
)
.isOk()
waitCallback(rsp)
resetCalled(rsp)
# chronos's `Moment` — std/times exports a `milliseconds` that
# shadows chronos's at this generic-instantiation site.
let started = Moment.now()
check sendRequestToFFIThread(ctx, PingEvent.ffiNewReq(captureCb, addr rsp))
.isOk()
waitCallback(rsp)
let elapsed = Moment.now() - started
check elapsed < 100.milliseconds # under the 150 ms slow-listener sleep
when not defined(gcRefc):
## Skipped under refc: sleeping the FFI thread inside a sync handler
## interacts badly with refc + existing destroy-on-time policies.
suite "FFI heartbeat staleness":
test "wedged FFI thread triggers onNotResponding via heartbeat":
setupCallbackData(notif)
setupCallbackData(rsp)
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
# Disable wedge first so destroy isn't blocked by the still-sleeping handler.
gBlockingEnabled.store(false)
discard pool.destroyFFIContext(ctx)
discard addEventListener(
ctx[].eventRegistry, NotRespondingEventName, captureCb, addr notif
)
# Wait out the start-delay so the heartbeat check is armed.
os.sleep(FFIHeartbeatStartDelay.milliseconds.int + 200)
# Wedge long enough to cross at least one tick boundary.
gBlockingEnabled.store(true)
let wedgeMs =
(EventThreadTickInterval + FFIHeartbeatStaleThreshold).milliseconds.int +
1500
check sendRequestToFFIThread(
ctx, BlockingRequest.ffiNewReq(captureCb, addr rsp, wedgeMs)
)
.isOk()
waitCallback(rsp)
gBlockingEnabled.store(false)
check waitCallbackTimeout(notif, 1500)
type BackpressureState = object
enteredLock: Lock
enteredCond: Cond
entered: Atomic[bool]
releaseLock: Lock
releaseCond: Cond
release: Atomic[bool]
proc initBackpressure(b: var BackpressureState) =
b.enteredLock.initLock()
b.enteredCond.initCond()
b.releaseLock.initLock()
b.releaseCond.initCond()
proc deinitBackpressure(b: var BackpressureState) =
b.enteredCond.deinitCond()
b.enteredLock.deinitLock()
b.releaseCond.deinitCond()
b.releaseLock.deinitLock()
proc backpressureCb(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
## First call signals entered then blocks under reg.lock to back-pressure
## subsequent dispatches — gives a deterministic way to fill the queue.
let b = cast[ptr BackpressureState](userData)
if not b[].entered.exchange(true):
acquire(b[].enteredLock)
signal(b[].enteredCond)
release(b[].enteredLock)
acquire(b[].releaseLock)
while not b[].release.load():
wait(b[].releaseCond, b[].releaseLock)
release(b[].releaseLock)
registerReqFFI(BurstEmit, lib: ptr TestEvtLib):
proc(count: int): Future[Result[string, string]] {.async.} =
for i in 0 ..< count:
dispatchFFIEventCbor("latch", LatchPayload(iter: i))
return ok("bursted")
suite "queue overflow":
test "overflow sets stuck flag, fires onNotResponding, rejects new requests":
var bp: BackpressureState
initBackpressure(bp)
defer:
deinitBackpressure(bp)
setupCallbackData(notif)
setupCallbackData(rsp)
setupCallbackData(rejected)
withPool(ctx):
discard addEventListener(
ctx[].eventRegistry, "latch", backpressureCb, addr bp
)
discard addEventListener(
ctx[].eventRegistry, NotRespondingEventName, captureCb, addr notif
)
# Kick one event so the listener holds reg.lock; subsequent enqueues
# pile up undrained.
check sendRequestToFFIThread(
ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, -1)
)
.isOk()
waitCallback(rsp)
acquire(bp.enteredLock)
while not bp.entered.load():
wait(bp.enteredCond, bp.enteredLock)
release(bp.enteredLock)
# Burst > capacity in one request; tail enqueues flip the stuck flag.
resetCalled(rsp)
check sendRequestToFFIThread(
ctx, BurstEmit.ffiNewReq(captureCb, addr rsp, EventQueueCapacity + 8)
)
.isOk()
waitCallback(rsp)
check ctx.eventQueueStuck.load()
let res =
sendRequestToFFIThread(ctx, PingEvent.ffiNewReq(captureCb, addr rejected))
check res.isErr()
check res.error.contains("stuck")
# Release backpressure so drain advances and the stuck flag fires
# not_responding.
acquire(bp.releaseLock)
bp.release.store(true)
signal(bp.releaseCond)
release(bp.releaseLock)
check waitCallbackTimeout(notif, 2000)