diff --git a/testing-framework/core/src/nodes/common/node.rs b/testing-framework/core/src/nodes/common/node.rs index d8da6bd..3553423 100644 --- a/testing-framework/core/src/nodes/common/node.rs +++ b/testing-framework/core/src/nodes/common/node.rs @@ -160,41 +160,18 @@ where { let (dir, config, addr, testing_addr) = prepare_node_config(config, log_prefix, enable_logging)?; + let config_path = dir.path().join(config_filename); - super::lifecycle::spawn::write_config_with_injection(&config, &config_path, |yaml| { - crate::nodes::common::config::injection::inject_ibd_into_cryptarchia(yaml) - }) - .map_err(|source| SpawnNodeError::WriteConfig { - path: config_path.clone(), - source, - })?; + write_node_config(&config, &config_path)?; debug!(config_file = %config_path.display(), binary = %binary_path.display(), "spawning node process"); - let child = Command::new(&binary_path) - .arg(&config_path) - .current_dir(dir.path()) - .stdin(Stdio::null()) - .stdout(Stdio::inherit()) - .stderr(Stdio::inherit()) - .spawn() - .map_err(|source| SpawnNodeError::Spawn { - binary: binary_path.clone(), - source, - })?; + let child = spawn_node_process(&binary_path, &config_path, dir.path())?; let mut handle = NodeHandle::new(child, dir, config, ApiClient::new(addr, testing_addr)); // Wait for readiness via consensus_info - let ready = time::timeout(STARTUP_TIMEOUT, async { - loop { - if handle.api.consensus_info().await.is_ok() { - break; - } - time::sleep(STARTUP_POLL_INTERVAL).await; - } - }) - .await; + let ready = wait_for_consensus_readiness(&handle.api).await; if let Err(err) = ready { // Persist tempdir to aid debugging if readiness fails. @@ -205,3 +182,43 @@ where info!("node readiness confirmed via consensus_info"); Ok(handle) } + +fn write_node_config(config: &C, config_path: &Path) -> Result<(), SpawnNodeError> { + super::lifecycle::spawn::write_config_with_injection(config, config_path, |yaml| { + crate::nodes::common::config::injection::inject_ibd_into_cryptarchia(yaml) + }) + .map_err(|source| SpawnNodeError::WriteConfig { + path: config_path.to_path_buf(), + source, + }) +} + +fn spawn_node_process( + binary_path: &Path, + config_path: &Path, + workdir: &Path, +) -> Result { + Command::new(binary_path) + .arg(config_path) + .current_dir(workdir) + .stdin(Stdio::null()) + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()) + .spawn() + .map_err(|source| SpawnNodeError::Spawn { + binary: binary_path.to_path_buf(), + source, + }) +} + +async fn wait_for_consensus_readiness(api: &ApiClient) -> Result<(), time::error::Elapsed> { + time::timeout(STARTUP_TIMEOUT, async { + loop { + if api.consensus_info().await.is_ok() { + break; + } + time::sleep(STARTUP_POLL_INTERVAL).await; + } + }) + .await +} diff --git a/testing-framework/core/src/topology/config.rs b/testing-framework/core/src/topology/config.rs index 1b67747..ec21478 100644 --- a/testing-framework/core/src/topology/config.rs +++ b/testing-framework/core/src/topology/config.rs @@ -276,16 +276,10 @@ impl TopologyBuilder { blend_ports, } = self; - let n_participants = config.n_validators + config.n_executors; - if n_participants == 0 { - return Err(TopologyBuildError::EmptyParticipants); - } + let n_participants = participant_count(&config)?; - let ids = resolve_ids(ids, n_participants)?; - let da_ports = resolve_ports(da_ports, n_participants, "DA")?; - let blend_ports = resolve_ports(blend_ports, n_participants, "Blend")?; - - validate_generated_vectors(n_participants, &ids, &da_ports, &blend_ports)?; + let (ids, da_ports, blend_ports) = + resolve_and_validate_vectors(ids, da_ports, blend_ports, n_participants)?; let BaseConfigs { mut consensus_configs, @@ -302,6 +296,7 @@ impl TopologyBuilder { &da_ports, &blend_ports, )?; + let api_configs = create_api_configs(&ids)?; let tracing_configs = create_tracing_configs(&ids); let time_config = default_time_config(); @@ -309,177 +304,30 @@ impl TopologyBuilder { let first_consensus = consensus_configs .first() .ok_or(TopologyBuildError::MissingConsensusConfig)?; - let mut providers = Vec::with_capacity(da_configs.len() + blend_configs.len()); - for (i, da_conf) in da_configs.iter().enumerate() { - let note = first_consensus - .da_notes - .get(i) - .ok_or(TopologyBuildError::VectorLenMismatch { - label: "da_notes", - expected: da_configs.len(), - actual: first_consensus.da_notes.len(), - })? - .clone(); - providers.push(ProviderInfo { - service_type: ServiceType::DataAvailability, - provider_sk: da_conf.signer.clone(), - zk_sk: da_conf.secret_zk_key.clone(), - locator: Locator(da_conf.listening_address.clone()), - note, - }); - } - for (i, blend_conf) in blend_configs.iter().enumerate() { - let note = first_consensus - .blend_notes - .get(i) - .ok_or(TopologyBuildError::VectorLenMismatch { - label: "blend_notes", - expected: blend_configs.len(), - actual: first_consensus.blend_notes.len(), - })? - .clone(); - providers.push(ProviderInfo { - service_type: ServiceType::BlendNetwork, - provider_sk: blend_conf.signer.clone(), - zk_sk: blend_conf.secret_zk_key.clone(), - locator: Locator(blend_conf.backend_core.listening_address.clone()), - note, - }); - } + let providers = collect_provider_infos(first_consensus, &da_configs, &blend_configs)?; - let ledger_tx = first_consensus.genesis_tx.mantle_tx().ledger_tx.clone(); - let genesis_tx = create_genesis_tx_with_declarations(ledger_tx, providers)?; - for c in &mut consensus_configs { - c.genesis_tx = genesis_tx.clone(); - } + let genesis_tx = create_consensus_genesis_tx(first_consensus, providers)?; + apply_consensus_genesis_tx(&mut consensus_configs, &genesis_tx); let kms_configs = create_kms_configs(&blend_configs, &da_configs, &config.wallet_config.accounts); - let mut validators = Vec::with_capacity(config.n_validators); - let mut executors = Vec::with_capacity(config.n_executors); - - for i in 0..n_participants { - let consensus_config = consensus_configs - .get(i) - .ok_or(TopologyBuildError::VectorLenMismatch { - label: "consensus_configs", - expected: n_participants, - actual: consensus_configs.len(), - })? - .clone(); - let bootstrapping_config = bootstrapping_config - .get(i) - .ok_or(TopologyBuildError::VectorLenMismatch { - label: "bootstrap_configs", - expected: n_participants, - actual: bootstrapping_config.len(), - })? - .clone(); - let da_config = da_configs - .get(i) - .ok_or(TopologyBuildError::VectorLenMismatch { - label: "da_configs", - expected: n_participants, - actual: da_configs.len(), - })? - .clone(); - let network_config = network_configs - .get(i) - .ok_or(TopologyBuildError::VectorLenMismatch { - label: "network_configs", - expected: n_participants, - actual: network_configs.len(), - })? - .clone(); - let blend_config = blend_configs - .get(i) - .ok_or(TopologyBuildError::VectorLenMismatch { - label: "blend_configs", - expected: n_participants, - actual: blend_configs.len(), - })? - .clone(); - let api_config = api_configs - .get(i) - .ok_or(TopologyBuildError::VectorLenMismatch { - label: "api_configs", - expected: n_participants, - actual: api_configs.len(), - })? - .clone(); - let tracing_config = tracing_configs - .get(i) - .ok_or(TopologyBuildError::VectorLenMismatch { - label: "tracing_configs", - expected: n_participants, - actual: tracing_configs.len(), - })? - .clone(); - let kms_config = kms_configs - .get(i) - .ok_or(TopologyBuildError::VectorLenMismatch { - label: "kms_configs", - expected: n_participants, - actual: kms_configs.len(), - })? - .clone(); - let id = *ids.get(i).ok_or(TopologyBuildError::VectorLenMismatch { - label: "ids", - expected: n_participants, - actual: ids.len(), - })?; - let da_port = *da_ports - .get(i) - .ok_or(TopologyBuildError::VectorLenMismatch { - label: "da_ports", - expected: n_participants, - actual: da_ports.len(), - })?; - let blend_port = *blend_ports - .get(i) - .ok_or(TopologyBuildError::VectorLenMismatch { - label: "blend_ports", - expected: n_participants, - actual: blend_ports.len(), - })?; - - let general = GeneralConfig { - consensus_config, - bootstrapping_config, - da_config, - network_config, - blend_config, - api_config, - tracing_config, - time_config: time_config.clone(), - kms_config, - }; - - let role = if i < config.n_validators { - NodeRole::Validator - } else { - NodeRole::Executor - }; - let index = match role { - NodeRole::Validator => i, - NodeRole::Executor => i - config.n_validators, - }; - - let descriptor = GeneratedNodeConfig { - role, - index, - id, - general, - da_port, - blend_port, - }; - - match role { - NodeRole::Validator => validators.push(descriptor), - NodeRole::Executor => executors.push(descriptor), - } - } + let (validators, executors) = build_node_descriptors( + &config, + n_participants, + &ids, + &da_ports, + &blend_ports, + &consensus_configs, + &bootstrapping_config, + &da_configs, + &network_configs, + &blend_configs, + &api_configs, + &tracing_configs, + &kms_configs, + &time_config, + )?; Ok(GeneratedTopology { config, @@ -493,3 +341,188 @@ impl TopologyBuilder { &self.config } } + +fn participant_count(config: &TopologyConfig) -> Result { + let n_participants = config.n_validators + config.n_executors; + if n_participants == 0 { + return Err(TopologyBuildError::EmptyParticipants); + } + + Ok(n_participants) +} + +fn resolve_and_validate_vectors( + ids: Option>, + da_ports: Option>, + blend_ports: Option>, + n_participants: usize, +) -> Result<(Vec<[u8; 32]>, Vec, Vec), TopologyBuildError> { + let ids = resolve_ids(ids, n_participants)?; + let da_ports = resolve_ports(da_ports, n_participants, "DA")?; + let blend_ports = resolve_ports(blend_ports, n_participants, "Blend")?; + + validate_generated_vectors(n_participants, &ids, &da_ports, &blend_ports)?; + + Ok((ids, da_ports, blend_ports)) +} + +fn collect_provider_infos( + first_consensus: &testing_framework_config::topology::configs::consensus::GeneralConsensusConfig, + da_configs: &[testing_framework_config::topology::configs::da::GeneralDaConfig], + blend_configs: &[testing_framework_config::topology::configs::blend::GeneralBlendConfig], +) -> Result, TopologyBuildError> { + let mut providers = Vec::with_capacity(da_configs.len() + blend_configs.len()); + + for (i, da_conf) in da_configs.iter().enumerate() { + let note = get_cloned("da_notes", &first_consensus.da_notes, i, da_configs.len())?; + providers.push(ProviderInfo { + service_type: ServiceType::DataAvailability, + provider_sk: da_conf.signer.clone(), + zk_sk: da_conf.secret_zk_key.clone(), + locator: Locator(da_conf.listening_address.clone()), + note, + }); + } + + for (i, blend_conf) in blend_configs.iter().enumerate() { + let note = get_cloned( + "blend_notes", + &first_consensus.blend_notes, + i, + blend_configs.len(), + )?; + providers.push(ProviderInfo { + service_type: ServiceType::BlendNetwork, + provider_sk: blend_conf.signer.clone(), + zk_sk: blend_conf.secret_zk_key.clone(), + locator: Locator(blend_conf.backend_core.listening_address.clone()), + note, + }); + } + + Ok(providers) +} + +fn create_consensus_genesis_tx( + first_consensus: &testing_framework_config::topology::configs::consensus::GeneralConsensusConfig, + providers: Vec, +) -> Result { + let ledger_tx = first_consensus.genesis_tx.mantle_tx().ledger_tx.clone(); + Ok(create_genesis_tx_with_declarations(ledger_tx, providers)?) +} + +fn apply_consensus_genesis_tx( + consensus_configs: &mut [testing_framework_config::topology::configs::consensus::GeneralConsensusConfig], + genesis_tx: &nomos_core::mantle::genesis_tx::GenesisTx, +) { + for c in consensus_configs { + c.genesis_tx = genesis_tx.clone(); + } +} + +#[allow(clippy::too_many_arguments)] +fn build_node_descriptors( + config: &TopologyConfig, + n_participants: usize, + ids: &[[u8; 32]], + da_ports: &[u16], + blend_ports: &[u16], + consensus_configs: &[testing_framework_config::topology::configs::consensus::GeneralConsensusConfig], + bootstrapping_config: &[testing_framework_config::topology::configs::bootstrap::GeneralBootstrapConfig], + da_configs: &[testing_framework_config::topology::configs::da::GeneralDaConfig], + network_configs: &[testing_framework_config::topology::configs::network::GeneralNetworkConfig], + blend_configs: &[testing_framework_config::topology::configs::blend::GeneralBlendConfig], + api_configs: &[testing_framework_config::topology::configs::api::GeneralApiConfig], + tracing_configs: &[testing_framework_config::topology::configs::tracing::GeneralTracingConfig], + kms_configs: &[key_management_system_service::backend::preload::PreloadKMSBackendSettings], + time_config: &testing_framework_config::topology::configs::time::GeneralTimeConfig, +) -> Result<(Vec, Vec), TopologyBuildError> { + let mut validators = Vec::with_capacity(config.n_validators); + let mut executors = Vec::with_capacity(config.n_executors); + + for i in 0..n_participants { + let consensus_config = + get_cloned("consensus_configs", consensus_configs, i, n_participants)?; + let bootstrapping_config = + get_cloned("bootstrap_configs", bootstrapping_config, i, n_participants)?; + let da_config = get_cloned("da_configs", da_configs, i, n_participants)?; + let network_config = get_cloned("network_configs", network_configs, i, n_participants)?; + let blend_config = get_cloned("blend_configs", blend_configs, i, n_participants)?; + let api_config = get_cloned("api_configs", api_configs, i, n_participants)?; + let tracing_config = get_cloned("tracing_configs", tracing_configs, i, n_participants)?; + let kms_config = get_cloned("kms_configs", kms_configs, i, n_participants)?; + + let id = get_copied("ids", ids, i, n_participants)?; + let da_port = get_copied("da_ports", da_ports, i, n_participants)?; + let blend_port = get_copied("blend_ports", blend_ports, i, n_participants)?; + + let general = GeneralConfig { + consensus_config, + bootstrapping_config, + da_config, + network_config, + blend_config, + api_config, + tracing_config, + time_config: time_config.clone(), + kms_config, + }; + + let (role, index) = node_role_and_index(i, config.n_validators); + let descriptor = GeneratedNodeConfig { + role, + index, + id, + general, + da_port, + blend_port, + }; + + match role { + NodeRole::Validator => validators.push(descriptor), + NodeRole::Executor => executors.push(descriptor), + } + } + + Ok((validators, executors)) +} + +fn node_role_and_index(i: usize, n_validators: usize) -> (NodeRole, usize) { + if i < n_validators { + (NodeRole::Validator, i) + } else { + (NodeRole::Executor, i - n_validators) + } +} + +fn get_cloned( + label: &'static str, + items: &[T], + index: usize, + expected: usize, +) -> Result { + items + .get(index) + .cloned() + .ok_or(TopologyBuildError::VectorLenMismatch { + label, + expected, + actual: items.len(), + }) +} + +fn get_copied( + label: &'static str, + items: &[T], + index: usize, + expected: usize, +) -> Result { + items + .get(index) + .copied() + .ok_or(TopologyBuildError::VectorLenMismatch { + label, + expected, + actual: items.len(), + }) +} diff --git a/testing-framework/core/src/topology/generation.rs b/testing-framework/core/src/topology/generation.rs index deb2bda..ed0cf23 100644 --- a/testing-framework/core/src/topology/generation.rs +++ b/testing-framework/core/src/topology/generation.rs @@ -137,81 +137,20 @@ impl GeneratedTopology { return Ok(()); } - assert_eq!( - self.validators.len(), - validator_endpoints.len(), - "validator endpoints must match topology" - ); - assert_eq!( - self.executors.len(), - executor_endpoints.len(), - "executor endpoints must match topology" - ); - - let mut endpoints = Vec::with_capacity(total_nodes); - endpoints.extend_from_slice(validator_endpoints); - endpoints.extend_from_slice(executor_endpoints); - let labels = self.labels(); let client = Client::new(); - let make_testing_base_url = |port: u16| -> Url { - Url::parse(&format!("http://127.0.0.1:{port}/")).unwrap_or_else(|_| unsafe { - // Safety: `port` is a valid u16 port. - std::hint::unreachable_unchecked() - }) - }; + let endpoints = + collect_node_endpoints(self, validator_endpoints, executor_endpoints, total_nodes); - if endpoints.len() > 1 { - let listen_ports = self.listen_ports(); - let initial_peer_ports = self.initial_peer_ports(); - let expected_peer_counts = crate::topology::generation::find_expected_peer_counts( - &listen_ports, - &initial_peer_ports, - ); + wait_for_network_readiness(self, &client, &endpoints, &labels).await?; - let network_check = HttpNetworkReadiness { - client: &client, - endpoints: &endpoints, - expected_peer_counts: &expected_peer_counts, - labels: &labels, - }; - - network_check.wait().await?; - } - - let mut membership_endpoints = Vec::with_capacity(total_nodes); - if let Some(urls) = validator_membership_endpoints { - assert_eq!( - self.validators.len(), - urls.len(), - "validator membership endpoints must match topology" - ); - - membership_endpoints.extend_from_slice(urls); - } else { - membership_endpoints.extend( - self.validators - .iter() - .map(|node| make_testing_base_url(node.testing_http_port())), - ); - } - - if let Some(urls) = executor_membership_endpoints { - assert_eq!( - self.executors.len(), - urls.len(), - "executor membership endpoints must match topology" - ); - - membership_endpoints.extend_from_slice(urls); - } else { - membership_endpoints.extend( - self.executors - .iter() - .map(|node| make_testing_base_url(node.testing_http_port())), - ); - } + let membership_endpoints = collect_membership_endpoints( + self, + total_nodes, + validator_membership_endpoints, + executor_membership_endpoints, + ); let membership_check = HttpMembershipReadiness { client: &client, @@ -280,6 +219,100 @@ impl GeneratedTopology { } } +fn collect_node_endpoints( + topology: &GeneratedTopology, + validator_endpoints: &[Url], + executor_endpoints: &[Url], + total_nodes: usize, +) -> Vec { + assert_eq!( + topology.validators.len(), + validator_endpoints.len(), + "validator endpoints must match topology" + ); + assert_eq!( + topology.executors.len(), + executor_endpoints.len(), + "executor endpoints must match topology" + ); + + let mut endpoints = Vec::with_capacity(total_nodes); + endpoints.extend_from_slice(validator_endpoints); + endpoints.extend_from_slice(executor_endpoints); + endpoints +} + +async fn wait_for_network_readiness( + topology: &GeneratedTopology, + client: &Client, + endpoints: &[Url], + labels: &[String], +) -> Result<(), ReadinessError> { + if endpoints.len() <= 1 { + return Ok(()); + } + + let listen_ports = topology.listen_ports(); + let initial_peer_ports = topology.initial_peer_ports(); + let expected_peer_counts = + crate::topology::generation::find_expected_peer_counts(&listen_ports, &initial_peer_ports); + + let network_check = HttpNetworkReadiness { + client, + endpoints, + expected_peer_counts: &expected_peer_counts, + labels, + }; + + network_check.wait().await +} + +fn collect_membership_endpoints( + topology: &GeneratedTopology, + total_nodes: usize, + validator_membership_endpoints: Option<&[Url]>, + executor_membership_endpoints: Option<&[Url]>, +) -> Vec { + let mut membership_endpoints = Vec::with_capacity(total_nodes); + + membership_endpoints.extend(collect_role_membership_endpoints( + &topology.validators, + validator_membership_endpoints, + "validator membership endpoints must match topology", + )); + membership_endpoints.extend(collect_role_membership_endpoints( + &topology.executors, + executor_membership_endpoints, + "executor membership endpoints must match topology", + )); + + membership_endpoints +} + +fn collect_role_membership_endpoints( + nodes: &[GeneratedNodeConfig], + membership_endpoints: Option<&[Url]>, + mismatch_message: &'static str, +) -> Vec { + match membership_endpoints { + Some(urls) => { + assert_eq!(nodes.len(), urls.len(), "{mismatch_message}"); + urls.to_vec() + } + None => nodes + .iter() + .map(|node| testing_base_url(node.testing_http_port())) + .collect(), + } +} + +fn testing_base_url(port: u16) -> Url { + Url::parse(&format!("http://127.0.0.1:{port}/")).unwrap_or_else(|_| unsafe { + // Safety: `port` is a valid u16 port. + std::hint::unreachable_unchecked() + }) +} + pub fn find_expected_peer_counts( listen_ports: &[u16], initial_peer_ports: &[HashSet], diff --git a/testing-framework/core/src/topology/readiness/membership.rs b/testing-framework/core/src/topology/readiness/membership.rs index 332fc5a..9fe3fd5 100644 --- a/testing-framework/core/src/topology/readiness/membership.rs +++ b/testing-framework/core/src/topology/readiness/membership.rs @@ -38,52 +38,9 @@ impl<'a> ReadinessCheck<'a> for MembershipReadiness<'a> { type Data = Vec; async fn collect(&'a self) -> Self::Data { - let validator_futures = self - .topology - .validators - .iter() - .enumerate() - .map(|(idx, node)| { - let label = self - .labels - .get(idx) - .cloned() - .unwrap_or_else(|| format!("validator#{idx}")); - async move { - let result = node - .api() - .da_get_membership_checked(&self.session) - .await - .map_err(MembershipError::from); - NodeMembershipStatus { label, result } - } - }); - let offset = self.topology.validators.len(); - let executor_futures = self - .topology - .executors - .iter() - .enumerate() - .map(|(idx, node)| { - let global_idx = offset + idx; - let label = self - .labels - .get(global_idx) - .cloned() - .unwrap_or_else(|| format!("executor#{idx}")); - async move { - let result = node - .api() - .da_get_membership_checked(&self.session) - .await - .map_err(MembershipError::from); - NodeMembershipStatus { label, result } - } - }); - let (validator_statuses, executor_statuses) = tokio::join!( - futures::future::join_all(validator_futures), - futures::future::join_all(executor_futures) + collect_validator_statuses(self), + collect_executor_statuses(self) ); validator_statuses .into_iter() @@ -150,6 +107,62 @@ impl<'a> ReadinessCheck<'a> for HttpMembershipReadiness<'a> { } } +async fn collect_validator_statuses( + readiness: &MembershipReadiness<'_>, +) -> Vec { + let validator_futures = readiness + .topology + .validators + .iter() + .enumerate() + .map(|(idx, node)| { + let label = readiness + .labels + .get(idx) + .cloned() + .unwrap_or_else(|| format!("validator#{idx}")); + async move { + let result = node + .api() + .da_get_membership_checked(&readiness.session) + .await + .map_err(MembershipError::from); + NodeMembershipStatus { label, result } + } + }); + + futures::future::join_all(validator_futures).await +} + +async fn collect_executor_statuses( + readiness: &MembershipReadiness<'_>, +) -> Vec { + let offset = readiness.topology.validators.len(); + let executor_futures = readiness + .topology + .executors + .iter() + .enumerate() + .map(|(idx, node)| { + let global_idx = offset + idx; + let label = readiness + .labels + .get(global_idx) + .cloned() + .unwrap_or_else(|| format!("executor#{idx}")); + async move { + let result = node + .api() + .da_get_membership_checked(&readiness.session) + .await + .map_err(MembershipError::from); + NodeMembershipStatus { label, result } + } + }); + + futures::future::join_all(executor_futures).await +} + pub async fn try_fetch_membership( client: &Client, base: &Url, diff --git a/testing-framework/core/src/topology/readiness/network.rs b/testing-framework/core/src/topology/readiness/network.rs index 7498f1f..f251b91 100644 --- a/testing-framework/core/src/topology/readiness/network.rs +++ b/testing-framework/core/src/topology/readiness/network.rs @@ -36,62 +36,9 @@ impl<'a> ReadinessCheck<'a> for NetworkReadiness<'a> { type Data = Vec; async fn collect(&'a self) -> Self::Data { - let validator_futures = self - .topology - .validators - .iter() - .enumerate() - .map(|(idx, node)| { - let label = self - .labels - .get(idx) - .cloned() - .unwrap_or_else(|| format!("validator#{idx}")); - let expected_peers = self.expected_peer_counts.get(idx).copied(); - async move { - let result = node - .api() - .network_info() - .await - .map_err(NetworkInfoError::from); - NodeNetworkStatus { - label, - expected_peers, - result, - } - } - }); - let offset = self.topology.validators.len(); - let executor_futures = self - .topology - .executors - .iter() - .enumerate() - .map(|(idx, node)| { - let global_idx = offset + idx; - let label = self - .labels - .get(global_idx) - .cloned() - .unwrap_or_else(|| format!("executor#{idx}")); - let expected_peers = self.expected_peer_counts.get(global_idx).copied(); - async move { - let result = node - .api() - .network_info() - .await - .map_err(NetworkInfoError::from); - NodeNetworkStatus { - label, - expected_peers, - result, - } - } - }); - let (validator_statuses, executor_statuses) = tokio::join!( - futures::future::join_all(validator_futures), - futures::future::join_all(executor_futures) + collect_validator_statuses(self), + collect_executor_statuses(self) ); validator_statuses .into_iter() @@ -160,6 +107,68 @@ impl<'a> ReadinessCheck<'a> for HttpNetworkReadiness<'a> { } } +async fn collect_validator_statuses(readiness: &NetworkReadiness<'_>) -> Vec { + let validator_futures = readiness + .topology + .validators + .iter() + .enumerate() + .map(|(idx, node)| { + let label = readiness + .labels + .get(idx) + .cloned() + .unwrap_or_else(|| format!("validator#{idx}")); + let expected_peers = readiness.expected_peer_counts.get(idx).copied(); + async move { + let result = node + .api() + .network_info() + .await + .map_err(NetworkInfoError::from); + NodeNetworkStatus { + label, + expected_peers, + result, + } + } + }); + + futures::future::join_all(validator_futures).await +} + +async fn collect_executor_statuses(readiness: &NetworkReadiness<'_>) -> Vec { + let offset = readiness.topology.validators.len(); + let executor_futures = readiness + .topology + .executors + .iter() + .enumerate() + .map(|(idx, node)| { + let global_idx = offset + idx; + let label = readiness + .labels + .get(global_idx) + .cloned() + .unwrap_or_else(|| format!("executor#{idx}")); + let expected_peers = readiness.expected_peer_counts.get(global_idx).copied(); + async move { + let result = node + .api() + .network_info() + .await + .map_err(NetworkInfoError::from); + NodeNetworkStatus { + label, + expected_peers, + result, + } + } + }); + + futures::future::join_all(executor_futures).await +} + pub async fn try_fetch_network_info( client: &Client, base: &Url,