From 71a7905a5845217ce7661db01c48a8be6cf33c48 Mon Sep 17 00:00:00 2001 From: Daniil Polyakov Date: Mon, 2 Feb 2026 19:07:24 +0300 Subject: [PATCH] Integrate Indexer Service into integration tests --- Cargo.lock | 2 +- common/src/lib.rs | 3 +- indexer/core/src/config.rs | 4 +- indexer/service/src/lib.rs | 78 +++++++++++++++ indexer/service/src/main.rs | 43 +------- integration_tests/Cargo.toml | 2 +- .../configs/indexer/indexer_config.json | 17 ---- integration_tests/src/config.rs | 39 ++++++++ integration_tests/src/lib.rs | 98 +++++-------------- sequencer_runner/src/lib.rs | 6 +- 10 files changed, 152 insertions(+), 140 deletions(-) delete mode 100644 integration_tests/configs/indexer/indexer_config.json create mode 100644 integration_tests/src/config.rs diff --git a/Cargo.lock b/Cargo.lock index 604ad55f..e0690a13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3321,7 +3321,7 @@ dependencies = [ "env_logger", "futures", "hex", - "indexer_core", + "indexer_service", "key_protocol", "log", "nssa", diff --git a/common/src/lib.rs b/common/src/lib.rs index 21c0796d..d44fd30f 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -8,6 +8,7 @@ pub mod transaction; // Module for tests utility functions // TODO: Compile only for tests pub mod test_utils; -pub type HashType = [u8; 32]; pub const PINATA_BASE58: &str = "EfQhKQAkX2FJiwNii2WFQsGndjvF1Mzd7RuVe7QdPLw7"; + +pub type HashType = [u8; 32]; diff --git a/indexer/core/src/config.rs b/indexer/core/src/config.rs index db7855ba..c05aa5cc 100644 --- a/indexer/core/src/config.rs +++ b/indexer/core/src/config.rs @@ -1,9 +1,9 @@ use std::{fs::File, io::BufReader, path::Path}; use anyhow::{Context as _, Result}; -use bedrock_client::BackoffConfig; +pub use bedrock_client::BackoffConfig; use common::config::BasicAuth; -use logos_blockchain_core::mantle::ops::channel::ChannelId; +pub use logos_blockchain_core::mantle::ops::channel::ChannelId; use serde::{Deserialize, Serialize}; use url::Url; diff --git a/indexer/service/src/lib.rs b/indexer/service/src/lib.rs index 0c18410e..8185cb40 100644 --- a/indexer/service/src/lib.rs +++ b/indexer/service/src/lib.rs @@ -1,4 +1,82 @@ +use std::net::SocketAddr; + +use anyhow::{Context as _, Result}; +pub use indexer_core::config::*; +use indexer_service_rpc::RpcServer as _; +use jsonrpsee::server::Server; +use log::{error, info}; + pub mod service; #[cfg(feature = "mock-responses")] pub mod mock_service; + +pub struct IndexerHandle { + addr: SocketAddr, + server_handle: Option, +} +impl IndexerHandle { + fn new(addr: SocketAddr, server_handle: jsonrpsee::server::ServerHandle) -> Self { + Self { + addr, + server_handle: Some(server_handle), + } + } + + pub fn addr(&self) -> SocketAddr { + self.addr + } + + pub async fn stopped(mut self) { + let handle = self + .server_handle + .take() + .expect("Indexer server handle is set"); + + handle.stopped().await + } +} + +impl Drop for IndexerHandle { + fn drop(&mut self) { + let Self { + addr: _, + server_handle, + } = self; + + let Some(handle) = server_handle else { + return; + }; + + if let Err(err) = handle.stop() { + error!("An error occurred while stopping Indexer RPC server: {err}"); + } + } +} + +pub async fn run_server(config: IndexerConfig, port: u16) -> Result { + #[cfg(feature = "mock-responses")] + let _ = config; + + let server = Server::builder() + .build(SocketAddr::from(([0, 0, 0, 0], port))) + .await + .context("Failed to build RPC server")?; + + let addr = server + .local_addr() + .context("Failed to get local address of RPC server")?; + + info!("Starting Indexer Service RPC server on {addr}"); + + #[cfg(not(feature = "mock-responses"))] + let handle = { + let service = + service::IndexerService::new(config).context("Failed to initialize indexer service")?; + server.start(service.into_rpc()) + }; + #[cfg(feature = "mock-responses")] + let handle = server.start(mock_service::MockIndexerService::new_with_mock_blocks().into_rpc()); + + Ok(IndexerHandle::new(addr, handle)) +} diff --git a/indexer/service/src/main.rs b/indexer/service/src/main.rs index e0ece691..e4d18feb 100644 --- a/indexer/service/src/main.rs +++ b/indexer/service/src/main.rs @@ -1,10 +1,7 @@ -use std::{net::SocketAddr, path::PathBuf}; +use std::path::PathBuf; -use anyhow::{Context as _, Result}; +use anyhow::Result; use clap::Parser; -use indexer_core::config::IndexerConfig; -use indexer_service_rpc::RpcServer as _; -use jsonrpsee::server::Server; use log::{error, info}; use tokio_util::sync::CancellationToken; @@ -25,14 +22,14 @@ async fn main() -> Result<()> { let cancellation_token = listen_for_shutdown_signal(); - let handle = run_server(config_path, port).await?; - let handle_clone = handle.clone(); + let config = indexer_service::IndexerConfig::from_path(&config_path)?; + let indexer_handle = indexer_service::run_server(config, port).await?; tokio::select! { _ = cancellation_token.cancelled() => { info!("Shutting down server..."); } - _ = handle_clone.stopped() => { + _ = indexer_handle.stopped() => { error!("Server stopped unexpectedly"); } } @@ -42,36 +39,6 @@ async fn main() -> Result<()> { Ok(()) } -async fn run_server(config_path: PathBuf, port: u16) -> Result { - let config = IndexerConfig::from_path(&config_path)?; - #[cfg(feature = "mock-responses")] - let _ = config; - - let server = Server::builder() - .build(SocketAddr::from(([0, 0, 0, 0], port))) - .await - .context("Failed to build RPC server")?; - - let addr = server - .local_addr() - .context("Failed to get local address of RPC server")?; - - info!("Starting Indexer Service RPC server on {addr}"); - - #[cfg(not(feature = "mock-responses"))] - let handle = { - let service = indexer_service::service::IndexerService::new(config) - .context("Failed to initialize indexer service")?; - server.start(service.into_rpc()) - }; - #[cfg(feature = "mock-responses")] - let handle = server.start( - indexer_service::mock_service::MockIndexerService::new_with_mock_blocks().into_rpc(), - ); - - Ok(handle) -} - fn listen_for_shutdown_signal() -> CancellationToken { let cancellation_token = CancellationToken::new(); let cancellation_token_clone = cancellation_token.clone(); diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index b7ca13dd..74fbd557 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -11,7 +11,7 @@ sequencer_runner.workspace = true wallet.workspace = true common.workspace = true key_protocol.workspace = true -indexer_core.workspace = true +indexer_service.workspace = true url.workspace = true anyhow.workspace = true diff --git a/integration_tests/configs/indexer/indexer_config.json b/integration_tests/configs/indexer/indexer_config.json deleted file mode 100644 index fd5309b2..00000000 --- a/integration_tests/configs/indexer/indexer_config.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "bedrock_client_config": { - "addr": "http://127.0.0.1:8080", - "auth": { - "username": "user" - } - }, - "channel_id": "0101010101010101010101010101010101010101010101010101010101010101", - "backoff": { - "max_retries": 10, - "start_delay_millis": 100 - }, - "resubscribe_interval_millis": 1000, - "sequencer_client_config": { - "addr": "will_be_replaced_in_runtime" - } -} \ No newline at end of file diff --git a/integration_tests/src/config.rs b/integration_tests/src/config.rs new file mode 100644 index 00000000..16d577b5 --- /dev/null +++ b/integration_tests/src/config.rs @@ -0,0 +1,39 @@ +use std::net::SocketAddr; + +use anyhow::Result; +use indexer_service::{BackoffConfig, BedrockClientConfig, ChannelId, IndexerConfig}; +use url::Url; + +pub fn indexer_config(bedrock_addr: SocketAddr) -> IndexerConfig { + let channel_id: [u8; 32] = [0u8, 1] + .repeat(16) + .try_into() + .unwrap_or_else(|_| unreachable!()); + let channel_id = ChannelId::try_from(channel_id).expect("Failed to create channel ID"); + + IndexerConfig { + resubscribe_interval_millis: 1000, + backoff: BackoffConfig { + start_delay_millis: 100, + max_retries: 10, + }, + bedrock_client_config: BedrockClientConfig { + addr: addr_to_http_url(bedrock_addr).expect("Failed to convert bedrock addr to URL"), + auth: None, + }, + channel_id, + } +} + +fn addr_to_http_url(addr: SocketAddr) -> Result { + // Convert 0.0.0.0 to 127.0.0.1 for client connections + // When binding to port 0, the server binds to 0.0.0.0: + // but clients need to connect to 127.0.0.1: to work reliably + let url_string = if addr.ip().is_unspecified() { + format!("http://127.0.0.1:{}", addr.port()) + } else { + format!("http://{addr}") + }; + + url_string.parse().map_err(Into::into) +} diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index 5f39c872..24dccbf8 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -8,17 +8,18 @@ use common::{ sequencer_client::SequencerClient, transaction::{EncodedTransaction, NSSATransaction}, }; -use indexer_core::config::IndexerConfig; +use indexer_service::IndexerHandle; use log::debug; use nssa::PrivacyPreservingTransaction; use nssa_core::Commitment; use sequencer_core::config::SequencerConfig; use sequencer_runner::SequencerHandle; use tempfile::TempDir; -use tokio::task::JoinHandle; use url::Url; use wallet::{WalletCore, config::WalletConfigOverrides}; +mod config; + // TODO: Remove this and control time from tests pub const TIME_TO_WAIT_FOR_BLOCK_SECONDS: u64 = 12; @@ -37,16 +38,16 @@ static LOGGER: LazyLock<()> = LazyLock::new(env_logger::init); /// It's memory and logically safe to create multiple instances of this struct in parallel tests, /// as each instance uses its own temporary directories for sequencer and wallet data. pub struct TestContext { - _sequencer_handle: SequencerHandle, - indexer_loop_handle: Option>>, sequencer_client: SequencerClient, wallet: WalletCore, + _sequencer_handle: SequencerHandle, + _indexer_handle: IndexerHandle, _temp_sequencer_dir: TempDir, _temp_wallet_dir: TempDir, } impl TestContext { - /// Create new test context in detached mode. Default. + /// Create new test context. pub async fn new() -> Result { let manifest_dir = env!("CARGO_MANIFEST_DIR"); @@ -56,42 +57,22 @@ impl TestContext { let sequencer_config = SequencerConfig::from_path(&sequencer_config_path) .context("Failed to create sequencer config from file")?; - Self::new_with_sequencer_and_maybe_indexer_configs(sequencer_config, None).await + Self::new_with_sequencer_config(sequencer_config).await } - /// Create new test context in local bedrock node attached mode. - pub async fn new_bedrock_local_attached() -> Result { - let manifest_dir = env!("CARGO_MANIFEST_DIR"); - - let sequencer_config_path = PathBuf::from(manifest_dir) - .join("configs/sequencer/bedrock_local_attached/sequencer_config.json"); - - let sequencer_config = SequencerConfig::from_path(&sequencer_config_path) - .context("Failed to create sequencer config from file")?; - - let indexer_config_path = - PathBuf::from(manifest_dir).join("configs/indexer/indexer_config.json"); - - let indexer_config = IndexerConfig::from_path(&indexer_config_path) - .context("Failed to create indexer config from file")?; - - Self::new_with_sequencer_and_maybe_indexer_configs(sequencer_config, Some(indexer_config)) - .await - } - - /// Create new test context with custom sequencer config and maybe indexer config. + /// Create new test context with custom sequencer config. /// /// `home` and `port` fields of the provided config will be overridden to meet tests parallelism /// requirements. - pub async fn new_with_sequencer_and_maybe_indexer_configs( - sequencer_config: SequencerConfig, - indexer_config: Option, - ) -> Result { + pub async fn new_with_sequencer_config(sequencer_config: SequencerConfig) -> Result { // Ensure logger is initialized only once *LOGGER; debug!("Test context setup"); + let bedrock_addr = todo!(); + let indexer_config = config::indexer_config(bedrock_addr); + let (_sequencer_handle, sequencer_addr, temp_sequencer_dir) = Self::setup_sequencer(sequencer_config) .await @@ -115,32 +96,18 @@ impl TestContext { ) .context("Failed to create sequencer client")?; - if let Some(indexer_config) = indexer_config { - // let indexer_core = IndexerCore::new(indexer_config)?; + let _indexer_handle = indexer_service::run_server(indexer_config, 0) + .await + .context("Failed to run Indexer Service")?; - // let indexer_loop_handle = Some(tokio::spawn(async move { - // indexer_core.subscribe_parse_block_stream().await - // })); - - // Ok(Self { - // _sequencer_handle, - // indexer_loop_handle, - // sequencer_client, - // wallet, - // _temp_sequencer_dir: temp_sequencer_dir, - // _temp_wallet_dir: temp_wallet_dir, - // }) - todo!() - } else { - Ok(Self { - _sequencer_handle, - indexer_loop_handle: None, - sequencer_client, - wallet, - _temp_sequencer_dir: temp_sequencer_dir, - _temp_wallet_dir: temp_wallet_dir, - }) - } + Ok(Self { + sequencer_client, + wallet, + _sequencer_handle, + _indexer_handle, + _temp_sequencer_dir: temp_sequencer_dir, + _temp_wallet_dir: temp_wallet_dir, + }) } async fn setup_sequencer( @@ -212,25 +179,6 @@ impl TestContext { } } -impl Drop for TestContext { - fn drop(&mut self) { - debug!("Test context cleanup"); - - let Self { - _sequencer_handle, - indexer_loop_handle, - sequencer_client: _, - wallet: _, - _temp_sequencer_dir, - _temp_wallet_dir, - } = self; - - if let Some(indexer_loop_handle) = indexer_loop_handle { - indexer_loop_handle.abort(); - } - } -} - pub fn format_public_account_id(account_id: &str) -> String { format!("Public/{account_id}") } diff --git a/sequencer_runner/src/lib.rs b/sequencer_runner/src/lib.rs index 5913ee02..c74a943e 100644 --- a/sequencer_runner/src/lib.rs +++ b/sequencer_runner/src/lib.rs @@ -4,7 +4,7 @@ use actix_web::dev::ServerHandle; use anyhow::{Context as _, Result}; use clap::Parser; use common::rpc_primitives::RpcConfig; -use futures::FutureExt as _; +use futures::{FutureExt as _, never::Never}; use log::{error, info, warn}; use sequencer_core::{SequencerCore, config::SequencerConfig}; use sequencer_rpc::new_http_server; @@ -19,10 +19,6 @@ struct Args { home_dir: PathBuf, } -/// An enum that can never be instantiated, used to replace unstable `!` type. -#[derive(Debug)] -pub enum Never {} - /// Handle to manage the sequencer and its tasks. /// /// Implements `Drop` to ensure all tasks are aborted and the HTTP server is stopped when dropped.