1
0
mirror of synced 2025-02-10 22:56:51 +00:00

fix: add forgotten await and cleanup

This commit is contained in:
Roman 2024-10-28 11:01:01 +08:00
parent 4bb706c6cc
commit 5be2adec5c
No known key found for this signature in database
GPG Key ID: B8FE070B54E11B75

View File

@ -150,11 +150,10 @@ impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> Netw
mod tests {
use super::*;
use crate::address_book::AddressBook;
use crate::behaviour::executor::ExecutorBehaviour;
use crate::protocols::dispersal::executor::behaviour::{
DispersalExecutorBehaviour, DispersalExecutorEvent,
};
use crate::protocols::sampling::behaviour::SamplingBehaviour;
use futures::future::join_all;
use futures::task::ArcWake;
use kzgrs_backend::common::blob::DaBlob;
use kzgrs_backend::common::Column;
@ -163,10 +162,8 @@ mod tests {
use libp2p::{identity, quic, PeerId, Swarm};
use nomos_da_messages::common::Blob;
use std::collections::{HashMap, HashSet};
use std::mem;
use std::sync::Arc;
use std::time::Duration;
use futures::future::join_all;
use tokio::sync::mpsc::UnboundedSender;
use tokio::time;
use tracing::{error, info};
@ -229,8 +226,6 @@ mod tests {
) -> libp2p::Swarm<
DispersalExecutorBehaviour<impl MembershipHandler<NetworkId = u32, Id = PeerId>>,
> {
info!("Creating executor_swarm with peerID {}", peer_id);
libp2p::SwarmBuilder::with_existing_identity(key)
.with_tokio()
.with_other_transport(|keypair| quic::tokio::Transport::new(quic::Config::new(keypair)))
@ -405,10 +400,10 @@ mod tests {
let mut validator_1_swarms: Vec<_> = vec![];
for i in 0..all_instances / 4 {
let (k, executor_peer, addr) = executor_0_config[i].clone();
let (k2, _, addr2) = validator_0_config[i].clone();
let (k3, executor_peer2, addr3) = executor_1_config[i].clone();
let (k4, _, addr4) = validator_1_config[i].clone();
let (k, executor_peer, _) = executor_0_config[i].clone();
let (k2, _, _) = validator_0_config[i].clone();
let (k3, executor_peer2, _) = executor_1_config[i].clone();
let (k4, _, _) = validator_1_config[i].clone();
let executor_0 = executor_swarm(
validator_addressbook.clone(),
@ -433,17 +428,16 @@ mod tests {
// Let validator swarms to listen
for i in 0..all_instances / 4 {
let (_, peer_id, addr) = validator_0_config[i].clone();
let validator_0_addr = addr.clone().with_p2p(peer_id).unwrap();
validator_0_swarms[i].listen_on(validator_0_addr).unwrap();
let (_, _, mut addr) = validator_0_config[i].clone();
validator_0_swarms[i].listen_on(addr).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
let (_, peer_id, addr) = validator_1_config[i].clone();
let validator_1_addr = addr.clone().with_p2p(peer_id).unwrap();
validator_1_swarms[i].listen_on(validator_1_addr).unwrap();
(_, _, addr) = validator_1_config[i].clone();
validator_1_swarms[i].listen_on(addr).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
}
tokio::time::sleep(Duration::from_secs(5)).await;
// Collect blob message senders from executors
let mut message_senders: Vec<UnboundedSender<(u32, DaBlob)>> = Vec::new();
for i in 0..all_instances / 4 {
@ -453,14 +447,18 @@ mod tests {
}
async fn run_executor_swarm(
mut swarm: Swarm<DispersalExecutorBehaviour<impl MembershipHandler<NetworkId=u32, Id=PeerId> + Sized + 'static>>
mut swarm: Swarm<
DispersalExecutorBehaviour<
impl MembershipHandler<NetworkId = u32, Id = PeerId> + Sized + 'static,
>,
>,
) {
let mut msg_counter = 0;
loop {
tokio::select! {
Some(event) = swarm.next() => {
info!("Executor event: {event:?}");
if let SwarmEvent::Behaviour(DispersalExecutorEvent::DispersalSuccess{blob_id, ..}) = event {
if let SwarmEvent::Behaviour(DispersalExecutorEvent::DispersalSuccess{..}) = event {
msg_counter += 1;
}
}
@ -474,7 +472,11 @@ mod tests {
}
async fn run_validator_swarm(
mut swarm: Swarm<DispersalValidatorBehaviour<impl MembershipHandler<NetworkId=u32, Id=PeerId> + Sized + 'static>>
mut swarm: Swarm<
DispersalValidatorBehaviour<
impl MembershipHandler<NetworkId = u32, Id = PeerId> + Sized + 'static,
>,
>,
) {
let mut msg_counter = 0;
loop {
@ -485,7 +487,8 @@ mod tests {
// Check data has structure and content as expected
match message.blob {
Some(Blob { blob_id, data }) => {
let deserialized_blob: DaBlob = bincode::deserialize(&data).unwrap();
let deserialized_blob: DaBlob =
bincode::deserialize(&data).unwrap();
assert_eq!(blob_id, deserialized_blob.id());
msg_counter += 1;
}
@ -508,12 +511,9 @@ mod tests {
break;
}
}
}
async fn send_dispersal_messages(
disperse_blob_sender: UnboundedSender<(u32, DaBlob)>
) {
async fn send_dispersal_messages(disperse_blob_sender: UnboundedSender<(u32, DaBlob)>) {
for i in 0..10 {
info!("Sending blob {i}...");
disperse_blob_sender
@ -539,7 +539,7 @@ mod tests {
for i in (0..all_instances / 4).rev() {
let swarm = executor_0_swarms.remove(i);
let executor_poll = async {
run_executor_swarm(swarm);
run_executor_swarm(swarm).await;
};
let executor_task = tokio::spawn(executor_poll);
@ -547,18 +547,18 @@ mod tests {
let swarm = executor_1_swarms.remove(i);
let executor_poll = async {
run_executor_swarm(swarm);
run_executor_swarm(swarm).await;
};
let executor_task = tokio::spawn(executor_poll);
executor_tasks.push(executor_task);
}
// Send messages
// Send messages in parallel from all executors
for i in (0..all_instances / 2).rev() {
let sender = message_senders.remove(i);
let send_messages_task = async {
send_dispersal_messages(sender);
send_dispersal_messages(sender).await;
};
tokio::spawn(send_messages_task);
}
@ -567,20 +567,17 @@ mod tests {
for i in (0..all_instances / 4).rev() {
let swarm = validator_0_swarms.remove(i);
let validator_poll = async {
run_validator_swarm(swarm);
run_validator_swarm(swarm).await;
};
tokio::spawn(validator_poll);
let swarm = validator_1_swarms.remove(i);
let validator_poll = async {
run_validator_swarm(swarm);
run_validator_swarm(swarm).await;
};
tokio::spawn(validator_poll);
}
join_all(executor_tasks).await;
}
}