better code separation and better error handling

This commit is contained in:
Ivan FB 2026-05-02 00:56:43 +02:00
parent 4da0ec3cf1
commit a90453faa1
No known key found for this signature in database
GPG Key ID: DF0C67A04C543270
3 changed files with 68 additions and 51 deletions

View File

@ -2,9 +2,10 @@ import std/[atomics, tables]
import chronos, chronicles
import
ffi/internal/[ffi_library, ffi_macro],
ffi/[alloc, ffi_types, ffi_context, ffi_thread_request, serial]
ffi/[alloc, ffi_types, ffi_context, ffi_context_pool, ffi_thread_request, serial]
export atomics, tables
export chronos, chronicles
export
atomics, alloc, ffi_library, ffi_macro, ffi_types, ffi_context, ffi_thread_request, serial
atomics, alloc, ffi_library, ffi_macro, ffi_types, ffi_context, ffi_context_pool,
ffi_thread_request, serial

View File

@ -41,18 +41,6 @@ type FFIContext*[T] = object
var ffiCurrentCallbackState* {.threadvar.}: ptr FFICallbackState
## Set by ffiThreadBody at thread startup; read by dispatchFfiEvent.
const MaxFFIContexts* = 32
## Maximum number of concurrently live FFI contexts when using FFIContextPool.
## Fds and threads are only consumed for slots that are actually acquired,
## so this value only affects the upfront memory of the pool array.
type FFIContextPool*[T] = object
## Fixed-size pool of FFI contexts. Avoids dynamic heap allocation per context
## and bounds the total number of file descriptors consumed by ThreadSignalPtrs
## to at most MaxFFIContexts * 2.
slots: array[MaxFFIContexts, FFIContext[T]]
inUse: array[MaxFFIContexts, Atomic[bool]]
const git_version* {.strdefine.} = "n/a"
var contextRegistry = initHashSet[pointer]()
@ -190,14 +178,14 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} =
# waitSync returns early if watchdogStopSignal fires (i.e. on destroy).
let startWait = ctx.watchdogStopSignal.waitSync(WatchdogStartDelay)
if startWait.isErr():
error "watchdog: start-delay waitSync failed", err = startWait.error
error "watchdog: start-delay waitSync failed", error = startWait.error
elif startWait.get():
return # stop signal fired during start delay
while ctx.running.load:
let intervalWait = ctx.watchdogStopSignal.waitSync(WatchdogTimeinterval)
if intervalWait.isErr():
error "watchdog: interval waitSync failed", err = intervalWait.error
error "watchdog: interval waitSync failed", error = intervalWait.error
elif intervalWait.get() or not ctx.running.load:
break
@ -254,7 +242,7 @@ proc processRequest[T](
try:
handleRes(res, request)
except Exception as exc:
error "Unexpected exception in handleRes", exc = exc.msg
error "Unexpected exception in handleRes", error = exc.msg
proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
## FFI thread body that attends library user API requests
@ -297,7 +285,7 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
waitFor ffiRun(ctx)
proc closeResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
proc closeResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Closes file descriptors and deinits the lock. Does NOT free ctx memory.
## Used by initContextResources error paths and pool destroy, where ctx is
## not heap-allocated (pool slots live in a fixed array, not on the heap).
@ -320,7 +308,7 @@ proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
freeShared(ctx)
return ctx.closeResources()
proc initContextResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
proc initContextResources*[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Initialises all resources inside an already-allocated FFIContext slot.
## On failure every partially-initialised resource is closed; the caller
## is responsible for releasing the slot (freeShared or pool.releaseSlot).
@ -361,7 +349,7 @@ proc initContextResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
createThread(ctx.ffiThread, ffiThreadBody[T], ctx)
except ValueError, ResourceExhaustedError:
ctx.closeResources().isOkOr:
error "failed to close resources after ffiThread creation failure", err = error
error "failed to close resources after ffiThread creation failure", error = error
return err("failed to create the FFI thread: " & getCurrentExceptionMsg())
try:
@ -372,33 +360,17 @@ proc initContextResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
ctx.running.store(false)
let fireRes = ctx.reqSignal.fireSync()
if fireRes.isErr():
error "failed to signal ffiThread during watchdog cleanup",
err = fireRes.error
error "failed to signal ffiThread during watchdog cleanup", error = fireRes.error
joinThread(ctx.ffiThread)
ctx.closeResources().isOkOr:
error "failed to close resources after watchdogThread creation failure",
err = error
error = error
return err("failed to create the watchdog thread: " & getCurrentExceptionMsg())
registerCtx(cast[pointer](ctx))
success = true
return ok()
# ── Pool helpers ─────────────────────────────────────────────────────────────
proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] =
for i in 0 ..< MaxFFIContexts:
var expected = false
if pool.inUse[i].compareExchange(expected, true):
return ok(pool.slots[i].addr)
return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)")
proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) =
for i in 0 ..< MaxFFIContexts:
if pool.slots[i].addr == ctx:
pool.inUse[i].store(false)
return
# ── Public API ────────────────────────────────────────────────────────────────
proc createFFIContext*[T](): Result[ptr FFIContext[T], string] =
@ -410,19 +382,7 @@ proc createFFIContext*[T](): Result[ptr FFIContext[T], string] =
return err(error)
return ok(ctx)
proc createFFIContext*[T](
pool: var FFIContextPool[T]
): Result[ptr FFIContext[T], string] =
## Acquires a slot from the fixed pool and initialises it as an FFI context.
## Bounded fd usage: at most MaxFFIContexts * 2 ThreadSignalPtr fds are ever open.
let ctx = pool.acquireSlot().valueOr:
return err(error)
initContextResources(ctx).isOkOr:
pool.releaseSlot(ctx)
return err(error)
return ok(ctx)
proc signalStop[T](ctx: ptr FFIContext[T]): Result[void, string] =
proc signalStop*[T](ctx: ptr FFIContext[T]): Result[void, string] =
ctx.running.store(false)
let ffiSignaled = ctx.reqSignal.fireSync().valueOr:
ctx.onNotResponding()

