diff --git a/indexer/service/rpc/src/lib.rs b/indexer/service/rpc/src/lib.rs index 18755df5..52c5f0fb 100644 --- a/indexer/service/rpc/src/lib.rs +++ b/indexer/service/rpc/src/lib.rs @@ -23,7 +23,7 @@ pub trait Rpc { Ok(serde_json::to_value(block_schema).expect("Schema serialization should not fail")) } - #[subscription(name = "subscribeToFinalizedBlocks", item = Block)] + #[subscription(name = "subscribeToFinalizedBlocks", item = BlockId)] async fn subscribe_to_finalized_blocks(&self) -> SubscriptionResult; #[method(name = "getBlockById")] diff --git a/indexer/service/src/service.rs b/indexer/service/src/service.rs index c7cdccd4..d3faa5d5 100644 --- a/indexer/service/src/service.rs +++ b/indexer/service/src/service.rs @@ -4,14 +4,18 @@ use anyhow::{Context as _, Result, anyhow}; use futures::StreamExt as _; use indexer_core::{IndexerCore, config::IndexerConfig}; use indexer_service_protocol::{Account, AccountId, Block, BlockId, Hash, Transaction}; -use jsonrpsee::{SubscriptionSink, core::{Serialize, SubscriptionResult}, types::ErrorObjectOwned}; -use tokio::sync::Mutex; +use jsonrpsee::{ + SubscriptionMessage, SubscriptionSink, + core::{Serialize, SubscriptionResult}, + types::ErrorObjectOwned, +}; +use serde_json::value::RawValue; +use tokio::sync::{Mutex, broadcast}; pub struct IndexerService { service_impl: Arc>, respond_subscribers_loop_handle: tokio::task::JoinHandle>, } - impl Drop for IndexerService { fn drop(&mut self) { self.respond_subscribers_loop_handle.abort(); @@ -20,9 +24,9 @@ impl Drop for IndexerService { impl IndexerService { pub fn new(config: IndexerConfig) -> Result { - let service_impl = Arc::new(Mutex::new(IndexerServiceImpl::new( - IndexerCore::new(config)?, - ))); + let service_impl = Arc::new(Mutex::new(IndexerServiceImpl::new(IndexerCore::new( + config, + )?))); let respond_subscribers_loop_handle = tokio::spawn( IndexerServiceImpl::respond_subscribers_loop(Arc::clone(&service_impl)), @@ -41,8 +45,27 @@ impl indexer_service_rpc::RpcServer for IndexerService { &self, subscription_sink: jsonrpsee::PendingSubscriptionSink, ) -> SubscriptionResult { + let mut rx = self + .service_impl + .lock() + .await + .finalized_block_id_tx + .subscribe(); + let sink = subscription_sink.accept().await?; - self.service_impl.lock().await.add_subscription(Subscription::new(sink)).await; + + tokio::spawn(async move { + while let Ok(block_id) = rx.recv().await { + let msg = SubscriptionMessage::from( + RawValue::from_string(block_id.to_string()) + .expect("u64 string is always valid JSON"), + ); + if sink.send(msg).await.is_err() { + break; + } + } + }); + Ok(()) } @@ -78,60 +101,34 @@ impl indexer_service_rpc::RpcServer for IndexerService { struct IndexerServiceImpl { indexer: IndexerCore, - subscriptions: Vec>, + finalized_block_id_tx: broadcast::Sender, } impl IndexerServiceImpl { fn new(indexer: IndexerCore) -> Self { + let (finalized_block_id_tx, _block_rx) = broadcast::channel(1024); + Self { indexer, - subscriptions: Vec::new(), + finalized_block_id_tx, } } - async fn add_subscription(&mut self, subscription: Subscription) { - self.subscriptions.push(subscription); - } - async fn respond_subscribers_loop(service_impl: Arc>) -> Result<()> { let indexer_clone = service_impl.lock().await.indexer.clone(); let mut block_stream = pin!(indexer_clone.subscribe_parse_block_stream().await); while let Some(block) = block_stream.next().await { - let block= block.context("Failed to get L2 block data")?; - let block = block.try_into().context("Failed to convert L2 Block into protocol Block")?; + let block = block.context("Failed to get L2 block data")?; // Cloning subscriptions to avoid holding the lock while sending - let subscriptions = service_impl.lock().await.subscriptions.clone(); - for sink in subscriptions { - sink.send(&block).await?; - } + service_impl + .lock() + .await + .finalized_block_id_tx + .send(block.header.block_id)?; } Err(anyhow!("Block stream ended unexpectedly")) } } - -#[derive(Clone)] -struct Subscription { - sink: SubscriptionSink, - _marker: std::marker::PhantomData, -} - -impl Subscription { - fn new(sink: SubscriptionSink) -> Self { - Self { - sink, - _marker: std::marker::PhantomData, - } - } - - async fn send(&self, item: &T) -> Result<()> - where T: Serialize - { - let json = serde_json::value::to_raw_value(item) - .context("Failed to serialize item for subscription")?; - self.sink.send(json).await?; - Ok(()) - } -} diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index baff7c9c..5f39c872 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -8,7 +8,7 @@ use common::{ sequencer_client::SequencerClient, transaction::{EncodedTransaction, NSSATransaction}, }; -use indexer_core::{IndexerCore, config::IndexerConfig}; +use indexer_core::config::IndexerConfig; use log::debug; use nssa::PrivacyPreservingTransaction; use nssa_core::Commitment; diff --git a/sequencer_core/src/lib.rs b/sequencer_core/src/lib.rs index f54fb78f..913481f8 100644 --- a/sequencer_core/src/lib.rs +++ b/sequencer_core/src/lib.rs @@ -265,7 +265,10 @@ impl SequencerCore { .map(|block| block.header.block_id) .min() { - info!("FIRST PENDING BLOCK: {}", first_pending_block_id); + info!( + "Clearing pending blocks up to id: {}", + last_finalized_block_id + ); (first_pending_block_id..=last_finalized_block_id) .try_for_each(|id| self.store.delete_block_at_id(id)) } else { diff --git a/sequencer_runner/src/lib.rs b/sequencer_runner/src/lib.rs index 17c984f5..5913ee02 100644 --- a/sequencer_runner/src/lib.rs +++ b/sequencer_runner/src/lib.rs @@ -5,7 +5,7 @@ use anyhow::{Context as _, Result}; use clap::Parser; use common::rpc_primitives::RpcConfig; use futures::FutureExt as _; -use log::{debug, error, info, warn}; +use log::{error, info, warn}; use sequencer_core::{SequencerCore, config::SequencerConfig}; use sequencer_rpc::new_http_server; use tokio::{sync::Mutex, task::JoinHandle}; @@ -194,12 +194,10 @@ async fn listen_for_bedrock_blocks_loop(seq_core: Arc>) -> .await .context("Failed to subscribe to finalized blocks")?; - while let Some(block) = subscription.next().await { - let block = block.context("Failed to get next block from subscription")?; - let block_id = block.header.block_id; + while let Some(block_id) = subscription.next().await { + let block_id = block_id.context("Failed to get next block from subscription")?; info!("Received new L2 block with ID {block_id}"); - debug!("Block data: {block:#?}"); seq_core .lock()