Add manual cluster restart options

This commit is contained in:
andrussal 2026-04-02 08:18:52 +02:00
parent f73179193b
commit 5b8610a937
6 changed files with 141 additions and 5 deletions

View File

@ -35,6 +35,8 @@ pub enum PeerSelection {
pub struct StartNodeOptions<E: Application> {
/// How to select initial peers on startup.
pub peers: PeerSelection,
/// Additional command-line arguments passed to the node binary.
pub args: Vec<String>,
/// Optional backend-specific initial config override.
pub config_override: Option<E::NodeConfig>,
/// Optional patch callback applied to generated node config before spawn.
@ -51,6 +53,7 @@ impl<E: Application> fmt::Debug for StartNodeOptions<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StartNodeOptions")
.field("peers", &self.peers)
.field("args", &self.args)
.field("config_override", &self.config_override.is_some())
.field("config_patch", &self.config_patch.is_some())
.field("persist_dir", &self.persist_dir)
@ -63,6 +66,7 @@ impl<E: Application> Default for StartNodeOptions<E> {
fn default() -> Self {
Self {
peers: PeerSelection::DefaultLayout,
args: Vec::new(),
config_override: None,
config_patch: None,
persist_dir: None,
@ -79,6 +83,12 @@ impl<E: Application> StartNodeOptions<E> {
self
}
#[must_use]
pub fn with_args(mut self, args: impl IntoIterator<Item = String>) -> Self {
self.args.extend(args);
self
}
#[must_use]
pub fn with_config_override(mut self, config_override: E::NodeConfig) -> Self {
self.config_override = Some(config_override);

View File

@ -9,6 +9,14 @@ pub trait NodeControlHandle<E: Application>: Send + Sync {
Err("restart_node not supported by this deployer".into())
}
async fn restart_node_with(
&self,
_name: &str,
_options: StartNodeOptions<E>,
) -> Result<(), DynError> {
Err("restart_node_with not supported by this deployer".into())
}
async fn start_node(&self, _name: &str) -> Result<StartedNode<E>, DynError> {
Err("start_node not supported by this deployer".into())
}

View File

@ -260,6 +260,19 @@ pub fn yaml_config_launch_spec<T: Serialize>(
rendered_config_launch_spec(config_yaml.into_bytes(), spec)
}
pub fn build_launch_spec_with_args<E: LocalDeployerEnv>(
config: &<E as Application>::NodeConfig,
dir: &Path,
label: &str,
extra_args: &[String],
) -> Result<LaunchSpec, DynError> {
let mut launch = E::build_launch_spec(config, dir, label)?;
launch.args.extend(extra_args.iter().cloned());
Ok(launch)
}
pub fn text_config_launch_spec(
rendered_config: impl Into<Vec<u8>>,
spec: &LocalProcessSpec,
@ -574,11 +587,14 @@ pub async fn spawn_node_from_config<E: LocalDeployerEnv>(
keep_tempdir: bool,
persist_dir: Option<&std::path::Path>,
snapshot_dir: Option<&std::path::Path>,
extra_args: &[String],
) -> Result<Node<E>, ProcessSpawnError> {
let extra_args = extra_args.to_vec();
ProcessNode::spawn(
&label,
config,
E::build_launch_spec,
move |config, dir, label| build_launch_spec_with_args::<E>(config, dir, label, &extra_args),
E::node_endpoints,
keep_tempdir,
persist_dir,

View File

@ -70,6 +70,14 @@ impl<E: LocalDeployerEnv> ManualCluster<E> {
Ok(self.nodes.restart_node(name).await?)
}
pub async fn restart_node_with(
&self,
name: &str,
options: StartNodeOptions<E>,
) -> Result<(), ManualClusterError> {
Ok(self.nodes.restart_node_with(name, options).await?)
}
pub async fn stop_node(&self, name: &str) -> Result<(), ManualClusterError> {
Ok(self.nodes.stop_node(name).await?)
}
@ -125,6 +133,17 @@ impl<E: LocalDeployerEnv> NodeControlHandle<E> for ManualCluster<E> {
.map_err(|err| err.into())
}
async fn restart_node_with(
&self,
name: &str,
options: StartNodeOptions<E>,
) -> Result<(), DynError> {
self.nodes
.restart_node_with(name, options)
.await
.map_err(|err| err.into())
}
async fn stop_node(&self, name: &str) -> Result<(), DynError> {
self.nodes.stop_node(name).await.map_err(|err| err.into())
}

View File

@ -4,13 +4,13 @@ use std::{
};
use testing_framework_core::scenario::{
Application, DynError, NodeClients, NodeControlHandle, ReadinessError, StartNodeOptions,
StartedNode, wait_for_http_ports,
Application, DynError, NodeClients, NodeControlHandle, PeerSelection, ReadinessError,
StartNodeOptions, StartedNode, wait_for_http_ports,
};
use thiserror::Error;
use crate::{
env::{LocalDeployerEnv, Node, spawn_node_from_config},
env::{LocalDeployerEnv, Node, build_launch_spec_with_args, spawn_node_from_config},
process::ProcessSpawnError,
};
@ -92,6 +92,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
keep_tempdir,
persist_dir.as_deref(),
snapshot_dir.as_deref(),
&[],
)
.await?,
);
@ -260,6 +261,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
built.config,
options.persist_dir.as_deref(),
options.snapshot_dir.as_deref(),
&options.args,
)
.await?;
@ -270,9 +272,28 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
}
pub async fn restart_node(&self, name: &str) -> Result<(), NodeManagerError> {
self.restart_node_with(name, StartNodeOptions::<E>::default())
.await
}
pub async fn restart_node_with(
&self,
name: &str,
options: StartNodeOptions<E>,
) -> Result<(), NodeManagerError> {
validate_restart_options::<E>(&options)?;
let (index, mut node) = self.take_node(name)?;
if let Err(source) = node.restart().await {
let launch = build_launch_spec_with_args::<E>(
node.config(),
node.working_dir(),
name,
&options.args,
)
.map_err(|source| NodeManagerError::Config { source })?;
if let Err(source) = node.restart_with_launch(launch).await {
self.put_node_back(index, node);
return Err(NodeManagerError::Restart {
@ -301,6 +322,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
config: <E as Application>::NodeConfig,
persist_dir: Option<&std::path::Path>,
snapshot_dir: Option<&std::path::Path>,
extra_args: &[String],
) -> Result<E::NodeClient, NodeManagerError> {
let node = spawn_node_from_config::<E>(
node_name.to_string(),
@ -308,6 +330,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
self.keep_tempdir,
persist_dir,
snapshot_dir,
extra_args,
)
.await
.map_err(|source| NodeManagerError::Spawn {
@ -440,6 +463,16 @@ impl<E: LocalDeployerEnv> NodeControlHandle<E> for NodeManager<E> {
self.restart_node(name).await.map_err(|err| err.into())
}
async fn restart_node_with(
&self,
name: &str,
options: StartNodeOptions<E>,
) -> Result<(), DynError> {
self.restart_node_with(name, options)
.await
.map_err(|err| err.into())
}
async fn stop_node(&self, name: &str) -> Result<(), DynError> {
self.stop_node(name).await.map_err(|err| err.into())
}
@ -468,3 +501,39 @@ impl<E: LocalDeployerEnv> NodeControlHandle<E> for NodeManager<E> {
self.node_pid(name)
}
}
fn validate_restart_options<E: LocalDeployerEnv>(
options: &StartNodeOptions<E>,
) -> Result<(), NodeManagerError> {
if !matches!(options.peers, PeerSelection::DefaultLayout) {
return Err(NodeManagerError::InvalidArgument {
message: "restart_node_with does not support peer selection overrides".to_string(),
});
}
if options.config_override.is_some() {
return Err(NodeManagerError::InvalidArgument {
message: "restart_node_with does not support config_override".to_string(),
});
}
if options.config_patch.is_some() {
return Err(NodeManagerError::InvalidArgument {
message: "restart_node_with does not support config_patch".to_string(),
});
}
if options.persist_dir.is_some() {
return Err(NodeManagerError::InvalidArgument {
message: "restart_node_with does not support persist_dir".to_string(),
});
}
if options.snapshot_dir.is_some() {
return Err(NodeManagerError::InvalidArgument {
message: "restart_node_with does not support snapshot_dir".to_string(),
});
}
Ok(())
}

View File

@ -168,6 +168,10 @@ impl<Config: Clone + Send + Sync + 'static, Client: Clone + Send + Sync + 'stati
&self.endpoints
}
pub fn working_dir(&self) -> &Path {
self.tempdir.path()
}
pub fn pid(&self) -> u32 {
self.child.id().unwrap_or_default()
}
@ -253,6 +257,16 @@ impl<Config: Clone + Send + Sync + 'static, Client: Clone + Send + Sync + 'stati
Ok(())
}
pub async fn restart_with_launch(
&mut self,
launch: LaunchSpec,
) -> Result<(), ProcessSpawnError> {
self.stop_child().await?;
self.launch = launch;
self.child = self.spawn_child().await?;
Ok(())
}
pub async fn stop(&mut self) {
let _ = self.stop_child().await;
}