refactor(compose-runner): reduce duplicated failure paths

This commit is contained in:
andrussal 2025-12-15 23:59:08 +01:00
parent 94db9af3a7
commit 88e7bed12b
3 changed files with 85 additions and 42 deletions

View File

@ -27,16 +27,19 @@ impl ClientBuilder {
host: &str,
environment: &mut StackEnvironment,
) -> Result<NodeClients, ComposeRunnerError> {
match build_node_clients_with_ports(descriptors, host_ports, host) {
Ok(clients) => Ok(clients),
let clients = match build_node_clients_with_ports(descriptors, host_ports, host) {
Ok(clients) => clients,
Err(err) => {
environment
.fail("failed to construct node api clients")
.await;
tracing::warn!(error = ?err, host, "failed to build node clients");
Err(err.into())
return Err(fail_deploy_step(
environment,
"failed to construct node api clients",
"failed to build node clients",
err,
)
.await);
}
}
};
Ok(clients)
}
pub async fn start_block_feed(
@ -44,16 +47,33 @@ impl ClientBuilder {
node_clients: &NodeClients,
environment: &mut StackEnvironment,
) -> Result<(BlockFeed, BlockFeedTask), ComposeRunnerError> {
match spawn_block_feed_with_retry(node_clients).await {
Ok(pair) => {
info!("block feed connected to validator");
Ok(pair)
}
let pair = match spawn_block_feed_with_retry(node_clients).await {
Ok(pair) => pair,
Err(err) => {
environment.fail("failed to initialize block feed").await;
tracing::warn!(error = ?err, "block feed initialization failed");
Err(err)
return Err(fail_deploy_step(
environment,
"failed to initialize block feed",
"block feed initialization failed",
err,
)
.await);
}
}
};
info!("block feed connected to validator");
Ok(pair)
}
}
async fn fail_deploy_step<E>(
environment: &mut StackEnvironment,
reason: &str,
log_message: &str,
error: E,
) -> ComposeRunnerError
where
E: std::fmt::Debug + Into<ComposeRunnerError>,
{
environment.fail(reason).await;
tracing::warn!(error = ?error, "{log_message}");
error.into()
}

View File

@ -18,37 +18,56 @@ impl ReadinessChecker {
host_ports: &HostPortMapping,
environment: &mut StackEnvironment,
) -> Result<(), ComposeRunnerError> {
info!(
ports = ?host_ports.validator_api_ports(),
"waiting for validator HTTP endpoints"
);
if let Err(err) =
ensure_validators_ready_with_ports(&host_ports.validator_api_ports()).await
{
environment.fail("validator readiness failed").await;
tracing::warn!(error = ?err, "validator readiness failed");
return Err(err.into());
let validator_ports = host_ports.validator_api_ports();
info!(ports = ?validator_ports, "waiting for validator HTTP endpoints");
if let Err(err) = ensure_validators_ready_with_ports(&validator_ports).await {
return fail_readiness_step(
environment,
"validator readiness failed",
"validator readiness failed",
err,
)
.await;
}
info!(
ports = ?host_ports.executor_api_ports(),
"waiting for executor HTTP endpoints"
);
if let Err(err) = ensure_executors_ready_with_ports(&host_ports.executor_api_ports()).await
{
environment.fail("executor readiness failed").await;
tracing::warn!(error = ?err, "executor readiness failed");
return Err(err.into());
let executor_ports = host_ports.executor_api_ports();
info!(ports = ?executor_ports, "waiting for executor HTTP endpoints");
if let Err(err) = ensure_executors_ready_with_ports(&executor_ports).await {
return fail_readiness_step(
environment,
"executor readiness failed",
"executor readiness failed",
err,
)
.await;
}
info!("waiting for remote service readiness");
if let Err(err) = ensure_remote_readiness_with_ports(descriptors, host_ports).await {
environment.fail("remote readiness probe failed").await;
tracing::warn!(error = ?err, "remote readiness probe failed");
return Err(err.into());
return fail_readiness_step(
environment,
"remote readiness probe failed",
"remote readiness probe failed",
err,
)
.await;
}
info!("compose readiness checks passed");
Ok(())
}
}
async fn fail_readiness_step<E>(
environment: &mut StackEnvironment,
reason: &str,
log_message: &str,
error: E,
) -> Result<(), ComposeRunnerError>
where
E: std::fmt::Debug + Into<ComposeRunnerError>,
{
environment.fail(reason).await;
tracing::warn!(error = ?error, "{log_message}");
Err(error.into())
}

View File

@ -95,9 +95,7 @@ impl CleanupGuard for RunnerCleanup {
self.teardown_compose();
if let Some(mut handle) = self.cfgsync.take() {
handle.shutdown();
}
self.shutdown_cfgsync();
}
}
@ -114,4 +112,10 @@ impl RunnerCleanup {
info!("compose preserve flag set; skipping docker compose down");
}
fn shutdown_cfgsync(&mut self) {
if let Some(mut handle) = self.cfgsync.take() {
handle.shutdown();
}
}
}