chore: Da dispersal unit tests update (#720)

* test: prepare to call stream disperse

* fix: formatting

* fix: remove test_stream_disperse_error_cases

* test: utils for cli

* test: test_dispersal_with_swarms
- runtime preparation

* fix: cleanup
- tracing_subscriber

* fix: rewrite without using sampling

* fix: outstanding space

* fix: use executor.run

* fix: formatting

* test: add replies handling

* fix: formatting

* fix: switch to logging system

* fix: executor should wait for correct event
- cleanup

* Update nomos-cli/src/da/network/swarm.rs

Fix ignore value syntax.

Co-authored-by: gusto <bacv@users.noreply.github.com>

---------

Co-authored-by: gusto <bacv@users.noreply.github.com>
This commit is contained in:
Roman Zajic 2024-09-24 20:37:56 +08:00 committed by GitHub
parent 9f4f139771
commit 8142feaa8c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 157 additions and 1 deletions

View File

@ -9,7 +9,7 @@ description = "Cli app to interact with Nomos nodes and perform various tasks"
[dependencies]
fraction = "0.13"
tracing = "0.1"
tracing-subscriber = "0.3"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
async-trait = "0.1"
clap = { version = "4", features = ["derive"] }
serde_yaml = "0.9"

View File

@ -144,3 +144,131 @@ where
}
}
}
#[cfg(test)]
pub mod test {
use crate::da::network::swarm::ExecutorSwarm;
use crate::test_utils::AllNeighbours;
use futures::StreamExt;
use kzgrs_backend::common::blob::DaBlob;
use kzgrs_backend::common::Column;
use libp2p::identity::Keypair;
use libp2p::PeerId;
use nomos_da_network_core::address_book::AddressBook;
use nomos_da_network_core::behaviour::validator::ValidatorBehaviourEvent;
use nomos_da_network_core::protocols::dispersal::executor::behaviour::DispersalExecutorEvent;
use nomos_da_network_core::swarm::validator::ValidatorSwarm;
use nomos_libp2p::{Multiaddr, SwarmEvent};
use std::time::Duration;
use tokio::sync::mpsc::unbounded_channel;
use tokio::time;
use tracing::{error, info};
use tracing_subscriber::fmt::TestWriter;
use tracing_subscriber::EnvFilter;
#[tokio::test]
async fn test_dispersal_with_swarms() {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.compact()
.with_writer(TestWriter::default())
.try_init();
let k1 = Keypair::generate_ed25519();
let k2 = Keypair::generate_ed25519();
let executor_peer = PeerId::from_public_key(&k1.public());
let validator_peer = PeerId::from_public_key(&k2.public());
let neighbours = AllNeighbours {
neighbours: [
PeerId::from_public_key(&k1.public()),
PeerId::from_public_key(&k2.public()),
]
.into_iter()
.collect(),
};
let addr: Multiaddr = "/ip4/127.0.0.1/udp/5063/quic-v1".parse().unwrap();
let addr2 = addr.clone().with_p2p(validator_peer).unwrap();
let addr2_book = AddressBook::from_iter(vec![(executor_peer, addr2.clone())]);
let (dispersal_events_sender, _) = unbounded_channel();
let mut executor = ExecutorSwarm::new(k1, neighbours.clone(), dispersal_events_sender);
let (mut validator, _) = ValidatorSwarm::new(k2, neighbours.clone(), addr2_book);
let msg_count = 10usize;
let validator_task = async move {
let validator_swarm = validator.protocol_swarm_mut();
validator_swarm.listen_on(addr).unwrap();
let mut res = vec![];
loop {
match validator_swarm.select_next_some().await {
SwarmEvent::Behaviour(ValidatorBehaviourEvent::Dispersal(event)) => {
res.push(event);
}
event => {
info!("Validator event: {event:?}");
}
}
if res.len() == msg_count {
tokio::time::sleep(Duration::from_secs(1)).await;
break;
}
}
res
};
let join_validator = tokio::spawn(validator_task);
executor.dial(addr2).unwrap();
let executor_open_stream_sender = executor.open_stream_sender();
let executor_disperse_blob_sender = executor.blobs_sender();
let executor_poll = async move {
let mut res = vec![];
loop {
tokio::select! {
Some(event) = executor.swarm.next() => {
info!("Executor event: {event:?}");
if let SwarmEvent::Behaviour(DispersalExecutorEvent::DispersalSuccess{blob_id, ..}) = event {
res.push(blob_id);
}
}
_ = time::sleep(Duration::from_secs(2)) => {
if res.len() < msg_count {error!("Executor timeout reached");}
break;
}
}
}
res
};
let executor_task = tokio::spawn(executor_poll);
executor_open_stream_sender.send(validator_peer).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
for i in 0..msg_count {
info!("Sending blob {i}...");
executor_disperse_blob_sender
.send((
0,
DaBlob {
column_idx: 0,
column: Column(vec![]),
column_commitment: Default::default(),
aggregated_column_commitment: Default::default(),
aggregated_column_proof: Default::default(),
rows_commitments: vec![],
rows_proofs: vec![],
},
))
.unwrap();
}
assert_eq!(
executor_task.await.unwrap().len(),
join_validator.await.unwrap().len()
);
}
}

View File

@ -2,6 +2,9 @@ pub mod api;
pub mod cmds;
pub mod da;
#[cfg(test)]
pub mod test_utils;
use clap::Parser;
use cmds::Command;

View File

@ -0,0 +1,25 @@
use libp2p::PeerId;
use std::collections::HashSet;
use subnetworks_assignations::MembershipHandler;
#[derive(Clone)]
pub struct AllNeighbours {
pub neighbours: HashSet<PeerId>,
}
impl MembershipHandler for AllNeighbours {
type NetworkId = u32;
type Id = PeerId;
fn membership(&self, _self_id: &Self::Id) -> HashSet<Self::NetworkId> {
[0].into_iter().collect()
}
fn is_allowed(&self, _id: &Self::Id) -> bool {
true
}
fn members_of(&self, _network_id: &Self::NetworkId) -> HashSet<Self::Id> {
self.neighbours.clone()
}
}