From 619db3846d502cec49c7940678c829177a88cb59 Mon Sep 17 00:00:00 2001 From: moudyellaz Date: Tue, 19 May 2026 22:59:02 +0200 Subject: [PATCH] refactor(e2e_bench)!: Duration-typed timings, seconds-float JSON, tokio::timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BREAKING CHANGE: bench JSON renames per-step / per-scenario timing fields from *_ms (float milliseconds) to *_s (float seconds). Renames: submit_ms → submit_s, inclusion_ms → inclusion_s, wallet_sync_ms → wallet_sync_s, total_ms → total_s, setup_ms → setup_s, bedrock_finality_ms → bedrock_finality_s, total_wall_seconds → total_wall_s. measure_bedrock_finality timeout floor also shifts slightly: on timeout the field is now ~60.000s rather than "first poll tick past 60s". --- Cargo.lock | 4 - tools/e2e_bench/Cargo.toml | 18 +-- tools/e2e_bench/README.md | 2 +- tools/e2e_bench/src/bedrock_handle.rs | 11 +- tools/e2e_bench/src/bench_context.rs | 9 +- tools/e2e_bench/src/harness.rs | 143 +++++++++++--------- tools/e2e_bench/src/main.rs | 151 ++++++++++------------ tools/e2e_bench/src/scenarios/parallel.rs | 29 +++-- 8 files changed, 194 insertions(+), 173 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 96afcef6..f238fc2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2382,7 +2382,6 @@ checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" name = "e2e_bench" version = "0.1.0" dependencies = [ - "amm_core", "anyhow", "borsh", "chrono", @@ -2392,15 +2391,12 @@ dependencies = [ "indexer_service_rpc", "integration_tests", "jsonrpsee", - "log", "nssa", - "nssa_core", "sequencer_service", "sequencer_service_rpc", "serde", "serde_json", "tempfile", - "token_core", "tokio", "wallet", ] diff --git a/tools/e2e_bench/Cargo.toml b/tools/e2e_bench/Cargo.toml index c6658a8e..ab6a6eb0 100644 --- a/tools/e2e_bench/Cargo.toml +++ b/tools/e2e_bench/Cargo.toml @@ -9,21 +9,21 @@ publish = false workspace = true [dependencies] +common.workspace = true +indexer_service.workspace = true +indexer_service_rpc = { workspace = true, features = ["client"] } integration_tests.workspace = true -wallet.workspace = true nssa.workspace = true sequencer_service.workspace = true sequencer_service_rpc = { workspace = true, features = ["client"] } -indexer_service.workspace = true -indexer_service_rpc = { workspace = true, features = ["client"] } -jsonrpsee = { workspace = true, features = ["ws-client"] } -common.workspace = true -tempfile.workspace = true -borsh.workspace = true -chrono.workspace = true +wallet.workspace = true anyhow.workspace = true -tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time"] } +borsh.workspace = true +chrono.workspace = true clap.workspace = true +jsonrpsee = { workspace = true, features = ["ws-client"] } serde.workspace = true serde_json.workspace = true +tempfile.workspace = true +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time"] } diff --git a/tools/e2e_bench/README.md b/tools/e2e_bench/README.md index 37d6b175..ddc76bb7 100644 --- a/tools/e2e_bench/README.md +++ b/tools/e2e_bench/README.md @@ -26,7 +26,7 @@ Scenarios: `token`, `amm`, `fanout`, `private`, `parallel`, `all`. ## What you'll see -Per scenario: a step table (`submit_ms`, `inclusion_ms`, `sync_ms`, `total_ms`) and a size summary covering every block captured during the scenario (block_bytes total/mean/min/max; per-tx-variant sizes for public, PPE, and program-deployment transactions). +Per scenario: a step table (`submit_s`, `inclusion_s`, `sync_s`, `total_s`) and a size summary covering every block captured during the scenario (block_bytes total/mean/min/max; per-tx-variant sizes for public, PPE, and program-deployment transactions). The fanout, parallel, and private scenarios are the most representative for L1-payload-size measurements since they put multiple txs per block. diff --git a/tools/e2e_bench/src/bedrock_handle.rs b/tools/e2e_bench/src/bedrock_handle.rs index ef59d8ad..94a8514a 100644 --- a/tools/e2e_bench/src/bedrock_handle.rs +++ b/tools/e2e_bench/src/bedrock_handle.rs @@ -2,14 +2,19 @@ //! Launches a fresh Bedrock instance per scenario so the indexer never has to //! catch up a large finalization backlog. //! -//! Required env vars (no defaults — path layouts differ per developer): -//! - `LEZ_BEDROCK_BIN` — absolute path to the `logos-blockchain-node` binary. -//! - `LEZ_BEDROCK_CONFIG_DIR` — directory containing `node-config.yaml` and +//! Required env vars (no defaults, path layouts differ per developer): +//! - `LEZ_BEDROCK_BIN` absolute path to the `logos-blockchain-node` binary. +//! - `LEZ_BEDROCK_CONFIG_DIR` directory containing `node-config.yaml` and //! `deployment-settings.yaml` (template with `PLACEHOLDER_CHAIN_START_TIME`). //! //! Optional: //! - `LEZ_BEDROCK_PORT` (default: 18080) +#![allow( + clippy::let_underscore_must_use, + reason = "file is deleted in the docker-compose pivot; teardown ignores child kill/wait results by design" +)] + use std::{ env, net::SocketAddr, diff --git a/tools/e2e_bench/src/bench_context.rs b/tools/e2e_bench/src/bench_context.rs index 43376811..41f0d59d 100644 --- a/tools/e2e_bench/src/bench_context.rs +++ b/tools/e2e_bench/src/bench_context.rs @@ -1,4 +1,4 @@ -//! BenchContext: wires sequencer + indexer + wallet in-process against an +//! `BenchContext`: wires sequencer + indexer + wallet in-process against an //! externally-running Bedrock node. Mirrors the surface of //! `integration_tests::TestContext` for the methods the scenarios need //! (`wallet_mut()`, `sequencer_client()`), but skips the docker setup. @@ -6,6 +6,11 @@ //! The external Bedrock URL defaults to 127.0.0.1:18080 and can be overridden //! with the `LEZ_BEDROCK_ADDR` env var. +#![allow( + clippy::arbitrary_source_item_ordering, + reason = "file is deleted in the docker-compose pivot; ordering churn is wasted work" +)] + use std::{env, net::SocketAddr, path::Path}; use anyhow::{Context as _, Result}; @@ -145,7 +150,7 @@ impl BenchContext { &self.sequencer_client } - pub fn indexer_addr(&self) -> SocketAddr { + pub const fn indexer_addr(&self) -> SocketAddr { self.indexer_handle.addr() } diff --git a/tools/e2e_bench/src/harness.rs b/tools/e2e_bench/src/harness.rs index bcdff7a7..c83904fd 100644 --- a/tools/e2e_bench/src/harness.rs +++ b/tools/e2e_bench/src/harness.rs @@ -1,11 +1,16 @@ //! Step / scenario timing primitives shared across scenarios. +#![allow( + clippy::ref_option, + reason = "serde::serialize_with requires fn(&Option, S) -> Result<...>" +)] + use std::time::{Duration, Instant}; use anyhow::{Result, bail}; use common::transaction::NSSATransaction; use sequencer_service_rpc::RpcClient as _; -use serde::Serialize; +use serde::{Serialize, Serializer}; use wallet::cli::SubcommandReturnValue; use crate::bench_context::BenchContext; @@ -16,7 +21,7 @@ const TX_INCLUSION_TIMEOUT: Duration = Duration::from_secs(120); /// Borsh-serialized sizes for one zone block fetched after a step. `block_bytes` /// is the full Block (header + body + bedrock metadata) and is the closest /// proxy we have to the L1 payload posted per block. `tx_bytes` is each contained -/// transaction split by variant — this is what the fee model's S_tx slot covers. +/// transaction split by variant, which is what the fee model's `S_tx` slot covers. #[derive(Debug, Serialize, Clone, Default)] pub struct BlockSize { pub block_id: u64, @@ -29,22 +34,28 @@ pub struct BlockSize { #[derive(Debug, Serialize, Clone)] pub struct StepResult { pub label: String, - pub submit_ms: f64, - pub inclusion_ms: Option, - pub wallet_sync_ms: Option, - pub total_ms: f64, + #[serde(serialize_with = "ser_duration_secs", rename = "submit_s")] + pub submit: Duration, + #[serde(serialize_with = "ser_opt_duration_secs", rename = "inclusion_s")] + pub inclusion: Option, + #[serde(serialize_with = "ser_opt_duration_secs", rename = "wallet_sync_s")] + pub wallet_sync: Option, + #[serde(serialize_with = "ser_duration_secs", rename = "total_s")] + pub total: Duration, pub tx_hash: Option, /// Borsh sizes for every zone block produced during this step. - /// Empty for steps that don't advance the chain (e.g. RegisterAccount). + /// Empty for steps that don't advance the chain (e.g. `RegisterAccount`). pub blocks: Vec, } #[derive(Debug, Serialize, Default)] pub struct ScenarioResult { pub name: String, - pub setup_ms: f64, + #[serde(serialize_with = "ser_duration_secs", rename = "setup_s")] + pub setup: Duration, pub steps: Vec, - pub total_ms: f64, + #[serde(serialize_with = "ser_duration_secs", rename = "total_s")] + pub total: Duration, /// Disk sizes (sequencer / indexer / wallet tempdirs) sampled at scenario start. pub disk_before: Option, /// Disk sizes sampled at scenario end. @@ -53,7 +64,8 @@ pub struct ScenarioResult { /// reporting the sequencer tip as L1-finalised. Effectively measures the /// sequencer→Bedrock posting + Bedrock finalisation + indexer L1 ingest path. /// A value at the timeout (60s) means finalisation did not happen within the bench window. - pub bedrock_finality_ms: Option, + #[serde(serialize_with = "ser_opt_duration_secs", rename = "bedrock_finality_s")] + pub bedrock_finality: Option, } impl ScenarioResult { @@ -65,11 +77,18 @@ impl ScenarioResult { } pub fn push(&mut self, step: StepResult) { - self.total_ms += step.total_ms; + self.total = self.total.saturating_add(step.total); self.steps.push(step); } } +/// Begin a timed step. Capture this *before* submitting the wallet operation +/// so we can later subtract it from the post-submit block height to detect +/// when the chain has advanced past the tx's block. +pub async fn begin_step(ctx: &BenchContext) -> Result { + Ok(ctx.sequencer_client().get_last_block_id().await?) +} + /// Finish a timed wallet step. Records submit (the time between `started` /// being captured and `ret` being received) and, if `ret` is a /// [`SubcommandReturnValue::PrivacyPreservingTransfer`], polls the sequencer @@ -79,15 +98,8 @@ impl ScenarioResult { /// ```ignore /// let started = Instant::now(); /// let ret = wallet::cli::execute_subcommand(ctx.wallet_mut(), cmd).await?; -/// let step = finalize_step("label", started, ret, ctx).await?; +/// let step = finalize_step("label", started, pre_block_id, &ret, ctx).await?; /// ``` -/// Begin a timed step. Capture this *before* submitting the wallet operation -/// so we can later subtract it from the post-submit block height to detect -/// when the chain has advanced past the tx's block. -pub async fn begin_step(ctx: &BenchContext) -> Result { - Ok(ctx.sequencer_client().get_last_block_id().await?) -} - pub async fn finalize_step( label: impl Into, started: Instant, @@ -96,11 +108,11 @@ pub async fn finalize_step( ctx: &mut BenchContext, ) -> Result { let label = label.into(); - let submit_ms = started.elapsed().as_secs_f64() * 1_000.0; + let submit = started.elapsed(); let mut tx_hash_str = None; - let mut inclusion_ms = None; - let mut wallet_sync_ms = None; + let mut inclusion = None; + let mut wallet_sync = None; let mut blocks: Vec = Vec::new(); // For non-account-create steps (anything that produces a tx_hash, or even @@ -115,11 +127,11 @@ pub async fn finalize_step( } let started_inclusion = Instant::now(); wait_for_chain_advance(ctx, pre_block_id, 2).await?; - inclusion_ms = Some(started_inclusion.elapsed().as_secs_f64() * 1_000.0); + inclusion = Some(started_inclusion.elapsed()); let started_sync = Instant::now(); sync_wallet_to_tip(ctx).await?; - wallet_sync_ms = Some(started_sync.elapsed().as_secs_f64() * 1_000.0); + wallet_sync = Some(started_sync.elapsed()); // Capture block-byte and per-tx-byte sizes for every block produced // during this step. We intentionally capture all blocks, including @@ -151,10 +163,10 @@ pub async fn finalize_step( Ok(StepResult { label, - submit_ms, - inclusion_ms, - wallet_sync_ms, - total_ms: started.elapsed().as_secs_f64() * 1_000.0, + submit, + inclusion, + wallet_sync, + total: started.elapsed(), tx_hash: tx_hash_str, blocks, }) @@ -167,19 +179,21 @@ pub async fn wait_for_chain_advance( min_blocks: u64, ) -> Result<()> { let target = from_block_id.saturating_add(min_blocks); - let deadline = Instant::now() + TX_INCLUSION_TIMEOUT; - loop { - match ctx.sequencer_client().get_last_block_id().await { - Ok(current) if current >= target => return Ok(()), - Ok(_) => {} - Err(err) => eprintln!("get_last_block_id error (continuing poll): {err:#}"), + let poll = async { + loop { + match ctx.sequencer_client().get_last_block_id().await { + Ok(current) if current >= target => return, + Ok(_) => {} + Err(err) => eprintln!("get_last_block_id error (continuing poll): {err:#}"), + } + tokio::time::sleep(TX_INCLUSION_POLL_INTERVAL).await; } - if Instant::now() > deadline { - bail!( - "chain did not advance from {from_block_id} to at least {target} within {TX_INCLUSION_TIMEOUT:?}" - ); - } - tokio::time::sleep(TX_INCLUSION_POLL_INTERVAL).await; + }; + match tokio::time::timeout(TX_INCLUSION_TIMEOUT, poll).await { + Ok(()) => Ok(()), + Err(_) => bail!( + "chain did not advance from {from_block_id} to at least {target} within {TX_INCLUSION_TIMEOUT:?}" + ), } } @@ -199,38 +213,35 @@ pub fn print_table(result: &ScenarioResult) { .max("step".len()); println!( - "\nScenario: {} (setup {:.1} ms ({:.2}s), total {:.1} ms ({:.2}s))", + "\nScenario: {} (setup {:.2}s, total {:.2}s)", result.name, - result.setup_ms, - result.setup_ms / 1_000.0, - result.total_ms, - result.total_ms / 1_000.0, + result.setup.as_secs_f64(), + result.total.as_secs_f64(), ); println!( - "{:10} {:>12} {:>10} {:>16}", + "{:10} {:>12} {:>10} {:>10}", "step", - "submit_ms", - "inclusion_ms", - "sync_ms", - "total_ms (s)", + "submit_s", + "inclusion_s", + "sync_s", + "total_s", lw = label_width, ); - println!("{}", "-".repeat(label_width + 62)); + println!("{}", "-".repeat(label_width.saturating_add(50))); for s in &result.steps { let inclusion = s - .inclusion_ms - .map_or_else(|| "-".to_owned(), |v| format!("{v:.1}")); + .inclusion + .map_or_else(|| "-".to_owned(), |v| format!("{:.3}", v.as_secs_f64())); let sync = s - .wallet_sync_ms - .map_or_else(|| "-".to_owned(), |v| format!("{v:.1}")); - let total = format!("{:.1} ({:.2}s)", s.total_ms, s.total_ms / 1_000.0); + .wallet_sync + .map_or_else(|| "-".to_owned(), |v| format!("{:.3}", v.as_secs_f64())); println!( - "{:10.1} {:>12} {:>10} {:>16}", + "{:10.3} {:>12} {:>10} {:>10.3}", s.label, - s.submit_ms, + s.submit.as_secs_f64(), inclusion, sync, - total, + s.total.as_secs_f64(), lw = label_width, ); } @@ -295,3 +306,17 @@ fn print_tx_line(label: &str, samples: &[usize]) { fn mean_usize(xs: &[usize]) -> usize { xs.iter().sum::().checked_div(xs.len()).unwrap_or(0) } + +fn ser_duration_secs(d: &Duration, s: S) -> std::result::Result { + s.serialize_f64(d.as_secs_f64()) +} + +fn ser_opt_duration_secs( + d: &Option, + s: S, +) -> std::result::Result { + match d { + Some(d) => s.serialize_f64(d.as_secs_f64()), + None => s.serialize_none(), + } +} diff --git a/tools/e2e_bench/src/main.rs b/tools/e2e_bench/src/main.rs index 31ea1189..80f547ab 100644 --- a/tools/e2e_bench/src/main.rs +++ b/tools/e2e_bench/src/main.rs @@ -7,34 +7,25 @@ //! block + tx sizes per scenario. //! //! Required env vars (no defaults; see `tools/e2e_bench/README.md`): -//! LEZ_BEDROCK_BIN absolute path to logos-blockchain-node. -//! LEZ_BEDROCK_CONFIG_DIR directory with node-config.yaml + deployment template. +//! `LEZ_BEDROCK_BIN` absolute path to logos-blockchain-node. +//! `LEZ_BEDROCK_CONFIG_DIR` directory with node-config.yaml + deployment template. //! //! Run examples: -//! RISC0_DEV_MODE=1 cargo run --release -p e2e_bench -- --scenario all. -//! cargo run --release -p e2e_bench -- --scenario amm. +//! `RISC0_DEV_MODE=1` `cargo run --release -p e2e_bench -- --scenario all`. +//! `cargo run --release -p e2e_bench -- --scenario amm`. //! //! `RISC0_DEV_MODE=1` skips proving and produces latency-only numbers in //! ~minutes; omitting it produces realistic proving-inclusive numbers but //! the run takes much longer. -#![expect( - clippy::arbitrary_source_item_ordering, +#![allow( clippy::arithmetic_side_effects, - clippy::as_conversions, - clippy::doc_markdown, - clippy::float_arithmetic, - clippy::let_underscore_must_use, - clippy::let_underscore_untyped, - clippy::missing_const_for_fn, clippy::print_stderr, clippy::print_stdout, - clippy::single_call_fn, - clippy::single_match_else, - clippy::std_instead_of_core, - clippy::too_many_lines, clippy::wildcard_enum_match_arm, - reason = "Bench tool: matches test-style fixture code" + reason = "Bench tool: stderr/stdout output is the deliverable; small Duration / iterator-sum \ + arithmetic is safe at bench scale; bench scenarios bail loudly on any unexpected \ + return variant, which is preferable to maintaining an exhaustive list in five files." )] use std::{path::PathBuf, time::Duration}; @@ -68,7 +59,7 @@ struct Cli { #[arg(long, value_enum, default_value_t = ScenarioName::All)] scenario: ScenarioName, - /// Optional JSON output path. Defaults to /target/e2e_bench.json. + /// Optional JSON output path. Defaults to `/target/e2e_bench.json`. #[arg(long)] json_out: Option, } @@ -77,7 +68,7 @@ struct Cli { struct BenchRunReport { risc0_dev_mode: bool, scenarios: Vec, - total_wall_seconds: f64, + total_wall_s: f64, } #[tokio::main(flavor = "multi_thread")] @@ -110,61 +101,61 @@ async fn main() -> Result<()> { for name in to_run { eprintln!("\n=== running scenario: {name:?} ==="); - let setup_started = std::time::Instant::now(); - // Spawn a fresh Bedrock node for this scenario. Each scenario therefore - // starts with an empty chain so the indexer never has a backlog from a - // prior scenario. - let bedrock = BedrockHandle::launch_fresh() - .await - .with_context(|| format!("failed to spawn Bedrock for scenario {name:?}"))?; - let bedrock_addr_string = format!("{}", bedrock.addr()); - // Safety: we restore the previous LEZ_BEDROCK_ADDR value (if any) at scenario teardown. - // SAFETY: this happens before any threaded setup that reads env. - unsafe { - std::env::set_var("LEZ_BEDROCK_ADDR", &bedrock_addr_string); + { + let setup_started = std::time::Instant::now(); + // Spawn a fresh Bedrock node for this scenario. Each scenario therefore + // starts with an empty chain so the indexer never has a backlog from a + // prior scenario. + let bedrock = BedrockHandle::launch_fresh() + .await + .with_context(|| format!("failed to spawn Bedrock for scenario {name:?}"))?; + let bedrock_addr_string = format!("{}", bedrock.addr()); + // SAFETY: env::set_var happens before any threaded setup that reads env. + unsafe { + std::env::set_var("LEZ_BEDROCK_ADDR", &bedrock_addr_string); + } + + let mut ctx = BenchContext::new() + .await + .with_context(|| format!("failed to setup BenchContext for scenario {name:?}"))?; + let setup = setup_started.elapsed(); + eprintln!("setup: {:.2}s", setup.as_secs_f64()); + + let disk_before = ctx.disk_sizes(); + let mut result = run_scenario(name, setup, &mut ctx).await?; + result.disk_before = Some(disk_before); + result.disk_after = Some(ctx.disk_sizes()); + result.bedrock_finality = Some(measure_bedrock_finality(&ctx).await?); + harness::print_table(&result); + all_results.push(result); + + // ctx and bedrock drop here at end of scope, killing the bedrock child + // before we sleep so the next iteration can rebind the port. } - - let mut ctx = BenchContext::new() - .await - .with_context(|| format!("failed to setup BenchContext for scenario {name:?}"))?; - let setup_ms = elapsed_ms(setup_started); - eprintln!("setup: {setup_ms:.1} ms"); - - let disk_before = ctx.disk_sizes(); - let mut result = run_scenario(name, setup_ms, &mut ctx).await?; - result.disk_before = Some(disk_before); - result.disk_after = Some(ctx.disk_sizes()); - result.bedrock_finality_ms = Some(measure_bedrock_finality(&ctx).await?); - harness::print_table(&result); - all_results.push(result); - - drop(ctx); - drop(bedrock); // Give Bedrock a moment to shut down before the next scenario. tokio::time::sleep(Duration::from_secs(2)).await; } - let total_wall_seconds = overall_started.elapsed().as_secs_f64(); - eprintln!("\nTotal wall time: {total_wall_seconds:.1}s"); + let total_wall_s = overall_started.elapsed().as_secs_f64(); + eprintln!("\nTotal wall time: {total_wall_s:.1}s"); let report = BenchRunReport { risc0_dev_mode, scenarios: all_results, - total_wall_seconds, + total_wall_s, }; - let out_path = match cli.json_out { - Some(p) => p, - None => { - let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .join("..") - .join("..") - .canonicalize()?; - let suffix = if risc0_dev_mode { "dev" } else { "prove" }; - workspace_root - .join("target") - .join(format!("e2e_bench_{suffix}.json")) - } + let out_path = if let Some(p) = cli.json_out { + p + } else { + let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("..") + .join("..") + .canonicalize()?; + let suffix = if risc0_dev_mode { "dev" } else { "prove" }; + workspace_root + .join("target") + .join(format!("e2e_bench_{suffix}.json")) }; if let Some(parent) = out_path.parent() { std::fs::create_dir_all(parent)?; @@ -177,7 +168,7 @@ async fn main() -> Result<()> { async fn run_scenario( name: ScenarioName, - setup_ms: f64, + setup: Duration, ctx: &mut BenchContext, ) -> Result { let result = match name { @@ -188,17 +179,13 @@ async fn run_scenario( ScenarioName::Parallel => scenarios::parallel::run(ctx).await?, ScenarioName::All => unreachable!("dispatched above"), }; - Ok(ScenarioResult { setup_ms, ..result }) -} - -fn elapsed_ms(t: std::time::Instant) -> f64 { - t.elapsed().as_secs_f64() * 1_000.0 + Ok(ScenarioResult { setup, ..result }) } /// Poll the indexer's L1-finalised block id until it catches up with the /// sequencer's last block id. This is effectively the sequencer→Bedrock posting /// plus Bedrock finalisation plus indexer ingest latency. -async fn measure_bedrock_finality(ctx: &BenchContext) -> Result { +async fn measure_bedrock_finality(ctx: &BenchContext) -> Result { use indexer_service_rpc::RpcClient as _; use jsonrpsee::ws_client::WsClientBuilder; use sequencer_service_rpc::RpcClient as _; @@ -210,20 +197,20 @@ async fn measure_bedrock_finality(ctx: &BenchContext) -> Result { .context("connect indexer WS")?; let sequencer_tip = ctx.sequencer_client().get_last_block_id().await?; + let timeout = Duration::from_secs(60); let started = std::time::Instant::now(); - let deadline = started + Duration::from_secs(60); - loop { - match indexer_ws.get_last_finalized_block_id().await { - Ok(Some(b)) if b >= sequencer_tip => { - return Ok(started.elapsed().as_secs_f64() * 1_000.0); + let poll = async { + loop { + match indexer_ws.get_last_finalized_block_id().await { + Ok(Some(b)) if b >= sequencer_tip => return, + Ok(_) => {} + Err(err) => eprintln!("indexer last_synced poll error: {err:#}"), } - Ok(_) => {} - Err(err) => eprintln!("indexer last_synced poll error: {err:#}"), + tokio::time::sleep(Duration::from_millis(200)).await; } - if std::time::Instant::now() > deadline { - eprintln!("indexer did not catch up to {sequencer_tip} within 60s"); - return Ok(started.elapsed().as_secs_f64() * 1_000.0); - } - tokio::time::sleep(Duration::from_millis(200)).await; + }; + if tokio::time::timeout(timeout, poll).await.is_err() { + eprintln!("indexer did not catch up to {sequencer_tip} within {timeout:?}"); } + Ok(started.elapsed()) } diff --git a/tools/e2e_bench/src/scenarios/parallel.rs b/tools/e2e_bench/src/scenarios/parallel.rs index 43bd25ac..23dd2247 100644 --- a/tools/e2e_bench/src/scenarios/parallel.rs +++ b/tools/e2e_bench/src/scenarios/parallel.rs @@ -42,7 +42,10 @@ pub async fn run(ctx: &mut BenchContext) -> Result { } // Mint full supply into master. - let total_mint: u128 = (PARALLEL_FANOUT_N as u128) * AMOUNT_PER_TRANSFER * 10; + let total_mint = u128::try_from(PARALLEL_FANOUT_N) + .expect("usize fits u128") + .saturating_mul(AMOUNT_PER_TRANSFER) + .saturating_mul(10); { let pre_block = crate::harness::begin_step(ctx).await?; let started = Instant::now(); @@ -104,21 +107,21 @@ pub async fn run(ctx: &mut BenchContext) -> Result { .await?; } let all_submitted_at = Instant::now(); - let submit_duration_ms = (all_submitted_at - burst_started).as_secs_f64() * 1_000.0; + let submit_duration = all_submitted_at.saturating_duration_since(burst_started); // Wait for the chain to advance by at least 2 blocks past pre_block_burst. // That guarantees the block holding our burst is sealed and applied. crate::harness::wait_for_chain_advance(ctx, pre_block_burst, 2).await?; let inclusion_done_at = Instant::now(); - let inclusion_after_submit_ms = (inclusion_done_at - all_submitted_at).as_secs_f64() * 1_000.0; - let burst_total_ms = (inclusion_done_at - burst_started).as_secs_f64() * 1_000.0; + let inclusion_after_submit = inclusion_done_at.saturating_duration_since(all_submitted_at); + let burst_total = inclusion_done_at.saturating_duration_since(burst_started); eprintln!( - "parallel_fanout: submitted {} txs in {:.1} ms, inclusion in {:.1} ms, total {:.1} ms", + "parallel_fanout: submitted {} txs in {:.3}s, inclusion in {:.3}s, total {:.3}s", senders.len(), - submit_duration_ms, - inclusion_after_submit_ms, - burst_total_ms, + submit_duration.as_secs_f64(), + inclusion_after_submit.as_secs_f64(), + burst_total.as_secs_f64(), ); // Capture every block produced during the burst window. This is the @@ -149,13 +152,13 @@ pub async fn run(ctx: &mut BenchContext) -> Result { } // Synthesise a single summary "step" for the burst. Use the submit time - // for `submit_ms` and the inclusion-wait time for `inclusion_ms`. + // for `submit` and the inclusion-wait time for `inclusion`. let burst_step = StepResult { label: format!("burst_{}_transfers", senders.len()), - submit_ms: submit_duration_ms, - inclusion_ms: Some(inclusion_after_submit_ms), - wallet_sync_ms: None, - total_ms: burst_total_ms, + submit: submit_duration, + inclusion: Some(inclusion_after_submit), + wallet_sync: None, + total: burst_total, tx_hash: None, blocks, };