test(bench): submit-throughput scaling gate for sendRequestToFFIThread (issue #90) (#97)

This commit is contained in:
Gabriel Cruz 2026-06-25 18:02:45 -03:00 committed by GitHub
parent 64a332ca8b
commit 4bac7a7bc6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 348 additions and 77 deletions

View File

@ -169,51 +169,15 @@ jobs:
nimble test_cpp_e2e -y
check-bindings:
# Single OS is enough — codegen output is platform-independent; the Nim
# matrix catches version-sensitive output (the PR #39 drift class).
name: Check generated bindings
needs: versions
# Codegen output is platform-independent — single OS is enough. Matrix
# over Nim versions to catch any version-sensitive output. Catches the
# class of drift surfaced in PR #39 (C++ regen committed, Rust
# overlooked); see `nimble check_bindings` in ffi.nimble.
strategy:
fail-fast: false
matrix:
nim-version: ${{ fromJSON(needs.versions.outputs.nim-versions) }}
runs-on: ubuntu-22.04
env:
NIMBLE_VERSION: ${{ needs.versions.outputs.nimble }}
steps:
- uses: actions/checkout@v4
- name: Setup Nim
uses: jiro4989/setup-nim-action@v2
with:
nim-version: ${{ matrix.nim-version }}
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Install Nimble ${{ env.NIMBLE_VERSION }}
run: |
cd /tmp && nimble install "nimble@${{ env.NIMBLE_VERSION }}" -y
echo "$HOME/.nimble/bin" >> $GITHUB_PATH
- name: Cache nimble deps
id: cache-nimbledeps
uses: actions/cache@v4
with:
path: |
nimbledeps/
nimble.paths
key: ${{ runner.os }}-nimbledeps-${{ matrix.nim-version }}-${{ hashFiles('*.nimble') }}
restore-keys: |
${{ runner.os }}-nimbledeps-${{ matrix.nim-version }}-
${{ runner.os }}-nimbledeps-
- name: Install nimble deps
if: steps.cache-nimbledeps.outputs.cache-hit != 'true'
run: nimble setup --localdeps -y
- name: Verify checked-in bindings match generator output
run: nimble check_bindings -y
uses: ./.github/workflows/nimble-job.yml
with:
run: nimble check_bindings -y
nim-versions: ${{ needs.versions.outputs.nim-versions }}
nimble-version: ${{ needs.versions.outputs.nimble }}
tests-asan-ubsan:
name: Tests · ASan+UBSan+LSan
@ -233,6 +197,21 @@ jobs:
nim-versions: ${{ needs.versions.outputs.nim-versions }}
nimble-version: ${{ needs.versions.outputs.nimble }}
submit-scaling-gate:
# Forcing function, red by design: asserts sendRequestToFFIThread submit
# throughput scales with producer-thread count. The per-request global lock
# serialises every submit, so this stays red until the lock is replaced with
# MPSC ingress — a standing reminder, not a transient failure. Pinned to orc +
# unsanitized because the gate is timing-based and the contention it measures
# is mm-independent. Full rationale and baseline numbers: tests/bench/README.md.
name: Submit Scaling Gate
needs: versions
uses: ./.github/workflows/nimble-job.yml
with:
run: NIM_FFI_MM=orc FFI_SUBMIT_PER_THREAD=20000 nimble bench_ffi_submit -y
nim-versions: ${{ needs.versions.outputs.nim-versions }}
nimble-version: ${{ needs.versions.outputs.nimble }}
auto-assign:
name: Auto-assign PR author
if: github.event_name == 'pull_request' && github.event.action == 'opened'

66
.github/workflows/nimble-job.yml vendored Normal file
View File

@ -0,0 +1,66 @@
name: nimble-job
# Single-OS job that sets up the Nim/Nimble toolchain + cached deps, then runs
# one shell command across the Nim-version matrix. Shared by the ci.yml jobs
# whose only difference is the command they run (check-bindings, the submit
# scaling gate, …). Multi-OS / sanitizer matrices live in test.yml /
# tests-sanitized.yml instead.
on:
workflow_call:
inputs:
run:
required: true
type: string
description: Shell command to run once the toolchain and deps are ready.
nim-versions:
required: true
type: string
description: JSON array of Nim versions to matrix over.
nimble-version:
required: true
type: string
runs-on:
required: false
type: string
default: ubuntu-22.04
jobs:
run:
strategy:
fail-fast: false
matrix:
nim-version: ${{ fromJSON(inputs.nim-versions) }}
runs-on: ${{ inputs.runs-on }}
name: Nim ${{ matrix.nim-version }}
steps:
- uses: actions/checkout@v4
- name: Setup Nim
uses: jiro4989/setup-nim-action@v2
with:
nim-version: ${{ matrix.nim-version }}
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Install Nimble ${{ inputs.nimble-version }}
run: |
cd /tmp && nimble install "nimble@${{ inputs.nimble-version }}" -y
echo "$HOME/.nimble/bin" >> $GITHUB_PATH
- name: Cache nimble deps
id: cache-nimbledeps
uses: actions/cache@v4
with:
path: |
nimbledeps/
nimble.paths
key: ${{ runner.os }}-nimbledeps-${{ matrix.nim-version }}-${{ hashFiles('*.nimble') }}
restore-keys: |
${{ runner.os }}-nimbledeps-${{ matrix.nim-version }}-
${{ runner.os }}-nimbledeps-
- name: Install nimble deps
if: steps.cache-nimbledeps.outputs.cache-hit != 'true'
run: nimble setup --localdeps -y
- name: Run
run: ${{ inputs.run }}

View File

@ -89,5 +89,15 @@ jobs:
- name: Run unit tests (${{ inputs.sanitizer }})
run: nimble test_sanitized -y
# Correctness only here (exactly-once, no leaks/races). The scaling gate is
# off: sanitizers distort thread timing, so throughput scaling is measured
# in the non-sanitized Submit Scaling Gate job (ci.yml) instead.
- name: Run sendRequestToFFIThread submit stress (${{ inputs.sanitizer }})
env:
FFI_SUBMIT_PER_THREAD: 2000
FFI_SUBMIT_ITERS: 1
FFI_SCALING_GATE: 0
run: nimble bench_ffi_submit -y
- name: Run C++ e2e tests (${{ inputs.sanitizer }})
run: nimble test_cpp_e2e_sanitized -y

View File

@ -146,17 +146,20 @@ inline CborError encode_cbor(CborEncoder& e, const std::optional<T>& v) {
// ── decode_cbor overloads ───────────────────────────────────────────────
inline CborError decode_cbor(CborValue& it, bool& out) {
if (!cbor_value_is_boolean(&it)) return CborErrorImproperValue;
CborError err = cbor_value_get_boolean(&it, &out);
// After reading a leaf value, the parser must advance past it; both steps
// short-circuit on the same CborError, so they always travel together.
inline CborError advance_if_ok(CborValue& it, CborError err) {
if (err) return err;
return cbor_value_advance(&it);
}
inline CborError decode_cbor(CborValue& it, bool& out) {
if (!cbor_value_is_boolean(&it)) return CborErrorImproperValue;
return advance_if_ok(it, cbor_value_get_boolean(&it, &out));
}
inline CborError decode_cbor(CborValue& it, int64_t& out) {
if (!cbor_value_is_integer(&it)) return CborErrorImproperValue;
CborError err = cbor_value_get_int64_checked(&it, &out);
if (err) return err;
return cbor_value_advance(&it);
return advance_if_ok(it, cbor_value_get_int64_checked(&it, &out));
}
inline CborError decode_cbor(CborValue& it, int32_t& out) {
int64_t tmp = 0;
@ -167,15 +170,11 @@ inline CborError decode_cbor(CborValue& it, int32_t& out) {
}
inline CborError decode_cbor(CborValue& it, uint64_t& out) {
if (!cbor_value_is_unsigned_integer(&it)) return CborErrorImproperValue;
CborError err = cbor_value_get_uint64(&it, &out);
if (err) return err;
return cbor_value_advance(&it);
return advance_if_ok(it, cbor_value_get_uint64(&it, &out));
}
inline CborError decode_cbor(CborValue& it, double& out) {
if (cbor_value_is_double(&it)) {
CborError err = cbor_value_get_double(&it, &out);
if (err) return err;
return cbor_value_advance(&it);
return advance_if_ok(it, cbor_value_get_double(&it, &out));
}
if (cbor_value_is_float(&it)) {
float f = 0.0f;
@ -192,9 +191,8 @@ inline CborError decode_cbor(CborValue& it, std::string& out) {
CborError err = cbor_value_get_string_length(&it, &len);
if (err) return err;
out.resize(len);
err = cbor_value_copy_text_string(&it, out.empty() ? nullptr : &out[0], &len, nullptr);
if (err) return err;
return cbor_value_advance(&it);
return advance_if_ok(
it, cbor_value_copy_text_string(&it, out.empty() ? nullptr : &out[0], &len, nullptr));
}
template<typename T>
@ -223,10 +221,8 @@ inline CborError decode_cbor(CborValue& it, std::vector<std::uint8_t>& out) {
CborError err = cbor_value_get_string_length(&it, &len);
if (err) return err;
out.resize(len);
err = cbor_value_copy_byte_string(
&it, out.empty() ? nullptr : out.data(), &len, nullptr);
if (err) return err;
return cbor_value_advance(&it);
return advance_if_ok(
it, cbor_value_copy_byte_string(&it, out.empty() ? nullptr : out.data(), &len, nullptr));
}
template<typename T>

View File

@ -65,6 +65,25 @@ proc sanFlags(san: string): string =
else:
raise newException(ValueError, "unknown NIM_FFI_SAN: " & san)
proc mmModes(): seq[string] =
  ## Returns the Nim flag sets (one per memory-management mode) to build under.
  ## `NIM_FFI_MM=orc` or `NIM_FFI_MM=refc` selects a single mode; any other
  ## value, including unset/empty, selects both (orc first, then refc).
  let requested = getEnv("NIM_FFI_MM", "")
  if requested == "orc":
    result = @[nimFlagsOrc]
  elif requested == "refc":
    result = @[nimFlagsRefc]
  else:
    result = @[nimFlagsOrc, nimFlagsRefc]
proc applyTsanSuppressions() =
  ## Points ThreadSanitizer at the repo's `tsan.supp` via `TSAN_OPTIONS`.
  ## Options already present in the environment are preserved: the
  ## suppressions entry is appended after a `:`; if a `suppressions=` entry
  ## already exists the variable is left untouched.
  let
    suppFile = thisDir() & "/tsan.supp"
    current = getEnv("TSAN_OPTIONS")
  if current == "":
    putEnv("TSAN_OPTIONS", "suppressions=" & suppFile)
    return
  if "suppressions=" notin current:
    putEnv("TSAN_OPTIONS", current & ":suppressions=" & suppFile)
task buildffi, "Compile the library":
exec "nim c " & nimFlagsOrc & " --app:lib --noMain ffi.nim"
@ -90,6 +109,17 @@ task bench_codec, "Microbenchmark: cbor vs c (cwire) wire-format codecs":
# debug build. Not part of `test` — timing is a measurement, not a gate.
exec "nim c -r " & nimFlagsOrc & " -d:danger tests/bench/bench_codec.nim"
task bench_ffi_submit,
  "Concurrent-submit stress + scaling gate for sendRequestToFFIThread":
  # Driven by the same env knobs as test_sanitized: NIM_FFI_SAN picks the
  # sanitizer flags and NIM_FFI_MM the memory-management mode(s), so CI can
  # run it under asan-ubsan and tsan. FFI_SUBMIT_PER_THREAD (read by the
  # bench binary itself) sets the per-thread volume.
  let sanitizer = getEnv("NIM_FFI_SAN", "none")
  let sanitizerFlags = sanFlags(sanitizer)
  if sanitizer == "tsan":
    applyTsanSuppressions()
  for mmFlags in mmModes():
    let compileAndRun =
      "nim c -r " & mmFlags & " -d:danger" & sanitizerFlags &
      " tests/bench/bench_ffi_submit.nim"
    exec compileAndRun
task test_cpp_e2e, "Build and run the C++ end-to-end tests for the timer example":
# Regenerate the C++ bindings so the suite always runs against fresh codegen.
runOrQuit "nimble genbindings_cpp"
@ -104,23 +134,10 @@ task test_cpp_e2e, "Build and run the C++ end-to-end tests for the timer example
task test_sanitized,
"Run all unit tests under a sanitizer (NIM_FFI_SAN) and mm (NIM_FFI_MM)":
let san = getEnv("NIM_FFI_SAN", "none")
let mm = getEnv("NIM_FFI_MM", "")
let extra = sanFlags(san)
let modes =
if mm == "orc":
@[nimFlagsOrc]
elif mm == "refc":
@[nimFlagsRefc]
else:
@[nimFlagsOrc, nimFlagsRefc]
if san == "tsan":
let suppPath = thisDir() & "/tsan.supp"
let existing = getEnv("TSAN_OPTIONS")
if existing == "":
putEnv("TSAN_OPTIONS", "suppressions=" & suppPath)
elif "suppressions=" notin existing:
putEnv("TSAN_OPTIONS", existing & ":suppressions=" & suppPath)
for flags in modes:
applyTsanSuppressions()
for flags in mmModes():
for t in unitTests:
exec "nim c -r " & flags & extra & " tests/unit/" & t & ".nim"

View File

@ -1,3 +1,36 @@
# FFI benchmarks
This directory holds Nim micro/stress benchmarks. Neither is part of `nimble test`.
- `bench_codec.nim` — `cbor` vs `c` (cwire) wire-format codec microbenchmark (documented below). Pure measurement, not a gate.
- `bench_ffi_submit.nim` — concurrent-submit stress test + throughput benchmark for `sendRequestToFFIThread` (documented next). Carries a **scaling gate** that fails CI until the per-request submit lock is replaced.
## `sendRequestToFFIThread` concurrent-submit stress / throughput
`bench_ffi_submit.nim` motivates [issue #90](https://github.com/logos-messaging/nim-ffi/issues/90): every foreign-thread call serialises the whole `trySend + reqSignal.fireSync + reqReceivedSignal.waitSync` cycle under a single `ctx.lock`. The lock is load-bearing because `reqChannel` is single-slot and the accept handshake waits on a *shared* `reqReceivedSignal`, so producers cannot overlap.
The bench fans **K producer threads (1 → 8)** at one context, each firing the same per-thread volume of no-op requests. It times the **submit phase only** — from the start gate until every producer returns from its last `sendRequestToFFIThread` — because that is the path the fix parallelises; completion is bounded by the single FFI thread and deliberately excluded. Each thread count runs `FFI_SUBMIT_ITERS` times (default 5) and the **median** submit/sec is reported, so run-to-run noise can't move the verdict.
It is also a correctness stress test: the aggregate callback count must match the submit count **exactly** (no drops or double-fires), with zero submit errors and (under asan/lsan/tsan) zero leaks or races.
```sh
nimble bench_ffi_submit
# smaller / faster (handy under sanitizers — they distort timing, so disable the gate):
FFI_SUBMIT_PER_THREAD=2000 FFI_SUBMIT_ITERS=1 FFI_SCALING_GATE=0 nimble bench_ffi_submit
# under a sanitizer (proves no leaks/races; gate off — see below):
NIM_FFI_SAN=tsan FFI_SUBMIT_PER_THREAD=2000 FFI_SCALING_GATE=0 nimble bench_ffi_submit
```
Env knobs: `FFI_SUBMIT_PER_THREAD` (volume per producer, default 20000), `FFI_SUBMIT_ITERS` (median sample count, default 5), `FFI_SCALING_GATE` (default `1`; set `0` to report numbers without failing).
### Scaling gate — red until the lock is replaced
By default the bench **fails** (non-zero exit) unless submit throughput at 8 threads is at least `1.5x` the 1-thread rate. This is a forcing function: it cannot pass while `sendRequestToFFIThread` holds `ctx.lock` across the synchronous `reqReceivedSignal` accept, because that serialises every submit no matter how many producers run.
Baseline measured 2026-06-24 (16-core Linux, orc, `-d:danger`, median of 5): submit scaling held at **0.98–1.16x** across threads — flat, as the lock dictates. `1.5x` sits above that noise ceiling (so the lock-bound code fails reliably) and well below the `>=2x` that parallel lock-free MPSC ingress yields on any multicore host (so the fix clears it with margin). Once the fix lands and the gate turns green, keep the gate as a regression guard.
The gate runs in the non-sanitized **Submit Scaling Gate** CI job (`.github/workflows/ci.yml`); the sanitized jobs run the same bench with `FFI_SCALING_GATE=0` for leak/race coverage only, since sanitizer instrumentation makes throughput scaling meaningless.
# FFI wire-format codec benchmark
`bench_codec.nim` is a single-process Nim microbenchmark comparing the two FFI

View File

@ -0,0 +1,170 @@
## Concurrent-submit stress test + throughput bench for `sendRequestToFFIThread`,
## motivating its per-request submit lock. See tests/bench/README.md for the why.
import std/[atomics, algorithm, strutils, os]
import results
import ../../ffi # chronos (Moment/Duration) and the FFI surface both arrive here.
# Field-less marker type; it parameterises the FFIContext / FFIContextPool
# instances used by this bench.
type BenchLib = object

# NOTE(review): presumably declares `NoopRequest` as a request kind whose
# handler does no work and resolves to ok("ok") — confirm against the
# `registerReqFFI` definition in ../../ffi.
registerReqFFI(NoopRequest, lib: ptr BenchLib):
  proc(): Future[Result[string, string]] {.async.} =
    return ok("ok")
# Start gate: producer threads spin on this until `runOnce` stores true.
var gStart: Atomic[bool]
var gCompleted: Atomic[int] ## bumped once per callback; also the callback userData
# Counts `sendRequestToFFIThread` calls that returned an error.
var gSendErrors: Atomic[int]
# Upper bound on how long `waitForCompletions` waits for outstanding callbacks.
let settleTimeout = 30.seconds
## Forcing gate: min submit-throughput scaling (max-threads / 1-thread); red
## until the per-request submit lock is replaced. See README "Scaling gate".
const RequiredScaling = 1.5
proc benchCallback(
    retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
  ## Completion callback handed to every request. `userData` must point at an
  ## `Atomic[int]`; each invocation atomically adds one to it. The return
  ## code and message payload are ignored.
  discard cast[ptr Atomic[int]](userData)[].fetchAdd(1)
type ProducerArg = object
  ## Per-thread input for `producerBody`.
  ctx: ptr FFIContext[BenchLib] ## context every request is submitted to
  count: int ## number of requests this producer submits

proc producerBody(arg: ptr ProducerArg) {.thread, gcsafe.} =
  ## Producer thread entry point: busy-waits on the `gStart` gate, then
  ## submits `arg.count` `NoopRequest`s to `arg.ctx`. Every request carries
  ## `benchCallback` with `gCompleted` as its user data; each failed submit
  ## bumps `gSendErrors`.
  while true:
    # Spin (no sleep) until the gate opens.
    if gStart.load():
      break
  for _ in 1 .. arg[].count:
    let request = NoopRequest.ffiNewReq(benchCallback, addr gCompleted)
    let outcome = sendRequestToFFIThread(arg[].ctx, request)
    if outcome.isErr():
      discard gSendErrors.fetchAdd(1)
proc waitForCompletions(target: int): bool =
  ## Polls `gCompleted` about once per millisecond until it is at least
  ## `target`. Returns true once the target is reached, or false if
  ## `settleTimeout` elapses first.
  let giveUpAt = Moment.now() + settleTimeout
  while true:
    if gCompleted.load() >= target:
      return true
    if Moment.now() > giveUpAt:
      return false
    os.sleep(1)
proc median(xs: seq[float]): float =
  ## Median of `xs`: the middle value of the sorted samples, or the mean of
  ## the two middle values when the sample count is even. An empty input
  ## yields 0.0. The caller's sequence is not modified.
  if xs.len == 0:
    return 0.0
  var ordered = xs
  ordered.sort()
  let mid = ordered.len div 2
  result =
    if ordered.len mod 2 == 0:
      (ordered[mid - 1] + ordered[mid]) / 2.0
    else:
      ordered[mid]
type IterResult = object
  ## Outcome of a single `runOnce` iteration.
  submitRate: float ## submits/sec over the submit phase only (sends issued)
  sendErrors: int ## value of `gSendErrors` at the end of the iteration
  overruns: int ## callbacks beyond `total` — must be 0 (no double-fire)

proc runOnce(
    pool: var FFIContextPool[BenchLib], numThreads, perThread: int
): IterResult =
  ## Runs one timed iteration: creates a fresh context from `pool`, starts
  ## `numThreads` producers that each submit `perThread` requests, times the
  ## interval from opening the start gate until all producers have been
  ## joined, then waits for the callbacks to arrive.
  ##
  ## Quits the process if the context cannot be created or if fewer than
  ## `numThreads * perThread` callbacks arrive within `settleTimeout`.
  let ctx = pool.createFFIContext().valueOr:
    quit("createFFIContext failed: " & $error)
  defer:
    discard pool.destroyFFIContext(ctx)
  let total = numThreads * perThread
  # Reset the shared gate and counters left over from the previous iteration.
  gStart.store(false)
  gCompleted.store(0)
  gSendErrors.store(0)
  var threads = newSeq[Thread[ptr ProducerArg]](numThreads)
  var args = newSeq[ProducerArg](numThreads)
  # Threads are created before the clock starts; they spin on `gStart`, so
  # thread start-up cost stays outside the measured interval.
  for i in 0 ..< numThreads:
    args[i] = ProducerArg(ctx: ctx, count: perThread)
    createThread(threads[i], producerBody, addr args[i])
  # Times the lock-serialised submit path only; completion (single FFI thread) is excluded.
  let start = Moment.now()
  gStart.store(true)
  joinThreads(threads)
  let submitSec = (Moment.now() - start).nanoseconds.float / 1_000_000_000.0
  # NOTE(review): if a failed send never invokes the callback, `gCompleted`
  # cannot reach `total` and this exits via the timeout before the caller can
  # report `sendErrors` — confirm how sendRequestToFFIThread treats the
  # callback on error.
  if not waitForCompletions(total):
    quit("timed out waiting for callbacks: got " & $gCompleted.load() & " of " & $total)
  os.sleep(50) # let any erroneous extra callbacks land before reading overruns
  IterResult(
    submitRate: total.float / submitSec,
    sendErrors: gSendErrors.load(),
    overruns: max(0, gCompleted.load() - total),
  )
proc enforceScalingGate(medianRate: seq[float]) =
  ## Compares the last entry of `medianRate` with the first and quits the
  ## process with a diagnostic when that ratio is below `RequiredScaling`;
  ## otherwise prints a confirmation line. `medianRate` must be non-empty.
  let
    observed = medianRate[^1] / medianRate[0]
    observedText = formatFloat(observed, ffDecimal, 2)
    requiredText = formatFloat(RequiredScaling, ffDecimal, 2)
  echo ""
  if observed < RequiredScaling:
    let verdict =
      "SCALING GATE: submit scaling " & observedText &
      "x < required " & requiredText &
      "x. The per-request global lock serialises every submit; replace it with " &
      "MPSC ingress (see tests/bench/README.md) to make this pass."
    quit(verdict)
  echo " scaling gate: ", observedText, "x >= ", requiredText, "x — submit path scales."
proc main() =
  ## Bench entry point. Reads `FFI_SUBMIT_PER_THREAD` (default 20000),
  ## `FFI_SUBMIT_ITERS` (default 5) and `FFI_SCALING_GATE` (default on; "0"
  ## disables it), then for 1, 2, 4 and 8 producer threads runs `runOnce`
  ## `iters` times and prints the median submit rate per thread count.
  ## Quits with a failure message on any submit error or surplus callback,
  ## and finally applies the scaling gate when it is enabled.
  let
    perThread = getEnv("FFI_SUBMIT_PER_THREAD", "20000").parseInt
    iters = getEnv("FFI_SUBMIT_ITERS", "5").parseInt
    gateOn = getEnv("FFI_SCALING_GATE", "1") != "0"
  if perThread < 1 or iters < 1:
    quit("FFI_SUBMIT_PER_THREAD and FFI_SUBMIT_ITERS must be >= 1")

  # Report header.
  echo "── sendRequestToFFIThread submit throughput (median of ",
    iters, ") ──────"
  echo " ", perThread, " submits per producer thread; noop handler (ok(\"ok\"))"
  echo ""
  echo " ",
    alignLeft("threads", 9),
    alignLeft("submits", 10),
    alignLeft("submit/sec", 16),
    alignLeft("vs 1-thread", 12)

  var
    pool: FFIContextPool[BenchLib]
    medianRate: seq[float]
    failed = false
  for n in [1, 2, 4, 8]:
    var
      rates: seq[float]
      sendErrors = 0
      overruns = 0
    for _ in 1 .. iters:
      let sample = runOnce(pool, n, perThread)
      rates.add(sample.submitRate)
      inc sendErrors, sample.sendErrors
      inc overruns, sample.overruns
    let med = median(rates)
    medianRate.add(med)
    # The last column is relative to the 1-thread median (first entry).
    let relative = formatFloat(med / medianRate[0], ffDecimal, 2) & "x"
    echo " ",
      alignLeft($n, 9),
      alignLeft($(n * perThread), 10),
      alignLeft(formatFloat(med, ffDecimal, 0), 16),
      alignLeft(relative, 12)
    if sendErrors != 0:
      echo " !! ", sendErrors, " submit errors at ", n, " threads"
      failed = true
    if overruns != 0:
      echo " !! ", overruns, " callbacks fired beyond expected at ", n, " threads"
      failed = true

  if failed:
    quit("stress test FAILED: see !! lines above")
  echo ""
  echo " correctness: callback count matched submits exactly (no drops/dupes)."
  if gateOn:
    enforceScalingGate(medianRate)
when isMainModule:
main()