feat: added status reporter for indexer (for UI)

This commit is contained in:
erhant 2026-06-22 18:36:29 +03:00
parent c0fbaaf08e
commit c57bf16d15
6 changed files with 177 additions and 3 deletions

1
Cargo.lock generated
View File

@ -3830,6 +3830,7 @@ dependencies = [
"indexer_service_protocol",
"lee",
"log",
"serde_json",
"tokio",
]

View File

@ -1,4 +1,7 @@
use std::{path::Path, sync::Arc};
use std::{
path::Path,
sync::{Arc, Mutex},
};
use anyhow::Result;
use common::block::Block;
@ -10,16 +13,23 @@ use logos_blockchain_zone_sdk::{
CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer,
};
use crate::{block_store::IndexerStore, config::IndexerConfig};
use crate::{
block_store::IndexerStore,
config::IndexerConfig,
status::{IndexerStatus, SyncState},
};
pub mod block_store;
pub mod config;
pub mod status;
#[derive(Clone)]
pub struct IndexerCore {
pub zone_indexer: Arc<ZoneIndexer<NodeHttpClient>>,
pub config: IndexerConfig,
pub store: IndexerStore,
/// Live ingestion status; updated by the ingest stream, read by `status`.
pub status: Arc<Mutex<IndexerStatus>>,
}
impl IndexerCore {
@ -39,9 +49,31 @@ impl IndexerCore {
zone_indexer: Arc::new(zone_indexer),
config,
store: IndexerStore::open_db(&home)?,
status: Arc::new(Mutex::new(IndexerStatus::starting())),
})
}
/// Snapshot of the current ingestion status (sync state + indexed tip).
///
/// Combines the ingest loop's live state with the L2 tip read fresh from the
/// store, so callers (FFI/RPC) can tell "catching up" from "failed".
#[must_use]
pub fn status(&self) -> IndexerStatus {
let mut snapshot = self
.status
.lock()
.expect("indexer status mutex poisoned")
.clone();
snapshot.indexed_block_id = self.store.get_last_block_id().ok().flatten();
snapshot
}
/// Apply a short, non-blocking update to the shared status.
fn set_status(&self, update: impl FnOnce(&mut IndexerStatus)) {
let mut status = self.status.lock().expect("indexer status mutex poisoned");
update(&mut status);
}
pub fn subscribe_parse_block_stream(&self) -> impl futures::Stream<Item = Result<Block>> + '_ {
let poll_interval = self.config.consensus_info_polling_interval;
let initial_cursor = self
@ -62,14 +94,34 @@ impl IndexerCore {
let stream = match self.zone_indexer.next_messages(cursor).await {
Ok(s) => s,
Err(err) => {
// `next_messages` reads L1 consensus info internally, so
// this also covers an unreachable/misconfigured L1 node.
error!("Failed to start zone-sdk next_messages stream: {err}");
self.set_status(|s| {
s.state = SyncState::Error;
s.last_error = Some(format!("cannot reach L1 / read channel: {err}"));
});
tokio::time::sleep(poll_interval).await;
continue;
}
};
let mut stream = std::pin::pin!(stream);
// Flip to Syncing on the first message of this cycle (not merely on
// a successful poll) so the steady-state CaughtUp status doesn't
// flicker. Until then the state stays Starting (cold-start scan of
// empty L1 history) or CaughtUp (idle).
let mut announced_syncing = false;
while let Some((msg, slot)) = stream.next().await {
if !announced_syncing {
self.set_status(|s| {
s.state = SyncState::Syncing;
s.last_error = None;
});
announced_syncing = true;
}
let zone_block = match msg {
ZoneMessage::Block(b) => b,
// Non-block messages don't carry a cursor position; the
@ -107,7 +159,8 @@ impl IndexerCore {
yield Ok(block);
}
// Stream ended (caught up to LIB). Sleep then poll again.
// Stream drained: caught up to LIB as of this cycle. Sleep then poll again.
self.set_status(|s| s.state = SyncState::CaughtUp);
tokio::time::sleep(poll_interval).await;
}
}

View File

@ -0,0 +1,41 @@
use serde::Serialize;
/// Coarse lifecycle state of the indexer's ingestion loop, so a client can tell
/// "still catching up" apart from "something went wrong".
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum SyncState {
/// Booted; no ingestion cycle has run yet.
Starting,
/// Streaming finalized messages toward the L1 frontier.
Syncing,
/// Drained the stream up to LIB; idle until new blocks finalize.
CaughtUp,
/// The last cycle failed (e.g. the L1 node is unreachable). See `last_error`.
Error,
}
/// Snapshot of the indexer's sync status.
///
/// `state` is the coarse phase and `last_error` carries the reason when it is
/// `Error`. `indexed_block_id` is the L2 tip, filled from the store at read time
/// (so it is always `None` in the value held behind the status mutex — see
/// `IndexerCore::status`).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct IndexerStatus {
pub state: SyncState,
pub indexed_block_id: Option<u64>,
pub last_error: Option<String>,
}
impl IndexerStatus {
/// Initial status before any ingestion cycle has run.
pub(crate) const fn starting() -> Self {
Self {
state: SyncState::Starting,
indexed_block_id: None,
last_error: None,
}
}
}

