skeleton repo for tutorial

This commit is contained in:
kashepavadan 2025-09-19 14:57:37 -04:00
parent 2003a72b04
commit 89f29f56c6
9 changed files with 9 additions and 764 deletions

View File

@ -1,6 +1,4 @@
use anyhow::Result;
use nomos_core::da::BlobId;
pub async fn sampling(_blob_id: BlobId) -> Result<()> {
Ok(())
}
// Tutorial code

View File

@ -8,101 +8,4 @@ use serde::{Deserialize, Serialize};
use anyhow::Result;
use tracing::{debug, info};
pub const CRYPTARCHIA_INFO: &str = "cryptarchia/info";
pub const STORAGE_BLOCK: &str = "storage/block";
use futures::Stream;
pub mod da;
pub mod nomos;
pub mod proofcheck;
#[derive(Clone, Debug)]
pub struct Credentials {
pub username: String,
pub password: Option<String>,
}
pub struct NomosClient {
base_url: Url,
reqwest_client: reqwest::Client,
basic_auth: Credentials,
nomos_client: ExecutorHttpClient,
}
impl NomosClient {
pub fn new(base_url: Url, basic_auth: Credentials) -> Self {
Self {
base_url,
reqwest_client: reqwest::Client::new(),
basic_auth: basic_auth.clone(),
nomos_client: ExecutorHttpClient::new(Some(BasicAuthCredentials::new(
basic_auth.username,
basic_auth.password,
))),
}
}
pub async fn get_cryptarchia_info(&self) -> Result<CryptarchiaInfo> {
let url = self.base_url.join(CRYPTARCHIA_INFO).expect("Invalid URL");
debug!("Requesting cryptarchia info from {}", url);
let request = self.reqwest_client.get(url).basic_auth(
&self.basic_auth.username,
self.basic_auth.password.as_deref(),
);
let response = request.send().await?;
if !response.status().is_success() {
anyhow::bail!("Failed to get cryptarchia info: {}", response.status());
}
let info = response.json::<CryptarchiaInfo>().await?;
Ok(info)
}
pub async fn get_block(&self, id: HeaderId) -> Result<Block> {
let url = self.base_url.join(STORAGE_BLOCK).expect("Invalid URL");
info!("Requesting block with HeaderId {}", id);
let request = self
.reqwest_client
.post(url)
.header("Content-Type", "application/json")
.basic_auth(
&self.basic_auth.username,
self.basic_auth.password.as_deref(),
)
.body(serde_json::to_string(&id).unwrap());
let response = request.send().await?;
if !response.status().is_success() {
anyhow::bail!("Failed to get block: {}", response.status());
}
let block: Block = response.json().await?;
Ok(block)
}
pub async fn get_shares(
&self,
blob_id: [u8; 32],
) -> Result<impl Stream<Item = DaLightShare>, Error> {
self.nomos_client
.get_shares::<DaShare>(
self.base_url.clone(),
blob_id,
HashSet::new(),
HashSet::new(),
true,
)
.await
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Block {
pub blobs: Vec<[u8; 32]>,
}
// Tutorial code

View File

@ -13,245 +13,4 @@ use url::Url;
use anyhow::Result;
use tokio::time::{sleep, Duration};
#[derive(Parser, Debug)]
#[clap(author, version, about = "Light Node validator")]
struct Args {
#[clap(long, default_value = "info")]
log_level: String,
#[clap(long, default_value = "http://localhost:8545")]
rpc: Url,
#[clap(long, default_value = "ws://localhost:8546")]
ws_rpc: Url,
#[clap(long, default_value = "http://localhost:8070")]
prover_url: Url,
#[clap(long)]
nomos_node: Url,
#[clap(long, default_value = "10")]
batch_size: u64,
#[clap(long)]
zeth_binary_dir: Option<PathBuf>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn error::Error>> {
let args = Args::parse();
let filter =
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&args.log_level));
fmt::fmt().with_env_filter(filter).with_target(false).init();
let zone_blocks = follow_sz(args.ws_rpc.clone()).await.unwrap();
let (tx, da_blobs) = tokio::sync::mpsc::channel::<BlobId>(MAX_BLOBS);
let username = std::env::var("NOMOS_USER").unwrap_or_default();
let password = std::env::var("NOMOS_PASSWORD").ok();
tokio::spawn(check_blobs(
NomosClient::new(
args.nomos_node.clone(),
Credentials {
username,
password,
},
),
tx
));
verify_zone_stf(
args.batch_size,
args.rpc.clone(),
args.prover_url.clone(),
args.zeth_binary_dir.as_deref().unwrap_or_else(|| Path::new("zeth")),
zone_blocks,
tokio_stream::wrappers::ReceiverStream::new(da_blobs),
)
.await?;
Ok(())
}
const MAX_BLOBS: usize = 1 << 10;
const MAX_PROOF_RETRIES: usize = 5;
async fn verify_zone_stf(
batch_size: u64,
rpc: Url,
prover_url: Url,
zeth_binary_dir: &Path,
blocks: impl Stream<Item = (u64, BlobId)>,
included_blobs: impl Stream<Item = BlobId>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut blobs_on_consensus = lru::LruCache::new(NonZero::new(MAX_BLOBS).unwrap());
let mut sz_blobs = lru::LruCache::new(NonZero::new(MAX_BLOBS).unwrap());
tokio::pin!(blocks);
tokio::pin!(included_blobs);
loop {
let to_verify = tokio::select! {
Some((block_n, blob_id)) = blocks.next() => {
if blobs_on_consensus.pop(&blob_id).is_some() {
tracing::debug!("Block confirmed on consensus: {:?}", block_n);
Some((block_n, blob_id))
} else {
if let Some(expired) = sz_blobs.push(blob_id, block_n) {
tracing::warn!("Block was not confirmed on mainnet: {:?}", expired);
}
None
}
}
Some(blob_id) = included_blobs.next() => {
if let Some(block_n) = sz_blobs.pop(&blob_id) {
tracing::debug!("Block confirmed on consensus: {:?}", block_n);
Some((block_n, blob_id))
} else {
// Blobs coming from other zones are not going to be confirmed and
// should be removed from the cache.
// It is highly unlikely that a zone blob is seen first on consensus and
// later enough on the sz rpc to be dropped from the cache, although
// it is possible.
// Do something here if you want to keep track of those blobs.
let _ = blobs_on_consensus.push(blob_id, ());
None
}
}
};
if let Some((block_n, blob_id)) = to_verify {
let rpc = rpc.clone();
let prover_url = prover_url.clone();
let zeth_binary_dir = zeth_binary_dir.to_path_buf();
tokio::spawn(async move {
verify_blob(
block_n,
blob_id,
batch_size,
&rpc,
&prover_url,
&zeth_binary_dir,
)
.await
.unwrap_or_else(|e| {
tracing::error!("Failed to verify blob: {:?}", e);
});
});
}
}
}
async fn verify_blob(
block_number: u64,
blob: BlobId,
batch_size: u64,
rpc: &Url,
prover_url: &Url,
zeth_binary_dir: &Path,
) -> Result<()> {
// block are proved in batches, aligned to `batch_size`
let block_number = block_number - block_number % batch_size;
futures::try_join!(evm_lightnode::da::sampling(blob), async {
let mut sleep_time = 1;
for _ in 0..MAX_PROOF_RETRIES {
if proofcheck::verify_proof(
block_number,
batch_size,
rpc,
prover_url,
zeth_binary_dir,
)
.await.is_ok() {
return Ok(());
}
sleep(Duration::from_secs(sleep_time)).await;
sleep_time *= 2;
}
Err(anyhow::anyhow!("Failed to verify proof after {} retries", MAX_PROOF_RETRIES))
})?;
// TODO: reuse rpc results
Ok(())
}
/// Follow the sovereign zone and return the blob ID of each new produced block
async fn follow_sz(
ws_rpc: Url,
) -> Result<impl Stream<Item = (u64, BlobId)>, Box<dyn error::Error>> {
let provider = ProviderBuilder::new().on_ws(WsConnect::new(ws_rpc)).await?;
// Pub-sub: get a live stream of blocks
let blocks = provider
.subscribe_full_blocks()
.full()
.into_stream()
.await?;
Ok(blocks.filter_map(|block| async move {
if let Ok(block) = block {
// poor man's proof of equivalence
use alloy::consensus::{Block, Signed};
let block: Block<Signed<_>> = block.into_consensus().convert_transactions();
let (data, _) = evm_sequencer_node::encode_block(&block.clone().convert_transactions());
let blob_id = {
let encoder = kzgrs_backend::encoder::DaEncoder::new(
kzgrs_backend::encoder::DaEncoderParams::new(
2,
false,
kzgrs_backend::global::GLOBAL_PARAMETERS.clone(),
),
);
// this is a REALLY heavy task, so we should try not to block the thread here
let heavy_task = tokio::task::spawn_blocking(move || encoder.encode(&data));
let encoded_data = heavy_task.await.unwrap().unwrap();
kzgrs_backend::common::build_blob_id(
&encoded_data.row_commitments,
)
};
Some((block.header.number, blob_id))
} else {
tracing::error!("Failed to get block");
None
}
}))
}
#[derive(Debug, Serialize, Deserialize)]
struct CryptarchiaInfo {
tip: HeaderId,
}
/// Return blobs confirmed on Nomos.
async fn check_blobs(
nomos_client: NomosClient,
sink: tokio::sync::mpsc::Sender<BlobId>,
) -> Result<()> {
// TODO: a good implementation should follow the different forks and react to possible
// reorgs.
// We don't currently habe a food api to follow the chain externally, so for now we're just
// going to simply follow the tip.
let mut current_tip = HeaderId::default();
loop {
let info = nomos_client.get_cryptarchia_info().await?;
if info.tip != current_tip {
current_tip = info.tip;
tracing::debug!("new tip: {:?}", info.tip);
let blobs = nomos_client.get_block(info.tip).await?.bl_blobs;
if blobs.is_empty() {
tracing::debug!("No blobs found in block");
continue;
}
for blob in blobs {
sink.send(blob.id).await?;
}
} else {
tracing::trace!("No new tip, sleeping...");
}
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
}
}
// Tutorial code

View File

@ -2,47 +2,4 @@ use serde::{Deserialize, Deserializer, Serialize};
use hex::FromHex;
#[derive(Serialize, Deserialize, Debug)]
pub struct CryptarchiaInfo {
pub tip: HeaderId,
pub slot: u64,
pub height: u64,
}
#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash, PartialOrd, Ord, Default)]
pub struct HeaderId([u8; 32]);
impl<'de> Deserialize<'de> for HeaderId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let hex_str = String::deserialize(deserializer)?;
let bytes = <[u8; 32]>::from_hex(hex_str)
.map_err(|e| serde::de::Error::custom(format!("Invalid hex string: {}", e)))?;
Ok(HeaderId(bytes))
}
}
impl Serialize for HeaderId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let hex_str = hex::encode(self.0);
serializer.serialize_str(&hex_str)
}
}
use std::fmt;
impl fmt::Display for HeaderId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
for byte in &self.0 {
write!(f, "{:02x}", byte)?;
}
Ok(())
}
}
// Tutorial code

