diff --git a/sz-poc-offsite-2025/Cargo.toml b/sz-poc-offsite-2025/Cargo.toml index fd41978..0e54241 100644 --- a/sz-poc-offsite-2025/Cargo.toml +++ b/sz-poc-offsite-2025/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["evm/processor", "evm/sequencer-node"] +members = ["evm/processor", "evm/sequencer-node", "evm/prover"] resolver = "3" [workspace.package] @@ -9,6 +9,7 @@ edition = "2024" # Internal evm-processor = { path = "evm/processor" } evm-sequencer-node = { path = "evm/sequencer-node" } +evm-prover = { path = "evm/prover" } # External eyre = { version = "0.6" } diff --git a/sz-poc-offsite-2025/evm/processor/src/lib.rs b/sz-poc-offsite-2025/evm/processor/src/lib.rs index c32243f..3cba4b5 100644 --- a/sz-poc-offsite-2025/evm/processor/src/lib.rs +++ b/sz-poc-offsite-2025/evm/processor/src/lib.rs @@ -1,9 +1,9 @@ -use reth_ethereum::Block; -use executor_http_client::{ExecutorHttpClient, Error}; pub use executor_http_client::BasicAuthCredentials; -use reqwest::Url; -use reth_tracing::tracing::{info, error}; +use executor_http_client::{Error, ExecutorHttpClient}; use kzgrs_backend::{dispersal::Metadata, encoder::DaEncoderParams}; +use reqwest::Url; +use reth_ethereum::Block; +use reth_tracing::tracing::{error, info}; pub struct Processor { da: NomosDa, @@ -11,9 +11,7 @@ pub struct Processor { impl Processor { pub fn new(da: NomosDa) -> Self { - Self { - da - } + Self { da } } pub async fn process_blocks(&mut self, new_blocks: impl Iterator) { @@ -22,12 +20,11 @@ impl Processor { let metadata = Metadata::new([0; 32], block.number.into()); // the node expects blobs to be padded to the next chunk size let remainder = blob.len() % DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE; - blob.extend( - std::iter::repeat(0) - .take(DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE - remainder), - ); - if let Err(e) = self.da.disperse(blob, metadata).await - { + blob.extend(std::iter::repeat_n( + 0, + DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE - remainder, + )); + if let Err(e) = self.da.disperse(blob, metadata).await { error!("Failed to disperse block: {e}"); } else { info!("Dispersed block: {:?}", block); @@ -36,25 +33,22 @@ impl Processor { } } - - pub struct NomosDa { url: Url, client: ExecutorHttpClient, } - - impl NomosDa { pub fn new(basic_auth: BasicAuthCredentials, url: Url) -> Self { Self { client: ExecutorHttpClient::new(Some(basic_auth)), url, } - } + } pub async fn disperse(&self, data: Vec, metadata: Metadata) -> Result<(), Error> { self.client - .publish_blob(self.url.clone(), data, metadata).await + .publish_blob(self.url.clone(), data, metadata) + .await } -} \ No newline at end of file +} diff --git a/sz-poc-offsite-2025/evm/prover/Cargo.toml b/sz-poc-offsite-2025/evm/prover/Cargo.toml new file mode 100644 index 0000000..b27505a --- /dev/null +++ b/sz-poc-offsite-2025/evm/prover/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "evm-prover" +edition = { workspace = true } + +[dependencies] +clap = { version = "4.5", features = ["derive"] } +reqwest = { version = "0.11", features = ["blocking", "json"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/sz-poc-offsite-2025/evm/prover/src/main.rs b/sz-poc-offsite-2025/evm/prover/src/main.rs new file mode 100644 index 0000000..8694f4e --- /dev/null +++ b/sz-poc-offsite-2025/evm/prover/src/main.rs @@ -0,0 +1,168 @@ +use clap::Parser; +use reqwest::blocking::Client; +use serde_json::{Value, json}; +use std::{path::PathBuf, process::Command, thread, time::Duration}; +use tracing::{debug, error, info}; +use tracing_subscriber::{EnvFilter, fmt}; + +#[derive(Parser, Debug)] +#[clap(author, version, about = "Ethereum Proof Generation Tool")] +struct Args { + #[clap(long, default_value = "http://localhost:8546")] + rpc: String, + + #[clap(long, default_value = "5")] + start_block: u64, + + #[clap(long, default_value = "10")] + batch_size: u64, + + #[clap(long, default_value = "5")] + interval: u64, + + #[clap(long)] + zeth_binary_dir: Option, + + #[clap(long, default_value = "info")] + log_level: String, +} + +fn run_ethereum_prove( + rpc: &str, + block_number: u64, + batch_size: u64, + zeth_binary_dir: Option, + log_level: &str, +) -> Result<(), String> { + info!( + "Running Ethereum prove for blocks {}-{}", + block_number, + block_number + batch_size - 1 + ); + + let mut binary_path = if let Some(dir) = &zeth_binary_dir { + PathBuf::from(dir) + } else { + debug!("No binary directory provided, trying current directory"); + std::env::current_dir().map_err(|e| format!("Failed to get current directory: {}", e))? + }; + + binary_path.push("zeth-ethereum"); + + if !binary_path.exists() { + return Err(format!("Binary not found at: {}", binary_path.display())); + } + + let output = Command::new(&binary_path) + .env("RUST_LOG", log_level) + .args([ + "prove", + &format!("--rpc={}", rpc), + &format!("--block-number={}", block_number), + &format!("--block-count={}", batch_size), + ]) + .output() + .map_err(|e| format!("Failed to execute zeth-ethereum prove: {}", e))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + error!("ethereum prove command failed: {}", stderr); + return Err(format!( + "ethereum prove command failed with status: {}\nStderr: {}", + output.status, stderr + )); + } + + info!("Successfully processed batch"); + Ok(()) +} + +fn get_latest_block(client: &Client, rpc: &str) -> Result { + debug!("Checking latest block height..."); + + let request_body = json!({ + "jsonrpc": "2.0", + "method": "eth_blockNumber", + "params": [], + "id": 1 + }); + + let response = client + .post(rpc) + .json(&request_body) + .send() + .map_err(|e| format!("Failed to send request: {}", e))?; + + if !response.status().is_success() { + return Err(format!("Request failed with status: {}", response.status())); + } + + let response_json: Value = response + .json() + .map_err(|e| format!("Failed to parse response JSON: {}", e))?; + + let block_hex = response_json + .get("result") + .and_then(Value::as_str) + .ok_or("Failed to parse response")? + .trim_start_matches("0x"); + + let block_number = u64::from_str_radix(block_hex, 16) + .map_err(|e| format!("Failed to parse hex block number: {}", e))?; + + debug!("Latest block: {}", block_number); + Ok(block_number) +} + +fn main() -> Result<(), Box> { + let args = Args::parse(); + + let filter = + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&args.log_level)); + + fmt::fmt().with_env_filter(filter).with_target(false).init(); + + info!("Starting Ethereum prover..."); + + let client = Client::new(); + let mut current_block = args.start_block; + + loop { + match get_latest_block(&client, &args.rpc) { + Ok(latest_block) => { + if latest_block >= current_block + args.batch_size { + info!( + "New blocks available. Current: {}, Latest: {}", + current_block, latest_block + ); + + match run_ethereum_prove( + &args.rpc, + current_block, + args.batch_size, + args.zeth_binary_dir.clone(), + &args.log_level, + ) { + Ok(_) => { + current_block += args.batch_size; + info!("Updated current block to {}", current_block); + } + Err(e) => { + error!("Error running prover: {}", e); + } + } + } else { + info!( + "No new blocks to process. Current: {}, Latest: {}, sleeping...", + current_block, latest_block + ); + thread::sleep(Duration::from_secs(args.interval)); + } + } + Err(e) => { + error!("Error getting latest block: {}", e); + thread::sleep(Duration::from_secs(args.interval)); + } + } + } +}