feat: add bedrock withdraw events validation

This commit is contained in:
Daniil Polyakov 2026-06-03 23:50:44 +03:00
parent b9d9c802e9
commit dd82e4770f
7 changed files with 303 additions and 116 deletions

View File

@ -485,6 +485,9 @@ async fn bedrock_deposit_claim_and_withdraw_round_trip_succeeds() -> anyhow::Res
observe_result
.context("Failed while waiting for finalized withdraw event from zone indexer")?;
// Sleep to observe sequencer log about validated withdraw event
tokio::time::sleep(Duration::from_secs(1)).await;
Ok(())
}

View File

@ -3,19 +3,21 @@ use std::{pin::Pin, sync::Arc, time::Duration};
use anyhow::{Context as _, Result};
use common::block::Block;
use log::{info, warn};
use logos_blockchain_core::mantle::{Note, ledger::Outputs, ops::channel::inscribe::Inscription};
use logos_blockchain_core::mantle::ops::channel::inscribe::Inscription;
pub use logos_blockchain_key_management_system_service::keys::{Ed25519Key, ZkKey};
pub use logos_blockchain_zone_sdk::sequencer::SequencerCheckpoint;
use logos_blockchain_zone_sdk::{
CommonHttpClient,
adapter::NodeHttpClient,
sequencer::{Event, SequencerConfig as ZoneSdkSequencerConfig, SequencerHandle, ZoneSequencer},
state::{DepositInfo, FinalizedOp, InscriptionInfo},
sequencer::{
Event, SequencerConfig as ZoneSdkSequencerConfig, SequencerHandle, WithdrawArg,
ZoneSequencer,
},
state::{DepositInfo, FinalizedOp, InscriptionInfo, WithdrawInfo},
};
use num_bigint::BigUint;
use tokio::task::JoinHandle;
use crate::{BridgeWithdrawData, config::BedrockConfig};
use crate::config::BedrockConfig;
/// Sink for `Event::Published` checkpoints emitted by the drive task.
/// Caller is responsible for persistence (e.g. writing to rocksdb).
@ -30,8 +32,16 @@ pub type FinalizedBlockSink = Box<dyn Fn(u64) + Send + 'static>;
pub type OnDepositEventSink =
Box<dyn Fn(DepositInfo) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;
/// Sink for finalized Bedrock withdraw events.
pub type OnWithdrawEventSink =
Box<dyn Fn(WithdrawInfo) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;
#[expect(async_fn_in_trait, reason = "We don't care about Send/Sync here")]
pub trait BlockPublisherTrait: Clone {
#[expect(
clippy::too_many_arguments,
reason = "Looks better than bundling all those callbacks into a struct"
)]
async fn new(
config: &BedrockConfig,
bedrock_signing_key: Ed25519Key,
@ -40,15 +50,12 @@ pub trait BlockPublisherTrait: Clone {
on_checkpoint: CheckpointSink,
on_finalized_block: FinalizedBlockSink,
on_deposit_event: OnDepositEventSink,
on_withdraw_event: OnWithdrawEventSink,
) -> Result<Self>;
/// Fire-and-forget publish. Zone-sdk drives the actual submission and
/// retries internally; this just hands the payload off.
async fn publish_block(
&self,
block: &Block,
bridge_withdrawals: Vec<BridgeWithdrawData>,
) -> Result<()>;
async fn publish_block(&self, block: &Block, withdraws: Vec<WithdrawArg>) -> Result<()>;
}
/// Real block publisher backed by zone-sdk's `ZoneSequencer`.
@ -76,6 +83,7 @@ impl BlockPublisherTrait for ZoneSdkPublisher {
on_checkpoint: CheckpointSink,
on_finalized_block: FinalizedBlockSink,
on_deposit_event: OnDepositEventSink,
on_withdraw_event: OnWithdrawEventSink,
) -> Result<Self> {
let basic_auth = config.auth.clone().map(Into::into);
let node = NodeHttpClient::new(CommonHttpClient::new(basic_auth), config.node_url.clone());
@ -112,7 +120,9 @@ impl BlockPublisherTrait for ZoneSdkPublisher {
FinalizedOp::Deposit(deposit) => {
on_deposit_event(deposit).await;
}
FinalizedOp::Withdraw(_) => {}
FinalizedOp::Withdraw(withdraw) => {
on_withdraw_event(withdraw).await;
}
}
}
}
@ -129,18 +139,14 @@ impl BlockPublisherTrait for ZoneSdkPublisher {
})
}
async fn publish_block(
&self,
block: &Block,
bridge_withdrawals: Vec<BridgeWithdrawData>,
) -> Result<()> {
async fn publish_block(&self, block: &Block, withdraws: Vec<WithdrawArg>) -> Result<()> {
let data = borsh::to_vec(block).context("Failed to serialize block")?;
let data_bounded: Inscription = data
.try_into()
.context("Block data exceeds maximum allowed size")?;
let data_byte_size = data_bounded.len();
if bridge_withdrawals.is_empty() {
if withdraws.is_empty() {
self.handle
.publish_message(data_bounded)
.await
@ -151,20 +157,6 @@ impl BlockPublisherTrait for ZoneSdkPublisher {
return Ok(());
}
let withdraws: Vec<_> = bridge_withdrawals
.into_iter()
.map(|withdrawal| {
let recipient_pk =
logos_blockchain_key_management_system_service::keys::ZkPublicKey::from(
BigUint::from_bytes_le(&withdrawal.bedrock_account_pk),
);
logos_blockchain_zone_sdk::sequencer::WithdrawArg {
outputs: Outputs::new(vec![Note::new(withdrawal.amount, recipient_pk)]),
}
})
.collect();
let withdraw_count = withdraws.len();
self.handle
.publish_atomic_withdraw(data_bounded, withdraws)

View File

@ -165,6 +165,24 @@ impl SequencerStore {
self.dbio.put_zone_sdk_checkpoint_bytes(&bytes)?;
Ok(())
}
pub fn record_unseen_withdraw(
&self,
amount: u64,
bedrock_account_pk: [u8; 32],
) -> DbResult<u64> {
self.dbio
.increment_unseen_withdraw_count(amount, bedrock_account_pk)
}
pub fn consume_unseen_withdraw(
&self,
amount: u64,
bedrock_account_pk: [u8; 32],
) -> DbResult<bool> {
self.dbio
.consume_unseen_withdraw_count(amount, bedrock_account_pk)
}
}
pub(crate) fn block_to_transactions_map(block: &Block) -> HashMap<HashType, u64> {

View File

@ -1,4 +1,4 @@
use std::{path::Path, time::Instant};
use std::{path::Path, sync::Arc, time::Instant};
use anyhow::{Context as _, Result, anyhow};
use borsh::BorshDeserialize;
@ -10,12 +10,15 @@ use common::{
use config::{GenesisAction, SequencerConfig};
use log::{error, info, warn};
use logos_blockchain_key_management_system_service::keys::{ED25519_SECRET_KEY_SIZE, Ed25519Key};
use logos_blockchain_zone_sdk::sequencer::WithdrawArg;
use mempool::{MemPool, MemPoolHandle};
#[cfg(feature = "mock")]
pub use mock::SequencerCoreWithMockClients;
use nssa::{AccountId, PublicTransaction, program::Program, public_transaction::Message};
use nssa_core::GENESIS_BLOCK_ID;
use num_bigint::BigUint;
pub use storage::error::DbError;
use storage::sequencer::RocksDBIO;
use crate::{
block_publisher::{BlockPublisherTrait, ZoneSdkPublisher},
@ -131,66 +134,17 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
.expect("Failed to load zone-sdk checkpoint");
let is_fresh_start = initial_checkpoint.is_none();
let dbio_for_checkpoint = store.dbio();
let on_checkpoint: block_publisher::CheckpointSink = Box::new(move |cp| {
let bytes = match serde_json::to_vec(&cp) {
Ok(b) => b,
Err(err) => {
error!("Failed to serialize zone-sdk checkpoint: {err:#}");
return;
}
};
if let Err(err) = dbio_for_checkpoint.put_zone_sdk_checkpoint_bytes(&bytes) {
error!("Failed to persist zone-sdk checkpoint: {err:#}");
}
});
let dbio_for_finalized = store.dbio();
let on_finalized_block: block_publisher::FinalizedBlockSink = Box::new(move |block_id| {
if let Err(err) = dbio_for_finalized.clean_pending_blocks_up_to(block_id) {
error!("Failed to mark pending blocks finalized up to {block_id}: {err:#}");
}
});
let (mempool, mempool_handle) = MemPool::new(config.mempool_max_size);
let mempool_handle_for_deposit = mempool_handle.clone();
let on_deposit_event: block_publisher::OnDepositEventSink = Box::new(move |deposit| {
let mempool_handle_for_deposit = mempool_handle_for_deposit.clone();
Box::pin(async move {
info!(
"Observed Bedrock Deposit event with id: {:?}",
hex::encode(deposit.op_id)
);
let tx = match build_bridge_deposit_tx(&deposit) {
Ok(tx) => tx,
Err(err) => {
warn!(
"Skipping finalized Bedrock deposit event due to tx build failure: {err:#}"
);
return;
}
};
if let Err(err) = mempool_handle_for_deposit
.push((TransactionOrigin::Sequencer, tx))
.await
{
error!(
"Failed to queue sequencer transaction built from finalized Bedrock event: {err:#}"
);
}
})
});
let block_publisher = BP::new(
&config.bedrock_config,
bedrock_signing_key,
config.retry_pending_blocks_timeout,
initial_checkpoint,
on_checkpoint,
on_finalized_block,
on_deposit_event,
Self::on_checkpoint(store.dbio()),
Self::on_finalized_block(store.dbio()),
Self::on_deposit_event(mempool_handle.clone()),
Self::on_withdraw_event(store.dbio()),
)
.await
.expect("Failed to initialize Block Publisher");
@ -217,24 +171,114 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
(sequencer_core, mempool_handle)
}
fn on_checkpoint(dbio: Arc<RocksDBIO>) -> block_publisher::CheckpointSink {
Box::new(move |cp| {
let bytes = match serde_json::to_vec(&cp) {
Ok(b) => b,
Err(err) => {
error!("Failed to serialize zone-sdk checkpoint: {err:#}");
return;
}
};
if let Err(err) = dbio.put_zone_sdk_checkpoint_bytes(&bytes) {
error!("Failed to persist zone-sdk checkpoint: {err:#}");
}
})
}
fn on_finalized_block(dbio: Arc<RocksDBIO>) -> block_publisher::FinalizedBlockSink {
Box::new(move |block_id| {
if let Err(err) = dbio.clean_pending_blocks_up_to(block_id) {
error!("Failed to mark pending blocks finalized up to {block_id}: {err:#}");
}
})
}
fn on_deposit_event(
mempool_handle: MemPoolHandle<(TransactionOrigin, NSSATransaction)>,
) -> block_publisher::OnDepositEventSink {
Box::new(move |deposit| {
let mempool_handle = mempool_handle.clone();
Box::pin(async move {
info!(
"Observed Bedrock Deposit event with id: {:?}",
hex::encode(deposit.op_id)
);
let tx = match build_bridge_deposit_tx(&deposit) {
Ok(tx) => tx,
Err(err) => {
warn!(
"Skipping finalized Bedrock deposit event due to tx build failure: {err:#}"
);
return;
}
};
if let Err(err) = mempool_handle
.push((TransactionOrigin::Sequencer, tx))
.await
{
error!(
"Failed to queue sequencer transaction built from finalized Bedrock event: {err:#}"
);
}
})
})
}
fn on_withdraw_event(dbio: Arc<RocksDBIO>) -> block_publisher::OnWithdrawEventSink {
Box::new(move |withdraw| {
let dbio = Arc::clone(&dbio);
Box::pin(async move {
let hash_encoded = hex::encode(withdraw.tx_hash.as_ref());
let withdraw_key = match withdraw_event_reconciliation_key(&withdraw.op.outputs) {
Ok(key) => key,
Err(err) => {
error!(
"Failed to build reconciliation key for Bedrock Withdraw event with tx_hash {hash_encoded}: {err:#}"
);
return;
}
};
match dbio.consume_unseen_withdraw_count(
withdraw_key.amount,
withdraw_key.bedrock_account_pk,
) {
Ok(true) => {
info!("Validated Bedrock Withdraw event with tx_hash: {hash_encoded}");
}
Ok(false) => warn!(
"Unexpected Bedrock Withdraw event with tx_hash {hash_encoded}: no matching unseen withdraw found"
),
Err(err) => error!(
"Failed to reconcile Bedrock Withdraw event with tx_hash {hash_encoded}: {err:#}"
),
}
})
})
}
/// Produces a new block from mempool transactions and publishes it via zone-sdk.
pub async fn produce_new_block(&mut self) -> Result<u64> {
let BlockWithMeta {
block,
bridge_withdrawals,
} = self
let BlockWithMeta { block, withdraws } = self
.build_block_from_mempool()
.context("Failed to build block from mempool transactions")?;
for withdraw in &withdraws {
let withdraw_key = withdraw_event_reconciliation_key(&withdraw.outputs)
.context("Failed to derive unseen-withdraw key from withdraw data")?;
self.store
.record_unseen_withdraw(withdraw_key.amount, withdraw_key.bedrock_account_pk)
.context("Failed to persist unseen withdraw for reconciliation")?;
}
// TODO: Remove msg_id from store.update — it is no longer needed now that
// zone-sdk manages L1 settlement state via its own checkpoint.
let placeholder_msg_id = [0_u8; 32];
if let Err(err) = self
.block_publisher
.publish_block(&block, bridge_withdrawals)
.await
{
if let Err(err) = self.block_publisher.publish_block(&block, withdraws).await {
error!("Failed to publish block to Bedrock with error: {err:#}");
}
self.store.update(&block, placeholder_msg_id, &self.state)?;
@ -253,7 +297,7 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
let new_block_height = self.next_block_id();
let mut valid_transactions = vec![];
let mut bridge_withdrawals = vec![];
let mut withdraws = vec![];
let max_block_size = usize::try_from(self.sequencer_config.max_block_size.as_u64())
.expect("`max_block_size` should fit into usize");
@ -319,7 +363,7 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
};
if let Some(withdraw_data) = extract_bridge_withdraw_data(&tx) {
bridge_withdrawals.push(withdraw_data);
withdraws.push(withdraw_data);
}
self.state.apply_state_diff(validated_diff);
@ -374,10 +418,7 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
now.elapsed().as_secs()
);
Ok(BlockWithMeta {
block,
bridge_withdrawals,
})
Ok(BlockWithMeta { block, withdraws })
}
pub const fn state(&self) -> &nssa::V03State {
@ -435,13 +476,13 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
struct BlockWithMeta {
block: Block,
bridge_withdrawals: Vec<BridgeWithdrawData>,
withdraws: Vec<WithdrawArg>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BridgeWithdrawData {
pub amount: u64,
pub bedrock_account_pk: [u8; 32],
#[derive(Debug, Clone, Copy)]
struct WithdrawReconciliationKey {
amount: u64,
bedrock_account_pk: [u8; 32],
}
/// Builds the initial genesis state from `testnet_initial_state` plus configured genesis
@ -549,7 +590,7 @@ fn build_bridge_deposit_tx(
}
#[must_use]
pub fn extract_bridge_withdraw_data(tx: &NSSATransaction) -> Option<BridgeWithdrawData> {
pub fn extract_bridge_withdraw_data(tx: &NSSATransaction) -> Option<WithdrawArg> {
let NSSATransaction::Public(tx) = tx else {
return None;
};
@ -567,16 +608,55 @@ pub fn extract_bridge_withdraw_data(tx: &NSSATransaction) -> Option<BridgeWithdr
bridge_core::Instruction::Withdraw {
amount,
bedrock_account_pk,
} => Some(BridgeWithdrawData {
amount,
bedrock_account_pk,
}),
} => {
let recipient_pk =
logos_blockchain_key_management_system_service::keys::ZkPublicKey::from(
BigUint::from_bytes_le(&bedrock_account_pk),
);
Some(WithdrawArg {
outputs: logos_blockchain_core::mantle::ledger::Outputs::new(vec![
logos_blockchain_core::mantle::Note::new(amount, recipient_pk),
]),
})
}
bridge_core::Instruction::Deposit { .. } => unreachable!(
"Deposit instructions from users should never pass validation, and thus should never be seen here"
),
}
}
fn withdraw_event_reconciliation_key(
outputs: &logos_blockchain_core::mantle::ledger::Outputs,
) -> Result<WithdrawReconciliationKey> {
let [note] = outputs.as_ref().as_slice() else {
return Err(anyhow!(
"Unsupported withdraw output count for reconciliation: {}",
outputs.len()
));
};
// `extract_bridge_withdraw_data` maps [u8;32] LE -> BigUint -> ZkPublicKey.
// Reconcile by reversing that direction here.
let mut bedrock_account_pk = BigUint::from(note.pk.into_inner()).to_bytes_le();
if bedrock_account_pk.len() > 32 {
return Err(anyhow!(
"Withdraw recipient public key is too large: {} bytes",
bedrock_account_pk.len()
));
}
bedrock_account_pk.resize(32, 0);
let bedrock_account_pk: [u8; 32] = bedrock_account_pk
.try_into()
.expect("Public key bytes were padded/truncated to 32 bytes");
Ok(WithdrawReconciliationKey {
amount: note.value,
bedrock_account_pk,
})
}
/// Load signing key from file or generate a new one if it doesn't exist.
fn load_or_create_signing_key(path: &Path) -> Result<Ed25519Key> {
if path.exists() {

View File

@ -3,12 +3,12 @@ use std::time::Duration;
use anyhow::Result;
use common::block::Block;
use logos_blockchain_key_management_system_service::keys::Ed25519Key;
use logos_blockchain_zone_sdk::sequencer::WithdrawArg;
use crate::{
BridgeWithdrawData,
block_publisher::{
BlockPublisherTrait, CheckpointSink, FinalizedBlockSink, OnDepositEventSink,
SequencerCheckpoint,
OnWithdrawEventSink, SequencerCheckpoint,
},
config::BedrockConfig,
};
@ -27,6 +27,7 @@ impl BlockPublisherTrait for MockBlockPublisher {
_on_checkpoint: CheckpointSink,
_on_finalized_block: FinalizedBlockSink,
_on_deposit_event: OnDepositEventSink,
_on_withdraw_event: OnWithdrawEventSink,
) -> Result<Self> {
Ok(Self)
}
@ -34,7 +35,7 @@ impl BlockPublisherTrait for MockBlockPublisher {
async fn publish_block(
&self,
_block: &Block,
_bridge_withdrawals: Vec<BridgeWithdrawData>,
_bridge_withdrawals: Vec<WithdrawArg>,
) -> Result<()> {
Ok(())
}

View File

@ -8,11 +8,15 @@ use rocksdb::{
use crate::{
CF_BLOCK_NAME, CF_META_NAME, DB_META_FIRST_BLOCK_IN_DB_KEY, DBIO, DbResult,
cells::shared_cells::{BlockCell, FirstBlockCell, FirstBlockSetCell, LastBlockCell},
cells::{
SimpleStorableCell,
shared_cells::{BlockCell, FirstBlockCell, FirstBlockSetCell, LastBlockCell},
},
error::DbError,
sequencer::sequencer_cells::{
LastFinalizedBlockIdCell, LatestBlockMetaCellOwned, LatestBlockMetaCellRef,
NSSAStateCellOwned, NSSAStateCellRef, ZoneSdkCheckpointCellOwned, ZoneSdkCheckpointCellRef,
NSSAStateCellOwned, NSSAStateCellRef, UnseenWithdrawCountCell, ZoneSdkCheckpointCellOwned,
ZoneSdkCheckpointCellRef,
},
};
@ -24,6 +28,8 @@ pub const DB_META_LAST_FINALIZED_BLOCK_ID: &str = "last_finalized_block_id";
pub const DB_META_LATEST_BLOCK_META_KEY: &str = "latest_block_meta";
/// Key base for storing the zone-sdk sequencer checkpoint (opaque bytes).
pub const DB_META_ZONE_SDK_CHECKPOINT_KEY: &str = "zone_sdk_checkpoint";
/// Key base for counting unseen L2 withdraw intents.
pub const DB_META_UNSEEN_WITHDRAW_COUNT_KEY: &str = "unseen_withdraw_count";
/// Key base for storing the NSSA state.
pub const DB_NSSA_STATE_KEY: &str = "nssa_state";
@ -239,6 +245,58 @@ impl RocksDBIO {
self.put(&ZoneSdkCheckpointCellRef(bytes), ())
}
pub fn increment_unseen_withdraw_count(
&self,
amount: u64,
bedrock_account_pk: [u8; 32],
) -> DbResult<u64> {
let key_params = (amount, bedrock_account_pk);
let current = self
.get_opt::<UnseenWithdrawCountCell>(key_params)?
.map_or(0, |cell| cell.0);
let next = current.checked_add(1).ok_or_else(|| {
DbError::db_interaction_error("Unseen withdraw counter overflow".to_owned())
})?;
self.put(&UnseenWithdrawCountCell(next), key_params)?;
Ok(next)
}
pub fn consume_unseen_withdraw_count(
&self,
amount: u64,
bedrock_account_pk: [u8; 32],
) -> DbResult<bool> {
let key_params = (amount, bedrock_account_pk);
let Some(current) = self
.get_opt::<UnseenWithdrawCountCell>(key_params)?
.map(|cell| cell.0)
else {
return Ok(false);
};
if let Some(next) = current.checked_sub(1) {
self.put(&UnseenWithdrawCountCell(next), key_params)?;
} else {
let cf_meta = self.meta_column();
let db_key =
<UnseenWithdrawCountCell as SimpleStorableCell>::key_constructor(key_params)?;
self.db.delete_cf(&cf_meta, db_key).map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
Some("Failed to delete unseen withdraw count".to_owned()),
)
})?;
}
Ok(true)
}
pub fn put_block(
&self,
block: &Block,

View File

@ -8,7 +8,7 @@ use crate::{
error::DbError,
sequencer::{
CF_NSSA_STATE_NAME, DB_META_LAST_FINALIZED_BLOCK_ID, DB_META_LATEST_BLOCK_META_KEY,
DB_META_ZONE_SDK_CHECKPOINT_KEY, DB_NSSA_STATE_KEY,
DB_META_UNSEEN_WITHDRAW_COUNT_KEY, DB_META_ZONE_SDK_CHECKPOINT_KEY, DB_NSSA_STATE_KEY,
},
};
@ -131,6 +131,41 @@ impl SimpleWritableCell for ZoneSdkCheckpointCellRef<'_> {
}
}
#[derive(Debug, BorshSerialize, BorshDeserialize)]
pub struct UnseenWithdrawCountCell(pub u64);
impl SimpleStorableCell for UnseenWithdrawCountCell {
type KeyParams = (u64, [u8; 32]);
const CELL_NAME: &'static str = DB_META_UNSEEN_WITHDRAW_COUNT_KEY;
const CF_NAME: &'static str = CF_META_NAME;
fn key_constructor((amount, bedrock_account_pk): Self::KeyParams) -> DbResult<Vec<u8>> {
borsh::to_vec(&(Self::CELL_NAME, amount, bedrock_account_pk)).map_err(|err| {
DbError::borsh_cast_message(
err,
Some(format!(
"Failed to serialize {:?} key params",
Self::CELL_NAME
)),
)
})
}
}
impl SimpleReadableCell for UnseenWithdrawCountCell {}
impl SimpleWritableCell for UnseenWithdrawCountCell {
fn value_constructor(&self) -> DbResult<Vec<u8>> {
borsh::to_vec(&self).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize unseen withdraw count".to_owned()),
)
})
}
}
#[cfg(test)]
mod uniform_tests {
use crate::{