diff --git a/nomos-da/network/core/src/protocols/dispersal/validator/behaviour.rs b/nomos-da/network/core/src/protocols/dispersal/validator/behaviour.rs index 0fa9cc78..c519e112 100644 --- a/nomos-da/network/core/src/protocols/dispersal/validator/behaviour.rs +++ b/nomos-da/network/core/src/protocols/dispersal/validator/behaviour.rs @@ -150,11 +150,10 @@ impl + 'static> Netw mod tests { use super::*; use crate::address_book::AddressBook; - use crate::behaviour::executor::ExecutorBehaviour; use crate::protocols::dispersal::executor::behaviour::{ DispersalExecutorBehaviour, DispersalExecutorEvent, }; - use crate::protocols::sampling::behaviour::SamplingBehaviour; + use futures::future::join_all; use futures::task::ArcWake; use kzgrs_backend::common::blob::DaBlob; use kzgrs_backend::common::Column; @@ -163,10 +162,8 @@ mod tests { use libp2p::{identity, quic, PeerId, Swarm}; use nomos_da_messages::common::Blob; use std::collections::{HashMap, HashSet}; - use std::mem; use std::sync::Arc; use std::time::Duration; - use futures::future::join_all; use tokio::sync::mpsc::UnboundedSender; use tokio::time; use tracing::{error, info}; @@ -229,8 +226,6 @@ mod tests { ) -> libp2p::Swarm< DispersalExecutorBehaviour>, > { - info!("Creating executor_swarm with peerID {}", peer_id); - libp2p::SwarmBuilder::with_existing_identity(key) .with_tokio() .with_other_transport(|keypair| quic::tokio::Transport::new(quic::Config::new(keypair))) @@ -405,10 +400,10 @@ mod tests { let mut validator_1_swarms: Vec<_> = vec![]; for i in 0..all_instances / 4 { - let (k, executor_peer, addr) = executor_0_config[i].clone(); - let (k2, _, addr2) = validator_0_config[i].clone(); - let (k3, executor_peer2, addr3) = executor_1_config[i].clone(); - let (k4, _, addr4) = validator_1_config[i].clone(); + let (k, executor_peer, _) = executor_0_config[i].clone(); + let (k2, _, _) = validator_0_config[i].clone(); + let (k3, executor_peer2, _) = executor_1_config[i].clone(); + let (k4, _, _) = validator_1_config[i].clone(); let executor_0 = executor_swarm( validator_addressbook.clone(), @@ -433,17 +428,16 @@ mod tests { // Let validator swarms to listen for i in 0..all_instances / 4 { - let (_, peer_id, addr) = validator_0_config[i].clone(); - let validator_0_addr = addr.clone().with_p2p(peer_id).unwrap(); - validator_0_swarms[i].listen_on(validator_0_addr).unwrap(); + let (_, _, mut addr) = validator_0_config[i].clone(); + validator_0_swarms[i].listen_on(addr).unwrap(); + tokio::time::sleep(Duration::from_secs(1)).await; - let (_, peer_id, addr) = validator_1_config[i].clone(); - let validator_1_addr = addr.clone().with_p2p(peer_id).unwrap(); - validator_1_swarms[i].listen_on(validator_1_addr).unwrap(); + (_, _, addr) = validator_1_config[i].clone(); + validator_1_swarms[i].listen_on(addr).unwrap(); + tokio::time::sleep(Duration::from_secs(1)).await; } - tokio::time::sleep(Duration::from_secs(5)).await; - + // Collect blob message senders from executors let mut message_senders: Vec> = Vec::new(); for i in 0..all_instances / 4 { @@ -453,14 +447,18 @@ mod tests { } async fn run_executor_swarm( - mut swarm: Swarm + Sized + 'static>> + mut swarm: Swarm< + DispersalExecutorBehaviour< + impl MembershipHandler + Sized + 'static, + >, + >, ) { let mut msg_counter = 0; loop { tokio::select! { Some(event) = swarm.next() => { info!("Executor event: {event:?}"); - if let SwarmEvent::Behaviour(DispersalExecutorEvent::DispersalSuccess{blob_id, ..}) = event { + if let SwarmEvent::Behaviour(DispersalExecutorEvent::DispersalSuccess{..}) = event { msg_counter += 1; } } @@ -474,7 +472,11 @@ mod tests { } async fn run_validator_swarm( - mut swarm: Swarm + Sized + 'static>> + mut swarm: Swarm< + DispersalValidatorBehaviour< + impl MembershipHandler + Sized + 'static, + >, + >, ) { let mut msg_counter = 0; loop { @@ -485,7 +487,8 @@ mod tests { // Check data has structure and content as expected match message.blob { Some(Blob { blob_id, data }) => { - let deserialized_blob: DaBlob = bincode::deserialize(&data).unwrap(); + let deserialized_blob: DaBlob = + bincode::deserialize(&data).unwrap(); assert_eq!(blob_id, deserialized_blob.id()); msg_counter += 1; } @@ -508,12 +511,9 @@ mod tests { break; } } - } - async fn send_dispersal_messages( - disperse_blob_sender: UnboundedSender<(u32, DaBlob)> - ) { + async fn send_dispersal_messages(disperse_blob_sender: UnboundedSender<(u32, DaBlob)>) { for i in 0..10 { info!("Sending blob {i}..."); disperse_blob_sender @@ -539,7 +539,7 @@ mod tests { for i in (0..all_instances / 4).rev() { let swarm = executor_0_swarms.remove(i); let executor_poll = async { - run_executor_swarm(swarm); + run_executor_swarm(swarm).await; }; let executor_task = tokio::spawn(executor_poll); @@ -547,18 +547,18 @@ mod tests { let swarm = executor_1_swarms.remove(i); let executor_poll = async { - run_executor_swarm(swarm); + run_executor_swarm(swarm).await; }; let executor_task = tokio::spawn(executor_poll); executor_tasks.push(executor_task); } - // Send messages + // Send messages in parallel from all executors for i in (0..all_instances / 2).rev() { let sender = message_senders.remove(i); let send_messages_task = async { - send_dispersal_messages(sender); + send_dispersal_messages(sender).await; }; tokio::spawn(send_messages_task); } @@ -567,20 +567,17 @@ mod tests { for i in (0..all_instances / 4).rev() { let swarm = validator_0_swarms.remove(i); let validator_poll = async { - run_validator_swarm(swarm); + run_validator_swarm(swarm).await; }; tokio::spawn(validator_poll); let swarm = validator_1_swarms.remove(i); let validator_poll = async { - run_validator_swarm(swarm); + run_validator_swarm(swarm).await; }; tokio::spawn(validator_poll); } - - - join_all(executor_tasks).await; } }