From e5aaa4d90bee5973822057feca832e687feceedf Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Thu, 31 Oct 2024 13:44:12 +0100 Subject: [PATCH] temporary changes that allow default_echo test not to crash --- waku-bindings/src/node/management.rs | 1 - waku-bindings/src/node/mod.rs | 2 +- waku-bindings/tests/node.rs | 110 +++++++++++++++++---------- 3 files changed, 69 insertions(+), 44 deletions(-) diff --git a/waku-bindings/src/node/management.rs b/waku-bindings/src/node/management.rs index db7872d..ff9fa8a 100644 --- a/waku-bindings/src/node/management.rs +++ b/waku-bindings/src/node/management.rs @@ -20,7 +20,6 @@ pub fn waku_new(config: Option) -> Result { waku_sys::waku_setup(); } - let config = config.unwrap_or_default(); let config_ptr = CString::new( serde_json::to_string(&config) diff --git a/waku-bindings/src/node/mod.rs b/waku-bindings/src/node/mod.rs index a160685..35c67ee 100644 --- a/waku-bindings/src/node/mod.rs +++ b/waku-bindings/src/node/mod.rs @@ -37,7 +37,7 @@ impl WakuNodeState for Running {} /// Handle to the underliying waku node pub struct WakuNodeHandle { - ctx: WakuNodeContext, + pub ctx: WakuNodeContext, phantom: PhantomData, } diff --git a/waku-bindings/tests/node.rs b/waku-bindings/tests/node.rs index 2536de2..879c21a 100644 --- a/waku-bindings/tests/node.rs +++ b/waku-bindings/tests/node.rs @@ -6,13 +6,16 @@ use std::{collections::HashSet, str::from_utf8}; use std::cell::OnceCell; use tokio::sync::broadcast::{self, Sender}; use waku_bindings::LibwakuResponse; +use std::sync::{Arc, OnceLock, Mutex}; // Import Arc and Mutex use tokio::time; use tokio::time::sleep; +use waku_bindings::utils; use waku_bindings::{ waku_destroy, waku_new, Encoding, Event, MessageHash, Running, WakuContentTopic, WakuMessage, WakuNodeConfig, WakuNodeHandle, }; -const ECHO_TIMEOUT: u64 = 100; +use std::ffi::c_void; +const ECHO_TIMEOUT: u64 = 1000; const ECHO_MESSAGE: &str = "Hi from 🦀!"; const TEST_PUBSUBTOPIC: &str = "test"; @@ -38,6 +41,52 @@ async fn test_echo_messages( content: &'static str, content_topic: WakuContentTopic, ) { + // setting a naïve event handler to avoid appearing ERR messages in logs + node1.set_event_callback(|_LibwakuResponse| {}); + + let rx_waku_message: OnceCell = OnceCell::new(); + + let closure = |response: LibwakuResponse| { + if let LibwakuResponse::Success(v) = response { + let event: Event = + serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed"); + + match event { + Event::WakuMessage(evt) => { + println!("WakuMessage event received: {:?}", evt.waku_message); + // rx_waku_message = evt.waku_message; // Use the shared reference + let _ = rx_waku_message.set(evt.waku_message); + } + Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err), + _ => panic!("event case not expected"), + }; + } + }; + + println!("Before setting event callback"); + + unsafe { + let cb = utils::get_trampoline(&closure); + waku_sys::waku_set_event_callback(node2.ctx.obj_ptr, cb, &closure as *const _ as *mut c_void) + }; + + // node2.set_event_callback(closure); // Set the event callback with the closure + + let topic = TEST_PUBSUBTOPIC.to_string(); + node1.relay_subscribe(&topic).unwrap(); + node2.relay_subscribe(&topic).unwrap(); + + sleep(Duration::from_secs(3)).await; + + // Interconnect nodes + println!("Connecting node1 to node2"); + let addresses1 = node1.listen_addresses().unwrap(); + node2.connect(&addresses1[0], None).unwrap(); + + // Wait for mesh to form + sleep(Duration::from_secs(3)).await; + + println!("Before publish"); let message = WakuMessage::new( content, content_topic, @@ -51,34 +100,22 @@ async fn test_echo_messages( Vec::new(), false, ); + let ids = try_publish_relay_messages(node1, &message).expect("send relay messages"); + println!("After publish"); - // setting a naïve event handler to avoid appearing ERR messages in logs - node1.set_event_callback(|_LibwakuResponse| {}); + // Wait for the msg to arrive + for _ in 0..50 { + if let Some(value) = rx_waku_message.get() { + println!("The waku message value is: {:?}", value); + break; + } else { + sleep(Duration::from_millis(100)).await; + } + } - let rx_waku_message: OnceCell = OnceCell::new(); - - let closure = |response: LibwakuResponse| { - if let LibwakuResponse::Success(v) = response { - let event: Event = - serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed"); - - match event { - Event::WakuMessage(evt) => { - println!("WakuMessage event received: {:?}", evt.waku_message); - // let _ = rx_waku_message.set(evt.waku_message); // <-- this produces segfault - } - Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err), - _ => panic!("event case not expected"), - }; - }; - }; - - node2.set_event_callback(closure); - - let mut ids = try_publish_relay_messages(node1, &message).expect("send relay messages"); - - // Wait for the msg to arrive to form - sleep(Duration::from_secs(1)).await; + if let None = rx_waku_message.get() { + println!("ERROR could not get waku message"); + } } #[tokio::test] @@ -97,21 +134,7 @@ async fn default_echo() -> Result<(), String> { let node1 = node1.start()?; let node2 = node2.start()?; - let topic = TEST_PUBSUBTOPIC.to_string(); - - node1.relay_subscribe(&topic)?; - node2.relay_subscribe(&topic)?; - - sleep(Duration::from_secs(35)).await; - - // Interconnect nodes - println!("Connecting node1 to node2"); - let addresses1 = node1.listen_addresses()?; - node2.connect(&addresses1[0], None)?; - - // Wait for mesh to form - sleep(Duration::from_secs(25)).await; - + let waku_version = node2.version()?; let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto); let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT)); @@ -128,6 +151,9 @@ async fn default_echo() -> Result<(), String> { let node2 = node2.stop()?; let node1 = node1.stop()?; + let sleep = time::sleep(Duration::from_secs(5)); + tokio::pin!(sleep); + waku_destroy(node1)?; waku_destroy(node2)?;