feat(indexer): park ingest loop on bad blocks instead of skipping

`RISC0_DEV_MODE=1 RISC0_SKIP_BUILD=1 cargo test -p indexer_core`
This commit is contained in:
erhant 2026-06-26 16:39:37 +03:00
parent 00939ae6d6
commit 8a719c289d
2 changed files with 127 additions and 168 deletions

View File

@ -8,7 +8,6 @@ use common::{
};
use lee::{Account, AccountId, GENESIS_BLOCK_ID, V03State};
use lee_core::BlockId;
use log::info;
use logos_blockchain_core::header::HeaderId;
use logos_blockchain_zone_sdk::Slot;
use storage::indexer::RocksDBIO;
@ -287,68 +286,6 @@ impl IndexerStore {
self.set_stall_reason(&None)?;
Ok(AcceptOutcome::Applied)
}
/// Applies `block` to the in-memory state and then stores the block in the database.
///
/// The block's last transaction must equal the clock invocation built from the
/// block's timestamp. All transactions before it are applied first, in order,
/// and the clock invocation is applied last. When the block id is 1 (treated
/// here as genesis), every preceding transaction must be
/// `LeeTransaction::Public` and is applied through
/// `transition_from_public_transaction`; for any other block id each
/// transaction is applied through `execute_on_state`.
///
/// After the state has been updated, `block.bedrock_status` is set to
/// `BedrockStatus::Finalized` and the block is written via `self.dbio.put_block`
/// together with `l1_header`.
///
/// # Errors
///
/// Returns an error when the block has no transactions, when the last
/// transaction is not the expected clock invocation, when a genesis block
/// contains a non-public transaction, when any state transition fails, or when
/// the database write fails.
///
/// NOTE(review): the state is mutated through the write guard before the
/// database write happens, and nothing here undoes those mutations on an early
/// return — presumably a failure part-way through leaves the in-memory state
/// ahead of the stored blocks; confirm against the transition helpers.
pub async fn put_block(&self, mut block: Block, l1_header: HeaderId) -> Result<()> {
info!("Applying block {}", block.header.block_id);
// Inner scope: the write guard on `current_state` is dropped at the end of
// this block, i.e. before the database write further down.
{
let mut state_guard = self.current_state.write().await;
// Separate the trailing transaction (expected to be the clock invocation)
// from the transactions that precede it.
let (clock_tx, user_txs) = block
.body
.transactions
.split_last()
.ok_or_else(|| anyhow::anyhow!("Block has no transactions"))?;
// The trailing transaction must match the clock invocation derived from
// this block's own timestamp.
anyhow::ensure!(
*clock_tx == LeeTransaction::Public(clock_invocation(block.header.timestamp)),
"Last transaction in block must be the clock invocation for the block timestamp"
);
// Block id 1 is handled as the genesis block.
let is_genesis = block.header.block_id == 1;
for transaction in user_txs {
if is_genesis {
// Genesis accepts public transactions only; any other variant aborts.
let genesis_tx = match transaction {
LeeTransaction::Public(public_tx) => public_tx,
LeeTransaction::PrivacyPreserving(_)
| LeeTransaction::ProgramDeployment(_) => {
anyhow::bail!("Genesis block should contain only public transactions")
}
};
state_guard
.transition_from_public_transaction(
genesis_tx,
block.header.block_id,
block.header.timestamp,
)
.context("Failed to execute genesis public transaction")?;
} else {
// `execute_on_state` is invoked on a clone, so the transaction held
// by `block` is still available when the block is stored below.
transaction.clone().execute_on_state(
&mut state_guard,
block.header.block_id,
block.header.timestamp,
)?;
}
}
// Apply the clock invocation directly (it is expected to modify clock accounts).
let LeeTransaction::Public(clock_public_tx) = clock_tx else {
anyhow::bail!("Clock invocation must be a public transaction");
};
state_guard.transition_from_public_transaction(
clock_public_tx,
block.header.block_id,
block.header.timestamp,
)?;
}
// ToDo: Currently we are fetching only finalized blocks
// if it changes, the following lines need to be updated
// to represent correct block finality
block.bedrock_status = BedrockStatus::Finalized;
info!("Putting block {} into DB", block.header.block_id);
Ok(self.dbio.put_block(&block, l1_header.into())?)
}
}
/// Applies a block's transactions to `state`, mapping every failure to a
@ -443,7 +380,7 @@ mod stall_reason_tests {
#[cfg(test)]
mod tests {
use common::{HashType, block::HashableBlockData};
use common::test_utils::{create_transaction_native_token_transfer, produce_dummy_block};
use tempfile::tempdir;
use testnet_initial_state::initial_pub_accounts_private_keys;
@ -461,105 +398,99 @@ mod tests {
}
#[tokio::test]
async fn state_transition() {
async fn accept_block_applies_transfers_and_advances_tip() {
let home = tempdir().unwrap();
let storage = IndexerStore::open_db(home.as_ref()).unwrap();
let store = IndexerStore::open_db(home.as_ref()).unwrap();
let initial_accounts = initial_pub_accounts_private_keys();
let from = initial_accounts[0].account_id;
let to = initial_accounts[1].account_id;
let sign_key = initial_accounts[0].pub_sign_key.clone();
// Submit genesis block
let clock_tx = LeeTransaction::Public(clock_invocation(0));
let genesis_block_data = HashableBlockData {
block_id: 1,
prev_block_hash: HashType::default(),
timestamp: 0,
transactions: vec![clock_tx],
};
let genesis_block = genesis_block_data
.into_pending_block(&common::test_utils::sequencer_sign_key_for_testing());
let mut prev_hash = Some(genesis_block.header.hash);
storage
.put_block(genesis_block, HeaderId::from([0_u8; 32]))
.await
.unwrap();
for i in 0..10 {
let tx = common::test_utils::create_transaction_native_token_transfer(
from, i, to, 10, &sign_key,
);
let block_id = u64::try_from(i + 1).unwrap();
let next_block = common::test_utils::produce_dummy_block(block_id, prev_hash, vec![tx]);
prev_hash = Some(next_block.header.hash);
storage
.put_block(
next_block,
HeaderId::from([u8::try_from(i + 1).unwrap(); 32]),
)
// Genesis (block 1): clock-only.
let genesis = produce_dummy_block(1, None, vec![]);
let mut prev_hash = genesis.header.hash;
assert!(matches!(
store
.accept_block(&genesis, serde_json::Value::Null)
.await
.unwrap();
.unwrap(),
AcceptOutcome::Applied
));
// Blocks 2..=11: one native transfer of 10 each (nonces 0..=9).
for i in 0..10_u64 {
let tx = create_transaction_native_token_transfer(from, i as u128, to, 10, &sign_key);
let block = produce_dummy_block(i + 2, Some(prev_hash), vec![tx]);
prev_hash = block.header.hash;
assert!(matches!(
store
.accept_block(&block, serde_json::Value::Null)
.await
.unwrap(),
AcceptOutcome::Applied
));
}
let acc1_val = storage.account_current_state(&from).await.unwrap();
let acc2_val = storage.account_current_state(&to).await.unwrap();
assert_eq!(acc1_val.balance, 9900);
assert_eq!(acc2_val.balance, 20100);
assert_eq!(
store.account_current_state(&from).await.unwrap().balance,
9900
);
assert_eq!(
store.account_current_state(&to).await.unwrap().balance,
20100
);
// Tip advanced to the last applied block; a clean run leaves no stall.
assert_eq!(store.get_last_block_id().unwrap(), Some(11));
assert!(store.get_stall_reason().unwrap().is_none());
}
#[tokio::test]
async fn account_state_at_block() {
async fn account_state_at_block_reflects_history() {
let home = tempdir().unwrap();
let storage = IndexerStore::open_db(home.as_ref()).unwrap();
let mut prev_hash = None;
let store = IndexerStore::open_db(home.as_ref()).unwrap();
let initial_accounts = initial_pub_accounts_private_keys();
let from = initial_accounts[0].account_id;
let to = initial_accounts[1].account_id;
let sign_key = initial_accounts[0].pub_sign_key.clone();
for i in 0..10 {
let tx = common::test_utils::create_transaction_native_token_transfer(
from, i, to, 10, &sign_key,
);
let block_id = u64::try_from(i + 1).unwrap();
let genesis = produce_dummy_block(1, None, vec![]);
let mut prev_hash = genesis.header.hash;
store
.accept_block(&genesis, serde_json::Value::Null)
.await
.unwrap();
let next_block = common::test_utils::produce_dummy_block(block_id, prev_hash, vec![tx]);
prev_hash = Some(next_block.header.hash);
storage
.put_block(
next_block,
HeaderId::from([u8::try_from(i + 1).unwrap(); 32]),
)
for i in 0..10_u64 {
let tx = create_transaction_native_token_transfer(from, i as u128, to, 10, &sign_key);
let block = produce_dummy_block(i + 2, Some(prev_hash), vec![tx]);
prev_hash = block.header.hash;
store
.accept_block(&block, serde_json::Value::Null)
.await
.unwrap();
}
// Genesis block: no transfers applied yet.
let acc1_at_1 = storage.account_state_at_block(&from, 1).unwrap();
let acc2_at_1 = storage.account_state_at_block(&to, 1).unwrap();
assert_eq!(acc1_at_1.balance, 9990);
assert_eq!(acc2_at_1.balance, 20010);
// After block 5: 4 transfers of 10 applied (one each in blocks 2..=5).
let acc1_at_5 = storage.account_state_at_block(&from, 5).unwrap();
let acc2_at_5 = storage.account_state_at_block(&to, 5).unwrap();
assert_eq!(acc1_at_5.balance, 9950);
assert_eq!(acc2_at_5.balance, 20050);
// After final block 9: 8 transfers applied; should match current state.
let acc1_at_9 = storage.account_state_at_block(&from, 9).unwrap();
let acc2_at_9 = storage.account_state_at_block(&to, 9).unwrap();
assert_eq!(acc1_at_9.balance, 9910);
assert_eq!(acc2_at_9.balance, 20090);
// State at block N is inclusive of block N.
// Block 1 (genesis, clock-only): no transfers yet.
assert_eq!(
store.account_state_at_block(&from, 1).unwrap().balance,
10000
);
assert_eq!(store.account_state_at_block(&to, 1).unwrap().balance, 20000);
// Through block 5: 4 transfers applied (blocks 2..=5).
assert_eq!(
store.account_state_at_block(&from, 5).unwrap().balance,
9960
);
assert_eq!(store.account_state_at_block(&to, 5).unwrap().balance, 20040);
// Through block 9: 8 transfers applied (blocks 2..=9).
assert_eq!(
store.account_state_at_block(&from, 9).unwrap().balance,
9920
);
assert_eq!(store.account_state_at_block(&to, 9).unwrap().balance, 20080);
}
}

View File

@ -7,14 +7,13 @@ use common::block::Block;
use futures::StreamExt as _;
pub use ingest_error::BlockIngestError;
use log::{error, info, warn};
use logos_blockchain_core::header::HeaderId;
use logos_blockchain_zone_sdk::{
CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer,
};
pub use stall_reason::StallReason;
use crate::{
block_store::IndexerStore,
block_store::{AcceptOutcome, IndexerStore},
config::IndexerConfig,
status::{IndexerStatus, IndexerSyncStatus},
};
@ -95,8 +94,6 @@ impl IndexerCore {
let stream = match self.zone_indexer.next_messages(cursor).await {
Ok(s) => s,
Err(err) => {
// `next_messages` reads L1 consensus info internally, so
// this also covers an unreachable/misconfigured L1 node.
error!("Failed to start zone-sdk next_messages stream: {err}");
self.set_status(IndexerSyncStatus::error(format!(
"cannot reach L1 / read channel: {err}"
@ -107,11 +104,8 @@ impl IndexerCore {
};
let mut stream = std::pin::pin!(stream);
// Flip to Syncing on the first message of this cycle (not merely on
// a successful poll) so the steady-state CaughtUp status doesn't
// flicker. Until then the state stays Starting (cold-start scan of
// empty L1 history) or CaughtUp (idle).
let mut announced_syncing = false;
let mut had_cycle_error = false;
while let Some((msg, slot)) = stream.next().await {
if !announced_syncing {
@ -121,17 +115,25 @@ impl IndexerCore {
let zone_block = match msg {
ZoneMessage::Block(b) => b,
// Non-block messages don't carry a cursor position; the
// next ZoneBlock advances past them implicitly.
ZoneMessage::Deposit(_) | ZoneMessage::Withdraw(_) => continue,
};
let l1_slot = serde_json::to_value(&slot).unwrap_or(serde_json::Value::Null);
let block: Block = match borsh::from_slice(&zone_block.data) {
Ok(b) => b,
Err(e) => {
error!("Failed to deserialize L2 block from zone-sdk: {e}");
// Advance past the broken inscription so we don't
// re-process it on restart.
if let Err(err) =
self.store.record_deserialize_stall(l1_slot, e.to_string())
{
warn!("Failed to record stall reason: {err:#}");
}
self.set_status(IndexerSyncStatus::stalled(format!(
"failed to deserialize L2 block: {e}"
)));
// Advance the L1 read cursor past the broken inscription;
// the validated tip stays frozen.
cursor = Some(slot);
if let Err(err) = self.store.set_zone_cursor(&slot) {
warn!("Failed to persist indexer cursor: {err:#}");
@ -140,27 +142,53 @@ impl IndexerCore {
}
};
info!("Indexed L2 block {}", block.header.block_id);
// TODO: Remove l1_header placeholder once storage layer
// no longer requires it. Zone-sdk handles L1 tracking internally.
let placeholder_l1_header = HeaderId::from([0_u8; 32]);
if let Err(err) = self.store.put_block(block.clone(), placeholder_l1_header).await {
error!("Failed to store block {}: {err:#}", block.header.block_id);
match self.store.accept_block(&block, l1_slot).await {
Ok(AcceptOutcome::Applied) => {
info!("Indexed L2 block {}", block.header.block_id);
self.set_status(IndexerSyncStatus::syncing());
cursor = Some(slot);
if let Err(err) = self.store.set_zone_cursor(&slot) {
warn!("Failed to persist indexer cursor: {err:#}");
}
yield Ok(block);
}
Ok(AcceptOutcome::Parked(ingest_err)) => {
error!(
"Parked at block {}: {ingest_err}",
block.header.block_id
);
self.set_status(IndexerSyncStatus::stalled(ingest_err.to_string()));
// Advance the L1 read cursor; tip stays frozen, no yield.
cursor = Some(slot);
if let Err(err) = self.store.set_zone_cursor(&slot) {
warn!("Failed to persist indexer cursor: {err:#}");
}
}
Err(err) => {
// Infrastructure error (DB read/write), not a bad block.
// Leave the cursor unchanged so the same position is re-polled next cycle.
error!(
"Store error applying block {}: {err:#}",
block.header.block_id
);
self.set_status(IndexerSyncStatus::error(format!(
"store error: {err:#}"
)));
had_cycle_error = true;
break;
}
}
cursor = Some(slot);
if let Err(err) = self.store.set_zone_cursor(&slot) {
warn!("Failed to persist indexer cursor: {err:#}");
}
yield Ok(block);
}
// Stream drained: caught up to LIB as of this cycle. Clears any
// prior error (e.g. a transient L1 disconnect that left no
// backlog, so the `Syncing` branch above never ran). Sleep then
// poll again.
self.set_status(IndexerSyncStatus::caught_up());
if had_cycle_error {
tokio::time::sleep(poll_interval).await;
continue;
}
// Stream drained. Stay Stalled if parked; otherwise we are caught up.
if self.store.get_stall_reason().ok().flatten().is_none() {
self.set_status(IndexerSyncStatus::caught_up());
}
tokio::time::sleep(poll_interval).await;
}
}