mirror of
https://github.com/logos-blockchain/logos-execution-zone.git
synced 2026-06-29 18:39:30 +00:00
fix(indexer): detect chain reset via anchor block, **not** deterministic genesis
test `RISC0_DEV_MODE=1 RISC0_SKIP_BUILD=1 cargo test -p indexer_core`
This commit is contained in:
parent
50f5bbd54c
commit
2be1107d07
@ -2,8 +2,8 @@ use std::{path::Path, sync::Arc};
|
||||
|
||||
use anyhow::{Context as _, Result};
|
||||
use arc_swap::ArcSwap;
|
||||
use common::{HashType, block::Block};
|
||||
// ToDo: Remove after testnet
|
||||
use common::block::Block;
|
||||
// TODO: Remove after testnet
|
||||
use futures::StreamExt as _;
|
||||
pub use ingest_error::BlockIngestError;
|
||||
use lee::GENESIS_BLOCK_ID;
|
||||
@ -24,12 +24,40 @@ pub mod ingest_error;
|
||||
pub mod stall_reason;
|
||||
pub mod status;
|
||||
|
||||
/// Result of comparing the indexer's stored genesis against the channel's.
|
||||
enum GenesisOutcome {
|
||||
/// Match, or nothing to compare (fresh store / empty channel / L1 unreachable) — proceed.
|
||||
/// First post-genesis L2 block.
|
||||
///
|
||||
/// We use this to differentiate between a local rocksdb chain
|
||||
/// and the connected chain, to see if we should reset (mostly dev purposes).
|
||||
/// While such a discrepancy will not happen during live-chain indexing, this
|
||||
/// saves us some trouble during development (especially when used within UI
|
||||
/// with the indexer module).
|
||||
///
|
||||
/// Genesis is deterministic so it is byte-identical across chains, so instead
|
||||
/// we use the second block to differentiate between chains.
|
||||
const ANCHOR_BLOCK_ID: u64 = GENESIS_BLOCK_ID + 1;
|
||||
|
||||
/// Result of comparing the indexer's stored anchor block against the channel's.
|
||||
enum ChainIdentityOutcome {
|
||||
/// One of the following possibilities:
|
||||
/// - Anchors match
|
||||
/// - Nothing to compare (store has only genesis)
|
||||
/// - L1 was unreadable (proceed from the persisted cursor; fail-over)
|
||||
Consistent,
|
||||
/// The store holds a different chain than the channel now serves.
|
||||
Mismatch { stored: HashType, current: HashType },
|
||||
/// The store holds a different chain than the channel now serves; `detail`
|
||||
/// describes how (differing anchor, or the channel lacks the anchor entirely).
|
||||
Mismatch { detail: String },
|
||||
}
|
||||
|
||||
/// Outcome of reading the channel's anchor block at startup.
|
||||
enum AnchorRead {
|
||||
/// The channel's anchor block.
|
||||
Found(Block),
|
||||
/// The channel definitively has no anchor (drained up to LIB without it).
|
||||
/// Since our stored anchor was finalized, a finalized channel that lacks it is
|
||||
/// a different chain.
|
||||
Absent,
|
||||
/// Could not read the channel in time (slow/unreachable L1) — best-effort skip.
|
||||
Unreadable,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@ -62,80 +90,96 @@ impl IndexerCore {
|
||||
})
|
||||
}
|
||||
|
||||
/// Builds the core, then verifies the stored genesis matches the channel's.
|
||||
/// On mismatch: refuse (error) unless `allow_reset`, in which case wipe the store
|
||||
/// and re-index from scratch. Used at service/FFI startup in place of `new`.
|
||||
pub async fn new_with_genesis_check(
|
||||
/// Builds the core, then verifies the stored chain matches the channel's by
|
||||
/// comparing the anchor block (id 2 — genesis is identical across chains, so
|
||||
/// it can't detect a reset). On mismatch: refuse (error) unless `allow_reset`,
|
||||
/// in which case wipe the store and re-index from scratch. Used at service/FFI
|
||||
/// startup in place of `new`.
|
||||
pub async fn new_with_chain_check(
|
||||
config: IndexerConfig,
|
||||
storage_dir: &Path,
|
||||
allow_reset: bool,
|
||||
) -> Result<Self> {
|
||||
let home = storage_dir.join(format!("rocksdb-{}", config.channel_id));
|
||||
let core = Self::new(config.clone(), storage_dir)?;
|
||||
match core.genesis_outcome().await? {
|
||||
GenesisOutcome::Consistent => Ok(core),
|
||||
GenesisOutcome::Mismatch { stored, current } if allow_reset => {
|
||||
match core.chain_identity_outcome().await? {
|
||||
ChainIdentityOutcome::Consistent => Ok(core),
|
||||
ChainIdentityOutcome::Mismatch { detail } if allow_reset => {
|
||||
warn!(
|
||||
"Chain reset detected: stored genesis {stored} != channel genesis {current}. \
|
||||
Wiping indexer store at {} and re-indexing.",
|
||||
"Chain reset detected ({detail}). Wiping indexer store at {} and re-indexing.",
|
||||
home.display()
|
||||
);
|
||||
drop(core); // sole owner before the ingest task is spawned → closes the DB
|
||||
storage::indexer::RocksDBIO::destroy(&home)?;
|
||||
Self::new(config, storage_dir)
|
||||
}
|
||||
GenesisOutcome::Mismatch { stored, current } => Err(anyhow::anyhow!(
|
||||
ChainIdentityOutcome::Mismatch { detail } => Err(anyhow::anyhow!(
|
||||
"Indexer store at {} holds a different chain than the channel now serves \
|
||||
(stored genesis {stored} != channel genesis {current}). Run `just clean`, \
|
||||
point at a fresh storage dir, or set `allow_chain_reset` in the indexer config.",
|
||||
({detail}). Run `just clean`, point at a fresh storage dir, or set \
|
||||
`allow_chain_reset` in the indexer config.",
|
||||
home.display()
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Compares the stored genesis (block 1) against the channel's current genesis.
|
||||
async fn genesis_outcome(&self) -> Result<GenesisOutcome> {
|
||||
let stored = self.store.get_block_at_id(GENESIS_BLOCK_ID)?;
|
||||
if stored.is_none() {
|
||||
return Ok(GenesisOutcome::Consistent); // fresh store: skip the L1 read entirely
|
||||
}
|
||||
let current = self.fetch_channel_genesis().await?;
|
||||
Ok(compare_genesis(stored.as_ref(), current.as_ref()))
|
||||
/// Compares the stored anchor block (id 2) against the channel's current one.
|
||||
/// An absent channel anchor means a different (shorter) chain, since our stored
|
||||
/// anchor was finalized and finalized history does not shrink on the same chain.
|
||||
async fn chain_identity_outcome(&self) -> Result<ChainIdentityOutcome> {
|
||||
let Some(stored) = self.store.get_block_at_id(ANCHOR_BLOCK_ID)? else {
|
||||
// Store has at most genesis: nothing post-genesis to compare against.
|
||||
return Ok(ChainIdentityOutcome::Consistent);
|
||||
};
|
||||
Ok(match self.fetch_channel_anchor().await? {
|
||||
AnchorRead::Found(current) => compare_anchor(&stored, ¤t),
|
||||
AnchorRead::Absent => ChainIdentityOutcome::Mismatch {
|
||||
detail: format!(
|
||||
"channel serves no block {ANCHOR_BLOCK_ID}, but the store holds anchor {}",
|
||||
stored.header.hash
|
||||
),
|
||||
},
|
||||
AnchorRead::Unreadable => ChainIdentityOutcome::Consistent,
|
||||
})
|
||||
}
|
||||
|
||||
/// Reads the channel's genesis (first `Block`) from the start of the channel.
|
||||
/// Reads the channel's anchor block (first `Block` with id [`ANCHOR_BLOCK_ID`])
|
||||
/// from the start of the channel.
|
||||
///
|
||||
/// Bedrock can be slow to serve the channel right after boot, so we allow a
|
||||
/// generous timeout. Returns `None` if the channel has no block yet, the read
|
||||
/// errors, or the timeout elapses — the check is best-effort and must never
|
||||
/// fail startup; a genuine reset is still caught by the per-block park logic.
|
||||
async fn fetch_channel_genesis(&self) -> Result<Option<Block>> {
|
||||
const GENESIS_FETCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(180);
|
||||
/// generous timeout. `Unreadable` on error/timeout keeps startup best-effort
|
||||
/// (never refuse on a transient L1 hiccup); `Absent` means the channel has no
|
||||
/// anchor in its finalized history, which the caller treats as a reset.
|
||||
async fn fetch_channel_anchor(&self) -> Result<AnchorRead> {
|
||||
const ANCHOR_FETCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(180);
|
||||
let fetch = async {
|
||||
let stream = self.zone_indexer.next_messages(None).await?;
|
||||
let mut stream = std::pin::pin!(stream);
|
||||
while let Some((msg, _slot)) = stream.next().await {
|
||||
if let ZoneMessage::Block(zone_block) = msg {
|
||||
let block: Block = borsh::from_slice(&zone_block.data)
|
||||
.context("Failed to deserialize channel genesis block")?;
|
||||
return Ok::<Option<Block>, anyhow::Error>(Some(block));
|
||||
let ZoneMessage::Block(zone_block) = msg else {
|
||||
continue;
|
||||
};
|
||||
let block: Block = borsh::from_slice(&zone_block.data)
|
||||
.context("Failed to deserialize channel block")?;
|
||||
if block.header.block_id == ANCHOR_BLOCK_ID {
|
||||
return Ok::<AnchorRead, anyhow::Error>(AnchorRead::Found(block));
|
||||
}
|
||||
if block.header.block_id > ANCHOR_BLOCK_ID {
|
||||
break; // blocks arrive in order: we passed the anchor without seeing it
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
Ok(AnchorRead::Absent)
|
||||
};
|
||||
match tokio::time::timeout(GENESIS_FETCH_TIMEOUT, fetch).await {
|
||||
Ok(Ok(maybe_block)) => Ok(maybe_block),
|
||||
// A read error (e.g. bedrock briefly unreachable at boot) must not refuse
|
||||
// startup — skip the check and proceed, consistent with the ingest loop.
|
||||
match tokio::time::timeout(ANCHOR_FETCH_TIMEOUT, fetch).await {
|
||||
Ok(Ok(read)) => Ok(read),
|
||||
Ok(Err(err)) => {
|
||||
warn!(
|
||||
"Failed to read channel genesis for the consistency check; proceeding: {err:#}"
|
||||
"Failed to read channel anchor for the consistency check; proceeding: {err:#}"
|
||||
);
|
||||
Ok(None)
|
||||
Ok(AnchorRead::Unreadable)
|
||||
}
|
||||
Err(_elapsed) => {
|
||||
warn!("Timed out reading channel genesis for the consistency check; proceeding");
|
||||
Ok(None)
|
||||
warn!("Timed out reading channel anchor for the consistency check; proceeding");
|
||||
Ok(AnchorRead::Unreadable)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -282,61 +326,47 @@ impl IndexerCore {
|
||||
}
|
||||
}
|
||||
|
||||
/// Pure comparison: a mismatch requires BOTH a stored and a channel genesis whose
|
||||
/// hashes differ; any missing side is treated as Consistent (nothing to act on).
|
||||
fn compare_genesis(stored: Option<&Block>, current: Option<&Block>) -> GenesisOutcome {
|
||||
match (stored, current) {
|
||||
(Some(s), Some(c)) if s.header.hash != c.header.hash => GenesisOutcome::Mismatch {
|
||||
stored: s.header.hash,
|
||||
current: c.header.hash,
|
||||
},
|
||||
_ => GenesisOutcome::Consistent,
|
||||
/// Pure comparison of two anchor blocks: a mismatch is differing hashes. The
|
||||
/// missing-side cases are handled upstream (`Absent`/`Unreadable`).
|
||||
fn compare_anchor(stored: &Block, current: &Block) -> ChainIdentityOutcome {
|
||||
if stored.header.hash == current.header.hash {
|
||||
ChainIdentityOutcome::Consistent
|
||||
} else {
|
||||
ChainIdentityOutcome::Mismatch {
|
||||
detail: format!(
|
||||
"stored anchor {} != channel anchor {}",
|
||||
stored.header.hash, current.header.hash
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod genesis_check_tests {
|
||||
mod chain_identity_tests {
|
||||
use common::{HashType, block::Block, test_utils::produce_dummy_block};
|
||||
|
||||
use super::{GenesisOutcome, compare_genesis};
|
||||
use super::{ANCHOR_BLOCK_ID, ChainIdentityOutcome, compare_anchor};
|
||||
|
||||
fn genesis_with_prev(prev_seed: u8) -> Block {
|
||||
produce_dummy_block(1, Some(HashType([prev_seed; 32])), vec![])
|
||||
fn anchor_with_prev(prev_seed: u8) -> Block {
|
||||
produce_dummy_block(ANCHOR_BLOCK_ID, Some(HashType([prev_seed; 32])), vec![])
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn matching_genesis_is_consistent() {
|
||||
let g = genesis_with_prev(1);
|
||||
fn matching_anchor_is_consistent() {
|
||||
let a = anchor_with_prev(1);
|
||||
assert!(matches!(
|
||||
compare_genesis(Some(&g), Some(&g)),
|
||||
GenesisOutcome::Consistent
|
||||
compare_anchor(&a, &a),
|
||||
ChainIdentityOutcome::Consistent
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn differing_genesis_is_mismatch() {
|
||||
let stored = genesis_with_prev(1);
|
||||
let current = genesis_with_prev(2);
|
||||
fn differing_anchor_is_mismatch() {
|
||||
let stored = anchor_with_prev(1);
|
||||
let current = anchor_with_prev(2);
|
||||
assert!(matches!(
|
||||
compare_genesis(Some(&stored), Some(¤t)),
|
||||
GenesisOutcome::Mismatch { .. }
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn missing_either_side_is_consistent() {
|
||||
let g = genesis_with_prev(1);
|
||||
assert!(matches!(
|
||||
compare_genesis(None, Some(&g)),
|
||||
GenesisOutcome::Consistent
|
||||
));
|
||||
assert!(matches!(
|
||||
compare_genesis(Some(&g), None),
|
||||
GenesisOutcome::Consistent
|
||||
));
|
||||
assert!(matches!(
|
||||
compare_genesis(None, None),
|
||||
GenesisOutcome::Consistent
|
||||
compare_anchor(&stored, ¤t),
|
||||
ChainIdentityOutcome::Mismatch { .. }
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@ -114,7 +114,7 @@ unsafe fn setup_indexer(
|
||||
|
||||
let allow_reset = config.allow_chain_reset;
|
||||
let core = runtime
|
||||
.block_on(IndexerCore::new_with_genesis_check(
|
||||
.block_on(IndexerCore::new_with_chain_check(
|
||||
config,
|
||||
&storage_dir,
|
||||
allow_reset,
|
||||
|
||||
@ -3,5 +3,6 @@
|
||||
"bedrock_config": {
|
||||
"addr": "http://host.docker.internal:18080"
|
||||
},
|
||||
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101"
|
||||
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101",
|
||||
"allow_chain_reset": true
|
||||
}
|
||||
|
||||
@ -21,7 +21,7 @@ pub struct IndexerService {
|
||||
impl IndexerService {
|
||||
pub async fn new(config: IndexerConfig, storage_dir: &Path) -> Result<Self> {
|
||||
let allow_reset = config.allow_chain_reset;
|
||||
let indexer = IndexerCore::new_with_genesis_check(config, storage_dir, allow_reset).await?;
|
||||
let indexer = IndexerCore::new_with_chain_check(config, storage_dir, allow_reset).await?;
|
||||
let subscription_service = SubscriptionService::spawn_new(indexer.clone());
|
||||
|
||||
Ok(Self {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user