docs(tf): document public deployer API

This commit is contained in:
andrussal 2026-04-11 09:36:42 +02:00
parent d131301d78
commit 23d4bf2d07
5 changed files with 223 additions and 2 deletions

View File

@ -35,32 +35,46 @@ use crate::{
/// Handle returned by a compose config server (cfgsync or equivalent).
pub trait ConfigServerHandle: Send + Sync {
/// Stops the config server and releases any local resources it owns.
fn shutdown(&mut self);
/// Marks the config server as preserved so runner cleanup should not remove
/// it.
fn mark_preserved(&mut self);
/// Returns the backing container name when the handle is container-backed.
fn container_name(&self) -> Option<&str> {
None
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
/// Selects how compose nodes receive config updates.
pub enum ComposeConfigServerMode {
/// Do not start any config server.
Disabled,
/// Start a Docker-backed config server sidecar.
Docker,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
/// Readiness probe used for compose nodes.
pub enum ComposeReadinessProbe {
/// Probe a concrete HTTP path on each node.
Http { path: &'static str },
/// Probe raw TCP reachability on the testing port.
Tcp,
}
#[derive(Clone, Copy)]
/// Naming strategy for static compose node config files.
pub enum ComposeNodeConfigFileName {
/// Use `node-{index}.<extension>`.
FixedExtension(&'static str),
/// Build the file name directly from the node index.
Custom(fn(usize) -> String),
}
impl ComposeNodeConfigFileName {
/// Resolves the config file name for one node index.
#[must_use]
pub fn resolve(&self, index: usize) -> String {
match self {
@ -73,6 +87,7 @@ impl ComposeNodeConfigFileName {
/// Advanced compose deployer integration.
#[async_trait]
pub trait ComposeDeployEnv: Application + Sized {
/// Prepares compose workspace files before the stack is started.
fn prepare_compose_configs(
_path: &Path,
_topology: &<Self as Application>::Deployment,
@ -82,10 +97,13 @@ pub trait ComposeDeployEnv: Application + Sized {
Ok(())
}
/// Returns the static config file name used for one node.
fn static_node_config_file_name(index: usize) -> String {
format!("node-{index}.yaml")
}
/// Returns the runtime spec for one loopback node when using the standard
/// one-container-per-node shape.
fn loopback_node_runtime_spec(
topology: &<Self as Application>::Deployment,
index: usize,
@ -96,6 +114,8 @@ pub trait ComposeDeployEnv: Application + Sized {
Ok(None)
}
/// Returns the binary+config node spec when the app follows the standard
/// binary compose path.
fn binary_config_node_spec(
_topology: &<Self as Application>::Deployment,
_index: usize,
@ -103,6 +123,7 @@ pub trait ComposeDeployEnv: Application + Sized {
Ok(None)
}
/// Builds the full compose descriptor for the deployment.
fn compose_descriptor(
topology: &<Self as Application>::Deployment,
_cfgsync_port: u16,
@ -116,6 +137,7 @@ pub trait ComposeDeployEnv: Application + Sized {
Ok(ComposeDescriptor::new(nodes))
}
/// Returns the container ports exposed by each compose node.
fn node_container_ports(
topology: &<Self as Application>::Deployment,
) -> Result<Vec<NodeContainerPorts>, DynError> {
@ -129,12 +151,14 @@ pub trait ComposeDeployEnv: Application + Sized {
.collect())
}
/// Returns the hostnames advertised to cfgsync-rendered node configs.
fn cfgsync_hostnames(topology: &<Self as Application>::Deployment) -> Vec<String> {
(0..topology.node_count())
.map(crate::infrastructure::ports::node_identifier)
.collect()
}
/// Adds extra artifacts to the cfgsync materialization output.
fn enrich_cfgsync_artifacts(
_topology: &<Self as Application>::Deployment,
_artifacts: &mut MaterializedArtifacts,
@ -142,10 +166,12 @@ pub trait ComposeDeployEnv: Application + Sized {
Ok(())
}
/// Selects how compose nodes receive config updates.
fn cfgsync_server_mode() -> ComposeConfigServerMode {
ComposeConfigServerMode::Disabled
}
/// Builds the config-server container spec when cfgsync is enabled.
fn cfgsync_container_spec(
_cfgsync_path: &Path,
_port: u16,
@ -154,10 +180,12 @@ pub trait ComposeDeployEnv: Application + Sized {
Err(std::io::Error::other("cfgsync_container_spec is not implemented for this app").into())
}
/// Returns how long the runner should wait for the config server to start.
fn cfgsync_start_timeout() -> Duration {
Duration::from_secs(180)
}
/// Builds one node client from mapped compose ports.
fn node_client_from_ports(
ports: &NodeHostPorts,
host: &str,
@ -165,6 +193,7 @@ pub trait ComposeDeployEnv: Application + Sized {
<Self as Application>::build_node_client(&discovered_node_access(host, ports))
}
/// Builds node clients for the full compose deployment.
fn build_node_clients(
_topology: &<Self as Application>::Deployment,
host_ports: &HostPortMapping,
@ -178,16 +207,19 @@ pub trait ComposeDeployEnv: Application + Sized {
Ok(NodeClients::new(clients))
}
/// Returns the readiness probe used for compose nodes.
fn readiness_probe() -> ComposeReadinessProbe {
ComposeReadinessProbe::Http {
path: <Self as Application>::node_readiness_path(),
}
}
/// Returns the host that should be used to access forwarded compose ports.
fn compose_runner_host() -> String {
compose_runner_host()
}
/// Waits for remote readiness using the app's compose-specific probe model.
async fn wait_remote_readiness(
_topology: &<Self as Application>::Deployment,
mapping: &HostPortMapping,
@ -204,6 +236,7 @@ pub trait ComposeDeployEnv: Application + Sized {
}
}
/// Waits for local host ports using the app's compose-specific probe model.
async fn wait_for_nodes(
ports: &[u16],
host: &str,
@ -234,8 +267,10 @@ pub trait ComposeDeployEnv: Application + Sized {
pub trait ComposeBinaryApp:
Application + Sized + StaticArtifactRenderer<Deployment = <Self as Application>::Deployment>
{
/// Returns the binary+config runtime spec shared by all compose nodes.
fn compose_node_spec() -> BinaryConfigNodeSpec;
/// Prepares any extra workspace files needed before config rendering.
fn prepare_compose_workspace(
_path: &Path,
_topology: &<Self as Application>::Deployment,
@ -244,17 +279,21 @@ pub trait ComposeBinaryApp:
Ok(())
}
/// Returns extra non-node services that should be added to the compose
/// stack.
fn compose_extra_services(
_topology: &<Self as Application>::Deployment,
) -> Result<Vec<NodeDescriptor>, DynError> {
Ok(Vec::new())
}
/// Returns the static node config file name used for one node.
fn static_node_config_file_name(index: usize) -> String {
make_extension_node_config_file_name(&Self::compose_node_spec().config_file_extension)
.resolve(index)
}
/// Builds one node client from mapped compose ports.
fn node_client_from_ports(
ports: &NodeHostPorts,
host: &str,
@ -262,12 +301,14 @@ pub trait ComposeBinaryApp:
<Self as Application>::build_node_client(&discovered_node_access(host, ports))
}
/// Returns the readiness probe used for compose nodes.
fn readiness_probe() -> ComposeReadinessProbe {
ComposeReadinessProbe::Http {
path: <Self as Application>::node_readiness_path(),
}
}
/// Returns the host that should be used to access forwarded compose ports.
fn compose_runner_host() -> String {
compose_runner_host()
}
@ -439,6 +480,7 @@ where
Ok(())
}
/// Materializes cfgsync registration-server config files for a compose stack.
pub fn write_registration_server_compose_configs<E>(
path: &Path,
topology: &<E as Application>::Deployment,
@ -529,6 +571,7 @@ async fn wait_for_tcp_readiness(
}
}
/// Converts mapped compose ports into generic node access.
pub fn discovered_node_access(host: &str, ports: &NodeHostPorts) -> NodeAccess {
NodeAccess::new(host, ports.api).with_testing_port(ports.testing)
}

View File

@ -26,32 +26,47 @@ use crate::{
lifecycle::cleanup::RunnerCleanup,
};
/// Assets that can be installed as one Helm release.
pub trait HelmReleaseAssets {
/// Returns the Helm bundle used to install the release.
fn release_bundle(&self) -> HelmReleaseBundle;
}
#[derive(Debug)]
/// Rendered chart directory backed by a temporary workspace.
pub struct RenderedHelmChartAssets {
chart_path: PathBuf,
_tempdir: TempDir,
}
#[derive(Clone, Debug, Default)]
/// In-memory YAML manifest that can be rendered into a single Helm template.
pub struct HelmManifest {
documents: Vec<String>,
}
#[derive(Clone, Debug)]
/// Standard binary+config k8s node installation contract.
pub struct BinaryConfigK8sSpec {
/// Helm chart name used for the generated chart.
pub chart_name: String,
/// Prefix used for generated node deployment and service names.
pub node_name_prefix: String,
/// Binary path inside the container image.
pub binary_path: String,
/// Absolute config path inside the container.
pub config_container_path: String,
/// Main HTTP port exposed by the container.
pub container_http_port: u16,
/// Auxiliary service port exposed by the Service object.
pub service_testing_port: u16,
/// Primary environment variable used to override the image.
pub image_env_var: String,
/// Secondary fallback environment variable used to override the image.
pub fallback_image_env_var: String,
/// Default local image tag used when no override is present.
pub default_image: String,
/// Image pull policy written into the generated Deployment.
pub image_pull_policy: String,
}
@ -62,11 +77,13 @@ impl HelmReleaseAssets for RenderedHelmChartAssets {
}
impl HelmManifest {
/// Creates an empty manifest.
#[must_use]
pub fn new() -> Self {
Self::default()
}
/// Serializes one structured value as YAML and appends it as a document.
pub fn push_yaml<T>(&mut self, value: &T) -> Result<(), DynError>
where
T: Serialize,
@ -76,6 +93,7 @@ impl HelmManifest {
Ok(())
}
/// Appends a raw YAML document after trimming surrounding whitespace.
pub fn push_raw_yaml(&mut self, yaml: &str) {
let yaml = yaml.trim();
if !yaml.is_empty() {
@ -83,16 +101,20 @@ impl HelmManifest {
}
}
/// Appends all documents from another manifest.
pub fn extend(&mut self, other: Self) {
self.documents.extend(other.documents);
}
/// Renders all documents separated by `---`.
#[must_use]
pub fn render(&self) -> String {
self.documents.join("\n---\n")
}
}
/// Builds the standard forwarded port layout for a generated binary-config
/// k8s stack.
pub fn standard_port_specs(node_count: usize, api: u16, auxiliary: u16) -> PortSpecs {
PortSpecs {
nodes: (0..node_count)
@ -102,6 +124,7 @@ pub fn standard_port_specs(node_count: usize, api: u16, auxiliary: u16) -> PortS
}
impl BinaryConfigK8sSpec {
/// Builds the conventional binary-config spec for a generated chart.
#[must_use]
pub fn conventional(
chart_name: &str,
@ -137,6 +160,8 @@ impl BinaryConfigK8sSpec {
}
}
/// Renders a temporary single-template Helm chart for the standard
/// binary-config node layout.
pub fn render_binary_config_node_chart_assets<E>(
deployment: &E::Deployment,
spec: &BinaryConfigK8sSpec,
@ -153,6 +178,8 @@ where
)
}
/// Renders the standard ConfigMap, Deployment, and Service objects for each
/// node in a binary-config k8s stack.
pub fn render_binary_config_node_manifest<E>(
deployment: &E::Deployment,
spec: &BinaryConfigK8sSpec,
@ -222,6 +249,7 @@ fn k8s_image(spec: &BinaryConfigK8sSpec) -> String {
.unwrap_or_else(|_| spec.default_image.clone())
}
/// Renders a minimal chart directory with a single YAML template file.
pub fn render_single_template_chart_assets(
chart_name: &str,
template_name: &str,
@ -239,6 +267,7 @@ pub fn render_single_template_chart_assets(
})
}
/// Renders a chart from an in-memory manifest.
pub fn render_manifest_chart_assets(
chart_name: &str,
template_name: &str,
@ -247,6 +276,7 @@ pub fn render_manifest_chart_assets(
render_single_template_chart_assets(chart_name, template_name, &manifest.render())
}
/// Converts forwarded k8s ports into generic node access.
pub fn discovered_node_access(host: &str, api_port: u16, auxiliary_port: u16) -> NodeAccess {
NodeAccess::new(host, api_port).with_testing_port(auxiliary_port)
}
@ -259,6 +289,7 @@ fn normalize_yaml_document(yaml: &str) -> String {
yaml.trim_start_matches("---\n").trim().to_owned()
}
/// Installs a Helm release and returns the corresponding cleanup handle.
pub async fn install_helm_release_with_cleanup<A: HelmReleaseAssets>(
client: &Client,
assets: &A,
@ -281,7 +312,9 @@ pub async fn install_helm_release_with_cleanup<A: HelmReleaseAssets>(
}
#[async_trait]
/// Prepared k8s stack ready to be installed into a namespace.
pub trait PreparedK8sStack: Send + Sync {
/// Installs the prepared stack and returns the cleanup handle.
async fn install(
&self,
client: &Client,
@ -311,15 +344,19 @@ where
/// Advanced k8s deployer integration.
#[async_trait]
pub trait K8sDeployEnv: Application + Sized {
/// Prepared stack type produced by this environment.
type Assets: PreparedK8sStack + Send + Sync + 'static;
/// Returns the ports that must be exposed and forwarded for the deployment.
fn collect_port_specs(topology: &Self::Deployment) -> PortSpecs;
/// Prepares installable assets for the deployment.
fn prepare_assets(
topology: &Self::Deployment,
metrics_otlp_ingest_url: Option<&Url>,
) -> Result<Self::Assets, DynError>;
/// Installs one prepared stack into the target namespace and release name.
async fn install_stack(
client: &Client,
assets: &Self::Assets,
@ -333,10 +370,12 @@ pub trait K8sDeployEnv: Application + Sized {
assets.install(client, namespace, release, nodes).await
}
/// Returns the generated namespace and release names for one test run.
fn cluster_identifiers() -> (String, String) {
default_cluster_identifiers()
}
/// Builds one node client from forwarded k8s ports.
fn node_client_from_ports(
host: &str,
api_port: u16,
@ -349,6 +388,7 @@ pub trait K8sDeployEnv: Application + Sized {
))
}
/// Builds node clients for the full deployment.
fn build_node_clients(
host: &str,
node_api_ports: &[u16],
@ -363,10 +403,12 @@ pub trait K8sDeployEnv: Application + Sized {
.collect()
}
/// Returns the readiness endpoint path used for remote HTTP probes.
fn node_readiness_path() -> &'static str {
<Self as Application>::node_readiness_path()
}
/// Waits for remote node readiness after port-forwarding is established.
async fn wait_remote_readiness(
_deployment: &Self::Deployment,
urls: &[Url],
@ -384,22 +426,27 @@ pub trait K8sDeployEnv: Application + Sized {
Ok(())
}
/// Returns the Kubernetes role label used for node workloads.
fn node_role() -> &'static str {
"node"
}
/// Returns the Deployment name for one node.
fn node_deployment_name(release: &str, index: usize) -> String {
default_node_name(release, index)
}
/// Returns the Service name for one node.
fn node_service_name(release: &str, index: usize) -> String {
default_node_name(release, index)
}
/// Returns the label selector used by attach flows to locate node services.
fn attach_node_service_selector(release: &str) -> String {
default_attach_node_service_selector(release)
}
/// Waits for direct HTTP readiness against forwarded node ports.
async fn wait_for_node_http(
ports: &[u16],
role: &'static str,
@ -421,20 +468,26 @@ pub trait K8sDeployEnv: Application + Sized {
Ok(())
}
/// Returns the externally reachable base URL for a node client, if the app
/// needs it for attach or manual flows.
fn node_base_url(_client: &Self::NodeClient) -> Option<String> {
None
}
/// Returns the cfgsync service host and port when manual-cluster override
/// flows are enabled.
fn cfgsync_service(_release: &str) -> Option<(String, u16)> {
None
}
/// Returns cfgsync hostnames for all nodes in the deployment.
fn cfgsync_hostnames(release: &str, node_count: usize) -> Vec<String> {
(0..node_count)
.map(|index| Self::node_service_name(release, index))
.collect()
}
/// Builds app-specific cfgsync override artifacts for one node.
fn build_cfgsync_override_artifacts(
_deployment: &Self::Deployment,
_node_index: usize,
@ -450,8 +503,10 @@ pub trait K8sBinaryApp: Application + StaticNodeConfigProvider + Sized
where
Self::Deployment: DeploymentDescriptor,
{
/// Returns the standard binary+config install specification.
fn k8s_binary_spec() -> BinaryConfigK8sSpec;
/// Adds extra YAML resources to the generated manifest.
fn extend_k8s_manifest(
_deployment: &Self::Deployment,
_manifest: &mut HelmManifest,
@ -459,6 +514,7 @@ where
Ok(())
}
/// Builds node clients for the full deployment.
fn build_node_clients(
host: &str,
node_api_ports: &[u16],
@ -477,10 +533,12 @@ where
.collect()
}
/// Returns the Kubernetes role label used for node workloads.
fn node_role() -> &'static str {
"node"
}
/// Returns the externally reachable base URL for a node client, if needed.
fn node_base_url(_client: &Self::NodeClient) -> Option<String> {
None
}

View File

@ -18,7 +18,9 @@ use crate::{
};
#[derive(Default)]
/// Port specification for a k8s deployment before port-forwarding starts.
pub struct PortSpecs {
/// Per-node API and auxiliary ports that must be exposed.
pub nodes: Vec<NodeConfigPorts>,
}
@ -35,12 +37,15 @@ pub struct ClusterEnvironment {
}
#[derive(Debug, thiserror::Error)]
/// Failures while managing the cluster environment wrapper.
pub enum ClusterEnvironmentError {
#[error("cleanup guard is missing (it may have already been consumed)")]
/// The environment no longer owns a cleanup guard.
MissingCleanupGuard,
}
impl ClusterEnvironment {
/// Creates a cluster environment from forwarded ports and cleanup state.
pub fn new(
client: Client,
namespace: String,
@ -64,6 +69,8 @@ impl ClusterEnvironment {
}
}
/// Marks the environment as failed, dumps diagnostics, and triggers
/// cleanup.
pub async fn fail(&mut self, reason: &str) {
tracing::error!(
reason = reason,
@ -78,6 +85,8 @@ impl ClusterEnvironment {
}
}
/// Consumes the environment and returns the cleanup handle plus remaining
/// port-forwards.
pub fn into_cleanup(
self,
) -> Result<(RunnerCleanup, Vec<PortForwardHandle>), ClusterEnvironmentError> {
@ -88,38 +97,44 @@ impl ClusterEnvironment {
}
#[allow(dead_code)]
/// Returns the namespace backing this cluster.
pub fn namespace(&self) -> &str {
&self.namespace
}
#[allow(dead_code)]
/// Returns the Helm release name backing this cluster.
pub fn release(&self) -> &str {
&self.release
}
/// Returns the Kubernetes client used by this environment.
pub fn client(&self) -> &Client {
&self.client
}
/// Returns forwarded API and auxiliary node ports.
pub fn node_ports(&self) -> (&[u16], &[u16]) {
(&self.node_api_ports, &self.node_auxiliary_ports)
}
}
#[derive(Debug, thiserror::Error)]
/// Failures while building node clients against forwarded ports.
#[derive(Debug, thiserror::Error)]
pub enum NodeClientError {
#[error("failed to build node clients: {source}")]
/// Building one or more node clients failed.
Build {
#[source]
source: DynError,
},
}
#[derive(Debug, thiserror::Error)]
/// Readiness check failures for the remote cluster endpoints.
#[derive(Debug, thiserror::Error)]
pub enum RemoteReadinessError {
#[error("failed to build readiness URL for {role} port {port}: {source}")]
/// Building one readiness URL failed.
Endpoint {
role: &'static str,
port: u16,
@ -127,18 +142,21 @@ pub enum RemoteReadinessError {
source: ParseError,
},
#[error("remote readiness probe failed: {source}")]
/// The remote readiness probe failed after the URLs were built.
Remote {
#[source]
source: DynError,
},
}
/// Collects the required port-forward specification for one deployment.
pub fn collect_port_specs<E: K8sDeployEnv>(descriptors: &E::Deployment) -> PortSpecs {
let specs = collect_k8s_port_specs::<E>(descriptors);
debug!(nodes = specs.nodes.len(), "collected k8s port specs");
specs
}
/// Builds node clients against the forwarded cluster ports.
pub fn build_node_clients<E: K8sDeployEnv>(
cluster: &ClusterEnvironment,
) -> Result<NodeClients<E>, NodeClientError> {
@ -154,6 +172,8 @@ pub fn build_node_clients<E: K8sDeployEnv>(
Ok(NodeClients::new(nodes))
}
/// Waits until the remote k8s cluster satisfies the requested readiness
/// requirement.
pub async fn ensure_cluster_readiness<E: K8sDeployEnv>(
descriptors: &E::Deployment,
cluster: &ClusterEnvironment,
@ -176,6 +196,7 @@ pub async fn ensure_cluster_readiness<E: K8sDeployEnv>(
Ok(())
}
/// Waits for cluster port-forwards and cleans up on failure.
pub async fn wait_for_ports_or_cleanup<E: K8sDeployEnv>(
client: &Client,
namespace: &str,
@ -205,6 +226,7 @@ pub async fn wait_for_ports_or_cleanup<E: K8sDeployEnv>(
}
}
/// Stops all active port-forwards and clears the handle list.
pub fn kill_port_forwards(handles: &mut Vec<PortForwardHandle>) {
for handle in handles.iter_mut() {
handle.shutdown();

View File

@ -14,64 +14,81 @@ use crate::{
process::{LaunchSpec, NodeEndpointPort, NodeEndpoints, ProcessSpawnError},
};
/// Result of building a local node config together with the node's reserved
/// network port.
pub struct BuiltNodeConfig<Config> {
/// Materialized node config value.
pub config: Config,
/// Reserved network port used for peer traffic.
pub network_port: u16,
}
/// Named initial config entry generated for one node.
pub struct NodeConfigEntry<NodeConfigValue> {
/// Stable generated node name.
pub name: String,
/// Config value associated with `name`.
pub config: NodeConfigValue,
}
/// Reserved local ports assigned to one node.
pub struct LocalNodePorts {
network_port: u16,
named_ports: HashMap<&'static str, u16>,
}
impl LocalNodePorts {
/// Returns the reserved network port.
#[must_use]
pub fn network_port(&self) -> u16 {
self.network_port
}
/// Returns a reserved named port, if present.
#[must_use]
pub fn get(&self, name: &str) -> Option<u16> {
self.named_ports.get(name).copied()
}
/// Returns a reserved named port or an error if it is missing.
pub fn require(&self, name: &str) -> Result<u16, DynError> {
self.get(name)
.ok_or_else(|| format!("missing reserved local port '{name}'").into())
}
/// Iterates over all reserved named ports.
pub fn iter(&self) -> impl Iterator<Item = (&'static str, u16)> + '_ {
self.named_ports.iter().map(|(name, port)| (*name, *port))
}
}
#[derive(Clone, Debug)]
/// Peer node view used while constructing local configs.
pub struct LocalPeerNode {
index: usize,
network_port: u16,
}
impl LocalPeerNode {
/// Returns the peer's zero-based node index.
#[must_use]
pub fn index(&self) -> usize {
self.index
}
/// Returns the peer's reserved network port.
#[must_use]
pub fn network_port(&self) -> u16 {
self.network_port
}
/// Returns the peer's loopback HTTP authority as `127.0.0.1:<port>`.
#[must_use]
pub fn http_address(&self) -> String {
format!("127.0.0.1:{}", self.network_port)
}
/// Returns the peer authority used in local configs.
#[must_use]
pub fn authority(&self) -> String {
self.http_address()
@ -79,16 +96,24 @@ impl LocalPeerNode {
}
#[derive(Clone, Default)]
/// Standard local process description for one node binary plus one config file.
pub struct LocalProcessSpec {
/// Environment variable that points to the node binary.
pub binary_env_var: String,
/// Fallback binary name resolved from `PATH` or `target/`.
pub binary_name: String,
/// Config file name written into the temp launch directory.
pub config_file_name: String,
/// CLI flag used to point the process at `config_file_name`.
pub config_arg: String,
/// Extra CLI arguments passed after the config flag.
pub extra_args: Vec<String>,
/// Extra environment variables for the child process.
pub env: Vec<crate::process::LaunchEnvVar>,
}
impl LocalProcessSpec {
/// Creates a standard binary+config local process spec.
#[must_use]
pub fn new(binary_env_var: &str, binary_name: &str) -> Self {
Self {
@ -101,6 +126,7 @@ impl LocalProcessSpec {
}
}
/// Overrides the config file name and CLI flag used to pass it.
#[must_use]
pub fn with_config_file(mut self, file_name: &str, arg: &str) -> Self {
self.config_file_name = file_name.to_owned();
@ -108,17 +134,20 @@ impl LocalProcessSpec {
self
}
/// Appends one extra environment variable.
#[must_use]
pub fn with_env(mut self, key: &str, value: &str) -> Self {
self.env.push(crate::process::LaunchEnvVar::new(key, value));
self
}
/// Convenience helper for setting `RUST_LOG`.
#[must_use]
pub fn with_rust_log(self, value: &str) -> Self {
self.with_env("RUST_LOG", value)
}
/// Appends extra CLI arguments after the config flag pair.
#[must_use]
pub fn with_args(mut self, args: impl IntoIterator<Item = String>) -> Self {
self.extra_args.extend(args);
@ -126,6 +155,7 @@ impl LocalProcessSpec {
}
}
/// Preallocates `count` local TCP ports for later use.
pub fn preallocate_ports(count: usize, label: &str) -> Result<Vec<u16>, ProcessSpawnError> {
(0..count)
.map(|_| crate::process::allocate_available_port())
@ -135,6 +165,7 @@ pub fn preallocate_ports(count: usize, label: &str) -> Result<Vec<u16>, ProcessS
})
}
/// Builds a stable `name_prefix-{index}` config list.
pub fn build_indexed_node_configs<T>(
count: usize,
name_prefix: &str,
@ -150,6 +181,7 @@ pub fn build_indexed_node_configs<T>(
.collect()
}
/// Reserves network and named ports for `count` local nodes.
pub fn reserve_local_node_ports(
count: usize,
names: &[&'static str],
@ -172,10 +204,13 @@ pub fn reserve_local_node_ports(
.collect())
}
/// Builds the default single-HTTP-endpoint node access shape.
pub fn single_http_node_endpoints(port: u16) -> NodeEndpoints {
NodeEndpoints::from_api_port(port)
}
/// Builds a cluster node config for the local loopback environment from the
/// shared cluster-config application model.
pub fn build_local_cluster_node_config<E>(
index: usize,
ports: &LocalNodePorts,
@ -197,6 +232,8 @@ where
E::build_cluster_node_config(&node, &peer_views).map_err(Into::into)
}
/// Converts discovered local node endpoints into the generic `NodeAccess`
/// shape used by `Application::build_node_client`.
pub fn discovered_node_access(endpoints: &NodeEndpoints) -> NodeAccess {
let mut access = NodeAccess::new("127.0.0.1", endpoints.api.port());
@ -215,6 +252,8 @@ pub fn discovered_node_access(endpoints: &NodeEndpoints) -> NodeAccess {
access
}
/// Builds peer values from a full indexed port list while skipping
/// `self_index`.
pub fn build_indexed_http_peers<T>(
node_count: usize,
self_index: usize,
@ -235,6 +274,8 @@ pub(crate) fn compact_peer_ports(peer_ports: &[u16], self_index: usize) -> Vec<u
.collect()
}
/// Builds local peer-node views from a full indexed port list while skipping
/// `self_index`.
pub fn build_local_peer_nodes(peer_ports: &[u16], self_index: usize) -> Vec<LocalPeerNode> {
peer_ports
.iter()
@ -248,6 +289,7 @@ pub fn build_local_peer_nodes(peer_ports: &[u16], self_index: usize) -> Vec<Loca
.collect()
}
/// Generates the initial local node configs for one deployment.
pub fn build_generated_initial_nodes<E>(
topology: &E::Deployment,
node_name_prefix: &str,
@ -292,6 +334,7 @@ where
.collect()
}
/// Serializes a config as YAML and builds a launch spec for `spec`.
pub fn yaml_config_launch_spec<T: Serialize>(
config: &T,
spec: &LocalProcessSpec,
@ -300,6 +343,7 @@ pub fn yaml_config_launch_spec<T: Serialize>(
rendered_config_launch_spec(config_yaml.into_bytes(), spec)
}
/// Uses an already rendered text config to build a launch spec for `spec`.
pub fn text_config_launch_spec(
rendered_config: impl Into<Vec<u8>>,
spec: &LocalProcessSpec,
@ -307,6 +351,7 @@ pub fn text_config_launch_spec(
rendered_config_launch_spec(rendered_config.into(), spec)
}
/// Uses the standard binary+config launch shape for a YAML-rendered config.
pub fn default_yaml_launch_spec<T: Serialize>(
config: &T,
binary_env_var: &str,
@ -319,10 +364,12 @@ pub fn default_yaml_launch_spec<T: Serialize>(
)
}
/// Serializes a node config as YAML bytes.
pub fn yaml_node_config<T: Serialize>(config: &T) -> Result<Vec<u8>, DynError> {
Ok(serde_yaml::to_string(config)?.into_bytes())
}
/// Converts an already rendered text config into launch-file bytes.
pub fn text_node_config(rendered_config: impl Into<Vec<u8>>) -> Vec<u8> {
rendered_config.into()
}

View File

@ -29,13 +29,21 @@ pub use helpers::{
/// Context passed while building a local node config.
pub struct LocalBuildContext<'a, E: Application> {
/// Full deployment topology for the current scenario.
pub topology: &'a E::Deployment,
/// Zero-based node index being built.
pub index: usize,
/// Reserved local ports assigned to this node.
pub ports: &'a LocalNodePorts,
/// Peer nodes visible to this node after excluding `index`.
pub peers: &'a [LocalPeerNode],
/// Peer network ports for the current node view.
pub peer_ports: &'a [u16],
/// Peer ports keyed by application-defined port name.
pub peer_ports_by_name: &'a HashMap<String, u16>,
/// Start-time options for the node being built.
pub options: &'a StartNodeOptions<E>,
/// Optional template config to derive from during manual restart flows.
pub template_config: Option<&'a E::NodeConfig>,
}
@ -52,10 +60,14 @@ pub trait LocalDeployerEnv: Application + Sized
where
<Self as Application>::NodeConfig: Clone + Send + Sync + 'static,
{
/// Returns named ports that should be reserved in addition to the main
/// network port for each node.
fn local_port_names() -> &'static [&'static str] {
Self::initial_local_port_names()
}
/// Builds a node config and reserves any local ports needed for a node
/// started after initial cluster creation.
fn build_node_config(
topology: &Self::Deployment,
index: usize,
@ -73,6 +85,8 @@ where
)
}
/// Builds a node config from an optional template during restart or
/// manual-cluster flows.
fn build_node_config_from_template(
topology: &Self::Deployment,
index: usize,
@ -103,6 +117,7 @@ where
})
}
/// Builds the initial local configs for every node in the deployment.
fn build_initial_node_configs(
topology: &Self::Deployment,
) -> Result<Vec<NodeConfigEntry<<Self as Application>::NodeConfig>>, ProcessSpawnError> {
@ -129,14 +144,17 @@ where
)
}
/// Prefix used for generated node names such as `node-0`.
fn initial_node_name_prefix() -> &'static str {
"node"
}
/// Additional named ports to reserve for each initial node.
fn initial_local_port_names() -> &'static [&'static str] {
&[]
}
/// Builds one initial node config from already reserved ports.
fn build_initial_node_config(
topology: &Self::Deployment,
index: usize,
@ -157,6 +175,7 @@ where
)
}
/// Builds a local node config from peer port information.
fn build_local_node_config(
topology: &Self::Deployment,
index: usize,
@ -178,6 +197,7 @@ where
)
}
/// Builds a local node config from full peer node descriptions.
fn build_local_node_config_with_peers(
_topology: &Self::Deployment,
_index: usize,
@ -193,6 +213,8 @@ where
.into())
}
/// Returns the initial persist directory for a node, if one should be
/// mounted before startup.
fn initial_persist_dir(
_topology: &Self::Deployment,
_node_name: &str,
@ -201,6 +223,8 @@ where
None
}
/// Returns the initial snapshot directory for a node, if one should be
/// mounted before startup.
fn initial_snapshot_dir(
_topology: &Self::Deployment,
_node_name: &str,
@ -209,16 +233,20 @@ where
None
}
/// Returns the default local process description for this app.
fn local_process_spec() -> Option<LocalProcessSpec> {
None
}
/// Serializes a local node config into the file bytes written next to the
/// spawned process.
fn render_local_config(
_config: &<Self as Application>::NodeConfig,
) -> Result<Vec<u8>, DynError> {
Err(std::io::Error::other("render_local_config is not implemented for this app").into())
}
/// Builds the full launch spec for a local node process.
fn build_launch_spec(
config: &<Self as Application>::NodeConfig,
_dir: &Path,
@ -231,10 +259,13 @@ where
helpers::rendered_config_launch_spec(rendered, &spec)
}
/// Returns the main HTTP API port from a node config when the app follows
/// the standard single-HTTP-endpoint pattern.
fn http_api_port(_config: &<Self as Application>::NodeConfig) -> Option<u16> {
None
}
/// Resolves the full local endpoint set exposed by a node.
fn node_endpoints(
config: &<Self as Application>::NodeConfig,
) -> Result<NodeEndpoints, DynError> {
@ -248,14 +279,18 @@ where
Err(std::io::Error::other("node_endpoints is not implemented for this app").into())
}
/// Resolves the port peers should use for cluster traffic.
fn node_peer_port(node: &Node<Self>) -> u16 {
node.endpoints().api.port()
}
/// Builds a node client directly from the API endpoint when the default
/// `NodeAccess`-based path is not suitable.
fn node_client_from_api_endpoint(_api: SocketAddr) -> Option<Self::NodeClient> {
None
}
/// Builds a node client from discovered local endpoints.
fn node_client(endpoints: &NodeEndpoints) -> Result<Self::NodeClient, DynError> {
if let Ok(client) =
<Self as Application>::build_node_client(&discovered_node_access(endpoints))
@ -270,10 +305,13 @@ where
Err(std::io::Error::other("node_client is not implemented for this app").into())
}
/// Returns the readiness endpoint path used for local HTTP probes.
fn readiness_endpoint_path() -> &'static str {
<Self as Application>::node_readiness_path()
}
/// Waits for any additional cluster-specific stabilization after the HTTP
/// readiness probe succeeds.
async fn wait_readiness_stable(_nodes: &[Node<Self>]) -> Result<(), DynError> {
Ok(())
}
@ -290,12 +328,15 @@ pub trait LocalBinaryApp: Application + Sized
where
<Self as Application>::NodeConfig: Clone + Send + Sync + 'static,
{
/// Prefix used for generated initial node names such as `node-0`.
fn initial_node_name_prefix() -> &'static str;
/// Additional named ports to reserve for each local node.
fn initial_local_port_names() -> &'static [&'static str] {
&[]
}
/// Builds a local node config from full peer node descriptions.
fn build_local_node_config_with_peers(
topology: &Self::Deployment,
index: usize,
@ -306,17 +347,24 @@ where
template_config: Option<&<Self as Application>::NodeConfig>,
) -> Result<<Self as Application>::NodeConfig, DynError>;
/// Returns the standard process description for launching one local node.
fn local_process_spec() -> LocalProcessSpec;
/// Serializes a local node config into the file bytes written next to the
/// spawned process.
fn render_local_config(config: &<Self as Application>::NodeConfig)
-> Result<Vec<u8>, DynError>;
/// Returns the main HTTP API port used for discovery and readiness.
fn http_api_port(config: &<Self as Application>::NodeConfig) -> u16;
/// Returns the readiness endpoint path used for local HTTP probes.
fn readiness_endpoint_path() -> &'static str {
<Self as Application>::node_readiness_path()
}
/// Waits for any additional cluster-specific stabilization after the HTTP
/// readiness probe succeeds.
async fn wait_readiness_stable(_nodes: &[Node<Self>]) -> Result<(), DynError> {
Ok(())
}
@ -433,6 +481,8 @@ pub(crate) fn readiness_endpoint_path<E: LocalDeployerEnv>() -> &'static str {
E::readiness_endpoint_path()
}
/// Waits for local HTTP readiness across the provided nodes and then applies
/// any app-specific stabilization hook.
pub async fn wait_local_http_readiness<E: LocalDeployerEnv>(
nodes: &[Node<E>],
requirement: HttpReadinessRequirement,
@ -449,6 +499,7 @@ pub async fn wait_local_http_readiness<E: LocalDeployerEnv>(
.map_err(|source| ReadinessError::ClusterStable { source })
}
/// Spawns a local process node from an already prepared config value.
pub async fn spawn_node_from_config<E: LocalDeployerEnv>(
label: String,
config: <E as Application>::NodeConfig,