From b7f44ffd65fdc21e8b8c064d928f6d2790731bd9 Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Wed, 11 Feb 2026 16:24:46 +0200 Subject: [PATCH 01/10] feat: search for channel start --- indexer/core/src/lib.rs | 51 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index 0497e68a..32bd373c 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use bedrock_client::BedrockClient; +use bedrock_client::{BedrockClient, HeaderId}; use common::block::{Block, HashableBlockData}; // ToDo: Remove after testnet use common::{HashType, PINATA_BASE58}; @@ -7,7 +7,7 @@ use futures::StreamExt; use log::info; use logos_blockchain_core::mantle::{ Op, SignedMantleTx, - ops::channel::{ChannelId, inscribe::InscriptionOp}, + ops::channel::{ChannelId, MsgId, inscribe::InscriptionOp}, }; use crate::{block_store::IndexerStore, config::IndexerConfig}; @@ -120,6 +120,53 @@ impl IndexerCore { } } } + + async fn wait_last_finalized_block_header(&self) -> Result { + let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?); + stream_pinned + .next() + .await + .ok_or(anyhow::anyhow!("Stream failure")) + .map(|info| info.header_id) + } + + pub async fn search_for_channel_start( + &self, + channel_id_to_search: &ChannelId, + ) -> Result { + let mut curr_last_header = self.wait_last_finalized_block_header().await?; + + let first_header = loop { + let Some(curr_last_block) = self + .bedrock_client + .get_block_by_id(curr_last_header) + .await? + else { + return Err(anyhow::anyhow!("Chain inconsistency")); + }; + + if let Some(search_res) = curr_last_block.transactions().find_map(|tx| { + tx.mantle_tx.ops.iter().find_map(|op| match op { + Op::ChannelInscribe(InscriptionOp { + channel_id, parent, .. + }) => { + if (channel_id == channel_id_to_search) && (parent == &MsgId::root()) { + Some(curr_last_block.header().id()) + } else { + None + } + } + _ => None, + }) + }) { + break search_res; + } else { + curr_last_header = curr_last_block.header().parent(); + }; + }; + + Ok(first_header) + } } fn parse_blocks( From de1aa99fd414c55f7827691e3f14ea0602fd08d0 Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Thu, 12 Feb 2026 14:27:36 +0200 Subject: [PATCH 02/10] feat: updated block streaming --- indexer/core/src/block_store.rs | 12 +++- indexer/core/src/lib.rs | 116 ++++++++++++++++++++++++-------- storage/src/indexer.rs | 89 +++++++++++++++++++----- 3 files changed, 171 insertions(+), 46 deletions(-) diff --git a/indexer/core/src/block_store.rs b/indexer/core/src/block_store.rs index b5de3896..f4b720c6 100644 --- a/indexer/core/src/block_store.rs +++ b/indexer/core/src/block_store.rs @@ -1,6 +1,7 @@ use std::{path::Path, sync::Arc}; use anyhow::Result; +use bedrock_client::HeaderId; use common::{ block::Block, transaction::{NSSATransaction, execute_check_transaction_on_state, transaction_pre_check}, @@ -34,6 +35,13 @@ impl IndexerStore { Self::open_db_with_genesis(location, None) } + pub fn last_observed_l1_header(&self) -> Result> { + Ok(self + .dbio + .get_meta_last_observed_l1_block_in_db()? + .map(|raw_id| HeaderId::from(raw_id))) + } + pub fn get_last_block_id(&self) -> Result { Ok(self.dbio.get_meta_last_block_in_db()?) } @@ -95,7 +103,7 @@ impl IndexerStore { Ok(self.final_state()?.get_account_by_id(*account_id)) } - pub fn put_block(&self, block: Block) -> Result<()> { + pub fn put_block(&self, block: Block, l1_header: HeaderId) -> Result<()> { let mut final_state = self.dbio.final_state()?; for transaction in &block.body.transactions { @@ -105,6 +113,6 @@ impl IndexerStore { )?; } - Ok(self.dbio.put_block(block)?) + Ok(self.dbio.put_block(block, l1_header.into())?) } } diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index 32bd373c..0d5b431d 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -1,3 +1,5 @@ +use std::collections::VecDeque; + use anyhow::Result; use bedrock_client::{BedrockClient, HeaderId}; use common::block::{Block, HashableBlockData}; @@ -78,22 +80,51 @@ impl IndexerCore { pub async fn subscribe_parse_block_stream(&self) -> impl futures::Stream> { async_stream::stream! { - loop { - let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?); + let last_l1_header = self.store.last_observed_l1_header()?; - info!("Block stream joined"); + let mut last_fin_header = match last_l1_header { + Some(last_l1_header) => { + last_l1_header + }, + None => { + info!("Searching for the start of a channel"); - while let Some(block_info) = stream_pinned.next().await { - let header_id = block_info.header_id; + let start_buff = self.search_for_channel_start().await?; - info!("Observed L1 block at height {}", block_info.height); + let last_l1_header = start_buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty"))?.header().id(); - if let Some(l1_block) = self - .bedrock_client - .get_block_by_id(header_id) - .await? - { - info!("Extracted L1 block at height {}", block_info.height); + for l1_block in start_buff { + info!("Observed L1 block at height {}", l1_block.header().slot().into_inner()); + + let curr_l1_header = l1_block.header().id(); + + let l2_blocks_parsed = parse_blocks( + l1_block.into_transactions().into_iter(), + &self.config.channel_id, + ).collect::>(); + + info!("Parsed {} L2 blocks", l2_blocks_parsed.len()); + + for l2_block in l2_blocks_parsed { + self.store.put_block(l2_block.clone(), curr_l1_header)?; + + yield Ok(l2_block); + } + } + + last_l1_header + }, + }; + + loop { + let buff = self.rollback_to_last_known_finalized_l1_id(last_fin_header).await?; + + last_fin_header = buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty"))?.header().id(); + + for l1_block in buff { + info!("Observed L1 block at height {}", l1_block.header().slot().into_inner()); + + let curr_l1_header = l1_block.header().id(); let l2_blocks_parsed = parse_blocks( l1_block.into_transactions().into_iter(), @@ -105,19 +136,12 @@ impl IndexerCore { info!("Parsed {} L2 blocks with ids {:?}", l2_blocks_parsed.len(), l2_blocks_parsed_ids); for l2_block in l2_blocks_parsed { - self.store.put_block(l2_block.clone())?; + self.store.put_block(l2_block.clone(), curr_l1_header)?; yield Ok(l2_block); } } } - - // Refetch stream after delay - tokio::time::sleep(std::time::Duration::from_millis( - self.config.resubscribe_interval_millis, - )) - .await; - } } } @@ -132,11 +156,13 @@ impl IndexerCore { pub async fn search_for_channel_start( &self, - channel_id_to_search: &ChannelId, - ) -> Result { + ) -> Result>> { let mut curr_last_header = self.wait_last_finalized_block_header().await?; + // ToDo: Not scalable, initial buffer should be stored in DB to not run out of memory + // Don't want to complicate DB even more right now. + let mut initial_block_buffer = VecDeque::new(); - let first_header = loop { + loop { let Some(curr_last_block) = self .bedrock_client .get_block_by_id(curr_last_header) @@ -145,12 +171,14 @@ impl IndexerCore { return Err(anyhow::anyhow!("Chain inconsistency")); }; - if let Some(search_res) = curr_last_block.transactions().find_map(|tx| { + initial_block_buffer.push_front(curr_last_block.clone()); + + if let Some(_) = curr_last_block.transactions().find_map(|tx| { tx.mantle_tx.ops.iter().find_map(|op| match op { Op::ChannelInscribe(InscriptionOp { channel_id, parent, .. }) => { - if (channel_id == channel_id_to_search) && (parent == &MsgId::root()) { + if (channel_id == &self.config.channel_id) && (parent == &MsgId::root()) { Some(curr_last_block.header().id()) } else { None @@ -159,13 +187,45 @@ impl IndexerCore { _ => None, }) }) { - break search_res; + break; } else { + // Step back to parent curr_last_header = curr_last_block.header().parent(); }; - }; + } - Ok(first_header) + Ok(initial_block_buffer) + } + + pub async fn rollback_to_last_known_finalized_l1_id( + &self, + last_fin_header: HeaderId, + ) -> Result>> { + let mut curr_last_header = self.wait_last_finalized_block_header().await?; + // ToDo: Not scalable, buffer should be stored in DB to not run out of memory + // Don't want to complicate DB even more right now. + let mut block_buffer = VecDeque::new(); + + loop { + let Some(curr_last_block) = self + .bedrock_client + .get_block_by_id(curr_last_header) + .await? + else { + return Err(anyhow::anyhow!("Chain inconsistency")); + }; + + if curr_last_block.header().id() == last_fin_header { + break; + } else { + // Step back to parent + curr_last_header = curr_last_block.header().parent(); + } + + block_buffer.push_front(curr_last_block.clone()); + } + + Ok(block_buffer) } } diff --git a/storage/src/indexer.rs b/storage/src/indexer.rs index 98a9a629..5e467780 100644 --- a/storage/src/indexer.rs +++ b/storage/src/indexer.rs @@ -27,6 +27,8 @@ pub const CACHE_SIZE: usize = 1000; pub const DB_META_FIRST_BLOCK_IN_DB_KEY: &str = "first_block_in_db"; /// Key base for storing metainformation about id of last current block in db pub const DB_META_LAST_BLOCK_IN_DB_KEY: &str = "last_block_in_db"; +/// Key base for storing metainformation about id of last observed L1 block in db +pub const DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY: &str = "last_observed_l1_block_in_db"; /// Key base for storing metainformation which describe if first block has been set pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set"; /// Key base for storing metainformation about the last breakpoint @@ -217,6 +219,35 @@ impl RocksDBIO { } } + pub fn get_meta_last_observed_l1_block_in_db(&self) -> DbResult> { + let cf_meta = self.meta_column(); + let res = self + .db + .get_cf( + &cf_meta, + borsh::to_vec(&DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY).map_err(|err| { + DbError::borsh_cast_message( + err, + Some( + "Failed to serialize DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY" + .to_string(), + ), + ) + })?, + ) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + + res.map(|data| { + borsh::from_slice::<[u8; 32]>(&data).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to deserialize last l1 header".to_string()), + ) + }) + }) + .transpose() + } + pub fn get_meta_is_first_block_set(&self) -> DbResult { let cf_meta = self.meta_column(); let res = self @@ -284,7 +315,7 @@ impl RocksDBIO { ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; - self.put_block(block)?; + self.put_block(block, [0; 32])?; Ok(()) } @@ -310,6 +341,31 @@ impl RocksDBIO { Ok(()) } + pub fn put_meta_last_observed_l1_block_in_db(&self, l1_block_header: [u8; 32]) -> DbResult<()> { + let cf_meta = self.meta_column(); + self.db + .put_cf( + &cf_meta, + borsh::to_vec(&DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY).map_err(|err| { + DbError::borsh_cast_message( + err, + Some( + "Failed to serialize DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY" + .to_string(), + ), + ) + })?, + borsh::to_vec(&l1_block_header).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize last l1 block header".to_string()), + ) + })?, + ) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + Ok(()) + } + pub fn put_meta_last_breakpoint_id(&self, br_id: u64) -> DbResult<()> { let cf_meta = self.meta_column(); self.db @@ -351,7 +407,7 @@ impl RocksDBIO { // Block - pub fn put_block(&self, block: Block) -> DbResult<()> { + pub fn put_block(&self, block: Block, l1_block_header: [u8; 32]) -> DbResult<()> { let cf_block = self.block_column(); let cf_hti = self.hash_to_id_column(); let cf_tti: Arc> = self.tx_hash_to_id_column(); @@ -380,6 +436,7 @@ impl RocksDBIO { if block.header.block_id > last_curr_block { self.put_meta_last_block_in_db(block.header.block_id)?; + self.put_meta_last_observed_l1_block_in_db(l1_block_header)?; } self.db @@ -957,7 +1014,7 @@ mod tests { let transfer_tx = transfer(1, 0, true); let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [1; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let first_id = dbio.get_meta_first_block_in_db().unwrap(); @@ -1000,7 +1057,7 @@ mod tests { let transfer_tx = transfer(1, (i - 1) as u128, true); let block = common::test_utils::produce_dummy_block(i + 1, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [i as u8; 32]).unwrap(); } let last_id = dbio.get_meta_last_block_in_db().unwrap(); @@ -1054,7 +1111,7 @@ mod tests { let control_hash1 = block.header.hash; - dbio.put_block(block).unwrap(); + dbio.put_block(block, [1; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1065,7 +1122,7 @@ mod tests { let control_hash2 = block.header.hash; - dbio.put_block(block).unwrap(); + dbio.put_block(block, [2; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1076,7 +1133,7 @@ mod tests { let control_tx_hash1 = transfer_tx.hash(); let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [3; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1087,7 +1144,7 @@ mod tests { let control_tx_hash2 = transfer_tx.hash(); let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [4; 32]).unwrap(); let control_block_id1 = dbio.get_block_id_by_hash(control_hash1.0).unwrap(); let control_block_id2 = dbio.get_block_id_by_hash(control_hash2.0).unwrap(); @@ -1118,7 +1175,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]); block_res.push(block.clone()); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [1; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1128,7 +1185,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(3, Some(prev_hash), vec![transfer_tx]); block_res.push(block.clone()); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [2; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1138,7 +1195,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]); block_res.push(block.clone()); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [3; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1148,7 +1205,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]); block_res.push(block.clone()); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [4; 32]).unwrap(); let block_hashes_mem: Vec<[u8; 32]> = block_res.into_iter().map(|bl| bl.header.hash.0).collect(); @@ -1192,7 +1249,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [1; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1204,7 +1261,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(3, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [2; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1216,7 +1273,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [3; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1228,7 +1285,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [4; 32]).unwrap(); let acc1_tx = dbio.get_acc_transactions(*acc1().value(), 0, 4).unwrap(); let acc1_tx_hashes: Vec<[u8; 32]> = acc1_tx.into_iter().map(|tx| tx.hash().0).collect(); From 77f1be59b0aefc011a89ecad5bb6768f95967348 Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Mon, 16 Feb 2026 17:54:54 +0200 Subject: [PATCH 03/10] feat: start search corerct --- indexer/core/src/lib.rs | 107 +++++++++++++++++++++++++++++----------- 1 file changed, 79 insertions(+), 28 deletions(-) diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index 0d5b431d..f93a4c86 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -154,47 +154,98 @@ impl IndexerCore { .map(|info| info.header_id) } + /// WARNING: depending on chain behaviour, + /// may take indefinite amount of time pub async fn search_for_channel_start( &self, ) -> Result>> { let mut curr_last_header = self.wait_last_finalized_block_header().await?; + // Storing start for future use + let mut rollback_start = curr_last_header; + // ToDo: How to get root? + let mut rollback_limit = HeaderId::from([0; 32]); // ToDo: Not scalable, initial buffer should be stored in DB to not run out of memory // Don't want to complicate DB even more right now. - let mut initial_block_buffer = VecDeque::new(); + let mut block_buffer = VecDeque::new(); - loop { - let Some(curr_last_block) = self - .bedrock_client - .get_block_by_id(curr_last_header) - .await? - else { - return Err(anyhow::anyhow!("Chain inconsistency")); - }; + 'outer: loop { + loop { + // let res = self + // .bedrock_client + // .get_block_by_id(curr_last_header) + // .await?; - initial_block_buffer.push_front(curr_last_block.clone()); + // let curr_last_block; - if let Some(_) = curr_last_block.transactions().find_map(|tx| { - tx.mantle_tx.ops.iter().find_map(|op| match op { - Op::ChannelInscribe(InscriptionOp { - channel_id, parent, .. - }) => { - if (channel_id == &self.config.channel_id) && (parent == &MsgId::root()) { - Some(curr_last_block.header().id()) - } else { - None + // match res { + // Some(block) => {curr_last_block = block}, + // None => { + // break; + // } + // } + + let Some(curr_last_block) = self + .bedrock_client + .get_block_by_id(curr_last_header) + .await? + else { + log::error!("Failed to get block for header {curr_last_header}"); + return Err(anyhow::anyhow!("Chain inconsistency")); + }; + + info!( + "INITIAL_SEARCH: Observed L1 block at height {}", + curr_last_block.header().slot().into_inner() + ); + info!( + "INITIAL_SEARCH: This block header is {}", + curr_last_block.header().id() + ); + info!( + "INITIAL_SEARCH: This block parent is {}", + curr_last_block.header().parent() + ); + + block_buffer.push_front(curr_last_block.clone()); + + if let Some(_) = curr_last_block.transactions().find_map(|tx| { + tx.mantle_tx.ops.iter().find_map(|op| match op { + Op::ChannelInscribe(InscriptionOp { + channel_id, parent, .. + }) => { + if (channel_id == &self.config.channel_id) && (parent == &MsgId::root()) + { + Some(curr_last_block.header().id()) + } else { + None + } } + _ => None, + }) + }) { + info!("INITIAL_SEARCH: Found channel start"); + break 'outer; + } else { + // Step back to parent + let parent = curr_last_block.header().parent(); + + if parent == rollback_limit { + break; } - _ => None, - }) - }) { - break; - } else { - // Step back to parent - curr_last_header = curr_last_block.header().parent(); - }; + + curr_last_header = parent; + }; + } + + info!("INITIAL_SEARCH: Reached rollback limit, refetching last block"); + + block_buffer.clear(); + rollback_limit = rollback_start; + curr_last_header = self.wait_last_finalized_block_header().await?; + rollback_start = curr_last_header; } - Ok(initial_block_buffer) + Ok(block_buffer) } pub async fn rollback_to_last_known_finalized_l1_id( From 6f979786e705870cc5ddd4663177e3a2d74c956d Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Tue, 17 Feb 2026 10:46:51 +0200 Subject: [PATCH 04/10] feat: tip polling --- Cargo.lock | 1 + Cargo.toml | 1 + bedrock_client/Cargo.toml | 1 + bedrock_client/src/lib.rs | 10 +++ indexer/core/src/lib.rs | 115 ++++++++++++++++++----------- integration_tests/src/config.rs | 2 +- integration_tests/tests/indexer.rs | 4 +- 7 files changed, 88 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5543243a..7e54020e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1212,6 +1212,7 @@ dependencies = [ "futures", "log", "logos-blockchain-chain-broadcast-service", + "logos-blockchain-chain-service", "logos-blockchain-common-http-client", "logos-blockchain-core", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index 5f2a7033..520844b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -113,6 +113,7 @@ logos-blockchain-common-http-client = { git = "https://github.com/logos-blockcha logos-blockchain-key-management-system-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } logos-blockchain-core = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } logos-blockchain-chain-broadcast-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } +logos-blockchain-chain-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } rocksdb = { version = "0.24.0", default-features = false, features = [ "snappy", diff --git a/bedrock_client/Cargo.toml b/bedrock_client/Cargo.toml index bd275d8b..6f8b8a74 100644 --- a/bedrock_client/Cargo.toml +++ b/bedrock_client/Cargo.toml @@ -16,3 +16,4 @@ serde.workspace = true logos-blockchain-common-http-client.workspace = true logos-blockchain-core.workspace = true logos-blockchain-chain-broadcast-service.workspace = true +logos-blockchain-chain-service.workspace = true diff --git a/bedrock_client/src/lib.rs b/bedrock_client/src/lib.rs index 7655a31c..7aae7783 100644 --- a/bedrock_client/src/lib.rs +++ b/bedrock_client/src/lib.rs @@ -5,6 +5,7 @@ use common::config::BasicAuth; use futures::{Stream, TryFutureExt}; use log::{info, warn}; pub use logos_blockchain_chain_broadcast_service::BlockInfo; +use logos_blockchain_chain_service::CryptarchiaInfo; pub use logos_blockchain_common_http_client::{CommonHttpClient, Error}; pub use logos_blockchain_core::{block::Block, header::HeaderId, mantle::SignedMantleTx}; use reqwest::{Client, Url}; @@ -82,6 +83,15 @@ impl BedrockClient { .await } + pub async fn get_consensus_info(&self) -> Result { + Retry::spawn(self.backoff_strategy(), || { + self.http_client + .consensus_info(self.node_url.clone()) + .inspect_err(|err| warn!("Block fetching failed with error: {err:#}")) + }) + .await + } + fn backoff_strategy(&self) -> impl Iterator { tokio_retry::strategy::FibonacciBackoff::from_millis(self.backoff.start_delay_millis) .take(self.backoff.max_retries) diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index f93a4c86..dd260f00 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -5,11 +5,10 @@ use bedrock_client::{BedrockClient, HeaderId}; use common::block::{Block, HashableBlockData}; // ToDo: Remove after testnet use common::{HashType, PINATA_BASE58}; -use futures::StreamExt; use log::info; use logos_blockchain_core::mantle::{ Op, SignedMantleTx, - ops::channel::{ChannelId, MsgId, inscribe::InscriptionOp}, + ops::channel::{ChannelId, inscribe::InscriptionOp}, }; use crate::{block_store::IndexerStore, config::IndexerConfig}; @@ -84,6 +83,7 @@ impl IndexerCore { let mut last_fin_header = match last_l1_header { Some(last_l1_header) => { + info!("Last fin header found: {last_l1_header}"); last_l1_header }, None => { @@ -94,7 +94,7 @@ impl IndexerCore { let last_l1_header = start_buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty"))?.header().id(); for l1_block in start_buff { - info!("Observed L1 block at height {}", l1_block.header().slot().into_inner()); + info!("Observed L1 block at slot {}", l1_block.header().slot().into_inner()); let curr_l1_header = l1_block.header().id(); @@ -116,13 +116,17 @@ impl IndexerCore { }, }; - loop { - let buff = self.rollback_to_last_known_finalized_l1_id(last_fin_header).await?; + info!("INITIAL_SEARCH_END"); - last_fin_header = buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty"))?.header().id(); + loop { + let buff = self.rollback_to_last_known_finalized_l1_id(last_fin_header).await + .inspect_err(|err| log::error!("Failed to roll back to last finalized l1 id with err {err:#?}"))?; + + last_fin_header = buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty")) + .inspect_err(|err| log::error!("Chain is empty {err:#?}"))?.header().id(); for l1_block in buff { - info!("Observed L1 block at height {}", l1_block.header().slot().into_inner()); + info!("Observed L1 block at slot {}", l1_block.header().slot().into_inner()); let curr_l1_header = l1_block.header().id(); @@ -145,13 +149,29 @@ impl IndexerCore { } } - async fn wait_last_finalized_block_header(&self) -> Result { - let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?); - stream_pinned - .next() - .await - .ok_or(anyhow::anyhow!("Stream failure")) - .map(|info| info.header_id) + // async fn wait_last_finalized_block_header(&self) -> Result { + // let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?); + // stream_pinned + // .next() + // .await + // .ok_or(anyhow::anyhow!("Stream failure")) + // .map(|info| info.header_id) + // } + + async fn get_tip(&self) -> Result { + Ok(self.bedrock_client.get_consensus_info().await?.tip) + } + + async fn get_next_tip(&self, prev_tip: HeaderId) -> Result { + loop { + let next_tip = self.get_tip().await?; + if next_tip != prev_tip { + break Ok(next_tip); + } else { + info!("Wait 10s to not spam the node"); + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + } + } } /// WARNING: depending on chain behaviour, @@ -159,7 +179,8 @@ impl IndexerCore { pub async fn search_for_channel_start( &self, ) -> Result>> { - let mut curr_last_header = self.wait_last_finalized_block_header().await?; + // let mut curr_last_header = self.wait_last_finalized_block_header().await?; + let mut curr_last_header = self.get_tip().await?; // Storing start for future use let mut rollback_start = curr_last_header; // ToDo: How to get root? @@ -169,29 +190,15 @@ impl IndexerCore { let mut block_buffer = VecDeque::new(); 'outer: loop { + let mut cycle_header = curr_last_header; + loop { - // let res = self - // .bedrock_client - // .get_block_by_id(curr_last_header) - // .await?; - - // let curr_last_block; - - // match res { - // Some(block) => {curr_last_block = block}, - // None => { - // break; - // } - // } - - let Some(curr_last_block) = self - .bedrock_client - .get_block_by_id(curr_last_header) - .await? - else { - log::error!("Failed to get block for header {curr_last_header}"); - return Err(anyhow::anyhow!("Chain inconsistency")); - }; + let curr_last_block = + if let Some(block) = self.bedrock_client.get_block_by_id(cycle_header).await? { + block + } else { + break; + }; info!( "INITIAL_SEARCH: Observed L1 block at height {}", @@ -211,11 +218,23 @@ impl IndexerCore { if let Some(_) = curr_last_block.transactions().find_map(|tx| { tx.mantle_tx.ops.iter().find_map(|op| match op { Op::ChannelInscribe(InscriptionOp { - channel_id, parent, .. + channel_id, inscription, .. }) => { - if (channel_id == &self.config.channel_id) && (parent == &MsgId::root()) + if channel_id == &self.config.channel_id { - Some(curr_last_block.header().id()) + let l2_block = borsh::from_slice::(&inscription). + inspect_err(|err| log::error!("INITIAL_SEARCH: failed to deserialize with err: {err:#?}")).ok(); + match l2_block { + Some(block) => { + info!("!!!!!!!!!!!!!!!INITIAL_SEARCH: Observed L2 block at id {}", block.header.block_id); + if block.header.block_id == 1 { + Some(curr_last_block.header().id()) + } else { + None + } + }, + None => None, + } } else { None } @@ -233,7 +252,7 @@ impl IndexerCore { break; } - curr_last_header = parent; + cycle_header = parent; }; } @@ -241,7 +260,17 @@ impl IndexerCore { block_buffer.clear(); rollback_limit = rollback_start; - curr_last_header = self.wait_last_finalized_block_header().await?; + + curr_last_header = loop { + let next_tip = self.get_tip().await?; + if next_tip != curr_last_header { + break next_tip; + } else { + info!("Wait 10s to not spam the node"); + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + } + }; + rollback_start = curr_last_header; } @@ -252,7 +281,7 @@ impl IndexerCore { &self, last_fin_header: HeaderId, ) -> Result>> { - let mut curr_last_header = self.wait_last_finalized_block_header().await?; + let mut curr_last_header = self.get_next_tip(last_fin_header).await?; // ToDo: Not scalable, buffer should be stored in DB to not run out of memory // Don't want to complicate DB even more right now. let mut block_buffer = VecDeque::new(); diff --git a/integration_tests/src/config.rs b/integration_tests/src/config.rs index 62e34860..dd888e2c 100644 --- a/integration_tests/src/config.rs +++ b/integration_tests/src/config.rs @@ -74,7 +74,7 @@ pub fn sequencer_config( max_num_tx_in_block, mempool_max_size, block_create_timeout_millis, - retry_pending_blocks_timeout_millis: 240_000, + retry_pending_blocks_timeout_millis: 120_000, port: 0, initial_accounts: initial_data.sequencer_initial_accounts(), initial_commitments: initial_data.sequencer_initial_commitments(), diff --git a/integration_tests/tests/indexer.rs b/integration_tests/tests/indexer.rs index d0bb80cf..fced7884 100644 --- a/integration_tests/tests/indexer.rs +++ b/integration_tests/tests/indexer.rs @@ -12,10 +12,10 @@ use tokio::test; use wallet::cli::{Command, programs::native_token_transfer::AuthTransferSubcommand}; /// Timeout in milliseconds to reliably await for block finalization -const L2_TO_L1_TIMEOUT_MILLIS: u64 = 300000; +const L2_TO_L1_TIMEOUT_MILLIS: u64 = 500000; #[test] -#[ignore = "Not reliable with current bedrock node"] +//#[ignore = "Not reliable with current bedrock node"] async fn indexer_test_run() -> Result<()> { let ctx = TestContext::new().await?; From f39fd9ee5300690de593ac225d92e04ed1fb2620 Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Wed, 18 Feb 2026 11:39:05 +0200 Subject: [PATCH 05/10] fix: fmt --- indexer/core/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index dd260f00..9ac20c2c 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -201,7 +201,7 @@ impl IndexerCore { }; info!( - "INITIAL_SEARCH: Observed L1 block at height {}", + "INITIAL_SEARCH: Observed L1 block at slot {}", curr_last_block.header().slot().into_inner() ); info!( From feb66e6a18c0c188cf90cc89e84207aba0f38131 Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Wed, 18 Feb 2026 14:58:33 +0200 Subject: [PATCH 06/10] fix: tests unignored --- indexer/core/src/block_store.rs | 6 +- indexer/core/src/config.rs | 6 +- indexer/core/src/lib.rs | 294 ++++++++++++++--------------- integration_tests/src/config.rs | 2 +- integration_tests/tests/indexer.rs | 5 +- storage/src/indexer.rs | 40 ++-- 6 files changed, 167 insertions(+), 186 deletions(-) diff --git a/indexer/core/src/block_store.rs b/indexer/core/src/block_store.rs index f4b720c6..307a3282 100644 --- a/indexer/core/src/block_store.rs +++ b/indexer/core/src/block_store.rs @@ -35,11 +35,11 @@ impl IndexerStore { Self::open_db_with_genesis(location, None) } - pub fn last_observed_l1_header(&self) -> Result> { + pub fn last_observed_l1_lib_header(&self) -> Result> { Ok(self .dbio - .get_meta_last_observed_l1_block_in_db()? - .map(|raw_id| HeaderId::from(raw_id))) + .get_meta_last_observed_l1_lib_header_in_db()? + .map(HeaderId::from)) } pub fn get_last_block_id(&self) -> Result { diff --git a/indexer/core/src/config.rs b/indexer/core/src/config.rs index 968678e5..e3cd4d04 100644 --- a/indexer/core/src/config.rs +++ b/indexer/core/src/config.rs @@ -32,12 +32,8 @@ pub struct IndexerConfig { /// List of initial commitments pub initial_commitments: Vec, /// Sequencers signing key - /// - /// ToDo: Remove it after introducing bedrock block parsing. - /// Currently can not be removed, because indexer must start - /// chain BEFORE sequencer. pub signing_key: [u8; 32], - pub resubscribe_interval_millis: u64, + pub consensus_info_polling_interval_millis: u64, pub bedrock_client_config: ClientConfig, pub channel_id: ChannelId, } diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index 9ac20c2c..7832014b 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -5,7 +5,7 @@ use bedrock_client::{BedrockClient, HeaderId}; use common::block::{Block, HashableBlockData}; // ToDo: Remove after testnet use common::{HashType, PINATA_BASE58}; -use log::info; +use log::{debug, error, info}; use logos_blockchain_core::mantle::{ Op, SignedMantleTx, ops::channel::{ChannelId, inscribe::InscriptionOp}, @@ -26,7 +26,6 @@ pub struct IndexerCore { impl IndexerCore { pub fn new(config: IndexerConfig) -> Result { - // ToDo: replace with correct startup let hashable_data = HashableBlockData { block_id: 1, transactions: vec![], @@ -34,10 +33,20 @@ impl IndexerCore { timestamp: 0, }; + // Genesis creation is fine as it is, + // because it will be overwritten by sequencer. + // Therefore: + // ToDo: remove key from indexer config, use some default. let signing_key = nssa::PrivateKey::try_new(config.signing_key).unwrap(); let channel_genesis_msg_id = [0; 32]; let start_block = hashable_data.into_pending_block(&signing_key, channel_genesis_msg_id); + // This is a troubling moment, because changes in key protocol can + // affect this. And indexer can not reliably ask this data from sequencer + // because indexer must be independent from it. + // ToDo: move initial state generation into common and use the same method + // for indexer and sequencer. This way both services buit at same version + // could be in sync. let initial_commitments: Vec = config .initial_commitments .iter() @@ -72,75 +81,54 @@ impl IndexerCore { config.bedrock_client_config.auth.clone(), )?, config, - // ToDo: Implement restarts store: IndexerStore::open_db_with_genesis(&home, Some((start_block, state)))?, }) } pub async fn subscribe_parse_block_stream(&self) -> impl futures::Stream> { async_stream::stream! { - let last_l1_header = self.store.last_observed_l1_header()?; + info!("Searching for initial header"); - let mut last_fin_header = match last_l1_header { - Some(last_l1_header) => { - info!("Last fin header found: {last_l1_header}"); - last_l1_header + let last_l1_lib_header = self.store.last_observed_l1_lib_header()?; + + let mut prev_last_l1_lib_header = match last_l1_lib_header { + Some(last_l1_lib_header) => { + info!("Last l1 lib header found: {last_l1_lib_header}"); + last_l1_lib_header }, None => { + info!("Last l1 lib header not found in DB"); info!("Searching for the start of a channel"); - let start_buff = self.search_for_channel_start().await?; + let (start_buff, last_l1_lib_header) = self.search_for_channel_start().await?; - let last_l1_header = start_buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty"))?.header().id(); - - for l1_block in start_buff { - info!("Observed L1 block at slot {}", l1_block.header().slot().into_inner()); - - let curr_l1_header = l1_block.header().id(); - - let l2_blocks_parsed = parse_blocks( - l1_block.into_transactions().into_iter(), - &self.config.channel_id, - ).collect::>(); - - info!("Parsed {} L2 blocks", l2_blocks_parsed.len()); - - for l2_block in l2_blocks_parsed { - self.store.put_block(l2_block.clone(), curr_l1_header)?; + for (l2_block_vec, l1_header) in start_buff { + for l2_block in l2_block_vec { + self.store.put_block(l2_block.clone(), l1_header)?; yield Ok(l2_block); } } - last_l1_header + last_l1_lib_header }, }; - info!("INITIAL_SEARCH_END"); + info!("Searching for initial header finished"); + + info!("Starting backfilling from {prev_last_l1_lib_header}"); loop { - let buff = self.rollback_to_last_known_finalized_l1_id(last_fin_header).await - .inspect_err(|err| log::error!("Failed to roll back to last finalized l1 id with err {err:#?}"))?; + let (buff, curr_last_l1_lib_header) = self + .backfill_to_last_l1_lib_header_id(prev_last_l1_lib_header, &self.config.channel_id) + .await + .inspect_err(|err| error!("Failed to backfill to last l1 lib header id with err {err:#?}"))?; - last_fin_header = buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty")) - .inspect_err(|err| log::error!("Chain is empty {err:#?}"))?.header().id(); + prev_last_l1_lib_header = curr_last_l1_lib_header; - for l1_block in buff { - info!("Observed L1 block at slot {}", l1_block.header().slot().into_inner()); - - let curr_l1_header = l1_block.header().id(); - - let l2_blocks_parsed = parse_blocks( - l1_block.into_transactions().into_iter(), - &self.config.channel_id, - ).collect::>(); - - let mut l2_blocks_parsed_ids: Vec<_> = l2_blocks_parsed.iter().map(|block| block.header.block_id).collect(); - l2_blocks_parsed_ids.sort(); - info!("Parsed {} L2 blocks with ids {:?}", l2_blocks_parsed.len(), l2_blocks_parsed_ids); - - for l2_block in l2_blocks_parsed { - self.store.put_block(l2_block.clone(), curr_l1_header)?; + for (l2_block_vec, header) in buff { + for l2_block in l2_block_vec { + self.store.put_block(l2_block.clone(), header)?; yield Ok(l2_block); } @@ -149,106 +137,88 @@ impl IndexerCore { } } - // async fn wait_last_finalized_block_header(&self) -> Result { - // let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?); - // stream_pinned - // .next() - // .await - // .ok_or(anyhow::anyhow!("Stream failure")) - // .map(|info| info.header_id) - // } - - async fn get_tip(&self) -> Result { - Ok(self.bedrock_client.get_consensus_info().await?.tip) + async fn get_lib(&self) -> Result { + Ok(self.bedrock_client.get_consensus_info().await?.lib) } - async fn get_next_tip(&self, prev_tip: HeaderId) -> Result { + async fn get_next_lib(&self, prev_lib: HeaderId) -> Result { loop { - let next_tip = self.get_tip().await?; - if next_tip != prev_tip { - break Ok(next_tip); + let next_lib = self.get_lib().await?; + if next_lib != prev_lib { + break Ok(next_lib); } else { - info!("Wait 10s to not spam the node"); - tokio::time::sleep(std::time::Duration::from_secs(10)).await; + info!( + "Wait {}ms to not spam the node", + self.config.consensus_info_polling_interval_millis + ); + tokio::time::sleep(std::time::Duration::from_millis( + self.config.consensus_info_polling_interval_millis, + )) + .await; } } } - /// WARNING: depending on chain behaviour, + /// WARNING: depending on channel state, /// may take indefinite amount of time pub async fn search_for_channel_start( &self, - ) -> Result>> { - // let mut curr_last_header = self.wait_last_finalized_block_header().await?; - let mut curr_last_header = self.get_tip().await?; - // Storing start for future use - let mut rollback_start = curr_last_header; + ) -> Result<(VecDeque<(Vec, HeaderId)>, HeaderId)> { + let mut curr_last_l1_lib_header = self.get_lib().await?; + let mut backfill_start = curr_last_l1_lib_header; // ToDo: How to get root? - let mut rollback_limit = HeaderId::from([0; 32]); + let mut backfill_limit = HeaderId::from([0; 32]); // ToDo: Not scalable, initial buffer should be stored in DB to not run out of memory // Don't want to complicate DB even more right now. let mut block_buffer = VecDeque::new(); 'outer: loop { - let mut cycle_header = curr_last_header; + let mut cycle_header = curr_last_l1_lib_header; loop { - let curr_last_block = + let cycle_block = if let Some(block) = self.bedrock_client.get_block_by_id(cycle_header).await? { block } else { + // First run can reach root easily + // so here we are optimistic about L1 + // failing to get parent. break; }; + // It would be better to have id, but block does not have it, so slot will do. info!( - "INITIAL_SEARCH: Observed L1 block at slot {}", - curr_last_block.header().slot().into_inner() + "INITIAL SEARCH: Observed L1 block at slot {}", + cycle_block.header().slot().into_inner() ); - info!( - "INITIAL_SEARCH: This block header is {}", - curr_last_block.header().id() + debug!( + "INITIAL SEARCH: This block header is {}", + cycle_block.header().id() ); - info!( - "INITIAL_SEARCH: This block parent is {}", - curr_last_block.header().parent() + debug!( + "INITIAL SEARCH: This block parent is {}", + cycle_block.header().parent() ); - block_buffer.push_front(curr_last_block.clone()); + let (l2_block_vec, l1_header) = + parse_block_owned(&cycle_block, &self.config.channel_id); - if let Some(_) = curr_last_block.transactions().find_map(|tx| { - tx.mantle_tx.ops.iter().find_map(|op| match op { - Op::ChannelInscribe(InscriptionOp { - channel_id, inscription, .. - }) => { - if channel_id == &self.config.channel_id - { - let l2_block = borsh::from_slice::(&inscription). - inspect_err(|err| log::error!("INITIAL_SEARCH: failed to deserialize with err: {err:#?}")).ok(); - match l2_block { - Some(block) => { - info!("!!!!!!!!!!!!!!!INITIAL_SEARCH: Observed L2 block at id {}", block.header.block_id); - if block.header.block_id == 1 { - Some(curr_last_block.header().id()) - } else { - None - } - }, - None => None, - } - } else { - None - } - } - _ => None, - }) - }) { - info!("INITIAL_SEARCH: Found channel start"); - break 'outer; + info!("Parsed {} L2 blocks", l2_block_vec.len()); + + if !l2_block_vec.is_empty() { + block_buffer.push_front((l2_block_vec.clone(), l1_header)); + } + + if let Some(first_l2_block) = l2_block_vec.first() { + if first_l2_block.header.block_id == 1 { + info!("INITIAL_SEARCH: Found channel start"); + break 'outer; + } } else { // Step back to parent - let parent = curr_last_block.header().parent(); + let parent = cycle_block.header().parent(); - if parent == rollback_limit { + if parent == backfill_limit { break; } @@ -256,73 +226,83 @@ impl IndexerCore { }; } - info!("INITIAL_SEARCH: Reached rollback limit, refetching last block"); + info!("INITIAL_SEARCH: Reached backfill limit, refetching last l1 lib header"); block_buffer.clear(); - rollback_limit = rollback_start; - - curr_last_header = loop { - let next_tip = self.get_tip().await?; - if next_tip != curr_last_header { - break next_tip; - } else { - info!("Wait 10s to not spam the node"); - tokio::time::sleep(std::time::Duration::from_secs(10)).await; - } - }; - - rollback_start = curr_last_header; + backfill_limit = backfill_start; + curr_last_l1_lib_header = self.get_next_lib(curr_last_l1_lib_header).await?; + backfill_start = curr_last_l1_lib_header; } - Ok(block_buffer) + Ok((block_buffer, backfill_limit)) } - pub async fn rollback_to_last_known_finalized_l1_id( + pub async fn backfill_to_last_l1_lib_header_id( &self, - last_fin_header: HeaderId, - ) -> Result>> { - let mut curr_last_header = self.get_next_tip(last_fin_header).await?; + last_fin_l1_lib_header: HeaderId, + channel_id: &ChannelId, + ) -> Result<(VecDeque<(Vec, HeaderId)>, HeaderId)> { + let curr_fin_l1_lib_header = self.get_next_lib(last_fin_l1_lib_header).await?; // ToDo: Not scalable, buffer should be stored in DB to not run out of memory // Don't want to complicate DB even more right now. let mut block_buffer = VecDeque::new(); + let mut cycle_header = curr_fin_l1_lib_header; loop { - let Some(curr_last_block) = self - .bedrock_client - .get_block_by_id(curr_last_header) - .await? - else { - return Err(anyhow::anyhow!("Chain inconsistency")); + let Some(cycle_block) = self.bedrock_client.get_block_by_id(cycle_header).await? else { + return Err(anyhow::anyhow!("Parent not found")); }; - if curr_last_block.header().id() == last_fin_header { + if cycle_block.header().id() == last_fin_l1_lib_header { break; } else { // Step back to parent - curr_last_header = curr_last_block.header().parent(); + cycle_header = cycle_block.header().parent(); } - block_buffer.push_front(curr_last_block.clone()); + // It would be better to have id, but block does not have it, so slot will do. + info!( + "Observed L1 block at slot {}", + cycle_block.header().slot().into_inner() + ); + + let (l2_block_vec, l1_header) = parse_block_owned(&cycle_block, channel_id); + + info!("Parsed {} L2 blocks", l2_block_vec.len()); + + if !l2_block_vec.is_empty() { + block_buffer.push_front((l2_block_vec, l1_header)); + } } - Ok(block_buffer) + Ok((block_buffer, curr_fin_l1_lib_header)) } } -fn parse_blocks( - block_txs: impl Iterator, +fn parse_block_owned( + l1_block: &bedrock_client::Block, decoded_channel_id: &ChannelId, -) -> impl Iterator { - block_txs.flat_map(|tx| { - tx.mantle_tx.ops.into_iter().filter_map(|op| match op { - Op::ChannelInscribe(InscriptionOp { - channel_id, - inscription, - .. - }) if channel_id == *decoded_channel_id => { - borsh::from_slice::(&inscription).ok() - } - _ => None, - }) - }) +) -> (Vec, HeaderId) { + ( + l1_block + .transactions() + .flat_map(|tx| { + tx.mantle_tx.ops.iter().filter_map(|op| match op { + Op::ChannelInscribe(InscriptionOp { + channel_id, + inscription, + .. + }) if channel_id == decoded_channel_id => { + borsh::from_slice::(inscription) + .inspect_err(|err| { + error!("Failed to deserialize our inscription with err: {err:#?}") + }) + .ok() + } + _ => None, + }) + }) + .collect(), + l1_block.header().id(), + ) } diff --git a/integration_tests/src/config.rs b/integration_tests/src/config.rs index dd888e2c..445929dd 100644 --- a/integration_tests/src/config.rs +++ b/integration_tests/src/config.rs @@ -19,7 +19,7 @@ pub fn indexer_config( ) -> Result { Ok(IndexerConfig { home, - resubscribe_interval_millis: 1000, + consensus_info_polling_interval_millis: 10000, bedrock_client_config: ClientConfig { addr: addr_to_url(UrlProtocol::Http, bedrock_addr) .context("Failed to convert bedrock addr to URL")?, diff --git a/integration_tests/tests/indexer.rs b/integration_tests/tests/indexer.rs index fced7884..d5207a41 100644 --- a/integration_tests/tests/indexer.rs +++ b/integration_tests/tests/indexer.rs @@ -12,10 +12,9 @@ use tokio::test; use wallet::cli::{Command, programs::native_token_transfer::AuthTransferSubcommand}; /// Timeout in milliseconds to reliably await for block finalization -const L2_TO_L1_TIMEOUT_MILLIS: u64 = 500000; +const L2_TO_L1_TIMEOUT_MILLIS: u64 = 600000; #[test] -//#[ignore = "Not reliable with current bedrock node"] async fn indexer_test_run() -> Result<()> { let ctx = TestContext::new().await?; @@ -45,7 +44,6 @@ async fn indexer_test_run() -> Result<()> { } #[test] -#[ignore = "Not reliable with current bedrock node"] async fn indexer_block_batching() -> Result<()> { let ctx = TestContext::new().await?; @@ -81,7 +79,6 @@ async fn indexer_block_batching() -> Result<()> { } #[test] -#[ignore = "Not reliable with current bedrock node"] async fn indexer_state_consistency() -> Result<()> { let mut ctx = TestContext::new().await?; diff --git a/storage/src/indexer.rs b/storage/src/indexer.rs index 5e467780..34285f34 100644 --- a/storage/src/indexer.rs +++ b/storage/src/indexer.rs @@ -27,8 +27,9 @@ pub const CACHE_SIZE: usize = 1000; pub const DB_META_FIRST_BLOCK_IN_DB_KEY: &str = "first_block_in_db"; /// Key base for storing metainformation about id of last current block in db pub const DB_META_LAST_BLOCK_IN_DB_KEY: &str = "last_block_in_db"; -/// Key base for storing metainformation about id of last observed L1 block in db -pub const DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY: &str = "last_observed_l1_block_in_db"; +/// Key base for storing metainformation about id of last observed L1 lib header in db +pub const DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY: &str = + "last_observed_l1_lib_header_in_db"; /// Key base for storing metainformation which describe if first block has been set pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set"; /// Key base for storing metainformation about the last breakpoint @@ -219,21 +220,23 @@ impl RocksDBIO { } } - pub fn get_meta_last_observed_l1_block_in_db(&self) -> DbResult> { + pub fn get_meta_last_observed_l1_lib_header_in_db(&self) -> DbResult> { let cf_meta = self.meta_column(); let res = self .db .get_cf( &cf_meta, - borsh::to_vec(&DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY).map_err(|err| { - DbError::borsh_cast_message( + borsh::to_vec(&DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY).map_err( + |err| { + DbError::borsh_cast_message( err, Some( - "Failed to serialize DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY" + "Failed to serialize DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY" .to_string(), ), ) - })?, + }, + )?, ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; @@ -241,7 +244,7 @@ impl RocksDBIO { borsh::from_slice::<[u8; 32]>(&data).map_err(|err| { DbError::borsh_cast_message( err, - Some("Failed to deserialize last l1 header".to_string()), + Some("Failed to deserialize last l1 lib header".to_string()), ) }) }) @@ -341,21 +344,26 @@ impl RocksDBIO { Ok(()) } - pub fn put_meta_last_observed_l1_block_in_db(&self, l1_block_header: [u8; 32]) -> DbResult<()> { + pub fn put_meta_last_observed_l1_lib_header_in_db( + &self, + l1_lib_header: [u8; 32], + ) -> DbResult<()> { let cf_meta = self.meta_column(); self.db .put_cf( &cf_meta, - borsh::to_vec(&DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY).map_err(|err| { - DbError::borsh_cast_message( + borsh::to_vec(&DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY).map_err( + |err| { + DbError::borsh_cast_message( err, Some( - "Failed to serialize DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY" + "Failed to serialize DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY" .to_string(), ), ) - })?, - borsh::to_vec(&l1_block_header).map_err(|err| { + }, + )?, + borsh::to_vec(&l1_lib_header).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize last l1 block header".to_string()), @@ -407,7 +415,7 @@ impl RocksDBIO { // Block - pub fn put_block(&self, block: Block, l1_block_header: [u8; 32]) -> DbResult<()> { + pub fn put_block(&self, block: Block, l1_lib_header: [u8; 32]) -> DbResult<()> { let cf_block = self.block_column(); let cf_hti = self.hash_to_id_column(); let cf_tti: Arc> = self.tx_hash_to_id_column(); @@ -436,7 +444,7 @@ impl RocksDBIO { if block.header.block_id > last_curr_block { self.put_meta_last_block_in_db(block.header.block_id)?; - self.put_meta_last_observed_l1_block_in_db(l1_block_header)?; + self.put_meta_last_observed_l1_lib_header_in_db(l1_lib_header)?; } self.db From d1265af406200db5f81fe2f24b071fdf4997b358 Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Thu, 19 Feb 2026 15:39:09 +0200 Subject: [PATCH 07/10] fix: config updates --- Cargo.lock | 68 ++++++++++----------- indexer/service/configs/indexer_config.json | 2 +- 2 files changed, 35 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7e54020e..85856c94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1449,9 +1449,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.19.1" +version = "3.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" +checksum = "5c6f81257d10a0f602a294ae4182251151ff97dbb504ef9afcdda4a64b24d9b4" [[package]] name = "bytemuck" @@ -4648,7 +4648,7 @@ checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" [[package]] name = "logos-blockchain-blend-crypto" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "blake2", "logos-blockchain-groth16", @@ -4662,7 +4662,7 @@ dependencies = [ [[package]] name = "logos-blockchain-blend-message" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "blake2", "derivative", @@ -4684,7 +4684,7 @@ dependencies = [ [[package]] name = "logos-blockchain-blend-proofs" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "ed25519-dalek", "generic-array 1.3.5", @@ -4701,7 +4701,7 @@ dependencies = [ [[package]] name = "logos-blockchain-chain-broadcast-service" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "derivative", @@ -4717,7 +4717,7 @@ dependencies = [ [[package]] name = "logos-blockchain-chain-service" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "bytes", @@ -4747,7 +4747,7 @@ dependencies = [ [[package]] name = "logos-blockchain-circuits-prover" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "logos-blockchain-circuits-utils", "tempfile", @@ -4756,7 +4756,7 @@ dependencies = [ [[package]] name = "logos-blockchain-circuits-utils" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "dirs", ] @@ -4764,7 +4764,7 @@ dependencies = [ [[package]] name = "logos-blockchain-common-http-client" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "futures", "hex", @@ -4784,7 +4784,7 @@ dependencies = [ [[package]] name = "logos-blockchain-core" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "ark-ff 0.4.2", "bincode", @@ -4814,7 +4814,7 @@ dependencies = [ [[package]] name = "logos-blockchain-cryptarchia-engine" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "cfg_eval", "logos-blockchain-pol", @@ -4830,7 +4830,7 @@ dependencies = [ [[package]] name = "logos-blockchain-cryptarchia-sync" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "bytes", "futures", @@ -4847,7 +4847,7 @@ dependencies = [ [[package]] name = "logos-blockchain-groth16" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "ark-bn254 0.4.0", "ark-ec 0.4.2", @@ -4865,7 +4865,7 @@ dependencies = [ [[package]] name = "logos-blockchain-http-api-common" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "axum 0.7.9", "logos-blockchain-core", @@ -4879,7 +4879,7 @@ dependencies = [ [[package]] name = "logos-blockchain-key-management-system-keys" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "bytes", @@ -4905,7 +4905,7 @@ dependencies = [ [[package]] name = "logos-blockchain-key-management-system-macros" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "proc-macro2", "quote", @@ -4915,7 +4915,7 @@ dependencies = [ [[package]] name = "logos-blockchain-key-management-system-operators" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "logos-blockchain-blend-proofs", @@ -4931,7 +4931,7 @@ dependencies = [ [[package]] name = "logos-blockchain-key-management-system-service" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "log", @@ -4947,7 +4947,7 @@ dependencies = [ [[package]] name = "logos-blockchain-ledger" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "derivative", "logos-blockchain-blend-crypto", @@ -4971,7 +4971,7 @@ dependencies = [ [[package]] name = "logos-blockchain-network-service" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "futures", @@ -4987,7 +4987,7 @@ dependencies = [ [[package]] name = "logos-blockchain-poc" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "logos-blockchain-circuits-prover", "logos-blockchain-circuits-utils", @@ -5003,7 +5003,7 @@ dependencies = [ [[package]] name = "logos-blockchain-pol" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "astro-float", "logos-blockchain-circuits-prover", @@ -5022,7 +5022,7 @@ dependencies = [ [[package]] name = "logos-blockchain-poq" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "logos-blockchain-circuits-prover", "logos-blockchain-circuits-utils", @@ -5039,7 +5039,7 @@ dependencies = [ [[package]] name = "logos-blockchain-poseidon2" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "ark-bn254 0.4.0", "ark-ff 0.4.2", @@ -5050,7 +5050,7 @@ dependencies = [ [[package]] name = "logos-blockchain-services-utils" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "futures", @@ -5065,7 +5065,7 @@ dependencies = [ [[package]] name = "logos-blockchain-storage-service" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "bytes", @@ -5082,7 +5082,7 @@ dependencies = [ [[package]] name = "logos-blockchain-time-service" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "futures", @@ -5100,7 +5100,7 @@ dependencies = [ [[package]] name = "logos-blockchain-utils" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "blake2", @@ -5117,7 +5117,7 @@ dependencies = [ [[package]] name = "logos-blockchain-utxotree" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "ark-ff 0.4.2", "logos-blockchain-groth16", @@ -5131,7 +5131,7 @@ dependencies = [ [[package]] name = "logos-blockchain-witness-generator" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "tempfile", ] @@ -5139,7 +5139,7 @@ dependencies = [ [[package]] name = "logos-blockchain-zksign" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "logos-blockchain-circuits-prover", "logos-blockchain-circuits-utils", @@ -5451,9 +5451,9 @@ dependencies = [ [[package]] name = "native-tls" -version = "0.2.16" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5d26952a508f321b4d3d2e80e78fc2603eaefcdf0c30783867f19586518bdc" +checksum = "465500e14ea162429d264d44189adc38b199b62b1c21eea9f69e4b73cb03bbf2" dependencies = [ "libc", "log", diff --git a/indexer/service/configs/indexer_config.json b/indexer/service/configs/indexer_config.json index 97461b8b..e748d96a 100644 --- a/indexer/service/configs/indexer_config.json +++ b/indexer/service/configs/indexer_config.json @@ -1,6 +1,6 @@ { "home": "./indexer/service", - "resubscribe_interval_millis": 1000, + "consensus_info_polling_interval_millis": 10000, "bedrock_client_config": { "addr": "http://localhost:8080", "backoff": { From 35bb85ebafee12163513c3f13fed696355d92685 Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Mon, 23 Feb 2026 11:39:04 +0200 Subject: [PATCH 08/10] fix: suggestions fix 1 --- indexer/core/src/lib.rs | 68 +++++++++++++++++++++++++++++++---------- 1 file changed, 52 insertions(+), 16 deletions(-) diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index 7832014b..46a153e3 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -24,6 +24,20 @@ pub struct IndexerCore { pub store: IndexerStore, } +#[derive(Clone)] +/// This struct represents one L1 block data fetched from backfilling +pub struct BackfillBlockData { + l2_blocks: Vec, + l1_header: HeaderId, +} + +#[derive(Clone)] +/// This struct represents data fetched fom backfilling in one iteration +pub struct BackfillData { + block_data: VecDeque, + curr_fin_l1_lib_header: HeaderId, +} + impl IndexerCore { pub fn new(config: IndexerConfig) -> Result { let hashable_data = HashableBlockData { @@ -100,9 +114,15 @@ impl IndexerCore { info!("Last l1 lib header not found in DB"); info!("Searching for the start of a channel"); - let (start_buff, last_l1_lib_header) = self.search_for_channel_start().await?; + let BackfillData { + block_data: start_buff, + curr_fin_l1_lib_header: last_l1_lib_header, + } = self.search_for_channel_start().await?; - for (l2_block_vec, l1_header) in start_buff { + for BackfillBlockData { + l2_blocks: l2_block_vec, + l1_header, + } in start_buff { for l2_block in l2_block_vec { self.store.put_block(l2_block.clone(), l1_header)?; @@ -119,14 +139,20 @@ impl IndexerCore { info!("Starting backfilling from {prev_last_l1_lib_header}"); loop { - let (buff, curr_last_l1_lib_header) = self - .backfill_to_last_l1_lib_header_id(prev_last_l1_lib_header, &self.config.channel_id) - .await - .inspect_err(|err| error!("Failed to backfill to last l1 lib header id with err {err:#?}"))?; + let BackfillData { + block_data: buff, + curr_fin_l1_lib_header, + } = self + .backfill_to_last_l1_lib_header_id(prev_last_l1_lib_header, &self.config.channel_id) + .await + .inspect_err(|err| error!("Failed to backfill to last l1 lib header id with err {err:#?}"))?; - prev_last_l1_lib_header = curr_last_l1_lib_header; + prev_last_l1_lib_header = curr_fin_l1_lib_header; - for (l2_block_vec, header) in buff { + for BackfillBlockData { + l2_blocks: l2_block_vec, + l1_header: header, + } in buff { for l2_block in l2_block_vec { self.store.put_block(l2_block.clone(), header)?; @@ -161,9 +187,7 @@ impl IndexerCore { /// WARNING: depending on channel state, /// may take indefinite amount of time - pub async fn search_for_channel_start( - &self, - ) -> Result<(VecDeque<(Vec, HeaderId)>, HeaderId)> { + pub async fn search_for_channel_start(&self) -> Result { let mut curr_last_l1_lib_header = self.get_lib().await?; let mut backfill_start = curr_last_l1_lib_header; // ToDo: How to get root? @@ -206,7 +230,10 @@ impl IndexerCore { info!("Parsed {} L2 blocks", l2_block_vec.len()); if !l2_block_vec.is_empty() { - block_buffer.push_front((l2_block_vec.clone(), l1_header)); + block_buffer.push_front(BackfillBlockData { + l2_blocks: l2_block_vec.clone(), + l1_header, + }); } if let Some(first_l2_block) = l2_block_vec.first() { @@ -234,14 +261,17 @@ impl IndexerCore { backfill_start = curr_last_l1_lib_header; } - Ok((block_buffer, backfill_limit)) + Ok(BackfillData { + block_data: block_buffer, + curr_fin_l1_lib_header: backfill_limit, + }) } pub async fn backfill_to_last_l1_lib_header_id( &self, last_fin_l1_lib_header: HeaderId, channel_id: &ChannelId, - ) -> Result<(VecDeque<(Vec, HeaderId)>, HeaderId)> { + ) -> Result { let curr_fin_l1_lib_header = self.get_next_lib(last_fin_l1_lib_header).await?; // ToDo: Not scalable, buffer should be stored in DB to not run out of memory // Don't want to complicate DB even more right now. @@ -271,11 +301,17 @@ impl IndexerCore { info!("Parsed {} L2 blocks", l2_block_vec.len()); if !l2_block_vec.is_empty() { - block_buffer.push_front((l2_block_vec, l1_header)); + block_buffer.push_front(BackfillBlockData { + l2_blocks: l2_block_vec, + l1_header, + }); } } - Ok((block_buffer, curr_fin_l1_lib_header)) + Ok(BackfillData { + block_data: block_buffer, + curr_fin_l1_lib_header, + }) } } From 548e4c8d9a3c30bb08c18bb7fe88ddeab5aae668 Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Tue, 24 Feb 2026 12:27:47 +0200 Subject: [PATCH 09/10] fix: node db not empty fix --- .github/workflows/ci.yml | 30 +++++++++++++++++++++++++++++- indexer/core/src/lib.rs | 26 +++++++++++++------------- 2 files changed, 42 insertions(+), 14 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ce9903b5..eb4cc791 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -154,7 +154,35 @@ jobs: env: RISC0_DEV_MODE: "1" RUST_LOG: "info" - run: cargo nextest run -p integration_tests -- --skip tps_test + run: cargo nextest run -p integration_tests -- --skip tps_test --skip indexer + + integration-tests-indexer: + runs-on: ubuntu-latest + timeout-minutes: 60 + steps: + - uses: actions/checkout@v5 + with: + ref: ${{ github.head_ref }} + + - uses: ./.github/actions/install-system-deps + + - uses: ./.github/actions/install-risc0 + + - uses: ./.github/actions/install-logos-blockchain-circuits + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + + - name: Install active toolchain + run: rustup install + + - name: Install nextest + run: cargo install --locked cargo-nextest + + - name: Run tests + env: + RISC0_DEV_MODE: "1" + RUST_LOG: "info" + run: cargo nextest run -p integration_tests indexer -- --skip tps_test valid-proof-test: runs-on: ubuntu-latest diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index 46a153e3..1e2986a3 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -236,21 +236,21 @@ impl IndexerCore { }); } - if let Some(first_l2_block) = l2_block_vec.first() { - if first_l2_block.header.block_id == 1 { - info!("INITIAL_SEARCH: Found channel start"); - break 'outer; - } - } else { - // Step back to parent - let parent = cycle_block.header().parent(); + if let Some(first_l2_block) = l2_block_vec.first() + && first_l2_block.header.block_id == 1 + { + info!("INITIAL_SEARCH: Found channel start"); + break 'outer; + } - if parent == backfill_limit { - break; - } + // Step back to parent + let parent = cycle_block.header().parent(); - cycle_header = parent; - }; + if parent == backfill_limit { + break; + } + + cycle_header = parent; } info!("INITIAL_SEARCH: Reached backfill limit, refetching last l1 lib header"); From c8bd5b3679bdc3c6fa38febe440f6a4d075d7018 Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Tue, 24 Feb 2026 18:52:32 +0200 Subject: [PATCH 10/10] fix: cleanup instructions fix --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 4f9fa7ca..b94a68d4 100644 --- a/README.md +++ b/README.md @@ -157,7 +157,7 @@ The sequencer and logos blockchain node can be run locally: ### Notes on cleanup After stopping services above you need to remove 3 folders to start cleanly: - 1. In the `logos-blockchain/logos-blockchain` folder `db` (not needed in case of docker setup) + 1. In the `logos-blockchain/logos-blockchain` folder `state` (not needed in case of docker setup) 2. In the `lssa` folder `sequencer_runner/rocksdb` 3. In the `lssa` file `sequencer_runner/bedrock_signing_key` 4. In the `lssa` folder `indexer/service/rocksdb`