use fixed array of ctx to avoid consuming all fds

This commit is contained in:
Ivan FB 2026-05-02 00:20:33 +02:00
parent e3eca63236
commit 534cbe8f1a
No known key found for this signature in database
GPG Key ID: DF0C67A04C543270
2 changed files with 204 additions and 42 deletions

View File

@ -18,6 +18,8 @@ type FFIContext*[T] = object
reqSignal: ThreadSignalPtr # to notify the FFI Thread that a new request is sent
reqReceivedSignal: ThreadSignalPtr
# to signal main thread, interfacing with the FFI thread, that FFI thread received the request
watchdogStopSignal: ThreadSignalPtr
# fired by destroyFFIContext so the watchdog exits immediately instead of waiting out its sleep
userData*: pointer
eventCallback*: pointer
eventUserdata*: pointer
@ -25,6 +27,18 @@ type FFIContext*[T] = object
registeredRequests: ptr Table[cstring, FFIRequestProc]
# Pointer to the table of requests registered at compile time
const MaxFFIContexts* = 32
  ## Maximum number of concurrently live FFI contexts when using FFIContextPool.
  ## Fds and threads are only consumed for slots that are actually acquired,
  ## so this value only affects the upfront memory of the pool array.

type FFIContextPool*[T] = object
  ## Fixed-size pool of FFI contexts. Avoids dynamic heap allocation per context
  ## and bounds the total number of ThreadSignalPtrs to MaxFFIContexts * 3
  ## (each context owns reqSignal, reqReceivedSignal and watchdogStopSignal;
  ## fd cost per ThreadSignalPtr depends on the platform backend — TODO confirm).
  slots: array[MaxFFIContexts, FFIContext[T]]
    # Context storage; a slot's memory is valid for the pool's whole lifetime.
  inUse: array[MaxFFIContexts, Atomic[bool]]
    # true while the matching slot is handed out; claimed via CAS in acquireSlot.
