mirror of
https://github.com/logos-messaging/nim-ffi.git
synced 2026-06-25 02:39:47 +00:00
use fixed array of ctx to avoid consuming all fds (#14)
This commit is contained in:
parent
81c62c263e
commit
6d31fa30bd
@ -72,7 +72,7 @@ proc nimtimerComplex*(
|
||||
# In a multi-file library, import all sub-modules first and call genBindings()
|
||||
# once, at the bottom of the top-level compilation-root file.
|
||||
# This call is a no-op unless -d:ffiGenBindings is passed to the compiler.
|
||||
genBindings() # reads -d:ffiOutputDir, -d:ffiNimSrcRelPath, -d:targetLang from compile flags
|
||||
genBindings()
|
||||
|
||||
proc nimtimer_destroy*(ctx: pointer) {.dynlib, exportc, cdecl, raises: [].} =
|
||||
## Tears down the FFI context created by nimtimer_create.
|
||||
|
||||
5
ffi.nim
5
ffi.nim
@ -2,9 +2,10 @@ import std/[atomics, tables]
|
||||
import chronos, chronicles
|
||||
import
|
||||
ffi/internal/[ffi_library, ffi_macro],
|
||||
ffi/[alloc, ffi_types, ffi_context, ffi_thread_request, serial]
|
||||
ffi/[alloc, ffi_types, ffi_context, ffi_context_pool, ffi_thread_request, serial]
|
||||
|
||||
export atomics, tables
|
||||
export chronos, chronicles
|
||||
export
|
||||
atomics, alloc, ffi_library, ffi_macro, ffi_types, ffi_context, ffi_thread_request, serial
|
||||
atomics, alloc, ffi_library, ffi_macro, ffi_types, ffi_context, ffi_context_pool,
|
||||
ffi_thread_request, serial
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
{.pragma: callback, cdecl, raises: [], gcsafe.}
|
||||
{.passc: "-fPIC".}
|
||||
|
||||
import std/[options, atomics, os, net, locks, json, tables, sets]
|
||||
import std/[atomics, locks, json, tables]
|
||||
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
|
||||
import ./ffi_types, ./ffi_thread_request, ./internal/ffi_macro, ./logging
|
||||
|
||||
@ -41,30 +41,6 @@ var ffiCurrentCallbackState* {.threadvar.}: ptr FFICallbackState
|
||||
|
||||
const git_version* {.strdefine.} = "n/a"
|
||||
|
||||
var contextRegistry = initHashSet[pointer]()
|
||||
var contextRegistryLock: Lock
|
||||
contextRegistryLock.initLock()
|
||||
|
||||
proc registerCtx(ctx: pointer) =
|
||||
{.cast(gcsafe).}:
|
||||
contextRegistryLock.acquire()
|
||||
defer: contextRegistryLock.release()
|
||||
contextRegistry.incl(ctx)
|
||||
|
||||
proc unregisterCtx(ctx: pointer) =
|
||||
{.cast(gcsafe).}:
|
||||
contextRegistryLock.acquire()
|
||||
defer: contextRegistryLock.release()
|
||||
contextRegistry.excl(ctx)
|
||||
|
||||
proc isValidCtx*(ctx: pointer): bool =
|
||||
## Returns true only if ctx was created by createFFIContext and not yet destroyed.
|
||||
## Rejects nil, offset-invalid, and dangling pointers at the API boundary.
|
||||
{.cast(gcsafe).}:
|
||||
contextRegistryLock.acquire()
|
||||
defer: contextRegistryLock.release()
|
||||
return contextRegistry.contains(ctx)
|
||||
|
||||
template callEventCallback*(ctx: ptr FFIContext, eventName: string, body: untyped) =
|
||||
if isNil(ctx[].callbackState.callback):
|
||||
chronicles.error eventName & " - eventCallback is nil"
|
||||
@ -74,14 +50,20 @@ template callEventCallback*(ctx: ptr FFIContext, eventName: string, body: untype
|
||||
try:
|
||||
let event = body
|
||||
cast[FFICallBack](ctx[].callbackState.callback)(
|
||||
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].callbackState.userData
|
||||
RET_OK,
|
||||
unsafeAddr event[0],
|
||||
cast[csize_t](len(event)),
|
||||
ctx[].callbackState.userData,
|
||||
)
|
||||
except Exception, CatchableError:
|
||||
let msg =
|
||||
"Exception " & eventName & " when calling 'eventCallBack': " &
|
||||
getCurrentExceptionMsg()
|
||||
cast[FFICallBack](ctx[].callbackState.callback)(
|
||||
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].callbackState.userData
|
||||
RET_ERR,
|
||||
unsafeAddr msg[0],
|
||||
cast[csize_t](len(msg)),
|
||||
ctx[].callbackState.userData,
|
||||
)
|
||||
|
||||
template dispatchFfiEvent*(eventName: string, body: untyped) =
|
||||
@ -99,8 +81,7 @@ template dispatchFfiEvent*(eventName: string, body: untyped) =
|
||||
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ffiState[].userData
|
||||
)
|
||||
except Exception, CatchableError:
|
||||
let msg =
|
||||
"Exception dispatching " & eventName & ": " & getCurrentExceptionMsg()
|
||||
let msg = "Exception dispatching " & eventName & ": " & getCurrentExceptionMsg()
|
||||
cast[FFICallBack](ffiState[].callback)(
|
||||
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ffiState[].userData
|
||||
)
|
||||
@ -108,9 +89,6 @@ template dispatchFfiEvent*(eventName: string, body: untyped) =
|
||||
proc sendRequestToFFIThread*(
|
||||
ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration
|
||||
): Result[void, string] =
|
||||
if not isValidCtx(cast[pointer](ctx)):
|
||||
deleteRequest(ffiRequest)
|
||||
return err("ctx is not a valid FFI context")
|
||||
ctx.lock.acquire()
|
||||
# This lock is only necessary while we use a SP Channel and while the signalling
|
||||
# between threads assumes that there aren't concurrent requests.
|
||||
@ -229,7 +207,9 @@ proc processRequest[T](
|
||||
try:
|
||||
await retFut
|
||||
except AsyncError as exc:
|
||||
Result[string, string].err("Async error in processRequest for " & reqId & ": " & exc.msg)
|
||||
Result[string, string].err(
|
||||
"Async error in processRequest for " & reqId & ": " & exc.msg
|
||||
)
|
||||
|
||||
## handleRes may raise (OOM, GC setup) even though it is rare. Catching here
|
||||
## keeps the async proc raises:[] compatible. The defer inside handleRes
|
||||
@ -237,7 +217,7 @@ proc processRequest[T](
|
||||
try:
|
||||
handleRes(res, request)
|
||||
except Exception as exc:
|
||||
error "Unexpected exception in handleRes", exc = exc.msg
|
||||
error "Unexpected exception in handleRes", error = exc.msg
|
||||
|
||||
proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
## FFI thread body that attends library user API requests
|
||||
@ -250,8 +230,7 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
# wait can unblock and proceed with cleanup.
|
||||
let fireRes = ctx.threadExitSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
error "failed to fire threadExitSignal on FFI thread exit",
|
||||
err = fireRes.error
|
||||
error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error
|
||||
|
||||
let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} =
|
||||
var ffiReqHandler: T
|
||||
@ -281,6 +260,7 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
waitFor ffiRun(ctx)
|
||||
|
||||
proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
## Full cleanup for heap-allocated contexts: closes all resources and frees memory.
|
||||
defer:
|
||||
freeShared(ctx)
|
||||
ctx.lock.deinitLock()
|
||||
@ -316,10 +296,10 @@ proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
?ctx.threadExitSignal.close()
|
||||
return ok()
|
||||
|
||||
proc createFFIContext*[T](): Result[ptr FFIContext[T], string] =
|
||||
## This proc is called from the main thread and it creates
|
||||
## the FFI working thread.
|
||||
var ctx = createShared(FFIContext[T], 1)
|
||||
proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
## Initialises all resources inside an already-allocated FFIContext slot.
|
||||
## On failure every partially-initialised resource is closed; the caller
|
||||
## is responsible for releasing the slot (freeShared or pool.releaseSlot).
|
||||
ctx.lock.initLock()
|
||||
|
||||
var success = false
|
||||
@ -327,7 +307,7 @@ proc createFFIContext*[T](): Result[ptr FFIContext[T], string] =
|
||||
if not success:
|
||||
ctx.cleanUpResources().isOkOr:
|
||||
error "failed to clean up resources after createFFIContext failure",
|
||||
err = error
|
||||
error = error
|
||||
|
||||
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
|
||||
return err("couldn't create reqSignal ThreadSignalPtr: " & $error)
|
||||
@ -358,61 +338,58 @@ proc createFFIContext*[T](): Result[ptr FFIContext[T], string] =
|
||||
ctx.running.store(false)
|
||||
let fireRes = ctx.reqSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
error "failed to signal ffiThread during watchdog cleanup",
|
||||
err = fireRes.error
|
||||
error "failed to signal ffiThread during watchdog cleanup", error = fireRes.error
|
||||
joinThread(ctx.ffiThread)
|
||||
return err("failed to create the watchdog thread: " & getCurrentExceptionMsg())
|
||||
|
||||
registerCtx(cast[pointer](ctx))
|
||||
success = true
|
||||
return ok(ctx)
|
||||
|
||||
proc destroyFFIContext*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
## If the FFI thread's event loop is blocked by a synchronous handler
|
||||
## (e.g. blocking I/O), it cannot process reqSignal in time to exit.
|
||||
## In that case we leak ctx and the thread rather than hanging forever:
|
||||
## the thread will eventually exit on its own, but cleanup is skipped
|
||||
## because the thread may still be touching ctx fields.
|
||||
const ThreadExitTimeout = 1500.milliseconds
|
||||
unregisterCtx(cast[pointer](ctx))
|
||||
return ok()
|
||||
|
||||
proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
ctx.running.store(false)
|
||||
|
||||
let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
|
||||
let reqSignaled = ctx.reqSignal.fireSync().valueOr:
|
||||
ctx.onNotResponding()
|
||||
return err("error in destroyFFIContext: " & $error)
|
||||
if not signaledOnTime:
|
||||
return err("error signaling reqSignal in signalStop: " & $error)
|
||||
if not reqSignaled:
|
||||
ctx.onNotResponding()
|
||||
return err("failed to signal reqSignal on time in destroyFFIContext")
|
||||
return err("failed to signal reqSignal on time in signalStop")
|
||||
let stopSignaled = ctx.stopSignal.fireSync().valueOr:
|
||||
return err("error signaling stopSignal in signalStop: " & $error)
|
||||
if not stopSignaled:
|
||||
return err("failed to signal stopSignal on time in signalStop")
|
||||
return ok()
|
||||
|
||||
ctx.stopSignal.fireSync().isOkOr:
|
||||
error "failed to fire stopSignal in destroyFFIContext", err = $error
|
||||
## If the FFI thread's event loop is blocked by a synchronous handler
|
||||
## (e.g. blocking I/O), it cannot process reqSignal in time to exit.
|
||||
## clearContext waits on threadExitSignal up to this bound; on timeout it
|
||||
## returns err and skips joinThread/cleanup (leaking the thread + ctx slot)
|
||||
## rather than hanging the caller forever.
|
||||
const ThreadExitTimeout* = 1500.milliseconds
|
||||
|
||||
proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
## Signals the FFI and watchdog threads to stop, waits up to ThreadExitTimeout
|
||||
## for the FFI thread to exit, and joins both. On timeout returns err and
|
||||
## skips joinThread (leaving the threads live) rather than hanging the caller.
|
||||
## Resource cleanup (signal fds, lock) is the caller's responsibility.
|
||||
ctx.signalStop().isOkOr:
|
||||
return err("signalStop failed: " & $error)
|
||||
|
||||
## Bounded wait for ffiThread to exit. waitSync blocks the calling thread
|
||||
## up to the timeout; ffiThread fires threadExitSignal in its defer block.
|
||||
let exitedOnTime = ctx.threadExitSignal.waitSync(ThreadExitTimeout).valueOr:
|
||||
ctx.onNotResponding()
|
||||
return err("error waiting for FFI thread exit: " & $error)
|
||||
|
||||
if not exitedOnTime:
|
||||
## Event loop is blocked by a synchronous handler. Leak the thread and
|
||||
## ctx to avoid hanging the caller forever.
|
||||
ctx.onNotResponding()
|
||||
return err("FFI thread did not exit in time; leaking ctx to avoid hang")
|
||||
|
||||
joinThread(ctx.ffiThread)
|
||||
joinThread(ctx.watchdogThread)
|
||||
ctx.cleanUpResources().isOkOr:
|
||||
error "failed to clean up resources in destroyFFIContext", err = error
|
||||
return err("cleanUpResources failed: " & $error)
|
||||
|
||||
return ok()
|
||||
|
||||
template checkParams*(ctx: ptr FFIContext, callback: FFICallBack, userData: pointer) =
|
||||
if not isValidCtx(cast[pointer](ctx)):
|
||||
return RET_ERR
|
||||
|
||||
ctx[].userData = userData
|
||||
|
||||
if isNil(callback):
|
||||
return RET_MISSING_CALLBACK
|
||||
proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
## Stops the FFI context that was created via createFFIContext[T]() (heap).
|
||||
ctx.stopAndJoinThreads().isOkOr:
|
||||
return err("clearContext: " & $error)
|
||||
ctx.cleanUpResources().isOkOr:
|
||||
return err("cleanUpResources failed: " & $error)
|
||||
return ok()
|
||||
|
||||
63
ffi/ffi_context_pool.nim
Normal file
63
ffi/ffi_context_pool.nim
Normal file
@ -0,0 +1,63 @@
|
||||
import std/atomics
|
||||
import results
|
||||
import ./ffi_context
|
||||
|
||||
const MaxFFIContexts* = 32
|
||||
## Maximum number of concurrently live FFI contexts when using FFIContextPool.
|
||||
## Fds and threads are only consumed for slots that are actually acquired,
|
||||
## so this value only affects the upfront memory of the pool array.
|
||||
|
||||
type FFIContextPool*[T] = object
|
||||
## Fixed-size pool of FFI contexts. Avoids dynamic heap allocation per context
|
||||
## and bounds the total number of file descriptors consumed by ThreadSignalPtrs
|
||||
## to at most MaxFFIContexts * 2.
|
||||
slots: array[MaxFFIContexts, FFIContext[T]]
|
||||
inUse: array[MaxFFIContexts, Atomic[bool]]
|
||||
|
||||
proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] =
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
var expected = false
|
||||
if pool.inUse[i].compareExchange(expected, true):
|
||||
return ok(pool.slots[i].addr)
|
||||
return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)")
|
||||
|
||||
proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) =
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
if pool.slots[i].addr == ctx:
|
||||
pool.inUse[i].store(false)
|
||||
return
|
||||
|
||||
proc createFFIContext*[T](
|
||||
pool: var FFIContextPool[T]
|
||||
): Result[ptr FFIContext[T], string] =
|
||||
## Acquires a slot from the fixed pool and initialises it as an FFI context.
|
||||
## Bounded fd usage: at most MaxFFIContexts * 2 ThreadSignalPtr fds are ever open.
|
||||
let ctx = pool.acquireSlot().valueOr:
|
||||
return err("createFFIContext: acquireSlot failed: " & $error)
|
||||
initContextResources(ctx).isOkOr:
|
||||
pool.releaseSlot(ctx)
|
||||
return err("createFFIContext: initContextResources failed: " & $error)
|
||||
return ok(ctx)
|
||||
|
||||
proc destroyFFIContext*[T](
|
||||
pool: var FFIContextPool[T], ctx: ptr FFIContext[T]
|
||||
): Result[void, string] =
|
||||
## Stops the FFI context and returns its slot to the pool. If the FFI thread
|
||||
## is blocked and does not exit in time, the slot is leaked rather than
|
||||
## reclaimed — closing its resources while the thread is still live would be
|
||||
## unsafe.
|
||||
ctx.stopAndJoinThreads().isOkOr:
|
||||
return err("destroyFFIContext(pool): " & $error)
|
||||
pool.releaseSlot(ctx)
|
||||
return ok()
|
||||
|
||||
proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool =
|
||||
## Returns true only if ctx points to one of the pool's slots that is
|
||||
## currently in use. Rejects nil, offset-invalid, and dangling pointers
|
||||
## at the API boundary, preventing use-after-free dereferences.
|
||||
if ctx.isNil():
|
||||
return false
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
if cast[pointer](pool.slots[i].addr) == ctx:
|
||||
return pool.inUse[i].load()
|
||||
return false
|
||||
@ -533,6 +533,12 @@ macro ffiRaw*(prc: untyped): untyped =
|
||||
let paramIdent = firstParam[0]
|
||||
let paramType = firstParam[1]
|
||||
|
||||
# The first param of an `.ffiRaw.` proc is `ctx: ptr FFIContext[LibType]`.
|
||||
# Extract LibType so we can call the module-level pool var (named
|
||||
# "<LibType>FFIPool", declared by `.ffiCtor.`) to validate ctx.
|
||||
let libTypeName = paramType[0][1]
|
||||
let poolIdent = ident($libTypeName & "FFIPool")
|
||||
|
||||
let reqName = ident($procName & "Req")
|
||||
let returnType = ident("cint")
|
||||
|
||||
@ -569,7 +575,7 @@ macro ffiRaw*(prc: untyped): untyped =
|
||||
let ffiBody = newStmtList(
|
||||
quote do:
|
||||
initializeLibrary()
|
||||
if not isValidCtx(cast[pointer](ctx)):
|
||||
if not `poolIdent`.isValidCtx(cast[pointer](ctx)):
|
||||
return RET_ERR
|
||||
ctx[].userData = userData
|
||||
if isNil(callback):
|
||||
@ -804,9 +810,10 @@ macro ffi*(prc: untyped): untyped =
|
||||
if callback.isNil:
|
||||
return RET_MISSING_CALLBACK
|
||||
|
||||
let asyncPoolIdent = ident($libTypeName & "FFIPool")
|
||||
ffiBody.add quote do:
|
||||
if ctx.isNil or ctx[].myLib.isNil:
|
||||
let errStr = "context not initialized"
|
||||
if not `asyncPoolIdent`.isValidCtx(cast[pointer](ctx)):
|
||||
let errStr = "ctx is not a valid FFI context"
|
||||
callback(RET_ERR, unsafeAddr errStr[0], cast[csize_t](errStr.len), userData)
|
||||
return RET_ERR
|
||||
|
||||
@ -930,9 +937,10 @@ macro ffi*(prc: untyped): untyped =
|
||||
if callback.isNil:
|
||||
return RET_MISSING_CALLBACK
|
||||
|
||||
let syncPoolIdent = ident($libTypeName & "FFIPool")
|
||||
syncFfiBody.add quote do:
|
||||
if ctx.isNil or ctx[].myLib.isNil:
|
||||
let errStr = "context not initialized"
|
||||
if not `syncPoolIdent`.isValidCtx(cast[pointer](ctx)):
|
||||
let errStr = "ctx is not a valid FFI context"
|
||||
callback(RET_ERR, unsafeAddr errStr[0], cast[csize_t](errStr.len), userData)
|
||||
return RET_ERR
|
||||
|
||||
@ -1383,9 +1391,12 @@ macro ffiCtor*(prc: untyped): untyped =
|
||||
# Use a gensym'd ctx identifier so both the let binding and usage match
|
||||
let ctxSym = genSym(nskLet, "ctx")
|
||||
|
||||
# Module-level pool shared by ctor and dtor for this libType
|
||||
let poolIdent = ident($libTypeName & "FFIPool")
|
||||
|
||||
# Create the FFIContext synchronously; return nil on failure
|
||||
ffiBody.add quote do:
|
||||
let `ctxSym` = createFFIContext[`libTypeName`]().valueOr:
|
||||
let `ctxSym` = `poolIdent`.createFFIContext().valueOr:
|
||||
if not callback.isNil:
|
||||
let errStr = "ffiCtor: failed to create FFIContext: " & $error
|
||||
callback(RET_ERR, unsafeAddr errStr[0], cast[csize_t](errStr.len), userData)
|
||||
@ -1476,8 +1487,13 @@ macro ffiCtor*(prc: untyped): untyped =
|
||||
)
|
||||
)
|
||||
|
||||
let poolDecl = quote do:
|
||||
when not declared(`poolIdent`):
|
||||
var `poolIdent`: FFIContextPool[`libTypeName`]
|
||||
|
||||
result = newStmtList(
|
||||
typeDef, deleteProc, ffiNewReqProc, helperProc, processProc, addToReg, ffiProc
|
||||
typeDef, deleteProc, ffiNewReqProc, helperProc, processProc, addToReg, poolDecl,
|
||||
ffiProc,
|
||||
)
|
||||
|
||||
when defined(ffiDumpMacros):
|
||||
@ -1548,9 +1564,10 @@ macro ffiDtor*(prc: untyped): untyped =
|
||||
if not isNoop:
|
||||
ffiBody.add(bodyNode)
|
||||
|
||||
let poolIdent = ident($libTypeName & "FFIPool")
|
||||
ffiBody.add quote do:
|
||||
let `destroyResIdent` =
|
||||
destroyFFIContext[`libTypeName`](cast[ptr FFIContext[`libTypeName`]](ctx))
|
||||
`poolIdent`.destroyFFIContext(cast[ptr FFIContext[`libTypeName`]](ctx))
|
||||
if `destroyResIdent`.isErr():
|
||||
if not callback.isNil:
|
||||
let errStr = "destroy failed: " & $`destroyResIdent`.error
|
||||
@ -1593,7 +1610,11 @@ macro ffiDtor*(prc: untyped): untyped =
|
||||
)
|
||||
)
|
||||
|
||||
result = ffiProc
|
||||
let poolDecl = quote do:
|
||||
when not declared(`poolIdent`):
|
||||
var `poolIdent`: FFIContextPool[`libTypeName`]
|
||||
|
||||
result = newStmtList(poolDecl, ffiProc)
|
||||
|
||||
when defined(ffiDumpMacros):
|
||||
echo result.repr
|
||||
@ -1622,6 +1643,8 @@ macro genBindings*(
|
||||
## -d:ffiNimSrcRelPath, or can be passed as explicit arguments.
|
||||
## This macro is a no-op unless -d:ffiGenBindings is set.
|
||||
##
|
||||
## This reads -d:ffiOutputDir, -d:ffiNimSrcRelPath, -d:targetLang from compile flags.
|
||||
##
|
||||
## Example (all via compile flags):
|
||||
## genBindings()
|
||||
## # nim c -d:ffiGenBindings -d:targetLang=rust \
|
||||
|
||||
@ -1,41 +1,58 @@
|
||||
import std/locks
|
||||
import std/[atomics, locks]
|
||||
import unittest2
|
||||
import results
|
||||
import ../ffi
|
||||
|
||||
type TestLib = object
|
||||
|
||||
proc dummyCallback(
|
||||
ffiType:
|
||||
type CtxValidationConfig = object
|
||||
initialValue: int
|
||||
|
||||
proc ctxval_create*(
|
||||
config: CtxValidationConfig
|
||||
): Future[Result[TestLib, string]] {.ffiCtor.} =
|
||||
return ok(TestLib())
|
||||
|
||||
proc ctxval_ping*(lib: TestLib): Future[Result[string, string]] {.ffi.} =
|
||||
return ok("pong")
|
||||
|
||||
type CallbackState = object
|
||||
lock: Lock
|
||||
called: Atomic[bool]
|
||||
retCode: cint
|
||||
|
||||
proc initCbState(s: var CallbackState) =
|
||||
s.lock.initLock()
|
||||
s.called.store(false)
|
||||
|
||||
proc validationCallback(
|
||||
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
|
||||
) {.cdecl, gcsafe, raises: [].} =
|
||||
discard
|
||||
let s = cast[ptr CallbackState](userData)
|
||||
s[].retCode = retCode
|
||||
s[].called.store(true)
|
||||
|
||||
registerReqFFI(ValidationTestRequest, lib: ptr TestLib):
|
||||
proc(): Future[Result[string, string]] {.async.} =
|
||||
return ok("ok")
|
||||
suite "ctx pointer validation at the FFI entry point":
|
||||
# The macro-generated FFI entry point validates ctx via
|
||||
# <LibType>FFIPool.isValidCtx. Any caller — C or Nim — that passes a nil or
|
||||
# offset-invalid ctx with a valid callback should receive RET_ERR via the
|
||||
# callback and the proc should return RET_ERR, never crash.
|
||||
|
||||
suite "ctx pointer validation":
|
||||
# BUG: sendRequestToFFIThread has no nil-check on ctx.
|
||||
# checkParams / {.ffi.} generated code only guards against nil callback,
|
||||
# not nil (or otherwise invalid) ctx. Any caller — C or Nim — that passes
|
||||
# a nil or offset-invalid ctx with a valid callback bypasses the only guard
|
||||
# and reaches ctx.lock.acquire() where the nil/garbage dereference crashes.
|
||||
|
||||
test "nil ctx with valid callback should return an error, not crash":
|
||||
# Reproduces the nil case: ctx=nil, callback=valid.
|
||||
# Expected (after fix): sendRequestToFFIThread returns isErr().
|
||||
# Actual (currently) : SIGSEGV at ctx.lock.acquire() in sendRequestToFFIThread.
|
||||
test "nil ctx with valid callback returns RET_ERR via callback, no crash":
|
||||
var s: CallbackState
|
||||
initCbState(s)
|
||||
let nilCtx: ptr FFIContext[TestLib] = nil
|
||||
let req = ValidationTestRequest.ffiNewReq(dummyCallback, nil)
|
||||
let res = sendRequestToFFIThread(nilCtx, req)
|
||||
check res.isErr()
|
||||
let ret = ctxval_ping(nilCtx, validationCallback, addr s)
|
||||
check ret == RET_ERR
|
||||
check s.called.load()
|
||||
check s.retCode == RET_ERR
|
||||
|
||||
test "invalid non-nil ctx (ctx+123 style) should return an error, not crash":
|
||||
# Reproduces the offset-pointer case: a non-nil but invalid pointer passes
|
||||
# isNil() and reaches the lock dereference, causing a crash.
|
||||
# Expected (after fix): sendRequestToFFIThread returns isErr().
|
||||
# Actual (currently) : SIGSEGV when the garbage pointer is dereferenced.
|
||||
test "invalid non-nil ctx (offset-pointer) returns RET_ERR, no crash":
|
||||
var s: CallbackState
|
||||
initCbState(s)
|
||||
let invalidCtx = cast[ptr FFIContext[TestLib]](123)
|
||||
let req = ValidationTestRequest.ffiNewReq(dummyCallback, nil)
|
||||
let res = sendRequestToFFIThread(invalidCtx, req)
|
||||
check res.isErr()
|
||||
let ret = ctxval_ping(invalidCtx, validationCallback, addr s)
|
||||
check ret == RET_ERR
|
||||
check s.called.load()
|
||||
check s.retCode == RET_ERR
|
||||
|
||||
@ -121,19 +121,77 @@ registerReqFFI(HeavyRefAllocRequest, lib: ptr TestLib):
|
||||
await sleepAsync(10.milliseconds)
|
||||
return ok("heavy-done")
|
||||
|
||||
suite "FFIContextPool":
|
||||
test "create and destroy via pool succeeds":
|
||||
var pool: FFIContextPool[TestLib]
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
assert false, "createFFIContext(pool) failed: " & $error
|
||||
return
|
||||
check pool.destroyFFIContext(ctx).isOk()
|
||||
|
||||
test "slot is reused after destroy":
|
||||
var pool: FFIContextPool[TestLib]
|
||||
let ctx1 = pool.createFFIContext().valueOr:
|
||||
assert false, "createFFIContext(pool) failed: " & $error
|
||||
return
|
||||
check pool.destroyFFIContext(ctx1).isOk()
|
||||
# After destroying, the same slot must be available again
|
||||
let ctx2 = pool.createFFIContext().valueOr:
|
||||
assert false, "createFFIContext(pool) failed after slot release: " & $error
|
||||
return
|
||||
check pool.destroyFFIContext(ctx2).isOk()
|
||||
check ctx1 == ctx2 # same array slot reused
|
||||
|
||||
test "pool exhaustion returns error":
|
||||
var pool: FFIContextPool[TestLib]
|
||||
var ctxs: array[MaxFFIContexts, ptr FFIContext[TestLib]]
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
ctxs[i] = pool.createFFIContext().valueOr:
|
||||
for j in 0 ..< i:
|
||||
discard pool.destroyFFIContext(ctxs[j])
|
||||
assert false, "createFFIContext(pool) failed at slot " & $i & ": " & $error
|
||||
return
|
||||
# Pool is now full — next create must fail
|
||||
check pool.createFFIContext().isErr()
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
discard pool.destroyFFIContext(ctxs[i])
|
||||
|
||||
test "requests are processed via pool context":
|
||||
var pool: FFIContextPool[TestLib]
|
||||
var d: CallbackData
|
||||
initCallbackData(d)
|
||||
defer:
|
||||
deinitCallbackData(d)
|
||||
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
assert false, "createFFIContext(pool) failed: " & $error
|
||||
return
|
||||
defer:
|
||||
discard pool.destroyFFIContext(ctx)
|
||||
|
||||
check sendRequestToFFIThread(
|
||||
ctx, PingRequest.ffiNewReq(testCallback, addr d, "pool".cstring)
|
||||
)
|
||||
.isOk()
|
||||
waitCallback(d)
|
||||
check d.retCode == RET_OK
|
||||
check callbackMsg(d) == "pong:pool"
|
||||
|
||||
suite "createFFIContext / destroyFFIContext":
|
||||
test "create and destroy succeeds":
|
||||
let ctx = createFFIContext[TestLib]().valueOr:
|
||||
var pool: FFIContextPool[TestLib]
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
checkpoint "createFFIContext failed: " & $error
|
||||
check false
|
||||
return
|
||||
check destroyFFIContext(ctx).isOk()
|
||||
check pool.destroyFFIContext(ctx).isOk()
|
||||
|
||||
test "double destroy is safe via running flag":
|
||||
let ctx = createFFIContext[TestLib]().valueOr:
|
||||
var pool: FFIContextPool[TestLib]
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
check destroyFFIContext(ctx).isOk()
|
||||
check pool.destroyFFIContext(ctx).isOk()
|
||||
|
||||
suite "destroyFFIContext does not hang":
|
||||
test "destroy while a slow async request is still in-flight":
|
||||
@ -141,7 +199,8 @@ suite "destroyFFIContext does not hang":
|
||||
## running async request (e.g. stop_node / w.stop()) was still executing.
|
||||
## The destroy must return well within 2 seconds; before the fix it would
|
||||
## block forever on joinThread(ffiThread).
|
||||
let ctx = createFFIContext[TestLib]().valueOr:
|
||||
var pool: FFIContextPool[TestLib]
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
|
||||
@ -157,7 +216,7 @@ suite "destroyFFIContext does not hang":
|
||||
|
||||
# Destroy immediately while SlowRequest is still running.
|
||||
let t0 = Moment.now()
|
||||
check destroyFFIContext(ctx).isOk()
|
||||
check pool.destroyFFIContext(ctx).isOk()
|
||||
check (Moment.now() - t0) < 2.seconds
|
||||
|
||||
suite "destroyFFIContext does not hang when event loop is blocked":
|
||||
@ -174,7 +233,8 @@ suite "destroyFFIContext does not hang when event loop is blocked":
|
||||
##
|
||||
## With the fix, destroyFFIContext must complete well within the 5 s that
|
||||
## SyncBlockingRequest holds the event loop.
|
||||
let ctx = createFFIContext[TestLib]().valueOr:
|
||||
var pool: FFIContextPool[TestLib]
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
|
||||
@ -199,7 +259,7 @@ suite "destroyFFIContext does not hang when event loop is blocked":
|
||||
# It deliberately returns err and leaks ctx in this scenario rather than
|
||||
# hanging on joinThread.
|
||||
let t0 = Moment.now()
|
||||
check destroyFFIContext(ctx).isErr()
|
||||
check pool.destroyFFIContext(ctx).isErr()
|
||||
check (Moment.now() - t0) < 3.seconds
|
||||
|
||||
# Drain the leaked thread before the test scope ends.
|
||||
@ -247,7 +307,8 @@ suite "destroyFFIContext refc workaround":
|
||||
## returns immediately. Under `--mm:orc` it returns immediately either
|
||||
## way.
|
||||
test "destroy after heavy ref-allocation workload returns promptly":
|
||||
let ctx = createFFIContext[TestLib]().valueOr:
|
||||
var pool: FFIContextPool[TestLib]
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
|
||||
@ -262,21 +323,27 @@ suite "destroyFFIContext refc workaround":
|
||||
check d.retCode == RET_OK
|
||||
|
||||
let t0 = Moment.now()
|
||||
check destroyFFIContext(ctx).isOk()
|
||||
check pool.destroyFFIContext(ctx).isOk()
|
||||
check (Moment.now() - t0) < 3.seconds
|
||||
|
||||
suite "sendRequestToFFIThread":
|
||||
test "successful request triggers RET_OK callback":
|
||||
var d: CallbackData
|
||||
initCallbackData(d)
|
||||
defer: deinitCallbackData(d)
|
||||
defer:
|
||||
deinitCallbackData(d)
|
||||
|
||||
let ctx = createFFIContext[TestLib]().valueOr:
|
||||
var pool: FFIContextPool[TestLib]
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
defer: discard destroyFFIContext(ctx)
|
||||
defer:
|
||||
discard pool.destroyFFIContext(ctx)
|
||||
|
||||
check sendRequestToFFIThread(ctx, PingRequest.ffiNewReq(testCallback, addr d, "hello".cstring)).isOk()
|
||||
check sendRequestToFFIThread(
|
||||
ctx, PingRequest.ffiNewReq(testCallback, addr d, "hello".cstring)
|
||||
)
|
||||
.isOk()
|
||||
waitCallback(d)
|
||||
check d.retCode == RET_OK
|
||||
check callbackMsg(d) == "pong:hello"
|
||||
@ -284,12 +351,15 @@ suite "sendRequestToFFIThread":
|
||||
test "failing request triggers RET_ERR callback":
|
||||
var d: CallbackData
|
||||
initCallbackData(d)
|
||||
defer: deinitCallbackData(d)
|
||||
defer:
|
||||
deinitCallbackData(d)
|
||||
|
||||
let ctx = createFFIContext[TestLib]().valueOr:
|
||||
var pool: FFIContextPool[TestLib]
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
defer: discard destroyFFIContext(ctx)
|
||||
defer:
|
||||
discard pool.destroyFFIContext(ctx)
|
||||
|
||||
check sendRequestToFFIThread(ctx, FailRequest.ffiNewReq(testCallback, addr d)).isOk()
|
||||
waitCallback(d)
|
||||
@ -298,29 +368,38 @@ suite "sendRequestToFFIThread":
|
||||
test "empty ok response delivers empty message":
|
||||
var d: CallbackData
|
||||
initCallbackData(d)
|
||||
defer: deinitCallbackData(d)
|
||||
defer:
|
||||
deinitCallbackData(d)
|
||||
|
||||
let ctx = createFFIContext[TestLib]().valueOr:
|
||||
var pool: FFIContextPool[TestLib]
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
defer: discard destroyFFIContext(ctx)
|
||||
defer:
|
||||
discard pool.destroyFFIContext(ctx)
|
||||
|
||||
check sendRequestToFFIThread(ctx, EmptyOkRequest.ffiNewReq(testCallback, addr d)).isOk()
|
||||
check sendRequestToFFIThread(ctx, EmptyOkRequest.ffiNewReq(testCallback, addr d))
|
||||
.isOk()
|
||||
waitCallback(d)
|
||||
check d.retCode == RET_OK
|
||||
check d.msgLen == 0
|
||||
|
||||
test "sequential requests are all processed":
|
||||
let ctx = createFFIContext[TestLib]().valueOr:
|
||||
var pool: FFIContextPool[TestLib]
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
defer: discard destroyFFIContext(ctx)
|
||||
defer:
|
||||
discard pool.destroyFFIContext(ctx)
|
||||
|
||||
for i in 1 .. 5:
|
||||
var d: CallbackData
|
||||
initCallbackData(d)
|
||||
let msg = "msg" & $i
|
||||
check sendRequestToFFIThread(ctx, PingRequest.ffiNewReq(testCallback, addr d, msg.cstring)).isOk()
|
||||
check sendRequestToFFIThread(
|
||||
ctx, PingRequest.ffiNewReq(testCallback, addr d, msg.cstring)
|
||||
)
|
||||
.isOk()
|
||||
waitCallback(d)
|
||||
deinitCallbackData(d)
|
||||
check d.retCode == RET_OK
|
||||
@ -369,7 +448,7 @@ suite "ffiCtor macro":
|
||||
check not ctx[].myLib.isNil
|
||||
check ctx[].myLib[].value == 42
|
||||
|
||||
check destroyFFIContext(ctx).isOk()
|
||||
check SimpleLibFFIPool.destroyFFIContext(ctx).isOk()
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Simplified .ffi. macro integration test
|
||||
@ -404,7 +483,7 @@ suite "simplified .ffi. macro":
|
||||
let ctxAddr = cast[uint](parseBiggestUInt(addrStr))
|
||||
check ctxAddr != 0
|
||||
let ctx = cast[ptr FFIContext[SimpleLib]](ctxAddr)
|
||||
defer: check destroyFFIContext(ctx).isOk()
|
||||
defer: check SimpleLibFFIPool.destroyFFIContext(ctx).isOk()
|
||||
|
||||
# Now call the .ffi. proc
|
||||
var d: CallbackData
|
||||
@ -454,7 +533,7 @@ suite "async/sync detection in .ffi.":
|
||||
let ctxAddr = cast[uint](parseBiggestUInt(addrStr))
|
||||
check ctxAddr != 0
|
||||
let ctx = cast[ptr FFIContext[SimpleLib]](ctxAddr)
|
||||
defer: check destroyFFIContext(ctx).isOk()
|
||||
defer: check SimpleLibFFIPool.destroyFFIContext(ctx).isOk()
|
||||
|
||||
var d2: CallbackData
|
||||
initCallbackData(d2)
|
||||
@ -522,7 +601,7 @@ suite "ptr return type in .ffi.":
|
||||
let ctxAddr = cast[uint](parseBiggestUInt(ctxAddrStr))
|
||||
check ctxAddr != 0
|
||||
let ctx = cast[ptr FFIContext[SimpleLib]](ctxAddr)
|
||||
defer: check destroyFFIContext(ctx).isOk()
|
||||
defer: check SimpleLibFFIPool.destroyFFIContext(ctx).isOk()
|
||||
|
||||
# Alloc a handle
|
||||
var allocD: CallbackData
|
||||
|
||||
@ -95,11 +95,12 @@ suite "GC safety - string lifetime across thread boundary":
|
||||
initCallbackData(d)
|
||||
defer: deinitCallbackData(d)
|
||||
|
||||
let ctx = createFFIContext[GcTestLib]().valueOr:
|
||||
var pool: FFIContextPool[GcTestLib]
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
checkpoint "createFFIContext failed: " & $error
|
||||
check false
|
||||
return
|
||||
defer: discard destroyFFIContext(ctx)
|
||||
defer: discard pool.destroyFFIContext(ctx)
|
||||
|
||||
check sendRequestToFFIThread(
|
||||
ctx, StringLifetimeRequest.ffiNewReq(testCallback, addr d, "hello".cstring)
|
||||
@ -113,10 +114,11 @@ suite "GC safety - string lifetime across thread boundary":
|
||||
initCallbackData(d)
|
||||
defer: deinitCallbackData(d)
|
||||
|
||||
let ctx = createFFIContext[GcTestLib]().valueOr:
|
||||
var pool: FFIContextPool[GcTestLib]
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
defer: discard destroyFFIContext(ctx)
|
||||
defer: discard pool.destroyFFIContext(ctx)
|
||||
|
||||
check sendRequestToFFIThread(
|
||||
ctx, GcErrRequest.ffiNewReq(testCallback, addr d, "test".cstring)
|
||||
@ -130,10 +132,11 @@ suite "GC safety - string lifetime across thread boundary":
|
||||
initCallbackData(d)
|
||||
defer: deinitCallbackData(d)
|
||||
|
||||
let ctx = createFFIContext[GcTestLib]().valueOr:
|
||||
var pool: FFIContextPool[GcTestLib]
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
defer: discard destroyFFIContext(ctx)
|
||||
defer: discard pool.destroyFFIContext(ctx)
|
||||
|
||||
check sendRequestToFFIThread(
|
||||
ctx, LargeStringRequest.ffiNewReq(testCallback, addr d)
|
||||
@ -147,10 +150,11 @@ suite "GC safety - string lifetime across thread boundary":
|
||||
|
||||
suite "GC stability - repeated requests":
|
||||
test "20 sequential requests without GC corruption":
|
||||
let ctx = createFFIContext[GcTestLib]().valueOr:
|
||||
var pool: FFIContextPool[GcTestLib]
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
defer: discard destroyFFIContext(ctx)
|
||||
defer: discard pool.destroyFFIContext(ctx)
|
||||
|
||||
for i in 1 .. 20:
|
||||
var d: CallbackData
|
||||
|
||||
@ -104,10 +104,6 @@ suite "ffiDeserialize error handling":
|
||||
let back = ffiDeserialize("not json at all".cstring, int)
|
||||
check back.isErr()
|
||||
|
||||
test "wrong JSON type returns err for string":
|
||||
let back = ffiDeserialize("42".cstring, string)
|
||||
check back.isErr()
|
||||
|
||||
test "malformed JSON for object returns err":
|
||||
let back = ffiDeserialize("{bad json".cstring, Point)
|
||||
check back.isErr()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user