1
0
mirror of synced 2025-02-15 17:26:40 +00:00

test: calling poll on validator_behaviour

This commit is contained in:
Roman 2024-10-25 16:27:35 +08:00
parent b7e8eb627c
commit ecd45c12e4
No known key found for this signature in database
GPG Key ID: B8FE070B54E11B75

View File

@ -156,7 +156,7 @@ mod tests {
};
use crate::protocols::replication::handler::DaMessage;
use crate::protocols::sampling::behaviour::{SamplingBehaviour, SamplingEvent};
use futures::task::ArcWake;
use futures::task::{waker_ref, ArcWake};
use kzgrs_backend::common::blob::DaBlob;
use kzgrs_backend::common::Column;
use libp2p::identity::Keypair;
@ -357,11 +357,13 @@ mod tests {
.map(|(_, peer_id, _)| peer_id.clone())
.collect::<Vec<_>>();
let addressbook = AddressBook::from_iter(
subnet_1_config
.iter()
.map(|(_, peer_id, addr)| (peer_id.clone(), addr.clone().with_p2p(peer_id.clone()).unwrap())),
);
let addressbook =
AddressBook::from_iter(subnet_1_config.iter().map(|(_, peer_id, addr)| {
(
peer_id.clone(),
addr.clone().with_p2p(peer_id.clone()).unwrap(),
)
}));
let subnet_0_membership = create_membership(num_instances / 2, 0, &subnet_0_ids);
let subnet_1_membership = create_membership(num_instances / 2, 0, &subnet_1_ids);
@ -378,40 +380,29 @@ mod tests {
for i in 0..num_instances / 2 {
let (k, executor_peer, addr) = subnet_0_config[i].clone();
let (k2, validator_peer, addr2) = subnet_1_config[i].clone();
let executor = executor_swarm(addressbook.clone(), k, executor_peer, all_neighbours.clone());
let executor = executor_swarm(
addressbook.clone(),
k,
executor_peer,
all_neighbours.clone(),
);
let validator = validator_swarm(k2, all_neighbours.clone());
executor_swarms.push(executor);
validator_swarms.push(validator);
}
let (validator_key, validator_id, validator_addr) = subnet_1_config[0].clone();
let validator_addr_p2p = validator_addr.clone().with_p2p(validator_id.clone()).unwrap();
let validator_addr_p2p = validator_addr
.clone()
.with_p2p(validator_id.clone())
.unwrap();
let msg_count = 10usize;
let validator_task = async move {
validator_swarms[0].listen_on(validator_addr).unwrap();
let mut res = vec![];
loop {
match validator_swarms[0].next().await {
Some(SwarmEvent::Behaviour(DispersalEvent::IncomingMessage { message })) => {
res.push(message);
}
event => {
info!("Validator event: {event:?}");
}
}
if res.len() == msg_count {
tokio::time::sleep(Duration::from_secs(2)).await;
break;
}
}
res
};
let join_validator = tokio::spawn(validator_task);
validator_swarms[0].listen_on(validator_addr).unwrap();
let validator_behaviour = validator_swarms[0].behaviour_mut();
let executor_disperse_blob_sender = executor_swarms[0].behaviour().blobs_sender();
let msg_count = 10usize;
let executor_poll = async move {
let mut res = vec![];
loop {
@ -423,7 +414,7 @@ mod tests {
}
}
_ = time::sleep(Duration::from_secs(1)) => {
_ = time::sleep(Duration::from_secs(10)) => {
if res.len() < msg_count {error!("Executor timeout reached");}
break;
}
@ -452,9 +443,43 @@ mod tests {
.unwrap();
}
assert_eq!(
executor_task.await.unwrap().len(),
join_validator.await.unwrap().len()
);
tokio::time::sleep(Duration::from_secs(1)).await;
// let mut res = vec![];
// loop {
// match validator_swarms[0].next().await {
// Some(SwarmEvent::Behaviour(DispersalEvent::IncomingMessage { message })) => {
// res.push(message);
// }
// event => {
// info!("Validator event: {event:?}");
// }
// }
// if res.len() == msg_count {
// tokio::time::sleep(Duration::from_secs(2)).await;
// break;
// }
// }
let waker = Arc::new(TestWaker);
let waker_ref = waker_ref(&waker);
let mut cx = Context::from_waker(&waker_ref);
let mut res = vec![];
loop {
match validator_behaviour.poll(&mut cx) {
event => {
info!("Validator collected event: {event:?}");
res.push(event);
}
}
if res.len() == msg_count {
tokio::time::sleep(Duration::from_secs(2)).await;
break;
}
}
executor_task.await.unwrap();
}
}