refactor: move database not found error matching from sequencer to storage

This commit is contained in:
Daniil Polyakov 2026-05-13 23:39:48 +03:00
parent aef309f30d
commit fe047e169c
6 changed files with 106 additions and 74 deletions

1
Cargo.lock generated
View File

@ -8438,6 +8438,7 @@ dependencies = [
"tempfile",
"test_program_methods",
"testnet_initial_state",
"thiserror 2.0.18",
"tokio",
"url",
"vault_core",

View File

@ -8,7 +8,7 @@ use common::{
use log::info;
use logos_blockchain_core::{header::HeaderId, mantle::ops::channel::MsgId};
use logos_blockchain_zone_sdk::Slot;
use nssa::{Account, AccountId, V03State};
use nssa::{Account, AccountId, GENESIS_BLOCK_ID, V03State};
use nssa_core::BlockId;
use storage::indexer::RocksDBIO;
use tokio::sync::RwLock;
@ -150,7 +150,7 @@ impl IndexerStore {
"Last transaction in block must be the clock invocation for the block timestamp"
);
let is_genesis = block.header.block_id == 1;
let is_genesis = block.header.block_id == GENESIS_BLOCK_ID;
for transaction in user_txs {
if is_genesis {
let genesis_tx = match transaction {

View File

@ -17,6 +17,7 @@ logos-blockchain-zone-sdk.workspace = true
testnet_initial_state.workspace = true
vault_core.workspace = true
thiserror.workspace = true
anyhow.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@ -1,17 +1,31 @@
use std::{collections::HashMap, path::Path, sync::Arc};
use anyhow::{Context as _, Result};
use common::{
HashType,
block::{Block, BlockMeta, MantleMsgId},
transaction::NSSATransaction,
};
use log::info;
use logos_blockchain_zone_sdk::sequencer::SequencerCheckpoint;
use nssa::V03State;
pub use storage::DbResult;
use storage::sequencer::RocksDBIO;
/// Errors produced by `SequencerStore` operations.
///
/// Wraps low-level storage failures and adds store-specific cases
/// (missing database, checkpoint (de)serialization).
#[derive(Debug, thiserror::Error)]
pub enum SequencerStoreError {
/// Any underlying storage-layer failure, forwarded via `#[from]` so the
/// original `DbError` is preserved as the error source.
#[error("Database internal error")]
DatabaseInternal(#[from] storage::error::DbError),
/// No RocksDB database exists at the requested path. Matched explicitly in
/// `open_db` (RocksDB `InvalidArgument` + "does not exist") so callers can
/// fall back to starting from genesis instead of treating it as fatal.
#[error("Database not found at the specified location")]
DatabaseNotFound,
/// Failed to serialize a zone-SDK `SequencerCheckpoint` to JSON before storing it.
#[error("Zone SDK checkpoint serialization error")]
ZoneSdkCheckpointSerialization(#[source] serde_json::Error),
/// Failed to deserialize stored bytes back into a zone-SDK `SequencerCheckpoint`.
#[error("Zone SDK checkpoint deserialization error")]
ZoneSdkCheckpointDeserialization(#[source] serde_json::Error),
}
/// Convenience alias: `Result` specialized to `SequencerStoreError`.
pub type Result<T> = std::result::Result<T, SequencerStoreError>;
pub struct SequencerStore {
dbio: Arc<RocksDBIO>,
// TODO: Consider adding the hashmap to the database for faster recovery.
@ -22,12 +36,22 @@ pub struct SequencerStore {
impl SequencerStore {
/// Open existing database at the given location. Fails if no database is found.
pub fn open_db(location: &Path, signing_key: nssa::PrivateKey) -> DbResult<Self> {
let dbio = Arc::new(RocksDBIO::open(location)?);
pub fn open_db(location: &Path, signing_key: nssa::PrivateKey) -> Result<Self> {
let dbio = match RocksDBIO::open(location) {
Ok(dbio) => dbio,
Err(storage::error::DbError::RocksDbError { error, .. })
if error.kind() == rocksdb::ErrorKind::InvalidArgument
&& error.to_string().contains("does not exist") =>
{
return Err(SequencerStoreError::DatabaseNotFound);
}
Err(e) => return Err(e.into()),
};
let dbio = Arc::new(dbio);
let genesis_id = dbio.get_meta_first_block_in_db()?;
let last_id = dbio.latest_block_meta()?.id;
info!("Preparing block cache");
let mut tx_hash_to_block_map = HashMap::new();
for i in genesis_id..=last_id {
let block = dbio
@ -36,10 +60,6 @@ impl SequencerStore {
tx_hash_to_block_map.extend(block_to_transactions_map(&block));
}
info!(
"Block cache prepared. Total blocks in cache: {}",
tx_hash_to_block_map.len()
);
Ok(Self {
dbio,
@ -59,7 +79,7 @@ impl SequencerStore {
genesis_msg_id: MantleMsgId,
genesis_state: &V03State,
signing_key: nssa::PrivateKey,
) -> DbResult<Self> {
) -> Result<Self> {
let dbio = Arc::new(RocksDBIO::create(
location,
genesis_block,
@ -85,16 +105,18 @@ impl SequencerStore {
Arc::clone(&self.dbio)
}
pub fn get_block_at_id(&self, id: u64) -> DbResult<Option<Block>> {
self.dbio.get_block(id)
pub fn get_block_at_id(&self, id: u64) -> Result<Option<Block>> {
self.dbio.get_block(id).map_err(Into::into)
}
pub fn delete_block_at_id(&mut self, block_id: u64) -> DbResult<()> {
self.dbio.delete_block(block_id)
pub fn delete_block_at_id(&mut self, block_id: u64) -> Result<()> {
self.dbio.delete_block(block_id).map_err(Into::into)
}
pub fn mark_block_as_finalized(&mut self, block_id: u64) -> DbResult<()> {
self.dbio.mark_block_as_finalized(block_id)
pub fn mark_block_as_finalized(&mut self, block_id: u64) -> Result<()> {
self.dbio
.mark_block_as_finalized(block_id)
.map_err(Into::into)
}
/// Returns the transaction corresponding to the given hash, if it exists in the blockchain.
@ -116,8 +138,8 @@ impl SequencerStore {
);
}
pub fn latest_block_meta(&self) -> DbResult<BlockMeta> {
self.dbio.latest_block_meta()
pub fn latest_block_meta(&self) -> Result<BlockMeta> {
self.dbio.latest_block_meta().map_err(Into::into)
}
#[must_use]
@ -130,8 +152,10 @@ impl SequencerStore {
&self.signing_key
}
pub fn get_all_blocks(&self) -> impl Iterator<Item = DbResult<Block>> {
self.dbio.get_all_blocks()
pub fn get_all_blocks(&self) -> impl Iterator<Item = Result<Block>> {
self.dbio
.get_all_blocks()
.map(|res| res.map_err(Into::into))
}
pub(crate) fn update(
@ -139,15 +163,15 @@ impl SequencerStore {
block: &Block,
msg_id: MantleMsgId,
state: &V03State,
) -> DbResult<()> {
) -> Result<()> {
let new_transactions_map = block_to_transactions_map(block);
self.dbio.atomic_update(block, msg_id, state)?;
self.tx_hash_to_block_map.extend(new_transactions_map);
Ok(())
}
pub fn get_nssa_state(&self) -> DbResult<V03State> {
self.dbio.get_nssa_state()
pub fn get_nssa_state(&self) -> Result<V03State> {
self.dbio.get_nssa_state().map_err(Into::into)
}
pub fn get_zone_checkpoint(&self) -> Result<Option<SequencerCheckpoint>> {
@ -155,13 +179,13 @@ impl SequencerStore {
return Ok(None);
};
let checkpoint: SequencerCheckpoint = serde_json::from_slice(&bytes)
.context("Failed to deserialize stored zone-sdk checkpoint")?;
.map_err(SequencerStoreError::ZoneSdkCheckpointDeserialization)?;
Ok(Some(checkpoint))
}
pub fn set_zone_checkpoint(&self, checkpoint: &SequencerCheckpoint) -> Result<()> {
let bytes =
serde_json::to_vec(checkpoint).context("Failed to serialize zone-sdk checkpoint")?;
let bytes = serde_json::to_vec(checkpoint)
.map_err(SequencerStoreError::ZoneSdkCheckpointSerialization)?;
self.dbio.put_zone_sdk_checkpoint_bytes(&bytes)?;
Ok(())
}

View File

@ -18,7 +18,7 @@ pub use storage::error::DbError;
use crate::{
block_publisher::{BlockPublisherTrait, ZoneSdkPublisher},
block_store::SequencerStore,
block_store::{SequencerStore, SequencerStoreError},
};
pub mod block_publisher;
@ -65,10 +65,7 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
.expect("Genesis block not found in store");
(store, state, genesis_block)
}
Err(DbError::RocksDbError { error, .. })
if error.kind() == rocksdb::ErrorKind::InvalidArgument
&& error.to_string().contains("does not exist") =>
{
Err(SequencerStoreError::DatabaseNotFound) => {
warn!(
"Database not found at {}, starting from genesis",
db_path.display()
@ -334,7 +331,7 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
Ok(self
.store
.get_all_blocks()
.collect::<block_store::DbResult<Vec<Block>>>()?
.collect::<block_store::Result<Vec<Block>>>()?
.into_iter()
.filter(|block| matches!(block.bedrock_status, BedrockStatus::Pending))
.collect())
@ -441,7 +438,7 @@ mod tests {
};
use logos_blockchain_core::mantle::ops::channel::ChannelId;
use mempool::MemPoolHandle;
use tempfile::tempdir;
use tempfile::{TempDir, tempdir};
use testnet_initial_state::{initial_accounts, initial_pub_accounts_private_keys};
use crate::{
@ -451,25 +448,28 @@ mod tests {
mock::SequencerCoreWithMockClients,
};
fn setup_sequencer_config() -> SequencerConfig {
fn setup_sequencer_config() -> (SequencerConfig, TempDir) {
let tempdir = tempfile::tempdir().unwrap();
let home = tempdir.path().to_path_buf();
SequencerConfig {
home,
max_num_tx_in_block: 10,
max_block_size: bytesize::ByteSize::mib(1),
mempool_max_size: 10000,
block_create_timeout: Duration::from_secs(1),
signing_key: *sequencer_sign_key_for_testing().value(),
bedrock_config: BedrockConfig {
channel_id: ChannelId::from([0; 32]),
node_url: "http://not-used-in-unit-tests".parse().unwrap(),
auth: None,
(
SequencerConfig {
home,
max_num_tx_in_block: 10,
max_block_size: bytesize::ByteSize::mib(1),
mempool_max_size: 10000,
block_create_timeout: Duration::from_secs(1),
signing_key: *sequencer_sign_key_for_testing().value(),
bedrock_config: BedrockConfig {
channel_id: ChannelId::from([0; 32]),
node_url: "http://not-used-in-unit-tests".parse().unwrap(),
auth: None,
},
retry_pending_blocks_timeout: Duration::from_mins(4),
genesis: vec![],
},
retry_pending_blocks_timeout: Duration::from_mins(4),
genesis: vec![],
}
tempdir,
)
}
fn create_signing_key_for_account1() -> nssa::PrivateKey {
@ -480,9 +480,14 @@ mod tests {
initial_pub_accounts_private_keys()[1].pub_sign_key.clone()
}
async fn common_setup() -> (SequencerCoreWithMockClients, MemPoolHandle<NSSATransaction>) {
let config = setup_sequencer_config();
common_setup_with_config(config).await
async fn common_setup() -> (
SequencerCoreWithMockClients,
MemPoolHandle<NSSATransaction>,
TempDir,
) {
let (config, tempdir) = setup_sequencer_config();
let (sequencer, mempool_handle) = common_setup_with_config(config).await;
(sequencer, mempool_handle, tempdir)
}
async fn common_setup_with_config(
@ -501,7 +506,7 @@ mod tests {
#[tokio::test]
async fn start_from_config() {
let config = setup_sequencer_config();
let (config, _tempdir) = setup_sequencer_config();
let (sequencer, _mempool_handle) =
SequencerCoreWithMockClients::start_from_config(config.clone()).await;
@ -520,7 +525,7 @@ mod tests {
#[tokio::test]
async fn start_from_config_opens_existing_db_if_it_exists() {
let config = setup_sequencer_config();
let (config, _tempdir) = setup_sequencer_config();
let temp_dir = tempdir().unwrap();
let mut config = config;
config.home = temp_dir.path().to_path_buf();
@ -556,7 +561,7 @@ mod tests {
#[should_panic(expected = "Failed to open database")]
#[tokio::test]
async fn start_from_config_panics_when_db_open_returns_non_not_found_error() {
let mut config = setup_sequencer_config();
let (mut config, _tempdir) = setup_sequencer_config();
let temp_dir = tempdir().unwrap();
config.home = temp_dir.path().to_path_buf();
@ -579,7 +584,7 @@ mod tests {
#[tokio::test]
async fn transaction_pre_check_native_transfer_valid() {
let (_sequencer, _mempool_handle) = common_setup().await;
let (_sequencer, _mempool_handle, _tempdir) = common_setup().await;
let acc1 = initial_accounts()[0].account_id;
let acc2 = initial_accounts()[1].account_id;
@ -596,7 +601,7 @@ mod tests {
#[tokio::test]
async fn transaction_pre_check_native_transfer_other_signature() {
let (mut sequencer, _mempool_handle) = common_setup().await;
let (mut sequencer, _mempool_handle, _tempdir) = common_setup().await;
let acc1 = initial_accounts()[0].account_id;
let acc2 = initial_accounts()[1].account_id;
@ -621,7 +626,7 @@ mod tests {
#[tokio::test]
async fn transaction_pre_check_native_transfer_sent_too_much() {
let (mut sequencer, _mempool_handle) = common_setup().await;
let (mut sequencer, _mempool_handle, _tempdir) = common_setup().await;
let acc1 = initial_accounts()[0].account_id;
let acc2 = initial_accounts()[1].account_id;
@ -650,7 +655,7 @@ mod tests {
#[tokio::test]
async fn transaction_execute_native_transfer() {
let (mut sequencer, _mempool_handle) = common_setup().await;
let (mut sequencer, _mempool_handle, _tempdir) = common_setup().await;
let acc1 = initial_accounts()[0].account_id;
let acc2 = initial_accounts()[1].account_id;
@ -673,9 +678,10 @@ mod tests {
#[tokio::test]
async fn push_tx_into_mempool_blocks_until_mempool_is_full() {
let (config, _tempdir) = setup_sequencer_config();
let config = SequencerConfig {
mempool_max_size: 1,
..setup_sequencer_config()
..config
};
let (mut sequencer, mempool_handle) = common_setup_with_config(config).await;
@ -698,7 +704,7 @@ mod tests {
#[tokio::test]
async fn build_block_from_mempool() {
let (mut sequencer, mempool_handle) = common_setup().await;
let (mut sequencer, mempool_handle, _tempdir) = common_setup().await;
let genesis_height = sequencer.chain_height;
let tx = common::test_utils::produce_dummy_empty_transaction();
@ -711,7 +717,7 @@ mod tests {
#[tokio::test]
async fn replay_transactions_are_rejected_in_the_same_block() {
let (mut sequencer, mempool_handle) = common_setup().await;
let (mut sequencer, mempool_handle, _tempdir) = common_setup().await;
let acc1 = initial_accounts()[0].account_id;
let acc2 = initial_accounts()[1].account_id;
@ -748,7 +754,7 @@ mod tests {
#[tokio::test]
async fn replay_transactions_are_rejected_in_different_blocks() {
let (mut sequencer, mempool_handle) = common_setup().await;
let (mut sequencer, mempool_handle, _tempdir) = common_setup().await;
let acc1 = initial_accounts()[0].account_id;
let acc2 = initial_accounts()[1].account_id;
@ -794,7 +800,7 @@ mod tests {
#[tokio::test]
async fn restart_from_storage() {
let config = setup_sequencer_config();
let (config, _tempdir) = setup_sequencer_config();
let acc1_account_id = initial_accounts()[0].account_id;
let acc2_account_id = initial_accounts()[1].account_id;
let balance_to_move = 13;
@ -851,7 +857,7 @@ mod tests {
#[tokio::test]
async fn get_pending_blocks() {
let config = setup_sequencer_config();
let (config, _tempdir) = setup_sequencer_config();
let (mut sequencer, _mempool_handle) =
SequencerCoreWithMockClients::start_from_config(config).await;
sequencer.produce_new_block().await.unwrap();
@ -862,7 +868,7 @@ mod tests {
#[tokio::test]
async fn delete_blocks() {
let config = setup_sequencer_config();
let (config, _tempdir) = setup_sequencer_config();
let (mut sequencer, _mempool_handle) =
SequencerCoreWithMockClients::start_from_config(config).await;
sequencer.produce_new_block().await.unwrap();
@ -879,7 +885,7 @@ mod tests {
#[tokio::test]
async fn produce_block_with_correct_prev_meta_after_restart() {
let config = setup_sequencer_config();
let (config, _tempdir) = setup_sequencer_config();
let acc1_account_id = initial_accounts()[0].account_id;
let acc2_account_id = initial_accounts()[1].account_id;
@ -948,7 +954,7 @@ mod tests {
#[tokio::test]
async fn transactions_touching_clock_account_are_dropped_from_block() {
let (mut sequencer, mempool_handle) = common_setup().await;
let (mut sequencer, mempool_handle, _tempdir) = common_setup().await;
// Canonical clock invocation and a crafted variant with a different timestamp — both must
// be dropped because their diffs touch the clock accounts.
@ -989,7 +995,7 @@ mod tests {
#[tokio::test]
async fn user_tx_that_chain_calls_clock_is_dropped() {
let (mut sequencer, mempool_handle) = common_setup().await;
let (mut sequencer, mempool_handle, _tempdir) = common_setup().await;
// Deploy the clock_chain_caller test program.
let deploy_tx =
@ -1043,7 +1049,7 @@ mod tests {
#[tokio::test]
async fn block_production_aborts_when_clock_account_data_is_corrupted() {
let (mut sequencer, mempool_handle) = common_setup().await;
let (mut sequencer, mempool_handle, _tempdir) = common_setup().await;
// Corrupt the clock 01 account data so the clock program panics on deserialization.
let clock_account_id = nssa::CLOCK_01_PROGRAM_ACCOUNT_ID;

View File

@ -8,7 +8,7 @@ use jsonrpsee::{
use log::warn;
use mempool::MemPoolHandle;
use nssa::{self, program::Program};
use sequencer_core::{DbError, SequencerCore, block_publisher::BlockPublisherTrait};
use sequencer_core::{SequencerCore, block_publisher::BlockPublisherTrait};
use sequencer_service_protocol::{
Account, AccountId, Block, BlockId, Commitment, HashType, MembershipProof, Nonce, ProgramId,
};
@ -175,6 +175,6 @@ impl<BC: BlockPublisherTrait + Send + 'static> sequencer_service_rpc::RpcServer
}
}
fn internal_error(err: &DbError) -> ErrorObjectOwned {
/// Convert any displayable error into a JSON-RPC "internal error" response object.
fn internal_error(err: &impl ToString) -> ErrorObjectOwned {
    let message = err.to_string();
    let code = ErrorCode::InternalError.code();
    ErrorObjectOwned::owned(code, message, None::<()>)
}