diff --git a/testing-framework/core/src/scenario/capabilities.rs b/testing-framework/core/src/scenario/capabilities.rs index fc056e4..d2275ab 100644 --- a/testing-framework/core/src/scenario/capabilities.rs +++ b/testing-framework/core/src/scenario/capabilities.rs @@ -1,4 +1,4 @@ -use std::{fmt, marker::PhantomData, path::PathBuf, sync::Arc}; +use std::{fmt, marker::PhantomData, path::PathBuf, sync::Arc, time::Duration}; use reqwest::Url; @@ -34,7 +34,7 @@ pub enum PeerSelection { #[derive(Clone)] pub struct StartNodeOptions { /// How to select initial peers on startup. - pub peers: PeerSelection, + pub peers: Option, /// Optional backend-specific initial config override. pub config_override: Option, /// Optional patch callback applied to generated node config before spawn. @@ -44,9 +44,20 @@ pub struct StartNodeOptions { pub persist_dir: Option, /// Optional directory whose contents should seed the node working dir. pub snapshot_dir: Option, + /// Extra process arguments appended on launch. + pub args: Vec, + /// Runtime policy for this node launch. + pub runtime: NodeRuntimeOptions, _phantom: PhantomData, } +/// Runtime supervision options for a node process. +#[derive(Clone, Copy, Debug, Default)] +pub struct NodeRuntimeOptions { + /// Optional readiness/start timeout override for this node. + pub start_timeout: Option, +} + impl fmt::Debug for StartNodeOptions { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("StartNodeOptions") @@ -55,6 +66,8 @@ impl fmt::Debug for StartNodeOptions { .field("config_patch", &self.config_patch.is_some()) .field("persist_dir", &self.persist_dir) .field("snapshot_dir", &self.snapshot_dir) + .field("args", &self.args) + .field("runtime", &self.runtime) .finish() } } @@ -62,11 +75,13 @@ impl fmt::Debug for StartNodeOptions { impl Default for StartNodeOptions { fn default() -> Self { Self { - peers: PeerSelection::DefaultLayout, + peers: None, config_override: None, config_patch: None, persist_dir: None, snapshot_dir: None, + args: Vec::new(), + runtime: NodeRuntimeOptions::default(), _phantom: PhantomData, } } @@ -75,7 +90,7 @@ impl Default for StartNodeOptions { impl StartNodeOptions { #[must_use] pub fn with_peers(mut self, peers: PeerSelection) -> Self { - self.peers = peers; + self.peers = Some(peers); self } @@ -105,6 +120,24 @@ impl StartNodeOptions { self.snapshot_dir = Some(snapshot_dir); self } + + #[must_use] + pub fn with_args(mut self, args: impl IntoIterator>) -> Self { + self.args.extend(args.into_iter().map(Into::into)); + self + } + + #[must_use] + pub fn with_runtime(mut self, runtime: NodeRuntimeOptions) -> Self { + self.runtime = runtime; + self + } + + #[must_use] + pub fn with_start_timeout(mut self, start_timeout: Duration) -> Self { + self.runtime.start_timeout = Some(start_timeout); + self + } } /// Indicates whether a capability requires node control. diff --git a/testing-framework/core/src/scenario/config.rs b/testing-framework/core/src/scenario/config.rs index bd1aa7e..0a3530e 100644 --- a/testing-framework/core/src/scenario/config.rs +++ b/testing-framework/core/src/scenario/config.rs @@ -163,16 +163,16 @@ where options: &StartNodeOptions, ) -> Result, Self::Error> { let mut config = match &options.peers { - PeerSelection::DefaultLayout => { + None | Some(PeerSelection::DefaultLayout) => { if options.config_override.is_none() && options.config_patch.is_none() { return Ok(None); } build_static_cluster_node_config::(deployment, node_index, Some(hostnames))? } - PeerSelection::None => { + Some(PeerSelection::None) => { build_cluster_node_config_for_indices::(node_index, hostnames, &[])? } - PeerSelection::Named(names) => { + Some(PeerSelection::Named(names)) => { let indices = resolve_named_peer_indices::(deployment, node_index, names)?; build_cluster_node_config_for_indices::(node_index, hostnames, &indices)? } diff --git a/testing-framework/core/src/scenario/control.rs b/testing-framework/core/src/scenario/control.rs index 7cc95c9..951543b 100644 --- a/testing-framework/core/src/scenario/control.rs +++ b/testing-framework/core/src/scenario/control.rs @@ -9,12 +9,12 @@ pub trait NodeControlHandle: Send + Sync { Err("restart_node not supported by this deployer".into()) } - async fn restart_node_with_args( + async fn restart_node_with( &self, _name: &str, - _args: Vec, + _options: StartNodeOptions, ) -> Result<(), DynError> { - Err("restart_node_with_args not supported by this deployer".into()) + Err("restart_node_with not supported by this deployer".into()) } async fn start_node(&self, _name: &str) -> Result, DynError> { diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs index 29eb104..0ceeeab 100644 --- a/testing-framework/core/src/scenario/mod.rs +++ b/testing-framework/core/src/scenario/mod.rs @@ -22,8 +22,8 @@ pub type DynError = Box; pub use builder_ext::{BuilderInputError, ObservabilityBuilderExt}; pub use capabilities::{ - NodeControlCapability, ObservabilityCapability, PeerSelection, RequiresNodeControl, - StartNodeOptions, StartedNode, + NodeControlCapability, NodeRuntimeOptions, ObservabilityCapability, PeerSelection, + RequiresNodeControl, StartNodeOptions, StartedNode, }; pub use client::NodeAccess; pub use common_builder_ext::CoreBuilderExt; @@ -45,6 +45,7 @@ pub use runtime::{ }, wait_for_http_ports, wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement, + wait_for_http_ports_with_requirement_and_timeout, wait_for_http_ports_with_timeout, wait_http_readiness, wait_until_stable, }; pub use sources::{ diff --git a/testing-framework/core/src/scenario/runtime/mod.rs b/testing-framework/core/src/scenario/runtime/mod.rs index 1c614fa..cae03b8 100644 --- a/testing-framework/core/src/scenario/runtime/mod.rs +++ b/testing-framework/core/src/scenario/runtime/mod.rs @@ -23,6 +23,7 @@ pub use node_clients::NodeClients; pub use readiness::{ HttpReadinessRequirement, ReadinessError, StabilizationConfig, wait_for_http_ports, wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement, - wait_for_http_ports_with_requirement, wait_http_readiness, wait_until_stable, + wait_for_http_ports_with_requirement, wait_for_http_ports_with_requirement_and_timeout, + wait_for_http_ports_with_timeout, wait_http_readiness, wait_until_stable, }; pub use runner::Runner; diff --git a/testing-framework/core/src/scenario/runtime/readiness.rs b/testing-framework/core/src/scenario/runtime/readiness.rs index f44e929..fa36e87 100644 --- a/testing-framework/core/src/scenario/runtime/readiness.rs +++ b/testing-framework/core/src/scenario/runtime/readiness.rs @@ -232,12 +232,20 @@ where pub async fn wait_http_readiness( endpoints: &[Url], requirement: HttpReadinessRequirement, +) -> Result<(), ReadinessError> { + wait_http_readiness_with_timeout(endpoints, requirement, None).await +} + +pub async fn wait_http_readiness_with_timeout( + endpoints: &[Url], + requirement: HttpReadinessRequirement, + timeout: Option, ) -> Result<(), ReadinessError> { if endpoints.is_empty() { return Ok(()); } - let (poll_interval, max_attempts) = http_retry_plan(); + let (poll_interval, max_attempts) = http_retry_plan(timeout); let client = Client::new(); let retry = RetryConfig::bounded(max_attempts, poll_interval, poll_interval); retry_async(retry, |_| async { @@ -254,8 +262,8 @@ pub async fn wait_http_readiness( }) } -fn http_retry_plan() -> (Duration, usize) { - let timeout_duration = adjust_timeout(DEFAULT_TIMEOUT); +fn http_retry_plan(timeout: Option) -> (Duration, usize) { + let timeout_duration = adjust_timeout(timeout.unwrap_or(DEFAULT_TIMEOUT)); let poll_interval = DEFAULT_POLL_INTERVAL; let max_attempts = retry_attempts(timeout_duration, poll_interval); (poll_interval, max_attempts) @@ -269,17 +277,39 @@ fn retry_attempts(timeout: Duration, interval: Duration) -> usize { } pub async fn wait_for_http_ports(ports: &[u16], endpoint_path: &str) -> Result<(), ReadinessError> { - wait_for_http_ports_with_requirement(ports, endpoint_path, default_readiness_requirement()) - .await + wait_for_http_ports_with_timeout(ports, endpoint_path, None).await +} + +pub async fn wait_for_http_ports_with_timeout( + ports: &[u16], + endpoint_path: &str, + timeout: Option, +) -> Result<(), ReadinessError> { + wait_for_http_ports_with_requirement_and_timeout( + ports, + endpoint_path, + default_readiness_requirement(), + timeout, + ) + .await } pub async fn wait_for_http_ports_with_requirement( ports: &[u16], endpoint_path: &str, requirement: HttpReadinessRequirement, +) -> Result<(), ReadinessError> { + wait_for_http_ports_with_requirement_and_timeout(ports, endpoint_path, requirement, None).await +} + +pub async fn wait_for_http_ports_with_requirement_and_timeout( + ports: &[u16], + endpoint_path: &str, + requirement: HttpReadinessRequirement, + timeout: Option, ) -> Result<(), ReadinessError> { let endpoints = build_local_endpoints(ports, endpoint_path)?; - wait_http_readiness(&endpoints, requirement).await + wait_http_readiness_with_timeout(&endpoints, requirement, timeout).await } pub async fn wait_for_http_ports_with_host( diff --git a/testing-framework/deployers/k8s/src/manual.rs b/testing-framework/deployers/k8s/src/manual.rs index 400fe0f..1c10c70 100644 --- a/testing-framework/deployers/k8s/src/manual.rs +++ b/testing-framework/deployers/k8s/src/manual.rs @@ -14,7 +14,7 @@ use testing_framework_core::{ manual::ManualClusterHandle, scenario::{ ClusterWaitHandle, DynError, ExternalNodeSource, HttpReadinessRequirement, NodeClients, - NodeControlHandle, StartNodeOptions, StartedNode, + NodeControlHandle, PeerSelection, StartNodeOptions, StartedNode, }, }; use thiserror::Error; @@ -604,10 +604,7 @@ fn validate_start_options( fn ensure_default_cfgsync_options( options: &StartNodeOptions, ) -> Result<(), ManualClusterError> { - let default_peers = matches!( - options.peers, - testing_framework_core::scenario::PeerSelection::DefaultLayout - ); + let default_peers = matches!(options.peers, None | Some(PeerSelection::DefaultLayout)); if default_peers && options.config_override.is_none() && options.config_patch.is_none() { return Ok(()); } @@ -719,14 +716,14 @@ mod tests { options: &StartNodeOptions, ) -> Result, Self::Error> { let mut config = match &options.peers { - PeerSelection::DefaultLayout => { + None | Some(PeerSelection::DefaultLayout) => { if options.config_override.is_none() && options.config_patch.is_none() { return Ok(None); } format!("node={node_index};peers=default") } - PeerSelection::None => format!("node={node_index};peers=none"), - PeerSelection::Named(names) => { + Some(PeerSelection::None) => format!("node={node_index};peers=none"), + Some(PeerSelection::Named(names)) => { format!("node={node_index};peers={}", names.join(",")) } }; diff --git a/testing-framework/deployers/local/src/env/mod.rs b/testing-framework/deployers/local/src/env/mod.rs index 7450f16..bbf36ea 100644 --- a/testing-framework/deployers/local/src/env/mod.rs +++ b/testing-framework/deployers/local/src/env/mod.rs @@ -22,10 +22,10 @@ mod tests; pub use helpers::{ BuiltNodeConfig, LocalNodePorts, LocalPeerNode, LocalProcessSpec, NodeConfigEntry, build_indexed_http_peers, build_indexed_node_configs, build_launch_spec_with_args, - build_local_cluster_node_config, - build_local_peer_nodes, default_yaml_launch_spec, discovered_node_access, preallocate_ports, - reserve_local_node_ports, single_http_node_endpoints, text_config_launch_spec, - text_node_config, yaml_config_launch_spec, yaml_node_config, + build_local_cluster_node_config, build_local_peer_nodes, default_yaml_launch_spec, + discovered_node_access, preallocate_ports, reserve_local_node_ports, + single_http_node_endpoints, text_config_launch_spec, text_node_config, yaml_config_launch_spec, + yaml_node_config, }; /// Context passed while building a local node config. diff --git a/testing-framework/deployers/local/src/manual/mod.rs b/testing-framework/deployers/local/src/manual/mod.rs index 489e150..ffe0be2 100644 --- a/testing-framework/deployers/local/src/manual/mod.rs +++ b/testing-framework/deployers/local/src/manual/mod.rs @@ -70,12 +70,12 @@ impl ManualCluster { Ok(self.nodes.restart_node(name).await?) } - pub async fn restart_node_with_args( + pub async fn restart_node_with( &self, name: &str, - args: Vec, + options: StartNodeOptions, ) -> Result<(), ManualClusterError> { - Ok(self.nodes.restart_node_with_args(name, args).await?) + Ok(self.nodes.restart_node_with(name, options).await?) } pub async fn stop_node(&self, name: &str) -> Result<(), ManualClusterError> { @@ -133,9 +133,13 @@ impl NodeControlHandle for ManualCluster { .map_err(|err| err.into()) } - async fn restart_node_with_args(&self, name: &str, args: Vec) -> Result<(), DynError> { + async fn restart_node_with( + &self, + name: &str, + options: StartNodeOptions, + ) -> Result<(), DynError> { self.nodes - .restart_node_with_args(name, args) + .restart_node_with(name, options) .await .map_err(|err| err.into()) } diff --git a/testing-framework/deployers/local/src/node_control/mod.rs b/testing-framework/deployers/local/src/node_control/mod.rs index d3351dc..0b82550 100644 --- a/testing-framework/deployers/local/src/node_control/mod.rs +++ b/testing-framework/deployers/local/src/node_control/mod.rs @@ -4,8 +4,8 @@ use std::{ }; use testing_framework_core::scenario::{ - Application, DynError, NodeClients, NodeControlHandle, ReadinessError, StartNodeOptions, - StartedNode, wait_for_http_ports, + Application, DynError, NodeClients, NodeControlHandle, NodeRuntimeOptions, ReadinessError, + StartNodeOptions, StartedNode, wait_for_http_ports, wait_for_http_ports_with_timeout, }; use thiserror::Error; @@ -31,6 +31,12 @@ struct NodeStartSnapshot { template_config: Option, } +#[derive(Clone, Copy)] +struct NodeReadinessTarget { + port: u16, + runtime: NodeRuntimeOptions, +} + #[derive(Debug, Error)] pub enum NodeManagerError { #[error("failed to generate node config: {source}")] @@ -120,6 +126,7 @@ impl NodeManager { peer_ports_by_name: seed.peer_ports_by_name.clone(), clients_by_name: HashMap::new(), indices_by_name: HashMap::new(), + runtime_by_name: HashMap::new(), nodes: Vec::new(), template_config: None, }; @@ -166,6 +173,7 @@ impl NodeManager { .clone_from(&self.seed.peer_ports_by_name); state.clients_by_name.clear(); state.indices_by_name.clear(); + state.runtime_by_name.clear(); state.node_count = self.seed.node_count; state.template_config = None; self.node_clients.clear(); @@ -183,7 +191,7 @@ impl NodeManager { let client = node.client(); self.node_clients.add_node(client.clone()); - state.register_node(&name, port, client, node); + state.register_node(&name, port, client, NodeRuntimeOptions::default(), node); } } @@ -210,28 +218,15 @@ impl NodeManager { } pub async fn wait_node_ready(&self, name: &str) -> Result<(), NodeManagerError> { - let port = { - let state = self.lock_state(); - let index = - *state - .indices_by_name - .get(name) - .ok_or_else(|| NodeManagerError::NodeName { - name: name.to_string(), - })?; + let target = self.readiness_target(name)?; - state - .nodes - .get(index) - .map(|node| node.endpoints().api.port()) - .ok_or_else(|| NodeManagerError::NodeName { - name: name.to_string(), - })? - }; - - wait_for_http_ports(&[port], readiness_endpoint_path::()) - .await - .map_err(|source| NodeManagerError::Readiness { source }) + wait_for_http_ports_with_timeout( + &[target.port], + readiness_endpoint_path::(), + target.runtime.start_timeout, + ) + .await + .map_err(|source| NodeManagerError::Readiness { source }) } pub async fn start_node_with( @@ -263,9 +258,10 @@ impl NodeManager { &snapshot.node_name, built.network_port, built.config, + options.runtime, options.persist_dir.as_deref(), options.snapshot_dir.as_deref(), - &[], + &options.args, ) .await?; @@ -276,21 +272,24 @@ impl NodeManager { } pub async fn restart_node(&self, name: &str) -> Result<(), NodeManagerError> { - self.restart_node_with_args(name, Vec::new()).await + self.restart_node_with(name, StartNodeOptions::default()) + .await } - pub async fn restart_node_with_args( + pub async fn restart_node_with( &self, name: &str, - args: Vec, + options: StartNodeOptions, ) -> Result<(), NodeManagerError> { let (index, mut node) = self.take_node(name)?; + validate_restart_options(&options)?; + let launch = build_launch_spec_with_args::( node.config(), node.working_dir(), name, - &args, + &options.args, ) .map_err(|source| NodeManagerError::Config { source })?; @@ -303,6 +302,7 @@ impl NodeManager { } self.put_node_back(index, node); + self.store_runtime_options(name, options.runtime); Ok(()) } @@ -321,6 +321,7 @@ impl NodeManager { node_name: &str, network_port: u16, config: ::NodeConfig, + runtime: NodeRuntimeOptions, persist_dir: Option<&std::path::Path>, snapshot_dir: Option<&std::path::Path>, extra_args: &[String], @@ -346,7 +347,7 @@ impl NodeManager { state.template_config = Some(node.config().clone()); } - state.register_node(node_name, network_port, client.clone(), node); + state.register_node(node_name, network_port, client.clone(), runtime, node); Ok(client) } @@ -361,6 +362,20 @@ impl NodeManager { reinsert_node_at(&mut state, index, node); } + fn store_runtime_options(&self, name: &str, runtime: NodeRuntimeOptions) { + let mut state = self.lock_state(); + state.runtime_by_name.insert(name.to_string(), runtime); + } + + fn readiness_target(&self, name: &str) -> Result { + let state = self.lock_state(); + let index = node_index(&state, name)?; + let port = node_api_port(&state, index, name)?; + let runtime = node_runtime_options(&state, name); + + Ok(NodeReadinessTarget { port, runtime }) + } + fn start_snapshot( &self, requested_name: &str, @@ -391,6 +406,7 @@ fn clear_registered_nodes(state: &mut LocalNodeManagerState state.peer_ports_by_name.clear(); state.clients_by_name.clear(); state.indices_by_name.clear(); + state.runtime_by_name.clear(); state.node_count = 0; state.template_config = None; } @@ -423,6 +439,72 @@ fn normalize_node_name(index: usize, requested_name: &str) -> String { format!("node-{requested_name}") } +fn validate_restart_options( + options: &StartNodeOptions, +) -> Result<(), NodeManagerError> { + if options.peers.is_some() { + return Err(unsupported_restart_override("peer selection")); + } + + if options.config_override.is_some() { + return Err(unsupported_restart_override("config override")); + } + + if options.config_patch.is_some() { + return Err(unsupported_restart_override("config patch")); + } + + if options.persist_dir.is_some() { + return Err(unsupported_restart_override("persist dir")); + } + + if options.snapshot_dir.is_some() { + return Err(unsupported_restart_override("snapshot dir")); + } + + Ok(()) +} + +fn unsupported_restart_override(field: &str) -> NodeManagerError { + NodeManagerError::InvalidArgument { + message: format!("restart_node_with does not support {field} overrides"), + } +} + +fn node_index( + state: &LocalNodeManagerState, + name: &str, +) -> Result { + state + .indices_by_name + .get(name) + .copied() + .ok_or_else(|| NodeManagerError::NodeName { + name: name.to_string(), + }) +} + +fn node_api_port( + state: &LocalNodeManagerState, + index: usize, + name: &str, +) -> Result { + state + .nodes + .get(index) + .map(|node| node.endpoints().api.port()) + .ok_or_else(|| NodeManagerError::NodeName { + name: name.to_string(), + }) +} + +fn node_runtime_options( + state: &LocalNodeManagerState, + name: &str, +) -> NodeRuntimeOptions { + state.runtime_by_name.get(name).copied().unwrap_or_default() +} + fn default_node_label(index: usize) -> String { format!("node-{index}") } @@ -464,8 +546,12 @@ impl NodeControlHandle for NodeManager { self.restart_node(name).await.map_err(|err| err.into()) } - async fn restart_node_with_args(&self, name: &str, args: Vec) -> Result<(), DynError> { - self.restart_node_with_args(name, args) + async fn restart_node_with( + &self, + name: &str, + options: StartNodeOptions, + ) -> Result<(), DynError> { + self.restart_node_with(name, options) .await .map_err(|err| err.into()) } diff --git a/testing-framework/deployers/local/src/node_control/state.rs b/testing-framework/deployers/local/src/node_control/state.rs index 59e3f1e..b40bfb9 100644 --- a/testing-framework/deployers/local/src/node_control/state.rs +++ b/testing-framework/deployers/local/src/node_control/state.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; +use testing_framework_core::scenario::NodeRuntimeOptions; + use crate::env::{LocalDeployerEnv, Node}; pub(crate) struct LocalNodeManagerState { @@ -8,6 +10,7 @@ pub(crate) struct LocalNodeManagerState { pub(crate) peer_ports_by_name: HashMap, pub(crate) clients_by_name: HashMap, pub(crate) indices_by_name: HashMap, + pub(crate) runtime_by_name: HashMap, pub(crate) nodes: Vec>, pub(crate) template_config: Option, } @@ -25,11 +28,13 @@ impl LocalNodeManagerState { node_name: &str, network_port: u16, client: E::NodeClient, + runtime: NodeRuntimeOptions, node: Node, ) { self.register_common(node_name, network_port, client); let index = self.nodes.len(); self.indices_by_name.insert(node_name.to_string(), index); + self.runtime_by_name.insert(node_name.to_string(), runtime); self.node_count += 1; self.nodes.push(node); }