From 707f3e8bf08a2b961114e6a99d3eddc10a2d3825 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Tue, 13 Feb 2024 10:22:22 -0400 Subject: [PATCH] feat: `eventCallback` per wakunode and `userData` (#2418) * feat: store event callback in `Context` * feat: add userData to callbacks --- examples/cbindings/waku_example.c | 2 +- library/callback.nim | 3 +- library/libwaku.h | 5 ++- library/libwaku.nim | 66 ++++++++++++++--------------- library/waku_thread/waku_thread.nim | 1 + 5 files changed, 39 insertions(+), 38 deletions(-) diff --git a/examples/cbindings/waku_example.c b/examples/cbindings/waku_example.c index bca7f771a..7053dd71c 100644 --- a/examples/cbindings/waku_example.c +++ b/examples/cbindings/waku_example.c @@ -294,7 +294,7 @@ int main(int argc, char** argv) { printf("Bind addr: %s:%u\n", cfgNode.host, cfgNode.port); printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES": "NO"); - waku_set_event_callback(event_handler, userData); + waku_set_event_callback(ctx, event_handler, userData); waku_start(ctx, event_handler, userData); printf("Establishing connection with: %s\n", cfgNode.peers); diff --git a/library/callback.nim b/library/callback.nim index 55301e7bb..e811dc06f 100644 --- a/library/callback.nim +++ b/library/callback.nim @@ -2,4 +2,5 @@ type WakuCallBack* = proc(callerRet: cint, msg: ptr cchar, - len: csize_t) {.cdecl, gcsafe, raises: [].} + len: csize_t, + userData: pointer) {.cdecl, gcsafe, raises: [].} diff --git a/library/libwaku.h b/library/libwaku.h index feb197307..4a30a8ad5 100644 --- a/library/libwaku.h +++ b/library/libwaku.h @@ -17,7 +17,7 @@ extern "C" { #endif -typedef void (*WakuCallBack) (int callerRet, const char* msg, size_t len); +typedef void (*WakuCallBack) (int callerRet, const char* msg, size_t len, void* userData); // Creates a new instance of the waku node. // Sets up the waku node from the given configuration. @@ -39,7 +39,8 @@ int waku_version(void* ctx, WakuCallBack callback, void* userData); -void waku_set_event_callback(WakuCallBack callback, +void waku_set_event_callback(void* ctx, + WakuCallBack callback, void* userData); int waku_content_topic(void* ctx, diff --git a/library/libwaku.nim b/library/libwaku.nim index eab08ed04..976948bff 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -40,22 +40,19 @@ const RET_MISSING_CALLBACK: cint = 2 ################################################################################ ### Not-exported components -# May keep a reference to a callback defined externally -var extEventCallback*: WakuCallBack = nil - -proc relayEventCallback(pubsubTopic: PubsubTopic, - msg: WakuMessage): Future[void] {.async.} = - # Callback that hadles the Waku Relay events. i.e. messages or errors. - if not isNil(extEventCallback): - try: - let event = $JsonMessageEvent.new(pubsubTopic, msg) - extEventCallback(RET_OK, unsafeAddr event[0], cast[csize_t](len(event))) - except Exception,CatchableError: - let msg = "Exception when calling 'eventCallBack': " & - getCurrentExceptionMsg() - extEventCallback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) - else: - error "extEventCallback is nil" +proc relayEventCallback(ctx: ptr Context): WakuRelayHandler = + return proc (pubsubTopic: PubsubTopic, msg: WakuMessage): Future[system.void]{.async.} = + # Callback that hadles the Waku Relay events. i.e. messages or errors. + if not isNil(ctx[].eventCallback): + try: + let event = $JsonMessageEvent.new(pubsubTopic, msg) + cast[WakuCallBack](ctx[].eventCallback)(RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), nil) + except Exception,CatchableError: + let msg = "Exception when calling 'eventCallBack': " & + getCurrentExceptionMsg() + cast[WakuCallBack](ctx[].eventCallback)(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), nil) + else: + error "eventCallback is nil" ### End of not-exported components ################################################################################ @@ -76,7 +73,7 @@ proc waku_new(configJson: cstring, ## Create the Waku thread that will keep waiting for req from the main thread. var ctx = waku_thread.createWakuThread().valueOr: let msg = "Error in createWakuThread: " & $error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return nil ctx.userData = userData @@ -89,7 +86,7 @@ proc waku_new(configJson: cstring, configJson)) if sendReqRes.isErr(): let msg = $sendReqRes.error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return nil return ctx @@ -104,12 +101,13 @@ proc waku_version(ctx: ptr Context, return RET_MISSING_CALLBACK callback(RET_OK, cast[ptr cchar](WakuNodeVersionString), - cast[csize_t](len(WakuNodeVersionString))) + cast[csize_t](len(WakuNodeVersionString)), userData) return RET_OK -proc waku_set_event_callback(callback: WakuCallBack) {.dynlib, exportc.} = - extEventCallback = callback +proc waku_set_event_callback(ctx: ptr Context, + callback: WakuCallBack) {.dynlib, exportc.} = + ctx[].eventCallback = cast[pointer](callback) proc waku_content_topic(ctx: ptr Context, appName: cstring, @@ -130,7 +128,7 @@ proc waku_content_topic(ctx: ptr Context, let encodingStr = encoding.alloc() let contentTopic = fmt"/{$appStr}/{appVersion}/{$ctnStr}/{$encodingStr}" - callback(RET_OK, unsafeAddr contentTopic[0], cast[csize_t](len(contentTopic))) + callback(RET_OK, unsafeAddr contentTopic[0], cast[csize_t](len(contentTopic)), userData) deallocShared(appStr) deallocShared(ctnStr) @@ -152,7 +150,7 @@ proc waku_pubsub_topic(ctx: ptr Context, let topicNameStr = topicName.alloc() let outPubsubTopic = fmt"/waku/2/{$topicNameStr}" - callback(RET_OK, unsafeAddr outPubsubTopic[0], cast[csize_t](len(outPubsubTopic))) + callback(RET_OK, unsafeAddr outPubsubTopic[0], cast[csize_t](len(outPubsubTopic)), userData) deallocShared(topicNameStr) @@ -168,7 +166,7 @@ proc waku_default_pubsub_topic(ctx: ptr Context, if isNil(callback): return RET_MISSING_CALLBACK - callback(RET_OK, cast[ptr cchar](DefaultPubsubTopic), cast[csize_t](len(DefaultPubsubTopic))) + callback(RET_OK, cast[ptr cchar](DefaultPubsubTopic), cast[csize_t](len(DefaultPubsubTopic)), userData) return RET_OK @@ -194,7 +192,7 @@ proc waku_relay_publish(ctx: ptr Context, except JsonParsingError: deallocShared(jwm) let msg = fmt"Error parsing json message: {getCurrentExceptionMsg()}" - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR deallocShared(jwm) @@ -215,7 +213,7 @@ proc waku_relay_publish(ctx: ptr Context, ) except KeyError: let msg = fmt"Problem building the WakuMessage: {getCurrentExceptionMsg()}" - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR let pst = pubSubTopic.alloc() @@ -230,13 +228,13 @@ proc waku_relay_publish(ctx: ptr Context, RequestType.RELAY, RelayRequest.createShared(RelayMsgType.PUBLISH, PubsubTopic($pst), - WakuRelayHandler(relayEventCallback), + WakuRelayHandler(relayEventCallback(ctx)), wakuMessage)) deallocShared(pst) if sendReqRes.isErr(): let msg = $sendReqRes.error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR return RET_OK @@ -274,18 +272,18 @@ proc waku_relay_subscribe( ctx[].userData = userData let pst = pubSubTopic.alloc() - + var cb = relayEventCallback(ctx) let sendReqRes = waku_thread.sendRequestToWakuThread( ctx, RequestType.RELAY, RelayRequest.createShared(RelayMsgType.SUBSCRIBE, PubsubTopic($pst), - WakuRelayHandler(relayEventCallback))) + WakuRelayHandler(cb))) deallocShared(pst) if sendReqRes.isErr(): let msg = $sendReqRes.error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR return RET_OK @@ -306,12 +304,12 @@ proc waku_relay_unsubscribe( RequestType.RELAY, RelayRequest.createShared(RelayMsgType.SUBSCRIBE, PubsubTopic($pst), - WakuRelayHandler(relayEventCallback))) + WakuRelayHandler(relayEventCallback(ctx)))) deallocShared(pst) if sendReqRes.isErr(): let msg = $sendReqRes.error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR return RET_OK @@ -334,7 +332,7 @@ proc waku_connect(ctx: ptr Context, chronos.milliseconds(timeoutMs))) if connRes.isErr(): let msg = $connRes.error - callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) + callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_ERR return RET_OK diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index 807fb1fe7..d69c7099e 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -26,6 +26,7 @@ type respChannel: ChannelSPSCSingle[ptr InterThreadResponse] respSignal: ThreadSignalPtr userData*: pointer + eventCallback*: pointer # To control when the thread is running var running: Atomic[bool]