use std::{net::SocketAddr, sync::Arc, time::Duration}; use anyhow::{Context as _, Result, anyhow}; #[cfg(not(feature = "standalone"))] use borsh::BorshDeserialize; use bytesize::ByteSize; use common::transaction::NSSATransaction; #[cfg(not(feature = "standalone"))] use futures::StreamExt as _; use futures::never::Never; use jsonrpsee::server::ServerHandle; #[cfg(not(feature = "standalone"))] use log::warn; use log::{error, info}; #[cfg(not(feature = "standalone"))] use logos_blockchain_core::mantle::ops::channel::MsgId; #[cfg(not(feature = "standalone"))] use logos_blockchain_zone_sdk::{ CommonHttpClient, Slot, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer, }; use mempool::MemPoolHandle; #[cfg(not(feature = "standalone"))] use sequencer_core::SequencerCore; #[cfg(feature = "standalone")] use sequencer_core::SequencerCoreWithMockClients as SequencerCore; pub use sequencer_core::config::*; use sequencer_service_rpc::RpcServer as _; use tokio::{sync::Mutex, task::JoinHandle}; pub mod service; const REQUEST_BODY_MAX_SIZE: ByteSize = ByteSize::mib(10); #[cfg(not(feature = "standalone"))] #[derive(Clone, Debug, BorshDeserialize)] struct DepositMetadata { recipient_id: nssa::AccountId, } #[cfg(not(feature = "standalone"))] impl DepositMetadata { fn decode(bytes: &[u8]) -> Result { Self::try_from_slice(bytes) } } /// Handle to manage the sequencer and its tasks. /// /// Implements `Drop` to ensure all tasks are aborted and the RPC server is stopped when dropped. pub struct SequencerHandle { addr: SocketAddr, /// Option because of `Drop` which forbids to simply move out of `self` in `stopped()`. server_handle: Option, main_loop_handle: JoinHandle>, bedrock_deposit_loop_handle: Option>>, } impl SequencerHandle { const fn new( addr: SocketAddr, server_handle: ServerHandle, main_loop_handle: JoinHandle>, bedrock_deposit_loop_handle: Option>>, ) -> Self { Self { addr, server_handle: Some(server_handle), main_loop_handle, bedrock_deposit_loop_handle, } } /// Wait for any of the sequencer tasks to fail and return the error. #[expect( clippy::integer_division_remainder_used, reason = "Generated by select! macro, can't be easily rewritten to avoid this lint" )] pub async fn failed(mut self) -> Result { let Self { addr: _, server_handle, main_loop_handle, bedrock_deposit_loop_handle, } = &mut self; let server_handle = server_handle.take().expect("Server handle is set"); let deposit_opt_fut = futures::future::OptionFuture::from(bedrock_deposit_loop_handle.take()); tokio::select! { () = server_handle.stopped() => { Err(anyhow!("RPC Server stopped")) } res = main_loop_handle => { res .context("Main loop task panicked")? .context("Main loop exited unexpectedly") } Some(res) = deposit_opt_fut => { res .context("Bedrock deposit loop task panicked")? .context("Bedrock deposit loop exited unexpectedly") } } } /// Check if all Sequencer tasks are still running. /// /// Return `false` if any of the tasks has failed and `true` otherwise. /// Error of the failed task can be retrieved by awaiting on [`Self::failed()`]. #[must_use] pub fn is_healthy(&self) -> bool { let Self { addr: _, server_handle, main_loop_handle, bedrock_deposit_loop_handle, } = self; let stopped = server_handle.as_ref().is_none_or(ServerHandle::is_stopped) || main_loop_handle.is_finished() || bedrock_deposit_loop_handle .as_ref() .is_some_and(JoinHandle::is_finished); !stopped } #[must_use] pub const fn addr(&self) -> SocketAddr { self.addr } } impl Drop for SequencerHandle { fn drop(&mut self) { let Self { addr: _, server_handle, main_loop_handle, bedrock_deposit_loop_handle, } = self; main_loop_handle.abort(); if let Some(handle) = bedrock_deposit_loop_handle { handle.abort(); } let Some(handle) = server_handle else { return; }; if let Err(err) = handle.stop() { error!("An error occurred while stopping Sequencer RPC server: {err}"); } } } pub async fn run(config: SequencerConfig, port: u16) -> Result { let block_timeout = config.block_create_timeout; let max_block_size = config.max_block_size; #[cfg(not(feature = "standalone"))] let bedrock_config = config.bedrock_config.clone(); let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(config).await; info!("Sequencer core set up"); let seq_core_wrapped = Arc::new(Mutex::new(sequencer_core)); let mempool_handle_for_server = mempool_handle.clone(); let (server_handle, addr) = run_server( Arc::clone(&seq_core_wrapped), mempool_handle_for_server, port, max_block_size.as_u64(), ) .await?; info!("RPC server started"); info!("Starting main sequencer loop"); let main_loop_handle = tokio::spawn(main_loop(seq_core_wrapped, block_timeout)); #[cfg(not(feature = "standalone"))] let bedrock_deposit_loop_handle = { info!("Starting Bedrock deposit listener loop"); Some(tokio::spawn(bedrock_deposit_loop( bedrock_config, mempool_handle, ))) }; #[cfg(feature = "standalone")] let bedrock_deposit_loop_handle = { let _ = mempool_handle; None }; Ok(SequencerHandle::new( addr, server_handle, main_loop_handle, bedrock_deposit_loop_handle, )) } async fn run_server( sequencer: Arc>, mempool_handle: MemPoolHandle, port: u16, max_block_size: u64, ) -> Result<(ServerHandle, SocketAddr)> { let server = jsonrpsee::server::ServerBuilder::with_config( jsonrpsee::server::ServerConfigBuilder::new() .max_request_body_size( u32::try_from(REQUEST_BODY_MAX_SIZE.as_u64()) .expect("REQUEST_BODY_MAX_SIZE should be less than u32::MAX"), ) .build(), ) .build(SocketAddr::from(([0, 0, 0, 0], port))) .await .context("Failed to build RPC server")?; let addr = server .local_addr() .context("Failed to get local address of RPC server")?; info!("Starting Sequencer Service RPC server on {addr}"); let service = service::SequencerService::new(sequencer, mempool_handle, max_block_size); let handle = server.start(service.into_rpc()); Ok((handle, addr)) } async fn main_loop(seq_core: Arc>, block_timeout: Duration) -> Result { loop { tokio::time::sleep(block_timeout).await; info!("Collecting transactions from mempool, block creation"); let id = { let mut state = seq_core.lock().await; state.produce_new_block().await? }; info!("Block with id {id} created"); info!("Waiting for new transactions"); } } #[cfg(not(feature = "standalone"))] async fn bedrock_deposit_loop( bedrock_config: BedrockConfig, mempool_handle: MemPoolHandle, ) -> Result { let basic_auth = bedrock_config.auth.map(Into::into); let node = NodeHttpClient::new(CommonHttpClient::new(basic_auth), bedrock_config.node_url); let zone_indexer = ZoneIndexer::new(bedrock_config.channel_id, node); let mut cursor: Option<(MsgId, Slot)> = None; let poll_interval = Duration::from_secs(1); loop { let stream = match zone_indexer.next_messages(cursor).await { Ok(stream) => stream, Err(err) => { error!("Failed to start Bedrock deposit stream: {err}"); tokio::time::sleep(poll_interval).await; continue; } }; let mut stream = std::pin::pin!(stream); while let Some((msg, slot)) = stream.next().await { match msg { ZoneMessage::Block(block) => { cursor = Some((block.id, slot)); info!("Observed Bedrock channel block id {:?}", block.id); } ZoneMessage::Deposit(deposit) => { let metadata = match DepositMetadata::decode(&deposit.metadata) { Ok(metadata) => metadata, Err(err) => { warn!("Skipping Bedrock Deposit with invalid metadata: {err}"); continue; } }; let tx = match build_bridge_deposit_tx(&metadata) { Ok(tx) => tx, Err(err) => { warn!("Skipping Bedrock Deposit due to tx build failure: {err:#}"); continue; } }; info!( "Observed Bedrock Deposit for recipient {recipient_id}, pushing to mempool", recipient_id = metadata.recipient_id ); mempool_handle .push(tx) .await .context("Mempool is closed while pushing Bedrock Deposit transaction")?; } ZoneMessage::Withdraw(_) => {} } } tokio::time::sleep(poll_interval).await; } } #[cfg(not(feature = "standalone"))] fn build_bridge_deposit_tx(metadata: &DepositMetadata) -> Result { // TODO: Remove this constant once we have a way to get the deposit amount from Bedrock deposit // inputs. const TEMPORARY_BRIDGE_DEPOSIT_AMOUNT: u128 = 1; let bridge_program_id = nssa::program::Program::bridge().id(); let vault_program_id = nssa::program::Program::vault().id(); let recipient_vault_id = vault_core::compute_vault_account_id(vault_program_id, metadata.recipient_id); let message = nssa::public_transaction::Message::try_new( bridge_program_id, vec![nssa::system_bridge_account_id(), recipient_vault_id], vec![], bridge_core::Instruction::Deposit { vault_program_id, recipient_id: metadata.recipient_id, amount: TEMPORARY_BRIDGE_DEPOSIT_AMOUNT, }, ) .context("Failed to build bridge deposit message")?; let witness_set = nssa::public_transaction::WitnessSet::from_raw_parts(vec![]); Ok(NSSATransaction::Public(nssa::PublicTransaction::new( message, witness_set, ))) }