k8s: remove panics from runner invariants

This commit is contained in:
andrussal 2025-12-18 21:17:26 +01:00
parent e66a813e05
commit 9392280127
3 changed files with 37 additions and 21 deletions

View File

@ -14,9 +14,9 @@ use crate::{
infrastructure::{ infrastructure::{
assets::{AssetsError, prepare_assets}, assets::{AssetsError, prepare_assets},
cluster::{ cluster::{
ClusterEnvironment, NodeClientError, PortSpecs, RemoteReadinessError, ClusterEnvironment, ClusterEnvironmentError, NodeClientError, PortSpecs,
build_node_clients, cluster_identifiers, collect_port_specs, ensure_cluster_readiness, RemoteReadinessError, build_node_clients, cluster_identifiers, collect_port_specs,
install_stack, kill_port_forwards, wait_for_ports_or_cleanup, ensure_cluster_readiness, install_stack, kill_port_forwards, wait_for_ports_or_cleanup,
}, },
helm::HelmError, helm::HelmError,
}, },
@ -70,6 +70,8 @@ pub enum K8sRunnerError {
#[error(transparent)] #[error(transparent)]
Helm(#[from] HelmError), Helm(#[from] HelmError),
#[error(transparent)] #[error(transparent)]
ClusterEnvironment(#[from] ClusterEnvironmentError),
#[error(transparent)]
Cluster(#[from] Box<ClusterWaitError>), Cluster(#[from] Box<ClusterWaitError>),
#[error(transparent)] #[error(transparent)]
Readiness(#[from] RemoteReadinessError), Readiness(#[from] RemoteReadinessError),
@ -77,6 +79,8 @@ pub enum K8sRunnerError {
NodeClients(#[from] NodeClientError), NodeClients(#[from] NodeClientError),
#[error(transparent)] #[error(transparent)]
Telemetry(#[from] MetricsError), Telemetry(#[from] MetricsError),
#[error("internal invariant violated: {message}")]
InternalInvariant { message: String },
#[error("k8s runner requires at least one node client to follow blocks")] #[error("k8s runner requires at least one node client to follow blocks")]
BlockFeedMissing, BlockFeedMissing,
#[error("failed to initialize block feed: {source}")] #[error("failed to initialize block feed: {source}")]
@ -178,11 +182,12 @@ async fn deploy_with_observability<Caps>(
); );
info!("building node clients"); info!("building node clients");
let node_clients = match build_node_clients( let environment = cluster
cluster
.as_ref() .as_ref()
.expect("cluster must be available while building clients"), .ok_or_else(|| K8sRunnerError::InternalInvariant {
) { message: "cluster must be available while building clients".to_owned(),
})?;
let node_clients = match build_node_clients(environment) {
Ok(clients) => clients, Ok(clients) => clients,
Err(err) => { Err(err) => {
fail_cluster(&mut cluster, "failed to construct node api clients").await; fail_cluster(&mut cluster, "failed to construct node api clients").await;
@ -252,10 +257,12 @@ async fn deploy_with_observability<Caps>(
} }
} }
let (cleanup, port_forwards) = cluster let environment = cluster
.take() .take()
.expect("cluster should still be available") .ok_or_else(|| K8sRunnerError::InternalInvariant {
.into_cleanup(); message: "cluster should still be available".to_owned(),
})?;
let (cleanup, port_forwards) = environment.into_cleanup()?;
let cleanup_guard: Box<dyn CleanupGuard> = Box::new(K8sCleanupGuard::new( let cleanup_guard: Box<dyn CleanupGuard> = Box::new(K8sCleanupGuard::new(
cleanup, cleanup,
block_feed_guard, block_feed_guard,
@ -309,7 +316,9 @@ async fn setup_cluster(
release, release,
cleanup_guard cleanup_guard
.take() .take()
.expect("cleanup guard must exist after successful cluster startup"), .ok_or_else(|| K8sRunnerError::InternalInvariant {
message: "cleanup guard must exist after successful cluster startup".to_owned(),
})?,
&cluster_ready.ports, &cluster_ready.ports,
cluster_ready.port_forwards, cluster_ready.port_forwards,
); );

View File

@ -40,6 +40,12 @@ pub struct ClusterEnvironment {
port_forwards: Vec<PortForwardHandle>, port_forwards: Vec<PortForwardHandle>,
} }
#[derive(Debug, thiserror::Error)]
pub enum ClusterEnvironmentError {
#[error("cleanup guard is missing (it may have already been consumed)")]
MissingCleanupGuard,
}
impl ClusterEnvironment { impl ClusterEnvironment {
pub fn new( pub fn new(
client: Client, client: Client,
@ -83,11 +89,13 @@ impl ClusterEnvironment {
} }
} }
pub fn into_cleanup(self) -> (RunnerCleanup, Vec<PortForwardHandle>) { pub fn into_cleanup(
( self,
self.cleanup.expect("cleanup guard should be available"), ) -> Result<(RunnerCleanup, Vec<PortForwardHandle>), ClusterEnvironmentError> {
self.port_forwards, let cleanup = self
) .cleanup
.ok_or(ClusterEnvironmentError::MissingCleanupGuard)?;
Ok((cleanup, self.port_forwards))
} }
#[allow(dead_code)] #[allow(dead_code)]

View File

@ -15,6 +15,8 @@ pub enum HelmError {
#[source] #[source]
source: io::Error, source: io::Error,
}, },
#[error("kzg_path must be present for HostPath mode")]
MissingKzgPath,
#[error("{command} exited with status {status:?}\nstderr:\n{stderr}\nstdout:\n{stdout}")] #[error("{command} exited with status {status:?}\nstderr:\n{stderr}\nstdout:\n{stdout}")]
Failed { Failed {
command: String, command: String,
@ -34,10 +36,7 @@ pub async fn install_release(
) -> Result<(), HelmError> { ) -> Result<(), HelmError> {
let (host_path_type, host_path) = match assets.kzg_mode { let (host_path_type, host_path) = match assets.kzg_mode {
KzgMode::HostPath => { KzgMode::HostPath => {
let host_path = assets let host_path = assets.kzg_path.as_ref().ok_or(HelmError::MissingKzgPath)?;
.kzg_path
.as_ref()
.expect("kzg_path must be present for HostPath mode");
let host_path_type = if host_path.is_dir() { let host_path_type = if host_path.is_dir() {
"Directory" "Directory"
} else { } else {