Factor readiness checks into dedicated modules

This commit is contained in:
andrussal 2025-12-10 08:07:16 +01:00
parent ab026f67fa
commit 253b0e7a0e
5 changed files with 403 additions and 378 deletions

View File

@ -6,7 +6,6 @@ use crate::topology::{
Topology, TopologyConfig,
configs::{GeneralConfig, wallet::WalletAccount},
readiness::{HttpMembershipReadiness, HttpNetworkReadiness, ReadinessCheck, ReadinessError},
utils::multiaddr_port,
};
/// Node role within the generated topology.
@ -159,8 +158,10 @@ impl GeneratedTopology {
if endpoints.len() > 1 {
let listen_ports = self.listen_ports();
let initial_peer_ports = self.initial_peer_ports();
let expected_peer_counts =
find_expected_peer_counts(&listen_ports, &initial_peer_ports);
let expected_peer_counts = crate::topology::generation::find_expected_peer_counts(
&listen_ports,
&initial_peer_ports,
);
let network_check = HttpNetworkReadiness {
client: &client,
endpoints: &endpoints,
@ -234,7 +235,7 @@ impl GeneratedTopology {
.backend
.initial_peers
.iter()
.filter_map(multiaddr_port)
.filter_map(crate::topology::utils::multiaddr_port)
.collect::<HashSet<u16>>()
})
.chain(self.executors.iter().map(|node| {
@ -243,7 +244,7 @@ impl GeneratedTopology {
.backend
.initial_peers
.iter()
.filter_map(multiaddr_port)
.filter_map(crate::topology::utils::multiaddr_port)
.collect::<HashSet<u16>>()
}))
.collect()

View File

@ -0,0 +1,78 @@
use nomos_da_network_core::swarm::BalancerStats;
use super::ReadinessCheck;
use crate::topology::Topology;
/// Readiness check that passes once every node's DA balancer is connected to
/// at least its configured number of subnetworks.
pub struct DaBalancerReadiness<'a> {
    /// Topology whose validators and executors are queried.
    pub(crate) topology: &'a Topology,
    /// Display labels: validators first, executors after them.
    pub(crate) labels: &'a [String],
}

#[async_trait::async_trait]
impl<'a> ReadinessCheck<'a> for DaBalancerReadiness<'a> {
    /// One `(label, subnet_threshold, balancer stats)` tuple per node.
    type Data = Vec<(String, usize, BalancerStats)>;

    /// Gathers balancer stats from every validator, then every executor,
    /// awaiting the nodes one after another.
    ///
    /// # Panics
    ///
    /// Panics if `labels` holds fewer entries than there are nodes.
    async fn collect(&'a self) -> Self::Data {
        let validators = &self.topology.validators;
        let executors = &self.topology.executors;
        let mut entries = Vec::new();

        for (position, node) in validators.iter().enumerate() {
            let label = self.labels[position].clone();
            let required = node.config().da_network.subnet_threshold;
            let stats = node.balancer_stats().await;
            entries.push((label, required, stats));
        }

        // Executor labels are stored right after all validator labels.
        for (position, node) in executors.iter().enumerate() {
            let label = self.labels[validators.len() + position].clone();
            let required = node.config().da_network.subnet_threshold;
            let stats = node.balancer_stats().await;
            entries.push((label, required, stats));
        }

        entries
    }

    /// A node is ready when its threshold is zero or when it has at least
    /// `threshold` connected subnetworks; all nodes must be ready.
    fn is_ready(&self, data: &Self::Data) -> bool {
        data.iter().all(|(_, required, stats)| {
            *required == 0 || connected_subnetworks(stats) >= *required
        })
    }

    /// Describes, per node, the connected count, the required count and the
    /// raw balancer stats.
    fn timeout_message(&self, data: Self::Data) -> String {
        let mut parts = Vec::with_capacity(data.len());
        for (label, threshold, stats) in data {
            let connected = connected_subnetworks(&stats);
            let details = format_balancer_stats(&stats);
            parts.push(format!(
                "{label}: connected={connected}, required={threshold}, stats={details}"
            ));
        }
        let summary = parts.join(", ");
        format!("timed out waiting for DA balancer readiness: {summary}")
    }

    /// Uses a one-second interval between polls for this check.
    fn poll_interval(&self) -> std::time::Duration {
        std::time::Duration::from_secs(1)
    }
}
/// Counts the subnetworks that have at least one inbound or outbound
/// connection.
fn connected_subnetworks(stats: &BalancerStats) -> usize {
    let mut connected = 0;
    for stat in stats.values() {
        if stat.inbound > 0 || stat.outbound > 0 {
            connected += 1;
        }
    }
    connected
}
/// Renders balancer stats as `subnet:in=N,out=M` entries joined by `;`, or
/// the literal `empty` when there are no entries.
fn format_balancer_stats(stats: &BalancerStats) -> String {
    if stats.is_empty() {
        return String::from("empty");
    }

    let mut parts = Vec::new();
    for (subnet, stat) in stats.iter() {
        parts.push(format!("{}:in={},out={}", subnet, stat.inbound, stat.outbound));
    }
    parts.join(";")
}

View File

@ -0,0 +1,174 @@
use nomos_core::sdp::SessionNumber;
use nomos_da_network_service::MembershipResponse;
use reqwest::{Client, Url};
use super::ReadinessCheck;
use crate::topology::Topology;
pub struct MembershipReadiness<'a> {
pub(crate) topology: &'a Topology,
pub(crate) session: SessionNumber,
pub(crate) labels: &'a [String],
pub(crate) expect_non_empty: bool,
}
#[async_trait::async_trait]
impl<'a> ReadinessCheck<'a> for MembershipReadiness<'a> {
type Data = Vec<Result<MembershipResponse, reqwest::Error>>;
async fn collect(&'a self) -> Self::Data {
let (validator_responses, executor_responses) = tokio::join!(
futures::future::join_all(
self.topology
.validators
.iter()
.map(|node| node.da_get_membership(self.session)),
),
futures::future::join_all(
self.topology
.executors
.iter()
.map(|node| node.da_get_membership(self.session)),
)
);
validator_responses
.into_iter()
.chain(executor_responses)
.collect()
}
fn is_ready(&self, data: &Self::Data) -> bool {
self.assignation_statuses(data)
.into_iter()
.all(|ready| ready)
}
fn timeout_message(&self, data: Self::Data) -> String {
let statuses = self.assignation_statuses(&data);
let description = if self.expect_non_empty {
"non-empty assignations"
} else {
"empty assignations"
};
let summary = build_membership_summary(self.labels, &statuses, description);
format!("timed out waiting for DA membership readiness ({description}): {summary}")
}
}
impl MembershipReadiness<'_> {
    /// Per-node readiness flags for `responses`, judged against this check's
    /// `expect_non_empty` setting.
    ///
    /// Delegates to the module-level [`assignation_statuses`] so the topology
    /// and HTTP checks share a single definition of "ready".
    fn assignation_statuses(
        &self,
        responses: &[Result<MembershipResponse, reqwest::Error>],
    ) -> Vec<bool> {
        // The bare name resolves to the free function, not to this method.
        assignation_statuses(responses, self.expect_non_empty)
    }
}

