mirror of
https://github.com/logos-blockchain/lssa.git
synced 2026-05-22 01:30:00 +00:00
refactor(e2e_bench)!: Duration-typed timings, seconds-float JSON, tokio::timeout
BREAKING CHANGE: bench JSON renames per-step / per-scenario timing fields from *_ms (float milliseconds) to *_s (float seconds). Renames: submit_ms → submit_s, inclusion_ms → inclusion_s, wallet_sync_ms → wallet_sync_s, total_ms → total_s, setup_ms → setup_s, bedrock_finality_ms → bedrock_finality_s, total_wall_seconds → total_wall_s. measure_bedrock_finality timeout floor also shifts slightly: on timeout the field is now ~60.000s rather than "first poll tick past 60s".
This commit is contained in:
parent
c3daa9897d
commit
619db3846d
4
Cargo.lock
generated
4
Cargo.lock
generated
@ -2382,7 +2382,6 @@ checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555"
|
||||
name = "e2e_bench"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"amm_core",
|
||||
"anyhow",
|
||||
"borsh",
|
||||
"chrono",
|
||||
@ -2392,15 +2391,12 @@ dependencies = [
|
||||
"indexer_service_rpc",
|
||||
"integration_tests",
|
||||
"jsonrpsee",
|
||||
"log",
|
||||
"nssa",
|
||||
"nssa_core",
|
||||
"sequencer_service",
|
||||
"sequencer_service_rpc",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tempfile",
|
||||
"token_core",
|
||||
"tokio",
|
||||
"wallet",
|
||||
]
|
||||
|
||||
@ -9,21 +9,21 @@ publish = false
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
common.workspace = true
|
||||
indexer_service.workspace = true
|
||||
indexer_service_rpc = { workspace = true, features = ["client"] }
|
||||
integration_tests.workspace = true
|
||||
wallet.workspace = true
|
||||
nssa.workspace = true
|
||||
sequencer_service.workspace = true
|
||||
sequencer_service_rpc = { workspace = true, features = ["client"] }
|
||||
indexer_service.workspace = true
|
||||
indexer_service_rpc = { workspace = true, features = ["client"] }
|
||||
jsonrpsee = { workspace = true, features = ["ws-client"] }
|
||||
common.workspace = true
|
||||
tempfile.workspace = true
|
||||
borsh.workspace = true
|
||||
chrono.workspace = true
|
||||
wallet.workspace = true
|
||||
|
||||
anyhow.workspace = true
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time"] }
|
||||
borsh.workspace = true
|
||||
chrono.workspace = true
|
||||
clap.workspace = true
|
||||
jsonrpsee = { workspace = true, features = ["ws-client"] }
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tempfile.workspace = true
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time"] }
|
||||
|
||||
@ -26,7 +26,7 @@ Scenarios: `token`, `amm`, `fanout`, `private`, `parallel`, `all`.
|
||||
|
||||
## What you'll see
|
||||
|
||||
Per scenario: a step table (`submit_ms`, `inclusion_ms`, `sync_ms`, `total_ms`) and a size summary covering every block captured during the scenario (block_bytes total/mean/min/max; per-tx-variant sizes for public, PPE, and program-deployment transactions).
|
||||
Per scenario: a step table (`submit_s`, `inclusion_s`, `sync_s`, `total_s`) and a size summary covering every block captured during the scenario (block_bytes total/mean/min/max; per-tx-variant sizes for public, PPE, and program-deployment transactions).
|
||||
|
||||
The fanout, parallel, and private scenarios are the most representative for L1-payload-size measurements since they put multiple txs per block.
|
||||
|
||||
|
||||
@ -2,14 +2,19 @@
|
||||
//! Launches a fresh Bedrock instance per scenario so the indexer never has to
|
||||
//! catch up a large finalization backlog.
|
||||
//!
|
||||
//! Required env vars (no defaults — path layouts differ per developer):
|
||||
//! - `LEZ_BEDROCK_BIN` — absolute path to the `logos-blockchain-node` binary.
|
||||
//! - `LEZ_BEDROCK_CONFIG_DIR` — directory containing `node-config.yaml` and
|
||||
//! Required env vars (no defaults, path layouts differ per developer):
|
||||
//! - `LEZ_BEDROCK_BIN` absolute path to the `logos-blockchain-node` binary.
|
||||
//! - `LEZ_BEDROCK_CONFIG_DIR` directory containing `node-config.yaml` and
|
||||
//! `deployment-settings.yaml` (template with `PLACEHOLDER_CHAIN_START_TIME`).
|
||||
//!
|
||||
//! Optional:
|
||||
//! - `LEZ_BEDROCK_PORT` (default: 18080)
|
||||
|
||||
#![allow(
|
||||
clippy::let_underscore_must_use,
|
||||
reason = "file is deleted in the docker-compose pivot; teardown ignores child kill/wait results by design"
|
||||
)]
|
||||
|
||||
use std::{
|
||||
env,
|
||||
net::SocketAddr,
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
//! BenchContext: wires sequencer + indexer + wallet in-process against an
|
||||
//! `BenchContext`: wires sequencer + indexer + wallet in-process against an
|
||||
//! externally-running Bedrock node. Mirrors the surface of
|
||||
//! `integration_tests::TestContext` for the methods the scenarios need
|
||||
//! (`wallet_mut()`, `sequencer_client()`), but skips the docker setup.
|
||||
@ -6,6 +6,11 @@
|
||||
//! The external Bedrock URL defaults to 127.0.0.1:18080 and can be overridden
|
||||
//! with the `LEZ_BEDROCK_ADDR` env var.
|
||||
|
||||
#![allow(
|
||||
clippy::arbitrary_source_item_ordering,
|
||||
reason = "file is deleted in the docker-compose pivot; ordering churn is wasted work"
|
||||
)]
|
||||
|
||||
use std::{env, net::SocketAddr, path::Path};
|
||||
|
||||
use anyhow::{Context as _, Result};
|
||||
@ -145,7 +150,7 @@ impl BenchContext {
|
||||
&self.sequencer_client
|
||||
}
|
||||
|
||||
pub fn indexer_addr(&self) -> SocketAddr {
|
||||
pub const fn indexer_addr(&self) -> SocketAddr {
|
||||
self.indexer_handle.addr()
|
||||
}
|
||||
|
||||
|
||||
@ -1,11 +1,16 @@
|
||||
//! Step / scenario timing primitives shared across scenarios.
|
||||
|
||||
#![allow(
|
||||
clippy::ref_option,
|
||||
reason = "serde::serialize_with requires fn(&Option<T>, S) -> Result<...>"
|
||||
)]
|
||||
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use anyhow::{Result, bail};
|
||||
use common::transaction::NSSATransaction;
|
||||
use sequencer_service_rpc::RpcClient as _;
|
||||
use serde::Serialize;
|
||||
use serde::{Serialize, Serializer};
|
||||
use wallet::cli::SubcommandReturnValue;
|
||||
|
||||
use crate::bench_context::BenchContext;
|
||||
@ -16,7 +21,7 @@ const TX_INCLUSION_TIMEOUT: Duration = Duration::from_secs(120);
|
||||
/// Borsh-serialized sizes for one zone block fetched after a step. `block_bytes`
|
||||
/// is the full Block (header + body + bedrock metadata) and is the closest
|
||||
/// proxy we have to the L1 payload posted per block. `tx_bytes` is each contained
|
||||
/// transaction split by variant — this is what the fee model's S_tx slot covers.
|
||||
/// transaction split by variant, which is what the fee model's `S_tx` slot covers.
|
||||
#[derive(Debug, Serialize, Clone, Default)]
|
||||
pub struct BlockSize {
|
||||
pub block_id: u64,
|
||||
@ -29,22 +34,28 @@ pub struct BlockSize {
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct StepResult {
|
||||
pub label: String,
|
||||
pub submit_ms: f64,
|
||||
pub inclusion_ms: Option<f64>,
|
||||
pub wallet_sync_ms: Option<f64>,
|
||||
pub total_ms: f64,
|
||||
#[serde(serialize_with = "ser_duration_secs", rename = "submit_s")]
|
||||
pub submit: Duration,
|
||||
#[serde(serialize_with = "ser_opt_duration_secs", rename = "inclusion_s")]
|
||||
pub inclusion: Option<Duration>,
|
||||
#[serde(serialize_with = "ser_opt_duration_secs", rename = "wallet_sync_s")]
|
||||
pub wallet_sync: Option<Duration>,
|
||||
#[serde(serialize_with = "ser_duration_secs", rename = "total_s")]
|
||||
pub total: Duration,
|
||||
pub tx_hash: Option<String>,
|
||||
/// Borsh sizes for every zone block produced during this step.
|
||||
/// Empty for steps that don't advance the chain (e.g. RegisterAccount).
|
||||
/// Empty for steps that don't advance the chain (e.g. `RegisterAccount`).
|
||||
pub blocks: Vec<BlockSize>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Default)]
|
||||
pub struct ScenarioResult {
|
||||
pub name: String,
|
||||
pub setup_ms: f64,
|
||||
#[serde(serialize_with = "ser_duration_secs", rename = "setup_s")]
|
||||
pub setup: Duration,
|
||||
pub steps: Vec<StepResult>,
|
||||
pub total_ms: f64,
|
||||
#[serde(serialize_with = "ser_duration_secs", rename = "total_s")]
|
||||
pub total: Duration,
|
||||
/// Disk sizes (sequencer / indexer / wallet tempdirs) sampled at scenario start.
|
||||
pub disk_before: Option<crate::bench_context::DiskSizes>,
|
||||
/// Disk sizes sampled at scenario end.
|
||||
@ -53,7 +64,8 @@ pub struct ScenarioResult {
|
||||
/// reporting the sequencer tip as L1-finalised. Effectively measures the
|
||||
/// sequencer→Bedrock posting + Bedrock finalisation + indexer L1 ingest path.
|
||||
/// A value at the timeout (60s) means finalisation did not happen within the bench window.
|
||||
pub bedrock_finality_ms: Option<f64>,
|
||||
#[serde(serialize_with = "ser_opt_duration_secs", rename = "bedrock_finality_s")]
|
||||
pub bedrock_finality: Option<Duration>,
|
||||
}
|
||||
|
||||
impl ScenarioResult {
|
||||
@ -65,11 +77,18 @@ impl ScenarioResult {
|
||||
}
|
||||
|
||||
pub fn push(&mut self, step: StepResult) {
|
||||
self.total_ms += step.total_ms;
|
||||
self.total = self.total.saturating_add(step.total);
|
||||
self.steps.push(step);
|
||||
}
|
||||
}
|
||||
|
||||
/// Begin a timed step. Capture this *before* submitting the wallet operation
|
||||
/// so we can later subtract it from the post-submit block height to detect
|
||||
/// when the chain has advanced past the tx's block.
|
||||
pub async fn begin_step(ctx: &BenchContext) -> Result<u64> {
|
||||
Ok(ctx.sequencer_client().get_last_block_id().await?)
|
||||
}
|
||||
|
||||
/// Finish a timed wallet step. Records submit (the time between `started`
|
||||
/// being captured and `ret` being received) and, if `ret` is a
|
||||
/// [`SubcommandReturnValue::PrivacyPreservingTransfer`], polls the sequencer
|
||||
@ -79,15 +98,8 @@ impl ScenarioResult {
|
||||
/// ```ignore
|
||||
/// let started = Instant::now();
|
||||
/// let ret = wallet::cli::execute_subcommand(ctx.wallet_mut(), cmd).await?;
|
||||
/// let step = finalize_step("label", started, ret, ctx).await?;
|
||||
/// let step = finalize_step("label", started, pre_block_id, &ret, ctx).await?;
|
||||
/// ```
|
||||
/// Begin a timed step. Capture this *before* submitting the wallet operation
|
||||
/// so we can later subtract it from the post-submit block height to detect
|
||||
/// when the chain has advanced past the tx's block.
|
||||
pub async fn begin_step(ctx: &BenchContext) -> Result<u64> {
|
||||
Ok(ctx.sequencer_client().get_last_block_id().await?)
|
||||
}
|
||||
|
||||
pub async fn finalize_step(
|
||||
label: impl Into<String>,
|
||||
started: Instant,
|
||||
@ -96,11 +108,11 @@ pub async fn finalize_step(
|
||||
ctx: &mut BenchContext,
|
||||
) -> Result<StepResult> {
|
||||
let label = label.into();
|
||||
let submit_ms = started.elapsed().as_secs_f64() * 1_000.0;
|
||||
let submit = started.elapsed();
|
||||
|
||||
let mut tx_hash_str = None;
|
||||
let mut inclusion_ms = None;
|
||||
let mut wallet_sync_ms = None;
|
||||
let mut inclusion = None;
|
||||
let mut wallet_sync = None;
|
||||
let mut blocks: Vec<BlockSize> = Vec::new();
|
||||
|
||||
// For non-account-create steps (anything that produces a tx_hash, or even
|
||||
@ -115,11 +127,11 @@ pub async fn finalize_step(
|
||||
}
|
||||
let started_inclusion = Instant::now();
|
||||
wait_for_chain_advance(ctx, pre_block_id, 2).await?;
|
||||
inclusion_ms = Some(started_inclusion.elapsed().as_secs_f64() * 1_000.0);
|
||||
inclusion = Some(started_inclusion.elapsed());
|
||||
|
||||
let started_sync = Instant::now();
|
||||
sync_wallet_to_tip(ctx).await?;
|
||||
wallet_sync_ms = Some(started_sync.elapsed().as_secs_f64() * 1_000.0);
|
||||
wallet_sync = Some(started_sync.elapsed());
|
||||
|
||||
// Capture block-byte and per-tx-byte sizes for every block produced
|
||||
// during this step. We intentionally capture all blocks, including
|
||||
@ -151,10 +163,10 @@ pub async fn finalize_step(
|
||||
|
||||
Ok(StepResult {
|
||||
label,
|
||||
submit_ms,
|
||||
inclusion_ms,
|
||||
wallet_sync_ms,
|
||||
total_ms: started.elapsed().as_secs_f64() * 1_000.0,
|
||||
submit,
|
||||
inclusion,
|
||||
wallet_sync,
|
||||
total: started.elapsed(),
|
||||
tx_hash: tx_hash_str,
|
||||
blocks,
|
||||
})
|
||||
@ -167,19 +179,21 @@ pub async fn wait_for_chain_advance(
|
||||
min_blocks: u64,
|
||||
) -> Result<()> {
|
||||
let target = from_block_id.saturating_add(min_blocks);
|
||||
let deadline = Instant::now() + TX_INCLUSION_TIMEOUT;
|
||||
loop {
|
||||
match ctx.sequencer_client().get_last_block_id().await {
|
||||
Ok(current) if current >= target => return Ok(()),
|
||||
Ok(_) => {}
|
||||
Err(err) => eprintln!("get_last_block_id error (continuing poll): {err:#}"),
|
||||
let poll = async {
|
||||
loop {
|
||||
match ctx.sequencer_client().get_last_block_id().await {
|
||||
Ok(current) if current >= target => return,
|
||||
Ok(_) => {}
|
||||
Err(err) => eprintln!("get_last_block_id error (continuing poll): {err:#}"),
|
||||
}
|
||||
tokio::time::sleep(TX_INCLUSION_POLL_INTERVAL).await;
|
||||
}
|
||||
if Instant::now() > deadline {
|
||||
bail!(
|
||||
"chain did not advance from {from_block_id} to at least {target} within {TX_INCLUSION_TIMEOUT:?}"
|
||||
);
|
||||
}
|
||||
tokio::time::sleep(TX_INCLUSION_POLL_INTERVAL).await;
|
||||
};
|
||||
match tokio::time::timeout(TX_INCLUSION_TIMEOUT, poll).await {
|
||||
Ok(()) => Ok(()),
|
||||
Err(_) => bail!(
|
||||
"chain did not advance from {from_block_id} to at least {target} within {TX_INCLUSION_TIMEOUT:?}"
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
@ -199,38 +213,35 @@ pub fn print_table(result: &ScenarioResult) {
|
||||
.max("step".len());
|
||||
|
||||
println!(
|
||||
"\nScenario: {} (setup {:.1} ms ({:.2}s), total {:.1} ms ({:.2}s))",
|
||||
"\nScenario: {} (setup {:.2}s, total {:.2}s)",
|
||||
result.name,
|
||||
result.setup_ms,
|
||||
result.setup_ms / 1_000.0,
|
||||
result.total_ms,
|
||||
result.total_ms / 1_000.0,
|
||||
result.setup.as_secs_f64(),
|
||||
result.total.as_secs_f64(),
|
||||
);
|
||||
println!(
|
||||
"{:<lw$} {:>10} {:>12} {:>10} {:>16}",
|
||||
"{:<lw$} {:>10} {:>12} {:>10} {:>10}",
|
||||
"step",
|
||||
"submit_ms",
|
||||
"inclusion_ms",
|
||||
"sync_ms",
|
||||
"total_ms (s)",
|
||||
"submit_s",
|
||||
"inclusion_s",
|
||||
"sync_s",
|
||||
"total_s",
|
||||
lw = label_width,
|
||||
);
|
||||
println!("{}", "-".repeat(label_width + 62));
|
||||
println!("{}", "-".repeat(label_width.saturating_add(50)));
|
||||
for s in &result.steps {
|
||||
let inclusion = s
|
||||
.inclusion_ms
|
||||
.map_or_else(|| "-".to_owned(), |v| format!("{v:.1}"));
|
||||
.inclusion
|
||||
.map_or_else(|| "-".to_owned(), |v| format!("{:.3}", v.as_secs_f64()));
|
||||
let sync = s
|
||||
.wallet_sync_ms
|
||||
.map_or_else(|| "-".to_owned(), |v| format!("{v:.1}"));
|
||||
let total = format!("{:.1} ({:.2}s)", s.total_ms, s.total_ms / 1_000.0);
|
||||
.wallet_sync
|
||||
.map_or_else(|| "-".to_owned(), |v| format!("{:.3}", v.as_secs_f64()));
|
||||
println!(
|
||||
"{:<lw$} {:>10.1} {:>12} {:>10} {:>16}",
|
||||
"{:<lw$} {:>10.3} {:>12} {:>10} {:>10.3}",
|
||||
s.label,
|
||||
s.submit_ms,
|
||||
s.submit.as_secs_f64(),
|
||||
inclusion,
|
||||
sync,
|
||||
total,
|
||||
s.total.as_secs_f64(),
|
||||
lw = label_width,
|
||||
);
|
||||
}
|
||||
@ -295,3 +306,17 @@ fn print_tx_line(label: &str, samples: &[usize]) {
|
||||
fn mean_usize(xs: &[usize]) -> usize {
|
||||
xs.iter().sum::<usize>().checked_div(xs.len()).unwrap_or(0)
|
||||
}
|
||||
|
||||
fn ser_duration_secs<S: Serializer>(d: &Duration, s: S) -> std::result::Result<S::Ok, S::Error> {
|
||||
s.serialize_f64(d.as_secs_f64())
|
||||
}
|
||||
|
||||
fn ser_opt_duration_secs<S: Serializer>(
|
||||
d: &Option<Duration>,
|
||||
s: S,
|
||||
) -> std::result::Result<S::Ok, S::Error> {
|
||||
match d {
|
||||
Some(d) => s.serialize_f64(d.as_secs_f64()),
|
||||
None => s.serialize_none(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -7,34 +7,25 @@
|
||||
//! block + tx sizes per scenario.
|
||||
//!
|
||||
//! Required env vars (no defaults; see `tools/e2e_bench/README.md`):
|
||||
//! LEZ_BEDROCK_BIN absolute path to logos-blockchain-node.
|
||||
//! LEZ_BEDROCK_CONFIG_DIR directory with node-config.yaml + deployment template.
|
||||
//! `LEZ_BEDROCK_BIN` absolute path to logos-blockchain-node.
|
||||
//! `LEZ_BEDROCK_CONFIG_DIR` directory with node-config.yaml + deployment template.
|
||||
//!
|
||||
//! Run examples:
|
||||
//! RISC0_DEV_MODE=1 cargo run --release -p e2e_bench -- --scenario all.
|
||||
//! cargo run --release -p e2e_bench -- --scenario amm.
|
||||
//! `RISC0_DEV_MODE=1` `cargo run --release -p e2e_bench -- --scenario all`.
|
||||
//! `cargo run --release -p e2e_bench -- --scenario amm`.
|
||||
//!
|
||||
//! `RISC0_DEV_MODE=1` skips proving and produces latency-only numbers in
|
||||
//! ~minutes; omitting it produces realistic proving-inclusive numbers but
|
||||
//! the run takes much longer.
|
||||
|
||||
#![expect(
|
||||
clippy::arbitrary_source_item_ordering,
|
||||
#![allow(
|
||||
clippy::arithmetic_side_effects,
|
||||
clippy::as_conversions,
|
||||
clippy::doc_markdown,
|
||||
clippy::float_arithmetic,
|
||||
clippy::let_underscore_must_use,
|
||||
clippy::let_underscore_untyped,
|
||||
clippy::missing_const_for_fn,
|
||||
clippy::print_stderr,
|
||||
clippy::print_stdout,
|
||||
clippy::single_call_fn,
|
||||
clippy::single_match_else,
|
||||
clippy::std_instead_of_core,
|
||||
clippy::too_many_lines,
|
||||
clippy::wildcard_enum_match_arm,
|
||||
reason = "Bench tool: matches test-style fixture code"
|
||||
reason = "Bench tool: stderr/stdout output is the deliverable; small Duration / iterator-sum \
|
||||
arithmetic is safe at bench scale; bench scenarios bail loudly on any unexpected \
|
||||
return variant, which is preferable to maintaining an exhaustive list in five files."
|
||||
)]
|
||||
|
||||
use std::{path::PathBuf, time::Duration};
|
||||
@ -68,7 +59,7 @@ struct Cli {
|
||||
#[arg(long, value_enum, default_value_t = ScenarioName::All)]
|
||||
scenario: ScenarioName,
|
||||
|
||||
/// Optional JSON output path. Defaults to <workspace>/target/e2e_bench.json.
|
||||
/// Optional JSON output path. Defaults to `<workspace>/target/e2e_bench.json`.
|
||||
#[arg(long)]
|
||||
json_out: Option<PathBuf>,
|
||||
}
|
||||
@ -77,7 +68,7 @@ struct Cli {
|
||||
struct BenchRunReport {
|
||||
risc0_dev_mode: bool,
|
||||
scenarios: Vec<ScenarioResult>,
|
||||
total_wall_seconds: f64,
|
||||
total_wall_s: f64,
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "multi_thread")]
|
||||
@ -110,61 +101,61 @@ async fn main() -> Result<()> {
|
||||
|
||||
for name in to_run {
|
||||
eprintln!("\n=== running scenario: {name:?} ===");
|
||||
let setup_started = std::time::Instant::now();
|
||||
// Spawn a fresh Bedrock node for this scenario. Each scenario therefore
|
||||
// starts with an empty chain so the indexer never has a backlog from a
|
||||
// prior scenario.
|
||||
let bedrock = BedrockHandle::launch_fresh()
|
||||
.await
|
||||
.with_context(|| format!("failed to spawn Bedrock for scenario {name:?}"))?;
|
||||
let bedrock_addr_string = format!("{}", bedrock.addr());
|
||||
// Safety: we restore the previous LEZ_BEDROCK_ADDR value (if any) at scenario teardown.
|
||||
// SAFETY: this happens before any threaded setup that reads env.
|
||||
unsafe {
|
||||
std::env::set_var("LEZ_BEDROCK_ADDR", &bedrock_addr_string);
|
||||
{
|
||||
let setup_started = std::time::Instant::now();
|
||||
// Spawn a fresh Bedrock node for this scenario. Each scenario therefore
|
||||
// starts with an empty chain so the indexer never has a backlog from a
|
||||
// prior scenario.
|
||||
let bedrock = BedrockHandle::launch_fresh()
|
||||
.await
|
||||
.with_context(|| format!("failed to spawn Bedrock for scenario {name:?}"))?;
|
||||
let bedrock_addr_string = format!("{}", bedrock.addr());
|
||||
// SAFETY: env::set_var happens before any threaded setup that reads env.
|
||||
unsafe {
|
||||
std::env::set_var("LEZ_BEDROCK_ADDR", &bedrock_addr_string);
|
||||
}
|
||||
|
||||
let mut ctx = BenchContext::new()
|
||||
.await
|
||||
.with_context(|| format!("failed to setup BenchContext for scenario {name:?}"))?;
|
||||
let setup = setup_started.elapsed();
|
||||
eprintln!("setup: {:.2}s", setup.as_secs_f64());
|
||||
|
||||
let disk_before = ctx.disk_sizes();
|
||||
let mut result = run_scenario(name, setup, &mut ctx).await?;
|
||||
result.disk_before = Some(disk_before);
|
||||
result.disk_after = Some(ctx.disk_sizes());
|
||||
result.bedrock_finality = Some(measure_bedrock_finality(&ctx).await?);
|
||||
harness::print_table(&result);
|
||||
all_results.push(result);
|
||||
|
||||
// ctx and bedrock drop here at end of scope, killing the bedrock child
|
||||
// before we sleep so the next iteration can rebind the port.
|
||||
}
|
||||
|
||||
let mut ctx = BenchContext::new()
|
||||
.await
|
||||
.with_context(|| format!("failed to setup BenchContext for scenario {name:?}"))?;
|
||||
let setup_ms = elapsed_ms(setup_started);
|
||||
eprintln!("setup: {setup_ms:.1} ms");
|
||||
|
||||
let disk_before = ctx.disk_sizes();
|
||||
let mut result = run_scenario(name, setup_ms, &mut ctx).await?;
|
||||
result.disk_before = Some(disk_before);
|
||||
result.disk_after = Some(ctx.disk_sizes());
|
||||
result.bedrock_finality_ms = Some(measure_bedrock_finality(&ctx).await?);
|
||||
harness::print_table(&result);
|
||||
all_results.push(result);
|
||||
|
||||
drop(ctx);
|
||||
drop(bedrock);
|
||||
// Give Bedrock a moment to shut down before the next scenario.
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
}
|
||||
|
||||
let total_wall_seconds = overall_started.elapsed().as_secs_f64();
|
||||
eprintln!("\nTotal wall time: {total_wall_seconds:.1}s");
|
||||
let total_wall_s = overall_started.elapsed().as_secs_f64();
|
||||
eprintln!("\nTotal wall time: {total_wall_s:.1}s");
|
||||
|
||||
let report = BenchRunReport {
|
||||
risc0_dev_mode,
|
||||
scenarios: all_results,
|
||||
total_wall_seconds,
|
||||
total_wall_s,
|
||||
};
|
||||
|
||||
let out_path = match cli.json_out {
|
||||
Some(p) => p,
|
||||
None => {
|
||||
let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
|
||||
.join("..")
|
||||
.join("..")
|
||||
.canonicalize()?;
|
||||
let suffix = if risc0_dev_mode { "dev" } else { "prove" };
|
||||
workspace_root
|
||||
.join("target")
|
||||
.join(format!("e2e_bench_{suffix}.json"))
|
||||
}
|
||||
let out_path = if let Some(p) = cli.json_out {
|
||||
p
|
||||
} else {
|
||||
let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
|
||||
.join("..")
|
||||
.join("..")
|
||||
.canonicalize()?;
|
||||
let suffix = if risc0_dev_mode { "dev" } else { "prove" };
|
||||
workspace_root
|
||||
.join("target")
|
||||
.join(format!("e2e_bench_{suffix}.json"))
|
||||
};
|
||||
if let Some(parent) = out_path.parent() {
|
||||
std::fs::create_dir_all(parent)?;
|
||||
@ -177,7 +168,7 @@ async fn main() -> Result<()> {
|
||||
|
||||
async fn run_scenario(
|
||||
name: ScenarioName,
|
||||
setup_ms: f64,
|
||||
setup: Duration,
|
||||
ctx: &mut BenchContext,
|
||||
) -> Result<ScenarioResult> {
|
||||
let result = match name {
|
||||
@ -188,17 +179,13 @@ async fn run_scenario(
|
||||
ScenarioName::Parallel => scenarios::parallel::run(ctx).await?,
|
||||
ScenarioName::All => unreachable!("dispatched above"),
|
||||
};
|
||||
Ok(ScenarioResult { setup_ms, ..result })
|
||||
}
|
||||
|
||||
fn elapsed_ms(t: std::time::Instant) -> f64 {
|
||||
t.elapsed().as_secs_f64() * 1_000.0
|
||||
Ok(ScenarioResult { setup, ..result })
|
||||
}
|
||||
|
||||
/// Poll the indexer's L1-finalised block id until it catches up with the
|
||||
/// sequencer's last block id. This is effectively the sequencer→Bedrock posting
|
||||
/// plus Bedrock finalisation plus indexer ingest latency.
|
||||
async fn measure_bedrock_finality(ctx: &BenchContext) -> Result<f64> {
|
||||
async fn measure_bedrock_finality(ctx: &BenchContext) -> Result<Duration> {
|
||||
use indexer_service_rpc::RpcClient as _;
|
||||
use jsonrpsee::ws_client::WsClientBuilder;
|
||||
use sequencer_service_rpc::RpcClient as _;
|
||||
@ -210,20 +197,20 @@ async fn measure_bedrock_finality(ctx: &BenchContext) -> Result<f64> {
|
||||
.context("connect indexer WS")?;
|
||||
let sequencer_tip = ctx.sequencer_client().get_last_block_id().await?;
|
||||
|
||||
let timeout = Duration::from_secs(60);
|
||||
let started = std::time::Instant::now();
|
||||
let deadline = started + Duration::from_secs(60);
|
||||
loop {
|
||||
match indexer_ws.get_last_finalized_block_id().await {
|
||||
Ok(Some(b)) if b >= sequencer_tip => {
|
||||
return Ok(started.elapsed().as_secs_f64() * 1_000.0);
|
||||
let poll = async {
|
||||
loop {
|
||||
match indexer_ws.get_last_finalized_block_id().await {
|
||||
Ok(Some(b)) if b >= sequencer_tip => return,
|
||||
Ok(_) => {}
|
||||
Err(err) => eprintln!("indexer last_synced poll error: {err:#}"),
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(err) => eprintln!("indexer last_synced poll error: {err:#}"),
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
}
|
||||
if std::time::Instant::now() > deadline {
|
||||
eprintln!("indexer did not catch up to {sequencer_tip} within 60s");
|
||||
return Ok(started.elapsed().as_secs_f64() * 1_000.0);
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
};
|
||||
if tokio::time::timeout(timeout, poll).await.is_err() {
|
||||
eprintln!("indexer did not catch up to {sequencer_tip} within {timeout:?}");
|
||||
}
|
||||
Ok(started.elapsed())
|
||||
}
|
||||
|
||||
@ -42,7 +42,10 @@ pub async fn run(ctx: &mut BenchContext) -> Result<ScenarioResult> {
|
||||
}
|
||||
|
||||
// Mint full supply into master.
|
||||
let total_mint: u128 = (PARALLEL_FANOUT_N as u128) * AMOUNT_PER_TRANSFER * 10;
|
||||
let total_mint = u128::try_from(PARALLEL_FANOUT_N)
|
||||
.expect("usize fits u128")
|
||||
.saturating_mul(AMOUNT_PER_TRANSFER)
|
||||
.saturating_mul(10);
|
||||
{
|
||||
let pre_block = crate::harness::begin_step(ctx).await?;
|
||||
let started = Instant::now();
|
||||
@ -104,21 +107,21 @@ pub async fn run(ctx: &mut BenchContext) -> Result<ScenarioResult> {
|
||||
.await?;
|
||||
}
|
||||
let all_submitted_at = Instant::now();
|
||||
let submit_duration_ms = (all_submitted_at - burst_started).as_secs_f64() * 1_000.0;
|
||||
let submit_duration = all_submitted_at.saturating_duration_since(burst_started);
|
||||
|
||||
// Wait for the chain to advance by at least 2 blocks past pre_block_burst.
|
||||
// That guarantees the block holding our burst is sealed and applied.
|
||||
crate::harness::wait_for_chain_advance(ctx, pre_block_burst, 2).await?;
|
||||
let inclusion_done_at = Instant::now();
|
||||
let inclusion_after_submit_ms = (inclusion_done_at - all_submitted_at).as_secs_f64() * 1_000.0;
|
||||
let burst_total_ms = (inclusion_done_at - burst_started).as_secs_f64() * 1_000.0;
|
||||
let inclusion_after_submit = inclusion_done_at.saturating_duration_since(all_submitted_at);
|
||||
let burst_total = inclusion_done_at.saturating_duration_since(burst_started);
|
||||
|
||||
eprintln!(
|
||||
"parallel_fanout: submitted {} txs in {:.1} ms, inclusion in {:.1} ms, total {:.1} ms",
|
||||
"parallel_fanout: submitted {} txs in {:.3}s, inclusion in {:.3}s, total {:.3}s",
|
||||
senders.len(),
|
||||
submit_duration_ms,
|
||||
inclusion_after_submit_ms,
|
||||
burst_total_ms,
|
||||
submit_duration.as_secs_f64(),
|
||||
inclusion_after_submit.as_secs_f64(),
|
||||
burst_total.as_secs_f64(),
|
||||
);
|
||||
|
||||
// Capture every block produced during the burst window. This is the
|
||||
@ -149,13 +152,13 @@ pub async fn run(ctx: &mut BenchContext) -> Result<ScenarioResult> {
|
||||
}
|
||||
|
||||
// Synthesise a single summary "step" for the burst. Use the submit time
|
||||
// for `submit_ms` and the inclusion-wait time for `inclusion_ms`.
|
||||
// for `submit` and the inclusion-wait time for `inclusion`.
|
||||
let burst_step = StepResult {
|
||||
label: format!("burst_{}_transfers", senders.len()),
|
||||
submit_ms: submit_duration_ms,
|
||||
inclusion_ms: Some(inclusion_after_submit_ms),
|
||||
wallet_sync_ms: None,
|
||||
total_ms: burst_total_ms,
|
||||
submit: submit_duration,
|
||||
inclusion: Some(inclusion_after_submit),
|
||||
wallet_sync: None,
|
||||
total: burst_total,
|
||||
tx_hash: None,
|
||||
blocks,
|
||||
};
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user