testing: allow dynamic node start with peers

This commit is contained in:
andrussal 2026-01-15 04:34:06 +01:00
parent 625179b0e9
commit 0fb4eff3c4
6 changed files with 465 additions and 306 deletions

579
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -2,7 +2,9 @@ use std::time::Duration;
use anyhow::Result;
use async_trait::async_trait;
use testing_framework_core::scenario::{Deployer, DynError, RunContext, ScenarioBuilder, Workload};
use testing_framework_core::scenario::{
Deployer, DynError, RunContext, ScenarioBuilder, StartNodeOptions, Workload,
};
use testing_framework_runner_local::LocalDeployer;
use testing_framework_workflows::ScenarioBuilderExt;
use tokio::time::{sleep, timeout};
@ -54,6 +56,55 @@ impl Workload for JoinNodeWorkload {
}
}
/// Workload that dynamically starts a single validator wired to an explicit
/// list of named peers.
struct JoinNodeWithPeersWorkload {
    /// Name handed to the node-control handle when the validator is started.
    name: String,
    /// Peer names forwarded to the node-control handle as start options.
    peers: Vec<String>,
}

impl JoinNodeWithPeersWorkload {
    /// Builds the workload from the joining node's name and the peer names it
    /// should be started with.
    fn new(name: impl Into<String>, peers: Vec<String>) -> Self {
        let name = name.into();
        Self { name, peers }
    }
}
#[async_trait]
impl Workload for JoinNodeWithPeersWorkload {
    /// Fixed workload identifier (independent of the joining node's name).
    fn name(&self) -> &str {
        "dynamic_join_with_peers"
    }

