refactor(deployers): split simple app traits from advanced hooks

This commit is contained in:
andrussal 2026-04-11 06:42:28 +02:00
parent 1c2f734bca
commit c172714a2f
9 changed files with 975 additions and 1548 deletions

File diff suppressed because it is too large Load Diff

View File

@ -21,9 +21,9 @@ pub use docker::{
platform::host_gateway_entry,
};
pub use env::{
ComposeAccess, ComposeCfgsync, ComposeConfigContext, ComposeConfigServerMode, ComposeConfigs,
ComposeDeployEnv, ComposeNodeConfigFileName, ComposeNodes, ComposeReadinessProbe,
ComposeRuntime, ComposeStack, ConfigServerHandle, discovered_node_access,
ComposeBinaryApp, ComposeConfigServerMode, ComposeDeployEnv, ComposeNodeConfigFileName,
ComposeReadinessProbe, ConfigServerHandle, discovered_node_access,
write_registration_server_compose_configs,
};
pub use errors::ComposeRunnerError;
pub use infrastructure::{

View File

@ -308,307 +308,260 @@ where
}
}
type K8sPortSpecsBuilder<E> =
Box<dyn Fn(&<E as Application>::Deployment) -> PortSpecs + Send + Sync>;
type K8sPreparedStackBuilder<E> = Box<
dyn Fn(
&<E as Application>::Deployment,
Option<&Url>,
) -> Result<Box<dyn PreparedK8sStack>, DynError>
+ Send
+ Sync,
>;
type K8sClusterIdentifiers = Box<dyn Fn() -> (String, String) + Send + Sync>;
type K8sIndexedName = Box<dyn Fn(&str, usize) -> String + Send + Sync>;
type K8sAttachSelector = Box<dyn Fn(&str) -> String + Send + Sync>;
type K8sNodeClientsBuilder<E> = Box<
dyn Fn(&str, &[u16], &[u16]) -> Result<Vec<<E as Application>::NodeClient>, DynError>
+ Send
+ Sync,
>;
type K8sNodeBaseUrl<E> =
Box<dyn Fn(&<E as Application>::NodeClient) -> Option<String> + Send + Sync>;
type K8sCfgsyncService = Box<dyn Fn(&str) -> Option<(String, u16)> + Send + Sync>;
type K8sCfgsyncHostnames = Box<dyn Fn(&str, usize) -> Vec<String> + Send + Sync>;
type K8sCfgsyncOverrideBuilder<E> = Box<
dyn Fn(
&<E as Application>::Deployment,
usize,
&[String],
&testing_framework_core::scenario::StartNodeOptions<E>,
) -> Result<Option<ArtifactSet>, DynError>
+ Send
+ Sync,
>;
/// Advanced k8s deployer integration.
#[async_trait]
pub trait K8sDeployEnv: Application + Sized {
type Assets: PreparedK8sStack + Send + Sync + 'static;
pub struct K8sRuntime<E: Application> {
install: K8sInstall<E>,
access: K8sAccess<E>,
manual: K8sManual<E>,
}
fn collect_port_specs(topology: &Self::Deployment) -> PortSpecs;
pub struct K8sInstall<E: Application> {
collect_port_specs: K8sPortSpecsBuilder<E>,
prepare_stack: K8sPreparedStackBuilder<E>,
cluster_identifiers: K8sClusterIdentifiers,
node_deployment_name: K8sIndexedName,
node_service_name: K8sIndexedName,
attach_node_service_selector: K8sAttachSelector,
}
fn prepare_assets(
topology: &Self::Deployment,
metrics_otlp_ingest_url: Option<&Url>,
) -> Result<Self::Assets, DynError>;
pub struct K8sAccess<E: Application> {
build_node_clients: K8sNodeClientsBuilder<E>,
readiness_path: &'static str,
node_role: &'static str,
node_base_url: K8sNodeBaseUrl<E>,
}
pub struct K8sManual<E: Application> {
cfgsync_service: K8sCfgsyncService,
cfgsync_hostnames: Option<K8sCfgsyncHostnames>,
build_cfgsync_override_artifacts: K8sCfgsyncOverrideBuilder<E>,
}
impl<E: Application> K8sRuntime<E> {
#[must_use]
pub fn new(install: K8sInstall<E>) -> Self {
Self {
install,
access: K8sAccess::default(),
manual: K8sManual::default(),
}
async fn install_stack(
client: &Client,
assets: &Self::Assets,
namespace: &str,
release: &str,
nodes: usize,
) -> Result<RunnerCleanup, DynError>
where
Self::Assets: PreparedK8sStack,
{
assets.install(client, namespace, release, nodes).await
}
#[must_use]
pub fn with_access(mut self, access: K8sAccess<E>) -> Self {
self.access = access;
self
fn cluster_identifiers() -> (String, String) {
default_cluster_identifiers()
}
#[must_use]
pub fn with_manual(mut self, manual: K8sManual<E>) -> Self {
self.manual = manual;
self
fn node_client_from_ports(
host: &str,
api_port: u16,
auxiliary_port: u16,
) -> Result<Self::NodeClient, DynError> {
<Self as Application>::build_node_client(&discovered_node_access(
host,
api_port,
auxiliary_port,
))
}
fn build_node_clients(
host: &str,
node_api_ports: &[u16],
node_auxiliary_ports: &[u16],
) -> Result<Vec<Self::NodeClient>, DynError> {
node_api_ports
.iter()
.zip(node_auxiliary_ports.iter())
.map(|(&api_port, &auxiliary_port)| {
Self::node_client_from_ports(host, api_port, auxiliary_port)
})
.collect()
}
fn node_readiness_path() -> &'static str {
<Self as Application>::node_readiness_path()
}
async fn wait_remote_readiness(
_deployment: &Self::Deployment,
urls: &[Url],
requirement: HttpReadinessRequirement,
) -> Result<(), DynError> {
let readiness_urls: Vec<_> = urls
.iter()
.map(|url| {
let mut endpoint = url.clone();
endpoint.set_path(<Self as K8sDeployEnv>::node_readiness_path());
endpoint
})
.collect();
wait_http_readiness(&readiness_urls, requirement).await?;
Ok(())
}
fn node_role() -> &'static str {
"node"
}
fn node_deployment_name(release: &str, index: usize) -> String {
default_node_name(release, index)
}
fn node_service_name(release: &str, index: usize) -> String {
default_node_name(release, index)
}
fn attach_node_service_selector(release: &str) -> String {
default_attach_node_service_selector(release)
}
async fn wait_for_node_http(
ports: &[u16],
role: &'static str,
host: &str,
timeout: Duration,
poll_interval: Duration,
requirement: HttpReadinessRequirement,
) -> Result<(), DynError> {
let _ = role;
let _ = timeout;
let _ = poll_interval;
wait_for_http_ports_with_host_and_requirement(
ports,
host,
<Self as K8sDeployEnv>::node_readiness_path(),
requirement,
)
.await?;
Ok(())
}
fn node_base_url(_client: &Self::NodeClient) -> Option<String> {
None
}
fn cfgsync_service(_release: &str) -> Option<(String, u16)> {
None
}
fn cfgsync_hostnames(release: &str, node_count: usize) -> Vec<String> {
(0..node_count)
.map(|index| Self::node_service_name(release, index))
.collect()
}
fn build_cfgsync_override_artifacts(
_deployment: &Self::Deployment,
_node_index: usize,
_hostnames: &[String],
_options: &testing_framework_core::scenario::StartNodeOptions<Self>,
) -> Result<Option<ArtifactSet>, DynError> {
Ok(None)
}
}
impl<E> K8sRuntime<E>
/// Common binary+config k8s path.
pub trait K8sBinaryApp: Application + StaticNodeConfigProvider + Sized
where
E: Application + StaticNodeConfigProvider,
E::Deployment: DeploymentDescriptor,
Self::Deployment: DeploymentDescriptor,
{
#[must_use]
pub fn binary_config(spec: BinaryConfigK8sSpec) -> Self {
let prepare_spec = spec.clone();
let name_prefix = spec.node_name_prefix.clone();
let container_http_port = spec.container_http_port;
let service_testing_port = spec.service_testing_port;
fn k8s_binary_spec() -> BinaryConfigK8sSpec;
Self::new(
K8sInstall::new(
move |topology: &E::Deployment| {
standard_port_specs(
topology.node_count(),
container_http_port,
service_testing_port,
)
},
move |topology, _metrics_otlp_ingest_url| {
let assets =
render_binary_config_node_chart_assets::<E>(topology, &prepare_spec)?;
Ok(Box::new(assets) as Box<dyn PreparedK8sStack>)
},
)
.with_node_name_prefix(name_prefix),
fn extend_k8s_manifest(
_deployment: &Self::Deployment,
_manifest: &mut HelmManifest,
) -> Result<(), DynError> {
Ok(())
}
fn build_node_clients(
host: &str,
node_api_ports: &[u16],
node_auxiliary_ports: &[u16],
) -> Result<Vec<Self::NodeClient>, DynError> {
node_api_ports
.iter()
.zip(node_auxiliary_ports.iter())
.map(|(&api_port, &auxiliary_port)| {
<Self as Application>::build_node_client(&discovered_node_access(
host,
api_port,
auxiliary_port,
))
})
.collect()
}
fn node_role() -> &'static str {
"node"
}
fn node_base_url(_client: &Self::NodeClient) -> Option<String> {
None
}
}
#[async_trait]
impl<T> K8sDeployEnv for T
where
T: K8sBinaryApp,
T::Deployment: DeploymentDescriptor,
{
type Assets = RenderedHelmChartAssets;
fn collect_port_specs(topology: &Self::Deployment) -> PortSpecs {
let spec = T::k8s_binary_spec();
standard_port_specs(
topology.node_count(),
spec.container_http_port,
spec.service_testing_port,
)
}
}
impl<E: Application> K8sInstall<E> {
#[must_use]
pub fn new<FP, FA>(collect_port_specs: FP, prepare_stack: FA) -> Self
where
FP: Fn(&E::Deployment) -> PortSpecs + Send + Sync + 'static,
FA: Fn(&E::Deployment, Option<&Url>) -> Result<Box<dyn PreparedK8sStack>, DynError>
+ Send
+ Sync
+ 'static,
{
Self {
collect_port_specs: Box::new(collect_port_specs),
prepare_stack: Box::new(prepare_stack),
cluster_identifiers: Box::new(default_cluster_identifiers),
node_deployment_name: Box::new(default_node_name),
node_service_name: Box::new(default_node_name),
attach_node_service_selector: Box::new(default_attach_node_service_selector),
}
fn prepare_assets(
topology: &Self::Deployment,
_metrics_otlp_ingest_url: Option<&Url>,
) -> Result<Self::Assets, DynError> {
let spec = T::k8s_binary_spec();
let mut manifest = HelmManifest::new();
manifest.push_raw_yaml(&render_binary_config_node_manifest::<T>(topology, &spec)?);
T::extend_k8s_manifest(topology, &mut manifest)?;
render_manifest_chart_assets(
&spec.chart_name,
&format!("{}.yaml", spec.chart_name),
&manifest,
)
}
#[must_use]
pub fn with_cluster_identifiers<F>(mut self, cluster_identifiers: F) -> Self
where
F: Fn() -> (String, String) + Send + Sync + 'static,
{
self.cluster_identifiers = Box::new(cluster_identifiers);
self
fn build_node_clients(
host: &str,
node_api_ports: &[u16],
node_auxiliary_ports: &[u16],
) -> Result<Vec<Self::NodeClient>, DynError> {
T::build_node_clients(host, node_api_ports, node_auxiliary_ports)
}
#[must_use]
pub fn with_node_name_prefix(mut self, prefix: impl Into<String>) -> Self {
let prefix = prefix.into();
self.node_deployment_name = Box::new(named_resource(prefix.clone()));
self.node_service_name = Box::new(named_resource(prefix));
self
fn node_role() -> &'static str {
T::node_role()
}
#[must_use]
pub fn with_resource_names<FD, FS>(
mut self,
node_deployment_name: FD,
node_service_name: FS,
) -> Self
where
FD: Fn(&str, usize) -> String + Send + Sync + 'static,
FS: Fn(&str, usize) -> String + Send + Sync + 'static,
{
self.node_deployment_name = Box::new(node_deployment_name);
self.node_service_name = Box::new(node_service_name);
self
fn node_base_url(client: &Self::NodeClient) -> Option<String> {
T::node_base_url(client)
}
#[must_use]
pub fn with_attach_node_service_selector<F>(mut self, attach_node_service_selector: F) -> Self
where
F: Fn(&str) -> String + Send + Sync + 'static,
{
self.attach_node_service_selector = Box::new(attach_node_service_selector);
self
}
}
impl<E: Application> Default for K8sAccess<E> {
fn default() -> Self {
Self {
build_node_clients: Box::new(default_build_node_clients::<E>),
readiness_path: E::node_readiness_path(),
node_role: "node",
node_base_url: Box::new(|_client| None),
}
}
}
impl<E: Application> K8sAccess<E> {
#[must_use]
pub fn new() -> Self {
Self::default()
fn node_deployment_name(_release: &str, index: usize) -> String {
format!("{}-{index}", T::k8s_binary_spec().node_name_prefix)
}
#[must_use]
pub fn with_node_clients<F>(mut self, build_node_clients: F) -> Self
where
F: Fn(&str, &[u16], &[u16]) -> Result<Vec<E::NodeClient>, DynError> + Send + Sync + 'static,
{
self.build_node_clients = Box::new(build_node_clients);
self
fn node_service_name(_release: &str, index: usize) -> String {
format!("{}-{index}", T::k8s_binary_spec().node_name_prefix)
}
#[must_use]
pub fn with_readiness_path(mut self, readiness_path: &'static str) -> Self {
self.readiness_path = readiness_path;
self
}
#[must_use]
pub fn with_node_role(mut self, node_role: &'static str) -> Self {
self.node_role = node_role;
self
}
#[must_use]
pub fn with_node_base_url<F>(mut self, node_base_url: F) -> Self
where
F: Fn(&E::NodeClient) -> Option<String> + Send + Sync + 'static,
{
self.node_base_url = Box::new(node_base_url);
self
}
}
impl<E: Application> Default for K8sManual<E> {
fn default() -> Self {
Self {
cfgsync_service: Box::new(|_release| None),
cfgsync_hostnames: None,
build_cfgsync_override_artifacts: Box::new(
|_topology, _node_index, _hostnames, _options| Ok(None),
),
}
}
}
impl<E: Application> K8sManual<E> {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn with_cfgsync_service<F>(mut self, cfgsync_service: F) -> Self
where
F: Fn(&str) -> Option<(String, u16)> + Send + Sync + 'static,
{
self.cfgsync_service = Box::new(cfgsync_service);
self
}
#[must_use]
pub fn with_cfgsync_hostnames<F>(mut self, cfgsync_hostnames: F) -> Self
where
F: Fn(&str, usize) -> Vec<String> + Send + Sync + 'static,
{
self.cfgsync_hostnames = Some(Box::new(cfgsync_hostnames));
self
}
#[must_use]
pub fn with_cfgsync_override_artifacts<F>(mut self, build_override_artifacts: F) -> Self
where
F: Fn(
&E::Deployment,
usize,
&[String],
&testing_framework_core::scenario::StartNodeOptions<E>,
) -> Result<Option<ArtifactSet>, DynError>
+ Send
+ Sync
+ 'static,
{
self.build_cfgsync_override_artifacts = Box::new(build_override_artifacts);
self
}
}
pub trait K8sDeployEnv: Application + Sized {
fn k8s_runtime() -> K8sRuntime<Self>;
}
pub(crate) fn runtime_for<E: K8sDeployEnv>() -> K8sRuntime<E> {
E::k8s_runtime()
}
pub(crate) fn collect_port_specs<E: K8sDeployEnv>(deployment: &E::Deployment) -> PortSpecs {
(runtime_for::<E>().install.collect_port_specs)(deployment)
E::collect_port_specs(deployment)
}
pub(crate) fn prepare_stack<E: K8sDeployEnv>(
deployment: &E::Deployment,
metrics_otlp_ingest_url: Option<&Url>,
) -> Result<Box<dyn PreparedK8sStack>, DynError> {
(runtime_for::<E>().install.prepare_stack)(deployment, metrics_otlp_ingest_url)
) -> Result<Box<dyn PreparedK8sStack>, DynError>
where
E::Assets: PreparedK8sStack + 'static,
{
Ok(Box::new(E::prepare_assets(
deployment,
metrics_otlp_ingest_url,
)?))
}
pub(crate) fn cluster_identifiers<E: K8sDeployEnv>() -> (String, String) {
(runtime_for::<E>().install.cluster_identifiers)()
E::cluster_identifiers()
}
pub(crate) fn build_node_clients<E: K8sDeployEnv>(
@ -616,44 +569,35 @@ pub(crate) fn build_node_clients<E: K8sDeployEnv>(
node_api_ports: &[u16],
node_auxiliary_ports: &[u16],
) -> Result<Vec<E::NodeClient>, DynError> {
(runtime_for::<E>().access.build_node_clients)(host, node_api_ports, node_auxiliary_ports)
E::build_node_clients(host, node_api_ports, node_auxiliary_ports)
}
pub(crate) fn node_readiness_path<E: K8sDeployEnv>() -> &'static str {
runtime_for::<E>().access.readiness_path
<E as K8sDeployEnv>::node_readiness_path()
}
pub(crate) async fn wait_remote_readiness<E: K8sDeployEnv>(
_deployment: &E::Deployment,
deployment: &E::Deployment,
urls: &[Url],
requirement: HttpReadinessRequirement,
) -> Result<(), DynError> {
let readiness_urls: Vec<_> = urls
.iter()
.map(|url| {
let mut endpoint = url.clone();
endpoint.set_path(node_readiness_path::<E>());
endpoint
})
.collect();
wait_http_readiness(&readiness_urls, requirement).await?;
Ok(())
E::wait_remote_readiness(deployment, urls, requirement).await
}
pub(crate) fn node_role<E: K8sDeployEnv>() -> &'static str {
runtime_for::<E>().access.node_role
E::node_role()
}
pub(crate) fn node_deployment_name<E: K8sDeployEnv>(release: &str, index: usize) -> String {
(runtime_for::<E>().install.node_deployment_name)(release, index)
E::node_deployment_name(release, index)
}
pub(crate) fn node_service_name<E: K8sDeployEnv>(release: &str, index: usize) -> String {
(runtime_for::<E>().install.node_service_name)(release, index)
E::node_service_name(release, index)
}
pub(crate) fn attach_node_service_selector<E: K8sDeployEnv>(release: &str) -> String {
(runtime_for::<E>().install.attach_node_service_selector)(release)
E::attach_node_service_selector(release)
}
pub(crate) async fn wait_for_node_http<E: K8sDeployEnv>(
@ -664,36 +608,19 @@ pub(crate) async fn wait_for_node_http<E: K8sDeployEnv>(
poll_interval: Duration,
requirement: HttpReadinessRequirement,
) -> Result<(), DynError> {
let _ = role;
let _ = timeout;
let _ = poll_interval;
wait_for_http_ports_with_host_and_requirement(
ports,
host,
node_readiness_path::<E>(),
requirement,
)
.await?;
Ok(())
E::wait_for_node_http(ports, role, host, timeout, poll_interval, requirement).await
}
pub(crate) fn node_base_url<E: K8sDeployEnv>(client: &E::NodeClient) -> Option<String> {
(runtime_for::<E>().access.node_base_url)(client)
E::node_base_url(client)
}
pub(crate) fn cfgsync_service<E: K8sDeployEnv>(release: &str) -> Option<(String, u16)> {
(runtime_for::<E>().manual.cfgsync_service)(release)
E::cfgsync_service(release)
}
pub(crate) fn cfgsync_hostnames<E: K8sDeployEnv>(release: &str, node_count: usize) -> Vec<String> {
let runtime = runtime_for::<E>();
if let Some(cfgsync_hostnames) = runtime.manual.cfgsync_hostnames {
return cfgsync_hostnames(release, node_count);
}
(0..node_count)
.map(|index| (runtime.install.node_service_name)(release, index))
.collect()
E::cfgsync_hostnames(release, node_count)
}
pub(crate) fn build_cfgsync_override_artifacts<E: K8sDeployEnv>(
@ -702,9 +629,7 @@ pub(crate) fn build_cfgsync_override_artifacts<E: K8sDeployEnv>(
hostnames: &[String],
options: &testing_framework_core::scenario::StartNodeOptions<E>,
) -> Result<Option<ArtifactSet>, DynError> {
(runtime_for::<E>().manual.build_cfgsync_override_artifacts)(
deployment, node_index, hostnames, options,
)
E::build_cfgsync_override_artifacts(deployment, node_index, hostnames, options)
}
fn default_cluster_identifiers() -> (String, String) {
@ -716,24 +641,6 @@ fn default_cluster_identifiers() -> (String, String) {
(format!("tf-testnet-{suffix}"), String::from("tf-runner"))
}
fn default_build_node_clients<E: Application>(
host: &str,
node_api_ports: &[u16],
node_auxiliary_ports: &[u16],
) -> Result<Vec<E::NodeClient>, DynError> {
node_api_ports
.iter()
.zip(node_auxiliary_ports.iter())
.map(|(&api_port, &auxiliary_port)| {
<E as Application>::build_node_client(&discovered_node_access(
host,
api_port,
auxiliary_port,
))
})
.collect()
}
fn default_node_name(release: &str, index: usize) -> String {
format!("{release}-node-{index}")
}
@ -741,7 +648,3 @@ fn default_node_name(release: &str, index: usize) -> String {
fn default_attach_node_service_selector(release: &str) -> String {
format!("app.kubernetes.io/instance={release}")
}
fn named_resource(prefix: String) -> impl Fn(&str, usize) -> String + Send + Sync + 'static {
move |_release, index| format!("{prefix}-{index}")
}

View File

@ -23,8 +23,8 @@ pub(crate) fn ensure_rustls_provider_installed() {
pub use deployer::{K8sDeployer, K8sDeploymentMetadata, K8sRunnerError};
pub use env::{
BinaryConfigK8sSpec, HelmManifest, HelmReleaseAssets, K8sAccess, K8sDeployEnv, K8sInstall,
K8sManual, K8sRuntime, PreparedK8sStack, RenderedHelmChartAssets, discovered_node_access,
BinaryConfigK8sSpec, HelmManifest, HelmReleaseAssets, K8sBinaryApp, K8sDeployEnv,
PreparedK8sStack, RenderedHelmChartAssets, discovered_node_access,
install_helm_release_with_cleanup, render_binary_config_node_chart_assets,
render_binary_config_node_manifest, render_manifest_chart_assets,
render_single_template_chart_assets, standard_port_specs,

View File

@ -684,25 +684,33 @@ mod tests {
#[async_trait::async_trait]
impl K8sDeployEnv for DummyEnv {
fn k8s_runtime() -> crate::env::K8sRuntime<Self> {
crate::env::K8sRuntime::new(crate::env::K8sInstall::new(
|_topology| standard_port_specs(1, 8080, 8081),
|_topology, _metrics_otlp_ingest_url| {
let assets: RenderedHelmChartAssets =
render_single_template_chart_assets("dummy", "dummy.yaml", "")?;
Ok(Box::new(assets) as Box<dyn crate::env::PreparedK8sStack>)
},
))
.with_manual(
crate::env::K8sManual::new()
.with_cfgsync_service(|release| Some((format!("{release}-cfgsync"), 4400)))
.with_cfgsync_override_artifacts(|topology, node_index, hostnames, options| {
build_node_artifact_override::<Self>(
topology, node_index, hostnames, options,
)
.map_err(Into::into)
}),
)
type Assets = RenderedHelmChartAssets;
fn collect_port_specs(
_topology: &Self::Deployment,
) -> crate::infrastructure::cluster::PortSpecs {
standard_port_specs(1, 8080, 8081)
}
fn prepare_assets(
_topology: &Self::Deployment,
_metrics_otlp_ingest_url: Option<&reqwest::Url>,
) -> Result<Self::Assets, DynError> {
render_single_template_chart_assets("dummy", "dummy.yaml", "")
}
fn cfgsync_service(release: &str) -> Option<(String, u16)> {
Some((format!("{release}-cfgsync"), 4400))
}
fn build_cfgsync_override_artifacts(
topology: &Self::Deployment,
node_index: usize,
hostnames: &[String],
options: &testing_framework_core::scenario::StartNodeOptions<Self>,
) -> Result<Option<cfgsync_artifacts::ArtifactSet>, DynError> {
build_node_artifact_override::<Self>(topology, node_index, hostnames, options)
.map_err(Into::into)
}
}

View File

@ -1,5 +1,12 @@
use std::{
collections::HashMap,
net::{Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
};
use async_trait::async_trait;
use testing_framework_core::scenario::{
Application, DynError, HttpReadinessRequirement, ReadinessError,
Application, DynError, HttpReadinessRequirement, ReadinessError, StartNodeOptions,
wait_for_http_ports_with_requirement,
};
@ -9,7 +16,6 @@ use crate::{
};
mod helpers;
mod runtime;
#[cfg(test)]
mod tests;
@ -20,108 +26,405 @@ pub use helpers::{
reserve_local_node_ports, single_http_node_endpoints, text_config_launch_spec,
text_node_config, yaml_config_launch_spec, yaml_node_config,
};
pub use runtime::{
LocalAccess, LocalBuildContext, LocalLifecycle, LocalProcess, LocalRuntime,
LocalStableReadinessFuture, cluster_node_config_from_context,
};
/// Context passed while building a local node config.
pub struct LocalBuildContext<'a, E: Application> {
pub topology: &'a E::Deployment,
pub index: usize,
pub ports: &'a LocalNodePorts,
pub peers: &'a [LocalPeerNode],
pub peer_ports: &'a [u16],
pub peer_ports_by_name: &'a HashMap<String, u16>,
pub options: &'a StartNodeOptions<E>,
pub template_config: Option<&'a E::NodeConfig>,
}
/// Spawned local process node for a concrete application environment.
pub type Node<E> = ProcessNode<<E as Application>::NodeConfig, <E as Application>::NodeClient>;
/// Advanced local deployer integration.
///
/// This is the full-control path. It exposes runner-facing hooks directly and
/// is intended for applications that need custom startup, endpoint discovery,
/// or lifecycle behavior.
#[async_trait]
pub trait LocalDeployerEnv: Application + Sized
where
<Self as Application>::NodeConfig: Clone + Send + Sync + 'static,
{
fn local_runtime() -> LocalRuntime<Self>;
fn local_port_names() -> &'static [&'static str] {
Self::initial_local_port_names()
}
fn build_node_config(
topology: &Self::Deployment,
index: usize,
peer_ports_by_name: &HashMap<String, u16>,
options: &StartNodeOptions<Self>,
peer_ports: &[u16],
) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError> {
Self::build_node_config_from_template(
topology,
index,
peer_ports_by_name,
options,
peer_ports,
None,
)
}
fn build_node_config_from_template(
topology: &Self::Deployment,
index: usize,
peer_ports_by_name: &HashMap<String, u16>,
options: &StartNodeOptions<Self>,
peer_ports: &[u16],
template_config: Option<&<Self as Application>::NodeConfig>,
) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError> {
let mut reserved = reserve_local_node_ports(1, Self::local_port_names(), "node")
.map_err(|source| -> DynError { source.into() })?;
let ports = reserved
.pop()
.ok_or_else(|| std::io::Error::other("failed to reserve local node ports"))?;
let network_port = ports.network_port();
let config = Self::build_local_node_config(
topology,
index,
&ports,
peer_ports_by_name,
options,
peer_ports,
template_config,
)?;
Ok(BuiltNodeConfig {
config,
network_port,
})
}
fn build_initial_node_configs(
topology: &Self::Deployment,
) -> Result<Vec<NodeConfigEntry<<Self as Application>::NodeConfig>>, ProcessSpawnError> {
helpers::build_generated_initial_nodes::<Self>(
topology,
Self::initial_node_name_prefix(),
Self::initial_local_port_names(),
|context| {
Self::build_node_config_from_template(
context.topology,
context.index,
context.peer_ports_by_name,
context.options,
context.peer_ports,
context.template_config,
)
},
)
}
fn initial_node_name_prefix() -> &'static str {
"node"
}
fn initial_local_port_names() -> &'static [&'static str] {
&[]
}
fn build_initial_node_config(
topology: &Self::Deployment,
index: usize,
ports: &LocalNodePorts,
peer_ports: &[u16],
) -> Result<<Self as Application>::NodeConfig, DynError> {
let peer_ports = helpers::compact_peer_ports(peer_ports, index);
let peer_ports_by_name = HashMap::new();
let options = StartNodeOptions::<Self>::default();
Self::build_local_node_config(
topology,
index,
ports,
&peer_ports_by_name,
&options,
&peer_ports,
None,
)
}
fn build_local_node_config(
topology: &Self::Deployment,
index: usize,
ports: &LocalNodePorts,
peer_ports_by_name: &HashMap<String, u16>,
options: &StartNodeOptions<Self>,
peer_ports: &[u16],
template_config: Option<&<Self as Application>::NodeConfig>,
) -> Result<<Self as Application>::NodeConfig, DynError> {
let peers = build_local_peer_nodes(peer_ports, index);
Self::build_local_node_config_with_peers(
topology,
index,
ports,
&peers,
peer_ports_by_name,
options,
template_config,
)
}
fn build_local_node_config_with_peers(
_topology: &Self::Deployment,
_index: usize,
_ports: &LocalNodePorts,
_peers: &[LocalPeerNode],
_peer_ports_by_name: &HashMap<String, u16>,
_options: &StartNodeOptions<Self>,
_template_config: Option<&<Self as Application>::NodeConfig>,
) -> Result<<Self as Application>::NodeConfig, DynError> {
Err(std::io::Error::other(
"build_local_node_config_with_peers is not implemented for this app",
)
.into())
}
fn initial_persist_dir(
_topology: &Self::Deployment,
_node_name: &str,
_index: usize,
) -> Option<PathBuf> {
None
}
fn initial_snapshot_dir(
_topology: &Self::Deployment,
_node_name: &str,
_index: usize,
) -> Option<PathBuf> {
None
}
fn local_process_spec() -> Option<LocalProcessSpec> {
None
}
fn render_local_config(
_config: &<Self as Application>::NodeConfig,
) -> Result<Vec<u8>, DynError> {
Err(std::io::Error::other("render_local_config is not implemented for this app").into())
}
fn build_launch_spec(
config: &<Self as Application>::NodeConfig,
_dir: &Path,
_label: &str,
) -> Result<LaunchSpec, DynError> {
let spec = Self::local_process_spec().ok_or_else(|| {
std::io::Error::other("build_launch_spec is not implemented for this app")
})?;
let rendered = Self::render_local_config(config)?;
helpers::rendered_config_launch_spec(rendered, &spec)
}
fn http_api_port(_config: &<Self as Application>::NodeConfig) -> Option<u16> {
None
}
fn node_endpoints(
config: &<Self as Application>::NodeConfig,
) -> Result<NodeEndpoints, DynError> {
if let Some(port) = Self::http_api_port(config) {
return Ok(NodeEndpoints {
api: SocketAddr::from((Ipv4Addr::LOCALHOST, port)),
extra_ports: HashMap::new(),
});
}
Err(std::io::Error::other("node_endpoints is not implemented for this app").into())
}
fn node_peer_port(node: &Node<Self>) -> u16 {
node.endpoints().api.port()
}
fn node_client_from_api_endpoint(_api: SocketAddr) -> Option<Self::NodeClient> {
None
}
fn node_client(endpoints: &NodeEndpoints) -> Result<Self::NodeClient, DynError> {
if let Ok(client) =
<Self as Application>::build_node_client(&discovered_node_access(endpoints))
{
return Ok(client);
}
if let Some(client) = Self::node_client_from_api_endpoint(endpoints.api) {
return Ok(client);
}
Err(std::io::Error::other("node_client is not implemented for this app").into())
}
fn readiness_endpoint_path() -> &'static str {
<Self as Application>::node_readiness_path()
}
async fn wait_readiness_stable(_nodes: &[Node<Self>]) -> Result<(), DynError> {
Ok(())
}
}
pub(crate) fn runtime_for<E: LocalDeployerEnv>() -> LocalRuntime<E> {
E::local_runtime()
/// Common local binary-app path.
///
/// This is the compact path for apps that:
/// - launch one local binary per node
/// - materialize one config file per node
/// - expose an HTTP API port used for readiness and discovery
#[async_trait]
pub trait LocalBinaryApp: Application + Sized
where
<Self as Application>::NodeConfig: Clone + Send + Sync + 'static,
{
fn initial_node_name_prefix() -> &'static str;
fn initial_local_port_names() -> &'static [&'static str] {
&[]
}
fn build_local_node_config_with_peers(
topology: &Self::Deployment,
index: usize,
ports: &LocalNodePorts,
peers: &[LocalPeerNode],
peer_ports_by_name: &HashMap<String, u16>,
options: &StartNodeOptions<Self>,
template_config: Option<&<Self as Application>::NodeConfig>,
) -> Result<<Self as Application>::NodeConfig, DynError>;
fn local_process_spec() -> LocalProcessSpec;
fn render_local_config(config: &<Self as Application>::NodeConfig)
-> Result<Vec<u8>, DynError>;
fn http_api_port(config: &<Self as Application>::NodeConfig) -> u16;
fn readiness_endpoint_path() -> &'static str {
<Self as Application>::node_readiness_path()
}
async fn wait_readiness_stable(_nodes: &[Node<Self>]) -> Result<(), DynError> {
Ok(())
}
}
#[async_trait]
impl<T> LocalDeployerEnv for T
where
T: LocalBinaryApp,
<T as Application>::NodeConfig: Clone + Send + Sync + 'static,
{
fn initial_node_name_prefix() -> &'static str {
T::initial_node_name_prefix()
}
fn initial_local_port_names() -> &'static [&'static str] {
T::initial_local_port_names()
}
fn build_local_node_config_with_peers(
topology: &Self::Deployment,
index: usize,
ports: &LocalNodePorts,
peers: &[LocalPeerNode],
peer_ports_by_name: &HashMap<String, u16>,
options: &StartNodeOptions<Self>,
template_config: Option<&<Self as Application>::NodeConfig>,
) -> Result<<Self as Application>::NodeConfig, DynError> {
T::build_local_node_config_with_peers(
topology,
index,
ports,
peers,
peer_ports_by_name,
options,
template_config,
)
}
fn local_process_spec() -> Option<LocalProcessSpec> {
Some(T::local_process_spec())
}
fn render_local_config(
config: &<Self as Application>::NodeConfig,
) -> Result<Vec<u8>, DynError> {
T::render_local_config(config)
}
fn http_api_port(config: &<Self as Application>::NodeConfig) -> Option<u16> {
Some(T::http_api_port(config))
}
fn readiness_endpoint_path() -> &'static str {
T::readiness_endpoint_path()
}
async fn wait_readiness_stable(nodes: &[Node<Self>]) -> Result<(), DynError> {
T::wait_readiness_stable(nodes).await
}
}
pub(crate) fn build_node_from_template<E: LocalDeployerEnv>(
topology: &E::Deployment,
index: usize,
peer_ports_by_name: &std::collections::HashMap<String, u16>,
options: &testing_framework_core::scenario::StartNodeOptions<E>,
peer_ports_by_name: &HashMap<String, u16>,
options: &StartNodeOptions<E>,
peer_ports: &[u16],
template_config: Option<&E::NodeConfig>,
) -> Result<BuiltNodeConfig<E::NodeConfig>, DynError> {
let runtime = runtime_for::<E>();
let mut reserved = reserve_local_node_ports(1, runtime.process.port_names, "node")
.map_err(|source| -> DynError { source.into() })?;
let ports = reserved
.pop()
.ok_or_else(|| std::io::Error::other("failed to reserve local node ports"))?;
let peers = build_local_peer_nodes(peer_ports, index);
runtime.process.build_node(LocalBuildContext {
E::build_node_config_from_template(
topology,
index,
ports: &ports,
peers: &peers,
peer_ports,
peer_ports_by_name,
options,
peer_ports,
template_config,
})
)
}
pub(crate) fn build_initial_node_configs<E: LocalDeployerEnv>(
topology: &E::Deployment,
) -> Result<Vec<NodeConfigEntry<E::NodeConfig>>, ProcessSpawnError> {
runtime_for::<E>().process.build_initial_nodes(topology)
E::build_initial_node_configs(topology)
}
pub(crate) fn initial_persist_dir<E: LocalDeployerEnv>(
topology: &E::Deployment,
node_name: &str,
index: usize,
) -> Option<std::path::PathBuf> {
runtime_for::<E>()
.lifecycle
.initial_persist_dir(topology, node_name, index)
) -> Option<PathBuf> {
E::initial_persist_dir(topology, node_name, index)
}
pub(crate) fn initial_snapshot_dir<E: LocalDeployerEnv>(
topology: &E::Deployment,
node_name: &str,
index: usize,
) -> Option<std::path::PathBuf> {
runtime_for::<E>()
.lifecycle
.initial_snapshot_dir(topology, node_name, index)
}
pub(crate) fn build_launch_spec<E: LocalDeployerEnv>(
config: &E::NodeConfig,
dir: &std::path::Path,
label: &str,
) -> Result<LaunchSpec, DynError> {
runtime_for::<E>()
.process
.build_launch_spec(config, dir, label)
}
pub(crate) fn node_endpoints<E: LocalDeployerEnv>(
config: &E::NodeConfig,
) -> Result<NodeEndpoints, DynError> {
runtime_for::<E>().access.node_endpoints(config)
) -> Option<PathBuf> {
E::initial_snapshot_dir(topology, node_name, index)
}
pub(crate) fn node_client<E: LocalDeployerEnv>(
endpoints: &NodeEndpoints,
) -> Result<E::NodeClient, DynError> {
runtime_for::<E>().access.node_client(endpoints)
E::node_client(endpoints)
}
pub(crate) fn node_peer_port<E: LocalDeployerEnv>(node: &Node<E>) -> u16 {
runtime_for::<E>()
.access
.node_peer_port(node.config(), node.endpoints())
E::node_peer_port(node)
}
pub(crate) fn readiness_endpoint_path<E: LocalDeployerEnv>() -> &'static str {
runtime_for::<E>().access.readiness_path()
E::readiness_endpoint_path()
}
pub async fn wait_local_http_readiness<E: LocalDeployerEnv>(
@ -133,12 +436,9 @@ pub async fn wait_local_http_readiness<E: LocalDeployerEnv>(
.map(|node| node.endpoints().api.port())
.collect();
wait_for_http_ports_with_requirement(&ports, readiness_endpoint_path::<E>(), requirement)
.await?;
wait_for_http_ports_with_requirement(&ports, E::readiness_endpoint_path(), requirement).await?;
runtime_for::<E>()
.lifecycle
.wait_stable(nodes)
E::wait_readiness_stable(nodes)
.await
.map_err(|source| ReadinessError::ClusterStable { source })
}
@ -153,12 +453,12 @@ pub async fn spawn_node_from_config<E: LocalDeployerEnv>(
ProcessNode::spawn(
&label,
config,
build_launch_spec::<E>,
node_endpoints::<E>,
E::build_launch_spec,
E::node_endpoints,
keep_tempdir,
persist_dir,
snapshot_dir,
node_client::<E>,
E::node_client,
)
.await
}

View File

@ -1,438 +0,0 @@
use std::{
collections::HashMap,
future::Future,
path::{Path, PathBuf},
pin::Pin,
};
use serde::Serialize;
use testing_framework_core::scenario::{
Application, ClusterNodeConfigApplication, DynError, NodeAccess, StartNodeOptions,
};
use crate::{
env::{
BuiltNodeConfig, LocalNodePorts, LocalPeerNode, LocalProcessSpec, Node, NodeConfigEntry,
NodeEndpoints, build_local_cluster_node_config, discovered_node_access, yaml_node_config,
},
process::{LaunchEnvVar, LaunchSpec, ProcessSpawnError},
};
pub struct LocalBuildContext<'a, E: Application> {
pub topology: &'a E::Deployment,
pub index: usize,
pub ports: &'a LocalNodePorts,
pub peers: &'a [LocalPeerNode],
pub peer_ports: &'a [u16],
pub peer_ports_by_name: &'a HashMap<String, u16>,
pub options: &'a StartNodeOptions<E>,
pub template_config: Option<&'a E::NodeConfig>,
}
pub type LocalConfigBuilder<E> =
for<'a> fn(LocalBuildContext<'a, E>) -> Result<<E as Application>::NodeConfig, DynError>;
pub type LocalDynamicNodeBuilder<E> =
for<'a> fn(
LocalBuildContext<'a, E>,
) -> Result<BuiltNodeConfig<<E as Application>::NodeConfig>, DynError>;
pub type LocalConfigRenderer<E> = fn(&<E as Application>::NodeConfig) -> Result<Vec<u8>, DynError>;
pub type LocalInitialNodesBuilder<E> =
fn(
&<E as Application>::Deployment,
) -> Result<Vec<NodeConfigEntry<<E as Application>::NodeConfig>>, ProcessSpawnError>;
pub type LocalLaunchSpecBuilder<E> =
fn(&<E as Application>::NodeConfig, &Path, &str) -> Result<LaunchSpec, DynError>;
pub type LocalApiPort<E> = fn(&<E as Application>::NodeConfig) -> u16;
pub type LocalEndpoints<E> = fn(&<E as Application>::NodeConfig) -> Result<NodeEndpoints, DynError>;
pub type LocalClientBuilder<E> =
fn(&NodeAccess) -> Result<<E as Application>::NodeClient, DynError>;
pub type LocalPeerPort<E> = fn(&<E as Application>::NodeConfig, &NodeEndpoints) -> u16;
pub type LocalPersistDir<E> = fn(&<E as Application>::Deployment, &str, usize) -> Option<PathBuf>;
pub type LocalSnapshotDir<E> = fn(&<E as Application>::Deployment, &str, usize) -> Option<PathBuf>;
pub type LocalStableReadinessFuture<'a> =
Pin<Box<dyn Future<Output = Result<(), DynError>> + Send + 'a>>;
pub type LocalStableReadiness<E> = for<'a> fn(&'a [Node<E>]) -> LocalStableReadinessFuture<'a>;
#[derive(Clone)]
enum LocalDynamicNode<E: Application> {
Standard { build_config: LocalConfigBuilder<E> },
Custom(LocalDynamicNodeBuilder<E>),
}
impl<E: Application> LocalDynamicNode<E> {
fn build(
&self,
context: LocalBuildContext<'_, E>,
) -> Result<BuiltNodeConfig<E::NodeConfig>, DynError> {
match self {
Self::Standard { build_config } => {
let network_port = context.ports.network_port();
Ok(BuiltNodeConfig {
config: build_config(context)?,
network_port,
})
}
Self::Custom(build) => build(context),
}
}
}
#[derive(Clone)]
enum LocalInitialNodes<E: Application> {
Generated,
Custom(LocalInitialNodesBuilder<E>),
}
#[derive(Clone)]
enum LocalLaunch<E: Application> {
Rendered {
spec: LocalProcessSpec,
render_config: LocalConfigRenderer<E>,
},
Custom(LocalLaunchSpecBuilder<E>),
}
#[derive(Clone)]
pub struct LocalProcess<E: Application> {
pub(crate) node_name_prefix: &'static str,
pub(crate) port_names: &'static [&'static str],
dynamic_node: LocalDynamicNode<E>,
initial_nodes: LocalInitialNodes<E>,
launch: LocalLaunch<E>,
}
impl<E: Application> LocalProcess<E> {
#[must_use]
pub fn new(
binary_env_var: &'static str,
binary_name: &'static str,
build_config: LocalConfigBuilder<E>,
render_config: LocalConfigRenderer<E>,
) -> Self {
Self {
node_name_prefix: "node",
port_names: &[],
dynamic_node: LocalDynamicNode::Standard { build_config },
initial_nodes: LocalInitialNodes::Generated,
launch: LocalLaunch::Rendered {
spec: LocalProcessSpec::new(binary_env_var, binary_name),
render_config,
},
}
}
#[must_use]
pub fn custom(
build_node: LocalDynamicNodeBuilder<E>,
build_launch_spec: LocalLaunchSpecBuilder<E>,
) -> Self {
Self {
node_name_prefix: "node",
port_names: &[],
dynamic_node: LocalDynamicNode::Custom(build_node),
initial_nodes: LocalInitialNodes::Generated,
launch: LocalLaunch::Custom(build_launch_spec),
}
}
#[must_use]
pub fn with_node_name_prefix(mut self, value: &'static str) -> Self {
self.node_name_prefix = value;
self
}
#[must_use]
pub fn with_port_names(mut self, value: &'static [&'static str]) -> Self {
self.port_names = value;
self
}
#[must_use]
pub fn with_initial_nodes(mut self, build_initial_nodes: LocalInitialNodesBuilder<E>) -> Self {
self.initial_nodes = LocalInitialNodes::Custom(build_initial_nodes);
self
}
#[must_use]
pub fn with_config_file(mut self, file_name: &str, arg: &str) -> Self {
if let LocalLaunch::Rendered { spec, .. } = &mut self.launch {
*spec = spec.clone().with_config_file(file_name, arg);
}
self
}
#[must_use]
pub fn with_env(mut self, key: &str, value: &str) -> Self {
if let LocalLaunch::Rendered { spec, .. } = &mut self.launch {
*spec = spec.clone().with_env(key, value);
}
self
}
#[must_use]
pub fn with_rust_log(mut self, value: &str) -> Self {
if let LocalLaunch::Rendered { spec, .. } = &mut self.launch {
*spec = spec.clone().with_rust_log(value);
}
self
}
#[must_use]
pub fn with_args(mut self, args: impl IntoIterator<Item = String>) -> Self {
if let LocalLaunch::Rendered { spec, .. } = &mut self.launch {
*spec = spec.clone().with_args(args);
}
self
}
#[must_use]
pub fn with_launch_env(mut self, vars: impl IntoIterator<Item = LaunchEnvVar>) -> Self {
if let LocalLaunch::Rendered { spec, .. } = &mut self.launch {
spec.env.extend(vars);
}
self
}
pub(crate) fn build_node(
&self,
context: LocalBuildContext<'_, E>,
) -> Result<BuiltNodeConfig<E::NodeConfig>, DynError> {
self.dynamic_node.build(context)
}
pub(crate) fn build_initial_nodes(
&self,
topology: &E::Deployment,
) -> Result<Vec<NodeConfigEntry<E::NodeConfig>>, ProcessSpawnError>
where
E::NodeConfig: Clone,
{
match self.initial_nodes {
LocalInitialNodes::Generated => super::helpers::build_generated_initial_nodes::<E>(
topology,
self.node_name_prefix,
self.port_names,
|context| self.build_node(context),
),
LocalInitialNodes::Custom(build) => build(topology),
}
}
pub(crate) fn build_launch_spec(
&self,
config: &E::NodeConfig,
dir: &Path,
label: &str,
) -> Result<LaunchSpec, DynError> {
match &self.launch {
LocalLaunch::Rendered {
spec,
render_config,
} => super::helpers::rendered_config_launch_spec(render_config(config)?, spec),
LocalLaunch::Custom(build) => build(config, dir, label),
}
}
}
impl<E> LocalProcess<E>
where
E: Application,
E::NodeConfig: Serialize,
{
#[must_use]
pub fn yaml(
binary_env_var: &'static str,
binary_name: &'static str,
build_config: LocalConfigBuilder<E>,
) -> Self {
Self::new(
binary_env_var,
binary_name,
build_config,
yaml_node_config::<E::NodeConfig>,
)
}
}
#[derive(Clone)]
pub struct LocalAccess<E: Application> {
api_port: Option<LocalApiPort<E>>,
endpoints: Option<LocalEndpoints<E>>,
client: Option<LocalClientBuilder<E>>,
peer_port: Option<LocalPeerPort<E>>,
readiness_path: &'static str,
}
impl<E: Application> LocalAccess<E> {
#[must_use]
pub fn http(api_port: LocalApiPort<E>) -> Self {
Self {
api_port: Some(api_port),
endpoints: None,
client: None,
peer_port: None,
readiness_path: E::node_readiness_path(),
}
}
#[must_use]
pub fn custom(endpoints: LocalEndpoints<E>) -> Self {
Self {
api_port: None,
endpoints: Some(endpoints),
client: None,
peer_port: None,
readiness_path: E::node_readiness_path(),
}
}
#[must_use]
pub fn with_client(mut self, client: LocalClientBuilder<E>) -> Self {
self.client = Some(client);
self
}
#[must_use]
pub fn with_peer_port(mut self, peer_port: LocalPeerPort<E>) -> Self {
self.peer_port = Some(peer_port);
self
}
#[must_use]
pub fn with_readiness_path(mut self, readiness_path: &'static str) -> Self {
self.readiness_path = readiness_path;
self
}
pub(crate) fn node_endpoints(&self, config: &E::NodeConfig) -> Result<NodeEndpoints, DynError> {
if let Some(endpoints) = self.endpoints {
return endpoints(config);
}
if let Some(api_port) = self.api_port {
return Ok(NodeEndpoints::from_api_port(api_port(config)));
}
Err(std::io::Error::other("node endpoints are not configured").into())
}
pub(crate) fn node_client(&self, endpoints: &NodeEndpoints) -> Result<E::NodeClient, DynError> {
if let Some(client) = self.client {
return client(&discovered_node_access(endpoints));
}
E::build_node_client(&discovered_node_access(endpoints))
}
pub(crate) fn node_peer_port(&self, config: &E::NodeConfig, endpoints: &NodeEndpoints) -> u16 {
self.peer_port
.map(|peer_port| peer_port(config, endpoints))
.unwrap_or_else(|| endpoints.api.port())
}
pub(crate) fn readiness_path(&self) -> &'static str {
self.readiness_path
}
}
#[derive(Clone)]
pub struct LocalLifecycle<E: Application> {
initial_persist_dir: Option<LocalPersistDir<E>>,
initial_snapshot_dir: Option<LocalSnapshotDir<E>>,
stable_readiness: Option<LocalStableReadiness<E>>,
}
impl<E: Application> LocalLifecycle<E> {
#[must_use]
pub fn new() -> Self {
Self {
initial_persist_dir: None,
initial_snapshot_dir: None,
stable_readiness: None,
}
}
#[must_use]
pub fn with_initial_persist_dir(mut self, persist_dir: LocalPersistDir<E>) -> Self {
self.initial_persist_dir = Some(persist_dir);
self
}
#[must_use]
pub fn with_initial_snapshot_dir(mut self, snapshot_dir: LocalSnapshotDir<E>) -> Self {
self.initial_snapshot_dir = Some(snapshot_dir);
self
}
#[must_use]
pub fn with_stable_readiness(mut self, stable_readiness: LocalStableReadiness<E>) -> Self {
self.stable_readiness = Some(stable_readiness);
self
}
pub(crate) fn initial_persist_dir(
&self,
topology: &E::Deployment,
node_name: &str,
index: usize,
) -> Option<PathBuf> {
self.initial_persist_dir
.and_then(|persist_dir| persist_dir(topology, node_name, index))
}
pub(crate) fn initial_snapshot_dir(
&self,
topology: &E::Deployment,
node_name: &str,
index: usize,
) -> Option<PathBuf> {
self.initial_snapshot_dir
.and_then(|snapshot_dir| snapshot_dir(topology, node_name, index))
}
pub(crate) async fn wait_stable(&self, nodes: &[Node<E>]) -> Result<(), DynError> {
if let Some(stable_readiness) = self.stable_readiness {
return stable_readiness(nodes).await;
}
Ok(())
}
}
#[derive(Clone)]
pub struct LocalRuntime<E: Application> {
pub(crate) process: LocalProcess<E>,
pub(crate) access: LocalAccess<E>,
pub(crate) lifecycle: LocalLifecycle<E>,
}
impl<E: Application> LocalRuntime<E> {
#[must_use]
pub fn new(process: LocalProcess<E>, access: LocalAccess<E>) -> Self {
Self {
process,
access,
lifecycle: LocalLifecycle::new(),
}
}
#[must_use]
pub fn with_lifecycle(mut self, lifecycle: LocalLifecycle<E>) -> Self {
self.lifecycle = lifecycle;
self
}
}
pub fn cluster_node_config_from_context<E>(
context: LocalBuildContext<'_, E>,
) -> Result<<E as Application>::NodeConfig, DynError>
where
E: Application + ClusterNodeConfigApplication,
{
build_local_cluster_node_config::<E>(context.index, context.ports, context.peers)
}

