feat(indexer): add startup genesis-consistency check

test `RISC0_DEV_MODE=1 RISC0_SKIP_BUILD=1 cargo test -p indexer_core`
This commit is contained in:
erhant 2026-06-26 20:37:35 +03:00
parent 3a544aab32
commit e04626063f
3 changed files with 173 additions and 2 deletions

View File

@ -20,6 +20,12 @@ pub struct IndexerConfig {
pub consensus_info_polling_interval: Duration,
pub bedrock_config: ClientConfig,
pub channel_id: ChannelId,
/// Whether to wipe the indexer store and re-index from scratch when a genesis mismatch occurs
/// (i.e. the L1/sequencer was reset but the old store was reused).
///
/// Defaults to `false`: on mismatch the indexer refuses to start.
#[serde(default)]
pub allow_chain_reset: bool,
}
impl IndexerConfig {
@ -37,3 +43,32 @@ impl IndexerConfig {
})
}
}
#[cfg(test)]
mod tests {
use super::IndexerConfig;
const MINIMAL_CONFIG: &str = r#"{
"consensus_info_polling_interval": "1s",
"bedrock_config": { "addr": "http://localhost:18080" },
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101"
}"#;
#[test]
fn allow_chain_reset_defaults_to_false() {
let config: IndexerConfig = serde_json::from_str(MINIMAL_CONFIG).unwrap();
assert!(!config.allow_chain_reset);
}
#[test]
fn allow_chain_reset_parses_true() {
let json = r#"{
"consensus_info_polling_interval": "1s",
"bedrock_config": { "addr": "http://localhost:18080" },
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101",
"allow_chain_reset": true
}"#;
let config: IndexerConfig = serde_json::from_str(json).unwrap();
assert!(config.allow_chain_reset);
}
}

View File

