diff --git a/storage/src/indexer/read_multi_get.rs b/storage/src/indexer/read_multi_get.rs index 9fd68382..6bfb28cc 100644 --- a/storage/src/indexer/read_multi_get.rs +++ b/storage/src/indexer/read_multi_get.rs @@ -10,7 +10,7 @@ pub struct DBMetadata { } impl RocksDBIO { - fn meta_keys_list() -> DbResult>> { + pub fn meta_keys_list() -> DbResult>> { let mut keys = vec![]; keys.push( @@ -54,7 +54,7 @@ impl RocksDBIO { Ok(keys) } - fn read_meta_all(&self) -> DbResult> { + pub fn read_meta_all(&self) -> DbResult> { let cf_meta = self.meta_column(); let multi_get_res = self.db.multi_get_cf( diff --git a/storage/src/indexer/read_once.rs b/storage/src/indexer/read_once.rs index e544d277..8fe035fd 100644 --- a/storage/src/indexer/read_once.rs +++ b/storage/src/indexer/read_once.rs @@ -201,23 +201,6 @@ impl RocksDBIO { } } - pub fn put_next_breakpoint(&self) -> DbResult<()> { - let last_block = self.get_meta_last_block_in_db()?; - let next_breakpoint_id = self.get_meta_last_breakpoint_id()? + 1; - let block_to_break_id = next_breakpoint_id * BREAKPOINT_INTERVAL; - - if block_to_break_id <= last_block { - let next_breakpoint = self.calculate_state_for_id(block_to_break_id)?; - - self.put_breakpoint(next_breakpoint_id, next_breakpoint)?; - self.put_meta_last_breakpoint_id(next_breakpoint_id) - } else { - Err(DbError::db_interaction_error( - "Breakpoint not yet achieved".to_string(), - )) - } - } - // Mappings pub fn get_block_id_by_hash(&self, hash: [u8; 32]) -> DbResult { diff --git a/storage/src/indexer/write_batch.rs b/storage/src/indexer/write_batch.rs index 254e7dc9..9febd605 100644 --- a/storage/src/indexer/write_batch.rs +++ b/storage/src/indexer/write_batch.rs @@ -75,4 +75,290 @@ impl RocksDBIO { DbError::rocksdb_cast_message(rerr, Some("Failed to write batch".to_string())) }) } + + pub fn put_account_transactions_dependant( + &self, + acc_id: [u8; 32], + tx_hashes: Vec<[u8; 32]>, + write_batch: &mut WriteBatch, + ) -> DbResult<()> { + let acc_num_tx = self.get_acc_meta_num_tx(acc_id)?.unwrap_or(0); + let cf_att = self.account_id_to_tx_hash_column(); + + for (tx_id, tx_hash) in tx_hashes.iter().enumerate() { + let put_id = acc_num_tx + tx_id as u64; + + let mut prefix = borsh::to_vec(&acc_id).map_err(|berr| { + DbError::borsh_cast_message( + berr, + Some("Failed to serialize account id".to_string()), + ) + })?; + let suffix = borsh::to_vec(&put_id).map_err(|berr| { + DbError::borsh_cast_message(berr, Some("Failed to serialize tx id".to_string())) + })?; + + prefix.extend_from_slice(&suffix); + + write_batch.put_cf( + &cf_att, + prefix, + borsh::to_vec(tx_hash).map_err(|berr| { + DbError::borsh_cast_message( + berr, + Some("Failed to serialize tx hash".to_string()), + ) + })?, + ); + } + + self.update_acc_meta_batch(acc_id, acc_num_tx + (tx_hashes.len() as u64), write_batch)?; + + Ok(()) + } + + // Meta + + pub fn put_meta_first_block_in_db_batch(&self, block: Block) -> DbResult<()> { + let cf_meta = self.meta_column(); + self.db + .put_cf( + &cf_meta, + borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_string()), + ) + })?, + borsh::to_vec(&block.header.block_id).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize first block id".to_string()), + ) + })?, + ) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + + self.put_block_batch(block, [0; 32])?; + Ok(()) + } + + pub fn put_meta_last_block_in_db_batch( + &self, + block_id: u64, + write_batch: &mut WriteBatch, + ) -> DbResult<()> { + let cf_meta = self.meta_column(); + write_batch.put_cf( + &cf_meta, + borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()), + ) + })?, + borsh::to_vec(&block_id).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize last block id".to_string()), + ) + })?, + ); + Ok(()) + } + + pub fn put_meta_last_observed_l1_lib_header_in_db_batch( + &self, + l1_lib_header: [u8; 32], + write_batch: &mut WriteBatch, + ) -> DbResult<()> { + let cf_meta = self.meta_column(); + write_batch.put_cf( + &cf_meta, + borsh::to_vec(&DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY).map_err(|err| { + DbError::borsh_cast_message( + err, + Some( + "Failed to serialize DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY" + .to_string(), + ), + ) + })?, + borsh::to_vec(&l1_lib_header).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize last l1 block header".to_string()), + ) + })?, + ); + Ok(()) + } + + pub fn put_meta_last_breakpoint_id_batch( + &self, + br_id: u64, + write_batch: &mut WriteBatch, + ) -> DbResult<()> { + let cf_meta = self.meta_column(); + write_batch.put_cf( + &cf_meta, + borsh::to_vec(&DB_META_LAST_BREAKPOINT_ID).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize DB_META_LAST_BREAKPOINT_ID".to_string()), + ) + })?, + borsh::to_vec(&br_id).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize last block id".to_string()), + ) + })?, + ); + Ok(()) + } + + pub fn put_meta_is_first_block_set_batch(&self, write_batch: &mut WriteBatch) -> DbResult<()> { + let cf_meta = self.meta_column(); + write_batch.put_cf( + &cf_meta, + borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_string()), + ) + })?, + [1u8; 1], + ); + Ok(()) + } + + // Block + + pub fn put_block_batch(&self, block: Block, l1_lib_header: [u8; 32]) -> DbResult<()> { + let cf_block = self.block_column(); + let cf_hti = self.hash_to_id_column(); + let cf_tti: Arc> = self.tx_hash_to_id_column(); + let last_curr_block = self.get_meta_last_block_in_db()?; + let mut write_batch = WriteBatch::default(); + + // ToDo: rewrite this with write batching + + write_batch.put_cf( + &cf_block, + borsh::to_vec(&block.header.block_id).map_err(|err| { + DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string())) + })?, + borsh::to_vec(&block).map_err(|err| { + DbError::borsh_cast_message(err, Some("Failed to serialize block data".to_string())) + })?, + ); + + if block.header.block_id > last_curr_block { + self.put_meta_last_block_in_db_batch(block.header.block_id, &mut write_batch)?; + self.put_meta_last_observed_l1_lib_header_in_db_batch(l1_lib_header, &mut write_batch)?; + } + + write_batch.put_cf( + &cf_hti, + borsh::to_vec(&block.header.hash).map_err(|err| { + DbError::borsh_cast_message(err, Some("Failed to serialize block hash".to_string())) + })?, + borsh::to_vec(&block.header.block_id).map_err(|err| { + DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string())) + })?, + ); + + let mut acc_to_tx_map: HashMap<[u8; 32], Vec<[u8; 32]>> = HashMap::new(); + + for tx in block.body.transactions { + let tx_hash = tx.hash(); + + write_batch.put_cf( + &cf_tti, + borsh::to_vec(&tx_hash).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize tx hash".to_string()), + ) + })?, + borsh::to_vec(&block.header.block_id).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize block id".to_string()), + ) + })?, + ); + + let acc_ids = tx + .affected_public_account_ids() + .into_iter() + .map(|account_id| account_id.into_value()) + .collect::>(); + + for acc_id in acc_ids { + acc_to_tx_map + .entry(acc_id) + .and_modify(|tx_hashes| tx_hashes.push(tx_hash.into())) + .or_insert(vec![tx_hash.into()]); + } + } + + for (acc_id, tx_hashes) in acc_to_tx_map { + self.put_account_transactions_dependant(acc_id, tx_hashes, &mut write_batch)?; + } + + if block.header.block_id.is_multiple_of(BREAKPOINT_INTERVAL) { + self.put_next_breakpoint_batch(&mut write_batch)?; + } + + self.db.write(write_batch).map_err(|rerr| { + DbError::rocksdb_cast_message(rerr, Some("Failed to write batch".to_string())) + }) + } + + // State + + pub fn put_breakpoint_batch( + &self, + br_id: u64, + breakpoint: V02State, + write_batch: &mut WriteBatch, + ) -> DbResult<()> { + let cf_br = self.breakpoint_column(); + + write_batch.put_cf( + &cf_br, + borsh::to_vec(&br_id).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize breakpoint id".to_string()), + ) + })?, + borsh::to_vec(&breakpoint).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize breakpoint data".to_string()), + ) + })?, + ); + Ok(()) + } + + pub fn put_next_breakpoint_batch(&self, write_batch: &mut WriteBatch) -> DbResult<()> { + let last_block = self.get_meta_last_block_in_db()?; + let next_breakpoint_id = self.get_meta_last_breakpoint_id()? + 1; + let block_to_break_id = next_breakpoint_id * BREAKPOINT_INTERVAL; + + if block_to_break_id <= last_block { + let next_breakpoint = self.calculate_state_for_id(block_to_break_id)?; + + self.put_breakpoint_batch(next_breakpoint_id, next_breakpoint, write_batch)?; + self.put_meta_last_breakpoint_id_batch(next_breakpoint_id, write_batch) + } else { + Err(DbError::db_interaction_error( + "Breakpoint not yet achieved".to_string(), + )) + } + } } diff --git a/storage/src/indexer/write_once.rs b/storage/src/indexer/write_once.rs index b4309c78..bf00fce5 100644 --- a/storage/src/indexer/write_once.rs +++ b/storage/src/indexer/write_once.rs @@ -241,4 +241,21 @@ impl RocksDBIO { ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None)) } + + pub fn put_next_breakpoint(&self) -> DbResult<()> { + let last_block = self.get_meta_last_block_in_db()?; + let next_breakpoint_id = self.get_meta_last_breakpoint_id()? + 1; + let block_to_break_id = next_breakpoint_id * BREAKPOINT_INTERVAL; + + if block_to_break_id <= last_block { + let next_breakpoint = self.calculate_state_for_id(block_to_break_id)?; + + self.put_breakpoint(next_breakpoint_id, next_breakpoint)?; + self.put_meta_last_breakpoint_id(next_breakpoint_id) + } else { + Err(DbError::db_interaction_error( + "Breakpoint not yet achieved".to_string(), + )) + } + } }