mirror of
https://github.com/logos-blockchain/logos-execution-zone.git
synced 2026-05-20 15:10:01 +00:00
fix deadlock in subscriptions
This commit is contained in:
parent
d14aeb6cc1
commit
6ca020d547
@ -23,7 +23,7 @@ pub trait Rpc {
|
|||||||
Ok(serde_json::to_value(block_schema).expect("Schema serialization should not fail"))
|
Ok(serde_json::to_value(block_schema).expect("Schema serialization should not fail"))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[subscription(name = "subscribeToFinalizedBlocks", item = Block)]
|
#[subscription(name = "subscribeToFinalizedBlocks", item = BlockId)]
|
||||||
async fn subscribe_to_finalized_blocks(&self) -> SubscriptionResult;
|
async fn subscribe_to_finalized_blocks(&self) -> SubscriptionResult;
|
||||||
|
|
||||||
#[method(name = "getBlockById")]
|
#[method(name = "getBlockById")]
|
||||||
|
|||||||
@ -4,14 +4,18 @@ use anyhow::{Context as _, Result, anyhow};
|
|||||||
use futures::StreamExt as _;
|
use futures::StreamExt as _;
|
||||||
use indexer_core::{IndexerCore, config::IndexerConfig};
|
use indexer_core::{IndexerCore, config::IndexerConfig};
|
||||||
use indexer_service_protocol::{Account, AccountId, Block, BlockId, Hash, Transaction};
|
use indexer_service_protocol::{Account, AccountId, Block, BlockId, Hash, Transaction};
|
||||||
use jsonrpsee::{SubscriptionSink, core::{Serialize, SubscriptionResult}, types::ErrorObjectOwned};
|
use jsonrpsee::{
|
||||||
use tokio::sync::Mutex;
|
SubscriptionMessage, SubscriptionSink,
|
||||||
|
core::{Serialize, SubscriptionResult},
|
||||||
|
types::ErrorObjectOwned,
|
||||||
|
};
|
||||||
|
use serde_json::value::RawValue;
|
||||||
|
use tokio::sync::{Mutex, broadcast};
|
||||||
|
|
||||||
pub struct IndexerService {
|
pub struct IndexerService {
|
||||||
service_impl: Arc<Mutex<IndexerServiceImpl>>,
|
service_impl: Arc<Mutex<IndexerServiceImpl>>,
|
||||||
respond_subscribers_loop_handle: tokio::task::JoinHandle<Result<()>>,
|
respond_subscribers_loop_handle: tokio::task::JoinHandle<Result<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for IndexerService {
|
impl Drop for IndexerService {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.respond_subscribers_loop_handle.abort();
|
self.respond_subscribers_loop_handle.abort();
|
||||||
@ -20,9 +24,9 @@ impl Drop for IndexerService {
|
|||||||
|
|
||||||
impl IndexerService {
|
impl IndexerService {
|
||||||
pub fn new(config: IndexerConfig) -> Result<Self> {
|
pub fn new(config: IndexerConfig) -> Result<Self> {
|
||||||
let service_impl = Arc::new(Mutex::new(IndexerServiceImpl::new(
|
let service_impl = Arc::new(Mutex::new(IndexerServiceImpl::new(IndexerCore::new(
|
||||||
IndexerCore::new(config)?,
|
config,
|
||||||
)));
|
)?)));
|
||||||
|
|
||||||
let respond_subscribers_loop_handle = tokio::spawn(
|
let respond_subscribers_loop_handle = tokio::spawn(
|
||||||
IndexerServiceImpl::respond_subscribers_loop(Arc::clone(&service_impl)),
|
IndexerServiceImpl::respond_subscribers_loop(Arc::clone(&service_impl)),
|
||||||
@ -41,8 +45,27 @@ impl indexer_service_rpc::RpcServer for IndexerService {
|
|||||||
&self,
|
&self,
|
||||||
subscription_sink: jsonrpsee::PendingSubscriptionSink,
|
subscription_sink: jsonrpsee::PendingSubscriptionSink,
|
||||||
) -> SubscriptionResult {
|
) -> SubscriptionResult {
|
||||||
|
let mut rx = self
|
||||||
|
.service_impl
|
||||||
|
.lock()
|
||||||
|
.await
|
||||||
|
.finalized_block_id_tx
|
||||||
|
.subscribe();
|
||||||
|
|
||||||
let sink = subscription_sink.accept().await?;
|
let sink = subscription_sink.accept().await?;
|
||||||
self.service_impl.lock().await.add_subscription(Subscription::new(sink)).await;
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while let Ok(block_id) = rx.recv().await {
|
||||||
|
let msg = SubscriptionMessage::from(
|
||||||
|
RawValue::from_string(block_id.to_string())
|
||||||
|
.expect("u64 string is always valid JSON"),
|
||||||
|
);
|
||||||
|
if sink.send(msg).await.is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -78,60 +101,34 @@ impl indexer_service_rpc::RpcServer for IndexerService {
|
|||||||
|
|
||||||
struct IndexerServiceImpl {
|
struct IndexerServiceImpl {
|
||||||
indexer: IndexerCore,
|
indexer: IndexerCore,
|
||||||
subscriptions: Vec<Subscription<Block>>,
|
finalized_block_id_tx: broadcast::Sender<BlockId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IndexerServiceImpl {
|
impl IndexerServiceImpl {
|
||||||
fn new(indexer: IndexerCore) -> Self {
|
fn new(indexer: IndexerCore) -> Self {
|
||||||
|
let (finalized_block_id_tx, _block_rx) = broadcast::channel(1024);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
indexer,
|
indexer,
|
||||||
subscriptions: Vec::new(),
|
finalized_block_id_tx,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn add_subscription(&mut self, subscription: Subscription<Block>) {
|
|
||||||
self.subscriptions.push(subscription);
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn respond_subscribers_loop(service_impl: Arc<Mutex<IndexerServiceImpl>>) -> Result<()> {
|
async fn respond_subscribers_loop(service_impl: Arc<Mutex<IndexerServiceImpl>>) -> Result<()> {
|
||||||
let indexer_clone = service_impl.lock().await.indexer.clone();
|
let indexer_clone = service_impl.lock().await.indexer.clone();
|
||||||
|
|
||||||
let mut block_stream = pin!(indexer_clone.subscribe_parse_block_stream().await);
|
let mut block_stream = pin!(indexer_clone.subscribe_parse_block_stream().await);
|
||||||
while let Some(block) = block_stream.next().await {
|
while let Some(block) = block_stream.next().await {
|
||||||
let block= block.context("Failed to get L2 block data")?;
|
let block = block.context("Failed to get L2 block data")?;
|
||||||
let block = block.try_into().context("Failed to convert L2 Block into protocol Block")?;
|
|
||||||
|
|
||||||
// Cloning subscriptions to avoid holding the lock while sending
|
// Cloning subscriptions to avoid holding the lock while sending
|
||||||
let subscriptions = service_impl.lock().await.subscriptions.clone();
|
service_impl
|
||||||
for sink in subscriptions {
|
.lock()
|
||||||
sink.send(&block).await?;
|
.await
|
||||||
}
|
.finalized_block_id_tx
|
||||||
|
.send(block.header.block_id)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Err(anyhow!("Block stream ended unexpectedly"))
|
Err(anyhow!("Block stream ended unexpectedly"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
struct Subscription<T> {
|
|
||||||
sink: SubscriptionSink,
|
|
||||||
_marker: std::marker::PhantomData<T>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Subscription<T> {
|
|
||||||
fn new(sink: SubscriptionSink) -> Self {
|
|
||||||
Self {
|
|
||||||
sink,
|
|
||||||
_marker: std::marker::PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn send(&self, item: &T) -> Result<()>
|
|
||||||
where T: Serialize
|
|
||||||
{
|
|
||||||
let json = serde_json::value::to_raw_value(item)
|
|
||||||
.context("Failed to serialize item for subscription")?;
|
|
||||||
self.sink.send(json).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@ -8,7 +8,7 @@ use common::{
|
|||||||
sequencer_client::SequencerClient,
|
sequencer_client::SequencerClient,
|
||||||
transaction::{EncodedTransaction, NSSATransaction},
|
transaction::{EncodedTransaction, NSSATransaction},
|
||||||
};
|
};
|
||||||
use indexer_core::{IndexerCore, config::IndexerConfig};
|
use indexer_core::config::IndexerConfig;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use nssa::PrivacyPreservingTransaction;
|
use nssa::PrivacyPreservingTransaction;
|
||||||
use nssa_core::Commitment;
|
use nssa_core::Commitment;
|
||||||
|
|||||||
@ -265,7 +265,10 @@ impl SequencerCore {
|
|||||||
.map(|block| block.header.block_id)
|
.map(|block| block.header.block_id)
|
||||||
.min()
|
.min()
|
||||||
{
|
{
|
||||||
info!("FIRST PENDING BLOCK: {}", first_pending_block_id);
|
info!(
|
||||||
|
"Clearing pending blocks up to id: {}",
|
||||||
|
last_finalized_block_id
|
||||||
|
);
|
||||||
(first_pending_block_id..=last_finalized_block_id)
|
(first_pending_block_id..=last_finalized_block_id)
|
||||||
.try_for_each(|id| self.store.delete_block_at_id(id))
|
.try_for_each(|id| self.store.delete_block_at_id(id))
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@ -5,7 +5,7 @@ use anyhow::{Context as _, Result};
|
|||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use common::rpc_primitives::RpcConfig;
|
use common::rpc_primitives::RpcConfig;
|
||||||
use futures::FutureExt as _;
|
use futures::FutureExt as _;
|
||||||
use log::{debug, error, info, warn};
|
use log::{error, info, warn};
|
||||||
use sequencer_core::{SequencerCore, config::SequencerConfig};
|
use sequencer_core::{SequencerCore, config::SequencerConfig};
|
||||||
use sequencer_rpc::new_http_server;
|
use sequencer_rpc::new_http_server;
|
||||||
use tokio::{sync::Mutex, task::JoinHandle};
|
use tokio::{sync::Mutex, task::JoinHandle};
|
||||||
@ -194,12 +194,10 @@ async fn listen_for_bedrock_blocks_loop(seq_core: Arc<Mutex<SequencerCore>>) ->
|
|||||||
.await
|
.await
|
||||||
.context("Failed to subscribe to finalized blocks")?;
|
.context("Failed to subscribe to finalized blocks")?;
|
||||||
|
|
||||||
while let Some(block) = subscription.next().await {
|
while let Some(block_id) = subscription.next().await {
|
||||||
let block = block.context("Failed to get next block from subscription")?;
|
let block_id = block_id.context("Failed to get next block from subscription")?;
|
||||||
let block_id = block.header.block_id;
|
|
||||||
|
|
||||||
info!("Received new L2 block with ID {block_id}");
|
info!("Received new L2 block with ID {block_id}");
|
||||||
debug!("Block data: {block:#?}");
|
|
||||||
|
|
||||||
seq_core
|
seq_core
|
||||||
.lock()
|
.lock()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user