fix(indexer): run the chain-identity check even when the store is parked

This commit is contained in:
erhant 2026-06-27 19:22:54 +03:00
parent 2be1107d07
commit bb75b9857b

View File

@ -6,10 +6,9 @@ use common::block::Block;
// TODO: Remove after testnet
use futures::StreamExt as _;
pub use ingest_error::BlockIngestError;
use lee::GENESIS_BLOCK_ID;
use log::{error, info, warn};
use logos_blockchain_zone_sdk::{
CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer,
CommonHttpClient, Slot, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer,
};
pub use stall_reason::StallReason;
@ -24,42 +23,17 @@ pub mod ingest_error;
pub mod stall_reason;
pub mod status;
/// First post-genesis L2 block.
///
/// We use this to differentiate between a local rocksdb chain
/// and the connected chain, to see if we should reset (mostly dev purposes).
/// While such a discrepancy will not happen during live-chain indexing, this
/// saves us some trouble during development (especially when used within UI
/// with the indexer module).
///
/// Genesis is deterministic so it is byte-identical across chains, so instead
/// we use the second block to differentiate between chains.
const ANCHOR_BLOCK_ID: u64 = GENESIS_BLOCK_ID + 1;
/// Result of comparing the indexer's stored anchor block against the channel's.
/// Result of comparing the indexer's stored tip against the channel.
enum ChainIdentityOutcome {
/// One of the following possibilities:
/// - Anchors match
/// - Nothing to compare (store has only genesis)
/// - L1 was unreadable (proceed from the persisted cursor; fail-over)
/// Proceed from the persisted cursor. Either the channel still serves our tip
/// (same chain), there is nothing to compare (empty store / parked / no cursor),
/// or the check was inconclusive (L1 unreadable, or bedrock's LIB still behind
/// our tip slot) — none of which prove a reset.
Consistent,
/// The store holds a different chain than the channel now serves; `detail`
/// describes how (differing anchor, or the channel lacks the anchor entirely).
/// The channel serves a *different* block at the tip's id — a chain reset.
Mismatch { detail: String },
}
/// Outcome of reading the channel's anchor block at startup.
enum AnchorRead {
/// The channel's anchor block.
Found(Block),
/// The channel definitively has no anchor (drained up to LIB without it).
/// Since our stored anchor was finalized, a finalized channel that lacks it is
/// a different chain.
Absent,
/// Could not read the channel in time (slow/unreachable L1) — best-effort skip.
Unreadable,
}
#[derive(Clone)]
pub struct IndexerCore {
pub zone_indexer: Arc<ZoneIndexer<NodeHttpClient>>,
@ -91,10 +65,9 @@ impl IndexerCore {
}
/// Builds the core, then verifies the stored chain matches the channel's by
/// comparing the anchor block (id 2 — genesis is identical across chains, so
/// it can't detect a reset). On mismatch: refuse (error) unless `allow_reset`,
/// in which case wipe the store and re-index from scratch. Used at service/FFI
/// startup in place of `new`.
/// re-reading the channel at the stored tip's position. On mismatch: refuse
/// (error) unless `allow_reset`, in which case wipe the store and re-index from
/// scratch. Used at service/FFI startup in place of `new`.
pub async fn new_with_chain_check(
config: IndexerConfig,
storage_dir: &Path,
@ -122,64 +95,86 @@ impl IndexerCore {
}
}
/// Compares the stored anchor block (id 2) against the channel's current one.
/// An absent channel anchor means a different (shorter) chain, since our stored
/// anchor was finalized and finalized history does not shrink on the same chain.
/// Verifies the channel still serves our chain at the tip's L1 slot (the persisted
/// cursor), by comparing the first block it serves there against our stored block
/// of the *same id*.
///
/// This is mostly a development convenience: it lets the indexer notice that its
/// local state belongs to a different chain than the one it is now connected to
/// (e.g. a wiped/restarted bedrock) and reset instead of silently diverging. It
/// will not trigger during normal live indexing. Reading at the cursor — which is
/// recent — keeps this to ~one batch rather than a scan from genesis (L1 slots
/// are wall-clock-derived, so genesis is millions of empty slots away).
async fn chain_identity_outcome(&self) -> Result<ChainIdentityOutcome> {
let Some(stored) = self.store.get_block_at_id(ANCHOR_BLOCK_ID)? else {
// Store has at most genesis: nothing post-genesis to compare against.
// We deliberately do NOT skip parked stores: a parked store must still be able
// to detect a reset, or it stays parked across reboots forever (its persisted
// stall would short-circuit the check every boot). A same-chain park is still
// safe — the parked block sits at an id we never applied, so the lookup below
// misses and we stay Consistent.
let Some(cursor) = self.store.get_zone_cursor()? else {
return Ok(ChainIdentityOutcome::Consistent); // empty / cold store: nothing to verify
};
// Compare the first block the channel serves at/after the cursor against our
// stored block of the same id. On the same chain that is our tip and matches;
// a reset serves a different block here — crucially including a freshly
// restarted, *shorter* chain whose low-id block at this slot differs from ours
// (the old "look for our tip id" approach missed this: a short chain has no
// block at our tip id).
//
// Anything inconclusive stays Consistent (proceed, let ingest park if truly
// divergent) rather than wiping a valid store: an empty/unreadable read (most
// importantly bedrock's LIB still behind our tip slot), or a channel block at
// an id we don't hold. Blind spot: a store holding only genesis can't be
// distinguished (genesis is deterministic across chains), but that window is
// transient.
let Some(channel_block) = self.fetch_channel_block_from(cursor).await? else {
return Ok(ChainIdentityOutcome::Consistent);
};
Ok(match self.fetch_channel_anchor().await? {
AnchorRead::Found(current) => compare_anchor(&stored, &current),
AnchorRead::Absent => ChainIdentityOutcome::Mismatch {
detail: format!(
"channel serves no block {ANCHOR_BLOCK_ID}, but the store holds anchor {}",
stored.header.hash
),
},
AnchorRead::Unreadable => ChainIdentityOutcome::Consistent,
})
let Some(ours) = self.store.get_block_at_id(channel_block.header.block_id)? else {
return Ok(ChainIdentityOutcome::Consistent);
};
Ok(compare_block(&ours, &channel_block))
}
/// Reads the channel's anchor block (first `Block` with id [`ANCHOR_BLOCK_ID`])
/// from the start of the channel.
/// Reads the channel starting at the tip's L1 slot (the `cursor`) and returns the
/// first block it serves there. `next_messages` is exclusive of its argument, so
/// `cursor - 1` is passed to include the tip's own slot.
///
/// Bedrock can be slow to serve the channel right after boot, so we allow a
/// generous timeout. `Unreadable` on error/timeout keeps startup best-effort
/// (never refuse on a transient L1 hiccup); `Absent` means the channel has no
/// anchor in its finalized history, which the caller treats as a reset.
async fn fetch_channel_anchor(&self) -> Result<AnchorRead> {
const ANCHOR_FETCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(180);
/// Cheap: the cursor is recent, so this reads roughly one batch. `None` covers the
/// inconclusive cases — a slow/unreachable L1 (timeout/error) or bedrock's LIB
/// still behind our tip slot (empty stream) — neither of which proves a reset.
async fn fetch_channel_block_from(&self, cursor: Slot) -> Result<Option<Block>> {
const TIP_FETCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
// A slot-0 cursor is degenerate (real inscriptions live at wall-clock slots);
// bail rather than let `next_messages(None)` fall back to a from-genesis scan.
let Some(from_slot) = cursor.into_inner().checked_sub(1) else {
return Ok(None);
};
let fetch = async {
let stream = self.zone_indexer.next_messages(None).await?;
let stream = self
.zone_indexer
.next_messages(Some(Slot::from(from_slot)))
.await?;
let mut stream = std::pin::pin!(stream);
while let Some((msg, _slot)) = stream.next().await {
let ZoneMessage::Block(zone_block) = msg else {
continue;
};
let block: Block = borsh::from_slice(&zone_block.data)
.context("Failed to deserialize channel block")?;
if block.header.block_id == ANCHOR_BLOCK_ID {
return Ok::<AnchorRead, anyhow::Error>(AnchorRead::Found(block));
}
if block.header.block_id > ANCHOR_BLOCK_ID {
break; // blocks arrive in order: we passed the anchor without seeing it
if let ZoneMessage::Block(zone_block) = msg {
let block: Block = borsh::from_slice(&zone_block.data)
.context("Failed to deserialize channel block")?;
return Ok::<Option<Block>, anyhow::Error>(Some(block));
}
}
Ok(AnchorRead::Absent)
Ok(None)
};
match tokio::time::timeout(ANCHOR_FETCH_TIMEOUT, fetch).await {
Ok(Ok(read)) => Ok(read),
match tokio::time::timeout(TIP_FETCH_TIMEOUT, fetch).await {
Ok(Ok(found)) => Ok(found),
Ok(Err(err)) => {
warn!(
"Failed to read channel anchor for the consistency check; proceeding: {err:#}"
);
Ok(AnchorRead::Unreadable)
warn!("Failed to read channel tip for the consistency check; proceeding: {err:#}");
Ok(None)
}
Err(_elapsed) => {
warn!("Timed out reading channel anchor for the consistency check; proceeding");
Ok(AnchorRead::Unreadable)
warn!("Timed out reading channel tip for the consistency check; proceeding");
Ok(None)
}
}
}
@ -326,16 +321,19 @@ impl IndexerCore {
}
}
/// Pure comparison of two anchor blocks: a mismatch is differing hashes. The
/// missing-side cases are handled upstream (`Absent`/`Unreadable`).
fn compare_anchor(stored: &Block, current: &Block) -> ChainIdentityOutcome {
if stored.header.hash == current.header.hash {
/// Pure comparison of our stored block against the channel's block at the same id:
/// a mismatch is differing hashes. Missing/unreadable cases are handled upstream.
fn compare_block(ours: &Block, channel: &Block) -> ChainIdentityOutcome {
if ours.header.hash == channel.header.hash {
ChainIdentityOutcome::Consistent
} else {
ChainIdentityOutcome::Mismatch {
detail: format!(
"stored anchor {} != channel anchor {}",
stored.header.hash, current.header.hash
"stored block {} {} != channel block {} {}",
ours.header.block_id,
ours.header.hash,
channel.header.block_id,
channel.header.hash
),
}
}
@ -345,27 +343,27 @@ fn compare_anchor(stored: &Block, current: &Block) -> ChainIdentityOutcome {
mod chain_identity_tests {
use common::{HashType, block::Block, test_utils::produce_dummy_block};
use super::{ANCHOR_BLOCK_ID, ChainIdentityOutcome, compare_anchor};
use super::{ChainIdentityOutcome, compare_block};
fn anchor_with_prev(prev_seed: u8) -> Block {
produce_dummy_block(ANCHOR_BLOCK_ID, Some(HashType([prev_seed; 32])), vec![])
fn block_with_prev(prev_seed: u8) -> Block {
produce_dummy_block(5, Some(HashType([prev_seed; 32])), vec![])
}
#[test]
fn matching_anchor_is_consistent() {
let a = anchor_with_prev(1);
fn matching_block_is_consistent() {
let b = block_with_prev(1);
assert!(matches!(
compare_anchor(&a, &a),
compare_block(&b, &b),
ChainIdentityOutcome::Consistent
));
}
#[test]
fn differing_anchor_is_mismatch() {
let stored = anchor_with_prev(1);
let current = anchor_with_prev(2);
fn differing_block_is_mismatch() {
let stored = block_with_prev(1);
let current = block_with_prev(2);
assert!(matches!(
compare_anchor(&stored, &current),
compare_block(&stored, &current),
ChainIdentityOutcome::Mismatch { .. }
));
}