mirror of
https://github.com/logos-messaging/nim-ffi.git
synced 2026-06-21 00:40:16 +00:00
feat(host): FFIContext completion bridge for {.ffiHost.}
Increment 2: wires the host-call machinery into the running FFI thread so a host answer (delivered from any thread) resolves the chronos Future an awaiting handler is blocked on. - FFICompletionQueue (ffi_host.nim): a GC-free intrusive queue. host_complete pushes c_malloc'd nodes from any thread; the FFI thread drains, copies the payload into GC memory, completes the future by token, and frees the node. - FFIContext gains hostRegistry / pendingTable / completionQueue, init'd and deinit'd alongside the event registry. - completeHostCall parks the answer and fires the EXISTING reqSignal — no second ThreadSignalPtr needed; the loop drains completions every iteration, on the loop thread (chronos single-thread invariant). - On shutdown the loop calls failAllPending first, so a handler awaiting a host answer that never arrives can't hang the allFutures(pending) drain. 4 new queue unit tests (10 total) pass under orc+refc; the 19 ffi_context integration tests stay green. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
c0013b3a8f
commit
c90200de2b
@ -3,10 +3,10 @@
|
||||
import std/[atomics, locks, json, tables]
|
||||
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
|
||||
import
|
||||
./ffi_types, ./ffi_events, ./ffi_thread_request, ./internal/ffi_macro, ./logging,
|
||||
./cbor_serial
|
||||
./ffi_types, ./ffi_events, ./ffi_host, ./ffi_thread_request,
|
||||
./internal/ffi_macro, ./logging, ./cbor_serial
|
||||
|
||||
export ffi_events
|
||||
export ffi_events, ffi_host
|
||||
|
||||
type FFIContext*[T] = object
|
||||
myLib*: ptr T
|
||||
@ -28,6 +28,12 @@ type FFIContext*[T] = object
|
||||
# blocked event loop cannot hang the caller forever
|
||||
userData*: pointer
|
||||
eventRegistry*: FFIEventRegistry
|
||||
hostRegistry*: FFIHostRegistry
|
||||
# host-provided functions a {.ffiHost.} proc dispatches to (roadmap #1)
|
||||
pendingTable*: FFIPendingTable
|
||||
# in-flight {.ffiHost.} calls: token -> the chronos Future being awaited
|
||||
completionQueue: FFICompletionQueue
|
||||
# host answers parked from any thread, drained + completed on the FFI thread
|
||||
running: Atomic[bool] # To control when the threads are running
|
||||
registeredRequests: ptr Table[cstring, FFIRequestProc]
|
||||
# Pointer to the table holding the requests registered at compile time
|
||||
@ -86,6 +92,17 @@ proc sendRequestToFFIThread*(
|
||||
## process proc.
|
||||
return ok()
|
||||
|
||||
proc completeHostCall*[T](
    ctx: ptr FFIContext[T], token: uint64, ret: cint, msg: ptr cchar, len: csize_t
) {.raises: [].} =
  ## Implementation behind `<lib>_host_complete`: the host uses it to deliver
  ## the answer to the `{.ffiHost.}` call identified by `token`.
  ##
  ## May be called from ANY thread. Nothing is completed here: the answer
  ## (`ret` plus the `len` bytes at `msg`) is only parked, GC-free, on the
  ## context's completion queue, and the existing `reqSignal` is fired to wake
  ## the FFI loop. The awaited future is completed later, on the FFI thread,
  ## when the loop drains that queue. An answer whose token has no pending
  ## call (late / double completion) is drained and dropped, never a crash.
  ctx[].completionQueue.pushCompletion(token, ret, msg, len)
  # The outcome of the wake-up is intentionally ignored.
  discard fireSync(ctx.reqSignal)
|
||||
|
||||
type Foo = object
|
||||
registerReqFFI(WatchdogReq, foo: ptr Foo):
|
||||
proc(): Future[Result[string, string]] {.async.} =
|
||||
@ -242,6 +259,12 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
reapCompleted()
|
||||
|
||||
let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds)
|
||||
|
||||
## Drain host-call answers every iteration (reqSignal is fired by
|
||||
## completeHostCall too): completing each awaited Future here, on the loop
|
||||
## thread, satisfies chronos's single-thread invariant.
|
||||
drainCompletions(ctx[].completionQueue, ctx[].pendingTable)
|
||||
|
||||
if not gotSignal:
|
||||
continue
|
||||
|
||||
@ -264,6 +287,12 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
## before we exit. Without this, abandoning a future mid-await would
|
||||
## leak the request allocations (visible to LSan; previously hidden
|
||||
## because Nim's pool allocator kept the chunks alive in the process).
|
||||
##
|
||||
## Fail every outstanding {.ffiHost.} call first: a handler awaiting a host
|
||||
## answer that never arrives would otherwise make `allFutures(pending)` hang
|
||||
## forever. Then drain any answer that raced in during shutdown.
|
||||
failAllPending(ctx[].pendingTable, "FFIContext shutting down")
|
||||
drainCompletions(ctx[].completionQueue, ctx[].pendingTable)
|
||||
reapCompleted()
|
||||
if pending.len > 0:
|
||||
try:
|
||||
@ -280,6 +309,9 @@ proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
freeShared(ctx)
|
||||
ctx.lock.deinitLock()
|
||||
deinitEventRegistry(ctx[].eventRegistry)
|
||||
deinitHostRegistry(ctx[].hostRegistry)
|
||||
deinitPendingTable(ctx[].pendingTable)
|
||||
deinitCompletionQueue(ctx[].completionQueue)
|
||||
when defined(gcRefc):
|
||||
## ThreadSignalPtr.close() is intentionally skipped under --mm:refc.
|
||||
##
|
||||
@ -318,6 +350,9 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
## is responsible for releasing the slot (freeShared or pool.releaseSlot).
|
||||
ctx.lock.initLock()
|
||||
initEventRegistry(ctx[].eventRegistry)
|
||||
initHostRegistry(ctx[].hostRegistry)
|
||||
initPendingTable(ctx[].pendingTable)
|
||||
initCompletionQueue(ctx[].completionQueue)
|
||||
|
||||
var success = false
|
||||
defer:
|
||||
|
||||
@ -19,7 +19,7 @@
|
||||
|
||||
import std/[locks, tables]
|
||||
import chronos
|
||||
import ./ffi_types
|
||||
import ./ffi_types, ./alloc
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Host function pointer
|
||||
@ -171,3 +171,96 @@ proc pendingCount*(tbl: var FFIPendingTable): int {.raises: [].} =
|
||||
withLock tbl.lock:
|
||||
n = tbl.pending.len
|
||||
return n
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Cross-thread completion queue
|
||||
# ---------------------------------------------------------------------------
|
||||
#
|
||||
# `<lib>_host_complete` runs on the host's thread, but a chronos `Future` can
|
||||
# only be completed on the FFI (event-loop) thread. So the host's answer is
|
||||
# parked here and drained on the FFI thread. The producer side is **GC-free** —
|
||||
# node and payload are `c_malloc`'d (ffiCMalloc / ffiCAllocArray) so no Nim GC
|
||||
# runs on the foreign thread — mirroring how the rest of the boundary allocates.
|
||||
|
||||
type CompletionNode = object
  ## One parked host answer; a link in the queue's singly linked list.
  token: uint64 ## token of the pending call this answer is for
  ret: cint ## return code handed over by the host
  buf: ptr UncheckedArray[byte] ## c_malloc'd copy of the host payload (or nil)
  bufLen: int ## payload length in bytes
  next: ptr CompletionNode ## following node; nil on the last one

