feat: eventCallback per wakunode and userData (#2418)

* feat: store event callback in `Context`
* feat: add userData to callbacks
This commit is contained in:
richΛrd 2024-02-13 10:22:22 -04:00 committed by GitHub
parent 66bc31cede
commit 1ac6fb63e2
5 changed files with 39 additions and 38 deletions

View File

@ -294,7 +294,7 @@ int main(int argc, char** argv) {
printf("Bind addr: %s:%u\n", cfgNode.host, cfgNode.port); printf("Bind addr: %s:%u\n", cfgNode.host, cfgNode.port);
printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES": "NO"); printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES": "NO");
waku_set_event_callback(event_handler, userData); waku_set_event_callback(ctx, event_handler, userData);
waku_start(ctx, event_handler, userData); waku_start(ctx, event_handler, userData);
printf("Establishing connection with: %s\n", cfgNode.peers); printf("Establishing connection with: %s\n", cfgNode.peers);

View File

@ -2,4 +2,5 @@
type type
WakuCallBack* = proc(callerRet: cint, WakuCallBack* = proc(callerRet: cint,
msg: ptr cchar, msg: ptr cchar,
len: csize_t) {.cdecl, gcsafe, raises: [].} len: csize_t,
userData: pointer) {.cdecl, gcsafe, raises: [].}

View File

@ -17,7 +17,7 @@
extern "C" { extern "C" {
#endif #endif
typedef void (*WakuCallBack) (int callerRet, const char* msg, size_t len); typedef void (*WakuCallBack) (int callerRet, const char* msg, size_t len, void* userData);
// Creates a new instance of the waku node. // Creates a new instance of the waku node.
// Sets up the waku node from the given configuration. // Sets up the waku node from the given configuration.
@ -39,7 +39,8 @@ int waku_version(void* ctx,
WakuCallBack callback, WakuCallBack callback,
void* userData); void* userData);
void waku_set_event_callback(WakuCallBack callback, void waku_set_event_callback(void* ctx,
WakuCallBack callback,
void* userData); void* userData);
int waku_content_topic(void* ctx, int waku_content_topic(void* ctx,

View File

@ -40,22 +40,19 @@ const RET_MISSING_CALLBACK: cint = 2
################################################################################ ################################################################################
### Not-exported components ### Not-exported components
# May keep a reference to a callback defined externally proc relayEventCallback(ctx: ptr Context): WakuRelayHandler =
var extEventCallback*: WakuCallBack = nil return proc (pubsubTopic: PubsubTopic, msg: WakuMessage): Future[system.void]{.async.} =
# Callback that hadles the Waku Relay events. i.e. messages or errors.
proc relayEventCallback(pubsubTopic: PubsubTopic, if not isNil(ctx[].eventCallback):
msg: WakuMessage): Future[void] {.async.} = try:
# Callback that hadles the Waku Relay events. i.e. messages or errors. let event = $JsonMessageEvent.new(pubsubTopic, msg)
if not isNil(extEventCallback): cast[WakuCallBack](ctx[].eventCallback)(RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), nil)
try: except Exception,CatchableError:
let event = $JsonMessageEvent.new(pubsubTopic, msg) let msg = "Exception when calling 'eventCallBack': " &
extEventCallback(RET_OK, unsafeAddr event[0], cast[csize_t](len(event))) getCurrentExceptionMsg()
except Exception,CatchableError: cast[WakuCallBack](ctx[].eventCallback)(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), nil)
let msg = "Exception when calling 'eventCallBack': " & else:
getCurrentExceptionMsg() error "eventCallback is nil"
extEventCallback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
else:
error "extEventCallback is nil"
### End of not-exported components ### End of not-exported components
################################################################################ ################################################################################
@ -76,7 +73,7 @@ proc waku_new(configJson: cstring,
## Create the Waku thread that will keep waiting for req from the main thread. ## Create the Waku thread that will keep waiting for req from the main thread.
var ctx = waku_thread.createWakuThread().valueOr: var ctx = waku_thread.createWakuThread().valueOr:
let msg = "Error in createWakuThread: " & $error let msg = "Error in createWakuThread: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil return nil
ctx.userData = userData ctx.userData = userData
@ -89,7 +86,7 @@ proc waku_new(configJson: cstring,
configJson)) configJson))
if sendReqRes.isErr(): if sendReqRes.isErr():
let msg = $sendReqRes.error let msg = $sendReqRes.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil return nil
return ctx return ctx
@ -104,12 +101,13 @@ proc waku_version(ctx: ptr Context,
return RET_MISSING_CALLBACK return RET_MISSING_CALLBACK
callback(RET_OK, cast[ptr cchar](WakuNodeVersionString), callback(RET_OK, cast[ptr cchar](WakuNodeVersionString),
cast[csize_t](len(WakuNodeVersionString))) cast[csize_t](len(WakuNodeVersionString)), userData)
return RET_OK return RET_OK
proc waku_set_event_callback(callback: WakuCallBack) {.dynlib, exportc.} = proc waku_set_event_callback(ctx: ptr Context,
extEventCallback = callback callback: WakuCallBack) {.dynlib, exportc.} =
ctx[].eventCallback = cast[pointer](callback)
proc waku_content_topic(ctx: ptr Context, proc waku_content_topic(ctx: ptr Context,
appName: cstring, appName: cstring,
@ -130,7 +128,7 @@ proc waku_content_topic(ctx: ptr Context,
let encodingStr = encoding.alloc() let encodingStr = encoding.alloc()
let contentTopic = fmt"/{$appStr}/{appVersion}/{$ctnStr}/{$encodingStr}" let contentTopic = fmt"/{$appStr}/{appVersion}/{$ctnStr}/{$encodingStr}"
callback(RET_OK, unsafeAddr contentTopic[0], cast[csize_t](len(contentTopic))) callback(RET_OK, unsafeAddr contentTopic[0], cast[csize_t](len(contentTopic)), userData)
deallocShared(appStr) deallocShared(appStr)
deallocShared(ctnStr) deallocShared(ctnStr)
@ -152,7 +150,7 @@ proc waku_pubsub_topic(ctx: ptr Context,
let topicNameStr = topicName.alloc() let topicNameStr = topicName.alloc()
let outPubsubTopic = fmt"/waku/2/{$topicNameStr}" let outPubsubTopic = fmt"/waku/2/{$topicNameStr}"
callback(RET_OK, unsafeAddr outPubsubTopic[0], cast[csize_t](len(outPubsubTopic))) callback(RET_OK, unsafeAddr outPubsubTopic[0], cast[csize_t](len(outPubsubTopic)), userData)
deallocShared(topicNameStr) deallocShared(topicNameStr)
@ -168,7 +166,7 @@ proc waku_default_pubsub_topic(ctx: ptr Context,
if isNil(callback): if isNil(callback):
return RET_MISSING_CALLBACK return RET_MISSING_CALLBACK
callback(RET_OK, cast[ptr cchar](DefaultPubsubTopic), cast[csize_t](len(DefaultPubsubTopic))) callback(RET_OK, cast[ptr cchar](DefaultPubsubTopic), cast[csize_t](len(DefaultPubsubTopic)), userData)
return RET_OK return RET_OK
@ -194,7 +192,7 @@ proc waku_relay_publish(ctx: ptr Context,
except JsonParsingError: except JsonParsingError:
deallocShared(jwm) deallocShared(jwm)
let msg = fmt"Error parsing json message: {getCurrentExceptionMsg()}" let msg = fmt"Error parsing json message: {getCurrentExceptionMsg()}"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR return RET_ERR
deallocShared(jwm) deallocShared(jwm)
@ -215,7 +213,7 @@ proc waku_relay_publish(ctx: ptr Context,
) )
except KeyError: except KeyError:
let msg = fmt"Problem building the WakuMessage: {getCurrentExceptionMsg()}" let msg = fmt"Problem building the WakuMessage: {getCurrentExceptionMsg()}"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR return RET_ERR
let pst = pubSubTopic.alloc() let pst = pubSubTopic.alloc()
@ -230,13 +228,13 @@ proc waku_relay_publish(ctx: ptr Context,
RequestType.RELAY, RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.PUBLISH, RelayRequest.createShared(RelayMsgType.PUBLISH,
PubsubTopic($pst), PubsubTopic($pst),
WakuRelayHandler(relayEventCallback), WakuRelayHandler(relayEventCallback(ctx)),
wakuMessage)) wakuMessage))
deallocShared(pst) deallocShared(pst)
if sendReqRes.isErr(): if sendReqRes.isErr():
let msg = $sendReqRes.error let msg = $sendReqRes.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR return RET_ERR
return RET_OK return RET_OK
@ -274,18 +272,18 @@ proc waku_relay_subscribe(
ctx[].userData = userData ctx[].userData = userData
let pst = pubSubTopic.alloc() let pst = pubSubTopic.alloc()
var cb = relayEventCallback(ctx)
let sendReqRes = waku_thread.sendRequestToWakuThread( let sendReqRes = waku_thread.sendRequestToWakuThread(
ctx, ctx,
RequestType.RELAY, RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.SUBSCRIBE, RelayRequest.createShared(RelayMsgType.SUBSCRIBE,
PubsubTopic($pst), PubsubTopic($pst),
WakuRelayHandler(relayEventCallback))) WakuRelayHandler(cb)))
deallocShared(pst) deallocShared(pst)
if sendReqRes.isErr(): if sendReqRes.isErr():
let msg = $sendReqRes.error let msg = $sendReqRes.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR return RET_ERR
return RET_OK return RET_OK
@ -306,12 +304,12 @@ proc waku_relay_unsubscribe(
RequestType.RELAY, RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.SUBSCRIBE, RelayRequest.createShared(RelayMsgType.SUBSCRIBE,
PubsubTopic($pst), PubsubTopic($pst),
WakuRelayHandler(relayEventCallback))) WakuRelayHandler(relayEventCallback(ctx))))
deallocShared(pst) deallocShared(pst)
if sendReqRes.isErr(): if sendReqRes.isErr():
let msg = $sendReqRes.error let msg = $sendReqRes.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR return RET_ERR
return RET_OK return RET_OK
@ -334,7 +332,7 @@ proc waku_connect(ctx: ptr Context,
chronos.milliseconds(timeoutMs))) chronos.milliseconds(timeoutMs)))
if connRes.isErr(): if connRes.isErr():
let msg = $connRes.error let msg = $connRes.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg))) callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR return RET_ERR
return RET_OK return RET_OK

View File

@ -26,6 +26,7 @@ type
respChannel: ChannelSPSCSingle[ptr InterThreadResponse] respChannel: ChannelSPSCSingle[ptr InterThreadResponse]
respSignal: ThreadSignalPtr respSignal: ThreadSignalPtr
userData*: pointer userData*: pointer
eventCallback*: pointer
# To control when the thread is running # To control when the thread is running
var running: Atomic[bool] var running: Atomic[bool]