add functionality to resubmit pending transactions and delete finalized blocks

This commit is contained in:
Sergio Chouhy 2026-01-22 02:01:02 -03:00
parent f04b299b1a
commit 2218ad1c64
7 changed files with 151 additions and 24 deletions

View File

@ -73,6 +73,11 @@ pub struct GetProofForCommitmentRequest {
#[derive(Serialize, Deserialize, Debug)]
pub struct GetProgramIdsRequest {}
#[derive(Serialize, Deserialize, Debug)]
pub struct DeleteFinalizedBlockRequest {
pub block_id: u64,
}
parse_request!(HelloRequest);
parse_request!(RegisterAccountRequest);
parse_request!(SendTxRequest);
@ -87,6 +92,7 @@ parse_request!(GetAccountsNoncesRequest);
parse_request!(GetProofForCommitmentRequest);
parse_request!(GetAccountRequest);
parse_request!(GetProgramIdsRequest);
parse_request!(DeleteFinalizedBlockRequest);
#[derive(Serialize, Deserialize, Debug)]
pub struct HelloResponse {
@ -216,3 +222,6 @@ pub struct GetInitialTestnetAccountsResponse {
pub account_id: String,
pub balance: u64,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct DeleteFinalizedBlockResponse;

View File

@ -15,12 +15,13 @@ use crate::{
rpc_primitives::{
self,
requests::{
GetAccountRequest, GetAccountResponse, GetAccountsNoncesRequest,
GetAccountsNoncesResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse,
GetInitialTestnetAccountsResponse, GetLastBlockRequest, GetLastBlockResponse,
GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest,
GetProofForCommitmentResponse, GetTransactionByHashRequest,
GetTransactionByHashResponse, SendTxRequest, SendTxResponse,
DeleteFinalizedBlockRequest, GetAccountRequest, GetAccountResponse,
GetAccountsNoncesRequest, GetAccountsNoncesResponse, GetBlockRangeDataRequest,
GetBlockRangeDataResponse, GetInitialTestnetAccountsResponse, GetLastBlockRequest,
GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse,
GetProofForCommitmentRequest, GetProofForCommitmentResponse,
GetTransactionByHashRequest, GetTransactionByHashResponse, SendTxRequest,
SendTxResponse,
},
},
transaction::{EncodedTransaction, NSSATransaction},
@ -347,4 +348,24 @@ impl SequencerClient {
Ok(resp_deser)
}
pub async fn delete_finalized_block(
&self,
block_id: u64,
) -> Result<HashMap<String, ProgramId>, SequencerClientError> {
let acc_req = DeleteFinalizedBlockRequest { block_id };
let req = serde_json::to_value(acc_req).unwrap();
let resp = self
.call_method_with_payload("delete_finalized_block", req)
.await
.unwrap();
let resp_deser = serde_json::from_value::<GetProgramIdsResponse>(resp)
.unwrap()
.program_ids;
Ok(resp_deser)
}
}

View File

@ -33,6 +33,10 @@ impl BlockSettlementClient {
})
}
pub fn set_last_message_id(&mut self, msg_id: MsgId) {
self.last_message_id = msg_id;
}
/// Create and sign a transaction for inscribing data
pub fn create_inscribe_tx(&self, data: Vec<u8>) -> (SignedMantleTx, MsgId) {
let verifying_key_bytes = self.bedrock_signing_key.public_key().to_bytes();
@ -73,17 +77,15 @@ impl BlockSettlementClient {
(signed_mantle_tx, inscribe_op_id)
}
/// Post a transaction to the node and wait for inclusion
pub async fn post_and_wait(&mut self, block_data: &HashableBlockData) -> Result<u64> {
/// Post a transaction to the node
pub async fn post_transaction(&self, block_data: &HashableBlockData) -> Result<MsgId> {
let inscription_data = borsh::to_vec(&block_data)?;
let (tx, new_msg_id) = self.create_inscribe_tx(inscription_data);
// Post the transaction
self.bedrock_client.post_transaction(tx).await?;
self.last_message_id = new_msg_id;
Ok(block_data.block_id)
Ok(new_msg_id)
}
}

View File

@ -1,7 +1,11 @@
use std::{collections::HashMap, path::Path};
use anyhow::Result;
use common::{HashType, block::Block, transaction::EncodedTransaction};
use common::{
HashType,
block::{Block, BlockHash},
transaction::EncodedTransaction,
};
use storage::RocksDBIO;
pub struct SequencerBlockStore {
@ -56,6 +60,10 @@ impl SequencerBlockStore {
Ok(())
}
pub fn delete_block_at_id(&mut self, block_id: u64) -> Result<()> {
Ok(self.dbio.delete_block(block_id)?)
}
/// Returns the transaction corresponding to the given hash, if it exists in the blockchain.
pub fn get_transaction_by_hash(&self, hash: HashType) -> Option<EncodedTransaction> {
let block_id = self.tx_hash_to_block_map.get(&hash);
@ -81,6 +89,10 @@ impl SequencerBlockStore {
pub fn signing_key(&self) -> &nssa::PrivateKey {
&self.signing_key
}
pub(crate) fn get_pending_blocks(&self) -> impl Iterator<Item = Result<Block>> {
self.dbio.get_all_blocks().map(|res| Ok(res?))
}
}
pub(crate) fn block_to_transactions_map(block: &Block) -> HashMap<HashType, u64> {

View File

@ -5,12 +5,13 @@ use anyhow::Result;
use common::PINATA_BASE58;
use common::{
HashType,
block::HashableBlockData,
block::{BedrockStatus, Block, BlockHash, HashableBlockData},
transaction::{EncodedTransaction, NSSATransaction},
};
use config::SequencerConfig;
use log::warn;
use mempool::{MemPool, MemPoolHandle};
use nomos_core::mantle::ops::channel::MsgId;
use serde::{Deserialize, Serialize};
use crate::{block_settlement_client::BlockSettlementClient, block_store::SequencerBlockStore};
@ -149,7 +150,8 @@ impl SequencerCore {
let block_data = self.produce_new_block_with_mempool_transactions()?;
if let Some(block_settlement) = self.block_settlement_client.as_mut() {
block_settlement.post_and_wait(&block_data).await?;
let msg_id = block_settlement.post_transaction(&block_data).await?;
block_settlement.set_last_message_id(msg_id);
log::info!("Posted block data to Bedrock");
}
@ -235,6 +237,27 @@ impl SequencerCore {
pub fn sequencer_config(&self) -> &SequencerConfig {
&self.sequencer_config
}
pub fn delete_finalized_block_from_db(&mut self, block_id: u64) -> Result<()> {
self.block_store.delete_block_at_id(block_id)
}
pub async fn resubmit_pending_blocks(&self) -> Result<()> {
for res in self.block_store.get_pending_blocks() {
let block = res?;
match block.bedrock_status {
BedrockStatus::Pending => {
if let Some(block_settlement) = self.block_settlement_client.as_ref() {
let block_data: HashableBlockData = block.into();
block_settlement.post_transaction(&block_data).await?;
log::info!("Posted block data to Bedrock");
}
}
_ => continue,
}
}
Ok(())
}
}
// TODO: Introduce type-safe wrapper around checked transaction, e.g. AuthenticatedTransaction

View File

@ -11,15 +11,15 @@ use common::{
message::{Message, Request},
parser::RpcRequest,
requests::{
GetAccountBalanceRequest, GetAccountBalanceResponse, GetAccountRequest,
GetAccountResponse, GetAccountsNoncesRequest, GetAccountsNoncesResponse,
GetBlockDataRequest, GetBlockDataResponse, GetBlockRangeDataRequest,
GetBlockRangeDataResponse, GetGenesisIdRequest, GetGenesisIdResponse,
GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse,
GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest,
GetProofForCommitmentResponse, GetTransactionByHashRequest,
GetTransactionByHashResponse, HelloRequest, HelloResponse, SendTxRequest,
SendTxResponse,
DeleteFinalizedBlockRequest, DeleteFinalizedBlockResponse, GetAccountBalanceRequest,
GetAccountBalanceResponse, GetAccountRequest, GetAccountResponse,
GetAccountsNoncesRequest, GetAccountsNoncesResponse, GetBlockDataRequest,
GetBlockDataResponse, GetBlockRangeDataRequest, GetBlockRangeDataResponse,
GetGenesisIdRequest, GetGenesisIdResponse, GetInitialTestnetAccountsRequest,
GetLastBlockRequest, GetLastBlockResponse, GetProgramIdsRequest, GetProgramIdsResponse,
GetProofForCommitmentRequest, GetProofForCommitmentResponse,
GetTransactionByHashRequest, GetTransactionByHashResponse, HelloRequest, HelloResponse,
SendTxRequest, SendTxResponse,
},
},
transaction::{EncodedTransaction, NSSATransaction},
@ -44,6 +44,7 @@ pub const GET_ACCOUNTS_NONCES: &str = "get_accounts_nonces";
pub const GET_ACCOUNT: &str = "get_account";
pub const GET_PROOF_FOR_COMMITMENT: &str = "get_proof_for_commitment";
pub const GET_PROGRAM_IDS: &str = "get_program_ids";
pub const DELETE_FINALIZED_BLOCK: &str = "delete_finalized_block";
pub const HELLO_FROM_SEQUENCER: &str = "HELLO_FROM_SEQUENCER";
@ -314,6 +315,19 @@ impl JsonHandler {
respond(response)
}
async fn delete_finalized_block(&self, request: Request) -> Result<Value, RpcErr> {
let delete_finalized_block_req = DeleteFinalizedBlockRequest::parse(Some(request.params))?;
let block_id = delete_finalized_block_req.block_id;
self.sequencer_state
.lock()
.await
.delete_finalized_block_from_db(block_id)?;
let response = DeleteFinalizedBlockResponse;
respond(response)
}
pub async fn process_request_internal(&self, request: Request) -> Result<Value, RpcErr> {
match request.method.as_ref() {
HELLO => self.process_temp_hello(request).await,
@ -329,6 +343,7 @@ impl JsonHandler {
GET_TRANSACTION_BY_HASH => self.process_get_transaction_by_hash(request).await,
GET_PROOF_FOR_COMMITMENT => self.process_get_proof_by_commitment(request).await,
GET_PROGRAM_IDS => self.process_get_program_ids(request).await,
DELETE_FINALIZED_BLOCK => self.delete_finalized_block(request).await,
_ => Err(RpcErr(RpcError::method_not_found(request.method))),
}
}

View File

@ -1,6 +1,6 @@
use std::{path::Path, sync::Arc};
use common::block::Block;
use common::block::{Block, BlockHash};
use error::DbError;
use rocksdb::{
BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options,
@ -362,4 +362,49 @@ impl RocksDBIO {
))
}
}
pub fn delete_block(&self, block_id: u64) -> DbResult<()> {
let cf_block = self.block_column();
let key = borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string()))
})?;
if self
.db
.get_cf(&cf_block, &key)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?
.is_none()
{
return Err(DbError::db_interaction_error(
"Block on this id not found".to_string(),
));
}
self.db
.delete_cf(&cf_block, key)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
pub fn get_all_blocks(&self) -> impl Iterator<Item = DbResult<Block>> {
let cf_block = self.block_column();
self.db
.iterator_cf(&cf_block, rocksdb::IteratorMode::Start)
.map(|res| {
let (_key, value) = res.map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
Some("Failed to get key value pair".to_string()),
)
})?;
borsh::from_slice::<Block>(&value).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to deserialize block data".to_string()),
)
})
})
}
}