chore(cbindings): avoid using global var in libwaku.nim (#2118)

* libwaku: Avoid global variable and changing callback signature

* Better signature for the callback. Two new parameters have been added:
  one aimed to allow passing the caller result code; the other
  param is to pass an optional userData pointer that might need
  to be linked locally with the Context object. For example, this is needed
  in Rust to make the passed closures live as
  long as the Context.

* waku_example.c: adaptation to the latest changes

* libwaku.h: removing 'waku_set_user_data' function

* libwaku.nim: renaming parameter in WakuCallBack (isOk -> callerRet)
This commit is contained in:
Ivan FB 2023-10-23 08:37:28 +02:00 committed by GitHub
parent 1669f710ce
commit 1e8f577104
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 213 additions and 105 deletions

View File

@ -30,6 +30,12 @@ struct ConfigNode {
char peers[2048];
};
// libwaku Context
void* ctx;
// For the case of C language we don't need to store a particular userData
void* userData = NULL;
// Arguments parsing
static char doc[] = "\nC example that shows how to use the waku library.";
static char args_doc[] = "";
@ -78,8 +84,18 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
static struct argp argp = { options, parse_opt, args_doc, doc, 0, 0, 0 };
void event_handler(int callerRet, const char* msg, size_t len) {
if (callerRet == RET_ERR) {
printf("Error: %s\n", msg);
exit(1);
}
else if (callerRet == RET_OK) {
printf("Receiving message %s\n", msg);
}
}
char* contentTopic = NULL;
void handle_content_topic(const char* msg, size_t len) {
void handle_content_topic(int callerRet, const char* msg, size_t len) {
if (contentTopic != NULL) {
free(contentTopic);
}
@ -89,7 +105,7 @@ void handle_content_topic(const char* msg, size_t len) {
}
char* publishResponse = NULL;
void handle_publish_ok(const char* msg, size_t len) {
void handle_publish_ok(int callerRet, const char* msg, size_t len) {
printf("Publish Ok: %s %lu\n", msg, len);
if (publishResponse != NULL) {
@ -100,22 +116,19 @@ void handle_publish_ok(const char* msg, size_t len) {
strcpy(publishResponse, msg);
}
void handle_error(const char* msg, size_t len) {
printf("Error: %s\n", msg);
exit(1);
}
#define MAX_MSG_SIZE 65535
void publish_message(char* pubsubTopic, const char* msg) {
char jsonWakuMsg[MAX_MSG_SIZE];
char *msgPayload = b64_encode(msg, strlen(msg));
WAKU_CALL( waku_content_topic("appName",
WAKU_CALL( waku_content_topic(RET_OK,
"appName",
1,
"contentTopicName",
"encoding",
handle_content_topic) );
handle_content_topic,
userData) );
snprintf(jsonWakuMsg,
MAX_MSG_SIZE,
@ -124,10 +137,12 @@ void publish_message(char* pubsubTopic, const char* msg) {
free(msgPayload);
WAKU_CALL( waku_relay_publish(pubsubTopic,
WAKU_CALL( waku_relay_publish(&ctx,
pubsubTopic,
jsonWakuMsg,
10000 /*timeout ms*/,
handle_error) );
event_handler,
userData) );
printf("waku relay response [%s]\n", publishResponse);
}
@ -137,15 +152,11 @@ void show_help_and_exit() {
exit(1);
}
void event_handler(const char* msg, size_t len) {
printf("Receiving message %s\n", msg);
}
void print_default_pubsub_topic(const char* msg, size_t len) {
void print_default_pubsub_topic(int callerRet, const char* msg, size_t len) {
printf("Default pubsub topic: %s\n", msg);
}
void print_waku_version(const char* msg, size_t len) {
void print_waku_version(int callerRet, const char* msg, size_t len) {
printf("Git Version: %s\n", msg);
}
@ -186,8 +197,10 @@ void handle_user_input() {
char pubsubTopic[128];
scanf("%127s", pubsubTopic);
WAKU_CALL( waku_relay_subscribe(pubsubTopic,
handle_error) );
WAKU_CALL( waku_relay_subscribe(&ctx,
pubsubTopic,
event_handler,
userData) );
printf("The subscription went well\n");
show_main_menu();
@ -199,7 +212,7 @@ void handle_user_input() {
printf("e.g.: /ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN\n");
char peerAddr[512];
scanf("%511s", peerAddr);
WAKU_CALL(waku_connect(peerAddr, 10000 /* timeoutMs */, handle_error));
WAKU_CALL(waku_connect(&ctx, peerAddr, 10000 /* timeoutMs */, event_handler, userData));
show_main_menu();
break;
@ -239,6 +252,8 @@ int main(int argc, char** argv) {
show_help_and_exit();
}
ctx = waku_init(event_handler, userData);
char jsonConfig[1024];
snprintf(jsonConfig, 1024, "{ \
\"host\": \"%s\", \
@ -250,25 +265,29 @@ int main(int argc, char** argv) {
cfgNode.key,
cfgNode.relay ? "true":"false");
WAKU_CALL( waku_default_pubsub_topic(print_default_pubsub_topic) );
WAKU_CALL( waku_version(print_waku_version) );
WAKU_CALL( waku_default_pubsub_topic(&ctx, print_default_pubsub_topic, userData) );
WAKU_CALL( waku_version(&ctx, print_waku_version, userData) );
printf("Bind addr: %s:%u\n", cfgNode.host, cfgNode.port);
printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES": "NO");
WAKU_CALL( waku_new(jsonConfig, handle_error) );
WAKU_CALL( waku_new(&ctx, jsonConfig, event_handler, userData) );
waku_set_event_callback(event_handler);
waku_start();
waku_set_event_callback(event_handler, userData);
waku_start(&ctx, event_handler, userData);
printf("Establishing connection with: %s\n", cfgNode.peers);
WAKU_CALL( waku_connect(cfgNode.peers,
WAKU_CALL( waku_connect(&ctx,
cfgNode.peers,
10000 /* timeoutMs */,
handle_error) );
event_handler,
userData) );
WAKU_CALL( waku_relay_subscribe("/waku/2/default-waku/proto",
handle_error) );
WAKU_CALL( waku_relay_subscribe(&ctx,
"/waku/2/default-waku/proto",
event_handler,
userData) );
show_main_menu();
while(1) {
handle_user_input();

View File

@ -15,45 +15,73 @@
extern "C" {
#endif
typedef void (*WakuCallBack) (const char* msg, size_t len_0);
typedef void (*WakuCallBack) (int callerRet, const char* msg, size_t len);
// Initializes the waku library and returns a pointer to the Context.
void* waku_init(WakuCallBack callback,
void* userData);
// Creates a new instance of the waku node.
// Sets up the waku node from the given configuration.
int waku_new(const char* configJson, WakuCallBack onErrCb);
int waku_new(void* ctx,
const char* configJson,
WakuCallBack callback,
void* userData);
void waku_start(void);
int waku_start(void* ctx,
WakuCallBack callback,
void* userData);
void waku_stop(void);
int waku_stop(void* ctx,
WakuCallBack callback,
void* userData);
int waku_version(WakuCallBack onOkCb);
int waku_version(void* ctx,
WakuCallBack callback,
void* userData);
void waku_set_event_callback(WakuCallBack callback);
void waku_set_event_callback(WakuCallBack callback,
void* userData);
int waku_content_topic(const char* appName,
int waku_content_topic(void* ctx,
const char* appName,
unsigned int appVersion,
const char* contentTopicName,
const char* encoding,
WakuCallBack onOkCb);
WakuCallBack callback,
void* userData);
int waku_pubsub_topic(const char* topicName,
WakuCallBack onOkCb);
int waku_pubsub_topic(void* ctx,
const char* topicName,
WakuCallBack callback,
void* userData);
int waku_default_pubsub_topic(WakuCallBack onOkCb);
int waku_default_pubsub_topic(void* ctx,
WakuCallBack callback,
void* userData);
int waku_relay_publish(const char* pubSubTopic,
int waku_relay_publish(void* ctx,
const char* pubSubTopic,
const char* jsonWakuMessage,
unsigned int timeoutMs,
WakuCallBack onErrCb);
WakuCallBack callback,
void* userData);
int waku_relay_subscribe(const char* pubSubTopic,
WakuCallBack onErrCb);
int waku_relay_subscribe(void* ctx,
const char* pubSubTopic,
WakuCallBack callback,
void* userData);
int waku_relay_unsubscribe(const char* pubSubTopic,
WakuCallBack onErrCb);
int waku_relay_unsubscribe(void* ctx,
const char* pubSubTopic,
WakuCallBack callback,
void* userData);
int waku_connect(const char* peerMultiAddr,
int waku_connect(void* ctx,
const char* peerMultiAddr,
unsigned int timeoutMs,
WakuCallBack onErrCb);
WakuCallBack callback,
void* userData);
#ifdef __cplusplus
}

View File

@ -4,8 +4,7 @@
{.passc: "-fPIC".}
import
std/[json,sequtils,times,strformat,options,atomics,strutils],
strutils
std/[json,sequtils,times,strformat,options,atomics,strutils]
import
chronicles,
chronos
@ -34,7 +33,9 @@ const RET_ERR: cint = 1
const RET_MISSING_CALLBACK: cint = 2
type
WakuCallBack* = proc(msg: ptr cchar, len: csize_t) {.cdecl, gcsafe.}
WakuCallBack* = proc(callerRet: cint,
msg: ptr cchar,
len: csize_t) {.cdecl, gcsafe.}
### End of exported types
################################################################################
@ -51,10 +52,11 @@ proc relayEventCallback(pubsubTopic: PubsubTopic,
if not isNil(extEventCallback):
try:
let event = $JsonMessageEvent.new(pubsubTopic, msg)
extEventCallback(unsafeAddr event[0], cast[csize_t](len(event)))
extEventCallback(RET_OK, unsafeAddr event[0], cast[csize_t](len(event)))
except Exception,CatchableError:
error "Exception when calling 'eventCallBack': " &
getCurrentExceptionMsg()
let msg = "Exception when calling 'eventCallBack': " &
getCurrentExceptionMsg()
extEventCallback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
else:
error "extEventCallback is nil"
@ -64,53 +66,72 @@ proc relayEventCallback(pubsubTopic: PubsubTopic,
################################################################################
### Exported procs
proc waku_new(configJson: cstring,
onErrCb: WakuCallback): cint
{.dynlib, exportc, cdecl.} =
# Creates a new instance of the WakuNode.
# Notice that the ConfigNode type is also exported and available for users.
if isNil(onErrCb):
return RET_MISSING_CALLBACK
proc waku_init(callback: WakuCallback): pointer {.dynlib, exportc, cdecl.} =
## Initializes the waku library.
## Create the Waku thread that will keep waiting for req from the main thread.
waku_thread.createWakuThread().isOkOr:
var ctx = waku_thread.createWakuThread().valueOr:
let msg = "Error in createWakuThread: " & $error
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
return nil
return ctx
proc waku_new(ctx: ptr ptr Context,
configJson: cstring,
callback: WakuCallback,
userData: pointer): cint
{.dynlib, exportc, cdecl.} =
## Creates a new instance of the WakuNode.
## Notice that the ConfigNode type is also exported and available for users.
ctx[][].userData = userData
if isNil(callback):
return RET_MISSING_CALLBACK
let sendReqRes = waku_thread.sendRequestToWakuThread(
ctx[],
RequestType.LIFECYCLE,
NodeLifecycleRequest.createShared(
NodeLifecycleMsgType.CREATE_NODE,
configJson))
if sendReqRes.isErr():
let msg = $sendReqRes.error
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR
return RET_OK
proc waku_version(onOkCb: WakuCallBack): cint {.dynlib, exportc.} =
if isNil(onOkCb):
proc waku_version(ctx: ptr ptr Context,
callback: WakuCallBack,
userData: pointer): cint {.dynlib, exportc.} =
ctx[][].userData = userData
if isNil(callback):
return RET_MISSING_CALLBACK
onOkCb(cast[ptr cchar](WakuNodeVersionString),
cast[csize_t](len(WakuNodeVersionString)))
callback(RET_OK, cast[ptr cchar](WakuNodeVersionString),
cast[csize_t](len(WakuNodeVersionString)))
return RET_OK
proc waku_set_event_callback(callback: WakuCallBack) {.dynlib, exportc.} =
extEventCallback = callback
proc waku_content_topic(appName: cstring,
proc waku_content_topic(ctx: ptr ptr Context,
appName: cstring,
appVersion: cuint,
contentTopicName: cstring,
encoding: cstring,
onOkCb: WakuCallBack): cint {.dynlib, exportc.} =
callback: WakuCallBack,
userData: pointer): cint {.dynlib, exportc.} =
# https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding
if isNil(onOkCb):
ctx[][].userData = userData
if isNil(callback):
return RET_MISSING_CALLBACK
let appStr = appName.alloc()
@ -118,7 +139,7 @@ proc waku_content_topic(appName: cstring,
let encodingStr = encoding.alloc()
let contentTopic = fmt"/{$appStr}/{appVersion}/{$ctnStr}/{$encodingStr}"
onOkCb(unsafeAddr contentTopic[0], cast[csize_t](len(contentTopic)))
callback(RET_OK, unsafeAddr contentTopic[0], cast[csize_t](len(contentTopic)))
deallocShared(appStr)
deallocShared(ctnStr)
@ -126,40 +147,53 @@ proc waku_content_topic(appName: cstring,
return RET_OK
proc waku_pubsub_topic(topicName: cstring,
onOkCb: WakuCallBack): cint {.dynlib, exportc, cdecl.} =
if isNil(onOkCb):
proc waku_pubsub_topic(ctx: ptr ptr Context,
topicName: cstring,
callback: WakuCallBack,
userData: pointer): cint {.dynlib, exportc, cdecl.} =
# https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding
ctx[][].userData = userData
if isNil(callback):
return RET_MISSING_CALLBACK
let topicNameStr = topicName.alloc()
# https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding
let outPubsubTopic = fmt"/waku/2/{$topicNameStr}"
onOkCb(unsafeAddr outPubsubTopic[0], cast[csize_t](len(outPubsubTopic)))
callback(RET_OK, unsafeAddr outPubsubTopic[0], cast[csize_t](len(outPubsubTopic)))
deallocShared(topicNameStr)
return RET_OK
proc waku_default_pubsub_topic(onOkCb: WakuCallBack): cint {.dynlib, exportc.} =
proc waku_default_pubsub_topic(ctx: ptr ptr Context,
callback: WakuCallBack,
userData: pointer): cint {.dynlib, exportc.} =
# https://rfc.vac.dev/spec/36/#extern-char-waku_default_pubsub_topic
if isNil(onOkCb):
ctx[][].userData = userData
if isNil(callback):
return RET_MISSING_CALLBACK
onOkCb(cast[ptr cchar](DefaultPubsubTopic),
cast[csize_t](len(DefaultPubsubTopic)))
callback(RET_OK, cast[ptr cchar](DefaultPubsubTopic), cast[csize_t](len(DefaultPubsubTopic)))
return RET_OK
proc waku_relay_publish(pubSubTopic: cstring,
proc waku_relay_publish(ctx: ptr ptr Context,
pubSubTopic: cstring,
jsonWakuMessage: cstring,
timeoutMs: cuint,
onErrCb: WakuCallBack): cint
callback: WakuCallBack,
userData: pointer): cint
{.dynlib, exportc, cdecl.} =
# https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms
if isNil(onErrCb):
ctx[][].userData = userData
if isNil(callback):
return RET_MISSING_CALLBACK
let jwm = jsonWakuMessage.alloc()
@ -169,7 +203,7 @@ proc waku_relay_publish(pubSubTopic: cstring,
except JsonParsingError:
deallocShared(jwm)
let msg = fmt"Error parsing json message: {getCurrentExceptionMsg()}"
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR
deallocShared(jwm)
@ -190,7 +224,7 @@ proc waku_relay_publish(pubSubTopic: cstring,
)
except KeyError:
let msg = fmt"Problem building the WakuMessage: {getCurrentExceptionMsg()}"
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR
let pst = pubSubTopic.alloc()
@ -201,6 +235,7 @@ proc waku_relay_publish(pubSubTopic: cstring,
$pst
let sendReqRes = waku_thread.sendRequestToWakuThread(
ctx[],
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.PUBLISH,
PubsubTopic($pst),
@ -210,31 +245,47 @@ proc waku_relay_publish(pubSubTopic: cstring,
if sendReqRes.isErr():
let msg = $sendReqRes.error
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR
return RET_OK
proc waku_start() {.dynlib, exportc.} =
proc waku_start(ctx: ptr ptr Context,
callback: WakuCallBack,
userData: pointer): cint {.dynlib, exportc.} =
ctx[][].userData = userData
## TODO: handle the error
discard waku_thread.sendRequestToWakuThread(
ctx[],
RequestType.LIFECYCLE,
NodeLifecycleRequest.createShared(
NodeLifecycleMsgType.START_NODE))
proc waku_stop() {.dynlib, exportc.} =
proc waku_stop(ctx: ptr ptr Context,
callback: WakuCallBack,
userData: pointer): cint {.dynlib, exportc.} =
ctx[][].userData = userData
## TODO: handle the error
discard waku_thread.sendRequestToWakuThread(
ctx[],
RequestType.LIFECYCLE,
NodeLifecycleRequest.createShared(
NodeLifecycleMsgType.STOP_NODE))
proc waku_relay_subscribe(
ctx: ptr ptr Context,
pubSubTopic: cstring,
onErrCb: WakuCallBack): cint
callback: WakuCallBack,
userData: pointer): cint
{.dynlib, exportc.} =
ctx[][].userData = userData
let pst = pubSubTopic.alloc()
let sendReqRes = waku_thread.sendRequestToWakuThread(
ctx[],
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.SUBSCRIBE,
PubsubTopic($pst),
@ -243,19 +294,24 @@ proc waku_relay_subscribe(
if sendReqRes.isErr():
let msg = $sendReqRes.error
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR
return RET_OK
proc waku_relay_unsubscribe(
ctx: ptr ptr Context,
pubSubTopic: cstring,
onErrCb: WakuCallBack): cint
callback: WakuCallBack,
userData: pointer): cint
{.dynlib, exportc.} =
ctx[][].userData = userData
let pst = pubSubTopic.alloc()
let sendReqRes = waku_thread.sendRequestToWakuThread(
ctx[],
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.SUBSCRIBE,
PubsubTopic($pst),
@ -264,17 +320,22 @@ proc waku_relay_unsubscribe(
if sendReqRes.isErr():
let msg = $sendReqRes.error
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR
return RET_OK
proc waku_connect(peerMultiAddr: cstring,
proc waku_connect(ctx: ptr ptr Context,
peerMultiAddr: cstring,
timeoutMs: cuint,
onErrCb: WakuCallBack): cint
callback: WakuCallBack,
userData: pointer): cint
{.dynlib, exportc.} =
ctx[][].userData = userData
let connRes = waku_thread.sendRequestToWakuThread(
ctx[],
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(
PeerManagementMsgType.CONNECT_TO,
@ -282,7 +343,7 @@ proc waku_connect(peerMultiAddr: cstring,
chronos.milliseconds(timeoutMs)))
if connRes.isErr():
let msg = $connRes.error
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR
return RET_OK

View File

@ -25,8 +25,7 @@ type
reqSignal: ThreadSignalPtr
respChannel: ChannelSPSCSingle[ptr InterThreadResponse]
respSignal: ThreadSignalPtr
var ctx {.threadvar.}: ptr Context
userData*: pointer
# To control when the thread is running
var running: Atomic[bool]
@ -70,13 +69,13 @@ proc run(ctx: ptr Context) {.thread.} =
tearDownForeignThreadGc()
proc createWakuThread*(): Result[void, string] =
proc createWakuThread*(): Result[ptr Context, string] =
## This proc is called from the main thread and it creates
## the Waku working thread.
waku_init()
ctx = createShared(Context, 1)
var ctx = createShared(Context, 1)
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqSignal ThreadSignalPtr")
ctx.respSignal = ThreadSignalPtr.new().valueOr:
@ -92,16 +91,17 @@ proc createWakuThread*(): Result[void, string] =
return err("failed to create the Waku thread: " & getCurrentExceptionMsg())
return ok()
return ok(ctx)
proc stopWakuNodeThread*() =
proc stopWakuNodeThread*(ctx: ptr Context) =
running.store(false)
joinThread(ctx.thread)
discard ctx.reqSignal.close()
discard ctx.respSignal.close()
freeShared(ctx)
proc sendRequestToWakuThread*(reqType: RequestType,
proc sendRequestToWakuThread*(ctx: ptr Context,
reqType: RequestType,
reqContent: pointer): Result[string, string] =
let req = InterThreadRequest.createShared(reqType, reqContent)