From b6d7f1ecbd315f4049604d6325385f1b3b15a889 Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Thu, 12 Feb 2026 14:27:36 +0200 Subject: [PATCH] feat: updated block streaming --- indexer/core/src/block_store.rs | 12 +++- indexer/core/src/lib.rs | 116 ++++++++++++++++++++++++-------- storage/src/indexer.rs | 89 +++++++++++++++++++----- 3 files changed, 171 insertions(+), 46 deletions(-) diff --git a/indexer/core/src/block_store.rs b/indexer/core/src/block_store.rs index b5de3896..f4b720c6 100644 --- a/indexer/core/src/block_store.rs +++ b/indexer/core/src/block_store.rs @@ -1,6 +1,7 @@ use std::{path::Path, sync::Arc}; use anyhow::Result; +use bedrock_client::HeaderId; use common::{ block::Block, transaction::{NSSATransaction, execute_check_transaction_on_state, transaction_pre_check}, @@ -34,6 +35,13 @@ impl IndexerStore { Self::open_db_with_genesis(location, None) } + pub fn last_observed_l1_header(&self) -> Result> { + Ok(self + .dbio + .get_meta_last_observed_l1_block_in_db()? + .map(|raw_id| HeaderId::from(raw_id))) + } + pub fn get_last_block_id(&self) -> Result { Ok(self.dbio.get_meta_last_block_in_db()?) } @@ -95,7 +103,7 @@ impl IndexerStore { Ok(self.final_state()?.get_account_by_id(*account_id)) } - pub fn put_block(&self, block: Block) -> Result<()> { + pub fn put_block(&self, block: Block, l1_header: HeaderId) -> Result<()> { let mut final_state = self.dbio.final_state()?; for transaction in &block.body.transactions { @@ -105,6 +113,6 @@ impl IndexerStore { )?; } - Ok(self.dbio.put_block(block)?) + Ok(self.dbio.put_block(block, l1_header.into())?) } } diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index c259406f..294ad905 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -1,3 +1,5 @@ +use std::collections::VecDeque; + use anyhow::Result; use bedrock_client::{BedrockClient, HeaderId}; use common::block::{Block, HashableBlockData}; @@ -78,22 +80,51 @@ impl IndexerCore { pub async fn subscribe_parse_block_stream(&self) -> impl futures::Stream> { async_stream::stream! { - loop { - let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?); + let last_l1_header = self.store.last_observed_l1_header()?; - info!("Block stream joined"); + let mut last_fin_header = match last_l1_header { + Some(last_l1_header) => { + last_l1_header + }, + None => { + info!("Searching for the start of a channel"); - while let Some(block_info) = stream_pinned.next().await { - let header_id = block_info.header_id; + let start_buff = self.search_for_channel_start().await?; - info!("Observed L1 block at height {}", block_info.height); + let last_l1_header = start_buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty"))?.header().id(); - if let Some(l1_block) = self - .bedrock_client - .get_block_by_id(header_id) - .await? - { - info!("Extracted L1 block at height {}", block_info.height); + for l1_block in start_buff { + info!("Observed L1 block at height {}", l1_block.header().slot().into_inner()); + + let curr_l1_header = l1_block.header().id(); + + let l2_blocks_parsed = parse_blocks( + l1_block.into_transactions().into_iter(), + &self.config.channel_id, + ).collect::>(); + + info!("Parsed {} L2 blocks", l2_blocks_parsed.len()); + + for l2_block in l2_blocks_parsed { + self.store.put_block(l2_block.clone(), curr_l1_header)?; + + yield Ok(l2_block); + } + } + + last_l1_header + }, + }; + + loop { + let buff = self.rollback_to_last_known_finalized_l1_id(last_fin_header).await?; + + last_fin_header = buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty"))?.header().id(); + + for l1_block in buff { + info!("Observed L1 block at height {}", l1_block.header().slot().into_inner()); + + let curr_l1_header = l1_block.header().id(); let l2_blocks_parsed = parse_blocks( l1_block.into_transactions().into_iter(), @@ -103,19 +134,12 @@ impl IndexerCore { info!("Parsed {} L2 blocks", l2_blocks_parsed.len()); for l2_block in l2_blocks_parsed { - self.store.put_block(l2_block.clone())?; + self.store.put_block(l2_block.clone(), curr_l1_header)?; yield Ok(l2_block); } } } - - // Refetch stream after delay - tokio::time::sleep(std::time::Duration::from_millis( - self.config.resubscribe_interval_millis, - )) - .await; - } } } @@ -130,11 +154,13 @@ impl IndexerCore { pub async fn search_for_channel_start( &self, - channel_id_to_search: &ChannelId, - ) -> Result { + ) -> Result>> { let mut curr_last_header = self.wait_last_finalized_block_header().await?; + // ToDo: Not scalable, initial buffer should be stored in DB to not run out of memory + // Don't want to complicate DB even more right now. + let mut initial_block_buffer = VecDeque::new(); - let first_header = loop { + loop { let Some(curr_last_block) = self .bedrock_client .get_block_by_id(curr_last_header) @@ -143,12 +169,14 @@ impl IndexerCore { return Err(anyhow::anyhow!("Chain inconsistency")); }; - if let Some(search_res) = curr_last_block.transactions().find_map(|tx| { + initial_block_buffer.push_front(curr_last_block.clone()); + + if let Some(_) = curr_last_block.transactions().find_map(|tx| { tx.mantle_tx.ops.iter().find_map(|op| match op { Op::ChannelInscribe(InscriptionOp { channel_id, parent, .. }) => { - if (channel_id == channel_id_to_search) && (parent == &MsgId::root()) { + if (channel_id == &self.config.channel_id) && (parent == &MsgId::root()) { Some(curr_last_block.header().id()) } else { None @@ -157,13 +185,45 @@ impl IndexerCore { _ => None, }) }) { - break search_res; + break; } else { + // Step back to parent curr_last_header = curr_last_block.header().parent(); }; - }; + } - Ok(first_header) + Ok(initial_block_buffer) + } + + pub async fn rollback_to_last_known_finalized_l1_id( + &self, + last_fin_header: HeaderId, + ) -> Result>> { + let mut curr_last_header = self.wait_last_finalized_block_header().await?; + // ToDo: Not scalable, buffer should be stored in DB to not run out of memory + // Don't want to complicate DB even more right now. + let mut block_buffer = VecDeque::new(); + + loop { + let Some(curr_last_block) = self + .bedrock_client + .get_block_by_id(curr_last_header) + .await? + else { + return Err(anyhow::anyhow!("Chain inconsistency")); + }; + + if curr_last_block.header().id() == last_fin_header { + break; + } else { + // Step back to parent + curr_last_header = curr_last_block.header().parent(); + } + + block_buffer.push_front(curr_last_block.clone()); + } + + Ok(block_buffer) } } diff --git a/storage/src/indexer.rs b/storage/src/indexer.rs index ea98155b..44a13539 100644 --- a/storage/src/indexer.rs +++ b/storage/src/indexer.rs @@ -27,6 +27,8 @@ pub const CACHE_SIZE: usize = 1000; pub const DB_META_FIRST_BLOCK_IN_DB_KEY: &str = "first_block_in_db"; /// Key base for storing metainformation about id of last current block in db pub const DB_META_LAST_BLOCK_IN_DB_KEY: &str = "last_block_in_db"; +/// Key base for storing metainformation about id of last observed L1 block in db +pub const DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY: &str = "last_observed_l1_block_in_db"; /// Key base for storing metainformation which describe if first block has been set pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set"; /// Key base for storing metainformation about the last breakpoint @@ -217,6 +219,35 @@ impl RocksDBIO { } } + pub fn get_meta_last_observed_l1_block_in_db(&self) -> DbResult> { + let cf_meta = self.meta_column(); + let res = self + .db + .get_cf( + &cf_meta, + borsh::to_vec(&DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY).map_err(|err| { + DbError::borsh_cast_message( + err, + Some( + "Failed to serialize DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY" + .to_string(), + ), + ) + })?, + ) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + + res.map(|data| { + borsh::from_slice::<[u8; 32]>(&data).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to deserialize last l1 header".to_string()), + ) + }) + }) + .transpose() + } + pub fn get_meta_is_first_block_set(&self) -> DbResult { let cf_meta = self.meta_column(); let res = self @@ -284,7 +315,7 @@ impl RocksDBIO { ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; - self.put_block(block)?; + self.put_block(block, [0; 32])?; Ok(()) } @@ -310,6 +341,31 @@ impl RocksDBIO { Ok(()) } + pub fn put_meta_last_observed_l1_block_in_db(&self, l1_block_header: [u8; 32]) -> DbResult<()> { + let cf_meta = self.meta_column(); + self.db + .put_cf( + &cf_meta, + borsh::to_vec(&DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY).map_err(|err| { + DbError::borsh_cast_message( + err, + Some( + "Failed to serialize DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY" + .to_string(), + ), + ) + })?, + borsh::to_vec(&l1_block_header).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize last l1 block header".to_string()), + ) + })?, + ) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + Ok(()) + } + pub fn put_meta_last_breakpoint_id(&self, br_id: u64) -> DbResult<()> { let cf_meta = self.meta_column(); self.db @@ -351,7 +407,7 @@ impl RocksDBIO { // Block - pub fn put_block(&self, block: Block) -> DbResult<()> { + pub fn put_block(&self, block: Block, l1_block_header: [u8; 32]) -> DbResult<()> { let cf_block = self.block_column(); let cf_hti = self.hash_to_id_column(); let cf_tti: Arc> = self.tx_hash_to_id_column(); @@ -380,6 +436,7 @@ impl RocksDBIO { if block.header.block_id > last_curr_block { self.put_meta_last_block_in_db(block.header.block_id)?; + self.put_meta_last_observed_l1_block_in_db(l1_block_header)?; } self.db @@ -957,7 +1014,7 @@ mod tests { let transfer_tx = transfer(1, 0, true); let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [1; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let first_id = dbio.get_meta_first_block_in_db().unwrap(); @@ -1000,7 +1057,7 @@ mod tests { let transfer_tx = transfer(1, (i - 1) as u128, true); let block = common::test_utils::produce_dummy_block(i + 1, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [i as u8; 32]).unwrap(); } let last_id = dbio.get_meta_last_block_in_db().unwrap(); @@ -1054,7 +1111,7 @@ mod tests { let control_hash1 = block.header.hash; - dbio.put_block(block).unwrap(); + dbio.put_block(block, [1; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1065,7 +1122,7 @@ mod tests { let control_hash2 = block.header.hash; - dbio.put_block(block).unwrap(); + dbio.put_block(block, [2; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1076,7 +1133,7 @@ mod tests { let control_tx_hash1 = transfer_tx.hash(); let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [3; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1087,7 +1144,7 @@ mod tests { let control_tx_hash2 = transfer_tx.hash(); let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [4; 32]).unwrap(); let control_block_id1 = dbio.get_block_id_by_hash(control_hash1.0).unwrap(); let control_block_id2 = dbio.get_block_id_by_hash(control_hash2.0).unwrap(); @@ -1118,7 +1175,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]); block_res.push(block.clone()); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [1; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1128,7 +1185,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(3, Some(prev_hash), vec![transfer_tx]); block_res.push(block.clone()); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [2; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1138,7 +1195,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]); block_res.push(block.clone()); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [3; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1148,7 +1205,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]); block_res.push(block.clone()); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [4; 32]).unwrap(); let block_hashes_mem: Vec<[u8; 32]> = block_res.into_iter().map(|bl| bl.header.hash.0).collect(); @@ -1192,7 +1249,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [1; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1204,7 +1261,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(3, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [2; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1216,7 +1273,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [3; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1228,7 +1285,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [4; 32]).unwrap(); let acc1_tx = dbio.get_acc_transactions(*acc1().value(), 0, 4).unwrap(); let acc1_tx_hashes: Vec<[u8; 32]> = acc1_tx.into_iter().map(|tx| tx.hash().0).collect();