mirror of
https://github.com/logos-messaging/nim-ffi.git
synced 2026-06-20 08:19:55 +00:00
refactor(host): rename the host-call token to callId
"token" was overloaded (auth tokens, cgo handles, lexer tokens) and didn't say
what it is — a per-call correlation id linking an outgoing {.ffiHost.} call to
the answer that arrives later (possibly from another thread). Renamed across the
runtime (ffi_host / ffi_context), the macro, the exported C ABI (FFIHostFn,
<lib>_host_complete), the Go trampoline, and the tests; regenerated bindings.
The unrelated request-path cgo.Handle result-slot (also informally called a
"token" in go.nim comments) is left as-is — different mechanism.
16 host unit tests + the examples/host_demo Go round-trip stay green.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
3240ac0080
commit
eb62813af5
@ -10,7 +10,7 @@ package host_demo
|
||||
#include <pthread.h>
|
||||
|
||||
extern void host_demoGoEvent(int ret, char* msg, size_t len, void* userData);
|
||||
extern void host_demoHostTrampoline(uint64_t token, char* req, size_t reqLen, void* userData);
|
||||
extern void host_demoHostTrampoline(uint64_t callId, char* req, size_t reqLen, void* userData);
|
||||
static int host_demoRegisterHost(void* ctx, const char* name, void* ud) {
|
||||
return host_demo_register_host_fn(ctx, name, (FFIHostFn)host_demoHostTrampoline, ud);
|
||||
}
|
||||
@ -117,7 +117,7 @@ type hostEntry struct {
|
||||
}
|
||||
|
||||
//export host_demoHostTrampoline
|
||||
func host_demoHostTrampoline(token C.uint64_t, req *C.char, reqLen C.size_t, userData unsafe.Pointer) {
|
||||
func host_demoHostTrampoline(callId C.uint64_t, req *C.char, reqLen C.size_t, userData unsafe.Pointer) {
|
||||
e := cgo.Handle(uintptr(userData)).Value().(hostEntry)
|
||||
reqStr := C.GoStringN(req, C.int(reqLen))
|
||||
go func() {
|
||||
@ -125,11 +125,11 @@ func host_demoHostTrampoline(token C.uint64_t, req *C.char, reqLen C.size_t, use
|
||||
if err != nil {
|
||||
msg := err.Error()
|
||||
cmsg := C.CString(msg)
|
||||
C.host_demo_host_complete(e.ctx, token, C.int(C.RET_ERR), cmsg, C.size_t(len(msg)))
|
||||
C.host_demo_host_complete(e.ctx, callId, C.int(C.RET_ERR), cmsg, C.size_t(len(msg)))
|
||||
C.free(unsafe.Pointer(cmsg))
|
||||
} else {
|
||||
cmsg := C.CString(res)
|
||||
C.host_demo_host_complete(e.ctx, token, C.int(C.RET_OK), cmsg, C.size_t(len(res)))
|
||||
C.host_demo_host_complete(e.ctx, callId, C.int(C.RET_OK), cmsg, C.size_t(len(res)))
|
||||
C.free(unsafe.Pointer(cmsg))
|
||||
}
|
||||
}()
|
||||
|
||||
@ -46,10 +46,10 @@ int host_demo_remove_event_listener(void *ctx, uint64_t listenerId);
|
||||
// --- host callbacks ({.ffiHost.}) — host-implemented functions --------
|
||||
#ifndef NIM_FFI_HOST_FN_T
|
||||
#define NIM_FFI_HOST_FN_T
|
||||
typedef void (*FFIHostFn)(uint64_t token, const char *req, size_t reqLen, void *userData);
|
||||
typedef void (*FFIHostFn)(uint64_t callId, const char *req, size_t reqLen, void *userData);
|
||||
#endif
|
||||
int host_demo_register_host_fn(void *ctx, const char *name, FFIHostFn fn, void *userData);
|
||||
int host_demo_host_complete(void *ctx, uint64_t token, int ret, const char *msg, size_t len);
|
||||
int host_demo_host_complete(void *ctx, uint64_t callId, int ret, const char *msg, size_t len);
|
||||
|
||||
#ifdef __cplusplus
|
||||
} // extern "C"
|
||||
|
||||
@ -117,10 +117,10 @@ int my_timer_remove_event_listener(void *ctx, uint64_t listenerId);
|
||||
// --- host callbacks ({.ffiHost.}) — host-implemented functions --------
|
||||
#ifndef NIM_FFI_HOST_FN_T
|
||||
#define NIM_FFI_HOST_FN_T
|
||||
typedef void (*FFIHostFn)(uint64_t token, const char *req, size_t reqLen, void *userData);
|
||||
typedef void (*FFIHostFn)(uint64_t callId, const char *req, size_t reqLen, void *userData);
|
||||
#endif
|
||||
int my_timer_register_host_fn(void *ctx, const char *name, FFIHostFn fn, void *userData);
|
||||
int my_timer_host_complete(void *ctx, uint64_t token, int ret, const char *msg, size_t len);
|
||||
int my_timer_host_complete(void *ctx, uint64_t callId, int ret, const char *msg, size_t len);
|
||||
|
||||
#ifdef __cplusplus
|
||||
} // extern "C"
|
||||
|
||||
@ -117,10 +117,10 @@ int my_timer_remove_event_listener(void *ctx, uint64_t listenerId);
|
||||
// --- host callbacks ({.ffiHost.}) — host-implemented functions --------
|
||||
#ifndef NIM_FFI_HOST_FN_T
|
||||
#define NIM_FFI_HOST_FN_T
|
||||
typedef void (*FFIHostFn)(uint64_t token, const char *req, size_t reqLen, void *userData);
|
||||
typedef void (*FFIHostFn)(uint64_t callId, const char *req, size_t reqLen, void *userData);
|
||||
#endif
|
||||
int my_timer_register_host_fn(void *ctx, const char *name, FFIHostFn fn, void *userData);
|
||||
int my_timer_host_complete(void *ctx, uint64_t token, int ret, const char *msg, size_t len);
|
||||
int my_timer_host_complete(void *ctx, uint64_t callId, int ret, const char *msg, size_t len);
|
||||
|
||||
#ifdef __cplusplus
|
||||
} // extern "C"
|
||||
|
||||
@ -177,7 +177,7 @@ proc generateCHeader*(
|
||||
lines.add("")
|
||||
|
||||
# Host callbacks ({.ffiHost.}): the host registers an implementation, the
|
||||
# library invokes it with a token + raw request, and the host answers by token
|
||||
# library invokes it with a callId + raw request, and the host answers by callId
|
||||
# (from any thread) via host_complete. Always exported, like the event ABI.
|
||||
lines.add(
|
||||
"// --- host callbacks ({.ffiHost.}) — host-implemented functions --------"
|
||||
@ -185,7 +185,7 @@ proc generateCHeader*(
|
||||
lines.add("#ifndef NIM_FFI_HOST_FN_T")
|
||||
lines.add("#define NIM_FFI_HOST_FN_T")
|
||||
lines.add(
|
||||
"typedef void (*FFIHostFn)(uint64_t token, const char *req, size_t reqLen, void *userData);"
|
||||
"typedef void (*FFIHostFn)(uint64_t callId, const char *req, size_t reqLen, void *userData);"
|
||||
)
|
||||
lines.add("#endif")
|
||||
lines.add(
|
||||
@ -194,7 +194,7 @@ proc generateCHeader*(
|
||||
)
|
||||
lines.add(
|
||||
"int " & libName &
|
||||
"_host_complete(void *ctx, uint64_t token, int ret, const char *msg, size_t len);"
|
||||
"_host_complete(void *ctx, uint64_t callId, int ret, const char *msg, size_t len);"
|
||||
)
|
||||
lines.add("")
|
||||
lines.add("#ifdef __cplusplus")
|
||||
|
||||
@ -377,7 +377,7 @@ proc generateGoFile*(
|
||||
if hosts.len > 0:
|
||||
L.add(
|
||||
"extern void " & libName &
|
||||
"HostTrampoline(uint64_t token, char* req, size_t reqLen, void* userData);"
|
||||
"HostTrampoline(uint64_t callId, char* req, size_t reqLen, void* userData);"
|
||||
)
|
||||
L.add(
|
||||
"static int " & libName &
|
||||
@ -582,7 +582,7 @@ proc generateGoFile*(
|
||||
# ---- host callbacks ({.ffiHost.}) ----------------------------------------
|
||||
# One exported trampoline serves all host fns; the cgo.Handle in userData
|
||||
# selects which Go closure. The closure runs on a fresh goroutine so the FFI
|
||||
# thread is never blocked (the non-blocking contract), then answers by token.
|
||||
# thread is never blocked (the non-blocking contract), then answers by callId.
|
||||
if hosts.len > 0:
|
||||
L.add("type hostEntry struct {")
|
||||
L.add("\tctx unsafe.Pointer")
|
||||
@ -592,7 +592,7 @@ proc generateGoFile*(
|
||||
L.add("//export " & libName & "HostTrampoline")
|
||||
L.add(
|
||||
"func " & libName &
|
||||
"HostTrampoline(token C.uint64_t, req *C.char, reqLen C.size_t, userData unsafe.Pointer) {"
|
||||
"HostTrampoline(callId C.uint64_t, req *C.char, reqLen C.size_t, userData unsafe.Pointer) {"
|
||||
)
|
||||
L.add("\te := cgo.Handle(uintptr(userData)).Value().(hostEntry)")
|
||||
L.add("\treqStr := C.GoStringN(req, C.int(reqLen))")
|
||||
@ -603,14 +603,14 @@ proc generateGoFile*(
|
||||
L.add("\t\t\tcmsg := C.CString(msg)")
|
||||
L.add(
|
||||
"\t\t\tC." & libName &
|
||||
"_host_complete(e.ctx, token, C.int(C.RET_ERR), cmsg, C.size_t(len(msg)))"
|
||||
"_host_complete(e.ctx, callId, C.int(C.RET_ERR), cmsg, C.size_t(len(msg)))"
|
||||
)
|
||||
L.add("\t\t\tC.free(unsafe.Pointer(cmsg))")
|
||||
L.add("\t\t} else {")
|
||||
L.add("\t\t\tcmsg := C.CString(res)")
|
||||
L.add(
|
||||
"\t\t\tC." & libName &
|
||||
"_host_complete(e.ctx, token, C.int(C.RET_OK), cmsg, C.size_t(len(res)))"
|
||||
"_host_complete(e.ctx, callId, C.int(C.RET_OK), cmsg, C.size_t(len(res)))"
|
||||
)
|
||||
L.add("\t\t\tC.free(unsafe.Pointer(cmsg))")
|
||||
L.add("\t\t}")
|
||||
|
||||
@ -31,7 +31,7 @@ type FFIContext*[T] = object
|
||||
hostRegistry*: FFIHostRegistry
|
||||
# host-provided functions a {.ffiHost.} proc dispatches to (roadmap #1)
|
||||
pendingTable*: FFIPendingTable
|
||||
# in-flight {.ffiHost.} calls: token -> the chronos Future being awaited
|
||||
# in-flight {.ffiHost.} calls: callId -> the chronos Future being awaited
|
||||
completionQueue: FFICompletionQueue
|
||||
# host answers parked from any thread, drained + completed on the FFI thread
|
||||
running: Atomic[bool] # To control when the threads are running
|
||||
@ -93,14 +93,14 @@ proc sendRequestToFFIThread*(
|
||||
return ok()
|
||||
|
||||
proc completeHostCall*[T](
|
||||
ctx: ptr FFIContext[T], token: uint64, ret: cint, msg: ptr cchar, len: csize_t
|
||||
ctx: ptr FFIContext[T], callId: uint64, ret: cint, msg: ptr cchar, len: csize_t
|
||||
) {.raises: [].} =
|
||||
## Backs `<lib>_host_complete`: the host delivers a `{.ffiHost.}` answer by
|
||||
## token. Callable from ANY thread — it only parks the result (GC-free) and
|
||||
## callId. Callable from ANY thread — it only parks the result (GC-free) and
|
||||
## wakes the FFI loop via the existing `reqSignal`; the future is completed on
|
||||
## the FFI thread when the loop drains the queue. A token with no pending call
|
||||
## the FFI thread when the loop drains the queue. A callId with no pending call
|
||||
## (late / double completion) is drained and dropped, never a crash.
|
||||
pushCompletion(ctx[].completionQueue, token, ret, msg, len)
|
||||
pushCompletion(ctx[].completionQueue, callId, ret, msg, len)
|
||||
discard ctx.reqSignal.fireSync()
|
||||
|
||||
type Foo = object
|
||||
|
||||
@ -8,9 +8,9 @@
|
||||
## 1. `FFIHostRegistry` — maps a wire name (e.g. "fetch_profile") to the host's
|
||||
## registered function pointer + userData. A missing entry is a normal,
|
||||
## non-fatal outcome (the imported proc resolves to an error), never a crash.
|
||||
## 2. `FFIPendingTable` — maps a monotonic `token` to the chronos `Future` an
|
||||
## 2. `FFIPendingTable` — maps a monotonic `callId` to the chronos `Future` an
|
||||
## awaiting `{.ffiHost.}` proc is blocked on. The host answers later (on any
|
||||
## thread) by `token`; the FFI thread drains and completes the future.
|
||||
## thread) by `callId`; the FFI thread drains and completes the future.
|
||||
##
|
||||
## Both structures are lock-guarded so a host thread (registering / completing)
|
||||
## and the FFI thread (looking up / completing) can touch them concurrently.
|
||||
@ -26,14 +26,14 @@ import ./ffi_types, ./alloc
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
type FFIHostFn* = proc(
|
||||
token: uint64, req: ptr cchar, reqLen: csize_t, userData: pointer
|
||||
callId: uint64, req: ptr cchar, reqLen: csize_t, userData: pointer
|
||||
) {.cdecl, gcsafe, raises: [].}
|
||||
## A host-implemented function. `req`/`reqLen` carry the marshaled request
|
||||
## (valid only for the duration of the call — the host copies what it needs).
|
||||
## The host answers asynchronously via `<lib>_host_complete(ctx, token, …)`.
|
||||
## The host answers asynchronously via `<lib>_host_complete(ctx, callId, …)`.
|
||||
|
||||
type HostResult* = object
|
||||
## The raw outcome the host delivered for one token: a return code plus the
|
||||
## The raw outcome the host delivered for one callId: a return code plus the
|
||||
## response bytes (native POD or CBOR — decoded by the awaiting proc).
|
||||
ret*: cint
|
||||
bytes*: seq[byte]
|
||||
@ -118,7 +118,7 @@ proc clearHostFns*(reg: var FFIHostRegistry) {.raises: [].} =
|
||||
|
||||
type FFIPendingTable* = object
|
||||
lock: Lock
|
||||
nextToken: uint64 ## Monotonic; 0 is reserved as "invalid", tokens start at 1.
|
||||
nextCallId: uint64 ## Monotonic; 0 is reserved as "invalid", callIds start at 1.
|
||||
pending: Table[uint64, Future[HostResult]]
|
||||
|
||||
# Set by the FFI thread at startup (see ffi_context.ffiThreadBody) so the body a
|
||||
@ -129,39 +129,39 @@ var ffiCurrentPendingTable* {.threadvar.}: ptr FFIPendingTable
|
||||
|
||||
proc initPendingTable*(tbl: var FFIPendingTable) =
|
||||
tbl.lock.initLock()
|
||||
tbl.nextToken = 0'u64
|
||||
tbl.nextCallId = 0'u64
|
||||
tbl.pending = initTable[uint64, Future[HostResult]]()
|
||||
|
||||
proc deinitPendingTable*(tbl: var FFIPendingTable) =
|
||||
tbl.lock.deinitLock()
|
||||
tbl.pending = default(Table[uint64, Future[HostResult]])
|
||||
tbl.nextToken = 0'u64
|
||||
tbl.nextCallId = 0'u64
|
||||
|
||||
proc newPending*(
|
||||
tbl: var FFIPendingTable
|
||||
): tuple[token: uint64, fut: Future[HostResult]] =
|
||||
## Allocates a token and registers a fresh, uncompleted future under it. The
|
||||
## `{.ffiHost.}` proc awaits the returned future; the host answers by token.
|
||||
): tuple[callId: uint64, fut: Future[HostResult]] =
|
||||
## Allocates a callId and registers a fresh, uncompleted future under it. The
|
||||
## `{.ffiHost.}` proc awaits the returned future; the host answers by callId.
|
||||
let fut = newFuture[HostResult]("ffiHostCall")
|
||||
var assigned: uint64 = 0
|
||||
withLock tbl.lock:
|
||||
tbl.nextToken.inc()
|
||||
assigned = tbl.nextToken
|
||||
tbl.nextCallId.inc()
|
||||
assigned = tbl.nextCallId
|
||||
tbl.pending[assigned] = fut
|
||||
return (assigned, fut)
|
||||
|
||||
proc completePending*(
|
||||
tbl: var FFIPendingTable, token: uint64, res: HostResult
|
||||
tbl: var FFIPendingTable, callId: uint64, res: HostResult
|
||||
): bool =
|
||||
## Completes and removes the future for `token`. Returns false for an unknown
|
||||
## or already-completed token — a late / double completion is dropped, not a
|
||||
## Completes and removes the future for `callId`. Returns false for an unknown
|
||||
## or already-completed callId — a late / double completion is dropped, not a
|
||||
## crash. MUST be called on the FFI (event-loop) thread: it touches the
|
||||
## chronos future.
|
||||
var fut: Future[HostResult] = nil
|
||||
withLock tbl.lock:
|
||||
if tbl.pending.hasKey(token):
|
||||
fut = tbl.pending.getOrDefault(token)
|
||||
tbl.pending.del(token)
|
||||
if tbl.pending.hasKey(callId):
|
||||
fut = tbl.pending.getOrDefault(callId)
|
||||
tbl.pending.del(callId)
|
||||
if fut.isNil() or fut.finished():
|
||||
return false
|
||||
fut.complete(res)
|
||||
@ -198,7 +198,7 @@ proc pendingCount*(tbl: var FFIPendingTable): int {.raises: [].} =
|
||||
|
||||
type
|
||||
CompletionNode = object
|
||||
token: uint64
|
||||
callId: uint64
|
||||
ret: cint
|
||||
buf: ptr UncheckedArray[byte] ## c_malloc'd copy of the host payload (or nil)
|
||||
bufLen: int
|
||||
@ -215,13 +215,13 @@ proc initCompletionQueue*(q: var FFICompletionQueue) =
|
||||
q.tail = nil
|
||||
|
||||
proc pushCompletion*(
|
||||
q: var FFICompletionQueue, token: uint64, ret: cint, msg: ptr cchar, len: csize_t
|
||||
q: var FFICompletionQueue, callId: uint64, ret: cint, msg: ptr cchar, len: csize_t
|
||||
) {.raises: [].} =
|
||||
## Enqueue one host answer. Safe to call from **any** thread; allocates only
|
||||
## via c_malloc so it never touches the Nim GC on a foreign thread. The FFI
|
||||
## thread copies the payload into a `seq[byte]` and frees the node on drain.
|
||||
let node = ffiCMalloc(CompletionNode)
|
||||
node.token = token
|
||||
node.callId = callId
|
||||
node.ret = ret
|
||||
node.bufLen = int(len)
|
||||
node.next = nil
|
||||
@ -241,7 +241,7 @@ proc drainCompletions*(
|
||||
q: var FFICompletionQueue, tbl: var FFIPendingTable
|
||||
): int {.discardable.} =
|
||||
## FFI-thread only. Detaches the whole queue, then for each entry resolves the
|
||||
## pending future by token (copying the payload into GC memory here, on the FFI
|
||||
## pending future by callId (copying the payload into GC memory here, on the FFI
|
||||
## thread) and frees the c_malloc'd node. Returns the number drained.
|
||||
var head: ptr CompletionNode = nil
|
||||
withLock q.lock:
|
||||
@ -256,7 +256,7 @@ proc drainCompletions*(
|
||||
var b = newSeq[byte](node.bufLen)
|
||||
if node.bufLen > 0:
|
||||
copyMem(addr b[0], node.buf, node.bufLen)
|
||||
discard completePending(tbl, node.token, HostResult(ret: node.ret, bytes: b))
|
||||
discard completePending(tbl, node.callId, HostResult(ret: node.ret, bytes: b))
|
||||
if not node.buf.isNil():
|
||||
ffiCFree(node.buf)
|
||||
ffiCFree(node)
|
||||
|
||||
@ -220,7 +220,7 @@ macro declareLibrary*(libraryName: static[string], libType: untyped): untyped =
|
||||
)
|
||||
|
||||
# --- {libraryName}_host_complete ----------------------------------------
|
||||
# The host delivers a {.ffiHost.} answer by token. Callable from ANY thread —
|
||||
# The host delivers a {.ffiHost.} answer by callId. Callable from ANY thread —
|
||||
# it parks the result and wakes the FFI loop, which completes the awaited
|
||||
# future. `retCode` (not `ret`) avoids colliding with chronos templates under
|
||||
# quote injection, like `listenerId` above.
|
||||
@ -230,7 +230,7 @@ macro declareLibrary*(libraryName: static[string], libType: untyped): untyped =
|
||||
if isNil(ctx):
|
||||
echo `completeErr`
|
||||
return cint(1)
|
||||
completeHostCall(ctx, token, retCode, msg, msgLen)
|
||||
completeHostCall(ctx, callId, retCode, msg, msgLen)
|
||||
return cint(0)
|
||||
|
||||
stmts.add(
|
||||
@ -239,7 +239,7 @@ macro declareLibrary*(libraryName: static[string], libType: untyped): untyped =
|
||||
params = @[
|
||||
ident("cint"),
|
||||
newIdentDefs(ident("ctx"), ctxType),
|
||||
newIdentDefs(ident("token"), ident("uint64")),
|
||||
newIdentDefs(ident("callId"), ident("uint64")),
|
||||
newIdentDefs(ident("retCode"), ident("cint")),
|
||||
newIdentDefs(ident("msg"), nnkPtrTy.newTree(ident("cchar"))),
|
||||
newIdentDefs(ident("msgLen"), ident("csize_t")),
|
||||
|
||||
@ -1932,7 +1932,7 @@ macro ffiHost*(prc: untyped): untyped =
|
||||
## Declares a function the *host* implements, which a `{.ffi.}` handler can
|
||||
## call and `await` (the inverse of `{.ffi.}`). The annotated proc has an empty
|
||||
## body; the macro fills it with the dispatch: look up the host's registered
|
||||
## implementation, hand it the marshaled request + a token, and await the
|
||||
## implementation, hand it the marshaled request + a callId, and await the
|
||||
## answer the host delivers (via `<lib>_host_complete`) on the FFI thread.
|
||||
##
|
||||
## First slice — raw (zero-serialization) ABI, exactly one `string` parameter,
|
||||
@ -1999,7 +1999,7 @@ macro ffiHost*(prc: untyped): untyped =
|
||||
)
|
||||
|
||||
# The generated async body: resolve the thread-local host context, look up the
|
||||
# registered fn, allocate a pending token, invoke the host with the raw request
|
||||
# registered fn, allocate a pending callId, invoke the host with the raw request
|
||||
# bytes, and await the answer. The host fn is called synchronously here (before
|
||||
# the await) while `argName` is still alive, honouring the "req valid only for
|
||||
# the call" contract.
|
||||
@ -2011,14 +2011,14 @@ macro ffiHost*(prc: untyped): untyped =
|
||||
let ffiHit = lookupHostFn(ffiReg[], `wireNameLit`)
|
||||
if not ffiHit.found:
|
||||
return err("ffiHost: host fn '" & `wireNameLit` & "' not registered")
|
||||
let (ffiTok, ffiFut) = newPending(ffiTbl[])
|
||||
let (ffiCallId, ffiFut) = newPending(ffiTbl[])
|
||||
if `argName`.len > 0:
|
||||
ffiHit.fn(
|
||||
ffiTok, cast[ptr cchar](unsafeAddr `argName`[0]), csize_t(`argName`.len),
|
||||
ffiCallId, cast[ptr cchar](unsafeAddr `argName`[0]), csize_t(`argName`.len),
|
||||
ffiHit.userData,
|
||||
)
|
||||
else:
|
||||
ffiHit.fn(ffiTok, nil, 0, ffiHit.userData)
|
||||
ffiHit.fn(ffiCallId, nil, 0, ffiHit.userData)
|
||||
let ffiRes = await ffiFut
|
||||
if ffiRes.ret != RET_OK:
|
||||
return err(resultText(ffiRes))
|
||||
|
||||
@ -35,31 +35,31 @@ registerReqFFI(HostCallRequest, lib: ptr TestLib):
|
||||
|
||||
# --- the host, answering on a worker thread --------------------------------
|
||||
# The host fn runs on the FFI thread, so it must NOT block: it copies the
|
||||
# request and hands (token, key) to a worker via a channel, then returns. The
|
||||
# request and hands (callId, key) to a worker via a channel, then returns. The
|
||||
# worker answers later through the exported <lib>_host_complete.
|
||||
var gHostJobs: Channel[tuple[token: uint64, key: string]]
|
||||
var gHostJobs: Channel[tuple[callId: uint64, key: string]]
|
||||
var gCtx: Atomic[pointer]
|
||||
|
||||
proc lookupHostFnImpl(
|
||||
token: uint64, req: ptr cchar, reqLen: csize_t, userData: pointer
|
||||
callId: uint64, req: ptr cchar, reqLen: csize_t, userData: pointer
|
||||
) {.cdecl, gcsafe, raises: [].} =
|
||||
var key = newString(int(reqLen))
|
||||
if reqLen > 0'u:
|
||||
copyMem(addr key[0], req, int(reqLen))
|
||||
try:
|
||||
gHostJobs.send((token: token, key: key))
|
||||
gHostJobs.send((callId: callId, key: key))
|
||||
except Exception:
|
||||
discard
|
||||
|
||||
proc hostWorker(_: pointer) {.thread.} =
|
||||
while true:
|
||||
let job = gHostJobs.recv()
|
||||
if job.token == 0'u64: # sentinel: shut down
|
||||
if job.callId == 0'u64: # sentinel: shut down
|
||||
break
|
||||
let answer = "reply:" & job.key
|
||||
completeHostCall(
|
||||
cast[ptr FFIContext[TestLib]](gCtx.load()),
|
||||
job.token,
|
||||
job.callId,
|
||||
RET_OK,
|
||||
cast[ptr cchar](unsafeAddr answer[0]),
|
||||
csize_t(answer.len),
|
||||
@ -129,7 +129,7 @@ suite "ffiHost end-to-end (cross-thread)":
|
||||
check cborDecode(callbackBytes(d), string).value == "got:reply:session"
|
||||
|
||||
# Shut the worker down, then tear the context down.
|
||||
gHostJobs.send((token: 0'u64, key: ""))
|
||||
gHostJobs.send((callId: 0'u64, key: ""))
|
||||
joinThread(worker)
|
||||
d.cond.deinitCond()
|
||||
d.lock.deinitLock()
|
||||
|
||||
@ -3,7 +3,7 @@
|
||||
## (see docs/design-host-callbacks.md).
|
||||
##
|
||||
## These exercise the data structures directly: no FFI thread, no macro, no
|
||||
## completion bridge. They pin down registration, lookup, token allocation, and
|
||||
## completion bridge. They pin down registration, lookup, callId allocation, and
|
||||
## future completion semantics in isolation.
|
||||
|
||||
import std/locks
|
||||
@ -14,7 +14,7 @@ import ffi
|
||||
# A host fn does nothing here — we only assert it round-trips through the
|
||||
# registry. `userData` carries a tag we read back to prove identity.
|
||||
proc noopHostFn(
|
||||
token: uint64, req: ptr cchar, reqLen: csize_t, userData: pointer
|
||||
callId: uint64, req: ptr cchar, reqLen: csize_t, userData: pointer
|
||||
) {.cdecl, gcsafe, raises: [].} =
|
||||
discard
|
||||
|
||||
@ -58,15 +58,15 @@ suite "FFIHostRegistry":
|
||||
check not lookupHostFn(reg, "b").found
|
||||
|
||||
suite "FFIPendingTable":
|
||||
test "tokens are monotonic and start at 1":
|
||||
test "callIds are monotonic and start at 1":
|
||||
var tbl: FFIPendingTable
|
||||
initPendingTable(tbl)
|
||||
defer:
|
||||
deinitPendingTable(tbl)
|
||||
let a = newPending(tbl)
|
||||
let b = newPending(tbl)
|
||||
check a.token == 1'u64
|
||||
check b.token == 2'u64
|
||||
check a.callId == 1'u64
|
||||
check b.callId == 2'u64
|
||||
check tbl.pendingCount == 2
|
||||
|
||||
test "completePending resolves the awaiting future and removes it":
|
||||
@ -75,7 +75,7 @@ suite "FFIPendingTable":
|
||||
defer:
|
||||
deinitPendingTable(tbl)
|
||||
let p = newPending(tbl)
|
||||
check completePending(tbl, p.token, okResult(@[byte 1, 2, 3]))
|
||||
check completePending(tbl, p.callId, okResult(@[byte 1, 2, 3]))
|
||||
check p.fut.finished()
|
||||
check waitFor(p.fut).ret == RET_OK
|
||||
check waitFor(p.fut).bytes == @[byte 1, 2, 3]
|
||||
@ -88,8 +88,8 @@ suite "FFIPendingTable":
|
||||
deinitPendingTable(tbl)
|
||||
check not completePending(tbl, 999'u64, okResult(@[]))
|
||||
let p = newPending(tbl)
|
||||
check completePending(tbl, p.token, okResult(@[]))
|
||||
check not completePending(tbl, p.token, okResult(@[])) # second time: dropped
|
||||
check completePending(tbl, p.callId, okResult(@[]))
|
||||
check not completePending(tbl, p.callId, okResult(@[])) # second time: dropped
|
||||
|
||||
test "failAllPending errors every outstanding future":
|
||||
var tbl: FFIPendingTable
|
||||
@ -107,14 +107,14 @@ suite "FFIPendingTable":
|
||||
check tbl.pendingCount == 0
|
||||
|
||||
# `pushCompletion` takes the raw (msg, len) a host hands across the C ABI.
|
||||
proc pushStr(q: var FFICompletionQueue, token: uint64, ret: cint, s: string) =
|
||||
proc pushStr(q: var FFICompletionQueue, callId: uint64, ret: cint, s: string) =
|
||||
if s.len == 0:
|
||||
pushCompletion(q, token, ret, nil, 0)
|
||||
pushCompletion(q, callId, ret, nil, 0)
|
||||
else:
|
||||
pushCompletion(q, token, ret, cast[ptr cchar](unsafeAddr s[0]), csize_t(s.len))
|
||||
pushCompletion(q, callId, ret, cast[ptr cchar](unsafeAddr s[0]), csize_t(s.len))
|
||||
|
||||
suite "FFICompletionQueue":
|
||||
test "drain resolves pending futures by token, in FIFO order":
|
||||
test "drain resolves pending futures by callId, in FIFO order":
|
||||
var tbl: FFIPendingTable
|
||||
var q: FFICompletionQueue
|
||||
initPendingTable(tbl)
|
||||
@ -123,10 +123,10 @@ suite "FFICompletionQueue":
|
||||
deinitPendingTable(tbl)
|
||||
deinitCompletionQueue(q)
|
||||
|
||||
let a = newPending(tbl) # token 1
|
||||
let b = newPending(tbl) # token 2
|
||||
pushStr(q, a.token, RET_OK, "alpha")
|
||||
pushStr(q, b.token, RET_ERR, "boom")
|
||||
let a = newPending(tbl) # callId 1
|
||||
let b = newPending(tbl) # callId 2
|
||||
pushStr(q, a.callId, RET_OK, "alpha")
|
||||
pushStr(q, b.callId, RET_ERR, "boom")
|
||||
|
||||
check drainCompletions(q, tbl) == 2
|
||||
check bytesToStr(waitFor(a.fut).bytes) == "alpha"
|
||||
@ -145,11 +145,11 @@ suite "FFICompletionQueue":
|
||||
deinitCompletionQueue(q)
|
||||
check drainCompletions(q, tbl) == 0 # nothing queued
|
||||
let p = newPending(tbl)
|
||||
pushStr(q, p.token, RET_OK, "") # empty (nil buf) payload
|
||||
pushStr(q, p.callId, RET_OK, "") # empty (nil buf) payload
|
||||
check drainCompletions(q, tbl) == 1
|
||||
check waitFor(p.fut).bytes.len == 0
|
||||
|
||||
test "completion for an unknown token is drained and dropped":
|
||||
test "completion for an unknown callId is drained and dropped":
|
||||
var tbl: FFIPendingTable
|
||||
var q: FFICompletionQueue
|
||||
initPendingTable(tbl)
|
||||
@ -157,7 +157,7 @@ suite "FFICompletionQueue":
|
||||
defer:
|
||||
deinitPendingTable(tbl)
|
||||
deinitCompletionQueue(q)
|
||||
pushStr(q, 999'u64, RET_OK, "orphan") # no pending future for this token
|
||||
pushStr(q, 999'u64, RET_OK, "orphan") # no pending future for this callId
|
||||
check drainCompletions(q, tbl) == 1 # drained (and its buffer freed)
|
||||
|
||||
test "deinit frees still-queued nodes without draining":
|
||||
@ -166,7 +166,7 @@ suite "FFICompletionQueue":
|
||||
initPendingTable(tbl)
|
||||
initCompletionQueue(q)
|
||||
let p = newPending(tbl)
|
||||
pushStr(q, p.token, RET_OK, "leftover")
|
||||
pushStr(q, p.callId, RET_OK, "leftover")
|
||||
failAllPending(tbl, "shutdown") # the future is settled separately
|
||||
deinitPendingTable(tbl)
|
||||
deinitCompletionQueue(q) # must free the queued node, no leak/crash
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user