From 534cbe8f1a8ba4ef6805581f08b3a71652a323a0 Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Sat, 2 May 2026 00:20:33 +0200 Subject: [PATCH] use fixed array of ctx to avoid consuming all fds --- ffi/ffi_context.nim | 156 +++++++++++++++++++++++++++++-------- tests/test_ffi_context.nim | 90 ++++++++++++++++++--- 2 files changed, 204 insertions(+), 42 deletions(-) diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index 545d2d0..b195958 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -18,6 +18,8 @@ type FFIContext*[T] = object reqSignal: ThreadSignalPtr # to notify the FFI Thread that a new request is sent reqReceivedSignal: ThreadSignalPtr # to signal main thread, interfacing with the FFI thread, that FFI thread received the request + watchdogStopSignal: ThreadSignalPtr + # fired by destroyFFIContext so the watchdog exits immediately instead of waiting out its sleep userData*: pointer eventCallback*: pointer eventUserdata*: pointer @@ -25,6 +27,18 @@ type FFIContext*[T] = object registeredRequests: ptr Table[cstring, FFIRequestProc] # Pointer to with the registered requests at compile time +const MaxFFIContexts* = 32 + ## Maximum number of concurrently live FFI contexts when using FFIContextPool. + ## Fds and threads are only consumed for slots that are actually acquired, + ## so this value only affects the upfront memory of the pool array. + +type FFIContextPool*[T] = object + ## Fixed-size pool of FFI contexts. Avoids dynamic heap allocation per context + ## and bounds the total number of file descriptors consumed by ThreadSignalPtrs + ## to at most MaxFFIContexts * 2. + slots: array[MaxFFIContexts, FFIContext[T]] + inUse: array[MaxFFIContexts, Atomic[bool]] + const git_version* {.strdefine.} = "n/a" template callEventCallback*(ctx: ptr FFIContext, eventName: string, body: untyped) = @@ -110,13 +124,19 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = const WatchdogTimeinterval = 1.seconds const WatchdogTimeout = 20.seconds - # Give time for the node to be created and up before sending watchdog requests - await sleepAsync(WatchdogStartDelay) - while true: - await sleepAsync(WatchdogTimeinterval) + # Give time for the node to be created and up before sending watchdog requests. + # waitSync returns early if watchdogStopSignal fires (i.e. on destroy). + let startWait = ctx.watchdogStopSignal.waitSync(WatchdogStartDelay) + if startWait.isErr(): + error "watchdog: start-delay waitSync failed", err = startWait.error + elif startWait.get(): + return # stop signal fired during start delay - if ctx.running.load == false: - debug "Watchdog thread exiting because FFIContext is not running" + while ctx.running.load: + let intervalWait = ctx.watchdogStopSignal.waitSync(WatchdogTimeinterval) + if intervalWait.isErr(): + error "watchdog: interval waitSync failed", err = intervalWait.error + elif intervalWait.get() or not ctx.running.load: break let callback = proc( @@ -164,7 +184,9 @@ proc processRequest[T]( try: await retFut except CatchableError as exc: - Result[string, string].err("Exception in processRequest for " & reqId & ": " & exc.msg) + Result[string, string].err( + "Exception in processRequest for " & reqId & ": " & exc.msg + ) ## handleRes may raise (OOM, GC setup) even though it is rare. Catching here ## keeps the async proc raises:[] compatible. The defer inside handleRes @@ -208,33 +230,47 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = waitFor ffiRun(ctx) -proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = - defer: - freeShared(ctx) +proc closeResources[T](ctx: ptr FFIContext[T]): Result[void, string] = + ## Closes file descriptors and deinits the lock. Does NOT free ctx memory. + ## Used by initContextResources error paths and pool destroy, where ctx is + ## not heap-allocated (pool slots live in a fixed array, not on the heap). ctx.lock.deinitLock() if not ctx.reqSignal.isNil(): ?ctx.reqSignal.close() if not ctx.reqReceivedSignal.isNil(): ?ctx.reqReceivedSignal.close() + if not ctx.watchdogStopSignal.isNil(): + ?ctx.watchdogStopSignal.close() return ok() -proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = - ## This proc is called from the main thread and it creates - ## the FFI working thread. - var ctx = createShared(FFIContext[T], 1) +proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] = + ## Full cleanup for heap-allocated contexts: closes all resources and frees memory. + defer: + freeShared(ctx) + return ctx.closeResources() + +proc initContextResources[T](ctx: ptr FFIContext[T]): Result[void, string] = + ## Initialises all resources inside an already-allocated FFIContext slot. + ## On failure every partially-initialised resource is closed; the caller + ## is responsible for releasing the slot (freeShared or pool.releaseSlot). ctx.lock.initLock() ctx.reqSignal = ThreadSignalPtr.new().valueOr: - ctx.cleanUpResources().isOkOr: - return err("could not clean resources in a failure new reqSignal: " & $error) + ctx.closeResources().isOkOr: + return err("could not close resources after reqSignal failure: " & $error) return err("couldn't create reqSignal ThreadSignalPtr: " & $error) ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: - ctx.cleanUpResources().isOkOr: - return - err("could not clean resources in a failure new reqReceivedSignal: " & $error) + ctx.closeResources().isOkOr: + return err("could not close resources after reqReceivedSignal failure: " & $error) return err("couldn't create reqReceivedSignal ThreadSignalPtr") + ctx.watchdogStopSignal = ThreadSignalPtr.new().valueOr: + ctx.closeResources().isOkOr: + return + err("could not close resources after watchdogStopSignal failure: " & $error) + return err("couldn't create watchdogStopSignal ThreadSignalPtr") + ctx.registeredRequests = addr ffi_types.registeredRequests ctx.running.store(true) @@ -242,8 +278,8 @@ proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = try: createThread(ctx.ffiThread, ffiThreadBody[T], ctx) except ValueError, ResourceExhaustedError: - ctx.cleanUpResources().isOkOr: - error "failed to clean up resources after ffiThread creation failure", err = error + ctx.closeResources().isOkOr: + error "failed to close resources after ffiThread creation failure", err = error return err("failed to create the FFI thread: " & getCurrentExceptionMsg()) try: @@ -254,28 +290,84 @@ proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = if fireRes.isErr(): error "failed to signal ffiThread during watchdog cleanup", err = fireRes.error joinThread(ctx.ffiThread) - ctx.cleanUpResources().isOkOr: - error "failed to clean up resources after watchdogThread creation failure", err = error + ctx.closeResources().isOkOr: + error "failed to close resources after watchdogThread creation failure", + err = error return err("failed to create the watchdog thread: " & getCurrentExceptionMsg()) + return ok() + +# ── Pool helpers ───────────────────────────────────────────────────────────── + +proc acquireSlot[T](pool: var FFIContextPool[T]): Result[ptr FFIContext[T], string] = + for i in 0 ..< MaxFFIContexts: + var expected = false + if pool.inUse[i].compareExchange(expected, true): + return ok(pool.slots[i].addr) + return err("FFI context pool exhausted (max " & $MaxFFIContexts & " contexts)") + +proc releaseSlot[T](pool: var FFIContextPool[T], ctx: ptr FFIContext[T]) = + for i in 0 ..< MaxFFIContexts: + if pool.slots[i].addr == ctx: + pool.inUse[i].store(false) + return + +# ── Public API ──────────────────────────────────────────────────────────────── + +proc createFFIContext*[T](): Result[ptr FFIContext[T], string] = + ## Creates a heap-allocated FFI context. The caller must call destroyFFIContext(ctx) + ## to release it. Prefer the pool overload when the maximum context count is known. + var ctx = createShared(FFIContext[T], 1) + initContextResources(ctx).isOkOr: + freeShared(ctx) + return err(error) return ok(ctx) -proc destroyFFIContext*[T](ctx: ptr FFIContext[T]): Result[void, string] = +proc createFFIContext*[T]( + pool: var FFIContextPool[T] +): Result[ptr FFIContext[T], string] = + ## Acquires a slot from the fixed pool and initialises it as an FFI context. + ## Bounded fd usage: at most MaxFFIContexts * 2 ThreadSignalPtr fds are ever open. + let ctx = pool.acquireSlot().valueOr: + return err(error) + initContextResources(ctx).isOkOr: + pool.releaseSlot(ctx) + return err(error) + return ok(ctx) + +proc signalStop[T](ctx: ptr FFIContext[T]): Result[void, string] = ctx.running.store(false) + let ffiSignaled = ctx.reqSignal.fireSync().valueOr: + ctx.onNotResponding() + return err("error signaling reqSignal in destroyFFIContext: " & $error) + if not ffiSignaled: + ctx.onNotResponding() + return err("failed to signal reqSignal on time in destroyFFIContext") + let wdSignaled = ctx.watchdogStopSignal.fireSync().valueOr: + return err("error signaling watchdogStopSignal in destroyFFIContext: " & $error) + if not wdSignaled: + return err("failed to signal watchdogStopSignal on time in destroyFFIContext") + return ok() + +proc destroyFFIContext*[T](ctx: ptr FFIContext[T]): Result[void, string] = defer: joinThread(ctx.ffiThread) joinThread(ctx.watchdogThread) ctx.cleanUpResources().isOkOr: error "failed to clean up resources in destroyFFIContext", err = error + return ctx.signalStop() - let signaledOnTime = ctx.reqSignal.fireSync().valueOr: - ctx.onNotResponding() - return err("error in destroyFFIContext: " & $error) - if not signaledOnTime: - ctx.onNotResponding() - return err("failed to signal reqSignal on time in destroyFFIContext") - - return ok() +proc destroyFFIContext*[T]( + pool: var FFIContextPool[T], ctx: ptr FFIContext[T] +): Result[void, string] = + ## Stops the FFI context and returns its slot to the pool. + defer: + joinThread(ctx.ffiThread) + joinThread(ctx.watchdogThread) + ctx.closeResources().isOkOr: + error "failed to close resources in destroyFFIContext", err = error + pool.releaseSlot(ctx) + return ctx.signalStop() template checkParams*(ctx: ptr FFIContext, callback: FFICallBack, userData: pointer) = if not isNil(ctx): diff --git a/tests/test_ffi_context.nim b/tests/test_ffi_context.nim index 9cb9542..e9261e6 100644 --- a/tests/test_ffi_context.nim +++ b/tests/test_ffi_context.nim @@ -61,6 +61,62 @@ registerReqFFI(EmptyOkRequest, lib: ptr TestLib): proc(): Future[Result[string, string]] {.async.} = return ok("") +suite "FFIContextPool": + test "create and destroy via pool succeeds": + var pool: FFIContextPool[TestLib] + let ctx = pool.createFFIContext().valueOr: + assert false, "createFFIContext(pool) failed: " & $error + return + check pool.destroyFFIContext(ctx).isOk() + + test "slot is reused after destroy": + var pool: FFIContextPool[TestLib] + let ctx1 = pool.createFFIContext().valueOr: + assert false, "createFFIContext(pool) failed: " & $error + return + check pool.destroyFFIContext(ctx1).isOk() + # After destroying, the same slot must be available again + let ctx2 = pool.createFFIContext().valueOr: + assert false, "createFFIContext(pool) failed after slot release: " & $error + return + check pool.destroyFFIContext(ctx2).isOk() + check ctx1 == ctx2 # same array slot reused + + test "pool exhaustion returns error": + var pool: FFIContextPool[TestLib] + var ctxs: array[MaxFFIContexts, ptr FFIContext[TestLib]] + for i in 0 ..< MaxFFIContexts: + ctxs[i] = pool.createFFIContext().valueOr: + for j in 0 ..< i: + discard pool.destroyFFIContext(ctxs[j]) + assert false, "createFFIContext(pool) failed at slot " & $i & ": " & $error + return + # Pool is now full — next create must fail + check pool.createFFIContext().isErr() + for i in 0 ..< MaxFFIContexts: + discard pool.destroyFFIContext(ctxs[i]) + + test "requests are processed via pool context": + var pool: FFIContextPool[TestLib] + var d: CallbackData + initCallbackData(d) + defer: + deinitCallbackData(d) + + let ctx = pool.createFFIContext().valueOr: + assert false, "createFFIContext(pool) failed: " & $error + return + defer: + discard pool.destroyFFIContext(ctx) + + check sendRequestToFFIThread( + ctx, PingRequest.ffiNewReq(testCallback, addr d, "pool".cstring) + ) + .isOk() + waitCallback(d) + check d.retCode == RET_OK + check callbackMsg(d) == "pong:pool" + suite "createFFIContext / destroyFFIContext": test "create and destroy succeeds": let ctx = createFFIContext[TestLib]().valueOr: @@ -79,14 +135,19 @@ suite "sendRequestToFFIThread": test "successful request triggers RET_OK callback": var d: CallbackData initCallbackData(d) - defer: deinitCallbackData(d) + defer: + deinitCallbackData(d) let ctx = createFFIContext[TestLib]().valueOr: check false return - defer: discard destroyFFIContext(ctx) + defer: + discard destroyFFIContext(ctx) - check sendRequestToFFIThread(ctx, PingRequest.ffiNewReq(testCallback, addr d, "hello".cstring)).isOk() + check sendRequestToFFIThread( + ctx, PingRequest.ffiNewReq(testCallback, addr d, "hello".cstring) + ) + .isOk() waitCallback(d) check d.retCode == RET_OK check callbackMsg(d) == "pong:hello" @@ -94,12 +155,14 @@ suite "sendRequestToFFIThread": test "failing request triggers RET_ERR callback": var d: CallbackData initCallbackData(d) - defer: deinitCallbackData(d) + defer: + deinitCallbackData(d) let ctx = createFFIContext[TestLib]().valueOr: check false return - defer: discard destroyFFIContext(ctx) + defer: + discard destroyFFIContext(ctx) check sendRequestToFFIThread(ctx, FailRequest.ffiNewReq(testCallback, addr d)).isOk() waitCallback(d) @@ -108,14 +171,17 @@ suite "sendRequestToFFIThread": test "empty ok response delivers empty message": var d: CallbackData initCallbackData(d) - defer: deinitCallbackData(d) + defer: + deinitCallbackData(d) let ctx = createFFIContext[TestLib]().valueOr: check false return - defer: discard destroyFFIContext(ctx) + defer: + discard destroyFFIContext(ctx) - check sendRequestToFFIThread(ctx, EmptyOkRequest.ffiNewReq(testCallback, addr d)).isOk() + check sendRequestToFFIThread(ctx, EmptyOkRequest.ffiNewReq(testCallback, addr d)) + .isOk() waitCallback(d) check d.retCode == RET_OK check d.msgLen == 0 @@ -124,13 +190,17 @@ suite "sendRequestToFFIThread": let ctx = createFFIContext[TestLib]().valueOr: check false return - defer: discard destroyFFIContext(ctx) + defer: + discard destroyFFIContext(ctx) for i in 1 .. 5: var d: CallbackData initCallbackData(d) let msg = "msg" & $i - check sendRequestToFFIThread(ctx, PingRequest.ffiNewReq(testCallback, addr d, msg.cstring)).isOk() + check sendRequestToFFIThread( + ctx, PingRequest.ffiNewReq(testCallback, addr d, msg.cstring) + ) + .isOk() waitCallback(d) deinitCallbackData(d) check d.retCode == RET_OK