type FFICompletionQueue* = object
  ## Lock-protected FIFO of `CompletionNode`s: producers append at `tail`,
  ## the consumer detaches the list starting at `head`.
  lock: Lock ## guards `head` and `tail`
  head: ptr CompletionNode ## oldest queued node, nil when the queue is empty
  tail: ptr CompletionNode ## newest queued node, nil when the queue is empty
|
||||
|
||||
proc initCompletionQueue*(q: var FFICompletionQueue) =
  ## Puts `q` into its empty state: no nodes linked and a freshly
  ## initialised lock.
  q.head = nil
  q.tail = nil
  initLock(q.lock)
|
||||
|
||||
proc pushCompletion*(
    q: var FFICompletionQueue, token: uint64, ret: cint, msg: ptr cchar, len: csize_t
) {.raises: [].} =
  ## Enqueue one host answer. Safe to call from **any** thread; allocates only
  ## through `ffiCMalloc` / `ffiCAllocArray` so it never touches the Nim GC on
  ## a foreign thread. The FFI thread copies the payload into a `seq[byte]`
  ## and frees the node on drain.
  ##
  ## * `token` - identifies the pending call the answer belongs to.
  ## * `ret`   - return code reported by the host.
  ## * `msg`   - payload bytes; may be nil.
  ## * `len`   - number of bytes at `msg`. Ignored when `msg` is nil.
  ##
  ## The payload is copied, so the caller keeps ownership of `msg`.
  let node = ffiCMalloc(CompletionNode)
  node.token = token
  node.ret = ret
  node.next = nil
  if len > 0'u and not msg.isNil():
    let payloadLen = int(len)
    node.buf = ffiCAllocArray(byte, payloadLen)
    node.bufLen = payloadLen
    copyMem(node.buf, msg, payloadLen)
  else:
    # Nothing to copy. `bufLen` must be 0 here even when the host passed a
    # non-zero `len` together with a nil `msg`: the drain side copies `bufLen`
    # bytes out of `buf`, so a non-zero length paired with a nil buffer would
    # make it read through a nil pointer.
    node.buf = nil
    node.bufLen = 0
  withLock q.lock:
    # Append at the tail; an empty queue also needs its head set.
    if q.tail.isNil():
      q.head = node
    else:
      q.tail.next = node
    q.tail = node
