feat(sequencer): make bridge deposits fault-tolerant

This commit is contained in:
Daniil Polyakov 2026-05-29 22:44:48 +03:00
parent 1e4e82a8ec
commit 15521eedc2
10 changed files with 439 additions and 26 deletions

1
Cargo.lock generated
View File

@ -8946,6 +8946,7 @@ dependencies = [
"nssa",
"nssa_core",
"rand 0.8.5",
"risc0-zkvm",
"serde",
"serde_json",
"storage",

Binary file not shown.

View File

@ -43,6 +43,7 @@ async fn public_bridge_deposit_invocation_is_dropped() -> anyhow::Result<()> {
vec![bridge_account_id, recipient_vault_id],
vec![],
bridge_core::Instruction::Deposit {
l1_deposit_op_id: [0_u8; 32],
vault_program_id,
recipient_id,
amount: 1,
@ -129,6 +130,7 @@ async fn private_bridge_deposit_invocation_is_dropped() -> anyhow::Result<()> {
// Serialize the bridge deposit instruction
let instruction = Program::serialize_instruction(bridge_core::Instruction::Deposit {
l1_deposit_op_id: [0_u8; 32],
vault_program_id,
recipient_id,
amount: 1,

View File

@ -33,6 +33,7 @@ fn main() {
let chained_calls = match instruction {
Instruction::Deposit {
l1_deposit_op_id: _,
vault_program_id,
recipient_id,
amount,

View File

@ -12,6 +12,9 @@ pub enum Instruction {
/// - Bridge PDA account
/// - Recipient vault PDA account
Deposit {
/// Deposit OP ID from L1, stored here to pin each [`Deposit`](Instruction::Deposit) to a
/// Deposit Event on L1.
l1_deposit_op_id: [u8; 32],
vault_program_id: ProgramId,
recipient_id: AccountId,
amount: u128,

View File

@ -34,6 +34,7 @@ borsh.workspace = true
bytesize.workspace = true
hex.workspace = true
url.workspace = true
risc0-zkvm.workspace = true
[features]
default = []

View File

@ -10,7 +10,7 @@ use log::info;
use logos_blockchain_zone_sdk::sequencer::SequencerCheckpoint;
use nssa::V03State;
pub use storage::DbResult;
use storage::sequencer::RocksDBIO;
use storage::sequencer::{RocksDBIO, sequencer_cells::PendingDepositEventRecord};
pub struct SequencerStore {
dbio: Arc<RocksDBIO>,
@ -165,6 +165,27 @@ impl SequencerStore {
self.dbio.put_zone_sdk_checkpoint_bytes(&bytes)?;
Ok(())
}
/// Returns every deposit event record currently stored in the pending-deposit cell.
///
/// This delegates directly to the underlying DB handle and applies no filtering, so the result
/// also contains records whose `submitted_in_block_id` is already set; callers that only want
/// not-yet-submitted events must filter on that field themselves.
pub fn get_unfulfilled_deposit_events(&self) -> DbResult<Vec<PendingDepositEventRecord>> {
self.dbio.get_pending_deposit_events()
}
/// Records `submitted_block_id` on every stored deposit event whose operation ID is listed in
/// `deposit_op_ids`.
///
/// Thin wrapper over the DB handle. Returns the number of stored records that were updated.
pub fn mark_unfulfilled_deposit_events_submitted(
&self,
deposit_op_ids: &[HashType],
submitted_block_id: u64,
) -> DbResult<usize> {
self.dbio
.mark_pending_deposit_events_submitted(deposit_op_ids, submitted_block_id)
}
/// Drops stored deposit events that were submitted in a block with ID less than or equal to
/// `finalized_block_id`.
///
/// Thin wrapper over the DB handle. Returns the number of records removed.
pub fn remove_fulfilled_unfulfilled_deposit_events_up_to_block(
&self,
finalized_block_id: u64,
) -> DbResult<usize> {
self.dbio
.remove_fulfilled_pending_deposit_events_up_to_block(finalized_block_id)
}
}
pub(crate) fn block_to_transactions_map(block: &Block) -> HashMap<HashType, u64> {

View File

@ -1,4 +1,4 @@
use std::{path::Path, time::Instant};
use std::{path::Path, sync::Arc, time::Instant};
use anyhow::{Context as _, Result, anyhow};
use borsh::BorshDeserialize;
@ -16,6 +16,7 @@ pub use mock::SequencerCoreWithMockClients;
use nssa::{AccountId, PublicTransaction, program::Program, public_transaction::Message};
use nssa_core::GENESIS_BLOCK_ID;
pub use storage::error::DbError;
use storage::sequencer::sequencer_cells::PendingDepositEventRecord;
use crate::{
block_publisher::{BlockPublisherTrait, ZoneSdkPublisher},
@ -147,26 +148,72 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
let dbio_for_finalized = store.dbio();
let on_finalized_block: block_publisher::FinalizedBlockSink = Box::new(move |block_id| {
// NOTE: Theoretically Zone SDK may report finalization happening multiple times for the
// same block. In practice this is very unlikely to happen. For that to
// happen Sequencer should crash between receiving Finalized and Checkpoint events while
// these events happen very fast (because Checkpoints are generated by Zone SDK
// locally).
if let Err(err) = dbio_for_finalized.clean_pending_blocks_up_to(block_id) {
error!("Failed to mark pending blocks finalized up to {block_id}: {err:#}");
}
match dbio_for_finalized.remove_fulfilled_pending_deposit_events_up_to_block(block_id) {
Ok(0) => {}
Ok(removed) => {
info!(
"Removed {removed} fulfilled pending deposit events up to finalized block {block_id}"
);
}
Err(err) => {
error!(
"Failed to remove fulfilled pending deposit events up to block {block_id}: {err:#}"
);
}
}
});
let (mempool, mempool_handle) = MemPool::new(config.mempool_max_size);
replay_unfulfilled_deposit_events(&store, mempool_handle.clone());
let mempool_handle_for_deposit = mempool_handle.clone();
let dbio_for_deposit = store.dbio();
let on_deposit_event: block_publisher::OnDepositEventSink = Box::new(move |deposit| {
// NOTE: Theoretically Zone SDK may report multiple identical deposits. In practice this
// is very unlikely to happen. For that to happen Sequencer should crash
// between receiving Deposit and Checkpoint events while these events happen
// very fast (because Checkpoints are generated by Zone SDK locally).
let dbio_for_deposit = Arc::clone(&dbio_for_deposit);
let mempool_handle_for_deposit = mempool_handle_for_deposit.clone();
Box::pin(async move {
info!(
"Observed Bedrock Deposit event with id: {:?}",
hex::encode(deposit.op_id)
);
let tx = match build_bridge_deposit_tx(&deposit) {
let id_hex = hex::encode(deposit.op_id);
info!("Observed Bedrock Deposit event with id: {id_hex}");
let event_record = pending_deposit_event_record(&deposit);
match dbio_for_deposit.add_pending_deposit_event(event_record.clone()) {
Ok(true) => {}
Ok(false) => {
info!(
"Deposit event {id_hex} already persisted as unfulfilled, skipping duplicate enqueue",
);
return;
}
Err(err) => {
error!(
"Failed to persist unfulfilled deposit event {id_hex} before enqueue: {err:#}. Deposit will be lost.",
);
return;
}
}
let tx = match build_bridge_deposit_tx_from_event(&event_record) {
Ok(tx) => tx,
Err(err) => {
warn!(
"Skipping finalized Bedrock deposit event due to tx build failure: {err:#}"
error!(
"Failed to build transaction from Bedrock deposit event {id_hex}: {err:#}. Deposit will be lost.",
);
return;
}
@ -177,7 +224,7 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
.await
{
error!(
"Failed to queue sequencer transaction built from finalized Bedrock event: {err:#}"
"Failed to queue sequencer transaction built from finalized Bedrock event: {err:#}. Deposit will be lost."
);
}
})
@ -199,8 +246,11 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
// genesis block so the indexer can find the channel start. After the
// first publish, zone-sdk's checkpoint persistence covers further
// restarts.
if is_fresh_start && let Err(err) = block_publisher.publish_block(&genesis_block).await {
error!("Failed to publish genesis block: {err:#}");
if is_fresh_start {
block_publisher
.publish_block(&genesis_block)
.await
.expect("Failed to publish genesis block");
}
let sequencer_core = Self {
@ -217,30 +267,47 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
/// Produces a new block from mempool transactions and publishes it via zone-sdk.
pub async fn produce_new_block(&mut self) -> Result<u64> {
let block = self
let block_with_meta = self
.build_block_from_mempool()
.context("Failed to build block from mempool transactions")?;
let BlockWithMeta {
block,
deposit_event_ids,
} = block_with_meta;
// TODO: Remove msg_id from store.update — it is no longer needed now that
// zone-sdk manages L1 settlement state via its own checkpoint.
let placeholder_msg_id = [0_u8; 32];
if let Err(err) = self.block_publisher.publish_block(&block).await {
error!("Failed to publish block to Bedrock with error: {err:#}");
}
self.block_publisher
.publish_block(&block)
.await
.context("Failed to publish block to Bedrock")?;
self.store.update(&block, placeholder_msg_id, &self.state)?;
let updated_deposits = self
.store
.mark_unfulfilled_deposit_events_submitted(&deposit_event_ids, block.header.block_id)?;
if updated_deposits > 0 {
info!(
"Marked {updated_deposits} pending deposit events as submitted in block {}",
block.header.block_id
);
}
Ok(self.chain_height)
}
/// Builds a new block from transactions in the mempool.
/// Does NOT publish or store the block — the caller is responsible for that.
pub fn build_block_from_mempool(&mut self) -> Result<Block> {
fn build_block_from_mempool(&mut self) -> Result<BlockWithMeta> {
let now = Instant::now();
let new_block_height = self.next_block_id();
let mut valid_transactions = vec![];
let mut valid_transactions = Vec::new();
let mut deposit_event_ids = Vec::new();
let max_block_size = usize::try_from(self.sequencer_config.max_block_size.as_u64())
.expect("`max_block_size` should fit into usize");
@ -311,6 +378,20 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
let NSSATransaction::Public(public_tx) = &tx else {
panic!("Sequencer may only generate Public transactions, found {tx:#?}");
};
if public_tx.message.program_id == Program::bridge().id() {
let instruction: bridge_core::Instruction =
risc0_zkvm::serde::from_slice(&public_tx.message.instruction_data)
.context("Failed to deserialize bridge instruction")?;
match instruction {
bridge_core::Instruction::Deposit {
l1_deposit_op_id, ..
} => {
deposit_event_ids.push(HashType(l1_deposit_op_id));
}
}
}
self.state
.transition_from_public_transaction(
public_tx,
@ -355,7 +436,11 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
hashable_data.transactions.len(),
now.elapsed().as_secs()
);
Ok(block)
Ok(BlockWithMeta {
block,
deposit_event_ids,
})
}
pub const fn state(&self) -> &nssa::V03State {
@ -411,6 +496,60 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
}
}
/// A freshly built block together with bookkeeping data gathered while building it.
struct BlockWithMeta {
/// The block assembled from mempool transactions.
block: Block,
/// L1 deposit operation IDs of the bridge deposit transactions included in `block`.
deposit_event_ids: Vec<HashType>,
}
/// Re-queues persisted deposit events that have not yet been recorded as submitted in a block.
///
/// Loads all stored deposit event records, keeps only those without a `submitted_in_block_id`,
/// and, if any remain, spawns a background task that rebuilds the bridge deposit transaction
/// for each one and pushes it into the mempool with [`TransactionOrigin::Sequencer`].
///
/// Events whose transaction cannot be rebuilt are skipped with a warning; the first mempool
/// push failure stops the replay.
///
/// # Panics
///
/// Panics if the stored deposit events cannot be loaded from the database.
fn replay_unfulfilled_deposit_events(
    store: &SequencerStore,
    mempool_handle: MemPoolHandle<(TransactionOrigin, NSSATransaction)>,
) {
    let mut unsubmitted = store
        .get_unfulfilled_deposit_events()
        .expect("Failed to load unfulfilled deposit events");
    // Events already assigned to a block only wait for finalization; they must not be re-queued.
    unsubmitted.retain(|event| event.submitted_in_block_id.is_none());

    if unsubmitted.is_empty() {
        return;
    }

    info!(
        "Found {} unfulfilled deposit events in DB, re-queueing",
        unsubmitted.len()
    );

    // The mempool push is async, so the replay runs as a separate task.
    tokio::spawn(async move {
        for event in unsubmitted {
            let push_outcome = match build_bridge_deposit_tx_from_event(&event) {
                Err(err) => {
                    warn!(
                        "Skipping replay of pending deposit event {} due to tx build failure: {err:#}",
                        hex::encode(event.deposit_op_id)
                    );
                    continue;
                }
                Ok(tx) => {
                    mempool_handle
                        .push((TransactionOrigin::Sequencer, tx))
                        .await
                }
            };

            if let Err(err) = push_outcome {
                error!(
                    "Failed to re-queue unfulfilled deposit event {} from DB: {err:#}",
                    hex::encode(event.deposit_op_id)
                );
                break;
            }
        }
    });
}
/// Builds the initial genesis state from `testnet_initial_state` plus configured genesis
/// transactions. Returns the final state and the list of [`NSSATransaction`]s that should be
/// committed to the genesis block so external observers can replay them.
@ -485,10 +624,22 @@ fn build_supply_bridge_account_genesis_transaction(balance: u128) -> PublicTrans
PublicTransaction::new(message, witness_set)
}
fn build_bridge_deposit_tx(
/// Converts a deposit reported by the Zone SDK into the record persisted in the sequencer DB.
///
/// The resulting record always starts with `submitted_in_block_id` unset.
fn pending_deposit_event_record(
    deposit: &logos_blockchain_zone_sdk::state::DepositInfo,
) -> PendingDepositEventRecord {
    let deposit_op_id = HashType(deposit.op_id);
    let source_tx_hash = HashType(deposit.tx_hash.0);
    // The record owns its metadata bytes, hence the clone.
    let metadata = deposit.metadata.clone().into();

    PendingDepositEventRecord {
        deposit_op_id,
        source_tx_hash,
        amount: deposit.amount,
        metadata,
        submitted_in_block_id: None,
    }
}
fn build_bridge_deposit_tx_from_event(
event: &PendingDepositEventRecord,
) -> Result<NSSATransaction> {
let metadata = DepositMetadata::decode(&deposit.metadata)
let metadata = DepositMetadata::decode(&event.metadata)
.context("Failed to decode finalized Bedrock deposit metadata")?;
let bridge_program_id = Program::bridge().id();
@ -501,9 +652,10 @@ fn build_bridge_deposit_tx(
vec![nssa::system_bridge_account_id(), recipient_vault_id],
vec![],
bridge_core::Instruction::Deposit {
l1_deposit_op_id: event.deposit_op_id.0,
vault_program_id,
recipient_id: metadata.recipient_id,
amount: u128::from(deposit.amount),
amount: u128::from(event.amount),
},
)
.context("Failed to build bridge deposit message")?;
@ -552,6 +704,7 @@ mod tests {
};
use logos_blockchain_core::mantle::ops::channel::ChannelId;
use mempool::MemPoolHandle;
use storage::sequencer::sequencer_cells::PendingDepositEventRecord;
use tempfile::tempdir;
use testnet_initial_state::{initial_accounts, initial_pub_accounts_private_keys};
@ -563,6 +716,11 @@ mod tests {
mock::SequencerCoreWithMockClients,
};
/// Test-only helper used to produce Borsh-encoded deposit metadata bytes.
// NOTE(review): presumably mirrors the layout the production deposit metadata decoder expects —
// confirm if that type gains fields.
#[derive(borsh::BorshSerialize)]
struct DepositMetadataForEncoding {
recipient_id: nssa::AccountId,
}
fn setup_sequencer_config() -> SequencerConfig {
let tempdir = tempfile::tempdir().unwrap();
let home = tempdir.path().to_path_buf();
@ -620,6 +778,35 @@ mod tests {
(sequencer, mempool_handle)
}
/// Returns `true` when `tx` is a public bridge-program transaction carrying a `Deposit`
/// instruction with the given L1 operation ID and amount.
///
/// Any other transaction kind, a different target program, or instruction data that fails to
/// deserialize yields `false`.
fn tx_is_bridge_deposit(
    tx: &NSSATransaction,
    deposit_op_id: [u8; 32],
    expected_amount: u64,
) -> bool {
    let NSSATransaction::Public(public_tx) = tx else {
        return false;
    };

    let message = &public_tx.message;
    let targets_bridge = message.program_id == nssa::program::Program::bridge().id();
    if !targets_bridge {
        return false;
    }

    let decoded: Result<bridge_core::Instruction, _> =
        risc0_zkvm::serde::from_slice(&message.instruction_data);
    let Ok(instruction) = decoded else {
        return false;
    };

    let wanted_amount = u128::from(expected_amount);
    matches!(
        instruction,
        bridge_core::Instruction::Deposit {
            l1_deposit_op_id,
            amount,
            ..
        } if l1_deposit_op_id == deposit_op_id && amount == wanted_amount
    )
}
#[tokio::test]
async fn start_from_config() {
let config = setup_sequencer_config();
@ -690,6 +877,69 @@ mod tests {
let _ = SequencerCoreWithMockClients::start_from_config(config).await;
}
/// Verifies that a deposit event persisted in the DB without a submission block is pushed back
/// into the mempool when the sequencer starts, and that the record stays in the DB afterwards.
#[tokio::test]
async fn start_from_config_replays_unfulfilled_deposit_events_from_db() {
let config = setup_sequencer_config();
let deposit_op_id = [13_u8; 32];
let expected_amount = 1_u64;
let recipient_id = initial_accounts()[0].account_id;
// First start: run the sequencer once against this config, then drop it before touching the
// DB directly.
{
let (_sequencer, _mempool_handle) =
SequencerCoreWithMockClients::start_from_config(config.clone()).await;
}
let pending_event = PendingDepositEventRecord {
deposit_op_id: HashType(deposit_op_id),
source_tx_hash: HashType([7_u8; 32]),
amount: expected_amount,
metadata: borsh::to_vec(&DepositMetadataForEncoding { recipient_id }).unwrap(),
submitted_in_block_id: None,
};
// Insert the pending event straight into the store, bypassing the deposit event callback.
{
let signing_key = nssa::PrivateKey::try_new(config.signing_key).unwrap();
let store = SequencerStore::open_db(&config.home.join("rocksdb"), signing_key).unwrap();
let inserted = store
.dbio()
.add_pending_deposit_event(pending_event)
.unwrap();
assert!(inserted);
}
// Second start: the replay is expected to pick the stored event up.
let (mut sequencer, _mempool_handle) =
SequencerCoreWithMockClients::start_from_config(config).await;
// The replay runs in a spawned task, so poll the mempool until the transaction shows up or
// the timeout expires.
let (origin, tx) = tokio::time::timeout(Duration::from_secs(5), async {
loop {
if let Some((origin, tx)) = sequencer.mempool.pop() {
return (origin, tx);
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
})
.await
.expect("Timed out waiting for pending deposit event to be replayed into mempool");
match origin {
TransactionOrigin::Sequencer => {}
TransactionOrigin::User => {
panic!("Unexpected user transaction in empty mempool replay test")
}
}
assert!(tx_is_bridge_deposit(&tx, deposit_op_id, expected_amount));
// Replaying alone must not mark the event as submitted or remove it from the DB.
let pending_events = sequencer.store.get_unfulfilled_deposit_events().unwrap();
let replayed_event = pending_events
.into_iter()
.find(|event| event.deposit_op_id == HashType(deposit_op_id))
.expect("Pending deposit event should remain in DB until included in a block");
assert!(replayed_event.submitted_in_block_id.is_none());
}
#[test]
fn transaction_pre_check_pass() {
let tx = common::test_utils::produce_dummy_empty_transaction();

View File

@ -1,6 +1,9 @@
use std::{path::Path, sync::Arc};
use common::block::{BedrockStatus, Block, BlockMeta, MantleMsgId};
use common::{
HashType,
block::{BedrockStatus, Block, BlockMeta, MantleMsgId},
};
use nssa::V03State;
use rocksdb::{
BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, WriteBatch,
@ -12,7 +15,9 @@ use crate::{
error::DbError,
sequencer::sequencer_cells::{
LastFinalizedBlockIdCell, LatestBlockMetaCellOwned, LatestBlockMetaCellRef,
NSSAStateCellOwned, NSSAStateCellRef, ZoneSdkCheckpointCellOwned, ZoneSdkCheckpointCellRef,
NSSAStateCellOwned, NSSAStateCellRef, PendingDepositEventRecord,
PendingDepositEventsCellOwned, PendingDepositEventsCellRef, ZoneSdkCheckpointCellOwned,
ZoneSdkCheckpointCellRef,
},
};
@ -24,6 +29,9 @@ pub const DB_META_LAST_FINALIZED_BLOCK_ID: &str = "last_finalized_block_id";
pub const DB_META_LATEST_BLOCK_META_KEY: &str = "latest_block_meta";
/// Key base for storing the zone-sdk sequencer checkpoint (opaque bytes).
pub const DB_META_ZONE_SDK_CHECKPOINT_KEY: &str = "zone_sdk_checkpoint";
/// Key base for storing queued deposit events that were not yet
/// fulfilled on L2.
pub const DB_META_PENDING_DEPOSIT_EVENTS_KEY: &str = "pending_deposit_events";
/// Key base for storing the NSSA state.
pub const DB_NSSA_STATE_KEY: &str = "nssa_state";
@ -239,6 +247,72 @@ impl RocksDBIO {
self.put(&ZoneSdkCheckpointCellRef(bytes), ())
}
pub fn get_pending_deposit_events(&self) -> DbResult<Vec<PendingDepositEventRecord>> {
Ok(self
.get_opt::<PendingDepositEventsCellOwned>(())?
.map_or_else(Vec::new, |cell| cell.0))
}
/// Overwrites the stored pending deposit event list with `records`.
fn put_pending_deposit_events(&self, records: &[PendingDepositEventRecord]) -> DbResult<()> {
self.put(&PendingDepositEventsCellRef(records), ())
}
/// Appends `event` to the stored pending deposit events unless one with the same
/// `deposit_op_id` is already present.
///
/// Returns `Ok(true)` when the event was stored and `Ok(false)` when it was a duplicate (in
/// which case nothing is written).
pub fn add_pending_deposit_event(&self, event: PendingDepositEventRecord) -> DbResult<bool> {
    let mut pending = self.get_pending_deposit_events()?;

    let is_duplicate = pending
        .iter()
        .any(|existing| existing.deposit_op_id == event.deposit_op_id);
    if is_duplicate {
        return Ok(false);
    }

    pending.push(event);
    self.put_pending_deposit_events(&pending).map(|()| true)
}
/// Sets `submitted_in_block_id` to `submitted_block_id` on every stored pending deposit event
/// whose `deposit_op_id` appears in `deposit_op_ids`.
///
/// Returns the number of records updated. The stored list is rewritten only when at least one
/// record changed. An empty `deposit_op_ids` slice returns `Ok(0)` immediately without touching
/// the database, so blocks that contain no deposits do not pay for reading and deserializing
/// the whole pending list.
// NOTE(review): this is a read-modify-write of the whole list with no lock visible in this
// block — confirm concurrent callers cannot interleave with it.
pub fn mark_pending_deposit_events_submitted(
    &self,
    deposit_op_ids: &[HashType],
    submitted_block_id: u64,
) -> DbResult<usize> {
    // Nothing can match an empty ID list; skip the DB round-trip entirely.
    if deposit_op_ids.is_empty() {
        return Ok(0);
    }

    let mut records = self.get_pending_deposit_events()?;
    let mut updated: usize = 0;

    for record in records
        .iter_mut()
        .filter(|record| deposit_op_ids.contains(&record.deposit_op_id))
    {
        record.submitted_in_block_id = Some(submitted_block_id);
        updated = updated.saturating_add(1);
    }

    // Avoid a redundant write when no stored record matched.
    if updated > 0 {
        self.put_pending_deposit_events(&records)?;
    }

    Ok(updated)
}
/// Removes stored deposit events that were submitted in a block with ID less than or equal to
/// `finalized_block_id`.
///
/// Events that have no submission block yet, or were submitted in a later block, are kept.
/// Returns the number of removed records; the stored list is rewritten only if that number is
/// non-zero.
pub fn remove_fulfilled_pending_deposit_events_up_to_block(
    &self,
    finalized_block_id: u64,
) -> DbResult<usize> {
    let mut pending = self.get_pending_deposit_events()?;
    let initial_len = pending.len();

    pending.retain(|record| match record.submitted_in_block_id {
        None => true,
        Some(submitted_id) => submitted_id > finalized_block_id,
    });

    let removed = initial_len.saturating_sub(pending.len());
    if removed == 0 {
        return Ok(0);
    }

    self.put_pending_deposit_events(&pending)?;
    Ok(removed)
}
pub fn put_block(
&self,
block: &Block,

View File

@ -1,5 +1,5 @@
use borsh::{BorshDeserialize, BorshSerialize};
use common::block::BlockMeta;
use common::{HashType, block::BlockMeta};
use nssa::V03State;
use crate::{
@ -8,7 +8,7 @@ use crate::{
error::DbError,
sequencer::{
CF_NSSA_STATE_NAME, DB_META_LAST_FINALIZED_BLOCK_ID, DB_META_LATEST_BLOCK_META_KEY,
DB_META_ZONE_SDK_CHECKPOINT_KEY, DB_NSSA_STATE_KEY,
DB_META_PENDING_DEPOSIT_EVENTS_KEY, DB_META_ZONE_SDK_CHECKPOINT_KEY, DB_NSSA_STATE_KEY,
},
};
@ -131,12 +131,56 @@ impl SimpleWritableCell for ZoneSdkCheckpointCellRef<'_> {
}
}
/// A deposit event persisted by the sequencer so it can be tracked until the block containing
/// the corresponding deposit is finalized.
#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)]
pub struct PendingDepositEventRecord {
/// Operation ID of the deposit; records are deduplicated and matched by this value.
pub deposit_op_id: HashType,
/// Hash of the transaction the deposit event originated from.
pub source_tx_hash: HashType,
/// Deposited amount.
pub amount: u64,
/// Raw deposit metadata bytes, stored undecoded.
pub metadata: Vec<u8>,
/// Set when block containing the deposit event is submitted, but not necessarily finalized.
pub submitted_in_block_id: Option<u64>,
}
/// Owned, readable form of the pending deposit events cell: the full list of stored records.
#[derive(BorshDeserialize)]
pub struct PendingDepositEventsCellOwned(pub Vec<PendingDepositEventRecord>);
// Cell location; must stay identical to the one declared for `PendingDepositEventsCellRef`.
impl SimpleStorableCell for PendingDepositEventsCellOwned {
type KeyParams = ();
const CELL_NAME: &'static str = DB_META_PENDING_DEPOSIT_EVENTS_KEY;
const CF_NAME: &'static str = CF_META_NAME;
}
impl SimpleReadableCell for PendingDepositEventsCellOwned {}
/// Borrowed, writable form of the pending deposit events cell; wraps a slice of records so the
/// list can be written without taking ownership of it.
#[derive(BorshSerialize)]
pub struct PendingDepositEventsCellRef<'records>(pub &'records [PendingDepositEventRecord]);
// Cell location; must stay identical to the one declared for `PendingDepositEventsCellOwned`.
impl SimpleStorableCell for PendingDepositEventsCellRef<'_> {
type KeyParams = ();
const CELL_NAME: &'static str = DB_META_PENDING_DEPOSIT_EVENTS_KEY;
const CF_NAME: &'static str = CF_META_NAME;
}
impl SimpleWritableCell for PendingDepositEventsCellRef<'_> {
/// Borsh-serializes the wrapped records into the bytes stored under the cell key.
fn value_constructor(&self) -> DbResult<Vec<u8>> {
borsh::to_vec(&self).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize pending deposit events cell".to_owned()),
)
})
}
}
#[cfg(test)]
mod uniform_tests {
use crate::{
cells::SimpleStorableCell as _,
sequencer::sequencer_cells::{
LatestBlockMetaCellOwned, LatestBlockMetaCellRef, NSSAStateCellOwned, NSSAStateCellRef,
PendingDepositEventsCellOwned, PendingDepositEventsCellRef,
},
};
@ -165,4 +209,20 @@ mod uniform_tests {
LatestBlockMetaCellOwned::key_constructor(()).unwrap()
);
}
/// Guards against the borrowed (write) and owned (read) pending deposit cells drifting apart:
/// both must use the same cell name, column family and constructed key.
#[test]
fn pending_deposit_events_ref_and_owned_is_aligned() {
assert_eq!(
PendingDepositEventsCellRef::CELL_NAME,
PendingDepositEventsCellOwned::CELL_NAME
);
assert_eq!(
PendingDepositEventsCellRef::CF_NAME,
PendingDepositEventsCellOwned::CF_NAME
);
assert_eq!(
PendingDepositEventsCellRef::key_constructor(()).unwrap(),
PendingDepositEventsCellOwned::key_constructor(()).unwrap()
);
}
}