feat: tip polling

This commit is contained in:
Pravdyvy 2026-02-17 10:46:51 +02:00
parent 99a4e8482c
commit ae00d62b98
7 changed files with 91 additions and 44 deletions

1
Cargo.lock generated
View File

@ -1178,6 +1178,7 @@ dependencies = [
"futures",
"log",
"logos-blockchain-chain-broadcast-service",
"logos-blockchain-chain-service",
"logos-blockchain-common-http-client",
"logos-blockchain-core",
"reqwest",

View File

@ -113,6 +113,7 @@ logos-blockchain-common-http-client = { git = "https://github.com/logos-blockcha
logos-blockchain-key-management-system-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
logos-blockchain-core = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
logos-blockchain-chain-broadcast-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
logos-blockchain-chain-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
rocksdb = { version = "0.24.0", default-features = false, features = [
"snappy",

View File

@ -16,3 +16,4 @@ serde.workspace = true
logos-blockchain-common-http-client.workspace = true
logos-blockchain-core.workspace = true
logos-blockchain-chain-broadcast-service.workspace = true
logos-blockchain-chain-service.workspace = true

View File

@ -5,6 +5,7 @@ use common::config::BasicAuth;
use futures::{Stream, TryFutureExt};
use log::{info, warn};
pub use logos_blockchain_chain_broadcast_service::BlockInfo;
use logos_blockchain_chain_service::CryptarchiaInfo;
pub use logos_blockchain_common_http_client::{CommonHttpClient, Error};
pub use logos_blockchain_core::{block::Block, header::HeaderId, mantle::SignedMantleTx};
use reqwest::{Client, Url};
@ -82,6 +83,17 @@ impl BedrockClient {
.await
}
pub async fn get_consensus_info(
&self,
) -> Result<CryptarchiaInfo, Error> {
Retry::spawn(self.backoff_strategy(), || {
self.http_client
.consensus_info(self.node_url.clone())
.inspect_err(|err| warn!("Block fetching failed with error: {err:#}"))
})
.await
}
fn backoff_strategy(&self) -> impl Iterator<Item = Duration> {
tokio_retry::strategy::FibonacciBackoff::from_millis(self.backoff.start_delay_millis)
.take(self.backoff.max_retries)

View File

@ -5,11 +5,10 @@ use bedrock_client::{BedrockClient, HeaderId};
use common::block::{Block, HashableBlockData};
// ToDo: Remove after testnet
use common::{HashType, PINATA_BASE58};
use futures::StreamExt;
use log::info;
use logos_blockchain_core::mantle::{
Op, SignedMantleTx,
ops::channel::{ChannelId, MsgId, inscribe::InscriptionOp},
ops::channel::{ChannelId, inscribe::InscriptionOp},
};
use crate::{block_store::IndexerStore, config::IndexerConfig};
@ -84,6 +83,7 @@ impl IndexerCore {
let mut last_fin_header = match last_l1_header {
Some(last_l1_header) => {
info!("Last fin header found: {last_l1_header}");
last_l1_header
},
None => {
@ -94,7 +94,7 @@ impl IndexerCore {
let last_l1_header = start_buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty"))?.header().id();
for l1_block in start_buff {
info!("Observed L1 block at height {}", l1_block.header().slot().into_inner());
info!("Observed L1 block at slot {}", l1_block.header().slot().into_inner());
let curr_l1_header = l1_block.header().id();
@ -116,13 +116,17 @@ impl IndexerCore {
},
};
loop {
let buff = self.rollback_to_last_known_finalized_l1_id(last_fin_header).await?;
info!("INITIAL_SEARCH_END");
last_fin_header = buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty"))?.header().id();
loop {
let buff = self.rollback_to_last_known_finalized_l1_id(last_fin_header).await
.inspect_err(|err| log::error!("Failed to roll back to last finalized l1 id with err {err:#?}"))?;
last_fin_header = buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty"))
.inspect_err(|err| log::error!("Chain is empty {err:#?}"))?.header().id();
for l1_block in buff {
info!("Observed L1 block at height {}", l1_block.header().slot().into_inner());
info!("Observed L1 block at slot {}", l1_block.header().slot().into_inner());
let curr_l1_header = l1_block.header().id();
@ -143,13 +147,29 @@ impl IndexerCore {
}
}
async fn wait_last_finalized_block_header(&self) -> Result<HeaderId> {
let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?);
stream_pinned
.next()
.await
.ok_or(anyhow::anyhow!("Stream failure"))
.map(|info| info.header_id)
// async fn wait_last_finalized_block_header(&self) -> Result<HeaderId> {
// let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?);
// stream_pinned
// .next()
// .await
// .ok_or(anyhow::anyhow!("Stream failure"))
// .map(|info| info.header_id)
// }
async fn get_tip(&self) -> Result<HeaderId> {
Ok(self.bedrock_client.get_consensus_info().await?.tip)
}
async fn get_next_tip(&self, prev_tip: HeaderId) -> Result<HeaderId> {
loop {
let next_tip = self.get_tip().await?;
if next_tip != prev_tip {
break Ok(next_tip);
} else {
info!("Wait 10s to not spam the node");
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
}
}
/// WARNING: depending on chain behaviour,
@ -157,7 +177,8 @@ impl IndexerCore {
pub async fn search_for_channel_start(
&self,
) -> Result<VecDeque<bedrock_client::Block<SignedMantleTx>>> {
let mut curr_last_header = self.wait_last_finalized_block_header().await?;
// let mut curr_last_header = self.wait_last_finalized_block_header().await?;
let mut curr_last_header = self.get_tip().await?;
// Storing start for future use
let mut rollback_start = curr_last_header;
// ToDo: How to get root?
@ -167,31 +188,20 @@ impl IndexerCore {
let mut block_buffer = VecDeque::new();
'outer: loop {
let mut cycle_header = curr_last_header;
loop {
// let res = self
// .bedrock_client
// .get_block_by_id(curr_last_header)
// .await?;
// let curr_last_block;
// match res {
// Some(block) => {curr_last_block = block},
// None => {
// break;
// }
// }
let Some(curr_last_block) = self
let curr_last_block = if let Some(block) = self
.bedrock_client
.get_block_by_id(curr_last_header)
.get_block_by_id(cycle_header)
.await?
else {
log::error!("Failed to get block for header {curr_last_header}");
return Err(anyhow::anyhow!("Chain inconsistency"));
{
block
} else {
break;
};
info!("INITIAL_SEARCH: Observed L1 block at height {}", curr_last_block.header().slot().into_inner());
info!("INITIAL_SEARCH: Observed L1 block at slot {}", curr_last_block.header().slot().into_inner());
info!("INITIAL_SEARCH: This block header is {}", curr_last_block.header().id());
info!("INITIAL_SEARCH: This block parent is {}", curr_last_block.header().parent());
@ -200,11 +210,23 @@ impl IndexerCore {
if let Some(_) = curr_last_block.transactions().find_map(|tx| {
tx.mantle_tx.ops.iter().find_map(|op| match op {
Op::ChannelInscribe(InscriptionOp {
channel_id, parent, ..
channel_id, inscription, ..
}) => {
if (channel_id == &self.config.channel_id) && (parent == &MsgId::root())
if channel_id == &self.config.channel_id
{
Some(curr_last_block.header().id())
let l2_block = borsh::from_slice::<Block>(&inscription).
inspect_err(|err| log::error!("INITIAL_SEARCH: failed to deserialize with err: {err:#?}")).ok();
match l2_block {
Some(block) => {
info!("!!!!!!!!!!!!!!!INITIAL_SEARCH: Observed L2 block at id {}", block.header.block_id);
if block.header.block_id == 1 {
Some(curr_last_block.header().id())
} else {
None
}
},
None => None,
}
} else {
None
}
@ -222,7 +244,7 @@ impl IndexerCore {
break;
}
curr_last_header = parent;
cycle_header = parent;
};
}
@ -230,7 +252,17 @@ impl IndexerCore {
block_buffer.clear();
rollback_limit = rollback_start;
curr_last_header = self.wait_last_finalized_block_header().await?;
curr_last_header = loop {
let next_tip = self.get_tip().await?;
if next_tip != curr_last_header {
break next_tip;
} else {
info!("Wait 10s to not spam the node");
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
};
rollback_start = curr_last_header;
}
@ -241,7 +273,7 @@ impl IndexerCore {
&self,
last_fin_header: HeaderId,
) -> Result<VecDeque<bedrock_client::Block<SignedMantleTx>>> {
let mut curr_last_header = self.wait_last_finalized_block_header().await?;
let mut curr_last_header = self.get_next_tip(last_fin_header).await?;
// ToDo: Not scalable, buffer should be stored in DB to not run out of memory
// Don't want to complicate DB even more right now.
let mut block_buffer = VecDeque::new();

View File

@ -74,7 +74,7 @@ pub fn sequencer_config(
max_num_tx_in_block,
mempool_max_size,
block_create_timeout_millis,
retry_pending_blocks_timeout_millis: 240_000,
retry_pending_blocks_timeout_millis: 120_000,
port: 0,
initial_accounts: initial_data.sequencer_initial_accounts(),
initial_commitments: initial_data.sequencer_initial_commitments(),

View File

@ -12,10 +12,10 @@ use tokio::test;
use wallet::cli::{Command, programs::native_token_transfer::AuthTransferSubcommand};
/// Timeout in milliseconds to reliably await for block finalization
const L2_TO_L1_TIMEOUT_MILLIS: u64 = 300000;
const L2_TO_L1_TIMEOUT_MILLIS: u64 = 500000;
#[test]
#[ignore = "Not reliable with current bedrock node"]
//#[ignore = "Not reliable with current bedrock node"]
async fn indexer_test_run() -> Result<()> {
let ctx = TestContext::new().await?;