const git_version* {.strdefine.} = "n/a"
template callEventCallback*(ctx: ptr FFIContext, eventName: string, body: untyped) =
@ -110,13 +124,19 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} =
const WatchdogTimeinterval = 1.seconds
const WatchdogTimeout = 20.seconds
# Give time for the node to be created and up before sending watchdog requests
await sleepAsync(WatchdogStartDelay)
while true:
await sleepAsync(WatchdogTimeinterval)
# Give time for the node to be created and up before sending watchdog requests.
# waitSync returns early if watchdogStopSignal fires (i.e. on destroy).
let startWait = ctx.watchdogStopSignal.waitSync(WatchdogStartDelay)
if startWait.isErr():
error "watchdog: start-delay waitSync failed", err = startWait.error
elif startWait.get():
return # stop signal fired during start delay
if ctx.running.load == false:
debug "Watchdog thread exiting because FFIContext is not running"
while ctx.running.load:
let intervalWait = ctx.watchdogStopSignal.waitSync(WatchdogTimeinterval)
if intervalWait.isErr():
error "watchdog: interval waitSync failed", err = intervalWait.error
elif intervalWait.get() or not ctx.running.load:
break
let callback = proc(
@ -164,7 +184,9 @@ proc processRequest[T](
try:
await retFut
except CatchableError as exc:
Result[string, string].err("Exception in processRequest for " & reqId & ": " & exc.msg)
Result[string, string].err(
"Exception in processRequest for " & reqId & ": " & exc.msg
)
## handleRes may raise (OOM, GC setup) even though it is rare. Catching here
## keeps the async proc raises:[] compatible. The defer inside handleRes
@ -208,33 +230,47 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
waitFor ffiRun(ctx)
proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
defer:
freeShared(ctx)
proc closeResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
  ## Closes file descriptors and deinits the lock. Does NOT free ctx memory.
  ## Used by initContextResources error paths and pool destroy, where ctx is
  ## not heap-allocated (pool slots live in a fixed array, not on the heap).
  ##
  ## Every signal close is attempted even when an earlier one fails, so a
  ## single failing close cannot leak the remaining fds; the first error
  ## encountered is the one returned.
  ctx.lock.deinitLock()
  var firstErr = ""
  template tryClose(sig: untyped) =
    if not sig.isNil():
      let res = sig.close()
      if res.isErr() and firstErr.len == 0:
        firstErr = res.error
  tryClose(ctx.reqSignal)
  tryClose(ctx.reqReceivedSignal)
  tryClose(ctx.watchdogStopSignal)
  if firstErr.len > 0:
    return err(firstErr)
  return ok()
proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
  ## Full cleanup for heap-allocated contexts: closes all resources and frees
  ## the shared-memory block. The free runs in a defer so the memory is
  ## released even when closeResources reports an error.
  ## (A truncated pre-refactor copy of createFFIContext that preceded this
  ## proc — diff residue — has been removed; the real definition lives below.)
  defer:
    freeShared(ctx)
  return ctx.closeResources()
proc initContextResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
## Initialises all resources inside an already-allocated FFIContext slot.
## On failure every partially-initialised resource is closed; the caller
## is responsible for releasing the slot (freeShared or pool.releaseSlot).
ctx.lock.initLock()
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
ctx.cleanUpResources().isOkOr:
return err("could not clean resources in a failure new reqSignal: " & $error)
ctx.closeResources().isOkOr:
return err("could not close resources after reqSignal failure: " & $error)
return err("couldn't create reqSignal ThreadSignalPtr: " & $error)
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
ctx.cleanUpResources().isOkOr:
return
err("could not clean resources in a failure new reqReceivedSignal: " & $error)
ctx.closeResources().isOkOr:
return err("could not close resources after reqReceivedSignal failure: " & $error)
return err("couldn't create reqReceivedSignal ThreadSignalPtr")
ctx.watchdogStopSignal = ThreadSignalPtr.new().valueOr:
ctx.closeResources().isOkOr:
return
err("could not close resources after watchdogStopSignal failure: " & $error)
return err("couldn't create watchdogStopSignal ThreadSignalPtr")
ctx.registeredRequests = addr ffi_types.registeredRequests
ctx.running.store(true)
@ -242,8 +278,8 @@ proc createFFIContext*[T](): Result[ptr FFIContext[T], string] =
try:
createThread(ctx.ffiThread, ffiThreadBody[T], ctx)
except ValueError, ResourceExhaustedError:
ctx.cleanUpResources().isOkOr:
error "failed to clean up resources after ffiThread creation failure", err = error
ctx.closeResources().isOkOr:
error "failed to close resources after ffiThread creation failure", err = error
return err("failed to create the FFI thread: " & getCurrentExceptionMsg())
try:
@ -254,28 +290,84 @@ proc createFFIContext*[T](): Result[ptr FFIContext[T], string] =
if fireRes.isErr():
error "failed to signal ffiThread during watchdog cleanup", err = fireRes.error
joinThread(ctx.ffiThread)
ctx.cleanUpResources().isOkOr:
error "failed to clean up resources after watchdogThread creation failure", err = error
ctx.closeResources().isOkOr:
error "failed to close resources after watchdogThread creation failure",
err = error
return err("failed to create the watchdog thread: " & getCurrentExceptionMsg())
return ok()
# ── Pool helpers ─────────────────────────────────────────────────────────────
proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] =
  ## Atomically claims the first free slot of the pool, or errors out when
  ## every slot is already taken.
  var idx = 0
  while idx < MaxFFIContexts:
    var wasFree = false
    # The CAS only succeeds if the flag was false, so two callers can never
    # win the same slot.
    if pool.inUse[idx].compareExchange(wasFree, true):
      return ok(addr pool.slots[idx])
    inc idx
  err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)")
proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) =
  ## Marks the slot owning `ctx` as free again so acquireSlot can hand it out.
  ## A pointer that does not belong to this pool is silently ignored.
  for idx, slot in pool.slots.mpairs:
    if addr(slot) == ctx:
      pool.inUse[idx].store(false)
      return
# ── Public API ────────────────────────────────────────────────────────────────
proc createFFIContext*[T](): Result[ptr FFIContext[T], string] =
  ## Allocates an FFI context on the shared heap and initialises it. The
  ## caller owns the result and must release it with destroyFFIContext(ctx).
  ## When the maximum number of contexts is known, prefer the pool overload.
  let ctx = createShared(FFIContext[T], 1)
  let initRes = initContextResources(ctx)
  if initRes.isErr():
    # Initialisation closed its own partial resources; only the memory
    # remains to be released here.
    freeShared(ctx)
    return err(initRes.error)
  ok(ctx)