/// Readiness check on the DA membership reported by a set of HTTP endpoints
/// for a given session.
pub struct HttpMembershipReadiness<'a> {
    /// HTTP client used for every request.
    pub(crate) client: &'a Client,
    /// Base URLs of the nodes to query.
    pub(crate) endpoints: &'a [Url],
    /// Session whose membership is requested.
    pub(crate) session: SessionNumber,
    /// Display labels, paired with the endpoints in order.
    pub(crate) labels: &'a [String],
    /// `true` to wait for non-empty assignations, `false` to wait for empty ones.
    pub(crate) expect_non_empty: bool,
}

#[async_trait::async_trait]
impl<'a> ReadinessCheck<'a> for HttpMembershipReadiness<'a> {
    /// One membership response (or request error) per endpoint.
    type Data = Vec<Result<MembershipResponse, reqwest::Error>>;

    /// Requests the membership from every endpoint concurrently.
    async fn collect(&'a self) -> Self::Data {
        let requests = self
            .endpoints
            .iter()
            .map(|endpoint| fetch_membership(self.client, endpoint, self.session));
        futures::future::join_all(requests).await
    }

    /// Ready when every endpoint answered with the expected assignations.
    fn is_ready(&self, data: &Self::Data) -> bool {
        assignation_statuses(data, self.expect_non_empty)
            .into_iter()
            .all(|ready| ready)
    }

    /// Lists every endpoint label with its ready/waiting status and the
    /// expectation.
    fn timeout_message(&self, data: Self::Data) -> String {
        let statuses = assignation_statuses(&data, self.expect_non_empty);
        let description = if self.expect_non_empty {
            "non-empty assignations"
        } else {
            "empty assignations"
        };
        let summary = build_membership_summary(self.labels, &statuses, description);
        format!("timed out waiting for DA membership readiness ({description}): {summary}")
    }
}

