From ed299932c5b413311ef873d5bbd9ff5d7e485c41 Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Mon, 2 Mar 2026 18:25:22 +0200 Subject: [PATCH 1/5] fear: final state field --- indexer/core/src/block_store.rs | 21 ++++++++++++++++----- indexer/core/src/lib.rs | 4 +++- indexer/service/src/service.rs | 4 ++-- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/indexer/core/src/block_store.rs b/indexer/core/src/block_store.rs index ce3881bd..beed8e6d 100644 --- a/indexer/core/src/block_store.rs +++ b/indexer/core/src/block_store.rs @@ -12,6 +12,7 @@ use storage::indexer::RocksDBIO; #[derive(Clone)] pub struct IndexerStore { dbio: Arc, + final_state: V02State, } impl IndexerStore { @@ -24,9 +25,11 @@ impl IndexerStore { start_data: Option<(Block, V02State)>, ) -> Result { let dbio = RocksDBIO::open_or_create(location, start_data)?; + let final_state = dbio.final_state()?; Ok(Self { dbio: Arc::new(dbio), + final_state, }) } @@ -95,22 +98,30 @@ impl IndexerStore { Ok(self.dbio.calculate_state_for_id(block_id)?) } - pub fn final_state(&self) -> Result { + pub fn final_state(&self) -> &V02State { + &self.final_state + } + + pub fn final_state_mut(&mut self) -> &mut V02State { + &mut self.final_state + } + + pub fn final_state_db(&self) -> Result { Ok(self.dbio.final_state()?) } pub fn get_account_final(&self, account_id: &AccountId) -> Result { - Ok(self.final_state()?.get_account_by_id(*account_id)) + Ok(self.final_state().get_account_by_id(*account_id)) } - pub fn put_block(&self, mut block: Block, l1_header: HeaderId) -> Result<()> { - let mut final_state = self.dbio.final_state()?; + pub fn put_block(&mut self, mut block: Block, l1_header: HeaderId) -> Result<()> { + let final_state = self.final_state_mut(); for transaction in &block.body.transactions { transaction .clone() .transaction_stateless_check()? - .execute_check_on_state(&mut final_state)?; + .execute_check_on_state(final_state)?; } // ToDo: Currently we are fetching only finalized blocks diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index 6d56eb18..5037ae20 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -98,7 +98,9 @@ impl IndexerCore { }) } - pub async fn subscribe_parse_block_stream(&self) -> impl futures::Stream> { + pub async fn subscribe_parse_block_stream( + &mut self, + ) -> impl futures::Stream> { async_stream::stream! { info!("Searching for initial header"); diff --git a/indexer/service/src/service.rs b/indexer/service/src/service.rs index 13ec2ea8..e02b6c3b 100644 --- a/indexer/service/src/service.rs +++ b/indexer/service/src/service.rs @@ -127,7 +127,7 @@ impl indexer_service_rpc::RpcServer for IndexerService { async fn healthcheck(&self) -> Result<(), ErrorObjectOwned> { // Checking, that indexer can calculate last state - let _ = self.indexer.store.final_state().map_err(db_error)?; + let _ = self.indexer.store.final_state_db().map_err(db_error)?; Ok(()) } @@ -179,7 +179,7 @@ impl SubscriptionService { Ok(()) } - fn spawn_respond_subscribers_loop(indexer: IndexerCore) -> SubscriptionLoopParts { + fn spawn_respond_subscribers_loop(mut indexer: IndexerCore) -> SubscriptionLoopParts { let (new_subscription_sender, mut sub_receiver) = tokio::sync::mpsc::unbounded_channel::>(); From 44a11bd3f1bc81e0cf11602b9fe706ce7701047a Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Wed, 4 Mar 2026 14:12:39 +0200 Subject: [PATCH 2/5] fix: state interactions fix --- Cargo.lock | 3 + indexer/core/Cargo.toml | 2 + indexer/core/src/block_store.rs | 140 +++++++++++++++++++++++++---- indexer/core/src/lib.rs | 4 +- indexer/service/src/service.rs | 1 + integration_tests/tests/indexer.rs | 38 +------- 6 files changed, 131 insertions(+), 57 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 33f810e0..8081f5c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -126,6 +126,7 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92589714878ca59a7626ea19734f0e07a6a875197eec751bb5d3f99e64998c63" dependencies = [ + "actix-macros", "futures-core", "tokio", ] @@ -3742,6 +3743,7 @@ checksum = "4ee796ad498c8d9a1d68e477df8f754ed784ef875de1414ebdaf169f70a6a784" name = "indexer_core" version = "0.1.0" dependencies = [ + "actix-rt", "anyhow", "async-stream", "bedrock_client", @@ -3756,6 +3758,7 @@ dependencies = [ "serde", "serde_json", "storage", + "tempfile", "tokio", "url", ] diff --git a/indexer/core/Cargo.toml b/indexer/core/Cargo.toml index 792fb4b7..3096249c 100644 --- a/indexer/core/Cargo.toml +++ b/indexer/core/Cargo.toml @@ -22,3 +22,5 @@ url.workspace = true logos-blockchain-core.workspace = true serde_json.workspace = true async-stream.workspace = true +tempfile.workspace = true +actix-rt.workspace = true diff --git a/indexer/core/src/block_store.rs b/indexer/core/src/block_store.rs index beed8e6d..1f3a8263 100644 --- a/indexer/core/src/block_store.rs +++ b/indexer/core/src/block_store.rs @@ -8,11 +8,12 @@ use common::{ }; use nssa::{Account, AccountId, V02State}; use storage::indexer::RocksDBIO; +use tokio::sync::RwLock; #[derive(Clone)] pub struct IndexerStore { dbio: Arc, - final_state: V02State, + final_state: Arc>, } impl IndexerStore { @@ -29,7 +30,7 @@ impl IndexerStore { Ok(Self { dbio: Arc::new(dbio), - final_state, + final_state: Arc::new(RwLock::new(final_state)), }) } @@ -98,30 +99,29 @@ impl IndexerStore { Ok(self.dbio.calculate_state_for_id(block_id)?) } - pub fn final_state(&self) -> &V02State { - &self.final_state - } - - pub fn final_state_mut(&mut self) -> &mut V02State { - &mut self.final_state - } - pub fn final_state_db(&self) -> Result { Ok(self.dbio.final_state()?) } - pub fn get_account_final(&self, account_id: &AccountId) -> Result { - Ok(self.final_state().get_account_by_id(*account_id)) + pub async fn get_account_final(&self, account_id: &AccountId) -> Result { + let account = { + let state_guard = self.final_state.read().await; + state_guard.get_account_by_id(*account_id) + }; + + Ok(account) } - pub fn put_block(&mut self, mut block: Block, l1_header: HeaderId) -> Result<()> { - let final_state = self.final_state_mut(); + pub async fn put_block(&mut self, mut block: Block, l1_header: HeaderId) -> Result<()> { + { + let mut state_guard = self.final_state.write().await; - for transaction in &block.body.transactions { - transaction - .clone() - .transaction_stateless_check()? - .execute_check_on_state(final_state)?; + for transaction in &block.body.transactions { + transaction + .clone() + .transaction_stateless_check()? + .execute_check_on_state(&mut state_guard)?; + } } // ToDo: Currently we are fetching only finalized blocks @@ -132,3 +132,105 @@ impl IndexerStore { Ok(self.dbio.put_block(block, l1_header.into())?) } } + +#[cfg(test)] +mod tests { + use nssa::AccountId; + use tempfile::tempdir; + + use super::*; + + fn genesis_block() -> Block { + common::test_utils::produce_dummy_block(1, None, vec![]) + } + + fn acc1() -> AccountId { + AccountId::new([ + 148, 179, 206, 253, 199, 51, 82, 86, 232, 2, 152, 122, 80, 243, 54, 207, 237, 112, 83, + 153, 44, 59, 204, 49, 128, 84, 160, 227, 216, 149, 97, 102, + ]) + } + + fn acc2() -> AccountId { + AccountId::new([ + 30, 145, 107, 3, 207, 73, 192, 230, 160, 63, 238, 207, 18, 69, 54, 216, 103, 244, 92, + 94, 124, 248, 42, 16, 141, 19, 119, 18, 14, 226, 140, 204, + ]) + } + + fn acc1_sign_key() -> nssa::PrivateKey { + nssa::PrivateKey::try_new([1; 32]).unwrap() + } + + fn acc2_sign_key() -> nssa::PrivateKey { + nssa::PrivateKey::try_new([2; 32]).unwrap() + } + + fn initial_state() -> V02State { + nssa::V02State::new_with_genesis_accounts(&[(acc1(), 10000), (acc2(), 20000)], &[]) + } + + fn transfer(amount: u128, nonce: u128, direction: bool) -> NSSATransaction { + let from; + let to; + let sign_key; + + if direction { + from = acc1(); + to = acc2(); + sign_key = acc1_sign_key(); + } else { + from = acc2(); + to = acc1(); + sign_key = acc2_sign_key(); + } + + common::test_utils::create_transaction_native_token_transfer( + from, nonce, to, amount, sign_key, + ) + } + + #[test] + fn test_correct_startup() { + let storage = IndexerStore::open_db_with_genesis( + tempdir().unwrap().as_ref(), + Some((genesis_block(), initial_state())), + ) + .unwrap(); + + let block = storage.get_block_at_id(1).unwrap(); + let final_id = storage.get_last_block_id().unwrap(); + + assert_eq!(block.header.hash, genesis_block().header.hash); + assert_eq!(final_id, 1); + } + + #[actix_rt::test] + async fn test_state_transition() { + let mut storage = IndexerStore::open_db_with_genesis( + tempdir().unwrap().as_ref(), + Some((genesis_block(), initial_state())), + ) + .unwrap(); + + let mut prev_hash = genesis_block().header.hash; + + for i in 2..10 { + let tx = transfer(10, i - 2, true); + let next_block = + common::test_utils::produce_dummy_block(i as u64, Some(prev_hash), vec![tx]); + prev_hash = next_block.header.hash; + + storage + .put_block(next_block, HeaderId::from([i as u8; 32])) + .await + .unwrap(); + } + + let acc1_val = storage.get_account_final(&acc1()).await.unwrap(); + let acc2_val = storage.get_account_final(&acc2()).await.unwrap(); + + assert_eq!(acc1_val.balance, 9920); + assert_eq!(acc2_val.balance, 20080); + } +} diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index 5037ae20..153a8556 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -129,7 +129,7 @@ impl IndexerCore { info!("Parsed {} L2 blocks with ids {:?}", l2_block_vec.len(), l2_blocks_parsed_ids); for l2_block in l2_block_vec { - self.store.put_block(l2_block.clone(), l1_header)?; + self.store.put_block(l2_block.clone(), l1_header).await?; yield Ok(l2_block); } @@ -163,7 +163,7 @@ impl IndexerCore { info!("Parsed {} L2 blocks with ids {:?}", l2_block_vec.len(), l2_blocks_parsed_ids); for l2_block in l2_block_vec { - self.store.put_block(l2_block.clone(), header)?; + self.store.put_block(l2_block.clone(), header).await?; yield Ok(l2_block); } diff --git a/indexer/service/src/service.rs b/indexer/service/src/service.rs index e02b6c3b..e6eb40f8 100644 --- a/indexer/service/src/service.rs +++ b/indexer/service/src/service.rs @@ -75,6 +75,7 @@ impl indexer_service_rpc::RpcServer for IndexerService { .indexer .store .get_account_final(&account_id.into()) + .await .map_err(db_error)? .into()) } diff --git a/integration_tests/tests/indexer.rs b/integration_tests/tests/indexer.rs index d5207a41..5a3e38a5 100644 --- a/integration_tests/tests/indexer.rs +++ b/integration_tests/tests/indexer.rs @@ -1,13 +1,9 @@ use std::time::Duration; -use anyhow::{Context, Result}; +use anyhow::Result; use indexer_service_rpc::RpcClient; -use integration_tests::{ - TIME_TO_WAIT_FOR_BLOCK_SECONDS, TestContext, format_private_account_id, - format_public_account_id, verify_commitment_is_in_state, -}; +use integration_tests::{TIME_TO_WAIT_FOR_BLOCK_SECONDS, TestContext, format_public_account_id}; use log::info; -use nssa::AccountId; use tokio::test; use wallet::cli::{Command, programs::native_token_transfer::AuthTransferSubcommand}; @@ -111,36 +107,6 @@ async fn indexer_state_consistency() -> Result<()> { assert_eq!(acc_1_balance.balance, 9900); assert_eq!(acc_2_balance.balance, 20100); - let from: AccountId = ctx.existing_private_accounts()[0]; - let to: AccountId = ctx.existing_private_accounts()[1]; - - let command = Command::AuthTransfer(AuthTransferSubcommand::Send { - from: format_private_account_id(from), - to: Some(format_private_account_id(to)), - to_npk: None, - to_vpk: None, - amount: 100, - }); - - wallet::cli::execute_subcommand(ctx.wallet_mut(), command).await?; - - info!("Waiting for next block creation"); - tokio::time::sleep(Duration::from_secs(TIME_TO_WAIT_FOR_BLOCK_SECONDS)).await; - - let new_commitment1 = ctx - .wallet() - .get_private_account_commitment(from) - .context("Failed to get private account commitment for sender")?; - assert!(verify_commitment_is_in_state(new_commitment1, ctx.sequencer_client()).await); - - let new_commitment2 = ctx - .wallet() - .get_private_account_commitment(to) - .context("Failed to get private account commitment for receiver")?; - assert!(verify_commitment_is_in_state(new_commitment2, ctx.sequencer_client()).await); - - info!("Successfully transferred privately to owned account"); - // WAIT info!("Waiting for indexer to parse blocks"); tokio::time::sleep(std::time::Duration::from_millis(L2_TO_L1_TIMEOUT_MILLIS)).await; From 8a8c7f722bbf1b5ba47748994992a87a2fbda0bf Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Tue, 17 Mar 2026 15:10:12 +0200 Subject: [PATCH 3/5] fix: suggestions fix --- Cargo.lock | 2 - indexer/core/Cargo.toml | 4 +- indexer/core/src/block_store.rs | 98 +++++++++++++++------------------ indexer/service/src/service.rs | 2 +- 4 files changed, 48 insertions(+), 58 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8081f5c4..524d07d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -126,7 +126,6 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92589714878ca59a7626ea19734f0e07a6a875197eec751bb5d3f99e64998c63" dependencies = [ - "actix-macros", "futures-core", "tokio", ] @@ -3743,7 +3742,6 @@ checksum = "4ee796ad498c8d9a1d68e477df8f754ed784ef875de1414ebdaf169f70a6a784" name = "indexer_core" version = "0.1.0" dependencies = [ - "actix-rt", "anyhow", "async-stream", "bedrock_client", diff --git a/indexer/core/Cargo.toml b/indexer/core/Cargo.toml index 3096249c..a7acc0bd 100644 --- a/indexer/core/Cargo.toml +++ b/indexer/core/Cargo.toml @@ -22,5 +22,7 @@ url.workspace = true logos-blockchain-core.workspace = true serde_json.workspace = true async-stream.workspace = true + +[dev-dependencies] tempfile.workspace = true -actix-rt.workspace = true + diff --git a/indexer/core/src/block_store.rs b/indexer/core/src/block_store.rs index 1f3a8263..a06180c3 100644 --- a/indexer/core/src/block_store.rs +++ b/indexer/core/src/block_store.rs @@ -13,7 +13,7 @@ use tokio::sync::RwLock; #[derive(Clone)] pub struct IndexerStore { dbio: Arc, - final_state: Arc>, + current_state: Arc>, } impl IndexerStore { @@ -26,11 +26,11 @@ impl IndexerStore { start_data: Option<(Block, V02State)>, ) -> Result { let dbio = RocksDBIO::open_or_create(location, start_data)?; - let final_state = dbio.final_state()?; + let current_state = dbio.final_state()?; Ok(Self { dbio: Arc::new(dbio), - final_state: Arc::new(RwLock::new(final_state)), + current_state: Arc::new(RwLock::new(current_state)), }) } @@ -103,18 +103,17 @@ impl IndexerStore { Ok(self.dbio.final_state()?) } - pub async fn get_account_final(&self, account_id: &AccountId) -> Result { - let account = { - let state_guard = self.final_state.read().await; - state_guard.get_account_by_id(*account_id) - }; - - Ok(account) + pub async fn account_current_state(&self, account_id: &AccountId) -> Result { + Ok(self + .current_state + .read() + .await + .get_account_by_id(*account_id)) } pub async fn put_block(&mut self, mut block: Block, l1_header: HeaderId) -> Result<()> { { - let mut state_guard = self.final_state.write().await; + let mut state_guard = self.current_state.write().await; for transaction in &block.body.transactions { transaction @@ -135,7 +134,7 @@ impl IndexerStore { #[cfg(test)] mod tests { - use nssa::AccountId; + use nssa::{AccountId, PublicKey}; use tempfile::tempdir; use super::*; @@ -144,20 +143,6 @@ mod tests { common::test_utils::produce_dummy_block(1, None, vec![]) } - fn acc1() -> AccountId { - AccountId::new([ - 148, 179, 206, 253, 199, 51, 82, 86, 232, 2, 152, 122, 80, 243, 54, 207, 237, 112, 83, - 153, 44, 59, 204, 49, 128, 84, 160, 227, 216, 149, 97, 102, - ]) - } - - fn acc2() -> AccountId { - AccountId::new([ - 30, 145, 107, 3, 207, 73, 192, 230, 160, 63, 238, 207, 18, 69, 54, 216, 103, 244, 92, - 94, 124, 248, 42, 16, 141, 19, 119, 18, 14, 226, 140, 204, - ]) - } - fn acc1_sign_key() -> nssa::PrivateKey { nssa::PrivateKey::try_new([1; 32]).unwrap() } @@ -166,35 +151,24 @@ mod tests { nssa::PrivateKey::try_new([2; 32]).unwrap() } - fn initial_state() -> V02State { - nssa::V02State::new_with_genesis_accounts(&[(acc1(), 10000), (acc2(), 20000)], &[]) + fn acc1() -> AccountId { + AccountId::from(&PublicKey::new_from_private_key(&acc1_sign_key())) } - fn transfer(amount: u128, nonce: u128, direction: bool) -> NSSATransaction { - let from; - let to; - let sign_key; - - if direction { - from = acc1(); - to = acc2(); - sign_key = acc1_sign_key(); - } else { - from = acc2(); - to = acc1(); - sign_key = acc2_sign_key(); - } - - common::test_utils::create_transaction_native_token_transfer( - from, nonce, to, amount, sign_key, - ) + fn acc2() -> AccountId { + AccountId::from(&PublicKey::new_from_private_key(&acc2_sign_key())) } #[test] fn test_correct_startup() { + let home = tempdir().unwrap(); + let storage = IndexerStore::open_db_with_genesis( - tempdir().unwrap().as_ref(), - Some((genesis_block(), initial_state())), + home.as_ref(), + Some(( + genesis_block(), + nssa::V02State::new_with_genesis_accounts(&[(acc1(), 10000), (acc2(), 20000)], &[]), + )), ) .unwrap(); @@ -205,18 +179,34 @@ mod tests { assert_eq!(final_id, 1); } - #[actix_rt::test] + #[tokio::test] async fn test_state_transition() { + let home = tempdir().unwrap(); + let mut storage = IndexerStore::open_db_with_genesis( - tempdir().unwrap().as_ref(), - Some((genesis_block(), initial_state())), + home.as_ref(), + Some(( + genesis_block(), + nssa::V02State::new_with_genesis_accounts(&[(acc1(), 10000), (acc2(), 20000)], &[]), + )), ) .unwrap(); let mut prev_hash = genesis_block().header.hash; + let from = acc1(); + let to = acc2(); + let sign_key = acc1_sign_key(); + for i in 2..10 { - let tx = transfer(10, i - 2, true); + let tx = common::test_utils::create_transaction_native_token_transfer( + from, + i - 2, + to, + 10, + sign_key.clone(), + ); + let next_block = common::test_utils::produce_dummy_block(i as u64, Some(prev_hash), vec![tx]); prev_hash = next_block.header.hash; @@ -227,8 +217,8 @@ mod tests { .unwrap(); } - let acc1_val = storage.get_account_final(&acc1()).await.unwrap(); - let acc2_val = storage.get_account_final(&acc2()).await.unwrap(); + let acc1_val = storage.account_current_state(&acc1()).await.unwrap(); + let acc2_val = storage.account_current_state(&acc2()).await.unwrap(); assert_eq!(acc1_val.balance, 9920); assert_eq!(acc2_val.balance, 20080); diff --git a/indexer/service/src/service.rs b/indexer/service/src/service.rs index e6eb40f8..11800160 100644 --- a/indexer/service/src/service.rs +++ b/indexer/service/src/service.rs @@ -74,7 +74,7 @@ impl indexer_service_rpc::RpcServer for IndexerService { Ok(self .indexer .store - .get_account_final(&account_id.into()) + .account_current_state(&account_id.into()) .await .map_err(db_error)? .into()) From 2fb8c2c87f11978c9123d627b133153cbaa08969 Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Tue, 17 Mar 2026 15:18:12 +0200 Subject: [PATCH 4/5] fix: suggestions fix 2 --- indexer/core/src/block_store.rs | 5 ++++- indexer/service/src/service.rs | 6 +++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/indexer/core/src/block_store.rs b/indexer/core/src/block_store.rs index a06180c3..46cf4f59 100644 --- a/indexer/core/src/block_store.rs +++ b/indexer/core/src/block_store.rs @@ -99,7 +99,10 @@ impl IndexerStore { Ok(self.dbio.calculate_state_for_id(block_id)?) } - pub fn final_state_db(&self) -> Result { + /// Recalculation of final state directly from DB + /// + /// Used for indexer healthcheck + pub fn recalculate_final_state(&self) -> Result { Ok(self.dbio.final_state()?) } diff --git a/indexer/service/src/service.rs b/indexer/service/src/service.rs index 11800160..17b7172e 100644 --- a/indexer/service/src/service.rs +++ b/indexer/service/src/service.rs @@ -128,7 +128,11 @@ impl indexer_service_rpc::RpcServer for IndexerService { async fn healthcheck(&self) -> Result<(), ErrorObjectOwned> { // Checking, that indexer can calculate last state - let _ = self.indexer.store.final_state_db().map_err(db_error)?; + let _ = self + .indexer + .store + .recalculate_final_state() + .map_err(db_error)?; Ok(()) } From de3d17c7f8bc521c0f78c41cbed287b37981db3b Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Wed, 18 Mar 2026 09:16:36 +0200 Subject: [PATCH 5/5] fix: fmt --- indexer/core/src/block_store.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/indexer/core/src/block_store.rs b/indexer/core/src/block_store.rs index 82a836a0..db2f855b 100644 --- a/indexer/core/src/block_store.rs +++ b/indexer/core/src/block_store.rs @@ -204,8 +204,11 @@ mod tests { &sign_key, ); - let next_block = - common::test_utils::produce_dummy_block(u64::try_from(i).unwrap(), Some(prev_hash), vec![tx]); + let next_block = common::test_utils::produce_dummy_block( + u64::try_from(i).unwrap(), + Some(prev_hash), + vec![tx], + ); prev_hash = next_block.header.hash; storage