From c57bf16d1545aa9a116f2383f814c50a37e7b005 Mon Sep 17 00:00:00 2001 From: erhant Date: Mon, 22 Jun 2026 18:36:29 +0300 Subject: [PATCH] feat: added status reporter for indexer (for UI) --- Cargo.lock | 1 + lez/indexer/core/src/lib.rs | 59 ++++++++++++++++++++++++++++++-- lez/indexer/core/src/status.rs | 41 ++++++++++++++++++++++ lez/indexer/ffi/Cargo.toml | 2 ++ lez/indexer/ffi/indexer_ffi.h | 25 ++++++++++++++ lez/indexer/ffi/src/api/query.rs | 52 ++++++++++++++++++++++++++++ 6 files changed, 177 insertions(+), 3 deletions(-) create mode 100644 lez/indexer/core/src/status.rs diff --git a/Cargo.lock b/Cargo.lock index 9af3ff27..318b47e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3830,6 +3830,7 @@ dependencies = [ "indexer_service_protocol", "lee", "log", + "serde_json", "tokio", ] diff --git a/lez/indexer/core/src/lib.rs b/lez/indexer/core/src/lib.rs index d8890af2..deb4d7af 100644 --- a/lez/indexer/core/src/lib.rs +++ b/lez/indexer/core/src/lib.rs @@ -1,4 +1,7 @@ -use std::{path::Path, sync::Arc}; +use std::{ + path::Path, + sync::{Arc, Mutex}, +}; use anyhow::Result; use common::block::Block; @@ -10,16 +13,23 @@ use logos_blockchain_zone_sdk::{ CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer, }; -use crate::{block_store::IndexerStore, config::IndexerConfig}; +use crate::{ + block_store::IndexerStore, + config::IndexerConfig, + status::{IndexerStatus, SyncState}, +}; pub mod block_store; pub mod config; +pub mod status; #[derive(Clone)] pub struct IndexerCore { pub zone_indexer: Arc>, pub config: IndexerConfig, pub store: IndexerStore, + /// Live ingestion status; updated by the ingest stream, read by `status`. + pub status: Arc>, } impl IndexerCore { @@ -39,9 +49,31 @@ impl IndexerCore { zone_indexer: Arc::new(zone_indexer), config, store: IndexerStore::open_db(&home)?, + status: Arc::new(Mutex::new(IndexerStatus::starting())), }) } + /// Snapshot of the current ingestion status (sync state + indexed tip). + /// + /// Combines the ingest loop's live state with the L2 tip read fresh from the + /// store, so callers (FFI/RPC) can tell "catching up" from "failed". + #[must_use] + pub fn status(&self) -> IndexerStatus { + let mut snapshot = self + .status + .lock() + .expect("indexer status mutex poisoned") + .clone(); + snapshot.indexed_block_id = self.store.get_last_block_id().ok().flatten(); + snapshot + } + + /// Apply a short, non-blocking update to the shared status. + fn set_status(&self, update: impl FnOnce(&mut IndexerStatus)) { + let mut status = self.status.lock().expect("indexer status mutex poisoned"); + update(&mut status); + } + pub fn subscribe_parse_block_stream(&self) -> impl futures::Stream> + '_ { let poll_interval = self.config.consensus_info_polling_interval; let initial_cursor = self @@ -62,14 +94,34 @@ impl IndexerCore { let stream = match self.zone_indexer.next_messages(cursor).await { Ok(s) => s, Err(err) => { + // `next_messages` reads L1 consensus info internally, so + // this also covers an unreachable/misconfigured L1 node. error!("Failed to start zone-sdk next_messages stream: {err}"); + self.set_status(|s| { + s.state = SyncState::Error; + s.last_error = Some(format!("cannot reach L1 / read channel: {err}")); + }); tokio::time::sleep(poll_interval).await; continue; } }; let mut stream = std::pin::pin!(stream); + // Flip to Syncing on the first message of this cycle (not merely on + // a successful poll) so the steady-state CaughtUp status doesn't + // flicker. Until then the state stays Starting (cold-start scan of + // empty L1 history) or CaughtUp (idle). + let mut announced_syncing = false; + while let Some((msg, slot)) = stream.next().await { + if !announced_syncing { + self.set_status(|s| { + s.state = SyncState::Syncing; + s.last_error = None; + }); + announced_syncing = true; + } + let zone_block = match msg { ZoneMessage::Block(b) => b, // Non-block messages don't carry a cursor position; the @@ -107,7 +159,8 @@ impl IndexerCore { yield Ok(block); } - // Stream ended (caught up to LIB). Sleep then poll again. + // Stream drained: caught up to LIB as of this cycle. Sleep then poll again. + self.set_status(|s| s.state = SyncState::CaughtUp); tokio::time::sleep(poll_interval).await; } } diff --git a/lez/indexer/core/src/status.rs b/lez/indexer/core/src/status.rs new file mode 100644 index 00000000..a8a8b845 --- /dev/null +++ b/lez/indexer/core/src/status.rs @@ -0,0 +1,41 @@ +use serde::Serialize; + +/// Coarse lifecycle state of the indexer's ingestion loop, so a client can tell +/// "still catching up" apart from "something went wrong". +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum SyncState { + /// Booted; no ingestion cycle has run yet. + Starting, + /// Streaming finalized messages toward the L1 frontier. + Syncing, + /// Drained the stream up to LIB; idle until new blocks finalize. + CaughtUp, + /// The last cycle failed (e.g. the L1 node is unreachable). See `last_error`. + Error, +} + +/// Snapshot of the indexer's sync status. +/// +/// `state` is the coarse phase and `last_error` carries the reason when it is +/// `Error`. `indexed_block_id` is the L2 tip, filled from the store at read time +/// (so it is always `None` in the value held behind the status mutex — see +/// `IndexerCore::status`). +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct IndexerStatus { + pub state: SyncState, + pub indexed_block_id: Option, + pub last_error: Option, +} + +impl IndexerStatus { + /// Initial status before any ingestion cycle has run. + pub(crate) const fn starting() -> Self { + Self { + state: SyncState::Starting, + indexed_block_id: None, + last_error: None, + } + } +} diff --git a/lez/indexer/ffi/Cargo.toml b/lez/indexer/ffi/Cargo.toml index 50748d56..331a996a 100644 --- a/lez/indexer/ffi/Cargo.toml +++ b/lez/indexer/ffi/Cargo.toml @@ -15,6 +15,8 @@ env_logger.workspace = true log = { workspace = true } tokio = { features = ["rt-multi-thread"], workspace = true } futures.workspace = true +# Serializes the indexer status snapshot to JSON for `query_status`. +serde_json.workspace = true [build-dependencies] cbindgen = "0.29" diff --git a/lez/indexer/ffi/indexer_ffi.h b/lez/indexer/ffi/indexer_ffi.h index eb3bf117..ae61eecb 100644 --- a/lez/indexer/ffi/indexer_ffi.h +++ b/lez/indexer/ffi/indexer_ffi.h @@ -496,6 +496,31 @@ void free_cstring(char *block); */ struct LastBlockIdResult query_last_block(const struct IndexerServiceFFI *indexer); +/** + * Query the indexer's current sync status as a JSON C-string. + * + * The JSON schema is owned by `indexer_core` (`IndexerStatus`): an object with + * `state` (`starting`/`syncing`/`caught_up`/`error`), `indexedBlockId`, and + * `lastError`. Lets a client distinguish "still catching up" from "something + * went wrong". + * + * # Arguments + * + * - `indexer`: A pointer to the [`IndexerServiceFFI`] instance to be queried. + * + * # Returns + * + * A heap-allocated, null-terminated JSON string that the caller MUST free with + * `free_cstring`. Returns null on error (null `indexer` pointer or a + * serialization failure). + * + * # Safety + * + * The caller must ensure that: + * - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance. + */ +char *query_status(const struct IndexerServiceFFI *indexer); + /** * Query the block by id from indexer. * diff --git a/lez/indexer/ffi/src/api/query.rs b/lez/indexer/ffi/src/api/query.rs index 23233a76..1943f6d4 100644 --- a/lez/indexer/ffi/src/api/query.rs +++ b/lez/indexer/ffi/src/api/query.rs @@ -1,3 +1,5 @@ +use std::ffi::{CString, c_char}; + use indexer_service_protocol::AccountId; use crate::{ @@ -86,6 +88,56 @@ pub unsafe extern "C" fn query_last_block(indexer: *const IndexerServiceFFI) -> ) } +/// Query the indexer's current sync status as a JSON C-string. +/// +/// The JSON schema is owned by `indexer_core` (`IndexerStatus`): an object with +/// `state` (`starting`/`syncing`/`caught_up`/`error`), `indexedBlockId`, and +/// `lastError`. Lets a client distinguish "still catching up" from "something +/// went wrong". +/// +/// # Arguments +/// +/// - `indexer`: A pointer to the [`IndexerServiceFFI`] instance to be queried. +/// +/// # Returns +/// +/// A heap-allocated, null-terminated JSON string that the caller MUST free with +/// `free_cstring`. Returns null on error (null `indexer` pointer or a +/// serialization failure). +/// +/// # Safety +/// +/// The caller must ensure that: +/// - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance. +#[unsafe(no_mangle)] +pub unsafe extern "C" fn query_status(indexer: *const IndexerServiceFFI) -> *mut c_char { + if indexer.is_null() { + log::error!( + "Attempted to query status on a null indexer pointer. This is a bug. Aborting." + ); + return std::ptr::null_mut(); + } + + let indexer = unsafe { &*indexer }; + let status = indexer.core().status(); + + let json = match serde_json::to_string(&status) { + Ok(json) => json, + Err(e) => { + log::error!("Failed to serialize indexer status: {e}"); + return std::ptr::null_mut(); + } + }; + + CString::new(json).map_or_else( + |e| { + log::error!("Indexer status JSON contained an interior nul byte: {e}"); + std::ptr::null_mut() + }, + CString::into_raw, + ) +} + /// Query the block by id from indexer. /// /// # Arguments