testing: add dynamic node control and config helpers

This commit is contained in:
andrussal 2026-01-14 12:44:31 +01:00
parent 28b788eaac
commit 625179b0e9
23 changed files with 603 additions and 69 deletions

1
.gitignore vendored
View File

@ -21,3 +21,4 @@ testing-framework/assets/stack/kzgrs_test_params/
# Local test artifacts (kept when NOMOS_TESTS_KEEP_LOGS=1)
tests/workflows/.tmp*
tests/workflows/.tmp*/
examples/.tmp*/

4
Cargo.lock generated
View File

@ -6188,6 +6188,7 @@ name = "runner-examples"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"testing-framework-core",
"testing-framework-runner-compose",
"testing-framework-runner-k8s",
@ -7091,6 +7092,9 @@ name = "testing-framework-runner-local"
version = "0.1.0"
dependencies = [
"async-trait",
"logos-blockchain-utils",
"rand 0.8.5",
"testing-framework-config",
"testing-framework-core",
"thiserror 2.0.18",
"tracing",

View File

@ -20,5 +20,8 @@ tokio = { workspace = true, features = ["macros", "ne
tracing = { workspace = true }
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
[dev-dependencies]
async-trait = { workspace = true }
[lints]
workspace = true

View File

@ -18,16 +18,13 @@ impl Expectation for ReachabilityExpectation {
}
async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> {
let client = ctx
.node_clients()
.validator_clients()
.get(self.target_idx)
.ok_or_else(|| {
Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"missing target client",
)) as DynError
})?;
let validators = ctx.node_clients().validator_clients();
let client = validators.get(self.target_idx).ok_or_else(|| {
Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"missing target client",
)) as DynError
})?;
client
.consensus_info()

View File

@ -43,16 +43,13 @@ impl Workload for ReachabilityWorkload {
}
async fn start(&self, ctx: &RunContext) -> Result<(), DynError> {
let client = ctx
.node_clients()
.validator_clients()
.get(self.target_idx)
.ok_or_else(|| {
Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"missing target client",
)) as DynError
})?;
let validators = ctx.node_clients().validator_clients();
let client = validators.get(self.target_idx).ok_or_else(|| {
Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"missing target client",
)) as DynError
})?;
// Lightweight API call to prove reachability.
client

View File

@ -0,0 +1,75 @@
use std::time::Duration;
use anyhow::Result;
use async_trait::async_trait;
use testing_framework_core::scenario::{Deployer, DynError, RunContext, ScenarioBuilder, Workload};
use testing_framework_runner_local::LocalDeployer;
use testing_framework_workflows::ScenarioBuilderExt;
use tokio::time::{sleep, timeout};
use tracing_subscriber::fmt::try_init;
const START_DELAY: Duration = Duration::from_secs(5);
const READY_TIMEOUT: Duration = Duration::from_secs(60);
const READY_POLL_INTERVAL: Duration = Duration::from_secs(2);
struct JoinNodeWorkload {
name: String,
}
impl JoinNodeWorkload {
fn new(name: impl Into<String>) -> Self {
Self { name: name.into() }
}
}
#[async_trait]
impl Workload for JoinNodeWorkload {
fn name(&self) -> &str {
"dynamic_join"
}
async fn start(&self, ctx: &RunContext) -> Result<(), DynError> {
let handle = ctx
.node_control()
.ok_or_else(|| "dynamic join workload requires node control".to_owned())?;
sleep(START_DELAY).await;
let node = handle.start_validator(&self.name).await?;
let client = node.api;
timeout(READY_TIMEOUT, async {
loop {
match client.consensus_info().await {
Ok(info) if info.height > 0 => break,
Ok(_) | Err(_) => sleep(READY_POLL_INTERVAL).await,
}
}
})
.await
.map_err(|_| "dynamic join node did not become ready in time")?;
sleep(ctx.run_duration()).await;
Ok(())
}
}
#[tokio::test]
#[ignore = "run manually with `cargo test -p runner-examples -- --ignored`"]
async fn dynamic_join_reaches_consensus_liveness() -> Result<()> {
let _ = try_init();
let mut scenario =
ScenarioBuilder::topology_with(|t| t.network_star().validators(2).executors(0))
.enable_node_control()
.with_workload(JoinNodeWorkload::new("joiner"))
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(60))
.build()?;
let deployer = LocalDeployer::default();
let runner = deployer.deploy(&scenario).await?;
let _handle = runner.run(&mut scenario).await?;
Ok(())
}