/// Posts `session` as JSON to the DA membership endpoint under `base` and
/// decodes the JSON reply.
///
/// # Errors
///
/// Returns the `reqwest` error if sending fails, the server answers with an
/// error status, or the body cannot be decoded.
///
/// # Panics
///
/// Panics if the membership path cannot be joined onto `base`.
pub async fn fetch_membership(
    client: &Client,
    base: &Url,
    session: SessionNumber,
) -> Result<MembershipResponse, reqwest::Error> {
    // Strip the leading slash so the path is joined relative to `base`.
    let url = base
        .join(nomos_http_api_common::paths::DA_GET_MEMBERSHIP.trim_start_matches('/'))
        .unwrap_or_else(|err| {
            panic!(
                "failed to join url {base} with path {}: {err}",
                nomos_http_api_common::paths::DA_GET_MEMBERSHIP
            )
        });

    client
        .post(url)
        .json(&session)
        .send()
        .await?
        .error_for_status()?
        .json()
        .await
}

/// Maps each response to a readiness flag.
///
/// A successful response is ready when the emptiness of its `assignations`
/// matches the expectation: non-empty if `expect_non_empty` is `true`, empty
/// otherwise. A failed request is never ready.
pub fn assignation_statuses(
    responses: &[Result<MembershipResponse, reqwest::Error>],
    expect_non_empty: bool,
) -> Vec<bool> {
    responses
        .iter()
        .map(|response| match response {
            Ok(membership) => membership.assignations.is_empty() != expect_non_empty,
            Err(_) => false,
        })
        .collect()
}
/// Builds a comma-separated `label: status=..., expected ...` summary.
///
/// Labels and statuses are paired in order; surplus entries on either side
/// are ignored.
pub fn build_membership_summary(labels: &[String], statuses: &[bool], description: &str) -> String {
    let mut entries = Vec::with_capacity(statuses.len().min(labels.len()));
    for (label, &ready) in labels.iter().zip(statuses) {
        let status = if ready { "ready" } else { "waiting" };
        entries.push(format!("{label}: status={status}, expected {description}"));
    }
    entries.join(", ")
}

View File

