[SZ]: Publish blocks to da (#65)

* Publish blocks to da

* remove leftover comment
This commit is contained in:
Giacomo Pasini 2025-04-16 10:33:37 +02:00 committed by GitHub
parent 609aa9f880
commit 670a7568a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 90 additions and 31 deletions

View File

@ -1,5 +1,5 @@
[workspace]
members = ["evm/aggregator", "evm/sequencer-node"]
members = ["evm/processor", "evm/sequencer-node"]
resolver = "3"
[workspace.package]
@ -7,7 +7,7 @@ edition = "2024"
[workspace.dependencies]
# Internal
evm-aggregator = { path = "evm/aggregator" }
evm-processor = { path = "evm/processor" }
evm-sequencer-node = { path = "evm/sequencer-node" }
# External

View File

@ -1,6 +0,0 @@
[package]
name = "evm-aggregator"
edition = { workspace = true }
[dependencies]
reth-ethereum = { workspace = true }

View File

@ -1,14 +0,0 @@
use reth_ethereum::Block;
// TODO: The logic to batch multiple of these blocks (or the transactions within them) and send them to DA and generate proofs is still missing. It will have to be added at the offsite.
// This type does not support any recovery mechanism, so if the node is stopped, the state DB should be cleaned before starting again. The folder is specified by the `--datadir` option in the binary.
#[derive(Default)]
pub struct Aggregator {
unprocessed_blocks: Vec<Block>,
}
impl Aggregator {
pub fn process_blocks(&mut self, new_blocks: impl Iterator<Item = Block>) {
self.unprocessed_blocks.extend(new_blocks);
}
}

View File

@ -0,0 +1,11 @@
[package]
name = "evm-processor"
edition = { workspace = true }
[dependencies]
reth-ethereum = { workspace = true }
executor-http-client = { git = "https://github.com/logos-co/nomos", branch = "master" }
reqwest = "0.11"
kzgrs-backend = { git = "https://github.com/logos-co/nomos", branch = "master" }
bincode = "1"
reth-tracing = { workspace = true }

View File

@ -0,0 +1,61 @@
use reth_ethereum::Block;
use executor_http_client::{ExecutorHttpClient, Error};
pub use executor_http_client::BasicAuthCredentials;
use reqwest::Url;
use reth_tracing::tracing::{info, error};
use kzgrs_backend::{dispersal::Metadata, encoder::DaEncoderParams};
pub struct Processor {
da: NomosDa,
}
impl Processor {
pub fn new(da: NomosDa) -> Self {
Self {
da
}
}
pub async fn process_blocks(&mut self, new_blocks: impl Iterator<Item = Block>) {
for block in new_blocks {
let mut blob = bincode::serialize(&block).expect("Failed to serialize block");
let metadata = Metadata::new([0; 32], block.number.into());
// the node expects blobs to be padded to the next chunk size
let remainder = blob.len() % DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE;
blob.extend(
std::iter::repeat(0)
.take(DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE - remainder),
);
if let Err(e) = self.da.disperse(blob, metadata).await
{
error!("Failed to disperse block: {e}");
} else {
info!("Dispersed block: {:?}", block);
}
}
}
}
pub struct NomosDa {
url: Url,
client: ExecutorHttpClient,
}
impl NomosDa {
pub fn new(basic_auth: BasicAuthCredentials, url: Url) -> Self {
Self {
client: ExecutorHttpClient::new(Some(basic_auth)),
url,
}
}
pub async fn disperse(&self, data: Vec<u8>, metadata: Metadata) -> Result<(), Error> {
self.client
.publish_blob(self.url.clone(), data, metadata).await
}
}

View File

@ -4,7 +4,7 @@ edition = { workspace = true }
[dependencies]
evm-aggregator = { workspace = true }
evm-processor = { workspace = true }
eyre = { workspace = true }
futures = { workspace = true }
@ -12,3 +12,4 @@ reth = { workspace = true }
reth-ethereum = { workspace = true, features = ["full"] }
reth-ethereum-primitives = { workspace = true }
reth-tracing = { workspace = true }
url = { version = "2" }

View File

@ -1,4 +1,4 @@
use evm_aggregator::Aggregator;
use evm_processor::{Processor, NomosDa, BasicAuthCredentials};
use futures::TryStreamExt as _;
use reth::{
api::{FullNodeTypes, NodePrimitives, NodeTypes},
@ -10,9 +10,11 @@ use reth_ethereum::{
};
use reth_tracing::tracing::info;
async fn aggregate_block_txs<Node: FullNodeComponents>(
const TESTNET_EXECUTOR: &str = "https://testnet.nomos.tech/node/3/";
async fn process_blocks<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>,
mut aggregator: Aggregator,
mut processor: Processor,
) -> eyre::Result<()>
where
<<Node as FullNodeTypes>::Types as NodeTypes>::Primitives:
@ -23,13 +25,13 @@ where
continue;
};
info!(committed_chain = ?new.range(), "Received commit");
aggregator.process_blocks(
processor.process_blocks(
new.inner()
.0
.clone()
.into_blocks()
.map(reth_ethereum::primitives::RecoveredBlock::into_block),
);
).await;
ctx.events
.send(ExExEvent::FinishedHeight(new.tip().num_hash()))
@ -51,12 +53,16 @@ fn main() -> eyre::Result<()> {
.unwrap()
.run(|builder, _| {
Box::pin(async move {
let aggregator = Aggregator::default();
let url = std::env::var("NOMOS_EXECUTOR").unwrap_or(TESTNET_EXECUTOR.to_string());
let user = std::env::var("NOMOS_USER").unwrap_or_default();
let password = std::env::var("NOMOS_PASSWORD").unwrap_or_default();
let da = NomosDa::new( BasicAuthCredentials::new(user, Some(password)), url::Url::parse(&url).unwrap());
let processor = Processor::new(da);
let handle = Box::pin(
builder
.node(EthereumNode::default())
.install_exex("aggregate-block-txs", async move |ctx| {
Ok(aggregate_block_txs(ctx, aggregator))
.install_exex("process-block", async move |ctx| {
Ok(process_blocks(ctx, processor))
})
.launch(),
)