use aes_gcm::{Aes256Gcm, KeyInit}; use multiaddr::Multiaddr; use rand::thread_rng; use secp256k1::{PublicKey, Secp256k1, SecretKey}; use serial_test::serial; use std::net::IpAddr; use std::str::FromStr; use std::time::{Duration, SystemTime}; use std::{collections::HashSet, str::from_utf8}; use tokio::sync::mpsc::{self, Sender}; use tokio::time; use waku_bindings::{ waku_new, waku_set_event_callback, Encoding, Event, Key, MessageId, ProtocolId, Running, WakuContentTopic, WakuLogLevel, WakuMessage, WakuNodeConfig, WakuNodeHandle, }; const ECHO_TIMEOUT: u64 = 10; const ECHO_MESSAGE: &str = "Hi from 🦀!"; const NODES: &[&str] = &[ "/dns4/node-01.ac-cn-hongkong-c.wakuv2.test.statusim.net/tcp/30303/p2p/16Uiu2HAkvWiyFsgRhuJEb9JfjYxEkoHLgnUQmr1N5mKWnYjxYRVm", "/dns4/node-01.do-ams3.wakuv2.test.statusim.net/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ", "/dns4/node-01.gc-us-central1-a.wakuv2.test.statusim.net/tcp/30303/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS" ]; fn try_publish_relay_messages( node: &WakuNodeHandle, msg: &WakuMessage, sk: &SecretKey, ssk: &Key, ) -> Result, String> { let pk = PublicKey::from_secret_key(&Secp256k1::new(), sk); Ok(HashSet::from([ node.relay_publish_message(msg, None, None)?, node.relay_publish_encrypt_asymmetric(msg, None, &pk, None, None)?, node.relay_publish_encrypt_symmetric(msg, None, ssk, None, None)?, node.relay_publish_encrypt_asymmetric(msg, None, &pk, Some(sk), None)?, node.relay_publish_encrypt_symmetric(msg, None, ssk, Some(sk), None)?, ])) } fn try_publish_lightpush_messages( node: &WakuNodeHandle, msg: &WakuMessage, sk: &SecretKey, ssk: &Key, ) -> Result, String> { let pk = PublicKey::from_secret_key(&Secp256k1::new(), sk); let peer_id = node .peers() .unwrap() .iter() .map(|peer| peer.peer_id()) .find(|id| id.as_str() != node.peer_id().unwrap().as_str()) .unwrap() .clone(); Ok(HashSet::from([ node.lightpush_publish(msg, None, peer_id.clone(), None)?, node.lightpush_publish_encrypt_asymmetric(msg, None, peer_id.clone(), &pk, None, None)?, node.lightpush_publish_encrypt_asymmetric(msg, None, peer_id.clone(), &pk, Some(sk), None)?, node.lightpush_publish_encrypt_symmetric(msg, None, peer_id.clone(), ssk, None, None)?, node.lightpush_publish_encrypt_symmetric(msg, None, peer_id, ssk, Some(sk), None)?, ])) } #[derive(Debug)] struct Response { id: MessageId, payload: Vec, } fn set_callback(tx: Sender, sk: SecretKey, ssk: Key) { waku_set_event_callback(move |signal| { if let Event::WakuMessage(message) = signal.event() { let id = message.message_id(); let message = message.waku_message(); let payload = if let Ok(message) = message .try_decode_asymmetric(&sk) .map_err(|e| println!("{e}")) { message.data().to_vec() } else if let Ok(message) = message .try_decode_symmetric(&ssk) .map_err(|e| println!("{e}")) { message.data().to_vec() } else { message.payload().to_vec() }; futures::executor::block_on(tx.send(Response { id: id.to_string(), payload, })) .expect("send response to the receiver"); } }); } async fn test_echo_messages( node: &WakuNodeHandle, content: &'static str, content_topic: WakuContentTopic, sk: SecretKey, ssk: Key, ) { let message = WakuMessage::new( content, content_topic, 1, SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_millis() .try_into() .unwrap(), ); let (tx, mut rx) = mpsc::channel(1); set_callback(tx, sk, ssk); let mut ids = try_publish_relay_messages(node, &message, &sk, &ssk).expect("send relay messages"); ids.extend( try_publish_lightpush_messages(node, &message, &sk, &ssk).expect("send lightpush messages"), ); while let Some(res) = rx.recv().await { if ids.take(&res.id).is_some() { let msg = from_utf8(&res.payload).expect("should be valid message"); assert_eq!(content, msg); } if ids.is_empty() { break; } } } #[ignore] #[tokio::test] #[serial] async fn discv5_echo() -> Result<(), String> { let config = WakuNodeConfig { host: IpAddr::from_str("0.0.0.0").ok(), log_level: Some(WakuLogLevel::Error), discv5: Some(true), discv5_udp_port: Some(9000), discv5_bootstrap_nodes: Vec::new(), ..Default::default() }; let node = waku_new(Some(config))?; let node = node.start()?; println!("Node peer id: {}", node.peer_id()?); for node_address in NODES { let address: Multiaddr = node_address.parse().unwrap(); let peer_id = node.add_peer(&address, ProtocolId::Relay)?; node.connect_peer_with_id(&peer_id, None)?; } assert!(node.peers()?.len() >= NODES.len()); assert!(node.peer_count()? >= NODES.len()); assert!(node.relay_enough_peers(None)?); let sk = SecretKey::new(&mut thread_rng()); let ssk = Aes256Gcm::generate_key(&mut thread_rng()); // Subscribe to default channel. node.relay_subscribe(None)?; let content_topic = WakuContentTopic::new("toychat", 2, "huilong", Encoding::Proto); let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT)); tokio::pin!(sleep); // Send and receive messages. Waits until all messages received. let got_all = tokio::select! { _ = sleep => false, _ = test_echo_messages(&node, ECHO_MESSAGE, content_topic, sk, ssk) => true, }; assert!(got_all); for node_data in node.peers()? { if node_data.peer_id() != &node.peer_id()? { node.disconnect_peer_with_id(node_data.peer_id())?; } } node.stop()?; Ok(()) } #[ignore] #[tokio::test] #[serial] async fn default_echo() -> Result<(), String> { let config = WakuNodeConfig { log_level: Some(WakuLogLevel::Error), ..Default::default() }; let node = waku_new(Some(config))?; let node = node.start()?; println!("Node peer id: {}", node.peer_id()?); for node_address in NODES { let address: Multiaddr = node_address.parse().unwrap(); let peer_id = node.add_peer(&address, ProtocolId::Relay)?; node.connect_peer_with_id(&peer_id, None)?; } assert!(node.peers()?.len() >= NODES.len()); assert!(node.peer_count()? >= NODES.len()); assert!(node.relay_enough_peers(None)?); let sk = SecretKey::new(&mut thread_rng()); let ssk = Aes256Gcm::generate_key(&mut thread_rng()); // subscribe to default channel node.relay_subscribe(None)?; let content_topic = WakuContentTopic::new("toychat", 2, "huilong", Encoding::Proto); let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT)); tokio::pin!(sleep); // Send and receive messages. Waits until all messages received. let got_all = tokio::select! { _ = sleep => false, _ = test_echo_messages(&node, ECHO_MESSAGE, content_topic, sk, ssk) => true, }; assert!(got_all); for node_data in node.peers()? { if node_data.peer_id() != &node.peer_id()? { node.disconnect_peer_with_id(node_data.peer_id())?; } } node.stop()?; Ok(()) } #[test] #[serial] fn node_restart() { let config = WakuNodeConfig::default(); for _ in 0..3 { let node = waku_new(config.clone().into()).expect("default config should be valid"); let node = node.start().expect("node should start with valid config"); assert!(node.peer_id().is_ok()); node.stop().expect("node should stop"); } }