testing-framework: improve runner defaults + checks

This commit is contained in:
andrussal 2025-12-15 22:29:36 +01:00
parent bc943042d8
commit 6e619d4c03
27 changed files with 333 additions and 169 deletions

View File

@ -1,4 +1,4 @@
use std::time::Duration;
use std::{env, error::Error, process, str::FromStr, time::Duration};
use runner_examples::{ChaosBuilderExt as _, ScenarioBuilderExt as _};
use testing_framework_core::scenario::{Deployer as _, Runner, ScenarioBuilder};
@ -12,15 +12,24 @@ const MIXED_TXS_PER_BLOCK: u64 = 5;
const TOTAL_WALLETS: usize = 1000;
const TRANSACTION_WALLETS: usize = 500;
// Chaos Testing Constants
const CHAOS_MIN_DELAY_SECS: u64 = 120;
const CHAOS_MAX_DELAY_SECS: u64 = 180;
const CHAOS_COOLDOWN_SECS: u64 = 240;
// DA Testing Constants
const DA_CHANNEL_RATE: u64 = 1;
const DA_BLOB_RATE: u64 = 1;
#[tokio::main]
async fn main() {
// Compose containers mount KZG params at /kzgrs_test_params; ensure the
// generated configs point there unless the caller overrides explicitly.
if std::env::var("NOMOS_KZGRS_PARAMS_PATH").is_err() {
if env::var("NOMOS_KZGRS_PARAMS_PATH").is_err() {
// Safe: setting a process-wide environment variable before any threads
// or async tasks are spawned.
unsafe {
std::env::set_var(
env::set_var(
"NOMOS_KZGRS_PARAMS_PATH",
"/kzgrs_test_params/kzgrs_test_params",
);
@ -48,7 +57,7 @@ async fn main() {
if let Err(err) = run_compose_case(validators, executors, Duration::from_secs(run_secs)).await {
warn!("compose runner demo failed: {err}");
std::process::exit(1);
process::exit(1);
}
}
@ -57,7 +66,7 @@ async fn run_compose_case(
validators: usize,
executors: usize,
run_duration: Duration,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<(), Box<dyn Error>> {
info!(
validators,
executors,
@ -74,9 +83,9 @@ async fn run_compose_case(
.chaos_with(|c| {
c.restart()
// Keep chaos restarts outside the test run window to avoid crash loops on restart.
.min_delay(Duration::from_secs(120))
.max_delay(Duration::from_secs(180))
.target_cooldown(Duration::from_secs(240))
.min_delay(Duration::from_secs(CHAOS_MIN_DELAY_SECS))
.max_delay(Duration::from_secs(CHAOS_MAX_DELAY_SECS))
.target_cooldown(Duration::from_secs(CHAOS_COOLDOWN_SECS))
.apply()
})
.wallets(TOTAL_WALLETS)
@ -85,8 +94,8 @@ async fn run_compose_case(
.users(TRANSACTION_WALLETS)
})
.da_with(|da| {
da.channel_rate(1)
.blob_rate(1)
da.channel_rate(DA_CHANNEL_RATE)
.blob_rate(DA_BLOB_RATE)
})
.with_run_duration(run_duration)
.expect_consensus_liveness()
@ -103,6 +112,7 @@ async fn run_compose_case(
}
Err(err) => return Err(err.into()),
};
if !runner.context().telemetry().is_configured() {
warn!("compose runner should expose prometheus metrics");
}
@ -113,13 +123,9 @@ async fn run_compose_case(
fn read_env_any<T>(keys: &[&str], default: T) -> T
where
T: std::str::FromStr + Copy,
T: FromStr + Copy,
{
keys.iter()
.find_map(|key| {
std::env::var(key)
.ok()
.and_then(|raw| raw.parse::<T>().ok())
})
.find_map(|key| env::var(key).ok().and_then(|raw| raw.parse::<T>().ok()))
.unwrap_or(default)
}

View File

@ -1,4 +1,4 @@
use std::time::Duration;
use std::{env, error::Error, process, str::FromStr, time::Duration};
use runner_examples::ScenarioBuilderExt as _;
use testing_framework_core::scenario::{Deployer as _, Runner, ScenarioBuilder};
@ -11,6 +11,8 @@ const DEFAULT_EXECUTORS: usize = 1;
const MIXED_TXS_PER_BLOCK: u64 = 5;
const TOTAL_WALLETS: usize = 1000;
const TRANSACTION_WALLETS: usize = 500;
const DA_BLOB_RATE: u64 = 1;
const MIN_CONSENSUS_HEIGHT: u64 = 5;
#[tokio::main]
async fn main() {
@ -32,7 +34,7 @@ async fn main() {
if let Err(err) = run_k8s_case(validators, executors, Duration::from_secs(run_secs)).await {
warn!("k8s runner demo failed: {err}");
std::process::exit(1);
process::exit(1);
}
}
@ -41,7 +43,7 @@ async fn run_k8s_case(
validators: usize,
executors: usize,
run_duration: Duration,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<(), Box<dyn Error>> {
info!(
validators,
executors,
@ -59,7 +61,7 @@ async fn run_k8s_case(
.users(TRANSACTION_WALLETS)
})
.da_with(|da| {
da.blob_rate(1)
da.blob_rate(DA_BLOB_RATE)
})
.with_run_duration(run_duration)
.expect_consensus_liveness()
@ -96,9 +98,9 @@ async fn run_k8s_case(
.consensus_info()
.await
.map_err(|err| format!("validator {idx} consensus_info failed: {err}"))?;
if info.height < 5 {
if info.height < MIN_CONSENSUS_HEIGHT {
return Err(format!(
"validator {idx} height {} should reach at least 5 blocks",
"validator {idx} height {} should reach at least {MIN_CONSENSUS_HEIGHT} blocks",
info.height
)
.into());
@ -113,13 +115,9 @@ async fn run_k8s_case(
fn read_env_any<T>(keys: &[&str], default: T) -> T
where
T: std::str::FromStr + Copy,
T: FromStr + Copy,
{
keys.iter()
.find_map(|key| {
std::env::var(key)
.ok()
.and_then(|raw| raw.parse::<T>().ok())
})
.find_map(|key| env::var(key).ok().and_then(|raw| raw.parse::<T>().ok()))
.unwrap_or(default)
}

View File

@ -1,4 +1,4 @@
use std::time::Duration;
use std::{env, error::Error, process, str::FromStr, time::Duration};
use runner_examples::ScenarioBuilderExt as _;
use testing_framework_core::scenario::{Deployer as _, Runner, ScenarioBuilder};
@ -11,15 +11,15 @@ const DEFAULT_RUN_SECS: u64 = 60;
const MIXED_TXS_PER_BLOCK: u64 = 5;
const TOTAL_WALLETS: usize = 1000;
const TRANSACTION_WALLETS: usize = 500;
const DA_BLOB_RATE: u64 = 1;
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
if std::env::var("POL_PROOF_DEV_MODE").is_err() {
if env::var("POL_PROOF_DEV_MODE").is_err() {
warn!("POL_PROOF_DEV_MODE=true is required for the local runner demo");
std::process::exit(1);
process::exit(1);
}
let validators = read_env_any(
@ -42,8 +42,7 @@ async fn main() {
if let Err(err) = run_local_case(validators, executors, Duration::from_secs(run_secs)).await {
warn!("local runner demo failed: {err}");
std::process::exit(1);
process::exit(1);
}
}
@ -51,7 +50,7 @@ async fn run_local_case(
validators: usize,
executors: usize,
run_duration: Duration,
) -> Result<(), Box<dyn std::error::Error>> {
) -> Result<(), Box<dyn Error>> {
info!(
validators,
executors,
@ -64,7 +63,7 @@ async fn run_local_case(
})
.wallets(TOTAL_WALLETS)
.transactions_with(|txs| txs.rate(MIXED_TXS_PER_BLOCK).users(TRANSACTION_WALLETS))
.da_with(|da| da.blob_rate(1))
.da_with(|da| da.blob_rate(DA_BLOB_RATE))
.with_run_duration(run_duration)
.expect_consensus_liveness()
.build();
@ -83,13 +82,9 @@ async fn run_local_case(
fn read_env_any<T>(keys: &[&str], default: T) -> T
where
T: std::str::FromStr + Copy,
T: FromStr + Copy,
{
keys.iter()
.find_map(|key| {
std::env::var(key)
.ok()
.and_then(|raw| raw.parse::<T>().ok())
})
.find_map(|key| env::var(key).ok().and_then(|raw| raw.parse::<T>().ok()))
.unwrap_or(default)
}

View File

@ -14,10 +14,16 @@ pub static IS_DEBUG_TRACING: LazyLock<bool> = LazyLock::new(|| {
env::var("NOMOS_TESTS_TRACING").is_ok_and(|val| val.eq_ignore_ascii_case("true"))
});
const SLOW_ENV_TIMEOUT_MULTIPLIER: u32 = 2;
/// In slow test environments like Codecov, use 2x timeout.
#[must_use]
pub fn adjust_timeout(d: Duration) -> Duration {
if *IS_SLOW_TEST_ENV { d.mul(2) } else { d }
if *IS_SLOW_TEST_ENV {
d.mul(SLOW_ENV_TIMEOUT_MULTIPLIER)
} else {
d
}
}
#[must_use]

View File

@ -20,6 +20,19 @@ use crate::{
topology::configs::blend::GeneralBlendConfig as TopologyBlendConfig,
};
// Blend service constants
const BLEND_LAYERS_COUNT: u64 = 1;
const MINIMUM_NETWORK_SIZE: u64 = 1;
const ROUND_DURATION_SECS: u64 = 1;
const ROUNDS_PER_INTERVAL: u64 = 30;
const ROUNDS_PER_SESSION: u64 = 648_000;
const ROUNDS_PER_OBSERVATION_WINDOW: u64 = 30;
const ROUNDS_PER_SESSION_TRANSITION: u64 = 30;
const EPOCH_TRANSITION_SLOTS: u64 = 2_600;
const SAFETY_BUFFER_INTERVALS: u64 = 100;
const MESSAGE_FREQUENCY_PER_ROUND: f64 = 1.0;
const MAX_RELEASE_DELAY_ROUNDS: u64 = 3;
pub(crate) fn build_blend_service_config(
config: &TopologyBlendConfig,
) -> (
@ -60,26 +73,35 @@ pub(crate) fn build_blend_service_config(
let deployment_settings = BlendDeploymentSettings {
common: blend_deployment::CommonSettings {
num_blend_layers: NonZeroU64::try_from(1).unwrap(),
minimum_network_size: NonZeroU64::try_from(1).unwrap(),
num_blend_layers: NonZeroU64::try_from(BLEND_LAYERS_COUNT).unwrap(),
minimum_network_size: NonZeroU64::try_from(MINIMUM_NETWORK_SIZE).unwrap(),
timing: TimingSettings {
round_duration: Duration::from_secs(1),
rounds_per_interval: NonZeroU64::try_from(30u64).unwrap(),
rounds_per_session: NonZeroU64::try_from(648_000u64).unwrap(),
rounds_per_observation_window: NonZeroU64::try_from(30u64).unwrap(),
rounds_per_session_transition_period: NonZeroU64::try_from(30u64).unwrap(),
epoch_transition_period_in_slots: NonZeroU64::try_from(2_600).unwrap(),
round_duration: Duration::from_secs(ROUND_DURATION_SECS),
rounds_per_interval: NonZeroU64::try_from(ROUNDS_PER_INTERVAL).unwrap(),
rounds_per_session: NonZeroU64::try_from(ROUNDS_PER_SESSION).unwrap(),
rounds_per_observation_window: NonZeroU64::try_from(ROUNDS_PER_OBSERVATION_WINDOW)
.unwrap(),
rounds_per_session_transition_period: NonZeroU64::try_from(
ROUNDS_PER_SESSION_TRANSITION,
)
.unwrap(),
epoch_transition_period_in_slots: NonZeroU64::try_from(EPOCH_TRANSITION_SLOTS)
.unwrap(),
},
protocol_name: backend_core.protocol_name.clone(),
},
core: blend_deployment::CoreSettings {
scheduler: SchedulerSettings {
cover: CoverTrafficSettings {
intervals_for_safety_buffer: 100,
message_frequency_per_round: NonNegativeF64::try_from(1f64).unwrap(),
intervals_for_safety_buffer: SAFETY_BUFFER_INTERVALS,
message_frequency_per_round: NonNegativeF64::try_from(
MESSAGE_FREQUENCY_PER_ROUND,
)
.unwrap(),
},
delayer: MessageDelayerSettings {
maximum_release_delay_in_rounds: NonZeroU64::try_from(3u64).unwrap(),
maximum_release_delay_in_rounds: NonZeroU64::try_from(MAX_RELEASE_DELAY_ROUNDS)
.unwrap(),
},
},
minimum_messages_coefficient: backend_core.minimum_messages_coefficient,

View File

@ -34,6 +34,19 @@ use nomos_wallet::WalletServiceSettings;
use crate::{timeouts, topology::configs::GeneralConfig};
// Configuration constants
const CRYPTARCHIA_GOSSIPSUB_PROTOCOL: &str = "/cryptarchia/proto";
const MEMPOOL_PUBSUB_TOPIC: &str = "mantle";
const STATE_RECORDING_INTERVAL_SECS: u64 = 60;
const IBD_DOWNLOAD_DELAY_SECS: u64 = 10;
const MAX_ORPHAN_CACHE_SIZE: usize = 5;
const DA_PUBLISH_THRESHOLD: f64 = 0.8;
const API_RATE_LIMIT_PER_SECOND: u64 = 10000;
const API_RATE_LIMIT_BURST: u32 = 10000;
const API_MAX_CONCURRENT_REQUESTS: usize = 1000;
const BLOB_STORAGE_DIR: &str = "./";
const KZG_PARAMS_FILENAME: &str = "kzgrs_test_params";
pub(crate) fn cryptarchia_deployment(config: &GeneralConfig) -> CryptarchiaDeploymentSettings {
CryptarchiaDeploymentSettings {
epoch_config: config.consensus_config.ledger_config.epoch_config,
@ -47,7 +60,7 @@ pub(crate) fn cryptarchia_deployment(config: &GeneralConfig) -> CryptarchiaDeplo
.clone(),
min_stake: config.consensus_config.ledger_config.sdp_config.min_stake,
},
gossipsub_protocol: "/cryptarchia/proto".to_owned(),
gossipsub_protocol: CRYPTARCHIA_GOSSIPSUB_PROTOCOL.to_owned(),
}
}
@ -59,7 +72,7 @@ pub(crate) fn time_deployment(config: &GeneralConfig) -> TimeDeploymentSettings
pub(crate) fn mempool_deployment() -> MempoolDeploymentSettings {
MempoolDeploymentSettings {
pubsub_topic: "mantle".to_owned(),
pubsub_topic: MEMPOOL_PUBSUB_TOPIC.to_owned(),
}
}
@ -77,7 +90,7 @@ pub(crate) fn cryptarchia_config(config: &GeneralConfig) -> CryptarchiaConfig {
force_bootstrap: false,
offline_grace_period: chain_service::OfflineGracePeriodConfig {
grace_period: timeouts::grace_period(),
state_recording_interval: Duration::from_secs(60),
state_recording_interval: Duration::from_secs(STATE_RECORDING_INTERVAL_SECS),
},
},
},
@ -85,12 +98,12 @@ pub(crate) fn cryptarchia_config(config: &GeneralConfig) -> CryptarchiaConfig {
bootstrap: ChainBootstrapConfig {
ibd: chain_network::IbdConfig {
peers: HashSet::new(),
delay_before_new_download: Duration::from_secs(10),
delay_before_new_download: Duration::from_secs(IBD_DOWNLOAD_DELAY_SECS),
},
},
sync: SyncConfig {
orphan: OrphanConfig {
max_orphan_cache_size: NonZeroUsize::new(5)
max_orphan_cache_size: NonZeroUsize::new(MAX_ORPHAN_CACHE_SIZE)
.expect("Max orphan cache size must be non-zero"),
},
},
@ -104,9 +117,11 @@ pub(crate) fn cryptarchia_config(config: &GeneralConfig) -> CryptarchiaConfig {
fn kzg_params_path(raw: &str) -> String {
let path = PathBuf::from(raw);
if path.is_dir() {
return path.join("kzgrs_test_params").to_string_lossy().to_string();
return path.join(KZG_PARAMS_FILENAME).to_string_lossy().to_string();
}
path.to_string_lossy().to_string()
}
@ -121,10 +136,10 @@ pub(crate) fn da_verifier_config(
tx_verifier_settings: (),
network_adapter_settings: (),
storage_adapter_settings: VerifierStorageAdapterSettings {
blob_storage_directory: "./".into(),
blob_storage_directory: BLOB_STORAGE_DIR.into(),
},
mempool_trigger_settings: MempoolPublishTriggerConfig {
publish_threshold: NonNegativeF64::try_from(0.8).unwrap(),
publish_threshold: NonNegativeF64::try_from(DA_PUBLISH_THRESHOLD).unwrap(),
share_duration: timeouts::share_duration(),
prune_duration: timeouts::prune_duration(),
prune_interval: timeouts::prune_interval(),
@ -180,9 +195,9 @@ pub(crate) fn http_config(config: &GeneralConfig) -> ApiServiceSettings<NodeAxum
ApiServiceSettings {
backend_settings: NodeAxumBackendSettings {
address: config.api_config.address,
rate_limit_per_second: 10000,
rate_limit_burst: 10000,
max_concurrent_requests: 1000,
rate_limit_per_second: API_RATE_LIMIT_PER_SECOND,
rate_limit_burst: API_RATE_LIMIT_BURST,
max_concurrent_requests: API_MAX_CONCURRENT_REQUESTS,
..Default::default()
},
}
@ -194,9 +209,9 @@ pub(crate) fn testing_http_config(
ApiServiceSettings {
backend_settings: NodeAxumBackendSettings {
address: config.api_config.testing_http_address,
rate_limit_per_second: 10000,
rate_limit_burst: 10000,
max_concurrent_requests: 1000,
rate_limit_per_second: API_RATE_LIMIT_PER_SECOND,
rate_limit_burst: API_RATE_LIMIT_BURST,
max_concurrent_requests: API_MAX_CONCURRENT_REQUESTS,
..Default::default()
},
}

View File

@ -1,6 +1,11 @@
#![allow(dead_code)]
use std::{fs::File, io, path::Path};
use std::{
env,
fs::{self, File},
io,
path::{Path, PathBuf},
};
use nomos_tracing::logging::local::FileConfig;
use serde::Serialize;
@ -16,9 +21,11 @@ where
F: FnOnce(FileConfig),
{
debug!(prefix, base_dir = %base_dir.display(), "configuring node logging");
if let Ok(env_dir) = std::env::var("NOMOS_LOG_DIR") {
let log_dir = std::path::PathBuf::from(env_dir);
let _ = std::fs::create_dir_all(&log_dir);
if let Ok(env_dir) = env::var("NOMOS_LOG_DIR") {
let log_dir = PathBuf::from(env_dir);
let _ = fs::create_dir_all(&log_dir);
set_logger(FileConfig {
directory: log_dir,
prefix: Some(prefix.into()),
@ -39,10 +46,13 @@ where
F: FnOnce(&mut Value),
{
debug!(path = %path.display(), "writing node config with injection");
let mut yaml_value =
serde_yaml::to_value(config).map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
inject(&mut yaml_value);
normalize_ed25519_sigs(&mut yaml_value);
let file = File::create(path)?;
serde_yaml::to_writer(file, &yaml_value)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))

View File

@ -1,4 +1,9 @@
use std::{ops::Deref, path::PathBuf};
use std::{
net::SocketAddr,
ops::Deref,
path::{Path, PathBuf},
time::Duration,
};
use nomos_executor::config::Config;
use nomos_tracing_service::LoggerLayer;
@ -78,7 +83,7 @@ impl Executor {
}
/// Wait for the executor process to exit, with a timeout.
pub async fn wait_for_exit(&mut self, timeout: std::time::Duration) -> bool {
pub async fn wait_for_exit(&mut self, timeout: Duration) -> bool {
self.handle.wait_for_exit(timeout).await
}
}
@ -88,7 +93,7 @@ impl NodeConfigCommon for Config {
self.tracing.logger = logger;
}
fn set_paths(&mut self, base: &std::path::Path) {
fn set_paths(&mut self, base: &Path) {
self.storage.db_path = base.join("db");
base.clone_into(
&mut self
@ -98,7 +103,7 @@ impl NodeConfigCommon for Config {
);
}
fn addresses(&self) -> (std::net::SocketAddr, Option<std::net::SocketAddr>) {
fn addresses(&self) -> (SocketAddr, Option<SocketAddr>) {
(
self.http.backend_settings.address,
Some(self.testing_http.backend_settings.address),

View File

@ -124,8 +124,11 @@ impl GeneratedTopology {
pub async fn wait_remote_readiness(
&self,
// Node endpoints
validator_endpoints: &[Url],
executor_endpoints: &[Url],
// Membership endpoints
validator_membership_endpoints: Option<&[Url]>,
executor_membership_endpoints: Option<&[Url]>,
) -> Result<(), ReadinessError> {
@ -151,6 +154,7 @@ impl GeneratedTopology {
let labels = self.labels();
let client = Client::new();
let make_testing_base_url = |port: u16| -> Url {
Url::parse(&format!("http://127.0.0.1:{port}/"))
.expect("failed to construct local testing base url")
@ -163,6 +167,7 @@ impl GeneratedTopology {
&listen_ports,
&initial_peer_ports,
);
let network_check = HttpNetworkReadiness {
client: &client,
endpoints: &endpoints,
@ -180,6 +185,7 @@ impl GeneratedTopology {
urls.len(),
"validator membership endpoints must match topology"
);
membership_endpoints.extend_from_slice(urls);
} else {
membership_endpoints.extend(
@ -195,6 +201,7 @@ impl GeneratedTopology {
urls.len(),
"executor membership endpoints must match topology"
);
membership_endpoints.extend_from_slice(urls);
} else {
membership_endpoints.extend(
@ -282,6 +289,7 @@ pub fn find_expected_peer_counts(
let Some(peer_idx) = listen_ports.iter().position(|p| p == port) else {
continue;
};
if peer_idx == idx {
continue;
}

View File

@ -92,6 +92,7 @@ impl DeploymentOrchestrator {
host,
environment.grafana_port()
);
print_profiling_urls(&host, &host_ports);
}

View File

@ -35,6 +35,7 @@ impl PortManager {
environment
.fail("failed to determine container host ports")
.await;
tracing::warn!(%err, "failed to resolve host ports");
Err(err)
}

View File

@ -53,14 +53,17 @@ impl DeploymentSetup {
if prometheus_env.is_some() {
info!(port = prometheus_env, "using prometheus port from env");
}
let prometheus_port = prometheus_env
.and_then(|port| reserve_port(port))
.or_else(|| allocate_prometheus_port())
.unwrap_or_else(|| PortReservation::new(DEFAULT_PROMETHEUS_PORT, None));
debug!(
prometheus_port = prometheus_port.port(),
"selected prometheus port"
);
let environment =
prepare_environment(&self.descriptors, prometheus_port, prometheus_env.is_some())
.await?;

View File

@ -408,12 +408,14 @@ pub async fn prepare_environment(
) -> Result<StackEnvironment, ComposeRunnerError> {
let workspace = prepare_workspace_logged()?;
let cfgsync_port = allocate_cfgsync_port()?;
let grafana_env = env::var("COMPOSE_GRAFANA_PORT")
.ok()
.and_then(|raw| raw.parse::<u16>().ok());
if let Some(port) = grafana_env {
info!(port, "using grafana port from env");
}
update_cfgsync_logged(&workspace, descriptors, cfgsync_port)?;
ensure_compose_image().await?;
@ -439,6 +441,7 @@ pub async fn prepare_environment(
let mut cfgsync_handle = start_cfgsync_stage(&workspace, cfgsync_port).await?;
drop(prometheus_port);
match bring_up_stack_logged(
&compose_path,
&project_name,

View File

@ -6,8 +6,11 @@ use testing_framework_core::{
};
use tracing::{debug, info};
const DEFAULT_WAIT: Duration = Duration::from_secs(180);
const POLL_INTERVAL: Duration = Duration::from_millis(250);
const DEFAULT_WAIT_TIMEOUT_SECS: u64 = 180;
const POLL_INTERVAL_MILLIS: u64 = 250;
const DEFAULT_WAIT: Duration = Duration::from_secs(DEFAULT_WAIT_TIMEOUT_SECS);
const POLL_INTERVAL: Duration = Duration::from_millis(POLL_INTERVAL_MILLIS);
pub async fn wait_for_validators(ports: &[u16]) -> Result<(), HttpReadinessError> {
wait_for_ports(ports, NodeRole::Validator).await
@ -20,7 +23,9 @@ pub async fn wait_for_executors(ports: &[u16]) -> Result<(), HttpReadinessError>
async fn wait_for_ports(ports: &[u16], role: NodeRole) -> Result<(), HttpReadinessError> {
let host = compose_runner_host();
let timeout = compose_http_timeout();
info!(role = ?role, ports = ?ports, host, "waiting for compose HTTP readiness");
http_probe::wait_for_http_ports_with_host(
ports,
role,
@ -31,8 +36,10 @@ async fn wait_for_ports(ports: &[u16], role: NodeRole) -> Result<(), HttpReadine
.await
}
const DEFAULT_COMPOSE_HOST: &str = "127.0.0.1";
fn compose_runner_host() -> String {
let host = env::var("COMPOSE_RUNNER_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
let host = env::var("COMPOSE_RUNNER_HOST").unwrap_or_else(|_| DEFAULT_COMPOSE_HOST.to_string());
debug!(host, "compose runner host resolved");
host
}

View File

@ -140,6 +140,7 @@ impl Deployer for K8sDeployer {
return Err(err.into());
}
};
let (block_feed, block_feed_guard) = match spawn_block_feed_with(&node_clients).await {
Ok(pair) => pair,
Err(err) => {
@ -160,6 +161,7 @@ impl Deployer for K8sDeployer {
grafana_url = %format!("http://{}:{}/", node_host, 30030),
"grafana dashboard available via NodePort"
);
if std::env::var("TESTNET_PRINT_ENDPOINTS").is_ok() {
println!(
"TESTNET_ENDPOINTS prometheus=http://{}:{}/ grafana=http://{}:{}/",
@ -168,6 +170,7 @@ impl Deployer for K8sDeployer {
node_host,
30030
);
for (idx, client) in node_clients.validator_clients().iter().enumerate() {
println!(
"TESTNET_PPROF validator_{}={}/debug/pprof/profile?seconds=15&format=proto",
@ -175,6 +178,7 @@ impl Deployer for K8sDeployer {
client.base_url()
);
}
for (idx, client) in node_clients.executor_clients().iter().enumerate() {
println!(
"TESTNET_PPROF executor_{}={}/debug/pprof/profile?seconds=15&format=proto",
@ -183,6 +187,7 @@ impl Deployer for K8sDeployer {
);
}
}
let (cleanup, port_forwards) = cluster
.take()
.expect("cluster should still be available")
@ -192,6 +197,7 @@ impl Deployer for K8sDeployer {
block_feed_guard,
port_forwards,
));
let context = RunContext::new(
descriptors,
None,
@ -201,12 +207,14 @@ impl Deployer for K8sDeployer {
block_feed,
None,
);
info!(
validators = validator_count,
executors = executor_count,
duration_secs = scenario.duration().as_secs(),
"k8s deployment ready; handing control to scenario runner"
);
Ok(Runner::new(context, Some(cleanup_guard)))
}
}

View File

@ -47,15 +47,20 @@ impl ClusterEnvironment {
ports: &ClusterPorts,
port_forwards: Vec<std::process::Child>,
) -> Self {
let validator_api_ports = ports.validators.iter().map(|ports| ports.api).collect();
let validator_testing_ports = ports.validators.iter().map(|ports| ports.testing).collect();
let executor_api_ports = ports.executors.iter().map(|ports| ports.api).collect();
let executor_testing_ports = ports.executors.iter().map(|ports| ports.testing).collect();
Self {
client,
namespace,
release,
cleanup: Some(cleanup),
validator_api_ports: ports.validators.iter().map(|ports| ports.api).collect(),
validator_testing_ports: ports.validators.iter().map(|ports| ports.testing).collect(),
executor_api_ports: ports.executors.iter().map(|ports| ports.api).collect(),
executor_testing_ports: ports.executors.iter().map(|ports| ports.testing).collect(),
validator_api_ports,
validator_testing_ports,
executor_api_ports,
executor_testing_ports,
prometheus_port: ports.prometheus,
port_forwards,
}

View File

@ -4,15 +4,18 @@ use tokio::time::sleep;
use super::{ClusterWaitError, deployment_timeout};
const DEPLOYMENT_POLL_INTERVAL_SECS: u64 = 2;
pub async fn wait_for_deployment_ready(
client: &Client,
namespace: &str,
name: &str,
) -> Result<(), ClusterWaitError> {
let mut elapsed = std::time::Duration::ZERO;
let interval = std::time::Duration::from_secs(2);
let interval = std::time::Duration::from_secs(DEPLOYMENT_POLL_INTERVAL_SECS);
let timeout = deployment_timeout();
while elapsed <= timeout {
match Api::<Deployment>::namespaced(client.clone(), namespace)
.get(name)
@ -29,6 +32,7 @@ pub async fn wait_for_deployment_ready(
.as_ref()
.and_then(|status| status.ready_replicas)
.unwrap_or(0);
if ready >= desired {
return Ok(());
}

View File

@ -5,6 +5,8 @@ use std::{
time::Duration,
};
use anyhow::{Result as AnyhowResult, anyhow};
use super::{ClusterWaitError, NodeConfigPorts, NodePortAllocation};
pub fn port_forward_group(
@ -73,7 +75,7 @@ pub fn port_forward_service(
return Err(ClusterWaitError::PortForward {
service: service.to_owned(),
port: remote_port,
source: anyhow::anyhow!("kubectl exited with {status}"),
source: anyhow!("kubectl exited with {status}"),
});
}
if TcpStream::connect((Ipv4Addr::LOCALHOST, local_port)).is_ok() {
@ -86,7 +88,7 @@ pub fn port_forward_service(
Err(ClusterWaitError::PortForward {
service: service.to_owned(),
port: remote_port,
source: anyhow::anyhow!("port-forward did not become ready"),
source: anyhow!("port-forward did not become ready"),
})
}
@ -98,7 +100,7 @@ pub fn kill_port_forwards(handles: &mut Vec<Child>) {
handles.clear();
}
fn allocate_local_port() -> anyhow::Result<u16> {
fn allocate_local_port() -> AnyhowResult<u16> {
let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0))?;
let port = listener.local_addr()?.port();
drop(listener);

View File

@ -11,11 +11,13 @@ pub async fn wait_for_node_http_nodeport(
wait_for_node_http_on_host(ports, role, &host, node_http_probe_timeout()).await
}
const LOCALHOST: &str = "127.0.0.1";
pub async fn wait_for_node_http_port_forward(
ports: &[u16],
role: NodeRole,
) -> Result<(), ClusterWaitError> {
wait_for_node_http_on_host(ports, role, "127.0.0.1", node_http_timeout()).await
wait_for_node_http_on_host(ports, role, LOCALHOST, node_http_timeout()).await
}
async fn wait_for_node_http_on_host(

View File

@ -101,14 +101,17 @@ static DEPLOYMENT_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
DEFAULT_K8S_DEPLOYMENT_TIMEOUT,
)
});
static NODE_HTTP_TIMEOUT: LazyLock<Duration> =
LazyLock::new(|| env_duration_secs("K8S_RUNNER_HTTP_TIMEOUT_SECS", DEFAULT_NODE_HTTP_TIMEOUT));
static NODE_HTTP_PROBE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
env_duration_secs(
"K8S_RUNNER_HTTP_PROBE_TIMEOUT_SECS",
DEFAULT_NODE_HTTP_PROBE_TIMEOUT,
)
});
static HTTP_POLL_INTERVAL: LazyLock<Duration> = LazyLock::new(|| {
env_duration_secs(
"K8S_RUNNER_HTTP_POLL_INTERVAL_SECS",
@ -133,12 +136,14 @@ pub(crate) fn http_poll_interval() -> Duration {
}
pub(crate) const PROMETHEUS_HTTP_PORT: u16 = DEFAULT_PROMETHEUS_HTTP_PORT;
static PROMETHEUS_HTTP_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
env_duration_secs(
"K8S_RUNNER_PROMETHEUS_HTTP_TIMEOUT_SECS",
DEFAULT_PROMETHEUS_HTTP_TIMEOUT,
)
});
static PROMETHEUS_HTTP_PROBE_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
env_duration_secs(
"K8S_RUNNER_PROMETHEUS_HTTP_PROBE_TIMEOUT_SECS",

View File

@ -71,6 +71,7 @@ pub fn create_node_configs(
&ports,
&blend_ports,
);
let api_configs = build_api_configs(&hosts);
let mut configured_hosts = HashMap::new();
@ -93,25 +94,23 @@ pub fn create_node_configs(
let providers = create_providers(&hosts, &consensus_configs, &blend_configs, &da_configs);
// Update genesis TX to contain Blend and DA providers.
let ledger_tx = consensus_configs[0]
.genesis_tx
.mantle_tx()
.ledger_tx
.clone();
let genesis_tx = create_genesis_tx_with_declarations(ledger_tx, providers);
for c in &mut consensus_configs {
c.genesis_tx = genesis_tx.clone();
}
// Set Blend and DA keys in KMS of each node config.
let kms_configs = create_kms_configs(&blend_configs, &da_configs);
for (i, host) in hosts.into_iter().enumerate() {
let consensus_config = consensus_configs[i].clone();
let api_config = api_configs[i].clone();
// DA Libp2p network config.
let mut da_config = da_configs[i].clone();
da_config.listening_address = Multiaddr::from_str(&format!(
"/ip4/0.0.0.0/udp/{}/quic-v1",
@ -122,7 +121,6 @@ pub fn create_node_configs(
da_config.policy_settings.min_dispersal_peers = 0;
}
// Libp2p network config.
let mut network_config = network_configs[i].clone();
network_config.backend.swarm.host = Ipv4Addr::from_str("0.0.0.0").unwrap();
network_config.backend.swarm.port = host.network_port;
@ -135,7 +133,6 @@ pub fn create_node_configs(
.unwrap(),
};
// Blend network config.
let mut blend_config = blend_configs[i].clone();
blend_config.backend_core.listening_address =
Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/{}/quic-v1", host.blend_port)).unwrap();
@ -166,9 +163,11 @@ pub fn create_node_configs(
fn generate_ids(count: usize, ids: Option<Vec<[u8; 32]>>) -> Vec<[u8; 32]> {
ids.unwrap_or_else(|| {
let mut generated = vec![[0; 32]; count];
for id in &mut generated {
thread_rng().fill(id);
}
generated
})
}

View File

@ -12,6 +12,8 @@ use tokio::{sync::oneshot::Sender, time::timeout};
use crate::{config::builder::create_node_configs, host::Host, server::CfgSyncConfig};
const HOST_POLLING_INTERVAL: Duration = Duration::from_secs(1);
pub enum RepoResponse {
Config(Box<GeneralConfig>),
Timeout,
@ -132,7 +134,7 @@ impl ConfigRepo {
if self.waiting_hosts.lock().unwrap().len() >= self.n_hosts {
break;
}
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(HOST_POLLING_INTERVAL).await;
}
}
}

View File

@ -1,5 +1,18 @@
use std::{fs, net::Ipv4Addr, num::NonZero, path::PathBuf, sync::Arc, time::Duration};
// DA Policy Constants
const DEFAULT_MAX_DISPERSAL_FAILURES: usize = 3;
const DEFAULT_MAX_SAMPLING_FAILURES: usize = 3;
const DEFAULT_MAX_REPLICATION_FAILURES: usize = 3;
const DEFAULT_MALICIOUS_THRESHOLD: usize = 10;
// DA Network Constants
const DEFAULT_SUBNETS_REFRESH_INTERVAL_SECS: u64 = 30;
// Bootstrap Constants
const DEFAULT_DELAY_BEFORE_NEW_DOWNLOAD_SECS: u64 = 10;
const DEFAULT_MAX_ORPHAN_CACHE_SIZE: usize = 5;
use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::post};
use nomos_da_network_core::swarm::{
DAConnectionMonitorSettings, DAConnectionPolicySettings, ReplicationConfig,
@ -7,7 +20,7 @@ use nomos_da_network_core::swarm::{
use nomos_tracing_service::TracingSettings;
use nomos_utils::bounded_duration::{MinimalBoundedDuration, SECOND};
use serde::{Deserialize, Serialize};
use serde_json::json;
use serde_json::{Value, json, to_value};
use serde_with::serde_as;
use subnetworks_assignations::MembershipHandler;
use testing_framework_config::{
@ -93,10 +106,10 @@ impl CfgSyncConfig {
policy_settings: DAConnectionPolicySettings {
min_dispersal_peers: self.min_dispersal_peers,
min_replication_peers: self.min_replication_peers,
max_dispersal_failures: 3,
max_sampling_failures: 3,
max_replication_failures: 3,
malicious_threshold: 10,
max_dispersal_failures: DEFAULT_MAX_DISPERSAL_FAILURES,
max_sampling_failures: DEFAULT_MAX_SAMPLING_FAILURES,
max_replication_failures: DEFAULT_MAX_REPLICATION_FAILURES,
malicious_threshold: DEFAULT_MALICIOUS_THRESHOLD,
},
monitor_settings: DAConnectionMonitorSettings {
failure_time_window: self.monitor_failure_time_window,
@ -105,7 +118,7 @@ impl CfgSyncConfig {
balancer_interval: self.balancer_interval,
redial_cooldown: Duration::ZERO,
replication_settings: self.replication_settings,
subnets_refresh_interval: Duration::from_secs(30),
subnets_refresh_interval: Duration::from_secs(DEFAULT_SUBNETS_REFRESH_INTERVAL_SECS),
retry_shares_limit: self.retry_shares_limit,
retry_commitments_limit: self.retry_commitments_limit,
}
@ -167,12 +180,13 @@ async fn validator_config(
|config_response| match config_response {
RepoResponse::Config(config) => {
let config = create_validator_config(*config);
let mut value =
serde_json::to_value(&config).expect("validator config should serialize");
let mut value = to_value(&config).expect("validator config should serialize");
inject_defaults(&mut value);
override_api_ports(&mut value, &ports);
inject_da_assignations(&mut value, &config.da_network.membership);
override_min_session_members(&mut value);
(StatusCode::OK, Json(value)).into_response()
}
RepoResponse::Timeout => (StatusCode::REQUEST_TIMEOUT).into_response(),
@ -209,12 +223,13 @@ async fn executor_config(
|config_response| match config_response {
RepoResponse::Config(config) => {
let config = create_executor_config(*config);
let mut value =
serde_json::to_value(&config).expect("executor config should serialize");
let mut value = to_value(&config).expect("executor config should serialize");
inject_defaults(&mut value);
override_api_ports(&mut value, &ports);
inject_da_assignations(&mut value, &config.da_network.membership);
override_min_session_members(&mut value);
(StatusCode::OK, Json(value)).into_response()
}
RepoResponse::Timeout => (StatusCode::REQUEST_TIMEOUT).into_response(),
@ -229,7 +244,7 @@ pub fn cfgsync_app(config_repo: Arc<ConfigRepo>) -> Router {
.with_state(config_repo)
}
fn override_api_ports(config: &mut serde_json::Value, ports: &PortOverrides) {
fn override_api_ports(config: &mut Value, ports: &PortOverrides) {
if let Some(api_port) = ports.api_port {
if let Some(address) = config.pointer_mut("/http/backend_settings/address") {
*address = json!(format!("0.0.0.0:{api_port}"));
@ -243,54 +258,54 @@ fn override_api_ports(config: &mut serde_json::Value, ports: &PortOverrides) {
}
}
fn inject_da_assignations(
config: &mut serde_json::Value,
membership: &nomos_node::NomosDaMembership,
) {
let assignations: std::collections::HashMap<String, Vec<String>> = membership
.subnetworks()
fn inject_da_assignations(config: &mut Value, membership: &nomos_node::NomosDaMembership) {
fn convert_subnet_to_assignment(
subnet_id: impl ToString,
members: impl IntoIterator<Item = impl ToString>,
) -> (String, Vec<String>) {
let peer_strings: Vec<String> = members.into_iter().map(|peer| peer.to_string()).collect();
(subnet_id.to_string(), peer_strings)
}
let subnetworks = membership.subnetworks();
let assignations: std::collections::HashMap<String, Vec<String>> = subnetworks
.into_iter()
.map(|(subnet_id, members)| {
(
subnet_id.to_string(),
members.into_iter().map(|peer| peer.to_string()).collect(),
)
})
.map(|(subnet_id, members)| convert_subnet_to_assignment(subnet_id, members))
.collect();
if let Some(membership) = config.pointer_mut("/da_network/membership") {
if let Some(map) = membership.as_object_mut() {
map.insert("assignations".to_string(), serde_json::json!(assignations));
map.insert("assignations".to_string(), json!(assignations));
}
}
}
fn override_min_session_members(config: &mut serde_json::Value) {
fn override_min_session_members(config: &mut Value) {
if let Some(value) = config.pointer_mut("/da_network/min_session_members") {
*value = serde_json::json!(1);
*value = json!(1);
}
}
fn inject_defaults(config: &mut serde_json::Value) {
fn inject_defaults(config: &mut Value) {
if let Some(cryptarchia) = config
.get_mut("cryptarchia")
.and_then(|v| v.as_object_mut())
{
let bootstrap = cryptarchia
.entry("bootstrap")
.or_insert_with(|| serde_json::json!({}));
let bootstrap = cryptarchia.entry("bootstrap").or_insert_with(|| json!({}));
if let Some(bootstrap_map) = bootstrap.as_object_mut() {
bootstrap_map
.entry("ibd")
.or_insert_with(|| serde_json::json!({ "peers": [], "delay_before_new_download": { "secs": 10, "nanos": 0 } }));
bootstrap_map.entry("ibd").or_insert_with(
|| json!({ "peers": [], "delay_before_new_download": { "secs": DEFAULT_DELAY_BEFORE_NEW_DOWNLOAD_SECS, "nanos": 0 } }),
);
}
cryptarchia
.entry("network_adapter_settings")
.or_insert_with(|| serde_json::json!({ "topic": "/cryptarchia/proto" }));
.or_insert_with(|| json!({ "topic": "/cryptarchia/proto" }));
cryptarchia.entry("sync").or_insert_with(|| {
serde_json::json!({
"orphan": { "max_orphan_cache_size": 5 }
json!({
"orphan": { "max_orphan_cache_size": DEFAULT_MAX_ORPHAN_CACHE_SIZE }
})
});
}

View File

@ -107,17 +107,22 @@ impl ConsensusLiveness {
match Self::fetch_cluster_info(client).await {
Ok((height, tip)) => {
let label = format!("node-{idx}");
tracing::debug!(node = %label, height, tip = ?tip, attempt, "consensus_info collected");
samples.push(NodeSample { label, height, tip });
break;
}
Err(err) if attempt + 1 == REQUEST_RETRIES => {
tracing::warn!(node = %format!("node-{idx}"), %err, "consensus_info failed after retries");
issues.push(ConsensusLivenessIssue::RequestFailed {
node: format!("node-{idx}"),
source: err,
});
}
Err(_) => sleep(REQUEST_RETRY_DELAY).await,
}
}
@ -186,11 +191,14 @@ impl ConsensusLiveness {
}
if check.issues.is_empty() {
let observed_heights: Vec<_> = check.samples.iter().map(|s| s.height).collect();
let observed_tips: Vec<_> = check.samples.iter().map(|s| s.tip).collect();
tracing::info!(
target,
samples = check.samples.len(),
heights = ?check.samples.iter().map(|s| s.height).collect::<Vec<_>>(),
tips = ?check.samples.iter().map(|s| s.tip).collect::<Vec<_>>(),
heights = ?observed_heights,
tips = ?observed_tips,
"consensus liveness expectation satisfied"
);
Ok(())
@ -198,6 +206,7 @@ impl ConsensusLiveness {
for issue in &check.issues {
tracing::warn!(?issue, "consensus liveness issue");
}
Err(Box::new(ConsensusLivenessError::Violations {
target,
details: check.issues.into(),

View File

@ -15,7 +15,7 @@ use nomos_core::mantle::{
};
use testing_framework_core::scenario::{BlockRecord, DynError, Expectation, RunContext};
use thiserror::Error;
use tokio::sync::broadcast;
use tokio::{pin, select, spawn, sync::broadcast, time::sleep};
use super::workload::{planned_channel_count, planned_channel_ids};
@ -37,7 +37,7 @@ struct CaptureState {
}
const MIN_INSCRIPTION_INCLUSION_RATIO: f64 = 0.8;
const MIN_BLOB_INCLUSION_RATIO: f64 = 0.55;
const MIN_BLOB_INCLUSION_RATIO: f64 = 0.5;
#[derive(Debug, Error)]
enum DaExpectationError {
@ -115,12 +115,12 @@ impl Expectation for DaWorkloadExpectation {
{
let run_blocks = Arc::clone(&run_blocks);
let mut receiver = ctx.block_feed().subscribe();
tokio::spawn(async move {
let timer = tokio::time::sleep(run_duration);
tokio::pin!(timer);
spawn(async move {
let timer = sleep(run_duration);
pin!(timer);
loop {
tokio::select! {
select! {
_ = &mut timer => break,
result = receiver.recv() => match result {
Ok(_) => {
@ -139,7 +139,7 @@ impl Expectation for DaWorkloadExpectation {
let inscriptions_for_task = Arc::clone(&inscriptions);
let blobs_for_task = Arc::clone(&blobs);
tokio::spawn(async move {
spawn(async move {
loop {
match receiver.recv().await {
Ok(record) => capture_block(

View File

@ -77,13 +77,13 @@ impl Expectation for TxInclusionExpectation {
}
let available = limited_user_count(self.user_limit, wallet_accounts.len());
let (planned, _) = submission_plan(self.txs_per_block, ctx, available)?;
if planned == 0 {
let plan = submission_plan(self.txs_per_block, ctx, available)?;
if plan.transaction_count == 0 {
return Err(TxExpectationError::NoPlannedTransactions.into());
}
tracing::info!(
planned_txs = planned,
planned_txs = plan.transaction_count,
txs_per_block = self.txs_per_block.get(),
user_limit = self.user_limit.map(|u| u.get()),
"tx inclusion expectation starting capture"
@ -91,7 +91,7 @@ impl Expectation for TxInclusionExpectation {
let wallet_pks = wallet_accounts
.into_iter()
.take(planned)
.take(plan.transaction_count)
.map(|account| account.secret_key.to_public_key())
.collect::<HashSet<ZkPublicKey>>();
@ -136,7 +136,7 @@ impl Expectation for TxInclusionExpectation {
self.capture_state = Some(CaptureState {
observed,
expected: planned as u64,
expected: plan.transaction_count as u64,
});
Ok(())

View File

@ -15,6 +15,15 @@ use testing_framework_core::{
scenario::{DynError, Expectation, RunContext, RunMetrics, Workload as ScenarioWorkload},
topology::generation::{GeneratedNodeConfig, GeneratedTopology},
};
/// Submission timing plan for transaction workload execution
#[derive(Debug, Clone, Copy)]
pub(super) struct SubmissionPlan {
/// Number of transactions to submit
pub transaction_count: usize,
/// Time interval between submissions
pub submission_interval: Duration,
}
use tokio::time::sleep;
use super::expectation::TxInclusionExpectation;
@ -52,9 +61,13 @@ impl ScenarioWorkload for Workload {
_run_metrics: &RunMetrics,
) -> Result<(), DynError> {
tracing::info!("initializing transaction workload");
let wallet_accounts = descriptors.config().wallet().accounts.clone();
if wallet_accounts.is_empty() {
return Err("transaction workload requires seeded accounts".into());
return Err(
"Transaction workload initialization failed: no seeded wallet accounts configured"
.into(),
);
}
let reference_node = descriptors
@ -64,21 +77,27 @@ impl ScenarioWorkload for Workload {
.ok_or("transaction workload requires at least one node in the topology")?;
let utxo_map = wallet_utxo_map(reference_node);
fn match_account_to_utxo(
account: WalletAccount,
utxo_map: &HashMap<ZkPublicKey, Utxo>,
) -> Option<WalletInput> {
utxo_map
.get(&account.public_key())
.copied()
.map(|utxo| WalletInput { account, utxo })
}
let mut accounts = wallet_accounts
.into_iter()
.filter_map(|account| {
utxo_map
.get(&account.public_key())
.copied()
.map(|utxo| WalletInput { account, utxo })
})
.filter_map(|account| match_account_to_utxo(account, &utxo_map))
.collect::<Vec<_>>();
apply_user_limit(&mut accounts, self.user_limit);
if accounts.is_empty() {
return Err(
"transaction workload could not match any accounts to genesis UTXOs".into(),
"Transaction workload initialization failed: could not match any wallet accounts to genesis UTXOs".into(),
);
}
@ -149,22 +168,22 @@ struct Submission<'a> {
impl<'a> Submission<'a> {
fn new(workload: &Workload, ctx: &'a RunContext) -> Result<Self, DynError> {
if workload.accounts.is_empty() {
return Err("transaction workload has no available accounts".into());
return Err("Transaction workload submission failed: no available accounts for transaction creation".into());
}
let (planned, interval) =
let submission_plan =
submission_plan(workload.txs_per_block, ctx, workload.accounts.len())?;
let plan = workload
.accounts
.iter()
.take(planned)
.take(submission_plan.transaction_count)
.cloned()
.collect::<VecDeque<_>>();
tracing::info!(
planned,
interval_ms = interval.as_millis(),
planned = submission_plan.transaction_count,
interval_ms = submission_plan.submission_interval.as_millis(),
accounts_available = workload.accounts.len(),
"transaction workload submission plan"
);
@ -172,7 +191,7 @@ impl<'a> Submission<'a> {
Ok(Self {
plan,
ctx,
interval,
interval: submission_plan.submission_interval,
})
}
@ -183,6 +202,7 @@ impl<'a> Submission<'a> {
interval_ms = self.interval.as_millis(),
"begin transaction submissions"
);
while let Some(input) = self.plan.pop_front() {
submit_wallet_transaction(self.ctx, &input).await?;
@ -190,6 +210,7 @@ impl<'a> Submission<'a> {
sleep(self.interval).await;
}
}
tracing::info!("transaction submissions finished");
Ok(())
@ -212,22 +233,27 @@ fn build_wallet_transaction(input: &WalletInput) -> Result<SignedMantleTx, DynEr
.add_ledger_output(Note::new(input.utxo.note.value, input.account.public_key()));
let mantle_tx = builder.build();
let tx_hash = mantle_tx.hash();
let signature = ZkKey::multi_sign(
std::slice::from_ref(&input.account.secret_key),
tx_hash.as_ref(),
)
.map_err(|err| format!("transaction workload could not sign transaction: {err}"))?;
.map_err(|err| {
format!("Transaction workload signing failed: could not sign transaction: {err}")
})?;
SignedMantleTx::new(mantle_tx, Vec::new(), signature).map_err(|err| {
format!("transaction workload constructed invalid transaction: {err}").into()
format!("Transaction workload construction failed: invalid transaction structure: {err}")
.into()
})
}
fn wallet_utxo_map(node: &GeneratedNodeConfig) -> HashMap<ZkPublicKey, Utxo> {
let genesis_tx = node.general.consensus_config.genesis_tx.clone();
let ledger_tx = genesis_tx.mantle_tx().ledger_tx.clone();
let tx_hash = ledger_tx.hash();
ledger_tx
@ -241,6 +267,7 @@ fn wallet_utxo_map(node: &GeneratedNodeConfig) -> HashMap<ZkPublicKey, Utxo> {
fn apply_user_limit<T>(items: &mut Vec<T>, user_limit: Option<NonZeroUsize>) {
if let Some(limit) = user_limit {
let allowed = limit.get().min(items.len());
items.truncate(allowed);
}
}
@ -253,9 +280,9 @@ pub(super) fn submission_plan(
txs_per_block: NonZeroU64,
ctx: &RunContext,
available_accounts: usize,
) -> Result<(usize, Duration), DynError> {
) -> Result<SubmissionPlan, DynError> {
if available_accounts == 0 {
return Err("transaction workload scheduled zero transactions".into());
return Err("Transaction workload planning failed: no accounts available for transaction scheduling".into());
}
let run_secs = ctx.run_duration().as_secs_f64();
@ -265,16 +292,22 @@ pub(super) fn submission_plan(
.unwrap_or_else(|| ctx.run_duration())
.as_secs_f64();
let expected_blocks = run_secs / block_secs;
let requested = (expected_blocks * txs_per_block.get() as f64)
let estimated_blocks_in_run = run_secs / block_secs;
let target_transaction_count = (estimated_blocks_in_run * txs_per_block.get() as f64)
.floor()
.clamp(0.0, u64::MAX as f64) as u64;
let planned = requested.min(available_accounts as u64) as usize;
if planned == 0 {
return Err("transaction workload scheduled zero transactions".into());
let actual_transactions_to_submit =
target_transaction_count.min(available_accounts as u64) as usize;
if actual_transactions_to_submit == 0 {
return Err("Transaction workload planning failed: calculated zero transactions to submit based on run duration and target rate".into());
}
let interval = Duration::from_secs_f64(run_secs / planned as f64);
Ok((planned, interval))
let submission_interval =
Duration::from_secs_f64(run_secs / actual_transactions_to_submit as f64);
Ok(SubmissionPlan {
transaction_count: actual_transactions_to_submit,
submission_interval,
})
}