View File

@ -15,6 +15,8 @@ env_logger.workspace = true
log = { workspace = true }
tokio = { features = ["rt-multi-thread"], workspace = true }
futures.workspace = true
# Serializes the indexer status snapshot to JSON for `query_status`.
serde_json.workspace = true
[build-dependencies]
cbindgen = "0.29"

View File

@ -496,6 +496,31 @@ void free_cstring(char *block);
*/
struct LastBlockIdResult query_last_block(const struct IndexerServiceFFI *indexer);
/**
* Query the indexer's current sync status as a JSON C-string.
*
* The JSON schema is owned by `indexer_core` (`IndexerStatus`): an object with
* `state` (`starting`/`syncing`/`caught_up`/`error`), `indexedBlockId`, and
* `lastError`. Lets a client distinguish "still catching up" from "something
* went wrong".
*
* # Arguments
*
* - `indexer`: A pointer to the [`IndexerServiceFFI`] instance to be queried.
*
* # Returns
*
* A heap-allocated, null-terminated JSON string that the caller MUST free with
* `free_cstring`. Returns null on error (null `indexer` pointer or a
* serialization failure).
*
* # Safety
*
* The caller must ensure that:
* - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance.
*/
char *query_status(const struct IndexerServiceFFI *indexer);
/**
* Query the block by id from indexer.
*

View File

@ -1,3 +1,5 @@
use std::ffi::{CString, c_char};
use indexer_service_protocol::AccountId;
use crate::{
@ -86,6 +88,56 @@ pub unsafe extern "C" fn query_last_block(indexer: *const IndexerServiceFFI) ->
)
}
/// Query the indexer's current sync status as a JSON C-string.
///
/// The JSON schema is owned by `indexer_core` (`IndexerStatus`): an object with
/// `state` (`starting`/`syncing`/`caught_up`/`error`), `indexedBlockId`, and
/// `lastError`. Lets a client distinguish "still catching up" from "something
/// went wrong".
///
/// # Arguments
///
/// - `indexer`: A pointer to the [`IndexerServiceFFI`] instance to be queried.
///
/// # Returns
///
/// A heap-allocated, null-terminated JSON string that the caller MUST free with
/// `free_cstring`. Returns null on error (null `indexer` pointer or a
/// serialization failure).
///
/// # Safety
///
/// The caller must ensure that:
/// - `indexer` is a valid pointer to a [`IndexerServiceFFI`] instance.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn query_status(indexer: *const IndexerServiceFFI) -> *mut c_char {
if indexer.is_null() {
log::error!(
"Attempted to query status on a null indexer pointer. This is a bug. Aborting."
);
return std::ptr::null_mut();
}
let indexer = unsafe { &*indexer };
let status = indexer.core().status();
let json = match serde_json::to_string(&status) {
Ok(json) => json,
Err(e) => {
log::error!("Failed to serialize indexer status: {e}");
return std::ptr::null_mut();
}
};
CString::new(json).map_or_else(
|e| {
log::error!("Indexer status JSON contained an interior nul byte: {e}");
std::ptr::null_mut()
},
CString::into_raw,
)
}
/// Query the block by id from indexer.
///
/// # Arguments