diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ce9903b5..eb4cc791 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -154,7 +154,35 @@ jobs: env: RISC0_DEV_MODE: "1" RUST_LOG: "info" - run: cargo nextest run -p integration_tests -- --skip tps_test + run: cargo nextest run -p integration_tests -- --skip tps_test --skip indexer + + integration-tests-indexer: + runs-on: ubuntu-latest + timeout-minutes: 60 + steps: + - uses: actions/checkout@v5 + with: + ref: ${{ github.head_ref }} + + - uses: ./.github/actions/install-system-deps + + - uses: ./.github/actions/install-risc0 + + - uses: ./.github/actions/install-logos-blockchain-circuits + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + + - name: Install active toolchain + run: rustup install + + - name: Install nextest + run: cargo install --locked cargo-nextest + + - name: Run tests + env: + RISC0_DEV_MODE: "1" + RUST_LOG: "info" + run: cargo nextest run -p integration_tests indexer -- --skip tps_test valid-proof-test: runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index 5543243a..85856c94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1212,6 +1212,7 @@ dependencies = [ "futures", "log", "logos-blockchain-chain-broadcast-service", + "logos-blockchain-chain-service", "logos-blockchain-common-http-client", "logos-blockchain-core", "reqwest", @@ -1448,9 +1449,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.19.1" +version = "3.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" +checksum = "5c6f81257d10a0f602a294ae4182251151ff97dbb504ef9afcdda4a64b24d9b4" [[package]] name = "bytemuck" @@ -4647,7 +4648,7 @@ checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" [[package]] name = "logos-blockchain-blend-crypto" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "blake2", "logos-blockchain-groth16", @@ -4661,7 +4662,7 @@ dependencies = [ [[package]] name = "logos-blockchain-blend-message" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "blake2", "derivative", @@ -4683,7 +4684,7 @@ dependencies = [ [[package]] name = "logos-blockchain-blend-proofs" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "ed25519-dalek", "generic-array 1.3.5", @@ -4700,7 +4701,7 @@ dependencies = [ [[package]] name = "logos-blockchain-chain-broadcast-service" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "derivative", @@ -4716,7 +4717,7 @@ dependencies = [ [[package]] name = "logos-blockchain-chain-service" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "bytes", @@ -4746,7 +4747,7 @@ dependencies = [ [[package]] name = "logos-blockchain-circuits-prover" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "logos-blockchain-circuits-utils", "tempfile", @@ -4755,7 +4756,7 @@ dependencies = [ [[package]] name = "logos-blockchain-circuits-utils" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "dirs", ] @@ -4763,7 +4764,7 @@ dependencies = [ [[package]] name = "logos-blockchain-common-http-client" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "futures", "hex", @@ -4783,7 +4784,7 @@ dependencies = [ [[package]] name = "logos-blockchain-core" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "ark-ff 0.4.2", "bincode", @@ -4813,7 +4814,7 @@ dependencies = [ [[package]] name = "logos-blockchain-cryptarchia-engine" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "cfg_eval", "logos-blockchain-pol", @@ -4829,7 +4830,7 @@ dependencies = [ [[package]] name = "logos-blockchain-cryptarchia-sync" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "bytes", "futures", @@ -4846,7 +4847,7 @@ dependencies = [ [[package]] name = "logos-blockchain-groth16" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "ark-bn254 0.4.0", "ark-ec 0.4.2", @@ -4864,7 +4865,7 @@ dependencies = [ [[package]] name = "logos-blockchain-http-api-common" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "axum 0.7.9", "logos-blockchain-core", @@ -4878,7 +4879,7 @@ dependencies = [ [[package]] name = "logos-blockchain-key-management-system-keys" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "bytes", @@ -4904,7 +4905,7 @@ dependencies = [ [[package]] name = "logos-blockchain-key-management-system-macros" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "proc-macro2", "quote", @@ -4914,7 +4915,7 @@ dependencies = [ [[package]] name = "logos-blockchain-key-management-system-operators" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "logos-blockchain-blend-proofs", @@ -4930,7 +4931,7 @@ dependencies = [ [[package]] name = "logos-blockchain-key-management-system-service" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "log", @@ -4946,7 +4947,7 @@ dependencies = [ [[package]] name = "logos-blockchain-ledger" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "derivative", "logos-blockchain-blend-crypto", @@ -4970,7 +4971,7 @@ dependencies = [ [[package]] name = "logos-blockchain-network-service" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "futures", @@ -4986,7 +4987,7 @@ dependencies = [ [[package]] name = "logos-blockchain-poc" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "logos-blockchain-circuits-prover", "logos-blockchain-circuits-utils", @@ -5002,7 +5003,7 @@ dependencies = [ [[package]] name = "logos-blockchain-pol" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "astro-float", "logos-blockchain-circuits-prover", @@ -5021,7 +5022,7 @@ dependencies = [ [[package]] name = "logos-blockchain-poq" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "logos-blockchain-circuits-prover", "logos-blockchain-circuits-utils", @@ -5038,7 +5039,7 @@ dependencies = [ [[package]] name = "logos-blockchain-poseidon2" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "ark-bn254 0.4.0", "ark-ff 0.4.2", @@ -5049,7 +5050,7 @@ dependencies = [ [[package]] name = "logos-blockchain-services-utils" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "futures", @@ -5064,7 +5065,7 @@ dependencies = [ [[package]] name = "logos-blockchain-storage-service" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "bytes", @@ -5081,7 +5082,7 @@ dependencies = [ [[package]] name = "logos-blockchain-time-service" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "futures", @@ -5099,7 +5100,7 @@ dependencies = [ [[package]] name = "logos-blockchain-utils" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "async-trait", "blake2", @@ -5116,7 +5117,7 @@ dependencies = [ [[package]] name = "logos-blockchain-utxotree" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "ark-ff 0.4.2", "logos-blockchain-groth16", @@ -5130,7 +5131,7 @@ dependencies = [ [[package]] name = "logos-blockchain-witness-generator" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "tempfile", ] @@ -5138,7 +5139,7 @@ dependencies = [ [[package]] name = "logos-blockchain-zksign" version = "0.1.0" -source = "git+https://github.com/logos-blockchain/logos-blockchain.git#b862e6f640a79097b8a42c072e1f78bc430fa222" +source = "git+https://github.com/logos-blockchain/logos-blockchain.git#81192877116cbc3eedf6688b85fab6dd0e448290" dependencies = [ "logos-blockchain-circuits-prover", "logos-blockchain-circuits-utils", @@ -5450,9 +5451,9 @@ dependencies = [ [[package]] name = "native-tls" -version = "0.2.16" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5d26952a508f321b4d3d2e80e78fc2603eaefcdf0c30783867f19586518bdc" +checksum = "465500e14ea162429d264d44189adc38b199b62b1c21eea9f69e4b73cb03bbf2" dependencies = [ "libc", "log", diff --git a/Cargo.toml b/Cargo.toml index 5f2a7033..520844b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -113,6 +113,7 @@ logos-blockchain-common-http-client = { git = "https://github.com/logos-blockcha logos-blockchain-key-management-system-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } logos-blockchain-core = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } logos-blockchain-chain-broadcast-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } +logos-blockchain-chain-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } rocksdb = { version = "0.24.0", default-features = false, features = [ "snappy", diff --git a/README.md b/README.md index 4f9fa7ca..b94a68d4 100644 --- a/README.md +++ b/README.md @@ -157,7 +157,7 @@ The sequencer and logos blockchain node can be run locally: ### Notes on cleanup After stopping services above you need to remove 3 folders to start cleanly: - 1. In the `logos-blockchain/logos-blockchain` folder `db` (not needed in case of docker setup) + 1. In the `logos-blockchain/logos-blockchain` folder `state` (not needed in case of docker setup) 2. In the `lssa` folder `sequencer_runner/rocksdb` 3. In the `lssa` file `sequencer_runner/bedrock_signing_key` 4. In the `lssa` folder `indexer/service/rocksdb` diff --git a/bedrock_client/Cargo.toml b/bedrock_client/Cargo.toml index bd275d8b..6f8b8a74 100644 --- a/bedrock_client/Cargo.toml +++ b/bedrock_client/Cargo.toml @@ -16,3 +16,4 @@ serde.workspace = true logos-blockchain-common-http-client.workspace = true logos-blockchain-core.workspace = true logos-blockchain-chain-broadcast-service.workspace = true +logos-blockchain-chain-service.workspace = true diff --git a/bedrock_client/src/lib.rs b/bedrock_client/src/lib.rs index 7655a31c..7aae7783 100644 --- a/bedrock_client/src/lib.rs +++ b/bedrock_client/src/lib.rs @@ -5,6 +5,7 @@ use common::config::BasicAuth; use futures::{Stream, TryFutureExt}; use log::{info, warn}; pub use logos_blockchain_chain_broadcast_service::BlockInfo; +use logos_blockchain_chain_service::CryptarchiaInfo; pub use logos_blockchain_common_http_client::{CommonHttpClient, Error}; pub use logos_blockchain_core::{block::Block, header::HeaderId, mantle::SignedMantleTx}; use reqwest::{Client, Url}; @@ -82,6 +83,15 @@ impl BedrockClient { .await } + pub async fn get_consensus_info(&self) -> Result { + Retry::spawn(self.backoff_strategy(), || { + self.http_client + .consensus_info(self.node_url.clone()) + .inspect_err(|err| warn!("Block fetching failed with error: {err:#}")) + }) + .await + } + fn backoff_strategy(&self) -> impl Iterator { tokio_retry::strategy::FibonacciBackoff::from_millis(self.backoff.start_delay_millis) .take(self.backoff.max_retries) diff --git a/indexer/core/src/block_store.rs b/indexer/core/src/block_store.rs index 496d0bf3..fc7641a1 100644 --- a/indexer/core/src/block_store.rs +++ b/indexer/core/src/block_store.rs @@ -1,6 +1,7 @@ use std::{path::Path, sync::Arc}; use anyhow::Result; +use bedrock_client::HeaderId; use common::{block::Block, transaction::NSSATransaction}; use nssa::{Account, AccountId, V02State}; use storage::indexer::RocksDBIO; @@ -31,6 +32,13 @@ impl IndexerStore { Self::open_db_with_genesis(location, None) } + pub fn last_observed_l1_lib_header(&self) -> Result> { + Ok(self + .dbio + .get_meta_last_observed_l1_lib_header_in_db()? + .map(HeaderId::from)) + } + pub fn get_last_block_id(&self) -> Result { Ok(self.dbio.get_meta_last_block_in_db()?) } @@ -92,7 +100,7 @@ impl IndexerStore { Ok(self.final_state()?.get_account_by_id(*account_id)) } - pub fn put_block(&self, block: Block) -> Result<()> { + pub fn put_block(&self, block: Block, l1_header: HeaderId) -> Result<()> { let mut final_state = self.dbio.final_state()?; for transaction in &block.body.transactions { @@ -102,6 +110,6 @@ impl IndexerStore { .execute_check_on_state(&mut final_state)?; } - Ok(self.dbio.put_block(block)?) + Ok(self.dbio.put_block(block, l1_header.into())?) } } diff --git a/indexer/core/src/config.rs b/indexer/core/src/config.rs index 968678e5..e3cd4d04 100644 --- a/indexer/core/src/config.rs +++ b/indexer/core/src/config.rs @@ -32,12 +32,8 @@ pub struct IndexerConfig { /// List of initial commitments pub initial_commitments: Vec, /// Sequencers signing key - /// - /// ToDo: Remove it after introducing bedrock block parsing. - /// Currently can not be removed, because indexer must start - /// chain BEFORE sequencer. pub signing_key: [u8; 32], - pub resubscribe_interval_millis: u64, + pub consensus_info_polling_interval_millis: u64, pub bedrock_client_config: ClientConfig, pub channel_id: ChannelId, } diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index 0497e68a..1e2986a3 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -1,10 +1,11 @@ +use std::collections::VecDeque; + use anyhow::Result; -use bedrock_client::BedrockClient; +use bedrock_client::{BedrockClient, HeaderId}; use common::block::{Block, HashableBlockData}; // ToDo: Remove after testnet use common::{HashType, PINATA_BASE58}; -use futures::StreamExt; -use log::info; +use log::{debug, error, info}; use logos_blockchain_core::mantle::{ Op, SignedMantleTx, ops::channel::{ChannelId, inscribe::InscriptionOp}, @@ -23,9 +24,22 @@ pub struct IndexerCore { pub store: IndexerStore, } +#[derive(Clone)] +/// This struct represents one L1 block data fetched from backfilling +pub struct BackfillBlockData { + l2_blocks: Vec, + l1_header: HeaderId, +} + +#[derive(Clone)] +/// This struct represents data fetched fom backfilling in one iteration +pub struct BackfillData { + block_data: VecDeque, + curr_fin_l1_lib_header: HeaderId, +} + impl IndexerCore { pub fn new(config: IndexerConfig) -> Result { - // ToDo: replace with correct startup let hashable_data = HashableBlockData { block_id: 1, transactions: vec![], @@ -33,10 +47,20 @@ impl IndexerCore { timestamp: 0, }; + // Genesis creation is fine as it is, + // because it will be overwritten by sequencer. + // Therefore: + // ToDo: remove key from indexer config, use some default. let signing_key = nssa::PrivateKey::try_new(config.signing_key).unwrap(); let channel_genesis_msg_id = [0; 32]; let start_block = hashable_data.into_pending_block(&signing_key, channel_genesis_msg_id); + // This is a troubling moment, because changes in key protocol can + // affect this. And indexer can not reliably ask this data from sequencer + // because indexer must be independent from it. + // ToDo: move initial state generation into common and use the same method + // for indexer and sequencer. This way both services buit at same version + // could be in sync. let initial_commitments: Vec = config .initial_commitments .iter() @@ -71,71 +95,250 @@ impl IndexerCore { config.bedrock_client_config.auth.clone(), )?, config, - // ToDo: Implement restarts store: IndexerStore::open_db_with_genesis(&home, Some((start_block, state)))?, }) } pub async fn subscribe_parse_block_stream(&self) -> impl futures::Stream> { async_stream::stream! { - loop { - let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?); + info!("Searching for initial header"); - info!("Block stream joined"); + let last_l1_lib_header = self.store.last_observed_l1_lib_header()?; - while let Some(block_info) = stream_pinned.next().await { - let header_id = block_info.header_id; + let mut prev_last_l1_lib_header = match last_l1_lib_header { + Some(last_l1_lib_header) => { + info!("Last l1 lib header found: {last_l1_lib_header}"); + last_l1_lib_header + }, + None => { + info!("Last l1 lib header not found in DB"); + info!("Searching for the start of a channel"); - info!("Observed L1 block at height {}", block_info.height); + let BackfillData { + block_data: start_buff, + curr_fin_l1_lib_header: last_l1_lib_header, + } = self.search_for_channel_start().await?; - if let Some(l1_block) = self - .bedrock_client - .get_block_by_id(header_id) - .await? - { - info!("Extracted L1 block at height {}", block_info.height); + for BackfillBlockData { + l2_blocks: l2_block_vec, + l1_header, + } in start_buff { + for l2_block in l2_block_vec { + self.store.put_block(l2_block.clone(), l1_header)?; - let l2_blocks_parsed = parse_blocks( - l1_block.into_transactions().into_iter(), - &self.config.channel_id, - ).collect::>(); + yield Ok(l2_block); + } + } - let mut l2_blocks_parsed_ids: Vec<_> = l2_blocks_parsed.iter().map(|block| block.header.block_id).collect(); - l2_blocks_parsed_ids.sort(); - info!("Parsed {} L2 blocks with ids {:?}", l2_blocks_parsed.len(), l2_blocks_parsed_ids); + last_l1_lib_header + }, + }; - for l2_block in l2_blocks_parsed { - self.store.put_block(l2_block.clone())?; + info!("Searching for initial header finished"); + + info!("Starting backfilling from {prev_last_l1_lib_header}"); + + loop { + let BackfillData { + block_data: buff, + curr_fin_l1_lib_header, + } = self + .backfill_to_last_l1_lib_header_id(prev_last_l1_lib_header, &self.config.channel_id) + .await + .inspect_err(|err| error!("Failed to backfill to last l1 lib header id with err {err:#?}"))?; + + prev_last_l1_lib_header = curr_fin_l1_lib_header; + + for BackfillBlockData { + l2_blocks: l2_block_vec, + l1_header: header, + } in buff { + for l2_block in l2_block_vec { + self.store.put_block(l2_block.clone(), header)?; yield Ok(l2_block); } } } + } + } - // Refetch stream after delay - tokio::time::sleep(std::time::Duration::from_millis( - self.config.resubscribe_interval_millis, - )) - .await; + async fn get_lib(&self) -> Result { + Ok(self.bedrock_client.get_consensus_info().await?.lib) + } + + async fn get_next_lib(&self, prev_lib: HeaderId) -> Result { + loop { + let next_lib = self.get_lib().await?; + if next_lib != prev_lib { + break Ok(next_lib); + } else { + info!( + "Wait {}ms to not spam the node", + self.config.consensus_info_polling_interval_millis + ); + tokio::time::sleep(std::time::Duration::from_millis( + self.config.consensus_info_polling_interval_millis, + )) + .await; + } } + } + + /// WARNING: depending on channel state, + /// may take indefinite amount of time + pub async fn search_for_channel_start(&self) -> Result { + let mut curr_last_l1_lib_header = self.get_lib().await?; + let mut backfill_start = curr_last_l1_lib_header; + // ToDo: How to get root? + let mut backfill_limit = HeaderId::from([0; 32]); + // ToDo: Not scalable, initial buffer should be stored in DB to not run out of memory + // Don't want to complicate DB even more right now. + let mut block_buffer = VecDeque::new(); + + 'outer: loop { + let mut cycle_header = curr_last_l1_lib_header; + + loop { + let cycle_block = + if let Some(block) = self.bedrock_client.get_block_by_id(cycle_header).await? { + block + } else { + // First run can reach root easily + // so here we are optimistic about L1 + // failing to get parent. + break; + }; + + // It would be better to have id, but block does not have it, so slot will do. + info!( + "INITIAL SEARCH: Observed L1 block at slot {}", + cycle_block.header().slot().into_inner() + ); + debug!( + "INITIAL SEARCH: This block header is {}", + cycle_block.header().id() + ); + debug!( + "INITIAL SEARCH: This block parent is {}", + cycle_block.header().parent() + ); + + let (l2_block_vec, l1_header) = + parse_block_owned(&cycle_block, &self.config.channel_id); + + info!("Parsed {} L2 blocks", l2_block_vec.len()); + + if !l2_block_vec.is_empty() { + block_buffer.push_front(BackfillBlockData { + l2_blocks: l2_block_vec.clone(), + l1_header, + }); + } + + if let Some(first_l2_block) = l2_block_vec.first() + && first_l2_block.header.block_id == 1 + { + info!("INITIAL_SEARCH: Found channel start"); + break 'outer; + } + + // Step back to parent + let parent = cycle_block.header().parent(); + + if parent == backfill_limit { + break; + } + + cycle_header = parent; + } + + info!("INITIAL_SEARCH: Reached backfill limit, refetching last l1 lib header"); + + block_buffer.clear(); + backfill_limit = backfill_start; + curr_last_l1_lib_header = self.get_next_lib(curr_last_l1_lib_header).await?; + backfill_start = curr_last_l1_lib_header; } + + Ok(BackfillData { + block_data: block_buffer, + curr_fin_l1_lib_header: backfill_limit, + }) + } + + pub async fn backfill_to_last_l1_lib_header_id( + &self, + last_fin_l1_lib_header: HeaderId, + channel_id: &ChannelId, + ) -> Result { + let curr_fin_l1_lib_header = self.get_next_lib(last_fin_l1_lib_header).await?; + // ToDo: Not scalable, buffer should be stored in DB to not run out of memory + // Don't want to complicate DB even more right now. + let mut block_buffer = VecDeque::new(); + + let mut cycle_header = curr_fin_l1_lib_header; + loop { + let Some(cycle_block) = self.bedrock_client.get_block_by_id(cycle_header).await? else { + return Err(anyhow::anyhow!("Parent not found")); + }; + + if cycle_block.header().id() == last_fin_l1_lib_header { + break; + } else { + // Step back to parent + cycle_header = cycle_block.header().parent(); + } + + // It would be better to have id, but block does not have it, so slot will do. + info!( + "Observed L1 block at slot {}", + cycle_block.header().slot().into_inner() + ); + + let (l2_block_vec, l1_header) = parse_block_owned(&cycle_block, channel_id); + + info!("Parsed {} L2 blocks", l2_block_vec.len()); + + if !l2_block_vec.is_empty() { + block_buffer.push_front(BackfillBlockData { + l2_blocks: l2_block_vec, + l1_header, + }); + } + } + + Ok(BackfillData { + block_data: block_buffer, + curr_fin_l1_lib_header, + }) } } -fn parse_blocks( - block_txs: impl Iterator, +fn parse_block_owned( + l1_block: &bedrock_client::Block, decoded_channel_id: &ChannelId, -) -> impl Iterator { - block_txs.flat_map(|tx| { - tx.mantle_tx.ops.into_iter().filter_map(|op| match op { - Op::ChannelInscribe(InscriptionOp { - channel_id, - inscription, - .. - }) if channel_id == *decoded_channel_id => { - borsh::from_slice::(&inscription).ok() - } - _ => None, - }) - }) +) -> (Vec, HeaderId) { + ( + l1_block + .transactions() + .flat_map(|tx| { + tx.mantle_tx.ops.iter().filter_map(|op| match op { + Op::ChannelInscribe(InscriptionOp { + channel_id, + inscription, + .. + }) if channel_id == decoded_channel_id => { + borsh::from_slice::(inscription) + .inspect_err(|err| { + error!("Failed to deserialize our inscription with err: {err:#?}") + }) + .ok() + } + _ => None, + }) + }) + .collect(), + l1_block.header().id(), + ) } diff --git a/indexer/service/configs/indexer_config.json b/indexer/service/configs/indexer_config.json index 97461b8b..e748d96a 100644 --- a/indexer/service/configs/indexer_config.json +++ b/indexer/service/configs/indexer_config.json @@ -1,6 +1,6 @@ { "home": "./indexer/service", - "resubscribe_interval_millis": 1000, + "consensus_info_polling_interval_millis": 10000, "bedrock_client_config": { "addr": "http://localhost:8080", "backoff": { diff --git a/integration_tests/src/config.rs b/integration_tests/src/config.rs index 62e34860..445929dd 100644 --- a/integration_tests/src/config.rs +++ b/integration_tests/src/config.rs @@ -19,7 +19,7 @@ pub fn indexer_config( ) -> Result { Ok(IndexerConfig { home, - resubscribe_interval_millis: 1000, + consensus_info_polling_interval_millis: 10000, bedrock_client_config: ClientConfig { addr: addr_to_url(UrlProtocol::Http, bedrock_addr) .context("Failed to convert bedrock addr to URL")?, @@ -74,7 +74,7 @@ pub fn sequencer_config( max_num_tx_in_block, mempool_max_size, block_create_timeout_millis, - retry_pending_blocks_timeout_millis: 240_000, + retry_pending_blocks_timeout_millis: 120_000, port: 0, initial_accounts: initial_data.sequencer_initial_accounts(), initial_commitments: initial_data.sequencer_initial_commitments(), diff --git a/integration_tests/tests/indexer.rs b/integration_tests/tests/indexer.rs index d0bb80cf..d5207a41 100644 --- a/integration_tests/tests/indexer.rs +++ b/integration_tests/tests/indexer.rs @@ -12,10 +12,9 @@ use tokio::test; use wallet::cli::{Command, programs::native_token_transfer::AuthTransferSubcommand}; /// Timeout in milliseconds to reliably await for block finalization -const L2_TO_L1_TIMEOUT_MILLIS: u64 = 300000; +const L2_TO_L1_TIMEOUT_MILLIS: u64 = 600000; #[test] -#[ignore = "Not reliable with current bedrock node"] async fn indexer_test_run() -> Result<()> { let ctx = TestContext::new().await?; @@ -45,7 +44,6 @@ async fn indexer_test_run() -> Result<()> { } #[test] -#[ignore = "Not reliable with current bedrock node"] async fn indexer_block_batching() -> Result<()> { let ctx = TestContext::new().await?; @@ -81,7 +79,6 @@ async fn indexer_block_batching() -> Result<()> { } #[test] -#[ignore = "Not reliable with current bedrock node"] async fn indexer_state_consistency() -> Result<()> { let mut ctx = TestContext::new().await?; diff --git a/storage/src/indexer.rs b/storage/src/indexer.rs index e5e23455..94a1a1a0 100644 --- a/storage/src/indexer.rs +++ b/storage/src/indexer.rs @@ -24,6 +24,9 @@ pub const CACHE_SIZE: usize = 1000; pub const DB_META_FIRST_BLOCK_IN_DB_KEY: &str = "first_block_in_db"; /// Key base for storing metainformation about id of last current block in db pub const DB_META_LAST_BLOCK_IN_DB_KEY: &str = "last_block_in_db"; +/// Key base for storing metainformation about id of last observed L1 lib header in db +pub const DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY: &str = + "last_observed_l1_lib_header_in_db"; /// Key base for storing metainformation which describe if first block has been set pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set"; /// Key base for storing metainformation about the last breakpoint @@ -214,6 +217,37 @@ impl RocksDBIO { } } + pub fn get_meta_last_observed_l1_lib_header_in_db(&self) -> DbResult> { + let cf_meta = self.meta_column(); + let res = self + .db + .get_cf( + &cf_meta, + borsh::to_vec(&DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY).map_err( + |err| { + DbError::borsh_cast_message( + err, + Some( + "Failed to serialize DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY" + .to_string(), + ), + ) + }, + )?, + ) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + + res.map(|data| { + borsh::from_slice::<[u8; 32]>(&data).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to deserialize last l1 lib header".to_string()), + ) + }) + }) + .transpose() + } + pub fn get_meta_is_first_block_set(&self) -> DbResult { let cf_meta = self.meta_column(); let res = self @@ -281,7 +315,7 @@ impl RocksDBIO { ) .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; - self.put_block(block)?; + self.put_block(block, [0; 32])?; Ok(()) } @@ -307,6 +341,36 @@ impl RocksDBIO { Ok(()) } + pub fn put_meta_last_observed_l1_lib_header_in_db( + &self, + l1_lib_header: [u8; 32], + ) -> DbResult<()> { + let cf_meta = self.meta_column(); + self.db + .put_cf( + &cf_meta, + borsh::to_vec(&DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY).map_err( + |err| { + DbError::borsh_cast_message( + err, + Some( + "Failed to serialize DB_META_LAST_OBSERVED_L1_LIB_HEADER_ID_IN_DB_KEY" + .to_string(), + ), + ) + }, + )?, + borsh::to_vec(&l1_lib_header).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize last l1 block header".to_string()), + ) + })?, + ) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + Ok(()) + } + pub fn put_meta_last_breakpoint_id(&self, br_id: u64) -> DbResult<()> { let cf_meta = self.meta_column(); self.db @@ -348,7 +412,7 @@ impl RocksDBIO { // Block - pub fn put_block(&self, block: Block) -> DbResult<()> { + pub fn put_block(&self, block: Block, l1_lib_header: [u8; 32]) -> DbResult<()> { let cf_block = self.block_column(); let cf_hti = self.hash_to_id_column(); let cf_tti: Arc> = self.tx_hash_to_id_column(); @@ -377,6 +441,7 @@ impl RocksDBIO { if block.header.block_id > last_curr_block { self.put_meta_last_block_in_db(block.header.block_id)?; + self.put_meta_last_observed_l1_lib_header_in_db(l1_lib_header)?; } self.db @@ -954,7 +1019,7 @@ mod tests { let transfer_tx = transfer(1, 0, true); let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [1; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let first_id = dbio.get_meta_first_block_in_db().unwrap(); @@ -997,7 +1062,7 @@ mod tests { let transfer_tx = transfer(1, (i - 1) as u128, true); let block = common::test_utils::produce_dummy_block(i + 1, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [i as u8; 32]).unwrap(); } let last_id = dbio.get_meta_last_block_in_db().unwrap(); @@ -1051,7 +1116,7 @@ mod tests { let control_hash1 = block.header.hash; - dbio.put_block(block).unwrap(); + dbio.put_block(block, [1; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1062,7 +1127,7 @@ mod tests { let control_hash2 = block.header.hash; - dbio.put_block(block).unwrap(); + dbio.put_block(block, [2; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1073,7 +1138,7 @@ mod tests { let control_tx_hash1 = transfer_tx.hash(); let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [3; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1084,7 +1149,7 @@ mod tests { let control_tx_hash2 = transfer_tx.hash(); let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [4; 32]).unwrap(); let control_block_id1 = dbio.get_block_id_by_hash(control_hash1.0).unwrap(); let control_block_id2 = dbio.get_block_id_by_hash(control_hash2.0).unwrap(); @@ -1115,7 +1180,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]); block_res.push(block.clone()); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [1; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1125,7 +1190,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(3, Some(prev_hash), vec![transfer_tx]); block_res.push(block.clone()); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [2; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1135,7 +1200,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]); block_res.push(block.clone()); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [3; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1145,7 +1210,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]); block_res.push(block.clone()); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [4; 32]).unwrap(); let block_hashes_mem: Vec<[u8; 32]> = block_res.into_iter().map(|bl| bl.header.hash.0).collect(); @@ -1189,7 +1254,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [1; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1201,7 +1266,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(3, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [2; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1213,7 +1278,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [3; 32]).unwrap(); let last_id = dbio.get_meta_last_block_in_db().unwrap(); let last_block = dbio.get_block(last_id).unwrap(); @@ -1225,7 +1290,7 @@ mod tests { let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]); - dbio.put_block(block).unwrap(); + dbio.put_block(block, [4; 32]).unwrap(); let acc1_tx = dbio.get_acc_transactions(*acc1().value(), 0, 4).unwrap(); let acc1_tx_hashes: Vec<[u8; 32]> = acc1_tx.into_iter().map(|tx| tx.hash().0).collect();