From e04626063f3688c5bb7d5525da2ff5caa31e5daa Mon Sep 17 00:00:00 2001 From: erhant Date: Fri, 26 Jun 2026 20:37:35 +0300 Subject: [PATCH] feat(indexer): add startup genesis-consistency check test `RISC0_DEV_MODE=1 RISC0_SKIP_BUILD=1 cargo test -p indexer_core` --- lez/indexer/core/src/config.rs | 35 +++++++++ lez/indexer/core/src/lib.rs | 139 ++++++++++++++++++++++++++++++++- test_fixtures/src/config.rs | 1 + 3 files changed, 173 insertions(+), 2 deletions(-) diff --git a/lez/indexer/core/src/config.rs b/lez/indexer/core/src/config.rs index cb7f3dfe..580b9fbd 100644 --- a/lez/indexer/core/src/config.rs +++ b/lez/indexer/core/src/config.rs @@ -20,6 +20,12 @@ pub struct IndexerConfig { pub consensus_info_polling_interval: Duration, pub bedrock_config: ClientConfig, pub channel_id: ChannelId, + /// Whether to wipe the indexer store and re-index from scratch when a genesis mismatch occurs + /// (i.e. the L1/sequencer was reset but the old store was reused). + /// + /// Defaults to `false`: on mismatch the indexer refuses to start. + #[serde(default)] + pub allow_chain_reset: bool, } impl IndexerConfig { @@ -37,3 +43,32 @@ impl IndexerConfig { }) } } + +#[cfg(test)] +mod tests { + use super::IndexerConfig; + + const MINIMAL_CONFIG: &str = r#"{ + "consensus_info_polling_interval": "1s", + "bedrock_config": { "addr": "http://localhost:18080" }, + "channel_id": "0101010101010101010101010101010101010101010101010101010101010101" + }"#; + + #[test] + fn allow_chain_reset_defaults_to_false() { + let config: IndexerConfig = serde_json::from_str(MINIMAL_CONFIG).unwrap(); + assert!(!config.allow_chain_reset); + } + + #[test] + fn allow_chain_reset_parses_true() { + let json = r#"{ + "consensus_info_polling_interval": "1s", + "bedrock_config": { "addr": "http://localhost:18080" }, + "channel_id": "0101010101010101010101010101010101010101010101010101010101010101", + "allow_chain_reset": true + }"#; + let config: IndexerConfig = serde_json::from_str(json).unwrap(); + assert!(config.allow_chain_reset); + } +} diff --git a/lez/indexer/core/src/lib.rs b/lez/indexer/core/src/lib.rs index 74fa6582..1f915ba4 100644 --- a/lez/indexer/core/src/lib.rs +++ b/lez/indexer/core/src/lib.rs @@ -1,11 +1,12 @@ use std::{path::Path, sync::Arc}; -use anyhow::Result; +use anyhow::{Context as _, Result}; use arc_swap::ArcSwap; -use common::block::Block; +use common::{HashType, block::Block}; // ToDo: Remove after testnet use futures::StreamExt as _; pub use ingest_error::BlockIngestError; +use lee::GENESIS_BLOCK_ID; use log::{error, info, warn}; use logos_blockchain_zone_sdk::{ CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer, @@ -23,6 +24,14 @@ pub mod ingest_error; pub mod stall_reason; pub mod status; +/// Result of comparing the indexer's stored genesis against the channel's. +enum GenesisOutcome { + /// Match, or nothing to compare (fresh store / empty channel / L1 unreachable) — proceed. + Consistent, + /// The store holds a different chain than the channel now serves. + Mismatch { stored: HashType, current: HashType }, +} + #[derive(Clone)] pub struct IndexerCore { pub zone_indexer: Arc>, @@ -53,6 +62,73 @@ impl IndexerCore { }) } + /// Builds the core, then verifies the stored genesis matches the channel's. + /// On mismatch: refuse (error) unless `allow_reset`, in which case wipe the store + /// and re-index from scratch. Used at service/FFI startup in place of `new`. + pub async fn new_with_genesis_check( + config: IndexerConfig, + storage_dir: &Path, + allow_reset: bool, + ) -> Result { + let home = storage_dir.join(format!("rocksdb-{}", config.channel_id)); + let core = Self::new(config.clone(), storage_dir)?; + match core.genesis_outcome().await? { + GenesisOutcome::Consistent => Ok(core), + GenesisOutcome::Mismatch { stored, current } if allow_reset => { + warn!( + "Chain reset detected: stored genesis {stored} != channel genesis {current}. \ + Wiping indexer store at {} and re-indexing.", + home.display() + ); + drop(core); // sole owner before the ingest task is spawned → closes the DB + storage::indexer::RocksDBIO::destroy(&home)?; + Self::new(config, storage_dir) + } + GenesisOutcome::Mismatch { stored, current } => Err(anyhow::anyhow!( + "Indexer store at {} holds a different chain than the channel now serves \ + (stored genesis {stored} != channel genesis {current}). Run `just clean`, \ + point at a fresh storage dir, or set `allow_chain_reset` in the indexer config.", + home.display() + )), + } + } + + /// Compares the stored genesis (block 1) against the channel's current genesis. + async fn genesis_outcome(&self) -> Result { + let stored = self.store.get_block_at_id(GENESIS_BLOCK_ID)?; + if stored.is_none() { + return Ok(GenesisOutcome::Consistent); // fresh store: skip the L1 read entirely + } + let current = self.fetch_channel_genesis().await?; + Ok(compare_genesis(stored.as_ref(), current.as_ref())) + } + + /// Reads the channel's genesis (first `Block`) from the start of the channel. + /// Returns `None` if the channel has no block yet or L1 can't be reached within + /// the timeout — the check is best-effort and must never block or fail startup. + async fn fetch_channel_genesis(&self) -> Result> { + const GENESIS_FETCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + let fetch = async { + let stream = self.zone_indexer.next_messages(None).await?; + let mut stream = std::pin::pin!(stream); + while let Some((msg, _slot)) = stream.next().await { + if let ZoneMessage::Block(zone_block) = msg { + let block: Block = borsh::from_slice(&zone_block.data) + .context("Failed to deserialize channel genesis block")?; + return Ok::, anyhow::Error>(Some(block)); + } + } + Ok(None) + }; + match tokio::time::timeout(GENESIS_FETCH_TIMEOUT, fetch).await { + Ok(res) => res, + Err(_elapsed) => { + warn!("Timed out reading channel genesis for the consistency check; proceeding"); + Ok(None) + } + } + } + /// Snapshot of the current ingestion status (sync state + indexed tip). /// /// Combines the ingest loop's live status with the L2 tip read fresh from the @@ -194,3 +270,62 @@ impl IndexerCore { } } } + +/// Pure comparison: a mismatch requires BOTH a stored and a channel genesis whose +/// hashes differ; any missing side is treated as Consistent (nothing to act on). +fn compare_genesis(stored: Option<&Block>, current: Option<&Block>) -> GenesisOutcome { + match (stored, current) { + (Some(s), Some(c)) if s.header.hash != c.header.hash => GenesisOutcome::Mismatch { + stored: s.header.hash, + current: c.header.hash, + }, + _ => GenesisOutcome::Consistent, + } +} + +#[cfg(test)] +mod genesis_check_tests { + use common::{HashType, block::Block, test_utils::produce_dummy_block}; + + use super::{GenesisOutcome, compare_genesis}; + + fn genesis_with_prev(prev_seed: u8) -> Block { + produce_dummy_block(1, Some(HashType([prev_seed; 32])), vec![]) + } + + #[test] + fn matching_genesis_is_consistent() { + let g = genesis_with_prev(1); + assert!(matches!( + compare_genesis(Some(&g), Some(&g)), + GenesisOutcome::Consistent + )); + } + + #[test] + fn differing_genesis_is_mismatch() { + let stored = genesis_with_prev(1); + let current = genesis_with_prev(2); + assert!(matches!( + compare_genesis(Some(&stored), Some(¤t)), + GenesisOutcome::Mismatch { .. } + )); + } + + #[test] + fn missing_either_side_is_consistent() { + let g = genesis_with_prev(1); + assert!(matches!( + compare_genesis(None, Some(&g)), + GenesisOutcome::Consistent + )); + assert!(matches!( + compare_genesis(Some(&g), None), + GenesisOutcome::Consistent + )); + assert!(matches!( + compare_genesis(None, None), + GenesisOutcome::Consistent + )); + } +} diff --git a/test_fixtures/src/config.rs b/test_fixtures/src/config.rs index 8684dd4d..ebba9783 100644 --- a/test_fixtures/src/config.rs +++ b/test_fixtures/src/config.rs @@ -172,6 +172,7 @@ pub fn indexer_config(bedrock_addr: SocketAddr) -> Result { auth: None, }, channel_id: bedrock_channel_id(), + allow_chain_reset: false, }) }