From a54eb53697e38850f31c9d41a5afe93b6797fdd8 Mon Sep 17 00:00:00 2001
From: andrussal
Date: Sat, 13 Dec 2025 11:36:47 +0100
Subject: [PATCH] Fix DA workload inclusions in compose runs

---
 .cargo-deny.toml                                   |   1 -
 Cargo.lock                                         |   4 +-
 testing-framework/configs/Cargo.toml               |   2 -
 .../core/src/scenario/runtime/runner.rs            |  54 +++++++--
 testing-framework/runners/compose/Cargo.toml       |  13 ++-
 .../runners/compose/src/deployer/mod.rs            |   2 +-
 testing-framework/workflows/Cargo.toml             |   1 -
 .../workflows/src/workloads/da/expectation.rs      | 108 +++++++++++++++---
 .../workflows/src/workloads/da/workload.rs         |  99 ++++++++++------
 .../workflows/src/workloads/util.rs                |  50 +++++---
 10 files changed, 240 insertions(+), 94 deletions(-)

diff --git a/.cargo-deny.toml b/.cargo-deny.toml
index 426bb9f..d20a51e 100644
--- a/.cargo-deny.toml
+++ b/.cargo-deny.toml
@@ -30,7 +30,6 @@ allow = [
   "BSD-3-Clause",
   "BSL-1.0",
   "CC0-1.0",
-  "CDDL-1.0",
   "CDLA-Permissive-2.0",
   "ISC",
   "MIT",
diff --git a/Cargo.lock b/Cargo.lock
index 8914a67..2f888b0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6767,7 +6767,6 @@ dependencies = [
  "chain-service",
  "cryptarchia-engine",
  "cryptarchia-sync",
- "ed25519-dalek",
  "groth16",
  "hex",
  "key-management-system-service",
@@ -6795,7 +6794,6 @@ dependencies = [
  "subnetworks-assignations",
  "time",
  "tracing",
- "zksign",
 ]

 [[package]]
@@ -6843,6 +6841,7 @@ dependencies = [
  "async-trait",
  "cfgsync",
  "groth16",
+ "key-management-system-service",
  "nomos-core",
  "nomos-ledger",
  "nomos-tracing-service",
@@ -6896,7 +6895,6 @@ version = "0.1.0"
 dependencies = [
  "async-trait",
  "chain-service",
- "ed25519-dalek",
  "executor-http-client",
  "futures",
  "key-management-system-service",
diff --git a/testing-framework/configs/Cargo.toml b/testing-framework/configs/Cargo.toml
index b8294fc..aee02f7 100644
--- a/testing-framework/configs/Cargo.toml
+++ b/testing-framework/configs/Cargo.toml
@@ -16,7 +16,6 @@ chain-network = { workspace = true }
 chain-service = { workspace = true }
 cryptarchia-engine = { workspace = true, features = ["serde"] }
 cryptarchia-sync = { workspace = true }
-ed25519-dalek = { version = "2.2.0", features = ["rand_core", "serde"] }
 groth16 = { workspace = true }
 hex = { version = "0.4.3", default-features = false }
 key-management-system-service = { workspace = true }
@@ -44,7 +43,6 @@ serde = { workspace = true, features = ["derive"] }
 subnetworks-assignations = { workspace = true }
 time = { version = "0.3", default-features = true }
 tracing = { workspace = true }
-zksign = { workspace = true }

 [lints]
 workspace = true
diff --git a/testing-framework/core/src/scenario/runtime/runner.rs b/testing-framework/core/src/scenario/runtime/runner.rs
index 9901ab4..98f2ead 100644
--- a/testing-framework/core/src/scenario/runtime/runner.rs
+++ b/testing-framework/core/src/scenario/runtime/runner.rs
@@ -69,7 +69,7 @@ impl Runner {
             return Err(error);
         }

-        Self::cooldown(&context).await;
+        Self::settle_before_expectations(&context).await;

         if let Err(error) =
             Self::run_expectations(scenario.expectations_mut(), context.as_ref()).await
@@ -104,9 +104,40 @@
     {
         let mut workloads = Self::spawn_workloads(scenario, context);
         let _ = Self::drive_until_timer(&mut workloads, scenario.duration()).await?;
+
+        // Keep workloads running during the cooldown window so that late
+        // inclusions (especially DA parent-linked ops) still have a chance to
+        // land before expectations evaluate. We still abort everything at the
+        // end of cooldown to prevent leaking tasks across runs.
+        if let Some(cooldown) = Self::cooldown_duration(context.as_ref()) {
+            if !cooldown.is_zero() {
+                if workloads.is_empty() {
+                    sleep(cooldown).await;
+                } else {
+                    let _ = Self::drive_until_timer(&mut workloads, cooldown).await?;
+                }
+            }
+        }
+
         Self::drain_workloads(&mut workloads).await
     }

+    async fn settle_before_expectations(context: &Arc<RunContext>) {
+        // `BlockFeed` polls node storage on an interval. After we abort workloads
+        // we give the feed a moment to catch up with the last blocks that might
+        // include workload operations so expectations evaluate on a more stable
+        // snapshot.
+        let has_node_control = context.node_control().is_some();
+        let hint = context.run_metrics().block_interval_hint();
+        if !has_node_control && hint.is_none() {
+            return;
+        }
+
+        let mut wait = hint.unwrap_or_else(|| Duration::from_secs(1));
+        wait = wait.max(Duration::from_secs(2));
+        sleep(wait).await;
+    }
+
     /// Evaluates every registered expectation, aggregating failures so callers
     /// can see all missing conditions in a single report.
     async fn run_expectations(
@@ -133,14 +164,6 @@ impl Runner {
         Err(ScenarioError::Expectations(summary.into()))
     }

-    async fn cooldown(context: &Arc<RunContext>) {
-        if let Some(wait) = Self::cooldown_duration(context) {
-            if !wait.is_zero() {
-                sleep(wait).await;
-            }
-        }
-    }
-
     fn cooldown_duration(context: &RunContext) -> Option<Duration> {
         let metrics = context.run_metrics();
         let needs_stabilization = context.node_control().is_some();
@@ -149,6 +172,19 @@ impl Runner {
             return None;
         }
         let mut wait = interval.mul_f64(5.0);
+        // Expectations observe blocks via `BlockFeed`, which ultimately
+        // follows the chain information returned by `consensus_info`.
+        // When the consensus uses a security parameter (finality depth),
+        // newly included operations can take ~k blocks to show up in the
+        // observable chain. Short smoke runs otherwise end up evaluating
+        // before finality catches up, systematically failing inclusion
+        // expectations (especially for DA, where ops are parent-linked).
+        let security_param = context
+            .descriptors()
+            .config()
+            .consensus_params
+            .security_param;
+        wait = wait.max(interval.mul_f64(security_param.get() as f64));
         if needs_stabilization {
             let minimum = Duration::from_secs(30);
             wait = wait.max(minimum);
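Annotation (illustrative sketch, not part of the patch): how the three lower bounds in `cooldown_duration` combine. The `mul_f64` baseline, the security-parameter bound, and the 30-second stabilization floor come from the hunk above; the concrete numbers below are made up.

    use std::time::Duration;

    // Mirrors the bounds combined in `cooldown_duration`: a 5-block
    // baseline, a finality-depth (k blocks) bound, and a 30s floor when
    // nodes were restarted during the run.
    fn cooldown_sketch(block_interval: Duration, k: u64, needs_stabilization: bool) -> Duration {
        let mut wait = block_interval.mul_f64(5.0);
        wait = wait.max(block_interval.mul_f64(k as f64));
        if needs_stabilization {
            wait = wait.max(Duration::from_secs(30));
        }
        wait
    }

    // With 2s blocks and k = 20: max(10s, 40s, 30s) = 40s. The finality-depth
    // bound dominates short smoke runs, which is exactly the case this patch
    // targets: without it, expectations evaluated ~k blocks too early.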
diff --git a/testing-framework/runners/compose/Cargo.toml b/testing-framework/runners/compose/Cargo.toml
index 49dcfe8..a00a0c6 100644
--- a/testing-framework/runners/compose/Cargo.toml
+++ b/testing-framework/runners/compose/Cargo.toml
@@ -28,9 +28,10 @@ url = { version = "2" }
 uuid = { version = "1", features = ["v4"] }

 [dev-dependencies]
-groth16               = { workspace = true }
-nomos-core            = { workspace = true }
-nomos-ledger          = { workspace = true }
-nomos-tracing-service = { workspace = true }
-tests                 = { workspace = true }
-zksign                = { workspace = true }
+groth16                       = { workspace = true }
+key-management-system-service = { workspace = true }
+nomos-core                    = { workspace = true }
+nomos-ledger                  = { workspace = true }
+nomos-tracing-service         = { workspace = true }
+tests                         = { workspace = true }
+zksign                        = { workspace = true }
diff --git a/testing-framework/runners/compose/src/deployer/mod.rs b/testing-framework/runners/compose/src/deployer/mod.rs
index 8851de6..4060acf 100644
--- a/testing-framework/runners/compose/src/deployer/mod.rs
+++ b/testing-framework/runners/compose/src/deployer/mod.rs
@@ -91,7 +91,7 @@ mod tests {
         host::{Host, PortOverrides},
     };
     use groth16::Fr;
-    use key_management_system_keys::keys::ZkPublicKey;
+    use key_management_system_service::keys::ZkPublicKey;
     use nomos_core::{
         mantle::{GenesisTx as GenesisTxTrait, ledger::NoteId},
         sdp::{ProviderId, ServiceType},
diff --git a/testing-framework/workflows/Cargo.toml b/testing-framework/workflows/Cargo.toml
index 0af58b0..7ba08e4 100644
--- a/testing-framework/workflows/Cargo.toml
+++ b/testing-framework/workflows/Cargo.toml
@@ -15,7 +15,6 @@ workspace = true
 [dependencies]
 async-trait = "0.1"
 chain-service = { workspace = true }
-ed25519-dalek = { version = "2.2.0", features = ["rand_core", "serde"] }
 executor-http-client = { workspace = true }
 futures = "0.3"
 key-management-system-service = { workspace = true }
diff --git a/testing-framework/workflows/src/workloads/da/expectation.rs b/testing-framework/workflows/src/workloads/da/expectation.rs
index 4e33e49..87d7e73 100644
--- a/testing-framework/workflows/src/workloads/da/expectation.rs
+++ b/testing-framework/workflows/src/workloads/da/expectation.rs
@@ -1,7 +1,11 @@
 use std::{
     collections::{HashMap, HashSet},
     num::NonZeroU64,
-    sync::{Arc, Mutex},
+    sync::{
+        Arc, Mutex,
+        atomic::{AtomicU64, Ordering},
+    },
+    time::Duration,
 };

 use async_trait::async_trait;
@@ -13,7 +17,7 @@
 use testing_framework_core::scenario::{BlockRecord, DynError, Expectation, RunContext};
 use thiserror::Error;
 use tokio::sync::broadcast;

-use super::workload::{planned_blob_count, planned_channel_count, planned_channel_ids};
+use super::workload::{planned_channel_count, planned_channel_ids};

 #[derive(Debug)]
@@ -28,7 +32,8 @@ pub struct DaWorkloadExpectation {
 struct CaptureState {
     planned: Arc<HashSet<ChannelId>>,
     inscriptions: Arc<Mutex<HashSet<ChannelId>>>,
     blobs: Arc<Mutex<HashMap<ChannelId, u64>>>,
-    expected_total_blobs: u64,
+    run_blocks: Arc<AtomicU64>,
+    run_duration: Duration,
 }

@@ -37,10 +42,26 @@ const MIN_INCLUSION_RATIO: f64 = 0.8;
 #[derive(Debug, Error)]
 enum DaExpectationError {
     #[error("da workload expectation not started")]
     NotCaptured,
-    #[error("missing inscriptions for {missing:?}")]
-    MissingInscriptions { missing: Vec<ChannelId> },
-    #[error("missing blobs for {missing:?}")]
-    MissingBlobs { missing: Vec<ChannelId> },
+    #[error(
+        "missing inscriptions: observed={observed}/{planned} required={required} missing={missing:?}"
+    )]
+    MissingInscriptions {
+        planned: usize,
+        observed: usize,
+        required: usize,
+        missing: Vec<ChannelId>,
+    },
+    #[error(
+        "missing blobs: observed_total_blobs={observed_total_blobs} expected_total_blobs={expected_total_blobs} required_blobs={required_blobs} channels_with_blobs={channels_with_blobs}/{planned_channels} missing_channels={missing:?}"
+    )]
+    MissingBlobs {
+        expected_total_blobs: u64,
+        observed_total_blobs: u64,
+        required_blobs: u64,
+        planned_channels: usize,
+        channels_with_blobs: usize,
+        missing: Vec<ChannelId>,
+    },
 }

@@ -75,19 +96,42 @@ impl Expectation for DaWorkloadExpectation {
             self.headroom_percent,
         ));

-        let expected_total_blobs = planned_blob_count(self.blob_rate_per_block, &ctx.run_metrics());
+        let run_duration = ctx.run_metrics().run_duration();

         tracing::info!(
             planned_channels = planned_ids.len(),
             blob_rate_per_block = self.blob_rate_per_block.get(),
             headroom_percent = self.headroom_percent,
-            expected_total_blobs,
+            run_duration_secs = run_duration.as_secs(),
             "DA inclusion expectation starting capture"
         );

         let planned = Arc::new(planned_ids.iter().copied().collect::<HashSet<_>>());
         let inscriptions = Arc::new(Mutex::new(HashSet::new()));
         let blobs = Arc::new(Mutex::new(HashMap::new()));
+        let run_blocks = Arc::new(AtomicU64::new(0));
+
+        {
+            let run_blocks = Arc::clone(&run_blocks);
+            let mut receiver = ctx.block_feed().subscribe();
+            tokio::spawn(async move {
+                let timer = tokio::time::sleep(run_duration);
+                tokio::pin!(timer);
+
+                loop {
+                    tokio::select! {
+                        _ = &mut timer => break,
+                        result = receiver.recv() => match result {
+                            Ok(_) => {
+                                run_blocks.fetch_add(1, Ordering::Relaxed);
+                            }
+                            Err(broadcast::error::RecvError::Lagged(_)) => {}
+                            Err(broadcast::error::RecvError::Closed) => break,
+                        }
+                    }
+                }
+            });
+        }

         let mut receiver = ctx.block_feed().subscribe();
         let planned_for_task = Arc::clone(&planned);
@@ -118,7 +162,8 @@ impl Expectation for DaWorkloadExpectation {
             planned,
             inscriptions,
             blobs,
-            expected_total_blobs,
+            run_blocks,
+            run_duration,
         });

         Ok(())
@@ -140,7 +185,8 @@ impl Expectation for DaWorkloadExpectation {
             missing_channels(&state.planned, &inscriptions)
         };
         let required_inscriptions = minimum_required(planned_total, MIN_INCLUSION_RATIO);
-        if planned_total.saturating_sub(missing_inscriptions.len()) < required_inscriptions {
+        let observed_inscriptions = planned_total.saturating_sub(missing_inscriptions.len());
+        if observed_inscriptions < required_inscriptions {
             tracing::warn!(
                 planned = planned_total,
                 missing = missing_inscriptions.len(),
@@ -148,6 +194,9 @@ impl Expectation for DaWorkloadExpectation {
                 "DA expectation missing inscriptions"
             );
             return Err(DaExpectationError::MissingInscriptions {
+                planned: planned_total,
+                observed: observed_inscriptions,
+                required: required_inscriptions,
                 missing: missing_inscriptions,
             }
             .into());
@@ -157,16 +206,43 @@ impl Expectation for DaWorkloadExpectation {
         let observed_total_blobs = {
             let blobs = state.blobs.lock().expect("blob lock poisoned");
             blobs.values().sum::<u64>()
         };
-        let required_blobs = minimum_required_u64(state.expected_total_blobs, MIN_INCLUSION_RATIO);
+
+        let channels_with_blobs: HashSet<ChannelId> = {
+            let blobs = state.blobs.lock().expect("blob lock poisoned");
+            blobs
+                .iter()
+                .filter(|(_, count)| **count > 0)
+                .map(|(channel, _)| *channel)
+                .collect::<HashSet<_>>()
+        };
+
+        let observed_blocks = state.run_blocks.load(Ordering::Relaxed).max(1);
+        let expected_total_blobs = self
+            .blob_rate_per_block
+            .get()
+            .saturating_mul(observed_blocks);
+
+        let missing_blob_channels = missing_channels(&state.planned, &channels_with_blobs);
+        let required_blobs = minimum_required_u64(expected_total_blobs, MIN_INCLUSION_RATIO);
         if observed_total_blobs < required_blobs {
             tracing::warn!(
-                planned = state.expected_total_blobs,
-                observed = observed_total_blobs,
-                required = required_blobs,
+                expected_total_blobs,
+                observed_total_blobs,
+                required_blobs,
+                observed_blocks,
+                run_duration_secs = state.run_duration.as_secs(),
+                missing_blob_channels = missing_blob_channels.len(),
                 "DA expectation missing blobs"
             );
             return Err(DaExpectationError::MissingBlobs {
-                missing: Vec::new(),
+                expected_total_blobs,
+                observed_total_blobs,
+                required_blobs,
+                planned_channels: planned_total,
+                channels_with_blobs: channels_with_blobs.len(),
+                // Best-effort diagnostics: which planned channels never got any
+                // blob included.
+                missing: missing_blob_channels,
             }
             .into());
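Annotation (illustrative sketch, not part of the patch): the blob threshold now scales with blocks actually observed during the run rather than a pre-computed plan. `minimum_required_u64` lives elsewhere in this crate, so the ceiling-based rounding below is an assumption; the rate, block count, and 0.8 ratio are from the hunk above.

    // expected = rate x blocks seen while the workload ran (min. 1 block),
    // required = MIN_INCLUSION_RATIO of that, assumed rounded up.
    fn required_blobs_sketch(blob_rate_per_block: u64, observed_blocks: u64) -> u64 {
        const MIN_INCLUSION_RATIO: f64 = 0.8;
        let expected = blob_rate_per_block.saturating_mul(observed_blocks.max(1));
        (expected as f64 * MIN_INCLUSION_RATIO).ceil() as u64
    }

    // E.g. rate = 1 and 12 observed blocks: expected 12, required ceil(9.6) = 10.
    // A run that only got 9 blobs included now fails with full counts in the
    // MissingBlobs error instead of an empty `missing` list.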
diff --git a/testing-framework/workflows/src/workloads/da/workload.rs b/testing-framework/workflows/src/workloads/da/workload.rs
index 0849067..c1267ec 100644
--- a/testing-framework/workflows/src/workloads/da/workload.rs
+++ b/testing-framework/workflows/src/workloads/da/workload.rs
@@ -1,18 +1,20 @@
 use std::{num::NonZeroU64, sync::Arc, time::Duration};

 use async_trait::async_trait;
-use ed25519_dalek::SigningKey;
 use executor_http_client::ExecutorHttpClient;
 use futures::future::try_join_all;
-use key_management_system_service::keys::Ed25519PublicKey;
+use key_management_system_service::keys::{Ed25519Key, Ed25519PublicKey};
 use nomos_core::{
     da::BlobId,
-    mantle::ops::{
-        Op,
-        channel::{ChannelId, MsgId},
+    mantle::{
+        AuthenticatedMantleTx as _,
+        ops::{
+            Op,
+            channel::{ChannelId, MsgId},
+        },
     },
 };
-use rand::{Rng as _, RngCore as _, seq::SliceRandom as _, thread_rng};
+use rand::{RngCore as _, seq::SliceRandom as _, thread_rng};
 use testing_framework_core::{
     nodes::ApiClient,
     scenario::{
@@ -30,8 +32,7 @@ const TEST_KEY_BYTES: [u8; 32] = [0u8; 32];
 const DEFAULT_BLOB_RATE_PER_BLOCK: u64 = 1;
 const DEFAULT_CHANNEL_RATE_PER_BLOCK: u64 = 1;
-const MIN_BLOB_CHUNKS: usize = 1;
-const MAX_BLOB_CHUNKS: usize = 8;
+const BLOB_CHUNK_OPTIONS: &[usize] = &[1, 2, 4, 8];
 const PUBLISH_RETRIES: usize = 5;
 const PUBLISH_RETRY_DELAY: Duration = Duration::from_secs(2);
 const DEFAULT_HEADROOM_PERCENT: u64 = 20;
@@ -112,9 +113,8 @@ impl ScenarioWorkload for Workload {
         try_join_all(planned_channels.into_iter().map(|channel_id| {
             let ctx = ctx;
             async move {
-                let mut receiver = ctx.block_feed().subscribe();
                 tracing::info!(channel_id = ?channel_id, blobs = per_channel_target, "DA workload starting channel flow");
-                run_channel_flow(ctx, &mut receiver, channel_id, per_channel_target).await?;
+                run_channel_flow(ctx, channel_id, per_channel_target).await?;
                 tracing::info!(channel_id = ?channel_id, "DA workload finished channel flow");
                 Ok::<(), DynError>(())
             }
@@ -128,22 +128,31 @@ impl ScenarioWorkload for Workload {

 async fn run_channel_flow(
     ctx: &RunContext,
-    receiver: &mut broadcast::Receiver<Arc<BlockRecord>>,
     channel_id: ChannelId,
     target_blobs: u64,
 ) -> Result<(), DynError> {
     tracing::debug!(channel_id = ?channel_id, "DA: submitting inscription tx");
-    let tx = Arc::new(tx::create_inscription_transaction_with_id(channel_id));
-    submit_transaction_via_cluster(ctx, Arc::clone(&tx)).await?;
+    let inscription_tx = Arc::new(tx::create_inscription_transaction_with_id(channel_id));
+    submit_transaction_via_cluster(ctx, Arc::clone(&inscription_tx)).await?;
+
+    let mut receiver = ctx.block_feed().subscribe();
+    let inscription_id = wait_for_inscription(&mut receiver, channel_id).await?;

-    let inscription_id = wait_for_inscription(receiver, channel_id).await?;
-
     tracing::debug!(channel_id = ?channel_id, inscription_id = ?inscription_id, "DA: inscription observed");
     let mut parent_id = inscription_id;
+
+    for idx in 0..target_blobs {
+        let payload = random_blob_payload();
+        let published_blob_id = publish_blob(ctx, channel_id, parent_id, payload).await?;
+        let (next_parent, included_blob_id) =
+            wait_for_blob_with_parent(&mut receiver, channel_id, parent_id).await?;
+        parent_id = next_parent;

-    for _ in 0..target_blobs {
-        let blob_id = publish_blob(ctx, channel_id, parent_id).await?;
-        tracing::debug!(channel_id = ?channel_id, blob_id = ?blob_id, "DA: blob published");
-        parent_id = wait_for_blob(receiver, channel_id, blob_id).await?;
+        tracing::debug!(
+            channel_id = ?channel_id,
+            blob_index = idx,
+            published_blob_id = ?published_blob_id,
+            included_blob_id = ?included_blob_id,
+            "DA: blob published"
+        );
     }
     Ok(())
 }
@@ -164,22 +173,32 @@ async fn wait_for_inscription(
     })
     .await
 }

-async fn wait_for_blob(
+async fn wait_for_blob_with_parent(
     receiver: &mut broadcast::Receiver<Arc<BlockRecord>>,
     channel_id: ChannelId,
-    blob_id: BlobId,
-) -> Result<MsgId, DynError> {
-    wait_for_channel_op(receiver, move |op| {
-        if let Op::ChannelBlob(blob_op) = op
-            && blob_op.channel == channel_id
-            && blob_op.blob == blob_id
-        {
-            Some(blob_op.id())
-        } else {
-            None
+    parent_msg: MsgId,
+) -> Result<(MsgId, BlobId), DynError> {
+    loop {
+        match receiver.recv().await {
+            Ok(record) => {
+                for tx in record.block.transactions() {
+                    for op in &tx.mantle_tx().ops {
+                        if let Op::ChannelBlob(blob_op) = op
+                            && blob_op.channel == channel_id
+                            && blob_op.parent == parent_msg
+                        {
+                            let msg_id = blob_op.id();
+                            return Ok((msg_id, blob_op.blob));
+                        }
+                    }
+                }
+            }
+            Err(broadcast::error::RecvError::Lagged(_)) => {}
+            Err(broadcast::error::RecvError::Closed) => {
+                return Err("block feed closed while waiting for channel operations".into());
+            }
         }
-    })
-    .await
+    }
 }

 async fn wait_for_channel_op(
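Annotation (illustrative, not part of the patch): the shape of the per-channel message chain that `run_channel_flow` drives, and why the waiter now matches on `parent` instead of a specific blob id.

    // inscription             -> included as MsgId A
    // blob #0 (parent = A)    -> included as MsgId B
    // blob #1 (parent = B)    -> included as MsgId C
    // ...
    //
    // Matching on (channel, parent) accepts whichever blob actually got linked
    // under the expected parent, so the workload follows the chain the ledger
    // accepted rather than insisting on the exact blob id it just published.
    // Each published blob must wait for its parent's inclusion before the next
    // one can be linked, which is why these ops land late in short runs.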
@@ -209,16 +228,14 @@ async fn wait_for_channel_op(
 async fn publish_blob(
     ctx: &RunContext,
     channel_id: ChannelId,
     parent_msg: MsgId,
+    data: Vec<u8>,
 ) -> Result<BlobId, DynError> {
     let executors = ctx.node_clients().executor_clients();
     if executors.is_empty() {
         return Err("da workload requires at least one executor".into());
     }
-    let signer: Ed25519PublicKey = SigningKey::from_bytes(&TEST_KEY_BYTES)
-        .verifying_key()
-        .into();
-    let data = random_blob_payload();
+    let signer = test_signer();
     tracing::debug!(channel = ?channel_id, payload_bytes = data.len(), "DA: prepared blob payload");

     let client = ExecutorHttpClient::new(None);
@@ -248,9 +265,17 @@ async fn publish_blob(
     Err(last_err.unwrap_or_else(|| "da workload could not publish blob".into()))
 }

+fn test_signer() -> Ed25519PublicKey {
+    Ed25519Key::from_bytes(&TEST_KEY_BYTES).public_key()
+}
+
 fn random_blob_payload() -> Vec<u8> {
     let mut rng = thread_rng();
-    let chunks = rng.gen_range(MIN_BLOB_CHUNKS..=MAX_BLOB_CHUNKS);
+    // KZGRS encoder expects the polynomial degree to be a power of two, which
+    // effectively constrains the blob chunk count.
+    let chunks = *BLOB_CHUNK_OPTIONS
+        .choose(&mut rng)
+        .expect("non-empty chunk options");
     let mut data = vec![0u8; 31 * chunks];
     rng.fill_bytes(&mut data);
     data
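Annotation (illustrative sketch, not part of the patch): the payload sizes the new chunk options produce. Chunks are 31 bytes each, per the `31 * chunks` sizing above; the power-of-two counts follow from the KZGRS comment in the hunk, since the old `1..=8` range also allowed 3, 5, 6, or 7 chunks, which a power-of-two polynomial degree cannot represent without padding.

    const BLOB_CHUNK_OPTIONS: &[usize] = &[1, 2, 4, 8];

    fn payload_sizes() -> Vec<usize> {
        // 31-byte chunks -> 31, 62, 124, 248 byte payloads.
        BLOB_CHUNK_OPTIONS.iter().map(|chunks| 31 * chunks).collect()
    }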
diff --git a/testing-framework/workflows/src/workloads/util.rs b/testing-framework/workflows/src/workloads/util.rs
index 034e9be..498140d 100644
--- a/testing-framework/workflows/src/workloads/util.rs
+++ b/testing-framework/workflows/src/workloads/util.rs
@@ -7,6 +7,7 @@ use nomos_core::{
     mantle::{
         ops::{Op, channel::MsgId},
     },
 };
+use rand::{seq::SliceRandom as _, thread_rng};
 use testing_framework_core::scenario::{DynError, RunContext};
 use tracing::debug;
@@ -38,22 +39,35 @@ pub async fn submit_transaction_via_cluster(
     tx: Arc<SignedMantleTx>,
 ) -> Result<(), DynError> {
     let tx_hash = tx.hash();
-    debug!(?tx_hash, "submitting transaction via cluster");
-    ctx.cluster_client()
-        .try_all_clients(|client| {
-            let tx = Arc::clone(&tx);
-            Box::pin(async move {
-                let url = client.base_url().clone();
-                debug!(?tx_hash, %url, "submitting transaction to client");
-                let res = client
-                    .submit_transaction(&tx)
-                    .await
-                    .map_err(|err| -> DynError { err.into() });
-                if res.is_err() {
-                    debug!(?tx_hash, %url, "transaction submission failed");
-                }
-                res
-            })
-        })
-        .await
+    debug!(
+        ?tx_hash,
+        "submitting transaction via cluster (validators first)"
+    );
+
+    let node_clients = ctx.node_clients();
+    let mut validator_clients: Vec<_> = node_clients.validator_clients().iter().collect();
+    let mut executor_clients: Vec<_> = node_clients.executor_clients().iter().collect();
+    validator_clients.shuffle(&mut thread_rng());
+    executor_clients.shuffle(&mut thread_rng());
+
+    let clients = validator_clients.into_iter().chain(executor_clients);
+    let mut last_err = None;
+
+    for client in clients {
+        let url = client.base_url().clone();
+        debug!(?tx_hash, %url, "submitting transaction to client");
+        match client
+            .submit_transaction(&tx)
+            .await
+            .map_err(|err| -> DynError { err.into() })
+        {
+            Ok(()) => return Ok(()),
+            Err(err) => {
+                debug!(?tx_hash, %url, "transaction submission failed");
+                last_err = Some(err);
+            }
+        }
+    }
+
+    Err(last_err.unwrap_or_else(|| "cluster client exhausted all nodes".into()))
 }
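Annotation (illustrative sketch, not part of the patch): the first-success-wins shape that replaces `try_all_clients`, detached from the framework's client types. Everything below is a stand-in; only the loop structure and the fallback error mirror the hunk above.

    // Try each attempt in order and stop at the first success, remembering
    // the last failure so callers get a concrete error when all nodes refuse.
    async fn first_success<F, T>(attempts: Vec<F>) -> Result<T, String>
    where
        F: std::future::Future<Output = Result<T, String>>,
    {
        let mut last_err = None;
        for attempt in attempts {
            match attempt.await {
                Ok(value) => return Ok(value),      // first node that accepts wins
                Err(err) => last_err = Some(err),   // remember and try the next
            }
        }
        Err(last_err.unwrap_or_else(|| "no clients to try".into()))
    }

Shuffling each group before chaining validators ahead of executors spreads submissions across the cluster while keeping the kind-level ordering (validators before executors) deterministic.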