proc destroyFFIContext*[T](ctx: ptr FFIContext[T]): Result[void, string] =
proc createFFIContext*[T](
    pool: var FFIContextPool[T]
): Result[ptr FFIContext[T], string] =
  ## Acquires a slot from the fixed pool and initialises it as an FFI context.
  ## Bounded fd usage: each live context owns three ThreadSignalPtrs
  ## (reqSignal, reqReceivedSignal, watchdogStopSignal), so at most
  ## MaxFFIContexts * 3 signal objects are ever open at once.
  let ctx = pool.acquireSlot().valueOr:
    return err(error)
  initContextResources(ctx).isOkOr:
    # Hand the slot back on failure so it can be reused by a later create.
    pool.releaseSlot(ctx)
    return err(error)
  return ok(ctx)
proc signalStop[T](ctx: ptr FFIContext[T]): Result[void, string] =
  ## Flags the context as stopped and wakes both worker threads.
  ## Both signals are always fired — even when the first one fails —
  ## so the watchdog is never left sleeping out its full wait interval
  ## because of a reqSignal failure. The first error encountered is the
  ## one reported to the caller.
  ctx.running.store(false)
  var firstErr = ""
  let ffiRes = ctx.reqSignal.fireSync()
  if ffiRes.isErr():
    ctx.onNotResponding()
    firstErr = "error signaling reqSignal in destroyFFIContext: " & ffiRes.error
  elif not ffiRes.get():
    ctx.onNotResponding()
    firstErr = "failed to signal reqSignal on time in destroyFFIContext"
  let wdRes = ctx.watchdogStopSignal.fireSync()
  if wdRes.isErr():
    if firstErr.len == 0:
      firstErr =
        "error signaling watchdogStopSignal in destroyFFIContext: " & wdRes.error
  elif not wdRes.get():
    if firstErr.len == 0:
      firstErr = "failed to signal watchdogStopSignal on time in destroyFFIContext"
  if firstErr.len > 0:
    return err(firstErr)
  return ok()
proc destroyFFIContext*[T](ctx: ptr FFIContext[T]): Result[void, string] =
  ## Stops a heap-allocated FFI context. signalStop's result is computed
  ## first; the defer then joins both threads and releases every resource,
  ## including the shared-memory block, regardless of the stop outcome.
  ## (Unreachable pre-refactor statements that followed the return — diff
  ## residue duplicating signalStop's body — have been removed.)
  defer:
    joinThread(ctx.ffiThread)
    joinThread(ctx.watchdogThread)
    ctx.cleanUpResources().isOkOr:
      error "failed to clean up resources in destroyFFIContext", err = error
  return ctx.signalStop()
proc destroyFFIContext*[T](
    pool: var FFIContextPool[T], ctx: ptr FFIContext[T]
): Result[void, string] =
  ## Stops the FFI context and returns its slot to the pool. The defer runs
  ## after signalStop's result is computed: both threads are joined, the fds
  ## are closed (slot memory is pool-owned, so nothing is freed), and only
  ## then is the slot marked reusable.
  defer:
    joinThread(ctx.ffiThread)
    joinThread(ctx.watchdogThread)
    let closeRes = ctx.closeResources()
    if closeRes.isErr():
      error "failed to close resources in destroyFFIContext", err = closeRes.error
    pool.releaseSlot(ctx)
  return ctx.signalStop()
template checkParams*(ctx: ptr FFIContext, callback: FFICallBack, userData: pointer) =
if not isNil(ctx):

View File

