diff --git a/testing-framework/deployers/local/src/env.rs b/testing-framework/deployers/local/src/env.rs deleted file mode 100644 index 8af34e9..0000000 --- a/testing-framework/deployers/local/src/env.rs +++ /dev/null @@ -1,710 +0,0 @@ -use std::{ - collections::HashMap, - net::{Ipv4Addr, SocketAddr}, - path::{Path, PathBuf}, -}; - -use serde::Serialize; -use testing_framework_core::{ - scenario::{ - Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, DynError, - HttpReadinessRequirement, NodeAccess, ReadinessError, StartNodeOptions, - wait_for_http_ports_with_requirement, - }, - topology::DeploymentDescriptor, -}; - -use crate::process::{LaunchSpec, NodeEndpointPort, NodeEndpoints, ProcessNode, ProcessSpawnError}; - -pub type Node = ProcessNode<::NodeConfig, ::NodeClient>; - -pub struct BuiltNodeConfig { - pub config: Config, - pub network_port: u16, -} - -pub struct NodeConfigEntry { - pub name: String, - pub config: NodeConfigValue, -} - -pub struct LocalNodePorts { - network_port: u16, - named_ports: HashMap<&'static str, u16>, -} - -impl LocalNodePorts { - #[must_use] - pub fn network_port(&self) -> u16 { - self.network_port - } - - #[must_use] - pub fn get(&self, name: &str) -> Option { - self.named_ports.get(name).copied() - } - - pub fn require(&self, name: &str) -> Result { - self.get(name) - .ok_or_else(|| format!("missing reserved local port '{name}'").into()) - } - - pub fn iter(&self) -> impl Iterator + '_ { - self.named_ports.iter().map(|(name, port)| (*name, *port)) - } -} - -#[derive(Clone, Debug)] -pub struct LocalPeerNode { - index: usize, - network_port: u16, -} - -impl LocalPeerNode { - #[must_use] - pub fn index(&self) -> usize { - self.index - } - - #[must_use] - pub fn network_port(&self) -> u16 { - self.network_port - } - - #[must_use] - pub fn http_address(&self) -> String { - format!("127.0.0.1:{}", self.network_port) - } - - #[must_use] - pub fn authority(&self) -> String { - self.http_address() - } -} - -#[derive(Clone, Default)] -pub struct LocalProcessSpec { - pub binary_env_var: String, - pub binary_name: String, - pub config_file_name: String, - pub config_arg: String, - pub extra_args: Vec, - pub env: Vec, -} - -impl LocalProcessSpec { - #[must_use] - pub fn new(binary_env_var: &str, binary_name: &str) -> Self { - Self { - binary_env_var: binary_env_var.to_owned(), - binary_name: binary_name.to_owned(), - config_file_name: "config.yaml".to_owned(), - config_arg: "--config".to_owned(), - extra_args: Vec::new(), - env: Vec::new(), - } - } - - #[must_use] - pub fn with_config_file(mut self, file_name: &str, arg: &str) -> Self { - self.config_file_name = file_name.to_owned(); - self.config_arg = arg.to_owned(); - self - } - - #[must_use] - pub fn with_env(mut self, key: &str, value: &str) -> Self { - self.env.push(crate::process::LaunchEnvVar::new(key, value)); - self - } - - #[must_use] - pub fn with_rust_log(self, value: &str) -> Self { - self.with_env("RUST_LOG", value) - } - - #[must_use] - pub fn with_args(mut self, args: impl IntoIterator) -> Self { - self.extra_args.extend(args); - self - } -} - -pub fn preallocate_ports(count: usize, label: &str) -> Result, ProcessSpawnError> { - (0..count) - .map(|_| crate::process::allocate_available_port()) - .collect::, _>>() - .map_err(|source| ProcessSpawnError::Config { - source: format!("failed to pre-allocate {label} ports: {source}").into(), - }) -} - -pub fn build_indexed_node_configs( - count: usize, - name_prefix: &str, - build: impl FnMut(usize) -> T, -) -> Vec> { - (0..count) - .map(build) - .enumerate() - .map(|(index, config)| NodeConfigEntry { - name: format!("{name_prefix}-{index}"), - config, - }) - .collect() -} - -pub fn reserve_local_node_ports( - count: usize, - names: &[&'static str], - label: &str, -) -> Result, ProcessSpawnError> { - let network_ports = preallocate_ports(count, label)?; - let mut named_by_role = HashMap::new(); - for name in names { - named_by_role.insert(*name, preallocate_ports(count, &format!("{label} {name}"))?); - } - - Ok((0..count) - .map(|index| LocalNodePorts { - network_port: network_ports[index], - named_ports: named_by_role - .iter() - .map(|(name, ports)| (*name, ports[index])) - .collect(), - }) - .collect()) -} - -pub fn single_http_node_endpoints(port: u16) -> NodeEndpoints { - NodeEndpoints::from_api_port(port) -} - -pub fn build_local_cluster_node_config( - index: usize, - ports: &LocalNodePorts, - peers: &[LocalPeerNode], -) -> Result<::NodeConfig, DynError> -where - E: ClusterNodeConfigApplication, -{ - let mut node = ClusterNodeView::new(index, "127.0.0.1", ports.network_port()); - for (name, port) in ports.iter() { - node = node.with_named_port(name, port); - } - - let peer_views = peers - .iter() - .map(|peer| ClusterPeerView::new(peer.index(), "127.0.0.1", peer.network_port())) - .collect::>(); - - E::build_cluster_node_config(&node, &peer_views).map_err(Into::into) -} - -pub fn discovered_node_access(endpoints: &NodeEndpoints) -> NodeAccess { - let mut access = NodeAccess::new("127.0.0.1", endpoints.api.port()); - - for (key, port) in &endpoints.extra_ports { - match key { - NodeEndpointPort::TestingApi => { - access = access.with_testing_port(*port); - } - NodeEndpointPort::Custom(name) => { - access = access.with_named_port(name.clone(), *port); - } - NodeEndpointPort::Network => {} - } - } - - access -} - -pub fn build_indexed_http_peers( - node_count: usize, - self_index: usize, - peer_ports: &[u16], - mut build_peer: impl FnMut(usize, String) -> T, -) -> Vec { - (0..node_count) - .filter(|&i| i != self_index) - .map(|i| build_peer(i, format!("127.0.0.1:{}", peer_ports[i]))) - .collect() -} - -fn compact_peer_ports(peer_ports: &[u16], self_index: usize) -> Vec { - peer_ports - .iter() - .enumerate() - .filter_map(|(index, port)| (index != self_index).then_some(*port)) - .collect() -} - -pub fn build_local_peer_nodes(peer_ports: &[u16], self_index: usize) -> Vec { - peer_ports - .iter() - .enumerate() - .filter_map(|(index, port)| { - (index != self_index).then_some(LocalPeerNode { - index, - network_port: *port, - }) - }) - .collect() -} - -pub fn yaml_config_launch_spec( - config: &T, - spec: &LocalProcessSpec, -) -> Result { - let config_yaml = serde_yaml::to_string(config)?; - rendered_config_launch_spec(config_yaml.into_bytes(), spec) -} - -pub fn text_config_launch_spec( - rendered_config: impl Into>, - spec: &LocalProcessSpec, -) -> Result { - rendered_config_launch_spec(rendered_config.into(), spec) -} - -pub fn default_yaml_launch_spec( - config: &T, - binary_env_var: &str, - binary_name: &str, - rust_log: &str, -) -> Result { - yaml_config_launch_spec( - config, - &LocalProcessSpec::new(binary_env_var, binary_name).with_rust_log(rust_log), - ) -} - -pub fn yaml_node_config(config: &T) -> Result, DynError> { - Ok(serde_yaml::to_string(config)?.into_bytes()) -} - -pub fn text_node_config(rendered_config: impl Into>) -> Vec { - rendered_config.into() -} - -fn rendered_config_launch_spec( - rendered_config: Vec, - spec: &LocalProcessSpec, -) -> Result { - let binary = resolve_binary(spec); - let mut args = vec![spec.config_arg.clone(), spec.config_file_name.clone()]; - args.extend(spec.extra_args.iter().cloned()); - - Ok(LaunchSpec { - binary, - files: vec![crate::process::LaunchFile { - relative_path: spec.config_file_name.clone().into(), - contents: rendered_config, - }], - args, - env: spec.env.clone(), - }) -} - -fn resolve_binary(spec: &LocalProcessSpec) -> PathBuf { - std::env::var(&spec.binary_env_var) - .map(PathBuf::from) - .or_else(|_| which::which(&spec.binary_name)) - .unwrap_or_else(|_| { - let mut path = std::env::current_dir().unwrap_or_default(); - let mut debug = path.clone(); - debug.push(format!("target/debug/{}", spec.binary_name)); - if debug.exists() { - return debug; - } - - path.push(format!("target/release/{}", spec.binary_name)); - path - }) -} - -#[async_trait::async_trait] -pub trait LocalDeployerEnv: Application + Sized -where - ::NodeConfig: Clone + Send + Sync + 'static, -{ - fn local_port_names() -> &'static [&'static str] { - Self::initial_local_port_names() - } - - fn build_node_config( - topology: &Self::Deployment, - index: usize, - peer_ports_by_name: &HashMap, - options: &StartNodeOptions, - peer_ports: &[u16], - ) -> Result::NodeConfig>, DynError> { - Self::build_node_config_from_template( - topology, - index, - peer_ports_by_name, - options, - peer_ports, - None, - ) - } - - fn build_node_config_from_template( - topology: &Self::Deployment, - index: usize, - peer_ports_by_name: &HashMap, - options: &StartNodeOptions, - peer_ports: &[u16], - template_config: Option<&::NodeConfig>, - ) -> Result::NodeConfig>, DynError> { - let mut reserved = reserve_local_node_ports(1, Self::local_port_names(), "node") - .map_err(|source| -> DynError { source.into() })?; - let ports = reserved - .pop() - .ok_or_else(|| std::io::Error::other("failed to reserve local node ports"))?; - let network_port = ports.network_port(); - let config = Self::build_local_node_config( - topology, - index, - &ports, - peer_ports_by_name, - options, - peer_ports, - template_config, - )?; - - Ok(BuiltNodeConfig { - config, - network_port, - }) - } - - fn build_initial_node_configs( - topology: &Self::Deployment, - ) -> Result::NodeConfig>>, ProcessSpawnError> { - let reserved_ports = reserve_local_node_ports( - topology.node_count(), - Self::initial_local_port_names(), - Self::initial_node_name_prefix(), - )?; - let peer_ports = reserved_ports - .iter() - .map(LocalNodePorts::network_port) - .collect::>(); - - let mut configs = Vec::with_capacity(topology.node_count()); - for (index, ports) in reserved_ports.iter().enumerate() { - let config = Self::build_initial_node_config(topology, index, ports, &peer_ports) - .map_err(|source| ProcessSpawnError::Config { source })?; - configs.push(NodeConfigEntry { - name: format!("{}-{index}", Self::initial_node_name_prefix()), - config, - }); - } - - Ok(configs) - } - - fn initial_node_name_prefix() -> &'static str { - "node" - } - - fn initial_local_port_names() -> &'static [&'static str] { - &[] - } - - fn build_initial_node_config( - topology: &Self::Deployment, - index: usize, - ports: &LocalNodePorts, - peer_ports: &[u16], - ) -> Result<::NodeConfig, DynError> { - let compact_peer_ports = compact_peer_ports(peer_ports, index); - let peer_ports_by_name = HashMap::new(); - let options = StartNodeOptions::::default(); - Self::build_local_node_config( - topology, - index, - ports, - &peer_ports_by_name, - &options, - &compact_peer_ports, - None, - ) - } - - fn build_local_node_config( - _topology: &Self::Deployment, - _index: usize, - _ports: &LocalNodePorts, - _peer_ports_by_name: &HashMap, - _options: &StartNodeOptions, - _peer_ports: &[u16], - _template_config: Option<&::NodeConfig>, - ) -> Result<::NodeConfig, DynError> { - let peers = build_local_peer_nodes(_peer_ports, _index); - Self::build_local_node_config_with_peers( - _topology, - _index, - _ports, - &peers, - _peer_ports_by_name, - _options, - _template_config, - ) - } - - fn build_local_node_config_with_peers( - _topology: &Self::Deployment, - _index: usize, - _ports: &LocalNodePorts, - _peers: &[LocalPeerNode], - _peer_ports_by_name: &HashMap, - _options: &StartNodeOptions, - _template_config: Option<&::NodeConfig>, - ) -> Result<::NodeConfig, DynError> { - Err(std::io::Error::other( - "build_local_node_config_with_peers is not implemented for this app", - ) - .into()) - } - - fn initial_persist_dir( - _topology: &Self::Deployment, - _node_name: &str, - _index: usize, - ) -> Option { - None - } - - fn initial_snapshot_dir( - _topology: &Self::Deployment, - _node_name: &str, - _index: usize, - ) -> Option { - None - } - - fn local_process_spec() -> Option { - None - } - - fn render_local_config( - _config: &::NodeConfig, - ) -> Result, DynError> { - Err(std::io::Error::other("render_local_config is not implemented for this app").into()) - } - - fn build_launch_spec( - config: &::NodeConfig, - _dir: &Path, - _label: &str, - ) -> Result { - let spec = Self::local_process_spec().ok_or_else(|| { - std::io::Error::other("build_launch_spec is not implemented for this app") - })?; - let rendered = Self::render_local_config(config)?; - rendered_config_launch_spec(rendered, &spec) - } - - fn http_api_port(_config: &::NodeConfig) -> Option { - None - } - - fn node_endpoints( - config: &::NodeConfig, - ) -> Result { - if let Some(port) = Self::http_api_port(config) { - return Ok(NodeEndpoints { - api: SocketAddr::from((Ipv4Addr::LOCALHOST, port)), - extra_ports: HashMap::new(), - }); - } - - Err(std::io::Error::other("node_endpoints is not implemented for this app").into()) - } - - fn node_peer_port(node: &Node) -> u16 { - node.endpoints().api.port() - } - - fn node_client_from_api_endpoint(_api: SocketAddr) -> Option { - None - } - - fn node_client(endpoints: &NodeEndpoints) -> Result { - if let Ok(client) = - ::build_node_client(&discovered_node_access(endpoints)) - { - return Ok(client); - } - - if let Some(client) = Self::node_client_from_api_endpoint(endpoints.api) { - return Ok(client); - } - - Err(std::io::Error::other("node_client is not implemented for this app").into()) - } - - fn readiness_endpoint_path() -> &'static str { - ::node_readiness_path() - } - - async fn wait_readiness_stable(_nodes: &[Node]) -> Result<(), DynError> { - Ok(()) - } -} - -pub async fn wait_local_http_readiness( - nodes: &[Node], - requirement: HttpReadinessRequirement, -) -> Result<(), ReadinessError> { - let ports: Vec<_> = nodes - .iter() - .map(|node| node.endpoints().api.port()) - .collect(); - wait_for_http_ports_with_requirement(&ports, E::readiness_endpoint_path(), requirement).await?; - - E::wait_readiness_stable(nodes) - .await - .map_err(|source| ReadinessError::ClusterStable { source }) -} - -pub async fn spawn_node_from_config( - label: String, - config: ::NodeConfig, - keep_tempdir: bool, - persist_dir: Option<&std::path::Path>, - snapshot_dir: Option<&std::path::Path>, -) -> Result, ProcessSpawnError> { - ProcessNode::spawn( - &label, - config, - E::build_launch_spec, - E::node_endpoints, - keep_tempdir, - persist_dir, - snapshot_dir, - E::node_client, - ) - .await -} - -#[cfg(test)] -mod tests { - use std::{ - path::Path, - sync::atomic::{AtomicUsize, Ordering}, - }; - - use async_trait::async_trait; - use testing_framework_core::{ - scenario::{Application, DynError, Feed, FeedRuntime, NodeClients}, - topology::DeploymentDescriptor, - }; - - use super::*; - - static STABLE_CALLS: AtomicUsize = AtomicUsize::new(0); - - #[derive(Clone, Default)] - struct DummyFeed; - - impl Feed for DummyFeed { - type Subscription = (); - - fn subscribe(&self) -> Self::Subscription {} - } - - #[derive(Default)] - struct DummyFeedRuntime; - - #[async_trait] - impl FeedRuntime for DummyFeedRuntime { - type Feed = DummyFeed; - - async fn run(self: Box) {} - } - - #[derive(Clone)] - struct DummyConfig; - - #[derive(Clone)] - struct DummyTopology; - - impl DeploymentDescriptor for DummyTopology { - fn node_count(&self) -> usize { - 0 - } - } - - struct DummyEnv; - - #[async_trait] - impl Application for DummyEnv { - type Deployment = DummyTopology; - type NodeClient = (); - type NodeConfig = DummyConfig; - type FeedRuntime = DummyFeedRuntime; - - async fn prepare_feed( - _node_clients: NodeClients, - ) -> Result<(::Feed, Self::FeedRuntime), DynError> - { - Ok((DummyFeed, DummyFeedRuntime)) - } - } - - #[async_trait] - impl LocalDeployerEnv for DummyEnv { - fn build_node_config( - _topology: &Self::Deployment, - _index: usize, - _peer_ports_by_name: &HashMap, - _options: &StartNodeOptions, - _peer_ports: &[u16], - ) -> Result::NodeConfig>, DynError> { - unreachable!("not used in this test") - } - - fn build_initial_node_configs( - _topology: &Self::Deployment, - ) -> Result::NodeConfig>>, ProcessSpawnError> - { - unreachable!("not used in this test") - } - - fn build_launch_spec( - _config: &::NodeConfig, - _dir: &Path, - _label: &str, - ) -> Result { - Ok(LaunchSpec::default()) - } - - fn node_endpoints( - _config: &::NodeConfig, - ) -> Result { - Ok(NodeEndpoints::default()) - } - - fn node_client(_endpoints: &NodeEndpoints) -> Result { - Ok(()) - } - - async fn wait_readiness_stable(_nodes: &[Node]) -> Result<(), DynError> { - STABLE_CALLS.fetch_add(1, Ordering::SeqCst); - Ok(()) - } - } - - #[tokio::test] - async fn empty_cluster_still_runs_stability_hook() { - STABLE_CALLS.store(0, Ordering::SeqCst); - let nodes: Vec> = Vec::new(); - wait_local_http_readiness::(&nodes, HttpReadinessRequirement::AllNodesReady) - .await - .expect("empty cluster should be considered ready"); - assert_eq!(STABLE_CALLS.load(Ordering::SeqCst), 1); - } -} diff --git a/testing-framework/deployers/local/src/env/helpers.rs b/testing-framework/deployers/local/src/env/helpers.rs new file mode 100644 index 0000000..fae98c5 --- /dev/null +++ b/testing-framework/deployers/local/src/env/helpers.rs @@ -0,0 +1,314 @@ +use std::{collections::HashMap, path::PathBuf}; + +use serde::Serialize; +use testing_framework_core::scenario::{ + Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, DynError, + NodeAccess, +}; + +use crate::process::{LaunchSpec, NodeEndpointPort, NodeEndpoints, ProcessSpawnError}; + +pub struct BuiltNodeConfig { + pub config: Config, + pub network_port: u16, +} + +pub struct NodeConfigEntry { + pub name: String, + pub config: NodeConfigValue, +} + +pub struct LocalNodePorts { + network_port: u16, + named_ports: HashMap<&'static str, u16>, +} + +impl LocalNodePorts { + #[must_use] + pub fn network_port(&self) -> u16 { + self.network_port + } + + #[must_use] + pub fn get(&self, name: &str) -> Option { + self.named_ports.get(name).copied() + } + + pub fn require(&self, name: &str) -> Result { + self.get(name) + .ok_or_else(|| format!("missing reserved local port '{name}'").into()) + } + + pub fn iter(&self) -> impl Iterator + '_ { + self.named_ports.iter().map(|(name, port)| (*name, *port)) + } +} + +#[derive(Clone, Debug)] +pub struct LocalPeerNode { + index: usize, + network_port: u16, +} + +impl LocalPeerNode { + #[must_use] + pub fn index(&self) -> usize { + self.index + } + + #[must_use] + pub fn network_port(&self) -> u16 { + self.network_port + } + + #[must_use] + pub fn http_address(&self) -> String { + format!("127.0.0.1:{}", self.network_port) + } + + #[must_use] + pub fn authority(&self) -> String { + self.http_address() + } +} + +#[derive(Clone, Default)] +pub struct LocalProcessSpec { + pub binary_env_var: String, + pub binary_name: String, + pub config_file_name: String, + pub config_arg: String, + pub extra_args: Vec, + pub env: Vec, +} + +impl LocalProcessSpec { + #[must_use] + pub fn new(binary_env_var: &str, binary_name: &str) -> Self { + Self { + binary_env_var: binary_env_var.to_owned(), + binary_name: binary_name.to_owned(), + config_file_name: "config.yaml".to_owned(), + config_arg: "--config".to_owned(), + extra_args: Vec::new(), + env: Vec::new(), + } + } + + #[must_use] + pub fn with_config_file(mut self, file_name: &str, arg: &str) -> Self { + self.config_file_name = file_name.to_owned(); + self.config_arg = arg.to_owned(); + self + } + + #[must_use] + pub fn with_env(mut self, key: &str, value: &str) -> Self { + self.env.push(crate::process::LaunchEnvVar::new(key, value)); + self + } + + #[must_use] + pub fn with_rust_log(self, value: &str) -> Self { + self.with_env("RUST_LOG", value) + } + + #[must_use] + pub fn with_args(mut self, args: impl IntoIterator) -> Self { + self.extra_args.extend(args); + self + } +} + +pub fn preallocate_ports(count: usize, label: &str) -> Result, ProcessSpawnError> { + (0..count) + .map(|_| crate::process::allocate_available_port()) + .collect::, _>>() + .map_err(|source| ProcessSpawnError::Config { + source: format!("failed to pre-allocate {label} ports: {source}").into(), + }) +} + +pub fn build_indexed_node_configs( + count: usize, + name_prefix: &str, + build: impl FnMut(usize) -> T, +) -> Vec> { + (0..count) + .map(build) + .enumerate() + .map(|(index, config)| NodeConfigEntry { + name: format!("{name_prefix}-{index}"), + config, + }) + .collect() +} + +pub fn reserve_local_node_ports( + count: usize, + names: &[&'static str], + label: &str, +) -> Result, ProcessSpawnError> { + let network_ports = preallocate_ports(count, label)?; + let mut named_by_role = HashMap::new(); + for name in names { + named_by_role.insert(*name, preallocate_ports(count, &format!("{label} {name}"))?); + } + + Ok((0..count) + .map(|index| LocalNodePorts { + network_port: network_ports[index], + named_ports: named_by_role + .iter() + .map(|(name, ports)| (*name, ports[index])) + .collect(), + }) + .collect()) +} + +pub fn single_http_node_endpoints(port: u16) -> NodeEndpoints { + NodeEndpoints::from_api_port(port) +} + +pub fn build_local_cluster_node_config( + index: usize, + ports: &LocalNodePorts, + peers: &[LocalPeerNode], +) -> Result<::NodeConfig, DynError> +where + E: ClusterNodeConfigApplication, +{ + let mut node = ClusterNodeView::new(index, "127.0.0.1", ports.network_port()); + for (name, port) in ports.iter() { + node = node.with_named_port(name, port); + } + + let peer_views = peers + .iter() + .map(|peer| ClusterPeerView::new(peer.index(), "127.0.0.1", peer.network_port())) + .collect::>(); + + E::build_cluster_node_config(&node, &peer_views).map_err(Into::into) +} + +pub fn discovered_node_access(endpoints: &NodeEndpoints) -> NodeAccess { + let mut access = NodeAccess::new("127.0.0.1", endpoints.api.port()); + + for (key, port) in &endpoints.extra_ports { + match key { + NodeEndpointPort::TestingApi => { + access = access.with_testing_port(*port); + } + NodeEndpointPort::Custom(name) => { + access = access.with_named_port(name.clone(), *port); + } + NodeEndpointPort::Network => {} + } + } + + access +} + +pub fn build_indexed_http_peers( + node_count: usize, + self_index: usize, + peer_ports: &[u16], + mut build_peer: impl FnMut(usize, String) -> T, +) -> Vec { + (0..node_count) + .filter(|&i| i != self_index) + .map(|i| build_peer(i, format!("127.0.0.1:{}", peer_ports[i]))) + .collect() +} + +pub(crate) fn compact_peer_ports(peer_ports: &[u16], self_index: usize) -> Vec { + peer_ports + .iter() + .enumerate() + .filter_map(|(index, port)| (index != self_index).then_some(*port)) + .collect() +} + +pub fn build_local_peer_nodes(peer_ports: &[u16], self_index: usize) -> Vec { + peer_ports + .iter() + .enumerate() + .filter_map(|(index, port)| { + (index != self_index).then_some(LocalPeerNode { + index, + network_port: *port, + }) + }) + .collect() +} + +pub fn yaml_config_launch_spec( + config: &T, + spec: &LocalProcessSpec, +) -> Result { + let config_yaml = serde_yaml::to_string(config)?; + rendered_config_launch_spec(config_yaml.into_bytes(), spec) +} + +pub fn text_config_launch_spec( + rendered_config: impl Into>, + spec: &LocalProcessSpec, +) -> Result { + rendered_config_launch_spec(rendered_config.into(), spec) +} + +pub fn default_yaml_launch_spec( + config: &T, + binary_env_var: &str, + binary_name: &str, + rust_log: &str, +) -> Result { + yaml_config_launch_spec( + config, + &LocalProcessSpec::new(binary_env_var, binary_name).with_rust_log(rust_log), + ) +} + +pub fn yaml_node_config(config: &T) -> Result, DynError> { + Ok(serde_yaml::to_string(config)?.into_bytes()) +} + +pub fn text_node_config(rendered_config: impl Into>) -> Vec { + rendered_config.into() +} + +pub(crate) fn rendered_config_launch_spec( + rendered_config: Vec, + spec: &LocalProcessSpec, +) -> Result { + let binary = resolve_binary(spec); + let mut args = vec![spec.config_arg.clone(), spec.config_file_name.clone()]; + args.extend(spec.extra_args.iter().cloned()); + + Ok(LaunchSpec { + binary, + files: vec![crate::process::LaunchFile { + relative_path: spec.config_file_name.clone().into(), + contents: rendered_config, + }], + args, + env: spec.env.clone(), + }) +} + +fn resolve_binary(spec: &LocalProcessSpec) -> PathBuf { + std::env::var(&spec.binary_env_var) + .map(PathBuf::from) + .or_else(|_| which::which(&spec.binary_name)) + .unwrap_or_else(|_| { + let mut path = std::env::current_dir().unwrap_or_default(); + let mut debug = path.clone(); + debug.push(format!("target/debug/{}", spec.binary_name)); + if debug.exists() { + return debug; + } + + path.push(format!("target/release/{}", spec.binary_name)); + path + }) +} diff --git a/testing-framework/deployers/local/src/env/mod.rs b/testing-framework/deployers/local/src/env/mod.rs new file mode 100644 index 0000000..fde33de --- /dev/null +++ b/testing-framework/deployers/local/src/env/mod.rs @@ -0,0 +1,341 @@ +use std::{ + collections::HashMap, + net::{Ipv4Addr, SocketAddr}, + path::{Path, PathBuf}, +}; + +use testing_framework_core::{ + scenario::{ + Application, DynError, HttpReadinessRequirement, ReadinessError, StartNodeOptions, + wait_for_http_ports_with_requirement, + }, + topology::DeploymentDescriptor, +}; + +use crate::process::{LaunchSpec, NodeEndpoints, ProcessNode, ProcessSpawnError}; + +mod helpers; +mod runtime; +#[cfg(test)] +mod tests; + +pub use helpers::{ + BuiltNodeConfig, LocalNodePorts, LocalPeerNode, LocalProcessSpec, NodeConfigEntry, + build_indexed_http_peers, build_indexed_node_configs, build_local_cluster_node_config, + build_local_peer_nodes, default_yaml_launch_spec, discovered_node_access, preallocate_ports, + reserve_local_node_ports, single_http_node_endpoints, text_config_launch_spec, + text_node_config, yaml_config_launch_spec, yaml_node_config, +}; +pub use runtime::{ + LocalAccess, LocalBuildContext, LocalProcess, LocalRuntime, cluster_node_config_from_context, +}; + +pub type Node = ProcessNode<::NodeConfig, ::NodeClient>; + +#[async_trait::async_trait] +pub trait LocalDeployerEnv: Application + Sized +where + ::NodeConfig: Clone + Send + Sync + 'static, +{ + fn local_runtime() -> Option> { + None + } + + fn local_port_names() -> &'static [&'static str] { + Self::local_runtime() + .map(|runtime| runtime.process.port_names) + .unwrap_or_else(Self::initial_local_port_names) + } + + fn build_node_config( + topology: &Self::Deployment, + index: usize, + peer_ports_by_name: &HashMap, + options: &StartNodeOptions, + peer_ports: &[u16], + ) -> Result::NodeConfig>, DynError> { + Self::build_node_config_from_template( + topology, + index, + peer_ports_by_name, + options, + peer_ports, + None, + ) + } + + fn build_node_config_from_template( + topology: &Self::Deployment, + index: usize, + peer_ports_by_name: &HashMap, + options: &StartNodeOptions, + peer_ports: &[u16], + template_config: Option<&::NodeConfig>, + ) -> Result::NodeConfig>, DynError> { + let mut reserved = reserve_local_node_ports(1, Self::local_port_names(), "node") + .map_err(|source| -> DynError { source.into() })?; + let ports = reserved + .pop() + .ok_or_else(|| std::io::Error::other("failed to reserve local node ports"))?; + let network_port = ports.network_port(); + let config = Self::build_local_node_config( + topology, + index, + &ports, + peer_ports_by_name, + options, + peer_ports, + template_config, + )?; + + Ok(BuiltNodeConfig { + config, + network_port, + }) + } + + fn build_initial_node_configs( + topology: &Self::Deployment, + ) -> Result::NodeConfig>>, ProcessSpawnError> { + let reserved_ports = reserve_local_node_ports( + topology.node_count(), + Self::local_port_names(), + Self::initial_node_name_prefix(), + )?; + let peer_ports = reserved_ports + .iter() + .map(LocalNodePorts::network_port) + .collect::>(); + + let mut configs = Vec::with_capacity(topology.node_count()); + for (index, ports) in reserved_ports.iter().enumerate() { + let config = Self::build_initial_node_config(topology, index, ports, &peer_ports) + .map_err(|source| ProcessSpawnError::Config { source })?; + configs.push(NodeConfigEntry { + name: format!("{}-{index}", Self::initial_node_name_prefix()), + config, + }); + } + + Ok(configs) + } + + fn initial_node_name_prefix() -> &'static str { + Self::local_runtime() + .map(|runtime| runtime.process.node_name_prefix) + .unwrap_or("node") + } + + fn initial_local_port_names() -> &'static [&'static str] { + &[] + } + + fn build_initial_node_config( + topology: &Self::Deployment, + index: usize, + ports: &LocalNodePorts, + peer_ports: &[u16], + ) -> Result<::NodeConfig, DynError> { + let compact_peer_ports = helpers::compact_peer_ports(peer_ports, index); + let peer_ports_by_name = HashMap::new(); + let options = StartNodeOptions::::default(); + Self::build_local_node_config( + topology, + index, + ports, + &peer_ports_by_name, + &options, + &compact_peer_ports, + None, + ) + } + + fn build_local_node_config( + topology: &Self::Deployment, + index: usize, + ports: &LocalNodePorts, + peer_ports_by_name: &HashMap, + options: &StartNodeOptions, + peer_ports: &[u16], + template_config: Option<&::NodeConfig>, + ) -> Result<::NodeConfig, DynError> { + let peers = build_local_peer_nodes(peer_ports, index); + Self::build_local_node_config_with_peers( + topology, + index, + ports, + &peers, + peer_ports_by_name, + options, + template_config, + ) + } + + fn build_local_node_config_with_peers( + topology: &Self::Deployment, + index: usize, + ports: &LocalNodePorts, + peers: &[LocalPeerNode], + peer_ports_by_name: &HashMap, + options: &StartNodeOptions, + template_config: Option<&::NodeConfig>, + ) -> Result<::NodeConfig, DynError> { + if let Some(runtime) = Self::local_runtime() { + return (runtime.process.build_config)(LocalBuildContext { + topology, + index, + ports, + peers, + peer_ports_by_name, + options, + template_config, + }); + } + + Err(std::io::Error::other( + "build_local_node_config_with_peers is not implemented for this app", + ) + .into()) + } + + fn initial_persist_dir( + _topology: &Self::Deployment, + _node_name: &str, + _index: usize, + ) -> Option { + None + } + + fn initial_snapshot_dir( + _topology: &Self::Deployment, + _node_name: &str, + _index: usize, + ) -> Option { + None + } + + fn local_process_spec() -> Option { + Self::local_runtime().map(|runtime| runtime.process.spec) + } + + fn render_local_config( + config: &::NodeConfig, + ) -> Result, DynError> { + if let Some(runtime) = Self::local_runtime() { + return (runtime.process.render_config)(config); + } + + Err(std::io::Error::other("render_local_config is not implemented for this app").into()) + } + + fn build_launch_spec( + config: &::NodeConfig, + _dir: &Path, + _label: &str, + ) -> Result { + let spec = Self::local_process_spec().ok_or_else(|| { + std::io::Error::other("build_launch_spec is not implemented for this app") + })?; + let rendered = Self::render_local_config(config)?; + helpers::rendered_config_launch_spec(rendered, &spec) + } + + fn http_api_port(config: &::NodeConfig) -> Option { + Self::local_runtime() + .and_then(|runtime| runtime.access.api_port.map(|api_port| api_port(config))) + } + + fn node_endpoints( + config: &::NodeConfig, + ) -> Result { + if let Some(runtime) = Self::local_runtime() { + return runtime.access.node_endpoints(config); + } + + if let Some(port) = Self::http_api_port(config) { + return Ok(NodeEndpoints { + api: SocketAddr::from((Ipv4Addr::LOCALHOST, port)), + extra_ports: HashMap::new(), + }); + } + + Err(std::io::Error::other("node_endpoints is not implemented for this app").into()) + } + + fn node_peer_port(node: &Node) -> u16 { + if let Some(runtime) = Self::local_runtime() { + return runtime + .access + .node_peer_port(node.config(), node.endpoints()); + } + + node.endpoints().api.port() + } + + fn node_client_from_api_endpoint(_api: SocketAddr) -> Option { + None + } + + fn node_client(endpoints: &NodeEndpoints) -> Result { + if let Some(runtime) = Self::local_runtime() { + return runtime.access.node_client(endpoints); + } + + if let Ok(client) = + ::build_node_client(&discovered_node_access(endpoints)) + { + return Ok(client); + } + + if let Some(client) = Self::node_client_from_api_endpoint(endpoints.api) { + return Ok(client); + } + + Err(std::io::Error::other("node_client is not implemented for this app").into()) + } + + fn readiness_endpoint_path() -> &'static str { + Self::local_runtime() + .map(|runtime| runtime.access.readiness_path) + .unwrap_or_else(::node_readiness_path) + } + + async fn wait_readiness_stable(_nodes: &[Node]) -> Result<(), DynError> { + Ok(()) + } +} + +pub async fn wait_local_http_readiness( + nodes: &[Node], + requirement: HttpReadinessRequirement, +) -> Result<(), ReadinessError> { + let ports: Vec<_> = nodes + .iter() + .map(|node| node.endpoints().api.port()) + .collect(); + wait_for_http_ports_with_requirement(&ports, E::readiness_endpoint_path(), requirement).await?; + + E::wait_readiness_stable(nodes) + .await + .map_err(|source| ReadinessError::ClusterStable { source }) +} + +pub async fn spawn_node_from_config( + label: String, + config: ::NodeConfig, + keep_tempdir: bool, + persist_dir: Option<&std::path::Path>, + snapshot_dir: Option<&std::path::Path>, +) -> Result, ProcessSpawnError> { + ProcessNode::spawn( + &label, + config, + E::build_launch_spec, + E::node_endpoints, + keep_tempdir, + persist_dir, + snapshot_dir, + E::node_client, + ) + .await +} diff --git a/testing-framework/deployers/local/src/env/runtime.rs b/testing-framework/deployers/local/src/env/runtime.rs new file mode 100644 index 0000000..79844b3 --- /dev/null +++ b/testing-framework/deployers/local/src/env/runtime.rs @@ -0,0 +1,232 @@ +use std::{ + collections::HashMap, + net::{Ipv4Addr, SocketAddr}, +}; + +use serde::Serialize; +use testing_framework_core::scenario::{ + Application, ClusterNodeConfigApplication, DynError, NodeAccess, StartNodeOptions, +}; + +use crate::{ + env::{ + LocalNodePorts, LocalPeerNode, LocalProcessSpec, NodeEndpoints, discovered_node_access, + yaml_node_config, + }, + process::LaunchEnvVar, +}; + +pub struct LocalBuildContext<'a, E: Application> { + pub topology: &'a E::Deployment, + pub index: usize, + pub ports: &'a LocalNodePorts, + pub peers: &'a [LocalPeerNode], + pub peer_ports_by_name: &'a HashMap, + pub options: &'a StartNodeOptions, + pub template_config: Option<&'a E::NodeConfig>, +} + +pub type LocalConfigBuilder = + for<'a> fn(LocalBuildContext<'a, E>) -> Result<::NodeConfig, DynError>; + +pub type LocalConfigRenderer = fn(&::NodeConfig) -> Result, DynError>; + +pub type LocalApiPort = fn(&::NodeConfig) -> u16; + +pub type LocalEndpoints = fn(&::NodeConfig) -> Result; + +pub type LocalClientBuilder = + fn(&NodeAccess) -> Result<::NodeClient, DynError>; + +pub type LocalPeerPort = fn(&::NodeConfig, &NodeEndpoints) -> u16; + +#[derive(Clone)] +pub struct LocalProcess { + pub(crate) spec: LocalProcessSpec, + pub(crate) build_config: LocalConfigBuilder, + pub(crate) render_config: LocalConfigRenderer, + pub(crate) node_name_prefix: &'static str, + pub(crate) port_names: &'static [&'static str], +} + +impl LocalProcess { + #[must_use] + pub fn new( + binary_env_var: &'static str, + binary_name: &'static str, + build_config: LocalConfigBuilder, + render_config: LocalConfigRenderer, + ) -> Self { + Self { + spec: LocalProcessSpec::new(binary_env_var, binary_name), + build_config, + render_config, + node_name_prefix: "node", + port_names: &[], + } + } + + #[must_use] + pub fn with_node_name_prefix(mut self, value: &'static str) -> Self { + self.node_name_prefix = value; + self + } + + #[must_use] + pub fn with_port_names(mut self, value: &'static [&'static str]) -> Self { + self.port_names = value; + self + } + + #[must_use] + pub fn with_config_file(mut self, file_name: &str, arg: &str) -> Self { + self.spec = self.spec.with_config_file(file_name, arg); + self + } + + #[must_use] + pub fn with_env(mut self, key: &str, value: &str) -> Self { + self.spec = self.spec.with_env(key, value); + self + } + + #[must_use] + pub fn with_rust_log(mut self, value: &str) -> Self { + self.spec = self.spec.with_rust_log(value); + self + } + + #[must_use] + pub fn with_args(mut self, args: impl IntoIterator) -> Self { + self.spec = self.spec.with_args(args); + self + } + + #[must_use] + pub fn with_launch_env(mut self, vars: impl IntoIterator) -> Self { + self.spec.env.extend(vars); + self + } +} + +impl LocalProcess +where + E: Application, + E::NodeConfig: Serialize, +{ + #[must_use] + pub fn yaml( + binary_env_var: &'static str, + binary_name: &'static str, + build_config: LocalConfigBuilder, + ) -> Self { + Self::new( + binary_env_var, + binary_name, + build_config, + yaml_node_config::, + ) + } +} + +#[derive(Clone)] +pub struct LocalAccess { + pub(crate) api_port: Option>, + pub(crate) endpoints: Option>, + pub(crate) client: Option>, + pub(crate) peer_port: Option>, + pub(crate) readiness_path: &'static str, +} + +impl LocalAccess { + #[must_use] + pub fn http(api_port: LocalApiPort) -> Self { + Self { + api_port: Some(api_port), + endpoints: None, + client: None, + peer_port: None, + readiness_path: E::node_readiness_path(), + } + } + + #[must_use] + pub fn custom(endpoints: LocalEndpoints) -> Self { + Self { + api_port: None, + endpoints: Some(endpoints), + client: None, + peer_port: None, + readiness_path: E::node_readiness_path(), + } + } + + #[must_use] + pub fn with_client(mut self, client: LocalClientBuilder) -> Self { + self.client = Some(client); + self + } + + #[must_use] + pub fn with_peer_port(mut self, peer_port: LocalPeerPort) -> Self { + self.peer_port = Some(peer_port); + self + } + + #[must_use] + pub fn with_readiness_path(mut self, readiness_path: &'static str) -> Self { + self.readiness_path = readiness_path; + self + } + + pub(crate) fn node_endpoints(&self, config: &E::NodeConfig) -> Result { + if let Some(endpoints) = self.endpoints { + return endpoints(config); + } + + if let Some(api_port) = self.api_port { + return Ok(NodeEndpoints { + api: SocketAddr::from((Ipv4Addr::LOCALHOST, api_port(config))), + extra_ports: HashMap::new(), + }); + } + + Err(std::io::Error::other("node endpoints are not configured").into()) + } + + pub(crate) fn node_client(&self, endpoints: &NodeEndpoints) -> Result { + if let Some(client) = self.client { + return client(&discovered_node_access(endpoints)); + } + + E::build_node_client(&discovered_node_access(endpoints)) + } + + pub(crate) fn node_peer_port(&self, config: &E::NodeConfig, endpoints: &NodeEndpoints) -> u16 { + self.peer_port + .map(|peer_port| peer_port(config, endpoints)) + .unwrap_or_else(|| endpoints.api.port()) + } +} + +#[derive(Clone)] +pub struct LocalRuntime { + pub(crate) process: LocalProcess, + pub(crate) access: LocalAccess, +} + +impl LocalRuntime { + #[must_use] + pub fn new(process: LocalProcess, access: LocalAccess) -> Self { + Self { process, access } + } +} + +pub fn cluster_node_config_from_context( + context: LocalBuildContext<'_, E>, +) -> Result<::NodeConfig, DynError> +where + E: Application + ClusterNodeConfigApplication, +{ + crate::env::build_local_cluster_node_config::(context.index, context.ports, context.peers) +} diff --git a/testing-framework/deployers/local/src/env/tests.rs b/testing-framework/deployers/local/src/env/tests.rs new file mode 100644 index 0000000..8037c5c --- /dev/null +++ b/testing-framework/deployers/local/src/env/tests.rs @@ -0,0 +1,113 @@ +use std::{ + path::Path, + sync::atomic::{AtomicUsize, Ordering}, +}; + +use async_trait::async_trait; +use testing_framework_core::{ + scenario::{Application, DynError, Feed, FeedRuntime, HttpReadinessRequirement, NodeClients}, + topology::DeploymentDescriptor, +}; + +use super::*; + +static STABLE_CALLS: AtomicUsize = AtomicUsize::new(0); + +#[derive(Clone, Default)] +struct DummyFeed; + +impl Feed for DummyFeed { + type Subscription = (); + + fn subscribe(&self) -> Self::Subscription {} +} + +#[derive(Default)] +struct DummyFeedRuntime; + +#[async_trait] +impl FeedRuntime for DummyFeedRuntime { + type Feed = DummyFeed; + + async fn run(self: Box) {} +} + +#[derive(Clone)] +struct DummyConfig; + +#[derive(Clone)] +struct DummyTopology; + +impl DeploymentDescriptor for DummyTopology { + fn node_count(&self) -> usize { + 0 + } +} + +struct DummyEnv; + +#[async_trait] +impl Application for DummyEnv { + type Deployment = DummyTopology; + type NodeClient = (); + type NodeConfig = DummyConfig; + type FeedRuntime = DummyFeedRuntime; + + async fn prepare_feed( + _node_clients: NodeClients, + ) -> Result<(::Feed, Self::FeedRuntime), DynError> { + Ok((DummyFeed, DummyFeedRuntime)) + } +} + +#[async_trait] +impl LocalDeployerEnv for DummyEnv { + fn build_node_config( + _topology: &Self::Deployment, + _index: usize, + _peer_ports_by_name: &std::collections::HashMap, + _options: &StartNodeOptions, + _peer_ports: &[u16], + ) -> Result::NodeConfig>, DynError> { + unreachable!("not used in this test") + } + + fn build_initial_node_configs( + _topology: &Self::Deployment, + ) -> Result::NodeConfig>>, ProcessSpawnError> { + unreachable!("not used in this test") + } + + fn build_launch_spec( + _config: &::NodeConfig, + _dir: &Path, + _label: &str, + ) -> Result { + Ok(crate::process::LaunchSpec::default()) + } + + fn node_endpoints( + _config: &::NodeConfig, + ) -> Result { + Ok(NodeEndpoints::default()) + } + + fn node_client(_endpoints: &NodeEndpoints) -> Result { + Ok(()) + } + + async fn wait_readiness_stable(_nodes: &[Node]) -> Result<(), DynError> { + STABLE_CALLS.fetch_add(1, Ordering::SeqCst); + Ok(()) + } +} + +#[tokio::test] +async fn empty_cluster_still_runs_stability_hook() { + STABLE_CALLS.store(0, Ordering::SeqCst); + let nodes: Vec> = Vec::new(); + wait_local_http_readiness::(&nodes, HttpReadinessRequirement::AllNodesReady) + .await + .expect("empty cluster should be considered ready"); + assert_eq!(STABLE_CALLS.load(Ordering::SeqCst), 1); +} diff --git a/testing-framework/deployers/local/src/lib.rs b/testing-framework/deployers/local/src/lib.rs index 6014a62..71b96ef 100644 --- a/testing-framework/deployers/local/src/lib.rs +++ b/testing-framework/deployers/local/src/lib.rs @@ -9,9 +9,10 @@ pub mod process; pub use binary::{BinaryConfig, BinaryResolver}; pub use deployer::{ProcessDeployer, ProcessDeployerError}; pub use env::{ - BuiltNodeConfig, LocalDeployerEnv, LocalNodePorts, LocalPeerNode, LocalProcessSpec, - NodeConfigEntry, build_indexed_http_peers, build_indexed_node_configs, - build_local_cluster_node_config, build_local_peer_nodes, default_yaml_launch_spec, + BuiltNodeConfig, LocalAccess, LocalBuildContext, LocalDeployerEnv, LocalNodePorts, + LocalPeerNode, LocalProcess, LocalProcessSpec, LocalRuntime, NodeConfigEntry, + build_indexed_http_peers, build_indexed_node_configs, build_local_cluster_node_config, + build_local_peer_nodes, cluster_node_config_from_context, default_yaml_launch_spec, discovered_node_access, preallocate_ports, reserve_local_node_ports, single_http_node_endpoints, text_config_launch_spec, text_node_config, yaml_config_launch_spec, yaml_node_config,