remove indexer dep from sequencer:

This commit is contained in:
Petar Radovic 2026-04-30 11:13:50 +02:00
parent 9f6468682a
commit 265a269da0
11 changed files with 99 additions and 144 deletions

View File

@ -169,7 +169,6 @@ pub fn sequencer_config(
partial: SequencerPartialConfig,
home: PathBuf,
bedrock_addr: SocketAddr,
indexer_addr: SocketAddr,
initial_data: &InitialData,
) -> Result<SequencerConfig> {
let SequencerPartialConfig {
@ -197,8 +196,6 @@ pub fn sequencer_config(
.context("Failed to convert bedrock addr to URL")?,
auth: None,
},
indexer_rpc_url: addr_to_url(UrlProtocol::Ws, indexer_addr)
.context("Failed to convert indexer addr to URL")?,
})
}

View File

@ -77,14 +77,10 @@ impl TestContext {
.await
.context("Failed to setup Indexer")?;
let (sequencer_handle, temp_sequencer_dir) = setup_sequencer(
sequencer_partial_config,
bedrock_addr,
indexer_handle.addr(),
&initial_data,
)
.await
.context("Failed to setup Sequencer")?;
let (sequencer_handle, temp_sequencer_dir) =
setup_sequencer(sequencer_partial_config, bedrock_addr, &initial_data)
.await
.context("Failed to setup Sequencer")?;
let (wallet, temp_wallet_dir, wallet_password) =
setup_wallet(sequencer_handle.addr(), &initial_data)

View File

@ -119,7 +119,6 @@ pub(crate) async fn setup_indexer(
pub(crate) async fn setup_sequencer(
partial: config::SequencerPartialConfig,
bedrock_addr: SocketAddr,
indexer_addr: SocketAddr,
initial_data: &config::InitialData,
) -> Result<(SequencerHandle, TempDir)> {
let temp_sequencer_dir =
@ -134,7 +133,6 @@ pub(crate) async fn setup_sequencer(
partial,
temp_sequencer_dir.path().to_owned(),
bedrock_addr,
indexer_addr,
initial_data,
)
.context("Failed to create Sequencer config")?;

View File

@ -85,8 +85,6 @@ impl TestContextFFI {
.block_on(setup_sequencer(
sequencer_partial_config,
bedrock_addr,
// SAFETY: addr is valid if indexer_ffi is valid.
unsafe { indexer_ffi.addr() },
initial_data,
))
.context("Failed to setup Sequencer")?;

View File

@ -2,6 +2,7 @@ use std::{sync::Arc, time::Duration};
use anyhow::{Context as _, Result, anyhow};
use common::block::Block;
use log::warn;
pub use logos_blockchain_core::mantle::ops::channel::MsgId;
pub use logos_blockchain_key_management_system_service::keys::Ed25519Key;
pub use logos_blockchain_zone_sdk::sequencer::SequencerCheckpoint;
@ -9,6 +10,7 @@ use logos_blockchain_zone_sdk::{
CommonHttpClient,
adapter::NodeHttpClient,
sequencer::{Event, SequencerConfig as ZoneSdkSequencerConfig, SequencerHandle, ZoneSequencer},
state::InscriptionInfo,
};
use tokio::task::JoinHandle;
@ -18,6 +20,11 @@ use crate::config::BedrockConfig;
/// Caller is responsible for persistence (e.g. writing to rocksdb).
pub type CheckpointSink = Box<dyn Fn(SequencerCheckpoint) + Send + Sync + 'static>;
/// Sink for finalized L2 block ids derived from `Event::TxsFinalized` and
/// `Event::FinalizedInscriptions`. Caller is responsible for cleanup
/// (e.g. marking pending blocks as finalized in storage).
pub type FinalizedBlockSink = Box<dyn Fn(u64) + Send + Sync + 'static>;
#[expect(async_fn_in_trait, reason = "We don't care about Send/Sync here")]
pub trait BlockPublisherTrait: Clone {
async fn new(
@ -26,6 +33,7 @@ pub trait BlockPublisherTrait: Clone {
resubmit_interval: Duration,
initial_checkpoint: Option<SequencerCheckpoint>,
on_checkpoint: CheckpointSink,
on_finalized_block: FinalizedBlockSink,
) -> Result<Self>;
/// Fire-and-forget publish. Zone-sdk drives the actual submission and
@ -56,6 +64,7 @@ impl BlockPublisherTrait for ZoneSdkPublisher {
resubmit_interval: Duration,
initial_checkpoint: Option<SequencerCheckpoint>,
on_checkpoint: CheckpointSink,
on_finalized_block: FinalizedBlockSink,
) -> Result<Self> {
let basic_auth = config.auth.clone().map(Into::into);
let node = NodeHttpClient::new(CommonHttpClient::new(basic_auth), config.node_url.clone());
@ -75,8 +84,18 @@ impl BlockPublisherTrait for ZoneSdkPublisher {
let drive_task = tokio::spawn(async move {
loop {
if let Some(Event::Published { checkpoint, .. }) = sequencer.next_event().await {
on_checkpoint(checkpoint);
let Some(event) = sequencer.next_event().await else {
continue;
};
match event {
Event::Published { checkpoint, .. } => on_checkpoint(checkpoint),
Event::TxsFinalized { inscriptions, .. }
| Event::FinalizedInscriptions { inscriptions } => {
if let Some(max_block_id) = max_block_id_from_inscriptions(&inscriptions) {
on_finalized_block(max_block_id);
}
}
Event::ChannelUpdate { .. } | Event::Ready => {}
}
}
});
@ -98,3 +117,20 @@ impl BlockPublisherTrait for ZoneSdkPublisher {
Ok(())
}
}
/// Scan `inscriptions`, decoding every payload as a borsh-encoded [`Block`],
/// and return the largest `block_id` observed. Payloads that fail to decode
/// are logged and ignored; `None` means nothing decoded successfully.
fn max_block_id_from_inscriptions(inscriptions: &[InscriptionInfo]) -> Option<u64> {
    let mut highest: Option<u64> = None;
    for inscription in inscriptions {
        match borsh::from_slice::<Block>(&inscription.payload) {
            Ok(block) => {
                let id = block.header.block_id;
                // Track the running maximum without allocating an intermediate list.
                highest = Some(highest.map_or(id, |current| current.max(id)));
            }
            Err(err) => {
                warn!("Failed to deserialize finalized inscription as Block: {err:#}");
            }
        }
    }
    highest
}

View File

@ -41,8 +41,6 @@ pub struct SequencerConfig {
pub signing_key: [u8; 32],
/// Bedrock configuration options.
pub bedrock_config: BedrockConfig,
/// Indexer RPC URL.
pub indexer_rpc_url: Url,
#[serde(skip_serializing_if = "Option::is_none")]
pub initial_public_accounts: Option<Vec<PublicAccountPublicInitialData>>,
#[serde(skip_serializing_if = "Option::is_none")]

View File

@ -21,31 +21,29 @@ use testnet_initial_state::initial_state;
use crate::{
block_publisher::{BlockPublisherTrait, ZoneSdkPublisher},
block_store::SequencerStore,
indexer_client::{IndexerClient, IndexerClientTrait},
};
pub mod block_publisher;
pub mod block_store;
pub mod config;
// Kept as a thin client lib for callers that want to query the indexer
// directly (e.g. integration tests). The sequencer no longer depends on the
// indexer at runtime — finalization comes from zone-sdk events.
pub mod indexer_client;
#[cfg(feature = "mock")]
pub mod mock;
pub struct SequencerCore<
BP: BlockPublisherTrait = ZoneSdkPublisher,
IC: IndexerClientTrait = IndexerClient,
> {
pub struct SequencerCore<BP: BlockPublisherTrait = ZoneSdkPublisher> {
state: nssa::V03State,
store: SequencerStore,
mempool: MemPool<NSSATransaction>,
sequencer_config: SequencerConfig,
chain_height: u64,
block_publisher: BP,
indexer_client: IC,
}
impl<BP: BlockPublisherTrait, IC: IndexerClientTrait> SequencerCore<BP, IC> {
impl<BP: BlockPublisherTrait> SequencerCore<BP> {
/// Starts the sequencer using the provided configuration.
/// If an existing database is found, the sequencer state is loaded from it and
/// assumed to represent the correct latest state consistent with Bedrock-finalized data.
@ -69,10 +67,6 @@ impl<BP: BlockPublisherTrait, IC: IndexerClientTrait> SequencerCore<BP, IC> {
load_or_create_signing_key(&config.home.join("bedrock_signing_key"))
.expect("Failed to load or create bedrock signing key");
let indexer_client = IC::new(&config.indexer_rpc_url)
.await
.expect("Failed to create Indexer Client");
// TODO: Remove msg_id from BlockMeta — it is no longer needed now that
// zone-sdk manages L1 settlement state via its own checkpoint.
let genesis_msg_id = [0_u8; 32];
@ -109,12 +103,20 @@ impl<BP: BlockPublisherTrait, IC: IndexerClientTrait> SequencerCore<BP, IC> {
}
});
let dbio_for_finalized = store.dbio();
let on_finalized_block: block_publisher::FinalizedBlockSink = Box::new(move |block_id| {
if let Err(err) = dbio_for_finalized.clean_pending_blocks_up_to(block_id) {
error!("Failed to mark pending blocks finalized up to {block_id}: {err:#}");
}
});
let block_publisher = BP::new(
&config.bedrock_config,
bedrock_signing_key,
config.retry_pending_blocks_timeout,
initial_checkpoint,
on_checkpoint,
on_finalized_block,
)
.await
.expect("Failed to initialize Block Publisher");
@ -192,7 +194,6 @@ impl<BP: BlockPublisherTrait, IC: IndexerClientTrait> SequencerCore<BP, IC> {
chain_height: latest_block_meta.id,
sequencer_config: config,
block_publisher,
indexer_client,
};
(sequencer_core, mempool_handle)
@ -341,22 +342,19 @@ impl<BP: BlockPublisherTrait, IC: IndexerClientTrait> SequencerCore<BP, IC> {
&self.sequencer_config
}
/// Deletes finalized blocks from the sequencer's pending block list.
/// This method must be called when new blocks are finalized on Bedrock.
/// All pending blocks with an ID less than or equal to `last_finalized_block_id`
/// are removed from the database.
pub fn clean_finalized_blocks_from_db(&mut self, last_finalized_block_id: u64) -> Result<()> {
self.get_pending_blocks()?
.iter()
.map(|block| block.header.block_id)
.min()
.map_or(Ok(()), |first_pending_block_id| {
info!("Clearing pending blocks up to id: {last_finalized_block_id}");
// TODO: Delete blocks instead of marking them as finalized.
// Current approach is used because we still have `GetBlockDataRequest`.
(first_pending_block_id..=last_finalized_block_id)
.try_for_each(|id| self.store.mark_block_as_finalized(id))
})
/// Marks every pending block whose `block_id` is at most
/// `last_finalized_block_id` as finalized. Safe to call repeatedly
/// (idempotent). In production this is not called directly: the
/// `on_finalized_block` sink built in `start_from_config` performs the same
/// cleanup when the publisher reports `Event::TxsFinalized` /
/// `Event::FinalizedInscriptions`. The method remains public for tests.
// TODO: Delete blocks instead of marking them as finalized. Current
// approach is used because we still have `GetBlockDataRequest`.
pub fn clean_finalized_blocks_from_db(&self, last_finalized_block_id: u64) -> Result<()> {
    info!("Clearing pending blocks up to id: {last_finalized_block_id}");
    let dbio = self.store.dbio();
    dbio.clean_pending_blocks_up_to(last_finalized_block_id)?;
    Ok(())
}
/// Returns the list of stored pending blocks.
@ -374,10 +372,6 @@ impl<BP: BlockPublisherTrait, IC: IndexerClientTrait> SequencerCore<BP, IC> {
self.block_publisher.clone()
}
pub fn indexer_client(&self) -> IC {
self.indexer_client.clone()
}
fn next_block_id(&self) -> u64 {
self.chain_height
.checked_add(1)
@ -446,7 +440,6 @@ mod tests {
auth: None,
},
retry_pending_blocks_timeout: Duration::from_mins(4),
indexer_rpc_url: "ws://localhost:8779".parse().unwrap(),
initial_public_accounts: None,
initial_private_accounts: None,
}

View File

@ -3,15 +3,15 @@ use std::time::Duration;
use anyhow::Result;
use common::block::Block;
use logos_blockchain_key_management_system_service::keys::Ed25519Key;
use url::Url;
use crate::{
block_publisher::{BlockPublisherTrait, CheckpointSink, SequencerCheckpoint},
block_publisher::{
BlockPublisherTrait, CheckpointSink, FinalizedBlockSink, SequencerCheckpoint,
},
config::BedrockConfig,
indexer_client::IndexerClientTrait,
};
pub type SequencerCoreWithMockClients = crate::SequencerCore<MockBlockPublisher, MockIndexerClient>;
pub type SequencerCoreWithMockClients = crate::SequencerCore<MockBlockPublisher>;
#[derive(Clone)]
pub struct MockBlockPublisher;
@ -23,6 +23,7 @@ impl BlockPublisherTrait for MockBlockPublisher {
_resubmit_interval: Duration,
_initial_checkpoint: Option<SequencerCheckpoint>,
_on_checkpoint: CheckpointSink,
_on_finalized_block: FinalizedBlockSink,
) -> Result<Self> {
Ok(Self)
}
@ -31,12 +32,3 @@ impl BlockPublisherTrait for MockBlockPublisher {
Ok(())
}
}
/// No-op stand-in for the real indexer client, for tests that build a
/// sequencer without a running indexer.
#[derive(Copy, Clone)]
pub struct MockIndexerClient;
impl IndexerClientTrait for MockIndexerClient {
/// Always succeeds without using `_indexer_url` — no connection is attempted.
async fn new(_indexer_url: &Url) -> Result<Self> {
Ok(Self)
}
}

View File

@ -5,8 +5,6 @@ use bytesize::ByteSize;
use common::transaction::NSSATransaction;
use futures::never::Never;
use jsonrpsee::server::ServerHandle;
#[cfg(not(feature = "standalone"))]
use log::warn;
use log::{error, info};
use mempool::MemPoolHandle;
#[cfg(not(feature = "standalone"))]
@ -29,7 +27,6 @@ pub struct SequencerHandle {
/// Option because of `Drop` which forbids to simply move out of `self` in `stopped()`.
server_handle: Option<ServerHandle>,
main_loop_handle: JoinHandle<Result<Never>>,
listen_for_bedrock_blocks_loop_handle: JoinHandle<Result<Never>>,
}
impl SequencerHandle {
@ -37,13 +34,11 @@ impl SequencerHandle {
addr: SocketAddr,
server_handle: ServerHandle,
main_loop_handle: JoinHandle<Result<Never>>,
listen_for_bedrock_blocks_loop_handle: JoinHandle<Result<Never>>,
) -> Self {
Self {
addr,
server_handle: Some(server_handle),
main_loop_handle,
listen_for_bedrock_blocks_loop_handle,
}
}
@ -57,7 +52,6 @@ impl SequencerHandle {
addr: _,
server_handle,
main_loop_handle,
listen_for_bedrock_blocks_loop_handle,
} = &mut self;
let server_handle = server_handle.take().expect("Server handle is set");
@ -71,11 +65,6 @@ impl SequencerHandle {
.context("Main loop task panicked")?
.context("Main loop exited unexpectedly")
}
res = listen_for_bedrock_blocks_loop_handle => {
res
.context("Listen for bedrock blocks loop task panicked")?
.context("Listen for bedrock blocks loop exited unexpectedly")
}
}
}
@ -89,12 +78,10 @@ impl SequencerHandle {
addr: _,
server_handle,
main_loop_handle,
listen_for_bedrock_blocks_loop_handle,
} = self;
let stopped = server_handle.as_ref().is_none_or(ServerHandle::is_stopped)
|| main_loop_handle.is_finished()
|| listen_for_bedrock_blocks_loop_handle.is_finished();
|| main_loop_handle.is_finished();
!stopped
}
@ -110,11 +97,9 @@ impl Drop for SequencerHandle {
addr: _,
server_handle,
main_loop_handle,
listen_for_bedrock_blocks_loop_handle,
} = self;
main_loop_handle.abort();
listen_for_bedrock_blocks_loop_handle.abort();
let Some(handle) = server_handle else {
return;
@ -146,18 +131,9 @@ pub async fn run(config: SequencerConfig, port: u16) -> Result<SequencerHandle>
info!("RPC server started");
info!("Starting main sequencer loop");
let main_loop_handle = tokio::spawn(main_loop(Arc::clone(&seq_core_wrapped), block_timeout));
let main_loop_handle = tokio::spawn(main_loop(seq_core_wrapped, block_timeout));
info!("Starting bedrock block listening loop");
let listen_for_bedrock_blocks_loop_handle =
tokio::spawn(listen_for_bedrock_blocks_loop(seq_core_wrapped));
Ok(SequencerHandle::new(
addr,
server_handle,
main_loop_handle,
listen_for_bedrock_blocks_loop_handle,
))
Ok(SequencerHandle::new(addr, server_handle, main_loop_handle))
}
async fn run_server(
@ -206,45 +182,3 @@ async fn main_loop(seq_core: Arc<Mutex<SequencerCore>>, block_timeout: Duration)
info!("Waiting for new transactions");
}
}
/// Long-running task: subscribe to the indexer's finalized-block stream and,
/// for each finalized L2 block id received, clean the corresponding pending
/// blocks out of the sequencer store. If the subscription closes, logs a
/// warning and re-subscribes after a fixed delay; never returns `Ok` (the
/// `Never` return type) — only propagates errors.
#[cfg(not(feature = "standalone"))]
async fn listen_for_bedrock_blocks_loop(seq_core: Arc<Mutex<SequencerCore>>) -> Result<Never> {
use indexer_service_rpc::RpcClient as _;
// Clone the client out so the mutex is not held while awaiting the stream.
let indexer_client = seq_core.lock().await.indexer_client();
let retry_delay = Duration::from_secs(5);
loop {
// TODO: Subscribe from the first pending block ID?
let mut subscription = indexer_client
.subscribe_to_finalized_blocks()
.await
.context("Failed to subscribe to finalized blocks")?;
// Each item is itself a Result; a stream-level error aborts the whole loop.
while let Some(block_id) = subscription.next().await {
let block_id = block_id.context("Failed to get next block from subscription")?;
info!("Received new L2 block with ID {block_id}");
// Lock is taken per event and released before the next await on the stream.
seq_core
.lock()
.await
.clean_finalized_blocks_from_db(block_id)
.with_context(|| {
format!("Failed to clean finalized blocks from DB for block ID {block_id}")
})?;
}
// Stream ended without an error item: treat as transient and retry.
warn!(
"Block subscription closed unexpectedly, reason: {:?}, retrying after {retry_delay:?}",
subscription.close_reason()
);
tokio::time::sleep(retry_delay).await;
}
}
/// Standalone builds have no indexer to listen to: park the task forever so
/// the caller's join handle stays alive without doing any work.
#[cfg(feature = "standalone")]
async fn listen_for_bedrock_blocks_loop(_seq_core: Arc<Mutex<SequencerCore>>) -> Result<Never> {
std::future::pending::<Result<Never>>().await
}

View File

@ -8,10 +8,7 @@ use jsonrpsee::{
use log::warn;
use mempool::MemPoolHandle;
use nssa::{self, program::Program};
use sequencer_core::{
DbError, SequencerCore, block_publisher::BlockPublisherTrait,
indexer_client::IndexerClientTrait,
};
use sequencer_core::{DbError, SequencerCore, block_publisher::BlockPublisherTrait};
use sequencer_service_protocol::{
Account, AccountId, Block, BlockId, Commitment, HashType, MembershipProof, Nonce, ProgramId,
};
@ -19,15 +16,15 @@ use tokio::sync::Mutex;
const NOT_FOUND_ERROR_CODE: i32 = -31999;
pub struct SequencerService<BC: BlockPublisherTrait, IC: IndexerClientTrait> {
sequencer: Arc<Mutex<SequencerCore<BC, IC>>>,
pub struct SequencerService<BC: BlockPublisherTrait> {
sequencer: Arc<Mutex<SequencerCore<BC>>>,
mempool_handle: MemPoolHandle<NSSATransaction>,
max_block_size: u64,
}
impl<BC: BlockPublisherTrait, IC: IndexerClientTrait> SequencerService<BC, IC> {
impl<BC: BlockPublisherTrait> SequencerService<BC> {
pub const fn new(
sequencer: Arc<Mutex<SequencerCore<BC, IC>>>,
sequencer: Arc<Mutex<SequencerCore<BC>>>,
mempool_handle: MemPoolHandle<NSSATransaction>,
max_block_size: u64,
) -> Self {
@ -40,8 +37,8 @@ impl<BC: BlockPublisherTrait, IC: IndexerClientTrait> SequencerService<BC, IC> {
}
#[async_trait]
impl<BC: BlockPublisherTrait + Send + 'static, IC: IndexerClientTrait + Send + 'static>
sequencer_service_rpc::RpcServer for SequencerService<BC, IC>
impl<BC: BlockPublisherTrait + Send + 'static> sequencer_service_rpc::RpcServer
for SequencerService<BC>
{
async fn send_transaction(&self, tx: NSSATransaction) -> Result<HashType, ErrorObjectOwned> {
// Reserve ~200 bytes for block header overhead

View File

@ -287,6 +287,22 @@ impl RocksDBIO {
Ok(())
}
/// Mark every pending block with `block_id <= last_finalized` as finalized.
/// Idempotent — already-finalized blocks are skipped.
pub fn clean_pending_blocks_up_to(&self, last_finalized: u64) -> DbResult<()> {
    // Collect the target ids first so we are not writing to the store while
    // the block iterator is still live.
    // NOTE(review): `filter_map(Result::ok)` silently skips blocks that fail
    // to load — presumably intentional best-effort cleanup; confirm.
    let mut to_finalize = Vec::new();
    for block in self.get_all_blocks().filter_map(Result::ok) {
        let id = block.header.block_id;
        if matches!(block.bedrock_status, BedrockStatus::Pending) && id <= last_finalized {
            to_finalize.push(id);
        }
    }
    to_finalize
        .into_iter()
        .try_for_each(|id| self.mark_block_as_finalized(id))
}
pub fn mark_block_as_finalized(&self, block_id: u64) -> DbResult<()> {
let mut block = self.get_block(block_id)?.ok_or_else(|| {
DbError::db_interaction_error(format!("Block with id {block_id} not found"))