From 2fd169bda6d260214ed8d6ee91bdfecd22bedff9 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Wed, 27 Nov 2024 17:54:04 +0100 Subject: [PATCH] general improvements --- Cargo.lock | 1 + examples/Cargo.lock | 30 +---- examples/basic/src/main.rs | 101 ++++++++-------- examples/tic-tac-toe-gui/src/main.rs | 7 +- waku-bindings/Cargo.toml | 1 + waku-bindings/src/general/contenttopic.rs | 134 ++++++++++++++++++++++ waku-bindings/src/general/mod.rs | 128 +-------------------- waku-bindings/src/lib.rs | 9 +- waku-bindings/src/node/config.rs | 3 +- waku-bindings/src/node/context.rs | 5 + waku-bindings/src/node/filter.rs | 9 +- waku-bindings/src/node/management.rs | 1 + waku-bindings/src/node/mod.rs | 25 ++-- waku-bindings/src/node/relay.rs | 3 +- waku-bindings/tests/node.rs | 64 +++++++---- 15 files changed, 272 insertions(+), 249 deletions(-) create mode 100644 waku-bindings/src/general/contenttopic.rs diff --git a/Cargo.lock b/Cargo.lock index 066b0ba..06bdc08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3060,6 +3060,7 @@ dependencies = [ "multiaddr", "once_cell", "rand", + "regex", "rln", "secp256k1 0.26.0", "serde", diff --git a/examples/Cargo.lock b/examples/Cargo.lock index f3426b5..5627c99 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -4277,15 +4277,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "termcolor" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adc4587ead41bf016f11af03e55a624c06568b5a19db4e90fde573d805074f83" -dependencies = [ - "wincolor", -] - [[package]] name = "textwrap" version = "0.11.0" @@ -4325,17 +4316,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "tic-tac-toe" -version = "0.1.0" -dependencies = [ - "ark-std", - "ctrlc", - "serde_json", - "termcolor", - "waku-bindings", -] - [[package]] name = "tic-tac-toe-gui" version = "0.1.0" @@ -4681,6 +4661,7 @@ dependencies = [ "multiaddr", "once_cell", "rand", + "regex", "rln", "secp256k1 0.26.0", "serde", @@ -5206,15 +5187,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" -[[package]] -name = "wincolor" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eeb06499a3a4d44302791052df005d5232b927ed1a9658146d842165c4de7767" -dependencies = [ - "winapi", -] - [[package]] name = "windows" version = "0.48.0" diff --git a/examples/basic/src/main.rs b/examples/basic/src/main.rs index 177bb29..ac11429 100644 --- a/examples/basic/src/main.rs +++ b/examples/basic/src/main.rs @@ -3,70 +3,73 @@ use std::str::from_utf8; use std::time::SystemTime; use tokio::time::{sleep, Duration}; use waku::{ - waku_destroy, waku_new, Encoding, Event, LibwakuResponse, WakuContentTopic, WakuMessage, - WakuNodeConfig, + waku_new, Encoding, Event, LibwakuResponse, WakuContentTopic, WakuMessage, WakuNodeConfig, }; #[tokio::main] async fn main() -> Result<(), Error> { let node1 = waku_new(Some(WakuNodeConfig { - port: Some(60010), // TODO: use any available port. + tcp_port: Some(60010), // TODO: use any available port. ..Default::default() })) .expect("should instantiate"); let node2 = waku_new(Some(WakuNodeConfig { - port: Some(60020), // TODO: use any available port. + tcp_port: Some(60020), // TODO: use any available port. ..Default::default() })) .expect("should instantiate"); - node1.start().expect("node1 should start"); - node2.start().expect("node2 should start"); - // ======================================================================== // Setting an event callback to be executed each time a message is received - node2.ctx.waku_set_event_callback(&|response| { - if let LibwakuResponse::Success(v) = response { - let event: Event = - serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed"); + node2 + .set_event_callback(&|response| { + if let LibwakuResponse::Success(v) = response { + let event: Event = + serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed"); - match event { - Event::WakuMessage(evt) => { - println!("WakuMessage event received: {:?}", evt.waku_message); - let message = evt.waku_message; - let payload = message.payload.to_vec(); - let msg = from_utf8(&payload).expect("should be valid message"); - println!("::::::::::::::::::::::::::::::::::::::::::::::::::::"); - println!("Message Received in NODE 2: {}", msg); - println!("::::::::::::::::::::::::::::::::::::::::::::::::::::"); - } - Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err), - _ => panic!("event case not expected"), - }; - } - }); + match event { + Event::WakuMessage(evt) => { + println!("WakuMessage event received: {:?}", evt.waku_message); + let message = evt.waku_message; + let payload = message.payload.to_vec(); + let msg = from_utf8(&payload).expect("should be valid message"); + println!("::::::::::::::::::::::::::::::::::::::::::::::::::::"); + println!("Message Received in NODE 2: {}", msg); + println!("::::::::::::::::::::::::::::::::::::::::::::::::::::"); + } + Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err), + _ => panic!("event case not expected"), + }; + } + }) + .expect("set event call back working"); - node1.ctx.waku_set_event_callback(&|response| { - if let LibwakuResponse::Success(v) = response { - let event: Event = - serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed"); + node1 + .set_event_callback(&|response| { + if let LibwakuResponse::Success(v) = response { + let event: Event = + serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed"); - match event { - Event::WakuMessage(evt) => { - println!("WakuMessage event received: {:?}", evt.waku_message); - let message = evt.waku_message; - let payload = message.payload.to_vec(); - let msg = from_utf8(&payload).expect("should be valid message"); - println!("::::::::::::::::::::::::::::::::::::::::::::::::::::"); - println!("Message Received in NODE 1: {}", msg); - println!("::::::::::::::::::::::::::::::::::::::::::::::::::::"); - } - Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err), - _ => panic!("event case not expected"), - }; - } - }); + match event { + Event::WakuMessage(evt) => { + println!("WakuMessage event received: {:?}", evt.waku_message); + let message = evt.waku_message; + let payload = message.payload.to_vec(); + let msg = from_utf8(&payload).expect("should be valid message"); + println!("::::::::::::::::::::::::::::::::::::::::::::::::::::"); + println!("Message Received in NODE 1: {}", msg); + println!("::::::::::::::::::::::::::::::::::::::::::::::::::::"); + } + Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err), + _ => panic!("event case not expected"), + }; + } + }) + .expect("set event call back working"); + + let node1 = node1.start().expect("node1 should start"); + let node2 = node2.start().expect("node2 should start"); // ======================================================================== // Subscribe to pubsub topic @@ -125,13 +128,13 @@ async fn main() -> Result<(), Error> { // ======================================================================== // Stop both instances - node1.stop().expect("should stop"); - node2.stop().expect("should stop"); + let node1 = node1.stop().expect("should stop"); + let node2 = node2.stop().expect("should stop"); // ======================================================================== // Free resources - waku_destroy(node1).expect("should deallocate"); - waku_destroy(node2).expect("should deallocate"); + node1.waku_destroy().expect("should deallocate"); + node2.waku_destroy().expect("should deallocate"); Ok(()) } diff --git a/examples/tic-tac-toe-gui/src/main.rs b/examples/tic-tac-toe-gui/src/main.rs index bafc45e..83c050e 100644 --- a/examples/tic-tac-toe-gui/src/main.rs +++ b/examples/tic-tac-toe-gui/src/main.rs @@ -90,8 +90,9 @@ impl TicTacToeApp { // Subscribe to desired topic using the relay protocol // self.waku.relay_subscribe(&self.game_topic.to_string()).expect("waku should subscribe"); - let content_topic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto); - waku.filter_subscribe(&self.game_topic.to_string(), &content_topic.to_string()).expect("waku should subscribe"); + let ctopic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto); + let content_topics = vec![ctopic]; + waku.filter_subscribe(&self.game_topic.to_string(), content_topics).expect("waku should subscribe"); // Connect to hard-coded node // let target_node_multi_addr = @@ -308,7 +309,7 @@ async fn main() -> eframe::Result<()> { let game_topic = "/waku/2/rs/16/32"; // Create a Waku instance let waku = waku_new(Some(WakuNodeConfig { - port: Some(60010), + tcp_port: Some(60010), cluster_id: Some(16), shards: vec![1, 32, 64, 128, 256], // node_key: Some(SecretKey::from_str("2fc0515879e52b7b73297cfd6ab3abf7c344ef84b7a90ff6f4cc19e05a198027").unwrap()), diff --git a/waku-bindings/Cargo.toml b/waku-bindings/Cargo.toml index 89f5234..58c5830 100644 --- a/waku-bindings/Cargo.toml +++ b/waku-bindings/Cargo.toml @@ -32,6 +32,7 @@ libc = "0.2" serde-aux = "4.3.1" rln = "0.3.4" tokio = { version = "1", features = ["full"] } +regex = "1" [dev-dependencies] futures = "0.3.25" diff --git a/waku-bindings/src/general/contenttopic.rs b/waku-bindings/src/general/contenttopic.rs new file mode 100644 index 0000000..381f57a --- /dev/null +++ b/waku-bindings/src/general/contenttopic.rs @@ -0,0 +1,134 @@ +// std +use std::borrow::Cow; +use std::fmt::{Display, Formatter}; +use std::str::FromStr; + +use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; + +use sscanf::{scanf, RegexRepresentation}; + +/// WakuMessage encoding scheme +#[derive(Clone, Debug, Eq, PartialEq, Default)] +pub enum Encoding { + #[default] + Proto, + Rlp, + Rfc26, + Unknown(String), +} + +impl Display for Encoding { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let s = match self { + Encoding::Proto => "proto", + Encoding::Rlp => "rlp", + Encoding::Rfc26 => "rfc26", + Encoding::Unknown(value) => value, + }; + f.write_str(s) + } +} + +impl FromStr for Encoding { + type Err = std::io::Error; + + fn from_str(s: &str) -> std::result::Result { + match s.to_lowercase().as_str() { + "proto" => Ok(Self::Proto), + "rlp" => Ok(Self::Rlp), + "rfc26" => Ok(Self::Rfc26), + encoding => Ok(Self::Unknown(encoding.to_string())), + } + } +} + +impl RegexRepresentation for Encoding { + const REGEX: &'static str = r"\w"; +} + +/// A waku content topic `/{application_name}/{version}/{content_topic_name}/{encdoing}` +#[derive(Clone, Debug, Eq, PartialEq, Default)] +pub struct WakuContentTopic { + pub application_name: Cow<'static, str>, + pub version: Cow<'static, str>, + pub content_topic_name: Cow<'static, str>, + pub encoding: Encoding, +} + +impl WakuContentTopic { + pub const fn new( + application_name: &'static str, + version: &'static str, + content_topic_name: &'static str, + encoding: Encoding, + ) -> Self { + Self { + application_name: Cow::Borrowed(application_name), + version: Cow::Borrowed(version), + content_topic_name: Cow::Borrowed(content_topic_name), + encoding, + } + } + + pub fn join_content_topics(topics: Vec) -> String { + topics + .iter() + .map(|topic| topic.to_string()) + .collect::>() + .join(",") + } +} + +impl FromStr for WakuContentTopic { + type Err = String; + + fn from_str(s: &str) -> std::result::Result { + if let Ok((application_name, version, content_topic_name, encoding)) = + scanf!(s, "/{}/{}/{}/{:/.+?/}", String, String, String, Encoding) + { + Ok(WakuContentTopic { + application_name: Cow::Owned(application_name), + version: Cow::Owned(version), + content_topic_name: Cow::Owned(content_topic_name), + encoding, + }) + } else { + Err( + format!( + "Wrong pub-sub topic format. Should be `/{{application-name}}/{{version-of-the-application}}/{{content-topic-name}}/{{encoding}}`. Got: {s}" + ) + ) + } + } +} + +impl Display for WakuContentTopic { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "/{}/{}/{}/{}", + self.application_name, self.version, self.content_topic_name, self.encoding + ) + } +} + +impl Serialize for WakuContentTopic { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: Serializer, + { + self.to_string().serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for WakuContentTopic { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + let as_string: String = String::deserialize(deserializer)?; + as_string + .parse::() + .map_err(D::Error::custom) + } +} diff --git a/waku-bindings/src/general/mod.rs b/waku-bindings/src/general/mod.rs index dc0efb3..8731694 100644 --- a/waku-bindings/src/general/mod.rs +++ b/waku-bindings/src/general/mod.rs @@ -1,13 +1,11 @@ //! Waku [general](https://rfc.vac.dev/spec/36/#general) types -// std -use std::borrow::Cow; -use std::fmt::{Display, Formatter}; -use std::str::FromStr; +pub mod contenttopic; + // crates -use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; +use contenttopic::WakuContentTopic; +use serde::{Deserialize, Serialize}; use serde_aux::prelude::*; -use sscanf::{scanf, RegexRepresentation}; /// Waku message version pub type WakuMessageVersion = usize; @@ -65,124 +63,6 @@ impl WakuMessage { } } -/// WakuMessage encoding scheme -#[derive(Clone, Debug, Eq, PartialEq, Default)] -pub enum Encoding { - #[default] - Proto, - Rlp, - Rfc26, - Unknown(String), -} - -impl Display for Encoding { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let s = match self { - Encoding::Proto => "proto", - Encoding::Rlp => "rlp", - Encoding::Rfc26 => "rfc26", - Encoding::Unknown(value) => value, - }; - f.write_str(s) - } -} - -impl FromStr for Encoding { - type Err = std::io::Error; - - fn from_str(s: &str) -> std::result::Result { - match s.to_lowercase().as_str() { - "proto" => Ok(Self::Proto), - "rlp" => Ok(Self::Rlp), - "rfc26" => Ok(Self::Rfc26), - encoding => Ok(Self::Unknown(encoding.to_string())), - } - } -} - -impl RegexRepresentation for Encoding { - const REGEX: &'static str = r"\w"; -} - -/// A waku content topic `/{application_name}/{version}/{content_topic_name}/{encdoing}` -#[derive(Clone, Debug, Eq, PartialEq, Default)] -pub struct WakuContentTopic { - pub application_name: Cow<'static, str>, - pub version: Cow<'static, str>, - pub content_topic_name: Cow<'static, str>, - pub encoding: Encoding, -} - -impl WakuContentTopic { - pub const fn new( - application_name: &'static str, - version: &'static str, - content_topic_name: &'static str, - encoding: Encoding, - ) -> Self { - Self { - application_name: Cow::Borrowed(application_name), - version: Cow::Borrowed(version), - content_topic_name: Cow::Borrowed(content_topic_name), - encoding, - } - } -} - -impl FromStr for WakuContentTopic { - type Err = String; - - fn from_str(s: &str) -> std::result::Result { - if let Ok((application_name, version, content_topic_name, encoding)) = - scanf!(s, "/{}/{}/{}/{:/.+?/}", String, String, String, Encoding) - { - Ok(WakuContentTopic { - application_name: Cow::Owned(application_name), - version: Cow::Owned(version), - content_topic_name: Cow::Owned(content_topic_name), - encoding, - }) - } else { - Err( - format!( - "Wrong pub-sub topic format. Should be `/{{application-name}}/{{version-of-the-application}}/{{content-topic-name}}/{{encoding}}`. Got: {s}" - ) - ) - } - } -} - -impl Display for WakuContentTopic { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "/{}/{}/{}/{}", - self.application_name, self.version, self.content_topic_name, self.encoding - ) - } -} - -impl Serialize for WakuContentTopic { - fn serialize(&self, serializer: S) -> std::result::Result - where - S: Serializer, - { - self.to_string().serialize(serializer) - } -} - -impl<'de> Deserialize<'de> for WakuContentTopic { - fn deserialize(deserializer: D) -> std::result::Result - where - D: Deserializer<'de>, - { - let as_string: String = String::deserialize(deserializer)?; - as_string - .parse::() - .map_err(D::Error::custom) - } -} - mod base64_serde { use base64::Engine; use serde::de::Error; diff --git a/waku-bindings/src/lib.rs b/waku-bindings/src/lib.rs index ee98013..1e7569f 100644 --- a/waku-bindings/src/lib.rs +++ b/waku-bindings/src/lib.rs @@ -15,10 +15,9 @@ pub use utils::LibwakuResponse; use rln; pub use node::{ - waku_create_content_topic, waku_destroy, waku_new, Event, Initialized, Key, Multiaddr, - PublicKey, RLNConfig, Running, SecretKey, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle, + waku_create_content_topic, waku_new, Event, Initialized, Key, Multiaddr, PublicKey, RLNConfig, + Running, SecretKey, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle, }; -pub use general::{ - Encoding, MessageHash, Result, WakuContentTopic, WakuMessage, WakuMessageVersion, -}; +pub use general::contenttopic::{Encoding, WakuContentTopic}; +pub use general::{MessageHash, Result, WakuMessage, WakuMessageVersion}; diff --git a/waku-bindings/src/node/config.rs b/waku-bindings/src/node/config.rs index 8149088..8d26f3b 100644 --- a/waku-bindings/src/node/config.rs +++ b/waku-bindings/src/node/config.rs @@ -16,7 +16,7 @@ pub struct WakuNodeConfig { pub host: Option, /// Libp2p TCP listening port. Default `60000`. Use `0` for **random** #[default(Some(60000))] - pub port: Option, + pub tcp_port: Option, /// Secp256k1 private key in Hex format (`0x123...abc`). Default random #[serde(with = "secret_key_serde", rename = "key")] pub node_key: Option, @@ -28,6 +28,7 @@ pub struct WakuNodeConfig { #[default(Some(true))] pub relay: Option, pub relay_topics: Vec, + #[default(vec![1])] pub shards: Vec, #[serde(skip_serializing_if = "Option::is_none")] pub max_message_size: Option, diff --git a/waku-bindings/src/node/context.rs b/waku-bindings/src/node/context.rs index 1d41273..a9cbd1a 100644 --- a/waku-bindings/src/node/context.rs +++ b/waku-bindings/src/node/context.rs @@ -1,4 +1,5 @@ use std::ffi::c_void; +use std::ptr::null_mut; use std::sync::{Arc, Mutex}; use crate::utils::{get_trampoline, LibwakuResponse}; @@ -31,6 +32,10 @@ impl WakuNodeContext { self.obj_ptr } + pub fn reset_ptr(mut self) { + self.obj_ptr = null_mut(); + } + /// Register callback to act as event handler and receive application events, /// which are used to react to asynchronous events in Waku pub fn waku_set_event_callback( diff --git a/waku-bindings/src/node/filter.rs b/waku-bindings/src/node/filter.rs index 30a72b1..3f28737 100644 --- a/waku-bindings/src/node/filter.rs +++ b/waku-bindings/src/node/filter.rs @@ -5,6 +5,7 @@ use std::ffi::CString; // crates use libc::*; // internal +use crate::general::contenttopic::WakuContentTopic; use crate::general::Result; use crate::node::context::WakuNodeContext; use crate::utils::{get_trampoline, handle_no_response, LibwakuResponse}; @@ -12,10 +13,10 @@ use crate::utils::{get_trampoline, handle_no_response, LibwakuResponse}; pub fn waku_filter_subscribe( ctx: &WakuNodeContext, pubsub_topic: &str, - content_topics: &str, // comma-separated list of content topics + content_topics: Vec, ) -> Result<()> { let pubsub_topic = pubsub_topic.to_string(); - let content_topics = content_topics.to_string(); + let content_topics = WakuContentTopic::join_content_topics(content_topics); let pubsub_topic_ptr = CString::new(pubsub_topic) .expect("CString should build properly from pubsub topic") @@ -49,10 +50,10 @@ pub fn waku_filter_subscribe( pub fn waku_filter_unsubscribe( ctx: &WakuNodeContext, pubsub_topic: &str, - content_topics_topics: &str, // comma-separated list of content topics + content_topics: Vec, // comma-separated list of content topics ) -> Result<()> { let pubsub_topic = pubsub_topic.to_string(); - let content_topics_topics = content_topics_topics.to_string(); + let content_topics_topics = WakuContentTopic::join_content_topics(content_topics); let pubsub_topic_ptr = CString::new(pubsub_topic) .expect("CString should build properly from pubsub topic") diff --git a/waku-bindings/src/node/management.rs b/waku-bindings/src/node/management.rs index bcec19e..eb30998 100644 --- a/waku-bindings/src/node/management.rs +++ b/waku-bindings/src/node/management.rs @@ -152,6 +152,7 @@ mod test { fn nwaku_version() { let node = waku_new(None).unwrap(); let version = waku_version(&node).expect("should return the version"); + print!("Current version: {}", version); assert!(!version.is_empty()); } } diff --git a/waku-bindings/src/node/mod.rs b/waku-bindings/src/node/mod.rs index 5ffcd1e..8326976 100644 --- a/waku-bindings/src/node/mod.rs +++ b/waku-bindings/src/node/mod.rs @@ -16,6 +16,7 @@ pub use secp256k1::{PublicKey, SecretKey}; use std::marker::PhantomData; use std::time::Duration; // internal +use crate::general::contenttopic::{Encoding, WakuContentTopic}; use crate::general::{MessageHash, Result, WakuMessage}; use crate::utils::LibwakuResponse; @@ -25,8 +26,6 @@ pub use config::WakuNodeConfig; pub use events::{Event, WakuMessageEvent}; pub use relay::waku_create_content_topic; -use crate::Encoding; -use crate::WakuContentTopic; use std::time::SystemTime; // Define state marker types @@ -48,15 +47,17 @@ pub fn waku_new(config: Option) -> Result) -> Result<()> { - management::waku_destroy(&node.ctx) -} - impl WakuNodeHandle { /// Get the nwaku version pub fn version(&self) -> Result { management::waku_version(&self.ctx) } + + pub fn waku_destroy(self) -> Result<()> { + let res = management::waku_destroy(&self.ctx); + self.ctx.reset_ptr(); + res + } } impl WakuNodeHandle { @@ -149,11 +150,19 @@ impl WakuNodeHandle { relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic) } - pub fn filter_subscribe(&self, pubsub_topic: &String, content_topics: &String) -> Result<()> { + pub fn filter_subscribe( + &self, + pubsub_topic: &String, + content_topics: Vec, + ) -> Result<()> { filter::waku_filter_subscribe(&self.ctx, pubsub_topic, content_topics) } - pub fn filter_unsubscribe(&self, pubsub_topic: &String, content_topics: &String) -> Result<()> { + pub fn filter_unsubscribe( + &self, + pubsub_topic: &String, + content_topics: Vec, + ) -> Result<()> { filter::waku_filter_unsubscribe(&self.ctx, pubsub_topic, content_topics) } diff --git a/waku-bindings/src/node/relay.rs b/waku-bindings/src/node/relay.rs index 32ba2a6..2b781a4 100644 --- a/waku-bindings/src/node/relay.rs +++ b/waku-bindings/src/node/relay.rs @@ -6,7 +6,8 @@ use std::time::Duration; // crates use libc::*; // internal -use crate::general::{Encoding, MessageHash, Result, WakuContentTopic, WakuMessage}; +use crate::general::contenttopic::{Encoding, WakuContentTopic}; +use crate::general::{MessageHash, Result, WakuMessage}; use crate::node::context::WakuNodeContext; use crate::utils::{get_trampoline, handle_no_response, handle_response, LibwakuResponse}; diff --git a/waku-bindings/tests/node.rs b/waku-bindings/tests/node.rs index a41fa98..b847e93 100644 --- a/waku-bindings/tests/node.rs +++ b/waku-bindings/tests/node.rs @@ -1,3 +1,5 @@ +use multiaddr::Multiaddr; +use regex::Regex; use secp256k1::SecretKey; use serial_test::serial; use std::str::FromStr; @@ -6,17 +8,17 @@ use std::time::{Duration, SystemTime}; use std::{collections::HashSet, str::from_utf8}; use tokio::time; use tokio::time::sleep; -use waku_bindings::LibwakuResponse; use waku_bindings::{ - waku_destroy, waku_new, Encoding, Event, MessageHash, WakuContentTopic, WakuMessage, + waku_new, Encoding, Event, Initialized, MessageHash, WakuContentTopic, WakuMessage, WakuNodeConfig, WakuNodeHandle, }; +use waku_bindings::{LibwakuResponse, Running}; const ECHO_TIMEOUT: u64 = 1000; const ECHO_MESSAGE: &str = "Hi from 🦀!"; const TEST_PUBSUBTOPIC: &str = "test"; fn try_publish_relay_messages( - node: &WakuNodeHandle, + node: &WakuNodeHandle, msg: &WakuMessage, ) -> Result, String> { let topic = TEST_PUBSUBTOPIC.to_string(); @@ -26,13 +28,15 @@ fn try_publish_relay_messages( } async fn test_echo_messages( - node1: &WakuNodeHandle, - node2: &WakuNodeHandle, + node1: WakuNodeHandle, + node2: WakuNodeHandle, content: &'static str, content_topic: WakuContentTopic, ) -> Result<(), String> { // setting a naïve event handler to avoid appearing ERR messages in logs - let _ = node1.ctx.waku_set_event_callback(&|_| {}); + node1 + .set_event_callback(&|_| {}) + .expect("set event call back working"); let rx_waku_message: Arc> = Arc::new(Mutex::new(WakuMessage::default())); @@ -57,10 +61,12 @@ async fn test_echo_messages( println!("Before setting event callback"); node2 - .ctx - .waku_set_event_callback(closure) + .set_event_callback(closure) .expect("set event call back working"); // Set the event callback with the closure + let node1 = node1.start()?; + let node2 = node2.start()?; + let topic = TEST_PUBSUBTOPIC.to_string(); node1.relay_subscribe(&topic).unwrap(); node2.relay_subscribe(&topic).unwrap(); @@ -68,9 +74,17 @@ async fn test_echo_messages( sleep(Duration::from_secs(3)).await; // Interconnect nodes - println!("Connecting node1 to node2"); + // Replace all matches with 127.0.0.1 to avoid issue with NAT or firewall. let addresses1 = node1.listen_addresses().unwrap(); - node2.connect(&addresses1[0], None).unwrap(); + let addresses1 = &addresses1[0].to_string(); + + let re = Regex::new(r"\b(?:\d{1,3}\.){3}\d{1,3}\b").unwrap(); + let addresses1 = re.replace_all(addresses1, "127.0.0.1").to_string(); + + let addresses1 = addresses1.parse::().expect("parse multiaddress"); + + println!("Connecting node1 to node2: {}", addresses1); + node2.connect(&addresses1, None).unwrap(); // Wait for mesh to form sleep(Duration::from_secs(3)).await; @@ -89,7 +103,7 @@ async fn test_echo_messages( Vec::new(), false, ); - let _ids = try_publish_relay_messages(node1, &message).expect("send relay messages"); + let _ids = try_publish_relay_messages(&node1, &message).expect("send relay messages"); // Wait for the msg to arrive let rx_waku_message_cloned = rx_waku_message.clone(); @@ -98,8 +112,9 @@ async fn test_echo_messages( // dbg!("The waku message value is: {:?}", msg); let payload = msg.payload.to_vec(); let payload_str = from_utf8(&payload).expect("should be valid message"); - dbg!("payload: {:?}", payload_str); if payload_str == ECHO_MESSAGE { + node1.stop()?; + node2.stop()?; return Ok(()); } } else { @@ -107,6 +122,12 @@ async fn test_echo_messages( } } + let node1 = node1.stop()?; + let node2 = node2.stop()?; + + node1.waku_destroy()?; + node2.waku_destroy()?; + return Err("Unexpected test ending".to_string()); } @@ -115,17 +136,14 @@ async fn test_echo_messages( async fn default_echo() -> Result<(), String> { println!("Test default_echo"); let node1 = waku_new(Some(WakuNodeConfig { - port: Some(60010), + tcp_port: Some(60010), ..Default::default() }))?; let node2 = waku_new(Some(WakuNodeConfig { - port: Some(60020), + tcp_port: Some(60020), ..Default::default() }))?; - node1.start()?; - node2.start()?; - let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto); let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT)); @@ -134,16 +152,11 @@ async fn default_echo() -> Result<(), String> { // Send and receive messages. Waits until all messages received. let got_all = tokio::select! { _ = sleep => false, - _ = test_echo_messages(&node1, &node2, ECHO_MESSAGE, content_topic) => true, + _ = test_echo_messages(node1, node2, ECHO_MESSAGE, content_topic) => true, }; assert!(got_all); - node1.stop()?; - node2.stop()?; - waku_destroy(node1)?; - waku_destroy(node2)?; - Ok(()) } @@ -160,7 +173,8 @@ fn node_restart() { for _ in 0..3 { let node = waku_new(config.clone().into()).expect("default config should be valid"); - node.start().expect("node should start with valid config"); - node.stop().expect("node should stop"); + let node = node.start().expect("node should start with valid config"); + let node = node.stop().expect("node should stop"); + node.waku_destroy().expect("free resources"); } }