diff --git a/Cargo.lock b/Cargo.lock index c7c61f15..359b5cb4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6785,6 +6785,7 @@ dependencies = [ "common", "nssa", "rocksdb", + "tempfile", "thiserror 2.0.17", ] diff --git a/indexer/core/src/block_store.rs b/indexer/core/src/block_store.rs index 220d606c..27f866b2 100644 --- a/indexer/core/src/block_store.rs +++ b/indexer/core/src/block_store.rs @@ -34,6 +34,10 @@ impl IndexerStore { Self::open_db_with_genesis(location, None) } + pub fn get_last_block_id(&self) -> Result { + Ok(self.dbio.get_meta_last_block_in_db()?) + } + pub fn get_block_at_id(&self, id: u64) -> Result { Ok(self.dbio.get_block(id)?) } diff --git a/indexer/service/rpc/src/lib.rs b/indexer/service/rpc/src/lib.rs index 52c5f0fb..eb0656ce 100644 --- a/indexer/service/rpc/src/lib.rs +++ b/indexer/service/rpc/src/lib.rs @@ -26,6 +26,9 @@ pub trait Rpc { #[subscription(name = "subscribeToFinalizedBlocks", item = BlockId)] async fn subscribe_to_finalized_blocks(&self) -> SubscriptionResult; + #[method(name = "getLastFinalizedBlockId")] + async fn get_last_finalized_block_id(&self) -> Result; + #[method(name = "getBlockById")] async fn get_block_by_id(&self, block_id: BlockId) -> Result; @@ -48,4 +51,8 @@ pub trait Rpc { limit: u32, offset: u32, ) -> Result, ErrorObjectOwned>; + + // ToDo: expand healthcheck response into some kind of report + #[method(name = "checkHealth")] + async fn healthcheck(&self) -> Result<(), ErrorObjectOwned>; } diff --git a/indexer/service/src/mock_service.rs b/indexer/service/src/mock_service.rs index e7afda18..e1a69d34 100644 --- a/indexer/service/src/mock_service.rs +++ b/indexer/service/src/mock_service.rs @@ -179,6 +179,15 @@ impl indexer_service_rpc::RpcServer for MockIndexerService { Ok(()) } + async fn get_last_finalized_block_id(&self) -> Result { + self.blocks + .last() + .map(|bl| bl.header.block_id) + .ok_or_else(|| { + ErrorObjectOwned::owned(-32001, format!("Last block not found"), None::<()>) + }) + } + async fn get_block_by_id(&self, block_id: BlockId) -> Result { self.blocks .iter() @@ -267,4 +276,8 @@ impl indexer_service_rpc::RpcServer for MockIndexerService { .map(|(tx, _)| tx.clone()) .collect()) } + + async fn healthcheck(&self) -> Result<(), ErrorObjectOwned> { + Ok(()) + } } diff --git a/indexer/service/src/service.rs b/indexer/service/src/service.rs index 2c51aa3f..0d1d7028 100644 --- a/indexer/service/src/service.rs +++ b/indexer/service/src/service.rs @@ -41,6 +41,12 @@ impl indexer_service_rpc::RpcServer for IndexerService { Ok(()) } + async fn get_last_finalized_block_id(&self) -> Result { + self.indexer.store.get_last_block_id().map_err(|err| { + ErrorObjectOwned::owned(-32001, format!("DBError"), Some(format!("{err:#?}"))) + }) + } + async fn get_block_by_id(&self, block_id: BlockId) -> Result { self.indexer .store @@ -161,6 +167,15 @@ impl indexer_service_rpc::RpcServer for IndexerService { Ok(tx_res) } + + async fn healthcheck(&self) -> Result<(), ErrorObjectOwned> { + // Checking, that indexer can calculate last state + let _ = self.indexer.store.final_state().map_err(|err| { + ErrorObjectOwned::owned(-32001, format!("DBError"), Some(format!("{err:#?}"))) + })?; + + Ok(()) + } } struct SubscriptionService { diff --git a/storage/Cargo.toml b/storage/Cargo.toml index 91d2231a..b6374249 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -11,3 +11,4 @@ nssa.workspace = true thiserror.workspace = true borsh.workspace = true rocksdb.workspace = true +tempfile.workspace = true diff --git a/storage/src/indexer.rs b/storage/src/indexer.rs index 67778a39..937e6355 100644 --- a/storage/src/indexer.rs +++ b/storage/src/indexer.rs @@ -53,7 +53,7 @@ pub const CF_ACC_TO_TX: &str = "cf_acc_to_tx"; pub type DbResult = Result; fn closest_breakpoint_id(block_id: u64) -> u64 { - block_id.div(BREAKPOINT_INTERVAL) + block_id.saturating_sub(1).div(BREAKPOINT_INTERVAL) } pub struct RocksDBIO { @@ -354,7 +354,7 @@ impl RocksDBIO { pub fn put_block(&self, block: Block) -> DbResult<()> { let cf_block = self.block_column(); let cf_hti = self.hash_to_id_column(); - let cf_tti = self.hash_to_id_column(); + let cf_tti: Arc> = self.tx_hash_to_id_column(); // ToDo: rewrite this with write batching @@ -577,7 +577,7 @@ impl RocksDBIO { pub fn calculate_state_for_id(&self, block_id: u64) -> DbResult { let last_block = self.get_meta_last_block_in_db()?; - if last_block <= block_id { + if block_id <= last_block { let br_id = closest_breakpoint_id(block_id); let mut breakpoint = self.get_breakpoint(br_id)?; @@ -631,14 +631,14 @@ impl RocksDBIO { pub fn put_next_breakpoint(&self) -> DbResult<()> { let last_block = self.get_meta_last_block_in_db()?; - let breakpoint_id = self.get_meta_last_breakpoint_id()?; - let block_to_break_id = breakpoint_id * BREAKPOINT_INTERVAL; + let next_breakpoint_id = self.get_meta_last_breakpoint_id()? + 1; + let block_to_break_id = next_breakpoint_id * BREAKPOINT_INTERVAL; - if last_block <= block_to_break_id { + if block_to_break_id <= last_block { let next_breakpoint = self.calculate_state_for_id(block_to_break_id)?; - self.put_breakpoint(breakpoint_id, next_breakpoint)?; - self.put_meta_last_breakpoint_id(breakpoint_id) + self.put_breakpoint(next_breakpoint_id, next_breakpoint)?; + self.put_meta_last_breakpoint_id(next_breakpoint_id) } else { Err(DbError::db_interaction_error( "Breakpoint not yet achieved".to_string(), @@ -686,7 +686,7 @@ impl RocksDBIO { borsh::to_vec(&tx_hash).map_err(|err| { DbError::borsh_cast_message( err, - Some("Failed to serialize block hash".to_string()), + Some("Failed to serialize transaction hash".to_string()), ) })?, ) @@ -701,7 +701,7 @@ impl RocksDBIO { })?) } else { Err(DbError::db_interaction_error( - "Block on this hash not found".to_string(), + "Block for this tx hash not found".to_string(), )) } } @@ -851,7 +851,7 @@ impl RocksDBIO { let mut tx_batch = vec![]; for tx_hash in self.get_acc_transaction_hashes(acc_id, offset, limit)? { - let block_id = self.get_block_id_by_hash(tx_hash)?; + let block_id = self.get_block_id_by_tx_hash(tx_hash)?; let block = self.get_block(block_id)?; let enc_tx = block @@ -877,3 +877,399 @@ impl RocksDBIO { Ok(tx_batch) } } + +#[cfg(test)] +mod tests { + use common::transaction::EncodedTransaction; + use nssa::AccountId; + use tempfile::tempdir; + + use super::*; + + fn genesis_block() -> Block { + common::test_utils::produce_dummy_block(1, None, vec![]) + } + + fn acc1() -> AccountId { + AccountId::new([ + 208, 122, 210, 232, 75, 39, 250, 0, 194, 98, 240, 161, 238, 160, 255, 53, 202, 9, 115, + 84, 126, 106, 16, 111, 114, 241, 147, 194, 220, 131, 139, 68, + ]) + } + + fn acc2() -> AccountId { + AccountId::new([ + 231, 174, 119, 197, 239, 26, 5, 153, 147, 68, 175, 73, 159, 199, 138, 23, 5, 57, 141, + 98, 237, 6, 207, 46, 20, 121, 246, 222, 248, 154, 57, 188, + ]) + } + + fn acc1_sign_key() -> nssa::PrivateKey { + nssa::PrivateKey::try_new([1; 32]).unwrap() + } + + fn acc2_sign_key() -> nssa::PrivateKey { + nssa::PrivateKey::try_new([2; 32]).unwrap() + } + + fn initial_state() -> V02State { + nssa::V02State::new_with_genesis_accounts(&[(acc1(), 10000), (acc2(), 20000)], &[]) + } + + fn transfer(amount: u128, nonce: u128, direction: bool) -> EncodedTransaction { + let from; + let to; + let sign_key; + + if direction { + from = acc1(); + to = acc2(); + sign_key = acc1_sign_key(); + } else { + from = acc2(); + to = acc1(); + sign_key = acc2_sign_key(); + } + + common::test_utils::create_transaction_native_token_transfer( + *from.value(), + nonce, + *to.value(), + amount, + sign_key, + ) + } + + #[test] + fn test_start_db() { + let temp_dir = tempdir().unwrap(); + let temdir_path = temp_dir.path(); + + let dbio = RocksDBIO::open_or_create(temdir_path, Some((genesis_block(), initial_state()))) + .unwrap(); + + let last_id = dbio.get_meta_last_block_in_db().unwrap(); + let first_id = dbio.get_meta_first_block_in_db().unwrap(); + let is_first_set = dbio.get_meta_is_first_block_set().unwrap(); + let last_br_id = dbio.get_meta_last_breakpoint_id().unwrap(); + let last_block = dbio.get_block(1).unwrap(); + let breakpoint = dbio.get_breakpoint(0).unwrap(); + let final_state = dbio.final_state().unwrap(); + + assert_eq!(last_id, 1); + assert_eq!(first_id, 1); + assert_eq!(is_first_set, true); + assert_eq!(last_br_id, 0); + assert_eq!(last_block.header.hash, genesis_block().header.hash); + assert_eq!( + breakpoint.get_account_by_id(&acc1()), + final_state.get_account_by_id(&acc1()) + ); + assert_eq!( + breakpoint.get_account_by_id(&acc2()), + final_state.get_account_by_id(&acc2()) + ); + } + + #[test] + fn test_one_block_insertion() { + let temp_dir = tempdir().unwrap(); + let temdir_path = temp_dir.path(); + + let dbio = RocksDBIO::open_or_create(temdir_path, Some((genesis_block(), initial_state()))) + .unwrap(); + + let prev_hash = genesis_block().header.hash; + let transfer_tx = transfer(1, 0, true); + let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]); + + dbio.put_block(block).unwrap(); + + let last_id = dbio.get_meta_last_block_in_db().unwrap(); + let first_id = dbio.get_meta_first_block_in_db().unwrap(); + let is_first_set = dbio.get_meta_is_first_block_set().unwrap(); + let last_br_id = dbio.get_meta_last_breakpoint_id().unwrap(); + let last_block = dbio.get_block(last_id).unwrap(); + let breakpoint = dbio.get_breakpoint(0).unwrap(); + let final_state = dbio.final_state().unwrap(); + + assert_eq!(last_id, 2); + assert_eq!(first_id, 1); + assert_eq!(is_first_set, true); + assert_eq!(last_br_id, 0); + assert_ne!(last_block.header.hash, genesis_block().header.hash); + assert_eq!( + breakpoint.get_account_by_id(&acc1()).balance + - final_state.get_account_by_id(&acc1()).balance, + 1 + ); + assert_eq!( + final_state.get_account_by_id(&acc2()).balance + - breakpoint.get_account_by_id(&acc2()).balance, + 1 + ); + } + + #[test] + fn test_new_breakpoint() { + let temp_dir = tempdir().unwrap(); + let temdir_path = temp_dir.path(); + + let dbio = RocksDBIO::open_or_create(temdir_path, Some((genesis_block(), initial_state()))) + .unwrap(); + + for i in 1..BREAKPOINT_INTERVAL { + let last_id = dbio.get_meta_last_block_in_db().unwrap(); + let last_block = dbio.get_block(last_id).unwrap(); + + let prev_hash = last_block.header.hash; + let transfer_tx = transfer(1, (i - 1) as u128, true); + let block = + common::test_utils::produce_dummy_block(i + 1, Some(prev_hash), vec![transfer_tx]); + dbio.put_block(block).unwrap(); + } + + let last_id = dbio.get_meta_last_block_in_db().unwrap(); + let first_id = dbio.get_meta_first_block_in_db().unwrap(); + let is_first_set = dbio.get_meta_is_first_block_set().unwrap(); + let last_br_id = dbio.get_meta_last_breakpoint_id().unwrap(); + let last_block = dbio.get_block(last_id).unwrap(); + let prev_breakpoint = dbio.get_breakpoint(0).unwrap(); + let breakpoint = dbio.get_breakpoint(1).unwrap(); + let final_state = dbio.final_state().unwrap(); + + assert_eq!(last_id, 100); + assert_eq!(first_id, 1); + assert_eq!(is_first_set, true); + assert_eq!(last_br_id, 1); + assert_ne!(last_block.header.hash, genesis_block().header.hash); + assert_eq!( + prev_breakpoint.get_account_by_id(&acc1()).balance + - final_state.get_account_by_id(&acc1()).balance, + 99 + ); + assert_eq!( + final_state.get_account_by_id(&acc2()).balance + - prev_breakpoint.get_account_by_id(&acc2()).balance, + 99 + ); + assert_eq!( + breakpoint.get_account_by_id(&acc1()), + final_state.get_account_by_id(&acc1()) + ); + assert_eq!( + breakpoint.get_account_by_id(&acc2()), + final_state.get_account_by_id(&acc2()) + ); + } + + #[test] + fn test_simple_maps() { + let temp_dir = tempdir().unwrap(); + let temdir_path = temp_dir.path(); + + let dbio = RocksDBIO::open_or_create(temdir_path, Some((genesis_block(), initial_state()))) + .unwrap(); + + let last_id = dbio.get_meta_last_block_in_db().unwrap(); + let last_block = dbio.get_block(last_id).unwrap(); + + let prev_hash = last_block.header.hash; + let transfer_tx = transfer(1, 0, true); + let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]); + + let control_hash1 = block.header.hash; + + dbio.put_block(block).unwrap(); + + let last_id = dbio.get_meta_last_block_in_db().unwrap(); + let last_block = dbio.get_block(last_id).unwrap(); + + let prev_hash = last_block.header.hash; + let transfer_tx = transfer(1, 1, true); + let block = common::test_utils::produce_dummy_block(3, Some(prev_hash), vec![transfer_tx]); + + let control_hash2 = block.header.hash; + + dbio.put_block(block).unwrap(); + + let last_id = dbio.get_meta_last_block_in_db().unwrap(); + let last_block = dbio.get_block(last_id).unwrap(); + + let prev_hash = last_block.header.hash; + let transfer_tx = transfer(1, 2, true); + + let control_tx_hash1 = transfer_tx.hash(); + + let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]); + dbio.put_block(block).unwrap(); + + let last_id = dbio.get_meta_last_block_in_db().unwrap(); + let last_block = dbio.get_block(last_id).unwrap(); + + let prev_hash = last_block.header.hash; + let transfer_tx = transfer(1, 3, true); + + let control_tx_hash2 = transfer_tx.hash(); + + let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]); + dbio.put_block(block).unwrap(); + + let control_block_id1 = dbio.get_block_id_by_hash(control_hash1).unwrap(); + let control_block_id2 = dbio.get_block_id_by_hash(control_hash2).unwrap(); + let control_block_id3 = dbio.get_block_id_by_tx_hash(control_tx_hash1).unwrap(); + let control_block_id4 = dbio.get_block_id_by_tx_hash(control_tx_hash2).unwrap(); + + assert_eq!(control_block_id1, 2); + assert_eq!(control_block_id2, 3); + assert_eq!(control_block_id3, 4); + assert_eq!(control_block_id4, 5); + } + + #[test] + fn test_block_batch() { + let temp_dir = tempdir().unwrap(); + let temdir_path = temp_dir.path(); + + let mut block_res = vec![]; + + let dbio = RocksDBIO::open_or_create(temdir_path, Some((genesis_block(), initial_state()))) + .unwrap(); + + let last_id = dbio.get_meta_last_block_in_db().unwrap(); + let last_block = dbio.get_block(last_id).unwrap(); + + let prev_hash = last_block.header.hash; + let transfer_tx = transfer(1, 0, true); + let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]); + + block_res.push(block.clone()); + dbio.put_block(block).unwrap(); + + let last_id = dbio.get_meta_last_block_in_db().unwrap(); + let last_block = dbio.get_block(last_id).unwrap(); + + let prev_hash = last_block.header.hash; + let transfer_tx = transfer(1, 1, true); + let block = common::test_utils::produce_dummy_block(3, Some(prev_hash), vec![transfer_tx]); + + block_res.push(block.clone()); + dbio.put_block(block).unwrap(); + + let last_id = dbio.get_meta_last_block_in_db().unwrap(); + let last_block = dbio.get_block(last_id).unwrap(); + + let prev_hash = last_block.header.hash; + let transfer_tx = transfer(1, 2, true); + + let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]); + block_res.push(block.clone()); + dbio.put_block(block).unwrap(); + + let last_id = dbio.get_meta_last_block_in_db().unwrap(); + let last_block = dbio.get_block(last_id).unwrap(); + + let prev_hash = last_block.header.hash; + let transfer_tx = transfer(1, 3, true); + + let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]); + block_res.push(block.clone()); + dbio.put_block(block).unwrap(); + + let block_hashes_mem: Vec<[u8; 32]> = + block_res.into_iter().map(|bl| bl.header.hash).collect(); + + let batch_res = dbio.get_block_batch(2, 4).unwrap(); + + let block_hashes_db: Vec<[u8; 32]> = + batch_res.into_iter().map(|bl| bl.header.hash).collect(); + + assert_eq!(block_hashes_mem, block_hashes_db); + + let block_hashes_mem_limited = &block_hashes_mem[1..]; + + let batch_res_limited = dbio.get_block_batch(3, 4).unwrap(); + + let block_hashes_db_limited: Vec<[u8; 32]> = batch_res_limited + .into_iter() + .map(|bl| bl.header.hash) + .collect(); + + assert_eq!(block_hashes_mem_limited, block_hashes_db_limited.as_slice()); + } + + #[test] + fn test_account_map() { + let temp_dir = tempdir().unwrap(); + let temdir_path = temp_dir.path(); + + let mut tx_hash_res = vec![]; + + let dbio = RocksDBIO::open_or_create(temdir_path, Some((genesis_block(), initial_state()))) + .unwrap(); + + let last_id = dbio.get_meta_last_block_in_db().unwrap(); + let last_block = dbio.get_block(last_id).unwrap(); + + let prev_hash = last_block.header.hash; + let transfer_tx = transfer(1, 0, true); + + tx_hash_res.push(transfer_tx.hash()); + + let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]); + + dbio.put_block(block).unwrap(); + + let last_id = dbio.get_meta_last_block_in_db().unwrap(); + let last_block = dbio.get_block(last_id).unwrap(); + + let prev_hash = last_block.header.hash; + let transfer_tx = transfer(1, 1, true); + + tx_hash_res.push(transfer_tx.hash()); + + let block = common::test_utils::produce_dummy_block(3, Some(prev_hash), vec![transfer_tx]); + + dbio.put_block(block).unwrap(); + + let last_id = dbio.get_meta_last_block_in_db().unwrap(); + let last_block = dbio.get_block(last_id).unwrap(); + + let prev_hash = last_block.header.hash; + let transfer_tx = transfer(1, 2, true); + + tx_hash_res.push(transfer_tx.hash()); + + let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]); + + dbio.put_block(block).unwrap(); + + let last_id = dbio.get_meta_last_block_in_db().unwrap(); + let last_block = dbio.get_block(last_id).unwrap(); + + let prev_hash = last_block.header.hash; + let transfer_tx = transfer(1, 3, true); + + tx_hash_res.push(transfer_tx.hash()); + + let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]); + + dbio.put_block(block).unwrap(); + + let acc1_tx = dbio.get_acc_transactions(*acc1().value(), 0, 4).unwrap(); + let acc1_tx_hashes: Vec<[u8; 32]> = acc1_tx + .into_iter() + .map(|tx| EncodedTransaction::from(tx).hash()) + .collect(); + + assert_eq!(acc1_tx_hashes, tx_hash_res); + + let acc1_tx_limited = dbio.get_acc_transactions(*acc1().value(), 1, 4).unwrap(); + let acc1_tx_limited_hashes: Vec<[u8; 32]> = acc1_tx_limited + .into_iter() + .map(|tx| EncodedTransaction::from(tx).hash()) + .collect(); + + assert_eq!(acc1_tx_limited_hashes.as_slice(), &tx_hash_res[1..]) + } +}