diff --git a/lez/indexer/core/src/block_store.rs b/lez/indexer/core/src/block_store.rs index b49c4bc9..3d0c83dc 100644 --- a/lez/indexer/core/src/block_store.rs +++ b/lez/indexer/core/src/block_store.rs @@ -8,7 +8,6 @@ use common::{ }; use lee::{Account, AccountId, GENESIS_BLOCK_ID, V03State}; use lee_core::BlockId; -use log::info; use logos_blockchain_core::header::HeaderId; use logos_blockchain_zone_sdk::Slot; use storage::indexer::RocksDBIO; @@ -287,68 +286,6 @@ impl IndexerStore { self.set_stall_reason(&None)?; Ok(AcceptOutcome::Applied) } - - pub async fn put_block(&self, mut block: Block, l1_header: HeaderId) -> Result<()> { - info!("Applying block {}", block.header.block_id); - { - let mut state_guard = self.current_state.write().await; - - let (clock_tx, user_txs) = block - .body - .transactions - .split_last() - .ok_or_else(|| anyhow::anyhow!("Block has no transactions"))?; - - anyhow::ensure!( - *clock_tx == LeeTransaction::Public(clock_invocation(block.header.timestamp)), - "Last transaction in block must be the clock invocation for the block timestamp" - ); - - let is_genesis = block.header.block_id == 1; - for transaction in user_txs { - if is_genesis { - let genesis_tx = match transaction { - LeeTransaction::Public(public_tx) => public_tx, - LeeTransaction::PrivacyPreserving(_) - | LeeTransaction::ProgramDeployment(_) => { - anyhow::bail!("Genesis block should contain only public transactions") - } - }; - state_guard - .transition_from_public_transaction( - genesis_tx, - block.header.block_id, - block.header.timestamp, - ) - .context("Failed to execute genesis public transaction")?; - } else { - transaction.clone().execute_on_state( - &mut state_guard, - block.header.block_id, - block.header.timestamp, - )?; - } - } - - // Apply the clock invocation directly (it is expected to modify clock accounts). - let LeeTransaction::Public(clock_public_tx) = clock_tx else { - anyhow::bail!("Clock invocation must be a public transaction"); - }; - state_guard.transition_from_public_transaction( - clock_public_tx, - block.header.block_id, - block.header.timestamp, - )?; - } - - // ToDo: Currently we are fetching only finalized blocks - // if it changes, the following lines need to be updated - // to represent correct block finality - block.bedrock_status = BedrockStatus::Finalized; - - info!("Putting block {} into DB", block.header.block_id); - Ok(self.dbio.put_block(&block, l1_header.into())?) - } } /// Applies a block's transactions to `state`, mapping every failure to a @@ -443,7 +380,7 @@ mod stall_reason_tests { #[cfg(test)] mod tests { - use common::{HashType, block::HashableBlockData}; + use common::test_utils::{create_transaction_native_token_transfer, produce_dummy_block}; use tempfile::tempdir; use testnet_initial_state::initial_pub_accounts_private_keys; @@ -461,105 +398,99 @@ mod tests { } #[tokio::test] - async fn state_transition() { + async fn accept_block_applies_transfers_and_advances_tip() { let home = tempdir().unwrap(); - - let storage = IndexerStore::open_db(home.as_ref()).unwrap(); + let store = IndexerStore::open_db(home.as_ref()).unwrap(); let initial_accounts = initial_pub_accounts_private_keys(); let from = initial_accounts[0].account_id; let to = initial_accounts[1].account_id; let sign_key = initial_accounts[0].pub_sign_key.clone(); - // Submit genesis block - let clock_tx = LeeTransaction::Public(clock_invocation(0)); - let genesis_block_data = HashableBlockData { - block_id: 1, - prev_block_hash: HashType::default(), - timestamp: 0, - transactions: vec![clock_tx], - }; - let genesis_block = genesis_block_data - .into_pending_block(&common::test_utils::sequencer_sign_key_for_testing()); - let mut prev_hash = Some(genesis_block.header.hash); - storage - .put_block(genesis_block, HeaderId::from([0_u8; 32])) - .await - .unwrap(); - - for i in 0..10 { - let tx = common::test_utils::create_transaction_native_token_transfer( - from, i, to, 10, &sign_key, - ); - let block_id = u64::try_from(i + 1).unwrap(); - - let next_block = common::test_utils::produce_dummy_block(block_id, prev_hash, vec![tx]); - prev_hash = Some(next_block.header.hash); - - storage - .put_block( - next_block, - HeaderId::from([u8::try_from(i + 1).unwrap(); 32]), - ) + // Genesis (block 1): clock-only. + let genesis = produce_dummy_block(1, None, vec![]); + let mut prev_hash = genesis.header.hash; + assert!(matches!( + store + .accept_block(&genesis, serde_json::Value::Null) .await - .unwrap(); + .unwrap(), + AcceptOutcome::Applied + )); + + // Blocks 2..=11: one native transfer of 10 each (nonces 0..=9). + for i in 0..10_u64 { + let tx = create_transaction_native_token_transfer(from, i as u128, to, 10, &sign_key); + let block = produce_dummy_block(i + 2, Some(prev_hash), vec![tx]); + prev_hash = block.header.hash; + assert!(matches!( + store + .accept_block(&block, serde_json::Value::Null) + .await + .unwrap(), + AcceptOutcome::Applied + )); } - let acc1_val = storage.account_current_state(&from).await.unwrap(); - let acc2_val = storage.account_current_state(&to).await.unwrap(); - - assert_eq!(acc1_val.balance, 9900); - assert_eq!(acc2_val.balance, 20100); + assert_eq!( + store.account_current_state(&from).await.unwrap().balance, + 9900 + ); + assert_eq!( + store.account_current_state(&to).await.unwrap().balance, + 20100 + ); + // Tip advanced to the last applied block; a clean run leaves no stall. + assert_eq!(store.get_last_block_id().unwrap(), Some(11)); + assert!(store.get_stall_reason().unwrap().is_none()); } #[tokio::test] - async fn account_state_at_block() { + async fn account_state_at_block_reflects_history() { let home = tempdir().unwrap(); - - let storage = IndexerStore::open_db(home.as_ref()).unwrap(); - - let mut prev_hash = None; + let store = IndexerStore::open_db(home.as_ref()).unwrap(); let initial_accounts = initial_pub_accounts_private_keys(); let from = initial_accounts[0].account_id; let to = initial_accounts[1].account_id; let sign_key = initial_accounts[0].pub_sign_key.clone(); - for i in 0..10 { - let tx = common::test_utils::create_transaction_native_token_transfer( - from, i, to, 10, &sign_key, - ); - let block_id = u64::try_from(i + 1).unwrap(); + let genesis = produce_dummy_block(1, None, vec![]); + let mut prev_hash = genesis.header.hash; + store + .accept_block(&genesis, serde_json::Value::Null) + .await + .unwrap(); - let next_block = common::test_utils::produce_dummy_block(block_id, prev_hash, vec![tx]); - prev_hash = Some(next_block.header.hash); - - storage - .put_block( - next_block, - HeaderId::from([u8::try_from(i + 1).unwrap(); 32]), - ) + for i in 0..10_u64 { + let tx = create_transaction_native_token_transfer(from, i as u128, to, 10, &sign_key); + let block = produce_dummy_block(i + 2, Some(prev_hash), vec![tx]); + prev_hash = block.header.hash; + store + .accept_block(&block, serde_json::Value::Null) .await .unwrap(); } - // Genesis block: no transfers applied yet. - let acc1_at_1 = storage.account_state_at_block(&from, 1).unwrap(); - let acc2_at_1 = storage.account_state_at_block(&to, 1).unwrap(); - assert_eq!(acc1_at_1.balance, 9990); - assert_eq!(acc2_at_1.balance, 20010); - - // After block 5: 4 transfers of 10 applied (one each in blocks 2..=5). - let acc1_at_5 = storage.account_state_at_block(&from, 5).unwrap(); - let acc2_at_5 = storage.account_state_at_block(&to, 5).unwrap(); - assert_eq!(acc1_at_5.balance, 9950); - assert_eq!(acc2_at_5.balance, 20050); - - // After final block 9: 8 transfers applied; should match current state. - let acc1_at_9 = storage.account_state_at_block(&from, 9).unwrap(); - let acc2_at_9 = storage.account_state_at_block(&to, 9).unwrap(); - assert_eq!(acc1_at_9.balance, 9910); - assert_eq!(acc2_at_9.balance, 20090); + // State at block N is inclusive of block N. + // Block 1 (genesis, clock-only): no transfers yet. + assert_eq!( + store.account_state_at_block(&from, 1).unwrap().balance, + 10000 + ); + assert_eq!(store.account_state_at_block(&to, 1).unwrap().balance, 20000); + // Through block 5: 4 transfers applied (blocks 2..=5). + assert_eq!( + store.account_state_at_block(&from, 5).unwrap().balance, + 9960 + ); + assert_eq!(store.account_state_at_block(&to, 5).unwrap().balance, 20040); + // Through block 9: 8 transfers applied (blocks 2..=9). + assert_eq!( + store.account_state_at_block(&from, 9).unwrap().balance, + 9920 + ); + assert_eq!(store.account_state_at_block(&to, 9).unwrap().balance, 20080); } } diff --git a/lez/indexer/core/src/lib.rs b/lez/indexer/core/src/lib.rs index 88f9feae..669b389c 100644 --- a/lez/indexer/core/src/lib.rs +++ b/lez/indexer/core/src/lib.rs @@ -7,14 +7,13 @@ use common::block::Block; use futures::StreamExt as _; pub use ingest_error::BlockIngestError; use log::{error, info, warn}; -use logos_blockchain_core::header::HeaderId; use logos_blockchain_zone_sdk::{ CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer, }; pub use stall_reason::StallReason; use crate::{ - block_store::IndexerStore, + block_store::{AcceptOutcome, IndexerStore}, config::IndexerConfig, status::{IndexerStatus, IndexerSyncStatus}, }; @@ -95,8 +94,6 @@ impl IndexerCore { let stream = match self.zone_indexer.next_messages(cursor).await { Ok(s) => s, Err(err) => { - // `next_messages` reads L1 consensus info internally, so - // this also covers an unreachable/misconfigured L1 node. error!("Failed to start zone-sdk next_messages stream: {err}"); self.set_status(IndexerSyncStatus::error(format!( "cannot reach L1 / read channel: {err}" @@ -107,11 +104,8 @@ impl IndexerCore { }; let mut stream = std::pin::pin!(stream); - // Flip to Syncing on the first message of this cycle (not merely on - // a successful poll) so the steady-state CaughtUp status doesn't - // flicker. Until then the state stays Starting (cold-start scan of - // empty L1 history) or CaughtUp (idle). let mut announced_syncing = false; + let mut had_cycle_error = false; while let Some((msg, slot)) = stream.next().await { if !announced_syncing { @@ -121,17 +115,25 @@ impl IndexerCore { let zone_block = match msg { ZoneMessage::Block(b) => b, - // Non-block messages don't carry a cursor position; the - // next ZoneBlock advances past them implicitly. ZoneMessage::Deposit(_) | ZoneMessage::Withdraw(_) => continue, }; + let l1_slot = serde_json::to_value(&slot).unwrap_or(serde_json::Value::Null); + let block: Block = match borsh::from_slice(&zone_block.data) { Ok(b) => b, Err(e) => { error!("Failed to deserialize L2 block from zone-sdk: {e}"); - // Advance past the broken inscription so we don't - // re-process it on restart. + if let Err(err) = + self.store.record_deserialize_stall(l1_slot, e.to_string()) + { + warn!("Failed to record stall reason: {err:#}"); + } + self.set_status(IndexerSyncStatus::stalled(format!( + "failed to deserialize L2 block: {e}" + ))); + // Advance the L1 read cursor past the broken inscription; + // the validated tip stays frozen. cursor = Some(slot); if let Err(err) = self.store.set_zone_cursor(&slot) { warn!("Failed to persist indexer cursor: {err:#}"); @@ -140,27 +142,53 @@ impl IndexerCore { } }; - info!("Indexed L2 block {}", block.header.block_id); - - // TODO: Remove l1_header placeholder once storage layer - // no longer requires it. Zone-sdk handles L1 tracking internally. - let placeholder_l1_header = HeaderId::from([0_u8; 32]); - if let Err(err) = self.store.put_block(block.clone(), placeholder_l1_header).await { - error!("Failed to store block {}: {err:#}", block.header.block_id); + match self.store.accept_block(&block, l1_slot).await { + Ok(AcceptOutcome::Applied) => { + info!("Indexed L2 block {}", block.header.block_id); + self.set_status(IndexerSyncStatus::syncing()); + cursor = Some(slot); + if let Err(err) = self.store.set_zone_cursor(&slot) { + warn!("Failed to persist indexer cursor: {err:#}"); + } + yield Ok(block); + } + Ok(AcceptOutcome::Parked(ingest_err)) => { + error!( + "Parked at block {}: {ingest_err}", + block.header.block_id + ); + self.set_status(IndexerSyncStatus::stalled(ingest_err.to_string())); + // Advance the L1 read cursor; tip stays frozen, no yield. + cursor = Some(slot); + if let Err(err) = self.store.set_zone_cursor(&slot) { + warn!("Failed to persist indexer cursor: {err:#}"); + } + } + Err(err) => { + // Infrastructure error (DB read/write), not a bad block. + // Keep the cursor put; re-poll the same position next cycle. + error!( + "Store error applying block {}: {err:#}", + block.header.block_id + ); + self.set_status(IndexerSyncStatus::error(format!( + "store error: {err:#}" + ))); + had_cycle_error = true; + break; + } } - - cursor = Some(slot); - if let Err(err) = self.store.set_zone_cursor(&slot) { - warn!("Failed to persist indexer cursor: {err:#}"); - } - yield Ok(block); } - // Stream drained: caught up to LIB as of this cycle. Clears any - // prior error (e.g. a transient L1 disconnect that left no - // backlog, so the `Syncing` branch above never ran). Sleep then - // poll again. - self.set_status(IndexerSyncStatus::caught_up()); + if had_cycle_error { + tokio::time::sleep(poll_interval).await; + continue; + } + + // Stream drained. Stay Stalled if parked; otherwise we are caught up. + if self.store.get_stall_reason().ok().flatten().is_none() { + self.set_status(IndexerSyncStatus::caught_up()); + } tokio::time::sleep(poll_interval).await; } }