From e0729a17251fd03702128141197f3c696c59147a Mon Sep 17 00:00:00 2001 From: Daniil Polyakov Date: Fri, 30 Jan 2026 19:29:59 +0300 Subject: [PATCH] Integrate indexer client into sequencer --- Cargo.lock | 6 + indexer/core/src/lib.rs | 12 +- indexer/service/Cargo.toml | 1 + indexer/service/rpc/src/lib.rs | 4 +- indexer/service/src/main.rs | 7 +- indexer/service/src/mock_service.rs | 14 +- indexer/service/src/service.rs | 2 +- integration_tests/src/lib.rs | 66 ++----- integration_tests/tests/tps.rs | 1 + sequencer_core/Cargo.toml | 4 +- sequencer_core/src/config.rs | 5 +- sequencer_core/src/lib.rs | 21 +++ sequencer_rpc/src/process.rs | 9 +- sequencer_runner/Cargo.toml | 3 + sequencer_runner/src/lib.rs | 268 +++++++++++++++++++--------- wallet/src/config.rs | 2 +- 16 files changed, 270 insertions(+), 155 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d87af8af..844b7645 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3232,6 +3232,7 @@ dependencies = [ "indexer_service_rpc", "jsonrpsee", "log", + "serde_json", "tokio", "tokio-util", ] @@ -6446,6 +6447,7 @@ dependencies = [ "chrono", "common", "futures", + "jsonrpsee", "log", "logos-blockchain-core", "logos-blockchain-key-management-system-service", @@ -6459,6 +6461,7 @@ dependencies = [ "storage", "tempfile", "tokio", + "url", ] [[package]] @@ -6495,6 +6498,9 @@ dependencies = [ "clap 4.5.53", "common", "env_logger", + "futures", + "indexer_service_protocol", + "indexer_service_rpc", "log", "sequencer_core", "sequencer_rpc", diff --git a/indexer/core/src/lib.rs b/indexer/core/src/lib.rs index e88f2057..3508a257 100644 --- a/indexer/core/src/lib.rs +++ b/indexer/core/src/lib.rs @@ -69,14 +69,14 @@ impl IndexerCore { } } - // Sending data into sequencer, may need to be expanded. - let message = Message::L2BlockFinalized { - l2_block_height: l2_block.block_id, - }; + // // Sending data into sequencer, may need to be expanded. + // let message = Message::L2BlockFinalized { + // l2_block_height: l2_block.block_id, + // }; - let status = self.send_message_to_sequencer(message.clone()).await?; + // let status = self.send_message_to_sequencer(message.clone()).await?; - info!("Sent message {message:#?} to sequencer; status {status:#?}"); + // info!("Sent message {message:#?} to sequencer; status {status:#?}"); } } } diff --git a/indexer/service/Cargo.toml b/indexer/service/Cargo.toml index a959fcb1..82639982 100644 --- a/indexer/service/Cargo.toml +++ b/indexer/service/Cargo.toml @@ -15,6 +15,7 @@ tokio-util.workspace = true env_logger.workspace = true log.workspace = true jsonrpsee.workspace = true +serde_json.workspace = true async-trait = "0.1.89" [features] diff --git a/indexer/service/rpc/src/lib.rs b/indexer/service/rpc/src/lib.rs index def20ad5..f0fd519f 100644 --- a/indexer/service/rpc/src/lib.rs +++ b/indexer/service/rpc/src/lib.rs @@ -23,8 +23,8 @@ pub trait Rpc { Ok(serde_json::to_value(block_schema).expect("Schema serialization should not fail")) } - #[subscription(name = "subscribeToBlocks", item = Vec)] - async fn subscribe_to_blocks(&self, from: BlockId) -> SubscriptionResult; + #[subscription(name = "subscribeToFinalizedBlocks", item = Block)] + async fn subscribe_to_finalized_blocks(&self, from: BlockId) -> SubscriptionResult; #[method(name = "getBlockById")] async fn get_block_by_id(&self, block_id: BlockId) -> Result; diff --git a/indexer/service/src/main.rs b/indexer/service/src/main.rs index f18e540b..aaeebacd 100644 --- a/indexer/service/src/main.rs +++ b/indexer/service/src/main.rs @@ -2,6 +2,7 @@ use std::{net::SocketAddr, path::PathBuf}; use anyhow::{Context as _, Result}; use clap::Parser; +use indexer_core::config::IndexerConfig; use indexer_service_rpc::RpcServer as _; use jsonrpsee::server::Server; use log::{error, info}; @@ -10,7 +11,7 @@ use tokio_util::sync::CancellationToken; #[derive(Debug, Parser)] #[clap(version)] struct Args { - #[clap(rename = "config")] + #[clap(name = "config")] config_path: PathBuf, #[clap(short, long, default_value = "8779")] port: u16, @@ -42,7 +43,9 @@ async fn main() -> Result<()> { } async fn run_server(config_path: PathBuf, port: u16) -> Result { - let config = IndexerServiceConfig::from_file(&config_path)?; + let config = IndexerConfig::from_path(&config_path)?; + #[cfg(feature = "mock-responses")] + let _ = config; let server = Server::builder() .build(SocketAddr::from(([0, 0, 0, 0], port))) diff --git a/indexer/service/src/mock_service.rs b/indexer/service/src/mock_service.rs index 06a9780a..e6edd172 100644 --- a/indexer/service/src/mock_service.rs +++ b/indexer/service/src/mock_service.rs @@ -163,13 +163,17 @@ impl MockIndexerService { #[async_trait::async_trait] impl indexer_service_rpc::RpcServer for MockIndexerService { - async fn subscribe_to_blocks( + async fn subscribe_to_finalized_blocks( &self, - _subscription_sink: jsonrpsee::PendingSubscriptionSink, - _from: BlockId, + subscription_sink: jsonrpsee::PendingSubscriptionSink, + from: BlockId, ) -> SubscriptionResult { - // Subscription not implemented for mock service - Err("Subscriptions not supported in mock service".into()) + let sink = subscription_sink.accept().await?; + for block in self.blocks.iter().filter(|b| b.header.block_id >= from) { + let json = serde_json::value::to_raw_value(block).unwrap(); + sink.send(json).await?; + } + Ok(()) } async fn get_block_by_id(&self, block_id: BlockId) -> Result { diff --git a/indexer/service/src/service.rs b/indexer/service/src/service.rs index 49565ad3..643da044 100644 --- a/indexer/service/src/service.rs +++ b/indexer/service/src/service.rs @@ -17,7 +17,7 @@ impl IndexerService { #[async_trait::async_trait] impl indexer_service_rpc::RpcServer for IndexerService { - async fn subscribe_to_blocks( + async fn subscribe_to_finalized_blocks( &self, _subscription_sink: jsonrpsee::PendingSubscriptionSink, _from: BlockId, diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index 21e1ca81..0fa39129 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -2,19 +2,18 @@ use std::{net::SocketAddr, path::PathBuf, sync::LazyLock}; -use actix_web::dev::ServerHandle; use anyhow::{Context, Result}; use base64::{Engine, engine::general_purpose::STANDARD as BASE64}; use common::{ sequencer_client::SequencerClient, transaction::{EncodedTransaction, NSSATransaction}, }; -use futures::FutureExt as _; use indexer_core::{IndexerCore, config::IndexerConfig}; use log::debug; use nssa::PrivacyPreservingTransaction; use nssa_core::Commitment; use sequencer_core::config::SequencerConfig; +use sequencer_runner::SequencerHandle; use tempfile::TempDir; use tokio::task::JoinHandle; use url::Url; @@ -38,9 +37,7 @@ static LOGGER: LazyLock<()> = LazyLock::new(env_logger::init); /// It's memory and logically safe to create multiple instances of this struct in parallel tests, /// as each instance uses its own temporary directories for sequencer and wallet data. pub struct TestContext { - sequencer_server_handle: ServerHandle, - sequencer_loop_handle: JoinHandle>, - sequencer_retry_pending_blocks_handle: JoinHandle>, + _sequencer_handle: SequencerHandle, indexer_loop_handle: Option>>, sequencer_client: SequencerClient, wallet: WalletCore, @@ -95,15 +92,10 @@ impl TestContext { debug!("Test context setup"); - let ( - sequencer_server_handle, - sequencer_addr, - sequencer_loop_handle, - sequencer_retry_pending_blocks_handle, - temp_sequencer_dir, - ) = Self::setup_sequencer(sequencer_config) - .await - .context("Failed to setup sequencer")?; + let (_sequencer_handle, sequencer_addr, temp_sequencer_dir) = + Self::setup_sequencer(sequencer_config) + .await + .context("Failed to setup sequencer")?; // Convert 0.0.0.0 to 127.0.0.1 for client connections // When binding to port 0, the server binds to 0.0.0.0: @@ -123,10 +115,7 @@ impl TestContext { ) .context("Failed to create sequencer client")?; - if let Some(mut indexer_config) = indexer_config { - indexer_config.sequencer_client_config.addr = - Url::parse(&sequencer_addr).context("Failed to parse sequencer addr")?; - + if let Some(indexer_config) = indexer_config { let indexer_core = IndexerCore::new(indexer_config)?; let indexer_loop_handle = Some(tokio::spawn(async move { @@ -134,9 +123,7 @@ impl TestContext { })); Ok(Self { - sequencer_server_handle, - sequencer_loop_handle, - sequencer_retry_pending_blocks_handle, + _sequencer_handle, indexer_loop_handle, sequencer_client, wallet, @@ -145,9 +132,7 @@ impl TestContext { }) } else { Ok(Self { - sequencer_server_handle, - sequencer_loop_handle, - sequencer_retry_pending_blocks_handle, + _sequencer_handle, indexer_loop_handle: None, sequencer_client, wallet, @@ -159,13 +144,7 @@ impl TestContext { async fn setup_sequencer( mut config: SequencerConfig, - ) -> Result<( - ServerHandle, - SocketAddr, - JoinHandle>, - JoinHandle>, - TempDir, - )> { + ) -> Result<(SequencerHandle, SocketAddr, TempDir)> { let temp_sequencer_dir = tempfile::tempdir().context("Failed to create temp dir for sequencer home")?; @@ -177,20 +156,10 @@ impl TestContext { // Setting port to 0 lets the OS choose a free port for us config.port = 0; - let ( - sequencer_server_handle, - sequencer_addr, - sequencer_loop_handle, - sequencer_retry_pending_blocks_handle, - ) = sequencer_runner::startup_sequencer(config).await?; + let (sequencer_handle, sequencer_addr) = + sequencer_runner::startup_sequencer(config).await?; - Ok(( - sequencer_server_handle, - sequencer_addr, - sequencer_loop_handle, - sequencer_retry_pending_blocks_handle, - temp_sequencer_dir, - )) + Ok((sequencer_handle, sequencer_addr, temp_sequencer_dir)) } async fn setup_wallet(sequencer_addr: String) -> Result<(WalletCore, TempDir)> { @@ -247,9 +216,7 @@ impl Drop for TestContext { debug!("Test context cleanup"); let Self { - sequencer_server_handle, - sequencer_loop_handle, - sequencer_retry_pending_blocks_handle, + _sequencer_handle, indexer_loop_handle, sequencer_client: _, wallet: _, @@ -257,14 +224,9 @@ impl Drop for TestContext { _temp_wallet_dir, } = self; - sequencer_loop_handle.abort(); - sequencer_retry_pending_blocks_handle.abort(); if let Some(indexer_loop_handle) = indexer_loop_handle { indexer_loop_handle.abort(); } - - // Can't wait here as Drop can't be async, but anyway stop signal should be sent - sequencer_server_handle.stop(true).now_or_never(); } } diff --git a/integration_tests/tests/tps.rs b/integration_tests/tests/tps.rs index 5fc09c4c..cb571726 100644 --- a/integration_tests/tests/tps.rs +++ b/integration_tests/tests/tps.rs @@ -191,6 +191,7 @@ impl TpsTestManager { signing_key: [37; 32], bedrock_config: None, retry_pending_blocks_timeout_millis: 1000 * 60 * 4, + indexer_rpc_url: "http://localhost:8779".parse().unwrap(), } } } diff --git a/sequencer_core/Cargo.toml b/sequencer_core/Cargo.toml index 528fa16f..dfe2c27e 100644 --- a/sequencer_core/Cargo.toml +++ b/sequencer_core/Cargo.toml @@ -9,6 +9,7 @@ nssa_core.workspace = true common.workspace = true storage.workspace = true mempool.workspace = true +bedrock_client.workspace = true base58.workspace = true anyhow.workspace = true @@ -18,12 +19,13 @@ tempfile.workspace = true chrono.workspace = true log.workspace = true tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } -bedrock_client.workspace = true logos-blockchain-key-management-system-service.workspace = true logos-blockchain-core.workspace = true rand.workspace = true reqwest.workspace = true borsh.workspace = true +url.workspace = true +jsonrpsee = { workspace = true, features = ["http-client"] } [features] default = [] diff --git a/sequencer_core/src/config.rs b/sequencer_core/src/config.rs index b0dda3e6..26f5f97e 100644 --- a/sequencer_core/src/config.rs +++ b/sequencer_core/src/config.rs @@ -8,6 +8,7 @@ use anyhow::Result; use common::config::BasicAuth; use logos_blockchain_core::mantle::ops::channel::ChannelId; use serde::{Deserialize, Serialize}; +use url::Url; #[derive(Debug, Serialize, Deserialize, Clone)] /// Helperstruct for account serialization @@ -53,6 +54,8 @@ pub struct SequencerConfig { pub signing_key: [u8; 32], /// Bedrock configuration options pub bedrock_config: Option, + /// Indexer RPC URL + pub indexer_rpc_url: Url, } #[derive(Clone, Serialize, Deserialize)] @@ -60,7 +63,7 @@ pub struct BedrockConfig { /// Bedrock channel ID pub channel_id: ChannelId, /// Bedrock Url - pub node_url: String, + pub node_url: Url, /// Bedrock auth pub auth: Option, } diff --git a/sequencer_core/src/lib.rs b/sequencer_core/src/lib.rs index efddcd7e..f1b9dc80 100644 --- a/sequencer_core/src/lib.rs +++ b/sequencer_core/src/lib.rs @@ -19,6 +19,8 @@ mod block_settlement_client; pub mod block_store; pub mod config; +type IndexerClient = jsonrpsee::http_client::HttpClient; + pub struct SequencerCore { state: nssa::V02State, store: SequencerStore, @@ -26,6 +28,7 @@ pub struct SequencerCore { sequencer_config: SequencerConfig, chain_height: u64, block_settlement_client: Option, + indexer_client: IndexerClient, last_bedrock_msg_id: MantleMsgId, } @@ -113,6 +116,10 @@ impl SequencerCore { .expect("Block settlement client should be constructible") }); + let indexer_client = jsonrpsee::http_client::HttpClientBuilder::default() + .build(config.indexer_rpc_url.clone()) + .expect("Failed to create Indexer client"); + let sequencer_core = Self { state, store, @@ -120,6 +127,7 @@ impl SequencerCore { chain_height: config.genesis_id, sequencer_config: config, block_settlement_client, + indexer_client, last_bedrock_msg_id: channel_genesis_msg_id, }; @@ -252,6 +260,14 @@ impl SequencerCore { } } + pub fn first_pending_block_id(&self) -> Result> { + Ok(self + .get_pending_blocks()? + .iter() + .map(|block| block.header.block_id) + .min()) + } + /// Returns the list of stored pending blocks. pub fn get_pending_blocks(&self) -> Result> { Ok(self @@ -266,6 +282,10 @@ impl SequencerCore { pub fn block_settlement_client(&self) -> Option { self.block_settlement_client.clone() } + + pub fn indexer_client(&self) -> &IndexerClient { + &self.indexer_client + } } // TODO: Introduce type-safe wrapper around checked transaction, e.g. AuthenticatedTransaction @@ -329,6 +349,7 @@ mod tests { signing_key: *sequencer_sign_key_for_testing().value(), bedrock_config: None, retry_pending_blocks_timeout_millis: 1000 * 60 * 4, + indexer_rpc_url: "http://localhost:8779".parse().unwrap(), } } diff --git a/sequencer_rpc/src/process.rs b/sequencer_rpc/src/process.rs index 6d149448..e2d6052a 100644 --- a/sequencer_rpc/src/process.rs +++ b/sequencer_rpc/src/process.rs @@ -18,8 +18,8 @@ use common::{ GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, GetProofForCommitmentResponse, GetTransactionByHashRequest, - GetTransactionByHashResponse, HelloRequest, HelloResponse, PostIndexerMessageRequest, - PostIndexerMessageResponse, SendTxRequest, SendTxResponse, + GetTransactionByHashResponse, HelloRequest, HelloResponse, SendTxRequest, + SendTxResponse, }, }, transaction::{EncodedTransaction, NSSATransaction}, @@ -341,7 +341,7 @@ mod tests { use base58::ToBase58; use base64::{Engine, engine::general_purpose}; use common::{ - sequencer_client::BasicAuth, test_utils::sequencer_sign_key_for_testing, + config::BasicAuth, test_utils::sequencer_sign_key_for_testing, transaction::EncodedTransaction, }; use sequencer_core::{ @@ -394,12 +394,13 @@ mod tests { retry_pending_blocks_timeout_millis: 1000 * 60 * 4, bedrock_config: Some(BedrockConfig { channel_id: [42; 32].into(), - node_url: "http://localhost:8080".to_string(), + node_url: "http://localhost:8080".parse().unwrap(), auth: Some(BasicAuth { username: "user".to_string(), password: None, }), }), + indexer_rpc_url: "http://localhost:8779".parse().unwrap(), } } diff --git a/sequencer_runner/Cargo.toml b/sequencer_runner/Cargo.toml index 55f56dec..f840317c 100644 --- a/sequencer_runner/Cargo.toml +++ b/sequencer_runner/Cargo.toml @@ -7,6 +7,8 @@ edition = "2024" common.workspace = true sequencer_core = { workspace = true, features = ["testnet"] } sequencer_rpc.workspace = true +indexer_service_protocol.workspace = true +indexer_service_rpc = { workspace = true, features = ["client"] } clap = { workspace = true, features = ["derive", "env"] } anyhow.workspace = true @@ -15,3 +17,4 @@ log.workspace = true actix.workspace = true actix-web.workspace = true tokio.workspace = true +futures.workspace = true diff --git a/sequencer_runner/src/lib.rs b/sequencer_runner/src/lib.rs index 8dbea525..ef381da1 100644 --- a/sequencer_runner/src/lib.rs +++ b/sequencer_runner/src/lib.rs @@ -1,10 +1,11 @@ -use std::{net::SocketAddr, path::PathBuf, sync::Arc}; +use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; use actix_web::dev::ServerHandle; -use anyhow::Result; +use anyhow::{Context as _, Result}; use clap::Parser; use common::rpc_primitives::RpcConfig; -use log::{info, warn}; +use futures::FutureExt as _; +use log::{debug, error, info, warn}; use sequencer_core::{SequencerCore, config::SequencerConfig}; use sequencer_rpc::new_http_server; use tokio::{sync::Mutex, task::JoinHandle}; @@ -18,16 +19,76 @@ struct Args { home_dir: PathBuf, } +/// An enum that can never be instantiated, used to replace unstable `!` type. +#[derive(Debug)] +pub enum Never {} + +/// Handle to manage the sequencer and its tasks. +/// +/// Implements `Drop` to ensure all tasks are aborted and the HTTP server is stopped when dropped. +pub struct SequencerHandle { + http_server_handle: ServerHandle, + main_loop_handle: JoinHandle>, + retry_pending_blocks_loop_handle: JoinHandle>, + listen_for_bedrock_blocks_loop_handle: JoinHandle>, +} + +impl SequencerHandle { + /// Runs the sequencer indefinitely, monitoring its tasks. + /// + /// If no error occurs, this function will never return. + async fn run_forever(&mut self) -> Result { + let Self { + http_server_handle: _, + main_loop_handle, + retry_pending_blocks_loop_handle, + listen_for_bedrock_blocks_loop_handle, + } = self; + + tokio::select! { + res = main_loop_handle => { + res + .context("Main loop task panicked")? + .context("Main loop exited unexpectedly") + } + res = retry_pending_blocks_loop_handle => { + res + .context("Retry pending blocks loop task panicked")? + .context("Retry pending blocks loop exited unexpectedly") + } + res = listen_for_bedrock_blocks_loop_handle => { + res + .context("Listen for bedrock blocks loop task panicked")? + .context("Listen for bedrock blocks loop exited unexpectedly") + } + } + } +} + +impl Drop for SequencerHandle { + fn drop(&mut self) { + let Self { + http_server_handle, + main_loop_handle, + retry_pending_blocks_loop_handle, + listen_for_bedrock_blocks_loop_handle, + } = self; + + main_loop_handle.abort(); + retry_pending_blocks_loop_handle.abort(); + listen_for_bedrock_blocks_loop_handle.abort(); + + // Can't wait here as Drop can't be async, but anyway stop signal should be sent + http_server_handle.stop(true).now_or_never(); + } +} + pub async fn startup_sequencer( app_config: SequencerConfig, -) -> Result<( - ServerHandle, - SocketAddr, - JoinHandle>, - JoinHandle>, -)> { - let block_timeout = app_config.block_create_timeout_millis; - let retry_pending_blocks_timeout = app_config.retry_pending_blocks_timeout_millis; +) -> Result<(SequencerHandle, SocketAddr)> { + let block_timeout = Duration::from_millis(app_config.block_create_timeout_millis); + let retry_pending_blocks_timeout = + Duration::from_millis(app_config.retry_pending_blocks_timeout_millis); let port = app_config.port; let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config); @@ -45,69 +106,128 @@ pub async fn startup_sequencer( let http_server_handle = http_server.handle(); tokio::spawn(http_server); - info!("Starting pending block retry loop"); - let seq_core_wrapped_for_block_retry = seq_core_wrapped.clone(); - let retry_pending_blocks_handle = tokio::spawn(async move { - loop { - tokio::time::sleep(std::time::Duration::from_millis( - retry_pending_blocks_timeout, - )) - .await; - - let (pending_blocks, block_settlement_client) = { - let sequencer_core = seq_core_wrapped_for_block_retry.lock().await; - let client = sequencer_core.block_settlement_client(); - let pending_blocks = sequencer_core - .get_pending_blocks() - .expect("Sequencer should be able to retrieve pending blocks"); - (pending_blocks, client) - }; - - let Some(client) = block_settlement_client else { - continue; - }; - - info!("Resubmitting {} pending blocks", pending_blocks.len()); - for block in &pending_blocks { - if let Err(e) = client.submit_block_to_bedrock(block).await { - warn!( - "Failed to resubmit block with id {} with error {}", - block.header.block_id, e - ); - } - } - } - }); - info!("Starting main sequencer loop"); - let main_loop_handle = tokio::spawn(async move { - loop { - tokio::time::sleep(std::time::Duration::from_millis(block_timeout)).await; + let main_loop_handle = tokio::spawn(main_loop(Arc::clone(&seq_core_wrapped), block_timeout)); - info!("Collecting transactions from mempool, block creation"); + info!("Starting pending block retry loop"); + let retry_pending_blocks_loop_handle = tokio::spawn(retry_pending_blocks_loop( + Arc::clone(&seq_core_wrapped), + retry_pending_blocks_timeout, + )); - let id = { - let mut state = seq_core_wrapped.lock().await; - - state - .produce_new_block_and_post_to_settlement_layer() - .await? - }; - - info!("Block with id {id} created"); - - info!("Waiting for new transactions"); - } - }); + info!("Starting bedrock block listening loop"); + let listen_for_bedrock_blocks_loop_handle = + tokio::spawn(listen_for_bedrock_blocks_loop(seq_core_wrapped)); Ok(( - http_server_handle, + SequencerHandle { + http_server_handle, + main_loop_handle, + retry_pending_blocks_loop_handle, + listen_for_bedrock_blocks_loop_handle, + }, addr, - main_loop_handle, - retry_pending_blocks_handle, )) } +async fn main_loop(seq_core: Arc>, block_timeout: Duration) -> Result { + loop { + tokio::time::sleep(block_timeout).await; + + info!("Collecting transactions from mempool, block creation"); + + let id = { + let mut state = seq_core.lock().await; + + state + .produce_new_block_and_post_to_settlement_layer() + .await? + }; + + info!("Block with id {id} created"); + + info!("Waiting for new transactions"); + } +} + +async fn retry_pending_blocks_loop( + seq_core: Arc>, + retry_pending_blocks_timeout: Duration, +) -> Result { + loop { + tokio::time::sleep(retry_pending_blocks_timeout).await; + + let (pending_blocks, block_settlement_client) = { + let sequencer_core = seq_core.lock().await; + let client = sequencer_core.block_settlement_client(); + let pending_blocks = sequencer_core + .get_pending_blocks() + .expect("Sequencer should be able to retrieve pending blocks"); + (pending_blocks, client) + }; + + let Some(client) = block_settlement_client else { + continue; + }; + + info!("Resubmitting {} pending blocks", pending_blocks.len()); + for block in &pending_blocks { + if let Err(e) = client.submit_block_to_bedrock(block).await { + warn!( + "Failed to resubmit block with id {} with error {}", + block.header.block_id, e + ); + } + } + } +} + +async fn listen_for_bedrock_blocks_loop(seq_core: Arc>) -> Result { + use indexer_service_rpc::RpcClient as _; + + let indexer_client = seq_core.lock().await.indexer_client().clone(); + + loop { + let first_pending_block_id = { + let sequencer_core = seq_core.lock().await; + + sequencer_core + .first_pending_block_id() + .context("Failed to get first pending block ID")? + .unwrap_or(sequencer_core.chain_height()) + }; + + info!("Subscribing to blocks from ID {first_pending_block_id}"); + let mut subscription = indexer_client + .subscribe_to_finalized_blocks(first_pending_block_id) + .await + .with_context(|| { + format!("Failed to subscribe to blocks from {first_pending_block_id}") + })?; + + while let Some(block) = subscription.next().await { + let block = block.context("Failed to get next block from subscription")?; + let block_id = block.header.block_id; + + info!("Received new L2 block with ID {block_id}"); + debug!("Block data: {block:#?}"); + + seq_core + .lock() + .await + .clean_finalized_blocks_from_db(block_id) + .with_context(|| { + format!("Failed to clean finalized blocks from DB for block ID {block_id}") + })?; + } + + warn!( + "Block subscription closed unexpectedly, reason: {:?}", + subscription.close_reason() + ); + } +} + pub async fn main_runner() -> Result<()> { env_logger::init(); @@ -125,24 +245,12 @@ pub async fn main_runner() -> Result<()> { } // ToDo: Add restart on failures - let (_, _, main_loop_handle, retry_loop_handle) = startup_sequencer(app_config).await?; + let (mut sequencer_handle, _addr) = startup_sequencer(app_config).await?; info!("Sequencer running. Monitoring concurrent tasks..."); - tokio::select! { - res = main_loop_handle => { - match res { - Ok(inner_res) => warn!("Main loop exited unexpectedly: {:?}", inner_res), - Err(e) => warn!("Main loop task panicked: {:?}", e), - } - } - res = retry_loop_handle => { - match res { - Ok(inner_res) => warn!("Retry loop exited unexpectedly: {:?}", inner_res), - Err(e) => warn!("Retry loop task panicked: {:?}", e), - } - } - } + let Err(err) = sequencer_handle.run_forever().await; + error!("Sequencer failed: {err:?}"); info!("Shutting down sequencer..."); diff --git a/wallet/src/config.rs b/wallet/src/config.rs index 2784267b..3a5d6e0c 100644 --- a/wallet/src/config.rs +++ b/wallet/src/config.rs @@ -4,7 +4,7 @@ use std::{ }; use anyhow::{Context as _, Result}; -use common::common::BasicAuth; +use common::config::BasicAuth; use key_protocol::key_management::{ KeyChain, key_tree::{