From 735235efa1dc18240258cb033f0694d77c00828e Mon Sep 17 00:00:00 2001 From: erhant Date: Tue, 23 Jun 2026 14:33:38 +0300 Subject: [PATCH] fix(indexer)!: address @Arjentix comments on `IndexerStatus` --- Cargo.lock | 1 + Cargo.toml | 1 + lez/indexer/core/Cargo.toml | 1 + lez/indexer/core/src/lib.rs | 53 ++++++++++---------- lez/indexer/core/src/status.rs | 88 +++++++++++++++++++++++++++++----- 5 files changed, 102 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 318b47e6..3a4db53d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3799,6 +3799,7 @@ name = "indexer_core" version = "0.1.0" dependencies = [ "anyhow", + "arc-swap", "async-stream", "authenticated_transfer_core", "borsh", diff --git a/Cargo.toml b/Cargo.toml index 25f03774..4c64aaba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -117,6 +117,7 @@ hex = "0.4.3" bytemuck = "1.24.0" bytesize = { version = "2.3.1", features = ["serde"] } humantime-serde = "1.1" +arc-swap = "1.7" humantime = "2.1" aes-gcm = "0.10.3" toml = "0.9.8" diff --git a/lez/indexer/core/Cargo.toml b/lez/indexer/core/Cargo.toml index c6cc5fc6..758acdd6 100644 --- a/lez/indexer/core/Cargo.toml +++ b/lez/indexer/core/Cargo.toml @@ -16,6 +16,7 @@ storage.workspace = true testnet_initial_state.workspace = true anyhow.workspace = true +arc-swap.workspace = true log.workspace = true serde.workspace = true humantime-serde.workspace = true diff --git a/lez/indexer/core/src/lib.rs b/lez/indexer/core/src/lib.rs index deb4d7af..0d595fc0 100644 --- a/lez/indexer/core/src/lib.rs +++ b/lez/indexer/core/src/lib.rs @@ -1,9 +1,7 @@ -use std::{ - path::Path, - sync::{Arc, Mutex}, -}; +use std::{path::Path, sync::Arc}; use anyhow::Result; +use arc_swap::ArcSwap; use common::block::Block; // ToDo: Remove after testnet use futures::StreamExt as _; @@ -16,7 +14,7 @@ use logos_blockchain_zone_sdk::{ use crate::{ block_store::IndexerStore, config::IndexerConfig, - status::{IndexerStatus, SyncState}, + status::{IndexerStatus, IndexerSyncStatus}, }; pub mod block_store; @@ -29,7 +27,7 @@ pub struct IndexerCore { pub config: IndexerConfig, pub store: IndexerStore, /// Live ingestion status; updated by the ingest stream, read by `status`. - pub status: Arc>, + pub status: Arc>, } impl IndexerCore { @@ -49,29 +47,27 @@ impl IndexerCore { zone_indexer: Arc::new(zone_indexer), config, store: IndexerStore::open_db(&home)?, - status: Arc::new(Mutex::new(IndexerStatus::starting())), + status: Arc::new(ArcSwap::from_pointee(IndexerSyncStatus::starting())), }) } /// Snapshot of the current ingestion status (sync state + indexed tip). /// - /// Combines the ingest loop's live state with the L2 tip read fresh from the + /// Combines the ingest loop's live status with the L2 tip read fresh from the /// store, so callers (FFI/RPC) can tell "catching up" from "failed". #[must_use] pub fn status(&self) -> IndexerStatus { - let mut snapshot = self - .status - .lock() - .expect("indexer status mutex poisoned") - .clone(); - snapshot.indexed_block_id = self.store.get_last_block_id().ok().flatten(); - snapshot + let sync = IndexerSyncStatus::clone(&self.status.load()); + let indexed_block_id = self.store.get_last_block_id().ok().flatten(); + IndexerStatus { + sync, + indexed_block_id, + } } - /// Apply a short, non-blocking update to the shared status. - fn set_status(&self, update: impl FnOnce(&mut IndexerStatus)) { - let mut status = self.status.lock().expect("indexer status mutex poisoned"); - update(&mut status); + /// Atomically publish a new ingestion status for readers of `status`. + fn set_status(&self, status: IndexerSyncStatus) { + self.status.store(Arc::new(status)); } pub fn subscribe_parse_block_stream(&self) -> impl futures::Stream> + '_ { @@ -97,10 +93,9 @@ impl IndexerCore { // `next_messages` reads L1 consensus info internally, so // this also covers an unreachable/misconfigured L1 node. error!("Failed to start zone-sdk next_messages stream: {err}"); - self.set_status(|s| { - s.state = SyncState::Error; - s.last_error = Some(format!("cannot reach L1 / read channel: {err}")); - }); + self.set_status(IndexerSyncStatus::error(format!( + "cannot reach L1 / read channel: {err}" + ))); tokio::time::sleep(poll_interval).await; continue; } @@ -115,10 +110,7 @@ impl IndexerCore { while let Some((msg, slot)) = stream.next().await { if !announced_syncing { - self.set_status(|s| { - s.state = SyncState::Syncing; - s.last_error = None; - }); + self.set_status(IndexerSyncStatus::syncing()); announced_syncing = true; } @@ -159,8 +151,11 @@ impl IndexerCore { yield Ok(block); } - // Stream drained: caught up to LIB as of this cycle. Sleep then poll again. - self.set_status(|s| s.state = SyncState::CaughtUp); + // Stream drained: caught up to LIB as of this cycle. Clears any + // prior error (e.g. a transient L1 disconnect that left no + // backlog, so the `Syncing` branch above never ran). Sleep then + // poll again. + self.set_status(IndexerSyncStatus::caught_up()); tokio::time::sleep(poll_interval).await; } } diff --git a/lez/indexer/core/src/status.rs b/lez/indexer/core/src/status.rs index a8a8b845..a7b91f8b 100644 --- a/lez/indexer/core/src/status.rs +++ b/lez/indexer/core/src/status.rs @@ -4,7 +4,7 @@ use serde::Serialize; /// "still catching up" apart from "something went wrong". #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] #[serde(rename_all = "snake_case")] -pub enum SyncState { +pub enum IndexerSyncState { /// Booted; no ingestion cycle has run yet. Starting, /// Streaming finalized messages toward the L1 frontier. @@ -15,27 +15,89 @@ pub enum SyncState { Error, } -/// Snapshot of the indexer's sync status. -/// -/// `state` is the coarse phase and `last_error` carries the reason when it is -/// `Error`. `indexed_block_id` is the L2 tip, filled from the store at read time -/// (so it is always `None` in the value held behind the status mutex — see -/// `IndexerCore::status`). +/// Live ingestion status owned by the ingest loop: the coarse `state` plus the +/// reason when it is `Error`. #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "camelCase")] -pub struct IndexerStatus { - pub state: SyncState, - pub indexed_block_id: Option, +pub struct IndexerSyncStatus { + pub state: IndexerSyncState, pub last_error: Option, } -impl IndexerStatus { +impl IndexerSyncStatus { /// Initial status before any ingestion cycle has run. pub(crate) const fn starting() -> Self { Self { - state: SyncState::Starting, - indexed_block_id: None, + state: IndexerSyncState::Starting, last_error: None, } } + + /// Actively streaming finalized messages toward the L1 frontier. + pub(crate) const fn syncing() -> Self { + Self { + state: IndexerSyncState::Syncing, + last_error: None, + } + } + + /// Drained the stream up to LIB; idle until new blocks finalize. + pub(crate) const fn caught_up() -> Self { + Self { + state: IndexerSyncState::CaughtUp, + last_error: None, + } + } + + /// The last cycle failed; `reason` explains why. + pub(crate) const fn error(reason: String) -> Self { + Self { + state: IndexerSyncState::Error, + last_error: Some(reason), + } + } +} + +/// Full status snapshot returned to callers (FFI/RPC): the live [`SyncStatus`] +/// plus the L2 tip (`indexed_block_id`) read fresh from the store at query time. +/// +/// The tip is tracked by the store, not the ingest loop, so it lives here on the +/// returned snapshot rather than inside the shared [`SyncStatus`]. +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct IndexerStatus { + #[serde(flatten)] + pub sync: IndexerSyncStatus, + pub indexed_block_id: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn indexer_status_serializes_to_flat_object() { + let status = IndexerStatus { + sync: IndexerSyncStatus::error("boom".to_owned()), + indexed_block_id: Some(7), + }; + let value = serde_json::to_value(&status).expect("serialize"); + assert_eq!( + value, + serde_json::json!({ + "state": "error", + "lastError": "boom", + "indexedBlockId": 7, + }) + ); + } + + #[test] + fn caught_up_clears_error() { + let value = serde_json::to_value(IndexerSyncStatus::caught_up()).expect("serialize"); + assert_eq!( + value, + serde_json::json!({ "state": "caught_up", "lastError": null }) + ); + } }