From 44a11bd3f1bc81e0cf11602b9fe706ce7701047a Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Wed, 4 Mar 2026 14:12:39 +0200 Subject: [PATCH] fix: state interactions fix --- Cargo.lock | 3 + indexer/core/Cargo.toml | 2 + indexer/core/src/block_store.rs | 140 +++++++++++++++++++++++++---- indexer/core/src/lib.rs | 4 +- indexer/service/src/service.rs | 1 + integration_tests/tests/indexer.rs | 38 +------- 6 files changed, 131 insertions(+), 57 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 33f810e0..8081f5c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -126,6 +126,7 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92589714878ca59a7626ea19734f0e07a6a875197eec751bb5d3f99e64998c63" dependencies = [ + "actix-macros", "futures-core", "tokio", ] @@ -3742,6 +3743,7 @@ checksum = "4ee796ad498c8d9a1d68e477df8f754ed784ef875de1414ebdaf169f70a6a784" name = "indexer_core" version = "0.1.0" dependencies = [ + "actix-rt", "anyhow", "async-stream", "bedrock_client", @@ -3756,6 +3758,7 @@ dependencies = [ "serde", "serde_json", "storage", + "tempfile", "tokio", "url", ] diff --git a/indexer/core/Cargo.toml b/indexer/core/Cargo.toml index 792fb4b7..3096249c 100644 --- a/indexer/core/Cargo.toml +++ b/indexer/core/Cargo.toml @@ -22,3 +22,5 @@ url.workspace = true logos-blockchain-core.workspace = true serde_json.workspace = true async-stream.workspace = true +tempfile.workspace = true +actix-rt.workspace = true diff --git a/indexer/core/src/block_store.rs b/indexer/core/src/block_store.rs index beed8e6d..1f3a8263 100644 --- a/indexer/core/src/block_store.rs +++ b/indexer/core/src/block_store.rs @@ -8,11 +8,12 @@ use common::{ }; use nssa::{Account, AccountId, V02State}; use storage::indexer::RocksDBIO; +use tokio::sync::RwLock; #[derive(Clone)] pub struct IndexerStore { dbio: Arc, - final_state: V02State, + final_state: Arc>, } impl IndexerStore { @@ -29,7 +30,7 @@ impl IndexerStore { Ok(Self { dbio: Arc::new(dbio), - final_state, + final_state: Arc::new(RwLock::new(final_state)), }) } @@ -98,30 +99,29 @@ impl IndexerStore { Ok(self.dbio.calculate_state_for_id(block_id)?) } - pub fn final_state(&self) -> &V02State { - &self.final_state - } - - pub fn final_state_mut(&mut self) -> &mut V02State { - &mut self.final_state - } - pub fn final_state_db(&self) -> Result { Ok(self.dbio.final_state()?) } - pub fn get_account_final(&self, account_id: &AccountId) -> Result { - Ok(self.final_state().get_account_by_id(*account_id)) + pub async fn get_account_final(&self, account_id: &AccountId) -> Result { + let account = { + let state_guard = self.final_state.read().await; + state_guard.get_account_by_id(*account_id) + }; + + Ok(account) } - pub fn put_block(&mut self, mut block: Block, l1_header: HeaderId) -> Result<()> { - let final_state = self.final_state_mut(); + pub async fn put_block(&mut self, mut block: Block, l1_header: HeaderId) -> Result<()> { + { + let mut state_guard = self.final_state.write().await; - for transaction in &block.body.transactions { - transaction - .clone() - .transaction_stateless_check()? - .execute_check_on_state(final_state)?; + for transaction in &block.body.transactions { + transaction + .clone() + .transaction_stateless_check()? + .execute_check_on_state(&mut state_guard)?; + } } // ToDo: Currently we are fetching only finalized blocks @@ -132,3 +132,105 @@ impl IndexerStore { Ok(self.dbio.put_block(block, l1_header.into())?) } } + +#[cfg(test)] +mod tests { + use nssa::AccountId; + use tempfile::tempdir; + + use super::*; + + fn genesis_block() -> Block { + common::test_utils::produce_dummy_block(1, None, vec![]) + } + + fn acc1() -> AccountId { + AccountId::new([ + 148, 179, 206, 253, 199, 51, 82, 86, 232, 2, 152, 122, 80, 243, 54, 207, 237, 112, 83, + 153, 44, 59, 204, 49, 128, 84, 160, 227, 216, 149, 97, 102, + ]) + } + + fn acc2() -> AccountId { + AccountId::new([ + 30, 145, 107, 3, 207, 73, 192, 230, 160, 63, 238, 207, 18, 69, 54, 216, 103, 244, 92, + 94, 124, 248, 42, 16, 141, 19, 119, 18, 14, 226, 140, 204, + ]) + } + + fn acc1_sign_key() -> nssa::PrivateKey { + nssa::PrivateKey::try_new([1; 32]).unwrap() + } + + fn acc2_sign_key() -> nssa::PrivateKey { + nssa::PrivateKey::try_new([2; 32]).unwrap() + } + + fn initial_state() -> V02State { + nssa::V02State::new_with_genesis_accounts(&[(acc1(), 10000), (acc2(), 20000)], &[]) + } + + fn transfer(amount: u128, nonce: u128, direction: bool) -> NSSATransaction { + let from; + let to; + let sign_key; + + if direction { + from = acc1(); + to = acc2(); + sign_key = acc1_sign_key(); + } else { + from = acc2(); + to = acc1(); + sign_key = acc2_sign_key(); + } + + common::test_utils::create_transaction_native_token_transfer( + from, nonce, to, amount, sign_key, + ) + } + + #[test] + fn test_correct_startup() { + let storage = IndexerStore::open_db_with_genesis( + tempdir().unwrap().as_ref(), + Some((genesis_block(), initial_state())), + ) + .unwrap(); + + let block = storage.get_block_at_id(1).unwrap(); + let final_id = storage.get_last_block_id().unwrap(); + + assert_eq!(block.header.hash, genesis_block().header.hash); + assert_eq!(final_id, 1); + } + + #[actix_rt::test] + async fn test_state_transition() { + let mut storage = IndexerStore::open_db_with_genesis( + tempdir().unwrap().as_ref(), + Some((genesis_block(), initial_state())), + ) + .unwrap(); + + let mut prev_hash = genesis_block().header.hash; + + for i in 2..10 { + let tx = transfer(10, i - 2, true); + let next_block = + common::test_utils::produce_dummy_block(i as u64, Some(prev_hash), vec![tx]); + prev_hash = next_block.header.hash; + + storage + .put_block(next_block, HeaderId::from([i as u8; 32])) + .await + .unwrap(); + } + + let acc1_val = storage.get_account_final(&acc1()).await.unwrap(); + let acc2_val = storage.get_account_final(&acc2()).await.unwrap(); + + assert_eq!(acc1_val.balance, 9920); + assert_eq!(acc2_val.balance, 20080); + } +} diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index 5037ae20..153a8556 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -129,7 +129,7 @@ impl IndexerCore { info!("Parsed {} L2 blocks with ids {:?}", l2_block_vec.len(), l2_blocks_parsed_ids); for l2_block in l2_block_vec { - self.store.put_block(l2_block.clone(), l1_header)?; + self.store.put_block(l2_block.clone(), l1_header).await?; yield Ok(l2_block); } @@ -163,7 +163,7 @@ impl IndexerCore { info!("Parsed {} L2 blocks with ids {:?}", l2_block_vec.len(), l2_blocks_parsed_ids); for l2_block in l2_block_vec { - self.store.put_block(l2_block.clone(), header)?; + self.store.put_block(l2_block.clone(), header).await?; yield Ok(l2_block); } diff --git a/indexer/service/src/service.rs b/indexer/service/src/service.rs index e02b6c3b..e6eb40f8 100644 --- a/indexer/service/src/service.rs +++ b/indexer/service/src/service.rs @@ -75,6 +75,7 @@ impl indexer_service_rpc::RpcServer for IndexerService { .indexer .store .get_account_final(&account_id.into()) + .await .map_err(db_error)? .into()) } diff --git a/integration_tests/tests/indexer.rs b/integration_tests/tests/indexer.rs index d5207a41..5a3e38a5 100644 --- a/integration_tests/tests/indexer.rs +++ b/integration_tests/tests/indexer.rs @@ -1,13 +1,9 @@ use std::time::Duration; -use anyhow::{Context, Result}; +use anyhow::Result; use indexer_service_rpc::RpcClient; -use integration_tests::{ - TIME_TO_WAIT_FOR_BLOCK_SECONDS, TestContext, format_private_account_id, - format_public_account_id, verify_commitment_is_in_state, -}; +use integration_tests::{TIME_TO_WAIT_FOR_BLOCK_SECONDS, TestContext, format_public_account_id}; use log::info; -use nssa::AccountId; use tokio::test; use wallet::cli::{Command, programs::native_token_transfer::AuthTransferSubcommand}; @@ -111,36 +107,6 @@ async fn indexer_state_consistency() -> Result<()> { assert_eq!(acc_1_balance.balance, 9900); assert_eq!(acc_2_balance.balance, 20100); - let from: AccountId = ctx.existing_private_accounts()[0]; - let to: AccountId = ctx.existing_private_accounts()[1]; - - let command = Command::AuthTransfer(AuthTransferSubcommand::Send { - from: format_private_account_id(from), - to: Some(format_private_account_id(to)), - to_npk: None, - to_vpk: None, - amount: 100, - }); - - wallet::cli::execute_subcommand(ctx.wallet_mut(), command).await?; - - info!("Waiting for next block creation"); - tokio::time::sleep(Duration::from_secs(TIME_TO_WAIT_FOR_BLOCK_SECONDS)).await; - - let new_commitment1 = ctx - .wallet() - .get_private_account_commitment(from) - .context("Failed to get private account commitment for sender")?; - assert!(verify_commitment_is_in_state(new_commitment1, ctx.sequencer_client()).await); - - let new_commitment2 = ctx - .wallet() - .get_private_account_commitment(to) - .context("Failed to get private account commitment for receiver")?; - assert!(verify_commitment_is_in_state(new_commitment2, ctx.sequencer_client()).await); - - info!("Successfully transferred privately to owned account"); - // WAIT info!("Waiting for indexer to parse blocks"); tokio::time::sleep(std::time::Duration::from_millis(L2_TO_L1_TIMEOUT_MILLIS)).await;