Add manual cluster restart options

This commit is contained in:
Andrus Salumets 2026-04-23 18:14:22 +07:00 committed by GitHub
commit 4854942dce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 321 additions and 88 deletions

View File

@ -6,11 +6,6 @@ exclude-dev = true
no-default-features = true no-default-features = true
[advisories] [advisories]
ignore = [
# Existing workspace dependencies still resolve rand 0.8 via tera/tokio-retry.
# Track removal when those upstream edges move to a fixed release.
"RUSTSEC-2026-0097",
]
yanked = "deny" yanked = "deny"
[bans] [bans]

28
Cargo.lock generated
View File

@ -156,7 +156,7 @@ dependencies = [
"nuid", "nuid",
"pin-project", "pin-project",
"portable-atomic", "portable-atomic",
"rand 0.8.5", "rand 0.8.6",
"regex", "regex",
"ring", "ring",
"rustls-native-certs", "rustls-native-certs",
@ -1977,7 +1977,7 @@ dependencies = [
"ed25519-dalek", "ed25519-dalek",
"getrandom 0.2.17", "getrandom 0.2.17",
"log", "log",
"rand 0.8.5", "rand 0.8.6",
"signatory", "signatory",
] ]
@ -1996,7 +1996,7 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83" checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83"
dependencies = [ dependencies = [
"rand 0.8.5", "rand 0.8.6",
] ]
[[package]] [[package]]
@ -2378,7 +2378,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d" checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d"
dependencies = [ dependencies = [
"phf_shared", "phf_shared",
"rand 0.8.5", "rand 0.8.6",
] ]
[[package]] [[package]]
@ -2706,9 +2706,9 @@ checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09"
[[package]] [[package]]
name = "rand" name = "rand"
version = "0.8.5" version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" checksum = "5ca0ecfa931c29007047d1bc58e623ab12e5590e8c7cc53200d5202b69266d8a"
dependencies = [ dependencies = [
"libc", "libc",
"rand_chacha 0.3.1", "rand_chacha 0.3.1",
@ -2979,7 +2979,7 @@ dependencies = [
"borsh", "borsh",
"bytes", "bytes",
"num-traits", "num-traits",
"rand 0.8.5", "rand 0.8.6",
"rkyv", "rkyv",
"serde", "serde",
"serde_json", "serde_json",
@ -3059,9 +3059,9 @@ dependencies = [
[[package]] [[package]]
name = "rustls-webpki" name = "rustls-webpki"
version = "0.103.10" version = "0.103.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef" checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e"
dependencies = [ dependencies = [
"ring", "ring",
"rustls-pki-types", "rustls-pki-types",
@ -3514,7 +3514,7 @@ dependencies = [
"percent-encoding", "percent-encoding",
"pest", "pest",
"pest_derive", "pest_derive",
"rand 0.8.5", "rand 0.8.6",
"regex", "regex",
"serde", "serde",
"serde_json", "serde_json",
@ -3534,7 +3534,7 @@ dependencies = [
"futures", "futures",
"parking_lot", "parking_lot",
"prometheus-http-query", "prometheus-http-query",
"rand 0.8.5", "rand 0.8.6",
"reqwest", "reqwest",
"serde", "serde",
"serde_yaml", "serde_yaml",
@ -3755,7 +3755,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f"
dependencies = [ dependencies = [
"pin-project", "pin-project",
"rand 0.8.5", "rand 0.8.6",
"tokio", "tokio",
] ]
@ -3818,7 +3818,7 @@ dependencies = [
"futures-sink", "futures-sink",
"http", "http",
"httparse", "httparse",
"rand 0.8.5", "rand 0.8.6",
"ring", "ring",
"rustls-pki-types", "rustls-pki-types",
"tokio", "tokio",
@ -3997,7 +3997,7 @@ dependencies = [
"http", "http",
"httparse", "httparse",
"log", "log",
"rand 0.8.5", "rand 0.8.6",
"sha1", "sha1",
"thiserror 1.0.69", "thiserror 1.0.69",
"utf-8", "utf-8",

View File

@ -8,28 +8,28 @@ members = [
"examples/kvstore/kvstore-node", "examples/kvstore/kvstore-node",
"examples/kvstore/testing/integration", "examples/kvstore/testing/integration",
"examples/kvstore/testing/workloads", "examples/kvstore/testing/workloads",
"examples/openraft_kv/examples",
"examples/openraft_kv/openraft-kv-node",
"examples/openraft_kv/testing/integration",
"examples/openraft_kv/testing/workloads",
"examples/queue/examples",
"examples/queue/queue-node",
"examples/queue/testing/integration",
"examples/queue/testing/workloads",
"examples/metrics_counter/examples", "examples/metrics_counter/examples",
"examples/metrics_counter/metrics-counter-node", "examples/metrics_counter/metrics-counter-node",
"examples/metrics_counter/testing/integration", "examples/metrics_counter/testing/integration",
"examples/metrics_counter/testing/workloads", "examples/metrics_counter/testing/workloads",
"examples/redis_streams/examples",
"examples/redis_streams/testing/integration",
"examples/redis_streams/testing/workloads",
"examples/nats/examples", "examples/nats/examples",
"examples/nats/testing/integration", "examples/nats/testing/integration",
"examples/nats/testing/workloads", "examples/nats/testing/workloads",
"examples/openraft_kv/examples",
"examples/openraft_kv/openraft-kv-node",
"examples/openraft_kv/testing/integration",
"examples/openraft_kv/testing/workloads",
"examples/pubsub/examples", "examples/pubsub/examples",
"examples/pubsub/pubsub-node", "examples/pubsub/pubsub-node",
"examples/pubsub/testing/integration", "examples/pubsub/testing/integration",
"examples/pubsub/testing/workloads", "examples/pubsub/testing/workloads",
"examples/queue/examples",
"examples/queue/queue-node",
"examples/queue/testing/integration",
"examples/queue/testing/workloads",
"examples/redis_streams/examples",
"examples/redis_streams/testing/integration",
"examples/redis_streams/testing/workloads",
"testing-framework/core", "testing-framework/core",
"testing-framework/deployers/compose", "testing-framework/deployers/compose",
"testing-framework/deployers/k8s", "testing-framework/deployers/k8s",

View File

@ -1,4 +1,4 @@
use std::{fmt, marker::PhantomData, path::PathBuf, sync::Arc}; use std::{fmt, marker::PhantomData, path::PathBuf, sync::Arc, time::Duration};
use reqwest::Url; use reqwest::Url;
@ -34,7 +34,7 @@ pub enum PeerSelection {
#[derive(Clone)] #[derive(Clone)]
pub struct StartNodeOptions<E: Application> { pub struct StartNodeOptions<E: Application> {
/// How to select initial peers on startup. /// How to select initial peers on startup.
pub peers: PeerSelection, pub peers: Option<PeerSelection>,
/// Optional backend-specific initial config override. /// Optional backend-specific initial config override.
pub config_override: Option<E::NodeConfig>, pub config_override: Option<E::NodeConfig>,
/// Optional patch callback applied to generated node config before spawn. /// Optional patch callback applied to generated node config before spawn.
@ -44,9 +44,20 @@ pub struct StartNodeOptions<E: Application> {
pub persist_dir: Option<PathBuf>, pub persist_dir: Option<PathBuf>,
/// Optional directory whose contents should seed the node working dir. /// Optional directory whose contents should seed the node working dir.
pub snapshot_dir: Option<PathBuf>, pub snapshot_dir: Option<PathBuf>,
/// Extra process arguments appended on launch.
pub args: Vec<String>,
/// Runtime policy for this node launch.
pub runtime: NodeRuntimeOptions,
_phantom: PhantomData<E>, _phantom: PhantomData<E>,
} }
/// Runtime supervision options for a node process.
///
/// Stored in the `runtime` field of `StartNodeOptions`. The type is `Copy`,
/// so it can be kept and passed around by value.
#[derive(Clone, Copy, Debug, Default)]
pub struct NodeRuntimeOptions {
    /// Optional readiness/start timeout override for this node.
    ///
    /// `None` (the derived `Default`) means no override is requested.
    pub start_timeout: Option<Duration>,
}
impl<E: Application> fmt::Debug for StartNodeOptions<E> { impl<E: Application> fmt::Debug for StartNodeOptions<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StartNodeOptions") f.debug_struct("StartNodeOptions")
@ -55,6 +66,8 @@ impl<E: Application> fmt::Debug for StartNodeOptions<E> {
.field("config_patch", &self.config_patch.is_some()) .field("config_patch", &self.config_patch.is_some())
.field("persist_dir", &self.persist_dir) .field("persist_dir", &self.persist_dir)
.field("snapshot_dir", &self.snapshot_dir) .field("snapshot_dir", &self.snapshot_dir)
.field("args", &self.args)
.field("runtime", &self.runtime)
.finish() .finish()
} }
} }
@ -62,11 +75,13 @@ impl<E: Application> fmt::Debug for StartNodeOptions<E> {
impl<E: Application> Default for StartNodeOptions<E> { impl<E: Application> Default for StartNodeOptions<E> {
fn default() -> Self { fn default() -> Self {
Self { Self {
peers: PeerSelection::DefaultLayout, peers: None,
config_override: None, config_override: None,
config_patch: None, config_patch: None,
persist_dir: None, persist_dir: None,
snapshot_dir: None, snapshot_dir: None,
args: Vec::new(),
runtime: NodeRuntimeOptions::default(),
_phantom: PhantomData, _phantom: PhantomData,
} }
} }
@ -75,7 +90,7 @@ impl<E: Application> Default for StartNodeOptions<E> {
impl<E: Application> StartNodeOptions<E> { impl<E: Application> StartNodeOptions<E> {
#[must_use] #[must_use]
pub fn with_peers(mut self, peers: PeerSelection) -> Self { pub fn with_peers(mut self, peers: PeerSelection) -> Self {
self.peers = peers; self.peers = Some(peers);
self self
} }
@ -105,6 +120,24 @@ impl<E: Application> StartNodeOptions<E> {
self.snapshot_dir = Some(snapshot_dir); self.snapshot_dir = Some(snapshot_dir);
self self
} }
/// Adds extra process arguments to append on launch.
///
/// Each item is converted into a `String` and pushed after any arguments
/// recorded by earlier calls; existing arguments are never replaced.
#[must_use]
pub fn with_args(mut self, args: impl IntoIterator<Item = impl Into<String>>) -> Self {
    for arg in args {
        self.args.push(arg.into());
    }
    self
}
/// Replaces the whole runtime policy for this node launch.
///
/// Any value set earlier (for example through `with_start_timeout`) is
/// overwritten by `runtime`.
#[must_use]
pub fn with_runtime(self, runtime: NodeRuntimeOptions) -> Self {
    let mut updated = self;
    updated.runtime = runtime;
    updated
}
/// Sets the readiness/start timeout override for this node launch.
///
/// Only the `start_timeout` field of the runtime policy is touched.
#[must_use]
pub fn with_start_timeout(mut self, start_timeout: Duration) -> Self {
    let runtime = &mut self.runtime;
    runtime.start_timeout = Some(start_timeout);
    self
}
} }
/// Indicates whether a capability requires node control. /// Indicates whether a capability requires node control.

View File

@ -163,16 +163,16 @@ where
options: &StartNodeOptions<Self>, options: &StartNodeOptions<Self>,
) -> Result<Option<ArtifactSet>, Self::Error> { ) -> Result<Option<ArtifactSet>, Self::Error> {
let mut config = match &options.peers { let mut config = match &options.peers {
PeerSelection::DefaultLayout => { None | Some(PeerSelection::DefaultLayout) => {
if options.config_override.is_none() && options.config_patch.is_none() { if options.config_override.is_none() && options.config_patch.is_none() {
return Ok(None); return Ok(None);
} }
build_static_cluster_node_config::<T>(deployment, node_index, Some(hostnames))? build_static_cluster_node_config::<T>(deployment, node_index, Some(hostnames))?
} }
PeerSelection::None => { Some(PeerSelection::None) => {
build_cluster_node_config_for_indices::<T>(node_index, hostnames, &[])? build_cluster_node_config_for_indices::<T>(node_index, hostnames, &[])?
} }
PeerSelection::Named(names) => { Some(PeerSelection::Named(names)) => {
let indices = resolve_named_peer_indices::<T>(deployment, node_index, names)?; let indices = resolve_named_peer_indices::<T>(deployment, node_index, names)?;
build_cluster_node_config_for_indices::<T>(node_index, hostnames, &indices)? build_cluster_node_config_for_indices::<T>(node_index, hostnames, &indices)?
} }

View File

@ -9,6 +9,14 @@ pub trait NodeControlHandle<E: Application>: Send + Sync {
Err("restart_node not supported by this deployer".into()) Err("restart_node not supported by this deployer".into())
} }
/// Hook for restarting a node with caller-supplied start options.
///
/// This default implementation restarts nothing: it ignores both arguments
/// and always returns an error saying the deployer does not support the
/// operation.
async fn restart_node_with(
    &self,
    _name: &str,
    _options: StartNodeOptions<E>,
) -> Result<(), DynError> {
    let reason = "restart_node_with not supported by this deployer";
    Err(reason.into())
}
async fn start_node(&self, _name: &str) -> Result<StartedNode<E>, DynError> { async fn start_node(&self, _name: &str) -> Result<StartedNode<E>, DynError> {
Err("start_node not supported by this deployer".into()) Err("start_node not supported by this deployer".into())
} }

View File

@ -22,8 +22,8 @@ pub type DynError = Box<dyn Error + Send + Sync + 'static>;
pub use builder_ext::{BuilderInputError, ObservabilityBuilderExt}; pub use builder_ext::{BuilderInputError, ObservabilityBuilderExt};
pub use capabilities::{ pub use capabilities::{
NodeControlCapability, ObservabilityCapability, PeerSelection, RequiresNodeControl, NodeControlCapability, NodeRuntimeOptions, ObservabilityCapability, PeerSelection,
StartNodeOptions, StartedNode, RequiresNodeControl, StartNodeOptions, StartedNode,
}; };
pub use client::NodeAccess; pub use client::NodeAccess;
pub use common_builder_ext::CoreBuilderExt; pub use common_builder_ext::CoreBuilderExt;
@ -45,6 +45,7 @@ pub use runtime::{
}, },
wait_for_http_ports, wait_for_http_ports_with_host, wait_for_http_ports, wait_for_http_ports_with_host,
wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement, wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement,
wait_for_http_ports_with_requirement_and_timeout, wait_for_http_ports_with_timeout,
wait_http_readiness, wait_until_stable, wait_http_readiness, wait_until_stable,
}; };
pub use sources::{ pub use sources::{

View File

@ -23,6 +23,7 @@ pub use node_clients::NodeClients;
pub use readiness::{ pub use readiness::{
HttpReadinessRequirement, ReadinessError, StabilizationConfig, wait_for_http_ports, HttpReadinessRequirement, ReadinessError, StabilizationConfig, wait_for_http_ports,
wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement,
wait_for_http_ports_with_requirement, wait_http_readiness, wait_until_stable, wait_for_http_ports_with_requirement, wait_for_http_ports_with_requirement_and_timeout,
wait_for_http_ports_with_timeout, wait_http_readiness, wait_until_stable,
}; };
pub use runner::Runner; pub use runner::Runner;

View File

@ -232,12 +232,20 @@ where
pub async fn wait_http_readiness( pub async fn wait_http_readiness(
endpoints: &[Url], endpoints: &[Url],
requirement: HttpReadinessRequirement, requirement: HttpReadinessRequirement,
) -> Result<(), ReadinessError> {
wait_http_readiness_with_timeout(endpoints, requirement, None).await
}
pub async fn wait_http_readiness_with_timeout(
endpoints: &[Url],
requirement: HttpReadinessRequirement,
timeout: Option<Duration>,
) -> Result<(), ReadinessError> { ) -> Result<(), ReadinessError> {
if endpoints.is_empty() { if endpoints.is_empty() {
return Ok(()); return Ok(());
} }
let (poll_interval, max_attempts) = http_retry_plan(); let (poll_interval, max_attempts) = http_retry_plan(timeout);
let client = Client::new(); let client = Client::new();
let retry = RetryConfig::bounded(max_attempts, poll_interval, poll_interval); let retry = RetryConfig::bounded(max_attempts, poll_interval, poll_interval);
retry_async(retry, |_| async { retry_async(retry, |_| async {
@ -254,8 +262,8 @@ pub async fn wait_http_readiness(
}) })
} }
fn http_retry_plan() -> (Duration, usize) { fn http_retry_plan(timeout: Option<Duration>) -> (Duration, usize) {
let timeout_duration = adjust_timeout(DEFAULT_TIMEOUT); let timeout_duration = adjust_timeout(timeout.unwrap_or(DEFAULT_TIMEOUT));
let poll_interval = DEFAULT_POLL_INTERVAL; let poll_interval = DEFAULT_POLL_INTERVAL;
let max_attempts = retry_attempts(timeout_duration, poll_interval); let max_attempts = retry_attempts(timeout_duration, poll_interval);
(poll_interval, max_attempts) (poll_interval, max_attempts)
@ -269,17 +277,39 @@ fn retry_attempts(timeout: Duration, interval: Duration) -> usize {
} }
pub async fn wait_for_http_ports(ports: &[u16], endpoint_path: &str) -> Result<(), ReadinessError> { pub async fn wait_for_http_ports(ports: &[u16], endpoint_path: &str) -> Result<(), ReadinessError> {
wait_for_http_ports_with_requirement(ports, endpoint_path, default_readiness_requirement()) wait_for_http_ports_with_timeout(ports, endpoint_path, None).await
.await }
pub async fn wait_for_http_ports_with_timeout(
ports: &[u16],
endpoint_path: &str,
timeout: Option<Duration>,
) -> Result<(), ReadinessError> {
wait_for_http_ports_with_requirement_and_timeout(
ports,
endpoint_path,
default_readiness_requirement(),
timeout,
)
.await
} }
pub async fn wait_for_http_ports_with_requirement( pub async fn wait_for_http_ports_with_requirement(
ports: &[u16], ports: &[u16],
endpoint_path: &str, endpoint_path: &str,
requirement: HttpReadinessRequirement, requirement: HttpReadinessRequirement,
) -> Result<(), ReadinessError> {
wait_for_http_ports_with_requirement_and_timeout(ports, endpoint_path, requirement, None).await
}
pub async fn wait_for_http_ports_with_requirement_and_timeout(
ports: &[u16],
endpoint_path: &str,
requirement: HttpReadinessRequirement,
timeout: Option<Duration>,
) -> Result<(), ReadinessError> { ) -> Result<(), ReadinessError> {
let endpoints = build_local_endpoints(ports, endpoint_path)?; let endpoints = build_local_endpoints(ports, endpoint_path)?;
wait_http_readiness(&endpoints, requirement).await wait_http_readiness_with_timeout(&endpoints, requirement, timeout).await
} }
pub async fn wait_for_http_ports_with_host( pub async fn wait_for_http_ports_with_host(

View File

@ -14,7 +14,7 @@ use testing_framework_core::{
manual::ManualClusterHandle, manual::ManualClusterHandle,
scenario::{ scenario::{
ClusterWaitHandle, DynError, ExternalNodeSource, HttpReadinessRequirement, NodeClients, ClusterWaitHandle, DynError, ExternalNodeSource, HttpReadinessRequirement, NodeClients,
NodeControlHandle, StartNodeOptions, StartedNode, NodeControlHandle, PeerSelection, StartNodeOptions, StartedNode,
}, },
}; };
use thiserror::Error; use thiserror::Error;
@ -604,10 +604,7 @@ fn validate_start_options<E: K8sDeployEnv>(
fn ensure_default_cfgsync_options<E: K8sDeployEnv>( fn ensure_default_cfgsync_options<E: K8sDeployEnv>(
options: &StartNodeOptions<E>, options: &StartNodeOptions<E>,
) -> Result<(), ManualClusterError> { ) -> Result<(), ManualClusterError> {
let default_peers = matches!( let default_peers = matches!(options.peers, None | Some(PeerSelection::DefaultLayout));
options.peers,
testing_framework_core::scenario::PeerSelection::DefaultLayout
);
if default_peers && options.config_override.is_none() && options.config_patch.is_none() { if default_peers && options.config_override.is_none() && options.config_patch.is_none() {
return Ok(()); return Ok(());
} }
@ -719,14 +716,14 @@ mod tests {
options: &StartNodeOptions<Self>, options: &StartNodeOptions<Self>,
) -> Result<Option<cfgsync_artifacts::ArtifactSet>, Self::Error> { ) -> Result<Option<cfgsync_artifacts::ArtifactSet>, Self::Error> {
let mut config = match &options.peers { let mut config = match &options.peers {
PeerSelection::DefaultLayout => { None | Some(PeerSelection::DefaultLayout) => {
if options.config_override.is_none() && options.config_patch.is_none() { if options.config_override.is_none() && options.config_patch.is_none() {
return Ok(None); return Ok(None);
} }
format!("node={node_index};peers=default") format!("node={node_index};peers=default")
} }
PeerSelection::None => format!("node={node_index};peers=none"), Some(PeerSelection::None) => format!("node={node_index};peers=none"),
PeerSelection::Named(names) => { Some(PeerSelection::Named(names)) => {
format!("node={node_index};peers={}", names.join(",")) format!("node={node_index};peers={}", names.join(","))
} }
}; };

View File

@ -343,6 +343,20 @@ pub fn yaml_config_launch_spec<T: Serialize>(
rendered_config_launch_spec(config_yaml.into_bytes(), spec) rendered_config_launch_spec(config_yaml.into_bytes(), spec)
} }
/// Builds the environment's launch spec for `config` and appends `extra_args`.
///
/// Delegates to `E::build_launch_spec(config, dir, label)` and propagates its
/// error unchanged. On success, a clone of every entry in `extra_args` is
/// appended, in order, to the end of the spec's `args`.
pub fn build_launch_spec_with_args<E: crate::env::LocalDeployerEnv>(
    config: &<E as Application>::NodeConfig,
    dir: &std::path::Path,
    label: &str,
    extra_args: &[String],
) -> Result<LaunchSpec, DynError> {
    let mut spec = E::build_launch_spec(config, dir, label)?;
    // Extra arguments always go after the arguments the environment produced.
    spec.args.extend(extra_args.iter().map(|arg| arg.to_owned()));

    Ok(spec)
}
/// Uses an already rendered text config to build a launch spec for `spec`. /// Uses an already rendered text config to build a launch spec for `spec`.
pub fn text_config_launch_spec( pub fn text_config_launch_spec(
rendered_config: impl Into<Vec<u8>>, rendered_config: impl Into<Vec<u8>>,

View File

@ -21,10 +21,11 @@ mod tests;
pub use helpers::{ pub use helpers::{
BuiltNodeConfig, LocalNodePorts, LocalPeerNode, LocalProcessSpec, NodeConfigEntry, BuiltNodeConfig, LocalNodePorts, LocalPeerNode, LocalProcessSpec, NodeConfigEntry,
build_indexed_http_peers, build_indexed_node_configs, build_local_cluster_node_config, build_indexed_http_peers, build_indexed_node_configs, build_launch_spec_with_args,
build_local_peer_nodes, default_yaml_launch_spec, discovered_node_access, preallocate_ports, build_local_cluster_node_config, build_local_peer_nodes, default_yaml_launch_spec,
reserve_local_node_ports, single_http_node_endpoints, text_config_launch_spec, discovered_node_access, preallocate_ports, reserve_local_node_ports,
text_node_config, yaml_config_launch_spec, yaml_node_config, single_http_node_endpoints, text_config_launch_spec, text_node_config, yaml_config_launch_spec,
yaml_node_config,
}; };
/// Context passed while building a local node config. /// Context passed while building a local node config.
@ -506,11 +507,14 @@ pub async fn spawn_node_from_config<E: LocalDeployerEnv>(
keep_tempdir: bool, keep_tempdir: bool,
persist_dir: Option<&std::path::Path>, persist_dir: Option<&std::path::Path>,
snapshot_dir: Option<&std::path::Path>, snapshot_dir: Option<&std::path::Path>,
extra_args: &[String],
) -> Result<Node<E>, ProcessSpawnError> { ) -> Result<Node<E>, ProcessSpawnError> {
let extra_args = extra_args.to_vec();
ProcessNode::spawn( ProcessNode::spawn(
&label, &label,
config, config,
E::build_launch_spec, move |config, dir, label| build_launch_spec_with_args::<E>(config, dir, label, &extra_args),
E::node_endpoints, E::node_endpoints,
keep_tempdir, keep_tempdir,
persist_dir, persist_dir,

View File

@ -70,6 +70,14 @@ impl<E: LocalDeployerEnv> ManualCluster<E> {
Ok(self.nodes.restart_node(name).await?) Ok(self.nodes.restart_node(name).await?)
} }
/// Restarts the node called `name`, forwarding `options` to the node manager.
///
/// Any failure reported by the node manager is converted into a
/// `ManualClusterError`.
pub async fn restart_node_with(
    &self,
    name: &str,
    options: StartNodeOptions<E>,
) -> Result<(), ManualClusterError> {
    match self.nodes.restart_node_with(name, options).await {
        Ok(done) => Ok(done),
        Err(err) => Err(err.into()),
    }
}
pub async fn stop_node(&self, name: &str) -> Result<(), ManualClusterError> { pub async fn stop_node(&self, name: &str) -> Result<(), ManualClusterError> {
Ok(self.nodes.stop_node(name).await?) Ok(self.nodes.stop_node(name).await?)
} }
@ -125,6 +133,17 @@ impl<E: LocalDeployerEnv> NodeControlHandle<E> for ManualCluster<E> {
.map_err(|err| err.into()) .map_err(|err| err.into())
} }
/// `NodeControlHandle` entry point: restarts `name` through the node manager
/// with `options`, boxing any failure as a `DynError`.
async fn restart_node_with(
    &self,
    name: &str,
    options: StartNodeOptions<E>,
) -> Result<(), DynError> {
    match self.nodes.restart_node_with(name, options).await {
        Ok(()) => Ok(()),
        Err(err) => Err(err.into()),
    }
}
async fn stop_node(&self, name: &str) -> Result<(), DynError> { async fn stop_node(&self, name: &str) -> Result<(), DynError> {
self.nodes.stop_node(name).await.map_err(|err| err.into()) self.nodes.stop_node(name).await.map_err(|err| err.into())
} }

View File

@ -4,16 +4,16 @@ use std::{
}; };
use testing_framework_core::scenario::{ use testing_framework_core::scenario::{
Application, DynError, NodeClients, NodeControlHandle, ReadinessError, StartNodeOptions, Application, DynError, NodeClients, NodeControlHandle, NodeRuntimeOptions, ReadinessError,
StartedNode, wait_for_http_ports, StartNodeOptions, StartedNode, wait_for_http_ports, wait_for_http_ports_with_timeout,
}; };
use thiserror::Error; use thiserror::Error;
use crate::{ use crate::{
env::{ env::{
LocalDeployerEnv, Node, build_initial_node_configs, build_node_from_template, LocalDeployerEnv, Node, build_initial_node_configs, build_launch_spec_with_args,
initial_persist_dir, initial_snapshot_dir, node_peer_port, readiness_endpoint_path, build_node_from_template, initial_persist_dir, initial_snapshot_dir, node_peer_port,
spawn_node_from_config, readiness_endpoint_path, spawn_node_from_config,
}, },
process::ProcessSpawnError, process::ProcessSpawnError,
}; };
@ -31,6 +31,12 @@ struct NodeStartSnapshot<Config> {
template_config: Option<Config>, template_config: Option<Config>,
} }
/// Values read from the manager state for one node's readiness wait.
#[derive(Clone, Copy)]
struct NodeReadinessTarget {
    /// Port of the node's API endpoint, which the readiness wait polls.
    port: u16,
    /// Runtime options recorded for the node; `start_timeout` is passed to
    /// the readiness wait as its timeout override.
    runtime: NodeRuntimeOptions,
}
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum NodeManagerError { pub enum NodeManagerError {
#[error("failed to generate node config: {source}")] #[error("failed to generate node config: {source}")]
@ -96,6 +102,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
keep_tempdir, keep_tempdir,
persist_dir.as_deref(), persist_dir.as_deref(),
snapshot_dir.as_deref(), snapshot_dir.as_deref(),
&[],
) )
.await?, .await?,
); );
@ -119,6 +126,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
peer_ports_by_name: seed.peer_ports_by_name.clone(), peer_ports_by_name: seed.peer_ports_by_name.clone(),
clients_by_name: HashMap::new(), clients_by_name: HashMap::new(),
indices_by_name: HashMap::new(), indices_by_name: HashMap::new(),
runtime_by_name: HashMap::new(),
nodes: Vec::new(), nodes: Vec::new(),
template_config: None, template_config: None,
}; };
@ -165,6 +173,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
.clone_from(&self.seed.peer_ports_by_name); .clone_from(&self.seed.peer_ports_by_name);
state.clients_by_name.clear(); state.clients_by_name.clear();
state.indices_by_name.clear(); state.indices_by_name.clear();
state.runtime_by_name.clear();
state.node_count = self.seed.node_count; state.node_count = self.seed.node_count;
state.template_config = None; state.template_config = None;
self.node_clients.clear(); self.node_clients.clear();
@ -182,7 +191,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
let client = node.client(); let client = node.client();
self.node_clients.add_node(client.clone()); self.node_clients.add_node(client.clone());
state.register_node(&name, port, client, node); state.register_node(&name, port, client, NodeRuntimeOptions::default(), node);
} }
} }
@ -209,28 +218,15 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
} }
pub async fn wait_node_ready(&self, name: &str) -> Result<(), NodeManagerError> { pub async fn wait_node_ready(&self, name: &str) -> Result<(), NodeManagerError> {
let port = { let target = self.readiness_target(name)?;
let state = self.lock_state();
let index =
*state
.indices_by_name
.get(name)
.ok_or_else(|| NodeManagerError::NodeName {
name: name.to_string(),
})?;
state wait_for_http_ports_with_timeout(
.nodes &[target.port],
.get(index) readiness_endpoint_path::<E>(),
.map(|node| node.endpoints().api.port()) target.runtime.start_timeout,
.ok_or_else(|| NodeManagerError::NodeName { )
name: name.to_string(), .await
})? .map_err(|source| NodeManagerError::Readiness { source })
};
wait_for_http_ports(&[port], readiness_endpoint_path::<E>())
.await
.map_err(|source| NodeManagerError::Readiness { source })
} }
pub async fn start_node_with( pub async fn start_node_with(
@ -262,8 +258,10 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
&snapshot.node_name, &snapshot.node_name,
built.network_port, built.network_port,
built.config, built.config,
options.runtime,
options.persist_dir.as_deref(), options.persist_dir.as_deref(),
options.snapshot_dir.as_deref(), options.snapshot_dir.as_deref(),
&options.args,
) )
.await?; .await?;
@ -274,9 +272,28 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
} }
pub async fn restart_node(&self, name: &str) -> Result<(), NodeManagerError> { pub async fn restart_node(&self, name: &str) -> Result<(), NodeManagerError> {
self.restart_node_with(name, StartNodeOptions::default())
.await
}
pub async fn restart_node_with(
&self,
name: &str,
options: StartNodeOptions<E>,
) -> Result<(), NodeManagerError> {
let (index, mut node) = self.take_node(name)?; let (index, mut node) = self.take_node(name)?;
if let Err(source) = node.restart().await { validate_restart_options(&options)?;
let launch = build_launch_spec_with_args::<E>(
node.config(),
node.working_dir(),
name,
&options.args,
)
.map_err(|source| NodeManagerError::Config { source })?;
if let Err(source) = node.restart_with_launch(launch).await {
self.put_node_back(index, node); self.put_node_back(index, node);
return Err(NodeManagerError::Restart { return Err(NodeManagerError::Restart {
@ -285,6 +302,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
} }
self.put_node_back(index, node); self.put_node_back(index, node);
self.store_runtime_options(name, options.runtime);
Ok(()) Ok(())
} }
@ -303,8 +321,10 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
node_name: &str, node_name: &str,
network_port: u16, network_port: u16,
config: <E as Application>::NodeConfig, config: <E as Application>::NodeConfig,
runtime: NodeRuntimeOptions,
persist_dir: Option<&std::path::Path>, persist_dir: Option<&std::path::Path>,
snapshot_dir: Option<&std::path::Path>, snapshot_dir: Option<&std::path::Path>,
extra_args: &[String],
) -> Result<E::NodeClient, NodeManagerError> { ) -> Result<E::NodeClient, NodeManagerError> {
let node = spawn_node_from_config::<E>( let node = spawn_node_from_config::<E>(
node_name.to_string(), node_name.to_string(),
@ -312,6 +332,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
self.keep_tempdir, self.keep_tempdir,
persist_dir, persist_dir,
snapshot_dir, snapshot_dir,
extra_args,
) )
.await .await
.map_err(|source| NodeManagerError::Spawn { .map_err(|source| NodeManagerError::Spawn {
@ -326,7 +347,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
state.template_config = Some(node.config().clone()); state.template_config = Some(node.config().clone());
} }
state.register_node(node_name, network_port, client.clone(), node); state.register_node(node_name, network_port, client.clone(), runtime, node);
Ok(client) Ok(client)
} }
@ -341,6 +362,20 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
reinsert_node_at(&mut state, index, node); reinsert_node_at(&mut state, index, node);
} }
/// Records `runtime` as the runtime options for node `name`, replacing any
/// previously stored entry.
fn store_runtime_options(&self, name: &str, runtime: NodeRuntimeOptions) {
    // The state guard is a temporary and is released at the end of this statement.
    self.lock_state()
        .runtime_by_name
        .insert(name.to_owned(), runtime);
}
/// Looks up the API port and stored runtime options for node `name`.
///
/// Both values are read while the state guard is held once.
///
/// # Errors
///
/// Returns `NodeManagerError::NodeName` when `name` is not registered or its
/// index has no matching node entry.
fn readiness_target(&self, name: &str) -> Result<NodeReadinessTarget, NodeManagerError> {
    let state = self.lock_state();

    let port = node_index(&state, name)
        .and_then(|index| node_api_port(&state, index, name))?;

    Ok(NodeReadinessTarget {
        port,
        runtime: node_runtime_options(&state, name),
    })
}
fn start_snapshot( fn start_snapshot(
&self, &self,
requested_name: &str, requested_name: &str,
@ -371,6 +406,7 @@ fn clear_registered_nodes<E: LocalDeployerEnv>(state: &mut LocalNodeManagerState
state.peer_ports_by_name.clear(); state.peer_ports_by_name.clear();
state.clients_by_name.clear(); state.clients_by_name.clear();
state.indices_by_name.clear(); state.indices_by_name.clear();
state.runtime_by_name.clear();
state.node_count = 0; state.node_count = 0;
state.template_config = None; state.template_config = None;
} }
@ -403,6 +439,72 @@ fn normalize_node_name(index: usize, requested_name: &str) -> String {
format!("node-{requested_name}") format!("node-{requested_name}")
} }
/// Rejects start options that a restart cannot apply.
///
/// A restart accepts only `args` and `runtime`. If peer selection, a config
/// override, a config patch, a persist dir or a snapshot dir is set, the first
/// one found (in that order) is reported as an invalid argument.
fn validate_restart_options<E: LocalDeployerEnv>(
    options: &StartNodeOptions<E>,
) -> Result<(), NodeManagerError> {
    // Table order decides which field is reported when several are set.
    let overrides = [
        (options.peers.is_some(), "peer selection"),
        (options.config_override.is_some(), "config override"),
        (options.config_patch.is_some(), "config patch"),
        (options.persist_dir.is_some(), "persist dir"),
        (options.snapshot_dir.is_some(), "snapshot dir"),
    ];

    let first_set = overrides
        .iter()
        .find(|entry| entry.0)
        .map(|entry| entry.1);

    match first_set {
        Some(field) => Err(unsupported_restart_override(field)),
        None => Ok(()),
    }
}
/// Creates the `InvalidArgument` error reported when `restart_node_with` is
/// given an override for `field`, which it does not support.
fn unsupported_restart_override(field: &str) -> NodeManagerError {
    let message = format!("restart_node_with does not support {field} overrides");
    NodeManagerError::InvalidArgument { message }
}
/// Looks up the index recorded for node `name` in `indices_by_name`.
///
/// # Errors
///
/// Returns `NodeManagerError::NodeName` carrying `name` when no index is
/// recorded for it.
fn node_index<E: LocalDeployerEnv>(
    state: &LocalNodeManagerState<E>,
    name: &str,
) -> Result<usize, NodeManagerError> {
    match state.indices_by_name.get(name) {
        Some(&index) => Ok(index),
        None => Err(NodeManagerError::NodeName {
            name: name.to_owned(),
        }),
    }
}
/// Reads the API port from the endpoints of the node stored at `index`.
///
/// # Errors
///
/// Returns `NodeManagerError::NodeName` carrying `name` when `index` is not a
/// valid position in `state.nodes`.
fn node_api_port<E: LocalDeployerEnv>(
    state: &LocalNodeManagerState<E>,
    index: usize,
    name: &str,
) -> Result<u16, NodeManagerError> {
    match state.nodes.get(index) {
        Some(node) => Ok(node.endpoints().api.port()),
        None => Err(NodeManagerError::NodeName {
            name: name.to_owned(),
        }),
    }
}
fn node_runtime_options<E: LocalDeployerEnv>(
state: &LocalNodeManagerState<E>,
name: &str,
) -> NodeRuntimeOptions {
state.runtime_by_name.get(name).copied().unwrap_or_default()
}
fn default_node_label(index: usize) -> String { fn default_node_label(index: usize) -> String {
format!("node-{index}") format!("node-{index}")
} }
@ -444,6 +546,16 @@ impl<E: LocalDeployerEnv> NodeControlHandle<E> for NodeManager<E> {
self.restart_node(name).await.map_err(|err| err.into()) self.restart_node(name).await.map_err(|err| err.into())
} }
/// Restarts node `name` with the supplied start options.
///
/// Forwards to `self.restart_node_with` and converts its error into `DynError`.
// NOTE(review): the call below presumably resolves to the inherent
// `NodeManager::restart_node_with` rather than back to this trait method —
// confirm that the inherent method exists.
async fn restart_node_with(
    &self,
    name: &str,
    options: StartNodeOptions<E>,
) -> Result<(), DynError> {
    let outcome = self.restart_node_with(name, options).await;
    outcome.map_err(Into::into)
}
async fn stop_node(&self, name: &str) -> Result<(), DynError> { async fn stop_node(&self, name: &str) -> Result<(), DynError> {
self.stop_node(name).await.map_err(|err| err.into()) self.stop_node(name).await.map_err(|err| err.into())
} }

View File

@ -1,5 +1,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use testing_framework_core::scenario::NodeRuntimeOptions;
use crate::env::{LocalDeployerEnv, Node}; use crate::env::{LocalDeployerEnv, Node};
pub(crate) struct LocalNodeManagerState<E: LocalDeployerEnv> { pub(crate) struct LocalNodeManagerState<E: LocalDeployerEnv> {
@ -8,6 +10,7 @@ pub(crate) struct LocalNodeManagerState<E: LocalDeployerEnv> {
pub(crate) peer_ports_by_name: HashMap<String, u16>, pub(crate) peer_ports_by_name: HashMap<String, u16>,
pub(crate) clients_by_name: HashMap<String, E::NodeClient>, pub(crate) clients_by_name: HashMap<String, E::NodeClient>,
pub(crate) indices_by_name: HashMap<String, usize>, pub(crate) indices_by_name: HashMap<String, usize>,
pub(crate) runtime_by_name: HashMap<String, NodeRuntimeOptions>,
pub(crate) nodes: Vec<Node<E>>, pub(crate) nodes: Vec<Node<E>>,
pub(crate) template_config: Option<E::NodeConfig>, pub(crate) template_config: Option<E::NodeConfig>,
} }
@ -25,11 +28,13 @@ impl<E: LocalDeployerEnv> LocalNodeManagerState<E> {
node_name: &str, node_name: &str,
network_port: u16, network_port: u16,
client: E::NodeClient, client: E::NodeClient,
runtime: NodeRuntimeOptions,
node: Node<E>, node: Node<E>,
) { ) {
self.register_common(node_name, network_port, client); self.register_common(node_name, network_port, client);
let index = self.nodes.len(); let index = self.nodes.len();
self.indices_by_name.insert(node_name.to_string(), index); self.indices_by_name.insert(node_name.to_string(), index);
self.runtime_by_name.insert(node_name.to_string(), runtime);
self.node_count += 1; self.node_count += 1;
self.nodes.push(node); self.nodes.push(node);
} }

View File

@ -168,6 +168,10 @@ impl<Config: Clone + Send + Sync + 'static, Client: Clone + Send + Sync + 'stati
&self.endpoints &self.endpoints
} }
/// Returns the path of this node's `tempdir`, exposed as its working directory.
pub fn working_dir(&self) -> &Path {
self.tempdir.path()
}
pub fn pid(&self) -> u32 { pub fn pid(&self) -> u32 {
self.child.id().unwrap_or_default() self.child.id().unwrap_or_default()
} }
@ -255,6 +259,16 @@ impl<Config: Clone + Send + Sync + 'static, Client: Clone + Send + Sync + 'stati
Ok(()) Ok(())
} }
/// Restarts the child process with a replacement launch spec.
///
/// Stops the current child, stores `launch` as the node's launch spec, then
/// spawns a new child and keeps its handle.
///
/// # Errors
///
/// Propagates any failure from `stop_child` or `spawn_child`.
pub async fn restart_with_launch(
&mut self,
launch: LaunchSpec,
) -> Result<(), ProcessSpawnError> {
// A failure to stop the old child returns early and leaves `launch` unchanged.
self.stop_child().await?;
// Stored before spawning, so the new spec stays in place even if the spawn
// below fails. NOTE(review): presumably `spawn_child` reads `self.launch` — confirm.
self.launch = launch;
self.child = self.spawn_child().await?;
Ok(())
}
pub async fn stop(&mut self) { pub async fn stop(&mut self) {
let _ = self.stop_child().await; let _ = self.stop_child().await;
} }