add tokio task to retry pending blocks in bedrock

This commit is contained in:
Sergio Chouhy 2026-01-27 13:27:52 -03:00
parent d7cac557af
commit e78d6a59a0
10 changed files with 92 additions and 44 deletions

View File

@ -5,6 +5,7 @@ use reqwest::{Client, Url};
// Simple wrapper // Simple wrapper
// maybe extend in the future for our purposes // maybe extend in the future for our purposes
#[derive(Clone)]
pub struct BedrockClient { pub struct BedrockClient {
http_client: CommonHttpClient, http_client: CommonHttpClient,
node_url: Url, node_url: Url,

View File

@ -6,6 +6,7 @@
"max_num_tx_in_block": 20, "max_num_tx_in_block": 20,
"mempool_max_size": 10000, "mempool_max_size": 10000,
"block_create_timeout_millis": 10000, "block_create_timeout_millis": 10000,
"retry_pending_blocks_timeout_millis": 240000,
"port": 0, "port": 0,
"initial_accounts": [ "initial_accounts": [
{ {

View File

@ -113,7 +113,7 @@ impl TestContext {
// Setting port to 0 lets the OS choose a free port for us // Setting port to 0 lets the OS choose a free port for us
config.port = 0; config.port = 0;
let (sequencer_server_handle, sequencer_addr, sequencer_loop_handle) = let (sequencer_server_handle, sequencer_addr, sequencer_loop_handle, _) =
sequencer_runner::startup_sequencer(config).await?; sequencer_runner::startup_sequencer(config).await?;
Ok(( Ok((

View File

@ -186,6 +186,7 @@ impl TpsTestManager {
initial_commitments: vec![initial_commitment], initial_commitments: vec![initial_commitment],
signing_key: [37; 32], signing_key: [37; 32],
bedrock_config: None, bedrock_config: None,
retry_pending_blocks_timeout_millis: 1000 * 60 * 4,
} }
} }
} }

View File

@ -2,7 +2,7 @@ use std::{fs, path::Path};
use anyhow::{Result, anyhow}; use anyhow::{Result, anyhow};
use bedrock_client::BedrockClient; use bedrock_client::BedrockClient;
use common::block::{Block, HashableBlockData}; use common::block::{BedrockStatus, Block, HashableBlockData};
use logos_blockchain_core::mantle::{ use logos_blockchain_core::mantle::{
MantleTx, Op, OpProof, SignedMantleTx, Transaction, TxHash, ledger, MantleTx, Op, OpProof, SignedMantleTx, Transaction, TxHash, ledger,
ops::channel::{ChannelId, MsgId, inscribe::InscriptionOp}, ops::channel::{ChannelId, MsgId, inscribe::InscriptionOp},
@ -14,11 +14,11 @@ use logos_blockchain_key_management_system_service::keys::{
use crate::config::BedrockConfig; use crate::config::BedrockConfig;
/// A component that posts block data to logos blockchain /// A component that posts block data to logos blockchain
#[derive(Clone)]
pub struct BlockSettlementClient { pub struct BlockSettlementClient {
bedrock_client: BedrockClient, bedrock_client: BedrockClient,
bedrock_signing_key: Ed25519Key, bedrock_signing_key: Ed25519Key,
bedrock_channel_id: ChannelId, bedrock_channel_id: ChannelId,
last_message_id: MsgId,
} }
impl BlockSettlementClient { impl BlockSettlementClient {
@ -26,23 +26,13 @@ impl BlockSettlementClient {
let bedrock_signing_key = load_or_create_signing_key(&home.join("bedrock_signing_key"))?; let bedrock_signing_key = load_or_create_signing_key(&home.join("bedrock_signing_key"))?;
let bedrock_channel_id = ChannelId::from(config.channel_id); let bedrock_channel_id = ChannelId::from(config.channel_id);
let bedrock_client = BedrockClient::new(None, config.node_url.clone())?; let bedrock_client = BedrockClient::new(None, config.node_url.clone())?;
let channel_genesis_msg = MsgId::from([0; 32]);
Ok(Self { Ok(Self {
bedrock_client, bedrock_client,
bedrock_signing_key, bedrock_signing_key,
bedrock_channel_id, bedrock_channel_id,
last_message_id: channel_genesis_msg,
}) })
} }
pub fn set_last_message_id(&mut self, msg_id: MsgId) {
self.last_message_id = msg_id;
}
pub fn last_message_id(&self) -> MsgId {
self.last_message_id
}
/// Create and sign a transaction for inscribing data /// Create and sign a transaction for inscribing data
pub fn create_inscribe_tx(&self, block: &Block) -> Result<(SignedMantleTx, MsgId)> { pub fn create_inscribe_tx(&self, block: &Block) -> Result<(SignedMantleTx, MsgId)> {
let inscription_data = borsh::to_vec(block)?; let inscription_data = borsh::to_vec(block)?;

View File

@ -80,7 +80,7 @@ impl SequencerStore {
&self.signing_key &self.signing_key
} }
pub(crate) fn get_pending_blocks(&self) -> impl Iterator<Item = Result<Block>> { pub fn get_pending_blocks(&self) -> impl Iterator<Item = Result<Block>> {
self.dbio.get_all_blocks().map(|res| Ok(res?)) self.dbio.get_all_blocks().map(|res| Ok(res?))
} }

View File

@ -40,6 +40,8 @@ pub struct SequencerConfig {
pub mempool_max_size: usize, pub mempool_max_size: usize,
/// Interval in which blocks produced /// Interval in which blocks produced
pub block_create_timeout_millis: u64, pub block_create_timeout_millis: u64,
/// Interval in which pending blocks are retried
pub retry_pending_blocks_timeout_millis: u64,
/// Port to listen /// Port to listen
pub port: u16, pub port: u16,
/// List of initial accounts data /// List of initial accounts data

View File

@ -27,6 +27,7 @@ pub struct SequencerCore {
sequencer_config: SequencerConfig, sequencer_config: SequencerConfig,
chain_height: u64, chain_height: u64,
block_settlement_client: Option<BlockSettlementClient>, block_settlement_client: Option<BlockSettlementClient>,
last_bedrock_msg_id: MsgId,
} }
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@ -96,6 +97,7 @@ impl SequencerCore {
.expect("Block settlement client should be constructible") .expect("Block settlement client should be constructible")
}); });
let channel_genesis_msg_id = MsgId::from([0; 32]);
let mut this = Self { let mut this = Self {
state, state,
store: block_store, store: block_store,
@ -103,6 +105,7 @@ impl SequencerCore {
chain_height: config.genesis_id, chain_height: config.genesis_id,
sequencer_config: config, sequencer_config: config,
block_settlement_client, block_settlement_client,
last_bedrock_msg_id: channel_genesis_msg_id,
}; };
this.sync_state_with_stored_blocks(); this.sync_state_with_stored_blocks();
@ -151,10 +154,10 @@ impl SequencerCore {
let block_data = self.produce_new_block_with_mempool_transactions()?; let block_data = self.produce_new_block_with_mempool_transactions()?;
if let Some(client) = self.block_settlement_client.as_mut() { if let Some(client) = self.block_settlement_client.as_mut() {
let last_message_id = client.last_message_id(); let block =
let block = block_data.into_pending_block(self.store.signing_key(), last_message_id); block_data.into_pending_block(self.store.signing_key(), self.last_bedrock_msg_id);
let msg_id = client.submit_block_to_bedrock(&block).await?; let msg_id = client.submit_block_to_bedrock(&block).await?;
client.set_last_message_id(msg_id); self.last_bedrock_msg_id = msg_id;
log::info!("Posted block data to Bedrock"); log::info!("Posted block data to Bedrock");
} }
@ -195,15 +198,9 @@ impl SequencerCore {
timestamp: curr_time, timestamp: curr_time,
}; };
let bedrock_parent_id = self
.block_settlement_client
.as_ref()
.map(|client| client.last_message_id())
.unwrap_or(MsgId::from([0; 32]));
let block = hashable_data let block = hashable_data
.clone() .clone()
.into_pending_block(self.store.signing_key(), bedrock_parent_id); .into_pending_block(self.store.signing_key(), self.last_bedrock_msg_id);
self.store.update(block, &self.state)?; self.store.update(block, &self.state)?;
@ -249,20 +246,16 @@ impl SequencerCore {
.try_for_each(|&id| self.store.delete_block_at_id(id)) .try_for_each(|&id| self.store.delete_block_at_id(id))
} }
pub async fn resubmit_pending_blocks(&self) -> Result<()> { pub fn get_pending_blocks(&self) -> Vec<Block> {
for res in self.store.get_pending_blocks() { self.store
let block = res?; .get_pending_blocks()
match block.bedrock_status { .flatten()
BedrockStatus::Pending => { .filter(|block| matches!(block.bedrock_status, BedrockStatus::Pending))
if let Some(client) = self.block_settlement_client.as_ref() { .collect()
client.submit_block_to_bedrock(&block).await?; }
log::info!("Posted block data to Bedrock");
} pub fn block_settlement_client(&self) -> Option<BlockSettlementClient> {
} self.block_settlement_client.clone()
_ => continue,
}
}
Ok(())
} }
} }
@ -326,6 +319,7 @@ mod tests {
initial_commitments: vec![], initial_commitments: vec![],
signing_key: *sequencer_sign_key_for_testing().value(), signing_key: *sequencer_sign_key_for_testing().value(),
bedrock_config: None, bedrock_config: None,
retry_pending_blocks_timeout_millis: 1000 * 60 * 4,
} }
} }

View File

@ -389,6 +389,7 @@ mod tests {
initial_commitments: vec![], initial_commitments: vec![],
signing_key: *sequencer_sign_key_for_testing().value(), signing_key: *sequencer_sign_key_for_testing().value(),
bedrock_config: None, bedrock_config: None,
retry_pending_blocks_timeout_millis: 1000 * 60 * 4,
} }
} }

