diff --git a/examples/timer/rust_bindings/src/api.rs b/examples/timer/rust_bindings/src/api.rs index 709f9c1..14a47a1 100644 --- a/examples/timer/rust_bindings/src/api.rs +++ b/examples/timer/rust_bindings/src/api.rs @@ -129,12 +129,13 @@ pub struct MyTimerCtx { // SAFETY: The `ptr` field points to an FFIContext owned by the Nim runtime. // Every call through the generated FFI proc goes through -// `sendRequestToFFIThread` on the Nim side, which serialises every request -// behind `ctx.lock` and dispatches handlers on a single FFI thread, so the -// pointer is never accessed concurrently from Rust. The Nim-side reentrancy -// guard (`onFFIThread` threadvar) prevents handlers from re-entering the -// dispatcher and self-deadlocking. These invariants make it sound to mark -// the wrapper as Send + Sync. +// `sendRequestToFFIThread` on the Nim side, which only enqueues the request +// onto a mutex-guarded MPSC queue (sound from any number of threads) and +// wakes the single FFI thread that dispatches every handler. The context is +// thus never mutated non-atomically from the caller's thread. The Nim-side +// reentrancy guard (`onFFIThread` threadvar) prevents handlers from +// re-entering the dispatcher. These invariants make it sound to mark the +// wrapper as Send + Sync. unsafe impl Send for MyTimerCtx {} unsafe impl Sync for MyTimerCtx {} diff --git a/ffi/codegen/rust.nim b/ffi/codegen/rust.nim index aaedd35..282911d 100644 --- a/ffi/codegen/rust.nim +++ b/ffi/codegen/rust.nim @@ -505,19 +505,18 @@ proc generateApiRs*( ) lines.add("// Every call through the generated FFI proc goes through") lines.add( - "// `sendRequestToFFIThread` on the Nim side, which serialises every request" + "// `sendRequestToFFIThread` on the Nim side, which only enqueues the request" + ) + lines.add("// onto a mutex-guarded MPSC queue (sound from any number of threads) and") + lines.add( + "// wakes the single FFI thread that dispatches every handler. The context is" ) lines.add( - "// behind `ctx.lock` and dispatches handlers on a single FFI thread, so the" + "// thus never mutated non-atomically from the caller's thread. The Nim-side" ) - lines.add( - "// pointer is never accessed concurrently from Rust. The Nim-side reentrancy" - ) - lines.add("// guard (`onFFIThread` threadvar) prevents handlers from re-entering the") - lines.add( - "// dispatcher and self-deadlocking. These invariants make it sound to mark" - ) - lines.add("// the wrapper as Send + Sync.") + lines.add("// reentrancy guard (`onFFIThread` threadvar) prevents handlers from") + lines.add("// re-entering the dispatcher. These invariants make it sound to mark the") + lines.add("// wrapper as Send + Sync.") lines.add("unsafe impl Send for $1 {}" % [ctxTypeName]) lines.add("unsafe impl Sync for $1 {}" % [ctxTypeName]) lines.add("") diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index ef1b6f5..422769f 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -7,12 +7,13 @@ {.passc: "-fPIC".} import std/[atomics, locks, options, tables] -import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results +import chronicles, chronos, chronos/threadsync, results import ./ffi_types, ./ffi_events, ./ffi_handles, ./ffi_thread_request, + ./ffi_request_queue, ./logging, ./cbor_serial @@ -22,10 +23,8 @@ type FFIContext*[T] = object myLib*: ptr T # main library object (Waku, LibP2P, SDS, …) ffiThread: Thread[(ptr FFIContext[T])] eventThread: Thread[(ptr FFIContext[T])] - lock: Lock - reqChannel: ChannelSPSCSingle[ptr FFIThreadRequest] - reqSignal: ThreadSignalPtr - reqReceivedSignal: ThreadSignalPtr + reqQueueBank: RequestQueueBank # mutex-guarded MPSC ingress from foreign threads + reqSignal: ThreadSignalPtr # wakes the FFI thread on enqueue stopSignal: ThreadSignalPtr threadExitSignal: ThreadSignalPtr # bounds destroyFFIContext's wait so a blocked loop cannot hang the caller @@ -60,9 +59,10 @@ template closeAndNil(field: untyped) = field = nil proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = - ## Mirror of `initContextResources`. Threads MUST be joined first; - ## fields are nil'd after close so re-init on the same slot is safe. - ctx.lock.deinitLock() + ## Mirror of `initContextResources`. Threads MUST be joined first (FFI thread + ## drained); fields are nil'd after close so re-init on the same slot is safe. + ## `deinitRequestQueue` frees any request raced in after the final drain. + deinitRequestQueue(ctx[].reqQueueBank) deinitEventRegistry(ctx[].eventRegistry) deinitHandleRegistry(ctx[].handles) deinitEventQueue(ctx[].eventQueue) @@ -74,7 +74,6 @@ proc deinitContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = discard else: closeAndNil(ctx.reqSignal) - closeAndNil(ctx.reqReceivedSignal) closeAndNil(ctx.stopSignal) closeAndNil(ctx.threadExitSignal) closeAndNil(ctx.eventQueueSignal) @@ -96,12 +95,11 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = ## the slot (freeShared or pool.releaseSlot). # Nil first so deferred cleanup can't double-close a reused pool slot. ctx.reqSignal = nil - ctx.reqReceivedSignal = nil ctx.stopSignal = nil ctx.threadExitSignal = nil ctx.eventQueueSignal = nil ctx.eventThreadExitSignal = nil - ctx.lock.initLock() + initRequestQueue(ctx[].reqQueueBank) initEventRegistry(ctx[].eventRegistry) initHandleRegistry(ctx[].handles) initEventQueue(ctx[].eventQueue) @@ -116,7 +114,6 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] = error = error newSignalOrErr(ctx.reqSignal, "reqSignal") - newSignalOrErr(ctx.reqReceivedSignal, "reqReceivedSignal") newSignalOrErr(ctx.stopSignal, "stopSignal") newSignalOrErr(ctx.threadExitSignal, "threadExitSignal") newSignalOrErr(ctx.eventQueueSignal, "eventQueueSignal") diff --git a/ffi/ffi_request_queue.nim b/ffi/ffi_request_queue.nim new file mode 100644 index 0000000..b553f33 --- /dev/null +++ b/ffi/ffi_request_queue.nim @@ -0,0 +1,109 @@ +## Sharded, mutex-guarded MPSC ingress for `ptr FFIThreadRequest`: foreign +## threads enqueue without serialising against each other. +## +## Why sharded: one shared queue funnels all producers through a single cache +## line, capping submit throughput. N independent queues (one per producer) +## remove that hotspot — producers contend only when two pick the same queue. +## +## Each queue is an intrusive FIFO under its own `Lock`: race-free under TSAN, and +## the request is its own node (intrusive `next`), so enqueue never allocates nor +## touches a Nim GC heap (the cross-thread `MemRegion` hazard). +## +## FIFO holds per queue, not globally. Unbounded by design: submit never blocks +## or rejects; completion comes via each request's callback. + +import std/[atomics, locks] +import ./ffi_thread_request + +const + RequestQueueCount* = 16 + ## Independent ingress queues. ≥ the expected concurrent producer count keeps + ## queue collisions (hence lock contention) near zero. + QueuePadBytes = 192 + ## Pads each queue well past a cache line (128B on Apple silicon) so adjacent + ## queues' hot fields never false-share — false sharing would re-serialise + ## exactly what the sharding is meant to spread out. + +static: + # `myQueueIndex` maps threads to queues with an `and` mask, so the count must + # be a power of two — otherwise the distribution silently skews onto a subset. + doAssert (RequestQueueCount and (RequestQueueCount - 1)) == 0, + "RequestQueueCount must be a power of two" + +type + RequestQueue = object + lock: Lock + head: ptr FFIThreadRequest ## consumer pops here (oldest) + tail: ptr FFIThreadRequest ## producers on this queue append here (newest) + pad: array[QueuePadBytes, byte] + + RequestQueueBank* = object + queues: array[RequestQueueCount, RequestQueue] + +var gRequestQueue {.threadvar.}: int +var gRequestQueueAssigned {.threadvar.}: bool +var gRequestQueueCounter: Atomic[int] + ## Hands each producer thread a distinct queue round-robin on first use, so + ## queues fill evenly regardless of OS thread-id distribution. + +proc myQueueIndex(): int {.raises: [].} = + if not gRequestQueueAssigned: + gRequestQueue = gRequestQueueCounter.fetchAdd(1) + gRequestQueueAssigned = true + return gRequestQueue and (RequestQueueCount - 1) # RequestQueueCount is a power of two + +proc initRequestQueue*(bank: var RequestQueueBank) {.raises: [].} = + for queue in bank.queues.mitems: + queue.lock.initLock() + queue.head = nil + queue.tail = nil + +proc deinitRequestQueue*(bank: var RequestQueueBank) {.raises: [].} = + ## Both producers and the consumer must have stopped. Frees any request still + ## queued on any queue — e.g. one a producer raced in after the FFI thread's + ## final drain — so a teardown race leaks nothing instead of dangling them. + for queue in bank.queues.mitems: + var request = queue.head + while not request.isNil(): + let nextRequest = request[].next + deleteRequest(request) + request = nextRequest + queue.head = nil + queue.tail = nil + queue.lock.deinitLock() + +proc pushRequest*( + bank: var RequestQueueBank, request: ptr FFIThreadRequest +): bool {.raises: [].} = + ## Append `request` to this producer thread's queue (takes ownership). Returns + ## true only when the queue was empty: the consumer sleeps on an empty queue, so + ## that's the one push that must wake it; a missed wake just waits the 100ms poll. + request[].next = nil + let idx = myQueueIndex() + withLock bank.queues[idx].lock: + let wasEmpty = bank.queues[idx].tail.isNil() + if bank.queues[idx].tail.isNil(): + bank.queues[idx].head = request + else: + bank.queues[idx].tail[].next = request + bank.queues[idx].tail = request + return wasEmpty + +proc mergeQueues*(bank: var RequestQueueBank): ptr FFIThreadRequest {.raises: [].} = + ## Single-consumer: splice every queue into one chain, resetting them to empty. + ## Returns nil when all are empty; the caller then owns the chain and must read + ## each request's `next` before dispatching (dispatch frees the request). + var head: ptr FFIThreadRequest = nil + var tail: ptr FFIThreadRequest = nil + for queue in bank.queues.mitems: + withLock queue.lock: + let h = queue.head + if not h.isNil(): + if head.isNil(): + head = h + else: + tail[].next = h + tail = queue.tail + queue.head = nil + queue.tail = nil + return head diff --git a/ffi/ffi_thread.nim b/ffi/ffi_thread.nim index a75cff9..3ec3c3a 100644 --- a/ffi/ffi_thread.nim +++ b/ffi/ffi_thread.nim @@ -4,50 +4,44 @@ ## and the `onFFIThread` threadvar. Companion to `event_thread.nim`. ## ## Responsibilities: -## - Receive `FFIThreadRequest`s from foreign threads via `reqChannel` and -## dispatch them through the user-registered handler table. +## - Receive `FFIThreadRequest`s from foreign threads via `reqQueueBank` (a +## mutex-guarded MPSC queue) and dispatch them through the user-registered +## handler table. ## - Advance `ctx.ffiHeartbeat` each loop iteration so the event thread can ## detect a wedged FFI thread. proc sendRequestToFFIThread*( - ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration + ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest ): Result[void, string] = if ctx.eventQueueStuck.load(): deleteRequest(ffiRequest) return err("event queue stuck - library cannot accept new requests") if onFFIThread: - # Re-entrant dispatch from a handler would self-deadlock on `reqReceivedSignal`. + # A handler re-dispatching onto its own FFI thread would enqueue work the + # blocked dispatcher can never drain; reject instead of dead-locking. deleteRequest(ffiRequest) return err( "reentrant ffi call: a handler invoked sendRequestToFFIThread on its own context" ) - # Serialise the trySend + fireSync + waitSync — reqChannel is SP and reqReceivedSignal is shared. - ctx.lock.acquire() - defer: - ctx.lock.release() + # The lock inside pushRequest covers only the O(1) enqueue; the wake stays + # outside it, so concurrent producers don't serialise. Unbounded, so enqueue + # can't fail — completion comes via the request's own callback, no accept-ack. + # + # Wake only when the push found the queue empty: while the consumer drains, a + # fireSync() syscall per submit (contended across producers) is what destroys + # scaling. A skipped wake can't strand the request — the consumer re-polls 100ms. + let shouldWake = ctx.reqQueueBank.pushRequest(ffiRequest) - let sentOk = ctx.reqChannel.trySend(ffiRequest) - if not sentOk: - deleteRequest(ffiRequest) - return err("Couldn't send a request to the ffi thread") + # A failed wake is non-fatal: the request is queued and the poll-drain + # dispatches it within a tick anyway. Returning err would double-fire the + # caller's callback for a request that still completes. + if shouldWake: + ctx.reqSignal.fireSync().isOkOr: + error "failed to wake FFI thread after enqueue (request still queued)", + error = error - let fireSyncRes = ctx.reqSignal.fireSync() - if fireSyncRes.isErr(): - deleteRequest(ffiRequest) - return err("failed fireSync: " & $fireSyncRes.error) - - if fireSyncRes.get() == false: - deleteRequest(ffiRequest) - return err("Couldn't fireSync in time") - - let res = ctx.reqReceivedSignal.waitSync(timeout) - if res.isErr(): - # FFI thread was signaled and owns the request; don't double-free. - return err("Couldn't receive reqReceivedSignal signal") - - # On ok the FFI thread's processRequest deallocShared(req)'s. ok() proc processRequest[T]( @@ -88,6 +82,12 @@ proc ffiNotifyEventEnqueuedHook() {.gcsafe, raises: [].} = if res.isErr(): error "failed to fire eventQueueSignal after enqueue", err = res.error +proc proveAlive(ctx: ptr FFIContext) = + ## Advance the heartbeat the event thread polls to spot a wedged FFI thread. + ## Only that the counter keeps moving matters, never its value — so a plain + ## atomic increment, no read-back. + ctx.ffiHeartbeat.atomicInc() + proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = ## FFI thread body that attends library user API requests ffiCurrentEventRegistry = addr ctx[].eventRegistry @@ -114,7 +114,7 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = # Tracked so shutdown can drain them; abandoning a mid-await future leaks the request. var pending: seq[Future[void]] = @[] - proc reapCompleted() = + proc cleanFinishedRequests() = var i = 0 while i < pending.len: if not pending[i].finished(): @@ -122,31 +122,41 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = continue pending.del(i) + proc processQueue() = + ## Process enqueued requests until the queue is empty. A single wake can + ## stand for many submits, so we drain fully rather than once per wake — + ## otherwise queued requests would sit until the next wake. + while true: + var request = ctx.reqQueueBank.mergeQueues() + if request.isNil(): + break + while not request.isNil(): + let nextRequest = request[].next # read before processRequest frees it + # Tick per dispatch so a large backlog can't flatline the heartbeat + # and trip the event thread's wedged-FFI-thread detection mid-drain. + ctx.proveAlive() + if ctx.myLib.isNil(): + # This reference must stay inside the closure: it's what keeps + # `ffiReqHandler` in the async env, so `myLib` survives across awaits. + ctx.myLib = addr ffiReqHandler + + pending.add processRequest(request, ctx) + request = nextRequest + while ctx.running.load(): # Freezes if a sync handler blocks the dispatcher; event thread reads to detect wedged FFI thread. - discard ctx.ffiHeartbeat.fetchAdd(1) + ctx.proveAlive() - reapCompleted() + cleanFinishedRequests() - let gotSignal = await ctx.reqSignal.wait().withTimeout(chronos.milliseconds(100)) - if not gotSignal: - continue + # Block until a submit signals us, or for at most 100ms if none does. + discard await ctx.reqSignal.wait().withTimeout(chronos.milliseconds(100)) + processQueue() - var request: ptr FFIThreadRequest - if not ctx.reqChannel.tryRecv(request): - continue - - if ctx.myLib.isNil(): - ctx.myLib = addr ffiReqHandler - - pending.add processRequest(request, ctx) - - let fireRes = ctx.reqReceivedSignal.fireSync() - if fireRes.isErr(): - error "could not fireSync back to requester thread", error = fireRes.error - - # Drain so each pending handler's deleteRequest defer runs before exit. - reapCompleted() + # Drain once more so requests enqueued just before `running` flipped still + # dispatch and each pending handler's deleteRequest defer runs before exit. + processQueue() + cleanFinishedRequests() if pending.len > 0: try: await allFutures(pending) diff --git a/ffi/ffi_thread_request.nim b/ffi/ffi_thread_request.nim index 25a9632..b6b8daf 100644 --- a/ffi/ffi_thread_request.nim +++ b/ffi/ffi_thread_request.nim @@ -26,6 +26,10 @@ type FFIThreadRequest* = object reqId*: cstring ## Per-proc Req type name used to look up the handler. data*: ptr UncheckedArray[byte] ## Owned CBOR-encoded request payload. dataLen*: int + next*: ptr FFIThreadRequest + ## Intrusive ingress-queue link (see `ffi_request_queue.nim`). Touched only + ## under the queue's lock; the request doubles as its own node, so no + ## separate node alloc lands on the per-thread ORC MemRegion. proc allocBaseRequest( callback: FFICallBack, userData: pointer, reqId: cstring @@ -39,6 +43,7 @@ proc allocBaseRequest( ret[].reqId = reqId.alloc() ret[].data = nil ret[].dataLen = 0 + ret[].next = nil return ret proc copySharedPayload(req: ptr FFIThreadRequest, data: ptr byte, dataLen: int) = diff --git a/ffi/internal/ffi_macro.nim b/ffi/internal/ffi_macro.nim index 200c8da..1e33c59 100644 --- a/ffi/internal/ffi_macro.nim +++ b/ffi/internal/ffi_macro.nim @@ -857,12 +857,9 @@ macro ffi*(args: varargs[untyped]): untyped = ) proc asyncPath(): NimNode = - ## Emits the C-exported wrapper and registers the request handler. - ## All `.ffi.` procs dispatch through the FFI thread channel and reply - ## through the callback when the future resolves — the previous "sync - ## fast-path" that ran inline on the foreign caller thread was removed - ## (PR #23 review, items 1–5) because it bypassed `foreignThreadGc`, - ## `ctx.lock`, and chronos's single-thread invariant. + ## Emits the C-exported wrapper and registers the handler. Every `.ffi.` proc + ## dispatches through the FFI thread and replies via its callback, honouring + ## `foreignThreadGc`, the MPSC ingress hand-off, and chronos's invariant. let helperProc = buildAsyncHelperProc() # registerReqFFI lambda: typed params, returns user's typed Result. diff --git a/tests/bench/README.md b/tests/bench/README.md index f0c4482..09312fe 100644 --- a/tests/bench/README.md +++ b/tests/bench/README.md @@ -9,7 +9,13 @@ This directory holds Nim micro/stress benchmarks. Neither is part of `nimble tes `bench_ffi_submit.nim` motivates [issue #90](https://github.com/logos-messaging/nim-ffi/issues/90): every foreign-thread call serialises the whole `trySend + reqSignal.fireSync + reqReceivedSignal.waitSync` cycle under a single `ctx.lock`. The lock is load-bearing because `reqChannel` is single-slot and the accept handshake waits on a *shared* `reqReceivedSignal`, so producers cannot overlap. -The bench fans **K producer threads (1 → 8)** at one context, each firing the same per-thread volume of no-op requests. It times the **submit phase only** — from the start gate until every producer returns from its last `sendRequestToFFIThread` — because that is the path the fix parallelises; completion is bounded by the single FFI thread and deliberately excluded. Each thread count runs `FFI_SUBMIT_ITERS` times (default 5) and the **median** submit/sec is reported, so run-to-run noise can't move the verdict. +The bench fans **K producer threads** at one context (default sweep `1,2,4,8`), each firing the same per-thread volume of no-op requests. It times the **submit phase only** — from the start gate until every producer returns from its last `sendRequestToFFIThread` — because that is the path the fix parallelises; completion is bounded by the single FFI thread and deliberately excluded. Each thread count runs `FFI_SUBMIT_ITERS` times (default 5) and the **median** submit/sec is reported, so run-to-run noise can't move the verdict. + +The high-contention curve (up to 100 producers) is **opt-in for local runs**, not CI: under a sanitizer on a slow runner, 100 threads can't settle their callbacks within the bench's timeout and would fail on time, not on a bug. Run it on demand with `FFI_SUBMIT_THREADS`: + +```sh +FFI_SUBMIT_THREADS="1,8,16,32,64,100" nimble bench_ffi_submit +``` It is also a correctness stress test: the aggregate callback count must match the submit count **exactly** (no drops or double-fires), with zero submit errors and (under asan/lsan/tsan) zero leaks or races. @@ -21,13 +27,26 @@ FFI_SUBMIT_PER_THREAD=2000 FFI_SUBMIT_ITERS=1 FFI_SCALING_GATE=0 nimble bench_ff NIM_FFI_SAN=tsan FFI_SUBMIT_PER_THREAD=2000 FFI_SCALING_GATE=0 nimble bench_ffi_submit ``` -Env knobs: `FFI_SUBMIT_PER_THREAD` (volume per producer, default 20000), `FFI_SUBMIT_ITERS` (median sample count, default 5), `FFI_SCALING_GATE` (default `1`; set `0` to report numbers without failing). +Env knobs: `FFI_SUBMIT_PER_THREAD` (volume per producer, default 20000), `FFI_SUBMIT_ITERS` (median sample count, default 5), `FFI_SUBMIT_THREADS` (comma-separated producer counts, default `1,2,4,8`), `FFI_SCALING_GATE` (default `1`; set `0` to report numbers without failing). -### Scaling gate — red until the lock is replaced +### Scaling gate — guards the sharded ingress against regression -By default the bench **fails** (non-zero exit) unless submit throughput at 8 threads is at least `1.5x` the 1-thread rate. This is a forcing function: it cannot pass while `sendRequestToFFIThread` holds `ctx.lock` across the synchronous `reqReceivedSignal` accept, because that serialises every submit no matter how many producers run. +By default the bench **fails** (non-zero exit) unless submit throughput at the top thread count is at least `1.5x` the 1-thread rate. This was a forcing function while `sendRequestToFFIThread` still held `ctx.lock` across the synchronous `reqReceivedSignal` accept (which serialised every submit); it is now a regression guard for the sharded ingress that replaced it. -Baseline measured 2026-06-24 (16-core Linux, orc, `-d:danger`, median of 5): submit scaling held at **0.98–1.16x** across threads — flat, as the lock dictates. `1.5x` sits above that noise ceiling (so the lock-bound code fails reliably) and well below the `>=2x` that parallel lock-free MPSC ingress yields on any multicore host (so the fix clears it with margin). Once it lands and this turns green, keep the gate as a regression guard. +The original lock-bound code measured 2026-06-24 (16-core Linux, orc, `-d:danger`, median of 5): submit scaling held flat at **0.98–1.16x** — adding producers bought nothing. `1.5x` sits above that noise ceiling so the old code fails reliably, and well below what the sharded ingress delivers, so the fix clears it with wide margin. + +The fix replaces the single-slot channel + accept handshake with a **sharded, mutex-guarded MPSC ingress** (`ffi/ffi_request_queue.nim`): independent per-producer queues remove the single shared hotspot, and the wake fires only on a queue's empty→non-empty edge so submits don't each pay a syscall. Measured 2026-06-25 (Apple M5, 10 cores, orc, `-d:danger`, median of 5), submit/sec and scaling vs 1 thread: + +| threads | sharded ingress | vs 1T | lock-free MPSC (Vyukov) | vs 1T | +| ---: | ---: | :---: | ---: | :---: | +| 1 | 2.14M | 1.00x | 1.18M | 1.00x | +| 8 | 17.6M | 8.22x | 0.23M | 0.20x | +| 16 | 20.3M | 9.52x | 0.13M | 0.11x | +| 32 | 23.4M | 10.96x | 0.115M | 0.10x | +| 64 | 24.5M | 11.44x | 0.113M | 0.10x | +| 100 | 23.8M | **11.13x** | 0.113M | **0.10x** | + +The sharded ingress tracks the hardware ceiling (~24M/s plateau, saturating the cores) and degrades gracefully past it; the single-hotspot lock-free queue collapses to a contention floor and gets *worse* with more threads. Both are correct at every level (callback count matches submits exactly, no drops/dupes). The gate runs in the non-sanitized **Submit Scaling Gate** CI job (`.github/workflows/ci.yml`); the sanitized jobs run the same bench with `FFI_SCALING_GATE=0` for leak/race coverage only, since sanitizer instrumentation makes throughput scaling meaningless. diff --git a/tests/bench/bench_ffi_submit.nim b/tests/bench/bench_ffi_submit.nim index ec93da0..280371d 100644 --- a/tests/bench/bench_ffi_submit.nim +++ b/tests/bench/bench_ffi_submit.nim @@ -119,7 +119,19 @@ proc main() = let gateOn = getEnv("FFI_SCALING_GATE", "1") != "0" if perThread < 1 or iters < 1: quit("FFI_SUBMIT_PER_THREAD and FFI_SUBMIT_ITERS must be >= 1") - let threadCounts = [1, 2, 4, 8] + # Default sweep is light so CI (and the slower asan/tsan jobs) stays fast. Set + # FFI_SUBMIT_THREADS for the high-contention curve locally — under a sanitizer + # it can outrun `settleTimeout` and fail on timing, not a real bug. + # FFI_SUBMIT_THREADS="1,8,16,32,64,100" nimble bench_ffi_submit + let threadCounts = block: + var cs: seq[int] + for part in getEnv("FFI_SUBMIT_THREADS", "1,2,4,8").split(','): + let p = part.strip() + if p.len > 0: + cs.add(parseInt(p)) + if cs.len < 2: + quit("FFI_SUBMIT_THREADS needs >= 2 counts (first = baseline, last = peak)") + cs echo "── sendRequestToFFIThread submit throughput (median of ", iters, ") ──────" diff --git a/tests/unit/test_ffi_context.nim b/tests/unit/test_ffi_context.nim index b8e8cb8..0b76845 100644 --- a/tests/unit/test_ffi_context.nim +++ b/tests/unit/test_ffi_context.nim @@ -487,12 +487,9 @@ suite "Nim-native .ffi. / .ffiCtor. API": check ctorRes.isOk check ctorRes.value.value == 21 -# Regression for PR #23 review items 1–5: a `.ffi.` body without `await` -# used to be emitted as an inline-on-foreign-thread fast path, which bypassed -# `foreignThreadGc`, `ctx.lock`, and chronos's single-thread invariant. The -# sync fast-path was deleted; this test records `getThreadId()` inside a -# sync body and asserts the handler runs on the FFI thread, not on the -# caller's thread. +# A sync `.ffi.` body (no `await`) must run on the FFI thread, not the caller's, +# so it goes through `foreignThreadGc`, the MPSC ingress hand-off, and chronos's +# single-thread invariant. Records `getThreadId()` in a sync body and asserts it. var gRecordedHandlerTid: Atomic[int] @@ -551,11 +548,9 @@ suite "sync-body .ffi. runs on FFI thread (PR #23 regression)": # And the callback payload (the recorded tid) matches what the handler stored. check cborDecode(callbackBytes(d), int).value == handlerTid -# Regression for PR #23 review item 6: reentrancy guard on -# sendRequestToFFIThread. A handler running on the FFI thread that tries to -# dispatch back through sendRequestToFFIThread used to self-deadlock waiting -# on `reqReceivedSignal` (which only the FFI thread can fire). The guard now -# returns an Err immediately. +# Reentrancy guard on sendRequestToFFIThread: a handler running on the FFI thread +# that re-dispatches through it would enqueue work onto the very queue its blocked +# dispatcher can never drain, so the guard returns an Err immediately. var gReentrantNestedRes: Channel[string] gReentrantNestedRes.open()