@ -61,6 +61,62 @@ registerReqFFI(EmptyOkRequest, lib: ptr TestLib):
proc(): Future[Result[string, string]] {.async.} =
return ok("")
suite "FFIContextPool":
  test "create and destroy via pool succeeds":
    var pool: FFIContextPool[TestLib]
    let ctx = pool.createFFIContext().valueOr:
      assert false, "createFFIContext(pool) failed: " & $error
      return
    check pool.destroyFFIContext(ctx).isOk()

  test "slot is reused after destroy":
    var pool: FFIContextPool[TestLib]
    let first = pool.createFFIContext().valueOr:
      assert false, "createFFIContext(pool) failed: " & $error
      return
    check pool.destroyFFIContext(first).isOk()
    # The freed slot must be handed out again by the next create.
    let second = pool.createFFIContext().valueOr:
      assert false, "createFFIContext(pool) failed after slot release: " & $error
      return
    check pool.destroyFFIContext(second).isOk()
    check first == second # identical array slot was recycled

  test "pool exhaustion returns error":
    var pool: FFIContextPool[TestLib]
    var ctxs: array[MaxFFIContexts, ptr FFIContext[TestLib]]
    for i in 0 ..< MaxFFIContexts:
      ctxs[i] = pool.createFFIContext().valueOr:
        # Tear down whatever was created before reporting the failure.
        for j in 0 ..< i:
          discard pool.destroyFFIContext(ctxs[j])
        assert false, "createFFIContext(pool) failed at slot " & $i & ": " & $error
        return
    # Every slot is now taken — one more create has to fail.
    check pool.createFFIContext().isErr()
    for i in 0 ..< MaxFFIContexts:
      discard pool.destroyFFIContext(ctxs[i])

  test "requests are processed via pool context":
    var pool: FFIContextPool[TestLib]
    var d: CallbackData
    initCallbackData(d)
    defer:
      deinitCallbackData(d)
    let ctx = pool.createFFIContext().valueOr:
      assert false, "createFFIContext(pool) failed: " & $error
      return
    defer:
      discard pool.destroyFFIContext(ctx)
    check sendRequestToFFIThread(
      ctx, PingRequest.ffiNewReq(testCallback, addr d, "pool".cstring)
    )
    .isOk()
    waitCallback(d)
    check d.retCode == RET_OK
    check callbackMsg(d) == "pong:pool"
suite "createFFIContext / destroyFFIContext":
test "create and destroy succeeds":
let ctx = createFFIContext[TestLib]().valueOr:
@ -79,14 +135,19 @@ suite "sendRequestToFFIThread":
test "successful request triggers RET_OK callback":
var d: CallbackData
initCallbackData(d)
defer: deinitCallbackData(d)
defer:
deinitCallbackData(d)
let ctx = createFFIContext[TestLib]().valueOr:
check false
return
defer: discard destroyFFIContext(ctx)
defer:
discard destroyFFIContext(ctx)
check sendRequestToFFIThread(ctx, PingRequest.ffiNewReq(testCallback, addr d, "hello".cstring)).isOk()
check sendRequestToFFIThread(
ctx, PingRequest.ffiNewReq(testCallback, addr d, "hello".cstring)
)
.isOk()
waitCallback(d)
check d.retCode == RET_OK
check callbackMsg(d) == "pong:hello"
@ -94,12 +155,14 @@ suite "sendRequestToFFIThread":
test "failing request triggers RET_ERR callback":
var d: CallbackData
initCallbackData(d)
defer: deinitCallbackData(d)
defer:
deinitCallbackData(d)
let ctx = createFFIContext[TestLib]().valueOr:
check false
return
defer: discard destroyFFIContext(ctx)
defer:
discard destroyFFIContext(ctx)
check sendRequestToFFIThread(ctx, FailRequest.ffiNewReq(testCallback, addr d)).isOk()
waitCallback(d)
@ -108,14 +171,17 @@ suite "sendRequestToFFIThread":
test "empty ok response delivers empty message":
var d: CallbackData
initCallbackData(d)
defer: deinitCallbackData(d)
defer:
deinitCallbackData(d)
let ctx = createFFIContext[TestLib]().valueOr:
check false
return
defer: discard destroyFFIContext(ctx)
defer:
discard destroyFFIContext(ctx)
check sendRequestToFFIThread(ctx, EmptyOkRequest.ffiNewReq(testCallback, addr d)).isOk()
check sendRequestToFFIThread(ctx, EmptyOkRequest.ffiNewReq(testCallback, addr d))
.isOk()
waitCallback(d)
check d.retCode == RET_OK
check d.msgLen == 0
@ -124,13 +190,17 @@ suite "sendRequestToFFIThread":
let ctx = createFFIContext[TestLib]().valueOr:
check false
return
defer: discard destroyFFIContext(ctx)
defer:
discard destroyFFIContext(ctx)
for i in 1 .. 5:
var d: CallbackData
initCallbackData(d)
let msg = "msg" & $i
check sendRequestToFFIThread(ctx, PingRequest.ffiNewReq(testCallback, addr d, msg.cstring)).isOk()
check sendRequestToFFIThread(
ctx, PingRequest.ffiNewReq(testCallback, addr d, msg.cstring)
)
.isOk()
waitCallback(d)
deinitCallbackData(d)
check d.retCode == RET_OK