@ -1,11 +1,12 @@
use std::{path::Path, sync::Arc};
use anyhow::Result;
use anyhow::{Context as _, Result};
use arc_swap::ArcSwap;
use common::block::Block;
use common::{HashType, block::Block};
// ToDo: Remove after testnet
use futures::StreamExt as _;
pub use ingest_error::BlockIngestError;
use lee::GENESIS_BLOCK_ID;
use log::{error, info, warn};
use logos_blockchain_zone_sdk::{
CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer,
@ -23,6 +24,14 @@ pub mod ingest_error;
pub mod stall_reason;
pub mod status;
/// Result of comparing the indexer's stored genesis against the channel's.
enum GenesisOutcome {
/// Match, or nothing to compare (fresh store / empty channel / L1 unreachable) — proceed.
Consistent,
/// The store holds a different chain than the channel now serves.
Mismatch { stored: HashType, current: HashType },
}
#[derive(Clone)]
pub struct IndexerCore {
pub zone_indexer: Arc<ZoneIndexer<NodeHttpClient>>,
@ -53,6 +62,73 @@ impl IndexerCore {
})
}
/// Builds the core, then verifies the stored genesis matches the channel's.
/// On mismatch: refuse (error) unless `allow_reset`, in which case wipe the store
/// and re-index from scratch. Used at service/FFI startup in place of `new`.
pub async fn new_with_genesis_check(
config: IndexerConfig,
storage_dir: &Path,
allow_reset: bool,
) -> Result<Self> {
let home = storage_dir.join(format!("rocksdb-{}", config.channel_id));
let core = Self::new(config.clone(), storage_dir)?;
match core.genesis_outcome().await? {
GenesisOutcome::Consistent => Ok(core),
GenesisOutcome::Mismatch { stored, current } if allow_reset => {
warn!(
"Chain reset detected: stored genesis {stored} != channel genesis {current}. \
Wiping indexer store at {} and re-indexing.",
home.display()
);
drop(core); // sole owner before the ingest task is spawned → closes the DB
storage::indexer::RocksDBIO::destroy(&home)?;
Self::new(config, storage_dir)
}
GenesisOutcome::Mismatch { stored, current } => Err(anyhow::anyhow!(
"Indexer store at {} holds a different chain than the channel now serves \
(stored genesis {stored} != channel genesis {current}). Run `just clean`, \
point at a fresh storage dir, or set `allow_chain_reset` in the indexer config.",
home.display()
)),
}
}
/// Compares the stored genesis (block 1) against the channel's current genesis.
async fn genesis_outcome(&self) -> Result<GenesisOutcome> {
let stored = self.store.get_block_at_id(GENESIS_BLOCK_ID)?;
if stored.is_none() {
return Ok(GenesisOutcome::Consistent); // fresh store: skip the L1 read entirely
}
let current = self.fetch_channel_genesis().await?;
Ok(compare_genesis(stored.as_ref(), current.as_ref()))
}
/// Reads the channel's genesis (first `Block`) from the start of the channel.
/// Returns `None` if the channel has no block yet or L1 can't be reached within
/// the timeout — the check is best-effort and must never block or fail startup.
async fn fetch_channel_genesis(&self) -> Result<Option<Block>> {
const GENESIS_FETCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
let fetch = async {
let stream = self.zone_indexer.next_messages(None).await?;
let mut stream = std::pin::pin!(stream);
while let Some((msg, _slot)) = stream.next().await {
if let ZoneMessage::Block(zone_block) = msg {
let block: Block = borsh::from_slice(&zone_block.data)
.context("Failed to deserialize channel genesis block")?;
return Ok::<Option<Block>, anyhow::Error>(Some(block));
}
}
Ok(None)
};
match tokio::time::timeout(GENESIS_FETCH_TIMEOUT, fetch).await {
Ok(res) => res,
Err(_elapsed) => {
warn!("Timed out reading channel genesis for the consistency check; proceeding");
Ok(None)
}
}
}
/// Snapshot of the current ingestion status (sync state + indexed tip).
///
/// Combines the ingest loop's live status with the L2 tip read fresh from the
@ -194,3 +270,62 @@ impl IndexerCore {
}
}
}
/// Pure comparison: a mismatch requires BOTH a stored and a channel genesis whose
/// hashes differ; any missing side is treated as Consistent (nothing to act on).
fn compare_genesis(stored: Option<&Block>, current: Option<&Block>) -> GenesisOutcome {
match (stored, current) {
(Some(s), Some(c)) if s.header.hash != c.header.hash => GenesisOutcome::Mismatch {
stored: s.header.hash,
current: c.header.hash,
},
_ => GenesisOutcome::Consistent,
}
}
#[cfg(test)]
mod genesis_check_tests {
use common::{HashType, block::Block, test_utils::produce_dummy_block};
use super::{GenesisOutcome, compare_genesis};
fn genesis_with_prev(prev_seed: u8) -> Block {
produce_dummy_block(1, Some(HashType([prev_seed; 32])), vec![])
}
#[test]
fn matching_genesis_is_consistent() {
let g = genesis_with_prev(1);
assert!(matches!(
compare_genesis(Some(&g), Some(&g)),
GenesisOutcome::Consistent
));
}
#[test]
fn differing_genesis_is_mismatch() {
let stored = genesis_with_prev(1);
let current = genesis_with_prev(2);
assert!(matches!(
compare_genesis(Some(&stored), Some(&current)),
GenesisOutcome::Mismatch { .. }
));
}
#[test]
fn missing_either_side_is_consistent() {
let g = genesis_with_prev(1);
assert!(matches!(
compare_genesis(None, Some(&g)),
GenesisOutcome::Consistent
));
assert!(matches!(
compare_genesis(Some(&g), None),
GenesisOutcome::Consistent
));
assert!(matches!(
compare_genesis(None, None),
GenesisOutcome::Consistent
));
}
}

View File

@ -172,6 +172,7 @@ pub fn indexer_config(bedrock_addr: SocketAddr) -> Result<IndexerConfig> {
auth: None,
},
channel_id: bedrock_channel_id(),
allow_chain_reset: false,
})
}