core: make readiness collectors resilient to HTTP errors

This commit is contained in:
andrussal 2025-12-18 14:27:49 +01:00
parent bb0378792a
commit a582c00692
3 changed files with 390 additions and 173 deletions

View File

@ -5,6 +5,13 @@ use crate::topology::deployment::Topology;
const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
#[derive(Debug)]
pub struct NodeBalancerStatus {
label: String,
threshold: usize,
result: Result<BalancerStats, reqwest::Error>,
}
pub struct DaBalancerReadiness<'a> {
pub(crate) topology: &'a Topology,
pub(crate) labels: &'a [String],
@ -12,44 +19,76 @@ pub struct DaBalancerReadiness<'a> {
#[async_trait::async_trait]
impl<'a> ReadinessCheck<'a> for DaBalancerReadiness<'a> {
type Data = Vec<(String, usize, BalancerStats)>;
type Data = Vec<NodeBalancerStatus>;
async fn collect(&'a self) -> Self::Data {
let mut data = Vec::new();
for (idx, validator) in self.topology.validators.iter().enumerate() {
data.push((
self.labels[idx].clone(),
validator.config().da_network.subnet_threshold,
validator.api().balancer_stats().await.unwrap(),
));
let label = self
.labels
.get(idx)
.cloned()
.unwrap_or_else(|| format!("validator#{idx}"));
data.push(
(
label,
validator.config().da_network.subnet_threshold,
validator.api().balancer_stats().await,
)
.into(),
);
}
for (offset, executor) in self.topology.executors.iter().enumerate() {
let label_index = self.topology.validators.len() + offset;
data.push((
self.labels[label_index].clone(),
executor.config().da_network.subnet_threshold,
executor.api().balancer_stats().await.unwrap(),
));
let label = self
.labels
.get(label_index)
.cloned()
.unwrap_or_else(|| format!("executor#{offset}"));
data.push(
(
label,
executor.config().da_network.subnet_threshold,
executor.api().balancer_stats().await,
)
.into(),
);
}
data
}
fn is_ready(&self, data: &Self::Data) -> bool {
data.iter().all(|(_, threshold, stats)| {
if *threshold == 0 {
data.iter().all(|entry| {
if entry.threshold == 0 {
return true;
}
connected_subnetworks(stats) >= *threshold
entry
.result
.as_ref()
.is_ok_and(|stats| connected_subnetworks(stats) >= entry.threshold)
})
}
fn timeout_message(&self, data: Self::Data) -> String {
let summary = data
.into_iter()
.map(|(label, threshold, stats)| {
let connected = connected_subnetworks(&stats);
let details = format_balancer_stats(&stats);
format!("{label}: connected={connected}, required={threshold}, stats={details}")
.map(|entry| {
let (connected, details, error) = match entry.result {
Ok(stats) => (
connected_subnetworks(&stats),
format_balancer_stats(&stats),
None,
),
Err(err) => (0, "unavailable".to_string(), Some(err.to_string())),
};
let mut msg = format!(
"{}: connected={connected}, required={}, stats={details}",
entry.label, entry.threshold
);
if let Some(error) = error {
msg.push_str(&format!(", error={error}"));
}
msg
})
.collect::<Vec<_>>()
.join(", ");
@ -78,3 +117,14 @@ fn format_balancer_stats(stats: &BalancerStats) -> String {
.collect::<Vec<_>>()
.join(";")
}
impl From<(String, usize, Result<BalancerStats, reqwest::Error>)> for NodeBalancerStatus {
fn from(value: (String, usize, Result<BalancerStats, reqwest::Error>)) -> Self {
let (label, threshold, result) = value;
Self {
label,
threshold,
result,
}
}
}

View File

@ -1,10 +1,29 @@
use nomos_core::sdp::SessionNumber;
use nomos_da_network_service::MembershipResponse;
use reqwest::{Client, Url};
use thiserror::Error;
use super::ReadinessCheck;
use crate::topology::deployment::Topology;
#[derive(Debug, Error)]
pub enum MembershipError {
#[error("failed to join url {base} with path {path}: {message}")]
JoinUrl {
base: Url,
path: &'static str,
message: String,
},
#[error(transparent)]
Request(#[from] reqwest::Error),
}
#[derive(Debug)]
pub struct NodeMembershipStatus {
label: String,
result: Result<MembershipResponse, MembershipError>,
}
pub struct MembershipReadiness<'a> {
pub(crate) topology: &'a Topology,
pub(crate) session: SessionNumber,
@ -14,71 +33,78 @@ pub struct MembershipReadiness<'a> {
#[async_trait::async_trait]
impl<'a> ReadinessCheck<'a> for MembershipReadiness<'a> {
type Data = Vec<Result<MembershipResponse, reqwest::Error>>;
type Data = Vec<NodeMembershipStatus>;
async fn collect(&'a self) -> Self::Data {
let (validator_responses, executor_responses) = tokio::join!(
futures::future::join_all(
self.topology
.validators
.iter()
.map(|node| node.api().da_get_membership(&self.session)),
),
futures::future::join_all(
self.topology
.executors
.iter()
.map(|node| node.api().da_get_membership(&self.session)),
)
);
let validator_futures = self
.topology
.validators
.iter()
.enumerate()
.map(|(idx, node)| {
let label = self
.labels
.get(idx)
.cloned()
.unwrap_or_else(|| format!("validator#{idx}"));
async move {
let result = node
.api()
.da_get_membership(&self.session)
.await
.map_err(MembershipError::from);
NodeMembershipStatus { label, result }
}
});
let offset = self.topology.validators.len();
let executor_futures = self
.topology
.executors
.iter()
.enumerate()
.map(|(idx, node)| {
let global_idx = offset + idx;
let label = self
.labels
.get(global_idx)
.cloned()
.unwrap_or_else(|| format!("executor#{idx}"));
async move {
let result = node
.api()
.da_get_membership(&self.session)
.await
.map_err(MembershipError::from);
NodeMembershipStatus { label, result }
}
});
validator_responses
let (validator_statuses, executor_statuses) = tokio::join!(
futures::future::join_all(validator_futures),
futures::future::join_all(executor_futures)
);
validator_statuses
.into_iter()
.chain(executor_responses)
.chain(executor_statuses)
.collect()
}
fn is_ready(&self, data: &Self::Data) -> bool {
self.assignation_statuses(data)
.into_iter()
.all(|ready| ready)
data.iter()
.all(|entry| is_membership_ready(entry.result.as_ref(), self.expect_non_empty))
}
fn timeout_message(&self, data: Self::Data) -> String {
let statuses = self.assignation_statuses(&data);
let description = if self.expect_non_empty {
"non-empty assignations"
} else {
"empty assignations"
};
let summary = build_membership_summary(self.labels, &statuses, description);
let summary = build_membership_status_summary(&data, description, self.expect_non_empty);
format!("timed out waiting for DA membership readiness ({description}): {summary}")
}
}
impl MembershipReadiness<'_> {
fn assignation_statuses(
&self,
responses: &[Result<MembershipResponse, reqwest::Error>],
) -> Vec<bool> {
responses
.iter()
.map(|res| {
res.as_ref()
.map(|resp| {
let is_non_empty = !resp.assignations.is_empty();
if self.expect_non_empty {
is_non_empty
} else {
!is_non_empty
}
})
.unwrap_or(false)
})
.collect()
}
}
pub struct HttpMembershipReadiness<'a> {
pub(crate) client: &'a Client,
pub(crate) endpoints: &'a [Url],
@ -89,78 +115,120 @@ pub struct HttpMembershipReadiness<'a> {
#[async_trait::async_trait]
impl<'a> ReadinessCheck<'a> for HttpMembershipReadiness<'a> {
type Data = Vec<Result<MembershipResponse, reqwest::Error>>;
type Data = Vec<NodeMembershipStatus>;
async fn collect(&'a self) -> Self::Data {
let futures = self
.endpoints
.iter()
.map(|endpoint| fetch_membership(self.client, endpoint, self.session));
let futures = self.endpoints.iter().enumerate().map(|(idx, endpoint)| {
let label = self
.labels
.get(idx)
.cloned()
.unwrap_or_else(|| format!("endpoint#{idx}"));
async move {
let result = try_fetch_membership(self.client, endpoint, self.session).await;
NodeMembershipStatus { label, result }
}
});
futures::future::join_all(futures).await
}
fn is_ready(&self, data: &Self::Data) -> bool {
assignation_statuses(data, self.expect_non_empty)
.into_iter()
.all(|ready| ready)
data.iter()
.all(|entry| is_membership_ready(entry.result.as_ref(), self.expect_non_empty))
}
fn timeout_message(&self, data: Self::Data) -> String {
let statuses = assignation_statuses(&data, self.expect_non_empty);
let description = if self.expect_non_empty {
"non-empty assignations"
} else {
"empty assignations"
};
let summary = build_membership_summary(self.labels, &statuses, description);
let summary = build_membership_status_summary(&data, description, self.expect_non_empty);
format!("timed out waiting for DA membership readiness ({description}): {summary}")
}
}
pub async fn try_fetch_membership(
client: &Client,
base: &Url,
session: SessionNumber,
) -> Result<MembershipResponse, MembershipError> {
let path = nomos_http_api_common::paths::DA_GET_MEMBERSHIP.trim_start_matches('/');
let url = base.join(path).map_err(|source| MembershipError::JoinUrl {
base: base.clone(),
path: nomos_http_api_common::paths::DA_GET_MEMBERSHIP,
message: source.to_string(),
})?;
client
.post(url)
.json(&session)
.send()
.await
.map_err(MembershipError::Request)?
.error_for_status()
.map_err(MembershipError::Request)?
.json()
.await
.map_err(MembershipError::Request)
}
#[deprecated(note = "use try_fetch_membership to avoid panics and preserve error details")]
pub async fn fetch_membership(
client: &Client,
base: &Url,
session: SessionNumber,
) -> Result<MembershipResponse, reqwest::Error> {
let url = base
.join(nomos_http_api_common::paths::DA_GET_MEMBERSHIP.trim_start_matches('/'))
.unwrap_or_else(|err| {
panic!(
"failed to join url {base} with path {}: {err}",
nomos_http_api_common::paths::DA_GET_MEMBERSHIP
)
});
client
.post(url)
.json(&session)
.send()
.await?
.error_for_status()?
.json()
try_fetch_membership(client, base, session)
.await
}
pub fn assignation_statuses(
responses: &[Result<MembershipResponse, reqwest::Error>],
expect_non_empty: bool,
) -> Vec<bool> {
responses
.iter()
.map(|res| {
res.as_ref()
.map(|resp| {
let is_non_empty = !resp.assignations.is_empty();
if expect_non_empty {
is_non_empty
} else {
!is_non_empty
}
})
.unwrap_or(false)
.map_err(|err| match err {
MembershipError::Request(source) => source,
MembershipError::JoinUrl {
base,
path,
message,
} => {
panic!("failed to join url {base} with path {path}: {message}")
}
})
.collect()
}
fn is_membership_ready(
result: Result<&MembershipResponse, &MembershipError>,
expect_non_empty: bool,
) -> bool {
match result {
Ok(resp) => {
let is_non_empty = !resp.assignations.is_empty();
if expect_non_empty {
is_non_empty
} else {
!is_non_empty
}
}
Err(_) => false,
}
}
fn build_membership_status_summary(
statuses: &[NodeMembershipStatus],
description: &str,
expect_non_empty: bool,
) -> String {
statuses
.iter()
.map(|entry| match entry.result.as_ref() {
Ok(resp) => {
let ready = is_membership_ready(Ok(resp), expect_non_empty);
let status = if ready { "ready" } else { "waiting" };
format!("{}: status={status}, expected {description}", entry.label)
}
Err(err) => format!("{}: error={err}, expected {description}", entry.label),
})
.collect::<Vec<_>>()
.join(", ")
}
#[deprecated(note = "use ReadinessCheck timeout_message for richer per-node errors")]
pub fn build_membership_summary(labels: &[String], statuses: &[bool], description: &str) -> String {
statuses
.iter()

View File

@ -1,10 +1,30 @@
use nomos_network::backends::libp2p::Libp2pInfo;
use reqwest::{Client, Url};
use thiserror::Error;
use tracing::warn;
use super::ReadinessCheck;
use crate::topology::deployment::Topology;
#[derive(Debug, Error)]
pub enum NetworkInfoError {
#[error("failed to join url {base} with path {path}: {message}")]
JoinUrl {
base: Url,
path: &'static str,
message: String,
},
#[error(transparent)]
Request(#[from] reqwest::Error),
}
#[derive(Debug)]
pub struct NodeNetworkStatus {
label: String,
expected_peers: Option<usize>,
result: Result<Libp2pInfo, NetworkInfoError>,
}
pub struct NetworkReadiness<'a> {
pub(crate) topology: &'a Topology,
pub(crate) expected_peer_counts: &'a [usize],
@ -13,35 +33,83 @@ pub struct NetworkReadiness<'a> {
#[async_trait::async_trait]
impl<'a> ReadinessCheck<'a> for NetworkReadiness<'a> {
type Data = Vec<Libp2pInfo>;
type Data = Vec<NodeNetworkStatus>;
async fn collect(&'a self) -> Self::Data {
let (validator_infos, executor_infos) = tokio::join!(
futures::future::join_all(
self.topology
.validators
.iter()
.map(|node| async { node.api().network_info().await.unwrap() })
),
futures::future::join_all(
self.topology
.executors
.iter()
.map(|node| async { node.api().network_info().await.unwrap() })
)
);
let validator_futures = self
.topology
.validators
.iter()
.enumerate()
.map(|(idx, node)| {
let label = self
.labels
.get(idx)
.cloned()
.unwrap_or_else(|| format!("validator#{idx}"));
let expected_peers = self.expected_peer_counts.get(idx).copied();
async move {
let result = node
.api()
.network_info()
.await
.map_err(NetworkInfoError::from);
NodeNetworkStatus {
label,
expected_peers,
result,
}
}
});
let offset = self.topology.validators.len();
let executor_futures = self
.topology
.executors
.iter()
.enumerate()
.map(|(idx, node)| {
let global_idx = offset + idx;
let label = self
.labels
.get(global_idx)
.cloned()
.unwrap_or_else(|| format!("executor#{idx}"));
let expected_peers = self.expected_peer_counts.get(global_idx).copied();
async move {
let result = node
.api()
.network_info()
.await
.map_err(NetworkInfoError::from);
NodeNetworkStatus {
label,
expected_peers,
result,
}
}
});
validator_infos.into_iter().chain(executor_infos).collect()
let (validator_statuses, executor_statuses) = tokio::join!(
futures::future::join_all(validator_futures),
futures::future::join_all(executor_futures)
);
validator_statuses
.into_iter()
.chain(executor_statuses)
.collect()
}
fn is_ready(&self, data: &Self::Data) -> bool {
data.iter()
.enumerate()
.all(|(idx, info)| info.n_peers >= self.expected_peer_counts[idx])
data.iter().all(
|status| match (status.expected_peers, status.result.as_ref()) {
(Some(expected), Ok(info)) => info.n_peers >= expected,
_ => false,
},
)
}
fn timeout_message(&self, data: Self::Data) -> String {
let summary = build_timeout_summary(self.labels, data, self.expected_peer_counts);
let summary = build_timeout_summary(&data);
format!("timed out waiting for network readiness: {summary}")
}
}
@ -55,59 +123,85 @@ pub struct HttpNetworkReadiness<'a> {
#[async_trait::async_trait]
impl<'a> ReadinessCheck<'a> for HttpNetworkReadiness<'a> {
type Data = Vec<Libp2pInfo>;
type Data = Vec<NodeNetworkStatus>;
async fn collect(&'a self) -> Self::Data {
let futures = self
.endpoints
.iter()
.map(|endpoint| fetch_network_info(self.client, endpoint));
let futures = self.endpoints.iter().enumerate().map(|(idx, endpoint)| {
let label = self
.labels
.get(idx)
.cloned()
.unwrap_or_else(|| format!("endpoint#{idx}"));
let expected_peers = self.expected_peer_counts.get(idx).copied();
async move {
let result = try_fetch_network_info(self.client, endpoint).await;
NodeNetworkStatus {
label,
expected_peers,
result,
}
}
});
futures::future::join_all(futures).await
}
fn is_ready(&self, data: &Self::Data) -> bool {
data.iter()
.enumerate()
.all(|(idx, info)| info.n_peers >= self.expected_peer_counts[idx])
data.iter().all(
|status| match (status.expected_peers, status.result.as_ref()) {
(Some(expected), Ok(info)) => info.n_peers >= expected,
_ => false,
},
)
}
fn timeout_message(&self, data: Self::Data) -> String {
let summary = build_timeout_summary(self.labels, data, self.expected_peer_counts);
let summary = build_timeout_summary(&data);
format!("timed out waiting for network readiness: {summary}")
}
}
async fn fetch_network_info(client: &Client, base: &Url) -> Libp2pInfo {
pub async fn try_fetch_network_info(
client: &Client,
base: &Url,
) -> Result<Libp2pInfo, NetworkInfoError> {
let path = nomos_http_api_common::paths::NETWORK_INFO.trim_start_matches('/');
let url = base
.join(nomos_http_api_common::paths::NETWORK_INFO.trim_start_matches('/'))
.unwrap_or_else(|err| {
panic!(
"failed to join url {base} with path {}: {err}",
nomos_http_api_common::paths::NETWORK_INFO
)
});
let response = match client.get(url).send().await {
Ok(resp) => resp,
Err(err) => {
return log_network_warning(base, err, "failed to reach network info endpoint");
}
};
.join(path)
.map_err(|source| NetworkInfoError::JoinUrl {
base: base.clone(),
path: nomos_http_api_common::paths::NETWORK_INFO,
message: source.to_string(),
})?;
let response = match response.error_for_status() {
Ok(resp) => resp,
Err(err) => {
return log_network_warning(base, err, "network info endpoint returned error");
}
};
let response = client
.get(url)
.send()
.await
.map_err(NetworkInfoError::Request)?
.error_for_status()
.map_err(NetworkInfoError::Request)?;
match response.json::<Libp2pInfo>().await {
response
.json::<Libp2pInfo>()
.await
.map_err(NetworkInfoError::Request)
}
#[deprecated(note = "use try_fetch_network_info to avoid panics and preserve error details")]
pub async fn fetch_network_info(client: &Client, base: &Url) -> Libp2pInfo {
match try_fetch_network_info(client, base).await {
Ok(info) => info,
Err(err) => log_network_warning(base, err, "failed to decode network info response"),
Err(err) => log_network_warning(base, &err),
}
}
fn log_network_warning(base: &Url, err: impl std::fmt::Display, message: &str) -> Libp2pInfo {
warn!(target: "readiness", url = %base, error = %err, "{message}");
fn log_network_warning(base: &Url, err: &NetworkInfoError) -> Libp2pInfo {
warn!(
target: "readiness",
url = %base,
error = %err,
"network readiness: failed to fetch network info"
);
empty_libp2p_info()
}
@ -120,18 +214,23 @@ fn empty_libp2p_info() -> Libp2pInfo {
}
}
fn build_timeout_summary(
labels: &[String],
infos: Vec<Libp2pInfo>,
expected_counts: &[usize],
) -> String {
infos
.into_iter()
.zip(expected_counts.iter())
.zip(labels.iter())
.map(|((info, expected), label)| {
format!("{}: peers={}, expected={}", label, info.n_peers, expected)
})
fn build_timeout_summary(statuses: &[NodeNetworkStatus]) -> String {
statuses
.iter()
.map(
|status| match (status.expected_peers, status.result.as_ref()) {
(None, _) => format!("{}: missing expected peer count", status.label),
(Some(expected), Ok(info)) => {
format!(
"{}: peers={}, expected={}",
status.label, info.n_peers, expected
)
}
(Some(expected), Err(err)) => {
format!("{}: error={err}, expected_peers={expected}", status.label)
}
},
)
.collect::<Vec<_>>()
.join(", ")
}