307 lines
9.1 KiB
Rust
Raw Normal View History

2026-01-30 21:37:27 +03:00
use std::{pin::pin, sync::Arc};
2026-02-02 17:05:58 +03:00
use anyhow::{Context as _, Result, bail};
use arc_swap::ArcSwap;
use futures::{StreamExt as _, never::Never};
2026-01-29 23:03:00 +03:00
use indexer_core::{IndexerCore, config::IndexerConfig};
use indexer_service_protocol::{Account, AccountId, Block, BlockId, HashType, Transaction};
2026-01-31 18:56:04 -03:00
use jsonrpsee::{
2026-02-02 17:05:58 +03:00
SubscriptionSink,
core::{Serialize, SubscriptionResult, async_trait},
types::{ErrorCode, ErrorObject, ErrorObjectOwned},
2026-01-31 18:56:04 -03:00
};
use log::{debug, error, info, warn};
use tokio::sync::mpsc::UnboundedSender;
2026-01-29 23:03:00 +03:00
pub struct IndexerService {
2026-02-02 17:05:58 +03:00
subscription_service: SubscriptionService,
indexer: IndexerCore,
2026-01-29 23:03:00 +03:00
}
impl IndexerService {
pub fn new(config: IndexerConfig) -> Result<Self> {
let indexer = IndexerCore::new(config)?;
2026-02-02 17:05:58 +03:00
let subscription_service = SubscriptionService::spawn_new(indexer.clone());
2026-01-30 21:37:27 +03:00
2026-01-29 23:03:00 +03:00
Ok(Self {
2026-02-02 17:05:58 +03:00
subscription_service,
indexer,
2026-01-29 23:03:00 +03:00
})
}
}
#[async_trait]
impl indexer_service_rpc::RpcServer for IndexerService {
async fn subscribe_to_finalized_blocks(
&self,
2026-01-30 21:37:27 +03:00
subscription_sink: jsonrpsee::PendingSubscriptionSink,
) -> SubscriptionResult {
2026-01-30 21:37:27 +03:00
let sink = subscription_sink.accept().await?;
info!(
"Accepted new subscription to finalized blocks with ID {:?}",
sink.subscription_id()
);
2026-02-02 17:05:58 +03:00
self.subscription_service
.add_subscription(Subscription::new(sink))
.await?;
2026-01-31 18:56:04 -03:00
2026-01-30 21:37:27 +03:00
Ok(())
}
2026-02-09 15:05:01 +02:00
async fn get_last_finalized_block_id(&self) -> Result<BlockId, ErrorObjectOwned> {
2026-02-23 10:55:00 +02:00
self.indexer.store.get_last_block_id().map_err(db_error)
2026-02-09 15:05:01 +02:00
}
2026-02-04 14:57:38 +02:00
async fn get_block_by_id(&self, block_id: BlockId) -> Result<Block, ErrorObjectOwned> {
2026-02-10 15:54:57 +02:00
Ok(self
.indexer
2026-02-04 14:57:38 +02:00
.store
.get_block_at_id(block_id)
2026-02-23 10:55:00 +02:00
.map_err(db_error)?
2026-02-10 15:54:57 +02:00
.into())
}
async fn get_block_by_hash(&self, block_hash: HashType) -> Result<Block, ErrorObjectOwned> {
2026-02-10 15:54:57 +02:00
Ok(self
.indexer
2026-02-04 14:57:38 +02:00
.store
.get_block_by_hash(block_hash.0)
2026-02-23 10:55:00 +02:00
.map_err(db_error)?
2026-02-10 15:54:57 +02:00
.into())
}
2026-02-04 14:57:38 +02:00
async fn get_account(&self, account_id: AccountId) -> Result<Account, ErrorObjectOwned> {
2026-02-10 15:54:57 +02:00
Ok(self
.indexer
2026-02-04 14:57:38 +02:00
.store
2026-03-17 15:10:12 +02:00
.account_current_state(&account_id.into())
2026-03-04 14:12:39 +02:00
.await
2026-02-23 10:55:00 +02:00
.map_err(db_error)?
2026-02-10 15:54:57 +02:00
.into())
}
async fn get_transaction(&self, tx_hash: HashType) -> Result<Transaction, ErrorObjectOwned> {
2026-02-10 15:54:57 +02:00
Ok(self
.indexer
2026-02-04 14:57:38 +02:00
.store
.get_transaction_by_hash(tx_hash.0)
2026-02-23 10:55:00 +02:00
.map_err(db_error)?
2026-02-10 15:54:57 +02:00
.into())
}
2026-01-28 03:21:43 +03:00
async fn get_blocks(
&self,
2026-03-03 23:21:08 +03:00
before: Option<BlockId>,
limit: u64,
) -> Result<Vec<Block>, ErrorObjectOwned> {
2026-02-04 14:57:38 +02:00
let blocks = self
.indexer
.store
2026-03-03 23:21:08 +03:00
.get_block_batch(before, limit)
2026-02-23 10:55:00 +02:00
.map_err(db_error)?;
2026-02-04 14:57:38 +02:00
let mut block_res = vec![];
for block in blocks {
2026-03-03 23:21:08 +03:00
block_res.push(block.into());
2026-02-04 14:57:38 +02:00
}
Ok(block_res)
2026-01-28 03:21:43 +03:00
}
async fn get_transactions_by_account(
&self,
2026-02-05 16:21:08 +02:00
account_id: AccountId,
2026-03-03 23:21:08 +03:00
offset: u64,
limit: u64,
2026-01-28 03:21:43 +03:00
) -> Result<Vec<Transaction>, ErrorObjectOwned> {
2026-02-05 16:21:08 +02:00
let transactions = self
.indexer
.store
2026-03-03 23:21:08 +03:00
.get_transactions_by_account(account_id.value, offset, limit)
2026-02-23 10:55:00 +02:00
.map_err(db_error)?;
2026-02-05 16:21:08 +02:00
let mut tx_res = vec![];
for tx in transactions {
2026-03-03 23:21:08 +03:00
tx_res.push(tx.into());
2026-02-05 16:21:08 +02:00
}
Ok(tx_res)
2026-01-28 03:21:43 +03:00
}
2026-02-09 15:05:01 +02:00
async fn healthcheck(&self) -> Result<(), ErrorObjectOwned> {
// Checking, that indexer can calculate last state
2026-03-17 15:18:12 +02:00
let _ = self
.indexer
.store
.recalculate_final_state()
.map_err(db_error)?;
2026-02-09 15:05:01 +02:00
Ok(())
}
}
2026-01-30 21:37:27 +03:00
2026-02-02 17:05:58 +03:00
/// Owner of the background loop that receives new subscriptions and sends
/// block IDs to them.
struct SubscriptionService {
    /// Handle and sender of the current loop; swappable so the loop can be
    /// replaced after it has finished.
    parts: ArcSwap<SubscriptionLoopParts>,
    /// Indexer core, cloned into every loop that gets spawned.
    indexer: IndexerCore,
}
2026-02-02 17:05:58 +03:00
impl SubscriptionService {
pub fn spawn_new(indexer: IndexerCore) -> Self {
let parts = Self::spawn_respond_subscribers_loop(indexer.clone());
Self {
parts: ArcSwap::new(Arc::new(parts)),
indexer,
}
}
pub async fn add_subscription(&self, subscription: Subscription<BlockId>) -> Result<()> {
let guard = self.parts.load();
2026-03-04 18:42:33 +03:00
if let Err(send_err) = guard.new_subscription_sender.send(subscription) {
error!(
"Failed to send new subscription to subscription service with error: {send_err:#?}"
);
// Respawn the subscription service loop if it has finished (either with error or panic)
if guard.handle.is_finished() {
drop(guard);
let new_parts = Self::spawn_respond_subscribers_loop(self.indexer.clone());
let old_handle_and_sender = self.parts.swap(Arc::new(new_parts));
let old_parts = Arc::into_inner(old_handle_and_sender)
.expect("There should be no other references to the old handle and sender");
match old_parts.handle.await {
Ok(Err(err)) => {
error!(
"Subscription service loop has unexpectedly finished with error: {err:#}"
);
}
Err(err) => {
error!("Subscription service loop has panicked with err: {err:#}");
}
}
}
2026-03-04 18:42:33 +03:00
bail!(send_err)
2026-03-03 23:21:08 +03:00
}
Ok(())
}
fn spawn_respond_subscribers_loop(indexer: IndexerCore) -> SubscriptionLoopParts {
2026-02-02 17:05:58 +03:00
let (new_subscription_sender, mut sub_receiver) =
tokio::sync::mpsc::unbounded_channel::<Subscription<BlockId>>();
let handle = tokio::spawn(async move {
let mut subscribers = Vec::new();
2026-02-02 17:05:58 +03:00
2026-03-03 23:21:08 +03:00
let mut block_stream = pin!(indexer.subscribe_parse_block_stream());
2026-02-02 17:05:58 +03:00
2026-03-04 18:42:33 +03:00
#[expect(
clippy::integer_division_remainder_used,
reason = "Generated by select! macro, can't be easily rewritten to avoid this lint"
)]
2026-02-02 17:05:58 +03:00
loop {
tokio::select! {
sub = sub_receiver.recv() => {
let Some(subscription) = sub else {
bail!("Subscription receiver closed unexpectedly");
};
info!("Added new subscription with ID {:?}", subscription.sink.subscription_id());
subscribers.push(subscription);
2026-02-02 17:05:58 +03:00
}
block_opt = block_stream.next() => {
debug!("Got new block from block stream");
2026-02-02 17:05:58 +03:00
let Some(block) = block_opt else {
bail!("Block stream ended unexpectedly");
};
let block = block.context("Failed to get L2 block data")?;
let block: indexer_service_protocol::Block = block.into();
for sub in &mut subscribers {
if let Err(err) = sub.try_send(&block.header.block_id) {
warn!(
"Failed to send block ID {:?} to subscription ID {:?} with error: {err:#?}",
block.header.block_id,
sub.sink.subscription_id(),
);
}
2026-02-02 17:05:58 +03:00
}
}
}
}
});
SubscriptionLoopParts {
handle,
2026-02-02 17:05:58 +03:00
new_subscription_sender,
2026-01-30 21:37:27 +03:00
}
}
2026-02-02 17:05:58 +03:00
}
2026-01-30 21:37:27 +03:00
2026-02-02 17:05:58 +03:00
impl Drop for SubscriptionService {
fn drop(&mut self) {
self.parts.load().handle.abort();
2026-02-02 17:05:58 +03:00
}
}
/// Handle and sender belonging to one spawned subscription loop task.
struct SubscriptionLoopParts {
    /// Join handle of the loop task. The success type is `Never`, so the task
    /// can only complete with an error (or by panicking / being aborted).
    handle: tokio::task::JoinHandle<Result<Never>>,
    /// Sending half of the unbounded channel through which new subscriptions
    /// are passed to the loop task.
    new_subscription_sender: UnboundedSender<Subscription<BlockId>>,
}
2026-02-02 17:05:58 +03:00
/// A subscription sink tagged with the item type `T` that may be sent through it.
struct Subscription<T> {
    /// Underlying jsonrpsee sink that serialized items are sent to.
    sink: SubscriptionSink,
    /// Ties the subscription to `T` without storing a value of that type.
    _marker: std::marker::PhantomData<T>,
}
2026-01-30 21:37:27 +03:00
2026-02-02 17:05:58 +03:00
impl<T> Subscription<T> {
2026-03-09 18:27:56 +03:00
const fn new(sink: SubscriptionSink) -> Self {
2026-02-02 17:05:58 +03:00
Self {
sink,
_marker: std::marker::PhantomData,
2026-01-30 21:37:27 +03:00
}
2026-02-02 17:05:58 +03:00
}
2026-01-30 21:37:27 +03:00
fn try_send(&mut self, item: &T) -> Result<()>
2026-02-02 17:05:58 +03:00
where
T: Serialize,
{
let json = serde_json::value::to_raw_value(item)
.context("Failed to serialize item for subscription")?;
self.sink.try_send(json)?;
2026-02-02 17:05:58 +03:00
Ok(())
2026-01-30 21:37:27 +03:00
}
}
impl<T> Drop for Subscription<T> {
    /// Logs the subscription ID at the moment the subscription is dropped.
    fn drop(&mut self) {
        let id = self.sink.subscription_id();
        info!("Subscription with ID {:?} is being dropped", id);
    }
}
2026-03-03 23:21:08 +03:00
/// Builds an internal-error response object with the message
/// "Not yet implemented" and no additional data.
#[must_use]
pub fn not_yet_implemented_error() -> ErrorObjectOwned {
    let code = ErrorCode::InternalError.code();
    let no_data: Option<String> = None;

    ErrorObject::owned(code, "Not yet implemented", no_data)
}
2026-02-23 10:55:00 +02:00
2026-03-03 23:21:08 +03:00
#[expect(
clippy::needless_pass_by_value,
reason = "Error is consumed to extract details for error response"
)]
2026-02-23 10:55:00 +02:00
fn db_error(err: anyhow::Error) -> ErrorObjectOwned {
2026-02-23 16:10:56 +02:00
ErrorObjectOwned::owned(
ErrorCode::InternalError.code(),
2026-03-04 18:42:33 +03:00
"DBError".to_owned(),
2026-02-23 16:10:56 +02:00
Some(format!("{err:#?}")),
)
2026-02-23 10:55:00 +02:00
}