View File

@ -5,45 +5,4 @@ use std::path::Path;
use tokio::process::Command;
use tracing::{info};
pub async fn verify_proof(
block_number: u64,
block_count: u64,
rpc: &Url,
prover_url: &Url,
zeth_bin: &Path,
) -> Result<()> {
info!(
"Verifying proof for blocks {}-{}",
block_number,
block_number + block_count - 1
);
let url = prover_url.join(&format!(
"/?block_start={}&block_count={}",
block_number, block_count
))?;
let proof = reqwest::get(url).await?.bytes().await?;
let mut tempfile = tempfile::NamedTempFile::new()?;
tempfile.write_all(&proof)?;
let output = Command::new(zeth_bin)
.args([
"verify",
&format!("--rpc={}", rpc),
&format!("--block-number={}", block_number),
&format!("--block-count={}", block_count),
&format!("--file={}", tempfile.path().display()),
])
.output()
.await?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("zeth-ethereum verify command failed: {}", stderr);
}
info!("Proof verification successful");
Ok(())
}
// Tutorial code

View File

@ -7,32 +7,4 @@ use serde::Deserialize;
use std::path::PathBuf;
use tokio::fs;
#[derive(Deserialize)]
pub struct ProofRequest {
block_start: u64,
block_count: u64,
}
/// Handler for GET /
pub async fn serve_proof(Query(query): Query<ProofRequest>) -> Response {
let file_name = format!(
"{}-{}.zkp",
query.block_start,
query.block_count + query.block_start
);
let path = PathBuf::from(&file_name);
// Read file contents
match fs::read(&path).await {
Ok(bytes) => (StatusCode::OK, bytes).into_response(),
Err(err) => {
let status = if err.kind() == std::io::ErrorKind::NotFound {
StatusCode::NOT_FOUND
} else {
StatusCode::INTERNAL_SERVER_ERROR
};
(status, format!("Error reading file: {}", err)).into_response()
}
}
}
// Tutorial code

