Add consensus header range API and adaptive liveness lag

This commit is contained in:
andrussal 2025-12-10 06:35:11 +01:00
parent 32bfe6cef2
commit 2fa5fe7973
2 changed files with 148 additions and 36 deletions

View File

@ -2,13 +2,14 @@ use std::net::SocketAddr;
use chain_service::CryptarchiaInfo; use chain_service::CryptarchiaInfo;
use common_http_client::CommonHttpClient; use common_http_client::CommonHttpClient;
use hex;
use nomos_core::{block::Block, da::BlobId, mantle::SignedMantleTx, sdp::SessionNumber}; use nomos_core::{block::Block, da::BlobId, mantle::SignedMantleTx, sdp::SessionNumber};
use nomos_da_network_core::swarm::{BalancerStats, MonitorStats}; use nomos_da_network_core::swarm::{BalancerStats, MonitorStats};
use nomos_da_network_service::MembershipResponse; use nomos_da_network_service::MembershipResponse;
use nomos_http_api_common::paths::{ use nomos_http_api_common::paths::{
CRYPTARCHIA_INFO, DA_BALANCER_STATS, DA_BLACKLISTED_PEERS, DA_BLOCK_PEER, DA_GET_MEMBERSHIP, CRYPTARCHIA_HEADERS, CRYPTARCHIA_INFO, DA_BALANCER_STATS, DA_BLACKLISTED_PEERS, DA_BLOCK_PEER,
DA_HISTORIC_SAMPLING, DA_MONITOR_STATS, DA_UNBLOCK_PEER, MEMPOOL_ADD_TX, NETWORK_INFO, DA_GET_MEMBERSHIP, DA_HISTORIC_SAMPLING, DA_MONITOR_STATS, DA_UNBLOCK_PEER, MEMPOOL_ADD_TX,
STORAGE_BLOCK, NETWORK_INFO, STORAGE_BLOCK,
}; };
use nomos_network::backends::libp2p::Libp2pInfo; use nomos_network::backends::libp2p::Libp2pInfo;
use nomos_node::{HeaderId, api::testing::handlers::HistoricSamplingRequest}; use nomos_node::{HeaderId, api::testing::handlers::HistoricSamplingRequest};
@ -226,6 +227,35 @@ impl ApiClient {
self.post_json_decode(STORAGE_BLOCK, id).await self.post_json_decode(STORAGE_BLOCK, id).await
} }
/// Fetch header ids between optional bounds.
/// When `from` is None, defaults to tip; when `to` is None, defaults to
/// LIB.
pub async fn consensus_headers(
    &self,
    from: Option<HeaderId>,
    to: Option<HeaderId>,
) -> reqwest::Result<Vec<HeaderId>> {
    let mut url = self.join_base(CRYPTARCHIA_HEADERS);
    // Each provided bound is hex-encoded into a query parameter; an
    // omitted bound is simply not sent, letting the server apply its
    // default (tip for `from`, LIB for `to`).
    let bounds = [("from", from), ("to", to)];
    {
        let mut pairs = url.query_pairs_mut();
        for (key, bound) in bounds {
            if let Some(id) = bound {
                let raw: [u8; 32] = id.into();
                pairs.append_pair(key, &hex::encode(raw));
            }
        }
    }
    let response = self.client.get(url).send().await?;
    response.error_for_status()?.json().await
}
/// Query DA membership via testing API. /// Query DA membership via testing API.
pub async fn da_get_membership( pub async fn da_get_membership(
&self, &self,

View File

@ -1,7 +1,11 @@
use std::time::Duration; use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
use testing_framework_core::scenario::{DynError, Expectation, RunContext}; use nomos_core::header::HeaderId;
use testing_framework_core::{
nodes::ApiClient,
scenario::{DynError, Expectation, RunContext},
};
use thiserror::Error; use thiserror::Error;
use tokio::time::sleep; use tokio::time::sleep;
@ -24,6 +28,7 @@ const LAG_ALLOWANCE: u64 = 2;
const MIN_PROGRESS_BLOCKS: u64 = 5; const MIN_PROGRESS_BLOCKS: u64 = 5;
const REQUEST_RETRIES: usize = 5; const REQUEST_RETRIES: usize = 5;
const REQUEST_RETRY_DELAY: Duration = Duration::from_secs(2); const REQUEST_RETRY_DELAY: Duration = Duration::from_secs(2);
const MAX_LAG_ALLOWANCE: u64 = 5;
#[async_trait] #[async_trait]
impl Expectation for ConsensusLiveness { impl Expectation for ConsensusLiveness {
@ -34,7 +39,9 @@ impl Expectation for ConsensusLiveness {
async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> { async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> {
Self::ensure_participants(ctx)?; Self::ensure_participants(ctx)?;
let target_hint = Self::target_blocks(ctx); let target_hint = Self::target_blocks(ctx);
let check = Self::collect_results(ctx).await; let mut check = Self::collect_results(ctx).await;
let clients: Vec<_> = ctx.node_clients().all_clients().collect();
self.anchor_check(target_hint, &mut check, &clients).await;
(*self).report(target_hint, check) (*self).report(target_hint, check)
} }
} }
@ -51,6 +58,8 @@ enum ConsensusLivenessIssue {
height: u64, height: u64,
target: u64, target: u64,
}, },
#[error("{node} could not traverse to anchor tip {anchor_tip:?}")]
MissingAnchorPath { node: String, anchor_tip: HeaderId },
#[error("{node} consensus_info failed: {source}")] #[error("{node} consensus_info failed: {source}")]
RequestFailed { RequestFailed {
node: String, node: String,
@ -92,48 +101,41 @@ impl ConsensusLiveness {
} }
async fn collect_results(ctx: &RunContext) -> LivenessCheck { async fn collect_results(ctx: &RunContext) -> LivenessCheck {
let participant_count = ctx.node_clients().all_clients().count().max(1); let clients: Vec<_> = ctx.node_clients().all_clients().collect();
let max_attempts = participant_count * REQUEST_RETRIES; let mut samples = Vec::with_capacity(clients.len());
let mut samples = Vec::with_capacity(participant_count);
let mut issues = Vec::new(); let mut issues = Vec::new();
for attempt in 0..max_attempts { for (idx, client) in clients.iter().enumerate() {
match Self::fetch_cluster_height(ctx).await { for attempt in 0..REQUEST_RETRIES {
Ok(height) => { match Self::fetch_cluster_info(client).await {
samples.push(NodeSample { Ok((height, tip)) => {
label: format!("sample-{attempt}"), samples.push(NodeSample {
height, label: format!("node-{idx}"),
}); height,
if samples.len() >= participant_count { tip,
});
break; break;
} }
Err(err) if attempt + 1 == REQUEST_RETRIES => {
issues.push(ConsensusLivenessIssue::RequestFailed {
node: format!("node-{idx}"),
source: err,
});
}
Err(_) => sleep(REQUEST_RETRY_DELAY).await,
} }
Err(err) => issues.push(ConsensusLivenessIssue::RequestFailed {
node: format!("sample-{attempt}"),
source: err,
}),
}
if samples.len() < participant_count {
sleep(REQUEST_RETRY_DELAY).await;
} }
} }
LivenessCheck { samples, issues } LivenessCheck { samples, issues }
} }
async fn fetch_cluster_height(ctx: &RunContext) -> Result<u64, DynError> { async fn fetch_cluster_info(client: &ApiClient) -> Result<(u64, HeaderId), DynError> {
ctx.cluster_client() client
.try_all_clients(|client| { .consensus_info()
Box::pin(async move {
client
.consensus_info()
.await
.map(|info| info.height)
.map_err(|err| -> DynError { err.into() })
})
})
.await .await
.map(|info| (info.height, info.tip))
.map_err(|err| -> DynError { err.into() })
} }
#[must_use] #[must_use]
@ -143,6 +145,83 @@ impl ConsensusLiveness {
self self
} }
/// Scale the lag allowance with the target height (10% of `target`),
/// bounded below by the configured `lag_allowance` and above by
/// `MAX_LAG_ALLOWANCE`.
///
/// `u64::clamp(min, max)` panics when `min > max`, so the upper bound
/// is widened to honor a configured `lag_allowance` larger than
/// `MAX_LAG_ALLOWANCE` instead of panicking.
fn effective_lag_allowance(&self, target: u64) -> u64 {
    let upper = MAX_LAG_ALLOWANCE.max(self.lag_allowance);
    (target / 10).clamp(self.lag_allowance, upper)
}
/// Verify that every up-to-date node can traverse a header path from
/// its own tip back to the tip of the slowest node (the "anchor").
/// Nodes that fail to produce such a path get a `MissingAnchorPath`
/// issue appended to `check.issues`.
///
/// `clients` must be in the same order the samples were collected in
/// (`collect_results` labels sample `i` from client `i`).
async fn anchor_check(
    &self,
    target_hint: u64,
    check: &mut LivenessCheck,
    clients: &[&ApiClient],
) {
    // The anchor is the tip of the lowest node; with no samples there
    // is nothing to verify.
    let Some(anchor_tip) = check
        .samples
        .iter()
        .min_by_key(|sample| sample.height)
        .map(|sample| sample.tip)
    else {
        return;
    };
    let max_height = check
        .samples
        .iter()
        .map(|sample| sample.height)
        .max()
        .unwrap_or(0);
    // Mirror the target selection in `report`: a zero or overshooting
    // hint falls back to the highest observed height.
    let mut target = target_hint;
    if target == 0 || target > max_height {
        target = max_height;
    }
    let lag_allowance = self.effective_lag_allowance(target);
    let mut new_issues = Vec::new();
    // `zip` pairs each sample with its client without the panic path of
    // `clients.get(idx).unwrap()`.
    for (sample, client) in check.samples.iter().zip(clients) {
        // Lagging nodes are skipped here; `report` already flags them
        // as HeightBelowTarget.
        if sample.height + lag_allowance < target {
            continue;
        }
        match client
            .consensus_headers(Some(sample.tip), Some(anchor_tip))
            .await
        {
            // `first() == Some(&anchor_tip)` implies non-empty, so no
            // separate is_empty() guard is needed.
            // NOTE(review): assumes the API returns the range ordered
            // anchor-first — confirm against the server handler.
            Ok(headers) if headers.first() == Some(&anchor_tip) => {
                tracing::debug!(
                    node = %sample.label,
                    count = headers.len(),
                    "anchor check headers fetched"
                );
            }
            Ok(headers) => {
                // Covers both an empty response and a non-empty path
                // that never reaches the anchor tip.
                tracing::debug!(
                    node = %sample.label,
                    count = headers.len(),
                    "anchor check did not reach anchor tip"
                );
                new_issues.push(ConsensusLivenessIssue::MissingAnchorPath {
                    node: sample.label.clone(),
                    anchor_tip,
                });
            }
            Err(err) => {
                tracing::debug!(
                    node = %sample.label,
                    "anchor check failed to fetch headers: {err}"
                );
                new_issues.push(ConsensusLivenessIssue::MissingAnchorPath {
                    node: sample.label.clone(),
                    anchor_tip,
                });
            }
        }
    }
    check.issues.extend(new_issues);
}
fn report(self, target_hint: u64, mut check: LivenessCheck) -> Result<(), DynError> { fn report(self, target_hint: u64, mut check: LivenessCheck) -> Result<(), DynError> {
if check.samples.is_empty() { if check.samples.is_empty() {
return Err(Box::new(ConsensusLivenessError::MissingParticipants)); return Err(Box::new(ConsensusLivenessError::MissingParticipants));
@ -159,6 +238,7 @@ impl ConsensusLiveness {
if target == 0 || target > max_height { if target == 0 || target > max_height {
target = max_height; target = max_height;
} }
let lag_allowance = self.effective_lag_allowance(target);
if max_height < MIN_PROGRESS_BLOCKS { if max_height < MIN_PROGRESS_BLOCKS {
check check
@ -171,7 +251,7 @@ impl ConsensusLiveness {
} }
for sample in &check.samples { for sample in &check.samples {
if sample.height + self.lag_allowance < target { if sample.height + lag_allowance < target {
check check
.issues .issues
.push(ConsensusLivenessIssue::HeightBelowTarget { .push(ConsensusLivenessIssue::HeightBelowTarget {
@ -186,6 +266,7 @@ impl ConsensusLiveness {
tracing::info!( tracing::info!(
target, target,
heights = ?check.samples.iter().map(|s| s.height).collect::<Vec<_>>(), heights = ?check.samples.iter().map(|s| s.height).collect::<Vec<_>>(),
tips = ?check.samples.iter().map(|s| s.tip).collect::<Vec<_>>(),
"consensus liveness expectation satisfied" "consensus liveness expectation satisfied"
); );
Ok(()) Ok(())
@ -201,6 +282,7 @@ impl ConsensusLiveness {
struct NodeSample { struct NodeSample {
label: String, label: String,
height: u64, height: u64,
tip: HeaderId,
} }
struct LivenessCheck { struct LivenessCheck {