|
||||
|
||||
proc drainCompletions*(
    q: var FFICompletionQueue, tbl: var FFIPendingTable
): int {.discardable.} =
  ## FFI-thread only. Takes every queued answer off `q` in one step, then
  ## walks them oldest-first: the payload is copied into a GC-managed
  ## `seq[byte]`, the pending entry in `tbl` is completed by token, and the
  ## c_malloc'd node (plus its buffer) is released.
  ##
  ## Returns how many answers were drained, whether or not `tbl` still had a
  ## pending entry for their token.
  var cursor: ptr CompletionNode
  withLock q.lock:
    # Detach the whole list so the lock is not held while completing.
    cursor = q.head
    q.head = nil
    q.tail = nil

  while cursor != nil:
    let current = cursor
    cursor = current.next
    var payload = newSeq[byte](current.bufLen)
    if current.bufLen > 0:
      copyMem(addr payload[0], current.buf, current.bufLen)
    # The result is ignored: an answer for an unknown token is simply dropped.
    discard
      tbl.completePending(current.token, HostResult(ret: current.ret, bytes: payload))
    if current.buf != nil:
      ffiCFree(current.buf)
    ffiCFree(current)
    inc result
|
||||
|
||||
proc deinitCompletionQueue*(q: var FFICompletionQueue) =
  ## Tears `q` down: releases every node still queued (and its payload
  ## buffer) without completing anything, then destroys the lock. The futures
  ## those answers were meant for are settled separately, by `failAllPending`
  ## on teardown.
  var remaining: ptr CompletionNode
  withLock q.lock:
    remaining = q.head
    q.head = nil
    q.tail = nil
  while remaining != nil:
    let doomed = remaining
    remaining = doomed.next
    if doomed.buf != nil:
      ffiCFree(doomed.buf)
    ffiCFree(doomed)
  deinitLock(q.lock)
|
||||
|
||||
@ -105,3 +105,69 @@ suite "FFIPendingTable":
|
||||
check r.ret == RET_ERR
|
||||
check bytesToStr(r.bytes) == "context shutting down"
|
||||
check tbl.pendingCount == 0
|
||||
|
||||
# `pushCompletion` takes the raw (msg, len) a host hands across the C ABI.
|
||||
proc pushStr(q: var FFICompletionQueue, token: uint64, ret: cint, s: string) =
  ## Test helper: enqueues `s` as the payload of a completion for `token`.
  ## An empty string is pushed as a nil pointer with length 0.
  if s.len > 0:
    let raw = cast[ptr cchar](unsafeAddr s[0])
    q.pushCompletion(token, ret, raw, csize_t(s.len))
  else:
    q.pushCompletion(token, ret, nil, 0)
|
||||
|
||||
suite "FFICompletionQueue":
  test "drain resolves pending futures by token, in FIFO order":
    var
      pendings: FFIPendingTable
      queue: FFICompletionQueue
    initPendingTable(pendings)
    initCompletionQueue(queue)
    defer:
      deinitPendingTable(pendings)
      deinitCompletionQueue(queue)

    let first = newPending(pendings) # token 1
    let second = newPending(pendings) # token 2
    queue.pushStr(first.token, RET_OK, "alpha")
    queue.pushStr(second.token, RET_ERR, "boom")

    check drainCompletions(queue, pendings) == 2

    # Each future must carry exactly the answer pushed for its own token.
    let firstAnswer = waitFor(first.fut)
    check bytesToStr(firstAnswer.bytes) == "alpha"
    check firstAnswer.ret == RET_OK
    let secondAnswer = waitFor(second.fut)
    check bytesToStr(secondAnswer.bytes) == "boom"
    check secondAnswer.ret == RET_ERR
    check pendings.pendingCount == 0

  test "empty payload and empty queue drain cleanly":
    var
      pendings: FFIPendingTable
      queue: FFICompletionQueue
    initPendingTable(pendings)
    initCompletionQueue(queue)
    defer:
      deinitPendingTable(pendings)
      deinitCompletionQueue(queue)

    check drainCompletions(queue, pendings) == 0 # nothing queued

    let pendingCall = newPending(pendings)
    queue.pushStr(pendingCall.token, RET_OK, "") # empty (nil buf) payload
    check drainCompletions(queue, pendings) == 1
    let answer = waitFor(pendingCall.fut)
    check answer.bytes.len == 0

  test "completion for an unknown token is drained and dropped":
    var
      pendings: FFIPendingTable
      queue: FFICompletionQueue
    initPendingTable(pendings)
    initCompletionQueue(queue)
    defer:
      deinitPendingTable(pendings)
      deinitCompletionQueue(queue)

    # No pending future was ever created for this token.
    queue.pushStr(999'u64, RET_OK, "orphan")
    check drainCompletions(queue, pendings) == 1 # drained (and its buffer freed)

  test "deinit frees still-queued nodes without draining":
    var
      pendings: FFIPendingTable
      queue: FFICompletionQueue
    initPendingTable(pendings)
    initCompletionQueue(queue)

    let pendingCall = newPending(pendings)
    queue.pushStr(pendingCall.token, RET_OK, "leftover")
    failAllPending(pendings, "shutdown") # the future is settled separately
    deinitPendingTable(pendings)
    deinitCompletionQueue(queue) # must free the queued node, no leak/crash
    check waitFor(pendingCall.fut).ret == RET_ERR
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user