diff --git a/Cargo.lock b/Cargo.lock index 844b7645..604ad55f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3207,6 +3207,7 @@ name = "indexer_core" version = "0.1.0" dependencies = [ "anyhow", + "async-stream", "bedrock_client", "borsh", "common", @@ -3227,11 +3228,13 @@ dependencies = [ "async-trait", "clap 4.5.53", "env_logger", + "futures", "indexer_core", "indexer_service_protocol", "indexer_service_rpc", "jsonrpsee", "log", + "serde", "serde_json", "tokio", "tokio-util", diff --git a/Cargo.toml b/Cargo.toml index 2e5602f6..cc078836 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -93,6 +93,7 @@ itertools = "0.14.0" url = { version = "2.5.4", features = ["serde"] } tokio-retry = "0.3.0" schemars = "1.2.0" +async-stream = "0.3.6" logos-blockchain-common-http-client = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } logos-blockchain-key-management-system-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } diff --git a/indexer/core/Cargo.toml b/indexer/core/Cargo.toml index 922f566c..8e503410 100644 --- a/indexer/core/Cargo.toml +++ b/indexer/core/Cargo.toml @@ -16,3 +16,4 @@ futures.workspace = true url.workspace = true logos-blockchain-core.workspace = true serde_json.workspace = true +async-stream.workspace = true diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index 3508a257..c794b713 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use anyhow::Result; use bedrock_client::BedrockClient; -use common::block::HashableBlockData; +use common::block::{Block, }; use futures::StreamExt; use log::info; use logos_blockchain_core::mantle::{ @@ -16,6 +16,7 @@ use crate::{config::IndexerConfig, state::IndexerState}; pub mod config; pub mod state; +#[derive(Clone)] pub struct IndexerCore { bedrock_client: BedrockClient, config: IndexerConfig, @@ -37,55 +38,52 @@ impl IndexerCore { }) } - pub async fn subscribe_parse_block_stream(&self) -> Result<()> { - loop { - let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?); + pub async fn subscribe_parse_block_stream( + &self, + ) -> impl futures::Stream> { + async_stream::stream! { + loop { + let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?); - info!("Block stream joined"); + info!("Block stream joined"); - while let Some(block_info) = stream_pinned.next().await { - let header_id = block_info.header_id; + while let Some(block_info) = stream_pinned.next().await { + let header_id = block_info.header_id; - info!("Observed L1 block at height {}", block_info.height); + info!("Observed L1 block at height {}", block_info.height); - if let Some(l1_block) = self - .bedrock_client - .get_block_by_id(header_id, &self.config.backoff) - .await? - { - info!("Extracted L1 block at height {}", block_info.height); + if let Some(l1_block) = self + .bedrock_client + .get_block_by_id(header_id, &self.config.backoff) + .await? + { + info!("Extracted L1 block at height {}", block_info.height); - let l2_blocks_parsed = parse_blocks( - l1_block.into_transactions().into_iter(), - &self.config.channel_id, - ); + let l2_blocks_parsed = parse_blocks( + l1_block.into_transactions().into_iter(), + &self.config.channel_id, + ); - for l2_block in l2_blocks_parsed { - // State modification, will be updated in future - { - let mut guard = self.state.latest_seen_block.write().await; - if l2_block.block_id > *guard { - *guard = l2_block.block_id; + for l2_block in l2_blocks_parsed { + // State modification, will be updated in future + { + let mut guard = self.state.latest_seen_block.write().await; + if l2_block.header.block_id > *guard { + *guard = l2_block.header.block_id; + } } + + yield Ok(l2_block); } - - // // Sending data into sequencer, may need to be expanded. - // let message = Message::L2BlockFinalized { - // l2_block_height: l2_block.block_id, - // }; - - // let status = self.send_message_to_sequencer(message.clone()).await?; - - // info!("Sent message {message:#?} to sequencer; status {status:#?}"); } } - } - // Refetch stream after delay - tokio::time::sleep(std::time::Duration::from_millis( - self.config.resubscribe_interval_millis, - )) - .await; + // Refetch stream after delay + tokio::time::sleep(std::time::Duration::from_millis( + self.config.resubscribe_interval_millis, + )) + .await; + } } } } @@ -93,7 +91,7 @@ impl IndexerCore { fn parse_blocks( block_txs: impl Iterator, decoded_channel_id: &ChannelId, -) -> impl Iterator { +) -> impl Iterator { block_txs.flat_map(|tx| { tx.mantle_tx.ops.into_iter().filter_map(|op| match op { Op::ChannelInscribe(InscriptionOp { @@ -101,7 +99,7 @@ fn parse_blocks( inscription, .. }) if channel_id == *decoded_channel_id => { - borsh::from_slice::(&inscription).ok() + borsh::from_slice::(&inscription).ok() } _ => None, }) diff --git a/indexer/service/Cargo.toml b/indexer/service/Cargo.toml index 82639982..f41a9afd 100644 --- a/indexer/service/Cargo.toml +++ b/indexer/service/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] -indexer_service_protocol.workspace = true +indexer_service_protocol = { workspace = true, features = ["convert"] } indexer_service_rpc = { workspace = true, features = ["server"] } indexer_core.workspace = true @@ -15,7 +15,9 @@ tokio-util.workspace = true env_logger.workspace = true log.workspace = true jsonrpsee.workspace = true +serde.workspace = true serde_json.workspace = true +futures.workspace = true async-trait = "0.1.89" [features] diff --git a/indexer/service/rpc/src/lib.rs b/indexer/service/rpc/src/lib.rs index 820fd9a8..18755df5 100644 --- a/indexer/service/rpc/src/lib.rs +++ b/indexer/service/rpc/src/lib.rs @@ -24,7 +24,7 @@ pub trait Rpc { } #[subscription(name = "subscribeToFinalizedBlocks", item = Block)] - async fn subscribe_to_finalized_blocks(&self, from: BlockId) -> SubscriptionResult; + async fn subscribe_to_finalized_blocks(&self) -> SubscriptionResult; #[method(name = "getBlockById")] async fn get_block_by_id(&self, block_id: BlockId) -> Result; diff --git a/indexer/service/src/lib.rs b/indexer/service/src/lib.rs index ccabe1ae..0c18410e 100644 --- a/indexer/service/src/lib.rs +++ b/indexer/service/src/lib.rs @@ -1,4 +1,3 @@ -#[cfg(not(feature = "mock-responses"))] pub mod service; #[cfg(feature = "mock-responses")] diff --git a/indexer/service/src/mock_service.rs b/indexer/service/src/mock_service.rs index 2ce801ce..e7afda18 100644 --- a/indexer/service/src/mock_service.rs +++ b/indexer/service/src/mock_service.rs @@ -166,10 +166,13 @@ impl indexer_service_rpc::RpcServer for MockIndexerService { async fn subscribe_to_finalized_blocks( &self, subscription_sink: jsonrpsee::PendingSubscriptionSink, - from: BlockId, ) -> SubscriptionResult { let sink = subscription_sink.accept().await?; - for block in self.blocks.iter().filter(|b| b.header.block_id >= from) { + for block in self + .blocks + .iter() + .filter(|b| b.bedrock_status == BedrockStatus::Finalized) + { let json = serde_json::value::to_raw_value(block).unwrap(); sink.send(json).await?; } diff --git a/indexer/service/src/service.rs b/indexer/service/src/service.rs index 224ad38f..c7cdccd4 100644 --- a/indexer/service/src/service.rs +++ b/indexer/service/src/service.rs @@ -1,16 +1,36 @@ -use anyhow::Result; +use std::{pin::pin, sync::Arc}; + +use anyhow::{Context as _, Result, anyhow}; +use futures::StreamExt as _; use indexer_core::{IndexerCore, config::IndexerConfig}; use indexer_service_protocol::{Account, AccountId, Block, BlockId, Hash, Transaction}; -use jsonrpsee::{core::SubscriptionResult, types::ErrorObjectOwned}; +use jsonrpsee::{SubscriptionSink, core::{Serialize, SubscriptionResult}, types::ErrorObjectOwned}; +use tokio::sync::Mutex; pub struct IndexerService { - indexer: IndexerCore, + service_impl: Arc>, + respond_subscribers_loop_handle: tokio::task::JoinHandle>, +} + +impl Drop for IndexerService { + fn drop(&mut self) { + self.respond_subscribers_loop_handle.abort(); + } } impl IndexerService { pub fn new(config: IndexerConfig) -> Result { + let service_impl = Arc::new(Mutex::new(IndexerServiceImpl::new( + IndexerCore::new(config)?, + ))); + + let respond_subscribers_loop_handle = tokio::spawn( + IndexerServiceImpl::respond_subscribers_loop(Arc::clone(&service_impl)), + ); + Ok(Self { - indexer: IndexerCore::new(config)?, + service_impl, + respond_subscribers_loop_handle, }) } } @@ -19,10 +39,11 @@ impl IndexerService { impl indexer_service_rpc::RpcServer for IndexerService { async fn subscribe_to_finalized_blocks( &self, - _subscription_sink: jsonrpsee::PendingSubscriptionSink, - _from: BlockId, + subscription_sink: jsonrpsee::PendingSubscriptionSink, ) -> SubscriptionResult { - todo!() + let sink = subscription_sink.accept().await?; + self.service_impl.lock().await.add_subscription(Subscription::new(sink)).await; + Ok(()) } async fn get_block_by_id(&self, _block_id: BlockId) -> Result { @@ -54,3 +75,63 @@ impl indexer_service_rpc::RpcServer for IndexerService { todo!() } } + +struct IndexerServiceImpl { + indexer: IndexerCore, + subscriptions: Vec>, +} + +impl IndexerServiceImpl { + fn new(indexer: IndexerCore) -> Self { + Self { + indexer, + subscriptions: Vec::new(), + } + } + + async fn add_subscription(&mut self, subscription: Subscription) { + self.subscriptions.push(subscription); + } + + async fn respond_subscribers_loop(service_impl: Arc>) -> Result<()> { + let indexer_clone = service_impl.lock().await.indexer.clone(); + + let mut block_stream = pin!(indexer_clone.subscribe_parse_block_stream().await); + while let Some(block) = block_stream.next().await { + let block= block.context("Failed to get L2 block data")?; + let block = block.try_into().context("Failed to convert L2 Block into protocol Block")?; + + // Cloning subscriptions to avoid holding the lock while sending + let subscriptions = service_impl.lock().await.subscriptions.clone(); + for sink in subscriptions { + sink.send(&block).await?; + } + } + + Err(anyhow!("Block stream ended unexpectedly")) + } +} + +#[derive(Clone)] +struct Subscription { + sink: SubscriptionSink, + _marker: std::marker::PhantomData, +} + +impl Subscription { + fn new(sink: SubscriptionSink) -> Self { + Self { + sink, + _marker: std::marker::PhantomData, + } + } + + async fn send(&self, item: &T) -> Result<()> + where T: Serialize + { + let json = serde_json::value::to_raw_value(item) + .context("Failed to serialize item for subscription")?; + self.sink.send(json).await?; + Ok(()) + } +} diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index 0fa39129..baff7c9c 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -116,20 +116,21 @@ impl TestContext { .context("Failed to create sequencer client")?; if let Some(indexer_config) = indexer_config { - let indexer_core = IndexerCore::new(indexer_config)?; + // let indexer_core = IndexerCore::new(indexer_config)?; - let indexer_loop_handle = Some(tokio::spawn(async move { - indexer_core.subscribe_parse_block_stream().await - })); + // let indexer_loop_handle = Some(tokio::spawn(async move { + // indexer_core.subscribe_parse_block_stream().await + // })); - Ok(Self { - _sequencer_handle, - indexer_loop_handle, - sequencer_client, - wallet, - _temp_sequencer_dir: temp_sequencer_dir, - _temp_wallet_dir: temp_wallet_dir, - }) + // Ok(Self { + // _sequencer_handle, + // indexer_loop_handle, + // sequencer_client, + // wallet, + // _temp_sequencer_dir: temp_sequencer_dir, + // _temp_wallet_dir: temp_wallet_dir, + // }) + todo!() } else { Ok(Self { _sequencer_handle, diff --git a/sequencer_core/src/lib.rs b/sequencer_core/src/lib.rs index f1b9dc80..4af124e4 100644 --- a/sequencer_core/src/lib.rs +++ b/sequencer_core/src/lib.rs @@ -260,14 +260,6 @@ impl SequencerCore { } } - pub fn first_pending_block_id(&self) -> Result> { - Ok(self - .get_pending_blocks()? - .iter() - .map(|block| block.header.block_id) - .min()) - } - /// Returns the list of stored pending blocks. pub fn get_pending_blocks(&self) -> Result> { Ok(self diff --git a/sequencer_runner/src/lib.rs b/sequencer_runner/src/lib.rs index ef381da1..d62d0ca3 100644 --- a/sequencer_runner/src/lib.rs +++ b/sequencer_runner/src/lib.rs @@ -188,22 +188,11 @@ async fn listen_for_bedrock_blocks_loop(seq_core: Arc>) -> let indexer_client = seq_core.lock().await.indexer_client().clone(); loop { - let first_pending_block_id = { - let sequencer_core = seq_core.lock().await; - - sequencer_core - .first_pending_block_id() - .context("Failed to get first pending block ID")? - .unwrap_or(sequencer_core.chain_height()) - }; - - info!("Subscribing to blocks from ID {first_pending_block_id}"); + // TODO: Subscribe from the first pending block ID? let mut subscription = indexer_client - .subscribe_to_finalized_blocks(first_pending_block_id) + .subscribe_to_finalized_blocks() .await - .with_context(|| { - format!("Failed to subscribe to blocks from {first_pending_block_id}") - })?; + .context("Failed to subscribe to finalized blocks")?; while let Some(block) = subscription.next().await { let block = block.context("Failed to get next block from subscription")?; diff --git a/wallet/Cargo.toml b/wallet/Cargo.toml index 292cebac..45b0788c 100644 --- a/wallet/Cargo.toml +++ b/wallet/Cargo.toml @@ -26,7 +26,7 @@ itertools.workspace = true sha2.workspace = true futures.workspace = true risc0-zkvm.workspace = true -async-stream = "0.3.6" +async-stream.workspace = true indicatif = { version = "0.18.3", features = ["improved_unicode"] } optfield = "0.4.0" url.workspace = true