diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 72794bdf..38161a0f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -105,7 +105,7 @@ jobs: env: RISC0_DEV_MODE: "1" RUST_LOG: "info" - run: cargo nextest run --no-fail-fast -- --skip tps_test --skip indexer_run_local_node + run: cargo nextest run --no-fail-fast -- --skip tps_test valid-proof-test: runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index 1e7135fe..0506b1b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -956,8 +956,14 @@ name = "bedrock_client" version = "0.1.0" dependencies = [ "anyhow", + "broadcast-service", "common-http-client", + "futures", + "log", + "nomos-core", "reqwest", + "tokio-retry", + "url", ] [[package]] @@ -2877,7 +2883,6 @@ dependencies = [ "nomos-core", "serde", "tokio", - "tokio-retry", "url", ] diff --git a/Cargo.toml b/Cargo.toml index 3fe9c16b..aaadc58b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,6 +85,7 @@ tokio-retry = "0.3.0" common-http-client = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } key-management-system-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } nomos-core = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } +broadcast-service = { git = "https://github.com/logos-blockchain/logos-blockchain.git" } rocksdb = { version = "0.24.0", default-features = false, features = [ "snappy", diff --git a/bedrock_client/Cargo.toml b/bedrock_client/Cargo.toml index 034a093e..8ed7fd24 100644 --- a/bedrock_client/Cargo.toml +++ b/bedrock_client/Cargo.toml @@ -7,3 +7,9 @@ edition = "2024" reqwest.workspace = true anyhow.workspace = true common-http-client.workspace = true +nomos-core.workspace = true +broadcast-service.workspace = true +url.workspace = true +futures.workspace = true +tokio-retry.workspace = true +log.workspace = true diff --git a/bedrock_client/src/lib.rs b/bedrock_client/src/lib.rs index 9315f083..e22446f7 100644 --- a/bedrock_client/src/lib.rs +++ b/bedrock_client/src/lib.rs @@ -1,7 +1,13 @@ use anyhow::Result; +use broadcast_service::BlockInfo; use common_http_client::CommonHttpClient; pub use common_http_client::{BasicAuthCredentials, Error}; +use futures::{Stream, TryFutureExt}; +use log::warn; +use nomos_core::{block::Block, header::HeaderId, mantle::SignedMantleTx}; use reqwest::Client; +use tokio_retry::Retry; +use url::Url; // Simple wrapper // maybe extend in the future for our purposes @@ -18,4 +24,26 @@ impl BedrockClient { client, auth, ))) } + + pub async fn get_lib_stream(&self, url: Url) -> Result, Error> { + self.0.get_lib_stream(url).await + } + + pub async fn get_block_by_id( + &self, + url: &Url, + header_id: HeaderId, + start_delay_millis: u64, + max_retries: usize, + ) -> Result>, Error> { + let strategy = tokio_retry::strategy::FibonacciBackoff::from_millis(start_delay_millis) + .take(max_retries); + + Retry::spawn(strategy, || { + self.0 + .get_block_by_id(url.clone(), header_id) + .inspect_err(|err| warn!("Block fetching failed with err: {err:#?}")) + }) + .await + } } diff --git a/common/src/rpc_primitives/requests.rs b/common/src/rpc_primitives/requests.rs index 62460209..3c231f1c 100644 --- a/common/src/rpc_primitives/requests.rs +++ b/common/src/rpc_primitives/requests.rs @@ -220,3 +220,8 @@ pub struct GetInitialTestnetAccountsResponse { pub account_id: String, pub balance: u64, } + +#[derive(Serialize, Deserialize, Debug)] +pub struct GetLastSeenL2BlockResponse { + pub last_block: Option, +} diff --git a/common/src/sequencer_client.rs b/common/src/sequencer_client.rs index b33c22a5..57b0822c 100644 --- a/common/src/sequencer_client.rs +++ b/common/src/sequencer_client.rs @@ -18,8 +18,8 @@ use crate::{ GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse, GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse, - GetLastSeenL2BlockAtIndexerRequest, GetProgramIdsRequest, GetProgramIdsResponse, - GetProofForCommitmentRequest, GetProofForCommitmentResponse, + GetLastSeenL2BlockAtIndexerRequest, GetLastSeenL2BlockResponse, GetProgramIdsRequest, + GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest, GetTransactionByHashResponse, SendTxRequest, SendTxResponse, }, @@ -352,7 +352,7 @@ impl SequencerClient { /// Get last seen l2 block at indexer pub async fn get_last_seen_l2_block_at_indexer( &self, - ) -> Result { + ) -> Result { let last_req = GetLastSeenL2BlockAtIndexerRequest {}; let req = serde_json::to_value(last_req).unwrap(); diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index fd8f7382..6727dc4e 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -15,4 +15,3 @@ borsh.workspace = true futures.workspace = true url.workspace = true nomos-core.workspace = true -tokio-retry.workspace = true diff --git a/indexer/src/config.rs b/indexer/src/config.rs index d37bb423..2a5cb0dd 100644 --- a/indexer/src/config.rs +++ b/indexer/src/config.rs @@ -1,8 +1,9 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize)] +/// Note: For individual RPC requests we use Fibonacci backoff retry strategy pub struct IndexerConfig { - pub resubscribe_interval: u64, - pub start_delay: u64, - pub limit_retry: usize, + pub resubscribe_interval_millis: u64, + pub start_delay_millis: u64, + pub max_retries: usize, } diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs index b7d70dab..1b93ff20 100644 --- a/indexer/src/lib.rs +++ b/indexer/src/lib.rs @@ -3,17 +3,16 @@ use std::sync::Arc; use anyhow::Result; use bedrock_client::{BasicAuthCredentials, BedrockClient}; use common::block::HashableBlockData; -use futures::{StreamExt, TryFutureExt}; -use log::{info, warn}; +use futures::StreamExt; +use log::info; use nomos_core::mantle::{ Op, SignedMantleTx, ops::channel::{ChannelId, inscribe::InscriptionOp}, }; use tokio::sync::{RwLock, mpsc::Sender}; -use tokio_retry::Retry; use url::Url; -use crate::{config::IndexerConfig, message::IndexerToSequencerMessage, state::IndexerState}; +use crate::{config::IndexerConfig, message::Message, state::IndexerState}; pub mod config; pub mod message; @@ -21,7 +20,7 @@ pub mod state; pub struct IndexerCore { pub bedrock_client: BedrockClient, - pub channel_sender: Sender, + pub channel_sender: Sender, pub config: IndexerConfig, pub bedrock_url: Url, pub channel_id: ChannelId, @@ -32,7 +31,7 @@ impl IndexerCore { pub fn new( addr: &str, auth: Option, - sender: Sender, + sender: Sender, config: IndexerConfig, channel_id: ChannelId, ) -> Result { @@ -53,7 +52,6 @@ impl IndexerCore { loop { let mut stream_pinned = Box::pin( self.bedrock_client - .0 .get_lib_stream(self.bedrock_url.clone()) .await?, ); @@ -65,18 +63,15 @@ impl IndexerCore { info!("Observed L1 block at height {}", block_info.height); - // Simple retry strategy on requests - let strategy = - tokio_retry::strategy::FibonacciBackoff::from_millis(self.config.start_delay) - .take(self.config.limit_retry); - - if let Some(l1_block) = Retry::spawn(strategy, || { - self.bedrock_client - .0 - .get_block_by_id(self.bedrock_url.clone(), header_id) - .inspect_err(|err| warn!("Block fetching failed with err: {err:#?}")) - }) - .await? + if let Some(l1_block) = self + .bedrock_client + .get_block_by_id( + &self.bedrock_url, + header_id, + self.config.start_delay_millis, + self.config.max_retries, + ) + .await? { info!("Extracted L1 block at height {}", block_info.height); @@ -93,7 +88,7 @@ impl IndexerCore { } // Sending data into sequencer, may need to be expanded. - let message = IndexerToSequencerMessage::BlockObserved { + let message = Message::BlockObserved { l1_block_id: block_info.height, l2_block_height: l2_block.block_id, }; @@ -107,33 +102,29 @@ impl IndexerCore { // Refetch stream after delay tokio::time::sleep(std::time::Duration::from_millis( - self.config.resubscribe_interval, + self.config.resubscribe_interval_millis, )) .await; } } } -pub fn parse_blocks( +fn parse_blocks( block_txs: impl Iterator, decoded_channel_id: &ChannelId, ) -> Vec { block_txs .flat_map(|tx| { - tx.mantle_tx - .ops - .iter() - .filter_map(|op| match op { - Op::ChannelInscribe(InscriptionOp { - channel_id, - inscription, - .. - }) if channel_id == decoded_channel_id => { - borsh::from_slice::(inscription).ok() - } - _ => None, - }) - .collect::>() + tx.mantle_tx.ops.into_iter().filter_map(|op| match op { + Op::ChannelInscribe(InscriptionOp { + channel_id, + inscription, + .. + }) if channel_id == *decoded_channel_id => { + borsh::from_slice::(&inscription).ok() + } + _ => None, + }) }) .collect() } diff --git a/indexer/src/message.rs b/indexer/src/message.rs index 195696ad..afc9cae0 100644 --- a/indexer/src/message.rs +++ b/indexer/src/message.rs @@ -1,5 +1,5 @@ #[derive(Debug, Clone)] -pub enum IndexerToSequencerMessage { +pub enum Message { BlockObserved { l1_block_id: u64, l2_block_height: u64, diff --git a/integration_tests/configs/sequencer/bedrock_local_attached/sequencer_config.json b/integration_tests/configs/sequencer/bedrock_local_attached/sequencer_config.json index 80013751..bfb9d77a 100644 --- a/integration_tests/configs/sequencer/bedrock_local_attached/sequencer_config.json +++ b/integration_tests/configs/sequencer/bedrock_local_attached/sequencer_config.json @@ -161,9 +161,9 @@ "user": "user", "password": null, "indexer_config": { - "resubscribe_interval": 1000, - "start_delay": 1000, - "limit_retry": 10 + "resubscribe_interval_millis": 1000, + "start_delay_millis": 1000, + "max_retries": 10 } } } diff --git a/integration_tests/tests/indexer.rs b/integration_tests/tests/indexer.rs index 120a85c1..a7f27611 100644 --- a/integration_tests/tests/indexer.rs +++ b/integration_tests/tests/indexer.rs @@ -3,7 +3,12 @@ use integration_tests::TestContext; use log::info; use tokio::test; +#[ignore = "needs complicated setup"] #[test] +/// To run this test properly, you need nomos node running in the background. +/// For instructions in building nomos node, refer to [this](https://github.com/logos-blockchain/logos-blockchain?tab=readme-ov-file#running-a-logos-blockchain-node). +/// +/// Recommended to run node locally from build binary. async fn indexer_run_local_node() -> Result<()> { let ctx = TestContext::new_bedrock_local_attached().await?; @@ -15,12 +20,14 @@ async fn indexer_run_local_node() -> Result<()> { .sequencer_client() .get_last_seen_l2_block_at_indexer() .await + .unwrap() + .last_block .unwrap(); // Checking, that some blocks are landed on bedrock - assert!(gen_id.last_block > 0); + assert!(gen_id > 0); - info!("Last seen L2 block at indexer is {}", gen_id.last_block); + info!("Last seen L2 block at indexer is {gen_id}"); Ok(()) } diff --git a/sequencer_core/src/lib.rs b/sequencer_core/src/lib.rs index ed2af491..ee658742 100644 --- a/sequencer_core/src/lib.rs +++ b/sequencer_core/src/lib.rs @@ -9,7 +9,7 @@ use common::{ transaction::{EncodedTransaction, NSSATransaction}, }; use config::SequencerConfig; -use indexer::message::IndexerToSequencerMessage; +use indexer::message::Message; use log::warn; use mempool::{MemPool, MemPoolHandle}; use serde::{Deserialize, Serialize}; @@ -27,9 +27,8 @@ pub struct SequencerCore { mempool: MemPool, sequencer_config: SequencerConfig, chain_height: u64, - // No logic here for now - #[allow(unused)] - receiver: Option>, + #[expect(unused, reason = "No logic here for now")] + receiver: Option>, block_settlement_client: Option, } @@ -51,7 +50,7 @@ impl SequencerCore { /// Start Sequencer from configuration and construct transaction sender pub fn start_from_config( config: SequencerConfig, - receiver: Option>, + receiver: Option>, ) -> (Self, MemPoolHandle) { let hashable_data = HashableBlockData { block_id: config.genesis_id, diff --git a/sequencer_rpc/src/process.rs b/sequencer_rpc/src/process.rs index 797cbe52..99407215 100644 --- a/sequencer_rpc/src/process.rs +++ b/sequencer_rpc/src/process.rs @@ -16,8 +16,8 @@ use common::{ GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse, GetGenesisIdRequest, GetGenesisIdResponse, GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse, - GetLastSeenL2BlockAtIndexerRequest, GetProgramIdsRequest, GetProgramIdsResponse, - GetProofForCommitmentRequest, GetProofForCommitmentResponse, + GetLastSeenL2BlockAtIndexerRequest, GetLastSeenL2BlockResponse, GetProgramIdsRequest, + GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest, GetTransactionByHashResponse, HelloRequest, HelloResponse, SendTxRequest, SendTxResponse, }, @@ -325,13 +325,13 @@ impl JsonHandler { if let Some(indexer_state) = &self.indexer_state { let last_seen_block = indexer_state.latest_seen_block.read().await; - *last_seen_block + Some(*last_seen_block) } else { - 0 + None } }; - let response = GetLastBlockResponse { last_block }; + let response = GetLastSeenL2BlockResponse { last_block }; respond(response) } @@ -422,9 +422,9 @@ mod tests { user: "user".to_string(), password: None, indexer_config: IndexerConfig { - resubscribe_interval: 100, - start_delay: 1000, - limit_retry: 10, + resubscribe_interval_millis: 100, + start_delay_millis: 1000, + max_retries: 10, }, }), } diff --git a/sequencer_runner/src/lib.rs b/sequencer_runner/src/lib.rs index ffb95806..fead0e3f 100644 --- a/sequencer_runner/src/lib.rs +++ b/sequencer_runner/src/lib.rs @@ -134,8 +134,8 @@ pub async fn main_runner() -> Result<()> { main_loop_handle.await??; - if indexer_loop_handle.is_some() { - indexer_loop_handle.unwrap().await??; + if let Some(indexer_loop_handle) = indexer_loop_handle { + indexer_loop_handle.await??; } Ok(())