@ -1,17 +1,16 @@
pub mod balancer;
pub mod membership;
pub mod network;
use std::time::Duration;
use futures::future::join_all;
use nomos_core::sdp::SessionNumber;
use nomos_da_network_core::swarm::BalancerStats;
use nomos_da_network_service::MembershipResponse;
use nomos_http_api_common::paths;
use nomos_network::backends::libp2p::Libp2pInfo;
use reqwest::{Client, Url};
pub use balancer::DaBalancerReadiness;
pub use membership::{HttpMembershipReadiness, MembershipReadiness};
pub use network::{HttpNetworkReadiness, NetworkReadiness};
use thiserror::Error;
use tokio::time::{sleep, timeout};
use tracing::warn;
use crate::{adjust_timeout, topology::Topology};
use crate::adjust_timeout;
#[derive(Debug, Error)]
pub enum ReadinessError {
@ -59,367 +58,3 @@ pub trait ReadinessCheck<'a> {
Ok(())
}
}
pub struct NetworkReadiness<'a> {
pub(crate) topology: &'a Topology,
pub(crate) expected_peer_counts: &'a [usize],
pub(crate) labels: &'a [String],
}
#[async_trait::async_trait]
impl<'a> ReadinessCheck<'a> for NetworkReadiness<'a> {
type Data = Vec<Libp2pInfo>;
async fn collect(&'a self) -> Self::Data {
let (validator_infos, executor_infos) = tokio::join!(
join_all(
self.topology
.validators
.iter()
.map(crate::nodes::validator::Validator::network_info)
),
join_all(
self.topology
.executors
.iter()
.map(crate::nodes::executor::Executor::network_info)
)
);
validator_infos.into_iter().chain(executor_infos).collect()
}
fn is_ready(&self, data: &Self::Data) -> bool {
data.iter()
.enumerate()
.all(|(idx, info)| info.n_peers >= self.expected_peer_counts[idx])
}
fn timeout_message(&self, data: Self::Data) -> String {
let summary = build_timeout_summary(self.labels, data, self.expected_peer_counts);
format!("timed out waiting for network readiness: {summary}")
}
}
/// Readiness check on the peer count reported by a set of HTTP endpoints.
pub struct HttpNetworkReadiness<'a> {
    /// HTTP client used for every request.
    pub(crate) client: &'a Client,
    /// Base URLs of the nodes to query.
    pub(crate) endpoints: &'a [Url],
    /// Minimum peer count per endpoint, in endpoint order.
    pub(crate) expected_peer_counts: &'a [usize],
    /// Display labels, in endpoint order.
    pub(crate) labels: &'a [String],
}

