feat(indexer): add accept_block with chain-linkage check and atomic apply

test `RISC0_DEV_MODE=1 RISC0_SKIP_BUILD=1 cargo test -p indexer_core --lib accept_tests`
This commit is contained in:
erhant 2026-06-26 14:50:26 +03:00
parent 48a41fe1cf
commit fc3692813a

View File

@ -2,10 +2,11 @@ use std::{path::Path, sync::Arc};
use anyhow::{Context as _, Result};
use common::{
block::{BedrockStatus, Block},
HashType,
block::{BedrockStatus, Block, BlockHeader},
transaction::{LeeTransaction, clock_invocation},
};
use lee::{Account, AccountId, V03State};
use lee::{Account, AccountId, GENESIS_BLOCK_ID, V03State};
use lee_core::BlockId;
use log::info;
use logos_blockchain_core::header::HeaderId;
@ -13,7 +14,20 @@ use logos_blockchain_zone_sdk::Slot;
use storage::indexer::RocksDBIO;
use tokio::sync::RwLock;
use crate::chain_breaker::ChainBreaker;
use crate::{chain_breaker::ChainBreaker, ingest_error::BlockIngestError};
struct Tip {
block_id: u64,
hash: HashType,
}
/// Outcome of feeding a parsed L2 block to the validated tip.
pub enum AcceptOutcome {
/// Chained and applied; tip and L1 read cursor both advance.
Applied,
/// Did not chain or failed to apply; tip stays frozen, breaker recorded.
Parked(BlockIngestError),
}
#[derive(Clone)]
pub struct IndexerStore {
@ -151,6 +165,129 @@ impl IndexerStore {
.get_account_by_id(*account_id))
}
/// The last successfully applied block as `{block_id, hash}`, or `None` before
/// any block is stored (cold start). Read fresh from the store each call.
fn validated_tip(&self) -> Result<Option<Tip>> {
let Some(block_id) = self.dbio.get_meta_last_block_id_in_db()? else {
return Ok(None);
};
let Some(block) = self.dbio.get_block(block_id)? else {
return Ok(None);
};
Ok(Some(Tip {
block_id,
hash: block.header.hash,
}))
}
/// Returns `Some(err)` if `block` is not the valid continuation of the tip:
/// hash integrity, then block-id continuity, then `prev_block_hash` linkage.
fn acceptance_error(&self, block: &Block) -> Result<Option<BlockIngestError>> {
let computed = block.recompute_hash();
if computed != block.header.hash {
return Ok(Some(BlockIngestError::HashMismatch {
computed,
header: block.header.hash,
}));
}
match self.validated_tip()? {
None => {
if block.header.block_id != GENESIS_BLOCK_ID {
return Ok(Some(BlockIngestError::UnexpectedBlockId {
expected: GENESIS_BLOCK_ID,
got: block.header.block_id,
}));
}
}
Some(tip) => {
let expected = tip.block_id.checked_add(1).expect("block id overflow");
if block.header.block_id != expected {
return Ok(Some(BlockIngestError::UnexpectedBlockId {
expected,
got: block.header.block_id,
}));
}
if block.header.prev_block_hash != tip.hash {
return Ok(Some(BlockIngestError::BrokenChainLink {
expected_prev: tip.hash,
got_prev: block.header.prev_block_hash,
}));
}
}
}
Ok(None)
}
/// Records the chain breaker: the first break is stored verbatim; subsequent
/// breaks only bump `orphans_since`, preserving the original cause.
fn record_break(
&self,
header: Option<&BlockHeader>,
l1_slot: serde_json::Value,
error: BlockIngestError,
) -> Result<()> {
let breaker = match self.get_chain_breaker()? {
Some(mut existing) => {
existing.orphans_since = existing.orphans_since.saturating_add(1);
existing
}
None => ChainBreaker {
block_id: header.map(|h| h.block_id),
block_hash: header.map(|h| h.hash),
prev_block_hash: header.map(|h| h.prev_block_hash),
first_seen: header.map(|h| h.timestamp),
l1_slot,
error,
orphans_since: 0,
},
};
self.set_chain_breaker(&Some(breaker))
}
/// Records a breaker for an inscription that could not even be parsed.
pub fn record_deserialize_break(
&self,
l1_slot: serde_json::Value,
error: String,
) -> Result<()> {
self.record_break(None, l1_slot, BlockIngestError::Deserialize(error))
}
/// Validates `block` against the tip and, if it chains, applies it atomically
/// (scratch clone, commit only on full success) and advances the tip. On any
/// failure records the breaker and returns `Parked` without touching state.
pub async fn accept_block(
&self,
block: &Block,
l1_slot: serde_json::Value,
) -> Result<AcceptOutcome> {
if let Some(err) = self.acceptance_error(block)? {
self.record_break(Some(&block.header), l1_slot, err.clone())?;
return Ok(AcceptOutcome::Parked(err));
}
// TODO: we use scratch state to be atomic, but need to revisit how expensive a clone is
let mut scratch = self.current_state.read().await.clone();
if let Err(err) = apply_block_to_scratch(block, &mut scratch) {
self.record_break(Some(&block.header), l1_slot, err.clone())?;
return Ok(AcceptOutcome::Parked(err));
}
let mut stored = block.clone();
stored.bedrock_status = BedrockStatus::Finalized;
if let Err(err) = self.dbio.put_block(&stored, [0_u8; 32]) {
let ingest_err = BlockIngestError::Storage(err.to_string());
self.record_break(Some(&block.header), l1_slot, ingest_err.clone())?;
return Ok(AcceptOutcome::Parked(ingest_err));
}
// Commit in-memory state (infallible) only after the DB write succeeded.
*self.current_state.write().await = scratch;
self.set_chain_breaker(&None)?;
Ok(AcceptOutcome::Applied)
}
pub async fn put_block(&self, mut block: Block, l1_header: HeaderId) -> Result<()> {
info!("Applying block {}", block.header.block_id);
{
@ -214,6 +351,61 @@ impl IndexerStore {
}
}
/// Applies a block's transactions to `state`, mapping every failure to a
/// [`BlockIngestError`] so the caller can park rather than crash. Operates on a
/// scratch state; the caller commits only on `Ok`.
fn apply_block_to_scratch(block: &Block, state: &mut V03State) -> Result<(), BlockIngestError> {
let (clock_tx, user_txs) =
block.body.transactions.split_last().ok_or_else(|| {
BlockIngestError::StateTransition("block has no transactions".to_owned())
})?;
let expected_clock = LeeTransaction::Public(clock_invocation(block.header.timestamp));
if *clock_tx != expected_clock {
return Err(BlockIngestError::StateTransition(
"last transaction must be the clock invocation for the block timestamp".to_owned(),
));
}
let is_genesis = block.header.block_id == GENESIS_BLOCK_ID;
for transaction in user_txs {
if is_genesis {
let LeeTransaction::Public(public_tx) = transaction else {
return Err(BlockIngestError::StateTransition(
"genesis block should contain only public transactions".to_owned(),
));
};
state
.transition_from_public_transaction(
public_tx,
block.header.block_id,
block.header.timestamp,
)
.map_err(|err| BlockIngestError::StateTransition(format!("{err:?}")))?;
} else {
transaction
.clone()
.execute_on_state(state, block.header.block_id, block.header.timestamp)
.map_err(|err| BlockIngestError::StateTransition(format!("{err:?}")))?;
}
}
let LeeTransaction::Public(clock_public_tx) = clock_tx else {
return Err(BlockIngestError::StateTransition(
"clock invocation must be a public transaction".to_owned(),
));
};
state
.transition_from_public_transaction(
clock_public_tx,
block.header.block_id,
block.header.timestamp,
)
.map_err(|err| BlockIngestError::StateTransition(format!("{err:?}")))?;
Ok(())
}
#[cfg(test)]
mod chain_breaker_tests {
use common::HashType;
@ -372,3 +564,103 @@ mod tests {
assert_eq!(acc2_at_9.balance, 20090);
}
}
#[cfg(test)]
mod accept_tests {
use common::{HashType, block::HashableBlockData};
use super::*;
use crate::ingest_error::BlockIngestError;
fn signing_key() -> lee::PrivateKey {
lee::PrivateKey::try_new([7_u8; 32]).expect("valid key")
}
// A block with a correct hash but empty body — enough to exercise the
// acceptance checks (id/link/hash), which run before any state application.
fn valid_hash_block(block_id: u64, prev: HashType) -> common::block::Block {
HashableBlockData {
block_id,
prev_block_hash: prev,
timestamp: 0,
transactions: vec![],
}
.into_pending_block(&signing_key())
}
#[tokio::test]
async fn non_genesis_first_block_parks_with_unexpected_id() {
let dir = tempfile::tempdir().expect("tempdir");
let store = IndexerStore::open_db(dir.path()).expect("open store");
let block = valid_hash_block(2, HashType([0_u8; 32]));
let outcome = store
.accept_block(&block, serde_json::Value::Null)
.await
.expect("accept");
assert!(matches!(
outcome,
AcceptOutcome::Parked(BlockIngestError::UnexpectedBlockId {
expected: 1,
got: 2
})
));
let breaker = store.get_chain_breaker().expect("get").expect("present");
assert_eq!(breaker.block_id, Some(2));
assert_eq!(breaker.orphans_since, 0);
}
#[tokio::test]
async fn hash_mismatch_parks() {
let dir = tempfile::tempdir().expect("tempdir");
let store = IndexerStore::open_db(dir.path()).expect("open store");
let mut block = valid_hash_block(1, HashType([0_u8; 32]));
block.header.timestamp = 999; // invalidates the stored hash
let outcome = store
.accept_block(&block, serde_json::Value::Null)
.await
.expect("accept");
assert!(matches!(
outcome,
AcceptOutcome::Parked(BlockIngestError::HashMismatch { .. })
));
}
#[tokio::test]
async fn second_break_bumps_orphan_count_and_keeps_first() {
let dir = tempfile::tempdir().expect("tempdir");
let store = IndexerStore::open_db(dir.path()).expect("open store");
let first = valid_hash_block(2, HashType([0_u8; 32]));
store
.accept_block(&first, serde_json::Value::Null)
.await
.expect("accept");
let second = valid_hash_block(3, HashType([0_u8; 32]));
store
.accept_block(&second, serde_json::Value::Null)
.await
.expect("accept");
let breaker = store.get_chain_breaker().expect("get").expect("present");
assert_eq!(breaker.block_id, Some(2), "first breaker preserved");
assert_eq!(breaker.orphans_since, 1, "second break counted as orphan");
}
#[tokio::test]
async fn deserialize_break_records_breaker_without_header() {
let dir = tempfile::tempdir().expect("tempdir");
let store = IndexerStore::open_db(dir.path()).expect("open store");
store
.record_deserialize_break(serde_json::Value::Null, "bad bytes".to_owned())
.expect("record");
let breaker = store.get_chain_breaker().expect("get").expect("present");
assert_eq!(breaker.block_id, None);
assert!(matches!(breaker.error, BlockIngestError::Deserialize(_)));
}
}