diff --git a/examples/src/bin/compose_runner.rs b/examples/src/bin/compose_runner.rs
index f19c7de..1917f0c 100644
--- a/examples/src/bin/compose_runner.rs
+++ b/examples/src/bin/compose_runner.rs
@@ -1,4 +1,4 @@
-use std::time::Duration;
+use std::{env, error::Error, process, str::FromStr, time::Duration};
 
 use runner_examples::{ChaosBuilderExt as _, ScenarioBuilderExt as _};
 use testing_framework_core::scenario::{Deployer as _, Runner, ScenarioBuilder};
@@ -12,15 +12,24 @@ const MIXED_TXS_PER_BLOCK: u64 = 5;
 const TOTAL_WALLETS: usize = 1000;
 const TRANSACTION_WALLETS: usize = 500;
 
+// Chaos Testing Constants
+const CHAOS_MIN_DELAY_SECS: u64 = 120;
+const CHAOS_MAX_DELAY_SECS: u64 = 180;
+const CHAOS_COOLDOWN_SECS: u64 = 240;
+
+// DA Testing Constants
+const DA_CHANNEL_RATE: u64 = 1;
+const DA_BLOB_RATE: u64 = 1;
+
 #[tokio::main]
 async fn main() {
     // Compose containers mount KZG params at /kzgrs_test_params; ensure the
     // generated configs point there unless the caller overrides explicitly.
-    if std::env::var("NOMOS_KZGRS_PARAMS_PATH").is_err() {
+    if env::var("NOMOS_KZGRS_PARAMS_PATH").is_err() {
         // Safe: setting a process-wide environment variable before any threads
         // or async tasks are spawned.
         unsafe {
-            std::env::set_var(
+            env::set_var(
                 "NOMOS_KZGRS_PARAMS_PATH",
                 "/kzgrs_test_params/kzgrs_test_params",
             );
@@ -48,7 +57,7 @@ async fn main() {
     if let Err(err) = run_compose_case(validators, executors, Duration::from_secs(run_secs)).await
     {
         warn!("compose runner demo failed: {err}");
-        std::process::exit(1);
+        process::exit(1);
     }
 }
 
@@ -57,7 +66,7 @@ async fn run_compose_case(
     validators: usize,
     executors: usize,
     run_duration: Duration,
-) -> Result<(), Box<dyn std::error::Error>> {
+) -> Result<(), Box<dyn Error>> {
     info!(
         validators,
         executors,
@@ -74,9 +83,9 @@ async fn run_compose_case(
         .chaos_with(|c| {
             c.restart()
                 // Keep chaos restarts outside the test run window to avoid crash loops on restart.
-                .min_delay(Duration::from_secs(120))
-                .max_delay(Duration::from_secs(180))
-                .target_cooldown(Duration::from_secs(240))
+                .min_delay(Duration::from_secs(CHAOS_MIN_DELAY_SECS))
+                .max_delay(Duration::from_secs(CHAOS_MAX_DELAY_SECS))
+                .target_cooldown(Duration::from_secs(CHAOS_COOLDOWN_SECS))
                 .apply()
         })
         .wallets(TOTAL_WALLETS)
@@ -85,8 +94,8 @@ async fn run_compose_case(
                 .users(TRANSACTION_WALLETS)
         })
         .da_with(|da| {
-            da.channel_rate(1)
-                .blob_rate(1)
+            da.channel_rate(DA_CHANNEL_RATE)
+                .blob_rate(DA_BLOB_RATE)
         })
         .with_run_duration(run_duration)
         .expect_consensus_liveness()
@@ -103,6 +112,7 @@ async fn run_compose_case(
         }
         Err(err) => return Err(err.into()),
     };
+
     if !runner.context().telemetry().is_configured() {
         warn!("compose runner should expose prometheus metrics");
     }
@@ -113,13 +123,9 @@ fn read_env_any<T>(keys: &[&str], default: T) -> T
 where
-    T: std::str::FromStr + Copy,
+    T: FromStr + Copy,
 {
     keys.iter()
-        .find_map(|key| {
-            std::env::var(key)
-                .ok()
-                .and_then(|raw| raw.parse::<T>().ok())
-        })
+        .find_map(|key| env::var(key).ok().and_then(|raw| raw.parse::<T>().ok()))
         .unwrap_or(default)
 }
diff --git a/examples/src/bin/k8s_runner.rs b/examples/src/bin/k8s_runner.rs
index 33d0b22..c20128e 100644
--- a/examples/src/bin/k8s_runner.rs
+++ b/examples/src/bin/k8s_runner.rs
@@ -1,4 +1,4 @@
-use std::time::Duration;
+use std::{env, error::Error, process, str::FromStr, time::Duration};
 
 use runner_examples::ScenarioBuilderExt as _;
 use testing_framework_core::scenario::{Deployer as _, Runner, ScenarioBuilder};
@@ -11,6 +11,8 @@ const DEFAULT_EXECUTORS: usize = 1;
 const MIXED_TXS_PER_BLOCK: u64 = 5;
 const TOTAL_WALLETS: usize = 1000;
 const TRANSACTION_WALLETS: usize = 500;
+const DA_BLOB_RATE: u64 = 1;
+const MIN_CONSENSUS_HEIGHT: u64 = 5;
 
 #[tokio::main]
 async fn main() {
@@ -32,7 +34,7 @@ async fn main() {
     if let Err(err) = run_k8s_case(validators, executors, Duration::from_secs(run_secs)).await
     {
         warn!("k8s runner demo failed: {err}");
-        std::process::exit(1);
+        process::exit(1);
     }
 }
 
@@ -41,7 +43,7 @@ async fn run_k8s_case(
     validators: usize,
     executors: usize,
     run_duration: Duration,
-) -> Result<(), Box<dyn std::error::Error>> {
+) -> Result<(), Box<dyn Error>> {
     info!(
         validators,
         executors,
@@ -59,7 +61,7 @@ async fn run_k8s_case(
                 .users(TRANSACTION_WALLETS)
         })
         .da_with(|da| {
-            da.blob_rate(1)
+            da.blob_rate(DA_BLOB_RATE)
         })
         .with_run_duration(run_duration)
         .expect_consensus_liveness()
@@ -96,9 +98,9 @@ async fn run_k8s_case(
             .consensus_info()
             .await
             .map_err(|err| format!("validator {idx} consensus_info failed: {err}"))?;
-        if info.height < 5 {
+        if info.height < MIN_CONSENSUS_HEIGHT {
             return Err(format!(
-                "validator {idx} height {} should reach at least 5 blocks",
+                "validator {idx} height {} should reach at least {MIN_CONSENSUS_HEIGHT} blocks",
                 info.height
             )
             .into());
@@ -113,13 +115,9 @@ fn read_env_any<T>(keys: &[&str], default: T) -> T
 where
-    T: std::str::FromStr + Copy,
+    T: FromStr + Copy,
 {
     keys.iter()
-        .find_map(|key| {
-            std::env::var(key)
-                .ok()
-                .and_then(|raw| raw.parse::<T>().ok())
-        })
+        .find_map(|key| env::var(key).ok().and_then(|raw| raw.parse::<T>().ok()))
         .unwrap_or(default)
 }
diff --git a/examples/src/bin/local_runner.rs b/examples/src/bin/local_runner.rs
index e980ba4..bc72247 100644
--- a/examples/src/bin/local_runner.rs
+++ b/examples/src/bin/local_runner.rs
@@ -1,4 +1,4 @@
-use std::time::Duration;
+use std::{env, error::Error, process, str::FromStr, time::Duration};
 
 use runner_examples::ScenarioBuilderExt as _;
 use testing_framework_core::scenario::{Deployer as _, Runner, ScenarioBuilder};
@@ -11,15 +11,15 @@ const DEFAULT_RUN_SECS: u64 = 60;
 const MIXED_TXS_PER_BLOCK: u64 = 5;
 const TOTAL_WALLETS: usize = 1000;
 const TRANSACTION_WALLETS: usize = 500;
+const DA_BLOB_RATE: u64 = 1;
 
 #[tokio::main]
 async fn main() {
     tracing_subscriber::fmt::init();
 
-    if std::env::var("POL_PROOF_DEV_MODE").is_err() {
+    if env::var("POL_PROOF_DEV_MODE").is_err() {
         warn!("POL_PROOF_DEV_MODE=true is required for the local runner demo");
-
-        std::process::exit(1);
+        process::exit(1);
     }
 
     let validators = read_env_any(
@@ -42,8 +42,7 @@ async fn main() {
     if let Err(err) = run_local_case(validators, executors, Duration::from_secs(run_secs)).await
     {
         warn!("local runner demo failed: {err}");
-
-        std::process::exit(1);
+        process::exit(1);
     }
 }
 
@@ -51,7 +50,7 @@ async fn run_local_case(
     validators: usize,
     executors: usize,
     run_duration: Duration,
-) -> Result<(), Box<dyn std::error::Error>> {
+) -> Result<(), Box<dyn Error>> {
     info!(
         validators,
         executors,
@@ -64,7 +63,7 @@
         })
         .wallets(TOTAL_WALLETS)
         .transactions_with(|txs| txs.rate(MIXED_TXS_PER_BLOCK).users(TRANSACTION_WALLETS))
-        .da_with(|da| da.blob_rate(1))
+        .da_with(|da| da.blob_rate(DA_BLOB_RATE))
         .with_run_duration(run_duration)
         .expect_consensus_liveness()
         .build();
@@ -83,13 +82,9 @@ fn read_env_any<T>(keys: &[&str], default: T) -> T
 where
-    T: std::str::FromStr + Copy,
+    T: FromStr + Copy,
 {
     keys.iter()
-        .find_map(|key| {
-            std::env::var(key)
-                .ok()
-                .and_then(|raw| raw.parse::<T>().ok())
-        })
+        .find_map(|key| env::var(key).ok().and_then(|raw| raw.parse::<T>().ok()))
         .unwrap_or(default)
 }
diff --git a/testing-framework/configs/src/lib.rs b/testing-framework/configs/src/lib.rs
index fe87c39..a54fa87 100644
--- a/testing-framework/configs/src/lib.rs
+++ b/testing-framework/configs/src/lib.rs
@@ -14,10 +14,16 @@ pub static IS_DEBUG_TRACING: LazyLock<bool> = LazyLock::new(|| {
     env::var("NOMOS_TESTS_TRACING").is_ok_and(|val| val.eq_ignore_ascii_case("true"))
 });
 
+const SLOW_ENV_TIMEOUT_MULTIPLIER: u32 = 2;
+
 /// In slow test environments like Codecov, use 2x timeout.
 #[must_use]
 pub fn adjust_timeout(d: Duration) -> Duration {
-    if *IS_SLOW_TEST_ENV { d.mul(2) } else { d }
+    if *IS_SLOW_TEST_ENV {
+        d.mul(SLOW_ENV_TIMEOUT_MULTIPLIER)
+    } else {
+        d
+    }
 }
 
 #[must_use]
diff --git a/testing-framework/configs/src/nodes/blend.rs b/testing-framework/configs/src/nodes/blend.rs
index da539b0..e414461 100644
--- a/testing-framework/configs/src/nodes/blend.rs
+++ b/testing-framework/configs/src/nodes/blend.rs
@@ -20,6 +20,19 @@ use crate::{
     topology::configs::blend::GeneralBlendConfig as TopologyBlendConfig,
 };
 
+// Blend service constants
+const BLEND_LAYERS_COUNT: u64 = 1;
+const MINIMUM_NETWORK_SIZE: u64 = 1;
+const ROUND_DURATION_SECS: u64 = 1;
+const ROUNDS_PER_INTERVAL: u64 = 30;
+const ROUNDS_PER_SESSION: u64 = 648_000;
+const ROUNDS_PER_OBSERVATION_WINDOW: u64 = 30;
+const ROUNDS_PER_SESSION_TRANSITION: u64 = 30;
+const EPOCH_TRANSITION_SLOTS: u64 = 2_600;
+const SAFETY_BUFFER_INTERVALS: u64 = 100;
+const MESSAGE_FREQUENCY_PER_ROUND: f64 = 1.0;
+const MAX_RELEASE_DELAY_ROUNDS: u64 = 3;
+
 pub(crate) fn build_blend_service_config(
     config: &TopologyBlendConfig,
 ) -> (
@@ -60,26 +73,35 @@ pub(crate) fn build_blend_service_config(
     let deployment_settings = BlendDeploymentSettings {
         common: blend_deployment::CommonSettings {
-            num_blend_layers: NonZeroU64::try_from(1).unwrap(),
-            minimum_network_size: NonZeroU64::try_from(1).unwrap(),
+            num_blend_layers: NonZeroU64::try_from(BLEND_LAYERS_COUNT).unwrap(),
+            minimum_network_size: NonZeroU64::try_from(MINIMUM_NETWORK_SIZE).unwrap(),
             timing: TimingSettings {
-                round_duration: Duration::from_secs(1),
-                rounds_per_interval: NonZeroU64::try_from(30u64).unwrap(),
-                rounds_per_session: NonZeroU64::try_from(648_000u64).unwrap(),
-                rounds_per_observation_window: NonZeroU64::try_from(30u64).unwrap(),
-                rounds_per_session_transition_period: NonZeroU64::try_from(30u64).unwrap(),
-                epoch_transition_period_in_slots: NonZeroU64::try_from(2_600).unwrap(),
+                round_duration: Duration::from_secs(ROUND_DURATION_SECS),
+                rounds_per_interval: NonZeroU64::try_from(ROUNDS_PER_INTERVAL).unwrap(),
+                rounds_per_session: NonZeroU64::try_from(ROUNDS_PER_SESSION).unwrap(),
+                rounds_per_observation_window: NonZeroU64::try_from(ROUNDS_PER_OBSERVATION_WINDOW)
+                    .unwrap(),
+                rounds_per_session_transition_period: NonZeroU64::try_from(
+                    ROUNDS_PER_SESSION_TRANSITION,
+                )
+                .unwrap(),
+                epoch_transition_period_in_slots: NonZeroU64::try_from(EPOCH_TRANSITION_SLOTS)
+                    .unwrap(),
             },
             protocol_name: backend_core.protocol_name.clone(),
         },
         core: blend_deployment::CoreSettings {
             scheduler: SchedulerSettings {
                 cover: CoverTrafficSettings {
-                    intervals_for_safety_buffer: 100,
-                    message_frequency_per_round: NonNegativeF64::try_from(1f64).unwrap(),
+                    intervals_for_safety_buffer: SAFETY_BUFFER_INTERVALS,
+                    message_frequency_per_round: NonNegativeF64::try_from(
+                        MESSAGE_FREQUENCY_PER_ROUND,
+                    )
+                    .unwrap(),
                 },
                 delayer: MessageDelayerSettings {
-                    maximum_release_delay_in_rounds: NonZeroU64::try_from(3u64).unwrap(),
+                    maximum_release_delay_in_rounds: NonZeroU64::try_from(MAX_RELEASE_DELAY_ROUNDS)
+                        .unwrap(),
                 },
             },
             minimum_messages_coefficient: backend_core.minimum_messages_coefficient,
diff --git a/testing-framework/configs/src/nodes/common.rs b/testing-framework/configs/src/nodes/common.rs
index 42def3d..546e45b 100644
--- a/testing-framework/configs/src/nodes/common.rs
+++ b/testing-framework/configs/src/nodes/common.rs
@@ -34,6 +34,19 @@ use nomos_wallet::WalletServiceSettings;
 
 use crate::{timeouts, topology::configs::GeneralConfig};
 
+// Configuration constants
+const CRYPTARCHIA_GOSSIPSUB_PROTOCOL: &str = "/cryptarchia/proto";
+const MEMPOOL_PUBSUB_TOPIC: &str = "mantle";
+const STATE_RECORDING_INTERVAL_SECS: u64 = 60;
+const IBD_DOWNLOAD_DELAY_SECS: u64 = 10;
+const MAX_ORPHAN_CACHE_SIZE: usize = 5;
+const DA_PUBLISH_THRESHOLD: f64 = 0.8;
+const API_RATE_LIMIT_PER_SECOND: u64 = 10000;
+const API_RATE_LIMIT_BURST: u32 = 10000;
+const API_MAX_CONCURRENT_REQUESTS: usize = 1000;
+const BLOB_STORAGE_DIR: &str = "./";
+const KZG_PARAMS_FILENAME: &str = "kzgrs_test_params";
+
 pub(crate) fn cryptarchia_deployment(config: &GeneralConfig) -> CryptarchiaDeploymentSettings {
     CryptarchiaDeploymentSettings {
         epoch_config: config.consensus_config.ledger_config.epoch_config,
@@ -47,7 +60,7 @@ pub(crate) fn cryptarchia_deployment(config: &GeneralConfig) -> CryptarchiaDeplo
                 .clone(),
             min_stake: config.consensus_config.ledger_config.sdp_config.min_stake,
         },
-        gossipsub_protocol: "/cryptarchia/proto".to_owned(),
+        gossipsub_protocol: CRYPTARCHIA_GOSSIPSUB_PROTOCOL.to_owned(),
     }
 }
 
@@ -59,7 +72,7 @@ pub(crate) fn time_deployment(config: &GeneralConfig) -> TimeDeploymentSettings
 
 pub(crate) fn mempool_deployment() -> MempoolDeploymentSettings {
     MempoolDeploymentSettings {
-        pubsub_topic: "mantle".to_owned(),
+        pubsub_topic: MEMPOOL_PUBSUB_TOPIC.to_owned(),
     }
 }
 
@@ -77,7 +90,7 @@ pub(crate) fn cryptarchia_config(config: &GeneralConfig) -> CryptarchiaConfig {
             force_bootstrap: false,
             offline_grace_period: chain_service::OfflineGracePeriodConfig {
                 grace_period: timeouts::grace_period(),
-                state_recording_interval: Duration::from_secs(60),
+                state_recording_interval: Duration::from_secs(STATE_RECORDING_INTERVAL_SECS),
             },
         },
     },
@@ -85,12 +98,12 @@ pub(crate) fn cryptarchia_config(config: &GeneralConfig) -> CryptarchiaConfig {
         bootstrap: ChainBootstrapConfig {
             ibd: chain_network::IbdConfig {
                 peers: HashSet::new(),
-                delay_before_new_download: Duration::from_secs(10),
+                delay_before_new_download: Duration::from_secs(IBD_DOWNLOAD_DELAY_SECS),
             },
         },
         sync: SyncConfig {
             orphan: OrphanConfig {
-                max_orphan_cache_size: NonZeroUsize::new(5)
+                max_orphan_cache_size: NonZeroUsize::new(MAX_ORPHAN_CACHE_SIZE)
                     .expect("Max orphan cache size must be non-zero"),
             },
         },
@@ -104,9 +117,11 @@ pub(crate) fn cryptarchia_config(config: &GeneralConfig) -> CryptarchiaConfig {
 fn kzg_params_path(raw: &str) -> String {
     let path = PathBuf::from(raw);
+
     if path.is_dir() {
-        return path.join("kzgrs_test_params").to_string_lossy().to_string();
+        return path.join(KZG_PARAMS_FILENAME).to_string_lossy().to_string();
     }
+
     path.to_string_lossy().to_string()
 }
 
@@ -121,10 +136,10 @@ pub(crate) fn da_verifier_config(
         tx_verifier_settings: (),
         network_adapter_settings: (),
         storage_adapter_settings: VerifierStorageAdapterSettings {
-            blob_storage_directory: "./".into(),
+            blob_storage_directory: BLOB_STORAGE_DIR.into(),
         },
         mempool_trigger_settings: MempoolPublishTriggerConfig {
-            publish_threshold: NonNegativeF64::try_from(0.8).unwrap(),
+            publish_threshold: NonNegativeF64::try_from(DA_PUBLISH_THRESHOLD).unwrap(),
             share_duration: timeouts::share_duration(),
             prune_duration: timeouts::prune_duration(),
             prune_interval: timeouts::prune_interval(),
@@ -180,9 +195,9 @@ pub(crate) fn http_config(config: &GeneralConfig) -> ApiServiceSettings
-    pub async fn wait_for_exit(&mut self, timeout: std::time::Duration) -> bool {
+    pub async fn wait_for_exit(&mut self, timeout: Duration) -> bool {
         self.handle.wait_for_exit(timeout).await
     }
 }
@@ -88,7 +93,7 @@ impl NodeConfigCommon for Config {
         self.tracing.logger = logger;
     }
 
-    fn set_paths(&mut self, base: &std::path::Path) {
+    fn set_paths(&mut self, base: &Path) {
         self.storage.db_path = base.join("db");
         base.clone_into(
             &mut self
@@ -98,7 +103,7 @@ impl NodeConfigCommon for Config {
         );
     }
 
-    fn addresses(&self) -> (std::net::SocketAddr, Option<std::net::SocketAddr>) {
+    fn addresses(&self) -> (SocketAddr, Option<SocketAddr>) {
         (
             self.http.backend_settings.address,
             Some(self.testing_http.backend_settings.address),
diff --git a/testing-framework/core/src/topology/generation.rs b/testing-framework/core/src/topology/generation.rs
index b2b0197..fd30ee7 100644
--- a/testing-framework/core/src/topology/generation.rs
+++ b/testing-framework/core/src/topology/generation.rs
@@ -124,8 +124,11 @@ impl GeneratedTopology {
     pub async fn wait_remote_readiness(
         &self,
+        // Node endpoints
         validator_endpoints: &[Url],
         executor_endpoints: &[Url],
+
+        // Membership endpoints
         validator_membership_endpoints: Option<&[Url]>,
         executor_membership_endpoints: Option<&[Url]>,
     ) -> Result<(), ReadinessError> {
@@ -151,6 +154,7 @@ impl GeneratedTopology {
         let labels = self.labels();
         let client = Client::new();
+
         let make_testing_base_url = |port: u16| -> Url {
             Url::parse(&format!("http://127.0.0.1:{port}/"))
                 .expect("failed to construct local testing base url")
@@ -163,6 +167,7 @@ impl GeneratedTopology {
             &listen_ports,
             &initial_peer_ports,
         );
+
         let network_check = HttpNetworkReadiness {
             client: &client,
             endpoints: &endpoints,
@@ -180,6 +185,7 @@ impl GeneratedTopology {
                     urls.len(),
                     "validator membership endpoints must match topology"
                 );
+
                 membership_endpoints.extend_from_slice(urls);
             } else {
                 membership_endpoints.extend(
@@ -195,6 +201,7 @@ impl GeneratedTopology {
                     urls.len(),
                     "executor membership endpoints must match topology"
                 );
+
                 membership_endpoints.extend_from_slice(urls);
             } else {
                 membership_endpoints.extend(
@@ -282,6 +289,7 @@ pub fn find_expected_peer_counts(
         let Some(peer_idx) = listen_ports.iter().position(|p| p == port) else {
             continue;
         };
+
         if peer_idx == idx {
             continue;
         }
diff --git a/testing-framework/runners/compose/src/deployer/orchestrator.rs b/testing-framework/runners/compose/src/deployer/orchestrator.rs
index b33ae42..598a0ae 100644
--- a/testing-framework/runners/compose/src/deployer/orchestrator.rs
+++ b/testing-framework/runners/compose/src/deployer/orchestrator.rs
@@ -92,6 +92,7 @@ impl DeploymentOrchestrator {
             host,
             environment.grafana_port()
         );
+
         print_profiling_urls(&host, &host_ports);
     }
 
diff --git a/testing-framework/runners/compose/src/deployer/ports.rs b/testing-framework/runners/compose/src/deployer/ports.rs
index 9acf480..624259a 100644
--- a/testing-framework/runners/compose/src/deployer/ports.rs
+++ b/testing-framework/runners/compose/src/deployer/ports.rs
@@ -35,6 +35,7 @@ impl PortManager {
                 environment
                     .fail("failed to determine container host ports")
                     .await;
+                tracing::warn!(%err, "failed to resolve host ports");
                 Err(err)
             }
 
diff --git a/testing-framework/runners/compose/src/deployer/setup.rs b/testing-framework/runners/compose/src/deployer/setup.rs
index e137ba5..3d9036d 100644
--- a/testing-framework/runners/compose/src/deployer/setup.rs
+++ b/testing-framework/runners/compose/src/deployer/setup.rs
@@ -53,14 +53,17 @@ impl DeploymentSetup {
         if prometheus_env.is_some() {
             info!(port = prometheus_env, "using prometheus port from env");
         }
+
         let prometheus_port = prometheus_env
             .and_then(|port| reserve_port(port))
             .or_else(|| allocate_prometheus_port())
             .unwrap_or_else(|| PortReservation::new(DEFAULT_PROMETHEUS_PORT, None));
+
         debug!(
             prometheus_port = prometheus_port.port(),
             "selected prometheus port"
         );
+
         let environment =
             prepare_environment(&self.descriptors, prometheus_port, prometheus_env.is_some())
                 .await?;
diff --git a/testing-framework/runners/compose/src/infrastructure/environment.rs b/testing-framework/runners/compose/src/infrastructure/environment.rs
index 86ec6ff..2310107 100644
--- a/testing-framework/runners/compose/src/infrastructure/environment.rs
+++ b/testing-framework/runners/compose/src/infrastructure/environment.rs
@@ -408,12 +408,14 @@ pub async fn prepare_environment(
 ) -> Result {
     let workspace = prepare_workspace_logged()?;
     let cfgsync_port = allocate_cfgsync_port()?;
+
     let grafana_env = env::var("COMPOSE_GRAFANA_PORT")
         .ok()
         .and_then(|raw| raw.parse::<u16>().ok());
     if let Some(port) = grafana_env {
         info!(port, "using grafana port from env");
     }
+
     update_cfgsync_logged(&workspace, descriptors, cfgsync_port)?;
     ensure_compose_image().await?;
 
@@ -439,6 +441,7 @@ pub async fn prepare_environment(
     let mut cfgsync_handle = start_cfgsync_stage(&workspace, cfgsync_port).await?;
 
     drop(prometheus_port);
+
     match bring_up_stack_logged(
         &compose_path,
         &project_name,
diff --git a/testing-framework/runners/compose/src/lifecycle/wait.rs b/testing-framework/runners/compose/src/lifecycle/wait.rs
index 8cab766..0d06ab0 100644
--- a/testing-framework/runners/compose/src/lifecycle/wait.rs
+++ b/testing-framework/runners/compose/src/lifecycle/wait.rs
@@ -6,8 +6,11 @@ use testing_framework_core::{
 };
 use tracing::{debug, info};
 
-const DEFAULT_WAIT: Duration = Duration::from_secs(180);
-const POLL_INTERVAL: Duration = Duration::from_millis(250);
+const DEFAULT_WAIT_TIMEOUT_SECS: u64 = 180;
+const POLL_INTERVAL_MILLIS: u64 = 250;
+
+const DEFAULT_WAIT: Duration = Duration::from_secs(DEFAULT_WAIT_TIMEOUT_SECS);
+const POLL_INTERVAL: Duration = Duration::from_millis(POLL_INTERVAL_MILLIS);
 
 pub async fn wait_for_validators(ports: &[u16]) -> Result<(), HttpReadinessError> {
     wait_for_ports(ports, NodeRole::Validator).await
@@ -20,7 +23,9 @@ pub async fn wait_for_executors(ports: &[u16]) -> Result<(), HttpReadinessError>
 async fn wait_for_ports(ports: &[u16], role: NodeRole) -> Result<(), HttpReadinessError> {
     let host = compose_runner_host();
     let timeout = compose_http_timeout();
+
     info!(role = ?role, ports = ?ports, host, "waiting for compose HTTP readiness");
+
     http_probe::wait_for_http_ports_with_host(
         ports,
         role,
@@ -31,8 +36,10 @@ async fn wait_for_ports(ports: &[u16], role: NodeRole) -> Result<(), HttpReadine
     .await
 }
 
+const DEFAULT_COMPOSE_HOST: &str = "127.0.0.1";
+
 fn compose_runner_host() -> String {
-    let host = env::var("COMPOSE_RUNNER_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
+    let host = env::var("COMPOSE_RUNNER_HOST").unwrap_or_else(|_| DEFAULT_COMPOSE_HOST.to_string());
     debug!(host, "compose runner host resolved");
     host
 }
diff --git a/testing-framework/runners/k8s/src/deployer/orchestrator.rs b/testing-framework/runners/k8s/src/deployer/orchestrator.rs
index 53ecbd9..f09b9a6 100644
--- a/testing-framework/runners/k8s/src/deployer/orchestrator.rs
+++ b/testing-framework/runners/k8s/src/deployer/orchestrator.rs
@@ -140,6 +140,7 @@ impl Deployer for K8sDeployer {
                 return Err(err.into());
             }
         };
+
         let (block_feed, block_feed_guard) = match spawn_block_feed_with(&node_clients).await {
             Ok(pair) => pair,
             Err(err) => {
@@ -160,6 +161,7 @@ impl Deployer for K8sDeployer {
             grafana_url = %format!("http://{}:{}/", node_host, 30030),
             "grafana dashboard available via NodePort"
         );
+
         if std::env::var("TESTNET_PRINT_ENDPOINTS").is_ok() {
             println!(
                 "TESTNET_ENDPOINTS prometheus=http://{}:{}/ grafana=http://{}:{}/",
@@ -168,6 +170,7 @@ impl Deployer for K8sDeployer {
                 node_host,
                 30030
             );
+
             for (idx, client) in node_clients.validator_clients().iter().enumerate() {
                 println!(
                     "TESTNET_PPROF validator_{}={}/debug/pprof/profile?seconds=15&format=proto",
@@ -175,6 +178,7 @@ impl Deployer for K8sDeployer {
                     client.base_url()
                 );
             }
+
             for (idx, client) in node_clients.executor_clients().iter().enumerate() {
                 println!(
                     "TESTNET_PPROF executor_{}={}/debug/pprof/profile?seconds=15&format=proto",
@@ -183,6 +187,7 @@ impl Deployer for K8sDeployer {
                 );
             }
         }
+
         let (cleanup, port_forwards) = cluster
             .take()
             .expect("cluster should still be available")
@@ -192,6 +197,7 @@ impl Deployer for K8sDeployer {
             block_feed_guard,
             port_forwards,
         ));
+
         let context = RunContext::new(
             descriptors,
             None,
@@ -201,12 +207,14 @@ impl Deployer for K8sDeployer {
             block_feed,
             None,
         );
+
         info!(
             validators = validator_count,
             executors = executor_count,
             duration_secs = scenario.duration().as_secs(),
             "k8s deployment ready; handing control to scenario runner"
         );
+
         Ok(Runner::new(context, Some(cleanup_guard)))
     }
 }
diff --git a/testing-framework/runners/k8s/src/infrastructure/cluster.rs b/testing-framework/runners/k8s/src/infrastructure/cluster.rs
index 8fce680..1186d47 100644
--- a/testing-framework/runners/k8s/src/infrastructure/cluster.rs
+++ b/testing-framework/runners/k8s/src/infrastructure/cluster.rs
@@ -47,15 +47,20 @@ impl ClusterEnvironment {
         ports: &ClusterPorts,
         port_forwards: Vec,
     ) -> Self {
+        let validator_api_ports = ports.validators.iter().map(|ports| ports.api).collect();
+        let validator_testing_ports = ports.validators.iter().map(|ports| ports.testing).collect();
+        let executor_api_ports = ports.executors.iter().map(|ports| ports.api).collect();
+        let executor_testing_ports = ports.executors.iter().map(|ports| ports.testing).collect();
+
         Self {
             client,
             namespace,
             release,
             cleanup: Some(cleanup),
-            validator_api_ports: ports.validators.iter().map(|ports| ports.api).collect(),
-            validator_testing_ports: ports.validators.iter().map(|ports| ports.testing).collect(),
-            executor_api_ports: ports.executors.iter().map(|ports| ports.api).collect(),
-            executor_testing_ports: ports.executors.iter().map(|ports| ports.testing).collect(),
+            validator_api_ports,
+            validator_testing_ports,
+            executor_api_ports,
+            executor_testing_ports,
             prometheus_port: ports.prometheus,
             port_forwards,
         }
diff --git a/testing-framework/runners/k8s/src/lifecycle/wait/deployment.rs b/testing-framework/runners/k8s/src/lifecycle/wait/deployment.rs
index 28d8540..027fd57 100644
--- a/testing-framework/runners/k8s/src/lifecycle/wait/deployment.rs
+++ b/testing-framework/runners/k8s/src/lifecycle/wait/deployment.rs
@@ -4,15 +4,18 @@ use tokio::time::sleep;
 
 use super::{ClusterWaitError, deployment_timeout};
 
+const DEPLOYMENT_POLL_INTERVAL_SECS: u64 = 2;
+
 pub async fn wait_for_deployment_ready(
     client: &Client,
     namespace: &str,
     name: &str,
 ) -> Result<(), ClusterWaitError> {
     let mut elapsed = std::time::Duration::ZERO;
-    let interval = std::time::Duration::from_secs(2);
+    let interval = std::time::Duration::from_secs(DEPLOYMENT_POLL_INTERVAL_SECS);
     let timeout = deployment_timeout();
+
     while elapsed <= timeout {
         match Api::<Deployment>::namespaced(client.clone(), namespace)
             .get(name)
@@ -29,6 +32,7 @@ pub async fn wait_for_deployment_ready(
                     .as_ref()
                     .and_then(|status| status.ready_replicas)
                     .unwrap_or(0);
+
                 if ready >= desired {
                     return Ok(());
                 }
diff --git a/testing-framework/runners/k8s/src/lifecycle/wait/forwarding.rs b/testing-framework/runners/k8s/src/lifecycle/wait/forwarding.rs
index 5120f55..b6620a2 100644
--- a/testing-framework/runners/k8s/src/lifecycle/wait/forwarding.rs
+++ b/testing-framework/runners/k8s/src/lifecycle/wait/forwarding.rs
@@ -5,6 +5,8 @@ use std::{
     time::Duration,
 };
 
+use anyhow::{Result as AnyhowResult, anyhow};
+
 use super::{ClusterWaitError, NodeConfigPorts, NodePortAllocation};
 
 pub fn port_forward_group(
@@ -73,7 +75,7 @@ pub fn port_forward_service(
             return Err(ClusterWaitError::PortForward {
                 service: service.to_owned(),
                 port: remote_port,
-                source: anyhow::anyhow!("kubectl exited with {status}"),
+                source: anyhow!("kubectl exited with {status}"),
             });
         }
         if TcpStream::connect((Ipv4Addr::LOCALHOST, local_port)).is_ok() {
@@ -86,7 +88,7 @@ pub fn port_forward_service(
     Err(ClusterWaitError::PortForward {
         service: service.to_owned(),
         port: remote_port,
-        source: anyhow::anyhow!("port-forward did not become ready"),
+        source: anyhow!("port-forward did not become ready"),
     })
 }
 
@@ -98,7 +100,7 @@ pub fn kill_port_forwards(handles: &mut Vec) {
     handles.clear();
 }
 
-fn allocate_local_port() -> anyhow::Result<u16> {
+fn allocate_local_port() -> AnyhowResult<u16> {
     let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0))?;
     let port = listener.local_addr()?.port();
     drop(listener);
diff --git a/testing-framework/runners/k8s/src/lifecycle/wait/http_probe.rs b/testing-framework/runners/k8s/src/lifecycle/wait/http_probe.rs
index 7b0f32e..0715305 100644
--- a/testing-framework/runners/k8s/src/lifecycle/wait/http_probe.rs
+++ b/testing-framework/runners/k8s/src/lifecycle/wait/http_probe.rs
@@ -11,11 +11,13 @@ pub async fn wait_for_node_http_nodeport(
     wait_for_node_http_on_host(ports, role, &host, node_http_probe_timeout()).await
 }
 
+const LOCALHOST: &str = "127.0.0.1";
+
 pub async fn wait_for_node_http_port_forward(
     ports: &[u16],
     role: NodeRole,
 ) -> Result<(), ClusterWaitError> {
-    wait_for_node_http_on_host(ports, role, "127.0.0.1", node_http_timeout()).await
+    wait_for_node_http_on_host(ports, role, LOCALHOST, node_http_timeout()).await
 }
 
 async fn wait_for_node_http_on_host(
diff --git a/testing-framework/runners/k8s/src/lifecycle/wait/mod.rs b/testing-framework/runners/k8s/src/lifecycle/wait/mod.rs
index f40f8eb..2f32865 100644
--- a/testing-framework/runners/k8s/src/lifecycle/wait/mod.rs
+++ b/testing-framework/runners/k8s/src/lifecycle/wait/mod.rs
@@ -101,14 +101,17 @@ static DEPLOYMENT_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
         DEFAULT_K8S_DEPLOYMENT_TIMEOUT,
     )
 });
+
 static NODE_HTTP_TIMEOUT: LazyLock<Duration> =
     LazyLock::new(|| env_duration_secs("K8S_RUNNER_HTTP_TIMEOUT_SECS", DEFAULT_NODE_HTTP_TIMEOUT));
+
 static NODE_HTTP_PROBE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
     env_duration_secs(
         "K8S_RUNNER_HTTP_PROBE_TIMEOUT_SECS",
         DEFAULT_NODE_HTTP_PROBE_TIMEOUT,
     )
 });
+
 static HTTP_POLL_INTERVAL: LazyLock<Duration> = LazyLock::new(|| {
     env_duration_secs(
         "K8S_RUNNER_HTTP_POLL_INTERVAL_SECS",
@@ -133,12 +136,14 @@ pub(crate) fn http_poll_interval() -> Duration {
 }
 
 pub(crate) const PROMETHEUS_HTTP_PORT: u16 = DEFAULT_PROMETHEUS_HTTP_PORT;
+
 static PROMETHEUS_HTTP_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
     env_duration_secs(
         "K8S_RUNNER_PROMETHEUS_HTTP_TIMEOUT_SECS",
         DEFAULT_PROMETHEUS_HTTP_TIMEOUT,
     )
 });
+
 static PROMETHEUS_HTTP_PROBE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
     env_duration_secs(
         "K8S_RUNNER_PROMETHEUS_HTTP_PROBE_TIMEOUT_SECS",
         DEFAULT_PROMETHEUS_HTTP_PROBE_TIMEOUT,
diff --git a/testing-framework/tools/cfgsync/src/config/builder.rs b/testing-framework/tools/cfgsync/src/config/builder.rs
index 035551f..9fc3375 100644
--- a/testing-framework/tools/cfgsync/src/config/builder.rs
+++ b/testing-framework/tools/cfgsync/src/config/builder.rs
@@ -71,6 +71,7 @@ pub fn create_node_configs(
         &ports,
         &blend_ports,
     );
+
     let api_configs = build_api_configs(&hosts);
     let mut configured_hosts = HashMap::new();
 
@@ -93,25 +94,23 @@ pub fn create_node_configs(
     let providers = create_providers(&hosts, &consensus_configs, &blend_configs, &da_configs);
 
-    // Update genesis TX to contain Blend and DA providers.
     let ledger_tx = consensus_configs[0]
         .genesis_tx
         .mantle_tx()
         .ledger_tx
         .clone();
     let genesis_tx = create_genesis_tx_with_declarations(ledger_tx, providers);
+
     for c in &mut consensus_configs {
         c.genesis_tx = genesis_tx.clone();
     }
 
-    // Set Blend and DA keys in KMS of each node config.
     let kms_configs = create_kms_configs(&blend_configs, &da_configs);
 
     for (i, host) in hosts.into_iter().enumerate() {
         let consensus_config = consensus_configs[i].clone();
         let api_config = api_configs[i].clone();
 
-        // DA Libp2p network config.
         let mut da_config = da_configs[i].clone();
         da_config.listening_address = Multiaddr::from_str(&format!(
             "/ip4/0.0.0.0/udp/{}/quic-v1",
@@ -122,7 +121,6 @@ pub fn create_node_configs(
             da_config.policy_settings.min_dispersal_peers = 0;
         }
 
-        // Libp2p network config.
         let mut network_config = network_configs[i].clone();
         network_config.backend.swarm.host = Ipv4Addr::from_str("0.0.0.0").unwrap();
         network_config.backend.swarm.port = host.network_port;
@@ -135,7 +133,6 @@ pub fn create_node_configs(
                 .unwrap(),
         };
 
-        // Blend network config.
         let mut blend_config = blend_configs[i].clone();
         blend_config.backend_core.listening_address =
             Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/{}/quic-v1", host.blend_port)).unwrap();
@@ -166,9 +163,11 @@ fn generate_ids(count: usize, ids: Option<Vec<[u8; 32]>>) -> Vec<[u8; 32]> {
     ids.unwrap_or_else(|| {
         let mut generated = vec![[0; 32]; count];
+
         for id in &mut generated {
             thread_rng().fill(id);
         }
+
         generated
     })
 }
diff --git a/testing-framework/tools/cfgsync/src/repo.rs b/testing-framework/tools/cfgsync/src/repo.rs
index 83d83fb..ceeb936 100644
--- a/testing-framework/tools/cfgsync/src/repo.rs
+++ b/testing-framework/tools/cfgsync/src/repo.rs
@@ -12,6 +12,8 @@ use tokio::{sync::oneshot::Sender, time::timeout};
 
 use crate::{config::builder::create_node_configs, host::Host, server::CfgSyncConfig};
 
+const HOST_POLLING_INTERVAL: Duration = Duration::from_secs(1);
+
 pub enum RepoResponse {
     Config(Box),
     Timeout,
 }
@@ -132,7 +134,7 @@ impl ConfigRepo {
             if self.waiting_hosts.lock().unwrap().len() >= self.n_hosts {
                 break;
             }
-            tokio::time::sleep(Duration::from_secs(1)).await;
+            tokio::time::sleep(HOST_POLLING_INTERVAL).await;
         }
     }
 }
diff --git a/testing-framework/tools/cfgsync/src/server.rs b/testing-framework/tools/cfgsync/src/server.rs
index e09aa91..72697ff 100644
--- a/testing-framework/tools/cfgsync/src/server.rs
+++ b/testing-framework/tools/cfgsync/src/server.rs
@@ -1,5 +1,18 @@
 use std::{fs, net::Ipv4Addr, num::NonZero, path::PathBuf, sync::Arc, time::Duration};
 
+// DA Policy Constants
+const DEFAULT_MAX_DISPERSAL_FAILURES: usize = 3;
+const DEFAULT_MAX_SAMPLING_FAILURES: usize = 3;
+const DEFAULT_MAX_REPLICATION_FAILURES: usize = 3;
+const DEFAULT_MALICIOUS_THRESHOLD: usize = 10;
+
+// DA Network Constants
+const DEFAULT_SUBNETS_REFRESH_INTERVAL_SECS: u64 = 30;
+
+// Bootstrap Constants
+const DEFAULT_DELAY_BEFORE_NEW_DOWNLOAD_SECS: u64 = 10;
+const DEFAULT_MAX_ORPHAN_CACHE_SIZE: usize = 5;
+
 use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::post};
 use nomos_da_network_core::swarm::{
     DAConnectionMonitorSettings, DAConnectionPolicySettings, ReplicationConfig,
@@ -7,7 +20,7 @@ use nomos_da_network_core::swarm::{
 use nomos_tracing_service::TracingSettings;
 use nomos_utils::bounded_duration::{MinimalBoundedDuration, SECOND};
 use serde::{Deserialize, Serialize};
-use serde_json::json;
+use serde_json::{Value, json, to_value};
 use serde_with::serde_as;
 use subnetworks_assignations::MembershipHandler;
 use testing_framework_config::{
@@ -93,10 +106,10 @@ impl CfgSyncConfig {
             policy_settings: DAConnectionPolicySettings {
                 min_dispersal_peers: self.min_dispersal_peers,
                 min_replication_peers: self.min_replication_peers,
-                max_dispersal_failures: 3,
-                max_sampling_failures: 3,
-                max_replication_failures: 3,
-                malicious_threshold: 10,
+                max_dispersal_failures: DEFAULT_MAX_DISPERSAL_FAILURES,
+                max_sampling_failures: DEFAULT_MAX_SAMPLING_FAILURES,
+                max_replication_failures: DEFAULT_MAX_REPLICATION_FAILURES,
+                malicious_threshold: DEFAULT_MALICIOUS_THRESHOLD,
             },
             monitor_settings: DAConnectionMonitorSettings {
                 failure_time_window: self.monitor_failure_time_window,
@@ -105,7 +118,7 @@ impl CfgSyncConfig {
             balancer_interval: self.balancer_interval,
             redial_cooldown: Duration::ZERO,
             replication_settings: self.replication_settings,
-            subnets_refresh_interval: Duration::from_secs(30),
+            subnets_refresh_interval: Duration::from_secs(DEFAULT_SUBNETS_REFRESH_INTERVAL_SECS),
             retry_shares_limit: self.retry_shares_limit,
             retry_commitments_limit: self.retry_commitments_limit,
         }
@@ -167,12 +180,13 @@ async fn validator_config(
         |config_response| match config_response {
             RepoResponse::Config(config) => {
                 let config = create_validator_config(*config);
-                let mut value =
-                    serde_json::to_value(&config).expect("validator config should serialize");
+                let mut value = to_value(&config).expect("validator config should serialize");
+
                 inject_defaults(&mut value);
                 override_api_ports(&mut value, &ports);
                 inject_da_assignations(&mut value, &config.da_network.membership);
                 override_min_session_members(&mut value);
+
                 (StatusCode::OK, Json(value)).into_response()
             }
             RepoResponse::Timeout => (StatusCode::REQUEST_TIMEOUT).into_response(),
@@ -209,12 +223,13 @@ async fn executor_config(
         |config_response| match config_response {
             RepoResponse::Config(config) => {
                 let config = create_executor_config(*config);
-                let mut value =
-                    serde_json::to_value(&config).expect("executor config should serialize");
+                let mut value = to_value(&config).expect("executor config should serialize");
+
                 inject_defaults(&mut value);
                 override_api_ports(&mut value, &ports);
                 inject_da_assignations(&mut value, &config.da_network.membership);
                 override_min_session_members(&mut value);
+
                 (StatusCode::OK, Json(value)).into_response()
             }
             RepoResponse::Timeout => (StatusCode::REQUEST_TIMEOUT).into_response(),
@@ -229,7 +244,7 @@ pub fn cfgsync_app(config_repo: Arc<ConfigRepo>) -> Router {
         .with_state(config_repo)
 }
 
-fn override_api_ports(config: &mut serde_json::Value, ports: &PortOverrides) {
+fn override_api_ports(config: &mut Value, ports: &PortOverrides) {
     if let Some(api_port) = ports.api_port {
         if let Some(address) = config.pointer_mut("/http/backend_settings/address") {
             *address = json!(format!("0.0.0.0:{api_port}"));
@@ -243,54 +258,54 @@ fn override_api_ports(config: &mut serde_json::Value, ports: &PortOverrides) {
     }
 }
 
-fn inject_da_assignations(
-    config: &mut serde_json::Value,
-    membership: &nomos_node::NomosDaMembership,
-) {
-    let assignations: std::collections::HashMap<String, Vec<String>> = membership
-        .subnetworks()
+fn inject_da_assignations(config: &mut Value, membership: &nomos_node::NomosDaMembership) {
+    fn convert_subnet_to_assignment(
+        subnet_id: impl ToString,
+        members: impl IntoIterator<Item = impl ToString>,
+    ) -> (String, Vec<String>) {
+        let peer_strings: Vec<String> = members.into_iter().map(|peer| peer.to_string()).collect();
+
+        (subnet_id.to_string(), peer_strings)
+    }
+
+    let subnetworks = membership.subnetworks();
+    let assignations: std::collections::HashMap<String, Vec<String>> = subnetworks
         .into_iter()
-        .map(|(subnet_id, members)| {
-            (
-                subnet_id.to_string(),
-                members.into_iter().map(|peer| peer.to_string()).collect(),
-            )
-        })
+        .map(|(subnet_id, members)| convert_subnet_to_assignment(subnet_id, members))
         .collect();
 
     if let Some(membership) = config.pointer_mut("/da_network/membership") {
         if let Some(map) = membership.as_object_mut() {
-            map.insert("assignations".to_string(), serde_json::json!(assignations));
+            map.insert("assignations".to_string(), json!(assignations));
         }
     }
 }
 
-fn override_min_session_members(config: &mut serde_json::Value) {
+fn override_min_session_members(config: &mut Value) {
     if let Some(value) = config.pointer_mut("/da_network/min_session_members") {
-        *value = serde_json::json!(1);
+        *value = json!(1);
     }
 }
 
-fn inject_defaults(config: &mut serde_json::Value) {
+fn inject_defaults(config: &mut Value) {
     if let Some(cryptarchia) = config
         .get_mut("cryptarchia")
         .and_then(|v| v.as_object_mut())
     {
-        let bootstrap = cryptarchia
-            .entry("bootstrap")
-            .or_insert_with(|| serde_json::json!({}));
+        let bootstrap = cryptarchia.entry("bootstrap").or_insert_with(|| json!({}));
         if let Some(bootstrap_map) = bootstrap.as_object_mut() {
-            bootstrap_map
-                .entry("ibd")
-                .or_insert_with(|| serde_json::json!({ "peers": [], "delay_before_new_download": { "secs": 10, "nanos": 0 } }));
+            bootstrap_map.entry("ibd").or_insert_with(
+                || json!({ "peers": [], "delay_before_new_download": { "secs": DEFAULT_DELAY_BEFORE_NEW_DOWNLOAD_SECS, "nanos": 0 } }),
+            );
         }
         cryptarchia
             .entry("network_adapter_settings")
-            .or_insert_with(|| serde_json::json!({ "topic": "/cryptarchia/proto" }));
+            .or_insert_with(|| json!({ "topic": "/cryptarchia/proto" }));
+
         cryptarchia.entry("sync").or_insert_with(|| {
-            serde_json::json!({
-                "orphan": { "max_orphan_cache_size": 5 }
+            json!({
+                "orphan": { "max_orphan_cache_size": DEFAULT_MAX_ORPHAN_CACHE_SIZE }
             })
         });
     }
diff --git a/testing-framework/workflows/src/expectations/consensus_liveness.rs b/testing-framework/workflows/src/expectations/consensus_liveness.rs
index 77442fe..2ba1a03 100644
--- a/testing-framework/workflows/src/expectations/consensus_liveness.rs
+++ b/testing-framework/workflows/src/expectations/consensus_liveness.rs
@@ -107,17 +107,22 @@ impl ConsensusLiveness {
             match Self::fetch_cluster_info(client).await {
                 Ok((height, tip)) => {
                     let label = format!("node-{idx}");
+
                     tracing::debug!(node = %label, height, tip = ?tip, attempt, "consensus_info collected");
                     samples.push(NodeSample { label, height, tip });
+
                     break;
                 }
+
                 Err(err) if attempt + 1 == REQUEST_RETRIES => {
                     tracing::warn!(node = %format!("node-{idx}"), %err, "consensus_info failed after retries");
+
                     issues.push(ConsensusLivenessIssue::RequestFailed {
                         node: format!("node-{idx}"),
                         source: err,
                     });
                 }
+
                 Err(_) => sleep(REQUEST_RETRY_DELAY).await,
             }
         }
@@ -186,11 +191,14 @@ impl ConsensusLiveness {
         }
 
         if check.issues.is_empty() {
+            let observed_heights: Vec<_> = check.samples.iter().map(|s| s.height).collect();
+            let observed_tips: Vec<_> = check.samples.iter().map(|s| s.tip).collect();
+
             tracing::info!(
                 target,
                 samples = check.samples.len(),
-                heights = ?check.samples.iter().map(|s| s.height).collect::<Vec<_>>(),
-                tips = ?check.samples.iter().map(|s| s.tip).collect::<Vec<_>>(),
+                heights = ?observed_heights,
+                tips = ?observed_tips,
                 "consensus liveness expectation satisfied"
            );
             Ok(())
@@ -198,6 +206,7 @@ impl ConsensusLiveness {
             for issue in &check.issues {
                 tracing::warn!(?issue, "consensus liveness issue");
             }
+
             Err(Box::new(ConsensusLivenessError::Violations {
                 target,
                 details: check.issues.into(),
diff --git a/testing-framework/workflows/src/workloads/da/expectation.rs b/testing-framework/workflows/src/workloads/da/expectation.rs
index 4056216..4e78eb0 100644
--- a/testing-framework/workflows/src/workloads/da/expectation.rs
+++ b/testing-framework/workflows/src/workloads/da/expectation.rs
@@ -15,7 +15,7 @@ use nomos_core::mantle::{
 };
 use testing_framework_core::scenario::{BlockRecord, DynError, Expectation, RunContext};
 use thiserror::Error;
-use tokio::sync::broadcast;
+use tokio::{pin, select, spawn, sync::broadcast, time::sleep};
 
 use super::workload::{planned_channel_count, planned_channel_ids};
 
@@ -37,7 +37,7 @@ struct CaptureState {
 }
 
 const MIN_INSCRIPTION_INCLUSION_RATIO: f64 = 0.8;
-const MIN_BLOB_INCLUSION_RATIO: f64 = 0.55;
+const MIN_BLOB_INCLUSION_RATIO: f64 = 0.5;
 
 #[derive(Debug, Error)]
 enum DaExpectationError {
@@ -115,12 +115,12 @@ impl Expectation for DaWorkloadExpectation {
         {
             let run_blocks = Arc::clone(&run_blocks);
             let mut receiver = ctx.block_feed().subscribe();
-            tokio::spawn(async move {
-                let timer = tokio::time::sleep(run_duration);
-                tokio::pin!(timer);
+            spawn(async move {
+                let timer = sleep(run_duration);
+                pin!(timer);
 
                 loop {
-                    tokio::select! {
+                    select! {
                         _ = &mut timer => break,
                         result = receiver.recv() => match result {
                             Ok(_) => {
@@ -139,7 +139,7 @@ impl Expectation for DaWorkloadExpectation {
 
         let inscriptions_for_task = Arc::clone(&inscriptions);
         let blobs_for_task = Arc::clone(&blobs);
-        tokio::spawn(async move {
+        spawn(async move {
             loop {
                 match receiver.recv().await {
                     Ok(record) => capture_block(
diff --git a/testing-framework/workflows/src/workloads/transaction/expectation.rs b/testing-framework/workflows/src/workloads/transaction/expectation.rs
index 1790ed4..7a5ba00 100644
--- a/testing-framework/workflows/src/workloads/transaction/expectation.rs
+++ b/testing-framework/workflows/src/workloads/transaction/expectation.rs
@@ -77,13 +77,13 @@ impl Expectation for TxInclusionExpectation {
         }
 
         let available = limited_user_count(self.user_limit, wallet_accounts.len());
-        let (planned, _) = submission_plan(self.txs_per_block, ctx, available)?;
-        if planned == 0 {
+        let plan = submission_plan(self.txs_per_block, ctx, available)?;
+        if plan.transaction_count == 0 {
             return Err(TxExpectationError::NoPlannedTransactions.into());
         }
 
         tracing::info!(
-            planned_txs = planned,
+            planned_txs = plan.transaction_count,
             txs_per_block = self.txs_per_block.get(),
             user_limit = self.user_limit.map(|u| u.get()),
             "tx inclusion expectation starting capture"
@@ -91,7 +91,7 @@ impl Expectation for TxInclusionExpectation {
 
         let wallet_pks = wallet_accounts
             .into_iter()
-            .take(planned)
+            .take(plan.transaction_count)
             .map(|account| account.secret_key.to_public_key())
             .collect::<Vec<_>>();
 
@@ -136,7 +136,7 @@ impl Expectation for TxInclusionExpectation {
 
         self.capture_state = Some(CaptureState {
             observed,
-            expected: planned as u64,
+            expected: plan.transaction_count as u64,
         });
 
         Ok(())
diff --git a/testing-framework/workflows/src/workloads/transaction/workload.rs b/testing-framework/workflows/src/workloads/transaction/workload.rs
index 5f87960..536c91e 100644
--- a/testing-framework/workflows/src/workloads/transaction/workload.rs
+++ b/testing-framework/workflows/src/workloads/transaction/workload.rs
@@ -15,6 +15,15 @@ use testing_framework_core::{
     scenario::{DynError, Expectation, RunContext, RunMetrics, Workload as ScenarioWorkload},
     topology::generation::{GeneratedNodeConfig, GeneratedTopology},
 };
+
+/// Submission timing plan for transaction workload execution
+#[derive(Debug, Clone, Copy)]
+pub(super) struct SubmissionPlan {
+    /// Number of transactions to submit
+    pub transaction_count: usize,
+    /// Time interval between submissions
+    pub submission_interval: Duration,
+}
 use tokio::time::sleep;
 
 use super::expectation::TxInclusionExpectation;
@@ -52,9 +61,13 @@ impl ScenarioWorkload for Workload {
         _run_metrics: &RunMetrics,
     ) -> Result<(), DynError> {
         tracing::info!("initializing transaction workload");
+
         let wallet_accounts = descriptors.config().wallet().accounts.clone();
         if wallet_accounts.is_empty() {
-            return Err("transaction workload requires seeded accounts".into());
+            return Err(
+                "Transaction workload initialization failed: no seeded wallet accounts configured"
+                    .into(),
+            );
         }
 
         let reference_node = descriptors
@@ -64,21 +77,27 @@ impl ScenarioWorkload for Workload {
             .ok_or("transaction workload requires at least one node in the topology")?;
 
         let utxo_map = wallet_utxo_map(reference_node);
+
+        fn match_account_to_utxo(
+            account: WalletAccount,
+            utxo_map: &HashMap,
+        ) -> Option<WalletInput> {
+            utxo_map
+                .get(&account.public_key())
+                .copied()
+                .map(|utxo| WalletInput { account, utxo })
+        }
+
         let mut accounts = wallet_accounts
             .into_iter()
-            .filter_map(|account| {
-                utxo_map
-                    .get(&account.public_key())
-                    .copied()
-                    .map(|utxo| WalletInput { account, utxo })
-            })
+            .filter_map(|account| match_account_to_utxo(account, &utxo_map))
            .collect::<Vec<_>>();
 
         apply_user_limit(&mut accounts, self.user_limit);
 
         if accounts.is_empty() {
             return Err(
-                "transaction workload could not match any accounts to genesis UTXOs".into(),
+                "Transaction workload initialization failed: could not match any wallet accounts to genesis UTXOs".into(),
             );
         }
 
@@ -149,22 +168,22 @@ struct Submission<'a> {
 impl<'a> Submission<'a> {
     fn new(workload: &Workload, ctx: &'a RunContext) -> Result<Self, DynError> {
         if workload.accounts.is_empty() {
-            return Err("transaction workload has no available accounts".into());
+            return Err("Transaction workload submission failed: no available accounts for transaction creation".into());
         }
 
-        let (planned, interval) =
+        let submission_plan =
             submission_plan(workload.txs_per_block, ctx, workload.accounts.len())?;
 
         let plan = workload
             .accounts
             .iter()
-            .take(planned)
+            .take(submission_plan.transaction_count)
             .cloned()
            .collect::<VecDeque<_>>();
 
         tracing::info!(
-            planned,
-            interval_ms = interval.as_millis(),
+            planned = submission_plan.transaction_count,
+            interval_ms = submission_plan.submission_interval.as_millis(),
             accounts_available = workload.accounts.len(),
             "transaction workload submission plan"
         );
@@ -172,7 +191,7 @@ impl<'a> Submission<'a> {
         Ok(Self {
             plan,
             ctx,
-            interval,
+            interval: submission_plan.submission_interval,
         })
     }
 
@@ -183,6 +202,7 @@ impl<'a> Submission<'a> {
             interval_ms = self.interval.as_millis(),
             "begin transaction submissions"
         );
+
         while let Some(input) = self.plan.pop_front() {
             submit_wallet_transaction(self.ctx, &input).await?;
 
@@ -190,6 +210,7 @@ impl<'a> Submission<'a> {
                 sleep(self.interval).await;
             }
         }
+
         tracing::info!("transaction submissions finished");
 
         Ok(())
@@ -212,22 +233,27 @@ fn build_wallet_transaction(input: &WalletInput) -> Result
 fn wallet_utxo_map(node: &GeneratedNodeConfig) -> HashMap {
     let genesis_tx = node.general.consensus_config.genesis_tx.clone();
     let ledger_tx = genesis_tx.mantle_tx().ledger_tx.clone();
+
     let tx_hash = ledger_tx.hash();
 
     ledger_tx
@@ -241,6 +267,7 @@ fn wallet_utxo_map(node: &GeneratedNodeConfig) -> HashMap {
 fn apply_user_limit<T>(items: &mut Vec<T>, user_limit: Option<NonZeroUsize>) {
     if let Some(limit) = user_limit {
         let allowed = limit.get().min(items.len());
+
         items.truncate(allowed);
     }
 }
@@ -253,9 +280,9 @@ pub(super) fn submission_plan(
     txs_per_block: NonZeroU64,
     ctx: &RunContext,
     available_accounts: usize,
-) -> Result<(usize, Duration), DynError> {
+) -> Result<SubmissionPlan, DynError> {
     if available_accounts == 0 {
-        return Err("transaction workload scheduled zero transactions".into());
+        return Err("Transaction workload planning failed: no accounts available for transaction scheduling".into());
     }
 
     let run_secs = ctx.run_duration().as_secs_f64();
@@ -265,16 +292,22 @@ pub(super) fn submission_plan(
         .unwrap_or_else(|| ctx.run_duration())
         .as_secs_f64();
 
-    let expected_blocks = run_secs / block_secs;
-    let requested = (expected_blocks * txs_per_block.get() as f64)
+    let estimated_blocks_in_run = run_secs / block_secs;
+    let target_transaction_count = (estimated_blocks_in_run * txs_per_block.get() as f64)
         .floor()
         .clamp(0.0, u64::MAX as f64) as u64;
-    let planned = requested.min(available_accounts as u64) as usize;
-    if planned == 0 {
-        return Err("transaction workload scheduled zero transactions".into());
+    let actual_transactions_to_submit =
+        target_transaction_count.min(available_accounts as u64) as usize;
+
+    if actual_transactions_to_submit == 0 {
+        return Err("Transaction workload planning failed: calculated zero transactions to submit based on run duration and target rate".into());
     }
 
-    let interval = Duration::from_secs_f64(run_secs / planned as f64);
-    Ok((planned, interval))
+    let submission_interval =
+        Duration::from_secs_f64(run_secs / actual_transactions_to_submit as f64);
+    Ok(SubmissionPlan {
+        transaction_count: actual_transactions_to_submit,
+        submission_interval,
+    })
 }
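
Note on the submission_plan rework in the last hunk: the pacing math is unchanged by this patch, only repackaged into the named SubmissionPlan struct. Below is a minimal, self-contained sketch of that logic for reference; the `plan` helper, its signature, and the 2-second block time in the usage example are illustrative assumptions for this note, not code from the patch.

use std::{num::NonZeroU64, time::Duration};

// Mirrors submission_plan(): estimate how many blocks fit in the run,
// target txs_per_block transactions for each, cap by the number of
// funded accounts, then spread the submissions evenly over the run.
fn plan(
    txs_per_block: NonZeroU64,
    run: Duration,
    block_time: Duration,
    accounts: usize,
) -> Option<(usize, Duration)> {
    let run_secs = run.as_secs_f64();
    let estimated_blocks = run_secs / block_time.as_secs_f64();
    let target = (estimated_blocks * txs_per_block.get() as f64).floor() as u64;
    let count = target.min(accounts as u64) as usize;
    if count == 0 {
        return None; // nothing to submit: run too short or no accounts
    }
    // One submission every run_secs / count seconds keeps the average
    // rate at txs_per_block per block across the whole run.
    Some((count, Duration::from_secs_f64(run_secs / count as f64)))
}

fn main() {
    // 60 s run, 2 s blocks, 5 txs per block, 500 accounts
    // => 30 blocks * 5 = 150 txs, one every 0.4 s.
    let (count, interval) = plan(
        NonZeroU64::new(5).unwrap(),
        Duration::from_secs(60),
        Duration::from_secs(2),
        500,
    )
    .unwrap();
    assert_eq!(count, 150);
    assert!((interval.as_secs_f64() - 0.4).abs() < 1e-9);
}

Spreading the interval over the whole run, rather than bursting per block, keeps the average inclusion rate at the target even when block production jitters, which is why the struct carries the interval alongside the count.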