fix: merge fix

This commit is contained in:
Pravdyvy 2026-02-03 11:36:07 +02:00
parent e1df915357
commit efac8639c3
18 changed files with 141 additions and 114 deletions

1
Cargo.lock generated
View File

@ -3324,6 +3324,7 @@ dependencies = [
"env_logger",
"futures",
"hex",
"indexer_core",
"indexer_service",
"key_protocol",
"log",

View File

@ -46,7 +46,7 @@ pub enum BedrockStatus {
Finalized,
}
#[derive(Debug, BorshSerialize, BorshDeserialize)]
#[derive(Debug, BorshSerialize, BorshDeserialize, Clone)]
pub struct Block {
pub header: BlockHeader,
pub body: BlockBody,

View File

@ -90,7 +90,6 @@ parse_request!(GetAccountsNoncesRequest);
parse_request!(GetProofForCommitmentRequest);
parse_request!(GetAccountRequest);
parse_request!(GetProgramIdsRequest);
parse_request!(PostIndexerMessageRequest);
parse_request!(GetGenesisBlockRequest);
#[derive(Serialize, Deserialize, Debug)]

View File

@ -18,12 +18,7 @@ use crate::{
rpc_primitives::{
self,
requests::{
GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest,
GetAccountsNoncesResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse,
GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse,
GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest,
GetProofForCommitmentResponse, GetTransactionByHashRequest,
GetTransactionByHashResponse, SendTxRequest, SendTxResponse,
GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse, GetGenesisBlockRequest, GetGenesisBlockResponse, GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest, GetTransactionByHashResponse, SendTxRequest, SendTxResponse
},
},
transaction::{EncodedTransaction, NSSATransaction},

View File

@ -1,4 +1,4 @@
use std::path::Path;
use std::{path::Path, sync::Arc};
use anyhow::Result;
use common::{
@ -8,8 +8,9 @@ use common::{
use nssa::V02State;
use storage::indexer::RocksDBIO;
#[derive(Clone)]
pub struct IndexerStore {
dbio: RocksDBIO,
dbio: Arc<RocksDBIO>,
}
impl IndexerStore {
@ -23,7 +24,7 @@ impl IndexerStore {
) -> Result<Self> {
let dbio = RocksDBIO::open_or_create(location, start_data)?;
Ok(Self { dbio })
Ok(Self { dbio: Arc::new(dbio) })
}
/// Reopening existing database

View File

@ -7,15 +7,13 @@ use std::{
use anyhow::{Context, Result};
use bedrock_client::BackoffConfig;
use common::{
block::{AccountInitialData, CommitmentsInitialData},
sequencer_client::BasicAuth,
};
block::{AccountInitialData, CommitmentsInitialData}, config::BasicAuth};
use logos_blockchain_core::mantle::ops::channel::ChannelId;
use serde::{Deserialize, Serialize};
use url::Url;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BedrockClientConfig {
pub struct ClientConfig {
pub addr: Url,
pub auth: Option<BasicAuth>,
}
@ -31,7 +29,8 @@ pub struct IndexerConfig {
pub resubscribe_interval_millis: u64,
/// For individual RPC requests we use Fibonacci backoff retry strategy.
pub backoff: BackoffConfig,
pub bedrock_client_config: BedrockClientConfig,
pub bedrock_client_config: ClientConfig,
pub sequencer_client_config: ClientConfig,
pub channel_id: ChannelId,
}

View File

@ -1,89 +1,120 @@
use std::sync::Arc;
use anyhow::Result;
use bedrock_client::BedrockClient;
use common::block::Block;
// ToDo: Remove after testnet
use common::PINATA_BASE58;
use common::{
block::Block,
sequencer_client::SequencerClient,
};
use futures::StreamExt;
use log::info;
use logos_blockchain_core::mantle::{
Op, SignedMantleTx,
ops::channel::{ChannelId, inscribe::InscriptionOp},
};
use tokio::sync::RwLock;
use crate::{config::IndexerConfig, state::IndexerState};
use crate::{block_store::IndexerStore, config::IndexerConfig};
pub mod block_store;
pub mod config;
pub mod state;
#[derive(Clone)]
pub struct IndexerCore {
bedrock_client: BedrockClient,
config: IndexerConfig,
state: IndexerState,
pub bedrock_client: BedrockClient,
pub sequencer_client: SequencerClient,
pub config: IndexerConfig,
pub store: IndexerStore,
}
impl IndexerCore {
pub fn new(config: IndexerConfig) -> Result<Self> {
pub async fn new(config: IndexerConfig) -> Result<Self> {
let sequencer_client = SequencerClient::new_with_auth(
config.sequencer_client_config.addr.clone(),
config.sequencer_client_config.auth.clone(),
)?;
let start_block = sequencer_client.get_genesis_block().await?;
let initial_commitments: Vec<nssa_core::Commitment> = config
.initial_commitments
.iter()
.map(|init_comm_data| {
let npk = &init_comm_data.npk;
let mut acc = init_comm_data.account.clone();
acc.program_owner = nssa::program::Program::authenticated_transfer_program().id();
nssa_core::Commitment::new(npk, &acc)
})
.collect();
let init_accs: Vec<(nssa::AccountId, u128)> = config
.initial_accounts
.iter()
.map(|acc_data| (acc_data.account_id.parse().unwrap(), acc_data.balance))
.collect();
let mut state = nssa::V02State::new_with_genesis_accounts(&init_accs, &initial_commitments);
// ToDo: Remove after testnet
state.add_pinata_program(PINATA_BASE58.parse().unwrap());
let home = config.home.clone();
Ok(Self {
bedrock_client: BedrockClient::new(
config.bedrock_client_config.auth.clone().map(Into::into),
config.bedrock_client_config.addr.clone(),
)?,
sequencer_client,
config,
// No state setup for now, future task.
state: IndexerState {
latest_seen_block: Arc::new(RwLock::new(0)),
},
// ToDo: Implement restarts
store: IndexerStore::open_db_with_genesis(&home, Some((start_block, state)))?,
})
}
pub async fn subscribe_parse_block_stream(&self) -> impl futures::Stream<Item = Result<Block>> {
async_stream::stream! {
loop {
let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?);
loop {
let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?);
info!("Block stream joined");
info!("Block stream joined");
while let Some(block_info) = stream_pinned.next().await {
let header_id = block_info.header_id;
while let Some(block_info) = stream_pinned.next().await {
let header_id = block_info.header_id;
info!("Observed L1 block at height {}", block_info.height);
info!("Observed L1 block at height {}", block_info.height);
if let Some(l1_block) = self
.bedrock_client
.get_block_by_id(header_id, &self.config.backoff)
.await?
{
info!("Extracted L1 block at height {}", block_info.height);
if let Some(l1_block) = self
.bedrock_client
.get_block_by_id(header_id, &self.config.backoff)
.await?
{
info!("Extracted L1 block at height {}", block_info.height);
let l2_blocks_parsed = parse_blocks(
l1_block.into_transactions().into_iter(),
&self.config.channel_id,
).collect::<Vec<_>>();
let l2_blocks_parsed = parse_blocks(
l1_block.into_transactions().into_iter(),
&self.config.channel_id,
).collect::<Vec<_>>();
info!("Parsed {} L2 blocks", l2_blocks_parsed.len());
info!("Parsed {} L2 blocks", l2_blocks_parsed.len());
for l2_block in l2_blocks_parsed {
// State modification, will be updated in future
{
let mut guard = self.state.latest_seen_block.write().await;
if l2_block.header.block_id > *guard {
*guard = l2_block.header.block_id;
}
}
for l2_block in l2_blocks_parsed {
self.store.put_block(l2_block.clone())?;
yield Ok(l2_block);
}
yield Ok(l2_block);
}
}
}
// Refetch stream after delay
tokio::time::sleep(std::time::Duration::from_millis(
self.config.resubscribe_interval_millis,
))
.await;
}
// Refetch stream after delay
tokio::time::sleep(std::time::Duration::from_millis(
self.config.resubscribe_interval_millis,
))
.await;
}
}
}
}
@ -104,4 +135,4 @@ fn parse_blocks(
_ => None,
})
})
}
}

View File

@ -72,7 +72,7 @@ pub async fn run_server(config: IndexerConfig, port: u16) -> Result<IndexerHandl
#[cfg(not(feature = "mock-responses"))]
let handle = {
let service =
service::IndexerService::new(config).context("Failed to initialize indexer service")?;
service::IndexerService::new(config).await.context("Failed to initialize indexer service")?;
server.start(service.into_rpc())
};
#[cfg(feature = "mock-responses")]

View File

@ -22,8 +22,8 @@ pub struct IndexerService {
}
impl IndexerService {
pub fn new(config: IndexerConfig) -> Result<Self> {
let indexer = IndexerCore::new(config)?;
pub async fn new(config: IndexerConfig) -> Result<Self> {
let indexer = IndexerCore::new(config).await?;
let subscription_service = SubscriptionService::spawn_new(indexer.clone());
Ok(Self {

View File

@ -12,8 +12,9 @@ wallet.workspace = true
common.workspace = true
key_protocol.workspace = true
indexer_service.workspace = true
url.workspace = true
indexer_core.workspace = true
url.workspace = true
anyhow.workspace = true
env_logger.workspace = true
log.workspace = true

View File

@ -1,28 +1,30 @@
use std::net::SocketAddr;
use anyhow::Result;
use indexer_service::{BackoffConfig, BedrockClientConfig, ChannelId, IndexerConfig};
use indexer_service::IndexerConfig;
use url::Url;
pub fn indexer_config(bedrock_addr: SocketAddr) -> IndexerConfig {
let channel_id: [u8; 32] = [0u8, 1]
.repeat(16)
.try_into()
.unwrap_or_else(|_| unreachable!());
let channel_id = ChannelId::try_from(channel_id).expect("Failed to create channel ID");
todo!()
IndexerConfig {
resubscribe_interval_millis: 1000,
backoff: BackoffConfig {
start_delay_millis: 100,
max_retries: 10,
},
bedrock_client_config: BedrockClientConfig {
addr: addr_to_http_url(bedrock_addr).expect("Failed to convert bedrock addr to URL"),
auth: None,
},
channel_id,
}
// let channel_id: [u8; 32] = [0u8, 1]
// .repeat(16)
// .try_into()
// .unwrap_or_else(|_| unreachable!());
// let channel_id = ChannelId::try_from(channel_id).expect("Failed to create channel ID");
// IndexerConfig {
// resubscribe_interval_millis: 1000,
// backoff: BackoffConfig {
// start_delay_millis: 100,
// max_retries: 10,
// },
// bedrock_client_config: BedrockClientConfig {
// addr: addr_to_http_url(bedrock_addr).expect("Failed to convert bedrock addr to URL"),
// auth: None,
// },
// channel_id,
// }
}
fn addr_to_http_url(addr: SocketAddr) -> Result<Url> {

View File

@ -44,7 +44,6 @@ pub struct TestContext {
_indexer_handle: IndexerHandle,
_temp_sequencer_dir: TempDir,
_temp_wallet_dir: TempDir,
_temp_indexer_dir: Option<TempDir>,
}
impl TestContext {

View File

@ -3,21 +3,21 @@ use integration_tests::TestContext;
use log::info;
use tokio::test;
#[ignore = "needs complicated setup"]
#[test]
// #[ignore = "needs complicated setup"]
// #[test]
// To run this test properly, you need nomos node running in the background.
// For instructions in building nomos node, refer to [this](https://github.com/logos-blockchain/logos-blockchain?tab=readme-ov-file#running-a-logos-blockchain-node).
//
// Recommended to run node locally from build binary.
async fn indexer_run_local_node() -> Result<()> {
let _ctx = TestContext::new_bedrock_local_attached().await?;
// async fn indexer_run_local_node() -> Result<()> {
// let _ctx = TestContext::new_bedrock_local_attached().await?;
info!("Let's observe behaviour");
// info!("Let's observe behaviour");
tokio::time::sleep(std::time::Duration::from_secs(180)).await;
// tokio::time::sleep(std::time::Duration::from_secs(180)).await;
// No way to check state of indexer now
// When it will be a service, then it will become possible.
// // No way to check state of indexer now
// // When it will be a service, then it will become possible.
Ok(())
}
// Ok(())
// }

View File

@ -26,9 +26,8 @@ pub async fn tps_test() -> Result<()> {
let target_tps = 12;
let tps_test = TpsTestManager::new(target_tps, num_transactions);
let ctx = TestContext::new_with_sequencer_and_maybe_indexer_configs(
tps_test.generate_sequencer_config(),
None,
let ctx = TestContext::new_with_sequencer_config(
tps_test.generate_sequencer_config()
)
.await?;

View File

@ -6,9 +6,7 @@ use std::{
use anyhow::Result;
use common::{
block::{AccountInitialData, CommitmentsInitialData},
sequencer_client::BasicAuth,
};
block::{AccountInitialData, CommitmentsInitialData}, config::BasicAuth};
use logos_blockchain_core::mantle::ops::channel::ChannelId;
use serde::{Deserialize, Serialize};
use url::Url;

View File

@ -1,4 +1,4 @@
use std::{fmt::Display, sync::Arc, time::Instant};
use std::{sync::Arc, time::Instant};
use anyhow::Result;
#[cfg(feature = "testnet")]

View File

@ -11,15 +11,7 @@ use common::{
message::{Message, Request},
parser::RpcRequest,
requests::{
GetAccountBalanceRequest, GetAccountBalanceResponse, GetAccountRequest,
GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse,
GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest,
GetBlockRangeDataResponse, GetGenesisIdRequest, GetGenesisIdResponse,
GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse,
GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest,
GetProofForCommitmentResponse, GetTransactionByHashRequest,
GetTransactionByHashResponse, HelloRequest, HelloResponse, SendTxRequest,
SendTxResponse,
GetAccountBalanceRequest, GetAccountBalanceResponse, GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse, GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse, GetGenesisBlockRequest, GetGenesisBlockResponse, GetGenesisIdRequest, GetGenesisIdResponse, GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest, GetTransactionByHashResponse, HelloRequest, HelloResponse, SendTxRequest, SendTxResponse
},
},
transaction::{
@ -363,8 +355,7 @@ mod tests {
use base58::ToBase58;
use base64::{Engine, engine::general_purpose};
use common::{
config::BasicAuth, test_utils::sequencer_sign_key_for_testing,
transaction::EncodedTransaction,
block::AccountInitialData, config::BasicAuth, test_utils::sequencer_sign_key_for_testing, transaction::EncodedTransaction
};
use sequencer_core::{
SequencerCore,

View File

@ -1,3 +1,14 @@
use std::{path::Path, sync::Arc};
use common::
block::Block;
use nssa::V02State;
use rocksdb::{
BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, WriteBatch,
};
use crate::error::DbError;
/// Maximal size of stored blocks in base
///
/// Used to control db size