From c90200de2be38f593a88c01be80911bc293463b5 Mon Sep 17 00:00:00 2001
From: Ivan FB
Date: Sat, 13 Jun 2026 22:28:58 +0200
Subject: [PATCH] feat(host): FFIContext completion bridge for {.ffiHost.}
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Increment 2: wires the host-call machinery into the running FFI thread so
a host answer (delivered from any thread) resolves the chronos Future an
awaiting handler is blocked on.

- FFICompletionQueue (ffi_host.nim): a GC-free intrusive queue.
  host_complete pushes c_malloc'd nodes from any thread; the FFI thread
  drains, copies the payload into GC memory, completes the future by
  token, and frees the node.
- FFIContext gains hostRegistry / pendingTable / completionQueue, init'd
  and deinit'd alongside the event registry.
- completeHostCall parks the answer and fires the EXISTING reqSignal — no
  second ThreadSignalPtr needed; the loop drains completions every
  iteration, on the loop thread (chronos single-thread invariant).
- On shutdown the loop calls failAllPending first, so a handler awaiting a
  host answer that never arrives can't hang the allFutures(pending) drain.

4 new queue unit tests (10 total) pass under orc+refc; the 19 ffi_context
integration tests stay green.

Co-Authored-By: Claude Opus 4.8
---
 ffi/ffi_context.nim                | 41 ++++++++++++-
 ffi/ffi_host.nim                   | 95 +++++++++++++++++++++++++++++-
 tests/unit/test_host_callbacks.nim | 66 +++++++++++++++++++++
 3 files changed, 198 insertions(+), 4 deletions(-)

diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim
index 7dcb01b..ec48c3a 100644
--- a/ffi/ffi_context.nim
+++ b/ffi/ffi_context.nim
@@ -3,10 +3,10 @@
 import std/[atomics, locks, json, tables]
 import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
 import
-  ./ffi_types, ./ffi_events, ./ffi_thread_request, ./internal/ffi_macro, ./logging,
-  ./cbor_serial
+  ./ffi_types, ./ffi_events, ./ffi_host, ./ffi_thread_request,
+  ./internal/ffi_macro, ./logging, ./cbor_serial
 
-export ffi_events
+export ffi_events, ffi_host
 
 type FFIContext*[T] = object
   myLib*: ptr T
@@ -28,6 +28,12 @@ type FFIContext*[T] = object
     # blocked event loop cannot hang the caller forever
   userData*: pointer
   eventRegistry*: FFIEventRegistry
+  hostRegistry*: FFIHostRegistry
+    # host-provided functions a {.ffiHost.} proc dispatches to (roadmap #1)
+  pendingTable*: FFIPendingTable
+    # in-flight {.ffiHost.} calls: token -> the chronos Future being awaited
+  completionQueue: FFICompletionQueue
+    # host answers parked from any thread, drained + completed on the FFI thread
   running: Atomic[bool] # To control when the threads are running
   registeredRequests: ptr Table[cstring, FFIRequestProc]
     # Pointer to with the registered requests at compile time
@@ -86,6 +92,17 @@ proc sendRequestToFFIThread*(
   ## process proc.
   return ok()
 
+proc completeHostCall*[T](
+    ctx: ptr FFIContext[T], token: uint64, ret: cint, msg: ptr cchar, len: csize_t
+) {.raises: [].} =
+  ## Backs `_host_complete`: the host delivers a `{.ffiHost.}` answer by
+  ## token. Callable from ANY thread — it only parks the result (GC-free) and
+  ## wakes the FFI loop via the existing `reqSignal`; the future is completed on
+  ## the FFI thread when the loop drains the queue. A token with no pending call
+  ## (late / double completion) is drained and dropped, never a crash.
+  pushCompletion(ctx[].completionQueue, token, ret, msg, len)
+  discard ctx.reqSignal.fireSync()
+
 type Foo = object
 registerReqFFI(WatchdogReq, foo: ptr Foo):
   proc(): Future[Result[string, string]] {.async.} =
@@ -242,6 +259,12 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
       reapCompleted()
 
       let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds)
+
+      ## Drain host-call answers every iteration (reqSignal is fired by
+      ## completeHostCall too): completing each awaited Future here, on the loop
+      ## thread, satisfies chronos's single-thread invariant.
+      drainCompletions(ctx[].completionQueue, ctx[].pendingTable)
+
       if not gotSignal:
         continue
 
@@ -264,6 +287,12 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
     ## before we exit. Without this, abandoning a future mid-await would
     ## leak the request allocations (visible to LSan; previously hidden
     ## because Nim's pool allocator kept the chunks alive in the process).
