From 265a269da058929c25ae36289955136eed12f12c Mon Sep 17 00:00:00 2001 From: Petar Radovic Date: Thu, 30 Apr 2026 11:13:50 +0200 Subject: [PATCH] remove indexer dep from sequencer: --- integration_tests/src/config.rs | 3 - integration_tests/src/lib.rs | 12 ++-- integration_tests/src/setup.rs | 2 - integration_tests/src/test_context_ffi.rs | 2 - sequencer/core/src/block_publisher.rs | 40 ++++++++++++- sequencer/core/src/config.rs | 2 - sequencer/core/src/lib.rs | 59 ++++++++----------- sequencer/core/src/mock.rs | 18 ++---- sequencer/service/src/lib.rs | 72 +---------------------- sequencer/service/src/service.rs | 17 +++--- storage/src/sequencer/mod.rs | 16 +++++ 11 files changed, 99 insertions(+), 144 deletions(-) diff --git a/integration_tests/src/config.rs b/integration_tests/src/config.rs index f2fbd5b3..733a9169 100644 --- a/integration_tests/src/config.rs +++ b/integration_tests/src/config.rs @@ -169,7 +169,6 @@ pub fn sequencer_config( partial: SequencerPartialConfig, home: PathBuf, bedrock_addr: SocketAddr, - indexer_addr: SocketAddr, initial_data: &InitialData, ) -> Result { let SequencerPartialConfig { @@ -197,8 +196,6 @@ pub fn sequencer_config( .context("Failed to convert bedrock addr to URL")?, auth: None, }, - indexer_rpc_url: addr_to_url(UrlProtocol::Ws, indexer_addr) - .context("Failed to convert indexer addr to URL")?, }) } diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index fcae2c71..47324e53 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -77,14 +77,10 @@ impl TestContext { .await .context("Failed to setup Indexer")?; - let (sequencer_handle, temp_sequencer_dir) = setup_sequencer( - sequencer_partial_config, - bedrock_addr, - indexer_handle.addr(), - &initial_data, - ) - .await - .context("Failed to setup Sequencer")?; + let (sequencer_handle, temp_sequencer_dir) = + setup_sequencer(sequencer_partial_config, bedrock_addr, &initial_data) + .await + .context("Failed to setup Sequencer")?; let (wallet, temp_wallet_dir, wallet_password) = setup_wallet(sequencer_handle.addr(), &initial_data) diff --git a/integration_tests/src/setup.rs b/integration_tests/src/setup.rs index 58b33c60..774c67e3 100644 --- a/integration_tests/src/setup.rs +++ b/integration_tests/src/setup.rs @@ -119,7 +119,6 @@ pub(crate) async fn setup_indexer( pub(crate) async fn setup_sequencer( partial: config::SequencerPartialConfig, bedrock_addr: SocketAddr, - indexer_addr: SocketAddr, initial_data: &config::InitialData, ) -> Result<(SequencerHandle, TempDir)> { let temp_sequencer_dir = @@ -134,7 +133,6 @@ pub(crate) async fn setup_sequencer( partial, temp_sequencer_dir.path().to_owned(), bedrock_addr, - indexer_addr, initial_data, ) .context("Failed to create Sequencer config")?; diff --git a/integration_tests/src/test_context_ffi.rs b/integration_tests/src/test_context_ffi.rs index 7d21aa28..77e5f0f2 100644 --- a/integration_tests/src/test_context_ffi.rs +++ b/integration_tests/src/test_context_ffi.rs @@ -85,8 +85,6 @@ impl TestContextFFI { .block_on(setup_sequencer( sequencer_partial_config, bedrock_addr, - // SAFETY: addr is valid if indexer_ffi is valid. - unsafe { indexer_ffi.addr() }, initial_data, )) .context("Failed to setup Sequencer")?; diff --git a/sequencer/core/src/block_publisher.rs b/sequencer/core/src/block_publisher.rs index 238c9553..9f4c8235 100644 --- a/sequencer/core/src/block_publisher.rs +++ b/sequencer/core/src/block_publisher.rs @@ -2,6 +2,7 @@ use std::{sync::Arc, time::Duration}; use anyhow::{Context as _, Result, anyhow}; use common::block::Block; +use log::warn; pub use logos_blockchain_core::mantle::ops::channel::MsgId; pub use logos_blockchain_key_management_system_service::keys::Ed25519Key; pub use logos_blockchain_zone_sdk::sequencer::SequencerCheckpoint; @@ -9,6 +10,7 @@ use logos_blockchain_zone_sdk::{ CommonHttpClient, adapter::NodeHttpClient, sequencer::{Event, SequencerConfig as ZoneSdkSequencerConfig, SequencerHandle, ZoneSequencer}, + state::InscriptionInfo, }; use tokio::task::JoinHandle; @@ -18,6 +20,11 @@ use crate::config::BedrockConfig; /// Caller is responsible for persistence (e.g. writing to rocksdb). pub type CheckpointSink = Box; +/// Sink for finalized L2 block ids derived from `Event::TxsFinalized` and +/// `Event::FinalizedInscriptions`. Caller is responsible for cleanup +/// (e.g. marking pending blocks as finalized in storage). +pub type FinalizedBlockSink = Box; + #[expect(async_fn_in_trait, reason = "We don't care about Send/Sync here")] pub trait BlockPublisherTrait: Clone { async fn new( @@ -26,6 +33,7 @@ pub trait BlockPublisherTrait: Clone { resubmit_interval: Duration, initial_checkpoint: Option, on_checkpoint: CheckpointSink, + on_finalized_block: FinalizedBlockSink, ) -> Result; /// Fire-and-forget publish. Zone-sdk drives the actual submission and @@ -56,6 +64,7 @@ impl BlockPublisherTrait for ZoneSdkPublisher { resubmit_interval: Duration, initial_checkpoint: Option, on_checkpoint: CheckpointSink, + on_finalized_block: FinalizedBlockSink, ) -> Result { let basic_auth = config.auth.clone().map(Into::into); let node = NodeHttpClient::new(CommonHttpClient::new(basic_auth), config.node_url.clone()); @@ -75,8 +84,18 @@ impl BlockPublisherTrait for ZoneSdkPublisher { let drive_task = tokio::spawn(async move { loop { - if let Some(Event::Published { checkpoint, .. }) = sequencer.next_event().await { - on_checkpoint(checkpoint); + let Some(event) = sequencer.next_event().await else { + continue; + }; + match event { + Event::Published { checkpoint, .. } => on_checkpoint(checkpoint), + Event::TxsFinalized { inscriptions, .. } + | Event::FinalizedInscriptions { inscriptions } => { + if let Some(max_block_id) = max_block_id_from_inscriptions(&inscriptions) { + on_finalized_block(max_block_id); + } + } + Event::ChannelUpdate { .. } | Event::Ready => {} } } }); @@ -98,3 +117,20 @@ impl BlockPublisherTrait for ZoneSdkPublisher { Ok(()) } } + +/// Deserialize each inscription payload as a `Block` and return the highest +/// `block_id`. Bad payloads are logged and skipped. +fn max_block_id_from_inscriptions(inscriptions: &[InscriptionInfo]) -> Option { + inscriptions + .iter() + .filter_map( + |inscription| match borsh::from_slice::(&inscription.payload) { + Ok(block) => Some(block.header.block_id), + Err(err) => { + warn!("Failed to deserialize finalized inscription as Block: {err:#}"); + None + } + }, + ) + .max() +} diff --git a/sequencer/core/src/config.rs b/sequencer/core/src/config.rs index fad2fef3..b33dd694 100644 --- a/sequencer/core/src/config.rs +++ b/sequencer/core/src/config.rs @@ -41,8 +41,6 @@ pub struct SequencerConfig { pub signing_key: [u8; 32], /// Bedrock configuration options. pub bedrock_config: BedrockConfig, - /// Indexer RPC URL. - pub indexer_rpc_url: Url, #[serde(skip_serializing_if = "Option::is_none")] pub initial_public_accounts: Option>, #[serde(skip_serializing_if = "Option::is_none")] diff --git a/sequencer/core/src/lib.rs b/sequencer/core/src/lib.rs index 5d1d00a3..e95bd139 100644 --- a/sequencer/core/src/lib.rs +++ b/sequencer/core/src/lib.rs @@ -21,31 +21,29 @@ use testnet_initial_state::initial_state; use crate::{ block_publisher::{BlockPublisherTrait, ZoneSdkPublisher}, block_store::SequencerStore, - indexer_client::{IndexerClient, IndexerClientTrait}, }; pub mod block_publisher; pub mod block_store; pub mod config; +// Kept as a thin client lib for callers that want to query the indexer +// directly (e.g. integration tests). The sequencer no longer depends on the +// indexer at runtime — finalization comes from zone-sdk events. pub mod indexer_client; #[cfg(feature = "mock")] pub mod mock; -pub struct SequencerCore< - BP: BlockPublisherTrait = ZoneSdkPublisher, - IC: IndexerClientTrait = IndexerClient, -> { +pub struct SequencerCore { state: nssa::V03State, store: SequencerStore, mempool: MemPool, sequencer_config: SequencerConfig, chain_height: u64, block_publisher: BP, - indexer_client: IC, } -impl SequencerCore { +impl SequencerCore { /// Starts the sequencer using the provided configuration. /// If an existing database is found, the sequencer state is loaded from it and /// assumed to represent the correct latest state consistent with Bedrock-finalized data. @@ -69,10 +67,6 @@ impl SequencerCore { load_or_create_signing_key(&config.home.join("bedrock_signing_key")) .expect("Failed to load or create bedrock signing key"); - let indexer_client = IC::new(&config.indexer_rpc_url) - .await - .expect("Failed to create Indexer Client"); - // TODO: Remove msg_id from BlockMeta — it is no longer needed now that // zone-sdk manages L1 settlement state via its own checkpoint. let genesis_msg_id = [0_u8; 32]; @@ -109,12 +103,20 @@ impl SequencerCore { } }); + let dbio_for_finalized = store.dbio(); + let on_finalized_block: block_publisher::FinalizedBlockSink = Box::new(move |block_id| { + if let Err(err) = dbio_for_finalized.clean_pending_blocks_up_to(block_id) { + error!("Failed to mark pending blocks finalized up to {block_id}: {err:#}"); + } + }); + let block_publisher = BP::new( &config.bedrock_config, bedrock_signing_key, config.retry_pending_blocks_timeout, initial_checkpoint, on_checkpoint, + on_finalized_block, ) .await .expect("Failed to initialize Block Publisher"); @@ -192,7 +194,6 @@ impl SequencerCore { chain_height: latest_block_meta.id, sequencer_config: config, block_publisher, - indexer_client, }; (sequencer_core, mempool_handle) @@ -341,22 +342,19 @@ impl SequencerCore { &self.sequencer_config } - /// Deletes finalized blocks from the sequencer's pending block list. - /// This method must be called when new blocks are finalized on Bedrock. - /// All pending blocks with an ID less than or equal to `last_finalized_block_id` - /// are removed from the database. - pub fn clean_finalized_blocks_from_db(&mut self, last_finalized_block_id: u64) -> Result<()> { - self.get_pending_blocks()? - .iter() - .map(|block| block.header.block_id) - .min() - .map_or(Ok(()), |first_pending_block_id| { - info!("Clearing pending blocks up to id: {last_finalized_block_id}"); - // TODO: Delete blocks instead of marking them as finalized. - // Current approach is used because we still have `GetBlockDataRequest`. - (first_pending_block_id..=last_finalized_block_id) - .try_for_each(|id| self.store.mark_block_as_finalized(id)) - }) + /// Marks all pending blocks with `block_id <= last_finalized_block_id` as + /// finalized. Idempotent. Production callers don't invoke this directly — + /// it's wired up in `start_from_config` to the publisher's + /// `on_finalized_block` sink, which fires on `Event::TxsFinalized` / + /// `Event::FinalizedInscriptions`. Kept on the type for tests. + // TODO: Delete blocks instead of marking them as finalized. Current + // approach is used because we still have `GetBlockDataRequest`. + pub fn clean_finalized_blocks_from_db(&self, last_finalized_block_id: u64) -> Result<()> { + info!("Clearing pending blocks up to id: {last_finalized_block_id}"); + self.store + .dbio() + .clean_pending_blocks_up_to(last_finalized_block_id)?; + Ok(()) } /// Returns the list of stored pending blocks. @@ -374,10 +372,6 @@ impl SequencerCore { self.block_publisher.clone() } - pub fn indexer_client(&self) -> IC { - self.indexer_client.clone() - } - fn next_block_id(&self) -> u64 { self.chain_height .checked_add(1) @@ -446,7 +440,6 @@ mod tests { auth: None, }, retry_pending_blocks_timeout: Duration::from_mins(4), - indexer_rpc_url: "ws://localhost:8779".parse().unwrap(), initial_public_accounts: None, initial_private_accounts: None, } diff --git a/sequencer/core/src/mock.rs b/sequencer/core/src/mock.rs index df560938..ebe6ea5d 100644 --- a/sequencer/core/src/mock.rs +++ b/sequencer/core/src/mock.rs @@ -3,15 +3,15 @@ use std::time::Duration; use anyhow::Result; use common::block::Block; use logos_blockchain_key_management_system_service::keys::Ed25519Key; -use url::Url; use crate::{ - block_publisher::{BlockPublisherTrait, CheckpointSink, SequencerCheckpoint}, + block_publisher::{ + BlockPublisherTrait, CheckpointSink, FinalizedBlockSink, SequencerCheckpoint, + }, config::BedrockConfig, - indexer_client::IndexerClientTrait, }; -pub type SequencerCoreWithMockClients = crate::SequencerCore; +pub type SequencerCoreWithMockClients = crate::SequencerCore; #[derive(Clone)] pub struct MockBlockPublisher; @@ -23,6 +23,7 @@ impl BlockPublisherTrait for MockBlockPublisher { _resubmit_interval: Duration, _initial_checkpoint: Option, _on_checkpoint: CheckpointSink, + _on_finalized_block: FinalizedBlockSink, ) -> Result { Ok(Self) } @@ -31,12 +32,3 @@ impl BlockPublisherTrait for MockBlockPublisher { Ok(()) } } - -#[derive(Copy, Clone)] -pub struct MockIndexerClient; - -impl IndexerClientTrait for MockIndexerClient { - async fn new(_indexer_url: &Url) -> Result { - Ok(Self) - } -} diff --git a/sequencer/service/src/lib.rs b/sequencer/service/src/lib.rs index 7c6de793..319b75ad 100644 --- a/sequencer/service/src/lib.rs +++ b/sequencer/service/src/lib.rs @@ -5,8 +5,6 @@ use bytesize::ByteSize; use common::transaction::NSSATransaction; use futures::never::Never; use jsonrpsee::server::ServerHandle; -#[cfg(not(feature = "standalone"))] -use log::warn; use log::{error, info}; use mempool::MemPoolHandle; #[cfg(not(feature = "standalone"))] @@ -29,7 +27,6 @@ pub struct SequencerHandle { /// Option because of `Drop` which forbids to simply move out of `self` in `stopped()`. server_handle: Option, main_loop_handle: JoinHandle>, - listen_for_bedrock_blocks_loop_handle: JoinHandle>, } impl SequencerHandle { @@ -37,13 +34,11 @@ impl SequencerHandle { addr: SocketAddr, server_handle: ServerHandle, main_loop_handle: JoinHandle>, - listen_for_bedrock_blocks_loop_handle: JoinHandle>, ) -> Self { Self { addr, server_handle: Some(server_handle), main_loop_handle, - listen_for_bedrock_blocks_loop_handle, } } @@ -57,7 +52,6 @@ impl SequencerHandle { addr: _, server_handle, main_loop_handle, - listen_for_bedrock_blocks_loop_handle, } = &mut self; let server_handle = server_handle.take().expect("Server handle is set"); @@ -71,11 +65,6 @@ impl SequencerHandle { .context("Main loop task panicked")? .context("Main loop exited unexpectedly") } - res = listen_for_bedrock_blocks_loop_handle => { - res - .context("Listen for bedrock blocks loop task panicked")? - .context("Listen for bedrock blocks loop exited unexpectedly") - } } } @@ -89,12 +78,10 @@ impl SequencerHandle { addr: _, server_handle, main_loop_handle, - listen_for_bedrock_blocks_loop_handle, } = self; let stopped = server_handle.as_ref().is_none_or(ServerHandle::is_stopped) - || main_loop_handle.is_finished() - || listen_for_bedrock_blocks_loop_handle.is_finished(); + || main_loop_handle.is_finished(); !stopped } @@ -110,11 +97,9 @@ impl Drop for SequencerHandle { addr: _, server_handle, main_loop_handle, - listen_for_bedrock_blocks_loop_handle, } = self; main_loop_handle.abort(); - listen_for_bedrock_blocks_loop_handle.abort(); let Some(handle) = server_handle else { return; @@ -146,18 +131,9 @@ pub async fn run(config: SequencerConfig, port: u16) -> Result info!("RPC server started"); info!("Starting main sequencer loop"); - let main_loop_handle = tokio::spawn(main_loop(Arc::clone(&seq_core_wrapped), block_timeout)); + let main_loop_handle = tokio::spawn(main_loop(seq_core_wrapped, block_timeout)); - info!("Starting bedrock block listening loop"); - let listen_for_bedrock_blocks_loop_handle = - tokio::spawn(listen_for_bedrock_blocks_loop(seq_core_wrapped)); - - Ok(SequencerHandle::new( - addr, - server_handle, - main_loop_handle, - listen_for_bedrock_blocks_loop_handle, - )) + Ok(SequencerHandle::new(addr, server_handle, main_loop_handle)) } async fn run_server( @@ -206,45 +182,3 @@ async fn main_loop(seq_core: Arc>, block_timeout: Duration) info!("Waiting for new transactions"); } } - -#[cfg(not(feature = "standalone"))] -async fn listen_for_bedrock_blocks_loop(seq_core: Arc>) -> Result { - use indexer_service_rpc::RpcClient as _; - - let indexer_client = seq_core.lock().await.indexer_client(); - - let retry_delay = Duration::from_secs(5); - - loop { - // TODO: Subscribe from the first pending block ID? - let mut subscription = indexer_client - .subscribe_to_finalized_blocks() - .await - .context("Failed to subscribe to finalized blocks")?; - - while let Some(block_id) = subscription.next().await { - let block_id = block_id.context("Failed to get next block from subscription")?; - - info!("Received new L2 block with ID {block_id}"); - - seq_core - .lock() - .await - .clean_finalized_blocks_from_db(block_id) - .with_context(|| { - format!("Failed to clean finalized blocks from DB for block ID {block_id}") - })?; - } - - warn!( - "Block subscription closed unexpectedly, reason: {:?}, retrying after {retry_delay:?}", - subscription.close_reason() - ); - tokio::time::sleep(retry_delay).await; - } -} - -#[cfg(feature = "standalone")] -async fn listen_for_bedrock_blocks_loop(_seq_core: Arc>) -> Result { - std::future::pending::>().await -} diff --git a/sequencer/service/src/service.rs b/sequencer/service/src/service.rs index de70c72c..0bb8e1dd 100644 --- a/sequencer/service/src/service.rs +++ b/sequencer/service/src/service.rs @@ -8,10 +8,7 @@ use jsonrpsee::{ use log::warn; use mempool::MemPoolHandle; use nssa::{self, program::Program}; -use sequencer_core::{ - DbError, SequencerCore, block_publisher::BlockPublisherTrait, - indexer_client::IndexerClientTrait, -}; +use sequencer_core::{DbError, SequencerCore, block_publisher::BlockPublisherTrait}; use sequencer_service_protocol::{ Account, AccountId, Block, BlockId, Commitment, HashType, MembershipProof, Nonce, ProgramId, }; @@ -19,15 +16,15 @@ use tokio::sync::Mutex; const NOT_FOUND_ERROR_CODE: i32 = -31999; -pub struct SequencerService { - sequencer: Arc>>, +pub struct SequencerService { + sequencer: Arc>>, mempool_handle: MemPoolHandle, max_block_size: u64, } -impl SequencerService { +impl SequencerService { pub const fn new( - sequencer: Arc>>, + sequencer: Arc>>, mempool_handle: MemPoolHandle, max_block_size: u64, ) -> Self { @@ -40,8 +37,8 @@ impl SequencerService { } #[async_trait] -impl - sequencer_service_rpc::RpcServer for SequencerService +impl sequencer_service_rpc::RpcServer + for SequencerService { async fn send_transaction(&self, tx: NSSATransaction) -> Result { // Reserve ~200 bytes for block header overhead diff --git a/storage/src/sequencer/mod.rs b/storage/src/sequencer/mod.rs index 71a80796..537d198d 100644 --- a/storage/src/sequencer/mod.rs +++ b/storage/src/sequencer/mod.rs @@ -287,6 +287,22 @@ impl RocksDBIO { Ok(()) } + /// Mark every pending block with `block_id <= last_finalized` as finalized. + /// Idempotent — already-finalized blocks are skipped. + pub fn clean_pending_blocks_up_to(&self, last_finalized: u64) -> DbResult<()> { + let pending_ids: Vec = self + .get_all_blocks() + .filter_map(Result::ok) + .filter(|b| matches!(b.bedrock_status, BedrockStatus::Pending)) + .map(|b| b.header.block_id) + .filter(|id| *id <= last_finalized) + .collect(); + for id in pending_ids { + self.mark_block_as_finalized(id)?; + } + Ok(()) + } + pub fn mark_block_as_finalized(&self, block_id: u64) -> DbResult<()> { let mut block = self.get_block(block_id)?.ok_or_else(|| { DbError::db_interaction_error(format!("Block with id {block_id} not found"))