From bb75b9857b336a604bbaef7f03c6a63bf46b89d9 Mon Sep 17 00:00:00 2001 From: erhant Date: Sat, 27 Jun 2026 19:22:54 +0300 Subject: [PATCH] fix(indexer): run the chain-identity check even when the store is parked --- lez/indexer/core/src/lib.rs | 190 ++++++++++++++++++------------------ 1 file changed, 94 insertions(+), 96 deletions(-) diff --git a/lez/indexer/core/src/lib.rs b/lez/indexer/core/src/lib.rs index 73bb0505..18c09cfc 100644 --- a/lez/indexer/core/src/lib.rs +++ b/lez/indexer/core/src/lib.rs @@ -6,10 +6,9 @@ use common::block::Block; // TODO: Remove after testnet use futures::StreamExt as _; pub use ingest_error::BlockIngestError; -use lee::GENESIS_BLOCK_ID; use log::{error, info, warn}; use logos_blockchain_zone_sdk::{ - CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer, + CommonHttpClient, Slot, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer, }; pub use stall_reason::StallReason; @@ -24,42 +23,17 @@ pub mod ingest_error; pub mod stall_reason; pub mod status; -/// First post-genesis L2 block. -/// -/// We use this to differentiate between a local rocksdb chain -/// and the connected chain, to see if we should reset (mostly dev purposes). -/// While such a discrepancy will not happen during live-chain indexing, this -/// saves us some trouble during development (especially when used within UI -/// with the indexer module). -/// -/// Genesis is deterministic so it is byte-identical across chains, so instead -/// we use the second block to differentiate between chains. -const ANCHOR_BLOCK_ID: u64 = GENESIS_BLOCK_ID + 1; - -/// Result of comparing the indexer's stored anchor block against the channel's. +/// Result of comparing the indexer's stored tip against the channel. enum ChainIdentityOutcome { - /// One of the following possibilities: - /// - Anchors match - /// - Nothing to compare (store has only genesis) - /// - L1 was unreadable (proceed from the persisted cursor; fail-over) + /// Proceed from the persisted cursor. Either the channel still serves our tip + /// (same chain), there is nothing to compare (empty store / parked / no cursor), + /// or the check was inconclusive (L1 unreadable, or bedrock's LIB still behind + /// our tip slot) — none of which prove a reset. Consistent, - /// The store holds a different chain than the channel now serves; `detail` - /// describes how (differing anchor, or the channel lacks the anchor entirely). + /// The channel serves a *different* block at the tip's id — a chain reset. Mismatch { detail: String }, } -/// Outcome of reading the channel's anchor block at startup. -enum AnchorRead { - /// The channel's anchor block. - Found(Block), - /// The channel definitively has no anchor (drained up to LIB without it). - /// Since our stored anchor was finalized, a finalized channel that lacks it is - /// a different chain. - Absent, - /// Could not read the channel in time (slow/unreachable L1) — best-effort skip. - Unreadable, -} - #[derive(Clone)] pub struct IndexerCore { pub zone_indexer: Arc>, @@ -91,10 +65,9 @@ impl IndexerCore { } /// Builds the core, then verifies the stored chain matches the channel's by - /// comparing the anchor block (id 2 — genesis is identical across chains, so - /// it can't detect a reset). On mismatch: refuse (error) unless `allow_reset`, - /// in which case wipe the store and re-index from scratch. Used at service/FFI - /// startup in place of `new`. + /// re-reading the channel at the stored tip's position. On mismatch: refuse + /// (error) unless `allow_reset`, in which case wipe the store and re-index from + /// scratch. Used at service/FFI startup in place of `new`. pub async fn new_with_chain_check( config: IndexerConfig, storage_dir: &Path, @@ -122,64 +95,86 @@ impl IndexerCore { } } - /// Compares the stored anchor block (id 2) against the channel's current one. - /// An absent channel anchor means a different (shorter) chain, since our stored - /// anchor was finalized and finalized history does not shrink on the same chain. + /// Verifies the channel still serves our chain at the tip's L1 slot (the persisted + /// cursor), by comparing the first block it serves there against our stored block + /// of the *same id*. + /// + /// This is mostly a development convenience: it lets the indexer notice that its + /// local state belongs to a different chain than the one it is now connected to + /// (e.g. a wiped/restarted bedrock) and reset instead of silently diverging. It + /// will not trigger during normal live indexing. Reading at the cursor — which is + /// recent — keeps this to ~one batch rather than a scan from genesis (L1 slots + /// are wall-clock-derived, so genesis is millions of empty slots away). async fn chain_identity_outcome(&self) -> Result { - let Some(stored) = self.store.get_block_at_id(ANCHOR_BLOCK_ID)? else { - // Store has at most genesis: nothing post-genesis to compare against. + // We deliberately do NOT skip parked stores: a parked store must still be able + // to detect a reset, or it stays parked across reboots forever (its persisted + // stall would short-circuit the check every boot). A same-chain park is still + // safe — the parked block sits at an id we never applied, so the lookup below + // misses and we stay Consistent. + let Some(cursor) = self.store.get_zone_cursor()? else { + return Ok(ChainIdentityOutcome::Consistent); // empty / cold store: nothing to verify + }; + + // Compare the first block the channel serves at/after the cursor against our + // stored block of the same id. On the same chain that is our tip and matches; + // a reset serves a different block here — crucially including a freshly + // restarted, *shorter* chain whose low-id block at this slot differs from ours + // (the old "look for our tip id" approach missed this: a short chain has no + // block at our tip id). + // + // Anything inconclusive stays Consistent (proceed, let ingest park if truly + // divergent) rather than wiping a valid store: an empty/unreadable read (most + // importantly bedrock's LIB still behind our tip slot), or a channel block at + // an id we don't hold. Blind spot: a store holding only genesis can't be + // distinguished (genesis is deterministic across chains), but that window is + // transient. + let Some(channel_block) = self.fetch_channel_block_from(cursor).await? else { return Ok(ChainIdentityOutcome::Consistent); }; - Ok(match self.fetch_channel_anchor().await? { - AnchorRead::Found(current) => compare_anchor(&stored, ¤t), - AnchorRead::Absent => ChainIdentityOutcome::Mismatch { - detail: format!( - "channel serves no block {ANCHOR_BLOCK_ID}, but the store holds anchor {}", - stored.header.hash - ), - }, - AnchorRead::Unreadable => ChainIdentityOutcome::Consistent, - }) + let Some(ours) = self.store.get_block_at_id(channel_block.header.block_id)? else { + return Ok(ChainIdentityOutcome::Consistent); + }; + Ok(compare_block(&ours, &channel_block)) } - /// Reads the channel's anchor block (first `Block` with id [`ANCHOR_BLOCK_ID`]) - /// from the start of the channel. + /// Reads the channel starting at the tip's L1 slot (the `cursor`) and returns the + /// first block it serves there. `next_messages` is exclusive of its argument, so + /// `cursor - 1` is passed to include the tip's own slot. /// - /// Bedrock can be slow to serve the channel right after boot, so we allow a - /// generous timeout. `Unreadable` on error/timeout keeps startup best-effort - /// (never refuse on a transient L1 hiccup); `Absent` means the channel has no - /// anchor in its finalized history, which the caller treats as a reset. - async fn fetch_channel_anchor(&self) -> Result { - const ANCHOR_FETCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(180); + /// Cheap: the cursor is recent, so this reads roughly one batch. `None` covers the + /// inconclusive cases — a slow/unreachable L1 (timeout/error) or bedrock's LIB + /// still behind our tip slot (empty stream) — neither of which proves a reset. + async fn fetch_channel_block_from(&self, cursor: Slot) -> Result> { + const TIP_FETCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); + // A slot-0 cursor is degenerate (real inscriptions live at wall-clock slots); + // bail rather than let `next_messages(None)` fall back to a from-genesis scan. + let Some(from_slot) = cursor.into_inner().checked_sub(1) else { + return Ok(None); + }; let fetch = async { - let stream = self.zone_indexer.next_messages(None).await?; + let stream = self + .zone_indexer + .next_messages(Some(Slot::from(from_slot))) + .await?; let mut stream = std::pin::pin!(stream); while let Some((msg, _slot)) = stream.next().await { - let ZoneMessage::Block(zone_block) = msg else { - continue; - }; - let block: Block = borsh::from_slice(&zone_block.data) - .context("Failed to deserialize channel block")?; - if block.header.block_id == ANCHOR_BLOCK_ID { - return Ok::(AnchorRead::Found(block)); - } - if block.header.block_id > ANCHOR_BLOCK_ID { - break; // blocks arrive in order: we passed the anchor without seeing it + if let ZoneMessage::Block(zone_block) = msg { + let block: Block = borsh::from_slice(&zone_block.data) + .context("Failed to deserialize channel block")?; + return Ok::, anyhow::Error>(Some(block)); } } - Ok(AnchorRead::Absent) + Ok(None) }; - match tokio::time::timeout(ANCHOR_FETCH_TIMEOUT, fetch).await { - Ok(Ok(read)) => Ok(read), + match tokio::time::timeout(TIP_FETCH_TIMEOUT, fetch).await { + Ok(Ok(found)) => Ok(found), Ok(Err(err)) => { - warn!( - "Failed to read channel anchor for the consistency check; proceeding: {err:#}" - ); - Ok(AnchorRead::Unreadable) + warn!("Failed to read channel tip for the consistency check; proceeding: {err:#}"); + Ok(None) } Err(_elapsed) => { - warn!("Timed out reading channel anchor for the consistency check; proceeding"); - Ok(AnchorRead::Unreadable) + warn!("Timed out reading channel tip for the consistency check; proceeding"); + Ok(None) } } } @@ -326,16 +321,19 @@ impl IndexerCore { } } -/// Pure comparison of two anchor blocks: a mismatch is differing hashes. The -/// missing-side cases are handled upstream (`Absent`/`Unreadable`). -fn compare_anchor(stored: &Block, current: &Block) -> ChainIdentityOutcome { - if stored.header.hash == current.header.hash { +/// Pure comparison of our stored block against the channel's block at the same id: +/// a mismatch is differing hashes. Missing/unreadable cases are handled upstream. +fn compare_block(ours: &Block, channel: &Block) -> ChainIdentityOutcome { + if ours.header.hash == channel.header.hash { ChainIdentityOutcome::Consistent } else { ChainIdentityOutcome::Mismatch { detail: format!( - "stored anchor {} != channel anchor {}", - stored.header.hash, current.header.hash + "stored block {} {} != channel block {} {}", + ours.header.block_id, + ours.header.hash, + channel.header.block_id, + channel.header.hash ), } } @@ -345,27 +343,27 @@ fn compare_anchor(stored: &Block, current: &Block) -> ChainIdentityOutcome { mod chain_identity_tests { use common::{HashType, block::Block, test_utils::produce_dummy_block}; - use super::{ANCHOR_BLOCK_ID, ChainIdentityOutcome, compare_anchor}; + use super::{ChainIdentityOutcome, compare_block}; - fn anchor_with_prev(prev_seed: u8) -> Block { - produce_dummy_block(ANCHOR_BLOCK_ID, Some(HashType([prev_seed; 32])), vec![]) + fn block_with_prev(prev_seed: u8) -> Block { + produce_dummy_block(5, Some(HashType([prev_seed; 32])), vec![]) } #[test] - fn matching_anchor_is_consistent() { - let a = anchor_with_prev(1); + fn matching_block_is_consistent() { + let b = block_with_prev(1); assert!(matches!( - compare_anchor(&a, &a), + compare_block(&b, &b), ChainIdentityOutcome::Consistent )); } #[test] - fn differing_anchor_is_mismatch() { - let stored = anchor_with_prev(1); - let current = anchor_with_prev(2); + fn differing_block_is_mismatch() { + let stored = block_with_prev(1); + let current = block_with_prev(2); assert!(matches!( - compare_anchor(&stored, ¤t), + compare_block(&stored, ¤t), ChainIdentityOutcome::Mismatch { .. } )); }