general improvements

This commit is contained in:
Ivan Folgueira Bande 2024-11-27 17:54:04 +01:00
parent daf1cb6b08
commit 2fd169bda6
No known key found for this signature in database
GPG Key ID: 3C117481F89E24A7
15 changed files with 272 additions and 249 deletions

1
Cargo.lock generated
View File

@ -3060,6 +3060,7 @@ dependencies = [
"multiaddr",
"once_cell",
"rand",
"regex",
"rln",
"secp256k1 0.26.0",
"serde",

30
examples/Cargo.lock generated
View File

@ -4277,15 +4277,6 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "termcolor"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adc4587ead41bf016f11af03e55a624c06568b5a19db4e90fde573d805074f83"
dependencies = [
"wincolor",
]
[[package]]
name = "textwrap"
version = "0.11.0"
@ -4325,17 +4316,6 @@ dependencies = [
"once_cell",
]
[[package]]
name = "tic-tac-toe"
version = "0.1.0"
dependencies = [
"ark-std",
"ctrlc",
"serde_json",
"termcolor",
"waku-bindings",
]
[[package]]
name = "tic-tac-toe-gui"
version = "0.1.0"
@ -4681,6 +4661,7 @@ dependencies = [
"multiaddr",
"once_cell",
"rand",
"regex",
"rln",
"secp256k1 0.26.0",
"serde",
@ -5206,15 +5187,6 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "wincolor"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eeb06499a3a4d44302791052df005d5232b927ed1a9658146d842165c4de7767"
dependencies = [
"winapi",
]
[[package]]
name = "windows"
version = "0.48.0"

View File

@ -3,70 +3,73 @@ use std::str::from_utf8;
use std::time::SystemTime;
use tokio::time::{sleep, Duration};
use waku::{
waku_destroy, waku_new, Encoding, Event, LibwakuResponse, WakuContentTopic, WakuMessage,
WakuNodeConfig,
waku_new, Encoding, Event, LibwakuResponse, WakuContentTopic, WakuMessage, WakuNodeConfig,
};
#[tokio::main]
async fn main() -> Result<(), Error> {
let node1 = waku_new(Some(WakuNodeConfig {
port: Some(60010), // TODO: use any available port.
tcp_port: Some(60010), // TODO: use any available port.
..Default::default()
}))
.expect("should instantiate");
let node2 = waku_new(Some(WakuNodeConfig {
port: Some(60020), // TODO: use any available port.
tcp_port: Some(60020), // TODO: use any available port.
..Default::default()
}))
.expect("should instantiate");
node1.start().expect("node1 should start");
node2.start().expect("node2 should start");
// ========================================================================
// Setting an event callback to be executed each time a message is received
node2.ctx.waku_set_event_callback(&|response| {
if let LibwakuResponse::Success(v) = response {
let event: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
node2
.set_event_callback(&|response| {
if let LibwakuResponse::Success(v) = response {
let event: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
match event {
Event::WakuMessage(evt) => {
println!("WakuMessage event received: {:?}", evt.waku_message);
let message = evt.waku_message;
let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message");
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
println!("Message Received in NODE 2: {}", msg);
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
}
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
_ => panic!("event case not expected"),
};
}
});
match event {
Event::WakuMessage(evt) => {
println!("WakuMessage event received: {:?}", evt.waku_message);
let message = evt.waku_message;
let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message");
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
println!("Message Received in NODE 2: {}", msg);
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
}
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
_ => panic!("event case not expected"),
};
}
})
.expect("set event call back working");
node1.ctx.waku_set_event_callback(&|response| {
if let LibwakuResponse::Success(v) = response {
let event: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
node1
.set_event_callback(&|response| {
if let LibwakuResponse::Success(v) = response {
let event: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
match event {
Event::WakuMessage(evt) => {
println!("WakuMessage event received: {:?}", evt.waku_message);
let message = evt.waku_message;
let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message");
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
println!("Message Received in NODE 1: {}", msg);
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
}
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
_ => panic!("event case not expected"),
};
}
});
match event {
Event::WakuMessage(evt) => {
println!("WakuMessage event received: {:?}", evt.waku_message);
let message = evt.waku_message;
let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message");
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
println!("Message Received in NODE 1: {}", msg);
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
}
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
_ => panic!("event case not expected"),
};
}
})
.expect("set event call back working");
let node1 = node1.start().expect("node1 should start");
let node2 = node2.start().expect("node2 should start");
// ========================================================================
// Subscribe to pubsub topic
@ -125,13 +128,13 @@ async fn main() -> Result<(), Error> {
// ========================================================================
// Stop both instances
node1.stop().expect("should stop");
node2.stop().expect("should stop");
let node1 = node1.stop().expect("should stop");
let node2 = node2.stop().expect("should stop");
// ========================================================================
// Free resources
waku_destroy(node1).expect("should deallocate");
waku_destroy(node2).expect("should deallocate");
node1.waku_destroy().expect("should deallocate");
node2.waku_destroy().expect("should deallocate");
Ok(())
}

View File

@ -90,8 +90,9 @@ impl TicTacToeApp<Initialized> {
// Subscribe to desired topic using the relay protocol
// self.waku.relay_subscribe(&self.game_topic.to_string()).expect("waku should subscribe");
let content_topic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto);
waku.filter_subscribe(&self.game_topic.to_string(), &content_topic.to_string()).expect("waku should subscribe");
let ctopic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto);
let content_topics = vec![ctopic];
waku.filter_subscribe(&self.game_topic.to_string(), content_topics).expect("waku should subscribe");
// Connect to hard-coded node
// let target_node_multi_addr =
@ -308,7 +309,7 @@ async fn main() -> eframe::Result<()> {
let game_topic = "/waku/2/rs/16/32";
// Create a Waku instance
let waku = waku_new(Some(WakuNodeConfig {
port: Some(60010),
tcp_port: Some(60010),
cluster_id: Some(16),
shards: vec![1, 32, 64, 128, 256],
// node_key: Some(SecretKey::from_str("2fc0515879e52b7b73297cfd6ab3abf7c344ef84b7a90ff6f4cc19e05a198027").unwrap()),

View File

@ -32,6 +32,7 @@ libc = "0.2"
serde-aux = "4.3.1"
rln = "0.3.4"
tokio = { version = "1", features = ["full"] }
regex = "1"
[dev-dependencies]
futures = "0.3.25"

View File

@ -0,0 +1,134 @@
// std
use std::borrow::Cow;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
use sscanf::{scanf, RegexRepresentation};
/// WakuMessage encoding scheme
#[derive(Clone, Debug, Eq, PartialEq, Default)]
pub enum Encoding {
#[default]
Proto,
Rlp,
Rfc26,
Unknown(String),
}
impl Display for Encoding {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let s = match self {
Encoding::Proto => "proto",
Encoding::Rlp => "rlp",
Encoding::Rfc26 => "rfc26",
Encoding::Unknown(value) => value,
};
f.write_str(s)
}
}
impl FromStr for Encoding {
type Err = std::io::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"proto" => Ok(Self::Proto),
"rlp" => Ok(Self::Rlp),
"rfc26" => Ok(Self::Rfc26),
encoding => Ok(Self::Unknown(encoding.to_string())),
}
}
}
impl RegexRepresentation for Encoding {
const REGEX: &'static str = r"\w";
}
/// A waku content topic `/{application_name}/{version}/{content_topic_name}/{encdoing}`
#[derive(Clone, Debug, Eq, PartialEq, Default)]
pub struct WakuContentTopic {
pub application_name: Cow<'static, str>,
pub version: Cow<'static, str>,
pub content_topic_name: Cow<'static, str>,
pub encoding: Encoding,
}
impl WakuContentTopic {
pub const fn new(
application_name: &'static str,
version: &'static str,
content_topic_name: &'static str,
encoding: Encoding,
) -> Self {
Self {
application_name: Cow::Borrowed(application_name),
version: Cow::Borrowed(version),
content_topic_name: Cow::Borrowed(content_topic_name),
encoding,
}
}
pub fn join_content_topics(topics: Vec<WakuContentTopic>) -> String {
topics
.iter()
.map(|topic| topic.to_string())
.collect::<Vec<_>>()
.join(",")
}
}
impl FromStr for WakuContentTopic {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
if let Ok((application_name, version, content_topic_name, encoding)) =
scanf!(s, "/{}/{}/{}/{:/.+?/}", String, String, String, Encoding)
{
Ok(WakuContentTopic {
application_name: Cow::Owned(application_name),
version: Cow::Owned(version),
content_topic_name: Cow::Owned(content_topic_name),
encoding,
})
} else {
Err(
format!(
"Wrong pub-sub topic format. Should be `/{{application-name}}/{{version-of-the-application}}/{{content-topic-name}}/{{encoding}}`. Got: {s}"
)
)
}
}
}
impl Display for WakuContentTopic {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"/{}/{}/{}/{}",
self.application_name, self.version, self.content_topic_name, self.encoding
)
}
}
impl Serialize for WakuContentTopic {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
self.to_string().serialize(serializer)
}
}
impl<'de> Deserialize<'de> for WakuContentTopic {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let as_string: String = String::deserialize(deserializer)?;
as_string
.parse::<WakuContentTopic>()
.map_err(D::Error::custom)
}
}

