chore: reduce comments, reuse code

This commit is contained in:
Gabriel Cruz 2026-06-05 14:20:01 -03:00
parent a429ee9e31
commit 45d5414323
No known key found for this signature in database
GPG Key ID: 2E467754A6BA9BA5
6 changed files with 401 additions and 781 deletions

View File

@ -1,48 +1,38 @@
{.passc: "-fPIC".}
import system/ansi_c
import std/[atomics, locks, options, tables]
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
import
./ffi_types, ./ffi_events, ./ffi_thread_request, ./internal/ffi_macro, ./logging,
./cbor_serial
import ./ffi_types, ./ffi_events, ./ffi_thread_request, ./logging, ./cbor_serial
export ffi_events
type FFIContext*[T] = object
myLib*: ptr T
# main library object (e.g., Waku, LibP2P, SDS, the one to be exposed as a library)
myLib*: ptr T # main library object (Waku, LibP2P, SDS, …)
ffiThread: Thread[(ptr FFIContext[T])]
# represents the main FFI thread in charge of attending API consumer actions
eventThread: Thread[(ptr FFIContext[T])]
# drains the event queue and runs the FFI-thread heartbeat check
lock: Lock
reqChannel: ChannelSPSCSingle[ptr FFIThreadRequest]
reqSignal: ThreadSignalPtr # to notify the FFI Thread that a new request is sent
reqSignal: ThreadSignalPtr
reqReceivedSignal: ThreadSignalPtr
# signals the main thread (the one interfacing with the FFI thread) that the FFI thread received the request
stopSignal: ThreadSignalPtr
threadExitSignal: ThreadSignalPtr # bounds destroyFFIContext's wait so a blocked loop cannot hang the caller
eventQueueSignal: ThreadSignalPtr # wakes the event thread on enqueue
eventThreadExitSignal: ThreadSignalPtr # mirrors threadExitSignal for the event thread
threadExitSignal: ThreadSignalPtr
eventQueueSignal: ThreadSignalPtr
eventThreadExitSignal: ThreadSignalPtr
userData*: pointer
eventRegistry*: FFIEventRegistry
eventQueue*: EventQueue
ffiHeartbeat*: Atomic[int64] # advanced each FFI-thread loop; event thread reads for liveness
eventQueueStuck*: Atomic[bool] # sticky overflow flag; recovery is destroy+recreate
running: Atomic[bool] # To control when the threads are running
eventQueueStuck*: Atomic[bool] # sticky overflow flag
running: Atomic[bool]
registeredRequests: ptr Table[cstring, FFIRequestProc]
# Pointer to the table of requests registered at compile time
var onFFIThread* {.threadvar.}: bool
## True while executing inside `ffiThreadBody`. Used by
## `sendRequestToFFIThread` to detect re-entrant dispatch from a handler
## (which would self-deadlock on `reqReceivedSignal`).
# Re-entrant dispatch guard for `sendRequestToFFIThread`.
const git_version* {.strdefine.} = "n/a"
const
EventThreadTickInterval* = 1.seconds # bounds idle heartbeat check latency
EventThreadTickInterval* = 1.seconds
FFIHeartbeatStartDelay* = 10.seconds # grace window for library startup
FFIHeartbeatStaleThreshold* = 1.seconds
@ -58,8 +48,8 @@ proc encodeNotRespondingEvent(): seq[byte] =
proc dispatchToListeners[T](
ctx: ptr FFIContext[T], eventName: string, data: pointer, dataLen: int
) =
## Holds reg.lock for the entire snapshot + invocation so concurrent
## add/remove on this registry blocks until dispatch returns.
## Lock held across the whole fan-out so foreign add/remove blocks
## until dispatch returns (UAF-close contract from PR #39).
withLock ctx[].eventRegistry.lock:
let listeners = ctx[].eventRegistry.byEvent.getOrDefault(eventName)
if listeners.len == 0:
@ -75,8 +65,7 @@ proc dispatchToListeners[T](
)
proc onNotResponding*(ctx: ptr FFIContext) =
## Bypasses the event queue (which may itself be wedged) and dispatches
## directly to listeners. Runs on the event thread.
## Bypasses the (possibly wedged) event queue; runs on the event thread.
let event =
try:
encodeNotRespondingEvent()
@ -91,28 +80,23 @@ proc onNotResponding*(ctx: ptr FFIContext) =
proc sendRequestToFFIThread*(
ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration
): Result[void, string] =
# Event-queue overflow refuses further requests; the event thread fires onNotResponding to avoid deadlocking on reg.lock here.
if ctx.eventQueueStuck.load():
deleteRequest(ffiRequest)
return
err("event queue stuck - library cannot accept new requests")
return err("event queue stuck - library cannot accept new requests")
# Reentrancy guard: only this thread can fire `reqReceivedSignal`, so a handler dispatching back would self-deadlock.
if onFFIThread:
# Re-entrant dispatch from a handler would self-deadlock on `reqReceivedSignal`.
deleteRequest(ffiRequest)
return err(
"reentrant ffi call: a handler invoked sendRequestToFFIThread on its own context"
)
# All async submissions serialise on `ctx.lock` for the full
# trySend + fireSync + waitSync sequence because `reqChannel` is
# single-producer and `reqReceivedSignal` is shared across callers.
# Multi-producer redesign is tracked as PR #23 review item 7.
# `reqChannel` is single-producer and `reqReceivedSignal` is shared across callers, so
# serialise the full trySend + fireSync + waitSync. The multi-producer redesign is tracked as PR #23 review item 7.
ctx.lock.acquire()
defer:
ctx.lock.release()
## Sending the request
let sentOk = ctx.reqChannel.trySend(ffiRequest)
if not sentOk:
deleteRequest(ffiRequest)
@ -127,42 +111,28 @@ proc sendRequestToFFIThread*(
deleteRequest(ffiRequest)
return err("Couldn't fireSync in time")
## wait until the FFI working thread properly received the request
let res = ctx.reqReceivedSignal.waitSync(timeout)
if res.isErr():
## Do not free ffiRequest here: the FFI thread was already signaled and
## will process (and free) it.
# FFI thread was already signaled; it owns ffiRequest now.
return err("Couldn't receive reqReceivedSignal signal")
## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the
## process proc.
ok()
proc processRequest[T](
request: ptr FFIThreadRequest, ctx: ptr FFIContext[T]
) {.async.} =
## Invoked within the FFI thread to process a request coming from the FFI API consumer thread.
let reqId = $request[].reqId
## The reqId determines which proc will handle the request.
## The registeredRequests represents a table defined at compile time.
## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously]
## Explicit conversion keeps `reqId` alive as the backing string,
## avoiding the implicit string→cstring warning that will become an error.
# Keep `reqId` alive as backing for the cstring view.
let reqIdCs = reqId.cstring
let retFut =
if not ctx[].registeredRequests[].contains(reqIdCs):
## That shouldn't happen because only registered requests should be sent to the FFI thread.
nilProcess(request[].reqId)
else:
ctx[].registeredRequests[][reqIdCs](cast[pointer](request), ctx)
## Catch every catchable exception (including CancelledError raised by
## the shutdown drain in ffiRun) so handleRes — and its `deleteRequest`
## defer — always runs. Otherwise an abandoned in-flight handler would
## leak its request envelope, reqId copy, and CBOR payload.
# Catch all (incl. CancelledError from the shutdown drain) so handleRes —
# and its `deleteRequest` defer — always runs.
let res =
try:
await retFut
@ -171,16 +141,14 @@ proc processRequest[T](
"Error in processRequest for " & reqId & ": " & e.msg
)
## handleRes may raise (OOM, GC setup) even though it is rare. Catching here
## keeps the async proc raises:[] compatible. The defer inside handleRes
## guarantees request is freed before the exception propagates.
# handleRes may raise (rare: OOM, GC setup); keep `raises: []`.
try:
handleRes(res, request)
except Exception as e:
error "Unexpected exception in handleRes", error = e.msg
var ffiEventQueueSignalPtr {.threadvar.}: ThreadSignalPtr
## Stashed so the hook below has no closure environment.
# Stashed so the hook has no closure env.
proc ffiNotifyEventEnqueuedHook() {.gcsafe, raises: [].} =
if not ffiEventQueueSignalPtr.isNil():
@ -189,7 +157,6 @@ proc ffiNotifyEventEnqueuedHook() {.gcsafe, raises: [].} =
error "failed to fire eventQueueSignal after enqueue", err = res.error
proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
## FFI thread body that attends library user API requests
ffiCurrentEventRegistry = addr ctx[].eventRegistry
ffiCurrentEventQueue = addr ctx[].eventQueue
ffiCurrentEventQueueStuck = addr ctx[].eventQueueStuck
@ -201,20 +168,16 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
defer:
onFFIThread = false
# Unblocks destroyFFIContext's bounded wait so cleanup can proceed.
# Unblocks destroyFFIContext's bounded wait.
let fireRes = ctx.threadExitSignal.fireSync()
if fireRes.isErr():
error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error
let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} =
var ffiReqHandler: T
## Holds the main library object, i.e., in charge of handling the ffi requests.
## e.g., Waku, LibP2P, SDS, etc.
## In-flight processRequest futures. Tracked so they can be drained on
## shutdown — otherwise destroying the context while a handler is
## awaiting (e.g. sleepAsync) abandons the future and leaks the
## request's envelope/reqId/payload allocations.
# Track in-flight handlers so shutdown can drain them — otherwise
# abandoned futures leak request envelope/reqId/payload.
var pending: seq[Future[void]] = @[]
proc reapCompleted() =
@ -226,7 +189,7 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
inc i
while ctx.running.load():
# Freezes if a sync handler blocks the dispatcher; event thread reads to detect wedged FFI thread.
# Freezes if a sync handler blocks; event thread reads for liveness.
discard ctx.ffiHeartbeat.fetchAdd(1)
reapCompleted()
@ -235,7 +198,6 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
if not gotSignal:
continue
## Wait for a request from the ffi consumer thread
var request: ptr FFIThreadRequest
if not ctx.reqChannel.tryRecv(request):
continue
@ -243,17 +205,13 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
if ctx.myLib.isNil():
ctx.myLib = addr ffiReqHandler
## Handle the request
pending.add processRequest(request, ctx)
let fireRes = ctx.reqReceivedSignal.fireSync()
if fireRes.isErr():
error "could not fireSync back to requester thread", error = fireRes.error
## Drain in-flight handlers so each request's `deleteRequest` runs
## before we exit. Without this, abandoning a future mid-await would
## leak the request allocations (visible to LSan; previously hidden
## because Nim's pool allocator kept the chunks alive in the process).
# Drain in-flight handlers so each request's `deleteRequest` runs.
reapCompleted()
if pending.len > 0:
try:
@ -264,7 +222,6 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
waitFor ffiRun(ctx)
proc dispatchQueuedEvent[T](ctx: ptr FFIContext[T], qe: QueuedEvent) =
## Frees `qe`'s c_malloc buffers on exit.
defer:
freeEventBuffers(qe.name, qe.data)
ctx.dispatchToListeners($qe.name, qe.data, qe.dataLen)
@ -292,31 +249,35 @@ proc initHeartbeatMonitor[T](ctx: ptr FFIContext[T]): HeartbeatMonitor =
)
proc check[T](hb: var HeartbeatMonitor, ctx: ptr FFIContext[T]) =
## Fires onNotResponding once the FFI thread's heartbeat counter stops
## advancing past the stale threshold. Latches until it moves again.
## Fires onNotResponding once the heartbeat stalls past the threshold;
## latches until it moves again.
if Moment.now() - hb.startedAt <= FFIHeartbeatStartDelay:
return
let cur = ctx.ffiHeartbeat.load()
if cur != hb.lastValue:
hb.lastValue = cur
hb.lastChange = Moment.now()
hb.notifiedStale = false
elif not hb.notifiedStale and
Moment.now() - hb.lastChange > FFIHeartbeatStaleThreshold:
onNotResponding(ctx)
hb.notifiedStale = true
return
if hb.notifiedStale:
return
if Moment.now() - hb.lastChange <= FFIHeartbeatStaleThreshold:
return
onNotResponding(ctx)
hb.notifiedStale = true
proc eventRun[T](ctx: ptr FFIContext[T]) {.async.} =
var hb = initHeartbeatMonitor(ctx)
var notifiedStuck = false
while ctx.running.load():
# Wake on enqueue or tick — whichever first.
discard await ctx.eventQueueSignal.wait().withTimeout(EventThreadTickInterval)
ctx.drainEventQueue()
# Fires here (after drain releases reg.lock) — from the FFI thread it'd deadlock on a back-pressuring listener.
# Fire after the drain so reg.lock is free — firing from the FFI thread could deadlock on a back-pressuring listener.
if not notifiedStuck and ctx.eventQueueStuck.load():
onNotResponding(ctx)
notifiedStuck = true
@ -326,8 +287,7 @@ proc eventRun[T](ctx: ptr FFIContext[T]) {.async.} =
hb.check(ctx)
proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
## Drains the event queue and runs the FFI-thread heartbeat check.
## Owns the queued `c_malloc` payloads until dispatch returns.
## Owns queued `c_malloc` payloads until dispatch returns.
defer:
let fireRes = ctx.eventThreadExitSignal.fireSync()
if fireRes.isErr():
@ -338,40 +298,24 @@ proc eventThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
except CatchableError as e:
error "event thread exited with exception", error = e.msg
template closeAndNil(field: untyped) =
if not field.isNil():
?field.close()
field = nil
proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Mirror of `initContextResources`: tears down lock, registry, queue,
## and signal fds in place. Threads MUST already be joined. Caller owns
## the memory holding `ctx`. Fields are nil'd after close so a re-init
## on the same slot doesn't double-close.
## Mirror of `initContextResources`. Threads MUST be joined first;
## fields are nil'd after close so re-init on the same slot is safe.
ctx.lock.deinitLock()
deinitEventRegistry(ctx[].eventRegistry)
deinitEventQueue(ctx[].eventQueue)
when defined(gcRefc):
## ThreadSignalPtr.close() is intentionally skipped under --mm:refc.
##
## close() goes through chronos's safeUnregisterAndCloseFd, which calls
## getThreadDispatcher() and lazily allocates a new Selector for the
## main thread. With refc and a heavy ref-object graph torn down by the
## FFI thread (libwaku/libp2p), that allocation traps inside rawNewObj
## and the refc signal handler re-enters the same allocator — the
## process never returns. Captured stack from a hung process:
## close → safeUnregisterAndCloseFd → getThreadDispatcher →
## newDispatcher → Selector.new → newObj (gc.nim:488) →
## rawNewObj (gc.nim:470) → rawNewObj → _sigtramp → signalHandler →
## newObjNoInit → addNewObjToZCT (infinite re-entry)
##
## --mm:orc does NOT exhibit this bug; see the
## "destroyFFIContext refc workaround" suite in tests/test_ffi_context.nim
## (test "destroy after heavy ref-allocation workload returns promptly").
## The signal fds (a few per ctx) are reclaimed by the OS at process
## exit; destroyFFIContext is called once per process lifetime, so the
## leak is bounded.
# ThreadSignalPtr.close() under refc traps in safeUnregisterAndCloseFd
# → newDispatcher → rawNewObj → signal-handler re-entry (process hangs).
# See tests/test_ffi_context.nim "destroyFFIContext refc workaround".
# Fd leak is bounded — destroy runs once per process lifetime.
discard
else:
template closeAndNil(field: untyped) =
if not field.isNil():
?field.close()
field = nil
closeAndNil(ctx.reqSignal)
closeAndNil(ctx.reqReceivedSignal)
closeAndNil(ctx.stopSignal)
@ -381,16 +325,19 @@ proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
ok()
proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Full cleanup for heap-allocated contexts: closes all resources and frees memory.
## Deinit + free for heap-allocated contexts.
defer:
freeShared(ctx)
ctx.deinitContextResources()
template newSignalOrErr(field: untyped, name: string) =
field = ThreadSignalPtr.new().valueOr:
return err("couldn't create " & name & " ThreadSignalPtr: " & $error)
proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Initialises all resources inside an already-allocated FFIContext slot.
## On failure every partially-initialised resource is closed; the caller
## is responsible for releasing the slot (freeShared or pool.releaseSlot).
# Defensive nil: deferred cleanup must never double-close stale pointers on a reused pool slot.
## On failure, the deferred cleanup closes partial state; caller releases
## the slot (freeShared or pool.releaseSlot).
# Nil first so deferred cleanup can't double-close a reused pool slot.
ctx.reqSignal = nil
ctx.reqReceivedSignal = nil
ctx.stopSignal = nil
@ -410,23 +357,12 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
error "failed to clean up resources after createFFIContext failure",
error = error
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqSignal ThreadSignalPtr: " & $error)
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqReceivedSignal ThreadSignalPtr: " & $error)
ctx.stopSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create stopSignal ThreadSignalPtr: " & $error)
ctx.threadExitSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create threadExitSignal ThreadSignalPtr: " & $error)
ctx.eventQueueSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create eventQueueSignal ThreadSignalPtr: " & $error)
ctx.eventThreadExitSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create eventThreadExitSignal ThreadSignalPtr: " & $error)
newSignalOrErr(ctx.reqSignal, "reqSignal")
newSignalOrErr(ctx.reqReceivedSignal, "reqReceivedSignal")
newSignalOrErr(ctx.stopSignal, "stopSignal")
newSignalOrErr(ctx.threadExitSignal, "threadExitSignal")
newSignalOrErr(ctx.eventQueueSignal, "eventQueueSignal")
newSignalOrErr(ctx.eventThreadExitSignal, "eventThreadExitSignal")
ctx.registeredRequests = addr ffi_types.registeredRequests
@ -440,8 +376,7 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
try:
createThread(ctx.eventThread, eventThreadBody[T], ctx)
except ValueError, ResourceExhaustedError:
## ffiThread is already running; signal it to exit and join before the
## deferred cleanUpResources closes the signals it's waiting on.
# Join ffiThread before deferred cleanup closes signals it's waiting on.
ctx.running.store(false)
let fireRes = ctx.reqSignal.fireSync()
if fireRes.isErr():
@ -470,31 +405,23 @@ proc waitExitOrErr(
ok()
proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] =
# Error paths intentionally skip onNotResponding: a back-pressuring
# listener may hold reg.lock, and onNotResponding takes it — would
# amplify the stuck state into a deadlock instead of escaping it.
# Skip onNotResponding on error: it takes reg.lock, which a back-pressuring
# listener may hold — would deepen the stuck state into a deadlock.
ctx.running.store(false)
?ctx.reqSignal.fireOrErr("reqSignal")
?ctx.stopSignal.fireOrErr("stopSignal")
# Non-fatal: event thread will see running==false on the next tick.
# Non-fatal: event thread sees running==false on the next tick anyway.
ctx.eventQueueSignal.fireOrErr("eventQueueSignal").isOkOr:
error "failed to signal eventQueueSignal in signalStop", error = error
ok()
## If the FFI thread's event loop is blocked by a synchronous handler
## (e.g. blocking I/O), it cannot process reqSignal in time to exit.
## clearContext waits on threadExitSignal up to this bound; on timeout it
## returns err and skips joinThread/cleanup (leaking the thread + ctx slot)
## rather than hanging the caller forever.
## Bound on how long clearContext waits for the FFI thread to exit before
## leaking ctx rather than hanging the caller.
const ThreadExitTimeout* = 1500.milliseconds
proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Signals both threads to stop, waits up to ThreadExitTimeout per thread,
## and joins them. On timeout returns err and skips remaining joins
## (leaving the threads live) rather than hanging the caller. Resource
## cleanup is the caller's responsibility.
##
## Timeout paths skip onNotResponding for the same reason signalStop does.
## On timeout, returns err and skips remaining joins (leaves threads live).
## Caller owns resource cleanup. Skips onNotResponding (same reason as signalStop).
ctx.signalStop().isOkOr:
return err("signalStop failed: " & $error)
@ -505,7 +432,7 @@ proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] =
ok()
proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Stops the FFI context that was created via createFFIContext[T]() (heap).
## Stops a heap-allocated FFI context.
ctx.stopAndJoinThreads().isOkOr:
return err("clearContext: " & $error)
ctx.cleanUpResources().isOkOr:

View File

@ -3,14 +3,10 @@ import results
import ./ffi_context
const MaxFFIContexts* = 32
## Maximum number of concurrently live FFI contexts when using FFIContextPool.
## Fds and threads are only consumed for slots that are actually acquired,
## so this value only affects the upfront memory of the pool array.
# Only affects upfront pool memory; fds/threads consumed per acquired slot.
type FFIContextPool*[T] = object
## Fixed-size pool of FFI contexts. Avoids dynamic heap allocation per context
## and bounds the total number of file descriptors consumed by ThreadSignalPtrs
## to at most MaxFFIContexts * 2.
## Fixed pool. Bounds ThreadSignalPtr fds at MaxFFIContexts * 2.
slots: array[MaxFFIContexts, FFIContext[T]]
inUse: array[MaxFFIContexts, Atomic[bool]]
@ -19,7 +15,7 @@ proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], stri
var expected = false
if pool.inUse[i].compareExchange(expected, true):
return ok(pool.slots[i].addr)
return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)")
err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)")
proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) =
for i in 0 ..< MaxFFIContexts:
@ -30,39 +26,31 @@ proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) =
proc createFFIContext*[T](
pool: var FFIContextPool[T]
): Result[ptr FFIContext[T], string] =
## Acquires a slot from the fixed pool and initialises it as an FFI context.
## Bounded fd usage: at most MaxFFIContexts * 2 ThreadSignalPtr fds are ever open.
let ctx = pool.acquireSlot().valueOr:
return err("createFFIContext: acquireSlot failed: " & $error)
initContextResources(ctx).isOkOr:
pool.releaseSlot(ctx)
return err("createFFIContext: initContextResources failed: " & $error)
return ok(ctx)
ok(ctx)
proc destroyFFIContext*[T](
pool: var FFIContextPool[T], ctx: ptr FFIContext[T]
): Result[void, string] =
## Stops the FFI context and returns its slot to the pool. If the FFI thread
## is blocked and does not exit in time, the slot is leaked rather than
## reclaimed — closing its resources while the thread is still live would be
## unsafe.
## On thread-exit timeout the slot is leaked — closing live-thread resources is unsafe.
ctx.stopAndJoinThreads().isOkOr:
return err("destroyFFIContext(pool): " & $error)
# Without this, the next acquisition would re-init an already-initialised
# lock (UB) and leak the previous signal fds.
# Required: next acquisition would otherwise re-init a live lock (UB).
let deinitRes = ctx.deinitContextResources()
pool.releaseSlot(ctx)
deinitRes.isOkOr:
return err("destroyFFIContext(pool): " & $error)
return ok()
ok()
proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool =
## Returns true only if ctx points to one of the pool's slots that is
## currently in use. Rejects nil, offset-invalid, and dangling pointers
## at the API boundary, preventing use-after-free dereferences.
## Rejects nil / offset-invalid / dangling pointers at the API boundary.
if ctx.isNil():
return false
for i in 0 ..< MaxFFIContexts:
if cast[pointer](pool.slots[i].addr) == ctx:
return pool.inUse[i].load()
return false
false

