From 2fa5fe7973ad3f0fd0d707220d7098d634235fe6 Mon Sep 17 00:00:00 2001 From: andrussal Date: Wed, 10 Dec 2025 06:35:11 +0100 Subject: [PATCH] Add consensus header range API and adaptive liveness lag --- .../core/src/nodes/api_client.rs | 36 ++++- .../src/expectations/consensus_liveness.rs | 148 ++++++++++++++---- 2 files changed, 148 insertions(+), 36 deletions(-) diff --git a/testing-framework/core/src/nodes/api_client.rs b/testing-framework/core/src/nodes/api_client.rs index f9a69d8..7e26b82 100644 --- a/testing-framework/core/src/nodes/api_client.rs +++ b/testing-framework/core/src/nodes/api_client.rs @@ -2,13 +2,14 @@ use std::net::SocketAddr; use chain_service::CryptarchiaInfo; use common_http_client::CommonHttpClient; +use hex; use nomos_core::{block::Block, da::BlobId, mantle::SignedMantleTx, sdp::SessionNumber}; use nomos_da_network_core::swarm::{BalancerStats, MonitorStats}; use nomos_da_network_service::MembershipResponse; use nomos_http_api_common::paths::{ - CRYPTARCHIA_INFO, DA_BALANCER_STATS, DA_BLACKLISTED_PEERS, DA_BLOCK_PEER, DA_GET_MEMBERSHIP, - DA_HISTORIC_SAMPLING, DA_MONITOR_STATS, DA_UNBLOCK_PEER, MEMPOOL_ADD_TX, NETWORK_INFO, - STORAGE_BLOCK, + CRYPTARCHIA_HEADERS, CRYPTARCHIA_INFO, DA_BALANCER_STATS, DA_BLACKLISTED_PEERS, DA_BLOCK_PEER, + DA_GET_MEMBERSHIP, DA_HISTORIC_SAMPLING, DA_MONITOR_STATS, DA_UNBLOCK_PEER, MEMPOOL_ADD_TX, + NETWORK_INFO, STORAGE_BLOCK, }; use nomos_network::backends::libp2p::Libp2pInfo; use nomos_node::{HeaderId, api::testing::handlers::HistoricSamplingRequest}; @@ -226,6 +227,35 @@ impl ApiClient { self.post_json_decode(STORAGE_BLOCK, id).await } + /// Fetch header ids between optional bounds. + /// When `from` is None, defaults to tip; when `to` is None, defaults to + /// LIB. + pub async fn consensus_headers( + &self, + from: Option, + to: Option, + ) -> reqwest::Result> { + let mut url = self.join_base(CRYPTARCHIA_HEADERS); + { + let mut pairs = url.query_pairs_mut(); + if let Some(from) = from { + let bytes: [u8; 32] = from.into(); + pairs.append_pair("from", &hex::encode(bytes)); + } + if let Some(to) = to { + let bytes: [u8; 32] = to.into(); + pairs.append_pair("to", &hex::encode(bytes)); + } + } + self.client + .get(url) + .send() + .await? + .error_for_status()? + .json() + .await + } + /// Query DA membership via testing API. pub async fn da_get_membership( &self, diff --git a/testing-framework/workflows/src/expectations/consensus_liveness.rs b/testing-framework/workflows/src/expectations/consensus_liveness.rs index 6124649..8bc3b8d 100644 --- a/testing-framework/workflows/src/expectations/consensus_liveness.rs +++ b/testing-framework/workflows/src/expectations/consensus_liveness.rs @@ -1,7 +1,11 @@ use std::time::Duration; use async_trait::async_trait; -use testing_framework_core::scenario::{DynError, Expectation, RunContext}; +use nomos_core::header::HeaderId; +use testing_framework_core::{ + nodes::ApiClient, + scenario::{DynError, Expectation, RunContext}, +}; use thiserror::Error; use tokio::time::sleep; @@ -24,6 +28,7 @@ const LAG_ALLOWANCE: u64 = 2; const MIN_PROGRESS_BLOCKS: u64 = 5; const REQUEST_RETRIES: usize = 5; const REQUEST_RETRY_DELAY: Duration = Duration::from_secs(2); +const MAX_LAG_ALLOWANCE: u64 = 5; #[async_trait] impl Expectation for ConsensusLiveness { @@ -34,7 +39,9 @@ impl Expectation for ConsensusLiveness { async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> { Self::ensure_participants(ctx)?; let target_hint = Self::target_blocks(ctx); - let check = Self::collect_results(ctx).await; + let mut check = Self::collect_results(ctx).await; + let clients: Vec<_> = ctx.node_clients().all_clients().collect(); + self.anchor_check(target_hint, &mut check, &clients).await; (*self).report(target_hint, check) } } @@ -51,6 +58,8 @@ enum ConsensusLivenessIssue { height: u64, target: u64, }, + #[error("{node} could not traverse to anchor tip {anchor_tip:?}")] + MissingAnchorPath { node: String, anchor_tip: HeaderId }, #[error("{node} consensus_info failed: {source}")] RequestFailed { node: String, @@ -92,48 +101,41 @@ impl ConsensusLiveness { } async fn collect_results(ctx: &RunContext) -> LivenessCheck { - let participant_count = ctx.node_clients().all_clients().count().max(1); - let max_attempts = participant_count * REQUEST_RETRIES; - let mut samples = Vec::with_capacity(participant_count); + let clients: Vec<_> = ctx.node_clients().all_clients().collect(); + let mut samples = Vec::with_capacity(clients.len()); let mut issues = Vec::new(); - for attempt in 0..max_attempts { - match Self::fetch_cluster_height(ctx).await { - Ok(height) => { - samples.push(NodeSample { - label: format!("sample-{attempt}"), - height, - }); - if samples.len() >= participant_count { + for (idx, client) in clients.iter().enumerate() { + for attempt in 0..REQUEST_RETRIES { + match Self::fetch_cluster_info(client).await { + Ok((height, tip)) => { + samples.push(NodeSample { + label: format!("node-{idx}"), + height, + tip, + }); break; } + Err(err) if attempt + 1 == REQUEST_RETRIES => { + issues.push(ConsensusLivenessIssue::RequestFailed { + node: format!("node-{idx}"), + source: err, + }); + } + Err(_) => sleep(REQUEST_RETRY_DELAY).await, } - Err(err) => issues.push(ConsensusLivenessIssue::RequestFailed { - node: format!("sample-{attempt}"), - source: err, - }), - } - - if samples.len() < participant_count { - sleep(REQUEST_RETRY_DELAY).await; } } LivenessCheck { samples, issues } } - async fn fetch_cluster_height(ctx: &RunContext) -> Result { - ctx.cluster_client() - .try_all_clients(|client| { - Box::pin(async move { - client - .consensus_info() - .await - .map(|info| info.height) - .map_err(|err| -> DynError { err.into() }) - }) - }) + async fn fetch_cluster_info(client: &ApiClient) -> Result<(u64, HeaderId), DynError> { + client + .consensus_info() .await + .map(|info| (info.height, info.tip)) + .map_err(|err| -> DynError { err.into() }) } #[must_use] @@ -143,6 +145,83 @@ impl ConsensusLiveness { self } + fn effective_lag_allowance(&self, target: u64) -> u64 { + (target / 10).clamp(self.lag_allowance, MAX_LAG_ALLOWANCE) + } + + async fn anchor_check( + &self, + target_hint: u64, + check: &mut LivenessCheck, + clients: &[&ApiClient], + ) { + if let Some((_, anchor_tip)) = check + .samples + .iter() + .min_by_key(|s| s.height) + .map(|s| (s.height, s.tip)) + { + let max_height = check + .samples + .iter() + .map(|sample| sample.height) + .max() + .unwrap_or(0); + + let mut target = target_hint; + if target == 0 || target > max_height { + target = max_height; + } + + let lag_allowance = self.effective_lag_allowance(target); + for (idx, sample) in check.samples.iter().enumerate() { + if sample.height + lag_allowance < target { + continue; + } + + match clients + .get(idx) + .unwrap() + .consensus_headers(Some(sample.tip), Some(anchor_tip)) + .await + { + Ok(headers) if !headers.is_empty() && headers.first() == Some(&anchor_tip) => { + tracing::debug!( + node = %sample.label, + count = headers.len(), + "anchor check headers fetched" + ); + } + Ok(headers) => { + tracing::debug!( + node = %sample.label, + count = headers.len(), + "anchor check returned empty header list" + ); + check + .issues + .push(ConsensusLivenessIssue::MissingAnchorPath { + node: sample.label.clone(), + anchor_tip, + }); + } + Err(err) => { + tracing::debug!( + node = %sample.label, + "anchor check failed to fetch headers: {err}" + ); + check + .issues + .push(ConsensusLivenessIssue::MissingAnchorPath { + node: sample.label.clone(), + anchor_tip, + }); + } + } + } + } + } + fn report(self, target_hint: u64, mut check: LivenessCheck) -> Result<(), DynError> { if check.samples.is_empty() { return Err(Box::new(ConsensusLivenessError::MissingParticipants)); @@ -159,6 +238,7 @@ impl ConsensusLiveness { if target == 0 || target > max_height { target = max_height; } + let lag_allowance = self.effective_lag_allowance(target); if max_height < MIN_PROGRESS_BLOCKS { check @@ -171,7 +251,7 @@ impl ConsensusLiveness { } for sample in &check.samples { - if sample.height + self.lag_allowance < target { + if sample.height + lag_allowance < target { check .issues .push(ConsensusLivenessIssue::HeightBelowTarget { @@ -186,6 +266,7 @@ impl ConsensusLiveness { tracing::info!( target, heights = ?check.samples.iter().map(|s| s.height).collect::>(), + tips = ?check.samples.iter().map(|s| s.tip).collect::>(), "consensus liveness expectation satisfied" ); Ok(()) @@ -201,6 +282,7 @@ impl ConsensusLiveness { struct NodeSample { label: String, height: u64, + tip: HeaderId, } struct LivenessCheck {