View File

@ -1,13 +1,11 @@
//! Waku [general](https://rfc.vac.dev/spec/36/#general) types
// std
use std::borrow::Cow;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
pub mod contenttopic;
// crates
use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
use contenttopic::WakuContentTopic;
use serde::{Deserialize, Serialize};
use serde_aux::prelude::*;
use sscanf::{scanf, RegexRepresentation};
/// Waku message version
pub type WakuMessageVersion = usize;
@ -65,124 +63,6 @@ impl WakuMessage {
}
}
/// WakuMessage encoding scheme
#[derive(Clone, Debug, Eq, PartialEq, Default)]
pub enum Encoding {
#[default]
Proto,
Rlp,
Rfc26,
Unknown(String),
}
impl Display for Encoding {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let s = match self {
Encoding::Proto => "proto",
Encoding::Rlp => "rlp",
Encoding::Rfc26 => "rfc26",
Encoding::Unknown(value) => value,
};
f.write_str(s)
}
}
impl FromStr for Encoding {
type Err = std::io::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"proto" => Ok(Self::Proto),
"rlp" => Ok(Self::Rlp),
"rfc26" => Ok(Self::Rfc26),
encoding => Ok(Self::Unknown(encoding.to_string())),
}
}
}
impl RegexRepresentation for Encoding {
const REGEX: &'static str = r"\w";
}
/// A waku content topic `/{application_name}/{version}/{content_topic_name}/{encdoing}`
#[derive(Clone, Debug, Eq, PartialEq, Default)]
pub struct WakuContentTopic {
pub application_name: Cow<'static, str>,
pub version: Cow<'static, str>,
pub content_topic_name: Cow<'static, str>,
pub encoding: Encoding,
}
impl WakuContentTopic {
pub const fn new(
application_name: &'static str,
version: &'static str,
content_topic_name: &'static str,
encoding: Encoding,
) -> Self {
Self {
application_name: Cow::Borrowed(application_name),
version: Cow::Borrowed(version),
content_topic_name: Cow::Borrowed(content_topic_name),
encoding,
}
}
}
impl FromStr for WakuContentTopic {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
if let Ok((application_name, version, content_topic_name, encoding)) =
scanf!(s, "/{}/{}/{}/{:/.+?/}", String, String, String, Encoding)
{
Ok(WakuContentTopic {
application_name: Cow::Owned(application_name),
version: Cow::Owned(version),
content_topic_name: Cow::Owned(content_topic_name),
encoding,
})
} else {
Err(
format!(
"Wrong pub-sub topic format. Should be `/{{application-name}}/{{version-of-the-application}}/{{content-topic-name}}/{{encoding}}`. Got: {s}"
)
)
}
}
}
impl Display for WakuContentTopic {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"/{}/{}/{}/{}",
self.application_name, self.version, self.content_topic_name, self.encoding
)
}
}
impl Serialize for WakuContentTopic {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
self.to_string().serialize(serializer)
}
}
impl<'de> Deserialize<'de> for WakuContentTopic {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let as_string: String = String::deserialize(deserializer)?;
as_string
.parse::<WakuContentTopic>()
.map_err(D::Error::custom)
}
}
mod base64_serde {
use base64::Engine;
use serde::de::Error;

View File

@ -15,10 +15,9 @@ pub use utils::LibwakuResponse;
use rln;
pub use node::{
waku_create_content_topic, waku_destroy, waku_new, Event, Initialized, Key, Multiaddr,
PublicKey, RLNConfig, Running, SecretKey, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle,
waku_create_content_topic, waku_new, Event, Initialized, Key, Multiaddr, PublicKey, RLNConfig,
Running, SecretKey, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle,
};
pub use general::{
Encoding, MessageHash, Result, WakuContentTopic, WakuMessage, WakuMessageVersion,
};
pub use general::contenttopic::{Encoding, WakuContentTopic};
pub use general::{MessageHash, Result, WakuMessage, WakuMessageVersion};

View File

@ -16,7 +16,7 @@ pub struct WakuNodeConfig {
pub host: Option<std::net::IpAddr>,
/// Libp2p TCP listening port. Default `60000`. Use `0` for **random**
#[default(Some(60000))]
pub port: Option<usize>,
pub tcp_port: Option<usize>,
/// Secp256k1 private key in Hex format (`0x123...abc`). Default random
#[serde(with = "secret_key_serde", rename = "key")]
pub node_key: Option<SecretKey>,
@ -28,6 +28,7 @@ pub struct WakuNodeConfig {
#[default(Some(true))]
pub relay: Option<bool>,
pub relay_topics: Vec<String>,
#[default(vec![1])]
pub shards: Vec<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub max_message_size: Option<String>,

View File

@ -1,4 +1,5 @@
use std::ffi::c_void;
use std::ptr::null_mut;
use std::sync::{Arc, Mutex};
use crate::utils::{get_trampoline, LibwakuResponse};
@ -31,6 +32,10 @@ impl WakuNodeContext {
self.obj_ptr
}
pub fn reset_ptr(mut self) {
self.obj_ptr = null_mut();
}
/// Register callback to act as event handler and receive application events,
/// which are used to react to asynchronous events in Waku
pub fn waku_set_event_callback<F: FnMut(LibwakuResponse) + 'static + Sync + Send>(

View File

@ -5,6 +5,7 @@ use std::ffi::CString;
// crates
use libc::*;
// internal
use crate::general::contenttopic::WakuContentTopic;
use crate::general::Result;
use crate::node::context::WakuNodeContext;
use crate::utils::{get_trampoline, handle_no_response, LibwakuResponse};
@ -12,10 +13,10 @@ use crate::utils::{get_trampoline, handle_no_response, LibwakuResponse};
pub fn waku_filter_subscribe(
ctx: &WakuNodeContext,
pubsub_topic: &str,
content_topics: &str, // comma-separated list of content topics
content_topics: Vec<WakuContentTopic>,
) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string();
let content_topics = content_topics.to_string();
let content_topics = WakuContentTopic::join_content_topics(content_topics);
let pubsub_topic_ptr = CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic")
@ -49,10 +50,10 @@ pub fn waku_filter_subscribe(
pub fn waku_filter_unsubscribe(
ctx: &WakuNodeContext,
pubsub_topic: &str,
content_topics_topics: &str, // comma-separated list of content topics
content_topics: Vec<WakuContentTopic>, // comma-separated list of content topics
) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string();
let content_topics_topics = content_topics_topics.to_string();
let content_topics_topics = WakuContentTopic::join_content_topics(content_topics);
let pubsub_topic_ptr = CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic")

View File

@ -152,6 +152,7 @@ mod test {
fn nwaku_version() {
let node = waku_new(None).unwrap();
let version = waku_version(&node).expect("should return the version");
print!("Current version: {}", version);
assert!(!version.is_empty());
}
}

View File

@ -16,6 +16,7 @@ pub use secp256k1::{PublicKey, SecretKey};
use std::marker::PhantomData;
use std::time::Duration;
// internal
use crate::general::contenttopic::{Encoding, WakuContentTopic};
use crate::general::{MessageHash, Result, WakuMessage};
use crate::utils::LibwakuResponse;
@ -25,8 +26,6 @@ pub use config::WakuNodeConfig;
pub use events::{Event, WakuMessageEvent};
pub use relay::waku_create_content_topic;
use crate::Encoding;
use crate::WakuContentTopic;
use std::time::SystemTime;
// Define state marker types
@ -48,15 +47,17 @@ pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeHandle<Initial
})
}
pub fn waku_destroy(node: WakuNodeHandle<Initialized>) -> Result<()> {
management::waku_destroy(&node.ctx)
}
impl<State> WakuNodeHandle<State> {
/// Get the nwaku version
pub fn version(&self) -> Result<String> {
management::waku_version(&self.ctx)
}
pub fn waku_destroy(self) -> Result<()> {
let res = management::waku_destroy(&self.ctx);
self.ctx.reset_ptr();
res
}
}
impl WakuNodeHandle<Initialized> {
@ -149,11 +150,19 @@ impl WakuNodeHandle<Running> {
relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic)
}
pub fn filter_subscribe(&self, pubsub_topic: &String, content_topics: &String) -> Result<()> {
pub fn filter_subscribe(
&self,
pubsub_topic: &String,
content_topics: Vec<WakuContentTopic>,
) -> Result<()> {
filter::waku_filter_subscribe(&self.ctx, pubsub_topic, content_topics)
}
pub fn filter_unsubscribe(&self, pubsub_topic: &String, content_topics: &String) -> Result<()> {
pub fn filter_unsubscribe(
&self,
pubsub_topic: &String,
content_topics: Vec<WakuContentTopic>,
) -> Result<()> {
filter::waku_filter_unsubscribe(&self.ctx, pubsub_topic, content_topics)
}

View File

@ -6,7 +6,8 @@ use std::time::Duration;
// crates
use libc::*;
// internal
use crate::general::{Encoding, MessageHash, Result, WakuContentTopic, WakuMessage};
use crate::general::contenttopic::{Encoding, WakuContentTopic};
use crate::general::{MessageHash, Result, WakuMessage};
use crate::node::context::WakuNodeContext;
use crate::utils::{get_trampoline, handle_no_response, handle_response, LibwakuResponse};

View File

@ -1,3 +1,5 @@
use multiaddr::Multiaddr;
use regex::Regex;
use secp256k1::SecretKey;
use serial_test::serial;
use std::str::FromStr;
@ -6,17 +8,17 @@ use std::time::{Duration, SystemTime};
use std::{collections::HashSet, str::from_utf8};
use tokio::time;
use tokio::time::sleep;
use waku_bindings::LibwakuResponse;
use waku_bindings::{
waku_destroy, waku_new, Encoding, Event, MessageHash, WakuContentTopic, WakuMessage,
waku_new, Encoding, Event, Initialized, MessageHash, WakuContentTopic, WakuMessage,
WakuNodeConfig, WakuNodeHandle,
};
use waku_bindings::{LibwakuResponse, Running};
const ECHO_TIMEOUT: u64 = 1000;
const ECHO_MESSAGE: &str = "Hi from 🦀!";
const TEST_PUBSUBTOPIC: &str = "test";
fn try_publish_relay_messages(
node: &WakuNodeHandle,
node: &WakuNodeHandle<Running>,
msg: &WakuMessage,
) -> Result<HashSet<MessageHash>, String> {
let topic = TEST_PUBSUBTOPIC.to_string();
@ -26,13 +28,15 @@ fn try_publish_relay_messages(
}
async fn test_echo_messages(
node1: &WakuNodeHandle,
node2: &WakuNodeHandle,
node1: WakuNodeHandle<Initialized>,
node2: WakuNodeHandle<Initialized>,
content: &'static str,
content_topic: WakuContentTopic,
) -> Result<(), String> {
// setting a naïve event handler to avoid appearing ERR messages in logs
let _ = node1.ctx.waku_set_event_callback(&|_| {});
node1
.set_event_callback(&|_| {})
.expect("set event call back working");
let rx_waku_message: Arc<Mutex<WakuMessage>> = Arc::new(Mutex::new(WakuMessage::default()));
@ -57,10 +61,12 @@ async fn test_echo_messages(
println!("Before setting event callback");
node2
.ctx
.waku_set_event_callback(closure)
.set_event_callback(closure)
.expect("set event call back working"); // Set the event callback with the closure
let node1 = node1.start()?;
let node2 = node2.start()?;
let topic = TEST_PUBSUBTOPIC.to_string();
node1.relay_subscribe(&topic).unwrap();
node2.relay_subscribe(&topic).unwrap();
@ -68,9 +74,17 @@ async fn test_echo_messages(
sleep(Duration::from_secs(3)).await;
// Interconnect nodes
println!("Connecting node1 to node2");
// Replace all matches with 127.0.0.1 to avoid issue with NAT or firewall.
let addresses1 = node1.listen_addresses().unwrap();
node2.connect(&addresses1[0], None).unwrap();
let addresses1 = &addresses1[0].to_string();
let re = Regex::new(r"\b(?:\d{1,3}\.){3}\d{1,3}\b").unwrap();
let addresses1 = re.replace_all(addresses1, "127.0.0.1").to_string();
let addresses1 = addresses1.parse::<Multiaddr>().expect("parse multiaddress");
println!("Connecting node1 to node2: {}", addresses1);
node2.connect(&addresses1, None).unwrap();
// Wait for mesh to form
sleep(Duration::from_secs(3)).await;
@ -89,7 +103,7 @@ async fn test_echo_messages(
Vec::new(),
false,
);
let _ids = try_publish_relay_messages(node1, &message).expect("send relay messages");
let _ids = try_publish_relay_messages(&node1, &message).expect("send relay messages");
// Wait for the msg to arrive
let rx_waku_message_cloned = rx_waku_message.clone();
@ -98,8 +112,9 @@ async fn test_echo_messages(
// dbg!("The waku message value is: {:?}", msg);
let payload = msg.payload.to_vec();
let payload_str = from_utf8(&payload).expect("should be valid message");
dbg!("payload: {:?}", payload_str);
if payload_str == ECHO_MESSAGE {
node1.stop()?;
node2.stop()?;
return Ok(());
}
} else {
@ -107,6 +122,12 @@ async fn test_echo_messages(
}
}
let node1 = node1.stop()?;
let node2 = node2.stop()?;
node1.waku_destroy()?;
node2.waku_destroy()?;
return Err("Unexpected test ending".to_string());
}
@ -115,17 +136,14 @@ async fn test_echo_messages(
async fn default_echo() -> Result<(), String> {
println!("Test default_echo");
let node1 = waku_new(Some(WakuNodeConfig {
port: Some(60010),
tcp_port: Some(60010),
..Default::default()
}))?;
let node2 = waku_new(Some(WakuNodeConfig {
port: Some(60020),
tcp_port: Some(60020),
..Default::default()
}))?;
node1.start()?;
node2.start()?;
let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto);
let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT));
@ -134,16 +152,11 @@ async fn default_echo() -> Result<(), String> {
// Send and receive messages. Waits until all messages received.
let got_all = tokio::select! {
_ = sleep => false,
_ = test_echo_messages(&node1, &node2, ECHO_MESSAGE, content_topic) => true,
_ = test_echo_messages(node1, node2, ECHO_MESSAGE, content_topic) => true,
};
assert!(got_all);
node1.stop()?;
node2.stop()?;
waku_destroy(node1)?;
waku_destroy(node2)?;
Ok(())
}
@ -160,7 +173,8 @@ fn node_restart() {
for _ in 0..3 {
let node = waku_new(config.clone().into()).expect("default config should be valid");
node.start().expect("node should start with valid config");
node.stop().expect("node should stop");
let node = node.start().expect("node should start with valid config");
let node = node.stop().expect("node should stop");
node.waku_destroy().expect("free resources");
}
}