View File

@ -8,184 +8,4 @@ use tracing_subscriber::{EnvFilter, fmt};
mod http;
#[derive(Parser, Debug)]
#[clap(author, version, about = "Ethereum Proof Generation Tool")]
struct Args {
#[clap(long, default_value = "http://localhost:8545")]
rpc: String,
#[clap(long, default_value = "5")]
start_block: u64,
#[clap(long, default_value = "10")]
batch_size: u64,
#[clap(long, default_value = "5")]
interval: u64,
#[clap(long)]
zeth_binary_dir: Option<String>,
#[clap(long, default_value = "info")]
log_level: String,
}
fn run_ethereum_prove(
rpc: &str,
block_number: u64,
batch_size: u64,
zeth_binary_dir: Option<String>,
log_level: &str,
) -> Result<(), String> {
info!(
"Running Ethereum prove for blocks {}-{}",
block_number,
block_number + batch_size - 1
);
let mut binary_path = if let Some(dir) = &zeth_binary_dir {
PathBuf::from(dir)
} else {
debug!("No binary directory provided, trying current directory");
std::env::current_dir().map_err(|e| format!("Failed to get current directory: {}", e))?
};
binary_path.push("zeth-ethereum");
if !binary_path.exists() {
return Err(format!("Binary not found at: {}", binary_path.display()));
}
let output = Command::new(&binary_path)
.env("RUST_LOG", log_level)
.args([
"prove",
&format!("--rpc={}", rpc),
&format!("--block-number={}", block_number),
&format!("--block-count={}", batch_size),
])
.output()
.map_err(|e| format!("Failed to execute zeth-ethereum prove: {}", e))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
error!("ethereum prove command failed: {}", stderr);
return Err(format!(
"ethereum prove command failed with status: {}\nStderr: {}",
output.status, stderr
));
}
info!("Successfully processed batch");
Ok(())
}
fn get_latest_block(client: &Client, rpc: &str) -> Result<u64, String> {
debug!("Checking latest block height...");
let request_body = json!({
"jsonrpc": "2.0",
"method": "eth_blockNumber",
"params": [],
"id": 1
});
let response = client
.post(rpc)
.json(&request_body)
.send()
.map_err(|e| format!("Failed to send request: {}", e))?;
if !response.status().is_success() {
return Err(format!("Request failed with status: {}", response.status()));
}
let response_json: Value = response
.json()
.map_err(|e| format!("Failed to parse response JSON: {}", e))?;
let block_hex = response_json
.get("result")
.and_then(Value::as_str)
.ok_or("Failed to parse response")?
.trim_start_matches("0x");
let block_number = u64::from_str_radix(block_hex, 16)
.map_err(|e| format!("Failed to parse hex block number: {}", e))?;
debug!("Latest block: {}", block_number);
Ok(block_number)
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
std::thread::spawn(move || {
if let Err(e) = run_server() {
error!("Error running server: {}", e);
}
});
let filter =
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&args.log_level));
fmt::fmt().with_env_filter(filter).with_target(false).init();
info!("Starting Ethereum prover...");
let client = Client::new();
let mut current_block = args.start_block;
loop {
match get_latest_block(&client, &args.rpc) {
Ok(latest_block) => {
if latest_block >= current_block + args.batch_size {
info!(
"New blocks available. Current: {}, Latest: {}",
current_block, latest_block
);
match run_ethereum_prove(
&args.rpc,
current_block,
args.batch_size,
args.zeth_binary_dir.clone(),
&args.log_level,
) {
Ok(_) => {
current_block += args.batch_size;
info!("Updated current block to {}", current_block);
}
Err(e) => {
error!("Error running prover: {}", e);
}
}
} else {
info!(
"No new blocks to process. Current: {}, Latest: {}, sleeping...",
current_block, latest_block
);
thread::sleep(Duration::from_secs(args.interval));
}
}
Err(e) => {
error!("Error getting latest block: {}", e);
thread::sleep(Duration::from_secs(args.interval));
}
}
}
}
#[tokio::main]
async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
// Build our application with a route
let app = Router::new().route("/", get(http::serve_proof));
let addr = SocketAddr::from(([127, 0, 0, 1], 8070));
// Run it on localhost:8070
tracing::info!("Serving files on http://{}", addr);
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
axum::serve(listener, app).await.unwrap();
Ok(())
}
// Tutorial code

