From a1a6536b3c06b78b911eff1d1694b477c4d615b0 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Fri, 5 Sep 2025 21:31:01 +0200 Subject: [PATCH] general ffi increments --- ffi/ffi_context.nim | 52 ++++++++---- ffi/ffi_thread_request.nim | 2 +- ffi/internal/ffi_library.nim | 13 +++ ffi/internal/ffi_macro.nim | 157 ++++++++++++++++++++++++----------- 4 files changed, 158 insertions(+), 66 deletions(-) diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index 782a156..0acb17d 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -2,7 +2,7 @@ {.pragma: callback, cdecl, raises: [], gcsafe.} {.passc: "-fPIC".} -import std/[options, atomics, os, net, locks] +import std/[options, atomics, os, net, locks, json] import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results import ./ffi_types, ./ffi_thread_request, ./internal/ffi_macro @@ -20,13 +20,13 @@ type FFIContext* = object eventCallback*: pointer eventUserdata*: pointer running: Atomic[bool] # To control when the threads are running - registeredRequests: Table[string, FFIRequestProc] + registeredRequests: ptr Table[cstring, FFIRequestProc] const git_version* {.strdefine.} = "n/a" -template callEventCallback(ctx: ptr FFIContext, eventName: string, body: untyped) = +template callEventCallback*(ctx: ptr FFIContext, eventName: string, body: untyped) = if isNil(ctx[].eventCallback): - error eventName & " - eventCallback is nil" + chronicles.error eventName & " - eventCallback is nil" return foreignThreadGc: @@ -75,6 +75,24 @@ proc sendRequestToFFIThread*( ## process proc. See the 'waku_thread_request.nim' module for more details. ok() +type Foo = object +registerReqFFI(WatchdogReq, foo: ptr Foo): + proc(): Future[Result[string, string]] {.async.} = + return ok("waku thread is not blocked") + +type JsonWakuNotRespondingEvent = object + eventType: string + +proc init(T: type JsonWakuNotRespondingEvent): T = + return JsonWakuNotRespondingEvent(eventType: "not_responding") + +proc `$`(event: JsonWakuNotRespondingEvent): string = + $(%*event) + +proc onWakuNotResponding*(ctx: ptr FFIContext) = + callEventCallback(ctx, "onWakuNotResponsive"): + $JsonWakuNotRespondingEvent.init() + proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = ## Watchdog thread that monitors the Waku thread and notifies the library user if it hangs. @@ -100,16 +118,9 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = trace "Sending watchdog request to FFI thread" - # sendRequestToFFIThread( - # ctx, - # 0, - # DebugNodeRequest.createShared(DebugNodeMsgType.CHECK_WAKU_NOT_BLOCKED), - # wakuCallback, - # nilUserData, - # WakuNotRespondingTimeout, - # ).isOkOr: - # error "Failed to send watchdog request to FFI thread", error = $error - # onWakuNotResponding(ctx) + sendRequestToFFIThread(ctx, WatchdogReq.ffiNewReq(wakuCallback, nilUserData)).isOkOr: + error "Failed to send watchdog request to FFI thread", error = $error + onWakuNotResponding(ctx) waitFor watchdogRun(ctx) @@ -133,7 +144,7 @@ proc ffiThreadBody[TT](ctx: ptr FFIContext) {.thread.} = ## Handle the request asyncSpawn FFIThreadRequest.process( - request, addr ffiHandler, addr ctx.registeredRequests + request, addr ffiHandler, ctx.registeredRequests ) let fireRes = ctx.reqReceivedSignal.fireSync() @@ -151,7 +162,7 @@ proc createFFIContext*[T](tt: typedesc[T]): Result[ptr FFIContext, string] = ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr: return err("couldn't create reqReceivedSignal ThreadSignalPtr") ctx.lock.initLock() - ctx.registeredRequests = ffi_macro.registeredRequests + ctx.registeredRequests = addr ffi_macro.registeredRequests ctx.running.store(true) @@ -169,7 +180,7 @@ proc createFFIContext*[T](tt: typedesc[T]): Result[ptr FFIContext, string] = return ok(ctx) -proc destroyFFIContext*[T](ctx: ptr FFIContext): Result[void, string] = +proc destroyFFIContext*(ctx: ptr FFIContext): Result[void, string] = ctx.running.store(false) let signaledOnTime = ctx.reqSignal.fireSync().valueOr: @@ -185,3 +196,10 @@ proc destroyFFIContext*[T](ctx: ptr FFIContext): Result[void, string] = freeShared(ctx) return ok() + +template checkParams*(ctx: ptr FFIContext, callback: FFICallBack, userData: pointer) = + if not isNil(ctx): + ctx[].userData = userData + + if isNil(callback): + return RET_MISSING_CALLBACK diff --git a/ffi/ffi_thread_request.nim b/ffi/ffi_thread_request.nim index ac52348..1631d32 100644 --- a/ffi/ffi_thread_request.nim +++ b/ffi/ffi_thread_request.nim @@ -65,7 +65,7 @@ proc process*[R]( T: type FFIThreadRequest, request: ptr FFIThreadRequest, reqHandler: ptr R, - registeredRequests: ptr Table[string, FFIRequestProc], + registeredRequests: ptr Table[cstring, FFIRequestProc], ) {.async.} = let reqId = $request[].reqId diff --git a/ffi/internal/ffi_library.nim b/ffi/internal/ffi_library.nim index 0273018..23e9dad 100644 --- a/ffi/internal/ffi_library.nim +++ b/ffi/internal/ffi_library.nim @@ -78,5 +78,18 @@ macro declareLibrary*(libraryName: static[string]): untyped = res.add(initializeLibraryProc) + ## Generate the exported C-callable callback setter + let setCallbackProc = quote: + proc set_event_callback( + ctx: ptr FFIContext, callback: FFICallBack, userData: pointer + ) {.dynlib, exportc.} = + initializeLibrary() + ctx[].eventCallback = cast[pointer](callback) + ctx[].eventUserData = userData + + res.add(setCallbackProc) + echo result.repr return res + + diff --git a/ffi/internal/ffi_macro.nim b/ffi/internal/ffi_macro.nim index 246204a..3dce63d 100644 --- a/ffi/internal/ffi_macro.nim +++ b/ffi/internal/ffi_macro.nim @@ -2,7 +2,56 @@ import std/[macros, tables] import chronos import ../ffi_types -var registeredRequests* {.threadvar.}: Table[string, FFIRequestProc] +var registeredRequests* {.threadvar.}: Table[cstring, FFIRequestProc] + +proc extractFieldsFromLambda(body: NimNode): seq[NimNode] = + ## Extracts the fields (params) from the given lambda body. + var procNode = body + if procNode.kind == nnkStmtList and procNode.len == 1: + procNode = procNode[0] + if procNode.kind != nnkLambda: + error "registerReqFFI expects a lambda proc, found: " & $procNode.kind + + let params = procNode[3] # parameters list + result = @[] + for p in params[1 .. ^1]: # skip return type + result.add newIdentDefs(p[0], p[1]) + +proc buildRequestType(reqTypeName: NimNode, body: NimNode): NimNode = + ## Builds: + ## type * = object + ## : + ## ... + + var procNode = body + if procNode.kind == nnkStmtList and procNode.len == 1: + procNode = procNode[0] + if procNode.kind != nnkLambda: + error "registerReqFFI expects a lambda proc, found: " & $procNode.kind + + let params = procNode[3] # formal params of the lambda + var fields: seq[NimNode] = @[] + for p in params[1 .. ^1]: # skip return type at index 0 + let name = p[0] + let typ = p[1] + # Field must be nnkIdentDefs(name, type, defaultExpr) + fields.add newTree(nnkIdentDefs, name, typ, newEmptyNode()) + + # Wrap fields in a rec list + let recList = newTree(nnkRecList, fields) + + # object type node: object [of?] [] [pragma?] recList + let objTy = newTree(nnkObjectTy, newEmptyNode(), newEmptyNode(), recList) + + # Export the type (CreateNodeRequest*) + let typeName = + if reqTypeName.kind == nnkPostfix: + reqTypeName + else: + postfix(reqTypeName, "*") + + result = + newNimNode(nnkTypeSection).add(newTree(nnkTypeDef, typeName, newEmptyNode(), objTy)) proc buildFfiNewReqProc(reqTypeName, body: NimNode): NimNode = var formalParams = newSeq[NimNode]() @@ -14,7 +63,7 @@ proc buildFfiNewReqProc(reqTypeName, body: NimNode): NimNode = procNode = body if procNode.kind != nnkLambda: - error "registerFFI expects a lambda definition. Found: " & $procNode.kind + error "registerReqFFI expects a lambda definition. Found: " & $procNode.kind # T: typedesc[CreateNodeRequest] let typedescParam = newIdentDefs( @@ -88,30 +137,32 @@ proc buildFfiNewReqProc(reqTypeName, body: NimNode): NimNode = pragmas = newEmptyNode(), ) -proc buildFfiDeleteReqProc(reqTypeName: NimNode): NimNode = - ## Generates something like: - ## proc ffiDeleteReq(self: ptr CreateNodeRequest) = - ## deallocShared(self[].configJson) +proc buildFfiDeleteReqProc(reqTypeName: NimNode, fields: seq[NimNode]): NimNode = + ## Generates: + ## proc ffiDeleteReq(self: ptr ) = + ## deallocShared(self[].) ## deallocShared(self) - result = newProc( - name = ident("ffiDeleteReq"), - params = - @[ - newEmptyNode(), # return type is empty (void) - newIdentDefs(ident("self"), nnkPtrTy.newTree(reqTypeName)), - ], - body = newStmtList( - nnkCall.newTree( + # Build the body + var body = newStmtList() + for f in fields: + if $f[1] == "cstring": # only dealloc cstring fields + body.add newCall( ident("deallocShared"), - nnkBracketExpr.newTree( - nnkBracketExpr.newTree(ident("self"), newEmptyNode()), # self[] - ident("configJson"), - ), - ), - nnkCall.newTree(ident("deallocShared"), ident("self")), - ), - procType = nnkProcDef, + newDotExpr(newTree(nnkDerefExpr, ident("self")), ident($f[0])), + ) + + # Always free the whole object at the end + body.add newCall(ident("deallocShared"), ident("self")) + + # Build the parameter: (self: ptr ) + let selfParam = newIdentDefs(ident("self"), newTree(nnkPtrTy, reqTypeName)) + + # Build the proc definition + result = newProc( + name = postfix(ident("ffiDeleteReq"), "*"), + params = @[newEmptyNode()] & @[selfParam], # ✅ properly wrapped in a sequence + body = body, ) proc buildProcessFFIRequestProc(reqTypeName, reqHandler, body: NimNode): NimNode = @@ -138,7 +189,7 @@ proc buildProcessFFIRequestProc(reqTypeName, reqHandler, body: NimNode): NimNode if procNode.kind == nnkStmtList and procNode.len == 1: procNode = procNode[0] if procNode.kind != nnkLambda: - error "registerFFI expects a lambda definition. Found: " & $procNode.kind + error "registerReqFFI expects a lambda definition. Found: " & $procNode.kind let typedescParam = newIdentDefs(ident("T"), nnkBracketExpr.newTree(ident("typedesc"), reqTypeName)) @@ -251,36 +302,46 @@ proc addNewRequestToRegistry(reqTypeName, reqHandler: NimNode): NimNode = result = newAssignment(newTree(nnkBracketExpr, ident("registeredRequests"), key), asyncProc) -macro registerFFI*(reqTypeName, reqHandler, body: untyped): untyped = +macro registerReqFFI*(reqTypeName, reqHandler, body: untyped): untyped = + ## Registers a request that will be handled by the ffi thread. + ## The request should be sent from the ffi consumer thread. + ## + + # Extract lambda params to generate fields + let fields = extractFieldsFromLambda(body) + + let typeDef = buildRequestType(reqTypeName, body) let ffiNewReqProc = buildFfiNewReqProc(reqTypeName, body) let processProc = buildProcessFFIRequestProc(reqTypeName, reqHandler, body) let addNewReqToReg = addNewRequestToRegistry(reqTypeName, reqHandler) - result = newStmtList(ffiNewReqProc, processProc, addNewReqToReg) + let deleteProc = buildFfiDeleteReqProc(reqTypeName, fields) + result = newStmtList(typeDef, ffiNewReqProc, deleteProc, processProc, addNewReqToReg) -macro ffiGenerateProcess*(): untyped = - ## Generates a case statement that handles all the possible registered FFI requests - let reqParam = ident"request" - let reqHandlerParam = ident"reqHandler" + echo "Registered FFI request: " & result.repr - var caseStmt = - newTree(nnkCaseStmt, newDotExpr(newTree(nnkBracketExpr, reqParam), ident"reqId")) +macro processReq*(reqType: typed, args: varargs[untyped]): untyped = + ## Expands T.processReq(a,b,...) into the sendRequest boilerplate. - caseStmt.add newTree( - nnkElse, - nnkStmtList.newTree( - newCall( - ident"nilProcess", newDotExpr(newTree(nnkBracketExpr, reqParam), ident"reqId") - ) - ), + # Collect the passed arguments as NimNodes + var callArgs = @[reqType, ident("callback"), ident("userData")] + for a in args: + callArgs.add a + + # Build: ffiNewReq(reqType, callback, userData, arg1, arg2, ...) + let newReqCall = newCall(ident("ffiNewReq"), callArgs) + + # Build: ffi_context.sendRequestToFFIThread(ctx, ) + let sendCall = newCall( + newDotExpr(ident("ffi_context"), ident("sendRequestToFFIThread")), + ident("ctx"), + newReqCall, ) - let retFutSym = ident"retFut" - result = quote: - proc process*[R]( - T: type FFIThreadRequest, - `reqParam`: ptr FFIThreadRequest, - `reqHandlerParam`: ptr R, - ) {.async.} = - let `retFutSym` = `caseStmt` - handleRes(await `retFutSym`, `reqParam`) + block: + let res = `sendCall` + if res.isErr: + let msg = "error in sendRequestToFFIThread: " & res.error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](msg.len), userData) + return RET_ERR + return RET_OK