#[async_trait::async_trait]
impl<'a> ReadinessCheck<'a> for HttpNetworkReadiness<'a> {
    /// One network info snapshot per endpoint.
    type Data = Vec<Libp2pInfo>;

    /// Fetches the network info of every endpoint concurrently.
    async fn collect(&'a self) -> Self::Data {
        let requests = self
            .endpoints
            .iter()
            .map(|endpoint| fetch_network_info(self.client, endpoint));
        join_all(requests).await
    }

    /// Ready when every endpoint reports at least its expected peer count.
    ///
    /// # Panics
    ///
    /// Panics if `expected_peer_counts` is shorter than `data`.
    fn is_ready(&self, data: &Self::Data) -> bool {
        for (idx, info) in data.iter().enumerate() {
            if info.n_peers < self.expected_peer_counts[idx] {
                return false;
            }
        }
        true
    }

    /// Lists every endpoint label with its current and expected peer counts.
    fn timeout_message(&self, data: Self::Data) -> String {
        let summary = build_timeout_summary(self.labels, data, self.expected_peer_counts);
        format!("timed out waiting for network readiness: {summary}")
    }
}
pub struct MembershipReadiness<'a> {
pub(crate) topology: &'a Topology,
pub(crate) session: SessionNumber,
pub(crate) labels: &'a [String],
pub(crate) expect_non_empty: bool,
}
#[async_trait::async_trait]
impl<'a> ReadinessCheck<'a> for MembershipReadiness<'a> {
type Data = Vec<Result<MembershipResponse, reqwest::Error>>;
async fn collect(&'a self) -> Self::Data {
let (validator_responses, executor_responses) = tokio::join!(
join_all(
self.topology
.validators
.iter()
.map(|node| node.da_get_membership(self.session)),
),
join_all(
self.topology
.executors
.iter()
.map(|node| node.da_get_membership(self.session)),
)
);
validator_responses
.into_iter()
.chain(executor_responses)
.collect()
}
fn is_ready(&self, data: &Self::Data) -> bool {
self.assignation_statuses(data)
.into_iter()
.all(|ready| ready)
}
fn timeout_message(&self, data: Self::Data) -> String {
let statuses = self.assignation_statuses(&data);
let description = if self.expect_non_empty {
"non-empty assignations"
} else {
"empty assignations"
};
let summary = build_membership_summary(self.labels, &statuses, description);
format!("timed out waiting for DA membership readiness ({description}): {summary}")
}
}
impl MembershipReadiness<'_> {
    /// Maps each response to a readiness flag.
    ///
    /// A successful response is ready when the emptiness of its
    /// `assignations` matches `self.expect_non_empty`; a failed request is
    /// never ready.
    fn assignation_statuses(
        &self,
        responses: &[Result<MembershipResponse, reqwest::Error>],
    ) -> Vec<bool> {
        let expect_non_empty = self.expect_non_empty;
        responses
            .iter()
            .map(|response| match response {
                Ok(membership) => membership.assignations.is_empty() != expect_non_empty,
                Err(_) => false,
            })
            .collect()
    }
}
/// Readiness check on the DA membership reported by a set of HTTP endpoints
/// for a given session.
pub struct HttpMembershipReadiness<'a> {
    /// HTTP client used for every request.
    pub(crate) client: &'a Client,
    /// Base URLs of the nodes to query.
    pub(crate) endpoints: &'a [Url],
    /// Session whose membership is requested.
    pub(crate) session: SessionNumber,
    /// Display labels, paired with the endpoints in order.
    pub(crate) labels: &'a [String],
    /// `true` to wait for non-empty assignations, `false` to wait for empty ones.
    pub(crate) expect_non_empty: bool,
}

#[async_trait::async_trait]
impl<'a> ReadinessCheck<'a> for HttpMembershipReadiness<'a> {
    /// One membership response (or request error) per endpoint.
    type Data = Vec<Result<MembershipResponse, reqwest::Error>>;

    /// Requests the membership from every endpoint concurrently.
    async fn collect(&'a self) -> Self::Data {
        let requests = self
            .endpoints
            .iter()
            .map(|endpoint| fetch_membership(self.client, endpoint, self.session));
        futures::future::join_all(requests).await
    }

    /// Ready when no endpoint is still waiting for the expected assignations.
    fn is_ready(&self, data: &Self::Data) -> bool {
        !assignation_statuses(data, self.expect_non_empty).contains(&false)
    }

    /// Lists every endpoint label with its ready/waiting status and the
    /// expectation.
    fn timeout_message(&self, data: Self::Data) -> String {
        let description = if self.expect_non_empty {
            "non-empty assignations"
        } else {
            "empty assignations"
        };
        let summary = build_membership_summary(
            self.labels,
            &assignation_statuses(&data, self.expect_non_empty),
            description,
        );
        format!("timed out waiting for DA membership readiness ({description}): {summary}")
    }
}
/// Readiness check that passes once every node's DA balancer is connected to
/// at least its configured number of subnetworks.
pub struct DaBalancerReadiness<'a> {
    /// Topology whose validators and executors are queried.
    pub(crate) topology: &'a Topology,
    /// Display labels: validators first, executors after them.
    pub(crate) labels: &'a [String],
}

