Implement block subscription

This commit is contained in:
Daniil Polyakov 2026-01-30 21:37:27 +03:00
parent 2d0525ab31
commit 71787a70f7
13 changed files with 158 additions and 88 deletions

3
Cargo.lock generated
View File

@ -3207,6 +3207,7 @@ name = "indexer_core"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream",
"bedrock_client", "bedrock_client",
"borsh", "borsh",
"common", "common",
@ -3227,11 +3228,13 @@ dependencies = [
"async-trait", "async-trait",
"clap 4.5.53", "clap 4.5.53",
"env_logger", "env_logger",
"futures",
"indexer_core", "indexer_core",
"indexer_service_protocol", "indexer_service_protocol",
"indexer_service_rpc", "indexer_service_rpc",
"jsonrpsee", "jsonrpsee",
"log", "log",
"serde",
"serde_json", "serde_json",
"tokio", "tokio",
"tokio-util", "tokio-util",

View File

@ -93,6 +93,7 @@ itertools = "0.14.0"
url = { version = "2.5.4", features = ["serde"] } url = { version = "2.5.4", features = ["serde"] }
tokio-retry = "0.3.0" tokio-retry = "0.3.0"
schemars = "1.2.0" schemars = "1.2.0"
async-stream = "0.3.6"
logos-blockchain-common-http-client = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } logos-blockchain-common-http-client = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }
logos-blockchain-key-management-system-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } logos-blockchain-key-management-system-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" }

View File

@ -16,3 +16,4 @@ futures.workspace = true
url.workspace = true url.workspace = true
logos-blockchain-core.workspace = true logos-blockchain-core.workspace = true
serde_json.workspace = true serde_json.workspace = true
async-stream.workspace = true

View File

