Unify local node control and restart support

andrussal 2026-01-27 11:33:39 +01:00
parent 3977a90682
commit 160dbe1078
15 changed files with 369 additions and 159 deletions

View File

@ -1,10 +1,10 @@
use async_trait::async_trait;
use crate::scenario::{DynError, StartNodeOptions, StartedNode};
use crate::scenario::{DynError, NodeControlHandle, StartNodeOptions, StartedNode};
/// Interface for imperative, deployer-backed manual clusters.
#[async_trait]
pub trait ManualClusterHandle: Send + Sync {
pub trait ManualClusterHandle: NodeControlHandle {
async fn start_node_with(
&self,
name: &str,

View File

@ -195,7 +195,7 @@ fn write_node_config<C: Serialize>(config: &C, config_path: &Path) -> Result<(),
})
}
fn spawn_node_process(
pub(crate) fn spawn_node_process(
binary_path: &Path,
config_path: &Path,
workdir: &Path,
@ -213,7 +213,9 @@ fn spawn_node_process(
})
}
async fn wait_for_consensus_readiness(api: &ApiClient) -> Result<(), time::error::Elapsed> {
pub(crate) async fn wait_for_consensus_readiness(
api: &ApiClient,
) -> Result<(), time::error::Elapsed> {
time::timeout(STARTUP_TIMEOUT, async {
loop {
if api.consensus_info().await.is_ok() {

View File

@ -13,7 +13,10 @@ use crate::{
common::{
binary::{BinaryConfig, BinaryResolver},
lifecycle::{kill::kill_child, monitor::is_running},
node::{NodeAddresses, NodeConfigCommon, NodeHandle, SpawnNodeError, spawn_node},
node::{
NodeAddresses, NodeConfigCommon, NodeHandle, SpawnNodeError, spawn_node,
spawn_node_process, wait_for_consensus_readiness,
},
},
},
scenario::DynError,
@ -21,6 +24,7 @@ use crate::{
};
const BIN_PATH: &str = "target/debug/logos-blockchain-node";
const RESTART_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
fn binary_path() -> PathBuf {
let cfg = BinaryConfig {
@ -75,6 +79,12 @@ impl Drop for Node {
}
impl Node {
/// Return the current process id for the running node.
#[must_use]
pub fn pid(&self) -> u32 {
self.handle.child.id()
}
/// Check if the node process is still running
pub fn is_running(&mut self) -> bool {
is_running(&mut self.handle.child)
@ -101,6 +111,30 @@ impl Node {
Ok(Self { handle })
}
/// Restart the node process using the existing config and data directory.
pub async fn restart(&mut self) -> Result<(), SpawnNodeError> {
let old_pid = self.pid();
debug!(old_pid, "restarting node process");
kill_child(&mut self.handle.child);
let _ = self.wait_for_exit(RESTART_SHUTDOWN_TIMEOUT).await;
let config_path = self.handle.tempdir.path().join("node.yaml");
let child = spawn_node_process(&binary_path(), &config_path, self.handle.tempdir.path())?;
self.handle.child = child;
let new_pid = self.pid();
wait_for_consensus_readiness(&self.handle.api)
.await
.map_err(|source| SpawnNodeError::Readiness { source })?;
info!(
old_pid,
new_pid, "node restart readiness confirmed via consensus_info"
);
Ok(())
}
}
impl NodeConfigCommon for Config {
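
A minimal usage sketch for the new restart path, assuming a `GeneralConfig` named `general` produced by the topology builder (the assertion is illustrative):

// Sketch only: pid() and restart() are the methods added above; Node::spawn and
// create_node_config already exist in this crate.
let config = create_node_config(general);
let mut node = Node::spawn(config, "node-0").await?;
let old_pid = node.pid();
// restart() kills the child, waits up to RESTART_SHUTDOWN_TIMEOUT for it to exit,
// respawns from the node.yaml already in the tempdir, then waits for consensus readiness.
node.restart().await?;
assert_ne!(old_pid, node.pid(), "expected a new process after restart");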

View File

@ -1,6 +1,5 @@
use std::sync::Arc;
use async_trait::async_trait;
use reqwest::Url;
use super::DynError;
@ -81,28 +80,6 @@ impl RequiresNodeControl for ObservabilityCapability {
const REQUIRED: bool = false;
}
/// Interface exposed by runners that can restart nodes at runtime.
#[async_trait]
pub trait NodeControlHandle: Send + Sync {
async fn restart_node(&self, index: usize) -> Result<(), DynError>;
async fn start_node(&self, _name: &str) -> Result<StartedNode, DynError> {
Err("start_node not supported by this deployer".into())
}
async fn start_node_with(
&self,
_name: &str,
_options: StartNodeOptions,
) -> Result<StartedNode, DynError> {
Err("start_node_with not supported by this deployer".into())
}
fn node_client(&self, _name: &str) -> Option<ApiClient> {
None
}
}
#[derive(Clone)]
pub struct StartedNode {
pub name: String,

View File

@ -0,0 +1,38 @@
use async_trait::async_trait;
use crate::{
nodes::ApiClient,
scenario::{DynError, StartNodeOptions, StartedNode},
};
/// Deployer-agnostic control surface for runtime node operations.
#[async_trait]
pub trait NodeControlHandle: Send + Sync {
async fn restart_node(&self, _index: usize) -> Result<(), DynError> {
Err("restart_node not supported by this deployer".into())
}
async fn start_node(&self, _name: &str) -> Result<StartedNode, DynError> {
Err("start_node not supported by this deployer".into())
}
async fn start_node_with(
&self,
_name: &str,
_options: StartNodeOptions,
) -> Result<StartedNode, DynError> {
Err("start_node_with not supported by this deployer".into())
}
async fn stop_node(&self, _index: usize) -> Result<(), DynError> {
Err("stop_node not supported by this deployer".into())
}
fn node_client(&self, _name: &str) -> Option<ApiClient> {
None
}
fn node_pid(&self, _index: usize) -> Option<u32> {
None
}
}
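
Because every method has a default body, a deployer only needs to override what it actually supports; a hedged sketch with an illustrative `StaticCluster` type (not part of this change):

use std::collections::HashMap;

struct StaticCluster {
    clients: HashMap<String, ApiClient>,
}

#[async_trait]
impl NodeControlHandle for StaticCluster {
    // Only client lookup is supported; restart_node, start_node, start_node_with,
    // stop_node and node_pid all fall back to the "not supported" defaults above.
    fn node_client(&self, name: &str) -> Option<ApiClient> {
        self.clients.get(name).cloned()
    }
}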

View File

@ -2,6 +2,7 @@
mod capabilities;
pub mod cfgsync;
mod control;
mod definition;
mod expectation;
pub mod http_probe;
@ -12,9 +13,10 @@ mod workload;
pub type DynError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub use capabilities::{
NodeControlCapability, NodeControlHandle, ObservabilityCapability, PeerSelection,
RequiresNodeControl, StartNodeOptions, StartedNode,
NodeControlCapability, ObservabilityCapability, PeerSelection, RequiresNodeControl,
StartNodeOptions, StartedNode,
};
pub use control::NodeControlHandle;
pub use definition::{
Builder, Scenario, ScenarioBuildError, ScenarioBuilder, TopologyConfigurator,
};
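
With the dedicated `control` module re-exported here, downstream crates keep a single import path for the trait, e.g.:

use testing_framework_core::scenario::{DynError, NodeControlHandle, StartNodeOptions};

which is the path the local runner uses below.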

View File

@ -1,16 +1,9 @@
use std::collections::HashSet;
use thiserror::Error;
use crate::{
nodes::{
common::node::SpawnNodeError,
node::{Node, apply_node_config_patch, create_node_config},
},
scenario,
nodes::node::Node,
topology::{
config::{TopologyBuildError, TopologyBuilder, TopologyConfig},
generation::{GeneratedNodeConfig, find_expected_peer_counts},
generation::find_expected_peer_counts,
readiness::{NetworkReadiness, ReadinessCheck, ReadinessError},
utils::multiaddr_port,
},
@ -21,65 +14,11 @@ pub struct Topology {
pub(crate) nodes: Vec<Node>,
}
pub type DeployedNodes = Vec<Node>;
#[derive(Debug, Error)]
pub enum SpawnTopologyError {
#[error(transparent)]
Build(#[from] TopologyBuildError),
#[error(transparent)]
Node(#[from] SpawnNodeError),
#[error("node config patch failed for node-{index}: {source}")]
ConfigPatch {
index: usize,
source: scenario::DynError,
},
}
impl Topology {
pub async fn spawn(config: TopologyConfig) -> Result<Self, SpawnTopologyError> {
let generated = TopologyBuilder::new(config.clone()).build()?;
let nodes = Self::spawn_nodes(generated.nodes()).await?;
Ok(Self { nodes })
}
pub async fn spawn_with_empty_membership(
config: TopologyConfig,
ids: &[[u8; 32]],
blend_ports: &[u16],
) -> Result<Self, SpawnTopologyError> {
let generated = TopologyBuilder::new(config.clone())
.with_ids(ids.to_vec())
.with_blend_ports(blend_ports.to_vec())
.build()?;
let nodes = Self::spawn_nodes(generated.nodes()).await?;
Ok(Self { nodes })
}
pub(crate) async fn spawn_nodes(
nodes: &[GeneratedNodeConfig],
) -> Result<DeployedNodes, SpawnTopologyError> {
let mut spawned = Vec::new();
for node in nodes {
let mut config = create_node_config(node.general.clone());
if let Some(patch) = node.config_patch.as_ref() {
config = apply_node_config_patch(config, patch).map_err(|source| {
SpawnTopologyError::ConfigPatch {
index: node.index,
source,
}
})?;
}
let label = format!("node-{}", node.index);
spawned.push(Node::spawn(config, &label).await?);
}
Ok(spawned)
/// Construct a topology from already-spawned nodes.
#[must_use]
pub fn from_nodes(nodes: Vec<Node>) -> Self {
Self { nodes }
}
#[must_use]
@ -87,6 +26,11 @@ impl Topology {
&self.nodes
}
#[must_use]
pub fn into_nodes(self) -> Vec<Node> {
self.nodes
}
pub async fn wait_network_ready(&self) -> Result<(), ReadinessError> {
let listen_ports = self.node_listen_ports();
if listen_ports.len() <= 1 {
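
A short sketch of the round trip the new constructors enable, mirroring the runner change further down; it assumes `nodes` is a Vec<Node> spawned elsewhere:

// Wrap pre-spawned nodes only long enough to run the readiness check, then hand them back.
let topology = Topology::from_nodes(nodes);
topology.wait_network_ready().await?;
let nodes = topology.into_nodes();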

View File

@ -5,7 +5,6 @@ use reqwest::{Client, Url};
use crate::topology::{
config::{NodeConfigPatch, TopologyConfig},
configs::{GeneralConfig, wallet::WalletAccount},
deployment::{SpawnTopologyError, Topology},
readiness::{HttpNetworkReadiness, ReadinessCheck, ReadinessError},
};
@ -82,12 +81,6 @@ impl GeneratedTopology {
&self.config.wallet_config.accounts
}
pub async fn spawn_local(&self) -> Result<Topology, SpawnTopologyError> {
let nodes = Topology::spawn_nodes(self.nodes()).await?;
Ok(Topology { nodes })
}
pub async fn wait_remote_readiness(
&self,
// Node endpoints

View File

@ -3,5 +3,5 @@ mod node_control;
mod runner;
pub use manual::{LocalManualCluster, ManualClusterError};
pub use node_control::{LocalDynamicError, LocalDynamicNodes, LocalDynamicSeed};
pub use node_control::{LocalNodeManager, LocalNodeManagerError, LocalNodeManagerSeed};
pub use runner::{LocalDeployer, LocalDeployerError};

View File

@ -1,7 +1,7 @@
use testing_framework_core::{
manual::ManualClusterHandle,
nodes::ApiClient,
scenario::{DynError, StartNodeOptions, StartedNode},
scenario::{DynError, NodeControlHandle, StartNodeOptions, StartedNode},
topology::{
config::{TopologyBuildError, TopologyBuilder, TopologyConfig},
readiness::{ReadinessCheck, ReadinessError},
@ -9,7 +9,7 @@ use testing_framework_core::{
};
use thiserror::Error;
use crate::node_control::{LocalDynamicError, LocalDynamicNodes, ReadinessNode};
use crate::node_control::{LocalNodeManager, LocalNodeManagerError, ReadinessNode};
mod readiness;
@ -23,12 +23,12 @@ pub enum ManualClusterError {
source: TopologyBuildError,
},
#[error(transparent)]
Dynamic(#[from] LocalDynamicError),
Dynamic(#[from] LocalNodeManagerError),
}
/// Imperative, in-process cluster that can start nodes on demand.
pub struct LocalManualCluster {
nodes: LocalDynamicNodes,
nodes: LocalNodeManager,
}
impl LocalManualCluster {
@ -37,7 +37,7 @@ impl LocalManualCluster {
let descriptors = builder
.build()
.map_err(|source| ManualClusterError::Build { source })?;
let nodes = LocalDynamicNodes::new(
let nodes = LocalNodeManager::new(
descriptors,
testing_framework_core::scenario::NodeClients::default(),
);
@ -49,6 +49,11 @@ impl LocalManualCluster {
self.nodes.node_client(name)
}
#[must_use]
pub fn node_pid(&self, index: usize) -> Option<u32> {
self.nodes.node_pid(index)
}
pub async fn start_node(&self, name: &str) -> Result<StartedNode, ManualClusterError> {
Ok(self
.nodes
@ -68,6 +73,10 @@ impl LocalManualCluster {
self.nodes.stop_all();
}
pub async fn restart_node(&self, index: usize) -> Result<(), ManualClusterError> {
Ok(self.nodes.restart_node(index).await?)
}
pub async fn wait_network_ready(&self) -> Result<(), ReadinessError> {
let nodes = self.nodes.readiness_nodes();
if self.is_singleton(&nodes) {
@ -92,6 +101,40 @@ impl Drop for LocalManualCluster {
}
}
#[async_trait::async_trait]
impl NodeControlHandle for LocalManualCluster {
async fn restart_node(&self, index: usize) -> Result<(), DynError> {
self.nodes
.restart_node(index)
.await
.map_err(|err| err.into())
}
async fn start_node(&self, name: &str) -> Result<StartedNode, DynError> {
self.start_node_with(name, StartNodeOptions::default())
.await
.map_err(|err| err.into())
}
async fn start_node_with(
&self,
name: &str,
options: StartNodeOptions,
) -> Result<StartedNode, DynError> {
self.start_node_with(name, options)
.await
.map_err(|err| err.into())
}
fn node_client(&self, name: &str) -> Option<ApiClient> {
self.node_client(name)
}
fn node_pid(&self, index: usize) -> Option<u32> {
self.node_pid(index)
}
}
#[async_trait::async_trait]
impl ManualClusterHandle for LocalManualCluster {
async fn start_node_with(
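
With the control impl above, the manual cluster can be driven through the same trait object as any other deployer; a minimal sketch (the helper function is illustrative):

async fn bounce_first_node(control: &dyn NodeControlHandle) -> Result<(), DynError> {
    let old_pid = control.node_pid(0);
    control.restart_node(0).await?;
    assert_ne!(old_pid, control.node_pid(0), "expected a fresh process");
    Ok(())
}

// bounce_first_node(&cluster).await?;  // LocalManualCluster coerces to &dyn NodeControlHandle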

View File

@ -17,7 +17,7 @@ use testing_framework_core::{
},
};
use super::LocalDynamicError;
use super::LocalNodeManagerError;
pub(super) fn build_general_config_for(
descriptors: &GeneratedTopology,
@ -27,7 +27,7 @@ pub(super) fn build_general_config_for(
peer_ports_by_name: &HashMap<String, u16>,
options: &StartNodeOptions,
peer_ports: &[u16],
) -> Result<(GeneralConfig, u16, Option<NodeConfigPatch>), LocalDynamicError> {
) -> Result<(GeneralConfig, u16, Option<NodeConfigPatch>), LocalNodeManagerError> {
if let Some(node) = descriptor_for(descriptors, index) {
let mut config = node.general.clone();
let initial_peers = resolve_initial_peers(
@ -59,7 +59,7 @@ pub(super) fn build_general_config_for(
base_consensus,
base_time,
)
.map_err(|source| LocalDynamicError::Config { source })?;
.map_err(|source| LocalNodeManagerError::Config { source })?;
Ok((general_config, network_port, None))
}
@ -71,13 +71,13 @@ fn descriptor_for(descriptors: &GeneratedTopology, index: usize) -> Option<&Gene
fn resolve_peer_names(
peer_ports_by_name: &HashMap<String, u16>,
peer_names: &[String],
) -> Result<Vec<Multiaddr>, LocalDynamicError> {
) -> Result<Vec<Multiaddr>, LocalNodeManagerError> {
let mut peers = Vec::with_capacity(peer_names.len());
for name in peer_names {
let port =
peer_ports_by_name
.get(name)
.ok_or_else(|| LocalDynamicError::InvalidArgument {
.ok_or_else(|| LocalNodeManagerError::InvalidArgument {
message: format!("unknown peer name '{name}'"),
})?;
peers.push(testing_framework_config::node_address_from_port(*port));
@ -91,7 +91,7 @@ fn resolve_initial_peers(
default_peers: &[Multiaddr],
descriptors: &GeneratedTopology,
peer_ports: &[u16],
) -> Result<Vec<Multiaddr>, LocalDynamicError> {
) -> Result<Vec<Multiaddr>, LocalNodeManagerError> {
match &options.peers {
PeerSelection::Named(names) => resolve_peer_names(peer_ports_by_name, names),
PeerSelection::DefaultLayout => {
@ -112,8 +112,8 @@ fn random_node_id() -> [u8; 32] {
id
}
fn allocate_udp_port(label: &'static str) -> Result<u16, LocalDynamicError> {
get_available_udp_port().ok_or_else(|| LocalDynamicError::PortAllocation {
fn allocate_udp_port(label: &'static str) -> Result<u16, LocalNodeManagerError> {
get_available_udp_port().ok_or_else(|| LocalNodeManagerError::PortAllocation {
message: format!("failed to allocate free UDP port for {label}"),
})
}

View File

@ -12,6 +12,7 @@ use testing_framework_core::{
},
scenario::{DynError, NodeControlHandle, StartNodeOptions, StartedNode},
topology::{
deployment::Topology,
generation::{GeneratedTopology, find_expected_peer_counts},
utils::multiaddr_port,
},
@ -22,11 +23,11 @@ mod config;
mod state;
use config::build_general_config_for;
use state::LocalDynamicState;
use state::LocalNodeManagerState;
use testing_framework_core::scenario::NodeClients;
#[derive(Debug, Error)]
pub enum LocalDynamicError {
pub enum LocalNodeManagerError {
#[error("failed to generate node config: {source}")]
Config {
#[source]
@ -43,25 +44,32 @@ pub enum LocalDynamicError {
PortAllocation { message: String },
#[error("node config patch failed: {message}")]
ConfigPatch { message: String },
#[error("node index {index} is out of bounds")]
NodeIndex { index: usize },
#[error("failed to restart node: {source}")]
Restart {
#[source]
source: testing_framework_core::nodes::common::node::SpawnNodeError,
},
}
pub struct LocalDynamicNodes {
pub struct LocalNodeManager {
descriptors: GeneratedTopology,
base_consensus: consensus::GeneralConsensusConfig,
base_time: time::GeneralTimeConfig,
node_clients: NodeClients,
seed: LocalDynamicSeed,
state: Mutex<LocalDynamicState>,
seed: LocalNodeManagerSeed,
state: Mutex<LocalNodeManagerState>,
}
#[derive(Clone, Default)]
pub struct LocalDynamicSeed {
pub struct LocalNodeManagerSeed {
pub node_count: usize,
pub peer_ports: Vec<u16>,
pub peer_ports_by_name: HashMap<String, u16>,
}
impl LocalDynamicSeed {
impl LocalNodeManagerSeed {
#[must_use]
pub fn from_topology(descriptors: &GeneratedTopology) -> Self {
let peer_ports = descriptors
@ -90,15 +98,39 @@ pub(crate) struct ReadinessNode {
pub(crate) api: ApiClient,
}
impl LocalDynamicNodes {
impl LocalNodeManager {
fn default_label(index: usize) -> String {
format!("node-{index}")
}
pub async fn spawn_initial_nodes(
descriptors: &GeneratedTopology,
) -> Result<Vec<Node>, testing_framework_core::nodes::common::node::SpawnNodeError> {
let mut nodes = Vec::with_capacity(descriptors.nodes().len());
for node in descriptors.nodes() {
let label = Self::default_label(node.index());
let config = create_node_config(node.general.clone());
let spawned = Node::spawn(config, &label).await?;
nodes.push(spawned);
}
Ok(nodes)
}
pub async fn spawn_initial_topology(
descriptors: &GeneratedTopology,
) -> Result<Topology, testing_framework_core::nodes::common::node::SpawnNodeError> {
let nodes = Self::spawn_initial_nodes(descriptors).await?;
Ok(Topology::from_nodes(nodes))
}
pub fn new(descriptors: GeneratedTopology, node_clients: NodeClients) -> Self {
Self::new_with_seed(descriptors, node_clients, LocalDynamicSeed::default())
Self::new_with_seed(descriptors, node_clients, LocalNodeManagerSeed::default())
}
pub fn new_with_seed(
descriptors: GeneratedTopology,
node_clients: NodeClients,
seed: LocalDynamicSeed,
seed: LocalNodeManagerSeed,
) -> Self {
let base_node = descriptors
.nodes()
@ -108,7 +140,7 @@ impl LocalDynamicNodes {
let base_consensus = base_node.general.consensus_config.clone();
let base_time = base_node.general.time_config.clone();
let state = LocalDynamicState {
let state = LocalNodeManagerState {
node_count: seed.node_count,
peer_ports: seed.peer_ports.clone(),
peer_ports_by_name: seed.peer_ports_by_name.clone(),
@ -136,6 +168,16 @@ impl LocalDynamicNodes {
state.clients_by_name.get(name).cloned()
}
#[must_use]
pub fn node_pid(&self, index: usize) -> Option<u32> {
let state = self
.state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
state.nodes.get(index).map(|node| node.pid())
}
pub fn stop_all(&self) {
let mut state = self
.state
@ -152,11 +194,40 @@ impl LocalDynamicNodes {
self.node_clients.clear();
}
pub fn initialize_with_nodes(&self, nodes: Vec<Node>) {
self.node_clients.clear();
let mut state = self
.state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
state.nodes.clear();
state.peer_ports.clear();
state.peer_ports_by_name.clear();
state.clients_by_name.clear();
state.node_count = 0;
for (idx, node) in nodes.into_iter().enumerate() {
let name = Self::default_label(idx);
let port = node.config().network.backend.swarm.port;
let client = node.api().clone();
self.node_clients.add_node(client.clone());
state.register_node(&name, port, client, node);
}
}
#[must_use]
pub fn node_clients(&self) -> NodeClients {
self.node_clients.clone()
}
pub async fn start_node_with(
&self,
name: &str,
options: StartNodeOptions,
) -> Result<StartedNode, LocalDynamicError> {
) -> Result<StartedNode, LocalNodeManagerError> {
self.start_node(name, options).await
}
@ -204,7 +275,7 @@ impl LocalDynamicNodes {
&self,
name: &str,
options: StartNodeOptions,
) -> Result<StartedNode, LocalDynamicError> {
) -> Result<StartedNode, LocalNodeManagerError> {
let (peer_ports, peer_ports_by_name, node_name, index) = {
let state = self
.state
@ -213,13 +284,13 @@ impl LocalDynamicNodes {
let index = state.node_count;
let label = if name.trim().is_empty() {
format!("node-{index}")
Self::default_label(index)
} else {
format!("node-{name}")
};
if state.peer_ports_by_name.contains_key(&label) {
return Err(LocalDynamicError::InvalidArgument {
return Err(LocalNodeManagerError::InvalidArgument {
message: format!("node name '{label}' already exists"),
});
}
@ -258,15 +329,47 @@ impl LocalDynamicNodes {
})
}
pub async fn restart_node(&self, index: usize) -> Result<(), LocalNodeManagerError> {
let mut node = {
let mut state = self
.state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
if index >= state.nodes.len() {
return Err(LocalNodeManagerError::NodeIndex { index });
}
state.nodes.remove(index)
};
node.restart()
.await
.map_err(|source| LocalNodeManagerError::Restart { source })?;
let mut state = self
.state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
if index <= state.nodes.len() {
state.nodes.insert(index, node);
} else {
state.nodes.push(node);
}
Ok(())
}
async fn spawn_and_register_node(
&self,
node_name: &str,
network_port: u16,
config: NodeConfig,
) -> Result<ApiClient, LocalDynamicError> {
) -> Result<ApiClient, LocalNodeManagerError> {
let node = Node::spawn(config, node_name)
.await
.map_err(|source| LocalDynamicError::Spawn { source })?;
.map_err(|source| LocalNodeManagerError::Spawn { source })?;
let client = node.api().clone();
self.node_clients.add_node(client.clone());
@ -286,7 +389,7 @@ fn build_node_config(
general_config: testing_framework_config::topology::configs::GeneralConfig,
descriptor_patch: Option<&testing_framework_core::topology::config::NodeConfigPatch>,
options_patch: Option<&testing_framework_core::topology::config::NodeConfigPatch>,
) -> Result<NodeConfig, LocalDynamicError> {
) -> Result<NodeConfig, LocalNodeManagerError> {
let mut config = create_node_config(general_config);
config = apply_patch_if_needed(config, descriptor_patch)?;
config = apply_patch_if_needed(config, options_patch)?;
@ -297,20 +400,20 @@ fn build_node_config(
fn apply_patch_if_needed(
config: NodeConfig,
patch: Option<&testing_framework_core::topology::config::NodeConfigPatch>,
) -> Result<NodeConfig, LocalDynamicError> {
) -> Result<NodeConfig, LocalNodeManagerError> {
let Some(patch) = patch else {
return Ok(config);
};
apply_node_config_patch(config, patch).map_err(|err| LocalDynamicError::ConfigPatch {
apply_node_config_patch(config, patch).map_err(|err| LocalNodeManagerError::ConfigPatch {
message: err.to_string(),
})
}
#[async_trait::async_trait]
impl NodeControlHandle for LocalDynamicNodes {
async fn restart_node(&self, _index: usize) -> Result<(), DynError> {
Err("local deployer does not support restart_node".into())
impl NodeControlHandle for LocalNodeManager {
async fn restart_node(&self, index: usize) -> Result<(), DynError> {
self.restart_node(index).await.map_err(|err| err.into())
}
async fn start_node(&self, name: &str) -> Result<StartedNode, DynError> {
@ -332,4 +435,8 @@ impl NodeControlHandle for LocalDynamicNodes {
fn node_client(&self, name: &str) -> Option<ApiClient> {
self.node_client(name)
}
fn node_pid(&self, index: usize) -> Option<u32> {
self.node_pid(index)
}
}
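
At the manager level the same flow looks like this; a sketch assuming `descriptors` is a GeneratedTopology built elsewhere:

let manager = LocalNodeManager::new(descriptors, NodeClients::default());
let started = manager
    .start_node_with("a", StartNodeOptions::default())
    .await?;
// Index 0 refers to the first registered node; restart_node swaps the process in place.
let old_pid = manager.node_pid(0);
manager.restart_node(0).await?;
assert_ne!(old_pid, manager.node_pid(0));
println!("restarted {}", started.name);
manager.stop_all();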

View File

@ -2,7 +2,7 @@ use std::collections::HashMap;
use testing_framework_core::nodes::{ApiClient, node::Node};
pub(crate) struct LocalDynamicState {
pub(crate) struct LocalNodeManagerState {
pub(crate) node_count: usize,
pub(crate) peer_ports: Vec<u16>,
pub(crate) peer_ports_by_name: HashMap<String, u16>,
@ -10,7 +10,7 @@ pub(crate) struct LocalDynamicState {
pub(crate) nodes: Vec<Node>,
}
impl LocalDynamicState {
impl LocalNodeManagerState {
fn register_common(&mut self, node_name: &str, network_port: u16, client: ApiClient) {
self.peer_ports.push(network_port);
self.peer_ports_by_name

View File

@ -6,18 +6,14 @@ use testing_framework_core::{
BlockFeed, BlockFeedTask, Deployer, DynError, Metrics, NodeClients, NodeControlCapability,
RunContext, Runner, Scenario, ScenarioError, spawn_block_feed,
},
topology::{
config::TopologyConfig,
deployment::{SpawnTopologyError, Topology},
readiness::ReadinessError,
},
topology::{config::TopologyConfig, deployment::Topology, readiness::ReadinessError},
};
use thiserror::Error;
use tracing::{debug, info};
use crate::{
manual::{LocalManualCluster, ManualClusterError},
node_control::{LocalDynamicNodes, LocalDynamicSeed},
node_control::{LocalNodeManager, LocalNodeManagerSeed},
};
/// Spawns nodes as local processes, reusing the existing
/// integration harness.
@ -32,7 +28,7 @@ pub enum LocalDeployerError {
#[error("failed to spawn local topology: {source}")]
Spawn {
#[source]
source: SpawnTopologyError,
source: testing_framework_core::nodes::common::node::SpawnNodeError,
},
#[error("readiness probe failed: {source}")]
ReadinessFailed {
@ -103,19 +99,35 @@ impl Deployer<NodeControlCapability> for LocalDeployer {
"starting local deployment with node control"
);
let topology = Self::prepare_topology(scenario, self.membership_check).await?;
let node_clients = NodeClients::from_topology(scenario.topology(), &topology);
let node_control = Arc::new(LocalDynamicNodes::new_with_seed(
let mut nodes = LocalNodeManager::spawn_initial_nodes(scenario.topology())
.await
.map_err(|source| LocalDeployerError::Spawn { source })?;
if self.membership_check {
let topology = Topology::from_nodes(nodes);
wait_for_readiness(&topology).await.map_err(|source| {
debug!(error = ?source, "local readiness failed");
LocalDeployerError::ReadinessFailed { source }
})?;
nodes = topology.into_nodes();
info!("local nodes are ready");
} else {
info!("skipping local membership readiness checks");
}
let node_control = Arc::new(LocalNodeManager::new_with_seed(
scenario.topology().clone(),
node_clients.clone(),
LocalDynamicSeed::from_topology(scenario.topology()),
NodeClients::default(),
LocalNodeManagerSeed::from_topology(scenario.topology()),
));
node_control.initialize_with_nodes(nodes);
let node_clients = node_control.node_clients();
let (block_feed, block_feed_guard) = spawn_block_feed_with(&node_clients).await?;
let context = RunContext::new(
scenario.topology().clone(),
Some(topology),
None,
node_clients,
scenario.duration(),
Metrics::empty(),
@ -158,9 +170,7 @@ impl LocalDeployer {
info!(nodes = descriptors.nodes().len(), "spawning local nodes");
let topology = descriptors
.clone()
.spawn_local()
let topology = LocalNodeManager::spawn_initial_topology(descriptors)
.await
.map_err(|source| LocalDeployerError::Spawn { source })?;

View File

@ -0,0 +1,60 @@
use std::time::Duration;
use testing_framework_core::{
scenario::{Deployer, ScenarioBuilder},
topology::config::TopologyConfig,
};
use testing_framework_runner_local::LocalDeployer;
#[tokio::test]
#[ignore = "requires local node binary and open ports"]
async fn local_restart_node() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut scenario = ScenarioBuilder::topology_with(|t| t.nodes(1))
.enable_node_control()
.with_run_duration(Duration::from_secs(1))
.build()?;
let deployer = LocalDeployer::default();
let runner = deployer.deploy(&scenario).await?;
let context = runner.context();
let control = context.node_control().ok_or("node control not available")?;
let old_pid = control.node_pid(0).ok_or("missing node pid")?;
control.restart_node(0).await?;
let new_pid = control.node_pid(0).ok_or("missing node pid")?;
assert_ne!(old_pid, new_pid, "expected a new process after restart");
let client = context
.node_clients()
.any_client()
.ok_or("no node clients available")?;
client.consensus_info().await?;
let _handle = runner.run(&mut scenario).await?;
Ok(())
}
#[tokio::test]
#[ignore = "requires local node binary and open ports"]
async fn manual_cluster_restart_node() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let deployer = LocalDeployer::default();
let cluster = deployer.manual_cluster(TopologyConfig::with_node_numbers(1))?;
cluster.start_node("a").await?;
let old_pid = cluster.node_pid(0).ok_or("missing node pid")?;
cluster.restart_node(0).await?;
let new_pid = cluster.node_pid(0).ok_or("missing node pid")?;
assert_ne!(old_pid, new_pid, "expected a new process after restart");
let client = cluster.node_client("node-a").ok_or("missing node client")?;
client.consensus_info().await?;
Ok(())
}