Merge pull request #374 from logos-blockchain/Pravdyvy/indexer-final-state

Final state caching
This commit is contained in:
Pravdyvy 2026-03-18 09:49:28 +02:00 committed by GitHub
commit 821cb891d5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 132 additions and 52 deletions

1
Cargo.lock generated
View File

@ -3714,6 +3714,7 @@ dependencies = [
"serde",
"serde_json",
"storage",
"tempfile",
"tokio",
"url",
]

View File

@ -25,3 +25,7 @@ url.workspace = true
logos-blockchain-core.workspace = true
serde_json.workspace = true
async-stream.workspace = true
[dev-dependencies]
tempfile.workspace = true

View File

@ -8,10 +8,12 @@ use common::{
};
use nssa::{Account, AccountId, V02State};
use storage::indexer::RocksDBIO;
use tokio::sync::RwLock;
#[derive(Clone)]
pub struct IndexerStore {
dbio: Arc<RocksDBIO>,
current_state: Arc<RwLock<V02State>>,
}
impl IndexerStore {
@ -25,9 +27,11 @@ impl IndexerStore {
initial_state: &V02State,
) -> Result<Self> {
let dbio = RocksDBIO::open_or_create(location, genesis_block, initial_state)?;
let current_state = dbio.final_state()?;
Ok(Self {
dbio: Arc::new(dbio),
current_state: Arc::new(RwLock::new(current_state)),
})
}
@ -93,22 +97,31 @@ impl IndexerStore {
Ok(self.dbio.calculate_state_for_id(block_id)?)
}
pub fn final_state(&self) -> Result<V02State> {
/// Recalculation of final state directly from DB.
///
/// Used for indexer healthcheck.
pub fn recalculate_final_state(&self) -> Result<V02State> {
Ok(self.dbio.final_state()?)
}
pub fn get_account_final(&self, account_id: &AccountId) -> Result<Account> {
Ok(self.final_state()?.get_account_by_id(*account_id))
pub async fn account_current_state(&self, account_id: &AccountId) -> Result<Account> {
Ok(self
.current_state
.read()
.await
.get_account_by_id(*account_id))
}
pub fn put_block(&self, mut block: Block, l1_header: HeaderId) -> Result<()> {
let mut final_state = self.dbio.final_state()?;
pub async fn put_block(&self, mut block: Block, l1_header: HeaderId) -> Result<()> {
{
let mut state_guard = self.current_state.write().await;
for transaction in &block.body.transactions {
transaction
.clone()
.transaction_stateless_check()?
.execute_check_on_state(&mut final_state)?;
for transaction in &block.body.transactions {
transaction
.clone()
.transaction_stateless_check()?
.execute_check_on_state(&mut state_guard)?;
}
}
// ToDo: Currently we are fetching only finalized blocks
@ -119,3 +132,95 @@ impl IndexerStore {
Ok(self.dbio.put_block(&block, l1_header.into())?)
}
}
#[cfg(test)]
mod tests {
use nssa::{AccountId, PublicKey};
use tempfile::tempdir;
use super::*;
fn genesis_block() -> Block {
common::test_utils::produce_dummy_block(1, None, vec![])
}
fn acc1_sign_key() -> nssa::PrivateKey {
nssa::PrivateKey::try_new([1; 32]).unwrap()
}
fn acc2_sign_key() -> nssa::PrivateKey {
nssa::PrivateKey::try_new([2; 32]).unwrap()
}
fn acc1() -> AccountId {
AccountId::from(&PublicKey::new_from_private_key(&acc1_sign_key()))
}
fn acc2() -> AccountId {
AccountId::from(&PublicKey::new_from_private_key(&acc2_sign_key()))
}
#[test]
fn correct_startup() {
let home = tempdir().unwrap();
let storage = IndexerStore::open_db_with_genesis(
home.as_ref(),
&genesis_block(),
&nssa::V02State::new_with_genesis_accounts(&[(acc1(), 10000), (acc2(), 20000)], &[]),
)
.unwrap();
let block = storage.get_block_at_id(1).unwrap();
let final_id = storage.get_last_block_id().unwrap();
assert_eq!(block.header.hash, genesis_block().header.hash);
assert_eq!(final_id, 1);
}
#[tokio::test]
async fn state_transition() {
let home = tempdir().unwrap();
let storage = IndexerStore::open_db_with_genesis(
home.as_ref(),
&genesis_block(),
&nssa::V02State::new_with_genesis_accounts(&[(acc1(), 10000), (acc2(), 20000)], &[]),
)
.unwrap();
let mut prev_hash = genesis_block().header.hash;
let from = acc1();
let to = acc2();
let sign_key = acc1_sign_key();
for i in 2..10 {
let tx = common::test_utils::create_transaction_native_token_transfer(
from,
i - 2,
to,
10,
&sign_key,
);
let next_block = common::test_utils::produce_dummy_block(
u64::try_from(i).unwrap(),
Some(prev_hash),
vec![tx],
);
prev_hash = next_block.header.hash;
storage
.put_block(next_block, HeaderId::from([u8::try_from(i).unwrap(); 32]))
.await
.unwrap();
}
let acc1_val = storage.account_current_state(&acc1()).await.unwrap();
let acc2_val = storage.account_current_state(&acc2()).await.unwrap();
assert_eq!(acc1_val.balance, 9920);
assert_eq!(acc2_val.balance, 20080);
}
}

View File

@ -124,8 +124,8 @@ impl IndexerCore {
l2_blocks_parsed_ids.sort_unstable();
info!("Parsed {} L2 blocks with ids {:?}", l2_block_vec.len(), l2_blocks_parsed_ids);
for l2_block in l2_block_vec {
self.store.put_block(l2_block.clone(), l1_header)?;
for l2_block in l2_block_vec {
self.store.put_block(l2_block.clone(), l1_header).await?;
yield Ok(l2_block);
}
@ -158,7 +158,7 @@ impl IndexerCore {
info!("Parsed {} L2 blocks with ids {:?}", l2_block_vec.len(), l2_blocks_parsed_ids);
for l2_block in l2_block_vec {
self.store.put_block(l2_block.clone(), header)?;
self.store.put_block(l2_block.clone(), header).await?;
yield Ok(l2_block);
}

View File

@ -74,7 +74,8 @@ impl indexer_service_rpc::RpcServer for IndexerService {
Ok(self
.indexer
.store
.get_account_final(&account_id.into())
.account_current_state(&account_id.into())
.await
.map_err(db_error)?
.into())
}
@ -131,7 +132,11 @@ impl indexer_service_rpc::RpcServer for IndexerService {
async fn healthcheck(&self) -> Result<(), ErrorObjectOwned> {
// Checking, that indexer can calculate last state
let _ = self.indexer.store.final_state().map_err(db_error)?;
let _ = self
.indexer
.store
.recalculate_final_state()
.map_err(db_error)?;
Ok(())
}

View File

@ -1,19 +1,14 @@
#![expect(
clippy::shadow_unrelated,
clippy::tests_outside_test_module,
reason = "We don't care about these in tests"
)]
use std::time::Duration;
use anyhow::{Context as _, Result};
use anyhow::Result;
use indexer_service_rpc::RpcClient as _;
use integration_tests::{
TIME_TO_WAIT_FOR_BLOCK_SECONDS, TestContext, format_private_account_id,
format_public_account_id, verify_commitment_is_in_state,
};
use integration_tests::{TIME_TO_WAIT_FOR_BLOCK_SECONDS, TestContext, format_public_account_id};
use log::info;
use nssa::AccountId;
use tokio::test;
use wallet::cli::{Command, programs::native_token_transfer::AuthTransferSubcommand};
@ -120,36 +115,6 @@ async fn indexer_state_consistency() -> Result<()> {
assert_eq!(acc_1_balance.balance, 9900);
assert_eq!(acc_2_balance.balance, 20100);
let from: AccountId = ctx.existing_private_accounts()[0];
let to: AccountId = ctx.existing_private_accounts()[1];
let command = Command::AuthTransfer(AuthTransferSubcommand::Send {
from: format_private_account_id(from),
to: Some(format_private_account_id(to)),
to_npk: None,
to_vpk: None,
amount: 100,
});
wallet::cli::execute_subcommand(ctx.wallet_mut(), command).await?;
info!("Waiting for next block creation");
tokio::time::sleep(Duration::from_secs(TIME_TO_WAIT_FOR_BLOCK_SECONDS)).await;
let new_commitment1 = ctx
.wallet()
.get_private_account_commitment(from)
.context("Failed to get private account commitment for sender")?;
assert!(verify_commitment_is_in_state(new_commitment1, ctx.sequencer_client()).await);
let new_commitment2 = ctx
.wallet()
.get_private_account_commitment(to)
.context("Failed to get private account commitment for receiver")?;
assert!(verify_commitment_is_in_state(new_commitment2, ctx.sequencer_client()).await);
info!("Successfully transferred privately to owned account");
// WAIT
info!("Waiting for indexer to parse blocks");
tokio::time::sleep(std::time::Duration::from_millis(L2_TO_L1_TIMEOUT_MILLIS)).await;