View File

@ -5,6 +5,7 @@ pub mod bootstrap;
pub mod consensus;
pub mod da;
pub mod network;
pub mod runtime;
pub mod time;
pub mod tracing;
pub mod wallet;

View File

@ -108,6 +108,36 @@ pub fn create_network_configs(
.collect())
}
pub fn build_network_config_for_node(
id: [u8; 32],
port: u16,
initial_peers: Vec<Multiaddr>,
) -> Result<GeneralNetworkConfig, NetworkConfigError> {
let mut node_key_bytes = id;
let node_key = ed25519::SecretKey::try_from_bytes(&mut node_key_bytes).map_err(|err| {
NetworkConfigError::NodeKeyFromBytes {
message: err.to_string(),
}
})?;
let swarm_config = SwarmConfig {
node_key,
port,
chain_sync_config: cryptarchia_sync::Config {
peer_response_timeout: PEER_RESPONSE_TIMEOUT,
},
nat_config: nat_settings(port)?,
..default_swarm_config()
};
Ok(GeneralNetworkConfig {
backend: BackendSettings {
initial_peers,
swarm: swarm_config,
},
})
}
fn initial_peers_by_network_layout(
swarm_configs: &[SwarmConfig],
network_params: &NetworkParams,

View File

@ -0,0 +1,147 @@
use std::collections::HashMap;
use key_management_system_service::{backend::preload::PreloadKMSBackendSettings, keys::Key};
use nomos_libp2p::Multiaddr;
use crate::{
node_address_from_port,
nodes::kms::key_id_for_preload_backend,
topology::configs::{
GeneralConfig, GeneralConfigError, api, blend, bootstrap, consensus,
consensus::{ConsensusParams, GeneralConsensusConfig},
da,
da::DaParams,
network,
network::{Libp2pNetworkLayout, NetworkParams},
time, tracing,
wallet::WalletConfig,
},
};
pub fn build_general_config_for_node(
id: [u8; 32],
network_port: u16,
initial_peers: Vec<Multiaddr>,
da_port: u16,
blend_port: u16,
consensus_params: &ConsensusParams,
da_params: &DaParams,
wallet_config: &WalletConfig,
base_consensus: &GeneralConsensusConfig,
time_config: &time::GeneralTimeConfig,
) -> Result<GeneralConfig, GeneralConfigError> {
let consensus_config =
build_consensus_config_for_node(id, consensus_params, wallet_config, base_consensus)?;
let bootstrap_config =
bootstrap::create_bootstrap_configs(&[id], bootstrap::SHORT_PROLONGED_BOOTSTRAP_PERIOD)
.into_iter()
.next()
.ok_or(GeneralConfigError::EmptyParticipants)?;
let da_config = da::try_create_da_configs(&[id], da_params, &[da_port])?
.into_iter()
.next()
.ok_or(GeneralConfigError::EmptyParticipants)?;
let blend_config = blend::create_blend_configs(&[id], &[blend_port])
.into_iter()
.next()
.ok_or(GeneralConfigError::EmptyParticipants)?;
let network_config = network::build_network_config_for_node(id, network_port, initial_peers)?;
let api_config = api::create_api_configs(&[id])?
.into_iter()
.next()
.ok_or(GeneralConfigError::EmptyParticipants)?;
let tracing_config = tracing::create_tracing_configs(&[id])
.into_iter()
.next()
.ok_or(GeneralConfigError::EmptyParticipants)?;
let kms_config = build_kms_config_for_node(&blend_config, &da_config, wallet_config);
Ok(GeneralConfig {
consensus_config,
bootstrapping_config: bootstrap_config,
da_config,
network_config,
blend_config,
api_config,
tracing_config,
time_config: time_config.clone(),
kms_config,
})
}
pub fn build_consensus_config_for_node(
id: [u8; 32],
consensus_params: &ConsensusParams,
wallet_config: &WalletConfig,
base: &GeneralConsensusConfig,
) -> Result<GeneralConsensusConfig, GeneralConfigError> {
let mut config = consensus::create_consensus_configs(&[id], consensus_params, wallet_config)?
.into_iter()
.next()
.ok_or(GeneralConfigError::EmptyParticipants)?;
config.genesis_tx = base.genesis_tx.clone();
config.utxos = base.utxos.clone();
config.da_notes = base.da_notes.clone();
config.blend_notes = base.blend_notes.clone();
config.wallet_accounts = base.wallet_accounts.clone();
Ok(config)
}
pub fn build_initial_peers(network_params: &NetworkParams, peer_ports: &[u16]) -> Vec<Multiaddr> {
match network_params.libp2p_network_layout {
Libp2pNetworkLayout::Star => peer_ports
.first()
.map(|port| vec![node_address_from_port(*port)])
.unwrap_or_default(),
Libp2pNetworkLayout::Chain => peer_ports
.last()
.map(|port| vec![node_address_from_port(*port)])
.unwrap_or_default(),
Libp2pNetworkLayout::Full => peer_ports
.iter()
.map(|port| node_address_from_port(*port))
.collect(),
}
}
fn build_kms_config_for_node(
blend_config: &blend::GeneralBlendConfig,
da_config: &da::GeneralDaConfig,
wallet_config: &WalletConfig,
) -> PreloadKMSBackendSettings {
let mut keys = HashMap::from([
(
key_id_for_preload_backend(&Key::Ed25519(blend_config.signer.clone())),
Key::Ed25519(blend_config.signer.clone()),
),
(
key_id_for_preload_backend(&Key::Zk(blend_config.secret_zk_key.clone())),
Key::Zk(blend_config.secret_zk_key.clone()),
),
(
key_id_for_preload_backend(&Key::Ed25519(da_config.signer.clone())),
Key::Ed25519(da_config.signer.clone()),
),
(
key_id_for_preload_backend(&Key::Zk(da_config.secret_zk_key.clone())),
Key::Zk(da_config.secret_zk_key.clone()),
),
]);
for account in &wallet_config.accounts {
let key = Key::Zk(account.secret_key.clone());
let key_id = key_id_for_preload_backend(&key);
keys.entry(key_id).or_insert(key);
}
PreloadKMSBackendSettings { keys }
}

View File

@ -60,10 +60,11 @@ impl Drop for Executor {
}
impl Executor {
pub async fn spawn(config: Config) -> Result<Self, SpawnNodeError> {
pub async fn spawn(config: Config, label: &str) -> Result<Self, SpawnNodeError> {
let log_prefix = format!("{LOGS_PREFIX}-{label}");
let handle = spawn_node(
config,
LOGS_PREFIX,
&log_prefix,
"executor.yaml",
binary_path(),
!*IS_DEBUG_TRACING,

View File

@ -72,10 +72,11 @@ impl Validator {
self.handle.wait_for_exit(timeout).await
}
pub async fn spawn(config: Config) -> Result<Self, SpawnNodeError> {
pub async fn spawn(config: Config, label: &str) -> Result<Self, SpawnNodeError> {
let log_prefix = format!("{LOGS_PREFIX}-{label}");
let handle = spawn_node(
config,
LOGS_PREFIX,
&log_prefix,
"validator.yaml",
binary_path(),
!*IS_DEBUG_TRACING,

View File

@ -2,6 +2,7 @@ use async_trait::async_trait;
use reqwest::Url;
use super::DynError;
use crate::{nodes::ApiClient, topology::generation::NodeRole};
/// Marker type used by scenario builders to request node control support.
#[derive(Clone, Copy, Debug, Default)]
@ -45,4 +46,19 @@ pub trait NodeControlHandle: Send + Sync {
async fn restart_validator(&self, index: usize) -> Result<(), DynError>;
async fn restart_executor(&self, index: usize) -> Result<(), DynError>;
async fn start_validator(&self, _name: &str) -> Result<StartedNode, DynError> {
Err("start_validator not supported by this deployer".into())
}
async fn start_executor(&self, _name: &str) -> Result<StartedNode, DynError> {
Err("start_executor not supported by this deployer".into())
}
}
#[derive(Clone)]
pub struct StartedNode {
pub name: String,
pub role: NodeRole,
pub api: ApiClient,
}

View File

@ -13,6 +13,7 @@ pub type DynError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub use capabilities::{
NodeControlCapability, NodeControlHandle, ObservabilityCapability, RequiresNodeControl,
StartedNode,
};
pub use definition::{
Builder, Scenario, ScenarioBuildError, ScenarioBuilder, TopologyConfigurator,

View File

@ -63,7 +63,7 @@ impl RunContext {
}
#[must_use]
pub fn random_node_client(&self) -> Option<&ApiClient> {
pub fn random_node_client(&self) -> Option<ApiClient> {
self.node_clients.any_client()
}

View File

@ -1,4 +1,7 @@
use std::pin::Pin;
use std::{
pin::Pin,
sync::{Arc, RwLock},
};
use rand::{Rng as _, seq::SliceRandom as _, thread_rng};
@ -11,6 +14,11 @@ use crate::{
/// Collection of API clients for the validator and executor set.
#[derive(Clone, Default)]
pub struct NodeClients {
inner: Arc<RwLock<NodeClientsInner>>,
}
#[derive(Default)]
struct NodeClientsInner {
validators: Vec<ApiClient>,
executors: Vec<ApiClient>,
}
@ -18,10 +26,12 @@ pub struct NodeClients {
impl NodeClients {
#[must_use]
/// Build clients from preconstructed vectors.
pub const fn new(validators: Vec<ApiClient>, executors: Vec<ApiClient>) -> Self {
pub fn new(validators: Vec<ApiClient>, executors: Vec<ApiClient>) -> Self {
Self {
validators,
executors,
inner: Arc::new(RwLock::new(NodeClientsInner {
validators,
executors,
})),
}
}
@ -43,48 +53,65 @@ impl NodeClients {
#[must_use]
/// Validator API clients.
pub fn validator_clients(&self) -> &[ApiClient] {
&self.validators
pub fn validator_clients(&self) -> Vec<ApiClient> {
self.inner
.read()
.expect("node clients lock poisoned")
.validators
.clone()
}
#[must_use]
/// Executor API clients.
pub fn executor_clients(&self) -> &[ApiClient] {
&self.executors
pub fn executor_clients(&self) -> Vec<ApiClient> {
self.inner
.read()
.expect("node clients lock poisoned")
.executors
.clone()
}
#[must_use]
/// Choose a random validator client if present.
pub fn random_validator(&self) -> Option<&ApiClient> {
if self.validators.is_empty() {
pub fn random_validator(&self) -> Option<ApiClient> {
let validators = self.validator_clients();
if validators.is_empty() {
return None;
}
let mut rng = thread_rng();
let idx = rng.gen_range(0..self.validators.len());
self.validators.get(idx)
let idx = rng.gen_range(0..validators.len());
validators.get(idx).cloned()
}
#[must_use]
/// Choose a random executor client if present.
pub fn random_executor(&self) -> Option<&ApiClient> {
if self.executors.is_empty() {
pub fn random_executor(&self) -> Option<ApiClient> {
let executors = self.executor_clients();
if executors.is_empty() {
return None;
}
let mut rng = thread_rng();
let idx = rng.gen_range(0..self.executors.len());
self.executors.get(idx)
let idx = rng.gen_range(0..executors.len());
executors.get(idx).cloned()
}
/// Iterator over all clients.
pub fn all_clients(&self) -> impl Iterator<Item = &ApiClient> {
self.validators.iter().chain(self.executors.iter())
pub fn all_clients(&self) -> Vec<ApiClient> {
let guard = self.inner.read().expect("node clients lock poisoned");
guard
.validators
.iter()
.chain(guard.executors.iter())
.cloned()
.collect()
}
#[must_use]
/// Choose any random client from validators+executors.
pub fn any_client(&self) -> Option<&ApiClient> {
let validator_count = self.validators.len();
let executor_count = self.executors.len();
pub fn any_client(&self) -> Option<ApiClient> {
let guard = self.inner.read().expect("node clients lock poisoned");
let validator_count = guard.validators.len();
let executor_count = guard.executors.len();
let total = validator_count + executor_count;
if total == 0 {
return None;
@ -92,9 +119,9 @@ impl NodeClients {
let mut rng = thread_rng();
let choice = rng.gen_range(0..total);
if choice < validator_count {
self.validators.get(choice)
guard.validators.get(choice).cloned()
} else {
self.executors.get(choice - validator_count)
guard.executors.get(choice - validator_count).cloned()
}
}
@ -103,6 +130,16 @@ impl NodeClients {
pub const fn cluster_client(&self) -> ClusterClient<'_> {
ClusterClient::new(self)
}
pub fn add_validator(&self, client: ApiClient) {
let mut guard = self.inner.write().expect("node clients lock poisoned");
guard.validators.push(client);
}
pub fn add_executor(&self, client: ApiClient) {
let mut guard = self.inner.write().expect("node clients lock poisoned");
guard.executors.push(client);
}
}
pub struct ClusterClient<'a> {
@ -127,7 +164,7 @@ impl<'a> ClusterClient<'a> {
where
E: Into<DynError>,
{
let mut clients: Vec<&ApiClient> = self.node_clients.all_clients().collect();
let mut clients = self.node_clients.all_clients();
if clients.is_empty() {
return Err("cluster client has no api clients".into());
}
@ -135,7 +172,7 @@ impl<'a> ClusterClient<'a> {
clients.shuffle(&mut thread_rng());
let mut last_err = None;
for client in clients {
for client in &clients {
match f(client).await {
Ok(value) => return Ok(value),
Err(err) => last_err = Some(err.into()),

View File

@ -91,13 +91,15 @@ impl Topology {
let mut validators = Vec::new();
for i in 0..n_validators {
let config = create_validator_config(config[i].clone());
validators.push(Validator::spawn(config).await?);
let label = format!("validator-{i}");
validators.push(Validator::spawn(config, &label).await?);
}
let mut executors = Vec::new();
for i in 0..n_executors {
let config = create_executor_config(config[n_validators + i].clone());
executors.push(Executor::spawn(config).await?);
let label = format!("executor-{i}");
executors.push(Executor::spawn(config, &label).await?);
}
Ok((validators, executors))

View File

@ -20,7 +20,6 @@ async fn spawn_block_feed_with(
let block_source_client = node_clients
.random_validator()
.cloned()
.ok_or(ComposeRunnerError::BlockFeedMissing)?;
spawn_block_feed(block_source_client)

View File

@ -338,7 +338,8 @@ fn maybe_print_endpoints(
.unwrap_or_else(|| "<disabled>".to_string())
);
for (idx, client) in node_clients.validator_clients().iter().enumerate() {
let validator_clients = node_clients.validator_clients();
for (idx, client) in validator_clients.iter().enumerate() {
println!(
"TESTNET_PPROF validator_{}={}/debug/pprof/profile?seconds=15&format=proto",
idx,
@ -346,7 +347,8 @@ fn maybe_print_endpoints(
);
}
for (idx, client) in node_clients.executor_clients().iter().enumerate() {
let executor_clients = node_clients.executor_clients();
for (idx, client) in executor_clients.iter().enumerate() {
println!(
"TESTNET_PPROF executor_{}={}/debug/pprof/profile?seconds=15&format=proto",
idx,

View File

@ -14,9 +14,9 @@ pub async fn spawn_block_feed_with(
let block_source_client = node_clients
.validator_clients()
.first()
.into_iter()
.next()
.or_else(|| node_clients.any_client())
.cloned()
.ok_or(K8sRunnerError::BlockFeedMissing)?;
info!("starting block feed");

View File

@ -13,7 +13,10 @@ version = "0.1.0"
workspace = true
[dependencies]
async-trait = "0.1"
testing-framework-core = { path = "../../core" }
thiserror = { workspace = true }
tracing = { workspace = true }
async-trait = "0.1"
nomos-utils = { workspace = true }
rand = { workspace = true }
testing-framework-config = { workspace = true }
testing-framework-core = { path = "../../core" }
thiserror = { workspace = true }
tracing = { workspace = true }

View File

@ -1,11 +1,23 @@
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use nomos_utils::net::get_available_udp_port;
use rand::Rng as _;
use testing_framework_config::topology::configs::{
consensus,
runtime::{build_general_config_for_node, build_initial_peers},
time,
};
use testing_framework_core::{
nodes::{ApiClient, executor::Executor, validator::Validator},
scenario::{
BlockFeed, BlockFeedTask, Deployer, DynError, Metrics, NodeClients, RunContext, Runner,
Scenario, ScenarioError, spawn_block_feed,
BlockFeed, BlockFeedTask, Deployer, DynError, Metrics, NodeClients, NodeControlCapability,
NodeControlHandle, RunContext, Runner, Scenario, ScenarioError, StartedNode,
spawn_block_feed,
},
topology::{
deployment::{SpawnTopologyError, Topology},
generation::{GeneratedTopology, NodeRole},
readiness::ReadinessError,
},
};
@ -82,6 +94,43 @@ impl Deployer<()> for LocalDeployer {
}
}
#[async_trait]
impl Deployer<NodeControlCapability> for LocalDeployer {
type Error = LocalDeployerError;
async fn deploy(
&self,
scenario: &Scenario<NodeControlCapability>,
) -> Result<Runner, Self::Error> {
info!(
validators = scenario.topology().validators().len(),
executors = scenario.topology().executors().len(),
"starting local deployment with node control"
);
let topology = Self::prepare_topology(scenario).await?;
let node_clients = NodeClients::from_topology(scenario.topology(), &topology);
let node_control = Arc::new(LocalNodeControl::new(
scenario.topology().clone(),
node_clients.clone(),
));
let (block_feed, block_feed_guard) = spawn_block_feed_with(&node_clients).await?;
let context = RunContext::new(
scenario.topology().clone(),
Some(topology),
node_clients,
scenario.duration(),
Metrics::empty(),
block_feed,
Some(node_control),
);
Ok(Runner::new(context, Some(Box::new(block_feed_guard))))
}
}
impl LocalDeployer {
#[must_use]
/// Construct a local deployer.
@ -89,7 +138,7 @@ impl LocalDeployer {
Self::default()
}
async fn prepare_topology(scenario: &Scenario<()>) -> Result<Topology, LocalDeployerError> {
async fn prepare_topology<C>(scenario: &Scenario<C>) -> Result<Topology, LocalDeployerError> {
let descriptors = scenario.topology();
info!(
validators = descriptors.validators().len(),
@ -133,7 +182,7 @@ async fn spawn_block_feed_with(
"selecting validator client for local block feed"
);
let Some(block_source_client) = node_clients.random_validator().cloned() else {
let Some(block_source_client) = node_clients.random_validator() else {
return Err(LocalDeployerError::WorkloadFailed {
source: "block feed requires at least one validator".into(),
});
@ -151,3 +200,171 @@ fn workload_error(source: impl Into<DynError>) -> LocalDeployerError {
source: source.into(),
}
}
struct LocalNodeControl {
descriptors: GeneratedTopology,
node_clients: NodeClients,
base_consensus: consensus::GeneralConsensusConfig,
base_time: time::GeneralTimeConfig,
state: Mutex<LocalNodeControlState>,
}
struct LocalNodeControlState {
validator_count: usize,
executor_count: usize,
peer_ports: Vec<u16>,
validators: Vec<Validator>,
executors: Vec<Executor>,
}
#[async_trait]
impl NodeControlHandle for LocalNodeControl {
async fn restart_validator(&self, _index: usize) -> Result<(), DynError> {
Err("local deployer does not support restart_validator".into())
}
async fn restart_executor(&self, _index: usize) -> Result<(), DynError> {
Err("local deployer does not support restart_executor".into())
}
async fn start_validator(&self, name: &str) -> Result<StartedNode, DynError> {
self.start_node(NodeRole::Validator, name).await
}
async fn start_executor(&self, name: &str) -> Result<StartedNode, DynError> {
self.start_node(NodeRole::Executor, name).await
}
}
impl LocalNodeControl {
fn new(descriptors: GeneratedTopology, node_clients: NodeClients) -> Self {
let base_node = descriptors
.validators()
.first()
.or_else(|| descriptors.executors().first())
.expect("generated topology must contain at least one node");
let base_consensus = base_node.general.consensus_config.clone();
let base_time = base_node.general.time_config.clone();
let peer_ports = descriptors
.nodes()
.map(|node| node.network_port())
.collect::<Vec<_>>();
let state = LocalNodeControlState {
validator_count: descriptors.validators().len(),
executor_count: descriptors.executors().len(),
peer_ports,
validators: Vec::new(),
executors: Vec::new(),
};
Self {
descriptors,
node_clients,
base_consensus,
base_time,
state: Mutex::new(state),
}
}
async fn start_node(&self, role: NodeRole, name: &str) -> Result<StartedNode, DynError> {
let (peer_ports, node_name) = {
let state = self.state.lock().expect("local node control lock poisoned");
let index = match role {
NodeRole::Validator => state.validator_count,
NodeRole::Executor => state.executor_count,
};
let role_label = match role {
NodeRole::Validator => "validator",
NodeRole::Executor => "executor",
};
let label = if name.trim().is_empty() {
format!("{role_label}-{index}")
} else {
format!("{role_label}-{name}")
};
(state.peer_ports.clone(), label)
};
let id = random_node_id();
let network_port = allocate_udp_port("network port")?;
let da_port = allocate_udp_port("DA port")?;
let blend_port = allocate_udp_port("Blend port")?;
let topology = self.descriptors.config();
let initial_peers = build_initial_peers(&topology.network_params, &peer_ports);
let general_config = build_general_config_for_node(
id,
network_port,
initial_peers,
da_port,
blend_port,
&topology.consensus_params,
&topology.da_params,
&topology.wallet_config,
&self.base_consensus,
&self.base_time,
)?;
let api_client = match role {
NodeRole::Validator => {
let config = testing_framework_core::nodes::validator::create_validator_config(
general_config,
);
let node = Validator::spawn(config, &node_name).await?;
let client = ApiClient::from_urls(node.url(), node.testing_url());
self.node_clients.add_validator(client.clone());
let mut state = self.state.lock().expect("local node control lock poisoned");
state.peer_ports.push(network_port);
state.validator_count += 1;
state.validators.push(node);
client
}
NodeRole::Executor => {
let config =
testing_framework_core::nodes::executor::create_executor_config(general_config);
let node = Executor::spawn(config, &node_name).await?;
let client = ApiClient::from_urls(node.url(), node.testing_url());
self.node_clients.add_executor(client.clone());
let mut state = self.state.lock().expect("local node control lock poisoned");
state.peer_ports.push(network_port);
state.executor_count += 1;
state.executors.push(node);
client
}
};
Ok(StartedNode {
name: node_name,
role,
api: api_client,
})
}
}
fn random_node_id() -> [u8; 32] {
let mut id = [0u8; 32];
rand::thread_rng().fill(&mut id);
id
}
fn allocate_udp_port(label: &'static str) -> Result<u16, DynError> {
get_available_udp_port()
.ok_or_else(|| format!("failed to allocate free UDP port for {label}").into())
}

View File

@ -90,7 +90,7 @@ impl ConsensusLiveness {
}
fn ensure_participants(ctx: &RunContext) -> Result<(), DynError> {
if ctx.node_clients().all_clients().count() == 0 {
if ctx.node_clients().all_clients().is_empty() {
Err(Box::new(ConsensusLivenessError::MissingParticipants))
} else {
Ok(())
@ -98,7 +98,7 @@ impl ConsensusLiveness {
}
async fn collect_results(ctx: &RunContext) -> LivenessCheck {
let clients: Vec<_> = ctx.node_clients().all_clients().collect();
let clients = ctx.node_clients().all_clients();
let mut samples = Vec::with_capacity(clients.len());
let mut issues = Vec::new();

View File

@ -48,8 +48,8 @@ pub async fn submit_transaction_via_cluster(
);
let node_clients = ctx.node_clients();
let mut validator_clients: Vec<_> = node_clients.validator_clients().iter().collect();
let mut executor_clients: Vec<_> = node_clients.executor_clients().iter().collect();
let mut validator_clients = node_clients.validator_clients();
let mut executor_clients = node_clients.executor_clients();
validator_clients.shuffle(&mut thread_rng());
executor_clients.shuffle(&mut thread_rng());