diff --git a/lightnode/src/da.rs b/lightnode/src/da.rs index dba14e1..2179a63 100644 --- a/lightnode/src/da.rs +++ b/lightnode/src/da.rs @@ -1,6 +1,4 @@ use anyhow::Result; use nomos_core::da::BlobId; -pub async fn sampling(_blob_id: BlobId) -> Result<()> { - Ok(()) -} +// Tutorial code diff --git a/lightnode/src/lib.rs b/lightnode/src/lib.rs index 43a21d9..c94d023 100644 --- a/lightnode/src/lib.rs +++ b/lightnode/src/lib.rs @@ -8,101 +8,4 @@ use serde::{Deserialize, Serialize}; use anyhow::Result; use tracing::{debug, info}; -pub const CRYPTARCHIA_INFO: &str = "cryptarchia/info"; -pub const STORAGE_BLOCK: &str = "storage/block"; -use futures::Stream; - -pub mod da; -pub mod nomos; -pub mod proofcheck; - -#[derive(Clone, Debug)] -pub struct Credentials { - pub username: String, - pub password: Option, -} - -pub struct NomosClient { - base_url: Url, - reqwest_client: reqwest::Client, - basic_auth: Credentials, - nomos_client: ExecutorHttpClient, -} - -impl NomosClient { - pub fn new(base_url: Url, basic_auth: Credentials) -> Self { - Self { - base_url, - reqwest_client: reqwest::Client::new(), - basic_auth: basic_auth.clone(), - nomos_client: ExecutorHttpClient::new(Some(BasicAuthCredentials::new( - basic_auth.username, - basic_auth.password, - ))), - } - } - - pub async fn get_cryptarchia_info(&self) -> Result { - let url = self.base_url.join(CRYPTARCHIA_INFO).expect("Invalid URL"); - - debug!("Requesting cryptarchia info from {}", url); - let request = self.reqwest_client.get(url).basic_auth( - &self.basic_auth.username, - self.basic_auth.password.as_deref(), - ); - - let response = request.send().await?; - - if !response.status().is_success() { - anyhow::bail!("Failed to get cryptarchia info: {}", response.status()); - } - - let info = response.json::().await?; - Ok(info) - } - - pub async fn get_block(&self, id: HeaderId) -> Result { - let url = self.base_url.join(STORAGE_BLOCK).expect("Invalid URL"); - - info!("Requesting block with HeaderId {}", id); - let request = self - .reqwest_client - .post(url) - .header("Content-Type", "application/json") - .basic_auth( - &self.basic_auth.username, - self.basic_auth.password.as_deref(), - ) - .body(serde_json::to_string(&id).unwrap()); - - let response = request.send().await?; - - if !response.status().is_success() { - anyhow::bail!("Failed to get block: {}", response.status()); - } - - let block: Block = response.json().await?; - - Ok(block) - } - - pub async fn get_shares( - &self, - blob_id: [u8; 32], - ) -> Result, Error> { - self.nomos_client - .get_shares::( - self.base_url.clone(), - blob_id, - HashSet::new(), - HashSet::new(), - true, - ) - .await - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Block { - pub blobs: Vec<[u8; 32]>, -} +// Tutorial code diff --git a/lightnode/src/main.rs b/lightnode/src/main.rs index b156da4..e3cd5aa 100644 --- a/lightnode/src/main.rs +++ b/lightnode/src/main.rs @@ -13,245 +13,4 @@ use url::Url; use anyhow::Result; use tokio::time::{sleep, Duration}; -#[derive(Parser, Debug)] -#[clap(author, version, about = "Light Node validator")] -struct Args { - #[clap(long, default_value = "info")] - log_level: String, - - #[clap(long, default_value = "http://localhost:8545")] - rpc: Url, - - #[clap(long, default_value = "ws://localhost:8546")] - ws_rpc: Url, - - #[clap(long, default_value = "http://localhost:8070")] - prover_url: Url, - - #[clap(long)] - nomos_node: Url, - - #[clap(long, default_value = "10")] - batch_size: u64, - - #[clap(long)] - zeth_binary_dir: Option, -} - -#[tokio::main] -async fn main() -> Result<(), Box> { - let args = Args::parse(); - - let filter = - EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&args.log_level)); - - fmt::fmt().with_env_filter(filter).with_target(false).init(); - - let zone_blocks = follow_sz(args.ws_rpc.clone()).await.unwrap(); - let (tx, da_blobs) = tokio::sync::mpsc::channel::(MAX_BLOBS); - let username = std::env::var("NOMOS_USER").unwrap_or_default(); - let password = std::env::var("NOMOS_PASSWORD").ok(); - tokio::spawn(check_blobs( - NomosClient::new( - args.nomos_node.clone(), - Credentials { - username, - password, - }, - ), - tx - )); - - verify_zone_stf( - args.batch_size, - args.rpc.clone(), - args.prover_url.clone(), - args.zeth_binary_dir.as_deref().unwrap_or_else(|| Path::new("zeth")), - zone_blocks, - tokio_stream::wrappers::ReceiverStream::new(da_blobs), - ) - .await?; - - - Ok(()) -} - -const MAX_BLOBS: usize = 1 << 10; -const MAX_PROOF_RETRIES: usize = 5; - -async fn verify_zone_stf( - batch_size: u64, - rpc: Url, - prover_url: Url, - zeth_binary_dir: &Path, - blocks: impl Stream, - included_blobs: impl Stream, -) -> Result<(), Box> { - let mut blobs_on_consensus = lru::LruCache::new(NonZero::new(MAX_BLOBS).unwrap()); - let mut sz_blobs = lru::LruCache::new(NonZero::new(MAX_BLOBS).unwrap()); - - tokio::pin!(blocks); - tokio::pin!(included_blobs); - loop { - let to_verify = tokio::select! { - Some((block_n, blob_id)) = blocks.next() => { - if blobs_on_consensus.pop(&blob_id).is_some() { - tracing::debug!("Block confirmed on consensus: {:?}", block_n); - Some((block_n, blob_id)) - } else { - if let Some(expired) = sz_blobs.push(blob_id, block_n) { - tracing::warn!("Block was not confirmed on mainnet: {:?}", expired); - } - None - } - } - Some(blob_id) = included_blobs.next() => { - if let Some(block_n) = sz_blobs.pop(&blob_id) { - tracing::debug!("Block confirmed on consensus: {:?}", block_n); - Some((block_n, blob_id)) - } else { - // Blobs coming from other zones are not going to be confirmed and - // should be removed from the cache. - // It is highly unlikely that a zone blob is seen first on consensus and - // later enough on the sz rpc to be dropped from the cache, although - // it is possible. - // Do something here if you want to keep track of those blobs. - let _ = blobs_on_consensus.push(blob_id, ()); - None - } - } - }; - - if let Some((block_n, blob_id)) = to_verify { - let rpc = rpc.clone(); - let prover_url = prover_url.clone(); - let zeth_binary_dir = zeth_binary_dir.to_path_buf(); - tokio::spawn(async move { - verify_blob( - block_n, - blob_id, - batch_size, - &rpc, - &prover_url, - &zeth_binary_dir, - ) - .await - .unwrap_or_else(|e| { - tracing::error!("Failed to verify blob: {:?}", e); - }); - }); - } - } -} - -async fn verify_blob( - block_number: u64, - blob: BlobId, - batch_size: u64, - rpc: &Url, - prover_url: &Url, - zeth_binary_dir: &Path, -) -> Result<()> { - // block are proved in batches, aligned to `batch_size` - let block_number = block_number - block_number % batch_size; - futures::try_join!(evm_lightnode::da::sampling(blob), async { - let mut sleep_time = 1; - for _ in 0..MAX_PROOF_RETRIES { - if proofcheck::verify_proof( - block_number, - batch_size, - rpc, - prover_url, - zeth_binary_dir, - ) - .await.is_ok() { - return Ok(()); - } - - sleep(Duration::from_secs(sleep_time)).await; - sleep_time *= 2; - } - Err(anyhow::anyhow!("Failed to verify proof after {} retries", MAX_PROOF_RETRIES)) - })?; - // TODO: reuse rpc results - Ok(()) -} - -/// Follow the sovereign zone and return the blob ID of each new produced block -async fn follow_sz( - ws_rpc: Url, -) -> Result, Box> { - let provider = ProviderBuilder::new().on_ws(WsConnect::new(ws_rpc)).await?; - // Pub-sub: get a live stream of blocks - let blocks = provider - .subscribe_full_blocks() - .full() - .into_stream() - .await?; - - Ok(blocks.filter_map(|block| async move { - if let Ok(block) = block { - // poor man's proof of equivalence - use alloy::consensus::{Block, Signed}; - let block: Block> = block.into_consensus().convert_transactions(); - let (data, _) = evm_sequencer_node::encode_block(&block.clone().convert_transactions()); - let blob_id = { - let encoder = kzgrs_backend::encoder::DaEncoder::new( - kzgrs_backend::encoder::DaEncoderParams::new( - 2, - false, - kzgrs_backend::global::GLOBAL_PARAMETERS.clone(), - ), - ); - // this is a REALLY heavy task, so we should try not to block the thread here - let heavy_task = tokio::task::spawn_blocking(move || encoder.encode(&data)); - let encoded_data = heavy_task.await.unwrap().unwrap(); - kzgrs_backend::common::build_blob_id( - &encoded_data.row_commitments, - ) - }; - - Some((block.header.number, blob_id)) - } else { - tracing::error!("Failed to get block"); - None - } - })) -} - -#[derive(Debug, Serialize, Deserialize)] -struct CryptarchiaInfo { - tip: HeaderId, -} - -/// Return blobs confirmed on Nomos. -async fn check_blobs( - nomos_client: NomosClient, - sink: tokio::sync::mpsc::Sender, -) -> Result<()> { - // TODO: a good implementation should follow the different forks and react to possible - // reorgs. - // We don't currently habe a food api to follow the chain externally, so for now we're just - // going to simply follow the tip. - let mut current_tip = HeaderId::default(); - loop { - let info = nomos_client.get_cryptarchia_info().await?; - if info.tip != current_tip { - current_tip = info.tip; - tracing::debug!("new tip: {:?}", info.tip); - let blobs = nomos_client.get_block(info.tip).await?.bl_blobs; - - if blobs.is_empty() { - tracing::debug!("No blobs found in block"); - continue; - } - - for blob in blobs { - sink.send(blob.id).await?; - } - } else { - tracing::trace!("No new tip, sleeping..."); - } - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - } -} +// Tutorial code diff --git a/lightnode/src/nomos.rs b/lightnode/src/nomos.rs index 325a4b5..f6d679c 100644 --- a/lightnode/src/nomos.rs +++ b/lightnode/src/nomos.rs @@ -2,47 +2,4 @@ use serde::{Deserialize, Deserializer, Serialize}; use hex::FromHex; -#[derive(Serialize, Deserialize, Debug)] -pub struct CryptarchiaInfo { - pub tip: HeaderId, - pub slot: u64, - pub height: u64, -} - -#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash, PartialOrd, Ord, Default)] -pub struct HeaderId([u8; 32]); - -impl<'de> Deserialize<'de> for HeaderId { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let hex_str = String::deserialize(deserializer)?; - - let bytes = <[u8; 32]>::from_hex(hex_str) - .map_err(|e| serde::de::Error::custom(format!("Invalid hex string: {}", e)))?; - - Ok(HeaderId(bytes)) - } -} - -impl Serialize for HeaderId { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - let hex_str = hex::encode(self.0); - serializer.serialize_str(&hex_str) - } -} - -use std::fmt; - -impl fmt::Display for HeaderId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - for byte in &self.0 { - write!(f, "{:02x}", byte)?; - } - Ok(()) - } -} +// Tutorial code diff --git a/lightnode/src/proofcheck.rs b/lightnode/src/proofcheck.rs index 73ad811..030595d 100644 --- a/lightnode/src/proofcheck.rs +++ b/lightnode/src/proofcheck.rs @@ -5,45 +5,4 @@ use std::path::Path; use tokio::process::Command; use tracing::{info}; -pub async fn verify_proof( - block_number: u64, - block_count: u64, - rpc: &Url, - prover_url: &Url, - zeth_bin: &Path, -) -> Result<()> { - info!( - "Verifying proof for blocks {}-{}", - block_number, - block_number + block_count - 1 - ); - - let url = prover_url.join(&format!( - "/?block_start={}&block_count={}", - block_number, block_count - ))?; - let proof = reqwest::get(url).await?.bytes().await?; - - let mut tempfile = tempfile::NamedTempFile::new()?; - tempfile.write_all(&proof)?; - - let output = Command::new(zeth_bin) - .args([ - "verify", - &format!("--rpc={}", rpc), - &format!("--block-number={}", block_number), - &format!("--block-count={}", block_count), - &format!("--file={}", tempfile.path().display()), - ]) - .output() - .await?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - anyhow::bail!("zeth-ethereum verify command failed: {}", stderr); - } - - info!("Proof verification successful"); - - Ok(()) -} +// Tutorial code diff --git a/prover/src/http.rs b/prover/src/http.rs index e285aa0..e187a0d 100644 --- a/prover/src/http.rs +++ b/prover/src/http.rs @@ -7,32 +7,4 @@ use serde::Deserialize; use std::path::PathBuf; use tokio::fs; -#[derive(Deserialize)] -pub struct ProofRequest { - block_start: u64, - block_count: u64, -} - -/// Handler for GET / -pub async fn serve_proof(Query(query): Query) -> Response { - let file_name = format!( - "{}-{}.zkp", - query.block_start, - query.block_count + query.block_start - ); - - let path = PathBuf::from(&file_name); - - // Read file contents - match fs::read(&path).await { - Ok(bytes) => (StatusCode::OK, bytes).into_response(), - Err(err) => { - let status = if err.kind() == std::io::ErrorKind::NotFound { - StatusCode::NOT_FOUND - } else { - StatusCode::INTERNAL_SERVER_ERROR - }; - (status, format!("Error reading file: {}", err)).into_response() - } - } -} +// Tutorial code diff --git a/prover/src/main.rs b/prover/src/main.rs index 49d01db..46c5532 100644 --- a/prover/src/main.rs +++ b/prover/src/main.rs @@ -8,184 +8,4 @@ use tracing_subscriber::{EnvFilter, fmt}; mod http; -#[derive(Parser, Debug)] -#[clap(author, version, about = "Ethereum Proof Generation Tool")] -struct Args { - #[clap(long, default_value = "http://localhost:8545")] - rpc: String, - - #[clap(long, default_value = "5")] - start_block: u64, - - #[clap(long, default_value = "10")] - batch_size: u64, - - #[clap(long, default_value = "5")] - interval: u64, - - #[clap(long)] - zeth_binary_dir: Option, - - #[clap(long, default_value = "info")] - log_level: String, -} - -fn run_ethereum_prove( - rpc: &str, - block_number: u64, - batch_size: u64, - zeth_binary_dir: Option, - log_level: &str, -) -> Result<(), String> { - info!( - "Running Ethereum prove for blocks {}-{}", - block_number, - block_number + batch_size - 1 - ); - - let mut binary_path = if let Some(dir) = &zeth_binary_dir { - PathBuf::from(dir) - } else { - debug!("No binary directory provided, trying current directory"); - std::env::current_dir().map_err(|e| format!("Failed to get current directory: {}", e))? - }; - - binary_path.push("zeth-ethereum"); - - if !binary_path.exists() { - return Err(format!("Binary not found at: {}", binary_path.display())); - } - - let output = Command::new(&binary_path) - .env("RUST_LOG", log_level) - .args([ - "prove", - &format!("--rpc={}", rpc), - &format!("--block-number={}", block_number), - &format!("--block-count={}", batch_size), - ]) - .output() - .map_err(|e| format!("Failed to execute zeth-ethereum prove: {}", e))?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - error!("ethereum prove command failed: {}", stderr); - return Err(format!( - "ethereum prove command failed with status: {}\nStderr: {}", - output.status, stderr - )); - } - - info!("Successfully processed batch"); - Ok(()) -} - -fn get_latest_block(client: &Client, rpc: &str) -> Result { - debug!("Checking latest block height..."); - - let request_body = json!({ - "jsonrpc": "2.0", - "method": "eth_blockNumber", - "params": [], - "id": 1 - }); - - let response = client - .post(rpc) - .json(&request_body) - .send() - .map_err(|e| format!("Failed to send request: {}", e))?; - - if !response.status().is_success() { - return Err(format!("Request failed with status: {}", response.status())); - } - - let response_json: Value = response - .json() - .map_err(|e| format!("Failed to parse response JSON: {}", e))?; - - let block_hex = response_json - .get("result") - .and_then(Value::as_str) - .ok_or("Failed to parse response")? - .trim_start_matches("0x"); - - let block_number = u64::from_str_radix(block_hex, 16) - .map_err(|e| format!("Failed to parse hex block number: {}", e))?; - - debug!("Latest block: {}", block_number); - Ok(block_number) -} - -fn main() -> Result<(), Box> { - let args = Args::parse(); - - std::thread::spawn(move || { - if let Err(e) = run_server() { - error!("Error running server: {}", e); - } - }); - - let filter = - EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&args.log_level)); - - fmt::fmt().with_env_filter(filter).with_target(false).init(); - - info!("Starting Ethereum prover..."); - - let client = Client::new(); - let mut current_block = args.start_block; - - loop { - match get_latest_block(&client, &args.rpc) { - Ok(latest_block) => { - if latest_block >= current_block + args.batch_size { - info!( - "New blocks available. Current: {}, Latest: {}", - current_block, latest_block - ); - - match run_ethereum_prove( - &args.rpc, - current_block, - args.batch_size, - args.zeth_binary_dir.clone(), - &args.log_level, - ) { - Ok(_) => { - current_block += args.batch_size; - info!("Updated current block to {}", current_block); - } - Err(e) => { - error!("Error running prover: {}", e); - } - } - } else { - info!( - "No new blocks to process. Current: {}, Latest: {}, sleeping...", - current_block, latest_block - ); - thread::sleep(Duration::from_secs(args.interval)); - } - } - Err(e) => { - error!("Error getting latest block: {}", e); - thread::sleep(Duration::from_secs(args.interval)); - } - } - } -} - -#[tokio::main] -async fn run_server() -> Result<(), Box> { - // Build our application with a route - let app = Router::new().route("/", get(http::serve_proof)); - - let addr = SocketAddr::from(([127, 0, 0, 1], 8070)); - // Run it on localhost:8070 - tracing::info!("Serving files on http://{}", addr); - let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); - axum::serve(listener, app).await.unwrap(); - - Ok(()) -} +// Tutorial code diff --git a/sequencer-node/src/lib.rs b/sequencer-node/src/lib.rs index 47321e8..653a73c 100644 --- a/sequencer-node/src/lib.rs +++ b/sequencer-node/src/lib.rs @@ -6,58 +6,4 @@ use kzgrs_backend::{dispersal::Metadata, encoder::DaEncoderParams}; -pub fn encode_block(block: &Block) -> (Vec, Metadata) { - let mut blob = bincode::serialize(&block).expect("Failed to serialize block"); - let metadata = Metadata::new([0; 32], block.number.into()); - // the node expects blobs to be padded to the next chunk size - let remainder = blob.len() % DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE; - blob.extend(std::iter::repeat_n( - 0, - DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE - remainder, - )); - - (blob, metadata) -} - -pub struct Processor { - da: NomosDa, -} - -impl Processor { - pub fn new(da: NomosDa) -> Self { - Self { da } - } - - pub async fn process_blocks(&mut self, new_blocks: impl Iterator) { - for block in new_blocks { - let (blob, metadata) = encode_block(&block); - if let Err(e) = self.da.disperse(blob, metadata).await { - error!("Failed to disperse block: {e}"); - } else { - info!("Dispersed block: {:?}", block); - } - } - } -} - -pub struct NomosDa { - url: Url, - client: ExecutorHttpClient, -} - -impl NomosDa { - pub fn new(basic_auth: BasicAuthCredentials, url: Url) -> Self { - Self { - client: ExecutorHttpClient::new(Some(basic_auth)), - url, - } - } - - pub async fn disperse(&self, data: Vec, metadata: Metadata) -> Result<[u8; 32], Error> { - self.client - .publish_blob(self.url.clone(), data, metadata) - .await?; - - Ok([0; 32]) // Placeholder for the actual blob ID - } -} +// Tutorial code diff --git a/sequencer-node/src/main.rs b/sequencer-node/src/main.rs index eb22c2a..6e8c7ff 100644 --- a/sequencer-node/src/main.rs +++ b/sequencer-node/src/main.rs @@ -11,73 +11,4 @@ use reth_tracing::tracing::info; use executor_http_client::BasicAuthCredentials; use evm_sequencer_node::{Processor, NomosDa}; -async fn process_blocks( - mut ctx: ExExContext, - mut processor: Processor, -) -> eyre::Result<()> -where - <::Types as NodeTypes>::Primitives: - NodePrimitives, -{ - while let Some(notification) = ctx.notifications.try_next().await? { - let ExExNotification::ChainCommitted { new } = ¬ification else { - continue; - }; - info!(committed_chain = ?new.range(), "Received commit"); - processor - .process_blocks( - new.inner() - .0 - .clone() - .into_blocks() - .map(reth_ethereum::primitives::RecoveredBlock::into_block), - ) - .await; - - ctx.events - .send(ExExEvent::FinishedHeight(new.tip().num_hash())) - .unwrap(); - } - - Ok(()) -} - -fn main() -> eyre::Result<()> { - Cli::try_parse_args_from([ - "reth", - "node", - "--dev", - "--rpc.eth-proof-window=2048", - "--dev.block-time=2s", - "--http.addr=0.0.0.0", - "--http.api=eth,net,web3,debug,trace,txpool", // Some might be unnecessary, but I guess - "--ws", - "--ws.addr=0.0.0.0", - "--ws.api=eth,net,web3,txpool", - "--http.corsdomain=\"*\"", // Needed locally, probably needed here as well. - ]) - .unwrap() - .run(|builder, _| { - Box::pin(async move { - let url = std::env::var("NOMOS_EXECUTOR").unwrap_or_default(); - let user = std::env::var("NOMOS_USER").unwrap_or_default(); - let password = std::env::var("NOMOS_PASSWORD").unwrap_or_default(); - let da = NomosDa::new( - BasicAuthCredentials::new(user, Some(password)), - url::Url::parse(&url).unwrap(), - ); - let processor = Processor::new(da); - let handle = Box::pin( - builder - .node(EthereumNode::default()) - .install_exex("process-block", async move |ctx| { - Ok(process_blocks(ctx, processor)) - }) - .launch(), - ) - .await?; - - handle.wait_for_node_exit().await - }) - }) -} +// Tutorial code