checkpoints

This commit is contained in:
Petar Radovic 2026-04-29 14:05:23 +02:00
parent 7aecc4faba
commit 798b89839c
17 changed files with 225 additions and 50 deletions

View File

@ -64,9 +64,7 @@ clean:
@echo "🧹 Cleaning run artifacts"
rm -rf sequencer/service/bedrock_signing_key
rm -rf sequencer/service/rocksdb
rm -rf sequencer/service/zone_sdk_checkpoint.json
rm -rf indexer/service/rocksdb
rm -rf indexer/service/zone_sdk_indexer_cursor.json
rm -rf wallet/configs/debug/storage.json
rm -rf rocksdb
cd bedrock && docker compose down -v

View File

@ -1,12 +1,8 @@
{
"home": "./indexer/service",
"consensus_info_polling_interval": "1s",
"bedrock_client_config": {
"addr": "http://logos-blockchain-node-0:18080",
"backoff": {
"start_delay": "100ms",
"max_retries": 5
}
"bedrock_config": {
"addr": "http://logos-blockchain-node-0:18080"
},
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101",
"initial_accounts": [

View File

@ -1,11 +1,12 @@
use std::{path::Path, sync::Arc};
use anyhow::Result;
use anyhow::{Context as _, Result};
use common::{
block::{BedrockStatus, Block},
transaction::{NSSATransaction, clock_invocation},
};
use logos_blockchain_core::header::HeaderId;
use logos_blockchain_core::{header::HeaderId, mantle::ops::channel::MsgId};
use logos_blockchain_zone_sdk::Slot;
use nssa::{Account, AccountId, V03State};
use nssa_core::BlockId;
use storage::indexer::RocksDBIO;
@ -103,6 +104,22 @@ impl IndexerStore {
Ok(self.dbio.calculate_state_for_id(block_id)?)
}
pub fn get_zone_cursor(&self) -> Result<Option<(MsgId, Slot)>> {
let Some(bytes) = self.dbio.get_zone_sdk_indexer_cursor_bytes()? else {
return Ok(None);
};
let cursor: (MsgId, Slot) = serde_json::from_slice(&bytes)
.context("Failed to deserialize stored zone-sdk indexer cursor")?;
Ok(Some(cursor))
}
pub fn set_zone_cursor(&self, cursor: &(MsgId, Slot)) -> Result<()> {
let bytes =
serde_json::to_vec(cursor).context("Failed to serialize zone-sdk indexer cursor")?;
self.dbio.put_zone_sdk_indexer_cursor_bytes(&bytes)?;
Ok(())
}
/// Recalculation of final state directly from DB.
///
/// Used for indexer healthcheck.

View File

@ -28,7 +28,7 @@ pub struct IndexerConfig {
pub signing_key: [u8; 32],
#[serde(with = "humantime_serde")]
pub consensus_info_polling_interval: Duration,
pub bedrock_client_config: ClientConfig,
pub bedrock_config: ClientConfig,
pub channel_id: ChannelId,
#[serde(skip_serializing_if = "Option::is_none")]
pub initial_public_accounts: Option<Vec<PublicAccountPublicInitialData>>,

View File

@ -5,7 +5,7 @@ use common::block::{Block, HashableBlockData};
// ToDo: Remove after testnet
use common::{HashType, PINATA_BASE58};
use futures::StreamExt as _;
use log::{error, info};
use log::{error, info, warn};
use logos_blockchain_core::header::HeaderId;
use logos_blockchain_zone_sdk::{
CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer,
@ -15,10 +15,6 @@ use testnet_initial_state::initial_state_testnet;
use crate::{block_store::IndexerStore, config::IndexerConfig};
// TODO: persist & restore cursor (e.g. in rocksdb) so restarts don't have to
// re-process the channel from the beginning. Mirrors the sequencer checkpoint
// TODO in `block_publisher.rs`.
pub mod block_store;
pub mod config;
@ -95,10 +91,10 @@ impl IndexerCore {
let home = config.home.join("rocksdb");
let basic_auth = config.bedrock_client_config.auth.clone().map(Into::into);
let basic_auth = config.bedrock_config.auth.clone().map(Into::into);
let node = NodeHttpClient::new(
CommonHttpClient::new(basic_auth),
config.bedrock_client_config.addr.clone(),
config.bedrock_config.addr.clone(),
);
let zone_indexer = ZoneIndexer::new(config.channel_id, node);
@ -111,12 +107,19 @@ impl IndexerCore {
pub fn subscribe_parse_block_stream(&self) -> impl futures::Stream<Item = Result<Block>> + '_ {
let poll_interval = self.config.consensus_info_polling_interval;
let initial_cursor = self
.store
.get_zone_cursor()
.expect("Failed to load zone-sdk indexer cursor");
async_stream::stream! {
// In-memory only; not persisted across restarts (see top-of-file TODO).
let mut cursor = None;
let mut cursor = initial_cursor;
info!("Starting indexer from beginning of channel");
if cursor.is_some() {
info!("Resuming indexer from cursor {cursor:?}");
} else {
info!("Starting indexer from beginning of channel");
}
loop {
let stream = match self.zone_indexer.next_messages(cursor).await {
@ -141,7 +144,12 @@ impl IndexerCore {
Ok(b) => b,
Err(e) => {
error!("Failed to deserialize L2 block from zone-sdk: {e}");
// Advance past the broken inscription so we don't
// re-process it on restart.
cursor = Some((zone_block.id, slot));
if let Err(err) = self.store.set_zone_cursor(&(zone_block.id, slot)) {
warn!("Failed to persist indexer cursor: {err:#}");
}
continue;
}
};
@ -156,6 +164,9 @@ impl IndexerCore {
}
cursor = Some((zone_block.id, slot));
if let Err(err) = self.store.set_zone_cursor(&(zone_block.id, slot)) {
warn!("Failed to persist indexer cursor: {err:#}");
}
yield Ok(block);
}

View File

@ -1,12 +1,8 @@
{
"home": ".",
"consensus_info_polling_interval": "1s",
"bedrock_client_config": {
"addr": "http://localhost:8080",
"backoff": {
"start_delay": "100ms",
"max_retries": 5
}
"bedrock_config": {
"addr": "http://localhost:8080"
},
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101",
"initial_accounts": [

View File

@ -226,7 +226,7 @@ pub fn indexer_config(
Ok(IndexerConfig {
home,
consensus_info_polling_interval: Duration::from_secs(1),
bedrock_client_config: ClientConfig {
bedrock_config: ClientConfig {
addr: addr_to_url(UrlProtocol::Http, bedrock_addr)
.context("Failed to convert bedrock addr to URL")?,
auth: None,

View File

@ -4,21 +4,28 @@ use anyhow::{Context as _, Result, anyhow};
use common::block::Block;
pub use logos_blockchain_core::mantle::ops::channel::MsgId;
pub use logos_blockchain_key_management_system_service::keys::Ed25519Key;
pub use logos_blockchain_zone_sdk::sequencer::SequencerCheckpoint;
use logos_blockchain_zone_sdk::{
CommonHttpClient,
adapter::NodeHttpClient,
sequencer::{SequencerConfig as ZoneSdkSequencerConfig, SequencerHandle, ZoneSequencer},
sequencer::{Event, SequencerConfig as ZoneSdkSequencerConfig, SequencerHandle, ZoneSequencer},
};
use tokio::task::JoinHandle;
use crate::config::BedrockConfig;
/// Sink for `Event::Published` checkpoints emitted by the drive task.
/// Caller is responsible for persistence (e.g. writing to rocksdb).
pub type CheckpointSink = Box<dyn Fn(SequencerCheckpoint) + Send + Sync + 'static>;
#[expect(async_fn_in_trait, reason = "We don't care about Send/Sync here")]
pub trait BlockPublisherTrait: Clone {
async fn new(
config: &BedrockConfig,
bedrock_signing_key: Ed25519Key,
resubmit_interval: Duration,
initial_checkpoint: Option<SequencerCheckpoint>,
on_checkpoint: CheckpointSink,
) -> Result<Self>;
/// Fire-and-forget publish. Zone-sdk drives the actual submission and
@ -47,6 +54,8 @@ impl BlockPublisherTrait for ZoneSdkPublisher {
config: &BedrockConfig,
bedrock_signing_key: Ed25519Key,
resubmit_interval: Duration,
initial_checkpoint: Option<SequencerCheckpoint>,
on_checkpoint: CheckpointSink,
) -> Result<Self> {
let basic_auth = config.auth.clone().map(Into::into);
let node = NodeHttpClient::new(CommonHttpClient::new(basic_auth), config.node_url.clone());
@ -56,19 +65,19 @@ impl BlockPublisherTrait for ZoneSdkPublisher {
..ZoneSdkSequencerConfig::default()
};
// TODO: persist & restore SequencerCheckpoint via Event::Published listener
// for crash recovery. Always-fresh-start for now.
let (mut sequencer, mut handle) = ZoneSequencer::init_with_config(
config.channel_id,
bedrock_signing_key,
node,
zone_sdk_config,
None,
initial_checkpoint,
);
let drive_task = tokio::spawn(async move {
loop {
sequencer.next_event().await;
if let Some(Event::Published { checkpoint, .. }) = sequencer.next_event().await {
on_checkpoint(checkpoint);
}
}
});

View File

@ -1,16 +1,17 @@
use std::{collections::HashMap, path::Path};
use std::{collections::HashMap, path::Path, sync::Arc};
use anyhow::Result;
use anyhow::{Context as _, Result};
use common::{
HashType,
block::{Block, BlockMeta, MantleMsgId},
transaction::NSSATransaction,
};
use logos_blockchain_zone_sdk::sequencer::SequencerCheckpoint;
use nssa::V03State;
use storage::{error::DbError, sequencer::RocksDBIO};
pub struct SequencerStore {
dbio: RocksDBIO,
dbio: Arc<RocksDBIO>,
// TODO: Consider adding the hashmap to the database for faster recovery.
tx_hash_to_block_map: HashMap<HashType, u64>,
genesis_id: u64,
@ -30,7 +31,11 @@ impl SequencerStore {
) -> Result<Self> {
let tx_hash_to_block_map = block_to_transactions_map(genesis_block);
let dbio = RocksDBIO::open_or_create(location, genesis_block, genesis_msg_id)?;
let dbio = Arc::new(RocksDBIO::open_or_create(
location,
genesis_block,
genesis_msg_id,
)?);
let genesis_id = dbio.get_meta_first_block_in_db()?;
@ -42,6 +47,14 @@ impl SequencerStore {
})
}
/// Shared handle to the underlying rocksdb. Used to persist the zone-sdk
/// checkpoint from the sequencer's drive task without needing &mut to the
/// store.
#[must_use]
pub fn dbio(&self) -> Arc<RocksDBIO> {
Arc::clone(&self.dbio)
}
pub fn get_block_at_id(&self, id: u64) -> Result<Option<Block>, DbError> {
self.dbio.get_block(id)
}
@ -103,6 +116,22 @@ impl SequencerStore {
pub fn get_nssa_state(&self) -> Option<V03State> {
self.dbio.get_nssa_state().ok()
}
pub fn get_zone_checkpoint(&self) -> Result<Option<SequencerCheckpoint>> {
let Some(bytes) = self.dbio.get_zone_sdk_checkpoint_bytes()? else {
return Ok(None);
};
let checkpoint: SequencerCheckpoint = serde_json::from_slice(&bytes)
.context("Failed to deserialize stored zone-sdk checkpoint")?;
Ok(Some(checkpoint))
}
pub fn set_zone_checkpoint(&self, checkpoint: &SequencerCheckpoint) -> Result<()> {
let bytes =
serde_json::to_vec(checkpoint).context("Failed to serialize zone-sdk checkpoint")?;
self.dbio.put_zone_sdk_checkpoint_bytes(&bytes)?;
Ok(())
}
}
pub(crate) fn block_to_transactions_map(block: &Block) -> HashMap<HashType, u64> {

View File

@ -90,22 +90,40 @@ impl<BP: BlockPublisherTrait, IC: IndexerClientTrait> SequencerCore<BP, IC> {
.latest_block_meta()
.expect("Failed to read latest block meta from store");
let initial_checkpoint = store
.get_zone_checkpoint()
.expect("Failed to load zone-sdk checkpoint");
let is_fresh_start = initial_checkpoint.is_none();
let dbio_for_checkpoint = store.dbio();
let on_checkpoint: block_publisher::CheckpointSink = Box::new(move |cp| {
let bytes = match serde_json::to_vec(&cp) {
Ok(b) => b,
Err(err) => {
error!("Failed to serialize zone-sdk checkpoint: {err:#}");
return;
}
};
if let Err(err) = dbio_for_checkpoint.put_zone_sdk_checkpoint_bytes(&bytes) {
error!("Failed to persist zone-sdk checkpoint: {err:#}");
}
});
let block_publisher = BP::new(
&config.bedrock_config,
bedrock_signing_key,
config.retry_pending_blocks_timeout,
initial_checkpoint,
on_checkpoint,
)
.await
.expect("Failed to initialize Block Publisher");
// Publish the genesis block on every startup so the indexer can find the
// channel start. Zone-sdk dedups by msg_id, so re-publishing on restart
// is a no-op once it's already on-chain.
// TODO: persist & restore SequencerCheckpoint so restarts don't have to
// republish anything.
if latest_block_meta.id == config.genesis_id
&& let Err(err) = block_publisher.publish_block(&genesis_block).await
{
// On a truly fresh start (no checkpoint persisted yet), publish the
// genesis block so the indexer can find the channel start. After the
// first publish, zone-sdk's checkpoint persistence covers further
// restarts.
if is_fresh_start && let Err(err) = block_publisher.publish_block(&genesis_block).await {
error!("Failed to publish genesis block: {err:#}");
}

View File

@ -6,7 +6,9 @@ use logos_blockchain_key_management_system_service::keys::Ed25519Key;
use url::Url;
use crate::{
block_publisher::BlockPublisherTrait, config::BedrockConfig, indexer_client::IndexerClientTrait,
block_publisher::{BlockPublisherTrait, CheckpointSink, SequencerCheckpoint},
config::BedrockConfig,
indexer_client::IndexerClientTrait,
};
pub type SequencerCoreWithMockClients = crate::SequencerCore<MockBlockPublisher, MockIndexerClient>;
@ -19,6 +21,8 @@ impl BlockPublisherTrait for MockBlockPublisher {
_config: &BedrockConfig,
_bedrock_signing_key: Ed25519Key,
_resubmit_interval: Duration,
_initial_checkpoint: Option<SequencerCheckpoint>,
_on_checkpoint: CheckpointSink,
) -> Result<Self> {
Ok(Self)
}

View File

@ -8,7 +8,8 @@ use crate::{
indexer::{
ACC_NUM_CELL_NAME, BLOCK_HASH_CELL_NAME, BREAKPOINT_CELL_NAME, CF_ACC_META,
CF_BREAKPOINT_NAME, CF_HASH_TO_ID, CF_TX_TO_ID, DB_META_LAST_BREAKPOINT_ID,
DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY, TX_HASH_CELL_NAME,
DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY, DB_META_ZONE_SDK_INDEXER_CURSOR_KEY,
TX_HASH_CELL_NAME,
},
};
@ -211,6 +212,41 @@ impl SimpleWritableCell for AccNumTxCell {
}
}
/// Opaque bytes for the zone-sdk indexer cursor `Option<(MsgId, Slot)>`.
/// The caller serializes via serde_json (neither type derives borsh).
#[derive(BorshDeserialize)]
pub struct ZoneSdkIndexerCursorCellOwned(pub Vec<u8>);
impl SimpleStorableCell for ZoneSdkIndexerCursorCellOwned {
type KeyParams = ();
const CELL_NAME: &'static str = DB_META_ZONE_SDK_INDEXER_CURSOR_KEY;
const CF_NAME: &'static str = CF_META_NAME;
}
impl SimpleReadableCell for ZoneSdkIndexerCursorCellOwned {}
#[derive(BorshSerialize)]
pub struct ZoneSdkIndexerCursorCellRef<'bytes>(pub &'bytes [u8]);
impl SimpleStorableCell for ZoneSdkIndexerCursorCellRef<'_> {
type KeyParams = ();
const CELL_NAME: &'static str = DB_META_ZONE_SDK_INDEXER_CURSOR_KEY;
const CF_NAME: &'static str = CF_META_NAME;
}
impl SimpleWritableCell for ZoneSdkIndexerCursorCellRef<'_> {
fn value_constructor(&self) -> DbResult<Vec<u8>> {
borsh::to_vec(&self).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize zone-sdk indexer cursor cell".to_owned()),
)
})
}
}
#[cfg(test)]
mod uniform_tests {
use crate::{

View File

@ -22,6 +22,8 @@ pub const DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY: &str =
"last_observed_l1_lib_header_in_db";
/// Key base for storing metainformation about the last breakpoint.
pub const DB_META_LAST_BREAKPOINT_ID: &str = "last_breakpoint_id";
/// Key base for storing the zone-sdk indexer cursor (opaque bytes).
pub const DB_META_ZONE_SDK_INDEXER_CURSOR_KEY: &str = "zone_sdk_indexer_cursor";
/// Cell name for a breakpoint.
pub const BREAKPOINT_CELL_NAME: &str = "breakpoint";

View File

@ -4,7 +4,7 @@ use crate::{
cells::shared_cells::{BlockCell, FirstBlockCell, FirstBlockSetCell, LastBlockCell},
indexer::indexer_cells::{
AccNumTxCell, BlockHashToBlockIdMapCell, BreakpointCellOwned, LastBreakpointIdCell,
LastObservedL1LibHeaderCell, TxHashToBlockIdMapCell,
LastObservedL1LibHeaderCell, TxHashToBlockIdMapCell, ZoneSdkIndexerCursorCellOwned,
},
};
@ -64,4 +64,10 @@ impl RocksDBIO {
self.get_opt::<AccNumTxCell>(acc_id)
.map(|opt| opt.map(|cell| cell.0))
}
pub fn get_zone_sdk_indexer_cursor_bytes(&self) -> DbResult<Option<Vec<u8>>> {
Ok(self
.get_opt::<ZoneSdkIndexerCursorCellOwned>(())?
.map(|cell| cell.0))
}
}

View File

@ -4,6 +4,7 @@ use crate::{
cells::shared_cells::{FirstBlockSetCell, LastBlockCell},
indexer::indexer_cells::{
BreakpointCellRef, LastBreakpointIdCell, LastObservedL1LibHeaderCell,
ZoneSdkIndexerCursorCellRef,
},
};
@ -30,6 +31,10 @@ impl RocksDBIO {
self.put(&FirstBlockSetCell(true), ())
}
pub fn put_zone_sdk_indexer_cursor_bytes(&self, bytes: &[u8]) -> DbResult<()> {
self.put(&ZoneSdkIndexerCursorCellRef(bytes), ())
}
// State
pub fn put_breakpoint(&self, br_id: u64, breakpoint: &V03State) -> DbResult<()> {

View File

@ -12,7 +12,7 @@ use crate::{
error::DbError,
sequencer::sequencer_cells::{
LastFinalizedBlockIdCell, LatestBlockMetaCellOwned, LatestBlockMetaCellRef,
NSSAStateCellOwned, NSSAStateCellRef,
NSSAStateCellOwned, NSSAStateCellRef, ZoneSdkCheckpointCellOwned, ZoneSdkCheckpointCellRef,
},
};
@ -22,6 +22,8 @@ pub mod sequencer_cells;
pub const DB_META_LAST_FINALIZED_BLOCK_ID: &str = "last_finalized_block_id";
/// Key base for storing metainformation about the latest block meta.
pub const DB_META_LATEST_BLOCK_META_KEY: &str = "latest_block_meta";
/// Key base for storing the zone-sdk sequencer checkpoint (opaque bytes).
pub const DB_META_ZONE_SDK_CHECKPOINT_KEY: &str = "zone_sdk_checkpoint";
/// Key base for storing the NSSA state.
pub const DB_NSSA_STATE_KEY: &str = "nssa_state";
@ -205,6 +207,16 @@ impl RocksDBIO {
self.get::<LatestBlockMetaCellOwned>(()).map(|val| val.0)
}
pub fn get_zone_sdk_checkpoint_bytes(&self) -> DbResult<Option<Vec<u8>>> {
Ok(self
.get_opt::<ZoneSdkCheckpointCellOwned>(())?
.map(|cell| cell.0))
}
pub fn put_zone_sdk_checkpoint_bytes(&self, bytes: &[u8]) -> DbResult<()> {
self.put(&ZoneSdkCheckpointCellRef(bytes), ())
}
pub fn put_block(
&self,
block: &Block,

View File

@ -8,7 +8,7 @@ use crate::{
error::DbError,
sequencer::{
CF_NSSA_STATE_NAME, DB_META_LAST_FINALIZED_BLOCK_ID, DB_META_LATEST_BLOCK_META_KEY,
DB_NSSA_STATE_KEY,
DB_META_ZONE_SDK_CHECKPOINT_KEY, DB_NSSA_STATE_KEY,
},
};
@ -95,6 +95,42 @@ impl SimpleWritableCell for LatestBlockMetaCellRef<'_> {
}
}
/// Opaque bytes for the zone-sdk sequencer checkpoint. The caller is
/// responsible for the actual encoding (we use serde_json since
/// `SequencerCheckpoint` only derives serde, not borsh).
#[derive(BorshDeserialize)]
pub struct ZoneSdkCheckpointCellOwned(pub Vec<u8>);
impl SimpleStorableCell for ZoneSdkCheckpointCellOwned {
type KeyParams = ();
const CELL_NAME: &'static str = DB_META_ZONE_SDK_CHECKPOINT_KEY;
const CF_NAME: &'static str = CF_META_NAME;
}
impl SimpleReadableCell for ZoneSdkCheckpointCellOwned {}
#[derive(BorshSerialize)]
pub struct ZoneSdkCheckpointCellRef<'bytes>(pub &'bytes [u8]);
impl SimpleStorableCell for ZoneSdkCheckpointCellRef<'_> {
type KeyParams = ();
const CELL_NAME: &'static str = DB_META_ZONE_SDK_CHECKPOINT_KEY;
const CF_NAME: &'static str = CF_META_NAME;
}
impl SimpleWritableCell for ZoneSdkCheckpointCellRef<'_> {
fn value_constructor(&self) -> DbResult<Vec<u8>> {
borsh::to_vec(&self).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize zone-sdk checkpoint cell".to_owned()),
)
})
}
}
#[cfg(test)]
mod uniform_tests {
use crate::{