reorganize context modules and replace the global ctx registry with pool-based validation

Ivan FB 2026-05-12 23:44:56 +02:00
parent 397150e6bc
commit dbc58520f4
No known key found for this signature in database
GPG Key ID: DF0C67A04C543270
5 changed files with 153 additions and 151 deletions

View File

@ -2,10 +2,10 @@ import std/[atomics, tables]
import chronos, chronicles
import
ffi/internal/[ffi_library, ffi_macro],
ffi/[alloc, ffi_types, ffi_context, ffi_thread_request, serial]
ffi/[alloc, ffi_types, ffi_context, ffi_context_pool, ffi_thread_request, serial]
export atomics, tables
export chronos, chronicles
export
atomics, alloc, ffi_library, ffi_macro, ffi_types, ffi_context, ffi_thread_request,
serial
atomics, alloc, ffi_library, ffi_macro, ffi_types, ffi_context, ffi_context_pool,
ffi_thread_request, serial
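
Because ffi_context_pool is both imported and re-exported here, consumers keep the single `import ffi` surface. A minimal sketch, assuming the repo's module layout; `MyLib` is an illustrative placeholder:

import ffi  # the re-export above makes FFIContextPool and MaxFFIContexts visible

type MyLib = object
var pool: FFIContextPool[MyLib]  # fixed-size, zero-initialised; no heap allocation
doAssert MaxFFIContexts == 32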

View File

@ -2,7 +2,7 @@
{.pragma: callback, cdecl, raises: [], gcsafe.}
{.passc: "-fPIC".}
import std/[options, atomics, os, net, locks, json, tables, sets]
import std/[atomics, locks, json, tables]
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
import ./ffi_types, ./ffi_thread_request, ./internal/ffi_macro, ./logging
@ -41,30 +41,6 @@ var ffiCurrentCallbackState* {.threadvar.}: ptr FFICallbackState
const git_version* {.strdefine.} = "n/a"
var contextRegistry = initHashSet[pointer]()
var contextRegistryLock: Lock
contextRegistryLock.initLock()
proc registerCtx(ctx: pointer) =
{.cast(gcsafe).}:
contextRegistryLock.acquire()
defer: contextRegistryLock.release()
contextRegistry.incl(ctx)
proc unregisterCtx(ctx: pointer) =
{.cast(gcsafe).}:
contextRegistryLock.acquire()
defer: contextRegistryLock.release()
contextRegistry.excl(ctx)
proc isValidCtx*(ctx: pointer): bool =
## Returns true only if ctx was created by createFFIContext and not yet destroyed.
## Rejects nil, offset-invalid, and dangling pointers at the API boundary.
{.cast(gcsafe).}:
contextRegistryLock.acquire()
defer: contextRegistryLock.release()
return contextRegistry.contains(ctx)
template callEventCallback*(ctx: ptr FFIContext, eventName: string, body: untyped) =
if isNil(ctx[].callbackState.callback):
chronicles.error eventName & " - eventCallback is nil"
@ -74,14 +50,20 @@ template callEventCallback*(ctx: ptr FFIContext, eventName: string, body: untype
try:
let event = body
cast[FFICallBack](ctx[].callbackState.callback)(
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].callbackState.userData
RET_OK,
unsafeAddr event[0],
cast[csize_t](len(event)),
ctx[].callbackState.userData,
)
except Exception, CatchableError:
let msg =
"Exception " & eventName & " when calling 'eventCallBack': " &
getCurrentExceptionMsg()
cast[FFICallBack](ctx[].callbackState.callback)(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].callbackState.userData
RET_ERR,
unsafeAddr msg[0],
cast[csize_t](len(msg)),
ctx[].callbackState.userData,
)
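
Note that the callback receives a borrowed (ptr, len) view of a stack-local string (`event` or `msg`), so it must copy the bytes before returning. A standalone sketch of a well-behaved receiver, assuming nothing from this repo (`DemoCallback` and `demoReceiver` are illustrative):

type DemoCallback = proc(
  ret: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].}

proc demoReceiver(
  ret: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
  # Copy the borrowed buffer into owned memory before the caller's frame dies.
  var owned = newString(int(len))
  if len > 0:
    copyMem(addr owned[0], msg, int(len))
  # userData smuggles caller state across the C boundary; count calls here.
  cast[ptr int](userData)[] += 1

var calls = 0
let cb: DemoCallback = demoReceiver
let payload = "hello"
cb(0, cast[ptr cchar](unsafeAddr payload[0]), csize_t(payload.len), addr calls)
doAssert calls == 1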
template dispatchFfiEvent*(eventName: string, body: untyped) =
@ -99,8 +81,7 @@ template dispatchFfiEvent*(eventName: string, body: untyped) =
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ffiState[].userData
)
except Exception, CatchableError:
let msg =
"Exception dispatching " & eventName & ": " & getCurrentExceptionMsg()
let msg = "Exception dispatching " & eventName & ": " & getCurrentExceptionMsg()
cast[FFICallBack](ffiState[].callback)(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ffiState[].userData
)
@ -108,9 +89,6 @@ template dispatchFfiEvent*(eventName: string, body: untyped) =
proc sendRequestToFFIThread*(
ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration
): Result[void, string] =
if not isValidCtx(cast[pointer](ctx)):
deleteRequest(ffiRequest)
return err("ctx is not a valid FFI context")
ctx.lock.acquire()
# This lock is only necessary while we use an SPSC channel and while the signalling
# between threads assumes that there aren't concurrent requests.
@ -229,7 +207,9 @@ proc processRequest[T](
try:
await retFut
except AsyncError as exc:
Result[string, string].err("Async error in processRequest for " & reqId & ": " & exc.msg)
Result[string, string].err(
"Async error in processRequest for " & reqId & ": " & exc.msg
)
## handleRes may raise (OOM, GC setup) even though it is rare. Catching here
## keeps the async proc raises:[] compatible. The defer inside handleRes
@ -250,8 +230,7 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
# wait can unblock and proceed with cleanup.
let fireRes = ctx.threadExitSignal.fireSync()
if fireRes.isErr():
error "failed to fire threadExitSignal on FFI thread exit",
err = fireRes.error
error "failed to fire threadExitSignal on FFI thread exit", err = fireRes.error
let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} =
var ffiReqHandler: T
@ -317,7 +296,7 @@ proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
?ctx.threadExitSignal.close()
return ok()
proc initContextResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Initialises all resources inside an already-allocated FFIContext slot.
## On failure every partially-initialised resource is closed; the caller
## is responsible for releasing the slot (freeShared or pool.releaseSlot).
@ -328,7 +307,7 @@ proc initContextResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
if not success:
ctx.cleanUpResources().isOkOr:
error "failed to clean up resources after createFFIContext failure",
err = error
error = error
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqSignal ThreadSignalPtr: " & $error)
@ -363,47 +342,9 @@ proc initContextResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
joinThread(ctx.ffiThread)
return err("failed to create the watchdog thread: " & getCurrentExceptionMsg())
registerCtx(cast[pointer](ctx))
success = true
return ok()
const MaxFFIContexts* = 32
## Maximum number of concurrently live FFI contexts when using FFIContextPool.
## Fds and threads are only consumed for slots that are actually acquired,
## so this value only affects the upfront memory of the pool array.
type FFIContextPool*[T] = object
## Fixed-size pool of FFI contexts. Avoids dynamic heap allocation per context
## and bounds the total number of file descriptors consumed by ThreadSignalPtrs
## to at most MaxFFIContexts * 2.
slots: array[MaxFFIContexts, FFIContext[T]]
inUse: array[MaxFFIContexts, Atomic[bool]]
proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] =
for i in 0 ..< MaxFFIContexts:
var expected = false
if pool.inUse[i].compareExchange(expected, true):
return ok(pool.slots[i].addr)
return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)")
proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) =
for i in 0 ..< MaxFFIContexts:
if pool.slots[i].addr == ctx:
pool.inUse[i].store(false)
return
proc createFFIContext*[T](
pool: var FFIContextPool[T]
): Result[ptr FFIContext[T], string] =
## Acquires a slot from the fixed pool and initialises it as an FFI context.
## Bounded fd usage: at most MaxFFIContexts * 2 ThreadSignalPtr fds are ever open.
let ctx = pool.acquireSlot().valueOr:
return err("createFFIContext: acquireSlot failed: " & $error)
initContextResources(ctx).isOkOr:
pool.releaseSlot(ctx)
return err("createFFIContext: initContextResources failed: " & $error)
return ok(ctx)
proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] =
ctx.running.store(false)
let reqSignaled = ctx.reqSignal.fireSync().valueOr:
@ -423,14 +364,15 @@ proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] =
## clearContext waits on threadExitSignal up to this bound; on timeout it
## returns err and skips joinThread/cleanup (leaking the thread + ctx slot)
## rather than hanging the caller forever.
const ThreadExitTimeout = 1500.milliseconds
proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Stops the FFI context that was created via createFFIContext[T]() (heap).
unregisterCtx(cast[pointer](ctx))
const ThreadExitTimeout* = 1500.milliseconds
proc stopAndJoinThreads*[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Signals the FFI and watchdog threads to stop, waits up to ThreadExitTimeout
## for the FFI thread to exit, and joins both. On timeout returns err and
## skips joinThread (leaving the threads live) rather than hanging the caller.
## Resource cleanup (signal fds, lock) is the caller's responsibility.
ctx.signalStop().isOkOr:
return err("clearContext: signalStop failed: " & $error)
return err("signalStop failed: " & $error)
let exitedOnTime = ctx.threadExitSignal.waitSync(ThreadExitTimeout).valueOr:
ctx.onNotResponding()
@ -442,40 +384,12 @@ proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] =
joinThread(ctx.ffiThread)
joinThread(ctx.watchdogThread)
return ok()
proc clearContext[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Stops the FFI context that was created via createFFIContext[T]() (heap).
ctx.stopAndJoinThreads().isOkOr:
return err("clearContext: " & $error)
ctx.cleanUpResources().isOkOr:
return err("cleanUpResources failed: " & $error)
return ok()
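
The stop/wait/join contract above can be exercised on its own. A minimal standalone sketch using the same chronos ThreadSignalPtr primitives; `worker` and `ExitTimeout` are illustrative names, not part of this repo:

import std/typedthreads
import results
import chronos, chronos/threadsync

const ExitTimeout = 1500.milliseconds

proc worker(sig: ThreadSignalPtr) {.thread.} =
  # ... do the actual work ...
  # Fire unconditionally on the way out so the joiner can never hang.
  discard sig.fireSync()

let exitSignal = ThreadSignalPtr.new().expect("create signal")
var t: Thread[ThreadSignalPtr]
createThread(t, worker, exitSignal)

let exitedOnTime = exitSignal.waitSync(ExitTimeout).expect("wait")
if exitedOnTime:
  joinThread(t)  # the thread is past its last statement, so this returns promptly
else:
  echo "worker did not exit in time; leaving it unjoined rather than hanging"
discard exitSignal.close()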
proc destroyFFIContext*[T](
pool: var FFIContextPool[T], ctx: ptr FFIContext[T]
): Result[void, string] =
## Stops the FFI context and returns its slot to the pool. If the FFI thread
## is blocked and does not exit in time, the slot is leaked rather than
## reclaimed — closing its resources while the thread is still live would be
## unsafe.
unregisterCtx(cast[pointer](ctx))
ctx.signalStop().isOkOr:
return err("destroyFFIContext(pool): signalStop failed: " & $error)
let exitedOnTime = ctx.threadExitSignal.waitSync(ThreadExitTimeout).valueOr:
ctx.onNotResponding()
return err("error waiting for FFI thread exit: " & $error)
if not exitedOnTime:
ctx.onNotResponding()
return err("FFI thread did not exit in time; leaking pool slot to avoid hang")
joinThread(ctx.ffiThread)
joinThread(ctx.watchdogThread)
pool.releaseSlot(ctx)
return ok()
template checkParams*(ctx: ptr FFIContext, callback: FFICallBack, userData: pointer) =
if not isValidCtx(cast[pointer](ctx)):
return RET_ERR
ctx[].userData = userData
if isNil(callback):
return RET_MISSING_CALLBACK

ffi/ffi_context_pool.nim (Normal file, 63 lines added)
View File

@ -0,0 +1,63 @@
import std/atomics
import results
import ./ffi_context
const MaxFFIContexts* = 32
## Maximum number of concurrently live FFI contexts when using FFIContextPool.
## Fds and threads are only consumed for slots that are actually acquired,
## so this value only affects the upfront memory of the pool array.
type FFIContextPool*[T] = object
## Fixed-size pool of FFI contexts. Avoids dynamic heap allocation per context
## and bounds the total number of file descriptors consumed by ThreadSignalPtrs
## to at most MaxFFIContexts * 2.
slots: array[MaxFFIContexts, FFIContext[T]]
inUse: array[MaxFFIContexts, Atomic[bool]]
proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] =
for i in 0 ..< MaxFFIContexts:
var expected = false
if pool.inUse[i].compareExchange(expected, true):
return ok(pool.slots[i].addr)
return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)")
proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) =
for i in 0 ..< MaxFFIContexts:
if pool.slots[i].addr == ctx:
pool.inUse[i].store(false)
return
proc createFFIContext*[T](
pool: var FFIContextPool[T]
): Result[ptr FFIContext[T], string] =
## Acquires a slot from the fixed pool and initialises it as an FFI context.
## Bounded fd usage: at most MaxFFIContexts * 2 ThreadSignalPtr fds are ever open.
let ctx = pool.acquireSlot().valueOr:
return err("createFFIContext: acquireSlot failed: " & $error)
initContextResources(ctx).isOkOr:
pool.releaseSlot(ctx)
return err("createFFIContext: initContextResources failed: " & $error)
return ok(ctx)
proc destroyFFIContext*[T](
pool: var FFIContextPool[T], ctx: ptr FFIContext[T]
): Result[void, string] =
## Stops the FFI context and returns its slot to the pool. If the FFI thread
## is blocked and does not exit in time, the slot is leaked rather than
## reclaimed — closing its resources while the thread is still live would be
## unsafe.
ctx.stopAndJoinThreads().isOkOr:
return err("destroyFFIContext(pool): " & $error)
pool.releaseSlot(ctx)
return ok()
proc isValidCtx*[T](pool: var FFIContextPool[T], ctx: pointer): bool =
## Returns true only if ctx points to one of the pool's slots that is
## currently in use. Rejects nil, offset-invalid, and dangling pointers
## at the API boundary, preventing use-after-free dereferences.
if ctx.isNil():
return false
for i in 0 ..< MaxFFIContexts:
if cast[pointer](pool.slots[i].addr) == ctx:
return pool.inUse[i].load()
return false
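
The acquire/release/validate invariants in miniature: a standalone, runnable sketch with plain int slots standing in for FFIContext (all names illustrative):

import std/atomics

const N = 4

type MiniPool = object
  slots: array[N, int]
  inUse: array[N, Atomic[bool]]

proc acquire(p: var MiniPool): ptr int =
  for i in 0 ..< N:
    var expected = false
    # CAS flips free -> taken atomically; a losing racer sees expected == true.
    if p.inUse[i].compareExchange(expected, true):
      return p.slots[i].addr
  return nil

proc release(p: var MiniPool, s: ptr int) =
  for i in 0 ..< N:
    if p.slots[i].addr == s:
      p.inUse[i].store(false)
      return

proc valid(p: var MiniPool, s: pointer): bool =
  if s.isNil():
    return false
  for i in 0 ..< N:
    if cast[pointer](p.slots[i].addr) == s:
      return p.inUse[i].load()
  return false

var mp: MiniPool
let a = mp.acquire()
doAssert mp.valid(a)
mp.release(a)
doAssert not mp.valid(a)  # address still matches a slot, but it is no longer in use
doAssert not mp.valid(cast[pointer](123))  # offset/garbage pointers are rejected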

View File

@ -533,6 +533,12 @@ macro ffiRaw*(prc: untyped): untyped =
let paramIdent = firstParam[0]
let paramType = firstParam[1]
# The first param of an `.ffiRaw.` proc is `ctx: ptr FFIContext[LibType]`.
# Extract LibType so we can call the module-level pool var (named
# "<LibType>FFIPool", declared by `.ffiCtor.`) to validate ctx.
let libTypeName = paramType[0][1]
let poolIdent = ident($libTypeName & "FFIPool")
let reqName = ident($procName & "Req")
let returnType = ident("cint")
@ -569,7 +575,7 @@ macro ffiRaw*(prc: untyped): untyped =
let ffiBody = newStmtList(
quote do:
initializeLibrary()
if not isValidCtx(cast[pointer](ctx)):
if not `poolIdent`.isValidCtx(cast[pointer](ctx)):
return RET_ERR
ctx[].userData = userData
if isNil(callback):
@ -804,9 +810,10 @@ macro ffi*(prc: untyped): untyped =
if callback.isNil:
return RET_MISSING_CALLBACK
let asyncPoolIdent = ident($libTypeName & "FFIPool")
ffiBody.add quote do:
if ctx.isNil or ctx[].myLib.isNil:
let errStr = "context not initialized"
if not `asyncPoolIdent`.isValidCtx(cast[pointer](ctx)):
let errStr = "ctx is not a valid FFI context"
callback(RET_ERR, unsafeAddr errStr[0], cast[csize_t](errStr.len), userData)
return RET_ERR
@ -930,9 +937,10 @@ macro ffi*(prc: untyped): untyped =
if callback.isNil:
return RET_MISSING_CALLBACK
let syncPoolIdent = ident($libTypeName & "FFIPool")
syncFfiBody.add quote do:
if ctx.isNil or ctx[].myLib.isNil:
let errStr = "context not initialized"
if not `syncPoolIdent`.isValidCtx(cast[pointer](ctx)):
let errStr = "ctx is not a valid FFI context"
callback(RET_ERR, unsafeAddr errStr[0], cast[csize_t](errStr.len), userData)
return RET_ERR
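
For a hypothetical `type MyLib`, the "<LibType>FFIPool" lookup resolves purely by name at expansion time; the generated entry point then behaves roughly like the hand-written form below (a sketch, not the macro's literal output):

var MyLibFFIPool: FFIContextPool[MyLib]  # in real code, declared by `.ffiCtor.`

proc mylib_op(
  ctx: ptr FFIContext[MyLib], callback: FFICallBack, userData: pointer
): cint {.cdecl.} =
  initializeLibrary()
  if not MyLibFFIPool.isValidCtx(cast[pointer](ctx)):
    return RET_ERR  # nil, offset, or already-destroyed ctx stops here
  ctx[].userData = userData
  if isNil(callback):
    return RET_MISSING_CALLBACK
  # ... allocate the request and hand it to the FFI thread ...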

View File

@ -1,41 +1,58 @@
import std/locks
import std/[atomics, locks]
import unittest2
import results
import ../ffi
type TestLib = object
proc dummyCallback(
ffiType:
type CtxValidationConfig = object
initialValue: int
proc ctxval_create*(
config: CtxValidationConfig
): Future[Result[TestLib, string]] {.ffiCtor.} =
return ok(TestLib())
proc ctxval_ping*(lib: TestLib): Future[Result[string, string]] {.ffi.} =
return ok("pong")
type CallbackState = object
lock: Lock
called: Atomic[bool]
retCode: cint
proc initCbState(s: var CallbackState) =
s.lock.initLock()
s.called.store(false)
proc validationCallback(
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
discard
let s = cast[ptr CallbackState](userData)
s[].retCode = retCode
s[].called.store(true)
registerReqFFI(ValidationTestRequest, lib: ptr TestLib):
proc(): Future[Result[string, string]] {.async.} =
return ok("ok")
suite "ctx pointer validation at the FFI entry point":
# The macro-generated FFI entry point validates ctx via
# <LibType>FFIPool.isValidCtx. Any caller — C or Nim — that passes a nil or
# offset-invalid ctx with a valid callback should receive RET_ERR via the
# callback and the proc should return RET_ERR, never crash.
suite "ctx pointer validation":
# BUG: sendRequestToFFIThread has no nil-check on ctx.
# checkParams / {.ffi.} generated code only guards against nil callback,
# not nil (or otherwise invalid) ctx. Any caller — C or Nim — that passes
# a nil or offset-invalid ctx with a valid callback bypasses the only guard
# and reaches ctx.lock.acquire() where the nil/garbage dereference crashes.
test "nil ctx with valid callback should return an error, not crash":
# Reproduces the nil case: ctx=nil, callback=valid.
# Expected (after fix): sendRequestToFFIThread returns isErr().
# Actual (currently) : SIGSEGV at ctx.lock.acquire() in sendRequestToFFIThread.
test "nil ctx with valid callback returns RET_ERR via callback, no crash":
var s: CallbackState
initCbState(s)
let nilCtx: ptr FFIContext[TestLib] = nil
let req = ValidationTestRequest.ffiNewReq(dummyCallback, nil)
let res = sendRequestToFFIThread(nilCtx, req)
check res.isErr()
let ret = ctxval_ping(nilCtx, validationCallback, addr s)
check ret == RET_ERR
check s.called.load()
check s.retCode == RET_ERR
test "invalid non-nil ctx (ctx+123 style) should return an error, not crash":
# Reproduces the offset-pointer case: a non-nil but invalid pointer passes
# isNil() and reaches the lock dereference, causing a crash.
# Expected (after fix): sendRequestToFFIThread returns isErr().
# Actual (currently) : SIGSEGV when the garbage pointer is dereferenced.
test "invalid non-nil ctx (offset-pointer) returns RET_ERR, no crash":
var s: CallbackState
initCbState(s)
let invalidCtx = cast[ptr FFIContext[TestLib]](123)
let req = ValidationTestRequest.ffiNewReq(dummyCallback, nil)
let res = sendRequestToFFIThread(invalidCtx, req)
check res.isErr()
let ret = ctxval_ping(invalidCtx, validationCallback, addr s)
check ret == RET_ERR
check s.called.load()
check s.retCode == RET_ERR