diff --git a/examples/cbindings/waku_example.c b/examples/cbindings/waku_example.c index 39ee89f27..d6a0cbc7d 100644 --- a/examples/cbindings/waku_example.c +++ b/examples/cbindings/waku_example.c @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -13,6 +14,22 @@ #include "base64.h" #include "../../library/libwaku.h" + +// Shared synchronization variables +pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t cond = PTHREAD_COND_INITIALIZER; +int callback_executed = 0; + +void waitForCallback() { + pthread_mutex_lock(&mutex); + while (!callback_executed) { + pthread_cond_wait(&cond, &mutex); + } + callback_executed = 0; + pthread_mutex_unlock(&mutex); +} + + #define WAKU_CALL(call) \ do { \ int ret = call; \ @@ -20,6 +37,7 @@ do { \ printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \ exit(1); \ } \ + waitForCallback(); \ } while (0) struct ConfigNode { @@ -99,6 +117,21 @@ void event_handler(int callerRet, const char* msg, size_t len, void* userData) { else if (callerRet == RET_OK) { printf("Receiving event: %s\n", msg); } + + pthread_mutex_lock(&mutex); + callback_executed = 1; + pthread_cond_signal(&cond); + pthread_mutex_unlock(&mutex); +} + +void on_event_received(int callerRet, const char* msg, size_t len, void* userData) { + if (callerRet == RET_ERR) { + printf("Error: %s\n", msg); + exit(1); + } + else if (callerRet == RET_OK) { + printf("Receiving event: %s\n", msg); + } } char* contentTopic = NULL; @@ -161,10 +194,20 @@ void show_help_and_exit() { void print_default_pubsub_topic(int callerRet, const char* msg, size_t len, void* userData) { printf("Default pubsub topic: %s\n", msg); + + pthread_mutex_lock(&mutex); + callback_executed = 1; + pthread_cond_signal(&cond); + pthread_mutex_unlock(&mutex); } void print_waku_version(int callerRet, const char* msg, size_t len, void* userData) { printf("Git Version: %s\n", msg); + + pthread_mutex_lock(&mutex); + callback_executed = 1; + pthread_cond_signal(&cond); + pthread_mutex_unlock(&mutex); } // Beginning of UI program logic @@ -247,15 +290,13 @@ void handle_user_input() { // End of UI program logic int main(int argc, char** argv) { - waku_setup(); - struct ConfigNode cfgNode; // default values snprintf(cfgNode.host, 128, "0.0.0.0"); cfgNode.port = 60000; cfgNode.relay = 1; - cfgNode.store = 1; + cfgNode.store = 0; snprintf(cfgNode.storeNode, 2048, ""); snprintf(cfgNode.storeRetentionPolicy, 64, "time:6000000"); snprintf(cfgNode.storeDbUrl, 256, "postgres://postgres:test123@localhost:5432/postgres"); @@ -296,6 +337,7 @@ int main(int argc, char** argv) { cfgNode.storeMaxNumDbConnections); ctx = waku_new(jsonConfig, event_handler, userData); + waitForCallback(); WAKU_CALL( waku_default_pubsub_topic(ctx, print_default_pubsub_topic, userData) ); WAKU_CALL( waku_version(ctx, print_waku_version, userData) ); @@ -303,10 +345,12 @@ int main(int argc, char** argv) { printf("Bind addr: %s:%u\n", cfgNode.host, cfgNode.port); printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES": "NO"); - waku_set_event_callback(ctx, event_handler, userData); - waku_start(ctx, event_handler, userData); + waku_set_event_callback(ctx, on_event_received, userData); - waku_listen_addresses(ctx, event_handler, userData); + waku_start(ctx, event_handler, userData); + waitForCallback(); + + WAKU_CALL( waku_listen_addresses(ctx, event_handler, userData) ); printf("Establishing connection with: %s\n", cfgNode.peers); @@ -334,4 +378,7 @@ int main(int argc, char** argv) { while(1) { handle_user_input(); } + + pthread_mutex_destroy(&mutex); + pthread_cond_destroy(&cond); } diff --git a/examples/cpp/waku.cpp b/examples/cpp/waku.cpp index 8b544160a..4b601c492 100644 --- a/examples/cpp/waku.cpp +++ b/examples/cpp/waku.cpp @@ -199,8 +199,6 @@ auto cify(F&& f) { } int main(int argc, char** argv) { - waku_setup(); - struct ConfigNode cfgNode; // default values snprintf(cfgNode.host, 128, "0.0.0.0"); diff --git a/examples/golang/waku.go b/examples/golang/waku.go index c7a6a2cc6..ad7c40b34 100644 --- a/examples/golang/waku.go +++ b/examples/golang/waku.go @@ -296,10 +296,6 @@ type WakuNode struct { ctx unsafe.Pointer } -func WakuSetup() { - C.waku_setup() -} - func WakuNew(config WakuConfig) (*WakuNode, error) { jsonConfig, err := json.Marshal(config) if err != nil { @@ -557,8 +553,6 @@ func (self *WakuNode) WakuGetMyENR() (string, error) { } func main() { - WakuSetup() - config := WakuConfig{ Host: "0.0.0.0", Port: 30304, diff --git a/examples/mobile/android/app/src/main/jni/waku_ffi.c b/examples/mobile/android/app/src/main/jni/waku_ffi.c index 129ebaa08..477e2dad2 100644 --- a/examples/mobile/android/app/src/main/jni/waku_ffi.c +++ b/examples/mobile/android/app/src/main/jni/waku_ffi.c @@ -177,7 +177,6 @@ jclass loadClass(JNIEnv *env, const char *className) { } void Java_com_mobile_WakuModule_wakuSetup(JNIEnv *env, jobject thiz) { - waku_setup(); LOGD("log example for debugging purposes...") } diff --git a/examples/python/waku.py b/examples/python/waku.py index 1c6ed7bc7..4d5f5643e 100644 --- a/examples/python/waku.py +++ b/examples/python/waku.py @@ -58,9 +58,6 @@ json_config = "{ \ callback_type = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_char_p, ctypes.c_size_t) -# libwaku setup -libwaku.waku_setup() - # Node creation libwaku.waku_new.restype = ctypes.c_void_p libwaku.waku_new.argtypes = [ctypes.c_char_p, diff --git a/examples/rust/src/main.rs b/examples/rust/src/main.rs index 6e8b9c1c9..7a91b0ddc 100644 --- a/examples/rust/src/main.rs +++ b/examples/rust/src/main.rs @@ -13,8 +13,6 @@ pub type WakuCallback = ); extern "C" { - pub fn waku_setup(); - pub fn waku_new( config_json: *const u8, cb: WakuCallback, @@ -70,8 +68,6 @@ fn main() { }"; unsafe { - waku_setup(); - // Create the waku node let closure = |ret: i32, data: &str| { println!("Ret {ret}. Error creating waku node {data}"); diff --git a/library/callback.nim b/library/callback.nim deleted file mode 100644 index 8a8522600..000000000 --- a/library/callback.nim +++ /dev/null @@ -1,13 +0,0 @@ -import ./waku_thread/waku_thread - -type WakuCallBack* = proc( - callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer -) {.cdecl, gcsafe, raises: [].} - -template checkLibwakuParams*( - ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer -) = - ctx[].userData = userData - - if isNil(callback): - return RET_MISSING_CALLBACK diff --git a/library/ffi_types.nim b/library/ffi_types.nim new file mode 100644 index 000000000..a5eeb9711 --- /dev/null +++ b/library/ffi_types.nim @@ -0,0 +1,30 @@ +################################################################################ +### Exported types + +type WakuCallBack* = proc( + callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer +) {.cdecl, gcsafe, raises: [].} + +const RET_OK*: cint = 0 +const RET_ERR*: cint = 1 +const RET_MISSING_CALLBACK*: cint = 2 + +### End of exported types +################################################################################ + +################################################################################ +### FFI utils + +template foreignThreadGc*(body: untyped) = + when declared(setupForeignThreadGc): + setupForeignThreadGc() + + body + + when declared(tearDownForeignThreadGc): + tearDownForeignThreadGc() + +type onDone* = proc() + +### End of FFI utils +################################################################################ diff --git a/library/libwaku.h b/library/libwaku.h index 0255e6115..c02f6da43 100644 --- a/library/libwaku.h +++ b/library/libwaku.h @@ -20,9 +20,6 @@ extern "C" { typedef void (*WakuCallBack) (int callerRet, const char* msg, size_t len, void* userData); -// Initializes the library. Should be called before any other function -void waku_setup(); - // Creates a new instance of the waku node. // Sets up the waku node from the given configuration. // Returns a pointer to the Context needed by the rest of the API functions. diff --git a/library/libwaku.nim b/library/libwaku.nim index 0dac652d0..f51ab4aae 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -6,7 +6,7 @@ when defined(linux): {.passl: "-Wl,-soname,libwaku.so".} import std/[json, atomics, strformat, options, atomics] -import chronicles, chronos +import chronicles, chronos, chronos/threadsync import waku/common/base64, waku/waku_core/message/message, @@ -27,51 +27,35 @@ import ./waku_thread/inter_thread_communication/requests/ping_request, ./waku_thread/inter_thread_communication/waku_thread_request, ./alloc, - ./callback + ./ffi_types ################################################################################ ### Wrapper around the waku node ################################################################################ -################################################################################ -### Exported types - -const RET_OK: cint = 0 -const RET_ERR: cint = 1 -const RET_MISSING_CALLBACK: cint = 2 - -### End of exported types -################################################################################ - ################################################################################ ### Not-exported components -template foreignThreadGc(body: untyped) = - when declared(setupForeignThreadGc): - setupForeignThreadGc() +template checkLibwakuParams*( + ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer +) = + ctx[].userData = userData - body + if isNil(callback): + return RET_MISSING_CALLBACK - when declared(tearDownForeignThreadGc): - tearDownForeignThreadGc() - -proc handleRes[T: string | void]( - res: Result[T, string], callback: WakuCallBack, userData: pointer +proc handleRequest( + ctx: ptr WakuContext, + requestType: RequestType, + content: pointer, + callback: WakuCallBack, + userData: pointer, ): cint = - ## Handles the Result responses, which can either be Result[string, string] or - ## Result[void, string]. Notice that in case of Result[void, string], it is enough to - ## just return RET_OK and not provide any additional feedback through the callback. - if res.isErr(): - foreignThreadGc: - let msg = "libwaku error: " & $res.error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + waku_thread.sendRequestToWakuThread(ctx, requestType, content, callback, userData).isOkOr: + let msg = "libwaku error: " & $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR - foreignThreadGc: - var msg: cstring = "" - when T is string: - msg = res.get().cstring() - callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_OK proc onReceivedMessage(ctx: ptr WakuContext): WakuRelayHandler = @@ -87,15 +71,14 @@ proc onReceivedMessage(ctx: ptr WakuContext): WakuRelayHandler = error "eventUserData is nil" return - try: - let event = $JsonMessageEvent.new(pubsubTopic, msg) - foreignThreadGc: + foreignThreadGc: + try: + let event = $JsonMessageEvent.new(pubsubTopic, msg) cast[WakuCallBack](ctx[].eventCallback)( RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData ) - except Exception, CatchableError: - let msg = "Exception when calling 'eventCallBack': " & getCurrentExceptionMsg() - foreignThreadGc: + except Exception, CatchableError: + let msg = "Exception when calling 'eventCallBack': " & getCurrentExceptionMsg() cast[WakuCallBack](ctx[].eventCallback)( RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData ) @@ -121,24 +104,27 @@ if defined(android): ) {.raises: [].} = echo logLevel, msg +proc initializeLibrary() {.exported.} = + if not initialized.exchange(true): + NimMain() # Every Nim library needs to call `NimMain` once exactly + when declared(setupForeignThreadGc): + setupForeignThreadGc() + when declared(nimGC_setStackBottom): + var locals {.volatile, noinit.}: pointer + locals = addr(locals) + nimGC_setStackBottom(locals) + ### End of library setup ################################################################################ ################################################################################ ### Exported procs -proc waku_setup() {.dynlib, exportc.} = - NimMain() - if not initialized.load: - initialized.store(true) - - when declared(nimGC_setStackBottom): - var locals {.volatile, noinit.}: pointer - locals = addr(locals) - nimGC_setStackBottom(locals) proc waku_new( configJson: cstring, callback: WakuCallback, userData: pointer ): pointer {.dynlib, exportc, cdecl.} = + initializeLibrary() + ## Creates a new instance of the WakuNode. if isNil(callback): echo "error: missing callback in waku_new" @@ -146,50 +132,56 @@ proc waku_new( ## Create the Waku thread that will keep waiting for req from the main thread. var ctx = waku_thread.createWakuThread().valueOr: - foreignThreadGc: - let msg = "Error in createWakuThread: " & $error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + let msg = "Error in createWakuThread: " & $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return nil ctx.userData = userData - waku_thread.sendRequestToWakuThread( + let retCode = handleRequest( ctx, RequestType.LIFECYCLE, NodeLifecycleRequest.createShared(NodeLifecycleMsgType.CREATE_NODE, configJson), - ).isOkOr: - foreignThreadGc: - let msg = $error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) - return nil + callback, + userData, + ) + + if retCode == RET_ERR: + return nil return ctx proc waku_destroy( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread.destroyWakuThread(ctx).handleRes(callback, userData) + waku_thread.destroyWakuThread(ctx).isOkOr: + let msg = "libwaku error: " & $error + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) + return RET_ERR + + return RET_OK proc waku_version( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - - foreignThreadGc: - callback( - RET_OK, - cast[ptr cchar](WakuNodeVersionString), - cast[csize_t](len(WakuNodeVersionString)), - userData, - ) + callback( + RET_OK, + cast[ptr cchar](WakuNodeVersionString), + cast[csize_t](len(WakuNodeVersionString)), + userData, + ) return RET_OK proc waku_set_event_callback( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ) {.dynlib, exportc.} = + initializeLibrary() ctx[].eventCallback = cast[pointer](callback) ctx[].eventUserData = userData @@ -204,6 +196,7 @@ proc waku_content_topic( ): cint {.dynlib, exportc.} = # https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding + initializeLibrary() checkLibwakuParams(ctx, callback, userData) let appStr = appName.alloc() @@ -226,6 +219,7 @@ proc waku_pubsub_topic( ): cint {.dynlib, exportc, cdecl.} = # https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding + initializeLibrary() checkLibwakuParams(ctx, callback, userData) let topicNameStr = topicName.alloc() @@ -244,6 +238,7 @@ proc waku_default_pubsub_topic( ): cint {.dynlib, exportc.} = # https://rfc.vac.dev/spec/36/#extern-char-waku_default_pubsub_topic + initializeLibrary() checkLibwakuParams(ctx, callback, userData) callback( @@ -265,6 +260,7 @@ proc waku_relay_publish( ): cint {.dynlib, exportc, cdecl.} = # https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms + initializeLibrary() checkLibwakuParams(ctx, callback, userData) let jwm = jsonWakuMessage.alloc() @@ -295,8 +291,7 @@ proc waku_relay_publish( else: $pst - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.RELAY, RelayRequest.createShared( @@ -305,34 +300,35 @@ proc waku_relay_publish( WakuRelayHandler(onReceivedMessage(ctx)), wakuMessage, ), + callback, + userData, ) - .handleRes(callback, userData) proc waku_start( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.LIFECYCLE, NodeLifecycleRequest.createShared(NodeLifecycleMsgType.START_NODE), + callback, + userData, ) - .handleRes(callback, userData) proc waku_stop( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.LIFECYCLE, NodeLifecycleRequest.createShared(NodeLifecycleMsgType.STOP_NODE), + callback, + userData, ) - .handleRes(callback, userData) proc waku_relay_subscribe( ctx: ptr WakuContext, @@ -340,6 +336,7 @@ proc waku_relay_subscribe( callback: WakuCallBack, userData: pointer, ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) let pst = pubSubTopic.alloc() @@ -347,15 +344,15 @@ proc waku_relay_subscribe( deallocShared(pst) var cb = onReceivedMessage(ctx) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.RELAY, RelayRequest.createShared( RelayMsgType.SUBSCRIBE, PubsubTopic($pst), WakuRelayHandler(cb) ), + callback, + userData, ) - .handleRes(callback, userData) proc waku_relay_unsubscribe( ctx: ptr WakuContext, @@ -363,14 +360,14 @@ proc waku_relay_unsubscribe( callback: WakuCallBack, userData: pointer, ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) let pst = pubSubTopic.alloc() defer: deallocShared(pst) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.RELAY, RelayRequest.createShared( @@ -378,8 +375,9 @@ proc waku_relay_unsubscribe( PubsubTopic($pst), WakuRelayHandler(onReceivedMessage(ctx)), ), + callback, + userData, ) - .handleRes(callback, userData) proc waku_relay_get_num_connected_peers( ctx: ptr WakuContext, @@ -387,19 +385,20 @@ proc waku_relay_get_num_connected_peers( callback: WakuCallBack, userData: pointer, ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) let pst = pubSubTopic.alloc() defer: deallocShared(pst) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.RELAY, RelayRequest.createShared(RelayMsgType.LIST_CONNECTED_PEERS, PubsubTopic($pst)), + callback, + userData, ) - .handleRes(callback, userData) proc waku_relay_get_num_peers_in_mesh( ctx: ptr WakuContext, @@ -407,19 +406,20 @@ proc waku_relay_get_num_peers_in_mesh( callback: WakuCallBack, userData: pointer, ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) let pst = pubSubTopic.alloc() defer: deallocShared(pst) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.RELAY, RelayRequest.createShared(RelayMsgType.LIST_MESH_PEERS, PubsubTopic($pst)), + callback, + userData, ) - .handleRes(callback, userData) proc waku_filter_subscribe( ctx: ptr WakuContext, @@ -428,10 +428,10 @@ proc waku_filter_subscribe( callback: WakuCallBack, userData: pointer, ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.FILTER, FilterRequest.createShared( @@ -440,8 +440,9 @@ proc waku_filter_subscribe( contentTopics, FilterPushHandler(onReceivedMessage(ctx)), ), + callback, + userData, ) - .handleRes(callback, userData) proc waku_filter_unsubscribe( ctx: ptr WakuContext, @@ -450,26 +451,30 @@ proc waku_filter_unsubscribe( callback: WakuCallBack, userData: pointer, ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.FILTER, FilterRequest.createShared(FilterMsgType.UNSUBSCRIBE, pubSubTopic, contentTopics), + callback, + userData, ) - .handleRes(callback, userData) proc waku_filter_unsubscribe_all( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( - ctx, RequestType.FILTER, FilterRequest.createShared(FilterMsgType.UNSUBSCRIBE_ALL) + handleRequest( + ctx, + RequestType.FILTER, + FilterRequest.createShared(FilterMsgType.UNSUBSCRIBE_ALL), + callback, + userData, ) - .handleRes(callback, userData) proc waku_lightpush_publish( ctx: ptr WakuContext, @@ -478,6 +483,7 @@ proc waku_lightpush_publish( callback: WakuCallBack, userData: pointer, ): cint {.dynlib, exportc, cdecl.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) let jwm = jsonWakuMessage.alloc() @@ -506,15 +512,15 @@ proc waku_lightpush_publish( else: $pst - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.LIGHTPUSH, LightpushRequest.createShared( LightpushMsgType.PUBLISH, PubsubTopic($pst), wakuMessage ), + callback, + userData, ) - .handleRes(callback, userData) proc waku_connect( ctx: ptr WakuContext, @@ -523,32 +529,34 @@ proc waku_connect( callback: WakuCallBack, userData: pointer, ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.PEER_MANAGER, PeerManagementRequest.createShared( PeerManagementMsgType.CONNECT_TO, $peerMultiAddr, chronos.milliseconds(timeoutMs) ), + callback, + userData, ) - .handleRes(callback, userData) proc waku_disconnect_peer_by_id( ctx: ptr WakuContext, peerId: cstring, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.PEER_MANAGER, PeerManagementRequest.createShared( op = PeerManagementMsgType.DISCONNECT_PEER_BY_ID, peerId = $peerId ), + callback, + userData, ) - .handleRes(callback, userData) proc waku_dial_peer( ctx: ptr WakuContext, @@ -558,10 +566,10 @@ proc waku_dial_peer( callback: WakuCallBack, userData: pointer, ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.PEER_MANAGER, PeerManagementRequest.createShared( @@ -569,8 +577,9 @@ proc waku_dial_peer( peerMultiAddr = $peerMultiAddr, protocol = $protocol, ), + callback, + userData, ) - .handleRes(callback, userData) proc waku_dial_peer_by_id( ctx: ptr WakuContext, @@ -580,58 +589,62 @@ proc waku_dial_peer_by_id( callback: WakuCallBack, userData: pointer, ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.PEER_MANAGER, PeerManagementRequest.createShared( op = PeerManagementMsgType.DIAL_PEER_BY_ID, peerId = $peerId, protocol = $protocol ), + callback, + userData, ) - .handleRes(callback, userData) proc waku_get_peerids_from_peerstore( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.PEER_MANAGER, PeerManagementRequest.createShared(PeerManagementMsgType.GET_ALL_PEER_IDS), + callback, + userData, ) - .handleRes(callback, userData) proc waku_get_connected_peers( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.PEER_MANAGER, PeerManagementRequest.createShared(PeerManagementMsgType.GET_CONNECTED_PEERS), + callback, + userData, ) - .handleRes(callback, userData) proc waku_get_peerids_by_protocol( ctx: ptr WakuContext, protocol: cstring, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.PEER_MANAGER, PeerManagementRequest.createShared( op = PeerManagementMsgType.GET_PEER_IDS_BY_PROTOCOL, protocol = $protocol ), + callback, + userData, ) - .handleRes(callback, userData) proc waku_store_query( ctx: ptr WakuContext, @@ -641,28 +654,30 @@ proc waku_store_query( callback: WakuCallBack, userData: pointer, ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.STORE, JsonStoreQueryRequest.createShared(jsonQuery, peerAddr, timeoutMs), + callback, + userData, ) - .handleRes(callback, userData) proc waku_listen_addresses( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.DEBUG, DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_LISTENING_ADDRESSES), + callback, + userData, ) - .handleRes(callback, userData) proc waku_dns_discovery( ctx: ptr WakuContext, @@ -672,93 +687,106 @@ proc waku_dns_discovery( callback: WakuCallBack, userData: pointer, ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.DISCOVERY, DiscoveryRequest.createRetrieveBootstrapNodesRequest( DiscoveryMsgType.GET_BOOTSTRAP_NODES, entTreeUrl, nameDnsServer, timeoutMs ), + callback, + userData, ) - .handleRes(callback, userData) proc waku_discv5_update_bootnodes( ctx: ptr WakuContext, bootnodes: cstring, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = ## Updates the bootnode list used for discovering new peers via DiscoveryV5 ## bootnodes - JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]` + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.DISCOVERY, DiscoveryRequest.createUpdateBootstrapNodesRequest( DiscoveryMsgType.UPDATE_DISCV5_BOOTSTRAP_NODES, bootnodes ), + callback, + userData, ) - .handleRes(callback, userData) proc waku_get_my_enr( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.DEBUG, DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_MY_ENR), + callback, + userData, ) - .handleRes(callback, userData) proc waku_get_my_peerid( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.DEBUG, DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_MY_PEER_ID), + callback, + userData, ) - .handleRes(callback, userData) proc waku_start_discv5( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( - ctx, RequestType.DISCOVERY, DiscoveryRequest.createDiscV5StartRequest() + handleRequest( + ctx, + RequestType.DISCOVERY, + DiscoveryRequest.createDiscV5StartRequest(), + callback, + userData, ) - .handleRes(callback, userData) proc waku_stop_discv5( ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( - ctx, RequestType.DISCOVERY, DiscoveryRequest.createDiscV5StopRequest() + handleRequest( + ctx, + RequestType.DISCOVERY, + DiscoveryRequest.createDiscV5StopRequest(), + callback, + userData, ) - .handleRes(callback, userData) proc waku_peer_exchange_request( ctx: ptr WakuContext, numPeers: uint64, callback: WakuCallBack, userData: pointer ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( - ctx, RequestType.DISCOVERY, DiscoveryRequest.createPeerExchangeRequest(numPeers) + handleRequest( + ctx, + RequestType.DISCOVERY, + DiscoveryRequest.createPeerExchangeRequest(numPeers), + callback, + userData, ) - .handleRes(callback, userData) proc waku_ping_peer( ctx: ptr WakuContext, @@ -767,15 +795,16 @@ proc waku_ping_peer( callback: WakuCallBack, userData: pointer, ): cint {.dynlib, exportc.} = + initializeLibrary() checkLibwakuParams(ctx, callback, userData) - waku_thread - .sendRequestToWakuThread( + handleRequest( ctx, RequestType.PING, PingRequest.createShared(peerAddr, chronos.milliseconds(timeoutMs)), + callback, + userData, ) - .handleRes(callback, userData) ### End of exported procs ################################################################################ diff --git a/library/waku_thread/inter_thread_communication/waku_thread_request.nim b/library/waku_thread/inter_thread_communication/waku_thread_request.nim index 1036e2b2a..bcfb84198 100644 --- a/library/waku_thread/inter_thread_communication/waku_thread_request.nim +++ b/library/waku_thread/inter_thread_communication/waku_thread_request.nim @@ -3,9 +3,10 @@ ## the Waku Thread. import std/json, results -import chronos +import chronos, chronos/threadsync import ../../../waku/factory/waku, + ../../ffi_types, ./requests/node_lifecycle_request, ./requests/peer_manager_request, ./requests/protocols/relay_request, @@ -27,27 +28,55 @@ type RequestType* {.pure.} = enum LIGHTPUSH FILTER -type InterThreadRequest* = object +type WakuThreadRequest* = object reqType: RequestType reqContent: pointer + callback: WakuCallBack + userData: pointer proc createShared*( - T: type InterThreadRequest, reqType: RequestType, reqContent: pointer + T: type WakuThreadRequest, + reqType: RequestType, + reqContent: pointer, + callback: WakuCallBack, + userData: pointer, ): ptr type T = var ret = createShared(T) ret[].reqType = reqType ret[].reqContent = reqContent + ret[].callback = callback + ret[].userData = userData return ret -proc process*( - T: type InterThreadRequest, request: ptr InterThreadRequest, waku: ptr Waku -): Future[Result[string, string]] {.async.} = - ## Processes the request and deallocates its memory +proc handleRes[T: string | void]( + res: Result[T, string], request: ptr WakuThreadRequest +) = + ## Handles the Result responses, which can either be Result[string, string] or + ## Result[void, string]. + defer: deallocShared(request) - echo "Request received: " & $request[].reqType + if res.isErr(): + foreignThreadGc: + let msg = "libwaku error: handleRes fireSyncRes error: " & $res.error + request[].callback( + RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + foreignThreadGc: + var msg: cstring = "" + when T is string: + msg = res.get().cstring() + request[].callback( + RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData + ) + return + +proc process*( + T: type WakuThreadRequest, request: ptr WakuThreadRequest, waku: ptr Waku +) {.async.} = let retFut = case request[].reqType of LIFECYCLE: @@ -69,7 +98,7 @@ proc process*( of FILTER: cast[ptr FilterRequest](request[].reqContent).process(waku) - return await retFut + handleRes(await retFut, request) -proc `$`*(self: InterThreadRequest): string = +proc `$`*(self: WakuThreadRequest): string = return $self.reqType diff --git a/library/waku_thread/inter_thread_communication/waku_thread_response.nim b/library/waku_thread/inter_thread_communication/waku_thread_response.nim deleted file mode 100644 index e44e2d49f..000000000 --- a/library/waku_thread/inter_thread_communication/waku_thread_response.nim +++ /dev/null @@ -1,48 +0,0 @@ -## This file contains the base message response type that will be handled. -## The response will be created from the Waku Thread and processed in -## the main thread. - -import std/json, results -import ../../alloc - -type ResponseType {.pure.} = enum - OK - ERR - -type InterThreadResponse* = object - respType: ResponseType - content: cstring - -proc createShared*( - T: type InterThreadResponse, res: Result[string, string] -): ptr type T = - ## Converts a `Result[string, string]` into a `ptr InterThreadResponse` - ## so that it can be transfered to another thread in a safe way. - - var ret = createShared(T) - if res.isOk(): - let value = res.get() - ret[].respType = ResponseType.OK - ret[].content = value.alloc() - else: - let error = res.error - ret[].respType = ResponseType.ERR - ret[].content = res.error.alloc() - return ret - -proc process*( - T: type InterThreadResponse, resp: ptr InterThreadResponse -): Result[string, string] = - ## Converts the received `ptr InterThreadResponse` into a - ## `Result[string, string]`. Notice that the response is expected to be - ## allocated from the Waku Thread and deallocated by the main thread. - - defer: - deallocShared(resp[].content) - deallocShared(resp) - - case resp[].respType - of OK: - return ok($resp[].content) - of ERR: - return err($resp[].content) diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index 5babbf380..1c7b87536 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -4,17 +4,12 @@ import std/[options, atomics, os, net] import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results -import - waku/factory/waku, - ./inter_thread_communication/waku_thread_request, - ./inter_thread_communication/waku_thread_response +import waku/factory/waku, ./inter_thread_communication/waku_thread_request, ../ffi_types type WakuContext* = object thread: Thread[(ptr WakuContext)] - reqChannel: ChannelSPSCSingle[ptr InterThreadRequest] + reqChannel: ChannelSPSCSingle[ptr WakuThreadRequest] reqSignal: ThreadSignalPtr - respChannel: ChannelSPSCSingle[ptr InterThreadResponse] - respSignal: ThreadSignalPtr userData*: pointer eventCallback*: pointer eventUserdata*: pointer @@ -36,27 +31,14 @@ proc runWaku(ctx: ptr WakuContext) {.async.} = break ## Trying to get a request from the libwaku requestor thread - var request: ptr InterThreadRequest + var request: ptr WakuThreadRequest let recvOk = ctx.reqChannel.tryRecv(request) if not recvOk: error "waku thread could not receive a request" continue ## Handle the request - let resultResponse = waitFor InterThreadRequest.process(request, addr waku) - - ## Converting a `Result` into a thread-safe transferable response type - let threadSafeResp = InterThreadResponse.createShared(resultResponse) - - ## Send the response back to the thread that sent the request - let sentOk = ctx.respChannel.trySend(threadSafeResp) - if not sentOk: - error "could not send a request to the requester thread", - original_request = $request[] - - let fireRes = ctx.respSignal.fireSync() - if fireRes.isErr(): - error "could not fireSync back to requester thread", error = fireRes.error + asyncSpawn WakuThreadRequest.process(request, addr waku) proc run(ctx: ptr WakuContext) {.thread.} = ## Launch waku worker @@ -68,8 +50,6 @@ proc createWakuThread*(): Result[ptr WakuContext, string] = var ctx = createShared(WakuContext, 1) ctx.reqSignal = ThreadSignalPtr.new().valueOr: return err("couldn't create reqSignal ThreadSignalPtr") - ctx.respSignal = ThreadSignalPtr.new().valueOr: - return err("couldn't create respSignal ThreadSignalPtr") ctx.running.store(true) @@ -93,36 +73,31 @@ proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] = joinThread(ctx.thread) ?ctx.reqSignal.close() - ?ctx.respSignal.close() freeShared(ctx) return ok() proc sendRequestToWakuThread*( - ctx: ptr WakuContext, reqType: RequestType, reqContent: pointer -): Result[string, string] = - let req = InterThreadRequest.createShared(reqType, reqContent) + ctx: ptr WakuContext, + reqType: RequestType, + reqContent: pointer, + callback: WakuCallBack, + userData: pointer, +): Result[void, string] = + let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData) ## Sending the request let sentOk = ctx.reqChannel.trySend(req) if not sentOk: + deallocShared(req) return err("Couldn't send a request to the waku thread: " & $req[]) let fireSyncRes = ctx.reqSignal.fireSync() if fireSyncRes.isErr(): + deallocShared(req) return err("failed fireSync: " & $fireSyncRes.error) if fireSyncRes.get() == false: + deallocShared(req) return err("Couldn't fireSync in time") - # Waiting for the response - let res = waitSync(ctx.respSignal) - if res.isErr(): - return err("Couldnt receive response signal") - - var response: ptr InterThreadResponse - var recvOk = ctx.respChannel.tryRecv(response) - if recvOk == false: - return err("Couldn't receive response from the waku thread: " & $req[]) - - ## Converting the thread-safe response into a managed/CG'ed `Result` - return InterThreadResponse.process(response) + ok()