mirror of
https://github.com/logos-blockchain/logos-blockchain-testing.git
synced 2026-06-06 17:09:27 +00:00
Remove DA workload usage from framework
This commit is contained in:
parent
015c884f9a
commit
6310a5cbe9
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -7101,10 +7101,8 @@ name = "testing-framework-workflows"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"futures",
|
|
||||||
"logos-blockchain-chain-service",
|
"logos-blockchain-chain-service",
|
||||||
"logos-blockchain-core",
|
"logos-blockchain-core",
|
||||||
"logos-blockchain-executor-http-client",
|
|
||||||
"logos-blockchain-key-management-system-service",
|
"logos-blockchain-key-management-system-service",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
|
|||||||
0
examples/cucumber/features/compose_smoke.feature
Normal file
0
examples/cucumber/features/compose_smoke.feature
Normal file
0
examples/cucumber/features/local_smoke.feature
Normal file
0
examples/cucumber/features/local_smoke.feature
Normal file
@ -9,7 +9,6 @@ pub fn scenario_plan() -> SnippetResult<Scenario<()>> {
|
|||||||
ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(2))
|
ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(2))
|
||||||
.wallets(50)
|
.wallets(50)
|
||||||
.transactions_with(|txs| txs.rate(5).users(20))
|
.transactions_with(|txs| txs.rate(5).users(20))
|
||||||
.da_with(|da| da.channel_rate(1).blob_rate(2))
|
|
||||||
.expect_consensus_liveness()
|
.expect_consensus_liveness()
|
||||||
.with_run_duration(Duration::from_secs(90))
|
.with_run_duration(Duration::from_secs(90))
|
||||||
.build()
|
.build()
|
||||||
|
|||||||
@ -12,11 +12,6 @@ pub async fn run_test() -> Result<()> {
|
|||||||
txs.rate(5) // 5 transactions per block
|
txs.rate(5) // 5 transactions per block
|
||||||
.users(20)
|
.users(20)
|
||||||
})
|
})
|
||||||
.da_with(|da| {
|
|
||||||
da.channel_rate(1) // number of DA channels
|
|
||||||
.blob_rate(2) // target 2 blobs per block
|
|
||||||
.headroom_percent(20) // optional channel headroom
|
|
||||||
})
|
|
||||||
.expect_consensus_liveness()
|
.expect_consensus_liveness()
|
||||||
.with_run_duration(Duration::from_secs(90))
|
.with_run_duration(Duration::from_secs(90))
|
||||||
.build()?;
|
.build()?;
|
||||||
|
|||||||
@ -1,15 +0,0 @@
|
|||||||
use testing_framework_core::scenario::{Scenario, ScenarioBuilder};
|
|
||||||
use testing_framework_workflows::ScenarioBuilderExt;
|
|
||||||
|
|
||||||
use crate::SnippetResult;
|
|
||||||
|
|
||||||
pub fn da_plan() -> SnippetResult<Scenario<()>> {
|
|
||||||
ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(1))
|
|
||||||
.wallets(50)
|
|
||||||
.da_with(|da| {
|
|
||||||
da.channel_rate(1) // number of DA channels to run
|
|
||||||
.blob_rate(2) // target 2 blobs per block (headroom applied)
|
|
||||||
.headroom_percent(20) // optional headroom when sizing channels
|
|
||||||
}) // Finish DA workload config
|
|
||||||
.build()
|
|
||||||
}
|
|
||||||
@ -9,7 +9,6 @@ pub async fn sustained_load_test() -> Result<()> {
|
|||||||
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(4).executors(2))
|
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(4).executors(2))
|
||||||
.wallets(100)
|
.wallets(100)
|
||||||
.transactions_with(|txs| txs.rate(15).users(50))
|
.transactions_with(|txs| txs.rate(15).users(50))
|
||||||
.da_with(|da| da.channel_rate(2).blob_rate(3))
|
|
||||||
.expect_consensus_liveness()
|
.expect_consensus_liveness()
|
||||||
.with_run_duration(Duration::from_secs(300))
|
.with_run_duration(Duration::from_secs(300))
|
||||||
.build()?;
|
.build()?;
|
||||||
|
|||||||
@ -5,11 +5,10 @@ use testing_framework_core::scenario::{Deployer, ScenarioBuilder};
|
|||||||
use testing_framework_runner_local::LocalDeployer;
|
use testing_framework_runner_local::LocalDeployer;
|
||||||
use testing_framework_workflows::ScenarioBuilderExt;
|
use testing_framework_workflows::ScenarioBuilderExt;
|
||||||
|
|
||||||
pub async fn da_and_transactions() -> Result<()> {
|
pub async fn transactions_multi_node() -> Result<()> {
|
||||||
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(2))
|
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(3).executors(2))
|
||||||
.wallets(30)
|
.wallets(30)
|
||||||
.transactions_with(|txs| txs.rate(5).users(15))
|
.transactions_with(|txs| txs.rate(5).users(15))
|
||||||
.da_with(|da| da.channel_rate(2).blob_rate(2))
|
|
||||||
.expect_consensus_liveness()
|
.expect_consensus_liveness()
|
||||||
.with_run_duration(Duration::from_secs(90))
|
.with_run_duration(Duration::from_secs(90))
|
||||||
.build()?;
|
.build()?;
|
||||||
|
|||||||
@ -16,7 +16,6 @@ mod dsl_cheat_sheet_topology;
|
|||||||
mod dsl_cheat_sheet_transactions_workload;
|
mod dsl_cheat_sheet_transactions_workload;
|
||||||
mod dsl_cheat_sheet_wallets;
|
mod dsl_cheat_sheet_wallets;
|
||||||
mod dsl_cheat_sheet_workload_chaos;
|
mod dsl_cheat_sheet_workload_chaos;
|
||||||
mod dsl_cheat_sheet_workload_da;
|
|
||||||
mod dsl_cheat_sheet_workload_execution;
|
mod dsl_cheat_sheet_workload_execution;
|
||||||
mod examples_advanced_aggressive_chaos_test;
|
mod examples_advanced_aggressive_chaos_test;
|
||||||
mod examples_advanced_load_progression_test;
|
mod examples_advanced_load_progression_test;
|
||||||
|
|||||||
@ -6,18 +6,13 @@ use testing_framework_runner_local::LocalDeployer;
|
|||||||
use testing_framework_workflows::ScenarioBuilderExt;
|
use testing_framework_workflows::ScenarioBuilderExt;
|
||||||
|
|
||||||
pub async fn run_local_demo() -> Result<()> {
|
pub async fn run_local_demo() -> Result<()> {
|
||||||
// Define the scenario (1 validator + 1 executor, tx + DA workload)
|
// Define the scenario (1 validator + 1 executor, tx workload)
|
||||||
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(1))
|
let mut plan = ScenarioBuilder::topology_with(|t| t.network_star().validators(1).executors(1))
|
||||||
.wallets(1_000)
|
.wallets(1_000)
|
||||||
.transactions_with(|txs| {
|
.transactions_with(|txs| {
|
||||||
txs.rate(5) // 5 transactions per block
|
txs.rate(5) // 5 transactions per block
|
||||||
.users(500) // use 500 of the seeded wallets
|
.users(500) // use 500 of the seeded wallets
|
||||||
})
|
})
|
||||||
.da_with(|da| {
|
|
||||||
da.channel_rate(1) // 1 channel
|
|
||||||
.blob_rate(1) // target 1 blob per block
|
|
||||||
.headroom_percent(20) // default headroom when sizing channels
|
|
||||||
})
|
|
||||||
.expect_consensus_liveness()
|
.expect_consensus_liveness()
|
||||||
.with_run_duration(Duration::from_secs(60))
|
.with_run_duration(Duration::from_secs(60))
|
||||||
.build()?;
|
.build()?;
|
||||||
|
|||||||
@ -4,6 +4,6 @@ pub fn step_1_topology() -> testing_framework_core::scenario::Builder<()> {
|
|||||||
ScenarioBuilder::topology_with(|t| {
|
ScenarioBuilder::topology_with(|t| {
|
||||||
t.network_star() // Star topology: all nodes connect to seed
|
t.network_star() // Star topology: all nodes connect to seed
|
||||||
.validators(1) // 1 validator node
|
.validators(1) // 1 validator node
|
||||||
.executors(1) // 1 executor node (validator + DA dispersal)
|
.executors(1) // 1 executor node
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@ -8,9 +8,4 @@ pub fn step_3_workloads() -> testing_framework_core::scenario::Builder<()> {
|
|||||||
txs.rate(5) // 5 transactions per block
|
txs.rate(5) // 5 transactions per block
|
||||||
.users(500) // Use 500 of the 1,000 wallets
|
.users(500) // Use 500 of the 1,000 wallets
|
||||||
})
|
})
|
||||||
.da_with(|da| {
|
|
||||||
da.channel_rate(1) // 1 DA channel (more spawned with headroom)
|
|
||||||
.blob_rate(1) // target 1 blob per block
|
|
||||||
.headroom_percent(20) // default headroom when sizing channels
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -61,7 +61,7 @@ async fn run_local_case(validators: usize, executors: usize, run_duration: Durat
|
|||||||
|
|
||||||
let mut plan = scenario.build()?;
|
let mut plan = scenario.build()?;
|
||||||
|
|
||||||
let deployer = LocalDeployer::default().with_membership_check(true);
|
let deployer = LocalDeployer::default();
|
||||||
info!("deploying local nodes");
|
info!("deploying local nodes");
|
||||||
|
|
||||||
let runner: Runner = deployer
|
let runner: Runner = deployer
|
||||||
|
|||||||
@ -145,6 +145,10 @@ impl GeneratedTopology {
|
|||||||
|
|
||||||
wait_for_network_readiness(self, &client, &endpoints, &labels).await?;
|
wait_for_network_readiness(self, &client, &endpoints, &labels).await?;
|
||||||
|
|
||||||
|
if validator_membership_endpoints.is_none() && executor_membership_endpoints.is_none() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
let membership_endpoints = collect_membership_endpoints(
|
let membership_endpoints = collect_membership_endpoints(
|
||||||
self,
|
self,
|
||||||
total_nodes,
|
total_nodes,
|
||||||
|
|||||||
0
testing-framework/cucumber/src/steps/run.rs
Normal file
0
testing-framework/cucumber/src/steps/run.rs
Normal file
0
testing-framework/cucumber/src/steps/workloads.rs
Normal file
0
testing-framework/cucumber/src/steps/workloads.rs
Normal file
0
testing-framework/cucumber/src/world.rs
Normal file
0
testing-framework/cucumber/src/world.rs
Normal file
@ -160,24 +160,8 @@ pub async fn ensure_remote_readiness_with_ports(
|
|||||||
.map(|ports| readiness_url(HttpNodeRole::Executor, ports.api))
|
.map(|ports| readiness_url(HttpNodeRole::Executor, ports.api))
|
||||||
.collect::<Result<Vec<_>, _>>()?;
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
|
||||||
let validator_membership_urls = mapping
|
|
||||||
.validators
|
|
||||||
.iter()
|
|
||||||
.map(|ports| readiness_url(HttpNodeRole::Validator, ports.testing))
|
|
||||||
.collect::<Result<Vec<_>, _>>()?;
|
|
||||||
let executor_membership_urls = mapping
|
|
||||||
.executors
|
|
||||||
.iter()
|
|
||||||
.map(|ports| readiness_url(HttpNodeRole::Executor, ports.testing))
|
|
||||||
.collect::<Result<Vec<_>, _>>()?;
|
|
||||||
|
|
||||||
descriptors
|
descriptors
|
||||||
.wait_remote_readiness(
|
.wait_remote_readiness(&validator_urls, &executor_urls, None, None)
|
||||||
&validator_urls,
|
|
||||||
&executor_urls,
|
|
||||||
Some(&validator_membership_urls),
|
|
||||||
Some(&executor_membership_urls),
|
|
||||||
)
|
|
||||||
.await
|
.await
|
||||||
.map_err(|source| StackReadinessError::Remote { source })
|
.map_err(|source| StackReadinessError::Remote { source })
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,9 +15,7 @@ use tracing::{debug, info};
|
|||||||
/// Spawns validators and executors as local processes, reusing the existing
|
/// Spawns validators and executors as local processes, reusing the existing
|
||||||
/// integration harness.
|
/// integration harness.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct LocalDeployer {
|
pub struct LocalDeployer {}
|
||||||
membership_check: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Errors surfaced by the local deployer while driving a scenario.
|
/// Errors surfaced by the local deployer while driving a scenario.
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
@ -63,10 +61,9 @@ impl Deployer<()> for LocalDeployer {
|
|||||||
info!(
|
info!(
|
||||||
validators = scenario.topology().validators().len(),
|
validators = scenario.topology().validators().len(),
|
||||||
executors = scenario.topology().executors().len(),
|
executors = scenario.topology().executors().len(),
|
||||||
membership_checks = self.membership_check,
|
|
||||||
"starting local deployment"
|
"starting local deployment"
|
||||||
);
|
);
|
||||||
let topology = Self::prepare_topology(scenario, self.membership_check).await?;
|
let topology = Self::prepare_topology(scenario).await?;
|
||||||
let node_clients = NodeClients::from_topology(scenario.topology(), &topology);
|
let node_clients = NodeClients::from_topology(scenario.topology(), &topology);
|
||||||
|
|
||||||
let (block_feed, block_feed_guard) = spawn_block_feed_with(&node_clients).await?;
|
let (block_feed, block_feed_guard) = spawn_block_feed_with(&node_clients).await?;
|
||||||
@ -87,22 +84,12 @@ impl Deployer<()> for LocalDeployer {
|
|||||||
|
|
||||||
impl LocalDeployer {
|
impl LocalDeployer {
|
||||||
#[must_use]
|
#[must_use]
|
||||||
/// Construct with membership readiness checks enabled.
|
/// Construct a local deployer.
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self::default()
|
Self::default()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[must_use]
|
async fn prepare_topology(scenario: &Scenario<()>) -> Result<Topology, LocalDeployerError> {
|
||||||
/// Enable or disable membership readiness probes.
|
|
||||||
pub const fn with_membership_check(mut self, enabled: bool) -> Self {
|
|
||||||
self.membership_check = enabled;
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn prepare_topology(
|
|
||||||
scenario: &Scenario<()>,
|
|
||||||
membership_check: bool,
|
|
||||||
) -> Result<Topology, LocalDeployerError> {
|
|
||||||
let descriptors = scenario.topology();
|
let descriptors = scenario.topology();
|
||||||
info!(
|
info!(
|
||||||
validators = descriptors.validators().len(),
|
validators = descriptors.validators().len(),
|
||||||
@ -115,13 +102,10 @@ impl LocalDeployer {
|
|||||||
.await
|
.await
|
||||||
.map_err(|source| LocalDeployerError::Spawn { source })?;
|
.map_err(|source| LocalDeployerError::Spawn { source })?;
|
||||||
|
|
||||||
let skip_membership = !membership_check;
|
wait_for_readiness(&topology).await.map_err(|source| {
|
||||||
wait_for_readiness(&topology, skip_membership)
|
debug!(error = ?source, "local readiness failed");
|
||||||
.await
|
LocalDeployerError::ReadinessFailed { source }
|
||||||
.map_err(|source| {
|
})?;
|
||||||
debug!(error = ?source, "local readiness failed");
|
|
||||||
LocalDeployerError::ReadinessFailed { source }
|
|
||||||
})?;
|
|
||||||
|
|
||||||
info!("local nodes are ready");
|
info!("local nodes are ready");
|
||||||
Ok(topology)
|
Ok(topology)
|
||||||
@ -130,27 +114,14 @@ impl LocalDeployer {
|
|||||||
|
|
||||||
impl Default for LocalDeployer {
|
impl Default for LocalDeployer {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {}
|
||||||
membership_check: true,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn wait_for_readiness(
|
async fn wait_for_readiness(topology: &Topology) -> Result<(), ReadinessError> {
|
||||||
topology: &Topology,
|
|
||||||
skip_membership: bool,
|
|
||||||
) -> Result<(), ReadinessError> {
|
|
||||||
info!("waiting for local network readiness");
|
info!("waiting for local network readiness");
|
||||||
topology.wait_network_ready().await?;
|
topology.wait_network_ready().await?;
|
||||||
if skip_membership {
|
Ok(())
|
||||||
// Allow callers to bypass deeper readiness for lightweight demos.
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
info!("waiting for membership readiness");
|
|
||||||
topology.wait_membership_ready().await?;
|
|
||||||
|
|
||||||
info!("waiting for DA balancer readiness");
|
|
||||||
topology.wait_da_balancer_ready().await
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn spawn_block_feed_with(
|
async fn spawn_block_feed_with(
|
||||||
|
|||||||
@ -15,8 +15,6 @@ workspace = true
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
chain-service = { workspace = true }
|
chain-service = { workspace = true }
|
||||||
executor-http-client = { workspace = true }
|
|
||||||
futures = "0.3"
|
|
||||||
key-management-system-service = { workspace = true }
|
key-management-system-service = { workspace = true }
|
||||||
nomos-core = { workspace = true }
|
nomos-core = { workspace = true }
|
||||||
rand = { workspace = true }
|
rand = { workspace = true }
|
||||||
|
|||||||
@ -10,7 +10,7 @@ use testing_framework_core::{
|
|||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
expectations::ConsensusLiveness,
|
expectations::ConsensusLiveness,
|
||||||
workloads::{chaos::RandomRestartWorkload, da, transaction},
|
workloads::{chaos::RandomRestartWorkload, transaction},
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
@ -35,15 +35,6 @@ pub trait ScenarioBuilderExt<Caps>: Sized {
|
|||||||
self,
|
self,
|
||||||
f: impl FnOnce(TransactionFlowBuilder<Caps>) -> TransactionFlowBuilder<Caps>,
|
f: impl FnOnce(TransactionFlowBuilder<Caps>) -> TransactionFlowBuilder<Caps>,
|
||||||
) -> CoreScenarioBuilder<Caps>;
|
) -> CoreScenarioBuilder<Caps>;
|
||||||
|
|
||||||
/// Configure a data-availability workload.
|
|
||||||
fn da(self) -> DataAvailabilityFlowBuilder<Caps>;
|
|
||||||
|
|
||||||
/// Configure a data-availability workload via closure.
|
|
||||||
fn da_with(
|
|
||||||
self,
|
|
||||||
f: impl FnOnce(DataAvailabilityFlowBuilder<Caps>) -> DataAvailabilityFlowBuilder<Caps>,
|
|
||||||
) -> CoreScenarioBuilder<Caps>;
|
|
||||||
#[must_use]
|
#[must_use]
|
||||||
/// Attach a consensus liveness expectation.
|
/// Attach a consensus liveness expectation.
|
||||||
fn expect_consensus_liveness(self) -> Self;
|
fn expect_consensus_liveness(self) -> Self;
|
||||||
@ -65,17 +56,6 @@ impl<Caps> ScenarioBuilderExt<Caps> for CoreScenarioBuilder<Caps> {
|
|||||||
f(self.transactions()).apply()
|
f(self.transactions()).apply()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn da(self) -> DataAvailabilityFlowBuilder<Caps> {
|
|
||||||
DataAvailabilityFlowBuilder::new(self)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn da_with(
|
|
||||||
self,
|
|
||||||
f: impl FnOnce(DataAvailabilityFlowBuilder<Caps>) -> DataAvailabilityFlowBuilder<Caps>,
|
|
||||||
) -> CoreScenarioBuilder<Caps> {
|
|
||||||
f(self.da()).apply()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn expect_consensus_liveness(self) -> Self {
|
fn expect_consensus_liveness(self) -> Self {
|
||||||
self.with_expectation(ConsensusLiveness::default())
|
self.with_expectation(ConsensusLiveness::default())
|
||||||
}
|
}
|
||||||
@ -498,112 +478,6 @@ impl<Caps> TransactionFlowBuilder<Caps> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Builder for data availability workloads.
|
|
||||||
pub struct DataAvailabilityFlowBuilder<Caps> {
|
|
||||||
builder: CoreScenarioBuilder<Caps>,
|
|
||||||
channel_rate: NonZeroU64,
|
|
||||||
blob_rate: NonZeroU64,
|
|
||||||
headroom_percent: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<Caps> DataAvailabilityFlowBuilder<Caps> {
|
|
||||||
const fn default_channel_rate() -> NonZeroU64 {
|
|
||||||
NonZeroU64::MIN
|
|
||||||
}
|
|
||||||
|
|
||||||
const fn default_blob_rate() -> NonZeroU64 {
|
|
||||||
NonZeroU64::MIN
|
|
||||||
}
|
|
||||||
|
|
||||||
const fn new(builder: CoreScenarioBuilder<Caps>) -> Self {
|
|
||||||
Self {
|
|
||||||
builder,
|
|
||||||
channel_rate: Self::default_channel_rate(),
|
|
||||||
blob_rate: Self::default_blob_rate(),
|
|
||||||
headroom_percent: da::Workload::default_headroom_percent(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[must_use]
|
|
||||||
/// Set the number of DA channels to run (ignores zero).
|
|
||||||
pub fn channel_rate(mut self, rate: u64) -> Self {
|
|
||||||
match NonZeroU64::new(rate) {
|
|
||||||
Some(rate) => self.channel_rate = rate,
|
|
||||||
None => tracing::warn!(
|
|
||||||
rate,
|
|
||||||
"DA channel rate must be non-zero; keeping previous rate"
|
|
||||||
),
|
|
||||||
}
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Like `channel_rate`, but returns an error instead of panicking.
|
|
||||||
pub fn try_channel_rate(self, rate: u64) -> Result<Self, BuilderInputError> {
|
|
||||||
let Some(rate) = NonZeroU64::new(rate) else {
|
|
||||||
return Err(BuilderInputError::ZeroValue {
|
|
||||||
field: "da_channel_rate",
|
|
||||||
});
|
|
||||||
};
|
|
||||||
Ok(self.channel_rate_per_block(rate))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[must_use]
|
|
||||||
/// Set the number of DA channels to run.
|
|
||||||
pub const fn channel_rate_per_block(mut self, rate: NonZeroU64) -> Self {
|
|
||||||
self.channel_rate = rate;
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
#[must_use]
|
|
||||||
/// Set blob publish rate (per block).
|
|
||||||
pub fn blob_rate(mut self, rate: u64) -> Self {
|
|
||||||
match NonZeroU64::new(rate) {
|
|
||||||
Some(rate) => self.blob_rate = rate,
|
|
||||||
None => tracing::warn!(rate, "DA blob rate must be non-zero; keeping previous rate"),
|
|
||||||
}
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Like `blob_rate`, but returns an error instead of panicking.
|
|
||||||
pub fn try_blob_rate(self, rate: u64) -> Result<Self, BuilderInputError> {
|
|
||||||
let Some(rate) = NonZeroU64::new(rate) else {
|
|
||||||
return Err(BuilderInputError::ZeroValue {
|
|
||||||
field: "da_blob_rate",
|
|
||||||
});
|
|
||||||
};
|
|
||||||
Ok(self.blob_rate_per_block(rate))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[must_use]
|
|
||||||
/// Set blob publish rate per block.
|
|
||||||
pub const fn blob_rate_per_block(mut self, rate: NonZeroU64) -> Self {
|
|
||||||
self.blob_rate = rate;
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
#[must_use]
|
|
||||||
/// Apply headroom when converting blob rate into channel count.
|
|
||||||
pub const fn headroom_percent(mut self, percent: u64) -> Self {
|
|
||||||
self.headroom_percent = percent;
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
#[must_use]
|
|
||||||
pub fn apply(mut self) -> CoreScenarioBuilder<Caps> {
|
|
||||||
let workload =
|
|
||||||
da::Workload::with_rate(self.blob_rate, self.channel_rate, self.headroom_percent);
|
|
||||||
tracing::info!(
|
|
||||||
channel_rate = self.channel_rate.get(),
|
|
||||||
blob_rate = self.blob_rate.get(),
|
|
||||||
headroom_percent = self.headroom_percent,
|
|
||||||
"attaching data-availability workload"
|
|
||||||
);
|
|
||||||
|
|
||||||
self.builder = self.builder.with_workload(workload);
|
|
||||||
self.builder
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Chaos helpers for scenarios that can control nodes.
|
/// Chaos helpers for scenarios that can control nodes.
|
||||||
pub trait ChaosBuilderExt: Sized {
|
pub trait ChaosBuilderExt: Sized {
|
||||||
/// Entry point into chaos workloads.
|
/// Entry point into chaos workloads.
|
||||||
|
|||||||
@ -1,412 +0,0 @@
|
|||||||
use std::{
|
|
||||||
collections::{HashMap, HashSet},
|
|
||||||
num::NonZeroU64,
|
|
||||||
sync::{
|
|
||||||
Arc, Mutex,
|
|
||||||
atomic::{AtomicU64, Ordering},
|
|
||||||
},
|
|
||||||
time::Duration,
|
|
||||||
};
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use nomos_core::mantle::{
|
|
||||||
AuthenticatedMantleTx as _,
|
|
||||||
ops::{Op, channel::ChannelId},
|
|
||||||
};
|
|
||||||
use testing_framework_core::scenario::{BlockRecord, DynError, Expectation, RunContext};
|
|
||||||
use thiserror::Error;
|
|
||||||
use tokio::{pin, select, spawn, sync::broadcast, time::sleep};
|
|
||||||
|
|
||||||
use super::workload::{planned_channel_count, planned_channel_ids};
|
|
||||||
|
|
||||||
fn lock_or_recover<'a, T>(mutex: &'a Mutex<T>, name: &'static str) -> std::sync::MutexGuard<'a, T> {
|
|
||||||
match mutex.lock() {
|
|
||||||
Ok(guard) => guard,
|
|
||||||
Err(poisoned) => {
|
|
||||||
tracing::warn!(lock = name, "mutex poisoned; recovering inner value");
|
|
||||||
poisoned.into_inner()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct DaWorkloadExpectation {
|
|
||||||
blob_rate_per_block: NonZeroU64,
|
|
||||||
channel_rate_per_block: NonZeroU64,
|
|
||||||
headroom_percent: u64,
|
|
||||||
capture_state: Option<CaptureState>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct CaptureState {
|
|
||||||
planned: Arc<HashSet<ChannelId>>,
|
|
||||||
inscriptions: Arc<Mutex<HashSet<ChannelId>>>,
|
|
||||||
blobs: Arc<Mutex<HashMap<ChannelId, u64>>>,
|
|
||||||
run_blocks: Arc<AtomicU64>,
|
|
||||||
run_duration: Duration,
|
|
||||||
}
|
|
||||||
|
|
||||||
const MIN_INSCRIPTION_INCLUSION_RATIO: f64 = 0.8;
|
|
||||||
const MIN_BLOB_INCLUSION_RATIO: f64 = 0.5;
|
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
|
||||||
enum DaExpectationError {
|
|
||||||
#[error("da workload expectation not started")]
|
|
||||||
NotCaptured,
|
|
||||||
#[error(
|
|
||||||
"missing inscriptions: observed={observed}/{planned} required={required} missing={missing:?}"
|
|
||||||
)]
|
|
||||||
MissingInscriptions {
|
|
||||||
planned: usize,
|
|
||||||
observed: usize,
|
|
||||||
required: usize,
|
|
||||||
missing: Vec<ChannelId>,
|
|
||||||
},
|
|
||||||
#[error(
|
|
||||||
"missing blobs: observed_total_blobs={observed_total_blobs} expected_total_blobs={expected_total_blobs} required_blobs={required_blobs} channels_with_blobs={channels_with_blobs}/{planned_channels} missing_channels={missing:?}"
|
|
||||||
)]
|
|
||||||
MissingBlobs {
|
|
||||||
expected_total_blobs: u64,
|
|
||||||
observed_total_blobs: u64,
|
|
||||||
required_blobs: u64,
|
|
||||||
planned_channels: usize,
|
|
||||||
channels_with_blobs: usize,
|
|
||||||
missing: Vec<ChannelId>,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DaWorkloadExpectation {
|
|
||||||
/// Validates that inscriptions and blobs landed for the planned channels.
|
|
||||||
pub const fn new(
|
|
||||||
blob_rate_per_block: NonZeroU64,
|
|
||||||
channel_rate_per_block: NonZeroU64,
|
|
||||||
headroom_percent: u64,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
|
||||||
blob_rate_per_block,
|
|
||||||
channel_rate_per_block,
|
|
||||||
headroom_percent,
|
|
||||||
capture_state: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl Expectation for DaWorkloadExpectation {
|
|
||||||
fn name(&self) -> &'static str {
|
|
||||||
"da_workload_inclusions"
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn start_capture(&mut self, ctx: &RunContext) -> Result<(), DynError> {
|
|
||||||
if self.capture_state.is_some() {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
let planned_ids = self.planned_channel_ids();
|
|
||||||
let run_duration = ctx.run_metrics().run_duration();
|
|
||||||
|
|
||||||
tracing::info!(
|
|
||||||
planned_channels = planned_ids.len(),
|
|
||||||
blob_rate_per_block = self.blob_rate_per_block.get(),
|
|
||||||
headroom_percent = self.headroom_percent,
|
|
||||||
run_duration_secs = run_duration.as_secs(),
|
|
||||||
"DA inclusion expectation starting capture"
|
|
||||||
);
|
|
||||||
|
|
||||||
let capture = build_capture_state(planned_ids, run_duration);
|
|
||||||
let block_feed = ctx.block_feed();
|
|
||||||
|
|
||||||
spawn_run_block_counter(
|
|
||||||
Arc::clone(&capture.run_blocks),
|
|
||||||
run_duration,
|
|
||||||
block_feed.clone(),
|
|
||||||
);
|
|
||||||
spawn_da_capture(
|
|
||||||
Arc::clone(&capture.planned),
|
|
||||||
Arc::clone(&capture.inscriptions),
|
|
||||||
Arc::clone(&capture.blobs),
|
|
||||||
block_feed,
|
|
||||||
);
|
|
||||||
|
|
||||||
self.capture_state = Some(capture);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> {
|
|
||||||
let state = self.capture_state()?;
|
|
||||||
self.evaluate_inscriptions(state)?;
|
|
||||||
self.evaluate_blobs(ctx, state)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy)]
|
|
||||||
struct BlockWindow {
|
|
||||||
observed_blocks: u64,
|
|
||||||
expected_blocks: u64,
|
|
||||||
effective_blocks: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct BlobObservation {
|
|
||||||
observed_total_blobs: u64,
|
|
||||||
channels_with_blobs: HashSet<ChannelId>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DaWorkloadExpectation {
|
|
||||||
fn planned_channel_ids(&self) -> Vec<ChannelId> {
|
|
||||||
planned_channel_ids(planned_channel_count(
|
|
||||||
self.channel_rate_per_block,
|
|
||||||
self.headroom_percent,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn capture_state(&self) -> Result<&CaptureState, DynError> {
|
|
||||||
self.capture_state
|
|
||||||
.as_ref()
|
|
||||||
.ok_or(DaExpectationError::NotCaptured)
|
|
||||||
.map_err(DynError::from)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn evaluate_inscriptions(&self, state: &CaptureState) -> Result<(), DynError> {
|
|
||||||
let planned_total = state.planned.len();
|
|
||||||
let missing_inscriptions = self.missing_inscriptions(state);
|
|
||||||
let required_inscriptions =
|
|
||||||
minimum_required(planned_total, MIN_INSCRIPTION_INCLUSION_RATIO);
|
|
||||||
let observed_inscriptions = planned_total.saturating_sub(missing_inscriptions.len());
|
|
||||||
|
|
||||||
if observed_inscriptions >= required_inscriptions {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
tracing::warn!(
|
|
||||||
planned = planned_total,
|
|
||||||
missing = missing_inscriptions.len(),
|
|
||||||
required = required_inscriptions,
|
|
||||||
"DA expectation missing inscriptions"
|
|
||||||
);
|
|
||||||
Err(DaExpectationError::MissingInscriptions {
|
|
||||||
planned: planned_total,
|
|
||||||
observed: observed_inscriptions,
|
|
||||||
required: required_inscriptions,
|
|
||||||
missing: missing_inscriptions,
|
|
||||||
}
|
|
||||||
.into())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn evaluate_blobs(&self, ctx: &RunContext, state: &CaptureState) -> Result<(), DynError> {
|
|
||||||
let planned_total = state.planned.len();
|
|
||||||
let BlobObservation {
|
|
||||||
observed_total_blobs,
|
|
||||||
channels_with_blobs,
|
|
||||||
} = self.observe_blobs(state);
|
|
||||||
|
|
||||||
let window = self.block_window(ctx, state);
|
|
||||||
let expected_total_blobs = self.expected_total_blobs(window.effective_blocks);
|
|
||||||
let required_blobs = minimum_required_u64(expected_total_blobs, MIN_BLOB_INCLUSION_RATIO);
|
|
||||||
|
|
||||||
if observed_total_blobs >= required_blobs {
|
|
||||||
tracing::info!(
|
|
||||||
planned_channels = planned_total,
|
|
||||||
channels_with_blobs = channels_with_blobs.len(),
|
|
||||||
inscriptions_observed = planned_total - self.missing_inscriptions(state).len(),
|
|
||||||
observed_total_blobs,
|
|
||||||
expected_total_blobs,
|
|
||||||
required_blobs,
|
|
||||||
observed_blocks = window.observed_blocks,
|
|
||||||
expected_blocks = window.expected_blocks,
|
|
||||||
effective_blocks = window.effective_blocks,
|
|
||||||
"DA inclusion expectation satisfied"
|
|
||||||
);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
let missing_blob_channels = missing_channels(&state.planned, &channels_with_blobs);
|
|
||||||
tracing::warn!(
|
|
||||||
expected_total_blobs,
|
|
||||||
observed_total_blobs,
|
|
||||||
required_blobs,
|
|
||||||
observed_blocks = window.observed_blocks,
|
|
||||||
expected_blocks = window.expected_blocks,
|
|
||||||
effective_blocks = window.effective_blocks,
|
|
||||||
run_duration_secs = state.run_duration.as_secs(),
|
|
||||||
missing_blob_channels = missing_blob_channels.len(),
|
|
||||||
"DA expectation missing blobs"
|
|
||||||
);
|
|
||||||
|
|
||||||
Err(DaExpectationError::MissingBlobs {
|
|
||||||
expected_total_blobs,
|
|
||||||
observed_total_blobs,
|
|
||||||
required_blobs,
|
|
||||||
planned_channels: planned_total,
|
|
||||||
channels_with_blobs: channels_with_blobs.len(),
|
|
||||||
missing: missing_blob_channels,
|
|
||||||
}
|
|
||||||
.into())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn missing_inscriptions(&self, state: &CaptureState) -> Vec<ChannelId> {
|
|
||||||
let inscriptions = lock_or_recover(&state.inscriptions, "da_inscriptions");
|
|
||||||
missing_channels(&state.planned, &inscriptions)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn observe_blobs(&self, state: &CaptureState) -> BlobObservation {
|
|
||||||
let blobs = lock_or_recover(&state.blobs, "da_blobs");
|
|
||||||
let observed_total_blobs = blobs.values().sum::<u64>();
|
|
||||||
let channels_with_blobs = blobs
|
|
||||||
.iter()
|
|
||||||
.filter(|(_, count)| **count > 0)
|
|
||||||
.map(|(channel, _)| *channel)
|
|
||||||
.collect::<HashSet<_>>();
|
|
||||||
|
|
||||||
BlobObservation {
|
|
||||||
observed_total_blobs,
|
|
||||||
channels_with_blobs,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn block_window(&self, ctx: &RunContext, state: &CaptureState) -> BlockWindow {
|
|
||||||
let observed_blocks = state.run_blocks.load(Ordering::Relaxed).max(1);
|
|
||||||
let expected_blocks = ctx.run_metrics().expected_consensus_blocks().max(1);
|
|
||||||
let security_param = u64::from(
|
|
||||||
ctx.descriptors()
|
|
||||||
.config()
|
|
||||||
.consensus_params
|
|
||||||
.security_param
|
|
||||||
.get(),
|
|
||||||
)
|
|
||||||
.max(1);
|
|
||||||
|
|
||||||
let observed_inclusion_blocks = (observed_blocks / security_param).max(1);
|
|
||||||
let expected_inclusion_blocks = (expected_blocks / security_param).max(1);
|
|
||||||
let effective_blocks = observed_inclusion_blocks
|
|
||||||
.min(expected_inclusion_blocks)
|
|
||||||
.max(1);
|
|
||||||
|
|
||||||
BlockWindow {
|
|
||||||
observed_blocks,
|
|
||||||
expected_blocks,
|
|
||||||
effective_blocks,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn expected_total_blobs(&self, effective_blocks: u64) -> u64 {
|
|
||||||
self.blob_rate_per_block
|
|
||||||
.get()
|
|
||||||
.saturating_mul(effective_blocks)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn build_capture_state(planned_ids: Vec<ChannelId>, run_duration: Duration) -> CaptureState {
|
|
||||||
CaptureState {
|
|
||||||
planned: Arc::new(planned_ids.into_iter().collect()),
|
|
||||||
inscriptions: Arc::new(Mutex::new(HashSet::new())),
|
|
||||||
blobs: Arc::new(Mutex::new(HashMap::new())),
|
|
||||||
run_blocks: Arc::new(AtomicU64::new(0)),
|
|
||||||
run_duration,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn spawn_run_block_counter(
|
|
||||||
run_blocks: Arc<AtomicU64>,
|
|
||||||
run_duration: Duration,
|
|
||||||
block_feed: testing_framework_core::scenario::BlockFeed,
|
|
||||||
) {
|
|
||||||
let mut receiver = block_feed.subscribe();
|
|
||||||
spawn(async move {
|
|
||||||
let timer = sleep(run_duration);
|
|
||||||
pin!(timer);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
select! {
|
|
||||||
_ = &mut timer => break,
|
|
||||||
result = receiver.recv() => match result {
|
|
||||||
Ok(_) => {
|
|
||||||
run_blocks.fetch_add(1, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
Err(broadcast::error::RecvError::Lagged(_)) => {}
|
|
||||||
Err(broadcast::error::RecvError::Closed) => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
fn spawn_da_capture(
|
|
||||||
planned: Arc<HashSet<ChannelId>>,
|
|
||||||
inscriptions: Arc<Mutex<HashSet<ChannelId>>>,
|
|
||||||
blobs: Arc<Mutex<HashMap<ChannelId, u64>>>,
|
|
||||||
block_feed: testing_framework_core::scenario::BlockFeed,
|
|
||||||
) {
|
|
||||||
let mut receiver = block_feed.subscribe();
|
|
||||||
|
|
||||||
spawn(async move {
|
|
||||||
loop {
|
|
||||||
match receiver.recv().await {
|
|
||||||
Ok(record) => capture_block(record.as_ref(), &planned, &inscriptions, &blobs),
|
|
||||||
Err(broadcast::error::RecvError::Lagged(skipped)) => {
|
|
||||||
tracing::debug!(skipped, "DA expectation: receiver lagged");
|
|
||||||
}
|
|
||||||
Err(broadcast::error::RecvError::Closed) => {
|
|
||||||
tracing::debug!("DA expectation: block feed closed");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
fn capture_block(
|
|
||||||
block: &BlockRecord,
|
|
||||||
planned: &HashSet<ChannelId>,
|
|
||||||
inscriptions: &Arc<Mutex<HashSet<ChannelId>>>,
|
|
||||||
blobs: &Arc<Mutex<HashMap<ChannelId, u64>>>,
|
|
||||||
) {
|
|
||||||
let mut new_inscriptions = Vec::new();
|
|
||||||
let mut new_blobs = Vec::new();
|
|
||||||
|
|
||||||
for tx in block.block.transactions() {
|
|
||||||
for op in &tx.mantle_tx().ops {
|
|
||||||
match op {
|
|
||||||
Op::ChannelInscribe(inscribe) if planned.contains(&inscribe.channel_id) => {
|
|
||||||
new_inscriptions.push(inscribe.channel_id);
|
|
||||||
}
|
|
||||||
Op::ChannelBlob(blob) if planned.contains(&blob.channel) => {
|
|
||||||
new_blobs.push(blob.channel);
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !new_inscriptions.is_empty() {
|
|
||||||
let mut guard = lock_or_recover(inscriptions, "da_inscriptions");
|
|
||||||
guard.extend(new_inscriptions);
|
|
||||||
tracing::debug!(count = guard.len(), "DA expectation captured inscriptions");
|
|
||||||
}
|
|
||||||
|
|
||||||
if !new_blobs.is_empty() {
|
|
||||||
let mut guard = lock_or_recover(blobs, "da_blobs");
|
|
||||||
for channel in new_blobs {
|
|
||||||
let entry = guard.entry(channel).or_insert(0);
|
|
||||||
*entry += 1;
|
|
||||||
}
|
|
||||||
tracing::debug!(
|
|
||||||
total_blobs = guard.values().sum::<u64>(),
|
|
||||||
"DA expectation captured blobs"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn missing_channels(planned: &HashSet<ChannelId>, observed: &HashSet<ChannelId>) -> Vec<ChannelId> {
|
|
||||||
planned.difference(observed).copied().collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn minimum_required(total: usize, ratio: f64) -> usize {
|
|
||||||
((total as f64) * ratio).ceil() as usize
|
|
||||||
}
|
|
||||||
|
|
||||||
fn minimum_required_u64(total: u64, ratio: f64) -> u64 {
|
|
||||||
((total as f64) * ratio).ceil() as u64
|
|
||||||
}
|
|
||||||
@ -1,4 +0,0 @@
|
|||||||
mod expectation;
|
|
||||||
mod workload;
|
|
||||||
|
|
||||||
pub use workload::Workload;
|
|
||||||
@ -1,323 +0,0 @@
|
|||||||
use std::{num::NonZeroU64, sync::Arc, time::Duration};
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use executor_http_client::ExecutorHttpClient;
|
|
||||||
use futures::future::try_join_all;
|
|
||||||
use key_management_system_service::keys::{Ed25519Key, Ed25519PublicKey};
|
|
||||||
use nomos_core::{
|
|
||||||
da::BlobId,
|
|
||||||
mantle::{
|
|
||||||
AuthenticatedMantleTx as _,
|
|
||||||
ops::{
|
|
||||||
Op,
|
|
||||||
channel::{ChannelId, MsgId},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
};
|
|
||||||
use rand::{RngCore as _, seq::SliceRandom as _, thread_rng};
|
|
||||||
use testing_framework_core::{
|
|
||||||
nodes::ApiClient,
|
|
||||||
scenario::{BlockRecord, DynError, Expectation, RunContext, Workload as ScenarioWorkload},
|
|
||||||
};
|
|
||||||
use tokio::{sync::broadcast, time::sleep};
|
|
||||||
|
|
||||||
use super::expectation::DaWorkloadExpectation;
|
|
||||||
use crate::{
|
|
||||||
util::tx,
|
|
||||||
workloads::util::{find_channel_op, submit_transaction_via_cluster},
|
|
||||||
};
|
|
||||||
|
|
||||||
const TEST_KEY_BYTES: [u8; 32] = [0u8; 32];
|
|
||||||
const BLOB_CHUNK_OPTIONS: &[usize] = &[1, 2, 4, 8];
|
|
||||||
const PUBLISH_RETRIES: usize = 5;
|
|
||||||
const PUBLISH_RETRY_DELAY: Duration = Duration::from_secs(2);
|
|
||||||
const DEFAULT_HEADROOM_PERCENT: u64 = 20;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Workload {
|
|
||||||
blob_rate_per_block: NonZeroU64,
|
|
||||||
channel_rate_per_block: NonZeroU64,
|
|
||||||
headroom_percent: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for Workload {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self::with_rate(NonZeroU64::MIN, NonZeroU64::MIN, DEFAULT_HEADROOM_PERCENT)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Workload {
|
|
||||||
/// Creates a workload that targets a blobs-per-block rate and applies a
|
|
||||||
/// headroom factor when deriving the channel count.
|
|
||||||
#[must_use]
|
|
||||||
pub const fn with_rate(
|
|
||||||
blob_rate_per_block: NonZeroU64,
|
|
||||||
channel_rate_per_block: NonZeroU64,
|
|
||||||
headroom_percent: u64,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
|
||||||
blob_rate_per_block,
|
|
||||||
channel_rate_per_block,
|
|
||||||
headroom_percent,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[must_use]
|
|
||||||
pub const fn default_headroom_percent() -> u64 {
|
|
||||||
DEFAULT_HEADROOM_PERCENT
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl ScenarioWorkload for Workload {
|
|
||||||
fn name(&self) -> &'static str {
|
|
||||||
"channel_workload"
|
|
||||||
}
|
|
||||||
|
|
||||||
fn expectations(&self) -> Vec<Box<dyn Expectation>> {
|
|
||||||
vec![Box::new(DaWorkloadExpectation::new(
|
|
||||||
self.blob_rate_per_block,
|
|
||||||
self.channel_rate_per_block,
|
|
||||||
self.headroom_percent,
|
|
||||||
))]
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn start(&self, ctx: &RunContext) -> Result<(), DynError> {
|
|
||||||
let planned_channels = planned_channel_ids(planned_channel_count(
|
|
||||||
self.channel_rate_per_block,
|
|
||||||
self.headroom_percent,
|
|
||||||
));
|
|
||||||
|
|
||||||
let expected_blobs = planned_blob_count(
|
|
||||||
self.blob_rate_per_block,
|
|
||||||
ctx.run_metrics().expected_consensus_blocks(),
|
|
||||||
ctx.descriptors()
|
|
||||||
.config()
|
|
||||||
.consensus_params
|
|
||||||
.security_param
|
|
||||||
.get()
|
|
||||||
.into(),
|
|
||||||
);
|
|
||||||
let per_channel_target =
|
|
||||||
per_channel_blob_target(expected_blobs, planned_channels.len().max(1) as u64);
|
|
||||||
|
|
||||||
tracing::info!(
|
|
||||||
blob_rate_per_block = self.blob_rate_per_block.get(),
|
|
||||||
channel_rate = self.channel_rate_per_block.get(),
|
|
||||||
headroom_percent = self.headroom_percent,
|
|
||||||
planned_channels = planned_channels.len(),
|
|
||||||
expected_blobs,
|
|
||||||
per_channel_target,
|
|
||||||
"DA workload derived planned channels"
|
|
||||||
);
|
|
||||||
|
|
||||||
try_join_all(planned_channels.into_iter().map(|channel_id| {
|
|
||||||
let ctx = ctx;
|
|
||||||
async move {
|
|
||||||
tracing::info!(channel_id = ?channel_id, blobs = per_channel_target, "DA workload starting channel flow");
|
|
||||||
run_channel_flow(ctx, channel_id, per_channel_target).await?;
|
|
||||||
tracing::info!(channel_id = ?channel_id, "DA workload finished channel flow");
|
|
||||||
Ok::<(), DynError>(())
|
|
||||||
}
|
|
||||||
}))
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
tracing::info!("DA workload completed all channel flows");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run_channel_flow(
|
|
||||||
ctx: &RunContext,
|
|
||||||
channel_id: ChannelId,
|
|
||||||
target_blobs: u64,
|
|
||||||
) -> Result<(), DynError> {
|
|
||||||
tracing::debug!(channel_id = ?channel_id, "DA: submitting inscription tx");
|
|
||||||
let inscription_tx = Arc::new(tx::create_inscription_transaction_with_id(channel_id)?);
|
|
||||||
submit_transaction_via_cluster(ctx, Arc::clone(&inscription_tx)).await?;
|
|
||||||
|
|
||||||
let mut receiver = ctx.block_feed().subscribe();
|
|
||||||
let inscription_id = wait_for_inscription(&mut receiver, channel_id).await?;
|
|
||||||
|
|
||||||
let mut parent_id = inscription_id;
|
|
||||||
for idx in 0..target_blobs {
|
|
||||||
let payload = random_blob_payload();
|
|
||||||
let published_blob_id = publish_blob(ctx, channel_id, parent_id, payload).await?;
|
|
||||||
let (next_parent, included_blob_id) =
|
|
||||||
wait_for_blob_with_parent(&mut receiver, channel_id, parent_id).await?;
|
|
||||||
parent_id = next_parent;
|
|
||||||
|
|
||||||
tracing::debug!(
|
|
||||||
channel_id = ?channel_id,
|
|
||||||
blob_index = idx,
|
|
||||||
published_blob_id = ?published_blob_id,
|
|
||||||
included_blob_id = ?included_blob_id,
|
|
||||||
"DA: blob published"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn wait_for_inscription(
|
|
||||||
receiver: &mut broadcast::Receiver<Arc<BlockRecord>>,
|
|
||||||
channel_id: ChannelId,
|
|
||||||
) -> Result<MsgId, DynError> {
|
|
||||||
wait_for_channel_op(receiver, move |op| {
|
|
||||||
if let Op::ChannelInscribe(inscribe) = op
|
|
||||||
&& inscribe.channel_id == channel_id
|
|
||||||
{
|
|
||||||
Some(inscribe.id())
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn wait_for_blob_with_parent(
|
|
||||||
receiver: &mut broadcast::Receiver<Arc<BlockRecord>>,
|
|
||||||
channel_id: ChannelId,
|
|
||||||
parent_msg: MsgId,
|
|
||||||
) -> Result<(MsgId, BlobId), DynError> {
|
|
||||||
loop {
|
|
||||||
match receiver.recv().await {
|
|
||||||
Ok(record) => {
|
|
||||||
for tx in record.block.transactions() {
|
|
||||||
for op in &tx.mantle_tx().ops {
|
|
||||||
if let Op::ChannelBlob(blob_op) = op
|
|
||||||
&& blob_op.channel == channel_id
|
|
||||||
&& blob_op.parent == parent_msg
|
|
||||||
{
|
|
||||||
let msg_id = blob_op.id();
|
|
||||||
return Ok((msg_id, blob_op.blob));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(broadcast::error::RecvError::Lagged(_)) => {}
|
|
||||||
Err(broadcast::error::RecvError::Closed) => {
|
|
||||||
return Err("block feed closed while waiting for channel operations".into());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn wait_for_channel_op<F>(
|
|
||||||
receiver: &mut broadcast::Receiver<Arc<BlockRecord>>,
|
|
||||||
mut matcher: F,
|
|
||||||
) -> Result<MsgId, DynError>
|
|
||||||
where
|
|
||||||
F: FnMut(&Op) -> Option<MsgId>,
|
|
||||||
{
|
|
||||||
loop {
|
|
||||||
match receiver.recv().await {
|
|
||||||
Ok(record) => {
|
|
||||||
if let Some(msg_id) = find_channel_op(record.block.as_ref(), &mut matcher) {
|
|
||||||
tracing::debug!(?msg_id, "DA: matched channel operation");
|
|
||||||
return Ok(msg_id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(broadcast::error::RecvError::Lagged(_)) => {}
|
|
||||||
Err(broadcast::error::RecvError::Closed) => {
|
|
||||||
return Err("block feed closed while waiting for channel operations".into());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn publish_blob(
|
|
||||||
ctx: &RunContext,
|
|
||||||
channel_id: ChannelId,
|
|
||||||
parent_msg: MsgId,
|
|
||||||
data: Vec<u8>,
|
|
||||||
) -> Result<BlobId, DynError> {
|
|
||||||
let executors = ctx.node_clients().executor_clients();
|
|
||||||
if executors.is_empty() {
|
|
||||||
return Err("da workload requires at least one executor".into());
|
|
||||||
}
|
|
||||||
|
|
||||||
let signer = test_signer();
|
|
||||||
tracing::debug!(channel = ?channel_id, payload_bytes = data.len(), "DA: prepared blob payload");
|
|
||||||
let client = ExecutorHttpClient::new(None);
|
|
||||||
|
|
||||||
let mut candidates: Vec<&ApiClient> = executors.iter().collect();
|
|
||||||
let mut last_err = None;
|
|
||||||
for attempt in 1..=PUBLISH_RETRIES {
|
|
||||||
candidates.shuffle(&mut thread_rng());
|
|
||||||
for executor in &candidates {
|
|
||||||
let executor_url = executor.base_url().clone();
|
|
||||||
match client
|
|
||||||
.publish_blob(executor_url, channel_id, parent_msg, signer, data.clone())
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(blob_id) => return Ok(blob_id),
|
|
||||||
Err(err) => {
|
|
||||||
tracing::debug!(attempt, executor = %executor.base_url(), %err, "DA: publish_blob failed");
|
|
||||||
last_err = Some(err.into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if attempt < PUBLISH_RETRIES {
|
|
||||||
sleep(PUBLISH_RETRY_DELAY).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Err(last_err.unwrap_or_else(|| "da workload could not publish blob".into()))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn test_signer() -> Ed25519PublicKey {
|
|
||||||
Ed25519Key::from_bytes(&TEST_KEY_BYTES).public_key()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn random_blob_payload() -> Vec<u8> {
|
|
||||||
let mut rng = thread_rng();
|
|
||||||
// KZGRS encoder expects the polynomial degree to be a power of two, which
|
|
||||||
// effectively constrains the blob chunk count.
|
|
||||||
let chunks = BLOB_CHUNK_OPTIONS.choose(&mut rng).copied().unwrap_or(1);
|
|
||||||
let mut data = vec![0u8; 31 * chunks];
|
|
||||||
rng.fill_bytes(&mut data);
|
|
||||||
data
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn planned_channel_ids(total: usize) -> Vec<ChannelId> {
|
|
||||||
(0..total as u64)
|
|
||||||
.map(deterministic_channel_id)
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn deterministic_channel_id(index: u64) -> ChannelId {
|
|
||||||
let mut bytes = [0u8; 32];
|
|
||||||
bytes[..8].copy_from_slice(b"chn_wrkd");
|
|
||||||
bytes[24..].copy_from_slice(&index.to_be_bytes());
|
|
||||||
ChannelId::from(bytes)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[must_use]
|
|
||||||
pub fn planned_channel_count(channel_rate_per_block: NonZeroU64, headroom_percent: u64) -> usize {
|
|
||||||
let base = channel_rate_per_block.get() as usize;
|
|
||||||
let extra = (base.saturating_mul(headroom_percent as usize) + 99) / 100;
|
|
||||||
let total = base.saturating_add(extra);
|
|
||||||
total.max(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[must_use]
|
|
||||||
pub fn planned_blob_count(
|
|
||||||
blob_rate_per_block: NonZeroU64,
|
|
||||||
expected_consensus_blocks: u64,
|
|
||||||
security_param: u64,
|
|
||||||
) -> u64 {
|
|
||||||
let expected_blocks = expected_consensus_blocks.max(1);
|
|
||||||
let security_param = security_param.max(1);
|
|
||||||
let inclusion_blocks = (expected_blocks / security_param).max(1);
|
|
||||||
blob_rate_per_block.get().saturating_mul(inclusion_blocks)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[must_use]
|
|
||||||
pub fn per_channel_blob_target(total_blobs: u64, channel_count: u64) -> u64 {
|
|
||||||
if channel_count == 0 {
|
|
||||||
return total_blobs.max(1);
|
|
||||||
}
|
|
||||||
let per = (total_blobs + channel_count - 1) / channel_count;
|
|
||||||
per.max(1)
|
|
||||||
}
|
|
||||||
@ -1,5 +1,4 @@
|
|||||||
pub mod chaos;
|
pub mod chaos;
|
||||||
pub mod da;
|
|
||||||
pub mod transaction;
|
pub mod transaction;
|
||||||
pub mod util;
|
pub mod util;
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user