feat: api methods population

This commit is contained in:
Pravdyvy 2026-02-04 14:57:38 +02:00
parent efac8639c3
commit dc5b5ba278
11 changed files with 313 additions and 37 deletions

View File

@ -18,7 +18,13 @@ use crate::{
rpc_primitives::{
self,
requests::{
GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse, GetGenesisBlockRequest, GetGenesisBlockResponse, GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest, GetTransactionByHashResponse, SendTxRequest, SendTxResponse
GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest,
GetAccountsNoncesResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse,
GetGenesisBlockRequest, GetGenesisBlockResponse, GetInitialTestnetAccountsResponse,
GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse,
GetProofForCommitmentRequest, GetProofForCommitmentResponse,
GetTransactionByHashRequest, GetTransactionByHashResponse, SendTxRequest,
SendTxResponse,
},
},
transaction::{EncodedTransaction, NSSATransaction},

View File

@ -5,7 +5,7 @@ use common::{
block::Block,
transaction::{NSSATransaction, execute_check_transaction_on_state, transaction_pre_check},
};
use nssa::V02State;
use nssa::{Account, AccountId, V02State};
use storage::indexer::RocksDBIO;
#[derive(Clone)]
@ -24,7 +24,9 @@ impl IndexerStore {
) -> Result<Self> {
let dbio = RocksDBIO::open_or_create(location, start_data)?;
Ok(Self { dbio: Arc::new(dbio) })
Ok(Self {
dbio: Arc::new(dbio),
})
}
/// Reopening existing database
@ -36,6 +38,32 @@ impl IndexerStore {
Ok(self.dbio.get_block(id)?)
}
pub fn get_block_batch(&self, offset: u64, limit: u64) -> Result<Vec<Block>> {
Ok(self.dbio.get_block_batch(offset, limit)?)
}
pub fn get_transaction_by_hash(&self, tx_hash: [u8; 32]) -> Result<NSSATransaction> {
let block = self.get_block_at_id(self.dbio.get_block_id_by_tx_hash(tx_hash)?)?;
let encoded_transaction = block
.body
.transactions
.iter()
.find_map(|enc_tx| {
if enc_tx.hash() == tx_hash {
Some(enc_tx)
} else {
None
}
})
.ok_or_else(|| anyhow::anyhow!("Transaction not found in DB"))?;
Ok(NSSATransaction::try_from(encoded_transaction)?)
}
pub fn get_block_by_hash(&self, hash: [u8; 32]) -> Result<Block> {
Ok(self.get_block_at_id(self.dbio.get_block_id_by_hash(hash)?)?)
}
pub fn genesis_id(&self) -> u64 {
self.dbio
.get_meta_first_block_in_db()
@ -56,6 +84,10 @@ impl IndexerStore {
Ok(self.dbio.final_state()?)
}
pub fn get_account_final(&self, account_id: &AccountId) -> Result<Account> {
Ok(self.final_state()?.get_account_by_id(account_id))
}
pub fn put_block(&self, block: Block) -> Result<()> {
let mut final_state = self.dbio.final_state()?;

View File

@ -7,7 +7,9 @@ use std::{
use anyhow::{Context, Result};
use bedrock_client::BackoffConfig;
use common::{
block::{AccountInitialData, CommitmentsInitialData}, config::BasicAuth};
block::{AccountInitialData, CommitmentsInitialData},
config::BasicAuth,
};
use logos_blockchain_core::mantle::ops::channel::ChannelId;
use serde::{Deserialize, Serialize};
use url::Url;

View File

@ -2,10 +2,7 @@ use anyhow::Result;
use bedrock_client::BedrockClient;
// ToDo: Remove after testnet
use common::PINATA_BASE58;
use common::{
block::Block,
sequencer_client::SequencerClient,
};
use common::{block::Block, sequencer_client::SequencerClient};
use futures::StreamExt;
use log::info;
use logos_blockchain_core::mantle::{
@ -135,4 +132,4 @@ fn parse_blocks(
_ => None,
})
})
}
}

View File

