lssa/indexer/core/src/lib.rs

168 lines
6.3 KiB
Rust
Raw Normal View History

use std::sync::Arc;
2026-02-12 14:27:36 +02:00
2026-01-27 09:46:31 +02:00
use anyhow::Result;
2026-02-10 14:03:56 +02:00
use common::block::{Block, HashableBlockData};
2026-02-03 11:36:07 +02:00
// ToDo: Remove after testnet
2026-02-10 14:03:56 +02:00
use common::{HashType, PINATA_BASE58};
use futures::StreamExt as _;
2026-04-29 11:06:05 +02:00
use log::{error, info};
use logos_blockchain_core::header::HeaderId;
2026-04-29 10:33:07 +02:00
use logos_blockchain_zone_sdk::{
CommonHttpClient, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer,
2026-01-12 15:51:24 +02:00
};
use nssa::V03State;
2026-03-19 18:01:15 +02:00
use testnet_initial_state::initial_state_testnet;
2026-01-09 15:10:38 +02:00
2026-02-03 11:36:07 +02:00
use crate::{block_store::IndexerStore, config::IndexerConfig};
2026-01-12 15:51:24 +02:00
2026-04-29 11:06:05 +02:00
// TODO: persist & restore cursor (e.g. in rocksdb) so restarts don't have to
// re-process the channel from the beginning. Mirrors the sequencer checkpoint
// TODO in `block_publisher.rs`.
2026-02-03 11:36:07 +02:00
pub mod block_store;
2026-01-12 15:51:24 +02:00
pub mod config;
2026-01-30 21:37:27 +03:00
#[derive(Clone)]
2026-01-09 15:10:38 +02:00
pub struct IndexerCore {
2026-04-29 10:33:07 +02:00
pub zone_indexer: Arc<ZoneIndexer<NodeHttpClient>>,
2026-02-03 11:36:07 +02:00
pub config: IndexerConfig,
pub store: IndexerStore,
2026-01-09 15:10:38 +02:00
}
impl IndexerCore {
pub fn new(config: IndexerConfig) -> Result<Self> {
2026-02-10 14:03:56 +02:00
let hashable_data = HashableBlockData {
block_id: 1,
transactions: vec![],
prev_block_hash: HashType([0; 32]),
timestamp: 0,
};
2026-02-03 11:36:07 +02:00
2026-02-18 14:58:33 +02:00
// Genesis creation is fine as it is,
// because it will be overwritten by sequencer.
// Therefore:
// ToDo: remove key from indexer config, use some default.
2026-02-10 14:03:56 +02:00
let signing_key = nssa::PrivateKey::try_new(config.signing_key).unwrap();
let channel_genesis_msg_id = [0; 32];
2026-03-04 18:42:33 +03:00
let genesis_block = hashable_data.into_pending_block(&signing_key, channel_genesis_msg_id);
2026-02-03 11:36:07 +02:00
2026-04-07 13:35:13 -03:00
let initial_private_accounts: Option<Vec<(nssa_core::Commitment, nssa_core::Nullifier)>> =
config.initial_private_accounts.as_ref().map(|accounts| {
accounts
2026-03-16 15:15:35 +02:00
.iter()
.map(|init_comm_data| {
let npk = &init_comm_data.npk;
let mut acc = init_comm_data.account.clone();
acc.program_owner =
nssa::program::Program::authenticated_transfer_program().id();
(
nssa_core::Commitment::new(npk, &acc),
2026-04-07 13:35:13 -03:00
nssa_core::Nullifier::for_account_initialization(npk),
)
2026-03-16 15:15:35 +02:00
})
.collect()
});
2026-03-19 13:01:43 +02:00
let init_accs: Option<Vec<(nssa::AccountId, u128)>> = config
.initial_public_accounts
2026-03-19 18:01:15 +02:00
.as_ref()
2026-03-19 13:01:43 +02:00
.map(|initial_accounts| {
2026-03-16 15:15:35 +02:00
initial_accounts
.iter()
.map(|acc_data| (acc_data.account_id, acc_data.balance))
.collect()
});
// If initial commitments or accounts are present in config, need to construct state from
// them
let state = if initial_private_accounts.is_some() || init_accs.is_some() {
let mut state = V03State::new_with_genesis_accounts(
2026-03-16 15:15:35 +02:00
&init_accs.unwrap_or_default(),
2026-04-07 13:35:13 -03:00
initial_private_accounts.unwrap_or_default(),
genesis_block.header.timestamp,
2026-03-16 15:15:35 +02:00
);
// ToDo: Remove after testnet
state.add_pinata_program(PINATA_BASE58.parse().unwrap());
state
} else {
initial_state_testnet()
};
2026-02-03 11:36:07 +02:00
2026-02-13 13:59:06 +02:00
let home = config.home.join("rocksdb");
2026-02-03 11:36:07 +02:00
2026-04-29 10:33:07 +02:00
let basic_auth = config.bedrock_client_config.auth.clone().map(Into::into);
let node = NodeHttpClient::new(
CommonHttpClient::new(basic_auth),
config.bedrock_client_config.addr.clone(),
);
2026-04-29 10:33:07 +02:00
let zone_indexer = ZoneIndexer::new(config.channel_id, node);
2026-01-12 15:51:24 +02:00
Ok(Self {
zone_indexer: Arc::new(zone_indexer),
2026-01-12 15:51:24 +02:00
config,
2026-03-04 18:42:33 +03:00
store: IndexerStore::open_db_with_genesis(&home, &genesis_block, &state)?,
2026-01-12 15:51:24 +02:00
})
2026-01-09 15:10:38 +02:00
}
2026-01-12 15:51:24 +02:00
2026-04-29 11:06:05 +02:00
pub fn subscribe_parse_block_stream(&self) -> impl futures::Stream<Item = Result<Block>> + '_ {
let poll_interval = self.config.consensus_info_polling_interval;
2026-01-30 21:37:27 +03:00
2026-04-29 11:06:05 +02:00
async_stream::stream! {
// In-memory only; not persisted across restarts (see top-of-file TODO).
let mut cursor = None;
2026-02-12 14:27:36 +02:00
2026-04-29 11:06:05 +02:00
info!("Starting indexer from beginning of channel");
2026-02-12 14:27:36 +02:00
2026-04-29 11:06:05 +02:00
loop {
let stream = match self.zone_indexer.next_messages(cursor).await {
Ok(s) => s,
Err(err) => {
error!("Failed to start zone-sdk next_messages stream: {err}");
tokio::time::sleep(poll_interval).await;
continue;
2026-01-16 16:15:21 +02:00
}
2026-03-03 23:21:08 +03:00
};
2026-04-29 11:06:05 +02:00
let mut stream = std::pin::pin!(stream);
while let Some((msg, slot)) = stream.next().await {
let zone_block = match msg {
ZoneMessage::Block(b) => b,
// Non-block messages don't carry a cursor position; the
// next ZoneBlock advances past them implicitly.
ZoneMessage::Deposit(_) | ZoneMessage::Withdraw(_) => continue,
};
let block: Block = match borsh::from_slice(&zone_block.data) {
Ok(b) => b,
Err(e) => {
error!("Failed to deserialize L2 block from zone-sdk: {e}");
cursor = Some((zone_block.id, slot));
continue;
}
};
info!("Indexed L2 block {}", block.header.block_id);
// TODO: Remove l1_header placeholder once storage layer
// no longer requires it. Zone-sdk handles L1 tracking internally.
let placeholder_l1_header = HeaderId::from([0_u8; 32]);
if let Err(err) = self.store.put_block(block.clone(), placeholder_l1_header).await {
error!("Failed to store block {}: {err:#}", block.header.block_id);
}
2026-02-16 17:54:54 +02:00
2026-04-29 11:06:05 +02:00
cursor = Some((zone_block.id, slot));
yield Ok(block);
2026-02-24 12:27:47 +02:00
}
2026-02-16 17:54:54 +02:00
2026-04-29 11:06:05 +02:00
// Stream ended (caught up to LIB). Sleep then poll again.
tokio::time::sleep(poll_interval).await;
2026-02-16 17:54:54 +02:00
}
2026-01-16 16:15:21 +02:00
}
2026-01-12 15:51:24 +02:00
}
2026-02-04 14:57:38 +02:00
}