From 00939ae6d6b0bb5efae5ddb18c3837912578121d Mon Sep 17 00:00:00 2001 From: erhant Date: Fri, 26 Jun 2026 15:34:17 +0300 Subject: [PATCH] refactor(indexer): use "stall reason" instead of "chain breaker" --- lez/indexer/core/src/block_store.rs | 88 +++++++++---------- lez/indexer/core/src/ingest_error.rs | 2 +- lez/indexer/core/src/lib.rs | 8 +- .../src/{chain_breaker.rs => stall_reason.rs} | 7 +- lez/indexer/core/src/status.rs | 24 ++--- lez/storage/src/indexer/indexer_cells.rs | 24 ++--- lez/storage/src/indexer/mod.rs | 4 +- lez/storage/src/indexer/read_once.rs | 10 +-- lez/storage/src/indexer/write_non_atomic.rs | 6 +- 9 files changed, 85 insertions(+), 88 deletions(-) rename lez/indexer/core/src/{chain_breaker.rs => stall_reason.rs} (79%) diff --git a/lez/indexer/core/src/block_store.rs b/lez/indexer/core/src/block_store.rs index 6fe3b229..b49c4bc9 100644 --- a/lez/indexer/core/src/block_store.rs +++ b/lez/indexer/core/src/block_store.rs @@ -14,7 +14,7 @@ use logos_blockchain_zone_sdk::Slot; use storage::indexer::RocksDBIO; use tokio::sync::RwLock; -use crate::{chain_breaker::ChainBreaker, ingest_error::BlockIngestError}; +use crate::{ingest_error::BlockIngestError, stall_reason::StallReason}; struct Tip { block_id: u64, @@ -25,7 +25,7 @@ struct Tip { pub enum AcceptOutcome { /// Chained and applied; tip and L1 read cursor both advance. Applied, - /// Did not chain or failed to apply; tip stays frozen, breaker recorded. + /// Did not chain or failed to apply; tip stays frozen, stall recorded. Parked(BlockIngestError), } @@ -129,18 +129,18 @@ impl IndexerStore { Ok(()) } - pub fn get_chain_breaker(&self) -> Result> { - let Some(bytes) = self.dbio.get_chain_breaker_bytes()? else { + pub fn get_stall_reason(&self) -> Result> { + let Some(bytes) = self.dbio.get_stall_reason_bytes()? else { return Ok(None); }; - let breaker: Option = - serde_json::from_slice(&bytes).context("Failed to deserialize stored chain breaker")?; - Ok(breaker) + let stall: Option = + serde_json::from_slice(&bytes).context("Failed to deserialize stored stall reason")?; + Ok(stall) } - pub fn set_chain_breaker(&self, breaker: &Option) -> Result<()> { - let bytes = serde_json::to_vec(breaker).context("Failed to serialize chain breaker")?; - self.dbio.put_chain_breaker_bytes(&bytes)?; + pub fn set_stall_reason(&self, stall: &Option) -> Result<()> { + let bytes = serde_json::to_vec(stall).context("Failed to serialize stall reason")?; + self.dbio.put_stall_reason_bytes(&bytes)?; Ok(()) } @@ -219,20 +219,20 @@ impl IndexerStore { Ok(None) } - /// Records the chain breaker: the first break is stored verbatim; subsequent + /// Records the stall reason: the first break is stored verbatim; subsequent /// breaks only bump `orphans_since`, preserving the original cause. - fn record_break( + fn record_stall( &self, header: Option<&BlockHeader>, l1_slot: serde_json::Value, error: BlockIngestError, ) -> Result<()> { - let breaker = match self.get_chain_breaker()? { + let stall = match self.get_stall_reason()? { Some(mut existing) => { existing.orphans_since = existing.orphans_since.saturating_add(1); existing } - None => ChainBreaker { + None => StallReason { block_id: header.map(|h| h.block_id), block_hash: header.map(|h| h.hash), prev_block_hash: header.map(|h| h.prev_block_hash), @@ -242,35 +242,35 @@ impl IndexerStore { orphans_since: 0, }, }; - self.set_chain_breaker(&Some(breaker)) + self.set_stall_reason(&Some(stall)) } - /// Records a breaker for an inscription that could not even be parsed. - pub fn record_deserialize_break( + /// Records a stall for an inscription that could not even be parsed. + pub fn record_deserialize_stall( &self, l1_slot: serde_json::Value, error: String, ) -> Result<()> { - self.record_break(None, l1_slot, BlockIngestError::Deserialize(error)) + self.record_stall(None, l1_slot, BlockIngestError::Deserialize(error)) } /// Validates `block` against the tip and, if it chains, applies it atomically /// (scratch clone, commit only on full success) and advances the tip. On any - /// failure records the breaker and returns `Parked` without touching state. + /// failure records the stall and returns `Parked` without touching state. pub async fn accept_block( &self, block: &Block, l1_slot: serde_json::Value, ) -> Result { if let Some(err) = self.acceptance_error(block)? { - self.record_break(Some(&block.header), l1_slot, err.clone())?; + self.record_stall(Some(&block.header), l1_slot, err.clone())?; return Ok(AcceptOutcome::Parked(err)); } // TODO: we use scratch state to be atomic, but need to revisit how expensive a clone is let mut scratch = self.current_state.read().await.clone(); if let Err(err) = apply_block_to_scratch(block, &mut scratch) { - self.record_break(Some(&block.header), l1_slot, err.clone())?; + self.record_stall(Some(&block.header), l1_slot, err.clone())?; return Ok(AcceptOutcome::Parked(err)); } @@ -278,13 +278,13 @@ impl IndexerStore { stored.bedrock_status = BedrockStatus::Finalized; if let Err(err) = self.dbio.put_block(&stored, [0_u8; 32]) { let ingest_err = BlockIngestError::Storage(err.to_string()); - self.record_break(Some(&block.header), l1_slot, ingest_err.clone())?; + self.record_stall(Some(&block.header), l1_slot, ingest_err.clone())?; return Ok(AcceptOutcome::Parked(ingest_err)); } // Commit in-memory state (infallible) only after the DB write succeeded. *self.current_state.write().await = scratch; - self.set_chain_breaker(&None)?; + self.set_stall_reason(&None)?; Ok(AcceptOutcome::Applied) } @@ -407,20 +407,20 @@ fn apply_block_to_scratch(block: &Block, state: &mut V03State) -> Result<(), Blo } #[cfg(test)] -mod chain_breaker_tests { +mod stall_reason_tests { use common::HashType; use super::*; - use crate::{chain_breaker::ChainBreaker, ingest_error::BlockIngestError}; + use crate::{ingest_error::BlockIngestError, stall_reason::StallReason}; #[tokio::test] - async fn chain_breaker_roundtrips_and_clears() { + async fn stall_reason_roundtrips_and_clears() { let dir = tempfile::tempdir().expect("tempdir"); let store = IndexerStore::open_db(dir.path()).expect("open store"); - assert!(store.get_chain_breaker().expect("get").is_none()); + assert!(store.get_stall_reason().expect("get").is_none()); - let breaker = ChainBreaker { + let stall = StallReason { block_id: Some(7), block_hash: Some(HashType([1_u8; 32])), prev_block_hash: Some(HashType([2_u8; 32])), @@ -429,17 +429,15 @@ mod chain_breaker_tests { first_seen: Some(99), orphans_since: 3, }; - store - .set_chain_breaker(&Some(breaker)) - .expect("set breaker"); + store.set_stall_reason(&Some(stall)).expect("set stall"); - let got = store.get_chain_breaker().expect("get").expect("present"); + let got = store.get_stall_reason().expect("get").expect("present"); assert_eq!(got.block_id, Some(7)); assert_eq!(got.orphans_since, 3); assert!(matches!(got.error, BlockIngestError::StateTransition(_))); - store.set_chain_breaker(&None).expect("clear"); - assert!(store.get_chain_breaker().expect("get").is_none()); + store.set_stall_reason(&None).expect("clear"); + assert!(store.get_stall_reason().expect("get").is_none()); } } @@ -606,9 +604,9 @@ mod accept_tests { got: 2 }) )); - let breaker = store.get_chain_breaker().expect("get").expect("present"); - assert_eq!(breaker.block_id, Some(2)); - assert_eq!(breaker.orphans_since, 0); + let stall = store.get_stall_reason().expect("get").expect("present"); + assert_eq!(stall.block_id, Some(2)); + assert_eq!(stall.orphans_since, 0); } #[tokio::test] @@ -645,22 +643,22 @@ mod accept_tests { .await .expect("accept"); - let breaker = store.get_chain_breaker().expect("get").expect("present"); - assert_eq!(breaker.block_id, Some(2), "first breaker preserved"); - assert_eq!(breaker.orphans_since, 1, "second break counted as orphan"); + let stall = store.get_stall_reason().expect("get").expect("present"); + assert_eq!(stall.block_id, Some(2), "first stall preserved"); + assert_eq!(stall.orphans_since, 1, "second break counted as orphan"); } #[tokio::test] - async fn deserialize_break_records_breaker_without_header() { + async fn deserialize_break_records_stall_without_header() { let dir = tempfile::tempdir().expect("tempdir"); let store = IndexerStore::open_db(dir.path()).expect("open store"); store - .record_deserialize_break(serde_json::Value::Null, "bad bytes".to_owned()) + .record_deserialize_stall(serde_json::Value::Null, "bad bytes".to_owned()) .expect("record"); - let breaker = store.get_chain_breaker().expect("get").expect("present"); - assert_eq!(breaker.block_id, None); - assert!(matches!(breaker.error, BlockIngestError::Deserialize(_))); + let stall = store.get_stall_reason().expect("get").expect("present"); + assert_eq!(stall.block_id, None); + assert!(matches!(stall.error, BlockIngestError::Deserialize(_))); } } diff --git a/lez/indexer/core/src/ingest_error.rs b/lez/indexer/core/src/ingest_error.rs index 4240a84c..76693a8e 100644 --- a/lez/indexer/core/src/ingest_error.rs +++ b/lez/indexer/core/src/ingest_error.rs @@ -2,7 +2,7 @@ use common::HashType; use serde::{Deserialize, Serialize}; /// Why the indexer could not apply an L2 block from the channel. Stored inside a -/// [`crate::chain_breaker::ChainBreaker`] and surfaced on the status snapshot. +/// [`crate::stall_reason::StallReason`] and surfaced on the status snapshot. #[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)] #[serde(rename_all = "camelCase")] pub enum BlockIngestError { diff --git a/lez/indexer/core/src/lib.rs b/lez/indexer/core/src/lib.rs index b2cf077a..88f9feae 100644 --- a/lez/indexer/core/src/lib.rs +++ b/lez/indexer/core/src/lib.rs @@ -2,7 +2,6 @@ use std::{path::Path, sync::Arc}; use anyhow::Result; use arc_swap::ArcSwap; -pub use chain_breaker::ChainBreaker; use common::block::Block; // ToDo: Remove after testnet use futures::StreamExt as _; @@ -12,6 +11,7 @@ use logos_blockchain_core::header::HeaderId; use logos_blockchain_zone_sdk::{ CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer, }; +pub use stall_reason::StallReason; use crate::{ block_store::IndexerStore, @@ -19,9 +19,9 @@ use crate::{ status::{IndexerStatus, IndexerSyncStatus}, }; pub mod block_store; -pub mod chain_breaker; pub mod config; pub mod ingest_error; +pub mod stall_reason; pub mod status; #[derive(Clone)] @@ -62,11 +62,11 @@ impl IndexerCore { pub fn status(&self) -> IndexerStatus { let sync = IndexerSyncStatus::clone(&self.status.load()); let indexed_block_id = self.store.get_last_block_id().ok().flatten(); - let chain_breaker = self.store.get_chain_breaker().ok().flatten(); + let stall_reason = self.store.get_stall_reason().ok().flatten(); IndexerStatus { sync, indexed_block_id, - chain_breaker, + stall_reason, } } diff --git a/lez/indexer/core/src/chain_breaker.rs b/lez/indexer/core/src/stall_reason.rs similarity index 79% rename from lez/indexer/core/src/chain_breaker.rs rename to lez/indexer/core/src/stall_reason.rs index 7aacbd79..273093d8 100644 --- a/lez/indexer/core/src/chain_breaker.rs +++ b/lez/indexer/core/src/stall_reason.rs @@ -5,19 +5,20 @@ use crate::ingest_error::BlockIngestError; /// Diagnostic record of the first block that broke the L2 chain. /// -/// Later non-chaining blocks (orphans, since the tip is frozen) only bump `orphans_since`. -/// /// The block-derived fields are `None` for a deserialize break (no header was /// ever parsed). `l1_slot` is the zone-sdk cursor position captured as JSON. /// `first_seen` is the breaking block's L2 timestamp (`None` for a deserialize break). #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -pub struct ChainBreaker { +pub struct StallReason { pub block_id: Option, pub block_hash: Option, pub prev_block_hash: Option, pub l1_slot: serde_json::Value, pub error: BlockIngestError, pub first_seen: Option, + /// Number of later non-chaining blocks (orphans, since the tip is frozen). + /// + /// TODO: We could store a different "branch" of blocks following this sta pub orphans_since: u64, } diff --git a/lez/indexer/core/src/status.rs b/lez/indexer/core/src/status.rs index 954cfa50..d4f6d918 100644 --- a/lez/indexer/core/src/status.rs +++ b/lez/indexer/core/src/status.rs @@ -1,6 +1,6 @@ use serde::Serialize; -use crate::chain_breaker::ChainBreaker; +use crate::stall_reason::StallReason; /// Coarse lifecycle state of the indexer's ingestion loop, so a client can tell /// "still catching up" apart from "something went wrong". @@ -15,8 +15,8 @@ pub enum IndexerSyncState { CaughtUp, /// The last cycle failed (e.g. the L1 node is unreachable). See `last_error`. Error, - /// Parked on a chain breaker: the validated tip is frozen awaiting a valid - /// continuation. See `last_error` and the snapshot's `chain_breaker`. + /// Parked on a stall reason: the validated tip is frozen awaiting a valid + /// continuation. See `last_error` and the snapshot's `stall_reason`. Stalled, } @@ -62,8 +62,8 @@ impl IndexerSyncStatus { } } - /// Parked on a chain breaker; `reason` mirrors the breaker's error message. - /// The full breaker is attached to the [`IndexerStatus`] snapshot. + /// Parked on a stall reason; `reason` mirrors the stall's error message. + /// The full stall is attached to the [`IndexerStatus`] snapshot. pub(crate) const fn stalled(reason: String) -> Self { Self { state: IndexerSyncState::Stalled, @@ -83,7 +83,7 @@ pub struct IndexerStatus { #[serde(flatten)] pub sync: IndexerSyncStatus, pub indexed_block_id: Option, - pub chain_breaker: Option, + pub stall_reason: Option, } #[cfg(test)] @@ -95,7 +95,7 @@ mod tests { let status = IndexerStatus { sync: IndexerSyncStatus::error("boom".to_owned()), indexed_block_id: Some(7), - chain_breaker: None, + stall_reason: None, }; let value = serde_json::to_value(&status).expect("serialize"); assert_eq!( @@ -104,7 +104,7 @@ mod tests { "state": "error", "lastError": "boom", "indexedBlockId": 7, - "chainBreaker": null, + "stallReason": null, }) ); } @@ -119,13 +119,13 @@ mod tests { } #[test] - fn stalled_status_serializes_with_breaker() { - use crate::{chain_breaker::ChainBreaker, ingest_error::BlockIngestError}; + fn stalled_status_serializes_with_stall_reason() { + use crate::{ingest_error::BlockIngestError, stall_reason::StallReason}; let status = IndexerStatus { sync: IndexerSyncStatus::stalled("broken chain link".to_owned()), indexed_block_id: Some(41), - chain_breaker: Some(ChainBreaker { + stall_reason: Some(StallReason { block_id: Some(42), block_hash: None, prev_block_hash: None, @@ -139,6 +139,6 @@ mod tests { assert_eq!(value["state"], serde_json::json!("stalled")); assert_eq!(value["lastError"], serde_json::json!("broken chain link")); assert_eq!(value["indexedBlockId"], serde_json::json!(41)); - assert_eq!(value["chainBreaker"]["orphansSince"], serde_json::json!(2)); + assert_eq!(value["stallReason"]["orphansSince"], serde_json::json!(2)); } } diff --git a/lez/storage/src/indexer/indexer_cells.rs b/lez/storage/src/indexer/indexer_cells.rs index 84379a1d..89e01a43 100644 --- a/lez/storage/src/indexer/indexer_cells.rs +++ b/lez/storage/src/indexer/indexer_cells.rs @@ -7,8 +7,8 @@ use crate::{ error::DbError, indexer::{ ACC_NUM_CELL_NAME, BLOCK_HASH_CELL_NAME, BREAKPOINT_CELL_NAME, CF_ACC_META, - CF_BREAKPOINT_NAME, CF_HASH_TO_ID, CF_TX_TO_ID, DB_META_CHAIN_BREAKER_KEY, - DB_META_LAST_BREAKPOINT_ID, DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY, + CF_BREAKPOINT_NAME, CF_HASH_TO_ID, CF_TX_TO_ID, DB_META_LAST_BREAKPOINT_ID, + DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY, DB_META_STALL_REASON_KEY, DB_META_ZONE_SDK_INDEXER_CURSOR_KEY, TX_HASH_CELL_NAME, }, }; @@ -247,36 +247,36 @@ impl SimpleWritableCell for ZoneSdkIndexerCursorCellRef<'_> { } } -/// Opaque JSON bytes for the indexer's persisted `Option`. +/// Opaque JSON bytes for the indexer's persisted `Option`. /// Serialized via `serde_json` by the caller (mirrors the zone-sdk cursor cell). #[derive(BorshDeserialize)] -pub struct ChainBreakerCellOwned(pub Vec); +pub struct StallReasonCellOwned(pub Vec); -impl SimpleStorableCell for ChainBreakerCellOwned { +impl SimpleStorableCell for StallReasonCellOwned { type KeyParams = (); - const CELL_NAME: &'static str = DB_META_CHAIN_BREAKER_KEY; + const CELL_NAME: &'static str = DB_META_STALL_REASON_KEY; const CF_NAME: &'static str = CF_META_NAME; } -impl SimpleReadableCell for ChainBreakerCellOwned {} +impl SimpleReadableCell for StallReasonCellOwned {} #[derive(BorshSerialize)] -pub struct ChainBreakerCellRef<'bytes>(pub &'bytes [u8]); +pub struct StallReasonCellRef<'bytes>(pub &'bytes [u8]); -impl SimpleStorableCell for ChainBreakerCellRef<'_> { +impl SimpleStorableCell for StallReasonCellRef<'_> { type KeyParams = (); - const CELL_NAME: &'static str = DB_META_CHAIN_BREAKER_KEY; + const CELL_NAME: &'static str = DB_META_STALL_REASON_KEY; const CF_NAME: &'static str = CF_META_NAME; } -impl SimpleWritableCell for ChainBreakerCellRef<'_> { +impl SimpleWritableCell for StallReasonCellRef<'_> { fn value_constructor(&self) -> DbResult> { borsh::to_vec(&self).map_err(|err| { DbError::borsh_cast_message( err, - Some("Failed to serialize chain breaker cell".to_owned()), + Some("Failed to serialize stall reason cell".to_owned()), ) }) } diff --git a/lez/storage/src/indexer/mod.rs b/lez/storage/src/indexer/mod.rs index 97362e8b..e93f86c0 100644 --- a/lez/storage/src/indexer/mod.rs +++ b/lez/storage/src/indexer/mod.rs @@ -24,8 +24,8 @@ pub const DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY: &str = pub const DB_META_LAST_BREAKPOINT_ID: &str = "last_breakpoint_id"; /// Key base for storing the zone-sdk indexer cursor (opaque bytes). pub const DB_META_ZONE_SDK_INDEXER_CURSOR_KEY: &str = "zone_sdk_indexer_cursor"; -/// Key base for storing the persisted `Option` diagnostic record (opaque JSON bytes). -pub const DB_META_CHAIN_BREAKER_KEY: &str = "chain_breaker"; +/// Key base for storing the persisted `Option` diagnostic record (opaque JSON bytes). +pub const DB_META_STALL_REASON_KEY: &str = "stall_reason"; /// Cell name for a breakpoint. pub const BREAKPOINT_CELL_NAME: &str = "breakpoint"; diff --git a/lez/storage/src/indexer/read_once.rs b/lez/storage/src/indexer/read_once.rs index 777bbf58..591f8405 100644 --- a/lez/storage/src/indexer/read_once.rs +++ b/lez/storage/src/indexer/read_once.rs @@ -3,8 +3,8 @@ use crate::{ DBIO as _, cells::shared_cells::{BlockCell, FirstBlockCell, FirstBlockSetCell, LastBlockCell}, indexer::indexer_cells::{ - AccNumTxCell, BlockHashToBlockIdMapCell, BreakpointCellOwned, ChainBreakerCellOwned, - LastBreakpointIdCell, LastObservedL1LibHeaderCell, TxHashToBlockIdMapCell, + AccNumTxCell, BlockHashToBlockIdMapCell, BreakpointCellOwned, LastBreakpointIdCell, + LastObservedL1LibHeaderCell, StallReasonCellOwned, TxHashToBlockIdMapCell, ZoneSdkIndexerCursorCellOwned, }, }; @@ -75,9 +75,7 @@ impl RocksDBIO { .map(|cell| cell.0)) } - pub fn get_chain_breaker_bytes(&self) -> DbResult>> { - Ok(self - .get_opt::(())? - .map(|cell| cell.0)) + pub fn get_stall_reason_bytes(&self) -> DbResult>> { + Ok(self.get_opt::(())?.map(|cell| cell.0)) } } diff --git a/lez/storage/src/indexer/write_non_atomic.rs b/lez/storage/src/indexer/write_non_atomic.rs index 555c5efb..215250b7 100644 --- a/lez/storage/src/indexer/write_non_atomic.rs +++ b/lez/storage/src/indexer/write_non_atomic.rs @@ -3,7 +3,7 @@ use crate::{ DBIO as _, cells::shared_cells::{FirstBlockSetCell, LastBlockCell}, indexer::indexer_cells::{ - BreakpointCellRef, ChainBreakerCellRef, LastBreakpointIdCell, LastObservedL1LibHeaderCell, + BreakpointCellRef, LastBreakpointIdCell, LastObservedL1LibHeaderCell, StallReasonCellRef, ZoneSdkIndexerCursorCellRef, }, }; @@ -35,8 +35,8 @@ impl RocksDBIO { self.put(&ZoneSdkIndexerCursorCellRef(bytes), ()) } - pub fn put_chain_breaker_bytes(&self, bytes: &[u8]) -> DbResult<()> { - self.put(&ChainBreakerCellRef(bytes), ()) + pub fn put_stall_reason_bytes(&self, bytes: &[u8]) -> DbResult<()> { + self.put(&StallReasonCellRef(bytes), ()) } // State