From fe047e169c39e3a7490ae8c9df9583e3810663aa Mon Sep 17 00:00:00 2001 From: Daniil Polyakov Date: Wed, 13 May 2026 23:39:48 +0300 Subject: [PATCH] refactor: move database not found error matching from sequencer to storage --- Cargo.lock | 1 + indexer/core/src/block_store.rs | 4 +- sequencer/core/Cargo.toml | 1 + sequencer/core/src/block_store.rs | 78 +++++++++++++++++--------- sequencer/core/src/lib.rs | 92 ++++++++++++++++--------------- sequencer/service/src/service.rs | 4 +- 6 files changed, 106 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2f5e766b..9e6788b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8438,6 +8438,7 @@ dependencies = [ "tempfile", "test_program_methods", "testnet_initial_state", + "thiserror 2.0.18", "tokio", "url", "vault_core", diff --git a/indexer/core/src/block_store.rs b/indexer/core/src/block_store.rs index b66b778f..5d80245f 100644 --- a/indexer/core/src/block_store.rs +++ b/indexer/core/src/block_store.rs @@ -8,7 +8,7 @@ use common::{ use log::info; use logos_blockchain_core::{header::HeaderId, mantle::ops::channel::MsgId}; use logos_blockchain_zone_sdk::Slot; -use nssa::{Account, AccountId, V03State}; +use nssa::{Account, AccountId, GENESIS_BLOCK_ID, V03State}; use nssa_core::BlockId; use storage::indexer::RocksDBIO; use tokio::sync::RwLock; @@ -150,7 +150,7 @@ impl IndexerStore { "Last transaction in block must be the clock invocation for the block timestamp" ); - let is_genesis = block.header.block_id == 1; + let is_genesis = block.header.block_id == GENESIS_BLOCK_ID; for transaction in user_txs { if is_genesis { let genesis_tx = match transaction { diff --git a/sequencer/core/Cargo.toml b/sequencer/core/Cargo.toml index da88d481..9729d209 100644 --- a/sequencer/core/Cargo.toml +++ b/sequencer/core/Cargo.toml @@ -17,6 +17,7 @@ logos-blockchain-zone-sdk.workspace = true testnet_initial_state.workspace = true vault_core.workspace = true +thiserror.workspace = true anyhow.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/sequencer/core/src/block_store.rs b/sequencer/core/src/block_store.rs index ada6d306..c7192285 100644 --- a/sequencer/core/src/block_store.rs +++ b/sequencer/core/src/block_store.rs @@ -1,17 +1,31 @@ use std::{collections::HashMap, path::Path, sync::Arc}; -use anyhow::{Context as _, Result}; use common::{ HashType, block::{Block, BlockMeta, MantleMsgId}, transaction::NSSATransaction, }; -use log::info; use logos_blockchain_zone_sdk::sequencer::SequencerCheckpoint; use nssa::V03State; -pub use storage::DbResult; use storage::sequencer::RocksDBIO; +#[derive(Debug, thiserror::Error)] +pub enum SequencerStoreError { + #[error("Database internal error")] + DatabaseInternal(#[from] storage::error::DbError), + + #[error("Database not found at the specified location")] + DatabaseNotFound, + + #[error("Zone SDK checkpoint serialization error")] + ZoneSdkCheckpointSerialization(#[source] serde_json::Error), + + #[error("Zone SDK checkpoint deserialization error")] + ZoneSdkCheckpointDeserialization(#[source] serde_json::Error), +} + +pub type Result = std::result::Result; + pub struct SequencerStore { dbio: Arc, // TODO: Consider adding the hashmap to the database for faster recovery. @@ -22,12 +36,22 @@ pub struct SequencerStore { impl SequencerStore { /// Open existing database at the given location. Fails if no database is found. - pub fn open_db(location: &Path, signing_key: nssa::PrivateKey) -> DbResult { - let dbio = Arc::new(RocksDBIO::open(location)?); + pub fn open_db(location: &Path, signing_key: nssa::PrivateKey) -> Result { + let dbio = match RocksDBIO::open(location) { + Ok(dbio) => dbio, + Err(storage::error::DbError::RocksDbError { error, .. }) + if error.kind() == rocksdb::ErrorKind::InvalidArgument + && error.to_string().contains("does not exist") => + { + return Err(SequencerStoreError::DatabaseNotFound); + } + Err(e) => return Err(e.into()), + }; + + let dbio = Arc::new(dbio); let genesis_id = dbio.get_meta_first_block_in_db()?; let last_id = dbio.latest_block_meta()?.id; - info!("Preparing block cache"); let mut tx_hash_to_block_map = HashMap::new(); for i in genesis_id..=last_id { let block = dbio @@ -36,10 +60,6 @@ impl SequencerStore { tx_hash_to_block_map.extend(block_to_transactions_map(&block)); } - info!( - "Block cache prepared. Total blocks in cache: {}", - tx_hash_to_block_map.len() - ); Ok(Self { dbio, @@ -59,7 +79,7 @@ impl SequencerStore { genesis_msg_id: MantleMsgId, genesis_state: &V03State, signing_key: nssa::PrivateKey, - ) -> DbResult { + ) -> Result { let dbio = Arc::new(RocksDBIO::create( location, genesis_block, @@ -85,16 +105,18 @@ impl SequencerStore { Arc::clone(&self.dbio) } - pub fn get_block_at_id(&self, id: u64) -> DbResult> { - self.dbio.get_block(id) + pub fn get_block_at_id(&self, id: u64) -> Result> { + self.dbio.get_block(id).map_err(Into::into) } - pub fn delete_block_at_id(&mut self, block_id: u64) -> DbResult<()> { - self.dbio.delete_block(block_id) + pub fn delete_block_at_id(&mut self, block_id: u64) -> Result<()> { + self.dbio.delete_block(block_id).map_err(Into::into) } - pub fn mark_block_as_finalized(&mut self, block_id: u64) -> DbResult<()> { - self.dbio.mark_block_as_finalized(block_id) + pub fn mark_block_as_finalized(&mut self, block_id: u64) -> Result<()> { + self.dbio + .mark_block_as_finalized(block_id) + .map_err(Into::into) } /// Returns the transaction corresponding to the given hash, if it exists in the blockchain. @@ -116,8 +138,8 @@ impl SequencerStore { ); } - pub fn latest_block_meta(&self) -> DbResult { - self.dbio.latest_block_meta() + pub fn latest_block_meta(&self) -> Result { + self.dbio.latest_block_meta().map_err(Into::into) } #[must_use] @@ -130,8 +152,10 @@ impl SequencerStore { &self.signing_key } - pub fn get_all_blocks(&self) -> impl Iterator> { - self.dbio.get_all_blocks() + pub fn get_all_blocks(&self) -> impl Iterator> { + self.dbio + .get_all_blocks() + .map(|res| res.map_err(Into::into)) } pub(crate) fn update( @@ -139,15 +163,15 @@ impl SequencerStore { block: &Block, msg_id: MantleMsgId, state: &V03State, - ) -> DbResult<()> { + ) -> Result<()> { let new_transactions_map = block_to_transactions_map(block); self.dbio.atomic_update(block, msg_id, state)?; self.tx_hash_to_block_map.extend(new_transactions_map); Ok(()) } - pub fn get_nssa_state(&self) -> DbResult { - self.dbio.get_nssa_state() + pub fn get_nssa_state(&self) -> Result { + self.dbio.get_nssa_state().map_err(Into::into) } pub fn get_zone_checkpoint(&self) -> Result> { @@ -155,13 +179,13 @@ impl SequencerStore { return Ok(None); }; let checkpoint: SequencerCheckpoint = serde_json::from_slice(&bytes) - .context("Failed to deserialize stored zone-sdk checkpoint")?; + .map_err(SequencerStoreError::ZoneSdkCheckpointDeserialization)?; Ok(Some(checkpoint)) } pub fn set_zone_checkpoint(&self, checkpoint: &SequencerCheckpoint) -> Result<()> { - let bytes = - serde_json::to_vec(checkpoint).context("Failed to serialize zone-sdk checkpoint")?; + let bytes = serde_json::to_vec(checkpoint) + .map_err(SequencerStoreError::ZoneSdkCheckpointSerialization)?; self.dbio.put_zone_sdk_checkpoint_bytes(&bytes)?; Ok(()) } diff --git a/sequencer/core/src/lib.rs b/sequencer/core/src/lib.rs index bfc49aa9..c103adfc 100644 --- a/sequencer/core/src/lib.rs +++ b/sequencer/core/src/lib.rs @@ -18,7 +18,7 @@ pub use storage::error::DbError; use crate::{ block_publisher::{BlockPublisherTrait, ZoneSdkPublisher}, - block_store::SequencerStore, + block_store::{SequencerStore, SequencerStoreError}, }; pub mod block_publisher; @@ -65,10 +65,7 @@ impl SequencerCore { .expect("Genesis block not found in store"); (store, state, genesis_block) } - Err(DbError::RocksDbError { error, .. }) - if error.kind() == rocksdb::ErrorKind::InvalidArgument - && error.to_string().contains("does not exist") => - { + Err(SequencerStoreError::DatabaseNotFound) => { warn!( "Database not found at {}, starting from genesis", db_path.display() @@ -334,7 +331,7 @@ impl SequencerCore { Ok(self .store .get_all_blocks() - .collect::>>()? + .collect::>>()? .into_iter() .filter(|block| matches!(block.bedrock_status, BedrockStatus::Pending)) .collect()) @@ -441,7 +438,7 @@ mod tests { }; use logos_blockchain_core::mantle::ops::channel::ChannelId; use mempool::MemPoolHandle; - use tempfile::tempdir; + use tempfile::{TempDir, tempdir}; use testnet_initial_state::{initial_accounts, initial_pub_accounts_private_keys}; use crate::{ @@ -451,25 +448,28 @@ mod tests { mock::SequencerCoreWithMockClients, }; - fn setup_sequencer_config() -> SequencerConfig { + fn setup_sequencer_config() -> (SequencerConfig, TempDir) { let tempdir = tempfile::tempdir().unwrap(); let home = tempdir.path().to_path_buf(); - SequencerConfig { - home, - max_num_tx_in_block: 10, - max_block_size: bytesize::ByteSize::mib(1), - mempool_max_size: 10000, - block_create_timeout: Duration::from_secs(1), - signing_key: *sequencer_sign_key_for_testing().value(), - bedrock_config: BedrockConfig { - channel_id: ChannelId::from([0; 32]), - node_url: "http://not-used-in-unit-tests".parse().unwrap(), - auth: None, + ( + SequencerConfig { + home, + max_num_tx_in_block: 10, + max_block_size: bytesize::ByteSize::mib(1), + mempool_max_size: 10000, + block_create_timeout: Duration::from_secs(1), + signing_key: *sequencer_sign_key_for_testing().value(), + bedrock_config: BedrockConfig { + channel_id: ChannelId::from([0; 32]), + node_url: "http://not-used-in-unit-tests".parse().unwrap(), + auth: None, + }, + retry_pending_blocks_timeout: Duration::from_mins(4), + genesis: vec![], }, - retry_pending_blocks_timeout: Duration::from_mins(4), - genesis: vec![], - } + tempdir, + ) } fn create_signing_key_for_account1() -> nssa::PrivateKey { @@ -480,9 +480,14 @@ mod tests { initial_pub_accounts_private_keys()[1].pub_sign_key.clone() } - async fn common_setup() -> (SequencerCoreWithMockClients, MemPoolHandle) { - let config = setup_sequencer_config(); - common_setup_with_config(config).await + async fn common_setup() -> ( + SequencerCoreWithMockClients, + MemPoolHandle, + TempDir, + ) { + let (config, tempdir) = setup_sequencer_config(); + let (sequencer, mempool_handle) = common_setup_with_config(config).await; + (sequencer, mempool_handle, tempdir) } async fn common_setup_with_config( @@ -501,7 +506,7 @@ mod tests { #[tokio::test] async fn start_from_config() { - let config = setup_sequencer_config(); + let (config, _tempdir) = setup_sequencer_config(); let (sequencer, _mempool_handle) = SequencerCoreWithMockClients::start_from_config(config.clone()).await; @@ -520,7 +525,7 @@ mod tests { #[tokio::test] async fn start_from_config_opens_existing_db_if_it_exists() { - let config = setup_sequencer_config(); + let (config, _tempdir) = setup_sequencer_config(); let temp_dir = tempdir().unwrap(); let mut config = config; config.home = temp_dir.path().to_path_buf(); @@ -556,7 +561,7 @@ mod tests { #[should_panic(expected = "Failed to open database")] #[tokio::test] async fn start_from_config_panics_when_db_open_returns_non_not_found_error() { - let mut config = setup_sequencer_config(); + let (mut config, _tempdir) = setup_sequencer_config(); let temp_dir = tempdir().unwrap(); config.home = temp_dir.path().to_path_buf(); @@ -579,7 +584,7 @@ mod tests { #[tokio::test] async fn transaction_pre_check_native_transfer_valid() { - let (_sequencer, _mempool_handle) = common_setup().await; + let (_sequencer, _mempool_handle, _tempdir) = common_setup().await; let acc1 = initial_accounts()[0].account_id; let acc2 = initial_accounts()[1].account_id; @@ -596,7 +601,7 @@ mod tests { #[tokio::test] async fn transaction_pre_check_native_transfer_other_signature() { - let (mut sequencer, _mempool_handle) = common_setup().await; + let (mut sequencer, _mempool_handle, _tempdir) = common_setup().await; let acc1 = initial_accounts()[0].account_id; let acc2 = initial_accounts()[1].account_id; @@ -621,7 +626,7 @@ mod tests { #[tokio::test] async fn transaction_pre_check_native_transfer_sent_too_much() { - let (mut sequencer, _mempool_handle) = common_setup().await; + let (mut sequencer, _mempool_handle, _tempdir) = common_setup().await; let acc1 = initial_accounts()[0].account_id; let acc2 = initial_accounts()[1].account_id; @@ -650,7 +655,7 @@ mod tests { #[tokio::test] async fn transaction_execute_native_transfer() { - let (mut sequencer, _mempool_handle) = common_setup().await; + let (mut sequencer, _mempool_handle, _tempdir) = common_setup().await; let acc1 = initial_accounts()[0].account_id; let acc2 = initial_accounts()[1].account_id; @@ -673,9 +678,10 @@ mod tests { #[tokio::test] async fn push_tx_into_mempool_blocks_until_mempool_is_full() { + let (config, _tempdir) = setup_sequencer_config(); let config = SequencerConfig { mempool_max_size: 1, - ..setup_sequencer_config() + ..config }; let (mut sequencer, mempool_handle) = common_setup_with_config(config).await; @@ -698,7 +704,7 @@ mod tests { #[tokio::test] async fn build_block_from_mempool() { - let (mut sequencer, mempool_handle) = common_setup().await; + let (mut sequencer, mempool_handle, _tempdir) = common_setup().await; let genesis_height = sequencer.chain_height; let tx = common::test_utils::produce_dummy_empty_transaction(); @@ -711,7 +717,7 @@ mod tests { #[tokio::test] async fn replay_transactions_are_rejected_in_the_same_block() { - let (mut sequencer, mempool_handle) = common_setup().await; + let (mut sequencer, mempool_handle, _tempdir) = common_setup().await; let acc1 = initial_accounts()[0].account_id; let acc2 = initial_accounts()[1].account_id; @@ -748,7 +754,7 @@ mod tests { #[tokio::test] async fn replay_transactions_are_rejected_in_different_blocks() { - let (mut sequencer, mempool_handle) = common_setup().await; + let (mut sequencer, mempool_handle, _tempdir) = common_setup().await; let acc1 = initial_accounts()[0].account_id; let acc2 = initial_accounts()[1].account_id; @@ -794,7 +800,7 @@ mod tests { #[tokio::test] async fn restart_from_storage() { - let config = setup_sequencer_config(); + let (config, _tempdir) = setup_sequencer_config(); let acc1_account_id = initial_accounts()[0].account_id; let acc2_account_id = initial_accounts()[1].account_id; let balance_to_move = 13; @@ -851,7 +857,7 @@ mod tests { #[tokio::test] async fn get_pending_blocks() { - let config = setup_sequencer_config(); + let (config, _tempdir) = setup_sequencer_config(); let (mut sequencer, _mempool_handle) = SequencerCoreWithMockClients::start_from_config(config).await; sequencer.produce_new_block().await.unwrap(); @@ -862,7 +868,7 @@ mod tests { #[tokio::test] async fn delete_blocks() { - let config = setup_sequencer_config(); + let (config, _tempdir) = setup_sequencer_config(); let (mut sequencer, _mempool_handle) = SequencerCoreWithMockClients::start_from_config(config).await; sequencer.produce_new_block().await.unwrap(); @@ -879,7 +885,7 @@ mod tests { #[tokio::test] async fn produce_block_with_correct_prev_meta_after_restart() { - let config = setup_sequencer_config(); + let (config, _tempdir) = setup_sequencer_config(); let acc1_account_id = initial_accounts()[0].account_id; let acc2_account_id = initial_accounts()[1].account_id; @@ -948,7 +954,7 @@ mod tests { #[tokio::test] async fn transactions_touching_clock_account_are_dropped_from_block() { - let (mut sequencer, mempool_handle) = common_setup().await; + let (mut sequencer, mempool_handle, _tempdir) = common_setup().await; // Canonical clock invocation and a crafted variant with a different timestamp — both must // be dropped because their diffs touch the clock accounts. @@ -989,7 +995,7 @@ mod tests { #[tokio::test] async fn user_tx_that_chain_calls_clock_is_dropped() { - let (mut sequencer, mempool_handle) = common_setup().await; + let (mut sequencer, mempool_handle, _tempdir) = common_setup().await; // Deploy the clock_chain_caller test program. let deploy_tx = @@ -1043,7 +1049,7 @@ mod tests { #[tokio::test] async fn block_production_aborts_when_clock_account_data_is_corrupted() { - let (mut sequencer, mempool_handle) = common_setup().await; + let (mut sequencer, mempool_handle, _tempdir) = common_setup().await; // Corrupt the clock 01 account data so the clock program panics on deserialization. let clock_account_id = nssa::CLOCK_01_PROGRAM_ACCOUNT_ID; diff --git a/sequencer/service/src/service.rs b/sequencer/service/src/service.rs index 0bb8e1dd..74784430 100644 --- a/sequencer/service/src/service.rs +++ b/sequencer/service/src/service.rs @@ -8,7 +8,7 @@ use jsonrpsee::{ use log::warn; use mempool::MemPoolHandle; use nssa::{self, program::Program}; -use sequencer_core::{DbError, SequencerCore, block_publisher::BlockPublisherTrait}; +use sequencer_core::{SequencerCore, block_publisher::BlockPublisherTrait}; use sequencer_service_protocol::{ Account, AccountId, Block, BlockId, Commitment, HashType, MembershipProof, Nonce, ProgramId, }; @@ -175,6 +175,6 @@ impl sequencer_service_rpc::RpcServer } } -fn internal_error(err: &DbError) -> ErrorObjectOwned { +fn internal_error(err: &impl ToString) -> ErrorObjectOwned { ErrorObjectOwned::owned(ErrorCode::InternalError.code(), err.to_string(), None::<()>) }