mirror of
https://github.com/logos-messaging/nim-ffi.git
synced 2026-06-20 16:29:31 +00:00
fix(pool): recycle contexts to stop per-cycle fd leak (#74)
This commit is contained in:
parent
fb25f069d2
commit
c1aed9cfa5
4
.github/workflows/ci.yml
vendored
4
.github/workflows/ci.yml
vendored
@ -2,9 +2,9 @@ name: CI
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [master, main]
|
||||
branches: [master, main, 'release/*']
|
||||
pull_request:
|
||||
branches: [master, main]
|
||||
branches: [master, main, 'release/*']
|
||||
|
||||
env:
|
||||
NIM_VERSION: '2.2.4'
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
{.pragma: callback, cdecl, raises: [], gcsafe.}
|
||||
{.passc: "-fPIC".}
|
||||
|
||||
import std/[atomics, locks, json, tables]
|
||||
import std/[atomics, locks, json, tables, sequtils]
|
||||
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
|
||||
import ./ffi_types, ./ffi_thread_request, ./internal/ffi_macro, ./logging
|
||||
|
||||
@ -12,6 +12,16 @@ type FFICallbackState* = object
|
||||
callback*: pointer
|
||||
userData*: pointer
|
||||
|
||||
type CtxLifecycle {.pure.} = enum
|
||||
## State machine guarding a pooled FFI context, held as an Atomic on FFIContext.
|
||||
## Transitions:
|
||||
## Active -> RecyclePending when ffiDtor is invoked
|
||||
## RecyclePending -> Recycling The process completed the in-flight processes and is ready for lib cleanup and release
|
||||
## Recycling -> Active When the FFI thread is ready again to attend to requests
|
||||
Active ## accepting and serving requests
|
||||
RecyclePending ## recycle requested; FFI thread loop hasn't claimed it yet
|
||||
Recycling ## FFI loop draining handlers, then frees lib + returns to pool
|
||||
|
||||
type FFIContext*[T] = object
|
||||
myLib*: ptr T
|
||||
# main library object (e.g., Waku, LibP2P, SDS, the one to be exposed as a library)
|
||||
@ -33,6 +43,15 @@ type FFIContext*[T] = object
|
||||
userData*: pointer
|
||||
callbackState*: FFICallbackState
|
||||
running: Atomic[bool] # To control when the threads are running
|
||||
lifecycle: Atomic[CtxLifecycle]
|
||||
recycleCallback: FFICallBack
|
||||
# The destructor's callback, fired by the recycle handler with the outcome:
|
||||
# RET_OK once drained, RET_ERR if it timed out. Set by requestRecycle.
|
||||
recycleUserData: pointer
|
||||
inUse: Atomic[bool]
|
||||
# Whether the context is claimed. createFFIContext claims it (false -> true); the
|
||||
# recycle handler clears it once drained. On the context so the owning thread can
|
||||
# release it without reaching into the pool.
|
||||
registeredRequests: ptr Table[cstring, FFIRequestProc]
|
||||
# Pointer to with the registered requests at compile time
|
||||
|
||||
@ -90,14 +109,14 @@ proc sendRequestToFFIThread*(
|
||||
ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration
|
||||
): Result[void, string] =
|
||||
ctx.lock.acquire()
|
||||
# This lock is only necessary while we use a SP Channel and while the signalling
|
||||
# between threads assumes that there aren't concurrent requests.
|
||||
# Rearchitecting the signaling + migrating to a MP Channel will allow us to receive
|
||||
# requests concurrently and spare us the need of locks
|
||||
defer:
|
||||
ctx.lock.release()
|
||||
|
||||
## Sending the request
|
||||
if ctx.lifecycle.load() != CtxLifecycle.Active:
|
||||
deleteRequest(ffiRequest)
|
||||
return err("FFI context is not accepting requests (being recycled)")
|
||||
|
||||
## Sending the request to the FFI thread
|
||||
let sentOk = ctx.reqChannel.trySend(ffiRequest)
|
||||
if not sentOk:
|
||||
deleteRequest(ffiRequest)
|
||||
@ -115,12 +134,8 @@ proc sendRequestToFFIThread*(
|
||||
## wait until the FFI working thread properly received the request
|
||||
let res = ctx.reqReceivedSignal.waitSync(timeout)
|
||||
if res.isErr():
|
||||
## Do not free ffiRequest here: the FFI thread was already signaled and
|
||||
## will process (and free) it.
|
||||
return err("Couldn't receive reqReceivedSignal signal")
|
||||
|
||||
## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the
|
||||
## process proc.
|
||||
return ok()
|
||||
|
||||
type Foo = object
|
||||
@ -162,6 +177,9 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} =
|
||||
debug "Watchdog thread exiting because FFIContext is not running"
|
||||
break
|
||||
|
||||
if ctx.lifecycle.load() != CtxLifecycle.Active:
|
||||
continue
|
||||
|
||||
let callback = proc(
|
||||
callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
|
||||
) {.cdecl, gcsafe, raises: [].} =
|
||||
@ -192,9 +210,8 @@ proc processRequest[T](
|
||||
## The registeredRequests represents a table defined at compile time.
|
||||
## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously]
|
||||
|
||||
## Explicit conversion keeps `reqId` alive as the backing string,
|
||||
## avoiding the implicit string→cstring warning that will become an error.
|
||||
let reqIdCs = reqId.cstring
|
||||
# keep `reqId` alive and avoid the implicit string→cstring warning.
|
||||
|
||||
let retFut =
|
||||
if not ctx[].registeredRequests[].contains(reqIdCs):
|
||||
@ -206,19 +223,79 @@ proc processRequest[T](
|
||||
let res =
|
||||
try:
|
||||
await retFut
|
||||
except CancelledError as exc:
|
||||
Result[string, string].err("Request cancelled during destroy: " & exc.msg)
|
||||
except AsyncError as exc:
|
||||
Result[string, string].err(
|
||||
"Async error in processRequest for " & reqId & ": " & exc.msg
|
||||
)
|
||||
|
||||
## handleRes may raise (OOM, GC setup) even though it is rare. Catching here
|
||||
## keeps the async proc raises:[] compatible. The defer inside handleRes
|
||||
## guarantees request is freed before the exception propagates.
|
||||
## handleRes may raise (OOM, GC setup) even though it is rare.
|
||||
try:
|
||||
handleRes(res, request)
|
||||
except Exception as exc:
|
||||
error "Unexpected exception in handleRes", error = exc.msg
|
||||
|
||||
proc freeLib[T](ctx: ptr FFIContext[T]) {.gcsafe.} =
|
||||
if ctx.myLib.isNil():
|
||||
return
|
||||
|
||||
when not defined(gcRefc):
|
||||
{.cast(gcsafe).}:
|
||||
`=destroy`(ctx.myLib[])
|
||||
else:
|
||||
discard
|
||||
freeShared(ctx.myLib)
|
||||
ctx.myLib = nil
|
||||
|
||||
var RecycleTimeout* = 1500.milliseconds
|
||||
## Upper bound the recycle handler waits for in-flight handlers before it
|
||||
## cancels them and reports the ctx as stuck. The drain returns as soon as they
|
||||
## finish, so this only bounds a *stuck* handler. A `var` so tests can shorten it.
|
||||
|
||||
proc recycleContext[T](
|
||||
ctx: ptr FFIContext[T], ongoingProcessReq: ptr seq[Future[void]]
|
||||
) {.async.} =
|
||||
## Drain the in-flight handlers, free the lib object, release the context for reuse,
|
||||
## and fire the callback with the outcome. Never blocks the caller.
|
||||
|
||||
ongoingProcessReq[].keepItIf(not it.finished())
|
||||
|
||||
## 1. Let the in-flight handlers finish on their own, bounded by RecycleTimeout.
|
||||
var naturallyDrained = ongoingProcessReq[].len == 0
|
||||
if not naturallyDrained:
|
||||
naturallyDrained = await allFutures(ongoingProcessReq[]).withTimeout(RecycleTimeout)
|
||||
|
||||
## 2. If any are wedged, cancel them and give the cancellations a bounded moment
|
||||
## to unwind, so the context can be reclaimed rather than leaked.
|
||||
var safeToRecycle = naturallyDrained
|
||||
if not naturallyDrained:
|
||||
for fut in ongoingProcessReq[]:
|
||||
if not fut.finished():
|
||||
fut.cancelSoon()
|
||||
safeToRecycle = await allFutures(ongoingProcessReq[]).withTimeout(RecycleTimeout)
|
||||
|
||||
let cb = ctx.recycleCallback
|
||||
let ud = ctx.recycleUserData
|
||||
ctx.recycleCallback = nil
|
||||
|
||||
if safeToRecycle:
|
||||
freeLib(ctx)
|
||||
ctx.callbackState = default(FFICallbackState)
|
||||
ongoingProcessReq[].setLen(0)
|
||||
ctx.release()
|
||||
|
||||
if not cb.isNil():
|
||||
foreignThreadGc:
|
||||
let msg =
|
||||
if naturallyDrained:
|
||||
""
|
||||
else:
|
||||
"recycle: in-flight requests did not finish in time"
|
||||
let cmsg = msg.cstring
|
||||
let retCode = if naturallyDrained: RET_OK else: RET_ERR
|
||||
cb(retCode, unsafeAddr cmsg[0], cast[csize_t](msg.len), ud)
|
||||
|
||||
proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
## FFI thread body that attends library user API requests
|
||||
ffiCurrentCallbackState = addr ctx[].callbackState
|
||||
@ -226,18 +303,20 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
|
||||
|
||||
defer:
|
||||
# Signal destroyFFIContext that this thread has exited, so its bounded
|
||||
# wait can unblock and proceed with cleanup.
|
||||
let fireRes = ctx.threadExitSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error
|
||||
|
||||
let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} =
|
||||
var ffiReqHandler: T
|
||||
## Holds the main library object, i.e., in charge of handling the ffi requests.
|
||||
## e.g., Waku, LibP2P, SDS, etc.
|
||||
|
||||
var ongoingProcessReq: seq[Future[void]]
|
||||
|
||||
while ctx.running.load():
|
||||
var expected = CtxLifecycle.RecyclePending
|
||||
if ctx.lifecycle.compareExchange(expected, CtxLifecycle.Recycling):
|
||||
await recycleContext(ctx, addr ongoingProcessReq)
|
||||
continue
|
||||
|
||||
let gotSignal = await ctx.reqSignal.wait().withTimeout(100.milliseconds)
|
||||
if not gotSignal:
|
||||
continue
|
||||
@ -247,11 +326,8 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
if not ctx.reqChannel.tryRecv(request):
|
||||
continue
|
||||
|
||||
if ctx.myLib.isNil():
|
||||
ctx.myLib = addr ffiReqHandler
|
||||
|
||||
## Handle the request
|
||||
asyncSpawn processRequest(request, ctx)
|
||||
ongoingProcessReq.keepItIf(not it.finished())
|
||||
ongoingProcessReq.add(processRequest(request, ctx))
|
||||
|
||||
let fireRes = ctx.reqReceivedSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
@ -297,9 +373,9 @@ proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
return ok()
|
||||
|
||||
proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
## Initialises all resources inside an already-allocated FFIContext slot.
|
||||
## Initialises all resources inside an already-allocated FFIContext.
|
||||
## On failure every partially-initialised resource is closed; the caller
|
||||
## is responsible for releasing the slot (freeShared or pool.releaseSlot).
|
||||
## is responsible for releasing the context.
|
||||
ctx.lock.initLock()
|
||||
|
||||
var success = false
|
||||
@ -323,6 +399,7 @@ proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
|
||||
ctx.registeredRequests = addr ffi_types.registeredRequests
|
||||
|
||||
ctx.lifecycle.store(CtxLifecycle.Active)
|
||||
ctx.running.store(true)
|
||||
|
||||
try:
|
||||
@ -361,8 +438,8 @@ proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
|
||||
## If the FFI thread's event loop is blocked by a synchronous handler
|
||||
## (e.g. blocking I/O), it cannot process reqSignal in time to exit.
|
||||
## clearContext waits on threadExitSignal up to this bound; on timeout it
|
||||
## returns err and skips joinThread/cleanup (leaking the thread + ctx slot)
|
||||
## stopAndJoinThreads waits on threadExitSignal up to this bound; on timeout it
|
||||
## returns err and skips joinThread/cleanup (leaking the thread + ctx)
|
||||
## rather than hanging the caller forever.
|
||||
const ThreadExitTimeout* = 1500.milliseconds
|
||||
|
||||
@ -386,10 +463,41 @@ proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
joinThread(ctx.watchdogThread)
|
||||
return ok()
|
||||
|
||||
proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
## Stops the FFI context that was created via createFFIContext[T]() (heap).
|
||||
ctx.stopAndJoinThreads().isOkOr:
|
||||
return err("clearContext: " & $error)
|
||||
ctx.cleanUpResources().isOkOr:
|
||||
return err("cleanUpResources failed: " & $error)
|
||||
proc requestRecycle*[T](
|
||||
ctx: ptr FFIContext[T], callback: FFICallBack, userData: pointer
|
||||
): Result[void, string] =
|
||||
## Starts ctx recycle process without stopping its worker, so the next
|
||||
## createFFIContext reuses the same threads and fds.
|
||||
##
|
||||
## During recycling, the FFI thread drains the handlers, frees the lib and releases
|
||||
## the context, then fires `callback` (RET_OK drained, RET_ERR stuck).
|
||||
|
||||
ctx.lock.acquire()
|
||||
if ctx.lifecycle.load() != CtxLifecycle.Active:
|
||||
ctx.lock.release()
|
||||
return err("requestRecycle: context is not Active (already recycling)")
|
||||
|
||||
ctx.recycleCallback = callback
|
||||
ctx.recycleUserData = userData
|
||||
ctx.lifecycle.store(CtxLifecycle.RecyclePending)
|
||||
ctx.lock.release()
|
||||
|
||||
let fired = ctx.reqSignal.fireSync().valueOr:
|
||||
return err("requestRecycle: failed to signal the FFI thread: " & $error)
|
||||
if not fired:
|
||||
return err("requestRecycle: failed to signal the FFI thread in time")
|
||||
return ok()
|
||||
|
||||
proc markAsActive*[T](ctx: ptr FFIContext[T]) =
|
||||
ctx.lifecycle.store(CtxLifecycle.Active)
|
||||
|
||||
proc tryClaim*[T](ctx: ptr FFIContext[T]): bool =
|
||||
## Returns true if acquired the contex, false if it was already claimed.
|
||||
var expected = false
|
||||
ctx.inUse.compareExchange(expected, true)
|
||||
|
||||
proc release*[T](ctx: ptr FFIContext[T]) =
|
||||
ctx.inUse.store(false)
|
||||
|
||||
proc isInUse*[T](ctx: ptr FFIContext[T]): bool =
|
||||
ctx.inUse.load()
|
||||
|
||||
@ -1,63 +1,60 @@
|
||||
import std/atomics
|
||||
import results
|
||||
import ./ffi_context
|
||||
import ./ffi_context, ./ffi_types
|
||||
|
||||
const MaxFFIContexts* = 32
|
||||
## Maximum number of concurrently live FFI contexts when using FFIContextPool.
|
||||
## Fds and threads are only consumed for slots that are actually acquired,
|
||||
## so this value only affects the upfront memory of the pool array.
|
||||
|
||||
type FFIContextPool*[T] = object
|
||||
## Fixed-size pool of FFI contexts. Avoids dynamic heap allocation per context
|
||||
## and bounds the total number of file descriptors consumed by ThreadSignalPtrs
|
||||
## to at most MaxFFIContexts * 2.
|
||||
slots: array[MaxFFIContexts, FFIContext[T]]
|
||||
inUse: array[MaxFFIContexts, Atomic[bool]]
|
||||
|
||||
proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] =
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
var expected = false
|
||||
if pool.inUse[i].compareExchange(expected, true):
|
||||
return ok(pool.slots[i].addr)
|
||||
return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)")
|
||||
|
||||
proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) =
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
if pool.slots[i].addr == ctx:
|
||||
pool.inUse[i].store(false)
|
||||
return
|
||||
contexts: array[MaxFFIContexts, FFIContext[T]]
|
||||
initialized: array[MaxFFIContexts, Atomic[bool]]
|
||||
|
||||
proc createFFIContext*[T](
|
||||
pool: var FFIContextPool[T]
|
||||
): Result[ptr FFIContext[T], string] =
|
||||
## Acquires a slot from the fixed pool and initialises it as an FFI context.
|
||||
## Bounded fd usage: at most MaxFFIContexts * 2 ThreadSignalPtr fds are ever open.
|
||||
let ctx = pool.acquireSlot().valueOr:
|
||||
return err("createFFIContext: acquireSlot failed: " & $error)
|
||||
initContextResources(ctx).isOkOr:
|
||||
pool.releaseSlot(ctx)
|
||||
return err("createFFIContext: initContextResources failed: " & $error)
|
||||
return ok(ctx)
|
||||
## Acquires a context from the fixed pool. The context's worker is built once on
|
||||
## first use and reused on every later acquisition.
|
||||
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
let ctx = pool.contexts[i].addr
|
||||
if not ctx.tryClaim():
|
||||
continue
|
||||
if pool.initialized[i].load():
|
||||
## Reused context: a prior destroy drained and released it, worker still alive.
|
||||
ctx.markAsActive()
|
||||
return ok(ctx)
|
||||
initContextResources(ctx).isOkOr:
|
||||
ctx.release()
|
||||
return err("createFFIContext: initContextResources failed: " & $error)
|
||||
pool.initialized[i].store(true)
|
||||
return ok(ctx)
|
||||
return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)")
|
||||
|
||||
proc releaseFFIContext*[T](
|
||||
ctx: ptr FFIContext[T], callback: FFICallBack, userData: pointer
|
||||
): Result[void, string] =
|
||||
return ctx.requestRecycle(callback, userData)
|
||||
|
||||
proc destroyFFIContext*[T](
|
||||
pool: var FFIContextPool[T], ctx: ptr FFIContext[T]
|
||||
): Result[void, string] =
|
||||
## Stops the FFI context and returns its slot to the pool. If the FFI thread
|
||||
## is blocked and does not exit in time, the slot is leaked rather than
|
||||
## reclaimed — closing its resources while the thread is still live would be
|
||||
## unsafe.
|
||||
## Full teardown: stops/joins the worker threads and returns the context to the
|
||||
## pool, marking it uninitialised so a later createFFIContext rebuilds it.
|
||||
ctx.stopAndJoinThreads().isOkOr:
|
||||
return err("destroyFFIContext(pool): " & $error)
|
||||
pool.releaseSlot(ctx)
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
if pool.contexts[i].addr == ctx:
|
||||
pool.initialized[i].store(false)
|
||||
break
|
||||
ctx.release()
|
||||
return ok()
|
||||
|
||||
proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool =
|
||||
## Returns true only if ctx points to one of the pool's slots that is
|
||||
## currently in use. Rejects nil, offset-invalid, and dangling pointers
|
||||
## at the API boundary, preventing use-after-free dereferences.
|
||||
## Returns true only if ctx points to one of the pool's contexts that is
|
||||
## currently in use.
|
||||
if ctx.isNil():
|
||||
return false
|
||||
for i in 0 ..< MaxFFIContexts:
|
||||
if cast[pointer](pool.slots[i].addr) == ctx:
|
||||
return pool.inUse[i].load()
|
||||
if cast[pointer](pool.contexts[i].addr) == ctx:
|
||||
return cast[ptr FFIContext[T]](ctx).isInUse()
|
||||
return false
|
||||
|
||||
@ -1511,14 +1511,15 @@ macro ffiDtor*(prc: untyped): untyped =
|
||||
## The body contains any library-level cleanup to run before context teardown.
|
||||
##
|
||||
## Example:
|
||||
## proc waku_destroy*(w: Waku) {.ffiDtor.} =
|
||||
## w.cleanup()
|
||||
## proc mylibobj_destroy*(obj: MyLibObj) {.ffiDtor.} =
|
||||
## obj.cleanup()
|
||||
##
|
||||
## The generated C-exported proc has the signature:
|
||||
## cint waku_destroy(void* ctx, FfiCallback callback, void* userData)
|
||||
## cint mylibobj_destroy(void* ctx, FfiCallback callback, void* userData)
|
||||
##
|
||||
## It extracts the library value from ctx, runs the body, then calls
|
||||
## destroyFFIContext to tear down the FFI thread and free the context.
|
||||
## Recycle the context for reuse to keep fd usage bounded.
|
||||
## NON-BLOCKING: returns RET_OK once accepted;
|
||||
## the real outcome arrives via `callback`.
|
||||
|
||||
let procName = prc[0]
|
||||
let formalParams = prc[3]
|
||||
@ -1528,7 +1529,7 @@ macro ffiDtor*(prc: untyped): untyped =
|
||||
error("ffiDtor: proc must have exactly one parameter (w: LibType)")
|
||||
|
||||
let libParamName = formalParams[1][0] # e.g. w
|
||||
let libTypeName = formalParams[1][1] # e.g. Waku
|
||||
let libTypeName = formalParams[1][1] # e.g. MyLibObj
|
||||
|
||||
let procNameStr = block:
|
||||
let raw = $procName
|
||||
@ -1537,7 +1538,7 @@ macro ffiDtor*(prc: untyped): untyped =
|
||||
let exportedProcName =
|
||||
if procName.kind == nnkPostfix: procName[1] else: procName
|
||||
|
||||
let destroyResIdent = genSym(nskLet, "destroyRes")
|
||||
let releaseResIdent = genSym(nskLet, "destroyRes")
|
||||
|
||||
let ffiBody = newStmtList()
|
||||
|
||||
@ -1566,17 +1567,14 @@ macro ffiDtor*(prc: untyped): untyped =
|
||||
|
||||
let poolIdent = ident($libTypeName & "FFIPool")
|
||||
ffiBody.add quote do:
|
||||
let `destroyResIdent` =
|
||||
`poolIdent`.destroyFFIContext(cast[ptr FFIContext[`libTypeName`]](ctx))
|
||||
if `destroyResIdent`.isErr():
|
||||
if not callback.isNil:
|
||||
let errStr = "destroy failed: " & $`destroyResIdent`.error
|
||||
let `releaseResIdent` = releaseFFIContext(
|
||||
cast[ptr FFIContext[`libTypeName`]](ctx), callback, userData
|
||||
)
|
||||
if `releaseResIdent`.isErr():
|
||||
if not callback.isNil():
|
||||
let errStr = "release failed: " & $`releaseResIdent`.error
|
||||
callback(RET_ERR, unsafeAddr errStr[0], cast[csize_t](errStr.len), userData)
|
||||
return RET_ERR
|
||||
|
||||
ffiBody.add quote do:
|
||||
if not callback.isNil:
|
||||
callback(RET_OK, nil, 0, userData)
|
||||
return RET_OK
|
||||
|
||||
let ffiProc = newProc(
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import std/[locks, strutils, os]
|
||||
import std/[locks, strutils, os, osproc, sequtils]
|
||||
import unittest2
|
||||
import results
|
||||
import ../ffi
|
||||
@ -129,18 +129,18 @@ suite "FFIContextPool":
|
||||
return
|
||||
check pool.destroyFFIContext(ctx).isOk()
|
||||
|
||||
test "slot is reused after destroy":
|
||||
test "context is reused after destroy":
|
||||
var pool: FFIContextPool[TestLib]
|
||||
let ctx1 = pool.createFFIContext().valueOr:
|
||||
assert false, "createFFIContext(pool) failed: " & $error
|
||||
return
|
||||
check pool.destroyFFIContext(ctx1).isOk()
|
||||
# After destroying, the same slot must be available again
|
||||
# After destroying, the same context must be available again
|
||||
let ctx2 = pool.createFFIContext().valueOr:
|
||||
assert false, "createFFIContext(pool) failed after slot release: " & $error
|
||||
assert false, "createFFIContext(pool) failed after context release: " & $error
|
||||
return
|
||||
check pool.destroyFFIContext(ctx2).isOk()
|
||||
check ctx1 == ctx2 # same array slot reused
|
||||
check ctx1 == ctx2 # same context reused
|
||||
|
||||
test "pool exhaustion returns error":
|
||||
var pool: FFIContextPool[TestLib]
|
||||
@ -149,7 +149,7 @@ suite "FFIContextPool":
|
||||
ctxs[i] = pool.createFFIContext().valueOr:
|
||||
for j in 0 ..< i:
|
||||
discard pool.destroyFFIContext(ctxs[j])
|
||||
assert false, "createFFIContext(pool) failed at slot " & $i & ": " & $error
|
||||
assert false, "createFFIContext(pool) failed at context " & $i & ": " & $error
|
||||
return
|
||||
# Pool is now full — next create must fail
|
||||
check pool.createFFIContext().isErr()
|
||||
@ -421,6 +421,12 @@ proc testlib_create*(
|
||||
): Future[Result[SimpleLib, string]] {.ffiCtor.} =
|
||||
return ok(SimpleLib(value: config.initialValue))
|
||||
|
||||
# Records the value of the library object the destructor body saw, so a test can
|
||||
# confirm the user cleanup body ran with the right lib state before teardown.
|
||||
var gDestroyedValue {.threadvar.}: int
|
||||
proc testlib_destroy*(lib: SimpleLib) {.ffiDtor.} =
|
||||
gDestroyedValue = lib.value
|
||||
|
||||
suite "ffiCtor macro":
|
||||
test "creates context and returns pointer via callback":
|
||||
var d: CallbackData
|
||||
@ -450,6 +456,123 @@ suite "ffiCtor macro":
|
||||
|
||||
check SimpleLibFFIPool.destroyFFIContext(ctx).isOk()
|
||||
|
||||
proc createSimpleLib(initialValue: int): ptr FFIContext[SimpleLib] =
|
||||
## Helper: run the generated async ctor and return the live ctx.
|
||||
var d: CallbackData
|
||||
initCallbackData(d)
|
||||
defer:
|
||||
deinitCallbackData(d)
|
||||
let ret =
|
||||
testlib_create(ffiSerialize(SimpleConfig(initialValue: initialValue)).cstring,
|
||||
testCallback, addr d)
|
||||
doAssert not ret.isNil()
|
||||
waitCallback(d)
|
||||
doAssert d.retCode == RET_OK
|
||||
return cast[ptr FFIContext[SimpleLib]](cast[uint](parseBiggestUInt(callbackMsg(d))))
|
||||
|
||||
suite "ffiDtor macro (async destroy + reuse)":
|
||||
test "destroy fires RET_OK after teardown, frees myLib, and frees the context":
|
||||
let ctx = createSimpleLib(5)
|
||||
check not ctx[].myLib.isNil
|
||||
check ctx[].myLib[].value == 5
|
||||
|
||||
var dD: CallbackData
|
||||
initCallbackData(dD)
|
||||
defer:
|
||||
deinitCallbackData(dD)
|
||||
|
||||
# Async destroy: the C return is just "accepted"; the real outcome arrives
|
||||
# via the callback once the FFI thread has finished tearing the lib down.
|
||||
check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK
|
||||
waitCallback(dD)
|
||||
check dD.retCode == RET_OK
|
||||
check gDestroyedValue == 5 # the user cleanup body saw the live lib
|
||||
check ctx[].myLib.isNil() # freed on the FFI thread
|
||||
|
||||
# The context was freed from the FFI thread, so a fresh create reclaims it.
|
||||
let ctx2 = createSimpleLib(9)
|
||||
check ctx2 == ctx # same context, reused worker + fds
|
||||
check ctx2[].myLib[].value == 9
|
||||
check SimpleLibFFIPool.destroyFFIContext(ctx2).isOk()
|
||||
|
||||
test "destroy waits for an in-flight request before reporting RET_OK":
|
||||
let ctx = createSimpleLib(1)
|
||||
|
||||
# Dispatch a 500 ms handler and do NOT wait — it is in flight at destroy time.
|
||||
var slow: CallbackData
|
||||
initCallbackData(slow)
|
||||
defer:
|
||||
deinitCallbackData(slow)
|
||||
check sendRequestToFFIThread(ctx, SlowRequest.ffiNewReq(testCallback, addr slow)).isOk()
|
||||
|
||||
var dD: CallbackData
|
||||
initCallbackData(dD)
|
||||
defer:
|
||||
deinitCallbackData(dD)
|
||||
check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK
|
||||
waitCallback(dD)
|
||||
check dD.retCode == RET_OK
|
||||
# Drained: the in-flight handler ran to completion before destroy reported OK.
|
||||
check slow.called
|
||||
check callbackMsg(slow) == "slow-done"
|
||||
|
||||
let ctx2 = createSimpleLib(2)
|
||||
check ctx2 == ctx
|
||||
check SimpleLibFFIPool.destroyFFIContext(ctx2).isOk()
|
||||
|
||||
test "requests are rejected once a destroy closes the gate":
|
||||
let ctx = createSimpleLib(3)
|
||||
|
||||
var dD: CallbackData
|
||||
initCallbackData(dD)
|
||||
defer:
|
||||
deinitCallbackData(dD)
|
||||
check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK
|
||||
waitCallback(dD)
|
||||
check dD.retCode == RET_OK
|
||||
|
||||
# Gate stays closed until the context is reacquired: a late request must not
|
||||
# dispatch onto a context about to be (or already) reused.
|
||||
var d: CallbackData
|
||||
initCallbackData(d)
|
||||
defer:
|
||||
deinitCallbackData(d)
|
||||
check sendRequestToFFIThread(
|
||||
ctx, SlowRequest.ffiNewReq(testCallback, addr d)
|
||||
).isErr()
|
||||
|
||||
let ctx2 = createSimpleLib(4)
|
||||
check ctx2 == ctx
|
||||
check SimpleLibFFIPool.destroyFFIContext(ctx2).isOk()
|
||||
|
||||
test "a stuck context is reported as RET_ERR rather than hanging":
|
||||
let ctx = createSimpleLib(8)
|
||||
|
||||
let savedTimeout = RecycleTimeout
|
||||
RecycleTimeout = 150.milliseconds
|
||||
defer:
|
||||
RecycleTimeout = savedTimeout
|
||||
|
||||
# In-flight handler outlasts the (shortened) drain timeout.
|
||||
var slow: CallbackData
|
||||
initCallbackData(slow)
|
||||
defer:
|
||||
deinitCallbackData(slow)
|
||||
check sendRequestToFFIThread(ctx, SlowRequest.ffiNewReq(testCallback, addr slow)).isOk()
|
||||
|
||||
var dD: CallbackData
|
||||
initCallbackData(dD)
|
||||
defer:
|
||||
deinitCallbackData(dD)
|
||||
check testlib_destroy(cast[pointer](ctx), testCallback, addr dD) == RET_OK
|
||||
waitCallback(dD)
|
||||
check dD.retCode == RET_ERR # drain timed out -> ctx reported stuck
|
||||
|
||||
# The stuck context is leaked (not reused); the handler still finishes on its
|
||||
# own. Wait for it, then fully tear the leaked context down.
|
||||
waitCallback(slow)
|
||||
check SimpleLibFFIPool.destroyFFIContext(ctx).isOk()
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Simplified .ffi. macro integration test
|
||||
# ---------------------------------------------------------------------------
|
||||
@ -648,3 +771,122 @@ suite "ptr return type in .ffi.":
|
||||
|
||||
waitCallback(freeD)
|
||||
check freeD.retCode == RET_OK
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# releaseFFIContext: park & reuse (fd-leak regression)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
proc countOpenFds(): int =
|
||||
## Number of open fds for this process, or -1 if not determinable on this
|
||||
## platform. On Linux we count /proc/self/fd; elsewhere we shell out to lsof
|
||||
## (skipped if lsof is unavailable, e.g. Windows).
|
||||
when defined(linux):
|
||||
var n = 0
|
||||
for _ in walkDir("/proc/self/fd"):
|
||||
inc n
|
||||
return n
|
||||
else:
|
||||
if findExe("lsof").len == 0:
|
||||
return -1
|
||||
try:
|
||||
let output =
|
||||
execProcess("lsof", args = ["-p", $getCurrentProcessId()], options = {poUsePath})
|
||||
return output.splitLines().countIt(it.len > 0)
|
||||
except CatchableError:
|
||||
return -1
|
||||
|
||||
proc releaseAndWait[T](ctx: ptr FFIContext[T]): cint =
|
||||
## Test helper mirroring how a C consumer destroys a context: kick off the
|
||||
## (non-blocking) teardown and block on the callback, returning its retCode.
|
||||
## RET_OK means the lib's in-flight tasks finished and the context was parked.
|
||||
var d: CallbackData
|
||||
initCallbackData(d)
|
||||
defer:
|
||||
deinitCallbackData(d)
|
||||
if ctx.releaseFFIContext(testCallback, addr d).isErr():
|
||||
return RET_ERR
|
||||
waitCallback(d)
|
||||
return d.retCode
|
||||
|
||||
suite "releaseFFIContext (park & reuse)":
|
||||
test "park returns the context and reuses the same live worker":
|
||||
var pool: FFIContextPool[TestLib]
|
||||
let ctx1 = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
check ctx1.releaseAndWait() == RET_OK
|
||||
|
||||
# Reacquire: must be the same context, with its worker still running.
|
||||
let ctx2 = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
check ctx1 == ctx2
|
||||
|
||||
var d: CallbackData
|
||||
initCallbackData(d)
|
||||
defer:
|
||||
deinitCallbackData(d)
|
||||
check sendRequestToFFIThread(
|
||||
ctx2, PingRequest.ffiNewReq(testCallback, addr d, "reuse".cstring)
|
||||
).isOk()
|
||||
waitCallback(d)
|
||||
check d.retCode == RET_OK
|
||||
check callbackMsg(d) == "pong:reuse" # reused worker still processes requests
|
||||
|
||||
check pool.destroyFFIContext(ctx2).isOk()
|
||||
|
||||
test "park drops the stale event callback and library pointer":
|
||||
var pool: FFIContextPool[TestLib]
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
ctx.callbackState.callback = cast[pointer](testCallback)
|
||||
ctx.callbackState.userData = cast[pointer](0xDEAD)
|
||||
|
||||
check ctx.releaseAndWait() == RET_OK
|
||||
check ctx.callbackState.callback.isNil() # a watchdog tick can't call a freed cb
|
||||
check ctx.callbackState.userData.isNil()
|
||||
check ctx.myLib.isNil()
|
||||
|
||||
check pool.destroyFFIContext(ctx).isOk()
|
||||
|
||||
test "fd usage stays bounded across many park/reuse cycles":
|
||||
if countOpenFds() < 0:
|
||||
skip() # no fd-counting facility on this platform
|
||||
else:
|
||||
var pool: FFIContextPool[TestLib]
|
||||
|
||||
# Warm up: the first create builds the context's worker (its fds are allocated
|
||||
# once here); parking keeps them open for reuse.
|
||||
block:
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
check ctx.releaseAndWait() == RET_OK
|
||||
|
||||
let baseline = countOpenFds()
|
||||
|
||||
for _ in 0 ..< 20:
|
||||
let ctx = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
var d: CallbackData
|
||||
initCallbackData(d)
|
||||
check sendRequestToFFIThread(
|
||||
ctx, PingRequest.ffiNewReq(testCallback, addr d, "x".cstring)
|
||||
).isOk()
|
||||
waitCallback(d)
|
||||
deinitCallbackData(d)
|
||||
check ctx.releaseAndWait() == RET_OK
|
||||
|
||||
let afterCycles = countOpenFds()
|
||||
# Reuse must not grow fds. Before the fix each cycle leaked ~10 fds (4
|
||||
# ThreadSignalPtr socketpairs + 2 dispatcher kqueues); the small slack
|
||||
# only tolerates unrelated runtime fd noise, not a per-cycle leak.
|
||||
check afterCycles <= baseline + 5
|
||||
|
||||
# Tear the (still parked) context's worker down so the test leaves no threads.
|
||||
let last = pool.createFFIContext().valueOr:
|
||||
check false
|
||||
return
|
||||
check pool.destroyFFIContext(last).isOk()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user