diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index c794b713..78012c6e 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use anyhow::Result; use bedrock_client::BedrockClient; -use common::block::{Block, }; +use common::block::Block; use futures::StreamExt; use log::info; use logos_blockchain_core::mantle::{ @@ -38,9 +38,7 @@ impl IndexerCore { }) } - pub async fn subscribe_parse_block_stream( - &self, - ) -> impl futures::Stream> { + pub async fn subscribe_parse_block_stream(&self) -> impl futures::Stream> { async_stream::stream! { loop { let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?); diff --git a/indexer/service/src/main.rs b/indexer/service/src/main.rs index aaeebacd..e0ece691 100644 --- a/indexer/service/src/main.rs +++ b/indexer/service/src/main.rs @@ -63,7 +63,7 @@ async fn run_server(config_path: PathBuf, port: u16) -> Result; pub struct SequencerCore { state: nssa::V02State, @@ -52,7 +52,9 @@ impl SequencerCore { /// assumed to represent the correct latest state consistent with Bedrock-finalized data. /// If no database is found, the sequencer performs a fresh start from genesis, /// initializing its state with the accounts defined in the configuration file. - pub fn start_from_config(config: SequencerConfig) -> (Self, MemPoolHandle) { + pub async fn start_from_config( + config: SequencerConfig, + ) -> (Self, MemPoolHandle) { let hashable_data = HashableBlockData { block_id: config.genesis_id, transactions: vec![], @@ -116,9 +118,12 @@ impl SequencerCore { .expect("Block settlement client should be constructible") }); - let indexer_client = jsonrpsee::http_client::HttpClientBuilder::default() - .build(config.indexer_rpc_url.clone()) - .expect("Failed to create Indexer client"); + let indexer_client = Arc::new( + jsonrpsee::ws_client::WsClientBuilder::default() + .build(config.indexer_rpc_url.clone()) + .await + .expect("Failed to create Indexer client"), + ); let sequencer_core = Self { state, @@ -275,8 +280,8 @@ impl SequencerCore { self.block_settlement_client.clone() } - pub fn indexer_client(&self) -> &IndexerClient { - &self.indexer_client + pub fn indexer_client(&self) -> IndexerClient { + Arc::clone(&self.indexer_client) } } @@ -387,7 +392,7 @@ mod tests { async fn common_setup_with_config( config: SequencerConfig, ) -> (SequencerCore, MemPoolHandle) { - let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config); + let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config).await; let tx = common::test_utils::produce_dummy_empty_transaction(); mempool_handle.push(tx).await.unwrap(); @@ -399,10 +404,10 @@ mod tests { (sequencer, mempool_handle) } - #[test] - fn test_start_from_config() { + #[tokio::test] + async fn test_start_from_config() { let config = setup_sequencer_config(); - let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()); + let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()).await; assert_eq!(sequencer.chain_height, config.genesis_id); assert_eq!(sequencer.sequencer_config.max_num_tx_in_block, 10); @@ -436,8 +441,8 @@ mod tests { assert_eq!(20000, balance_acc_2); } - #[test] - fn test_start_different_intial_accounts_balances() { + #[tokio::test] + async fn test_start_different_intial_accounts_balances() { let acc1_account_id: Vec = vec![ 27, 132, 197, 86, 123, 18, 100, 64, 153, 93, 62, 213, 170, 186, 5, 101, 215, 30, 24, 52, 96, 72, 25, 255, 156, 23, 245, 233, 213, 221, 7, 143, @@ -461,7 +466,7 @@ mod tests { let initial_accounts = vec![initial_acc1, initial_acc2]; let config = setup_sequencer_config_variable_initial_accounts(initial_accounts); - let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()); + let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()).await; let acc1_account_id = config.initial_accounts[0] .account_id @@ -788,7 +793,8 @@ mod tests { // from `acc_1` to `acc_2`. The block created with that transaction will be kept stored in // the temporary directory for the block storage of this test. { - let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config.clone()); + let (mut sequencer, mempool_handle) = + SequencerCore::start_from_config(config.clone()).await; let signing_key = PrivateKey::try_new([1; 32]).unwrap(); let tx = common::test_utils::create_transaction_native_token_transfer( @@ -810,7 +816,7 @@ mod tests { // Instantiating a new sequencer from the same config. This should load the existing block // with the above transaction and update the state to reflect that. - let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()); + let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()).await; let balance_acc_1 = sequencer.state.get_account_by_id(&acc1_account_id).balance; let balance_acc_2 = sequencer.state.get_account_by_id(&acc2_account_id).balance; @@ -825,10 +831,10 @@ mod tests { ); } - #[test] - fn test_get_pending_blocks() { + #[tokio::test] + async fn test_get_pending_blocks() { let config = setup_sequencer_config(); - let (mut sequencer, _mempool_handle) = SequencerCore::start_from_config(config); + let (mut sequencer, _mempool_handle) = SequencerCore::start_from_config(config).await; sequencer .produce_new_block_with_mempool_transactions() .unwrap(); @@ -841,10 +847,10 @@ mod tests { assert_eq!(sequencer.get_pending_blocks().unwrap().len(), 4); } - #[test] - fn test_delete_blocks() { + #[tokio::test] + async fn test_delete_blocks() { let config = setup_sequencer_config(); - let (mut sequencer, _mempool_handle) = SequencerCore::start_from_config(config); + let (mut sequencer, _mempool_handle) = SequencerCore::start_from_config(config).await; sequencer .produce_new_block_with_mempool_transactions() .unwrap(); diff --git a/sequencer_rpc/src/process.rs b/sequencer_rpc/src/process.rs index e2d6052a..0702ac1f 100644 --- a/sequencer_rpc/src/process.rs +++ b/sequencer_rpc/src/process.rs @@ -407,7 +407,7 @@ mod tests { async fn components_for_tests() -> (JsonHandler, Vec, EncodedTransaction) { let config = sequencer_config_for_tests(); - let (mut sequencer_core, mempool_handle) = SequencerCore::start_from_config(config); + let (mut sequencer_core, mempool_handle) = SequencerCore::start_from_config(config).await; let initial_accounts = sequencer_core.sequencer_config().initial_accounts.clone(); let signing_key = nssa::PrivateKey::try_new([1; 32]).unwrap(); diff --git a/sequencer_runner/src/lib.rs b/sequencer_runner/src/lib.rs index d62d0ca3..17c984f5 100644 --- a/sequencer_runner/src/lib.rs +++ b/sequencer_runner/src/lib.rs @@ -91,7 +91,7 @@ pub async fn startup_sequencer( Duration::from_millis(app_config.retry_pending_blocks_timeout_millis); let port = app_config.port; - let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config); + let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config).await; info!("Sequencer core set up"); @@ -185,7 +185,7 @@ async fn retry_pending_blocks_loop( async fn listen_for_bedrock_blocks_loop(seq_core: Arc>) -> Result { use indexer_service_rpc::RpcClient as _; - let indexer_client = seq_core.lock().await.indexer_client().clone(); + let indexer_client = seq_core.lock().await.indexer_client(); loop { // TODO: Subscribe from the first pending block ID?