diff --git a/bedrock_client/src/lib.rs b/bedrock_client/src/lib.rs index 530fdfc2..b16204c9 100644 --- a/bedrock_client/src/lib.rs +++ b/bedrock_client/src/lib.rs @@ -5,6 +5,7 @@ use reqwest::{Client, Url}; // Simple wrapper // maybe extend in the future for our purposes +#[derive(Clone)] pub struct BedrockClient { http_client: CommonHttpClient, node_url: Url, diff --git a/integration_tests/configs/sequencer/sequencer_config.json b/integration_tests/configs/sequencer/sequencer_config.json index 575d3de3..5c642d37 100644 --- a/integration_tests/configs/sequencer/sequencer_config.json +++ b/integration_tests/configs/sequencer/sequencer_config.json @@ -6,6 +6,7 @@ "max_num_tx_in_block": 20, "mempool_max_size": 10000, "block_create_timeout_millis": 10000, + "retry_pending_blocks_timeout_millis": 240000, "port": 0, "initial_accounts": [ { diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index 12d718ec..5cb7233e 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -113,7 +113,7 @@ impl TestContext { // Setting port to 0 lets the OS choose a free port for us config.port = 0; - let (sequencer_server_handle, sequencer_addr, sequencer_loop_handle) = + let (sequencer_server_handle, sequencer_addr, sequencer_loop_handle, _) = sequencer_runner::startup_sequencer(config).await?; Ok(( diff --git a/integration_tests/tests/tps.rs b/integration_tests/tests/tps.rs index 3fdc8ac8..73b823cf 100644 --- a/integration_tests/tests/tps.rs +++ b/integration_tests/tests/tps.rs @@ -186,6 +186,7 @@ impl TpsTestManager { initial_commitments: vec![initial_commitment], signing_key: [37; 32], bedrock_config: None, + retry_pending_blocks_timeout_millis: 1000 * 60 * 4, } } } diff --git a/sequencer_core/src/block_settlement_client.rs b/sequencer_core/src/block_settlement_client.rs index 13c45b9a..e53c02e6 100644 --- a/sequencer_core/src/block_settlement_client.rs +++ b/sequencer_core/src/block_settlement_client.rs @@ -2,7 +2,7 @@ use std::{fs, path::Path}; use anyhow::{Result, anyhow}; use bedrock_client::BedrockClient; -use common::block::{Block, HashableBlockData}; +use common::block::{BedrockStatus, Block, HashableBlockData}; use logos_blockchain_core::mantle::{ MantleTx, Op, OpProof, SignedMantleTx, Transaction, TxHash, ledger, ops::channel::{ChannelId, MsgId, inscribe::InscriptionOp}, @@ -14,11 +14,11 @@ use logos_blockchain_key_management_system_service::keys::{ use crate::config::BedrockConfig; /// A component that posts block data to logos blockchain +#[derive(Clone)] pub struct BlockSettlementClient { bedrock_client: BedrockClient, bedrock_signing_key: Ed25519Key, bedrock_channel_id: ChannelId, - last_message_id: MsgId, } impl BlockSettlementClient { @@ -26,23 +26,13 @@ impl BlockSettlementClient { let bedrock_signing_key = load_or_create_signing_key(&home.join("bedrock_signing_key"))?; let bedrock_channel_id = ChannelId::from(config.channel_id); let bedrock_client = BedrockClient::new(None, config.node_url.clone())?; - let channel_genesis_msg = MsgId::from([0; 32]); Ok(Self { bedrock_client, bedrock_signing_key, bedrock_channel_id, - last_message_id: channel_genesis_msg, }) } - pub fn set_last_message_id(&mut self, msg_id: MsgId) { - self.last_message_id = msg_id; - } - - pub fn last_message_id(&self) -> MsgId { - self.last_message_id - } - /// Create and sign a transaction for inscribing data pub fn create_inscribe_tx(&self, block: &Block) -> Result<(SignedMantleTx, MsgId)> { let inscription_data = borsh::to_vec(block)?; diff --git a/sequencer_core/src/block_store.rs b/sequencer_core/src/block_store.rs index f81fb121..7f3c1140 100644 --- a/sequencer_core/src/block_store.rs +++ b/sequencer_core/src/block_store.rs @@ -80,7 +80,7 @@ impl SequencerStore { &self.signing_key } - pub(crate) fn get_pending_blocks(&self) -> impl Iterator> { + pub fn get_pending_blocks(&self) -> impl Iterator> { self.dbio.get_all_blocks().map(|res| Ok(res?)) } diff --git a/sequencer_core/src/config.rs b/sequencer_core/src/config.rs index 5911cc52..74460931 100644 --- a/sequencer_core/src/config.rs +++ b/sequencer_core/src/config.rs @@ -40,6 +40,8 @@ pub struct SequencerConfig { pub mempool_max_size: usize, /// Interval in which blocks produced pub block_create_timeout_millis: u64, + /// Interval in which pending blocks are retried + pub retry_pending_blocks_timeout_millis: u64, /// Port to listen pub port: u16, /// List of initial accounts data diff --git a/sequencer_core/src/lib.rs b/sequencer_core/src/lib.rs index b542c474..1df91f0f 100644 --- a/sequencer_core/src/lib.rs +++ b/sequencer_core/src/lib.rs @@ -27,6 +27,7 @@ pub struct SequencerCore { sequencer_config: SequencerConfig, chain_height: u64, block_settlement_client: Option, + last_bedrock_msg_id: MsgId, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] @@ -96,6 +97,7 @@ impl SequencerCore { .expect("Block settlement client should be constructible") }); + let channel_genesis_msg_id = MsgId::from([0; 32]); let mut this = Self { state, store: block_store, @@ -103,6 +105,7 @@ impl SequencerCore { chain_height: config.genesis_id, sequencer_config: config, block_settlement_client, + last_bedrock_msg_id: channel_genesis_msg_id, }; this.sync_state_with_stored_blocks(); @@ -151,10 +154,10 @@ impl SequencerCore { let block_data = self.produce_new_block_with_mempool_transactions()?; if let Some(client) = self.block_settlement_client.as_mut() { - let last_message_id = client.last_message_id(); - let block = block_data.into_pending_block(self.store.signing_key(), last_message_id); + let block = + block_data.into_pending_block(self.store.signing_key(), self.last_bedrock_msg_id); let msg_id = client.submit_block_to_bedrock(&block).await?; - client.set_last_message_id(msg_id); + self.last_bedrock_msg_id = msg_id; log::info!("Posted block data to Bedrock"); } @@ -195,15 +198,9 @@ impl SequencerCore { timestamp: curr_time, }; - let bedrock_parent_id = self - .block_settlement_client - .as_ref() - .map(|client| client.last_message_id()) - .unwrap_or(MsgId::from([0; 32])); - let block = hashable_data .clone() - .into_pending_block(self.store.signing_key(), bedrock_parent_id); + .into_pending_block(self.store.signing_key(), self.last_bedrock_msg_id); self.store.update(block, &self.state)?; @@ -249,20 +246,16 @@ impl SequencerCore { .try_for_each(|&id| self.store.delete_block_at_id(id)) } - pub async fn resubmit_pending_blocks(&self) -> Result<()> { - for res in self.store.get_pending_blocks() { - let block = res?; - match block.bedrock_status { - BedrockStatus::Pending => { - if let Some(client) = self.block_settlement_client.as_ref() { - client.submit_block_to_bedrock(&block).await?; - log::info!("Posted block data to Bedrock"); - } - } - _ => continue, - } - } - Ok(()) + pub fn get_pending_blocks(&self) -> Vec { + self.store + .get_pending_blocks() + .flatten() + .filter(|block| matches!(block.bedrock_status, BedrockStatus::Pending)) + .collect() + } + + pub fn block_settlement_client(&self) -> Option { + self.block_settlement_client.clone() } } @@ -326,6 +319,7 @@ mod tests { initial_commitments: vec![], signing_key: *sequencer_sign_key_for_testing().value(), bedrock_config: None, + retry_pending_blocks_timeout_millis: 1000 * 60 * 4, } } diff --git a/sequencer_rpc/src/process.rs b/sequencer_rpc/src/process.rs index b89993f9..eb19b620 100644 --- a/sequencer_rpc/src/process.rs +++ b/sequencer_rpc/src/process.rs @@ -389,6 +389,7 @@ mod tests { initial_commitments: vec![], signing_key: *sequencer_sign_key_for_testing().value(), bedrock_config: None, + retry_pending_blocks_timeout_millis: 1000 * 60 * 4, } } diff --git a/sequencer_runner/src/lib.rs b/sequencer_runner/src/lib.rs index fd4a6c08..3387c524 100644 --- a/sequencer_runner/src/lib.rs +++ b/sequencer_runner/src/lib.rs @@ -4,7 +4,7 @@ use actix_web::dev::ServerHandle; use anyhow::Result; use clap::Parser; use common::rpc_primitives::RpcConfig; -use log::info; +use log::{info, warn}; use sequencer_core::{SequencerCore, config::SequencerConfig}; use sequencer_rpc::new_http_server; use tokio::{sync::Mutex, task::JoinHandle}; @@ -20,8 +20,14 @@ struct Args { pub async fn startup_sequencer( app_config: SequencerConfig, -) -> Result<(ServerHandle, SocketAddr, JoinHandle>)> { +) -> Result<( + ServerHandle, + SocketAddr, + JoinHandle>, + JoinHandle>, +)> { let block_timeout = app_config.block_create_timeout_millis; + let retry_pending_blocks_timeout = app_config.retry_pending_blocks_timeout_millis; let port = app_config.port; let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config); @@ -39,8 +45,38 @@ pub async fn startup_sequencer( let http_server_handle = http_server.handle(); tokio::spawn(http_server); - info!("Starting main sequencer loop"); + info!("Starting pending block retry loop"); + let seq_core_wrapped_for_block_retry = seq_core_wrapped.clone(); + let retry_pending_blocks_handle = tokio::spawn(async move { + loop { + tokio::time::sleep(std::time::Duration::from_millis( + retry_pending_blocks_timeout, + )) + .await; + let (pending_blocks, block_settlement_client) = { + let sequencer_core = seq_core_wrapped_for_block_retry.lock().await; + let client = sequencer_core.block_settlement_client(); + let pending_blocks = sequencer_core.get_pending_blocks(); + (pending_blocks, client) + }; + + let Some(client) = block_settlement_client else { + continue; + }; + + for block in pending_blocks.iter() { + if let Err(e) = client.submit_block_to_bedrock(block).await { + warn!( + "Failed to resubmit block with id {} with error {}", + block.header.block_id, e + ); + } + } + } + }); + + info!("Starting main sequencer loop"); let main_loop_handle = tokio::spawn(async move { loop { tokio::time::sleep(std::time::Duration::from_millis(block_timeout)).await; @@ -61,7 +97,12 @@ pub async fn startup_sequencer( } }); - Ok((http_server_handle, addr, main_loop_handle)) + Ok(( + http_server_handle, + addr, + main_loop_handle, + retry_pending_blocks_handle, + )) } pub async fn main_runner() -> Result<()> { @@ -81,9 +122,26 @@ pub async fn main_runner() -> Result<()> { } // ToDo: Add restart on failures - let (_, _, main_loop_handle) = startup_sequencer(app_config).await?; + let (_, _, main_loop_handle, retry_loop_handle) = startup_sequencer(app_config).await?; - main_loop_handle.await??; + info!("Sequencer running. Monitoring concurrent tasks..."); + + tokio::select! { + res = main_loop_handle => { + match res { + Ok(inner_res) => warn!("Main loop exited unexpectedly: {:?}", inner_res), + Err(e) => warn!("Main loop task panicked: {:?}", e), + } + } + res = retry_loop_handle => { + match res { + Ok(inner_res) => warn!("Retry loop exited unexpectedly: {:?}", inner_res), + Err(e) => warn!("Retry loop task panicked: {:?}", e), + } + } + } + + info!("Shutting down sequencer..."); Ok(()) }