refactor(libwaku): async (#3180)

This commit is contained in:
richΛrd 2024-12-02 10:56:12 -04:00 committed by GitHub
parent f856298caa
commit 47a6235414
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 322 additions and 292 deletions

View File

@ -5,6 +5,7 @@
#include <unistd.h>
#include <fcntl.h>
#include <stdint.h>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
@ -13,6 +14,22 @@
#include "base64.h"
#include "../../library/libwaku.h"
// Shared synchronization variables
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int callback_executed = 0;
void waitForCallback() {
pthread_mutex_lock(&mutex);
while (!callback_executed) {
pthread_cond_wait(&cond, &mutex);
}
callback_executed = 0;
pthread_mutex_unlock(&mutex);
}
#define WAKU_CALL(call) \
do { \
int ret = call; \
@ -20,6 +37,7 @@ do { \
printf("Failed the call to: %s. Returned code: %d\n", #call, ret); \
exit(1); \
} \
waitForCallback(); \
} while (0)
struct ConfigNode {
@ -99,6 +117,21 @@ void event_handler(int callerRet, const char* msg, size_t len, void* userData) {
else if (callerRet == RET_OK) {
printf("Receiving event: %s\n", msg);
}
pthread_mutex_lock(&mutex);
callback_executed = 1;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
void on_event_received(int callerRet, const char* msg, size_t len, void* userData) {
if (callerRet == RET_ERR) {
printf("Error: %s\n", msg);
exit(1);
}
else if (callerRet == RET_OK) {
printf("Receiving event: %s\n", msg);
}
}
char* contentTopic = NULL;
@ -161,10 +194,20 @@ void show_help_and_exit() {
void print_default_pubsub_topic(int callerRet, const char* msg, size_t len, void* userData) {
printf("Default pubsub topic: %s\n", msg);
pthread_mutex_lock(&mutex);
callback_executed = 1;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
void print_waku_version(int callerRet, const char* msg, size_t len, void* userData) {
printf("Git Version: %s\n", msg);
pthread_mutex_lock(&mutex);
callback_executed = 1;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
// Beginning of UI program logic
@ -247,15 +290,13 @@ void handle_user_input() {
// End of UI program logic
int main(int argc, char** argv) {
waku_setup();
struct ConfigNode cfgNode;
// default values
snprintf(cfgNode.host, 128, "0.0.0.0");
cfgNode.port = 60000;
cfgNode.relay = 1;
cfgNode.store = 1;
cfgNode.store = 0;
snprintf(cfgNode.storeNode, 2048, "");
snprintf(cfgNode.storeRetentionPolicy, 64, "time:6000000");
snprintf(cfgNode.storeDbUrl, 256, "postgres://postgres:test123@localhost:5432/postgres");
@ -296,6 +337,7 @@ int main(int argc, char** argv) {
cfgNode.storeMaxNumDbConnections);
ctx = waku_new(jsonConfig, event_handler, userData);
waitForCallback();
WAKU_CALL( waku_default_pubsub_topic(ctx, print_default_pubsub_topic, userData) );
WAKU_CALL( waku_version(ctx, print_waku_version, userData) );
@ -303,10 +345,12 @@ int main(int argc, char** argv) {
printf("Bind addr: %s:%u\n", cfgNode.host, cfgNode.port);
printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES": "NO");
waku_set_event_callback(ctx, event_handler, userData);
waku_start(ctx, event_handler, userData);
waku_set_event_callback(ctx, on_event_received, userData);
waku_listen_addresses(ctx, event_handler, userData);
waku_start(ctx, event_handler, userData);
waitForCallback();
WAKU_CALL( waku_listen_addresses(ctx, event_handler, userData) );
printf("Establishing connection with: %s\n", cfgNode.peers);
@ -334,4 +378,7 @@ int main(int argc, char** argv) {
while(1) {
handle_user_input();
}
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
}

View File

@ -199,8 +199,6 @@ auto cify(F&& f) {
}
int main(int argc, char** argv) {
waku_setup();
struct ConfigNode cfgNode;
// default values
snprintf(cfgNode.host, 128, "0.0.0.0");

View File

@ -296,10 +296,6 @@ type WakuNode struct {
ctx unsafe.Pointer
}
func WakuSetup() {
C.waku_setup()
}
func WakuNew(config WakuConfig) (*WakuNode, error) {
jsonConfig, err := json.Marshal(config)
if err != nil {
@ -557,8 +553,6 @@ func (self *WakuNode) WakuGetMyENR() (string, error) {
}
func main() {
WakuSetup()
config := WakuConfig{
Host: "0.0.0.0",
Port: 30304,

View File

@ -177,7 +177,6 @@ jclass loadClass(JNIEnv *env, const char *className) {
}
void Java_com_mobile_WakuModule_wakuSetup(JNIEnv *env, jobject thiz) {
waku_setup();
LOGD("log example for debugging purposes...")
}

View File

@ -58,9 +58,6 @@ json_config = "{ \
callback_type = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_char_p, ctypes.c_size_t)
# libwaku setup
libwaku.waku_setup()
# Node creation
libwaku.waku_new.restype = ctypes.c_void_p
libwaku.waku_new.argtypes = [ctypes.c_char_p,

View File

@ -13,8 +13,6 @@ pub type WakuCallback =
);
extern "C" {
pub fn waku_setup();
pub fn waku_new(
config_json: *const u8,
cb: WakuCallback,
@ -70,8 +68,6 @@ fn main() {
}";
unsafe {
waku_setup();
// Create the waku node
let closure = |ret: i32, data: &str| {
println!("Ret {ret}. Error creating waku node {data}");

View File

@ -1,13 +0,0 @@
import ./waku_thread/waku_thread
type WakuCallBack* = proc(
callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].}
template checkLibwakuParams*(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
) =
ctx[].userData = userData
if isNil(callback):
return RET_MISSING_CALLBACK

30
library/ffi_types.nim Normal file
View File

@ -0,0 +1,30 @@
################################################################################
### Exported types
type WakuCallBack* = proc(
callerRet: cint, msg: ptr cchar, len: csize_t, userData: pointer
) {.cdecl, gcsafe, raises: [].}
const RET_OK*: cint = 0
const RET_ERR*: cint = 1
const RET_MISSING_CALLBACK*: cint = 2
### End of exported types
################################################################################
################################################################################
### FFI utils
template foreignThreadGc*(body: untyped) =
when declared(setupForeignThreadGc):
setupForeignThreadGc()
body
when declared(tearDownForeignThreadGc):
tearDownForeignThreadGc()
type onDone* = proc()
### End of FFI utils
################################################################################

View File

@ -20,9 +20,6 @@ extern "C" {
typedef void (*WakuCallBack) (int callerRet, const char* msg, size_t len, void* userData);
// Initializes the library. Should be called before any other function
void waku_setup();
// Creates a new instance of the waku node.
// Sets up the waku node from the given configuration.
// Returns a pointer to the Context needed by the rest of the API functions.

View File

@ -6,7 +6,7 @@ when defined(linux):
{.passl: "-Wl,-soname,libwaku.so".}
import std/[json, atomics, strformat, options, atomics]
import chronicles, chronos
import chronicles, chronos, chronos/threadsync
import
waku/common/base64,
waku/waku_core/message/message,
@ -27,51 +27,35 @@ import
./waku_thread/inter_thread_communication/requests/ping_request,
./waku_thread/inter_thread_communication/waku_thread_request,
./alloc,
./callback
./ffi_types
################################################################################
### Wrapper around the waku node
################################################################################
################################################################################
### Exported types
const RET_OK: cint = 0
const RET_ERR: cint = 1
const RET_MISSING_CALLBACK: cint = 2
### End of exported types
################################################################################
################################################################################
### Not-exported components
template foreignThreadGc(body: untyped) =
when declared(setupForeignThreadGc):
setupForeignThreadGc()
template checkLibwakuParams*(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
) =
ctx[].userData = userData
body
if isNil(callback):
return RET_MISSING_CALLBACK
when declared(tearDownForeignThreadGc):
tearDownForeignThreadGc()
proc handleRes[T: string | void](
res: Result[T, string], callback: WakuCallBack, userData: pointer
proc handleRequest(
ctx: ptr WakuContext,
requestType: RequestType,
content: pointer,
callback: WakuCallBack,
userData: pointer,
): cint =
## Handles the Result responses, which can either be Result[string, string] or
## Result[void, string]. Notice that in case of Result[void, string], it is enough to
## just return RET_OK and not provide any additional feedback through the callback.
if res.isErr():
foreignThreadGc:
let msg = "libwaku error: " & $res.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
waku_thread.sendRequestToWakuThread(ctx, requestType, content, callback, userData).isOkOr:
let msg = "libwaku error: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
foreignThreadGc:
var msg: cstring = ""
when T is string:
msg = res.get().cstring()
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_OK
proc onReceivedMessage(ctx: ptr WakuContext): WakuRelayHandler =
@ -87,15 +71,14 @@ proc onReceivedMessage(ctx: ptr WakuContext): WakuRelayHandler =
error "eventUserData is nil"
return
try:
let event = $JsonMessageEvent.new(pubsubTopic, msg)
foreignThreadGc:
foreignThreadGc:
try:
let event = $JsonMessageEvent.new(pubsubTopic, msg)
cast[WakuCallBack](ctx[].eventCallback)(
RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), ctx[].eventUserData
)
except Exception, CatchableError:
let msg = "Exception when calling 'eventCallBack': " & getCurrentExceptionMsg()
foreignThreadGc:
except Exception, CatchableError:
let msg = "Exception when calling 'eventCallBack': " & getCurrentExceptionMsg()
cast[WakuCallBack](ctx[].eventCallback)(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), ctx[].eventUserData
)
@ -121,24 +104,27 @@ if defined(android):
) {.raises: [].} =
echo logLevel, msg
proc initializeLibrary() {.exported.} =
if not initialized.exchange(true):
NimMain() # Every Nim library needs to call `NimMain` once exactly
when declared(setupForeignThreadGc):
setupForeignThreadGc()
when declared(nimGC_setStackBottom):
var locals {.volatile, noinit.}: pointer
locals = addr(locals)
nimGC_setStackBottom(locals)
### End of library setup
################################################################################
################################################################################
### Exported procs
proc waku_setup() {.dynlib, exportc.} =
NimMain()
if not initialized.load:
initialized.store(true)
when declared(nimGC_setStackBottom):
var locals {.volatile, noinit.}: pointer
locals = addr(locals)
nimGC_setStackBottom(locals)
proc waku_new(
configJson: cstring, callback: WakuCallback, userData: pointer
): pointer {.dynlib, exportc, cdecl.} =
initializeLibrary()
## Creates a new instance of the WakuNode.
if isNil(callback):
echo "error: missing callback in waku_new"
@ -146,50 +132,56 @@ proc waku_new(
## Create the Waku thread that will keep waiting for req from the main thread.
var ctx = waku_thread.createWakuThread().valueOr:
foreignThreadGc:
let msg = "Error in createWakuThread: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
let msg = "Error in createWakuThread: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil
ctx.userData = userData
waku_thread.sendRequestToWakuThread(
let retCode = handleRequest(
ctx,
RequestType.LIFECYCLE,
NodeLifecycleRequest.createShared(NodeLifecycleMsgType.CREATE_NODE, configJson),
).isOkOr:
foreignThreadGc:
let msg = $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil
callback,
userData,
)
if retCode == RET_ERR:
return nil
return ctx
proc waku_destroy(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread.destroyWakuThread(ctx).handleRes(callback, userData)
waku_thread.destroyWakuThread(ctx).isOkOr:
let msg = "libwaku error: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR
return RET_OK
proc waku_version(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
foreignThreadGc:
callback(
RET_OK,
cast[ptr cchar](WakuNodeVersionString),
cast[csize_t](len(WakuNodeVersionString)),
userData,
)
callback(
RET_OK,
cast[ptr cchar](WakuNodeVersionString),
cast[csize_t](len(WakuNodeVersionString)),
userData,
)
return RET_OK
proc waku_set_event_callback(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
) {.dynlib, exportc.} =
initializeLibrary()
ctx[].eventCallback = cast[pointer](callback)
ctx[].eventUserData = userData
@ -204,6 +196,7 @@ proc waku_content_topic(
): cint {.dynlib, exportc.} =
# https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
let appStr = appName.alloc()
@ -226,6 +219,7 @@ proc waku_pubsub_topic(
): cint {.dynlib, exportc, cdecl.} =
# https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
let topicNameStr = topicName.alloc()
@ -244,6 +238,7 @@ proc waku_default_pubsub_topic(
): cint {.dynlib, exportc.} =
# https://rfc.vac.dev/spec/36/#extern-char-waku_default_pubsub_topic
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
callback(
@ -265,6 +260,7 @@ proc waku_relay_publish(
): cint {.dynlib, exportc, cdecl.} =
# https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
let jwm = jsonWakuMessage.alloc()
@ -295,8 +291,7 @@ proc waku_relay_publish(
else:
$pst
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(
@ -305,34 +300,35 @@ proc waku_relay_publish(
WakuRelayHandler(onReceivedMessage(ctx)),
wakuMessage,
),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_start(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.LIFECYCLE,
NodeLifecycleRequest.createShared(NodeLifecycleMsgType.START_NODE),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_stop(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.LIFECYCLE,
NodeLifecycleRequest.createShared(NodeLifecycleMsgType.STOP_NODE),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_relay_subscribe(
ctx: ptr WakuContext,
@ -340,6 +336,7 @@ proc waku_relay_subscribe(
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
let pst = pubSubTopic.alloc()
@ -347,15 +344,15 @@ proc waku_relay_subscribe(
deallocShared(pst)
var cb = onReceivedMessage(ctx)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(
RelayMsgType.SUBSCRIBE, PubsubTopic($pst), WakuRelayHandler(cb)
),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_relay_unsubscribe(
ctx: ptr WakuContext,
@ -363,14 +360,14 @@ proc waku_relay_unsubscribe(
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
let pst = pubSubTopic.alloc()
defer:
deallocShared(pst)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(
@ -378,8 +375,9 @@ proc waku_relay_unsubscribe(
PubsubTopic($pst),
WakuRelayHandler(onReceivedMessage(ctx)),
),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_relay_get_num_connected_peers(
ctx: ptr WakuContext,
@ -387,19 +385,20 @@ proc waku_relay_get_num_connected_peers(
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
let pst = pubSubTopic.alloc()
defer:
deallocShared(pst)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.LIST_CONNECTED_PEERS, PubsubTopic($pst)),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_relay_get_num_peers_in_mesh(
ctx: ptr WakuContext,
@ -407,19 +406,20 @@ proc waku_relay_get_num_peers_in_mesh(
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
let pst = pubSubTopic.alloc()
defer:
deallocShared(pst)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.LIST_MESH_PEERS, PubsubTopic($pst)),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_filter_subscribe(
ctx: ptr WakuContext,
@ -428,10 +428,10 @@ proc waku_filter_subscribe(
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.FILTER,
FilterRequest.createShared(
@ -440,8 +440,9 @@ proc waku_filter_subscribe(
contentTopics,
FilterPushHandler(onReceivedMessage(ctx)),
),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_filter_unsubscribe(
ctx: ptr WakuContext,
@ -450,26 +451,30 @@ proc waku_filter_unsubscribe(
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.FILTER,
FilterRequest.createShared(FilterMsgType.UNSUBSCRIBE, pubSubTopic, contentTopics),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_filter_unsubscribe_all(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
ctx, RequestType.FILTER, FilterRequest.createShared(FilterMsgType.UNSUBSCRIBE_ALL)
handleRequest(
ctx,
RequestType.FILTER,
FilterRequest.createShared(FilterMsgType.UNSUBSCRIBE_ALL),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_lightpush_publish(
ctx: ptr WakuContext,
@ -478,6 +483,7 @@ proc waku_lightpush_publish(
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc, cdecl.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
let jwm = jsonWakuMessage.alloc()
@ -506,15 +512,15 @@ proc waku_lightpush_publish(
else:
$pst
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.LIGHTPUSH,
LightpushRequest.createShared(
LightpushMsgType.PUBLISH, PubsubTopic($pst), wakuMessage
),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_connect(
ctx: ptr WakuContext,
@ -523,32 +529,34 @@ proc waku_connect(
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(
PeerManagementMsgType.CONNECT_TO, $peerMultiAddr, chronos.milliseconds(timeoutMs)
),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_disconnect_peer_by_id(
ctx: ptr WakuContext, peerId: cstring, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(
op = PeerManagementMsgType.DISCONNECT_PEER_BY_ID, peerId = $peerId
),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_dial_peer(
ctx: ptr WakuContext,
@ -558,10 +566,10 @@ proc waku_dial_peer(
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(
@ -569,8 +577,9 @@ proc waku_dial_peer(
peerMultiAddr = $peerMultiAddr,
protocol = $protocol,
),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_dial_peer_by_id(
ctx: ptr WakuContext,
@ -580,58 +589,62 @@ proc waku_dial_peer_by_id(
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(
op = PeerManagementMsgType.DIAL_PEER_BY_ID, peerId = $peerId, protocol = $protocol
),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_get_peerids_from_peerstore(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(PeerManagementMsgType.GET_ALL_PEER_IDS),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_get_connected_peers(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(PeerManagementMsgType.GET_CONNECTED_PEERS),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_get_peerids_by_protocol(
ctx: ptr WakuContext, protocol: cstring, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(
op = PeerManagementMsgType.GET_PEER_IDS_BY_PROTOCOL, protocol = $protocol
),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_store_query(
ctx: ptr WakuContext,
@ -641,28 +654,30 @@ proc waku_store_query(
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.STORE,
JsonStoreQueryRequest.createShared(jsonQuery, peerAddr, timeoutMs),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_listen_addresses(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.DEBUG,
DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_LISTENING_ADDRESSES),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_dns_discovery(
ctx: ptr WakuContext,
@ -672,93 +687,106 @@ proc waku_dns_discovery(
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.DISCOVERY,
DiscoveryRequest.createRetrieveBootstrapNodesRequest(
DiscoveryMsgType.GET_BOOTSTRAP_NODES, entTreeUrl, nameDnsServer, timeoutMs
),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_discv5_update_bootnodes(
ctx: ptr WakuContext, bootnodes: cstring, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
## Updates the bootnode list used for discovering new peers via DiscoveryV5
## bootnodes - JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]`
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.DISCOVERY,
DiscoveryRequest.createUpdateBootstrapNodesRequest(
DiscoveryMsgType.UPDATE_DISCV5_BOOTSTRAP_NODES, bootnodes
),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_get_my_enr(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.DEBUG,
DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_MY_ENR),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_get_my_peerid(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.DEBUG,
DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_MY_PEER_ID),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_start_discv5(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
ctx, RequestType.DISCOVERY, DiscoveryRequest.createDiscV5StartRequest()
handleRequest(
ctx,
RequestType.DISCOVERY,
DiscoveryRequest.createDiscV5StartRequest(),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_stop_discv5(
ctx: ptr WakuContext, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
ctx, RequestType.DISCOVERY, DiscoveryRequest.createDiscV5StopRequest()
handleRequest(
ctx,
RequestType.DISCOVERY,
DiscoveryRequest.createDiscV5StopRequest(),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_peer_exchange_request(
ctx: ptr WakuContext, numPeers: uint64, callback: WakuCallBack, userData: pointer
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
ctx, RequestType.DISCOVERY, DiscoveryRequest.createPeerExchangeRequest(numPeers)
handleRequest(
ctx,
RequestType.DISCOVERY,
DiscoveryRequest.createPeerExchangeRequest(numPeers),
callback,
userData,
)
.handleRes(callback, userData)
proc waku_ping_peer(
ctx: ptr WakuContext,
@ -767,15 +795,16 @@ proc waku_ping_peer(
callback: WakuCallBack,
userData: pointer,
): cint {.dynlib, exportc.} =
initializeLibrary()
checkLibwakuParams(ctx, callback, userData)
waku_thread
.sendRequestToWakuThread(
handleRequest(
ctx,
RequestType.PING,
PingRequest.createShared(peerAddr, chronos.milliseconds(timeoutMs)),
callback,
userData,
)
.handleRes(callback, userData)
### End of exported procs
################################################################################

View File

@ -3,9 +3,10 @@
## the Waku Thread.
import std/json, results
import chronos
import chronos, chronos/threadsync
import
../../../waku/factory/waku,
../../ffi_types,
./requests/node_lifecycle_request,
./requests/peer_manager_request,
./requests/protocols/relay_request,
@ -27,27 +28,55 @@ type RequestType* {.pure.} = enum
LIGHTPUSH
FILTER
type InterThreadRequest* = object
type WakuThreadRequest* = object
reqType: RequestType
reqContent: pointer
callback: WakuCallBack
userData: pointer
proc createShared*(
T: type InterThreadRequest, reqType: RequestType, reqContent: pointer
T: type WakuThreadRequest,
reqType: RequestType,
reqContent: pointer,
callback: WakuCallBack,
userData: pointer,
): ptr type T =
var ret = createShared(T)
ret[].reqType = reqType
ret[].reqContent = reqContent
ret[].callback = callback
ret[].userData = userData
return ret
proc process*(
T: type InterThreadRequest, request: ptr InterThreadRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
## Processes the request and deallocates its memory
proc handleRes[T: string | void](
res: Result[T, string], request: ptr WakuThreadRequest
) =
## Handles the Result responses, which can either be Result[string, string] or
## Result[void, string].
defer:
deallocShared(request)
echo "Request received: " & $request[].reqType
if res.isErr():
foreignThreadGc:
let msg = "libwaku error: handleRes fireSyncRes error: " & $res.error
request[].callback(
RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
)
return
foreignThreadGc:
var msg: cstring = ""
when T is string:
msg = res.get().cstring()
request[].callback(
RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), request[].userData
)
return
proc process*(
T: type WakuThreadRequest, request: ptr WakuThreadRequest, waku: ptr Waku
) {.async.} =
let retFut =
case request[].reqType
of LIFECYCLE:
@ -69,7 +98,7 @@ proc process*(
of FILTER:
cast[ptr FilterRequest](request[].reqContent).process(waku)
return await retFut
handleRes(await retFut, request)
proc `$`*(self: InterThreadRequest): string =
proc `$`*(self: WakuThreadRequest): string =
return $self.reqType

View File

@ -1,48 +0,0 @@
## This file contains the base message response type that will be handled.
## The response will be created from the Waku Thread and processed in
## the main thread.
import std/json, results
import ../../alloc
type ResponseType {.pure.} = enum
OK
ERR
type InterThreadResponse* = object
respType: ResponseType
content: cstring
proc createShared*(
T: type InterThreadResponse, res: Result[string, string]
): ptr type T =
## Converts a `Result[string, string]` into a `ptr InterThreadResponse`
## so that it can be transfered to another thread in a safe way.
var ret = createShared(T)
if res.isOk():
let value = res.get()
ret[].respType = ResponseType.OK
ret[].content = value.alloc()
else:
let error = res.error
ret[].respType = ResponseType.ERR
ret[].content = res.error.alloc()
return ret
proc process*(
T: type InterThreadResponse, resp: ptr InterThreadResponse
): Result[string, string] =
## Converts the received `ptr InterThreadResponse` into a
## `Result[string, string]`. Notice that the response is expected to be
## allocated from the Waku Thread and deallocated by the main thread.
defer:
deallocShared(resp[].content)
deallocShared(resp)
case resp[].respType
of OK:
return ok($resp[].content)
of ERR:
return err($resp[].content)

View File

@ -4,17 +4,12 @@
import std/[options, atomics, os, net]
import chronicles, chronos, chronos/threadsync, taskpools/channels_spsc_single, results
import
waku/factory/waku,
./inter_thread_communication/waku_thread_request,
./inter_thread_communication/waku_thread_response
import waku/factory/waku, ./inter_thread_communication/waku_thread_request, ../ffi_types
type WakuContext* = object
thread: Thread[(ptr WakuContext)]
reqChannel: ChannelSPSCSingle[ptr InterThreadRequest]
reqChannel: ChannelSPSCSingle[ptr WakuThreadRequest]
reqSignal: ThreadSignalPtr
respChannel: ChannelSPSCSingle[ptr InterThreadResponse]
respSignal: ThreadSignalPtr
userData*: pointer
eventCallback*: pointer
eventUserdata*: pointer
@ -36,27 +31,14 @@ proc runWaku(ctx: ptr WakuContext) {.async.} =
break
## Trying to get a request from the libwaku requestor thread
var request: ptr InterThreadRequest
var request: ptr WakuThreadRequest
let recvOk = ctx.reqChannel.tryRecv(request)
if not recvOk:
error "waku thread could not receive a request"
continue
## Handle the request
let resultResponse = waitFor InterThreadRequest.process(request, addr waku)
## Converting a `Result` into a thread-safe transferable response type
let threadSafeResp = InterThreadResponse.createShared(resultResponse)
## Send the response back to the thread that sent the request
let sentOk = ctx.respChannel.trySend(threadSafeResp)
if not sentOk:
error "could not send a request to the requester thread",
original_request = $request[]
let fireRes = ctx.respSignal.fireSync()
if fireRes.isErr():
error "could not fireSync back to requester thread", error = fireRes.error
asyncSpawn WakuThreadRequest.process(request, addr waku)
proc run(ctx: ptr WakuContext) {.thread.} =
## Launch waku worker
@ -68,8 +50,6 @@ proc createWakuThread*(): Result[ptr WakuContext, string] =
var ctx = createShared(WakuContext, 1)
ctx.reqSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create reqSignal ThreadSignalPtr")
ctx.respSignal = ThreadSignalPtr.new().valueOr:
return err("couldn't create respSignal ThreadSignalPtr")
ctx.running.store(true)
@ -93,36 +73,31 @@ proc destroyWakuThread*(ctx: ptr WakuContext): Result[void, string] =
joinThread(ctx.thread)
?ctx.reqSignal.close()
?ctx.respSignal.close()
freeShared(ctx)
return ok()
proc sendRequestToWakuThread*(
ctx: ptr WakuContext, reqType: RequestType, reqContent: pointer
): Result[string, string] =
let req = InterThreadRequest.createShared(reqType, reqContent)
ctx: ptr WakuContext,
reqType: RequestType,
reqContent: pointer,
callback: WakuCallBack,
userData: pointer,
): Result[void, string] =
let req = WakuThreadRequest.createShared(reqType, reqContent, callback, userData)
## Sending the request
let sentOk = ctx.reqChannel.trySend(req)
if not sentOk:
deallocShared(req)
return err("Couldn't send a request to the waku thread: " & $req[])
let fireSyncRes = ctx.reqSignal.fireSync()
if fireSyncRes.isErr():
deallocShared(req)
return err("failed fireSync: " & $fireSyncRes.error)
if fireSyncRes.get() == false:
deallocShared(req)
return err("Couldn't fireSync in time")
# Waiting for the response
let res = waitSync(ctx.respSignal)
if res.isErr():
return err("Couldnt receive response signal")
var response: ptr InterThreadResponse
var recvOk = ctx.respChannel.tryRecv(response)
if recvOk == false:
return err("Couldn't receive response from the waku thread: " & $req[])
## Converting the thread-safe response into a managed/CG'ed `Result`
return InterThreadResponse.process(response)
ok()