Integrate Indexer Service into integration tests

This commit is contained in:
Daniil Polyakov 2026-02-02 19:07:24 +03:00
parent 321f31a54b
commit 71a7905a58
10 changed files with 152 additions and 140 deletions

2
Cargo.lock generated
View File

@ -3321,7 +3321,7 @@ dependencies = [
"env_logger",
"futures",
"hex",
"indexer_core",
"indexer_service",
"key_protocol",
"log",
"nssa",

View File

@ -8,6 +8,7 @@ pub mod transaction;
// Module for tests utility functions
// TODO: Compile only for tests
pub mod test_utils;
pub type HashType = [u8; 32];
pub const PINATA_BASE58: &str = "EfQhKQAkX2FJiwNii2WFQsGndjvF1Mzd7RuVe7QdPLw7";
pub type HashType = [u8; 32];

View File

@ -1,9 +1,9 @@
use std::{fs::File, io::BufReader, path::Path};
use anyhow::{Context as _, Result};
use bedrock_client::BackoffConfig;
pub use bedrock_client::BackoffConfig;
use common::config::BasicAuth;
use logos_blockchain_core::mantle::ops::channel::ChannelId;
pub use logos_blockchain_core::mantle::ops::channel::ChannelId;
use serde::{Deserialize, Serialize};
use url::Url;

View File

@ -1,4 +1,82 @@
use std::net::SocketAddr;
use anyhow::{Context as _, Result};
pub use indexer_core::config::*;
use indexer_service_rpc::RpcServer as _;
use jsonrpsee::server::Server;
use log::{error, info};
pub mod service;
#[cfg(feature = "mock-responses")]
pub mod mock_service;
pub struct IndexerHandle {
addr: SocketAddr,
server_handle: Option<jsonrpsee::server::ServerHandle>,
}
impl IndexerHandle {
fn new(addr: SocketAddr, server_handle: jsonrpsee::server::ServerHandle) -> Self {
Self {
addr,
server_handle: Some(server_handle),
}
}
pub fn addr(&self) -> SocketAddr {
self.addr
}
pub async fn stopped(mut self) {
let handle = self
.server_handle
.take()
.expect("Indexer server handle is set");
handle.stopped().await
}
}
impl Drop for IndexerHandle {
fn drop(&mut self) {
let Self {
addr: _,
server_handle,
} = self;
let Some(handle) = server_handle else {
return;
};
if let Err(err) = handle.stop() {
error!("An error occurred while stopping Indexer RPC server: {err}");
}
}
}
pub async fn run_server(config: IndexerConfig, port: u16) -> Result<IndexerHandle> {
#[cfg(feature = "mock-responses")]
let _ = config;
let server = Server::builder()
.build(SocketAddr::from(([0, 0, 0, 0], port)))
.await
.context("Failed to build RPC server")?;
let addr = server
.local_addr()
.context("Failed to get local address of RPC server")?;
info!("Starting Indexer Service RPC server on {addr}");
#[cfg(not(feature = "mock-responses"))]
let handle = {
let service =
service::IndexerService::new(config).context("Failed to initialize indexer service")?;
server.start(service.into_rpc())
};
#[cfg(feature = "mock-responses")]
let handle = server.start(mock_service::MockIndexerService::new_with_mock_blocks().into_rpc());
Ok(IndexerHandle::new(addr, handle))
}

View File

@ -1,10 +1,7 @@
use std::{net::SocketAddr, path::PathBuf};
use std::path::PathBuf;
use anyhow::{Context as _, Result};
use anyhow::Result;
use clap::Parser;
use indexer_core::config::IndexerConfig;
use indexer_service_rpc::RpcServer as _;
use jsonrpsee::server::Server;
use log::{error, info};
use tokio_util::sync::CancellationToken;
@ -25,14 +22,14 @@ async fn main() -> Result<()> {
let cancellation_token = listen_for_shutdown_signal();
let handle = run_server(config_path, port).await?;
let handle_clone = handle.clone();
let config = indexer_service::IndexerConfig::from_path(&config_path)?;
let indexer_handle = indexer_service::run_server(config, port).await?;
tokio::select! {
_ = cancellation_token.cancelled() => {
info!("Shutting down server...");
}
_ = handle_clone.stopped() => {
_ = indexer_handle.stopped() => {
error!("Server stopped unexpectedly");
}
}
@ -42,36 +39,6 @@ async fn main() -> Result<()> {
Ok(())
}
async fn run_server(config_path: PathBuf, port: u16) -> Result<jsonrpsee::server::ServerHandle> {
let config = IndexerConfig::from_path(&config_path)?;
#[cfg(feature = "mock-responses")]
let _ = config;
let server = Server::builder()
.build(SocketAddr::from(([0, 0, 0, 0], port)))
.await
.context("Failed to build RPC server")?;
let addr = server
.local_addr()
.context("Failed to get local address of RPC server")?;
info!("Starting Indexer Service RPC server on {addr}");
#[cfg(not(feature = "mock-responses"))]
let handle = {
let service = indexer_service::service::IndexerService::new(config)
.context("Failed to initialize indexer service")?;
server.start(service.into_rpc())
};
#[cfg(feature = "mock-responses")]
let handle = server.start(
indexer_service::mock_service::MockIndexerService::new_with_mock_blocks().into_rpc(),
);
Ok(handle)
}
fn listen_for_shutdown_signal() -> CancellationToken {
let cancellation_token = CancellationToken::new();
let cancellation_token_clone = cancellation_token.clone();

View File

@ -11,7 +11,7 @@ sequencer_runner.workspace = true
wallet.workspace = true
common.workspace = true
key_protocol.workspace = true
indexer_core.workspace = true
indexer_service.workspace = true
url.workspace = true
anyhow.workspace = true

View File

@ -1,17 +0,0 @@
{
"bedrock_client_config": {
"addr": "http://127.0.0.1:8080",
"auth": {
"username": "user"
}
},
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101",
"backoff": {
"max_retries": 10,
"start_delay_millis": 100
},
"resubscribe_interval_millis": 1000,
"sequencer_client_config": {
"addr": "will_be_replaced_in_runtime"
}
}

View File

@ -0,0 +1,39 @@
use std::net::SocketAddr;
use anyhow::Result;
use indexer_service::{BackoffConfig, BedrockClientConfig, ChannelId, IndexerConfig};
use url::Url;
pub fn indexer_config(bedrock_addr: SocketAddr) -> IndexerConfig {
let channel_id: [u8; 32] = [0u8, 1]
.repeat(16)
.try_into()
.unwrap_or_else(|_| unreachable!());
let channel_id = ChannelId::try_from(channel_id).expect("Failed to create channel ID");
IndexerConfig {
resubscribe_interval_millis: 1000,
backoff: BackoffConfig {
start_delay_millis: 100,
max_retries: 10,
},
bedrock_client_config: BedrockClientConfig {
addr: addr_to_http_url(bedrock_addr).expect("Failed to convert bedrock addr to URL"),
auth: None,
},
channel_id,
}
}
fn addr_to_http_url(addr: SocketAddr) -> Result<Url> {
// Convert 0.0.0.0 to 127.0.0.1 for client connections
// When binding to port 0, the server binds to 0.0.0.0:<random_port>
// but clients need to connect to 127.0.0.1:<port> to work reliably
let url_string = if addr.ip().is_unspecified() {
format!("http://127.0.0.1:{}", addr.port())
} else {
format!("http://{addr}")
};
url_string.parse().map_err(Into::into)
}

View File

@ -8,17 +8,18 @@ use common::{
sequencer_client::SequencerClient,
transaction::{EncodedTransaction, NSSATransaction},
};
use indexer_core::config::IndexerConfig;
use indexer_service::IndexerHandle;
use log::debug;
use nssa::PrivacyPreservingTransaction;
use nssa_core::Commitment;
use sequencer_core::config::SequencerConfig;
use sequencer_runner::SequencerHandle;
use tempfile::TempDir;
use tokio::task::JoinHandle;
use url::Url;
use wallet::{WalletCore, config::WalletConfigOverrides};
mod config;
// TODO: Remove this and control time from tests
pub const TIME_TO_WAIT_FOR_BLOCK_SECONDS: u64 = 12;
@ -37,16 +38,16 @@ static LOGGER: LazyLock<()> = LazyLock::new(env_logger::init);
/// It's memory and logically safe to create multiple instances of this struct in parallel tests,
/// as each instance uses its own temporary directories for sequencer and wallet data.
pub struct TestContext {
_sequencer_handle: SequencerHandle,
indexer_loop_handle: Option<JoinHandle<Result<()>>>,
sequencer_client: SequencerClient,
wallet: WalletCore,
_sequencer_handle: SequencerHandle,
_indexer_handle: IndexerHandle,
_temp_sequencer_dir: TempDir,
_temp_wallet_dir: TempDir,
}
impl TestContext {
/// Create new test context in detached mode. Default.
/// Create new test context.
pub async fn new() -> Result<Self> {
let manifest_dir = env!("CARGO_MANIFEST_DIR");
@ -56,42 +57,22 @@ impl TestContext {
let sequencer_config = SequencerConfig::from_path(&sequencer_config_path)
.context("Failed to create sequencer config from file")?;
Self::new_with_sequencer_and_maybe_indexer_configs(sequencer_config, None).await
Self::new_with_sequencer_config(sequencer_config).await
}
/// Create new test context in local bedrock node attached mode.
pub async fn new_bedrock_local_attached() -> Result<Self> {
let manifest_dir = env!("CARGO_MANIFEST_DIR");
let sequencer_config_path = PathBuf::from(manifest_dir)
.join("configs/sequencer/bedrock_local_attached/sequencer_config.json");
let sequencer_config = SequencerConfig::from_path(&sequencer_config_path)
.context("Failed to create sequencer config from file")?;
let indexer_config_path =
PathBuf::from(manifest_dir).join("configs/indexer/indexer_config.json");
let indexer_config = IndexerConfig::from_path(&indexer_config_path)
.context("Failed to create indexer config from file")?;
Self::new_with_sequencer_and_maybe_indexer_configs(sequencer_config, Some(indexer_config))
.await
}
/// Create new test context with custom sequencer config and maybe indexer config.
/// Create new test context with custom sequencer config.
///
/// `home` and `port` fields of the provided config will be overridden to meet tests parallelism
/// requirements.
pub async fn new_with_sequencer_and_maybe_indexer_configs(
sequencer_config: SequencerConfig,
indexer_config: Option<IndexerConfig>,
) -> Result<Self> {
pub async fn new_with_sequencer_config(sequencer_config: SequencerConfig) -> Result<Self> {
// Ensure logger is initialized only once
*LOGGER;
debug!("Test context setup");
let bedrock_addr = todo!();
let indexer_config = config::indexer_config(bedrock_addr);
let (_sequencer_handle, sequencer_addr, temp_sequencer_dir) =
Self::setup_sequencer(sequencer_config)
.await
@ -115,32 +96,18 @@ impl TestContext {
)
.context("Failed to create sequencer client")?;
if let Some(indexer_config) = indexer_config {
// let indexer_core = IndexerCore::new(indexer_config)?;
let _indexer_handle = indexer_service::run_server(indexer_config, 0)
.await
.context("Failed to run Indexer Service")?;
// let indexer_loop_handle = Some(tokio::spawn(async move {
// indexer_core.subscribe_parse_block_stream().await
// }));
// Ok(Self {
// _sequencer_handle,
// indexer_loop_handle,
// sequencer_client,
// wallet,
// _temp_sequencer_dir: temp_sequencer_dir,
// _temp_wallet_dir: temp_wallet_dir,
// })
todo!()
} else {
Ok(Self {
_sequencer_handle,
indexer_loop_handle: None,
sequencer_client,
wallet,
_temp_sequencer_dir: temp_sequencer_dir,
_temp_wallet_dir: temp_wallet_dir,
})
}
Ok(Self {
sequencer_client,
wallet,
_sequencer_handle,
_indexer_handle,
_temp_sequencer_dir: temp_sequencer_dir,
_temp_wallet_dir: temp_wallet_dir,
})
}
async fn setup_sequencer(
@ -212,25 +179,6 @@ impl TestContext {
}
}
impl Drop for TestContext {
fn drop(&mut self) {
debug!("Test context cleanup");
let Self {
_sequencer_handle,
indexer_loop_handle,
sequencer_client: _,
wallet: _,
_temp_sequencer_dir,
_temp_wallet_dir,
} = self;
if let Some(indexer_loop_handle) = indexer_loop_handle {
indexer_loop_handle.abort();
}
}
}
pub fn format_public_account_id(account_id: &str) -> String {
format!("Public/{account_id}")
}

View File

@ -4,7 +4,7 @@ use actix_web::dev::ServerHandle;
use anyhow::{Context as _, Result};
use clap::Parser;
use common::rpc_primitives::RpcConfig;
use futures::FutureExt as _;
use futures::{FutureExt as _, never::Never};
use log::{error, info, warn};
use sequencer_core::{SequencerCore, config::SequencerConfig};
use sequencer_rpc::new_http_server;
@ -19,10 +19,6 @@ struct Args {
home_dir: PathBuf,
}
/// An enum that can never be instantiated, used to replace unstable `!` type.
#[derive(Debug)]
pub enum Never {}
/// Handle to manage the sequencer and its tasks.
///
/// Implements `Drop` to ensure all tasks are aborted and the HTTP server is stopped when dropped.