mirror of
https://github.com/logos-blockchain/lssa.git
synced 2026-02-19 12:53:31 +00:00
Reimplement subscriptions
This commit is contained in:
parent
6ca020d547
commit
321f31a54b
@ -1,40 +1,34 @@
|
||||
use std::{pin::pin, sync::Arc};
|
||||
|
||||
use anyhow::{Context as _, Result, anyhow};
|
||||
use anyhow::{Context as _, Result, bail};
|
||||
use futures::StreamExt as _;
|
||||
use indexer_core::{IndexerCore, config::IndexerConfig};
|
||||
use indexer_service_protocol::{Account, AccountId, Block, BlockId, Hash, Transaction};
|
||||
use jsonrpsee::{
|
||||
SubscriptionMessage, SubscriptionSink,
|
||||
SubscriptionSink,
|
||||
core::{Serialize, SubscriptionResult},
|
||||
types::ErrorObjectOwned,
|
||||
};
|
||||
use serde_json::value::RawValue;
|
||||
use tokio::sync::{Mutex, broadcast};
|
||||
use tokio::sync::{Mutex, mpsc::UnboundedSender};
|
||||
|
||||
pub struct IndexerService {
|
||||
service_impl: Arc<Mutex<IndexerServiceImpl>>,
|
||||
respond_subscribers_loop_handle: tokio::task::JoinHandle<Result<()>>,
|
||||
}
|
||||
impl Drop for IndexerService {
|
||||
fn drop(&mut self) {
|
||||
self.respond_subscribers_loop_handle.abort();
|
||||
}
|
||||
subscription_service: SubscriptionService,
|
||||
|
||||
#[expect(
|
||||
dead_code,
|
||||
reason = "Will be used in future implementations of RPC methods"
|
||||
)]
|
||||
indexer: IndexerCore,
|
||||
}
|
||||
|
||||
impl IndexerService {
|
||||
pub fn new(config: IndexerConfig) -> Result<Self> {
|
||||
let service_impl = Arc::new(Mutex::new(IndexerServiceImpl::new(IndexerCore::new(
|
||||
config,
|
||||
)?)));
|
||||
|
||||
let respond_subscribers_loop_handle = tokio::spawn(
|
||||
IndexerServiceImpl::respond_subscribers_loop(Arc::clone(&service_impl)),
|
||||
);
|
||||
let indexer = IndexerCore::new(config)?;
|
||||
let subscription_service = SubscriptionService::spawn_new(indexer.clone());
|
||||
|
||||
Ok(Self {
|
||||
service_impl,
|
||||
respond_subscribers_loop_handle,
|
||||
subscription_service,
|
||||
indexer,
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -45,26 +39,9 @@ impl indexer_service_rpc::RpcServer for IndexerService {
|
||||
&self,
|
||||
subscription_sink: jsonrpsee::PendingSubscriptionSink,
|
||||
) -> SubscriptionResult {
|
||||
let mut rx = self
|
||||
.service_impl
|
||||
.lock()
|
||||
.await
|
||||
.finalized_block_id_tx
|
||||
.subscribe();
|
||||
|
||||
let sink = subscription_sink.accept().await?;
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Ok(block_id) = rx.recv().await {
|
||||
let msg = SubscriptionMessage::from(
|
||||
RawValue::from_string(block_id.to_string())
|
||||
.expect("u64 string is always valid JSON"),
|
||||
);
|
||||
if sink.send(msg).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
self.subscription_service
|
||||
.add_subscription(Subscription::new(sink))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -99,36 +76,87 @@ impl indexer_service_rpc::RpcServer for IndexerService {
|
||||
}
|
||||
}
|
||||
|
||||
struct IndexerServiceImpl {
|
||||
indexer: IndexerCore,
|
||||
finalized_block_id_tx: broadcast::Sender<BlockId>,
|
||||
struct SubscriptionService {
|
||||
respond_subscribers_loop_handle: tokio::task::JoinHandle<Result<()>>,
|
||||
new_subscription_sender: UnboundedSender<Subscription<BlockId>>,
|
||||
}
|
||||
|
||||
impl IndexerServiceImpl {
|
||||
fn new(indexer: IndexerCore) -> Self {
|
||||
let (finalized_block_id_tx, _block_rx) = broadcast::channel(1024);
|
||||
impl SubscriptionService {
|
||||
pub fn spawn_new(indexer: IndexerCore) -> Self {
|
||||
let (new_subscription_sender, mut sub_receiver) =
|
||||
tokio::sync::mpsc::unbounded_channel::<Subscription<BlockId>>();
|
||||
|
||||
let subscriptions = Arc::new(Mutex::new(Vec::new()));
|
||||
|
||||
let respond_subscribers_loop_handle = tokio::spawn(async move {
|
||||
let mut block_stream = pin!(indexer.subscribe_parse_block_stream().await);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
sub = sub_receiver.recv() => {
|
||||
let Some(subscription) = sub else {
|
||||
bail!("Subscription receiver closed unexpectedly");
|
||||
};
|
||||
subscriptions.lock().await.push(subscription);
|
||||
}
|
||||
block_opt = block_stream.next() => {
|
||||
let Some(block) = block_opt else {
|
||||
bail!("Block stream ended unexpectedly");
|
||||
};
|
||||
let block = block.context("Failed to get L2 block data")?;
|
||||
let block: indexer_service_protocol::Block = block
|
||||
.try_into()
|
||||
.context("Failed to convert L2 Block into protocol Block")?;
|
||||
|
||||
// Cloning subscriptions to avoid holding the lock while sending
|
||||
let subscriptions = subscriptions.lock().await.clone();
|
||||
for sink in subscriptions {
|
||||
sink.send(&block.header.block_id).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Self {
|
||||
indexer,
|
||||
finalized_block_id_tx,
|
||||
respond_subscribers_loop_handle,
|
||||
new_subscription_sender,
|
||||
}
|
||||
}
|
||||
|
||||
async fn respond_subscribers_loop(service_impl: Arc<Mutex<IndexerServiceImpl>>) -> Result<()> {
|
||||
let indexer_clone = service_impl.lock().await.indexer.clone();
|
||||
|
||||
let mut block_stream = pin!(indexer_clone.subscribe_parse_block_stream().await);
|
||||
while let Some(block) = block_stream.next().await {
|
||||
let block = block.context("Failed to get L2 block data")?;
|
||||
|
||||
// Cloning subscriptions to avoid holding the lock while sending
|
||||
service_impl
|
||||
.lock()
|
||||
.await
|
||||
.finalized_block_id_tx
|
||||
.send(block.header.block_id)?;
|
||||
}
|
||||
|
||||
Err(anyhow!("Block stream ended unexpectedly"))
|
||||
pub fn add_subscription(&self, subscription: Subscription<BlockId>) -> Result<()> {
|
||||
self.new_subscription_sender.send(subscription)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for SubscriptionService {
|
||||
fn drop(&mut self) {
|
||||
self.respond_subscribers_loop_handle.abort();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Subscription<T> {
|
||||
sink: SubscriptionSink,
|
||||
_marker: std::marker::PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<T> Subscription<T> {
|
||||
fn new(sink: SubscriptionSink) -> Self {
|
||||
Self {
|
||||
sink,
|
||||
_marker: std::marker::PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
async fn send(&self, item: &T) -> Result<()>
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
let json = serde_json::value::to_raw_value(item)
|
||||
.context("Failed to serialize item for subscription")?;
|
||||
self.sink.send(json).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user