diff --git a/examples/src/bin/k8s_runner.rs b/examples/src/bin/k8s_runner.rs index 087a764..a666d1f 100644 --- a/examples/src/bin/k8s_runner.rs +++ b/examples/src/bin/k8s_runner.rs @@ -13,8 +13,8 @@ const DEFAULT_RUN_SECS: u64 = 60; const DEFAULT_VALIDATORS: usize = 1; const DEFAULT_EXECUTORS: usize = 1; const MIXED_TXS_PER_BLOCK: u64 = 2; -const TOTAL_WALLETS: usize = 1000; -const TRANSACTION_WALLETS: usize = 500; +const TOTAL_WALLETS: usize = 200; +const TRANSACTION_WALLETS: usize = 50; const DA_BLOB_RATE: u64 = 1; const MIN_CONSENSUS_HEIGHT: u64 = 5; @@ -49,10 +49,6 @@ async fn run_k8s_case(validators: usize, executors: usize, run_duration: Duratio duration_secs = run_duration.as_secs(), "building scenario plan" ); - let enable_da = env::var("NOMOS_DEMO_DA") - .or_else(|_| env::var("K8S_DEMO_DA")) - .map(|value| value == "1" || value.eq_ignore_ascii_case("true")) - .unwrap_or(false); let mut scenario = ScenarioBuilder::topology_with(|t| { t.network_star().validators(validators).executors(executors) @@ -60,12 +56,9 @@ async fn run_k8s_case(validators: usize, executors: usize, run_duration: Duratio .with_capabilities(ObservabilityCapability::default()) .wallets(TOTAL_WALLETS) .transactions_with(|txs| txs.rate(MIXED_TXS_PER_BLOCK).users(TRANSACTION_WALLETS)) + .da_with(|da| da.blob_rate(DA_BLOB_RATE).headroom_percent(0)) .with_run_duration(run_duration); - if enable_da { - scenario = scenario.da_with(|da| da.blob_rate(DA_BLOB_RATE).headroom_percent(0)); - } - if let Ok(url) = env::var("K8S_RUNNER_METRICS_QUERY_URL") .or_else(|_| env::var("NOMOS_METRICS_QUERY_URL")) .or_else(|_| env::var("K8S_RUNNER_EXTERNAL_PROMETHEUS_URL")) diff --git a/testing-framework/workflows/src/workloads/da/expectation.rs b/testing-framework/workflows/src/workloads/da/expectation.rs index 4e78eb0..a340655 100644 --- a/testing-framework/workflows/src/workloads/da/expectation.rs +++ b/testing-framework/workflows/src/workloads/da/expectation.rs @@ -171,102 +171,163 @@ impl Expectation for DaWorkloadExpectation { } async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> { - let state = self - .capture_state + let state = self.capture_state()?; + self.evaluate_inscriptions(state)?; + self.evaluate_blobs(ctx, state)?; + Ok(()) + } +} + +#[derive(Debug, Clone, Copy)] +struct BlockWindow { + observed_blocks: u64, + expected_blocks: u64, + effective_blocks: u64, +} + +#[derive(Debug)] +struct BlobObservation { + observed_total_blobs: u64, + channels_with_blobs: HashSet, +} + +impl DaWorkloadExpectation { + fn capture_state(&self) -> Result<&CaptureState, DynError> { + self.capture_state .as_ref() .ok_or(DaExpectationError::NotCaptured) - .map_err(DynError::from)?; + .map_err(DynError::from) + } + fn evaluate_inscriptions(&self, state: &CaptureState) -> Result<(), DynError> { let planned_total = state.planned.len(); - let missing_inscriptions = { - let inscriptions = state - .inscriptions - .lock() - .expect("inscription lock poisoned"); - missing_channels(&state.planned, &inscriptions) - }; + let missing_inscriptions = self.missing_inscriptions(state); let required_inscriptions = minimum_required(planned_total, MIN_INSCRIPTION_INCLUSION_RATIO); let observed_inscriptions = planned_total.saturating_sub(missing_inscriptions.len()); - if observed_inscriptions < required_inscriptions { - tracing::warn!( - planned = planned_total, - missing = missing_inscriptions.len(), - required = required_inscriptions, - "DA expectation missing inscriptions" - ); - return Err(DaExpectationError::MissingInscriptions { - planned: planned_total, - observed: observed_inscriptions, - required: required_inscriptions, - missing: missing_inscriptions, - } - .into()); + + if observed_inscriptions >= required_inscriptions { + return Ok(()); } - let observed_total_blobs = { - let blobs = state.blobs.lock().expect("blob lock poisoned"); - blobs.values().sum::() - }; + tracing::warn!( + planned = planned_total, + missing = missing_inscriptions.len(), + required = required_inscriptions, + "DA expectation missing inscriptions" + ); + Err(DaExpectationError::MissingInscriptions { + planned: planned_total, + observed: observed_inscriptions, + required: required_inscriptions, + missing: missing_inscriptions, + } + .into()) + } - let channels_with_blobs: HashSet = { - let blobs = state.blobs.lock().expect("blob lock poisoned"); - blobs - .iter() - .filter(|(_, count)| **count > 0) - .map(|(channel, _)| *channel) - .collect::>() - }; + fn evaluate_blobs(&self, ctx: &RunContext, state: &CaptureState) -> Result<(), DynError> { + let planned_total = state.planned.len(); + let BlobObservation { + observed_total_blobs, + channels_with_blobs, + } = self.observe_blobs(state); - let observed_blocks = state.run_blocks.load(Ordering::Relaxed).max(1); - let expected_blocks = ctx.run_metrics().expected_consensus_blocks().max(1); - let effective_blocks = observed_blocks.min(expected_blocks).max(1); - let expected_total_blobs = self - .blob_rate_per_block - .get() - .saturating_mul(effective_blocks); + let window = self.block_window(ctx, state); + let expected_total_blobs = self.expected_total_blobs(window.effective_blocks); + let required_blobs = minimum_required_u64(expected_total_blobs, MIN_BLOB_INCLUSION_RATIO); + + if observed_total_blobs >= required_blobs { + tracing::info!( + planned_channels = planned_total, + channels_with_blobs = channels_with_blobs.len(), + inscriptions_observed = planned_total - self.missing_inscriptions(state).len(), + observed_total_blobs, + expected_total_blobs, + required_blobs, + observed_blocks = window.observed_blocks, + expected_blocks = window.expected_blocks, + effective_blocks = window.effective_blocks, + "DA inclusion expectation satisfied" + ); + return Ok(()); + } let missing_blob_channels = missing_channels(&state.planned, &channels_with_blobs); - let required_blobs = minimum_required_u64(expected_total_blobs, MIN_BLOB_INCLUSION_RATIO); - if observed_total_blobs < required_blobs { - tracing::warn!( - expected_total_blobs, - observed_total_blobs, - required_blobs, - observed_blocks, - expected_blocks, - effective_blocks, - run_duration_secs = state.run_duration.as_secs(), - missing_blob_channels = missing_blob_channels.len(), - "DA expectation missing blobs" - ); - return Err(DaExpectationError::MissingBlobs { - expected_total_blobs, - observed_total_blobs, - required_blobs, - planned_channels: planned_total, - channels_with_blobs: channels_with_blobs.len(), - // Best-effort diagnostics: which planned channels never got any - // blob included. - missing: missing_blob_channels, - } - .into()); - } - - tracing::info!( - planned_channels = planned_total, - channels_with_blobs = channels_with_blobs.len(), - inscriptions_observed = planned_total - missing_inscriptions.len(), - observed_total_blobs, + tracing::warn!( expected_total_blobs, + observed_total_blobs, required_blobs, + observed_blocks = window.observed_blocks, + expected_blocks = window.expected_blocks, + effective_blocks = window.effective_blocks, + run_duration_secs = state.run_duration.as_secs(), + missing_blob_channels = missing_blob_channels.len(), + "DA expectation missing blobs" + ); + + Err(DaExpectationError::MissingBlobs { + expected_total_blobs, + observed_total_blobs, + required_blobs, + planned_channels: planned_total, + channels_with_blobs: channels_with_blobs.len(), + missing: missing_blob_channels, + } + .into()) + } + + fn missing_inscriptions(&self, state: &CaptureState) -> Vec { + let inscriptions = state + .inscriptions + .lock() + .expect("inscription lock poisoned"); + missing_channels(&state.planned, &inscriptions) + } + + fn observe_blobs(&self, state: &CaptureState) -> BlobObservation { + let blobs = state.blobs.lock().expect("blob lock poisoned"); + let observed_total_blobs = blobs.values().sum::(); + let channels_with_blobs = blobs + .iter() + .filter(|(_, count)| **count > 0) + .map(|(channel, _)| *channel) + .collect::>(); + + BlobObservation { + observed_total_blobs, + channels_with_blobs, + } + } + + fn block_window(&self, ctx: &RunContext, state: &CaptureState) -> BlockWindow { + let observed_blocks = state.run_blocks.load(Ordering::Relaxed).max(1); + let expected_blocks = ctx.run_metrics().expected_consensus_blocks().max(1); + let security_param = u64::from( + ctx.descriptors() + .config() + .consensus_params + .security_param + .get(), + ) + .max(1); + + let observed_inclusion_blocks = (observed_blocks / security_param).max(1); + let expected_inclusion_blocks = (expected_blocks / security_param).max(1); + let effective_blocks = observed_inclusion_blocks + .min(expected_inclusion_blocks) + .max(1); + + BlockWindow { observed_blocks, expected_blocks, effective_blocks, - "DA inclusion expectation satisfied" - ); + } + } - Ok(()) + fn expected_total_blobs(&self, effective_blocks: u64) -> u64 { + self.blob_rate_per_block + .get() + .saturating_mul(effective_blocks) } } diff --git a/testing-framework/workflows/src/workloads/da/workload.rs b/testing-framework/workflows/src/workloads/da/workload.rs index c1267ec..5f919aa 100644 --- a/testing-framework/workflows/src/workloads/da/workload.rs +++ b/testing-framework/workflows/src/workloads/da/workload.rs @@ -17,9 +17,7 @@ use nomos_core::{ use rand::{RngCore as _, seq::SliceRandom as _, thread_rng}; use testing_framework_core::{ nodes::ApiClient, - scenario::{ - BlockRecord, DynError, Expectation, RunContext, RunMetrics, Workload as ScenarioWorkload, - }, + scenario::{BlockRecord, DynError, Expectation, RunContext, Workload as ScenarioWorkload}, }; use tokio::{sync::broadcast, time::sleep}; @@ -96,7 +94,16 @@ impl ScenarioWorkload for Workload { self.headroom_percent, )); - let expected_blobs = planned_blob_count(self.blob_rate_per_block, &ctx.run_metrics()); + let expected_blobs = planned_blob_count( + self.blob_rate_per_block, + ctx.run_metrics().expected_consensus_blocks(), + ctx.descriptors() + .config() + .consensus_params + .security_param + .get() + .into(), + ); let per_channel_target = per_channel_blob_target(expected_blobs, planned_channels.len().max(1) as u64); @@ -303,9 +310,15 @@ pub fn planned_channel_count(channel_rate_per_block: NonZeroU64, headroom_percen } #[must_use] -pub fn planned_blob_count(blob_rate_per_block: NonZeroU64, run_metrics: &RunMetrics) -> u64 { - let expected_blocks = run_metrics.expected_consensus_blocks().max(1); - blob_rate_per_block.get().saturating_mul(expected_blocks) +pub fn planned_blob_count( + blob_rate_per_block: NonZeroU64, + expected_consensus_blocks: u64, + security_param: u64, +) -> u64 { + let expected_blocks = expected_consensus_blocks.max(1); + let security_param = security_param.max(1); + let inclusion_blocks = (expected_blocks / security_param).max(1); + blob_rate_per_block.get().saturating_mul(inclusion_blocks) } #[must_use] diff --git a/testing-framework/workflows/src/workloads/transaction/expectation.rs b/testing-framework/workflows/src/workloads/transaction/expectation.rs index 7a5ba00..53e20a7 100644 --- a/testing-framework/workflows/src/workloads/transaction/expectation.rs +++ b/testing-framework/workflows/src/workloads/transaction/expectation.rs @@ -5,6 +5,7 @@ use std::{ Arc, atomic::{AtomicU64, Ordering}, }, + time::Duration, }; use async_trait::async_trait; @@ -12,11 +13,13 @@ use key_management_system_service::keys::ZkPublicKey; use nomos_core::{header::HeaderId, mantle::AuthenticatedMantleTx as _}; use testing_framework_core::scenario::{DynError, Expectation, RunContext}; use thiserror::Error; -use tokio::sync::broadcast; +use tokio::{sync::broadcast, time::sleep}; use super::workload::{limited_user_count, submission_plan}; const MIN_INCLUSION_RATIO: f64 = 0.5; +const CATCHUP_POLL_INTERVAL: Duration = Duration::from_secs(1); +const MAX_CATCHUP_WAIT: Duration = Duration::from_secs(60); #[derive(Clone)] pub struct TxInclusionExpectation { @@ -142,21 +145,37 @@ impl Expectation for TxInclusionExpectation { Ok(()) } - async fn evaluate(&mut self, _ctx: &RunContext) -> Result<(), DynError> { + async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> { let state = self .capture_state .as_ref() .ok_or(TxExpectationError::NotCaptured)?; - let observed = state.observed.load(Ordering::Relaxed); let required = ((state.expected as f64) * MIN_INCLUSION_RATIO).ceil() as u64; + let mut observed = state.observed.load(Ordering::Relaxed); + if observed < required { + let security_param = ctx.descriptors().config().consensus_params.security_param; + let hinted_wait = ctx + .run_metrics() + .block_interval_hint() + .map(|interval| interval.mul_f64(security_param.get() as f64)); + + let mut remaining = hinted_wait + .unwrap_or(MAX_CATCHUP_WAIT) + .min(MAX_CATCHUP_WAIT); + while observed < required && remaining > Duration::ZERO { + sleep(CATCHUP_POLL_INTERVAL).await; + remaining = remaining.saturating_sub(CATCHUP_POLL_INTERVAL); + observed = state.observed.load(Ordering::Relaxed); + } + } + if observed >= required { tracing::info!( observed, required, expected = state.expected, - min_inclusion_ratio = MIN_INCLUSION_RATIO, "tx inclusion expectation satisfied" ); Ok(()) diff --git a/testing-framework/workflows/src/workloads/transaction/workload.rs b/testing-framework/workflows/src/workloads/transaction/workload.rs index 536c91e..7d30fdb 100644 --- a/testing-framework/workflows/src/workloads/transaction/workload.rs +++ b/testing-framework/workflows/src/workloads/transaction/workload.rs @@ -29,6 +29,8 @@ use tokio::time::sleep; use super::expectation::TxInclusionExpectation; use crate::workloads::util::submit_transaction_via_cluster; +const MAX_SUBMISSION_INTERVAL: Duration = Duration::from_secs(1); + #[derive(Clone)] pub struct Workload { txs_per_block: NonZeroU64, @@ -304,8 +306,11 @@ pub(super) fn submission_plan( return Err("Transaction workload planning failed: calculated zero transactions to submit based on run duration and target rate".into()); } - let submission_interval = + let mut submission_interval = Duration::from_secs_f64(run_secs / actual_transactions_to_submit as f64); + if submission_interval > MAX_SUBMISSION_INTERVAL { + submission_interval = MAX_SUBMISSION_INTERVAL; + } Ok(SubmissionPlan { transaction_count: actual_transactions_to_submit, submission_interval,