View File

@ -4,7 +4,7 @@ use actix_web::dev::ServerHandle;
use anyhow::Result; use anyhow::Result;
use clap::Parser; use clap::Parser;
use common::rpc_primitives::RpcConfig; use common::rpc_primitives::RpcConfig;
use log::info; use log::{info, warn};
use sequencer_core::{SequencerCore, config::SequencerConfig}; use sequencer_core::{SequencerCore, config::SequencerConfig};
use sequencer_rpc::new_http_server; use sequencer_rpc::new_http_server;
use tokio::{sync::Mutex, task::JoinHandle}; use tokio::{sync::Mutex, task::JoinHandle};
@ -20,8 +20,14 @@ struct Args {
pub async fn startup_sequencer( pub async fn startup_sequencer(
app_config: SequencerConfig, app_config: SequencerConfig,
) -> Result<(ServerHandle, SocketAddr, JoinHandle<Result<()>>)> { ) -> Result<(
ServerHandle,
SocketAddr,
JoinHandle<Result<()>>,
JoinHandle<Result<()>>,
)> {
let block_timeout = app_config.block_create_timeout_millis; let block_timeout = app_config.block_create_timeout_millis;
let retry_pending_blocks_timeout = app_config.retry_pending_blocks_timeout_millis;
let port = app_config.port; let port = app_config.port;
let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config); let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config);
@ -39,8 +45,38 @@ pub async fn startup_sequencer(
let http_server_handle = http_server.handle(); let http_server_handle = http_server.handle();
tokio::spawn(http_server); tokio::spawn(http_server);
info!("Starting main sequencer loop"); info!("Starting pending block retry loop");
let seq_core_wrapped_for_block_retry = seq_core_wrapped.clone();
let retry_pending_blocks_handle = tokio::spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_millis(
retry_pending_blocks_timeout,
))
.await;
let (pending_blocks, block_settlement_client) = {
let sequencer_core = seq_core_wrapped_for_block_retry.lock().await;
let client = sequencer_core.block_settlement_client();
let pending_blocks = sequencer_core.get_pending_blocks();
(pending_blocks, client)
};
let Some(client) = block_settlement_client else {
continue;
};
for block in pending_blocks.iter() {
if let Err(e) = client.submit_block_to_bedrock(block).await {
warn!(
"Failed to resubmit block with id {} with error {}",
block.header.block_id, e
);
}
}
}
});
info!("Starting main sequencer loop");
let main_loop_handle = tokio::spawn(async move { let main_loop_handle = tokio::spawn(async move {
loop { loop {
tokio::time::sleep(std::time::Duration::from_millis(block_timeout)).await; tokio::time::sleep(std::time::Duration::from_millis(block_timeout)).await;
@ -61,7 +97,12 @@ pub async fn startup_sequencer(
} }
}); });
Ok((http_server_handle, addr, main_loop_handle)) Ok((
http_server_handle,
addr,
main_loop_handle,
retry_pending_blocks_handle,
))
} }
pub async fn main_runner() -> Result<()> { pub async fn main_runner() -> Result<()> {
@ -81,9 +122,26 @@ pub async fn main_runner() -> Result<()> {
} }
// ToDo: Add restart on failures // ToDo: Add restart on failures
let (_, _, main_loop_handle) = startup_sequencer(app_config).await?; let (_, _, main_loop_handle, retry_loop_handle) = startup_sequencer(app_config).await?;
main_loop_handle.await??; info!("Sequencer running. Monitoring concurrent tasks...");
tokio::select! {
res = main_loop_handle => {
match res {
Ok(inner_res) => warn!("Main loop exited unexpectedly: {:?}", inner_res),
Err(e) => warn!("Main loop task panicked: {:?}", e),
}
}
res = retry_loop_handle => {
match res {
Ok(inner_res) => warn!("Retry loop exited unexpectedly: {:?}", inner_res),
Err(e) => warn!("Retry loop task panicked: {:?}", e),
}
}
}
info!("Shutting down sequencer...");
Ok(()) Ok(())
} }