From a582c00692f90f7a6e9a2d149b568bfd0a169f6d Mon Sep 17 00:00:00 2001 From: andrussal Date: Thu, 18 Dec 2025 14:27:49 +0100 Subject: [PATCH] core: make readiness collectors resilient to HTTP errors --- .../core/src/topology/readiness/balancer.rs | 86 ++++-- .../core/src/topology/readiness/membership.rs | 248 +++++++++++------- .../core/src/topology/readiness/network.rs | 229 +++++++++++----- 3 files changed, 390 insertions(+), 173 deletions(-) diff --git a/testing-framework/core/src/topology/readiness/balancer.rs b/testing-framework/core/src/topology/readiness/balancer.rs index cd8105e..c513b43 100644 --- a/testing-framework/core/src/topology/readiness/balancer.rs +++ b/testing-framework/core/src/topology/readiness/balancer.rs @@ -5,6 +5,13 @@ use crate::topology::deployment::Topology; const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1); +#[derive(Debug)] +pub struct NodeBalancerStatus { + label: String, + threshold: usize, + result: Result, +} + pub struct DaBalancerReadiness<'a> { pub(crate) topology: &'a Topology, pub(crate) labels: &'a [String], @@ -12,44 +19,76 @@ pub struct DaBalancerReadiness<'a> { #[async_trait::async_trait] impl<'a> ReadinessCheck<'a> for DaBalancerReadiness<'a> { - type Data = Vec<(String, usize, BalancerStats)>; + type Data = Vec; async fn collect(&'a self) -> Self::Data { let mut data = Vec::new(); for (idx, validator) in self.topology.validators.iter().enumerate() { - data.push(( - self.labels[idx].clone(), - validator.config().da_network.subnet_threshold, - validator.api().balancer_stats().await.unwrap(), - )); + let label = self + .labels + .get(idx) + .cloned() + .unwrap_or_else(|| format!("validator#{idx}")); + data.push( + ( + label, + validator.config().da_network.subnet_threshold, + validator.api().balancer_stats().await, + ) + .into(), + ); } for (offset, executor) in self.topology.executors.iter().enumerate() { let label_index = self.topology.validators.len() + offset; - data.push(( - self.labels[label_index].clone(), - executor.config().da_network.subnet_threshold, - executor.api().balancer_stats().await.unwrap(), - )); + let label = self + .labels + .get(label_index) + .cloned() + .unwrap_or_else(|| format!("executor#{offset}")); + data.push( + ( + label, + executor.config().da_network.subnet_threshold, + executor.api().balancer_stats().await, + ) + .into(), + ); } data } fn is_ready(&self, data: &Self::Data) -> bool { - data.iter().all(|(_, threshold, stats)| { - if *threshold == 0 { + data.iter().all(|entry| { + if entry.threshold == 0 { return true; } - connected_subnetworks(stats) >= *threshold + entry + .result + .as_ref() + .is_ok_and(|stats| connected_subnetworks(stats) >= entry.threshold) }) } fn timeout_message(&self, data: Self::Data) -> String { let summary = data .into_iter() - .map(|(label, threshold, stats)| { - let connected = connected_subnetworks(&stats); - let details = format_balancer_stats(&stats); - format!("{label}: connected={connected}, required={threshold}, stats={details}") + .map(|entry| { + let (connected, details, error) = match entry.result { + Ok(stats) => ( + connected_subnetworks(&stats), + format_balancer_stats(&stats), + None, + ), + Err(err) => (0, "unavailable".to_string(), Some(err.to_string())), + }; + let mut msg = format!( + "{}: connected={connected}, required={}, stats={details}", + entry.label, entry.threshold + ); + if let Some(error) = error { + msg.push_str(&format!(", error={error}")); + } + msg }) .collect::>() .join(", "); @@ -78,3 +117,14 @@ fn format_balancer_stats(stats: &BalancerStats) -> String { .collect::>() .join(";") } + +impl From<(String, usize, Result)> for NodeBalancerStatus { + fn from(value: (String, usize, Result)) -> Self { + let (label, threshold, result) = value; + Self { + label, + threshold, + result, + } + } +} diff --git a/testing-framework/core/src/topology/readiness/membership.rs b/testing-framework/core/src/topology/readiness/membership.rs index c1cb8a0..d913bbf 100644 --- a/testing-framework/core/src/topology/readiness/membership.rs +++ b/testing-framework/core/src/topology/readiness/membership.rs @@ -1,10 +1,29 @@ use nomos_core::sdp::SessionNumber; use nomos_da_network_service::MembershipResponse; use reqwest::{Client, Url}; +use thiserror::Error; use super::ReadinessCheck; use crate::topology::deployment::Topology; +#[derive(Debug, Error)] +pub enum MembershipError { + #[error("failed to join url {base} with path {path}: {message}")] + JoinUrl { + base: Url, + path: &'static str, + message: String, + }, + #[error(transparent)] + Request(#[from] reqwest::Error), +} + +#[derive(Debug)] +pub struct NodeMembershipStatus { + label: String, + result: Result, +} + pub struct MembershipReadiness<'a> { pub(crate) topology: &'a Topology, pub(crate) session: SessionNumber, @@ -14,71 +33,78 @@ pub struct MembershipReadiness<'a> { #[async_trait::async_trait] impl<'a> ReadinessCheck<'a> for MembershipReadiness<'a> { - type Data = Vec>; + type Data = Vec; async fn collect(&'a self) -> Self::Data { - let (validator_responses, executor_responses) = tokio::join!( - futures::future::join_all( - self.topology - .validators - .iter() - .map(|node| node.api().da_get_membership(&self.session)), - ), - futures::future::join_all( - self.topology - .executors - .iter() - .map(|node| node.api().da_get_membership(&self.session)), - ) - ); + let validator_futures = self + .topology + .validators + .iter() + .enumerate() + .map(|(idx, node)| { + let label = self + .labels + .get(idx) + .cloned() + .unwrap_or_else(|| format!("validator#{idx}")); + async move { + let result = node + .api() + .da_get_membership(&self.session) + .await + .map_err(MembershipError::from); + NodeMembershipStatus { label, result } + } + }); + let offset = self.topology.validators.len(); + let executor_futures = self + .topology + .executors + .iter() + .enumerate() + .map(|(idx, node)| { + let global_idx = offset + idx; + let label = self + .labels + .get(global_idx) + .cloned() + .unwrap_or_else(|| format!("executor#{idx}")); + async move { + let result = node + .api() + .da_get_membership(&self.session) + .await + .map_err(MembershipError::from); + NodeMembershipStatus { label, result } + } + }); - validator_responses + let (validator_statuses, executor_statuses) = tokio::join!( + futures::future::join_all(validator_futures), + futures::future::join_all(executor_futures) + ); + validator_statuses .into_iter() - .chain(executor_responses) + .chain(executor_statuses) .collect() } fn is_ready(&self, data: &Self::Data) -> bool { - self.assignation_statuses(data) - .into_iter() - .all(|ready| ready) + data.iter() + .all(|entry| is_membership_ready(entry.result.as_ref(), self.expect_non_empty)) } fn timeout_message(&self, data: Self::Data) -> String { - let statuses = self.assignation_statuses(&data); let description = if self.expect_non_empty { "non-empty assignations" } else { "empty assignations" }; - let summary = build_membership_summary(self.labels, &statuses, description); + let summary = build_membership_status_summary(&data, description, self.expect_non_empty); format!("timed out waiting for DA membership readiness ({description}): {summary}") } } -impl MembershipReadiness<'_> { - fn assignation_statuses( - &self, - responses: &[Result], - ) -> Vec { - responses - .iter() - .map(|res| { - res.as_ref() - .map(|resp| { - let is_non_empty = !resp.assignations.is_empty(); - if self.expect_non_empty { - is_non_empty - } else { - !is_non_empty - } - }) - .unwrap_or(false) - }) - .collect() - } -} - pub struct HttpMembershipReadiness<'a> { pub(crate) client: &'a Client, pub(crate) endpoints: &'a [Url], @@ -89,78 +115,120 @@ pub struct HttpMembershipReadiness<'a> { #[async_trait::async_trait] impl<'a> ReadinessCheck<'a> for HttpMembershipReadiness<'a> { - type Data = Vec>; + type Data = Vec; async fn collect(&'a self) -> Self::Data { - let futures = self - .endpoints - .iter() - .map(|endpoint| fetch_membership(self.client, endpoint, self.session)); + let futures = self.endpoints.iter().enumerate().map(|(idx, endpoint)| { + let label = self + .labels + .get(idx) + .cloned() + .unwrap_or_else(|| format!("endpoint#{idx}")); + async move { + let result = try_fetch_membership(self.client, endpoint, self.session).await; + NodeMembershipStatus { label, result } + } + }); futures::future::join_all(futures).await } fn is_ready(&self, data: &Self::Data) -> bool { - assignation_statuses(data, self.expect_non_empty) - .into_iter() - .all(|ready| ready) + data.iter() + .all(|entry| is_membership_ready(entry.result.as_ref(), self.expect_non_empty)) } fn timeout_message(&self, data: Self::Data) -> String { - let statuses = assignation_statuses(&data, self.expect_non_empty); let description = if self.expect_non_empty { "non-empty assignations" } else { "empty assignations" }; - let summary = build_membership_summary(self.labels, &statuses, description); + let summary = build_membership_status_summary(&data, description, self.expect_non_empty); format!("timed out waiting for DA membership readiness ({description}): {summary}") } } +pub async fn try_fetch_membership( + client: &Client, + base: &Url, + session: SessionNumber, +) -> Result { + let path = nomos_http_api_common::paths::DA_GET_MEMBERSHIP.trim_start_matches('/'); + let url = base.join(path).map_err(|source| MembershipError::JoinUrl { + base: base.clone(), + path: nomos_http_api_common::paths::DA_GET_MEMBERSHIP, + message: source.to_string(), + })?; + client + .post(url) + .json(&session) + .send() + .await + .map_err(MembershipError::Request)? + .error_for_status() + .map_err(MembershipError::Request)? + .json() + .await + .map_err(MembershipError::Request) +} + +#[deprecated(note = "use try_fetch_membership to avoid panics and preserve error details")] pub async fn fetch_membership( client: &Client, base: &Url, session: SessionNumber, ) -> Result { - let url = base - .join(nomos_http_api_common::paths::DA_GET_MEMBERSHIP.trim_start_matches('/')) - .unwrap_or_else(|err| { - panic!( - "failed to join url {base} with path {}: {err}", - nomos_http_api_common::paths::DA_GET_MEMBERSHIP - ) - }); - client - .post(url) - .json(&session) - .send() - .await? - .error_for_status()? - .json() + try_fetch_membership(client, base, session) .await -} - -pub fn assignation_statuses( - responses: &[Result], - expect_non_empty: bool, -) -> Vec { - responses - .iter() - .map(|res| { - res.as_ref() - .map(|resp| { - let is_non_empty = !resp.assignations.is_empty(); - if expect_non_empty { - is_non_empty - } else { - !is_non_empty - } - }) - .unwrap_or(false) + .map_err(|err| match err { + MembershipError::Request(source) => source, + MembershipError::JoinUrl { + base, + path, + message, + } => { + panic!("failed to join url {base} with path {path}: {message}") + } }) - .collect() } +fn is_membership_ready( + result: Result<&MembershipResponse, &MembershipError>, + expect_non_empty: bool, +) -> bool { + match result { + Ok(resp) => { + let is_non_empty = !resp.assignations.is_empty(); + if expect_non_empty { + is_non_empty + } else { + !is_non_empty + } + } + Err(_) => false, + } +} + +fn build_membership_status_summary( + statuses: &[NodeMembershipStatus], + description: &str, + expect_non_empty: bool, +) -> String { + statuses + .iter() + .map(|entry| match entry.result.as_ref() { + Ok(resp) => { + let ready = is_membership_ready(Ok(resp), expect_non_empty); + let status = if ready { "ready" } else { "waiting" }; + format!("{}: status={status}, expected {description}", entry.label) + } + Err(err) => format!("{}: error={err}, expected {description}", entry.label), + }) + .collect::>() + .join(", ") +} + +#[deprecated(note = "use ReadinessCheck timeout_message for richer per-node errors")] pub fn build_membership_summary(labels: &[String], statuses: &[bool], description: &str) -> String { statuses .iter() diff --git a/testing-framework/core/src/topology/readiness/network.rs b/testing-framework/core/src/topology/readiness/network.rs index 3e74598..7498f1f 100644 --- a/testing-framework/core/src/topology/readiness/network.rs +++ b/testing-framework/core/src/topology/readiness/network.rs @@ -1,10 +1,30 @@ use nomos_network::backends::libp2p::Libp2pInfo; use reqwest::{Client, Url}; +use thiserror::Error; use tracing::warn; use super::ReadinessCheck; use crate::topology::deployment::Topology; +#[derive(Debug, Error)] +pub enum NetworkInfoError { + #[error("failed to join url {base} with path {path}: {message}")] + JoinUrl { + base: Url, + path: &'static str, + message: String, + }, + #[error(transparent)] + Request(#[from] reqwest::Error), +} + +#[derive(Debug)] +pub struct NodeNetworkStatus { + label: String, + expected_peers: Option, + result: Result, +} + pub struct NetworkReadiness<'a> { pub(crate) topology: &'a Topology, pub(crate) expected_peer_counts: &'a [usize], @@ -13,35 +33,83 @@ pub struct NetworkReadiness<'a> { #[async_trait::async_trait] impl<'a> ReadinessCheck<'a> for NetworkReadiness<'a> { - type Data = Vec; + type Data = Vec; async fn collect(&'a self) -> Self::Data { - let (validator_infos, executor_infos) = tokio::join!( - futures::future::join_all( - self.topology - .validators - .iter() - .map(|node| async { node.api().network_info().await.unwrap() }) - ), - futures::future::join_all( - self.topology - .executors - .iter() - .map(|node| async { node.api().network_info().await.unwrap() }) - ) - ); + let validator_futures = self + .topology + .validators + .iter() + .enumerate() + .map(|(idx, node)| { + let label = self + .labels + .get(idx) + .cloned() + .unwrap_or_else(|| format!("validator#{idx}")); + let expected_peers = self.expected_peer_counts.get(idx).copied(); + async move { + let result = node + .api() + .network_info() + .await + .map_err(NetworkInfoError::from); + NodeNetworkStatus { + label, + expected_peers, + result, + } + } + }); + let offset = self.topology.validators.len(); + let executor_futures = self + .topology + .executors + .iter() + .enumerate() + .map(|(idx, node)| { + let global_idx = offset + idx; + let label = self + .labels + .get(global_idx) + .cloned() + .unwrap_or_else(|| format!("executor#{idx}")); + let expected_peers = self.expected_peer_counts.get(global_idx).copied(); + async move { + let result = node + .api() + .network_info() + .await + .map_err(NetworkInfoError::from); + NodeNetworkStatus { + label, + expected_peers, + result, + } + } + }); - validator_infos.into_iter().chain(executor_infos).collect() + let (validator_statuses, executor_statuses) = tokio::join!( + futures::future::join_all(validator_futures), + futures::future::join_all(executor_futures) + ); + validator_statuses + .into_iter() + .chain(executor_statuses) + .collect() } fn is_ready(&self, data: &Self::Data) -> bool { - data.iter() - .enumerate() - .all(|(idx, info)| info.n_peers >= self.expected_peer_counts[idx]) + data.iter().all( + |status| match (status.expected_peers, status.result.as_ref()) { + (Some(expected), Ok(info)) => info.n_peers >= expected, + _ => false, + }, + ) } fn timeout_message(&self, data: Self::Data) -> String { - let summary = build_timeout_summary(self.labels, data, self.expected_peer_counts); + let summary = build_timeout_summary(&data); format!("timed out waiting for network readiness: {summary}") } } @@ -55,59 +123,85 @@ pub struct HttpNetworkReadiness<'a> { #[async_trait::async_trait] impl<'a> ReadinessCheck<'a> for HttpNetworkReadiness<'a> { - type Data = Vec; + type Data = Vec; async fn collect(&'a self) -> Self::Data { - let futures = self - .endpoints - .iter() - .map(|endpoint| fetch_network_info(self.client, endpoint)); + let futures = self.endpoints.iter().enumerate().map(|(idx, endpoint)| { + let label = self + .labels + .get(idx) + .cloned() + .unwrap_or_else(|| format!("endpoint#{idx}")); + let expected_peers = self.expected_peer_counts.get(idx).copied(); + async move { + let result = try_fetch_network_info(self.client, endpoint).await; + NodeNetworkStatus { + label, + expected_peers, + result, + } + } + }); futures::future::join_all(futures).await } fn is_ready(&self, data: &Self::Data) -> bool { - data.iter() - .enumerate() - .all(|(idx, info)| info.n_peers >= self.expected_peer_counts[idx]) + data.iter().all( + |status| match (status.expected_peers, status.result.as_ref()) { + (Some(expected), Ok(info)) => info.n_peers >= expected, + _ => false, + }, + ) } fn timeout_message(&self, data: Self::Data) -> String { - let summary = build_timeout_summary(self.labels, data, self.expected_peer_counts); + let summary = build_timeout_summary(&data); format!("timed out waiting for network readiness: {summary}") } } -async fn fetch_network_info(client: &Client, base: &Url) -> Libp2pInfo { +pub async fn try_fetch_network_info( + client: &Client, + base: &Url, +) -> Result { + let path = nomos_http_api_common::paths::NETWORK_INFO.trim_start_matches('/'); let url = base - .join(nomos_http_api_common::paths::NETWORK_INFO.trim_start_matches('/')) - .unwrap_or_else(|err| { - panic!( - "failed to join url {base} with path {}: {err}", - nomos_http_api_common::paths::NETWORK_INFO - ) - }); - let response = match client.get(url).send().await { - Ok(resp) => resp, - Err(err) => { - return log_network_warning(base, err, "failed to reach network info endpoint"); - } - }; + .join(path) + .map_err(|source| NetworkInfoError::JoinUrl { + base: base.clone(), + path: nomos_http_api_common::paths::NETWORK_INFO, + message: source.to_string(), + })?; - let response = match response.error_for_status() { - Ok(resp) => resp, - Err(err) => { - return log_network_warning(base, err, "network info endpoint returned error"); - } - }; + let response = client + .get(url) + .send() + .await + .map_err(NetworkInfoError::Request)? + .error_for_status() + .map_err(NetworkInfoError::Request)?; - match response.json::().await { + response + .json::() + .await + .map_err(NetworkInfoError::Request) +} + +#[deprecated(note = "use try_fetch_network_info to avoid panics and preserve error details")] +pub async fn fetch_network_info(client: &Client, base: &Url) -> Libp2pInfo { + match try_fetch_network_info(client, base).await { Ok(info) => info, - Err(err) => log_network_warning(base, err, "failed to decode network info response"), + Err(err) => log_network_warning(base, &err), } } -fn log_network_warning(base: &Url, err: impl std::fmt::Display, message: &str) -> Libp2pInfo { - warn!(target: "readiness", url = %base, error = %err, "{message}"); +fn log_network_warning(base: &Url, err: &NetworkInfoError) -> Libp2pInfo { + warn!( + target: "readiness", + url = %base, + error = %err, + "network readiness: failed to fetch network info" + ); empty_libp2p_info() } @@ -120,18 +214,23 @@ fn empty_libp2p_info() -> Libp2pInfo { } } -fn build_timeout_summary( - labels: &[String], - infos: Vec, - expected_counts: &[usize], -) -> String { - infos - .into_iter() - .zip(expected_counts.iter()) - .zip(labels.iter()) - .map(|((info, expected), label)| { - format!("{}: peers={}, expected={}", label, info.n_peers, expected) - }) +fn build_timeout_summary(statuses: &[NodeNetworkStatus]) -> String { + statuses + .iter() + .map( + |status| match (status.expected_peers, status.result.as_ref()) { + (None, _) => format!("{}: missing expected peer count", status.label), + (Some(expected), Ok(info)) => { + format!( + "{}: peers={}, expected={}", + status.label, info.n_peers, expected + ) + } + (Some(expected), Err(err)) => { + format!("{}: error={err}, expected_peers={expected}", status.label) + } + }, + ) .collect::>() .join(", ") }