Implement custom extension to listen to chain events

This commit is contained in:
Antonio Antonino 2025-04-09 16:22:26 +02:00
parent 3c937cddc9
commit 894249fa4d
No known key found for this signature in database
GPG Key ID: 70CC1DF6BCF7E76D
7 changed files with 48 additions and 408 deletions

View File

@ -1,5 +1,5 @@
[workspace]
members = ["evm/sequencer-node", "evm/sequencer-mempool"]
members = ["evm/sequencer-node"]
resolver = "3"
[workspace.package]
@ -8,14 +8,10 @@ edition = "2024"
[workspace.dependencies]
# Internal
evm-sequencer-node = { path = "evm/sequencer-node" }
evm-sequencer-mempool = { path = "evm/sequencer-mempool" }
# External
clap = { version = "4.5" }
eyre = { version = "0.6" }
futures-util = { version = "0.3" }
jsonrpsee = { version = "0.24" }
futures = { version = "0.3" }
reth = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.8" }
reth-cli-commands = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.8" }
reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.8" }
tokio = { version = "1.0" }
reth-ethereum = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.8" }
reth-tracing = { git = "https://github.com/paradigmxyz/reth", tag = "v1.3.8" }

View File

@ -1,5 +0,0 @@
[package]
name = "evm-sequencer-batcher"
edition = { workspace = true }
[dependencies]

View File

@ -1,9 +0,0 @@
[package]
name = "evm-sequencer-mempool"
edition = { workspace = true }
[dependencies]
eyre = { workspace = true }
reth = { workspace = true }
reth-node-ethereum = { workspace = true }
tokio = { workspace = true }

View File

