2026-03-09 12:31:49 +00:00
|
|
|
use std::sync::Arc;
|
2026-02-12 14:27:36 +02:00
|
|
|
|
2026-01-27 09:46:31 +02:00
|
|
|
use anyhow::Result;
|
2026-04-10 20:23:25 +03:00
|
|
|
use common::block::Block;
|
2026-02-03 11:36:07 +02:00
|
|
|
// ToDo: Remove after testnet
|
2026-03-09 12:31:49 +00:00
|
|
|
use futures::StreamExt as _;
|
2026-04-29 14:05:23 +02:00
|
|
|
use log::{error, info, warn};
|
2026-03-09 12:31:49 +00:00
|
|
|
use logos_blockchain_core::header::HeaderId;
|
2026-04-29 10:33:07 +02:00
|
|
|
use logos_blockchain_zone_sdk::{
|
|
|
|
|
CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer,
|
2026-01-12 15:51:24 +02:00
|
|
|
};
|
2026-01-09 15:10:38 +02:00
|
|
|
|
2026-02-03 11:36:07 +02:00
|
|
|
use crate::{block_store::IndexerStore, config::IndexerConfig};
|
2026-01-12 15:51:24 +02:00
|
|
|
|
2026-02-03 11:36:07 +02:00
|
|
|
pub mod block_store;
|
2026-01-12 15:51:24 +02:00
|
|
|
pub mod config;
|
|
|
|
|
|
2026-01-30 21:37:27 +03:00
|
|
|
#[derive(Clone)]
|
2026-01-09 15:10:38 +02:00
|
|
|
pub struct IndexerCore {
|
2026-04-29 10:33:07 +02:00
|
|
|
pub zone_indexer: Arc<ZoneIndexer<NodeHttpClient>>,
|
2026-02-03 11:36:07 +02:00
|
|
|
pub config: IndexerConfig,
|
|
|
|
|
pub store: IndexerStore,
|
2026-01-09 15:10:38 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl IndexerCore {
|
2026-01-29 22:20:42 +03:00
|
|
|
pub fn new(config: IndexerConfig) -> Result<Self> {
|
2026-02-13 13:59:06 +02:00
|
|
|
let home = config.home.join("rocksdb");
|
2026-02-03 11:36:07 +02:00
|
|
|
|
2026-04-29 14:05:23 +02:00
|
|
|
let basic_auth = config.bedrock_config.auth.clone().map(Into::into);
|
2026-04-29 10:33:07 +02:00
|
|
|
let node = NodeHttpClient::new(
|
|
|
|
|
CommonHttpClient::new(basic_auth),
|
2026-04-29 14:05:23 +02:00
|
|
|
config.bedrock_config.addr.clone(),
|
2026-03-09 12:31:49 +00:00
|
|
|
);
|
2026-04-29 10:33:07 +02:00
|
|
|
let zone_indexer = ZoneIndexer::new(config.channel_id, node);
|
2026-03-09 12:31:49 +00:00
|
|
|
|
2026-01-12 15:51:24 +02:00
|
|
|
Ok(Self {
|
2026-03-09 12:31:49 +00:00
|
|
|
zone_indexer: Arc::new(zone_indexer),
|
2026-01-12 15:51:24 +02:00
|
|
|
config,
|
2026-04-10 20:23:25 +03:00
|
|
|
store: IndexerStore::open_db(&home)?,
|
2026-01-12 15:51:24 +02:00
|
|
|
})
|
2026-01-09 15:10:38 +02:00
|
|
|
}
|
2026-01-12 15:51:24 +02:00
|
|
|
|
2026-04-29 11:06:05 +02:00
|
|
|
pub fn subscribe_parse_block_stream(&self) -> impl futures::Stream<Item = Result<Block>> + '_ {
|
|
|
|
|
let poll_interval = self.config.consensus_info_polling_interval;
|
2026-04-29 14:05:23 +02:00
|
|
|
let initial_cursor = self
|
|
|
|
|
.store
|
|
|
|
|
.get_zone_cursor()
|
|
|
|
|
.expect("Failed to load zone-sdk indexer cursor");
|
2026-01-30 21:37:27 +03:00
|
|
|
|
2026-04-29 11:06:05 +02:00
|
|
|
async_stream::stream! {
|
2026-04-29 14:05:23 +02:00
|
|
|
let mut cursor = initial_cursor;
|
2026-02-12 14:27:36 +02:00
|
|
|
|
2026-04-29 14:05:23 +02:00
|
|
|
if cursor.is_some() {
|
|
|
|
|
info!("Resuming indexer from cursor {cursor:?}");
|
|
|
|
|
} else {
|
|
|
|
|
info!("Starting indexer from beginning of channel");
|
|
|
|
|
}
|
2026-02-12 14:27:36 +02:00
|
|
|
|
2026-04-29 11:06:05 +02:00
|
|
|
loop {
|
|
|
|
|
let stream = match self.zone_indexer.next_messages(cursor).await {
|
|
|
|
|
Ok(s) => s,
|
|
|
|
|
Err(err) => {
|
|
|
|
|
error!("Failed to start zone-sdk next_messages stream: {err}");
|
|
|
|
|
tokio::time::sleep(poll_interval).await;
|
2026-03-09 12:31:49 +00:00
|
|
|
continue;
|
2026-01-16 16:15:21 +02:00
|
|
|
}
|
2026-03-03 23:21:08 +03:00
|
|
|
};
|
2026-04-29 11:06:05 +02:00
|
|
|
let mut stream = std::pin::pin!(stream);
|
|
|
|
|
|
|
|
|
|
while let Some((msg, slot)) = stream.next().await {
|
|
|
|
|
let zone_block = match msg {
|
|
|
|
|
ZoneMessage::Block(b) => b,
|
|
|
|
|
// Non-block messages don't carry a cursor position; the
|
|
|
|
|
// next ZoneBlock advances past them implicitly.
|
|
|
|
|
ZoneMessage::Deposit(_) | ZoneMessage::Withdraw(_) => continue,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let block: Block = match borsh::from_slice(&zone_block.data) {
|
|
|
|
|
Ok(b) => b,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
error!("Failed to deserialize L2 block from zone-sdk: {e}");
|
2026-04-29 14:05:23 +02:00
|
|
|
// Advance past the broken inscription so we don't
|
|
|
|
|
// re-process it on restart.
|
2026-04-29 11:06:05 +02:00
|
|
|
cursor = Some((zone_block.id, slot));
|
2026-04-29 14:05:23 +02:00
|
|
|
if let Err(err) = self.store.set_zone_cursor(&(zone_block.id, slot)) {
|
|
|
|
|
warn!("Failed to persist indexer cursor: {err:#}");
|
|
|
|
|
}
|
2026-04-29 11:06:05 +02:00
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
info!("Indexed L2 block {}", block.header.block_id);
|
|
|
|
|
|
|
|
|
|
// TODO: Remove l1_header placeholder once storage layer
|
|
|
|
|
// no longer requires it. Zone-sdk handles L1 tracking internally.
|
|
|
|
|
let placeholder_l1_header = HeaderId::from([0_u8; 32]);
|
|
|
|
|
if let Err(err) = self.store.put_block(block.clone(), placeholder_l1_header).await {
|
|
|
|
|
error!("Failed to store block {}: {err:#}", block.header.block_id);
|
|
|
|
|
}
|
2026-02-16 17:54:54 +02:00
|
|
|
|
2026-04-29 11:06:05 +02:00
|
|
|
cursor = Some((zone_block.id, slot));
|
2026-04-29 14:05:23 +02:00
|
|
|
if let Err(err) = self.store.set_zone_cursor(&(zone_block.id, slot)) {
|
|
|
|
|
warn!("Failed to persist indexer cursor: {err:#}");
|
|
|
|
|
}
|
2026-04-29 11:06:05 +02:00
|
|
|
yield Ok(block);
|
2026-02-24 12:27:47 +02:00
|
|
|
}
|
2026-02-16 17:54:54 +02:00
|
|
|
|
2026-04-29 11:06:05 +02:00
|
|
|
// Stream ended (caught up to LIB). Sleep then poll again.
|
|
|
|
|
tokio::time::sleep(poll_interval).await;
|
2026-02-16 17:54:54 +02:00
|
|
|
}
|
2026-01-16 16:15:21 +02:00
|
|
|
}
|
2026-01-12 15:51:24 +02:00
|
|
|
}
|
2026-02-04 14:57:38 +02:00
|
|
|
}
|