+    ##
+    ## Fail every outstanding {.ffiHost.} call first: a handler awaiting a host
+    ## answer that never arrives would otherwise make `allFutures(pending)` hang
+    ## forever. Then drain any answer that raced in during shutdown.
+    failAllPending(ctx[].pendingTable, "FFIContext shutting down")
+    drainCompletions(ctx[].completionQueue, ctx[].pendingTable)
     reapCompleted()
     if pending.len > 0:
       try:
@@ -280,6 +309,9 @@ proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
     freeShared(ctx)
   ctx.lock.deinitLock()
   deinitEventRegistry(ctx[].eventRegistry)
+  deinitHostRegistry(ctx[].hostRegistry)
+  deinitPendingTable(ctx[].pendingTable)
+  deinitCompletionQueue(ctx[].completionQueue)
   when defined(gcRefc):
     ## ThreadSignalPtr.close() is intentionally skipped under --mm:refc.
     ##
@@ -318,6 +350,9 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
   ## is responsible for releasing the slot (freeShared or pool.releaseSlot).
   ctx.lock.initLock()
   initEventRegistry(ctx[].eventRegistry)
+  initHostRegistry(ctx[].hostRegistry)
+  initPendingTable(ctx[].pendingTable)
+  initCompletionQueue(ctx[].completionQueue)
 
   var success = false
   defer:
diff --git a/ffi/ffi_host.nim b/ffi/ffi_host.nim
index b08964f..f21756c 100644
--- a/ffi/ffi_host.nim
+++ b/ffi/ffi_host.nim
@@ -19,7 +19,7 @@
 
 import std/[locks, tables]
 import chronos
-import ./ffi_types
+import ./ffi_types, ./alloc
 
 # ---------------------------------------------------------------------------
 # Host function pointer
@@ -171,3 +171,96 @@ proc pendingCount*(tbl: var FFIPendingTable): int {.raises: [].} =
   withLock tbl.lock:
     n = tbl.pending.len
   return n
+
+# ---------------------------------------------------------------------------
+# Cross-thread completion queue
+# ---------------------------------------------------------------------------
+#
+# `_host_complete` runs on the host's thread, but a chronos `Future` can
+# only be completed on the FFI (event-loop) thread. So the host's answer is
+# parked here and drained on the FFI thread. The producer side is **GC-free** —
+# node and payload are `c_malloc`'d (ffiCMalloc / ffiCAllocArray) so no Nim GC
+# runs on the foreign thread — mirroring how the rest of the boundary allocates.
+
+type
+  CompletionNode = object
+    token: uint64
+    ret: cint
+    buf: ptr UncheckedArray[byte] ## c_malloc'd copy of the host payload (or nil)
+    bufLen: int
+    next: ptr CompletionNode
+
+  FFICompletionQueue* = object
+    lock: Lock
+    head: ptr CompletionNode
+    tail: ptr CompletionNode
+
+proc initCompletionQueue*(q: var FFICompletionQueue) =
+  q.lock.initLock()
+  q.head = nil
+  q.tail = nil
+
+proc pushCompletion*(
+    q: var FFICompletionQueue, token: uint64, ret: cint, msg: ptr cchar, len: csize_t
+) {.raises: [].} =
+  ## Enqueue one host answer. Safe to call from **any** thread; allocates only
+  ## via c_malloc so it never touches the Nim GC on a foreign thread. The FFI
+  ## thread copies the payload into a `seq[byte]` and frees the node on drain.
+  let node = ffiCMalloc(CompletionNode)
+  node.token = token
+  node.ret = ret
+  # nil `msg` => empty payload: drain copies `bufLen` bytes out of `buf`.
+  node.bufLen = if msg.isNil(): 0 else: int(len)
+  node.next = nil
+  node.buf = nil
+  if node.bufLen > 0:
+    node.buf = ffiCAllocArray(byte, node.bufLen)
+    copyMem(node.buf, msg, node.bufLen)
+  withLock q.lock:
+    if q.tail.isNil():
+      q.head = node
+    else:
+      q.tail.next = node
+    q.tail = node
+
+proc drainCompletions*(
+    q: var FFICompletionQueue, tbl: var FFIPendingTable
+): int {.discardable.} =
+  ## FFI-thread only. Detaches the whole queue, then for each entry resolves the
+  ## pending future by token (copying the payload into GC memory here, on the FFI
+  ## thread) and frees the c_malloc'd node. Returns the number drained.
+  var head: ptr CompletionNode = nil
+  withLock q.lock:
+    head = q.head
+    q.head = nil
+    q.tail = nil
+
+  var n = 0
+  while not head.isNil():
+    let node = head
+    head = node.next
+    var b = newSeq[byte](node.bufLen)
+    if node.bufLen > 0:
+      copyMem(addr b[0], node.buf, node.bufLen)
+    discard completePending(tbl, node.token, HostResult(ret: node.ret, bytes: b))
+    if not node.buf.isNil():
+      ffiCFree(node.buf)
+    ffiCFree(node)
+    inc n
+  return n
+
+proc deinitCompletionQueue*(q: var FFICompletionQueue) =
+  ## Frees any still-queued nodes (their futures are handled separately by
+  ## `failAllPending` on teardown) and releases the lock.
+  var head: ptr CompletionNode = nil
+  withLock q.lock:
+    head = q.head
+    q.head = nil
+    q.tail = nil
+  while not head.isNil():
+    let node = head
+    head = node.next
+    if not node.buf.isNil():
+      ffiCFree(node.buf)
+    ffiCFree(node)
+  q.lock.deinitLock()
diff --git a/tests/unit/test_host_callbacks.nim b/tests/unit/test_host_callbacks.nim
index 5b5a0df..02ac5e6 100644
--- a/tests/unit/test_host_callbacks.nim
+++ b/tests/unit/test_host_callbacks.nim
@@ -105,3 +105,69 @@ suite "FFIPendingTable":
     check r.ret == RET_ERR
     check bytesToStr(r.bytes) == "context shutting down"
     check tbl.pendingCount == 0