#[async_trait::async_trait]
impl<'a> ReadinessCheck<'a> for DaBalancerReadiness<'a> {
    /// One `(label, subnet_threshold, balancer stats)` tuple per node.
    type Data = Vec<(String, usize, BalancerStats)>;

    /// Gathers balancer stats from every validator, then every executor,
    /// awaiting the nodes one after another.
    ///
    /// # Panics
    ///
    /// Panics if `labels` holds fewer entries than there are nodes.
    async fn collect(&'a self) -> Self::Data {
        let validators = &self.topology.validators;
        let executors = &self.topology.executors;
        let mut entries = Vec::new();

        for (position, node) in validators.iter().enumerate() {
            let label = self.labels[position].clone();
            let required = node.config().da_network.subnet_threshold;
            let stats = node.balancer_stats().await;
            entries.push((label, required, stats));
        }

        // Executor labels are stored right after all validator labels.
        for (position, node) in executors.iter().enumerate() {
            let label = self.labels[validators.len() + position].clone();
            let required = node.config().da_network.subnet_threshold;
            let stats = node.balancer_stats().await;
            entries.push((label, required, stats));
        }

        entries
    }

    /// A node is ready when its threshold is zero or when it has at least
    /// `threshold` connected subnetworks; all nodes must be ready.
    fn is_ready(&self, data: &Self::Data) -> bool {
        data.iter().all(|(_, required, stats)| {
            *required == 0 || connected_subnetworks(stats) >= *required
        })
    }

    /// Describes, per node, the connected count, the required count and the
    /// raw balancer stats.
    fn timeout_message(&self, data: Self::Data) -> String {
        let mut parts = Vec::with_capacity(data.len());
        for (label, threshold, stats) in data {
            let connected = connected_subnetworks(&stats);
            let details = format_balancer_stats(&stats);
            parts.push(format!(
                "{label}: connected={connected}, required={threshold}, stats={details}"
            ));
        }
        let summary = parts.join(", ");
        format!("timed out waiting for DA balancer readiness: {summary}")
    }

    /// Uses a one-second interval between polls for this check.
    fn poll_interval(&self) -> Duration {
        Duration::from_secs(1)
    }
}
/// Counts the subnetworks that have at least one inbound or outbound
/// connection.
fn connected_subnetworks(stats: &BalancerStats) -> usize {
    let mut connected = 0;
    for stat in stats.values() {
        if stat.inbound > 0 || stat.outbound > 0 {
            connected += 1;
        }
    }
    connected
}
/// Renders balancer stats as `subnet:in=N,out=M` entries joined by `;`, or
/// the literal `empty` when there are no entries.
fn format_balancer_stats(stats: &BalancerStats) -> String {
    if stats.is_empty() {
        return String::from("empty");
    }

    let mut parts = Vec::new();
    for (subnet, stat) in stats.iter() {
        parts.push(format!("{}:in={},out={}", subnet, stat.inbound, stat.outbound));
    }
    parts.join(";")
}
/// Builds a comma-separated `label: peers=N, expected=M` summary.
///
/// Infos, expected counts and labels are paired in order; the summary stops
/// at the shortest of the three.
fn build_timeout_summary(
    labels: &[String],
    infos: Vec<Libp2pInfo>,
    expected_counts: &[usize],
) -> String {
    let mut entries = Vec::new();
    for ((info, expected), label) in infos.into_iter().zip(expected_counts).zip(labels) {
        entries.push(format!(
            "{}: peers={}, expected={}",
            label, info.n_peers, expected
        ));
    }
    entries.join(", ")
}
/// Builds a comma-separated `label: status=..., expected ...` summary.
///
/// Labels and statuses are paired in order; surplus entries on either side
/// are ignored.
fn build_membership_summary(labels: &[String], statuses: &[bool], description: &str) -> String {
    let mut entries = Vec::with_capacity(statuses.len().min(labels.len()));
    for (label, &ready) in labels.iter().zip(statuses) {
        let status = if ready { "ready" } else { "waiting" };
        entries.push(format!("{label}: status={status}, expected {description}"));
    }
    entries.join(", ")
}
async fn fetch_network_info(client: &Client, base: &Url) -> Libp2pInfo {
let url = join_path(base, paths::NETWORK_INFO);
let response = match client.get(url).send().await {
Ok(resp) => resp,
Err(err) => {
return log_network_warning(base, err, "failed to reach network info endpoint");
}
};
let response = match response.error_for_status() {
Ok(resp) => resp,
Err(err) => {
return log_network_warning(base, err, "network info endpoint returned error");
}
};
match response.json::<Libp2pInfo>().await {
Ok(info) => info,
Err(err) => log_network_warning(base, err, "failed to decode network info response"),
}
}
/// Posts `session` as JSON to the DA membership endpoint under `base` and
/// decodes the JSON reply.
///
/// # Errors
///
/// Returns the `reqwest` error if sending fails, the server answers with an
/// error status, or the body cannot be decoded.
async fn fetch_membership(
    client: &Client,
    base: &Url,
    session: SessionNumber,
) -> Result<MembershipResponse, reqwest::Error> {
    let url = join_path(base, paths::DA_GET_MEMBERSHIP);
    let response = client.post(url).json(&session).send().await?;
    let response = response.error_for_status()?;
    response.json().await
}
/// Emits a `readiness` warning for the node at `base` and returns an
/// all-zero `Libp2pInfo` to stand in for the missing reply.
fn log_network_warning(base: &Url, err: impl std::fmt::Display, message: &str) -> Libp2pInfo {
    let fallback = empty_libp2p_info();
    warn!(target: "readiness", url = %base, error = %err, "{message}");
    fallback
}
/// Placeholder info with no listen addresses and every counter at zero.
fn empty_libp2p_info() -> Libp2pInfo {
    Libp2pInfo {
        n_peers: 0,
        n_connections: 0,
        n_pending_connections: 0,
        listen_addresses: Vec::new(),
    }
}
/// Joins `path`, minus any leading slashes, onto `base`.
///
/// # Panics
///
/// Panics if the URL cannot be joined.
fn join_path(base: &Url, path: &str) -> Url {
    match base.join(path.trim_start_matches('/')) {
        Ok(url) => url,
        Err(err) => panic!("failed to join url {base} with path {path}: {err}"),
    }
}
/// Maps each response to a readiness flag.
///
/// A successful response is ready when the emptiness of its `assignations`
/// matches the expectation: non-empty if `expect_non_empty` is `true`, empty
/// otherwise. A failed request is never ready.
fn assignation_statuses(
    responses: &[Result<MembershipResponse, reqwest::Error>],
    expect_non_empty: bool,
) -> Vec<bool> {
    responses
        .iter()
        .map(|response| match response {
            Ok(membership) => membership.assignations.is_empty() != expect_non_empty,
            Err(_) => false,
        })
        .collect()
}

