diff --git a/examples/cbindings/waku_example.c b/examples/cbindings/waku_example.c index b58dbc9a6..ec1b8f9d3 100644 --- a/examples/cbindings/waku_example.c +++ b/examples/cbindings/waku_example.c @@ -30,6 +30,12 @@ struct ConfigNode { char peers[2048]; }; +// libwaku Context +void* ctx; + +// For the case of C language we don't need to store a particular userData +void* userData = NULL; + // Arguments parsing static char doc[] = "\nC example that shows how to use the waku library."; static char args_doc[] = ""; @@ -78,8 +84,18 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { static struct argp argp = { options, parse_opt, args_doc, doc, 0, 0, 0 }; +void event_handler(int callerRet, const char* msg, size_t len) { + if (callerRet == RET_ERR) { + printf("Error: %s\n", msg); + exit(1); + } + else if (callerRet == RET_OK) { + printf("Receiving message %s\n", msg); + } +} + char* contentTopic = NULL; -void handle_content_topic(const char* msg, size_t len) { +void handle_content_topic(int callerRet, const char* msg, size_t len) { if (contentTopic != NULL) { free(contentTopic); } @@ -89,7 +105,7 @@ void handle_content_topic(const char* msg, size_t len) { } char* publishResponse = NULL; -void handle_publish_ok(const char* msg, size_t len) { +void handle_publish_ok(int callerRet, const char* msg, size_t len) { printf("Publish Ok: %s %lu\n", msg, len); if (publishResponse != NULL) { @@ -100,22 +116,19 @@ void handle_publish_ok(const char* msg, size_t len) { strcpy(publishResponse, msg); } -void handle_error(const char* msg, size_t len) { - printf("Error: %s\n", msg); - exit(1); -} - #define MAX_MSG_SIZE 65535 void publish_message(char* pubsubTopic, const char* msg) { char jsonWakuMsg[MAX_MSG_SIZE]; char *msgPayload = b64_encode(msg, strlen(msg)); - WAKU_CALL( waku_content_topic("appName", + WAKU_CALL( waku_content_topic(RET_OK, + "appName", 1, "contentTopicName", "encoding", - handle_content_topic) ); + handle_content_topic, + userData) ); snprintf(jsonWakuMsg, MAX_MSG_SIZE, @@ -124,10 +137,12 @@ void publish_message(char* pubsubTopic, const char* msg) { free(msgPayload); - WAKU_CALL( waku_relay_publish(pubsubTopic, + WAKU_CALL( waku_relay_publish(&ctx, + pubsubTopic, jsonWakuMsg, 10000 /*timeout ms*/, - handle_error) ); + event_handler, + userData) ); printf("waku relay response [%s]\n", publishResponse); } @@ -137,15 +152,11 @@ void show_help_and_exit() { exit(1); } -void event_handler(const char* msg, size_t len) { - printf("Receiving message %s\n", msg); -} - -void print_default_pubsub_topic(const char* msg, size_t len) { +void print_default_pubsub_topic(int callerRet, const char* msg, size_t len) { printf("Default pubsub topic: %s\n", msg); } -void print_waku_version(const char* msg, size_t len) { +void print_waku_version(int callerRet, const char* msg, size_t len) { printf("Git Version: %s\n", msg); } @@ -186,8 +197,10 @@ void handle_user_input() { char pubsubTopic[128]; scanf("%127s", pubsubTopic); - WAKU_CALL( waku_relay_subscribe(pubsubTopic, - handle_error) ); + WAKU_CALL( waku_relay_subscribe(&ctx, + pubsubTopic, + event_handler, + userData) ); printf("The subscription went well\n"); show_main_menu(); @@ -199,7 +212,7 @@ void handle_user_input() { printf("e.g.: /ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN\n"); char peerAddr[512]; scanf("%511s", peerAddr); - WAKU_CALL(waku_connect(peerAddr, 10000 /* timeoutMs */, handle_error)); + WAKU_CALL(waku_connect(&ctx, peerAddr, 10000 /* timeoutMs */, event_handler, userData)); show_main_menu(); break; @@ -239,6 +252,8 @@ int main(int argc, char** argv) { show_help_and_exit(); } + ctx = waku_init(event_handler, userData); + char jsonConfig[1024]; snprintf(jsonConfig, 1024, "{ \ \"host\": \"%s\", \ @@ -250,25 +265,29 @@ int main(int argc, char** argv) { cfgNode.key, cfgNode.relay ? "true":"false"); - WAKU_CALL( waku_default_pubsub_topic(print_default_pubsub_topic) ); - WAKU_CALL( waku_version(print_waku_version) ); + WAKU_CALL( waku_default_pubsub_topic(&ctx, print_default_pubsub_topic, userData) ); + WAKU_CALL( waku_version(&ctx, print_waku_version, userData) ); printf("Bind addr: %s:%u\n", cfgNode.host, cfgNode.port); printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES": "NO"); - WAKU_CALL( waku_new(jsonConfig, handle_error) ); + WAKU_CALL( waku_new(&ctx, jsonConfig, event_handler, userData) ); - waku_set_event_callback(event_handler); - waku_start(); + waku_set_event_callback(event_handler, userData); + waku_start(&ctx, event_handler, userData); printf("Establishing connection with: %s\n", cfgNode.peers); - WAKU_CALL( waku_connect(cfgNode.peers, + WAKU_CALL( waku_connect(&ctx, + cfgNode.peers, 10000 /* timeoutMs */, - handle_error) ); + event_handler, + userData) ); - WAKU_CALL( waku_relay_subscribe("/waku/2/default-waku/proto", - handle_error) ); + WAKU_CALL( waku_relay_subscribe(&ctx, + "/waku/2/default-waku/proto", + event_handler, + userData) ); show_main_menu(); while(1) { handle_user_input(); diff --git a/library/libwaku.h b/library/libwaku.h index d2fe5cd1e..bbd7f3515 100644 --- a/library/libwaku.h +++ b/library/libwaku.h @@ -15,45 +15,73 @@ extern "C" { #endif -typedef void (*WakuCallBack) (const char* msg, size_t len_0); +typedef void (*WakuCallBack) (int callerRet, const char* msg, size_t len); + +// Initializes the waku library and returns a pointer to the Context. +void* waku_init(WakuCallBack callback, + void* userData); // Creates a new instance of the waku node. // Sets up the waku node from the given configuration. -int waku_new(const char* configJson, WakuCallBack onErrCb); +int waku_new(void* ctx, + const char* configJson, + WakuCallBack callback, + void* userData); -void waku_start(void); +int waku_start(void* ctx, + WakuCallBack callback, + void* userData); -void waku_stop(void); +int waku_stop(void* ctx, + WakuCallBack callback, + void* userData); -int waku_version(WakuCallBack onOkCb); +int waku_version(void* ctx, + WakuCallBack callback, + void* userData); -void waku_set_event_callback(WakuCallBack callback); +void waku_set_event_callback(WakuCallBack callback, + void* userData); -int waku_content_topic(const char* appName, +int waku_content_topic(void* ctx, + const char* appName, unsigned int appVersion, const char* contentTopicName, const char* encoding, - WakuCallBack onOkCb); + WakuCallBack callback, + void* userData); -int waku_pubsub_topic(const char* topicName, - WakuCallBack onOkCb); +int waku_pubsub_topic(void* ctx, + const char* topicName, + WakuCallBack callback, + void* userData); -int waku_default_pubsub_topic(WakuCallBack onOkCb); +int waku_default_pubsub_topic(void* ctx, + WakuCallBack callback, + void* userData); -int waku_relay_publish(const char* pubSubTopic, +int waku_relay_publish(void* ctx, + const char* pubSubTopic, const char* jsonWakuMessage, unsigned int timeoutMs, - WakuCallBack onErrCb); + WakuCallBack callback, + void* userData); -int waku_relay_subscribe(const char* pubSubTopic, - WakuCallBack onErrCb); +int waku_relay_subscribe(void* ctx, + const char* pubSubTopic, + WakuCallBack callback, + void* userData); -int waku_relay_unsubscribe(const char* pubSubTopic, - WakuCallBack onErrCb); +int waku_relay_unsubscribe(void* ctx, + const char* pubSubTopic, + WakuCallBack callback, + void* userData); -int waku_connect(const char* peerMultiAddr, +int waku_connect(void* ctx, + const char* peerMultiAddr, unsigned int timeoutMs, - WakuCallBack onErrCb); + WakuCallBack callback, + void* userData); #ifdef __cplusplus } diff --git a/library/libwaku.nim b/library/libwaku.nim index 1757de721..9f6449d53 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -4,8 +4,7 @@ {.passc: "-fPIC".} import - std/[json,sequtils,times,strformat,options,atomics,strutils], - strutils + std/[json,sequtils,times,strformat,options,atomics,strutils] import chronicles, chronos @@ -34,7 +33,9 @@ const RET_ERR: cint = 1 const RET_MISSING_CALLBACK: cint = 2 type - WakuCallBack* = proc(msg: ptr cchar, len: csize_t) {.cdecl, gcsafe.} + WakuCallBack* = proc(callerRet: cint, + msg: ptr cchar, + len: csize_t) {.cdecl, gcsafe.} ### End of exported types ################################################################################ @@ -51,10 +52,11 @@ proc relayEventCallback(pubsubTopic: PubsubTopic, if not isNil(extEventCallback): try: let event = $JsonMessageEvent.new(pubsubTopic, msg) - extEventCallback(unsafeAddr event[0], cast[csize_t](len(event))) + extEventCallback(RET_OK, unsafeAddr event[0], cast[csize_t](len(event))) except Exception,CatchableError: - error "Exception when calling 'eventCallBack': " & - getCurrentExceptionMsg() + let msg = "Exception when calling 'eventCallBack': " & + getCurrentExceptionMsg() + extEventCallback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) else: error "extEventCallback is nil" @@ -64,53 +66,72 @@ proc relayEventCallback(pubsubTopic: PubsubTopic, ################################################################################ ### Exported procs -proc waku_new(configJson: cstring, - onErrCb: WakuCallback): cint - {.dynlib, exportc, cdecl.} = - # Creates a new instance of the WakuNode. - # Notice that the ConfigNode type is also exported and available for users. - - if isNil(onErrCb): - return RET_MISSING_CALLBACK +proc waku_init(callback: WakuCallback): pointer {.dynlib, exportc, cdecl.} = + ## Initializes the waku library. ## Create the Waku thread that will keep waiting for req from the main thread. - waku_thread.createWakuThread().isOkOr: + var ctx = waku_thread.createWakuThread().valueOr: let msg = "Error in createWakuThread: " & $error - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) - return RET_ERR + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) + return nil + + return ctx + +proc waku_new(ctx: ptr ptr Context, + configJson: cstring, + callback: WakuCallback, + userData: pointer): cint + {.dynlib, exportc, cdecl.} = + ## Creates a new instance of the WakuNode. + ## Notice that the ConfigNode type is also exported and available for users. + + ctx[][].userData = userData + + if isNil(callback): + return RET_MISSING_CALLBACK let sendReqRes = waku_thread.sendRequestToWakuThread( + ctx[], RequestType.LIFECYCLE, NodeLifecycleRequest.createShared( NodeLifecycleMsgType.CREATE_NODE, configJson)) if sendReqRes.isErr(): let msg = $sendReqRes.error - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR return RET_OK -proc waku_version(onOkCb: WakuCallBack): cint {.dynlib, exportc.} = - if isNil(onOkCb): +proc waku_version(ctx: ptr ptr Context, + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc.} = + + ctx[][].userData = userData + + if isNil(callback): return RET_MISSING_CALLBACK - onOkCb(cast[ptr cchar](WakuNodeVersionString), - cast[csize_t](len(WakuNodeVersionString))) + callback(RET_OK, cast[ptr cchar](WakuNodeVersionString), + cast[csize_t](len(WakuNodeVersionString))) return RET_OK proc waku_set_event_callback(callback: WakuCallBack) {.dynlib, exportc.} = extEventCallback = callback -proc waku_content_topic(appName: cstring, +proc waku_content_topic(ctx: ptr ptr Context, + appName: cstring, appVersion: cuint, contentTopicName: cstring, encoding: cstring, - onOkCb: WakuCallBack): cint {.dynlib, exportc.} = + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc.} = # https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding - if isNil(onOkCb): + ctx[][].userData = userData + + if isNil(callback): return RET_MISSING_CALLBACK let appStr = appName.alloc() @@ -118,7 +139,7 @@ proc waku_content_topic(appName: cstring, let encodingStr = encoding.alloc() let contentTopic = fmt"/{$appStr}/{appVersion}/{$ctnStr}/{$encodingStr}" - onOkCb(unsafeAddr contentTopic[0], cast[csize_t](len(contentTopic))) + callback(RET_OK, unsafeAddr contentTopic[0], cast[csize_t](len(contentTopic))) deallocShared(appStr) deallocShared(ctnStr) @@ -126,40 +147,53 @@ proc waku_content_topic(appName: cstring, return RET_OK -proc waku_pubsub_topic(topicName: cstring, - onOkCb: WakuCallBack): cint {.dynlib, exportc, cdecl.} = - if isNil(onOkCb): +proc waku_pubsub_topic(ctx: ptr ptr Context, + topicName: cstring, + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc, cdecl.} = + # https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding + + ctx[][].userData = userData + + if isNil(callback): return RET_MISSING_CALLBACK let topicNameStr = topicName.alloc() - # https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding let outPubsubTopic = fmt"/waku/2/{$topicNameStr}" - onOkCb(unsafeAddr outPubsubTopic[0], cast[csize_t](len(outPubsubTopic))) + callback(RET_OK, unsafeAddr outPubsubTopic[0], cast[csize_t](len(outPubsubTopic))) deallocShared(topicNameStr) return RET_OK -proc waku_default_pubsub_topic(onOkCb: WakuCallBack): cint {.dynlib, exportc.} = +proc waku_default_pubsub_topic(ctx: ptr ptr Context, + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc.} = # https://rfc.vac.dev/spec/36/#extern-char-waku_default_pubsub_topic - if isNil(onOkCb): + + ctx[][].userData = userData + + if isNil(callback): return RET_MISSING_CALLBACK - onOkCb(cast[ptr cchar](DefaultPubsubTopic), - cast[csize_t](len(DefaultPubsubTopic))) + callback(RET_OK, cast[ptr cchar](DefaultPubsubTopic), cast[csize_t](len(DefaultPubsubTopic))) return RET_OK -proc waku_relay_publish(pubSubTopic: cstring, +proc waku_relay_publish(ctx: ptr ptr Context, + pubSubTopic: cstring, jsonWakuMessage: cstring, timeoutMs: cuint, - onErrCb: WakuCallBack): cint + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc, cdecl.} = # https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms - if isNil(onErrCb): + ctx[][].userData = userData + + if isNil(callback): return RET_MISSING_CALLBACK let jwm = jsonWakuMessage.alloc() @@ -169,7 +203,7 @@ proc waku_relay_publish(pubSubTopic: cstring, except JsonParsingError: deallocShared(jwm) let msg = fmt"Error parsing json message: {getCurrentExceptionMsg()}" - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR deallocShared(jwm) @@ -190,7 +224,7 @@ proc waku_relay_publish(pubSubTopic: cstring, ) except KeyError: let msg = fmt"Problem building the WakuMessage: {getCurrentExceptionMsg()}" - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR let pst = pubSubTopic.alloc() @@ -201,6 +235,7 @@ proc waku_relay_publish(pubSubTopic: cstring, $pst let sendReqRes = waku_thread.sendRequestToWakuThread( + ctx[], RequestType.RELAY, RelayRequest.createShared(RelayMsgType.PUBLISH, PubsubTopic($pst), @@ -210,31 +245,47 @@ proc waku_relay_publish(pubSubTopic: cstring, if sendReqRes.isErr(): let msg = $sendReqRes.error - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR return RET_OK -proc waku_start() {.dynlib, exportc.} = +proc waku_start(ctx: ptr ptr Context, + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc.} = + + ctx[][].userData = userData + ## TODO: handle the error discard waku_thread.sendRequestToWakuThread( + ctx[], RequestType.LIFECYCLE, NodeLifecycleRequest.createShared( NodeLifecycleMsgType.START_NODE)) -proc waku_stop() {.dynlib, exportc.} = +proc waku_stop(ctx: ptr ptr Context, + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc.} = + ctx[][].userData = userData + ## TODO: handle the error discard waku_thread.sendRequestToWakuThread( + ctx[], RequestType.LIFECYCLE, NodeLifecycleRequest.createShared( NodeLifecycleMsgType.STOP_NODE)) proc waku_relay_subscribe( + ctx: ptr ptr Context, pubSubTopic: cstring, - onErrCb: WakuCallBack): cint + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc.} = + ctx[][].userData = userData + let pst = pubSubTopic.alloc() let sendReqRes = waku_thread.sendRequestToWakuThread( + ctx[], RequestType.RELAY, RelayRequest.createShared(RelayMsgType.SUBSCRIBE, PubsubTopic($pst), @@ -243,19 +294,24 @@ proc waku_relay_subscribe( if sendReqRes.isErr(): let msg = $sendReqRes.error - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR return RET_OK proc waku_relay_unsubscribe( + ctx: ptr ptr Context, pubSubTopic: cstring, - onErrCb: WakuCallBack): cint + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc.} = + ctx[][].userData = userData + let pst = pubSubTopic.alloc() let sendReqRes = waku_thread.sendRequestToWakuThread( + ctx[], RequestType.RELAY, RelayRequest.createShared(RelayMsgType.SUBSCRIBE, PubsubTopic($pst), @@ -264,17 +320,22 @@ proc waku_relay_unsubscribe( if sendReqRes.isErr(): let msg = $sendReqRes.error - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR return RET_OK -proc waku_connect(peerMultiAddr: cstring, +proc waku_connect(ctx: ptr ptr Context, + peerMultiAddr: cstring, timeoutMs: cuint, - onErrCb: WakuCallBack): cint + callback: WakuCallBack, + userData: pointer): cint {.dynlib, exportc.} = + ctx[][].userData = userData + let connRes = waku_thread.sendRequestToWakuThread( + ctx[], RequestType.PEER_MANAGER, PeerManagementRequest.createShared( PeerManagementMsgType.CONNECT_TO, @@ -282,7 +343,7 @@ proc waku_connect(peerMultiAddr: cstring, chronos.milliseconds(timeoutMs))) if connRes.isErr(): let msg = $connRes.error - onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR return RET_OK diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index a2965e4dd..807fb1fe7 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -25,8 +25,7 @@ type reqSignal: ThreadSignalPtr respChannel: ChannelSPSCSingle[ptr InterThreadResponse] respSignal: ThreadSignalPtr - -var ctx {.threadvar.}: ptr Context + userData*: pointer # To control when the thread is running var running: Atomic[bool] @@ -70,13 +69,13 @@ proc run(ctx: ptr Context) {.thread.} = tearDownForeignThreadGc() -proc createWakuThread*(): Result[void, string] = +proc createWakuThread*(): Result[ptr Context, string] = ## This proc is called from the main thread and it creates ## the Waku working thread. waku_init() - ctx = createShared(Context, 1) + var ctx = createShared(Context, 1) ctx.reqSignal = ThreadSignalPtr.new().valueOr: return err("couldn't create reqSignal ThreadSignalPtr") ctx.respSignal = ThreadSignalPtr.new().valueOr: @@ -92,16 +91,17 @@ proc createWakuThread*(): Result[void, string] = return err("failed to create the Waku thread: " & getCurrentExceptionMsg()) - return ok() + return ok(ctx) -proc stopWakuNodeThread*() = +proc stopWakuNodeThread*(ctx: ptr Context) = running.store(false) joinThread(ctx.thread) discard ctx.reqSignal.close() discard ctx.respSignal.close() freeShared(ctx) -proc sendRequestToWakuThread*(reqType: RequestType, +proc sendRequestToWakuThread*(ctx: ptr Context, + reqType: RequestType, reqContent: pointer): Result[string, string] = let req = InterThreadRequest.createShared(reqType, reqContent)