diff --git a/mistral/lightnode/Cargo.toml b/mistral/lightnode/Cargo.toml index da314fa..d573599 100644 --- a/mistral/lightnode/Cargo.toml +++ b/mistral/lightnode/Cargo.toml @@ -19,3 +19,10 @@ hex = { version = "0.4" } futures = { version = "0.3" } kzgrs-backend = { git = "https://github.com/logos-co/nomos", branch = "master" } tempfile = "3.19.1" +alloy = { version = "0.13", features = ["full"] } +lru = { version = "0.14" } +nomos-core = { git = "https://github.com/logos-co/nomos", branch = "master" } +reth.workspace = true +evm-processor = { path = "../processor" } +anyhow = "1.0.98" +tokio-stream = "0.1.17" diff --git a/mistral/lightnode/src/da.rs b/mistral/lightnode/src/da.rs new file mode 100644 index 0000000..dba14e1 --- /dev/null +++ b/mistral/lightnode/src/da.rs @@ -0,0 +1,6 @@ +use anyhow::Result; +use nomos_core::da::BlobId; + +pub async fn sampling(_blob_id: BlobId) -> Result<()> { + Ok(()) +} diff --git a/mistral/lightnode/src/lib.rs b/mistral/lightnode/src/lib.rs index 99dbad0..43a21d9 100644 --- a/mistral/lightnode/src/lib.rs +++ b/mistral/lightnode/src/lib.rs @@ -4,12 +4,15 @@ use executor_http_client::{BasicAuthCredentials, Error, ExecutorHttpClient}; use kzgrs_backend::common::share::{DaLightShare, DaShare}; use nomos::{CryptarchiaInfo, HeaderId}; use reqwest::Url; -use tracing::{debug, error, info}; +use serde::{Deserialize, Serialize}; +use anyhow::Result; +use tracing::{debug, info}; pub const CRYPTARCHIA_INFO: &str = "cryptarchia/info"; pub const STORAGE_BLOCK: &str = "storage/block"; use futures::Stream; +pub mod da; pub mod nomos; pub mod proofcheck; @@ -39,7 +42,7 @@ impl NomosClient { } } - pub async fn get_cryptarchia_info(&self) -> Result { + pub async fn get_cryptarchia_info(&self) -> Result { let url = self.base_url.join(CRYPTARCHIA_INFO).expect("Invalid URL"); debug!("Requesting cryptarchia info from {}", url); @@ -48,24 +51,17 @@ impl NomosClient { self.basic_auth.password.as_deref(), ); - let response = request.send().await.map_err(|e| { - error!("Failed to send request: {}", e); - "Failed to send request".to_string() - })?; + let response = request.send().await?; if !response.status().is_success() { - error!("Failed to get cryptarchia info: {}", response.status()); - return Err("Failed to get cryptarchia info".to_string()); + anyhow::bail!("Failed to get cryptarchia info: {}", response.status()); } - let info = response.json::().await.map_err(|e| { - error!("Failed to parse response: {}", e); - "Failed to parse response".to_string() - })?; + let info = response.json::().await?; Ok(info) } - pub async fn get_block(&self, id: HeaderId) -> Result { + pub async fn get_block(&self, id: HeaderId) -> Result { let url = self.base_url.join(STORAGE_BLOCK).expect("Invalid URL"); info!("Requesting block with HeaderId {}", id); @@ -79,22 +75,15 @@ impl NomosClient { ) .body(serde_json::to_string(&id).unwrap()); - let response = request.send().await.map_err(|e| { - error!("Failed to send request: {}", e); - "Failed to send request".to_string() - })?; + let response = request.send().await?; if !response.status().is_success() { - error!("Failed to get block: {}", response.status()); - return Err("Failed to get block".to_string()); + anyhow::bail!("Failed to get block: {}", response.status()); } - let json: serde_json::Value = response.json().await.map_err(|e| { - error!("Failed to parse JSON: {}", e); - "Failed to parse JSON".to_string() - })?; + let block: Block = response.json().await?; - Ok(json) + Ok(block) } pub async fn get_shares( @@ -112,3 +101,8 @@ impl NomosClient { .await } } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Block { + pub blobs: Vec<[u8; 32]>, +} diff --git a/mistral/lightnode/src/main.rs b/mistral/lightnode/src/main.rs index eb3f4a4..3f0a830 100644 --- a/mistral/lightnode/src/main.rs +++ b/mistral/lightnode/src/main.rs @@ -1,11 +1,17 @@ +use alloy::providers::{Provider, ProviderBuilder, WsConnect}; use clap::Parser; -use evm_lightnode::{Credentials, NomosClient, nomos::HeaderId, proofcheck}; +use evm_lightnode::{NomosClient, nomos::HeaderId, proofcheck, Credentials}; +use futures::Stream; use futures::StreamExt; +use nomos_core::da::{BlobId, DaEncoder}; +use serde::{Deserialize, Serialize}; use std::error; -use std::path::PathBuf; -use tracing::info; +use std::num::NonZero; +use std::path::{Path, PathBuf}; use tracing_subscriber::{EnvFilter, fmt}; use url::Url; +use anyhow::Result; +use tokio::time::{sleep, Duration}; #[derive(Parser, Debug)] #[clap(author, version, about = "Light Node validator")] @@ -13,14 +19,17 @@ struct Args { #[clap(long, default_value = "info")] log_level: String, - #[clap(long, default_value = "http://localhost:8546")] + #[clap(long, default_value = "http://localhost:8545")] rpc: Url, + #[clap(long, default_value = "wss://localhost:8546")] + ws_rpc: Url, + #[clap(long, default_value = "http://localhost:8070")] prover_url: Url, - #[clap(long)] - start_block: u64, + #[clap(long, default_value = TESTNET_EXECUTOR)] + nomos_node: Url, #[clap(long, default_value = "10")] batch_size: u64, @@ -40,81 +49,210 @@ async fn main() -> Result<(), Box> { fmt::fmt().with_env_filter(filter).with_target(false).init(); - proofcheck::verify_proof( - args.start_block, + let zone_blocks = follow_sz(args.ws_rpc.clone()).await?; + let (tx, da_blobs) = tokio::sync::mpsc::channel::(MAX_BLOBS); + tokio::spawn(check_blobs( + NomosClient::new( + args.nomos_node.clone(), + Credentials { + username: "user".to_string(), + password: Some("password".to_string()), + }, + ), + tx + )); + + verify_zone_stf( args.batch_size, - &args.rpc, - &args.prover_url, - &args - .zeth_binary_dir - .unwrap_or_else(|| std::env::current_dir().unwrap()) - .join("zeth-ethereum"), + args.rpc.clone(), + args.prover_url.clone(), + args.zeth_binary_dir.as_deref().unwrap_or_else(|| Path::new("zeth")), + zone_blocks, + tokio_stream::wrappers::ReceiverStream::new(da_blobs), ) .await?; - //todo: use check_da_periodically to check for new blocks from da Ok(()) } -#[allow(dead_code)] -async fn check_da_periodically() -> Result<(), Box> { - let url = std::env::var("NOMOS_EXECUTOR").unwrap_or(TESTNET_EXECUTOR.to_string()); - let user = std::env::var("NOMOS_USER").unwrap_or_default(); - let password = std::env::var("NOMOS_PASSWORD").unwrap_or_default(); - let basic_auth = Credentials { - username: user, - password: Some(password), - }; +const MAX_BLOBS: usize = 1 << 10; +const MAX_PROOF_RETRIES: usize = 5; - let nomos_client = NomosClient::new(Url::parse(&url).unwrap(), basic_auth); - - let mut current_tip = HeaderId::default(); +async fn verify_zone_stf( + batch_size: u64, + rpc: Url, + prover_url: Url, + zeth_binary_dir: &Path, + blocks: impl Stream, + included_blobs: impl Stream, +) -> Result<(), Box> { + let mut blobs_on_consensus = lru::LruCache::new(NonZero::new(MAX_BLOBS).unwrap()); + let mut sz_blobs = lru::LruCache::new(NonZero::new(MAX_BLOBS).unwrap()); + tokio::pin!(blocks); + tokio::pin!(included_blobs); loop { - let info = nomos_client.get_cryptarchia_info().await?; + let to_verify = tokio::select! { + Some((block_n, blob_id)) = blocks.next() => { + if let Some(_) = blobs_on_consensus.pop(&blob_id) { + tracing::debug!("Block confirmed on consensus: {:?}", block_n); + Some((block_n, blob_id)) + } else { + if let Some(expired) = sz_blobs.push(blob_id, block_n) { + tracing::warn!("Block was not confirmed on mainnet: {:?}", expired); + } + None + } + } + Some(blob_id) = included_blobs.next() => { + if let Some(block_n) = sz_blobs.pop(&blob_id) { + tracing::debug!("Block confirmed on consensus: {:?}", block_n); + Some((block_n, blob_id)) + } else { + // Blobs coming from other zones are not going to be confirmed and + // should be removed from the cache. + // It is highly unlikely that a zone blob is seen first on consensus and + // later enough on the sz rpc to be dropped from the cache, although + // it is possible. + // Do something here if you want to keep track of those blobs. + let _ = blobs_on_consensus.push(blob_id, ()); + None + } + } + }; - if info.tip != current_tip { - current_tip = info.tip; - info!("New tip: {:?}", current_tip); - let block = nomos_client.get_block(info.tip).await?; - info!("Block: {:?}", block); + if let Some((block_n, blob_id)) = to_verify { + let rpc = rpc.clone(); + let prover_url = prover_url.clone(); + let zeth_binary_dir = zeth_binary_dir.to_path_buf(); + tokio::spawn(async move { + verify_blob( + block_n, + blob_id, + batch_size, + &rpc, + &prover_url, + &zeth_binary_dir, + ) + .await + .unwrap_or_else(|e| { + tracing::error!("Failed to verify blob: {:?}", e); + }); + }); + } + } +} - let blobs = block.get("bl_blobs"); +async fn verify_blob( + block_number: u64, + blob: BlobId, + batch_size: u64, + rpc: &Url, + prover_url: &Url, + zeth_binary_dir: &Path, +) -> Result<()> { + // block are proved in batches, aligned to `batch_size` + let block_number = block_number - block_number % batch_size; + futures::try_join!(evm_lightnode::da::sampling(blob), async { + let mut sleep_time = 1; + for _ in 0..MAX_PROOF_RETRIES { + if proofcheck::verify_proof( + block_number, + batch_size, + rpc, + prover_url, + zeth_binary_dir, + ) + .await.is_ok() { + return Ok(()); + } + + sleep(Duration::from_secs(sleep_time)).await; + sleep_time *= 2; + } + Err(anyhow::anyhow!("Failed to verify proof after {} retries", MAX_PROOF_RETRIES)) + })?; + // TODO: reuse rpc results + Ok(()) +} - if blobs.is_none() { - info!("Parsing error"); - continue; +/// Follow the sovereign zone and return the blob ID of each new produced block +async fn follow_sz( + ws_rpc: Url, +) -> Result, Box> { + let provider = ProviderBuilder::new().on_ws(WsConnect::new(ws_rpc)).await?; + // Pub-sub: get a live stream of blocks + let blocks = provider + .subscribe_full_blocks() + .full() + .into_stream() + .await?; + + Ok(blocks.filter_map(|block| async move { + if let Ok(block) = block { + // poor man's proof of equivalence + use alloy::consensus::{Block, Signed}; + let block: Block> = block.into_consensus().convert_transactions(); + let (data, _) = evm_processor::encode_block(&block.clone().convert_transactions()); + let blob_id = { + let encoder = kzgrs_backend::encoder::DaEncoder::new( + kzgrs_backend::encoder::DaEncoderParams::new( + 2, + false, + kzgrs_backend::global::GLOBAL_PARAMETERS.clone(), + ), + ); + // this is a REALLY heavy task, so we should try not to block the thread here + let heavy_task = tokio::task::spawn_blocking(move || encoder.encode(&data)); + let encoded_data = heavy_task.await.unwrap().unwrap(); + kzgrs_backend::common::build_blob_id( + &encoded_data.aggregated_column_commitment, + &encoded_data.row_commitments, + ) }; - let blobs = blobs.unwrap().as_array().unwrap(); + Some((block.header.number, blob_id)) + } else { + tracing::error!("Failed to get block"); + None + } + })) +} + +#[derive(Debug, Serialize, Deserialize)] +struct CryptarchiaInfo { + tip: HeaderId, +} + +/// Return blobs confirmed on Nomos. +async fn check_blobs( + nomos_client: NomosClient, + sink: tokio::sync::mpsc::Sender, +) -> Result<()> { + // TODO: a good implementation should follow the different forks and react to possible + // reorgs. + // We don't currently habe a food api to follow the chain externally, so for now we're just + // going to simply follow the tip. + let mut current_tip = HeaderId::default(); + loop { + let info = nomos_client.get_cryptarchia_info().await?; + if info.tip != current_tip { + current_tip = info.tip; + tracing::debug!("new tip: {:?}", info.tip); + let blobs = nomos_client.get_block(info.tip).await?.blobs; + if blobs.is_empty() { - info!("No blobs found in block"); + tracing::debug!("No blobs found in block"); continue; } + for blob in blobs { - let id_array = blob.get("id").unwrap().as_array().unwrap(); - - let mut blob_id = [0u8; 32]; - for (i, num) in id_array.iter().enumerate() { - if i < 32 { - blob_id[i] = num.as_u64().unwrap() as u8; - } - } - - info!("fetching stream"); - let shares_stream = nomos_client.get_shares(blob_id).await?; - - let shares = shares_stream.collect::>().await; - - info!("Shares for blob_id {:?}: {:?}", blob_id, shares); - - // todo: verify proof + sink.send(blob).await?; } } else { - info!("No new tip, sleeping..."); + tracing::trace!("No new tip, sleeping..."); } - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; } } diff --git a/mistral/lightnode/src/proofcheck.rs b/mistral/lightnode/src/proofcheck.rs index 13ba32c..73ad811 100644 --- a/mistral/lightnode/src/proofcheck.rs +++ b/mistral/lightnode/src/proofcheck.rs @@ -1,8 +1,9 @@ -use tokio::process::Command; -use std::path::Path; -use std::io::Write; +use anyhow::Result; use reqwest::Url; -use tracing::{error, info}; +use std::io::Write; +use std::path::Path; +use tokio::process::Command; +use tracing::{info}; pub async fn verify_proof( block_number: u64, @@ -10,7 +11,7 @@ pub async fn verify_proof( rpc: &Url, prover_url: &Url, zeth_bin: &Path, -) -> Result<(), String> { +) -> Result<()> { info!( "Verifying proof for blocks {}-{}", block_number, @@ -20,18 +21,11 @@ pub async fn verify_proof( let url = prover_url.join(&format!( "/?block_start={}&block_count={}", block_number, block_count - )).map_err(|e| format!("Failed to construct URL: {}", e))?; - let proof = reqwest::get(url).await - .map_err(|e| format!("Failed to fetch proof: {}", e))? - .bytes() - .await - .map_err(|e| format!("Failed to read proof response: {}", e))?; + ))?; + let proof = reqwest::get(url).await?.bytes().await?; - let mut tempfile = tempfile::NamedTempFile::new() - .map_err(|e| format!("Failed to create temporary file: {}", e))?; - tempfile.write_all(&proof) - .map_err(|e| format!("Failed to write proof to file: {}", e))?; - + let mut tempfile = tempfile::NamedTempFile::new()?; + tempfile.write_all(&proof)?; let output = Command::new(zeth_bin) .args([ @@ -41,16 +35,12 @@ pub async fn verify_proof( &format!("--block-count={}", block_count), &format!("--file={}", tempfile.path().display()), ]) - .output().await - .map_err(|e| format!("Failed to execute zeth-ethereum verify: {}", e))?; + .output() + .await?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); - error!("zeth-ethereum verify command failed: {}", stderr); - return Err(format!( - "zeth-ethereum verify command failed with status: {}\nStderr: {}", - output.status, stderr - )); + anyhow::bail!("zeth-ethereum verify command failed: {}", stderr); } info!("Proof verification successful"); diff --git a/mistral/processor/src/lib.rs b/mistral/processor/src/lib.rs index 602a351..5d1f705 100644 --- a/mistral/processor/src/lib.rs +++ b/mistral/processor/src/lib.rs @@ -5,6 +5,19 @@ use reqwest::Url; use reth_ethereum::Block; use reth_tracing::tracing::{error, info}; +pub fn encode_block(block: &Block) -> (Vec, Metadata) { + let mut blob = bincode::serialize(&block).expect("Failed to serialize block"); + let metadata = Metadata::new([0; 32], block.number.into()); + // the node expects blobs to be padded to the next chunk size + let remainder = blob.len() % DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE; + blob.extend(std::iter::repeat_n( + 0, + DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE - remainder, + )); + + (blob, metadata) +} + pub struct Processor { da: NomosDa, } @@ -16,14 +29,7 @@ impl Processor { pub async fn process_blocks(&mut self, new_blocks: impl Iterator) { for block in new_blocks { - let mut blob = bincode::serialize(&block).expect("Failed to serialize block"); - let metadata = Metadata::new([0; 32], block.number.into()); - // the node expects blobs to be padded to the next chunk size - let remainder = blob.len() % DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE; - blob.extend(std::iter::repeat_n( - 0, - DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE - remainder, - )); + let (blob, metadata) = encode_block(&block); if let Err(e) = self.da.disperse(blob, metadata).await { error!("Failed to disperse block: {e}"); } else { @@ -49,6 +55,8 @@ impl NomosDa { pub async fn disperse(&self, data: Vec, metadata: Metadata) -> Result<[u8; 32], Error> { self.client .publish_blob(self.url.clone(), data, metadata) - .await + .await?; + + Ok([0; 32]) // Placeholder for the actual blob ID } } diff --git a/mistral/prover/src/http.rs b/mistral/prover/src/http.rs index 0eca425..e285aa0 100644 --- a/mistral/prover/src/http.rs +++ b/mistral/prover/src/http.rs @@ -1,4 +1,3 @@ - use axum::{ extract::Query, http::StatusCode, @@ -11,22 +10,22 @@ use tokio::fs; #[derive(Deserialize)] pub struct ProofRequest { block_start: u64, - block_count: u64 + block_count: u64, } - /// Handler for GET / pub async fn serve_proof(Query(query): Query) -> Response { - let file_name = format!("{}-{}.zkp", query.block_start, query.block_count + query.block_start); + let file_name = format!( + "{}-{}.zkp", + query.block_start, + query.block_count + query.block_start + ); let path = PathBuf::from(&file_name); // Read file contents match fs::read(&path).await { - Ok(bytes) => ( - StatusCode::OK, - bytes, - ).into_response(), + Ok(bytes) => (StatusCode::OK, bytes).into_response(), Err(err) => { let status = if err.kind() == std::io::ErrorKind::NotFound { StatusCode::NOT_FOUND @@ -36,4 +35,4 @@ pub async fn serve_proof(Query(query): Query) -> Response { (status, format!("Error reading file: {}", err)).into_response() } } -} \ No newline at end of file +} diff --git a/mistral/prover/src/main.rs b/mistral/prover/src/main.rs index e3c353f..49d01db 100644 --- a/mistral/prover/src/main.rs +++ b/mistral/prover/src/main.rs @@ -1,20 +1,17 @@ +use axum::{Router, routing::get}; use clap::Parser; use reqwest::blocking::Client; -use serde_json::{json, Value}; -use std::{path::PathBuf, process::Command, thread, time::Duration, net::SocketAddr}; +use serde_json::{Value, json}; +use std::{net::SocketAddr, path::PathBuf, process::Command, thread, time::Duration}; use tracing::{debug, error, info}; -use tracing_subscriber::{fmt, EnvFilter}; -use axum::{ - routing::get, - Router, -}; +use tracing_subscriber::{EnvFilter, fmt}; mod http; #[derive(Parser, Debug)] #[clap(author, version, about = "Ethereum Proof Generation Tool")] struct Args { - #[clap(long, default_value = "http://localhost:8546")] + #[clap(long, default_value = "http://localhost:8545")] rpc: String, #[clap(long, default_value = "5")] @@ -179,13 +176,10 @@ fn main() -> Result<(), Box> { } } - - #[tokio::main] async fn run_server() -> Result<(), Box> { // Build our application with a route - let app = Router::new() - .route("/", get(http::serve_proof)); + let app = Router::new().route("/", get(http::serve_proof)); let addr = SocketAddr::from(([127, 0, 0, 1], 8070)); // Run it on localhost:8070 @@ -194,4 +188,4 @@ async fn run_server() -> Result<(), Box> { axum::serve(listener, app).await.unwrap(); Ok(()) -} \ No newline at end of file +} diff --git a/mistral/sequencer-node/src/main.rs b/mistral/sequencer-node/src/main.rs index 39becd8..2607a37 100644 --- a/mistral/sequencer-node/src/main.rs +++ b/mistral/sequencer-node/src/main.rs @@ -51,10 +51,10 @@ fn main() -> eyre::Result<()> { "--rpc.eth-proof-window=2048", "--dev.block-time=2s", "--http.addr=0.0.0.0", - "--http.port=8546", - "--http.api eth,net,web3,debug,trace,txpool", // Some might be unnecessary, but I guess - // they don't hurt - "--http.corsdomain \"*\"" // Needed locally, probably needed here as well. + "--http.api=eth,net,web3,debug,trace,txpool", // Some might be unnecessary, but I guess + "--ws.addr=0.0.0.0", + "--ws.api=eth,net,web3,txpool", + "--http.corsdomain=\"*\"", // Needed locally, probably needed here as well. ]) .unwrap() .run(|builder, _| {