View File

@ -1,15 +1,6 @@
## Event registry, bounded SPSC event queue, and dispatch templates for
## FFI library-initiated events. Listeners receive only the event name
## they subscribed to. Queue payloads travel via `c_malloc` so transfer
## across Nim heaps is safe under both `--mm:orc` and `--mm:refc`.
##
## Dispatch templates enqueue and return on the FFI thread; a dedicated
## event thread (owned by `FFIContext`) drains the queue and invokes
## listeners, so a slow handler can never block the FFI event loop. On
## queue overflow the templates log, set a sticky stuck flag, and wake
## the event thread — which fires the global "not responding"
## notification from its own loop (firing it from the FFI thread would
## risk deadlocking against a listener back-pressuring under `reg.lock`).
## Per-context event registry + bounded SPSC queue. FFI thread enqueues,
## event thread drains; payloads travel via `c_malloc` so they survive
## pool-slot reuse across thread heaps.
{.pragma: callback, cdecl, raises: [], gcsafe.}
@ -20,9 +11,7 @@ import ./ffi_types, ./cbor_serial, ./alloc
type EventEnvelope*[T] = object
## Standard wire shape for CBOR-encoded FFI events:
## { eventType: tstr, payload: <T> }
## Pair with `dispatchFFIEventCbor` (or call `cborEncode` directly).
## CBOR wire shape: { eventType: tstr, payload: <T> }.
eventType*: string
payload*: T
@ -34,33 +23,22 @@ type
userData*: pointer
FFIEventRegistry* = object
## Per-context multi-listener registry. `lock` guards every mutation;
## readers (dispatch path) acquire it only long enough to copy out the
## listener slice for the event being dispatched.
lock*: Lock
nextId*: uint64 ## Monotonic id source. 0 is reserved as "invalid"; ids start at 1.
nextId*: uint64 # 0 is reserved as "invalid"; ids start at 1.
byEvent*: Table[string, seq[FFIEventListener]]
proc initEventRegistry*(reg: var FFIEventRegistry) =
## Must be called exactly once on the owning thread before the registry
## is shared. The embedded `Lock` wraps a platform primitive that cannot
## be safely double-initialised, so concurrent callers would hit UB at
## the OS layer — the lock itself can't defend against its own init.
## Must run once on the owning thread before sharing — `initLock` on a
## live primitive is UB at the OS layer.
reg.lock.initLock()
reg.nextId = 0'u64
reg.byEvent = initTable[string, seq[FFIEventListener]]()
proc deinitEventRegistry*(reg: var FFIEventRegistry) =
## Mirror of `initEventRegistry`: must be called exactly once, by the
## same thread that owns the registry, after all other threads have
## stopped using it. `deinitLock` on a platform primitive that any
## thread might still be holding or about to acquire is UB at the OS
## layer.
##
## Resets the GC-managed fields to default so `FFIContextPool`'s
## slot reuse on a *different* thread doesn't trigger Nim's hidden
## assignment destructor against this thread's heap allocations.
## Mirror of `initEventRegistry`; same single-thread constraint.
## Resets GC fields so pool-slot reuse on another thread doesn't fire
## Nim's hidden assignment dtor against this thread's heap.
reg.lock.deinitLock()
reg.byEvent = default(Table[string, seq[FFIEventListener]])
reg.nextId = 0'u64
@ -71,11 +49,7 @@ proc addEventListener*(
callback: FFICallBack,
userData: pointer,
): uint64 {.raises: [].} =
## Registers `callback` for `eventName` and returns the listener's stable
## id (always non-zero on success). A listener only receives events
## dispatched under its own `eventName` — subscribe to each event
## separately. Returns 0 if `callback` is nil — the only documented
## failure mode.
## Returns the listener id (>0), or 0 if `callback` is nil.
if callback.isNil():
return 0
@ -90,10 +64,8 @@ proc addEventListener*(
assigned
proc removeEventListener*(reg: var FFIEventRegistry, id: uint64): bool {.raises: [].} =
## Removes the listener with `id`. Returns true on success, false if no
## listener with that id exists. Safe to call from inside a dispatch:
## the in-flight snapshot still delivers exactly once to the listener
## being removed.
## Safe to call from inside a dispatch — the in-flight snapshot still
## delivers exactly once to the removed listener.
if id == 0'u64:
return false
@ -117,42 +89,35 @@ proc removeEventListener*(reg: var FFIEventRegistry, id: uint64): bool {.raises:
removed
proc removeAllEventListeners*(reg: var FFIEventRegistry) {.raises: [].} =
## Drops every registered listener. Does not reset the listener-id
## counter — subsequent `addEventListener` calls still return strictly
## increasing ids.
## Does not reset the id counter.
withLock reg.lock:
reg.byEvent.clear()
proc snapshotListeners*(
reg: var FFIEventRegistry, eventName: string
): seq[FFIEventListener] {.raises: [].} =
## Returns a copy of the listener slice for `eventName`. The copy is what
## makes re-entrant add/remove from inside a handler deadlock-free:
## dispatch holds the lock only for the duration of the copy, then
## iterates the copy outside the lock.
## Lock held only across the copy — keeps re-entrant add/remove
## from a handler deadlock-free.
var listeners: seq[FFIEventListener] = @[]
withLock reg.lock:
# `getOrDefault` avoids the raising `[]` path; returns empty when absent.
for l in reg.byEvent.getOrDefault(eventName):
listeners.add(l)
listeners
const EventQueueCapacity* = 1024
## ~24 KiB per context. Sustained backlog at this depth means a
## listener is wedged — what the stuck flag exists to surface.
# Sustained backlog at this depth means a listener is wedged.
type
QueuedEvent* = object
## All fields are raw `c_malloc` pointers so the buffer survives
## pool-slot reuse across thread heaps without an assignment dtor.
# Raw `c_malloc` pointers so the buffer survives pool-slot reuse
# across thread heaps without an assignment dtor.
name*: cstring
data*: ptr UncheckedArray[byte]
dataLen*: int
EventQueue* = object
## SPSC ring: FFI thread enqueues, event thread dequeues. Plain lock
## (no atomic indices) — operations are short and uncontended.
# SPSC ring; plain lock since ops are short and uncontended.
lock*: Lock
head*: int
tail*: int
@ -160,7 +125,6 @@ type
buf*: array[EventQueueCapacity, QueuedEvent]
proc initEventQueue*(q: var EventQueue) {.raises: [].} =
## Same single-owning-thread constraint as `initEventRegistry`.
q.lock.initLock()
q.head = 0
q.tail = 0
@ -177,7 +141,7 @@ proc freeEventBuffers*(
c_free(data)
proc deinitEventQueue*(q: var EventQueue) {.raises: [].} =
## Both producer and consumer must have stopped before calling.
## Both producer and consumer must have stopped.
for i in 0 ..< EventQueueCapacity:
freeEventBuffers(q.buf[i].name, q.buf[i].data)
q.buf[i] = QueuedEvent(name: nil, data: nil, dataLen: 0)
@ -189,8 +153,7 @@ proc deinitEventQueue*(q: var EventQueue) {.raises: [].} =
proc tryEnqueueEvent*(
q: var EventQueue, name: cstring, data: ptr UncheckedArray[byte], dataLen: int
): bool {.raises: [], gcsafe.} =
## Both `name` and `data` must be `c_malloc`'d; on success the queue
## takes ownership. On false the caller still owns and must free them.
## On true the queue owns `name`/`data`; on false the caller still does.
withLock q.lock:
if q.count >= EventQueueCapacity:
return false
@ -200,7 +163,7 @@ proc tryEnqueueEvent*(
true
proc tryDequeueEvent*(q: var EventQueue): Option[QueuedEvent] {.raises: [], gcsafe.} =
## Transfers buffer ownership to the caller, who must `c_free` both.
## Caller takes ownership and must `c_free` both buffers.
withLock q.lock:
if q.count == 0:
return none(QueuedEvent)
@ -218,7 +181,6 @@ proc eventQueueLen*(q: var EventQueue): int {.raises: [], gcsafe.} =
proc notifyListenersOk*(
listeners: seq[FFIEventListener], data: pointer, dataLen: int
) =
## Fans out a successful payload to every listener in the snapshot.
let dataPtr =
if dataLen > 0: cast[ptr cchar](data)
else: nil
@ -228,7 +190,6 @@ proc notifyListenersOk*(
)
proc notifyListenersErr*(listeners: seq[FFIEventListener], msg: string) =
## Fans out an error message to every listener in the snapshot.
let dataPtr =
if msg.len > 0: cast[ptr cchar](unsafeAddr msg[0])
else: nil
@ -238,22 +199,17 @@ proc notifyListenersErr*(listeners: seq[FFIEventListener], msg: string) =
)
var ffiCurrentEventRegistry* {.threadvar.}: ptr FFIEventRegistry
## Kept for tests that drive the registry directly. Dispatch no longer
## reads it — invocation has moved to the event thread.
# Kept for tests that drive the registry directly.
var ffiCurrentEventQueue* {.threadvar.}: ptr EventQueue
## Installed by the FFI thread so dispatch templates enqueue without
## threading a `ctx` parameter through every call site.
# Installed by the FFI thread so dispatch templates need no `ctx`.
var ffiCurrentEventQueueStuck* {.threadvar.}: ptr Atomic[bool]
## Sticky overflow flag on the owning `FFIContext`. Set by dispatch
## templates when enqueue fails; read by the FFI request entry point
## to reject further calls.
# Sticky overflow flag; FFI request entry point reads it to reject.
var ffiCurrentNotifyEventEnqueued* {.threadvar.}: proc() {.gcsafe, raises: [].}
## Hook (not a queue field) so this module doesn't depend on chronos's
## ThreadSignalPtr. Nil-safe — tests that drive the queue directly leave
## it unset and the event thread picks up enqueued events on the next tick.
# Hook so this module doesn't depend on chronos's ThreadSignalPtr.
# Nil-safe; tick-driven tests leave it unset.
template enqueueOrMarkStuck(
eventName: string,
@ -261,34 +217,31 @@ template enqueueOrMarkStuck(
dataPtr: ptr UncheckedArray[byte],
dataLen: int,
) =
## Common tail for both dispatch templates. Takes ownership of `namePtr`
## and `dataPtr` (both `c_malloc`'d). On queue-full, frees the buffers,
## sets the sticky stuck flag, and wakes the event thread — which fires
## onNotResponding from its loop (firing it here would risk deadlocking
## against a listener back-pressuring under `reg.lock`).
let q = ffiCurrentEventQueue
if q.isNil():
chronicles.error "event queue not set on this thread", event = eventName
freeEventBuffers(namePtr, dataPtr)
elif not q[].tryEnqueueEvent(namePtr, dataPtr, dataLen):
chronicles.error "event queue full; library marked stuck",
event = eventName, capacity = EventQueueCapacity
freeEventBuffers(namePtr, dataPtr)
if not ffiCurrentEventQueueStuck.isNil():
ffiCurrentEventQueueStuck[].store(true)
if not ffiCurrentNotifyEventEnqueued.isNil():
ffiCurrentNotifyEventEnqueued()
else:
## Takes ownership of `namePtr`/`dataPtr`. On queue-full sets the sticky
## stuck flag and wakes the event thread (firing onNotResponding here
## would risk deadlock against a back-pressuring listener).
block enqueueBlock:
let q = ffiCurrentEventQueue
if q.isNil():
chronicles.error "event queue not set on this thread", event = eventName
freeEventBuffers(namePtr, dataPtr)
break enqueueBlock
if not q[].tryEnqueueEvent(namePtr, dataPtr, dataLen):
chronicles.error "event queue full; library marked stuck",
event = eventName, capacity = EventQueueCapacity
freeEventBuffers(namePtr, dataPtr)
if not ffiCurrentEventQueueStuck.isNil():
ffiCurrentEventQueueStuck[].store(true)
if not ffiCurrentNotifyEventEnqueued.isNil():
ffiCurrentNotifyEventEnqueued()
break enqueueBlock
if not ffiCurrentNotifyEventEnqueued.isNil():
ffiCurrentNotifyEventEnqueued()
template dispatchFFIEvent*(eventName: string, body: untyped) =
## Dispatches an FFI event to every listener subscribed to `eventName`.
## `body` must yield a `string` or `seq[byte]`.
##
## Runs on the FFI thread: encodes the body into a fresh `c_malloc`
## buffer and enqueues it. Listener invocation happens later on the
## dedicated event thread, so user code can never block the FFI loop.
## `body` must yield `string` or `seq[byte]`. Runs on the FFI thread —
## encodes into a `c_malloc` buffer and enqueues; the event thread
## fans out to listeners.
block:
let evtName: string = eventName
let bodyVal = body
@ -301,12 +254,9 @@ template dispatchFFIEvent*(eventName: string, body: untyped) =
enqueueOrMarkStuck(evtName, namePtr, dataPtr, dataLen)
template dispatchFFIEventCbor*(eventName: string, eventPayload: typed) =
## Typed CBOR variant of `dispatchFFIEvent`. Wraps `eventPayload` in an
## `EventEnvelope`, CBOR-encodes it into a `c_malloc` buffer once, and
## queues it for the event thread to fan out to listeners.
##
## NB: parameter is `eventPayload`, not `payload` — Nim's template
## substitution would otherwise also rewrite the `payload:` field inside
## Typed CBOR variant of `dispatchFFIEvent`.
## Parameter is `eventPayload`, not `payload` — Nim's template
## substitution would otherwise rewrite the `payload:` field inside
## `EventEnvelope`.
block:
let evtName: string = eventName

View File

@ -1,11 +1,5 @@
## Tests for the CBOR-style FFI event dispatch path:
## - `dispatchFFIEvent` accepts both `string` and `seq[byte]` bodies
## - `dispatchFFIEventCbor` wraps a typed payload in `EventEnvelope[T]`,
## CBOR-encodes it, and dispatches via the event callback
##
## Tests run end-to-end against a real FFI thread (via FFIContextPool +
## sendRequestToFFIThread) so we exercise the threadvar-backed
## ffiCurrentEventRegistry wiring, not just the templates in isolation.
## End-to-end tests for `dispatchFFIEvent` / `dispatchFFIEventCbor`,
## driven through a real `FFIContext` so the threadvar wiring is exercised.
import std/[locks, os]
import unittest2
@ -14,16 +8,10 @@ import ffi
type TestEvtLib = object
## Event payload type (would be `{.ffi.}` in production so the codec gen
## emits a matching struct on the foreign side; the test only needs CBOR
## round-trip, which `cborEncode`/`cborDecode` provide via cbor_serial's
## generic overloads).
# Payload type for the CBOR dispatch tests: the tests decode the delivered
# bytes as `EventEnvelope[MessageSentBody]` and check both string fields.
type MessageSentBody* {.ffi.} = object
requestId*: string
messageHash*: string
## Same callback-state helper as test_ffi_context.nim, duplicated here so
## this file stays a self-contained test binary.
type CallbackData = object
lock: Lock
cond: Cond
@ -40,6 +28,13 @@ proc deinitCallbackData(d: var CallbackData) =
d.cond.deinitCond()
d.lock.deinitLock()
template setupCallbackData(name: untyped) =
  ## Introduces a `CallbackData` variable called `name`, initialises it with
  ## `initCallbackData`, and registers `deinitCallbackData` through `defer`
  ## so teardown happens when the enclosing scope exits.
  var name: CallbackData
  initCallbackData(name)
  defer: deinitCallbackData(name)
proc captureCb(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
@ -60,15 +55,27 @@ proc waitCallback(d: var CallbackData) =
wait(d.cond, d.lock)
release(d.lock)
proc resetCalled(d: var CallbackData) =
  ## Clears `d.called` while holding `d.lock`, so the next wait on this
  ## `CallbackData` starts from an un-signalled state.
  withLock d.lock:
    d.called = false
proc callbackBytes(d: var CallbackData): seq[byte] =
  ## Copies the first `d.msgLen` bytes of `d.msg` into a freshly allocated
  ## `seq[byte]`. When `d.msgLen` is zero the copy is skipped and the
  ## returned seq is empty.
  let n = d.msgLen
  result = newSeq[byte](n)
  if n > 0:
    copyMem(addr result[0], addr d.msg[0], n)
## A request that dispatches a typed CBOR event from inside the FFI
## thread and then returns ok — so the response callback can be used to
## synchronize the test.
template withPool(ctxIdent: untyped, body: untyped) =
  ## Creates an `FFIContextPool[TestEvtLib]`, binds a new context to
  ## `ctxIdent`, runs `body`, and destroys the context through `defer`.
  ## If context creation fails, a failing `check` is recorded and the
  ## enclosing routine returns before `body` runs.
  var ctxPool: FFIContextPool[TestEvtLib]
  let ctxIdent = ctxPool.createFFIContext().valueOr:
    check false
    return
  defer: discard ctxPool.destroyFFIContext(ctxIdent)
  body
registerReqFFI(EmitCborEventRequest, lib: ptr TestEvtLib):
proc(): Future[Result[string, string]] {.async.} =
dispatchFFIEventCbor(
@ -77,19 +84,13 @@ registerReqFFI(EmitCborEventRequest, lib: ptr TestEvtLib):
)
return ok("emitted")
## A request that uses the lower-level `dispatchFFIEvent` with a raw
## `seq[byte]` body — the path that previously rejected non-string bodies.
# Test request: the handler dispatches a "raw_bytes" event whose body is the
# three-byte `seq[byte]` below, then returns ok("emitted").
# NOTE(review): `registerReqFFI` is defined outside this view; presumably it
# generates the `EmitRawBytesEventRequest` request type — confirm.
registerReqFFI(EmitRawBytesEventRequest, lib: ptr TestEvtLib):
proc(): Future[Result[string, string]] {.async.} =
dispatchFFIEvent("raw_bytes"):
@[byte 0x01, 0x02, 0x03]
return ok("emitted")
## Setter-thread worker for the registry race regression test. Each
## iteration adds then immediately removes a listener for the dispatched
## event so a TSan-instrumented build can confirm `FFIEventRegistry.lock`
## serialises the cross-thread mutation against dispatch-time
## `snapshotListeners` reads from the FFI thread.
# Add/remove worker for the registry-race regression test.
type SetterArgs = tuple
ctx: ptr FFIContext[TestEvtLib]
stop: ptr Atomic[bool]
@ -104,153 +105,88 @@ proc setterThreadBody(args: SetterArgs) {.thread.} =
suite "dispatchFFIEventCbor":
test "delivers EventEnvelope-shaped CBOR payload to event callback":
# CallbackData defers declared first so they run LAST (LIFO),
# AFTER pool.destroyFFIContext joins the event thread. Otherwise
# TSan flags `captureCb` accessing an already-destroyed mutex
# whose memory got reused by the next test's stack frame.
var evt: CallbackData
initCallbackData(evt)
defer:
deinitCallbackData(evt)
# CallbackData defers declared first run last (LIFO), AFTER pool destroy
# joins the event thread — otherwise TSan flags captureCb on a destroyed mutex.
setupCallbackData(evt)
setupCallbackData(rsp)
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
# Subscribe to the specific event the request below dispatches.
discard addEventListener(
ctx[].eventRegistry, "message_sent", captureCb, addr evt
)
check sendRequestToFFIThread(
ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
waitCallback(rsp)
waitCallback(evt)
check evt.retCode == RET_OK
let decoded = cborDecode(callbackBytes(evt), EventEnvelope[MessageSentBody])
check decoded.isOk()
check decoded.value.eventType == "message_sent"
check decoded.value.payload.requestId == "req-1"
check decoded.value.payload.messageHash == "0xdeadbeef"
suite "dispatchFFIEvent with seq[byte]":
test "accepts a raw seq[byte] body":
var evt: CallbackData
initCallbackData(evt)
defer:
deinitCallbackData(evt)
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
discard addEventListener(
ctx[].eventRegistry, "raw_bytes", captureCb, addr evt
)
check sendRequestToFFIThread(
ctx, EmitRawBytesEventRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
waitCallback(rsp)
waitCallback(evt)
check evt.retCode == RET_OK
check callbackBytes(evt) == @[byte 0x01, 0x02, 0x03]
when not defined(gcRefc):
## Skipped under `--mm:refc`: each setter thread grows / shrinks the
## per-event listener `seq[FFIEventListener]` via `addEventListener`,
## and refc's per-thread GC heap ownership makes cross-thread seq
## buffer reallocation unsafe even when the surrounding lock is held.
## ORC + the FFI thread + tsan (the combo this test was written for)
## does not have that limitation.
suite "FFIEventRegistry concurrent access":
## Regression for PR #39 review comments r3288220895 / r3289285387.
## Run under tsan to actually validate the fix:
## NIM_FFI_SAN=tsan NIM_FFI_MM=orc nimble test_sanitized
test "concurrent add/remove writers vs dispatch reads stay race-free":
var evt: CallbackData
initCallbackData(evt)
defer:
deinitCallbackData(evt)
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
# Seed an initial callback so the FFI thread's first dispatch has a
# target. The setter threads will then repeatedly re-install the same
# (callback, userData) pair — what matters is the cross-thread write
# racing the FFI thread's read, not which pair "wins".
withPool(ctx):
discard addEventListener(
ctx[].eventRegistry, "message_sent", captureCb, addr evt
)
const NumSetterThreads = 4
const NumDispatchIters = 200
check sendRequestToFFIThread(
ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
waitCallback(rsp)
waitCallback(evt)
var stop: Atomic[bool]
stop.store(false)
var setters: array[NumSetterThreads, Thread[SetterArgs]]
for i in 0 ..< NumSetterThreads:
createThread(setters[i], setterThreadBody, (ctx, addr stop, addr evt))
check evt.retCode == RET_OK
let decoded = cborDecode(callbackBytes(evt), EventEnvelope[MessageSentBody])
check decoded.isOk()
check decoded.value.eventType == "message_sent"
check decoded.value.payload.requestId == "req-1"
check decoded.value.payload.messageHash == "0xdeadbeef"
for _ in 0 ..< NumDispatchIters:
# Reset rsp so each iteration's `waitCallback` blocks until the
# FFI thread fires the response — keeps the loop synchronous.
acquire(rsp.lock)
rsp.called = false
release(rsp.lock)
suite "dispatchFFIEvent with seq[byte]":
test "accepts a raw seq[byte] body":
setupCallbackData(evt)
setupCallbackData(rsp)
check sendRequestToFFIThread(
ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
withPool(ctx):
discard addEventListener(
ctx[].eventRegistry, "raw_bytes", captureCb, addr evt
)
check sendRequestToFFIThread(
ctx, EmitRawBytesEventRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
waitCallback(rsp)
waitCallback(evt)
check evt.retCode == RET_OK
check callbackBytes(evt) == @[byte 0x01, 0x02, 0x03]
when not defined(gcRefc):
## Skipped under refc: setter threads grow/shrink the per-event listener
## seq, and refc's per-thread GC heap makes that unsafe cross-thread.
suite "FFIEventRegistry concurrent access":
## Regression for PR #39 (r3288220895 / r3289285387).
## Validate with: NIM_FFI_SAN=tsan NIM_FFI_MM=orc nimble test_sanitized
test "concurrent add/remove writers vs dispatch reads stay race-free":
setupCallbackData(evt)
setupCallbackData(rsp)
withPool(ctx):
# Seed an initial listener so the first dispatch has a target.
discard addEventListener(
ctx[].eventRegistry, "message_sent", captureCb, addr evt
)
.isOk()
waitCallback(rsp)
stop.store(true)
for i in 0 ..< NumSetterThreads:
joinThread(setters[i])
const NumSetterThreads = 4
const NumDispatchIters = 200
# `evt` got hit by every dispatch above; just confirm at least one
# actually landed so a silently-broken dispatch loop is caught.
check evt.called
var stop: Atomic[bool]
stop.store(false)
var setters: array[NumSetterThreads, Thread[SetterArgs]]
for i in 0 ..< NumSetterThreads:
createThread(setters[i], setterThreadBody, (ctx, addr stop, addr evt))
## A foreign-thread mutation must not be able to invalidate a listener's
## `userData` while an in-flight dispatch is mid-invocation. Dispatch
## now lives on the dedicated event thread, which still holds
## `reg.lock` across the listener fan-out — so foreign
## `removeEventListener` blocks until dispatch returns. This test
## drives a real `FFIContext` (FFI thread enqueues, event thread
## dispatches) and asserts the same contract end-to-end.
for _ in 0 ..< NumDispatchIters:
resetCalled(rsp)
check sendRequestToFFIThread(
ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
waitCallback(rsp)
stop.store(true)
for i in 0 ..< NumSetterThreads:
joinThread(setters[i])
check evt.called
type SlowState = object
entered: Atomic[bool]
@ -259,8 +195,6 @@ type SlowState = object
proc slowEventCb(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
## Signal entry, sleep long enough that the test thread can race in
## with removeEventListener and observe the block, then signal exit.
let st = cast[ptr SlowState](userData)
st[].entered.store(true)
os.sleep(60)
@ -268,47 +202,32 @@ proc slowEventCb(
suite "registry lock held during invocation":
test "removeEventListener blocks until in-flight dispatch finishes":
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
setupCallbackData(rsp)
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
withPool(ctx):
var st: SlowState
st.entered.store(false)
st.exited.store(false)
var st: SlowState
st.entered.store(false)
st.exited.store(false)
let id = addEventListener(
ctx[].eventRegistry, "message_sent", slowEventCb, addr st
)
check id != 0'u64
# Register the slow callback under the same event the dispatch fires.
let id = addEventListener(
ctx[].eventRegistry, "message_sent", slowEventCb, addr st
)
check id != 0'u64
check sendRequestToFFIThread(
ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
waitCallback(rsp)
# Fire an event from the FFI thread; the request-response callback
# only confirms the request completed — the slow listener is invoked
# asynchronously by the event thread.
for _ in 0 ..< 500:
if st.entered.load():
break
os.sleep(1)
check st.entered.load()
check not st.exited.load()
check sendRequestToFFIThread(
ctx, EmitCborEventRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
waitCallback(rsp)
# Wait until the event thread is inside slowEventCb.
for _ in 0 ..< 500:
if st.entered.load():
break
os.sleep(1)
check st.entered.load()
check not st.exited.load()
# Lock-during-invocation contract: remove blocks until dispatch
# finishes; by the time it returns, slowEventCb has set exited=true.
check removeEventListener(ctx[].eventRegistry, id)
check st.exited.load()
# Lock-during-invocation: remove blocks until dispatch finishes,
# by which time slowEventCb has set exited=true.
check removeEventListener(ctx[].eventRegistry, id)
check st.exited.load()

View File

@ -1,23 +1,12 @@
## Unit tests for the `FFIEventRegistry` primitive — the multi-listener
## data structure that will back `<lib>_add_event_listener` /
## `<lib>_remove_event_listener` once the dispatch wiring lands.
##
## These tests exercise the registry directly (no FFI thread, no dispatch
## templates) so they stay fast and pin down the registry's mutation and
## snapshot semantics in isolation.
## Unit tests for the `FFIEventRegistry` primitive (no FFI thread, no dispatch).
import std/locks
import unittest2
import ffi
# Tiny helpers — a thread-safe sink each listener writes into so we can
# assert which callbacks would fire and in what order once dispatch lands.
# Today only `tagCb`'s presence is exercised; the recorder is also used to
# make sure listener bookkeeping doesn't accidentally invoke callbacks.
type Recorder = object
lock: Lock
hits: seq[string] # tag captured from `userData` per invocation
hits: seq[string]
retCodes: seq[cint]
payloads: seq[string]
@ -34,7 +23,6 @@ proc record(r: var Recorder, tag: string, retCode: cint, payload: string) =
r.payloads.add(payload)
release(r.lock)
# Each listener is identified by a `Tag` passed through `userData`.
type Tag = object
name: string
rec: ptr Recorder
@ -48,17 +36,22 @@ proc tagCb(
copyMem(addr payload[0], msg, int(len))
record(t[].rec[], t[].name, retCode, payload)
template setupRegistry(regIdent: untyped) =
  ## Introduces an `FFIEventRegistry` variable called `regIdent`,
  ## initialises it, and registers `deinitEventRegistry` through `defer`.
  var regIdent: FFIEventRegistry
  initEventRegistry(regIdent)
  defer: deinitEventRegistry(regIdent)
template setupRecorder(recIdent: untyped) =
  ## Introduces a `Recorder` variable called `recIdent`, initialises it,
  ## and registers `deinitRecorder` through `defer`.
  var recIdent: Recorder
  initRecorder(recIdent)
  defer: deinitRecorder(recIdent)
suite "FFIEventRegistry mutation":
test "addEventListener assigns monotonically increasing non-zero ids":
var reg: FFIEventRegistry
initEventRegistry(reg)
defer:
deinitEventRegistry(reg)
var rec: Recorder
initRecorder(rec)
defer:
deinitRecorder(rec)
setupRegistry(reg)
setupRecorder(rec)
var t = Tag(name: "a", rec: addr rec)
let id1 = addEventListener(reg, "evt", tagCb, addr t)
@ -69,30 +62,18 @@ suite "FFIEventRegistry mutation":
check id3 == 3'u64
test "addEventListener returns 0 when callback is nil":
var reg: FFIEventRegistry
initEventRegistry(reg)
defer:
deinitEventRegistry(reg)
setupRegistry(reg)
let id = addEventListener(reg, "evt", nil, nil)
check id == 0'u64
test "removeEventListener returns false for unknown ids":
var reg: FFIEventRegistry
initEventRegistry(reg)
defer:
deinitEventRegistry(reg)
setupRegistry(reg)
check not removeEventListener(reg, 0'u64)
check not removeEventListener(reg, 99'u64)
test "removeEventListener removes listeners across distinct events":
var reg: FFIEventRegistry
initEventRegistry(reg)
defer:
deinitEventRegistry(reg)
var rec: Recorder
initRecorder(rec)
defer:
deinitRecorder(rec)
setupRegistry(reg)
setupRecorder(rec)
var t = Tag(name: "a", rec: addr rec)
let id1 = addEventListener(reg, "evt", tagCb, addr t)
@ -100,7 +81,6 @@ suite "FFIEventRegistry mutation":
check removeEventListener(reg, id1)
check removeEventListener(reg, id2)
# Second remove of the same id is a no-op.
check not removeEventListener(reg, id1)
check snapshotListeners(reg, "evt").len == 0
@ -108,14 +88,8 @@ suite "FFIEventRegistry mutation":
suite "FFIEventRegistry snapshot semantics":
test "snapshot returns only the listeners for the requested event":
var reg: FFIEventRegistry
initEventRegistry(reg)
defer:
deinitEventRegistry(reg)
var rec: Recorder
initRecorder(rec)
defer:
deinitRecorder(rec)
setupRegistry(reg)
setupRecorder(rec)
var a = Tag(name: "a", rec: addr rec)
var b = Tag(name: "b", rec: addr rec)
var c = Tag(name: "c", rec: addr rec)
@ -124,32 +98,19 @@ suite "FFIEventRegistry snapshot semantics":
discard addEventListener(reg, "evt", tagCb, addr b)
discard addEventListener(reg, "other", tagCb, addr c)
let snapEvt = snapshotListeners(reg, "evt")
check snapEvt.len == 2 # both listeners for "evt"
let snapOther = snapshotListeners(reg, "other")
check snapOther.len == 1 # only the listener for "other"
let snapUnknown = snapshotListeners(reg, "no-subscriber")
check snapUnknown.len == 0 # no listener for this event
check snapshotListeners(reg, "evt").len == 2
check snapshotListeners(reg, "other").len == 1
check snapshotListeners(reg, "no-subscriber").len == 0
test "snapshot is a copy: post-snapshot mutation does not affect it":
var reg: FFIEventRegistry
initEventRegistry(reg)
defer:
deinitEventRegistry(reg)
var rec: Recorder
initRecorder(rec)
defer:
deinitRecorder(rec)
setupRegistry(reg)
setupRecorder(rec)
var t = Tag(name: "a", rec: addr rec)
let id1 = addEventListener(reg, "evt", tagCb, addr t)
let snap = snapshotListeners(reg, "evt")
check snap.len == 1
# Mutating the registry after the snapshot must not retroactively
# shrink or grow the snapshot we already captured.
check removeEventListener(reg, id1)
discard addEventListener(reg, "evt", tagCb, addr t)
check snap.len == 1
@ -157,14 +118,8 @@ suite "FFIEventRegistry snapshot semantics":
suite "removeAllEventListeners":
test "drops every registered listener":
var reg: FFIEventRegistry
initEventRegistry(reg)
defer:
deinitEventRegistry(reg)
var rec: Recorder
initRecorder(rec)
defer:
deinitRecorder(rec)
setupRegistry(reg)
setupRecorder(rec)
var a = Tag(name: "a", rec: addr rec)
var b = Tag(name: "b", rec: addr rec)

View File

@ -1,7 +1,4 @@
## Integration tests for the dedicated event thread introduced by
## issue #6. Each test stands up a real `FFIContext` (via
## `FFIContextPool`) and exercises the FFI thread → bounded queue →
## event thread → listener pipeline end to end.
## Integration tests for the dedicated event thread (issue #6).
import std/[atomics, locks, os, strutils]
import unittest2
@ -13,9 +10,6 @@ type TestEvtLib = object
# Payload carried by the "latch" event that `EmitLatchEvent` dispatches;
# `iter` is the value passed in with the request.
type LatchPayload* {.ffi.} = object
iter*: int
## Captured-state callback identical to the helpers in
## `test_event_dispatch.nim`, repeated here so this file stays a
## self-contained test binary.
type CallbackData = object
lock: Lock
cond: Cond
@ -32,6 +26,13 @@ proc deinitCallbackData(d: var CallbackData) =
d.cond.deinitCond()
d.lock.deinitLock()
template setupCallbackData(name: untyped) =
  ## Introduces a `CallbackData` variable called `name`, initialises it with
  ## `initCallbackData`, and registers `deinitCallbackData` through `defer`
  ## so teardown happens when the enclosing scope exits.
  var name: CallbackData
  initCallbackData(name)
  defer: deinitCallbackData(name)
proc captureCb(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
@ -52,11 +53,13 @@ proc waitCallback(d: var CallbackData) =
wait(d.cond, d.lock)
release(d.lock)
proc resetCalled(d: var CallbackData) =
  ## Clears `d.called` while holding `d.lock`, so the next wait on this
  ## `CallbackData` starts from an un-signalled state.
  withLock d.lock:
    d.called = false
proc waitCallbackTimeout(d: var CallbackData, timeoutMs: int): bool =
## Polling variant for tests where the callback may legitimately
## never fire — returns true if observed `called` within the budget,
## false on timeout. Polls in 10 ms increments under `d.lock` so the
## load is synchronised with the `captureCb` writer.
## Polls under `d.lock` so the load syncs with the `captureCb` writer.
let deadline = Moment.now() + timeoutMs.milliseconds
while true:
acquire(d.lock)
@ -68,26 +71,26 @@ proc waitCallbackTimeout(d: var CallbackData, timeoutMs: int): bool =
return false
os.sleep(10)
# ---------------------------------------------------------------------------
# Request helpers
# ---------------------------------------------------------------------------
template withPool(ctxIdent: untyped, body: untyped) =
  ## Creates an `FFIContextPool[TestEvtLib]`, binds a new context to
  ## `ctxIdent`, runs `body`, and destroys the context through `defer`.
  ## If context creation fails, a failing `check` is recorded and the
  ## enclosing routine returns before `body` runs.
  var ctxPool: FFIContextPool[TestEvtLib]
  let ctxIdent = ctxPool.createFFIContext().valueOr:
    check false
    return
  defer: discard ctxPool.destroyFFIContext(ctxIdent)
  body
# Test request: the handler dispatches a CBOR "latch" event carrying
# `LatchPayload(iter: iter)` and then returns ok("emitted").
# NOTE(review): `registerReqFFI` is defined outside this view; presumably it
# generates the `EmitLatchEvent` request type — confirm.
registerReqFFI(EmitLatchEvent, lib: ptr TestEvtLib):
proc(iter: int): Future[Result[string, string]] {.async.} =
dispatchFFIEventCbor("latch", LatchPayload(iter: iter))
return ok("emitted")
## A request whose async body completes immediately — useful for
## probing FFI-thread round-trip latency under load.
# Test request whose handler dispatches no event and just returns ok("pong");
# the round-trip timing test sends it right after an event-emitting request.
registerReqFFI(PingEvent, lib: ptr TestEvtLib):
proc(): Future[Result[string, string]] {.async.} =
return ok("pong")
## A request whose async body blocks the FFI thread synchronously
## (no await) so the heartbeat freezes. Used to exercise the new
## heartbeat-staleness path. Guarded by an Atomic switch so we can
## enable it deterministically per test rather than turning every
## test of this request type into a watchdog probe.
# Atomic switch so the wedge fires deterministically per test.
var gBlockingEnabled: Atomic[bool]
gBlockingEnabled.store(false)
@ -97,18 +100,12 @@ registerReqFFI(BlockingRequest, lib: ptr TestEvtLib):
os.sleep(milliseconds)
return ok("done")
# ---------------------------------------------------------------------------
# Thread-id capture
# ---------------------------------------------------------------------------
var gListenerThreadId: Atomic[int]
gListenerThreadId.store(-1)
proc captureThreadIdCb(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
## Records the OS thread id of the listener invocation, so the test
## can assert it differs from the FFI thread's id.
gListenerThreadId.store(getThreadId())
let d = cast[ptr CallbackData](userData)
acquire(d[].lock)
@ -116,9 +113,6 @@ proc captureThreadIdCb(
signal(d[].cond)
release(d[].lock)
## Returns the FFI thread's id by running a no-op request and reading
## `getThreadId()` from inside the handler. Used to compare against
## the listener's thread id below.
var gFfiThreadId: Atomic[int]
gFfiThreadId.store(-1)
@ -129,158 +123,93 @@ registerReqFFI(CaptureFfiTidRequest, lib: ptr TestEvtLib):
suite "event delivery is asynchronous":
test "listener runs on the event thread, not the FFI thread":
# CallbackData defers come BEFORE the pool-destroy defer so they run
# AFTER it (LIFO): the event thread is joined before any lock the
# event thread might still be holding is torn down — otherwise TSan
# flags `captureCb` accessing an already-destroyed mutex.
var evt: CallbackData
initCallbackData(evt)
defer:
deinitCallbackData(evt)
# CallbackData defers declared first run last (LIFO): pool-destroy joins
# the event thread before any still-held mutex is torn down. TSan otherwise
# flags `captureCb` on a destroyed mutex.
setupCallbackData(evt)
setupCallbackData(rsp)
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
withPool(ctx):
discard addEventListener(
ctx[].eventRegistry, "latch", captureThreadIdCb, addr evt
)
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
check sendRequestToFFIThread(
ctx, CaptureFfiTidRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
waitCallback(rsp)
discard addEventListener(
ctx[].eventRegistry, "latch", captureThreadIdCb, addr evt
)
resetCalled(rsp)
check sendRequestToFFIThread(
ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, 0)
)
.isOk()
waitCallback(rsp)
waitCallback(evt)
# Capture the FFI thread id.
check sendRequestToFFIThread(
ctx, CaptureFfiTidRequest.ffiNewReq(captureCb, addr rsp)
)
.isOk()
waitCallback(rsp)
# Now emit an event from the FFI thread and wait for the listener.
acquire(rsp.lock)
rsp.called = false
release(rsp.lock)
check sendRequestToFFIThread(
ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, 0)
)
.isOk()
waitCallback(rsp)
waitCallback(evt)
let ffiTid = gFfiThreadId.load()
let listenerTid = gListenerThreadId.load()
check ffiTid >= 0
check listenerTid >= 0
check ffiTid != listenerTid
# ---------------------------------------------------------------------------
# Slow listener does not block the FFI thread
# ---------------------------------------------------------------------------
let ffiTid = gFfiThreadId.load()
let listenerTid = gListenerThreadId.load()
check ffiTid >= 0
check listenerTid >= 0
check ffiTid != listenerTid
proc slowSleepCb(
    retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
  ## Listener callback that ignores all of its arguments and simply blocks
  ## the calling thread for 150 ms, standing in for a slow consumer.
  const delayMs = 150
  os.sleep(delayMs)
suite "FFI thread independence":
test "slow listener does not block FFI thread request round-trip":
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
setupCallbackData(rsp)
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
withPool(ctx):
discard addEventListener(
ctx[].eventRegistry, "latch", slowSleepCb, nil
)
discard addEventListener(
ctx[].eventRegistry, "latch", slowSleepCb, nil
)
check sendRequestToFFIThread(
ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, 0)
)
.isOk()
waitCallback(rsp)
resetCalled(rsp)
# Fire an event, then immediately fire a ping request. With dispatch
# off the FFI thread, the ping must complete in well under 150 ms
# even though the prior event is still in flight on the event thread.
check sendRequestToFFIThread(
ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, 0)
)
.isOk()
waitCallback(rsp)
# chronos's `Moment` — std/times exports a `milliseconds` that
# shadows chronos's at this generic-instantiation site.
let started = Moment.now()
check sendRequestToFFIThread(ctx, PingEvent.ffiNewReq(captureCb, addr rsp))
.isOk()
waitCallback(rsp)
let elapsed = Moment.now() - started
acquire(rsp.lock)
rsp.called = false
release(rsp.lock)
# Use chronos's Moment for timing so we don't pull std/times into
# scope — std/times exports a `milliseconds` proc that shadows the
# chronos one used inside the FFI thread body, breaking compilation
# of generic instantiations at this call site.
let started = Moment.now()
check sendRequestToFFIThread(ctx, PingEvent.ffiNewReq(captureCb, addr rsp))
.isOk()
waitCallback(rsp)
let elapsed = Moment.now() - started
check elapsed < 100.milliseconds # ample margin under the 150 ms slow-listener sleep
# ---------------------------------------------------------------------------
# Heartbeat staleness fires onNotResponding
# ---------------------------------------------------------------------------
check elapsed < 100.milliseconds # under the 150 ms slow-listener sleep
when not defined(gcRefc):
## Skipped under `--mm:refc`: this test relies on `os.sleep`ing the
## FFI thread for several seconds inside a synchronous handler.
## refc plus the existing destroy-on-time policies make that
## combination flaky in CI; the orc path is the contract we care
## about here.
## Skipped under refc: sleeping the FFI thread inside a sync handler
## interacts badly with refc + existing destroy-on-time policies.
suite "FFI heartbeat staleness":
test "wedged FFI thread triggers onNotResponding via heartbeat":
# Lock-bearing CallbackData defers are declared FIRST so they run
# LAST (LIFO); pool-destroy runs FIRST and joins the event thread
# before any mutex it might still be holding is destroyed.
var notif: CallbackData
initCallbackData(notif)
defer:
deinitCallbackData(notif)
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
setupCallbackData(notif)
setupCallbackData(rsp)
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
# Disable the wedge before tearing down so destroy isn't blocked
# by the still-sleeping handler.
# Disable wedge first so destroy isn't blocked by the still-sleeping handler.
gBlockingEnabled.store(false)
discard pool.destroyFFIContext(ctx)
# Subscribe to the exact event name `onNotResponding` looks up so
# the captured signal can't be ambiguously satisfied by an unrelated
# event the test happens to dispatch.
discard addEventListener(
ctx[].eventRegistry, NotRespondingEventName, captureCb, addr notif
)
# Wait out the start-delay grace window first so the heartbeat
# check is actually armed.
# Wait out the start-delay so the heartbeat check is armed.
os.sleep(FFIHeartbeatStartDelay.milliseconds.int + 200)
# Wedge the FFI thread for longer than EventThreadTickInterval +
# FFIHeartbeatStaleThreshold so the event thread observes a
# frozen heartbeat across at least one tick boundary.
# Wedge long enough to cross at least one tick boundary.
gBlockingEnabled.store(true)
let wedgeMs =
(EventThreadTickInterval + FFIHeartbeatStaleThreshold).milliseconds.int +
@ -292,16 +221,8 @@ when not defined(gcRefc):
waitCallback(rsp)
gBlockingEnabled.store(false)
# The not_responding event should have been delivered while the
# FFI thread was wedged. captureCb writes `called` under
# `notif.lock`, so wait through the cond rather than reading it
# raw — avoids both the data race and the just-missed-it window.
check waitCallbackTimeout(notif, 1500)
# ---------------------------------------------------------------------------
# Queue overflow sets the stuck flag and rejects further requests
# ---------------------------------------------------------------------------
type BackpressureState = object
enteredLock: Lock
enteredCond: Cond
@ -325,10 +246,8 @@ proc deinitBackpressure(b: var BackpressureState) =
proc backpressureCb(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
## First invocation signals entered and then blocks until released —
## holds the event thread inside `withLock reg.lock`, which back-pressures
## subsequent dispatches and gives us a deterministic way to fill the
## queue.
## First call signals entered then blocks under reg.lock to back-pressure
## subsequent dispatches — gives a deterministic way to fill the queue.
let b = cast[ptr BackpressureState](userData)
if not b[].entered.exchange(true):
acquire(b[].enteredLock)
@ -340,9 +259,6 @@ proc backpressureCb(
wait(b[].releaseCond, b[].releaseLock)
release(b[].releaseLock)
## A request that does N back-to-back dispatches from the FFI thread.
## Used to push enough events at once that the queue saturates while
## the event thread is stuck inside the backpressure listener above.
registerReqFFI(BurstEmit, lib: ptr TestEvtLib):
proc(count: int): Future[Result[string, string]] {.async.} =
for i in 0 ..< count:
@ -351,91 +267,56 @@ registerReqFFI(BurstEmit, lib: ptr TestEvtLib):
suite "queue overflow":
test "overflow sets stuck flag, fires onNotResponding, rejects new requests":
# Lock-bearing state defers come FIRST so they run LAST (LIFO);
# pool destroy joins the event thread before any mutex still
# referenced from a listener is torn down.
var bp: BackpressureState
initBackpressure(bp)
defer:
deinitBackpressure(bp)
var notif: CallbackData
initCallbackData(notif)
defer:
deinitCallbackData(notif)
setupCallbackData(notif)
setupCallbackData(rsp)
setupCallbackData(rejected)
var rsp: CallbackData
initCallbackData(rsp)
defer:
deinitCallbackData(rsp)
withPool(ctx):
discard addEventListener(
ctx[].eventRegistry, "latch", backpressureCb, addr bp
)
discard addEventListener(
ctx[].eventRegistry, NotRespondingEventName, captureCb, addr notif
)
var rejected: CallbackData
initCallbackData(rejected)
defer:
deinitCallbackData(rejected)
# Kick one event so the listener holds reg.lock; subsequent enqueues
# pile up undrained.
check sendRequestToFFIThread(
ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, -1)
)
.isOk()
waitCallback(rsp)
var pool: FFIContextPool[TestEvtLib]
let ctx = pool.createFFIContext().valueOr:
check false
return
defer:
discard pool.destroyFFIContext(ctx)
acquire(bp.enteredLock)
while not bp.entered.load():
wait(bp.enteredCond, bp.enteredLock)
release(bp.enteredLock)
discard addEventListener(
ctx[].eventRegistry, "latch", backpressureCb, addr bp
)
# Subscribe to the exact not_responding event name so the wait below
# can't be falsely satisfied by a "latch" payload from the burst.
discard addEventListener(
ctx[].eventRegistry, NotRespondingEventName, captureCb, addr notif
)
# Burst > capacity in one request; tail enqueues flip the stuck flag.
resetCalled(rsp)
check sendRequestToFFIThread(
ctx, BurstEmit.ffiNewReq(captureCb, addr rsp, EventQueueCapacity + 8)
)
.isOk()
waitCallback(rsp)
# Kick off one event so the event thread enters backpressureCb and
# holds reg.lock. Once that's confirmed, any subsequent enqueues
# pile up in the queue without being drained.
check sendRequestToFFIThread(
ctx, EmitLatchEvent.ffiNewReq(captureCb, addr rsp, -1)
)
.isOk()
waitCallback(rsp)
check ctx.eventQueueStuck.load()
acquire(bp.enteredLock)
while not bp.entered.load():
wait(bp.enteredCond, bp.enteredLock)
release(bp.enteredLock)
let res =
sendRequestToFFIThread(ctx, PingEvent.ffiNewReq(captureCb, addr rejected))
check res.isErr()
check res.error.contains("stuck")
# Now flood the queue. EventQueueCapacity+8 dispatches in one
# FFI-thread request, atomically from the queue's perspective: the
# event thread can't drain anything because it's stuck inside the
# first callback. The last several enqueues must hit the
# queue-full path and flip the stuck flag.
acquire(rsp.lock)
rsp.called = false
release(rsp.lock)
check sendRequestToFFIThread(
ctx, BurstEmit.ffiNewReq(captureCb, addr rsp, EventQueueCapacity + 8)
)
.isOk()
waitCallback(rsp)
# Release backpressure so drain advances and the stuck flag fires
# not_responding.
acquire(bp.releaseLock)
bp.release.store(true)
signal(bp.releaseCond)
release(bp.releaseLock)
# The stuck flag is set as soon as the first overflow happens;
# subsequent sendRequestToFFIThread calls must short-circuit.
check ctx.eventQueueStuck.load()
let res =
sendRequestToFFIThread(ctx, PingEvent.ffiNewReq(captureCb, addr rejected))
check res.isErr()
check res.error.contains("stuck")
# Release backpressure so the event thread can drain, advance past
# the backpressure listener, observe the stuck flag, and fire the
# not_responding notification.
acquire(bp.releaseLock)
bp.release.store(true)
signal(bp.releaseCond)
release(bp.releaseLock)
# The whole point of this test is that overflow surfaces the
# not_responding signal — assert it actually fires within a
# bounded window rather than letting the test silently pass.
check waitCallbackTimeout(notif, 2000)
check waitCallbackTimeout(notif, 2000)