add lightnode

Giacomo Pasini 2025-04-29 12:05:49 +02:00
parent d725b59f0f
commit 488d6c7ade
No known key found for this signature in database
GPG Key ID: FC08489D2D895D4B
9 changed files with 276 additions and 140 deletions

View File

@@ -19,3 +19,10 @@ hex = { version = "0.4" }
futures = { version = "0.3" }
kzgrs-backend = { git = "https://github.com/logos-co/nomos", branch = "master" }
tempfile = "3.19.1"
alloy = { version = "0.13", features = ["full"] }
lru = { version = "0.14" }
nomos-core = { git = "https://github.com/logos-co/nomos", branch = "master" }
reth.workspace = true
evm-processor = { path = "../processor" }
anyhow = "1.0.98"
tokio-stream = "0.1.17"

View File

@@ -0,0 +1,6 @@
use anyhow::Result;
use nomos_core::da::BlobId;
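/// Placeholder: DA sampling is not implemented yet; every blob is accepted as available.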
pub async fn sampling(_blob_id: BlobId) -> Result<()> {
Ok(())
}
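The stub above accepts every blob unconditionally. A real light-node sampler would fetch a few randomly chosen shares of the blob and accept it only if each one verifies. A minimal sketch of that shape, with a hypothetical fetch_and_verify callback standing in for the actual share-fetching client (not part of this crate):

use anyhow::{Result, bail};

/// Sample the given columns of a blob; reject on the first bad or missing share.
pub async fn sample_blob<F, Fut>(
    blob_id: [u8; 32],
    columns: &[usize],
    fetch_and_verify: F,
) -> Result<()>
where
    F: Fn([u8; 32], usize) -> Fut,
    Fut: std::future::Future<Output = Result<bool>>,
{
    for &column in columns {
        // One failed sample is enough to reject: availability requires every
        // sampled share to be retrievable and valid.
        if !fetch_and_verify(blob_id, column).await? {
            bail!("share {} of blob {:?} failed verification", column, blob_id);
        }
    }
    Ok(())
}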

View File

@@ -4,12 +4,15 @@ use executor_http_client::{BasicAuthCredentials, Error, ExecutorHttpClient};
use kzgrs_backend::common::share::{DaLightShare, DaShare};
use nomos::{CryptarchiaInfo, HeaderId};
use reqwest::Url;
use tracing::{debug, error, info};
use serde::{Deserialize, Serialize};
use anyhow::Result;
use tracing::{debug, info};
pub const CRYPTARCHIA_INFO: &str = "cryptarchia/info";
pub const STORAGE_BLOCK: &str = "storage/block";
use futures::Stream;
pub mod da;
pub mod nomos;
pub mod proofcheck;
@@ -39,7 +42,7 @@ impl NomosClient {
}
}
pub async fn get_cryptarchia_info(&self) -> Result<CryptarchiaInfo, String> {
pub async fn get_cryptarchia_info(&self) -> Result<CryptarchiaInfo> {
let url = self.base_url.join(CRYPTARCHIA_INFO).expect("Invalid URL");
debug!("Requesting cryptarchia info from {}", url);
@@ -48,24 +51,17 @@ impl NomosClient {
self.basic_auth.password.as_deref(),
);
let response = request.send().await.map_err(|e| {
error!("Failed to send request: {}", e);
"Failed to send request".to_string()
})?;
let response = request.send().await?;
if !response.status().is_success() {
error!("Failed to get cryptarchia info: {}", response.status());
return Err("Failed to get cryptarchia info".to_string());
anyhow::bail!("Failed to get cryptarchia info: {}", response.status());
}
let info = response.json::<CryptarchiaInfo>().await.map_err(|e| {
error!("Failed to parse response: {}", e);
"Failed to parse response".to_string()
})?;
let info = response.json::<CryptarchiaInfo>().await?;
Ok(info)
}
pub async fn get_block(&self, id: HeaderId) -> Result<serde_json::Value, String> {
pub async fn get_block(&self, id: HeaderId) -> Result<Block> {
let url = self.base_url.join(STORAGE_BLOCK).expect("Invalid URL");
info!("Requesting block with HeaderId {}", id);
@@ -79,22 +75,15 @@ impl NomosClient {
)
.body(serde_json::to_string(&id).unwrap());
let response = request.send().await.map_err(|e| {
error!("Failed to send request: {}", e);
"Failed to send request".to_string()
})?;
let response = request.send().await?;
if !response.status().is_success() {
error!("Failed to get block: {}", response.status());
return Err("Failed to get block".to_string());
anyhow::bail!("Failed to get block: {}", response.status());
}
let json: serde_json::Value = response.json().await.map_err(|e| {
error!("Failed to parse JSON: {}", e);
"Failed to parse JSON".to_string()
})?;
let block: Block = response.json().await?;
Ok(json)
Ok(block)
}
pub async fn get_shares(
@@ -112,3 +101,8 @@ impl NomosClient {
.await
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Block {
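// Only the blob ids are needed by the light node; any other fields in the
// block JSON are ignored during deserialization.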
pub blobs: Vec<[u8; 32]>,
}
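For reference, a minimal sketch of driving the reworked anyhow-based client, mirroring the tip-polling loop in main.rs (the credentials are the same placeholder values used there, and get_shares is assumed to take a 32-byte blob id, as in the old main loop):

use anyhow::Result;
use reqwest::Url;

async fn poll_tip_once(base: Url) -> Result<()> {
    let client = NomosClient::new(
        base,
        Credentials {
            username: "user".into(),
            password: Some("password".into()),
        },
    );
    let info = client.get_cryptarchia_info().await?;
    // Fetch the tip block and walk its blob ids.
    let block = client.get_block(info.tip).await?;
    for blob_id in block.blobs {
        let _shares = client.get_shares(blob_id).await?;
    }
    Ok(())
}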

View File

@@ -1,11 +1,17 @@
use alloy::providers::{Provider, ProviderBuilder, WsConnect};
use clap::Parser;
use evm_lightnode::{Credentials, NomosClient, nomos::HeaderId, proofcheck};
use evm_lightnode::{NomosClient, nomos::HeaderId, proofcheck, Credentials};
use futures::Stream;
use futures::StreamExt;
use nomos_core::da::{BlobId, DaEncoder};
use serde::{Deserialize, Serialize};
use std::error;
use std::path::PathBuf;
use tracing::info;
use std::num::NonZero;
use std::path::{Path, PathBuf};
use tracing_subscriber::{EnvFilter, fmt};
use url::Url;
use anyhow::Result;
use tokio::time::{sleep, Duration};
#[derive(Parser, Debug)]
#[clap(author, version, about = "Light Node validator")]
@@ -13,14 +19,17 @@ struct Args {
#[clap(long, default_value = "info")]
log_level: String,
#[clap(long, default_value = "http://localhost:8546")]
#[clap(long, default_value = "http://localhost:8545")]
rpc: Url,
#[clap(long, default_value = "wss://localhost:8546")]
ws_rpc: Url,
#[clap(long, default_value = "http://localhost:8070")]
prover_url: Url,
#[clap(long)]
start_block: u64,
#[clap(long, default_value = TESTNET_EXECUTOR)]
nomos_node: Url,
#[clap(long, default_value = "10")]
batch_size: u64,
@@ -40,81 +49,210 @@ async fn main() -> Result<(), Box<dyn error::Error>> {
fmt::fmt().with_env_filter(filter).with_target(false).init();
proofcheck::verify_proof(
args.start_block,
let zone_blocks = follow_sz(args.ws_rpc.clone()).await?;
let (tx, da_blobs) = tokio::sync::mpsc::channel::<BlobId>(MAX_BLOBS);
tokio::spawn(check_blobs(
NomosClient::new(
args.nomos_node.clone(),
Credentials {
username: "user".to_string(),
password: Some("password".to_string()),
},
),
tx
));
verify_zone_stf(
args.batch_size,
&args.rpc,
&args.prover_url,
&args
.zeth_binary_dir
.unwrap_or_else(|| std::env::current_dir().unwrap())
.join("zeth-ethereum"),
args.rpc.clone(),
args.prover_url.clone(),
args.zeth_binary_dir.as_deref().unwrap_or_else(|| Path::new("zeth")),
zone_blocks,
tokio_stream::wrappers::ReceiverStream::new(da_blobs),
)
.await?;
// TODO: use check_da_periodically to check for new blocks from DA
Ok(())
}
#[allow(dead_code)]
async fn check_da_periodically() -> Result<(), Box<dyn error::Error>> {
let url = std::env::var("NOMOS_EXECUTOR").unwrap_or(TESTNET_EXECUTOR.to_string());
let user = std::env::var("NOMOS_USER").unwrap_or_default();
let password = std::env::var("NOMOS_PASSWORD").unwrap_or_default();
let basic_auth = Credentials {
username: user,
password: Some(password),
};
const MAX_BLOBS: usize = 1 << 10;
const MAX_PROOF_RETRIES: usize = 5;
let nomos_client = NomosClient::new(Url::parse(&url).unwrap(), basic_auth);
let mut current_tip = HeaderId::default();
async fn verify_zone_stf(
batch_size: u64,
rpc: Url,
prover_url: Url,
zeth_binary_dir: &Path,
blocks: impl Stream<Item = (u64, BlobId)>,
included_blobs: impl Stream<Item = BlobId>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut blobs_on_consensus = lru::LruCache::new(NonZero::new(MAX_BLOBS).unwrap());
let mut sz_blobs = lru::LruCache::new(NonZero::new(MAX_BLOBS).unwrap());
tokio::pin!(blocks);
tokio::pin!(included_blobs);
loop {
let info = nomos_client.get_cryptarchia_info().await?;
let to_verify = tokio::select! {
Some((block_n, blob_id)) = blocks.next() => {
if let Some(_) = blobs_on_consensus.pop(&blob_id) {
tracing::debug!("Block confirmed on consensus: {:?}", block_n);
Some((block_n, blob_id))
} else {
if let Some(expired) = sz_blobs.push(blob_id, block_n) {
tracing::warn!("Block was not confirmed on mainnet: {:?}", expired);
}
None
}
}
Some(blob_id) = included_blobs.next() => {
if let Some(block_n) = sz_blobs.pop(&blob_id) {
tracing::debug!("Block confirmed on consensus: {:?}", block_n);
Some((block_n, blob_id))
} else {
// Blobs coming from other zones are not going to be confirmed and
// should be removed from the cache.
// It is highly unlikely that a zone blob is seen on consensus first and
// only shows up on the sz rpc after it has been dropped from the cache,
// although it is possible.
// Do something here if you want to keep track of those blobs.
let _ = blobs_on_consensus.push(blob_id, ());
None
}
}
};
if info.tip != current_tip {
current_tip = info.tip;
info!("New tip: {:?}", current_tip);
let block = nomos_client.get_block(info.tip).await?;
info!("Block: {:?}", block);
if let Some((block_n, blob_id)) = to_verify {
let rpc = rpc.clone();
let prover_url = prover_url.clone();
let zeth_binary_dir = zeth_binary_dir.to_path_buf();
tokio::spawn(async move {
verify_blob(
block_n,
blob_id,
batch_size,
&rpc,
&prover_url,
&zeth_binary_dir,
)
.await
.unwrap_or_else(|e| {
tracing::error!("Failed to verify blob: {:?}", e);
});
});
}
}
}
let blobs = block.get("bl_blobs");
async fn verify_blob(
block_number: u64,
blob: BlobId,
batch_size: u64,
rpc: &Url,
prover_url: &Url,
zeth_binary_dir: &Path,
) -> Result<()> {
// blocks are proved in batches, aligned to `batch_size`
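// e.g. with batch_size = 10, blocks 50..60 all align down to 50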
let block_number = block_number - block_number % batch_size;
futures::try_join!(evm_lightnode::da::sampling(blob), async {
let mut sleep_time = 1;
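// exponential backoff: wait 1s, 2s, 4s, ... between failed proof checks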
for _ in 0..MAX_PROOF_RETRIES {
if proofcheck::verify_proof(
block_number,
batch_size,
rpc,
prover_url,
zeth_binary_dir,
)
.await.is_ok() {
return Ok(());
}
sleep(Duration::from_secs(sleep_time)).await;
sleep_time *= 2;
}
Err(anyhow::anyhow!("Failed to verify proof after {} retries", MAX_PROOF_RETRIES))
})?;
// TODO: reuse rpc results
Ok(())
}
if blobs.is_none() {
info!("Parsing error");
continue;
/// Follow the sovereign zone and return the blob ID of each newly produced block
async fn follow_sz(
ws_rpc: Url,
) -> Result<impl Stream<Item = (u64, BlobId)>, Box<dyn error::Error>> {
let provider = ProviderBuilder::new().on_ws(WsConnect::new(ws_rpc)).await?;
// Pub-sub: get a live stream of blocks
let blocks = provider
.subscribe_full_blocks()
.full()
.into_stream()
.await?;
Ok(blocks.filter_map(|block| async move {
if let Ok(block) = block {
// poor man's proof of equivalence
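// Instead of verifying a full equivalence proof, re-encode the block
// locally and derive its blob id: if it matches an id later confirmed on
// DA, the dispersed data must be exactly this block.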
use alloy::consensus::{Block, Signed};
let block: Block<Signed<_>> = block.into_consensus().convert_transactions();
let (data, _) = evm_processor::encode_block(&block.clone().convert_transactions());
let blob_id = {
let encoder = kzgrs_backend::encoder::DaEncoder::new(
kzgrs_backend::encoder::DaEncoderParams::new(
2,
false,
kzgrs_backend::global::GLOBAL_PARAMETERS.clone(),
),
);
// this is a REALLY heavy task, so we should try not to block the thread here
let heavy_task = tokio::task::spawn_blocking(move || encoder.encode(&data));
let encoded_data = heavy_task.await.unwrap().unwrap();
kzgrs_backend::common::build_blob_id(
&encoded_data.aggregated_column_commitment,
&encoded_data.row_commitments,
)
};
let blobs = blobs.unwrap().as_array().unwrap();
Some((block.header.number, blob_id))
} else {
tracing::error!("Failed to get block");
None
}
}))
}
#[derive(Debug, Serialize, Deserialize)]
struct CryptarchiaInfo {
tip: HeaderId,
}
/// Return blobs confirmed on Nomos.
async fn check_blobs(
nomos_client: NomosClient,
sink: tokio::sync::mpsc::Sender<BlobId>,
) -> Result<()> {
// TODO: a good implementation should follow the different forks and react to possible
// reorgs.
// We don't currently have a good API to follow the chain externally, so for
// now we simply follow the tip.
let mut current_tip = HeaderId::default();
loop {
let info = nomos_client.get_cryptarchia_info().await?;
if info.tip != current_tip {
current_tip = info.tip;
tracing::debug!("new tip: {:?}", info.tip);
let blobs = nomos_client.get_block(info.tip).await?.blobs;
if blobs.is_empty() {
info!("No blobs found in block");
tracing::debug!("No blobs found in block");
continue;
}
for blob in blobs {
let id_array = blob.get("id").unwrap().as_array().unwrap();
let mut blob_id = [0u8; 32];
for (i, num) in id_array.iter().enumerate() {
if i < 32 {
blob_id[i] = num.as_u64().unwrap() as u8;
}
}
info!("fetching stream");
let shares_stream = nomos_client.get_shares(blob_id).await?;
let shares = shares_stream.collect::<Vec<_>>().await;
info!("Shares for blob_id {:?}: {:?}", blob_id, shares);
// todo: verify proof
sink.send(blob).await?;
}
} else {
info!("No new tip, sleeping...");
tracing::trace!("No new tip, sleeping...");
}
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
}
}

View File

@@ -1,8 +1,9 @@
use tokio::process::Command;
use std::path::Path;
use std::io::Write;
use anyhow::Result;
use reqwest::Url;
use tracing::{error, info};
use std::io::Write;
use std::path::Path;
use tokio::process::Command;
use tracing::{info};
pub async fn verify_proof(
block_number: u64,
@@ -10,7 +11,7 @@ pub async fn verify_proof(
rpc: &Url,
prover_url: &Url,
zeth_bin: &Path,
) -> Result<(), String> {
) -> Result<()> {
info!(
"Verifying proof for blocks {}-{}",
block_number,
@@ -20,18 +21,11 @@ pub async fn verify_proof(
let url = prover_url.join(&format!(
"/?block_start={}&block_count={}",
block_number, block_count
)).map_err(|e| format!("Failed to construct URL: {}", e))?;
let proof = reqwest::get(url).await
.map_err(|e| format!("Failed to fetch proof: {}", e))?
.bytes()
.await
.map_err(|e| format!("Failed to read proof response: {}", e))?;
))?;
let proof = reqwest::get(url).await?.bytes().await?;
let mut tempfile = tempfile::NamedTempFile::new()
.map_err(|e| format!("Failed to create temporary file: {}", e))?;
tempfile.write_all(&proof)
.map_err(|e| format!("Failed to write proof to file: {}", e))?;
let mut tempfile = tempfile::NamedTempFile::new()?;
tempfile.write_all(&proof)?;
let output = Command::new(zeth_bin)
.args([
@@ -41,16 +35,12 @@ pub async fn verify_proof(
&format!("--block-count={}", block_count),
&format!("--file={}", tempfile.path().display()),
])
.output().await
.map_err(|e| format!("Failed to execute zeth-ethereum verify: {}", e))?;
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
error!("zeth-ethereum verify command failed: {}", stderr);
return Err(format!(
"zeth-ethereum verify command failed with status: {}\nStderr: {}",
output.status, stderr
));
anyhow::bail!("zeth-ethereum verify command failed: {}", stderr);
}
info!("Proof verification successful");

View File

@@ -5,6 +5,19 @@ use reqwest::Url;
use reth_ethereum::Block;
use reth_tracing::tracing::{error, info};
pub fn encode_block(block: &Block) -> (Vec<u8>, Metadata) {
let mut blob = bincode::serialize(&block).expect("Failed to serialize block");
let metadata = Metadata::new([0; 32], block.number.into());
// the node expects blobs to be padded to the next chunk size
let remainder = blob.len() % DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE;
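// e.g. if the chunk size is 31 bytes, a 100-byte blob (remainder 7) gets 24
// zero bytes appended; note an already-aligned blob still gains a full chunk.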
blob.extend(std::iter::repeat_n(
0,
DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE - remainder,
));
(blob, metadata)
}
pub struct Processor {
da: NomosDa,
}
@@ -16,14 +29,7 @@ impl Processor {
pub async fn process_blocks(&mut self, new_blocks: impl Iterator<Item = Block>) {
for block in new_blocks {
let mut blob = bincode::serialize(&block).expect("Failed to serialize block");
let metadata = Metadata::new([0; 32], block.number.into());
// the node expects blobs to be padded to the next chunk size
let remainder = blob.len() % DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE;
blob.extend(std::iter::repeat_n(
0,
DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE - remainder,
));
let (blob, metadata) = encode_block(&block);
if let Err(e) = self.da.disperse(blob, metadata).await {
error!("Failed to disperse block: {e}");
} else {
@@ -49,6 +55,8 @@ impl NomosDa {
pub async fn disperse(&self, data: Vec<u8>, metadata: Metadata) -> Result<[u8; 32], Error> {
self.client
.publish_blob(self.url.clone(), data, metadata)
.await
.await?;
Ok([0; 32]) // Placeholder for the actual blob ID
}
}

View File

@@ -1,4 +1,3 @@
use axum::{
extract::Query,
http::StatusCode,
@@ -11,22 +10,22 @@ use tokio::fs;
#[derive(Deserialize)]
pub struct ProofRequest {
block_start: u64,
block_count: u64
block_count: u64,
}
/// Handler for GET /
pub async fn serve_proof(Query(query): Query<ProofRequest>) -> Response {
let file_name = format!("{}-{}.zkp", query.block_start, query.block_count + query.block_start);
let file_name = format!(
"{}-{}.zkp",
query.block_start,
query.block_count + query.block_start
);
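// e.g. block_start=50, block_count=10 resolves to "50-60.zkp"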
let path = PathBuf::from(&file_name);
// Read file contents
match fs::read(&path).await {
Ok(bytes) => (
StatusCode::OK,
bytes,
).into_response(),
Ok(bytes) => (StatusCode::OK, bytes).into_response(),
Err(err) => {
let status = if err.kind() == std::io::ErrorKind::NotFound {
StatusCode::NOT_FOUND
@@ -36,4 +35,4 @@ pub async fn serve_proof(Query(query): Query<ProofRequest>) -> Response {
(status, format!("Error reading file: {}", err)).into_response()
}
}
}
}

View File

@@ -1,20 +1,17 @@
use axum::{Router, routing::get};
use clap::Parser;
use reqwest::blocking::Client;
use serde_json::{json, Value};
use std::{path::PathBuf, process::Command, thread, time::Duration, net::SocketAddr};
use serde_json::{Value, json};
use std::{net::SocketAddr, path::PathBuf, process::Command, thread, time::Duration};
use tracing::{debug, error, info};
use tracing_subscriber::{fmt, EnvFilter};
use axum::{
routing::get,
Router,
};
use tracing_subscriber::{EnvFilter, fmt};
mod http;
#[derive(Parser, Debug)]
#[clap(author, version, about = "Ethereum Proof Generation Tool")]
struct Args {
#[clap(long, default_value = "http://localhost:8546")]
#[clap(long, default_value = "http://localhost:8545")]
rpc: String,
#[clap(long, default_value = "5")]
@@ -179,13 +176,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}
#[tokio::main]
async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
// Build our application with a route
let app = Router::new()
.route("/", get(http::serve_proof));
let app = Router::new().route("/", get(http::serve_proof));
let addr = SocketAddr::from(([127, 0, 0, 1], 8070));
// Run it on localhost:8070
@@ -194,4 +188,4 @@ async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
axum::serve(listener, app).await.unwrap();
Ok(())
}
}

View File

@@ -51,10 +51,10 @@ fn main() -> eyre::Result<()> {
"--rpc.eth-proof-window=2048",
"--dev.block-time=2s",
"--http.addr=0.0.0.0",
"--http.port=8546",
"--http.api eth,net,web3,debug,trace,txpool", // Some might be unnecessary, but I guess
// they don't hurt
"--http.corsdomain \"*\"" // Needed locally, probably needed here as well.
"--http.api=eth,net,web3,debug,trace,txpool", // Some might be unnecessary, but I guess
"--ws.addr=0.0.0.0",
"--ws.api=eth,net,web3,txpool",
"--http.corsdomain=\"*\"", // Needed locally, probably needed here as well.
])
.unwrap()
.run(|builder, _| {