Merge pull request #331 from logos-blockchain/Pravdyvy/bedrock-parsing-from-start-of-a-channel

Indexer correct startup
This commit is contained in:
Sergio Chouhy 2026-02-26 10:09:30 -03:00 committed by GitHub
commit f8661b24e4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 422 additions and 112 deletions

View File

@ -154,7 +154,35 @@ jobs:
env:
RISC0_DEV_MODE: "1"
RUST_LOG: "info"
run: cargo nextest run -p integration_tests -- --skip tps_test
run: cargo nextest run -p integration_tests -- --skip tps_test --skip indexer
integration-tests-indexer:
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
- uses: actions/checkout@v5
with:
ref: ${{ github.head_ref }}
- uses: ./.github/actions/install-system-deps
- uses: ./.github/actions/install-risc0
- uses: ./.github/actions/install-logos-blockchain-circuits
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
- name: Install active toolchain
run: rustup install
- name: Install nextest
run: cargo install --locked cargo-nextest
- name: Run tests
env:
RISC0_DEV_MODE: "1"
RUST_LOG: "info"
run: cargo nextest run -p integration_tests indexer -- --skip tps_test
valid-proof-test:
runs-on: ubuntu-latest

69
Cargo.lock generated
View File

@ -1212,6 +1212,7 @@ dependencies = [
"futures",
"log",
"logos-blockchain-chain-broadcast-service",
"logos-blockchain-chain-service",
"logos-blockchain-common-http-client",
"logos-blockchain-core",
"reqwest",
@ -1448,9 +1449,9 @@ dependencies = [
[[package]]
name = "bumpalo"
version = "3.19.1"
version = "3.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510"
checksum = "5c6f81257d10a0f602a294ae4182251151ff97dbb504ef9afcdda4a64b24d9b4"
[[package]]
name = "bytemuck"
@ -4647,7 +4648,7 @@ checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
[[package]]
name = "logos-blockchain-blend-crypto"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"blake2",
"logos-blockchain-groth16",
@ -4661,7 +4662,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-blend-message"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"blake2",
"derivative",
@ -4683,7 +4684,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-blend-proofs"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"ed25519-dalek",
"generic-array 1.3.5",
@ -4700,7 +4701,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-chain-broadcast-service"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"async-trait",
"derivative",
@ -4716,7 +4717,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-chain-service"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"async-trait",
"bytes",
@ -4746,7 +4747,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-circuits-prover"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"logos-blockchain-circuits-utils",
"tempfile",
@ -4755,7 +4756,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-circuits-utils"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"dirs",
]
@ -4763,7 +4764,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-common-http-client"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"futures",
"hex",
@ -4783,7 +4784,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-core"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"ark-ff 0.4.2",
"bincode",
@ -4813,7 +4814,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-cryptarchia-engine"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"cfg_eval",
"logos-blockchain-pol",
@ -4829,7 +4830,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-cryptarchia-sync"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"bytes",
"futures",
@ -4846,7 +4847,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-groth16"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"ark-bn254 0.4.0",
"ark-ec 0.4.2",
@ -4864,7 +4865,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-http-api-common"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"axum 0.7.9",
"logos-blockchain-core",
@ -4878,7 +4879,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-key-management-system-keys"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"async-trait",
"bytes",
@ -4904,7 +4905,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-key-management-system-macros"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"proc-macro2",
"quote",
@ -4914,7 +4915,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-key-management-system-operators"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"async-trait",
"logos-blockchain-blend-proofs",
@ -4930,7 +4931,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-key-management-system-service"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"async-trait",
"log",
@ -4946,7 +4947,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-ledger"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"derivative",
"logos-blockchain-blend-crypto",
@ -4970,7 +4971,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-network-service"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"async-trait",
"futures",
@ -4986,7 +4987,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-poc"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"logos-blockchain-circuits-prover",
"logos-blockchain-circuits-utils",
@ -5002,7 +5003,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-pol"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"astro-float",
"logos-blockchain-circuits-prover",
@ -5021,7 +5022,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-poq"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"logos-blockchain-circuits-prover",
"logos-blockchain-circuits-utils",
@ -5038,7 +5039,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-poseidon2"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"ark-bn254 0.4.0",
"ark-ff 0.4.2",
@ -5049,7 +5050,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-services-utils"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"async-trait",
"futures",
@ -5064,7 +5065,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-storage-service"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"async-trait",
"bytes",
@ -5081,7 +5082,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-time-service"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"async-trait",
"futures",
@ -5099,7 +5100,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-utils"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"async-trait",
"blake2",
@ -5116,7 +5117,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-utxotree"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"ark-ff 0.4.2",
"logos-blockchain-groth16",
@ -5130,7 +5131,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-witness-generator"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"tempfile",
]
@ -5138,7 +5139,7 @@ dependencies = [
[[package]]
name = "logos-blockchain-zksign"
version = "0.1.0"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222"
source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290"
dependencies = [
"logos-blockchain-circuits-prover",
"logos-blockchain-circuits-utils",
@ -5450,9 +5451,9 @@ dependencies = [
[[package]]
name = "native-tls"
version = "0.2.16"
version = "0.2.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5d26952a508f321b4d3d2e80e78fc2603eaefcdf0c30783867f19586518bdc"
checksum = "465500e14ea162429d264d44189adc38b199b62b1c21eea9f69e4b73cb03bbf2"
dependencies = [
"libc",
"log",

View File

@ -113,6 +113,7 @@ logos-blockchain-common-http-client = { git = "https://github.com/logos-blockcha
logos-blockchain-key-management-system-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
logos-blockchain-core = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
logos-blockchain-chain-broadcast-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
logos-blockchain-chain-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
rocksdb = { version = "0.24.0", default-features = false, features = [
"snappy",

View File

@ -157,7 +157,7 @@ The sequencer and logos blockchain node can be run locally:
### Notes on cleanup
After stopping services above you need to remove 3 folders to start cleanly:
1. In the `logos-blockchain/logos-blockchain` folder `db` (not needed in case of docker setup)
1. In the `logos-blockchain/logos-blockchain` folder `state` (not needed in case of docker setup)
2. In the `lssa` folder `sequencer_runner/rocksdb`
3. In the `lssa` file `sequencer_runner/bedrock_signing_key`
4. In the `lssa` folder `indexer/service/rocksdb`

View File

@ -16,3 +16,4 @@ serde.workspace = true
logos-blockchain-common-http-client.workspace = true
logos-blockchain-core.workspace = true
logos-blockchain-chain-broadcast-service.workspace = true
logos-blockchain-chain-service.workspace = true

View File

@ -5,6 +5,7 @@ use common::config::BasicAuth;
use futures::{Stream, TryFutureExt};
use log::{info, warn};
pub use logos_blockchain_chain_broadcast_service::BlockInfo;
use logos_blockchain_chain_service::CryptarchiaInfo;
pub use logos_blockchain_common_http_client::{CommonHttpClient, Error};
pub use logos_blockchain_core::{block::Block, header::HeaderId, mantle::SignedMantleTx};
use reqwest::{Client, Url};
@ -82,6 +83,15 @@ impl BedrockClient {
.await
}
pub async fn get_consensus_info(&self) -> Result<CryptarchiaInfo, Error> {
Retry::spawn(self.backoff_strategy(), || {
self.http_client
.consensus_info(self.node_url.clone())
.inspect_err(|err| warn!("Block fetching failed with error: {err:#}"))
})
.await
}
fn backoff_strategy(&self) -> impl Iterator<Item = Duration> {
tokio_retry::strategy::FibonacciBackoff::from_millis(self.backoff.start_delay_millis)
.take(self.backoff.max_retries)

View File

@ -1,6 +1,7 @@
use std::{path::Path, sync::Arc};
use anyhow::Result;
use bedrock_client::HeaderId;
use common::{block::Block, transaction::NSSATransaction};
use nssa::{Account, AccountId, V02State};
use storage::indexer::RocksDBIO;
@ -31,6 +32,13 @@ impl IndexerStore {
Self::open_db_with_genesis(location, None)
}
pub fn last_observed_l1_lib_header(&self) -> Result<Option<HeaderId>> {
Ok(self
.dbio
.get_meta_last_observed_l1_lib_header_in_db()?
.map(HeaderId::from))
}
pub fn get_last_block_id(&self) -> Result<u64> {
Ok(self.dbio.get_meta_last_block_in_db()?)
}
@ -92,7 +100,7 @@ impl IndexerStore {
Ok(self.final_state()?.get_account_by_id(*account_id))
}
pub fn put_block(&self, block: Block) -> Result<()> {
pub fn put_block(&self, block: Block, l1_header: HeaderId) -> Result<()> {
let mut final_state = self.dbio.final_state()?;
for transaction in &block.body.transactions {
@ -102,6 +110,6 @@ impl IndexerStore {
.execute_check_on_state(&mut final_state)?;
}
Ok(self.dbio.put_block(block)?)
Ok(self.dbio.put_block(block, l1_header.into())?)
}
}

View File

@ -32,12 +32,8 @@ pub struct IndexerConfig {
/// List of initial commitments
pub initial_commitments: Vec<CommitmentsInitialData>,
/// Sequencers signing key
///
/// ToDo: Remove it after introducing bedrock block parsing.
/// Currently can not be removed, because indexer must start
/// chain BEFORE sequencer.
pub signing_key: [u8; 32],
pub resubscribe_interval_millis: u64,
pub consensus_info_polling_interval_millis: u64,
pub bedrock_client_config: ClientConfig,
pub channel_id: ChannelId,
}

View File

@ -1,10 +1,11 @@
use std::collections::VecDeque;
use anyhow::Result;
use bedrock_client::BedrockClient;
use bedrock_client::{BedrockClient, HeaderId};
use common::block::{Block, HashableBlockData};
// ToDo: Remove after testnet
use common::{HashType, PINATA_BASE58};
use futures::StreamExt;
use log::info;
use log::{debug, error, info};
use logos_blockchain_core::mantle::{
Op, SignedMantleTx,
ops::channel::{ChannelId, inscribe::InscriptionOp},
@ -23,9 +24,22 @@ pub struct IndexerCore {
pub store: IndexerStore,
}
#[derive(Clone)]
/// This struct represents one L1 block data fetched from backfilling
pub struct BackfillBlockData {
l2_blocks: Vec<Block>,
l1_header: HeaderId,
}
#[derive(Clone)]
/// This struct represents data fetched fom backfilling in one iteration
pub struct BackfillData {
block_data: VecDeque<BackfillBlockData>,
curr_fin_l1_lib_header: HeaderId,
}
impl IndexerCore {
pub fn new(config: IndexerConfig) -> Result<Self> {
// ToDo: replace with correct startup
let hashable_data = HashableBlockData {
block_id: 1,
transactions: vec![],
@ -33,10 +47,20 @@ impl IndexerCore {
timestamp: 0,
};
// Genesis creation is fine as it is,
// because it will be overwritten by sequencer.
// Therefore:
// ToDo: remove key from indexer config, use some default.
let signing_key = nssa::PrivateKey::try_new(config.signing_key).unwrap();
let channel_genesis_msg_id = [0; 32];
let start_block = hashable_data.into_pending_block(&signing_key, channel_genesis_msg_id);
// This is a troubling moment, because changes in key protocol can
// affect this. And indexer can not reliably ask this data from sequencer
// because indexer must be independent from it.
// ToDo: move initial state generation into common and use the same method
// for indexer and sequencer. This way both services buit at same version
// could be in sync.
let initial_commitments: Vec<nssa_core::Commitment> = config
.initial_commitments
.iter()
@ -71,71 +95,250 @@ impl IndexerCore {
config.bedrock_client_config.auth.clone(),
)?,
config,
// ToDo: Implement restarts
store: IndexerStore::open_db_with_genesis(&home, Some((start_block, state)))?,
})
}
pub async fn subscribe_parse_block_stream(&self) -> impl futures::Stream<Item = Result<Block>> {
async_stream::stream! {
loop {
let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?);
info!("Searching for initial header");
info!("Block stream joined");
let last_l1_lib_header = self.store.last_observed_l1_lib_header()?;
while let Some(block_info) = stream_pinned.next().await {
let header_id = block_info.header_id;
let mut prev_last_l1_lib_header = match last_l1_lib_header {
Some(last_l1_lib_header) => {
info!("Last l1 lib header found: {last_l1_lib_header}");
last_l1_lib_header
},
None => {
info!("Last l1 lib header not found in DB");
info!("Searching for the start of a channel");
info!("Observed L1 block at height {}", block_info.height);
let BackfillData {
block_data: start_buff,
curr_fin_l1_lib_header: last_l1_lib_header,
} = self.search_for_channel_start().await?;
if let Some(l1_block) = self
.bedrock_client
.get_block_by_id(header_id)
.await?
{
info!("Extracted L1 block at height {}", block_info.height);
for BackfillBlockData {
l2_blocks: l2_block_vec,
l1_header,
} in start_buff {
for l2_block in l2_block_vec {
self.store.put_block(l2_block.clone(), l1_header)?;
let l2_blocks_parsed = parse_blocks(
l1_block.into_transactions().into_iter(),
&self.config.channel_id,
).collect::<Vec<_>>();
yield Ok(l2_block);
}
}
let mut l2_blocks_parsed_ids: Vec<_> = l2_blocks_parsed.iter().map(|block| block.header.block_id).collect();
l2_blocks_parsed_ids.sort();
info!("Parsed {} L2 blocks with ids {:?}", l2_blocks_parsed.len(), l2_blocks_parsed_ids);
last_l1_lib_header
},
};
for l2_block in l2_blocks_parsed {
self.store.put_block(l2_block.clone())?;
info!("Searching for initial header finished");
info!("Starting backfilling from {prev_last_l1_lib_header}");
loop {
let BackfillData {
block_data: buff,
curr_fin_l1_lib_header,
} = self
.backfill_to_last_l1_lib_header_id(prev_last_l1_lib_header, &self.config.channel_id)
.await
.inspect_err(|err| error!("Failed to backfill to last l1 lib header id with err {err:#?}"))?;
prev_last_l1_lib_header = curr_fin_l1_lib_header;
for BackfillBlockData {
l2_blocks: l2_block_vec,
l1_header: header,
} in buff {
for l2_block in l2_block_vec {
self.store.put_block(l2_block.clone(), header)?;
yield Ok(l2_block);
}
}
}
}
}
// Refetch stream after delay
tokio::time::sleep(std::time::Duration::from_millis(
self.config.resubscribe_interval_millis,
))
.await;
async fn get_lib(&self) -> Result<HeaderId> {
Ok(self.bedrock_client.get_consensus_info().await?.lib)
}
async fn get_next_lib(&self, prev_lib: HeaderId) -> Result<HeaderId> {
loop {
let next_lib = self.get_lib().await?;
if next_lib != prev_lib {
break Ok(next_lib);
} else {
info!(
"Wait {}ms to not spam the node",
self.config.consensus_info_polling_interval_millis
);
tokio::time::sleep(std::time::Duration::from_millis(
self.config.consensus_info_polling_interval_millis,
))
.await;
}
}
}
/// WARNING: depending on channel state,
/// may take indefinite amount of time
pub async fn search_for_channel_start(&self) -> Result<BackfillData> {
let mut curr_last_l1_lib_header = self.get_lib().await?;
let mut backfill_start = curr_last_l1_lib_header;
// ToDo: How to get root?
let mut backfill_limit = HeaderId::from([0; 32]);
// ToDo: Not scalable, initial buffer should be stored in DB to not run out of memory
// Don't want to complicate DB even more right now.
let mut block_buffer = VecDeque::new();
'outer: loop {
let mut cycle_header = curr_last_l1_lib_header;
loop {
let cycle_block =
if let Some(block) = self.bedrock_client.get_block_by_id(cycle_header).await? {
block
} else {
// First run can reach root easily
// so here we are optimistic about L1
// failing to get parent.
break;
};
// It would be better to have id, but block does not have it, so slot will do.
info!(
"INITIAL SEARCH: Observed L1 block at slot {}",
cycle_block.header().slot().into_inner()
);
debug!(
"INITIAL SEARCH: This block header is {}",
cycle_block.header().id()
);
debug!(
"INITIAL SEARCH: This block parent is {}",
cycle_block.header().parent()
);
let (l2_block_vec, l1_header) =
parse_block_owned(&cycle_block, &self.config.channel_id);
info!("Parsed {} L2 blocks", l2_block_vec.len());
if !l2_block_vec.is_empty() {
block_buffer.push_front(BackfillBlockData {
l2_blocks: l2_block_vec.clone(),
l1_header,
});
}
if let Some(first_l2_block) = l2_block_vec.first()
&& first_l2_block.header.block_id == 1
{
info!("INITIAL_SEARCH: Found channel start");
break 'outer;
}
// Step back to parent
let parent = cycle_block.header().parent();
if parent == backfill_limit {
break;
}
cycle_header = parent;
}
info!("INITIAL_SEARCH: Reached backfill limit, refetching last l1 lib header");
block_buffer.clear();
backfill_limit = backfill_start;
curr_last_l1_lib_header = self.get_next_lib(curr_last_l1_lib_header).await?;
backfill_start = curr_last_l1_lib_header;
}
Ok(BackfillData {
block_data: block_buffer,
curr_fin_l1_lib_header: backfill_limit,
})
}
pub async fn backfill_to_last_l1_lib_header_id(
&self,
last_fin_l1_lib_header: HeaderId,
channel_id: &ChannelId,
) -> Result<BackfillData> {
let curr_fin_l1_lib_header = self.get_next_lib(last_fin_l1_lib_header).await?;
// ToDo: Not scalable, buffer should be stored in DB to not run out of memory
// Don't want to complicate DB even more right now.
let mut block_buffer = VecDeque::new();
let mut cycle_header = curr_fin_l1_lib_header;
loop {
let Some(cycle_block) = self.bedrock_client.get_block_by_id(cycle_header).await? else {
return Err(anyhow::anyhow!("Parent not found"));
};
if cycle_block.header().id() == last_fin_l1_lib_header {
break;
} else {
// Step back to parent
cycle_header = cycle_block.header().parent();
}
// It would be better to have id, but block does not have it, so slot will do.
info!(
"Observed L1 block at slot {}",
cycle_block.header().slot().into_inner()
);
let (l2_block_vec, l1_header) = parse_block_owned(&cycle_block, channel_id);
info!("Parsed {} L2 blocks", l2_block_vec.len());
if !l2_block_vec.is_empty() {
block_buffer.push_front(BackfillBlockData {
l2_blocks: l2_block_vec,
l1_header,
});
}
}
Ok(BackfillData {
block_data: block_buffer,
curr_fin_l1_lib_header,
})
}
}
fn parse_blocks(
block_txs: impl Iterator<Item = SignedMantleTx>,
fn parse_block_owned(
l1_block: &bedrock_client::Block<SignedMantleTx>,
decoded_channel_id: &ChannelId,
) -> impl Iterator<Item = Block> {
block_txs.flat_map(|tx| {
tx.mantle_tx.ops.into_iter().filter_map(|op| match op {
Op::ChannelInscribe(InscriptionOp {
channel_id,
inscription,
..
}) if channel_id == *decoded_channel_id => {
borsh::from_slice::<Block>(&inscription).ok()
}
_ => None,
})
})
) -> (Vec<Block>, HeaderId) {
(
l1_block
.transactions()
.flat_map(|tx| {
tx.mantle_tx.ops.iter().filter_map(|op| match op {
Op::ChannelInscribe(InscriptionOp {
channel_id,
inscription,
..
}) if channel_id == decoded_channel_id => {
borsh::from_slice::<Block>(inscription)
.inspect_err(|err| {
error!("Failed to deserialize our inscription with err: {err:#?}")
})
.ok()
}
_ => None,
})
})
.collect(),
l1_block.header().id(),
)
}

View File

@ -1,6 +1,6 @@
{
"home": "./indexer/service",
"resubscribe_interval_millis": 1000,
"consensus_info_polling_interval_millis": 10000,
"bedrock_client_config": {
"addr": "http://localhost:8080",
"backoff": {

View File

@ -19,7 +19,7 @@ pub fn indexer_config(
) -> Result<IndexerConfig> {
Ok(IndexerConfig {
home,
resubscribe_interval_millis: 1000,
consensus_info_polling_interval_millis: 10000,
bedrock_client_config: ClientConfig {
addr: addr_to_url(UrlProtocol::Http, bedrock_addr)
.context("Failed to convert bedrock addr to URL")?,
@ -74,7 +74,7 @@ pub fn sequencer_config(
max_num_tx_in_block,
mempool_max_size,
block_create_timeout_millis,
retry_pending_blocks_timeout_millis: 240_000,
retry_pending_blocks_timeout_millis: 120_000,
port: 0,
initial_accounts: initial_data.sequencer_initial_accounts(),
initial_commitments: initial_data.sequencer_initial_commitments(),

View File

@ -12,10 +12,9 @@ use tokio::test;
use wallet::cli::{Command, programs::native_token_transfer::AuthTransferSubcommand};
/// Timeout in milliseconds to reliably await for block finalization
const L2_TO_L1_TIMEOUT_MILLIS: u64 = 300000;
const L2_TO_L1_TIMEOUT_MILLIS: u64 = 600000;
#[test]
#[ignore = "Not reliable with current bedrock node"]
async fn indexer_test_run() -> Result<()> {
let ctx = TestContext::new().await?;
@ -45,7 +44,6 @@ async fn indexer_test_run() -> Result<()> {
}
#[test]
#[ignore = "Not reliable with current bedrock node"]
async fn indexer_block_batching() -> Result<()> {
let ctx = TestContext::new().await?;
@ -81,7 +79,6 @@ async fn indexer_block_batching() -> Result<()> {
}
#[test]
#[ignore = "Not reliable with current bedrock node"]
async fn indexer_state_consistency() -> Result<()> {
let mut ctx = TestContext::new().await?;

View File

@ -24,6 +24,9 @@ pub const CACHE_SIZE: usize = 1000;
pub const DB_META_FIRST_BLOCK_IN_DB_KEY: &str = "first_block_in_db";
/// Key base for storing metainformation about id of last current block in db
pub const DB_META_LAST_BLOCK_IN_DB_KEY: &str = "last_block_in_db";
/// Key base for storing metainformation about id of last observed L1 lib header in db
pub const DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY: &str =
"last_observed_l1_lib_header_in_db";
/// Key base for storing metainformation which describe if first block has been set
pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set";
/// Key base for storing metainformation about the last breakpoint
@ -214,6 +217,37 @@ impl RocksDBIO {
}
}
pub fn get_meta_last_observed_l1_lib_header_in_db(&self) -> DbResult<Option<[u8; 32]>> {
let cf_meta = self.meta_column();
let res = self
.db
.get_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY).map_err(
|err| {
DbError::borsh_cast_message(
err,
Some(
"Failed to serialize DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY"
.to_string(),
),
)
},
)?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
res.map(|data| {
borsh::from_slice::<[u8; 32]>(&data).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to deserialize last l1 lib header".to_string()),
)
})
})
.transpose()
}
pub fn get_meta_is_first_block_set(&self) -> DbResult<bool> {
let cf_meta = self.meta_column();
let res = self
@ -281,7 +315,7 @@ impl RocksDBIO {
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
self.put_block(block)?;
self.put_block(block, [0; 32])?;
Ok(())
}
@ -307,6 +341,36 @@ impl RocksDBIO {
Ok(())
}
pub fn put_meta_last_observed_l1_lib_header_in_db(
&self,
l1_lib_header: [u8; 32],
) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY).map_err(
|err| {
DbError::borsh_cast_message(
err,
Some(
"Failed to serialize DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY"
.to_string(),
),
)
},
)?,
borsh::to_vec(&l1_lib_header).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize last l1 block header".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
pub fn put_meta_last_breakpoint_id(&self, br_id: u64) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
@ -348,7 +412,7 @@ impl RocksDBIO {
// Block
pub fn put_block(&self, block: Block) -> DbResult<()> {
pub fn put_block(&self, block: Block, l1_lib_header: [u8; 32]) -> DbResult<()> {
let cf_block = self.block_column();
let cf_hti = self.hash_to_id_column();
let cf_tti: Arc<BoundColumnFamily<'_>> = self.tx_hash_to_id_column();
@ -377,6 +441,7 @@ impl RocksDBIO {
if block.header.block_id > last_curr_block {
self.put_meta_last_block_in_db(block.header.block_id)?;
self.put_meta_last_observed_l1_lib_header_in_db(l1_lib_header)?;
}
self.db
@ -954,7 +1019,7 @@ mod tests {
let transfer_tx = transfer(1, 0, true);
let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]);
dbio.put_block(block).unwrap();
dbio.put_block(block, [1; 32]).unwrap();
let last_id = dbio.get_meta_last_block_in_db().unwrap();
let first_id = dbio.get_meta_first_block_in_db().unwrap();
@ -997,7 +1062,7 @@ mod tests {
let transfer_tx = transfer(1, (i - 1) as u128, true);
let block =
common::test_utils::produce_dummy_block(i + 1, Some(prev_hash), vec![transfer_tx]);
dbio.put_block(block).unwrap();
dbio.put_block(block, [i as u8; 32]).unwrap();
}
let last_id = dbio.get_meta_last_block_in_db().unwrap();
@ -1051,7 +1116,7 @@ mod tests {
let control_hash1 = block.header.hash;
dbio.put_block(block).unwrap();
dbio.put_block(block, [1; 32]).unwrap();
let last_id = dbio.get_meta_last_block_in_db().unwrap();
let last_block = dbio.get_block(last_id).unwrap();
@ -1062,7 +1127,7 @@ mod tests {
let control_hash2 = block.header.hash;
dbio.put_block(block).unwrap();
dbio.put_block(block, [2; 32]).unwrap();
let last_id = dbio.get_meta_last_block_in_db().unwrap();
let last_block = dbio.get_block(last_id).unwrap();
@ -1073,7 +1138,7 @@ mod tests {
let control_tx_hash1 = transfer_tx.hash();
let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]);
dbio.put_block(block).unwrap();
dbio.put_block(block, [3; 32]).unwrap();
let last_id = dbio.get_meta_last_block_in_db().unwrap();
let last_block = dbio.get_block(last_id).unwrap();
@ -1084,7 +1149,7 @@ mod tests {
let control_tx_hash2 = transfer_tx.hash();
let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]);
dbio.put_block(block).unwrap();
dbio.put_block(block, [4; 32]).unwrap();
let control_block_id1 = dbio.get_block_id_by_hash(control_hash1.0).unwrap();
let control_block_id2 = dbio.get_block_id_by_hash(control_hash2.0).unwrap();
@ -1115,7 +1180,7 @@ mod tests {
let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]);
block_res.push(block.clone());
dbio.put_block(block).unwrap();
dbio.put_block(block, [1; 32]).unwrap();
let last_id = dbio.get_meta_last_block_in_db().unwrap();
let last_block = dbio.get_block(last_id).unwrap();
@ -1125,7 +1190,7 @@ mod tests {
let block = common::test_utils::produce_dummy_block(3, Some(prev_hash), vec![transfer_tx]);
block_res.push(block.clone());
dbio.put_block(block).unwrap();
dbio.put_block(block, [2; 32]).unwrap();
let last_id = dbio.get_meta_last_block_in_db().unwrap();
let last_block = dbio.get_block(last_id).unwrap();
@ -1135,7 +1200,7 @@ mod tests {
let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]);
block_res.push(block.clone());
dbio.put_block(block).unwrap();
dbio.put_block(block, [3; 32]).unwrap();
let last_id = dbio.get_meta_last_block_in_db().unwrap();
let last_block = dbio.get_block(last_id).unwrap();
@ -1145,7 +1210,7 @@ mod tests {
let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]);
block_res.push(block.clone());
dbio.put_block(block).unwrap();
dbio.put_block(block, [4; 32]).unwrap();
let block_hashes_mem: Vec<[u8; 32]> =
block_res.into_iter().map(|bl| bl.header.hash.0).collect();
@ -1189,7 +1254,7 @@ mod tests {
let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]);
dbio.put_block(block).unwrap();
dbio.put_block(block, [1; 32]).unwrap();
let last_id = dbio.get_meta_last_block_in_db().unwrap();
let last_block = dbio.get_block(last_id).unwrap();
@ -1201,7 +1266,7 @@ mod tests {
let block = common::test_utils::produce_dummy_block(3, Some(prev_hash), vec![transfer_tx]);
dbio.put_block(block).unwrap();
dbio.put_block(block, [2; 32]).unwrap();
let last_id = dbio.get_meta_last_block_in_db().unwrap();
let last_block = dbio.get_block(last_id).unwrap();
@ -1213,7 +1278,7 @@ mod tests {
let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]);
dbio.put_block(block).unwrap();
dbio.put_block(block, [3; 32]).unwrap();
let last_id = dbio.get_meta_last_block_in_db().unwrap();
let last_block = dbio.get_block(last_id).unwrap();
@ -1225,7 +1290,7 @@ mod tests {
let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]);
dbio.put_block(block).unwrap();
dbio.put_block(block, [4; 32]).unwrap();
let acc1_tx = dbio.get_acc_transactions(*acc1().value(), 0, 4).unwrap();
let acc1_tx_hashes: Vec<[u8; 32]> = acc1_tx.into_iter().map(|tx| tx.hash().0).collect();