From 938d782f8d8d7c6d4ee5cdaf152112cb9134b070 Mon Sep 17 00:00:00 2001 From: andrussal Date: Fri, 19 Dec 2025 01:41:07 +0100 Subject: [PATCH] deployers: split orchestrator and asset phases Refactors long deployer functions into smaller phase helpers. - k8s deploy_with_observability - Before: env/capability observability merge + cluster setup + client/telemetry/block feed init + endpoint printing + Runner construction. - After: resolve_observability_inputs/init_kube_client/build_node_clients_or_fail/ build_telemetry_or_fail/spawn_block_feed_or_fail/maybe_print_endpoints/finalize_runner. - k8s prepare_assets/build_values - Before: cfgsync render + tempdir + script/KZG/chart/values resolution in one function; duplicated validator/executor env building. - After: create_assets_tempdir/render_and_write_cfgsync/resolve_kzg_path/ render_and_write_values/testnet_image + build_node_group/build_node_values. - k8s install_release - Before: mixed KZG arg resolution + long CLI arg chain + debug logging. - After: resolve_kzg_install_args/build_install_command/maybe_log_install_output. - compose DeploymentOrchestrator::deploy - Before: env/cap observability merge + readiness + endpoint logging/printing + Runner construction. - After: resolve_observability_inputs/wait_for_readiness_or_grace_period/ log_observability_endpoints/maybe_print_endpoints. --- .../compose/src/deployer/orchestrator.rs | 113 +++++--- .../k8s/src/deployer/orchestrator.rs | 256 +++++++++++------- .../k8s/src/infrastructure/assets.rs | 198 +++++++------- .../deployers/k8s/src/infrastructure/helm.rs | 109 ++++++-- 4 files changed, 421 insertions(+), 255 deletions(-) diff --git a/testing-framework/deployers/compose/src/deployer/orchestrator.rs b/testing-framework/deployers/compose/src/deployer/orchestrator.rs index efc67eb..95bc984 100644 --- a/testing-framework/deployers/compose/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/compose/src/deployer/orchestrator.rs @@ -42,13 +42,7 @@ impl DeploymentOrchestrator { let setup = DeploymentSetup::new(scenario.topology()); setup.validate_environment().await?; - let env_inputs = ObservabilityInputs::from_env()?; - let cap_inputs = scenario - .capabilities() - .observability_capability() - .map(ObservabilityInputs::from_capability) - .unwrap_or_default(); - let observability = env_inputs.with_overrides(cap_inputs); + let observability = resolve_observability_inputs(scenario)?; let DeploymentContext { mut environment, @@ -73,12 +67,13 @@ impl DeploymentOrchestrator { let executor_count = descriptors.executors().len(); let host_ports = PortManager::prepare(&mut environment, &descriptors).await?; - if self.deployer.readiness_checks { - ReadinessChecker::wait_all(&descriptors, &host_ports, &mut environment).await?; - } else { - info!("readiness checks disabled; giving the stack a short grace period"); - crate::lifecycle::readiness::maybe_sleep_for_disabled_readiness(false).await; - } + wait_for_readiness_or_grace_period( + self.deployer.readiness_checks, + &descriptors, + &host_ports, + &mut environment, + ) + .await?; let host = compose_runner_host(); let client_builder = ClientBuilder::new(); @@ -88,32 +83,10 @@ impl DeploymentOrchestrator { let telemetry = observability.telemetry_handle()?; let node_control = self.maybe_node_control::(&environment); - if let Some(url) = observability.metrics_query_url.as_ref() { - info!(metrics_query_url = %url.as_str(), "metrics query endpoint configured"); - } - if let Some(url) = observability.grafana_url.as_ref() { - info!(grafana_url = %url.as_str(), "grafana url configured"); - } + log_observability_endpoints(&observability); log_profiling_urls(&host, &host_ports); - if std::env::var("TESTNET_PRINT_ENDPOINTS").is_ok() { - let prometheus = observability - .metrics_query_url - .as_ref() - .map(|u| u.as_str().to_string()) - .unwrap_or_else(|| "".to_string()); - let grafana = observability - .grafana_url - .as_ref() - .map(|u| u.as_str().to_string()) - .unwrap_or_else(|| "".to_string()); - println!( - "TESTNET_ENDPOINTS prometheus={} grafana={}", - prometheus, grafana - ); - - print_profiling_urls(&host, &host_ports); - } + maybe_print_endpoints(&observability, &host, &host_ports); let (block_feed, block_feed_guard) = client_builder .start_block_feed(&node_clients, &mut environment) @@ -158,6 +131,72 @@ impl DeploymentOrchestrator { } } +fn resolve_observability_inputs( + scenario: &Scenario, +) -> Result +where + Caps: ObservabilityCapabilityProvider, +{ + let env_inputs = ObservabilityInputs::from_env()?; + let cap_inputs = scenario + .capabilities() + .observability_capability() + .map(ObservabilityInputs::from_capability) + .unwrap_or_default(); + Ok(env_inputs.with_overrides(cap_inputs)) +} + +async fn wait_for_readiness_or_grace_period( + readiness_checks: bool, + descriptors: &testing_framework_core::topology::generation::GeneratedTopology, + host_ports: &HostPortMapping, + environment: &mut StackEnvironment, +) -> Result<(), ComposeRunnerError> { + if readiness_checks { + ReadinessChecker::wait_all(descriptors, host_ports, environment).await?; + return Ok(()); + } + + info!("readiness checks disabled; giving the stack a short grace period"); + crate::lifecycle::readiness::maybe_sleep_for_disabled_readiness(false).await; + Ok(()) +} + +fn log_observability_endpoints(observability: &ObservabilityInputs) { + if let Some(url) = observability.metrics_query_url.as_ref() { + info!( + metrics_query_url = %url.as_str(), + "metrics query endpoint configured" + ); + } + if let Some(url) = observability.grafana_url.as_ref() { + info!(grafana_url = %url.as_str(), "grafana url configured"); + } +} + +fn maybe_print_endpoints(observability: &ObservabilityInputs, host: &str, ports: &HostPortMapping) { + if std::env::var("TESTNET_PRINT_ENDPOINTS").is_err() { + return; + } + + let prometheus = observability + .metrics_query_url + .as_ref() + .map(|u| u.as_str().to_string()) + .unwrap_or_else(|| "".to_string()); + let grafana = observability + .grafana_url + .as_ref() + .map(|u| u.as_str().to_string()) + .unwrap_or_else(|| "".to_string()); + + println!( + "TESTNET_ENDPOINTS prometheus={} grafana={}", + prometheus, grafana + ); + print_profiling_urls(host, ports); +} + fn log_profiling_urls(host: &str, ports: &HostPortMapping) { for (idx, node) in ports.validators.iter().enumerate() { tracing::info!( diff --git a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs index 02edbf4..cc55100 100644 --- a/testing-framework/deployers/k8s/src/deployer/orchestrator.rs +++ b/testing-framework/deployers/k8s/src/deployer/orchestrator.rs @@ -140,20 +140,14 @@ async fn deploy_with_observability( scenario: &Scenario, observability: Option<&ObservabilityCapability>, ) -> Result { - let env_inputs = ObservabilityInputs::from_env()?; - let cap_inputs = observability - .map(ObservabilityInputs::from_capability) - .unwrap_or_default(); - let observability = env_inputs.with_overrides(cap_inputs); + let observability = resolve_observability_inputs(observability)?; let descriptors = scenario.topology().clone(); let validator_count = descriptors.validators().len(); let executor_count = descriptors.executors().len(); ensure_supported_topology(&descriptors)?; - let client = Client::try_default() - .await - .map_err(|source| K8sRunnerError::ClientInit { source })?; + let client = init_kube_client().await?; info!( validators = validator_count, @@ -182,37 +176,12 @@ async fn deploy_with_observability( ); info!("building node clients"); - let environment = cluster - .as_ref() - .ok_or_else(|| K8sRunnerError::InternalInvariant { - message: "cluster must be available while building clients".to_owned(), - })?; - let node_clients = match build_node_clients(environment) { - Ok(clients) => clients, - Err(err) => { - fail_cluster(&mut cluster, "failed to construct node api clients").await; - error!(error = ?err, "failed to build k8s node clients"); - return Err(err.into()); - } - }; + let node_clients = build_node_clients_or_fail(&mut cluster).await?; - let telemetry = match observability.telemetry_handle() { - Ok(handle) => handle, - Err(err) => { - fail_cluster(&mut cluster, "failed to configure metrics telemetry handle").await; - error!(error = ?err, "failed to configure metrics telemetry handle"); - return Err(err.into()); - } - }; + let telemetry = build_telemetry_or_fail(&mut cluster, &observability).await?; - let (block_feed, block_feed_guard) = match spawn_block_feed_with(&node_clients).await { - Ok(pair) => pair, - Err(err) => { - fail_cluster(&mut cluster, "failed to initialize block feed").await; - error!(error = ?err, "failed to initialize block feed"); - return Err(err); - } - }; + let (block_feed, block_feed_guard) = + spawn_block_feed_or_fail(&mut cluster, &node_clients).await?; if let Some(url) = observability.metrics_query_url.as_ref() { info!( @@ -224,69 +193,19 @@ async fn deploy_with_observability( info!(grafana_url = %url.as_str(), "grafana url configured"); } - if std::env::var("TESTNET_PRINT_ENDPOINTS").is_ok() { - let prometheus = observability - .metrics_query_url - .as_ref() - .map(|u| u.as_str().to_string()) - .unwrap_or_else(|| "".to_string()); - println!( - "TESTNET_ENDPOINTS prometheus={} grafana={}", - prometheus, - observability - .grafana_url - .as_ref() - .map(|u| u.as_str().to_string()) - .unwrap_or_else(|| "".to_string()) - ); + maybe_print_endpoints(&observability, &node_clients); - for (idx, client) in node_clients.validator_clients().iter().enumerate() { - println!( - "TESTNET_PPROF validator_{}={}/debug/pprof/profile?seconds=15&format=proto", - idx, - client.base_url() - ); - } - - for (idx, client) in node_clients.executor_clients().iter().enumerate() { - println!( - "TESTNET_PPROF executor_{}={}/debug/pprof/profile?seconds=15&format=proto", - idx, - client.base_url() - ); - } - } - - let environment = cluster - .take() - .ok_or_else(|| K8sRunnerError::InternalInvariant { - message: "cluster should still be available".to_owned(), - })?; - let (cleanup, port_forwards) = environment.into_cleanup()?; - let cleanup_guard: Box = Box::new(K8sCleanupGuard::new( - cleanup, - block_feed_guard, - port_forwards, - )); - - let context = RunContext::new( + finalize_runner( + &mut cluster, descriptors, - None, node_clients, scenario.duration(), telemetry, block_feed, - None, - ); - - info!( - validators = validator_count, - executors = executor_count, - duration_secs = scenario.duration().as_secs(), - "k8s deployment ready; handing control to scenario runner" - ); - - Ok(Runner::new(context, Some(cleanup_guard))) + block_feed_guard, + validator_count, + executor_count, + ) } async fn setup_cluster( @@ -332,6 +251,155 @@ async fn setup_cluster( Ok(environment) } +fn resolve_observability_inputs( + observability: Option<&ObservabilityCapability>, +) -> Result { + let env_inputs = ObservabilityInputs::from_env()?; + let cap_inputs = observability + .map(ObservabilityInputs::from_capability) + .unwrap_or_default(); + Ok(env_inputs.with_overrides(cap_inputs)) +} + +async fn init_kube_client() -> Result { + Client::try_default() + .await + .map_err(|source| K8sRunnerError::ClientInit { source }) +} + +async fn build_node_clients_or_fail( + cluster: &mut Option, +) -> Result { + let environment = cluster + .as_ref() + .ok_or_else(|| K8sRunnerError::InternalInvariant { + message: "cluster must be available while building clients".to_owned(), + })?; + + match build_node_clients(environment) { + Ok(clients) => Ok(clients), + Err(err) => { + fail_cluster(cluster, "failed to construct node api clients").await; + error!(error = ?err, "failed to build k8s node clients"); + Err(err.into()) + } + } +} + +async fn build_telemetry_or_fail( + cluster: &mut Option, + observability: &ObservabilityInputs, +) -> Result { + match observability.telemetry_handle() { + Ok(handle) => Ok(handle), + Err(err) => { + fail_cluster(cluster, "failed to configure metrics telemetry handle").await; + error!(error = ?err, "failed to configure metrics telemetry handle"); + Err(err.into()) + } + } +} + +async fn spawn_block_feed_or_fail( + cluster: &mut Option, + node_clients: &testing_framework_core::scenario::NodeClients, +) -> Result<(testing_framework_core::scenario::BlockFeed, BlockFeedTask), K8sRunnerError> { + match spawn_block_feed_with(node_clients).await { + Ok(pair) => Ok(pair), + Err(err) => { + fail_cluster(cluster, "failed to initialize block feed").await; + error!(error = ?err, "failed to initialize block feed"); + Err(err) + } + } +} + +fn maybe_print_endpoints( + observability: &ObservabilityInputs, + node_clients: &testing_framework_core::scenario::NodeClients, +) { + if std::env::var("TESTNET_PRINT_ENDPOINTS").is_err() { + return; + } + + let prometheus = observability + .metrics_query_url + .as_ref() + .map(|u| u.as_str().to_string()) + .unwrap_or_else(|| "".to_string()); + + println!( + "TESTNET_ENDPOINTS prometheus={} grafana={}", + prometheus, + observability + .grafana_url + .as_ref() + .map(|u| u.as_str().to_string()) + .unwrap_or_else(|| "".to_string()) + ); + + for (idx, client) in node_clients.validator_clients().iter().enumerate() { + println!( + "TESTNET_PPROF validator_{}={}/debug/pprof/profile?seconds=15&format=proto", + idx, + client.base_url() + ); + } + + for (idx, client) in node_clients.executor_clients().iter().enumerate() { + println!( + "TESTNET_PPROF executor_{}={}/debug/pprof/profile?seconds=15&format=proto", + idx, + client.base_url() + ); + } +} + +#[allow(clippy::too_many_arguments)] +fn finalize_runner( + cluster: &mut Option, + descriptors: GeneratedTopology, + node_clients: testing_framework_core::scenario::NodeClients, + duration: std::time::Duration, + telemetry: testing_framework_core::scenario::Metrics, + block_feed: testing_framework_core::scenario::BlockFeed, + block_feed_guard: BlockFeedTask, + validator_count: usize, + executor_count: usize, +) -> Result { + let environment = cluster + .take() + .ok_or_else(|| K8sRunnerError::InternalInvariant { + message: "cluster should still be available".to_owned(), + })?; + let (cleanup, port_forwards) = environment.into_cleanup()?; + + let cleanup_guard: Box = Box::new(K8sCleanupGuard::new( + cleanup, + block_feed_guard, + port_forwards, + )); + + let context = RunContext::new( + descriptors, + None, + node_clients, + duration, + telemetry, + block_feed, + None, + ); + + info!( + validators = validator_count, + executors = executor_count, + duration_secs = duration.as_secs(), + "k8s deployment ready; handing control to scenario runner" + ); + + Ok(Runner::new(context, Some(cleanup_guard))) +} + struct K8sCleanupGuard { cleanup: RunnerCleanup, block_feed: Option, diff --git a/testing-framework/deployers/k8s/src/infrastructure/assets.rs b/testing-framework/deployers/k8s/src/infrastructure/assets.rs index 90684fc..f58afaa 100644 --- a/testing-framework/deployers/k8s/src/infrastructure/assets.rs +++ b/testing-framework/deployers/k8s/src/infrastructure/assets.rs @@ -91,24 +91,21 @@ pub fn prepare_assets( let root = workspace_root().map_err(|source| AssetsError::WorkspaceRoot { source })?; let kzg_spec = KzgParamsSpec::for_k8s(&root); - let cfgsync_yaml = render_cfgsync_config(&root, topology, &kzg_spec, metrics_otlp_ingest_url)?; - let tempdir = tempfile::Builder::new() - .prefix("nomos-helm-") - .tempdir() - .map_err(|source| AssetsError::TempDir { source })?; + let tempdir = create_assets_tempdir()?; - let cfgsync_file = write_temp_file(tempdir.path(), "cfgsync.yaml", cfgsync_yaml)?; + let cfgsync_file = render_and_write_cfgsync( + &root, + topology, + &kzg_spec, + metrics_otlp_ingest_url, + &tempdir, + )?; let scripts = validate_scripts(&root)?; - let kzg_path = match kzg_spec.mode { - KzgMode::HostPath => Some(validate_kzg_params(&root, &kzg_spec)?), - KzgMode::InImage => None, - }; + let kzg_path = resolve_kzg_path(&root, &kzg_spec)?; let chart_path = helm_chart_path()?; - let values_yaml = render_values_yaml(topology)?; - let values_file = write_temp_file(tempdir.path(), "values.yaml", values_yaml)?; - let image = tf_env::nomos_testnet_image() - .unwrap_or_else(|| String::from("public.ecr.aws/r4s5t9y4/logos/logos-blockchain:test")); + let values_file = render_and_write_values(topology, &tempdir)?; + let image = testnet_image(); let kzg_display = kzg_path .as_ref() @@ -139,6 +136,44 @@ pub fn prepare_assets( }) } +fn create_assets_tempdir() -> Result { + tempfile::Builder::new() + .prefix("nomos-helm-") + .tempdir() + .map_err(|source| AssetsError::TempDir { source }) +} + +fn render_and_write_cfgsync( + root: &Path, + topology: &GeneratedTopology, + kzg_spec: &KzgParamsSpec, + metrics_otlp_ingest_url: Option<&Url>, + tempdir: &TempDir, +) -> Result { + let cfgsync_yaml = render_cfgsync_config(root, topology, kzg_spec, metrics_otlp_ingest_url)?; + write_temp_file(tempdir.path(), "cfgsync.yaml", cfgsync_yaml) +} + +fn resolve_kzg_path(root: &Path, kzg_spec: &KzgParamsSpec) -> Result, AssetsError> { + match kzg_spec.mode { + KzgMode::HostPath => Ok(Some(validate_kzg_params(root, kzg_spec)?)), + KzgMode::InImage => Ok(None), + } +} + +fn render_and_write_values( + topology: &GeneratedTopology, + tempdir: &TempDir, +) -> Result { + let values_yaml = render_values_yaml(topology)?; + write_temp_file(tempdir.path(), "values.yaml", values_yaml) +} + +fn testnet_image() -> String { + tf_env::nomos_testnet_image() + .unwrap_or_else(|| String::from("public.ecr.aws/r4s5t9y4/logos/logos-blockchain:test")) +} + const CFGSYNC_K8S_TIMEOUT_SECS: u64 = 300; fn render_cfgsync_config( @@ -313,91 +348,64 @@ fn build_values(topology: &GeneratedTopology) -> HelmValues { let image_pull_policy = tf_env::nomos_testnet_image_pull_policy().unwrap_or_else(|| "IfNotPresent".into()); debug!(pol_mode, "rendering Helm values for k8s stack"); - let validators = topology - .validators() - .iter() - .enumerate() - .map(|(index, validator)| { - let mut env = BTreeMap::new(); - env.insert("POL_PROOF_DEV_MODE".into(), pol_mode.clone()); - env.insert( - "CFG_NETWORK_PORT".into(), - validator.network_port().to_string(), - ); - env.insert("CFG_DA_PORT".into(), validator.da_port.to_string()); - env.insert("CFG_BLEND_PORT".into(), validator.blend_port.to_string()); - env.insert( - "CFG_API_PORT".into(), - validator.general.api_config.address.port().to_string(), - ); - env.insert( - "CFG_TESTING_HTTP_PORT".into(), - validator - .general - .api_config - .testing_http_address - .port() - .to_string(), - ); - env.insert("CFG_HOST_KIND".into(), "validator".into()); - env.insert("CFG_HOST_IDENTIFIER".into(), format!("validator-{index}")); - - NodeValues { - api_port: validator.general.api_config.address.port(), - testing_http_port: validator.general.api_config.testing_http_address.port(), - env, - } - }) - .collect(); - - let executors = topology - .executors() - .iter() - .enumerate() - .map(|(index, executor)| { - let mut env = BTreeMap::new(); - env.insert("POL_PROOF_DEV_MODE".into(), pol_mode.clone()); - env.insert( - "CFG_NETWORK_PORT".into(), - executor.network_port().to_string(), - ); - env.insert("CFG_DA_PORT".into(), executor.da_port.to_string()); - env.insert("CFG_BLEND_PORT".into(), executor.blend_port.to_string()); - env.insert( - "CFG_API_PORT".into(), - executor.general.api_config.address.port().to_string(), - ); - env.insert( - "CFG_TESTING_HTTP_PORT".into(), - executor - .general - .api_config - .testing_http_address - .port() - .to_string(), - ); - env.insert("CFG_HOST_KIND".into(), "executor".into()); - env.insert("CFG_HOST_IDENTIFIER".into(), format!("executor-{index}")); - - NodeValues { - api_port: executor.general.api_config.address.port(), - testing_http_port: executor.general.api_config.testing_http_address.port(), - env, - } - }) - .collect(); + let validators = build_node_group("validator", topology.validators(), &pol_mode); + let executors = build_node_group("executor", topology.executors(), &pol_mode); HelmValues { image_pull_policy, cfgsync, - validators: NodeGroup { - count: topology.validators().len(), - nodes: validators, - }, - executors: NodeGroup { - count: topology.executors().len(), - nodes: executors, - }, + validators, + executors, + } +} + +fn build_node_group( + kind: &'static str, + nodes: &[testing_framework_core::topology::generation::GeneratedNodeConfig], + pol_mode: &str, +) -> NodeGroup { + let node_values = nodes + .iter() + .enumerate() + .map(|(index, node)| build_node_values(kind, index, node, pol_mode)) + .collect(); + + NodeGroup { + count: nodes.len(), + nodes: node_values, + } +} + +fn build_node_values( + kind: &'static str, + index: usize, + node: &testing_framework_core::topology::generation::GeneratedNodeConfig, + pol_mode: &str, +) -> NodeValues { + let mut env = BTreeMap::new(); + env.insert("POL_PROOF_DEV_MODE".into(), pol_mode.to_string()); + env.insert("CFG_NETWORK_PORT".into(), node.network_port().to_string()); + env.insert("CFG_DA_PORT".into(), node.da_port.to_string()); + env.insert("CFG_BLEND_PORT".into(), node.blend_port.to_string()); + env.insert( + "CFG_API_PORT".into(), + node.general.api_config.address.port().to_string(), + ); + env.insert( + "CFG_TESTING_HTTP_PORT".into(), + node.general + .api_config + .testing_http_address + .port() + .to_string(), + ); + env.insert("CFG_HOST_KIND".into(), kind.to_string()); + env.insert("CFG_HOST_IDENTIFIER".into(), format!("{kind}-{index}")); + + NodeValues { + api_port: node.general.api_config.address.port(), + testing_http_port: node.general.api_config.testing_http_address.port(), + env, } } diff --git a/testing-framework/deployers/k8s/src/infrastructure/helm.rs b/testing-framework/deployers/k8s/src/infrastructure/helm.rs index 9fadd9b..e4ed23b 100644 --- a/testing-framework/deployers/k8s/src/infrastructure/helm.rs +++ b/testing-framework/deployers/k8s/src/infrastructure/helm.rs @@ -1,4 +1,4 @@ -use std::{io, process::Stdio}; +use std::{io, path::Path, process::Stdio}; use thiserror::Error; use tokio::process::Command; @@ -34,18 +34,7 @@ pub async fn install_release( validators: usize, executors: usize, ) -> Result<(), HelmError> { - let (host_path_type, host_path) = match assets.kzg_mode { - KzgMode::HostPath => { - let host_path = assets.kzg_path.as_ref().ok_or(HelmError::MissingKzgPath)?; - let host_path_type = if host_path.is_dir() { - "Directory" - } else { - "File" - }; - (Some(host_path_type), Some(host_path)) - } - KzgMode::InImage => (None, None), - }; + let kzg = resolve_kzg_install_args(assets)?; info!( release, namespace, @@ -54,14 +43,69 @@ pub async fn install_release( image = %assets.image, cfgsync_port = cfgsync_port_value(), kzg_mode = ?assets.kzg_mode, - kzg = %host_path - .as_ref() - .map(|p| p.display().to_string()) - .unwrap_or_else(|| "".to_string()), + kzg = %kzg.display(), values = %assets.values_file.display(), "installing helm release" ); + let command = format!("helm install {release}"); + let cmd = build_install_command( + assets, release, namespace, validators, executors, &kzg, &command, + ); + let output = run_helm_command(cmd, &command).await?; + + maybe_log_install_output(&command, &output); + + info!(release, namespace, "helm install completed"); + Ok(()) +} + +struct KzgInstallArgs<'a> { + mode: &'static str, + host_path: Option<&'a Path>, + host_path_type: Option<&'static str>, +} + +impl KzgInstallArgs<'_> { + fn display(&self) -> String { + self.host_path + .map(|p| p.display().to_string()) + .unwrap_or_else(|| "".to_string()) + } +} + +fn resolve_kzg_install_args(assets: &RunnerAssets) -> Result, HelmError> { + match assets.kzg_mode { + KzgMode::HostPath => { + let host_path = assets.kzg_path.as_ref().ok_or(HelmError::MissingKzgPath)?; + let host_path_type = if host_path.is_dir() { + "Directory" + } else { + "File" + }; + Ok(KzgInstallArgs { + mode: "kzg.mode=hostPath", + host_path: Some(host_path), + host_path_type: Some(host_path_type), + }) + } + KzgMode::InImage => Ok(KzgInstallArgs { + mode: "kzg.mode=inImage", + host_path: None, + host_path_type: None, + }), + } +} + +fn build_install_command( + assets: &RunnerAssets, + release: &str, + namespace: &str, + validators: usize, + executors: usize, + kzg: &KzgInstallArgs<'_>, + command: &str, +) -> Command { let mut cmd = Command::new("helm"); cmd.arg("install") .arg(release) @@ -83,10 +127,7 @@ pub async fn install_release( .arg("-f") .arg(&assets.values_file) .arg("--set") - .arg(match assets.kzg_mode { - KzgMode::HostPath => "kzg.mode=hostPath", - KzgMode::InImage => "kzg.mode=inImage", - }) + .arg(kzg.mode) .arg("--set-file") .arg(format!("cfgsync.config={}", assets.cfgsync_file.display())) .arg("--set-file") @@ -112,7 +153,7 @@ pub async fn install_release( .stdout(Stdio::piped()) .stderr(Stdio::piped()); - if let (Some(host_path), Some(host_path_type)) = (host_path, host_path_type) { + if let (Some(host_path), Some(host_path_type)) = (kzg.host_path, kzg.host_path_type) { cmd.arg("--set") .arg(format!("kzg.hostPath={}", host_path.display())) .arg("--set") @@ -123,16 +164,26 @@ pub async fn install_release( cmd.current_dir(root); } - let command = format!("helm install {release}"); - let output = run_helm_command(cmd, &command).await?; + cmd.stdout(Stdio::piped()).stderr(Stdio::piped()); + debug!(command, "prepared helm install command"); + cmd +} - if std::env::var("K8S_RUNNER_DEBUG").is_ok() { - debug!(command, stdout = %String::from_utf8_lossy(&output.stdout), "helm install stdout"); - debug!(command, stderr = %String::from_utf8_lossy(&output.stderr), "helm install stderr"); +fn maybe_log_install_output(command: &str, output: &std::process::Output) { + if std::env::var("K8S_RUNNER_DEBUG").is_err() { + return; } - info!(release, namespace, "helm install completed"); - Ok(()) + debug!( + command, + stdout = %String::from_utf8_lossy(&output.stdout), + "helm install stdout" + ); + debug!( + command, + stderr = %String::from_utf8_lossy(&output.stderr), + "helm install stderr" + ); } /// Uninstall the release and namespace resources.