mirror of
https://github.com/logos-blockchain/logos-execution-zone.git
synced 2026-06-26 00:49:27 +00:00
feat(indexer)!: make storage location caller-driven, not config-driven
The indexer's storage location was the `home` field of IndexerConfig, used only to derive the RocksDB dir. Defaulting to "." meant it landed in the process CWD — fine for the standalone service, but wrong when the indexer runs embedded in a logos_host subprocess (RocksDB ended up in an arbitrary/unwritable dir). Storage location is an operational concern the host should own, not something baked into a user-editable config. Remove `home` from IndexerConfig and pass the storage directory explicitly: - core: `IndexerCore::new(config, storage_dir)` derives `<storage_dir>/rocksdb`. - ffi: `start_indexer(runtime, config_path, storage_dir)`; null/empty storage_dir falls back to ".". Lets a host (e.g. a Logos module's instance persistence path) own where state lives. - service: `run_server(config, storage_dir, port)` + a `--data-dir` flag (default ".") on the binary, preserving current behaviour. - drop `home` from the committed indexer config JSONs and the test fixtures. BREAKING CHANGE: `start_indexer` gains a `storage_dir` parameter and IndexerConfig no longer has a `home` field.
This commit is contained in:
parent
0c52ec9695
commit
1c1e80f646
@ -37,6 +37,7 @@ unsafe extern "C" {
|
||||
pub unsafe fn start_indexer(
|
||||
runtime: *const Runtime,
|
||||
config_path: *const c_char,
|
||||
storage_dir: *const c_char,
|
||||
) -> InitializedIndexerServiceFFIResult;
|
||||
}
|
||||
|
||||
@ -49,9 +50,8 @@ pub fn setup_indexer_ffi(bedrock_addr: SocketAddr) -> Result<(IndexerServiceFFI,
|
||||
temp_indexer_dir.path().display()
|
||||
);
|
||||
|
||||
let indexer_config =
|
||||
integration_tests::config::indexer_config(bedrock_addr, temp_indexer_dir.path().to_owned())
|
||||
.context("Failed to create Indexer config")?;
|
||||
let indexer_config = integration_tests::config::indexer_config(bedrock_addr)
|
||||
.context("Failed to create Indexer config")?;
|
||||
|
||||
let config_json = serde_json::to_vec(&indexer_config)?;
|
||||
let config_path = temp_indexer_dir.path().join("indexer_config.json");
|
||||
@ -59,10 +59,13 @@ pub fn setup_indexer_ffi(bedrock_addr: SocketAddr) -> Result<(IndexerServiceFFI,
|
||||
file.write_all(&config_json)?;
|
||||
file.flush()?;
|
||||
|
||||
let config_path_c = CString::new(config_path.to_str().unwrap())?;
|
||||
let storage_dir_c = CString::new(temp_indexer_dir.path().to_str().unwrap())?;
|
||||
let res =
|
||||
// SAFETY: null runtime → the FFI creates and owns its own tokio runtime,
|
||||
// so there is no external runtime whose address we must keep stable.
|
||||
unsafe { start_indexer(std::ptr::null(), CString::new(config_path.to_str().unwrap())?.as_ptr()) };
|
||||
// so there is no external runtime whose address we must keep stable. The
|
||||
// temp dir is the indexer's storage location.
|
||||
unsafe { start_indexer(std::ptr::null(), config_path_c.as_ptr(), storage_dir_c.as_ptr()) };
|
||||
|
||||
if res.error.is_error() {
|
||||
anyhow::bail!("Indexer FFI error {:?}", res.error);
|
||||
|
||||
@ -1,5 +1,4 @@
|
||||
{
|
||||
"home": "./indexer/service",
|
||||
"consensus_info_polling_interval": "1s",
|
||||
"bedrock_config": {
|
||||
"addr": "http://logos-blockchain-node-0:18080"
|
||||
|
||||
@ -1,9 +1,4 @@
|
||||
use std::{
|
||||
fs::File,
|
||||
io::BufReader,
|
||||
path::{Path, PathBuf},
|
||||
time::Duration,
|
||||
};
|
||||
use std::{fs::File, io::BufReader, path::Path, time::Duration};
|
||||
|
||||
use anyhow::{Context as _, Result};
|
||||
use common::config::BasicAuth;
|
||||
@ -21,8 +16,6 @@ pub struct ClientConfig {
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct IndexerConfig {
|
||||
/// Home dir of indexer storage.
|
||||
pub home: PathBuf,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub consensus_info_polling_interval: Duration,
|
||||
pub bedrock_config: ClientConfig,
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
use std::sync::Arc;
|
||||
use std::{path::Path, sync::Arc};
|
||||
|
||||
use anyhow::Result;
|
||||
use common::block::Block;
|
||||
@ -23,8 +23,8 @@ pub struct IndexerCore {
|
||||
}
|
||||
|
||||
impl IndexerCore {
|
||||
pub fn new(config: IndexerConfig) -> Result<Self> {
|
||||
let home = config.home.join("rocksdb");
|
||||
pub fn new(config: IndexerConfig, storage_dir: &Path) -> Result<Self> {
|
||||
let home = storage_dir.join("rocksdb");
|
||||
|
||||
let basic_auth = config.bedrock_config.auth.clone().map(Into::into);
|
||||
let node = NodeHttpClient::new(
|
||||
|
||||
@ -416,6 +416,9 @@ typedef struct PointerResult_FfiVec_FfiTransaction_____OperationStatus {
|
||||
* - `runtime`: A runtime for the indexer to run on, or null to have the indexer create and own
|
||||
* one.
|
||||
* - `config_path`: A pointer to a string representing the path to the configuration file.
|
||||
* - `storage_dir`: A pointer to a string naming the directory under which the indexer stores its
|
||||
* state (`RocksDB`), or null/empty to use the current directory. The host (e.g. a Logos module's
|
||||
* instance persistence path) owns this location.
|
||||
*
|
||||
* # Returns
|
||||
*
|
||||
@ -426,9 +429,11 @@ typedef struct PointerResult_FfiVec_FfiTransaction_____OperationStatus {
|
||||
* The caller must ensure that:
|
||||
* - `runtime` is either null or a valid pointer to a [`Runtime`] that outlives the indexer.
|
||||
* - `config_path` is a valid pointer to a null-terminated C string.
|
||||
* - `storage_dir` is either null or a valid pointer to a null-terminated C string.
|
||||
*/
|
||||
InitializedIndexerServiceFFIResult start_indexer(const struct Runtime *runtime,
|
||||
const char *config_path);
|
||||
const char *config_path,
|
||||
const char *storage_dir);
|
||||
|
||||
/**
|
||||
* Stops and frees the resources associated with the given indexer service.
|
||||
|
||||
@ -15,6 +15,9 @@ pub type InitializedIndexerServiceFFIResult = PointerResult<IndexerServiceFFI, O
|
||||
/// - `runtime`: A runtime for the indexer to run on, or null to have the indexer create and own
|
||||
/// one.
|
||||
/// - `config_path`: A pointer to a string representing the path to the configuration file.
|
||||
/// - `storage_dir`: A pointer to a string naming the directory under which the indexer stores its
|
||||
/// state (`RocksDB`), or null/empty to use the current directory. The host (e.g. a Logos module's
|
||||
/// instance persistence path) owns this location.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
@ -25,13 +28,15 @@ pub type InitializedIndexerServiceFFIResult = PointerResult<IndexerServiceFFI, O
|
||||
/// The caller must ensure that:
|
||||
/// - `runtime` is either null or a valid pointer to a [`Runtime`] that outlives the indexer.
|
||||
/// - `config_path` is a valid pointer to a null-terminated C string.
|
||||
/// - `storage_dir` is either null or a valid pointer to a null-terminated C string.
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "C" fn start_indexer(
|
||||
runtime: *const Runtime,
|
||||
config_path: *const c_char,
|
||||
storage_dir: *const c_char,
|
||||
) -> InitializedIndexerServiceFFIResult {
|
||||
// SAFETY: The caller must ensure the validness of the `runtime` and `config_path` pointers.
|
||||
unsafe { setup_indexer(runtime, config_path) }.map_or_else(
|
||||
// SAFETY: The caller must ensure the validness of the pointer arguments.
|
||||
unsafe { setup_indexer(runtime, config_path, storage_dir) }.map_or_else(
|
||||
InitializedIndexerServiceFFIResult::from_error,
|
||||
InitializedIndexerServiceFFIResult::from_value,
|
||||
)
|
||||
@ -44,6 +49,7 @@ pub unsafe extern "C" fn start_indexer(
|
||||
///
|
||||
/// - `runtime`: A runtime for the indexer to run on, or null to create and own one.
|
||||
/// - `config_path`: A pointer to a string representing the path to the configuration file.
|
||||
/// - `storage_dir`: A pointer to a string naming the storage directory, or null/empty for `.`.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
@ -54,9 +60,11 @@ pub unsafe extern "C" fn start_indexer(
|
||||
/// The caller must ensure that:
|
||||
/// - `runtime` is either null or a valid pointer to a [`Runtime`] that outlives the indexer.
|
||||
/// - `config_path` is a valid pointer to a null-terminated C string.
|
||||
/// - `storage_dir` is either null or a valid pointer to a null-terminated C string.
|
||||
unsafe fn setup_indexer(
|
||||
runtime: *const Runtime,
|
||||
config_path: *const c_char,
|
||||
storage_dir: *const c_char,
|
||||
) -> Result<IndexerServiceFFI, OperationStatus> {
|
||||
let user_config_path = PathBuf::from(
|
||||
unsafe { std::ffi::CStr::from_ptr(config_path) }
|
||||
@ -71,6 +79,25 @@ unsafe fn setup_indexer(
|
||||
OperationStatus::InitializationError
|
||||
})?;
|
||||
|
||||
// The host owns where state lives. An empty/null `storage_dir` falls back to
|
||||
// the current directory (matches the standalone service's `--data-dir`
|
||||
// default), but a Logos module passes its instance persistence path.
|
||||
let storage_dir = if storage_dir.is_null() {
|
||||
PathBuf::from(".")
|
||||
} else {
|
||||
let storage_dir = unsafe { std::ffi::CStr::from_ptr(storage_dir) }
|
||||
.to_str()
|
||||
.map_err(|e| {
|
||||
log::error!("Could not convert the storage dir to string: {e}");
|
||||
OperationStatus::InitializationError
|
||||
})?;
|
||||
if storage_dir.is_empty() {
|
||||
PathBuf::from(".")
|
||||
} else {
|
||||
PathBuf::from(storage_dir)
|
||||
}
|
||||
};
|
||||
|
||||
// Use the caller's runtime if one was supplied, otherwise create (and own)
|
||||
// our own. The `Runtime` wrapper drops the underlying tokio runtime only
|
||||
// when we own it; a borrowed one is left to its external owner.
|
||||
@ -85,7 +112,7 @@ unsafe fn setup_indexer(
|
||||
unsafe { Runtime::from_borrowed(caller.as_ref()) }
|
||||
};
|
||||
|
||||
let core = IndexerCore::new(config).map_err(|e| {
|
||||
let core = IndexerCore::new(config, &storage_dir).map_err(|e| {
|
||||
log::error!("Could not initialize indexer core: {e}");
|
||||
OperationStatus::InitializationError
|
||||
})?;
|
||||
|
||||
@ -1,5 +1,4 @@
|
||||
{
|
||||
"home": ".",
|
||||
"consensus_info_polling_interval": "1s",
|
||||
"bedrock_config": {
|
||||
"addr": "http://localhost:8080"
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::{net::SocketAddr, path::Path};
|
||||
|
||||
use anyhow::{Context as _, Result};
|
||||
pub use indexer_core::config::*;
|
||||
@ -65,9 +65,13 @@ impl Drop for IndexerHandle {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run_server(config: IndexerConfig, port: u16) -> Result<IndexerHandle> {
|
||||
pub async fn run_server(
|
||||
config: IndexerConfig,
|
||||
storage_dir: &Path,
|
||||
port: u16,
|
||||
) -> Result<IndexerHandle> {
|
||||
#[cfg(feature = "mock-responses")]
|
||||
let _ = config;
|
||||
let _ = (config, storage_dir);
|
||||
|
||||
let server = Server::builder()
|
||||
.build(SocketAddr::from(([0, 0, 0, 0], port)))
|
||||
@ -82,8 +86,8 @@ pub async fn run_server(config: IndexerConfig, port: u16) -> Result<IndexerHandl
|
||||
|
||||
#[cfg(not(feature = "mock-responses"))]
|
||||
let handle = {
|
||||
let service =
|
||||
service::IndexerService::new(config).context("Failed to initialize indexer service")?;
|
||||
let service = service::IndexerService::new(config, storage_dir)
|
||||
.context("Failed to initialize indexer service")?;
|
||||
server.start(service.into_rpc())
|
||||
};
|
||||
#[cfg(feature = "mock-responses")]
|
||||
|
||||
@ -12,6 +12,9 @@ struct Args {
|
||||
config_path: PathBuf,
|
||||
#[clap(short, long, default_value = "8779")]
|
||||
port: u16,
|
||||
/// Directory under which the indexer stores its `RocksDB` state.
|
||||
#[clap(short, long, default_value = ".")]
|
||||
data_dir: PathBuf,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
@ -22,12 +25,16 @@ struct Args {
|
||||
async fn main() -> Result<()> {
|
||||
env_logger::init();
|
||||
|
||||
let Args { config_path, port } = Args::parse();
|
||||
let Args {
|
||||
config_path,
|
||||
port,
|
||||
data_dir,
|
||||
} = Args::parse();
|
||||
|
||||
let cancellation_token = listen_for_shutdown_signal();
|
||||
|
||||
let config = indexer_service::IndexerConfig::from_path(&config_path)?;
|
||||
let indexer_handle = indexer_service::run_server(config, port).await?;
|
||||
let indexer_handle = indexer_service::run_server(config, data_dir.as_path(), port).await?;
|
||||
|
||||
tokio::select! {
|
||||
() = cancellation_token.cancelled() => {
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
use std::{pin::pin, sync::Arc};
|
||||
use std::{path::Path, pin::pin, sync::Arc};
|
||||
|
||||
use anyhow::{Context as _, Result, bail};
|
||||
use arc_swap::ArcSwap;
|
||||
@ -19,8 +19,8 @@ pub struct IndexerService {
|
||||
}
|
||||
|
||||
impl IndexerService {
|
||||
pub fn new(config: IndexerConfig) -> Result<Self> {
|
||||
let indexer = IndexerCore::new(config)?;
|
||||
pub fn new(config: IndexerConfig, storage_dir: &Path) -> Result<Self> {
|
||||
let indexer = IndexerCore::new(config, storage_dir)?;
|
||||
let subscription_service = SubscriptionService::spawn_new(indexer.clone());
|
||||
|
||||
Ok(Self {
|
||||
|
||||
@ -163,9 +163,8 @@ pub fn wallet_config(sequencer_addr: SocketAddr) -> Result<WalletConfig> {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn indexer_config(bedrock_addr: SocketAddr, home: PathBuf) -> Result<IndexerConfig> {
|
||||
pub fn indexer_config(bedrock_addr: SocketAddr) -> Result<IndexerConfig> {
|
||||
Ok(IndexerConfig {
|
||||
home,
|
||||
consensus_info_polling_interval: Duration::from_secs(1),
|
||||
bedrock_config: ClientConfig {
|
||||
addr: addr_to_url(UrlProtocol::Http, bedrock_addr)
|
||||
|
||||
@ -98,10 +98,10 @@ pub async fn setup_indexer(bedrock_addr: SocketAddr) -> Result<(IndexerHandle, T
|
||||
temp_indexer_dir.path().display()
|
||||
);
|
||||
|
||||
let indexer_config = config::indexer_config(bedrock_addr, temp_indexer_dir.path().to_owned())
|
||||
.context("Failed to create Indexer config")?;
|
||||
let indexer_config =
|
||||
config::indexer_config(bedrock_addr).context("Failed to create Indexer config")?;
|
||||
|
||||
indexer_service::run_server(indexer_config, 0)
|
||||
indexer_service::run_server(indexer_config, temp_indexer_dir.path(), 0)
|
||||
.await
|
||||
.context("Failed to run Indexer Service")
|
||||
.map(|handle| (handle, temp_indexer_dir))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user