mirror of
https://github.com/logos-blockchain/logos-blockchain-testing.git
synced 2026-01-02 13:23:13 +00:00
deployers: split orchestrator and asset phases
Refactors long deployer functions into smaller phase helpers.
- k8s deploy_with_observability
- Before: env/capability observability merge + cluster setup + client/telemetry/block feed init + endpoint printing + Runner construction.
- After: resolve_observability_inputs/init_kube_client/build_node_clients_or_fail/
build_telemetry_or_fail/spawn_block_feed_or_fail/maybe_print_endpoints/finalize_runner.
- k8s prepare_assets/build_values
- Before: cfgsync render + tempdir + script/KZG/chart/values resolution in one function; duplicated validator/executor env building.
- After: create_assets_tempdir/render_and_write_cfgsync/resolve_kzg_path/
render_and_write_values/testnet_image + build_node_group/build_node_values.
- k8s install_release
- Before: mixed KZG arg resolution + long CLI arg chain + debug logging.
- After: resolve_kzg_install_args/build_install_command/maybe_log_install_output.
- compose DeploymentOrchestrator::deploy
- Before: env/cap observability merge + readiness + endpoint logging/printing + Runner construction.
- After: resolve_observability_inputs/wait_for_readiness_or_grace_period/
log_observability_endpoints/maybe_print_endpoints.
This commit is contained in:
parent
30a18710da
commit
938d782f8d
@ -42,13 +42,7 @@ impl DeploymentOrchestrator {
|
||||
let setup = DeploymentSetup::new(scenario.topology());
|
||||
setup.validate_environment().await?;
|
||||
|
||||
let env_inputs = ObservabilityInputs::from_env()?;
|
||||
let cap_inputs = scenario
|
||||
.capabilities()
|
||||
.observability_capability()
|
||||
.map(ObservabilityInputs::from_capability)
|
||||
.unwrap_or_default();
|
||||
let observability = env_inputs.with_overrides(cap_inputs);
|
||||
let observability = resolve_observability_inputs(scenario)?;
|
||||
|
||||
let DeploymentContext {
|
||||
mut environment,
|
||||
@ -73,12 +67,13 @@ impl DeploymentOrchestrator {
|
||||
let executor_count = descriptors.executors().len();
|
||||
let host_ports = PortManager::prepare(&mut environment, &descriptors).await?;
|
||||
|
||||
if self.deployer.readiness_checks {
|
||||
ReadinessChecker::wait_all(&descriptors, &host_ports, &mut environment).await?;
|
||||
} else {
|
||||
info!("readiness checks disabled; giving the stack a short grace period");
|
||||
crate::lifecycle::readiness::maybe_sleep_for_disabled_readiness(false).await;
|
||||
}
|
||||
wait_for_readiness_or_grace_period(
|
||||
self.deployer.readiness_checks,
|
||||
&descriptors,
|
||||
&host_ports,
|
||||
&mut environment,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let host = compose_runner_host();
|
||||
let client_builder = ClientBuilder::new();
|
||||
@ -88,32 +83,10 @@ impl DeploymentOrchestrator {
|
||||
let telemetry = observability.telemetry_handle()?;
|
||||
let node_control = self.maybe_node_control::<Caps>(&environment);
|
||||
|
||||
if let Some(url) = observability.metrics_query_url.as_ref() {
|
||||
info!(metrics_query_url = %url.as_str(), "metrics query endpoint configured");
|
||||
}
|
||||
if let Some(url) = observability.grafana_url.as_ref() {
|
||||
info!(grafana_url = %url.as_str(), "grafana url configured");
|
||||
}
|
||||
log_observability_endpoints(&observability);
|
||||
log_profiling_urls(&host, &host_ports);
|
||||
|
||||
if std::env::var("TESTNET_PRINT_ENDPOINTS").is_ok() {
|
||||
let prometheus = observability
|
||||
.metrics_query_url
|
||||
.as_ref()
|
||||
.map(|u| u.as_str().to_string())
|
||||
.unwrap_or_else(|| "<disabled>".to_string());
|
||||
let grafana = observability
|
||||
.grafana_url
|
||||
.as_ref()
|
||||
.map(|u| u.as_str().to_string())
|
||||
.unwrap_or_else(|| "<disabled>".to_string());
|
||||
println!(
|
||||
"TESTNET_ENDPOINTS prometheus={} grafana={}",
|
||||
prometheus, grafana
|
||||
);
|
||||
|
||||
print_profiling_urls(&host, &host_ports);
|
||||
}
|
||||
maybe_print_endpoints(&observability, &host, &host_ports);
|
||||
|
||||
let (block_feed, block_feed_guard) = client_builder
|
||||
.start_block_feed(&node_clients, &mut environment)
|
||||
@ -158,6 +131,72 @@ impl DeploymentOrchestrator {
|
||||
}
|
||||
}
|
||||
|
||||
fn resolve_observability_inputs<Caps>(
|
||||
scenario: &Scenario<Caps>,
|
||||
) -> Result<ObservabilityInputs, ComposeRunnerError>
|
||||
where
|
||||
Caps: ObservabilityCapabilityProvider,
|
||||
{
|
||||
let env_inputs = ObservabilityInputs::from_env()?;
|
||||
let cap_inputs = scenario
|
||||
.capabilities()
|
||||
.observability_capability()
|
||||
.map(ObservabilityInputs::from_capability)
|
||||
.unwrap_or_default();
|
||||
Ok(env_inputs.with_overrides(cap_inputs))
|
||||
}
|
||||
|
||||
async fn wait_for_readiness_or_grace_period(
|
||||
readiness_checks: bool,
|
||||
descriptors: &testing_framework_core::topology::generation::GeneratedTopology,
|
||||
host_ports: &HostPortMapping,
|
||||
environment: &mut StackEnvironment,
|
||||
) -> Result<(), ComposeRunnerError> {
|
||||
if readiness_checks {
|
||||
ReadinessChecker::wait_all(descriptors, host_ports, environment).await?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!("readiness checks disabled; giving the stack a short grace period");
|
||||
crate::lifecycle::readiness::maybe_sleep_for_disabled_readiness(false).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn log_observability_endpoints(observability: &ObservabilityInputs) {
|
||||
if let Some(url) = observability.metrics_query_url.as_ref() {
|
||||
info!(
|
||||
metrics_query_url = %url.as_str(),
|
||||
"metrics query endpoint configured"
|
||||
);
|
||||
}
|
||||
if let Some(url) = observability.grafana_url.as_ref() {
|
||||
info!(grafana_url = %url.as_str(), "grafana url configured");
|
||||
}
|
||||
}
|
||||
|
||||
fn maybe_print_endpoints(observability: &ObservabilityInputs, host: &str, ports: &HostPortMapping) {
|
||||
if std::env::var("TESTNET_PRINT_ENDPOINTS").is_err() {
|
||||
return;
|
||||
}
|
||||
|
||||
let prometheus = observability
|
||||
.metrics_query_url
|
||||
.as_ref()
|
||||
.map(|u| u.as_str().to_string())
|
||||
.unwrap_or_else(|| "<disabled>".to_string());
|
||||
let grafana = observability
|
||||
.grafana_url
|
||||
.as_ref()
|
||||
.map(|u| u.as_str().to_string())
|
||||
.unwrap_or_else(|| "<disabled>".to_string());
|
||||
|
||||
println!(
|
||||
"TESTNET_ENDPOINTS prometheus={} grafana={}",
|
||||
prometheus, grafana
|
||||
);
|
||||
print_profiling_urls(host, ports);
|
||||
}
|
||||
|
||||
fn log_profiling_urls(host: &str, ports: &HostPortMapping) {
|
||||
for (idx, node) in ports.validators.iter().enumerate() {
|
||||
tracing::info!(
|
||||
|
||||
@ -140,20 +140,14 @@ async fn deploy_with_observability<Caps>(
|
||||
scenario: &Scenario<Caps>,
|
||||
observability: Option<&ObservabilityCapability>,
|
||||
) -> Result<Runner, K8sRunnerError> {
|
||||
let env_inputs = ObservabilityInputs::from_env()?;
|
||||
let cap_inputs = observability
|
||||
.map(ObservabilityInputs::from_capability)
|
||||
.unwrap_or_default();
|
||||
let observability = env_inputs.with_overrides(cap_inputs);
|
||||
let observability = resolve_observability_inputs(observability)?;
|
||||
|
||||
let descriptors = scenario.topology().clone();
|
||||
let validator_count = descriptors.validators().len();
|
||||
let executor_count = descriptors.executors().len();
|
||||
ensure_supported_topology(&descriptors)?;
|
||||
|
||||
let client = Client::try_default()
|
||||
.await
|
||||
.map_err(|source| K8sRunnerError::ClientInit { source })?;
|
||||
let client = init_kube_client().await?;
|
||||
|
||||
info!(
|
||||
validators = validator_count,
|
||||
@ -182,37 +176,12 @@ async fn deploy_with_observability<Caps>(
|
||||
);
|
||||
|
||||
info!("building node clients");
|
||||
let environment = cluster
|
||||
.as_ref()
|
||||
.ok_or_else(|| K8sRunnerError::InternalInvariant {
|
||||
message: "cluster must be available while building clients".to_owned(),
|
||||
})?;
|
||||
let node_clients = match build_node_clients(environment) {
|
||||
Ok(clients) => clients,
|
||||
Err(err) => {
|
||||
fail_cluster(&mut cluster, "failed to construct node api clients").await;
|
||||
error!(error = ?err, "failed to build k8s node clients");
|
||||
return Err(err.into());
|
||||
}
|
||||
};
|
||||
let node_clients = build_node_clients_or_fail(&mut cluster).await?;
|
||||
|
||||
let telemetry = match observability.telemetry_handle() {
|
||||
Ok(handle) => handle,
|
||||
Err(err) => {
|
||||
fail_cluster(&mut cluster, "failed to configure metrics telemetry handle").await;
|
||||
error!(error = ?err, "failed to configure metrics telemetry handle");
|
||||
return Err(err.into());
|
||||
}
|
||||
};
|
||||
let telemetry = build_telemetry_or_fail(&mut cluster, &observability).await?;
|
||||
|
||||
let (block_feed, block_feed_guard) = match spawn_block_feed_with(&node_clients).await {
|
||||
Ok(pair) => pair,
|
||||
Err(err) => {
|
||||
fail_cluster(&mut cluster, "failed to initialize block feed").await;
|
||||
error!(error = ?err, "failed to initialize block feed");
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
let (block_feed, block_feed_guard) =
|
||||
spawn_block_feed_or_fail(&mut cluster, &node_clients).await?;
|
||||
|
||||
if let Some(url) = observability.metrics_query_url.as_ref() {
|
||||
info!(
|
||||
@ -224,69 +193,19 @@ async fn deploy_with_observability<Caps>(
|
||||
info!(grafana_url = %url.as_str(), "grafana url configured");
|
||||
}
|
||||
|
||||
if std::env::var("TESTNET_PRINT_ENDPOINTS").is_ok() {
|
||||
let prometheus = observability
|
||||
.metrics_query_url
|
||||
.as_ref()
|
||||
.map(|u| u.as_str().to_string())
|
||||
.unwrap_or_else(|| "<disabled>".to_string());
|
||||
println!(
|
||||
"TESTNET_ENDPOINTS prometheus={} grafana={}",
|
||||
prometheus,
|
||||
observability
|
||||
.grafana_url
|
||||
.as_ref()
|
||||
.map(|u| u.as_str().to_string())
|
||||
.unwrap_or_else(|| "<disabled>".to_string())
|
||||
);
|
||||
maybe_print_endpoints(&observability, &node_clients);
|
||||
|
||||
for (idx, client) in node_clients.validator_clients().iter().enumerate() {
|
||||
println!(
|
||||
"TESTNET_PPROF validator_{}={}/debug/pprof/profile?seconds=15&format=proto",
|
||||
idx,
|
||||
client.base_url()
|
||||
);
|
||||
}
|
||||
|
||||
for (idx, client) in node_clients.executor_clients().iter().enumerate() {
|
||||
println!(
|
||||
"TESTNET_PPROF executor_{}={}/debug/pprof/profile?seconds=15&format=proto",
|
||||
idx,
|
||||
client.base_url()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let environment = cluster
|
||||
.take()
|
||||
.ok_or_else(|| K8sRunnerError::InternalInvariant {
|
||||
message: "cluster should still be available".to_owned(),
|
||||
})?;
|
||||
let (cleanup, port_forwards) = environment.into_cleanup()?;
|
||||
let cleanup_guard: Box<dyn CleanupGuard> = Box::new(K8sCleanupGuard::new(
|
||||
cleanup,
|
||||
block_feed_guard,
|
||||
port_forwards,
|
||||
));
|
||||
|
||||
let context = RunContext::new(
|
||||
finalize_runner(
|
||||
&mut cluster,
|
||||
descriptors,
|
||||
None,
|
||||
node_clients,
|
||||
scenario.duration(),
|
||||
telemetry,
|
||||
block_feed,
|
||||
None,
|
||||
);
|
||||
|
||||
info!(
|
||||
validators = validator_count,
|
||||
executors = executor_count,
|
||||
duration_secs = scenario.duration().as_secs(),
|
||||
"k8s deployment ready; handing control to scenario runner"
|
||||
);
|
||||
|
||||
Ok(Runner::new(context, Some(cleanup_guard)))
|
||||
block_feed_guard,
|
||||
validator_count,
|
||||
executor_count,
|
||||
)
|
||||
}
|
||||
|
||||
async fn setup_cluster(
|
||||
@ -332,6 +251,155 @@ async fn setup_cluster(
|
||||
Ok(environment)
|
||||
}
|
||||
|
||||
fn resolve_observability_inputs(
|
||||
observability: Option<&ObservabilityCapability>,
|
||||
) -> Result<ObservabilityInputs, K8sRunnerError> {
|
||||
let env_inputs = ObservabilityInputs::from_env()?;
|
||||
let cap_inputs = observability
|
||||
.map(ObservabilityInputs::from_capability)
|
||||
.unwrap_or_default();
|
||||
Ok(env_inputs.with_overrides(cap_inputs))
|
||||
}
|
||||
|
||||
async fn init_kube_client() -> Result<Client, K8sRunnerError> {
|
||||
Client::try_default()
|
||||
.await
|
||||
.map_err(|source| K8sRunnerError::ClientInit { source })
|
||||
}
|
||||
|
||||
async fn build_node_clients_or_fail(
|
||||
cluster: &mut Option<ClusterEnvironment>,
|
||||
) -> Result<testing_framework_core::scenario::NodeClients, K8sRunnerError> {
|
||||
let environment = cluster
|
||||
.as_ref()
|
||||
.ok_or_else(|| K8sRunnerError::InternalInvariant {
|
||||
message: "cluster must be available while building clients".to_owned(),
|
||||
})?;
|
||||
|
||||
match build_node_clients(environment) {
|
||||
Ok(clients) => Ok(clients),
|
||||
Err(err) => {
|
||||
fail_cluster(cluster, "failed to construct node api clients").await;
|
||||
error!(error = ?err, "failed to build k8s node clients");
|
||||
Err(err.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn build_telemetry_or_fail(
|
||||
cluster: &mut Option<ClusterEnvironment>,
|
||||
observability: &ObservabilityInputs,
|
||||
) -> Result<testing_framework_core::scenario::Metrics, K8sRunnerError> {
|
||||
match observability.telemetry_handle() {
|
||||
Ok(handle) => Ok(handle),
|
||||
Err(err) => {
|
||||
fail_cluster(cluster, "failed to configure metrics telemetry handle").await;
|
||||
error!(error = ?err, "failed to configure metrics telemetry handle");
|
||||
Err(err.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn spawn_block_feed_or_fail(
|
||||
cluster: &mut Option<ClusterEnvironment>,
|
||||
node_clients: &testing_framework_core::scenario::NodeClients,
|
||||
) -> Result<(testing_framework_core::scenario::BlockFeed, BlockFeedTask), K8sRunnerError> {
|
||||
match spawn_block_feed_with(node_clients).await {
|
||||
Ok(pair) => Ok(pair),
|
||||
Err(err) => {
|
||||
fail_cluster(cluster, "failed to initialize block feed").await;
|
||||
error!(error = ?err, "failed to initialize block feed");
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn maybe_print_endpoints(
|
||||
observability: &ObservabilityInputs,
|
||||
node_clients: &testing_framework_core::scenario::NodeClients,
|
||||
) {
|
||||
if std::env::var("TESTNET_PRINT_ENDPOINTS").is_err() {
|
||||
return;
|
||||
}
|
||||
|
||||
let prometheus = observability
|
||||
.metrics_query_url
|
||||
.as_ref()
|
||||
.map(|u| u.as_str().to_string())
|
||||
.unwrap_or_else(|| "<disabled>".to_string());
|
||||
|
||||
println!(
|
||||
"TESTNET_ENDPOINTS prometheus={} grafana={}",
|
||||
prometheus,
|
||||
observability
|
||||
.grafana_url
|
||||
.as_ref()
|
||||
.map(|u| u.as_str().to_string())
|
||||
.unwrap_or_else(|| "<disabled>".to_string())
|
||||
);
|
||||
|
||||
for (idx, client) in node_clients.validator_clients().iter().enumerate() {
|
||||
println!(
|
||||
"TESTNET_PPROF validator_{}={}/debug/pprof/profile?seconds=15&format=proto",
|
||||
idx,
|
||||
client.base_url()
|
||||
);
|
||||
}
|
||||
|
||||
for (idx, client) in node_clients.executor_clients().iter().enumerate() {
|
||||
println!(
|
||||
"TESTNET_PPROF executor_{}={}/debug/pprof/profile?seconds=15&format=proto",
|
||||
idx,
|
||||
client.base_url()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn finalize_runner(
|
||||
cluster: &mut Option<ClusterEnvironment>,
|
||||
descriptors: GeneratedTopology,
|
||||
node_clients: testing_framework_core::scenario::NodeClients,
|
||||
duration: std::time::Duration,
|
||||
telemetry: testing_framework_core::scenario::Metrics,
|
||||
block_feed: testing_framework_core::scenario::BlockFeed,
|
||||
block_feed_guard: BlockFeedTask,
|
||||
validator_count: usize,
|
||||
executor_count: usize,
|
||||
) -> Result<Runner, K8sRunnerError> {
|
||||
let environment = cluster
|
||||
.take()
|
||||
.ok_or_else(|| K8sRunnerError::InternalInvariant {
|
||||
message: "cluster should still be available".to_owned(),
|
||||
})?;
|
||||
let (cleanup, port_forwards) = environment.into_cleanup()?;
|
||||
|
||||
let cleanup_guard: Box<dyn CleanupGuard> = Box::new(K8sCleanupGuard::new(
|
||||
cleanup,
|
||||
block_feed_guard,
|
||||
port_forwards,
|
||||
));
|
||||
|
||||
let context = RunContext::new(
|
||||
descriptors,
|
||||
None,
|
||||
node_clients,
|
||||
duration,
|
||||
telemetry,
|
||||
block_feed,
|
||||
None,
|
||||
);
|
||||
|
||||
info!(
|
||||
validators = validator_count,
|
||||
executors = executor_count,
|
||||
duration_secs = duration.as_secs(),
|
||||
"k8s deployment ready; handing control to scenario runner"
|
||||
);
|
||||
|
||||
Ok(Runner::new(context, Some(cleanup_guard)))
|
||||
}
|
||||
|
||||
struct K8sCleanupGuard {
|
||||
cleanup: RunnerCleanup,
|
||||
block_feed: Option<BlockFeedTask>,
|
||||
|
||||
@ -91,24 +91,21 @@ pub fn prepare_assets(
|
||||
|
||||
let root = workspace_root().map_err(|source| AssetsError::WorkspaceRoot { source })?;
|
||||
let kzg_spec = KzgParamsSpec::for_k8s(&root);
|
||||
let cfgsync_yaml = render_cfgsync_config(&root, topology, &kzg_spec, metrics_otlp_ingest_url)?;
|
||||
|
||||
let tempdir = tempfile::Builder::new()
|
||||
.prefix("nomos-helm-")
|
||||
.tempdir()
|
||||
.map_err(|source| AssetsError::TempDir { source })?;
|
||||
let tempdir = create_assets_tempdir()?;
|
||||
|
||||
let cfgsync_file = write_temp_file(tempdir.path(), "cfgsync.yaml", cfgsync_yaml)?;
|
||||
let cfgsync_file = render_and_write_cfgsync(
|
||||
&root,
|
||||
topology,
|
||||
&kzg_spec,
|
||||
metrics_otlp_ingest_url,
|
||||
&tempdir,
|
||||
)?;
|
||||
let scripts = validate_scripts(&root)?;
|
||||
let kzg_path = match kzg_spec.mode {
|
||||
KzgMode::HostPath => Some(validate_kzg_params(&root, &kzg_spec)?),
|
||||
KzgMode::InImage => None,
|
||||
};
|
||||
let kzg_path = resolve_kzg_path(&root, &kzg_spec)?;
|
||||
let chart_path = helm_chart_path()?;
|
||||
let values_yaml = render_values_yaml(topology)?;
|
||||
let values_file = write_temp_file(tempdir.path(), "values.yaml", values_yaml)?;
|
||||
let image = tf_env::nomos_testnet_image()
|
||||
.unwrap_or_else(|| String::from("public.ecr.aws/r4s5t9y4/logos/logos-blockchain:test"));
|
||||
let values_file = render_and_write_values(topology, &tempdir)?;
|
||||
let image = testnet_image();
|
||||
|
||||
let kzg_display = kzg_path
|
||||
.as_ref()
|
||||
@ -139,6 +136,44 @@ pub fn prepare_assets(
|
||||
})
|
||||
}
|
||||
|
||||
fn create_assets_tempdir() -> Result<TempDir, AssetsError> {
|
||||
tempfile::Builder::new()
|
||||
.prefix("nomos-helm-")
|
||||
.tempdir()
|
||||
.map_err(|source| AssetsError::TempDir { source })
|
||||
}
|
||||
|
||||
fn render_and_write_cfgsync(
|
||||
root: &Path,
|
||||
topology: &GeneratedTopology,
|
||||
kzg_spec: &KzgParamsSpec,
|
||||
metrics_otlp_ingest_url: Option<&Url>,
|
||||
tempdir: &TempDir,
|
||||
) -> Result<PathBuf, AssetsError> {
|
||||
let cfgsync_yaml = render_cfgsync_config(root, topology, kzg_spec, metrics_otlp_ingest_url)?;
|
||||
write_temp_file(tempdir.path(), "cfgsync.yaml", cfgsync_yaml)
|
||||
}
|
||||
|
||||
fn resolve_kzg_path(root: &Path, kzg_spec: &KzgParamsSpec) -> Result<Option<PathBuf>, AssetsError> {
|
||||
match kzg_spec.mode {
|
||||
KzgMode::HostPath => Ok(Some(validate_kzg_params(root, kzg_spec)?)),
|
||||
KzgMode::InImage => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
fn render_and_write_values(
|
||||
topology: &GeneratedTopology,
|
||||
tempdir: &TempDir,
|
||||
) -> Result<PathBuf, AssetsError> {
|
||||
let values_yaml = render_values_yaml(topology)?;
|
||||
write_temp_file(tempdir.path(), "values.yaml", values_yaml)
|
||||
}
|
||||
|
||||
fn testnet_image() -> String {
|
||||
tf_env::nomos_testnet_image()
|
||||
.unwrap_or_else(|| String::from("public.ecr.aws/r4s5t9y4/logos/logos-blockchain:test"))
|
||||
}
|
||||
|
||||
const CFGSYNC_K8S_TIMEOUT_SECS: u64 = 300;
|
||||
|
||||
fn render_cfgsync_config(
|
||||
@ -313,91 +348,64 @@ fn build_values(topology: &GeneratedTopology) -> HelmValues {
|
||||
let image_pull_policy =
|
||||
tf_env::nomos_testnet_image_pull_policy().unwrap_or_else(|| "IfNotPresent".into());
|
||||
debug!(pol_mode, "rendering Helm values for k8s stack");
|
||||
let validators = topology
|
||||
.validators()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(index, validator)| {
|
||||
let mut env = BTreeMap::new();
|
||||
env.insert("POL_PROOF_DEV_MODE".into(), pol_mode.clone());
|
||||
env.insert(
|
||||
"CFG_NETWORK_PORT".into(),
|
||||
validator.network_port().to_string(),
|
||||
);
|
||||
env.insert("CFG_DA_PORT".into(), validator.da_port.to_string());
|
||||
env.insert("CFG_BLEND_PORT".into(), validator.blend_port.to_string());
|
||||
env.insert(
|
||||
"CFG_API_PORT".into(),
|
||||
validator.general.api_config.address.port().to_string(),
|
||||
);
|
||||
env.insert(
|
||||
"CFG_TESTING_HTTP_PORT".into(),
|
||||
validator
|
||||
.general
|
||||
.api_config
|
||||
.testing_http_address
|
||||
.port()
|
||||
.to_string(),
|
||||
);
|
||||
env.insert("CFG_HOST_KIND".into(), "validator".into());
|
||||
env.insert("CFG_HOST_IDENTIFIER".into(), format!("validator-{index}"));
|
||||
|
||||
NodeValues {
|
||||
api_port: validator.general.api_config.address.port(),
|
||||
testing_http_port: validator.general.api_config.testing_http_address.port(),
|
||||
env,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let executors = topology
|
||||
.executors()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(index, executor)| {
|
||||
let mut env = BTreeMap::new();
|
||||
env.insert("POL_PROOF_DEV_MODE".into(), pol_mode.clone());
|
||||
env.insert(
|
||||
"CFG_NETWORK_PORT".into(),
|
||||
executor.network_port().to_string(),
|
||||
);
|
||||
env.insert("CFG_DA_PORT".into(), executor.da_port.to_string());
|
||||
env.insert("CFG_BLEND_PORT".into(), executor.blend_port.to_string());
|
||||
env.insert(
|
||||
"CFG_API_PORT".into(),
|
||||
executor.general.api_config.address.port().to_string(),
|
||||
);
|
||||
env.insert(
|
||||
"CFG_TESTING_HTTP_PORT".into(),
|
||||
executor
|
||||
.general
|
||||
.api_config
|
||||
.testing_http_address
|
||||
.port()
|
||||
.to_string(),
|
||||
);
|
||||
env.insert("CFG_HOST_KIND".into(), "executor".into());
|
||||
env.insert("CFG_HOST_IDENTIFIER".into(), format!("executor-{index}"));
|
||||
|
||||
NodeValues {
|
||||
api_port: executor.general.api_config.address.port(),
|
||||
testing_http_port: executor.general.api_config.testing_http_address.port(),
|
||||
env,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
let validators = build_node_group("validator", topology.validators(), &pol_mode);
|
||||
let executors = build_node_group("executor", topology.executors(), &pol_mode);
|
||||
|
||||
HelmValues {
|
||||
image_pull_policy,
|
||||
cfgsync,
|
||||
validators: NodeGroup {
|
||||
count: topology.validators().len(),
|
||||
nodes: validators,
|
||||
},
|
||||
executors: NodeGroup {
|
||||
count: topology.executors().len(),
|
||||
nodes: executors,
|
||||
},
|
||||
validators,
|
||||
executors,
|
||||
}
|
||||
}
|
||||
|
||||
fn build_node_group(
|
||||
kind: &'static str,
|
||||
nodes: &[testing_framework_core::topology::generation::GeneratedNodeConfig],
|
||||
pol_mode: &str,
|
||||
) -> NodeGroup {
|
||||
let node_values = nodes
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(index, node)| build_node_values(kind, index, node, pol_mode))
|
||||
.collect();
|
||||
|
||||
NodeGroup {
|
||||
count: nodes.len(),
|
||||
nodes: node_values,
|
||||
}
|
||||
}
|
||||
|
||||
fn build_node_values(
|
||||
kind: &'static str,
|
||||
index: usize,
|
||||
node: &testing_framework_core::topology::generation::GeneratedNodeConfig,
|
||||
pol_mode: &str,
|
||||
) -> NodeValues {
|
||||
let mut env = BTreeMap::new();
|
||||
env.insert("POL_PROOF_DEV_MODE".into(), pol_mode.to_string());
|
||||
env.insert("CFG_NETWORK_PORT".into(), node.network_port().to_string());
|
||||
env.insert("CFG_DA_PORT".into(), node.da_port.to_string());
|
||||
env.insert("CFG_BLEND_PORT".into(), node.blend_port.to_string());
|
||||
env.insert(
|
||||
"CFG_API_PORT".into(),
|
||||
node.general.api_config.address.port().to_string(),
|
||||
);
|
||||
env.insert(
|
||||
"CFG_TESTING_HTTP_PORT".into(),
|
||||
node.general
|
||||
.api_config
|
||||
.testing_http_address
|
||||
.port()
|
||||
.to_string(),
|
||||
);
|
||||
env.insert("CFG_HOST_KIND".into(), kind.to_string());
|
||||
env.insert("CFG_HOST_IDENTIFIER".into(), format!("{kind}-{index}"));
|
||||
|
||||
NodeValues {
|
||||
api_port: node.general.api_config.address.port(),
|
||||
testing_http_port: node.general.api_config.testing_http_address.port(),
|
||||
env,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
use std::{io, process::Stdio};
|
||||
use std::{io, path::Path, process::Stdio};
|
||||
|
||||
use thiserror::Error;
|
||||
use tokio::process::Command;
|
||||
@ -34,18 +34,7 @@ pub async fn install_release(
|
||||
validators: usize,
|
||||
executors: usize,
|
||||
) -> Result<(), HelmError> {
|
||||
let (host_path_type, host_path) = match assets.kzg_mode {
|
||||
KzgMode::HostPath => {
|
||||
let host_path = assets.kzg_path.as_ref().ok_or(HelmError::MissingKzgPath)?;
|
||||
let host_path_type = if host_path.is_dir() {
|
||||
"Directory"
|
||||
} else {
|
||||
"File"
|
||||
};
|
||||
(Some(host_path_type), Some(host_path))
|
||||
}
|
||||
KzgMode::InImage => (None, None),
|
||||
};
|
||||
let kzg = resolve_kzg_install_args(assets)?;
|
||||
info!(
|
||||
release,
|
||||
namespace,
|
||||
@ -54,14 +43,69 @@ pub async fn install_release(
|
||||
image = %assets.image,
|
||||
cfgsync_port = cfgsync_port_value(),
|
||||
kzg_mode = ?assets.kzg_mode,
|
||||
kzg = %host_path
|
||||
.as_ref()
|
||||
.map(|p| p.display().to_string())
|
||||
.unwrap_or_else(|| "<in-image>".to_string()),
|
||||
kzg = %kzg.display(),
|
||||
values = %assets.values_file.display(),
|
||||
"installing helm release"
|
||||
);
|
||||
|
||||
let command = format!("helm install {release}");
|
||||
let cmd = build_install_command(
|
||||
assets, release, namespace, validators, executors, &kzg, &command,
|
||||
);
|
||||
let output = run_helm_command(cmd, &command).await?;
|
||||
|
||||
maybe_log_install_output(&command, &output);
|
||||
|
||||
info!(release, namespace, "helm install completed");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct KzgInstallArgs<'a> {
|
||||
mode: &'static str,
|
||||
host_path: Option<&'a Path>,
|
||||
host_path_type: Option<&'static str>,
|
||||
}
|
||||
|
||||
impl KzgInstallArgs<'_> {
|
||||
fn display(&self) -> String {
|
||||
self.host_path
|
||||
.map(|p| p.display().to_string())
|
||||
.unwrap_or_else(|| "<in-image>".to_string())
|
||||
}
|
||||
}
|
||||
|
||||
fn resolve_kzg_install_args(assets: &RunnerAssets) -> Result<KzgInstallArgs<'_>, HelmError> {
|
||||
match assets.kzg_mode {
|
||||
KzgMode::HostPath => {
|
||||
let host_path = assets.kzg_path.as_ref().ok_or(HelmError::MissingKzgPath)?;
|
||||
let host_path_type = if host_path.is_dir() {
|
||||
"Directory"
|
||||
} else {
|
||||
"File"
|
||||
};
|
||||
Ok(KzgInstallArgs {
|
||||
mode: "kzg.mode=hostPath",
|
||||
host_path: Some(host_path),
|
||||
host_path_type: Some(host_path_type),
|
||||
})
|
||||
}
|
||||
KzgMode::InImage => Ok(KzgInstallArgs {
|
||||
mode: "kzg.mode=inImage",
|
||||
host_path: None,
|
||||
host_path_type: None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn build_install_command(
|
||||
assets: &RunnerAssets,
|
||||
release: &str,
|
||||
namespace: &str,
|
||||
validators: usize,
|
||||
executors: usize,
|
||||
kzg: &KzgInstallArgs<'_>,
|
||||
command: &str,
|
||||
) -> Command {
|
||||
let mut cmd = Command::new("helm");
|
||||
cmd.arg("install")
|
||||
.arg(release)
|
||||
@ -83,10 +127,7 @@ pub async fn install_release(
|
||||
.arg("-f")
|
||||
.arg(&assets.values_file)
|
||||
.arg("--set")
|
||||
.arg(match assets.kzg_mode {
|
||||
KzgMode::HostPath => "kzg.mode=hostPath",
|
||||
KzgMode::InImage => "kzg.mode=inImage",
|
||||
})
|
||||
.arg(kzg.mode)
|
||||
.arg("--set-file")
|
||||
.arg(format!("cfgsync.config={}", assets.cfgsync_file.display()))
|
||||
.arg("--set-file")
|
||||
@ -112,7 +153,7 @@ pub async fn install_release(
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped());
|
||||
|
||||
if let (Some(host_path), Some(host_path_type)) = (host_path, host_path_type) {
|
||||
if let (Some(host_path), Some(host_path_type)) = (kzg.host_path, kzg.host_path_type) {
|
||||
cmd.arg("--set")
|
||||
.arg(format!("kzg.hostPath={}", host_path.display()))
|
||||
.arg("--set")
|
||||
@ -123,16 +164,26 @@ pub async fn install_release(
|
||||
cmd.current_dir(root);
|
||||
}
|
||||
|
||||
let command = format!("helm install {release}");
|
||||
let output = run_helm_command(cmd, &command).await?;
|
||||
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
|
||||
debug!(command, "prepared helm install command");
|
||||
cmd
|
||||
}
|
||||
|
||||
if std::env::var("K8S_RUNNER_DEBUG").is_ok() {
|
||||
debug!(command, stdout = %String::from_utf8_lossy(&output.stdout), "helm install stdout");
|
||||
debug!(command, stderr = %String::from_utf8_lossy(&output.stderr), "helm install stderr");
|
||||
fn maybe_log_install_output(command: &str, output: &std::process::Output) {
|
||||
if std::env::var("K8S_RUNNER_DEBUG").is_err() {
|
||||
return;
|
||||
}
|
||||
|
||||
info!(release, namespace, "helm install completed");
|
||||
Ok(())
|
||||
debug!(
|
||||
command,
|
||||
stdout = %String::from_utf8_lossy(&output.stdout),
|
||||
"helm install stdout"
|
||||
);
|
||||
debug!(
|
||||
command,
|
||||
stderr = %String::from_utf8_lossy(&output.stderr),
|
||||
"helm install stderr"
|
||||
);
|
||||
}
|
||||
|
||||
/// Uninstall the release and namespace resources.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user