56
ffi/ffi_context_pool.nim Normal file
View File

@ -0,0 +1,56 @@
import std/atomics
import results, chronicles
import ./ffi_context
export ffi_context
const MaxFFIContexts* = 32
## Maximum number of concurrently live FFI contexts.
## Fds and threads are only consumed for slots that are actually acquired,
## so this value only affects the upfront memory of the pool array.
type FFIContextPool*[T] = object
## Fixed-size pool of FFI contexts. Avoids dynamic heap allocation per context
## and bounds the total number of file descriptors consumed by ThreadSignalPtrs
## to at most MaxFFIContexts * 2.
slots: array[MaxFFIContexts, FFIContext[T]]
inUse: array[MaxFFIContexts, Atomic[bool]]
proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] =
for i in 0 ..< MaxFFIContexts:
var expected = false
if pool.inUse[i].compareExchange(expected, true):
return ok(pool.slots[i].addr)
return err("FFI context pool exhausted; max: " & $MaxFFIContexts & " contexts)")
proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) =
for i in 0 ..< MaxFFIContexts:
if pool.slots[i].addr == ctx:
pool.inUse[i].store(false)
return
proc createFFIContext*[T](
pool: var FFIContextPool[T]
): Result[ptr FFIContext[T], string] =
## Acquires a slot from the fixed pool and initialises it as an FFI context.
## Bounded fd usage: at most MaxFFIContexts * 2 ThreadSignalPtr fds are ever open.
let ctx = pool.acquireSlot().valueOr:
return err("failure calling acquireSlot in createFFIContext: " & $error)
initContextResources(ctx).isOkOr:
pool.releaseSlot(ctx)
return err("failure calling initContextResources in createFFIContext: " & $error)
return ok(ctx)
proc destroyFFIContext*[T](
pool: var FFIContextPool[T], ctx: ptr FFIContext[T]
): Result[void, string] =
## Stops the FFI context and returns its slot to the pool.
defer:
joinFFIThreads(ctx)
ctx.closeResources().isOkOr:
error "failed to close resources in destroyFFIContext", error = error
return err("failure calling closeResources in destroyFFIContext: " & $error)
pool.releaseSlot(ctx)
ctx.signalStop().isOkOr:
return err("failure calling signalStop in destroyFFIContext: " & $error)
return ok()