From feb66e6a18c0c188cf90cc89e84207aba0f38131 Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Wed, 18 Feb 2026 14:58:33 +0200 Subject: [PATCH] fix: tests unignored --- indexer/core/src/block_store.rs | 6 +- indexer/core/src/config.rs | 6 +- indexer/core/src/lib.rs | 294 ++++++++++++++--------------- integration_tests/src/config.rs | 2 +- integration_tests/tests/indexer.rs | 5 +- storage/src/indexer.rs | 40 ++-- 6 files changed, 167 insertions(+), 186 deletions(-) diff --git a/indexer/core/src/block_store.rs b/indexer/core/src/block_store.rs index f4b720c6..307a3282 100644 --- a/indexer/core/src/block_store.rs +++ b/indexer/core/src/block_store.rs @@ -35,11 +35,11 @@ impl IndexerStore { Self::open_db_with_genesis(location, None) } - pub fn last_observed_l1_header(&self) -> Result> { + pub fn last_observed_l1_lib_header(&self) -> Result> { Ok(self .dbio - .get_meta_last_observed_l1_block_in_db()? - .map(|raw_id| HeaderId::from(raw_id))) + .get_meta_last_observed_l1_lib_header_in_db()? + .map(HeaderId::from)) } pub fn get_last_block_id(&self) -> Result { diff --git a/indexer/core/src/config.rs b/indexer/core/src/config.rs index 968678e5..e3cd4d04 100644 --- a/indexer/core/src/config.rs +++ b/indexer/core/src/config.rs @@ -32,12 +32,8 @@ pub struct IndexerConfig { /// List of initial commitments pub initial_commitments: Vec, /// Sequencers signing key - /// - /// ToDo: Remove it after introducing bedrock block parsing. - /// Currently can not be removed, because indexer must start - /// chain BEFORE sequencer. pub signing_key: [u8; 32], - pub resubscribe_interval_millis: u64, + pub consensus_info_polling_interval_millis: u64, pub bedrock_client_config: ClientConfig, pub channel_id: ChannelId, } diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index 9ac20c2c..7832014b 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -5,7 +5,7 @@ use bedrock_client::{BedrockClient, HeaderId}; use common::block::{Block, HashableBlockData}; // ToDo: Remove after testnet use common::{HashType, PINATA_BASE58}; -use log::info; +use log::{debug, error, info}; use logos_blockchain_core::mantle::{ Op, SignedMantleTx, ops::channel::{ChannelId, inscribe::InscriptionOp}, @@ -26,7 +26,6 @@ pub struct IndexerCore { impl IndexerCore { pub fn new(config: IndexerConfig) -> Result { - // ToDo: replace with correct startup let hashable_data = HashableBlockData { block_id: 1, transactions: vec![], @@ -34,10 +33,20 @@ impl IndexerCore { timestamp: 0, }; + // Genesis creation is fine as it is, + // because it will be overwritten by sequencer. + // Therefore: + // ToDo: remove key from indexer config, use some default. let signing_key = nssa::PrivateKey::try_new(config.signing_key).unwrap(); let channel_genesis_msg_id = [0; 32]; let start_block = hashable_data.into_pending_block(&signing_key, channel_genesis_msg_id); + // This is a troubling moment, because changes in key protocol can + // affect this. And indexer can not reliably ask this data from sequencer + // because indexer must be independent from it. + // ToDo: move initial state generation into common and use the same method + // for indexer and sequencer. This way both services buit at same version + // could be in sync. let initial_commitments: Vec = config .initial_commitments .iter() @@ -72,75 +81,54 @@ impl IndexerCore { config.bedrock_client_config.auth.clone(), )?, config, - // ToDo: Implement restarts store: IndexerStore::open_db_with_genesis(&home, Some((start_block, state)))?, }) } pub async fn subscribe_parse_block_stream(&self) -> impl futures::Stream> { async_stream::stream! { - let last_l1_header = self.store.last_observed_l1_header()?; + info!("Searching for initial header"); - let mut last_fin_header = match last_l1_header { - Some(last_l1_header) => { - info!("Last fin header found: {last_l1_header}"); - last_l1_header + let last_l1_lib_header = self.store.last_observed_l1_lib_header()?; + + let mut prev_last_l1_lib_header = match last_l1_lib_header { + Some(last_l1_lib_header) => { + info!("Last l1 lib header found: {last_l1_lib_header}"); + last_l1_lib_header }, None => { + info!("Last l1 lib header not found in DB"); info!("Searching for the start of a channel"); - let start_buff = self.search_for_channel_start().await?; + let (start_buff, last_l1_lib_header) = self.search_for_channel_start().await?; - let last_l1_header = start_buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty"))?.header().id(); - - for l1_block in start_buff { - info!("Observed L1 block at slot {}", l1_block.header().slot().into_inner()); - - let curr_l1_header = l1_block.header().id(); - - let l2_blocks_parsed = parse_blocks( - l1_block.into_transactions().into_iter(), - &self.config.channel_id, - ).collect::>(); - - info!("Parsed {} L2 blocks", l2_blocks_parsed.len()); - - for l2_block in l2_blocks_parsed { - self.store.put_block(l2_block.clone(), curr_l1_header)?; + for (l2_block_vec, l1_header) in start_buff { + for l2_block in l2_block_vec { + self.store.put_block(l2_block.clone(), l1_header)?; yield Ok(l2_block); } } - last_l1_header + last_l1_lib_header }, }; - info!("INITIAL_SEARCH_END"); + info!("Searching for initial header finished"); + + info!("Starting backfilling from {prev_last_l1_lib_header}"); loop { - let buff = self.rollback_to_last_known_finalized_l1_id(last_fin_header).await - .inspect_err(|err| log::error!("Failed to roll back to last finalized l1 id with err {err:#?}"))?; + let (buff, curr_last_l1_lib_header) = self + .backfill_to_last_l1_lib_header_id(prev_last_l1_lib_header, &self.config.channel_id) + .await + .inspect_err(|err| error!("Failed to backfill to last l1 lib header id with err {err:#?}"))?; - last_fin_header = buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty")) - .inspect_err(|err| log::error!("Chain is empty {err:#?}"))?.header().id(); + prev_last_l1_lib_header = curr_last_l1_lib_header; - for l1_block in buff { - info!("Observed L1 block at slot {}", l1_block.header().slot().into_inner()); - - let curr_l1_header = l1_block.header().id(); - - let l2_blocks_parsed = parse_blocks( - l1_block.into_transactions().into_iter(), - &self.config.channel_id, - ).collect::>(); - - let mut l2_blocks_parsed_ids: Vec<_> = l2_blocks_parsed.iter().map(|block| block.header.block_id).collect(); - l2_blocks_parsed_ids.sort(); - info!("Parsed {} L2 blocks with ids {:?}", l2_blocks_parsed.len(), l2_blocks_parsed_ids); - - for l2_block in l2_blocks_parsed { - self.store.put_block(l2_block.clone(), curr_l1_header)?; + for (l2_block_vec, header) in buff { + for l2_block in l2_block_vec { + self.store.put_block(l2_block.clone(), header)?; yield Ok(l2_block); } @@ -149,106 +137,88 @@ impl IndexerCore { } } - // async fn wait_last_finalized_block_header(&self) -> Result { - // let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?); - // stream_pinned - // .next() - // .await - // .ok_or(anyhow::anyhow!("Stream failure")) - // .map(|info| info.header_id) - // } - - async fn get_tip(&self) -> Result { - Ok(self.bedrock_client.get_consensus_info().await?.tip) + async fn get_lib(&self) -> Result { + Ok(self.bedrock_client.get_consensus_info().await?.lib) } - async fn get_next_tip(&self, prev_tip: HeaderId) -> Result { + async fn get_next_lib(&self, prev_lib: HeaderId) -> Result { loop { - let next_tip = self.get_tip().await?; - if next_tip != prev_tip { - break Ok(next_tip); + let next_lib = self.get_lib().await?; + if next_lib != prev_lib { + break Ok(next_lib); } else { - info!("Wait 10s to not spam the node"); - tokio::time::sleep(std::time::Duration::from_secs(10)).await; + info!( + "Wait {}ms to not spam the node", + self.config.consensus_info_polling_interval_millis + ); + tokio::time::sleep(std::time::Duration::from_millis( + self.config.consensus_info_polling_interval_millis, + )) + .await; } } } - /// WARNING: depending on chain behaviour, + /// WARNING: depending on channel state, /// may take indefinite amount of time pub async fn search_for_channel_start( &self, - ) -> Result>> { - // let mut curr_last_header = self.wait_last_finalized_block_header().await?; - let mut curr_last_header = self.get_tip().await?; - // Storing start for future use - let mut rollback_start = curr_last_header; + ) -> Result<(VecDeque<(Vec, HeaderId)>, HeaderId)> { + let mut curr_last_l1_lib_header = self.get_lib().await?; + let mut backfill_start = curr_last_l1_lib_header; // ToDo: How to get root? - let mut rollback_limit = HeaderId::from([0; 32]); + let mut backfill_limit = HeaderId::from([0; 32]); // ToDo: Not scalable, initial buffer should be stored in DB to not run out of memory // Don't want to complicate DB even more right now. let mut block_buffer = VecDeque::new(); 'outer: loop { - let mut cycle_header = curr_last_header; + let mut cycle_header = curr_last_l1_lib_header; loop { - let curr_last_block = + let cycle_block = if let Some(block) = self.bedrock_client.get_block_by_id(cycle_header).await? { block } else { + // First run can reach root easily + // so here we are optimistic about L1 + // failing to get parent. break; }; + // It would be better to have id, but block does not have it, so slot will do. info!( - "INITIAL_SEARCH: Observed L1 block at slot {}", - curr_last_block.header().slot().into_inner() + "INITIAL SEARCH: Observed L1 block at slot {}", + cycle_block.header().slot().into_inner() ); - info!( - "INITIAL_SEARCH: This block header is {}", - curr_last_block.header().id() + debug!( + "INITIAL SEARCH: This block header is {}", + cycle_block.header().id() ); - info!( - "INITIAL_SEARCH: This block parent is {}", - curr_last_block.header().parent() + debug!( + "INITIAL SEARCH: This block parent is {}", + cycle_block.header().parent() ); - block_buffer.push_front(curr_last_block.clone()); + let (l2_block_vec, l1_header) = + parse_block_owned(&cycle_block, &self.config.channel_id); - if let Some(_) = curr_last_block.transactions().find_map(|tx| { - tx.mantle_tx.ops.iter().find_map(|op| match op { - Op::ChannelInscribe(InscriptionOp { - channel_id, inscription, .. - }) => { - if channel_id == &self.config.channel_id - { - let l2_block = borsh::from_slice::(&inscription). - inspect_err(|err| log::error!("INITIAL_SEARCH: failed to deserialize with err: {err:#?}")).ok(); - match l2_block { - Some(block) => { - info!("!!!!!!!!!!!!!!!INITIAL_SEARCH: Observed L2 block at id {}", block.header.block_id); - if block.header.block_id == 1 { - Some(curr_last_block.header().id()) - } else { - None - } - }, - None => None, - } - } else { - None - } - } - _ => None, - }) - }) { - info!("INITIAL_SEARCH: Found channel start"); - break 'outer; + info!("Parsed {} L2 blocks", l2_block_vec.len()); + + if !l2_block_vec.is_empty() { + block_buffer.push_front((l2_block_vec.clone(), l1_header)); + } + + if let Some(first_l2_block) = l2_block_vec.first() { + if first_l2_block.header.block_id == 1 { + info!("INITIAL_SEARCH: Found channel start"); + break 'outer; + } } else { // Step back to parent - let parent = curr_last_block.header().parent(); + let parent = cycle_block.header().parent(); - if parent == rollback_limit { + if parent == backfill_limit { break; } @@ -256,73 +226,83 @@ impl IndexerCore { }; } - info!("INITIAL_SEARCH: Reached rollback limit, refetching last block"); + info!("INITIAL_SEARCH: Reached backfill limit, refetching last l1 lib header"); block_buffer.clear(); - rollback_limit = rollback_start; - - curr_last_header = loop { - let next_tip = self.get_tip().await?; - if next_tip != curr_last_header { - break next_tip; - } else { - info!("Wait 10s to not spam the node"); - tokio::time::sleep(std::time::Duration::from_secs(10)).await; - } - }; - - rollback_start = curr_last_header; + backfill_limit = backfill_start; + curr_last_l1_lib_header = self.get_next_lib(curr_last_l1_lib_header).await?; + backfill_start = curr_last_l1_lib_header; } - Ok(block_buffer) + Ok((block_buffer, backfill_limit)) } - pub async fn rollback_to_last_known_finalized_l1_id( + pub async fn backfill_to_last_l1_lib_header_id( &self, - last_fin_header: HeaderId, - ) -> Result>> { - let mut curr_last_header = self.get_next_tip(last_fin_header).await?; + last_fin_l1_lib_header: HeaderId, + channel_id: &ChannelId, + ) -> Result<(VecDeque<(Vec, HeaderId)>, HeaderId)> { + let curr_fin_l1_lib_header = self.get_next_lib(last_fin_l1_lib_header).await?; // ToDo: Not scalable, buffer should be stored in DB to not run out of memory // Don't want to complicate DB even more right now. let mut block_buffer = VecDeque::new(); + let mut cycle_header = curr_fin_l1_lib_header; loop { - let Some(curr_last_block) = self - .bedrock_client - .get_block_by_id(curr_last_header) - .await? - else { - return Err(anyhow::anyhow!("Chain inconsistency")); + let Some(cycle_block) = self.bedrock_client.get_block_by_id(cycle_header).await? else { + return Err(anyhow::anyhow!("Parent not found")); }; - if curr_last_block.header().id() == last_fin_header { + if cycle_block.header().id() == last_fin_l1_lib_header { break; } else { // Step back to parent - curr_last_header = curr_last_block.header().parent(); + cycle_header = cycle_block.header().parent(); } - block_buffer.push_front(curr_last_block.clone()); + // It would be better to have id, but block does not have it, so slot will do. + info!( + "Observed L1 block at slot {}", + cycle_block.header().slot().into_inner() + ); + + let (l2_block_vec, l1_header) = parse_block_owned(&cycle_block, channel_id); + + info!("Parsed {} L2 blocks", l2_block_vec.len()); + + if !l2_block_vec.is_empty() { + block_buffer.push_front((l2_block_vec, l1_header)); + } } - Ok(block_buffer) + Ok((block_buffer, curr_fin_l1_lib_header)) } } -fn parse_blocks( - block_txs: impl Iterator, +fn parse_block_owned( + l1_block: &bedrock_client::Block, decoded_channel_id: &ChannelId, -) -> impl Iterator { - block_txs.flat_map(|tx| { - tx.mantle_tx.ops.into_iter().filter_map(|op| match op { - Op::ChannelInscribe(InscriptionOp { - channel_id, - inscription, - .. - }) if channel_id == *decoded_channel_id => { - borsh::from_slice::(&inscription).ok() - } - _ => None, - }) - }) +) -> (Vec, HeaderId) { + ( + l1_block + .transactions() + .flat_map(|tx| { + tx.mantle_tx.ops.iter().filter_map(|op| match op { + Op::ChannelInscribe(InscriptionOp { + channel_id, + inscription, + .. + }) if channel_id == decoded_channel_id => { + borsh::from_slice::(inscription) + .inspect_err(|err| { + error!("Failed to deserialize our inscription with err: {err:#?}") + }) + .ok() + } + _ => None, + }) + }) + .collect(), + l1_block.header().id(), + ) } diff --git a/integration_tests/src/config.rs b/integration_tests/src/config.rs index dd888e2c..445929dd 100644 --- a/integration_tests/src/config.rs +++ b/integration_tests/src/config.rs @@ -19,7 +19,7 @@ pub fn indexer_config( ) -> Result { Ok(IndexerConfig { home, - resubscribe_interval_millis: 1000, + consensus_info_polling_interval_millis: 10000, bedrock_client_config: ClientConfig { addr: addr_to_url(UrlProtocol::Http, bedrock_addr) .context("Failed to convert bedrock addr to URL")?, diff --git a/integration_tests/tests/indexer.rs b/integration_tests/tests/indexer.rs index fced7884..d5207a41 100644 --- a/integration_tests/tests/indexer.rs +++ b/integration_tests/tests/indexer.rs @@ -12,10 +12,9 @@ use tokio::test; use wallet::cli::{Command, programs::native_token_transfer::AuthTransferSubcommand}; /// Timeout in milliseconds to reliably await for block finalization -const L2_TO_L1_TIMEOUT_MILLIS: u64 = 500000; +const L2_TO_L1_TIMEOUT_MILLIS: u64 = 600000; #[test] -//#[ignore = "Not reliable with current bedrock node"] async fn indexer_test_run() -> Result<()> { let ctx = TestContext::new().await?; @@ -45,7 +44,6 @@ async fn indexer_test_run() -> Result<()> { } #[test] -#[ignore = "Not reliable with current bedrock node"] async fn indexer_block_batching() -> Result<()> { let ctx = TestContext::new().await?; @@ -81,7 +79,6 @@ async fn indexer_block_batching() -> Result<()> { } #[test] -#[ignore = "Not reliable with current bedrock node"] async fn indexer_state_consistency() -> Result<()> { let mut ctx = TestContext::new().await?; diff --git a/storage/src/indexer.rs b/storage/src/indexer.rs index 5e467780..34285f34 100644 --- a/storage/src/indexer.rs +++ b/storage/src/indexer.rs @@ -27,8 +27,9 @@ pub const CACHE_SIZE: usize = 1000; pub const DB_META_FIRST_BLOCK_IN_DB_KEY: &str = "first_block_in_db"; /// Key base for storing metainformation about id of last current block in db pub const DB_META_LAST_BLOCK_IN_DB_KEY: &str = "last_block_in_db"; -/// Key base for storing metainformation about id of last observed L1 block in db -pub const DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY: &str = "last_observed_l1_block_in_db"; +/// Key base for storing metainformation about id of last observed L1 lib header in db +pub const DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY: &str = + "last_observed_l1_lib_header_in_db"; /// Key base for storing metainformation which describe if first block has been set pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set"; /// Key base for storing metainformation about the last breakpoint @@ -219,21 +220,23 @@ impl RocksDBIO { } } - pub fn get_meta_last_observed_l1_block_in_db(&self) -> DbResult> { + pub fn get_meta_last_observed_l1_lib_header_in_db(&self) -> DbResult> { let cf_meta = self.meta_column(); let res = self .db .get_cf( &cf_meta, - borsh::to_vec(&DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY).map_err(|err| { - DbError::borsh_cast_message( + borsh::to_vec(&DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY).map_err( + |err| { + DbError::borsh_cast_message( err, Some( - "Failed to serialize DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY" + "Failed to serialize DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY" .to_string(), ), ) - })?, + }, + )?, ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; @@ -241,7 +244,7 @@ impl RocksDBIO { borsh::from_slice::<[u8; 32]>(&data).map_err(|err| { DbError::borsh_cast_message( err, - Some("Failed to deserialize last l1 header".to_string()), + Some("Failed to deserialize last l1 lib header".to_string()), ) }) }) @@ -341,21 +344,26 @@ impl RocksDBIO { Ok(()) } - pub fn put_meta_last_observed_l1_block_in_db(&self, l1_block_header: [u8; 32]) -> DbResult<()> { + pub fn put_meta_last_observed_l1_lib_header_in_db( + &self, + l1_lib_header: [u8; 32], + ) -> DbResult<()> { let cf_meta = self.meta_column(); self.db .put_cf( &cf_meta, - borsh::to_vec(&DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY).map_err(|err| { - DbError::borsh_cast_message( + borsh::to_vec(&DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY).map_err( + |err| { + DbError::borsh_cast_message( err, Some( - "Failed to serialize DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY" + "Failed to serialize DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY" .to_string(), ), ) - })?, - borsh::to_vec(&l1_block_header).map_err(|err| { + }, + )?, + borsh::to_vec(&l1_lib_header).map_err(|err| { DbError::borsh_cast_message( err, Some("Failed to serialize last l1 block header".to_string()), @@ -407,7 +415,7 @@ impl RocksDBIO { // Block - pub fn put_block(&self, block: Block, l1_block_header: [u8; 32]) -> DbResult<()> { + pub fn put_block(&self, block: Block, l1_lib_header: [u8; 32]) -> DbResult<()> { let cf_block = self.block_column(); let cf_hti = self.hash_to_id_column(); let cf_tti: Arc> = self.tx_hash_to_id_column(); @@ -436,7 +444,7 @@ impl RocksDBIO { if block.header.block_id > last_curr_block { self.put_meta_last_block_in_db(block.header.block_id)?; - self.put_meta_last_observed_l1_block_in_db(l1_block_header)?; + self.put_meta_last_observed_l1_lib_header_in_db(l1_lib_header)?; } self.db