View File

@ -6,58 +6,4 @@ use kzgrs_backend::{dispersal::Metadata, encoder::DaEncoderParams};
pub fn encode_block(block: &Block) -> (Vec<u8>, Metadata) {
let mut blob = bincode::serialize(&block).expect("Failed to serialize block");
let metadata = Metadata::new([0; 32], block.number.into());
// the node expects blobs to be padded to the next chunk size
let remainder = blob.len() % DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE;
blob.extend(std::iter::repeat_n(
0,
DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE - remainder,
));
(blob, metadata)
}
pub struct Processor {
da: NomosDa,
}
impl Processor {
pub fn new(da: NomosDa) -> Self {
Self { da }
}
pub async fn process_blocks(&mut self, new_blocks: impl Iterator<Item = Block>) {
for block in new_blocks {
let (blob, metadata) = encode_block(&block);
if let Err(e) = self.da.disperse(blob, metadata).await {
error!("Failed to disperse block: {e}");
} else {
info!("Dispersed block: {:?}", block);
}
}
}
}
pub struct NomosDa {
url: Url,
client: ExecutorHttpClient,
}
impl NomosDa {
pub fn new(basic_auth: BasicAuthCredentials, url: Url) -> Self {
Self {
client: ExecutorHttpClient::new(Some(basic_auth)),
url,
}
}
pub async fn disperse(&self, data: Vec<u8>, metadata: Metadata) -> Result<[u8; 32], Error> {
self.client
.publish_blob(self.url.clone(), data, metadata)
.await?;
Ok([0; 32]) // Placeholder for the actual blob ID
}
}
// Tutorial code

