From ef44d11e814fae722bc1cf8472e5c505d2ec5099 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Wed, 30 Oct 2024 14:58:10 +0100 Subject: [PATCH] temporary changes. pendin to fine tune them --- waku-bindings/src/general/mod.rs | 7 ++-- waku-bindings/src/lib.rs | 5 ++- waku-bindings/src/node/events.rs | 18 ++------- waku-bindings/src/node/mod.rs | 3 +- waku-bindings/src/utils.rs | 4 +- waku-bindings/tests/node.rs | 69 +++++++++++++++++--------------- 6 files changed, 52 insertions(+), 54 deletions(-) diff --git a/waku-bindings/src/general/mod.rs b/waku-bindings/src/general/mod.rs index bd880a7..dea964a 100644 --- a/waku-bindings/src/general/mod.rs +++ b/waku-bindings/src/general/mod.rs @@ -20,7 +20,7 @@ pub type Result = std::result::Result; // TODO: Properly type and deserialize payload form base64 encoded string /// Waku message in JSON format. /// as per the [specification](https://rfc.vac.dev/spec/36/#jsonmessage-type) -#[derive(Clone, Serialize, Deserialize, Debug)] +#[derive(Clone, Serialize, Deserialize, Debug, Default)] #[serde(rename_all = "camelCase")] pub struct WakuMessage { #[serde(with = "base64_serde", default = "Vec::new")] @@ -67,8 +67,9 @@ impl WakuMessage { } /// WakuMessage encoding scheme -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq, Default)] pub enum Encoding { + #[default] Proto, Rlp, Rfc26, @@ -105,7 +106,7 @@ impl RegexRepresentation for Encoding { } /// A waku content topic `/{application_name}/{version}/{content_topic_name}/{encdoing}` -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq, Default)] pub struct WakuContentTopic { pub application_name: Cow<'static, str>, pub version: Cow<'static, str>, diff --git a/waku-bindings/src/lib.rs b/waku-bindings/src/lib.rs index eee8841..e88bd52 100644 --- a/waku-bindings/src/lib.rs +++ b/waku-bindings/src/lib.rs @@ -3,7 +3,10 @@ //! Implementation on top of [`waku-bindings`](https://rfc.vac.dev/spec/36/) mod general; mod node; -mod utils; +pub mod utils; + +// Re-export the LibwakuResponse type to make it accessible outside this module +pub use utils::LibwakuResponse; // Required so functions inside libwaku can call RLN functions even if we // use it within the bindings functions diff --git a/waku-bindings/src/node/events.rs b/waku-bindings/src/node/events.rs index fd71b7a..e6dbde1 100644 --- a/waku-bindings/src/node/events.rs +++ b/waku-bindings/src/node/events.rs @@ -17,7 +17,7 @@ use crate::MessageHash; /// Waku event /// For now just WakuMessage is supported #[non_exhaustive] -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] #[serde(tag = "eventType", rename_all = "camelCase")] pub enum Event { #[serde(rename = "message")] @@ -26,7 +26,7 @@ pub enum Event { } /// Type of `event` field for a `message` event -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct WakuMessageEvent { /// The pubsub topic on which the message was received @@ -39,20 +39,10 @@ pub struct WakuMessageEvent { /// Register callback to act as event handler and receive application events, /// which are used to react to asynchronous events in Waku -pub fn waku_set_event_callback(ctx: &WakuNodeContext, mut f: F) { - let cb = |response: LibwakuResponse| { - if let LibwakuResponse::Success(v) = response { - let data: Event = - serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed"); - f(data); - }; - }; - +pub fn waku_set_event_callback(ctx: &WakuNodeContext, closure: F) { unsafe { - let mut closure = cb; let cb = get_trampoline(&closure); - - waku_sys::waku_set_event_callback(ctx.obj_ptr, cb, &mut closure as *mut _ as *mut c_void) + waku_sys::waku_set_event_callback(ctx.obj_ptr, cb, &closure as *const _ as *mut c_void) }; } diff --git a/waku-bindings/src/node/mod.rs b/waku-bindings/src/node/mod.rs index 817e34b..a160685 100644 --- a/waku-bindings/src/node/mod.rs +++ b/waku-bindings/src/node/mod.rs @@ -15,6 +15,7 @@ use std::marker::PhantomData; use std::time::Duration; // internal use crate::general::{MessageHash, Result, WakuMessage}; +use crate::LibwakuResponse; use context::WakuNodeContext; pub use config::RLNConfig; @@ -116,7 +117,7 @@ impl WakuNodeHandle { relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic) } - pub fn set_event_callback(&self, f: F) { + pub fn set_event_callback(&self, f: F) { events::waku_set_event_callback(&self.ctx, f) } } diff --git a/waku-bindings/src/utils.rs b/waku-bindings/src/utils.rs index f7337e7..760f774 100644 --- a/waku-bindings/src/utils.rs +++ b/waku-bindings/src/utils.rs @@ -48,7 +48,7 @@ unsafe extern "C" fn trampoline( ) where F: FnMut(LibwakuResponse), { - let user_data = &mut *(user_data as *mut F); + let closure = &mut *(user_data as *mut F); let response = if data.is_null() { "" @@ -60,7 +60,7 @@ unsafe extern "C" fn trampoline( let result = LibwakuResponse::try_from((ret_code as u32, response)) .expect("invalid response obtained from libwaku"); - user_data(result); + closure(result); } pub fn get_trampoline(_closure: &F) -> WakuCallBack diff --git a/waku-bindings/tests/node.rs b/waku-bindings/tests/node.rs index fee6100..2536de2 100644 --- a/waku-bindings/tests/node.rs +++ b/waku-bindings/tests/node.rs @@ -3,14 +3,16 @@ use serial_test::serial; use std::str::FromStr; use std::time::{Duration, SystemTime}; use std::{collections::HashSet, str::from_utf8}; +use std::cell::OnceCell; use tokio::sync::broadcast::{self, Sender}; +use waku_bindings::LibwakuResponse; use tokio::time; use tokio::time::sleep; use waku_bindings::{ waku_destroy, waku_new, Encoding, Event, MessageHash, Running, WakuContentTopic, WakuMessage, WakuNodeConfig, WakuNodeHandle, }; -const ECHO_TIMEOUT: u64 = 10; +const ECHO_TIMEOUT: u64 = 100; const ECHO_MESSAGE: &str = "Hi from 🦀!"; const TEST_PUBSUBTOPIC: &str = "test"; @@ -30,22 +32,6 @@ struct Response { payload: Vec, } -fn set_callback(node: &WakuNodeHandle, tx: Sender) { - node.set_event_callback(move |event| { - if let Event::WakuMessage(message) = event { - let hash = message.message_hash; - let message = message.waku_message; - let payload = message.payload.to_vec(); - - tx.send(Response { - hash: hash.to_string(), - payload, - }) - .expect("send response to the receiver"); - } - }); -} - async fn test_echo_messages( node1: &WakuNodeHandle, node2: &WakuNodeHandle, @@ -66,27 +52,39 @@ async fn test_echo_messages( false, ); - node1.set_event_callback(move |_event| {}); + // setting a naïve event handler to avoid appearing ERR messages in logs + node1.set_event_callback(|_LibwakuResponse| {}); - let (tx, mut rx) = broadcast::channel(1); - set_callback(node2, tx); + let rx_waku_message: OnceCell = OnceCell::new(); + + let closure = |response: LibwakuResponse| { + if let LibwakuResponse::Success(v) = response { + let event: Event = + serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed"); + + match event { + Event::WakuMessage(evt) => { + println!("WakuMessage event received: {:?}", evt.waku_message); + // let _ = rx_waku_message.set(evt.waku_message); // <-- this produces segfault + } + Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err), + _ => panic!("event case not expected"), + }; + }; + }; + + node2.set_event_callback(closure); let mut ids = try_publish_relay_messages(node1, &message).expect("send relay messages"); - while let Ok(res) = rx.recv().await { - if ids.take(&res.hash).is_some() { - let msg = from_utf8(&res.payload).expect("should be valid message"); - assert_eq!(content, msg); - } - if ids.is_empty() { - break; - } - } + // Wait for the msg to arrive to form + sleep(Duration::from_secs(1)).await; } #[tokio::test] #[serial] async fn default_echo() -> Result<(), String> { + println!("Test default_echo"); let node1 = waku_new(Some(WakuNodeConfig { port: Some(60010), ..Default::default() @@ -99,16 +97,20 @@ async fn default_echo() -> Result<(), String> { let node1 = node1.start()?; let node2 = node2.start()?; - let addresses1 = node1.listen_addresses()?; - node2.connect(&addresses1[0], None)?; - let topic = TEST_PUBSUBTOPIC.to_string(); node1.relay_subscribe(&topic)?; node2.relay_subscribe(&topic)?; + sleep(Duration::from_secs(35)).await; + + // Interconnect nodes + println!("Connecting node1 to node2"); + let addresses1 = node1.listen_addresses()?; + node2.connect(&addresses1[0], None)?; + // Wait for mesh to form - sleep(Duration::from_secs(3)).await; + sleep(Duration::from_secs(25)).await; let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto); @@ -135,6 +137,7 @@ async fn default_echo() -> Result<(), String> { #[test] #[serial] fn node_restart() { + println!("Test node_restart"); let config = WakuNodeConfig { node_key: Some( SecretKey::from_str("05f381866cc21f6c1e2e80e07fa732008e36d942dce3206ad6dcd6793c98d609")