diff --git a/Cargo.lock b/Cargo.lock index 8dddcb92..678184e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1178,6 +1178,7 @@ dependencies = [ "futures", "log", "logos-blockchain-chain-broadcast-service", + "logos-blockchain-chain-service", "logos-blockchain-common-http-client", "logos-blockchain-core", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index 5f2a7033..520844b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -113,6 +113,7 @@ logos-blockchain-common-http-client = { git = "https://github.com/logos-blockcha logos-blockchain-key-management-system-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } logos-blockchain-core = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } logos-blockchain-chain-broadcast-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } +logos-blockchain-chain-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } rocksdb = { version = "0.24.0", default-features = false, features = [ "snappy", diff --git a/bedrock_client/Cargo.toml b/bedrock_client/Cargo.toml index bd275d8b..6f8b8a74 100644 --- a/bedrock_client/Cargo.toml +++ b/bedrock_client/Cargo.toml @@ -16,3 +16,4 @@ serde.workspace = true logos-blockchain-common-http-client.workspace = true logos-blockchain-core.workspace = true logos-blockchain-chain-broadcast-service.workspace = true +logos-blockchain-chain-service.workspace = true diff --git a/bedrock_client/src/lib.rs b/bedrock_client/src/lib.rs index 7655a31c..28d62aca 100644 --- a/bedrock_client/src/lib.rs +++ b/bedrock_client/src/lib.rs @@ -5,6 +5,7 @@ use common::config::BasicAuth; use futures::{Stream, TryFutureExt}; use log::{info, warn}; pub use logos_blockchain_chain_broadcast_service::BlockInfo; +use logos_blockchain_chain_service::CryptarchiaInfo; pub use logos_blockchain_common_http_client::{CommonHttpClient, Error}; pub use logos_blockchain_core::{block::Block, header::HeaderId, mantle::SignedMantleTx}; use reqwest::{Client, Url}; @@ -82,6 +83,17 @@ impl BedrockClient { .await } + pub async fn get_consensus_info( + &self, + ) -> Result { + Retry::spawn(self.backoff_strategy(), || { + self.http_client + .consensus_info(self.node_url.clone()) + .inspect_err(|err| warn!("Block fetching failed with error: {err:#}")) + }) + .await + } + fn backoff_strategy(&self) -> impl Iterator { tokio_retry::strategy::FibonacciBackoff::from_millis(self.backoff.start_delay_millis) .take(self.backoff.max_retries) diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index 0e44efce..fd05f372 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -5,11 +5,10 @@ use bedrock_client::{BedrockClient, HeaderId}; use common::block::{Block, HashableBlockData}; // ToDo: Remove after testnet use common::{HashType, PINATA_BASE58}; -use futures::StreamExt; use log::info; use logos_blockchain_core::mantle::{ Op, SignedMantleTx, - ops::channel::{ChannelId, MsgId, inscribe::InscriptionOp}, + ops::channel::{ChannelId, inscribe::InscriptionOp}, }; use crate::{block_store::IndexerStore, config::IndexerConfig}; @@ -84,6 +83,7 @@ impl IndexerCore { let mut last_fin_header = match last_l1_header { Some(last_l1_header) => { + info!("Last fin header found: {last_l1_header}"); last_l1_header }, None => { @@ -94,7 +94,7 @@ impl IndexerCore { let last_l1_header = start_buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty"))?.header().id(); for l1_block in start_buff { - info!("Observed L1 block at height {}", l1_block.header().slot().into_inner()); + info!("Observed L1 block at slot {}", l1_block.header().slot().into_inner()); let curr_l1_header = l1_block.header().id(); @@ -116,13 +116,17 @@ impl IndexerCore { }, }; - loop { - let buff = self.rollback_to_last_known_finalized_l1_id(last_fin_header).await?; + info!("INITIAL_SEARCH_END"); - last_fin_header = buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty"))?.header().id(); + loop { + let buff = self.rollback_to_last_known_finalized_l1_id(last_fin_header).await + .inspect_err(|err| log::error!("Failed to roll back to last finalized l1 id with err {err:#?}"))?; + + last_fin_header = buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty")) + .inspect_err(|err| log::error!("Chain is empty {err:#?}"))?.header().id(); for l1_block in buff { - info!("Observed L1 block at height {}", l1_block.header().slot().into_inner()); + info!("Observed L1 block at slot {}", l1_block.header().slot().into_inner()); let curr_l1_header = l1_block.header().id(); @@ -143,13 +147,29 @@ impl IndexerCore { } } - async fn wait_last_finalized_block_header(&self) -> Result { - let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?); - stream_pinned - .next() - .await - .ok_or(anyhow::anyhow!("Stream failure")) - .map(|info| info.header_id) + // async fn wait_last_finalized_block_header(&self) -> Result { + // let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?); + // stream_pinned + // .next() + // .await + // .ok_or(anyhow::anyhow!("Stream failure")) + // .map(|info| info.header_id) + // } + + async fn get_tip(&self) -> Result { + Ok(self.bedrock_client.get_consensus_info().await?.tip) + } + + async fn get_next_tip(&self, prev_tip: HeaderId) -> Result { + loop { + let next_tip = self.get_tip().await?; + if next_tip != prev_tip { + break Ok(next_tip); + } else { + info!("Wait 10s to not spam the node"); + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + } + } } /// WARNING: depending on chain behaviour, @@ -157,7 +177,8 @@ impl IndexerCore { pub async fn search_for_channel_start( &self, ) -> Result>> { - let mut curr_last_header = self.wait_last_finalized_block_header().await?; + // let mut curr_last_header = self.wait_last_finalized_block_header().await?; + let mut curr_last_header = self.get_tip().await?; // Storing start for future use let mut rollback_start = curr_last_header; // ToDo: How to get root? @@ -167,31 +188,20 @@ impl IndexerCore { let mut block_buffer = VecDeque::new(); 'outer: loop { + let mut cycle_header = curr_last_header; + loop { - // let res = self - // .bedrock_client - // .get_block_by_id(curr_last_header) - // .await?; - - // let curr_last_block; - - // match res { - // Some(block) => {curr_last_block = block}, - // None => { - // break; - // } - // } - - let Some(curr_last_block) = self + let curr_last_block = if let Some(block) = self .bedrock_client - .get_block_by_id(curr_last_header) + .get_block_by_id(cycle_header) .await? - else { - log::error!("Failed to get block for header {curr_last_header}"); - return Err(anyhow::anyhow!("Chain inconsistency")); + { + block + } else { + break; }; - info!("INITIAL_SEARCH: Observed L1 block at height {}", curr_last_block.header().slot().into_inner()); + info!("INITIAL_SEARCH: Observed L1 block at slot {}", curr_last_block.header().slot().into_inner()); info!("INITIAL_SEARCH: This block header is {}", curr_last_block.header().id()); info!("INITIAL_SEARCH: This block parent is {}", curr_last_block.header().parent()); @@ -200,11 +210,23 @@ impl IndexerCore { if let Some(_) = curr_last_block.transactions().find_map(|tx| { tx.mantle_tx.ops.iter().find_map(|op| match op { Op::ChannelInscribe(InscriptionOp { - channel_id, parent, .. + channel_id, inscription, .. }) => { - if (channel_id == &self.config.channel_id) && (parent == &MsgId::root()) + if channel_id == &self.config.channel_id { - Some(curr_last_block.header().id()) + let l2_block = borsh::from_slice::(&inscription). + inspect_err(|err| log::error!("INITIAL_SEARCH: failed to deserialize with err: {err:#?}")).ok(); + match l2_block { + Some(block) => { + info!("!!!!!!!!!!!!!!!INITIAL_SEARCH: Observed L2 block at id {}", block.header.block_id); + if block.header.block_id == 1 { + Some(curr_last_block.header().id()) + } else { + None + } + }, + None => None, + } } else { None } @@ -222,7 +244,7 @@ impl IndexerCore { break; } - curr_last_header = parent; + cycle_header = parent; }; } @@ -230,7 +252,17 @@ impl IndexerCore { block_buffer.clear(); rollback_limit = rollback_start; - curr_last_header = self.wait_last_finalized_block_header().await?; + + curr_last_header = loop { + let next_tip = self.get_tip().await?; + if next_tip != curr_last_header { + break next_tip; + } else { + info!("Wait 10s to not spam the node"); + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + } + }; + rollback_start = curr_last_header; } @@ -241,7 +273,7 @@ impl IndexerCore { &self, last_fin_header: HeaderId, ) -> Result>> { - let mut curr_last_header = self.wait_last_finalized_block_header().await?; + let mut curr_last_header = self.get_next_tip(last_fin_header).await?; // ToDo: Not scalable, buffer should be stored in DB to not run out of memory // Don't want to complicate DB even more right now. let mut block_buffer = VecDeque::new(); diff --git a/integration_tests/src/config.rs b/integration_tests/src/config.rs index 62e34860..dd888e2c 100644 --- a/integration_tests/src/config.rs +++ b/integration_tests/src/config.rs @@ -74,7 +74,7 @@ pub fn sequencer_config( max_num_tx_in_block, mempool_max_size, block_create_timeout_millis, - retry_pending_blocks_timeout_millis: 240_000, + retry_pending_blocks_timeout_millis: 120_000, port: 0, initial_accounts: initial_data.sequencer_initial_accounts(), initial_commitments: initial_data.sequencer_initial_commitments(), diff --git a/integration_tests/tests/indexer.rs b/integration_tests/tests/indexer.rs index d0bb80cf..fced7884 100644 --- a/integration_tests/tests/indexer.rs +++ b/integration_tests/tests/indexer.rs @@ -12,10 +12,10 @@ use tokio::test; use wallet::cli::{Command, programs::native_token_transfer::AuthTransferSubcommand}; /// Timeout in milliseconds to reliably await for block finalization -const L2_TO_L1_TIMEOUT_MILLIS: u64 = 300000; +const L2_TO_L1_TIMEOUT_MILLIS: u64 = 500000; #[test] -#[ignore = "Not reliable with current bedrock node"] +//#[ignore = "Not reliable with current bedrock node"] async fn indexer_test_run() -> Result<()> { let ctx = TestContext::new().await?;