mirror of
https://github.com/logos-blockchain/logos-execution-zone.git
synced 2026-06-26 08:59:45 +00:00
Merge pull request #547 from logos-blockchain/erhant/fix-indexer-ffi
refactor(indexer)!: query IndexerCore directly in the FFI; fix #538 / #540 / #544, drop `port`
This commit is contained in:
commit
a9df90c5b6
10
.gitignore
vendored
10
.gitignore
vendored
@ -1,17 +1,25 @@
|
||||
.gitconfig
|
||||
|
||||
res/
|
||||
target/
|
||||
deps/
|
||||
data/
|
||||
|
||||
.idea/
|
||||
.vscode/
|
||||
rocksdb
|
||||
|
||||
rocksdb*
|
||||
sequencer/service/data/
|
||||
storage.json
|
||||
|
||||
result
|
||||
|
||||
wallet-ffi/wallet_ffi.h
|
||||
bedrock_signing_key
|
||||
integration_tests/configs/debug/
|
||||
venv/
|
||||
|
||||
keycard_wallet/python/__pycache__/
|
||||
keycard_wallet/python/keycard-py/
|
||||
|
||||
.DS_Store
|
||||
|
||||
10
Cargo.lock
generated
10
Cargo.lock
generated
@ -3799,6 +3799,7 @@ name = "indexer_core"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
"async-stream",
|
||||
"authenticated_transfer_core",
|
||||
"borsh",
|
||||
@ -3823,16 +3824,15 @@ dependencies = [
|
||||
name = "indexer_ffi"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"cbindgen",
|
||||
"indexer_service",
|
||||
"env_logger",
|
||||
"futures",
|
||||
"indexer_core",
|
||||
"indexer_service_protocol",
|
||||
"indexer_service_rpc",
|
||||
"jsonrpsee",
|
||||
"lee",
|
||||
"log",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@ -117,6 +117,7 @@ hex = "0.4.3"
|
||||
bytemuck = "1.24.0"
|
||||
bytesize = { version = "2.3.1", features = ["serde"] }
|
||||
humantime-serde = "1.1"
|
||||
arc-swap = "1.7"
|
||||
humantime = "2.1"
|
||||
aes-gcm = "0.10.3"
|
||||
toml = "0.9.8"
|
||||
|
||||
4
Justfile
4
Justfile
@ -93,7 +93,7 @@ clean:
|
||||
@echo "🧹 Cleaning run artifacts"
|
||||
rm -rf lez/sequencer/service/bedrock_signing_key
|
||||
rm -rf lez/sequencer/service/rocksdb
|
||||
rm -rf lez/indexer/service/rocksdb
|
||||
rm -rf lez/indexer/service/rocksdb*
|
||||
rm -rf lez/wallet/configs/debug/storage.json
|
||||
rm -rf rocksdb
|
||||
rm -rf rocksdb*
|
||||
cd bedrock && docker compose down -v
|
||||
|
||||
@ -5,8 +5,7 @@
|
||||
)]
|
||||
|
||||
use anyhow::Result;
|
||||
use indexer_ffi::{Runtime, api::types::FfiOption};
|
||||
use integration_tests::L2_TO_L1_TIMEOUT;
|
||||
use indexer_ffi::api::types::FfiOption;
|
||||
use log::info;
|
||||
|
||||
#[path = "indexer_ffi_helpers/mod.rs"]
|
||||
@ -14,21 +13,15 @@ mod indexer_ffi_helpers;
|
||||
|
||||
#[test]
|
||||
fn indexer_ffi_block_batching() -> Result<()> {
|
||||
let (ctx, indexer_ffi, _indexer_dir) = indexer_ffi_helpers::setup()?;
|
||||
// `_ctx` keeps the bedrock/sequencer harness (and its runtime) alive for the
|
||||
// duration of the test; the indexer was started on that runtime.
|
||||
let (_ctx, indexer_ffi, _indexer_dir) = indexer_ffi_helpers::setup()?;
|
||||
|
||||
// WAIT
|
||||
// WAIT: poll until the indexer has finalized at least two blocks (so the
|
||||
// chain-consistency check below verifies at least one block link), returning
|
||||
// early instead of sleeping for the full timeout.
|
||||
info!("Waiting for indexer to parse blocks");
|
||||
std::thread::sleep(L2_TO_L1_TIMEOUT);
|
||||
|
||||
// Safety: ctx runtime is valid for the lifetime of the returned Runtime
|
||||
let runtime = unsafe { Runtime::from_borrowed(ctx.runtime()) };
|
||||
let last_block_indexer_ffi_res = unsafe {
|
||||
indexer_ffi_helpers::query_last_block(&raw const runtime, &raw const indexer_ffi)
|
||||
};
|
||||
|
||||
assert!(last_block_indexer_ffi_res.error.is_ok());
|
||||
|
||||
let last_block_indexer = unsafe { *last_block_indexer_ffi_res.value };
|
||||
let last_block_indexer = indexer_ffi_helpers::wait_for_indexer_ffi_block(&indexer_ffi, 2)?;
|
||||
|
||||
info!("Last block on indexer FFI now is {last_block_indexer}");
|
||||
|
||||
@ -37,14 +30,8 @@ fn indexer_ffi_block_batching() -> Result<()> {
|
||||
let before_ffi = FfiOption::<u64>::from_none();
|
||||
let limit = 100;
|
||||
|
||||
let block_batch_ffi_res = unsafe {
|
||||
indexer_ffi_helpers::query_block_vec(
|
||||
&raw const runtime,
|
||||
&raw const indexer_ffi,
|
||||
before_ffi,
|
||||
limit,
|
||||
)
|
||||
};
|
||||
let block_batch_ffi_res =
|
||||
unsafe { indexer_ffi_helpers::query_block_vec(&raw const indexer_ffi, before_ffi, limit) };
|
||||
|
||||
assert!(block_batch_ffi_res.error.is_ok());
|
||||
|
||||
|
||||
@ -13,6 +13,7 @@ use indexer_ffi::{
|
||||
api::{
|
||||
PointerResult,
|
||||
lifecycle::InitializedIndexerServiceFFIResult,
|
||||
query::LastBlockIdResult,
|
||||
types::{FfiAccountId, FfiOption, FfiVec, account::FfiAccount, block::FfiBlock},
|
||||
},
|
||||
};
|
||||
@ -20,20 +21,15 @@ use integration_tests::{BlockingTestContext, TestContext};
|
||||
use tempfile::TempDir;
|
||||
|
||||
unsafe extern "C" {
|
||||
pub unsafe fn query_last_block(
|
||||
runtime: *const Runtime,
|
||||
indexer: *const IndexerServiceFFI,
|
||||
) -> PointerResult<u64, OperationStatus>;
|
||||
pub unsafe fn query_last_block(indexer: *const IndexerServiceFFI) -> LastBlockIdResult;
|
||||
|
||||
pub unsafe fn query_block_vec(
|
||||
runtime: *const Runtime,
|
||||
indexer: *const IndexerServiceFFI,
|
||||
before: FfiOption<u64>,
|
||||
limit: u64,
|
||||
) -> PointerResult<FfiVec<FfiBlock>, OperationStatus>;
|
||||
|
||||
pub unsafe fn query_account(
|
||||
runtime: *const Runtime,
|
||||
indexer: *const IndexerServiceFFI,
|
||||
account_id: FfiAccountId,
|
||||
) -> PointerResult<FfiAccount, OperationStatus>;
|
||||
@ -41,14 +37,11 @@ unsafe extern "C" {
|
||||
pub unsafe fn start_indexer(
|
||||
runtime: *const Runtime,
|
||||
config_path: *const c_char,
|
||||
port: u16,
|
||||
storage_dir: *const c_char,
|
||||
) -> InitializedIndexerServiceFFIResult;
|
||||
}
|
||||
|
||||
pub fn setup_indexer_ffi(
|
||||
runtime: &Runtime,
|
||||
bedrock_addr: SocketAddr,
|
||||
) -> Result<(IndexerServiceFFI, TempDir)> {
|
||||
pub fn setup_indexer_ffi(bedrock_addr: SocketAddr) -> Result<(IndexerServiceFFI, TempDir)> {
|
||||
let temp_indexer_dir =
|
||||
tempfile::tempdir().context("Failed to create temp dir for indexer home")?;
|
||||
|
||||
@ -57,9 +50,8 @@ pub fn setup_indexer_ffi(
|
||||
temp_indexer_dir.path().display()
|
||||
);
|
||||
|
||||
let indexer_config =
|
||||
integration_tests::config::indexer_config(bedrock_addr, temp_indexer_dir.path().to_owned())
|
||||
.context("Failed to create Indexer config")?;
|
||||
let indexer_config = integration_tests::config::indexer_config(bedrock_addr)
|
||||
.context("Failed to create Indexer config")?;
|
||||
|
||||
let config_json = serde_json::to_vec(&indexer_config)?;
|
||||
let config_path = temp_indexer_dir.path().join("indexer_config.json");
|
||||
@ -67,9 +59,13 @@ pub fn setup_indexer_ffi(
|
||||
file.write_all(&config_json)?;
|
||||
file.flush()?;
|
||||
|
||||
let config_path_c = CString::new(config_path.to_str().unwrap())?;
|
||||
let storage_dir_c = CString::new(temp_indexer_dir.path().to_str().unwrap())?;
|
||||
let res =
|
||||
// SAFETY: lib function ensures validity of value.
|
||||
unsafe { start_indexer(std::ptr::from_ref(runtime), CString::new(config_path.to_str().unwrap())?.as_ptr(), 0) };
|
||||
// SAFETY: null runtime → the FFI creates and owns its own tokio runtime,
|
||||
// so there is no external runtime whose address we must keep stable. The
|
||||
// temp dir is the indexer's storage location.
|
||||
unsafe { start_indexer(std::ptr::null(), config_path_c.as_ptr(), storage_dir_c.as_ptr()) };
|
||||
|
||||
if res.error.is_error() {
|
||||
anyhow::bail!("Indexer FFI error {:?}", res.error);
|
||||
@ -84,8 +80,35 @@ pub fn setup_indexer_ffi(
|
||||
|
||||
pub fn setup() -> Result<(BlockingTestContext, IndexerServiceFFI, TempDir)> {
|
||||
let ctx = TestContext::builder().disable_indexer().build_blocking()?;
|
||||
// Safety: ctx runtime is valid for the lifetime of the returned Runtime
|
||||
let runtime = unsafe { Runtime::from_borrowed(ctx.runtime()) };
|
||||
let (indexer_ffi, indexer_dir) = setup_indexer_ffi(&runtime, ctx.ctx().bedrock_addr())?;
|
||||
// Don't borrow `ctx.runtime()`: `ctx` (and its by-value tokio runtime) is
|
||||
// moved into the returned tuple, which would leave any pointer into it
|
||||
// dangling. Pass a null runtime so the FFI owns its own — the same path the
|
||||
// production module uses.
|
||||
let (indexer_ffi, indexer_dir) = setup_indexer_ffi(ctx.ctx().bedrock_addr())?;
|
||||
Ok((ctx, indexer_ffi, indexer_dir))
|
||||
}
|
||||
|
||||
/// Poll the indexer FFI until its last finalized block id reaches `min_block_id`
|
||||
/// or until [`integration_tests::L2_TO_L1_TIMEOUT`] elapses.
|
||||
///
|
||||
/// This avoids blindly sleeping for the full timeout: the indexer typically
|
||||
/// catches up in a fraction of that time, so we return as soon as it does and
|
||||
/// only use the timeout as a ceiling. Returns the last observed block id.
|
||||
pub fn wait_for_indexer_ffi_block(indexer: &IndexerServiceFFI, min_block_id: u64) -> Result<u64> {
|
||||
let start = std::time::Instant::now();
|
||||
loop {
|
||||
// SAFETY: `indexer` is a valid reference for the duration of the call.
|
||||
let res = unsafe { query_last_block(std::ptr::from_ref(indexer)) };
|
||||
if res.error.is_ok() && res.is_some && res.block_id >= min_block_id {
|
||||
return Ok(res.block_id);
|
||||
}
|
||||
if start.elapsed() >= integration_tests::L2_TO_L1_TIMEOUT {
|
||||
anyhow::bail!(
|
||||
"Indexer FFI did not reach block {min_block_id} within {:?}. Last observed block id: {}",
|
||||
integration_tests::L2_TO_L1_TIMEOUT,
|
||||
res.block_id
|
||||
);
|
||||
}
|
||||
std::thread::sleep(std::time::Duration::from_secs(2));
|
||||
}
|
||||
}
|
||||
|
||||
@ -8,7 +8,6 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{Context as _, Result};
|
||||
use indexer_ffi::Runtime;
|
||||
use indexer_service_protocol::Account;
|
||||
use integration_tests::{
|
||||
L2_TO_L1_TIMEOUT, TIME_TO_WAIT_FOR_BLOCK_SECONDS, private_mention, public_mention,
|
||||
@ -102,11 +101,8 @@ fn indexer_ffi_state_consistency() -> Result<()> {
|
||||
info!("Waiting for indexer to parse blocks");
|
||||
std::thread::sleep(L2_TO_L1_TIMEOUT);
|
||||
|
||||
// Safety: ctx runtime is valid for the lifetime of the returned Runtime
|
||||
let runtime = unsafe { Runtime::from_borrowed(ctx.runtime()) };
|
||||
let acc1_ind_state_ffi = unsafe {
|
||||
indexer_ffi_helpers::query_account(
|
||||
&raw const runtime,
|
||||
&raw const indexer_ffi,
|
||||
(&ctx.ctx().existing_public_accounts()[0]).into(),
|
||||
)
|
||||
@ -119,7 +115,6 @@ fn indexer_ffi_state_consistency() -> Result<()> {
|
||||
|
||||
let acc2_ind_state_ffi = unsafe {
|
||||
indexer_ffi_helpers::query_account(
|
||||
&raw const runtime,
|
||||
&raw const indexer_ffi,
|
||||
(&ctx.ctx().existing_public_accounts()[1]).into(),
|
||||
)
|
||||
|
||||
@ -8,7 +8,6 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Result;
|
||||
use indexer_ffi::Runtime;
|
||||
use indexer_service_protocol::Account;
|
||||
use integration_tests::{L2_TO_L1_TIMEOUT, TIME_TO_WAIT_FOR_BLOCK_SECONDS, public_mention};
|
||||
use log::info;
|
||||
@ -75,11 +74,8 @@ fn indexer_ffi_state_consistency_with_labels() -> Result<()> {
|
||||
info!("Waiting for indexer to parse blocks");
|
||||
std::thread::sleep(L2_TO_L1_TIMEOUT);
|
||||
|
||||
// Safety: ctx runtime is valid for the lifetime of the returned Runtime
|
||||
let runtime = unsafe { Runtime::from_borrowed(ctx.runtime()) };
|
||||
let acc1_ind_state_ffi = unsafe {
|
||||
indexer_ffi_helpers::query_account(
|
||||
&raw const runtime,
|
||||
&raw const indexer_ffi,
|
||||
(&ctx.ctx().existing_public_accounts()[0]).into(),
|
||||
)
|
||||
|
||||
@ -1,12 +1,9 @@
|
||||
#![expect(
|
||||
clippy::tests_outside_test_module,
|
||||
clippy::undocumented_unsafe_blocks,
|
||||
reason = "We don't care about these in tests"
|
||||
)]
|
||||
|
||||
use anyhow::Result;
|
||||
use indexer_ffi::Runtime;
|
||||
use integration_tests::L2_TO_L1_TIMEOUT;
|
||||
use log::info;
|
||||
|
||||
#[path = "indexer_ffi_helpers/mod.rs"]
|
||||
@ -14,20 +11,13 @@ mod indexer_ffi_helpers;
|
||||
|
||||
#[test]
|
||||
fn indexer_test_run_ffi() -> Result<()> {
|
||||
let (ctx, indexer_ffi, _indexer_dir) = indexer_ffi_helpers::setup()?;
|
||||
// `_ctx` keeps the bedrock/sequencer harness (and its runtime) alive for the
|
||||
// duration of the test; the indexer was started on that runtime.
|
||||
let (_ctx, indexer_ffi, _indexer_dir) = indexer_ffi_helpers::setup()?;
|
||||
|
||||
// RUN OBSERVATION
|
||||
std::thread::sleep(L2_TO_L1_TIMEOUT);
|
||||
|
||||
// Safety: ctx runtime is valid for the lifetime of the returned Runtime
|
||||
let runtime = unsafe { Runtime::from_borrowed(ctx.runtime()) };
|
||||
let last_block_indexer_ffi_res = unsafe {
|
||||
indexer_ffi_helpers::query_last_block(&raw const runtime, &raw const indexer_ffi)
|
||||
};
|
||||
|
||||
assert!(last_block_indexer_ffi_res.error.is_ok());
|
||||
|
||||
let last_block_indexer_ffi = unsafe { *last_block_indexer_ffi_res.value };
|
||||
// RUN OBSERVATION: poll until the indexer has finalized at least one block,
|
||||
// returning early instead of sleeping for the full timeout.
|
||||
let last_block_indexer_ffi = indexer_ffi_helpers::wait_for_indexer_ffi_block(&indexer_ffi, 1)?;
|
||||
|
||||
info!("Last block on indexer FFI now is {last_block_indexer_ffi}");
|
||||
|
||||
|
||||
@ -1,5 +1,4 @@
|
||||
{
|
||||
"home": "./indexer/service",
|
||||
"consensus_info_polling_interval": "1s",
|
||||
"bedrock_config": {
|
||||
"addr": "http://logos-blockchain-node-0:18080"
|
||||
|
||||
@ -16,6 +16,7 @@ storage.workspace = true
|
||||
testnet_initial_state.workspace = true
|
||||
|
||||
anyhow.workspace = true
|
||||
arc-swap.workspace = true
|
||||
log.workspace = true
|
||||
serde.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
|
||||
@ -1,9 +1,4 @@
|
||||
use std::{
|
||||
fs::File,
|
||||
io::BufReader,
|
||||
path::{Path, PathBuf},
|
||||
time::Duration,
|
||||
};
|
||||
use std::{fs::File, io::BufReader, path::Path, time::Duration};
|
||||
|
||||
use anyhow::{Context as _, Result};
|
||||
use common::config::BasicAuth;
|
||||
@ -21,8 +16,6 @@ pub struct ClientConfig {
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct IndexerConfig {
|
||||
/// Home dir of indexer storage.
|
||||
pub home: PathBuf,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub consensus_info_polling_interval: Duration,
|
||||
pub bedrock_config: ClientConfig,
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
use std::sync::Arc;
|
||||
use std::{path::Path, sync::Arc};
|
||||
|
||||
use anyhow::Result;
|
||||
use arc_swap::ArcSwap;
|
||||
use common::block::Block;
|
||||
// ToDo: Remove after testnet
|
||||
use futures::StreamExt as _;
|
||||
@ -10,21 +11,30 @@ use logos_blockchain_zone_sdk::{
|
||||
CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer,
|
||||
};
|
||||
|
||||
use crate::{block_store::IndexerStore, config::IndexerConfig};
|
||||
use crate::{
|
||||
block_store::IndexerStore,
|
||||
config::IndexerConfig,
|
||||
status::{IndexerStatus, IndexerSyncStatus},
|
||||
};
|
||||
|
||||
pub mod block_store;
|
||||
pub mod config;
|
||||
pub mod status;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct IndexerCore {
|
||||
pub zone_indexer: Arc<ZoneIndexer<NodeHttpClient>>,
|
||||
pub config: IndexerConfig,
|
||||
pub store: IndexerStore,
|
||||
/// Live ingestion status; updated by the ingest stream, read by `status`.
|
||||
pub status: Arc<ArcSwap<IndexerSyncStatus>>,
|
||||
}
|
||||
|
||||
impl IndexerCore {
|
||||
pub fn new(config: IndexerConfig) -> Result<Self> {
|
||||
let home = config.home.join("rocksdb");
|
||||
pub fn new(config: IndexerConfig, storage_dir: &Path) -> Result<Self> {
|
||||
// Namespace the DB by channel so indexers on different channels can
|
||||
// share a storage dir without their RocksDB state colliding.
|
||||
let home = storage_dir.join(format!("rocksdb-{}", config.channel_id));
|
||||
|
||||
let basic_auth = config.bedrock_config.auth.clone().map(Into::into);
|
||||
let node = NodeHttpClient::new(
|
||||
@ -37,9 +47,29 @@ impl IndexerCore {
|
||||
zone_indexer: Arc::new(zone_indexer),
|
||||
config,
|
||||
store: IndexerStore::open_db(&home)?,
|
||||
status: Arc::new(ArcSwap::from_pointee(IndexerSyncStatus::starting())),
|
||||
})
|
||||
}
|
||||
|
||||
/// Snapshot of the current ingestion status (sync state + indexed tip).
|
||||
///
|
||||
/// Combines the ingest loop's live status with the L2 tip read fresh from the
|
||||
/// store, so callers (FFI/RPC) can tell "catching up" from "failed".
|
||||
#[must_use]
|
||||
pub fn status(&self) -> IndexerStatus {
|
||||
let sync = IndexerSyncStatus::clone(&self.status.load());
|
||||
let indexed_block_id = self.store.get_last_block_id().ok().flatten();
|
||||
IndexerStatus {
|
||||
sync,
|
||||
indexed_block_id,
|
||||
}
|
||||
}
|
||||
|
||||
/// Atomically publish a new ingestion status for readers of `status`.
|
||||
fn set_status(&self, status: IndexerSyncStatus) {
|
||||
self.status.store(Arc::new(status));
|
||||
}
|
||||
|
||||
pub fn subscribe_parse_block_stream(&self) -> impl futures::Stream<Item = Result<Block>> + '_ {
|
||||
let poll_interval = self.config.consensus_info_polling_interval;
|
||||
let initial_cursor = self
|
||||
@ -60,14 +90,30 @@ impl IndexerCore {
|
||||
let stream = match self.zone_indexer.next_messages(cursor).await {
|
||||
Ok(s) => s,
|
||||
Err(err) => {
|
||||
// `next_messages` reads L1 consensus info internally, so
|
||||
// this also covers an unreachable/misconfigured L1 node.
|
||||
error!("Failed to start zone-sdk next_messages stream: {err}");
|
||||
self.set_status(IndexerSyncStatus::error(format!(
|
||||
"cannot reach L1 / read channel: {err}"
|
||||
)));
|
||||
tokio::time::sleep(poll_interval).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let mut stream = std::pin::pin!(stream);
|
||||
|
||||
// Flip to Syncing on the first message of this cycle (not merely on
|
||||
// a successful poll) so the steady-state CaughtUp status doesn't
|
||||
// flicker. Until then the state stays Starting (cold-start scan of
|
||||
// empty L1 history) or CaughtUp (idle).
|
||||
let mut announced_syncing = false;
|
||||
|
||||
while let Some((msg, slot)) = stream.next().await {
|
||||
if !announced_syncing {
|
||||
self.set_status(IndexerSyncStatus::syncing());
|
||||
announced_syncing = true;
|
||||
}
|
||||
|
||||
let zone_block = match msg {
|
||||
ZoneMessage::Block(b) => b,
|
||||
// Non-block messages don't carry a cursor position; the
|
||||
@ -105,7 +151,11 @@ impl IndexerCore {
|
||||
yield Ok(block);
|
||||
}
|
||||
|
||||
// Stream ended (caught up to LIB). Sleep then poll again.
|
||||
// Stream drained: caught up to LIB as of this cycle. Clears any
|
||||
// prior error (e.g. a transient L1 disconnect that left no
|
||||
// backlog, so the `Syncing` branch above never ran). Sleep then
|
||||
// poll again.
|
||||
self.set_status(IndexerSyncStatus::caught_up());
|
||||
tokio::time::sleep(poll_interval).await;
|
||||
}
|
||||
}
|
||||
|
||||
103
lez/indexer/core/src/status.rs
Normal file
103
lez/indexer/core/src/status.rs
Normal file
@ -0,0 +1,103 @@
|
||||
use serde::Serialize;
|
||||
|
||||
/// Coarse lifecycle state of the indexer's ingestion loop, so a client can tell
|
||||
/// "still catching up" apart from "something went wrong".
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum IndexerSyncState {
|
||||
/// Booted; no ingestion cycle has run yet.
|
||||
Starting,
|
||||
/// Streaming finalized messages toward the L1 frontier.
|
||||
Syncing,
|
||||
/// Drained the stream up to LIB; idle until new blocks finalize.
|
||||
CaughtUp,
|
||||
/// The last cycle failed (e.g. the L1 node is unreachable). See `last_error`.
|
||||
Error,
|
||||
}
|
||||
|
||||
/// Live ingestion status owned by the ingest loop: the coarse `state` plus the
|
||||
/// reason when it is `Error`.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct IndexerSyncStatus {
|
||||
pub state: IndexerSyncState,
|
||||
pub last_error: Option<String>,
|
||||
}
|
||||
|
||||
impl IndexerSyncStatus {
|
||||
/// Initial status before any ingestion cycle has run.
|
||||
pub(crate) const fn starting() -> Self {
|
||||
Self {
|
||||
state: IndexerSyncState::Starting,
|
||||
last_error: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Actively streaming finalized messages toward the L1 frontier.
|
||||
pub(crate) const fn syncing() -> Self {
|
||||
Self {
|
||||
state: IndexerSyncState::Syncing,
|
||||
last_error: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Drained the stream up to LIB; idle until new blocks finalize.
|
||||
pub(crate) const fn caught_up() -> Self {
|
||||
Self {
|
||||
state: IndexerSyncState::CaughtUp,
|
||||
last_error: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// The last cycle failed; `reason` explains why.
|
||||
pub(crate) const fn error(reason: String) -> Self {
|
||||
Self {
|
||||
state: IndexerSyncState::Error,
|
||||
last_error: Some(reason),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Full status snapshot returned to callers (FFI/RPC): the live [`IndexerSyncStatus`]
|
||||
/// plus the L2 tip (`indexed_block_id`) read fresh from the store at query time.
|
||||
///
|
||||
/// The tip is tracked by the store, not the ingest loop, so it lives here on the
|
||||
/// returned snapshot rather than inside the shared [`IndexerSyncStatus`].
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct IndexerStatus {
|
||||
#[serde(flatten)]
|
||||
pub sync: IndexerSyncStatus,
|
||||
pub indexed_block_id: Option<u64>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn indexer_status_serializes_to_flat_object() {
|
||||
let status = IndexerStatus {
|
||||
sync: IndexerSyncStatus::error("boom".to_owned()),
|
||||
indexed_block_id: Some(7),
|
||||
};
|
||||
let value = serde_json::to_value(&status).expect("serialize");
|
||||
assert_eq!(
|
||||
value,
|
||||
serde_json::json!({
|
||||
"state": "error",
|
||||
"lastError": "boom",
|
||||
"indexedBlockId": 7,
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn caught_up_clears_error() {
|
||||
let value = serde_json::to_value(IndexerSyncStatus::caught_up()).expect("serialize");
|
||||
assert_eq!(
|
||||
value,
|
||||
serde_json::json!({ "state": "caught_up", "lastError": null })
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -6,15 +6,14 @@ version = "0.1.0"
|
||||
|
||||
[dependencies]
|
||||
lee.workspace = true
|
||||
indexer_service.workspace = true
|
||||
indexer_service_rpc = { workspace = true, features = ["client"] }
|
||||
indexer_service_protocol.workspace = true
|
||||
indexer_core.workspace = true
|
||||
indexer_service_protocol = { workspace = true, features = ["convert"] }
|
||||
|
||||
url.workspace = true
|
||||
env_logger.workspace = true
|
||||
log = { workspace = true }
|
||||
tokio = { features = ["rt-multi-thread"], workspace = true }
|
||||
jsonrpsee.workspace = true
|
||||
anyhow.workspace = true
|
||||
futures.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
[build-dependencies]
|
||||
cbindgen = "0.29"
|
||||
|
||||
@ -6,6 +6,8 @@ fn main() {
|
||||
cbindgen::Builder::new()
|
||||
.with_crate(crate_dir)
|
||||
.with_language(cbindgen::Language::C)
|
||||
.with_cpp_compat(true)
|
||||
.with_pragma_once(true)
|
||||
.generate()
|
||||
.expect("Unable to generate bindings")
|
||||
.write_to_file("indexer_ffi.h");
|
||||
|
||||
@ -1,2 +0,0 @@
|
||||
language = "C" # For increased compatibility
|
||||
no_includes = true
|
||||
@ -1,3 +1,5 @@
|
||||
#pragma once
|
||||
|
||||
#include <stdarg.h>
|
||||
#include <stdbool.h>
|
||||
#include <stdint.h>
|
||||
@ -22,26 +24,6 @@ typedef enum FfiBedrockStatus {
|
||||
Finalized,
|
||||
} FfiBedrockStatus;
|
||||
|
||||
typedef struct Option_u64 Option_u64;
|
||||
|
||||
typedef struct IndexerServiceFFI {
|
||||
void *indexer_handle;
|
||||
void *indexer_client;
|
||||
} IndexerServiceFFI;
|
||||
|
||||
/**
|
||||
* Simple wrapper around a pointer to a value or an error.
|
||||
*
|
||||
* Pointer is not guaranteed. You should check the error field before
|
||||
* dereferencing the pointer.
|
||||
*/
|
||||
typedef struct PointerResult_IndexerServiceFFI__OperationStatus {
|
||||
struct IndexerServiceFFI *value;
|
||||
enum OperationStatus error;
|
||||
} PointerResult_IndexerServiceFFI__OperationStatus;
|
||||
|
||||
typedef struct PointerResult_IndexerServiceFFI__OperationStatus InitializedIndexerServiceFFIResult;
|
||||
|
||||
typedef enum PointerKind_Tag {
|
||||
Owned,
|
||||
Borrowed,
|
||||
@ -72,15 +54,19 @@ typedef struct Runtime {
|
||||
} Runtime;
|
||||
|
||||
/**
|
||||
* Simple wrapper around a pointer to a value or an error.
|
||||
* FFI-owned indexer.
|
||||
*
|
||||
* Pointer is not guaranteed. You should check the error field before
|
||||
* dereferencing the pointer.
|
||||
* - An [`IndexerCore`] used to answer queries
|
||||
* - The background task [`JoinHandle`] that drives ingestion (consuming the block stream so the
|
||||
* store stays populated)
|
||||
* - The [`Runtime`] used to run async queries against the store (either owned or borrowed),
|
||||
* already FFI-safe.
|
||||
*/
|
||||
typedef struct PointerResult_Runtime__OperationStatus {
|
||||
struct Runtime *value;
|
||||
enum OperationStatus error;
|
||||
} PointerResult_Runtime__OperationStatus;
|
||||
typedef struct IndexerServiceFFI {
|
||||
void *core;
|
||||
void *ingest_handle;
|
||||
struct Runtime runtime;
|
||||
} IndexerServiceFFI;
|
||||
|
||||
/**
|
||||
* Simple wrapper around a pointer to a value or an error.
|
||||
@ -88,10 +74,26 @@ typedef struct PointerResult_Runtime__OperationStatus {
|
||||
* Pointer is not guaranteed. You should check the error field before
|
||||
* dereferencing the pointer.
|
||||
*/
|
||||
typedef struct PointerResult_Option_u64_____OperationStatus {
|
||||
struct Option_u64 *value;
|
||||
typedef struct PointerResult_IndexerServiceFFI__OperationStatus {
|
||||
struct IndexerServiceFFI *value;
|
||||
enum OperationStatus error;
|
||||
} PointerResult_Option_u64_____OperationStatus;
|
||||
} PointerResult_IndexerServiceFFI__OperationStatus;
|
||||
|
||||
typedef struct PointerResult_IndexerServiceFFI__OperationStatus InitializedIndexerServiceFFIResult;
|
||||
|
||||
/**
|
||||
* Result of [`query_last_block`], returned **inline** (no heap allocation, so
|
||||
* there is no corresponding `free_*` to call).
|
||||
*
|
||||
* `block_id` is only meaningful when `error` is `Ok` *and* `is_some` is
|
||||
* `true`. An `Ok` result with `is_some == false` means the indexer has no
|
||||
* finalized block yet (an empty chain) — which is distinct from an error.
|
||||
*/
|
||||
typedef struct LastBlockIdResult {
|
||||
uint64_t block_id;
|
||||
bool is_some;
|
||||
enum OperationStatus error;
|
||||
} LastBlockIdResult;
|
||||
|
||||
typedef uint64_t FfiBlockId;
|
||||
|
||||
@ -404,14 +406,22 @@ typedef struct PointerResult_FfiVec_FfiTransaction_____OperationStatus {
|
||||
enum OperationStatus error;
|
||||
} PointerResult_FfiVec_FfiTransaction_____OperationStatus;
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif // __cplusplus
|
||||
|
||||
/**
|
||||
* Creates and starts an indexer based on the provided
|
||||
* configuration file path.
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
* - `runtime`: A runtime for the indexer to run on, or null to have the indexer create and own
|
||||
* one.
|
||||
* - `config_path`: A pointer to a string representing the path to the configuration file.
|
||||
* - `port`: Number representing a port, on which indexers RPC will start.
|
||||
* - `storage_dir`: A pointer to a string naming the directory under which the indexer stores its
|
||||
* state (`RocksDB`), or null/empty to use the current directory. The host (e.g. a Logos module's
|
||||
* instance persistence path) owns this location.
|
||||
*
|
||||
* # Returns
|
||||
*
|
||||
@ -420,17 +430,13 @@ typedef struct PointerResult_FfiVec_FfiTransaction_____OperationStatus {
|
||||
*
|
||||
* # Safety
|
||||
* The caller must ensure that:
|
||||
* - `runtime` is a valid pointer to a `tokio::runtime::Runtime` instance.
|
||||
* - `runtime` is either null or a valid pointer to a [`Runtime`] that outlives the indexer.
|
||||
* - `config_path` is a valid pointer to a null-terminated C string.
|
||||
* - `storage_dir` is either null or a valid pointer to a null-terminated C string.
|
||||
*/
|
||||
InitializedIndexerServiceFFIResult start_indexer(const struct Runtime *runtime,
|
||||
const char *config_path,
|
||||
uint16_t port);
|
||||
|
||||
/**
|
||||
* Creates a new [`tokio::runtime::Runtime`].
|
||||
*/
|
||||
struct PointerResult_Runtime__OperationStatus new_runtime(void);
|
||||
const char *storage_dir);
|
||||
|
||||
/**
|
||||
* Stops and frees the resources associated with the given indexer service.
|
||||
@ -452,6 +458,20 @@ struct PointerResult_Runtime__OperationStatus new_runtime(void);
|
||||
*/
|
||||
enum OperationStatus stop_indexer(struct IndexerServiceFFI *indexer);
|
||||
|
||||
/**
|
||||
* Initializes logging for the indexer at `level`.
|
||||
*
|
||||
* - `level` is a null-terminated string (`off`/`error`/`warn`/`info`/`debug`/ `trace`,
|
||||
* case-insensitive); null or unparseable falls back to `info`.
|
||||
*
|
||||
* Only the `indexer_ffi` and `indexer_core` targets are enabled!
|
||||
*
|
||||
* # Safety
|
||||
* - `level` must be a valid null-terminated C string, or null.
|
||||
* - First call to this function wins; subsequent calls are no-ops.
|
||||
*/
|
||||
void init_logger(const char *level);
|
||||
|
||||
/**
|
||||
* # Safety
|
||||
* It's up to the caller to pass a proper pointer, if somehow from c/c++ side
|
||||
@ -469,16 +489,40 @@ void free_cstring(char *block);
|
||||
*
|
||||
* # Returns
|
||||
*
|
||||
* A `PointerResult<Option<u64>, OperationStatus>` indicating success or failure.
|
||||
* A [`LastBlockIdResult`] indicating success or failure. The block id is
|
||||
* returned inline; nothing needs to be freed.
|
||||
*
|
||||
* # Safety
|
||||
*
|
||||
* The caller must ensure that:
|
||||
* - `runtime` is a valid pointer to a [`Runtime`] instance.
|
||||
* - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance.
|
||||
*/
|
||||
struct PointerResult_Option_u64_____OperationStatus query_last_block(const struct Runtime *runtime,
|
||||
const struct IndexerServiceFFI *indexer);
|
||||
struct LastBlockIdResult query_last_block(const struct IndexerServiceFFI *indexer);
|
||||
|
||||
/**
|
||||
* Query the indexer's current sync status as a JSON C-string.
|
||||
*
|
||||
* The JSON schema is owned by `indexer_core` (`IndexerStatus`): an object with
|
||||
* `state` (`starting`/`syncing`/`caught_up`/`error`), `indexedBlockId`, and
|
||||
* `lastError`. Lets a client distinguish "still catching up" from "something
|
||||
* went wrong".
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
* - `indexer`: A pointer to the [`IndexerServiceFFI`] instance to be queried.
|
||||
*
|
||||
* # Returns
|
||||
*
|
||||
* A heap-allocated, null-terminated JSON string that the caller MUST free with
|
||||
* `free_cstring`. Returns null on error (null `indexer` pointer or a
|
||||
* serialization failure).
|
||||
*
|
||||
* # Safety
|
||||
*
|
||||
* The caller must ensure that:
|
||||
* - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance.
|
||||
*/
|
||||
char *query_status(const struct IndexerServiceFFI *indexer);
|
||||
|
||||
/**
|
||||
* Query the block by id from indexer.
|
||||
@ -495,15 +539,13 @@ struct PointerResult_Option_u64_____OperationStatus query_last_block(const struc
|
||||
* # Safety
|
||||
*
|
||||
* The caller must ensure that:
|
||||
* - `runtime` is a valid pointer to a [`Runtime`] instance.
|
||||
* - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance.
|
||||
*/
|
||||
struct PointerResult_FfiBlockOpt__OperationStatus query_block(const struct Runtime *runtime,
|
||||
const struct IndexerServiceFFI *indexer,
|
||||
struct PointerResult_FfiBlockOpt__OperationStatus query_block(const struct IndexerServiceFFI *indexer,
|
||||
FfiBlockId block_id);
|
||||
|
||||
/**
|
||||
* Query the block by id from indexer.
|
||||
* Query the block by hash from indexer.
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
@ -517,11 +559,9 @@ struct PointerResult_FfiBlockOpt__OperationStatus query_block(const struct Runti
|
||||
* # Safety
|
||||
*
|
||||
* The caller must ensure that:
|
||||
* - `runtime` is a valid pointer to a [`Runtime`] instance.
|
||||
* - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance.
|
||||
*/
|
||||
struct PointerResult_FfiBlockOpt__OperationStatus query_block_by_hash(const struct Runtime *runtime,
|
||||
const struct IndexerServiceFFI *indexer,
|
||||
struct PointerResult_FfiBlockOpt__OperationStatus query_block_by_hash(const struct IndexerServiceFFI *indexer,
|
||||
FfiHashType hash);
|
||||
|
||||
/**
|
||||
@ -539,15 +579,13 @@ struct PointerResult_FfiBlockOpt__OperationStatus query_block_by_hash(const stru
|
||||
* # Safety
|
||||
*
|
||||
* The caller must ensure that:
|
||||
* - `runtime` is a valid pointer to a [`Runtime`] instance.
|
||||
* - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance.
|
||||
*/
|
||||
struct PointerResult_FfiAccount__OperationStatus query_account(const struct Runtime *runtime,
|
||||
const struct IndexerServiceFFI *indexer,
|
||||
struct PointerResult_FfiAccount__OperationStatus query_account(const struct IndexerServiceFFI *indexer,
|
||||
FfiAccountId account_id);
|
||||
|
||||
/**
|
||||
* Query the trasnaction by hash from indexer.
|
||||
* Query the transaction by hash from indexer.
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
@ -562,10 +600,8 @@ struct PointerResult_FfiAccount__OperationStatus query_account(const struct Runt
|
||||
*
|
||||
* The caller must ensure that:
|
||||
* - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance.
|
||||
* - `runtime` is a valid pointer to a [`Runtime`] instance.
|
||||
*/
|
||||
struct PointerResult_FfiOption_FfiTransaction_____OperationStatus query_transaction(const struct Runtime *runtime,
|
||||
const struct IndexerServiceFFI *indexer,
|
||||
struct PointerResult_FfiOption_FfiTransaction_____OperationStatus query_transaction(const struct IndexerServiceFFI *indexer,
|
||||
FfiHashType hash);
|
||||
|
||||
/**
|
||||
@ -585,10 +621,8 @@ struct PointerResult_FfiOption_FfiTransaction_____OperationStatus query_transact
|
||||
*
|
||||
* The caller must ensure that:
|
||||
* - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance.
|
||||
* - `runtime` is a valid pointer to a [`Runtime`] instance.
|
||||
*/
|
||||
struct PointerResult_FfiVec_FfiBlock_____OperationStatus query_block_vec(const struct Runtime *runtime,
|
||||
const struct IndexerServiceFFI *indexer,
|
||||
struct PointerResult_FfiVec_FfiBlock_____OperationStatus query_block_vec(const struct IndexerServiceFFI *indexer,
|
||||
struct FfiOption_u64 before,
|
||||
uint64_t limit);
|
||||
|
||||
@ -604,16 +638,14 @@ struct PointerResult_FfiVec_FfiBlock_____OperationStatus query_block_vec(const s
|
||||
*
|
||||
* # Returns
|
||||
*
|
||||
* A `PointerResult<FfiVec<FfiBlock>, OperationStatus>` indicating success or failure.
|
||||
* A `PointerResult<FfiVec<FfiTransaction>, OperationStatus>` indicating success or failure.
|
||||
*
|
||||
* # Safety
|
||||
*
|
||||
* The caller must ensure that:
|
||||
* - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance.
|
||||
* - `runtime` is a valid pointer to a [`Runtime`] instance.
|
||||
*/
|
||||
struct PointerResult_FfiVec_FfiTransaction_____OperationStatus query_transactions_by_account(const struct Runtime *runtime,
|
||||
const struct IndexerServiceFFI *indexer,
|
||||
struct PointerResult_FfiVec_FfiTransaction_____OperationStatus query_transactions_by_account(const struct IndexerServiceFFI *indexer,
|
||||
FfiAccountId account_id,
|
||||
uint64_t offset,
|
||||
uint64_t limit);
|
||||
@ -621,9 +653,14 @@ struct PointerResult_FfiVec_FfiTransaction_____OperationStatus query_transaction
|
||||
/**
|
||||
* Frees the resources associated with the given ffi account.
|
||||
*
|
||||
* Takes ownership of the whole allocation produced by a `query_*` call: the
|
||||
* outer `Box<FfiAccount>` (the `PointerResult.value` pointer) *and* its inner
|
||||
* data buffer. Passing the struct by value previously freed only the inner
|
||||
* buffer and leaked the outer box.
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
* - `val`: An instance of `FfiAccount`.
|
||||
* - `val`: The `*mut FfiAccount` returned in `PointerResult.value`.
|
||||
*
|
||||
* # Returns
|
||||
*
|
||||
@ -632,12 +669,18 @@ struct PointerResult_FfiVec_FfiTransaction_____OperationStatus query_transaction
|
||||
* # Safety
|
||||
*
|
||||
* The caller must ensure that:
|
||||
* - `val` is a valid instance of `FfiAccount`.
|
||||
* - `val` is a pointer to an `FfiAccount` produced by this library and not yet freed.
|
||||
*/
|
||||
void free_ffi_account(struct FfiAccount val);
|
||||
void free_ffi_account(struct FfiAccount *val);
|
||||
|
||||
/**
|
||||
* Frees the resources associated with the given ffi block.
|
||||
* Frees the resources owned by an `FfiBlock` value.
|
||||
*
|
||||
* This frees the block's transaction bodies (the only heap-owning field); the
|
||||
* header/status fields are `Copy`. It operates on the struct by value because
|
||||
* it is an element-level helper, used both for the vector path
|
||||
* ([`free_ffi_block_vec`]) and the optional path ([`free_ffi_block_opt`]) — in
|
||||
* neither case is an `FfiBlock` itself wrapped in its own outer box.
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
@ -650,16 +693,20 @@ void free_ffi_account(struct FfiAccount val);
|
||||
* # Safety
|
||||
*
|
||||
* The caller must ensure that:
|
||||
* - `val` is a valid instance of `FfiBlock`.
|
||||
* - `val` is a valid instance of `FfiBlock` produced by this library and not yet freed.
|
||||
*/
|
||||
void free_ffi_block(struct FfiBlock val);
|
||||
|
||||
/**
|
||||
* Frees the resources associated with the given ffi block option.
|
||||
*
|
||||
* Takes ownership of the whole allocation produced by a `query_*` call: the
|
||||
* outer `Box<FfiBlockOpt>` (the `PointerResult.value` pointer), the inner
|
||||
* `Box<FfiBlock>` (when present), and that block's transaction bodies.
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
* - `val`: An instance of `FfiBlockOpt`.
|
||||
* - `val`: The `*mut FfiBlockOpt` returned in `PointerResult.value`.
|
||||
*
|
||||
* # Returns
|
||||
*
|
||||
@ -668,16 +715,20 @@ void free_ffi_block(struct FfiBlock val);
|
||||
* # Safety
|
||||
*
|
||||
* The caller must ensure that:
|
||||
* - `val` is a valid instance of `FfiBlockOpt`.
|
||||
* - `val` is a pointer to an `FfiBlockOpt` produced by this library and not yet freed.
|
||||
*/
|
||||
void free_ffi_block_opt(FfiBlockOpt val);
|
||||
void free_ffi_block_opt(FfiBlockOpt *val);
|
||||
|
||||
/**
|
||||
* Frees the resources associated with the given ffi block vector.
|
||||
*
|
||||
* Takes ownership of the whole allocation produced by a `query_*` call: the
|
||||
* outer `Box<FfiVec<FfiBlock>>` (the `PointerResult.value` pointer), the
|
||||
* vector's backing buffer, and every block within it.
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
* - `val`: An instance of `FfiVec<FfiBlock>`.
|
||||
* - `val`: The `*mut FfiVec<FfiBlock>` returned in `PointerResult.value`.
|
||||
*
|
||||
* # Returns
|
||||
*
|
||||
@ -686,9 +737,9 @@ void free_ffi_block_opt(FfiBlockOpt val);
|
||||
* # Safety
|
||||
*
|
||||
* The caller must ensure that:
|
||||
* - `val` is a valid instance of `FfiVec<FfiBlock>`.
|
||||
* - `val` is a pointer to an `FfiVec<FfiBlock>` produced by this library and not yet freed.
|
||||
*/
|
||||
void free_ffi_block_vec(struct FfiVec_FfiBlock val);
|
||||
void free_ffi_block_vec(struct FfiVec_FfiBlock *val);
|
||||
|
||||
/**
|
||||
* Frees the resources associated with the given ffi transaction.
|
||||
@ -711,9 +762,13 @@ void free_ffi_transaction(struct FfiTransaction val);
|
||||
/**
|
||||
* Frees the resources associated with the given ffi transaction option.
|
||||
*
|
||||
* Takes ownership of the whole allocation produced by a `query_*` call: the
|
||||
* outer `Box<FfiOption<FfiTransaction>>` (the `PointerResult.value` pointer),
|
||||
* the inner `Box<FfiTransaction>` (when present), and its body.
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
* - `val`: An instance of `FfiOption<FfiTransaction>`.
|
||||
* - `val`: The `*mut FfiOption<FfiTransaction>` returned in `PointerResult.value`.
|
||||
*
|
||||
* # Returns
|
||||
*
|
||||
@ -722,16 +777,21 @@ void free_ffi_transaction(struct FfiTransaction val);
|
||||
* # Safety
|
||||
*
|
||||
* The caller must ensure that:
|
||||
* - `val` is a valid instance of `FfiOption<FfiTransaction>`.
|
||||
* - `val` is a pointer to an `FfiOption<FfiTransaction>` produced by this library and not yet
|
||||
* freed.
|
||||
*/
|
||||
void free_ffi_transaction_opt(struct FfiOption_FfiTransaction val);
|
||||
void free_ffi_transaction_opt(struct FfiOption_FfiTransaction *val);
|
||||
|
||||
/**
|
||||
* Frees the resources associated with the given vector of ffi transactions.
|
||||
*
|
||||
* Takes ownership of the whole allocation produced by a `query_*` call: the
|
||||
* outer `Box<FfiVec<FfiTransaction>>` (the `PointerResult.value` pointer), the
|
||||
* vector's backing buffer, and every transaction within it.
|
||||
*
|
||||
* # Arguments
|
||||
*
|
||||
* - `val`: An instance of `FfiVec<FfiTransaction>`.
|
||||
* - `val`: The `*mut FfiVec<FfiTransaction>` returned in `PointerResult.value`.
|
||||
*
|
||||
* # Returns
|
||||
*
|
||||
@ -740,10 +800,14 @@ void free_ffi_transaction_opt(struct FfiOption_FfiTransaction val);
|
||||
* # Safety
|
||||
*
|
||||
* The caller must ensure that:
|
||||
* - `val` is a valid instance of `FfiVec<FfiTransaction>`.
|
||||
* - `val` is a pointer to an `FfiVec<FfiTransaction>` produced by this library and not yet freed.
|
||||
*/
|
||||
void free_ffi_transaction_vec(struct FfiVec_FfiTransaction val);
|
||||
void free_ffi_transaction_vec(struct FfiVec_FfiTransaction *val);
|
||||
|
||||
bool is_ok(const enum OperationStatus *self);
|
||||
|
||||
bool is_error(const enum OperationStatus *self);
|
||||
|
||||
#ifdef __cplusplus
|
||||
} // extern "C"
|
||||
#endif // __cplusplus
|
||||
|
||||
@ -1,36 +0,0 @@
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use url::Url;
|
||||
|
||||
use crate::OperationStatus;
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum UrlProtocol {
|
||||
Http,
|
||||
Ws,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for UrlProtocol {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Http => write!(f, "http"),
|
||||
Self::Ws => write!(f, "ws"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn addr_to_url(protocol: UrlProtocol, addr: SocketAddr) -> Result<Url, OperationStatus> {
|
||||
// Convert 0.0.0.0 to 127.0.0.1 for client connections
|
||||
// When binding to port 0, the server binds to 0.0.0.0:<random_port>
|
||||
// but clients need to connect to 127.0.0.1:<port> to work reliably
|
||||
let url_string = if addr.ip().is_unspecified() {
|
||||
format!("{protocol}://127.0.0.1:{}", addr.port())
|
||||
} else {
|
||||
format!("{protocol}://{addr}")
|
||||
};
|
||||
|
||||
url_string.parse().map_err(|e| {
|
||||
log::error!("Could not parse indexer url: {e}");
|
||||
OperationStatus::InitializationError
|
||||
})
|
||||
}
|
||||
@ -1,14 +1,9 @@
|
||||
use std::{ffi::c_char, path::PathBuf};
|
||||
|
||||
use crate::{
|
||||
IndexerServiceFFI, Runtime,
|
||||
api::{
|
||||
PointerResult,
|
||||
client::{UrlProtocol, addr_to_url},
|
||||
},
|
||||
client::{IndexerClient, IndexerClientTrait as _},
|
||||
errors::OperationStatus,
|
||||
};
|
||||
use futures::StreamExt as _;
|
||||
use indexer_core::{IndexerCore, config::IndexerConfig};
|
||||
|
||||
use crate::{IndexerServiceFFI, Runtime, api::PointerResult, errors::OperationStatus};
|
||||
|
||||
pub type InitializedIndexerServiceFFIResult = PointerResult<IndexerServiceFFI, OperationStatus>;
|
||||
|
||||
@ -17,8 +12,12 @@ pub type InitializedIndexerServiceFFIResult = PointerResult<IndexerServiceFFI, O
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// - `runtime`: A runtime for the indexer to run on, or null to have the indexer create and own
|
||||
/// one.
|
||||
/// - `config_path`: A pointer to a string representing the path to the configuration file.
|
||||
/// - `port`: Number representing a port, on which indexers RPC will start.
|
||||
/// - `storage_dir`: A pointer to a string naming the directory under which the indexer stores its
|
||||
/// state (`RocksDB`), or null/empty to use the current directory. The host (e.g. a Logos module's
|
||||
/// instance persistence path) owns this location.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
@ -27,37 +26,30 @@ pub type InitializedIndexerServiceFFIResult = PointerResult<IndexerServiceFFI, O
|
||||
///
|
||||
/// # Safety
|
||||
/// The caller must ensure that:
|
||||
/// - `runtime` is a valid pointer to a `tokio::runtime::Runtime` instance.
|
||||
/// - `runtime` is either null or a valid pointer to a [`Runtime`] that outlives the indexer.
|
||||
/// - `config_path` is a valid pointer to a null-terminated C string.
|
||||
/// - `storage_dir` is either null or a valid pointer to a null-terminated C string.
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "C" fn start_indexer(
|
||||
runtime: *const Runtime,
|
||||
config_path: *const c_char,
|
||||
port: u16,
|
||||
storage_dir: *const c_char,
|
||||
) -> InitializedIndexerServiceFFIResult {
|
||||
// SAFETY: The caller must ensure the validness of the `runtime` and `config_path` pointers.
|
||||
unsafe { setup_indexer(runtime, config_path, port) }.map_or_else(
|
||||
// SAFETY: The caller must ensure the validness of the pointer arguments.
|
||||
unsafe { setup_indexer(runtime, config_path, storage_dir) }.map_or_else(
|
||||
InitializedIndexerServiceFFIResult::from_error,
|
||||
InitializedIndexerServiceFFIResult::from_value,
|
||||
)
|
||||
}
|
||||
|
||||
/// Creates a new [`tokio::runtime::Runtime`].
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn new_runtime() -> PointerResult<Runtime, OperationStatus> {
|
||||
Runtime::new().map_or_else(
|
||||
|_e| PointerResult::from_error(OperationStatus::InitializationError),
|
||||
PointerResult::from_value,
|
||||
)
|
||||
}
|
||||
|
||||
/// Initializes and starts an indexer based on the provided
|
||||
/// configuration file path.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// - `runtime`: A runtime for the indexer to run on, or null to create and own one.
|
||||
/// - `config_path`: A pointer to a string representing the path to the configuration file.
|
||||
/// - `port`: Number representing a port, on which indexers RPC will start.
|
||||
/// - `storage_dir`: A pointer to a string naming the storage directory, or null/empty for `.`.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
@ -66,12 +58,13 @@ pub extern "C" fn new_runtime() -> PointerResult<Runtime, OperationStatus> {
|
||||
///
|
||||
/// # Safety
|
||||
/// The caller must ensure that:
|
||||
/// - `runtime` is a valid pointer to a `tokio::runtime::Runtime` instance.
|
||||
/// - `runtime` is either null or a valid pointer to a [`Runtime`] that outlives the indexer.
|
||||
/// - `config_path` is a valid pointer to a null-terminated C string.
|
||||
/// - `storage_dir` is either null or a valid pointer to a null-terminated C string.
|
||||
unsafe fn setup_indexer(
|
||||
runtime: *const Runtime,
|
||||
config_path: *const c_char,
|
||||
port: u16,
|
||||
storage_dir: *const c_char,
|
||||
) -> Result<IndexerServiceFFI, OperationStatus> {
|
||||
let user_config_path = PathBuf::from(
|
||||
unsafe { std::ffi::CStr::from_ptr(config_path) }
|
||||
@ -81,31 +74,64 @@ unsafe fn setup_indexer(
|
||||
OperationStatus::InitializationError
|
||||
})?,
|
||||
);
|
||||
let config = indexer_service::IndexerConfig::from_path(&user_config_path).map_err(|e| {
|
||||
let config = IndexerConfig::from_path(&user_config_path).map_err(|e| {
|
||||
log::error!("Failed to read config: {e}");
|
||||
OperationStatus::InitializationError
|
||||
})?;
|
||||
|
||||
// SAFETY: The caller must ensure that `runtime` is a valid pointer to a
|
||||
// `tokio::runtime::Runtime` instance.
|
||||
let runtime = unsafe { &*runtime };
|
||||
// The host owns where state lives. An empty/null `storage_dir` falls back to
|
||||
// the current directory (matches the standalone service's `--data-dir`
|
||||
// default), but a Logos module passes its instance persistence path.
|
||||
let storage_dir = if storage_dir.is_null() {
|
||||
PathBuf::from(".")
|
||||
} else {
|
||||
let storage_dir = unsafe { std::ffi::CStr::from_ptr(storage_dir) }
|
||||
.to_str()
|
||||
.map_err(|e| {
|
||||
log::error!("Could not convert the storage dir to string: {e}");
|
||||
OperationStatus::InitializationError
|
||||
})?;
|
||||
if storage_dir.is_empty() {
|
||||
PathBuf::from(".")
|
||||
} else {
|
||||
PathBuf::from(storage_dir)
|
||||
}
|
||||
};
|
||||
|
||||
let indexer_handle = runtime
|
||||
.block_on(indexer_service::run_server(config, port))
|
||||
.map_err(|e| {
|
||||
log::error!("Could not start indexer service: {e}");
|
||||
// Use the caller's runtime if one was supplied, otherwise create (and own)
|
||||
// our own. The `Runtime` wrapper drops the underlying tokio runtime only
|
||||
// when we own it; a borrowed one is left to its external owner.
|
||||
let runtime = if runtime.is_null() {
|
||||
Runtime::new().map_err(|e| {
|
||||
log::error!("Could not create tokio runtime: {e}");
|
||||
OperationStatus::InitializationError
|
||||
})?;
|
||||
})?
|
||||
} else {
|
||||
// SAFETY: the caller guarantees `runtime` is valid and outlives the indexer.
|
||||
let caller = unsafe { &*runtime };
|
||||
unsafe { Runtime::from_borrowed(caller.as_ref()) }
|
||||
};
|
||||
|
||||
let indexer_url = addr_to_url(UrlProtocol::Ws, indexer_handle.addr())?;
|
||||
let indexer_client = runtime
|
||||
.block_on(IndexerClient::new(&indexer_url))
|
||||
.map_err(|e| {
|
||||
log::error!("Could not start indexer client: {e}");
|
||||
OperationStatus::InitializationError
|
||||
})?;
|
||||
let core = IndexerCore::new(config, &storage_dir).map_err(|e| {
|
||||
log::error!("Could not initialize indexer core: {e}");
|
||||
OperationStatus::InitializationError
|
||||
})?;
|
||||
|
||||
Ok(IndexerServiceFFI::new(indexer_handle, indexer_client))
|
||||
// The block stream writes each parsed block into the store as a side effect
|
||||
// of being polled, so we spawn a task that simply drains it. There are no
|
||||
// subscribers — queries read the store directly via `core()`.
|
||||
let ingest_core = core.clone();
|
||||
let ingest_handle = runtime.spawn(async move {
|
||||
let mut block_stream = std::pin::pin!(ingest_core.subscribe_parse_block_stream());
|
||||
while let Some(result) = block_stream.next().await {
|
||||
if let Err(e) = result {
|
||||
log::error!("Indexer ingestion error: {e:#}");
|
||||
}
|
||||
}
|
||||
log::warn!("Indexer block stream ended");
|
||||
});
|
||||
|
||||
Ok(IndexerServiceFFI::new(core, ingest_handle, runtime))
|
||||
}
|
||||
|
||||
/// Stops and frees the resources associated with the given indexer service.
|
||||
|
||||
32
lez/indexer/ffi/src/api/logging.rs
Normal file
32
lez/indexer/ffi/src/api/logging.rs
Normal file
@ -0,0 +1,32 @@
|
||||
use std::ffi::{CStr, c_char};
|
||||
|
||||
use log::LevelFilter;
|
||||
|
||||
/// Initializes logging for the indexer at `level`.
|
||||
///
|
||||
/// - `level` is a null-terminated string (`off`/`error`/`warn`/`info`/`debug`/ `trace`,
|
||||
/// case-insensitive); null or unparseable falls back to `info`.
|
||||
///
|
||||
/// Only the `indexer_ffi` and `indexer_core` targets are enabled!
|
||||
///
|
||||
/// # Safety
|
||||
/// - `level` must be a valid null-terminated C string, or null.
|
||||
/// - First call to this function wins; subsequent calls are no-ops.
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "C" fn init_logger(level: *const c_char) {
|
||||
let level = if level.is_null() {
|
||||
LevelFilter::Info
|
||||
} else {
|
||||
unsafe { CStr::from_ptr(level) }
|
||||
.to_str()
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(LevelFilter::Info)
|
||||
};
|
||||
|
||||
let _dontcare = env_logger::Builder::new()
|
||||
.filter_level(LevelFilter::Off)
|
||||
.filter_module("indexer_ffi", level)
|
||||
.filter_module("indexer_core", level)
|
||||
.try_init();
|
||||
}
|
||||
@ -1,7 +1,7 @@
|
||||
pub use result::PointerResult;
|
||||
|
||||
pub mod client;
|
||||
pub mod lifecycle;
|
||||
pub mod logging;
|
||||
pub mod memory;
|
||||
pub mod query;
|
||||
pub mod result;
|
||||
|
||||
@ -1,8 +1,9 @@
|
||||
use indexer_service_protocol::{AccountId, HashType};
|
||||
use indexer_service_rpc::RpcClient as _;
|
||||
use std::ffi::{CString, c_char};
|
||||
|
||||
use indexer_service_protocol::AccountId;
|
||||
|
||||
use crate::{
|
||||
IndexerServiceFFI, Runtime,
|
||||
IndexerServiceFFI,
|
||||
api::{
|
||||
PointerResult,
|
||||
types::{
|
||||
@ -15,6 +16,45 @@ use crate::{
|
||||
errors::OperationStatus,
|
||||
};
|
||||
|
||||
/// Result of [`query_last_block`], returned **inline** (no heap allocation, so
|
||||
/// there is no corresponding `free_*` to call).
|
||||
///
|
||||
/// `block_id` is only meaningful when `error` is `Ok` *and* `is_some` is
|
||||
/// `true`. An `Ok` result with `is_some == false` means the indexer has no
|
||||
/// finalized block yet (an empty chain) — which is distinct from an error.
|
||||
#[repr(C)]
|
||||
pub struct LastBlockIdResult {
|
||||
pub block_id: u64,
|
||||
pub is_some: bool,
|
||||
pub error: OperationStatus,
|
||||
}
|
||||
|
||||
impl LastBlockIdResult {
|
||||
const fn error(error: OperationStatus) -> Self {
|
||||
Self {
|
||||
block_id: 0,
|
||||
is_some: false,
|
||||
error,
|
||||
}
|
||||
}
|
||||
|
||||
const fn none() -> Self {
|
||||
Self {
|
||||
block_id: 0,
|
||||
is_some: false,
|
||||
error: OperationStatus::Ok,
|
||||
}
|
||||
}
|
||||
|
||||
const fn some(block_id: u64) -> Self {
|
||||
Self {
|
||||
block_id,
|
||||
is_some: true,
|
||||
error: OperationStatus::Ok,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Query the last block id from indexer.
|
||||
///
|
||||
/// # Arguments
|
||||
@ -23,34 +63,79 @@ use crate::{
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// A `PointerResult<Option<u64>, OperationStatus>` indicating success or failure.
|
||||
/// A [`LastBlockIdResult`] indicating success or failure. The block id is
|
||||
/// returned inline; nothing needs to be freed.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller must ensure that:
|
||||
/// - `runtime` is a valid pointer to a [`Runtime`] instance.
|
||||
/// - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance.
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "C" fn query_last_block(
|
||||
runtime: *const Runtime,
|
||||
indexer: *const IndexerServiceFFI,
|
||||
) -> PointerResult<Option<u64>, OperationStatus> {
|
||||
pub unsafe extern "C" fn query_last_block(indexer: *const IndexerServiceFFI) -> LastBlockIdResult {
|
||||
if indexer.is_null() {
|
||||
log::error!("Attempted to query a null indexer pointer. This is a bug. Aborting.");
|
||||
return PointerResult::from_error(OperationStatus::NullPointer);
|
||||
return LastBlockIdResult::error(OperationStatus::NullPointer);
|
||||
}
|
||||
|
||||
let indexer = unsafe { &*indexer };
|
||||
|
||||
let client = indexer.client();
|
||||
let runtime = unsafe { &*runtime };
|
||||
indexer.core().store.get_last_block_id().map_or_else(
|
||||
|e| {
|
||||
log::error!("Failed to query last block id: {e:#}");
|
||||
LastBlockIdResult::error(OperationStatus::ClientError)
|
||||
},
|
||||
|opt| opt.map_or_else(LastBlockIdResult::none, LastBlockIdResult::some),
|
||||
)
|
||||
}
|
||||
|
||||
runtime
|
||||
.block_on(client.get_last_finalized_block_id())
|
||||
.map_or_else(
|
||||
|_| PointerResult::from_error(OperationStatus::ClientError),
|
||||
PointerResult::from_value,
|
||||
)
|
||||
/// Query the indexer's current sync status as a JSON C-string.
|
||||
///
|
||||
/// The JSON schema is owned by `indexer_core` (`IndexerStatus`): an object with
|
||||
/// `state` (`starting`/`syncing`/`caught_up`/`error`), `indexedBlockId`, and
|
||||
/// `lastError`. Lets a client distinguish "still catching up" from "something
|
||||
/// went wrong".
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// - `indexer`: A pointer to the [`IndexerServiceFFI`] instance to be queried.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// A heap-allocated, null-terminated JSON string that the caller MUST free with
|
||||
/// `free_cstring`. Returns null on error (null `indexer` pointer or a
|
||||
/// serialization failure).
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller must ensure that:
|
||||
/// - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance.
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "C" fn query_status(indexer: *const IndexerServiceFFI) -> *mut c_char {
|
||||
if indexer.is_null() {
|
||||
log::error!(
|
||||
"Attempted to query status on a null indexer pointer. This is a bug. Aborting."
|
||||
);
|
||||
return std::ptr::null_mut();
|
||||
}
|
||||
|
||||
let indexer = unsafe { &*indexer };
|
||||
let status = indexer.core().status();
|
||||
|
||||
let json = match serde_json::to_string(&status) {
|
||||
Ok(json) => json,
|
||||
Err(e) => {
|
||||
log::error!("Failed to serialize indexer status: {e}");
|
||||
return std::ptr::null_mut();
|
||||
}
|
||||
};
|
||||
|
||||
CString::new(json).map_or_else(
|
||||
|e| {
|
||||
log::error!("Indexer status JSON contained an interior nul byte: {e}");
|
||||
std::ptr::null_mut()
|
||||
},
|
||||
CString::into_raw,
|
||||
)
|
||||
}
|
||||
|
||||
/// Query the block by id from indexer.
|
||||
@ -67,11 +152,9 @@ pub unsafe extern "C" fn query_last_block(
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller must ensure that:
|
||||
/// - `runtime` is a valid pointer to a [`Runtime`] instance.
|
||||
/// - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance.
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "C" fn query_block(
|
||||
runtime: *const Runtime,
|
||||
indexer: *const IndexerServiceFFI,
|
||||
block_id: FfiBlockId,
|
||||
) -> PointerResult<FfiBlockOpt, OperationStatus> {
|
||||
@ -82,24 +165,23 @@ pub unsafe extern "C" fn query_block(
|
||||
|
||||
let indexer = unsafe { &*indexer };
|
||||
|
||||
let client = indexer.client();
|
||||
let runtime = unsafe { &*runtime };
|
||||
indexer.core().store.get_block_at_id(block_id).map_or_else(
|
||||
|e| {
|
||||
log::error!("Failed to query block by id: {e:#}");
|
||||
PointerResult::from_error(OperationStatus::ClientError)
|
||||
},
|
||||
|block_opt| {
|
||||
let block_ffi = block_opt.map_or_else(FfiBlockOpt::from_none, |block| {
|
||||
let block: indexer_service_protocol::Block = block.into();
|
||||
FfiBlockOpt::from_value(block.into())
|
||||
});
|
||||
|
||||
runtime
|
||||
.block_on(client.get_block_by_id(block_id))
|
||||
.map_or_else(
|
||||
|_| PointerResult::from_error(OperationStatus::ClientError),
|
||||
|block_opt| {
|
||||
let block_ffi = block_opt.map_or_else(FfiBlockOpt::from_none, |block| {
|
||||
FfiBlockOpt::from_value(block.into())
|
||||
});
|
||||
|
||||
PointerResult::from_value(block_ffi)
|
||||
},
|
||||
)
|
||||
PointerResult::from_value(block_ffi)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
/// Query the block by id from indexer.
|
||||
/// Query the block by hash from indexer.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
@ -113,11 +195,9 @@ pub unsafe extern "C" fn query_block(
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller must ensure that:
|
||||
/// - `runtime` is a valid pointer to a [`Runtime`] instance.
|
||||
/// - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance.
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "C" fn query_block_by_hash(
|
||||
runtime: *const Runtime,
|
||||
indexer: *const IndexerServiceFFI,
|
||||
hash: FfiHashType,
|
||||
) -> PointerResult<FfiBlockOpt, OperationStatus> {
|
||||
@ -128,15 +208,18 @@ pub unsafe extern "C" fn query_block_by_hash(
|
||||
|
||||
let indexer = unsafe { &*indexer };
|
||||
|
||||
let client = indexer.client();
|
||||
let runtime = unsafe { &*runtime };
|
||||
|
||||
runtime
|
||||
.block_on(client.get_block_by_hash(HashType(hash.data)))
|
||||
indexer
|
||||
.core()
|
||||
.store
|
||||
.get_block_by_hash(hash.data)
|
||||
.map_or_else(
|
||||
|_| PointerResult::from_error(OperationStatus::ClientError),
|
||||
|e| {
|
||||
log::error!("Failed to query block by hash: {e:#}");
|
||||
PointerResult::from_error(OperationStatus::ClientError)
|
||||
},
|
||||
|block_opt| {
|
||||
let block_ffi = block_opt.map_or_else(FfiBlockOpt::from_none, |block| {
|
||||
let block: indexer_service_protocol::Block = block.into();
|
||||
FfiBlockOpt::from_value(block.into())
|
||||
});
|
||||
|
||||
@ -159,11 +242,9 @@ pub unsafe extern "C" fn query_block_by_hash(
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller must ensure that:
|
||||
/// - `runtime` is a valid pointer to a [`Runtime`] instance.
|
||||
/// - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance.
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "C" fn query_account(
|
||||
runtime: *const Runtime,
|
||||
indexer: *const IndexerServiceFFI,
|
||||
account_id: FfiAccountId,
|
||||
) -> PointerResult<FfiAccount, OperationStatus> {
|
||||
@ -174,23 +255,29 @@ pub unsafe extern "C" fn query_account(
|
||||
|
||||
let indexer = unsafe { &*indexer };
|
||||
|
||||
let client = indexer.client();
|
||||
let runtime = unsafe { &*runtime };
|
||||
|
||||
runtime
|
||||
.block_on(client.get_account(AccountId {
|
||||
value: account_id.data,
|
||||
}))
|
||||
// `account_current_state` is the only async store call; drive it on the
|
||||
// runtime the indexer was started on.
|
||||
let account_id = AccountId {
|
||||
value: account_id.data,
|
||||
};
|
||||
indexer
|
||||
.runtime()
|
||||
.block_on(
|
||||
indexer
|
||||
.core()
|
||||
.store
|
||||
.account_current_state(&account_id.into()),
|
||||
)
|
||||
.map_or_else(
|
||||
|_| PointerResult::from_error(OperationStatus::ClientError),
|
||||
|acc| {
|
||||
let acc_lee: lee::Account = acc.try_into().expect("Source is in blocks, must fit");
|
||||
PointerResult::from_value(acc_lee.into())
|
||||
|e| {
|
||||
log::error!("Failed to query account: {e:#}");
|
||||
PointerResult::from_error(OperationStatus::ClientError)
|
||||
},
|
||||
|account| PointerResult::from_value(account.into()),
|
||||
)
|
||||
}
|
||||
|
||||
/// Query the trasnaction by hash from indexer.
|
||||
/// Query the transaction by hash from indexer.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
@ -205,10 +292,8 @@ pub unsafe extern "C" fn query_account(
|
||||
///
|
||||
/// The caller must ensure that:
|
||||
/// - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance.
|
||||
/// - `runtime` is a valid pointer to a [`Runtime`] instance.
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "C" fn query_transaction(
|
||||
runtime: *const Runtime,
|
||||
indexer: *const IndexerServiceFFI,
|
||||
hash: FfiHashType,
|
||||
) -> PointerResult<FfiOption<FfiTransaction>, OperationStatus> {
|
||||
@ -219,15 +304,18 @@ pub unsafe extern "C" fn query_transaction(
|
||||
|
||||
let indexer = unsafe { &*indexer };
|
||||
|
||||
let client = indexer.client();
|
||||
let runtime = unsafe { &*runtime };
|
||||
|
||||
runtime
|
||||
.block_on(client.get_transaction(HashType(hash.data)))
|
||||
indexer
|
||||
.core()
|
||||
.store
|
||||
.get_transaction_by_hash(hash.data)
|
||||
.map_or_else(
|
||||
|_| PointerResult::from_error(OperationStatus::ClientError),
|
||||
|e| {
|
||||
log::error!("Failed to query transaction: {e:#}");
|
||||
PointerResult::from_error(OperationStatus::ClientError)
|
||||
},
|
||||
|tx_opt| {
|
||||
let tx_ffi = tx_opt.map_or_else(FfiOption::<FfiTransaction>::from_none, |tx| {
|
||||
let tx: indexer_service_protocol::Transaction = tx.into();
|
||||
FfiOption::<FfiTransaction>::from_value(tx.into())
|
||||
});
|
||||
|
||||
@ -252,10 +340,8 @@ pub unsafe extern "C" fn query_transaction(
|
||||
///
|
||||
/// The caller must ensure that:
|
||||
/// - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance.
|
||||
/// - `runtime` is a valid pointer to a [`Runtime`] instance.
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "C" fn query_block_vec(
|
||||
runtime: *const Runtime,
|
||||
indexer: *const IndexerServiceFFI,
|
||||
before: FfiOption<u64>,
|
||||
limit: u64,
|
||||
@ -267,21 +353,26 @@ pub unsafe extern "C" fn query_block_vec(
|
||||
|
||||
let indexer = unsafe { &*indexer };
|
||||
|
||||
let client = indexer.client();
|
||||
let runtime = unsafe { &*runtime };
|
||||
|
||||
let before_std = before.is_some.then(|| unsafe { *before.value });
|
||||
|
||||
runtime
|
||||
.block_on(client.get_blocks(before_std, limit))
|
||||
indexer
|
||||
.core()
|
||||
.store
|
||||
.get_block_batch(before_std, limit)
|
||||
.map_or_else(
|
||||
|_| PointerResult::from_error(OperationStatus::ClientError),
|
||||
|e| {
|
||||
log::error!("Failed to query block batch: {e:#}");
|
||||
PointerResult::from_error(OperationStatus::ClientError)
|
||||
},
|
||||
|block_vec| {
|
||||
PointerResult::from_value(
|
||||
block_vec
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect::<Vec<_>>()
|
||||
.map(|block| {
|
||||
let block: indexer_service_protocol::Block = block.into();
|
||||
block.into()
|
||||
})
|
||||
.collect::<Vec<FfiBlock>>()
|
||||
.into(),
|
||||
)
|
||||
},
|
||||
@ -299,16 +390,14 @@ pub unsafe extern "C" fn query_block_vec(
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// A `PointerResult<FfiVec<FfiBlock>, OperationStatus>` indicating success or failure.
|
||||
/// A `PointerResult<FfiVec<FfiTransaction>, OperationStatus>` indicating success or failure.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller must ensure that:
|
||||
/// - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance.
|
||||
/// - `runtime` is a valid pointer to a [`Runtime`] instance.
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "C" fn query_transactions_by_account(
|
||||
runtime: *const Runtime,
|
||||
indexer: *const IndexerServiceFFI,
|
||||
account_id: FfiAccountId,
|
||||
offset: u64,
|
||||
@ -321,25 +410,24 @@ pub unsafe extern "C" fn query_transactions_by_account(
|
||||
|
||||
let indexer = unsafe { &*indexer };
|
||||
|
||||
let client = indexer.client();
|
||||
let runtime = unsafe { &*runtime };
|
||||
|
||||
runtime
|
||||
.block_on(client.get_transactions_by_account(
|
||||
AccountId {
|
||||
value: account_id.data,
|
||||
},
|
||||
offset,
|
||||
limit,
|
||||
))
|
||||
indexer
|
||||
.core()
|
||||
.store
|
||||
.get_transactions_by_account(account_id.data, offset, limit)
|
||||
.map_or_else(
|
||||
|_| PointerResult::from_error(OperationStatus::ClientError),
|
||||
|e| {
|
||||
log::error!("Failed to query transactions by account: {e:#}");
|
||||
PointerResult::from_error(OperationStatus::ClientError)
|
||||
},
|
||||
|tx_vec| {
|
||||
PointerResult::from_value(
|
||||
tx_vec
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect::<Vec<_>>()
|
||||
.map(|tx| {
|
||||
let tx: indexer_service_protocol::Transaction = tx.into();
|
||||
tx.into()
|
||||
})
|
||||
.collect::<Vec<FfiTransaction>>()
|
||||
.into(),
|
||||
)
|
||||
},
|
||||
|
||||
@ -100,9 +100,14 @@ impl From<&FfiAccount> for indexer_service_protocol::Account {
|
||||
|
||||
/// Frees the resources associated with the given ffi account.
|
||||
///
|
||||
/// Takes ownership of the whole allocation produced by a `query_*` call: the
|
||||
/// outer `Box<FfiAccount>` (the `PointerResult.value` pointer) *and* its inner
|
||||
/// data buffer. Passing the struct by value previously freed only the inner
|
||||
/// buffer and leaked the outer box.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// - `val`: An instance of `FfiAccount`.
|
||||
/// - `val`: The `*mut FfiAccount` returned in `PointerResult.value`.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
@ -111,9 +116,15 @@ impl From<&FfiAccount> for indexer_service_protocol::Account {
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller must ensure that:
|
||||
/// - `val` is a valid instance of `FfiAccount`.
|
||||
/// - `val` is a pointer to an `FfiAccount` produced by this library and not yet freed.
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "C" fn free_ffi_account(val: FfiAccount) {
|
||||
let orig_val: indexer_service_protocol::Account = val.into();
|
||||
pub unsafe extern "C" fn free_ffi_account(val: *mut FfiAccount) {
|
||||
if val.is_null() {
|
||||
log::error!("Trying to free a null pointer. Exiting");
|
||||
return;
|
||||
}
|
||||
// Reclaim the outer box, then convert to drop the inner data buffer.
|
||||
let boxed = unsafe { Box::from_raw(val) };
|
||||
let orig_val: indexer_service_protocol::Account = (*boxed).into();
|
||||
drop(orig_val);
|
||||
}
|
||||
|
||||
@ -2,7 +2,7 @@ use indexer_service_protocol::{BedrockStatus, Block, BlockHeader, HashType, Sign
|
||||
|
||||
use crate::api::types::{
|
||||
FfiBlockId, FfiHashType, FfiOption, FfiSignature, FfiTimestamp, FfiVec,
|
||||
transaction::free_ffi_transaction_vec, vectors::FfiBlockBody,
|
||||
transaction::free_transaction_vec_value, vectors::FfiBlockBody,
|
||||
};
|
||||
|
||||
#[repr(C)]
|
||||
@ -91,7 +91,13 @@ impl From<FfiBedrockStatus> for BedrockStatus {
|
||||
}
|
||||
}
|
||||
|
||||
/// Frees the resources associated with the given ffi block.
|
||||
/// Frees the resources owned by an `FfiBlock` value.
|
||||
///
|
||||
/// This frees the block's transaction bodies (the only heap-owning field); the
|
||||
/// header/status fields are `Copy`. It operates on the struct by value because
|
||||
/// it is an element-level helper, used both for the vector path
|
||||
/// ([`free_ffi_block_vec`]) and the optional path ([`free_ffi_block_opt`]) — in
|
||||
/// neither case is an `FfiBlock` itself wrapped in its own outer box.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
@ -104,7 +110,7 @@ impl From<FfiBedrockStatus> for BedrockStatus {
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller must ensure that:
|
||||
/// - `val` is a valid instance of `FfiBlock`.
|
||||
/// - `val` is a valid instance of `FfiBlock` produced by this library and not yet freed.
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "C" fn free_ffi_block(val: FfiBlock) {
|
||||
// We don't really need all the casts, but just in case
|
||||
@ -121,16 +127,18 @@ pub unsafe extern "C" fn free_ffi_block(val: FfiBlock) {
|
||||
#[expect(clippy::let_underscore_must_use, reason = "No use for this Copy type")]
|
||||
let _: BedrockStatus = val.bedrock_status.into();
|
||||
|
||||
unsafe {
|
||||
free_ffi_transaction_vec(ffi_tx_ffi_vec);
|
||||
};
|
||||
free_transaction_vec_value(ffi_tx_ffi_vec);
|
||||
}
|
||||
|
||||
/// Frees the resources associated with the given ffi block option.
|
||||
///
|
||||
/// Takes ownership of the whole allocation produced by a `query_*` call: the
|
||||
/// outer `Box<FfiBlockOpt>` (the `PointerResult.value` pointer), the inner
|
||||
/// `Box<FfiBlock>` (when present), and that block's transaction bodies.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// - `val`: An instance of `FfiBlockOpt`.
|
||||
/// - `val`: The `*mut FfiBlockOpt` returned in `PointerResult.value`.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
@ -139,37 +147,32 @@ pub unsafe extern "C" fn free_ffi_block(val: FfiBlock) {
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller must ensure that:
|
||||
/// - `val` is a valid instance of `FfiBlockOpt`.
|
||||
/// - `val` is a pointer to an `FfiBlockOpt` produced by this library and not yet freed.
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "C" fn free_ffi_block_opt(val: FfiBlockOpt) {
|
||||
if val.is_some {
|
||||
let value = unsafe { Box::from_raw(val.value) };
|
||||
|
||||
// We don't really need all the casts, but just in case
|
||||
// All except `ffi_tx_ffi_vec` is Copy types, so no need for Drop
|
||||
let _ = BlockHeader {
|
||||
block_id: value.header.block_id,
|
||||
prev_block_hash: HashType(value.header.prev_block_hash.data),
|
||||
hash: HashType(value.header.hash.data),
|
||||
timestamp: value.header.timestamp,
|
||||
signature: Signature(value.header.signature.data),
|
||||
};
|
||||
let ffi_tx_ffi_vec = value.body;
|
||||
|
||||
#[expect(clippy::let_underscore_must_use, reason = "No use for this Copy type")]
|
||||
let _: BedrockStatus = value.bedrock_status.into();
|
||||
|
||||
pub unsafe extern "C" fn free_ffi_block_opt(val: *mut FfiBlockOpt) {
|
||||
if val.is_null() {
|
||||
log::error!("Trying to free a null pointer. Exiting");
|
||||
return;
|
||||
}
|
||||
// Reclaim the outer box, then the inner block box (if any).
|
||||
let opt = unsafe { Box::from_raw(val) };
|
||||
if opt.is_some {
|
||||
let block = unsafe { Box::from_raw(opt.value) };
|
||||
unsafe {
|
||||
free_ffi_transaction_vec(ffi_tx_ffi_vec);
|
||||
};
|
||||
free_ffi_block(*block);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Frees the resources associated with the given ffi block vector.
|
||||
///
|
||||
/// Takes ownership of the whole allocation produced by a `query_*` call: the
|
||||
/// outer `Box<FfiVec<FfiBlock>>` (the `PointerResult.value` pointer), the
|
||||
/// vector's backing buffer, and every block within it.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// - `val`: An instance of `FfiVec<FfiBlock>`.
|
||||
/// - `val`: The `*mut FfiVec<FfiBlock>` returned in `PointerResult.value`.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
@ -178,10 +181,16 @@ pub unsafe extern "C" fn free_ffi_block_opt(val: FfiBlockOpt) {
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller must ensure that:
|
||||
/// - `val` is a valid instance of `FfiVec<FfiBlock>`.
|
||||
/// - `val` is a pointer to an `FfiVec<FfiBlock>` produced by this library and not yet freed.
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "C" fn free_ffi_block_vec(val: FfiVec<FfiBlock>) {
|
||||
let ffi_block_std_vec: Vec<_> = val.into();
|
||||
pub unsafe extern "C" fn free_ffi_block_vec(val: *mut FfiVec<FfiBlock>) {
|
||||
if val.is_null() {
|
||||
log::error!("Trying to free a null pointer. Exiting");
|
||||
return;
|
||||
}
|
||||
// Reclaim the outer box, then the backing buffer and each block.
|
||||
let boxed = unsafe { Box::from_raw(val) };
|
||||
let ffi_block_std_vec: Vec<_> = (*boxed).into();
|
||||
for block in ffi_block_std_vec {
|
||||
unsafe {
|
||||
free_ffi_block(block);
|
||||
|
||||
@ -463,9 +463,13 @@ pub unsafe extern "C" fn free_ffi_transaction(val: FfiTransaction) {
|
||||
|
||||
/// Frees the resources associated with the given ffi transaction option.
|
||||
///
|
||||
/// Takes ownership of the whole allocation produced by a `query_*` call: the
|
||||
/// outer `Box<FfiOption<FfiTransaction>>` (the `PointerResult.value` pointer),
|
||||
/// the inner `Box<FfiTransaction>` (when present), and its body.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// - `val`: An instance of `FfiOption<FfiTransaction>`.
|
||||
/// - `val`: The `*mut FfiOption<FfiTransaction>` returned in `PointerResult.value`.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
@ -474,48 +478,32 @@ pub unsafe extern "C" fn free_ffi_transaction(val: FfiTransaction) {
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller must ensure that:
|
||||
/// - `val` is a valid instance of `FfiOption<FfiTransaction>`.
|
||||
/// - `val` is a pointer to an `FfiOption<FfiTransaction>` produced by this library and not yet
|
||||
/// freed.
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "C" fn free_ffi_transaction_opt(val: FfiOption<FfiTransaction>) {
|
||||
if val.is_some {
|
||||
let value = unsafe { Box::from_raw(val.value) };
|
||||
|
||||
match value.kind {
|
||||
FfiTransactionKind::Public => {
|
||||
let body = unsafe { Box::from_raw(value.body.public_body) };
|
||||
let std_body: PublicTransaction = body.into();
|
||||
drop(std_body);
|
||||
}
|
||||
FfiTransactionKind::Private => {
|
||||
let body = unsafe { Box::from_raw(value.body.private_body) };
|
||||
let std_body: PrivacyPreservingTransaction = body.into();
|
||||
drop(std_body);
|
||||
}
|
||||
FfiTransactionKind::ProgramDeploy => {
|
||||
let body = unsafe { Box::from_raw(value.body.program_deployment_body) };
|
||||
let std_body: ProgramDeploymentTransaction = body.into();
|
||||
drop(std_body);
|
||||
}
|
||||
pub unsafe extern "C" fn free_ffi_transaction_opt(val: *mut FfiOption<FfiTransaction>) {
|
||||
if val.is_null() {
|
||||
log::error!("Trying to free a null pointer. Exiting");
|
||||
return;
|
||||
}
|
||||
// Reclaim the outer box, then the inner transaction box (if any).
|
||||
let opt = unsafe { Box::from_raw(val) };
|
||||
if opt.is_some {
|
||||
let tx = unsafe { Box::from_raw(opt.value) };
|
||||
unsafe {
|
||||
free_ffi_transaction(*tx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Frees the resources associated with the given vector of ffi transactions.
|
||||
/// Frees the resources owned by an `FfiVec<FfiTransaction>` value (the backing
|
||||
/// buffer and each transaction), without owning an outer box.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// - `val`: An instance of `FfiVec<FfiTransaction>`.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// void.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller must ensure that:
|
||||
/// - `val` is a valid instance of `FfiVec<FfiTransaction>`.
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "C" fn free_ffi_transaction_vec(val: FfiVec<FfiTransaction>) {
|
||||
/// This is the element-level helper shared by the block free path
|
||||
/// ([`crate::api::types::block::free_ffi_block`], whose body is a transaction
|
||||
/// vector held by value) and the public [`free_ffi_transaction_vec`] entry
|
||||
/// point (which first reclaims the outer box).
|
||||
pub(crate) fn free_transaction_vec_value(val: FfiVec<FfiTransaction>) {
|
||||
let ffi_tx_std_vec: Vec<_> = val.into();
|
||||
for tx in ffi_tx_std_vec {
|
||||
unsafe {
|
||||
@ -524,6 +512,35 @@ pub unsafe extern "C" fn free_ffi_transaction_vec(val: FfiVec<FfiTransaction>) {
|
||||
}
|
||||
}
|
||||
|
||||
/// Frees the resources associated with the given vector of ffi transactions.
|
||||
///
|
||||
/// Takes ownership of the whole allocation produced by a `query_*` call: the
|
||||
/// outer `Box<FfiVec<FfiTransaction>>` (the `PointerResult.value` pointer), the
|
||||
/// vector's backing buffer, and every transaction within it.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// - `val`: The `*mut FfiVec<FfiTransaction>` returned in `PointerResult.value`.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// void.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// The caller must ensure that:
|
||||
/// - `val` is a pointer to an `FfiVec<FfiTransaction>` produced by this library and not yet freed.
|
||||
#[unsafe(no_mangle)]
|
||||
pub unsafe extern "C" fn free_ffi_transaction_vec(val: *mut FfiVec<FfiTransaction>) {
|
||||
if val.is_null() {
|
||||
log::error!("Trying to free a null pointer. Exiting");
|
||||
return;
|
||||
}
|
||||
// Reclaim the outer box, then the backing buffer and each transaction.
|
||||
let boxed = unsafe { Box::from_raw(val) };
|
||||
free_transaction_vec_value(*boxed);
|
||||
}
|
||||
|
||||
fn cast_validity_window(window: ValidityWindow) -> [u64; 2] {
|
||||
[
|
||||
window.0.0.unwrap_or_default(),
|
||||
|
||||
@ -1,33 +0,0 @@
|
||||
use std::{ops::Deref, sync::Arc};
|
||||
|
||||
use anyhow::{Context as _, Result};
|
||||
use log::info;
|
||||
pub use url::Url;
|
||||
|
||||
pub trait IndexerClientTrait: Clone {
|
||||
async fn new(indexer_url: &Url) -> Result<Self>;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct IndexerClient(Arc<jsonrpsee::ws_client::WsClient>);
|
||||
|
||||
impl IndexerClientTrait for IndexerClient {
|
||||
async fn new(indexer_url: &Url) -> Result<Self> {
|
||||
info!("Connecting to Indexer at {indexer_url}");
|
||||
|
||||
let client = jsonrpsee::ws_client::WsClientBuilder::default()
|
||||
.build(indexer_url)
|
||||
.await
|
||||
.context("Failed to create websocket client")?;
|
||||
|
||||
Ok(Self(Arc::new(client)))
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for IndexerClient {
|
||||
type Target = jsonrpsee::ws_client::WsClient;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
@ -1,95 +1,66 @@
|
||||
use std::{ffi::c_void, net::SocketAddr};
|
||||
use std::ffi::c_void;
|
||||
|
||||
use indexer_service::IndexerHandle;
|
||||
use indexer_core::IndexerCore;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::client::IndexerClient;
|
||||
use crate::Runtime;
|
||||
|
||||
/// FFI-owned indexer.
|
||||
///
|
||||
/// - An [`IndexerCore`] used to answer queries
|
||||
/// - The background task [`JoinHandle`] that drives ingestion (consuming the block stream so the
|
||||
/// store stays populated)
|
||||
/// - The [`Runtime`] used to run async queries against the store (either owned or borrowed),
|
||||
/// already FFI-safe.
|
||||
#[repr(C)]
|
||||
pub struct IndexerServiceFFI {
|
||||
indexer_handle: *mut c_void,
|
||||
indexer_client: *mut c_void,
|
||||
core: *mut c_void,
|
||||
ingest_handle: *mut c_void,
|
||||
runtime: Runtime,
|
||||
}
|
||||
|
||||
impl IndexerServiceFFI {
|
||||
#[must_use]
|
||||
pub fn new(
|
||||
indexer_handle: indexer_service::IndexerHandle,
|
||||
indexer_client: IndexerClient,
|
||||
) -> Self {
|
||||
pub fn new(core: IndexerCore, ingest_handle: JoinHandle<()>, runtime: Runtime) -> Self {
|
||||
Self {
|
||||
// Box the complex types and convert to opaque pointers
|
||||
indexer_handle: Box::into_raw(Box::new(indexer_handle)).cast::<c_void>(),
|
||||
indexer_client: Box::into_raw(Box::new(indexer_client)).cast::<c_void>(),
|
||||
core: Box::into_raw(Box::new(core)).cast::<c_void>(),
|
||||
ingest_handle: Box::into_raw(Box::new(ingest_handle)).cast::<c_void>(),
|
||||
runtime,
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper to take ownership back.
|
||||
/// Borrow the [`IndexerCore`] to run a query against its store.
|
||||
#[must_use]
|
||||
pub fn into_parts(mut self) -> (Box<IndexerHandle>, Box<IndexerClient>) {
|
||||
let Self {
|
||||
indexer_handle,
|
||||
indexer_client,
|
||||
} = &mut self;
|
||||
|
||||
let indexer_handle_boxed = unsafe { Box::from_raw(indexer_handle.cast::<IndexerHandle>()) };
|
||||
let indexer_client_boxed = unsafe { Box::from_raw(indexer_client.cast::<IndexerClient>()) };
|
||||
|
||||
// Assigning nulls to prevent double free on drop, since ownership is transferred to caller
|
||||
*indexer_handle = std::ptr::null_mut();
|
||||
*indexer_client = std::ptr::null_mut();
|
||||
|
||||
(indexer_handle_boxed, indexer_client_boxed)
|
||||
}
|
||||
|
||||
/// Helper to get indexer handle addr.
|
||||
#[must_use]
|
||||
pub const fn addr(&self) -> SocketAddr {
|
||||
let indexer_handle = unsafe {
|
||||
self.indexer_handle
|
||||
.cast::<IndexerHandle>()
|
||||
.as_ref()
|
||||
.expect("Indexer Handle must be non-null pointer")
|
||||
};
|
||||
|
||||
indexer_handle.addr()
|
||||
}
|
||||
|
||||
/// Helper to get indexer handle ref.
|
||||
#[must_use]
|
||||
pub const fn handle(&self) -> &IndexerHandle {
|
||||
pub const fn core(&self) -> &IndexerCore {
|
||||
unsafe {
|
||||
self.indexer_handle
|
||||
.cast::<IndexerHandle>()
|
||||
self.core
|
||||
.cast::<IndexerCore>()
|
||||
.as_ref()
|
||||
.expect("Indexer Handle must be non-null pointer")
|
||||
.expect("IndexerCore must be a non-null pointer")
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper to get indexer client ref.
|
||||
/// Borrow the runtime to `block_on` an async store query.
|
||||
#[must_use]
|
||||
pub const fn client(&self) -> &IndexerClient {
|
||||
unsafe {
|
||||
self.indexer_client
|
||||
.cast::<IndexerClient>()
|
||||
.as_ref()
|
||||
.expect("Indexer Client must be non-null pointer")
|
||||
}
|
||||
pub const fn runtime(&self) -> &Runtime {
|
||||
&self.runtime
|
||||
}
|
||||
}
|
||||
|
||||
// Implement Drop to prevent memory leaks
|
||||
impl Drop for IndexerServiceFFI {
|
||||
fn drop(&mut self) {
|
||||
let Self {
|
||||
indexer_handle,
|
||||
indexer_client,
|
||||
} = self;
|
||||
if !self.ingest_handle.is_null() {
|
||||
let handle = unsafe { Box::from_raw(self.ingest_handle.cast::<JoinHandle<()>>()) };
|
||||
// stop the background ingestion task before tearing down the core.
|
||||
handle.abort();
|
||||
drop(handle);
|
||||
}
|
||||
if !self.core.is_null() {
|
||||
drop(unsafe { Box::from_raw(self.core.cast::<IndexerCore>()) });
|
||||
}
|
||||
|
||||
if !indexer_handle.is_null() {
|
||||
drop(unsafe { Box::from_raw(indexer_handle.cast::<IndexerHandle>()) });
|
||||
}
|
||||
if !indexer_client.is_null() {
|
||||
drop(unsafe { Box::from_raw(indexer_client.cast::<IndexerClient>()) });
|
||||
}
|
||||
// `runtime` field is dropped automatically on return here:
|
||||
// - if runtime was owned, it is shutdown at this point
|
||||
// - if it was borrowed, it continues to live within the external owner
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,7 +5,6 @@ pub use indexer::IndexerServiceFFI;
|
||||
pub use runtime::Runtime;
|
||||
|
||||
pub mod api;
|
||||
mod client;
|
||||
mod errors;
|
||||
mod indexer;
|
||||
mod runtime;
|
||||
|
||||
@ -71,4 +71,4 @@ ENV RUST_LOG=info
|
||||
USER indexer_service_user
|
||||
|
||||
WORKDIR /indexer_service
|
||||
CMD ["indexer_service", "/etc/indexer_service/indexer_config.json"]
|
||||
CMD ["indexer_service", "/etc/indexer_service/indexer_config.json", "--data-dir", "/var/lib/indexer_service"]
|
||||
|
||||
@ -1,5 +1,4 @@
|
||||
{
|
||||
"home": ".",
|
||||
"consensus_info_polling_interval": "1s",
|
||||
"bedrock_config": {
|
||||
"addr": "http://localhost:8080"
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::{net::SocketAddr, path::Path};
|
||||
|
||||
use anyhow::{Context as _, Result};
|
||||
pub use indexer_core::config::*;
|
||||
@ -65,9 +65,13 @@ impl Drop for IndexerHandle {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run_server(config: IndexerConfig, port: u16) -> Result<IndexerHandle> {
|
||||
pub async fn run_server(
|
||||
config: IndexerConfig,
|
||||
storage_dir: &Path,
|
||||
port: u16,
|
||||
) -> Result<IndexerHandle> {
|
||||
#[cfg(feature = "mock-responses")]
|
||||
let _ = config;
|
||||
let _ = (config, storage_dir);
|
||||
|
||||
let server = Server::builder()
|
||||
.build(SocketAddr::from(([0, 0, 0, 0], port)))
|
||||
@ -82,8 +86,8 @@ pub async fn run_server(config: IndexerConfig, port: u16) -> Result<IndexerHandl
|
||||
|
||||
#[cfg(not(feature = "mock-responses"))]
|
||||
let handle = {
|
||||
let service =
|
||||
service::IndexerService::new(config).context("Failed to initialize indexer service")?;
|
||||
let service = service::IndexerService::new(config, storage_dir)
|
||||
.context("Failed to initialize indexer service")?;
|
||||
server.start(service.into_rpc())
|
||||
};
|
||||
#[cfg(feature = "mock-responses")]
|
||||
|
||||
@ -12,6 +12,9 @@ struct Args {
|
||||
config_path: PathBuf,
|
||||
#[clap(short, long, default_value = "8779")]
|
||||
port: u16,
|
||||
/// Directory under which the indexer stores its `RocksDB` state.
|
||||
#[clap(short, long, default_value = ".")]
|
||||
data_dir: PathBuf,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
@ -22,12 +25,16 @@ struct Args {
|
||||
async fn main() -> Result<()> {
|
||||
env_logger::init();
|
||||
|
||||
let Args { config_path, port } = Args::parse();
|
||||
let Args {
|
||||
config_path,
|
||||
port,
|
||||
data_dir,
|
||||
} = Args::parse();
|
||||
|
||||
let cancellation_token = listen_for_shutdown_signal();
|
||||
|
||||
let config = indexer_service::IndexerConfig::from_path(&config_path)?;
|
||||
let indexer_handle = indexer_service::run_server(config, port).await?;
|
||||
let indexer_handle = indexer_service::run_server(config, data_dir.as_path(), port).await?;
|
||||
|
||||
tokio::select! {
|
||||
() = cancellation_token.cancelled() => {
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
use std::{pin::pin, sync::Arc};
|
||||
use std::{path::Path, pin::pin, sync::Arc};
|
||||
|
||||
use anyhow::{Context as _, Result, bail};
|
||||
use arc_swap::ArcSwap;
|
||||
@ -19,8 +19,8 @@ pub struct IndexerService {
|
||||
}
|
||||
|
||||
impl IndexerService {
|
||||
pub fn new(config: IndexerConfig) -> Result<Self> {
|
||||
let indexer = IndexerCore::new(config)?;
|
||||
pub fn new(config: IndexerConfig, storage_dir: &Path) -> Result<Self> {
|
||||
let indexer = IndexerCore::new(config, storage_dir)?;
|
||||
let subscription_service = SubscriptionService::spawn_new(indexer.clone());
|
||||
|
||||
Ok(Self {
|
||||
|
||||
@ -163,9 +163,8 @@ pub fn wallet_config(sequencer_addr: SocketAddr) -> Result<WalletConfig> {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn indexer_config(bedrock_addr: SocketAddr, home: PathBuf) -> Result<IndexerConfig> {
|
||||
pub fn indexer_config(bedrock_addr: SocketAddr) -> Result<IndexerConfig> {
|
||||
Ok(IndexerConfig {
|
||||
home,
|
||||
consensus_info_polling_interval: Duration::from_secs(1),
|
||||
bedrock_config: ClientConfig {
|
||||
addr: addr_to_url(UrlProtocol::Http, bedrock_addr)
|
||||
|
||||
@ -98,10 +98,10 @@ pub async fn setup_indexer(bedrock_addr: SocketAddr) -> Result<(IndexerHandle, T
|
||||
temp_indexer_dir.path().display()
|
||||
);
|
||||
|
||||
let indexer_config = config::indexer_config(bedrock_addr, temp_indexer_dir.path().to_owned())
|
||||
.context("Failed to create Indexer config")?;
|
||||
let indexer_config =
|
||||
config::indexer_config(bedrock_addr).context("Failed to create Indexer config")?;
|
||||
|
||||
indexer_service::run_server(indexer_config, 0)
|
||||
indexer_service::run_server(indexer_config, temp_indexer_dir.path(), 0)
|
||||
.await
|
||||
.context("Failed to run Indexer Service")
|
||||
.map(|handle| (handle, temp_indexer_dir))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user