From 48300c15a25812030214389cbb2b6dde3f82f516 Mon Sep 17 00:00:00 2001 From: gusto Date: Fri, 27 Jan 2023 11:35:07 +0200 Subject: [PATCH] Smoke tests for waku node (#41) * Fix for codecov branch names * Codecov with all tests included * Add tokio to node tests * Run echo tests as seperate processes * Add token to evade GH limiter * Discv5 and default test for node --- .github/workflows/codecov.yml | 5 +- Cargo.lock | 37 +++++ waku-bindings/Cargo.toml | 2 + waku-bindings/tests/node.rs | 292 ++++++++++++++++++++++++---------- 4 files changed, 250 insertions(+), 86 deletions(-) diff --git a/.github/workflows/codecov.yml b/.github/workflows/codecov.yml index 7225c9d..665d811 100644 --- a/.github/workflows/codecov.yml +++ b/.github/workflows/codecov.yml @@ -31,11 +31,14 @@ jobs: continue-on-error: true - run: | cargo install grcov; - cargo test --all-features -- --ignored; + cargo test --all-features; + cargo test discv5_echo -- --ignored; + cargo test default_echo -- --ignored; mkdir /tmp/cov; grcov . --binary-path ./target/debug/ -s . -t lcov --branch --ignore-not-existing --ignore '../*' --ignore "/*" -o /tmp/cov/tests.lcov; - uses: codecov/codecov-action@v3 with: + token: ${{ secrets.CODECOV_TOKEN }} directory: /tmp/cov/ name: waku-bindings-codecov fail_ci_if_error: true diff --git a/Cargo.lock b/Cargo.lock index 24ecd3e..f7f9564 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -439,6 +439,17 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" +[[package]] +name = "futures-macro" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.25" @@ -460,6 +471,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -1232,6 +1244,29 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" +[[package]] +name = "tokio" +version = "1.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a12a59981d9e3c38d216785b0c37399f6e415e8d0712047620f189371b0bb" +dependencies = [ + "autocfg", + "pin-project-lite", + "tokio-macros", + "windows-sys 0.42.0", +] + +[[package]] +name = "tokio-macros" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "toml" version = "0.5.9" @@ -1350,6 +1385,7 @@ version = "0.1.0-beta3" dependencies = [ "aes-gcm", "base64", + "futures", "hex", "multiaddr", "once_cell", @@ -1360,6 +1396,7 @@ dependencies = [ "serial_test", "smart-default", "sscanf", + "tokio", "url", "waku-sys", ] diff --git a/waku-bindings/Cargo.toml b/waku-bindings/Cargo.toml index fa4ceeb..c650a59 100644 --- a/waku-bindings/Cargo.toml +++ b/waku-bindings/Cargo.toml @@ -28,4 +28,6 @@ url = "2.3" waku-sys = { version = "0.1.0-beta3", path = "../waku-sys" } [dev-dependencies] +futures = "0.3.25" serial_test = "0.10.0" +tokio = { version = "1.24.2", features = ["macros", "rt", "sync", "time"] } diff --git a/waku-bindings/tests/node.rs b/waku-bindings/tests/node.rs index 58856ea..7cc726d 100644 --- a/waku-bindings/tests/node.rs +++ b/waku-bindings/tests/node.rs @@ -2,38 +2,159 @@ use aes_gcm::{Aes256Gcm, KeyInit}; use multiaddr::Multiaddr; use rand::thread_rng; use secp256k1::{PublicKey, Secp256k1, SecretKey}; +use serial_test::serial; use std::net::IpAddr; use std::str::FromStr; use std::time::{Duration, SystemTime}; +use std::{collections::HashSet, str::from_utf8}; +use tokio::sync::mpsc::{self, Sender}; +use tokio::time; use waku_bindings::{ - waku_new, waku_set_event_callback, Encoding, Event, ProtocolId, WakuContentTopic, WakuLogLevel, - WakuMessage, WakuNodeConfig, + waku_new, waku_set_event_callback, Encoding, Event, Key, MessageId, ProtocolId, Running, + WakuContentTopic, WakuLogLevel, WakuMessage, WakuNodeConfig, WakuNodeHandle, }; +const ECHO_TIMEOUT: u64 = 10; +const ECHO_MESSAGE: &str = "Hi from 🦀!"; + const NODES: &[&str] = &[ "/dns4/node-01.ac-cn-hongkong-c.wakuv2.test.statusim.net/tcp/30303/p2p/16Uiu2HAkvWiyFsgRhuJEb9JfjYxEkoHLgnUQmr1N5mKWnYjxYRVm", "/dns4/node-01.do-ams3.wakuv2.test.statusim.net/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ", "/dns4/node-01.gc-us-central1-a.wakuv2.test.statusim.net/tcp/30303/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS" ]; +fn try_publish_relay_messages( + node: &WakuNodeHandle, + msg: &WakuMessage, + sk: &SecretKey, + ssk: &Key, +) -> Result, String> { + let pk = PublicKey::from_secret_key(&Secp256k1::new(), sk); + + Ok(HashSet::from([ + node.relay_publish_message(msg, None, None)?, + node.relay_publish_encrypt_asymmetric(msg, None, &pk, None, None)?, + node.relay_publish_encrypt_symmetric(msg, None, ssk, None, None)?, + node.relay_publish_encrypt_asymmetric(msg, None, &pk, Some(sk), None)?, + node.relay_publish_encrypt_symmetric(msg, None, ssk, Some(sk), None)?, + ])) +} + +fn try_publish_lightpush_messages( + node: &WakuNodeHandle, + msg: &WakuMessage, + sk: &SecretKey, + ssk: &Key, +) -> Result, String> { + let pk = PublicKey::from_secret_key(&Secp256k1::new(), sk); + + let peer_id = node + .peers() + .unwrap() + .iter() + .map(|peer| peer.peer_id()) + .find(|id| id.as_str() != node.peer_id().unwrap().as_str()) + .unwrap() + .clone(); + + Ok(HashSet::from([ + node.lightpush_publish(msg, None, peer_id.clone(), None)?, + node.lightpush_publish_encrypt_asymmetric(msg, None, peer_id.clone(), &pk, None, None)?, + node.lightpush_publish_encrypt_asymmetric(msg, None, peer_id.clone(), &pk, Some(sk), None)?, + node.lightpush_publish_encrypt_symmetric(msg, None, peer_id.clone(), ssk, None, None)?, + node.lightpush_publish_encrypt_symmetric(msg, None, peer_id, ssk, Some(sk), None)?, + ])) +} + +#[derive(Debug)] +struct Response { + id: MessageId, + payload: Vec, +} + +fn set_callback(tx: Sender, sk: SecretKey, ssk: Key) { + waku_set_event_callback(move |signal| { + if let Event::WakuMessage(message) = signal.event() { + let id = message.message_id(); + let message = message.waku_message(); + + let payload = if let Ok(message) = message + .try_decode_asymmetric(&sk) + .map_err(|e| println!("{e}")) + { + message.data().to_vec() + } else if let Ok(message) = message + .try_decode_symmetric(&ssk) + .map_err(|e| println!("{e}")) + { + message.data().to_vec() + } else { + message.payload().to_vec() + }; + + futures::executor::block_on(tx.send(Response { + id: id.to_string(), + payload, + })) + .expect("send response to the receiver"); + } + }); +} + +async fn test_echo_messages( + node: &WakuNodeHandle, + content: &'static str, + content_topic: WakuContentTopic, + sk: SecretKey, + ssk: Key, +) { + let message = WakuMessage::new( + content, + content_topic, + 1, + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() + .try_into() + .unwrap(), + ); + + let (tx, mut rx) = mpsc::channel(1); + set_callback(tx, sk, ssk); + + let mut ids = + try_publish_relay_messages(node, &message, &sk, &ssk).expect("send relay messages"); + + ids.extend( + try_publish_lightpush_messages(node, &message, &sk, &ssk).expect("send lightpush messages"), + ); + + while let Some(res) = rx.recv().await { + if ids.take(&res.id).is_some() { + let msg = from_utf8(&res.payload).expect("should be valid message"); + assert_eq!(content, msg); + } + + if ids.is_empty() { + break; + } + } +} + #[ignore] -#[test] -pub fn main() -> Result<(), String> { +#[tokio::test] +#[serial] +async fn discv5_echo() -> Result<(), String> { let config = WakuNodeConfig { host: IpAddr::from_str("0.0.0.0").ok(), - port: None, - advertise_addr: None, - node_key: None, - keep_alive_interval: None, - relay: None, - relay_topics: vec![], - min_peers_to_publish: None, - filter: None, log_level: Some(WakuLogLevel::Error), discv5: Some(true), discv5_udp_port: Some(9000), discv5_bootstrap_nodes: Vec::new(), + ..Default::default() }; + let node = waku_new(Some(config))?; let node = node.start()?; println!("Node peer id: {}", node.peer_id()?); @@ -49,85 +170,22 @@ pub fn main() -> Result<(), String> { assert!(node.relay_enough_peers(None)?); let sk = SecretKey::new(&mut thread_rng()); - let pk = PublicKey::from_secret_key(&Secp256k1::new(), &sk); let ssk = Aes256Gcm::generate_key(&mut thread_rng()); - let content = "Hi from 🦀!"; - let content_callback = content; - - waku_set_event_callback(move |signal| match signal.event() { - Event::WakuMessage(message) => { - println!("Message with id [{}] received", message.message_id()); - let message = message.waku_message(); - let payload = if let Ok(message) = message - .try_decode_asymmetric(&sk) - .map_err(|e| println!("{e}")) - { - println!("Asymmetryc message"); - message.data().to_vec() - } else if let Ok(message) = message - .try_decode_symmetric(&ssk) - .map_err(|e| println!("{e}")) - { - println!("Symmetryc message"); - message.data().to_vec() - } else { - println!("Unencoded message"); - message.payload().to_vec() - }; - let message_content: String = - String::from_utf8(payload).expect("Message should be able to be read"); - println!("Message content: {message_content}"); - assert_eq!(message_content, content_callback); - } - _ => { - println!("Wtf is this event?"); - } - }); - - // subscribe to default channel + // Subscribe to default channel. node.relay_subscribe(None)?; let content_topic = WakuContentTopic::new("toychat", 2, "huilong", Encoding::Proto); - let message = WakuMessage::new( - content, - content_topic, - 1, - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis() - .try_into() - .unwrap(), - ); + let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT)); + tokio::pin!(sleep); - node.relay_publish_message(&message, None, None)?; - node.relay_publish_encrypt_asymmetric(&message, None, &pk, None, None)?; - node.relay_publish_encrypt_symmetric(&message, None, &ssk, None, None)?; - node.relay_publish_encrypt_asymmetric(&message, None, &pk, Some(&sk), None)?; - node.relay_publish_encrypt_symmetric(&message, None, &ssk, Some(&sk), None)?; + // Send and receive messages. Waits until all messages received. + let got_all = tokio::select! { + _ = sleep => false, + _ = test_echo_messages(&node, ECHO_MESSAGE, content_topic, sk, ssk) => true, + }; - let peer_id = node - .peers() - .unwrap() - .iter() - .map(|peer| peer.peer_id()) - .find(|id| id.as_str() != node.peer_id().unwrap().as_str()) - .unwrap() - .clone(); - - node.lightpush_publish(&message, None, peer_id.clone(), None)?; - node.lightpush_publish_encrypt_asymmetric(&message, None, peer_id.clone(), &pk, None, None)?; - node.lightpush_publish_encrypt_asymmetric( - &message, - None, - peer_id.clone(), - &pk, - Some(&sk), - None, - )?; - node.lightpush_publish_encrypt_symmetric(&message, None, peer_id.clone(), &ssk, None, None)?; - node.lightpush_publish_encrypt_symmetric(&message, None, peer_id, &ssk, Some(&sk), None)?; + assert!(got_all); for node_data in node.peers()? { if node_data.peer_id() != &node.peer_id()? { @@ -135,7 +193,71 @@ pub fn main() -> Result<(), String> { } } - std::thread::sleep(Duration::from_secs(2)); node.stop()?; Ok(()) } + +#[ignore] +#[tokio::test] +#[serial] +async fn default_echo() -> Result<(), String> { + let config = WakuNodeConfig { + log_level: Some(WakuLogLevel::Error), + ..Default::default() + }; + + let node = waku_new(Some(config))?; + let node = node.start()?; + println!("Node peer id: {}", node.peer_id()?); + + for node_address in NODES { + let address: Multiaddr = node_address.parse().unwrap(); + let peer_id = node.add_peer(&address, ProtocolId::Relay)?; + node.connect_peer_with_id(peer_id, None)?; + } + + assert!(node.peers()?.len() >= NODES.len()); + assert!(node.peer_count()? >= NODES.len()); + + assert!(node.relay_enough_peers(None)?); + let sk = SecretKey::new(&mut thread_rng()); + let ssk = Aes256Gcm::generate_key(&mut thread_rng()); + + // subscribe to default channel + node.relay_subscribe(None)?; + let content_topic = WakuContentTopic::new("toychat", 2, "huilong", Encoding::Proto); + + let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT)); + tokio::pin!(sleep); + + // Send and receive messages. Waits until all messages received. + let got_all = tokio::select! { + _ = sleep => false, + _ = test_echo_messages(&node, ECHO_MESSAGE, content_topic, sk, ssk) => true, + }; + + assert!(got_all); + + for node_data in node.peers()? { + if node_data.peer_id() != &node.peer_id()? { + node.disconnect_peer_with_id(node_data.peer_id())?; + } + } + + node.stop()?; + Ok(()) +} + +#[test] +#[serial] +fn node_restart() { + let config = WakuNodeConfig::default(); + + for _ in 0..3 { + let node = waku_new(config.clone().into()).expect("default config should be valid"); + let node = node.start().expect("node should start with valid config"); + + assert!(node.peer_id().is_ok()); + node.stop().expect("node should stop"); + } +}