diff --git a/examples/Cargo.lock b/examples/Cargo.lock index 6b4e8e0..002efc7 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -4736,6 +4736,7 @@ dependencies = [ "prost", "serde", "serde_json", + "tokio", "tui", "unicode-width 0.1.12", "waku-bindings", diff --git a/examples/basic/src/main.rs b/examples/basic/src/main.rs index f17af70..1f7fb85 100644 --- a/examples/basic/src/main.rs +++ b/examples/basic/src/main.rs @@ -13,12 +13,14 @@ async fn main() -> Result<(), Error> { tcp_port: Some(60010), // TODO: use any available port. ..Default::default() })) + .await .expect("should instantiate"); let node2 = waku_new(Some(WakuNodeConfig { tcp_port: Some(60020), // TODO: use any available port. ..Default::default() })) + .await .expect("should instantiate"); // ======================================================================== @@ -31,7 +33,7 @@ async fn main() -> Result<(), Error> { match event { WakuEvent::WakuMessage(evt) => { - println!("WakuMessage event received: {:?}", evt.waku_message); + // println!("WakuMessage event received: {:?}", evt.waku_message); let message = evt.waku_message; let payload = message.payload.to_vec(); let msg = from_utf8(&payload).expect("should be valid message"); @@ -54,7 +56,7 @@ async fn main() -> Result<(), Error> { match event { WakuEvent::WakuMessage(evt) => { - println!("WakuMessage event received: {:?}", evt.waku_message); + // println!("WakuMessage event received: {:?}", evt.waku_message); let message = evt.waku_message; let payload = message.payload.to_vec(); let msg = from_utf8(&payload).expect("should be valid message"); @@ -69,8 +71,8 @@ async fn main() -> Result<(), Error> { }) .expect("set event call back working"); - let node1 = node1.start().expect("node1 should start"); - let node2 = node2.start().expect("node2 should start"); + let node1 = node1.start().await.expect("node1 should start"); + let node2 = node2.start().await.expect("node2 should start"); // ======================================================================== // Subscribe to pubsub topic @@ -78,10 +80,12 @@ async fn main() -> Result<(), Error> { node1 .relay_subscribe(&topic) + .await .expect("node1 should subscribe"); node2 .relay_subscribe(&topic) + .await .expect("node2 should subscribe"); // ======================================================================== @@ -89,10 +93,12 @@ async fn main() -> Result<(), Error> { let addresses2 = node2 .listen_addresses() + .await .expect("should obtain the addresses"); node1 .connect(&addresses2[0], None) + .await .expect("node1 should connect to node2"); // ======================================================================== @@ -119,6 +125,7 @@ async fn main() -> Result<(), Error> { ); node1 .relay_publish_message(&message, &topic, None) + .await .expect("should have sent the message"); // ======================================================================== @@ -129,13 +136,13 @@ async fn main() -> Result<(), Error> { // ======================================================================== // Stop both instances - let node1 = node1.stop().expect("should stop"); - let node2 = node2.stop().expect("should stop"); + let node1 = node1.stop().await.expect("should stop"); + let node2 = node2.stop().await.expect("should stop"); // ======================================================================== // Free resources - node1.waku_destroy().expect("should deallocate"); - node2.waku_destroy().expect("should deallocate"); + node1.waku_destroy().await.expect("should deallocate"); + node2.waku_destroy().await.expect("should deallocate"); Ok(()) } diff --git a/examples/tic-tac-toe-gui/src/main.rs b/examples/tic-tac-toe-gui/src/main.rs index fef3a05..b298c0d 100644 --- a/examples/tic-tac-toe-gui/src/main.rs +++ b/examples/tic-tac-toe-gui/src/main.rs @@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize}; use std::str::from_utf8; use std::sync::{Arc, Mutex}; use std::time::{SystemTime, Duration}; +use tokio::task; use tokio::sync::mpsc; use waku::{ @@ -48,7 +49,7 @@ impl TicTacToeApp { } } - fn start(self) -> TicTacToeApp { + async fn start(self) -> TicTacToeApp { let tx_clone = self.tx.clone(); let my_closure = move |response| { @@ -84,14 +85,14 @@ impl TicTacToeApp { self.waku.set_event_callback(my_closure).expect("set event call back working"); // Start the waku node - let waku = self.waku.start().expect("waku should start"); + let waku = self.waku.start().await.expect("waku should start"); // Subscribe to desired topic using the relay protocol - // self.waku.relay_subscribe(&self.game_topic.to_string()).expect("waku should subscribe"); + waku.relay_subscribe(&self.game_topic).await.expect("waku should subscribe"); - let ctopic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto); - let content_topics = vec![ctopic]; - waku.filter_subscribe(&self.game_topic, content_topics).expect("waku should subscribe"); + // let ctopic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto); + // let content_topics = vec![ctopic]; + // waku.filter_subscribe(&self.game_topic, content_topics).await.expect("waku should subscribe"); // Connect to hard-coded node // let target_node_multi_addr = @@ -114,7 +115,7 @@ impl TicTacToeApp { } impl TicTacToeApp { - fn send_game_state(&self, game_state: &GameState) { + async fn send_game_state(&self, game_state: &GameState) { let serialized_game_state = serde_json::to_string(game_state).unwrap(); let content_topic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto); @@ -132,9 +133,11 @@ impl TicTacToeApp { false, ); - // self.waku.relay_publish_message(&message, &self.game_topic.to_string(), None) - // .expect("Failed to send message"); - self.waku.lightpush_publish_message(&message, &self.game_topic).expect("Failed to send message"); + if let Ok(msg_hash) = self.waku.relay_publish_message(&message, &self.game_topic, None).await { + dbg!(format!("message hash published: {}", msg_hash)); + } + + // self.waku.lightpush_publish_message(&message, &self.game_topic); } fn make_move(&mut self, row: usize, col: usize) { @@ -159,7 +162,17 @@ impl TicTacToeApp { }; } - self.send_game_state(&game_state); // Send updated state after a move + // Call the async function in a blocking context + task::block_in_place(|| { + // Obtain the current runtime handle + let handle = tokio::runtime::Handle::current(); + + // Block on the async function + handle.block_on(async { + // Assuming `self` is available in the current context + self.send_game_state(&game_state).await; + }); + }); } } } @@ -314,7 +327,7 @@ async fn main() -> eframe::Result<()> { // node_key: Some(SecretKey::from_str("2fc0515879e52b7b73297cfd6ab3abf7c344ef84b7a90ff6f4cc19e05a198027").unwrap()), max_message_size: Some("1024KiB".to_string()), relay_topics: vec![String::from(&game_topic)], - log_level: Some("DEBUG"), // Supported: TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL + log_level: Some("FATAL"), // Supported: TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL keep_alive: Some(true), @@ -326,7 +339,7 @@ async fn main() -> eframe::Result<()> { // discv5_enr_auto_update: Some(false), ..Default::default() - })) + })).await .expect("should instantiate"); let game_state = GameState { @@ -339,7 +352,7 @@ async fn main() -> eframe::Result<()> { let clone = shared_state.clone(); let app = TicTacToeApp::new(waku, game_topic, clone, tx); - let app = app.start(); + let app = app.start().await; let clone = shared_state.clone(); // Listen for messages in the main thread diff --git a/examples/toy-chat/Cargo.toml b/examples/toy-chat/Cargo.toml index a9c13ed..abc5442 100644 --- a/examples/toy-chat/Cargo.toml +++ b/examples/toy-chat/Cargo.toml @@ -15,4 +15,5 @@ tui = "0.19" crossterm = "0.25" unicode-width = "0.1" prost = "0.11" -chrono = "0.4" \ No newline at end of file +chrono = "0.4" +tokio = { version = "1", features = ["full"] } \ No newline at end of file diff --git a/examples/toy-chat/src/main.rs b/examples/toy-chat/src/main.rs index f9eb2df..7bd92e9 100644 --- a/examples/toy-chat/src/main.rs +++ b/examples/toy-chat/src/main.rs @@ -1,6 +1,7 @@ mod protocol; use crate::protocol::{Chat2Message, TOY_CHAT_CONTENT_TOPIC}; +use tokio::task; use chrono::Utc; use crossterm::{ event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode}, @@ -48,7 +49,7 @@ struct App { } impl App { - fn new(nick: String) -> Result> { + async fn new(nick: String) -> Result> { let pubsub_topic = PubsubTopic::new(DEFAULT_PUBSUB_TOPIC); let waku = waku_new(Some(WakuNodeConfig { tcp_port: Some(60010), @@ -69,7 +70,7 @@ impl App { // discv5_enr_auto_update: Some(false), ..Default::default() - }))?; + })).await?; Ok(App { input: String::new(), @@ -80,7 +81,7 @@ impl App { }) } - fn start_waku_node(self) -> Result> { + async fn start_waku_node(self) -> Result> { let shared_messages = Arc::clone(&self.messages); @@ -116,10 +117,10 @@ impl App { } })?; - let waku = self.waku.start()?; + let waku = self.waku.start().await?; let pubsub_topic = PubsubTopic::new(DEFAULT_PUBSUB_TOPIC); - waku.relay_subscribe(&pubsub_topic)?; + waku.relay_subscribe(&pubsub_topic).await?; Ok(App { input: self.input, @@ -133,8 +134,8 @@ impl App { impl App { - fn retrieve_history(&mut self) { - let messages = self.waku.store_query(None, vec![TOY_CHAT_CONTENT_TOPIC.clone()], STORE_NODE).unwrap(); + async fn retrieve_history(&mut self) { + let messages = self.waku.store_query(None, vec![TOY_CHAT_CONTENT_TOPIC.clone()], STORE_NODE).await.unwrap(); let messages:Vec<_> = messages .iter() .map(|store_resp_msg| { @@ -183,15 +184,25 @@ impl App { false, ); - let pubsub_topic = PubsubTopic::new(DEFAULT_PUBSUB_TOPIC); - if let Err(e) = self.waku.relay_publish_message( - &waku_message, - &pubsub_topic, - None, - ) { - let mut out = std::io::stderr(); - write!(out, "{e:?}").unwrap(); - } + // Call the async function in a blocking context + task::block_in_place(|| { + // Obtain the current runtime handle + let handle = tokio::runtime::Handle::current(); + + // Block on the async function + handle.block_on(async { + // Assuming `self` is available in the current context + let pubsub_topic = PubsubTopic::new(DEFAULT_PUBSUB_TOPIC); + if let Err(e) = self.waku.relay_publish_message( + &waku_message, + &pubsub_topic, + None, + ).await { + let mut out = std::io::stderr(); + write!(out, "{e:?}").unwrap(); + } + }); + }); } KeyCode::Char(c) => { self.input.push(c); @@ -210,16 +221,17 @@ impl App { } } - fn stop_app(self) { - self.waku.stop().expect("the node should stop properly"); + async fn stop_app(self) { + self.waku.stop().await.expect("the node should stop properly"); } } -fn main() -> std::result::Result<(), Box> { +#[tokio::main] +async fn main() -> std::result::Result<(), Box> { let nick = std::env::args().nth(1).expect("Nick to be set"); - let app = App::new(nick)?; - let mut app = app.start_waku_node()?; + let app = App::new(nick).await?; + let mut app = app.start_waku_node().await?; // setup terminal enable_raw_mode()?; @@ -228,9 +240,9 @@ fn main() -> std::result::Result<(), Box> { let backend = CrosstermBackend::new(stdout); let mut terminal = Terminal::new(backend)?; - app.retrieve_history(); + app.retrieve_history().await; let res = app.run_main_loop(&mut terminal); - app.stop_app(); + app.stop_app().await; // restore terminal disable_raw_mode()?; diff --git a/waku-bindings/src/general/contenttopic.rs b/waku-bindings/src/general/contenttopic.rs index 381f57a..ad987be 100644 --- a/waku-bindings/src/general/contenttopic.rs +++ b/waku-bindings/src/general/contenttopic.rs @@ -1,4 +1,6 @@ // std +use crate::general::Result; +use crate::utils::WakuDecode; use std::borrow::Cow; use std::fmt::{Display, Formatter}; use std::str::FromStr; @@ -79,6 +81,12 @@ impl WakuContentTopic { } } +impl WakuDecode for WakuContentTopic { + fn decode(input: &str) -> Result { + Ok(serde_json::from_str(input).expect("could not parse store resp")) + } +} + impl FromStr for WakuContentTopic { type Err = String; diff --git a/waku-bindings/src/general/messagehash.rs b/waku-bindings/src/general/messagehash.rs index ffdd5a9..0f07a22 100644 --- a/waku-bindings/src/general/messagehash.rs +++ b/waku-bindings/src/general/messagehash.rs @@ -2,12 +2,28 @@ use crate::utils::WakuDecode; use hex::FromHex; use serde::{Deserialize, Deserializer, Serialize}; use std::convert::TryInto; +use std::fmt; +use std::hash::{Hash, Hasher}; use std::str::FromStr; /// Waku message hash, hex encoded sha256 digest of the message -#[derive(Debug, Serialize, Clone)] +#[derive(Debug, Serialize, PartialEq, Eq, Clone)] pub struct MessageHash([u8; 32]); +impl MessageHash { + fn to_hex_string(&self) -> String { + let hex: String = self.0.iter().map(|b| format!("{:02x}", b)).collect(); + format!("0x{}", hex) + } +} + +impl Hash for MessageHash { + fn hash(&self, state: &mut H) { + // Use the inner array to contribute to the hash + self.0.hash(state); + } +} + impl FromStr for MessageHash { type Err = String; @@ -45,6 +61,13 @@ impl<'de> Deserialize<'de> for MessageHash { impl WakuDecode for MessageHash { fn decode(input: &str) -> Result { - serde_json::from_str(input).expect("could not parse MessageHash") + MessageHash::from_str(input) + } +} + +// Implement the Display trait +impl fmt::Display for MessageHash { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.to_hex_string()) } } diff --git a/waku-bindings/src/node/events.rs b/waku-bindings/src/node/events.rs index 6b9d3ff..029bb98 100644 --- a/waku-bindings/src/node/events.rs +++ b/waku-bindings/src/node/events.rs @@ -41,7 +41,7 @@ mod tests { #[test] fn deserialize_message_event() { - let s = "{\"eventType\":\"message\",\"messageHash\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}"; + let s = "{\"eventType\":\"message\",\"messageHash\":[91, 70, 26, 8, 141, 232, 150, 200, 26, 206, 224, 175, 249, 74, 61, 140, 231, 126, 224, 160, 91, 80, 162, 65, 250, 171, 84, 149, 133, 110, 214, 101],\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}"; let evt: WakuEvent = serde_json::from_str(s).unwrap(); assert!(matches!(evt, WakuEvent::WakuMessage(_))); } diff --git a/waku-bindings/src/node/filter.rs b/waku-bindings/src/node/filter.rs index 9bf0208..c3d237a 100644 --- a/waku-bindings/src/node/filter.rs +++ b/waku-bindings/src/node/filter.rs @@ -4,6 +4,8 @@ use std::ffi::CString; // crates use libc::*; +use std::sync::Arc; +use tokio::sync::Notify; // internal use crate::general::contenttopic::WakuContentTopic; use crate::general::pubsubtopic::PubsubTopic; @@ -11,22 +13,29 @@ use crate::general::Result; use crate::node::context::WakuNodeContext; use crate::utils::{get_trampoline, handle_no_response, LibwakuResponse}; -pub fn waku_filter_subscribe( +pub async fn waku_filter_subscribe( ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic, content_topics: Vec, ) -> Result<()> { let content_topics = WakuContentTopic::join_content_topics(content_topics); - let pubsub_topic_ptr = CString::new(String::from(pubsub_topic)) - .expect("CString should build properly from pubsub topic") - .into_raw(); - let content_topics_ptr = CString::new(content_topics) - .expect("CString should build properly from content topic") - .into_raw(); + let pubsub_topic = CString::new(String::from(pubsub_topic)) + .expect("CString should build properly from pubsub topic"); + let pubsub_topic_ptr = pubsub_topic.as_ptr(); + + let content_topics = + CString::new(content_topics).expect("CString should build properly from content topic"); + let content_topics_ptr = content_topics.as_ptr(); + + let mut result = LibwakuResponse::default(); + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + let result_cb = |r: LibwakuResponse| { + result = r; + notify_clone.notify_one(); // Notify that the value has been updated + }; - let mut result: LibwakuResponse = Default::default(); - let result_cb = |r: LibwakuResponse| result = r; let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); @@ -38,31 +47,35 @@ pub fn waku_filter_subscribe( &mut closure as *mut _ as *mut c_void, ); - drop(CString::from_raw(pubsub_topic_ptr)); - drop(CString::from_raw(content_topics_ptr)); - out }; + notify.notified().await; // Wait until a result is received handle_no_response(code, result) } -pub fn waku_filter_unsubscribe( +pub async fn waku_filter_unsubscribe( ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic, content_topics: Vec, // comma-separated list of content topics ) -> Result<()> { let content_topics_topics = WakuContentTopic::join_content_topics(content_topics); - let pubsub_topic_ptr = CString::new(String::from(pubsub_topic)) - .expect("CString should build properly from pubsub topic") - .into_raw(); - let content_topics_topics_ptr = CString::new(content_topics_topics) - .expect("CString should build properly from content topic") - .into_raw(); + let pubsub_topic = CString::new(String::from(pubsub_topic)) + .expect("CString should build properly from pubsub topic"); + let pubsub_topic_ptr = pubsub_topic.as_ptr(); - let mut result: LibwakuResponse = Default::default(); - let result_cb = |r: LibwakuResponse| result = r; + let content_topics_topics = CString::new(content_topics_topics) + .expect("CString should build properly from content topic"); + let content_topics_topics_ptr = content_topics_topics.as_ptr(); + + let mut result = LibwakuResponse::default(); + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + let result_cb = |r: LibwakuResponse| { + result = r; + notify_clone.notify_one(); // Notify that the value has been updated + }; let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); @@ -74,18 +87,21 @@ pub fn waku_filter_unsubscribe( &mut closure as *mut _ as *mut c_void, ); - drop(CString::from_raw(pubsub_topic_ptr)); - drop(CString::from_raw(content_topics_topics_ptr)); - out }; + notify.notified().await; // Wait until a result is received handle_no_response(code, result) } -pub fn waku_filter_unsubscribe_all(ctx: &WakuNodeContext) -> Result<()> { - let mut result: LibwakuResponse = Default::default(); - let result_cb = |r: LibwakuResponse| result = r; +pub async fn waku_filter_unsubscribe_all(ctx: &WakuNodeContext) -> Result<()> { + let mut result = LibwakuResponse::default(); + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + let result_cb = |r: LibwakuResponse| { + result = r; + notify_clone.notify_one(); // Notify that the value has been updated + }; let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); @@ -96,5 +112,6 @@ pub fn waku_filter_unsubscribe_all(ctx: &WakuNodeContext) -> Result<()> { ) }; + notify.notified().await; // Wait until a result is received handle_no_response(code, result) } diff --git a/waku-bindings/src/node/lightpush.rs b/waku-bindings/src/node/lightpush.rs index 3b538eb..297c223 100644 --- a/waku-bindings/src/node/lightpush.rs +++ b/waku-bindings/src/node/lightpush.rs @@ -4,6 +4,8 @@ use std::ffi::CString; // crates use libc::*; +use std::sync::Arc; +use tokio::sync::Notify; // internal use crate::general::{messagehash::MessageHash, Result, WakuMessage}; use crate::node::context::WakuNodeContext; @@ -11,23 +13,30 @@ use crate::utils::{get_trampoline, handle_response, LibwakuResponse}; use crate::general::pubsubtopic::PubsubTopic; -pub fn waku_lightpush_publish_message( +pub async fn waku_lightpush_publish_message( ctx: &WakuNodeContext, message: &WakuMessage, pubsub_topic: &PubsubTopic, ) -> Result { - let message_ptr = CString::new( + let message = CString::new( serde_json::to_string(&message) .expect("WakuMessages should always be able to success serializing"), ) - .expect("CString should build properly from the serialized waku message") - .into_raw(); - let pubsub_topic_ptr = CString::new(String::from(pubsub_topic)) - .expect("CString should build properly from pubsub topic") - .into_raw(); + .expect("CString should build properly from the serialized waku message"); + let message_ptr = message.as_ptr(); + + let pubsub_topic = CString::new(String::from(pubsub_topic)) + .expect("CString should build properly from pubsub topic"); + let pubsub_topic_ptr = pubsub_topic.as_ptr(); + + let mut result = LibwakuResponse::default(); + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + let result_cb = |r: LibwakuResponse| { + result = r; + notify_clone.notify_one(); // Notify that the value has been updated + }; - let mut result: LibwakuResponse = Default::default(); - let result_cb = |r: LibwakuResponse| result = r; let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); @@ -39,11 +48,9 @@ pub fn waku_lightpush_publish_message( &mut closure as *mut _ as *mut c_void, ); - drop(CString::from_raw(message_ptr)); - drop(CString::from_raw(pubsub_topic_ptr)); - out }; + notify.notified().await; // Wait until a result is received handle_response(code, result) } diff --git a/waku-bindings/src/node/management.rs b/waku-bindings/src/node/management.rs index 8eb24f0..16fc9a0 100644 --- a/waku-bindings/src/node/management.rs +++ b/waku-bindings/src/node/management.rs @@ -5,37 +5,44 @@ use std::ffi::CString; // crates use libc::c_void; use multiaddr::Multiaddr; +use std::sync::Arc; +use tokio::sync::Notify; // internal use super::config::WakuNodeConfig; use crate::general::Result; use crate::node::context::WakuNodeContext; use crate::utils::LibwakuResponse; use crate::utils::WakuDecode; -use crate::utils::{get_trampoline, handle_json_response, handle_no_response, handle_response}; +use crate::utils::{get_trampoline, handle_no_response, handle_response}; /// Instantiates a Waku node /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig) -pub fn waku_new(config: Option) -> Result { +pub async fn waku_new(config: Option) -> Result { let config = config.unwrap_or_default(); - let config_ptr = CString::new( + let config = CString::new( serde_json::to_string(&config) .expect("Serialization from properly built NodeConfig should never fail"), ) - .expect("CString should build properly from the config") - .into_raw(); + .expect("CString should build properly from the config"); + let config_ptr = config.as_ptr(); - let mut result: LibwakuResponse = Default::default(); - let result_cb = |r: LibwakuResponse| result = r; + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + let mut result = LibwakuResponse::default(); + let result_cb = |r: LibwakuResponse| { + result = r; + notify_clone.notify_one(); // Notify that the value has been updated + }; let obj_ptr = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); let out = waku_sys::waku_new(config_ptr, cb, &mut closure as *mut _ as *mut c_void); - drop(CString::from_raw(config_ptr)); - out }; + notify.notified().await; // Wait until a result is received + match result { LibwakuResponse::MissingCallback => panic!("callback is required"), LibwakuResponse::Failure(v) => Err(v), @@ -43,57 +50,81 @@ pub fn waku_new(config: Option) -> Result { } } -pub fn waku_destroy(ctx: &WakuNodeContext) -> Result<()> { - let mut result: LibwakuResponse = Default::default(); - let result_cb = |r: LibwakuResponse| result = r; +pub async fn waku_destroy(ctx: &WakuNodeContext) -> Result<()> { + let mut result = LibwakuResponse::default(); + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + let result_cb = |r: LibwakuResponse| { + result = r; + notify_clone.notify_one(); // Notify that the value has been updated + }; let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); waku_sys::waku_destroy(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void) }; + notify.notified().await; // Wait until a result is received handle_no_response(code, result) } /// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation. /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start) -pub fn waku_start(ctx: &WakuNodeContext) -> Result<()> { - let mut result: LibwakuResponse = Default::default(); - let result_cb = |r: LibwakuResponse| result = r; +pub async fn waku_start(ctx: &WakuNodeContext) -> Result<()> { + let mut result = LibwakuResponse::default(); + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + let result_cb = |r: LibwakuResponse| { + result = r; + notify_clone.notify_one(); // Notify that the value has been updated + }; let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); waku_sys::waku_start(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void) }; + notify.notified().await; // Wait until a result is received handle_no_response(code, result) } /// Stops a Waku node /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop) -pub fn waku_stop(ctx: &WakuNodeContext) -> Result<()> { - let mut result: LibwakuResponse = Default::default(); - let result_cb = |r: LibwakuResponse| result = r; +pub async fn waku_stop(ctx: &WakuNodeContext) -> Result<()> { + let mut result = LibwakuResponse::default(); + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + let result_cb = |r: LibwakuResponse| { + result = r; + notify_clone.notify_one(); // Notify that the value has been updated + }; let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); waku_sys::waku_stop(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void) }; + notify.notified().await; // Wait until a result is received handle_no_response(code, result) } /// nwaku version #[allow(clippy::not_unsafe_ptr_arg_deref)] -pub fn waku_version(ctx: &WakuNodeContext) -> Result { - let mut result: LibwakuResponse = Default::default(); - let result_cb = |r: LibwakuResponse| result = r; +pub async fn waku_version(ctx: &WakuNodeContext) -> Result { + let mut result = LibwakuResponse::default(); + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + let result_cb = |r: LibwakuResponse| { + result = r; + notify_clone.notify_one(); // Notify that the value has been updated + }; let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); waku_sys::waku_version(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void) }; + notify.notified().await; // Wait until a result is received handle_response(code, result) } @@ -110,45 +141,54 @@ impl WakuDecode for Vec { /// Get the multiaddresses the Waku node is listening to /// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses) -pub fn waku_listen_addresses(ctx: &WakuNodeContext) -> Result> { - let mut result: LibwakuResponse = Default::default(); - let result_cb = |r: LibwakuResponse| result = r; +pub async fn waku_listen_addresses(ctx: &WakuNodeContext) -> Result> { + let mut result = LibwakuResponse::default(); + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + let result_cb = |r: LibwakuResponse| { + result = r; + notify_clone.notify_one(); // Notify that the value has been updated + }; + let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); waku_sys::waku_listen_addresses(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void) }; - handle_json_response(code, result) + notify.notified().await; // Wait until a result is received + handle_response(code, result) } #[cfg(test)] mod test { use super::waku_new; use crate::node::management::{waku_listen_addresses, waku_start, waku_stop, waku_version}; - use serial_test::serial; - #[test] - #[serial] - fn waku_flow() { - let node = waku_new(None).unwrap(); + #[tokio::test] + async fn waku_flow() { + let node = waku_new(None).await.unwrap(); - waku_start(&node).unwrap(); + waku_start(&node).await.unwrap(); // test addresses - let addresses = waku_listen_addresses(&node).unwrap(); + let addresses = waku_listen_addresses(&node).await.unwrap(); dbg!(&addresses); assert!(!addresses.is_empty()); - waku_stop(&node).unwrap(); + waku_stop(&node).await.unwrap(); } - #[test] - #[serial] - fn nwaku_version() { - let node = waku_new(None).unwrap(); - let version = waku_version(&node).expect("should return the version"); + #[tokio::test] + async fn nwaku_version() { + let node = waku_new(None).await.unwrap(); + + let version = waku_version(&node) + .await + .expect("should return the version"); + print!("Current version: {}", version); + assert!(!version.is_empty()); } } diff --git a/waku-bindings/src/node/mod.rs b/waku-bindings/src/node/mod.rs index 3c022e9..483c7d4 100644 --- a/waku-bindings/src/node/mod.rs +++ b/waku-bindings/src/node/mod.rs @@ -44,34 +44,41 @@ pub struct WakuNodeHandle { /// Spawn a new Waku node with the given configuration (default configuration if `None` provided) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig) -pub fn waku_new(config: Option) -> Result> { +pub async fn waku_new(config: Option) -> Result> { Ok(WakuNodeHandle { - ctx: management::waku_new(config)?, + ctx: management::waku_new(config).await?, _state: PhantomData, }) } impl WakuNodeHandle { /// Get the nwaku version - pub fn version(&self) -> Result { - management::waku_version(&self.ctx) + pub async fn version(&self) -> Result { + management::waku_version(&self.ctx).await } - pub fn waku_destroy(self) -> Result<()> { - let res = management::waku_destroy(&self.ctx); + pub async fn waku_destroy(self) -> Result<()> { + let res = management::waku_destroy(&self.ctx).await; self.ctx.reset_ptr(); res } + + /// Subscribe to WakuRelay to receive messages matching a content filter. + pub async fn relay_subscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> { + relay::waku_relay_subscribe(&self.ctx, pubsub_topic).await + } } impl WakuNodeHandle { /// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation. /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start) - pub fn start(self) -> Result> { - management::waku_start(&self.ctx).map(|_| WakuNodeHandle { - ctx: self.ctx, - _state: PhantomData, - }) + pub async fn start(self) -> Result> { + management::waku_start(&self.ctx) + .await + .map(|_| WakuNodeHandle { + ctx: self.ctx, + _state: PhantomData, + }) } pub fn set_event_callback( @@ -85,17 +92,19 @@ impl WakuNodeHandle { impl WakuNodeHandle { /// Stops a Waku node /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop) - pub fn stop(self) -> Result> { - management::waku_stop(&self.ctx).map(|_| WakuNodeHandle { - ctx: self.ctx, - _state: PhantomData, - }) + pub async fn stop(self) -> Result> { + management::waku_stop(&self.ctx) + .await + .map(|_| WakuNodeHandle { + ctx: self.ctx, + _state: PhantomData, + }) } /// Get the multiaddresses the Waku node is listening to /// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses) - pub fn listen_addresses(&self) -> Result> { - management::waku_listen_addresses(&self.ctx) + pub async fn listen_addresses(&self) -> Result> { + management::waku_listen_addresses(&self.ctx).await } /// Dial peer using a multiaddress @@ -103,11 +112,11 @@ impl WakuNodeHandle { /// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned. /// Use 0 for no timeout /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms) - pub fn connect(&self, address: &Multiaddr, timeout: Option) -> Result<()> { - peers::waku_connect(&self.ctx, address, timeout) + pub async fn connect(&self, address: &Multiaddr, timeout: Option) -> Result<()> { + peers::waku_connect(&self.ctx, address, timeout).await } - pub fn relay_publish_txt( + pub async fn relay_publish_txt( &self, pubsub_topic: &PubsubTopic, msg_txt: &String, @@ -129,60 +138,55 @@ impl WakuNodeHandle { false, ); - relay::waku_relay_publish_message(&self.ctx, &message, pubsub_topic, timeout) + relay::waku_relay_publish_message(&self.ctx, &message, pubsub_topic, timeout).await } /// Publish a message using Waku Relay. /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms) /// The pubsub_topic parameter is optional and if not specified it will be derived from the contentTopic. - pub fn relay_publish_message( + pub async fn relay_publish_message( &self, message: &WakuMessage, pubsub_topic: &PubsubTopic, timeout: Option, ) -> Result { - relay::waku_relay_publish_message(&self.ctx, message, pubsub_topic, timeout) - } - - /// Subscribe to WakuRelay to receive messages matching a content filter. - pub fn relay_subscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> { - relay::waku_relay_subscribe(&self.ctx, pubsub_topic) + relay::waku_relay_publish_message(&self.ctx, message, pubsub_topic, timeout).await } /// Closes the pubsub subscription to stop receiving messages matching a content filter. No more messages will be received from this pubsub topic - pub fn relay_unsubscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> { - relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic) + pub async fn relay_unsubscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> { + relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic).await } - pub fn filter_subscribe( + pub async fn filter_subscribe( &self, pubsub_topic: &PubsubTopic, content_topics: Vec, ) -> Result<()> { - filter::waku_filter_subscribe(&self.ctx, pubsub_topic, content_topics) + filter::waku_filter_subscribe(&self.ctx, pubsub_topic, content_topics).await } - pub fn filter_unsubscribe( + pub async fn filter_unsubscribe( &self, pubsub_topic: &PubsubTopic, content_topics: Vec, ) -> Result<()> { - filter::waku_filter_unsubscribe(&self.ctx, pubsub_topic, content_topics) + filter::waku_filter_unsubscribe(&self.ctx, pubsub_topic, content_topics).await } - pub fn filter_unsubscribe_all(&self) -> Result<()> { - filter::waku_filter_unsubscribe_all(&self.ctx) + pub async fn filter_unsubscribe_all(&self) -> Result<()> { + filter::waku_filter_unsubscribe_all(&self.ctx).await } - pub fn lightpush_publish_message( + pub async fn lightpush_publish_message( &self, message: &WakuMessage, pubsub_topic: &PubsubTopic, ) -> Result { - lightpush::waku_lightpush_publish_message(&self.ctx, message, pubsub_topic) + lightpush::waku_lightpush_publish_message(&self.ctx, message, pubsub_topic).await } - pub fn store_query( + pub async fn store_query( &self, pubsub_topic: Option, content_topics: Vec, @@ -212,7 +216,8 @@ impl WakuNodeHandle { Some(25), // pagination_limit, peer_addr, None, // timeout_millis - )?; + ) + .await?; messages.extend(response.messages); diff --git a/waku-bindings/src/node/peers.rs b/waku-bindings/src/node/peers.rs index a5bf0ef..ac7616c 100644 --- a/waku-bindings/src/node/peers.rs +++ b/waku-bindings/src/node/peers.rs @@ -6,6 +6,8 @@ use std::time::Duration; // crates use libc::*; use multiaddr::Multiaddr; +use std::sync::Arc; +use tokio::sync::Notify; // internal use crate::general::Result; use crate::node::context::WakuNodeContext; @@ -17,17 +19,23 @@ use crate::utils::{get_trampoline, handle_no_response}; /// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned. /// Use 0 for no timeout /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms) -pub fn waku_connect( +pub async fn waku_connect( ctx: &WakuNodeContext, address: &Multiaddr, timeout: Option, ) -> Result<()> { - let address_ptr = CString::new(address.to_string()) - .expect("CString should build properly from multiaddress") - .into_raw(); + let address = + CString::new(address.to_string()).expect("CString should build properly from multiaddress"); - let mut result: LibwakuResponse = Default::default(); - let result_cb = |r: LibwakuResponse| result = r; + let address_ptr = address.as_ptr(); + + let mut result = LibwakuResponse::default(); + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + let result_cb = |r: LibwakuResponse| { + result = r; + notify_clone.notify_one(); // Notify that the value has been updated + }; let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); @@ -41,10 +49,9 @@ pub fn waku_connect( &mut closure as *mut _ as *mut c_void, ); - drop(CString::from_raw(address_ptr)); - out }; + notify.notified().await; // Wait until a result is received handle_no_response(code, result) } diff --git a/waku-bindings/src/node/relay.rs b/waku-bindings/src/node/relay.rs index 5aa1ff1..efa0360 100644 --- a/waku-bindings/src/node/relay.rs +++ b/waku-bindings/src/node/relay.rs @@ -2,7 +2,9 @@ // std use std::ffi::CString; +use std::sync::Arc; use std::time::Duration; +use tokio::sync::Notify; // crates use libc::*; // internal @@ -15,25 +17,32 @@ use crate::utils::{get_trampoline, handle_no_response, handle_response, LibwakuR /// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding) #[allow(clippy::not_unsafe_ptr_arg_deref)] -pub fn waku_create_content_topic( +pub async fn waku_create_content_topic( ctx: &WakuNodeContext, application_name: &str, application_version: u32, content_topic_name: &str, encoding: Encoding, ) -> WakuContentTopic { - let application_name_ptr = CString::new(application_name) - .expect("Application name should always transform to CString") - .into_raw(); - let content_topic_name_ptr = CString::new(content_topic_name) - .expect("Content topic should always transform to CString") - .into_raw(); - let encoding_ptr = CString::new(encoding.to_string()) - .expect("Encoding should always transform to CString") - .into_raw(); + let application_name = CString::new(application_name) + .expect("Application name should always transform to CString"); + let application_name_ptr = application_name.as_ptr(); - let mut result: LibwakuResponse = Default::default(); - let result_cb = |r: LibwakuResponse| result = r; + let content_topic_name = + CString::new(content_topic_name).expect("Content topic should always transform to CString"); + let content_topic_name_ptr = content_topic_name.as_ptr(); + + let encoding = + CString::new(encoding.to_string()).expect("Encoding should always transform to CString"); + let encoding_ptr = encoding.as_ptr(); + + let mut result = LibwakuResponse::default(); + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + let result_cb = |r: LibwakuResponse| { + result = r; + notify_clone.notify_one(); // Notify that the value has been updated + }; let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); @@ -47,43 +56,44 @@ pub fn waku_create_content_topic( &mut closure as *mut _ as *mut c_void, ); - drop(CString::from_raw(application_name_ptr)); - drop(CString::from_raw(content_topic_name_ptr)); - drop(CString::from_raw(encoding_ptr)); - out }; + notify.notified().await; // Wait until a result is received handle_response(code, result).expect("&str from result should always be extracted") } /// Publish a message using Waku Relay /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms) -pub fn waku_relay_publish_message( +pub async fn waku_relay_publish_message( ctx: &WakuNodeContext, message: &WakuMessage, pubsub_topic: &PubsubTopic, timeout: Option, ) -> Result { - let message_ptr = CString::new( + let message = CString::new( serde_json::to_string(&message) .expect("WakuMessages should always be able to success serializing"), ) - .expect("CString should build properly from the serialized waku message") - .into_raw(); - let pubsub_topic_ptr = CString::new(String::from(pubsub_topic)) - .expect("CString should build properly from pubsub topic") - .into_raw(); + .expect("CString should build properly from the serialized waku message"); - let mut result: LibwakuResponse = Default::default(); - let result_cb = |r: LibwakuResponse| result = r; + let pubsub_topic = CString::new(String::from(pubsub_topic)) + .expect("CString should build properly from pubsub topic"); + + let mut result = LibwakuResponse::default(); + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + let result_cb = |r: LibwakuResponse| { + result = r; + notify_clone.notify_one(); // Notify that the value has been updated + }; let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); let out = waku_sys::waku_relay_publish( ctx.get_ptr(), - pubsub_topic_ptr, - message_ptr, + pubsub_topic.as_ptr(), + message.as_ptr(), timeout .map(|duration| { duration @@ -96,22 +106,25 @@ pub fn waku_relay_publish_message( &mut closure as *mut _ as *mut c_void, ); - drop(CString::from_raw(message_ptr)); - drop(CString::from_raw(pubsub_topic_ptr)); - out }; + notify.notified().await; // Wait until a result is received handle_response(code, result) } -pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) -> Result<()> { - let pubsub_topic_ptr = CString::new(String::from(pubsub_topic)) - .expect("CString should build properly from pubsub topic") - .into_raw(); +pub async fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) -> Result<()> { + let pubsub_topic = CString::new(String::from(pubsub_topic)) + .expect("CString should build properly from pubsub topic"); + let pubsub_topic_ptr = pubsub_topic.as_ptr(); - let mut result: LibwakuResponse = Default::default(); - let result_cb = |r: LibwakuResponse| result = r; + let mut result = LibwakuResponse::default(); + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + let result_cb = |r: LibwakuResponse| { + result = r; + notify_clone.notify_one(); // Notify that the value has been updated + }; let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); @@ -122,21 +135,28 @@ pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) - &mut closure as *mut _ as *mut c_void, ); - drop(CString::from_raw(pubsub_topic_ptr)); - out }; + notify.notified().await; // Wait until a result is received handle_no_response(code, result) } -pub fn waku_relay_unsubscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) -> Result<()> { - let pubsub_topic_ptr = CString::new(String::from(pubsub_topic)) - .expect("CString should build properly from pubsub topic") - .into_raw(); +pub async fn waku_relay_unsubscribe( + ctx: &WakuNodeContext, + pubsub_topic: &PubsubTopic, +) -> Result<()> { + let pubsub_topic = CString::new(String::from(pubsub_topic)) + .expect("CString should build properly from pubsub topic"); + let pubsub_topic_ptr = pubsub_topic.as_ptr(); - let mut result: LibwakuResponse = Default::default(); - let result_cb = |r: LibwakuResponse| result = r; + let mut result = LibwakuResponse::default(); + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + let result_cb = |r: LibwakuResponse| { + result = r; + notify_clone.notify_one(); // Notify that the value has been updated + }; let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); @@ -147,10 +167,9 @@ pub fn waku_relay_unsubscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) &mut closure as *mut _ as *mut c_void, ); - drop(CString::from_raw(pubsub_topic_ptr)); - out }; + notify.notified().await; // Wait until a result is received handle_no_response(code, result) } diff --git a/waku-bindings/src/node/store.rs b/waku-bindings/src/node/store.rs index d436cb0..d8efb34 100644 --- a/waku-bindings/src/node/store.rs +++ b/waku-bindings/src/node/store.rs @@ -4,13 +4,15 @@ use std::ffi::CString; // crates use libc::*; +use std::sync::Arc; +use tokio::sync::Notify; // internal use crate::general::{ contenttopic::WakuContentTopic, messagehash::MessageHash, pubsubtopic::PubsubTopic, Result, WakuStoreRespMessage, }; use crate::node::context::WakuNodeContext; -use crate::utils::{get_trampoline, handle_json_response, LibwakuResponse, WakuDecode}; +use crate::utils::{get_trampoline, handle_response, LibwakuResponse, WakuDecode}; use multiaddr::Multiaddr; use serde::{Deserialize, Serialize}; @@ -76,7 +78,7 @@ impl WakuDecode for StoreResponse { } } -pub fn waku_store_query( +pub async fn waku_store_query( ctx: &WakuNodeContext, request_id: String, include_data: bool, @@ -107,37 +109,39 @@ pub fn waku_store_query( let json_query = CString::new( serde_json::to_string(&query).expect("StoreQuery should always be able to be serialized"), ) - .expect("CString should build properly from the serialized filter subscription") - .into_raw(); + .expect("CString should build properly from the serialized filter subscription"); + let json_query_ptr = json_query.as_ptr(); peer_addr .parse::() .expect("correct multiaddress in store query"); - let peer_addr = CString::new(peer_addr) - .expect("peer_addr CString should be created") - .into_raw(); + let peer_addr = CString::new(peer_addr).expect("peer_addr CString should be created"); + let peer_addr_ptr = peer_addr.as_ptr(); let timeout_millis = timeout_millis.unwrap_or(10000i32); - let mut result: LibwakuResponse = Default::default(); - let result_cb = |r: LibwakuResponse| result = r; + let mut result = LibwakuResponse::default(); + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + let result_cb = |r: LibwakuResponse| { + result = r; + notify_clone.notify_one(); // Notify that the value has been updated + }; let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); let out = waku_sys::waku_store_query( ctx.get_ptr(), - json_query, - peer_addr, + json_query_ptr, + peer_addr_ptr, timeout_millis, cb, &mut closure as *mut _ as *mut c_void, ); - drop(CString::from_raw(json_query)); - drop(CString::from_raw(peer_addr)); - out }; - handle_json_response(code, result) + notify.notified().await; // Wait until a result is received + handle_response(code, result) } diff --git a/waku-bindings/src/utils.rs b/waku-bindings/src/utils.rs index d4ec81a..3a6e606 100644 --- a/waku-bindings/src/utils.rs +++ b/waku-bindings/src/utils.rs @@ -1,11 +1,10 @@ use crate::general::Result; -use core::str::FromStr; use std::convert::TryFrom; use std::{slice, str}; use waku_sys::WakuCallBack; use waku_sys::{RET_ERR, RET_MISSING_CALLBACK, RET_OK}; -#[derive(Debug, Default, PartialEq)] +#[derive(Debug, Clone, Default, PartialEq)] pub enum LibwakuResponse { Success(Option), Failure(String), @@ -36,6 +35,12 @@ pub trait WakuDecode: Sized { fn decode(input: &str) -> Result; } +impl WakuDecode for String { + fn decode(input: &str) -> Result { + Ok(input.to_string()) + } +} + pub fn decode(input: String) -> Result { T::decode(input.as_str()) } @@ -87,7 +92,7 @@ pub fn handle_no_response(code: i32, result: LibwakuResponse) -> Result<()> { } } -pub fn handle_json_response(code: i32, result: LibwakuResponse) -> Result { +pub fn handle_response(code: i32, result: LibwakuResponse) -> Result { match result { LibwakuResponse::Success(v) => decode(v.unwrap_or_default()), LibwakuResponse::Failure(v) => Err(v), @@ -98,21 +103,3 @@ pub fn handle_json_response(code: i32, result: LibwakuResponse) - ), } } - -pub fn handle_response(code: i32, result: LibwakuResponse) -> Result -where - ::Err: std::fmt::Debug, -{ - match result { - LibwakuResponse::Success(v) => v - .unwrap_or_default() - .parse() - .map_err(|e| format!("could not parse value: {:?}", e)), - LibwakuResponse::Failure(v) => Err(v), - LibwakuResponse::MissingCallback => panic!("callback is required"), - LibwakuResponse::Undefined => panic!( - "undefined ffi state: code({}) was returned but callback was not executed", - code - ), - } -} diff --git a/waku-bindings/tests/node.rs b/waku-bindings/tests/node.rs index 4c9fd81..9bb93cc 100644 --- a/waku-bindings/tests/node.rs +++ b/waku-bindings/tests/node.rs @@ -18,15 +18,13 @@ const ECHO_TIMEOUT: u64 = 1000; const ECHO_MESSAGE: &str = "Hi from 🦀!"; const TEST_PUBSUBTOPIC: &str = "test"; -fn try_publish_relay_messages( +async fn try_publish_relay_messages( node: &WakuNodeHandle, msg: &WakuMessage, ) -> Result, String> { - Ok(HashSet::from([node.relay_publish_message( - msg, - &PubsubTopic::new(TEST_PUBSUBTOPIC), - None, - )?])) + Ok(HashSet::from([node + .relay_publish_message(msg, &PubsubTopic::new(TEST_PUBSUBTOPIC), None) + .await?])) } async fn test_echo_messages( @@ -66,21 +64,23 @@ async fn test_echo_messages( .set_event_callback(closure) .expect("set event call back working"); // Set the event callback with the closure - let node1 = node1.start()?; - let node2 = node2.start()?; + let node1 = node1.start().await?; + let node2 = node2.start().await?; node1 .relay_subscribe(&PubsubTopic::new(TEST_PUBSUBTOPIC)) + .await .unwrap(); node2 .relay_subscribe(&PubsubTopic::new(TEST_PUBSUBTOPIC)) + .await .unwrap(); sleep(Duration::from_secs(3)).await; // Interconnect nodes // Replace all matches with 127.0.0.1 to avoid issue with NAT or firewall. - let addresses1 = node1.listen_addresses().unwrap(); + let addresses1 = node1.listen_addresses().await.unwrap(); let addresses1 = &addresses1[0].to_string(); let re = Regex::new(r"\b(?:\d{1,3}\.){3}\d{1,3}\b").unwrap(); @@ -89,7 +89,7 @@ async fn test_echo_messages( let addresses1 = addresses1.parse::().expect("parse multiaddress"); println!("Connecting node1 to node2: {}", addresses1); - node2.connect(&addresses1, None).unwrap(); + node2.connect(&addresses1, None).await.unwrap(); // Wait for mesh to form sleep(Duration::from_secs(3)).await; @@ -108,7 +108,9 @@ async fn test_echo_messages( Vec::new(), false, ); - let _ids = try_publish_relay_messages(&node1, &message).expect("send relay messages"); + let _ids = try_publish_relay_messages(&node1, &message) + .await + .expect("send relay messages"); // Wait for the msg to arrive let rx_waku_message_cloned = rx_waku_message.clone(); @@ -118,8 +120,8 @@ async fn test_echo_messages( let payload = msg.payload.to_vec(); let payload_str = from_utf8(&payload).expect("should be valid message"); if payload_str == ECHO_MESSAGE { - node1.stop()?; - node2.stop()?; + node1.stop().await?; + node2.stop().await?; return Ok(()); } } else { @@ -127,11 +129,11 @@ async fn test_echo_messages( } } - let node1 = node1.stop()?; - let node2 = node2.stop()?; + let node1 = node1.stop().await?; + let node2 = node2.stop().await?; - node1.waku_destroy()?; - node2.waku_destroy()?; + node1.waku_destroy().await?; + node2.waku_destroy().await?; return Err("Unexpected test ending".to_string()); } @@ -143,11 +145,13 @@ async fn default_echo() -> Result<(), String> { let node1 = waku_new(Some(WakuNodeConfig { tcp_port: Some(60010), ..Default::default() - }))?; + })) + .await?; let node2 = waku_new(Some(WakuNodeConfig { tcp_port: Some(60020), ..Default::default() - }))?; + })) + .await?; let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto); @@ -165,9 +169,8 @@ async fn default_echo() -> Result<(), String> { Ok(()) } -#[test] -#[serial] -fn node_restart() { +#[tokio::test] +async fn node_restart() { let config = WakuNodeConfig { node_key: Some( SecretKey::from_str("05f381866cc21f6c1e2e80e07fa732008e36d942dce3206ad6dcd6793c98d609") @@ -177,9 +180,14 @@ fn node_restart() { }; for _ in 0..3 { - let node = waku_new(config.clone().into()).expect("default config should be valid"); - let node = node.start().expect("node should start with valid config"); - let node = node.stop().expect("node should stop"); - node.waku_destroy().expect("free resources"); + let node = waku_new(config.clone().into()) + .await + .expect("default config should be valid"); + let node = node + .start() + .await + .expect("node should start with valid config"); + let node = node.stop().await.expect("node should stop"); + node.waku_destroy().await.expect("free resources"); } } diff --git a/waku-sys/vendor b/waku-sys/vendor index 1fa9165..1d206a5 160000 --- a/waku-sys/vendor +++ b/waku-sys/vendor @@ -1 +1 @@ -Subproject commit 1fa916589d3a69a2bb770aba27d4124b929cc4b2 +Subproject commit 1d206a5f5e89f7bac855bfd2e25066bcf1187ade