View File

@ -0,0 +1,137 @@
use nomos_network::backends::libp2p::Libp2pInfo;
use reqwest::{Client, Url};
use tracing::warn;
use super::ReadinessCheck;
use crate::topology::Topology;
pub struct NetworkReadiness<'a> {
pub(crate) topology: &'a Topology,
pub(crate) expected_peer_counts: &'a [usize],
pub(crate) labels: &'a [String],
}
#[async_trait::async_trait]
impl<'a> ReadinessCheck<'a> for NetworkReadiness<'a> {
type Data = Vec<Libp2pInfo>;
async fn collect(&'a self) -> Self::Data {
let (validator_infos, executor_infos) = tokio::join!(
futures::future::join_all(
self.topology
.validators
.iter()
.map(crate::nodes::validator::Validator::network_info)
),
futures::future::join_all(
self.topology
.executors
.iter()
.map(crate::nodes::executor::Executor::network_info)
)
);
validator_infos.into_iter().chain(executor_infos).collect()
}
fn is_ready(&self, data: &Self::Data) -> bool {
data.iter()
.enumerate()
.all(|(idx, info)| info.n_peers >= self.expected_peer_counts[idx])
}
fn timeout_message(&self, data: Self::Data) -> String {
let summary = build_timeout_summary(self.labels, data, self.expected_peer_counts);
format!("timed out waiting for network readiness: {summary}")
}
}
/// Readiness check on the peer count reported by a set of HTTP endpoints.
pub struct HttpNetworkReadiness<'a> {
    /// HTTP client used for every request.
    pub(crate) client: &'a Client,
    /// Base URLs of the nodes to query.
    pub(crate) endpoints: &'a [Url],
    /// Minimum peer count per endpoint, in endpoint order.
    pub(crate) expected_peer_counts: &'a [usize],
    /// Display labels, in endpoint order.
    pub(crate) labels: &'a [String],
}

