diff --git a/Cargo.lock b/Cargo.lock index fab7b447..6bb8255c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3714,6 +3714,7 @@ dependencies = [ "serde", "serde_json", "storage", + "tempfile", "tokio", "url", ] diff --git a/indexer/core/Cargo.toml b/indexer/core/Cargo.toml index 198cf78e..8129c1ea 100644 --- a/indexer/core/Cargo.toml +++ b/indexer/core/Cargo.toml @@ -25,3 +25,7 @@ url.workspace = true logos-blockchain-core.workspace = true serde_json.workspace = true async-stream.workspace = true + +[dev-dependencies] +tempfile.workspace = true + diff --git a/indexer/core/src/block_store.rs b/indexer/core/src/block_store.rs index 8742184f..db2f855b 100644 --- a/indexer/core/src/block_store.rs +++ b/indexer/core/src/block_store.rs @@ -8,10 +8,12 @@ use common::{ }; use nssa::{Account, AccountId, V02State}; use storage::indexer::RocksDBIO; +use tokio::sync::RwLock; #[derive(Clone)] pub struct IndexerStore { dbio: Arc, + current_state: Arc>, } impl IndexerStore { @@ -25,9 +27,11 @@ impl IndexerStore { initial_state: &V02State, ) -> Result { let dbio = RocksDBIO::open_or_create(location, genesis_block, initial_state)?; + let current_state = dbio.final_state()?; Ok(Self { dbio: Arc::new(dbio), + current_state: Arc::new(RwLock::new(current_state)), }) } @@ -93,22 +97,31 @@ impl IndexerStore { Ok(self.dbio.calculate_state_for_id(block_id)?) } - pub fn final_state(&self) -> Result { + /// Recalculation of final state directly from DB. + /// + /// Used for indexer healthcheck. + pub fn recalculate_final_state(&self) -> Result { Ok(self.dbio.final_state()?) } - pub fn get_account_final(&self, account_id: &AccountId) -> Result { - Ok(self.final_state()?.get_account_by_id(*account_id)) + pub async fn account_current_state(&self, account_id: &AccountId) -> Result { + Ok(self + .current_state + .read() + .await + .get_account_by_id(*account_id)) } - pub fn put_block(&self, mut block: Block, l1_header: HeaderId) -> Result<()> { - let mut final_state = self.dbio.final_state()?; + pub async fn put_block(&self, mut block: Block, l1_header: HeaderId) -> Result<()> { + { + let mut state_guard = self.current_state.write().await; - for transaction in &block.body.transactions { - transaction - .clone() - .transaction_stateless_check()? - .execute_check_on_state(&mut final_state)?; + for transaction in &block.body.transactions { + transaction + .clone() + .transaction_stateless_check()? + .execute_check_on_state(&mut state_guard)?; + } } // ToDo: Currently we are fetching only finalized blocks @@ -119,3 +132,95 @@ impl IndexerStore { Ok(self.dbio.put_block(&block, l1_header.into())?) } } + +#[cfg(test)] +mod tests { + use nssa::{AccountId, PublicKey}; + use tempfile::tempdir; + + use super::*; + + fn genesis_block() -> Block { + common::test_utils::produce_dummy_block(1, None, vec![]) + } + + fn acc1_sign_key() -> nssa::PrivateKey { + nssa::PrivateKey::try_new([1; 32]).unwrap() + } + + fn acc2_sign_key() -> nssa::PrivateKey { + nssa::PrivateKey::try_new([2; 32]).unwrap() + } + + fn acc1() -> AccountId { + AccountId::from(&PublicKey::new_from_private_key(&acc1_sign_key())) + } + + fn acc2() -> AccountId { + AccountId::from(&PublicKey::new_from_private_key(&acc2_sign_key())) + } + + #[test] + fn correct_startup() { + let home = tempdir().unwrap(); + + let storage = IndexerStore::open_db_with_genesis( + home.as_ref(), + &genesis_block(), + &nssa::V02State::new_with_genesis_accounts(&[(acc1(), 10000), (acc2(), 20000)], &[]), + ) + .unwrap(); + + let block = storage.get_block_at_id(1).unwrap(); + let final_id = storage.get_last_block_id().unwrap(); + + assert_eq!(block.header.hash, genesis_block().header.hash); + assert_eq!(final_id, 1); + } + + #[tokio::test] + async fn state_transition() { + let home = tempdir().unwrap(); + + let storage = IndexerStore::open_db_with_genesis( + home.as_ref(), + &genesis_block(), + &nssa::V02State::new_with_genesis_accounts(&[(acc1(), 10000), (acc2(), 20000)], &[]), + ) + .unwrap(); + + let mut prev_hash = genesis_block().header.hash; + + let from = acc1(); + let to = acc2(); + let sign_key = acc1_sign_key(); + + for i in 2..10 { + let tx = common::test_utils::create_transaction_native_token_transfer( + from, + i - 2, + to, + 10, + &sign_key, + ); + + let next_block = common::test_utils::produce_dummy_block( + u64::try_from(i).unwrap(), + Some(prev_hash), + vec![tx], + ); + prev_hash = next_block.header.hash; + + storage + .put_block(next_block, HeaderId::from([u8::try_from(i).unwrap(); 32])) + .await + .unwrap(); + } + + let acc1_val = storage.account_current_state(&acc1()).await.unwrap(); + let acc2_val = storage.account_current_state(&acc2()).await.unwrap(); + + assert_eq!(acc1_val.balance, 9920); + assert_eq!(acc2_val.balance, 20080); + } +} diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index 8f901774..6c96821e 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -124,8 +124,8 @@ impl IndexerCore { l2_blocks_parsed_ids.sort_unstable(); info!("Parsed {} L2 blocks with ids {:?}", l2_block_vec.len(), l2_blocks_parsed_ids); - for l2_block in l2_block_vec { - self.store.put_block(l2_block.clone(), l1_header)?; + for l2_block in l2_block_vec { + self.store.put_block(l2_block.clone(), l1_header).await?; yield Ok(l2_block); } @@ -158,7 +158,7 @@ impl IndexerCore { info!("Parsed {} L2 blocks with ids {:?}", l2_block_vec.len(), l2_blocks_parsed_ids); for l2_block in l2_block_vec { - self.store.put_block(l2_block.clone(), header)?; + self.store.put_block(l2_block.clone(), header).await?; yield Ok(l2_block); } diff --git a/indexer/service/src/service.rs b/indexer/service/src/service.rs index 2b2e3eca..256ef33d 100644 --- a/indexer/service/src/service.rs +++ b/indexer/service/src/service.rs @@ -74,7 +74,8 @@ impl indexer_service_rpc::RpcServer for IndexerService { Ok(self .indexer .store - .get_account_final(&account_id.into()) + .account_current_state(&account_id.into()) + .await .map_err(db_error)? .into()) } @@ -131,7 +132,11 @@ impl indexer_service_rpc::RpcServer for IndexerService { async fn healthcheck(&self) -> Result<(), ErrorObjectOwned> { // Checking, that indexer can calculate last state - let _ = self.indexer.store.final_state().map_err(db_error)?; + let _ = self + .indexer + .store + .recalculate_final_state() + .map_err(db_error)?; Ok(()) } diff --git a/integration_tests/tests/indexer.rs b/integration_tests/tests/indexer.rs index 5169aacf..0b947135 100644 --- a/integration_tests/tests/indexer.rs +++ b/integration_tests/tests/indexer.rs @@ -1,19 +1,14 @@ #![expect( - clippy::shadow_unrelated, clippy::tests_outside_test_module, reason = "We don't care about these in tests" )] use std::time::Duration; -use anyhow::{Context as _, Result}; +use anyhow::Result; use indexer_service_rpc::RpcClient as _; -use integration_tests::{ - TIME_TO_WAIT_FOR_BLOCK_SECONDS, TestContext, format_private_account_id, - format_public_account_id, verify_commitment_is_in_state, -}; +use integration_tests::{TIME_TO_WAIT_FOR_BLOCK_SECONDS, TestContext, format_public_account_id}; use log::info; -use nssa::AccountId; use tokio::test; use wallet::cli::{Command, programs::native_token_transfer::AuthTransferSubcommand}; @@ -120,36 +115,6 @@ async fn indexer_state_consistency() -> Result<()> { assert_eq!(acc_1_balance.balance, 9900); assert_eq!(acc_2_balance.balance, 20100); - let from: AccountId = ctx.existing_private_accounts()[0]; - let to: AccountId = ctx.existing_private_accounts()[1]; - - let command = Command::AuthTransfer(AuthTransferSubcommand::Send { - from: format_private_account_id(from), - to: Some(format_private_account_id(to)), - to_npk: None, - to_vpk: None, - amount: 100, - }); - - wallet::cli::execute_subcommand(ctx.wallet_mut(), command).await?; - - info!("Waiting for next block creation"); - tokio::time::sleep(Duration::from_secs(TIME_TO_WAIT_FOR_BLOCK_SECONDS)).await; - - let new_commitment1 = ctx - .wallet() - .get_private_account_commitment(from) - .context("Failed to get private account commitment for sender")?; - assert!(verify_commitment_is_in_state(new_commitment1, ctx.sequencer_client()).await); - - let new_commitment2 = ctx - .wallet() - .get_private_account_commitment(to) - .context("Failed to get private account commitment for receiver")?; - assert!(verify_commitment_is_in_state(new_commitment2, ctx.sequencer_client()).await); - - info!("Successfully transferred privately to owned account"); - // WAIT info!("Waiting for indexer to parse blocks"); tokio::time::sleep(std::time::Duration::from_millis(L2_TO_L1_TIMEOUT_MILLIS)).await;