feat: db exposed 1

This commit is contained in:
Pravdyvy 2026-01-30 10:25:34 +02:00
parent 3cb7f17aaf
commit 17a1be5b0c
9 changed files with 175 additions and 87 deletions

2
Cargo.lock generated
View File

@ -2790,8 +2790,10 @@ dependencies = [
"futures",
"log",
"logos-blockchain-core",
"nssa",
"serde",
"serde_json",
"storage",
"tokio",
"url",
]

View File

@ -1,3 +1,5 @@
use std::fmt::Display;
use borsh::{BorshDeserialize, BorshSerialize};
use log::{info, warn};
use nssa::V02State;
@ -101,6 +103,44 @@ impl EncodedTransaction {
}
}
/// Reasons a transaction fails the stateless pre-checks before execution.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TransactionMalformationError {
/// The transaction's witness set does not validate against its message.
InvalidSignature,
/// The raw payload could not be decoded into a transaction; `tx` carries its hash.
FailedToDecode { tx: HashType },
}
impl Display for TransactionMalformationError {
    /// Renders the error using its pretty-printed `Debug` form.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Delegate to the alternate (`#`) Debug output so the Display text
        // stays in sync with the derived Debug without duplicating variant names.
        write!(f, "{:#?}", self)
    }
}
impl std::error::Error for TransactionMalformationError {}
// TODO: Introduce type-safe wrapper around checked transaction, e.g. AuthenticatedTransaction
/// Runs stateless validity checks on `tx` before it is admitted further.
///
/// Public and privacy-preserving transactions must carry a witness set that
/// validates against their message; program deployments have no signatures
/// to verify and pass through unchanged.
pub fn transaction_pre_check(
    tx: NSSATransaction,
) -> Result<NSSATransaction, TransactionMalformationError> {
    // Stateless checks here
    let signatures_ok = match &tx {
        NSSATransaction::Public(inner) => inner.witness_set().is_valid_for(inner.message()),
        NSSATransaction::PrivacyPreserving(inner) => {
            inner.witness_set().signatures_are_valid_for(inner.message())
        }
        // Nothing to verify for deployments.
        NSSATransaction::ProgramDeployment(_) => true,
    };
    if signatures_ok {
        Ok(tx)
    } else {
        Err(TransactionMalformationError::InvalidSignature)
    }
}
pub fn execute_check_transaction_on_state(
state: &mut V02State,
tx: NSSATransaction,

View File

@ -6,6 +6,8 @@ edition = "2024"
[dependencies]
common.workspace = true
bedrock_client.workspace = true
nssa.workspace = true
storage.workspace = true
anyhow.workspace = true
log.workspace = true

View File

@ -0,0 +1,100 @@
use std::path::Path;
use anyhow::Result;
use common::{block::Block, transaction::{NSSATransaction, execute_check_transaction_on_state}};
use nssa::V02State;
use storage::indexer::RocksDBIO;
/// Indexer-side persistence layer: a thin wrapper around the RocksDB-backed
/// block/state store.
pub struct IndexerStore {
/// Low-level RocksDB handle; all reads and writes go through it.
dbio: RocksDBIO,
}
impl IndexerStore {
    /// Opens the database at the start of a new chain, creating files if
    /// necessary.
    ///
    /// ATTENTION: when `start_data` is provided, the genesis block is
    /// overwritten.
    pub fn open_db_with_genesis(
        location: &Path,
        start_data: Option<(Block, V02State)>,
    ) -> Result<Self> {
        let dbio = RocksDBIO::open_or_create(location, start_data)?;
        Ok(Self { dbio })
    }

    /// Reopens an already-initialized database (no genesis data supplied).
    pub fn open_db_restart(location: &Path) -> Result<Self> {
        Self::open_db_with_genesis(location, None)
    }

    /// Fetches the block stored under `id`.
    pub fn get_block_at_id(&self, id: u64) -> Result<Block> {
        let block = self.dbio.get_block(id)?;
        Ok(block)
    }

    /// Id of the first block held by this database.
    pub fn genesis_id(&self) -> u64 {
        self.dbio
            .get_meta_first_block_in_db()
            .expect("Must be set at the DB startup")
    }

    /// Id of the most recent block held by this database.
    pub fn last_block(&self) -> u64 {
        self.dbio
            .get_meta_last_block_in_db()
            .expect("Must be set at the DB startup")
    }

    /// Reconstructs the state as of `block_id`.
    pub fn get_state_at_block(&self, block_id: u64) -> Result<V02State> {
        let state = self.dbio.calculate_state_for_id(block_id)?;
        Ok(state)
    }

    /// Validates `block` by replaying each of its transactions on top of the
    /// current final state, then persists it.
    pub fn put_block(&self, block: Block) -> Result<()> {
        let mut state = self.dbio.final_state()?;
        for encoded in &block.body.transactions {
            let tx = NSSATransaction::try_from(encoded)?;
            execute_check_transaction_on_state(&mut state, tx)?;
        }
        self.dbio.put_block(block)?;
        Ok(())
    }
}
// #[cfg(test)]
// mod tests {
// use common::{block::HashableBlockData, test_utils::sequencer_sign_key_for_testing};
// use tempfile::tempdir;
// use super::*;
// #[test]
// fn test_get_transaction_by_hash() {
// let temp_dir = tempdir().unwrap();
// let path = temp_dir.path();
// let signing_key = sequencer_sign_key_for_testing();
// let genesis_block_hashable_data = HashableBlockData {
// block_id: 0,
// prev_block_hash: [0; 32],
// timestamp: 0,
// transactions: vec![],
// };
// let genesis_block = genesis_block_hashable_data.into_pending_block(&signing_key, [0; 32]);
// // Start an empty node store
// let mut node_store =
// SequencerStore::open_db_with_genesis(path, Some(genesis_block), signing_key).unwrap();
// let tx = common::test_utils::produce_dummy_empty_transaction();
// let block = common::test_utils::produce_dummy_block(1, None, vec![tx.clone()]);
// // Try retrieve a tx that's not in the chain yet.
// let retrieved_tx = node_store.get_transaction_by_hash(tx.hash());
// assert_eq!(None, retrieved_tx);
// // Add the block with the transaction
// let dummy_state = V02State::new_with_genesis_accounts(&[], &[]);
// node_store.update(block, &dummy_state).unwrap();
// // Try again
// let retrieved_tx = node_store.get_transaction_by_hash(tx.hash());
// assert_eq!(Some(tx), retrieved_tx);
// }
// }

View File

@ -18,6 +18,7 @@ use crate::{config::IndexerConfig, state::IndexerState};
pub mod config;
pub mod state;
pub mod block_store;
pub struct IndexerCore {
pub bedrock_client: BedrockClient,

View File

@ -1,17 +1,15 @@
use std::{fmt::Display, time::Instant};
use std::time::Instant;
use anyhow::Result;
#[cfg(feature = "testnet")]
use common::PINATA_BASE58;
use common::{
HashType,
block::{BedrockStatus, Block, HashableBlockData, MantleMsgId},
transaction::{EncodedTransaction, NSSATransaction},
transaction::{EncodedTransaction, NSSATransaction, TransactionMalformationError},
};
use config::SequencerConfig;
use log::{info, warn};
use mempool::{MemPool, MemPoolHandle};
use serde::{Deserialize, Serialize};
use crate::{block_settlement_client::BlockSettlementClient, block_store::SequencerStore};
@ -29,20 +27,6 @@ pub struct SequencerCore {
last_bedrock_msg_id: MantleMsgId,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TransactionMalformationError {
InvalidSignature,
FailedToDecode { tx: HashType },
}
impl Display for TransactionMalformationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{self:#?}")
}
}
impl std::error::Error for TransactionMalformationError {}
impl SequencerCore {
/// Starts the sequencer using the provided configuration.
/// If an existing database is found, the sequencer state is loaded from it and
@ -268,36 +252,12 @@ impl SequencerCore {
}
}
// TODO: Introduce type-safe wrapper around checked transaction, e.g. AuthenticatedTransaction
pub fn transaction_pre_check(
tx: NSSATransaction,
) -> Result<NSSATransaction, TransactionMalformationError> {
// Stateless checks here
match tx {
NSSATransaction::Public(tx) => {
if tx.witness_set().is_valid_for(tx.message()) {
Ok(NSSATransaction::Public(tx))
} else {
Err(TransactionMalformationError::InvalidSignature)
}
}
NSSATransaction::PrivacyPreserving(tx) => {
if tx.witness_set().signatures_are_valid_for(tx.message()) {
Ok(NSSATransaction::PrivacyPreserving(tx))
} else {
Err(TransactionMalformationError::InvalidSignature)
}
}
NSSATransaction::ProgramDeployment(tx) => Ok(NSSATransaction::ProgramDeployment(tx)),
}
}
#[cfg(test)]
mod tests {
use std::pin::pin;
use base58::{FromBase58, ToBase58};
use common::test_utils::sequencer_sign_key_for_testing;
use common::{test_utils::sequencer_sign_key_for_testing, transaction::transaction_pre_check};
use nssa::PrivateKey;
use super::*;

View File

@ -22,12 +22,12 @@ use common::{
PostIndexerMessageResponse, SendTxRequest, SendTxResponse,
},
},
transaction::{EncodedTransaction, NSSATransaction},
transaction::{EncodedTransaction, NSSATransaction, TransactionMalformationError, transaction_pre_check},
};
use itertools::Itertools as _;
use log::warn;
use nssa::{self, program::Program};
use sequencer_core::{TransactionMalformationError, config::AccountInitialData};
use sequencer_core::config::AccountInitialData;
use serde_json::Value;
use super::{JsonHandler, respond, types::err_rpc::RpcErr};
@ -88,7 +88,7 @@ impl JsonHandler {
let transaction = NSSATransaction::try_from(&tx)
.map_err(|_| TransactionMalformationError::FailedToDecode { tx: tx.hash() })?;
let authenticated_tx = sequencer_core::transaction_pre_check(transaction)
let authenticated_tx = transaction_pre_check(transaction)
.inspect_err(|err| warn!("Error at pre_check {err:#?}"))?;
// TODO: Do we need a timeout here? It will be usable if we have too many transactions to

View File

@ -1,6 +1,5 @@
use common::rpc_primitives::errors::{RpcError, RpcParseError};
use common::{rpc_primitives::errors::{RpcError, RpcParseError}, transaction::TransactionMalformationError};
use log::debug;
use sequencer_core::TransactionMalformationError;
pub struct RpcErr(pub RpcError);

View File

@ -29,8 +29,6 @@ pub const DB_META_FIRST_BLOCK_IN_DB_KEY: &str = "first_block_in_db";
pub const DB_META_LAST_BLOCK_IN_DB_KEY: &str = "last_block_in_db";
/// Key base for storing metainformation which describe if first block has been set
pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set";
/// Key base for storing metainformation about the last finalized block on Bedrock
pub const DB_META_LAST_FINALIZED_BLOCK_ID: &str = "last_finalized_block_id";
/// Key base for storing metainformation about the last breakpoint
pub const DB_META_LAST_BREAKPOINT_ID: &str = "last_breakpoint_id";
@ -57,8 +55,7 @@ pub struct RocksDBIO {
impl RocksDBIO {
pub fn open_or_create(
path: &Path,
start_block: Option<Block>,
initial_state: V02State,
start_data: Option<(Block, V02State)>,
) -> DbResult<Self> {
let mut cf_opts = Options::default();
cf_opts.set_max_write_buffer_number(16);
@ -85,12 +82,11 @@ impl RocksDBIO {
if is_start_set {
Ok(dbio)
} else if let Some(block) = start_block {
} else if let Some((block, initial_state)) = start_data {
let block_id = block.header.block_id;
dbio.put_meta_first_block_in_db(block)?;
dbio.put_meta_is_first_block_set()?;
dbio.put_meta_last_block_in_db(block_id)?;
dbio.put_meta_last_finalized_block_id(None)?;
// First breakpoint setup
dbio.put_breakpoint(0, initial_state)?;
@ -255,7 +251,7 @@ impl RocksDBIO {
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
self.put_block(block, true)?;
self.put_block(block)?;
Ok(())
}
@ -281,28 +277,6 @@ impl RocksDBIO {
Ok(())
}
pub fn put_meta_last_finalized_block_id(&self, block_id: Option<u64>) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_FINALIZED_BLOCK_ID).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_FINALIZED_BLOCK_ID".to_string()),
)
})?,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize last block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
pub fn put_meta_last_breakpoint_id(&self, br_id: u64) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
@ -342,17 +316,9 @@ impl RocksDBIO {
Ok(())
}
pub fn put_block(&self, block: Block, first: bool) -> DbResult<()> {
pub fn put_block(&self, block: Block) -> DbResult<()> {
let cf_block = self.block_column();
if !first {
let last_curr_block = self.get_meta_last_block_in_db()?;
if block.header.block_id > last_curr_block {
self.put_meta_last_block_in_db(block.header.block_id)?;
}
}
self.db
.put_cf(
&cf_block,
@ -371,6 +337,12 @@ impl RocksDBIO {
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
let last_curr_block = self.get_meta_last_block_in_db()?;
if block.header.block_id > last_curr_block {
self.put_meta_last_block_in_db(block.header.block_id)?;
}
if block.header.block_id.is_multiple_of(BREAKPOINT_INTERVAL) {
self.put_next_breakpoint()?;
}
@ -465,7 +437,15 @@ impl RocksDBIO {
let br_id = closest_breakpoint_id(block_id);
let mut breakpoint = self.get_breakpoint(br_id)?;
for id in (BREAKPOINT_INTERVAL * br_id)..=block_id {
// TODO: update this to handle an arbitrary genesis id;
// currently it is correct only when genesis_id < BREAKPOINT_INTERVAL.
let start = if br_id != 0 {
BREAKPOINT_INTERVAL * br_id
} else {
self.get_meta_first_block_in_db()?
};
for id in start..=block_id {
let block = self.get_block(id)?;
for encoded_transaction in block.body.transactions {
@ -483,6 +463,10 @@ impl RocksDBIO {
}
}
/// State after applying every block up to the last one stored in the DB
/// (i.e. `calculate_state_for_id` at the last-block-in-db meta id).
pub fn final_state(&self) -> DbResult<V02State> {
self.calculate_state_for_id(self.get_meta_last_block_in_db()?)
}
pub fn put_next_breakpoint(&self) -> DbResult<()> {
let last_block = self.get_meta_last_block_in_db()?;
let breakpoint_id = self.get_meta_last_breakpoint_id()?;