core: split topology build/readiness phases

Refactors long functions into small helpers without behavior changes.

- TopologyBuilder::build
  - Before: validation + base config build + provider/genesis wiring + node descriptor assembly.
  - After: participant_count/resolve_and_validate_vectors/collect_provider_infos/
    create_consensus_genesis_tx+apply_consensus_genesis_tx/build_node_descriptors.

- GeneratedTopology::wait_remote_readiness
  - Before: endpoint assertions + network readiness + membership endpoint selection.
  - After: collect_node_endpoints/wait_for_network_readiness/
    collect_membership_endpoints/testing_base_url.

- NetworkReadiness::collect + MembershipReadiness::collect
  - Before: duplicated validator/executor join_all blocks.
  - After: collect_validator_statuses/collect_executor_statuses helpers.

- spawn_node
  - Before: mixed config IO + process spawn + readiness polling.
  - After: write_node_config/spawn_node_process/wait_for_consensus_readiness.
andrussal 2025-12-19 00:05:32 +01:00
parent a1e0dddea3
commit 4d2c4c77e4
5 changed files with 477 additions and 372 deletions

@@ -160,41 +160,18 @@ where
{
let (dir, config, addr, testing_addr) =
prepare_node_config(config, log_prefix, enable_logging)?;
let config_path = dir.path().join(config_filename);
super::lifecycle::spawn::write_config_with_injection(&config, &config_path, |yaml| {
crate::nodes::common::config::injection::inject_ibd_into_cryptarchia(yaml)
})
.map_err(|source| SpawnNodeError::WriteConfig {
path: config_path.clone(),
source,
})?;
write_node_config(&config, &config_path)?;
debug!(config_file = %config_path.display(), binary = %binary_path.display(), "spawning node process");
let child = Command::new(&binary_path)
.arg(&config_path)
.current_dir(dir.path())
.stdin(Stdio::null())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()
.map_err(|source| SpawnNodeError::Spawn {
binary: binary_path.clone(),
source,
})?;
let child = spawn_node_process(&binary_path, &config_path, dir.path())?;
let mut handle = NodeHandle::new(child, dir, config, ApiClient::new(addr, testing_addr));
// Wait for readiness via consensus_info
let ready = time::timeout(STARTUP_TIMEOUT, async {
loop {
if handle.api.consensus_info().await.is_ok() {
break;
}
time::sleep(STARTUP_POLL_INTERVAL).await;
}
})
.await;
let ready = wait_for_consensus_readiness(&handle.api).await;
if let Err(err) = ready {
// Persist tempdir to aid debugging if readiness fails.
@@ -205,3 +182,43 @@ where
info!("node readiness confirmed via consensus_info");
Ok(handle)
}
fn write_node_config<C: Serialize>(config: &C, config_path: &Path) -> Result<(), SpawnNodeError> {
super::lifecycle::spawn::write_config_with_injection(config, config_path, |yaml| {
crate::nodes::common::config::injection::inject_ibd_into_cryptarchia(yaml)
})
.map_err(|source| SpawnNodeError::WriteConfig {
path: config_path.to_path_buf(),
source,
})
}
fn spawn_node_process(
binary_path: &Path,
config_path: &Path,
workdir: &Path,
) -> Result<Child, SpawnNodeError> {
Command::new(binary_path)
.arg(config_path)
.current_dir(workdir)
.stdin(Stdio::null())
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()
.map_err(|source| SpawnNodeError::Spawn {
binary: binary_path.to_path_buf(),
source,
})
}
async fn wait_for_consensus_readiness(api: &ApiClient) -> Result<(), time::error::Elapsed> {
time::timeout(STARTUP_TIMEOUT, async {
loop {
if api.consensus_info().await.is_ok() {
break;
}
time::sleep(STARTUP_POLL_INTERVAL).await;
}
})
.await
}
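Read together, the spawn path is now a three-phase straight line. A condensed sketch of the reshaped spawn_node body, stitched from the hunks above (not a verbatim copy; the surrounding error handling and imports are unchanged):

    let (dir, config, addr, testing_addr) =
        prepare_node_config(config, log_prefix, enable_logging)?;
    let config_path = dir.path().join(config_filename);
    // Phase 1: write the YAML config (with IBD injection into the cryptarchia
    // section) into the node's tempdir.
    write_node_config(&config, &config_path)?;
    // Phase 2: launch the binary against that config, inheriting stdout/stderr.
    let child = spawn_node_process(&binary_path, &config_path, dir.path())?;
    let mut handle = NodeHandle::new(child, dir, config, ApiClient::new(addr, testing_addr));
    // Phase 3: poll consensus_info until the node answers or STARTUP_TIMEOUT elapses.
    let ready = wait_for_consensus_readiness(&handle.api).await;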

@@ -276,16 +276,10 @@ impl TopologyBuilder {
blend_ports,
} = self;
let n_participants = config.n_validators + config.n_executors;
if n_participants == 0 {
return Err(TopologyBuildError::EmptyParticipants);
}
let n_participants = participant_count(&config)?;
let ids = resolve_ids(ids, n_participants)?;
let da_ports = resolve_ports(da_ports, n_participants, "DA")?;
let blend_ports = resolve_ports(blend_ports, n_participants, "Blend")?;
validate_generated_vectors(n_participants, &ids, &da_ports, &blend_ports)?;
let (ids, da_ports, blend_ports) =
resolve_and_validate_vectors(ids, da_ports, blend_ports, n_participants)?;
let BaseConfigs {
mut consensus_configs,
@@ -302,6 +296,7 @@ impl TopologyBuilder {
&da_ports,
&blend_ports,
)?;
let api_configs = create_api_configs(&ids)?;
let tracing_configs = create_tracing_configs(&ids);
let time_config = default_time_config();
@@ -309,177 +304,30 @@ impl TopologyBuilder {
let first_consensus = consensus_configs
.first()
.ok_or(TopologyBuildError::MissingConsensusConfig)?;
let mut providers = Vec::with_capacity(da_configs.len() + blend_configs.len());
for (i, da_conf) in da_configs.iter().enumerate() {
let note = first_consensus
.da_notes
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "da_notes",
expected: da_configs.len(),
actual: first_consensus.da_notes.len(),
})?
.clone();
providers.push(ProviderInfo {
service_type: ServiceType::DataAvailability,
provider_sk: da_conf.signer.clone(),
zk_sk: da_conf.secret_zk_key.clone(),
locator: Locator(da_conf.listening_address.clone()),
note,
});
}
for (i, blend_conf) in blend_configs.iter().enumerate() {
let note = first_consensus
.blend_notes
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "blend_notes",
expected: blend_configs.len(),
actual: first_consensus.blend_notes.len(),
})?
.clone();
providers.push(ProviderInfo {
service_type: ServiceType::BlendNetwork,
provider_sk: blend_conf.signer.clone(),
zk_sk: blend_conf.secret_zk_key.clone(),
locator: Locator(blend_conf.backend_core.listening_address.clone()),
note,
});
}
let providers = collect_provider_infos(first_consensus, &da_configs, &blend_configs)?;
let ledger_tx = first_consensus.genesis_tx.mantle_tx().ledger_tx.clone();
let genesis_tx = create_genesis_tx_with_declarations(ledger_tx, providers)?;
for c in &mut consensus_configs {
c.genesis_tx = genesis_tx.clone();
}
let genesis_tx = create_consensus_genesis_tx(first_consensus, providers)?;
apply_consensus_genesis_tx(&mut consensus_configs, &genesis_tx);
let kms_configs =
create_kms_configs(&blend_configs, &da_configs, &config.wallet_config.accounts);
let mut validators = Vec::with_capacity(config.n_validators);
let mut executors = Vec::with_capacity(config.n_executors);
for i in 0..n_participants {
let consensus_config = consensus_configs
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "consensus_configs",
expected: n_participants,
actual: consensus_configs.len(),
})?
.clone();
let bootstrapping_config = bootstrapping_config
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "bootstrap_configs",
expected: n_participants,
actual: bootstrapping_config.len(),
})?
.clone();
let da_config = da_configs
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "da_configs",
expected: n_participants,
actual: da_configs.len(),
})?
.clone();
let network_config = network_configs
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "network_configs",
expected: n_participants,
actual: network_configs.len(),
})?
.clone();
let blend_config = blend_configs
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "blend_configs",
expected: n_participants,
actual: blend_configs.len(),
})?
.clone();
let api_config = api_configs
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "api_configs",
expected: n_participants,
actual: api_configs.len(),
})?
.clone();
let tracing_config = tracing_configs
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "tracing_configs",
expected: n_participants,
actual: tracing_configs.len(),
})?
.clone();
let kms_config = kms_configs
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "kms_configs",
expected: n_participants,
actual: kms_configs.len(),
})?
.clone();
let id = *ids.get(i).ok_or(TopologyBuildError::VectorLenMismatch {
label: "ids",
expected: n_participants,
actual: ids.len(),
})?;
let da_port = *da_ports
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "da_ports",
expected: n_participants,
actual: da_ports.len(),
})?;
let blend_port = *blend_ports
.get(i)
.ok_or(TopologyBuildError::VectorLenMismatch {
label: "blend_ports",
expected: n_participants,
actual: blend_ports.len(),
})?;
let general = GeneralConfig {
consensus_config,
bootstrapping_config,
da_config,
network_config,
blend_config,
api_config,
tracing_config,
time_config: time_config.clone(),
kms_config,
};
let role = if i < config.n_validators {
NodeRole::Validator
} else {
NodeRole::Executor
};
let index = match role {
NodeRole::Validator => i,
NodeRole::Executor => i - config.n_validators,
};
let descriptor = GeneratedNodeConfig {
role,
index,
id,
general,
da_port,
blend_port,
};
match role {
NodeRole::Validator => validators.push(descriptor),
NodeRole::Executor => executors.push(descriptor),
}
}
let (validators, executors) = build_node_descriptors(
&config,
n_participants,
&ids,
&da_ports,
&blend_ports,
&consensus_configs,
&bootstrapping_config,
&da_configs,
&network_configs,
&blend_configs,
&api_configs,
&tracing_configs,
&kms_configs,
&time_config,
)?;
Ok(GeneratedTopology {
config,
@@ -493,3 +341,188 @@ impl TopologyBuilder {
&self.config
}
}
fn participant_count(config: &TopologyConfig) -> Result<usize, TopologyBuildError> {
let n_participants = config.n_validators + config.n_executors;
if n_participants == 0 {
return Err(TopologyBuildError::EmptyParticipants);
}
Ok(n_participants)
}
fn resolve_and_validate_vectors(
ids: Option<Vec<[u8; 32]>>,
da_ports: Option<Vec<u16>>,
blend_ports: Option<Vec<u16>>,
n_participants: usize,
) -> Result<(Vec<[u8; 32]>, Vec<u16>, Vec<u16>), TopologyBuildError> {
let ids = resolve_ids(ids, n_participants)?;
let da_ports = resolve_ports(da_ports, n_participants, "DA")?;
let blend_ports = resolve_ports(blend_ports, n_participants, "Blend")?;
validate_generated_vectors(n_participants, &ids, &da_ports, &blend_ports)?;
Ok((ids, da_ports, blend_ports))
}
fn collect_provider_infos(
first_consensus: &testing_framework_config::topology::configs::consensus::GeneralConsensusConfig,
da_configs: &[testing_framework_config::topology::configs::da::GeneralDaConfig],
blend_configs: &[testing_framework_config::topology::configs::blend::GeneralBlendConfig],
) -> Result<Vec<ProviderInfo>, TopologyBuildError> {
let mut providers = Vec::with_capacity(da_configs.len() + blend_configs.len());
for (i, da_conf) in da_configs.iter().enumerate() {
let note = get_cloned("da_notes", &first_consensus.da_notes, i, da_configs.len())?;
providers.push(ProviderInfo {
service_type: ServiceType::DataAvailability,
provider_sk: da_conf.signer.clone(),
zk_sk: da_conf.secret_zk_key.clone(),
locator: Locator(da_conf.listening_address.clone()),
note,
});
}
for (i, blend_conf) in blend_configs.iter().enumerate() {
let note = get_cloned(
"blend_notes",
&first_consensus.blend_notes,
i,
blend_configs.len(),
)?;
providers.push(ProviderInfo {
service_type: ServiceType::BlendNetwork,
provider_sk: blend_conf.signer.clone(),
zk_sk: blend_conf.secret_zk_key.clone(),
locator: Locator(blend_conf.backend_core.listening_address.clone()),
note,
});
}
Ok(providers)
}
fn create_consensus_genesis_tx(
first_consensus: &testing_framework_config::topology::configs::consensus::GeneralConsensusConfig,
providers: Vec<ProviderInfo>,
) -> Result<nomos_core::mantle::genesis_tx::GenesisTx, TopologyBuildError> {
let ledger_tx = first_consensus.genesis_tx.mantle_tx().ledger_tx.clone();
Ok(create_genesis_tx_with_declarations(ledger_tx, providers)?)
}
fn apply_consensus_genesis_tx(
consensus_configs: &mut [testing_framework_config::topology::configs::consensus::GeneralConsensusConfig],
genesis_tx: &nomos_core::mantle::genesis_tx::GenesisTx,
) {
for c in consensus_configs {
c.genesis_tx = genesis_tx.clone();
}
}
#[allow(clippy::too_many_arguments)]
fn build_node_descriptors(
config: &TopologyConfig,
n_participants: usize,
ids: &[[u8; 32]],
da_ports: &[u16],
blend_ports: &[u16],
consensus_configs: &[testing_framework_config::topology::configs::consensus::GeneralConsensusConfig],
bootstrapping_config: &[testing_framework_config::topology::configs::bootstrap::GeneralBootstrapConfig],
da_configs: &[testing_framework_config::topology::configs::da::GeneralDaConfig],
network_configs: &[testing_framework_config::topology::configs::network::GeneralNetworkConfig],
blend_configs: &[testing_framework_config::topology::configs::blend::GeneralBlendConfig],
api_configs: &[testing_framework_config::topology::configs::api::GeneralApiConfig],
tracing_configs: &[testing_framework_config::topology::configs::tracing::GeneralTracingConfig],
kms_configs: &[key_management_system_service::backend::preload::PreloadKMSBackendSettings],
time_config: &testing_framework_config::topology::configs::time::GeneralTimeConfig,
) -> Result<(Vec<GeneratedNodeConfig>, Vec<GeneratedNodeConfig>), TopologyBuildError> {
let mut validators = Vec::with_capacity(config.n_validators);
let mut executors = Vec::with_capacity(config.n_executors);
for i in 0..n_participants {
let consensus_config =
get_cloned("consensus_configs", consensus_configs, i, n_participants)?;
let bootstrapping_config =
get_cloned("bootstrap_configs", bootstrapping_config, i, n_participants)?;
let da_config = get_cloned("da_configs", da_configs, i, n_participants)?;
let network_config = get_cloned("network_configs", network_configs, i, n_participants)?;
let blend_config = get_cloned("blend_configs", blend_configs, i, n_participants)?;
let api_config = get_cloned("api_configs", api_configs, i, n_participants)?;
let tracing_config = get_cloned("tracing_configs", tracing_configs, i, n_participants)?;
let kms_config = get_cloned("kms_configs", kms_configs, i, n_participants)?;
let id = get_copied("ids", ids, i, n_participants)?;
let da_port = get_copied("da_ports", da_ports, i, n_participants)?;
let blend_port = get_copied("blend_ports", blend_ports, i, n_participants)?;
let general = GeneralConfig {
consensus_config,
bootstrapping_config,
da_config,
network_config,
blend_config,
api_config,
tracing_config,
time_config: time_config.clone(),
kms_config,
};
let (role, index) = node_role_and_index(i, config.n_validators);
let descriptor = GeneratedNodeConfig {
role,
index,
id,
general,
da_port,
blend_port,
};
match role {
NodeRole::Validator => validators.push(descriptor),
NodeRole::Executor => executors.push(descriptor),
}
}
Ok((validators, executors))
}
fn node_role_and_index(i: usize, n_validators: usize) -> (NodeRole, usize) {
if i < n_validators {
(NodeRole::Validator, i)
} else {
(NodeRole::Executor, i - n_validators)
}
}
fn get_cloned<T: Clone>(
label: &'static str,
items: &[T],
index: usize,
expected: usize,
) -> Result<T, TopologyBuildError> {
items
.get(index)
.cloned()
.ok_or(TopologyBuildError::VectorLenMismatch {
label,
expected,
actual: items.len(),
})
}
fn get_copied<T: Copy>(
label: &'static str,
items: &[T],
index: usize,
expected: usize,
) -> Result<T, TopologyBuildError> {
items
.get(index)
.copied()
.ok_or(TopologyBuildError::VectorLenMismatch {
label,
expected,
actual: items.len(),
})
}
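With these helpers in place, build() itself reduces to a pipeline. A condensed reading of the new method body based on the hunks above (the base config construction and the final GeneratedTopology assembly are unchanged and omitted here):

    let n_participants = participant_count(&config)?;
    let (ids, da_ports, blend_ports) =
        resolve_and_validate_vectors(ids, da_ports, blend_ports, n_participants)?;
    // Base consensus/bootstrap/DA/network/blend/API/tracing/KMS/time configs are
    // assembled as in the surrounding hunks, then wired into the genesis tx:
    let providers = collect_provider_infos(first_consensus, &da_configs, &blend_configs)?;
    let genesis_tx = create_consensus_genesis_tx(first_consensus, providers)?;
    apply_consensus_genesis_tx(&mut consensus_configs, &genesis_tx);
    // Every per-node vector is then indexed via get_cloned/get_copied and zipped
    // into validator/executor descriptors.
    let (validators, executors) = build_node_descriptors(
        &config, n_participants, &ids, &da_ports, &blend_ports,
        &consensus_configs, &bootstrapping_config, &da_configs, &network_configs,
        &blend_configs, &api_configs, &tracing_configs, &kms_configs, &time_config,
    )?;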

@@ -137,81 +137,20 @@ impl GeneratedTopology {
return Ok(());
}
assert_eq!(
self.validators.len(),
validator_endpoints.len(),
"validator endpoints must match topology"
);
assert_eq!(
self.executors.len(),
executor_endpoints.len(),
"executor endpoints must match topology"
);
let mut endpoints = Vec::with_capacity(total_nodes);
endpoints.extend_from_slice(validator_endpoints);
endpoints.extend_from_slice(executor_endpoints);
let labels = self.labels();
let client = Client::new();
let make_testing_base_url = |port: u16| -> Url {
Url::parse(&format!("http://127.0.0.1:{port}/")).unwrap_or_else(|_| unsafe {
// Safety: `port` is a valid u16 port.
std::hint::unreachable_unchecked()
})
};
let endpoints =
collect_node_endpoints(self, validator_endpoints, executor_endpoints, total_nodes);
if endpoints.len() > 1 {
let listen_ports = self.listen_ports();
let initial_peer_ports = self.initial_peer_ports();
let expected_peer_counts = crate::topology::generation::find_expected_peer_counts(
&listen_ports,
&initial_peer_ports,
);
wait_for_network_readiness(self, &client, &endpoints, &labels).await?;
let network_check = HttpNetworkReadiness {
client: &client,
endpoints: &endpoints,
expected_peer_counts: &expected_peer_counts,
labels: &labels,
};
network_check.wait().await?;
}
let mut membership_endpoints = Vec::with_capacity(total_nodes);
if let Some(urls) = validator_membership_endpoints {
assert_eq!(
self.validators.len(),
urls.len(),
"validator membership endpoints must match topology"
);
membership_endpoints.extend_from_slice(urls);
} else {
membership_endpoints.extend(
self.validators
.iter()
.map(|node| make_testing_base_url(node.testing_http_port())),
);
}
if let Some(urls) = executor_membership_endpoints {
assert_eq!(
self.executors.len(),
urls.len(),
"executor membership endpoints must match topology"
);
membership_endpoints.extend_from_slice(urls);
} else {
membership_endpoints.extend(
self.executors
.iter()
.map(|node| make_testing_base_url(node.testing_http_port())),
);
}
let membership_endpoints = collect_membership_endpoints(
self,
total_nodes,
validator_membership_endpoints,
executor_membership_endpoints,
);
let membership_check = HttpMembershipReadiness {
client: &client,
@@ -280,6 +219,100 @@ impl GeneratedTopology {
}
}
fn collect_node_endpoints(
topology: &GeneratedTopology,
validator_endpoints: &[Url],
executor_endpoints: &[Url],
total_nodes: usize,
) -> Vec<Url> {
assert_eq!(
topology.validators.len(),
validator_endpoints.len(),
"validator endpoints must match topology"
);
assert_eq!(
topology.executors.len(),
executor_endpoints.len(),
"executor endpoints must match topology"
);
let mut endpoints = Vec::with_capacity(total_nodes);
endpoints.extend_from_slice(validator_endpoints);
endpoints.extend_from_slice(executor_endpoints);
endpoints
}
async fn wait_for_network_readiness(
topology: &GeneratedTopology,
client: &Client,
endpoints: &[Url],
labels: &[String],
) -> Result<(), ReadinessError> {
if endpoints.len() <= 1 {
return Ok(());
}
let listen_ports = topology.listen_ports();
let initial_peer_ports = topology.initial_peer_ports();
let expected_peer_counts =
crate::topology::generation::find_expected_peer_counts(&listen_ports, &initial_peer_ports);
let network_check = HttpNetworkReadiness {
client,
endpoints,
expected_peer_counts: &expected_peer_counts,
labels,
};
network_check.wait().await
}
fn collect_membership_endpoints(
topology: &GeneratedTopology,
total_nodes: usize,
validator_membership_endpoints: Option<&[Url]>,
executor_membership_endpoints: Option<&[Url]>,
) -> Vec<Url> {
let mut membership_endpoints = Vec::with_capacity(total_nodes);
membership_endpoints.extend(collect_role_membership_endpoints(
&topology.validators,
validator_membership_endpoints,
"validator membership endpoints must match topology",
));
membership_endpoints.extend(collect_role_membership_endpoints(
&topology.executors,
executor_membership_endpoints,
"executor membership endpoints must match topology",
));
membership_endpoints
}
fn collect_role_membership_endpoints(
nodes: &[GeneratedNodeConfig],
membership_endpoints: Option<&[Url]>,
mismatch_message: &'static str,
) -> Vec<Url> {
match membership_endpoints {
Some(urls) => {
assert_eq!(nodes.len(), urls.len(), "{mismatch_message}");
urls.to_vec()
}
None => nodes
.iter()
.map(|node| testing_base_url(node.testing_http_port()))
.collect(),
}
}
fn testing_base_url(port: u16) -> Url {
Url::parse(&format!("http://127.0.0.1:{port}/")).unwrap_or_else(|_| unsafe {
// Safety: `port` is a valid u16 port.
std::hint::unreachable_unchecked()
})
}
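wait_remote_readiness follows the same pattern. A condensed reading of the reshaped body from the hunks above (in the actual code the network check is still wrapped in an endpoints.len() > 1 guard, in addition to the early return inside the helper):

    // Asserts that the endpoint slices match the topology, then flattens them
    // in validator-then-executor order.
    let endpoints =
        collect_node_endpoints(self, validator_endpoints, executor_endpoints, total_nodes);
    // Polls network_info on every endpoint against the expected peer counts;
    // a no-op for single-node topologies.
    wait_for_network_readiness(self, &client, &endpoints, &labels).await?;
    // Uses caller-supplied membership URLs when given, otherwise falls back to
    // each node's testing HTTP port via testing_base_url.
    let membership_endpoints = collect_membership_endpoints(
        self,
        total_nodes,
        validator_membership_endpoints,
        executor_membership_endpoints,
    );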
pub fn find_expected_peer_counts(
listen_ports: &[u16],
initial_peer_ports: &[HashSet<u16>],

@@ -38,52 +38,9 @@ impl<'a> ReadinessCheck<'a> for MembershipReadiness<'a> {
type Data = Vec<NodeMembershipStatus>;
async fn collect(&'a self) -> Self::Data {
let validator_futures = self
.topology
.validators
.iter()
.enumerate()
.map(|(idx, node)| {
let label = self
.labels
.get(idx)
.cloned()
.unwrap_or_else(|| format!("validator#{idx}"));
async move {
let result = node
.api()
.da_get_membership_checked(&self.session)
.await
.map_err(MembershipError::from);
NodeMembershipStatus { label, result }
}
});
let offset = self.topology.validators.len();
let executor_futures = self
.topology
.executors
.iter()
.enumerate()
.map(|(idx, node)| {
let global_idx = offset + idx;
let label = self
.labels
.get(global_idx)
.cloned()
.unwrap_or_else(|| format!("executor#{idx}"));
async move {
let result = node
.api()
.da_get_membership_checked(&self.session)
.await
.map_err(MembershipError::from);
NodeMembershipStatus { label, result }
}
});
let (validator_statuses, executor_statuses) = tokio::join!(
futures::future::join_all(validator_futures),
futures::future::join_all(executor_futures)
collect_validator_statuses(self),
collect_executor_statuses(self)
);
validator_statuses
.into_iter()
@@ -150,6 +107,62 @@ impl<'a> ReadinessCheck<'a> for HttpMembershipReadiness<'a> {
}
}
async fn collect_validator_statuses(
readiness: &MembershipReadiness<'_>,
) -> Vec<NodeMembershipStatus> {
let validator_futures = readiness
.topology
.validators
.iter()
.enumerate()
.map(|(idx, node)| {
let label = readiness
.labels
.get(idx)
.cloned()
.unwrap_or_else(|| format!("validator#{idx}"));
async move {
let result = node
.api()
.da_get_membership_checked(&readiness.session)
.await
.map_err(MembershipError::from);
NodeMembershipStatus { label, result }
}
});
futures::future::join_all(validator_futures).await
}
async fn collect_executor_statuses(
readiness: &MembershipReadiness<'_>,
) -> Vec<NodeMembershipStatus> {
let offset = readiness.topology.validators.len();
let executor_futures = readiness
.topology
.executors
.iter()
.enumerate()
.map(|(idx, node)| {
let global_idx = offset + idx;
let label = readiness
.labels
.get(global_idx)
.cloned()
.unwrap_or_else(|| format!("executor#{idx}"));
async move {
let result = node
.api()
.da_get_membership_checked(&readiness.session)
.await
.map_err(MembershipError::from);
NodeMembershipStatus { label, result }
}
});
futures::future::join_all(executor_futures).await
}
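Both readiness checks now share the same collect shape; NetworkReadiness below mirrors it with NodeNetworkStatus and expected peer counts. The call site, as it appears in the hunks (the final merge of the two status vectors is cut off by the hunk boundary):

    let (validator_statuses, executor_statuses) = tokio::join!(
        collect_validator_statuses(self),
        collect_executor_statuses(self)
    );
    // validator_statuses and executor_statuses are then merged, validator-first,
    // matching the label offsets used by the per-role helpers, into the single
    // Vec the trait method returns.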
pub async fn try_fetch_membership(
client: &Client,
base: &Url,

@@ -36,62 +36,9 @@ impl<'a> ReadinessCheck<'a> for NetworkReadiness<'a> {
type Data = Vec<NodeNetworkStatus>;
async fn collect(&'a self) -> Self::Data {
let validator_futures = self
.topology
.validators
.iter()
.enumerate()
.map(|(idx, node)| {
let label = self
.labels
.get(idx)
.cloned()
.unwrap_or_else(|| format!("validator#{idx}"));
let expected_peers = self.expected_peer_counts.get(idx).copied();
async move {
let result = node
.api()
.network_info()
.await
.map_err(NetworkInfoError::from);
NodeNetworkStatus {
label,
expected_peers,
result,
}
}
});
let offset = self.topology.validators.len();
let executor_futures = self
.topology
.executors
.iter()
.enumerate()
.map(|(idx, node)| {
let global_idx = offset + idx;
let label = self
.labels
.get(global_idx)
.cloned()
.unwrap_or_else(|| format!("executor#{idx}"));
let expected_peers = self.expected_peer_counts.get(global_idx).copied();
async move {
let result = node
.api()
.network_info()
.await
.map_err(NetworkInfoError::from);
NodeNetworkStatus {
label,
expected_peers,
result,
}
}
});
let (validator_statuses, executor_statuses) = tokio::join!(
futures::future::join_all(validator_futures),
futures::future::join_all(executor_futures)
collect_validator_statuses(self),
collect_executor_statuses(self)
);
validator_statuses
.into_iter()
@@ -160,6 +107,68 @@ impl<'a> ReadinessCheck<'a> for HttpNetworkReadiness<'a> {
}
}
async fn collect_validator_statuses(readiness: &NetworkReadiness<'_>) -> Vec<NodeNetworkStatus> {
let validator_futures = readiness
.topology
.validators
.iter()
.enumerate()
.map(|(idx, node)| {
let label = readiness
.labels
.get(idx)
.cloned()
.unwrap_or_else(|| format!("validator#{idx}"));
let expected_peers = readiness.expected_peer_counts.get(idx).copied();
async move {
let result = node
.api()
.network_info()
.await
.map_err(NetworkInfoError::from);
NodeNetworkStatus {
label,
expected_peers,
result,
}
}
});
futures::future::join_all(validator_futures).await
}
async fn collect_executor_statuses(readiness: &NetworkReadiness<'_>) -> Vec<NodeNetworkStatus> {
let offset = readiness.topology.validators.len();
let executor_futures = readiness
.topology
.executors
.iter()
.enumerate()
.map(|(idx, node)| {
let global_idx = offset + idx;
let label = readiness
.labels
.get(global_idx)
.cloned()
.unwrap_or_else(|| format!("executor#{idx}"));
let expected_peers = readiness.expected_peer_counts.get(global_idx).copied();
async move {
let result = node
.api()
.network_info()
.await
.map_err(NetworkInfoError::from);
NodeNetworkStatus {
label,
expected_peers,
result,
}
}
});
futures::future::join_all(executor_futures).await
}
pub async fn try_fetch_network_info(
client: &Client,
base: &Url,