feat(indexer): persist ChainBreaker in RocksDB meta

test `RISC0_DEV_MODE=1 RISC0_SKIP_BUILD=1 cargo test -p storage -p indexer_core --lib chain_breaker`
This commit is contained in:
erhant 2026-06-26 14:25:02 +03:00
parent 16b0011abd
commit 48a41fe1cf
7 changed files with 133 additions and 6 deletions

View File

@ -13,6 +13,8 @@ use logos_blockchain_zone_sdk::Slot;
use storage::indexer::RocksDBIO;
use tokio::sync::RwLock;
use crate::chain_breaker::ChainBreaker;
#[derive(Clone)]
pub struct IndexerStore {
dbio: Arc<RocksDBIO>,
@ -113,6 +115,21 @@ impl IndexerStore {
Ok(())
}
pub fn get_chain_breaker(&self) -> Result<Option<ChainBreaker>> {
let Some(bytes) = self.dbio.get_chain_breaker_bytes()? else {
return Ok(None);
};
let breaker: Option<ChainBreaker> =
serde_json::from_slice(&bytes).context("Failed to deserialize stored chain breaker")?;
Ok(breaker)
}
pub fn set_chain_breaker(&self, breaker: &Option<ChainBreaker>) -> Result<()> {
let bytes = serde_json::to_vec(breaker).context("Failed to serialize chain breaker")?;
self.dbio.put_chain_breaker_bytes(&bytes)?;
Ok(())
}
/// Recalculation of final state directly from DB.
///
/// Used for indexer healthcheck.
@ -197,6 +214,43 @@ impl IndexerStore {
}
}
#[cfg(test)]
mod chain_breaker_tests {
use common::HashType;
use super::*;
use crate::{chain_breaker::ChainBreaker, ingest_error::BlockIngestError};
#[tokio::test]
async fn chain_breaker_roundtrips_and_clears() {
let dir = tempfile::tempdir().expect("tempdir");
let store = IndexerStore::open_db(dir.path()).expect("open store");
assert!(store.get_chain_breaker().expect("get").is_none());
let breaker = ChainBreaker {
block_id: Some(7),
block_hash: Some(HashType([1_u8; 32])),
prev_block_hash: Some(HashType([2_u8; 32])),
l1_slot: serde_json::Value::Null,
error: BlockIngestError::StateTransition("boom".to_owned()),
first_seen: Some(99),
orphans_since: 3,
};
store
.set_chain_breaker(&Some(breaker))
.expect("set breaker");
let got = store.get_chain_breaker().expect("get").expect("present");
assert_eq!(got.block_id, Some(7));
assert_eq!(got.orphans_since, 3);
assert!(matches!(got.error, BlockIngestError::StateTransition(_)));
store.set_chain_breaker(&None).expect("clear");
assert!(store.get_chain_breaker().expect("get").is_none());
}
}
#[cfg(test)]
mod tests {
use common::{HashType, block::HashableBlockData};

View File

@ -0,0 +1,23 @@
use common::HashType;
use serde::{Deserialize, Serialize};
use crate::ingest_error::BlockIngestError;
/// Diagnostic record of the first block that broke the L2 chain.
///
/// Later non-chaining blocks (orphans, since the tip is frozen) only bump `orphans_since`.
///
/// The block-derived fields are `None` for a deserialize break (no header was
/// ever parsed). `l1_slot` is the zone-sdk cursor position captured as JSON.
/// `first_seen` is the breaking block's L2 timestamp (`None` for a deserialize break).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ChainBreaker {
pub block_id: Option<u64>,
pub block_hash: Option<HashType>,
pub prev_block_hash: Option<HashType>,
pub l1_slot: serde_json::Value,
pub error: BlockIngestError,
pub first_seen: Option<u64>,
pub orphans_since: u64,
}

View File

@ -2,6 +2,7 @@ use std::{path::Path, sync::Arc};
use anyhow::Result;
use arc_swap::ArcSwap;
pub use chain_breaker::ChainBreaker;
use common::block::Block;
// ToDo: Remove after testnet
use futures::StreamExt as _;
@ -18,6 +19,7 @@ use crate::{
status::{IndexerStatus, IndexerSyncStatus},
};
pub mod block_store;
pub mod chain_breaker;
pub mod config;
pub mod ingest_error;
pub mod status;

View File

