use std::{ops::Div, path::Path, sync::Arc}; use common::{block::Block, transaction::{NSSATransaction, execute_check_transaction_on_state}}; use nssa::V02State; use rocksdb::{ BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, }; use crate::error::DbError; /// Maximal size of stored blocks in base /// /// Used to control db size /// /// Currently effectively unbounded. pub const BUFF_SIZE_ROCKSDB: usize = usize::MAX; /// Size of stored blocks cache in memory /// /// Keeping small to not run out of memory pub const CACHE_SIZE: usize = 1000; /// Key base for storing metainformation about id of first block in db pub const DB_META_FIRST_BLOCK_IN_DB_KEY: &str = "first_block_in_db"; /// Key base for storing metainformation about id of last current block in db pub const DB_META_LAST_BLOCK_IN_DB_KEY: &str = "last_block_in_db"; /// Key base for storing metainformation which describe if first block has been set pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set"; /// Key base for storing metainformation about the last finalized block on Bedrock pub const DB_META_LAST_FINALIZED_BLOCK_ID: &str = "last_finalized_block_id"; /// Key base for storing metainformation about the last breakpoint pub const DB_META_LAST_BREAKPOINT_ID: &str = "last_breakpoint_id"; /// Interval between state breakpoints pub const BREAKPOINT_INTERVAL: u64 = 100; /// Name of block column family pub const CF_BLOCK_NAME: &str = "cf_block"; /// Name of meta column family pub const CF_META_NAME: &str = "cf_meta"; /// Name of breakpoint column family pub const CF_BREAKPOINT_NAME: &str = "cf_breakpoint"; pub type DbResult = Result; fn closest_breakpoint_id(block_id: u64) -> u64 { block_id.div(BREAKPOINT_INTERVAL) } pub struct RocksDBIO { pub db: DBWithThreadMode, } impl RocksDBIO { pub fn open_or_create(path: &Path, start_block: Option, initial_state: V02State) -> DbResult { let mut cf_opts = Options::default(); cf_opts.set_max_write_buffer_number(16); // ToDo: Add more column families for different data let cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone()); let cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone()); let cfbreakpoint = ColumnFamilyDescriptor::new(CF_BREAKPOINT_NAME, cf_opts.clone()); let mut db_opts = Options::default(); db_opts.create_missing_column_families(true); db_opts.create_if_missing(true); let db = DBWithThreadMode::::open_cf_descriptors( &db_opts, path, vec![cfb, cfmeta, cfbreakpoint], ); let dbio = Self { // There is no point in handling this from runner code db: db.unwrap(), }; let is_start_set = dbio.get_meta_is_first_block_set()?; if is_start_set { Ok(dbio) } else if let Some(block) = start_block { let block_id = block.header.block_id; dbio.put_meta_first_block_in_db(block)?; dbio.put_meta_is_first_block_set()?; dbio.put_meta_last_block_in_db(block_id)?; dbio.put_meta_last_finalized_block_id(None)?; // First breakpoint setup dbio.put_breakpoint(0, initial_state)?; dbio.put_meta_last_breakpoint_id(0)?; Ok(dbio) } else { // Here we are trying to start a DB without a block, one should not do it. unreachable!() } } pub fn destroy(path: &Path) -> DbResult<()> { let mut cf_opts = Options::default(); cf_opts.set_max_write_buffer_number(16); // ToDo: Add more column families for different data let _cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone()); let _cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone()); let _cfsnapshot = ColumnFamilyDescriptor::new(CF_BREAKPOINT_NAME, cf_opts.clone()); let mut db_opts = Options::default(); db_opts.create_missing_column_families(true); db_opts.create_if_missing(true); DBWithThreadMode::::destroy(&db_opts, path) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None)) } pub fn meta_column(&self) -> Arc> { self.db.cf_handle(CF_META_NAME).unwrap() } pub fn block_column(&self) -> Arc> { self.db.cf_handle(CF_BLOCK_NAME).unwrap() } pub fn breakpoint_column(&self) -> Arc> { self.db.cf_handle(CF_BREAKPOINT_NAME).unwrap() } pub fn get_meta_first_block_in_db(&self) -> DbResult { let cf_meta = self.meta_column(); let res = self .db .get_cf( &cf_meta, borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_string()), ) })?, ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; if let Some(data) = res { Ok(borsh::from_slice::(&data).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to deserialize first block".to_string()), ) })?) } else { Err(DbError::db_interaction_error( "First block not found".to_string(), )) } } pub fn get_meta_last_block_in_db(&self) -> DbResult { let cf_meta = self.meta_column(); let res = self .db .get_cf( &cf_meta, borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()), ) })?, ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; if let Some(data) = res { Ok(borsh::from_slice::(&data).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to deserialize last block".to_string()), ) })?) } else { Err(DbError::db_interaction_error( "Last block not found".to_string(), )) } } pub fn get_meta_is_first_block_set(&self) -> DbResult { let cf_meta = self.meta_column(); let res = self .db .get_cf( &cf_meta, borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_string()), ) })?, ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; Ok(res.is_some()) } pub fn get_meta_last_breakpoint_id(&self) -> DbResult { let cf_meta = self.meta_column(); let res = self.db .get_cf( &cf_meta, borsh::to_vec(&DB_META_LAST_BREAKPOINT_ID).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize DB_META_LAST_BREAKPOINT_ID".to_string()), ) })?, ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; if let Some(data) = res { Ok(borsh::from_slice::(&data).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to deserialize last breakpoint id".to_string()), ) })?) } else { Err(DbError::db_interaction_error( "Last breakpoint id not found".to_string(), )) } } pub fn put_meta_first_block_in_db(&self, block: Block) -> DbResult<()> { let cf_meta = self.meta_column(); self.db .put_cf( &cf_meta, borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_string()), ) })?, borsh::to_vec(&block.header.block_id).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize first block id".to_string()), ) })?, ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; self.put_block(block, true)?; Ok(()) } pub fn put_meta_last_block_in_db(&self, block_id: u64) -> DbResult<()> { let cf_meta = self.meta_column(); self.db .put_cf( &cf_meta, borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()), ) })?, borsh::to_vec(&block_id).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize last block id".to_string()), ) })?, ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; Ok(()) } pub fn put_meta_last_finalized_block_id(&self, block_id: Option) -> DbResult<()> { let cf_meta = self.meta_column(); self.db .put_cf( &cf_meta, borsh::to_vec(&DB_META_LAST_FINALIZED_BLOCK_ID).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize DB_META_LAST_FINALIZED_BLOCK_ID".to_string()), ) })?, borsh::to_vec(&block_id).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize last block id".to_string()), ) })?, ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; Ok(()) } pub fn put_meta_last_breakpoint_id(&self, br_id: u64) -> DbResult<()> { let cf_meta = self.meta_column(); self.db .put_cf( &cf_meta, borsh::to_vec(&DB_META_LAST_BREAKPOINT_ID).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize DB_META_LAST_BREAKPOINT_ID".to_string()), ) })?, borsh::to_vec(&br_id).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize last block id".to_string()), ) })?, ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; Ok(()) } pub fn put_meta_is_first_block_set(&self) -> DbResult<()> { let cf_meta = self.meta_column(); self.db .put_cf( &cf_meta, borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_string()), ) })?, [1u8; 1], ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; Ok(()) } pub fn put_block(&self, block: Block, first: bool) -> DbResult<()> { let cf_block = self.block_column(); if !first { let last_curr_block = self.get_meta_last_block_in_db()?; if block.header.block_id > last_curr_block { self.put_meta_last_block_in_db(block.header.block_id)?; } } self.db .put_cf( &cf_block, borsh::to_vec(&block.header.block_id).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize block id".to_string()), ) })?, borsh::to_vec(&block).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize block data".to_string()), ) })?, ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; if block.header.block_id.is_multiple_of(BREAKPOINT_INTERVAL) { self.put_next_breakpoint()?; } Ok(()) } pub fn get_block(&self, block_id: u64) -> DbResult { let cf_block = self.block_column(); let res = self .db .get_cf( &cf_block, borsh::to_vec(&block_id).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize block id".to_string()), ) })?, ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; if let Some(data) = res { Ok(borsh::from_slice::(&data).map_err(|serr| { DbError::borsh_cast_message( serr, Some("Failed to deserialize block data".to_string()), ) })?) } else { Err(DbError::db_interaction_error( "Block on this id not found".to_string(), )) } } pub fn put_breakpoint(&self, br_id: u64, breakpoint: V02State) -> DbResult<()> { let cf_br = self.breakpoint_column(); self.db .put_cf( &cf_br, borsh::to_vec(&br_id).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize breakpoint id".to_string()), ) })?, borsh::to_vec(&breakpoint).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize breakpoint data".to_string()), ) })?, ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None)) } pub fn get_breakpoint(&self, br_id: u64) -> DbResult { let cf_br = self.breakpoint_column(); let res = self .db .get_cf( &cf_br, borsh::to_vec(&br_id).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize breakpoint id".to_string()), ) })?, ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; if let Some(data) = res { Ok(borsh::from_slice::(&data).map_err(|serr| { DbError::borsh_cast_message( serr, Some("Failed to deserialize breakpoint data".to_string()), ) })?) } else { Err(DbError::db_interaction_error( "Breakpoint on this id not found".to_string(), )) } } pub fn calculate_state_for_id(&self, block_id: u64) -> DbResult { let last_block = self.get_meta_last_block_in_db()?; if last_block <= block_id { let br_id = closest_breakpoint_id(block_id); let mut breakpoint = self.get_breakpoint(br_id)?; for id in (BREAKPOINT_INTERVAL*br_id)..=block_id { let block = self.get_block(id)?; for encoded_transaction in block.body.transactions { let transaction = NSSATransaction::try_from(&encoded_transaction).unwrap(); execute_check_transaction_on_state(&mut breakpoint, transaction).unwrap(); } } Ok(breakpoint) } else { Err(DbError::db_interaction_error( "Block on this id not found".to_string(), )) } } pub fn put_next_breakpoint(&self) -> DbResult<()> { let last_block = self.get_meta_last_block_in_db()?; let breakpoint_id = self.get_meta_last_breakpoint_id()?; let block_to_break_id = breakpoint_id * BREAKPOINT_INTERVAL; if last_block <= block_to_break_id { let next_breakpoint = self.calculate_state_for_id(block_to_break_id)?; self.put_breakpoint(breakpoint_id, next_breakpoint)?; self.put_meta_last_breakpoint_id(breakpoint_id) } else { Err(DbError::db_interaction_error( "Breakpoint not yet achieved".to_string(), )) } } }