diff --git a/ffi.nim b/ffi.nim index a9c983c..50d8326 100644 --- a/ffi.nim +++ b/ffi.nim @@ -1,3 +1,5 @@ -import ffi/alloc +import + ffi/[alloc, ffi_types, ffi_context, ffi_thread_request], + ffi/internal/[ffi_library, ffi_macro] -export alloc +export alloc, ffi_library, ffi_macro, ffi_types, ffi_context, ffi_thread_request diff --git a/ffi/ffi_context.nim b/ffi/ffi_context.nim index e09308d..8af9820 100644 --- a/ffi/ffi_context.nim +++ b/ffi/ffi_context.nim @@ -4,14 +4,13 @@ import std/[options, atomics, os, net, locks] import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results -import ./ffi_types +import ./ffi_types, ./ffi_thread_request type FFIContext* = object - libraryName: cstring ffiThread: Thread[(ptr FFIContext)] - # represents the main thread in charge of attending SDK consumer actions + # represents the main FFI thread in charge of attending API consumer actions watchdogThread: Thread[(ptr FFIContext)] - # monitors the FFI thread and notifies the FFI SDK consumer if it hangs + # monitors the FFI thread and notifies the FFI API consumer if it hangs lock: Lock reqChannel: ChannelSPSCSingle[ptr FFIThreadRequest] reqSignal: ThreadSignalPtr # to notify the FFI Thread that a new request is sent @@ -43,13 +42,8 @@ template callEventCallback(ctx: ptr FFIContext, eventName: string, body: untyped RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData ) -proc sendRequestToWakuThread*( - ctx: ptr FFIContext, - reqType: RequestType, - reqContent: pointer, - callback: FFICallBack, - userData: pointer, - timeout = InfiniteDuration, +proc sendRequestToFFIThread*( + ctx: ptr FFIContext, ffiRequest: ptr FFIThreadRequest, timeout = InfiniteDuration ): Result[void, string] = ctx.lock.acquire() # This lock is only necessary while we use a SP Channel and while the signalling @@ -59,26 +53,21 @@ proc sendRequestToWakuThread*( defer: ctx.lock.release() - let req = FFIThreadRequest.createShared(reqType, reqContent, callback, userData) ## Sending the request - let sentOk = ctx.reqChannel.trySend(req) + let sentOk = ctx.reqChannel.trySend(ffiRequest) if not sentOk: - deallocShared(req) - return err("Couldn't send a request to the waku thread: " & $req[]) + return err("Couldn't send a request to the ffi thread") let fireSyncRes = ctx.reqSignal.fireSync() if fireSyncRes.isErr(): - deallocShared(req) return err("failed fireSync: " & $fireSyncRes.error) if fireSyncRes.get() == false: - deallocShared(req) return err("Couldn't fireSync in time") ## wait until the Waku Thread properly received the request let res = ctx.reqReceivedSignal.waitSync(timeout) if res.isErr(): - deallocShared(req) return err("Couldn't receive reqReceivedSignal signal") ## Notice that in case of "ok", the deallocShared(req) is performed by the Waku Thread in the @@ -110,24 +99,24 @@ proc watchdogThreadBody(ctx: ptr FFIContext) {.thread.} = trace "Sending watchdog request to FFI thread" - sendRequestToWakuThread( - ctx, - RequestType.DEBUG, - DebugNodeRequest.createShared(DebugNodeMsgType.CHECK_WAKU_NOT_BLOCKED), - wakuCallback, - nilUserData, - WakuNotRespondingTimeout, - ).isOkOr: - error "Failed to send watchdog request to FFI thread", error = $error - onWakuNotResponding(ctx) + # sendRequestToFFIThread( + # ctx, + # 0, + # DebugNodeRequest.createShared(DebugNodeMsgType.CHECK_WAKU_NOT_BLOCKED), + # wakuCallback, + # nilUserData, + # WakuNotRespondingTimeout, + # ).isOkOr: + # error "Failed to send watchdog request to FFI thread", error = $error + # onWakuNotResponding(ctx) waitFor watchdogRun(ctx) -proc ffiThreadBody(ctx: ptr FFIContext) {.thread.} = +proc ffiThreadBody[TT](ctx: ptr FFIContext) {.thread.} = ## FFI thread that attends library user API requests let ffiRun = proc(ctx: ptr FFIContext) {.async.} = - var waku: Waku + var ffiHandler: TT while true: await ctx.reqSignal.wait() @@ -138,11 +127,11 @@ proc ffiThreadBody(ctx: ptr FFIContext) {.thread.} = var request: ptr FFIThreadRequest let recvOk = ctx.reqChannel.tryRecv(request) if not recvOk: - error "ffi thread could not receive a request" + chronicles.error "ffi thread could not receive a request" continue ## Handle the request - asyncSpawn FFIThreadRequest.process(request, addr waku) + asyncSpawn FFIThreadRequest.process(request, addr ffiHandler) let fireRes = ctx.reqReceivedSignal.fireSync() if fireRes.isErr(): @@ -150,7 +139,7 @@ proc ffiThreadBody(ctx: ptr FFIContext) {.thread.} = waitFor ffiRun(ctx) -proc createFFIContext*(libraryName: cstring): Result[ptr FFIContext, string] = +proc createFFIContext*[T](tt: typedesc[T]): Result[ptr FFIContext, string] = ## This proc is called from the main thread and it creates ## the FFI working thread. var ctx = createShared(FFIContext, 1) @@ -163,7 +152,7 @@ proc createFFIContext*(libraryName: cstring): Result[ptr FFIContext, string] = ctx.running.store(true) try: - createThread(ctx.ffiThread, ffiThreadBody, ctx) + createThread(ctx.ffiThread, ffiThreadBody[tt], ctx) except ValueError, ResourceExhaustedError: freeShared(ctx) return err("failed to create the Waku thread: " & getCurrentExceptionMsg()) @@ -176,7 +165,7 @@ proc createFFIContext*(libraryName: cstring): Result[ptr FFIContext, string] = return ok(ctx) -proc destroyFFIContext*(ctx: ptr FFIContext): Result[void, string] = +proc destroyFFIContext*[T](ctx: ptr FFIContext): Result[void, string] = ctx.running.store(false) let signaledOnTime = ctx.reqSignal.fireSync().valueOr: @@ -189,7 +178,6 @@ proc destroyFFIContext*(ctx: ptr FFIContext): Result[void, string] = ctx.lock.deinitLock() ?ctx.reqSignal.close() ?ctx.reqReceivedSignal.close() - deallocShared(ctx.libraryName) freeShared(ctx) return ok() diff --git a/ffi/ffi_thread_request.nim b/ffi/ffi_thread_request.nim new file mode 100644 index 0000000..0bc9e91 --- /dev/null +++ b/ffi/ffi_thread_request.nim @@ -0,0 +1,83 @@ +## This file contains the base message request type that will be handled. +## The requests are created by the main thread and processed by +## the Waku Thread. + +import std/macros +import std/json, results +import chronos, chronos/threadsync +import ./ffi_types, ./internal/ffi_macro +import waku/factory/waku +import ../../../library/waku_thread_requests/requests/peer_manager_request +# type FFIRequestProcessor* = +# concept +# proc process[R]( +# T: type FFIThreadRequest, request: ptr FFIThreadRequest, reqHandler: ptr R +# ) + +type FFIThreadRequest* = object + callback: FFICallBack + userData: pointer + reqId: uint + reqContent*: pointer + +proc init*( + T: typedesc[FFIThreadRequest], + callback: FFICallBack, + userData: pointer, + reqId: uint, + reqContent: pointer, +): ptr type T = + var ret = createShared(FFIThreadRequest) + ret[].callback = callback + ret[].userData = userData + ret[].reqId = reqId + ret[].reqContent = reqContent + return ret + +proc deleteRequest(request: ptr FFIThreadRequest) = + deallocShared(request) + +proc handleRes[T: string | void]( + res: Result[T, string], request: ptr FFIThreadRequest +) = + ## Handles the Result responses, which can either be Result[string, string] or + ## Result[void, string]. + + defer: + deleteRequest(request) + + if res.isErr(): + foreignThreadGc: + let msg = "libwaku error: handleRes fireSyncRes error: " & $res.error + request[].callback( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + + foreignThreadGc: + var msg: cstring = "" + when T is string: + msg = res.get().cstring() + request[].callback( + RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + +proc nilProcess(reqId: uint): Future[Result[string, string]] {.async.} = + return err("This request type is not implemented: " & $reqId) + +ffiGenerateProcess() + +# dumpAstGen: +# proc process*[R]( +# T: type FFIThreadRequest, request: ptr FFIThreadRequest, reqHandler: ptr R +# ) {.async.} = +# # reqHandler represents the object that actually processes the request. +# let retFut = +# case request[].reqId +# of 1: +# cast[ptr PeerManagementRequest](request[].reqContent).process(reqHandler) +# else: +# nilProcess(request[].reqId) + +# handleRes(await retFut, request) diff --git a/ffi/internal/ffi_library.nim b/ffi/internal/ffi_library.nim index 04d1dee..0273018 100644 --- a/ffi/internal/ffi_library.nim +++ b/ffi/internal/ffi_library.nim @@ -1,51 +1,82 @@ -import std/macros, strformat +import std/[macros, atomics], strformat, chronicles, chronos macro declareLibrary*(libraryName: static[string]): untyped = - result = newStmtList() + var res = newStmtList() - {.pragma: exported, exportc, cdecl, raises: [].} - {.pragma: callback, cdecl, raises: [], gcsafe.} - {.passc: "-fPIC".} + ## Generate {.pragma: exported, exportc, cdecl, raises: [].} + res.add nnkPragma.newTree( + nnkExprColonExpr.newTree(ident"pragma", ident"exported"), + ident"exportc", + ident"cdecl", + nnkExprColonExpr.newTree(ident"raises", nnkBracket.newTree()), + ) + + ## Generate {.pragma: callback, cdecl, raises: [], gcsafe.} + res.add nnkPragma.newTree( + nnkExprColonExpr.newTree(ident"pragma", ident"callback"), + ident"cdecl", + nnkExprColonExpr.newTree(ident"raises", nnkBracket.newTree()), + ident"gcsafe", + ) + + ## Generate {.passc: "-fPIC".} + res.add nnkPragma.newTree(nnkExprColonExpr.newTree(ident"passc", newLit("-fPIC"))) when defined(linux): - ## Generates {.passl: "-Wl,-soname,libwaku.so".} + ## Generates {.passl: "-Wl,-soname,libwaku.so".} (considering libraryName=="waku", for example) let soName = fmt"-Wl,-soname,lib{libraryName}.so" - result.add(nnkPragmaStmt.newTree(ident"passl", newStrLitNode(soName))) + res.add( + newNimNode(nnkPragma).add( + nnkExprColonExpr.newTree(ident"passl", newStrLitNode(soName)) + ) + ) ## proc lib{libraryName}NimMain() {.importc.} - let procName = ident(fmt"lib{libraryName}NimMain") + let libNimMainName = ident(fmt"lib{libraryName}NimMain") let importcPragma = nnkPragma.newTree(ident"importc") let procDef = newProc( - name = procName, - params = @[], + name = libNimMainName, + params = @[ident"void"], pragmas = importcPragma, body = newEmptyNode(), - returnType = newEmptyNode(), # no return value ) - result.add(procDef) + res.add(procDef) -################################################################################ -### Library setup + # Create: var initialized: Atomic[bool] + let atomicType = nnkBracketExpr.newTree(ident("Atomic"), ident("bool")) + let varStmt = nnkVarSection.newTree( + nnkIdentDefs.newTree(ident("initialized"), atomicType, newEmptyNode()) + ) + res.add(varStmt) -# To control when the library has been initialized -var initialized: Atomic[bool] + ## Android chronicles redirection + let chroniclesBlock = quote: + when defined(android) and compiles(defaultChroniclesStream.outputs[0].writer): + defaultChroniclesStream.outputs[0].writer = proc( + logLevel: LogLevel, msg: LogOutputStr + ) {.raises: [].} = + echo logLevel, msg + result.add(chroniclesBlock) -if defined(android): - # Redirect chronicles to Android System logs - when compiles(defaultChroniclesStream.outputs[0].writer): - defaultChroniclesStream.outputs[0].writer = proc( - logLevel: LogLevel, msg: LogOutputStr - ) {.raises: [].} = - echo logLevel, msg + let procName = ident("initializeLibrary") + let nimMainName = ident("lib" & libraryName & "NimMain") -proc initializeLibrary() {.exported.} = - if not initialized.exchange(true): - ## Every Nim library needs to call `NimMain` once exactly, to initialize the Nim runtime. - ## Being `` the value given in the optional compilation flag --nimMainPrefix:yourprefix - libwakuNimMain() - when declared(setupForeignThreadGc): - setupForeignThreadGc() - when declared(nimGC_setStackBottom): - var locals {.volatile, noinit.}: pointer - locals = addr(locals) - nimGC_setStackBottom(locals) + let initializeLibraryProc = quote: + proc `procName`() {.exported.} = + if not initialized.exchange(true): + ## Every Nim library needs to call `NimMain` once exactly, + ## to initialize the Nim runtime. + ## Being `` the value given in the optional + ## compilation flag --nimMainPrefix:yourprefix + `nimMainName`() + when declared(setupForeignThreadGc): + setupForeignThreadGc() + when declared(nimGC_setStackBottom): + var locals {.volatile, noinit.}: pointer + locals = addr(locals) + nimGC_setStackBottom(locals) + + res.add(initializeLibraryProc) + + echo result.repr + return res diff --git a/ffi/internal/ffi_macro.nim b/ffi/internal/ffi_macro.nim index ad532d5..e01fbcd 100644 --- a/ffi/internal/ffi_macro.nim +++ b/ffi/internal/ffi_macro.nim @@ -1,51 +1,258 @@ -import std/macros - +import std/[macros, tables] import chronos +import ../ffi_thread_request, ../ffi_types -macro ffi*(p: typed, args: varargs[untyped]): untyped = - ## p: the proc definition AST - ## args: expected to be (RequestTypeValue, MsgTypeValue) +var registeredRequests {.compileTime.}: Table[uint, NimNode] - if p.kind != nnkProcDef: - error("ffi pragma can only be applied to proc definitions", p) - - if args.len != 2: - error( - "ffi pragma requires exactly two arguments: (RequestTypeValue, MsgTypeValue)", p - ) - - let reqType = args[0] - let msgType = args[1] - - let origProc = p - let origName = p.name - let exportedName = origName - - result = newStmtList() - result.add(origProc) - - # Build exported wrapper proc: - let wrapperParams = newSeq[NimNode]() - wrapperParams.add(newIdentDefs(ident("ctx"), newPtrType(ident("WakuContext")))) - wrapperParams.add(newIdentDefs(ident("callback"), ident("WakuCallBack"))) - wrapperParams.add(newIdentDefs(ident("userData"), ident("pointer"))) - - let wrapperPragmas = nnkPragma.newTree(ident("dynlib"), ident("exportc")) +proc fnv1aHash32*(s: string): uint = + const + FNV_offset_basis: uint = 2166136261'u32.uint + FNV_prime: uint = 16777619'u32.uint + var hash: uint = FNV_offset_basis + for c in s: + hash = hash xor uint(ord(c)) + hash = hash * FNV_prime + return hash +proc generateProc(p: NimNode, args: varargs[NimNode]): NimNode = + let wrapperPragmas = + nnkPragma.newTree(ident("dynlib"), ident("exportc"), ident("cdecl")) + let origBody = p.body let wrapperBody = quote: initializeLibrary() - checkLibwakuParams(ctx, callback, userData) - handleRequest( - ctx, `reqType`, PeerManagementRequest.createShared(`msgType`), callback, userData - ) - return 0.cint + `origBody` + let origParams = p.params let wrapperProc = newProc( - name = exportedName, - params = wrapperParams, - pragmas = wrapperPragmas, - body = wrapperBody, - returnType = ident("cint"), + name = p.name, params = @[origParams], pragmas = wrapperPragmas, body = wrapperBody ) - result.add(wrapperProc) + var res = newStmtList() + res.add(wrapperProc) + return res + +proc buildFfiNewReqProc(reqTypeName, body: NimNode): NimNode = + # Standard FFI params + var formalParams = newSeq[NimNode]() + + var procNode: NimNode + if body.kind == nnkStmtList and body.len == 1: + procNode = body[0] # unwrap single statement + else: + procNode = body + + if procNode.kind != nnkLambda: + error "registerFFI expects a lambda definition. Found: " & $procNode.kind + + # T: typedesc[CreateNodeRequest] + let typedescParam = newIdentDefs( + ident("T"), # param name + nnkBracketExpr.newTree(ident("typedesc"), reqTypeName), # typedesc[T] + ) + formalParams.add(typedescParam) + + # Other fixed FFI params + formalParams.add(newIdentDefs(ident("callback"), ident("FFICallBack"))) + formalParams.add(newIdentDefs(ident("userData"), ident("pointer"))) + + # Add original lambda params + let procParams = procNode[3] + for p in procParams[1 .. ^1]: + formalParams.add(p) + + # Build `ptr FFIThreadRequest` + let retType = newNimNode(nnkPtrTy) + retType.add(ident("FFIThreadRequest")) + + formalParams = @[retType] & formalParams + + # Build body + let reqObjIdent = ident("reqObj") + var newBody = newStmtList() + newBody.add( + quote do: + var `reqObjIdent` = createShared(T) + ) + + for p in procParams[1 .. ^1]: + let fieldNameIdent = ident($p[0]) + let fieldTypeNode = p[1] + + # Extract type name as string + var typeStr: string + if fieldTypeNode.kind == nnkIdent: + typeStr = $fieldTypeNode + elif fieldTypeNode.kind == nnkBracketExpr: + typeStr = $fieldTypeNode[0] # e.g., `ptr` in `ptr[Waku]` + else: + typeStr = "" # fallback + + # Apply .alloc() only to cstrings + if typeStr == "cstring": + newBody.add( + quote do: + `reqObjIdent`[].`fieldNameIdent` = `fieldNameIdent`.alloc() + ) + else: + newBody.add( + quote do: + `reqObjIdent`[].`fieldNameIdent` = `fieldNameIdent` + ) + + # FFIThreadRequest.init using fnv1aHash32 + newBody.add( + quote do: + let typeStr = $T + var ret = + FFIThreadRequest.init(callback, userData, fnv1aHash32(typeStr), `reqObjIdent`) + return ret + ) + + # Build the proc node + result = newProc( + name = postfix(ident("ffiNewReq"), "*"), + params = formalParams, + body = newBody, + pragmas = newEmptyNode(), + ) + + echo result.repr + +proc buildFfiDeleteReqProc(reqTypeName: NimNode): NimNode = + ## Generates something like: + ## proc ffiDeleteReq(self: ptr CreateNodeRequest) = + ## deallocShared(self[].configJson) + ## deallocShared(self) + + result = newProc( + name = ident("ffiDeleteReq"), + params = + @[ + newEmptyNode(), # return type is empty (void) + newIdentDefs(ident("self"), nnkPtrTy.newTree(reqTypeName)), + ], + body = newStmtList( + nnkCall.newTree( + ident("deallocShared"), + nnkBracketExpr.newTree( + nnkBracketExpr.newTree(ident("self"), newEmptyNode()), # self[] + ident("configJson"), + ), + ), + nnkCall.newTree(ident("deallocShared"), ident("self")), + ), + procType = nnkProcDef, + ) + +proc buildProcessFFIRequestProc(reqTypeName, reqHandler, body: NimNode): NimNode = + ## Builds: + ## proc processFFIRequest(T: typedesc[CreateNodeRequest]; + ## configJson: cstring; + ## appCallbacks: AppCallbacks; + ## waku: ptr Waku) ... + + # Require: ident: ptr SomeType + if reqHandler.kind != nnkExprColonExpr: + error( + "Second argument must be a typed parameter, e.g., waku: ptr Waku. Found: " & + $reqHandler.kind + ) + + let rhs = reqHandler[1] + if rhs.kind != nnkPtrTy: + error("Second argument must be a pointer type, e.g., waku: ptr Waku") + + var procNode = body + if procNode.kind == nnkStmtList and procNode.len == 1: + procNode = procNode[0] + + if procNode.kind != nnkLambda: + error "registerFFI expects a lambda definition. Found: " & $procNode.kind + + var formalParams = newSeq[NimNode]() + + # First param: T: typedesc[reqTypeName] + let typedescParam = + newIdentDefs(ident("T"), nnkBracketExpr.newTree(ident("typedesc"), reqTypeName)) + formalParams.add(typedescParam) + + # Add original lambda params + let procParams = procNode[3] + for p in procParams[1 .. ^1]: + formalParams.add(p) + + # Add pointer handler param at the end + formalParams.add( + newIdentDefs(reqHandler[0], rhs) # e.g., waku: ptr Waku + ) + + # Return type first + formalParams = @[procParams[0]] & formalParams + + # Pragmas + let pragmas = + if procNode.len >= 5: + procNode[4] + else: + newEmptyNode() + + # Body + let bodyNode = + if procNode.body.kind == nnkStmtList: + procNode.body + else: + newStmtList(procNode.body) + + # Build proc + result = newProc( + name = postfix(ident("processFFIRequest"), "*"), + params = formalParams, + body = bodyNode, + procType = nnkProcDef, + pragmas = pragmas, + ) + echo result.repr + +macro registerFFI*(reqTypeName, reqHandler, body: untyped): untyped = + let ffiNewReqProc = buildFfiNewReqProc(reqTypeName, body) + let processProc = buildProcessFFIRequestProc(reqTypeName, reqHandler, body) + result = newStmtList(ffiNewReqProc, processProc) + +macro ffiGenerateProcess*(): untyped = + ## Generates a case statement that handles all the possible registered FFI requests + let reqParam = ident"request" + let reqHandlerParam = ident"reqHandler" + + var caseStmt = + newTree(nnkCaseStmt, newDotExpr(newTree(nnkBracketExpr, reqParam), ident"reqId")) + + for caseKey, typeIdent in registeredRequests.pairs: + let castExpr = newTree( + nnkCast, + newTree(nnkPtrTy, typeIdent), + newDotExpr(newTree(nnkBracketExpr, reqParam), ident"reqContent"), + ) + let callExpr = newCall(ident"process", castExpr, reqHandlerParam) + caseStmt.add newTree(nnkOfBranch, newLit(caseKey), nnkStmtList.newTree(callExpr)) + + caseStmt.add newTree( + nnkElse, + nnkStmtList.newTree( + newCall( + ident"nilProcess", newDotExpr(newTree(nnkBracketExpr, reqParam), ident"reqId") + ) + ), + ) + + let retFutSym = ident"retFut" + + result = quote: + proc process*[R]( + T: type FFIThreadRequest, + `reqParam`: ptr FFIThreadRequest, + `reqHandlerParam`: ptr R, + ) {.async.} = + let `retFutSym` = `caseStmt` + handleRes(await `retFutSym`, `reqParam`) + + echo result.repr diff --git a/ffi/thread_requests/ffi_lifecycle_request.nim b/ffi/thread_requests/ffi_lifecycle_request.nim deleted file mode 100644 index fc6d5de..0000000 --- a/ffi/thread_requests/ffi_lifecycle_request.nim +++ /dev/null @@ -1,18 +0,0 @@ -type LifeCycleRequest {.ffi.} = object - discard - -type PeerManagerRequest {.ffi.} = object - discard - -type PeerManagerRequest* = object - reqType: 12123491 ## random int - reqContent: pointer - callback: WakuCallBack - userData: pointer - -## createShared -## process -## deallocShared -## -## -## \ No newline at end of file diff --git a/ffi/thread_requests/ffi_thread_request.nim b/ffi/thread_requests/ffi_thread_request.nim deleted file mode 100644 index bdaaa22..0000000 --- a/ffi/thread_requests/ffi_thread_request.nim +++ /dev/null @@ -1,104 +0,0 @@ -## This file contains the base message request type that will be handled. -## The requests are created by the main thread and processed by -## the Waku Thread. - -import std/json, results -import chronos, chronos/threadsync -import - ../../waku/factory/waku, - ../ffi_types, - ./requests/node_lifecycle_request, - ./requests/peer_manager_request, - ./requests/protocols/relay_request, - ./requests/protocols/store_request, - ./requests/protocols/lightpush_request, - ./requests/protocols/filter_request, - ./requests/debug_node_request, - ./requests/discovery_request, - ./requests/ping_request - -type RequestType* {.pure.} = enum - LIFECYCLE - PEER_MANAGER - PING - RELAY - STORE - DEBUG - DISCOVERY - LIGHTPUSH - FILTER - -type FFIThreadRequest* = object - reqType: RequestType - reqContent: pointer - callback: FFICallBack - userData: pointer - -proc createShared*( - T: type FFIThreadRequest, - reqType: RequestType, - reqContent: pointer, - callback: FFICallBack, - userData: pointer, -): ptr type T = - var ret = createShared(T) - ret[].reqType = reqType - ret[].reqContent = reqContent - ret[].callback = callback - ret[].userData = userData - return ret - -proc handleRes[T: string | void]( - res: Result[T, string], request: ptr FFIThreadRequest -) = - ## Handles the Result responses, which can either be Result[string, string] or - ## Result[void, string]. - - defer: - deallocShared(request) - - if res.isErr(): - foreignThreadGc: - let msg = "libwaku error: handleRes fireSyncRes error: " & $res.error - request[].callback( - RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData - ) - return - - foreignThreadGc: - var msg: cstring = "" - when T is string: - msg = res.get().cstring() - request[].callback( - RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData - ) - return - -proc process*( - T: type FFIThreadRequest, request: ptr FFIThreadRequest, waku: ptr Waku -) {.async.} = - let retFut = - case request[].reqType - of LIFECYCLE: - cast[ptr NodeLifecycleRequest](request[].reqContent).process(waku) - of PEER_MANAGER: - cast[ptr PeerManagementRequest](request[].reqContent).process(waku[]) - of PING: - cast[ptr PingRequest](request[].reqContent).process(waku) - of RELAY: - cast[ptr RelayRequest](request[].reqContent).process(waku) - of STORE: - cast[ptr StoreRequest](request[].reqContent).process(waku) - of DEBUG: - cast[ptr DebugNodeRequest](request[].reqContent).process(waku[]) - of DISCOVERY: - cast[ptr DiscoveryRequest](request[].reqContent).process(waku) - of LIGHTPUSH: - cast[ptr LightpushRequest](request[].reqContent).process(waku) - of FILTER: - cast[ptr FilterRequest](request[].reqContent).process(waku) - - handleRes(await retFut, request) - -proc `$`*(self: FFIThreadRequest): string = - return $self.reqType