core: make node spawning fallible

andrussal 2025-12-18 22:23:02 +01:00
parent e16a169bbb
commit 132c8d823e
6 changed files with 94 additions and 37 deletions
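In short: prepare_node_config, spawn_node, Validator::spawn, Executor::spawn, Topology::spawn, and GeneratedTopology::spawn_local now return Results built around the new SpawnNodeError and SpawnTopologyError types instead of panicking or exposing a bare tokio Elapsed error. A minimal caller-side sketch, assuming the crate paths visible in the diff below (the start_network helper itself is illustrative, not part of this commit):

use crate::topology::{
    config::TopologyConfig,
    deployment::{SpawnTopologyError, Topology},
};

// Hypothetical test helper: propagate spawn failures to the caller instead of panicking.
async fn start_network(config: TopologyConfig) -> Result<Topology, SpawnTopologyError> {
    let topology = Topology::spawn(config).await?;
    Ok(topology)
}

Call sites that still want the old panic-on-failure behaviour can keep an .expect(...) at the call site rather than inside the framework.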

View File

@@ -1,4 +1,5 @@
use std::{
io,
net::SocketAddr,
path::{Path, PathBuf},
process::{Child, Command, Stdio},
@@ -26,6 +27,37 @@ const STARTUP_TIMEOUT: Duration = Duration::from_secs(60);
pub type NodeAddresses = (SocketAddr, Option<SocketAddr>);
pub type PreparedNodeConfig<T> = (TempDir, T, SocketAddr, Option<SocketAddr>);
#[derive(Debug, thiserror::Error)]
pub enum SpawnNodeError {
#[error("failed to create node tempdir: {source}")]
TempDir {
#[source]
source: io::Error,
},
#[error("failed to prepare node recovery paths: {source}")]
RecoveryPaths {
#[source]
source: io::Error,
},
#[error("failed to write node config at {path}: {source}")]
WriteConfig {
path: PathBuf,
#[source]
source: io::Error,
},
#[error("failed to spawn node process '{binary}': {source}")]
Spawn {
binary: PathBuf,
#[source]
source: io::Error,
},
#[error("node did not become ready before timeout: {source}")]
Readiness {
#[source]
source: tokio::time::error::Elapsed,
},
}
/// Minimal interface to apply common node setup.
pub trait NodeConfigCommon {
fn set_logger(&mut self, logger: LoggerLayer);
@@ -92,14 +124,14 @@ pub fn prepare_node_config<T: NodeConfigCommon>(
mut config: T,
log_prefix: &str,
enable_logging: bool,
) -> PreparedNodeConfig<T> {
let dir = create_tempdir().expect("tempdir");
) -> Result<PreparedNodeConfig<T>, SpawnNodeError> {
let dir = create_tempdir().map_err(|source| SpawnNodeError::TempDir { source })?;
debug!(dir = %dir.path().display(), log_prefix, enable_logging, "preparing node config");
// Ensure recovery files/dirs exist so services that persist state do not fail
// on startup.
let _ = ensure_recovery_paths(dir.path());
ensure_recovery_paths(dir.path()).map_err(|source| SpawnNodeError::RecoveryPaths { source })?;
if enable_logging {
configure_logging(dir.path(), log_prefix, |file_cfg| {
@@ -112,7 +144,7 @@ pub fn prepare_node_config<T: NodeConfigCommon>(
debug!(addr = %addr, testing_addr = ?testing_addr, "configured node addresses");
(dir, config, addr, testing_addr)
Ok((dir, config, addr, testing_addr))
}
/// Spawn a node with shared setup, config writing, and readiness wait.
@@ -122,27 +154,34 @@ pub async fn spawn_node<C>(
config_filename: &str,
binary_path: PathBuf,
enable_logging: bool,
) -> Result<NodeHandle<C>, tokio::time::error::Elapsed>
) -> Result<NodeHandle<C>, SpawnNodeError>
where
C: NodeConfigCommon + Serialize,
{
let (dir, config, addr, testing_addr) = prepare_node_config(config, log_prefix, enable_logging);
let (dir, config, addr, testing_addr) =
prepare_node_config(config, log_prefix, enable_logging)?;
let config_path = dir.path().join(config_filename);
super::lifecycle::spawn::write_config_with_injection(&config, &config_path, |yaml| {
crate::nodes::common::config::injection::inject_ibd_into_cryptarchia(yaml)
})
.expect("failed to write node config");
.map_err(|source| SpawnNodeError::WriteConfig {
path: config_path.clone(),
source,
})?;
debug!(config_file = %config_path.display(), binary = %binary_path.display(), "spawning node process");
let child = Command::new(binary_path)
let child = Command::new(&binary_path)
.arg(&config_path)
.current_dir(dir.path())
.stdin(Stdio::null())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()
.expect("failed to spawn node process");
.map_err(|source| SpawnNodeError::Spawn {
binary: binary_path.clone(),
source,
})?;
let mut handle = NodeHandle::new(child, dir, config, ApiClient::new(addr, testing_addr));
@@ -160,7 +199,7 @@ where
if let Err(err) = ready {
// Persist tempdir to aid debugging if readiness fails.
let _ = persist_tempdir(&mut handle.tempdir, "nomos-node");
return Err(err);
return Err(SpawnNodeError::Readiness { source: err });
}
info!("node readiness confirmed via consensus_info");

View File

@@ -17,7 +17,7 @@ use crate::{
common::{
binary::{BinaryConfig, BinaryResolver},
lifecycle::{kill::kill_child, monitor::is_running},
node::{NodeAddresses, NodeConfigCommon, NodeHandle, spawn_node},
node::{NodeAddresses, NodeConfigCommon, NodeHandle, SpawnNodeError, spawn_node},
},
},
};
@@ -60,7 +60,7 @@ impl Drop for Executor {
}
impl Executor {
pub async fn spawn(config: Config) -> Self {
pub async fn spawn(config: Config) -> Result<Self, SpawnNodeError> {
let handle = spawn_node(
config,
LOGS_PREFIX,
@@ -68,12 +68,11 @@ impl Executor {
binary_path(),
!*IS_DEBUG_TRACING,
)
.await
.expect("executor did not become ready");
.await?;
info!("executor spawned and ready");
Self { handle }
Ok(Self { handle })
}
/// Check if the executor process is still running

View File

@@ -3,7 +3,6 @@ use std::{ops::Deref, path::PathBuf, time::Duration};
use nomos_node::Config;
use nomos_tracing_service::LoggerLayer;
pub use testing_framework_config::nodes::validator::create_validator_config;
use tokio::time::error::Elapsed;
use tracing::{debug, info};
use super::{persist_tempdir, should_persist_tempdir};
@@ -14,7 +13,7 @@ use crate::{
common::{
binary::{BinaryConfig, BinaryResolver},
lifecycle::{kill::kill_child, monitor::is_running},
node::{NodeAddresses, NodeConfigCommon, NodeHandle, spawn_node},
node::{NodeAddresses, NodeConfigCommon, NodeHandle, SpawnNodeError, spawn_node},
},
},
};
@@ -73,7 +72,7 @@ impl Validator {
self.handle.wait_for_exit(timeout).await
}
pub async fn spawn(config: Config) -> Result<Self, Elapsed> {
pub async fn spawn(config: Config) -> Result<Self, SpawnNodeError> {
let handle = spawn_node(
config,
LOGS_PREFIX,

View File

@@ -1,9 +1,11 @@
use std::collections::HashSet;
use nomos_core::sdp::SessionNumber;
use thiserror::Error;
use crate::{
nodes::{
common::node::SpawnNodeError,
executor::{Executor, create_executor_config},
validator::{Validator, create_validator_config},
},
@@ -27,8 +29,14 @@ pub struct Topology {
pub type DeployedNodes = (Vec<Validator>, Vec<Executor>);
#[derive(Debug, Error)]
pub enum SpawnTopologyError {
#[error(transparent)]
Node(#[from] SpawnNodeError),
}
impl Topology {
pub async fn spawn(config: TopologyConfig) -> Self {
pub async fn spawn(config: TopologyConfig) -> Result<Self, SpawnTopologyError> {
let generated = TopologyBuilder::new(config.clone()).build();
let n_validators = config.n_validators;
let n_executors = config.n_executors;
@@ -38,12 +46,12 @@ impl Topology {
.collect::<Vec<_>>();
let (validators, executors) =
Self::spawn_validators_executors(node_configs, n_validators, n_executors).await;
Self::spawn_validators_executors(node_configs, n_validators, n_executors).await?;
Self {
Ok(Self {
validators,
executors,
}
})
}
pub async fn spawn_with_empty_membership(
@@ -51,7 +59,7 @@ impl Topology {
ids: &[[u8; 32]],
da_ports: &[u16],
blend_ports: &[u16],
) -> Self {
) -> Result<Self, SpawnTopologyError> {
let generated = TopologyBuilder::new(config.clone())
.with_ids(ids.to_vec())
.with_da_ports(da_ports.to_vec())
@@ -65,32 +73,32 @@ impl Topology {
let (validators, executors) =
Self::spawn_validators_executors(node_configs, config.n_validators, config.n_executors)
.await;
.await?;
Self {
Ok(Self {
validators,
executors,
}
})
}
pub(crate) async fn spawn_validators_executors(
config: Vec<GeneralConfig>,
n_validators: usize,
n_executors: usize,
) -> DeployedNodes {
) -> Result<DeployedNodes, SpawnTopologyError> {
let mut validators = Vec::new();
for i in 0..n_validators {
let config = create_validator_config(config[i].clone());
validators.push(Validator::spawn(config).await.unwrap());
validators.push(Validator::spawn(config).await?);
}
let mut executors = Vec::new();
for i in 0..n_executors {
let config = create_executor_config(config[n_validators + i].clone());
executors.push(Executor::spawn(config).await);
executors.push(Executor::spawn(config).await?);
}
(validators, executors)
Ok((validators, executors))
}
#[must_use]

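The single Node variant of SpawnTopologyError above uses thiserror's #[from], which is what lets the ? operator lift a SpawnNodeError into the topology-level error without an explicit map_err. A standalone sketch of that pattern with made-up types, not taken from the repository:

use thiserror::Error;

#[derive(Debug, Error)]
#[error("low-level failure")]
struct LowLevelError;

#[derive(Debug, Error)]
enum HighLevelError {
    // #[from] generates `impl From<LowLevelError> for HighLevelError`,
    // so `?` converts the inner error automatically.
    #[error(transparent)]
    Low(#[from] LowLevelError),
}

fn low_level() -> Result<(), LowLevelError> {
    Err(LowLevelError)
}

fn high_level() -> Result<(), HighLevelError> {
    low_level()?; // converted via the generated From impl
    Ok(())
}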
View File

@@ -5,7 +5,7 @@ use reqwest::{Client, Url};
use crate::topology::{
config::TopologyConfig,
configs::{GeneralConfig, wallet::WalletAccount},
deployment::Topology,
deployment::{SpawnTopologyError, Topology},
readiness::{HttpMembershipReadiness, HttpNetworkReadiness, ReadinessCheck, ReadinessError},
};
@@ -103,7 +103,7 @@ impl GeneratedTopology {
&self.config.wallet_config.accounts
}
pub async fn spawn_local(&self) -> Topology {
pub async fn spawn_local(&self) -> Result<Topology, SpawnTopologyError> {
let configs = self
.nodes()
.map(|node| node.general.clone())
@@ -114,12 +114,12 @@
self.config.n_validators,
self.config.n_executors,
)
.await;
.await?;
Topology {
Ok(Topology {
validators,
executors,
}
})
}
pub async fn wait_remote_readiness(

View File

@@ -4,7 +4,10 @@ use testing_framework_core::{
BlockFeed, BlockFeedTask, Deployer, DynError, Metrics, NodeClients, RunContext, Runner,
Scenario, ScenarioError, spawn_block_feed,
},
topology::{deployment::Topology, readiness::ReadinessError},
topology::{
deployment::{SpawnTopologyError, Topology},
readiness::ReadinessError,
},
};
use thiserror::Error;
use tracing::{debug, info};
@@ -19,6 +22,11 @@ pub struct LocalDeployer {
/// Errors surfaced by the local deployer while driving a scenario.
#[derive(Debug, Error)]
pub enum LocalDeployerError {
#[error("failed to spawn local topology: {source}")]
Spawn {
#[source]
source: SpawnTopologyError,
},
#[error("readiness probe failed: {source}")]
ReadinessFailed {
#[source]
@@ -101,7 +109,11 @@
executors = descriptors.executors().len(),
"spawning local validators/executors"
);
let topology = descriptors.clone().spawn_local().await;
let topology = descriptors
.clone()
.spawn_local()
.await
.map_err(|source| LocalDeployerError::Spawn { source })?;
let skip_membership = !membership_check;
wait_for_readiness(&topology, skip_membership)