    /// Starts the validator with the configured peers, waits until it reports
    /// a consensus height above zero, then idles for the scenario run duration.
    ///
    /// # Errors
    /// Fails when the run context has no node control, when starting the
    /// validator fails, or when the node is not ready within `READY_TIMEOUT`.
    async fn start(&self, ctx: &RunContext) -> Result<(), DynError> {
        let control = match ctx.node_control() {
            Some(control) => control,
            None => {
                return Err("dynamic join workload requires node control"
                    .to_owned()
                    .into());
            }
        };

        // Delay before joining, so the node is added after the run has begun.
        sleep(START_DELAY).await;

        let start_options = StartNodeOptions {
            peer_names: self.peers.clone(),
        };
        let started = control
            .start_validator_with(&self.name, start_options)
            .await?;
        let api = started.api;

        // Poll until the node reports progress; API errors are treated the
        // same as "not ready yet" and simply retried.
        let wait_until_ready = async {
            while !matches!(api.consensus_info().await, Ok(info) if info.height > 0) {
                sleep(READY_POLL_INTERVAL).await;
            }
        };
        timeout(READY_TIMEOUT, wait_until_ready)
            .await
            .map_err(|_| "dynamic join node did not become ready in time")?;

        // Keep the workload alive for the remainder of the scenario.
        sleep(ctx.run_duration()).await;

        Ok(())
    }
}
#[tokio::test]
#[ignore = "run manually with `cargo test -p runner-examples -- --ignored`"]
async fn dynamic_join_reaches_consensus_liveness() -> Result<()> {
@ -73,3 +124,24 @@ async fn dynamic_join_reaches_consensus_liveness() -> Result<()> {
Ok(())
}
/// Runs a two-validator star topology, dynamically joins a node named
/// "joiner" that is started with "validator-0" as its only peer, and expects
/// consensus liveness over a 60 second run.
#[tokio::test]
#[ignore = "run manually with `cargo test -p runner-examples -- --ignored`"]
async fn dynamic_join_with_peers_reaches_consensus_liveness() -> Result<()> {
    let seed_peers = vec![String::from("validator-0")];
    let join_workload = JoinNodeWithPeersWorkload::new("joiner", seed_peers);

    let mut scenario = ScenarioBuilder::topology_with(|topology| {
        topology.network_star().validators(2).executors(0)
    })
    .enable_node_control()
    .with_workload(join_workload)
    .expect_consensus_liveness()
    .with_run_duration(Duration::from_secs(60))
    .build()?;

    let local_deployer = LocalDeployer::default();
    let scenario_runner = local_deployer.deploy(&scenario).await?;
    let _handle = scenario_runner.run(&mut scenario).await?;

    Ok(())
}

View File

@ -22,6 +22,13 @@ pub struct ObservabilityCapability {
pub grafana_url: Option<Url>,
}
/// Options for dynamically starting a node.
///
/// Passed to the `*_with` start methods of a node-control handle. The derived
/// `Default` produces an empty `peer_names` list.
#[derive(Clone, Debug, Default)]
pub struct StartNodeOptions {
/// Names of nodes to connect to on startup (implementation-defined).
///
/// NOTE(review): how names are resolved, and what an empty list means, is
/// decided by each deployer — confirm against the deployer in use.
pub peer_names: Vec<String>,
}
/// Trait implemented by scenario capability markers to signal whether node
/// control is required.
pub trait RequiresNodeControl {
@ -54,6 +61,22 @@ pub trait NodeControlHandle: Send + Sync {
async fn start_executor(&self, _name: &str) -> Result<StartedNode, DynError> {
Err("start_executor not supported by this deployer".into())
}
async fn start_validator_with(
&self,
_name: &str,
_options: StartNodeOptions,
) -> Result<StartedNode, DynError> {
Err("start_validator_with not supported by this deployer".into())
}
async fn start_executor_with(
&self,
_name: &str,
_options: StartNodeOptions,
) -> Result<StartedNode, DynError> {
Err("start_executor_with not supported by this deployer".into())
}
}
#[derive(Clone)]

View File

@ -13,7 +13,7 @@ pub type DynError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub use capabilities::{
NodeControlCapability, NodeControlHandle, ObservabilityCapability, RequiresNodeControl,
StartedNode,
StartNodeOptions, StartedNode,
};
pub use definition::{
Builder, Scenario, ScenarioBuildError, ScenarioBuilder, TopologyConfigurator,

View File

@ -14,6 +14,7 @@ workspace = true
[dependencies]
async-trait = "0.1"
nomos-libp2p = { workspace = true }
nomos-utils = { workspace = true }
rand = { workspace = true }
testing-framework-config = { workspace = true }

View File

@ -1,6 +1,10 @@
use std::sync::{Arc, Mutex};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use async_trait::async_trait;
use nomos_libp2p::Multiaddr;
use nomos_utils::net::get_available_udp_port;
use rand::Rng as _;
use testing_framework_config::topology::configs::{
@ -9,11 +13,12 @@ use testing_framework_config::topology::configs::{
time,
};
use testing_framework_core::{
node_address_from_port,
nodes::{ApiClient, executor::Executor, validator::Validator},
scenario::{
BlockFeed, BlockFeedTask, Deployer, DynError, Metrics, NodeClients, NodeControlCapability,
NodeControlHandle, RunContext, Runner, Scenario, ScenarioError, StartedNode,
spawn_block_feed,
NodeControlHandle, RunContext, Runner, Scenario, ScenarioError, StartNodeOptions,
StartedNode, spawn_block_feed,
},
topology::{
deployment::{SpawnTopologyError, Topology},
@ -213,6 +218,7 @@ struct LocalNodeControlState {
validator_count: usize,
executor_count: usize,
peer_ports: Vec<u16>,
peer_ports_by_name: HashMap<String, u16>,
validators: Vec<Validator>,
executors: Vec<Executor>,
}
@ -228,11 +234,29 @@ impl NodeControlHandle for LocalNodeControl {
}
async fn start_validator(&self, name: &str) -> Result<StartedNode, DynError> {
self.start_node(NodeRole::Validator, name).await
self.start_node(NodeRole::Validator, name, StartNodeOptions::default())
.await
}
async fn start_executor(&self, name: &str) -> Result<StartedNode, DynError> {
self.start_node(NodeRole::Executor, name).await
self.start_node(NodeRole::Executor, name, StartNodeOptions::default())
.await
}
/// Starts a validator, forwarding the caller-supplied options unchanged to
/// `start_node`.
async fn start_validator_with(
    &self,
    name: &str,
    options: StartNodeOptions,
) -> Result<StartedNode, DynError> {
    let role = NodeRole::Validator;
    self.start_node(role, name, options).await
}
/// Starts an executor, forwarding the caller-supplied options unchanged to
/// `start_node`.
async fn start_executor_with(
    &self,
    name: &str,
    options: StartNodeOptions,
) -> Result<StartedNode, DynError> {
    let role = NodeRole::Executor;
    self.start_node(role, name, options).await
}
}
@ -252,10 +276,23 @@ impl LocalNodeControl {
.map(|node| node.network_port())
.collect::<Vec<_>>();
let peer_ports_by_name = descriptors
.validators()
.iter()
.map(|node| (format!("validator-{}", node.index()), node.network_port()))
.chain(
descriptors
.executors()
.iter()
.map(|node| (format!("executor-{}", node.index()), node.network_port())),
)
.collect();
let state = LocalNodeControlState {
validator_count: descriptors.validators().len(),
executor_count: descriptors.executors().len(),
peer_ports,
peer_ports_by_name,
validators: Vec::new(),
executors: Vec::new(),
};
@ -269,8 +306,13 @@ impl LocalNodeControl {
}
}
async fn start_node(&self, role: NodeRole, name: &str) -> Result<StartedNode, DynError> {
let (peer_ports, node_name) = {
async fn start_node(
&self,
role: NodeRole,
name: &str,
options: StartNodeOptions,
) -> Result<StartedNode, DynError> {
let (peer_ports, peer_ports_by_name, node_name) = {
let state = self.state.lock().expect("local node control lock poisoned");
let index = match role {
NodeRole::Validator => state.validator_count,
@ -288,7 +330,15 @@ impl LocalNodeControl {
format!("{role_label}-{name}")
};
(state.peer_ports.clone(), label)
if state.peer_ports_by_name.contains_key(&label) {
return Err(format!("node name '{label}' already exists").into());
}
(
state.peer_ports.clone(),
state.peer_ports_by_name.clone(),
label,
)
};
let id = random_node_id();
@ -297,7 +347,11 @@ impl LocalNodeControl {
let blend_port = allocate_udp_port("Blend port")?;
let topology = self.descriptors.config();
let initial_peers = build_initial_peers(&topology.network_params, &peer_ports);
let initial_peers = if options.peer_names.is_empty() {
build_initial_peers(&topology.network_params, &peer_ports)
} else {
resolve_peer_names(&peer_ports_by_name, &options.peer_names)?
};
let general_config = build_general_config_for_node(
id,
@ -326,6 +380,9 @@ impl LocalNodeControl {
let mut state = self.state.lock().expect("local node control lock poisoned");
state.peer_ports.push(network_port);
state
.peer_ports_by_name
.insert(node_name.clone(), network_port);
state.validator_count += 1;
state.validators.push(node);
@ -343,6 +400,9 @@ impl LocalNodeControl {
let mut state = self.state.lock().expect("local node control lock poisoned");
state.peer_ports.push(network_port);
state
.peer_ports_by_name
.insert(node_name.clone(), network_port);
state.executor_count += 1;
state.executors.push(node);
@ -358,6 +418,20 @@ impl LocalNodeControl {
}
}
fn resolve_peer_names(
peer_ports_by_name: &HashMap<String, u16>,
peer_names: &[String],
) -> Result<Vec<Multiaddr>, DynError> {
let mut peers = Vec::with_capacity(peer_names.len());
for name in peer_names {
let port = peer_ports_by_name
.get(name)
.ok_or_else(|| format!("unknown peer name '{name}'"))?;
peers.push(node_address_from_port(*port));
}
Ok(peers)
}
fn random_node_id() -> [u8; 32] {
let mut id = [0u8; 32];
rand::thread_rng().fill(&mut id);