From dc5b5ba27861a5ce653ff1ec8b3b90c68397784a Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Wed, 4 Feb 2026 14:57:38 +0200 Subject: [PATCH] feat: api methods population --- common/src/sequencer_client.rs | 8 +- indexer/core/src/block_store.rs | 36 ++++++- indexer/core/src/config.rs | 4 +- indexer/core/src/lib.rs | 7 +- indexer/service/src/lib.rs | 5 +- indexer/service/src/service.rs | 97 ++++++++++++++++--- integration_tests/tests/tps.rs | 5 +- sequencer_core/src/config.rs | 4 +- sequencer_rpc/src/process.rs | 13 ++- storage/src/indexer.rs | 166 +++++++++++++++++++++++++++++++- storage/src/sequencer.rs | 5 +- 11 files changed, 313 insertions(+), 37 deletions(-) diff --git a/common/src/sequencer_client.rs b/common/src/sequencer_client.rs index 4c97ea1e..e65984b7 100644 --- a/common/src/sequencer_client.rs +++ b/common/src/sequencer_client.rs @@ -18,7 +18,13 @@ use crate::{ rpc_primitives::{ self, requests::{ - GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse, GetGenesisBlockRequest, GetGenesisBlockResponse, GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest, GetTransactionByHashResponse, SendTxRequest, SendTxResponse + GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, + GetAccountsNoncesResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse, + GetGenesisBlockRequest, GetGenesisBlockResponse, GetInitialTestnetAccountsResponse, + GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, + GetProofForCommitmentRequest, GetProofForCommitmentResponse, + GetTransactionByHashRequest, GetTransactionByHashResponse, SendTxRequest, + SendTxResponse, }, }, transaction::{EncodedTransaction, NSSATransaction}, diff --git a/indexer/core/src/block_store.rs b/indexer/core/src/block_store.rs index 35037309..d83859ae 100644 --- a/indexer/core/src/block_store.rs +++ b/indexer/core/src/block_store.rs @@ -5,7 +5,7 @@ use common::{ block::Block, transaction::{NSSATransaction, execute_check_transaction_on_state, transaction_pre_check}, }; -use nssa::V02State; +use nssa::{Account, AccountId, V02State}; use storage::indexer::RocksDBIO; #[derive(Clone)] @@ -24,7 +24,9 @@ impl IndexerStore { ) -> Result { let dbio = RocksDBIO::open_or_create(location, start_data)?; - Ok(Self { dbio: Arc::new(dbio) }) + Ok(Self { + dbio: Arc::new(dbio), + }) } /// Reopening existing database @@ -36,6 +38,32 @@ impl IndexerStore { Ok(self.dbio.get_block(id)?) } + pub fn get_block_batch(&self, offset: u64, limit: u64) -> Result> { + Ok(self.dbio.get_block_batch(offset, limit)?) + } + + pub fn get_transaction_by_hash(&self, tx_hash: [u8; 32]) -> Result { + let block = self.get_block_at_id(self.dbio.get_block_id_by_tx_hash(tx_hash)?)?; + let encoded_transaction = block + .body + .transactions + .iter() + .find_map(|enc_tx| { + if enc_tx.hash() == tx_hash { + Some(enc_tx) + } else { + None + } + }) + .ok_or_else(|| anyhow::anyhow!("Transaction not found in DB"))?; + + Ok(NSSATransaction::try_from(encoded_transaction)?) + } + + pub fn get_block_by_hash(&self, hash: [u8; 32]) -> Result { + Ok(self.get_block_at_id(self.dbio.get_block_id_by_hash(hash)?)?) + } + pub fn genesis_id(&self) -> u64 { self.dbio .get_meta_first_block_in_db() @@ -56,6 +84,10 @@ impl IndexerStore { Ok(self.dbio.final_state()?) 
} + pub fn get_account_final(&self, account_id: &AccountId) -> Result { + Ok(self.final_state()?.get_account_by_id(account_id)) + } + pub fn put_block(&self, block: Block) -> Result<()> { let mut final_state = self.dbio.final_state()?; diff --git a/indexer/core/src/config.rs b/indexer/core/src/config.rs index 152b6916..2f871dcc 100644 --- a/indexer/core/src/config.rs +++ b/indexer/core/src/config.rs @@ -7,7 +7,9 @@ use std::{ use anyhow::{Context, Result}; use bedrock_client::BackoffConfig; use common::{ - block::{AccountInitialData, CommitmentsInitialData}, config::BasicAuth}; + block::{AccountInitialData, CommitmentsInitialData}, + config::BasicAuth, +}; use logos_blockchain_core::mantle::ops::channel::ChannelId; use serde::{Deserialize, Serialize}; use url::Url; diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index cb4d82f6..950b2538 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -2,10 +2,7 @@ use anyhow::Result; use bedrock_client::BedrockClient; // ToDo: Remove after testnet use common::PINATA_BASE58; -use common::{ - block::Block, - sequencer_client::SequencerClient, -}; +use common::{block::Block, sequencer_client::SequencerClient}; use futures::StreamExt; use log::info; use logos_blockchain_core::mantle::{ @@ -135,4 +132,4 @@ fn parse_blocks( _ => None, }) }) -} \ No newline at end of file +} diff --git a/indexer/service/src/lib.rs b/indexer/service/src/lib.rs index c9a60bcc..81c9a82a 100644 --- a/indexer/service/src/lib.rs +++ b/indexer/service/src/lib.rs @@ -71,8 +71,9 @@ pub async fn run_server(config: IndexerConfig, port: u16) -> Result Result { - todo!() + async fn get_block_by_id(&self, block_id: BlockId) -> Result { + self.indexer + .store + .get_block_at_id(block_id) + .map_err(|err| { + ErrorObjectOwned::owned(-32001, format!("DBError"), Some(format!("{err:#?}"))) + })? + .try_into() + .map_err(|err| { + ErrorObjectOwned::owned( + -32000, + format!("Conversion error"), + Some(format!("{err:#?}")), + ) + }) } - async fn get_block_by_hash(&self, _block_hash: Hash) -> Result { - todo!() + async fn get_block_by_hash(&self, block_hash: Hash) -> Result { + self.indexer + .store + .get_block_by_hash(block_hash.0) + .map_err(|err| { + ErrorObjectOwned::owned(-32001, format!("DBError"), Some(format!("{err:#?}"))) + })? + .try_into() + .map_err(|err| { + ErrorObjectOwned::owned( + -32000, + format!("Conversion error"), + Some(format!("{err:#?}")), + ) + }) } - async fn get_account(&self, _account_id: AccountId) -> Result { - todo!() + async fn get_account(&self, account_id: AccountId) -> Result { + self.indexer + .store + .get_account_final(&account_id.into()) + .map_err(|err| { + ErrorObjectOwned::owned(-32001, format!("DBError"), Some(format!("{err:#?}"))) + })? + .try_into() + .map_err(|err| { + ErrorObjectOwned::owned( + -32000, + format!("Conversion error"), + Some(format!("{err:#?}")), + ) + }) } - async fn get_transaction(&self, _tx_hash: Hash) -> Result { - todo!() + async fn get_transaction(&self, tx_hash: Hash) -> Result { + self.indexer + .store + .get_transaction_by_hash(tx_hash.0) + .map_err(|err| { + ErrorObjectOwned::owned(-32001, format!("DBError"), Some(format!("{err:#?}"))) + })? 
+ .try_into() + .map_err(|err| { + ErrorObjectOwned::owned( + -32000, + format!("Conversion error"), + Some(format!("{err:#?}")), + ) + }) } - async fn get_blocks(&self, _offset: u32, _limit: u32) -> Result, ErrorObjectOwned> { - todo!() + async fn get_blocks(&self, offset: u32, limit: u32) -> Result, ErrorObjectOwned> { + let blocks = self + .indexer + .store + .get_block_batch(offset as u64, limit as u64) + .map_err(|err| { + ErrorObjectOwned::owned(-32001, format!("DBError"), Some(format!("{err:#?}"))) + })?; + + let mut block_res = vec![]; + + for block in blocks { + block_res.push(block.try_into().map_err(|err| { + ErrorObjectOwned::owned( + -32000, + format!("Conversion error"), + Some(format!("{err:#?}")), + ) + })?) + } + + Ok(block_res) } async fn get_transactions_by_account( diff --git a/integration_tests/tests/tps.rs b/integration_tests/tests/tps.rs index 6d451dcc..46b8ca53 100644 --- a/integration_tests/tests/tps.rs +++ b/integration_tests/tests/tps.rs @@ -26,10 +26,7 @@ pub async fn tps_test() -> Result<()> { let target_tps = 12; let tps_test = TpsTestManager::new(target_tps, num_transactions); - let ctx = TestContext::new_with_sequencer_config( - tps_test.generate_sequencer_config() - ) - .await?; + let ctx = TestContext::new_with_sequencer_config(tps_test.generate_sequencer_config()).await?; let target_time = tps_test.target_time(); info!( diff --git a/sequencer_core/src/config.rs b/sequencer_core/src/config.rs index 6cae44cb..669069ce 100644 --- a/sequencer_core/src/config.rs +++ b/sequencer_core/src/config.rs @@ -6,7 +6,9 @@ use std::{ use anyhow::Result; use common::{ - block::{AccountInitialData, CommitmentsInitialData}, config::BasicAuth}; + block::{AccountInitialData, CommitmentsInitialData}, + config::BasicAuth, +}; use logos_blockchain_core::mantle::ops::channel::ChannelId; use serde::{Deserialize, Serialize}; use url::Url; diff --git a/sequencer_rpc/src/process.rs b/sequencer_rpc/src/process.rs index 285e482c..c6a7e439 100644 --- a/sequencer_rpc/src/process.rs +++ b/sequencer_rpc/src/process.rs @@ -11,7 +11,15 @@ use common::{ message::{Message, Request}, parser::RpcRequest, requests::{ - GetAccountBalanceRequest, GetAccountBalanceResponse, GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse, GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse, GetGenesisBlockRequest, GetGenesisBlockResponse, GetGenesisIdRequest, GetGenesisIdResponse, GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest, GetTransactionByHashResponse, HelloRequest, HelloResponse, SendTxRequest, SendTxResponse + GetAccountBalanceRequest, GetAccountBalanceResponse, GetAccountRequest, + GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse, + GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest, + GetBlockRangeDataResponse, GetGenesisBlockRequest, GetGenesisBlockResponse, + GetGenesisIdRequest, GetGenesisIdResponse, GetInitialTestnetAccountsRequest, + GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, + GetProofForCommitmentRequest, GetProofForCommitmentResponse, + GetTransactionByHashRequest, GetTransactionByHashResponse, HelloRequest, HelloResponse, + SendTxRequest, SendTxResponse, }, }, transaction::{ @@ -355,7 +363,8 @@ mod tests { use base58::ToBase58; use base64::{Engine, 
engine::general_purpose};
     use common::{
-        block::AccountInitialData, config::BasicAuth, test_utils::sequencer_sign_key_for_testing, transaction::EncodedTransaction
+        block::AccountInitialData, config::BasicAuth, test_utils::sequencer_sign_key_for_testing,
+        transaction::EncodedTransaction,
     };
     use sequencer_core::{
         SequencerCore,
diff --git a/storage/src/indexer.rs b/storage/src/indexer.rs
index 35d14fa9..e50accbd 100644
--- a/storage/src/indexer.rs
+++ b/storage/src/indexer.rs
@@ -41,6 +41,10 @@ pub const CF_BLOCK_NAME: &str = "cf_block";
 pub const CF_META_NAME: &str = "cf_meta";
 /// Name of breakpoint column family
 pub const CF_BREAKPOINT_NAME: &str = "cf_breakpoint";
+/// Name of hash to id map column family
+pub const CF_HASH_TO_ID: &str = "cf_hash_to_id";
+/// Name of tx hash to id map column family
+pub const CF_TX_TO_ID: &str = "cf_tx_to_id";
 
 pub type DbResult<T> = Result<T, DbError>;
 
@@ -60,6 +64,8 @@ impl RocksDBIO {
         let cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
         let cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
         let cfbreakpoint = ColumnFamilyDescriptor::new(CF_BREAKPOINT_NAME, cf_opts.clone());
+        let cfhti = ColumnFamilyDescriptor::new(CF_HASH_TO_ID, cf_opts.clone());
+        let cftti = ColumnFamilyDescriptor::new(CF_TX_TO_ID, cf_opts.clone());
 
         let mut db_opts = Options::default();
         db_opts.create_missing_column_families(true);
@@ -67,7 +73,7 @@
         let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
             &db_opts,
             path,
-            vec![cfb, cfmeta, cfbreakpoint],
+            vec![cfb, cfmeta, cfbreakpoint, cfhti, cftti],
         );
 
         let dbio = Self {
@@ -103,6 +109,8 @@
         let _cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
         let _cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
         let _cfsnapshot = ColumnFamilyDescriptor::new(CF_BREAKPOINT_NAME, cf_opts.clone());
+        let _cfhti = ColumnFamilyDescriptor::new(CF_HASH_TO_ID, cf_opts.clone());
+        let _cftti = ColumnFamilyDescriptor::new(CF_TX_TO_ID, cf_opts.clone());
 
         let mut db_opts = Options::default();
         db_opts.create_missing_column_families(true);
@@ -111,6 +119,8 @@
             .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))
     }
 
+    // Columns
+
     pub fn meta_column(&self) -> Arc<BoundColumnFamily<'_>> {
         self.db.cf_handle(CF_META_NAME).unwrap()
     }
@@ -123,6 +133,16 @@
         self.db.cf_handle(CF_BREAKPOINT_NAME).unwrap()
     }
 
+    pub fn hash_to_id_column(&self) -> Arc<BoundColumnFamily<'_>> {
+        self.db.cf_handle(CF_HASH_TO_ID).unwrap()
+    }
+
+    pub fn tx_hash_to_id_column(&self) -> Arc<BoundColumnFamily<'_>> {
+        self.db.cf_handle(CF_TX_TO_ID).unwrap()
+    }
+
+    // Meta
+
     pub fn get_meta_first_block_in_db(&self) -> DbResult<u64> {
         let cf_meta = self.meta_column();
         let res = self
@@ -313,8 +333,12 @@
         Ok(())
     }
 
+    // Block
+
     pub fn put_block(&self, block: Block) -> DbResult<()> {
         let cf_block = self.block_column();
+        let cf_hti = self.hash_to_id_column();
+        let cf_tti = self.tx_hash_to_id_column();
 
         self.db
             .put_cf(
@@ -340,6 +364,46 @@
             self.put_meta_last_block_in_db(block.header.block_id)?;
         }
 
+        // ToDo: rewrite this with write batching
+
+        self.db
+            .put_cf(
+                &cf_hti,
+                borsh::to_vec(&block.header.hash).map_err(|err| {
+                    DbError::borsh_cast_message(
+                        err,
+                        Some("Failed to serialize block hash".to_string()),
+                    )
+                })?,
+                borsh::to_vec(&block.header.block_id).map_err(|err| {
+                    DbError::borsh_cast_message(
+                        err,
+                        Some("Failed to serialize block id".to_string()),
+                    )
+                })?,
+            )
+            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
+
+        for tx in block.body.transactions {
+            self.db
+                .put_cf(
+                    &cf_tti,
borsh::to_vec(&tx.hash()).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize tx hash".to_string()), + ) + })?, + borsh::to_vec(&block.header.block_id).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize block id".to_string()), + ) + })?, + ) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + } + if block.header.block_id.is_multiple_of(BREAKPOINT_INTERVAL) { self.put_next_breakpoint()?; } @@ -376,6 +440,46 @@ impl RocksDBIO { } } + pub fn get_block_batch(&self, offset: u64, limit: u64) -> DbResult> { + let cf_block = self.block_column(); + let mut block_batch = vec![]; + + // ToDo: Multi get this + + for block_id in offset..(offset + limit) { + let res = self + .db + .get_cf( + &cf_block, + borsh::to_vec(&block_id).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize block id".to_string()), + ) + })?, + ) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + + let block = if let Some(data) = res { + Ok(borsh::from_slice::(&data).map_err(|serr| { + DbError::borsh_cast_message( + serr, + Some("Failed to deserialize block data".to_string()), + ) + })?) + } else { + // Block not found, assuming that previous one was the last + break; + }?; + + block_batch.push(block); + } + + Ok(block_batch) + } + + // State + pub fn put_breakpoint(&self, br_id: u64, breakpoint: V02State) -> DbResult<()> { let cf_br = self.breakpoint_column(); @@ -498,4 +602,64 @@ impl RocksDBIO { )) } } + + // Mappings + + pub fn get_block_id_by_hash(&self, hash: [u8; 32]) -> DbResult { + let cf_hti = self.hash_to_id_column(); + let res = self + .db + .get_cf( + &cf_hti, + borsh::to_vec(&hash).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize block hash".to_string()), + ) + })?, + ) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + + if let Some(data) = res { + Ok(borsh::from_slice::(&data).map_err(|serr| { + DbError::borsh_cast_message( + serr, + Some("Failed to deserialize block id".to_string()), + ) + })?) + } else { + Err(DbError::db_interaction_error( + "Block on this hash not found".to_string(), + )) + } + } + + pub fn get_block_id_by_tx_hash(&self, tx_hash: [u8; 32]) -> DbResult { + let cf_tti = self.tx_hash_to_id_column(); + let res = self + .db + .get_cf( + &cf_tti, + borsh::to_vec(&tx_hash).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize block hash".to_string()), + ) + })?, + ) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + + if let Some(data) = res { + Ok(borsh::from_slice::(&data).map_err(|serr| { + DbError::borsh_cast_message( + serr, + Some("Failed to deserialize block id".to_string()), + ) + })?) + } else { + Err(DbError::db_interaction_error( + "Block on this hash not found".to_string(), + )) + } + } } diff --git a/storage/src/sequencer.rs b/storage/src/sequencer.rs index d0985599..cd1a9f52 100644 --- a/storage/src/sequencer.rs +++ b/storage/src/sequencer.rs @@ -1,7 +1,6 @@ use std::{path::Path, sync::Arc}; -use common:: - block::Block; +use common::block::Block; use nssa::V02State; use rocksdb::{ BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, WriteBatch, @@ -439,4 +438,4 @@ impl RocksDBIO { ) }) } -} \ No newline at end of file +}
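
Note on the "ToDo: rewrite this with write batching" left in RocksDBIO::put_block: the sketch below is a minimal illustration of that follow-up, using the WriteBatch type that storage/src/sequencer.rs already imports from the rocksdb crate. The helper name put_block_mappings, the simplified signature, and the fixed-width key encoding are assumptions for illustration only; the patch itself encodes keys and values with borsh::to_vec.

    use std::sync::Arc;

    use rocksdb::{BoundColumnFamily, DBWithThreadMode, MultiThreaded, WriteBatch};

    // Sketch: stage the block-hash -> block-id and tx-hash -> block-id mappings in one
    // WriteBatch and commit them with a single write, instead of one put_cf per key.
    fn put_block_mappings(
        db: &DBWithThreadMode<MultiThreaded>,
        cf_hash_to_id: &Arc<BoundColumnFamily<'_>>,
        cf_tx_to_id: &Arc<BoundColumnFamily<'_>>,
        block_hash: [u8; 32],
        tx_hashes: &[[u8; 32]],
        block_id: u64,
    ) -> Result<(), rocksdb::Error> {
        // Fixed-width big-endian id encoding keeps the sketch self-contained;
        // the patch uses borsh::to_vec for keys and values instead.
        let id_bytes = block_id.to_be_bytes();

        let mut batch = WriteBatch::default();
        batch.put_cf(cf_hash_to_id, block_hash, id_bytes);
        for tx_hash in tx_hashes {
            batch.put_cf(cf_tx_to_id, tx_hash, id_bytes);
        }

        // Either every mapping for the block lands or none of them do.
        db.write(batch)
    }

With this shape, a crash between the block write and the mapping writes can no longer leave the hash and tx-hash indexes only partially populated.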
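Note on the "ToDo: Multi get this" left in get_block_batch: a sketch of the batched read, assuming the rocksdb crate version in use exposes multi_get_cf (present in recent rust-rocksdb releases). The function name get_block_batch_raw, the fixed-width key encoding, and returning raw value bytes rather than deserialized Blocks are illustrative assumptions, not part of the codebase.

    use std::sync::Arc;

    use rocksdb::{BoundColumnFamily, DBWithThreadMode, MultiThreaded};

    // Sketch: issue one multi_get_cf over the whole id range instead of a get_cf per block id.
    fn get_block_batch_raw(
        db: &DBWithThreadMode<MultiThreaded>,
        cf_block: &Arc<BoundColumnFamily<'_>>,
        offset: u64,
        limit: u64,
    ) -> Result<Vec<Vec<u8>>, rocksdb::Error> {
        // The patch keys blocks by borsh-encoded ids; a fixed-width encoding stands in here.
        let keys: Vec<_> = (offset..offset + limit)
            .map(|id| (cf_block, id.to_be_bytes()))
            .collect();

        let mut raw_blocks = Vec::new();
        for value in db.multi_get_cf(keys) {
            match value? {
                Some(bytes) => raw_blocks.push(bytes),
                // A missing id means the previous block was the last one stored,
                // mirroring the early break in the patch's get_block_batch loop.
                None => break,
            }
        }
        Ok(raw_blocks)
    }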