diff --git a/lez/indexer/core/src/block_store.rs b/lez/indexer/core/src/block_store.rs index a101bf94..6fe3b229 100644 --- a/lez/indexer/core/src/block_store.rs +++ b/lez/indexer/core/src/block_store.rs @@ -2,10 +2,11 @@ use std::{path::Path, sync::Arc}; use anyhow::{Context as _, Result}; use common::{ - block::{BedrockStatus, Block}, + HashType, + block::{BedrockStatus, Block, BlockHeader}, transaction::{LeeTransaction, clock_invocation}, }; -use lee::{Account, AccountId, V03State}; +use lee::{Account, AccountId, GENESIS_BLOCK_ID, V03State}; use lee_core::BlockId; use log::info; use logos_blockchain_core::header::HeaderId; @@ -13,7 +14,20 @@ use logos_blockchain_zone_sdk::Slot; use storage::indexer::RocksDBIO; use tokio::sync::RwLock; -use crate::chain_breaker::ChainBreaker; +use crate::{chain_breaker::ChainBreaker, ingest_error::BlockIngestError}; + +struct Tip { + block_id: u64, + hash: HashType, +} + +/// Outcome of feeding a parsed L2 block to the validated tip. +pub enum AcceptOutcome { + /// Chained and applied; tip and L1 read cursor both advance. + Applied, + /// Did not chain or failed to apply; tip stays frozen, breaker recorded. + Parked(BlockIngestError), +} #[derive(Clone)] pub struct IndexerStore { @@ -151,6 +165,129 @@ impl IndexerStore { .get_account_by_id(*account_id)) } + /// The last successfully applied block as `{block_id, hash}`, or `None` before + /// any block is stored (cold start). Read fresh from the store each call. + fn validated_tip(&self) -> Result> { + let Some(block_id) = self.dbio.get_meta_last_block_id_in_db()? else { + return Ok(None); + }; + let Some(block) = self.dbio.get_block(block_id)? else { + return Ok(None); + }; + Ok(Some(Tip { + block_id, + hash: block.header.hash, + })) + } + + /// Returns `Some(err)` if `block` is not the valid continuation of the tip: + /// hash integrity, then block-id continuity, then `prev_block_hash` linkage. + fn acceptance_error(&self, block: &Block) -> Result> { + let computed = block.recompute_hash(); + if computed != block.header.hash { + return Ok(Some(BlockIngestError::HashMismatch { + computed, + header: block.header.hash, + })); + } + + match self.validated_tip()? { + None => { + if block.header.block_id != GENESIS_BLOCK_ID { + return Ok(Some(BlockIngestError::UnexpectedBlockId { + expected: GENESIS_BLOCK_ID, + got: block.header.block_id, + })); + } + } + Some(tip) => { + let expected = tip.block_id.checked_add(1).expect("block id overflow"); + if block.header.block_id != expected { + return Ok(Some(BlockIngestError::UnexpectedBlockId { + expected, + got: block.header.block_id, + })); + } + if block.header.prev_block_hash != tip.hash { + return Ok(Some(BlockIngestError::BrokenChainLink { + expected_prev: tip.hash, + got_prev: block.header.prev_block_hash, + })); + } + } + } + Ok(None) + } + + /// Records the chain breaker: the first break is stored verbatim; subsequent + /// breaks only bump `orphans_since`, preserving the original cause. + fn record_break( + &self, + header: Option<&BlockHeader>, + l1_slot: serde_json::Value, + error: BlockIngestError, + ) -> Result<()> { + let breaker = match self.get_chain_breaker()? { + Some(mut existing) => { + existing.orphans_since = existing.orphans_since.saturating_add(1); + existing + } + None => ChainBreaker { + block_id: header.map(|h| h.block_id), + block_hash: header.map(|h| h.hash), + prev_block_hash: header.map(|h| h.prev_block_hash), + first_seen: header.map(|h| h.timestamp), + l1_slot, + error, + orphans_since: 0, + }, + }; + self.set_chain_breaker(&Some(breaker)) + } + + /// Records a breaker for an inscription that could not even be parsed. + pub fn record_deserialize_break( + &self, + l1_slot: serde_json::Value, + error: String, + ) -> Result<()> { + self.record_break(None, l1_slot, BlockIngestError::Deserialize(error)) + } + + /// Validates `block` against the tip and, if it chains, applies it atomically + /// (scratch clone, commit only on full success) and advances the tip. On any + /// failure records the breaker and returns `Parked` without touching state. + pub async fn accept_block( + &self, + block: &Block, + l1_slot: serde_json::Value, + ) -> Result { + if let Some(err) = self.acceptance_error(block)? { + self.record_break(Some(&block.header), l1_slot, err.clone())?; + return Ok(AcceptOutcome::Parked(err)); + } + + // TODO: we use scratch state to be atomic, but need to revisit how expensive a clone is + let mut scratch = self.current_state.read().await.clone(); + if let Err(err) = apply_block_to_scratch(block, &mut scratch) { + self.record_break(Some(&block.header), l1_slot, err.clone())?; + return Ok(AcceptOutcome::Parked(err)); + } + + let mut stored = block.clone(); + stored.bedrock_status = BedrockStatus::Finalized; + if let Err(err) = self.dbio.put_block(&stored, [0_u8; 32]) { + let ingest_err = BlockIngestError::Storage(err.to_string()); + self.record_break(Some(&block.header), l1_slot, ingest_err.clone())?; + return Ok(AcceptOutcome::Parked(ingest_err)); + } + + // Commit in-memory state (infallible) only after the DB write succeeded. + *self.current_state.write().await = scratch; + self.set_chain_breaker(&None)?; + Ok(AcceptOutcome::Applied) + } + pub async fn put_block(&self, mut block: Block, l1_header: HeaderId) -> Result<()> { info!("Applying block {}", block.header.block_id); { @@ -214,6 +351,61 @@ impl IndexerStore { } } +/// Applies a block's transactions to `state`, mapping every failure to a +/// [`BlockIngestError`] so the caller can park rather than crash. Operates on a +/// scratch state; the caller commits only on `Ok`. +fn apply_block_to_scratch(block: &Block, state: &mut V03State) -> Result<(), BlockIngestError> { + let (clock_tx, user_txs) = + block.body.transactions.split_last().ok_or_else(|| { + BlockIngestError::StateTransition("block has no transactions".to_owned()) + })?; + + let expected_clock = LeeTransaction::Public(clock_invocation(block.header.timestamp)); + if *clock_tx != expected_clock { + return Err(BlockIngestError::StateTransition( + "last transaction must be the clock invocation for the block timestamp".to_owned(), + )); + } + + let is_genesis = block.header.block_id == GENESIS_BLOCK_ID; + for transaction in user_txs { + if is_genesis { + let LeeTransaction::Public(public_tx) = transaction else { + return Err(BlockIngestError::StateTransition( + "genesis block should contain only public transactions".to_owned(), + )); + }; + state + .transition_from_public_transaction( + public_tx, + block.header.block_id, + block.header.timestamp, + ) + .map_err(|err| BlockIngestError::StateTransition(format!("{err:?}")))?; + } else { + transaction + .clone() + .execute_on_state(state, block.header.block_id, block.header.timestamp) + .map_err(|err| BlockIngestError::StateTransition(format!("{err:?}")))?; + } + } + + let LeeTransaction::Public(clock_public_tx) = clock_tx else { + return Err(BlockIngestError::StateTransition( + "clock invocation must be a public transaction".to_owned(), + )); + }; + state + .transition_from_public_transaction( + clock_public_tx, + block.header.block_id, + block.header.timestamp, + ) + .map_err(|err| BlockIngestError::StateTransition(format!("{err:?}")))?; + + Ok(()) +} + #[cfg(test)] mod chain_breaker_tests { use common::HashType; @@ -372,3 +564,103 @@ mod tests { assert_eq!(acc2_at_9.balance, 20090); } } + +#[cfg(test)] +mod accept_tests { + use common::{HashType, block::HashableBlockData}; + + use super::*; + use crate::ingest_error::BlockIngestError; + + fn signing_key() -> lee::PrivateKey { + lee::PrivateKey::try_new([7_u8; 32]).expect("valid key") + } + + // A block with a correct hash but empty body — enough to exercise the + // acceptance checks (id/link/hash), which run before any state application. + fn valid_hash_block(block_id: u64, prev: HashType) -> common::block::Block { + HashableBlockData { + block_id, + prev_block_hash: prev, + timestamp: 0, + transactions: vec![], + } + .into_pending_block(&signing_key()) + } + + #[tokio::test] + async fn non_genesis_first_block_parks_with_unexpected_id() { + let dir = tempfile::tempdir().expect("tempdir"); + let store = IndexerStore::open_db(dir.path()).expect("open store"); + + let block = valid_hash_block(2, HashType([0_u8; 32])); + let outcome = store + .accept_block(&block, serde_json::Value::Null) + .await + .expect("accept"); + + assert!(matches!( + outcome, + AcceptOutcome::Parked(BlockIngestError::UnexpectedBlockId { + expected: 1, + got: 2 + }) + )); + let breaker = store.get_chain_breaker().expect("get").expect("present"); + assert_eq!(breaker.block_id, Some(2)); + assert_eq!(breaker.orphans_since, 0); + } + + #[tokio::test] + async fn hash_mismatch_parks() { + let dir = tempfile::tempdir().expect("tempdir"); + let store = IndexerStore::open_db(dir.path()).expect("open store"); + + let mut block = valid_hash_block(1, HashType([0_u8; 32])); + block.header.timestamp = 999; // invalidates the stored hash + + let outcome = store + .accept_block(&block, serde_json::Value::Null) + .await + .expect("accept"); + assert!(matches!( + outcome, + AcceptOutcome::Parked(BlockIngestError::HashMismatch { .. }) + )); + } + + #[tokio::test] + async fn second_break_bumps_orphan_count_and_keeps_first() { + let dir = tempfile::tempdir().expect("tempdir"); + let store = IndexerStore::open_db(dir.path()).expect("open store"); + + let first = valid_hash_block(2, HashType([0_u8; 32])); + store + .accept_block(&first, serde_json::Value::Null) + .await + .expect("accept"); + let second = valid_hash_block(3, HashType([0_u8; 32])); + store + .accept_block(&second, serde_json::Value::Null) + .await + .expect("accept"); + + let breaker = store.get_chain_breaker().expect("get").expect("present"); + assert_eq!(breaker.block_id, Some(2), "first breaker preserved"); + assert_eq!(breaker.orphans_since, 1, "second break counted as orphan"); + } + + #[tokio::test] + async fn deserialize_break_records_breaker_without_header() { + let dir = tempfile::tempdir().expect("tempdir"); + let store = IndexerStore::open_db(dir.path()).expect("open store"); + + store + .record_deserialize_break(serde_json::Value::Null, "bad bytes".to_owned()) + .expect("record"); + + let breaker = store.get_chain_breaker().expect("get").expect("present"); + assert_eq!(breaker.block_id, None); + assert!(matches!(breaker.error, BlockIngestError::Deserialize(_))); + } +}