+
+# `pushCompletion` takes the raw (msg, len) a host hands across the C ABI.
+proc pushStr(q: var FFICompletionQueue, token: uint64, ret: cint, s: string) =
+  if s.len == 0:
+    pushCompletion(q, token, ret, nil, 0)
+  else:
+    pushCompletion(q, token, ret, cast[ptr cchar](unsafeAddr s[0]), csize_t(s.len))
+
+suite "FFICompletionQueue":
+  test "drain resolves pending futures by token, in FIFO order":
+    var tbl: FFIPendingTable
+    var q: FFICompletionQueue
+    initPendingTable(tbl)
+    initCompletionQueue(q)
+    defer:
+      deinitPendingTable(tbl)
+      deinitCompletionQueue(q)
+
+    let a = newPending(tbl) # token 1
+    let b = newPending(tbl) # token 2
+    pushStr(q, a.token, RET_OK, "alpha")
+    pushStr(q, b.token, RET_ERR, "boom")
+
+    check drainCompletions(q, tbl) == 2
+    check bytesToStr(waitFor(a.fut).bytes) == "alpha"
+    check waitFor(a.fut).ret == RET_OK
+    check bytesToStr(waitFor(b.fut).bytes) == "boom"
+    check waitFor(b.fut).ret == RET_ERR
+    check tbl.pendingCount == 0
+
+  test "empty payload and empty queue drain cleanly":
+    var tbl: FFIPendingTable
+    var q: FFICompletionQueue
+    initPendingTable(tbl)
+    initCompletionQueue(q)
+    defer:
+      deinitPendingTable(tbl)
+      deinitCompletionQueue(q)
+    check drainCompletions(q, tbl) == 0 # nothing queued
+    let p = newPending(tbl)
+    pushStr(q, p.token, RET_OK, "") # empty (nil buf) payload
+    check drainCompletions(q, tbl) == 1
+    check waitFor(p.fut).bytes.len == 0
+
+  test "completion for an unknown token is drained and dropped":
+    var tbl: FFIPendingTable
+    var q: FFICompletionQueue
+    initPendingTable(tbl)
+    initCompletionQueue(q)
+    defer:
+      deinitPendingTable(tbl)
+      deinitCompletionQueue(q)
+    pushStr(q, 999'u64, RET_OK, "orphan") # no pending future for this token
+    check drainCompletions(q, tbl) == 1 # drained (and its buffer freed)
+
+  test "deinit frees still-queued nodes without draining":
+    var tbl: FFIPendingTable
+    var q: FFICompletionQueue
+    initPendingTable(tbl)
+    initCompletionQueue(q)
+    let p = newPending(tbl)
+    pushStr(q, p.token, RET_OK, "leftover")
+    failAllPending(tbl, "shutdown") # the future is settled separately
+    deinitPendingTable(tbl)
+    deinitCompletionQueue(q) # must free the queued node, no leak/crash
+    check waitFor(p.fut).ret == RET_ERR