diff --git a/lez/indexer/core/src/block_store.rs b/lez/indexer/core/src/block_store.rs index 617c4c60..a101bf94 100644 --- a/lez/indexer/core/src/block_store.rs +++ b/lez/indexer/core/src/block_store.rs @@ -13,6 +13,8 @@ use logos_blockchain_zone_sdk::Slot; use storage::indexer::RocksDBIO; use tokio::sync::RwLock; +use crate::chain_breaker::ChainBreaker; + #[derive(Clone)] pub struct IndexerStore { dbio: Arc, @@ -113,6 +115,21 @@ impl IndexerStore { Ok(()) } + pub fn get_chain_breaker(&self) -> Result> { + let Some(bytes) = self.dbio.get_chain_breaker_bytes()? else { + return Ok(None); + }; + let breaker: Option = + serde_json::from_slice(&bytes).context("Failed to deserialize stored chain breaker")?; + Ok(breaker) + } + + pub fn set_chain_breaker(&self, breaker: &Option) -> Result<()> { + let bytes = serde_json::to_vec(breaker).context("Failed to serialize chain breaker")?; + self.dbio.put_chain_breaker_bytes(&bytes)?; + Ok(()) + } + /// Recalculation of final state directly from DB. /// /// Used for indexer healthcheck. @@ -197,6 +214,43 @@ impl IndexerStore { } } +#[cfg(test)] +mod chain_breaker_tests { + use common::HashType; + + use super::*; + use crate::{chain_breaker::ChainBreaker, ingest_error::BlockIngestError}; + + #[tokio::test] + async fn chain_breaker_roundtrips_and_clears() { + let dir = tempfile::tempdir().expect("tempdir"); + let store = IndexerStore::open_db(dir.path()).expect("open store"); + + assert!(store.get_chain_breaker().expect("get").is_none()); + + let breaker = ChainBreaker { + block_id: Some(7), + block_hash: Some(HashType([1_u8; 32])), + prev_block_hash: Some(HashType([2_u8; 32])), + l1_slot: serde_json::Value::Null, + error: BlockIngestError::StateTransition("boom".to_owned()), + first_seen: Some(99), + orphans_since: 3, + }; + store + .set_chain_breaker(&Some(breaker)) + .expect("set breaker"); + + let got = store.get_chain_breaker().expect("get").expect("present"); + assert_eq!(got.block_id, Some(7)); + assert_eq!(got.orphans_since, 3); + assert!(matches!(got.error, BlockIngestError::StateTransition(_))); + + store.set_chain_breaker(&None).expect("clear"); + assert!(store.get_chain_breaker().expect("get").is_none()); + } +} + #[cfg(test)] mod tests { use common::{HashType, block::HashableBlockData}; diff --git a/lez/indexer/core/src/chain_breaker.rs b/lez/indexer/core/src/chain_breaker.rs new file mode 100644 index 00000000..7aacbd79 --- /dev/null +++ b/lez/indexer/core/src/chain_breaker.rs @@ -0,0 +1,23 @@ +use common::HashType; +use serde::{Deserialize, Serialize}; + +use crate::ingest_error::BlockIngestError; + +/// Diagnostic record of the first block that broke the L2 chain. +/// +/// Later non-chaining blocks (orphans, since the tip is frozen) only bump `orphans_since`. +/// +/// The block-derived fields are `None` for a deserialize break (no header was +/// ever parsed). `l1_slot` is the zone-sdk cursor position captured as JSON. +/// `first_seen` is the breaking block's L2 timestamp (`None` for a deserialize break). +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ChainBreaker { + pub block_id: Option, + pub block_hash: Option, + pub prev_block_hash: Option, + pub l1_slot: serde_json::Value, + pub error: BlockIngestError, + pub first_seen: Option, + pub orphans_since: u64, +} diff --git a/lez/indexer/core/src/lib.rs b/lez/indexer/core/src/lib.rs index 35ff3c6b..a48712fd 100644 --- a/lez/indexer/core/src/lib.rs +++ b/lez/indexer/core/src/lib.rs @@ -2,6 +2,7 @@ use std::{path::Path, sync::Arc}; use anyhow::Result; use arc_swap::ArcSwap; +pub use chain_breaker::ChainBreaker; use common::block::Block; // ToDo: Remove after testnet use futures::StreamExt as _; @@ -18,6 +19,7 @@ use crate::{ status::{IndexerStatus, IndexerSyncStatus}, }; pub mod block_store; +pub mod chain_breaker; pub mod config; pub mod ingest_error; pub mod status; diff --git a/lez/storage/src/indexer/indexer_cells.rs b/lez/storage/src/indexer/indexer_cells.rs index b19a5510..84379a1d 100644 --- a/lez/storage/src/indexer/indexer_cells.rs +++ b/lez/storage/src/indexer/indexer_cells.rs @@ -7,9 +7,9 @@ use crate::{ error::DbError, indexer::{ ACC_NUM_CELL_NAME, BLOCK_HASH_CELL_NAME, BREAKPOINT_CELL_NAME, CF_ACC_META, - CF_BREAKPOINT_NAME, CF_HASH_TO_ID, CF_TX_TO_ID, DB_META_LAST_BREAKPOINT_ID, - DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY, DB_META_ZONE_SDK_INDEXER_CURSOR_KEY, - TX_HASH_CELL_NAME, + CF_BREAKPOINT_NAME, CF_HASH_TO_ID, CF_TX_TO_ID, DB_META_CHAIN_BREAKER_KEY, + DB_META_LAST_BREAKPOINT_ID, DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY, + DB_META_ZONE_SDK_INDEXER_CURSOR_KEY, TX_HASH_CELL_NAME, }, }; @@ -247,6 +247,41 @@ impl SimpleWritableCell for ZoneSdkIndexerCursorCellRef<'_> { } } +/// Opaque JSON bytes for the indexer's persisted `Option`. +/// Serialized via `serde_json` by the caller (mirrors the zone-sdk cursor cell). +#[derive(BorshDeserialize)] +pub struct ChainBreakerCellOwned(pub Vec); + +impl SimpleStorableCell for ChainBreakerCellOwned { + type KeyParams = (); + + const CELL_NAME: &'static str = DB_META_CHAIN_BREAKER_KEY; + const CF_NAME: &'static str = CF_META_NAME; +} + +impl SimpleReadableCell for ChainBreakerCellOwned {} + +#[derive(BorshSerialize)] +pub struct ChainBreakerCellRef<'bytes>(pub &'bytes [u8]); + +impl SimpleStorableCell for ChainBreakerCellRef<'_> { + type KeyParams = (); + + const CELL_NAME: &'static str = DB_META_CHAIN_BREAKER_KEY; + const CF_NAME: &'static str = CF_META_NAME; +} + +impl SimpleWritableCell for ChainBreakerCellRef<'_> { + fn value_constructor(&self) -> DbResult> { + borsh::to_vec(&self).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize chain breaker cell".to_owned()), + ) + }) + } +} + #[cfg(test)] mod uniform_tests { use crate::{ diff --git a/lez/storage/src/indexer/mod.rs b/lez/storage/src/indexer/mod.rs index f74a9c64..97362e8b 100644 --- a/lez/storage/src/indexer/mod.rs +++ b/lez/storage/src/indexer/mod.rs @@ -24,6 +24,8 @@ pub const DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY: &str = pub const DB_META_LAST_BREAKPOINT_ID: &str = "last_breakpoint_id"; /// Key base for storing the zone-sdk indexer cursor (opaque bytes). pub const DB_META_ZONE_SDK_INDEXER_CURSOR_KEY: &str = "zone_sdk_indexer_cursor"; +/// Key base for storing the persisted `Option` diagnostic record (opaque JSON bytes). +pub const DB_META_CHAIN_BREAKER_KEY: &str = "chain_breaker"; /// Cell name for a breakpoint. pub const BREAKPOINT_CELL_NAME: &str = "breakpoint"; diff --git a/lez/storage/src/indexer/read_once.rs b/lez/storage/src/indexer/read_once.rs index 6e79adc4..777bbf58 100644 --- a/lez/storage/src/indexer/read_once.rs +++ b/lez/storage/src/indexer/read_once.rs @@ -3,8 +3,9 @@ use crate::{ DBIO as _, cells::shared_cells::{BlockCell, FirstBlockCell, FirstBlockSetCell, LastBlockCell}, indexer::indexer_cells::{ - AccNumTxCell, BlockHashToBlockIdMapCell, BreakpointCellOwned, LastBreakpointIdCell, - LastObservedL1LibHeaderCell, TxHashToBlockIdMapCell, ZoneSdkIndexerCursorCellOwned, + AccNumTxCell, BlockHashToBlockIdMapCell, BreakpointCellOwned, ChainBreakerCellOwned, + LastBreakpointIdCell, LastObservedL1LibHeaderCell, TxHashToBlockIdMapCell, + ZoneSdkIndexerCursorCellOwned, }, }; @@ -73,4 +74,10 @@ impl RocksDBIO { .get_opt::(())? .map(|cell| cell.0)) } + + pub fn get_chain_breaker_bytes(&self) -> DbResult>> { + Ok(self + .get_opt::(())? + .map(|cell| cell.0)) + } } diff --git a/lez/storage/src/indexer/write_non_atomic.rs b/lez/storage/src/indexer/write_non_atomic.rs index 7ddab1dd..555c5efb 100644 --- a/lez/storage/src/indexer/write_non_atomic.rs +++ b/lez/storage/src/indexer/write_non_atomic.rs @@ -3,7 +3,7 @@ use crate::{ DBIO as _, cells::shared_cells::{FirstBlockSetCell, LastBlockCell}, indexer::indexer_cells::{ - BreakpointCellRef, LastBreakpointIdCell, LastObservedL1LibHeaderCell, + BreakpointCellRef, ChainBreakerCellRef, LastBreakpointIdCell, LastObservedL1LibHeaderCell, ZoneSdkIndexerCursorCellRef, }, }; @@ -35,6 +35,10 @@ impl RocksDBIO { self.put(&ZoneSdkIndexerCursorCellRef(bytes), ()) } + pub fn put_chain_breaker_bytes(&self, bytes: &[u8]) -> DbResult<()> { + self.put(&ChainBreakerCellRef(bytes), ()) + } + // State pub fn put_breakpoint(&self, br_id: u64, breakpoint: &V03State) -> DbResult<()> {