View File

@ -11,73 +11,4 @@ use reth_tracing::tracing::info;
use executor_http_client::BasicAuthCredentials;
use evm_sequencer_node::{Processor, NomosDa};
async fn process_blocks<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>,
mut processor: Processor,
) -> eyre::Result<()>
where
<<Node as FullNodeTypes>::Types as NodeTypes>::Primitives:
NodePrimitives<Block = reth_ethereum::Block>,
{
while let Some(notification) = ctx.notifications.try_next().await? {
let ExExNotification::ChainCommitted { new } = &notification else {
continue;
};
info!(committed_chain = ?new.range(), "Received commit");
processor
.process_blocks(
new.inner()
.0
.clone()
.into_blocks()
.map(reth_ethereum::primitives::RecoveredBlock::into_block),
)
.await;
ctx.events
.send(ExExEvent::FinishedHeight(new.tip().num_hash()))
.unwrap();
}
Ok(())
}
fn main() -> eyre::Result<()> {
Cli::try_parse_args_from([
"reth",
"node",
"--dev",
"--rpc.eth-proof-window=2048",
"--dev.block-time=2s",
"--http.addr=0.0.0.0",
"--http.api=eth,net,web3,debug,trace,txpool", // Some might be unnecessary, but I guess
"--ws",
"--ws.addr=0.0.0.0",
"--ws.api=eth,net,web3,txpool",
"--http.corsdomain=\"*\"", // Needed locally, probably needed here as well.
])
.unwrap()
.run(|builder, _| {
Box::pin(async move {
let url = std::env::var("NOMOS_EXECUTOR").unwrap_or_default();
let user = std::env::var("NOMOS_USER").unwrap_or_default();
let password = std::env::var("NOMOS_PASSWORD").unwrap_or_default();
let da = NomosDa::new(
BasicAuthCredentials::new(user, Some(password)),
url::Url::parse(&url).unwrap(),
);
let processor = Processor::new(da);
let handle = Box::pin(
builder
.node(EthereumNode::default())
.install_exex("process-block", async move |ctx| {
Ok(process_blocks(ctx, processor))
})
.launch(),
)
.await?;
handle.wait_for_node_exit().await
})
})
}
// Tutorial code