Fix DA workload inclusions in compose runs

andrussal 2025-12-13 11:36:47 +01:00
parent 6b6be07c23
commit a54eb53697
10 changed files with 240 additions and 94 deletions


@@ -30,7 +30,6 @@ allow = [
"BSD-3-Clause",
"BSL-1.0",
"CC0-1.0",
"CDDL-1.0",
"CDLA-Permissive-2.0",
"ISC",
"MIT",

Cargo.lock (generated)

@@ -6767,7 +6767,6 @@ dependencies = [
"chain-service",
"cryptarchia-engine",
"cryptarchia-sync",
"ed25519-dalek",
"groth16",
"hex",
"key-management-system-service",
@@ -6795,7 +6794,6 @@ dependencies = [
"subnetworks-assignations",
"time",
"tracing",
"zksign",
]
[[package]]
@@ -6843,6 +6841,7 @@ dependencies = [
"async-trait",
"cfgsync",
"groth16",
"key-management-system-service",
"nomos-core",
"nomos-ledger",
"nomos-tracing-service",
@@ -6896,7 +6895,6 @@ version = "0.1.0"
dependencies = [
"async-trait",
"chain-service",
"ed25519-dalek",
"executor-http-client",
"futures",
"key-management-system-service",


@@ -16,7 +16,6 @@ chain-network = { workspace = true }
chain-service = { workspace = true }
cryptarchia-engine = { workspace = true, features = ["serde"] }
cryptarchia-sync = { workspace = true }
ed25519-dalek = { version = "2.2.0", features = ["rand_core", "serde"] }
groth16 = { workspace = true }
hex = { version = "0.4.3", default-features = false }
key-management-system-service = { workspace = true }
@@ -44,7 +43,6 @@ serde = { workspace = true, features = ["derive"] }
subnetworks-assignations = { workspace = true }
time = { version = "0.3", default-features = true }
tracing = { workspace = true }
zksign = { workspace = true }
[lints]
workspace = true


@@ -69,7 +69,7 @@ impl Runner {
return Err(error);
}
Self::cooldown(&context).await;
Self::settle_before_expectations(&context).await;
if let Err(error) =
Self::run_expectations(scenario.expectations_mut(), context.as_ref()).await
@@ -104,9 +104,40 @@
{
let mut workloads = Self::spawn_workloads(scenario, context);
let _ = Self::drive_until_timer(&mut workloads, scenario.duration()).await?;
// Keep workloads running during the cooldown window so that late
// inclusions (especially DA parent-linked ops) still have a chance to
// land before expectations evaluate. We still abort everything at the
// end of cooldown to prevent leaking tasks across runs.
if let Some(cooldown) = Self::cooldown_duration(context.as_ref()) {
if !cooldown.is_zero() {
if workloads.is_empty() {
sleep(cooldown).await;
} else {
let _ = Self::drive_until_timer(&mut workloads, cooldown).await?;
}
}
}
Self::drain_workloads(&mut workloads).await
}
async fn settle_before_expectations(context: &Arc<RunContext>) {
// `BlockFeed` polls node storage on an interval. After we abort workloads
// we give the feed a moment to catch up with the last blocks that might
// include workload operations so expectations evaluate on a more stable
// snapshot.
let has_node_control = context.node_control().is_some();
let hint = context.run_metrics().block_interval_hint();
if !has_node_control && hint.is_none() {
return;
}
let mut wait = hint.unwrap_or_else(|| Duration::from_secs(1));
wait = wait.max(Duration::from_secs(2));
sleep(wait).await;
}
/// Evaluates every registered expectation, aggregating failures so callers
/// can see all missing conditions in a single report.
async fn run_expectations(
@@ -133,14 +164,6 @@ impl Runner {
Err(ScenarioError::Expectations(summary.into()))
}
async fn cooldown(context: &Arc<RunContext>) {
if let Some(wait) = Self::cooldown_duration(context) {
if !wait.is_zero() {
sleep(wait).await;
}
}
}
fn cooldown_duration(context: &RunContext) -> Option<Duration> {
let metrics = context.run_metrics();
let needs_stabilization = context.node_control().is_some();
@@ -149,6 +172,19 @@ impl Runner {
return None;
}
let mut wait = interval.mul_f64(5.0);
// Expectations observe blocks via `BlockFeed`, which ultimately
// follows the chain information returned by `consensus_info`.
// When the consensus uses a security parameter (finality depth),
// newly included operations can take ~k blocks to show up in the
// observable chain. Short smoke runs otherwise end up evaluating
// before finality catches up, systematically failing inclusion
// expectations (especially for DA, where ops are parent-linked).
let security_param = context
.descriptors()
.config()
.consensus_params
.security_param;
wait = wait.max(interval.mul_f64(security_param.get() as f64));
if needs_stabilization {
let minimum = Duration::from_secs(30);
wait = wait.max(minimum);
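For a concrete feel of the new cooldown bound, here is a minimal standalone sketch of the arithmetic above. The 2 s block-interval hint and k = 20 security parameter are made-up example values, and sketch_cooldown is an illustration, not the repository's cooldown_duration.

use std::time::Duration;

// Hypothetical inputs: a 2 s block-interval hint and security_param k = 20.
fn sketch_cooldown(interval: Duration, security_param: u64, needs_stabilization: bool) -> Duration {
    // Baseline before this change: five block intervals.
    let mut wait = interval.mul_f64(5.0);
    // New lower bound: k block intervals, so ops buried behind the finality
    // depth become observable via the block feed before expectations run.
    wait = wait.max(interval.mul_f64(security_param as f64));
    if needs_stabilization {
        // Runs that use node control additionally wait at least 30 s.
        wait = wait.max(Duration::from_secs(30));
    }
    wait
}

fn main() {
    let wait = sketch_cooldown(Duration::from_secs(2), 20, false);
    assert_eq!(wait, Duration::from_secs(40)); // max(5 * 2 s, 20 * 2 s)
    println!("cooldown = {wait:?}");
}

With these example numbers the finality term (40 s) dominates the old fixed five-block cooldown (10 s), which is exactly the situation the comment describes for short smoke runs.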


@@ -28,9 +28,10 @@ url = { version = "2" }
uuid = { version = "1", features = ["v4"] }
[dev-dependencies]
groth16 = { workspace = true }
nomos-core = { workspace = true }
nomos-ledger = { workspace = true }
nomos-tracing-service = { workspace = true }
tests = { workspace = true }
zksign = { workspace = true }
groth16 = { workspace = true }
key-management-system-service = { workspace = true }
nomos-core = { workspace = true }
nomos-ledger = { workspace = true }
nomos-tracing-service = { workspace = true }
tests = { workspace = true }
zksign = { workspace = true }


@@ -91,7 +91,7 @@ mod tests {
host::{Host, PortOverrides},
};
use groth16::Fr;
use key_management_system_keys::keys::ZkPublicKey;
use key_management_system_service::keys::ZkPublicKey;
use nomos_core::{
mantle::{GenesisTx as GenesisTxTrait, ledger::NoteId},
sdp::{ProviderId, ServiceType},


@@ -15,7 +15,6 @@ workspace = true
[dependencies]
async-trait = "0.1"
chain-service = { workspace = true }
ed25519-dalek = { version = "2.2.0", features = ["rand_core", "serde"] }
executor-http-client = { workspace = true }
futures = "0.3"
key-management-system-service = { workspace = true }


@@ -1,7 +1,11 @@
use std::{
collections::{HashMap, HashSet},
num::NonZeroU64,
sync::{Arc, Mutex},
sync::{
Arc, Mutex,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};
use async_trait::async_trait;
@@ -13,7 +17,7 @@ use testing_framework_core::scenario::{BlockRecord, DynError, Expectation, RunCo
use thiserror::Error;
use tokio::sync::broadcast;
use super::workload::{planned_blob_count, planned_channel_count, planned_channel_ids};
use super::workload::{planned_channel_count, planned_channel_ids};
#[derive(Debug)]
pub struct DaWorkloadExpectation {
@@ -28,7 +32,8 @@ struct CaptureState {
planned: Arc<HashSet<ChannelId>>,
inscriptions: Arc<Mutex<HashSet<ChannelId>>>,
blobs: Arc<Mutex<HashMap<ChannelId, u64>>>,
expected_total_blobs: u64,
run_blocks: Arc<AtomicU64>,
run_duration: Duration,
}
const MIN_INCLUSION_RATIO: f64 = 0.8;
@@ -37,10 +42,26 @@
enum DaExpectationError {
#[error("da workload expectation not started")]
NotCaptured,
#[error("missing inscriptions for {missing:?}")]
MissingInscriptions { missing: Vec<ChannelId> },
#[error("missing blobs for {missing:?}")]
MissingBlobs { missing: Vec<ChannelId> },
#[error(
"missing inscriptions: observed={observed}/{planned} required={required} missing={missing:?}"
)]
MissingInscriptions {
planned: usize,
observed: usize,
required: usize,
missing: Vec<ChannelId>,
},
#[error(
"missing blobs: observed_total_blobs={observed_total_blobs} expected_total_blobs={expected_total_blobs} required_blobs={required_blobs} channels_with_blobs={channels_with_blobs}/{planned_channels} missing_channels={missing:?}"
)]
MissingBlobs {
expected_total_blobs: u64,
observed_total_blobs: u64,
required_blobs: u64,
planned_channels: usize,
channels_with_blobs: usize,
missing: Vec<ChannelId>,
},
}
impl DaWorkloadExpectation {
@@ -75,19 +96,42 @@ impl Expectation for DaWorkloadExpectation {
self.headroom_percent,
));
let expected_total_blobs = planned_blob_count(self.blob_rate_per_block, &ctx.run_metrics());
let run_duration = ctx.run_metrics().run_duration();
tracing::info!(
planned_channels = planned_ids.len(),
blob_rate_per_block = self.blob_rate_per_block.get(),
headroom_percent = self.headroom_percent,
expected_total_blobs,
run_duration_secs = run_duration.as_secs(),
"DA inclusion expectation starting capture"
);
let planned = Arc::new(planned_ids.iter().copied().collect::<HashSet<_>>());
let inscriptions = Arc::new(Mutex::new(HashSet::new()));
let blobs = Arc::new(Mutex::new(HashMap::new()));
let run_blocks = Arc::new(AtomicU64::new(0));
{
let run_blocks = Arc::clone(&run_blocks);
let mut receiver = ctx.block_feed().subscribe();
tokio::spawn(async move {
let timer = tokio::time::sleep(run_duration);
tokio::pin!(timer);
loop {
tokio::select! {
_ = &mut timer => break,
result = receiver.recv() => match result {
Ok(_) => {
run_blocks.fetch_add(1, Ordering::Relaxed);
}
Err(broadcast::error::RecvError::Lagged(_)) => {}
Err(broadcast::error::RecvError::Closed) => break,
}
}
}
});
}
let mut receiver = ctx.block_feed().subscribe();
let planned_for_task = Arc::clone(&planned);
@@ -118,7 +162,8 @@ impl Expectation for DaWorkloadExpectation {
planned,
inscriptions,
blobs,
expected_total_blobs,
run_blocks,
run_duration,
});
Ok(())
@@ -140,7 +185,8 @@ impl Expectation for DaWorkloadExpectation {
missing_channels(&state.planned, &inscriptions)
};
let required_inscriptions = minimum_required(planned_total, MIN_INCLUSION_RATIO);
if planned_total.saturating_sub(missing_inscriptions.len()) < required_inscriptions {
let observed_inscriptions = planned_total.saturating_sub(missing_inscriptions.len());
if observed_inscriptions < required_inscriptions {
tracing::warn!(
planned = planned_total,
missing = missing_inscriptions.len(),
@ -148,6 +194,9 @@ impl Expectation for DaWorkloadExpectation {
"DA expectation missing inscriptions"
);
return Err(DaExpectationError::MissingInscriptions {
planned: planned_total,
observed: observed_inscriptions,
required: required_inscriptions,
missing: missing_inscriptions,
}
.into());
@@ -157,16 +206,43 @@ impl Expectation for DaWorkloadExpectation {
let blobs = state.blobs.lock().expect("blob lock poisoned");
blobs.values().sum::<u64>()
};
let required_blobs = minimum_required_u64(state.expected_total_blobs, MIN_INCLUSION_RATIO);
let channels_with_blobs: HashSet<ChannelId> = {
let blobs = state.blobs.lock().expect("blob lock poisoned");
blobs
.iter()
.filter(|(_, count)| **count > 0)
.map(|(channel, _)| *channel)
.collect::<HashSet<_>>()
};
let observed_blocks = state.run_blocks.load(Ordering::Relaxed).max(1);
let expected_total_blobs = self
.blob_rate_per_block
.get()
.saturating_mul(observed_blocks);
let missing_blob_channels = missing_channels(&state.planned, &channels_with_blobs);
let required_blobs = minimum_required_u64(expected_total_blobs, MIN_INCLUSION_RATIO);
if observed_total_blobs < required_blobs {
tracing::warn!(
planned = state.expected_total_blobs,
observed = observed_total_blobs,
required = required_blobs,
expected_total_blobs,
observed_total_blobs,
required_blobs,
observed_blocks,
run_duration_secs = state.run_duration.as_secs(),
missing_blob_channels = missing_blob_channels.len(),
"DA expectation missing blobs"
);
return Err(DaExpectationError::MissingBlobs {
missing: Vec::new(),
expected_total_blobs,
observed_total_blobs,
required_blobs,
planned_channels: planned_total,
channels_with_blobs: channels_with_blobs.len(),
// Best-effort diagnostics: which planned channels never got any
// blob included.
missing: missing_blob_channels,
}
.into());
}
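To make the new blob threshold concrete, here is a small self-contained sketch of the arithmetic the check above performs. The rounding inside minimum_required_u64 and the sample block count are assumptions for illustration, not the crate's exact helper.

// Assumed rounding: the crate's real minimum_required_u64 may round differently.
fn minimum_required_u64(expected: u64, ratio: f64) -> u64 {
    ((expected as f64) * ratio).ceil() as u64
}

fn main() {
    const MIN_INCLUSION_RATIO: f64 = 0.8;
    let blob_rate_per_block = 1u64;
    let observed_blocks = 27u64; // counted from the block feed during the run window
    let expected_total_blobs = blob_rate_per_block.saturating_mul(observed_blocks);
    let required_blobs = minimum_required_u64(expected_total_blobs, MIN_INCLUSION_RATIO);
    assert_eq!(expected_total_blobs, 27);
    assert_eq!(required_blobs, 22); // ceil(0.8 * 27); fewer observed blobs fails the expectation
}

Basing expected_total_blobs on blocks actually observed during the run, rather than on a pre-run estimate, keeps the threshold meaningful when block production is slower than planned.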


@@ -1,18 +1,20 @@
use std::{num::NonZeroU64, sync::Arc, time::Duration};
use async_trait::async_trait;
use ed25519_dalek::SigningKey;
use executor_http_client::ExecutorHttpClient;
use futures::future::try_join_all;
use key_management_system_service::keys::Ed25519PublicKey;
use key_management_system_service::keys::{Ed25519Key, Ed25519PublicKey};
use nomos_core::{
da::BlobId,
mantle::ops::{
Op,
channel::{ChannelId, MsgId},
mantle::{
AuthenticatedMantleTx as _,
ops::{
Op,
channel::{ChannelId, MsgId},
},
},
};
use rand::{Rng as _, RngCore as _, seq::SliceRandom as _, thread_rng};
use rand::{RngCore as _, seq::SliceRandom as _, thread_rng};
use testing_framework_core::{
nodes::ApiClient,
scenario::{
@@ -30,8 +32,7 @@ use crate::{
const TEST_KEY_BYTES: [u8; 32] = [0u8; 32];
const DEFAULT_BLOB_RATE_PER_BLOCK: u64 = 1;
const DEFAULT_CHANNEL_RATE_PER_BLOCK: u64 = 1;
const MIN_BLOB_CHUNKS: usize = 1;
const MAX_BLOB_CHUNKS: usize = 8;
const BLOB_CHUNK_OPTIONS: &[usize] = &[1, 2, 4, 8];
const PUBLISH_RETRIES: usize = 5;
const PUBLISH_RETRY_DELAY: Duration = Duration::from_secs(2);
const DEFAULT_HEADROOM_PERCENT: u64 = 20;
@@ -112,9 +113,8 @@ impl ScenarioWorkload for Workload {
try_join_all(planned_channels.into_iter().map(|channel_id| {
let ctx = ctx;
async move {
let mut receiver = ctx.block_feed().subscribe();
tracing::info!(channel_id = ?channel_id, blobs = per_channel_target, "DA workload starting channel flow");
run_channel_flow(ctx, &mut receiver, channel_id, per_channel_target).await?;
run_channel_flow(ctx, channel_id, per_channel_target).await?;
tracing::info!(channel_id = ?channel_id, "DA workload finished channel flow");
Ok::<(), DynError>(())
}
@@ -128,22 +128,31 @@ impl ScenarioWorkload for Workload {
async fn run_channel_flow(
ctx: &RunContext,
receiver: &mut broadcast::Receiver<Arc<BlockRecord>>,
channel_id: ChannelId,
target_blobs: u64,
) -> Result<(), DynError> {
tracing::debug!(channel_id = ?channel_id, "DA: submitting inscription tx");
let tx = Arc::new(tx::create_inscription_transaction_with_id(channel_id));
submit_transaction_via_cluster(ctx, Arc::clone(&tx)).await?;
let inscription_tx = Arc::new(tx::create_inscription_transaction_with_id(channel_id));
submit_transaction_via_cluster(ctx, Arc::clone(&inscription_tx)).await?;
let mut receiver = ctx.block_feed().subscribe();
let inscription_id = wait_for_inscription(&mut receiver, channel_id).await?;
let inscription_id = wait_for_inscription(receiver, channel_id).await?;
tracing::debug!(channel_id = ?channel_id, inscription_id = ?inscription_id, "DA: inscription observed");
let mut parent_id = inscription_id;
for idx in 0..target_blobs {
let payload = random_blob_payload();
let published_blob_id = publish_blob(ctx, channel_id, parent_id, payload).await?;
let (next_parent, included_blob_id) =
wait_for_blob_with_parent(&mut receiver, channel_id, parent_id).await?;
parent_id = next_parent;
for _ in 0..target_blobs {
let blob_id = publish_blob(ctx, channel_id, parent_id).await?;
tracing::debug!(channel_id = ?channel_id, blob_id = ?blob_id, "DA: blob published");
parent_id = wait_for_blob(receiver, channel_id, blob_id).await?;
tracing::debug!(
channel_id = ?channel_id,
blob_index = idx,
published_blob_id = ?published_blob_id,
included_blob_id = ?included_blob_id,
"DA: blob published"
);
}
Ok(())
}
@@ -164,22 +173,32 @@ async fn wait_for_inscription(
.await
}
async fn wait_for_blob(
async fn wait_for_blob_with_parent(
receiver: &mut broadcast::Receiver<Arc<BlockRecord>>,
channel_id: ChannelId,
blob_id: BlobId,
) -> Result<MsgId, DynError> {
wait_for_channel_op(receiver, move |op| {
if let Op::ChannelBlob(blob_op) = op
&& blob_op.channel == channel_id
&& blob_op.blob == blob_id
{
Some(blob_op.id())
} else {
None
parent_msg: MsgId,
) -> Result<(MsgId, BlobId), DynError> {
loop {
match receiver.recv().await {
Ok(record) => {
for tx in record.block.transactions() {
for op in &tx.mantle_tx().ops {
if let Op::ChannelBlob(blob_op) = op
&& blob_op.channel == channel_id
&& blob_op.parent == parent_msg
{
let msg_id = blob_op.id();
return Ok((msg_id, blob_op.blob));
}
}
}
}
Err(broadcast::error::RecvError::Lagged(_)) => {}
Err(broadcast::error::RecvError::Closed) => {
return Err("block feed closed while waiting for channel operations".into());
}
}
})
.await
}
}
async fn wait_for_channel_op<F>(
@ -209,16 +228,14 @@ async fn publish_blob(
ctx: &RunContext,
channel_id: ChannelId,
parent_msg: MsgId,
data: Vec<u8>,
) -> Result<BlobId, DynError> {
let executors = ctx.node_clients().executor_clients();
if executors.is_empty() {
return Err("da workload requires at least one executor".into());
}
let signer: Ed25519PublicKey = SigningKey::from_bytes(&TEST_KEY_BYTES)
.verifying_key()
.into();
let data = random_blob_payload();
let signer = test_signer();
tracing::debug!(channel = ?channel_id, payload_bytes = data.len(), "DA: prepared blob payload");
let client = ExecutorHttpClient::new(None);
@@ -248,9 +265,17 @@ async fn publish_blob(
Err(last_err.unwrap_or_else(|| "da workload could not publish blob".into()))
}
fn test_signer() -> Ed25519PublicKey {
Ed25519Key::from_bytes(&TEST_KEY_BYTES).public_key()
}
fn random_blob_payload() -> Vec<u8> {
let mut rng = thread_rng();
let chunks = rng.gen_range(MIN_BLOB_CHUNKS..=MAX_BLOB_CHUNKS);
// KZGRS encoder expects the polynomial degree to be a power of two, which
// effectively constrains the blob chunk count.
let chunks = *BLOB_CHUNK_OPTIONS
.choose(&mut rng)
.expect("non-empty chunk options");
let mut data = vec![0u8; 31 * chunks];
rng.fill_bytes(&mut data);
data
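A tiny sketch of the payload sizes these chunk options imply, assuming (as the code above does) 31 bytes per chunk; it is illustrative only and does not touch the KZGRS encoder itself.

fn main() {
    // Power-of-two chunk counts matching BLOB_CHUNK_OPTIONS, 31 bytes per chunk.
    for chunks in [1usize, 2, 4, 8] {
        println!("{chunks} chunk(s) -> {} byte payload", 31 * chunks);
    }
    // Prints payload sizes of 31, 62, 124 and 248 bytes.
}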


@@ -7,6 +7,7 @@ use nomos_core::{
ops::{Op, channel::MsgId},
},
};
use rand::{seq::SliceRandom as _, thread_rng};
use testing_framework_core::scenario::{DynError, RunContext};
use tracing::debug;
@@ -38,22 +39,35 @@ pub async fn submit_transaction_via_cluster(
tx: Arc<SignedMantleTx>,
) -> Result<(), DynError> {
let tx_hash = tx.hash();
debug!(?tx_hash, "submitting transaction via cluster");
ctx.cluster_client()
.try_all_clients(|client| {
let tx = Arc::clone(&tx);
Box::pin(async move {
let url = client.base_url().clone();
debug!(?tx_hash, %url, "submitting transaction to client");
let res = client
.submit_transaction(&tx)
.await
.map_err(|err| -> DynError { err.into() });
if res.is_err() {
debug!(?tx_hash, %url, "transaction submission failed");
}
res
})
})
.await
debug!(
?tx_hash,
"submitting transaction via cluster (validators first)"
);
let node_clients = ctx.node_clients();
let mut validator_clients: Vec<_> = node_clients.validator_clients().iter().collect();
let mut executor_clients: Vec<_> = node_clients.executor_clients().iter().collect();
validator_clients.shuffle(&mut thread_rng());
executor_clients.shuffle(&mut thread_rng());
let clients = validator_clients.into_iter().chain(executor_clients);
let mut last_err = None;
for client in clients {
let url = client.base_url().clone();
debug!(?tx_hash, %url, "submitting transaction to client");
match client
.submit_transaction(&tx)
.await
.map_err(|err| -> DynError { err.into() })
{
Ok(()) => return Ok(()),
Err(err) => {
debug!(?tx_hash, %url, "transaction submission failed");
last_err = Some(err);
}
}
}
Err(last_err.unwrap_or_else(|| "cluster client exhausted all nodes".into()))
}