From 321f31a54b3a961e447c28db66f88ba3e08fd33e Mon Sep 17 00:00:00 2001 From: Daniil Polyakov Date: Mon, 2 Feb 2026 17:05:58 +0300 Subject: [PATCH] Reimplement subscriptions --- indexer/service/src/service.rs | 154 +++++++++++++++++++-------------- 1 file changed, 91 insertions(+), 63 deletions(-) diff --git a/indexer/service/src/service.rs b/indexer/service/src/service.rs index d3faa5d5..d7b14c4d 100644 --- a/indexer/service/src/service.rs +++ b/indexer/service/src/service.rs @@ -1,40 +1,34 @@ use std::{pin::pin, sync::Arc}; -use anyhow::{Context as _, Result, anyhow}; +use anyhow::{Context as _, Result, bail}; use futures::StreamExt as _; use indexer_core::{IndexerCore, config::IndexerConfig}; use indexer_service_protocol::{Account, AccountId, Block, BlockId, Hash, Transaction}; use jsonrpsee::{ - SubscriptionMessage, SubscriptionSink, + SubscriptionSink, core::{Serialize, SubscriptionResult}, types::ErrorObjectOwned, }; -use serde_json::value::RawValue; -use tokio::sync::{Mutex, broadcast}; +use tokio::sync::{Mutex, mpsc::UnboundedSender}; pub struct IndexerService { - service_impl: Arc>, - respond_subscribers_loop_handle: tokio::task::JoinHandle>, -} -impl Drop for IndexerService { - fn drop(&mut self) { - self.respond_subscribers_loop_handle.abort(); - } + subscription_service: SubscriptionService, + + #[expect( + dead_code, + reason = "Will be used in future implementations of RPC methods" + )] + indexer: IndexerCore, } impl IndexerService { pub fn new(config: IndexerConfig) -> Result { - let service_impl = Arc::new(Mutex::new(IndexerServiceImpl::new(IndexerCore::new( - config, - )?))); - - let respond_subscribers_loop_handle = tokio::spawn( - IndexerServiceImpl::respond_subscribers_loop(Arc::clone(&service_impl)), - ); + let indexer = IndexerCore::new(config)?; + let subscription_service = SubscriptionService::spawn_new(indexer.clone()); Ok(Self { - service_impl, - respond_subscribers_loop_handle, + subscription_service, + indexer, }) } } @@ -45,26 +39,9 @@ impl indexer_service_rpc::RpcServer for IndexerService { &self, subscription_sink: jsonrpsee::PendingSubscriptionSink, ) -> SubscriptionResult { - let mut rx = self - .service_impl - .lock() - .await - .finalized_block_id_tx - .subscribe(); - let sink = subscription_sink.accept().await?; - - tokio::spawn(async move { - while let Ok(block_id) = rx.recv().await { - let msg = SubscriptionMessage::from( - RawValue::from_string(block_id.to_string()) - .expect("u64 string is always valid JSON"), - ); - if sink.send(msg).await.is_err() { - break; - } - } - }); + self.subscription_service + .add_subscription(Subscription::new(sink))?; Ok(()) } @@ -99,36 +76,87 @@ impl indexer_service_rpc::RpcServer for IndexerService { } } -struct IndexerServiceImpl { - indexer: IndexerCore, - finalized_block_id_tx: broadcast::Sender, +struct SubscriptionService { + respond_subscribers_loop_handle: tokio::task::JoinHandle>, + new_subscription_sender: UnboundedSender>, } -impl IndexerServiceImpl { - fn new(indexer: IndexerCore) -> Self { - let (finalized_block_id_tx, _block_rx) = broadcast::channel(1024); +impl SubscriptionService { + pub fn spawn_new(indexer: IndexerCore) -> Self { + let (new_subscription_sender, mut sub_receiver) = + tokio::sync::mpsc::unbounded_channel::>(); + + let subscriptions = Arc::new(Mutex::new(Vec::new())); + + let respond_subscribers_loop_handle = tokio::spawn(async move { + let mut block_stream = pin!(indexer.subscribe_parse_block_stream().await); + + loop { + tokio::select! { + sub = sub_receiver.recv() => { + let Some(subscription) = sub else { + bail!("Subscription receiver closed unexpectedly"); + }; + subscriptions.lock().await.push(subscription); + } + block_opt = block_stream.next() => { + let Some(block) = block_opt else { + bail!("Block stream ended unexpectedly"); + }; + let block = block.context("Failed to get L2 block data")?; + let block: indexer_service_protocol::Block = block + .try_into() + .context("Failed to convert L2 Block into protocol Block")?; + + // Cloning subscriptions to avoid holding the lock while sending + let subscriptions = subscriptions.lock().await.clone(); + for sink in subscriptions { + sink.send(&block.header.block_id).await?; + } + } + } + } + }); Self { - indexer, - finalized_block_id_tx, + respond_subscribers_loop_handle, + new_subscription_sender, } } - async fn respond_subscribers_loop(service_impl: Arc>) -> Result<()> { - let indexer_clone = service_impl.lock().await.indexer.clone(); - - let mut block_stream = pin!(indexer_clone.subscribe_parse_block_stream().await); - while let Some(block) = block_stream.next().await { - let block = block.context("Failed to get L2 block data")?; - - // Cloning subscriptions to avoid holding the lock while sending - service_impl - .lock() - .await - .finalized_block_id_tx - .send(block.header.block_id)?; - } - - Err(anyhow!("Block stream ended unexpectedly")) + pub fn add_subscription(&self, subscription: Subscription) -> Result<()> { + self.new_subscription_sender.send(subscription)?; + Ok(()) + } +} + +impl Drop for SubscriptionService { + fn drop(&mut self) { + self.respond_subscribers_loop_handle.abort(); + } +} + +#[derive(Clone)] +struct Subscription { + sink: SubscriptionSink, + _marker: std::marker::PhantomData, +} + +impl Subscription { + fn new(sink: SubscriptionSink) -> Self { + Self { + sink, + _marker: std::marker::PhantomData, + } + } + + async fn send(&self, item: &T) -> Result<()> + where + T: Serialize, + { + let json = serde_json::value::to_raw_value(item) + .context("Failed to serialize item for subscription")?; + self.sink.send(json).await?; + Ok(()) } }