@ -71,8 +71,9 @@ pub async fn run_server(config: IndexerConfig, port: u16) -> Result<IndexerHandl
#[cfg(not(feature = "mock-responses"))]
let handle = {
let service =
service::IndexerService::new(config).await.context("Failed to initialize indexer service")?;
let service = service::IndexerService::new(config)
.await
.context("Failed to initialize indexer service")?;
server.start(service.into_rpc())
};
#[cfg(feature = "mock-responses")]

View File

@ -13,11 +13,6 @@ use tokio::sync::{Mutex, mpsc::UnboundedSender};
pub struct IndexerService {
subscription_service: SubscriptionService,
#[expect(
dead_code,
reason = "Will be used in future implementations of RPC methods"
)]
indexer: IndexerCore,
}
@ -46,24 +41,96 @@ impl indexer_service_rpc::RpcServer for IndexerService {
Ok(())
}
async fn get_block_by_id(&self, _block_id: BlockId) -> Result<Block, ErrorObjectOwned> {
todo!()
async fn get_block_by_id(&self, block_id: BlockId) -> Result<Block, ErrorObjectOwned> {
self.indexer
.store
.get_block_at_id(block_id)
.map_err(|err| {
ErrorObjectOwned::owned(-32001, format!("DBError"), Some(format!("{err:#?}")))
})?
.try_into()
.map_err(|err| {
ErrorObjectOwned::owned(
-32000,
format!("Conversion error"),
Some(format!("{err:#?}")),
)
})
}
async fn get_block_by_hash(&self, _block_hash: Hash) -> Result<Block, ErrorObjectOwned> {
todo!()
async fn get_block_by_hash(&self, block_hash: Hash) -> Result<Block, ErrorObjectOwned> {
self.indexer
.store
.get_block_by_hash(block_hash.0)
.map_err(|err| {
ErrorObjectOwned::owned(-32001, format!("DBError"), Some(format!("{err:#?}")))
})?
.try_into()
.map_err(|err| {
ErrorObjectOwned::owned(
-32000,
format!("Conversion error"),
Some(format!("{err:#?}")),
)
})
}
async fn get_account(&self, _account_id: AccountId) -> Result<Account, ErrorObjectOwned> {
todo!()
async fn get_account(&self, account_id: AccountId) -> Result<Account, ErrorObjectOwned> {
self.indexer
.store
.get_account_final(&account_id.into())
.map_err(|err| {
ErrorObjectOwned::owned(-32001, format!("DBError"), Some(format!("{err:#?}")))
})?
.try_into()
.map_err(|err| {
ErrorObjectOwned::owned(
-32000,
format!("Conversion error"),
Some(format!("{err:#?}")),
)
})
}
async fn get_transaction(&self, _tx_hash: Hash) -> Result<Transaction, ErrorObjectOwned> {
todo!()
async fn get_transaction(&self, tx_hash: Hash) -> Result<Transaction, ErrorObjectOwned> {
self.indexer
.store
.get_transaction_by_hash(tx_hash.0)
.map_err(|err| {
ErrorObjectOwned::owned(-32001, format!("DBError"), Some(format!("{err:#?}")))
})?
.try_into()
.map_err(|err| {
ErrorObjectOwned::owned(
-32000,
format!("Conversion error"),
Some(format!("{err:#?}")),
)
})
}
async fn get_blocks(&self, _offset: u32, _limit: u32) -> Result<Vec<Block>, ErrorObjectOwned> {
todo!()
async fn get_blocks(&self, offset: u32, limit: u32) -> Result<Vec<Block>, ErrorObjectOwned> {
let blocks = self
.indexer
.store
.get_block_batch(offset as u64, limit as u64)
.map_err(|err| {
ErrorObjectOwned::owned(-32001, format!("DBError"), Some(format!("{err:#?}")))
})?;
let mut block_res = vec![];
for block in blocks {
block_res.push(block.try_into().map_err(|err| {
ErrorObjectOwned::owned(
-32000,
format!("Conversion error"),
Some(format!("{err:#?}")),
)
})?)
}
Ok(block_res)
}
async fn get_transactions_by_account(

View File

@ -26,10 +26,7 @@ pub async fn tps_test() -> Result<()> {
let target_tps = 12;
let tps_test = TpsTestManager::new(target_tps, num_transactions);
let ctx = TestContext::new_with_sequencer_config(
tps_test.generate_sequencer_config()
)
.await?;
let ctx = TestContext::new_with_sequencer_config(tps_test.generate_sequencer_config()).await?;
let target_time = tps_test.target_time();
info!(

View File

@ -6,7 +6,9 @@ use std::{
use anyhow::Result;
use common::{
block::{AccountInitialData, CommitmentsInitialData}, config::BasicAuth};
block::{AccountInitialData, CommitmentsInitialData},
config::BasicAuth,
};
use logos_blockchain_core::mantle::ops::channel::ChannelId;
use serde::{Deserialize, Serialize};
use url::Url;

View File

@ -11,7 +11,15 @@ use common::{
message::{Message, Request},
parser::RpcRequest,
requests::{
GetAccountBalanceRequest, GetAccountBalanceResponse, GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse, GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse, GetGenesisBlockRequest, GetGenesisBlockResponse, GetGenesisIdRequest, GetGenesisIdResponse, GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest, GetTransactionByHashResponse, HelloRequest, HelloResponse, SendTxRequest, SendTxResponse
GetAccountBalanceRequest, GetAccountBalanceResponse, GetAccountRequest,
GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse,
GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest,
GetBlockRangeDataResponse, GetGenesisBlockRequest, GetGenesisBlockResponse,
GetGenesisIdRequest, GetGenesisIdResponse, GetInitialTestnetAccountsRequest,
GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse,
GetProofForCommitmentRequest, GetProofForCommitmentResponse,
GetTransactionByHashRequest, GetTransactionByHashResponse, HelloRequest, HelloResponse,
SendTxRequest, SendTxResponse,
},
},
transaction::{
@ -355,7 +363,8 @@ mod tests {
use base58::ToBase58;
use base64::{Engine, engine::general_purpose};
use common::{
block::AccountInitialData, config::BasicAuth, test_utils::sequencer_sign_key_for_testing, transaction::EncodedTransaction
block::AccountInitialData, config::BasicAuth, test_utils::sequencer_sign_key_for_testing,
transaction::EncodedTransaction,
};
use sequencer_core::{
SequencerCore,

View File

@ -41,6 +41,10 @@ pub const CF_BLOCK_NAME: &str = "cf_block";
pub const CF_META_NAME: &str = "cf_meta";
/// Name of breakpoint column family
pub const CF_BREAKPOINT_NAME: &str = "cf_breakpoint";
/// Name of hash to id map column family
pub const CF_HASH_TO_ID: &str = "cf_hash_to_id";
/// Name of tx hash to id map column family
pub const CF_TX_TO_ID: &str = "cf_tx_to_id";
pub type DbResult<T> = Result<T, DbError>;
@ -60,6 +64,8 @@ impl RocksDBIO {
let cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
let cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
let cfbreakpoint = ColumnFamilyDescriptor::new(CF_BREAKPOINT_NAME, cf_opts.clone());
let cfhti = ColumnFamilyDescriptor::new(CF_HASH_TO_ID, cf_opts.clone());
let cftti = ColumnFamilyDescriptor::new(CF_TX_TO_ID, cf_opts.clone());
let mut db_opts = Options::default();
db_opts.create_missing_column_families(true);
@ -67,7 +73,7 @@ impl RocksDBIO {
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
&db_opts,
path,
vec![cfb, cfmeta, cfbreakpoint],
vec![cfb, cfmeta, cfbreakpoint, cfhti, cftti],
);
let dbio = Self {
@ -103,6 +109,8 @@ impl RocksDBIO {
let _cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
let _cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
let _cfsnapshot = ColumnFamilyDescriptor::new(CF_BREAKPOINT_NAME, cf_opts.clone());
let _cfhti = ColumnFamilyDescriptor::new(CF_HASH_TO_ID, cf_opts.clone());
let _cftti = ColumnFamilyDescriptor::new(CF_TX_TO_ID, cf_opts.clone());
let mut db_opts = Options::default();
db_opts.create_missing_column_families(true);
@ -111,6 +119,8 @@ impl RocksDBIO {
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))
}
// Columns
pub fn meta_column(&self) -> Arc<BoundColumnFamily<'_>> {
self.db.cf_handle(CF_META_NAME).unwrap()
}
@ -123,6 +133,16 @@ impl RocksDBIO {
self.db.cf_handle(CF_BREAKPOINT_NAME).unwrap()
}
pub fn hash_to_id_column(&self) -> Arc<BoundColumnFamily<'_>> {
self.db.cf_handle(CF_HASH_TO_ID).unwrap()
}
pub fn tx_hash_to_id_column(&self) -> Arc<BoundColumnFamily<'_>> {
self.db.cf_handle(CF_TX_TO_ID).unwrap()
}
// Meta
pub fn get_meta_first_block_in_db(&self) -> DbResult<u64> {
let cf_meta = self.meta_column();
let res = self
@ -313,8 +333,12 @@ impl RocksDBIO {
Ok(())
}
// Block
pub fn put_block(&self, block: Block) -> DbResult<()> {
let cf_block = self.block_column();
let cf_hti = self.hash_to_id_column();
let cf_tti = self.hash_to_id_column();
self.db
.put_cf(
@ -340,6 +364,46 @@ impl RocksDBIO {
self.put_meta_last_block_in_db(block.header.block_id)?;
}
// ToDo: rewrite this with write batching
self.db
.put_cf(
&cf_hti,
borsh::to_vec(&block.header.hash).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block hash".to_string()),
)
})?,
borsh::to_vec(&block.header.block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
for tx in block.body.transactions {
self.db
.put_cf(
&cf_tti,
borsh::to_vec(&tx.hash()).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize tx hash".to_string()),
)
})?,
borsh::to_vec(&block.header.block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
}
if block.header.block_id.is_multiple_of(BREAKPOINT_INTERVAL) {
self.put_next_breakpoint()?;
}
@ -376,6 +440,46 @@ impl RocksDBIO {
}
}
pub fn get_block_batch(&self, offset: u64, limit: u64) -> DbResult<Vec<Block>> {
let cf_block = self.block_column();
let mut block_batch = vec![];
// ToDo: Multi get this
for block_id in offset..(offset + limit) {
let res = self
.db
.get_cf(
&cf_block,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
let block = if let Some(data) = res {
Ok(borsh::from_slice::<Block>(&data).map_err(|serr| {
DbError::borsh_cast_message(
serr,
Some("Failed to deserialize block data".to_string()),
)
})?)
} else {
// Block not found, assuming that previous one was the last
break;
}?;
block_batch.push(block);
}
Ok(block_batch)
}
// State
pub fn put_breakpoint(&self, br_id: u64, breakpoint: V02State) -> DbResult<()> {
let cf_br = self.breakpoint_column();
@ -498,4 +602,64 @@ impl RocksDBIO {
))
}
}
// Mappings
pub fn get_block_id_by_hash(&self, hash: [u8; 32]) -> DbResult<u64> {
let cf_hti = self.hash_to_id_column();
let res = self
.db
.get_cf(
&cf_hti,
borsh::to_vec(&hash).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block hash".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<u64>(&data).map_err(|serr| {
DbError::borsh_cast_message(
serr,
Some("Failed to deserialize block id".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"Block on this hash not found".to_string(),
))
}
}
pub fn get_block_id_by_tx_hash(&self, tx_hash: [u8; 32]) -> DbResult<u64> {
let cf_tti = self.tx_hash_to_id_column();
let res = self
.db
.get_cf(
&cf_tti,
borsh::to_vec(&tx_hash).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block hash".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<u64>(&data).map_err(|serr| {
DbError::borsh_cast_message(
serr,
Some("Failed to deserialize block id".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"Block on this hash not found".to_string(),
))
}
}
}

View File

@ -1,7 +1,6 @@
use std::{path::Path, sync::Arc};
use common::
block::Block;
use common::block::Block;
use nssa::V02State;
use rocksdb::{
BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, WriteBatch,
@ -439,4 +438,4 @@ impl RocksDBIO {
)
})
}
}
}