From dfe068222d1a7e0b42812b83d067cdfff9013020 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Sat, 2 Nov 2024 23:29:41 +0100 Subject: [PATCH] final set of adaptations to current nwaku --- Cargo.lock | 1 + examples/basic/Cargo.toml | 1 + examples/basic/src/main.rs | 54 ++++++++++++++++++++++---------- waku-bindings/Cargo.toml | 1 + waku-bindings/src/node/events.rs | 6 ++-- waku-bindings/src/node/mod.rs | 4 +-- waku-bindings/tests/node.rs | 48 ++++++++++------------------ 7 files changed, 62 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cfd6ca6..066b0ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -380,6 +380,7 @@ name = "basic" version = "0.1.0" dependencies = [ "futures", + "serde_json", "tokio", "tokio-util", "waku-bindings", diff --git a/examples/basic/Cargo.toml b/examples/basic/Cargo.toml index 3a75fb7..29ddd5e 100644 --- a/examples/basic/Cargo.toml +++ b/examples/basic/Cargo.toml @@ -10,3 +10,4 @@ futures = "0.3.30" tokio = { version = "1.36.0", features = ["full"] } tokio-util = { version = "0.7.10", features = ["rt"] } waku = { path = "../../waku-bindings", package = "waku-bindings" } +serde_json = "1.0" diff --git a/examples/basic/src/main.rs b/examples/basic/src/main.rs index 5193431..c8e214a 100644 --- a/examples/basic/src/main.rs +++ b/examples/basic/src/main.rs @@ -3,7 +3,7 @@ use std::str::from_utf8; use std::time::SystemTime; use tokio::time::{sleep, Duration}; use waku::{ - waku_destroy, waku_new, Encoding, Event, WakuContentTopic, WakuMessage, WakuNodeConfig, + waku_destroy, waku_new, Encoding, Event, WakuContentTopic, WakuMessage, WakuNodeConfig, LibwakuResponse, }; #[tokio::main] @@ -25,25 +25,45 @@ async fn main() -> Result<(), Error> { // ======================================================================== // Setting an event callback to be executed each time a message is received - node2.set_event_callback(move |event| { - if let Event::WakuMessage(message) = event { - let message = message.waku_message; - let payload = message.payload.to_vec(); - let msg = from_utf8(&payload).expect("should be valid message"); - println!("::::::::::::::::::::::::::::::::::::::::::::::::::::"); - println!("Message Received in NODE 2: {}", msg); - println!("::::::::::::::::::::::::::::::::::::::::::::::::::::"); + node2.set_event_callback(&|response| { + if let LibwakuResponse::Success(v) = response { + let event: Event = + serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed"); + + match event { + Event::WakuMessage(evt) => { + println!("WakuMessage event received: {:?}", evt.waku_message); + let message = evt.waku_message; + let payload = message.payload.to_vec(); + let msg = from_utf8(&payload).expect("should be valid message"); + println!("::::::::::::::::::::::::::::::::::::::::::::::::::::"); + println!("Message Received in NODE 2: {}", msg); + println!("::::::::::::::::::::::::::::::::::::::::::::::::::::"); + } + Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err), + _ => panic!("event case not expected"), + }; } }); - node1.set_event_callback(move |event| { - if let Event::WakuMessage(message) = event { - let message = message.waku_message; - let payload = message.payload.to_vec(); - let msg = from_utf8(&payload).expect("should be valid message"); - println!("::::::::::::::::::::::::::::::::::::::::::::::::::::"); - println!("Message Received in NODE 1: {}", msg); - println!("::::::::::::::::::::::::::::::::::::::::::::::::::::"); + node1.set_event_callback(&|response| { + if let LibwakuResponse::Success(v) = response { + let event: Event = + serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed"); + + match event { + Event::WakuMessage(evt) => { + println!("WakuMessage event received: {:?}", evt.waku_message); + let message = evt.waku_message; + let payload = message.payload.to_vec(); + let msg = from_utf8(&payload).expect("should be valid message"); + println!("::::::::::::::::::::::::::::::::::::::::::::::::::::"); + println!("Message Received in NODE 1: {}", msg); + println!("::::::::::::::::::::::::::::::::::::::::::::::::::::"); + } + Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err), + _ => panic!("event case not expected"), + }; } }); diff --git a/waku-bindings/Cargo.toml b/waku-bindings/Cargo.toml index d709db1..89f5234 100644 --- a/waku-bindings/Cargo.toml +++ b/waku-bindings/Cargo.toml @@ -31,6 +31,7 @@ waku-sys = { version = "0.5.0", path = "../waku-sys" } libc = "0.2" serde-aux = "4.3.1" rln = "0.3.4" +tokio = { version = "1", features = ["full"] } [dev-dependencies] futures = "0.3.25" diff --git a/waku-bindings/src/node/events.rs b/waku-bindings/src/node/events.rs index e6dbde1..4871c07 100644 --- a/waku-bindings/src/node/events.rs +++ b/waku-bindings/src/node/events.rs @@ -39,10 +39,10 @@ pub struct WakuMessageEvent { /// Register callback to act as event handler and receive application events, /// which are used to react to asynchronous events in Waku -pub fn waku_set_event_callback(ctx: &WakuNodeContext, closure: F) { +pub fn waku_set_event_callback(ctx: &WakuNodeContext, closure: &F) { unsafe { - let cb = get_trampoline(&closure); - waku_sys::waku_set_event_callback(ctx.obj_ptr, cb, &closure as *const _ as *mut c_void) + let cb = get_trampoline(closure); + waku_sys::waku_set_event_callback(ctx.obj_ptr, cb, closure as *const _ as *mut c_void) }; } diff --git a/waku-bindings/src/node/mod.rs b/waku-bindings/src/node/mod.rs index 35c67ee..4549b2c 100644 --- a/waku-bindings/src/node/mod.rs +++ b/waku-bindings/src/node/mod.rs @@ -117,7 +117,7 @@ impl WakuNodeHandle { relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic) } - pub fn set_event_callback(&self, f: F) { - events::waku_set_event_callback(&self.ctx, f) + pub fn set_event_callback(&self, closure: &F) { + events::waku_set_event_callback(&self.ctx, closure) } } diff --git a/waku-bindings/tests/node.rs b/waku-bindings/tests/node.rs index 879c21a..08c2f40 100644 --- a/waku-bindings/tests/node.rs +++ b/waku-bindings/tests/node.rs @@ -4,17 +4,13 @@ use std::str::FromStr; use std::time::{Duration, SystemTime}; use std::{collections::HashSet, str::from_utf8}; use std::cell::OnceCell; -use tokio::sync::broadcast::{self, Sender}; use waku_bindings::LibwakuResponse; -use std::sync::{Arc, OnceLock, Mutex}; // Import Arc and Mutex use tokio::time; use tokio::time::sleep; -use waku_bindings::utils; use waku_bindings::{ waku_destroy, waku_new, Encoding, Event, MessageHash, Running, WakuContentTopic, WakuMessage, WakuNodeConfig, WakuNodeHandle, }; -use std::ffi::c_void; const ECHO_TIMEOUT: u64 = 1000; const ECHO_MESSAGE: &str = "Hi from 🦀!"; const TEST_PUBSUBTOPIC: &str = "test"; @@ -29,24 +25,18 @@ fn try_publish_relay_messages( ])) } -#[derive(Debug, Clone)] -struct Response { - hash: MessageHash, - payload: Vec, -} - async fn test_echo_messages( node1: &WakuNodeHandle, node2: &WakuNodeHandle, content: &'static str, content_topic: WakuContentTopic, -) { +) -> Result<(), String> { // setting a naïve event handler to avoid appearing ERR messages in logs - node1.set_event_callback(|_LibwakuResponse| {}); + node1.set_event_callback(&|_| {}); let rx_waku_message: OnceCell = OnceCell::new(); - let closure = |response: LibwakuResponse| { + let closure = |response| { if let LibwakuResponse::Success(v) = response { let event: Event = serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed"); @@ -65,12 +55,7 @@ async fn test_echo_messages( println!("Before setting event callback"); - unsafe { - let cb = utils::get_trampoline(&closure); - waku_sys::waku_set_event_callback(node2.ctx.obj_ptr, cb, &closure as *const _ as *mut c_void) - }; - - // node2.set_event_callback(closure); // Set the event callback with the closure + node2.set_event_callback(&closure); // Set the event callback with the closure let topic = TEST_PUBSUBTOPIC.to_string(); node1.relay_subscribe(&topic).unwrap(); @@ -100,22 +85,28 @@ async fn test_echo_messages( Vec::new(), false, ); - let ids = try_publish_relay_messages(node1, &message).expect("send relay messages"); - println!("After publish"); + let _ids = try_publish_relay_messages(node1, &message).expect("send relay messages"); // Wait for the msg to arrive for _ in 0..50 { - if let Some(value) = rx_waku_message.get() { - println!("The waku message value is: {:?}", value); - break; + if let Some(msg) = rx_waku_message.get() { + println!("The waku message value is: {:?}", msg); + let payload = msg.payload.to_vec(); + let payload_str = from_utf8(&payload).expect("should be valid message"); + println!("payload: {:?}", payload_str); + if payload_str == ECHO_MESSAGE { + return Ok(()) + } } else { sleep(Duration::from_millis(100)).await; } } if let None = rx_waku_message.get() { - println!("ERROR could not get waku message"); + return Err("could not get waku message".to_string()) } + + return Err("Unexpected test ending".to_string()) } #[tokio::test] @@ -134,7 +125,6 @@ async fn default_echo() -> Result<(), String> { let node1 = node1.start()?; let node2 = node2.start()?; - let waku_version = node2.version()?; let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto); let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT)); @@ -148,12 +138,8 @@ async fn default_echo() -> Result<(), String> { assert!(got_all); - let node2 = node2.stop()?; let node1 = node1.stop()?; - - let sleep = time::sleep(Duration::from_secs(5)); - tokio::pin!(sleep); - + let node2 = node2.stop()?; waku_destroy(node1)?; waku_destroy(node2)?;