diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index 699024b..5d10df4 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -7,7 +7,7 @@ import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, import ./ffi_types, ./ffi_thread_request, ./internal/ffi_macro, ./logging type FFIContext*[T] = object - myLib*: T + myLib*: ptr T # main library object (e.g., Waku, LibP2P, SDS, the one to be exposed as a library) ffiThread: Thread[(ptr FFIContext[T])] # represents the main FFI thread in charge of attending API consumer actions @@ -76,7 +76,7 @@ proc sendRequestToFFIThread*( ## Notice that in case of "ok", the deallocShared(req) is performed by the FFI Thread in the ## process proc. - ok() + return ok() type Foo = object registerReqFFI(WatchdogReq, foo: ptr Foo): @@ -98,6 +98,7 @@ proc onNotResponding*(ctx: ptr FFIContext) = proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = ## Watchdog thread that monitors the FFI thread and notifies the library user if it hangs. + ## This thread never blocks. let watchdogRun = proc(ctx: ptr FFIContext) {.async.} = const WatchdogStartDelay = 10.seconds @@ -126,12 +127,34 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = waitFor watchdogRun(ctx) +proc processRequest[T]( + request: ptr FFIThreadRequest, ctx: ptr FFIContext[T] +) {.async.} = + ## Invoked within the FFI thread to process a request coming from the FFI API consumer thread. + + let reqId = $request[].reqId + ## The reqId determines which proc will handle the request. + ## The registeredRequests represents a table defined at compile time. + ## Then, registeredRequests == Table[reqId, proc-handling-the-request-asynchronously] + + let retFut = + if not ctx[].registeredRequests[].contains(reqId): + ## That shouldn't happen because only registered requests should be sent to the FFI thread. + nilProcess(request[].reqId) + else: + ctx[].registeredRequests[][reqId](request[].reqContent, ctx) + handleRes(await retFut, request) + proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = - ## FFI thread that attends library user API requests + ## FFI thread body that attends library user API requests logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT) let ffiRun = proc(ctx: ptr FFIContext[T]) {.async.} = + var ffiReqHandler: T + ## Holds the main library object, i.e., in charge of handling the ffi requests. + ## e.g., Waku, LibP2P, SDS, etc. + while true: await ctx.reqSignal.wait() @@ -145,10 +168,10 @@ proc ffiThreadBody[T](ctx: ptr FFIContext[T]) {.thread.} = chronicles.error "ffi thread could not receive a request" continue + ctx.myLib = addr ffiReqHandler + ## Handle the request - asyncSpawn FFIThreadRequest.process( - request, addr ctx.myLib, ctx.registeredRequests - ) + asyncSpawn processRequest(request, ctx) let fireRes = ctx.reqReceivedSignal.fireSync() if fireRes.isErr(): diff --git a/ffi/ffi_thread_request.nim b/ffi/ffi_thread_request.nim index 8ad25de..4f3c119 100644 --- a/ffi/ffi_thread_request.nim +++ b/ffi/ffi_thread_request.nim @@ -4,12 +4,12 @@ import std/[json, macros], results, tables import chronos, chronos/threadsync -import ./ffi_types, ./internal/ffi_macro, ./alloc, ./ffi_context +import ./ffi_types, ./internal/ffi_macro, ./alloc type FFIThreadRequest* = object callback: FFICallBack userData: pointer - reqId: cstring + reqId*: cstring reqContent*: pointer proc init*( @@ -30,7 +30,7 @@ proc deleteRequest(request: ptr FFIThreadRequest) = deallocShared(request[].reqId) deallocShared(request) -proc handleRes[T: string | void]( +proc handleRes*[T: string | void]( res: Result[T, string], request: ptr FFIThreadRequest ) = ## Handles the Result responses, which can either be Result[string, string] or @@ -56,20 +56,6 @@ proc handleRes[T: string | void]( ) return -proc nilProcess(reqId: cstring): Future[Result[string, string]] {.async.} = +proc nilProcess*(reqId: cstring): Future[Result[string, string]] {.async.} = return err("This request type is not implemented: " & $reqId) -proc process*[R]( - T: type FFIThreadRequest, - request: ptr FFIThreadRequest, - reqHandler: ptr R, - registeredRequests: ptr Table[cstring, FFIRequestProc], -) {.async.} = - let reqId = $request[].reqId - - let retFut = - if not registeredRequests[].contains(reqId): - nilProcess(request[].reqId) - else: - registeredRequests[][reqId](request[].reqContent, reqHandler) - handleRes(await retFut, request) diff --git a/ffi/ffi_types.nim b/ffi/ffi_types.nim index 3688ddb..76ead50 100644 --- a/ffi/ffi_types.nim +++ b/ffi/ffi_types.nim @@ -30,9 +30,9 @@ template foreignThreadGc*(body: untyped) = when declared(tearDownForeignThreadGc): tearDownForeignThreadGc() -type onDone* = proc() - -## Registered requests table populated at compile time and never updated at run time +## Registered requests table populated at compile time and never updated at run time. +## The key represents the request type name as cstring, e.g., "CreateNodeRequest". +## The value is a proc that handles the request asynchronously. var registeredRequests*: Table[cstring, FFIRequestProc] ### End of FFI utils diff --git a/ffi/internal/ffi_macro.nim b/ffi/internal/ffi_macro.nim index 88fc75b..0aa7a86 100644 --- a/ffi/internal/ffi_macro.nim +++ b/ffi/internal/ffi_macro.nim @@ -3,7 +3,18 @@ import chronos import ../ffi_types proc extractFieldsFromLambda(body: NimNode): seq[NimNode] = - ## Extracts the fields (params) from the given lambda body. + ## Extracts the fields (params) from the given lambda body, when using the registerReqFFI macro. + ## e.g., for: + ## registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[Waku]): + ## proc( + ## configJson: cstring, appCallbacks: AppCallbacks + ## ): Future[Result[string, string]] {.async.} = + ## ... + ## The extracted fields will be: + ## - configJson: cstring + ## - appCallbacks: AppCallbacks + ## + var procNode = body if procNode.kind == nnkStmtList and procNode.len == 1: procNode = procNode[0] @@ -15,11 +26,19 @@ proc extractFieldsFromLambda(body: NimNode): seq[NimNode] = for p in params[1 .. ^1]: # skip return type result.add newIdentDefs(p[0], p[1]) + when defined(ffiDumpMacros): + echo result.repr + proc buildRequestType(reqTypeName: NimNode, body: NimNode): NimNode = ## Builds: ## type * = object ## : ## ... + ## e.g.: + ## type CreateNodeRequest* = object + ## configJson: cstring + ## appCallbacks: AppCallbacks + ## var procNode = body if procNode.kind == nnkStmtList and procNode.len == 1: @@ -51,7 +70,28 @@ proc buildRequestType(reqTypeName: NimNode, body: NimNode): NimNode = result = newNimNode(nnkTypeSection).add(newTree(nnkTypeDef, typeName, newEmptyNode(), objTy)) + when defined(ffiDumpMacros): + echo result.repr + proc buildFfiNewReqProc(reqTypeName, body: NimNode): NimNode = + ## Builds the ffiNewProc in charge of creating the FFIThreadRequest in shared memory. + ## Then, a pointer to this request will be sent to the FFI thread for processing. + ## e.g.: + ## proc ffiNewReq*(T: typedesc[CreateNodeRequest]; callback: FFICallBack; + ## userData: pointer; configJson: cstring; + ## appCallbacks: AppCallbacks): ptr FFIThreadRequest = + ## var reqObj = createShared(T) + ## reqObj[].configJson = configJson.alloc() + ## reqObj[].appCallbacks = appCallbacks + ## let typeStr`gensym2866 = $T + ## var ret`gensym2866 = FFIThreadRequest.init(callback, userData, + ## typeStr`gensym2866.cstring, reqObj) + ## return ret`gensym2866 + ## + ## This should be invoked by the ffi consumer thread (generally, main thread.) + ## Notice that the shared memory allocated by the main thread is freed by the FFI thread + ## after processing the request. + var formalParams = newSeq[NimNode]() var procNode: NimNode @@ -135,6 +175,9 @@ proc buildFfiNewReqProc(reqTypeName, body: NimNode): NimNode = pragmas = newEmptyNode(), ) + when defined(ffiDumpMacros): + echo result.repr + proc buildFfiDeleteReqProc(reqTypeName: NimNode, fields: seq[NimNode]): NimNode = ## Generates: ## proc ffiDeleteReq(self: ptr ) = @@ -163,15 +206,15 @@ proc buildFfiDeleteReqProc(reqTypeName: NimNode, fields: seq[NimNode]): NimNode body = body, ) + when defined(ffiDumpMacros): + echo result.repr + proc buildProcessFFIRequestProc(reqTypeName, reqHandler, body: NimNode): NimNode = - ## Builds: + ## Builds, f.e.: ## proc processFFIRequest(T: typedesc[CreateNodeRequest]; ## configJson: cstring; ## appCallbacks: AppCallbacks; - ## waku: ptr Waku) ... - - ## Builds: - ## proc processFFIRequest*(request: pointer; waku: ptr Waku) ... + ## ctx: ptr FFIContext[Waku]) ... if reqHandler.kind != nnkExprColonExpr: error( @@ -237,10 +280,13 @@ proc buildProcessFFIRequestProc(reqTypeName, reqHandler, body: NimNode): NimNode newEmptyNode(), ) + when defined(ffiDumpMacros): + echo result.repr + proc addNewRequestToRegistry(reqTypeName, reqHandler: NimNode): NimNode = ## Adds a new request to the registeredRequests table. - ## The key is the hash of the request type name, and the value is the NimNode - ## representing the request type. + ## The key is a representation of the request, e.g. "CreateNodeReq". + ## The value is a proc definition in charge of handling the request from FFI thread. # Build: request[].reqContent let reqContent = @@ -294,16 +340,44 @@ proc addNewRequestToRegistry(reqTypeName, reqHandler: NimNode): NimNode = let reqTypeNameStr = $reqTypeName - # Use a string literal instead of reqTypeNameStr let key = newLit($reqTypeName) # Generate: registeredRequests["CreateNodeRequest"] = result = newAssignment(newTree(nnkBracketExpr, ident("registeredRequests"), key), asyncProc) + when defined(ffiDumpMacros): + echo result.repr + macro registerReqFFI*(reqTypeName, reqHandler, body: untyped): untyped = - ## Registers a request that will be handled by the ffi thread. + ## Registers a request that will be handled by the FFI/working thread. ## The request should be sent from the ffi consumer thread. - ## + ## + ## e.g.: + ## In this example, we register a CreateNodeRequest that will be handled by a proc that contains + ## the provided lambda body and parameters, by the FFI/working thread. + ## + ## The lambda passed to this macro must: + ## - only have no-GC'ed types. + ## - Return Future[Result[string, string]] and be annotated with {.async.} + ## And notice that the returned values will be sent back to the ffi consumer thread. + ## + ## registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[Waku]): + ## proc( + ## configJson: cstring, appCallbacks: AppCallbacks + ## ): Future[Result[string, string]] {.async.} = + ## ctx.myLib[] = (await createWaku(configJson, cast[AppCallbacks](appCallbacks))).valueOr: + ## return err($error) + ## return ok("") + ## + ## On the other hand, the created FFI request should be dispatched from the ffi consumer thread + ## (generally, the main thread) following something like: + ## + ## ffi.sendRequestToFFIThread( + ## ctx, CreateNodeRequest.ffiNewReq(callback, userData, configJson, appCallbacks) + ## ).isOkOr: + ## ... + ## ... + ## # Extract lambda params to generate fields let fields = extractFieldsFromLambda(body) @@ -315,10 +389,17 @@ macro registerReqFFI*(reqTypeName, reqHandler, body: untyped): untyped = let deleteProc = buildFfiDeleteReqProc(reqTypeName, fields) result = newStmtList(typeDef, ffiNewReqProc, deleteProc, processProc, addNewReqToReg) + when defined(ffiDumpMacros): + echo result.repr + macro processReq*( reqType, ctx, callback, userData: untyped, args: varargs[untyped] ): untyped = ## Expands T.processReq(ctx, callback, userData, a, b, ...) + ## e.g.: + ## waku_dial_peerReq.processReq(ctx, callback, userData, peerMultiAddr, protocol, timeoutMs) + ## + var callArgs = @[reqType, callback, userData] for a in args: callArgs.add a @@ -338,7 +419,52 @@ macro processReq*( return RET_ERR return RET_OK + when defined(ffiDumpMacros): + echo result.repr + macro ffi*(prc: untyped): untyped = + ## Defines an FFI-exported proc that registers a request handler to be executed + ## asynchronously in the FFI thread. + ## + ## {.ffi.} implicitly implies: ...Return[Future[Result[string, string]] {.async.} + ## + ## When using {.ffi.}, the first three parameters must be: + ## - ctx: ptr FFIContext[T] <-- T is the type that handles the FFI requests + ## - callback: FFICallBack + ## - userData: pointer + ## Then, additional parameters may be defined as needed, after these first three, always + ## considering that only no-GC'ed (or C-like) types are allowed. + ## + ## e.g.: + ## proc waku_version( + ## ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ## ) {.ffi.} = + ## return ok(WakuNodeVersionString) + ## + ## e.g2.: + ## proc waku_start( + ## ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer + ## ) {.ffi.} = + ## (await startWaku(ctx[].myLib)).isOkOr: + ## error "START_NODE failed", error = error + ## return err("failed to start: " & $error) + ## return ok("") + ## + ## e.g3.: + ## proc waku_peer_exchange_request( + ## ctx: ptr FFIContext[Waku], + ## callback: FFICallBack, + ## userData: pointer, + ## numPeers: uint64, + ## ) {.ffi.} = + ## let numValidPeers = (await performPeerExchangeRequestTo(numPeers, ctx.myLib[])).valueOr: + ## error "waku_peer_exchange_request failed", error = error + ## return err("failed peer exchange: " & $error) + ## return ok($numValidPeers) + ## + ## In these examples, notice that ctx.myLib is of type "ptr Waku", being Waku main library type. + ## + let procName = prc[0] let formalParams = prc[3] let bodyNode = prc[^1] @@ -414,3 +540,6 @@ macro ffi*(prc: untyped): untyped = `anonymousProcNode` result = newStmtList(registerReq, ffiProc) + + when defined(ffiDumpMacros): + echo result.repr