From ecd45c12e4725291018d1794cc990f60aa8eaa44 Mon Sep 17 00:00:00 2001 From: Roman Date: Fri, 25 Oct 2024 16:27:35 +0800 Subject: [PATCH] test: calling poll on validator_behaviour --- .../dispersal/validator/behaviour.rs | 95 ++++++++++++------- 1 file changed, 60 insertions(+), 35 deletions(-) diff --git a/nomos-da/network/core/src/protocols/dispersal/validator/behaviour.rs b/nomos-da/network/core/src/protocols/dispersal/validator/behaviour.rs index d2ef4554..746e8116 100644 --- a/nomos-da/network/core/src/protocols/dispersal/validator/behaviour.rs +++ b/nomos-da/network/core/src/protocols/dispersal/validator/behaviour.rs @@ -156,7 +156,7 @@ mod tests { }; use crate::protocols::replication::handler::DaMessage; use crate::protocols::sampling::behaviour::{SamplingBehaviour, SamplingEvent}; - use futures::task::ArcWake; + use futures::task::{waker_ref, ArcWake}; use kzgrs_backend::common::blob::DaBlob; use kzgrs_backend::common::Column; use libp2p::identity::Keypair; @@ -357,11 +357,13 @@ mod tests { .map(|(_, peer_id, _)| peer_id.clone()) .collect::>(); - let addressbook = AddressBook::from_iter( - subnet_1_config - .iter() - .map(|(_, peer_id, addr)| (peer_id.clone(), addr.clone().with_p2p(peer_id.clone()).unwrap())), - ); + let addressbook = + AddressBook::from_iter(subnet_1_config.iter().map(|(_, peer_id, addr)| { + ( + peer_id.clone(), + addr.clone().with_p2p(peer_id.clone()).unwrap(), + ) + })); let subnet_0_membership = create_membership(num_instances / 2, 0, &subnet_0_ids); let subnet_1_membership = create_membership(num_instances / 2, 0, &subnet_1_ids); @@ -378,40 +380,29 @@ mod tests { for i in 0..num_instances / 2 { let (k, executor_peer, addr) = subnet_0_config[i].clone(); let (k2, validator_peer, addr2) = subnet_1_config[i].clone(); - let executor = executor_swarm(addressbook.clone(), k, executor_peer, all_neighbours.clone()); + let executor = executor_swarm( + addressbook.clone(), + k, + executor_peer, + all_neighbours.clone(), + ); let validator = validator_swarm(k2, all_neighbours.clone()); executor_swarms.push(executor); validator_swarms.push(validator); } let (validator_key, validator_id, validator_addr) = subnet_1_config[0].clone(); - let validator_addr_p2p = validator_addr.clone().with_p2p(validator_id.clone()).unwrap(); + let validator_addr_p2p = validator_addr + .clone() + .with_p2p(validator_id.clone()) + .unwrap(); - let msg_count = 10usize; - let validator_task = async move { - validator_swarms[0].listen_on(validator_addr).unwrap(); - - let mut res = vec![]; - loop { - match validator_swarms[0].next().await { - Some(SwarmEvent::Behaviour(DispersalEvent::IncomingMessage { message })) => { - res.push(message); - } - event => { - info!("Validator event: {event:?}"); - } - } - if res.len() == msg_count { - tokio::time::sleep(Duration::from_secs(2)).await; - break; - } - } - res - }; - let join_validator = tokio::spawn(validator_task); + validator_swarms[0].listen_on(validator_addr).unwrap(); + let validator_behaviour = validator_swarms[0].behaviour_mut(); let executor_disperse_blob_sender = executor_swarms[0].behaviour().blobs_sender(); + let msg_count = 10usize; let executor_poll = async move { let mut res = vec![]; loop { @@ -423,7 +414,7 @@ mod tests { } } - _ = time::sleep(Duration::from_secs(1)) => { + _ = time::sleep(Duration::from_secs(10)) => { if res.len() < msg_count {error!("Executor timeout reached");} break; } @@ -452,9 +443,43 @@ mod tests { .unwrap(); } - assert_eq!( - executor_task.await.unwrap().len(), - join_validator.await.unwrap().len() - ); + tokio::time::sleep(Duration::from_secs(1)).await; + + // let mut res = vec![]; + // loop { + // match validator_swarms[0].next().await { + // Some(SwarmEvent::Behaviour(DispersalEvent::IncomingMessage { message })) => { + // res.push(message); + // } + // event => { + // info!("Validator event: {event:?}"); + // } + // } + // if res.len() == msg_count { + // tokio::time::sleep(Duration::from_secs(2)).await; + // break; + // } + // } + + let waker = Arc::new(TestWaker); + let waker_ref = waker_ref(&waker); + let mut cx = Context::from_waker(&waker_ref); + + let mut res = vec![]; + loop { + match validator_behaviour.poll(&mut cx) { + event => { + info!("Validator collected event: {event:?}"); + res.push(event); + } + } + + if res.len() == msg_count { + tokio::time::sleep(Duration::from_secs(2)).await; + break; + } + } + + executor_task.await.unwrap(); } }