chore: make sure waku_new, waku_start and waku_stop work

This commit is contained in:
Richard Ramos 2024-02-13 16:18:16 -04:00
parent ca72e70bb6
commit 5687e2585c
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
5 changed files with 78 additions and 55 deletions

View File

@ -13,7 +13,9 @@ use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
// internal
use crate::general::{WakuMessage, WakuPubSubTopic};
use crate::utils::get_trampoline;
use crate::MessageId;
use waku_sys::WakuCallBack;
/// Event signal
#[derive(Serialize, Deserialize)]
@ -67,31 +69,28 @@ impl WakuMessageEvent {
}
}
/// Wrapper callback, it transformst the `*const c_char` into a [`Signal`]
/// and executes the [`CALLBACK`] funtion with it
extern "C" fn callback(_ret_code: c_int, data: *const c_char, _user_data: *mut c_void) {
let raw_response = unsafe { CStr::from_ptr(data) }
.to_str()
.expect("Not null ptr");
let data: Signal = serde_json::from_str(raw_response).expect("Parsing signal to succeed");
(CALLBACK
.deref()
.lock()
.expect("Access to the shared callback")
.as_mut())(data)
fn callback() -> WakuCallBack {
let cb = |v: &str| {
print!("{}", v);
let data: Signal = serde_json::from_str(v).expect("Parsing signal to succeed");
};
let mut closure = cb;
get_trampoline(&closure)
}
/// Register callback to act as event handler and receive application signals,
/// which are used to react to asynchronous events in Waku
pub fn waku_set_event_callback<F: FnMut(Signal) + Send + Sync + 'static>(f: F) {
// TODO:
unsafe { waku_sys::waku_set_event_callback(Some(callback)) };
pub fn waku_set_event_callback(ctx: *mut c_void) {
// <F: FnMut(Signal) + Send + Sync + 'static> , , f: F
unsafe { waku_sys::waku_set_event_callback(ctx, callback(), std::ptr::null_mut()) };
}
#[cfg(test)]
mod tests {
use crate::events::waku_set_event_callback;
/*use crate::events::waku_set_event_callback;
use crate::{Event, Signal};
// TODO: how to actually send a signal and check if the callback is run?
@ -110,5 +109,5 @@ mod tests {
fn deserialize_event() {
let e = "{\"messageId\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}";
let _: Event = serde_json::from_str(e).unwrap();
}
}*/
}

View File

@ -2,6 +2,7 @@
// std
use std::ffi::CString;
use std::ptr;
// crates
use libc::c_void;
// internal
@ -33,6 +34,12 @@ pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<*mut c_void> {
out
};
// TODO: create error handler function, format of err message is
// {"message":"The actual message","eventType":"error"}
if error != "" {
return Err(error);
}
Ok(node_ptr)
}
@ -68,12 +75,24 @@ pub fn waku_stop(ctx: *mut c_void) -> Result<()> {
mod test {
use super::waku_new;
use crate::node::management::{waku_start, waku_stop};
use crate::WakuNodeConfig;
use secp256k1::SecretKey;
use serial_test::serial;
use std::str::FromStr;
#[test]
#[serial]
fn waku_flow() {
let node = waku_new(None).unwrap();
let node = waku_new(Some(WakuNodeConfig {
node_key: Some(
SecretKey::from_str(
"05f381866cc21f6c1e2e80e07fa732008e36d942dce3206ad6dcd6793c98d609",
)
.unwrap(),
), // TODO: consider making this optional
..Default::default()
}))
.unwrap();
waku_start(node).unwrap();
waku_stop(node).unwrap();
}

View File

@ -1,6 +1,7 @@
//! Waku node implementation
mod config;
mod events;
mod management;
mod peers;
mod relay;
@ -17,6 +18,7 @@ use libc::c_void;
use crate::general::{MessageId, Result, WakuMessage, WakuPubSubTopic};
pub use config::WakuNodeConfig;
pub use events::{Event, Signal, WakuMessageEvent};
pub use relay::{waku_create_content_topic, waku_default_pubsub_topic};
/// Handle to the underliying waku node
@ -27,13 +29,13 @@ pub struct WakuNodeHandle {
impl WakuNodeHandle {
/// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation.
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start)
pub fn start(self) -> Result<()> {
pub fn start(&self) -> Result<()> {
management::waku_start(self.ctx)
}
/// Stops a Waku node
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop)
pub fn stop(self) -> Result<()> {
pub fn stop(&self) -> Result<()> {
management::waku_stop(self.ctx)
}
@ -67,6 +69,10 @@ impl WakuNodeHandle {
pub fn relay_unsubscribe(&self, pubsub_topic: &WakuPubSubTopic) -> Result<()> {
relay::waku_relay_unsubscribe(self.ctx, pubsub_topic)
}
pub fn set_event_callback(&self) {
events::waku_set_event_callback(self.ctx)
}
}
/// Spawn a new Waku node with the given configuration (default configuration if `None` provided)

View File

@ -3,13 +3,14 @@ use multiaddr::Multiaddr;
use rand::thread_rng;
use secp256k1::SecretKey;
use serial_test::serial;
use std::str::FromStr;
use std::time::{Duration, SystemTime};
use std::{collections::HashSet, str::from_utf8};
use tokio::sync::mpsc::{self, Sender};
use tokio::time;
use waku_bindings::{
waku_new, waku_set_event_callback, Encoding, Event, Key, MessageId, WakuContentTopic,
WakuMessage, WakuNodeConfig, WakuNodeHandle, WakuPubSubTopic,
waku_new, Encoding, Event, Key, MessageId, WakuContentTopic, WakuMessage, WakuNodeConfig,
WakuNodeHandle, WakuPubSubTopic,
};
const ECHO_TIMEOUT: u64 = 10;
@ -25,9 +26,10 @@ fn try_publish_relay_messages(
node: &WakuNodeHandle,
msg: &WakuMessage,
) -> Result<HashSet<MessageId>, String> {
Ok(HashSet::from(
[node.relay_publish_message(msg, None, None)?],
))
let topic = "test".to_string();
Ok(HashSet::from([
node.relay_publish_message(msg, &topic, None)?
]))
}
#[derive(Debug)]
@ -36,23 +38,6 @@ struct Response {
payload: Vec<u8>,
}
fn set_callback(tx: Sender<Response>, sk: SecretKey, ssk: Key<Aes256Gcm>) {
waku_set_event_callback(move |signal| {
if let Event::WakuMessage(message) = signal.event() {
let id = message.message_id();
let message = message.waku_message();
let payload = message.payload().to_vec();
futures::executor::block_on(tx.send(Response {
id: id.to_string(),
payload,
}))
.expect("send response to the receiver");
}
});
}
async fn test_echo_messages(
node: &WakuNodeHandle,
content: &'static str,
@ -74,21 +59,22 @@ async fn test_echo_messages(
false,
);
let (tx, mut rx) = mpsc::channel(1);
set_callback(tx, sk, ssk);
/*
// let (tx, mut rx) = mpsc::channel(1);
//set_callback(tx, sk, ssk);
let mut ids = try_publish_relay_messages(node, &message).expect("send relay messages");
let mut ids = try_publish_relay_messages(node, &message).expect("send relay messages");
while let Some(res) = rx.recv().await {
if ids.take(&res.id).is_some() {
let msg = from_utf8(&res.payload).expect("should be valid message");
assert_eq!(content, msg);
}
while let Some(res) = rx.recv().await {
if ids.take(&res.id).is_some() {
let msg = from_utf8(&res.payload).expect("should be valid message");
assert_eq!(content, msg);
}
if ids.is_empty() {
break;
}
}
if ids.is_empty() {
break;
}
}*/
}
#[ignore]
@ -96,6 +82,10 @@ async fn test_echo_messages(
#[serial]
async fn default_echo() -> Result<(), String> {
let config = WakuNodeConfig {
node_key: Some(
SecretKey::from_str("05f381866cc21f6c1e2e80e07fa732008e36d942dce3206ad6dcd6793c98d609")
.unwrap(),
), // TODO: consider making this optional
..Default::default()
};
@ -111,7 +101,9 @@ async fn default_echo() -> Result<(), String> {
let ssk = Aes256Gcm::generate_key(&mut thread_rng());
// subscribe to default channel
node.relay_subscribe(&content_filter)?;
let topic = "test".to_string();
node.relay_subscribe(&topic)?;
let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto);
@ -133,7 +125,13 @@ async fn default_echo() -> Result<(), String> {
#[test]
#[serial]
fn node_restart() {
let config = WakuNodeConfig::default();
let config = WakuNodeConfig {
node_key: Some(
SecretKey::from_str("05f381866cc21f6c1e2e80e07fa732008e36d942dce3206ad6dcd6793c98d609")
.unwrap(),
), // TODO: consider making this optional
..Default::default()
};
for _ in 0..3 {
let node = waku_new(config.clone().into()).expect("default config should be valid");

View File

@ -40,6 +40,7 @@ fn generate_bindgen_code(project_dir: &Path) {
println!("cargo:rustc-link-lib=static=backtrace");
// TODO: Determine if pthread is automatically included
// TODO: Test in other architectures
// Generate waku bindings with bindgen
let bindings = bindgen::Builder::default()