use std::{net::SocketAddr, sync::Arc, time::Duration}; use anyhow::{Context as _, Result, anyhow}; use bytesize::ByteSize; use common::transaction::NSSATransaction; use futures::never::Never; use jsonrpsee::server::ServerHandle; use log::{error, info}; use mempool::MemPoolHandle; #[cfg(not(feature = "standalone"))] use sequencer_core::SequencerCore; #[cfg(feature = "standalone")] use sequencer_core::SequencerCoreWithMockClients as SequencerCore; pub use sequencer_core::config::*; use sequencer_service_rpc::RpcServer as _; use tokio::{sync::Mutex, task::JoinHandle}; pub mod service; const REQUEST_BODY_MAX_SIZE: ByteSize = ByteSize::mib(10); /// Handle to manage the sequencer and its tasks. /// /// Implements `Drop` to ensure all tasks are aborted and the RPC server is stopped when dropped. pub struct SequencerHandle { addr: SocketAddr, /// Option because of `Drop` which forbids to simply move out of `self` in `stopped()`. server_handle: Option, main_loop_handle: JoinHandle>, } impl SequencerHandle { const fn new( addr: SocketAddr, server_handle: ServerHandle, main_loop_handle: JoinHandle>, ) -> Self { Self { addr, server_handle: Some(server_handle), main_loop_handle, } } /// Wait for any of the sequencer tasks to fail and return the error. #[expect( clippy::integer_division_remainder_used, reason = "Generated by select! macro, can't be easily rewritten to avoid this lint" )] pub async fn failed(mut self) -> Result { let Self { addr: _, server_handle, main_loop_handle, } = &mut self; let server_handle = server_handle.take().expect("Server handle is set"); tokio::select! { () = server_handle.stopped() => { Err(anyhow!("RPC Server stopped")) } res = main_loop_handle => { res .context("Main loop task panicked")? .context("Main loop exited unexpectedly") } } } /// Check if all Sequencer tasks are still running. /// /// Return `false` if any of the tasks has failed and `true` otherwise. /// Error of the failed task can be retrieved by awaiting on [`Self::failed()`]. #[must_use] pub fn is_healthy(&self) -> bool { let Self { addr: _, server_handle, main_loop_handle, } = self; let stopped = server_handle.as_ref().is_none_or(ServerHandle::is_stopped) || main_loop_handle.is_finished(); !stopped } #[must_use] pub const fn addr(&self) -> SocketAddr { self.addr } } impl Drop for SequencerHandle { fn drop(&mut self) { let Self { addr: _, server_handle, main_loop_handle, } = self; main_loop_handle.abort(); let Some(handle) = server_handle else { return; }; if let Err(err) = handle.stop() { error!("An error occurred while stopping Sequencer RPC server: {err}"); } } } pub async fn run(config: SequencerConfig, port: u16) -> Result { let block_timeout = config.block_create_timeout; let max_block_size = config.max_block_size; let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(config).await; info!("Sequencer core set up"); let seq_core_wrapped = Arc::new(Mutex::new(sequencer_core)); let (server_handle, addr) = run_server( Arc::clone(&seq_core_wrapped), mempool_handle, port, max_block_size.as_u64(), ) .await?; info!("RPC server started"); info!("Starting main sequencer loop"); let main_loop_handle = tokio::spawn(main_loop(seq_core_wrapped, block_timeout)); Ok(SequencerHandle::new(addr, server_handle, main_loop_handle)) } async fn run_server( sequencer: Arc>, mempool_handle: MemPoolHandle, port: u16, max_block_size: u64, ) -> Result<(ServerHandle, SocketAddr)> { let server = jsonrpsee::server::ServerBuilder::with_config( jsonrpsee::server::ServerConfigBuilder::new() .max_request_body_size( u32::try_from(REQUEST_BODY_MAX_SIZE.as_u64()) .expect("REQUEST_BODY_MAX_SIZE should be less than u32::MAX"), ) .build(), ) .build(SocketAddr::from(([0, 0, 0, 0], port))) .await .context("Failed to build RPC server")?; let addr = server .local_addr() .context("Failed to get local address of RPC server")?; info!("Starting Sequencer Service RPC server on {addr}"); let service = service::SequencerService::new(sequencer, mempool_handle, max_block_size); let handle = server.start(service.into_rpc()); Ok((handle, addr)) } async fn main_loop(seq_core: Arc>, block_timeout: Duration) -> Result { loop { tokio::time::sleep(block_timeout).await; info!("Collecting transactions from mempool, block creation"); let id = { let mut state = seq_core.lock().await; state.produce_new_block().await? }; info!("Block with id {id} created"); info!("Waiting for new transactions"); } }