diff --git a/Cargo.lock b/Cargo.lock index 5814b031..98f56c6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2790,8 +2790,10 @@ dependencies = [ "futures", "log", "logos-blockchain-core", + "nssa", "serde", "serde_json", + "storage", "tokio", "url", ] diff --git a/common/src/transaction.rs b/common/src/transaction.rs index 7100aec4..2245ec54 100644 --- a/common/src/transaction.rs +++ b/common/src/transaction.rs @@ -1,3 +1,5 @@ +use std::fmt::Display; + use borsh::{BorshDeserialize, BorshSerialize}; use log::{info, warn}; use nssa::V02State; @@ -101,6 +103,44 @@ impl EncodedTransaction { } } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum TransactionMalformationError { + InvalidSignature, + FailedToDecode { tx: HashType }, +} + +impl Display for TransactionMalformationError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{self:#?}") + } +} + +impl std::error::Error for TransactionMalformationError {} + +// TODO: Introduce type-safe wrapper around checked transaction, e.g. AuthenticatedTransaction +pub fn transaction_pre_check( + tx: NSSATransaction, +) -> Result { + // Stateless checks here + match tx { + NSSATransaction::Public(tx) => { + if tx.witness_set().is_valid_for(tx.message()) { + Ok(NSSATransaction::Public(tx)) + } else { + Err(TransactionMalformationError::InvalidSignature) + } + } + NSSATransaction::PrivacyPreserving(tx) => { + if tx.witness_set().signatures_are_valid_for(tx.message()) { + Ok(NSSATransaction::PrivacyPreserving(tx)) + } else { + Err(TransactionMalformationError::InvalidSignature) + } + } + NSSATransaction::ProgramDeployment(tx) => Ok(NSSATransaction::ProgramDeployment(tx)), + } +} + pub fn execute_check_transaction_on_state( state: &mut V02State, tx: NSSATransaction, diff --git a/indexer_core/Cargo.toml b/indexer_core/Cargo.toml index 922f566c..19b016a4 100644 --- a/indexer_core/Cargo.toml +++ b/indexer_core/Cargo.toml @@ -6,6 +6,8 @@ edition = "2024" [dependencies] common.workspace = true bedrock_client.workspace = true +nssa.workspace = true +storage.workspace = true anyhow.workspace = true log.workspace = true diff --git a/indexer_core/src/block_store.rs b/indexer_core/src/block_store.rs new file mode 100644 index 00000000..8cafc3ab --- /dev/null +++ b/indexer_core/src/block_store.rs @@ -0,0 +1,100 @@ +use std::path::Path; + +use anyhow::Result; +use common::{block::Block, transaction::{NSSATransaction, execute_check_transaction_on_state}}; +use nssa::V02State; +use storage::indexer::RocksDBIO; + +pub struct IndexerStore { + dbio: RocksDBIO, +} + +impl IndexerStore { + /// Starting database at the start of new chain. + /// Creates files if necessary. + /// + /// ATTENTION: Will overwrite genesis block. + pub fn open_db_with_genesis( + location: &Path, + start_data: Option<(Block, V02State)>, + ) -> Result { + let dbio = RocksDBIO::open_or_create(location, start_data)?; + + Ok(Self { + dbio, + }) + } + + /// Reopening existing database + pub fn open_db_restart(location: &Path) -> Result { + Self::open_db_with_genesis(location, None) + } + + pub fn get_block_at_id(&self, id: u64) -> Result { + Ok(self.dbio.get_block(id)?) + } + + pub fn genesis_id(&self) -> u64 { + self.dbio.get_meta_first_block_in_db().expect("Must be set at the DB startup") + } + + pub fn last_block(&self) -> u64 { + self.dbio.get_meta_last_block_in_db().expect("Must be set at the DB startup") + } + + pub fn get_state_at_block(&self, block_id: u64) -> Result { + Ok(self.dbio.calculate_state_for_id(block_id)?) + } + + pub fn put_block(&self, block: Block) -> Result<()> { + let mut final_state = self.dbio.final_state()?; + + for encoded_transaction in &block.body.transactions { + let transaction = NSSATransaction::try_from(encoded_transaction)?; + execute_check_transaction_on_state(&mut final_state, transaction)?; + } + + Ok(self.dbio.put_block(block)?) + } +} + +// #[cfg(test)] +// mod tests { +// use common::{block::HashableBlockData, test_utils::sequencer_sign_key_for_testing}; +// use tempfile::tempdir; + +// use super::*; + +// #[test] +// fn test_get_transaction_by_hash() { +// let temp_dir = tempdir().unwrap(); +// let path = temp_dir.path(); + +// let signing_key = sequencer_sign_key_for_testing(); + +// let genesis_block_hashable_data = HashableBlockData { +// block_id: 0, +// prev_block_hash: [0; 32], +// timestamp: 0, +// transactions: vec![], +// }; + +// let genesis_block = genesis_block_hashable_data.into_pending_block(&signing_key, [0; 32]); +// // Start an empty node store +// let mut node_store = +// SequencerStore::open_db_with_genesis(path, Some(genesis_block), signing_key).unwrap(); + +// let tx = common::test_utils::produce_dummy_empty_transaction(); +// let block = common::test_utils::produce_dummy_block(1, None, vec![tx.clone()]); + +// // Try retrieve a tx that's not in the chain yet. +// let retrieved_tx = node_store.get_transaction_by_hash(tx.hash()); +// assert_eq!(None, retrieved_tx); +// // Add the block with the transaction +// let dummy_state = V02State::new_with_genesis_accounts(&[], &[]); +// node_store.update(block, &dummy_state).unwrap(); +// // Try again +// let retrieved_tx = node_store.get_transaction_by_hash(tx.hash()); +// assert_eq!(Some(tx), retrieved_tx); +// } +// } diff --git a/indexer_core/src/lib.rs b/indexer_core/src/lib.rs index ca9ec22f..b6e79d22 100644 --- a/indexer_core/src/lib.rs +++ b/indexer_core/src/lib.rs @@ -18,6 +18,7 @@ use crate::{config::IndexerConfig, state::IndexerState}; pub mod config; pub mod state; +pub mod block_store; pub struct IndexerCore { pub bedrock_client: BedrockClient, diff --git a/sequencer_core/src/lib.rs b/sequencer_core/src/lib.rs index efddcd7e..b90cd524 100644 --- a/sequencer_core/src/lib.rs +++ b/sequencer_core/src/lib.rs @@ -1,17 +1,15 @@ -use std::{fmt::Display, time::Instant}; +use std::time::Instant; use anyhow::Result; #[cfg(feature = "testnet")] use common::PINATA_BASE58; use common::{ - HashType, block::{BedrockStatus, Block, HashableBlockData, MantleMsgId}, - transaction::{EncodedTransaction, NSSATransaction}, + transaction::{EncodedTransaction, NSSATransaction, TransactionMalformationError}, }; use config::SequencerConfig; use log::{info, warn}; use mempool::{MemPool, MemPoolHandle}; -use serde::{Deserialize, Serialize}; use crate::{block_settlement_client::BlockSettlementClient, block_store::SequencerStore}; @@ -29,20 +27,6 @@ pub struct SequencerCore { last_bedrock_msg_id: MantleMsgId, } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub enum TransactionMalformationError { - InvalidSignature, - FailedToDecode { tx: HashType }, -} - -impl Display for TransactionMalformationError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{self:#?}") - } -} - -impl std::error::Error for TransactionMalformationError {} - impl SequencerCore { /// Starts the sequencer using the provided configuration. /// If an existing database is found, the sequencer state is loaded from it and @@ -268,36 +252,12 @@ impl SequencerCore { } } -// TODO: Introduce type-safe wrapper around checked transaction, e.g. AuthenticatedTransaction -pub fn transaction_pre_check( - tx: NSSATransaction, -) -> Result { - // Stateless checks here - match tx { - NSSATransaction::Public(tx) => { - if tx.witness_set().is_valid_for(tx.message()) { - Ok(NSSATransaction::Public(tx)) - } else { - Err(TransactionMalformationError::InvalidSignature) - } - } - NSSATransaction::PrivacyPreserving(tx) => { - if tx.witness_set().signatures_are_valid_for(tx.message()) { - Ok(NSSATransaction::PrivacyPreserving(tx)) - } else { - Err(TransactionMalformationError::InvalidSignature) - } - } - NSSATransaction::ProgramDeployment(tx) => Ok(NSSATransaction::ProgramDeployment(tx)), - } -} - #[cfg(test)] mod tests { use std::pin::pin; use base58::{FromBase58, ToBase58}; - use common::test_utils::sequencer_sign_key_for_testing; + use common::{test_utils::sequencer_sign_key_for_testing, transaction::transaction_pre_check}; use nssa::PrivateKey; use super::*; diff --git a/sequencer_rpc/src/process.rs b/sequencer_rpc/src/process.rs index 8b4ec7a5..246ead52 100644 --- a/sequencer_rpc/src/process.rs +++ b/sequencer_rpc/src/process.rs @@ -22,12 +22,12 @@ use common::{ PostIndexerMessageResponse, SendTxRequest, SendTxResponse, }, }, - transaction::{EncodedTransaction, NSSATransaction}, + transaction::{EncodedTransaction, NSSATransaction, TransactionMalformationError, transaction_pre_check}, }; use itertools::Itertools as _; use log::warn; use nssa::{self, program::Program}; -use sequencer_core::{TransactionMalformationError, config::AccountInitialData}; +use sequencer_core::config::AccountInitialData; use serde_json::Value; use super::{JsonHandler, respond, types::err_rpc::RpcErr}; @@ -88,7 +88,7 @@ impl JsonHandler { let transaction = NSSATransaction::try_from(&tx) .map_err(|_| TransactionMalformationError::FailedToDecode { tx: tx.hash() })?; - let authenticated_tx = sequencer_core::transaction_pre_check(transaction) + let authenticated_tx = transaction_pre_check(transaction) .inspect_err(|err| warn!("Error at pre_check {err:#?}"))?; // TODO: Do we need a timeout here? It will be usable if we have too many transactions to diff --git a/sequencer_rpc/src/types/err_rpc.rs b/sequencer_rpc/src/types/err_rpc.rs index 14807d5f..ad25eee3 100644 --- a/sequencer_rpc/src/types/err_rpc.rs +++ b/sequencer_rpc/src/types/err_rpc.rs @@ -1,6 +1,5 @@ -use common::rpc_primitives::errors::{RpcError, RpcParseError}; +use common::{rpc_primitives::errors::{RpcError, RpcParseError}, transaction::TransactionMalformationError}; use log::debug; -use sequencer_core::TransactionMalformationError; pub struct RpcErr(pub RpcError); diff --git a/storage/src/indexer.rs b/storage/src/indexer.rs index 7f7529d1..be9abc3f 100644 --- a/storage/src/indexer.rs +++ b/storage/src/indexer.rs @@ -29,8 +29,6 @@ pub const DB_META_FIRST_BLOCK_IN_DB_KEY: &str = "first_block_in_db"; pub const DB_META_LAST_BLOCK_IN_DB_KEY: &str = "last_block_in_db"; /// Key base for storing metainformation which describe if first block has been set pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set"; -/// Key base for storing metainformation about the last finalized block on Bedrock -pub const DB_META_LAST_FINALIZED_BLOCK_ID: &str = "last_finalized_block_id"; /// Key base for storing metainformation about the last breakpoint pub const DB_META_LAST_BREAKPOINT_ID: &str = "last_breakpoint_id"; @@ -57,8 +55,7 @@ pub struct RocksDBIO { impl RocksDBIO { pub fn open_or_create( path: &Path, - start_block: Option, - initial_state: V02State, + start_data: Option<(Block, V02State)>, ) -> DbResult { let mut cf_opts = Options::default(); cf_opts.set_max_write_buffer_number(16); @@ -85,12 +82,11 @@ impl RocksDBIO { if is_start_set { Ok(dbio) - } else if let Some(block) = start_block { + } else if let Some((block, initial_state)) = start_data { let block_id = block.header.block_id; dbio.put_meta_first_block_in_db(block)?; dbio.put_meta_is_first_block_set()?; dbio.put_meta_last_block_in_db(block_id)?; - dbio.put_meta_last_finalized_block_id(None)?; // First breakpoint setup dbio.put_breakpoint(0, initial_state)?; @@ -255,7 +251,7 @@ impl RocksDBIO { ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; - self.put_block(block, true)?; + self.put_block(block)?; Ok(()) } @@ -281,28 +277,6 @@ impl RocksDBIO { Ok(()) } - pub fn put_meta_last_finalized_block_id(&self, block_id: Option) -> DbResult<()> { - let cf_meta = self.meta_column(); - self.db - .put_cf( - &cf_meta, - borsh::to_vec(&DB_META_LAST_FINALIZED_BLOCK_ID).map_err(|err| { - DbError::borsh_cast_message( - err, - Some("Failed to serialize DB_META_LAST_FINALIZED_BLOCK_ID".to_string()), - ) - })?, - borsh::to_vec(&block_id).map_err(|err| { - DbError::borsh_cast_message( - err, - Some("Failed to serialize last block id".to_string()), - ) - })?, - ) - .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; - Ok(()) - } - pub fn put_meta_last_breakpoint_id(&self, br_id: u64) -> DbResult<()> { let cf_meta = self.meta_column(); self.db @@ -342,17 +316,9 @@ impl RocksDBIO { Ok(()) } - pub fn put_block(&self, block: Block, first: bool) -> DbResult<()> { + pub fn put_block(&self, block: Block) -> DbResult<()> { let cf_block = self.block_column(); - if !first { - let last_curr_block = self.get_meta_last_block_in_db()?; - - if block.header.block_id > last_curr_block { - self.put_meta_last_block_in_db(block.header.block_id)?; - } - } - self.db .put_cf( &cf_block, @@ -371,6 +337,12 @@ impl RocksDBIO { ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + let last_curr_block = self.get_meta_last_block_in_db()?; + + if block.header.block_id > last_curr_block { + self.put_meta_last_block_in_db(block.header.block_id)?; + } + if block.header.block_id.is_multiple_of(BREAKPOINT_INTERVAL) { self.put_next_breakpoint()?; } @@ -465,7 +437,15 @@ impl RocksDBIO { let br_id = closest_breakpoint_id(block_id); let mut breakpoint = self.get_breakpoint(br_id)?; - for id in (BREAKPOINT_INTERVAL * br_id)..=block_id { + // ToDo: update it to handle any genesis id + // right now works correctly only if genesis_id < BREAKPOINT_INTERVAL + let start = if br_id != 0 { + BREAKPOINT_INTERVAL * br_id + } else { + self.get_meta_first_block_in_db()? + }; + + for id in start..=block_id { let block = self.get_block(id)?; for encoded_transaction in block.body.transactions { @@ -483,6 +463,10 @@ impl RocksDBIO { } } + pub fn final_state(&self) -> DbResult { + self.calculate_state_for_id(self.get_meta_last_block_in_db()?) + } + pub fn put_next_breakpoint(&self) -> DbResult<()> { let last_block = self.get_meta_last_block_in_db()?; let breakpoint_id = self.get_meta_last_breakpoint_id()?;