From efac8639c3206c65d2e2863294f3934ee3e92856 Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Tue, 3 Feb 2026 11:36:07 +0200 Subject: [PATCH] fix: merge fix --- Cargo.lock | 1 + common/src/block.rs | 2 +- common/src/rpc_primitives/requests.rs | 1 - common/src/sequencer_client.rs | 7 +- indexer/core/src/block_store.rs | 7 +- indexer/core/src/config.rs | 9 +- indexer/core/src/lib.rs | 125 ++++++++++++++++---------- indexer/service/src/lib.rs | 2 +- indexer/service/src/service.rs | 4 +- integration_tests/Cargo.toml | 3 +- integration_tests/src/config.rs | 38 ++++---- integration_tests/src/lib.rs | 1 - integration_tests/tests/indexer.rs | 20 ++--- integration_tests/tests/tps.rs | 5 +- sequencer_core/src/config.rs | 4 +- sequencer_core/src/lib.rs | 2 +- sequencer_rpc/src/process.rs | 13 +-- storage/src/sequencer.rs | 11 +++ 18 files changed, 141 insertions(+), 114 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9518d358..2a573d1b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3324,6 +3324,7 @@ dependencies = [ "env_logger", "futures", "hex", + "indexer_core", "indexer_service", "key_protocol", "log", diff --git a/common/src/block.rs b/common/src/block.rs index 2c2e4541..9af16fb6 100644 --- a/common/src/block.rs +++ b/common/src/block.rs @@ -46,7 +46,7 @@ pub enum BedrockStatus { Finalized, } -#[derive(Debug, BorshSerialize, BorshDeserialize)] +#[derive(Debug, BorshSerialize, BorshDeserialize, Clone)] pub struct Block { pub header: BlockHeader, pub body: BlockBody, diff --git a/common/src/rpc_primitives/requests.rs b/common/src/rpc_primitives/requests.rs index 526cdb67..c843eadd 100644 --- a/common/src/rpc_primitives/requests.rs +++ b/common/src/rpc_primitives/requests.rs @@ -90,7 +90,6 @@ parse_request!(GetAccountsNoncesRequest); parse_request!(GetProofForCommitmentRequest); parse_request!(GetAccountRequest); parse_request!(GetProgramIdsRequest); -parse_request!(PostIndexerMessageRequest); parse_request!(GetGenesisBlockRequest); #[derive(Serialize, Deserialize, Debug)] diff --git a/common/src/sequencer_client.rs b/common/src/sequencer_client.rs index 8d846421..4c97ea1e 100644 --- a/common/src/sequencer_client.rs +++ b/common/src/sequencer_client.rs @@ -18,12 +18,7 @@ use crate::{ rpc_primitives::{ self, requests::{ - GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, - GetAccountsNoncesResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse, - GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse, - GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, - GetProofForCommitmentResponse, GetTransactionByHashRequest, - GetTransactionByHashResponse, SendTxRequest, SendTxResponse, + GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse, GetGenesisBlockRequest, GetGenesisBlockResponse, GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest, GetTransactionByHashResponse, SendTxRequest, SendTxResponse }, }, transaction::{EncodedTransaction, NSSATransaction}, diff --git a/indexer/core/src/block_store.rs b/indexer/core/src/block_store.rs index 0ec00c4f..35037309 100644 --- a/indexer/core/src/block_store.rs +++ b/indexer/core/src/block_store.rs @@ -1,4 +1,4 @@ -use std::path::Path; +use std::{path::Path, sync::Arc}; use anyhow::Result; use common::{ @@ -8,8 +8,9 @@ use common::{ use nssa::V02State; use storage::indexer::RocksDBIO; +#[derive(Clone)] pub struct IndexerStore { - dbio: RocksDBIO, + dbio: Arc, } impl IndexerStore { @@ -23,7 +24,7 @@ impl IndexerStore { ) -> Result { let dbio = RocksDBIO::open_or_create(location, start_data)?; - Ok(Self { dbio }) + Ok(Self { dbio: Arc::new(dbio) }) } /// Reopening existing database diff --git a/indexer/core/src/config.rs b/indexer/core/src/config.rs index 0127b30b..152b6916 100644 --- a/indexer/core/src/config.rs +++ b/indexer/core/src/config.rs @@ -7,15 +7,13 @@ use std::{ use anyhow::{Context, Result}; use bedrock_client::BackoffConfig; use common::{ - block::{AccountInitialData, CommitmentsInitialData}, - sequencer_client::BasicAuth, -}; + block::{AccountInitialData, CommitmentsInitialData}, config::BasicAuth}; use logos_blockchain_core::mantle::ops::channel::ChannelId; use serde::{Deserialize, Serialize}; use url::Url; #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct BedrockClientConfig { +pub struct ClientConfig { pub addr: Url, pub auth: Option, } @@ -31,7 +29,8 @@ pub struct IndexerConfig { pub resubscribe_interval_millis: u64, /// For individual RPC requests we use Fibonacci backoff retry strategy. pub backoff: BackoffConfig, - pub bedrock_client_config: BedrockClientConfig, + pub bedrock_client_config: ClientConfig, + pub sequencer_client_config: ClientConfig, pub channel_id: ChannelId, } diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index 78c1e5ce..cb4d82f6 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -1,89 +1,120 @@ -use std::sync::Arc; - use anyhow::Result; use bedrock_client::BedrockClient; -use common::block::Block; +// ToDo: Remove after testnet +use common::PINATA_BASE58; +use common::{ + block::Block, + sequencer_client::SequencerClient, +}; use futures::StreamExt; use log::info; use logos_blockchain_core::mantle::{ Op, SignedMantleTx, ops::channel::{ChannelId, inscribe::InscriptionOp}, }; -use tokio::sync::RwLock; -use crate::{config::IndexerConfig, state::IndexerState}; +use crate::{block_store::IndexerStore, config::IndexerConfig}; +pub mod block_store; pub mod config; pub mod state; #[derive(Clone)] pub struct IndexerCore { - bedrock_client: BedrockClient, - config: IndexerConfig, - state: IndexerState, + pub bedrock_client: BedrockClient, + pub sequencer_client: SequencerClient, + pub config: IndexerConfig, + pub store: IndexerStore, } impl IndexerCore { - pub fn new(config: IndexerConfig) -> Result { + pub async fn new(config: IndexerConfig) -> Result { + let sequencer_client = SequencerClient::new_with_auth( + config.sequencer_client_config.addr.clone(), + config.sequencer_client_config.auth.clone(), + )?; + + let start_block = sequencer_client.get_genesis_block().await?; + + let initial_commitments: Vec = config + .initial_commitments + .iter() + .map(|init_comm_data| { + let npk = &init_comm_data.npk; + + let mut acc = init_comm_data.account.clone(); + + acc.program_owner = nssa::program::Program::authenticated_transfer_program().id(); + + nssa_core::Commitment::new(npk, &acc) + }) + .collect(); + + let init_accs: Vec<(nssa::AccountId, u128)> = config + .initial_accounts + .iter() + .map(|acc_data| (acc_data.account_id.parse().unwrap(), acc_data.balance)) + .collect(); + + let mut state = nssa::V02State::new_with_genesis_accounts(&init_accs, &initial_commitments); + + // ToDo: Remove after testnet + state.add_pinata_program(PINATA_BASE58.parse().unwrap()); + + let home = config.home.clone(); + Ok(Self { bedrock_client: BedrockClient::new( config.bedrock_client_config.auth.clone().map(Into::into), config.bedrock_client_config.addr.clone(), )?, + sequencer_client, config, - // No state setup for now, future task. - state: IndexerState { - latest_seen_block: Arc::new(RwLock::new(0)), - }, + // ToDo: Implement restarts + store: IndexerStore::open_db_with_genesis(&home, Some((start_block, state)))?, }) } pub async fn subscribe_parse_block_stream(&self) -> impl futures::Stream> { async_stream::stream! { - loop { - let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?); + loop { + let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?); - info!("Block stream joined"); + info!("Block stream joined"); - while let Some(block_info) = stream_pinned.next().await { - let header_id = block_info.header_id; + while let Some(block_info) = stream_pinned.next().await { + let header_id = block_info.header_id; - info!("Observed L1 block at height {}", block_info.height); + info!("Observed L1 block at height {}", block_info.height); - if let Some(l1_block) = self - .bedrock_client - .get_block_by_id(header_id, &self.config.backoff) - .await? - { - info!("Extracted L1 block at height {}", block_info.height); + if let Some(l1_block) = self + .bedrock_client + .get_block_by_id(header_id, &self.config.backoff) + .await? + { + info!("Extracted L1 block at height {}", block_info.height); - let l2_blocks_parsed = parse_blocks( - l1_block.into_transactions().into_iter(), - &self.config.channel_id, - ).collect::>(); + let l2_blocks_parsed = parse_blocks( + l1_block.into_transactions().into_iter(), + &self.config.channel_id, + ).collect::>(); - info!("Parsed {} L2 blocks", l2_blocks_parsed.len()); + info!("Parsed {} L2 blocks", l2_blocks_parsed.len()); - for l2_block in l2_blocks_parsed { - // State modification, will be updated in future - { - let mut guard = self.state.latest_seen_block.write().await; - if l2_block.header.block_id > *guard { - *guard = l2_block.header.block_id; - } - } + for l2_block in l2_blocks_parsed { + self.store.put_block(l2_block.clone())?; - yield Ok(l2_block); - } + yield Ok(l2_block); } } + } - // Refetch stream after delay - tokio::time::sleep(std::time::Duration::from_millis( - self.config.resubscribe_interval_millis, - )) - .await; - } + // Refetch stream after delay + tokio::time::sleep(std::time::Duration::from_millis( + self.config.resubscribe_interval_millis, + )) + .await; + } } } } @@ -104,4 +135,4 @@ fn parse_blocks( _ => None, }) }) -} +} \ No newline at end of file diff --git a/indexer/service/src/lib.rs b/indexer/service/src/lib.rs index 8185cb40..c9a60bcc 100644 --- a/indexer/service/src/lib.rs +++ b/indexer/service/src/lib.rs @@ -72,7 +72,7 @@ pub async fn run_server(config: IndexerConfig, port: u16) -> Result Result { - let indexer = IndexerCore::new(config)?; + pub async fn new(config: IndexerConfig) -> Result { + let indexer = IndexerCore::new(config).await?; let subscription_service = SubscriptionService::spawn_new(indexer.clone()); Ok(Self { diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index 74fbd557..614f809e 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -12,8 +12,9 @@ wallet.workspace = true common.workspace = true key_protocol.workspace = true indexer_service.workspace = true -url.workspace = true +indexer_core.workspace = true +url.workspace = true anyhow.workspace = true env_logger.workspace = true log.workspace = true diff --git a/integration_tests/src/config.rs b/integration_tests/src/config.rs index 16d577b5..4c789076 100644 --- a/integration_tests/src/config.rs +++ b/integration_tests/src/config.rs @@ -1,28 +1,30 @@ use std::net::SocketAddr; use anyhow::Result; -use indexer_service::{BackoffConfig, BedrockClientConfig, ChannelId, IndexerConfig}; +use indexer_service::IndexerConfig; use url::Url; pub fn indexer_config(bedrock_addr: SocketAddr) -> IndexerConfig { - let channel_id: [u8; 32] = [0u8, 1] - .repeat(16) - .try_into() - .unwrap_or_else(|_| unreachable!()); - let channel_id = ChannelId::try_from(channel_id).expect("Failed to create channel ID"); + todo!() - IndexerConfig { - resubscribe_interval_millis: 1000, - backoff: BackoffConfig { - start_delay_millis: 100, - max_retries: 10, - }, - bedrock_client_config: BedrockClientConfig { - addr: addr_to_http_url(bedrock_addr).expect("Failed to convert bedrock addr to URL"), - auth: None, - }, - channel_id, - } + // let channel_id: [u8; 32] = [0u8, 1] + // .repeat(16) + // .try_into() + // .unwrap_or_else(|_| unreachable!()); + // let channel_id = ChannelId::try_from(channel_id).expect("Failed to create channel ID"); + + // IndexerConfig { + // resubscribe_interval_millis: 1000, + // backoff: BackoffConfig { + // start_delay_millis: 100, + // max_retries: 10, + // }, + // bedrock_client_config: BedrockClientConfig { + // addr: addr_to_http_url(bedrock_addr).expect("Failed to convert bedrock addr to URL"), + // auth: None, + // }, + // channel_id, + // } } fn addr_to_http_url(addr: SocketAddr) -> Result { diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index 818d8f1a..24dccbf8 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -44,7 +44,6 @@ pub struct TestContext { _indexer_handle: IndexerHandle, _temp_sequencer_dir: TempDir, _temp_wallet_dir: TempDir, - _temp_indexer_dir: Option, } impl TestContext { diff --git a/integration_tests/tests/indexer.rs b/integration_tests/tests/indexer.rs index b25c887b..b43b0c9c 100644 --- a/integration_tests/tests/indexer.rs +++ b/integration_tests/tests/indexer.rs @@ -3,21 +3,21 @@ use integration_tests::TestContext; use log::info; use tokio::test; -#[ignore = "needs complicated setup"] -#[test] +// #[ignore = "needs complicated setup"] +// #[test] // To run this test properly, you need nomos node running in the background. // For instructions in building nomos node, refer to [this](https://github.com/logos-blockchain/logos-blockchain?tab=readme-ov-file#running-a-logos-blockchain-node). // // Recommended to run node locally from build binary. -async fn indexer_run_local_node() -> Result<()> { - let _ctx = TestContext::new_bedrock_local_attached().await?; +// async fn indexer_run_local_node() -> Result<()> { +// let _ctx = TestContext::new_bedrock_local_attached().await?; - info!("Let's observe behaviour"); +// info!("Let's observe behaviour"); - tokio::time::sleep(std::time::Duration::from_secs(180)).await; +// tokio::time::sleep(std::time::Duration::from_secs(180)).await; - // No way to check state of indexer now - // When it will be a service, then it will become possible. +// // No way to check state of indexer now +// // When it will be a service, then it will become possible. - Ok(()) -} +// Ok(()) +// } diff --git a/integration_tests/tests/tps.rs b/integration_tests/tests/tps.rs index fdbba823..6d451dcc 100644 --- a/integration_tests/tests/tps.rs +++ b/integration_tests/tests/tps.rs @@ -26,9 +26,8 @@ pub async fn tps_test() -> Result<()> { let target_tps = 12; let tps_test = TpsTestManager::new(target_tps, num_transactions); - let ctx = TestContext::new_with_sequencer_and_maybe_indexer_configs( - tps_test.generate_sequencer_config(), - None, + let ctx = TestContext::new_with_sequencer_config( + tps_test.generate_sequencer_config() ) .await?; diff --git a/sequencer_core/src/config.rs b/sequencer_core/src/config.rs index 4dc1ed1c..6cae44cb 100644 --- a/sequencer_core/src/config.rs +++ b/sequencer_core/src/config.rs @@ -6,9 +6,7 @@ use std::{ use anyhow::Result; use common::{ - block::{AccountInitialData, CommitmentsInitialData}, - sequencer_client::BasicAuth, -}; + block::{AccountInitialData, CommitmentsInitialData}, config::BasicAuth}; use logos_blockchain_core::mantle::ops::channel::ChannelId; use serde::{Deserialize, Serialize}; use url::Url; diff --git a/sequencer_core/src/lib.rs b/sequencer_core/src/lib.rs index 552f9bbd..e874a764 100644 --- a/sequencer_core/src/lib.rs +++ b/sequencer_core/src/lib.rs @@ -1,4 +1,4 @@ -use std::{fmt::Display, sync::Arc, time::Instant}; +use std::{sync::Arc, time::Instant}; use anyhow::Result; #[cfg(feature = "testnet")] diff --git a/sequencer_rpc/src/process.rs b/sequencer_rpc/src/process.rs index a03ac32e..285e482c 100644 --- a/sequencer_rpc/src/process.rs +++ b/sequencer_rpc/src/process.rs @@ -11,15 +11,7 @@ use common::{ message::{Message, Request}, parser::RpcRequest, requests::{ - GetAccountBalanceRequest, GetAccountBalanceResponse, GetAccountRequest, - GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse, - GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest, - GetBlockRangeDataResponse, GetGenesisIdRequest, GetGenesisIdResponse, - GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse, - GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, - GetProofForCommitmentResponse, GetTransactionByHashRequest, - GetTransactionByHashResponse, HelloRequest, HelloResponse, SendTxRequest, - SendTxResponse, + GetAccountBalanceRequest, GetAccountBalanceResponse, GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse, GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse, GetGenesisBlockRequest, GetGenesisBlockResponse, GetGenesisIdRequest, GetGenesisIdResponse, GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest, GetTransactionByHashResponse, HelloRequest, HelloResponse, SendTxRequest, SendTxResponse }, }, transaction::{ @@ -363,8 +355,7 @@ mod tests { use base58::ToBase58; use base64::{Engine, engine::general_purpose}; use common::{ - config::BasicAuth, test_utils::sequencer_sign_key_for_testing, - transaction::EncodedTransaction, + block::AccountInitialData, config::BasicAuth, test_utils::sequencer_sign_key_for_testing, transaction::EncodedTransaction }; use sequencer_core::{ SequencerCore, diff --git a/storage/src/sequencer.rs b/storage/src/sequencer.rs index 40959eab..d0985599 100644 --- a/storage/src/sequencer.rs +++ b/storage/src/sequencer.rs @@ -1,3 +1,14 @@ +use std::{path::Path, sync::Arc}; + +use common:: + block::Block; +use nssa::V02State; +use rocksdb::{ + BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, WriteBatch, +}; + +use crate::error::DbError; + /// Maximal size of stored blocks in base /// /// Used to control db size