@ -1,365 +0,0 @@
use std::{collections::HashSet, sync::Arc};
use tokio::sync::mpsc::Receiver;
use reth::{
api::{Block, FullNodeTypes, NodeTypes},
builder::{BuilderContext, components::PoolBuilder},
chainspec::ChainSpec,
network::types::HandleMempoolData,
primitives::{EthPrimitives, Recovered},
providers::ChangedAccount,
revm::primitives::{Address, B256, alloy_primitives::TxHash},
rpc::types::{BlobTransactionSidecar, engine::BlobAndProofV1},
transaction_pool::{
AllPoolTransactions, AllTransactionsEvents, BestTransactions, BestTransactionsAttributes,
BlobStore, BlobStoreError, BlockInfo, CanonicalStateUpdate, CoinbaseTipOrdering,
EthPoolTransaction, EthPooledTransaction, EthTransactionValidator,
GetPooledTransactionLimit, NewBlobSidecar, NewTransactionEvent, Pool, PoolResult, PoolSize,
PoolTransaction, PropagatedTransactions, TransactionEvents, TransactionListenerKind,
TransactionOrdering, TransactionOrigin, TransactionPool, TransactionPoolExt,
TransactionValidationTaskExecutor, TransactionValidator, ValidPoolTransaction,
blobstore::DiskFileBlobStore,
},
};
use reth_node_ethereum::node::EthereumPoolBuilder;
#[derive(Default)]
pub struct EvmSequencerMempoolBuilder(EthereumPoolBuilder);
impl<Types, Node> PoolBuilder<Node> for EvmSequencerMempoolBuilder
where
Types: NodeTypes<ChainSpec = ChainSpec, Primitives = EthPrimitives>,
Node: FullNodeTypes<Types = Types>,
{
type Pool = EvmSequencerMempool<Node::Provider, DiskFileBlobStore>;
async fn build_pool(self, ctx: &BuilderContext<Node>) -> eyre::Result<Self::Pool> {
let pool = self.0.build_pool(ctx).await?;
Ok(EvmSequencerPool(pool))
}
}
pub type EvmSequencerMempool<Client, S> = EvmSequencerPool<
TransactionValidationTaskExecutor<EthTransactionValidator<Client, EthPooledTransaction>>,
CoinbaseTipOrdering<EthPooledTransaction>,
S,
>;
#[derive(Debug)]
pub struct EvmSequencerPool<V, T, S>(Pool<V, T, S>)
where
T: TransactionOrdering;
impl<V, T, S> TransactionPool for EvmSequencerPool<V, T, S>
where
V: TransactionValidator,
<V as TransactionValidator>::Transaction: EthPoolTransaction,
T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
S: BlobStore,
{
type Transaction = T::Transaction;
fn pool_size(&self) -> PoolSize {
self.0.pool_size()
}
fn block_info(&self) -> BlockInfo {
self.0.block_info()
}
fn add_transaction_and_subscribe(
&self,
origin: TransactionOrigin,
transaction: Self::Transaction,
) -> impl Future<Output = PoolResult<TransactionEvents>> + Send {
self.0.add_transaction_and_subscribe(origin, transaction)
}
fn add_transaction(
&self,
origin: TransactionOrigin,
transaction: Self::Transaction,
) -> impl Future<Output = PoolResult<TxHash>> + Send {
self.0.add_transaction(origin, transaction)
}
fn add_transactions(
&self,
origin: TransactionOrigin,
transactions: Vec<Self::Transaction>,
) -> impl Future<Output = Vec<PoolResult<TxHash>>> + Send {
self.0.add_transactions(origin, transactions)
}
fn transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
self.0.transaction_event_listener(tx_hash)
}
fn all_transactions_event_listener(&self) -> AllTransactionsEvents<Self::Transaction> {
self.0.all_transactions_event_listener()
}
fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver<TxHash> {
self.0.pending_transactions_listener_for(kind)
}
fn blob_transaction_sidecars_listener(&self) -> Receiver<NewBlobSidecar> {
self.0.blob_transaction_sidecars_listener()
}
fn new_transactions_listener_for(
&self,
kind: TransactionListenerKind,
) -> Receiver<NewTransactionEvent<Self::Transaction>> {
self.0.new_transactions_listener_for(kind)
}
fn pooled_transaction_hashes(&self) -> Vec<TxHash> {
self.0.pooled_transaction_hashes()
}
fn pooled_transaction_hashes_max(&self, max: usize) -> Vec<TxHash> {
self.0.pooled_transaction_hashes_max(max)
}
fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.0.pooled_transactions()
}
fn pooled_transactions_max(
&self,
max: usize,
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.0.pooled_transactions_max(max)
}
fn get_pooled_transaction_elements(
&self,
tx_hashes: Vec<TxHash>,
limit: GetPooledTransactionLimit,
) -> Vec<<Self::Transaction as PoolTransaction>::Pooled> {
self.0.get_pooled_transaction_elements(tx_hashes, limit)
}
fn get_pooled_transaction_element(
&self,
tx_hash: TxHash,
) -> Option<Recovered<<Self::Transaction as PoolTransaction>::Pooled>> {
self.0.get_pooled_transaction_element(tx_hash)
}
fn best_transactions(
&self,
) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
self.0.best_transactions()
}
fn best_transactions_with_attributes(
&self,
best_transactions_attributes: BestTransactionsAttributes,
) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
self.0
.best_transactions_with_attributes(best_transactions_attributes)
}
fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.0.pending_transactions()
}
fn pending_transactions_max(
&self,
max: usize,
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.0.pending_transactions_max(max)
}
fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.0.queued_transactions()
}
fn all_transactions(&self) -> AllPoolTransactions<Self::Transaction> {
self.0.all_transactions()
}
fn remove_transactions(
&self,
hashes: Vec<TxHash>,
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.0.remove_transactions(hashes)
}
fn remove_transactions_and_descendants(
&self,
hashes: Vec<TxHash>,
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.0.remove_transactions_and_descendants(hashes)
}
fn remove_transactions_by_sender(
&self,
sender: Address,
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.0.remove_transactions_by_sender(sender)
}
fn retain_unknown<A>(&self, announcement: &mut A)
where
A: HandleMempoolData,
{
self.0.retain_unknown(announcement);
}
fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.0.get(tx_hash)
}
fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.0.get_all(txs)
}
fn on_propagated(&self, txs: PropagatedTransactions) {
self.0.on_propagated(txs);
}
fn get_transactions_by_sender(
&self,
sender: Address,
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.0.get_transactions_by_sender(sender)
}
fn get_pending_transactions_with_predicate(
&self,
predicate: impl FnMut(&ValidPoolTransaction<Self::Transaction>) -> bool,
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.0.get_pending_transactions_with_predicate(predicate)
}
fn get_pending_transactions_by_sender(
&self,
sender: Address,
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.0.get_pending_transactions_by_sender(sender)
}
fn get_queued_transactions_by_sender(
&self,
sender: Address,
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.0.get_queued_transactions_by_sender(sender)
}
fn get_highest_transaction_by_sender(
&self,
sender: Address,
) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.0.get_highest_transaction_by_sender(sender)
}
fn get_highest_consecutive_transaction_by_sender(
&self,
sender: Address,
on_chain_nonce: u64,
) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.0
.get_highest_consecutive_transaction_by_sender(sender, on_chain_nonce)
}
fn get_transaction_by_sender_and_nonce(
&self,
sender: Address,
nonce: u64,
) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.0.get_transaction_by_sender_and_nonce(sender, nonce)
}
fn get_transactions_by_origin(
&self,
origin: TransactionOrigin,
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.0.get_transactions_by_origin(origin)
}
fn get_pending_transactions_by_origin(
&self,
origin: TransactionOrigin,
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
self.0.get_pending_transactions_by_origin(origin)
}
fn unique_senders(&self) -> HashSet<Address> {
self.0.unique_senders()
}
fn get_blob(
&self,
tx_hash: TxHash,
) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
self.0.get_blob(tx_hash)
}
fn get_all_blobs(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
self.0.get_all_blobs(tx_hashes)
}
fn get_all_blobs_exact(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
self.0.get_all_blobs_exact(tx_hashes)
}
fn get_blobs_for_versioned_hashes(
&self,
versioned_hashes: &[B256],
) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
self.0.get_blobs_for_versioned_hashes(versioned_hashes)
}
}
impl<V, T, S> TransactionPoolExt for EvmSequencerPool<V, T, S>
where
V: TransactionValidator,
<V as TransactionValidator>::Transaction: EthPoolTransaction,
T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
S: BlobStore,
{
fn set_block_info(&self, info: BlockInfo) {
self.0.set_block_info(info);
}
fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
where
B: Block,
{
self.0.on_canonical_state_change(update);
}
fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
self.0.update_accounts(accounts);
}
fn delete_blob(&self, tx: TxHash) {
self.0.delete_blob(tx);
}
fn delete_blobs(&self, txs: Vec<TxHash>) {
self.0.delete_blobs(txs);
}
fn cleanup_blobs(&self) {
self.0.cleanup_blobs();
}
}
impl<V, T, S> Clone for EvmSequencerPool<V, T, S>
where
T: TransactionOrdering,
{
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

View File

@ -3,11 +3,8 @@ name = "evm-sequencer-node"
edition = { workspace = true }
[dependencies]
evm-sequencer-mempool = { workspace = true }
clap = { workspace = true }
futures-util = { workspace = true }
jsonrpsee = { workspace = true }
eyre = { workspace = true }
futures = { workspace = true }
reth = { workspace = true }
reth-cli-commands = { workspace = true }
reth-node-ethereum = { workspace = true }
reth-ethereum = { workspace = true, features = ["full"] }
reth-tracing = { workspace = true }

View File

@ -1,26 +1,52 @@
use futures::TryStreamExt as _;
use reth::cli::Cli;
use reth_node_ethereum::node::{EthereumAddOns, EthereumNode};
use reth_ethereum::{
exex::{ExExContext, ExExEvent, ExExNotification},
node::{EthereumNode, api::FullNodeComponents},
};
use reth_tracing::tracing::info;
use evm_sequencer_mempool::EvmSequencerMempoolBuilder;
async fn push_block_to_da<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>,
) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.try_next().await? {
match &notification {
ExExNotification::ChainCommitted { new } => {
// TODO: Push range of finalized blocks to DA, and interact with prover to generate a proof over the range.
info!(committed_chain = ?new.range(), "Received commit");
}
ExExNotification::ChainReorged { old, new } => {
info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
}
ExExNotification::ChainReverted { old } => {
info!(reverted_chain = ?old.range(), "Received revert");
}
};
fn main() {
Cli::parse_args()
.run(|builder, _| async move {
// launch the node
if let Some(committed_chain) = notification.committed_chain() {
ctx.events
.send(ExExEvent::FinishedHeight(committed_chain.tip().num_hash()))
.unwrap();
}
}
Ok(())
}
fn main() -> eyre::Result<()> {
Cli::parse_args().run(|builder, _| {
Box::pin(async move {
let handle = Box::pin(
builder
// use the default ethereum node types
.with_types::<EthereumNode>()
// use default ethereum components but use our custom pool
.with_components(
EthereumNode::components().pool(EvmSequencerMempoolBuilder::default()),
)
.with_add_ons(EthereumAddOns::default())
.node(EthereumNode::default())
.install_exex("push-block-to-da", async move |ctx| {
Ok(push_block_to_da(ctx))
})
.launch(),
)
.await?;
handle.wait_for_node_exit().await
})
.unwrap();
})
}