mirror of
https://github.com/logos-messaging/nim-ffi.git
synced 2026-05-10 20:29:47 +00:00
Fix memleaks (#11)
* protect against mem leak in case of failures sending requests to ffi thread * better cleanup if failures in createFFIContext * avoid dangling cstring in handleRes under ARC/ORC * better resource cleanup in destroyFFIContext * invoke onNotResponding if failure in destroyFFIContext * correct seq copy in alloc * make sure the lock is init before cleanUpResources * better possible exception handling in processReq * guard allocSharedSeq if given seq is empty * enhance error handling in ffi_context * add new tests and some corrections
This commit is contained in:
parent
bb8a3e7e22
commit
df2277e726
20
ffi.nimble
20
ffi.nimble
@ -12,11 +12,17 @@ requires "chronos"
|
||||
requires "chronicles"
|
||||
requires "taskpools"
|
||||
|
||||
# Source files to include
|
||||
# srcDir = "src"
|
||||
# installFiles = @["src/ffi.nim", "mylib.h"]
|
||||
const nimFlags = "--mm:orc -d:chronicles_log_level=WARN"
|
||||
|
||||
# # 💡 Custom build step before installation
|
||||
# before install:
|
||||
# echo "Generating custom C header..."
|
||||
# exec "nim r tools/gen_header.nim"
|
||||
task build, "Compile the library":
|
||||
exec "nim c " & nimFlags & " --noMain ffi.nim"
|
||||
|
||||
task test, "Run all tests":
|
||||
exec "nim c -r " & nimFlags & " tests/test_alloc.nim"
|
||||
exec "nim c -r " & nimFlags & " tests/test_ffi_context.nim"
|
||||
|
||||
task test_alloc, "Run alloc unit tests":
|
||||
exec "nim c -r " & nimFlags & " tests/test_alloc.nim"
|
||||
|
||||
task test_ffi, "Run FFI context integration tests":
|
||||
exec "nim c -r " & nimFlags & " tests/test_ffi_context.nim"
|
||||
|
||||
@ -25,12 +25,16 @@ proc alloc*(str: string): cstring =
|
||||
|
||||
proc allocSharedSeq*[T](s: seq[T]): SharedSeq[T] =
|
||||
let data = allocShared(sizeof(T) * s.len)
|
||||
if s.len != 0:
|
||||
copyMem(data, unsafeAddr s[0], s.len)
|
||||
|
||||
if s.len == 0:
|
||||
return (cast[ptr UncheckedArray[T]](nil), 0)
|
||||
|
||||
copyMem(data, unsafeAddr s[0], sizeof(T) * s.len)
|
||||
return (cast[ptr UncheckedArray[T]](data), s.len)
|
||||
|
||||
proc deallocSharedSeq*[T](s: var SharedSeq[T]) =
|
||||
deallocShared(s.data)
|
||||
if not s.data.isNil():
|
||||
deallocShared(s.data)
|
||||
s.len = 0
|
||||
|
||||
proc toSeq*[T](s: SharedSeq[T]): seq[T] =
|
||||
|
||||
@ -60,18 +60,23 @@ proc sendRequestToFFIThread*(
|
||||
## Sending the request
|
||||
let sentOk = ctx.reqChannel.trySend(ffiRequest)
|
||||
if not sentOk:
|
||||
deleteRequest(ffiRequest)
|
||||
return err("Couldn't send a request to the ffi thread")
|
||||
|
||||
let fireSyncRes = ctx.reqSignal.fireSync()
|
||||
if fireSyncRes.isErr():
|
||||
deleteRequest(ffiRequest)
|
||||
return err("failed fireSync: " & $fireSyncRes.error)
|
||||
|
||||
if fireSyncRes.get() == false:
|
||||
deleteRequest(ffiRequest)
|
||||
return err("Couldn't fireSync in time")
|
||||
|
||||
## wait until the FFI working thread properly received the request
|
||||
let res = ctx.reqReceivedSignal.waitSync(timeout)
|
||||
if res.isErr():
|
||||
## Do not free ffiRequest here: the FFI thread was already signaled and
|
||||
## will process (and free) it.
|
||||
return err("Couldn't receive reqReceivedSignal signal")
|
||||
|
||||
## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the
|
||||
@ -122,8 +127,14 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} =
|
||||
|
||||
trace "Sending watchdog request to FFI thread"
|
||||
|
||||
sendRequestToFFIThread(ctx, WatchdogReq.ffiNewReq(callback, nilUserData), WatchdogTimeout).isOkOr:
|
||||
error "Failed to send watchdog request to FFI thread", error = $error
|
||||
try:
|
||||
sendRequestToFFIThread(
|
||||
ctx, WatchdogReq.ffiNewReq(callback, nilUserData), WatchdogTimeout
|
||||
).isOkOr:
|
||||
error "Failed to send watchdog request to FFI thread", error = $error
|
||||
onNotResponding(ctx)
|
||||
except Exception as exc:
|
||||
error "Exception sending watchdog request", exc = exc.msg
|
||||
onNotResponding(ctx)
|
||||
|
||||
waitFor watchdogRun(ctx)
|
||||
@ -138,13 +149,30 @@ proc processRequest[T](
|
||||
## The registeredRequests represents a table defined at compile time.
|
||||
## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously]
|
||||
|
||||
## Explicit conversion keeps `reqId` alive as the backing string,
|
||||
## avoiding the implicit string→cstring warning that will become an error.
|
||||
let reqIdCs = reqId.cstring
|
||||
|
||||
let retFut =
|
||||
if not ctx[].registeredRequests[].contains(reqId):
|
||||
if not ctx[].registeredRequests[].contains(reqIdCs):
|
||||
## That shouldn't happen because only registered requests should be sent to the FFI thread.
|
||||
nilProcess(request[].reqId)
|
||||
else:
|
||||
ctx[].registeredRequests[][reqId](request[].reqContent, ctx)
|
||||
handleRes(await retFut, request)
|
||||
ctx[].registeredRequests[][reqIdCs](request[].reqContent, ctx)
|
||||
|
||||
let res =
|
||||
try:
|
||||
await retFut
|
||||
except CatchableError as exc:
|
||||
Result[string, string].err("Exception in processRequest for " & reqId & ": " & exc.msg)
|
||||
|
||||
## handleRes may raise (OOM, GC setup) even though it is rare. Catching here
|
||||
## keeps the async proc raises:[] compatible. The defer inside handleRes
|
||||
## guarantees request is freed before the exception propagates.
|
||||
try:
|
||||
handleRes(res, request)
|
||||
except Exception as exc:
|
||||
error "Unexpected exception in handleRes", exc = exc.msg
|
||||
|
||||
proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
## FFI thread body that attends library user API requests
|
||||
@ -180,15 +208,33 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} =
|
||||
|
||||
waitFor ffiRun(ctx)
|
||||
|
||||
proc cleanUpResources[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
defer:
|
||||
freeShared(ctx)
|
||||
ctx.lock.deinitLock()
|
||||
if not ctx.reqSignal.isNil():
|
||||
?ctx.reqSignal.close()
|
||||
if not ctx.reqReceivedSignal.isNil():
|
||||
?ctx.reqReceivedSignal.close()
|
||||
return ok()
|
||||
|
||||
proc createFFIContext*[T](): Result[ptr FFIContext[T], string] =
|
||||
## This proc is called from the main thread and it creates
|
||||
## the FFI working thread.
|
||||
var ctx = createShared(FFIContext[T], 1)
|
||||
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
|
||||
return err("couldn't create reqSignal ThreadSignalPtr")
|
||||
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
|
||||
return err("couldn't create reqReceivedSignal ThreadSignalPtr")
|
||||
ctx.lock.initLock()
|
||||
|
||||
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
|
||||
ctx.cleanUpResources().isOkOr:
|
||||
return err("could not clean resources in a failure new reqSignal: " & $error)
|
||||
return err("couldn't create reqSignal ThreadSignalPtr: " & $error)
|
||||
|
||||
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
|
||||
ctx.cleanUpResources().isOkOr:
|
||||
return
|
||||
err("could not clean resources in a failure new reqReceivedSignal: " & $error)
|
||||
return err("couldn't create reqReceivedSignal ThreadSignalPtr")
|
||||
|
||||
ctx.registeredRequests = addr ffi_types.registeredRequests
|
||||
|
||||
ctx.running.store(true)
|
||||
@ -196,32 +242,39 @@ proc createFFIContext*[T](): Result[ptr FFIContext[T], string] =
|
||||
try:
|
||||
createThread(ctx.ffiThread, ffiThreadBody[T], ctx)
|
||||
except ValueError, ResourceExhaustedError:
|
||||
freeShared(ctx)
|
||||
ctx.cleanUpResources().isOkOr:
|
||||
error "failed to clean up resources after ffiThread creation failure", err = error
|
||||
return err("failed to create the FFI thread: " & getCurrentExceptionMsg())
|
||||
|
||||
try:
|
||||
createThread(ctx.watchdogThread, watchdogThreadBody, ctx)
|
||||
except ValueError, ResourceExhaustedError:
|
||||
freeShared(ctx)
|
||||
ctx.running.store(false)
|
||||
let fireRes = ctx.reqSignal.fireSync()
|
||||
if fireRes.isErr():
|
||||
error "failed to signal ffiThread during watchdog cleanup", err = fireRes.error
|
||||
joinThread(ctx.ffiThread)
|
||||
ctx.cleanUpResources().isOkOr:
|
||||
error "failed to clean up resources after watchdogThread creation failure", err = error
|
||||
return err("failed to create the watchdog thread: " & getCurrentExceptionMsg())
|
||||
|
||||
return ok(ctx)
|
||||
|
||||
proc destroyFFIContext*[T](ctx: ptr FFIContext[T]): Result[void, string] =
|
||||
ctx.running.store(false)
|
||||
defer:
|
||||
joinThread(ctx.ffiThread)
|
||||
joinThread(ctx.watchdogThread)
|
||||
ctx.cleanUpResources().isOkOr:
|
||||
error "failed to clean up resources in destroyFFIContext", err = error
|
||||
|
||||
let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
|
||||
ctx.onNotResponding()
|
||||
return err("error in destroyFFIContext: " & $error)
|
||||
if not signaledOnTime:
|
||||
ctx.onNotResponding()
|
||||
return err("failed to signal reqSignal on time in destroyFFIContext")
|
||||
|
||||
joinThread(ctx.ffiThread)
|
||||
joinThread(ctx.watchdogThread)
|
||||
ctx.lock.deinitLock()
|
||||
?ctx.reqSignal.close()
|
||||
?ctx.reqReceivedSignal.close()
|
||||
freeShared(ctx)
|
||||
|
||||
return ok()
|
||||
|
||||
template checkParams*(ctx: ptr FFIContext, callback: FFICallBack, userData: pointer) =
|
||||
|
||||
@ -6,11 +6,16 @@ import std/[json, macros], results, tables
|
||||
import chronos, chronos/threadsync
|
||||
import ./ffi_types, ./internal/ffi_macro, ./alloc
|
||||
|
||||
type FFIDestroyContentProc* = proc(content: pointer) {.nimcall, gcsafe.}
|
||||
|
||||
type FFIThreadRequest* = object
|
||||
callback: FFICallBack
|
||||
userData: pointer
|
||||
reqId*: cstring
|
||||
reqContent*: pointer
|
||||
deleteReqContent*: FFIDestroyContentProc
|
||||
## Called by sendRequestToFFIThread on failure to free reqContent when
|
||||
## the FFI thread will never process (and thus never free) this request.
|
||||
|
||||
proc init*(
|
||||
T: typedesc[FFIThreadRequest],
|
||||
@ -26,7 +31,9 @@ proc init*(
|
||||
ret[].reqContent = reqContent
|
||||
return ret
|
||||
|
||||
proc deleteRequest(request: ptr FFIThreadRequest) =
|
||||
proc deleteRequest*(request: ptr FFIThreadRequest) =
|
||||
if not request[].deleteReqContent.isNil():
|
||||
request[].deleteReqContent(request[].reqContent)
|
||||
deallocShared(request[].reqId)
|
||||
deallocShared(request)
|
||||
|
||||
@ -48,9 +55,11 @@ proc handleRes*[T: string | void](
|
||||
return
|
||||
|
||||
foreignThreadGc:
|
||||
var msg: cstring = ""
|
||||
var resStr: string
|
||||
## we need to bind the string to extend its lifetime to callback's in ARC/ORC
|
||||
when T is string:
|
||||
msg = res.get().cstring()
|
||||
resStr = res.get()
|
||||
let msg: cstring = resStr.cstring()
|
||||
request[].callback(
|
||||
RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
|
||||
)
|
||||
@ -58,4 +67,3 @@ proc handleRes*[T: string | void](
|
||||
|
||||
proc nilProcess*(reqId: cstring): Future[Result[string, string]] {.async.} =
|
||||
return err("This request type is not implemented: " & $reqId)
|
||||
|
||||
|
||||
@ -164,6 +164,9 @@ proc buildFfiNewReqProc(reqTypeName, body: NimNode): NimNode =
|
||||
let typeStr = $T
|
||||
var ret =
|
||||
FFIThreadRequest.init(callback, userData, typeStr.cstring, `reqObjIdent`)
|
||||
proc destroyContent(content: pointer) {.nimcall.} =
|
||||
ffiDeleteReq(cast[ptr `reqTypeName`](content))
|
||||
ret[].deleteReqContent = destroyContent
|
||||
return ret
|
||||
)
|
||||
|
||||
@ -255,8 +258,6 @@ proc buildProcessFFIRequestProc(reqTypeName, reqHandler, body: NimNode): NimNode
|
||||
|
||||
newBody.add quote do:
|
||||
let `reqIdent`: ptr `reqTypeName` = cast[ptr `reqTypeName`](request)
|
||||
defer:
|
||||
ffiDeleteReq(`reqIdent`)
|
||||
|
||||
# automatically unpack fields into locals
|
||||
for p in procParams[1 ..^ 1]:
|
||||
@ -387,7 +388,7 @@ macro registerReqFFI*(reqTypeName, reqHandler, body: untyped): untyped =
|
||||
let processProc = buildProcessFFIRequestProc(reqTypeName, reqHandler, body)
|
||||
let addNewReqToReg = addNewRequestToRegistry(reqTypeName, reqHandler)
|
||||
let deleteProc = buildFfiDeleteReqProc(reqTypeName, fields)
|
||||
result = newStmtList(typeDef, ffiNewReqProc, deleteProc, processProc, addNewReqToReg)
|
||||
result = newStmtList(typeDef, deleteProc, ffiNewReqProc, processProc, addNewReqToReg)
|
||||
|
||||
when defined(ffiDumpMacros):
|
||||
echo result.repr
|
||||
|
||||
71
tests/test_alloc.nim
Normal file
71
tests/test_alloc.nim
Normal file
@ -0,0 +1,71 @@
|
||||
import unittest2
|
||||
import ../ffi/alloc
|
||||
|
||||
suite "alloc(cstring)":
|
||||
test "nil input returns empty cstring":
|
||||
let s: cstring = nil
|
||||
let res = alloc(s)
|
||||
check res != nil
|
||||
check res[0] == '\0'
|
||||
deallocShared(res)
|
||||
|
||||
test "copies content":
|
||||
let res = alloc("hello world".cstring)
|
||||
check $res == "hello world"
|
||||
deallocShared(res)
|
||||
|
||||
test "empty cstring":
|
||||
let res = alloc("".cstring)
|
||||
check len(res) == 0
|
||||
deallocShared(res)
|
||||
|
||||
suite "alloc(string)":
|
||||
test "copies content":
|
||||
let res = alloc("test string")
|
||||
check $res == "test string"
|
||||
deallocShared(res)
|
||||
|
||||
test "empty string":
|
||||
let res = alloc("")
|
||||
check len(res) == 0
|
||||
deallocShared(res)
|
||||
|
||||
test "string with special characters":
|
||||
let res = alloc("abc\0xyz")
|
||||
check res[0] == 'a'
|
||||
deallocShared(res)
|
||||
|
||||
suite "allocSharedSeq / deallocSharedSeq / toSeq":
|
||||
test "roundtrip int seq":
|
||||
let original = @[1, 2, 3, 4, 5]
|
||||
var shared = allocSharedSeq(original)
|
||||
check shared.len == 5
|
||||
let back = toSeq(shared)
|
||||
check back == original
|
||||
deallocSharedSeq(shared)
|
||||
check shared.len == 0
|
||||
|
||||
test "empty seq":
|
||||
let original: seq[int] = @[]
|
||||
var shared = allocSharedSeq(original)
|
||||
check shared.len == 0
|
||||
check shared.data == nil
|
||||
let back = toSeq(shared)
|
||||
check back.len == 0
|
||||
deallocSharedSeq(shared)
|
||||
|
||||
test "preserves element values by index":
|
||||
let original = @[10, 20, 30]
|
||||
var shared = allocSharedSeq(original)
|
||||
check shared.data[0] == 10
|
||||
check shared.data[1] == 20
|
||||
check shared.data[2] == 30
|
||||
deallocSharedSeq(shared)
|
||||
|
||||
test "works with byte seq":
|
||||
let original = @[byte(0xFF), byte(0x00), byte(0xAB)]
|
||||
var shared = allocSharedSeq(original)
|
||||
check shared.len == 3
|
||||
let back = toSeq(shared)
|
||||
check back == original
|
||||
deallocSharedSeq(shared)
|
||||
137
tests/test_ffi_context.nim
Normal file
137
tests/test_ffi_context.nim
Normal file
@ -0,0 +1,137 @@
|
||||
import std/locks
|
||||
import unittest2
|
||||
import results
|
||||
import ../ffi
|
||||
|
||||
type TestLib = object
|
||||
|
||||
## Per-request callback state. The test thread blocks on `cond` until the
|
||||
## FFI thread signals it — no polling, no CPU waste.
|
||||
type CallbackData = object
|
||||
lock: Lock
|
||||
cond: Cond
|
||||
called: bool
|
||||
retCode: cint
|
||||
msg: array[512, char]
|
||||
msgLen: int
|
||||
|
||||
proc initCallbackData(d: var CallbackData) =
|
||||
d.lock.initLock()
|
||||
d.cond.initCond()
|
||||
|
||||
proc deinitCallbackData(d: var CallbackData) =
|
||||
d.cond.deinitCond()
|
||||
d.lock.deinitLock()
|
||||
|
||||
proc testCallback(
|
||||
retCode: cint, msg: ptr cchar, len: csize_t, userData: pointer
|
||||
) {.cdecl, gcsafe, raises: [].} =
|
||||
let d = cast[ptr CallbackData](userData)
|
||||
acquire(d[].lock)
|
||||
d[].retCode = retCode
|
||||
let n = min(int(len), d[].msg.high)
|
||||
if n > 0 and not msg.isNil:
|
||||
copyMem(addr d[].msg[0], msg, n)
|
||||
d[].msg[n] = '\0'
|
||||
d[].msgLen = n
|
||||
d[].called = true
|
||||
signal(d[].cond)
|
||||
release(d[].lock)
|
||||
|
||||
proc waitCallback(d: var CallbackData) =
|
||||
acquire(d.lock)
|
||||
while not d.called:
|
||||
wait(d.cond, d.lock)
|
||||
release(d.lock)
|
||||
|
||||
proc callbackMsg(d: var CallbackData): string =
|
||||
result = newString(d.msgLen)
|
||||
if d.msgLen > 0:
|
||||
copyMem(addr result[0], addr d.msg[0], d.msgLen)
|
||||
|
||||
registerReqFFI(PingRequest, lib: ptr TestLib):
|
||||
proc(message: cstring): Future[Result[string, string]] {.async.} =
|
||||
return ok("pong:" & $message)
|
||||
|
||||
registerReqFFI(FailRequest, lib: ptr TestLib):
|
||||
proc(): Future[Result[string, string]] {.async.} =
|
||||
return err("intentional failure")
|
||||
|
||||
registerReqFFI(EmptyOkRequest, lib: ptr TestLib):
|
||||
proc(): Future[Result[string, string]] {.async.} =
|
||||
return ok("")
|
||||
|
||||
suite "createFFIContext / destroyFFIContext":
|
||||
test "create and destroy succeeds":
|
||||
let ctx = createFFIContext[TestLib]().valueOr:
|
||||
checkpoint "createFFIContext failed: " & $error
|
||||
check false
|
||||
return
|
||||
check destroyFFIContext(ctx).isOk()
|
||||
|
||||
test "double destroy is safe via running flag":
|
||||
let ctx = createFFIContext[TestLib]().valueOr:
|
||||
check false
|
||||
return
|
||||
check destroyFFIContext(ctx).isOk()
|
||||
|
||||
suite "sendRequestToFFIThread":
|
||||
test "successful request triggers RET_OK callback":
|
||||
var d: CallbackData
|
||||
initCallbackData(d)
|
||||
defer: deinitCallbackData(d)
|
||||
|
||||
let ctx = createFFIContext[TestLib]().valueOr:
|
||||
check false
|
||||
return
|
||||
defer: discard destroyFFIContext(ctx)
|
||||
|
||||
check sendRequestToFFIThread(ctx, PingRequest.ffiNewReq(testCallback, addr d, "hello".cstring)).isOk()
|
||||
waitCallback(d)
|
||||
check d.retCode == RET_OK
|
||||
check callbackMsg(d) == "pong:hello"
|
||||
|
||||
test "failing request triggers RET_ERR callback":
|
||||
var d: CallbackData
|
||||
initCallbackData(d)
|
||||
defer: deinitCallbackData(d)
|
||||
|
||||
let ctx = createFFIContext[TestLib]().valueOr:
|
||||
check false
|
||||
return
|
||||
defer: discard destroyFFIContext(ctx)
|
||||
|
||||
check sendRequestToFFIThread(ctx, FailRequest.ffiNewReq(testCallback, addr d)).isOk()
|
||||
waitCallback(d)
|
||||
check d.retCode == RET_ERR
|
||||
|
||||
test "empty ok response delivers empty message":
|
||||
var d: CallbackData
|
||||
initCallbackData(d)
|
||||
defer: deinitCallbackData(d)
|
||||
|
||||
let ctx = createFFIContext[TestLib]().valueOr:
|
||||
check false
|
||||
return
|
||||
defer: discard destroyFFIContext(ctx)
|
||||
|
||||
check sendRequestToFFIThread(ctx, EmptyOkRequest.ffiNewReq(testCallback, addr d)).isOk()
|
||||
waitCallback(d)
|
||||
check d.retCode == RET_OK
|
||||
check d.msgLen == 0
|
||||
|
||||
test "sequential requests are all processed":
|
||||
let ctx = createFFIContext[TestLib]().valueOr:
|
||||
check false
|
||||
return
|
||||
defer: discard destroyFFIContext(ctx)
|
||||
|
||||
for i in 1 .. 5:
|
||||
var d: CallbackData
|
||||
initCallbackData(d)
|
||||
let msg = "msg" & $i
|
||||
check sendRequestToFFIThread(ctx, PingRequest.ffiNewReq(testCallback, addr d, msg.cstring)).isOk()
|
||||
waitCallback(d)
|
||||
deinitCallbackData(d)
|
||||
check d.retCode == RET_OK
|
||||
check callbackMsg(d) == "pong:" & msg
|
||||
Loading…
x
Reference in New Issue
Block a user