diff --git a/nomos-cli/Cargo.toml b/nomos-cli/Cargo.toml index 4d9876ae..34c3e0a0 100644 --- a/nomos-cli/Cargo.toml +++ b/nomos-cli/Cargo.toml @@ -9,7 +9,7 @@ description = "Cli app to interact with Nomos nodes and perform various tasks" [dependencies] fraction = "0.13" tracing = "0.1" -tracing-subscriber = "0.3" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } async-trait = "0.1" clap = { version = "4", features = ["derive"] } serde_yaml = "0.9" diff --git a/nomos-cli/src/da/network/swarm.rs b/nomos-cli/src/da/network/swarm.rs index 2f0674a6..41aa4123 100644 --- a/nomos-cli/src/da/network/swarm.rs +++ b/nomos-cli/src/da/network/swarm.rs @@ -144,3 +144,131 @@ where } } } + +#[cfg(test)] +pub mod test { + use crate::da::network::swarm::ExecutorSwarm; + use crate::test_utils::AllNeighbours; + use futures::StreamExt; + use kzgrs_backend::common::blob::DaBlob; + use kzgrs_backend::common::Column; + use libp2p::identity::Keypair; + use libp2p::PeerId; + use nomos_da_network_core::address_book::AddressBook; + use nomos_da_network_core::behaviour::validator::ValidatorBehaviourEvent; + use nomos_da_network_core::protocols::dispersal::executor::behaviour::DispersalExecutorEvent; + use nomos_da_network_core::swarm::validator::ValidatorSwarm; + use nomos_libp2p::{Multiaddr, SwarmEvent}; + use std::time::Duration; + use tokio::sync::mpsc::unbounded_channel; + use tokio::time; + use tracing::{error, info}; + use tracing_subscriber::fmt::TestWriter; + use tracing_subscriber::EnvFilter; + + #[tokio::test] + async fn test_dispersal_with_swarms() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .compact() + .with_writer(TestWriter::default()) + .try_init(); + let k1 = Keypair::generate_ed25519(); + let k2 = Keypair::generate_ed25519(); + let executor_peer = PeerId::from_public_key(&k1.public()); + let validator_peer = PeerId::from_public_key(&k2.public()); + let neighbours = AllNeighbours { + neighbours: [ + PeerId::from_public_key(&k1.public()), + PeerId::from_public_key(&k2.public()), + ] + .into_iter() + .collect(), + }; + + let addr: Multiaddr = "/ip4/127.0.0.1/udp/5063/quic-v1".parse().unwrap(); + let addr2 = addr.clone().with_p2p(validator_peer).unwrap(); + let addr2_book = AddressBook::from_iter(vec![(executor_peer, addr2.clone())]); + let (dispersal_events_sender, _) = unbounded_channel(); + + let mut executor = ExecutorSwarm::new(k1, neighbours.clone(), dispersal_events_sender); + let (mut validator, _) = ValidatorSwarm::new(k2, neighbours.clone(), addr2_book); + + let msg_count = 10usize; + let validator_task = async move { + let validator_swarm = validator.protocol_swarm_mut(); + validator_swarm.listen_on(addr).unwrap(); + + let mut res = vec![]; + loop { + match validator_swarm.select_next_some().await { + SwarmEvent::Behaviour(ValidatorBehaviourEvent::Dispersal(event)) => { + res.push(event); + } + event => { + info!("Validator event: {event:?}"); + } + } + if res.len() == msg_count { + tokio::time::sleep(Duration::from_secs(1)).await; + break; + } + } + res + }; + let join_validator = tokio::spawn(validator_task); + + executor.dial(addr2).unwrap(); + + let executor_open_stream_sender = executor.open_stream_sender(); + let executor_disperse_blob_sender = executor.blobs_sender(); + + let executor_poll = async move { + let mut res = vec![]; + loop { + tokio::select! { + Some(event) = executor.swarm.next() => { + info!("Executor event: {event:?}"); + if let SwarmEvent::Behaviour(DispersalExecutorEvent::DispersalSuccess{blob_id, ..}) = event { + res.push(blob_id); + } + } + + _ = time::sleep(Duration::from_secs(2)) => { + if res.len() < msg_count {error!("Executor timeout reached");} + break; + } + } + } + res + }; + + let executor_task = tokio::spawn(executor_poll); + + executor_open_stream_sender.send(validator_peer).unwrap(); + tokio::time::sleep(Duration::from_secs(1)).await; + + for i in 0..msg_count { + info!("Sending blob {i}..."); + executor_disperse_blob_sender + .send(( + 0, + DaBlob { + column_idx: 0, + column: Column(vec![]), + column_commitment: Default::default(), + aggregated_column_commitment: Default::default(), + aggregated_column_proof: Default::default(), + rows_commitments: vec![], + rows_proofs: vec![], + }, + )) + .unwrap(); + } + + assert_eq!( + executor_task.await.unwrap().len(), + join_validator.await.unwrap().len() + ); + } +} diff --git a/nomos-cli/src/lib.rs b/nomos-cli/src/lib.rs index 27e0099a..f2c30c50 100644 --- a/nomos-cli/src/lib.rs +++ b/nomos-cli/src/lib.rs @@ -2,6 +2,9 @@ pub mod api; pub mod cmds; pub mod da; +#[cfg(test)] +pub mod test_utils; + use clap::Parser; use cmds::Command; diff --git a/nomos-cli/src/test_utils.rs b/nomos-cli/src/test_utils.rs new file mode 100644 index 00000000..55588604 --- /dev/null +++ b/nomos-cli/src/test_utils.rs @@ -0,0 +1,25 @@ +use libp2p::PeerId; +use std::collections::HashSet; +use subnetworks_assignations::MembershipHandler; + +#[derive(Clone)] +pub struct AllNeighbours { + pub neighbours: HashSet, +} + +impl MembershipHandler for AllNeighbours { + type NetworkId = u32; + type Id = PeerId; + + fn membership(&self, _self_id: &Self::Id) -> HashSet { + [0].into_iter().collect() + } + + fn is_allowed(&self, _id: &Self::Id) -> bool { + true + } + + fn members_of(&self, _network_id: &Self::NetworkId) -> HashSet { + self.neighbours.clone() + } +}