indexer stream

This commit is contained in:
Petar Radovic 2026-04-29 11:06:05 +02:00
parent 6bcbe058ed
commit 81a9ec401c

View File

@ -5,7 +5,7 @@ use common::block::{Block, HashableBlockData};
// ToDo: Remove after testnet
use common::{HashType, PINATA_BASE58};
use futures::StreamExt as _;
use log::{error, info, warn};
use log::{error, info};
use logos_blockchain_core::header::HeaderId;
use logos_blockchain_zone_sdk::{
CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer,
@ -15,6 +15,10 @@ use testnet_initial_state::initial_state_testnet;
use crate::{block_store::IndexerStore, config::IndexerConfig};
// TODO: persist & restore cursor (e.g. in rocksdb) so restarts don't have to
// re-process the channel from the beginning. Mirrors the sequencer checkpoint
// TODO in `block_publisher.rs`.
pub mod block_store;
pub mod config;
@ -105,47 +109,59 @@ impl IndexerCore {
})
}
pub fn subscribe_parse_block_stream(&self) -> impl futures::Stream<Item = Result<Block>> {
pub fn subscribe_parse_block_stream(&self) -> impl futures::Stream<Item = Result<Block>> + '_ {
let poll_interval = self.config.consensus_info_polling_interval;
async_stream::stream! {
info!("Starting zone-sdk indexer using follow()");
// In-memory only; not persisted across restarts (see top-of-file TODO).
let mut cursor = None;
let follow_stream = match self.zone_indexer.follow().await {
Ok(s) => s,
Err(e) => {
error!("Failed to start zone-sdk follow stream: {e}");
return;
}
};
info!("Starting indexer from beginning of channel");
let mut follow_stream = std::pin::pin!(follow_stream);
while let Some(zone_message) = follow_stream.next().await {
let zone_block = match zone_message {
ZoneMessage::Block(b) => b,
ZoneMessage::Deposit(_) | ZoneMessage::Withdraw(_) => continue,
};
let block: Block = match borsh::from_slice(&zone_block.data) {
Ok(b) => b,
Err(e) => {
error!("Failed to deserialize L2 block from zone-sdk: {e}");
loop {
let stream = match self.zone_indexer.next_messages(cursor).await {
Ok(s) => s,
Err(err) => {
error!("Failed to start zone-sdk next_messages stream: {err}");
tokio::time::sleep(poll_interval).await;
continue;
}
};
let mut stream = std::pin::pin!(stream);
info!("Indexed L2 block {}", block.header.block_id);
while let Some((msg, slot)) = stream.next().await {
let zone_block = match msg {
ZoneMessage::Block(b) => b,
// Non-block messages don't carry a cursor position; the
// next ZoneBlock advances past them implicitly.
ZoneMessage::Deposit(_) | ZoneMessage::Withdraw(_) => continue,
};
// TODO: Remove l1_header placeholder once storage layer
// no longer requires it. Zone-sdk handles L1 tracking internally.
let placeholder_l1_header = HeaderId::from([0_u8; 32]);
let block: Block = match borsh::from_slice(&zone_block.data) {
Ok(b) => b,
Err(e) => {
error!("Failed to deserialize L2 block from zone-sdk: {e}");
cursor = Some((zone_block.id, slot));
continue;
}
};
if let Err(err) = self.store.put_block(block.clone(), placeholder_l1_header).await {
error!("Failed to store block {}: {err:#}", block.header.block_id);
info!("Indexed L2 block {}", block.header.block_id);
// TODO: Remove l1_header placeholder once storage layer
// no longer requires it. Zone-sdk handles L1 tracking internally.
let placeholder_l1_header = HeaderId::from([0_u8; 32]);
if let Err(err) = self.store.put_block(block.clone(), placeholder_l1_header).await {
error!("Failed to store block {}: {err:#}", block.header.block_id);
}
cursor = Some((zone_block.id, slot));
yield Ok(block);
}
yield Ok(block);
// Stream ended (caught up to LIB). Sleep then poll again.
tokio::time::sleep(poll_interval).await;
}
warn!("zone-sdk follow stream ended");
}
}
}