use std::{net::SocketAddr, sync::Arc, time::Duration}; use anyhow::{Context as _, Result, anyhow}; use bytesize::ByteSize; use common::transaction::NSSATransaction; use futures::never::Never; use jsonrpsee::server::ServerHandle; #[cfg(not(feature = "standalone"))] use log::warn; use log::{error, info}; use mempool::MemPoolHandle; #[cfg(feature = "standalone")] use sequencer_core::SequencerCoreWithMockClients as SequencerCore; pub use sequencer_core::config::*; #[cfg(not(feature = "standalone"))] use sequencer_core::{SequencerCore, block_settlement_client::BlockSettlementClientTrait as _}; use sequencer_service_rpc::RpcServer as _; use tokio::{sync::Mutex, task::JoinHandle}; pub mod service; const REQUEST_BODY_MAX_SIZE: ByteSize = ByteSize::mib(10); /// Handle to manage the sequencer and its tasks. /// /// Implements `Drop` to ensure all tasks are aborted and the RPC server is stopped when dropped. pub struct SequencerHandle { addr: SocketAddr, /// Option because of `Drop` which forbids to simply move out of `self` in `stopped()`. server_handle: Option, main_loop_handle: JoinHandle>, retry_pending_blocks_loop_handle: JoinHandle>, listen_for_bedrock_blocks_loop_handle: JoinHandle>, } impl SequencerHandle { const fn new( addr: SocketAddr, server_handle: ServerHandle, main_loop_handle: JoinHandle>, retry_pending_blocks_loop_handle: JoinHandle>, listen_for_bedrock_blocks_loop_handle: JoinHandle>, ) -> Self { Self { addr, server_handle: Some(server_handle), main_loop_handle, retry_pending_blocks_loop_handle, listen_for_bedrock_blocks_loop_handle, } } /// Wait for any of the sequencer tasks to fail and return the error. #[expect( clippy::integer_division_remainder_used, reason = "Generated by select! macro, can't be easily rewritten to avoid this lint" )] pub async fn failed(mut self) -> Result { let Self { addr: _, server_handle, main_loop_handle, retry_pending_blocks_loop_handle, listen_for_bedrock_blocks_loop_handle, } = &mut self; let server_handle = server_handle.take().expect("Server handle is set"); tokio::select! { () = server_handle.stopped() => { Err(anyhow!("RPC Server stopped")) } res = main_loop_handle => { res .context("Main loop task panicked")? .context("Main loop exited unexpectedly") } res = retry_pending_blocks_loop_handle => { res .context("Retry pending blocks loop task panicked")? .context("Retry pending blocks loop exited unexpectedly") } res = listen_for_bedrock_blocks_loop_handle => { res .context("Listen for bedrock blocks loop task panicked")? .context("Listen for bedrock blocks loop exited unexpectedly") } } } /// Check if all Sequencer tasks are still running. /// /// Return `false` if any of the tasks has failed and `true` otherwise. /// Error of the failed task can be retrieved by awaiting on [`Self::failed()`]. #[must_use] pub fn is_healthy(&self) -> bool { let Self { addr: _, server_handle, main_loop_handle, retry_pending_blocks_loop_handle, listen_for_bedrock_blocks_loop_handle, } = self; let stopped = server_handle.as_ref().is_none_or(ServerHandle::is_stopped) || main_loop_handle.is_finished() || retry_pending_blocks_loop_handle.is_finished() || listen_for_bedrock_blocks_loop_handle.is_finished(); !stopped } #[must_use] pub const fn addr(&self) -> SocketAddr { self.addr } } impl Drop for SequencerHandle { fn drop(&mut self) { let Self { addr: _, server_handle, main_loop_handle, retry_pending_blocks_loop_handle, listen_for_bedrock_blocks_loop_handle, } = self; main_loop_handle.abort(); retry_pending_blocks_loop_handle.abort(); listen_for_bedrock_blocks_loop_handle.abort(); let Some(handle) = server_handle else { return; }; if let Err(err) = handle.stop() { error!("An error occurred while stopping Sequencer RPC server: {err}"); } } } pub async fn run(config: SequencerConfig, port: u16) -> Result { let block_timeout = config.block_create_timeout; let retry_pending_blocks_timeout = config.retry_pending_blocks_timeout; let max_block_size = config.max_block_size; let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(config).await; info!("Sequencer core set up"); let seq_core_wrapped = Arc::new(Mutex::new(sequencer_core)); let (server_handle, addr) = run_server( Arc::clone(&seq_core_wrapped), mempool_handle, port, max_block_size.as_u64(), ) .await?; info!("RPC server started"); #[cfg(not(feature = "standalone"))] { info!("Submitting stored pending blocks"); retry_pending_blocks(&seq_core_wrapped) .await .expect("Failed to submit pending blocks on startup"); } info!("Starting main sequencer loop"); let main_loop_handle = tokio::spawn(main_loop(Arc::clone(&seq_core_wrapped), block_timeout)); info!("Starting pending block retry loop"); let retry_pending_blocks_loop_handle = tokio::spawn(retry_pending_blocks_loop( Arc::clone(&seq_core_wrapped), retry_pending_blocks_timeout, )); info!("Starting bedrock block listening loop"); let listen_for_bedrock_blocks_loop_handle = tokio::spawn(listen_for_bedrock_blocks_loop(seq_core_wrapped)); Ok(SequencerHandle::new( addr, server_handle, main_loop_handle, retry_pending_blocks_loop_handle, listen_for_bedrock_blocks_loop_handle, )) } async fn run_server( sequencer: Arc>, mempool_handle: MemPoolHandle, port: u16, max_block_size: u64, ) -> Result<(ServerHandle, SocketAddr)> { let server = jsonrpsee::server::ServerBuilder::with_config( jsonrpsee::server::ServerConfigBuilder::new() .max_request_body_size( u32::try_from(REQUEST_BODY_MAX_SIZE.as_u64()) .expect("REQUEST_BODY_MAX_SIZE should be less than u32::MAX"), ) .build(), ) .build(SocketAddr::from(([0, 0, 0, 0], port))) .await .context("Failed to build RPC server")?; let addr = server .local_addr() .context("Failed to get local address of RPC server")?; info!("Starting Sequencer Service RPC server on {addr}"); let service = service::SequencerService::new(sequencer, mempool_handle, max_block_size); let handle = server.start(service.into_rpc()); Ok((handle, addr)) } async fn main_loop(seq_core: Arc>, block_timeout: Duration) -> Result { loop { tokio::time::sleep(block_timeout).await; info!("Collecting transactions from mempool, block creation"); let id = { let mut state = seq_core.lock().await; state.produce_new_block().await? }; info!("Block with id {id} created"); info!("Waiting for new transactions"); } } #[cfg(not(feature = "standalone"))] async fn retry_pending_blocks(seq_core: &Arc>) -> Result<()> { use std::time::Instant; use log::debug; let (mut pending_blocks, block_settlement_client) = { let sequencer_core = seq_core.lock().await; let client = sequencer_core.block_settlement_client(); let pending_blocks = sequencer_core .get_pending_blocks() .expect("Sequencer should be able to retrieve pending blocks"); (pending_blocks, client) }; pending_blocks.sort_by(|block1, block2| block1.header.block_id.cmp(&block2.header.block_id)); if !pending_blocks.is_empty() { info!( "Resubmitting blocks from {} to {}", pending_blocks.first().unwrap().header.block_id, pending_blocks.last().unwrap().header.block_id ); } for block in &pending_blocks { debug!( "Resubmitting pending block with id {}", block.header.block_id ); // TODO: We could cache the inscribe tx for each pending block to avoid re-creating it // on every retry. let now = Instant::now(); let (tx, _msg_id) = block_settlement_client .create_inscribe_tx(block) .context("Failed to create inscribe tx for pending block")?; debug!("Create inscribe: {:?}", now.elapsed()); let now = Instant::now(); if let Err(e) = block_settlement_client .submit_inscribe_tx_to_bedrock(tx) .await { warn!( "Failed to resubmit block with id {} with error {e:#}", block.header.block_id ); } debug!("Post: {:?}", now.elapsed()); } Ok(()) } #[cfg(not(feature = "standalone"))] async fn retry_pending_blocks_loop( seq_core: Arc>, retry_pending_blocks_timeout: Duration, ) -> Result { loop { tokio::time::sleep(retry_pending_blocks_timeout).await; retry_pending_blocks(&seq_core).await?; } } #[cfg(not(feature = "standalone"))] async fn listen_for_bedrock_blocks_loop(seq_core: Arc>) -> Result { use indexer_service_rpc::RpcClient as _; let indexer_client = seq_core.lock().await.indexer_client(); let retry_delay = Duration::from_secs(5); loop { // TODO: Subscribe from the first pending block ID? let mut subscription = indexer_client .subscribe_to_finalized_blocks() .await .context("Failed to subscribe to finalized blocks")?; while let Some(block_id) = subscription.next().await { let block_id = block_id.context("Failed to get next block from subscription")?; info!("Received new L2 block with ID {block_id}"); seq_core .lock() .await .clean_finalized_blocks_from_db(block_id) .with_context(|| { format!("Failed to clean finalized blocks from DB for block ID {block_id}") })?; } warn!( "Block subscription closed unexpectedly, reason: {:?}, retrying after {retry_delay:?}", subscription.close_reason() ); tokio::time::sleep(retry_delay).await; } } #[cfg(feature = "standalone")] async fn listen_for_bedrock_blocks_loop(_seq_core: Arc>) -> Result { std::future::pending::>().await } #[cfg(feature = "standalone")] async fn retry_pending_blocks_loop( _seq_core: Arc>, _retry_pending_blocks_timeout: Duration, ) -> Result { std::future::pending::>().await }