diff --git a/storage/src/indexer/mod.rs b/storage/src/indexer/mod.rs index c445e6bb..f07980a9 100644 --- a/storage/src/indexer/mod.rs +++ b/storage/src/indexer/mod.rs @@ -99,6 +99,7 @@ impl RocksDBIO { } else if let Some((block, initial_state)) = start_data { let block_id = block.header.block_id; dbio.put_meta_last_block_in_db(block_id)?; + dbio.put_meta_last_observed_l1_lib_header_in_db(block.bedrock_parent_id)?; dbio.put_meta_first_block_in_db(block)?; dbio.put_meta_is_first_block_set()?; @@ -179,9 +180,7 @@ impl RocksDBIO { self.get_meta_first_block_in_db()? }; - for id in start..=block_id { - let block = self.get_block(id)?; - + for block in self.get_block_batch_seq((start + 1)..=block_id)? { for transaction in block.body.transactions { transaction .transaction_stateless_check() @@ -347,7 +346,7 @@ mod tests { let dbio = RocksDBIO::open_or_create(temdir_path, Some((genesis_block(), initial_state()))) .unwrap(); - for i in 1..BREAKPOINT_INTERVAL { + for i in 1..(BREAKPOINT_INTERVAL + 1) { let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -355,7 +354,7 @@ mod tests { let transfer_tx = transfer(1, (i - 1) as u128, true); let block = common::test_utils::produce_dummy_block(i + 1, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block, [i as u8; 32]).unwrap(); + dbio.put_block_batch(block, [i as u8; 32]).unwrap(); } let last_id = dbio.get_meta_last_block_in_db().unwrap(); @@ -367,7 +366,7 @@ mod tests { let breakpoint = dbio.get_breakpoint(1).unwrap(); let final_state = dbio.final_state().unwrap(); - assert_eq!(last_id, 100); + assert_eq!(last_id, 101); assert_eq!(first_id, 1); assert!(is_first_set); assert_eq!(last_br_id, 1); @@ -375,20 +374,22 @@ mod tests { assert_eq!( prev_breakpoint.get_account_by_id(acc1()).balance - final_state.get_account_by_id(acc1()).balance, - 99 + 100 ); assert_eq!( final_state.get_account_by_id(acc2()).balance - prev_breakpoint.get_account_by_id(acc2()).balance, - 99 + 100 ); assert_eq!( - breakpoint.get_account_by_id(acc1()), - final_state.get_account_by_id(acc1()) + breakpoint.get_account_by_id(acc1()).balance + - final_state.get_account_by_id(acc1()).balance, + 1 ); assert_eq!( - breakpoint.get_account_by_id(acc2()), - final_state.get_account_by_id(acc2()) + final_state.get_account_by_id(acc2()).balance + - breakpoint.get_account_by_id(acc2()).balance, + 1 ); } @@ -531,6 +532,14 @@ mod tests { .collect(); assert_eq!(block_hashes_mem_limited, block_hashes_db_limited.as_slice()); + + let block_batch_seq = dbio.get_block_batch_seq(1..=5).unwrap(); + let block_batch_ids = block_batch_seq + .into_iter() + .map(|block| block.header.block_id) + .collect::>(); + + assert_eq!(block_batch_ids, vec![1, 2, 3, 4, 5]); } #[test] @@ -538,20 +547,25 @@ mod tests { let temp_dir = tempdir().unwrap(); let temdir_path = temp_dir.path(); - let mut tx_hash_res = vec![]; - let dbio = RocksDBIO::open_or_create(temdir_path, Some((genesis_block(), initial_state()))) .unwrap(); + let mut tx_hash_res = vec![]; + let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); let prev_hash = last_block.header.hash; - let transfer_tx = transfer(1, 0, true); + let transfer_tx1 = transfer(1, 0, true); + let transfer_tx2 = transfer(1, 1, true); + tx_hash_res.push(transfer_tx1.hash().0); + tx_hash_res.push(transfer_tx2.hash().0); - tx_hash_res.push(transfer_tx.hash().0); - - let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]); + let block = common::test_utils::produce_dummy_block( + 2, + Some(prev_hash), + vec![transfer_tx1, transfer_tx2], + ); dbio.put_block(block, [1; 32]).unwrap(); @@ -559,11 +573,16 @@ mod tests { let last_block = dbio.get_block(last_id).unwrap(); let prev_hash = last_block.header.hash; - let transfer_tx = transfer(1, 1, true); + let transfer_tx1 = transfer(1, 2, true); + let transfer_tx2 = transfer(1, 3, true); + tx_hash_res.push(transfer_tx1.hash().0); + tx_hash_res.push(transfer_tx2.hash().0); - tx_hash_res.push(transfer_tx.hash().0); - - let block = common::test_utils::produce_dummy_block(3, Some(prev_hash), vec![transfer_tx]); + let block = common::test_utils::produce_dummy_block( + 3, + Some(prev_hash), + vec![transfer_tx1, transfer_tx2], + ); dbio.put_block(block, [2; 32]).unwrap(); @@ -571,11 +590,16 @@ mod tests { let last_block = dbio.get_block(last_id).unwrap(); let prev_hash = last_block.header.hash; - let transfer_tx = transfer(1, 2, true); + let transfer_tx1 = transfer(1, 4, true); + let transfer_tx2 = transfer(1, 5, true); + tx_hash_res.push(transfer_tx1.hash().0); + tx_hash_res.push(transfer_tx2.hash().0); - tx_hash_res.push(transfer_tx.hash().0); - - let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]); + let block = common::test_utils::produce_dummy_block( + 4, + Some(prev_hash), + vec![transfer_tx1, transfer_tx2], + ); dbio.put_block(block, [3; 32]).unwrap(); @@ -583,15 +607,14 @@ mod tests { let last_block = dbio.get_block(last_id).unwrap(); let prev_hash = last_block.header.hash; - let transfer_tx = transfer(1, 3, true); - + let transfer_tx = transfer(1, 6, true); tx_hash_res.push(transfer_tx.hash().0); let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]); dbio.put_block(block, [4; 32]).unwrap(); - let acc1_tx = dbio.get_acc_transactions(*acc1().value(), 0, 4).unwrap(); + let acc1_tx = dbio.get_acc_transactions(*acc1().value(), 0, 7).unwrap(); let acc1_tx_hashes: Vec<[u8; 32]> = acc1_tx.into_iter().map(|tx| tx.hash().0).collect(); assert_eq!(acc1_tx_hashes, tx_hash_res); @@ -600,6 +623,6 @@ mod tests { let acc1_tx_limited_hashes: Vec<[u8; 32]> = acc1_tx_limited.into_iter().map(|tx| tx.hash().0).collect(); - assert_eq!(acc1_tx_limited_hashes.as_slice(), &tx_hash_res[1..]) + assert_eq!(acc1_tx_limited_hashes.as_slice(), &tx_hash_res[1..5]) } } diff --git a/storage/src/indexer/read_multi_get.rs b/storage/src/indexer/read_multi_get.rs index 6bfb28cc..981bc18e 100644 --- a/storage/src/indexer/read_multi_get.rs +++ b/storage/src/indexer/read_multi_get.rs @@ -1,151 +1,6 @@ use super::*; -#[derive(Debug, Clone)] -pub struct DBMetadata { - pub first_block_in_db: u64, - pub last_block_in_db: u64, - pub last_observed_l1_lib_header_in_db: [u8; 32], - pub is_first_block_set: bool, - pub last_breakpoint_id: u64, -} - impl RocksDBIO { - pub fn meta_keys_list() -> DbResult>> { - let mut keys = vec![]; - - keys.push( - borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| { - DbError::borsh_cast_message( - err, - Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_string()), - ) - })?, - ); - keys.push(borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| { - DbError::borsh_cast_message( - err, - Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()), - ) - })?); - keys.push( - borsh::to_vec(&DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY).map_err(|err| { - DbError::borsh_cast_message( - err, - Some( - "Failed to serialize DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY" - .to_string(), - ), - ) - })?, - ); - keys.push(borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| { - DbError::borsh_cast_message( - err, - Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_string()), - ) - })?); - keys.push(borsh::to_vec(&DB_META_LAST_BREAKPOINT_ID).map_err(|err| { - DbError::borsh_cast_message( - err, - Some("Failed to serialize DB_META_LAST_BREAKPOINT_ID".to_string()), - ) - })?); - - Ok(keys) - } - - pub fn read_meta_all(&self) -> DbResult> { - let cf_meta = self.meta_column(); - - let multi_get_res = self.db.multi_get_cf( - RocksDBIO::meta_keys_list()? - .into_iter() - .map(|key| (&cf_meta, key)), - ); - - let Some(first_block_in_db_raw) = - multi_get_res[0] - .as_ref() - .map_err(|err| DbError::RocksDbError { - error: err.clone(), - additional_info: Some("Failed to read first_block_in_db".to_string()), - })? - else { - return Ok(None); - }; - let Some(last_block_in_db_raw) = - multi_get_res[1] - .as_ref() - .map_err(|err| DbError::RocksDbError { - error: err.clone(), - additional_info: Some("Failed to read last_block_in_db".to_string()), - })? - else { - return Ok(None); - }; - let Some(last_observed_l1_lib_header_in_db_raw) = - multi_get_res[2] - .as_ref() - .map_err(|err| DbError::RocksDbError { - error: err.clone(), - additional_info: Some( - "Failed to read last_observed_l1_lib_header_in_db".to_string(), - ), - })? - else { - return Ok(None); - }; - let is_first_block_set = multi_get_res[3] - .as_ref() - .map_err(|err| DbError::RocksDbError { - error: err.clone(), - additional_info: Some("Failed to read is_first_block_set".to_string()), - })? - .is_some(); - let Some(last_breakpoint_id_raw) = - multi_get_res[4] - .as_ref() - .clone() - .map_err(|err| DbError::RocksDbError { - error: err.clone(), - additional_info: Some("Failed to read last_breakpoint_id".to_string()), - })? - else { - return Ok(None); - }; - - let first_block_in_db = borsh::from_slice::(first_block_in_db_raw).map_err(|err| { - DbError::borsh_cast_message(err, Some("Failed to deserialize first block".to_string())) - })?; - let last_block_in_db = borsh::from_slice::(last_block_in_db_raw).map_err(|err| { - DbError::borsh_cast_message(err, Some("Failed to deserialize last block".to_string())) - })?; - let last_observed_l1_lib_header_in_db = borsh::from_slice::<[u8; 32]>( - last_observed_l1_lib_header_in_db_raw, - ) - .map_err(|err| { - DbError::borsh_cast_message( - err, - Some("Failed to deserialize last l1 lib header".to_string()), - ) - })?; - let last_breakpoint_id = - borsh::from_slice::(last_breakpoint_id_raw).map_err(|err| { - DbError::borsh_cast_message( - err, - Some("Failed to deserialize last breakpoint id".to_string()), - ) - })?; - - Ok(Some(DBMetadata { - first_block_in_db, - last_block_in_db, - last_observed_l1_lib_header_in_db, - is_first_block_set, - last_breakpoint_id, - })) - } - pub fn get_block_batch(&self, before: Option, limit: u64) -> DbResult> { let mut seq = vec![]; @@ -173,7 +28,7 @@ impl RocksDBIO { /// Currently assumes non-decreasing sequence /// /// ToDo: Add suport of arbitrary sequences - fn get_block_batch_seq(&self, seq: impl Iterator) -> DbResult> { + pub fn get_block_batch_seq(&self, seq: impl Iterator) -> DbResult> { let cf_block = self.block_column(); // Keys setup diff --git a/storage/src/indexer/write_batch.rs b/storage/src/indexer/write_batch.rs index 9febd605..b98e539a 100644 --- a/storage/src/indexer/write_batch.rs +++ b/storage/src/indexer/write_batch.rs @@ -308,57 +308,14 @@ impl RocksDBIO { self.put_account_transactions_dependant(acc_id, tx_hashes, &mut write_batch)?; } - if block.header.block_id.is_multiple_of(BREAKPOINT_INTERVAL) { - self.put_next_breakpoint_batch(&mut write_batch)?; - } - self.db.write(write_batch).map_err(|rerr| { DbError::rocksdb_cast_message(rerr, Some("Failed to write batch".to_string())) - }) - } + })?; - // State + if block.header.block_id.is_multiple_of(BREAKPOINT_INTERVAL) { + self.put_next_breakpoint()?; + } - pub fn put_breakpoint_batch( - &self, - br_id: u64, - breakpoint: V02State, - write_batch: &mut WriteBatch, - ) -> DbResult<()> { - let cf_br = self.breakpoint_column(); - - write_batch.put_cf( - &cf_br, - borsh::to_vec(&br_id).map_err(|err| { - DbError::borsh_cast_message( - err, - Some("Failed to serialize breakpoint id".to_string()), - ) - })?, - borsh::to_vec(&breakpoint).map_err(|err| { - DbError::borsh_cast_message( - err, - Some("Failed to serialize breakpoint data".to_string()), - ) - })?, - ); Ok(()) } - - pub fn put_next_breakpoint_batch(&self, write_batch: &mut WriteBatch) -> DbResult<()> { - let last_block = self.get_meta_last_block_in_db()?; - let next_breakpoint_id = self.get_meta_last_breakpoint_id()? + 1; - let block_to_break_id = next_breakpoint_id * BREAKPOINT_INTERVAL; - - if block_to_break_id <= last_block { - let next_breakpoint = self.calculate_state_for_id(block_to_break_id)?; - - self.put_breakpoint_batch(next_breakpoint_id, next_breakpoint, write_batch)?; - self.put_meta_last_breakpoint_id_batch(next_breakpoint_id, write_batch) - } else { - Err(DbError::db_interaction_error( - "Breakpoint not yet achieved".to_string(), - )) - } - } }