From 2218ad1c64ec4ed690b626c688894bf5a301aac3 Mon Sep 17 00:00:00 2001 From: Sergio Chouhy Date: Thu, 22 Jan 2026 02:01:02 -0300 Subject: [PATCH] add functionality to resubmit pending transactions and delete finalized blocks --- common/src/rpc_primitives/requests.rs | 9 ++++ common/src/sequencer_client.rs | 33 ++++++++++--- sequencer_core/src/block_settlement_client.rs | 12 +++-- sequencer_core/src/block_store.rs | 14 +++++- sequencer_core/src/lib.rs | 27 ++++++++++- sequencer_rpc/src/process.rs | 33 +++++++++---- storage/src/lib.rs | 47 ++++++++++++++++++- 7 files changed, 151 insertions(+), 24 deletions(-) diff --git a/common/src/rpc_primitives/requests.rs b/common/src/rpc_primitives/requests.rs index 71641936..feb534ea 100644 --- a/common/src/rpc_primitives/requests.rs +++ b/common/src/rpc_primitives/requests.rs @@ -73,6 +73,11 @@ pub struct GetProofForCommitmentRequest { #[derive(Serialize, Deserialize, Debug)] pub struct GetProgramIdsRequest {} +#[derive(Serialize, Deserialize, Debug)] +pub struct DeleteFinalizedBlockRequest { + pub block_id: u64, +} + parse_request!(HelloRequest); parse_request!(RegisterAccountRequest); parse_request!(SendTxRequest); @@ -87,6 +92,7 @@ parse_request!(GetAccountsNoncesRequest); parse_request!(GetProofForCommitmentRequest); parse_request!(GetAccountRequest); parse_request!(GetProgramIdsRequest); +parse_request!(DeleteFinalizedBlockRequest); #[derive(Serialize, Deserialize, Debug)] pub struct HelloResponse { @@ -216,3 +222,6 @@ pub struct GetInitialTestnetAccountsResponse { pub account_id: String, pub balance: u64, } + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct DeleteFinalizedBlockResponse; diff --git a/common/src/sequencer_client.rs b/common/src/sequencer_client.rs index 0cb03f6f..ea110e37 100644 --- a/common/src/sequencer_client.rs +++ b/common/src/sequencer_client.rs @@ -15,12 +15,13 @@ use crate::{ rpc_primitives::{ self, requests::{ - GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest, - GetAccountsNoncesResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse, - GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse, - GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, - GetProofForCommitmentResponse, GetTransactionByHashRequest, - GetTransactionByHashResponse, SendTxRequest, SendTxResponse, + DeleteFinalizedBlockRequest, GetAccountRequest, GetAccountResponse, + GetAccountsNoncesRequest, GetAccountsNoncesResponse, GetBlockRangeDataRequest, + GetBlockRangeDataResponse, GetInitialTestnetAccountsResponse, GetLastBlockRequest, + GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, + GetProofForCommitmentRequest, GetProofForCommitmentResponse, + GetTransactionByHashRequest, GetTransactionByHashResponse, SendTxRequest, + SendTxResponse, }, }, transaction::{EncodedTransaction, NSSATransaction}, @@ -347,4 +348,24 @@ impl SequencerClient { Ok(resp_deser) } + + pub async fn delete_finalized_block( + &self, + block_id: u64, + ) -> Result, SequencerClientError> { + let acc_req = DeleteFinalizedBlockRequest { block_id }; + + let req = serde_json::to_value(acc_req).unwrap(); + + let resp = self + .call_method_with_payload("delete_finalized_block", req) + .await + .unwrap(); + + let resp_deser = serde_json::from_value::(resp) + .unwrap() + .program_ids; + + Ok(resp_deser) + } } diff --git a/sequencer_core/src/block_settlement_client.rs b/sequencer_core/src/block_settlement_client.rs index 58f4d7f4..03e9f164 100644 --- a/sequencer_core/src/block_settlement_client.rs +++ b/sequencer_core/src/block_settlement_client.rs @@ -33,6 +33,10 @@ impl BlockSettlementClient { }) } + pub fn set_last_message_id(&mut self, msg_id: MsgId) { + self.last_message_id = msg_id; + } + /// Create and sign a transaction for inscribing data pub fn create_inscribe_tx(&self, data: Vec) -> (SignedMantleTx, MsgId) { let verifying_key_bytes = self.bedrock_signing_key.public_key().to_bytes(); @@ -73,17 +77,15 @@ impl BlockSettlementClient { (signed_mantle_tx, inscribe_op_id) } - /// Post a transaction to the node and wait for inclusion - pub async fn post_and_wait(&mut self, block_data: &HashableBlockData) -> Result { + /// Post a transaction to the node + pub async fn post_transaction(&self, block_data: &HashableBlockData) -> Result { let inscription_data = borsh::to_vec(&block_data)?; let (tx, new_msg_id) = self.create_inscribe_tx(inscription_data); // Post the transaction self.bedrock_client.post_transaction(tx).await?; - self.last_message_id = new_msg_id; - - Ok(block_data.block_id) + Ok(new_msg_id) } } diff --git a/sequencer_core/src/block_store.rs b/sequencer_core/src/block_store.rs index cd9aa194..e050f181 100644 --- a/sequencer_core/src/block_store.rs +++ b/sequencer_core/src/block_store.rs @@ -1,7 +1,11 @@ use std::{collections::HashMap, path::Path}; use anyhow::Result; -use common::{HashType, block::Block, transaction::EncodedTransaction}; +use common::{ + HashType, + block::{Block, BlockHash}, + transaction::EncodedTransaction, +}; use storage::RocksDBIO; pub struct SequencerBlockStore { @@ -56,6 +60,10 @@ impl SequencerBlockStore { Ok(()) } + pub fn delete_block_at_id(&mut self, block_id: u64) -> Result<()> { + Ok(self.dbio.delete_block(block_id)?) + } + /// Returns the transaction corresponding to the given hash, if it exists in the blockchain. pub fn get_transaction_by_hash(&self, hash: HashType) -> Option { let block_id = self.tx_hash_to_block_map.get(&hash); @@ -81,6 +89,10 @@ impl SequencerBlockStore { pub fn signing_key(&self) -> &nssa::PrivateKey { &self.signing_key } + + pub(crate) fn get_pending_blocks(&self) -> impl Iterator> { + self.dbio.get_all_blocks().map(|res| Ok(res?)) + } } pub(crate) fn block_to_transactions_map(block: &Block) -> HashMap { diff --git a/sequencer_core/src/lib.rs b/sequencer_core/src/lib.rs index 89cafc4c..115baf80 100644 --- a/sequencer_core/src/lib.rs +++ b/sequencer_core/src/lib.rs @@ -5,12 +5,13 @@ use anyhow::Result; use common::PINATA_BASE58; use common::{ HashType, - block::HashableBlockData, + block::{BedrockStatus, Block, BlockHash, HashableBlockData}, transaction::{EncodedTransaction, NSSATransaction}, }; use config::SequencerConfig; use log::warn; use mempool::{MemPool, MemPoolHandle}; +use nomos_core::mantle::ops::channel::MsgId; use serde::{Deserialize, Serialize}; use crate::{block_settlement_client::BlockSettlementClient, block_store::SequencerBlockStore}; @@ -149,7 +150,8 @@ impl SequencerCore { let block_data = self.produce_new_block_with_mempool_transactions()?; if let Some(block_settlement) = self.block_settlement_client.as_mut() { - block_settlement.post_and_wait(&block_data).await?; + let msg_id = block_settlement.post_transaction(&block_data).await?; + block_settlement.set_last_message_id(msg_id); log::info!("Posted block data to Bedrock"); } @@ -235,6 +237,27 @@ impl SequencerCore { pub fn sequencer_config(&self) -> &SequencerConfig { &self.sequencer_config } + + pub fn delete_finalized_block_from_db(&mut self, block_id: u64) -> Result<()> { + self.block_store.delete_block_at_id(block_id) + } + + pub async fn resubmit_pending_blocks(&self) -> Result<()> { + for res in self.block_store.get_pending_blocks() { + let block = res?; + match block.bedrock_status { + BedrockStatus::Pending => { + if let Some(block_settlement) = self.block_settlement_client.as_ref() { + let block_data: HashableBlockData = block.into(); + block_settlement.post_transaction(&block_data).await?; + log::info!("Posted block data to Bedrock"); + } + } + _ => continue, + } + } + Ok(()) + } } // TODO: Introduce type-safe wrapper around checked transaction, e.g. AuthenticatedTransaction diff --git a/sequencer_rpc/src/process.rs b/sequencer_rpc/src/process.rs index b89993f9..dff4cb60 100644 --- a/sequencer_rpc/src/process.rs +++ b/sequencer_rpc/src/process.rs @@ -11,15 +11,15 @@ use common::{ message::{Message, Request}, parser::RpcRequest, requests::{ - GetAccountBalanceRequest, GetAccountBalanceResponse, GetAccountRequest, - GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse, - GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest, - GetBlockRangeDataResponse, GetGenesisIdRequest, GetGenesisIdResponse, - GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse, - GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest, - GetProofForCommitmentResponse, GetTransactionByHashRequest, - GetTransactionByHashResponse, HelloRequest, HelloResponse, SendTxRequest, - SendTxResponse, + DeleteFinalizedBlockRequest, DeleteFinalizedBlockResponse, GetAccountBalanceRequest, + GetAccountBalanceResponse, GetAccountRequest, GetAccountResponse, + GetAccountsNoncesRequest, GetAccountsNoncesResponse, GetBlockDataRequest, + GetBlockDataResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse, + GetGenesisIdRequest, GetGenesisIdResponse, GetInitialTestnetAccountsRequest, + GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse, + GetProofForCommitmentRequest, GetProofForCommitmentResponse, + GetTransactionByHashRequest, GetTransactionByHashResponse, HelloRequest, HelloResponse, + SendTxRequest, SendTxResponse, }, }, transaction::{EncodedTransaction, NSSATransaction}, @@ -44,6 +44,7 @@ pub const GET_ACCOUNTS_NONCES: &str = "get_accounts_nonces"; pub const GET_ACCOUNT: &str = "get_account"; pub const GET_PROOF_FOR_COMMITMENT: &str = "get_proof_for_commitment"; pub const GET_PROGRAM_IDS: &str = "get_program_ids"; +pub const DELETE_FINALIZED_BLOCK: &str = "delete_finalized_block"; pub const HELLO_FROM_SEQUENCER: &str = "HELLO_FROM_SEQUENCER"; @@ -314,6 +315,19 @@ impl JsonHandler { respond(response) } + async fn delete_finalized_block(&self, request: Request) -> Result { + let delete_finalized_block_req = DeleteFinalizedBlockRequest::parse(Some(request.params))?; + let block_id = delete_finalized_block_req.block_id; + + self.sequencer_state + .lock() + .await + .delete_finalized_block_from_db(block_id)?; + + let response = DeleteFinalizedBlockResponse; + respond(response) + } + pub async fn process_request_internal(&self, request: Request) -> Result { match request.method.as_ref() { HELLO => self.process_temp_hello(request).await, @@ -329,6 +343,7 @@ impl JsonHandler { GET_TRANSACTION_BY_HASH => self.process_get_transaction_by_hash(request).await, GET_PROOF_FOR_COMMITMENT => self.process_get_proof_by_commitment(request).await, GET_PROGRAM_IDS => self.process_get_program_ids(request).await, + DELETE_FINALIZED_BLOCK => self.delete_finalized_block(request).await, _ => Err(RpcErr(RpcError::method_not_found(request.method))), } } diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 883684c2..498197b6 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -1,6 +1,6 @@ use std::{path::Path, sync::Arc}; -use common::block::Block; +use common::block::{Block, BlockHash}; use error::DbError; use rocksdb::{ BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, @@ -362,4 +362,49 @@ impl RocksDBIO { )) } } + + pub fn delete_block(&self, block_id: u64) -> DbResult<()> { + let cf_block = self.block_column(); + let key = borsh::to_vec(&block_id).map_err(|err| { + DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string())) + })?; + + if self + .db + .get_cf(&cf_block, &key) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))? + .is_none() + { + return Err(DbError::db_interaction_error( + "Block on this id not found".to_string(), + )); + } + + self.db + .delete_cf(&cf_block, key) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + + Ok(()) + } + + pub fn get_all_blocks(&self) -> impl Iterator> { + let cf_block = self.block_column(); + self.db + .iterator_cf(&cf_block, rocksdb::IteratorMode::Start) + .map(|res| { + let (_key, value) = res.map_err(|rerr| { + DbError::rocksdb_cast_message( + rerr, + Some("Failed to get key value pair".to_string()), + ) + })?; + + borsh::from_slice::(&value).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to deserialize block data".to_string()), + ) + }) + }) + } }