@ -7,9 +7,9 @@ use crate::{
error::DbError,
indexer::{
ACC_NUM_CELL_NAME, BLOCK_HASH_CELL_NAME, BREAKPOINT_CELL_NAME, CF_ACC_META,
CF_BREAKPOINT_NAME, CF_HASH_TO_ID, CF_TX_TO_ID, DB_META_LAST_BREAKPOINT_ID,
DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY, DB_META_ZONE_SDK_INDEXER_CURSOR_KEY,
TX_HASH_CELL_NAME,
CF_BREAKPOINT_NAME, CF_HASH_TO_ID, CF_TX_TO_ID, DB_META_CHAIN_BREAKER_KEY,
DB_META_LAST_BREAKPOINT_ID, DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY,
DB_META_ZONE_SDK_INDEXER_CURSOR_KEY, TX_HASH_CELL_NAME,
},
};
@ -247,6 +247,41 @@ impl SimpleWritableCell for ZoneSdkIndexerCursorCellRef<'_> {
}
}
/// Opaque JSON bytes for the indexer's persisted `Option<ChainBreaker>`.
/// Serialized via `serde_json` by the caller (mirrors the zone-sdk cursor cell).
#[derive(BorshDeserialize)]
pub struct ChainBreakerCellOwned(pub Vec<u8>);
impl SimpleStorableCell for ChainBreakerCellOwned {
type KeyParams = ();
const CELL_NAME: &'static str = DB_META_CHAIN_BREAKER_KEY;
const CF_NAME: &'static str = CF_META_NAME;
}
impl SimpleReadableCell for ChainBreakerCellOwned {}
#[derive(BorshSerialize)]
pub struct ChainBreakerCellRef<'bytes>(pub &'bytes [u8]);
impl SimpleStorableCell for ChainBreakerCellRef<'_> {
type KeyParams = ();
const CELL_NAME: &'static str = DB_META_CHAIN_BREAKER_KEY;
const CF_NAME: &'static str = CF_META_NAME;
}
impl SimpleWritableCell for ChainBreakerCellRef<'_> {
fn value_constructor(&self) -> DbResult<Vec<u8>> {
borsh::to_vec(&self).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize chain breaker cell".to_owned()),
)
})
}
}
#[cfg(test)]
mod uniform_tests {
use crate::{

View File

@ -24,6 +24,8 @@ pub const DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY: &str =
pub const DB_META_LAST_BREAKPOINT_ID: &str = "last_breakpoint_id";
/// Key base for storing the zone-sdk indexer cursor (opaque bytes).
pub const DB_META_ZONE_SDK_INDEXER_CURSOR_KEY: &str = "zone_sdk_indexer_cursor";
/// Key base for storing the persisted `Option<ChainBreaker>` diagnostic record (opaque JSON bytes).
pub const DB_META_CHAIN_BREAKER_KEY: &str = "chain_breaker";
/// Cell name for a breakpoint.
pub const BREAKPOINT_CELL_NAME: &str = "breakpoint";

View File

@ -3,8 +3,9 @@ use crate::{
DBIO as _,
cells::shared_cells::{BlockCell, FirstBlockCell, FirstBlockSetCell, LastBlockCell},
indexer::indexer_cells::{
AccNumTxCell, BlockHashToBlockIdMapCell, BreakpointCellOwned, LastBreakpointIdCell,
LastObservedL1LibHeaderCell, TxHashToBlockIdMapCell, ZoneSdkIndexerCursorCellOwned,
AccNumTxCell, BlockHashToBlockIdMapCell, BreakpointCellOwned, ChainBreakerCellOwned,
LastBreakpointIdCell, LastObservedL1LibHeaderCell, TxHashToBlockIdMapCell,
ZoneSdkIndexerCursorCellOwned,
},
};
@ -73,4 +74,10 @@ impl RocksDBIO {
.get_opt::<ZoneSdkIndexerCursorCellOwned>(())?
.map(|cell| cell.0))
}
pub fn get_chain_breaker_bytes(&self) -> DbResult<Option<Vec<u8>>> {
Ok(self
.get_opt::<ChainBreakerCellOwned>(())?
.map(|cell| cell.0))
}
}

View File

@ -3,7 +3,7 @@ use crate::{
DBIO as _,
cells::shared_cells::{FirstBlockSetCell, LastBlockCell},
indexer::indexer_cells::{
BreakpointCellRef, LastBreakpointIdCell, LastObservedL1LibHeaderCell,
BreakpointCellRef, ChainBreakerCellRef, LastBreakpointIdCell, LastObservedL1LibHeaderCell,
ZoneSdkIndexerCursorCellRef,
},
};
@ -35,6 +35,10 @@ impl RocksDBIO {
self.put(&ZoneSdkIndexerCursorCellRef(bytes), ())
}
pub fn put_chain_breaker_bytes(&self, bytes: &[u8]) -> DbResult<()> {
self.put(&ChainBreakerCellRef(bytes), ())
}
// State
pub fn put_breakpoint(&self, br_id: u64, breakpoint: &V03State) -> DbResult<()> {