From 357e0f1444fd462173bf179db2bc16d75e33c3cd Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Thu, 5 Feb 2026 16:21:08 +0200 Subject: [PATCH] feat: all methods populated --- common/src/transaction.rs | 12 +- indexer/core/src/block_store.rs | 9 + indexer/service/src/service.rs | 28 ++- .../transaction.rs | 10 + .../transaction.rs | 5 + nssa/src/public_transaction/transaction.rs | 10 + storage/src/indexer.rs | 226 +++++++++++++++++- 7 files changed, 289 insertions(+), 11 deletions(-) diff --git a/common/src/transaction.rs b/common/src/transaction.rs index 2245ec54..8c24d199 100644 --- a/common/src/transaction.rs +++ b/common/src/transaction.rs @@ -2,7 +2,7 @@ use std::fmt::Display; use borsh::{BorshDeserialize, BorshSerialize}; use log::{info, warn}; -use nssa::V02State; +use nssa::{AccountId, V02State}; use serde::{Deserialize, Serialize}; use sha2::{Digest, digest::FixedOutput}; @@ -27,6 +27,16 @@ impl From for NSSATransaction { } } +impl NSSATransaction { + pub fn affected_public_account_ids(&self) -> Vec { + match &self { + &NSSATransaction::ProgramDeployment(tx) => tx.affected_public_account_ids(), + &NSSATransaction::Public(tx) => tx.affected_public_account_ids(), + &NSSATransaction::PrivacyPreserving(tx) => tx.affected_public_account_ids(), + } + } +} + impl From for NSSATransaction { fn from(value: nssa::ProgramDeploymentTransaction) -> Self { Self::ProgramDeployment(value) diff --git a/indexer/core/src/block_store.rs b/indexer/core/src/block_store.rs index d83859ae..220d606c 100644 --- a/indexer/core/src/block_store.rs +++ b/indexer/core/src/block_store.rs @@ -64,6 +64,15 @@ impl IndexerStore { Ok(self.get_block_at_id(self.dbio.get_block_id_by_hash(hash)?)?) } + pub fn get_transactions_by_account( + &self, + acc_id: [u8; 32], + offset: u64, + limit: u64, + ) -> Result> { + Ok(self.dbio.get_acc_transactions(acc_id, offset, limit)?) + } + pub fn genesis_id(&self) -> u64 { self.dbio .get_meta_first_block_in_db() diff --git a/indexer/service/src/service.rs b/indexer/service/src/service.rs index 46cd995b..2c51aa3f 100644 --- a/indexer/service/src/service.rs +++ b/indexer/service/src/service.rs @@ -135,11 +135,31 @@ impl indexer_service_rpc::RpcServer for IndexerService { async fn get_transactions_by_account( &self, - _account_id: AccountId, - _limit: u32, - _offset: u32, + account_id: AccountId, + limit: u32, + offset: u32, ) -> Result, ErrorObjectOwned> { - todo!() + let transactions = self + .indexer + .store + .get_transactions_by_account(account_id.value, offset as u64, limit as u64) + .map_err(|err| { + ErrorObjectOwned::owned(-32001, format!("DBError"), Some(format!("{err:#?}"))) + })?; + + let mut tx_res = vec![]; + + for tx in transactions { + tx_res.push(tx.try_into().map_err(|err| { + ErrorObjectOwned::owned( + -32000, + format!("Conversion error"), + Some(format!("{err:#?}")), + ) + })?) + } + + Ok(tx_res) } } diff --git a/nssa/src/privacy_preserving_transaction/transaction.rs b/nssa/src/privacy_preserving_transaction/transaction.rs index 8eb4236e..70d98233 100644 --- a/nssa/src/privacy_preserving_transaction/transaction.rs +++ b/nssa/src/privacy_preserving_transaction/transaction.rs @@ -146,6 +146,16 @@ impl PrivacyPreservingTransaction { .map(|(_, public_key)| AccountId::from(public_key)) .collect() } + + pub fn affected_public_account_ids(&self) -> Vec { + let mut acc_set = self + .signer_account_ids() + .into_iter() + .collect::>(); + acc_set.extend(&self.message.public_account_ids); + + acc_set.into_iter().collect() + } } fn check_privacy_preserving_circuit_proof_is_valid( diff --git a/nssa/src/program_deployment_transaction/transaction.rs b/nssa/src/program_deployment_transaction/transaction.rs index 188b73ea..8e77bfe0 100644 --- a/nssa/src/program_deployment_transaction/transaction.rs +++ b/nssa/src/program_deployment_transaction/transaction.rs @@ -1,4 +1,5 @@ use borsh::{BorshDeserialize, BorshSerialize}; +use nssa_core::account::AccountId; use sha2::{Digest as _, digest::FixedOutput as _}; use crate::{ @@ -38,4 +39,8 @@ impl ProgramDeploymentTransaction { hasher.update(&bytes); hasher.finalize_fixed().into() } + + pub fn affected_public_account_ids(&self) -> Vec { + vec![] + } } diff --git a/nssa/src/public_transaction/transaction.rs b/nssa/src/public_transaction/transaction.rs index 7d42dccc..85608d4f 100644 --- a/nssa/src/public_transaction/transaction.rs +++ b/nssa/src/public_transaction/transaction.rs @@ -45,6 +45,16 @@ impl PublicTransaction { .collect() } + pub fn affected_public_account_ids(&self) -> Vec { + let mut acc_set = self + .signer_account_ids() + .into_iter() + .collect::>(); + acc_set.extend(&self.message.account_ids); + + acc_set.into_iter().collect() + } + pub fn hash(&self) -> [u8; 32] { let bytes = self.to_bytes(); let mut hasher = sha2::Sha256::new(); diff --git a/storage/src/indexer.rs b/storage/src/indexer.rs index e50accbd..67778a39 100644 --- a/storage/src/indexer.rs +++ b/storage/src/indexer.rs @@ -1,4 +1,4 @@ -use std::{ops::Div, path::Path, sync::Arc}; +use std::{collections::HashMap, ops::Div, path::Path, sync::Arc}; use common::{ block::Block, @@ -6,7 +6,7 @@ use common::{ }; use nssa::V02State; use rocksdb::{ - BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, + BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, WriteBatch, }; use crate::error::DbError; @@ -45,6 +45,10 @@ pub const CF_BREAKPOINT_NAME: &str = "cf_breakpoint"; pub const CF_HASH_TO_ID: &str = "cf_hash_to_id"; /// Name of tx hash to id map column family pub const CF_TX_TO_ID: &str = "cf_tx_to_id"; +/// Name of account meta column family +pub const CF_ACC_META: &str = "cf_acc_meta"; +/// Name of account id to tx hash map column family +pub const CF_ACC_TO_TX: &str = "cf_acc_to_tx"; pub type DbResult = Result; @@ -66,6 +70,8 @@ impl RocksDBIO { let cfbreakpoint = ColumnFamilyDescriptor::new(CF_BREAKPOINT_NAME, cf_opts.clone()); let cfhti = ColumnFamilyDescriptor::new(CF_HASH_TO_ID, cf_opts.clone()); let cftti = ColumnFamilyDescriptor::new(CF_TX_TO_ID, cf_opts.clone()); + let cfameta = ColumnFamilyDescriptor::new(CF_ACC_META, cf_opts.clone()); + let cfatt = ColumnFamilyDescriptor::new(CF_ACC_TO_TX, cf_opts.clone()); let mut db_opts = Options::default(); db_opts.create_missing_column_families(true); @@ -73,7 +79,7 @@ impl RocksDBIO { let db = DBWithThreadMode::::open_cf_descriptors( &db_opts, path, - vec![cfb, cfmeta, cfbreakpoint, cfhti, cftti], + vec![cfb, cfmeta, cfbreakpoint, cfhti, cftti, cfameta, cfatt], ); let dbio = Self { @@ -111,6 +117,8 @@ impl RocksDBIO { let _cfsnapshot = ColumnFamilyDescriptor::new(CF_BREAKPOINT_NAME, cf_opts.clone()); let _cfhti = ColumnFamilyDescriptor::new(CF_HASH_TO_ID, cf_opts.clone()); let _cftti = ColumnFamilyDescriptor::new(CF_TX_TO_ID, cf_opts.clone()); + let _cfameta = ColumnFamilyDescriptor::new(CF_ACC_META, cf_opts.clone()); + let _cfatt = ColumnFamilyDescriptor::new(CF_ACC_TO_TX, cf_opts.clone()); let mut db_opts = Options::default(); db_opts.create_missing_column_families(true); @@ -141,6 +149,14 @@ impl RocksDBIO { self.db.cf_handle(CF_TX_TO_ID).unwrap() } + pub fn account_id_to_tx_hash_column(&self) -> Arc> { + self.db.cf_handle(CF_ACC_TO_TX).unwrap() + } + + pub fn account_meta_column(&self) -> Arc> { + self.db.cf_handle(CF_ACC_META).unwrap() + } + // Meta pub fn get_meta_first_block_in_db(&self) -> DbResult { @@ -340,6 +356,8 @@ impl RocksDBIO { let cf_hti = self.hash_to_id_column(); let cf_tti = self.hash_to_id_column(); + // ToDo: rewrite this with write batching + self.db .put_cf( &cf_block, @@ -364,8 +382,6 @@ impl RocksDBIO { self.put_meta_last_block_in_db(block.header.block_id)?; } - // ToDo: rewrite this with write batching - self.db .put_cf( &cf_hti, @@ -384,11 +400,15 @@ impl RocksDBIO { ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + let mut acc_to_tx_map: HashMap<[u8; 32], Vec<[u8; 32]>> = HashMap::new(); + for tx in block.body.transactions { + let tx_hash = tx.hash(); + self.db .put_cf( &cf_tti, - borsh::to_vec(&tx.hash()).map_err(|err| { + borsh::to_vec(&tx_hash).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize tx hash".to_string()), @@ -402,6 +422,29 @@ impl RocksDBIO { })?, ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + + let acc_ids = NSSATransaction::try_from(&tx) + .map_err(|err| { + DbError::db_interaction_error(format!( + "failed to decode transaction in block {} with err {err:?}", + block.header.block_id + )) + })? + .affected_public_account_ids() + .into_iter() + .map(|account_id| account_id.into_value()) + .collect::>(); + + for acc_id in acc_ids { + acc_to_tx_map + .entry(acc_id) + .and_modify(|tx_hashes| tx_hashes.push(tx_hash)) + .or_insert(vec![tx_hash]); + } + } + + for (acc_id, tx_hashes) in acc_to_tx_map { + self.put_account_transactions(acc_id, tx_hashes)?; } if block.header.block_id.is_multiple_of(BREAKPOINT_INTERVAL) { @@ -662,4 +705,175 @@ impl RocksDBIO { )) } } + + // Accounts meta + + fn update_acc_meta_batch( + &self, + acc_id: [u8; 32], + num_tx: u64, + write_batch: &mut WriteBatch, + ) -> DbResult<()> { + let cf_ameta = self.account_meta_column(); + + write_batch.put_cf( + &cf_ameta, + borsh::to_vec(&acc_id).map_err(|err| { + DbError::borsh_cast_message(err, Some("Failed to serialize account id".to_string())) + })?, + borsh::to_vec(&num_tx).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize acc metadata".to_string()), + ) + })?, + ); + + Ok(()) + } + + fn get_acc_meta_num_tx(&self, acc_id: [u8; 32]) -> DbResult> { + let cf_ameta = self.account_meta_column(); + let res = self.db.get_cf(&cf_ameta, acc_id).map_err(|rerr| { + DbError::rocksdb_cast_message(rerr, Some("Failed to read from acc meta cf".to_string())) + })?; + + res.map(|data| { + borsh::from_slice::(&data).map_err(|serr| { + DbError::borsh_cast_message(serr, Some("Failed to deserialize num tx".to_string())) + }) + }) + .transpose() + } + + // Account + + pub fn put_account_transactions( + &self, + acc_id: [u8; 32], + tx_hashes: Vec<[u8; 32]>, + ) -> DbResult<()> { + let acc_num_tx = self.get_acc_meta_num_tx(acc_id)?.unwrap_or(0); + let cf_att = self.account_id_to_tx_hash_column(); + let mut write_batch = WriteBatch::new(); + + for (tx_id, tx_hash) in tx_hashes.iter().enumerate() { + let put_id = acc_num_tx + tx_id as u64; + + let mut prefix = borsh::to_vec(&acc_id).map_err(|berr| { + DbError::borsh_cast_message( + berr, + Some("Failed to serialize account id".to_string()), + ) + })?; + let suffix = borsh::to_vec(&put_id).map_err(|berr| { + DbError::borsh_cast_message(berr, Some("Failed to serialize tx id".to_string())) + })?; + + prefix.extend_from_slice(&suffix); + + write_batch.put_cf( + &cf_att, + prefix, + borsh::to_vec(tx_hash).map_err(|berr| { + DbError::borsh_cast_message( + berr, + Some("Failed to serialize tx hash".to_string()), + ) + })?, + ); + } + + self.update_acc_meta_batch( + acc_id, + acc_num_tx + (tx_hashes.len() as u64), + &mut write_batch, + )?; + + self.db.write(write_batch).map_err(|rerr| { + DbError::rocksdb_cast_message(rerr, Some("Failed to write batch".to_string())) + }) + } + + fn get_acc_transaction_hashes( + &self, + acc_id: [u8; 32], + offset: u64, + limit: u64, + ) -> DbResult> { + let cf_att = self.account_id_to_tx_hash_column(); + let mut tx_batch = vec![]; + + // ToDo: Multi get this + + for tx_id in offset..(offset + limit) { + let mut prefix = borsh::to_vec(&acc_id).map_err(|berr| { + DbError::borsh_cast_message( + berr, + Some("Failed to serialize account id".to_string()), + ) + })?; + let suffix = borsh::to_vec(&tx_id).map_err(|berr| { + DbError::borsh_cast_message(berr, Some("Failed to serialize tx id".to_string())) + })?; + + prefix.extend_from_slice(&suffix); + + let res = self + .db + .get_cf(&cf_att, prefix) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + + let tx_hash = if let Some(data) = res { + Ok(borsh::from_slice::<[u8; 32]>(&data).map_err(|serr| { + DbError::borsh_cast_message( + serr, + Some("Failed to deserialize tx_hash".to_string()), + ) + })?) + } else { + // Tx hash not found, assuming that previous one was the last + break; + }?; + + tx_batch.push(tx_hash); + } + + Ok(tx_batch) + } + + pub fn get_acc_transactions( + &self, + acc_id: [u8; 32], + offset: u64, + limit: u64, + ) -> DbResult> { + let mut tx_batch = vec![]; + + for tx_hash in self.get_acc_transaction_hashes(acc_id, offset, limit)? { + let block_id = self.get_block_id_by_hash(tx_hash)?; + let block = self.get_block(block_id)?; + + let enc_tx = block + .body + .transactions + .iter() + .find(|tx| tx.hash() == tx_hash) + .ok_or(DbError::db_interaction_error(format!( + "Missing transaction in block {} with hash {:#?}", + block.header.block_id, tx_hash + )))?; + + let transaction = NSSATransaction::try_from(enc_tx).map_err(|err| { + DbError::db_interaction_error(format!( + "failed to decode transaction in block {} with err {err:?}", + block.header.block_id + )) + })?; + + tx_batch.push(transaction); + } + + Ok(tx_batch) + } }