Add manual cluster support

andrussal 2026-01-19 08:34:17 +01:00
parent 2178e4edf0
commit 13a75e7818
15 changed files with 904 additions and 270 deletions

Cargo.lock (generated)

@@ -7088,12 +7088,16 @@ name = "testing-framework-runner-local"
version = "0.1.0"
dependencies = [
"async-trait",
"logos-blockchain-executor",
"logos-blockchain-libp2p",
"logos-blockchain-network-service",
"logos-blockchain-node",
"logos-blockchain-utils",
"rand 0.8.5",
"testing-framework-config",
"testing-framework-core",
"thiserror 2.0.17",
"tokio",
"tracing",
]


@@ -3,7 +3,7 @@ use std::time::Duration;
use anyhow::Result;
use async_trait::async_trait;
use testing_framework_core::scenario::{
Deployer, DynError, RunContext, ScenarioBuilder, StartNodeOptions, Workload,
Deployer, DynError, PeerSelection, RunContext, ScenarioBuilder, StartNodeOptions, Workload,
};
use testing_framework_runner_local::LocalDeployer;
use testing_framework_workflows::ScenarioBuilderExt;
@@ -84,7 +84,7 @@ impl Workload for JoinNodeWithPeersWorkload {
sleep(START_DELAY).await;
let options = StartNodeOptions {
peer_names: self.peers.clone(),
peers: PeerSelection::Named(self.peers.clone()),
};
let node = handle.start_validator_with(&self.name, options).await?;
let client = node.api;


@@ -0,0 +1,78 @@
use std::{env, time::Duration};
use anyhow::Result;
use testing_framework_core::{
scenario::{PeerSelection, StartNodeOptions},
topology::config::TopologyConfig,
};
use testing_framework_runner_local::ManualClusterGuard;
use tokio::time::sleep;
use tracing_subscriber::fmt::try_init;
const MAX_HEIGHT_DIFF: u64 = 5;
#[tokio::test]
#[ignore = "run manually with `cargo test -p runner-examples -- --ignored manual_cluster_two_clusters_merge`"]
async fn manual_cluster_two_clusters_merge() -> Result<()> {
let _ = try_init();
let workspace_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.parent()
.expect("examples crate should live under the workspace root");
let circuits_dir = workspace_root.join("testing-framework/assets/stack/circuits");
unsafe {
env::set_var("LOGOS_BLOCKCHAIN_CIRCUITS", circuits_dir);
}
// Required env vars (set on the command line when running this test):
// - `POL_PROOF_DEV_MODE=true`
// - `RUST_LOG=info` (optional)
let config = TopologyConfig::with_node_numbers(2, 0);
let cluster = ManualClusterGuard::from_config(config)?;
// Nodes are stopped automatically when the guard is dropped.
println!("starting validator a");
let validator_a = cluster
.start_validator_with(
"a",
StartNodeOptions {
peers: PeerSelection::None,
},
)
.await?
.api;
println!("waiting briefly before starting c");
sleep(Duration::from_secs(30)).await;
println!("starting validator c -> a");
let validator_c = cluster
.start_validator_with(
"c",
StartNodeOptions {
peers: PeerSelection::Named(vec!["validator-a".to_owned()]),
},
)
.await?
.api;
println!("waiting for network readiness: cluster a,c");
cluster.wait_network_ready().await?;
sleep(Duration::from_secs(5)).await;
let a_info = validator_a.consensus_info().await?;
let c_info = validator_c.consensus_info().await?;
let height_diff = a_info.height.abs_diff(c_info.height);
println!(
"final heights: validator-a={}, validator-c={}, diff={}",
a_info.height, c_info.height, height_diff
);
if height_diff > MAX_HEIGHT_DIFF {
return Err(anyhow::anyhow!(
"height diff too large: {height_diff} > {MAX_HEIGHT_DIFF}"
));
}
Ok(())
}


@@ -327,10 +327,7 @@ build_bundle::prepare_circuits() {
}
build_bundle::build_binaries() {
FEATURES="testing"
if [ -n "${NOMOS_EXTRA_FEATURES:-}" ]; then
FEATURES="${FEATURES},${NOMOS_EXTRA_FEATURES}"
fi
BUILD_FEATURES_LABEL="all"
echo "==> Building binaries (platform=${PLATFORM})"
mkdir -p "${NODE_SRC}"
(
@@ -362,13 +359,13 @@ build_bundle::build_binaries() {
RUSTFLAGS='--cfg feature="pol-dev-mode"' NOMOS_CIRCUITS="${CIRCUITS_DIR}" \
LOGOS_BLOCKCHAIN_CIRCUITS="${CIRCUITS_DIR}" \
RUSTUP_TOOLCHAIN="${BUNDLE_RUSTUP_TOOLCHAIN}" \
cargo build --features "${FEATURES}" \
cargo build --all-features \
-p logos-blockchain-node -p logos-blockchain-executor -p logos-blockchain-cli \
--target-dir "${NODE_TARGET}"
else
RUSTFLAGS='--cfg feature="pol-dev-mode"' NOMOS_CIRCUITS="${CIRCUITS_DIR}" \
LOGOS_BLOCKCHAIN_CIRCUITS="${CIRCUITS_DIR}" \
cargo build --features "${FEATURES}" \
cargo build --all-features \
-p logos-blockchain-node -p logos-blockchain-executor -p logos-blockchain-cli \
--target-dir "${NODE_TARGET}"
fi
@@ -392,7 +389,7 @@ build_bundle::package_bundle() {
echo "nomos_node_git_head=$(git -C "${NODE_SRC}" rev-parse HEAD 2>/dev/null || true)"
fi
echo "platform=${PLATFORM}"
echo "features=${FEATURES}"
echo "features=${BUILD_FEATURES_LABEL}"
} > "${bundle_dir}/artifacts/nomos-bundle-meta.env"
mkdir -p "$(dirname "${OUTPUT}")"
@@ -405,7 +402,7 @@ build_bundle::package_bundle() {
fi
echo "Bundle created at ${OUTPUT}"
if [[ "${FEATURES}" == *profiling* ]]; then
if [[ "${BUILD_FEATURES_LABEL}" == "all" ]] || [[ "${BUILD_FEATURES_LABEL}" == *profiling* ]]; then
cat <<'EOF_PROF'
Profiling endpoints (enabled by --features profiling):
CPU pprof (SVG): curl "http://<node-host>:8722/debug/pprof/profile?seconds=15&format=svg" -o profile.svg


@@ -22,11 +22,30 @@ pub struct ObservabilityCapability {
pub grafana_url: Option<Url>,
}
/// Peer selection strategy for dynamically started nodes.
#[derive(Clone, Debug)]
pub enum PeerSelection {
/// Use the topology default (star/chain/full).
DefaultLayout,
/// Start without any initial peers.
None,
/// Connect to the named peers.
Named(Vec<String>),
}
/// Options for dynamically starting a node.
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug)]
pub struct StartNodeOptions {
/// Names of nodes to connect to on startup (implementation-defined).
pub peer_names: Vec<String>,
/// How to select initial peers on startup.
pub peers: PeerSelection,
}
impl Default for StartNodeOptions {
fn default() -> Self {
Self {
peers: PeerSelection::DefaultLayout,
}
}
}
/// Trait implemented by scenario capability markers to signal whether node
@@ -77,6 +96,10 @@ pub trait NodeControlHandle: Send + Sync {
) -> Result<StartedNode, DynError> {
Err("start_executor_with not supported by this deployer".into())
}
fn node_client(&self, _name: &str) -> Option<ApiClient> {
None
}
}
#[derive(Clone)]

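To make the variants concrete, a minimal sketch of how callers build StartNodeOptions (the "validator-0" name is illustrative; valid names are whatever the deployer has registered):

use testing_framework_core::scenario::{PeerSelection, StartNodeOptions};

// Topology default layout (star/chain/full); identical to StartNodeOptions::default().
let layout = StartNodeOptions { peers: PeerSelection::DefaultLayout };

// No initial peers: the node starts isolated until another node dials it.
let isolated = StartNodeOptions { peers: PeerSelection::None };

// Dial specific peers by registered name.
let named = StartNodeOptions {
    peers: PeerSelection::Named(vec!["validator-0".to_owned()]),
};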

@@ -12,8 +12,8 @@ mod workload;
pub type DynError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub use capabilities::{
NodeControlCapability, NodeControlHandle, ObservabilityCapability, RequiresNodeControl,
StartNodeOptions, StartedNode,
NodeControlCapability, NodeControlHandle, ObservabilityCapability, PeerSelection,
RequiresNodeControl, StartNodeOptions, StartedNode,
};
pub use definition::{
Builder, Scenario, ScenarioBuildError, ScenarioBuilder, TopologyConfigurator,


@@ -140,6 +140,12 @@ impl NodeClients {
let mut guard = self.inner.write().expect("node clients lock poisoned");
guard.executors.push(client);
}
pub fn clear(&self) {
let mut guard = self.inner.write().expect("node clients lock poisoned");
guard.validators.clear();
guard.executors.clear();
}
}
pub struct ClusterClient<'a> {


@@ -14,10 +14,14 @@ workspace = true
[dependencies]
async-trait = "0.1"
logos-blockchain-executor = { workspace = true }
nomos-libp2p = { workspace = true }
nomos-network = { workspace = true }
nomos-node = { workspace = true }
nomos-utils = { workspace = true }
rand = { workspace = true }
testing-framework-config = { workspace = true }
testing-framework-core = { path = "../../core" }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }


@@ -1,3 +1,7 @@
mod manual;
mod node_control;
mod runner;
pub use manual::{ManualCluster, ManualClusterError, ManualClusterGuard};
pub use node_control::{LocalDynamicError, LocalDynamicNodes, LocalDynamicSeed};
pub use runner::{LocalDeployer, LocalDeployerError};


@@ -0,0 +1,129 @@
use std::ops::Deref;
use testing_framework_core::{
nodes::ApiClient,
scenario::{StartNodeOptions, StartedNode},
topology::{
config::{TopologyBuildError, TopologyBuilder, TopologyConfig},
readiness::{ReadinessCheck, ReadinessError},
},
};
use thiserror::Error;
use crate::node_control::{LocalDynamicError, LocalDynamicNodes, ReadinessNode};
mod readiness;
use readiness::ManualNetworkReadiness;
#[derive(Debug, Error)]
pub enum ManualClusterError {
#[error("failed to build topology: {source}")]
Build {
#[source]
source: TopologyBuildError,
},
#[error(transparent)]
Dynamic(#[from] LocalDynamicError),
}
/// Imperative, in-process cluster that can start nodes on demand.
pub struct ManualCluster {
nodes: LocalDynamicNodes,
}
pub struct ManualClusterGuard {
cluster: ManualCluster,
}
impl ManualClusterGuard {
pub fn from_config(config: TopologyConfig) -> Result<Self, ManualClusterError> {
Ok(Self {
cluster: ManualCluster::from_config(config)?,
})
}
}
impl Deref for ManualClusterGuard {
type Target = ManualCluster;
fn deref(&self) -> &Self::Target {
&self.cluster
}
}
impl Drop for ManualClusterGuard {
fn drop(&mut self) {
self.cluster.stop_all();
}
}
impl ManualCluster {
pub fn from_config(config: TopologyConfig) -> Result<Self, ManualClusterError> {
let builder = TopologyBuilder::new(config);
let descriptors = builder
.build()
.map_err(|source| ManualClusterError::Build { source })?;
let nodes = LocalDynamicNodes::new(
descriptors,
testing_framework_core::scenario::NodeClients::default(),
);
Ok(Self { nodes })
}
#[must_use]
pub fn node_client(&self, name: &str) -> Option<ApiClient> {
self.nodes.node_client(name)
}
pub async fn start_validator(&self, name: &str) -> Result<StartedNode, ManualClusterError> {
Ok(self
.nodes
.start_validator_with(name, StartNodeOptions::default())
.await?)
}
pub async fn start_executor(&self, name: &str) -> Result<StartedNode, ManualClusterError> {
Ok(self
.nodes
.start_executor_with(name, StartNodeOptions::default())
.await?)
}
pub async fn start_validator_with(
&self,
name: &str,
options: StartNodeOptions,
) -> Result<StartedNode, ManualClusterError> {
Ok(self.nodes.start_validator_with(name, options).await?)
}
pub async fn start_executor_with(
&self,
name: &str,
options: StartNodeOptions,
) -> Result<StartedNode, ManualClusterError> {
Ok(self.nodes.start_executor_with(name, options).await?)
}
pub fn stop_all(&self) {
self.nodes.stop_all();
}
pub async fn wait_network_ready(&self) -> Result<(), ReadinessError> {
let nodes = self.nodes.readiness_nodes();
if self.is_singleton(&nodes) {
return Ok(());
}
self.wait_nodes_ready(nodes).await
}
fn is_singleton(&self, nodes: &[ReadinessNode]) -> bool {
nodes.len() <= 1
}
async fn wait_nodes_ready(&self, nodes: Vec<ReadinessNode>) -> Result<(), ReadinessError> {
ManualNetworkReadiness::new(nodes).wait().await
}
}

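The guard exists purely for cleanup: Deref passes the full ManualCluster API through, and Drop calls stop_all(), so a test cannot leak node processes on an early return. A minimal usage sketch, mirroring the merge test above:

let cluster = ManualClusterGuard::from_config(TopologyConfig::with_node_numbers(2, 0))?;
let node = cluster.start_validator("a").await?; // Deref: resolves to ManualCluster::start_validator
cluster.wait_network_ready().await?;
// When `cluster` goes out of scope (including on `?` early return), Drop runs stop_all().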

@@ -0,0 +1,74 @@
use std::time::Duration;
use nomos_network::backends::libp2p::Libp2pInfo;
use testing_framework_core::topology::readiness::ReadinessCheck;
use tokio::time::timeout;
use crate::node_control::ReadinessNode;
const NETWORK_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
pub(super) struct ManualNetworkReadiness {
nodes: Vec<ReadinessNode>,
}
impl ManualNetworkReadiness {
pub(super) fn new(nodes: Vec<ReadinessNode>) -> Self {
Self { nodes }
}
}
#[async_trait::async_trait]
impl<'a> ReadinessCheck<'a> for ManualNetworkReadiness {
type Data = Vec<ManualNetworkStatus>;
async fn collect(&'a self) -> Self::Data {
let mut statuses = Vec::with_capacity(self.nodes.len());
for node in &self.nodes {
let result = timeout(NETWORK_REQUEST_TIMEOUT, node.api.network_info())
.await
.map_err(|_| "network_info request timed out".to_owned())
.and_then(|res| res.map_err(|err| err.to_string()));
statuses.push(ManualNetworkStatus {
label: node.label.clone(),
expected_peers: node.expected_peers,
result,
});
}
statuses
}
fn is_ready(&self, data: &Self::Data) -> bool {
data.iter().all(
|status| match (status.expected_peers, status.result.as_ref()) {
(Some(expected), Ok(info)) => info.n_peers >= expected,
_ => false,
},
)
}
fn timeout_message(&self, data: Self::Data) -> String {
let summary = data
.into_iter()
.map(|entry| match entry.result {
Ok(info) => format!(
"{} (peers {}/{})",
entry.label,
info.n_peers,
entry.expected_peers.unwrap_or(0)
),
Err(err) => format!("{} (error: {err})", entry.label),
})
.collect::<Vec<_>>()
.join(", ");
format!("timed out waiting for network readiness: {summary}")
}
}
pub(super) struct ManualNetworkStatus {
label: String,
expected_peers: Option<usize>,
result: Result<Libp2pInfo, String>,
}

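The predicate is deliberately conservative: a node with no derivable peer expectation (expected_peers of None), or whose network_info request failed or timed out, keeps the whole cluster not-ready. A simplified, self-contained sketch of the same rule, with the status types reduced to plain values for illustration:

// (expected peer count, observed n_peers or an error message)
fn all_ready(statuses: &[(Option<usize>, Result<usize, String>)]) -> bool {
    statuses
        .iter()
        .all(|(expected, observed)| matches!((expected, observed), (Some(e), Ok(n)) if n >= e))
}

#[test]
fn readiness_rule() {
    assert!(all_ready(&[(Some(1), Ok(1)), (Some(1), Ok(2))]));
    assert!(!all_ready(&[(Some(1), Ok(1)), (None, Ok(5))])); // no expectation => keep polling
}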

@@ -0,0 +1,130 @@
use std::collections::HashMap;
use nomos_libp2p::Multiaddr;
use nomos_utils::net::get_available_udp_port;
use rand::Rng as _;
use testing_framework_config::topology::configs::{
consensus,
runtime::{build_general_config_for_node, build_initial_peers},
time::GeneralTimeConfig,
};
use testing_framework_core::{
scenario::{PeerSelection, StartNodeOptions},
topology::{
config::TopologyConfig,
configs::GeneralConfig,
generation::{GeneratedNodeConfig, GeneratedTopology, NodeRole},
},
};
use super::LocalDynamicError;
pub(super) fn build_general_config_for(
descriptors: &GeneratedTopology,
base_consensus: &consensus::GeneralConsensusConfig,
base_time: &GeneralTimeConfig,
role: NodeRole,
index: usize,
peer_ports_by_name: &HashMap<String, u16>,
options: &StartNodeOptions,
peer_ports: &[u16],
) -> Result<(GeneralConfig, u16), LocalDynamicError> {
if let Some(node) = descriptor_for(descriptors, role, index) {
let mut config = node.general.clone();
let initial_peers = resolve_initial_peers(
peer_ports_by_name,
options,
&config.network_config.backend.initial_peers,
descriptors,
peer_ports,
)?;
config.network_config.backend.initial_peers = initial_peers;
return Ok((config, node.network_port()));
}
let id = random_node_id();
let network_port = allocate_udp_port("network port")?;
let da_port = allocate_udp_port("DA port")?;
let blend_port = allocate_udp_port("Blend port")?;
let topology = descriptors.config();
let initial_peers =
resolve_initial_peers(peer_ports_by_name, options, &[], descriptors, peer_ports)?;
let general_config = build_general_config_for_node(
id,
network_port,
initial_peers,
da_port,
blend_port,
&topology.consensus_params,
&topology.da_params,
&topology.wallet_config,
base_consensus,
base_time,
)
.map_err(|source| LocalDynamicError::Config { source })?;
Ok((general_config, network_port))
}
fn descriptor_for(
descriptors: &GeneratedTopology,
role: NodeRole,
index: usize,
) -> Option<&GeneratedNodeConfig> {
match role {
NodeRole::Validator => descriptors.validators().get(index),
NodeRole::Executor => descriptors.executors().get(index),
}
}
fn resolve_peer_names(
peer_ports_by_name: &HashMap<String, u16>,
peer_names: &[String],
) -> Result<Vec<Multiaddr>, LocalDynamicError> {
let mut peers = Vec::with_capacity(peer_names.len());
for name in peer_names {
let port =
peer_ports_by_name
.get(name)
.ok_or_else(|| LocalDynamicError::InvalidArgument {
message: format!("unknown peer name '{name}'"),
})?;
peers.push(testing_framework_config::node_address_from_port(*port));
}
Ok(peers)
}
fn resolve_initial_peers(
peer_ports_by_name: &HashMap<String, u16>,
options: &StartNodeOptions,
default_peers: &[Multiaddr],
descriptors: &GeneratedTopology,
peer_ports: &[u16],
) -> Result<Vec<Multiaddr>, LocalDynamicError> {
match &options.peers {
PeerSelection::Named(names) => resolve_peer_names(peer_ports_by_name, names),
PeerSelection::DefaultLayout => {
if !default_peers.is_empty() {
Ok(default_peers.to_vec())
} else {
let topology: &TopologyConfig = descriptors.config();
Ok(build_initial_peers(&topology.network_params, peer_ports))
}
}
PeerSelection::None => Ok(Vec::new()),
}
}
fn random_node_id() -> [u8; 32] {
let mut id = [0u8; 32];
rand::thread_rng().fill(&mut id);
id
}
fn allocate_udp_port(label: &'static str) -> Result<u16, LocalDynamicError> {
get_available_udp_port().ok_or_else(|| LocalDynamicError::PortAllocation {
message: format!("failed to allocate free UDP port for {label}"),
})
}

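resolve_initial_peers is the seam where PeerSelection meets the generated topology. Its decision table, sketched with hypothetical ports (the concrete multiaddrs come from node_address_from_port and are elided here):

// Given peer_ports_by_name = {"validator-0": 4100, "validator-1": 4101}:
//
//   PeerSelection::Named(["validator-1"]) => [addr(4101)]
//   PeerSelection::Named(["validator-9"]) => Err(InvalidArgument: "unknown peer name 'validator-9'")
//   PeerSelection::DefaultLayout          => the node's baked-in default peers if it has any,
//                                            otherwise build_initial_peers(network_params, all known ports)
//   PeerSelection::None                   => []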

@@ -0,0 +1,389 @@
use std::{
collections::{HashMap, HashSet},
sync::Mutex,
};
use logos_blockchain_executor::config::Config as ExecutorConfig;
use nomos_node::Config as ValidatorConfig;
use testing_framework_config::topology::configs::{consensus, time};
use testing_framework_core::{
nodes::{
ApiClient,
executor::{Executor, create_executor_config},
validator::{Validator, create_validator_config},
},
scenario::{DynError, NodeControlHandle, StartNodeOptions, StartedNode},
topology::{
generation::{GeneratedTopology, NodeRole, find_expected_peer_counts},
utils::multiaddr_port,
},
};
use thiserror::Error;
mod config;
mod state;
use config::build_general_config_for;
use state::LocalDynamicState;
use testing_framework_core::scenario::NodeClients;
#[derive(Debug, Error)]
pub enum LocalDynamicError {
#[error("failed to generate node config: {source}")]
Config {
#[source]
source: testing_framework_config::topology::configs::GeneralConfigError,
},
#[error("failed to spawn node: {source}")]
Spawn {
#[source]
source: testing_framework_core::nodes::common::node::SpawnNodeError,
},
#[error("{message}")]
InvalidArgument { message: String },
#[error("{message}")]
PortAllocation { message: String },
}
pub struct LocalDynamicNodes {
descriptors: GeneratedTopology,
base_consensus: consensus::GeneralConsensusConfig,
base_time: time::GeneralTimeConfig,
node_clients: NodeClients,
seed: LocalDynamicSeed,
state: Mutex<LocalDynamicState>,
}
#[derive(Clone, Default)]
pub struct LocalDynamicSeed {
pub validator_count: usize,
pub executor_count: usize,
pub peer_ports: Vec<u16>,
pub peer_ports_by_name: HashMap<String, u16>,
}
impl LocalDynamicSeed {
#[must_use]
pub fn from_topology(descriptors: &GeneratedTopology) -> Self {
let peer_ports = descriptors
.nodes()
.map(|node| node.network_port())
.collect::<Vec<_>>();
let peer_ports_by_name = descriptors
.validators()
.iter()
.map(|node| (format!("validator-{}", node.index()), node.network_port()))
.chain(
descriptors
.executors()
.iter()
.map(|node| (format!("executor-{}", node.index()), node.network_port())),
)
.collect();
Self {
validator_count: descriptors.validators().len(),
executor_count: descriptors.executors().len(),
peer_ports,
peer_ports_by_name,
}
}
}
pub(crate) struct ReadinessNode {
pub(crate) label: String,
pub(crate) expected_peers: Option<usize>,
pub(crate) api: ApiClient,
}
impl LocalDynamicNodes {
pub fn new(descriptors: GeneratedTopology, node_clients: NodeClients) -> Self {
Self::new_with_seed(descriptors, node_clients, LocalDynamicSeed::default())
}
pub fn new_with_seed(
descriptors: GeneratedTopology,
node_clients: NodeClients,
seed: LocalDynamicSeed,
) -> Self {
let base_node = descriptors
.validators()
.first()
.or_else(|| descriptors.executors().first())
.expect("generated topology must include at least one node");
let base_consensus = base_node.general.consensus_config.clone();
let base_time = base_node.general.time_config.clone();
let state = LocalDynamicState {
validator_count: seed.validator_count,
executor_count: seed.executor_count,
peer_ports: seed.peer_ports.clone(),
peer_ports_by_name: seed.peer_ports_by_name.clone(),
clients_by_name: HashMap::new(),
validators: Vec::new(),
executors: Vec::new(),
};
Self {
descriptors,
base_consensus,
base_time,
node_clients,
seed,
state: Mutex::new(state),
}
}
#[must_use]
pub fn node_client(&self, name: &str) -> Option<ApiClient> {
let state = self.state.lock().expect("local dynamic lock poisoned");
state.clients_by_name.get(name).cloned()
}
pub fn stop_all(&self) {
let mut state = self.state.lock().expect("local dynamic lock poisoned");
state.validators.clear();
state.executors.clear();
state.peer_ports.clone_from(&self.seed.peer_ports);
state
.peer_ports_by_name
.clone_from(&self.seed.peer_ports_by_name);
state.clients_by_name.clear();
state.validator_count = self.seed.validator_count;
state.executor_count = self.seed.executor_count;
self.node_clients.clear();
}
pub async fn start_validator_with(
&self,
name: &str,
options: StartNodeOptions,
) -> Result<StartedNode, LocalDynamicError> {
self.start_node(NodeRole::Validator, name, options).await
}
pub async fn start_executor_with(
&self,
name: &str,
options: StartNodeOptions,
) -> Result<StartedNode, LocalDynamicError> {
self.start_node(NodeRole::Executor, name, options).await
}
pub(crate) fn readiness_nodes(&self) -> Vec<ReadinessNode> {
let state = self.state.lock().expect("local dynamic lock poisoned");
let listen_ports = state
.validators
.iter()
.map(|node| node.config().network.backend.swarm.port)
.chain(
state
.executors
.iter()
.map(|node| node.config().network.backend.swarm.port),
)
.collect::<Vec<_>>();
let initial_peer_ports = state
.validators
.iter()
.map(|node| {
node.config()
.network
.backend
.initial_peers
.iter()
.filter_map(multiaddr_port)
.collect::<HashSet<u16>>()
})
.chain(state.executors.iter().map(|node| {
node.config()
.network
.backend
.initial_peers
.iter()
.filter_map(multiaddr_port)
.collect::<HashSet<u16>>()
}))
.collect::<Vec<_>>();
let expected_peer_counts = find_expected_peer_counts(&listen_ports, &initial_peer_ports);
state
.validators
.iter()
.enumerate()
.map(|(idx, node)| ReadinessNode {
label: format!(
"validator#{idx}@{}",
node.config().network.backend.swarm.port
),
expected_peers: expected_peer_counts.get(idx).copied(),
api: node.api().clone(),
})
.chain(state.executors.iter().enumerate().map(|(idx, node)| {
let global_idx = state.validators.len() + idx;
ReadinessNode {
label: format!(
"executor#{idx}@{}",
node.config().network.backend.swarm.port
),
expected_peers: expected_peer_counts.get(global_idx).copied(),
api: node.api().clone(),
}
}))
.collect::<Vec<_>>()
}
async fn start_node(
&self,
role: NodeRole,
name: &str,
options: StartNodeOptions,
) -> Result<StartedNode, LocalDynamicError> {
let (peer_ports, peer_ports_by_name, node_name, index) = {
let state = self.state.lock().expect("local dynamic lock poisoned");
let (index, role_label) = match role {
NodeRole::Validator => (state.validator_count, "validator"),
NodeRole::Executor => (state.executor_count, "executor"),
};
let label = if name.trim().is_empty() {
format!("{role_label}-{index}")
} else {
format!("{role_label}-{name}")
};
if state.peer_ports_by_name.contains_key(&label) {
return Err(LocalDynamicError::InvalidArgument {
message: format!("node name '{label}' already exists"),
});
}
(
state.peer_ports.clone(),
state.peer_ports_by_name.clone(),
label,
index,
)
};
let (general_config, network_port) = build_general_config_for(
&self.descriptors,
&self.base_consensus,
&self.base_time,
role,
index,
&peer_ports_by_name,
&options,
&peer_ports,
)?;
let api_client = match role {
NodeRole::Validator => {
let config = create_validator_config(general_config);
self.spawn_and_register_validator(&node_name, network_port, config)
.await?
}
NodeRole::Executor => {
let config = create_executor_config(general_config);
self.spawn_and_register_executor(&node_name, network_port, config)
.await?
}
};
Ok(StartedNode {
name: node_name,
role,
api: api_client,
})
}
async fn spawn_and_register_validator(
&self,
node_name: &str,
network_port: u16,
config: ValidatorConfig,
) -> Result<ApiClient, LocalDynamicError> {
let node = Validator::spawn(config, node_name)
.await
.map_err(|source| LocalDynamicError::Spawn { source })?;
let client = node.api().clone();
self.node_clients.add_validator(client.clone());
let mut state = self.state.lock().expect("local dynamic lock poisoned");
state.register_validator(node_name, network_port, client.clone(), node);
Ok(client)
}
async fn spawn_and_register_executor(
&self,
node_name: &str,
network_port: u16,
config: ExecutorConfig,
) -> Result<ApiClient, LocalDynamicError> {
let node = Executor::spawn(config, node_name)
.await
.map_err(|source| LocalDynamicError::Spawn { source })?;
let client = node.api().clone();
self.node_clients.add_executor(client.clone());
let mut state = self.state.lock().expect("local dynamic lock poisoned");
state.register_executor(node_name, network_port, client.clone(), node);
Ok(client)
}
}
#[async_trait::async_trait]
impl NodeControlHandle for LocalDynamicNodes {
async fn restart_validator(&self, _index: usize) -> Result<(), DynError> {
Err("local deployer does not support restart_validator".into())
}
async fn restart_executor(&self, _index: usize) -> Result<(), DynError> {
Err("local deployer does not support restart_executor".into())
}
async fn start_validator(&self, name: &str) -> Result<StartedNode, DynError> {
self.start_validator_with(name, StartNodeOptions::default())
.await
.map_err(|err| err.into())
}
async fn start_executor(&self, name: &str) -> Result<StartedNode, DynError> {
self.start_executor_with(name, StartNodeOptions::default())
.await
.map_err(|err| err.into())
}
async fn start_validator_with(
&self,
name: &str,
options: StartNodeOptions,
) -> Result<StartedNode, DynError> {
self.start_validator_with(name, options)
.await
.map_err(|err| err.into())
}
async fn start_executor_with(
&self,
name: &str,
options: StartNodeOptions,
) -> Result<StartedNode, DynError> {
self.start_executor_with(name, options)
.await
.map_err(|err| err.into())
}
fn node_client(&self, name: &str) -> Option<ApiClient> {
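// Not recursion: inherent methods win over trait methods in resolution here,
// so this delegates to `LocalDynamicNodes::node_client` above.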
self.node_client(name)
}
}

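Two details of start_node matter for callers: a duplicate label is rejected before any port is allocated, and labels follow a fixed scheme that PeerSelection::Named later resolves against. The rule, distilled into a hypothetical free function for illustration:

fn label_for(role_label: &str, name: &str, index: usize) -> String {
    if name.trim().is_empty() {
        format!("{role_label}-{index}") // anonymous start: positional label
    } else {
        format!("{role_label}-{name}") // explicit start: name-addressable label
    }
}

// label_for("validator", "a", 2) == "validator-a"
// label_for("executor",  "", 2) == "executor-2"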

@@ -0,0 +1,46 @@
use std::collections::HashMap;
use testing_framework_core::nodes::{ApiClient, executor::Executor, validator::Validator};
pub(crate) struct LocalDynamicState {
pub(crate) validator_count: usize,
pub(crate) executor_count: usize,
pub(crate) peer_ports: Vec<u16>,
pub(crate) peer_ports_by_name: HashMap<String, u16>,
pub(crate) clients_by_name: HashMap<String, ApiClient>,
pub(crate) validators: Vec<Validator>,
pub(crate) executors: Vec<Executor>,
}
impl LocalDynamicState {
fn register_common(&mut self, node_name: &str, network_port: u16, client: ApiClient) {
self.peer_ports.push(network_port);
self.peer_ports_by_name
.insert(node_name.to_string(), network_port);
self.clients_by_name.insert(node_name.to_string(), client);
}
pub(super) fn register_validator(
&mut self,
node_name: &str,
network_port: u16,
client: ApiClient,
node: Validator,
) {
self.register_common(node_name, network_port, client);
self.validator_count += 1;
self.validators.push(node);
}
pub(super) fn register_executor(
&mut self,
node_name: &str,
network_port: u16,
client: ApiClient,
node: Executor,
) {
self.register_common(node_name, network_port, client);
self.executor_count += 1;
self.executors.push(node);
}
}

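register_common is what makes a freshly spawned node addressable: its port is appended to peer_ports (feeding DefaultLayout peer construction) and keyed by label in peer_ports_by_name (feeding Named lookups). The merge test relies on exactly this chaining:

cluster
    .start_validator_with("a", StartNodeOptions { peers: PeerSelection::None })
    .await?;
// state now maps "validator-a" to its network port, so a later start can dial it:
cluster
    .start_validator_with(
        "c",
        StartNodeOptions { peers: PeerSelection::Named(vec!["validator-a".to_owned()]) },
    )
    .await?;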

@@ -1,34 +1,20 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use std::sync::Arc;
use async_trait::async_trait;
use nomos_libp2p::Multiaddr;
use nomos_utils::net::get_available_udp_port;
use rand::Rng as _;
use testing_framework_config::topology::configs::{
consensus,
runtime::{build_general_config_for_node, build_initial_peers},
time,
};
use testing_framework_core::{
node_address_from_port,
nodes::{ApiClient, executor::Executor, validator::Validator},
scenario::{
BlockFeed, BlockFeedTask, Deployer, DynError, Metrics, NodeClients, NodeControlCapability,
NodeControlHandle, RunContext, Runner, Scenario, ScenarioError, StartNodeOptions,
StartedNode, spawn_block_feed,
RunContext, Runner, Scenario, ScenarioError, spawn_block_feed,
},
topology::{
deployment::{SpawnTopologyError, Topology},
generation::{GeneratedTopology, NodeRole},
readiness::ReadinessError,
},
};
use thiserror::Error;
use tracing::{debug, info};
use crate::node_control::{LocalDynamicNodes, LocalDynamicSeed};
/// Spawns validators and executors as local processes, reusing the existing
/// integration harness.
#[derive(Clone)]
@@ -115,9 +101,10 @@ impl Deployer<NodeControlCapability> for LocalDeployer {
let topology = Self::prepare_topology(scenario).await?;
let node_clients = NodeClients::from_topology(scenario.topology(), &topology);
let node_control = Arc::new(LocalNodeControl::new(
let node_control = Arc::new(LocalDynamicNodes::new_with_seed(
scenario.topology().clone(),
node_clients.clone(),
LocalDynamicSeed::from_topology(scenario.topology()),
));
let (block_feed, block_feed_guard) = spawn_block_feed_with(&node_clients).await?;
@@ -205,240 +192,3 @@ fn workload_error(source: impl Into<DynError>) -> LocalDeployerError {
source: source.into(),
}
}
struct LocalNodeControl {
descriptors: GeneratedTopology,
node_clients: NodeClients,
base_consensus: consensus::GeneralConsensusConfig,
base_time: time::GeneralTimeConfig,
state: Mutex<LocalNodeControlState>,
}
struct LocalNodeControlState {
validator_count: usize,
executor_count: usize,
peer_ports: Vec<u16>,
peer_ports_by_name: HashMap<String, u16>,
validators: Vec<Validator>,
executors: Vec<Executor>,
}
#[async_trait]
impl NodeControlHandle for LocalNodeControl {
async fn restart_validator(&self, _index: usize) -> Result<(), DynError> {
Err("local deployer does not support restart_validator".into())
}
async fn restart_executor(&self, _index: usize) -> Result<(), DynError> {
Err("local deployer does not support restart_executor".into())
}
async fn start_validator(&self, name: &str) -> Result<StartedNode, DynError> {
self.start_node(NodeRole::Validator, name, StartNodeOptions::default())
.await
}
async fn start_executor(&self, name: &str) -> Result<StartedNode, DynError> {
self.start_node(NodeRole::Executor, name, StartNodeOptions::default())
.await
}
async fn start_validator_with(
&self,
name: &str,
options: StartNodeOptions,
) -> Result<StartedNode, DynError> {
self.start_node(NodeRole::Validator, name, options).await
}
async fn start_executor_with(
&self,
name: &str,
options: StartNodeOptions,
) -> Result<StartedNode, DynError> {
self.start_node(NodeRole::Executor, name, options).await
}
}
impl LocalNodeControl {
fn new(descriptors: GeneratedTopology, node_clients: NodeClients) -> Self {
let base_node = descriptors
.validators()
.first()
.or_else(|| descriptors.executors().first())
.expect("generated topology must contain at least one node");
let base_consensus = base_node.general.consensus_config.clone();
let base_time = base_node.general.time_config.clone();
let peer_ports = descriptors
.nodes()
.map(|node| node.network_port())
.collect::<Vec<_>>();
let peer_ports_by_name = descriptors
.validators()
.iter()
.map(|node| (format!("validator-{}", node.index()), node.network_port()))
.chain(
descriptors
.executors()
.iter()
.map(|node| (format!("executor-{}", node.index()), node.network_port())),
)
.collect();
let state = LocalNodeControlState {
validator_count: descriptors.validators().len(),
executor_count: descriptors.executors().len(),
peer_ports,
peer_ports_by_name,
validators: Vec::new(),
executors: Vec::new(),
};
Self {
descriptors,
node_clients,
base_consensus,
base_time,
state: Mutex::new(state),
}
}
async fn start_node(
&self,
role: NodeRole,
name: &str,
options: StartNodeOptions,
) -> Result<StartedNode, DynError> {
let (peer_ports, peer_ports_by_name, node_name) = {
let state = self.state.lock().expect("local node control lock poisoned");
let index = match role {
NodeRole::Validator => state.validator_count,
NodeRole::Executor => state.executor_count,
};
let role_label = match role {
NodeRole::Validator => "validator",
NodeRole::Executor => "executor",
};
let label = if name.trim().is_empty() {
format!("{role_label}-{index}")
} else {
format!("{role_label}-{name}")
};
if state.peer_ports_by_name.contains_key(&label) {
return Err(format!("node name '{label}' already exists").into());
}
(
state.peer_ports.clone(),
state.peer_ports_by_name.clone(),
label,
)
};
let id = random_node_id();
let network_port = allocate_udp_port("network port")?;
let da_port = allocate_udp_port("DA port")?;
let blend_port = allocate_udp_port("Blend port")?;
let topology = self.descriptors.config();
let initial_peers = if options.peer_names.is_empty() {
build_initial_peers(&topology.network_params, &peer_ports)
} else {
resolve_peer_names(&peer_ports_by_name, &options.peer_names)?
};
let general_config = build_general_config_for_node(
id,
network_port,
initial_peers,
da_port,
blend_port,
&topology.consensus_params,
&topology.da_params,
&topology.wallet_config,
&self.base_consensus,
&self.base_time,
)?;
let api_client = match role {
NodeRole::Validator => {
let config = testing_framework_core::nodes::validator::create_validator_config(
general_config,
);
let node = Validator::spawn(config, &node_name).await?;
let client = ApiClient::from_urls(node.url(), node.testing_url());
self.node_clients.add_validator(client.clone());
let mut state = self.state.lock().expect("local node control lock poisoned");
state.peer_ports.push(network_port);
state
.peer_ports_by_name
.insert(node_name.clone(), network_port);
state.validator_count += 1;
state.validators.push(node);
client
}
NodeRole::Executor => {
let config =
testing_framework_core::nodes::executor::create_executor_config(general_config);
let node = Executor::spawn(config, &node_name).await?;
let client = ApiClient::from_urls(node.url(), node.testing_url());
self.node_clients.add_executor(client.clone());
let mut state = self.state.lock().expect("local node control lock poisoned");
state.peer_ports.push(network_port);
state
.peer_ports_by_name
.insert(node_name.clone(), network_port);
state.executor_count += 1;
state.executors.push(node);
client
}
};
Ok(StartedNode {
name: node_name,
role,
api: api_client,
})
}
}
fn resolve_peer_names(
peer_ports_by_name: &HashMap<String, u16>,
peer_names: &[String],
) -> Result<Vec<Multiaddr>, DynError> {
let mut peers = Vec::with_capacity(peer_names.len());
for name in peer_names {
let port = peer_ports_by_name
.get(name)
.ok_or_else(|| format!("unknown peer name '{name}'"))?;
peers.push(node_address_from_port(*port));
}
Ok(peers)
}
fn random_node_id() -> [u8; 32] {
let mut id = [0u8; 32];
rand::thread_rng().fill(&mut id);
id
}
fn allocate_udp_port(label: &'static str) -> Result<u16, DynError> {
get_available_udp_port()
.ok_or_else(|| format!("failed to allocate free UDP port for {label}").into())
}