From 247094ca614ebd7d404ea1bb94fdb305699188b4 Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Thu, 12 Mar 2026 16:21:04 +0200 Subject: [PATCH] fix: batching usage in indexer service --- indexer/core/src/block_store.rs | 2 +- storage/src/indexer/mod.rs | 28 +++---- storage/src/indexer/write_once.rs | 124 ------------------------------ 3 files changed, 15 insertions(+), 139 deletions(-) diff --git a/indexer/core/src/block_store.rs b/indexer/core/src/block_store.rs index 681b63c8..7844b947 100644 --- a/indexer/core/src/block_store.rs +++ b/indexer/core/src/block_store.rs @@ -118,6 +118,6 @@ impl IndexerStore { // to represent correct block finality block.bedrock_status = BedrockStatus::Finalized; - Ok(self.dbio.put_block(block, l1_header.into())?) + Ok(self.dbio.put_block_batch(block, l1_header.into())?) } } diff --git a/storage/src/indexer/mod.rs b/storage/src/indexer/mod.rs index f07980a9..db47c0a6 100644 --- a/storage/src/indexer/mod.rs +++ b/storage/src/indexer/mod.rs @@ -100,7 +100,7 @@ impl RocksDBIO { let block_id = block.header.block_id; dbio.put_meta_last_block_in_db(block_id)?; dbio.put_meta_last_observed_l1_lib_header_in_db(block.bedrock_parent_id)?; - dbio.put_meta_first_block_in_db(block)?; + dbio.put_meta_first_block_in_db_batch(block)?; dbio.put_meta_is_first_block_set()?; // First breakpoint setup @@ -311,7 +311,7 @@ mod tests { let transfer_tx = transfer(1, 0, true); let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block, [1; 32]).unwrap(); + dbio.put_block_batch(block, [1; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let first_id = dbio.get_meta_first_block_in_db().unwrap(); @@ -410,7 +410,7 @@ mod tests { let control_hash1 = block.header.hash; - dbio.put_block(block, [1; 32]).unwrap(); + dbio.put_block_batch(block, [1; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -421,7 +421,7 @@ mod tests { let control_hash2 = block.header.hash; - dbio.put_block(block, [2; 32]).unwrap(); + dbio.put_block_batch(block, [2; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -432,7 +432,7 @@ mod tests { let control_tx_hash1 = transfer_tx.hash(); let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block, [3; 32]).unwrap(); + dbio.put_block_batch(block, [3; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -443,7 +443,7 @@ mod tests { let control_tx_hash2 = transfer_tx.hash(); let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block, [4; 32]).unwrap(); + dbio.put_block_batch(block, [4; 32]).unwrap(); let control_block_id1 = dbio.get_block_id_by_hash(control_hash1.0).unwrap(); let control_block_id2 = dbio.get_block_id_by_hash(control_hash2.0).unwrap(); @@ -474,7 +474,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]); block_res.push(block.clone()); - dbio.put_block(block, [1; 32]).unwrap(); + dbio.put_block_batch(block, [1; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -484,7 +484,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(3, Some(prev_hash), vec![transfer_tx]); block_res.push(block.clone()); - dbio.put_block(block, [2; 32]).unwrap(); + dbio.put_block_batch(block, [2; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -494,7 +494,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]); block_res.push(block.clone()); - dbio.put_block(block, [3; 32]).unwrap(); + dbio.put_block_batch(block, [3; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -504,7 +504,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]); block_res.push(block.clone()); - dbio.put_block(block, [4; 32]).unwrap(); + dbio.put_block_batch(block, [4; 32]).unwrap(); let block_hashes_mem: Vec<[u8; 32]> = block_res.into_iter().map(|bl| bl.header.hash.0).collect(); @@ -567,7 +567,7 @@ mod tests { vec![transfer_tx1, transfer_tx2], ); - dbio.put_block(block, [1; 32]).unwrap(); + dbio.put_block_batch(block, [1; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -584,7 +584,7 @@ mod tests { vec![transfer_tx1, transfer_tx2], ); - dbio.put_block(block, [2; 32]).unwrap(); + dbio.put_block_batch(block, [2; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -601,7 +601,7 @@ mod tests { vec![transfer_tx1, transfer_tx2], ); - dbio.put_block(block, [3; 32]).unwrap(); + dbio.put_block_batch(block, [3; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -612,7 +612,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block, [4; 32]).unwrap(); + dbio.put_block_batch(block, [4; 32]).unwrap(); let acc1_tx = dbio.get_acc_transactions(*acc1().value(), 0, 7).unwrap(); let acc1_tx_hashes: Vec<[u8; 32]> = acc1_tx.into_iter().map(|tx| tx.hash().0).collect(); diff --git a/storage/src/indexer/write_once.rs b/storage/src/indexer/write_once.rs index bf00fce5..ab600f84 100644 --- a/storage/src/indexer/write_once.rs +++ b/storage/src/indexer/write_once.rs @@ -3,30 +3,6 @@ use super::*; impl RocksDBIO { // Meta - pub fn put_meta_first_block_in_db(&self, block: Block) -> DbResult<()> { - let cf_meta = self.meta_column(); - self.db - .put_cf( - &cf_meta, - borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| { - DbError::borsh_cast_message( - err, - Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_string()), - ) - })?, - borsh::to_vec(&block.header.block_id).map_err(|err| { - DbError::borsh_cast_message( - err, - Some("Failed to serialize first block id".to_string()), - ) - })?, - ) - .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; - - self.put_block(block, [0; 32])?; - Ok(()) - } - pub fn put_meta_last_block_in_db(&self, block_id: u64) -> DbResult<()> { let cf_meta = self.meta_column(); self.db @@ -118,106 +94,6 @@ impl RocksDBIO { Ok(()) } - // Block - - pub fn put_block(&self, block: Block, l1_lib_header: [u8; 32]) -> DbResult<()> { - let cf_block = self.block_column(); - let cf_hti = self.hash_to_id_column(); - let cf_tti: Arc> = self.tx_hash_to_id_column(); - - // ToDo: rewrite this with write batching - - self.db - .put_cf( - &cf_block, - borsh::to_vec(&block.header.block_id).map_err(|err| { - DbError::borsh_cast_message( - err, - Some("Failed to serialize block id".to_string()), - ) - })?, - borsh::to_vec(&block).map_err(|err| { - DbError::borsh_cast_message( - err, - Some("Failed to serialize block data".to_string()), - ) - })?, - ) - .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; - - let last_curr_block = self.get_meta_last_block_in_db()?; - - if block.header.block_id > last_curr_block { - self.put_meta_last_block_in_db(block.header.block_id)?; - self.put_meta_last_observed_l1_lib_header_in_db(l1_lib_header)?; - } - - self.db - .put_cf( - &cf_hti, - borsh::to_vec(&block.header.hash).map_err(|err| { - DbError::borsh_cast_message( - err, - Some("Failed to serialize block hash".to_string()), - ) - })?, - borsh::to_vec(&block.header.block_id).map_err(|err| { - DbError::borsh_cast_message( - err, - Some("Failed to serialize block id".to_string()), - ) - })?, - ) - .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; - - let mut acc_to_tx_map: HashMap<[u8; 32], Vec<[u8; 32]>> = HashMap::new(); - - for tx in block.body.transactions { - let tx_hash = tx.hash(); - - self.db - .put_cf( - &cf_tti, - borsh::to_vec(&tx_hash).map_err(|err| { - DbError::borsh_cast_message( - err, - Some("Failed to serialize tx hash".to_string()), - ) - })?, - borsh::to_vec(&block.header.block_id).map_err(|err| { - DbError::borsh_cast_message( - err, - Some("Failed to serialize block id".to_string()), - ) - })?, - ) - .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; - - let acc_ids = tx - .affected_public_account_ids() - .into_iter() - .map(|account_id| account_id.into_value()) - .collect::>(); - - for acc_id in acc_ids { - acc_to_tx_map - .entry(acc_id) - .and_modify(|tx_hashes| tx_hashes.push(tx_hash.into())) - .or_insert(vec![tx_hash.into()]); - } - } - - for (acc_id, tx_hashes) in acc_to_tx_map { - self.put_account_transactions(acc_id, tx_hashes)?; - } - - if block.header.block_id.is_multiple_of(BREAKPOINT_INTERVAL) { - self.put_next_breakpoint()?; - } - - Ok(()) - } - // State pub fn put_breakpoint(&self, br_id: u64, breakpoint: V02State) -> DbResult<()> {