diff --git a/lez/indexer/core/src/lib.rs b/lez/indexer/core/src/lib.rs index 2ba9186d..73bb0505 100644 --- a/lez/indexer/core/src/lib.rs +++ b/lez/indexer/core/src/lib.rs @@ -2,8 +2,8 @@ use std::{path::Path, sync::Arc}; use anyhow::{Context as _, Result}; use arc_swap::ArcSwap; -use common::{HashType, block::Block}; -// ToDo: Remove after testnet +use common::block::Block; +// TODO: Remove after testnet use futures::StreamExt as _; pub use ingest_error::BlockIngestError; use lee::GENESIS_BLOCK_ID; @@ -24,12 +24,40 @@ pub mod ingest_error; pub mod stall_reason; pub mod status; -/// Result of comparing the indexer's stored genesis against the channel's. -enum GenesisOutcome { - /// Match, or nothing to compare (fresh store / empty channel / L1 unreachable) — proceed. +/// First post-genesis L2 block. +/// +/// We use this to differentiate between a local rocksdb chain +/// and the connected chain, to see if we should reset (mostly dev purposes). +/// While such a discrepancy will not happen during live-chain indexing, this +/// saves us some trouble during development (especially when used within UI +/// with the indexer module). +/// +/// Genesis is deterministic so it is byte-identical across chains, so instead +/// we use the second block to differentiate between chains. +const ANCHOR_BLOCK_ID: u64 = GENESIS_BLOCK_ID + 1; + +/// Result of comparing the indexer's stored anchor block against the channel's. +enum ChainIdentityOutcome { + /// One of the following possibilities: + /// - Anchors match + /// - Nothing to compare (store has only genesis) + /// - L1 was unreadable (proceed from the persisted cursor; fail-over) Consistent, - /// The store holds a different chain than the channel now serves. - Mismatch { stored: HashType, current: HashType }, + /// The store holds a different chain than the channel now serves; `detail` + /// describes how (differing anchor, or the channel lacks the anchor entirely). + Mismatch { detail: String }, +} + +/// Outcome of reading the channel's anchor block at startup. +enum AnchorRead { + /// The channel's anchor block. + Found(Block), + /// The channel definitively has no anchor (drained up to LIB without it). + /// Since our stored anchor was finalized, a finalized channel that lacks it is + /// a different chain. + Absent, + /// Could not read the channel in time (slow/unreachable L1) — best-effort skip. + Unreadable, } #[derive(Clone)] @@ -62,80 +90,96 @@ impl IndexerCore { }) } - /// Builds the core, then verifies the stored genesis matches the channel's. - /// On mismatch: refuse (error) unless `allow_reset`, in which case wipe the store - /// and re-index from scratch. Used at service/FFI startup in place of `new`. - pub async fn new_with_genesis_check( + /// Builds the core, then verifies the stored chain matches the channel's by + /// comparing the anchor block (id 2 — genesis is identical across chains, so + /// it can't detect a reset). On mismatch: refuse (error) unless `allow_reset`, + /// in which case wipe the store and re-index from scratch. Used at service/FFI + /// startup in place of `new`. + pub async fn new_with_chain_check( config: IndexerConfig, storage_dir: &Path, allow_reset: bool, ) -> Result { let home = storage_dir.join(format!("rocksdb-{}", config.channel_id)); let core = Self::new(config.clone(), storage_dir)?; - match core.genesis_outcome().await? { - GenesisOutcome::Consistent => Ok(core), - GenesisOutcome::Mismatch { stored, current } if allow_reset => { + match core.chain_identity_outcome().await? { + ChainIdentityOutcome::Consistent => Ok(core), + ChainIdentityOutcome::Mismatch { detail } if allow_reset => { warn!( - "Chain reset detected: stored genesis {stored} != channel genesis {current}. \ - Wiping indexer store at {} and re-indexing.", + "Chain reset detected ({detail}). Wiping indexer store at {} and re-indexing.", home.display() ); drop(core); // sole owner before the ingest task is spawned → closes the DB storage::indexer::RocksDBIO::destroy(&home)?; Self::new(config, storage_dir) } - GenesisOutcome::Mismatch { stored, current } => Err(anyhow::anyhow!( + ChainIdentityOutcome::Mismatch { detail } => Err(anyhow::anyhow!( "Indexer store at {} holds a different chain than the channel now serves \ - (stored genesis {stored} != channel genesis {current}). Run `just clean`, \ - point at a fresh storage dir, or set `allow_chain_reset` in the indexer config.", + ({detail}). Run `just clean`, point at a fresh storage dir, or set \ + `allow_chain_reset` in the indexer config.", home.display() )), } } - /// Compares the stored genesis (block 1) against the channel's current genesis. - async fn genesis_outcome(&self) -> Result { - let stored = self.store.get_block_at_id(GENESIS_BLOCK_ID)?; - if stored.is_none() { - return Ok(GenesisOutcome::Consistent); // fresh store: skip the L1 read entirely - } - let current = self.fetch_channel_genesis().await?; - Ok(compare_genesis(stored.as_ref(), current.as_ref())) + /// Compares the stored anchor block (id 2) against the channel's current one. + /// An absent channel anchor means a different (shorter) chain, since our stored + /// anchor was finalized and finalized history does not shrink on the same chain. + async fn chain_identity_outcome(&self) -> Result { + let Some(stored) = self.store.get_block_at_id(ANCHOR_BLOCK_ID)? else { + // Store has at most genesis: nothing post-genesis to compare against. + return Ok(ChainIdentityOutcome::Consistent); + }; + Ok(match self.fetch_channel_anchor().await? { + AnchorRead::Found(current) => compare_anchor(&stored, ¤t), + AnchorRead::Absent => ChainIdentityOutcome::Mismatch { + detail: format!( + "channel serves no block {ANCHOR_BLOCK_ID}, but the store holds anchor {}", + stored.header.hash + ), + }, + AnchorRead::Unreadable => ChainIdentityOutcome::Consistent, + }) } - /// Reads the channel's genesis (first `Block`) from the start of the channel. + /// Reads the channel's anchor block (first `Block` with id [`ANCHOR_BLOCK_ID`]) + /// from the start of the channel. /// /// Bedrock can be slow to serve the channel right after boot, so we allow a - /// generous timeout. Returns `None` if the channel has no block yet, the read - /// errors, or the timeout elapses — the check is best-effort and must never - /// fail startup; a genuine reset is still caught by the per-block park logic. - async fn fetch_channel_genesis(&self) -> Result> { - const GENESIS_FETCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(180); + /// generous timeout. `Unreadable` on error/timeout keeps startup best-effort + /// (never refuse on a transient L1 hiccup); `Absent` means the channel has no + /// anchor in its finalized history, which the caller treats as a reset. + async fn fetch_channel_anchor(&self) -> Result { + const ANCHOR_FETCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(180); let fetch = async { let stream = self.zone_indexer.next_messages(None).await?; let mut stream = std::pin::pin!(stream); while let Some((msg, _slot)) = stream.next().await { - if let ZoneMessage::Block(zone_block) = msg { - let block: Block = borsh::from_slice(&zone_block.data) - .context("Failed to deserialize channel genesis block")?; - return Ok::, anyhow::Error>(Some(block)); + let ZoneMessage::Block(zone_block) = msg else { + continue; + }; + let block: Block = borsh::from_slice(&zone_block.data) + .context("Failed to deserialize channel block")?; + if block.header.block_id == ANCHOR_BLOCK_ID { + return Ok::(AnchorRead::Found(block)); + } + if block.header.block_id > ANCHOR_BLOCK_ID { + break; // blocks arrive in order: we passed the anchor without seeing it } } - Ok(None) + Ok(AnchorRead::Absent) }; - match tokio::time::timeout(GENESIS_FETCH_TIMEOUT, fetch).await { - Ok(Ok(maybe_block)) => Ok(maybe_block), - // A read error (e.g. bedrock briefly unreachable at boot) must not refuse - // startup — skip the check and proceed, consistent with the ingest loop. + match tokio::time::timeout(ANCHOR_FETCH_TIMEOUT, fetch).await { + Ok(Ok(read)) => Ok(read), Ok(Err(err)) => { warn!( - "Failed to read channel genesis for the consistency check; proceeding: {err:#}" + "Failed to read channel anchor for the consistency check; proceeding: {err:#}" ); - Ok(None) + Ok(AnchorRead::Unreadable) } Err(_elapsed) => { - warn!("Timed out reading channel genesis for the consistency check; proceeding"); - Ok(None) + warn!("Timed out reading channel anchor for the consistency check; proceeding"); + Ok(AnchorRead::Unreadable) } } } @@ -282,61 +326,47 @@ impl IndexerCore { } } -/// Pure comparison: a mismatch requires BOTH a stored and a channel genesis whose -/// hashes differ; any missing side is treated as Consistent (nothing to act on). -fn compare_genesis(stored: Option<&Block>, current: Option<&Block>) -> GenesisOutcome { - match (stored, current) { - (Some(s), Some(c)) if s.header.hash != c.header.hash => GenesisOutcome::Mismatch { - stored: s.header.hash, - current: c.header.hash, - }, - _ => GenesisOutcome::Consistent, +/// Pure comparison of two anchor blocks: a mismatch is differing hashes. The +/// missing-side cases are handled upstream (`Absent`/`Unreadable`). +fn compare_anchor(stored: &Block, current: &Block) -> ChainIdentityOutcome { + if stored.header.hash == current.header.hash { + ChainIdentityOutcome::Consistent + } else { + ChainIdentityOutcome::Mismatch { + detail: format!( + "stored anchor {} != channel anchor {}", + stored.header.hash, current.header.hash + ), + } } } #[cfg(test)] -mod genesis_check_tests { +mod chain_identity_tests { use common::{HashType, block::Block, test_utils::produce_dummy_block}; - use super::{GenesisOutcome, compare_genesis}; + use super::{ANCHOR_BLOCK_ID, ChainIdentityOutcome, compare_anchor}; - fn genesis_with_prev(prev_seed: u8) -> Block { - produce_dummy_block(1, Some(HashType([prev_seed; 32])), vec![]) + fn anchor_with_prev(prev_seed: u8) -> Block { + produce_dummy_block(ANCHOR_BLOCK_ID, Some(HashType([prev_seed; 32])), vec![]) } #[test] - fn matching_genesis_is_consistent() { - let g = genesis_with_prev(1); + fn matching_anchor_is_consistent() { + let a = anchor_with_prev(1); assert!(matches!( - compare_genesis(Some(&g), Some(&g)), - GenesisOutcome::Consistent + compare_anchor(&a, &a), + ChainIdentityOutcome::Consistent )); } #[test] - fn differing_genesis_is_mismatch() { - let stored = genesis_with_prev(1); - let current = genesis_with_prev(2); + fn differing_anchor_is_mismatch() { + let stored = anchor_with_prev(1); + let current = anchor_with_prev(2); assert!(matches!( - compare_genesis(Some(&stored), Some(¤t)), - GenesisOutcome::Mismatch { .. } - )); - } - - #[test] - fn missing_either_side_is_consistent() { - let g = genesis_with_prev(1); - assert!(matches!( - compare_genesis(None, Some(&g)), - GenesisOutcome::Consistent - )); - assert!(matches!( - compare_genesis(Some(&g), None), - GenesisOutcome::Consistent - )); - assert!(matches!( - compare_genesis(None, None), - GenesisOutcome::Consistent + compare_anchor(&stored, ¤t), + ChainIdentityOutcome::Mismatch { .. } )); } } diff --git a/lez/indexer/ffi/src/api/lifecycle.rs b/lez/indexer/ffi/src/api/lifecycle.rs index b9d5873b..1a71718c 100644 --- a/lez/indexer/ffi/src/api/lifecycle.rs +++ b/lez/indexer/ffi/src/api/lifecycle.rs @@ -114,7 +114,7 @@ unsafe fn setup_indexer( let allow_reset = config.allow_chain_reset; let core = runtime - .block_on(IndexerCore::new_with_genesis_check( + .block_on(IndexerCore::new_with_chain_check( config, &storage_dir, allow_reset, diff --git a/lez/indexer/service/configs/docker/indexer_config.json b/lez/indexer/service/configs/docker/indexer_config.json index f083ca27..ce28af0b 100644 --- a/lez/indexer/service/configs/docker/indexer_config.json +++ b/lez/indexer/service/configs/docker/indexer_config.json @@ -3,5 +3,6 @@ "bedrock_config": { "addr": "http://host.docker.internal:18080" }, - "channel_id": "0101010101010101010101010101010101010101010101010101010101010101" + "channel_id": "0101010101010101010101010101010101010101010101010101010101010101", + "allow_chain_reset": true } diff --git a/lez/indexer/service/src/service.rs b/lez/indexer/service/src/service.rs index 7a0b3a87..ca42fc06 100644 --- a/lez/indexer/service/src/service.rs +++ b/lez/indexer/service/src/service.rs @@ -21,7 +21,7 @@ pub struct IndexerService { impl IndexerService { pub async fn new(config: IndexerConfig, storage_dir: &Path) -> Result { let allow_reset = config.allow_chain_reset; - let indexer = IndexerCore::new_with_genesis_check(config, storage_dir, allow_reset).await?; + let indexer = IndexerCore::new_with_chain_check(config, storage_dir, allow_reset).await?; let subscription_service = SubscriptionService::spawn_new(indexer.clone()); Ok(Self {