From 8ee6c1c12d24441464ab3126c71df7ed6ad9d7d0 Mon Sep 17 00:00:00 2001 From: Antonio Antonino Date: Wed, 9 Apr 2025 09:07:39 +0200 Subject: [PATCH 1/2] Custom binary wrapping reth works --- sz-poc-offsite-2025/Cargo.toml | 11 +++++-- sz-poc-offsite-2025/evm/evm-node/Cargo.toml | 11 +++++++ sz-poc-offsite-2025/evm/evm-node/src/main.rs | 34 ++++++++++++++++++++ sz-poc-offsite-2025/node/Cargo.toml | 6 ---- sz-poc-offsite-2025/node/src/main.rs | 2 -- 5 files changed, 53 insertions(+), 11 deletions(-) create mode 100644 sz-poc-offsite-2025/evm/evm-node/Cargo.toml create mode 100644 sz-poc-offsite-2025/evm/evm-node/src/main.rs delete mode 100644 sz-poc-offsite-2025/node/Cargo.toml delete mode 100644 sz-poc-offsite-2025/node/src/main.rs diff --git a/sz-poc-offsite-2025/Cargo.toml b/sz-poc-offsite-2025/Cargo.toml index fcc98c6..e4f16ed 100644 --- a/sz-poc-offsite-2025/Cargo.toml +++ b/sz-poc-offsite-2025/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["node"] +members = ["evm/evm-node"] resolver = "3" [workspace.package] @@ -7,7 +7,12 @@ edition = "2024" [workspace.dependencies] # Internal -node = { path = "node" } +evm-node = { path = "evm/evm-node" } # External -tokio = { version = "1.0" } +clap = { version = "4.5" } +futures-util = { version = "0.3" } +jsonrpsee = { version = "0.24" } +reth = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.8" } +reth-cli-commands = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.8" } +reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.8" } diff --git a/sz-poc-offsite-2025/evm/evm-node/Cargo.toml b/sz-poc-offsite-2025/evm/evm-node/Cargo.toml new file mode 100644 index 0000000..b67508d --- /dev/null +++ b/sz-poc-offsite-2025/evm/evm-node/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "evm-node" +edition = { workspace = true } + +[dependencies] +clap = { workspace = true } +futures-util = { workspace = true } +jsonrpsee = { workspace = true } +reth = { workspace = true } +reth-cli-commands = { workspace = true } +reth-node-ethereum = { workspace = true } diff --git a/sz-poc-offsite-2025/evm/evm-node/src/main.rs b/sz-poc-offsite-2025/evm/evm-node/src/main.rs new file mode 100644 index 0000000..d1f3656 --- /dev/null +++ b/sz-poc-offsite-2025/evm/evm-node/src/main.rs @@ -0,0 +1,34 @@ +use clap::Parser; +use futures_util::StreamExt; +use reth::{ + builder::NodeHandle, chainspec::EthereumChainSpecParser, cli::Cli, + transaction_pool::TransactionPool, +}; +use reth_cli_commands::node::NoArgs; +use reth_node_ethereum::node::EthereumNode; + +fn main() { + Cli::::parse() + .run(|builder, _| async move { + // launch the node + let NodeHandle { + node, + node_exit_future, + } = builder.node(EthereumNode::default()).launch().await?; + + // create a new subscription to pending transactions + let mut pending_transactions = node.pool.new_pending_pool_transactions_listener(); + + // Spawn an async block to listen for validated transactions. + node.task_executor.spawn(Box::pin(async move { + // Waiting for new transactions + while let Some(event) = pending_transactions.next().await { + let tx = event.transaction; + println!("Transaction received: {:?}", tx.transaction); + } + })); + + node_exit_future.await + }) + .unwrap(); +} diff --git a/sz-poc-offsite-2025/node/Cargo.toml b/sz-poc-offsite-2025/node/Cargo.toml deleted file mode 100644 index cfdc2e1..0000000 --- a/sz-poc-offsite-2025/node/Cargo.toml +++ /dev/null @@ -1,6 +0,0 @@ -[package] -name = "node" -edition = { workspace = true } - -[dependencies] -tokio = { workspace = true, features = ["full"] } diff --git a/sz-poc-offsite-2025/node/src/main.rs b/sz-poc-offsite-2025/node/src/main.rs deleted file mode 100644 index 7f755fb..0000000 --- a/sz-poc-offsite-2025/node/src/main.rs +++ /dev/null @@ -1,2 +0,0 @@ -#[tokio::main] -async fn main() {} From 25cd9c9a52d63625c75714b8c310e9aad506eb52 Mon Sep 17 00:00:00 2001 From: Antonio Antonino Date: Wed, 9 Apr 2025 10:13:25 +0200 Subject: [PATCH 2/2] Skelton complete --- sz-poc-offsite-2025/Cargo.toml | 7 +- sz-poc-offsite-2025/evm/evm-node/src/main.rs | 34 -- .../evm/{evm-node => node}/Cargo.toml | 2 + sz-poc-offsite-2025/evm/node/src/main.rs | 26 ++ .../evm/sequencer-mempool/Cargo.toml | 9 + .../evm/sequencer-mempool/src/lib.rs | 365 ++++++++++++++++++ 6 files changed, 407 insertions(+), 36 deletions(-) delete mode 100644 sz-poc-offsite-2025/evm/evm-node/src/main.rs rename sz-poc-offsite-2025/evm/{evm-node => node}/Cargo.toml (86%) create mode 100644 sz-poc-offsite-2025/evm/node/src/main.rs create mode 100644 sz-poc-offsite-2025/evm/sequencer-mempool/Cargo.toml create mode 100644 sz-poc-offsite-2025/evm/sequencer-mempool/src/lib.rs diff --git a/sz-poc-offsite-2025/Cargo.toml b/sz-poc-offsite-2025/Cargo.toml index e4f16ed..4b21ae4 100644 --- a/sz-poc-offsite-2025/Cargo.toml +++ b/sz-poc-offsite-2025/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["evm/evm-node"] +members = ["evm/node", "evm/sequencer-mempool"] resolver = "3" [workspace.package] @@ -7,12 +7,15 @@ edition = "2024" [workspace.dependencies] # Internal -evm-node = { path = "evm/evm-node" } +evm-node = { path = "evm/node" } +evm-sequencer-mempool = { path = "evm/sequencer-mempool" } # External clap = { version = "4.5" } +eyre = { version = "0.6" } futures-util = { version = "0.3" } jsonrpsee = { version = "0.24" } reth = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.8" } reth-cli-commands = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.8" } reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.8" } +tokio = { version = "1.0" } diff --git a/sz-poc-offsite-2025/evm/evm-node/src/main.rs b/sz-poc-offsite-2025/evm/evm-node/src/main.rs deleted file mode 100644 index d1f3656..0000000 --- a/sz-poc-offsite-2025/evm/evm-node/src/main.rs +++ /dev/null @@ -1,34 +0,0 @@ -use clap::Parser; -use futures_util::StreamExt; -use reth::{ - builder::NodeHandle, chainspec::EthereumChainSpecParser, cli::Cli, - transaction_pool::TransactionPool, -}; -use reth_cli_commands::node::NoArgs; -use reth_node_ethereum::node::EthereumNode; - -fn main() { - Cli::::parse() - .run(|builder, _| async move { - // launch the node - let NodeHandle { - node, - node_exit_future, - } = builder.node(EthereumNode::default()).launch().await?; - - // create a new subscription to pending transactions - let mut pending_transactions = node.pool.new_pending_pool_transactions_listener(); - - // Spawn an async block to listen for validated transactions. - node.task_executor.spawn(Box::pin(async move { - // Waiting for new transactions - while let Some(event) = pending_transactions.next().await { - let tx = event.transaction; - println!("Transaction received: {:?}", tx.transaction); - } - })); - - node_exit_future.await - }) - .unwrap(); -} diff --git a/sz-poc-offsite-2025/evm/evm-node/Cargo.toml b/sz-poc-offsite-2025/evm/node/Cargo.toml similarity index 86% rename from sz-poc-offsite-2025/evm/evm-node/Cargo.toml rename to sz-poc-offsite-2025/evm/node/Cargo.toml index b67508d..2321a85 100644 --- a/sz-poc-offsite-2025/evm/evm-node/Cargo.toml +++ b/sz-poc-offsite-2025/evm/node/Cargo.toml @@ -3,6 +3,8 @@ name = "evm-node" edition = { workspace = true } [dependencies] +evm-sequencer-mempool = { workspace = true } + clap = { workspace = true } futures-util = { workspace = true } jsonrpsee = { workspace = true } diff --git a/sz-poc-offsite-2025/evm/node/src/main.rs b/sz-poc-offsite-2025/evm/node/src/main.rs new file mode 100644 index 0000000..46f7a98 --- /dev/null +++ b/sz-poc-offsite-2025/evm/node/src/main.rs @@ -0,0 +1,26 @@ +use reth::cli::Cli; +use reth_node_ethereum::node::{EthereumAddOns, EthereumNode}; + +use evm_sequencer_mempool::EvmSequencerMempoolBuilder; + +fn main() { + Cli::parse_args() + .run(|builder, _| async move { + // launch the node + let handle = Box::pin( + builder + // use the default ethereum node types + .with_types::() + // use default ethereum components but use our custom pool + .with_components( + EthereumNode::components().pool(EvmSequencerMempoolBuilder::default()), + ) + .with_add_ons(EthereumAddOns::default()) + .launch(), + ) + .await?; + + handle.wait_for_node_exit().await + }) + .unwrap(); +} diff --git a/sz-poc-offsite-2025/evm/sequencer-mempool/Cargo.toml b/sz-poc-offsite-2025/evm/sequencer-mempool/Cargo.toml new file mode 100644 index 0000000..4bc60c6 --- /dev/null +++ b/sz-poc-offsite-2025/evm/sequencer-mempool/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "evm-sequencer-mempool" +edition = { workspace = true } + +[dependencies] +eyre = { workspace = true } +reth = { workspace = true } +reth-node-ethereum = { workspace = true } +tokio = { workspace = true } diff --git a/sz-poc-offsite-2025/evm/sequencer-mempool/src/lib.rs b/sz-poc-offsite-2025/evm/sequencer-mempool/src/lib.rs new file mode 100644 index 0000000..3f06fab --- /dev/null +++ b/sz-poc-offsite-2025/evm/sequencer-mempool/src/lib.rs @@ -0,0 +1,365 @@ +use std::{collections::HashSet, sync::Arc}; +use tokio::sync::mpsc::Receiver; + +use reth::{ + api::{Block, FullNodeTypes, NodeTypes}, + builder::{BuilderContext, components::PoolBuilder}, + chainspec::ChainSpec, + network::types::HandleMempoolData, + primitives::{EthPrimitives, Recovered}, + providers::ChangedAccount, + revm::primitives::{Address, B256, alloy_primitives::TxHash}, + rpc::types::{BlobTransactionSidecar, engine::BlobAndProofV1}, + transaction_pool::{ + AllPoolTransactions, AllTransactionsEvents, BestTransactions, BestTransactionsAttributes, + BlobStore, BlobStoreError, BlockInfo, CanonicalStateUpdate, CoinbaseTipOrdering, + EthPoolTransaction, EthPooledTransaction, EthTransactionValidator, + GetPooledTransactionLimit, NewBlobSidecar, NewTransactionEvent, Pool, PoolResult, PoolSize, + PoolTransaction, PropagatedTransactions, TransactionEvents, TransactionListenerKind, + TransactionOrdering, TransactionOrigin, TransactionPool, TransactionPoolExt, + TransactionValidationTaskExecutor, TransactionValidator, ValidPoolTransaction, + blobstore::DiskFileBlobStore, + }, +}; +use reth_node_ethereum::node::EthereumPoolBuilder; + +#[derive(Default)] +pub struct EvmSequencerMempoolBuilder(EthereumPoolBuilder); + +impl PoolBuilder for EvmSequencerMempoolBuilder +where + Types: NodeTypes, + Node: FullNodeTypes, +{ + type Pool = EvmSequencerMempool; + + async fn build_pool(self, ctx: &BuilderContext) -> eyre::Result { + let pool = self.0.build_pool(ctx).await?; + Ok(EvmSequencerPool(pool)) + } +} + +pub type EvmSequencerMempool = EvmSequencerPool< + TransactionValidationTaskExecutor>, + CoinbaseTipOrdering, + S, +>; + +#[derive(Debug)] +pub struct EvmSequencerPool(Pool) +where + T: TransactionOrdering; + +impl TransactionPool for EvmSequencerPool +where + V: TransactionValidator, + ::Transaction: EthPoolTransaction, + T: TransactionOrdering::Transaction>, + S: BlobStore, +{ + type Transaction = T::Transaction; + + fn pool_size(&self) -> PoolSize { + self.0.pool_size() + } + + fn block_info(&self) -> BlockInfo { + self.0.block_info() + } + + fn add_transaction_and_subscribe( + &self, + origin: TransactionOrigin, + transaction: Self::Transaction, + ) -> impl Future> + Send { + self.0.add_transaction_and_subscribe(origin, transaction) + } + + fn add_transaction( + &self, + origin: TransactionOrigin, + transaction: Self::Transaction, + ) -> impl Future> + Send { + self.0.add_transaction(origin, transaction) + } + + fn add_transactions( + &self, + origin: TransactionOrigin, + transactions: Vec, + ) -> impl Future>> + Send { + self.0.add_transactions(origin, transactions) + } + + fn transaction_event_listener(&self, tx_hash: TxHash) -> Option { + self.0.transaction_event_listener(tx_hash) + } + + fn all_transactions_event_listener(&self) -> AllTransactionsEvents { + self.0.all_transactions_event_listener() + } + + fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver { + self.0.pending_transactions_listener_for(kind) + } + + fn blob_transaction_sidecars_listener(&self) -> Receiver { + self.0.blob_transaction_sidecars_listener() + } + + fn new_transactions_listener_for( + &self, + kind: TransactionListenerKind, + ) -> Receiver> { + self.0.new_transactions_listener_for(kind) + } + + fn pooled_transaction_hashes(&self) -> Vec { + self.0.pooled_transaction_hashes() + } + + fn pooled_transaction_hashes_max(&self, max: usize) -> Vec { + self.0.pooled_transaction_hashes_max(max) + } + + fn pooled_transactions(&self) -> Vec>> { + self.0.pooled_transactions() + } + + fn pooled_transactions_max( + &self, + max: usize, + ) -> Vec>> { + self.0.pooled_transactions_max(max) + } + + fn get_pooled_transaction_elements( + &self, + tx_hashes: Vec, + limit: GetPooledTransactionLimit, + ) -> Vec<::Pooled> { + self.0.get_pooled_transaction_elements(tx_hashes, limit) + } + + fn get_pooled_transaction_element( + &self, + tx_hash: TxHash, + ) -> Option::Pooled>> { + self.0.get_pooled_transaction_element(tx_hash) + } + + fn best_transactions( + &self, + ) -> Box>>> { + self.0.best_transactions() + } + + fn best_transactions_with_attributes( + &self, + best_transactions_attributes: BestTransactionsAttributes, + ) -> Box>>> { + self.0 + .best_transactions_with_attributes(best_transactions_attributes) + } + + fn pending_transactions(&self) -> Vec>> { + self.0.pending_transactions() + } + + fn pending_transactions_max( + &self, + max: usize, + ) -> Vec>> { + self.0.pending_transactions_max(max) + } + + fn queued_transactions(&self) -> Vec>> { + self.0.queued_transactions() + } + + fn all_transactions(&self) -> AllPoolTransactions { + self.0.all_transactions() + } + + fn remove_transactions( + &self, + hashes: Vec, + ) -> Vec>> { + self.0.remove_transactions(hashes) + } + + fn remove_transactions_and_descendants( + &self, + hashes: Vec, + ) -> Vec>> { + self.0.remove_transactions_and_descendants(hashes) + } + + fn remove_transactions_by_sender( + &self, + sender: Address, + ) -> Vec>> { + self.0.remove_transactions_by_sender(sender) + } + + fn retain_unknown(&self, announcement: &mut A) + where + A: HandleMempoolData, + { + self.0.retain_unknown(announcement); + } + + fn get(&self, tx_hash: &TxHash) -> Option>> { + self.0.get(tx_hash) + } + + fn get_all(&self, txs: Vec) -> Vec>> { + self.0.get_all(txs) + } + + fn on_propagated(&self, txs: PropagatedTransactions) { + self.0.on_propagated(txs); + } + + fn get_transactions_by_sender( + &self, + sender: Address, + ) -> Vec>> { + self.0.get_transactions_by_sender(sender) + } + + fn get_pending_transactions_with_predicate( + &self, + predicate: impl FnMut(&ValidPoolTransaction) -> bool, + ) -> Vec>> { + self.0.get_pending_transactions_with_predicate(predicate) + } + + fn get_pending_transactions_by_sender( + &self, + sender: Address, + ) -> Vec>> { + self.0.get_pending_transactions_by_sender(sender) + } + + fn get_queued_transactions_by_sender( + &self, + sender: Address, + ) -> Vec>> { + self.0.get_queued_transactions_by_sender(sender) + } + + fn get_highest_transaction_by_sender( + &self, + sender: Address, + ) -> Option>> { + self.0.get_highest_transaction_by_sender(sender) + } + + fn get_highest_consecutive_transaction_by_sender( + &self, + sender: Address, + on_chain_nonce: u64, + ) -> Option>> { + self.0 + .get_highest_consecutive_transaction_by_sender(sender, on_chain_nonce) + } + + fn get_transaction_by_sender_and_nonce( + &self, + sender: Address, + nonce: u64, + ) -> Option>> { + self.0.get_transaction_by_sender_and_nonce(sender, nonce) + } + + fn get_transactions_by_origin( + &self, + origin: TransactionOrigin, + ) -> Vec>> { + self.0.get_transactions_by_origin(origin) + } + + fn get_pending_transactions_by_origin( + &self, + origin: TransactionOrigin, + ) -> Vec>> { + self.0.get_pending_transactions_by_origin(origin) + } + + fn unique_senders(&self) -> HashSet
{ + self.0.unique_senders() + } + + fn get_blob( + &self, + tx_hash: TxHash, + ) -> Result>, BlobStoreError> { + self.0.get_blob(tx_hash) + } + + fn get_all_blobs( + &self, + tx_hashes: Vec, + ) -> Result)>, BlobStoreError> { + self.0.get_all_blobs(tx_hashes) + } + + fn get_all_blobs_exact( + &self, + tx_hashes: Vec, + ) -> Result>, BlobStoreError> { + self.0.get_all_blobs_exact(tx_hashes) + } + + fn get_blobs_for_versioned_hashes( + &self, + versioned_hashes: &[B256], + ) -> Result>, BlobStoreError> { + self.0.get_blobs_for_versioned_hashes(versioned_hashes) + } +} + +impl TransactionPoolExt for EvmSequencerPool +where + V: TransactionValidator, + ::Transaction: EthPoolTransaction, + T: TransactionOrdering::Transaction>, + S: BlobStore, +{ + fn set_block_info(&self, info: BlockInfo) { + self.0.set_block_info(info); + } + + fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_, B>) + where + B: Block, + { + self.0.on_canonical_state_change(update); + } + + fn update_accounts(&self, accounts: Vec) { + self.0.update_accounts(accounts); + } + + fn delete_blob(&self, tx: TxHash) { + self.0.delete_blob(tx); + } + + fn delete_blobs(&self, txs: Vec) { + self.0.delete_blobs(txs); + } + + fn cleanup_blobs(&self) { + self.0.cleanup_blobs(); + } +} + +impl Clone for EvmSequencerPool +where + T: TransactionOrdering, +{ + fn clone(&self) -> Self { + Self(self.0.clone()) + } +}