Start using nim-ffi to implement libwaku (#3656)

* deep changes in libwaku to adap to nim-ffi
* start using ffi pragma in library
* update some binding examples
* add missing declare_lib.nim file
* properly rename api files in library folder
This commit is contained in:
Ivan FB 2025-12-19 17:00:43 +01:00 committed by GitHub
parent 834eea945d
commit e3dd6203ae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 1441 additions and 2752 deletions

5
.gitmodules vendored
View File

@ -184,3 +184,8 @@
url = https://github.com/logos-messaging/waku-rlnv2-contract.git url = https://github.com/logos-messaging/waku-rlnv2-contract.git
ignore = untracked ignore = untracked
branch = master branch = master
[submodule "vendor/nim-ffi"]
path = vendor/nim-ffi
url = https://github.com/logos-messaging/nim-ffi/
ignore = untracked
branch = master

View File

@ -19,9 +19,11 @@ pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int callback_executed = 0; int callback_executed = 0;
void waitForCallback() { void waitForCallback()
{
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
while (!callback_executed) { while (!callback_executed)
{
pthread_cond_wait(&cond, &mutex); pthread_cond_wait(&cond, &mutex);
} }
callback_executed = 0; callback_executed = 0;
@ -29,16 +31,19 @@ void waitForCallback() {
} }
#define WAKU_CALL(call) \ #define WAKU_CALL(call) \
do { \ do \
{ \
int ret = call; \ int ret = call; \
if (ret != 0) { \ if (ret != 0) \
{ \
printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \ printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \
exit(1); \ exit(1); \
} \ } \
waitForCallback(); \ waitForCallback(); \
} while (0) } while (0)
struct ConfigNode { struct ConfigNode
{
char host[128]; char host[128];
int port; int port;
char key[128]; char key[128];
@ -70,13 +75,14 @@ static struct argp_option options[] = {
{"relay", 'r', "RELAY", 0, "Enable relay protocol: 1 or 0. (default: 1)"}, {"relay", 'r', "RELAY", 0, "Enable relay protocol: 1 or 0. (default: 1)"},
{"peers", 'a', "PEERS", 0, "Comma-separated list of peer-multiaddress to connect\ {"peers", 'a', "PEERS", 0, "Comma-separated list of peer-multiaddress to connect\
to. (default: \"\") e.g. \"/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN\""}, to. (default: \"\") e.g. \"/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN\""},
{ 0 } {0}};
};
static error_t parse_opt(int key, char *arg, struct argp_state *state) { static error_t parse_opt(int key, char *arg, struct argp_state *state)
{
struct ConfigNode *cfgNode = state->input; struct ConfigNode *cfgNode = state->input;
switch (key) { switch (key)
{
case 'h': case 'h':
snprintf(cfgNode->host, 128, "%s", arg); snprintf(cfgNode->host, 128, "%s", arg);
break; break;
@ -105,7 +111,8 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
return 0; return 0;
} }
void signal_cond() { void signal_cond()
{
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
callback_executed = 1; callback_executed = 1;
pthread_cond_signal(&cond); pthread_cond_signal(&cond);
@ -114,31 +121,39 @@ void signal_cond() {
static struct argp argp = {options, parse_opt, args_doc, doc, 0, 0, 0}; static struct argp argp = {options, parse_opt, args_doc, doc, 0, 0, 0};
void event_handler(int callerRet, const char* msg, size_t len, void* userData) { void event_handler(int callerRet, const char *msg, size_t len, void *userData)
if (callerRet == RET_ERR) { {
if (callerRet == RET_ERR)
{
printf("Error: %s\n", msg); printf("Error: %s\n", msg);
exit(1); exit(1);
} }
else if (callerRet == RET_OK) { else if (callerRet == RET_OK)
{
printf("Receiving event: %s\n", msg); printf("Receiving event: %s\n", msg);
} }
signal_cond(); signal_cond();
} }
void on_event_received(int callerRet, const char* msg, size_t len, void* userData) { void on_event_received(int callerRet, const char *msg, size_t len, void *userData)
if (callerRet == RET_ERR) { {
if (callerRet == RET_ERR)
{
printf("Error: %s\n", msg); printf("Error: %s\n", msg);
exit(1); exit(1);
} }
else if (callerRet == RET_OK) { else if (callerRet == RET_OK)
{
printf("Receiving event: %s\n", msg); printf("Receiving event: %s\n", msg);
} }
} }
char *contentTopic = NULL; char *contentTopic = NULL;
void handle_content_topic(int callerRet, const char* msg, size_t len, void* userData) { void handle_content_topic(int callerRet, const char *msg, size_t len, void *userData)
if (contentTopic != NULL) { {
if (contentTopic != NULL)
{
free(contentTopic); free(contentTopic);
} }
@ -148,10 +163,12 @@ void handle_content_topic(int callerRet, const char* msg, size_t len, void* user
} }
char *publishResponse = NULL; char *publishResponse = NULL;
void handle_publish_ok(int callerRet, const char* msg, size_t len, void* userData) { void handle_publish_ok(int callerRet, const char *msg, size_t len, void *userData)
{
printf("Publish Ok: %s %lu\n", msg, len); printf("Publish Ok: %s %lu\n", msg, len);
if (publishResponse != NULL) { if (publishResponse != NULL)
{
free(publishResponse); free(publishResponse);
} }
@ -161,17 +178,18 @@ void handle_publish_ok(int callerRet, const char* msg, size_t len, void* userDat
#define MAX_MSG_SIZE 65535 #define MAX_MSG_SIZE 65535
void publish_message(const char* msg) { void publish_message(const char *msg)
{
char jsonWakuMsg[MAX_MSG_SIZE]; char jsonWakuMsg[MAX_MSG_SIZE];
char *msgPayload = b64_encode(msg, strlen(msg)); char *msgPayload = b64_encode(msg, strlen(msg));
WAKU_CALL(waku_content_topic(ctx, WAKU_CALL(waku_content_topic(ctx,
handle_content_topic,
userData,
"appName", "appName",
1, 1,
"contentTopicName", "contentTopicName",
"encoding", "encoding"));
handle_content_topic,
userData) );
snprintf(jsonWakuMsg, snprintf(jsonWakuMsg,
MAX_MSG_SIZE, MAX_MSG_SIZE,
"{\"payload\":\"%s\",\"contentTopic\":\"%s\"}", "{\"payload\":\"%s\",\"contentTopic\":\"%s\"}",
@ -180,31 +198,35 @@ void publish_message(const char* msg) {
free(msgPayload); free(msgPayload);
WAKU_CALL(waku_relay_publish(ctx, WAKU_CALL(waku_relay_publish(ctx,
event_handler,
userData,
"/waku/2/rs/16/32", "/waku/2/rs/16/32",
jsonWakuMsg, jsonWakuMsg,
10000 /*timeout ms*/, 10000 /*timeout ms*/));
event_handler,
userData) );
} }
void show_help_and_exit() { void show_help_and_exit()
{
printf("Wrong parameters\n"); printf("Wrong parameters\n");
exit(1); exit(1);
} }
void print_default_pubsub_topic(int callerRet, const char* msg, size_t len, void* userData) { void print_default_pubsub_topic(int callerRet, const char *msg, size_t len, void *userData)
{
printf("Default pubsub topic: %s\n", msg); printf("Default pubsub topic: %s\n", msg);
signal_cond(); signal_cond();
} }
void print_waku_version(int callerRet, const char* msg, size_t len, void* userData) { void print_waku_version(int callerRet, const char *msg, size_t len, void *userData)
{
printf("Git Version: %s\n", msg); printf("Git Version: %s\n", msg);
signal_cond(); signal_cond();
} }
// Beginning of UI program logic // Beginning of UI program logic
enum PROGRAM_STATE { enum PROGRAM_STATE
{
MAIN_MENU, MAIN_MENU,
SUBSCRIBE_TOPIC_MENU, SUBSCRIBE_TOPIC_MENU,
CONNECT_TO_OTHER_NODE_MENU, CONNECT_TO_OTHER_NODE_MENU,
@ -213,18 +235,21 @@ enum PROGRAM_STATE {
enum PROGRAM_STATE current_state = MAIN_MENU; enum PROGRAM_STATE current_state = MAIN_MENU;
void show_main_menu() { void show_main_menu()
{
printf("\nPlease, select an option:\n"); printf("\nPlease, select an option:\n");
printf("\t1.) Subscribe to topic\n"); printf("\t1.) Subscribe to topic\n");
printf("\t2.) Connect to other node\n"); printf("\t2.) Connect to other node\n");
printf("\t3.) Publish a message\n"); printf("\t3.) Publish a message\n");
} }
void handle_user_input() { void handle_user_input()
{
char cmd[1024]; char cmd[1024];
memset(cmd, 0, 1024); memset(cmd, 0, 1024);
int numRead = read(0, cmd, 1024); int numRead = read(0, cmd, 1024);
if (numRead <= 0) { if (numRead <= 0)
{
return; return;
} }
@ -237,9 +262,9 @@ void handle_user_input() {
scanf("%127s", pubsubTopic); scanf("%127s", pubsubTopic);
WAKU_CALL(waku_relay_subscribe(ctx, WAKU_CALL(waku_relay_subscribe(ctx,
pubsubTopic,
event_handler, event_handler,
userData) ); userData,
pubsubTopic));
printf("The subscription went well\n"); printf("The subscription went well\n");
show_main_menu(); show_main_menu();
@ -247,11 +272,11 @@ void handle_user_input() {
break; break;
case CONNECT_TO_OTHER_NODE_MENU: case CONNECT_TO_OTHER_NODE_MENU:
printf("Connecting to a node. Please indicate the peer Multiaddress:\n"); // printf("Connecting to a node. Please indicate the peer Multiaddress:\n");
printf("e.g.: /ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN\n"); // printf("e.g.: /ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN\n");
char peerAddr[512]; // char peerAddr[512];
scanf("%511s", peerAddr); // scanf("%511s", peerAddr);
WAKU_CALL(waku_connect(ctx, peerAddr, 10000 /* timeoutMs */, event_handler, userData)); // WAKU_CALL(waku_connect(ctx, peerAddr, 10000 /* timeoutMs */, event_handler, userData));
show_main_menu(); show_main_menu();
break; break;
@ -274,7 +299,8 @@ void handle_user_input() {
// End of UI program logic // End of UI program logic
int main(int argc, char** argv) { int main(int argc, char **argv)
{
struct ConfigNode cfgNode; struct ConfigNode cfgNode;
// default values // default values
snprintf(cfgNode.host, 128, "0.0.0.0"); snprintf(cfgNode.host, 128, "0.0.0.0");
@ -289,8 +315,8 @@ int main(int argc, char** argv) {
cfgNode.storeDbMigration = 0; cfgNode.storeDbMigration = 0;
cfgNode.storeMaxNumDbConnections = 30; cfgNode.storeMaxNumDbConnections = 30;
if (argp_parse(&argp, argc, argv, 0, 0, &cfgNode) if (argp_parse(&argp, argc, argv, 0, 0, &cfgNode) == ARGP_ERR_UNKNOWN)
== ARGP_ERR_UNKNOWN) { {
show_help_and_exit(); show_help_and_exit();
} }
@ -313,7 +339,8 @@ int main(int argc, char** argv) {
\"discv5UdpPort\": 9999, \ \"discv5UdpPort\": 9999, \
\"dnsDiscoveryUrl\": \"enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im\", \ \"dnsDiscoveryUrl\": \"enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im\", \
\"dnsDiscoveryNameServers\": [\"8.8.8.8\", \"1.0.0.1\"] \ \"dnsDiscoveryNameServers\": [\"8.8.8.8\", \"1.0.0.1\"] \
}", cfgNode.host, }",
cfgNode.host,
cfgNode.port, cfgNode.port,
cfgNode.relay ? "true" : "false", cfgNode.relay ? "true" : "false",
cfgNode.store ? "true" : "false", cfgNode.store ? "true" : "false",
@ -330,7 +357,7 @@ int main(int argc, char** argv) {
printf("Bind addr: %s:%u\n", cfgNode.host, cfgNode.port); printf("Bind addr: %s:%u\n", cfgNode.host, cfgNode.port);
printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES" : "NO"); printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES" : "NO");
waku_set_event_callback(ctx, on_event_received, userData); set_event_callback(ctx, on_event_received, userData);
waku_start(ctx, event_handler, userData); waku_start(ctx, event_handler, userData);
waitForCallback(); waitForCallback();
@ -338,21 +365,22 @@ int main(int argc, char** argv) {
WAKU_CALL(waku_listen_addresses(ctx, event_handler, userData)); WAKU_CALL(waku_listen_addresses(ctx, event_handler, userData));
WAKU_CALL(waku_relay_subscribe(ctx, WAKU_CALL(waku_relay_subscribe(ctx,
"/waku/2/rs/0/0",
event_handler, event_handler,
userData) ); userData,
"/waku/2/rs/16/32"));
WAKU_CALL(waku_discv5_update_bootnodes(ctx, WAKU_CALL(waku_discv5_update_bootnodes(ctx,
"[\"enr:-QEkuEBIkb8q8_mrorHndoXH9t5N6ZfD-jehQCrYeoJDPHqT0l0wyaONa2-piRQsi3oVKAzDShDVeoQhy0uwN1xbZfPZAYJpZIJ2NIJpcIQiQlleim11bHRpYWRkcnO4bgA0Ni9ub2RlLTAxLmdjLXVzLWNlbnRyYWwxLWEud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2XwA2Ni9ub2RlLTAxLmdjLXVzLWNlbnRyYWwxLWEud2FrdS5zYW5kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQAGAAeJc2VjcDI1NmsxoQKnGt-GSgqPSf3IAPM7bFgTlpczpMZZLF3geeoNNsxzSoN0Y3CCdl-DdWRwgiMohXdha3UyDw\",\"enr:-QEkuEB3WHNS-xA3RDpfu9A2Qycr3bN3u7VoArMEiDIFZJ66F1EB3d4wxZN1hcdcOX-RfuXB-MQauhJGQbpz3qUofOtLAYJpZIJ2NIJpcIQI2SVcim11bHRpYWRkcnO4bgA0Ni9ub2RlLTAxLmFjLWNuLWhvbmdrb25nLWMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2XwA2Ni9ub2RlLTAxLmFjLWNuLWhvbmdrb25nLWMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQAGAAeJc2VjcDI1NmsxoQPK35Nnz0cWUtSAhBp7zvHEhyU_AqeQUlqzLiLxfP2L4oN0Y3CCdl-DdWRwgiMohXdha3UyDw\"]",
event_handler, event_handler,
userData) ); userData,
"[\"enr:-QEkuEBIkb8q8_mrorHndoXH9t5N6ZfD-jehQCrYeoJDPHqT0l0wyaONa2-piRQsi3oVKAzDShDVeoQhy0uwN1xbZfPZAYJpZIJ2NIJpcIQiQlleim11bHRpYWRkcnO4bgA0Ni9ub2RlLTAxLmdjLXVzLWNlbnRyYWwxLWEud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2XwA2Ni9ub2RlLTAxLmdjLXVzLWNlbnRyYWwxLWEud2FrdS5zYW5kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQAGAAeJc2VjcDI1NmsxoQKnGt-GSgqPSf3IAPM7bFgTlpczpMZZLF3geeoNNsxzSoN0Y3CCdl-DdWRwgiMohXdha3UyDw\",\"enr:-QEkuEB3WHNS-xA3RDpfu9A2Qycr3bN3u7VoArMEiDIFZJ66F1EB3d4wxZN1hcdcOX-RfuXB-MQauhJGQbpz3qUofOtLAYJpZIJ2NIJpcIQI2SVcim11bHRpYWRkcnO4bgA0Ni9ub2RlLTAxLmFjLWNuLWhvbmdrb25nLWMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2XwA2Ni9ub2RlLTAxLmFjLWNuLWhvbmdrb25nLWMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQAGAAeJc2VjcDI1NmsxoQPK35Nnz0cWUtSAhBp7zvHEhyU_AqeQUlqzLiLxfP2L4oN0Y3CCdl-DdWRwgiMohXdha3UyDw\"]"));
WAKU_CALL(waku_get_peerids_from_peerstore(ctx, WAKU_CALL(waku_get_peerids_from_peerstore(ctx,
event_handler, event_handler,
userData)); userData));
show_main_menu(); show_main_menu();
while(1) { while (1)
{
handle_user_input(); handle_user_input();
// Uncomment the following if need to test the metrics retrieval // Uncomment the following if need to test the metrics retrieval

View File

@ -21,16 +21,19 @@ pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int callback_executed = 0; int callback_executed = 0;
void waitForCallback() { void waitForCallback()
{
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
while (!callback_executed) { while (!callback_executed)
{
pthread_cond_wait(&cond, &mutex); pthread_cond_wait(&cond, &mutex);
} }
callback_executed = 0; callback_executed = 0;
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
} }
void signal_cond() { void signal_cond()
{
pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex);
callback_executed = 1; callback_executed = 1;
pthread_cond_signal(&cond); pthread_cond_signal(&cond);
@ -38,15 +41,18 @@ void signal_cond() {
} }
#define WAKU_CALL(call) \ #define WAKU_CALL(call) \
do { \ do \
{ \
int ret = call; \ int ret = call; \
if (ret != 0) { \ if (ret != 0) \
{ \
std::cout << "Failed the call to: " << #call << ". Code: " << ret << "\n"; \ std::cout << "Failed the call to: " << #call << ". Code: " << ret << "\n"; \
} \ } \
waitForCallback(); \ waitForCallback(); \
} while (0) } while (0)
struct ConfigNode { struct ConfigNode
{
char host[128]; char host[128];
int port; int port;
char key[128]; char key[128];
@ -65,13 +71,14 @@ static struct argp_option options[] = {
{"relay", 'r', "RELAY", 0, "Enable relay protocol: 1 or 0. (default: 1)"}, {"relay", 'r', "RELAY", 0, "Enable relay protocol: 1 or 0. (default: 1)"},
{"peers", 'a', "PEERS", 0, "Comma-separated list of peer-multiaddress to connect\ {"peers", 'a', "PEERS", 0, "Comma-separated list of peer-multiaddress to connect\
to. (default: \"\") e.g. \"/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN\""}, to. (default: \"\") e.g. \"/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN\""},
{ 0 } {0}};
};
static error_t parse_opt(int key, char *arg, struct argp_state *state) { static error_t parse_opt(int key, char *arg, struct argp_state *state)
{
struct ConfigNode *cfgNode = (ConfigNode *)state->input; struct ConfigNode *cfgNode = (ConfigNode *)state->input;
switch (key) { switch (key)
{
case 'h': case 'h':
snprintf(cfgNode->host, 128, "%s", arg); snprintf(cfgNode->host, 128, "%s", arg);
break; break;
@ -100,19 +107,23 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
return 0; return 0;
} }
void event_handler(const char* msg, size_t len) { void event_handler(const char *msg, size_t len)
{
printf("Receiving event: %s\n", msg); printf("Receiving event: %s\n", msg);
} }
void handle_error(const char* msg, size_t len) { void handle_error(const char *msg, size_t len)
{
printf("handle_error: %s\n", msg); printf("handle_error: %s\n", msg);
exit(1); exit(1);
} }
template <class F> template <class F>
auto cify(F&& f) { auto cify(F &&f)
{
static F fn = std::forward<F>(f); static F fn = std::forward<F>(f);
return [](int callerRet, const char* msg, size_t len, void* userData) { return [](int callerRet, const char *msg, size_t len, void *userData)
{
signal_cond(); signal_cond();
return fn(msg, len); return fn(msg, len);
}; };
@ -122,7 +133,8 @@ static struct argp argp = { options, parse_opt, args_doc, doc, 0, 0, 0 };
// Beginning of UI program logic // Beginning of UI program logic
enum PROGRAM_STATE { enum PROGRAM_STATE
{
MAIN_MENU, MAIN_MENU,
SUBSCRIBE_TOPIC_MENU, SUBSCRIBE_TOPIC_MENU,
CONNECT_TO_OTHER_NODE_MENU, CONNECT_TO_OTHER_NODE_MENU,
@ -131,18 +143,21 @@ enum PROGRAM_STATE {
enum PROGRAM_STATE current_state = MAIN_MENU; enum PROGRAM_STATE current_state = MAIN_MENU;
void show_main_menu() { void show_main_menu()
{
printf("\nPlease, select an option:\n"); printf("\nPlease, select an option:\n");
printf("\t1.) Subscribe to topic\n"); printf("\t1.) Subscribe to topic\n");
printf("\t2.) Connect to other node\n"); printf("\t2.) Connect to other node\n");
printf("\t3.) Publish a message\n"); printf("\t3.) Publish a message\n");
} }
void handle_user_input(void* ctx) { void handle_user_input(void *ctx)
{
char cmd[1024]; char cmd[1024];
memset(cmd, 0, 1024); memset(cmd, 0, 1024);
int numRead = read(0, cmd, 1024); int numRead = read(0, cmd, 1024);
if (numRead <= 0) { if (numRead <= 0)
{
return; return;
} }
@ -155,11 +170,10 @@ void handle_user_input(void* ctx) {
scanf("%127s", pubsubTopic); scanf("%127s", pubsubTopic);
WAKU_CALL(waku_relay_subscribe(ctx, WAKU_CALL(waku_relay_subscribe(ctx,
pubsubTopic, cify([&](const char *msg, size_t len)
cify([&](const char* msg, size_t len) { { event_handler(msg, len); }),
event_handler(msg, len); nullptr,
}), pubsubTopic));
nullptr) );
printf("The subscription went well\n"); printf("The subscription went well\n");
show_main_menu(); show_main_menu();
@ -172,12 +186,11 @@ void handle_user_input(void* ctx) {
char peerAddr[512]; char peerAddr[512];
scanf("%511s", peerAddr); scanf("%511s", peerAddr);
WAKU_CALL(waku_connect(ctx, WAKU_CALL(waku_connect(ctx,
cify([&](const char *msg, size_t len)
{ event_handler(msg, len); }),
nullptr,
peerAddr, peerAddr,
10000 /* timeoutMs */, 10000 /* timeoutMs */));
cify([&](const char* msg, size_t len) {
event_handler(msg, len);
}),
nullptr));
show_main_menu(); show_main_menu();
break; break;
@ -193,14 +206,13 @@ void handle_user_input(void* ctx) {
std::string contentTopic; std::string contentTopic;
waku_content_topic(ctx, waku_content_topic(ctx,
cify([&contentTopic](const char *msg, size_t len)
{ contentTopic = msg; }),
nullptr,
"appName", "appName",
1, 1,
"contentTopicName", "contentTopicName",
"encoding", "encoding");
cify([&contentTopic](const char* msg, size_t len) {
contentTopic = msg;
}),
nullptr);
snprintf(jsonWakuMsg, snprintf(jsonWakuMsg,
2048, 2048,
@ -208,13 +220,12 @@ void handle_user_input(void* ctx) {
msgPayload.data(), contentTopic.c_str()); msgPayload.data(), contentTopic.c_str());
WAKU_CALL(waku_relay_publish(ctx, WAKU_CALL(waku_relay_publish(ctx,
cify([&](const char *msg, size_t len)
{ event_handler(msg, len); }),
nullptr,
"/waku/2/rs/16/32", "/waku/2/rs/16/32",
jsonWakuMsg, jsonWakuMsg,
10000 /*timeout ms*/, 10000 /*timeout ms*/));
cify([&](const char* msg, size_t len) {
event_handler(msg, len);
}),
nullptr) );
show_main_menu(); show_main_menu();
} }
@ -227,12 +238,14 @@ void handle_user_input(void* ctx) {
// End of UI program logic // End of UI program logic
void show_help_and_exit() { void show_help_and_exit()
{
printf("Wrong parameters\n"); printf("Wrong parameters\n");
exit(1); exit(1);
} }
int main(int argc, char** argv) { int main(int argc, char **argv)
{
struct ConfigNode cfgNode; struct ConfigNode cfgNode;
// default values // default values
snprintf(cfgNode.host, 128, "0.0.0.0"); snprintf(cfgNode.host, 128, "0.0.0.0");
@ -241,8 +254,8 @@ int main(int argc, char** argv) {
cfgNode.port = 60000; cfgNode.port = 60000;
cfgNode.relay = 1; cfgNode.relay = 1;
if (argp_parse(&argp, argc, argv, 0, 0, &cfgNode) if (argp_parse(&argp, argc, argv, 0, 0, &cfgNode) == ARGP_ERR_UNKNOWN)
== ARGP_ERR_UNKNOWN) { {
show_help_and_exit(); show_help_and_exit();
} }
@ -260,17 +273,15 @@ int main(int argc, char** argv) {
\"discv5UdpPort\": 9999, \ \"discv5UdpPort\": 9999, \
\"dnsDiscoveryUrl\": \"enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im\", \ \"dnsDiscoveryUrl\": \"enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im\", \
\"dnsDiscoveryNameServers\": [\"8.8.8.8\", \"1.0.0.1\"] \ \"dnsDiscoveryNameServers\": [\"8.8.8.8\", \"1.0.0.1\"] \
}", cfgNode.host, }",
cfgNode.host,
cfgNode.port); cfgNode.port);
void *ctx = void *ctx =
waku_new(jsonConfig, waku_new(jsonConfig,
cify([](const char* msg, size_t len) { cify([](const char *msg, size_t len)
std::cout << "waku_new feedback: " << msg << std::endl; { std::cout << "waku_new feedback: " << msg << std::endl; }),
} nullptr);
),
nullptr
);
waitForCallback(); waitForCallback();
// example on how to retrieve a value from the `libwaku` callback. // example on how to retrieve a value from the `libwaku` callback.
@ -278,18 +289,15 @@ int main(int argc, char** argv) {
WAKU_CALL( WAKU_CALL(
waku_default_pubsub_topic( waku_default_pubsub_topic(
ctx, ctx,
cify([&defaultPubsubTopic](const char* msg, size_t len) { cify([&defaultPubsubTopic](const char *msg, size_t len)
defaultPubsubTopic = msg; { defaultPubsubTopic = msg; }),
}
),
nullptr)); nullptr));
std::cout << "Default pubsub topic: " << defaultPubsubTopic << std::endl; std::cout << "Default pubsub topic: " << defaultPubsubTopic << std::endl;
WAKU_CALL(waku_version(ctx, WAKU_CALL(waku_version(ctx,
cify([&](const char* msg, size_t len) { cify([&](const char *msg, size_t len)
std::cout << "Git Version: " << msg << std::endl; { std::cout << "Git Version: " << msg << std::endl; }),
}),
nullptr)); nullptr));
printf("Bind addr: %s:%u\n", cfgNode.host, cfgNode.port); printf("Bind addr: %s:%u\n", cfgNode.host, cfgNode.port);
@ -297,35 +305,32 @@ int main(int argc, char** argv) {
std::string pubsubTopic; std::string pubsubTopic;
WAKU_CALL(waku_pubsub_topic(ctx, WAKU_CALL(waku_pubsub_topic(ctx,
"example", cify([&](const char *msg, size_t len)
cify([&](const char* msg, size_t len) { { pubsubTopic = msg; }),
pubsubTopic = msg; nullptr,
}), "example"));
nullptr));
std::cout << "Custom pubsub topic: " << pubsubTopic << std::endl; std::cout << "Custom pubsub topic: " << pubsubTopic << std::endl;
waku_set_event_callback(ctx, set_event_callback(ctx,
cify([&](const char* msg, size_t len) { cify([&](const char *msg, size_t len)
event_handler(msg, len); { event_handler(msg, len); }),
}),
nullptr); nullptr);
WAKU_CALL(waku_start(ctx, WAKU_CALL(waku_start(ctx,
cify([&](const char* msg, size_t len) { cify([&](const char *msg, size_t len)
event_handler(msg, len); { event_handler(msg, len); }),
}),
nullptr)); nullptr));
WAKU_CALL(waku_relay_subscribe(ctx, WAKU_CALL(waku_relay_subscribe(ctx,
defaultPubsubTopic.c_str(), cify([&](const char *msg, size_t len)
cify([&](const char* msg, size_t len) { { event_handler(msg, len); }),
event_handler(msg, len); nullptr,
}), defaultPubsubTopic.c_str()));
nullptr) );
show_main_menu(); show_main_menu();
while(1) { while (1)
{
handle_user_input(ctx); handle_user_input(ctx);
} }
} }

View File

@ -71,32 +71,32 @@ package main
static void* cGoWakuNew(const char* configJson, void* resp) { static void* cGoWakuNew(const char* configJson, void* resp) {
// We pass NULL because we are not interested in retrieving data from this callback // We pass NULL because we are not interested in retrieving data from this callback
void* ret = waku_new(configJson, (WakuCallBack) callback, resp); void* ret = waku_new(configJson, (FFICallBack) callback, resp);
return ret; return ret;
} }
static void cGoWakuStart(void* wakuCtx, void* resp) { static void cGoWakuStart(void* wakuCtx, void* resp) {
WAKU_CALL(waku_start(wakuCtx, (WakuCallBack) callback, resp)); WAKU_CALL(waku_start(wakuCtx, (FFICallBack) callback, resp));
} }
static void cGoWakuStop(void* wakuCtx, void* resp) { static void cGoWakuStop(void* wakuCtx, void* resp) {
WAKU_CALL(waku_stop(wakuCtx, (WakuCallBack) callback, resp)); WAKU_CALL(waku_stop(wakuCtx, (FFICallBack) callback, resp));
} }
static void cGoWakuDestroy(void* wakuCtx, void* resp) { static void cGoWakuDestroy(void* wakuCtx, void* resp) {
WAKU_CALL(waku_destroy(wakuCtx, (WakuCallBack) callback, resp)); WAKU_CALL(waku_destroy(wakuCtx, (FFICallBack) callback, resp));
} }
static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) { static void cGoWakuStartDiscV5(void* wakuCtx, void* resp) {
WAKU_CALL(waku_start_discv5(wakuCtx, (WakuCallBack) callback, resp)); WAKU_CALL(waku_start_discv5(wakuCtx, (FFICallBack) callback, resp));
} }
static void cGoWakuStopDiscV5(void* wakuCtx, void* resp) { static void cGoWakuStopDiscV5(void* wakuCtx, void* resp) {
WAKU_CALL(waku_stop_discv5(wakuCtx, (WakuCallBack) callback, resp)); WAKU_CALL(waku_stop_discv5(wakuCtx, (FFICallBack) callback, resp));
} }
static void cGoWakuVersion(void* wakuCtx, void* resp) { static void cGoWakuVersion(void* wakuCtx, void* resp) {
WAKU_CALL(waku_version(wakuCtx, (WakuCallBack) callback, resp)); WAKU_CALL(waku_version(wakuCtx, (FFICallBack) callback, resp));
} }
static void cGoWakuSetEventCallback(void* wakuCtx) { static void cGoWakuSetEventCallback(void* wakuCtx) {
@ -112,7 +112,7 @@ package main
// This technique is needed because cgo only allows to export Go functions and not methods. // This technique is needed because cgo only allows to export Go functions and not methods.
waku_set_event_callback(wakuCtx, (WakuCallBack) globalEventCallback, wakuCtx); set_event_callback(wakuCtx, (FFICallBack) globalEventCallback, wakuCtx);
} }
static void cGoWakuContentTopic(void* wakuCtx, static void cGoWakuContentTopic(void* wakuCtx,
@ -123,20 +123,21 @@ package main
void* resp) { void* resp) {
WAKU_CALL( waku_content_topic(wakuCtx, WAKU_CALL( waku_content_topic(wakuCtx,
(FFICallBack) callback,
resp,
appName, appName,
appVersion, appVersion,
contentTopicName, contentTopicName,
encoding, encoding
(WakuCallBack) callback, ) );
resp) );
} }
static void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) { static void cGoWakuPubsubTopic(void* wakuCtx, char* topicName, void* resp) {
WAKU_CALL( waku_pubsub_topic(wakuCtx, topicName, (WakuCallBack) callback, resp) ); WAKU_CALL( waku_pubsub_topic(wakuCtx, (FFICallBack) callback, resp, topicName) );
} }
static void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) { static void cGoWakuDefaultPubsubTopic(void* wakuCtx, void* resp) {
WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (WakuCallBack) callback, resp)); WAKU_CALL (waku_default_pubsub_topic(wakuCtx, (FFICallBack) callback, resp));
} }
static void cGoWakuRelayPublish(void* wakuCtx, static void cGoWakuRelayPublish(void* wakuCtx,
@ -146,34 +147,36 @@ package main
void* resp) { void* resp) {
WAKU_CALL (waku_relay_publish(wakuCtx, WAKU_CALL (waku_relay_publish(wakuCtx,
(FFICallBack) callback,
resp,
pubSubTopic, pubSubTopic,
jsonWakuMessage, jsonWakuMessage,
timeoutMs, timeoutMs
(WakuCallBack) callback, ));
resp));
} }
static void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { static void cGoWakuRelaySubscribe(void* wakuCtx, char* pubSubTopic, void* resp) {
WAKU_CALL ( waku_relay_subscribe(wakuCtx, WAKU_CALL ( waku_relay_subscribe(wakuCtx,
pubSubTopic, (FFICallBack) callback,
(WakuCallBack) callback, resp,
resp) ); pubSubTopic) );
} }
static void cGoWakuRelayUnsubscribe(void* wakuCtx, char* pubSubTopic, void* resp) { static void cGoWakuRelayUnsubscribe(void* wakuCtx, char* pubSubTopic, void* resp) {
WAKU_CALL ( waku_relay_unsubscribe(wakuCtx, WAKU_CALL ( waku_relay_unsubscribe(wakuCtx,
pubSubTopic, (FFICallBack) callback,
(WakuCallBack) callback, resp,
resp) ); pubSubTopic) );
} }
static void cGoWakuConnect(void* wakuCtx, char* peerMultiAddr, int timeoutMs, void* resp) { static void cGoWakuConnect(void* wakuCtx, char* peerMultiAddr, int timeoutMs, void* resp) {
WAKU_CALL( waku_connect(wakuCtx, WAKU_CALL( waku_connect(wakuCtx,
(FFICallBack) callback,
resp,
peerMultiAddr, peerMultiAddr,
timeoutMs, timeoutMs
(WakuCallBack) callback, ) );
resp) );
} }
static void cGoWakuDialPeerById(void* wakuCtx, static void cGoWakuDialPeerById(void* wakuCtx,
@ -183,42 +186,44 @@ package main
void* resp) { void* resp) {
WAKU_CALL( waku_dial_peer_by_id(wakuCtx, WAKU_CALL( waku_dial_peer_by_id(wakuCtx,
(FFICallBack) callback,
resp,
peerId, peerId,
protocol, protocol,
timeoutMs, timeoutMs
(WakuCallBack) callback, ) );
resp) );
} }
static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) { static void cGoWakuDisconnectPeerById(void* wakuCtx, char* peerId, void* resp) {
WAKU_CALL( waku_disconnect_peer_by_id(wakuCtx, WAKU_CALL( waku_disconnect_peer_by_id(wakuCtx,
peerId, (FFICallBack) callback,
(WakuCallBack) callback, resp,
resp) ); peerId
) );
} }
static void cGoWakuListenAddresses(void* wakuCtx, void* resp) { static void cGoWakuListenAddresses(void* wakuCtx, void* resp) {
WAKU_CALL (waku_listen_addresses(wakuCtx, (WakuCallBack) callback, resp) ); WAKU_CALL (waku_listen_addresses(wakuCtx, (FFICallBack) callback, resp) );
} }
static void cGoWakuGetMyENR(void* ctx, void* resp) { static void cGoWakuGetMyENR(void* ctx, void* resp) {
WAKU_CALL (waku_get_my_enr(ctx, (WakuCallBack) callback, resp) ); WAKU_CALL (waku_get_my_enr(ctx, (FFICallBack) callback, resp) );
} }
static void cGoWakuGetMyPeerId(void* ctx, void* resp) { static void cGoWakuGetMyPeerId(void* ctx, void* resp) {
WAKU_CALL (waku_get_my_peerid(ctx, (WakuCallBack) callback, resp) ); WAKU_CALL (waku_get_my_peerid(ctx, (FFICallBack) callback, resp) );
} }
static void cGoWakuListPeersInMesh(void* ctx, char* pubSubTopic, void* resp) { static void cGoWakuListPeersInMesh(void* ctx, char* pubSubTopic, void* resp) {
WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); WAKU_CALL (waku_relay_get_num_peers_in_mesh(ctx, (FFICallBack) callback, resp, pubSubTopic) );
} }
static void cGoWakuGetNumConnectedPeers(void* ctx, char* pubSubTopic, void* resp) { static void cGoWakuGetNumConnectedPeers(void* ctx, char* pubSubTopic, void* resp) {
WAKU_CALL (waku_relay_get_num_connected_peers(ctx, pubSubTopic, (WakuCallBack) callback, resp) ); WAKU_CALL (waku_relay_get_num_connected_peers(ctx, (FFICallBack) callback, resp, pubSubTopic) );
} }
static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) { static void cGoWakuGetPeerIdsFromPeerStore(void* wakuCtx, void* resp) {
WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (WakuCallBack) callback, resp) ); WAKU_CALL (waku_get_peerids_from_peerstore(wakuCtx, (FFICallBack) callback, resp) );
} }
static void cGoWakuLightpushPublish(void* wakuCtx, static void cGoWakuLightpushPublish(void* wakuCtx,
@ -227,10 +232,11 @@ package main
void* resp) { void* resp) {
WAKU_CALL (waku_lightpush_publish(wakuCtx, WAKU_CALL (waku_lightpush_publish(wakuCtx,
(FFICallBack) callback,
resp,
pubSubTopic, pubSubTopic,
jsonWakuMessage, jsonWakuMessage
(WakuCallBack) callback, ));
resp));
} }
static void cGoWakuStoreQuery(void* wakuCtx, static void cGoWakuStoreQuery(void* wakuCtx,
@ -240,11 +246,12 @@ package main
void* resp) { void* resp) {
WAKU_CALL (waku_store_query(wakuCtx, WAKU_CALL (waku_store_query(wakuCtx,
(FFICallBack) callback,
resp,
jsonQuery, jsonQuery,
peerAddr, peerAddr,
timeoutMs, timeoutMs
(WakuCallBack) callback, ));
resp));
} }
static void cGoWakuPeerExchangeQuery(void* wakuCtx, static void cGoWakuPeerExchangeQuery(void* wakuCtx,
@ -252,9 +259,10 @@ package main
void* resp) { void* resp) {
WAKU_CALL (waku_peer_exchange_request(wakuCtx, WAKU_CALL (waku_peer_exchange_request(wakuCtx,
numPeers, (FFICallBack) callback,
(WakuCallBack) callback, resp,
resp)); numPeers
));
} }
static void cGoWakuGetPeerIdsByProtocol(void* wakuCtx, static void cGoWakuGetPeerIdsByProtocol(void* wakuCtx,
@ -262,9 +270,10 @@ package main
void* resp) { void* resp) {
WAKU_CALL (waku_get_peerids_by_protocol(wakuCtx, WAKU_CALL (waku_get_peerids_by_protocol(wakuCtx,
protocol, (FFICallBack) callback,
(WakuCallBack) callback, resp,
resp)); protocol
));
} }
*/ */

View File

@ -102,8 +102,8 @@ print("Waku Relay enabled: {}".format(args.relay))
# Set the event callback # Set the event callback
callback = callback_type(handle_event) # This line is important so that the callback is not gc'ed callback = callback_type(handle_event) # This line is important so that the callback is not gc'ed
libwaku.waku_set_event_callback.argtypes = [callback_type, ctypes.c_void_p] libwaku.set_event_callback.argtypes = [callback_type, ctypes.c_void_p]
libwaku.waku_set_event_callback(callback, ctypes.c_void_p(0)) libwaku.set_event_callback(callback, ctypes.c_void_p(0))
# Start the node # Start the node
libwaku.waku_start.argtypes = [ctypes.c_void_p, libwaku.waku_start.argtypes = [ctypes.c_void_p,
@ -117,32 +117,32 @@ libwaku.waku_start(ctx,
# Subscribe to the default pubsub topic # Subscribe to the default pubsub topic
libwaku.waku_relay_subscribe.argtypes = [ctypes.c_void_p, libwaku.waku_relay_subscribe.argtypes = [ctypes.c_void_p,
ctypes.c_char_p,
callback_type, callback_type,
ctypes.c_void_p] ctypes.c_void_p,
ctypes.c_char_p]
libwaku.waku_relay_subscribe(ctx, libwaku.waku_relay_subscribe(ctx,
default_pubsub_topic.encode('utf-8'),
callback_type( callback_type(
#onErrCb #onErrCb
lambda ret, msg, len: lambda ret, msg, len:
print("Error calling waku_relay_subscribe: %s" % print("Error calling waku_relay_subscribe: %s" %
msg.decode('utf-8')) msg.decode('utf-8'))
), ),
ctypes.c_void_p(0)) ctypes.c_void_p(0),
default_pubsub_topic.encode('utf-8'))
libwaku.waku_connect.argtypes = [ctypes.c_void_p, libwaku.waku_connect.argtypes = [ctypes.c_void_p,
ctypes.c_char_p,
ctypes.c_int,
callback_type, callback_type,
ctypes.c_void_p] ctypes.c_void_p,
ctypes.c_char_p,
ctypes.c_int]
libwaku.waku_connect(ctx, libwaku.waku_connect(ctx,
args.peer.encode('utf-8'),
10000,
# onErrCb # onErrCb
callback_type( callback_type(
lambda ret, msg, len: lambda ret, msg, len:
print("Error calling waku_connect: %s" % msg.decode('utf-8'))), print("Error calling waku_connect: %s" % msg.decode('utf-8'))),
ctypes.c_void_p(0)) ctypes.c_void_p(0),
args.peer.encode('utf-8'),
10000)
# app = Flask(__name__) # app = Flask(__name__)
# @app.route("/") # @app.route("/")

View File

@ -27,7 +27,7 @@ public:
void initialize(const QString& jsonConfig, WakuCallBack event_handler, void* userData) { void initialize(const QString& jsonConfig, WakuCallBack event_handler, void* userData) {
ctx = waku_new(jsonConfig.toUtf8().constData(), WakuCallBack(event_handler), userData); ctx = waku_new(jsonConfig.toUtf8().constData(), WakuCallBack(event_handler), userData);
waku_set_event_callback(ctx, on_event_received, userData); set_event_callback(ctx, on_event_received, userData);
qDebug() << "Waku context initialized, ready to start."; qDebug() << "Waku context initialized, ready to start.";
} }

View File

@ -3,22 +3,22 @@ use std::ffi::CString;
use std::os::raw::{c_char, c_int, c_void}; use std::os::raw::{c_char, c_int, c_void};
use std::{slice, thread, time}; use std::{slice, thread, time};
pub type WakuCallback = unsafe extern "C" fn(c_int, *const c_char, usize, *const c_void); pub type FFICallBack = unsafe extern "C" fn(c_int, *const c_char, usize, *const c_void);
extern "C" { extern "C" {
pub fn waku_new( pub fn waku_new(
config_json: *const u8, config_json: *const u8,
cb: WakuCallback, cb: FFICallBack,
user_data: *const c_void, user_data: *const c_void,
) -> *mut c_void; ) -> *mut c_void;
pub fn waku_version(ctx: *const c_void, cb: WakuCallback, user_data: *const c_void) -> c_int; pub fn waku_version(ctx: *const c_void, cb: FFICallBack, user_data: *const c_void) -> c_int;
pub fn waku_start(ctx: *const c_void, cb: WakuCallback, user_data: *const c_void) -> c_int; pub fn waku_start(ctx: *const c_void, cb: FFICallBack, user_data: *const c_void) -> c_int;
pub fn waku_default_pubsub_topic( pub fn waku_default_pubsub_topic(
ctx: *mut c_void, ctx: *mut c_void,
cb: WakuCallback, cb: FFICallBack,
user_data: *const c_void, user_data: *const c_void,
) -> *mut c_void; ) -> *mut c_void;
} }
@ -40,7 +40,7 @@ pub unsafe extern "C" fn trampoline<C>(
closure(return_val, &buffer_utf8); closure(return_val, &buffer_utf8);
} }
pub fn get_trampoline<C>(_closure: &C) -> WakuCallback pub fn get_trampoline<C>(_closure: &C) -> FFICallBack
where where
C: FnMut(i32, &str), C: FnMut(i32, &str),
{ {

View File

@ -1,42 +0,0 @@
## Can be shared safely between threads
type SharedSeq*[T] = tuple[data: ptr UncheckedArray[T], len: int]
proc alloc*(str: cstring): cstring =
# Byte allocation from the given address.
# There should be the corresponding manual deallocation with deallocShared !
if str.isNil():
var ret = cast[cstring](allocShared(1)) # Allocate memory for the null terminator
ret[0] = '\0' # Set the null terminator
return ret
let ret = cast[cstring](allocShared(len(str) + 1))
copyMem(ret, str, len(str) + 1)
return ret
proc alloc*(str: string): cstring =
## Byte allocation from the given address.
## There should be the corresponding manual deallocation with deallocShared !
var ret = cast[cstring](allocShared(str.len + 1))
let s = cast[seq[char]](str)
for i in 0 ..< str.len:
ret[i] = s[i]
ret[str.len] = '\0'
return ret
proc allocSharedSeq*[T](s: seq[T]): SharedSeq[T] =
let data = allocShared(sizeof(T) * s.len)
if s.len != 0:
copyMem(data, unsafeAddr s[0], s.len)
return (cast[ptr UncheckedArray[T]](data), s.len)
proc deallocSharedSeq*[T](s: var SharedSeq[T]) =
deallocShared(s.data)
s.len = 0
proc toSeq*[T](s: SharedSeq[T]): seq[T] =
## Creates a seq[T] from a SharedSeq[T]. No explicit dealloc is required
## as req[T] is a GC managed type.
var ret = newSeq[T]()
for i in 0 ..< s.len:
ret.add(s.data[i])
return ret

10
library/declare_lib.nim Normal file
View File

@ -0,0 +1,10 @@
import ffi
import waku/factory/waku
declareLibrary("waku")
proc set_event_callback(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.dynlib, exportc, cdecl.} =
ctx[].eventCallback = cast[pointer](callback)
ctx[].eventUserData = userData

View File

@ -1,9 +0,0 @@
import system, std/json, ./json_base_event
type JsonWakuNotRespondingEvent* = ref object of JsonEvent
proc new*(T: type JsonWakuNotRespondingEvent): T =
return JsonWakuNotRespondingEvent(eventType: "waku_not_responding")
method `$`*(event: JsonWakuNotRespondingEvent): string =
$(%*event)

View File

@ -1,30 +0,0 @@
################################################################################
### Exported types
type WakuCallBack* = proc(
callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].}
const RET_OK*: cint = 0
const RET_ERR*: cint = 1
const RET_MISSING_CALLBACK*: cint = 2
### End of exported types
################################################################################
################################################################################
### FFI utils
template foreignThreadGc*(body: untyped) =
when declared(setupForeignThreadGc):
setupForeignThreadGc()
body
when declared(tearDownForeignThreadGc):
tearDownForeignThreadGc()
type onDone* = proc()
### End of FFI utils
################################################################################

View File

@ -0,0 +1,49 @@
import std/json
import
chronicles,
chronos,
results,
eth/p2p/discoveryv5/enr,
strutils,
libp2p/peerid,
metrics,
ffi
import waku/factory/waku, waku/node/waku_node, waku/node/health_monitor, library/declare_lib
proc getMultiaddresses(node: WakuNode): seq[string] =
return node.info().listenAddresses
proc getMetrics(): string =
{.gcsafe.}:
return defaultRegistry.toText() ## defaultRegistry is {.global.} in metrics module
proc waku_version(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.ffi.} =
return ok(WakuNodeVersionString)
proc waku_listen_addresses(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.ffi.} =
## returns a comma-separated string of the listen addresses
return ok(ctx.myLib[].node.getMultiaddresses().join(","))
proc waku_get_my_enr(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.ffi.} =
return ok(ctx.myLib[].node.enr.toURI())
proc waku_get_my_peerid(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.ffi.} =
return ok($ctx.myLib[].node.peerId())
proc waku_get_metrics(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.ffi.} =
return ok(getMetrics())
proc waku_is_online(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.ffi.} =
return ok($ctx.myLib[].healthMonitor.onlineMonitor.amIOnline())

View File

@ -0,0 +1,96 @@
import std/json
import chronos, chronicles, results, strutils, libp2p/multiaddress, ffi
import
waku/factory/waku,
waku/discovery/waku_dnsdisc,
waku/discovery/waku_discv5,
waku/waku_core/peers,
waku/node/waku_node,
waku/node/kernel_api,
library/declare_lib
proc retrieveBootstrapNodes(
enrTreeUrl: string, ipDnsServer: string
): Future[Result[seq[string], string]] {.async.} =
let dnsNameServers = @[parseIpAddress(ipDnsServer)]
let discoveredPeers: seq[RemotePeerInfo] = (
await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers)
).valueOr:
return err("failed discovering peers from DNS: " & $error)
var multiAddresses = newSeq[string]()
for discPeer in discoveredPeers:
for address in discPeer.addrs:
multiAddresses.add($address & "/p2p/" & $discPeer)
return ok(multiAddresses)
proc updateDiscv5BootstrapNodes(nodes: string, waku: Waku): Result[void, string] =
waku.wakuDiscv5.updateBootstrapRecords(nodes).isOkOr:
return err("error in updateDiscv5BootstrapNodes: " & $error)
return ok()
proc performPeerExchangeRequestTo*(
numPeers: uint64, waku: Waku
): Future[Result[int, string]] {.async.} =
let numPeersRecv = (await waku.node.fetchPeerExchangePeers(numPeers)).valueOr:
return err($error)
return ok(numPeersRecv)
proc waku_discv5_update_bootnodes(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
bootnodes: cstring,
) {.ffi.} =
## Updates the bootnode list used for discovering new peers via DiscoveryV5
## bootnodes - JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]`
updateDiscv5BootstrapNodes($bootnodes, ctx.myLib[]).isOkOr:
error "UPDATE_DISCV5_BOOTSTRAP_NODES failed", error = error
return err($error)
return ok("discovery request processed correctly")
proc waku_dns_discovery(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
enrTreeUrl: cstring,
nameDnsServer: cstring,
timeoutMs: cint,
) {.ffi.} =
let nodes = (await retrieveBootstrapNodes($enrTreeUrl, $nameDnsServer)).valueOr:
error "GET_BOOTSTRAP_NODES failed", error = error
return err($error)
## returns a comma-separated string of bootstrap nodes' multiaddresses
return ok(nodes.join(","))
proc waku_start_discv5(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.ffi.} =
(await ctx.myLib[].wakuDiscv5.start()).isOkOr:
error "START_DISCV5 failed", error = error
return err("error starting discv5: " & $error)
return ok("discv5 started correctly")
proc waku_stop_discv5(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.ffi.} =
await ctx.myLib[].wakuDiscv5.stop()
return ok("discv5 stopped correctly")
proc waku_peer_exchange_request(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
numPeers: uint64,
) {.ffi.} =
let numValidPeers = (await performPeerExchangeRequestTo(numPeers, ctx.myLib[])).valueOr:
error "waku_peer_exchange_request failed", error = error
return err("failed peer exchange: " & $error)
return ok($numValidPeers)

View File

@ -1,43 +1,14 @@
import std/[options, json, strutils, net] import std/[options, json, strutils, net]
import chronos, chronicles, results, confutils, confutils/std/net import chronos, chronicles, results, confutils, confutils/std/net, ffi
import import
waku/node/peer_manager/peer_manager, waku/node/peer_manager/peer_manager,
tools/confutils/cli_args, tools/confutils/cli_args,
waku/factory/waku, waku/factory/waku,
waku/factory/node_factory, waku/factory/node_factory,
waku/factory/networks_config,
waku/factory/app_callbacks, waku/factory/app_callbacks,
waku/rest_api/endpoint/builder waku/rest_api/endpoint/builder,
library/declare_lib
import
../../alloc
type NodeLifecycleMsgType* = enum
CREATE_NODE
START_NODE
STOP_NODE
type NodeLifecycleRequest* = object
operation: NodeLifecycleMsgType
configJson: cstring ## Only used in 'CREATE_NODE' operation
appCallbacks: AppCallbacks
proc createShared*(
T: type NodeLifecycleRequest,
op: NodeLifecycleMsgType,
configJson: cstring = "",
appCallbacks: AppCallbacks = nil,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].appCallbacks = appCallbacks
ret[].configJson = configJson.alloc()
return ret
proc destroyShared(self: ptr NodeLifecycleRequest) =
deallocShared(self[].configJson)
deallocShared(self)
proc createWaku( proc createWaku(
configJson: cstring, appCallbacks: AppCallbacks = nil configJson: cstring, appCallbacks: AppCallbacks = nil
@ -87,26 +58,30 @@ proc createWaku(
return ok(wakuRes) return ok(wakuRes)
proc process*( registerReqFFI(CreateNodeRequest, ctx: ptr FFIContext[Waku]):
self: ptr NodeLifecycleRequest, waku: ptr Waku proc(
configJson: cstring, appCallbacks: AppCallbacks
): Future[Result[string, string]] {.async.} = ): Future[Result[string, string]] {.async.} =
defer: ctx.myLib[] = (await createWaku(configJson, cast[AppCallbacks](appCallbacks))).valueOr:
destroyShared(self) error "CreateNodeRequest failed", error = error
case self.operation
of CREATE_NODE:
waku[] = (await createWaku(self.configJson, self.appCallbacks)).valueOr:
error "CREATE_NODE failed", error = error
return err($error) return err($error)
of START_NODE:
(await waku.startWaku()).isOkOr:
error "START_NODE failed", error = error
return err($error)
of STOP_NODE:
try:
await waku[].stop()
except Exception:
error "STOP_NODE failed", error = getCurrentExceptionMsg()
return err(getCurrentExceptionMsg())
return ok("") return ok("")
proc waku_start(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.ffi.} =
(await startWaku(ctx[].myLib)).isOkOr:
error "START_NODE failed", error = error
return err("failed to start: " & $error)
return ok("")
proc waku_stop(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.ffi.} =
try:
await ctx.myLib[].stop()
except Exception as exc:
error "STOP_NODE failed", error = exc.msg
return err("failed to stop: " & exc.msg)
return ok("")

View File

@ -0,0 +1,123 @@
import std/[sequtils, strutils, tables]
import chronicles, chronos, results, options, json, ffi
import waku/factory/waku, waku/node/waku_node, waku/node/peer_manager, ../declare_lib
type PeerInfo = object
protocols: seq[string]
addresses: seq[string]
proc waku_get_peerids_from_peerstore(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.ffi.} =
## returns a comma-separated string of peerIDs
let peerIDs =
ctx.myLib[].node.peerManager.switch.peerStore.peers().mapIt($it.peerId).join(",")
return ok(peerIDs)
proc waku_connect(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
peerMultiAddr: cstring,
timeoutMs: cuint,
) {.ffi.} =
let peers = ($peerMultiAddr).split(",").mapIt(strip(it))
await ctx.myLib[].node.connectToNodes(peers, source = "static")
return ok("")
proc waku_disconnect_peer_by_id(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer, peerId: cstring
) {.ffi.} =
let pId = PeerId.init($peerId).valueOr:
error "DISCONNECT_PEER_BY_ID failed", error = $error
return err($error)
await ctx.myLib[].node.peerManager.disconnectNode(pId)
return ok("")
proc waku_disconnect_all_peers(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.ffi.} =
await ctx.myLib[].node.peerManager.disconnectAllPeers()
return ok("")
proc waku_dial_peer(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
peerMultiAddr: cstring,
protocol: cstring,
timeoutMs: cuint,
) {.ffi.} =
let remotePeerInfo = parsePeerInfo($peerMultiAddr).valueOr:
error "DIAL_PEER failed", error = $error
return err($error)
let conn = await ctx.myLib[].node.peerManager.dialPeer(remotePeerInfo, $protocol)
if conn.isNone():
let msg = "failed dialing peer"
error "DIAL_PEER failed", error = msg, peerId = $remotePeerInfo.peerId
return err(msg)
return ok("")
proc waku_dial_peer_by_id(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
peerId: cstring,
protocol: cstring,
timeoutMs: cuint,
) {.ffi.} =
let pId = PeerId.init($peerId).valueOr:
error "DIAL_PEER_BY_ID failed", error = $error
return err($error)
let conn = await ctx.myLib[].node.peerManager.dialPeer(pId, $protocol)
if conn.isNone():
let msg = "failed dialing peer"
error "DIAL_PEER_BY_ID failed", error = msg, peerId = $peerId
return err(msg)
return ok("")
proc waku_get_connected_peers_info(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.ffi.} =
## returns a JSON string mapping peerIDs to objects with protocols and addresses
var peersMap = initTable[string, PeerInfo]()
let peers = ctx.myLib[].node.peerManager.switch.peerStore.peers().filterIt(
it.connectedness == Connected
)
# Build a map of peer IDs to peer info objects
for peer in peers:
let peerIdStr = $peer.peerId
peersMap[peerIdStr] =
PeerInfo(protocols: peer.protocols, addresses: peer.addrs.mapIt($it))
# Convert the map to JSON string
let jsonObj = %*peersMap
let jsonStr = $jsonObj
return ok(jsonStr)
proc waku_get_connected_peers(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.ffi.} =
## returns a comma-separated string of peerIDs
let
(inPeerIds, outPeerIds) = ctx.myLib[].node.peerManager.connectedPeers()
connectedPeerids = concat(inPeerIds, outPeerIds)
return ok(connectedPeerids.mapIt($it).join(","))
proc waku_get_peerids_by_protocol(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
protocol: cstring,
) {.ffi.} =
## returns a comma-separated string of peerIDs that mount the given protocol
let connectedPeers = ctx.myLib[].node.peerManager.switch.peerStore
.peers($protocol)
.filterIt(it.connectedness == Connected)
.mapIt($it.peerId)
.join(",")
return ok(connectedPeers)

View File

@ -0,0 +1,43 @@
import std/[json, strutils]
import chronos, results, ffi
import libp2p/[protocols/ping, switch, multiaddress, multicodec]
import waku/[factory/waku, waku_core/peers, node/waku_node], library/declare_lib
proc waku_ping_peer(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
peerAddr: cstring,
timeoutMs: cuint,
) {.ffi.} =
let peerInfo = peers.parsePeerInfo(($peerAddr).split(",")).valueOr:
return err("PingRequest failed to parse peer addr: " & $error)
let timeout = chronos.milliseconds(timeoutMs)
proc ping(): Future[Result[Duration, string]] {.async, gcsafe.} =
try:
let conn =
await ctx.myLib[].node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
defer:
await conn.close()
let pingRTT = await ctx.myLib[].node.libp2pPing.ping(conn)
if pingRTT == 0.nanos:
return err("could not ping peer: rtt-0")
return ok(pingRTT)
except CatchableError as exc:
return err("could not ping peer: " & exc.msg)
let pingFuture = ping()
let pingRTT: Duration =
if timeout == chronos.milliseconds(0): # No timeout expected
(await pingFuture).valueOr:
return err("ping failed, no timeout expected: " & error)
else:
let timedOut = not (await pingFuture.withTimeout(timeout))
if timedOut:
return err("ping timed out")
pingFuture.read().valueOr:
return err("failed to read ping future: " & error)
return ok($(pingRTT.nanos))

View File

@ -0,0 +1,109 @@
import options, std/[strutils, sequtils]
import chronicles, chronos, results, ffi
import
waku/waku_filter_v2/client,
waku/waku_core/message/message,
waku/factory/waku,
waku/waku_relay,
waku/waku_filter_v2/common,
waku/waku_core/subscription/push_handler,
waku/node/peer_manager/peer_manager,
waku/node/waku_node,
waku/node/kernel_api,
waku/waku_core/topics/pubsub_topic,
waku/waku_core/topics/content_topic,
library/events/json_message_event,
library/declare_lib
const FilterOpTimeout = 5.seconds
proc checkFilterClientMounted(waku: Waku): Result[string, string] =
if waku.node.wakuFilterClient.isNil():
let errorMsg = "wakuFilterClient is not mounted"
error "fail filter process", error = errorMsg
return err(errorMsg)
return ok("")
proc waku_filter_subscribe(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
contentTopics: cstring,
) {.ffi.} =
proc onReceivedMessage(ctx: ptr FFIContext): WakuRelayHandler =
return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
callEventCallback(ctx, "onReceivedMessage"):
$JsonMessageEvent.new(pubsubTopic, msg)
checkFilterClientMounted(ctx.myLib[]).isOkOr:
return err($error)
var filterPushEventCallback = FilterPushHandler(onReceivedMessage(ctx))
ctx.myLib[].node.wakuFilterClient.registerPushHandler(filterPushEventCallback)
let peer = ctx.myLib[].node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let errorMsg = "could not find peer with WakuFilterSubscribeCodec when subscribing"
error "fail filter subscribe", error = errorMsg
return err(errorMsg)
let subFut = ctx.myLib[].node.filterSubscribe(
some(PubsubTopic($pubsubTopic)),
($contentTopics).split(",").mapIt(ContentTopic(it)),
peer,
)
if not await subFut.withTimeout(FilterOpTimeout):
let errorMsg = "filter subscription timed out"
error "fail filter unsubscribe", error = errorMsg
return err(errorMsg)
return ok("")
proc waku_filter_unsubscribe(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
contentTopics: cstring,
) {.ffi.} =
checkFilterClientMounted(ctx.myLib[]).isOkOr:
return err($error)
let peer = ctx.myLib[].node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let errorMsg =
"could not find peer with WakuFilterSubscribeCodec when unsubscribing"
error "fail filter process", error = errorMsg
return err(errorMsg)
let subFut = ctx.myLib[].node.filterUnsubscribe(
some(PubsubTopic($pubsubTopic)),
($contentTopics).split(",").mapIt(ContentTopic(it)),
peer,
)
if not await subFut.withTimeout(FilterOpTimeout):
let errorMsg = "filter un-subscription timed out"
error "fail filter unsubscribe", error = errorMsg
return err(errorMsg)
return ok("")
proc waku_filter_unsubscribe_all(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.ffi.} =
checkFilterClientMounted(ctx.myLib[]).isOkOr:
return err($error)
let peer = ctx.myLib[].node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let errorMsg =
"could not find peer with WakuFilterSubscribeCodec when unsubscribing all"
error "fail filter unsubscribe all", error = errorMsg
return err(errorMsg)
let unsubFut = ctx.myLib[].node.filterUnsubscribeAll(peer)
if not await unsubFut.withTimeout(FilterOpTimeout):
let errorMsg = "filter un-subscription all timed out"
error "fail filter unsubscribe all", error = errorMsg
return err(errorMsg)
return ok("")

View File

@ -0,0 +1,51 @@
import options, std/[json, strformat]
import chronicles, chronos, results, ffi
import
waku/waku_core/message/message,
waku/waku_core/codecs,
waku/factory/waku,
waku/waku_core/message,
waku/waku_core/topics/pubsub_topic,
waku/waku_lightpush_legacy/client,
waku/node/peer_manager/peer_manager,
library/events/json_message_event,
library/declare_lib
proc waku_lightpush_publish(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
jsonWakuMessage: cstring,
) {.ffi.} =
if ctx.myLib[].node.wakuLightpushClient.isNil():
let errorMsg = "LightpushRequest waku.node.wakuLightpushClient is nil"
error "PUBLISH failed", error = errorMsg
return err(errorMsg)
var jsonMessage: JsonMessage
try:
let jsonContent = parseJson($jsonWakuMessage)
jsonMessage = JsonMessage.fromJsonNode(jsonContent).valueOr:
raise newException(JsonParsingError, $error)
except JsonParsingError as exc:
return err(fmt"Error parsing json message: {exc.msg}")
let msg = json_message_event.toWakuMessage(jsonMessage).valueOr:
return err("Problem building the WakuMessage: " & $error)
let peerOpt = ctx.myLib[].node.peerManager.selectPeer(WakuLightPushCodec)
if peerOpt.isNone():
let errorMsg = "failed to lightpublish message, no suitable remote peers"
error "PUBLISH failed", error = errorMsg
return err(errorMsg)
let msgHashHex = (
await ctx.myLib[].node.wakuLegacyLightpushClient.publish(
$pubsubTopic, msg, peer = peerOpt.get()
)
).valueOr:
error "PUBLISH failed", error = error
return err($error)
return ok(msgHashHex)

View File

@ -0,0 +1,171 @@
import std/[net, sequtils, strutils, json], strformat
import chronicles, chronos, stew/byteutils, results, ffi
import
waku/waku_core/message/message,
waku/factory/[validator_signed, waku],
tools/confutils/cli_args,
waku/waku_core/message,
waku/waku_core/topics/pubsub_topic,
waku/waku_core/topics,
waku/node/kernel_api/relay,
waku/waku_relay/protocol,
waku/node/peer_manager,
library/events/json_message_event,
library/declare_lib
proc waku_relay_get_peers_in_mesh(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
) {.ffi.} =
let meshPeers = ctx.myLib[].node.wakuRelay.getPeersInMesh($pubsubTopic).valueOr:
error "LIST_MESH_PEERS failed", error = error
return err($error)
## returns a comma-separated string of peerIDs
return ok(meshPeers.mapIt($it).join(","))
proc waku_relay_get_num_peers_in_mesh(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
) {.ffi.} =
let numPeersInMesh = ctx.myLib[].node.wakuRelay.getNumPeersInMesh($pubsubTopic).valueOr:
error "NUM_MESH_PEERS failed", error = error
return err($error)
return ok($numPeersInMesh)
proc waku_relay_get_connected_peers(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
) {.ffi.} =
## Returns the list of all connected peers to an specific pubsub topic
let connPeers = ctx.myLib[].node.wakuRelay.getConnectedPeers($pubsubTopic).valueOr:
error "LIST_CONNECTED_PEERS failed", error = error
return err($error)
## returns a comma-separated string of peerIDs
return ok(connPeers.mapIt($it).join(","))
proc waku_relay_get_num_connected_peers(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
) {.ffi.} =
let numConnPeers = ctx.myLib[].node.wakuRelay.getNumConnectedPeers($pubsubTopic).valueOr:
error "NUM_CONNECTED_PEERS failed", error = error
return err($error)
return ok($numConnPeers)
proc waku_relay_add_protected_shard(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
clusterId: cint,
shardId: cint,
publicKey: cstring,
) {.ffi.} =
## Protects a shard with a public key
try:
let relayShard = RelayShard(clusterId: uint16(clusterId), shardId: uint16(shardId))
let protectedShard = ProtectedShard.parseCmdArg($relayShard & ":" & $publicKey)
ctx.myLib[].node.wakuRelay.addSignedShardsValidator(
@[protectedShard], uint16(clusterId)
)
except ValueError as exc:
return err("ERROR in waku_relay_add_protected_shard: " & exc.msg)
return ok("")
proc waku_relay_subscribe(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
) {.ffi.} =
echo "Subscribing to topic: " & $pubSubTopic & " ..."
proc onReceivedMessage(ctx: ptr FFIContext[Waku]): WakuRelayHandler =
return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
callEventCallback(ctx, "onReceivedMessage"):
$JsonMessageEvent.new(pubsubTopic, msg)
var cb = onReceivedMessage(ctx)
ctx.myLib[].node.subscribe(
(kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic),
handler = WakuRelayHandler(cb),
).isOkOr:
error "SUBSCRIBE failed", error = error
return err($error)
return ok("")
proc waku_relay_unsubscribe(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
) {.ffi.} =
ctx.myLib[].node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic)).isOkOr:
error "UNSUBSCRIBE failed", error = error
return err($error)
return ok("")
proc waku_relay_publish(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
pubSubTopic: cstring,
jsonWakuMessage: cstring,
timeoutMs: cuint,
) {.ffi.} =
var
# https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms
jsonMessage: JsonMessage
try:
let jsonContent = parseJson($jsonWakuMessage)
jsonMessage = JsonMessage.fromJsonNode(jsonContent).valueOr:
raise newException(JsonParsingError, $error)
except JsonParsingError as exc:
return err(fmt"Error parsing json message: {exc.msg}")
let msg = json_message_event.toWakuMessage(jsonMessage).valueOr:
return err("Problem building the WakuMessage: " & $error)
(await ctx.myLib[].node.wakuRelay.publish($pubsubTopic, msg)).isOkOr:
error "PUBLISH failed", error = error
return err($error)
let msgHash = computeMessageHash($pubSubTopic, msg).to0xHex
return ok(msgHash)
proc waku_default_pubsub_topic(
ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
) {.ffi.} =
# https://rfc.vac.dev/spec/36/#extern-char-waku_default_pubsub_topic
return ok(DefaultPubsubTopic)
proc waku_content_topic(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
appName: cstring,
appVersion: cuint,
contentTopicName: cstring,
encoding: cstring,
) {.ffi.} =
# https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding
return ok(fmt"/{$appName}/{$appVersion}/{$contentTopicName}/{$encoding}")
proc waku_pubsub_topic(
ctx: ptr FFIContext[Waku],
callback: FFICallBack,
userData: pointer,
topicName: cstring,
) {.ffi.} =
# https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding
return ok(fmt"/waku/2/{$topicName}")

View File

@ -1,28 +1,16 @@
import std/[json, sugar, strutils, options] import std/[json, sugar, strutils, options]
import chronos, chronicles, results, stew/byteutils import chronos, chronicles, results, stew/byteutils, ffi
import import
../../../../waku/factory/waku, waku/factory/waku,
../../../alloc, library/utils,
../../../utils, waku/waku_core/peers,
../../../../waku/waku_core/peers, waku/waku_core/message/digest,
../../../../waku/waku_core/time, waku/waku_store/common,
../../../../waku/waku_core/message/digest, waku/waku_store/client,
../../../../waku/waku_store/common, waku/common/paging,
../../../../waku/waku_store/client, library/declare_lib
../../../../waku/common/paging
type StoreReqType* = enum func fromJsonNode(jsonContent: JsonNode): Result[StoreQueryRequest, string] =
REMOTE_QUERY ## to perform a query to another Store node
type StoreRequest* = object
operation: StoreReqType
jsonQuery: cstring
peerAddr: cstring
timeoutMs: cint
func fromJsonNode(
T: type StoreRequest, jsonContent: JsonNode
): Result[StoreQueryRequest, string] =
var contentTopics: seq[string] var contentTopics: seq[string]
if jsonContent.contains("contentTopics"): if jsonContent.contains("contentTopics"):
contentTopics = collect(newSeq): contentTopics = collect(newSeq):
@ -78,54 +66,29 @@ func fromJsonNode(
) )
) )
proc createShared*( proc waku_store_query(
T: type StoreRequest, ctx: ptr FFIContext[Waku],
op: StoreReqType, callback: FFICallBack,
userData: pointer,
jsonQuery: cstring, jsonQuery: cstring,
peerAddr: cstring, peerAddr: cstring,
timeoutMs: cint, timeoutMs: cint,
): ptr type T = ) {.ffi.} =
var ret = createShared(T)
ret[].operation = op
ret[].timeoutMs = timeoutMs
ret[].jsonQuery = jsonQuery.alloc()
ret[].peerAddr = peerAddr.alloc()
return ret
proc destroyShared(self: ptr StoreRequest) =
deallocShared(self[].jsonQuery)
deallocShared(self[].peerAddr)
deallocShared(self)
proc process_remote_query(
self: ptr StoreRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
let jsonContentRes = catch: let jsonContentRes = catch:
parseJson($self[].jsonQuery) parseJson($jsonQuery)
if jsonContentRes.isErr(): if jsonContentRes.isErr():
return err("StoreRequest failed parsing store request: " & jsonContentRes.error.msg) return err("StoreRequest failed parsing store request: " & jsonContentRes.error.msg)
let storeQueryRequest = ?StoreRequest.fromJsonNode(jsonContentRes.get()) let storeQueryRequest = ?fromJsonNode(jsonContentRes.get())
let peer = peers.parsePeerInfo(($self[].peerAddr).split(",")).valueOr: let peer = peers.parsePeerInfo(($peerAddr).split(",")).valueOr:
return err("StoreRequest failed to parse peer addr: " & $error) return err("StoreRequest failed to parse peer addr: " & $error)
let queryResponse = (await waku.node.wakuStoreClient.query(storeQueryRequest, peer)).valueOr: let queryResponse = (
await ctx.myLib[].node.wakuStoreClient.query(storeQueryRequest, peer)
).valueOr:
return err("StoreRequest failed store query: " & $error) return err("StoreRequest failed store query: " & $error)
let res = $(%*(queryResponse.toHex())) let res = $(%*(queryResponse.toHex()))
return ok(res) ## returning the response in json format return ok(res) ## returning the response in json format
proc process*(
self: ptr StoreRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
deallocShared(self)
case self.operation
of REMOTE_QUERY:
return await self.process_remote_query(waku)
error "store request not handled at all"
return err("store request not handled at all")

View File

@ -15,180 +15,181 @@
#define RET_MISSING_CALLBACK 2 #define RET_MISSING_CALLBACK 2
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C"
{
#endif #endif
typedef void (*WakuCallBack) (int callerRet, const char* msg, size_t len, void* userData); typedef void (*FFICallBack)(int callerRet, const char *msg, size_t len, void *userData);
// Creates a new instance of the waku node. // Creates a new instance of the waku node.
// Sets up the waku node from the given configuration. // Sets up the waku node from the given configuration.
// Returns a pointer to the Context needed by the rest of the API functions. // Returns a pointer to the Context needed by the rest of the API functions.
void *waku_new( void *waku_new(
const char *configJson, const char *configJson,
WakuCallBack callback, FFICallBack callback,
void *userData); void *userData);
int waku_start(void *ctx, int waku_start(void *ctx,
WakuCallBack callback, FFICallBack callback,
void *userData); void *userData);
int waku_stop(void *ctx, int waku_stop(void *ctx,
WakuCallBack callback, FFICallBack callback,
void *userData); void *userData);
// Destroys an instance of a waku node created with waku_new // Destroys an instance of a waku node created with waku_new
int waku_destroy(void *ctx, int waku_destroy(void *ctx,
WakuCallBack callback, FFICallBack callback,
void *userData); void *userData);
int waku_version(void *ctx, int waku_version(void *ctx,
WakuCallBack callback, FFICallBack callback,
void *userData); void *userData);
// Sets a callback that will be invoked whenever an event occurs. // Sets a callback that will be invoked whenever an event occurs.
// It is crucial that the passed callback is fast, non-blocking and potentially thread-safe. // It is crucial that the passed callback is fast, non-blocking and potentially thread-safe.
void waku_set_event_callback(void* ctx, void set_event_callback(void *ctx,
WakuCallBack callback, FFICallBack callback,
void *userData); void *userData);
int waku_content_topic(void *ctx, int waku_content_topic(void *ctx,
FFICallBack callback,
void *userData,
const char *appName, const char *appName,
unsigned int appVersion, unsigned int appVersion,
const char *contentTopicName, const char *contentTopicName,
const char* encoding, const char *encoding);
WakuCallBack callback,
void* userData);
int waku_pubsub_topic(void *ctx, int waku_pubsub_topic(void *ctx,
const char* topicName, FFICallBack callback,
WakuCallBack callback, void *userData,
void* userData); const char *topicName);
int waku_default_pubsub_topic(void *ctx, int waku_default_pubsub_topic(void *ctx,
WakuCallBack callback, FFICallBack callback,
void *userData); void *userData);
int waku_relay_publish(void *ctx, int waku_relay_publish(void *ctx,
FFICallBack callback,
void *userData,
const char *pubSubTopic, const char *pubSubTopic,
const char *jsonWakuMessage, const char *jsonWakuMessage,
unsigned int timeoutMs, unsigned int timeoutMs);
WakuCallBack callback,
void* userData);
int waku_lightpush_publish(void *ctx, int waku_lightpush_publish(void *ctx,
FFICallBack callback,
void *userData,
const char *pubSubTopic, const char *pubSubTopic,
const char* jsonWakuMessage, const char *jsonWakuMessage);
WakuCallBack callback,
void* userData);
int waku_relay_subscribe(void *ctx, int waku_relay_subscribe(void *ctx,
const char* pubSubTopic, FFICallBack callback,
WakuCallBack callback, void *userData,
void* userData); const char *pubSubTopic);
int waku_relay_add_protected_shard(void *ctx, int waku_relay_add_protected_shard(void *ctx,
FFICallBack callback,
void *userData,
int clusterId, int clusterId,
int shardId, int shardId,
char* publicKey, char *publicKey);
WakuCallBack callback,
void* userData);
int waku_relay_unsubscribe(void *ctx, int waku_relay_unsubscribe(void *ctx,
const char* pubSubTopic, FFICallBack callback,
WakuCallBack callback, void *userData,
void* userData); const char *pubSubTopic);
int waku_filter_subscribe(void *ctx, int waku_filter_subscribe(void *ctx,
FFICallBack callback,
void *userData,
const char *pubSubTopic, const char *pubSubTopic,
const char* contentTopics, const char *contentTopics);
WakuCallBack callback,
void* userData);
int waku_filter_unsubscribe(void *ctx, int waku_filter_unsubscribe(void *ctx,
FFICallBack callback,
void *userData,
const char *pubSubTopic, const char *pubSubTopic,
const char* contentTopics, const char *contentTopics);
WakuCallBack callback,
void* userData);
int waku_filter_unsubscribe_all(void *ctx, int waku_filter_unsubscribe_all(void *ctx,
WakuCallBack callback, FFICallBack callback,
void *userData); void *userData);
int waku_relay_get_num_connected_peers(void *ctx, int waku_relay_get_num_connected_peers(void *ctx,
const char* pubSubTopic, FFICallBack callback,
WakuCallBack callback, void *userData,
void* userData); const char *pubSubTopic);
int waku_relay_get_connected_peers(void *ctx, int waku_relay_get_connected_peers(void *ctx,
const char* pubSubTopic, FFICallBack callback,
WakuCallBack callback, void *userData,
void* userData); const char *pubSubTopic);
int waku_relay_get_num_peers_in_mesh(void *ctx, int waku_relay_get_num_peers_in_mesh(void *ctx,
const char* pubSubTopic, FFICallBack callback,
WakuCallBack callback, void *userData,
void* userData); const char *pubSubTopic);
int waku_relay_get_peers_in_mesh(void *ctx, int waku_relay_get_peers_in_mesh(void *ctx,
const char* pubSubTopic, FFICallBack callback,
WakuCallBack callback, void *userData,
void* userData); const char *pubSubTopic);
int waku_store_query(void *ctx, int waku_store_query(void *ctx,
FFICallBack callback,
void *userData,
const char *jsonQuery, const char *jsonQuery,
const char *peerAddr, const char *peerAddr,
int timeoutMs, int timeoutMs);
WakuCallBack callback,
void* userData);
int waku_connect(void *ctx, int waku_connect(void *ctx,
FFICallBack callback,
void *userData,
const char *peerMultiAddr, const char *peerMultiAddr,
unsigned int timeoutMs, unsigned int timeoutMs);
WakuCallBack callback,
void* userData);
int waku_disconnect_peer_by_id(void *ctx, int waku_disconnect_peer_by_id(void *ctx,
const char* peerId, FFICallBack callback,
WakuCallBack callback, void *userData,
void* userData); const char *peerId);
int waku_disconnect_all_peers(void *ctx, int waku_disconnect_all_peers(void *ctx,
WakuCallBack callback, FFICallBack callback,
void *userData); void *userData);
int waku_dial_peer(void *ctx, int waku_dial_peer(void *ctx,
FFICallBack callback,
void *userData,
const char *peerMultiAddr, const char *peerMultiAddr,
const char *protocol, const char *protocol,
int timeoutMs, int timeoutMs);
WakuCallBack callback,
void* userData);
int waku_dial_peer_by_id(void *ctx, int waku_dial_peer_by_id(void *ctx,
FFICallBack callback,
void *userData,
const char *peerId, const char *peerId,
const char *protocol, const char *protocol,
int timeoutMs, int timeoutMs);
WakuCallBack callback,
void* userData);
int waku_get_peerids_from_peerstore(void *ctx, int waku_get_peerids_from_peerstore(void *ctx,
WakuCallBack callback, FFICallBack callback,
void *userData); void *userData);
int waku_get_connected_peers_info(void *ctx, int waku_get_connected_peers_info(void *ctx,
WakuCallBack callback, FFICallBack callback,
void *userData); void *userData);
int waku_get_peerids_by_protocol(void *ctx, int waku_get_peerids_by_protocol(void *ctx,
const char* protocol, FFICallBack callback,
WakuCallBack callback, void *userData,
void* userData); const char *protocol);
int waku_listen_addresses(void *ctx, int waku_listen_addresses(void *ctx,
WakuCallBack callback, FFICallBack callback,
void *userData); void *userData);
int waku_get_connected_peers(void *ctx, int waku_get_connected_peers(void *ctx,
WakuCallBack callback, FFICallBack callback,
void *userData); void *userData);
// Returns a list of multiaddress given a url to a DNS discoverable ENR tree // Returns a list of multiaddress given a url to a DNS discoverable ENR tree
@ -197,53 +198,53 @@ int waku_get_connected_peers(void* ctx,
// char* nameDnsServer: The nameserver to resolve the ENR tree url. // char* nameDnsServer: The nameserver to resolve the ENR tree url.
// int timeoutMs: Timeout value in milliseconds to execute the call. // int timeoutMs: Timeout value in milliseconds to execute the call.
int waku_dns_discovery(void *ctx, int waku_dns_discovery(void *ctx,
FFICallBack callback,
void *userData,
const char *entTreeUrl, const char *entTreeUrl,
const char *nameDnsServer, const char *nameDnsServer,
int timeoutMs, int timeoutMs);
WakuCallBack callback,
void* userData);
// Updates the bootnode list used for discovering new peers via DiscoveryV5 // Updates the bootnode list used for discovering new peers via DiscoveryV5
// bootnodes - JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]` // bootnodes - JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]`
int waku_discv5_update_bootnodes(void *ctx, int waku_discv5_update_bootnodes(void *ctx,
char* bootnodes, FFICallBack callback,
WakuCallBack callback, void *userData,
void* userData); char *bootnodes);
int waku_start_discv5(void *ctx, int waku_start_discv5(void *ctx,
WakuCallBack callback, FFICallBack callback,
void *userData); void *userData);
int waku_stop_discv5(void *ctx, int waku_stop_discv5(void *ctx,
WakuCallBack callback, FFICallBack callback,
void *userData); void *userData);
// Retrieves the ENR information // Retrieves the ENR information
int waku_get_my_enr(void *ctx, int waku_get_my_enr(void *ctx,
WakuCallBack callback, FFICallBack callback,
void *userData); void *userData);
int waku_get_my_peerid(void *ctx, int waku_get_my_peerid(void *ctx,
WakuCallBack callback, FFICallBack callback,
void *userData); void *userData);
int waku_get_metrics(void *ctx, int waku_get_metrics(void *ctx,
WakuCallBack callback, FFICallBack callback,
void *userData); void *userData);
int waku_peer_exchange_request(void *ctx, int waku_peer_exchange_request(void *ctx,
int numPeers, FFICallBack callback,
WakuCallBack callback, void *userData,
void* userData); int numPeers);
int waku_ping_peer(void *ctx, int waku_ping_peer(void *ctx,
FFICallBack callback,
void *userData,
const char *peerAddr, const char *peerAddr,
int timeoutMs, int timeoutMs);
WakuCallBack callback,
void* userData);
int waku_is_online(void *ctx, int waku_is_online(void *ctx,
WakuCallBack callback, FFICallBack callback,
void *userData); void *userData);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -1,107 +1,35 @@
{.pragma: exported, exportc, cdecl, raises: [].} import std/[atomics, options, atomics, macros]
{.pragma: callback, cdecl, raises: [], gcsafe.} import chronicles, chronos, chronos/threadsync, ffi
{.passc: "-fPIC".}
when defined(linux):
{.passl: "-Wl,-soname,libwaku.so".}
import std/[json, atomics, strformat, options, atomics]
import chronicles, chronos, chronos/threadsync
import import
waku/common/base64,
waku/waku_core/message/message, waku/waku_core/message/message,
waku/node/waku_node,
waku/node/peer_manager,
waku/waku_core/topics/pubsub_topic, waku/waku_core/topics/pubsub_topic,
waku/waku_core/subscription/push_handler,
waku/waku_relay, waku/waku_relay,
./events/json_message_event, ./events/json_message_event,
./waku_context, ./events/json_topic_health_change_event,
./waku_thread_requests/requests/node_lifecycle_request, ./events/json_connection_change_event,
./waku_thread_requests/requests/peer_manager_request, ../waku/factory/app_callbacks,
./waku_thread_requests/requests/protocols/relay_request, waku/factory/waku,
./waku_thread_requests/requests/protocols/store_request, waku/node/waku_node,
./waku_thread_requests/requests/protocols/lightpush_request, ./declare_lib
./waku_thread_requests/requests/protocols/filter_request,
./waku_thread_requests/requests/debug_node_request,
./waku_thread_requests/requests/discovery_request,
./waku_thread_requests/requests/ping_request,
./waku_thread_requests/waku_thread_request,
./alloc,
./ffi_types,
../waku/factory/app_callbacks
################################################################################ ################################################################################
### Wrapper around the waku node ## Include different APIs, i.e. all procs with {.ffi.} pragma
################################################################################ include
./kernel_api/peer_manager_api,
################################################################################ ./kernel_api/discovery_api,
### Not-exported components ./kernel_api/node_lifecycle_api,
./kernel_api/debug_node_api,
template checkLibwakuParams*( ./kernel_api/ping_api,
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ./kernel_api/protocols/relay_api,
) = ./kernel_api/protocols/store_api,
if not isNil(ctx): ./kernel_api/protocols/lightpush_api,
ctx[].userData = userData ./kernel_api/protocols/filter_api
if isNil(callback):
return RET_MISSING_CALLBACK
proc handleRequest(
ctx: ptr WakuContext,
requestType: RequestType,
content: pointer,
callback: WakuCallBack,
userData: pointer,
): cint =
waku_context.sendRequestToWakuThread(ctx, requestType, content, callback, userData).isOkOr:
let msg = "libwaku error: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
return RET_OK
### End of not-exported components
################################################################################
################################################################################
### Library setup
# Every Nim library must have this function called - the name is derived from
# the `--nimMainPrefix` command line option
proc libwakuNimMain() {.importc.}
# To control when the library has been initialized
var initialized: Atomic[bool]
if defined(android):
# Redirect chronicles to Android System logs
when compiles(defaultChroniclesStream.outputs[0].writer):
defaultChroniclesStream.outputs[0].writer = proc(
logLevel: LogLevel, msg: LogOutputStr
) {.raises: [].} =
echo logLevel, msg
proc initializeLibrary() {.exported.} =
if not initialized.exchange(true):
## Every Nim library needs to call `<yourprefix>NimMain` once exactly, to initialize the Nim runtime.
## Being `<yourprefix>` the value given in the optional compilation flag --nimMainPrefix:yourprefix
libwakuNimMain()
when declared(setupForeignThreadGc):
setupForeignThreadGc()
when declared(nimGC_setStackBottom):
var locals {.volatile, noinit.}: pointer
locals = addr(locals)
nimGC_setStackBottom(locals)
### End of library setup
################################################################################
################################################################################ ################################################################################
### Exported procs ### Exported procs
proc waku_new( proc waku_new(
configJson: cstring, callback: WakuCallback, userData: pointer configJson: cstring, callback: FFICallback, userData: pointer
): pointer {.dynlib, exportc, cdecl.} = ): pointer {.dynlib, exportc, cdecl.} =
initializeLibrary() initializeLibrary()
@ -111,41 +39,50 @@ proc waku_new(
return nil return nil
## Create the Waku thread that will keep waiting for req from the main thread. ## Create the Waku thread that will keep waiting for req from the main thread.
var ctx = waku_context.createWakuContext().valueOr: var ctx = ffi.createFFIContext[Waku]().valueOr:
let msg = "Error in createWakuContext: " & $error let msg = "Error in createFFIContext: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil return nil
ctx.userData = userData ctx.userData = userData
proc onReceivedMessage(ctx: ptr FFIContext): WakuRelayHandler =
return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
callEventCallback(ctx, "onReceivedMessage"):
$JsonMessageEvent.new(pubsubTopic, msg)
proc onTopicHealthChange(ctx: ptr FFIContext): TopicHealthChangeHandler =
return proc(pubsubTopic: PubsubTopic, topicHealth: TopicHealth) {.async.} =
callEventCallback(ctx, "onTopicHealthChange"):
$JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth)
proc onConnectionChange(ctx: ptr FFIContext): ConnectionChangeHandler =
return proc(peerId: PeerId, peerEvent: PeerEventKind) {.async.} =
callEventCallback(ctx, "onConnectionChange"):
$JsonConnectionChangeEvent.new($peerId, peerEvent)
let appCallbacks = AppCallbacks( let appCallbacks = AppCallbacks(
relayHandler: onReceivedMessage(ctx), relayHandler: onReceivedMessage(ctx),
topicHealthChangeHandler: onTopicHealthChange(ctx), topicHealthChangeHandler: onTopicHealthChange(ctx),
connectionChangeHandler: onConnectionChange(ctx), connectionChangeHandler: onConnectionChange(ctx),
) )
let retCode = handleRequest( ffi.sendRequestToFFIThread(
ctx, ctx, CreateNodeRequest.ffiNewReq(callback, userData, configJson, appCallbacks)
RequestType.LIFECYCLE, ).isOkOr:
NodeLifecycleRequest.createShared( let msg = "error in sendRequestToFFIThread: " & $error
NodeLifecycleMsgType.CREATE_NODE, configJson, appCallbacks callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
),
callback,
userData,
)
if retCode == RET_ERR:
return nil return nil
return ctx return ctx
proc waku_destroy( proc waku_destroy(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer
): cint {.dynlib, exportc.} = ): cint {.dynlib, exportc, cdecl.} =
initializeLibrary() initializeLibrary()
checkLibwakuParams(ctx, callback, userData) checkParams(ctx, callback, userData)
waku_context.destroyWakuContext(ctx).isOkOr: ffi.destroyFFIContext(ctx).isOkOr:
let msg = "libwaku error: " & $error let msg = "libwaku error: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR return RET_ERR
@ -155,699 +92,5 @@ proc waku_destroy(
return RET_OK return RET_OK
proc waku_version( # ### End of exported procs
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer # ################################################################################
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
callback(
RET_OK,
cast[ptr cchar](WakuNodeVersionString),
cast[csize_t](len(WakuNodeVersionString)),
userData,
)
return RET_OK
proc waku_set_event_callback(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
) {.dynlib, exportc.} =
initializeLibrary()
ctx[].eventCallback = cast[pointer](callback)
ctx[].eventUserData = userData
proc waku_content_topic(
ctx: ptr WakuContext,
appName: cstring,
appVersion: cuint,
contentTopicName: cstring,
encoding: cstring,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
# https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
let contentTopic = fmt"/{$appName}/{$appVersion}/{$contentTopicName}/{$encoding}"
callback(
RET_OK, unsafeAddr contentTopic[0], cast[csize_t](len(contentTopic)), userData
)
return RET_OK
proc waku_pubsub_topic(
ctx: ptr WakuContext, topicName: cstring, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc, cdecl.} =
# https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
let outPubsubTopic = fmt"/waku/2/{$topicName}"
callback(
RET_OK, unsafeAddr outPubsubTopic[0], cast[csize_t](len(outPubsubTopic)), userData
)
return RET_OK
proc waku_default_pubsub_topic(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
# https://rfc.vac.dev/spec/36/#extern-char-waku_default_pubsub_topic
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
callback(
RET_OK,
cast[ptr cchar](DefaultPubsubTopic),
cast[csize_t](len(DefaultPubsubTopic)),
userData,
)
return RET_OK
proc waku_relay_publish(
ctx: ptr WakuContext,
pubSubTopic: cstring,
jsonWakuMessage: cstring,
timeoutMs: cuint,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc, cdecl.} =
# https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
var jsonMessage: JsonMessage
try:
let jsonContent = parseJson($jsonWakuMessage)
jsonMessage = JsonMessage.fromJsonNode(jsonContent).valueOr:
raise newException(JsonParsingError, $error)
except JsonParsingError:
let msg = fmt"Error parsing json message: {getCurrentExceptionMsg()}"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
let wakuMessage = jsonMessage.toWakuMessage().valueOr:
let msg = "Problem building the WakuMessage: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.PUBLISH, pubSubTopic, nil, wakuMessage),
callback,
userData,
)
proc waku_start(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.LIFECYCLE,
NodeLifecycleRequest.createShared(NodeLifecycleMsgType.START_NODE),
callback,
userData,
)
proc waku_stop(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.LIFECYCLE,
NodeLifecycleRequest.createShared(NodeLifecycleMsgType.STOP_NODE),
callback,
userData,
)
proc waku_relay_subscribe(
ctx: ptr WakuContext,
pubSubTopic: cstring,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
var cb = onReceivedMessage(ctx)
handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.SUBSCRIBE, pubSubTopic, WakuRelayHandler(cb)),
callback,
userData,
)
proc waku_relay_add_protected_shard(
ctx: ptr WakuContext,
clusterId: cint,
shardId: cint,
publicKey: cstring,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc, cdecl.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(
RelayMsgType.ADD_PROTECTED_SHARD,
clusterId = clusterId,
shardId = shardId,
publicKey = publicKey,
),
callback,
userData,
)
proc waku_relay_unsubscribe(
ctx: ptr WakuContext,
pubSubTopic: cstring,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(
RelayMsgType.UNSUBSCRIBE, pubSubTopic, WakuRelayHandler(onReceivedMessage(ctx))
),
callback,
userData,
)
proc waku_relay_get_num_connected_peers(
ctx: ptr WakuContext,
pubSubTopic: cstring,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.NUM_CONNECTED_PEERS, pubSubTopic),
callback,
userData,
)
proc waku_relay_get_connected_peers(
ctx: ptr WakuContext,
pubSubTopic: cstring,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.LIST_CONNECTED_PEERS, pubSubTopic),
callback,
userData,
)
proc waku_relay_get_num_peers_in_mesh(
ctx: ptr WakuContext,
pubSubTopic: cstring,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.NUM_MESH_PEERS, pubSubTopic),
callback,
userData,
)
proc waku_relay_get_peers_in_mesh(
ctx: ptr WakuContext,
pubSubTopic: cstring,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.LIST_MESH_PEERS, pubSubTopic),
callback,
userData,
)
proc waku_filter_subscribe(
ctx: ptr WakuContext,
pubSubTopic: cstring,
contentTopics: cstring,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.FILTER,
FilterRequest.createShared(
FilterMsgType.SUBSCRIBE,
pubSubTopic,
contentTopics,
FilterPushHandler(onReceivedMessage(ctx)),
),
callback,
userData,
)
proc waku_filter_unsubscribe(
ctx: ptr WakuContext,
pubSubTopic: cstring,
contentTopics: cstring,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.FILTER,
FilterRequest.createShared(FilterMsgType.UNSUBSCRIBE, pubSubTopic, contentTopics),
callback,
userData,
)
proc waku_filter_unsubscribe_all(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.FILTER,
FilterRequest.createShared(FilterMsgType.UNSUBSCRIBE_ALL),
callback,
userData,
)
proc waku_lightpush_publish(
ctx: ptr WakuContext,
pubSubTopic: cstring,
jsonWakuMessage: cstring,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc, cdecl.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
var jsonMessage: JsonMessage
try:
let jsonContent = parseJson($jsonWakuMessage)
jsonMessage = JsonMessage.fromJsonNode(jsonContent).valueOr:
raise newException(JsonParsingError, $error)
except JsonParsingError:
let msg = fmt"Error parsing json message: {getCurrentExceptionMsg()}"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
let wakuMessage = jsonMessage.toWakuMessage().valueOr:
let msg = "Problem building the WakuMessage: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
handleRequest(
ctx,
RequestType.LIGHTPUSH,
LightpushRequest.createShared(LightpushMsgType.PUBLISH, pubSubTopic, wakuMessage),
callback,
userData,
)
proc waku_connect(
ctx: ptr WakuContext,
peerMultiAddr: cstring,
timeoutMs: cuint,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(
PeerManagementMsgType.CONNECT_TO, $peerMultiAddr, chronos.milliseconds(timeoutMs)
),
callback,
userData,
)
proc waku_disconnect_peer_by_id(
ctx: ptr WakuContext, peerId: cstring, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(
op = PeerManagementMsgType.DISCONNECT_PEER_BY_ID, peerId = $peerId
),
callback,
userData,
)
proc waku_disconnect_all_peers(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(op = PeerManagementMsgType.DISCONNECT_ALL_PEERS),
callback,
userData,
)
proc waku_dial_peer(
ctx: ptr WakuContext,
peerMultiAddr: cstring,
protocol: cstring,
timeoutMs: cuint,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(
op = PeerManagementMsgType.DIAL_PEER,
peerMultiAddr = $peerMultiAddr,
protocol = $protocol,
),
callback,
userData,
)
proc waku_dial_peer_by_id(
ctx: ptr WakuContext,
peerId: cstring,
protocol: cstring,
timeoutMs: cuint,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(
op = PeerManagementMsgType.DIAL_PEER_BY_ID, peerId = $peerId, protocol = $protocol
),
callback,
userData,
)
proc waku_get_peerids_from_peerstore(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(PeerManagementMsgType.GET_ALL_PEER_IDS),
callback,
userData,
)
proc waku_get_connected_peers_info(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(PeerManagementMsgType.GET_CONNECTED_PEERS_INFO),
callback,
userData,
)
proc waku_get_connected_peers(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(PeerManagementMsgType.GET_CONNECTED_PEERS),
callback,
userData,
)
proc waku_get_peerids_by_protocol(
ctx: ptr WakuContext, protocol: cstring, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(
op = PeerManagementMsgType.GET_PEER_IDS_BY_PROTOCOL, protocol = $protocol
),
callback,
userData,
)
proc waku_store_query(
ctx: ptr WakuContext,
jsonQuery: cstring,
peerAddr: cstring,
timeoutMs: cint,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.STORE,
StoreRequest.createShared(StoreReqType.REMOTE_QUERY, jsonQuery, peerAddr, timeoutMs),
callback,
userData,
)
proc waku_listen_addresses(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.DEBUG,
DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_LISTENING_ADDRESSES),
callback,
userData,
)
proc waku_dns_discovery(
ctx: ptr WakuContext,
entTreeUrl: cstring,
nameDnsServer: cstring,
timeoutMs: cint,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.DISCOVERY,
DiscoveryRequest.createRetrieveBootstrapNodesRequest(
DiscoveryMsgType.GET_BOOTSTRAP_NODES, entTreeUrl, nameDnsServer, timeoutMs
),
callback,
userData,
)
proc waku_discv5_update_bootnodes(
ctx: ptr WakuContext, bootnodes: cstring, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
## Updates the bootnode list used for discovering new peers via DiscoveryV5
## bootnodes - JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]`
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.DISCOVERY,
DiscoveryRequest.createUpdateBootstrapNodesRequest(
DiscoveryMsgType.UPDATE_DISCV5_BOOTSTRAP_NODES, bootnodes
),
callback,
userData,
)
proc waku_get_my_enr(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.DEBUG,
DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_MY_ENR),
callback,
userData,
)
proc waku_get_my_peerid(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.DEBUG,
DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_MY_PEER_ID),
callback,
userData,
)
proc waku_get_metrics(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.DEBUG,
DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_METRICS),
callback,
userData,
)
proc waku_start_discv5(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.DISCOVERY,
DiscoveryRequest.createDiscV5StartRequest(),
callback,
userData,
)
proc waku_stop_discv5(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.DISCOVERY,
DiscoveryRequest.createDiscV5StopRequest(),
callback,
userData,
)
proc waku_peer_exchange_request(
ctx: ptr WakuContext, numPeers: uint64, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.DISCOVERY,
DiscoveryRequest.createPeerExchangeRequest(numPeers),
callback,
userData,
)
proc waku_ping_peer(
ctx: ptr WakuContext,
peerAddr: cstring,
timeoutMs: cuint,
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.PING,
PingRequest.createShared(peerAddr, chronos.milliseconds(timeoutMs)),
callback,
userData,
)
proc waku_is_online(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
handleRequest(
ctx,
RequestType.DEBUG,
DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_ONLINE_STATE),
callback,
userData,
)
### End of exported procs
################################################################################

View File

@ -1,223 +0,0 @@
{.pragma: exported, exportc, cdecl, raises: [].}
{.pragma: callback, cdecl, raises: [], gcsafe.}
{.passc: "-fPIC".}
import std/[options, atomics, os, net, locks]
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
import
waku/common/logging,
waku/factory/waku,
waku/node/peer_manager,
waku/waku_relay/[protocol, topic_health],
waku/waku_core/[topics/pubsub_topic, message],
./waku_thread_requests/[waku_thread_request, requests/debug_node_request],
./ffi_types,
./events/[
json_message_event, json_topic_health_change_event, json_connection_change_event,
json_waku_not_responding_event,
]
type WakuContext* = object
wakuThread: Thread[(ptr WakuContext)]
watchdogThread: Thread[(ptr WakuContext)]
# monitors the Waku thread and notifies the Waku SDK consumer if it hangs
lock: Lock
reqChannel: ChannelSPSCSingle[ptr WakuThreadRequest]
reqSignal: ThreadSignalPtr
# to inform The Waku Thread (a.k.a TWT) that a new request is sent
reqReceivedSignal: ThreadSignalPtr
# to inform the main thread that the request is rx by TWT
userData*: pointer
eventCallback*: pointer
eventUserdata*: pointer
running: Atomic[bool] # To control when the threads are running
const git_version* {.strdefine.} = "n/a"
const versionString = "version / git commit hash: " & waku.git_version
template callEventCallback(ctx: ptr WakuContext, eventName: string, body: untyped) =
if isNil(ctx[].eventCallback):
error eventName & " - eventCallback is nil"
return
foreignThreadGc:
try:
let event = body
cast[WakuCallBack](ctx[].eventCallback)(
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
)
except Exception, CatchableError:
let msg =
"Exception " & eventName & " when calling 'eventCallBack': " &
getCurrentExceptionMsg()
cast[WakuCallBack](ctx[].eventCallback)(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
)
proc onConnectionChange*(ctx: ptr WakuContext): ConnectionChangeHandler =
return proc(peerId: PeerId, peerEvent: PeerEventKind) {.async.} =
callEventCallback(ctx, "onConnectionChange"):
$JsonConnectionChangeEvent.new($peerId, peerEvent)
proc onReceivedMessage*(ctx: ptr WakuContext): WakuRelayHandler =
return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} =
callEventCallback(ctx, "onReceivedMessage"):
$JsonMessageEvent.new(pubsubTopic, msg)
proc onTopicHealthChange*(ctx: ptr WakuContext): TopicHealthChangeHandler =
return proc(pubsubTopic: PubsubTopic, topicHealth: TopicHealth) {.async.} =
callEventCallback(ctx, "onTopicHealthChange"):
$JsonTopicHealthChangeEvent.new(pubsubTopic, topicHealth)
proc onWakuNotResponding*(ctx: ptr WakuContext) =
callEventCallback(ctx, "onWakuNotResponsive"):
$JsonWakuNotRespondingEvent.new()
proc sendRequestToWakuThread*(
ctx: ptr WakuContext,
reqType: RequestType,
reqContent: pointer,
callback: WakuCallBack,
userData: pointer,
timeout = InfiniteDuration,
): Result[void, string] =
ctx.lock.acquire()
# This lock is only necessary while we use a SP Channel and while the signalling
# between threads assumes that there aren't concurrent requests.
# Rearchitecting the signaling + migrating to a MP Channel will allow us to receive
# requests concurrently and spare us the need of locks
defer:
ctx.lock.release()
let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData)
## Sending the request
let sentOk = ctx.reqChannel.trySend(req)
if not sentOk:
deallocShared(req)
return err("Couldn't send a request to the waku thread: " & $req[])
let fireSync = ctx.reqSignal.fireSync().valueOr:
deallocShared(req)
return err("failed fireSync: " & $error)
if not fireSync:
deallocShared(req)
return err("Couldn't fireSync in time")
## wait until the Waku Thread properly received the request
ctx.reqReceivedSignal.waitSync(timeout).isOkOr:
deallocShared(req)
return err("Couldn't receive reqReceivedSignal signal")
## Notice that in case of "ok", the deallocShared(req) is performed by the Waku Thread in the
## process proc. See the 'waku_thread_request.nim' module for more details.
ok()
proc watchdogThreadBody(ctx: ptr WakuContext) {.thread.} =
## Watchdog thread that monitors the Waku thread and notifies the library user if it hangs.
let watchdogRun = proc(ctx: ptr WakuContext) {.async.} =
const WatchdogStartDelay = 10.seconds
const WatchdogTimeinterval = 1.seconds
const WakuNotRespondingTimeout = 3.seconds
# Give time for the node to be created and up before sending watchdog requests
await sleepAsync(WatchdogStartDelay)
while true:
await sleepAsync(WatchdogTimeinterval)
if ctx.running.load == false:
info "Watchdog thread exiting because WakuContext is not running"
break
let wakuCallback = proc(
callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].} =
discard ## Don't do anything. Just respecting the callback signature.
const nilUserData = nil
trace "Sending watchdog request to Waku thread"
sendRequestToWakuThread(
ctx,
RequestType.DEBUG,
DebugNodeRequest.createShared(DebugNodeMsgType.CHECK_WAKU_NOT_BLOCKED),
wakuCallback,
nilUserData,
WakuNotRespondingTimeout,
).isOkOr:
error "Failed to send watchdog request to Waku thread", error = $error
onWakuNotResponding(ctx)
waitFor watchdogRun(ctx)
proc wakuThreadBody(ctx: ptr WakuContext) {.thread.} =
## Waku thread that attends library user requests (stop, connect_to, etc.)
logging.setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
let wakuRun = proc(ctx: ptr WakuContext) {.async.} =
var waku: Waku
while true:
await ctx.reqSignal.wait()
if ctx.running.load == false:
break
## Trying to get a request from the libwaku requestor thread
var request: ptr WakuThreadRequest
let recvOk = ctx.reqChannel.tryRecv(request)
if not recvOk:
error "waku thread could not receive a request"
continue
## Handle the request
asyncSpawn WakuThreadRequest.process(request, addr waku)
ctx.reqReceivedSignal.fireSync().isOkOr:
error "could not fireSync back to requester thread", error = error
waitFor wakuRun(ctx)
proc createWakuContext*(): Result[ptr WakuContext, string] =
## This proc is called from the main thread and it creates
## the Waku working thread.
var ctx = createShared(WakuContext, 1)
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqSignal ThreadSignalPtr")
ctx.reqReceivedSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqReceivedSignal ThreadSignalPtr")
ctx.lock.initLock()
ctx.running.store(true)
try:
createThread(ctx.wakuThread, wakuThreadBody, ctx)
except ValueError, ResourceExhaustedError:
freeShared(ctx)
return err("failed to create the Waku thread: " & getCurrentExceptionMsg())
try:
createThread(ctx.watchdogThread, watchdogThreadBody, ctx)
except ValueError, ResourceExhaustedError:
freeShared(ctx)
return err("failed to create the watchdog thread: " & getCurrentExceptionMsg())
return ok(ctx)
proc destroyWakuContext*(ctx: ptr WakuContext): Result[void, string] =
ctx.running.store(false)
let signaledOnTime = ctx.reqSignal.fireSync().valueOr:
return err("error in destroyWakuContext: " & $error)
if not signaledOnTime:
return err("failed to signal reqSignal on time in destroyWakuContext")
joinThread(ctx.wakuThread)
joinThread(ctx.watchdogThread)
ctx.lock.deinitLock()
?ctx.reqSignal.close()
?ctx.reqReceivedSignal.close()
freeShared(ctx)
return ok()

View File

@ -1,63 +0,0 @@
import std/json
import
chronicles,
chronos,
results,
eth/p2p/discoveryv5/enr,
strutils,
libp2p/peerid,
metrics
import
../../../waku/factory/waku,
../../../waku/node/waku_node,
../../../waku/node/health_monitor
type DebugNodeMsgType* = enum
RETRIEVE_LISTENING_ADDRESSES
RETRIEVE_MY_ENR
RETRIEVE_MY_PEER_ID
RETRIEVE_METRICS
RETRIEVE_ONLINE_STATE
CHECK_WAKU_NOT_BLOCKED
type DebugNodeRequest* = object
operation: DebugNodeMsgType
proc createShared*(T: type DebugNodeRequest, op: DebugNodeMsgType): ptr type T =
var ret = createShared(T)
ret[].operation = op
return ret
proc destroyShared(self: ptr DebugNodeRequest) =
deallocShared(self)
proc getMultiaddresses(node: WakuNode): seq[string] =
return node.info().listenAddresses
proc getMetrics(): string =
{.gcsafe.}:
return defaultRegistry.toText() ## defaultRegistry is {.global.} in metrics module
proc process*(
self: ptr DebugNodeRequest, waku: Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)
case self.operation
of RETRIEVE_LISTENING_ADDRESSES:
## returns a comma-separated string of the listen addresses
return ok(waku.node.getMultiaddresses().join(","))
of RETRIEVE_MY_ENR:
return ok(waku.node.enr.toURI())
of RETRIEVE_MY_PEER_ID:
return ok($waku.node.peerId())
of RETRIEVE_METRICS:
return ok(getMetrics())
of RETRIEVE_ONLINE_STATE:
return ok($waku.healthMonitor.onlineMonitor.amIOnline())
of CHECK_WAKU_NOT_BLOCKED:
return ok("waku thread is not blocked")
error "unsupported operation in DebugNodeRequest"
return err("unsupported operation in DebugNodeRequest")

View File

@ -1,151 +0,0 @@
import std/json
import chronos, chronicles, results, strutils, libp2p/multiaddress
import
../../../waku/factory/waku,
../../../waku/discovery/waku_dnsdisc,
../../../waku/discovery/waku_discv5,
../../../waku/waku_core/peers,
../../../waku/node/waku_node,
../../../waku/node/kernel_api,
../../alloc
type DiscoveryMsgType* = enum
GET_BOOTSTRAP_NODES
UPDATE_DISCV5_BOOTSTRAP_NODES
START_DISCV5
STOP_DISCV5
PEER_EXCHANGE
type DiscoveryRequest* = object
operation: DiscoveryMsgType
## used in GET_BOOTSTRAP_NODES
enrTreeUrl: cstring
nameDnsServer: cstring
timeoutMs: cint
## used in UPDATE_DISCV5_BOOTSTRAP_NODES
nodes: cstring
## used in PEER_EXCHANGE
numPeers: uint64
proc createShared(
T: type DiscoveryRequest,
op: DiscoveryMsgType,
enrTreeUrl: cstring,
nameDnsServer: cstring,
timeoutMs: cint,
nodes: cstring,
numPeers: uint64,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].enrTreeUrl = enrTreeUrl.alloc()
ret[].nameDnsServer = nameDnsServer.alloc()
ret[].timeoutMs = timeoutMs
ret[].nodes = nodes.alloc()
ret[].numPeers = numPeers
return ret
proc createRetrieveBootstrapNodesRequest*(
T: type DiscoveryRequest,
op: DiscoveryMsgType,
enrTreeUrl: cstring,
nameDnsServer: cstring,
timeoutMs: cint,
): ptr type T =
return T.createShared(op, enrTreeUrl, nameDnsServer, timeoutMs, "", 0)
proc createUpdateBootstrapNodesRequest*(
T: type DiscoveryRequest, op: DiscoveryMsgType, nodes: cstring
): ptr type T =
return T.createShared(op, "", "", 0, nodes, 0)
proc createDiscV5StartRequest*(T: type DiscoveryRequest): ptr type T =
return T.createShared(START_DISCV5, "", "", 0, "", 0)
proc createDiscV5StopRequest*(T: type DiscoveryRequest): ptr type T =
return T.createShared(STOP_DISCV5, "", "", 0, "", 0)
proc createPeerExchangeRequest*(
T: type DiscoveryRequest, numPeers: uint64
): ptr type T =
return T.createShared(PEER_EXCHANGE, "", "", 0, "", numPeers)
proc destroyShared(self: ptr DiscoveryRequest) =
deallocShared(self[].enrTreeUrl)
deallocShared(self[].nameDnsServer)
deallocShared(self[].nodes)
deallocShared(self)
proc retrieveBootstrapNodes(
enrTreeUrl: string, ipDnsServer: string
): Future[Result[seq[string], string]] {.async.} =
let dnsNameServers = @[parseIpAddress(ipDnsServer)]
let discoveredPeers: seq[RemotePeerInfo] = (
await retrieveDynamicBootstrapNodes(enrTreeUrl, dnsNameServers)
).valueOr:
return err("failed discovering peers from DNS: " & $error)
var multiAddresses = newSeq[string]()
for discPeer in discoveredPeers:
for address in discPeer.addrs:
multiAddresses.add($address & "/p2p/" & $discPeer)
return ok(multiAddresses)
proc updateDiscv5BootstrapNodes(nodes: string, waku: ptr Waku): Result[void, string] =
waku.wakuDiscv5.updateBootstrapRecords(nodes).isOkOr:
return err("error in updateDiscv5BootstrapNodes: " & $error)
return ok()
proc performPeerExchangeRequestTo(
numPeers: uint64, waku: ptr Waku
): Future[Result[int, string]] {.async.} =
let numPeersRecv = (await waku.node.fetchPeerExchangePeers(numPeers)).valueOr:
return err($error)
return ok(numPeersRecv)
proc process*(
self: ptr DiscoveryRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)
case self.operation
of START_DISCV5:
let res = await waku.wakuDiscv5.start()
res.isOkOr:
error "START_DISCV5 failed", error = error
return err($error)
return ok("discv5 started correctly")
of STOP_DISCV5:
await waku.wakuDiscv5.stop()
return ok("discv5 stopped correctly")
of GET_BOOTSTRAP_NODES:
let nodes = (
await retrieveBootstrapNodes($self[].enrTreeUrl, $self[].nameDnsServer)
).valueOr:
error "GET_BOOTSTRAP_NODES failed", error = error
return err($error)
## returns a comma-separated string of bootstrap nodes' multiaddresses
return ok(nodes.join(","))
of UPDATE_DISCV5_BOOTSTRAP_NODES:
updateDiscv5BootstrapNodes($self[].nodes, waku).isOkOr:
error "UPDATE_DISCV5_BOOTSTRAP_NODES failed", error = error
return err($error)
return ok("discovery request processed correctly")
of PEER_EXCHANGE:
let numValidPeers = (await performPeerExchangeRequestTo(self[].numPeers, waku)).valueOr:
error "PEER_EXCHANGE failed", error = error
return err($error)
return ok($numValidPeers)
error "discovery request not handled"
return err("discovery request not handled")

View File

@ -1,135 +0,0 @@
import std/[sequtils, strutils, tables]
import chronicles, chronos, results, options, json
import
../../../waku/factory/waku,
../../../waku/node/waku_node,
../../alloc,
../../../waku/node/peer_manager
type PeerManagementMsgType* {.pure.} = enum
CONNECT_TO
GET_ALL_PEER_IDS
GET_CONNECTED_PEERS_INFO
GET_PEER_IDS_BY_PROTOCOL
DISCONNECT_PEER_BY_ID
DISCONNECT_ALL_PEERS
DIAL_PEER
DIAL_PEER_BY_ID
GET_CONNECTED_PEERS
type PeerManagementRequest* = object
operation: PeerManagementMsgType
peerMultiAddr: cstring
dialTimeout: Duration
protocol: cstring
peerId: cstring
type PeerInfo = object
protocols: seq[string]
addresses: seq[string]
proc createShared*(
T: type PeerManagementRequest,
op: PeerManagementMsgType,
peerMultiAddr = "",
dialTimeout = chronos.milliseconds(0), ## arbitrary Duration as not all ops needs dialTimeout
peerId = "",
protocol = "",
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].peerMultiAddr = peerMultiAddr.alloc()
ret[].peerId = peerId.alloc()
ret[].protocol = protocol.alloc()
ret[].dialTimeout = dialTimeout
return ret
proc destroyShared(self: ptr PeerManagementRequest) =
if not isNil(self[].peerMultiAddr):
deallocShared(self[].peerMultiAddr)
if not isNil(self[].peerId):
deallocShared(self[].peerId)
if not isNil(self[].protocol):
deallocShared(self[].protocol)
deallocShared(self)
proc process*(
self: ptr PeerManagementRequest, waku: Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)
case self.operation
of CONNECT_TO:
let peers = ($self[].peerMultiAddr).split(",").mapIt(strip(it))
await waku.node.connectToNodes(peers, source = "static")
return ok("")
of GET_ALL_PEER_IDS:
## returns a comma-separated string of peerIDs
let peerIDs =
waku.node.peerManager.switch.peerStore.peers().mapIt($it.peerId).join(",")
return ok(peerIDs)
of GET_CONNECTED_PEERS_INFO:
## returns a JSON string mapping peerIDs to objects with protocols and addresses
var peersMap = initTable[string, PeerInfo]()
let peers = waku.node.peerManager.switch.peerStore.peers().filterIt(
it.connectedness == Connected
)
# Build a map of peer IDs to peer info objects
for peer in peers:
let peerIdStr = $peer.peerId
peersMap[peerIdStr] =
PeerInfo(protocols: peer.protocols, addresses: peer.addrs.mapIt($it))
# Convert the map to JSON string
let jsonObj = %*peersMap
let jsonStr = $jsonObj
return ok(jsonStr)
of GET_PEER_IDS_BY_PROTOCOL:
## returns a comma-separated string of peerIDs that mount the given protocol
let connectedPeers = waku.node.peerManager.switch.peerStore
.peers($self[].protocol)
.filterIt(it.connectedness == Connected)
.mapIt($it.peerId)
.join(",")
return ok(connectedPeers)
of DISCONNECT_PEER_BY_ID:
let peerId = PeerId.init($self[].peerId).valueOr:
error "DISCONNECT_PEER_BY_ID failed", error = $error
return err($error)
await waku.node.peerManager.disconnectNode(peerId)
return ok("")
of DISCONNECT_ALL_PEERS:
await waku.node.peerManager.disconnectAllPeers()
return ok("")
of DIAL_PEER:
let remotePeerInfo = parsePeerInfo($self[].peerMultiAddr).valueOr:
error "DIAL_PEER failed", error = $error
return err($error)
let conn = await waku.node.peerManager.dialPeer(remotePeerInfo, $self[].protocol)
if conn.isNone():
let msg = "failed dialing peer"
error "DIAL_PEER failed", error = msg, peerId = $remotePeerInfo.peerId
return err(msg)
of DIAL_PEER_BY_ID:
let peerId = PeerId.init($self[].peerId).valueOr:
error "DIAL_PEER_BY_ID failed", error = $error
return err($error)
let conn = await waku.node.peerManager.dialPeer(peerId, $self[].protocol)
if conn.isNone():
let msg = "failed dialing peer"
error "DIAL_PEER_BY_ID failed", error = msg, peerId = $peerId
return err(msg)
of GET_CONNECTED_PEERS:
## returns a comma-separated string of peerIDs
let
(inPeerIds, outPeerIds) = waku.node.peerManager.connectedPeers()
connectedPeerids = concat(inPeerIds, outPeerIds)
return ok(connectedPeerids.mapIt($it).join(","))
return ok("")

View File

@ -1,54 +0,0 @@
import std/[json, strutils]
import chronos, results
import libp2p/[protocols/ping, switch, multiaddress, multicodec]
import ../../../waku/[factory/waku, waku_core/peers, node/waku_node], ../../alloc
type PingRequest* = object
peerAddr: cstring
timeout: Duration
proc createShared*(
T: type PingRequest, peerAddr: cstring, timeout: Duration
): ptr type T =
var ret = createShared(T)
ret[].peerAddr = peerAddr.alloc()
ret[].timeout = timeout
return ret
proc destroyShared(self: ptr PingRequest) =
deallocShared(self[].peerAddr)
deallocShared(self)
proc process*(
self: ptr PingRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)
let peerInfo = peers.parsePeerInfo(($self[].peerAddr).split(",")).valueOr:
return err("PingRequest failed to parse peer addr: " & $error)
proc ping(): Future[Result[Duration, string]] {.async, gcsafe.} =
try:
let conn = await waku.node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
defer:
await conn.close()
let pingRTT = await waku.node.libp2pPing.ping(conn)
if pingRTT == 0.nanos:
return err("could not ping peer: rtt-0")
return ok(pingRTT)
except CatchableError:
return err("could not ping peer: " & getCurrentExceptionMsg())
let pingFuture = ping()
let pingRTT: Duration =
if self[].timeout == chronos.milliseconds(0): # No timeout expected
?(await pingFuture)
else:
let timedOut = not (await pingFuture.withTimeout(self[].timeout))
if timedOut:
return err("ping timed out")
?(pingFuture.read())
ok($(pingRTT.nanos))

View File

@ -1,106 +0,0 @@
import options, std/[strutils, sequtils]
import chronicles, chronos, results
import
../../../../waku/waku_filter_v2/client,
../../../../waku/waku_core/message/message,
../../../../waku/factory/waku,
../../../../waku/waku_filter_v2/common,
../../../../waku/waku_core/subscription/push_handler,
../../../../waku/node/peer_manager/peer_manager,
../../../../waku/node/waku_node,
../../../../waku/node/kernel_api,
../../../../waku/waku_core/topics/pubsub_topic,
../../../../waku/waku_core/topics/content_topic,
../../../alloc
type FilterMsgType* = enum
SUBSCRIBE
UNSUBSCRIBE
UNSUBSCRIBE_ALL
type FilterRequest* = object
operation: FilterMsgType
pubsubTopic: cstring
contentTopics: cstring ## comma-separated list of content-topics
filterPushEventCallback: FilterPushHandler ## handles incoming filter pushed msgs
proc createShared*(
T: type FilterRequest,
op: FilterMsgType,
pubsubTopic: cstring = "",
contentTopics: cstring = "",
filterPushEventCallback: FilterPushHandler = nil,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].pubsubTopic = pubsubTopic.alloc()
ret[].contentTopics = contentTopics.alloc()
ret[].filterPushEventCallback = filterPushEventCallback
return ret
proc destroyShared(self: ptr FilterRequest) =
deallocShared(self[].pubsubTopic)
deallocShared(self[].contentTopics)
deallocShared(self)
proc process*(
self: ptr FilterRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)
const FilterOpTimeout = 5.seconds
if waku.node.wakuFilterClient.isNil():
let errorMsg = "FilterRequest waku.node.wakuFilterClient is nil"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)
case self.operation
of SUBSCRIBE:
waku.node.wakuFilterClient.registerPushHandler(self.filterPushEventCallback)
let peer = waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let errorMsg =
"could not find peer with WakuFilterSubscribeCodec when subscribing"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)
let pubsubTopic = some(PubsubTopic($self[].pubsubTopic))
let contentTopics = ($(self[].contentTopics)).split(",").mapIt(ContentTopic(it))
let subFut = waku.node.filterSubscribe(pubsubTopic, contentTopics, peer)
if not await subFut.withTimeout(FilterOpTimeout):
let errorMsg = "filter subscription timed out"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)
of UNSUBSCRIBE:
let peer = waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let errorMsg =
"could not find peer with WakuFilterSubscribeCodec when unsubscribing"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)
let pubsubTopic = some(PubsubTopic($self[].pubsubTopic))
let contentTopics = ($(self[].contentTopics)).split(",").mapIt(ContentTopic(it))
let subFut = waku.node.filterUnsubscribe(pubsubTopic, contentTopics, peer)
if not await subFut.withTimeout(FilterOpTimeout):
let errorMsg = "filter un-subscription timed out"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)
of UNSUBSCRIBE_ALL:
let peer = waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let errorMsg =
"could not find peer with WakuFilterSubscribeCodec when unsubscribing all"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)
let unsubFut = waku.node.filterUnsubscribeAll(peer)
if not await unsubFut.withTimeout(FilterOpTimeout):
let errorMsg = "filter un-subscription all timed out"
error "fail filter process", error = errorMsg, op = $(self.operation)
return err(errorMsg)
return ok("")

View File

@ -1,109 +0,0 @@
import options
import chronicles, chronos, results
import
../../../../waku/waku_core/message/message,
../../../../waku/waku_core/codecs,
../../../../waku/factory/waku,
../../../../waku/waku_core/message,
../../../../waku/waku_core/time, # Timestamp
../../../../waku/waku_core/topics/pubsub_topic,
../../../../waku/waku_lightpush_legacy/client,
../../../../waku/waku_lightpush_legacy/common,
../../../../waku/node/peer_manager/peer_manager,
../../../alloc
type LightpushMsgType* = enum
PUBLISH
type ThreadSafeWakuMessage* = object
payload: SharedSeq[byte]
contentTopic: cstring
meta: SharedSeq[byte]
version: uint32
timestamp: Timestamp
ephemeral: bool
when defined(rln):
proof: SharedSeq[byte]
type LightpushRequest* = object
operation: LightpushMsgType
pubsubTopic: cstring
message: ThreadSafeWakuMessage # only used in 'PUBLISH' requests
proc createShared*(
T: type LightpushRequest,
op: LightpushMsgType,
pubsubTopic: cstring,
m = WakuMessage(),
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].pubsubTopic = pubsubTopic.alloc()
ret[].message = ThreadSafeWakuMessage(
payload: allocSharedSeq(m.payload),
contentTopic: m.contentTopic.alloc(),
meta: allocSharedSeq(m.meta),
version: m.version,
timestamp: m.timestamp,
ephemeral: m.ephemeral,
)
when defined(rln):
ret[].message.proof = allocSharedSeq(m.proof)
return ret
proc destroyShared(self: ptr LightpushRequest) =
deallocSharedSeq(self[].message.payload)
deallocShared(self[].message.contentTopic)
deallocSharedSeq(self[].message.meta)
when defined(rln):
deallocSharedSeq(self[].message.proof)
deallocShared(self)
proc toWakuMessage(m: ThreadSafeWakuMessage): WakuMessage =
var wakuMessage = WakuMessage()
wakuMessage.payload = m.payload.toSeq()
wakuMessage.contentTopic = $m.contentTopic
wakuMessage.meta = m.meta.toSeq()
wakuMessage.version = m.version
wakuMessage.timestamp = m.timestamp
wakuMessage.ephemeral = m.ephemeral
when defined(rln):
wakuMessage.proof = m.proof
return wakuMessage
proc process*(
self: ptr LightpushRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)
case self.operation
of PUBLISH:
let msg = self.message.toWakuMessage()
let pubsubTopic = $self.pubsubTopic
if waku.node.wakuLightpushClient.isNil():
let errorMsg = "LightpushRequest waku.node.wakuLightpushClient is nil"
error "PUBLISH failed", error = errorMsg
return err(errorMsg)
let peerOpt = waku.node.peerManager.selectPeer(WakuLightPushCodec)
if peerOpt.isNone():
let errorMsg = "failed to lightpublish message, no suitable remote peers"
error "PUBLISH failed", error = errorMsg
return err(errorMsg)
let msgHashHex = (
await waku.node.wakuLegacyLightpushClient.publish(
pubsubTopic, msg, peer = peerOpt.get()
)
).valueOr:
error "PUBLISH failed", error = error
return err($error)
return ok(msgHashHex)

View File

@ -1,168 +0,0 @@
import std/[net, sequtils, strutils]
import chronicles, chronos, stew/byteutils, results
import
waku/waku_core/message/message,
waku/factory/[validator_signed, waku],
tools/confutils/cli_args,
waku/waku_node,
waku/waku_core/message,
waku/waku_core/time, # Timestamp
waku/waku_core/topics/pubsub_topic,
waku/waku_core/topics,
waku/waku_relay/protocol,
waku/node/peer_manager
import
../../../alloc
type RelayMsgType* = enum
SUBSCRIBE
UNSUBSCRIBE
PUBLISH
NUM_CONNECTED_PEERS
LIST_CONNECTED_PEERS
## to return the list of all connected peers to an specific pubsub topic
NUM_MESH_PEERS
LIST_MESH_PEERS
## to return the list of only the peers that conform the mesh for a particular pubsub topic
ADD_PROTECTED_SHARD ## Protects a shard with a public key
type ThreadSafeWakuMessage* = object
payload: SharedSeq[byte]
contentTopic: cstring
meta: SharedSeq[byte]
version: uint32
timestamp: Timestamp
ephemeral: bool
when defined(rln):
proof: SharedSeq[byte]
type RelayRequest* = object
operation: RelayMsgType
pubsubTopic: cstring
relayEventCallback: WakuRelayHandler # not used in 'PUBLISH' requests
message: ThreadSafeWakuMessage # only used in 'PUBLISH' requests
clusterId: cint # only used in 'ADD_PROTECTED_SHARD' requests
shardId: cint # only used in 'ADD_PROTECTED_SHARD' requests
publicKey: cstring # only used in 'ADD_PROTECTED_SHARD' requests
proc createShared*(
T: type RelayRequest,
op: RelayMsgType,
pubsubTopic: cstring = nil,
relayEventCallback: WakuRelayHandler = nil,
m = WakuMessage(),
clusterId: cint = 0,
shardId: cint = 0,
publicKey: cstring = nil,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].pubsubTopic = pubsubTopic.alloc()
ret[].clusterId = clusterId
ret[].shardId = shardId
ret[].publicKey = publicKey.alloc()
ret[].relayEventCallback = relayEventCallback
ret[].message = ThreadSafeWakuMessage(
payload: allocSharedSeq(m.payload),
contentTopic: m.contentTopic.alloc(),
meta: allocSharedSeq(m.meta),
version: m.version,
timestamp: m.timestamp,
ephemeral: m.ephemeral,
)
when defined(rln):
ret[].message.proof = allocSharedSeq(m.proof)
return ret
proc destroyShared(self: ptr RelayRequest) =
deallocSharedSeq(self[].message.payload)
deallocShared(self[].message.contentTopic)
deallocSharedSeq(self[].message.meta)
when defined(rln):
deallocSharedSeq(self[].message.proof)
deallocShared(self[].pubsubTopic)
deallocShared(self[].publicKey)
deallocShared(self)
proc toWakuMessage(m: ThreadSafeWakuMessage): WakuMessage =
var wakuMessage = WakuMessage()
wakuMessage.payload = m.payload.toSeq()
wakuMessage.contentTopic = $m.contentTopic
wakuMessage.meta = m.meta.toSeq()
wakuMessage.version = m.version
wakuMessage.timestamp = m.timestamp
wakuMessage.ephemeral = m.ephemeral
when defined(rln):
wakuMessage.proof = m.proof
return wakuMessage
proc process*(
self: ptr RelayRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)
if waku.node.wakuRelay.isNil():
return err("Operation not supported without Waku Relay enabled.")
case self.operation
of SUBSCRIBE:
waku.node.subscribe(
(kind: SubscriptionKind.PubsubSub, topic: $self.pubsubTopic),
handler = self.relayEventCallback,
).isOkOr:
error "SUBSCRIBE failed", error
return err($error)
of UNSUBSCRIBE:
waku.node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: $self.pubsubTopic)).isOkOr:
error "UNSUBSCRIBE failed", error
return err($error)
of PUBLISH:
let msg = self.message.toWakuMessage()
let pubsubTopic = $self.pubsubTopic
(await waku.node.wakuRelay.publish(pubsubTopic, msg)).isOkOr:
error "PUBLISH failed", error
return err($error)
let msgHash = computeMessageHash(pubSubTopic, msg).to0xHex
return ok(msgHash)
of NUM_CONNECTED_PEERS:
let numConnPeers = waku.node.wakuRelay.getNumConnectedPeers($self.pubsubTopic).valueOr:
error "NUM_CONNECTED_PEERS failed", error
return err($error)
return ok($numConnPeers)
of LIST_CONNECTED_PEERS:
let connPeers = waku.node.wakuRelay.getConnectedPeers($self.pubsubTopic).valueOr:
error "LIST_CONNECTED_PEERS failed", error = error
return err($error)
## returns a comma-separated string of peerIDs
return ok(connPeers.mapIt($it).join(","))
of NUM_MESH_PEERS:
let numPeersInMesh = waku.node.wakuRelay.getNumPeersInMesh($self.pubsubTopic).valueOr:
error "NUM_MESH_PEERS failed", error = error
return err($error)
return ok($numPeersInMesh)
of LIST_MESH_PEERS:
let meshPeers = waku.node.wakuRelay.getPeersInMesh($self.pubsubTopic).valueOr:
error "LIST_MESH_PEERS failed", error = error
return err($error)
## returns a comma-separated string of peerIDs
return ok(meshPeers.mapIt($it).join(","))
of ADD_PROTECTED_SHARD:
try:
let relayShard =
RelayShard(clusterId: uint16(self.clusterId), shardId: uint16(self.shardId))
let protectedShard =
ProtectedShard.parseCmdArg($relayShard & ":" & $self.publicKey)
waku.node.wakuRelay.addSignedShardsValidator(
@[protectedShard], uint16(self.clusterId)
)
except ValueError:
return err(getCurrentExceptionMsg())
return ok("")

View File

@ -1,104 +0,0 @@
## This file contains the base message request type that will be handled.
## The requests are created by the main thread and processed by
## the Waku Thread.
import std/json, results
import chronos, chronos/threadsync
import
../../waku/factory/waku,
../ffi_types,
./requests/node_lifecycle_request,
./requests/peer_manager_request,
./requests/protocols/relay_request,
./requests/protocols/store_request,
./requests/protocols/lightpush_request,
./requests/protocols/filter_request,
./requests/debug_node_request,
./requests/discovery_request,
./requests/ping_request
type RequestType* {.pure.} = enum
LIFECYCLE
PEER_MANAGER
PING
RELAY
STORE
DEBUG
DISCOVERY
LIGHTPUSH
FILTER
type WakuThreadRequest* = object
reqType: RequestType
reqContent: pointer
callback: WakuCallBack
userData: pointer
proc createShared*(
T: type WakuThreadRequest,
reqType: RequestType,
reqContent: pointer,
callback: WakuCallBack,
userData: pointer,
): ptr type T =
var ret = createShared(T)
ret[].reqType = reqType
ret[].reqContent = reqContent
ret[].callback = callback
ret[].userData = userData
return ret
proc handleRes[T: string | void](
res: Result[T, string], request: ptr WakuThreadRequest
) =
## Handles the Result responses, which can either be Result[string, string] or
## Result[void, string].
defer:
deallocShared(request)
if res.isErr():
foreignThreadGc:
let msg = "libwaku error: handleRes fireSyncRes error: " & $res.error
request[].callback(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
)
return
foreignThreadGc:
var msg: cstring = ""
when T is string:
msg = res.get().cstring()
request[].callback(
RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
)
return
proc process*(
T: type WakuThreadRequest, request: ptr WakuThreadRequest, waku: ptr Waku
) {.async.} =
let retFut =
case request[].reqType
of LIFECYCLE:
cast[ptr NodeLifecycleRequest](request[].reqContent).process(waku)
of PEER_MANAGER:
cast[ptr PeerManagementRequest](request[].reqContent).process(waku[])
of PING:
cast[ptr PingRequest](request[].reqContent).process(waku)
of RELAY:
cast[ptr RelayRequest](request[].reqContent).process(waku)
of STORE:
cast[ptr StoreRequest](request[].reqContent).process(waku)
of DEBUG:
cast[ptr DebugNodeRequest](request[].reqContent).process(waku[])
of DISCOVERY:
cast[ptr DiscoveryRequest](request[].reqContent).process(waku)
of LIGHTPUSH:
cast[ptr LightpushRequest](request[].reqContent).process(waku)
of FILTER:
cast[ptr FilterRequest](request[].reqContent).process(waku)
handleRes(await retFut, request)
proc `$`*(self: WakuThreadRequest): string =
return $self.reqType

1
vendor/nim-ffi vendored Submodule

@ -0,0 +1 @@
Subproject commit d7a5492121aad190cf549436836e2fa42e34ff9b

View File

@ -30,7 +30,8 @@ requires "nim >= 2.2.4",
"regex", "regex",
"results", "results",
"db_connector", "db_connector",
"minilru" "minilru",
"ffi"
### Helper functions ### Helper functions
proc buildModule(filePath, params = "", lang = "c"): bool = proc buildModule(filePath, params = "", lang = "c"): bool =