diff --git a/integration_tests/tests/bridge.rs b/integration_tests/tests/bridge.rs index c108a66f..a2582c23 100644 --- a/integration_tests/tests/bridge.rs +++ b/integration_tests/tests/bridge.rs @@ -485,6 +485,9 @@ async fn bedrock_deposit_claim_and_withdraw_round_trip_succeeds() -> anyhow::Res observe_result .context("Failed while waiting for finalized withdraw event from zone indexer")?; + // Sleep to observe sequencer log about validated withdraw event + tokio::time::sleep(Duration::from_secs(1)).await; + Ok(()) } diff --git a/sequencer/core/src/block_publisher.rs b/sequencer/core/src/block_publisher.rs index 3b5c1d4a..a41216d8 100644 --- a/sequencer/core/src/block_publisher.rs +++ b/sequencer/core/src/block_publisher.rs @@ -3,19 +3,21 @@ use std::{pin::Pin, sync::Arc, time::Duration}; use anyhow::{Context as _, Result}; use common::block::Block; use log::{info, warn}; -use logos_blockchain_core::mantle::{Note, ledger::Outputs, ops::channel::inscribe::Inscription}; +use logos_blockchain_core::mantle::ops::channel::inscribe::Inscription; pub use logos_blockchain_key_management_system_service::keys::{Ed25519Key, ZkKey}; pub use logos_blockchain_zone_sdk::sequencer::SequencerCheckpoint; use logos_blockchain_zone_sdk::{ CommonHttpClient, adapter::NodeHttpClient, - sequencer::{Event, SequencerConfig as ZoneSdkSequencerConfig, SequencerHandle, ZoneSequencer}, - state::{DepositInfo, FinalizedOp, InscriptionInfo}, + sequencer::{ + Event, SequencerConfig as ZoneSdkSequencerConfig, SequencerHandle, WithdrawArg, + ZoneSequencer, + }, + state::{DepositInfo, FinalizedOp, InscriptionInfo, WithdrawInfo}, }; -use num_bigint::BigUint; use tokio::task::JoinHandle; -use crate::{BridgeWithdrawData, config::BedrockConfig}; +use crate::config::BedrockConfig; /// Sink for `Event::Published` checkpoints emitted by the drive task. /// Caller is responsible for persistence (e.g. writing to rocksdb). @@ -30,8 +32,16 @@ pub type FinalizedBlockSink = Box; pub type OnDepositEventSink = Box Pin + Send>> + Send + 'static>; +/// Sink for finalized Bedrock withdraw events. +pub type OnWithdrawEventSink = + Box Pin + Send>> + Send + 'static>; + #[expect(async_fn_in_trait, reason = "We don't care about Send/Sync here")] pub trait BlockPublisherTrait: Clone { + #[expect( + clippy::too_many_arguments, + reason = "Looks better than bundling all those callbacks into a struct" + )] async fn new( config: &BedrockConfig, bedrock_signing_key: Ed25519Key, @@ -40,15 +50,12 @@ pub trait BlockPublisherTrait: Clone { on_checkpoint: CheckpointSink, on_finalized_block: FinalizedBlockSink, on_deposit_event: OnDepositEventSink, + on_withdraw_event: OnWithdrawEventSink, ) -> Result; /// Fire-and-forget publish. Zone-sdk drives the actual submission and /// retries internally; this just hands the payload off. - async fn publish_block( - &self, - block: &Block, - bridge_withdrawals: Vec, - ) -> Result<()>; + async fn publish_block(&self, block: &Block, withdraws: Vec) -> Result<()>; } /// Real block publisher backed by zone-sdk's `ZoneSequencer`. @@ -76,6 +83,7 @@ impl BlockPublisherTrait for ZoneSdkPublisher { on_checkpoint: CheckpointSink, on_finalized_block: FinalizedBlockSink, on_deposit_event: OnDepositEventSink, + on_withdraw_event: OnWithdrawEventSink, ) -> Result { let basic_auth = config.auth.clone().map(Into::into); let node = NodeHttpClient::new(CommonHttpClient::new(basic_auth), config.node_url.clone()); @@ -112,7 +120,9 @@ impl BlockPublisherTrait for ZoneSdkPublisher { FinalizedOp::Deposit(deposit) => { on_deposit_event(deposit).await; } - FinalizedOp::Withdraw(_) => {} + FinalizedOp::Withdraw(withdraw) => { + on_withdraw_event(withdraw).await; + } } } } @@ -129,18 +139,14 @@ impl BlockPublisherTrait for ZoneSdkPublisher { }) } - async fn publish_block( - &self, - block: &Block, - bridge_withdrawals: Vec, - ) -> Result<()> { + async fn publish_block(&self, block: &Block, withdraws: Vec) -> Result<()> { let data = borsh::to_vec(block).context("Failed to serialize block")?; let data_bounded: Inscription = data .try_into() .context("Block data exceeds maximum allowed size")?; let data_byte_size = data_bounded.len(); - if bridge_withdrawals.is_empty() { + if withdraws.is_empty() { self.handle .publish_message(data_bounded) .await @@ -151,20 +157,6 @@ impl BlockPublisherTrait for ZoneSdkPublisher { return Ok(()); } - let withdraws: Vec<_> = bridge_withdrawals - .into_iter() - .map(|withdrawal| { - let recipient_pk = - logos_blockchain_key_management_system_service::keys::ZkPublicKey::from( - BigUint::from_bytes_le(&withdrawal.bedrock_account_pk), - ); - - logos_blockchain_zone_sdk::sequencer::WithdrawArg { - outputs: Outputs::new(vec![Note::new(withdrawal.amount, recipient_pk)]), - } - }) - .collect(); - let withdraw_count = withdraws.len(); self.handle .publish_atomic_withdraw(data_bounded, withdraws) diff --git a/sequencer/core/src/block_store.rs b/sequencer/core/src/block_store.rs index ada6d306..44e9a0dd 100644 --- a/sequencer/core/src/block_store.rs +++ b/sequencer/core/src/block_store.rs @@ -165,6 +165,24 @@ impl SequencerStore { self.dbio.put_zone_sdk_checkpoint_bytes(&bytes)?; Ok(()) } + + pub fn record_unseen_withdraw( + &self, + amount: u64, + bedrock_account_pk: [u8; 32], + ) -> DbResult { + self.dbio + .increment_unseen_withdraw_count(amount, bedrock_account_pk) + } + + pub fn consume_unseen_withdraw( + &self, + amount: u64, + bedrock_account_pk: [u8; 32], + ) -> DbResult { + self.dbio + .consume_unseen_withdraw_count(amount, bedrock_account_pk) + } } pub(crate) fn block_to_transactions_map(block: &Block) -> HashMap { diff --git a/sequencer/core/src/lib.rs b/sequencer/core/src/lib.rs index 01306ae8..b163ae25 100644 --- a/sequencer/core/src/lib.rs +++ b/sequencer/core/src/lib.rs @@ -1,4 +1,4 @@ -use std::{path::Path, time::Instant}; +use std::{path::Path, sync::Arc, time::Instant}; use anyhow::{Context as _, Result, anyhow}; use borsh::BorshDeserialize; @@ -10,12 +10,15 @@ use common::{ use config::{GenesisAction, SequencerConfig}; use log::{error, info, warn}; use logos_blockchain_key_management_system_service::keys::{ED25519_SECRET_KEY_SIZE, Ed25519Key}; +use logos_blockchain_zone_sdk::sequencer::WithdrawArg; use mempool::{MemPool, MemPoolHandle}; #[cfg(feature = "mock")] pub use mock::SequencerCoreWithMockClients; use nssa::{AccountId, PublicTransaction, program::Program, public_transaction::Message}; use nssa_core::GENESIS_BLOCK_ID; +use num_bigint::BigUint; pub use storage::error::DbError; +use storage::sequencer::RocksDBIO; use crate::{ block_publisher::{BlockPublisherTrait, ZoneSdkPublisher}, @@ -131,66 +134,17 @@ impl SequencerCore { .expect("Failed to load zone-sdk checkpoint"); let is_fresh_start = initial_checkpoint.is_none(); - let dbio_for_checkpoint = store.dbio(); - let on_checkpoint: block_publisher::CheckpointSink = Box::new(move |cp| { - let bytes = match serde_json::to_vec(&cp) { - Ok(b) => b, - Err(err) => { - error!("Failed to serialize zone-sdk checkpoint: {err:#}"); - return; - } - }; - if let Err(err) = dbio_for_checkpoint.put_zone_sdk_checkpoint_bytes(&bytes) { - error!("Failed to persist zone-sdk checkpoint: {err:#}"); - } - }); - - let dbio_for_finalized = store.dbio(); - let on_finalized_block: block_publisher::FinalizedBlockSink = Box::new(move |block_id| { - if let Err(err) = dbio_for_finalized.clean_pending_blocks_up_to(block_id) { - error!("Failed to mark pending blocks finalized up to {block_id}: {err:#}"); - } - }); - let (mempool, mempool_handle) = MemPool::new(config.mempool_max_size); - let mempool_handle_for_deposit = mempool_handle.clone(); - let on_deposit_event: block_publisher::OnDepositEventSink = Box::new(move |deposit| { - let mempool_handle_for_deposit = mempool_handle_for_deposit.clone(); - Box::pin(async move { - info!( - "Observed Bedrock Deposit event with id: {:?}", - hex::encode(deposit.op_id) - ); - let tx = match build_bridge_deposit_tx(&deposit) { - Ok(tx) => tx, - Err(err) => { - warn!( - "Skipping finalized Bedrock deposit event due to tx build failure: {err:#}" - ); - return; - } - }; - - if let Err(err) = mempool_handle_for_deposit - .push((TransactionOrigin::Sequencer, tx)) - .await - { - error!( - "Failed to queue sequencer transaction built from finalized Bedrock event: {err:#}" - ); - } - }) - }); - let block_publisher = BP::new( &config.bedrock_config, bedrock_signing_key, config.retry_pending_blocks_timeout, initial_checkpoint, - on_checkpoint, - on_finalized_block, - on_deposit_event, + Self::on_checkpoint(store.dbio()), + Self::on_finalized_block(store.dbio()), + Self::on_deposit_event(mempool_handle.clone()), + Self::on_withdraw_event(store.dbio()), ) .await .expect("Failed to initialize Block Publisher"); @@ -217,24 +171,114 @@ impl SequencerCore { (sequencer_core, mempool_handle) } + fn on_checkpoint(dbio: Arc) -> block_publisher::CheckpointSink { + Box::new(move |cp| { + let bytes = match serde_json::to_vec(&cp) { + Ok(b) => b, + Err(err) => { + error!("Failed to serialize zone-sdk checkpoint: {err:#}"); + return; + } + }; + if let Err(err) = dbio.put_zone_sdk_checkpoint_bytes(&bytes) { + error!("Failed to persist zone-sdk checkpoint: {err:#}"); + } + }) + } + + fn on_finalized_block(dbio: Arc) -> block_publisher::FinalizedBlockSink { + Box::new(move |block_id| { + if let Err(err) = dbio.clean_pending_blocks_up_to(block_id) { + error!("Failed to mark pending blocks finalized up to {block_id}: {err:#}"); + } + }) + } + + fn on_deposit_event( + mempool_handle: MemPoolHandle<(TransactionOrigin, NSSATransaction)>, + ) -> block_publisher::OnDepositEventSink { + Box::new(move |deposit| { + let mempool_handle = mempool_handle.clone(); + Box::pin(async move { + info!( + "Observed Bedrock Deposit event with id: {:?}", + hex::encode(deposit.op_id) + ); + let tx = match build_bridge_deposit_tx(&deposit) { + Ok(tx) => tx, + Err(err) => { + warn!( + "Skipping finalized Bedrock deposit event due to tx build failure: {err:#}" + ); + return; + } + }; + + if let Err(err) = mempool_handle + .push((TransactionOrigin::Sequencer, tx)) + .await + { + error!( + "Failed to queue sequencer transaction built from finalized Bedrock event: {err:#}" + ); + } + }) + }) + } + + fn on_withdraw_event(dbio: Arc) -> block_publisher::OnWithdrawEventSink { + Box::new(move |withdraw| { + let dbio = Arc::clone(&dbio); + Box::pin(async move { + let hash_encoded = hex::encode(withdraw.tx_hash.as_ref()); + let withdraw_key = match withdraw_event_reconciliation_key(&withdraw.op.outputs) { + Ok(key) => key, + Err(err) => { + error!( + "Failed to build reconciliation key for Bedrock Withdraw event with tx_hash {hash_encoded}: {err:#}" + ); + return; + } + }; + + match dbio.consume_unseen_withdraw_count( + withdraw_key.amount, + withdraw_key.bedrock_account_pk, + ) { + Ok(true) => { + info!("Validated Bedrock Withdraw event with tx_hash: {hash_encoded}"); + } + Ok(false) => warn!( + "Unexpected Bedrock Withdraw event with tx_hash {hash_encoded}: no matching unseen withdraw found" + ), + Err(err) => error!( + "Failed to reconcile Bedrock Withdraw event with tx_hash {hash_encoded}: {err:#}" + ), + } + }) + }) + } + /// Produces a new block from mempool transactions and publishes it via zone-sdk. pub async fn produce_new_block(&mut self) -> Result { - let BlockWithMeta { - block, - bridge_withdrawals, - } = self + let BlockWithMeta { block, withdraws } = self .build_block_from_mempool() .context("Failed to build block from mempool transactions")?; + for withdraw in &withdraws { + let withdraw_key = withdraw_event_reconciliation_key(&withdraw.outputs) + .context("Failed to derive unseen-withdraw key from withdraw data")?; + + self.store + .record_unseen_withdraw(withdraw_key.amount, withdraw_key.bedrock_account_pk) + .context("Failed to persist unseen withdraw for reconciliation")?; + } + // TODO: Remove msg_id from store.update — it is no longer needed now that // zone-sdk manages L1 settlement state via its own checkpoint. let placeholder_msg_id = [0_u8; 32]; - if let Err(err) = self - .block_publisher - .publish_block(&block, bridge_withdrawals) - .await - { + if let Err(err) = self.block_publisher.publish_block(&block, withdraws).await { error!("Failed to publish block to Bedrock with error: {err:#}"); } self.store.update(&block, placeholder_msg_id, &self.state)?; @@ -253,7 +297,7 @@ impl SequencerCore { let new_block_height = self.next_block_id(); let mut valid_transactions = vec![]; - let mut bridge_withdrawals = vec![]; + let mut withdraws = vec![]; let max_block_size = usize::try_from(self.sequencer_config.max_block_size.as_u64()) .expect("`max_block_size` should fit into usize"); @@ -319,7 +363,7 @@ impl SequencerCore { }; if let Some(withdraw_data) = extract_bridge_withdraw_data(&tx) { - bridge_withdrawals.push(withdraw_data); + withdraws.push(withdraw_data); } self.state.apply_state_diff(validated_diff); @@ -374,10 +418,7 @@ impl SequencerCore { now.elapsed().as_secs() ); - Ok(BlockWithMeta { - block, - bridge_withdrawals, - }) + Ok(BlockWithMeta { block, withdraws }) } pub const fn state(&self) -> &nssa::V03State { @@ -435,13 +476,13 @@ impl SequencerCore { struct BlockWithMeta { block: Block, - bridge_withdrawals: Vec, + withdraws: Vec, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct BridgeWithdrawData { - pub amount: u64, - pub bedrock_account_pk: [u8; 32], +#[derive(Debug, Clone, Copy)] +struct WithdrawReconciliationKey { + amount: u64, + bedrock_account_pk: [u8; 32], } /// Builds the initial genesis state from `testnet_initial_state` plus configured genesis @@ -549,7 +590,7 @@ fn build_bridge_deposit_tx( } #[must_use] -pub fn extract_bridge_withdraw_data(tx: &NSSATransaction) -> Option { +pub fn extract_bridge_withdraw_data(tx: &NSSATransaction) -> Option { let NSSATransaction::Public(tx) = tx else { return None; }; @@ -567,16 +608,55 @@ pub fn extract_bridge_withdraw_data(tx: &NSSATransaction) -> Option Some(BridgeWithdrawData { - amount, - bedrock_account_pk, - }), + } => { + let recipient_pk = + logos_blockchain_key_management_system_service::keys::ZkPublicKey::from( + BigUint::from_bytes_le(&bedrock_account_pk), + ); + + Some(WithdrawArg { + outputs: logos_blockchain_core::mantle::ledger::Outputs::new(vec![ + logos_blockchain_core::mantle::Note::new(amount, recipient_pk), + ]), + }) + } bridge_core::Instruction::Deposit { .. } => unreachable!( "Deposit instructions from users should never pass validation, and thus should never be seen here" ), } } +fn withdraw_event_reconciliation_key( + outputs: &logos_blockchain_core::mantle::ledger::Outputs, +) -> Result { + let [note] = outputs.as_ref().as_slice() else { + return Err(anyhow!( + "Unsupported withdraw output count for reconciliation: {}", + outputs.len() + )); + }; + + // `extract_bridge_withdraw_data` maps [u8;32] LE -> BigUint -> ZkPublicKey. + // Reconcile by reversing that direction here. + let mut bedrock_account_pk = BigUint::from(note.pk.into_inner()).to_bytes_le(); + if bedrock_account_pk.len() > 32 { + return Err(anyhow!( + "Withdraw recipient public key is too large: {} bytes", + bedrock_account_pk.len() + )); + } + bedrock_account_pk.resize(32, 0); + + let bedrock_account_pk: [u8; 32] = bedrock_account_pk + .try_into() + .expect("Public key bytes were padded/truncated to 32 bytes"); + + Ok(WithdrawReconciliationKey { + amount: note.value, + bedrock_account_pk, + }) +} + /// Load signing key from file or generate a new one if it doesn't exist. fn load_or_create_signing_key(path: &Path) -> Result { if path.exists() { diff --git a/sequencer/core/src/mock.rs b/sequencer/core/src/mock.rs index 0b0024ac..5f2ab8cf 100644 --- a/sequencer/core/src/mock.rs +++ b/sequencer/core/src/mock.rs @@ -3,12 +3,12 @@ use std::time::Duration; use anyhow::Result; use common::block::Block; use logos_blockchain_key_management_system_service::keys::Ed25519Key; +use logos_blockchain_zone_sdk::sequencer::WithdrawArg; use crate::{ - BridgeWithdrawData, block_publisher::{ BlockPublisherTrait, CheckpointSink, FinalizedBlockSink, OnDepositEventSink, - SequencerCheckpoint, + OnWithdrawEventSink, SequencerCheckpoint, }, config::BedrockConfig, }; @@ -27,6 +27,7 @@ impl BlockPublisherTrait for MockBlockPublisher { _on_checkpoint: CheckpointSink, _on_finalized_block: FinalizedBlockSink, _on_deposit_event: OnDepositEventSink, + _on_withdraw_event: OnWithdrawEventSink, ) -> Result { Ok(Self) } @@ -34,7 +35,7 @@ impl BlockPublisherTrait for MockBlockPublisher { async fn publish_block( &self, _block: &Block, - _bridge_withdrawals: Vec, + _bridge_withdrawals: Vec, ) -> Result<()> { Ok(()) } diff --git a/storage/src/sequencer/mod.rs b/storage/src/sequencer/mod.rs index be5e5cfe..d1c371b0 100644 --- a/storage/src/sequencer/mod.rs +++ b/storage/src/sequencer/mod.rs @@ -8,11 +8,15 @@ use rocksdb::{ use crate::{ CF_BLOCK_NAME, CF_META_NAME, DB_META_FIRST_BLOCK_IN_DB_KEY, DBIO, DbResult, - cells::shared_cells::{BlockCell, FirstBlockCell, FirstBlockSetCell, LastBlockCell}, + cells::{ + SimpleStorableCell, + shared_cells::{BlockCell, FirstBlockCell, FirstBlockSetCell, LastBlockCell}, + }, error::DbError, sequencer::sequencer_cells::{ LastFinalizedBlockIdCell, LatestBlockMetaCellOwned, LatestBlockMetaCellRef, - NSSAStateCellOwned, NSSAStateCellRef, ZoneSdkCheckpointCellOwned, ZoneSdkCheckpointCellRef, + NSSAStateCellOwned, NSSAStateCellRef, UnseenWithdrawCountCell, ZoneSdkCheckpointCellOwned, + ZoneSdkCheckpointCellRef, }, }; @@ -24,6 +28,8 @@ pub const DB_META_LAST_FINALIZED_BLOCK_ID: &str = "last_finalized_block_id"; pub const DB_META_LATEST_BLOCK_META_KEY: &str = "latest_block_meta"; /// Key base for storing the zone-sdk sequencer checkpoint (opaque bytes). pub const DB_META_ZONE_SDK_CHECKPOINT_KEY: &str = "zone_sdk_checkpoint"; +/// Key base for counting unseen L2 withdraw intents. +pub const DB_META_UNSEEN_WITHDRAW_COUNT_KEY: &str = "unseen_withdraw_count"; /// Key base for storing the NSSA state. pub const DB_NSSA_STATE_KEY: &str = "nssa_state"; @@ -239,6 +245,58 @@ impl RocksDBIO { self.put(&ZoneSdkCheckpointCellRef(bytes), ()) } + pub fn increment_unseen_withdraw_count( + &self, + amount: u64, + bedrock_account_pk: [u8; 32], + ) -> DbResult { + let key_params = (amount, bedrock_account_pk); + + let current = self + .get_opt::(key_params)? + .map_or(0, |cell| cell.0); + + let next = current.checked_add(1).ok_or_else(|| { + DbError::db_interaction_error("Unseen withdraw counter overflow".to_owned()) + })?; + + self.put(&UnseenWithdrawCountCell(next), key_params)?; + + Ok(next) + } + + pub fn consume_unseen_withdraw_count( + &self, + amount: u64, + bedrock_account_pk: [u8; 32], + ) -> DbResult { + let key_params = (amount, bedrock_account_pk); + + let Some(current) = self + .get_opt::(key_params)? + .map(|cell| cell.0) + else { + return Ok(false); + }; + + if let Some(next) = current.checked_sub(1) { + self.put(&UnseenWithdrawCountCell(next), key_params)?; + } else { + let cf_meta = self.meta_column(); + let db_key = + ::key_constructor(key_params)?; + + self.db.delete_cf(&cf_meta, db_key).map_err(|rerr| { + DbError::rocksdb_cast_message( + rerr, + Some("Failed to delete unseen withdraw count".to_owned()), + ) + })?; + } + + Ok(true) + } + pub fn put_block( &self, block: &Block, diff --git a/storage/src/sequencer/sequencer_cells.rs b/storage/src/sequencer/sequencer_cells.rs index 2bf65367..6709a9e9 100644 --- a/storage/src/sequencer/sequencer_cells.rs +++ b/storage/src/sequencer/sequencer_cells.rs @@ -8,7 +8,7 @@ use crate::{ error::DbError, sequencer::{ CF_NSSA_STATE_NAME, DB_META_LAST_FINALIZED_BLOCK_ID, DB_META_LATEST_BLOCK_META_KEY, - DB_META_ZONE_SDK_CHECKPOINT_KEY, DB_NSSA_STATE_KEY, + DB_META_UNSEEN_WITHDRAW_COUNT_KEY, DB_META_ZONE_SDK_CHECKPOINT_KEY, DB_NSSA_STATE_KEY, }, }; @@ -131,6 +131,41 @@ impl SimpleWritableCell for ZoneSdkCheckpointCellRef<'_> { } } +#[derive(Debug, BorshSerialize, BorshDeserialize)] +pub struct UnseenWithdrawCountCell(pub u64); + +impl SimpleStorableCell for UnseenWithdrawCountCell { + type KeyParams = (u64, [u8; 32]); + + const CELL_NAME: &'static str = DB_META_UNSEEN_WITHDRAW_COUNT_KEY; + const CF_NAME: &'static str = CF_META_NAME; + + fn key_constructor((amount, bedrock_account_pk): Self::KeyParams) -> DbResult> { + borsh::to_vec(&(Self::CELL_NAME, amount, bedrock_account_pk)).map_err(|err| { + DbError::borsh_cast_message( + err, + Some(format!( + "Failed to serialize {:?} key params", + Self::CELL_NAME + )), + ) + }) + } +} + +impl SimpleReadableCell for UnseenWithdrawCountCell {} + +impl SimpleWritableCell for UnseenWithdrawCountCell { + fn value_constructor(&self) -> DbResult> { + borsh::to_vec(&self).map_err(|err| { + DbError::borsh_cast_message( + err, + Some("Failed to serialize unseen withdraw count".to_owned()), + ) + }) + } +} + #[cfg(test)] mod uniform_tests { use crate::{