View File

@ -56,26 +56,51 @@ impl Application for DummyEnv {
}
}
#[async_trait::async_trait]
impl LocalDeployerEnv for DummyEnv {
fn local_runtime() -> LocalRuntime<Self> {
LocalRuntime::new(
LocalProcess::custom(build_dummy_node, build_dummy_launch_spec)
.with_initial_nodes(build_dummy_initial_nodes),
LocalAccess::custom(dummy_endpoints).with_client(|_| Ok(())),
)
.with_lifecycle(LocalLifecycle::new().with_stable_readiness(dummy_wait_stable))
fn build_node_config(
_topology: &Self::Deployment,
_index: usize,
_peer_ports_by_name: &std::collections::HashMap<String, u16>,
_options: &testing_framework_core::scenario::StartNodeOptions<Self>,
_peer_ports: &[u16],
) -> Result<BuiltNodeConfig<DummyConfig>, DynError> {
build_dummy_node()
}
fn build_initial_node_configs(
_topology: &Self::Deployment,
) -> Result<Vec<NodeConfigEntry<DummyConfig>>, crate::process::ProcessSpawnError> {
build_dummy_initial_nodes()
}
fn build_launch_spec(
config: &DummyConfig,
dir: &std::path::Path,
label: &str,
) -> Result<crate::process::LaunchSpec, DynError> {
build_dummy_launch_spec(config, dir, label)
}
fn node_endpoints(_config: &DummyConfig) -> Result<NodeEndpoints, DynError> {
dummy_endpoints()
}
fn node_client(_endpoints: &NodeEndpoints) -> Result<Self::NodeClient, DynError> {
Ok(())
}
async fn wait_readiness_stable(_nodes: &[Node<Self>]) -> Result<(), DynError> {
dummy_wait_stable().await
}
}
fn build_dummy_node(
_context: LocalBuildContext<'_, DummyEnv>,
) -> Result<BuiltNodeConfig<DummyConfig>, DynError> {
fn build_dummy_node() -> Result<BuiltNodeConfig<DummyConfig>, DynError> {
unreachable!("not used in this test")
}
fn build_dummy_initial_nodes(
_topology: &DummyTopology,
) -> Result<Vec<NodeConfigEntry<DummyConfig>>, crate::process::ProcessSpawnError> {
fn build_dummy_initial_nodes()
-> Result<Vec<NodeConfigEntry<DummyConfig>>, crate::process::ProcessSpawnError> {
unreachable!("not used in this test")
}
@ -87,15 +112,13 @@ fn build_dummy_launch_spec(
Ok(crate::process::LaunchSpec::default())
}
fn dummy_endpoints(_config: &DummyConfig) -> Result<NodeEndpoints, DynError> {
fn dummy_endpoints() -> Result<NodeEndpoints, DynError> {
Ok(NodeEndpoints::default())
}
fn dummy_wait_stable<'a>(_nodes: &'a [Node<DummyEnv>]) -> runtime::LocalStableReadinessFuture<'a> {
Box::pin(async {
STABLE_CALLS.fetch_add(1, Ordering::SeqCst);
Ok(())
})
async fn dummy_wait_stable() -> Result<(), DynError> {
STABLE_CALLS.fetch_add(1, Ordering::SeqCst);
Ok(())
}
#[tokio::test]

View File

@ -9,13 +9,12 @@ pub mod process;
pub use binary::{BinaryConfig, BinaryResolver};
pub use deployer::{ProcessDeployer, ProcessDeployerError};
pub use env::{
BuiltNodeConfig, LocalAccess, LocalBuildContext, LocalDeployerEnv, LocalLifecycle,
LocalNodePorts, LocalPeerNode, LocalProcess, LocalProcessSpec, LocalRuntime,
LocalStableReadinessFuture, NodeConfigEntry, build_indexed_http_peers,
BuiltNodeConfig, LocalBinaryApp, LocalBuildContext, LocalDeployerEnv, LocalNodePorts,
LocalPeerNode, LocalProcessSpec, NodeConfigEntry, build_indexed_http_peers,
build_indexed_node_configs, build_local_cluster_node_config, build_local_peer_nodes,
cluster_node_config_from_context, default_yaml_launch_spec, discovered_node_access,
preallocate_ports, reserve_local_node_ports, single_http_node_endpoints,
text_config_launch_spec, text_node_config, yaml_config_launch_spec, yaml_node_config,
default_yaml_launch_spec, discovered_node_access, preallocate_ports, reserve_local_node_ports,
single_http_node_endpoints, text_config_launch_spec, text_node_config, yaml_config_launch_spec,
yaml_node_config,
};
pub use manual::{ManualCluster, ManualClusterError};
pub use node_control::{NodeManager, NodeManagerError, NodeManagerSeed};