From 062be51a4f516dad67fd933e62dc03c032a13ca8 Mon Sep 17 00:00:00 2001 From: Andrus Salumets Date: Fri, 30 Jan 2026 13:05:46 +0100 Subject: [PATCH] Local deployer allows to stop and restart nodes (#16) * Unify local node control and restart support * Add local stop-node support * Use node names for restart/stop control * merge --------- Co-authored-by: hansieodendaal --- Cargo.lock | 1 + .../src/node_control_accessing_control.rs | 4 +- .../doc-snippets/src/node_control_trait.rs | 2 +- scripts/build/build-bundle.sh | 15 +- scripts/run/run-examples.sh | 11 +- testing-framework/core/src/manual.rs | 4 +- .../core/src/nodes/common/node.rs | 6 +- testing-framework/core/src/nodes/node.rs | 46 +++- .../core/src/scenario/capabilities.rs | 23 -- .../core/src/scenario/control.rs | 38 +++ testing-framework/core/src/scenario/mod.rs | 6 +- .../core/src/topology/deployment.rs | 78 +------ .../core/src/topology/generation.rs | 7 - .../deployers/compose/src/docker/control.rs | 12 +- testing-framework/deployers/local/Cargo.toml | 3 + testing-framework/deployers/local/src/lib.rs | 2 +- .../deployers/local/src/manual/mod.rs | 61 ++++- .../local/src/node_control/config.rs | 18 +- .../deployers/local/src/node_control/mod.rs | 221 +++++++++++++++--- .../deployers/local/src/node_control/state.rs | 7 +- .../deployers/local/src/runner.rs | 47 ++-- .../deployers/local/tests/restart.rs | 67 ++++++ .../workflows/src/workloads/chaos.rs | 20 +- 23 files changed, 507 insertions(+), 192 deletions(-) create mode 100644 testing-framework/core/src/scenario/control.rs create mode 100644 testing-framework/deployers/local/tests/restart.rs diff --git a/Cargo.lock b/Cargo.lock index ea66c15..942da1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6569,6 +6569,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tracing", + "tracing-subscriber 0.3.22", ] [[package]] diff --git a/examples/doc-snippets/src/node_control_accessing_control.rs b/examples/doc-snippets/src/node_control_accessing_control.rs index a7cc27d..f4404f3 100644 --- a/examples/doc-snippets/src/node_control_accessing_control.rs +++ b/examples/doc-snippets/src/node_control_accessing_control.rs @@ -11,8 +11,8 @@ impl Workload for RestartWorkload { async fn start(&self, ctx: &RunContext) -> Result<(), DynError> { if let Some(control) = ctx.node_control() { - // Restart the first node (index 0) if supported. - control.restart_node(0).await?; + // Restart the first node by name if supported. 
+ control.restart_node("node-0").await?; } Ok(()) } diff --git a/examples/doc-snippets/src/node_control_trait.rs b/examples/doc-snippets/src/node_control_trait.rs index 98976ca..6064b80 100644 --- a/examples/doc-snippets/src/node_control_trait.rs +++ b/examples/doc-snippets/src/node_control_trait.rs @@ -3,5 +3,5 @@ use testing_framework_core::scenario::DynError; #[async_trait] pub trait NodeControlHandle: Send + Sync { - async fn restart_node(&self, index: usize) -> Result<(), DynError>; + async fn restart_node(&self, name: &str) -> Result<(), DynError>; } diff --git a/scripts/build/build-bundle.sh b/scripts/build/build-bundle.sh index 4a32ede..87d25a7 100755 --- a/scripts/build/build-bundle.sh +++ b/scripts/build/build-bundle.sh @@ -309,11 +309,14 @@ build_bundle::prepare_circuits() { } build_bundle::build_binaries() { - BUILD_FEATURES_LABEL="all" + BUILD_FEATURES_LABEL="all,pol-dev-mode,verification-keys" echo "==> Building binaries (platform=${PLATFORM})" mkdir -p "${NODE_SRC}" ( cd "${NODE_SRC}" + if [ -d "${NODE_TARGET}" ]; then + rm -rf "${NODE_TARGET}" + fi if [ -n "${LOGOS_BLOCKCHAIN_NODE_PATH}" ]; then echo "Using local logos-blockchain-node checkout at ${NODE_SRC} (no fetch/checkout)" else @@ -326,18 +329,16 @@ build_bundle::build_binaries() { git clean -fdx fi - if [ -z "${LOGOS_BLOCKCHAIN_NODE_PATH}" ]; then - build_bundle::apply_nomos_node_patches "${NODE_SRC}" - fi - unset CARGO_FEATURE_BUILD_VERIFICATION_KEY if [ -n "${BUNDLE_RUSTUP_TOOLCHAIN}" ]; then - RUSTFLAGS='--cfg feature="pol-dev-mode"' \ + RUSTFLAGS='--cfg feature="pol-dev-mode" --cfg feature="build-verification-key"' \ + CARGO_FEATURE_BUILD_VERIFICATION_KEY=1 \ RUSTUP_TOOLCHAIN="${BUNDLE_RUSTUP_TOOLCHAIN}" \ cargo build --all-features \ -p logos-blockchain-node \ --target-dir "${NODE_TARGET}" else - RUSTFLAGS='--cfg feature="pol-dev-mode"' \ + RUSTFLAGS='--cfg feature="pol-dev-mode" --cfg feature="build-verification-key"' \ + CARGO_FEATURE_BUILD_VERIFICATION_KEY=1 \ cargo build --all-features \ -p logos-blockchain-node \ --target-dir "${NODE_TARGET}" diff --git a/scripts/run/run-examples.sh b/scripts/run/run-examples.sh index e4af1c0..f34af21 100755 --- a/scripts/run/run-examples.sh +++ b/scripts/run/run-examples.sh @@ -60,6 +60,7 @@ Environment: LOGOS_BLOCKCHAIN_TESTNET_IMAGE_PULL_POLICY K8s imagePullPolicy (default ${DEFAULT_PULL_POLICY_LOCAL}; set to ${DEFAULT_PULL_POLICY_ECR} for --ecr) LOGOS_BLOCKCHAIN_BINARIES_TAR Path to prebuilt binaries tarball (default .tmp/nomos-binaries--.tar.gz) LOGOS_BLOCKCHAIN_CIRCUITS Directory containing circuits assets (defaults to ~/.logos-blockchain-circuits) + CARGO_FEATURE_BUILD_VERIFICATION_KEY Build flag to embed Groth16 verification keys in node binaries (recommended for host) LOGOS_BLOCKCHAIN_SKIP_IMAGE_BUILD Set to 1 to skip rebuilding the compose/k8s image LOGOS_BLOCKCHAIN_FORCE_IMAGE_BUILD Set to 1 to force image rebuild even for k8s ECR mode LOGOS_BLOCKCHAIN_METRICS_QUERY_URL PromQL base URL for the runner process (optional) @@ -301,8 +302,9 @@ run_examples::bundle_matches_expected() { local tar_path="$1" [ -f "${tar_path}" ] || return 1 [ -z "${LOGOS_BLOCKCHAIN_NODE_REV:-}" ] && return 0 + local expected_features="${RUN_EXAMPLES_EXPECTED_BUNDLE_FEATURES:-all,pol-dev-mode,verification-keys}" - local meta tar_rev tar_head + local meta tar_rev tar_head tar_features meta="$(tar -xOzf "${tar_path}" artifacts/nomos-bundle-meta.env 2>/dev/null || true)" if [ -z "${meta}" ]; then echo "Bundle meta missing in ${tar_path}; treating as stale and rebuilding." 
>&2 @@ -310,6 +312,11 @@ run_examples::bundle_matches_expected() { fi tar_rev="$(echo "${meta}" | sed -n 's/^nomos_node_rev=//p' | head -n 1)" tar_head="$(echo "${meta}" | sed -n 's/^nomos_node_git_head=//p' | head -n 1)" + tar_features="$(echo "${meta}" | sed -n 's/^features=//p' | head -n 1)" + if [ -n "${expected_features}" ] && [ "${tar_features}" != "${expected_features}" ]; then + echo "Bundle ${tar_path} features '${tar_features}' do not match expected '${expected_features}'; rebuilding." >&2 + return 1 + fi if [ -n "${tar_rev}" ] && [ "${tar_rev}" != "${LOGOS_BLOCKCHAIN_NODE_REV}" ]; then echo "Bundle ${tar_path} is for logos-blockchain-node rev ${tar_rev}, expected ${LOGOS_BLOCKCHAIN_NODE_REV}; rebuilding." >&2 return 1 @@ -501,6 +508,8 @@ run_examples::run() { if [ "${MODE}" = "host" ]; then run_examples::ensure_circuits + # Ensure Groth16 verification keys are embedded when building local node binaries. + export CARGO_FEATURE_BUILD_VERIFICATION_KEY=1 fi echo "==> Running ${BIN} for ${RUN_SECS}s (mode=${MODE}, image=${IMAGE})" diff --git a/testing-framework/core/src/manual.rs b/testing-framework/core/src/manual.rs index 1f8618b..a0fec65 100644 --- a/testing-framework/core/src/manual.rs +++ b/testing-framework/core/src/manual.rs @@ -1,10 +1,10 @@ use async_trait::async_trait; -use crate::scenario::{DynError, StartNodeOptions, StartedNode}; +use crate::scenario::{DynError, NodeControlHandle, StartNodeOptions, StartedNode}; /// Interface for imperative, deployer-backed manual clusters. #[async_trait] -pub trait ManualClusterHandle: Send + Sync { +pub trait ManualClusterHandle: NodeControlHandle { async fn start_node_with( &self, name: &str, diff --git a/testing-framework/core/src/nodes/common/node.rs b/testing-framework/core/src/nodes/common/node.rs index c119edf..79323ce 100644 --- a/testing-framework/core/src/nodes/common/node.rs +++ b/testing-framework/core/src/nodes/common/node.rs @@ -195,7 +195,7 @@ fn write_node_config(config: &C, config_path: &Path) -> Result<(), }) } -fn spawn_node_process( +pub(crate) fn spawn_node_process( binary_path: &Path, config_path: &Path, workdir: &Path, @@ -213,7 +213,9 @@ fn spawn_node_process( }) } -async fn wait_for_consensus_readiness(api: &ApiClient) -> Result<(), time::error::Elapsed> { +pub(crate) async fn wait_for_consensus_readiness( + api: &ApiClient, +) -> Result<(), time::error::Elapsed> { time::timeout(STARTUP_TIMEOUT, async { loop { if api.consensus_info().await.is_ok() { diff --git a/testing-framework/core/src/nodes/node.rs b/testing-framework/core/src/nodes/node.rs index 837e4ea..80035a3 100644 --- a/testing-framework/core/src/nodes/node.rs +++ b/testing-framework/core/src/nodes/node.rs @@ -13,7 +13,10 @@ use crate::{ common::{ binary::{BinaryConfig, BinaryResolver}, lifecycle::{kill::kill_child, monitor::is_running}, - node::{NodeAddresses, NodeConfigCommon, NodeHandle, SpawnNodeError, spawn_node}, + node::{ + NodeAddresses, NodeConfigCommon, NodeHandle, SpawnNodeError, spawn_node, + spawn_node_process, wait_for_consensus_readiness, + }, }, }, scenario::DynError, @@ -21,6 +24,7 @@ use crate::{ }; const BIN_PATH: &str = "target/debug/logos-blockchain-node"; +const RESTART_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10); fn binary_path() -> PathBuf { let cfg = BinaryConfig { @@ -75,6 +79,12 @@ impl Drop for Node { } impl Node { + /// Return the current process id for the running node. 
+ #[must_use] + pub fn pid(&self) -> u32 { + self.handle.child.id() + } + /// Check if the node process is still running pub fn is_running(&mut self) -> bool { is_running(&mut self.handle.child) @@ -101,6 +111,40 @@ impl Node { Ok(Self { handle }) } + + /// Restart the node process using the existing config and data directory. + pub async fn restart(&mut self) -> Result<(), SpawnNodeError> { + let old_pid = self.pid(); + debug!(old_pid, "restarting node process"); + + kill_child(&mut self.handle.child); + let _ = self.wait_for_exit(RESTART_SHUTDOWN_TIMEOUT).await; + + let config_path = self.handle.tempdir.path().join("node.yaml"); + let child = spawn_node_process(&binary_path(), &config_path, self.handle.tempdir.path())?; + self.handle.child = child; + + let new_pid = self.pid(); + wait_for_consensus_readiness(&self.handle.api) + .await + .map_err(|source| SpawnNodeError::Readiness { source })?; + + info!( + old_pid, + new_pid, "node restart readiness confirmed via consensus_info" + ); + + Ok(()) + } + + /// Stop the node process without restarting it. + pub async fn stop(&mut self) { + let pid = self.pid(); + debug!(pid, "stopping node process"); + + kill_child(&mut self.handle.child); + let _ = self.wait_for_exit(RESTART_SHUTDOWN_TIMEOUT).await; + } } impl NodeConfigCommon for RunConfig { diff --git a/testing-framework/core/src/scenario/capabilities.rs b/testing-framework/core/src/scenario/capabilities.rs index 1dda452..c0e657f 100644 --- a/testing-framework/core/src/scenario/capabilities.rs +++ b/testing-framework/core/src/scenario/capabilities.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use async_trait::async_trait; use reqwest::Url; use super::DynError; @@ -84,28 +83,6 @@ impl RequiresNodeControl for ObservabilityCapability { const REQUIRED: bool = false; } -/// Interface exposed by runners that can restart nodes at runtime. -#[async_trait] -pub trait NodeControlHandle: Send + Sync { - async fn restart_node(&self, index: usize) -> Result<(), DynError>; - - async fn start_node(&self, _name: &str) -> Result { - Err("start_node not supported by this deployer".into()) - } - - async fn start_node_with( - &self, - _name: &str, - _options: StartNodeOptions, - ) -> Result { - Err("start_node_with not supported by this deployer".into()) - } - - fn node_client(&self, _name: &str) -> Option { - None - } -} - #[derive(Clone)] pub struct StartedNode { pub name: String, diff --git a/testing-framework/core/src/scenario/control.rs b/testing-framework/core/src/scenario/control.rs new file mode 100644 index 0000000..682937d --- /dev/null +++ b/testing-framework/core/src/scenario/control.rs @@ -0,0 +1,38 @@ +use async_trait::async_trait; + +use crate::{ + nodes::ApiClient, + scenario::{DynError, StartNodeOptions, StartedNode}, +}; + +/// Deployer-agnostic control surface for runtime node operations. 
+#[async_trait] +pub trait NodeControlHandle: Send + Sync { + async fn restart_node(&self, _name: &str) -> Result<(), DynError> { + Err("restart_node not supported by this deployer".into()) + } + + async fn start_node(&self, _name: &str) -> Result { + Err("start_node not supported by this deployer".into()) + } + + async fn start_node_with( + &self, + _name: &str, + _options: StartNodeOptions, + ) -> Result { + Err("start_node_with not supported by this deployer".into()) + } + + async fn stop_node(&self, _name: &str) -> Result<(), DynError> { + Err("stop_node not supported by this deployer".into()) + } + + fn node_client(&self, _name: &str) -> Option { + None + } + + fn node_pid(&self, _name: &str) -> Option { + None + } +} diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs index 47c7de3..6e348c6 100644 --- a/testing-framework/core/src/scenario/mod.rs +++ b/testing-framework/core/src/scenario/mod.rs @@ -2,6 +2,7 @@ mod capabilities; pub mod cfgsync; +mod control; mod definition; mod expectation; pub mod http_probe; @@ -12,9 +13,10 @@ mod workload; pub type DynError = Box; pub use capabilities::{ - NodeControlCapability, NodeControlHandle, ObservabilityCapability, PeerSelection, - RequiresNodeControl, StartNodeOptions, StartedNode, + NodeControlCapability, ObservabilityCapability, PeerSelection, RequiresNodeControl, + StartNodeOptions, StartedNode, }; +pub use control::NodeControlHandle; pub use definition::{ Builder, Scenario, ScenarioBuildError, ScenarioBuilder, TopologyConfigurator, }; diff --git a/testing-framework/core/src/topology/deployment.rs b/testing-framework/core/src/topology/deployment.rs index bb3aa0b..360e698 100644 --- a/testing-framework/core/src/topology/deployment.rs +++ b/testing-framework/core/src/topology/deployment.rs @@ -1,16 +1,9 @@ use std::collections::HashSet; -use thiserror::Error; - use crate::{ - nodes::{ - common::node::SpawnNodeError, - node::{Node, apply_node_config_patch, create_node_config}, - }, - scenario, + nodes::node::Node, topology::{ - config::{TopologyBuildError, TopologyBuilder, TopologyConfig}, - generation::{GeneratedNodeConfig, find_expected_peer_counts}, + generation::find_expected_peer_counts, readiness::{NetworkReadiness, ReadinessCheck, ReadinessError}, utils::multiaddr_port, }, @@ -21,65 +14,11 @@ pub struct Topology { pub(crate) nodes: Vec, } -pub type DeployedNodes = Vec; - -#[derive(Debug, Error)] -pub enum SpawnTopologyError { - #[error(transparent)] - Build(#[from] TopologyBuildError), - #[error(transparent)] - Node(#[from] SpawnNodeError), - #[error("node config patch failed for node-{index}: {source}")] - ConfigPatch { - index: usize, - source: scenario::DynError, - }, -} - impl Topology { - pub async fn spawn(config: TopologyConfig) -> Result { - let generated = TopologyBuilder::new(config.clone()).build()?; - let nodes = Self::spawn_nodes(generated.nodes()).await?; - - Ok(Self { nodes }) - } - - pub async fn spawn_with_empty_membership( - config: TopologyConfig, - ids: &[[u8; 32]], - blend_ports: &[u16], - ) -> Result { - let generated = TopologyBuilder::new(config.clone()) - .with_ids(ids.to_vec()) - .with_blend_ports(blend_ports.to_vec()) - .build()?; - - let nodes = Self::spawn_nodes(generated.nodes()).await?; - - Ok(Self { nodes }) - } - - pub(crate) async fn spawn_nodes( - nodes: &[GeneratedNodeConfig], - ) -> Result { - let mut spawned = Vec::new(); - for node in nodes { - let mut config = create_node_config(node.general.clone()); - - if let Some(patch) = 
node.config_patch.as_ref() { - config = apply_node_config_patch(config, patch).map_err(|source| { - SpawnTopologyError::ConfigPatch { - index: node.index, - source, - } - })?; - } - - let label = format!("node-{}", node.index); - spawned.push(Node::spawn(config, &label).await?); - } - - Ok(spawned) + /// Construct a topology from already-spawned nodes. + #[must_use] + pub fn from_nodes(nodes: Vec) -> Self { + Self { nodes } } #[must_use] @@ -87,6 +26,11 @@ impl Topology { &self.nodes } + #[must_use] + pub fn into_nodes(self) -> Vec { + self.nodes + } + pub async fn wait_network_ready(&self) -> Result<(), ReadinessError> { let listen_ports = self.node_listen_ports(); if listen_ports.len() <= 1 { diff --git a/testing-framework/core/src/topology/generation.rs b/testing-framework/core/src/topology/generation.rs index 270e4f3..3ea2b2d 100644 --- a/testing-framework/core/src/topology/generation.rs +++ b/testing-framework/core/src/topology/generation.rs @@ -5,7 +5,6 @@ use reqwest::{Client, Url}; use crate::topology::{ config::{NodeConfigPatch, TopologyConfig}, configs::{GeneralConfig, wallet::WalletAccount}, - deployment::{SpawnTopologyError, Topology}, readiness::{HttpNetworkReadiness, ReadinessCheck, ReadinessError}, }; @@ -82,12 +81,6 @@ impl GeneratedTopology { &self.config.wallet_config.accounts } - pub async fn spawn_local(&self) -> Result { - let nodes = Topology::spawn_nodes(self.nodes()).await?; - - Ok(Topology { nodes }) - } - pub async fn wait_remote_readiness( &self, // Node endpoints diff --git a/testing-framework/deployers/compose/src/docker/control.rs b/testing-framework/deployers/compose/src/docker/control.rs index 4a27e02..24d924f 100644 --- a/testing-framework/deployers/compose/src/docker/control.rs +++ b/testing-framework/deployers/compose/src/docker/control.rs @@ -45,13 +45,9 @@ pub struct ComposeNodeControl { #[async_trait::async_trait] impl NodeControlHandle for ComposeNodeControl { - async fn restart_node(&self, index: usize) -> Result<(), DynError> { - restart_compose_service( - &self.compose_file, - &self.project_name, - &format!("node-{index}"), - ) - .await - .map_err(|err| format!("node restart failed: {err}").into()) + async fn restart_node(&self, name: &str) -> Result<(), DynError> { + restart_compose_service(&self.compose_file, &self.project_name, name) + .await + .map_err(|err| format!("node restart failed: {err}").into()) } } diff --git a/testing-framework/deployers/local/Cargo.toml b/testing-framework/deployers/local/Cargo.toml index 46e9b56..32b895a 100644 --- a/testing-framework/deployers/local/Cargo.toml +++ b/testing-framework/deployers/local/Cargo.toml @@ -24,3 +24,6 @@ testing-framework-core = { path = "../../core" } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } + +[dev-dependencies] +tracing-subscriber = "0.3" diff --git a/testing-framework/deployers/local/src/lib.rs b/testing-framework/deployers/local/src/lib.rs index 7bb7b39..a9379bf 100644 --- a/testing-framework/deployers/local/src/lib.rs +++ b/testing-framework/deployers/local/src/lib.rs @@ -3,5 +3,5 @@ mod node_control; mod runner; pub use manual::{LocalManualCluster, ManualClusterError}; -pub use node_control::{LocalDynamicError, LocalDynamicNodes, LocalDynamicSeed}; +pub use node_control::{LocalNodeManager, LocalNodeManagerError, LocalNodeManagerSeed}; pub use runner::{LocalDeployer, LocalDeployerError}; diff --git a/testing-framework/deployers/local/src/manual/mod.rs b/testing-framework/deployers/local/src/manual/mod.rs index f935065..51a75bf 100644 
--- a/testing-framework/deployers/local/src/manual/mod.rs +++ b/testing-framework/deployers/local/src/manual/mod.rs @@ -1,7 +1,7 @@ use testing_framework_core::{ manual::ManualClusterHandle, nodes::ApiClient, - scenario::{DynError, StartNodeOptions, StartedNode}, + scenario::{DynError, NodeControlHandle, StartNodeOptions, StartedNode}, topology::{ config::{TopologyBuildError, TopologyBuilder, TopologyConfig}, readiness::{ReadinessCheck, ReadinessError}, @@ -9,7 +9,7 @@ use testing_framework_core::{ }; use thiserror::Error; -use crate::node_control::{LocalDynamicError, LocalDynamicNodes, ReadinessNode}; +use crate::node_control::{LocalNodeManager, LocalNodeManagerError, ReadinessNode}; mod readiness; @@ -23,12 +23,12 @@ pub enum ManualClusterError { source: TopologyBuildError, }, #[error(transparent)] - Dynamic(#[from] LocalDynamicError), + Dynamic(#[from] LocalNodeManagerError), } /// Imperative, in-process cluster that can start nodes on demand. pub struct LocalManualCluster { - nodes: LocalDynamicNodes, + nodes: LocalNodeManager, } impl LocalManualCluster { @@ -37,7 +37,7 @@ impl LocalManualCluster { let descriptors = builder .build() .map_err(|source| ManualClusterError::Build { source })?; - let nodes = LocalDynamicNodes::new( + let nodes = LocalNodeManager::new( descriptors, testing_framework_core::scenario::NodeClients::default(), ); @@ -49,6 +49,11 @@ impl LocalManualCluster { self.nodes.node_client(name) } + #[must_use] + pub fn node_pid(&self, name: &str) -> Option { + self.nodes.node_pid(name) + } + pub async fn start_node(&self, name: &str) -> Result { Ok(self .nodes @@ -68,6 +73,14 @@ impl LocalManualCluster { self.nodes.stop_all(); } + pub async fn restart_node(&self, name: &str) -> Result<(), ManualClusterError> { + Ok(self.nodes.restart_node(name).await?) + } + + pub async fn stop_node(&self, name: &str) -> Result<(), ManualClusterError> { + Ok(self.nodes.stop_node(name).await?) 
+ } + pub async fn wait_network_ready(&self) -> Result<(), ReadinessError> { let nodes = self.nodes.readiness_nodes(); if self.is_singleton(&nodes) { @@ -92,6 +105,44 @@ impl Drop for LocalManualCluster { } } +#[async_trait::async_trait] +impl NodeControlHandle for LocalManualCluster { + async fn restart_node(&self, name: &str) -> Result<(), DynError> { + self.nodes + .restart_node(name) + .await + .map_err(|err| err.into()) + } + + async fn stop_node(&self, name: &str) -> Result<(), DynError> { + self.nodes.stop_node(name).await.map_err(|err| err.into()) + } + + async fn start_node(&self, name: &str) -> Result { + self.start_node_with(name, StartNodeOptions::default()) + .await + .map_err(|err| err.into()) + } + + async fn start_node_with( + &self, + name: &str, + options: StartNodeOptions, + ) -> Result { + self.start_node_with(name, options) + .await + .map_err(|err| err.into()) + } + + fn node_client(&self, name: &str) -> Option { + self.node_client(name) + } + + fn node_pid(&self, name: &str) -> Option { + self.node_pid(name) + } +} + #[async_trait::async_trait] impl ManualClusterHandle for LocalManualCluster { async fn start_node_with( diff --git a/testing-framework/deployers/local/src/node_control/config.rs b/testing-framework/deployers/local/src/node_control/config.rs index 28673f2..8fe9aec 100644 --- a/testing-framework/deployers/local/src/node_control/config.rs +++ b/testing-framework/deployers/local/src/node_control/config.rs @@ -8,7 +8,7 @@ use testing_framework_config::topology::configs::{ runtime::{build_general_config_for_node, build_initial_peers}, time::GeneralTimeConfig, }; -use testing_framework_core::{ +pub(crate) use testing_framework_core::{ scenario::{PeerSelection, StartNodeOptions}, topology::{ config::{NodeConfigPatch, TopologyConfig}, @@ -17,7 +17,7 @@ use testing_framework_core::{ }, }; -use super::LocalDynamicError; +use super::LocalNodeManagerError; pub(super) fn build_general_config_for( descriptors: &GeneratedTopology, @@ -27,7 +27,7 @@ pub(super) fn build_general_config_for( peer_ports_by_name: &HashMap, options: &StartNodeOptions, peer_ports: &[u16], -) -> Result<(GeneralConfig, u16, Option), LocalDynamicError> { +) -> Result<(GeneralConfig, u16, Option), LocalNodeManagerError> { if let Some(node) = descriptor_for(descriptors, index) { let mut config = node.general.clone(); let initial_peers = resolve_initial_peers( @@ -59,7 +59,7 @@ pub(super) fn build_general_config_for( base_consensus, base_time, ) - .map_err(|source| LocalDynamicError::Config { source })?; + .map_err(|source| LocalNodeManagerError::Config { source })?; Ok((general_config, network_port, None)) } @@ -71,13 +71,13 @@ fn descriptor_for(descriptors: &GeneratedTopology, index: usize) -> Option<&Gene fn resolve_peer_names( peer_ports_by_name: &HashMap, peer_names: &[String], -) -> Result, LocalDynamicError> { +) -> Result, LocalNodeManagerError> { let mut peers = Vec::with_capacity(peer_names.len()); for name in peer_names { let port = peer_ports_by_name .get(name) - .ok_or_else(|| LocalDynamicError::InvalidArgument { + .ok_or_else(|| LocalNodeManagerError::InvalidArgument { message: format!("unknown peer name '{name}'"), })?; peers.push(testing_framework_config::node_address_from_port(*port)); @@ -91,7 +91,7 @@ fn resolve_initial_peers( default_peers: &[Multiaddr], descriptors: &GeneratedTopology, peer_ports: &[u16], -) -> Result, LocalDynamicError> { +) -> Result, LocalNodeManagerError> { match &options.peers { PeerSelection::Named(names) => resolve_peer_names(peer_ports_by_name, names), 
PeerSelection::DefaultLayout => { @@ -112,8 +112,8 @@ fn random_node_id() -> [u8; 32] { id } -fn allocate_udp_port(label: &'static str) -> Result { - get_available_udp_port().ok_or_else(|| LocalDynamicError::PortAllocation { +fn allocate_udp_port(label: &'static str) -> Result { + get_available_udp_port().ok_or_else(|| LocalNodeManagerError::PortAllocation { message: format!("failed to allocate free UDP port for {label}"), }) } diff --git a/testing-framework/deployers/local/src/node_control/mod.rs b/testing-framework/deployers/local/src/node_control/mod.rs index b3978a4..b3b9846 100644 --- a/testing-framework/deployers/local/src/node_control/mod.rs +++ b/testing-framework/deployers/local/src/node_control/mod.rs @@ -12,6 +12,7 @@ use testing_framework_core::{ }, scenario::{DynError, NodeControlHandle, StartNodeOptions, StartedNode}, topology::{ + deployment::Topology, generation::{GeneratedTopology, find_expected_peer_counts}, utils::multiaddr_port, }, @@ -22,11 +23,11 @@ mod config; mod state; use config::build_general_config_for; -use state::LocalDynamicState; +use state::LocalNodeManagerState; use testing_framework_core::scenario::NodeClients; #[derive(Debug, Error)] -pub enum LocalDynamicError { +pub enum LocalNodeManagerError { #[error("failed to generate node config: {source}")] Config { #[source] @@ -43,25 +44,32 @@ pub enum LocalDynamicError { PortAllocation { message: String }, #[error("node config patch failed: {message}")] ConfigPatch { message: String }, + #[error("node name '{name}' is unknown")] + NodeName { name: String }, + #[error("failed to restart node: {source}")] + Restart { + #[source] + source: testing_framework_core::nodes::common::node::SpawnNodeError, + }, } -pub struct LocalDynamicNodes { +pub struct LocalNodeManager { descriptors: GeneratedTopology, base_consensus: consensus::GeneralConsensusConfig, base_time: time::GeneralTimeConfig, node_clients: NodeClients, - seed: LocalDynamicSeed, - state: Mutex, + seed: LocalNodeManagerSeed, + state: Mutex, } #[derive(Clone, Default)] -pub struct LocalDynamicSeed { +pub struct LocalNodeManagerSeed { pub node_count: usize, pub peer_ports: Vec, pub peer_ports_by_name: HashMap, } -impl LocalDynamicSeed { +impl LocalNodeManagerSeed { #[must_use] pub fn from_topology(descriptors: &GeneratedTopology) -> Self { let peer_ports = descriptors @@ -90,15 +98,39 @@ pub(crate) struct ReadinessNode { pub(crate) api: ApiClient, } -impl LocalDynamicNodes { +impl LocalNodeManager { + fn default_label(index: usize) -> String { + format!("node-{index}") + } + + pub async fn spawn_initial_nodes( + descriptors: &GeneratedTopology, + ) -> Result, testing_framework_core::nodes::common::node::SpawnNodeError> { + let mut nodes = Vec::with_capacity(descriptors.nodes().len()); + for node in descriptors.nodes() { + let label = Self::default_label(node.index()); + let config = create_node_config(node.general.clone()); + let spawned = Node::spawn(config, &label).await?; + nodes.push(spawned); + } + + Ok(nodes) + } + + pub async fn spawn_initial_topology( + descriptors: &GeneratedTopology, + ) -> Result { + let nodes = Self::spawn_initial_nodes(descriptors).await?; + Ok(Topology::from_nodes(nodes)) + } pub fn new(descriptors: GeneratedTopology, node_clients: NodeClients) -> Self { - Self::new_with_seed(descriptors, node_clients, LocalDynamicSeed::default()) + Self::new_with_seed(descriptors, node_clients, LocalNodeManagerSeed::default()) } pub fn new_with_seed( descriptors: GeneratedTopology, node_clients: NodeClients, - seed: LocalDynamicSeed, + seed: 
LocalNodeManagerSeed, ) -> Self { let base_node = descriptors .nodes() @@ -108,11 +140,12 @@ impl LocalDynamicNodes { let base_consensus = base_node.general.consensus_config.clone(); let base_time = base_node.general.time_config.clone(); - let state = LocalDynamicState { + let state = LocalNodeManagerState { node_count: seed.node_count, peer_ports: seed.peer_ports.clone(), peer_ports_by_name: seed.peer_ports_by_name.clone(), clients_by_name: HashMap::new(), + indices_by_name: HashMap::new(), nodes: Vec::new(), }; @@ -136,6 +169,22 @@ impl LocalDynamicNodes { state.clients_by_name.get(name).cloned() } + #[must_use] + pub fn node_pid(&self, name: &str) -> Option { + let mut state = self + .state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + + let index = *state.indices_by_name.get(name)?; + let node = state.nodes.get_mut(index)?; + if node.is_running() { + Some(node.pid()) + } else { + None + } + } + pub fn stop_all(&self) { let mut state = self .state @@ -148,15 +197,46 @@ impl LocalDynamicNodes { .peer_ports_by_name .clone_from(&self.seed.peer_ports_by_name); state.clients_by_name.clear(); + state.indices_by_name.clear(); state.node_count = self.seed.node_count; self.node_clients.clear(); } + pub fn initialize_with_nodes(&self, nodes: Vec) { + self.node_clients.clear(); + + let mut state = self + .state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + + state.nodes.clear(); + state.peer_ports.clear(); + state.peer_ports_by_name.clear(); + state.clients_by_name.clear(); + state.indices_by_name.clear(); + state.node_count = 0; + + for (idx, node) in nodes.into_iter().enumerate() { + let name = Self::default_label(idx); + let port = node.config().user.network.backend.swarm.port; + let client = node.api().clone(); + + self.node_clients.add_node(client.clone()); + state.register_node(&name, port, client, node); + } + } + + #[must_use] + pub fn node_clients(&self) -> NodeClients { + self.node_clients.clone() + } + pub async fn start_node_with( &self, name: &str, options: StartNodeOptions, - ) -> Result { + ) -> Result { self.start_node(name, options).await } @@ -208,7 +288,7 @@ impl LocalDynamicNodes { &self, name: &str, options: StartNodeOptions, - ) -> Result { + ) -> Result { let (peer_ports, peer_ports_by_name, node_name, index) = { let state = self .state @@ -217,13 +297,15 @@ impl LocalDynamicNodes { let index = state.node_count; let label = if name.trim().is_empty() { - format!("node-{index}") + Self::default_label(index) + } else if name.starts_with("node-") { + name.to_string() } else { format!("node-{name}") }; if state.peer_ports_by_name.contains_key(&label) { - return Err(LocalDynamicError::InvalidArgument { + return Err(LocalNodeManagerError::InvalidArgument { message: format!("node name '{label}' already exists"), }); } @@ -262,15 +344,94 @@ impl LocalDynamicNodes { }) } + pub async fn restart_node(&self, name: &str) -> Result<(), LocalNodeManagerError> { + let (index, mut node) = { + let mut state = self + .state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + + let Some(index) = state.indices_by_name.get(name).copied() else { + return Err(LocalNodeManagerError::NodeName { + name: name.to_string(), + }); + }; + + if index >= state.nodes.len() { + return Err(LocalNodeManagerError::NodeName { + name: name.to_string(), + }); + } + + let node = state.nodes.remove(index); + (index, node) + }; + + node.restart() + .await + .map_err(|source| LocalNodeManagerError::Restart { source })?; + + let mut state = self + .state + .lock() + 
.unwrap_or_else(|poisoned| poisoned.into_inner()); + + if index <= state.nodes.len() { + state.nodes.insert(index, node); + } else { + state.nodes.push(node); + } + + Ok(()) + } + + pub async fn stop_node(&self, name: &str) -> Result<(), LocalNodeManagerError> { + let (index, mut node) = { + let mut state = self + .state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + + let Some(index) = state.indices_by_name.get(name).copied() else { + return Err(LocalNodeManagerError::NodeName { + name: name.to_string(), + }); + }; + + if index >= state.nodes.len() { + return Err(LocalNodeManagerError::NodeName { + name: name.to_string(), + }); + } + + let node = state.nodes.remove(index); + (index, node) + }; + + node.stop().await; + + let mut state = self + .state + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + + if index <= state.nodes.len() { + state.nodes.insert(index, node); + } else { + state.nodes.push(node); + } + Ok(()) + } + async fn spawn_and_register_node( &self, node_name: &str, network_port: u16, config: RunConfig, - ) -> Result { + ) -> Result { let node = Node::spawn(config, node_name) .await - .map_err(|source| LocalDynamicError::Spawn { source })?; + .map_err(|source| LocalNodeManagerError::Spawn { source })?; let client = node.api().clone(); self.node_clients.add_node(client.clone()); @@ -288,9 +449,9 @@ impl LocalDynamicNodes { fn build_node_config( general_config: testing_framework_config::topology::configs::GeneralConfig, - descriptor_patch: Option<&testing_framework_core::topology::config::NodeConfigPatch>, - options_patch: Option<&testing_framework_core::topology::config::NodeConfigPatch>, -) -> Result { + descriptor_patch: Option<&config::NodeConfigPatch>, + options_patch: Option<&config::NodeConfigPatch>, +) -> Result { let mut config = create_node_config(general_config); config = apply_patch_if_needed(config, descriptor_patch)?; config = apply_patch_if_needed(config, options_patch)?; @@ -300,21 +461,25 @@ fn build_node_config( fn apply_patch_if_needed( config: RunConfig, - patch: Option<&testing_framework_core::topology::config::NodeConfigPatch>, -) -> Result { + patch: Option<&config::NodeConfigPatch>, +) -> Result { let Some(patch) = patch else { return Ok(config); }; - apply_node_config_patch(config, patch).map_err(|err| LocalDynamicError::ConfigPatch { + apply_node_config_patch(config, patch).map_err(|err| LocalNodeManagerError::ConfigPatch { message: err.to_string(), }) } #[async_trait::async_trait] -impl NodeControlHandle for LocalDynamicNodes { - async fn restart_node(&self, _index: usize) -> Result<(), DynError> { - Err("local deployer does not support restart_node".into()) +impl NodeControlHandle for LocalNodeManager { + async fn restart_node(&self, name: &str) -> Result<(), DynError> { + self.restart_node(name).await.map_err(|err| err.into()) + } + + async fn stop_node(&self, name: &str) -> Result<(), DynError> { + self.stop_node(name).await.map_err(|err| err.into()) } async fn start_node(&self, name: &str) -> Result { @@ -336,4 +501,8 @@ impl NodeControlHandle for LocalDynamicNodes { fn node_client(&self, name: &str) -> Option { self.node_client(name) } + + fn node_pid(&self, name: &str) -> Option { + self.node_pid(name) + } } diff --git a/testing-framework/deployers/local/src/node_control/state.rs b/testing-framework/deployers/local/src/node_control/state.rs index 4eb03f8..81fa525 100644 --- a/testing-framework/deployers/local/src/node_control/state.rs +++ b/testing-framework/deployers/local/src/node_control/state.rs @@ -2,15 +2,16 
@@ use std::collections::HashMap; use testing_framework_core::nodes::{ApiClient, node::Node}; -pub(crate) struct LocalDynamicState { +pub(crate) struct LocalNodeManagerState { pub(crate) node_count: usize, pub(crate) peer_ports: Vec, pub(crate) peer_ports_by_name: HashMap, pub(crate) clients_by_name: HashMap, + pub(crate) indices_by_name: HashMap, pub(crate) nodes: Vec, } -impl LocalDynamicState { +impl LocalNodeManagerState { fn register_common(&mut self, node_name: &str, network_port: u16, client: ApiClient) { self.peer_ports.push(network_port); self.peer_ports_by_name @@ -26,6 +27,8 @@ impl LocalDynamicState { node: Node, ) { self.register_common(node_name, network_port, client); + let index = self.nodes.len(); + self.indices_by_name.insert(node_name.to_string(), index); self.node_count += 1; self.nodes.push(node); } diff --git a/testing-framework/deployers/local/src/runner.rs b/testing-framework/deployers/local/src/runner.rs index 98b467b..cfbb7d9 100644 --- a/testing-framework/deployers/local/src/runner.rs +++ b/testing-framework/deployers/local/src/runner.rs @@ -2,22 +2,19 @@ use std::sync::Arc; use async_trait::async_trait; use testing_framework_core::{ + nodes::common::node::SpawnNodeError, scenario::{ BlockFeed, BlockFeedTask, Deployer, DynError, Metrics, NodeClients, NodeControlCapability, RunContext, Runner, Scenario, ScenarioError, spawn_block_feed, }, - topology::{ - config::TopologyConfig, - deployment::{SpawnTopologyError, Topology}, - readiness::ReadinessError, - }, + topology::{config::TopologyConfig, deployment::Topology, readiness::ReadinessError}, }; use thiserror::Error; use tracing::{debug, info}; use crate::{ manual::{LocalManualCluster, ManualClusterError}, - node_control::{LocalDynamicNodes, LocalDynamicSeed}, + node_control::{LocalNodeManager, LocalNodeManagerSeed}, }; /// Spawns nodes as local processes, reusing the existing /// integration harness. 
@@ -32,7 +29,7 @@ pub enum LocalDeployerError { #[error("failed to spawn local topology: {source}")] Spawn { #[source] - source: SpawnTopologyError, + source: SpawnNodeError, }, #[error("readiness probe failed: {source}")] ReadinessFailed { @@ -103,19 +100,39 @@ impl Deployer for LocalDeployer { "starting local deployment with node control" ); - let topology = Self::prepare_topology(scenario, self.membership_check).await?; - let node_clients = NodeClients::from_topology(scenario.topology(), &topology); - let node_control = Arc::new(LocalDynamicNodes::new_with_seed( + let mut nodes = LocalNodeManager::spawn_initial_nodes(scenario.topology()) + .await + .map_err(|source| LocalDeployerError::Spawn { source })?; + + if self.membership_check { + let topology = Topology::from_nodes(nodes); + + wait_for_readiness(&topology).await.map_err(|source| { + debug!(error = ?source, "local readiness failed"); + LocalDeployerError::ReadinessFailed { source } + })?; + + nodes = topology.into_nodes(); + + info!("local nodes are ready"); + } else { + info!("skipping local membership readiness checks"); + } + + let node_control = Arc::new(LocalNodeManager::new_with_seed( scenario.topology().clone(), - node_clients.clone(), - LocalDynamicSeed::from_topology(scenario.topology()), + NodeClients::default(), + LocalNodeManagerSeed::from_topology(scenario.topology()), )); + node_control.initialize_with_nodes(nodes); + let node_clients = node_control.node_clients(); + let (block_feed, block_feed_guard) = spawn_block_feed_with(&node_clients).await?; let context = RunContext::new( scenario.topology().clone(), - Some(topology), + None, node_clients, scenario.duration(), Metrics::empty(), @@ -158,9 +175,7 @@ impl LocalDeployer { info!(nodes = descriptors.nodes().len(), "spawning local nodes"); - let topology = descriptors - .clone() - .spawn_local() + let topology = LocalNodeManager::spawn_initial_topology(descriptors) .await .map_err(|source| LocalDeployerError::Spawn { source })?; diff --git a/testing-framework/deployers/local/tests/restart.rs b/testing-framework/deployers/local/tests/restart.rs new file mode 100644 index 0000000..a16ee50 --- /dev/null +++ b/testing-framework/deployers/local/tests/restart.rs @@ -0,0 +1,67 @@ +use std::time::Duration; + +use testing_framework_core::{ + scenario::{Deployer, ScenarioBuilder}, + topology::config::TopologyConfig, +}; +use testing_framework_runner_local::LocalDeployer; +use tracing_subscriber::fmt::try_init; + +#[tokio::test] +#[ignore = "requires local node binary and open ports"] +async fn local_restart_node() -> Result<(), Box> { + let _ = try_init(); + let mut scenario = ScenarioBuilder::topology_with(|t| t.nodes(1)) + .enable_node_control() + .with_run_duration(Duration::from_secs(1)) + .build()?; + + let deployer = LocalDeployer::default(); + let runner = deployer.deploy(&scenario).await?; + let context = runner.context(); + + let control = context.node_control().ok_or("node control not available")?; + + let node_name = "node-0"; + let old_pid = control.node_pid(node_name).ok_or("missing node pid")?; + + control.restart_node(node_name).await?; + + let new_pid = control.node_pid(node_name).ok_or("missing node pid")?; + assert_ne!(old_pid, new_pid, "expected a new process after restart"); + + control.stop_node(node_name).await?; + assert!( + control.node_pid(node_name).is_none(), + "expected node pid to be absent after stop" + ); + + let _handle = runner.run(&mut scenario).await?; + + Ok(()) +} + +#[tokio::test] +#[ignore = "requires local node binary and open ports"] 
+async fn manual_cluster_restart_node() -> Result<(), Box> { + let _ = try_init(); + let deployer = LocalDeployer::default(); + let cluster = deployer.manual_cluster(TopologyConfig::with_node_numbers(1))?; + + let node_name = cluster.start_node("a").await?.name; + + let old_pid = cluster.node_pid(&node_name).ok_or("missing node pid")?; + + cluster.restart_node(&node_name).await?; + + let new_pid = cluster.node_pid(&node_name).ok_or("missing node pid")?; + assert_ne!(old_pid, new_pid, "expected a new process after restart"); + + cluster.stop_node(&node_name).await?; + assert!( + cluster.node_pid(&node_name).is_none(), + "expected node pid to be absent after stop" + ); + + Ok(()) +} diff --git a/testing-framework/workflows/src/workloads/chaos.rs b/testing-framework/workflows/src/workloads/chaos.rs index b964152..20c7afe 100644 --- a/testing-framework/workflows/src/workloads/chaos.rs +++ b/testing-framework/workflows/src/workloads/chaos.rs @@ -44,7 +44,7 @@ impl RandomRestartWorkload { if self.include_nodes { if node_count > 1 { for index in 0..node_count { - targets.push(Target::Node(index)); + targets.push(Target::Node(format!("node-{index}"))); } } else if node_count == 1 { info!("chaos restart skipping nodes: only one node configured"); @@ -76,7 +76,7 @@ impl RandomRestartWorkload { let ready = now.checked_sub(self.target_cooldown).unwrap_or(now); targets .iter() - .copied() + .cloned() .map(|target| (target, ready)) .collect() } @@ -111,16 +111,16 @@ impl RandomRestartWorkload { let available: Vec = targets .iter() - .copied() + .cloned() .filter(|target| cooldowns.get(target).is_none_or(|ready| *ready <= now)) .collect(); - if let Some(choice) = available.choose(&mut thread_rng()).copied() { + if let Some(choice) = available.choose(&mut thread_rng()).cloned() { tracing::debug!(?choice, "chaos restart picked target"); return Ok(choice); } - if let Some(choice) = targets.choose(&mut thread_rng()).copied() { + if let Some(choice) = targets.choose(&mut thread_rng()).cloned() { return Ok(choice); } return Err("chaos restart workload has no eligible targets".into()); @@ -158,10 +158,10 @@ impl Workload for RandomRestartWorkload { let target = self.pick_target(&targets, &cooldowns).await?; match target { - Target::Node(index) => { - tracing::info!(index, "chaos restarting node"); + Target::Node(ref name) => { + tracing::info!(name, "chaos restarting node"); handle - .restart_node(index) + .restart_node(name) .await .map_err(|err| format!("node restart failed: {err}"))? } @@ -172,7 +172,7 @@ impl Workload for RandomRestartWorkload { } } -#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] +#[derive(Clone, PartialEq, Eq, Hash, Debug)] enum Target { - Node(usize), + Node(String), }
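
Usage sketch (a minimal example, not part of the patch itself): the snippet below drives the name-based control surface introduced here, using only the `NodeControlHandle` methods defined in `testing-framework/core/src/scenario/control.rs`. The crate path and the "node-0" label mirror the diff ("node-0" is the default label produced by `LocalNodeManager::default_label`); the helper name `bounce_node` is illustrative and not part of the framework.

use testing_framework_core::scenario::{DynError, NodeControlHandle};

/// Restart a node by name, check that the pid changed (when the deployer
/// exposes pids), then stop it. Deployers that do not support an operation
/// return a descriptive error via the trait's default implementations.
pub async fn bounce_node(control: &dyn NodeControlHandle, name: &str) -> Result<(), DynError> {
    // Pids are only reported by deployers that run nodes as local processes.
    let before = control.node_pid(name);

    // Restart by name rather than by index, matching the updated trait signature.
    control.restart_node(name).await?;

    if let (Some(old), Some(new)) = (before, control.node_pid(name)) {
        debug_assert_ne!(old, new, "restart should yield a fresh process");
    }

    // Stop the node afterwards; the local deployer drops the pid once stopped.
    control.stop_node(name).await
}

The new `tests/restart.rs` added by this patch exercises the same sequence end to end against `LocalDeployer` and `LocalManualCluster`, asserting that the pid changes across the restart and is absent after the stop.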