fix(indexer)!: address @Arjentix comments on IndexerStatus

This commit is contained in:
erhant 2026-06-23 14:33:38 +03:00
parent 2195937400
commit 735235efa1
5 changed files with 102 additions and 42 deletions

1
Cargo.lock generated
View File

@ -3799,6 +3799,7 @@ name = "indexer_core"
version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"async-stream",
"authenticated_transfer_core",
"borsh",

View File

@ -117,6 +117,7 @@ hex = "0.4.3"
bytemuck = "1.24.0"
bytesize = { version = "2.3.1", features = ["serde"] }
humantime-serde = "1.1"
arc-swap = "1.7"
humantime = "2.1"
aes-gcm = "0.10.3"
toml = "0.9.8"

View File

@ -16,6 +16,7 @@ storage.workspace = true
testnet_initial_state.workspace = true
anyhow.workspace = true
arc-swap.workspace = true
log.workspace = true
serde.workspace = true
humantime-serde.workspace = true

View File

@ -1,9 +1,7 @@
use std::{
path::Path,
sync::{Arc, Mutex},
};
use std::{path::Path, sync::Arc};
use anyhow::Result;
use arc_swap::ArcSwap;
use common::block::Block;
// ToDo: Remove after testnet
use futures::StreamExt as _;
@ -16,7 +14,7 @@ use logos_blockchain_zone_sdk::{
use crate::{
block_store::IndexerStore,
config::IndexerConfig,
status::{IndexerStatus, SyncState},
status::{IndexerStatus, IndexerSyncStatus},
};
pub mod block_store;
@ -29,7 +27,7 @@ pub struct IndexerCore {
pub config: IndexerConfig,
pub store: IndexerStore,
/// Live ingestion status; updated by the ingest stream, read by `status`.
pub status: Arc<Mutex<IndexerStatus>>,
pub status: Arc<ArcSwap<IndexerSyncStatus>>,
}
impl IndexerCore {
@ -49,29 +47,27 @@ impl IndexerCore {
zone_indexer: Arc::new(zone_indexer),
config,
store: IndexerStore::open_db(&home)?,
status: Arc::new(Mutex::new(IndexerStatus::starting())),
status: Arc::new(ArcSwap::from_pointee(IndexerSyncStatus::starting())),
})
}
/// Snapshot of the current ingestion status (sync state + indexed tip).
///
/// Combines the ingest loop's live state with the L2 tip read fresh from the
/// Combines the ingest loop's live status with the L2 tip read fresh from the
/// store, so callers (FFI/RPC) can tell "catching up" from "failed".
#[must_use]
pub fn status(&self) -> IndexerStatus {
let mut snapshot = self
.status
.lock()
.expect("indexer status mutex poisoned")
.clone();
snapshot.indexed_block_id = self.store.get_last_block_id().ok().flatten();
snapshot
let sync = IndexerSyncStatus::clone(&self.status.load());
let indexed_block_id = self.store.get_last_block_id().ok().flatten();
IndexerStatus {
sync,
indexed_block_id,
}
}
/// Apply a short, non-blocking update to the shared status.
fn set_status(&self, update: impl FnOnce(&mut IndexerStatus)) {
let mut status = self.status.lock().expect("indexer status mutex poisoned");
update(&mut status);
/// Atomically publish a new ingestion status for readers of `status`.
fn set_status(&self, status: IndexerSyncStatus) {
self.status.store(Arc::new(status));
}
pub fn subscribe_parse_block_stream(&self) -> impl futures::Stream<Item = Result<Block>> + '_ {
@ -97,10 +93,9 @@ impl IndexerCore {
// `next_messages` reads L1 consensus info internally, so
// this also covers an unreachable/misconfigured L1 node.
error!("Failed to start zone-sdk next_messages stream: {err}");
self.set_status(|s| {
s.state = SyncState::Error;
s.last_error = Some(format!("cannot reach L1 / read channel: {err}"));
});
self.set_status(IndexerSyncStatus::error(format!(
"cannot reach L1 / read channel: {err}"
)));
tokio::time::sleep(poll_interval).await;
continue;
}
@ -115,10 +110,7 @@ impl IndexerCore {
while let Some((msg, slot)) = stream.next().await {
if !announced_syncing {
self.set_status(|s| {
s.state = SyncState::Syncing;
s.last_error = None;
});
self.set_status(IndexerSyncStatus::syncing());
announced_syncing = true;
}
@ -159,8 +151,11 @@ impl IndexerCore {
yield Ok(block);
}
// Stream drained: caught up to LIB as of this cycle. Sleep then poll again.
self.set_status(|s| s.state = SyncState::CaughtUp);
// Stream drained: caught up to LIB as of this cycle. Clears any
// prior error (e.g. a transient L1 disconnect that left no
// backlog, so the `Syncing` branch above never ran). Sleep then
// poll again.
self.set_status(IndexerSyncStatus::caught_up());
tokio::time::sleep(poll_interval).await;
}
}

View File

@ -4,7 +4,7 @@ use serde::Serialize;
/// "still catching up" apart from "something went wrong".
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum SyncState {
pub enum IndexerSyncState {
/// Booted; no ingestion cycle has run yet.
Starting,
/// Streaming finalized messages toward the L1 frontier.
@ -15,27 +15,89 @@ pub enum SyncState {
Error,
}
/// Snapshot of the indexer's sync status.
///
/// `state` is the coarse phase and `last_error` carries the reason when it is
/// `Error`. `indexed_block_id` is the L2 tip, filled from the store at read time
/// (so it is always `None` in the value held behind the status mutex — see
/// `IndexerCore::status`).
/// Live ingestion status owned by the ingest loop: the coarse `state` plus the
/// reason when it is `Error`.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct IndexerStatus {
pub state: SyncState,
pub indexed_block_id: Option<u64>,
pub struct IndexerSyncStatus {
pub state: IndexerSyncState,
pub last_error: Option<String>,
}
impl IndexerStatus {
impl IndexerSyncStatus {
/// Initial status before any ingestion cycle has run.
pub(crate) const fn starting() -> Self {
Self {
state: SyncState::Starting,
indexed_block_id: None,
state: IndexerSyncState::Starting,
last_error: None,
}
}
/// Actively streaming finalized messages toward the L1 frontier.
pub(crate) const fn syncing() -> Self {
Self {
state: IndexerSyncState::Syncing,
last_error: None,
}
}
/// Drained the stream up to LIB; idle until new blocks finalize.
pub(crate) const fn caught_up() -> Self {
Self {
state: IndexerSyncState::CaughtUp,
last_error: None,
}
}
/// The last cycle failed; `reason` explains why.
pub(crate) const fn error(reason: String) -> Self {
Self {
state: IndexerSyncState::Error,
last_error: Some(reason),
}
}
}
/// Full status snapshot returned to callers (FFI/RPC): the live [`SyncStatus`]
/// plus the L2 tip (`indexed_block_id`) read fresh from the store at query time.
///
/// The tip is tracked by the store, not the ingest loop, so it lives here on the
/// returned snapshot rather than inside the shared [`SyncStatus`].
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct IndexerStatus {
#[serde(flatten)]
pub sync: IndexerSyncStatus,
pub indexed_block_id: Option<u64>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn indexer_status_serializes_to_flat_object() {
let status = IndexerStatus {
sync: IndexerSyncStatus::error("boom".to_owned()),
indexed_block_id: Some(7),
};
let value = serde_json::to_value(&status).expect("serialize");
assert_eq!(
value,
serde_json::json!({
"state": "error",
"lastError": "boom",
"indexedBlockId": 7,
})
);
}
#[test]
fn caught_up_clears_error() {
let value = serde_json::to_value(IndexerSyncStatus::caught_up()).expect("serialize");
assert_eq!(
value,
serde_json::json!({ "state": "caught_up", "lastError": null })
);
}
}