1
0
mirror of synced 2025-02-10 22:56:51 +00:00

fix: add message counting to validator tasks

This commit is contained in:
Roman 2024-10-28 15:18:59 +08:00
parent 5be2adec5c
commit 47ed7641cb
No known key found for this signature in database
GPG Key ID: B8FE070B54E11B75

View File

@ -153,7 +153,6 @@ mod tests {
use crate::protocols::dispersal::executor::behaviour::{
DispersalExecutorBehaviour, DispersalExecutorEvent,
};
use futures::future::join_all;
use futures::task::ArcWake;
use kzgrs_backend::common::blob::DaBlob;
use kzgrs_backend::common::Column;
@ -343,13 +342,14 @@ mod tests {
.with_writer(TestWriter::default())
.try_init();
let all_instances = 20;
const ALL_INSTANCES: usize = 20;
const MESSAGES_TO_SEND: usize = 10;
let executor_0_config = prepare_swarm_config(all_instances / 4, 0);
let validator_0_config = prepare_swarm_config(all_instances / 4, 1);
let executor_0_config = prepare_swarm_config(ALL_INSTANCES / 4, 0);
let validator_0_config = prepare_swarm_config(ALL_INSTANCES / 4, 1);
let executor_1_config = prepare_swarm_config(all_instances / 4, 2);
let validator_1_config = prepare_swarm_config(all_instances / 4, 3);
let executor_1_config = prepare_swarm_config(ALL_INSTANCES / 4, 2);
let validator_1_config = prepare_swarm_config(ALL_INSTANCES / 4, 3);
let subnet_0_ids = executor_0_config
.iter()
@ -385,8 +385,8 @@ mod tests {
.chain(validator_1_config.iter().map(to_p2p_address)),
);
let subnet_0_membership = create_membership(all_instances / 2, 0, &subnet_0_ids);
let subnet_1_membership = create_membership(all_instances / 2, 0, &subnet_1_ids);
let subnet_0_membership = create_membership(ALL_INSTANCES / 2, 0, &subnet_0_ids);
let subnet_1_membership = create_membership(ALL_INSTANCES / 2, 0, &subnet_1_ids);
let mut all_neighbours = subnet_0_membership;
all_neighbours
@ -399,7 +399,7 @@ mod tests {
let mut executor_1_swarms: Vec<_> = vec![];
let mut validator_1_swarms: Vec<_> = vec![];
for i in 0..all_instances / 4 {
for i in 0..ALL_INSTANCES / 4 {
let (k, executor_peer, _) = executor_0_config[i].clone();
let (k2, _, _) = validator_0_config[i].clone();
let (k3, executor_peer2, _) = executor_1_config[i].clone();
@ -427,20 +427,18 @@ mod tests {
}
// Let validator swarms to listen
for i in 0..all_instances / 4 {
for i in 0..ALL_INSTANCES / 4 {
let (_, _, mut addr) = validator_0_config[i].clone();
validator_0_swarms[i].listen_on(addr).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
(_, _, addr) = validator_1_config[i].clone();
validator_1_swarms[i].listen_on(addr).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
}
// Collect blob message senders from executors
let mut message_senders: Vec<UnboundedSender<(u32, DaBlob)>> = Vec::new();
for i in 0..all_instances / 4 {
for i in 0..ALL_INSTANCES / 4 {
let blob_sender_0 = executor_0_swarms[i].behaviour().blobs_sender();
let blob_sender_1 = executor_1_swarms[i].behaviour().blobs_sender();
message_senders.extend(vec![blob_sender_0, blob_sender_1]);
@ -452,7 +450,7 @@ mod tests {
impl MembershipHandler<NetworkId = u32, Id = PeerId> + Sized + 'static,
>,
>,
) {
) -> usize {
let mut msg_counter = 0;
loop {
tokio::select! {
@ -464,11 +462,12 @@ mod tests {
}
_ = time::sleep(Duration::from_secs(2)) => {
if msg_counter < 10 {error!("Executor timeout reached");}
if msg_counter < MESSAGES_TO_SEND {error!("Executor timeout reached");}
break;
}
}
}
msg_counter
}
async fn run_validator_swarm(
@ -477,8 +476,8 @@ mod tests {
impl MembershipHandler<NetworkId = u32, Id = PeerId> + Sized + 'static,
>,
>,
) {
let mut msg_counter = 0;
) -> (usize, usize) {
let (mut msg_0_counter, mut msg_1_counter) = (0, 0);
loop {
match swarm.next().await {
Some(SwarmEvent::Behaviour(DispersalEvent::IncomingMessage { message })) => {
@ -490,15 +489,14 @@ mod tests {
let deserialized_blob: DaBlob =
bincode::deserialize(&data).unwrap();
assert_eq!(blob_id, deserialized_blob.id());
msg_counter += 1;
if message.subnetwork_id == 0 {
msg_0_counter += 1;
} else {
msg_1_counter += 1;
}
}
None => {}
}
match message.subnetwork_id {
subnet_id => {
assert_eq!(subnet_id, 0);
}
}
}
Some(event) => {
info!("Validator received event: {event:?}");
@ -506,19 +504,22 @@ mod tests {
_ => {}
}
if msg_counter == 10 {
tokio::time::sleep(Duration::from_secs(2)).await;
break;
if msg_0_counter == MESSAGES_TO_SEND || msg_1_counter == MESSAGES_TO_SEND {
tokio::time::sleep(Duration::from_secs(5)).await;
return (msg_0_counter, msg_1_counter)
}
}
}
async fn send_dispersal_messages(disperse_blob_sender: UnboundedSender<(u32, DaBlob)>) {
for i in 0..10 {
info!("Sending blob {i}...");
async fn send_dispersal_messages(
disperse_blob_sender: UnboundedSender<(u32, DaBlob)>,
subnet_id: u32,
) {
for i in 0..MESSAGES_TO_SEND {
info!("Sending blob {i} to subnet {subnet_id} ...");
disperse_blob_sender
.send((
0,
subnet_id,
DaBlob {
column_idx: 0,
column: Column(vec![]),
@ -536,48 +537,47 @@ mod tests {
let mut executor_tasks = vec![];
// Spawn executors
for i in (0..all_instances / 4).rev() {
for i in (0..ALL_INSTANCES / 4).rev() {
let swarm = executor_0_swarms.remove(i);
let executor_poll = async {
run_executor_swarm(swarm).await;
};
let executor_task = tokio::spawn(executor_poll);
executor_tasks.push(executor_task);
let executor_0_poll = async { run_executor_swarm(swarm).await };
let swarm = executor_1_swarms.remove(i);
let executor_poll = async {
run_executor_swarm(swarm).await;
};
let executor_1_poll = async { run_executor_swarm(swarm).await };
let executor_task = tokio::spawn(executor_poll);
executor_tasks.push(executor_task);
executor_tasks.extend(vec![tokio::spawn(executor_0_poll), tokio::spawn(executor_1_poll)]);
}
// Send messages in parallel from all executors
for i in (0..all_instances / 2).rev() {
// Send messages from all executors
for i in (0..ALL_INSTANCES / 2).rev() {
let sender = message_senders.remove(i);
let send_messages_task = async {
send_dispersal_messages(sender).await;
let send_messages_task = async move {
send_dispersal_messages(sender, if i % 2 == 0 { 0 } else { 1 }).await;
};
tokio::spawn(send_messages_task);
}
let mut validator_tasks = vec![];
// Spawn validators
for i in (0..all_instances / 4).rev() {
for i in (0..ALL_INSTANCES / 4).rev() {
let swarm = validator_0_swarms.remove(i);
let validator_poll = async {
let validator_0_poll = async {
run_validator_swarm(swarm).await;
};
tokio::spawn(validator_poll);
let swarm = validator_1_swarms.remove(i);
let validator_poll = async {
let validator_1_poll = async {
run_validator_swarm(swarm).await;
};
tokio::spawn(validator_poll);
validator_tasks.extend(vec![tokio::spawn(validator_0_poll), tokio::spawn(validator_1_poll)]);
}
join_all(executor_tasks).await;
tokio::time::sleep(Duration::from_secs(10)).await;
for task in executor_tasks {
info!("Executor task received: {:?} message dispersal success", task.await.unwrap());
}
}
}