From 81a9ec401c687bbfb6311603348dc3fb6d32124b Mon Sep 17 00:00:00 2001 From: Petar Radovic Date: Wed, 29 Apr 2026 11:06:05 +0200 Subject: [PATCH] indexer stream --- indexer/core/src/lib.rs | 76 +++++++++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 30 deletions(-) diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index 96f4b673..936343a5 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -5,7 +5,7 @@ use common::block::{Block, HashableBlockData}; // ToDo: Remove after testnet use common::{HashType, PINATA_BASE58}; use futures::StreamExt as _; -use log::{error, info, warn}; +use log::{error, info}; use logos_blockchain_core::header::HeaderId; use logos_blockchain_zone_sdk::{ CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer, @@ -15,6 +15,10 @@ use testnet_initial_state::initial_state_testnet; use crate::{block_store::IndexerStore, config::IndexerConfig}; +// TODO: persist & restore cursor (e.g. in rocksdb) so restarts don't have to +// re-process the channel from the beginning. Mirrors the sequencer checkpoint +// TODO in `block_publisher.rs`. + pub mod block_store; pub mod config; @@ -105,47 +109,59 @@ impl IndexerCore { }) } - pub fn subscribe_parse_block_stream(&self) -> impl futures::Stream> { + pub fn subscribe_parse_block_stream(&self) -> impl futures::Stream> + '_ { + let poll_interval = self.config.consensus_info_polling_interval; + async_stream::stream! { - info!("Starting zone-sdk indexer using follow()"); + // In-memory only; not persisted across restarts (see top-of-file TODO). + let mut cursor = None; - let follow_stream = match self.zone_indexer.follow().await { - Ok(s) => s, - Err(e) => { - error!("Failed to start zone-sdk follow stream: {e}"); - return; - } - }; + info!("Starting indexer from beginning of channel"); - let mut follow_stream = std::pin::pin!(follow_stream); - - while let Some(zone_message) = follow_stream.next().await { - let zone_block = match zone_message { - ZoneMessage::Block(b) => b, - ZoneMessage::Deposit(_) | ZoneMessage::Withdraw(_) => continue, - }; - let block: Block = match borsh::from_slice(&zone_block.data) { - Ok(b) => b, - Err(e) => { - error!("Failed to deserialize L2 block from zone-sdk: {e}"); + loop { + let stream = match self.zone_indexer.next_messages(cursor).await { + Ok(s) => s, + Err(err) => { + error!("Failed to start zone-sdk next_messages stream: {err}"); + tokio::time::sleep(poll_interval).await; continue; } }; + let mut stream = std::pin::pin!(stream); - info!("Indexed L2 block {}", block.header.block_id); + while let Some((msg, slot)) = stream.next().await { + let zone_block = match msg { + ZoneMessage::Block(b) => b, + // Non-block messages don't carry a cursor position; the + // next ZoneBlock advances past them implicitly. + ZoneMessage::Deposit(_) | ZoneMessage::Withdraw(_) => continue, + }; - // TODO: Remove l1_header placeholder once storage layer - // no longer requires it. Zone-sdk handles L1 tracking internally. - let placeholder_l1_header = HeaderId::from([0_u8; 32]); + let block: Block = match borsh::from_slice(&zone_block.data) { + Ok(b) => b, + Err(e) => { + error!("Failed to deserialize L2 block from zone-sdk: {e}"); + cursor = Some((zone_block.id, slot)); + continue; + } + }; - if let Err(err) = self.store.put_block(block.clone(), placeholder_l1_header).await { - error!("Failed to store block {}: {err:#}", block.header.block_id); + info!("Indexed L2 block {}", block.header.block_id); + + // TODO: Remove l1_header placeholder once storage layer + // no longer requires it. Zone-sdk handles L1 tracking internally. + let placeholder_l1_header = HeaderId::from([0_u8; 32]); + if let Err(err) = self.store.put_block(block.clone(), placeholder_l1_header).await { + error!("Failed to store block {}: {err:#}", block.header.block_id); + } + + cursor = Some((zone_block.id, slot)); + yield Ok(block); } - yield Ok(block); + // Stream ended (caught up to LIB). Sleep then poll again. + tokio::time::sleep(poll_interval).await; } - - warn!("zone-sdk follow stream ended"); } } }