Remove DA

Remove DA workload usage from framework
Andrus Salumets 2026-01-22 11:32:47 +01:00 committed by GitHub
commit 28b788eaac
28 changed files with 27 additions and 972 deletions

View File

@@ -45,7 +45,7 @@ jobs:
sudo apt-get install -y clang llvm-dev libclang-dev pkg-config cmake libssl-dev rsync libgmp10 libgmp-dev libgomp1 nasm
- name: Install nomos circuits
run: |
./scripts/setup/setup-nomos-circuits.sh "${VERSION}" "$NOMOS_CIRCUITS"
./scripts/setup/setup-logos-blockchain-circuits.sh "${VERSION}" "$NOMOS_CIRCUITS"
echo "NOMOS_CIRCUITS=$NOMOS_CIRCUITS" >> "$GITHUB_ENV"
- name: Add top-level KZG params file
run: |

View File

@@ -37,7 +37,7 @@ jobs:
: "${NOMOS_BUNDLE_VERSION:?Missing NOMOS_BUNDLE_VERSION}"
- name: Install nomos circuits
run: |
./scripts/setup/setup-nomos-circuits.sh "${VERSION}" "$HOME/.nomos-circuits"
./scripts/setup/setup-logos-blockchain-circuits.sh "${VERSION}" "$HOME/.nomos-circuits"
echo "NOMOS_CIRCUITS=$HOME/.nomos-circuits" >> "$GITHUB_ENV"
- uses: dtolnay/rust-toolchain@master
with:
@@ -78,7 +78,7 @@ jobs:
: "${NOMOS_BUNDLE_VERSION:?Missing NOMOS_BUNDLE_VERSION}"
- name: Install nomos circuits
run: |
./scripts/setup/setup-nomos-circuits.sh "${VERSION}" "$HOME/.nomos-circuits"
./scripts/setup/setup-logos-blockchain-circuits.sh "${VERSION}" "$HOME/.nomos-circuits"
echo "NOMOS_CIRCUITS=$HOME/.nomos-circuits" >> "$GITHUB_ENV"
- uses: dtolnay/rust-toolchain@master
with:
@@ -119,7 +119,7 @@ jobs:
: "${NOMOS_BUNDLE_VERSION:?Missing NOMOS_BUNDLE_VERSION}"
- name: Install nomos circuits
run: |
./scripts/setup/setup-nomos-circuits.sh "${VERSION}" "$HOME/.nomos-circuits"
./scripts/setup/setup-logos-blockchain-circuits.sh "${VERSION}" "$HOME/.nomos-circuits"
echo "NOMOS_CIRCUITS=$HOME/.nomos-circuits" >> "$GITHUB_ENV"
- uses: dtolnay/rust-toolchain@master
with:
@@ -154,7 +154,7 @@ jobs:
: "${NOMOS_BUNDLE_VERSION:?Missing NOMOS_BUNDLE_VERSION}"
- name: Install nomos circuits
run: |
./scripts/setup/setup-nomos-circuits.sh "${VERSION}" "$HOME/.nomos-circuits"
./scripts/setup/setup-logos-blockchain-circuits.sh "${VERSION}" "$HOME/.nomos-circuits"
echo "NOMOS_CIRCUITS=$HOME/.nomos-circuits" >> "$GITHUB_ENV"
- uses: dtolnay/rust-toolchain@master
with:
@@ -220,7 +220,7 @@ jobs:
: "${NOMOS_BUNDLE_VERSION:?Missing NOMOS_BUNDLE_VERSION}"
- name: Install nomos circuits
run: |
./scripts/setup/setup-nomos-circuits.sh "${VERSION}" "$HOME/.nomos-circuits"
./scripts/setup/setup-logos-blockchain-circuits.sh "${VERSION}" "$HOME/.nomos-circuits"
echo "NOMOS_CIRCUITS=$HOME/.nomos-circuits" >> "$GITHUB_ENV"
- uses: dtolnay/rust-toolchain@master
with:

Cargo.lock (generated)
View File

@@ -7101,10 +7101,8 @@ name = "testing-framework-workflows"
version = "0.1.0"
dependencies = [
"async-trait",
"futures",
"logos-blockchain-chain-service",
"logos-blockchain-core",
"logos-blockchain-executor-http-client",
"logos-blockchain-key-management-system-service",
"rand 0.8.5",
"reqwest",

View File

@@ -9,7 +9,6 @@ pub fn scenario_plan() -> SnippetResult<Scenario<()>> {
ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(2))
.wallets(50)
.transactions_with(|txs| txs.rate(5).users(20))
.da_with(|da| da.channel_rate(1).blob_rate(2))
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(90))
.build()

View File

@@ -12,11 +12,6 @@ pub async fn run_test() -> Result<()> {
txs.rate(5) // 5 transactions per block
.users(20)
})
.da_with(|da| {
da.channel_rate(1) // number of DA channels
.blob_rate(2) // target 2 blobs per block
.headroom_percent(20) // optional channel headroom
})
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(90))
.build()?;

View File

@@ -1,15 +0,0 @@
use testing_framework_core::scenario::{Scenario, ScenarioBuilder};
use testing_framework_workflows::ScenarioBuilderExt;
use crate::SnippetResult;
pub fn da_plan() -> SnippetResult<Scenario<()>> {
ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(1))
.wallets(50)
.da_with(|da| {
da.channel_rate(1) // number of DA channels to run
.blob_rate(2) // target 2 blobs per block (headroom applied)
.headroom_percent(20) // optional headroom when sizing channels
}) // Finish DA workload config
.build()
}
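With the DA cheat-sheet snippet removed, the remaining builder surface for a snippet like this is topology, wallets, and transactions. A minimal sketch of an equivalent DA-free snippet (the function name is hypothetical, mirroring the other cheat-sheet entries):

use testing_framework_core::scenario::{Scenario, ScenarioBuilder};
use testing_framework_workflows::ScenarioBuilderExt;

use crate::SnippetResult;

// Hypothetical replacement snippet: same topology and wallet seeding, with
// transactions as the only traffic workload now that `.da_with(...)` is gone.
pub fn minimal_plan() -> SnippetResult<Scenario<()>> {
    ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(1))
        .wallets(50)
        .transactions_with(|txs| txs.rate(5).users(20)) // 5 tx/block across 20 users
        .expect_consensus_liveness()
        .build()
}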

View File

@@ -9,7 +9,6 @@ pub async fn sustained_load_test() -> Result<()> {
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(4).executors(2))
.wallets(100)
.transactions_with(|txs| txs.rate(15).users(50))
.da_with(|da| da.channel_rate(2).blob_rate(3))
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(300))
.build()?;

View File

@@ -5,11 +5,10 @@ use testing_framework_core::scenario::{Deployer, ScenarioBuilder};
use testing_framework_runner_local::LocalDeployer;
use testing_framework_workflows::ScenarioBuilderExt;
pub async fn da_and_transactions() -> Result<()> {
pub async fn transactions_multi_node() -> Result<()> {
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(2))
.wallets(30)
.transactions_with(|txs| txs.rate(5).users(15))
.da_with(|da| da.channel_rate(2).blob_rate(2))
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(90))
.build()?;

View File

@@ -16,7 +16,6 @@ mod dsl_cheat_sheet_topology;
mod dsl_cheat_sheet_transactions_workload;
mod dsl_cheat_sheet_wallets;
mod dsl_cheat_sheet_workload_chaos;
mod dsl_cheat_sheet_workload_da;
mod dsl_cheat_sheet_workload_execution;
mod examples_advanced_aggressive_chaos_test;
mod examples_advanced_load_progression_test;

View File

@@ -6,18 +6,13 @@ use testing_framework_runner_local::LocalDeployer;
use testing_framework_workflows::ScenarioBuilderExt;
pub async fn run_local_demo() -> Result<()> {
// Define the scenario (1 validator + 1 executor, tx + DA workload)
// Define the scenario (1 validator + 1 executor, tx workload)
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(1))
.wallets(1_000)
.transactions_with(|txs| {
txs.rate(5) // 5 transactions per block
.users(500) // use 500 of the seeded wallets
})
.da_with(|da| {
da.channel_rate(1) // 1 channel
.blob_rate(1) // target 1 blob per block
.headroom_percent(20) // default headroom when sizing channels
})
.expect_consensus_liveness()
.with_run_duration(Duration::from_secs(60))
.build()?;

View File

@@ -4,6 +4,6 @@ pub fn step_1_topology() -> testing_framework_core::scenario::Builder<()> {
ScenarioBuilder::topology_with(|t| {
t.network_star() // Star topology: all nodes connect to seed
.validators(1) // 1 validator node
.executors(1) // 1 executor node (validator + DA dispersal)
.executors(1) // 1 executor node
})
}

View File

@@ -8,9 +8,4 @@ pub fn step_3_workloads() -> testing_framework_core::scenario::Builder<()> {
txs.rate(5) // 5 transactions per block
.users(500) // Use 500 of the 1,000 wallets
})
.da_with(|da| {
da.channel_rate(1) // 1 DA channel (more spawned with headroom)
.blob_rate(1) // target 1 blob per block
.headroom_percent(20) // default headroom when sizing channels
})
}

View File

@@ -61,7 +61,7 @@ async fn run_local_case(validators: usize, executors: usize, run_duration: Durat
let mut plan = scenario.build()?;
let deployer = LocalDeployer::default().with_membership_check(true);
let deployer = LocalDeployer::default();
info!("deploying local nodes");
let runner: Runner = deployer

View File

@@ -145,6 +145,10 @@ impl GeneratedTopology {
wait_for_network_readiness(self, &client, &endpoints, &labels).await?;
if validator_membership_endpoints.is_none() && executor_membership_endpoints.is_none() {
return Ok(());
}
let membership_endpoints = collect_membership_endpoints(
self,
total_nodes,

View File

@@ -160,24 +160,8 @@ pub async fn ensure_remote_readiness_with_ports(
.map(|ports| readiness_url(HttpNodeRole::Executor, ports.api))
.collect::<Result<Vec<_>, _>>()?;
let validator_membership_urls = mapping
.validators
.iter()
.map(|ports| readiness_url(HttpNodeRole::Validator, ports.testing))
.collect::<Result<Vec<_>, _>>()?;
let executor_membership_urls = mapping
.executors
.iter()
.map(|ports| readiness_url(HttpNodeRole::Executor, ports.testing))
.collect::<Result<Vec<_>, _>>()?;
descriptors
.wait_remote_readiness(
&validator_urls,
&executor_urls,
Some(&validator_membership_urls),
Some(&executor_membership_urls),
)
.wait_remote_readiness(&validator_urls, &executor_urls, None, None)
.await
.map_err(|source| StackReadinessError::Remote { source })
}

View File

@@ -15,9 +15,7 @@ use tracing::{debug, info};
/// Spawns validators and executors as local processes, reusing the existing
/// integration harness.
#[derive(Clone)]
pub struct LocalDeployer {
membership_check: bool,
}
pub struct LocalDeployer {}
/// Errors surfaced by the local deployer while driving a scenario.
#[derive(Debug, Error)]
@@ -63,10 +61,9 @@ impl Deployer<()> for LocalDeployer {
info!(
validators = scenario.topology().validators().len(),
executors = scenario.topology().executors().len(),
membership_checks = self.membership_check,
"starting local deployment"
);
let topology = Self::prepare_topology(scenario, self.membership_check).await?;
let topology = Self::prepare_topology(scenario).await?;
let node_clients = NodeClients::from_topology(scenario.topology(), &topology);
let (block_feed, block_feed_guard) = spawn_block_feed_with(&node_clients).await?;
@@ -87,22 +84,12 @@ impl Deployer<()> for LocalDeployer {
impl LocalDeployer {
#[must_use]
/// Construct with membership readiness checks enabled.
/// Construct a local deployer.
pub fn new() -> Self {
Self::default()
}
#[must_use]
/// Enable or disable membership readiness probes.
pub const fn with_membership_check(mut self, enabled: bool) -> Self {
self.membership_check = enabled;
self
}
async fn prepare_topology(
scenario: &Scenario<()>,
membership_check: bool,
) -> Result<Topology, LocalDeployerError> {
async fn prepare_topology(scenario: &Scenario<()>) -> Result<Topology, LocalDeployerError> {
let descriptors = scenario.topology();
info!(
validators = descriptors.validators().len(),
@@ -115,13 +102,10 @@ impl LocalDeployer {
.await
.map_err(|source| LocalDeployerError::Spawn { source })?;
let skip_membership = !membership_check;
wait_for_readiness(&topology, skip_membership)
.await
.map_err(|source| {
debug!(error = ?source, "local readiness failed");
LocalDeployerError::ReadinessFailed { source }
})?;
wait_for_readiness(&topology).await.map_err(|source| {
debug!(error = ?source, "local readiness failed");
LocalDeployerError::ReadinessFailed { source }
})?;
info!("local nodes are ready");
Ok(topology)
@@ -130,27 +114,14 @@ impl LocalDeployer {
impl Default for LocalDeployer {
fn default() -> Self {
Self {
membership_check: true,
}
Self {}
}
}
async fn wait_for_readiness(
topology: &Topology,
skip_membership: bool,
) -> Result<(), ReadinessError> {
async fn wait_for_readiness(topology: &Topology) -> Result<(), ReadinessError> {
info!("waiting for local network readiness");
topology.wait_network_ready().await?;
if skip_membership {
// Allow callers to bypass deeper readiness for lightweight demos.
return Ok(());
}
info!("waiting for membership readiness");
topology.wait_membership_ready().await?;
info!("waiting for DA balancer readiness");
topology.wait_da_balancer_ready().await
Ok(())
}
async fn spawn_block_feed_with(
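With the membership toggle gone, constructing the local deployer reduces to the default constructor. A minimal sketch (the helper function name is hypothetical):

use testing_framework_runner_local::LocalDeployer;

// Sketch: `with_membership_check(...)` no longer exists, so both constructors
// below are equivalent, and readiness inside the deployer now only waits for
// network readiness (no membership or DA-balancer probes).
fn make_local_deployer() -> LocalDeployer {
    LocalDeployer::new() // same as LocalDeployer::default()
}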

View File

@@ -15,8 +15,6 @@ workspace = true
[dependencies]
async-trait = "0.1"
chain-service = { workspace = true }
executor-http-client = { workspace = true }
futures = "0.3"
key-management-system-service = { workspace = true }
nomos-core = { workspace = true }
rand = { workspace = true }

View File

@@ -10,7 +10,7 @@ use testing_framework_core::{
use crate::{
expectations::ConsensusLiveness,
workloads::{chaos::RandomRestartWorkload, da, transaction},
workloads::{chaos::RandomRestartWorkload, transaction},
};
#[derive(Debug, thiserror::Error)]
@@ -35,15 +35,6 @@ pub trait ScenarioBuilderExt<Caps>: Sized {
self,
f: impl FnOnce(TransactionFlowBuilder<Caps>) -> TransactionFlowBuilder<Caps>,
) -> CoreScenarioBuilder<Caps>;
/// Configure a data-availability workload.
fn da(self) -> DataAvailabilityFlowBuilder<Caps>;
/// Configure a data-availability workload via closure.
fn da_with(
self,
f: impl FnOnce(DataAvailabilityFlowBuilder<Caps>) -> DataAvailabilityFlowBuilder<Caps>,
) -> CoreScenarioBuilder<Caps>;
#[must_use]
/// Attach a consensus liveness expectation.
fn expect_consensus_liveness(self) -> Self;
@@ -65,17 +56,6 @@ impl<Caps> ScenarioBuilderExt<Caps> for CoreScenarioBuilder<Caps> {
f(self.transactions()).apply()
}
fn da(self) -> DataAvailabilityFlowBuilder<Caps> {
DataAvailabilityFlowBuilder::new(self)
}
fn da_with(
self,
f: impl FnOnce(DataAvailabilityFlowBuilder<Caps>) -> DataAvailabilityFlowBuilder<Caps>,
) -> CoreScenarioBuilder<Caps> {
f(self.da()).apply()
}
fn expect_consensus_liveness(self) -> Self {
self.with_expectation(ConsensusLiveness::default())
}
@@ -498,112 +478,6 @@ impl<Caps> TransactionFlowBuilder<Caps> {
}
}
/// Builder for data availability workloads.
pub struct DataAvailabilityFlowBuilder<Caps> {
builder: CoreScenarioBuilder<Caps>,
channel_rate: NonZeroU64,
blob_rate: NonZeroU64,
headroom_percent: u64,
}
impl<Caps> DataAvailabilityFlowBuilder<Caps> {
const fn default_channel_rate() -> NonZeroU64 {
NonZeroU64::MIN
}
const fn default_blob_rate() -> NonZeroU64 {
NonZeroU64::MIN
}
const fn new(builder: CoreScenarioBuilder<Caps>) -> Self {
Self {
builder,
channel_rate: Self::default_channel_rate(),
blob_rate: Self::default_blob_rate(),
headroom_percent: da::Workload::default_headroom_percent(),
}
}
#[must_use]
/// Set the number of DA channels to run (ignores zero).
pub fn channel_rate(mut self, rate: u64) -> Self {
match NonZeroU64::new(rate) {
Some(rate) => self.channel_rate = rate,
None => tracing::warn!(
rate,
"DA channel rate must be non-zero; keeping previous rate"
),
}
self
}
/// Like `channel_rate`, but returns an error instead of panicking.
pub fn try_channel_rate(self, rate: u64) -> Result<Self, BuilderInputError> {
let Some(rate) = NonZeroU64::new(rate) else {
return Err(BuilderInputError::ZeroValue {
field: "da_channel_rate",
});
};
Ok(self.channel_rate_per_block(rate))
}
#[must_use]
/// Set the number of DA channels to run.
pub const fn channel_rate_per_block(mut self, rate: NonZeroU64) -> Self {
self.channel_rate = rate;
self
}
#[must_use]
/// Set blob publish rate (per block).
pub fn blob_rate(mut self, rate: u64) -> Self {
match NonZeroU64::new(rate) {
Some(rate) => self.blob_rate = rate,
None => tracing::warn!(rate, "DA blob rate must be non-zero; keeping previous rate"),
}
self
}
/// Like `blob_rate`, but returns an error instead of panicking.
pub fn try_blob_rate(self, rate: u64) -> Result<Self, BuilderInputError> {
let Some(rate) = NonZeroU64::new(rate) else {
return Err(BuilderInputError::ZeroValue {
field: "da_blob_rate",
});
};
Ok(self.blob_rate_per_block(rate))
}
#[must_use]
/// Set blob publish rate per block.
pub const fn blob_rate_per_block(mut self, rate: NonZeroU64) -> Self {
self.blob_rate = rate;
self
}
#[must_use]
/// Apply headroom when converting blob rate into channel count.
pub const fn headroom_percent(mut self, percent: u64) -> Self {
self.headroom_percent = percent;
self
}
#[must_use]
pub fn apply(mut self) -> CoreScenarioBuilder<Caps> {
let workload =
da::Workload::with_rate(self.blob_rate, self.channel_rate, self.headroom_percent);
tracing::info!(
channel_rate = self.channel_rate.get(),
blob_rate = self.blob_rate.get(),
headroom_percent = self.headroom_percent,
"attaching data-availability workload"
);
self.builder = self.builder.with_workload(workload);
self.builder
}
}
/// Chaos helpers for scenarios that can control nodes.
pub trait ChaosBuilderExt: Sized {
/// Entry point into chaos workloads.
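Even with the `da()`/`da_with()` entry points removed, workloads still attach through the core builder's `with_workload`, the same hook `DataAvailabilityFlowBuilder::apply` used above. A minimal sketch of a custom workload, assuming the core `Workload` trait keeps the shape shown in the removed DA module:

use async_trait::async_trait;
use testing_framework_core::scenario::{DynError, Expectation, RunContext, Workload};

// Hypothetical stand-in workload; it generates no traffic and attaches no
// expectations, but shows the trait surface a replacement workload would fill.
struct NoopWorkload;

#[async_trait]
impl Workload for NoopWorkload {
    fn name(&self) -> &'static str {
        "noop_workload"
    }

    fn expectations(&self) -> Vec<Box<dyn Expectation>> {
        Vec::new()
    }

    async fn start(&self, _ctx: &RunContext) -> Result<(), DynError> {
        Ok(())
    }
}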

View File

@@ -1,412 +0,0 @@
use std::{
collections::{HashMap, HashSet},
num::NonZeroU64,
sync::{
Arc, Mutex,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};
use async_trait::async_trait;
use nomos_core::mantle::{
AuthenticatedMantleTx as _,
ops::{Op, channel::ChannelId},
};
use testing_framework_core::scenario::{BlockRecord, DynError, Expectation, RunContext};
use thiserror::Error;
use tokio::{pin, select, spawn, sync::broadcast, time::sleep};
use super::workload::{planned_channel_count, planned_channel_ids};
fn lock_or_recover<'a, T>(mutex: &'a Mutex<T>, name: &'static str) -> std::sync::MutexGuard<'a, T> {
match mutex.lock() {
Ok(guard) => guard,
Err(poisoned) => {
tracing::warn!(lock = name, "mutex poisoned; recovering inner value");
poisoned.into_inner()
}
}
}
#[derive(Debug)]
pub struct DaWorkloadExpectation {
blob_rate_per_block: NonZeroU64,
channel_rate_per_block: NonZeroU64,
headroom_percent: u64,
capture_state: Option<CaptureState>,
}
#[derive(Debug)]
struct CaptureState {
planned: Arc<HashSet<ChannelId>>,
inscriptions: Arc<Mutex<HashSet<ChannelId>>>,
blobs: Arc<Mutex<HashMap<ChannelId, u64>>>,
run_blocks: Arc<AtomicU64>,
run_duration: Duration,
}
const MIN_INSCRIPTION_INCLUSION_RATIO: f64 = 0.8;
const MIN_BLOB_INCLUSION_RATIO: f64 = 0.5;
#[derive(Debug, Error)]
enum DaExpectationError {
#[error("da workload expectation not started")]
NotCaptured,
#[error(
"missing inscriptions: observed={observed}/{planned} required={required} missing={missing:?}"
)]
MissingInscriptions {
planned: usize,
observed: usize,
required: usize,
missing: Vec<ChannelId>,
},
#[error(
"missing blobs: observed_total_blobs={observed_total_blobs} expected_total_blobs={expected_total_blobs} required_blobs={required_blobs} channels_with_blobs={channels_with_blobs}/{planned_channels} missing_channels={missing:?}"
)]
MissingBlobs {
expected_total_blobs: u64,
observed_total_blobs: u64,
required_blobs: u64,
planned_channels: usize,
channels_with_blobs: usize,
missing: Vec<ChannelId>,
},
}
impl DaWorkloadExpectation {
/// Validates that inscriptions and blobs landed for the planned channels.
pub const fn new(
blob_rate_per_block: NonZeroU64,
channel_rate_per_block: NonZeroU64,
headroom_percent: u64,
) -> Self {
Self {
blob_rate_per_block,
channel_rate_per_block,
headroom_percent,
capture_state: None,
}
}
}
#[async_trait]
impl Expectation for DaWorkloadExpectation {
fn name(&self) -> &'static str {
"da_workload_inclusions"
}
async fn start_capture(&mut self, ctx: &RunContext) -> Result<(), DynError> {
if self.capture_state.is_some() {
return Ok(());
}
let planned_ids = self.planned_channel_ids();
let run_duration = ctx.run_metrics().run_duration();
tracing::info!(
planned_channels = planned_ids.len(),
blob_rate_per_block = self.blob_rate_per_block.get(),
headroom_percent = self.headroom_percent,
run_duration_secs = run_duration.as_secs(),
"DA inclusion expectation starting capture"
);
let capture = build_capture_state(planned_ids, run_duration);
let block_feed = ctx.block_feed();
spawn_run_block_counter(
Arc::clone(&capture.run_blocks),
run_duration,
block_feed.clone(),
);
spawn_da_capture(
Arc::clone(&capture.planned),
Arc::clone(&capture.inscriptions),
Arc::clone(&capture.blobs),
block_feed,
);
self.capture_state = Some(capture);
Ok(())
}
async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> {
let state = self.capture_state()?;
self.evaluate_inscriptions(state)?;
self.evaluate_blobs(ctx, state)?;
Ok(())
}
}
#[derive(Debug, Clone, Copy)]
struct BlockWindow {
observed_blocks: u64,
expected_blocks: u64,
effective_blocks: u64,
}
#[derive(Debug)]
struct BlobObservation {
observed_total_blobs: u64,
channels_with_blobs: HashSet<ChannelId>,
}
impl DaWorkloadExpectation {
fn planned_channel_ids(&self) -> Vec<ChannelId> {
planned_channel_ids(planned_channel_count(
self.channel_rate_per_block,
self.headroom_percent,
))
}
fn capture_state(&self) -> Result<&CaptureState, DynError> {
self.capture_state
.as_ref()
.ok_or(DaExpectationError::NotCaptured)
.map_err(DynError::from)
}
fn evaluate_inscriptions(&self, state: &CaptureState) -> Result<(), DynError> {
let planned_total = state.planned.len();
let missing_inscriptions = self.missing_inscriptions(state);
let required_inscriptions =
minimum_required(planned_total, MIN_INSCRIPTION_INCLUSION_RATIO);
let observed_inscriptions = planned_total.saturating_sub(missing_inscriptions.len());
if observed_inscriptions >= required_inscriptions {
return Ok(());
}
tracing::warn!(
planned = planned_total,
missing = missing_inscriptions.len(),
required = required_inscriptions,
"DA expectation missing inscriptions"
);
Err(DaExpectationError::MissingInscriptions {
planned: planned_total,
observed: observed_inscriptions,
required: required_inscriptions,
missing: missing_inscriptions,
}
.into())
}
fn evaluate_blobs(&self, ctx: &RunContext, state: &CaptureState) -> Result<(), DynError> {
let planned_total = state.planned.len();
let BlobObservation {
observed_total_blobs,
channels_with_blobs,
} = self.observe_blobs(state);
let window = self.block_window(ctx, state);
let expected_total_blobs = self.expected_total_blobs(window.effective_blocks);
let required_blobs = minimum_required_u64(expected_total_blobs, MIN_BLOB_INCLUSION_RATIO);
if observed_total_blobs >= required_blobs {
tracing::info!(
planned_channels = planned_total,
channels_with_blobs = channels_with_blobs.len(),
inscriptions_observed = planned_total - self.missing_inscriptions(state).len(),
observed_total_blobs,
expected_total_blobs,
required_blobs,
observed_blocks = window.observed_blocks,
expected_blocks = window.expected_blocks,
effective_blocks = window.effective_blocks,
"DA inclusion expectation satisfied"
);
return Ok(());
}
let missing_blob_channels = missing_channels(&state.planned, &channels_with_blobs);
tracing::warn!(
expected_total_blobs,
observed_total_blobs,
required_blobs,
observed_blocks = window.observed_blocks,
expected_blocks = window.expected_blocks,
effective_blocks = window.effective_blocks,
run_duration_secs = state.run_duration.as_secs(),
missing_blob_channels = missing_blob_channels.len(),
"DA expectation missing blobs"
);
Err(DaExpectationError::MissingBlobs {
expected_total_blobs,
observed_total_blobs,
required_blobs,
planned_channels: planned_total,
channels_with_blobs: channels_with_blobs.len(),
missing: missing_blob_channels,
}
.into())
}
fn missing_inscriptions(&self, state: &CaptureState) -> Vec<ChannelId> {
let inscriptions = lock_or_recover(&state.inscriptions, "da_inscriptions");
missing_channels(&state.planned, &inscriptions)
}
fn observe_blobs(&self, state: &CaptureState) -> BlobObservation {
let blobs = lock_or_recover(&state.blobs, "da_blobs");
let observed_total_blobs = blobs.values().sum::<u64>();
let channels_with_blobs = blobs
.iter()
.filter(|(_, count)| **count > 0)
.map(|(channel, _)| *channel)
.collect::<HashSet<_>>();
BlobObservation {
observed_total_blobs,
channels_with_blobs,
}
}
fn block_window(&self, ctx: &RunContext, state: &CaptureState) -> BlockWindow {
let observed_blocks = state.run_blocks.load(Ordering::Relaxed).max(1);
let expected_blocks = ctx.run_metrics().expected_consensus_blocks().max(1);
let security_param = u64::from(
ctx.descriptors()
.config()
.consensus_params
.security_param
.get(),
)
.max(1);
let observed_inclusion_blocks = (observed_blocks / security_param).max(1);
let expected_inclusion_blocks = (expected_blocks / security_param).max(1);
let effective_blocks = observed_inclusion_blocks
.min(expected_inclusion_blocks)
.max(1);
BlockWindow {
observed_blocks,
expected_blocks,
effective_blocks,
}
}
fn expected_total_blobs(&self, effective_blocks: u64) -> u64 {
self.blob_rate_per_block
.get()
.saturating_mul(effective_blocks)
}
}
fn build_capture_state(planned_ids: Vec<ChannelId>, run_duration: Duration) -> CaptureState {
CaptureState {
planned: Arc::new(planned_ids.into_iter().collect()),
inscriptions: Arc::new(Mutex::new(HashSet::new())),
blobs: Arc::new(Mutex::new(HashMap::new())),
run_blocks: Arc::new(AtomicU64::new(0)),
run_duration,
}
}
fn spawn_run_block_counter(
run_blocks: Arc<AtomicU64>,
run_duration: Duration,
block_feed: testing_framework_core::scenario::BlockFeed,
) {
let mut receiver = block_feed.subscribe();
spawn(async move {
let timer = sleep(run_duration);
pin!(timer);
loop {
select! {
_ = &mut timer => break,
result = receiver.recv() => match result {
Ok(_) => {
run_blocks.fetch_add(1, Ordering::Relaxed);
}
Err(broadcast::error::RecvError::Lagged(_)) => {}
Err(broadcast::error::RecvError::Closed) => break,
}
}
}
});
}
fn spawn_da_capture(
planned: Arc<HashSet<ChannelId>>,
inscriptions: Arc<Mutex<HashSet<ChannelId>>>,
blobs: Arc<Mutex<HashMap<ChannelId, u64>>>,
block_feed: testing_framework_core::scenario::BlockFeed,
) {
let mut receiver = block_feed.subscribe();
spawn(async move {
loop {
match receiver.recv().await {
Ok(record) => capture_block(record.as_ref(), &planned, &inscriptions, &blobs),
Err(broadcast::error::RecvError::Lagged(skipped)) => {
tracing::debug!(skipped, "DA expectation: receiver lagged");
}
Err(broadcast::error::RecvError::Closed) => {
tracing::debug!("DA expectation: block feed closed");
break;
}
}
}
});
}
fn capture_block(
block: &BlockRecord,
planned: &HashSet<ChannelId>,
inscriptions: &Arc<Mutex<HashSet<ChannelId>>>,
blobs: &Arc<Mutex<HashMap<ChannelId, u64>>>,
) {
let mut new_inscriptions = Vec::new();
let mut new_blobs = Vec::new();
for tx in block.block.transactions() {
for op in &tx.mantle_tx().ops {
match op {
Op::ChannelInscribe(inscribe) if planned.contains(&inscribe.channel_id) => {
new_inscriptions.push(inscribe.channel_id);
}
Op::ChannelBlob(blob) if planned.contains(&blob.channel) => {
new_blobs.push(blob.channel);
}
_ => {}
}
}
}
if !new_inscriptions.is_empty() {
let mut guard = lock_or_recover(inscriptions, "da_inscriptions");
guard.extend(new_inscriptions);
tracing::debug!(count = guard.len(), "DA expectation captured inscriptions");
}
if !new_blobs.is_empty() {
let mut guard = lock_or_recover(blobs, "da_blobs");
for channel in new_blobs {
let entry = guard.entry(channel).or_insert(0);
*entry += 1;
}
tracing::debug!(
total_blobs = guard.values().sum::<u64>(),
"DA expectation captured blobs"
);
}
}
fn missing_channels(planned: &HashSet<ChannelId>, observed: &HashSet<ChannelId>) -> Vec<ChannelId> {
planned.difference(observed).copied().collect()
}
fn minimum_required(total: usize, ratio: f64) -> usize {
((total as f64) * ratio).ceil() as usize
}
fn minimum_required_u64(total: u64, ratio: f64) -> u64 {
((total as f64) * ratio).ceil() as u64
}
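For reference, the inclusion thresholds this removed expectation enforced (0.8 for inscriptions, 0.5 for blobs) were plain ceiling math. A standalone check of the same arithmetic:

// Same arithmetic as the removed `minimum_required` helpers.
fn minimum_required(total: usize, ratio: f64) -> usize {
    ((total as f64) * ratio).ceil() as usize
}

fn main() {
    // Inscriptions at the 0.8 ratio: 3 planned channels require all 3 (ceil(2.4) = 3).
    assert_eq!(minimum_required(3, 0.8), 3);
    // Blobs at the 0.5 ratio: 10 expected blobs require at least 5 (ceil(5.0) = 5).
    assert_eq!(minimum_required(10, 0.5), 5);
}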

View File

@@ -1,4 +0,0 @@
mod expectation;
mod workload;
pub use workload::Workload;

View File

@@ -1,323 +0,0 @@
use std::{num::NonZeroU64, sync::Arc, time::Duration};
use async_trait::async_trait;
use executor_http_client::ExecutorHttpClient;
use futures::future::try_join_all;
use key_management_system_service::keys::{Ed25519Key, Ed25519PublicKey};
use nomos_core::{
da::BlobId,
mantle::{
AuthenticatedMantleTx as _,
ops::{
Op,
channel::{ChannelId, MsgId},
},
},
};
use rand::{RngCore as _, seq::SliceRandom as _, thread_rng};
use testing_framework_core::{
nodes::ApiClient,
scenario::{BlockRecord, DynError, Expectation, RunContext, Workload as ScenarioWorkload},
};
use tokio::{sync::broadcast, time::sleep};
use super::expectation::DaWorkloadExpectation;
use crate::{
util::tx,
workloads::util::{find_channel_op, submit_transaction_via_cluster},
};
const TEST_KEY_BYTES: [u8; 32] = [0u8; 32];
const BLOB_CHUNK_OPTIONS: &[usize] = &[1, 2, 4, 8];
const PUBLISH_RETRIES: usize = 5;
const PUBLISH_RETRY_DELAY: Duration = Duration::from_secs(2);
const DEFAULT_HEADROOM_PERCENT: u64 = 20;
#[derive(Clone)]
pub struct Workload {
blob_rate_per_block: NonZeroU64,
channel_rate_per_block: NonZeroU64,
headroom_percent: u64,
}
impl Default for Workload {
fn default() -> Self {
Self::with_rate(NonZeroU64::MIN, NonZeroU64::MIN, DEFAULT_HEADROOM_PERCENT)
}
}
impl Workload {
/// Creates a workload that targets a blobs-per-block rate and applies a
/// headroom factor when deriving the channel count.
#[must_use]
pub const fn with_rate(
blob_rate_per_block: NonZeroU64,
channel_rate_per_block: NonZeroU64,
headroom_percent: u64,
) -> Self {
Self {
blob_rate_per_block,
channel_rate_per_block,
headroom_percent,
}
}
#[must_use]
pub const fn default_headroom_percent() -> u64 {
DEFAULT_HEADROOM_PERCENT
}
}
#[async_trait]
impl ScenarioWorkload for Workload {
fn name(&self) -> &'static str {
"channel_workload"
}
fn expectations(&self) -> Vec<Box<dyn Expectation>> {
vec![Box::new(DaWorkloadExpectation::new(
self.blob_rate_per_block,
self.channel_rate_per_block,
self.headroom_percent,
))]
}
async fn start(&self, ctx: &RunContext) -> Result<(), DynError> {
let planned_channels = planned_channel_ids(planned_channel_count(
self.channel_rate_per_block,
self.headroom_percent,
));
let expected_blobs = planned_blob_count(
self.blob_rate_per_block,
ctx.run_metrics().expected_consensus_blocks(),
ctx.descriptors()
.config()
.consensus_params
.security_param
.get()
.into(),
);
let per_channel_target =
per_channel_blob_target(expected_blobs, planned_channels.len().max(1) as u64);
tracing::info!(
blob_rate_per_block = self.blob_rate_per_block.get(),
channel_rate = self.channel_rate_per_block.get(),
headroom_percent = self.headroom_percent,
planned_channels = planned_channels.len(),
expected_blobs,
per_channel_target,
"DA workload derived planned channels"
);
try_join_all(planned_channels.into_iter().map(|channel_id| {
let ctx = ctx;
async move {
tracing::info!(channel_id = ?channel_id, blobs = per_channel_target, "DA workload starting channel flow");
run_channel_flow(ctx, channel_id, per_channel_target).await?;
tracing::info!(channel_id = ?channel_id, "DA workload finished channel flow");
Ok::<(), DynError>(())
}
}))
.await?;
tracing::info!("DA workload completed all channel flows");
Ok(())
}
}
async fn run_channel_flow(
ctx: &RunContext,
channel_id: ChannelId,
target_blobs: u64,
) -> Result<(), DynError> {
tracing::debug!(channel_id = ?channel_id, "DA: submitting inscription tx");
let inscription_tx = Arc::new(tx::create_inscription_transaction_with_id(channel_id)?);
submit_transaction_via_cluster(ctx, Arc::clone(&inscription_tx)).await?;
let mut receiver = ctx.block_feed().subscribe();
let inscription_id = wait_for_inscription(&mut receiver, channel_id).await?;
let mut parent_id = inscription_id;
for idx in 0..target_blobs {
let payload = random_blob_payload();
let published_blob_id = publish_blob(ctx, channel_id, parent_id, payload).await?;
let (next_parent, included_blob_id) =
wait_for_blob_with_parent(&mut receiver, channel_id, parent_id).await?;
parent_id = next_parent;
tracing::debug!(
channel_id = ?channel_id,
blob_index = idx,
published_blob_id = ?published_blob_id,
included_blob_id = ?included_blob_id,
"DA: blob published"
);
}
Ok(())
}
async fn wait_for_inscription(
receiver: &mut broadcast::Receiver<Arc<BlockRecord>>,
channel_id: ChannelId,
) -> Result<MsgId, DynError> {
wait_for_channel_op(receiver, move |op| {
if let Op::ChannelInscribe(inscribe) = op
&& inscribe.channel_id == channel_id
{
Some(inscribe.id())
} else {
None
}
})
.await
}
async fn wait_for_blob_with_parent(
receiver: &mut broadcast::Receiver<Arc<BlockRecord>>,
channel_id: ChannelId,
parent_msg: MsgId,
) -> Result<(MsgId, BlobId), DynError> {
loop {
match receiver.recv().await {
Ok(record) => {
for tx in record.block.transactions() {
for op in &tx.mantle_tx().ops {
if let Op::ChannelBlob(blob_op) = op
&& blob_op.channel == channel_id
&& blob_op.parent == parent_msg
{
let msg_id = blob_op.id();
return Ok((msg_id, blob_op.blob));
}
}
}
}
Err(broadcast::error::RecvError::Lagged(_)) => {}
Err(broadcast::error::RecvError::Closed) => {
return Err("block feed closed while waiting for channel operations".into());
}
}
}
}
async fn wait_for_channel_op<F>(
receiver: &mut broadcast::Receiver<Arc<BlockRecord>>,
mut matcher: F,
) -> Result<MsgId, DynError>
where
F: FnMut(&Op) -> Option<MsgId>,
{
loop {
match receiver.recv().await {
Ok(record) => {
if let Some(msg_id) = find_channel_op(record.block.as_ref(), &mut matcher) {
tracing::debug!(?msg_id, "DA: matched channel operation");
return Ok(msg_id);
}
}
Err(broadcast::error::RecvError::Lagged(_)) => {}
Err(broadcast::error::RecvError::Closed) => {
return Err("block feed closed while waiting for channel operations".into());
}
}
}
}
async fn publish_blob(
ctx: &RunContext,
channel_id: ChannelId,
parent_msg: MsgId,
data: Vec<u8>,
) -> Result<BlobId, DynError> {
let executors = ctx.node_clients().executor_clients();
if executors.is_empty() {
return Err("da workload requires at least one executor".into());
}
let signer = test_signer();
tracing::debug!(channel = ?channel_id, payload_bytes = data.len(), "DA: prepared blob payload");
let client = ExecutorHttpClient::new(None);
let mut candidates: Vec<&ApiClient> = executors.iter().collect();
let mut last_err = None;
for attempt in 1..=PUBLISH_RETRIES {
candidates.shuffle(&mut thread_rng());
for executor in &candidates {
let executor_url = executor.base_url().clone();
match client
.publish_blob(executor_url, channel_id, parent_msg, signer, data.clone())
.await
{
Ok(blob_id) => return Ok(blob_id),
Err(err) => {
tracing::debug!(attempt, executor = %executor.base_url(), %err, "DA: publish_blob failed");
last_err = Some(err.into())
}
}
}
if attempt < PUBLISH_RETRIES {
sleep(PUBLISH_RETRY_DELAY).await;
}
}
Err(last_err.unwrap_or_else(|| "da workload could not publish blob".into()))
}
fn test_signer() -> Ed25519PublicKey {
Ed25519Key::from_bytes(&TEST_KEY_BYTES).public_key()
}
fn random_blob_payload() -> Vec<u8> {
let mut rng = thread_rng();
// KZGRS encoder expects the polynomial degree to be a power of two, which
// effectively constrains the blob chunk count.
let chunks = BLOB_CHUNK_OPTIONS.choose(&mut rng).copied().unwrap_or(1);
let mut data = vec![0u8; 31 * chunks];
rng.fill_bytes(&mut data);
data
}
pub fn planned_channel_ids(total: usize) -> Vec<ChannelId> {
(0..total as u64)
.map(deterministic_channel_id)
.collect::<Vec<_>>()
}
fn deterministic_channel_id(index: u64) -> ChannelId {
let mut bytes = [0u8; 32];
bytes[..8].copy_from_slice(b"chn_wrkd");
bytes[24..].copy_from_slice(&index.to_be_bytes());
ChannelId::from(bytes)
}
#[must_use]
pub fn planned_channel_count(channel_rate_per_block: NonZeroU64, headroom_percent: u64) -> usize {
let base = channel_rate_per_block.get() as usize;
let extra = (base.saturating_mul(headroom_percent as usize) + 99) / 100;
let total = base.saturating_add(extra);
total.max(1)
}
#[must_use]
pub fn planned_blob_count(
blob_rate_per_block: NonZeroU64,
expected_consensus_blocks: u64,
security_param: u64,
) -> u64 {
let expected_blocks = expected_consensus_blocks.max(1);
let security_param = security_param.max(1);
let inclusion_blocks = (expected_blocks / security_param).max(1);
blob_rate_per_block.get().saturating_mul(inclusion_blocks)
}
#[must_use]
pub fn per_channel_blob_target(total_blobs: u64, channel_count: u64) -> u64 {
if channel_count == 0 {
return total_blobs.max(1);
}
let per = (total_blobs + channel_count - 1) / channel_count;
per.max(1)
}
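The removed sizing helpers reduce to two ceiling computations: headroom applied to the channel count, and an even split of blobs across channels. A simplified, self-contained check of the same numbers (plain u64 instead of NonZeroU64 for brevity):

// Mirrors the removed `planned_channel_count` and `per_channel_blob_target` math.
fn planned_channel_count(channel_rate: u64, headroom_percent: u64) -> usize {
    let base = channel_rate as usize;
    let extra = (base * headroom_percent as usize + 99) / 100; // round headroom up
    (base + extra).max(1)
}

fn per_channel_blob_target(total_blobs: u64, channel_count: u64) -> u64 {
    if channel_count == 0 {
        return total_blobs.max(1);
    }
    ((total_blobs + channel_count - 1) / channel_count).max(1) // ceiling division
}

fn main() {
    // channel_rate = 2 with 20% headroom -> 2 + ceil(0.4) = 3 planned channels.
    assert_eq!(planned_channel_count(2, 20), 3);
    // 8 expected blobs over 3 channels -> ceil(8 / 3) = 3 blobs per channel.
    assert_eq!(per_channel_blob_target(8, 3), 3);
}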

View File

@@ -1,5 +1,4 @@
pub mod chaos;
pub mod da;
pub mod transaction;
pub mod util;