diff --git a/.cargo-deny.toml b/.cargo-deny.toml index 01afffa..9c2116c 100644 --- a/.cargo-deny.toml +++ b/.cargo-deny.toml @@ -6,11 +6,6 @@ exclude-dev = true no-default-features = true [advisories] -ignore = [ - # Existing workspace dependencies still resolve rand 0.8 via tera/tokio-retry. - # Track removal when those upstream edges move to a fixed release. - "RUSTSEC-2026-0097", -] yanked = "deny" [bans] diff --git a/Cargo.lock b/Cargo.lock index dfa3b09..5c00aab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -156,7 +156,7 @@ dependencies = [ "nuid", "pin-project", "portable-atomic", - "rand 0.8.5", + "rand 0.8.6", "regex", "ring", "rustls-native-certs", @@ -1977,7 +1977,7 @@ dependencies = [ "ed25519-dalek", "getrandom 0.2.17", "log", - "rand 0.8.5", + "rand 0.8.6", "signatory", ] @@ -1996,7 +1996,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83" dependencies = [ - "rand 0.8.5", + "rand 0.8.6", ] [[package]] @@ -2378,7 +2378,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d" dependencies = [ "phf_shared", - "rand 0.8.5", + "rand 0.8.6", ] [[package]] @@ -2706,9 +2706,9 @@ checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" [[package]] name = "rand" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +checksum = "5ca0ecfa931c29007047d1bc58e623ab12e5590e8c7cc53200d5202b69266d8a" dependencies = [ "libc", "rand_chacha 0.3.1", @@ -2979,7 +2979,7 @@ dependencies = [ "borsh", "bytes", "num-traits", - "rand 0.8.5", + "rand 0.8.6", "rkyv", "serde", "serde_json", @@ -3059,9 +3059,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.10" +version = "0.103.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef" +checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e" dependencies = [ "ring", "rustls-pki-types", @@ -3514,7 +3514,7 @@ dependencies = [ "percent-encoding", "pest", "pest_derive", - "rand 0.8.5", + "rand 0.8.6", "regex", "serde", "serde_json", @@ -3534,7 +3534,7 @@ dependencies = [ "futures", "parking_lot", "prometheus-http-query", - "rand 0.8.5", + "rand 0.8.6", "reqwest", "serde", "serde_yaml", @@ -3755,7 +3755,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" dependencies = [ "pin-project", - "rand 0.8.5", + "rand 0.8.6", "tokio", ] @@ -3818,7 +3818,7 @@ dependencies = [ "futures-sink", "http", "httparse", - "rand 0.8.5", + "rand 0.8.6", "ring", "rustls-pki-types", "tokio", @@ -3997,7 +3997,7 @@ dependencies = [ "http", "httparse", "log", - "rand 0.8.5", + "rand 0.8.6", "sha1", "thiserror 1.0.69", "utf-8", diff --git a/Cargo.toml b/Cargo.toml index 42dd967..661629b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,28 +8,28 @@ members = [ "examples/kvstore/kvstore-node", "examples/kvstore/testing/integration", "examples/kvstore/testing/workloads", - "examples/openraft_kv/examples", - "examples/openraft_kv/openraft-kv-node", - "examples/openraft_kv/testing/integration", - "examples/openraft_kv/testing/workloads", - "examples/queue/examples", - "examples/queue/queue-node", - "examples/queue/testing/integration", - "examples/queue/testing/workloads", "examples/metrics_counter/examples", "examples/metrics_counter/metrics-counter-node", "examples/metrics_counter/testing/integration", "examples/metrics_counter/testing/workloads", - "examples/redis_streams/examples", - "examples/redis_streams/testing/integration", - "examples/redis_streams/testing/workloads", "examples/nats/examples", "examples/nats/testing/integration", "examples/nats/testing/workloads", + "examples/openraft_kv/examples", + "examples/openraft_kv/openraft-kv-node", + "examples/openraft_kv/testing/integration", + "examples/openraft_kv/testing/workloads", "examples/pubsub/examples", "examples/pubsub/pubsub-node", "examples/pubsub/testing/integration", "examples/pubsub/testing/workloads", + "examples/queue/examples", + "examples/queue/queue-node", + "examples/queue/testing/integration", + "examples/queue/testing/workloads", + "examples/redis_streams/examples", + "examples/redis_streams/testing/integration", + "examples/redis_streams/testing/workloads", "testing-framework/core", "testing-framework/deployers/compose", "testing-framework/deployers/k8s", diff --git a/testing-framework/core/src/scenario/capabilities.rs b/testing-framework/core/src/scenario/capabilities.rs index fc056e4..d2275ab 100644 --- a/testing-framework/core/src/scenario/capabilities.rs +++ b/testing-framework/core/src/scenario/capabilities.rs @@ -1,4 +1,4 @@ -use std::{fmt, marker::PhantomData, path::PathBuf, sync::Arc}; +use std::{fmt, marker::PhantomData, path::PathBuf, sync::Arc, time::Duration}; use reqwest::Url; @@ -34,7 +34,7 @@ pub enum PeerSelection { #[derive(Clone)] pub struct StartNodeOptions { /// How to select initial peers on startup. - pub peers: PeerSelection, + pub peers: Option, /// Optional backend-specific initial config override. pub config_override: Option, /// Optional patch callback applied to generated node config before spawn. @@ -44,9 +44,20 @@ pub struct StartNodeOptions { pub persist_dir: Option, /// Optional directory whose contents should seed the node working dir. pub snapshot_dir: Option, + /// Extra process arguments appended on launch. + pub args: Vec, + /// Runtime policy for this node launch. + pub runtime: NodeRuntimeOptions, _phantom: PhantomData, } +/// Runtime supervision options for a node process. +#[derive(Clone, Copy, Debug, Default)] +pub struct NodeRuntimeOptions { + /// Optional readiness/start timeout override for this node. + pub start_timeout: Option, +} + impl fmt::Debug for StartNodeOptions { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("StartNodeOptions") @@ -55,6 +66,8 @@ impl fmt::Debug for StartNodeOptions { .field("config_patch", &self.config_patch.is_some()) .field("persist_dir", &self.persist_dir) .field("snapshot_dir", &self.snapshot_dir) + .field("args", &self.args) + .field("runtime", &self.runtime) .finish() } } @@ -62,11 +75,13 @@ impl fmt::Debug for StartNodeOptions { impl Default for StartNodeOptions { fn default() -> Self { Self { - peers: PeerSelection::DefaultLayout, + peers: None, config_override: None, config_patch: None, persist_dir: None, snapshot_dir: None, + args: Vec::new(), + runtime: NodeRuntimeOptions::default(), _phantom: PhantomData, } } @@ -75,7 +90,7 @@ impl Default for StartNodeOptions { impl StartNodeOptions { #[must_use] pub fn with_peers(mut self, peers: PeerSelection) -> Self { - self.peers = peers; + self.peers = Some(peers); self } @@ -105,6 +120,24 @@ impl StartNodeOptions { self.snapshot_dir = Some(snapshot_dir); self } + + #[must_use] + pub fn with_args(mut self, args: impl IntoIterator>) -> Self { + self.args.extend(args.into_iter().map(Into::into)); + self + } + + #[must_use] + pub fn with_runtime(mut self, runtime: NodeRuntimeOptions) -> Self { + self.runtime = runtime; + self + } + + #[must_use] + pub fn with_start_timeout(mut self, start_timeout: Duration) -> Self { + self.runtime.start_timeout = Some(start_timeout); + self + } } /// Indicates whether a capability requires node control. diff --git a/testing-framework/core/src/scenario/config.rs b/testing-framework/core/src/scenario/config.rs index bd1aa7e..0a3530e 100644 --- a/testing-framework/core/src/scenario/config.rs +++ b/testing-framework/core/src/scenario/config.rs @@ -163,16 +163,16 @@ where options: &StartNodeOptions, ) -> Result, Self::Error> { let mut config = match &options.peers { - PeerSelection::DefaultLayout => { + None | Some(PeerSelection::DefaultLayout) => { if options.config_override.is_none() && options.config_patch.is_none() { return Ok(None); } build_static_cluster_node_config::(deployment, node_index, Some(hostnames))? } - PeerSelection::None => { + Some(PeerSelection::None) => { build_cluster_node_config_for_indices::(node_index, hostnames, &[])? } - PeerSelection::Named(names) => { + Some(PeerSelection::Named(names)) => { let indices = resolve_named_peer_indices::(deployment, node_index, names)?; build_cluster_node_config_for_indices::(node_index, hostnames, &indices)? } diff --git a/testing-framework/core/src/scenario/control.rs b/testing-framework/core/src/scenario/control.rs index 4f59621..951543b 100644 --- a/testing-framework/core/src/scenario/control.rs +++ b/testing-framework/core/src/scenario/control.rs @@ -9,6 +9,14 @@ pub trait NodeControlHandle: Send + Sync { Err("restart_node not supported by this deployer".into()) } + async fn restart_node_with( + &self, + _name: &str, + _options: StartNodeOptions, + ) -> Result<(), DynError> { + Err("restart_node_with not supported by this deployer".into()) + } + async fn start_node(&self, _name: &str) -> Result, DynError> { Err("start_node not supported by this deployer".into()) } diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs index 29eb104..0ceeeab 100644 --- a/testing-framework/core/src/scenario/mod.rs +++ b/testing-framework/core/src/scenario/mod.rs @@ -22,8 +22,8 @@ pub type DynError = Box; pub use builder_ext::{BuilderInputError, ObservabilityBuilderExt}; pub use capabilities::{ - NodeControlCapability, ObservabilityCapability, PeerSelection, RequiresNodeControl, - StartNodeOptions, StartedNode, + NodeControlCapability, NodeRuntimeOptions, ObservabilityCapability, PeerSelection, + RequiresNodeControl, StartNodeOptions, StartedNode, }; pub use client::NodeAccess; pub use common_builder_ext::CoreBuilderExt; @@ -45,6 +45,7 @@ pub use runtime::{ }, wait_for_http_ports, wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement, + wait_for_http_ports_with_requirement_and_timeout, wait_for_http_ports_with_timeout, wait_http_readiness, wait_until_stable, }; pub use sources::{ diff --git a/testing-framework/core/src/scenario/runtime/mod.rs b/testing-framework/core/src/scenario/runtime/mod.rs index 1c614fa..cae03b8 100644 --- a/testing-framework/core/src/scenario/runtime/mod.rs +++ b/testing-framework/core/src/scenario/runtime/mod.rs @@ -23,6 +23,7 @@ pub use node_clients::NodeClients; pub use readiness::{ HttpReadinessRequirement, ReadinessError, StabilizationConfig, wait_for_http_ports, wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement, - wait_for_http_ports_with_requirement, wait_http_readiness, wait_until_stable, + wait_for_http_ports_with_requirement, wait_for_http_ports_with_requirement_and_timeout, + wait_for_http_ports_with_timeout, wait_http_readiness, wait_until_stable, }; pub use runner::Runner; diff --git a/testing-framework/core/src/scenario/runtime/readiness.rs b/testing-framework/core/src/scenario/runtime/readiness.rs index f44e929..fa36e87 100644 --- a/testing-framework/core/src/scenario/runtime/readiness.rs +++ b/testing-framework/core/src/scenario/runtime/readiness.rs @@ -232,12 +232,20 @@ where pub async fn wait_http_readiness( endpoints: &[Url], requirement: HttpReadinessRequirement, +) -> Result<(), ReadinessError> { + wait_http_readiness_with_timeout(endpoints, requirement, None).await +} + +pub async fn wait_http_readiness_with_timeout( + endpoints: &[Url], + requirement: HttpReadinessRequirement, + timeout: Option, ) -> Result<(), ReadinessError> { if endpoints.is_empty() { return Ok(()); } - let (poll_interval, max_attempts) = http_retry_plan(); + let (poll_interval, max_attempts) = http_retry_plan(timeout); let client = Client::new(); let retry = RetryConfig::bounded(max_attempts, poll_interval, poll_interval); retry_async(retry, |_| async { @@ -254,8 +262,8 @@ pub async fn wait_http_readiness( }) } -fn http_retry_plan() -> (Duration, usize) { - let timeout_duration = adjust_timeout(DEFAULT_TIMEOUT); +fn http_retry_plan(timeout: Option) -> (Duration, usize) { + let timeout_duration = adjust_timeout(timeout.unwrap_or(DEFAULT_TIMEOUT)); let poll_interval = DEFAULT_POLL_INTERVAL; let max_attempts = retry_attempts(timeout_duration, poll_interval); (poll_interval, max_attempts) @@ -269,17 +277,39 @@ fn retry_attempts(timeout: Duration, interval: Duration) -> usize { } pub async fn wait_for_http_ports(ports: &[u16], endpoint_path: &str) -> Result<(), ReadinessError> { - wait_for_http_ports_with_requirement(ports, endpoint_path, default_readiness_requirement()) - .await + wait_for_http_ports_with_timeout(ports, endpoint_path, None).await +} + +pub async fn wait_for_http_ports_with_timeout( + ports: &[u16], + endpoint_path: &str, + timeout: Option, +) -> Result<(), ReadinessError> { + wait_for_http_ports_with_requirement_and_timeout( + ports, + endpoint_path, + default_readiness_requirement(), + timeout, + ) + .await } pub async fn wait_for_http_ports_with_requirement( ports: &[u16], endpoint_path: &str, requirement: HttpReadinessRequirement, +) -> Result<(), ReadinessError> { + wait_for_http_ports_with_requirement_and_timeout(ports, endpoint_path, requirement, None).await +} + +pub async fn wait_for_http_ports_with_requirement_and_timeout( + ports: &[u16], + endpoint_path: &str, + requirement: HttpReadinessRequirement, + timeout: Option, ) -> Result<(), ReadinessError> { let endpoints = build_local_endpoints(ports, endpoint_path)?; - wait_http_readiness(&endpoints, requirement).await + wait_http_readiness_with_timeout(&endpoints, requirement, timeout).await } pub async fn wait_for_http_ports_with_host( diff --git a/testing-framework/deployers/k8s/src/manual.rs b/testing-framework/deployers/k8s/src/manual.rs index 400fe0f..1c10c70 100644 --- a/testing-framework/deployers/k8s/src/manual.rs +++ b/testing-framework/deployers/k8s/src/manual.rs @@ -14,7 +14,7 @@ use testing_framework_core::{ manual::ManualClusterHandle, scenario::{ ClusterWaitHandle, DynError, ExternalNodeSource, HttpReadinessRequirement, NodeClients, - NodeControlHandle, StartNodeOptions, StartedNode, + NodeControlHandle, PeerSelection, StartNodeOptions, StartedNode, }, }; use thiserror::Error; @@ -604,10 +604,7 @@ fn validate_start_options( fn ensure_default_cfgsync_options( options: &StartNodeOptions, ) -> Result<(), ManualClusterError> { - let default_peers = matches!( - options.peers, - testing_framework_core::scenario::PeerSelection::DefaultLayout - ); + let default_peers = matches!(options.peers, None | Some(PeerSelection::DefaultLayout)); if default_peers && options.config_override.is_none() && options.config_patch.is_none() { return Ok(()); } @@ -719,14 +716,14 @@ mod tests { options: &StartNodeOptions, ) -> Result, Self::Error> { let mut config = match &options.peers { - PeerSelection::DefaultLayout => { + None | Some(PeerSelection::DefaultLayout) => { if options.config_override.is_none() && options.config_patch.is_none() { return Ok(None); } format!("node={node_index};peers=default") } - PeerSelection::None => format!("node={node_index};peers=none"), - PeerSelection::Named(names) => { + Some(PeerSelection::None) => format!("node={node_index};peers=none"), + Some(PeerSelection::Named(names)) => { format!("node={node_index};peers={}", names.join(",")) } }; diff --git a/testing-framework/deployers/local/src/env/helpers.rs b/testing-framework/deployers/local/src/env/helpers.rs index e2d54fe..7d1d3d5 100644 --- a/testing-framework/deployers/local/src/env/helpers.rs +++ b/testing-framework/deployers/local/src/env/helpers.rs @@ -343,6 +343,20 @@ pub fn yaml_config_launch_spec( rendered_config_launch_spec(config_yaml.into_bytes(), spec) } +pub fn build_launch_spec_with_args( + config: &::NodeConfig, + dir: &std::path::Path, + label: &str, + extra_args: &[String], +) -> Result +where + E: crate::env::LocalDeployerEnv, +{ + let mut launch = E::build_launch_spec(config, dir, label)?; + launch.args.extend(extra_args.iter().cloned()); + Ok(launch) +} + /// Uses an already rendered text config to build a launch spec for `spec`. pub fn text_config_launch_spec( rendered_config: impl Into>, diff --git a/testing-framework/deployers/local/src/env/mod.rs b/testing-framework/deployers/local/src/env/mod.rs index 17ee9ea..bbf36ea 100644 --- a/testing-framework/deployers/local/src/env/mod.rs +++ b/testing-framework/deployers/local/src/env/mod.rs @@ -21,10 +21,11 @@ mod tests; pub use helpers::{ BuiltNodeConfig, LocalNodePorts, LocalPeerNode, LocalProcessSpec, NodeConfigEntry, - build_indexed_http_peers, build_indexed_node_configs, build_local_cluster_node_config, - build_local_peer_nodes, default_yaml_launch_spec, discovered_node_access, preallocate_ports, - reserve_local_node_ports, single_http_node_endpoints, text_config_launch_spec, - text_node_config, yaml_config_launch_spec, yaml_node_config, + build_indexed_http_peers, build_indexed_node_configs, build_launch_spec_with_args, + build_local_cluster_node_config, build_local_peer_nodes, default_yaml_launch_spec, + discovered_node_access, preallocate_ports, reserve_local_node_ports, + single_http_node_endpoints, text_config_launch_spec, text_node_config, yaml_config_launch_spec, + yaml_node_config, }; /// Context passed while building a local node config. @@ -506,11 +507,14 @@ pub async fn spawn_node_from_config( keep_tempdir: bool, persist_dir: Option<&std::path::Path>, snapshot_dir: Option<&std::path::Path>, + extra_args: &[String], ) -> Result, ProcessSpawnError> { + let extra_args = extra_args.to_vec(); + ProcessNode::spawn( &label, config, - E::build_launch_spec, + move |config, dir, label| build_launch_spec_with_args::(config, dir, label, &extra_args), E::node_endpoints, keep_tempdir, persist_dir, diff --git a/testing-framework/deployers/local/src/manual/mod.rs b/testing-framework/deployers/local/src/manual/mod.rs index 028a690..ffe0be2 100644 --- a/testing-framework/deployers/local/src/manual/mod.rs +++ b/testing-framework/deployers/local/src/manual/mod.rs @@ -70,6 +70,14 @@ impl ManualCluster { Ok(self.nodes.restart_node(name).await?) } + pub async fn restart_node_with( + &self, + name: &str, + options: StartNodeOptions, + ) -> Result<(), ManualClusterError> { + Ok(self.nodes.restart_node_with(name, options).await?) + } + pub async fn stop_node(&self, name: &str) -> Result<(), ManualClusterError> { Ok(self.nodes.stop_node(name).await?) } @@ -125,6 +133,17 @@ impl NodeControlHandle for ManualCluster { .map_err(|err| err.into()) } + async fn restart_node_with( + &self, + name: &str, + options: StartNodeOptions, + ) -> Result<(), DynError> { + self.nodes + .restart_node_with(name, options) + .await + .map_err(|err| err.into()) + } + async fn stop_node(&self, name: &str) -> Result<(), DynError> { self.nodes.stop_node(name).await.map_err(|err| err.into()) } diff --git a/testing-framework/deployers/local/src/node_control/mod.rs b/testing-framework/deployers/local/src/node_control/mod.rs index 149a0dc..0b82550 100644 --- a/testing-framework/deployers/local/src/node_control/mod.rs +++ b/testing-framework/deployers/local/src/node_control/mod.rs @@ -4,16 +4,16 @@ use std::{ }; use testing_framework_core::scenario::{ - Application, DynError, NodeClients, NodeControlHandle, ReadinessError, StartNodeOptions, - StartedNode, wait_for_http_ports, + Application, DynError, NodeClients, NodeControlHandle, NodeRuntimeOptions, ReadinessError, + StartNodeOptions, StartedNode, wait_for_http_ports, wait_for_http_ports_with_timeout, }; use thiserror::Error; use crate::{ env::{ - LocalDeployerEnv, Node, build_initial_node_configs, build_node_from_template, - initial_persist_dir, initial_snapshot_dir, node_peer_port, readiness_endpoint_path, - spawn_node_from_config, + LocalDeployerEnv, Node, build_initial_node_configs, build_launch_spec_with_args, + build_node_from_template, initial_persist_dir, initial_snapshot_dir, node_peer_port, + readiness_endpoint_path, spawn_node_from_config, }, process::ProcessSpawnError, }; @@ -31,6 +31,12 @@ struct NodeStartSnapshot { template_config: Option, } +#[derive(Clone, Copy)] +struct NodeReadinessTarget { + port: u16, + runtime: NodeRuntimeOptions, +} + #[derive(Debug, Error)] pub enum NodeManagerError { #[error("failed to generate node config: {source}")] @@ -96,6 +102,7 @@ impl NodeManager { keep_tempdir, persist_dir.as_deref(), snapshot_dir.as_deref(), + &[], ) .await?, ); @@ -119,6 +126,7 @@ impl NodeManager { peer_ports_by_name: seed.peer_ports_by_name.clone(), clients_by_name: HashMap::new(), indices_by_name: HashMap::new(), + runtime_by_name: HashMap::new(), nodes: Vec::new(), template_config: None, }; @@ -165,6 +173,7 @@ impl NodeManager { .clone_from(&self.seed.peer_ports_by_name); state.clients_by_name.clear(); state.indices_by_name.clear(); + state.runtime_by_name.clear(); state.node_count = self.seed.node_count; state.template_config = None; self.node_clients.clear(); @@ -182,7 +191,7 @@ impl NodeManager { let client = node.client(); self.node_clients.add_node(client.clone()); - state.register_node(&name, port, client, node); + state.register_node(&name, port, client, NodeRuntimeOptions::default(), node); } } @@ -209,28 +218,15 @@ impl NodeManager { } pub async fn wait_node_ready(&self, name: &str) -> Result<(), NodeManagerError> { - let port = { - let state = self.lock_state(); - let index = - *state - .indices_by_name - .get(name) - .ok_or_else(|| NodeManagerError::NodeName { - name: name.to_string(), - })?; + let target = self.readiness_target(name)?; - state - .nodes - .get(index) - .map(|node| node.endpoints().api.port()) - .ok_or_else(|| NodeManagerError::NodeName { - name: name.to_string(), - })? - }; - - wait_for_http_ports(&[port], readiness_endpoint_path::()) - .await - .map_err(|source| NodeManagerError::Readiness { source }) + wait_for_http_ports_with_timeout( + &[target.port], + readiness_endpoint_path::(), + target.runtime.start_timeout, + ) + .await + .map_err(|source| NodeManagerError::Readiness { source }) } pub async fn start_node_with( @@ -262,8 +258,10 @@ impl NodeManager { &snapshot.node_name, built.network_port, built.config, + options.runtime, options.persist_dir.as_deref(), options.snapshot_dir.as_deref(), + &options.args, ) .await?; @@ -274,9 +272,28 @@ impl NodeManager { } pub async fn restart_node(&self, name: &str) -> Result<(), NodeManagerError> { + self.restart_node_with(name, StartNodeOptions::default()) + .await + } + + pub async fn restart_node_with( + &self, + name: &str, + options: StartNodeOptions, + ) -> Result<(), NodeManagerError> { let (index, mut node) = self.take_node(name)?; - if let Err(source) = node.restart().await { + validate_restart_options(&options)?; + + let launch = build_launch_spec_with_args::( + node.config(), + node.working_dir(), + name, + &options.args, + ) + .map_err(|source| NodeManagerError::Config { source })?; + + if let Err(source) = node.restart_with_launch(launch).await { self.put_node_back(index, node); return Err(NodeManagerError::Restart { @@ -285,6 +302,7 @@ impl NodeManager { } self.put_node_back(index, node); + self.store_runtime_options(name, options.runtime); Ok(()) } @@ -303,8 +321,10 @@ impl NodeManager { node_name: &str, network_port: u16, config: ::NodeConfig, + runtime: NodeRuntimeOptions, persist_dir: Option<&std::path::Path>, snapshot_dir: Option<&std::path::Path>, + extra_args: &[String], ) -> Result { let node = spawn_node_from_config::( node_name.to_string(), @@ -312,6 +332,7 @@ impl NodeManager { self.keep_tempdir, persist_dir, snapshot_dir, + extra_args, ) .await .map_err(|source| NodeManagerError::Spawn { @@ -326,7 +347,7 @@ impl NodeManager { state.template_config = Some(node.config().clone()); } - state.register_node(node_name, network_port, client.clone(), node); + state.register_node(node_name, network_port, client.clone(), runtime, node); Ok(client) } @@ -341,6 +362,20 @@ impl NodeManager { reinsert_node_at(&mut state, index, node); } + fn store_runtime_options(&self, name: &str, runtime: NodeRuntimeOptions) { + let mut state = self.lock_state(); + state.runtime_by_name.insert(name.to_string(), runtime); + } + + fn readiness_target(&self, name: &str) -> Result { + let state = self.lock_state(); + let index = node_index(&state, name)?; + let port = node_api_port(&state, index, name)?; + let runtime = node_runtime_options(&state, name); + + Ok(NodeReadinessTarget { port, runtime }) + } + fn start_snapshot( &self, requested_name: &str, @@ -371,6 +406,7 @@ fn clear_registered_nodes(state: &mut LocalNodeManagerState state.peer_ports_by_name.clear(); state.clients_by_name.clear(); state.indices_by_name.clear(); + state.runtime_by_name.clear(); state.node_count = 0; state.template_config = None; } @@ -403,6 +439,72 @@ fn normalize_node_name(index: usize, requested_name: &str) -> String { format!("node-{requested_name}") } +fn validate_restart_options( + options: &StartNodeOptions, +) -> Result<(), NodeManagerError> { + if options.peers.is_some() { + return Err(unsupported_restart_override("peer selection")); + } + + if options.config_override.is_some() { + return Err(unsupported_restart_override("config override")); + } + + if options.config_patch.is_some() { + return Err(unsupported_restart_override("config patch")); + } + + if options.persist_dir.is_some() { + return Err(unsupported_restart_override("persist dir")); + } + + if options.snapshot_dir.is_some() { + return Err(unsupported_restart_override("snapshot dir")); + } + + Ok(()) +} + +fn unsupported_restart_override(field: &str) -> NodeManagerError { + NodeManagerError::InvalidArgument { + message: format!("restart_node_with does not support {field} overrides"), + } +} + +fn node_index( + state: &LocalNodeManagerState, + name: &str, +) -> Result { + state + .indices_by_name + .get(name) + .copied() + .ok_or_else(|| NodeManagerError::NodeName { + name: name.to_string(), + }) +} + +fn node_api_port( + state: &LocalNodeManagerState, + index: usize, + name: &str, +) -> Result { + state + .nodes + .get(index) + .map(|node| node.endpoints().api.port()) + .ok_or_else(|| NodeManagerError::NodeName { + name: name.to_string(), + }) +} + +fn node_runtime_options( + state: &LocalNodeManagerState, + name: &str, +) -> NodeRuntimeOptions { + state.runtime_by_name.get(name).copied().unwrap_or_default() +} + fn default_node_label(index: usize) -> String { format!("node-{index}") } @@ -444,6 +546,16 @@ impl NodeControlHandle for NodeManager { self.restart_node(name).await.map_err(|err| err.into()) } + async fn restart_node_with( + &self, + name: &str, + options: StartNodeOptions, + ) -> Result<(), DynError> { + self.restart_node_with(name, options) + .await + .map_err(|err| err.into()) + } + async fn stop_node(&self, name: &str) -> Result<(), DynError> { self.stop_node(name).await.map_err(|err| err.into()) } diff --git a/testing-framework/deployers/local/src/node_control/state.rs b/testing-framework/deployers/local/src/node_control/state.rs index 59e3f1e..b40bfb9 100644 --- a/testing-framework/deployers/local/src/node_control/state.rs +++ b/testing-framework/deployers/local/src/node_control/state.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; +use testing_framework_core::scenario::NodeRuntimeOptions; + use crate::env::{LocalDeployerEnv, Node}; pub(crate) struct LocalNodeManagerState { @@ -8,6 +10,7 @@ pub(crate) struct LocalNodeManagerState { pub(crate) peer_ports_by_name: HashMap, pub(crate) clients_by_name: HashMap, pub(crate) indices_by_name: HashMap, + pub(crate) runtime_by_name: HashMap, pub(crate) nodes: Vec>, pub(crate) template_config: Option, } @@ -25,11 +28,13 @@ impl LocalNodeManagerState { node_name: &str, network_port: u16, client: E::NodeClient, + runtime: NodeRuntimeOptions, node: Node, ) { self.register_common(node_name, network_port, client); let index = self.nodes.len(); self.indices_by_name.insert(node_name.to_string(), index); + self.runtime_by_name.insert(node_name.to_string(), runtime); self.node_count += 1; self.nodes.push(node); } diff --git a/testing-framework/deployers/local/src/process.rs b/testing-framework/deployers/local/src/process.rs index ced4959..53b8766 100644 --- a/testing-framework/deployers/local/src/process.rs +++ b/testing-framework/deployers/local/src/process.rs @@ -168,6 +168,10 @@ impl &Path { + self.tempdir.path() + } + pub fn pid(&self) -> u32 { self.child.id().unwrap_or_default() } @@ -255,6 +259,16 @@ impl Result<(), ProcessSpawnError> { + self.stop_child().await?; + self.launch = launch; + self.child = self.spawn_child().await?; + Ok(()) + } + pub async fn stop(&mut self) { let _ = self.stop_child().await; }