refactor(tf): simplify app integration surface

This commit is contained in:
andrussal 2026-03-29 11:07:34 +02:00
parent d055dafedf
commit 909a56e3be
21 changed files with 1294 additions and 87 deletions

3
Cargo.lock generated
View File

@ -2269,12 +2269,15 @@ version = "0.1.0"
dependencies = [
"async-trait",
"fs_extra",
"serde",
"serde_yaml",
"tempfile",
"testing-framework-core",
"thiserror 2.0.18",
"tokio",
"tokio-retry",
"tracing",
"which",
]
[[package]]

View File

@ -7,31 +7,34 @@ pub use cfgsync_core::render::{CfgsyncOutputPaths, RenderedCfgsync};
use serde::Serialize;
use thiserror::Error;
use crate::{scenario::Application, topology::DeploymentDescriptor};
#[doc(hidden)]
pub type DynCfgsyncError = Box<dyn Error + Send + Sync + 'static>;
#[doc(hidden)]
pub trait StaticArtifactRenderer {
type Deployment;
type Node;
type NodeConfig;
type Error: Error + Send + Sync + 'static;
fn nodes(deployment: &Self::Deployment) -> &[Self::Node];
fn node_count(deployment: &Self::Deployment) -> usize;
fn node_identifier(index: usize, node: &Self::Node) -> String;
fn node_identifier(index: usize) -> String;
fn build_node_config(
deployment: &Self::Deployment,
node: &Self::Node,
node_index: usize,
) -> Result<Self::NodeConfig, Self::Error>;
fn rewrite_for_hostnames(
deployment: &Self::Deployment,
node_index: usize,
hostnames: &[String],
config: &mut Self::NodeConfig,
) -> Result<(), Self::Error>;
_deployment: &Self::Deployment,
_node_index: usize,
_hostnames: &[String],
_config: &mut Self::NodeConfig,
) -> Result<(), Self::Error> {
Ok(())
}
fn serialize_node_config(config: &Self::NodeConfig) -> Result<String, Self::Error>;
}
@ -39,6 +42,65 @@ pub trait StaticArtifactRenderer {
#[doc(hidden)]
pub use StaticArtifactRenderer as CfgsyncEnv;
#[doc(hidden)]
pub trait StaticNodeConfigProvider: Application {
type Error: Error + Send + Sync + 'static;
fn build_node_config(
deployment: &Self::Deployment,
node_index: usize,
) -> Result<Self::NodeConfig, Self::Error>;
fn rewrite_for_hostnames(
_deployment: &Self::Deployment,
_node_index: usize,
_hostnames: &[String],
_config: &mut Self::NodeConfig,
) -> Result<(), Self::Error> {
Ok(())
}
fn serialize_node_config(config: &Self::NodeConfig) -> Result<String, Self::Error>;
}
impl<T> StaticArtifactRenderer for T
where
T: StaticNodeConfigProvider,
T::Deployment: DeploymentDescriptor,
{
type Deployment = T::Deployment;
type NodeConfig = T::NodeConfig;
type Error = T::Error;
fn node_count(deployment: &Self::Deployment) -> usize {
deployment.node_count()
}
fn node_identifier(index: usize) -> String {
format!("node-{index}")
}
fn build_node_config(
deployment: &Self::Deployment,
node_index: usize,
) -> Result<Self::NodeConfig, Self::Error> {
T::build_node_config(deployment, node_index)
}
fn rewrite_for_hostnames(
deployment: &Self::Deployment,
node_index: usize,
hostnames: &[String],
config: &mut Self::NodeConfig,
) -> Result<(), Self::Error> {
T::rewrite_for_hostnames(deployment, node_index, hostnames, config)
}
fn serialize_node_config(config: &Self::NodeConfig) -> Result<String, Self::Error> {
T::serialize_node_config(config)
}
}
#[derive(Debug, Error)]
pub enum BuildStaticArtifactsError {
#[error("cfgsync hostnames mismatch (nodes={nodes}, hostnames={hostnames})")]
@ -63,25 +125,25 @@ pub fn build_static_artifacts<E: StaticArtifactRenderer>(
deployment: &E::Deployment,
hostnames: &[String],
) -> Result<cfgsync_adapter::MaterializedArtifacts, BuildStaticArtifactsError> {
let nodes = E::nodes(deployment);
let node_count = E::node_count(deployment);
if nodes.len() != hostnames.len() {
if node_count != hostnames.len() {
return Err(BuildStaticArtifactsError::HostnameCountMismatch {
nodes: nodes.len(),
nodes: node_count,
hostnames: hostnames.len(),
});
}
let mut output = std::collections::HashMap::with_capacity(nodes.len());
let mut output = std::collections::HashMap::with_capacity(node_count);
for (index, node) in nodes.iter().enumerate() {
let mut node_config = E::build_node_config(deployment, node).map_err(adapter_error)?;
for index in 0..node_count {
let mut node_config = E::build_node_config(deployment, index).map_err(adapter_error)?;
E::rewrite_for_hostnames(deployment, index, hostnames, &mut node_config)
.map_err(adapter_error)?;
let config_yaml = E::serialize_node_config(&node_config).map_err(adapter_error)?;
output.insert(
E::node_identifier(index, node),
E::node_identifier(index),
ArtifactSet::new(vec![ArtifactFile::new(
"/config.yaml".to_string(),
config_yaml.clone(),

View File

@ -3,7 +3,7 @@ use std::io;
use async_trait::async_trait;
use crate::{
scenario::{DynError, ExternalNodeSource, FeedRuntime, NodeClients},
scenario::{DynError, ExternalNodeSource, FeedRuntime, NodeAccess, NodeClients},
topology::DeploymentDescriptor,
};
@ -25,6 +25,16 @@ pub trait Application: Send + Sync + 'static {
Err(io::Error::other("external node sources are not supported").into())
}
/// Build an application node client from deployer-provided node access.
fn build_node_client(_access: &NodeAccess) -> Result<Self::NodeClient, DynError> {
Err(io::Error::other("node access is not supported").into())
}
/// Path appended by deployers during default readiness probing.
fn node_readiness_path() -> &'static str {
"/"
}
async fn prepare_feed(
node_clients: NodeClients<Self>,
) -> Result<(<Self::FeedRuntime as FeedRuntime>::Feed, Self::FeedRuntime), DynError>

View File

@ -0,0 +1,65 @@
use std::collections::HashMap;
use reqwest::Url;
use super::DynError;
/// Deployer-neutral node access facts discovered at runtime.
///
/// Bundles the host plus the ports a deployer discovered for one node, so
/// applications can build clients without knowing which deployer ran them.
#[derive(Clone, Debug, Default)]
pub struct NodeAccess {
    /// Hostname or IP address the node is reachable at.
    host: String,
    /// Primary HTTP API port.
    api_port: u16,
    /// Optional auxiliary/testing port, when the deployer exposes one.
    testing_port: Option<u16>,
    /// Additional application-defined ports keyed by name.
    named_ports: HashMap<String, u16>,
}
impl NodeAccess {
#[must_use]
pub fn new(host: impl Into<String>, api_port: u16) -> Self {
Self {
host: host.into(),
api_port,
testing_port: None,
named_ports: HashMap::new(),
}
}
#[must_use]
pub fn with_testing_port(mut self, port: u16) -> Self {
self.testing_port = Some(port);
self
}
#[must_use]
pub fn with_named_port(mut self, name: impl Into<String>, port: u16) -> Self {
self.named_ports.insert(name.into(), port);
self
}
#[must_use]
pub fn host(&self) -> &str {
&self.host
}
#[must_use]
pub fn api_port(&self) -> u16 {
self.api_port
}
#[must_use]
pub fn testing_port(&self) -> Option<u16> {
self.testing_port
}
#[must_use]
pub fn named_port(&self, name: &str) -> Option<u16> {
self.named_ports.get(name).copied()
}
pub fn api_base_url(&self) -> Result<Url, DynError> {
Ok(Url::parse(&format!(
"http://{}:{}",
self.host, self.api_port
))?)
}
}

View File

@ -0,0 +1,185 @@
use std::{collections::HashMap, error::Error};
use super::ScenarioApplication;
use crate::{cfgsync::StaticNodeConfigProvider, topology::DeploymentDescriptor};
/// Read-only view of one peer as seen from the node being configured.
#[derive(Clone, Debug)]
pub struct ClusterPeerView {
    index: usize,
    host: String,
    network_port: u16,
}

impl ClusterPeerView {
    /// Builds a peer view from its node index, host, and network port.
    #[must_use]
    pub fn new(index: usize, host: impl Into<String>, network_port: u16) -> Self {
        let host = host.into();
        Self {
            index,
            host,
            network_port,
        }
    }

    /// Zero-based index of the peer within the cluster.
    #[must_use]
    pub fn index(&self) -> usize {
        self.index
    }

    /// Hostname the peer is reachable at.
    #[must_use]
    pub fn host(&self) -> &str {
        self.host.as_str()
    }

    /// Network (cluster-traffic) port of the peer.
    #[must_use]
    pub fn network_port(&self) -> u16 {
        self.network_port
    }

    /// `host:port` authority string for addressing the peer.
    #[must_use]
    pub fn authority(&self) -> String {
        format!("{host}:{port}", host = self.host, port = self.network_port)
    }
}
/// Read-only view of the node whose config is currently being built.
#[derive(Clone, Debug)]
pub struct ClusterNodeView {
    index: usize,
    host: String,
    network_port: u16,
    named_ports: HashMap<&'static str, u16>,
}

impl ClusterNodeView {
    /// Builds a node view with no named ports registered yet.
    #[must_use]
    pub fn new(index: usize, host: impl Into<String>, network_port: u16) -> Self {
        Self {
            index,
            host: host.into(),
            network_port,
            named_ports: HashMap::new(),
        }
    }

    /// Registers an additional named port and returns the updated view.
    #[must_use]
    pub fn with_named_port(mut self, name: &'static str, port: u16) -> Self {
        self.named_ports.insert(name, port);
        self
    }

    /// Zero-based index of the node within the cluster.
    #[must_use]
    pub fn index(&self) -> usize {
        self.index
    }

    /// Hostname the node is reachable at.
    #[must_use]
    pub fn host(&self) -> &str {
        self.host.as_str()
    }

    /// Network (cluster-traffic) port of the node.
    #[must_use]
    pub fn network_port(&self) -> u16 {
        self.network_port
    }

    /// Looks up a named port, failing with a descriptive error when absent.
    ///
    /// # Errors
    /// Returns an `io::Error` when no port was registered under `name`.
    pub fn require_named_port(&self, name: &str) -> Result<u16, std::io::Error> {
        match self.named_ports.get(name) {
            Some(port) => Ok(*port),
            None => Err(std::io::Error::other(format!("missing node port '{name}'"))),
        }
    }

    /// `host:port` authority string for addressing the node.
    #[must_use]
    pub fn authority(&self) -> String {
        format!("{host}:{port}", host = self.host, port = self.network_port)
    }
}
/// App-facing contract for clusters whose per-node configs can be rendered
/// statically from index/host/port information alone.
pub trait ClusterNodeConfigApplication: ScenarioApplication {
    /// Error produced while building or serializing a node config.
    type ConfigError: Error + Send + Sync + 'static;

    /// Network port every node listens on in a static deployment.
    fn static_network_port() -> u16;

    /// Extra named ports assigned to every node; none by default.
    fn static_named_ports() -> &'static [(&'static str, u16)] {
        &[]
    }

    /// Builds the config for `node` given views of all its `peers`.
    fn build_cluster_node_config(
        node: &ClusterNodeView,
        peers: &[ClusterPeerView],
    ) -> Result<Self::NodeConfig, Self::ConfigError>;

    /// Serializes a node config to its textual representation.
    fn serialize_cluster_node_config(
        config: &Self::NodeConfig,
    ) -> Result<String, Self::ConfigError>;
}
// Blanket impl: every cluster-config application automatically provides static
// node configs by pairing each node view with peer views of all other nodes.
impl<T> StaticNodeConfigProvider for T
where
    T: ClusterNodeConfigApplication,
    T::Deployment: DeploymentDescriptor,
{
    type Error = T::ConfigError;

    /// Builds the config for `node_index` using the canonical `node-{i}` hosts.
    fn build_node_config(
        deployment: &Self::Deployment,
        node_index: usize,
    ) -> Result<Self::NodeConfig, Self::Error> {
        build_static_cluster_node_config::<T>(deployment, node_index, None)
    }

    /// Rebuilds the config from scratch with deployer-provided hostnames
    /// substituted for the canonical ones.
    fn rewrite_for_hostnames(
        deployment: &Self::Deployment,
        node_index: usize,
        hostnames: &[String],
        config: &mut Self::NodeConfig,
    ) -> Result<(), Self::Error> {
        *config = build_static_cluster_node_config::<T>(deployment, node_index, Some(hostnames))?;
        Ok(())
    }

    /// Delegates serialization to the application.
    fn serialize_node_config(config: &Self::NodeConfig) -> Result<String, Self::Error> {
        T::serialize_cluster_node_config(config)
    }
}
/// Builds the static config of node `node_index`: a view of the node itself
/// plus one peer view for every other node in the deployment.
///
/// `hostnames`, when given, overrides the canonical `node-{i}` host names.
fn build_static_cluster_node_config<T>(
    deployment: &T::Deployment,
    node_index: usize,
    hostnames: Option<&[String]>,
) -> Result<T::NodeConfig, T::ConfigError>
where
    T: ClusterNodeConfigApplication,
    T::Deployment: DeploymentDescriptor,
{
    let node = static_node_view::<T>(node_index, hostnames);
    let mut peers = Vec::new();
    for peer_index in 0..deployment.node_count() {
        // The node being configured is not its own peer.
        if peer_index != node_index {
            peers.push(static_peer_view::<T>(peer_index, hostnames));
        }
    }
    T::build_cluster_node_config(&node, &peers)
}
/// Builds the view of node `node_index`, preferring a deployer-provided
/// hostname and falling back to the canonical `node-{index}` name.
fn static_node_view<T>(node_index: usize, hostnames: Option<&[String]>) -> ClusterNodeView
where
    T: ClusterNodeConfigApplication,
{
    let host = match hostnames.and_then(|names| names.get(node_index)) {
        Some(name) => name.clone(),
        None => format!("node-{node_index}"),
    };
    // Attach every statically declared named port to the base view.
    T::static_named_ports().iter().copied().fold(
        ClusterNodeView::new(node_index, host, T::static_network_port()),
        |view, (name, port)| view.with_named_port(name, port),
    )
}
/// Builds the peer view of node `node_index`, preferring a deployer-provided
/// hostname and falling back to the canonical `node-{index}` name.
fn static_peer_view<T>(node_index: usize, hostnames: Option<&[String]>) -> ClusterPeerView
where
    T: ClusterNodeConfigApplication,
{
    let host = match hostnames.and_then(|names| names.get(node_index)) {
        Some(name) => name.clone(),
        None => format!("node-{node_index}"),
    };
    ClusterPeerView::new(node_index, host, T::static_network_port())
}

View File

@ -5,12 +5,15 @@ use std::error::Error;
mod builder_ext;
mod builder_ops;
mod capabilities;
mod client;
mod common_builder_ext;
mod config;
mod control;
mod definition;
mod deployment_policy;
mod expectation;
pub mod internal;
mod noop;
mod observability;
mod runtime;
mod sources;
@ -23,11 +26,14 @@ pub use capabilities::{
NodeControlCapability, ObservabilityCapability, PeerSelection, RequiresNodeControl,
StartNodeOptions, StartedNode,
};
pub use client::NodeAccess;
pub use common_builder_ext::CoreBuilderExt;
pub use config::{ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView};
pub use control::{ClusterWaitHandle, NodeControlHandle};
pub use definition::{Scenario, ScenarioBuildError, ScenarioBuilder};
pub use deployment_policy::{CleanupPolicy, DeploymentPolicy, RetryPolicy};
pub use expectation::Expectation;
pub use noop::ScenarioApplication;
pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs};
pub use runtime::{
Deployer, Feed, FeedRuntime, HttpReadinessRequirement, NodeClients, ReadinessError, RunContext,

View File

@ -0,0 +1,78 @@
use async_trait::async_trait;
use super::{Application, DynError, Feed, FeedRuntime, NodeAccess, NodeClients};
/// Feed carrying no data: every subscription is the unit value.
#[derive(Clone)]
pub struct DefaultFeed;

impl Feed for DefaultFeed {
    type Subscription = ();

    /// Returns the unit subscription; there is nothing to observe.
    fn subscribe(&self) -> Self::Subscription {}
}
/// Feed runtime that completes immediately; used when an app needs no feed.
pub struct DefaultFeedRuntime;

#[async_trait]
impl FeedRuntime for DefaultFeedRuntime {
    type Feed = DefaultFeed;

    /// No-op: returns as soon as it is awaited.
    async fn run(self: Box<Self>) {}
}
/// App surface for the common case where the framework default feed behavior is
/// sufficient and no custom feed runtime is needed.
///
/// Implementors receive a blanket [`Application`] impl that wires in
/// [`DefaultFeed`] and [`DefaultFeedRuntime`].
#[async_trait]
pub trait ScenarioApplication: Send + Sync + 'static {
    /// Topology type describing the deployment this app runs on.
    type Deployment: crate::topology::DeploymentDescriptor + Clone + 'static;
    /// Client handle used to talk to a running node.
    type NodeClient: Clone + Send + Sync + 'static;
    /// Per-node configuration value rendered for deployment.
    type NodeConfig: Clone + Send + Sync + 'static;

    /// Builds a client for an externally supplied node; unsupported by default.
    fn external_node_client(
        _source: &super::ExternalNodeSource,
    ) -> Result<Self::NodeClient, DynError> {
        Err(std::io::Error::other("external node sources are not supported").into())
    }

    /// Builds a client from deployer-discovered node access; unsupported by default.
    fn build_node_client(_access: &NodeAccess) -> Result<Self::NodeClient, DynError> {
        Err(std::io::Error::other("node access is not supported").into())
    }

    /// Path appended by deployers during default readiness probing.
    fn node_readiness_path() -> &'static str {
        "/"
    }
}
// Blanket impl: any `ScenarioApplication` is a complete `Application` backed by
// the framework's no-op feed machinery.
#[async_trait]
impl<T> Application for T
where
    T: ScenarioApplication,
{
    type Deployment = T::Deployment;
    type NodeClient = T::NodeClient;
    type NodeConfig = T::NodeConfig;
    // Apps on this surface never run a custom feed.
    type FeedRuntime = DefaultFeedRuntime;

    /// Forwards to the app's (possibly defaulted) implementation.
    fn external_node_client(
        source: &super::ExternalNodeSource,
    ) -> Result<Self::NodeClient, DynError> {
        T::external_node_client(source)
    }

    /// Forwards to the app's (possibly defaulted) implementation.
    fn build_node_client(access: &NodeAccess) -> Result<Self::NodeClient, DynError> {
        T::build_node_client(access)
    }

    /// Forwards to the app's (possibly defaulted) implementation.
    fn node_readiness_path() -> &'static str {
        T::node_readiness_path()
    }

    /// Always yields the default empty feed and its immediately-completing runtime.
    async fn prepare_feed(
        _node_clients: NodeClients<Self>,
    ) -> Result<(<Self::FeedRuntime as FeedRuntime>::Feed, Self::FeedRuntime), DynError>
    where
        Self: Sized,
    {
        Ok((DefaultFeed, DefaultFeedRuntime))
    }
}

View File

@ -19,8 +19,10 @@ pub type DynTopologyError = Box<dyn Error + Send + Sync + 'static>;
pub mod generated;
pub mod shape;
pub mod simple;
pub use generated::{DeploymentPlan, RuntimeTopology, SharedTopology};
pub use shape::TopologyShapeBuilder;
pub use simple::{ClusterTopology, NodeCountTopology};
pub trait DeploymentDescriptor: Send + Sync {
fn node_count(&self) -> usize;

View File

@ -0,0 +1,31 @@
use super::DeploymentDescriptor;
/// Simple topology: a flat cluster of `node_count` identical nodes.
#[derive(Clone, Debug)]
pub struct ClusterTopology {
    pub node_count: usize,
    node_indices: Vec<usize>,
}

impl ClusterTopology {
    /// Builds a topology of `node_count` nodes indexed `0..node_count`.
    #[must_use]
    pub fn new(node_count: usize) -> Self {
        let node_indices = (0..node_count).collect::<Vec<_>>();
        Self {
            node_count,
            node_indices,
        }
    }

    /// All node indices, in ascending order.
    #[must_use]
    pub fn node_indices(&self) -> &[usize] {
        self.node_indices.as_slice()
    }
}
impl DeploymentDescriptor for ClusterTopology {
    /// Number of nodes in the cluster.
    fn node_count(&self) -> usize {
        self.node_count
    }
}

/// Hidden alias of [`ClusterTopology`] kept so existing call sites using the
/// old name keep compiling.
#[doc(hidden)]
pub type NodeCountTopology = ClusterTopology;

View File

@ -198,7 +198,7 @@ async fn collect_readiness_endpoints<E: ComposeDeployEnv>(
let container_id = discover_service_container_id(project, service).await?;
let api_port = discover_api_port(&container_id).await?;
let mut endpoint = build_service_endpoint(host, api_port)?;
endpoint.set_path(E::readiness_path());
endpoint.set_path(<E as ComposeDeployEnv>::node_readiness_path());
endpoints.push(endpoint);
}

View File

@ -2,7 +2,11 @@ use serde::Serialize;
mod node;
pub use node::{EnvEntry, NodeDescriptor};
pub use node::{
BinaryConfigNodeSpec, EnvEntry, LoopbackNodeRuntimeSpec, NodeDescriptor,
binary_config_node_runtime_spec, build_binary_config_node_descriptors,
build_loopback_node_descriptors,
};
/// Top-level docker-compose descriptor built from an environment-specific
/// topology.

View File

@ -1,5 +1,9 @@
use std::env;
use serde::Serialize;
use crate::infrastructure::ports::node_identifier;
/// Describes a node container in the compose stack.
#[derive(Clone, Debug, Serialize)]
pub struct NodeDescriptor {
@ -127,3 +131,119 @@ impl NodeDescriptor {
&self.environment
}
}
/// Runtime parameters for a node container whose ports are published on
/// loopback.
#[derive(Clone, Debug)]
pub struct LoopbackNodeRuntimeSpec {
    /// Container image reference.
    pub image: String,
    /// Container entrypoint command line.
    pub entrypoint: String,
    /// Volume mounts in docker-compose `host:container[:mode]` form.
    pub volumes: Vec<String>,
    /// Extra `/etc/hosts` entries for the container.
    pub extra_hosts: Vec<String>,
    /// Container ports to publish.
    pub container_ports: Vec<u16>,
    /// Environment variables passed to the container.
    pub environment: Vec<EnvEntry>,
    /// Optional platform override (e.g. `linux/amd64`) — NOTE(review): assumed
    /// from the `*_PLATFORM` env-var usage below; confirm accepted values.
    pub platform: Option<String>,
}
/// Describes a node launched from a single binary plus one mounted config file.
#[derive(Clone, Debug)]
pub struct BinaryConfigNodeSpec {
    /// Env var that overrides the container image.
    pub image_env_var: String,
    /// Image used when the env var is unset.
    pub default_image: String,
    /// Env var that overrides the container platform.
    pub platform_env_var: String,
    /// Absolute path of the binary inside the container.
    pub binary_path: String,
    /// Path the config file is mounted at inside the container.
    pub config_container_path: String,
    /// Extension used for rendered per-node config files.
    pub config_file_extension: String,
    /// Container ports to publish.
    pub container_ports: Vec<u16>,
    /// `RUST_LOG` value passed to the node.
    pub rust_log: String,
}

impl BinaryConfigNodeSpec {
    /// Derives a full spec from just the binary path, config path, and ports,
    /// following naming conventions: `/path/foo-node` yields image
    /// `foo-node:local`, env vars `FOO_IMAGE` / `FOO_PLATFORM`, and
    /// `RUST_LOG=foo_node=info`.
    #[must_use]
    pub fn conventional(
        binary_path: &str,
        config_container_path: &str,
        container_ports: Vec<u16>,
    ) -> Self {
        // Binary name is the final path segment.
        let binary_name = match binary_path.rsplit('/').next() {
            Some(segment) => segment.to_owned(),
            None => binary_path.to_owned(),
        };
        // App name drops the conventional `-node` suffix when present.
        let app_name = match binary_name.strip_suffix("-node") {
            Some(stem) => stem.to_owned(),
            None => binary_name.clone(),
        };
        // '-' uppercases to itself, so uppercasing before replacing is equivalent.
        let env_prefix = app_name.to_ascii_uppercase().replace('-', "_");
        // Config extension: last dot-segment, unless it is really a path
        // component (contains '/'), in which case fall back to `yaml`.
        let config_file_extension = {
            let candidate = config_container_path.rsplit('.').next().unwrap_or("yaml");
            if candidate.contains('/') {
                "yaml".to_owned()
            } else {
                candidate.to_owned()
            }
        };
        let rust_target = binary_name.replace('-', "_");
        Self {
            image_env_var: format!("{env_prefix}_IMAGE"),
            default_image: format!("{binary_name}:local"),
            platform_env_var: format!("{env_prefix}_PLATFORM"),
            binary_path: binary_path.to_owned(),
            config_container_path: config_container_path.to_owned(),
            config_file_extension,
            container_ports,
            rust_log: format!("{rust_target}=info"),
        }
    }
}
pub fn build_loopback_node_descriptors(
node_count: usize,
mut spec_for_index: impl FnMut(usize) -> LoopbackNodeRuntimeSpec,
) -> Vec<NodeDescriptor> {
(0..node_count)
.map(|index| {
let spec = spec_for_index(index);
NodeDescriptor::with_loopback_ports(
node_identifier(index),
spec.image,
spec.entrypoint,
spec.volumes,
spec.extra_hosts,
spec.container_ports,
spec.environment,
spec.platform,
)
})
.collect()
}
/// Builds node descriptors where every node shares the same
/// [`BinaryConfigNodeSpec`], differing only in its rendered config file.
pub fn build_binary_config_node_descriptors(
    node_count: usize,
    spec: &BinaryConfigNodeSpec,
) -> Vec<NodeDescriptor> {
    build_loopback_node_descriptors(node_count, |index| {
        binary_config_node_runtime_spec(index, spec)
    })
}
/// Resolves the runtime spec for node `index` from a [`BinaryConfigNodeSpec`],
/// honouring image/platform environment-variable overrides.
pub fn binary_config_node_runtime_spec(
    index: usize,
    spec: &BinaryConfigNodeSpec,
) -> LoopbackNodeRuntimeSpec {
    // Image and platform may be overridden via the environment.
    let image = match env::var(&spec.image_env_var) {
        Ok(overridden) => overridden,
        Err(_) => spec.default_image.clone(),
    };
    let platform = env::var(&spec.platform_env_var).ok();
    let entrypoint = format!(
        "/bin/sh -c '{} --config {}'",
        spec.binary_path, spec.config_container_path
    );
    // Mount the per-node rendered config read-only at the expected path.
    let config_volume = format!(
        "./stack/configs/node-{index}.{}:{}:ro",
        spec.config_file_extension, spec.config_container_path
    );
    LoopbackNodeRuntimeSpec {
        image,
        entrypoint,
        volumes: vec![config_volume],
        extra_hosts: Vec::new(),
        container_ports: spec.container_ports.clone(),
        environment: vec![EnvEntry::new("RUST_LOG", &spec.rust_log)],
        platform,
    }
}

View File

@ -1,20 +1,21 @@
use std::{path::Path, time::Duration};
use std::{fs, path::Path, time::Duration};
use async_trait::async_trait;
use reqwest::Url;
use testing_framework_core::{
cfgsync::{
CfgsyncOutputPaths, MaterializedArtifacts, RegistrationServerRenderOptions,
StaticArtifactRenderer, render_and_write_registration_server,
},
cfgsync::{MaterializedArtifacts, StaticArtifactRenderer},
scenario::{
Application, DynError, HttpReadinessRequirement, NodeClients,
Application, DynError, HttpReadinessRequirement, NodeAccess, NodeClients,
wait_for_http_ports_with_host_and_requirement, wait_http_readiness,
},
topology::DeploymentDescriptor,
};
use crate::{
descriptor::{ComposeDescriptor, NodeDescriptor},
descriptor::{
BinaryConfigNodeSpec, ComposeDescriptor, LoopbackNodeRuntimeSpec, NodeDescriptor,
binary_config_node_runtime_spec, build_loopback_node_descriptors,
},
docker::config_server::DockerConfigServerSpec,
infrastructure::ports::{
HostPortMapping, NodeContainerPorts, NodeHostPorts, compose_runner_host,
@ -33,11 +34,49 @@ pub trait ConfigServerHandle: Send + Sync {
/// Compose-specific topology surface needed by the runner.
#[async_trait]
pub trait ComposeDeployEnv: Application {
/// Write per-node config files or other compose-time assets into the stack
/// workspace before the stack starts.
fn prepare_compose_configs(
_path: &Path,
_topology: &<Self as Application>::Deployment,
_metrics_otlp_ingest_url: Option<&Url>,
) -> Result<(), DynError> {
Ok(())
}
/// File name for a static per-node config rendered into the compose stack.
fn static_node_config_file_name(index: usize) -> String {
format!("node-{index}.yaml")
}
fn loopback_node_runtime_spec(
_topology: &<Self as Application>::Deployment,
_index: usize,
) -> Option<LoopbackNodeRuntimeSpec> {
if let Some(spec) = Self::binary_config_node_spec(_topology, _index) {
return Some(binary_config_node_runtime_spec(_index, &spec));
}
None
}
fn binary_config_node_spec(
_topology: &<Self as Application>::Deployment,
_index: usize,
) -> Option<BinaryConfigNodeSpec> {
None
}
/// Produce the compose descriptor for the given topology.
fn compose_descriptor(
topology: &<Self as Application>::Deployment,
cfgsync_port: u16,
) -> ComposeDescriptor;
_cfgsync_port: u16,
) -> ComposeDescriptor {
let nodes = build_loopback_node_descriptors(topology.node_count(), |index| {
Self::loopback_node_runtime_spec(topology, index)
.unwrap_or_else(|| panic!("compose_descriptor is not implemented for this app"))
});
ComposeDescriptor::new(nodes)
}
/// Container ports (API/testing) per node, used for docker-compose port
/// discovery.
@ -49,12 +88,17 @@ pub trait ComposeDeployEnv: Application {
.nodes()
.iter()
.enumerate()
.take(topology.node_count())
.filter_map(|(index, node)| parse_node_container_ports(index, node))
.collect()
}
/// Hostnames used when rewriting node configs for cfgsync delivery.
fn cfgsync_hostnames(topology: &<Self as Application>::Deployment) -> Vec<String>;
fn cfgsync_hostnames(topology: &<Self as Application>::Deployment) -> Vec<String> {
(0..topology.node_count())
.map(crate::infrastructure::ports::node_identifier)
.collect()
}
/// App-specific cfgsync artifact enrichment.
fn enrich_cfgsync_artifacts(
@ -74,34 +118,19 @@ pub trait ComposeDeployEnv: Application {
where
Self: Sized + StaticArtifactRenderer<Deployment = <Self as Application>::Deployment>,
{
let _ = metrics_otlp_ingest_url;
let options = RegistrationServerRenderOptions {
port: Some(port),
artifacts_path: None,
};
let artifacts_path = cfgsync_artifacts_path(path);
let output = CfgsyncOutputPaths {
config_path: path,
artifacts_path: &artifacts_path,
};
render_and_write_registration_server::<Self, _>(
topology,
&Self::cfgsync_hostnames(topology),
options,
output,
|artifacts| Self::enrich_cfgsync_artifacts(topology, artifacts),
)?;
write_static_compose_configs::<Self>(path, topology, metrics_otlp_ingest_url)?;
write_dummy_cfgsync_config(path, port)?;
Ok(())
}
/// Build the config server container specification.
fn cfgsync_container_spec(
cfgsync_path: &Path,
_cfgsync_path: &Path,
port: u16,
network: &str,
) -> Result<DockerConfigServerSpec, DynError>;
) -> Result<DockerConfigServerSpec, DynError> {
Ok(dummy_cfgsync_spec(port, network))
}
/// Timeout used when launching the config server container.
fn cfgsync_start_timeout() -> Duration {
@ -112,7 +141,9 @@ pub trait ComposeDeployEnv: Application {
fn node_client_from_ports(
ports: &NodeHostPorts,
host: &str,
) -> Result<Self::NodeClient, DynError>;
) -> Result<Self::NodeClient, DynError> {
<Self as Application>::build_node_client(&discovered_node_access(host, ports))
}
/// Build node clients from discovered host ports.
fn build_node_clients(
@ -132,8 +163,8 @@ pub trait ComposeDeployEnv: Application {
}
/// Path used by default readiness checks.
fn readiness_path() -> &'static str {
"/"
fn node_readiness_path() -> &'static str {
<Self as Application>::node_readiness_path()
}
/// Host used by default remote readiness checks.
@ -148,7 +179,11 @@ pub trait ComposeDeployEnv: Application {
requirement: HttpReadinessRequirement,
) -> Result<(), DynError> {
let host = Self::compose_runner_host();
let urls = readiness_urls(&host, mapping, Self::readiness_path())?;
let urls = readiness_urls(
&host,
mapping,
<Self as ComposeDeployEnv>::node_readiness_path(),
)?;
wait_http_readiness(&urls, requirement).await?;
Ok(())
}
@ -162,7 +197,7 @@ pub trait ComposeDeployEnv: Application {
wait_for_http_ports_with_host_and_requirement(
ports,
host,
Self::readiness_path(),
<Self as ComposeDeployEnv>::node_readiness_path(),
requirement,
)
.await?;
@ -180,11 +215,63 @@ impl<T> ComposeCfgsyncEnv for T where
{
}
fn cfgsync_artifacts_path(config_path: &Path) -> std::path::PathBuf {
config_path
fn write_static_compose_configs<E>(
path: &Path,
topology: &<E as Application>::Deployment,
metrics_otlp_ingest_url: Option<&Url>,
) -> Result<(), DynError>
where
E: ComposeDeployEnv + StaticArtifactRenderer<Deployment = <E as Application>::Deployment>,
{
E::prepare_compose_configs(path, topology, metrics_otlp_ingest_url)?;
let hostnames = E::cfgsync_hostnames(topology);
let configs_dir = stack_configs_dir(path)?;
fs::create_dir_all(&configs_dir)?;
for index in 0..topology.node_count() {
let mut config = E::build_node_config(topology, index)?;
E::rewrite_for_hostnames(topology, index, &hostnames, &mut config)?;
let rendered = E::serialize_node_config(&config)?;
let output_path = configs_dir.join(E::static_node_config_file_name(index));
fs::write(&output_path, rendered)?;
}
Ok(())
}
fn stack_configs_dir(cfgsync_path: &Path) -> Result<std::path::PathBuf, DynError> {
let stack_dir = cfgsync_path
.parent()
.unwrap_or(config_path)
.join("cfgsync.artifacts.yaml")
.ok_or_else(|| anyhow::anyhow!("cfgsync path has no parent"))?;
Ok(stack_dir.join("configs"))
}
/// Writes a minimal cfgsync config file declaring a static artifact source.
///
/// # Errors
/// Propagates any I/O error from writing `path`.
fn write_dummy_cfgsync_config(path: &Path, port: u16) -> Result<(), DynError> {
    fs::write(
        path,
        format!(
            "port: {port}\nsource:\n kind: static\n artifacts_path: cfgsync.artifacts.yaml\n"
        ),
    )?;
    Ok(())
}
/// Builds a placeholder cfgsync container spec: a busybox `nc` loop that
/// accepts TCP connections on `port` and discards them, standing in for a
/// real config server when configs are delivered statically.
fn dummy_cfgsync_spec(port: u16, network: &str) -> DockerConfigServerSpec {
    use crate::docker::config_server::DockerPortBinding;

    DockerConfigServerSpec::new(
        "cfgsync".to_owned(),
        network.to_owned(),
        "sh".to_owned(),
        "busybox:1.36".to_owned(),
    )
    .with_network_alias("cfgsync".to_owned())
    .with_args(vec![
        "-c".to_owned(),
        format!("while true; do nc -l -p {port} >/dev/null 2>&1; done"),
    ])
    .with_ports(vec![DockerPortBinding::tcp(port, port)])
}
fn parse_node_container_ports(index: usize, node: &NodeDescriptor) -> Option<NodeContainerPorts> {
@ -199,6 +286,10 @@ fn parse_node_container_ports(index: usize, node: &NodeDescriptor) -> Option<Nod
})
}
/// Maps compose-discovered host ports onto deployer-neutral [`NodeAccess`].
pub fn discovered_node_access(host: &str, ports: &NodeHostPorts) -> NodeAccess {
    NodeAccess::new(host, ports.api).with_testing_port(ports.testing)
}
fn readiness_urls(
host: &str,
mapping: &HostPortMapping,

View File

@ -7,7 +7,11 @@ pub mod infrastructure;
pub mod lifecycle;
pub use deployer::{ComposeDeployer, ComposeDeploymentMetadata};
pub use descriptor::{ComposeDescriptor, EnvEntry, NodeDescriptor};
pub use descriptor::{
BinaryConfigNodeSpec, ComposeDescriptor, EnvEntry, LoopbackNodeRuntimeSpec, NodeDescriptor,
binary_config_node_runtime_spec, build_binary_config_node_descriptors,
build_loopback_node_descriptors,
};
pub use docker::{
commands::{ComposeCommandError, compose_down, compose_up, dump_compose_logs},
config_server::{
@ -16,7 +20,7 @@ pub use docker::{
},
platform::host_gateway_entry,
};
pub use env::{ComposeDeployEnv, ConfigServerHandle};
pub use env::{ComposeDeployEnv, ConfigServerHandle, discovered_node_access};
pub use errors::ComposeRunnerError;
pub use infrastructure::{
ports::{HostPortMapping, NodeHostPorts, compose_runner_host, node_identifier},

View File

@ -263,7 +263,7 @@ fn collect_readiness_endpoints<E: K8sDeployEnv>(
for service in services {
let api_port = extract_api_node_port(service)?;
let mut endpoint = Url::parse(&format!("http://{host}:{api_port}/"))?;
endpoint.set_path(E::readiness_path());
endpoint.set_path(<E as K8sDeployEnv>::node_readiness_path());
endpoints.push(endpoint);
}

View File

@ -1,14 +1,17 @@
use std::{
env, process,
env, fs,
path::PathBuf,
process,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use async_trait::async_trait;
use kube::Client;
use reqwest::Url;
use tempfile::TempDir;
use testing_framework_core::scenario::{
Application, DynError, HttpReadinessRequirement, wait_for_http_ports_with_host_and_requirement,
wait_http_readiness,
Application, DynError, HttpReadinessRequirement, NodeAccess,
wait_for_http_ports_with_host_and_requirement, wait_http_readiness,
};
use crate::{
@ -21,6 +24,51 @@ pub trait HelmReleaseAssets {
fn release_bundle(&self) -> HelmReleaseBundle;
}
/// Helm chart assets rendered into a temporary directory.
#[derive(Debug)]
pub struct RenderedHelmChartAssets {
    /// Path of the rendered chart inside the tempdir.
    chart_path: PathBuf,
    /// Keeps the backing tempdir alive (and on disk) for as long as `self` lives.
    _tempdir: TempDir,
}

impl HelmReleaseAssets for RenderedHelmChartAssets {
    /// Wraps the rendered chart path into a release bundle.
    fn release_bundle(&self) -> HelmReleaseBundle {
        HelmReleaseBundle::new(self.chart_path.clone())
    }
}
/// Builds port specs where every one of `node_count` nodes uses the same
/// `api` and `auxiliary` ports.
pub fn standard_port_specs(node_count: usize, api: u16, auxiliary: u16) -> PortSpecs {
    PortSpecs {
        nodes: (0..node_count)
            .map(|_| crate::wait::NodeConfigPorts { api, auxiliary })
            .collect(),
    }
}
/// Renders a throwaway Helm chart containing a single template file.
///
/// Creates `Chart.yaml` for `chart_name` plus `templates/<template_name>`
/// holding `manifest` inside a fresh temporary directory.
///
/// # Errors
/// Returns any error from creating the tempdir or writing the chart files.
pub fn render_single_template_chart_assets(
    chart_name: &str,
    template_name: &str,
    manifest: &str,
) -> Result<RenderedHelmChartAssets, DynError> {
    let tempdir = tempfile::tempdir()?;
    let chart_path = tempdir.path().join("chart");
    let templates_path = chart_path.join("templates");
    fs::create_dir_all(&templates_path)?;
    fs::write(chart_path.join("Chart.yaml"), render_chart_yaml(chart_name))?;
    fs::write(templates_path.join(template_name), manifest)?;
    Ok(RenderedHelmChartAssets {
        chart_path,
        // Keep the tempdir alive for the lifetime of the returned assets.
        _tempdir: tempdir,
    })
}
/// Maps forwarded k8s ports onto deployer-neutral [`NodeAccess`].
pub fn discovered_node_access(host: &str, api_port: u16, auxiliary_port: u16) -> NodeAccess {
    NodeAccess::new(host, api_port).with_testing_port(auxiliary_port)
}
/// Renders a minimal `Chart.yaml` body for `chart_name`.
fn render_chart_yaml(chart_name: &str) -> String {
    let mut out = String::from("apiVersion: v2\n");
    out.push_str(&format!("name: {chart_name}\n"));
    out.push_str("version: 0.1.0\n");
    out
}
pub async fn install_helm_release_with_cleanup<A: HelmReleaseAssets>(
client: &Client,
assets: &A,
@ -85,7 +133,13 @@ pub trait K8sDeployEnv: Application {
host: &str,
api_port: u16,
auxiliary_port: u16,
) -> Result<Self::NodeClient, DynError>;
) -> Result<Self::NodeClient, DynError> {
<Self as Application>::build_node_client(&discovered_node_access(
host,
api_port,
auxiliary_port,
))
}
/// Build node clients from forwarded ports.
fn build_node_clients(
@ -103,8 +157,8 @@ pub trait K8sDeployEnv: Application {
}
/// Path appended to readiness probe URLs.
fn readiness_path() -> &'static str {
"/"
fn node_readiness_path() -> &'static str {
<Self as Application>::node_readiness_path()
}
/// Wait for remote readiness using topology + URLs.
@ -118,7 +172,7 @@ pub trait K8sDeployEnv: Application {
.iter()
.map(|url| {
let mut endpoint = url.clone();
endpoint.set_path(Self::readiness_path());
endpoint.set_path(<Self as K8sDeployEnv>::node_readiness_path());
endpoint
})
.collect();
@ -162,7 +216,7 @@ pub trait K8sDeployEnv: Application {
wait_for_http_ports_with_host_and_requirement(
ports,
host,
Self::readiness_path(),
<Self as K8sDeployEnv>::node_readiness_path(),
requirement,
)
.await?;

View File

@ -9,7 +9,10 @@ pub mod wait {
}
pub use deployer::{K8sDeployer, K8sDeploymentMetadata, K8sRunnerError};
pub use env::{HelmReleaseAssets, K8sDeployEnv, install_helm_release_with_cleanup};
pub use env::{
HelmReleaseAssets, K8sDeployEnv, RenderedHelmChartAssets, discovered_node_access,
install_helm_release_with_cleanup, render_single_template_chart_assets, standard_port_specs,
};
pub use infrastructure::{
chart_values::{
BootstrapExtraFile, BootstrapFiles, BootstrapScripts, BootstrapValues, NodeGroup,

View File

@ -15,9 +15,12 @@ workspace = true
[dependencies]
async-trait = "0.1"
fs_extra = "1.3"
serde = { workspace = true }
serde_yaml = { workspace = true }
tempfile = { workspace = true }
testing-framework-core = { path = "../../core" }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-retry = "0.3"
tracing = { workspace = true }
which = "6.0"

View File

@ -1,14 +1,20 @@
use std::{
collections::HashMap,
net::{Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
};
use testing_framework_core::scenario::{
Application, DynError, HttpReadinessRequirement, ReadinessError, StartNodeOptions,
wait_for_http_ports_with_requirement,
use serde::Serialize;
use testing_framework_core::{
scenario::{
Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, DynError,
HttpReadinessRequirement, NodeAccess, ReadinessError, StartNodeOptions,
wait_for_http_ports_with_requirement,
},
topology::DeploymentDescriptor,
};
use crate::process::{LaunchSpec, NodeEndpoints, ProcessNode, ProcessSpawnError};
use crate::process::{LaunchSpec, NodeEndpointPort, NodeEndpoints, ProcessNode, ProcessSpawnError};
pub type Node<E> = ProcessNode<<E as Application>::NodeConfig, <E as Application>::NodeClient>;
@ -22,18 +28,326 @@ pub struct NodeConfigEntry<NodeConfigValue> {
pub config: NodeConfigValue,
}
/// Ports reserved on localhost for one node: a primary network port plus
/// auxiliary ports keyed by a static role name (filled in by
/// `reserve_local_node_ports`).
pub struct LocalNodePorts {
    // Primary port the node listens on for network/peer traffic.
    network_port: u16,
    // Extra reserved ports, keyed by role name (e.g. an API or metrics port).
    named_ports: HashMap<&'static str, u16>,
}
impl LocalNodePorts {
    /// Primary network port reserved for this node.
    #[must_use]
    pub fn network_port(&self) -> u16 {
        self.network_port
    }

    /// Looks up the auxiliary port reserved under `name`, if any.
    #[must_use]
    pub fn get(&self, name: &str) -> Option<u16> {
        self.named_ports.get(name).copied()
    }

    /// Like [`Self::get`], but turns a missing reservation into an error.
    pub fn require(&self, name: &str) -> Result<u16, DynError> {
        match self.get(name) {
            Some(port) => Ok(port),
            None => Err(format!("missing reserved local port '{name}'").into()),
        }
    }

    /// Iterates over every `(role name, port)` pair of auxiliary ports.
    pub fn iter(&self) -> impl Iterator<Item = (&'static str, u16)> + '_ {
        self.named_ports.iter().map(|(&name, &port)| (name, port))
    }
}
/// View of a peer node in a locally-run cluster: its position in the
/// deployment's node list and the loopback port it listens on.
#[derive(Clone, Debug)]
pub struct LocalPeerNode {
    // Index of the peer within the deployment's node list.
    index: usize,
    // Peer's primary network port on 127.0.0.1.
    network_port: u16,
}
impl LocalPeerNode {
    /// Index of this peer within the deployment's node list.
    #[must_use]
    pub fn index(&self) -> usize {
        self.index
    }

    /// Primary network port the peer listens on.
    #[must_use]
    pub fn network_port(&self) -> u16 {
        self.network_port
    }

    /// `host:port` string for reaching this peer over HTTP on loopback.
    #[must_use]
    pub fn http_address(&self) -> String {
        let port = self.network_port;
        format!("127.0.0.1:{port}")
    }

    /// URL authority component for this peer; identical to
    /// [`Self::http_address`].
    #[must_use]
    pub fn authority(&self) -> String {
        self.http_address()
    }
}
/// Recipe for launching an application node as a local OS process.
///
/// NOTE(review): the derived `Default` yields empty `config_file_name` /
/// `config_arg`, while `new` fills in "config.yaml" / "--config" — confirm
/// no caller relies on `Default` producing a directly usable spec.
#[derive(Clone, Default)]
pub struct LocalProcessSpec {
    /// Environment variable consulted first when resolving the binary path.
    pub binary_env_var: String,
    /// Binary name used for `PATH` lookup (and the `target/` fallback).
    pub binary_name: String,
    /// File name the rendered node config is written under.
    pub config_file_name: String,
    /// CLI flag that passes the config file to the binary.
    pub config_arg: String,
    /// Extra CLI arguments appended after the config flag.
    pub extra_args: Vec<String>,
    /// Environment variables set for the spawned process.
    pub env: Vec<crate::process::LaunchEnvVar>,
}
impl LocalProcessSpec {
    /// Creates a spec using the default `config.yaml` / `--config`
    /// convention, with no extra args or environment variables.
    #[must_use]
    pub fn new(binary_env_var: &str, binary_name: &str) -> Self {
        Self {
            binary_env_var: binary_env_var.to_owned(),
            binary_name: binary_name.to_owned(),
            config_file_name: String::from("config.yaml"),
            config_arg: String::from("--config"),
            extra_args: Vec::new(),
            env: Vec::new(),
        }
    }

    /// Overrides the config file name and the CLI flag that passes it.
    #[must_use]
    pub fn with_config_file(mut self, file_name: &str, arg: &str) -> Self {
        self.config_file_name = file_name.to_string();
        self.config_arg = arg.to_string();
        self
    }

    /// Adds a single environment variable for the spawned process.
    #[must_use]
    pub fn with_env(mut self, key: &str, value: &str) -> Self {
        let var = crate::process::LaunchEnvVar::new(key, value);
        self.env.push(var);
        self
    }

    /// Convenience for setting `RUST_LOG` on the spawned process.
    #[must_use]
    pub fn with_rust_log(self, value: &str) -> Self {
        self.with_env("RUST_LOG", value)
    }

    /// Appends extra CLI arguments after the config flag.
    #[must_use]
    pub fn with_args(mut self, args: impl IntoIterator<Item = String>) -> Self {
        self.extra_args.extend(args);
        self
    }
}
/// Reserves `count` free localhost ports up front.
///
/// Short-circuits on the first allocation failure, wrapping it in a
/// `ProcessSpawnError::Config` whose message names `label`.
pub fn preallocate_ports(count: usize, label: &str) -> Result<Vec<u16>, ProcessSpawnError> {
    let mut ports = Vec::with_capacity(count);
    for _ in 0..count {
        match crate::process::allocate_available_port() {
            Ok(port) => ports.push(port),
            Err(source) => {
                return Err(ProcessSpawnError::Config {
                    source: format!("failed to pre-allocate {label} ports: {source}").into(),
                });
            }
        }
    }
    Ok(ports)
}
/// Builds `count` node config entries named `"{name_prefix}-{index}"`,
/// invoking `build` once per index (in order) to produce each config value.
pub fn build_indexed_node_configs<T>(
    count: usize,
    name_prefix: &str,
    mut build: impl FnMut(usize) -> T,
) -> Vec<NodeConfigEntry<T>> {
    let mut entries = Vec::with_capacity(count);
    for index in 0..count {
        entries.push(NodeConfigEntry {
            name: format!("{name_prefix}-{index}"),
            config: build(index),
        });
    }
    entries
}
/// Reserves ports for `count` nodes: one primary network port each, plus one
/// port per entry in `names` (role names). `label` is used in error messages.
pub fn reserve_local_node_ports(
    count: usize,
    names: &[&'static str],
    label: &str,
) -> Result<Vec<LocalNodePorts>, ProcessSpawnError> {
    // One primary network port per node.
    let network_ports = preallocate_ports(count, label)?;
    // For each role name, a column of `count` ports (one per node).
    let mut ports_by_role = HashMap::new();
    for &name in names {
        let role_label = format!("{label} {name}");
        ports_by_role.insert(name, preallocate_ports(count, &role_label)?);
    }
    // Assemble the per-node view: its network port plus its slice of every
    // role column.
    let mut nodes = Vec::with_capacity(count);
    for index in 0..count {
        let named_ports = ports_by_role
            .iter()
            .map(|(&name, column)| (name, column[index]))
            .collect();
        nodes.push(LocalNodePorts {
            network_port: network_ports[index],
            named_ports,
        });
    }
    Ok(nodes)
}
/// Builds a `NodeEndpoints` exposing only a single HTTP API port on loopback.
pub fn single_http_node_endpoints(port: u16) -> NodeEndpoints {
    NodeEndpoints::from_api_port(port)
}
/// Builds a node config for app `E` from a loopback cluster description:
/// this node's reserved ports plus the peers it should connect to.
pub fn build_local_cluster_node_config<E>(
    index: usize,
    ports: &LocalNodePorts,
    peers: &[LocalPeerNode],
) -> Result<<E as Application>::NodeConfig, DynError>
where
    E: ClusterNodeConfigApplication,
{
    // Describe this node: loopback host, its network port, and every
    // named auxiliary port it has reserved.
    let node = ports.iter().fold(
        ClusterNodeView::new(index, "127.0.0.1", ports.network_port()),
        |view, (name, port)| view.with_named_port(name, port),
    );
    // Describe each peer by index and loopback network port.
    let mut peer_views = Vec::with_capacity(peers.len());
    for peer in peers {
        peer_views.push(ClusterPeerView::new(peer.index(), "127.0.0.1", peer.network_port()));
    }
    E::build_cluster_node_config(&node, &peer_views).map_err(Into::into)
}
/// Converts discovered process endpoints into a `NodeAccess` on loopback:
/// the API port becomes the base access, testing/custom extra ports are
/// attached, and the network port is intentionally not exposed.
pub fn discovered_node_access(endpoints: &NodeEndpoints) -> NodeAccess {
    let base = NodeAccess::new("127.0.0.1", endpoints.api.port());
    endpoints
        .extra_ports
        .iter()
        .fold(base, |access, (key, &port)| match key {
            NodeEndpointPort::TestingApi => access.with_testing_port(port),
            NodeEndpointPort::Custom(name) => access.with_named_port(name.clone(), port),
            // The raw network port is not part of client-facing access.
            NodeEndpointPort::Network => access,
        })
}
/// Builds one peer value per node index in `0..node_count`, skipping
/// `self_index`, by calling `build_peer(index, "127.0.0.1:<port>")`.
///
/// Indexes `peer_ports` by node index, so `peer_ports` must cover
/// `0..node_count` (panics otherwise, as before).
pub fn build_indexed_http_peers<T>(
    node_count: usize,
    self_index: usize,
    peer_ports: &[u16],
    mut build_peer: impl FnMut(usize, String) -> T,
) -> Vec<T> {
    let mut peers = Vec::new();
    for index in 0..node_count {
        if index == self_index {
            continue;
        }
        let address = format!("127.0.0.1:{}", peer_ports[index]);
        peers.push(build_peer(index, address));
    }
    peers
}
/// Returns every entry of `peer_ports` except the one at `self_index`
/// (out-of-range `self_index` simply keeps all ports).
fn compact_peer_ports(peer_ports: &[u16], self_index: usize) -> Vec<u16> {
    let mut compacted = Vec::with_capacity(peer_ports.len().saturating_sub(1));
    for (index, &port) in peer_ports.iter().enumerate() {
        if index != self_index {
            compacted.push(port);
        }
    }
    compacted
}
/// Builds a peer view for every node except the one at `self_index`,
/// pairing each remaining index with its network port.
pub fn build_local_peer_nodes(peer_ports: &[u16], self_index: usize) -> Vec<LocalPeerNode> {
    let mut peers = Vec::with_capacity(peer_ports.len().saturating_sub(1));
    for (index, &network_port) in peer_ports.iter().enumerate() {
        if index == self_index {
            continue;
        }
        peers.push(LocalPeerNode {
            index,
            network_port,
        });
    }
    peers
}
/// Serializes `config` to YAML and wraps the rendered bytes into a launch
/// spec according to `spec`.
pub fn yaml_config_launch_spec<T: Serialize>(
    config: &T,
    spec: &LocalProcessSpec,
) -> Result<LaunchSpec, DynError> {
    let rendered = serde_yaml::to_string(config)?.into_bytes();
    rendered_config_launch_spec(rendered, spec)
}
/// Wraps an already-rendered config (any byte-convertible payload) into a
/// launch spec according to `spec`.
pub fn text_config_launch_spec(
    rendered_config: impl Into<Vec<u8>>,
    spec: &LocalProcessSpec,
) -> Result<LaunchSpec, DynError> {
    let bytes: Vec<u8> = rendered_config.into();
    rendered_config_launch_spec(bytes, spec)
}
/// Convenience: YAML launch spec with the standard process spec (binary
/// resolved via `binary_env_var` then `PATH`) and `RUST_LOG` set to
/// `rust_log`.
pub fn default_yaml_launch_spec<T: Serialize>(
    config: &T,
    binary_env_var: &str,
    binary_name: &str,
    rust_log: &str,
) -> Result<LaunchSpec, DynError> {
    let spec = LocalProcessSpec::new(binary_env_var, binary_name).with_rust_log(rust_log);
    yaml_config_launch_spec(config, &spec)
}
/// Renders `config` as YAML and returns the bytes for a node config file.
pub fn yaml_node_config<T: Serialize>(config: &T) -> Result<Vec<u8>, DynError> {
    let yaml = serde_yaml::to_string(config)?;
    Ok(yaml.into_bytes())
}
/// Passes pre-rendered config text through as node config bytes.
pub fn text_node_config(rendered_config: impl Into<Vec<u8>>) -> Vec<u8> {
    rendered_config.into()
}
/// Assembles the final `LaunchSpec`: resolved binary, the config file
/// payload, CLI args (`<config_arg> <config_file>` followed by any extras),
/// and the configured environment variables.
fn rendered_config_launch_spec(
    rendered_config: Vec<u8>,
    spec: &LocalProcessSpec,
) -> Result<LaunchSpec, DynError> {
    // The rendered config is shipped as a file next to the process.
    let config_file = crate::process::LaunchFile {
        relative_path: spec.config_file_name.clone().into(),
        contents: rendered_config,
    };
    let mut args = Vec::with_capacity(2 + spec.extra_args.len());
    args.push(spec.config_arg.clone());
    args.push(spec.config_file_name.clone());
    for extra in &spec.extra_args {
        args.push(extra.clone());
    }
    Ok(LaunchSpec {
        binary: resolve_binary(spec),
        files: vec![config_file],
        args,
        env: spec.env.clone(),
    })
}
/// Resolves the binary to launch, in priority order:
/// 1. the path in `spec.binary_env_var`, if that env var is set;
/// 2. `spec.binary_name` found on `PATH` via `which`;
/// 3. `<cwd>/target/debug/<name>` if it exists;
/// 4. `<cwd>/target/release/<name>` (returned without an existence check,
///    as the final fallback).
fn resolve_binary(spec: &LocalProcessSpec) -> PathBuf {
    if let Ok(from_env) = std::env::var(&spec.binary_env_var) {
        return PathBuf::from(from_env);
    }
    if let Ok(found) = which::which(&spec.binary_name) {
        return found;
    }
    let cwd = std::env::current_dir().unwrap_or_default();
    let debug = cwd.join(format!("target/debug/{}", spec.binary_name));
    if debug.exists() {
        return debug;
    }
    cwd.join(format!("target/release/{}", spec.binary_name))
}
#[async_trait::async_trait]
pub trait LocalDeployerEnv: Application + Sized
where
<Self as Application>::NodeConfig: Clone + Send + Sync + 'static,
{
fn local_port_names() -> &'static [&'static str] {
Self::initial_local_port_names()
}
fn build_node_config(
topology: &Self::Deployment,
index: usize,
peer_ports_by_name: &HashMap<String, u16>,
options: &StartNodeOptions<Self>,
peer_ports: &[u16],
) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError>;
) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError> {
Self::build_node_config_from_template(
topology,
index,
peer_ports_by_name,
options,
peer_ports,
None,
)
}
fn build_node_config_from_template(
topology: &Self::Deployment,
@ -41,14 +355,119 @@ where
peer_ports_by_name: &HashMap<String, u16>,
options: &StartNodeOptions<Self>,
peer_ports: &[u16],
_template_config: Option<&<Self as Application>::NodeConfig>,
template_config: Option<&<Self as Application>::NodeConfig>,
) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError> {
Self::build_node_config(topology, index, peer_ports_by_name, options, peer_ports)
let mut reserved = reserve_local_node_ports(1, Self::local_port_names(), "node")
.map_err(|source| -> DynError { source.into() })?;
let ports = reserved
.pop()
.ok_or_else(|| std::io::Error::other("failed to reserve local node ports"))?;
let network_port = ports.network_port();
let config = Self::build_local_node_config(
topology,
index,
&ports,
peer_ports_by_name,
options,
peer_ports,
template_config,
)?;
Ok(BuiltNodeConfig {
config,
network_port,
})
}
fn build_initial_node_configs(
topology: &Self::Deployment,
) -> Result<Vec<NodeConfigEntry<<Self as Application>::NodeConfig>>, ProcessSpawnError>;
) -> Result<Vec<NodeConfigEntry<<Self as Application>::NodeConfig>>, ProcessSpawnError> {
let reserved_ports = reserve_local_node_ports(
topology.node_count(),
Self::initial_local_port_names(),
Self::initial_node_name_prefix(),
)?;
let peer_ports = reserved_ports
.iter()
.map(LocalNodePorts::network_port)
.collect::<Vec<_>>();
let mut configs = Vec::with_capacity(topology.node_count());
for (index, ports) in reserved_ports.iter().enumerate() {
let config = Self::build_initial_node_config(topology, index, ports, &peer_ports)
.map_err(|source| ProcessSpawnError::Config { source })?;
configs.push(NodeConfigEntry {
name: format!("{}-{index}", Self::initial_node_name_prefix()),
config,
});
}
Ok(configs)
}
fn initial_node_name_prefix() -> &'static str {
"node"
}
fn initial_local_port_names() -> &'static [&'static str] {
&[]
}
fn build_initial_node_config(
topology: &Self::Deployment,
index: usize,
ports: &LocalNodePorts,
peer_ports: &[u16],
) -> Result<<Self as Application>::NodeConfig, DynError> {
let compact_peer_ports = compact_peer_ports(peer_ports, index);
let peer_ports_by_name = HashMap::new();
let options = StartNodeOptions::<Self>::default();
Self::build_local_node_config(
topology,
index,
ports,
&peer_ports_by_name,
&options,
&compact_peer_ports,
None,
)
}
fn build_local_node_config(
_topology: &Self::Deployment,
_index: usize,
_ports: &LocalNodePorts,
_peer_ports_by_name: &HashMap<String, u16>,
_options: &StartNodeOptions<Self>,
_peer_ports: &[u16],
_template_config: Option<&<Self as Application>::NodeConfig>,
) -> Result<<Self as Application>::NodeConfig, DynError> {
let peers = build_local_peer_nodes(_peer_ports, _index);
Self::build_local_node_config_with_peers(
_topology,
_index,
_ports,
&peers,
_peer_ports_by_name,
_options,
_template_config,
)
}
fn build_local_node_config_with_peers(
_topology: &Self::Deployment,
_index: usize,
_ports: &LocalNodePorts,
_peers: &[LocalPeerNode],
_peer_ports_by_name: &HashMap<String, u16>,
_options: &StartNodeOptions<Self>,
_template_config: Option<&<Self as Application>::NodeConfig>,
) -> Result<<Self as Application>::NodeConfig, DynError> {
Err(std::io::Error::other(
"build_local_node_config_with_peers is not implemented for this app",
)
.into())
}
fn initial_persist_dir(
_topology: &Self::Deployment,
@ -66,22 +485,67 @@ where
None
}
fn local_process_spec() -> Option<LocalProcessSpec> {
None
}
fn render_local_config(
_config: &<Self as Application>::NodeConfig,
) -> Result<Vec<u8>, DynError> {
Err(std::io::Error::other("render_local_config is not implemented for this app").into())
}
fn build_launch_spec(
config: &<Self as Application>::NodeConfig,
dir: &Path,
label: &str,
) -> Result<LaunchSpec, DynError>;
_dir: &Path,
_label: &str,
) -> Result<LaunchSpec, DynError> {
let spec = Self::local_process_spec().ok_or_else(|| {
std::io::Error::other("build_launch_spec is not implemented for this app")
})?;
let rendered = Self::render_local_config(config)?;
rendered_config_launch_spec(rendered, &spec)
}
fn node_endpoints(config: &<Self as Application>::NodeConfig) -> NodeEndpoints;
fn http_api_port(_config: &<Self as Application>::NodeConfig) -> Option<u16> {
None
}
fn node_endpoints(config: &<Self as Application>::NodeConfig) -> NodeEndpoints {
if let Some(port) = Self::http_api_port(config) {
return NodeEndpoints {
api: SocketAddr::from((Ipv4Addr::LOCALHOST, port)),
extra_ports: HashMap::new(),
};
}
panic!("node_endpoints is not implemented for this app");
}
fn node_peer_port(node: &Node<Self>) -> u16 {
node.endpoints().api.port()
}
fn node_client(endpoints: &NodeEndpoints) -> Self::NodeClient;
fn node_client_from_api_endpoint(_api: SocketAddr) -> Option<Self::NodeClient> {
None
}
fn node_client(endpoints: &NodeEndpoints) -> Self::NodeClient {
if let Ok(client) =
<Self as Application>::build_node_client(&discovered_node_access(endpoints))
{
return client;
}
if let Some(client) = Self::node_client_from_api_endpoint(endpoints.api) {
return client;
}
panic!("node_client is not implemented for this app");
}
fn readiness_endpoint_path() -> &'static str {
"/"
<Self as Application>::node_readiness_path()
}
async fn wait_readiness_stable(_nodes: &[Node<Self>]) -> Result<(), DynError> {

View File

@ -8,12 +8,19 @@ pub mod process;
pub use binary::{BinaryConfig, BinaryResolver};
pub use deployer::{ProcessDeployer, ProcessDeployerError};
pub use env::{BuiltNodeConfig, LocalDeployerEnv, NodeConfigEntry};
pub use env::{
BuiltNodeConfig, LocalDeployerEnv, LocalNodePorts, LocalPeerNode, LocalProcessSpec,
NodeConfigEntry, build_indexed_http_peers, build_indexed_node_configs,
build_local_cluster_node_config, build_local_peer_nodes, default_yaml_launch_spec,
discovered_node_access, preallocate_ports, reserve_local_node_ports,
single_http_node_endpoints, text_config_launch_spec, text_node_config, yaml_config_launch_spec,
yaml_node_config,
};
pub use manual::{ManualCluster, ManualClusterError};
pub use node_control::{NodeManager, NodeManagerError, NodeManagerSeed};
pub use process::{
LaunchEnvVar, LaunchFile, LaunchSpec, NodeEndpointPort, NodeEndpoints, ProcessNode,
ProcessSpawnError,
ProcessSpawnError, allocate_available_port,
};
const KEEP_LOGS_ENV: &str = "TF_KEEP_LOGS";

View File

@ -41,6 +41,14 @@ impl Default for NodeEndpoints {
}
impl NodeEndpoints {
#[must_use]
pub fn from_api_port(port: u16) -> Self {
Self {
api: SocketAddr::from((Ipv4Addr::LOCALHOST, port)),
extra_ports: HashMap::new(),
}
}
pub fn insert_port(&mut self, key: NodeEndpointPort, port: u16) {
self.extra_ports.insert(key, port);
}
@ -353,6 +361,13 @@ fn default_api_socket() -> SocketAddr {
SocketAddr::from((Ipv4Addr::LOCALHOST, 0))
}
/// Finds a free TCP port by binding an ephemeral listener on loopback and
/// returning its assigned port; the listener is dropped before returning.
///
/// NOTE(review): inherently racy (TOCTOU) — another process may claim the
/// port between release and reuse, so callers must tolerate bind failures.
pub fn allocate_available_port() -> Result<u16, io::Error> {
    std::net::TcpListener::bind("127.0.0.1:0")?
        .local_addr()
        .map(|addr| addr.port())
}
fn create_tempdir(persist_dir: Option<&Path>) -> Result<TempDir, ProcessSpawnError> {
match persist_dir {
Some(dir) => {