#[async_trait::async_trait]
impl<'a> ReadinessCheck<'a> for HttpNetworkReadiness<'a> {
    /// One network info snapshot per endpoint.
    type Data = Vec<Libp2pInfo>;

    /// Fetches the network info of every endpoint concurrently.
    async fn collect(&'a self) -> Self::Data {
        let requests = self
            .endpoints
            .iter()
            .map(|endpoint| fetch_network_info(self.client, endpoint));
        futures::future::join_all(requests).await
    }

    /// Ready when every endpoint reports at least its expected peer count.
    ///
    /// # Panics
    ///
    /// Panics if `expected_peer_counts` is shorter than `data`.
    fn is_ready(&self, data: &Self::Data) -> bool {
        for (idx, info) in data.iter().enumerate() {
            if info.n_peers < self.expected_peer_counts[idx] {
                return false;
            }
        }
        true
    }

    /// Lists every endpoint label with its current and expected peer counts.
    fn timeout_message(&self, data: Self::Data) -> String {
        let summary = build_timeout_summary(self.labels, data, self.expected_peer_counts);
        format!("timed out waiting for network readiness: {summary}")
    }
}
async fn fetch_network_info(client: &Client, base: &Url) -> Libp2pInfo {
let url = base
.join(nomos_http_api_common::paths::NETWORK_INFO.trim_start_matches('/'))
.unwrap_or_else(|err| {
panic!(
"failed to join url {base} with path {}: {err}",
nomos_http_api_common::paths::NETWORK_INFO
)
});
let response = match client.get(url).send().await {
Ok(resp) => resp,
Err(err) => {
return log_network_warning(base, err, "failed to reach network info endpoint");
}
};
let response = match response.error_for_status() {
Ok(resp) => resp,
Err(err) => {
return log_network_warning(base, err, "network info endpoint returned error");
}
};
match response.json::<Libp2pInfo>().await {
Ok(info) => info,
Err(err) => log_network_warning(base, err, "failed to decode network info response"),
}
}
/// Emits a `readiness` warning for the node at `base` and returns an
/// all-zero `Libp2pInfo` to stand in for the missing reply.
fn log_network_warning(base: &Url, err: impl std::fmt::Display, message: &str) -> Libp2pInfo {
    let fallback = empty_libp2p_info();
    warn!(target: "readiness", url = %base, error = %err, "{message}");
    fallback
}
/// Placeholder info with no listen addresses and every counter at zero.
fn empty_libp2p_info() -> Libp2pInfo {
    Libp2pInfo {
        n_peers: 0,
        n_connections: 0,
        n_pending_connections: 0,
        listen_addresses: Vec::new(),
    }
}
/// Builds a comma-separated `label: peers=N, expected=M` summary.
///
/// Infos, expected counts and labels are paired in order; the summary stops
/// at the shortest of the three.
fn build_timeout_summary(
    labels: &[String],
    infos: Vec<Libp2pInfo>,
    expected_counts: &[usize],
) -> String {
    let mut entries = Vec::new();
    for ((info, expected), label) in infos.into_iter().zip(expected_counts).zip(labels) {
        entries.push(format!(
            "{}: peers={}, expected={}",
            label, info.n_peers, expected
        ));
    }
    entries.join(", ")
}