From 53b26064ebdb2e3ace9410290cf8e7f869d8adb6 Mon Sep 17 00:00:00 2001
From: Pravdyvy
Date: Mon, 9 Mar 2026 16:35:03 +0200
Subject: [PATCH] feat: multi gets

---
 storage/src/indexer/mod.rs            |   6 +-
 storage/src/indexer/read_multi_get.rs | 349 +++++++++++++++++++++++++-
 storage/src/indexer/read_once.rs      | 136 +---------
 storage/src/indexer/write_batch.rs    |   2 +-
 storage/src/indexer/write_once.rs     |   8 +-
 5 files changed, 359 insertions(+), 142 deletions(-)

diff --git a/storage/src/indexer/mod.rs b/storage/src/indexer/mod.rs
index 311746e6..c445e6bb 100644
--- a/storage/src/indexer/mod.rs
+++ b/storage/src/indexer/mod.rs
@@ -8,10 +8,10 @@ use rocksdb::{
 
 use crate::error::DbError;
 
-pub mod read_once;
-pub mod write_once;
 pub mod read_multi_get;
+pub mod read_once;
 pub mod write_batch;
+pub mod write_once;
 
 /// Maximal size of stored blocks in base
 ///
@@ -162,7 +162,7 @@ impl RocksDBIO {
         self.db.cf_handle(CF_ACC_META).unwrap()
     }
 
-    //State
+    // State
 
     pub fn calculate_state_for_id(&self, block_id: u64) -> DbResult<V02State> {
         let last_block = self.get_meta_last_block_in_db()?;
diff --git a/storage/src/indexer/read_multi_get.rs b/storage/src/indexer/read_multi_get.rs
index 15124682..9fd68382 100644
--- a/storage/src/indexer/read_multi_get.rs
+++ b/storage/src/indexer/read_multi_get.rs
@@ -1,5 +1,350 @@
 use super::*;
 
+#[derive(Debug, Clone)]
+pub struct DBMetadata {
+    pub first_block_in_db: u64,
+    pub last_block_in_db: u64,
+    pub last_observed_l1_lib_header_in_db: [u8; 32],
+    pub is_first_block_set: bool,
+    pub last_breakpoint_id: u64,
+}
+
 impl RocksDBIO {
-
-}
\ No newline at end of file
+    fn meta_keys_list() -> DbResult<Vec<Vec<u8>>> {
+        let mut keys = vec![];
+
+        keys.push(
+            borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| {
+                DbError::borsh_cast_message(
+                    err,
+                    Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_string()),
+                )
+            })?,
+        );
+        keys.push(borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
+            DbError::borsh_cast_message(
+                err,
+                Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
+            )
+        })?);
+        keys.push(
+            borsh::to_vec(&DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY).map_err(|err| {
+                DbError::borsh_cast_message(
+                    err,
+                    Some(
+                        "Failed to serialize DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY"
+                            .to_string(),
+                    ),
+                )
+            })?,
+        );
+        keys.push(borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| {
+            DbError::borsh_cast_message(
+                err,
+                Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_string()),
+            )
+        })?);
+        keys.push(borsh::to_vec(&DB_META_LAST_BREAKPOINT_ID).map_err(|err| {
+            DbError::borsh_cast_message(
+                err,
+                Some("Failed to serialize DB_META_LAST_BREAKPOINT_ID".to_string()),
+            )
+        })?);
+
+        Ok(keys)
+    }
+
+    fn read_meta_all(&self) -> DbResult<Option<DBMetadata>> {
+        let cf_meta = self.meta_column();
+
+        let multi_get_res = self.db.multi_get_cf(
+            RocksDBIO::meta_keys_list()?
+                .into_iter()
+                .map(|key| (&cf_meta, key)),
+        );
+
+        let Some(first_block_in_db_raw) =
+            multi_get_res[0]
+                .as_ref()
+                .map_err(|err| DbError::RocksDbError {
+                    error: err.clone(),
+                    additional_info: Some("Failed to read first_block_in_db".to_string()),
+                })?
+        else {
+            return Ok(None);
+        };
+        let Some(last_block_in_db_raw) =
+            multi_get_res[1]
+                .as_ref()
+                .map_err(|err| DbError::RocksDbError {
+                    error: err.clone(),
+                    additional_info: Some("Failed to read last_block_in_db".to_string()),
+                })?
+        else {
+            return Ok(None);
+        };
+        let Some(last_observed_l1_lib_header_in_db_raw) =
+            multi_get_res[2]
+                .as_ref()
+                .map_err(|err| DbError::RocksDbError {
+                    error: err.clone(),
+                    additional_info: Some(
+                        "Failed to read last_observed_l1_lib_header_in_db".to_string(),
+                    ),
+                })?
+        else {
+            return Ok(None);
+        };
+        let is_first_block_set = multi_get_res[3]
+            .as_ref()
+            .map_err(|err| DbError::RocksDbError {
+                error: err.clone(),
+                additional_info: Some("Failed to read is_first_block_set".to_string()),
+            })?
+            .is_some();
+        let Some(last_breakpoint_id_raw) =
+            multi_get_res[4]
+                .as_ref()
+                .clone()
+                .map_err(|err| DbError::RocksDbError {
+                    error: err.clone(),
+                    additional_info: Some("Failed to read last_breakpoint_id".to_string()),
+                })?
+        else {
+            return Ok(None);
+        };
+
+        let first_block_in_db = borsh::from_slice::<u64>(first_block_in_db_raw).map_err(|err| {
+            DbError::borsh_cast_message(err, Some("Failed to deserialize first block".to_string()))
+        })?;
+        let last_block_in_db = borsh::from_slice::<u64>(last_block_in_db_raw).map_err(|err| {
+            DbError::borsh_cast_message(err, Some("Failed to deserialize last block".to_string()))
+        })?;
+        let last_observed_l1_lib_header_in_db = borsh::from_slice::<[u8; 32]>(
+            last_observed_l1_lib_header_in_db_raw,
+        )
+        .map_err(|err| {
+            DbError::borsh_cast_message(
+                err,
+                Some("Failed to deserialize last l1 lib header".to_string()),
+            )
+        })?;
+        let last_breakpoint_id =
+            borsh::from_slice::<u64>(last_breakpoint_id_raw).map_err(|err| {
+                DbError::borsh_cast_message(
+                    err,
+                    Some("Failed to deserialize last breakpoint id".to_string()),
+                )
+            })?;
+
+        Ok(Some(DBMetadata {
+            first_block_in_db,
+            last_block_in_db,
+            last_observed_l1_lib_header_in_db,
+            is_first_block_set,
+            last_breakpoint_id,
+        }))
+    }
+
+    pub fn get_block_batch(&self, before: Option<u64>, limit: u64) -> DbResult<Vec<Block>> {
+        let mut seq = vec![];
+
+        // Determine the starting block ID
+        let start_block_id = if let Some(before_id) = before {
+            before_id.saturating_sub(1)
+        } else {
+            // Get the latest block ID
+            self.get_meta_last_block_in_db()?
+        };
+
+        for i in 0..limit {
+            let block_id = start_block_id.saturating_sub(i);
+            if block_id == 0 {
+                break;
+            }
+            seq.push(block_id);
+        }
+
+        self.get_block_batch_seq(seq.into_iter())
+    }
+
+    /// Get block batch from a sequence
+    ///
+    /// Currently assumes a non-decreasing sequence
+    ///
+    /// ToDo: Add support for arbitrary sequences
+    fn get_block_batch_seq(&self, seq: impl Iterator<Item = u64>) -> DbResult<Vec<Block>> {
+        let cf_block = self.block_column();
+
+        // Keys setup
+        let mut keys = vec![];
+        for block_id in seq {
+            keys.push((
+                &cf_block,
+                borsh::to_vec(&block_id).map_err(|err| {
+                    DbError::borsh_cast_message(
+                        err,
+                        Some("Failed to serialize block id".to_string()),
+                    )
+                })?,
+            ));
+        }
+
+        let multi_get_res = self.db.multi_get_cf(keys);
+
+        // Parse results
+        let mut block_batch = vec![];
+        for res in multi_get_res {
+            let res = res.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
+
+            let block = if let Some(data) = res {
+                Ok(borsh::from_slice::<Block>(&data).map_err(|serr| {
+                    DbError::borsh_cast_message(
+                        serr,
+                        Some("Failed to deserialize block data".to_string()),
+                    )
+                })?)
+            } else {
+                // Block not found, assuming that the previous one was the last
+                break;
+            }?;
+
+            block_batch.push(block);
+        }
+
+        Ok(block_batch)
+    }
+
+    /// Get block ids by tx hashes
+    ///
+    /// Transactions must be sorted by time of arrival
+    ///
+    /// ToDo: There may be multiple transactions in one block,
+    /// so this method may perform redundant reads.
+    /// Need to update the signature and implementation.
+    fn get_block_ids_by_tx_vec(&self, tx_vec: &[[u8; 32]]) -> DbResult<Vec<u64>> {
+        let cf_tti = self.tx_hash_to_id_column();
+
+        // Keys setup
+        let mut keys = vec![];
+        for tx_hash in tx_vec {
+            keys.push((
+                &cf_tti,
+                borsh::to_vec(tx_hash).map_err(|err| {
+                    DbError::borsh_cast_message(
+                        err,
+                        Some("Failed to serialize tx_hash".to_string()),
+                    )
+                })?,
+            ));
+        }
+
+        let multi_get_res = self.db.multi_get_cf(keys);
+
+        // Parse results
+        let mut block_id_batch = vec![];
+        for res in multi_get_res {
+            let res = res.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
+
+            let block_id = if let Some(data) = res {
+                Ok(borsh::from_slice::<u64>(&data).map_err(|serr| {
+                    DbError::borsh_cast_message(
+                        serr,
+                        Some("Failed to deserialize block id".to_string()),
+                    )
+                })?)
+            } else {
+                // Block id not found, assuming that the previous one was the last
+                break;
+            }?;
+
+            block_id_batch.push(block_id);
+        }
+
+        Ok(block_id_batch)
+    }
+
+    // Account
+
+    pub(crate) fn get_acc_transaction_hashes(
+        &self,
+        acc_id: [u8; 32],
+        offset: u64,
+        limit: u64,
+    ) -> DbResult<Vec<[u8; 32]>> {
+        let cf_att = self.account_id_to_tx_hash_column();
+        let mut tx_batch = vec![];
+
+        // Keys preparation
+        let mut keys = vec![];
+        for tx_id in offset..(offset + limit) {
+            let mut prefix = borsh::to_vec(&acc_id).map_err(|berr| {
+                DbError::borsh_cast_message(
+                    berr,
+                    Some("Failed to serialize account id".to_string()),
+                )
+            })?;
+            let suffix = borsh::to_vec(&tx_id).map_err(|berr| {
+                DbError::borsh_cast_message(berr, Some("Failed to serialize tx id".to_string()))
+            })?;
+
+            prefix.extend_from_slice(&suffix);
+
+            keys.push((&cf_att, prefix));
+        }
+
+        let multi_get_res = self.db.multi_get_cf(keys);
+
+        for res in multi_get_res {
+            let res = res.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
+
+            let tx_hash = if let Some(data) = res {
+                Ok(borsh::from_slice::<[u8; 32]>(&data).map_err(|serr| {
+                    DbError::borsh_cast_message(
+                        serr,
+                        Some("Failed to deserialize tx_hash".to_string()),
+                    )
+                })?)
+            } else {
+                // Tx hash not found, assuming that the previous one was the last
+                break;
+            }?;
+
+            tx_batch.push(tx_hash);
+        }
+
+        Ok(tx_batch)
+    }
+
+    pub fn get_acc_transactions(
+        &self,
+        acc_id: [u8; 32],
+        offset: u64,
+        limit: u64,
+    ) -> DbResult<Vec<Transaction>> {
+        let mut tx_batch = vec![];
+
+        let tx_hashes = self.get_acc_transaction_hashes(acc_id, offset, limit)?;
+
+        let associated_blocks_multi_get = self
+            .get_block_batch_seq(self.get_block_ids_by_tx_vec(&tx_hashes)?.into_iter())?
+            .into_iter()
+            .zip(tx_hashes);
+
+        for (block, tx_hash) in associated_blocks_multi_get {
+            let transaction = block
+                .body
+                .transactions
+                .iter()
+                .find(|tx| tx.hash().0 == tx_hash)
+                .ok_or(DbError::db_interaction_error(format!(
+                    "Missing transaction in block {} with hash {:#?}",
+                    block.header.block_id, tx_hash
+                )))?;
+
+            tx_batch.push(transaction.clone());
+        }
+
+        Ok(tx_batch)
+    }
+}
diff --git a/storage/src/indexer/read_once.rs b/storage/src/indexer/read_once.rs
index 1091dbc1..e544d277 100644
--- a/storage/src/indexer/read_once.rs
+++ b/storage/src/indexer/read_once.rs
@@ -1,7 +1,7 @@
 use super::*;
 
 impl RocksDBIO {
-    //Meta
+    // Meta
 
     pub fn get_meta_first_block_in_db(&self) -> DbResult<u64> {
         let cf_meta = self.meta_column();
@@ -139,7 +139,7 @@ impl RocksDBIO {
         }
     }
 
-    //Block
+    // Block
 
     pub fn get_block(&self, block_id: u64) -> DbResult<Block> {
         let cf_block = self.block_column();
@@ -170,58 +170,7 @@ impl RocksDBIO {
         }
     }
 
-    pub fn get_block_batch(&self, before: Option<u64>, limit: u64) -> DbResult<Vec<Block>> {
-        let cf_block = self.block_column();
-        let mut block_batch = vec![];
-
-        // Determine the starting block ID
-        let start_block_id = if let Some(before_id) = before {
-            before_id.saturating_sub(1)
-        } else {
-            // Get the latest block ID
-            self.get_meta_last_block_in_db()?
-        };
-
-        // ToDo: Multi get this
-
-        for i in 0..limit {
-            let block_id = start_block_id.saturating_sub(i);
-            if block_id == 0 {
-                break;
-            }
-
-            let res = self
-                .db
-                .get_cf(
-                    &cf_block,
-                    borsh::to_vec(&block_id).map_err(|err| {
-                        DbError::borsh_cast_message(
-                            err,
-                            Some("Failed to serialize block id".to_string()),
-                        )
-                    })?,
-                )
-                .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
-
-            let block = if let Some(data) = res {
-                Ok(borsh::from_slice::<Block>(&data).map_err(|serr| {
-                    DbError::borsh_cast_message(
-                        serr,
-                        Some("Failed to deserialize block data".to_string()),
-                    )
-                })?)
-            } else {
-                // Block not found, assuming that previous one was the last
-                break;
-            }?;
-
-            block_batch.push(block);
-        }
-
-        Ok(block_batch)
-    }
-
-    //State
+    // State
 
     pub fn get_breakpoint(&self, br_id: u64) -> DbResult<V02State> {
         let cf_br = self.breakpoint_column();
@@ -344,81 +293,4 @@ impl RocksDBIO {
             })
             .transpose()
     }
-
-    // Account
-
-    fn get_acc_transaction_hashes(
-        &self,
-        acc_id: [u8; 32],
-        offset: u64,
-        limit: u64,
-    ) -> DbResult<Vec<[u8; 32]>> {
-        let cf_att = self.account_id_to_tx_hash_column();
-        let mut tx_batch = vec![];
-
-        // ToDo: Multi get this
-
-        for tx_id in offset..(offset + limit) {
-            let mut prefix = borsh::to_vec(&acc_id).map_err(|berr| {
-                DbError::borsh_cast_message(
-                    berr,
-                    Some("Failed to serialize account id".to_string()),
-                )
-            })?;
-            let suffix = borsh::to_vec(&tx_id).map_err(|berr| {
-                DbError::borsh_cast_message(berr, Some("Failed to serialize tx id".to_string()))
-            })?;
-
-            prefix.extend_from_slice(&suffix);
-
-            let res = self
-                .db
-                .get_cf(&cf_att, prefix)
-                .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
-
-            let tx_hash = if let Some(data) = res {
-                Ok(borsh::from_slice::<[u8; 32]>(&data).map_err(|serr| {
-                    DbError::borsh_cast_message(
-                        serr,
-                        Some("Failed to deserialize tx_hash".to_string()),
-                    )
-                })?)
-            } else {
-                // Tx hash not found, assuming that previous one was the last
-                break;
-            }?;
-
-            tx_batch.push(tx_hash);
-        }
-
-        Ok(tx_batch)
-    }
-
-    pub fn get_acc_transactions(
-        &self,
-        acc_id: [u8; 32],
-        offset: u64,
-        limit: u64,
-    ) -> DbResult<Vec<Transaction>> {
-        let mut tx_batch = vec![];
-
-        for tx_hash in self.get_acc_transaction_hashes(acc_id, offset, limit)? {
-            let block_id = self.get_block_id_by_tx_hash(tx_hash)?;
-            let block = self.get_block(block_id)?;
-
-            let transaction = block
-                .body
-                .transactions
-                .iter()
-                .find(|tx| tx.hash().0 == tx_hash)
-                .ok_or(DbError::db_interaction_error(format!(
-                    "Missing transaction in block {} with hash {:#?}",
-                    block.header.block_id, tx_hash
-                )))?;
-
-            tx_batch.push(transaction.clone());
-        }
-
-        Ok(tx_batch)
-    }
-}
\ No newline at end of file
+}
diff --git a/storage/src/indexer/write_batch.rs b/storage/src/indexer/write_batch.rs
index 2bb11a85..254e7dc9 100644
--- a/storage/src/indexer/write_batch.rs
+++ b/storage/src/indexer/write_batch.rs
@@ -75,4 +75,4 @@ impl RocksDBIO {
             DbError::rocksdb_cast_message(rerr, Some("Failed to write batch".to_string()))
         })
     }
-}
\ No newline at end of file
+}
diff --git a/storage/src/indexer/write_once.rs b/storage/src/indexer/write_once.rs
index 71946f63..b4309c78 100644
--- a/storage/src/indexer/write_once.rs
+++ b/storage/src/indexer/write_once.rs
@@ -1,7 +1,7 @@
 use super::*;
 
 impl RocksDBIO {
-    //Meta
+    // Meta
 
     pub fn put_meta_first_block_in_db(&self, block: Block) -> DbResult<()> {
         let cf_meta = self.meta_column();
@@ -118,7 +118,7 @@ impl RocksDBIO {
         Ok(())
     }
 
-    //Block
+    // Block
 
     pub fn put_block(&self, block: Block, l1_lib_header: [u8; 32]) -> DbResult<()> {
         let cf_block = self.block_column();
@@ -218,7 +218,7 @@ impl RocksDBIO {
         Ok(())
     }
 
-    //State
+    // State
 
     pub fn put_breakpoint(&self, br_id: u64, breakpoint: V02State) -> DbResult<()> {
         let cf_br = self.breakpoint_column();
@@ -241,4 +241,4 @@
             )
             .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))
     }
-}
\ No newline at end of file
+}
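
Usage sketch: a minimal illustration of how the new batched readers might be called from code
that returns DbResult, assuming an already-opened RocksDBIO handle named dbio; the handle name
and the zeroed account id below are placeholders, not part of this patch.

    // Latest 20 blocks, fetched through a single multi-get instead of per-block get_cf calls.
    let recent_blocks = dbio.get_block_batch(None, 20)?;

    // Up to 50 transactions for an account, resolved through the batched
    // account id -> tx hash -> block id -> block lookups.
    let acc_id = [0u8; 32];
    let account_txs = dbio.get_acc_transactions(acc_id, 0, 50)?;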