From 670a7568a57e4efb9d63bf6e5f50d6e007dac315 Mon Sep 17 00:00:00 2001 From: Giacomo Pasini <21265557+zeegomo@users.noreply.github.com> Date: Wed, 16 Apr 2025 10:33:37 +0200 Subject: [PATCH] [SZ]: Publish blocks to da (#65) * Publish blocks to da * remove leftover comment --- sz-poc-offsite-2025/Cargo.toml | 4 +- sz-poc-offsite-2025/evm/aggregator/Cargo.toml | 6 -- sz-poc-offsite-2025/evm/aggregator/src/lib.rs | 14 ----- sz-poc-offsite-2025/evm/processor/Cargo.toml | 11 ++++ sz-poc-offsite-2025/evm/processor/src/lib.rs | 61 +++++++++++++++++++ .../evm/sequencer-node/Cargo.toml | 3 +- .../evm/sequencer-node/src/main.rs | 22 ++++--- 7 files changed, 90 insertions(+), 31 deletions(-) delete mode 100644 sz-poc-offsite-2025/evm/aggregator/Cargo.toml delete mode 100644 sz-poc-offsite-2025/evm/aggregator/src/lib.rs create mode 100644 sz-poc-offsite-2025/evm/processor/Cargo.toml create mode 100644 sz-poc-offsite-2025/evm/processor/src/lib.rs diff --git a/sz-poc-offsite-2025/Cargo.toml b/sz-poc-offsite-2025/Cargo.toml index 9e9d834..fd41978 100644 --- a/sz-poc-offsite-2025/Cargo.toml +++ b/sz-poc-offsite-2025/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["evm/aggregator", "evm/sequencer-node"] +members = ["evm/processor", "evm/sequencer-node"] resolver = "3" [workspace.package] @@ -7,7 +7,7 @@ edition = "2024" [workspace.dependencies] # Internal -evm-aggregator = { path = "evm/aggregator" } +evm-processor = { path = "evm/processor" } evm-sequencer-node = { path = "evm/sequencer-node" } # External diff --git a/sz-poc-offsite-2025/evm/aggregator/Cargo.toml b/sz-poc-offsite-2025/evm/aggregator/Cargo.toml deleted file mode 100644 index 4cf5bf8..0000000 --- a/sz-poc-offsite-2025/evm/aggregator/Cargo.toml +++ /dev/null @@ -1,6 +0,0 @@ -[package] -name = "evm-aggregator" -edition = { workspace = true } - -[dependencies] -reth-ethereum = { workspace = true } diff --git a/sz-poc-offsite-2025/evm/aggregator/src/lib.rs b/sz-poc-offsite-2025/evm/aggregator/src/lib.rs deleted file mode 100644 index 27bc9f4..0000000 --- a/sz-poc-offsite-2025/evm/aggregator/src/lib.rs +++ /dev/null @@ -1,14 +0,0 @@ -use reth_ethereum::Block; - -// TODO: The logic to batch multiple of these blocks (or the transactions within them) and send them to DA and generate proofs is still missing. It will have to be added at the offsite. -// This type does not support any recovery mechanism, so if the node is stopped, the state DB should be cleaned before starting again. The folder is specified by the `--datadir` option in the binary. -#[derive(Default)] -pub struct Aggregator { - unprocessed_blocks: Vec, -} - -impl Aggregator { - pub fn process_blocks(&mut self, new_blocks: impl Iterator) { - self.unprocessed_blocks.extend(new_blocks); - } -} diff --git a/sz-poc-offsite-2025/evm/processor/Cargo.toml b/sz-poc-offsite-2025/evm/processor/Cargo.toml new file mode 100644 index 0000000..e82fcd9 --- /dev/null +++ b/sz-poc-offsite-2025/evm/processor/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "evm-processor" +edition = { workspace = true } + +[dependencies] +reth-ethereum = { workspace = true } +executor-http-client = { git = "https://github.com/logos-co/nomos", branch = "master" } +reqwest = "0.11" +kzgrs-backend = { git = "https://github.com/logos-co/nomos", branch = "master" } +bincode = "1" +reth-tracing = { workspace = true } \ No newline at end of file diff --git a/sz-poc-offsite-2025/evm/processor/src/lib.rs b/sz-poc-offsite-2025/evm/processor/src/lib.rs new file mode 100644 index 0000000..f16c338 --- /dev/null +++ b/sz-poc-offsite-2025/evm/processor/src/lib.rs @@ -0,0 +1,61 @@ +use reth_ethereum::Block; +use executor_http_client::{ExecutorHttpClient, Error}; +pub use executor_http_client::BasicAuthCredentials; +use reqwest::Url; +use reth_tracing::tracing::{info, error}; +use kzgrs_backend::{dispersal::Metadata, encoder::DaEncoderParams}; + +pub struct Processor { + da: NomosDa, +} + +impl Processor { + pub fn new(da: NomosDa) -> Self { + Self { + da + } + } + + pub async fn process_blocks(&mut self, new_blocks: impl Iterator) { + for block in new_blocks { + let mut blob = bincode::serialize(&block).expect("Failed to serialize block"); + let metadata = Metadata::new([0; 32], block.number.into()); + // the node expects blobs to be padded to the next chunk size + let remainder = blob.len() % DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE; + blob.extend( + std::iter::repeat(0) + .take(DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE - remainder), + ); + if let Err(e) = self.da.disperse(blob, metadata).await + { + error!("Failed to disperse block: {e}"); + } else { + info!("Dispersed block: {:?}", block); + } + } + } +} + + + +pub struct NomosDa { + url: Url, + client: ExecutorHttpClient, +} + + + +impl NomosDa { + pub fn new(basic_auth: BasicAuthCredentials, url: Url) -> Self { + Self { + client: ExecutorHttpClient::new(Some(basic_auth)), + url, + } + } + + pub async fn disperse(&self, data: Vec, metadata: Metadata) -> Result<(), Error> { + self.client + .publish_blob(self.url.clone(), data, metadata).await + + } +} \ No newline at end of file diff --git a/sz-poc-offsite-2025/evm/sequencer-node/Cargo.toml b/sz-poc-offsite-2025/evm/sequencer-node/Cargo.toml index e76b274..0b5d940 100644 --- a/sz-poc-offsite-2025/evm/sequencer-node/Cargo.toml +++ b/sz-poc-offsite-2025/evm/sequencer-node/Cargo.toml @@ -4,7 +4,7 @@ edition = { workspace = true } [dependencies] -evm-aggregator = { workspace = true } +evm-processor = { workspace = true } eyre = { workspace = true } futures = { workspace = true } @@ -12,3 +12,4 @@ reth = { workspace = true } reth-ethereum = { workspace = true, features = ["full"] } reth-ethereum-primitives = { workspace = true } reth-tracing = { workspace = true } +url = { version = "2" } diff --git a/sz-poc-offsite-2025/evm/sequencer-node/src/main.rs b/sz-poc-offsite-2025/evm/sequencer-node/src/main.rs index 2de08b5..4927f8b 100644 --- a/sz-poc-offsite-2025/evm/sequencer-node/src/main.rs +++ b/sz-poc-offsite-2025/evm/sequencer-node/src/main.rs @@ -1,4 +1,4 @@ -use evm_aggregator::Aggregator; +use evm_processor::{Processor, NomosDa, BasicAuthCredentials}; use futures::TryStreamExt as _; use reth::{ api::{FullNodeTypes, NodePrimitives, NodeTypes}, @@ -10,9 +10,11 @@ use reth_ethereum::{ }; use reth_tracing::tracing::info; -async fn aggregate_block_txs( +const TESTNET_EXECUTOR: &str = "https://testnet.nomos.tech/node/3/"; + +async fn process_blocks( mut ctx: ExExContext, - mut aggregator: Aggregator, + mut processor: Processor, ) -> eyre::Result<()> where <::Types as NodeTypes>::Primitives: @@ -23,13 +25,13 @@ where continue; }; info!(committed_chain = ?new.range(), "Received commit"); - aggregator.process_blocks( + processor.process_blocks( new.inner() .0 .clone() .into_blocks() .map(reth_ethereum::primitives::RecoveredBlock::into_block), - ); + ).await; ctx.events .send(ExExEvent::FinishedHeight(new.tip().num_hash())) @@ -51,12 +53,16 @@ fn main() -> eyre::Result<()> { .unwrap() .run(|builder, _| { Box::pin(async move { - let aggregator = Aggregator::default(); + let url = std::env::var("NOMOS_EXECUTOR").unwrap_or(TESTNET_EXECUTOR.to_string()); + let user = std::env::var("NOMOS_USER").unwrap_or_default(); + let password = std::env::var("NOMOS_PASSWORD").unwrap_or_default(); + let da = NomosDa::new( BasicAuthCredentials::new(user, Some(password)), url::Url::parse(&url).unwrap()); + let processor = Processor::new(da); let handle = Box::pin( builder .node(EthereumNode::default()) - .install_exex("aggregate-block-txs", async move |ctx| { - Ok(aggregate_block_txs(ctx, aggregator)) + .install_exex("process-block", async move |ctx| { + Ok(process_blocks(ctx, processor)) }) .launch(), )