feat: batch writes

This commit is contained in:
Pravdyvy 2026-03-11 18:37:54 +02:00
parent 53b26064eb
commit b57c2ac33d
4 changed files with 305 additions and 19 deletions

View File

@ -10,7 +10,7 @@ pub struct DBMetadata {
}
impl RocksDBIO {
fn meta_keys_list() -> DbResult<Vec<Vec<u8>>> {
pub fn meta_keys_list() -> DbResult<Vec<Vec<u8>>> {
let mut keys = vec![];
keys.push(
@ -54,7 +54,7 @@ impl RocksDBIO {
Ok(keys)
}
fn read_meta_all(&self) -> DbResult<Option<DBMetadata>> {
pub fn read_meta_all(&self) -> DbResult<Option<DBMetadata>> {
let cf_meta = self.meta_column();
let multi_get_res = self.db.multi_get_cf(

View File

@ -201,23 +201,6 @@ impl RocksDBIO {
}
}
pub fn put_next_breakpoint(&self) -> DbResult<()> {
let last_block = self.get_meta_last_block_in_db()?;
let next_breakpoint_id = self.get_meta_last_breakpoint_id()? + 1;
let block_to_break_id = next_breakpoint_id * BREAKPOINT_INTERVAL;
if block_to_break_id <= last_block {
let next_breakpoint = self.calculate_state_for_id(block_to_break_id)?;
self.put_breakpoint(next_breakpoint_id, next_breakpoint)?;
self.put_meta_last_breakpoint_id(next_breakpoint_id)
} else {
Err(DbError::db_interaction_error(
"Breakpoint not yet achieved".to_string(),
))
}
}
// Mappings
pub fn get_block_id_by_hash(&self, hash: [u8; 32]) -> DbResult<u64> {

View File

@ -75,4 +75,290 @@ impl RocksDBIO {
DbError::rocksdb_cast_message(rerr, Some("Failed to write batch".to_string()))
})
}
pub fn put_account_transactions_dependant(
&self,
acc_id: [u8; 32],
tx_hashes: Vec<[u8; 32]>,
write_batch: &mut WriteBatch,
) -> DbResult<()> {
let acc_num_tx = self.get_acc_meta_num_tx(acc_id)?.unwrap_or(0);
let cf_att = self.account_id_to_tx_hash_column();
for (tx_id, tx_hash) in tx_hashes.iter().enumerate() {
let put_id = acc_num_tx + tx_id as u64;
let mut prefix = borsh::to_vec(&acc_id).map_err(|berr| {
DbError::borsh_cast_message(
berr,
Some("Failed to serialize account id".to_string()),
)
})?;
let suffix = borsh::to_vec(&put_id).map_err(|berr| {
DbError::borsh_cast_message(berr, Some("Failed to serialize tx id".to_string()))
})?;
prefix.extend_from_slice(&suffix);
write_batch.put_cf(
&cf_att,
prefix,
borsh::to_vec(tx_hash).map_err(|berr| {
DbError::borsh_cast_message(
berr,
Some("Failed to serialize tx hash".to_string()),
)
})?,
);
}
self.update_acc_meta_batch(acc_id, acc_num_tx + (tx_hashes.len() as u64), write_batch)?;
Ok(())
}
// Meta
pub fn put_meta_first_block_in_db_batch(&self, block: Block) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
borsh::to_vec(&block.header.block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize first block id".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
self.put_block_batch(block, [0; 32])?;
Ok(())
}
pub fn put_meta_last_block_in_db_batch(
&self,
block_id: u64,
write_batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_meta = self.meta_column();
write_batch.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize last block id".to_string()),
)
})?,
);
Ok(())
}
pub fn put_meta_last_observed_l1_lib_header_in_db_batch(
&self,
l1_lib_header: [u8; 32],
write_batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_meta = self.meta_column();
write_batch.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some(
"Failed to serialize DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY"
.to_string(),
),
)
})?,
borsh::to_vec(&l1_lib_header).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize last l1 block header".to_string()),
)
})?,
);
Ok(())
}
pub fn put_meta_last_breakpoint_id_batch(
&self,
br_id: u64,
write_batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_meta = self.meta_column();
write_batch.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_BREAKPOINT_ID).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_BREAKPOINT_ID".to_string()),
)
})?,
borsh::to_vec(&br_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize last block id".to_string()),
)
})?,
);
Ok(())
}
pub fn put_meta_is_first_block_set_batch(&self, write_batch: &mut WriteBatch) -> DbResult<()> {
let cf_meta = self.meta_column();
write_batch.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_string()),
)
})?,
[1u8; 1],
);
Ok(())
}
// Block
pub fn put_block_batch(&self, block: Block, l1_lib_header: [u8; 32]) -> DbResult<()> {
let cf_block = self.block_column();
let cf_hti = self.hash_to_id_column();
let cf_tti: Arc<BoundColumnFamily<'_>> = self.tx_hash_to_id_column();
let last_curr_block = self.get_meta_last_block_in_db()?;
let mut write_batch = WriteBatch::default();
// ToDo: rewrite this with write batching
write_batch.put_cf(
&cf_block,
borsh::to_vec(&block.header.block_id).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string()))
})?,
borsh::to_vec(&block).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize block data".to_string()))
})?,
);
if block.header.block_id > last_curr_block {
self.put_meta_last_block_in_db_batch(block.header.block_id, &mut write_batch)?;
self.put_meta_last_observed_l1_lib_header_in_db_batch(l1_lib_header, &mut write_batch)?;
}
write_batch.put_cf(
&cf_hti,
borsh::to_vec(&block.header.hash).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize block hash".to_string()))
})?,
borsh::to_vec(&block.header.block_id).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string()))
})?,
);
let mut acc_to_tx_map: HashMap<[u8; 32], Vec<[u8; 32]>> = HashMap::new();
for tx in block.body.transactions {
let tx_hash = tx.hash();
write_batch.put_cf(
&cf_tti,
borsh::to_vec(&tx_hash).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize tx hash".to_string()),
)
})?,
borsh::to_vec(&block.header.block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block id".to_string()),
)
})?,
);
let acc_ids = tx
.affected_public_account_ids()
.into_iter()
.map(|account_id| account_id.into_value())
.collect::<Vec<_>>();
for acc_id in acc_ids {
acc_to_tx_map
.entry(acc_id)
.and_modify(|tx_hashes| tx_hashes.push(tx_hash.into()))
.or_insert(vec![tx_hash.into()]);
}
}
for (acc_id, tx_hashes) in acc_to_tx_map {
self.put_account_transactions_dependant(acc_id, tx_hashes, &mut write_batch)?;
}
if block.header.block_id.is_multiple_of(BREAKPOINT_INTERVAL) {
self.put_next_breakpoint_batch(&mut write_batch)?;
}
self.db.write(write_batch).map_err(|rerr| {
DbError::rocksdb_cast_message(rerr, Some("Failed to write batch".to_string()))
})
}
// State
pub fn put_breakpoint_batch(
&self,
br_id: u64,
breakpoint: V02State,
write_batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_br = self.breakpoint_column();
write_batch.put_cf(
&cf_br,
borsh::to_vec(&br_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize breakpoint id".to_string()),
)
})?,
borsh::to_vec(&breakpoint).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize breakpoint data".to_string()),
)
})?,
);
Ok(())
}
pub fn put_next_breakpoint_batch(&self, write_batch: &mut WriteBatch) -> DbResult<()> {
let last_block = self.get_meta_last_block_in_db()?;
let next_breakpoint_id = self.get_meta_last_breakpoint_id()? + 1;
let block_to_break_id = next_breakpoint_id * BREAKPOINT_INTERVAL;
if block_to_break_id <= last_block {
let next_breakpoint = self.calculate_state_for_id(block_to_break_id)?;
self.put_breakpoint_batch(next_breakpoint_id, next_breakpoint, write_batch)?;
self.put_meta_last_breakpoint_id_batch(next_breakpoint_id, write_batch)
} else {
Err(DbError::db_interaction_error(
"Breakpoint not yet achieved".to_string(),
))
}
}
}

View File

@ -241,4 +241,21 @@ impl RocksDBIO {
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))
}
pub fn put_next_breakpoint(&self) -> DbResult<()> {
let last_block = self.get_meta_last_block_in_db()?;
let next_breakpoint_id = self.get_meta_last_breakpoint_id()? + 1;
let block_to_break_id = next_breakpoint_id * BREAKPOINT_INTERVAL;
if block_to_break_id <= last_block {
let next_breakpoint = self.calculate_state_for_id(block_to_break_id)?;
self.put_breakpoint(next_breakpoint_id, next_breakpoint)?;
self.put_meta_last_breakpoint_id(next_breakpoint_id)
} else {
Err(DbError::db_interaction_error(
"Breakpoint not yet achieved".to_string(),
))
}
}
}