From e5406badd2e31f88a1a885b7c062542b59490849 Mon Sep 17 00:00:00 2001 From: andrussal Date: Fri, 10 Apr 2026 16:43:13 +0200 Subject: [PATCH] refactor(local): redesign runtime around startup access lifecycle --- .../deployers/local/src/env/helpers.rs | 58 ++- .../deployers/local/src/env/mod.rs | 371 +++++------------- .../deployers/local/src/env/runtime.rs | 264 +++++++++++-- .../deployers/local/src/env/tests.rs | 72 ++-- .../deployers/local/src/external.rs | 4 +- testing-framework/deployers/local/src/lib.rs | 14 +- .../deployers/local/src/node_control/mod.rs | 20 +- 7 files changed, 441 insertions(+), 362 deletions(-) diff --git a/testing-framework/deployers/local/src/env/helpers.rs b/testing-framework/deployers/local/src/env/helpers.rs index fae98c5..0602c7f 100644 --- a/testing-framework/deployers/local/src/env/helpers.rs +++ b/testing-framework/deployers/local/src/env/helpers.rs @@ -1,12 +1,18 @@ use std::{collections::HashMap, path::PathBuf}; use serde::Serialize; -use testing_framework_core::scenario::{ - Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, DynError, - NodeAccess, +use testing_framework_core::{ + scenario::{ + Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, DynError, + NodeAccess, + }, + topology::DeploymentDescriptor, }; -use crate::process::{LaunchSpec, NodeEndpointPort, NodeEndpoints, ProcessSpawnError}; +use crate::{ + env::LocalBuildContext, + process::{LaunchSpec, NodeEndpointPort, NodeEndpoints, ProcessSpawnError}, +}; pub struct BuiltNodeConfig { pub config: Config, @@ -242,6 +248,50 @@ pub fn build_local_peer_nodes(peer_ports: &[u16], self_index: usize) -> Vec( + topology: &E::Deployment, + node_name_prefix: &str, + port_names: &[&'static str], + build_node: impl Fn(LocalBuildContext<'_, E>) -> Result, DynError>, +) -> Result>, ProcessSpawnError> +where + E: Application, +{ + let reserved_ports = + reserve_local_node_ports(topology.node_count(), port_names, node_name_prefix)?; + let peer_ports = reserved_ports + .iter() + .map(LocalNodePorts::network_port) + .collect::>(); + let peer_ports_by_name = HashMap::new(); + let options = testing_framework_core::scenario::StartNodeOptions::::default(); + + reserved_ports + .iter() + .enumerate() + .map(|(index, ports)| { + let compact_peer_ports = compact_peer_ports(&peer_ports, index); + let peers = build_local_peer_nodes(&compact_peer_ports, index); + let built = build_node(LocalBuildContext { + topology, + index, + ports, + peers: &peers, + peer_ports: &compact_peer_ports, + peer_ports_by_name: &peer_ports_by_name, + options: &options, + template_config: None, + }) + .map_err(|source| ProcessSpawnError::Config { source })?; + + Ok(NodeConfigEntry { + name: format!("{node_name_prefix}-{index}"), + config: built.config, + }) + }) + .collect() +} + pub fn yaml_config_launch_spec( config: &T, spec: &LocalProcessSpec, diff --git a/testing-framework/deployers/local/src/env/mod.rs b/testing-framework/deployers/local/src/env/mod.rs index fde33de..626e773 100644 --- a/testing-framework/deployers/local/src/env/mod.rs +++ b/testing-framework/deployers/local/src/env/mod.rs @@ -1,19 +1,13 @@ -use std::{ - collections::HashMap, - net::{Ipv4Addr, SocketAddr}, - path::{Path, PathBuf}, +use testing_framework_core::scenario::{ + Application, DynError, HttpReadinessRequirement, ReadinessError, + wait_for_http_ports_with_requirement, }; -use testing_framework_core::{ - scenario::{ - Application, DynError, HttpReadinessRequirement, ReadinessError, StartNodeOptions, - wait_for_http_ports_with_requirement, - }, - topology::DeploymentDescriptor, +use crate::{ + LaunchSpec, NodeEndpoints, + process::{ProcessNode, ProcessSpawnError}, }; -use crate::process::{LaunchSpec, NodeEndpoints, ProcessNode, ProcessSpawnError}; - mod helpers; mod runtime; #[cfg(test)] @@ -27,282 +21,107 @@ pub use helpers::{ text_node_config, yaml_config_launch_spec, yaml_node_config, }; pub use runtime::{ - LocalAccess, LocalBuildContext, LocalProcess, LocalRuntime, cluster_node_config_from_context, + LocalAccess, LocalBuildContext, LocalLifecycle, LocalProcess, LocalRuntime, + LocalStableReadinessFuture, cluster_node_config_from_context, }; pub type Node = ProcessNode<::NodeConfig, ::NodeClient>; -#[async_trait::async_trait] pub trait LocalDeployerEnv: Application + Sized where ::NodeConfig: Clone + Send + Sync + 'static, { - fn local_runtime() -> Option> { - None - } + fn local_runtime() -> LocalRuntime; +} - fn local_port_names() -> &'static [&'static str] { - Self::local_runtime() - .map(|runtime| runtime.process.port_names) - .unwrap_or_else(Self::initial_local_port_names) - } +pub(crate) fn runtime_for() -> LocalRuntime { + E::local_runtime() +} - fn build_node_config( - topology: &Self::Deployment, - index: usize, - peer_ports_by_name: &HashMap, - options: &StartNodeOptions, - peer_ports: &[u16], - ) -> Result::NodeConfig>, DynError> { - Self::build_node_config_from_template( - topology, - index, - peer_ports_by_name, - options, - peer_ports, - None, - ) - } +pub(crate) fn build_node_from_template( + topology: &E::Deployment, + index: usize, + peer_ports_by_name: &std::collections::HashMap, + options: &testing_framework_core::scenario::StartNodeOptions, + peer_ports: &[u16], + template_config: Option<&E::NodeConfig>, +) -> Result, DynError> { + let runtime = runtime_for::(); + let mut reserved = reserve_local_node_ports(1, runtime.process.port_names, "node") + .map_err(|source| -> DynError { source.into() })?; + let ports = reserved + .pop() + .ok_or_else(|| std::io::Error::other("failed to reserve local node ports"))?; + let peers = build_local_peer_nodes(peer_ports, index); - fn build_node_config_from_template( - topology: &Self::Deployment, - index: usize, - peer_ports_by_name: &HashMap, - options: &StartNodeOptions, - peer_ports: &[u16], - template_config: Option<&::NodeConfig>, - ) -> Result::NodeConfig>, DynError> { - let mut reserved = reserve_local_node_ports(1, Self::local_port_names(), "node") - .map_err(|source| -> DynError { source.into() })?; - let ports = reserved - .pop() - .ok_or_else(|| std::io::Error::other("failed to reserve local node ports"))?; - let network_port = ports.network_port(); - let config = Self::build_local_node_config( - topology, - index, - &ports, - peer_ports_by_name, - options, - peer_ports, - template_config, - )?; + runtime.process.build_node(LocalBuildContext { + topology, + index, + ports: &ports, + peers: &peers, + peer_ports, + peer_ports_by_name, + options, + template_config, + }) +} - Ok(BuiltNodeConfig { - config, - network_port, - }) - } +pub(crate) fn build_initial_node_configs( + topology: &E::Deployment, +) -> Result>, ProcessSpawnError> { + runtime_for::().process.build_initial_nodes(topology) +} - fn build_initial_node_configs( - topology: &Self::Deployment, - ) -> Result::NodeConfig>>, ProcessSpawnError> { - let reserved_ports = reserve_local_node_ports( - topology.node_count(), - Self::local_port_names(), - Self::initial_node_name_prefix(), - )?; - let peer_ports = reserved_ports - .iter() - .map(LocalNodePorts::network_port) - .collect::>(); +pub(crate) fn initial_persist_dir( + topology: &E::Deployment, + node_name: &str, + index: usize, +) -> Option { + runtime_for::() + .lifecycle + .initial_persist_dir(topology, node_name, index) +} - let mut configs = Vec::with_capacity(topology.node_count()); - for (index, ports) in reserved_ports.iter().enumerate() { - let config = Self::build_initial_node_config(topology, index, ports, &peer_ports) - .map_err(|source| ProcessSpawnError::Config { source })?; - configs.push(NodeConfigEntry { - name: format!("{}-{index}", Self::initial_node_name_prefix()), - config, - }); - } +pub(crate) fn initial_snapshot_dir( + topology: &E::Deployment, + node_name: &str, + index: usize, +) -> Option { + runtime_for::() + .lifecycle + .initial_snapshot_dir(topology, node_name, index) +} - Ok(configs) - } +pub(crate) fn build_launch_spec( + config: &E::NodeConfig, + dir: &std::path::Path, + label: &str, +) -> Result { + runtime_for::() + .process + .build_launch_spec(config, dir, label) +} - fn initial_node_name_prefix() -> &'static str { - Self::local_runtime() - .map(|runtime| runtime.process.node_name_prefix) - .unwrap_or("node") - } +pub(crate) fn node_endpoints( + config: &E::NodeConfig, +) -> Result { + runtime_for::().access.node_endpoints(config) +} - fn initial_local_port_names() -> &'static [&'static str] { - &[] - } +pub(crate) fn node_client( + endpoints: &NodeEndpoints, +) -> Result { + runtime_for::().access.node_client(endpoints) +} - fn build_initial_node_config( - topology: &Self::Deployment, - index: usize, - ports: &LocalNodePorts, - peer_ports: &[u16], - ) -> Result<::NodeConfig, DynError> { - let compact_peer_ports = helpers::compact_peer_ports(peer_ports, index); - let peer_ports_by_name = HashMap::new(); - let options = StartNodeOptions::::default(); - Self::build_local_node_config( - topology, - index, - ports, - &peer_ports_by_name, - &options, - &compact_peer_ports, - None, - ) - } +pub(crate) fn node_peer_port(node: &Node) -> u16 { + runtime_for::() + .access + .node_peer_port(node.config(), node.endpoints()) +} - fn build_local_node_config( - topology: &Self::Deployment, - index: usize, - ports: &LocalNodePorts, - peer_ports_by_name: &HashMap, - options: &StartNodeOptions, - peer_ports: &[u16], - template_config: Option<&::NodeConfig>, - ) -> Result<::NodeConfig, DynError> { - let peers = build_local_peer_nodes(peer_ports, index); - Self::build_local_node_config_with_peers( - topology, - index, - ports, - &peers, - peer_ports_by_name, - options, - template_config, - ) - } - - fn build_local_node_config_with_peers( - topology: &Self::Deployment, - index: usize, - ports: &LocalNodePorts, - peers: &[LocalPeerNode], - peer_ports_by_name: &HashMap, - options: &StartNodeOptions, - template_config: Option<&::NodeConfig>, - ) -> Result<::NodeConfig, DynError> { - if let Some(runtime) = Self::local_runtime() { - return (runtime.process.build_config)(LocalBuildContext { - topology, - index, - ports, - peers, - peer_ports_by_name, - options, - template_config, - }); - } - - Err(std::io::Error::other( - "build_local_node_config_with_peers is not implemented for this app", - ) - .into()) - } - - fn initial_persist_dir( - _topology: &Self::Deployment, - _node_name: &str, - _index: usize, - ) -> Option { - None - } - - fn initial_snapshot_dir( - _topology: &Self::Deployment, - _node_name: &str, - _index: usize, - ) -> Option { - None - } - - fn local_process_spec() -> Option { - Self::local_runtime().map(|runtime| runtime.process.spec) - } - - fn render_local_config( - config: &::NodeConfig, - ) -> Result, DynError> { - if let Some(runtime) = Self::local_runtime() { - return (runtime.process.render_config)(config); - } - - Err(std::io::Error::other("render_local_config is not implemented for this app").into()) - } - - fn build_launch_spec( - config: &::NodeConfig, - _dir: &Path, - _label: &str, - ) -> Result { - let spec = Self::local_process_spec().ok_or_else(|| { - std::io::Error::other("build_launch_spec is not implemented for this app") - })?; - let rendered = Self::render_local_config(config)?; - helpers::rendered_config_launch_spec(rendered, &spec) - } - - fn http_api_port(config: &::NodeConfig) -> Option { - Self::local_runtime() - .and_then(|runtime| runtime.access.api_port.map(|api_port| api_port(config))) - } - - fn node_endpoints( - config: &::NodeConfig, - ) -> Result { - if let Some(runtime) = Self::local_runtime() { - return runtime.access.node_endpoints(config); - } - - if let Some(port) = Self::http_api_port(config) { - return Ok(NodeEndpoints { - api: SocketAddr::from((Ipv4Addr::LOCALHOST, port)), - extra_ports: HashMap::new(), - }); - } - - Err(std::io::Error::other("node_endpoints is not implemented for this app").into()) - } - - fn node_peer_port(node: &Node) -> u16 { - if let Some(runtime) = Self::local_runtime() { - return runtime - .access - .node_peer_port(node.config(), node.endpoints()); - } - - node.endpoints().api.port() - } - - fn node_client_from_api_endpoint(_api: SocketAddr) -> Option { - None - } - - fn node_client(endpoints: &NodeEndpoints) -> Result { - if let Some(runtime) = Self::local_runtime() { - return runtime.access.node_client(endpoints); - } - - if let Ok(client) = - ::build_node_client(&discovered_node_access(endpoints)) - { - return Ok(client); - } - - if let Some(client) = Self::node_client_from_api_endpoint(endpoints.api) { - return Ok(client); - } - - Err(std::io::Error::other("node_client is not implemented for this app").into()) - } - - fn readiness_endpoint_path() -> &'static str { - Self::local_runtime() - .map(|runtime| runtime.access.readiness_path) - .unwrap_or_else(::node_readiness_path) - } - - async fn wait_readiness_stable(_nodes: &[Node]) -> Result<(), DynError> { - Ok(()) - } +pub(crate) fn readiness_endpoint_path() -> &'static str { + runtime_for::().access.readiness_path() } pub async fn wait_local_http_readiness( @@ -313,9 +132,13 @@ pub async fn wait_local_http_readiness( .iter() .map(|node| node.endpoints().api.port()) .collect(); - wait_for_http_ports_with_requirement(&ports, E::readiness_endpoint_path(), requirement).await?; - E::wait_readiness_stable(nodes) + wait_for_http_ports_with_requirement(&ports, readiness_endpoint_path::(), requirement) + .await?; + + runtime_for::() + .lifecycle + .wait_stable(nodes) .await .map_err(|source| ReadinessError::ClusterStable { source }) } @@ -330,12 +153,12 @@ pub async fn spawn_node_from_config( ProcessNode::spawn( &label, config, - E::build_launch_spec, - E::node_endpoints, + build_launch_spec::, + node_endpoints::, keep_tempdir, persist_dir, snapshot_dir, - E::node_client, + node_client::, ) .await } diff --git a/testing-framework/deployers/local/src/env/runtime.rs b/testing-framework/deployers/local/src/env/runtime.rs index 79844b3..28e72d2 100644 --- a/testing-framework/deployers/local/src/env/runtime.rs +++ b/testing-framework/deployers/local/src/env/runtime.rs @@ -1,6 +1,8 @@ use std::{ collections::HashMap, - net::{Ipv4Addr, SocketAddr}, + future::Future, + path::{Path, PathBuf}, + pin::Pin, }; use serde::Serialize; @@ -10,10 +12,10 @@ use testing_framework_core::scenario::{ use crate::{ env::{ - LocalNodePorts, LocalPeerNode, LocalProcessSpec, NodeEndpoints, discovered_node_access, - yaml_node_config, + BuiltNodeConfig, LocalNodePorts, LocalPeerNode, LocalProcessSpec, Node, NodeConfigEntry, + NodeEndpoints, build_local_cluster_node_config, discovered_node_access, yaml_node_config, }, - process::LaunchEnvVar, + process::{LaunchEnvVar, LaunchSpec, ProcessSpawnError}, }; pub struct LocalBuildContext<'a, E: Application> { @@ -21,6 +23,7 @@ pub struct LocalBuildContext<'a, E: Application> { pub index: usize, pub ports: &'a LocalNodePorts, pub peers: &'a [LocalPeerNode], + pub peer_ports: &'a [u16], pub peer_ports_by_name: &'a HashMap, pub options: &'a StartNodeOptions, pub template_config: Option<&'a E::NodeConfig>, @@ -29,24 +32,78 @@ pub struct LocalBuildContext<'a, E: Application> { pub type LocalConfigBuilder = for<'a> fn(LocalBuildContext<'a, E>) -> Result<::NodeConfig, DynError>; +pub type LocalDynamicNodeBuilder = + for<'a> fn( + LocalBuildContext<'a, E>, + ) -> Result::NodeConfig>, DynError>; + pub type LocalConfigRenderer = fn(&::NodeConfig) -> Result, DynError>; +pub type LocalInitialNodesBuilder = + fn( + &::Deployment, + ) -> Result::NodeConfig>>, ProcessSpawnError>; + +pub type LocalLaunchSpecBuilder = + fn(&::NodeConfig, &Path, &str) -> Result; + pub type LocalApiPort = fn(&::NodeConfig) -> u16; - pub type LocalEndpoints = fn(&::NodeConfig) -> Result; - pub type LocalClientBuilder = fn(&NodeAccess) -> Result<::NodeClient, DynError>; - pub type LocalPeerPort = fn(&::NodeConfig, &NodeEndpoints) -> u16; +pub type LocalPersistDir = fn(&::Deployment, &str, usize) -> Option; +pub type LocalSnapshotDir = fn(&::Deployment, &str, usize) -> Option; +pub type LocalStableReadinessFuture<'a> = + Pin> + Send + 'a>>; +pub type LocalStableReadiness = for<'a> fn(&'a [Node]) -> LocalStableReadinessFuture<'a>; + +#[derive(Clone)] +enum LocalDynamicNode { + Standard { build_config: LocalConfigBuilder }, + Custom(LocalDynamicNodeBuilder), +} + +impl LocalDynamicNode { + fn build( + &self, + context: LocalBuildContext<'_, E>, + ) -> Result, DynError> { + match self { + Self::Standard { build_config } => { + let network_port = context.ports.network_port(); + Ok(BuiltNodeConfig { + config: build_config(context)?, + network_port, + }) + } + Self::Custom(build) => build(context), + } + } +} + +#[derive(Clone)] +enum LocalInitialNodes { + Generated, + Custom(LocalInitialNodesBuilder), +} + +#[derive(Clone)] +enum LocalLaunch { + Rendered { + spec: LocalProcessSpec, + render_config: LocalConfigRenderer, + }, + Custom(LocalLaunchSpecBuilder), +} #[derive(Clone)] pub struct LocalProcess { - pub(crate) spec: LocalProcessSpec, - pub(crate) build_config: LocalConfigBuilder, - pub(crate) render_config: LocalConfigRenderer, pub(crate) node_name_prefix: &'static str, pub(crate) port_names: &'static [&'static str], + dynamic_node: LocalDynamicNode, + initial_nodes: LocalInitialNodes, + launch: LocalLaunch, } impl LocalProcess { @@ -58,11 +115,28 @@ impl LocalProcess { render_config: LocalConfigRenderer, ) -> Self { Self { - spec: LocalProcessSpec::new(binary_env_var, binary_name), - build_config, - render_config, node_name_prefix: "node", port_names: &[], + dynamic_node: LocalDynamicNode::Standard { build_config }, + initial_nodes: LocalInitialNodes::Generated, + launch: LocalLaunch::Rendered { + spec: LocalProcessSpec::new(binary_env_var, binary_name), + render_config, + }, + } + } + + #[must_use] + pub fn custom( + build_node: LocalDynamicNodeBuilder, + build_launch_spec: LocalLaunchSpecBuilder, + ) -> Self { + Self { + node_name_prefix: "node", + port_names: &[], + dynamic_node: LocalDynamicNode::Custom(build_node), + initial_nodes: LocalInitialNodes::Generated, + launch: LocalLaunch::Custom(build_launch_spec), } } @@ -78,35 +152,91 @@ impl LocalProcess { self } + #[must_use] + pub fn with_initial_nodes(mut self, build_initial_nodes: LocalInitialNodesBuilder) -> Self { + self.initial_nodes = LocalInitialNodes::Custom(build_initial_nodes); + self + } + #[must_use] pub fn with_config_file(mut self, file_name: &str, arg: &str) -> Self { - self.spec = self.spec.with_config_file(file_name, arg); + if let LocalLaunch::Rendered { spec, .. } = &mut self.launch { + *spec = spec.clone().with_config_file(file_name, arg); + } self } #[must_use] pub fn with_env(mut self, key: &str, value: &str) -> Self { - self.spec = self.spec.with_env(key, value); + if let LocalLaunch::Rendered { spec, .. } = &mut self.launch { + *spec = spec.clone().with_env(key, value); + } self } #[must_use] pub fn with_rust_log(mut self, value: &str) -> Self { - self.spec = self.spec.with_rust_log(value); + if let LocalLaunch::Rendered { spec, .. } = &mut self.launch { + *spec = spec.clone().with_rust_log(value); + } self } #[must_use] pub fn with_args(mut self, args: impl IntoIterator) -> Self { - self.spec = self.spec.with_args(args); + if let LocalLaunch::Rendered { spec, .. } = &mut self.launch { + *spec = spec.clone().with_args(args); + } self } #[must_use] pub fn with_launch_env(mut self, vars: impl IntoIterator) -> Self { - self.spec.env.extend(vars); + if let LocalLaunch::Rendered { spec, .. } = &mut self.launch { + spec.env.extend(vars); + } self } + + pub(crate) fn build_node( + &self, + context: LocalBuildContext<'_, E>, + ) -> Result, DynError> { + self.dynamic_node.build(context) + } + + pub(crate) fn build_initial_nodes( + &self, + topology: &E::Deployment, + ) -> Result>, ProcessSpawnError> + where + E::NodeConfig: Clone, + { + match self.initial_nodes { + LocalInitialNodes::Generated => super::helpers::build_generated_initial_nodes::( + topology, + self.node_name_prefix, + self.port_names, + |context| self.build_node(context), + ), + LocalInitialNodes::Custom(build) => build(topology), + } + } + + pub(crate) fn build_launch_spec( + &self, + config: &E::NodeConfig, + dir: &Path, + label: &str, + ) -> Result { + match &self.launch { + LocalLaunch::Rendered { + spec, + render_config, + } => super::helpers::rendered_config_launch_spec(render_config(config)?, spec), + LocalLaunch::Custom(build) => build(config, dir, label), + } + } } impl LocalProcess @@ -131,11 +261,11 @@ where #[derive(Clone)] pub struct LocalAccess { - pub(crate) api_port: Option>, - pub(crate) endpoints: Option>, - pub(crate) client: Option>, - pub(crate) peer_port: Option>, - pub(crate) readiness_path: &'static str, + api_port: Option>, + endpoints: Option>, + client: Option>, + peer_port: Option>, + readiness_path: &'static str, } impl LocalAccess { @@ -185,10 +315,7 @@ impl LocalAccess { } if let Some(api_port) = self.api_port { - return Ok(NodeEndpoints { - api: SocketAddr::from((Ipv4Addr::LOCALHOST, api_port(config))), - extra_ports: HashMap::new(), - }); + return Ok(NodeEndpoints::from_api_port(api_port(config))); } Err(std::io::Error::other("node endpoints are not configured").into()) @@ -207,18 +334,97 @@ impl LocalAccess { .map(|peer_port| peer_port(config, endpoints)) .unwrap_or_else(|| endpoints.api.port()) } + + pub(crate) fn readiness_path(&self) -> &'static str { + self.readiness_path + } +} + +#[derive(Clone)] +pub struct LocalLifecycle { + initial_persist_dir: Option>, + initial_snapshot_dir: Option>, + stable_readiness: Option>, +} + +impl LocalLifecycle { + #[must_use] + pub fn new() -> Self { + Self { + initial_persist_dir: None, + initial_snapshot_dir: None, + stable_readiness: None, + } + } + + #[must_use] + pub fn with_initial_persist_dir(mut self, persist_dir: LocalPersistDir) -> Self { + self.initial_persist_dir = Some(persist_dir); + self + } + + #[must_use] + pub fn with_initial_snapshot_dir(mut self, snapshot_dir: LocalSnapshotDir) -> Self { + self.initial_snapshot_dir = Some(snapshot_dir); + self + } + + #[must_use] + pub fn with_stable_readiness(mut self, stable_readiness: LocalStableReadiness) -> Self { + self.stable_readiness = Some(stable_readiness); + self + } + + pub(crate) fn initial_persist_dir( + &self, + topology: &E::Deployment, + node_name: &str, + index: usize, + ) -> Option { + self.initial_persist_dir + .and_then(|persist_dir| persist_dir(topology, node_name, index)) + } + + pub(crate) fn initial_snapshot_dir( + &self, + topology: &E::Deployment, + node_name: &str, + index: usize, + ) -> Option { + self.initial_snapshot_dir + .and_then(|snapshot_dir| snapshot_dir(topology, node_name, index)) + } + + pub(crate) async fn wait_stable(&self, nodes: &[Node]) -> Result<(), DynError> { + if let Some(stable_readiness) = self.stable_readiness { + return stable_readiness(nodes).await; + } + + Ok(()) + } } #[derive(Clone)] pub struct LocalRuntime { pub(crate) process: LocalProcess, pub(crate) access: LocalAccess, + pub(crate) lifecycle: LocalLifecycle, } impl LocalRuntime { #[must_use] pub fn new(process: LocalProcess, access: LocalAccess) -> Self { - Self { process, access } + Self { + process, + access, + lifecycle: LocalLifecycle::new(), + } + } + + #[must_use] + pub fn with_lifecycle(mut self, lifecycle: LocalLifecycle) -> Self { + self.lifecycle = lifecycle; + self } } @@ -228,5 +434,5 @@ pub fn cluster_node_config_from_context( where E: Application + ClusterNodeConfigApplication, { - crate::env::build_local_cluster_node_config::(context.index, context.ports, context.peers) + build_local_cluster_node_config::(context.index, context.ports, context.peers) } diff --git a/testing-framework/deployers/local/src/env/tests.rs b/testing-framework/deployers/local/src/env/tests.rs index 8037c5c..a9a6c69 100644 --- a/testing-framework/deployers/local/src/env/tests.rs +++ b/testing-framework/deployers/local/src/env/tests.rs @@ -1,9 +1,5 @@ -use std::{ - path::Path, - sync::atomic::{AtomicUsize, Ordering}, -}; +use std::sync::atomic::{AtomicUsize, Ordering}; -use async_trait::async_trait; use testing_framework_core::{ scenario::{Application, DynError, Feed, FeedRuntime, HttpReadinessRequirement, NodeClients}, topology::DeploymentDescriptor, @@ -25,7 +21,7 @@ impl Feed for DummyFeed { #[derive(Default)] struct DummyFeedRuntime; -#[async_trait] +#[async_trait::async_trait] impl FeedRuntime for DummyFeedRuntime { type Feed = DummyFeed; @@ -46,7 +42,7 @@ impl DeploymentDescriptor for DummyTopology { struct DummyEnv; -#[async_trait] +#[async_trait::async_trait] impl Application for DummyEnv { type Deployment = DummyTopology; type NodeClient = (); @@ -60,46 +56,46 @@ impl Application for DummyEnv { } } -#[async_trait] impl LocalDeployerEnv for DummyEnv { - fn build_node_config( - _topology: &Self::Deployment, - _index: usize, - _peer_ports_by_name: &std::collections::HashMap, - _options: &StartNodeOptions, - _peer_ports: &[u16], - ) -> Result::NodeConfig>, DynError> { - unreachable!("not used in this test") + fn local_runtime() -> LocalRuntime { + LocalRuntime::new( + LocalProcess::custom(build_dummy_node, build_dummy_launch_spec) + .with_initial_nodes(build_dummy_initial_nodes), + LocalAccess::custom(dummy_endpoints).with_client(|_| Ok(())), + ) + .with_lifecycle(LocalLifecycle::new().with_stable_readiness(dummy_wait_stable)) } +} - fn build_initial_node_configs( - _topology: &Self::Deployment, - ) -> Result::NodeConfig>>, ProcessSpawnError> { - unreachable!("not used in this test") - } +fn build_dummy_node( + _context: LocalBuildContext<'_, DummyEnv>, +) -> Result, DynError> { + unreachable!("not used in this test") +} - fn build_launch_spec( - _config: &::NodeConfig, - _dir: &Path, - _label: &str, - ) -> Result { - Ok(crate::process::LaunchSpec::default()) - } +fn build_dummy_initial_nodes( + _topology: &DummyTopology, +) -> Result>, crate::process::ProcessSpawnError> { + unreachable!("not used in this test") +} - fn node_endpoints( - _config: &::NodeConfig, - ) -> Result { - Ok(NodeEndpoints::default()) - } +fn build_dummy_launch_spec( + _config: &DummyConfig, + _dir: &std::path::Path, + _label: &str, +) -> Result { + Ok(crate::process::LaunchSpec::default()) +} - fn node_client(_endpoints: &NodeEndpoints) -> Result { - Ok(()) - } +fn dummy_endpoints(_config: &DummyConfig) -> Result { + Ok(NodeEndpoints::default()) +} - async fn wait_readiness_stable(_nodes: &[Node]) -> Result<(), DynError> { +fn dummy_wait_stable<'a>(_nodes: &'a [Node]) -> runtime::LocalStableReadinessFuture<'a> { + Box::pin(async { STABLE_CALLS.fetch_add(1, Ordering::SeqCst); Ok(()) - } + }) } #[tokio::test] diff --git a/testing-framework/deployers/local/src/external.rs b/testing-framework/deployers/local/src/external.rs index 09de43d..46b84d6 100644 --- a/testing-framework/deployers/local/src/external.rs +++ b/testing-framework/deployers/local/src/external.rs @@ -2,7 +2,7 @@ use std::net::ToSocketAddrs; use testing_framework_core::scenario::{DynError, ExternalNodeSource}; -use crate::{LocalDeployerEnv, NodeEndpoints}; +use crate::{LocalDeployerEnv, NodeEndpoints, env::node_client}; #[derive(Debug, thiserror::Error)] pub enum ExternalClientBuildError { @@ -31,7 +31,7 @@ pub fn build_external_client( let api = resolve_api_socket(source)?; let mut endpoints = NodeEndpoints::default(); endpoints.api = api; - E::node_client(&endpoints) + node_client::(&endpoints) } fn resolve_api_socket(source: &ExternalNodeSource) -> Result { diff --git a/testing-framework/deployers/local/src/lib.rs b/testing-framework/deployers/local/src/lib.rs index 71b96ef..a2d7317 100644 --- a/testing-framework/deployers/local/src/lib.rs +++ b/testing-framework/deployers/local/src/lib.rs @@ -9,13 +9,13 @@ pub mod process; pub use binary::{BinaryConfig, BinaryResolver}; pub use deployer::{ProcessDeployer, ProcessDeployerError}; pub use env::{ - BuiltNodeConfig, LocalAccess, LocalBuildContext, LocalDeployerEnv, LocalNodePorts, - LocalPeerNode, LocalProcess, LocalProcessSpec, LocalRuntime, NodeConfigEntry, - build_indexed_http_peers, build_indexed_node_configs, build_local_cluster_node_config, - build_local_peer_nodes, cluster_node_config_from_context, default_yaml_launch_spec, - discovered_node_access, preallocate_ports, reserve_local_node_ports, - single_http_node_endpoints, text_config_launch_spec, text_node_config, yaml_config_launch_spec, - yaml_node_config, + BuiltNodeConfig, LocalAccess, LocalBuildContext, LocalDeployerEnv, LocalLifecycle, + LocalNodePorts, LocalPeerNode, LocalProcess, LocalProcessSpec, LocalRuntime, + LocalStableReadinessFuture, NodeConfigEntry, build_indexed_http_peers, + build_indexed_node_configs, build_local_cluster_node_config, build_local_peer_nodes, + cluster_node_config_from_context, default_yaml_launch_spec, discovered_node_access, + preallocate_ports, reserve_local_node_ports, single_http_node_endpoints, + text_config_launch_spec, text_node_config, yaml_config_launch_spec, yaml_node_config, }; pub use manual::{ManualCluster, ManualClusterError}; pub use node_control::{NodeManager, NodeManagerError, NodeManagerSeed}; diff --git a/testing-framework/deployers/local/src/node_control/mod.rs b/testing-framework/deployers/local/src/node_control/mod.rs index 324878a..149a0dc 100644 --- a/testing-framework/deployers/local/src/node_control/mod.rs +++ b/testing-framework/deployers/local/src/node_control/mod.rs @@ -10,7 +10,11 @@ use testing_framework_core::scenario::{ use thiserror::Error; use crate::{ - env::{LocalDeployerEnv, Node, spawn_node_from_config}, + env::{ + LocalDeployerEnv, Node, build_initial_node_configs, build_node_from_template, + initial_persist_dir, initial_snapshot_dir, node_peer_port, readiness_endpoint_path, + spawn_node_from_config, + }, process::ProcessSpawnError, }; @@ -79,12 +83,12 @@ impl NodeManager { descriptors: &E::Deployment, keep_tempdir: bool, ) -> Result>, ProcessSpawnError> { - let configs = E::build_initial_node_configs(descriptors)?; + let configs = build_initial_node_configs::(descriptors)?; let mut spawned = Vec::with_capacity(configs.len()); for (index, config_entry) in configs.into_iter().enumerate() { - let persist_dir = E::initial_persist_dir(descriptors, &config_entry.name, index); - let snapshot_dir = E::initial_snapshot_dir(descriptors, &config_entry.name, index); + let persist_dir = initial_persist_dir::(descriptors, &config_entry.name, index); + let snapshot_dir = initial_snapshot_dir::(descriptors, &config_entry.name, index); spawned.push( spawn_node_from_config::( config_entry.name, @@ -174,7 +178,7 @@ impl NodeManager { for (idx, node) in nodes.into_iter().enumerate() { let name = default_node_label(idx); - let port = E::node_peer_port(&node); + let port = node_peer_port::(&node); let client = node.client(); self.node_clients.add_node(client.clone()); @@ -201,7 +205,7 @@ impl NodeManager { return Ok(()); } - wait_for_http_ports(&ports, E::readiness_endpoint_path()).await + wait_for_http_ports(&ports, readiness_endpoint_path::()).await } pub async fn wait_node_ready(&self, name: &str) -> Result<(), NodeManagerError> { @@ -224,7 +228,7 @@ impl NodeManager { })? }; - wait_for_http_ports(&[port], E::readiness_endpoint_path()) + wait_for_http_ports(&[port], readiness_endpoint_path::()) .await .map_err(|source| NodeManagerError::Readiness { source }) } @@ -236,7 +240,7 @@ impl NodeManager { ) -> Result, NodeManagerError> { let snapshot = self.start_snapshot(name)?; - let mut built = E::build_node_config_from_template( + let mut built = build_node_from_template::( &self.descriptors, snapshot.index, &snapshot.peer_ports_by_name,