feat(local): support restart launch args and runtime timeouts

This commit is contained in:
andrussal 2026-04-23 06:54:21 +02:00
parent e6d692f4d6
commit 5baf6b9577
11 changed files with 225 additions and 68 deletions

View File

@ -1,4 +1,4 @@
use std::{fmt, marker::PhantomData, path::PathBuf, sync::Arc};
use std::{fmt, marker::PhantomData, path::PathBuf, sync::Arc, time::Duration};
use reqwest::Url;
@ -34,7 +34,7 @@ pub enum PeerSelection {
#[derive(Clone)]
pub struct StartNodeOptions<E: Application> {
/// How to select initial peers on startup.
pub peers: PeerSelection,
pub peers: Option<PeerSelection>,
/// Optional backend-specific initial config override.
pub config_override: Option<E::NodeConfig>,
/// Optional patch callback applied to generated node config before spawn.
@ -44,9 +44,20 @@ pub struct StartNodeOptions<E: Application> {
pub persist_dir: Option<PathBuf>,
/// Optional directory whose contents should seed the node working dir.
pub snapshot_dir: Option<PathBuf>,
/// Extra process arguments appended on launch.
pub args: Vec<String>,
/// Runtime policy for this node launch.
pub runtime: NodeRuntimeOptions,
_phantom: PhantomData<E>,
}
/// Runtime supervision options for a node process.
#[derive(Clone, Copy, Debug, Default)]
pub struct NodeRuntimeOptions {
/// Optional readiness/start timeout override for this node.
pub start_timeout: Option<Duration>,
}
impl<E: Application> fmt::Debug for StartNodeOptions<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StartNodeOptions")
@ -55,6 +66,8 @@ impl<E: Application> fmt::Debug for StartNodeOptions<E> {
.field("config_patch", &self.config_patch.is_some())
.field("persist_dir", &self.persist_dir)
.field("snapshot_dir", &self.snapshot_dir)
.field("args", &self.args)
.field("runtime", &self.runtime)
.finish()
}
}
@ -62,11 +75,13 @@ impl<E: Application> fmt::Debug for StartNodeOptions<E> {
impl<E: Application> Default for StartNodeOptions<E> {
fn default() -> Self {
Self {
peers: PeerSelection::DefaultLayout,
peers: None,
config_override: None,
config_patch: None,
persist_dir: None,
snapshot_dir: None,
args: Vec::new(),
runtime: NodeRuntimeOptions::default(),
_phantom: PhantomData,
}
}
@ -75,7 +90,7 @@ impl<E: Application> Default for StartNodeOptions<E> {
impl<E: Application> StartNodeOptions<E> {
#[must_use]
pub fn with_peers(mut self, peers: PeerSelection) -> Self {
self.peers = peers;
self.peers = Some(peers);
self
}
@ -105,6 +120,24 @@ impl<E: Application> StartNodeOptions<E> {
self.snapshot_dir = Some(snapshot_dir);
self
}
#[must_use]
pub fn with_args(mut self, args: impl IntoIterator<Item = impl Into<String>>) -> Self {
self.args.extend(args.into_iter().map(Into::into));
self
}
#[must_use]
pub fn with_runtime(mut self, runtime: NodeRuntimeOptions) -> Self {
self.runtime = runtime;
self
}
#[must_use]
pub fn with_start_timeout(mut self, start_timeout: Duration) -> Self {
self.runtime.start_timeout = Some(start_timeout);
self
}
}
/// Indicates whether a capability requires node control.

View File

@ -163,16 +163,16 @@ where
options: &StartNodeOptions<Self>,
) -> Result<Option<ArtifactSet>, Self::Error> {
let mut config = match &options.peers {
PeerSelection::DefaultLayout => {
None | Some(PeerSelection::DefaultLayout) => {
if options.config_override.is_none() && options.config_patch.is_none() {
return Ok(None);
}
build_static_cluster_node_config::<T>(deployment, node_index, Some(hostnames))?
}
PeerSelection::None => {
Some(PeerSelection::None) => {
build_cluster_node_config_for_indices::<T>(node_index, hostnames, &[])?
}
PeerSelection::Named(names) => {
Some(PeerSelection::Named(names)) => {
let indices = resolve_named_peer_indices::<T>(deployment, node_index, names)?;
build_cluster_node_config_for_indices::<T>(node_index, hostnames, &indices)?
}

View File

@ -9,12 +9,12 @@ pub trait NodeControlHandle<E: Application>: Send + Sync {
Err("restart_node not supported by this deployer".into())
}
async fn restart_node_with_args(
async fn restart_node_with(
&self,
_name: &str,
_args: Vec<String>,
_options: StartNodeOptions<E>,
) -> Result<(), DynError> {
Err("restart_node_with_args not supported by this deployer".into())
Err("restart_node_with not supported by this deployer".into())
}
async fn start_node(&self, _name: &str) -> Result<StartedNode<E>, DynError> {

View File

@ -22,8 +22,8 @@ pub type DynError = Box<dyn Error + Send + Sync + 'static>;
pub use builder_ext::{BuilderInputError, ObservabilityBuilderExt};
pub use capabilities::{
NodeControlCapability, ObservabilityCapability, PeerSelection, RequiresNodeControl,
StartNodeOptions, StartedNode,
NodeControlCapability, NodeRuntimeOptions, ObservabilityCapability, PeerSelection,
RequiresNodeControl, StartNodeOptions, StartedNode,
};
pub use client::NodeAccess;
pub use common_builder_ext::CoreBuilderExt;
@ -45,6 +45,7 @@ pub use runtime::{
},
wait_for_http_ports, wait_for_http_ports_with_host,
wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement,
wait_for_http_ports_with_requirement_and_timeout, wait_for_http_ports_with_timeout,
wait_http_readiness, wait_until_stable,
};
pub use sources::{

View File

@ -23,6 +23,7 @@ pub use node_clients::NodeClients;
pub use readiness::{
HttpReadinessRequirement, ReadinessError, StabilizationConfig, wait_for_http_ports,
wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement,
wait_for_http_ports_with_requirement, wait_http_readiness, wait_until_stable,
wait_for_http_ports_with_requirement, wait_for_http_ports_with_requirement_and_timeout,
wait_for_http_ports_with_timeout, wait_http_readiness, wait_until_stable,
};
pub use runner::Runner;

View File

@ -232,12 +232,20 @@ where
pub async fn wait_http_readiness(
endpoints: &[Url],
requirement: HttpReadinessRequirement,
) -> Result<(), ReadinessError> {
wait_http_readiness_with_timeout(endpoints, requirement, None).await
}
pub async fn wait_http_readiness_with_timeout(
endpoints: &[Url],
requirement: HttpReadinessRequirement,
timeout: Option<Duration>,
) -> Result<(), ReadinessError> {
if endpoints.is_empty() {
return Ok(());
}
let (poll_interval, max_attempts) = http_retry_plan();
let (poll_interval, max_attempts) = http_retry_plan(timeout);
let client = Client::new();
let retry = RetryConfig::bounded(max_attempts, poll_interval, poll_interval);
retry_async(retry, |_| async {
@ -254,8 +262,8 @@ pub async fn wait_http_readiness(
})
}
fn http_retry_plan() -> (Duration, usize) {
let timeout_duration = adjust_timeout(DEFAULT_TIMEOUT);
fn http_retry_plan(timeout: Option<Duration>) -> (Duration, usize) {
let timeout_duration = adjust_timeout(timeout.unwrap_or(DEFAULT_TIMEOUT));
let poll_interval = DEFAULT_POLL_INTERVAL;
let max_attempts = retry_attempts(timeout_duration, poll_interval);
(poll_interval, max_attempts)
@ -269,17 +277,39 @@ fn retry_attempts(timeout: Duration, interval: Duration) -> usize {
}
pub async fn wait_for_http_ports(ports: &[u16], endpoint_path: &str) -> Result<(), ReadinessError> {
wait_for_http_ports_with_requirement(ports, endpoint_path, default_readiness_requirement())
.await
wait_for_http_ports_with_timeout(ports, endpoint_path, None).await
}
pub async fn wait_for_http_ports_with_timeout(
ports: &[u16],
endpoint_path: &str,
timeout: Option<Duration>,
) -> Result<(), ReadinessError> {
wait_for_http_ports_with_requirement_and_timeout(
ports,
endpoint_path,
default_readiness_requirement(),
timeout,
)
.await
}
pub async fn wait_for_http_ports_with_requirement(
ports: &[u16],
endpoint_path: &str,
requirement: HttpReadinessRequirement,
) -> Result<(), ReadinessError> {
wait_for_http_ports_with_requirement_and_timeout(ports, endpoint_path, requirement, None).await
}
pub async fn wait_for_http_ports_with_requirement_and_timeout(
ports: &[u16],
endpoint_path: &str,
requirement: HttpReadinessRequirement,
timeout: Option<Duration>,
) -> Result<(), ReadinessError> {
let endpoints = build_local_endpoints(ports, endpoint_path)?;
wait_http_readiness(&endpoints, requirement).await
wait_http_readiness_with_timeout(&endpoints, requirement, timeout).await
}
pub async fn wait_for_http_ports_with_host(

View File

@ -14,7 +14,7 @@ use testing_framework_core::{
manual::ManualClusterHandle,
scenario::{
ClusterWaitHandle, DynError, ExternalNodeSource, HttpReadinessRequirement, NodeClients,
NodeControlHandle, StartNodeOptions, StartedNode,
NodeControlHandle, PeerSelection, StartNodeOptions, StartedNode,
},
};
use thiserror::Error;
@ -604,10 +604,7 @@ fn validate_start_options<E: K8sDeployEnv>(
fn ensure_default_cfgsync_options<E: K8sDeployEnv>(
options: &StartNodeOptions<E>,
) -> Result<(), ManualClusterError> {
let default_peers = matches!(
options.peers,
testing_framework_core::scenario::PeerSelection::DefaultLayout
);
let default_peers = matches!(options.peers, None | Some(PeerSelection::DefaultLayout));
if default_peers && options.config_override.is_none() && options.config_patch.is_none() {
return Ok(());
}
@ -719,14 +716,14 @@ mod tests {
options: &StartNodeOptions<Self>,
) -> Result<Option<cfgsync_artifacts::ArtifactSet>, Self::Error> {
let mut config = match &options.peers {
PeerSelection::DefaultLayout => {
None | Some(PeerSelection::DefaultLayout) => {
if options.config_override.is_none() && options.config_patch.is_none() {
return Ok(None);
}
format!("node={node_index};peers=default")
}
PeerSelection::None => format!("node={node_index};peers=none"),
PeerSelection::Named(names) => {
Some(PeerSelection::None) => format!("node={node_index};peers=none"),
Some(PeerSelection::Named(names)) => {
format!("node={node_index};peers={}", names.join(","))
}
};

View File

@ -22,10 +22,10 @@ mod tests;
pub use helpers::{
BuiltNodeConfig, LocalNodePorts, LocalPeerNode, LocalProcessSpec, NodeConfigEntry,
build_indexed_http_peers, build_indexed_node_configs, build_launch_spec_with_args,
build_local_cluster_node_config,
build_local_peer_nodes, default_yaml_launch_spec, discovered_node_access, preallocate_ports,
reserve_local_node_ports, single_http_node_endpoints, text_config_launch_spec,
text_node_config, yaml_config_launch_spec, yaml_node_config,
build_local_cluster_node_config, build_local_peer_nodes, default_yaml_launch_spec,
discovered_node_access, preallocate_ports, reserve_local_node_ports,
single_http_node_endpoints, text_config_launch_spec, text_node_config, yaml_config_launch_spec,
yaml_node_config,
};
/// Context passed while building a local node config.

View File

@ -70,12 +70,12 @@ impl<E: LocalDeployerEnv> ManualCluster<E> {
Ok(self.nodes.restart_node(name).await?)
}
pub async fn restart_node_with_args(
pub async fn restart_node_with(
&self,
name: &str,
args: Vec<String>,
options: StartNodeOptions<E>,
) -> Result<(), ManualClusterError> {
Ok(self.nodes.restart_node_with_args(name, args).await?)
Ok(self.nodes.restart_node_with(name, options).await?)
}
pub async fn stop_node(&self, name: &str) -> Result<(), ManualClusterError> {
@ -133,9 +133,13 @@ impl<E: LocalDeployerEnv> NodeControlHandle<E> for ManualCluster<E> {
.map_err(|err| err.into())
}
async fn restart_node_with_args(&self, name: &str, args: Vec<String>) -> Result<(), DynError> {
async fn restart_node_with(
&self,
name: &str,
options: StartNodeOptions<E>,
) -> Result<(), DynError> {
self.nodes
.restart_node_with_args(name, args)
.restart_node_with(name, options)
.await
.map_err(|err| err.into())
}

View File

@ -4,8 +4,8 @@ use std::{
};
use testing_framework_core::scenario::{
Application, DynError, NodeClients, NodeControlHandle, ReadinessError, StartNodeOptions,
StartedNode, wait_for_http_ports,
Application, DynError, NodeClients, NodeControlHandle, NodeRuntimeOptions, ReadinessError,
StartNodeOptions, StartedNode, wait_for_http_ports, wait_for_http_ports_with_timeout,
};
use thiserror::Error;
@ -31,6 +31,12 @@ struct NodeStartSnapshot<Config> {
template_config: Option<Config>,
}
#[derive(Clone, Copy)]
struct NodeReadinessTarget {
port: u16,
runtime: NodeRuntimeOptions,
}
#[derive(Debug, Error)]
pub enum NodeManagerError {
#[error("failed to generate node config: {source}")]
@ -120,6 +126,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
peer_ports_by_name: seed.peer_ports_by_name.clone(),
clients_by_name: HashMap::new(),
indices_by_name: HashMap::new(),
runtime_by_name: HashMap::new(),
nodes: Vec::new(),
template_config: None,
};
@ -166,6 +173,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
.clone_from(&self.seed.peer_ports_by_name);
state.clients_by_name.clear();
state.indices_by_name.clear();
state.runtime_by_name.clear();
state.node_count = self.seed.node_count;
state.template_config = None;
self.node_clients.clear();
@ -183,7 +191,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
let client = node.client();
self.node_clients.add_node(client.clone());
state.register_node(&name, port, client, node);
state.register_node(&name, port, client, NodeRuntimeOptions::default(), node);
}
}
@ -210,28 +218,15 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
}
pub async fn wait_node_ready(&self, name: &str) -> Result<(), NodeManagerError> {
let port = {
let state = self.lock_state();
let index =
*state
.indices_by_name
.get(name)
.ok_or_else(|| NodeManagerError::NodeName {
name: name.to_string(),
})?;
let target = self.readiness_target(name)?;
state
.nodes
.get(index)
.map(|node| node.endpoints().api.port())
.ok_or_else(|| NodeManagerError::NodeName {
name: name.to_string(),
})?
};
wait_for_http_ports(&[port], readiness_endpoint_path::<E>())
.await
.map_err(|source| NodeManagerError::Readiness { source })
wait_for_http_ports_with_timeout(
&[target.port],
readiness_endpoint_path::<E>(),
target.runtime.start_timeout,
)
.await
.map_err(|source| NodeManagerError::Readiness { source })
}
pub async fn start_node_with(
@ -263,9 +258,10 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
&snapshot.node_name,
built.network_port,
built.config,
options.runtime,
options.persist_dir.as_deref(),
options.snapshot_dir.as_deref(),
&[],
&options.args,
)
.await?;
@ -276,21 +272,24 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
}
pub async fn restart_node(&self, name: &str) -> Result<(), NodeManagerError> {
self.restart_node_with_args(name, Vec::new()).await
self.restart_node_with(name, StartNodeOptions::default())
.await
}
pub async fn restart_node_with_args(
pub async fn restart_node_with(
&self,
name: &str,
args: Vec<String>,
options: StartNodeOptions<E>,
) -> Result<(), NodeManagerError> {
let (index, mut node) = self.take_node(name)?;
validate_restart_options(&options)?;
let launch = build_launch_spec_with_args::<E>(
node.config(),
node.working_dir(),
name,
&args,
&options.args,
)
.map_err(|source| NodeManagerError::Config { source })?;
@ -303,6 +302,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
}
self.put_node_back(index, node);
self.store_runtime_options(name, options.runtime);
Ok(())
}
@ -321,6 +321,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
node_name: &str,
network_port: u16,
config: <E as Application>::NodeConfig,
runtime: NodeRuntimeOptions,
persist_dir: Option<&std::path::Path>,
snapshot_dir: Option<&std::path::Path>,
extra_args: &[String],
@ -346,7 +347,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
state.template_config = Some(node.config().clone());
}
state.register_node(node_name, network_port, client.clone(), node);
state.register_node(node_name, network_port, client.clone(), runtime, node);
Ok(client)
}
@ -361,6 +362,20 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
reinsert_node_at(&mut state, index, node);
}
fn store_runtime_options(&self, name: &str, runtime: NodeRuntimeOptions) {
let mut state = self.lock_state();
state.runtime_by_name.insert(name.to_string(), runtime);
}
fn readiness_target(&self, name: &str) -> Result<NodeReadinessTarget, NodeManagerError> {
let state = self.lock_state();
let index = node_index(&state, name)?;
let port = node_api_port(&state, index, name)?;
let runtime = node_runtime_options(&state, name);
Ok(NodeReadinessTarget { port, runtime })
}
fn start_snapshot(
&self,
requested_name: &str,
@ -391,6 +406,7 @@ fn clear_registered_nodes<E: LocalDeployerEnv>(state: &mut LocalNodeManagerState
state.peer_ports_by_name.clear();
state.clients_by_name.clear();
state.indices_by_name.clear();
state.runtime_by_name.clear();
state.node_count = 0;
state.template_config = None;
}
@ -423,6 +439,72 @@ fn normalize_node_name(index: usize, requested_name: &str) -> String {
format!("node-{requested_name}")
}
fn validate_restart_options<E: LocalDeployerEnv>(
options: &StartNodeOptions<E>,
) -> Result<(), NodeManagerError> {
if options.peers.is_some() {
return Err(unsupported_restart_override("peer selection"));
}
if options.config_override.is_some() {
return Err(unsupported_restart_override("config override"));
}
if options.config_patch.is_some() {
return Err(unsupported_restart_override("config patch"));
}
if options.persist_dir.is_some() {
return Err(unsupported_restart_override("persist dir"));
}
if options.snapshot_dir.is_some() {
return Err(unsupported_restart_override("snapshot dir"));
}
Ok(())
}
fn unsupported_restart_override(field: &str) -> NodeManagerError {
NodeManagerError::InvalidArgument {
message: format!("restart_node_with does not support {field} overrides"),
}
}
fn node_index<E: LocalDeployerEnv>(
state: &LocalNodeManagerState<E>,
name: &str,
) -> Result<usize, NodeManagerError> {
state
.indices_by_name
.get(name)
.copied()
.ok_or_else(|| NodeManagerError::NodeName {
name: name.to_string(),
})
}
fn node_api_port<E: LocalDeployerEnv>(
state: &LocalNodeManagerState<E>,
index: usize,
name: &str,
) -> Result<u16, NodeManagerError> {
state
.nodes
.get(index)
.map(|node| node.endpoints().api.port())
.ok_or_else(|| NodeManagerError::NodeName {
name: name.to_string(),
})
}
fn node_runtime_options<E: LocalDeployerEnv>(
state: &LocalNodeManagerState<E>,
name: &str,
) -> NodeRuntimeOptions {
state.runtime_by_name.get(name).copied().unwrap_or_default()
}
fn default_node_label(index: usize) -> String {
format!("node-{index}")
}
@ -464,8 +546,12 @@ impl<E: LocalDeployerEnv> NodeControlHandle<E> for NodeManager<E> {
self.restart_node(name).await.map_err(|err| err.into())
}
async fn restart_node_with_args(&self, name: &str, args: Vec<String>) -> Result<(), DynError> {
self.restart_node_with_args(name, args)
async fn restart_node_with(
&self,
name: &str,
options: StartNodeOptions<E>,
) -> Result<(), DynError> {
self.restart_node_with(name, options)
.await
.map_err(|err| err.into())
}

View File

@ -1,5 +1,7 @@
use std::collections::HashMap;
use testing_framework_core::scenario::NodeRuntimeOptions;
use crate::env::{LocalDeployerEnv, Node};
pub(crate) struct LocalNodeManagerState<E: LocalDeployerEnv> {
@ -8,6 +10,7 @@ pub(crate) struct LocalNodeManagerState<E: LocalDeployerEnv> {
pub(crate) peer_ports_by_name: HashMap<String, u16>,
pub(crate) clients_by_name: HashMap<String, E::NodeClient>,
pub(crate) indices_by_name: HashMap<String, usize>,
pub(crate) runtime_by_name: HashMap<String, NodeRuntimeOptions>,
pub(crate) nodes: Vec<Node<E>>,
pub(crate) template_config: Option<E::NodeConfig>,
}
@ -25,11 +28,13 @@ impl<E: LocalDeployerEnv> LocalNodeManagerState<E> {
node_name: &str,
network_port: u16,
client: E::NodeClient,
runtime: NodeRuntimeOptions,
node: Node<E>,
) {
self.register_common(node_name, network_port, client);
let index = self.nodes.len();
self.indices_by_name.insert(node_name.to_string(), index);
self.runtime_by_name.insert(node_name.to_string(), runtime);
self.node_count += 1;
self.nodes.push(node);
}