refactor(local): redesign runtime around startup access lifecycle

This commit is contained in:
andrussal 2026-04-10 16:43:13 +02:00
parent 756a65fa77
commit e5406badd2
7 changed files with 441 additions and 362 deletions

View File

@ -1,12 +1,18 @@
use std::{collections::HashMap, path::PathBuf};
use serde::Serialize;
use testing_framework_core::scenario::{
Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, DynError,
NodeAccess,
use testing_framework_core::{
scenario::{
Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, DynError,
NodeAccess,
},
topology::DeploymentDescriptor,
};
use crate::process::{LaunchSpec, NodeEndpointPort, NodeEndpoints, ProcessSpawnError};
use crate::{
env::LocalBuildContext,
process::{LaunchSpec, NodeEndpointPort, NodeEndpoints, ProcessSpawnError},
};
pub struct BuiltNodeConfig<Config> {
pub config: Config,
@ -242,6 +248,50 @@ pub fn build_local_peer_nodes(peer_ports: &[u16], self_index: usize) -> Vec<Loca
.collect()
}
pub fn build_generated_initial_nodes<E>(
topology: &E::Deployment,
node_name_prefix: &str,
port_names: &[&'static str],
build_node: impl Fn(LocalBuildContext<'_, E>) -> Result<BuiltNodeConfig<E::NodeConfig>, DynError>,
) -> Result<Vec<NodeConfigEntry<E::NodeConfig>>, ProcessSpawnError>
where
E: Application,
{
let reserved_ports =
reserve_local_node_ports(topology.node_count(), port_names, node_name_prefix)?;
let peer_ports = reserved_ports
.iter()
.map(LocalNodePorts::network_port)
.collect::<Vec<_>>();
let peer_ports_by_name = HashMap::new();
let options = testing_framework_core::scenario::StartNodeOptions::<E>::default();
reserved_ports
.iter()
.enumerate()
.map(|(index, ports)| {
let compact_peer_ports = compact_peer_ports(&peer_ports, index);
let peers = build_local_peer_nodes(&compact_peer_ports, index);
let built = build_node(LocalBuildContext {
topology,
index,
ports,
peers: &peers,
peer_ports: &compact_peer_ports,
peer_ports_by_name: &peer_ports_by_name,
options: &options,
template_config: None,
})
.map_err(|source| ProcessSpawnError::Config { source })?;
Ok(NodeConfigEntry {
name: format!("{node_name_prefix}-{index}"),
config: built.config,
})
})
.collect()
}
pub fn yaml_config_launch_spec<T: Serialize>(
config: &T,
spec: &LocalProcessSpec,

View File

@ -1,19 +1,13 @@
use std::{
collections::HashMap,
net::{Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
use testing_framework_core::scenario::{
Application, DynError, HttpReadinessRequirement, ReadinessError,
wait_for_http_ports_with_requirement,
};
use testing_framework_core::{
scenario::{
Application, DynError, HttpReadinessRequirement, ReadinessError, StartNodeOptions,
wait_for_http_ports_with_requirement,
},
topology::DeploymentDescriptor,
use crate::{
LaunchSpec, NodeEndpoints,
process::{ProcessNode, ProcessSpawnError},
};
use crate::process::{LaunchSpec, NodeEndpoints, ProcessNode, ProcessSpawnError};
mod helpers;
mod runtime;
#[cfg(test)]
@ -27,282 +21,107 @@ pub use helpers::{
text_node_config, yaml_config_launch_spec, yaml_node_config,
};
pub use runtime::{
LocalAccess, LocalBuildContext, LocalProcess, LocalRuntime, cluster_node_config_from_context,
LocalAccess, LocalBuildContext, LocalLifecycle, LocalProcess, LocalRuntime,
LocalStableReadinessFuture, cluster_node_config_from_context,
};
pub type Node<E> = ProcessNode<<E as Application>::NodeConfig, <E as Application>::NodeClient>;
#[async_trait::async_trait]
pub trait LocalDeployerEnv: Application + Sized
where
<Self as Application>::NodeConfig: Clone + Send + Sync + 'static,
{
fn local_runtime() -> Option<LocalRuntime<Self>> {
None
}
fn local_runtime() -> LocalRuntime<Self>;
}
fn local_port_names() -> &'static [&'static str] {
Self::local_runtime()
.map(|runtime| runtime.process.port_names)
.unwrap_or_else(Self::initial_local_port_names)
}
pub(crate) fn runtime_for<E: LocalDeployerEnv>() -> LocalRuntime<E> {
E::local_runtime()
}
fn build_node_config(
topology: &Self::Deployment,
index: usize,
peer_ports_by_name: &HashMap<String, u16>,
options: &StartNodeOptions<Self>,
peer_ports: &[u16],
) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError> {
Self::build_node_config_from_template(
topology,
index,
peer_ports_by_name,
options,
peer_ports,
None,
)
}
pub(crate) fn build_node_from_template<E: LocalDeployerEnv>(
topology: &E::Deployment,
index: usize,
peer_ports_by_name: &std::collections::HashMap<String, u16>,
options: &testing_framework_core::scenario::StartNodeOptions<E>,
peer_ports: &[u16],
template_config: Option<&E::NodeConfig>,
) -> Result<BuiltNodeConfig<E::NodeConfig>, DynError> {
let runtime = runtime_for::<E>();
let mut reserved = reserve_local_node_ports(1, runtime.process.port_names, "node")
.map_err(|source| -> DynError { source.into() })?;
let ports = reserved
.pop()
.ok_or_else(|| std::io::Error::other("failed to reserve local node ports"))?;
let peers = build_local_peer_nodes(peer_ports, index);
fn build_node_config_from_template(
topology: &Self::Deployment,
index: usize,
peer_ports_by_name: &HashMap<String, u16>,
options: &StartNodeOptions<Self>,
peer_ports: &[u16],
template_config: Option<&<Self as Application>::NodeConfig>,
) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError> {
let mut reserved = reserve_local_node_ports(1, Self::local_port_names(), "node")
.map_err(|source| -> DynError { source.into() })?;
let ports = reserved
.pop()
.ok_or_else(|| std::io::Error::other("failed to reserve local node ports"))?;
let network_port = ports.network_port();
let config = Self::build_local_node_config(
topology,
index,
&ports,
peer_ports_by_name,
options,
peer_ports,
template_config,
)?;
runtime.process.build_node(LocalBuildContext {
topology,
index,
ports: &ports,
peers: &peers,
peer_ports,
peer_ports_by_name,
options,
template_config,
})
}
Ok(BuiltNodeConfig {
config,
network_port,
})
}
pub(crate) fn build_initial_node_configs<E: LocalDeployerEnv>(
topology: &E::Deployment,
) -> Result<Vec<NodeConfigEntry<E::NodeConfig>>, ProcessSpawnError> {
runtime_for::<E>().process.build_initial_nodes(topology)
}
fn build_initial_node_configs(
topology: &Self::Deployment,
) -> Result<Vec<NodeConfigEntry<<Self as Application>::NodeConfig>>, ProcessSpawnError> {
let reserved_ports = reserve_local_node_ports(
topology.node_count(),
Self::local_port_names(),
Self::initial_node_name_prefix(),
)?;
let peer_ports = reserved_ports
.iter()
.map(LocalNodePorts::network_port)
.collect::<Vec<_>>();
pub(crate) fn initial_persist_dir<E: LocalDeployerEnv>(
topology: &E::Deployment,
node_name: &str,
index: usize,
) -> Option<std::path::PathBuf> {
runtime_for::<E>()
.lifecycle
.initial_persist_dir(topology, node_name, index)
}
let mut configs = Vec::with_capacity(topology.node_count());
for (index, ports) in reserved_ports.iter().enumerate() {
let config = Self::build_initial_node_config(topology, index, ports, &peer_ports)
.map_err(|source| ProcessSpawnError::Config { source })?;
configs.push(NodeConfigEntry {
name: format!("{}-{index}", Self::initial_node_name_prefix()),
config,
});
}
pub(crate) fn initial_snapshot_dir<E: LocalDeployerEnv>(
topology: &E::Deployment,
node_name: &str,
index: usize,
) -> Option<std::path::PathBuf> {
runtime_for::<E>()
.lifecycle
.initial_snapshot_dir(topology, node_name, index)
}
Ok(configs)
}
pub(crate) fn build_launch_spec<E: LocalDeployerEnv>(
config: &E::NodeConfig,
dir: &std::path::Path,
label: &str,
) -> Result<LaunchSpec, DynError> {
runtime_for::<E>()
.process
.build_launch_spec(config, dir, label)
}
fn initial_node_name_prefix() -> &'static str {
Self::local_runtime()
.map(|runtime| runtime.process.node_name_prefix)
.unwrap_or("node")
}
pub(crate) fn node_endpoints<E: LocalDeployerEnv>(
config: &E::NodeConfig,
) -> Result<NodeEndpoints, DynError> {
runtime_for::<E>().access.node_endpoints(config)
}
fn initial_local_port_names() -> &'static [&'static str] {
&[]
}
pub(crate) fn node_client<E: LocalDeployerEnv>(
endpoints: &NodeEndpoints,
) -> Result<E::NodeClient, DynError> {
runtime_for::<E>().access.node_client(endpoints)
}
fn build_initial_node_config(
topology: &Self::Deployment,
index: usize,
ports: &LocalNodePorts,
peer_ports: &[u16],
) -> Result<<Self as Application>::NodeConfig, DynError> {
let compact_peer_ports = helpers::compact_peer_ports(peer_ports, index);
let peer_ports_by_name = HashMap::new();
let options = StartNodeOptions::<Self>::default();
Self::build_local_node_config(
topology,
index,
ports,
&peer_ports_by_name,
&options,
&compact_peer_ports,
None,
)
}
pub(crate) fn node_peer_port<E: LocalDeployerEnv>(node: &Node<E>) -> u16 {
runtime_for::<E>()
.access
.node_peer_port(node.config(), node.endpoints())
}
fn build_local_node_config(
topology: &Self::Deployment,
index: usize,
ports: &LocalNodePorts,
peer_ports_by_name: &HashMap<String, u16>,
options: &StartNodeOptions<Self>,
peer_ports: &[u16],
template_config: Option<&<Self as Application>::NodeConfig>,
) -> Result<<Self as Application>::NodeConfig, DynError> {
let peers = build_local_peer_nodes(peer_ports, index);
Self::build_local_node_config_with_peers(
topology,
index,
ports,
&peers,
peer_ports_by_name,
options,
template_config,
)
}
fn build_local_node_config_with_peers(
topology: &Self::Deployment,
index: usize,
ports: &LocalNodePorts,
peers: &[LocalPeerNode],
peer_ports_by_name: &HashMap<String, u16>,
options: &StartNodeOptions<Self>,
template_config: Option<&<Self as Application>::NodeConfig>,
) -> Result<<Self as Application>::NodeConfig, DynError> {
if let Some(runtime) = Self::local_runtime() {
return (runtime.process.build_config)(LocalBuildContext {
topology,
index,
ports,
peers,
peer_ports_by_name,
options,
template_config,
});
}
Err(std::io::Error::other(
"build_local_node_config_with_peers is not implemented for this app",
)
.into())
}
fn initial_persist_dir(
_topology: &Self::Deployment,
_node_name: &str,
_index: usize,
) -> Option<PathBuf> {
None
}
fn initial_snapshot_dir(
_topology: &Self::Deployment,
_node_name: &str,
_index: usize,
) -> Option<PathBuf> {
None
}
fn local_process_spec() -> Option<LocalProcessSpec> {
Self::local_runtime().map(|runtime| runtime.process.spec)
}
fn render_local_config(
config: &<Self as Application>::NodeConfig,
) -> Result<Vec<u8>, DynError> {
if let Some(runtime) = Self::local_runtime() {
return (runtime.process.render_config)(config);
}
Err(std::io::Error::other("render_local_config is not implemented for this app").into())
}
fn build_launch_spec(
config: &<Self as Application>::NodeConfig,
_dir: &Path,
_label: &str,
) -> Result<LaunchSpec, DynError> {
let spec = Self::local_process_spec().ok_or_else(|| {
std::io::Error::other("build_launch_spec is not implemented for this app")
})?;
let rendered = Self::render_local_config(config)?;
helpers::rendered_config_launch_spec(rendered, &spec)
}
fn http_api_port(config: &<Self as Application>::NodeConfig) -> Option<u16> {
Self::local_runtime()
.and_then(|runtime| runtime.access.api_port.map(|api_port| api_port(config)))
}
fn node_endpoints(
config: &<Self as Application>::NodeConfig,
) -> Result<NodeEndpoints, DynError> {
if let Some(runtime) = Self::local_runtime() {
return runtime.access.node_endpoints(config);
}
if let Some(port) = Self::http_api_port(config) {
return Ok(NodeEndpoints {
api: SocketAddr::from((Ipv4Addr::LOCALHOST, port)),
extra_ports: HashMap::new(),
});
}
Err(std::io::Error::other("node_endpoints is not implemented for this app").into())
}
fn node_peer_port(node: &Node<Self>) -> u16 {
if let Some(runtime) = Self::local_runtime() {
return runtime
.access
.node_peer_port(node.config(), node.endpoints());
}
node.endpoints().api.port()
}
fn node_client_from_api_endpoint(_api: SocketAddr) -> Option<Self::NodeClient> {
None
}
fn node_client(endpoints: &NodeEndpoints) -> Result<Self::NodeClient, DynError> {
if let Some(runtime) = Self::local_runtime() {
return runtime.access.node_client(endpoints);
}
if let Ok(client) =
<Self as Application>::build_node_client(&discovered_node_access(endpoints))
{
return Ok(client);
}
if let Some(client) = Self::node_client_from_api_endpoint(endpoints.api) {
return Ok(client);
}
Err(std::io::Error::other("node_client is not implemented for this app").into())
}
fn readiness_endpoint_path() -> &'static str {
Self::local_runtime()
.map(|runtime| runtime.access.readiness_path)
.unwrap_or_else(<Self as Application>::node_readiness_path)
}
async fn wait_readiness_stable(_nodes: &[Node<Self>]) -> Result<(), DynError> {
Ok(())
}
pub(crate) fn readiness_endpoint_path<E: LocalDeployerEnv>() -> &'static str {
runtime_for::<E>().access.readiness_path()
}
pub async fn wait_local_http_readiness<E: LocalDeployerEnv>(
@ -313,9 +132,13 @@ pub async fn wait_local_http_readiness<E: LocalDeployerEnv>(
.iter()
.map(|node| node.endpoints().api.port())
.collect();
wait_for_http_ports_with_requirement(&ports, E::readiness_endpoint_path(), requirement).await?;
E::wait_readiness_stable(nodes)
wait_for_http_ports_with_requirement(&ports, readiness_endpoint_path::<E>(), requirement)
.await?;
runtime_for::<E>()
.lifecycle
.wait_stable(nodes)
.await
.map_err(|source| ReadinessError::ClusterStable { source })
}
@ -330,12 +153,12 @@ pub async fn spawn_node_from_config<E: LocalDeployerEnv>(
ProcessNode::spawn(
&label,
config,
E::build_launch_spec,
E::node_endpoints,
build_launch_spec::<E>,
node_endpoints::<E>,
keep_tempdir,
persist_dir,
snapshot_dir,
E::node_client,
node_client::<E>,
)
.await
}

View File

@ -1,6 +1,8 @@
use std::{
collections::HashMap,
net::{Ipv4Addr, SocketAddr},
future::Future,
path::{Path, PathBuf},
pin::Pin,
};
use serde::Serialize;
@ -10,10 +12,10 @@ use testing_framework_core::scenario::{
use crate::{
env::{
LocalNodePorts, LocalPeerNode, LocalProcessSpec, NodeEndpoints, discovered_node_access,
yaml_node_config,
BuiltNodeConfig, LocalNodePorts, LocalPeerNode, LocalProcessSpec, Node, NodeConfigEntry,
NodeEndpoints, build_local_cluster_node_config, discovered_node_access, yaml_node_config,
},
process::LaunchEnvVar,
process::{LaunchEnvVar, LaunchSpec, ProcessSpawnError},
};
pub struct LocalBuildContext<'a, E: Application> {
@ -21,6 +23,7 @@ pub struct LocalBuildContext<'a, E: Application> {
pub index: usize,
pub ports: &'a LocalNodePorts,
pub peers: &'a [LocalPeerNode],
pub peer_ports: &'a [u16],
pub peer_ports_by_name: &'a HashMap<String, u16>,
pub options: &'a StartNodeOptions<E>,
pub template_config: Option<&'a E::NodeConfig>,
@ -29,24 +32,78 @@ pub struct LocalBuildContext<'a, E: Application> {
pub type LocalConfigBuilder<E> =
for<'a> fn(LocalBuildContext<'a, E>) -> Result<<E as Application>::NodeConfig, DynError>;
pub type LocalDynamicNodeBuilder<E> =
for<'a> fn(
LocalBuildContext<'a, E>,
) -> Result<BuiltNodeConfig<<E as Application>::NodeConfig>, DynError>;
pub type LocalConfigRenderer<E> = fn(&<E as Application>::NodeConfig) -> Result<Vec<u8>, DynError>;
pub type LocalInitialNodesBuilder<E> =
fn(
&<E as Application>::Deployment,
) -> Result<Vec<NodeConfigEntry<<E as Application>::NodeConfig>>, ProcessSpawnError>;
pub type LocalLaunchSpecBuilder<E> =
fn(&<E as Application>::NodeConfig, &Path, &str) -> Result<LaunchSpec, DynError>;
pub type LocalApiPort<E> = fn(&<E as Application>::NodeConfig) -> u16;
pub type LocalEndpoints<E> = fn(&<E as Application>::NodeConfig) -> Result<NodeEndpoints, DynError>;
pub type LocalClientBuilder<E> =
fn(&NodeAccess) -> Result<<E as Application>::NodeClient, DynError>;
pub type LocalPeerPort<E> = fn(&<E as Application>::NodeConfig, &NodeEndpoints) -> u16;
pub type LocalPersistDir<E> = fn(&<E as Application>::Deployment, &str, usize) -> Option<PathBuf>;
pub type LocalSnapshotDir<E> = fn(&<E as Application>::Deployment, &str, usize) -> Option<PathBuf>;
pub type LocalStableReadinessFuture<'a> =
Pin<Box<dyn Future<Output = Result<(), DynError>> + Send + 'a>>;
pub type LocalStableReadiness<E> = for<'a> fn(&'a [Node<E>]) -> LocalStableReadinessFuture<'a>;
#[derive(Clone)]
enum LocalDynamicNode<E: Application> {
Standard { build_config: LocalConfigBuilder<E> },
Custom(LocalDynamicNodeBuilder<E>),
}
impl<E: Application> LocalDynamicNode<E> {
fn build(
&self,
context: LocalBuildContext<'_, E>,
) -> Result<BuiltNodeConfig<E::NodeConfig>, DynError> {
match self {
Self::Standard { build_config } => {
let network_port = context.ports.network_port();
Ok(BuiltNodeConfig {
config: build_config(context)?,
network_port,
})
}
Self::Custom(build) => build(context),
}
}
}
#[derive(Clone)]
enum LocalInitialNodes<E: Application> {
Generated,
Custom(LocalInitialNodesBuilder<E>),
}
#[derive(Clone)]
enum LocalLaunch<E: Application> {
Rendered {
spec: LocalProcessSpec,
render_config: LocalConfigRenderer<E>,
},
Custom(LocalLaunchSpecBuilder<E>),
}
#[derive(Clone)]
pub struct LocalProcess<E: Application> {
pub(crate) spec: LocalProcessSpec,
pub(crate) build_config: LocalConfigBuilder<E>,
pub(crate) render_config: LocalConfigRenderer<E>,
pub(crate) node_name_prefix: &'static str,
pub(crate) port_names: &'static [&'static str],
dynamic_node: LocalDynamicNode<E>,
initial_nodes: LocalInitialNodes<E>,
launch: LocalLaunch<E>,
}
impl<E: Application> LocalProcess<E> {
@ -58,11 +115,28 @@ impl<E: Application> LocalProcess<E> {
render_config: LocalConfigRenderer<E>,
) -> Self {
Self {
spec: LocalProcessSpec::new(binary_env_var, binary_name),
build_config,
render_config,
node_name_prefix: "node",
port_names: &[],
dynamic_node: LocalDynamicNode::Standard { build_config },
initial_nodes: LocalInitialNodes::Generated,
launch: LocalLaunch::Rendered {
spec: LocalProcessSpec::new(binary_env_var, binary_name),
render_config,
},
}
}
#[must_use]
pub fn custom(
build_node: LocalDynamicNodeBuilder<E>,
build_launch_spec: LocalLaunchSpecBuilder<E>,
) -> Self {
Self {
node_name_prefix: "node",
port_names: &[],
dynamic_node: LocalDynamicNode::Custom(build_node),
initial_nodes: LocalInitialNodes::Generated,
launch: LocalLaunch::Custom(build_launch_spec),
}
}
@ -78,35 +152,91 @@ impl<E: Application> LocalProcess<E> {
self
}
#[must_use]
pub fn with_initial_nodes(mut self, build_initial_nodes: LocalInitialNodesBuilder<E>) -> Self {
self.initial_nodes = LocalInitialNodes::Custom(build_initial_nodes);
self
}
#[must_use]
pub fn with_config_file(mut self, file_name: &str, arg: &str) -> Self {
self.spec = self.spec.with_config_file(file_name, arg);
if let LocalLaunch::Rendered { spec, .. } = &mut self.launch {
*spec = spec.clone().with_config_file(file_name, arg);
}
self
}
#[must_use]
pub fn with_env(mut self, key: &str, value: &str) -> Self {
self.spec = self.spec.with_env(key, value);
if let LocalLaunch::Rendered { spec, .. } = &mut self.launch {
*spec = spec.clone().with_env(key, value);
}
self
}
#[must_use]
pub fn with_rust_log(mut self, value: &str) -> Self {
self.spec = self.spec.with_rust_log(value);
if let LocalLaunch::Rendered { spec, .. } = &mut self.launch {
*spec = spec.clone().with_rust_log(value);
}
self
}
#[must_use]
pub fn with_args(mut self, args: impl IntoIterator<Item = String>) -> Self {
self.spec = self.spec.with_args(args);
if let LocalLaunch::Rendered { spec, .. } = &mut self.launch {
*spec = spec.clone().with_args(args);
}
self
}
#[must_use]
pub fn with_launch_env(mut self, vars: impl IntoIterator<Item = LaunchEnvVar>) -> Self {
self.spec.env.extend(vars);
if let LocalLaunch::Rendered { spec, .. } = &mut self.launch {
spec.env.extend(vars);
}
self
}
pub(crate) fn build_node(
&self,
context: LocalBuildContext<'_, E>,
) -> Result<BuiltNodeConfig<E::NodeConfig>, DynError> {
self.dynamic_node.build(context)
}
pub(crate) fn build_initial_nodes(
&self,
topology: &E::Deployment,
) -> Result<Vec<NodeConfigEntry<E::NodeConfig>>, ProcessSpawnError>
where
E::NodeConfig: Clone,
{
match self.initial_nodes {
LocalInitialNodes::Generated => super::helpers::build_generated_initial_nodes::<E>(
topology,
self.node_name_prefix,
self.port_names,
|context| self.build_node(context),
),
LocalInitialNodes::Custom(build) => build(topology),
}
}
pub(crate) fn build_launch_spec(
&self,
config: &E::NodeConfig,
dir: &Path,
label: &str,
) -> Result<LaunchSpec, DynError> {
match &self.launch {
LocalLaunch::Rendered {
spec,
render_config,
} => super::helpers::rendered_config_launch_spec(render_config(config)?, spec),
LocalLaunch::Custom(build) => build(config, dir, label),
}
}
}
impl<E> LocalProcess<E>
@ -131,11 +261,11 @@ where
#[derive(Clone)]
pub struct LocalAccess<E: Application> {
pub(crate) api_port: Option<LocalApiPort<E>>,
pub(crate) endpoints: Option<LocalEndpoints<E>>,
pub(crate) client: Option<LocalClientBuilder<E>>,
pub(crate) peer_port: Option<LocalPeerPort<E>>,
pub(crate) readiness_path: &'static str,
api_port: Option<LocalApiPort<E>>,
endpoints: Option<LocalEndpoints<E>>,
client: Option<LocalClientBuilder<E>>,
peer_port: Option<LocalPeerPort<E>>,
readiness_path: &'static str,
}
impl<E: Application> LocalAccess<E> {
@ -185,10 +315,7 @@ impl<E: Application> LocalAccess<E> {
}
if let Some(api_port) = self.api_port {
return Ok(NodeEndpoints {
api: SocketAddr::from((Ipv4Addr::LOCALHOST, api_port(config))),
extra_ports: HashMap::new(),
});
return Ok(NodeEndpoints::from_api_port(api_port(config)));
}
Err(std::io::Error::other("node endpoints are not configured").into())
@ -207,18 +334,97 @@ impl<E: Application> LocalAccess<E> {
.map(|peer_port| peer_port(config, endpoints))
.unwrap_or_else(|| endpoints.api.port())
}
pub(crate) fn readiness_path(&self) -> &'static str {
self.readiness_path
}
}
#[derive(Clone)]
pub struct LocalLifecycle<E: Application> {
initial_persist_dir: Option<LocalPersistDir<E>>,
initial_snapshot_dir: Option<LocalSnapshotDir<E>>,
stable_readiness: Option<LocalStableReadiness<E>>,
}
impl<E: Application> LocalLifecycle<E> {
#[must_use]
pub fn new() -> Self {
Self {
initial_persist_dir: None,
initial_snapshot_dir: None,
stable_readiness: None,
}
}
#[must_use]
pub fn with_initial_persist_dir(mut self, persist_dir: LocalPersistDir<E>) -> Self {
self.initial_persist_dir = Some(persist_dir);
self
}
#[must_use]
pub fn with_initial_snapshot_dir(mut self, snapshot_dir: LocalSnapshotDir<E>) -> Self {
self.initial_snapshot_dir = Some(snapshot_dir);
self
}
#[must_use]
pub fn with_stable_readiness(mut self, stable_readiness: LocalStableReadiness<E>) -> Self {
self.stable_readiness = Some(stable_readiness);
self
}
pub(crate) fn initial_persist_dir(
&self,
topology: &E::Deployment,
node_name: &str,
index: usize,
) -> Option<PathBuf> {
self.initial_persist_dir
.and_then(|persist_dir| persist_dir(topology, node_name, index))
}
pub(crate) fn initial_snapshot_dir(
&self,
topology: &E::Deployment,
node_name: &str,
index: usize,
) -> Option<PathBuf> {
self.initial_snapshot_dir
.and_then(|snapshot_dir| snapshot_dir(topology, node_name, index))
}
pub(crate) async fn wait_stable(&self, nodes: &[Node<E>]) -> Result<(), DynError> {
if let Some(stable_readiness) = self.stable_readiness {
return stable_readiness(nodes).await;
}
Ok(())
}
}
#[derive(Clone)]
pub struct LocalRuntime<E: Application> {
pub(crate) process: LocalProcess<E>,
pub(crate) access: LocalAccess<E>,
pub(crate) lifecycle: LocalLifecycle<E>,
}
impl<E: Application> LocalRuntime<E> {
#[must_use]
pub fn new(process: LocalProcess<E>, access: LocalAccess<E>) -> Self {
Self { process, access }
Self {
process,
access,
lifecycle: LocalLifecycle::new(),
}
}
#[must_use]
pub fn with_lifecycle(mut self, lifecycle: LocalLifecycle<E>) -> Self {
self.lifecycle = lifecycle;
self
}
}
@ -228,5 +434,5 @@ pub fn cluster_node_config_from_context<E>(
where
E: Application + ClusterNodeConfigApplication,
{
crate::env::build_local_cluster_node_config::<E>(context.index, context.ports, context.peers)
build_local_cluster_node_config::<E>(context.index, context.ports, context.peers)
}

View File

@ -1,9 +1,5 @@
use std::{
path::Path,
sync::atomic::{AtomicUsize, Ordering},
};
use std::sync::atomic::{AtomicUsize, Ordering};
use async_trait::async_trait;
use testing_framework_core::{
scenario::{Application, DynError, Feed, FeedRuntime, HttpReadinessRequirement, NodeClients},
topology::DeploymentDescriptor,
@ -25,7 +21,7 @@ impl Feed for DummyFeed {
#[derive(Default)]
struct DummyFeedRuntime;
#[async_trait]
#[async_trait::async_trait]
impl FeedRuntime for DummyFeedRuntime {
type Feed = DummyFeed;
@ -46,7 +42,7 @@ impl DeploymentDescriptor for DummyTopology {
struct DummyEnv;
#[async_trait]
#[async_trait::async_trait]
impl Application for DummyEnv {
type Deployment = DummyTopology;
type NodeClient = ();
@ -60,46 +56,46 @@ impl Application for DummyEnv {
}
}
#[async_trait]
impl LocalDeployerEnv for DummyEnv {
fn build_node_config(
_topology: &Self::Deployment,
_index: usize,
_peer_ports_by_name: &std::collections::HashMap<String, u16>,
_options: &StartNodeOptions<Self>,
_peer_ports: &[u16],
) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError> {
unreachable!("not used in this test")
fn local_runtime() -> LocalRuntime<Self> {
LocalRuntime::new(
LocalProcess::custom(build_dummy_node, build_dummy_launch_spec)
.with_initial_nodes(build_dummy_initial_nodes),
LocalAccess::custom(dummy_endpoints).with_client(|_| Ok(())),
)
.with_lifecycle(LocalLifecycle::new().with_stable_readiness(dummy_wait_stable))
}
}
fn build_initial_node_configs(
_topology: &Self::Deployment,
) -> Result<Vec<NodeConfigEntry<<Self as Application>::NodeConfig>>, ProcessSpawnError> {
unreachable!("not used in this test")
}
fn build_dummy_node(
_context: LocalBuildContext<'_, DummyEnv>,
) -> Result<BuiltNodeConfig<DummyConfig>, DynError> {
unreachable!("not used in this test")
}
fn build_launch_spec(
_config: &<Self as Application>::NodeConfig,
_dir: &Path,
_label: &str,
) -> Result<crate::process::LaunchSpec, DynError> {
Ok(crate::process::LaunchSpec::default())
}
fn build_dummy_initial_nodes(
_topology: &DummyTopology,
) -> Result<Vec<NodeConfigEntry<DummyConfig>>, crate::process::ProcessSpawnError> {
unreachable!("not used in this test")
}
fn node_endpoints(
_config: &<Self as Application>::NodeConfig,
) -> Result<NodeEndpoints, DynError> {
Ok(NodeEndpoints::default())
}
fn build_dummy_launch_spec(
_config: &DummyConfig,
_dir: &std::path::Path,
_label: &str,
) -> Result<crate::process::LaunchSpec, DynError> {
Ok(crate::process::LaunchSpec::default())
}
fn node_client(_endpoints: &NodeEndpoints) -> Result<Self::NodeClient, DynError> {
Ok(())
}
fn dummy_endpoints(_config: &DummyConfig) -> Result<NodeEndpoints, DynError> {
Ok(NodeEndpoints::default())
}
async fn wait_readiness_stable(_nodes: &[Node<Self>]) -> Result<(), DynError> {
fn dummy_wait_stable<'a>(_nodes: &'a [Node<DummyEnv>]) -> runtime::LocalStableReadinessFuture<'a> {
Box::pin(async {
STABLE_CALLS.fetch_add(1, Ordering::SeqCst);
Ok(())
}
})
}
#[tokio::test]

View File

@ -2,7 +2,7 @@ use std::net::ToSocketAddrs;
use testing_framework_core::scenario::{DynError, ExternalNodeSource};
use crate::{LocalDeployerEnv, NodeEndpoints};
use crate::{LocalDeployerEnv, NodeEndpoints, env::node_client};
#[derive(Debug, thiserror::Error)]
pub enum ExternalClientBuildError {
@ -31,7 +31,7 @@ pub fn build_external_client<E: LocalDeployerEnv>(
let api = resolve_api_socket(source)?;
let mut endpoints = NodeEndpoints::default();
endpoints.api = api;
E::node_client(&endpoints)
node_client::<E>(&endpoints)
}
fn resolve_api_socket(source: &ExternalNodeSource) -> Result<std::net::SocketAddr, DynError> {

View File

@ -9,13 +9,13 @@ pub mod process;
pub use binary::{BinaryConfig, BinaryResolver};
pub use deployer::{ProcessDeployer, ProcessDeployerError};
pub use env::{
BuiltNodeConfig, LocalAccess, LocalBuildContext, LocalDeployerEnv, LocalNodePorts,
LocalPeerNode, LocalProcess, LocalProcessSpec, LocalRuntime, NodeConfigEntry,
build_indexed_http_peers, build_indexed_node_configs, build_local_cluster_node_config,
build_local_peer_nodes, cluster_node_config_from_context, default_yaml_launch_spec,
discovered_node_access, preallocate_ports, reserve_local_node_ports,
single_http_node_endpoints, text_config_launch_spec, text_node_config, yaml_config_launch_spec,
yaml_node_config,
BuiltNodeConfig, LocalAccess, LocalBuildContext, LocalDeployerEnv, LocalLifecycle,
LocalNodePorts, LocalPeerNode, LocalProcess, LocalProcessSpec, LocalRuntime,
LocalStableReadinessFuture, NodeConfigEntry, build_indexed_http_peers,
build_indexed_node_configs, build_local_cluster_node_config, build_local_peer_nodes,
cluster_node_config_from_context, default_yaml_launch_spec, discovered_node_access,
preallocate_ports, reserve_local_node_ports, single_http_node_endpoints,
text_config_launch_spec, text_node_config, yaml_config_launch_spec, yaml_node_config,
};
pub use manual::{ManualCluster, ManualClusterError};
pub use node_control::{NodeManager, NodeManagerError, NodeManagerSeed};

View File

@ -10,7 +10,11 @@ use testing_framework_core::scenario::{
use thiserror::Error;
use crate::{
env::{LocalDeployerEnv, Node, spawn_node_from_config},
env::{
LocalDeployerEnv, Node, build_initial_node_configs, build_node_from_template,
initial_persist_dir, initial_snapshot_dir, node_peer_port, readiness_endpoint_path,
spawn_node_from_config,
},
process::ProcessSpawnError,
};
@ -79,12 +83,12 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
descriptors: &E::Deployment,
keep_tempdir: bool,
) -> Result<Vec<Node<E>>, ProcessSpawnError> {
let configs = E::build_initial_node_configs(descriptors)?;
let configs = build_initial_node_configs::<E>(descriptors)?;
let mut spawned = Vec::with_capacity(configs.len());
for (index, config_entry) in configs.into_iter().enumerate() {
let persist_dir = E::initial_persist_dir(descriptors, &config_entry.name, index);
let snapshot_dir = E::initial_snapshot_dir(descriptors, &config_entry.name, index);
let persist_dir = initial_persist_dir::<E>(descriptors, &config_entry.name, index);
let snapshot_dir = initial_snapshot_dir::<E>(descriptors, &config_entry.name, index);
spawned.push(
spawn_node_from_config::<E>(
config_entry.name,
@ -174,7 +178,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
for (idx, node) in nodes.into_iter().enumerate() {
let name = default_node_label(idx);
let port = E::node_peer_port(&node);
let port = node_peer_port::<E>(&node);
let client = node.client();
self.node_clients.add_node(client.clone());
@ -201,7 +205,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
return Ok(());
}
wait_for_http_ports(&ports, E::readiness_endpoint_path()).await
wait_for_http_ports(&ports, readiness_endpoint_path::<E>()).await
}
pub async fn wait_node_ready(&self, name: &str) -> Result<(), NodeManagerError> {
@ -224,7 +228,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
})?
};
wait_for_http_ports(&[port], E::readiness_endpoint_path())
wait_for_http_ports(&[port], readiness_endpoint_path::<E>())
.await
.map_err(|source| NodeManagerError::Readiness { source })
}
@ -236,7 +240,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
) -> Result<StartedNode<E>, NodeManagerError> {
let snapshot = self.start_snapshot(name)?;
let mut built = E::build_node_config_from_template(
let mut built = build_node_from_template::<E>(
&self.descriptors,
snapshot.index,
&snapshot.peer_ports_by_name,