Add unhappy path tests (#247)
* Make timeout configurable

  Add a way to configure the consensus timeout at startup (a minimal sketch of the configurable-timeout pattern follows the commit metadata below).

* Make leader threshold and timeout configurable in tests

* Add tests for the unhappy path

  Add a test for the unhappy path by stopping a node. The remaining peers are
  sufficient to reach a quorum, but the offline node will fail to produce a
  block when its turn as leader comes, triggering the recovery procedure twice
  before the test is considered complete.

* Ignore clippy warning
This commit is contained in:
parent 2b9769b5b7
commit da2dba2e51
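For context, the configurable timeout introduced below relies on serde's field-level default: when `timeout` is absent from an existing config, `default_timeout()` supplies `DEFAULT_TIMEOUT` (30 s). The following is a minimal, self-contained sketch of that pattern only; `ConsensusSettings` is a simplified stand-in for the real `CarnotSettings` (which is generic over the fountain code and overlay), and the use of serde_json is purely illustrative. It assumes serde (with the derive feature) and serde_json as dependencies.

// Minimal sketch of the serde default-timeout pattern used in this commit.
// `ConsensusSettings` is a hypothetical stand-in, not the actual CarnotSettings.
use serde::Deserialize;
use std::time::Duration;

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);

fn default_timeout() -> Duration {
    DEFAULT_TIMEOUT
}

#[derive(Debug, Deserialize)]
struct ConsensusSettings {
    private_key: [u8; 32],
    // Configs written before the field existed fall back to DEFAULT_TIMEOUT.
    #[serde(default = "default_timeout")]
    timeout: Duration,
}

fn main() {
    // A legacy config without a `timeout` field still deserializes.
    let legacy = r#"{ "private_key": [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] }"#;
    let settings: ConsensusSettings = serde_json::from_str(legacy).unwrap();
    assert_eq!(settings.timeout, DEFAULT_TIMEOUT);
    println!("{settings:?}");
}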
@@ -46,8 +46,11 @@ use overwatch_rs::services::{
     ServiceCore, ServiceData, ServiceId,
 };
 
-// TODO: tale this from config
-const TIMEOUT: Duration = Duration::from_secs(60);
+const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
+
+fn default_timeout() -> Duration {
+    DEFAULT_TIMEOUT
+}
 
 // Raw bytes for now, could be a ed25519 public key
 pub type NodeId = PublicKey;
@@ -59,6 +62,8 @@ pub struct CarnotSettings<Fountain: FountainCode, O: Overlay> {
     pub private_key: [u8; 32],
     pub fountain_settings: Fountain::Settings,
     pub overlay_settings: O::Settings,
+    #[serde(default = "default_timeout")]
+    pub timeout: Duration,
 }
 
 impl<Fountain: FountainCode, O: Overlay> Clone for CarnotSettings<Fountain, O> {
@@ -67,6 +72,7 @@ impl<Fountain: FountainCode, O: Overlay> Clone for CarnotSettings<Fountain, O> {
             private_key: self.private_key,
             fountain_settings: self.fountain_settings.clone(),
             overlay_settings: self.overlay_settings.clone(),
+            timeout: self.timeout,
         }
     }
 }
@@ -77,11 +83,13 @@ impl<Fountain: FountainCode, O: Overlay> CarnotSettings<Fountain, O> {
         private_key: [u8; 32],
         fountain_settings: Fountain::Settings,
         overlay_settings: O::Settings,
+        timeout: Duration,
     ) -> Self {
         Self {
             private_key,
             fountain_settings,
             overlay_settings,
+            timeout,
         }
     }
 }
@@ -166,6 +174,7 @@ where
             private_key,
             fountain_settings,
             overlay_settings,
+            timeout,
         } = self.service_state.settings_reader.get_updated_settings();
 
         let overlay = O::new(overlay_settings);
@@ -198,6 +207,7 @@ where
             genesis_block.view - 1,
             &mut task_manager,
             adapter.clone(),
+            timeout,
         )
         .await;
         // we already have the genesis block, no need to wait for it
@@ -236,6 +246,7 @@ where
                     private_key,
                     mempool_relay.clone(),
                     &fountain,
+                    timeout,
                 )
                 .await
             }
@@ -287,6 +298,7 @@ where
         }
     }
 
+    #[allow(clippy::too_many_arguments)]
     async fn process_carnot_event(
         mut carnot: Carnot<O>,
         event: Event<P::Tx>,
@@ -295,6 +307,7 @@ where
         private_key: PrivateKey,
         mempool_relay: OutboundRelay<MempoolMsg<P::Tx>>,
         fountain: &F,
+        timeout: Duration,
     ) -> Carnot<O> {
         let mut output = None;
         let prev_view = carnot.current_view();
@@ -316,7 +329,7 @@ where
                 output = out.map(Output::Send);
                 // keep timeout until the situation is resolved
                 task_manager.push(view, async move {
-                    tokio::time::sleep(TIMEOUT).await;
+                    tokio::time::sleep(timeout).await;
                     Event::LocalTimeout { view }
                 });
             }
@@ -349,8 +362,14 @@ where
 
         let current_view = carnot.current_view();
         if current_view != prev_view {
-            Self::process_view_change(carnot.clone(), prev_view, task_manager, adapter.clone())
-                .await;
+            Self::process_view_change(
+                carnot.clone(),
+                prev_view,
+                task_manager,
+                adapter.clone(),
+                timeout,
+            )
+            .await;
         }
 
         if let Some(output) = output {
@@ -369,6 +388,11 @@ where
         adapter: A,
     ) -> (Carnot<O>, Option<Output<P::Tx>>) {
         tracing::debug!("received proposal {:?}", block);
+        if carnot.highest_voted_view() >= block.header().view {
+            tracing::debug!("already voted for view {}", block.header().view);
+            return (carnot, None);
+        }
+
         let original_block = block;
         let block = original_block.header().clone();
         let self_committee = carnot.self_committee();
@@ -498,8 +522,18 @@ where
         carnot: Carnot<O>,
         timeouts: HashSet<Timeout>,
     ) -> (Carnot<O>, Option<Output<P::Tx>>) {
-        // TODO: filter timeouts upon reception
-        assert!(timeouts.iter().all(|t| t.view == carnot.current_view()));
+        // we might have received a timeout_qc sent by some other node and advanced the view
+        // already, in which case we should ignore the timeout
+        if carnot.current_view() != timeouts.iter().map(|t| t.view).max().unwrap_or(0) {
+            return (carnot, None);
+        }
+
+        assert!(
+            timeouts.iter().all(|t| t.view == carnot.current_view()),
+            "{:?} {}",
+            timeouts.iter().collect::<Vec<_>>(),
+            carnot.current_view(),
+        );
         let high_qc = timeouts
             .iter()
             .map(|t| &t.high_qc)
@@ -548,6 +582,7 @@ where
         prev_view: View,
         task_manager: &mut TaskManager<View, Event<P::Tx>>,
         adapter: A,
+        timeout: Duration,
     ) {
         let current_view = carnot.current_view();
         // First we cancel previous processing view tasks
@@ -555,7 +590,7 @@ where
         tracing::debug!("Advanced view from {prev_view} to {current_view}");
         // View change!
         task_manager.push(current_view, async move {
-            tokio::time::sleep(TIMEOUT).await;
+            tokio::time::sleep(timeout).await;
             Event::LocalTimeout { view: current_view }
         });
         task_manager.push(
@@ -31,6 +31,10 @@ fraction = "0.13"
 name = "test_consensus_happy_path"
 path = "src/tests/happy.rs"
 
+[[test]]
+name = "test_consensus_unhappy_path"
+path = "src/tests/unhappy.rs"
+
 
 [features]
 metrics = ["nomos-node/metrics"]
@@ -1,12 +1,17 @@
 mod nodes;
+
 pub use nodes::NomosNode;
-use once_cell::sync::Lazy;
-use rand::SeedableRng;
-use rand_xoshiro::Xoshiro256PlusPlus;
 
 // std
 use std::fmt::Debug;
 use std::net::TcpListener;
 use std::sync::Mutex;
+use std::time::Duration;
+
+//crates
+use fraction::Fraction;
+use once_cell::sync::Lazy;
+use rand::SeedableRng;
+use rand_xoshiro::Xoshiro256PlusPlus;
 
 static RNG: Lazy<Mutex<Xoshiro256PlusPlus>> =
     Lazy::new(|| Mutex::new(Xoshiro256PlusPlus::seed_from_u64(42)));
@@ -32,5 +37,9 @@ pub trait Node: Sized {
 
 #[derive(Clone, Copy)]
 pub enum SpawnConfig {
-    Star { n_participants: usize },
+    Star {
+        n_participants: usize,
+        threshold: Fraction,
+        timeout: Duration,
+    },
 }
@@ -15,7 +15,7 @@ use nomos_network::{
 use nomos_node::Config;
 use waku_bindings::{Multiaddr, PeerId};
 // crates
-use fraction::{Fraction, One};
+use fraction::Fraction;
 use once_cell::sync::Lazy;
 use rand::Rng;
 use reqwest::Client;
@@ -143,14 +143,18 @@ impl Node for NomosNode {
 
     async fn spawn_nodes(config: SpawnConfig) -> Vec<Self> {
         match config {
-            SpawnConfig::Star { n_participants } => {
+            SpawnConfig::Star {
+                n_participants,
+                threshold,
+                timeout,
+            } => {
                 let mut ids = vec![[0; 32]; n_participants];
                 for id in &mut ids {
                     RNG.lock().unwrap().fill(id);
                 }
                 let mut configs = ids
                     .iter()
-                    .map(|id| create_node_config(ids.clone(), *id))
+                    .map(|id| create_node_config(ids.clone(), *id, threshold, timeout))
                     .collect::<Vec<_>>();
                 let mut nodes = vec![Self::spawn(configs.swap_remove(0)).await];
                 let listening_addr = nodes[0].get_listening_address().await;
@@ -180,7 +184,12 @@ impl Node for NomosNode {
     }
 }
 
-fn create_node_config(nodes: Vec<[u8; 32]>, private_key: [u8; 32]) -> Config {
+fn create_node_config(
+    nodes: Vec<[u8; 32]>,
+    private_key: [u8; 32],
+    threshold: Fraction,
+    timeout: Duration,
+) -> Config {
     let mut config = Config {
         network: NetworkConfig {
             backend: WakuConfig {
@@ -197,8 +206,9 @@ fn create_node_config(nodes: Vec<[u8; 32]>, private_key: [u8; 32]) -> Config {
                 // By setting the leader_threshold to 1 we ensure that all nodes come
                 // online before progressing. This is only necessary until we add a way
                 // to recover poast blocks from other nodes.
-                leader_super_majority_threshold: Some(Fraction::one()),
+                leader_super_majority_threshold: Some(threshold),
             },
+            timeout,
         },
         log: Default::default(),
         http: nomos_http::http::HttpServiceSettings {
@@ -1,5 +1,7 @@
+use fraction::{Fraction, One};
 use futures::stream::{self, StreamExt};
 use std::collections::HashSet;
+use std::time::Duration;
 use tests::{Node, NomosNode, SpawnConfig};
 
 const TARGET_VIEW: i64 = 20;
@@ -45,12 +47,22 @@ async fn happy_test(nodes: Vec<NomosNode>) {
 
 #[tokio::test]
 async fn two_nodes_happy() {
-    let nodes = NomosNode::spawn_nodes(SpawnConfig::Star { n_participants: 2 }).await;
+    let nodes = NomosNode::spawn_nodes(SpawnConfig::Star {
+        n_participants: 2,
+        threshold: Fraction::one(),
+        timeout: Duration::from_secs(10),
+    })
+    .await;
     happy_test(nodes).await;
 }
 
 #[tokio::test]
 async fn ten_nodes_happy() {
-    let nodes = NomosNode::spawn_nodes(SpawnConfig::Star { n_participants: 10 }).await;
+    let nodes = NomosNode::spawn_nodes(SpawnConfig::Star {
+        n_participants: 10,
+        threshold: Fraction::one(),
+        timeout: Duration::from_secs(10),
+    })
+    .await;
     happy_test(nodes).await;
 }
@@ -0,0 +1,54 @@
+use fraction::Fraction;
+use futures::stream::{self, StreamExt};
+use std::collections::HashSet;
+use tests::{Node, NomosNode, SpawnConfig};
+
+const TARGET_VIEW: i64 = 20;
+
+#[tokio::test]
+async fn ten_nodes_one_down() {
+    let mut nodes = NomosNode::spawn_nodes(SpawnConfig::Star {
+        n_participants: 10,
+        threshold: Fraction::new(9u32, 10u32),
+        timeout: std::time::Duration::from_secs(5),
+    })
+    .await;
+    let mut failed_node = nodes.pop().unwrap();
+    failed_node.stop();
+    let timeout = std::time::Duration::from_secs(120);
+    let timeout = tokio::time::sleep(timeout);
+    tokio::select! {
+        _ = timeout => panic!("timed out waiting for nodes to reach view {}", TARGET_VIEW),
+        _ = async { while stream::iter(&nodes)
+            .any(|n| async move { n.consensus_info().await.current_view < TARGET_VIEW })
+            .await
+        {
+            println!(
+                "waiting... {}",
+                stream::iter(&nodes)
+                    .then(|n| async move { format!("{}", n.consensus_info().await.current_view) })
+                    .collect::<Vec<_>>()
+                    .await
+                    .join(" | ")
+            );
+            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+        }
+        } => {}
+    };
+
+    let infos = stream::iter(nodes)
+        .then(|n| async move { n.consensus_info().await })
+        .collect::<Vec<_>>()
+        .await;
+    // check that they have the same block
+    let blocks = infos
+        .iter()
+        .map(|i| {
+            i.safe_blocks
+                .values()
+                .find(|b| b.view == TARGET_VIEW)
+                .unwrap()
+        })
+        .collect::<HashSet<_>>();
+    assert_eq!(blocks.len(), 1);
+}
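For reference, assuming the integration-test crate is the workspace member named `tests` (as the `use tests::{Node, NomosNode, SpawnConfig};` imports suggest), the new target added above could be run on its own with a command along these lines; `-- --nocapture` is only there to surface the "waiting..." progress output:

cargo test -p tests --test test_consensus_unhappy_path -- --nocapture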