mirror of
https://github.com/logos-blockchain/logos-execution-zone.git
synced 2026-05-25 17:39:27 +00:00
364 lines
12 KiB
Rust
364 lines
12 KiB
Rust
#[cfg(not(feature = "standalone"))]
|
|
use std::collections::HashSet;
|
|
use std::{net::SocketAddr, sync::Arc, time::Duration};
|
|
|
|
use anyhow::{Context as _, Result, anyhow};
|
|
#[cfg(not(feature = "standalone"))]
|
|
use borsh::BorshDeserialize;
|
|
use bytesize::ByteSize;
|
|
use common::transaction::NSSATransaction;
|
|
#[cfg(not(feature = "standalone"))]
|
|
use futures::StreamExt as _;
|
|
use futures::never::Never;
|
|
use jsonrpsee::server::ServerHandle;
|
|
#[cfg(not(feature = "standalone"))]
|
|
use log::warn;
|
|
use log::{error, info};
|
|
#[cfg(not(feature = "standalone"))]
|
|
use logos_blockchain_core::mantle::ops::channel::MsgId;
|
|
#[cfg(not(feature = "standalone"))]
|
|
use logos_blockchain_zone_sdk::{
|
|
CommonHttpClient, Slot, ZoneMessage, adapter::NodeHttpClient, indexer::ZoneIndexer,
|
|
};
|
|
use mempool::MemPoolHandle;
|
|
#[cfg(not(feature = "standalone"))]
|
|
use sequencer_core::SequencerCore;
|
|
#[cfg(feature = "standalone")]
|
|
use sequencer_core::SequencerCoreWithMockClients as SequencerCore;
|
|
pub use sequencer_core::config::*;
|
|
use sequencer_service_rpc::RpcServer as _;
|
|
use tokio::{sync::Mutex, task::JoinHandle};
|
|
|
|
pub mod service;
|
|
|
|
/// Maximum size of a request body accepted by the RPC server (passed to
/// `max_request_body_size` when the server is built).
const REQUEST_BODY_MAX_SIZE: ByteSize = ByteSize::mib(10);
|
|
|
|
#[cfg(not(feature = "standalone"))]
|
|
#[derive(Clone, Debug, BorshDeserialize)]
|
|
struct DepositMetadata {
|
|
recipient_id: nssa::AccountId,
|
|
}
|
|
|
|
#[cfg(not(feature = "standalone"))]
|
|
impl DepositMetadata {
|
|
fn decode(bytes: &[u8]) -> Result<Self, std::io::Error> {
|
|
Self::try_from_slice(bytes)
|
|
}
|
|
}
|
|
|
|
/// Handle to manage the sequencer and its tasks.
|
|
///
|
|
/// Implements `Drop` to ensure all tasks are aborted and the RPC server is stopped when dropped.
|
|
pub struct SequencerHandle {
|
|
addr: SocketAddr,
|
|
/// Option because of `Drop` which forbids to simply move out of `self` in `stopped()`.
|
|
server_handle: Option<ServerHandle>,
|
|
main_loop_handle: JoinHandle<Result<Never>>,
|
|
bedrock_deposit_loop_handle: Option<JoinHandle<Result<Never>>>,
|
|
}
|
|
|
|
impl SequencerHandle {
|
|
const fn new(
|
|
addr: SocketAddr,
|
|
server_handle: ServerHandle,
|
|
main_loop_handle: JoinHandle<Result<Never>>,
|
|
bedrock_deposit_loop_handle: Option<JoinHandle<Result<Never>>>,
|
|
) -> Self {
|
|
Self {
|
|
addr,
|
|
server_handle: Some(server_handle),
|
|
main_loop_handle,
|
|
bedrock_deposit_loop_handle,
|
|
}
|
|
}
|
|
|
|
/// Wait for any of the sequencer tasks to fail and return the error.
|
|
#[expect(
|
|
clippy::integer_division_remainder_used,
|
|
reason = "Generated by select! macro, can't be easily rewritten to avoid this lint"
|
|
)]
|
|
pub async fn failed(mut self) -> Result<Never> {
|
|
let Self {
|
|
addr: _,
|
|
server_handle,
|
|
main_loop_handle,
|
|
bedrock_deposit_loop_handle,
|
|
} = &mut self;
|
|
|
|
let server_handle = server_handle.take().expect("Server handle is set");
|
|
let deposit_opt_fut =
|
|
futures::future::OptionFuture::from(bedrock_deposit_loop_handle.take());
|
|
tokio::select! {
|
|
() = server_handle.stopped() => {
|
|
Err(anyhow!("RPC Server stopped"))
|
|
}
|
|
res = main_loop_handle => {
|
|
res
|
|
.context("Main loop task panicked")?
|
|
.context("Main loop exited unexpectedly")
|
|
}
|
|
Some(res) = deposit_opt_fut => {
|
|
res
|
|
.context("Bedrock deposit loop task panicked")?
|
|
.context("Bedrock deposit loop exited unexpectedly")
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Check if all Sequencer tasks are still running.
|
|
///
|
|
/// Return `false` if any of the tasks has failed and `true` otherwise.
|
|
/// Error of the failed task can be retrieved by awaiting on [`Self::failed()`].
|
|
#[must_use]
|
|
pub fn is_healthy(&self) -> bool {
|
|
let Self {
|
|
addr: _,
|
|
server_handle,
|
|
main_loop_handle,
|
|
bedrock_deposit_loop_handle,
|
|
} = self;
|
|
|
|
let stopped = server_handle.as_ref().is_none_or(ServerHandle::is_stopped)
|
|
|| main_loop_handle.is_finished()
|
|
|| bedrock_deposit_loop_handle
|
|
.as_ref()
|
|
.is_some_and(JoinHandle::is_finished);
|
|
!stopped
|
|
}
|
|
|
|
#[must_use]
|
|
pub const fn addr(&self) -> SocketAddr {
|
|
self.addr
|
|
}
|
|
}
|
|
|
|
impl Drop for SequencerHandle {
|
|
fn drop(&mut self) {
|
|
let Self {
|
|
addr: _,
|
|
server_handle,
|
|
main_loop_handle,
|
|
bedrock_deposit_loop_handle,
|
|
} = self;
|
|
|
|
main_loop_handle.abort();
|
|
if let Some(handle) = bedrock_deposit_loop_handle {
|
|
handle.abort();
|
|
}
|
|
|
|
let Some(handle) = server_handle else {
|
|
return;
|
|
};
|
|
|
|
if let Err(err) = handle.stop() {
|
|
error!("An error occurred while stopping Sequencer RPC server: {err}");
|
|
}
|
|
}
|
|
}
|
|
|
|
pub async fn run(config: SequencerConfig, port: u16) -> Result<SequencerHandle> {
|
|
let block_timeout = config.block_create_timeout;
|
|
let max_block_size = config.max_block_size;
|
|
#[cfg(not(feature = "standalone"))]
|
|
let bedrock_config = config.bedrock_config.clone();
|
|
|
|
let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(config).await;
|
|
|
|
info!("Sequencer core set up");
|
|
|
|
let seq_core_wrapped = Arc::new(Mutex::new(sequencer_core));
|
|
let mempool_handle_for_server = mempool_handle.clone();
|
|
|
|
let (server_handle, addr) = run_server(
|
|
Arc::clone(&seq_core_wrapped),
|
|
mempool_handle_for_server,
|
|
port,
|
|
max_block_size.as_u64(),
|
|
)
|
|
.await?;
|
|
info!("RPC server started");
|
|
|
|
info!("Starting main sequencer loop");
|
|
let main_loop_handle = tokio::spawn(main_loop(seq_core_wrapped, block_timeout));
|
|
|
|
#[cfg(not(feature = "standalone"))]
|
|
let bedrock_deposit_loop_handle = {
|
|
info!("Starting Bedrock deposit listener loop");
|
|
Some(tokio::spawn(bedrock_deposit_loop(
|
|
bedrock_config,
|
|
mempool_handle,
|
|
)))
|
|
};
|
|
#[cfg(feature = "standalone")]
|
|
let bedrock_deposit_loop_handle = {
|
|
let _ = mempool_handle;
|
|
None
|
|
};
|
|
|
|
Ok(SequencerHandle::new(
|
|
addr,
|
|
server_handle,
|
|
main_loop_handle,
|
|
bedrock_deposit_loop_handle,
|
|
))
|
|
}
|
|
|
|
async fn run_server(
|
|
sequencer: Arc<Mutex<SequencerCore>>,
|
|
mempool_handle: MemPoolHandle<NSSATransaction>,
|
|
port: u16,
|
|
max_block_size: u64,
|
|
) -> Result<(ServerHandle, SocketAddr)> {
|
|
let server = jsonrpsee::server::ServerBuilder::with_config(
|
|
jsonrpsee::server::ServerConfigBuilder::new()
|
|
.max_request_body_size(
|
|
u32::try_from(REQUEST_BODY_MAX_SIZE.as_u64())
|
|
.expect("REQUEST_BODY_MAX_SIZE should be less than u32::MAX"),
|
|
)
|
|
.build(),
|
|
)
|
|
.build(SocketAddr::from(([0, 0, 0, 0], port)))
|
|
.await
|
|
.context("Failed to build RPC server")?;
|
|
|
|
let addr = server
|
|
.local_addr()
|
|
.context("Failed to get local address of RPC server")?;
|
|
|
|
info!("Starting Sequencer Service RPC server on {addr}");
|
|
|
|
let service = service::SequencerService::new(sequencer, mempool_handle, max_block_size);
|
|
let handle = server.start(service.into_rpc());
|
|
Ok((handle, addr))
|
|
}
|
|
|
|
async fn main_loop(seq_core: Arc<Mutex<SequencerCore>>, block_timeout: Duration) -> Result<Never> {
|
|
loop {
|
|
tokio::time::sleep(block_timeout).await;
|
|
|
|
info!("Collecting transactions from mempool, block creation");
|
|
|
|
let id = {
|
|
let mut state = seq_core.lock().await;
|
|
|
|
state.produce_new_block().await?
|
|
};
|
|
|
|
info!("Block with id {id} created");
|
|
|
|
info!("Waiting for new transactions");
|
|
}
|
|
}
|
|
|
|
#[cfg(not(feature = "standalone"))]
|
|
async fn bedrock_deposit_loop(
|
|
bedrock_config: BedrockConfig,
|
|
mempool_handle: MemPoolHandle<NSSATransaction>,
|
|
) -> Result<Never> {
|
|
let basic_auth = bedrock_config.auth.map(Into::into);
|
|
let node = NodeHttpClient::new(CommonHttpClient::new(basic_auth), bedrock_config.node_url);
|
|
let zone_indexer = ZoneIndexer::new(bedrock_config.channel_id, node);
|
|
|
|
let mut cursor: Option<(MsgId, Slot)> = None;
|
|
let poll_interval = Duration::from_secs(1);
|
|
|
|
/// Mirrors [`logos_blockchain_zone_sdk::Deposit`] but can be stored in a [`HashSet`].
|
|
#[derive(Debug, PartialEq, Eq, Hash)]
|
|
struct HashableDeposit {
|
|
inputs: logos_blockchain_core::mantle::ledger::Inputs,
|
|
metadata: Vec<u8>,
|
|
}
|
|
impl From<logos_blockchain_zone_sdk::Deposit> for HashableDeposit {
|
|
fn from(deposit: logos_blockchain_zone_sdk::Deposit) -> Self {
|
|
let logos_blockchain_zone_sdk::Deposit { inputs, metadata } = deposit;
|
|
Self { inputs, metadata }
|
|
}
|
|
}
|
|
// TODO: We should probably store this in DB.
|
|
let mut seen_deposits = HashSet::new();
|
|
|
|
loop {
|
|
let stream = match zone_indexer.next_messages(cursor).await {
|
|
Ok(stream) => stream,
|
|
Err(err) => {
|
|
error!("Failed to start Bedrock deposit stream: {err}");
|
|
tokio::time::sleep(poll_interval).await;
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let mut stream = std::pin::pin!(stream);
|
|
while let Some((msg, slot)) = stream.next().await {
|
|
match msg {
|
|
ZoneMessage::Block(block) => {
|
|
cursor = Some((block.id, slot));
|
|
info!("Observed Bedrock channel block id {:?}", block.id);
|
|
}
|
|
ZoneMessage::Deposit(deposit) => {
|
|
let hashable_deposit = HashableDeposit::from(deposit.clone());
|
|
if !seen_deposits.insert(hashable_deposit) {
|
|
continue;
|
|
}
|
|
let metadata = match DepositMetadata::decode(&deposit.metadata) {
|
|
Ok(metadata) => metadata,
|
|
Err(err) => {
|
|
warn!("Skipping Bedrock Deposit with invalid metadata: {err}");
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let tx = match build_bridge_deposit_tx(&metadata) {
|
|
Ok(tx) => tx,
|
|
Err(err) => {
|
|
warn!("Skipping Bedrock Deposit due to tx build failure: {err:#}");
|
|
continue;
|
|
}
|
|
};
|
|
|
|
info!(
|
|
"Observed Bedrock Deposit for recipient {recipient_id}, pushing to mempool",
|
|
recipient_id = metadata.recipient_id
|
|
);
|
|
mempool_handle
|
|
.push(tx)
|
|
.await
|
|
.context("Mempool is closed while pushing Bedrock Deposit transaction")?;
|
|
}
|
|
ZoneMessage::Withdraw(_) => {}
|
|
}
|
|
}
|
|
|
|
tokio::time::sleep(poll_interval).await;
|
|
}
|
|
}
|
|
|
|
#[cfg(not(feature = "standalone"))]
|
|
fn build_bridge_deposit_tx(metadata: &DepositMetadata) -> Result<NSSATransaction> {
|
|
// TODO: Remove this constant once we have a way to get the deposit amount from Bedrock deposit
|
|
// inputs.
|
|
const TEMPORARY_BRIDGE_DEPOSIT_AMOUNT: u128 = 1;
|
|
|
|
let bridge_program_id = nssa::program::Program::bridge().id();
|
|
let vault_program_id = nssa::program::Program::vault().id();
|
|
let recipient_vault_id =
|
|
vault_core::compute_vault_account_id(vault_program_id, metadata.recipient_id);
|
|
|
|
let message = nssa::public_transaction::Message::try_new(
|
|
bridge_program_id,
|
|
vec![nssa::system_bridge_account_id(), recipient_vault_id],
|
|
vec![],
|
|
bridge_core::Instruction::Deposit {
|
|
vault_program_id,
|
|
recipient_id: metadata.recipient_id,
|
|
amount: TEMPORARY_BRIDGE_DEPOSIT_AMOUNT,
|
|
},
|
|
)
|
|
.context("Failed to build bridge deposit message")?;
|
|
|
|
let witness_set = nssa::public_transaction::WitnessSet::from_raw_parts(vec![]);
|
|
Ok(NSSATransaction::Public(nssa::PublicTransaction::new(
|
|
message,
|
|
witness_set,
|
|
)))
|
|
}
|