mirror of
https://github.com/logos-blockchain/logos-blockchain-testing.git
synced 2026-04-11 05:33:13 +00:00
refactor(local): add runtime-based app integration surface
This commit is contained in:
parent
36d7f3a0cf
commit
756a65fa77
@ -1,710 +0,0 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
net::{Ipv4Addr, SocketAddr},
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use serde::Serialize;
|
||||
use testing_framework_core::{
|
||||
scenario::{
|
||||
Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, DynError,
|
||||
HttpReadinessRequirement, NodeAccess, ReadinessError, StartNodeOptions,
|
||||
wait_for_http_ports_with_requirement,
|
||||
},
|
||||
topology::DeploymentDescriptor,
|
||||
};
|
||||
|
||||
use crate::process::{LaunchSpec, NodeEndpointPort, NodeEndpoints, ProcessNode, ProcessSpawnError};
|
||||
|
||||
pub type Node<E> = ProcessNode<<E as Application>::NodeConfig, <E as Application>::NodeClient>;
|
||||
|
||||
pub struct BuiltNodeConfig<Config> {
|
||||
pub config: Config,
|
||||
pub network_port: u16,
|
||||
}
|
||||
|
||||
pub struct NodeConfigEntry<NodeConfigValue> {
|
||||
pub name: String,
|
||||
pub config: NodeConfigValue,
|
||||
}
|
||||
|
||||
pub struct LocalNodePorts {
|
||||
network_port: u16,
|
||||
named_ports: HashMap<&'static str, u16>,
|
||||
}
|
||||
|
||||
impl LocalNodePorts {
|
||||
#[must_use]
|
||||
pub fn network_port(&self) -> u16 {
|
||||
self.network_port
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn get(&self, name: &str) -> Option<u16> {
|
||||
self.named_ports.get(name).copied()
|
||||
}
|
||||
|
||||
pub fn require(&self, name: &str) -> Result<u16, DynError> {
|
||||
self.get(name)
|
||||
.ok_or_else(|| format!("missing reserved local port '{name}'").into())
|
||||
}
|
||||
|
||||
pub fn iter(&self) -> impl Iterator<Item = (&'static str, u16)> + '_ {
|
||||
self.named_ports.iter().map(|(name, port)| (*name, *port))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LocalPeerNode {
|
||||
index: usize,
|
||||
network_port: u16,
|
||||
}
|
||||
|
||||
impl LocalPeerNode {
|
||||
#[must_use]
|
||||
pub fn index(&self) -> usize {
|
||||
self.index
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn network_port(&self) -> u16 {
|
||||
self.network_port
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn http_address(&self) -> String {
|
||||
format!("127.0.0.1:{}", self.network_port)
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn authority(&self) -> String {
|
||||
self.http_address()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub struct LocalProcessSpec {
|
||||
pub binary_env_var: String,
|
||||
pub binary_name: String,
|
||||
pub config_file_name: String,
|
||||
pub config_arg: String,
|
||||
pub extra_args: Vec<String>,
|
||||
pub env: Vec<crate::process::LaunchEnvVar>,
|
||||
}
|
||||
|
||||
impl LocalProcessSpec {
|
||||
#[must_use]
|
||||
pub fn new(binary_env_var: &str, binary_name: &str) -> Self {
|
||||
Self {
|
||||
binary_env_var: binary_env_var.to_owned(),
|
||||
binary_name: binary_name.to_owned(),
|
||||
config_file_name: "config.yaml".to_owned(),
|
||||
config_arg: "--config".to_owned(),
|
||||
extra_args: Vec::new(),
|
||||
env: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_config_file(mut self, file_name: &str, arg: &str) -> Self {
|
||||
self.config_file_name = file_name.to_owned();
|
||||
self.config_arg = arg.to_owned();
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_env(mut self, key: &str, value: &str) -> Self {
|
||||
self.env.push(crate::process::LaunchEnvVar::new(key, value));
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_rust_log(self, value: &str) -> Self {
|
||||
self.with_env("RUST_LOG", value)
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_args(mut self, args: impl IntoIterator<Item = String>) -> Self {
|
||||
self.extra_args.extend(args);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
pub fn preallocate_ports(count: usize, label: &str) -> Result<Vec<u16>, ProcessSpawnError> {
|
||||
(0..count)
|
||||
.map(|_| crate::process::allocate_available_port())
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map_err(|source| ProcessSpawnError::Config {
|
||||
source: format!("failed to pre-allocate {label} ports: {source}").into(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn build_indexed_node_configs<T>(
|
||||
count: usize,
|
||||
name_prefix: &str,
|
||||
build: impl FnMut(usize) -> T,
|
||||
) -> Vec<NodeConfigEntry<T>> {
|
||||
(0..count)
|
||||
.map(build)
|
||||
.enumerate()
|
||||
.map(|(index, config)| NodeConfigEntry {
|
||||
name: format!("{name_prefix}-{index}"),
|
||||
config,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn reserve_local_node_ports(
|
||||
count: usize,
|
||||
names: &[&'static str],
|
||||
label: &str,
|
||||
) -> Result<Vec<LocalNodePorts>, ProcessSpawnError> {
|
||||
let network_ports = preallocate_ports(count, label)?;
|
||||
let mut named_by_role = HashMap::new();
|
||||
for name in names {
|
||||
named_by_role.insert(*name, preallocate_ports(count, &format!("{label} {name}"))?);
|
||||
}
|
||||
|
||||
Ok((0..count)
|
||||
.map(|index| LocalNodePorts {
|
||||
network_port: network_ports[index],
|
||||
named_ports: named_by_role
|
||||
.iter()
|
||||
.map(|(name, ports)| (*name, ports[index]))
|
||||
.collect(),
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub fn single_http_node_endpoints(port: u16) -> NodeEndpoints {
|
||||
NodeEndpoints::from_api_port(port)
|
||||
}
|
||||
|
||||
pub fn build_local_cluster_node_config<E>(
|
||||
index: usize,
|
||||
ports: &LocalNodePorts,
|
||||
peers: &[LocalPeerNode],
|
||||
) -> Result<<E as Application>::NodeConfig, DynError>
|
||||
where
|
||||
E: ClusterNodeConfigApplication,
|
||||
{
|
||||
let mut node = ClusterNodeView::new(index, "127.0.0.1", ports.network_port());
|
||||
for (name, port) in ports.iter() {
|
||||
node = node.with_named_port(name, port);
|
||||
}
|
||||
|
||||
let peer_views = peers
|
||||
.iter()
|
||||
.map(|peer| ClusterPeerView::new(peer.index(), "127.0.0.1", peer.network_port()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
E::build_cluster_node_config(&node, &peer_views).map_err(Into::into)
|
||||
}
|
||||
|
||||
pub fn discovered_node_access(endpoints: &NodeEndpoints) -> NodeAccess {
|
||||
let mut access = NodeAccess::new("127.0.0.1", endpoints.api.port());
|
||||
|
||||
for (key, port) in &endpoints.extra_ports {
|
||||
match key {
|
||||
NodeEndpointPort::TestingApi => {
|
||||
access = access.with_testing_port(*port);
|
||||
}
|
||||
NodeEndpointPort::Custom(name) => {
|
||||
access = access.with_named_port(name.clone(), *port);
|
||||
}
|
||||
NodeEndpointPort::Network => {}
|
||||
}
|
||||
}
|
||||
|
||||
access
|
||||
}
|
||||
|
||||
pub fn build_indexed_http_peers<T>(
|
||||
node_count: usize,
|
||||
self_index: usize,
|
||||
peer_ports: &[u16],
|
||||
mut build_peer: impl FnMut(usize, String) -> T,
|
||||
) -> Vec<T> {
|
||||
(0..node_count)
|
||||
.filter(|&i| i != self_index)
|
||||
.map(|i| build_peer(i, format!("127.0.0.1:{}", peer_ports[i])))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn compact_peer_ports(peer_ports: &[u16], self_index: usize) -> Vec<u16> {
|
||||
peer_ports
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter_map(|(index, port)| (index != self_index).then_some(*port))
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn build_local_peer_nodes(peer_ports: &[u16], self_index: usize) -> Vec<LocalPeerNode> {
|
||||
peer_ports
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter_map(|(index, port)| {
|
||||
(index != self_index).then_some(LocalPeerNode {
|
||||
index,
|
||||
network_port: *port,
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn yaml_config_launch_spec<T: Serialize>(
|
||||
config: &T,
|
||||
spec: &LocalProcessSpec,
|
||||
) -> Result<LaunchSpec, DynError> {
|
||||
let config_yaml = serde_yaml::to_string(config)?;
|
||||
rendered_config_launch_spec(config_yaml.into_bytes(), spec)
|
||||
}
|
||||
|
||||
pub fn text_config_launch_spec(
|
||||
rendered_config: impl Into<Vec<u8>>,
|
||||
spec: &LocalProcessSpec,
|
||||
) -> Result<LaunchSpec, DynError> {
|
||||
rendered_config_launch_spec(rendered_config.into(), spec)
|
||||
}
|
||||
|
||||
pub fn default_yaml_launch_spec<T: Serialize>(
|
||||
config: &T,
|
||||
binary_env_var: &str,
|
||||
binary_name: &str,
|
||||
rust_log: &str,
|
||||
) -> Result<LaunchSpec, DynError> {
|
||||
yaml_config_launch_spec(
|
||||
config,
|
||||
&LocalProcessSpec::new(binary_env_var, binary_name).with_rust_log(rust_log),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn yaml_node_config<T: Serialize>(config: &T) -> Result<Vec<u8>, DynError> {
|
||||
Ok(serde_yaml::to_string(config)?.into_bytes())
|
||||
}
|
||||
|
||||
pub fn text_node_config(rendered_config: impl Into<Vec<u8>>) -> Vec<u8> {
|
||||
rendered_config.into()
|
||||
}
|
||||
|
||||
fn rendered_config_launch_spec(
|
||||
rendered_config: Vec<u8>,
|
||||
spec: &LocalProcessSpec,
|
||||
) -> Result<LaunchSpec, DynError> {
|
||||
let binary = resolve_binary(spec);
|
||||
let mut args = vec![spec.config_arg.clone(), spec.config_file_name.clone()];
|
||||
args.extend(spec.extra_args.iter().cloned());
|
||||
|
||||
Ok(LaunchSpec {
|
||||
binary,
|
||||
files: vec![crate::process::LaunchFile {
|
||||
relative_path: spec.config_file_name.clone().into(),
|
||||
contents: rendered_config,
|
||||
}],
|
||||
args,
|
||||
env: spec.env.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn resolve_binary(spec: &LocalProcessSpec) -> PathBuf {
|
||||
std::env::var(&spec.binary_env_var)
|
||||
.map(PathBuf::from)
|
||||
.or_else(|_| which::which(&spec.binary_name))
|
||||
.unwrap_or_else(|_| {
|
||||
let mut path = std::env::current_dir().unwrap_or_default();
|
||||
let mut debug = path.clone();
|
||||
debug.push(format!("target/debug/{}", spec.binary_name));
|
||||
if debug.exists() {
|
||||
return debug;
|
||||
}
|
||||
|
||||
path.push(format!("target/release/{}", spec.binary_name));
|
||||
path
|
||||
})
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait LocalDeployerEnv: Application + Sized
|
||||
where
|
||||
<Self as Application>::NodeConfig: Clone + Send + Sync + 'static,
|
||||
{
|
||||
fn local_port_names() -> &'static [&'static str] {
|
||||
Self::initial_local_port_names()
|
||||
}
|
||||
|
||||
fn build_node_config(
|
||||
topology: &Self::Deployment,
|
||||
index: usize,
|
||||
peer_ports_by_name: &HashMap<String, u16>,
|
||||
options: &StartNodeOptions<Self>,
|
||||
peer_ports: &[u16],
|
||||
) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError> {
|
||||
Self::build_node_config_from_template(
|
||||
topology,
|
||||
index,
|
||||
peer_ports_by_name,
|
||||
options,
|
||||
peer_ports,
|
||||
None,
|
||||
)
|
||||
}
|
||||
|
||||
fn build_node_config_from_template(
|
||||
topology: &Self::Deployment,
|
||||
index: usize,
|
||||
peer_ports_by_name: &HashMap<String, u16>,
|
||||
options: &StartNodeOptions<Self>,
|
||||
peer_ports: &[u16],
|
||||
template_config: Option<&<Self as Application>::NodeConfig>,
|
||||
) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError> {
|
||||
let mut reserved = reserve_local_node_ports(1, Self::local_port_names(), "node")
|
||||
.map_err(|source| -> DynError { source.into() })?;
|
||||
let ports = reserved
|
||||
.pop()
|
||||
.ok_or_else(|| std::io::Error::other("failed to reserve local node ports"))?;
|
||||
let network_port = ports.network_port();
|
||||
let config = Self::build_local_node_config(
|
||||
topology,
|
||||
index,
|
||||
&ports,
|
||||
peer_ports_by_name,
|
||||
options,
|
||||
peer_ports,
|
||||
template_config,
|
||||
)?;
|
||||
|
||||
Ok(BuiltNodeConfig {
|
||||
config,
|
||||
network_port,
|
||||
})
|
||||
}
|
||||
|
||||
fn build_initial_node_configs(
|
||||
topology: &Self::Deployment,
|
||||
) -> Result<Vec<NodeConfigEntry<<Self as Application>::NodeConfig>>, ProcessSpawnError> {
|
||||
let reserved_ports = reserve_local_node_ports(
|
||||
topology.node_count(),
|
||||
Self::initial_local_port_names(),
|
||||
Self::initial_node_name_prefix(),
|
||||
)?;
|
||||
let peer_ports = reserved_ports
|
||||
.iter()
|
||||
.map(LocalNodePorts::network_port)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut configs = Vec::with_capacity(topology.node_count());
|
||||
for (index, ports) in reserved_ports.iter().enumerate() {
|
||||
let config = Self::build_initial_node_config(topology, index, ports, &peer_ports)
|
||||
.map_err(|source| ProcessSpawnError::Config { source })?;
|
||||
configs.push(NodeConfigEntry {
|
||||
name: format!("{}-{index}", Self::initial_node_name_prefix()),
|
||||
config,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(configs)
|
||||
}
|
||||
|
||||
fn initial_node_name_prefix() -> &'static str {
|
||||
"node"
|
||||
}
|
||||
|
||||
fn initial_local_port_names() -> &'static [&'static str] {
|
||||
&[]
|
||||
}
|
||||
|
||||
fn build_initial_node_config(
|
||||
topology: &Self::Deployment,
|
||||
index: usize,
|
||||
ports: &LocalNodePorts,
|
||||
peer_ports: &[u16],
|
||||
) -> Result<<Self as Application>::NodeConfig, DynError> {
|
||||
let compact_peer_ports = compact_peer_ports(peer_ports, index);
|
||||
let peer_ports_by_name = HashMap::new();
|
||||
let options = StartNodeOptions::<Self>::default();
|
||||
Self::build_local_node_config(
|
||||
topology,
|
||||
index,
|
||||
ports,
|
||||
&peer_ports_by_name,
|
||||
&options,
|
||||
&compact_peer_ports,
|
||||
None,
|
||||
)
|
||||
}
|
||||
|
||||
fn build_local_node_config(
|
||||
_topology: &Self::Deployment,
|
||||
_index: usize,
|
||||
_ports: &LocalNodePorts,
|
||||
_peer_ports_by_name: &HashMap<String, u16>,
|
||||
_options: &StartNodeOptions<Self>,
|
||||
_peer_ports: &[u16],
|
||||
_template_config: Option<&<Self as Application>::NodeConfig>,
|
||||
) -> Result<<Self as Application>::NodeConfig, DynError> {
|
||||
let peers = build_local_peer_nodes(_peer_ports, _index);
|
||||
Self::build_local_node_config_with_peers(
|
||||
_topology,
|
||||
_index,
|
||||
_ports,
|
||||
&peers,
|
||||
_peer_ports_by_name,
|
||||
_options,
|
||||
_template_config,
|
||||
)
|
||||
}
|
||||
|
||||
fn build_local_node_config_with_peers(
|
||||
_topology: &Self::Deployment,
|
||||
_index: usize,
|
||||
_ports: &LocalNodePorts,
|
||||
_peers: &[LocalPeerNode],
|
||||
_peer_ports_by_name: &HashMap<String, u16>,
|
||||
_options: &StartNodeOptions<Self>,
|
||||
_template_config: Option<&<Self as Application>::NodeConfig>,
|
||||
) -> Result<<Self as Application>::NodeConfig, DynError> {
|
||||
Err(std::io::Error::other(
|
||||
"build_local_node_config_with_peers is not implemented for this app",
|
||||
)
|
||||
.into())
|
||||
}
|
||||
|
||||
fn initial_persist_dir(
|
||||
_topology: &Self::Deployment,
|
||||
_node_name: &str,
|
||||
_index: usize,
|
||||
) -> Option<PathBuf> {
|
||||
None
|
||||
}
|
||||
|
||||
fn initial_snapshot_dir(
|
||||
_topology: &Self::Deployment,
|
||||
_node_name: &str,
|
||||
_index: usize,
|
||||
) -> Option<PathBuf> {
|
||||
None
|
||||
}
|
||||
|
||||
fn local_process_spec() -> Option<LocalProcessSpec> {
|
||||
None
|
||||
}
|
||||
|
||||
fn render_local_config(
|
||||
_config: &<Self as Application>::NodeConfig,
|
||||
) -> Result<Vec<u8>, DynError> {
|
||||
Err(std::io::Error::other("render_local_config is not implemented for this app").into())
|
||||
}
|
||||
|
||||
fn build_launch_spec(
|
||||
config: &<Self as Application>::NodeConfig,
|
||||
_dir: &Path,
|
||||
_label: &str,
|
||||
) -> Result<LaunchSpec, DynError> {
|
||||
let spec = Self::local_process_spec().ok_or_else(|| {
|
||||
std::io::Error::other("build_launch_spec is not implemented for this app")
|
||||
})?;
|
||||
let rendered = Self::render_local_config(config)?;
|
||||
rendered_config_launch_spec(rendered, &spec)
|
||||
}
|
||||
|
||||
fn http_api_port(_config: &<Self as Application>::NodeConfig) -> Option<u16> {
|
||||
None
|
||||
}
|
||||
|
||||
fn node_endpoints(
|
||||
config: &<Self as Application>::NodeConfig,
|
||||
) -> Result<NodeEndpoints, DynError> {
|
||||
if let Some(port) = Self::http_api_port(config) {
|
||||
return Ok(NodeEndpoints {
|
||||
api: SocketAddr::from((Ipv4Addr::LOCALHOST, port)),
|
||||
extra_ports: HashMap::new(),
|
||||
});
|
||||
}
|
||||
|
||||
Err(std::io::Error::other("node_endpoints is not implemented for this app").into())
|
||||
}
|
||||
|
||||
fn node_peer_port(node: &Node<Self>) -> u16 {
|
||||
node.endpoints().api.port()
|
||||
}
|
||||
|
||||
fn node_client_from_api_endpoint(_api: SocketAddr) -> Option<Self::NodeClient> {
|
||||
None
|
||||
}
|
||||
|
||||
fn node_client(endpoints: &NodeEndpoints) -> Result<Self::NodeClient, DynError> {
|
||||
if let Ok(client) =
|
||||
<Self as Application>::build_node_client(&discovered_node_access(endpoints))
|
||||
{
|
||||
return Ok(client);
|
||||
}
|
||||
|
||||
if let Some(client) = Self::node_client_from_api_endpoint(endpoints.api) {
|
||||
return Ok(client);
|
||||
}
|
||||
|
||||
Err(std::io::Error::other("node_client is not implemented for this app").into())
|
||||
}
|
||||
|
||||
fn readiness_endpoint_path() -> &'static str {
|
||||
<Self as Application>::node_readiness_path()
|
||||
}
|
||||
|
||||
async fn wait_readiness_stable(_nodes: &[Node<Self>]) -> Result<(), DynError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn wait_local_http_readiness<E: LocalDeployerEnv>(
|
||||
nodes: &[Node<E>],
|
||||
requirement: HttpReadinessRequirement,
|
||||
) -> Result<(), ReadinessError> {
|
||||
let ports: Vec<_> = nodes
|
||||
.iter()
|
||||
.map(|node| node.endpoints().api.port())
|
||||
.collect();
|
||||
wait_for_http_ports_with_requirement(&ports, E::readiness_endpoint_path(), requirement).await?;
|
||||
|
||||
E::wait_readiness_stable(nodes)
|
||||
.await
|
||||
.map_err(|source| ReadinessError::ClusterStable { source })
|
||||
}
|
||||
|
||||
pub async fn spawn_node_from_config<E: LocalDeployerEnv>(
|
||||
label: String,
|
||||
config: <E as Application>::NodeConfig,
|
||||
keep_tempdir: bool,
|
||||
persist_dir: Option<&std::path::Path>,
|
||||
snapshot_dir: Option<&std::path::Path>,
|
||||
) -> Result<Node<E>, ProcessSpawnError> {
|
||||
ProcessNode::spawn(
|
||||
&label,
|
||||
config,
|
||||
E::build_launch_spec,
|
||||
E::node_endpoints,
|
||||
keep_tempdir,
|
||||
persist_dir,
|
||||
snapshot_dir,
|
||||
E::node_client,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{
|
||||
path::Path,
|
||||
sync::atomic::{AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use testing_framework_core::{
|
||||
scenario::{Application, DynError, Feed, FeedRuntime, NodeClients},
|
||||
topology::DeploymentDescriptor,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
static STABLE_CALLS: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
struct DummyFeed;
|
||||
|
||||
impl Feed for DummyFeed {
|
||||
type Subscription = ();
|
||||
|
||||
fn subscribe(&self) -> Self::Subscription {}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct DummyFeedRuntime;
|
||||
|
||||
#[async_trait]
|
||||
impl FeedRuntime for DummyFeedRuntime {
|
||||
type Feed = DummyFeed;
|
||||
|
||||
async fn run(self: Box<Self>) {}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct DummyConfig;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct DummyTopology;
|
||||
|
||||
impl DeploymentDescriptor for DummyTopology {
|
||||
fn node_count(&self) -> usize {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
struct DummyEnv;
|
||||
|
||||
#[async_trait]
|
||||
impl Application for DummyEnv {
|
||||
type Deployment = DummyTopology;
|
||||
type NodeClient = ();
|
||||
type NodeConfig = DummyConfig;
|
||||
type FeedRuntime = DummyFeedRuntime;
|
||||
|
||||
async fn prepare_feed(
|
||||
_node_clients: NodeClients<Self>,
|
||||
) -> Result<(<Self::FeedRuntime as FeedRuntime>::Feed, Self::FeedRuntime), DynError>
|
||||
{
|
||||
Ok((DummyFeed, DummyFeedRuntime))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl LocalDeployerEnv for DummyEnv {
|
||||
fn build_node_config(
|
||||
_topology: &Self::Deployment,
|
||||
_index: usize,
|
||||
_peer_ports_by_name: &HashMap<String, u16>,
|
||||
_options: &StartNodeOptions<Self>,
|
||||
_peer_ports: &[u16],
|
||||
) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError> {
|
||||
unreachable!("not used in this test")
|
||||
}
|
||||
|
||||
fn build_initial_node_configs(
|
||||
_topology: &Self::Deployment,
|
||||
) -> Result<Vec<NodeConfigEntry<<Self as Application>::NodeConfig>>, ProcessSpawnError>
|
||||
{
|
||||
unreachable!("not used in this test")
|
||||
}
|
||||
|
||||
fn build_launch_spec(
|
||||
_config: &<Self as Application>::NodeConfig,
|
||||
_dir: &Path,
|
||||
_label: &str,
|
||||
) -> Result<LaunchSpec, DynError> {
|
||||
Ok(LaunchSpec::default())
|
||||
}
|
||||
|
||||
fn node_endpoints(
|
||||
_config: &<Self as Application>::NodeConfig,
|
||||
) -> Result<NodeEndpoints, DynError> {
|
||||
Ok(NodeEndpoints::default())
|
||||
}
|
||||
|
||||
fn node_client(_endpoints: &NodeEndpoints) -> Result<Self::NodeClient, DynError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn wait_readiness_stable(_nodes: &[Node<Self>]) -> Result<(), DynError> {
|
||||
STABLE_CALLS.fetch_add(1, Ordering::SeqCst);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn empty_cluster_still_runs_stability_hook() {
|
||||
STABLE_CALLS.store(0, Ordering::SeqCst);
|
||||
let nodes: Vec<Node<DummyEnv>> = Vec::new();
|
||||
wait_local_http_readiness::<DummyEnv>(&nodes, HttpReadinessRequirement::AllNodesReady)
|
||||
.await
|
||||
.expect("empty cluster should be considered ready");
|
||||
assert_eq!(STABLE_CALLS.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
}
|
||||
314
testing-framework/deployers/local/src/env/helpers.rs
vendored
Normal file
314
testing-framework/deployers/local/src/env/helpers.rs
vendored
Normal file
@ -0,0 +1,314 @@
|
||||
use std::{collections::HashMap, path::PathBuf};
|
||||
|
||||
use serde::Serialize;
|
||||
use testing_framework_core::scenario::{
|
||||
Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, DynError,
|
||||
NodeAccess,
|
||||
};
|
||||
|
||||
use crate::process::{LaunchSpec, NodeEndpointPort, NodeEndpoints, ProcessSpawnError};
|
||||
|
||||
pub struct BuiltNodeConfig<Config> {
|
||||
pub config: Config,
|
||||
pub network_port: u16,
|
||||
}
|
||||
|
||||
pub struct NodeConfigEntry<NodeConfigValue> {
|
||||
pub name: String,
|
||||
pub config: NodeConfigValue,
|
||||
}
|
||||
|
||||
pub struct LocalNodePorts {
|
||||
network_port: u16,
|
||||
named_ports: HashMap<&'static str, u16>,
|
||||
}
|
||||
|
||||
impl LocalNodePorts {
|
||||
#[must_use]
|
||||
pub fn network_port(&self) -> u16 {
|
||||
self.network_port
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn get(&self, name: &str) -> Option<u16> {
|
||||
self.named_ports.get(name).copied()
|
||||
}
|
||||
|
||||
pub fn require(&self, name: &str) -> Result<u16, DynError> {
|
||||
self.get(name)
|
||||
.ok_or_else(|| format!("missing reserved local port '{name}'").into())
|
||||
}
|
||||
|
||||
pub fn iter(&self) -> impl Iterator<Item = (&'static str, u16)> + '_ {
|
||||
self.named_ports.iter().map(|(name, port)| (*name, *port))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LocalPeerNode {
|
||||
index: usize,
|
||||
network_port: u16,
|
||||
}
|
||||
|
||||
impl LocalPeerNode {
|
||||
#[must_use]
|
||||
pub fn index(&self) -> usize {
|
||||
self.index
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn network_port(&self) -> u16 {
|
||||
self.network_port
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn http_address(&self) -> String {
|
||||
format!("127.0.0.1:{}", self.network_port)
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn authority(&self) -> String {
|
||||
self.http_address()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub struct LocalProcessSpec {
|
||||
pub binary_env_var: String,
|
||||
pub binary_name: String,
|
||||
pub config_file_name: String,
|
||||
pub config_arg: String,
|
||||
pub extra_args: Vec<String>,
|
||||
pub env: Vec<crate::process::LaunchEnvVar>,
|
||||
}
|
||||
|
||||
impl LocalProcessSpec {
|
||||
#[must_use]
|
||||
pub fn new(binary_env_var: &str, binary_name: &str) -> Self {
|
||||
Self {
|
||||
binary_env_var: binary_env_var.to_owned(),
|
||||
binary_name: binary_name.to_owned(),
|
||||
config_file_name: "config.yaml".to_owned(),
|
||||
config_arg: "--config".to_owned(),
|
||||
extra_args: Vec::new(),
|
||||
env: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_config_file(mut self, file_name: &str, arg: &str) -> Self {
|
||||
self.config_file_name = file_name.to_owned();
|
||||
self.config_arg = arg.to_owned();
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_env(mut self, key: &str, value: &str) -> Self {
|
||||
self.env.push(crate::process::LaunchEnvVar::new(key, value));
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_rust_log(self, value: &str) -> Self {
|
||||
self.with_env("RUST_LOG", value)
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_args(mut self, args: impl IntoIterator<Item = String>) -> Self {
|
||||
self.extra_args.extend(args);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
pub fn preallocate_ports(count: usize, label: &str) -> Result<Vec<u16>, ProcessSpawnError> {
|
||||
(0..count)
|
||||
.map(|_| crate::process::allocate_available_port())
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map_err(|source| ProcessSpawnError::Config {
|
||||
source: format!("failed to pre-allocate {label} ports: {source}").into(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn build_indexed_node_configs<T>(
|
||||
count: usize,
|
||||
name_prefix: &str,
|
||||
build: impl FnMut(usize) -> T,
|
||||
) -> Vec<NodeConfigEntry<T>> {
|
||||
(0..count)
|
||||
.map(build)
|
||||
.enumerate()
|
||||
.map(|(index, config)| NodeConfigEntry {
|
||||
name: format!("{name_prefix}-{index}"),
|
||||
config,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn reserve_local_node_ports(
|
||||
count: usize,
|
||||
names: &[&'static str],
|
||||
label: &str,
|
||||
) -> Result<Vec<LocalNodePorts>, ProcessSpawnError> {
|
||||
let network_ports = preallocate_ports(count, label)?;
|
||||
let mut named_by_role = HashMap::new();
|
||||
for name in names {
|
||||
named_by_role.insert(*name, preallocate_ports(count, &format!("{label} {name}"))?);
|
||||
}
|
||||
|
||||
Ok((0..count)
|
||||
.map(|index| LocalNodePorts {
|
||||
network_port: network_ports[index],
|
||||
named_ports: named_by_role
|
||||
.iter()
|
||||
.map(|(name, ports)| (*name, ports[index]))
|
||||
.collect(),
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub fn single_http_node_endpoints(port: u16) -> NodeEndpoints {
|
||||
NodeEndpoints::from_api_port(port)
|
||||
}
|
||||
|
||||
pub fn build_local_cluster_node_config<E>(
|
||||
index: usize,
|
||||
ports: &LocalNodePorts,
|
||||
peers: &[LocalPeerNode],
|
||||
) -> Result<<E as Application>::NodeConfig, DynError>
|
||||
where
|
||||
E: ClusterNodeConfigApplication,
|
||||
{
|
||||
let mut node = ClusterNodeView::new(index, "127.0.0.1", ports.network_port());
|
||||
for (name, port) in ports.iter() {
|
||||
node = node.with_named_port(name, port);
|
||||
}
|
||||
|
||||
let peer_views = peers
|
||||
.iter()
|
||||
.map(|peer| ClusterPeerView::new(peer.index(), "127.0.0.1", peer.network_port()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
E::build_cluster_node_config(&node, &peer_views).map_err(Into::into)
|
||||
}
|
||||
|
||||
pub fn discovered_node_access(endpoints: &NodeEndpoints) -> NodeAccess {
|
||||
let mut access = NodeAccess::new("127.0.0.1", endpoints.api.port());
|
||||
|
||||
for (key, port) in &endpoints.extra_ports {
|
||||
match key {
|
||||
NodeEndpointPort::TestingApi => {
|
||||
access = access.with_testing_port(*port);
|
||||
}
|
||||
NodeEndpointPort::Custom(name) => {
|
||||
access = access.with_named_port(name.clone(), *port);
|
||||
}
|
||||
NodeEndpointPort::Network => {}
|
||||
}
|
||||
}
|
||||
|
||||
access
|
||||
}
|
||||
|
||||
pub fn build_indexed_http_peers<T>(
|
||||
node_count: usize,
|
||||
self_index: usize,
|
||||
peer_ports: &[u16],
|
||||
mut build_peer: impl FnMut(usize, String) -> T,
|
||||
) -> Vec<T> {
|
||||
(0..node_count)
|
||||
.filter(|&i| i != self_index)
|
||||
.map(|i| build_peer(i, format!("127.0.0.1:{}", peer_ports[i])))
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub(crate) fn compact_peer_ports(peer_ports: &[u16], self_index: usize) -> Vec<u16> {
|
||||
peer_ports
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter_map(|(index, port)| (index != self_index).then_some(*port))
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn build_local_peer_nodes(peer_ports: &[u16], self_index: usize) -> Vec<LocalPeerNode> {
|
||||
peer_ports
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter_map(|(index, port)| {
|
||||
(index != self_index).then_some(LocalPeerNode {
|
||||
index,
|
||||
network_port: *port,
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn yaml_config_launch_spec<T: Serialize>(
|
||||
config: &T,
|
||||
spec: &LocalProcessSpec,
|
||||
) -> Result<LaunchSpec, DynError> {
|
||||
let config_yaml = serde_yaml::to_string(config)?;
|
||||
rendered_config_launch_spec(config_yaml.into_bytes(), spec)
|
||||
}
|
||||
|
||||
pub fn text_config_launch_spec(
|
||||
rendered_config: impl Into<Vec<u8>>,
|
||||
spec: &LocalProcessSpec,
|
||||
) -> Result<LaunchSpec, DynError> {
|
||||
rendered_config_launch_spec(rendered_config.into(), spec)
|
||||
}
|
||||
|
||||
pub fn default_yaml_launch_spec<T: Serialize>(
|
||||
config: &T,
|
||||
binary_env_var: &str,
|
||||
binary_name: &str,
|
||||
rust_log: &str,
|
||||
) -> Result<LaunchSpec, DynError> {
|
||||
yaml_config_launch_spec(
|
||||
config,
|
||||
&LocalProcessSpec::new(binary_env_var, binary_name).with_rust_log(rust_log),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn yaml_node_config<T: Serialize>(config: &T) -> Result<Vec<u8>, DynError> {
|
||||
Ok(serde_yaml::to_string(config)?.into_bytes())
|
||||
}
|
||||
|
||||
pub fn text_node_config(rendered_config: impl Into<Vec<u8>>) -> Vec<u8> {
|
||||
rendered_config.into()
|
||||
}
|
||||
|
||||
pub(crate) fn rendered_config_launch_spec(
|
||||
rendered_config: Vec<u8>,
|
||||
spec: &LocalProcessSpec,
|
||||
) -> Result<LaunchSpec, DynError> {
|
||||
let binary = resolve_binary(spec);
|
||||
let mut args = vec![spec.config_arg.clone(), spec.config_file_name.clone()];
|
||||
args.extend(spec.extra_args.iter().cloned());
|
||||
|
||||
Ok(LaunchSpec {
|
||||
binary,
|
||||
files: vec![crate::process::LaunchFile {
|
||||
relative_path: spec.config_file_name.clone().into(),
|
||||
contents: rendered_config,
|
||||
}],
|
||||
args,
|
||||
env: spec.env.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn resolve_binary(spec: &LocalProcessSpec) -> PathBuf {
|
||||
std::env::var(&spec.binary_env_var)
|
||||
.map(PathBuf::from)
|
||||
.or_else(|_| which::which(&spec.binary_name))
|
||||
.unwrap_or_else(|_| {
|
||||
let mut path = std::env::current_dir().unwrap_or_default();
|
||||
let mut debug = path.clone();
|
||||
debug.push(format!("target/debug/{}", spec.binary_name));
|
||||
if debug.exists() {
|
||||
return debug;
|
||||
}
|
||||
|
||||
path.push(format!("target/release/{}", spec.binary_name));
|
||||
path
|
||||
})
|
||||
}
|
||||
341
testing-framework/deployers/local/src/env/mod.rs
vendored
Normal file
341
testing-framework/deployers/local/src/env/mod.rs
vendored
Normal file
@ -0,0 +1,341 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
net::{Ipv4Addr, SocketAddr},
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use testing_framework_core::{
|
||||
scenario::{
|
||||
Application, DynError, HttpReadinessRequirement, ReadinessError, StartNodeOptions,
|
||||
wait_for_http_ports_with_requirement,
|
||||
},
|
||||
topology::DeploymentDescriptor,
|
||||
};
|
||||
|
||||
use crate::process::{LaunchSpec, NodeEndpoints, ProcessNode, ProcessSpawnError};
|
||||
|
||||
mod helpers;
|
||||
mod runtime;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
pub use helpers::{
|
||||
BuiltNodeConfig, LocalNodePorts, LocalPeerNode, LocalProcessSpec, NodeConfigEntry,
|
||||
build_indexed_http_peers, build_indexed_node_configs, build_local_cluster_node_config,
|
||||
build_local_peer_nodes, default_yaml_launch_spec, discovered_node_access, preallocate_ports,
|
||||
reserve_local_node_ports, single_http_node_endpoints, text_config_launch_spec,
|
||||
text_node_config, yaml_config_launch_spec, yaml_node_config,
|
||||
};
|
||||
pub use runtime::{
|
||||
LocalAccess, LocalBuildContext, LocalProcess, LocalRuntime, cluster_node_config_from_context,
|
||||
};
|
||||
|
||||
pub type Node<E> = ProcessNode<<E as Application>::NodeConfig, <E as Application>::NodeClient>;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait LocalDeployerEnv: Application + Sized
|
||||
where
|
||||
<Self as Application>::NodeConfig: Clone + Send + Sync + 'static,
|
||||
{
|
||||
fn local_runtime() -> Option<LocalRuntime<Self>> {
|
||||
None
|
||||
}
|
||||
|
||||
fn local_port_names() -> &'static [&'static str] {
|
||||
Self::local_runtime()
|
||||
.map(|runtime| runtime.process.port_names)
|
||||
.unwrap_or_else(Self::initial_local_port_names)
|
||||
}
|
||||
|
||||
fn build_node_config(
|
||||
topology: &Self::Deployment,
|
||||
index: usize,
|
||||
peer_ports_by_name: &HashMap<String, u16>,
|
||||
options: &StartNodeOptions<Self>,
|
||||
peer_ports: &[u16],
|
||||
) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError> {
|
||||
Self::build_node_config_from_template(
|
||||
topology,
|
||||
index,
|
||||
peer_ports_by_name,
|
||||
options,
|
||||
peer_ports,
|
||||
None,
|
||||
)
|
||||
}
|
||||
|
||||
fn build_node_config_from_template(
|
||||
topology: &Self::Deployment,
|
||||
index: usize,
|
||||
peer_ports_by_name: &HashMap<String, u16>,
|
||||
options: &StartNodeOptions<Self>,
|
||||
peer_ports: &[u16],
|
||||
template_config: Option<&<Self as Application>::NodeConfig>,
|
||||
) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError> {
|
||||
let mut reserved = reserve_local_node_ports(1, Self::local_port_names(), "node")
|
||||
.map_err(|source| -> DynError { source.into() })?;
|
||||
let ports = reserved
|
||||
.pop()
|
||||
.ok_or_else(|| std::io::Error::other("failed to reserve local node ports"))?;
|
||||
let network_port = ports.network_port();
|
||||
let config = Self::build_local_node_config(
|
||||
topology,
|
||||
index,
|
||||
&ports,
|
||||
peer_ports_by_name,
|
||||
options,
|
||||
peer_ports,
|
||||
template_config,
|
||||
)?;
|
||||
|
||||
Ok(BuiltNodeConfig {
|
||||
config,
|
||||
network_port,
|
||||
})
|
||||
}
|
||||
|
||||
fn build_initial_node_configs(
|
||||
topology: &Self::Deployment,
|
||||
) -> Result<Vec<NodeConfigEntry<<Self as Application>::NodeConfig>>, ProcessSpawnError> {
|
||||
let reserved_ports = reserve_local_node_ports(
|
||||
topology.node_count(),
|
||||
Self::local_port_names(),
|
||||
Self::initial_node_name_prefix(),
|
||||
)?;
|
||||
let peer_ports = reserved_ports
|
||||
.iter()
|
||||
.map(LocalNodePorts::network_port)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut configs = Vec::with_capacity(topology.node_count());
|
||||
for (index, ports) in reserved_ports.iter().enumerate() {
|
||||
let config = Self::build_initial_node_config(topology, index, ports, &peer_ports)
|
||||
.map_err(|source| ProcessSpawnError::Config { source })?;
|
||||
configs.push(NodeConfigEntry {
|
||||
name: format!("{}-{index}", Self::initial_node_name_prefix()),
|
||||
config,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(configs)
|
||||
}
|
||||
|
||||
fn initial_node_name_prefix() -> &'static str {
|
||||
Self::local_runtime()
|
||||
.map(|runtime| runtime.process.node_name_prefix)
|
||||
.unwrap_or("node")
|
||||
}
|
||||
|
||||
fn initial_local_port_names() -> &'static [&'static str] {
|
||||
&[]
|
||||
}
|
||||
|
||||
fn build_initial_node_config(
|
||||
topology: &Self::Deployment,
|
||||
index: usize,
|
||||
ports: &LocalNodePorts,
|
||||
peer_ports: &[u16],
|
||||
) -> Result<<Self as Application>::NodeConfig, DynError> {
|
||||
let compact_peer_ports = helpers::compact_peer_ports(peer_ports, index);
|
||||
let peer_ports_by_name = HashMap::new();
|
||||
let options = StartNodeOptions::<Self>::default();
|
||||
Self::build_local_node_config(
|
||||
topology,
|
||||
index,
|
||||
ports,
|
||||
&peer_ports_by_name,
|
||||
&options,
|
||||
&compact_peer_ports,
|
||||
None,
|
||||
)
|
||||
}
|
||||
|
||||
fn build_local_node_config(
|
||||
topology: &Self::Deployment,
|
||||
index: usize,
|
||||
ports: &LocalNodePorts,
|
||||
peer_ports_by_name: &HashMap<String, u16>,
|
||||
options: &StartNodeOptions<Self>,
|
||||
peer_ports: &[u16],
|
||||
template_config: Option<&<Self as Application>::NodeConfig>,
|
||||
) -> Result<<Self as Application>::NodeConfig, DynError> {
|
||||
let peers = build_local_peer_nodes(peer_ports, index);
|
||||
Self::build_local_node_config_with_peers(
|
||||
topology,
|
||||
index,
|
||||
ports,
|
||||
&peers,
|
||||
peer_ports_by_name,
|
||||
options,
|
||||
template_config,
|
||||
)
|
||||
}
|
||||
|
||||
fn build_local_node_config_with_peers(
|
||||
topology: &Self::Deployment,
|
||||
index: usize,
|
||||
ports: &LocalNodePorts,
|
||||
peers: &[LocalPeerNode],
|
||||
peer_ports_by_name: &HashMap<String, u16>,
|
||||
options: &StartNodeOptions<Self>,
|
||||
template_config: Option<&<Self as Application>::NodeConfig>,
|
||||
) -> Result<<Self as Application>::NodeConfig, DynError> {
|
||||
if let Some(runtime) = Self::local_runtime() {
|
||||
return (runtime.process.build_config)(LocalBuildContext {
|
||||
topology,
|
||||
index,
|
||||
ports,
|
||||
peers,
|
||||
peer_ports_by_name,
|
||||
options,
|
||||
template_config,
|
||||
});
|
||||
}
|
||||
|
||||
Err(std::io::Error::other(
|
||||
"build_local_node_config_with_peers is not implemented for this app",
|
||||
)
|
||||
.into())
|
||||
}
|
||||
|
||||
fn initial_persist_dir(
|
||||
_topology: &Self::Deployment,
|
||||
_node_name: &str,
|
||||
_index: usize,
|
||||
) -> Option<PathBuf> {
|
||||
None
|
||||
}
|
||||
|
||||
fn initial_snapshot_dir(
|
||||
_topology: &Self::Deployment,
|
||||
_node_name: &str,
|
||||
_index: usize,
|
||||
) -> Option<PathBuf> {
|
||||
None
|
||||
}
|
||||
|
||||
fn local_process_spec() -> Option<LocalProcessSpec> {
|
||||
Self::local_runtime().map(|runtime| runtime.process.spec)
|
||||
}
|
||||
|
||||
fn render_local_config(
|
||||
config: &<Self as Application>::NodeConfig,
|
||||
) -> Result<Vec<u8>, DynError> {
|
||||
if let Some(runtime) = Self::local_runtime() {
|
||||
return (runtime.process.render_config)(config);
|
||||
}
|
||||
|
||||
Err(std::io::Error::other("render_local_config is not implemented for this app").into())
|
||||
}
|
||||
|
||||
fn build_launch_spec(
|
||||
config: &<Self as Application>::NodeConfig,
|
||||
_dir: &Path,
|
||||
_label: &str,
|
||||
) -> Result<LaunchSpec, DynError> {
|
||||
let spec = Self::local_process_spec().ok_or_else(|| {
|
||||
std::io::Error::other("build_launch_spec is not implemented for this app")
|
||||
})?;
|
||||
let rendered = Self::render_local_config(config)?;
|
||||
helpers::rendered_config_launch_spec(rendered, &spec)
|
||||
}
|
||||
|
||||
fn http_api_port(config: &<Self as Application>::NodeConfig) -> Option<u16> {
|
||||
Self::local_runtime()
|
||||
.and_then(|runtime| runtime.access.api_port.map(|api_port| api_port(config)))
|
||||
}
|
||||
|
||||
fn node_endpoints(
|
||||
config: &<Self as Application>::NodeConfig,
|
||||
) -> Result<NodeEndpoints, DynError> {
|
||||
if let Some(runtime) = Self::local_runtime() {
|
||||
return runtime.access.node_endpoints(config);
|
||||
}
|
||||
|
||||
if let Some(port) = Self::http_api_port(config) {
|
||||
return Ok(NodeEndpoints {
|
||||
api: SocketAddr::from((Ipv4Addr::LOCALHOST, port)),
|
||||
extra_ports: HashMap::new(),
|
||||
});
|
||||
}
|
||||
|
||||
Err(std::io::Error::other("node_endpoints is not implemented for this app").into())
|
||||
}
|
||||
|
||||
fn node_peer_port(node: &Node<Self>) -> u16 {
|
||||
if let Some(runtime) = Self::local_runtime() {
|
||||
return runtime
|
||||
.access
|
||||
.node_peer_port(node.config(), node.endpoints());
|
||||
}
|
||||
|
||||
node.endpoints().api.port()
|
||||
}
|
||||
|
||||
fn node_client_from_api_endpoint(_api: SocketAddr) -> Option<Self::NodeClient> {
|
||||
None
|
||||
}
|
||||
|
||||
fn node_client(endpoints: &NodeEndpoints) -> Result<Self::NodeClient, DynError> {
|
||||
if let Some(runtime) = Self::local_runtime() {
|
||||
return runtime.access.node_client(endpoints);
|
||||
}
|
||||
|
||||
if let Ok(client) =
|
||||
<Self as Application>::build_node_client(&discovered_node_access(endpoints))
|
||||
{
|
||||
return Ok(client);
|
||||
}
|
||||
|
||||
if let Some(client) = Self::node_client_from_api_endpoint(endpoints.api) {
|
||||
return Ok(client);
|
||||
}
|
||||
|
||||
Err(std::io::Error::other("node_client is not implemented for this app").into())
|
||||
}
|
||||
|
||||
fn readiness_endpoint_path() -> &'static str {
|
||||
Self::local_runtime()
|
||||
.map(|runtime| runtime.access.readiness_path)
|
||||
.unwrap_or_else(<Self as Application>::node_readiness_path)
|
||||
}
|
||||
|
||||
async fn wait_readiness_stable(_nodes: &[Node<Self>]) -> Result<(), DynError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn wait_local_http_readiness<E: LocalDeployerEnv>(
|
||||
nodes: &[Node<E>],
|
||||
requirement: HttpReadinessRequirement,
|
||||
) -> Result<(), ReadinessError> {
|
||||
let ports: Vec<_> = nodes
|
||||
.iter()
|
||||
.map(|node| node.endpoints().api.port())
|
||||
.collect();
|
||||
wait_for_http_ports_with_requirement(&ports, E::readiness_endpoint_path(), requirement).await?;
|
||||
|
||||
E::wait_readiness_stable(nodes)
|
||||
.await
|
||||
.map_err(|source| ReadinessError::ClusterStable { source })
|
||||
}
|
||||
|
||||
pub async fn spawn_node_from_config<E: LocalDeployerEnv>(
|
||||
label: String,
|
||||
config: <E as Application>::NodeConfig,
|
||||
keep_tempdir: bool,
|
||||
persist_dir: Option<&std::path::Path>,
|
||||
snapshot_dir: Option<&std::path::Path>,
|
||||
) -> Result<Node<E>, ProcessSpawnError> {
|
||||
ProcessNode::spawn(
|
||||
&label,
|
||||
config,
|
||||
E::build_launch_spec,
|
||||
E::node_endpoints,
|
||||
keep_tempdir,
|
||||
persist_dir,
|
||||
snapshot_dir,
|
||||
E::node_client,
|
||||
)
|
||||
.await
|
||||
}
|
||||
232
testing-framework/deployers/local/src/env/runtime.rs
vendored
Normal file
232
testing-framework/deployers/local/src/env/runtime.rs
vendored
Normal file
@ -0,0 +1,232 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
net::{Ipv4Addr, SocketAddr},
|
||||
};
|
||||
|
||||
use serde::Serialize;
|
||||
use testing_framework_core::scenario::{
|
||||
Application, ClusterNodeConfigApplication, DynError, NodeAccess, StartNodeOptions,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
env::{
|
||||
LocalNodePorts, LocalPeerNode, LocalProcessSpec, NodeEndpoints, discovered_node_access,
|
||||
yaml_node_config,
|
||||
},
|
||||
process::LaunchEnvVar,
|
||||
};
|
||||
|
||||
pub struct LocalBuildContext<'a, E: Application> {
|
||||
pub topology: &'a E::Deployment,
|
||||
pub index: usize,
|
||||
pub ports: &'a LocalNodePorts,
|
||||
pub peers: &'a [LocalPeerNode],
|
||||
pub peer_ports_by_name: &'a HashMap<String, u16>,
|
||||
pub options: &'a StartNodeOptions<E>,
|
||||
pub template_config: Option<&'a E::NodeConfig>,
|
||||
}
|
||||
|
||||
pub type LocalConfigBuilder<E> =
|
||||
for<'a> fn(LocalBuildContext<'a, E>) -> Result<<E as Application>::NodeConfig, DynError>;
|
||||
|
||||
pub type LocalConfigRenderer<E> = fn(&<E as Application>::NodeConfig) -> Result<Vec<u8>, DynError>;
|
||||
|
||||
pub type LocalApiPort<E> = fn(&<E as Application>::NodeConfig) -> u16;
|
||||
|
||||
pub type LocalEndpoints<E> = fn(&<E as Application>::NodeConfig) -> Result<NodeEndpoints, DynError>;
|
||||
|
||||
pub type LocalClientBuilder<E> =
|
||||
fn(&NodeAccess) -> Result<<E as Application>::NodeClient, DynError>;
|
||||
|
||||
pub type LocalPeerPort<E> = fn(&<E as Application>::NodeConfig, &NodeEndpoints) -> u16;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct LocalProcess<E: Application> {
|
||||
pub(crate) spec: LocalProcessSpec,
|
||||
pub(crate) build_config: LocalConfigBuilder<E>,
|
||||
pub(crate) render_config: LocalConfigRenderer<E>,
|
||||
pub(crate) node_name_prefix: &'static str,
|
||||
pub(crate) port_names: &'static [&'static str],
|
||||
}
|
||||
|
||||
impl<E: Application> LocalProcess<E> {
|
||||
#[must_use]
|
||||
pub fn new(
|
||||
binary_env_var: &'static str,
|
||||
binary_name: &'static str,
|
||||
build_config: LocalConfigBuilder<E>,
|
||||
render_config: LocalConfigRenderer<E>,
|
||||
) -> Self {
|
||||
Self {
|
||||
spec: LocalProcessSpec::new(binary_env_var, binary_name),
|
||||
build_config,
|
||||
render_config,
|
||||
node_name_prefix: "node",
|
||||
port_names: &[],
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_node_name_prefix(mut self, value: &'static str) -> Self {
|
||||
self.node_name_prefix = value;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_port_names(mut self, value: &'static [&'static str]) -> Self {
|
||||
self.port_names = value;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_config_file(mut self, file_name: &str, arg: &str) -> Self {
|
||||
self.spec = self.spec.with_config_file(file_name, arg);
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_env(mut self, key: &str, value: &str) -> Self {
|
||||
self.spec = self.spec.with_env(key, value);
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_rust_log(mut self, value: &str) -> Self {
|
||||
self.spec = self.spec.with_rust_log(value);
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_args(mut self, args: impl IntoIterator<Item = String>) -> Self {
|
||||
self.spec = self.spec.with_args(args);
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_launch_env(mut self, vars: impl IntoIterator<Item = LaunchEnvVar>) -> Self {
|
||||
self.spec.env.extend(vars);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<E> LocalProcess<E>
|
||||
where
|
||||
E: Application,
|
||||
E::NodeConfig: Serialize,
|
||||
{
|
||||
#[must_use]
|
||||
pub fn yaml(
|
||||
binary_env_var: &'static str,
|
||||
binary_name: &'static str,
|
||||
build_config: LocalConfigBuilder<E>,
|
||||
) -> Self {
|
||||
Self::new(
|
||||
binary_env_var,
|
||||
binary_name,
|
||||
build_config,
|
||||
yaml_node_config::<E::NodeConfig>,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct LocalAccess<E: Application> {
|
||||
pub(crate) api_port: Option<LocalApiPort<E>>,
|
||||
pub(crate) endpoints: Option<LocalEndpoints<E>>,
|
||||
pub(crate) client: Option<LocalClientBuilder<E>>,
|
||||
pub(crate) peer_port: Option<LocalPeerPort<E>>,
|
||||
pub(crate) readiness_path: &'static str,
|
||||
}
|
||||
|
||||
impl<E: Application> LocalAccess<E> {
|
||||
#[must_use]
|
||||
pub fn http(api_port: LocalApiPort<E>) -> Self {
|
||||
Self {
|
||||
api_port: Some(api_port),
|
||||
endpoints: None,
|
||||
client: None,
|
||||
peer_port: None,
|
||||
readiness_path: E::node_readiness_path(),
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn custom(endpoints: LocalEndpoints<E>) -> Self {
|
||||
Self {
|
||||
api_port: None,
|
||||
endpoints: Some(endpoints),
|
||||
client: None,
|
||||
peer_port: None,
|
||||
readiness_path: E::node_readiness_path(),
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_client(mut self, client: LocalClientBuilder<E>) -> Self {
|
||||
self.client = Some(client);
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_peer_port(mut self, peer_port: LocalPeerPort<E>) -> Self {
|
||||
self.peer_port = Some(peer_port);
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_readiness_path(mut self, readiness_path: &'static str) -> Self {
|
||||
self.readiness_path = readiness_path;
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn node_endpoints(&self, config: &E::NodeConfig) -> Result<NodeEndpoints, DynError> {
|
||||
if let Some(endpoints) = self.endpoints {
|
||||
return endpoints(config);
|
||||
}
|
||||
|
||||
if let Some(api_port) = self.api_port {
|
||||
return Ok(NodeEndpoints {
|
||||
api: SocketAddr::from((Ipv4Addr::LOCALHOST, api_port(config))),
|
||||
extra_ports: HashMap::new(),
|
||||
});
|
||||
}
|
||||
|
||||
Err(std::io::Error::other("node endpoints are not configured").into())
|
||||
}
|
||||
|
||||
pub(crate) fn node_client(&self, endpoints: &NodeEndpoints) -> Result<E::NodeClient, DynError> {
|
||||
if let Some(client) = self.client {
|
||||
return client(&discovered_node_access(endpoints));
|
||||
}
|
||||
|
||||
E::build_node_client(&discovered_node_access(endpoints))
|
||||
}
|
||||
|
||||
pub(crate) fn node_peer_port(&self, config: &E::NodeConfig, endpoints: &NodeEndpoints) -> u16 {
|
||||
self.peer_port
|
||||
.map(|peer_port| peer_port(config, endpoints))
|
||||
.unwrap_or_else(|| endpoints.api.port())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct LocalRuntime<E: Application> {
|
||||
pub(crate) process: LocalProcess<E>,
|
||||
pub(crate) access: LocalAccess<E>,
|
||||
}
|
||||
|
||||
impl<E: Application> LocalRuntime<E> {
|
||||
#[must_use]
|
||||
pub fn new(process: LocalProcess<E>, access: LocalAccess<E>) -> Self {
|
||||
Self { process, access }
|
||||
}
|
||||
}
|
||||
|
||||
pub fn cluster_node_config_from_context<E>(
|
||||
context: LocalBuildContext<'_, E>,
|
||||
) -> Result<<E as Application>::NodeConfig, DynError>
|
||||
where
|
||||
E: Application + ClusterNodeConfigApplication,
|
||||
{
|
||||
crate::env::build_local_cluster_node_config::<E>(context.index, context.ports, context.peers)
|
||||
}
|
||||
113
testing-framework/deployers/local/src/env/tests.rs
vendored
Normal file
113
testing-framework/deployers/local/src/env/tests.rs
vendored
Normal file
@ -0,0 +1,113 @@
|
||||
use std::{
|
||||
path::Path,
|
||||
sync::atomic::{AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use testing_framework_core::{
|
||||
scenario::{Application, DynError, Feed, FeedRuntime, HttpReadinessRequirement, NodeClients},
|
||||
topology::DeploymentDescriptor,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
static STABLE_CALLS: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
struct DummyFeed;
|
||||
|
||||
impl Feed for DummyFeed {
|
||||
type Subscription = ();
|
||||
|
||||
fn subscribe(&self) -> Self::Subscription {}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct DummyFeedRuntime;
|
||||
|
||||
#[async_trait]
|
||||
impl FeedRuntime for DummyFeedRuntime {
|
||||
type Feed = DummyFeed;
|
||||
|
||||
async fn run(self: Box<Self>) {}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct DummyConfig;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct DummyTopology;
|
||||
|
||||
impl DeploymentDescriptor for DummyTopology {
|
||||
fn node_count(&self) -> usize {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
struct DummyEnv;
|
||||
|
||||
#[async_trait]
|
||||
impl Application for DummyEnv {
|
||||
type Deployment = DummyTopology;
|
||||
type NodeClient = ();
|
||||
type NodeConfig = DummyConfig;
|
||||
type FeedRuntime = DummyFeedRuntime;
|
||||
|
||||
async fn prepare_feed(
|
||||
_node_clients: NodeClients<Self>,
|
||||
) -> Result<(<Self::FeedRuntime as FeedRuntime>::Feed, Self::FeedRuntime), DynError> {
|
||||
Ok((DummyFeed, DummyFeedRuntime))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl LocalDeployerEnv for DummyEnv {
|
||||
fn build_node_config(
|
||||
_topology: &Self::Deployment,
|
||||
_index: usize,
|
||||
_peer_ports_by_name: &std::collections::HashMap<String, u16>,
|
||||
_options: &StartNodeOptions<Self>,
|
||||
_peer_ports: &[u16],
|
||||
) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError> {
|
||||
unreachable!("not used in this test")
|
||||
}
|
||||
|
||||
fn build_initial_node_configs(
|
||||
_topology: &Self::Deployment,
|
||||
) -> Result<Vec<NodeConfigEntry<<Self as Application>::NodeConfig>>, ProcessSpawnError> {
|
||||
unreachable!("not used in this test")
|
||||
}
|
||||
|
||||
fn build_launch_spec(
|
||||
_config: &<Self as Application>::NodeConfig,
|
||||
_dir: &Path,
|
||||
_label: &str,
|
||||
) -> Result<crate::process::LaunchSpec, DynError> {
|
||||
Ok(crate::process::LaunchSpec::default())
|
||||
}
|
||||
|
||||
fn node_endpoints(
|
||||
_config: &<Self as Application>::NodeConfig,
|
||||
) -> Result<NodeEndpoints, DynError> {
|
||||
Ok(NodeEndpoints::default())
|
||||
}
|
||||
|
||||
fn node_client(_endpoints: &NodeEndpoints) -> Result<Self::NodeClient, DynError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn wait_readiness_stable(_nodes: &[Node<Self>]) -> Result<(), DynError> {
|
||||
STABLE_CALLS.fetch_add(1, Ordering::SeqCst);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn empty_cluster_still_runs_stability_hook() {
|
||||
STABLE_CALLS.store(0, Ordering::SeqCst);
|
||||
let nodes: Vec<Node<DummyEnv>> = Vec::new();
|
||||
wait_local_http_readiness::<DummyEnv>(&nodes, HttpReadinessRequirement::AllNodesReady)
|
||||
.await
|
||||
.expect("empty cluster should be considered ready");
|
||||
assert_eq!(STABLE_CALLS.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
@ -9,9 +9,10 @@ pub mod process;
|
||||
pub use binary::{BinaryConfig, BinaryResolver};
|
||||
pub use deployer::{ProcessDeployer, ProcessDeployerError};
|
||||
pub use env::{
|
||||
BuiltNodeConfig, LocalDeployerEnv, LocalNodePorts, LocalPeerNode, LocalProcessSpec,
|
||||
NodeConfigEntry, build_indexed_http_peers, build_indexed_node_configs,
|
||||
build_local_cluster_node_config, build_local_peer_nodes, default_yaml_launch_spec,
|
||||
BuiltNodeConfig, LocalAccess, LocalBuildContext, LocalDeployerEnv, LocalNodePorts,
|
||||
LocalPeerNode, LocalProcess, LocalProcessSpec, LocalRuntime, NodeConfigEntry,
|
||||
build_indexed_http_peers, build_indexed_node_configs, build_local_cluster_node_config,
|
||||
build_local_peer_nodes, cluster_node_config_from_context, default_yaml_launch_spec,
|
||||
discovered_node_access, preallocate_ports, reserve_local_node_ports,
|
||||
single_http_node_endpoints, text_config_launch_spec, text_node_config, yaml_config_launch_spec,
|
||||
yaml_node_config,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user