@ -2,7 +2,7 @@ use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use bedrock_client::BedrockClient; use bedrock_client::BedrockClient;
use common::block::HashableBlockData; use common::block::{Block, };
use futures::StreamExt; use futures::StreamExt;
use log::info; use log::info;
use logos_blockchain_core::mantle::{ use logos_blockchain_core::mantle::{
@ -16,6 +16,7 @@ use crate::{config::IndexerConfig, state::IndexerState};
pub mod config; pub mod config;
pub mod state; pub mod state;
#[derive(Clone)]
pub struct IndexerCore { pub struct IndexerCore {
bedrock_client: BedrockClient, bedrock_client: BedrockClient,
config: IndexerConfig, config: IndexerConfig,
@ -37,55 +38,52 @@ impl IndexerCore {
}) })
} }
pub async fn subscribe_parse_block_stream(&self) -> Result<()> { pub async fn subscribe_parse_block_stream(
loop { &self,
let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?); ) -> impl futures::Stream<Item = Result<Block>> {
async_stream::stream! {
loop {
let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?);
info!("Block stream joined"); info!("Block stream joined");
while let Some(block_info) = stream_pinned.next().await { while let Some(block_info) = stream_pinned.next().await {
let header_id = block_info.header_id; let header_id = block_info.header_id;
info!("Observed L1 block at height {}", block_info.height); info!("Observed L1 block at height {}", block_info.height);
if let Some(l1_block) = self if let Some(l1_block) = self
.bedrock_client .bedrock_client
.get_block_by_id(header_id, &self.config.backoff) .get_block_by_id(header_id, &self.config.backoff)
.await? .await?
{ {
info!("Extracted L1 block at height {}", block_info.height); info!("Extracted L1 block at height {}", block_info.height);
let l2_blocks_parsed = parse_blocks( let l2_blocks_parsed = parse_blocks(
l1_block.into_transactions().into_iter(), l1_block.into_transactions().into_iter(),
&self.config.channel_id, &self.config.channel_id,
); );
for l2_block in l2_blocks_parsed { for l2_block in l2_blocks_parsed {
// State modification, will be updated in future // State modification, will be updated in future
{ {
let mut guard = self.state.latest_seen_block.write().await; let mut guard = self.state.latest_seen_block.write().await;
if l2_block.block_id > *guard { if l2_block.header.block_id > *guard {
*guard = l2_block.block_id; *guard = l2_block.header.block_id;
}
} }
yield Ok(l2_block);
} }
// // Sending data into sequencer, may need to be expanded.
// let message = Message::L2BlockFinalized {
// l2_block_height: l2_block.block_id,
// };
// let status = self.send_message_to_sequencer(message.clone()).await?;
// info!("Sent message {message:#?} to sequencer; status {status:#?}");
} }
} }
}
// Refetch stream after delay // Refetch stream after delay
tokio::time::sleep(std::time::Duration::from_millis( tokio::time::sleep(std::time::Duration::from_millis(
self.config.resubscribe_interval_millis, self.config.resubscribe_interval_millis,
)) ))
.await; .await;
}
} }
} }
} }
@ -93,7 +91,7 @@ impl IndexerCore {
fn parse_blocks( fn parse_blocks(
block_txs: impl Iterator<Item = SignedMantleTx>, block_txs: impl Iterator<Item = SignedMantleTx>,
decoded_channel_id: &ChannelId, decoded_channel_id: &ChannelId,
) -> impl Iterator<Item = HashableBlockData> { ) -> impl Iterator<Item = Block> {
block_txs.flat_map(|tx| { block_txs.flat_map(|tx| {
tx.mantle_tx.ops.into_iter().filter_map(|op| match op { tx.mantle_tx.ops.into_iter().filter_map(|op| match op {
Op::ChannelInscribe(InscriptionOp { Op::ChannelInscribe(InscriptionOp {
@ -101,7 +99,7 @@ fn parse_blocks(
inscription, inscription,
.. ..
}) if channel_id == *decoded_channel_id => { }) if channel_id == *decoded_channel_id => {
borsh::from_slice::<HashableBlockData>(&inscription).ok() borsh::from_slice::<Block>(&inscription).ok()
} }
_ => None, _ => None,
}) })

View File

@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2024" edition = "2024"
[dependencies] [dependencies]
indexer_service_protocol.workspace = true indexer_service_protocol = { workspace = true, features = ["convert"] }
indexer_service_rpc = { workspace = true, features = ["server"] } indexer_service_rpc = { workspace = true, features = ["server"] }
indexer_core.workspace = true indexer_core.workspace = true
@ -15,7 +15,9 @@ tokio-util.workspace = true
env_logger.workspace = true env_logger.workspace = true
log.workspace = true log.workspace = true
jsonrpsee.workspace = true jsonrpsee.workspace = true
serde.workspace = true
serde_json.workspace = true serde_json.workspace = true
futures.workspace = true
async-trait = "0.1.89" async-trait = "0.1.89"
[features] [features]

View File

@ -24,7 +24,7 @@ pub trait Rpc {
} }
#[subscription(name = "subscribeToFinalizedBlocks", item = Block)] #[subscription(name = "subscribeToFinalizedBlocks", item = Block)]
async fn subscribe_to_finalized_blocks(&self, from: BlockId) -> SubscriptionResult; async fn subscribe_to_finalized_blocks(&self) -> SubscriptionResult;
#[method(name = "getBlockById")] #[method(name = "getBlockById")]
async fn get_block_by_id(&self, block_id: BlockId) -> Result<Block, ErrorObjectOwned>; async fn get_block_by_id(&self, block_id: BlockId) -> Result<Block, ErrorObjectOwned>;

View File

@ -1,4 +1,3 @@
#[cfg(not(feature = "mock-responses"))]
pub mod service; pub mod service;
#[cfg(feature = "mock-responses")] #[cfg(feature = "mock-responses")]

View File

@ -166,10 +166,13 @@ impl indexer_service_rpc::RpcServer for MockIndexerService {
async fn subscribe_to_finalized_blocks( async fn subscribe_to_finalized_blocks(
&self, &self,
subscription_sink: jsonrpsee::PendingSubscriptionSink, subscription_sink: jsonrpsee::PendingSubscriptionSink,
from: BlockId,
) -> SubscriptionResult { ) -> SubscriptionResult {
let sink = subscription_sink.accept().await?; let sink = subscription_sink.accept().await?;
for block in self.blocks.iter().filter(|b| b.header.block_id >= from) { for block in self
.blocks
.iter()
.filter(|b| b.bedrock_status == BedrockStatus::Finalized)
{
let json = serde_json::value::to_raw_value(block).unwrap(); let json = serde_json::value::to_raw_value(block).unwrap();
sink.send(json).await?; sink.send(json).await?;
} }

View File

@ -1,16 +1,36 @@
use anyhow::Result; use std::{pin::pin, sync::Arc};
use anyhow::{Context as _, Result, anyhow};
use futures::StreamExt as _;
use indexer_core::{IndexerCore, config::IndexerConfig}; use indexer_core::{IndexerCore, config::IndexerConfig};
use indexer_service_protocol::{Account, AccountId, Block, BlockId, Hash, Transaction}; use indexer_service_protocol::{Account, AccountId, Block, BlockId, Hash, Transaction};
use jsonrpsee::{core::SubscriptionResult, types::ErrorObjectOwned}; use jsonrpsee::{SubscriptionSink, core::{Serialize, SubscriptionResult}, types::ErrorObjectOwned};
use tokio::sync::Mutex;
pub struct IndexerService { pub struct IndexerService {
indexer: IndexerCore, service_impl: Arc<Mutex<IndexerServiceImpl>>,
respond_subscribers_loop_handle: tokio::task::JoinHandle<Result<()>>,
}
impl Drop for IndexerService {
fn drop(&mut self) {
self.respond_subscribers_loop_handle.abort();
}
} }
impl IndexerService { impl IndexerService {
pub fn new(config: IndexerConfig) -> Result<Self> { pub fn new(config: IndexerConfig) -> Result<Self> {
let service_impl = Arc::new(Mutex::new(IndexerServiceImpl::new(
IndexerCore::new(config)?,
)));
let respond_subscribers_loop_handle = tokio::spawn(
IndexerServiceImpl::respond_subscribers_loop(Arc::clone(&service_impl)),
);
Ok(Self { Ok(Self {
indexer: IndexerCore::new(config)?, service_impl,
respond_subscribers_loop_handle,
}) })
} }
} }
@ -19,10 +39,11 @@ impl IndexerService {
impl indexer_service_rpc::RpcServer for IndexerService { impl indexer_service_rpc::RpcServer for IndexerService {
async fn subscribe_to_finalized_blocks( async fn subscribe_to_finalized_blocks(
&self, &self,
_subscription_sink: jsonrpsee::PendingSubscriptionSink, subscription_sink: jsonrpsee::PendingSubscriptionSink,
_from: BlockId,
) -> SubscriptionResult { ) -> SubscriptionResult {
todo!() let sink = subscription_sink.accept().await?;
self.service_impl.lock().await.add_subscription(Subscription::new(sink)).await;
Ok(())
} }
async fn get_block_by_id(&self, _block_id: BlockId) -> Result<Block, ErrorObjectOwned> { async fn get_block_by_id(&self, _block_id: BlockId) -> Result<Block, ErrorObjectOwned> {
@ -54,3 +75,63 @@ impl indexer_service_rpc::RpcServer for IndexerService {
todo!() todo!()
} }
} }
struct IndexerServiceImpl {
indexer: IndexerCore,
subscriptions: Vec<Subscription<Block>>,
}
impl IndexerServiceImpl {
fn new(indexer: IndexerCore) -> Self {
Self {
indexer,
subscriptions: Vec::new(),
}
}
async fn add_subscription(&mut self, subscription: Subscription<Block>) {
self.subscriptions.push(subscription);
}
async fn respond_subscribers_loop(service_impl: Arc<Mutex<IndexerServiceImpl>>) -> Result<()> {
let indexer_clone = service_impl.lock().await.indexer.clone();
let mut block_stream = pin!(indexer_clone.subscribe_parse_block_stream().await);
while let Some(block) = block_stream.next().await {
let block= block.context("Failed to get L2 block data")?;
let block = block.try_into().context("Failed to convert L2 Block into protocol Block")?;
// Cloning subscriptions to avoid holding the lock while sending
let subscriptions = service_impl.lock().await.subscriptions.clone();
for sink in subscriptions {
sink.send(&block).await?;
}
}
Err(anyhow!("Block stream ended unexpectedly"))
}
}
#[derive(Clone)]
struct Subscription<T> {
sink: SubscriptionSink,
_marker: std::marker::PhantomData<T>,
}
impl<T> Subscription<T> {
fn new(sink: SubscriptionSink) -> Self {
Self {
sink,
_marker: std::marker::PhantomData,
}
}
async fn send(&self, item: &T) -> Result<()>
where T: Serialize
{
let json = serde_json::value::to_raw_value(item)
.context("Failed to serialize item for subscription")?;
self.sink.send(json).await?;
Ok(())
}
}

View File

@ -116,20 +116,21 @@ impl TestContext {
.context("Failed to create sequencer client")?; .context("Failed to create sequencer client")?;
if let Some(indexer_config) = indexer_config { if let Some(indexer_config) = indexer_config {
let indexer_core = IndexerCore::new(indexer_config)?; // let indexer_core = IndexerCore::new(indexer_config)?;
let indexer_loop_handle = Some(tokio::spawn(async move { // let indexer_loop_handle = Some(tokio::spawn(async move {
indexer_core.subscribe_parse_block_stream().await // indexer_core.subscribe_parse_block_stream().await
})); // }));
Ok(Self { // Ok(Self {
_sequencer_handle, // _sequencer_handle,
indexer_loop_handle, // indexer_loop_handle,
sequencer_client, // sequencer_client,
wallet, // wallet,
_temp_sequencer_dir: temp_sequencer_dir, // _temp_sequencer_dir: temp_sequencer_dir,
_temp_wallet_dir: temp_wallet_dir, // _temp_wallet_dir: temp_wallet_dir,
}) // })
todo!()
} else { } else {
Ok(Self { Ok(Self {
_sequencer_handle, _sequencer_handle,

View File

@ -260,14 +260,6 @@ impl SequencerCore {
} }
} }
pub fn first_pending_block_id(&self) -> Result<Option<u64>> {
Ok(self
.get_pending_blocks()?
.iter()
.map(|block| block.header.block_id)
.min())
}
/// Returns the list of stored pending blocks. /// Returns the list of stored pending blocks.
pub fn get_pending_blocks(&self) -> Result<Vec<Block>> { pub fn get_pending_blocks(&self) -> Result<Vec<Block>> {
Ok(self Ok(self

View File

@ -188,22 +188,11 @@ async fn listen_for_bedrock_blocks_loop(seq_core: Arc<Mutex<SequencerCore>>) ->
let indexer_client = seq_core.lock().await.indexer_client().clone(); let indexer_client = seq_core.lock().await.indexer_client().clone();
loop { loop {
let first_pending_block_id = { // TODO: Subscribe from the first pending block ID?
let sequencer_core = seq_core.lock().await;
sequencer_core
.first_pending_block_id()
.context("Failed to get first pending block ID")?
.unwrap_or(sequencer_core.chain_height())
};
info!("Subscribing to blocks from ID {first_pending_block_id}");
let mut subscription = indexer_client let mut subscription = indexer_client
.subscribe_to_finalized_blocks(first_pending_block_id) .subscribe_to_finalized_blocks()
.await .await
.with_context(|| { .context("Failed to subscribe to finalized blocks")?;
format!("Failed to subscribe to blocks from {first_pending_block_id}")
})?;
while let Some(block) = subscription.next().await { while let Some(block) = subscription.next().await {
let block = block.context("Failed to get next block from subscription")?; let block = block.context("Failed to get next block from subscription")?;

View File

@ -26,7 +26,7 @@ itertools.workspace = true
sha2.workspace = true sha2.workspace = true
futures.workspace = true futures.workspace = true
risc0-zkvm.workspace = true risc0-zkvm.workspace = true
async-stream = "0.3.6" async-stream.workspace = true
indicatif = { version = "0.18.3", features = ["improved_unicode"] } indicatif = { version = "0.18.3", features = ["improved_unicode"] }
optfield = "0.4.0" optfield = "0.4.0"
url.workspace = true url.workspace = true