mirror of
https://github.com/logos-blockchain/lssa.git
synced 2026-06-26 10:59:38 +00:00
feat: add bedrock withdraw events validation
This commit is contained in:
parent
77f1fb3f21
commit
e5ca40c0f9
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -495,7 +495,7 @@ async fn bedrock_deposit_claim_and_withdraw_round_trip_succeeds() -> anyhow::Res
|
||||
let sender_id = recipient_id;
|
||||
|
||||
let observer = create_zone_indexer_observer(ctx.bedrock_addr())?;
|
||||
let observe_fut = wait_for_finalized_withdraw_op(&observer, amount);
|
||||
let observe_fut = wait_for_finalized_withdraw_op(&observer, amount, bedrock_account_pk);
|
||||
|
||||
let withdraw_fut = execute_subcommand(
|
||||
ctx.wallet_mut(),
|
||||
@ -513,6 +513,9 @@ async fn bedrock_deposit_claim_and_withdraw_round_trip_succeeds() -> anyhow::Res
|
||||
observe_result
|
||||
.context("Failed while waiting for finalized withdraw event from zone indexer")?;
|
||||
|
||||
// Sleep to observe sequencer log about validated withdraw event
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -563,11 +566,16 @@ async fn wait_for_finalized_withdraw_op(
|
||||
continue;
|
||||
};
|
||||
|
||||
let amount = withdraw.outputs.amount().context(
|
||||
"Failed to compute finalized withdraw amount from zone indexer message",
|
||||
)?;
|
||||
let mut iter = withdraw.outputs.iter();
|
||||
let Some(note) = iter.next() else {
|
||||
continue;
|
||||
};
|
||||
if iter.next().is_some() {
|
||||
// Withdraw op should only have one output
|
||||
continue;
|
||||
}
|
||||
|
||||
if amount == expected_amount {
|
||||
if note.value == expected_amount && note.pk == expected_receiver_pk {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
@ -80,11 +80,11 @@ impl LeeTransaction {
|
||||
) -> Result<ValidatedStateDiff, lee::error::LeeError> {
|
||||
let diff = self.compute_state_diff(state, block_id, timestamp)?;
|
||||
|
||||
let system_accounts = lee::CLOCK_PROGRAM_ACCOUNT_IDS
|
||||
let restricted_modification_accounts = lee::CLOCK_PROGRAM_ACCOUNT_IDS
|
||||
.iter()
|
||||
.copied()
|
||||
.chain(std::iter::once(lee::system_faucet_account_id()));
|
||||
for account_id in system_accounts {
|
||||
for account_id in restricted_modification_accounts {
|
||||
validate_doesnt_modify_account(state, &diff, account_id)?;
|
||||
}
|
||||
|
||||
|
||||
@ -3,19 +3,21 @@ use std::{pin::Pin, sync::Arc, time::Duration};
|
||||
use anyhow::{Context as _, Result};
|
||||
use common::block::Block;
|
||||
use log::{info, warn};
|
||||
use logos_blockchain_core::mantle::{Note, ledger::Outputs, ops::channel::inscribe::Inscription};
|
||||
use logos_blockchain_core::mantle::ops::channel::inscribe::Inscription;
|
||||
pub use logos_blockchain_key_management_system_service::keys::{Ed25519Key, ZkKey};
|
||||
pub use logos_blockchain_zone_sdk::sequencer::SequencerCheckpoint;
|
||||
use logos_blockchain_zone_sdk::{
|
||||
CommonHttpClient,
|
||||
adapter::NodeHttpClient,
|
||||
sequencer::{Event, SequencerConfig as ZoneSdkSequencerConfig, SequencerHandle, ZoneSequencer},
|
||||
state::{DepositInfo, FinalizedOp, InscriptionInfo},
|
||||
sequencer::{
|
||||
Event, SequencerConfig as ZoneSdkSequencerConfig, SequencerHandle, WithdrawArg,
|
||||
ZoneSequencer,
|
||||
},
|
||||
state::{DepositInfo, FinalizedOp, InscriptionInfo, WithdrawInfo},
|
||||
};
|
||||
use num_bigint::BigUint;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::{BridgeWithdrawData, config::BedrockConfig};
|
||||
use crate::config::BedrockConfig;
|
||||
|
||||
/// Sink for `Event::Published` checkpoints emitted by the drive task.
|
||||
/// Caller is responsible for persistence (e.g. writing to rocksdb).
|
||||
@ -30,8 +32,16 @@ pub type FinalizedBlockSink = Box<dyn Fn(u64) + Send + 'static>;
|
||||
pub type OnDepositEventSink =
|
||||
Box<dyn Fn(DepositInfo) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;
|
||||
|
||||
/// Sink for finalized Bedrock withdraw events.
|
||||
pub type OnWithdrawEventSink =
|
||||
Box<dyn Fn(WithdrawInfo) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + 'static>;
|
||||
|
||||
#[expect(async_fn_in_trait, reason = "We don't care about Send/Sync here")]
|
||||
pub trait BlockPublisherTrait: Clone {
|
||||
#[expect(
|
||||
clippy::too_many_arguments,
|
||||
reason = "Looks better than bundling all those callbacks into a struct"
|
||||
)]
|
||||
async fn new(
|
||||
config: &BedrockConfig,
|
||||
bedrock_signing_key: Ed25519Key,
|
||||
@ -40,15 +50,12 @@ pub trait BlockPublisherTrait: Clone {
|
||||
on_checkpoint: CheckpointSink,
|
||||
on_finalized_block: FinalizedBlockSink,
|
||||
on_deposit_event: OnDepositEventSink,
|
||||
on_withdraw_event: OnWithdrawEventSink,
|
||||
) -> Result<Self>;
|
||||
|
||||
/// Fire-and-forget publish. Zone-sdk drives the actual submission and
|
||||
/// retries internally; this just hands the payload off.
|
||||
async fn publish_block(
|
||||
&self,
|
||||
block: &Block,
|
||||
bridge_withdrawals: Vec<BridgeWithdrawData>,
|
||||
) -> Result<()>;
|
||||
async fn publish_block(&self, block: &Block, withdrawals: Vec<WithdrawArg>) -> Result<()>;
|
||||
}
|
||||
|
||||
/// Real block publisher backed by zone-sdk's `ZoneSequencer`.
|
||||
@ -76,6 +83,7 @@ impl BlockPublisherTrait for ZoneSdkPublisher {
|
||||
on_checkpoint: CheckpointSink,
|
||||
on_finalized_block: FinalizedBlockSink,
|
||||
on_deposit_event: OnDepositEventSink,
|
||||
on_withdraw_event: OnWithdrawEventSink,
|
||||
) -> Result<Self> {
|
||||
let basic_auth = config.auth.clone().map(Into::into);
|
||||
let node = NodeHttpClient::new(CommonHttpClient::new(basic_auth), config.node_url.clone());
|
||||
@ -112,7 +120,9 @@ impl BlockPublisherTrait for ZoneSdkPublisher {
|
||||
FinalizedOp::Deposit(deposit) => {
|
||||
on_deposit_event(deposit).await;
|
||||
}
|
||||
FinalizedOp::Withdraw(_) => {}
|
||||
FinalizedOp::Withdraw(withdraw) => {
|
||||
on_withdraw_event(withdraw).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -132,18 +142,14 @@ impl BlockPublisherTrait for ZoneSdkPublisher {
|
||||
})
|
||||
}
|
||||
|
||||
async fn publish_block(
|
||||
&self,
|
||||
block: &Block,
|
||||
bridge_withdrawals: Vec<BridgeWithdrawData>,
|
||||
) -> Result<()> {
|
||||
async fn publish_block(&self, block: &Block, withdrawals: Vec<WithdrawArg>) -> Result<()> {
|
||||
let data = borsh::to_vec(block).context("Failed to serialize block")?;
|
||||
let data_bounded: Inscription = data
|
||||
.try_into()
|
||||
.context("Block data exceeds maximum allowed size")?;
|
||||
let data_byte_size = data_bounded.len();
|
||||
|
||||
if bridge_withdrawals.is_empty() {
|
||||
if withdrawals.is_empty() {
|
||||
self.handle
|
||||
.publish_message(data_bounded)
|
||||
.await
|
||||
@ -154,23 +160,9 @@ impl BlockPublisherTrait for ZoneSdkPublisher {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let withdraws: Vec<_> = bridge_withdrawals
|
||||
.into_iter()
|
||||
.map(|withdrawal| {
|
||||
let recipient_pk =
|
||||
logos_blockchain_key_management_system_service::keys::ZkPublicKey::from(
|
||||
BigUint::from_bytes_le(&withdrawal.bedrock_account_pk),
|
||||
);
|
||||
|
||||
logos_blockchain_zone_sdk::sequencer::WithdrawArg {
|
||||
outputs: Outputs::new(Note::new(withdrawal.amount, recipient_pk)),
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let withdraw_count = withdraws.len();
|
||||
let withdraw_count = withdrawals.len();
|
||||
self.handle
|
||||
.publish_atomic_withdraw(data_bounded, withdraws)
|
||||
.publish_atomic_withdraw(data_bounded, withdrawals)
|
||||
.await
|
||||
.context("Failed to publish block with withdrawals")?;
|
||||
|
||||
|
||||
@ -10,7 +10,10 @@ use lee::V03State;
|
||||
use log::info;
|
||||
use logos_blockchain_zone_sdk::sequencer::SequencerCheckpoint;
|
||||
pub use storage::DbResult;
|
||||
use storage::sequencer::{RocksDBIO, sequencer_cells::PendingDepositEventRecord};
|
||||
use storage::sequencer::{
|
||||
RocksDBIO,
|
||||
sequencer_cells::{PendingDepositEventRecord, WithdrawalReconciliationKey},
|
||||
};
|
||||
|
||||
pub struct SequencerStore {
|
||||
dbio: Arc<RocksDBIO>,
|
||||
@ -128,9 +131,16 @@ impl SequencerStore {
|
||||
self.dbio.get_all_blocks()
|
||||
}
|
||||
|
||||
pub(crate) fn update(&mut self, block: &Block, state: &V03State) -> DbResult<()> {
|
||||
pub(crate) fn update(
|
||||
&mut self,
|
||||
block: &Block,
|
||||
deposit_event_ids: &[HashType],
|
||||
withdrawals: Vec<WithdrawalReconciliationKey>,
|
||||
state: &V03State,
|
||||
) -> DbResult<()> {
|
||||
let new_transactions_map = block_to_transactions_map(block);
|
||||
self.dbio.atomic_update(block, state)?;
|
||||
self.dbio
|
||||
.atomic_update(block, deposit_event_ids, withdrawals, state)?;
|
||||
self.tx_hash_to_block_map.extend(new_transactions_map);
|
||||
Ok(())
|
||||
}
|
||||
@ -158,23 +168,6 @@ impl SequencerStore {
|
||||
pub fn get_unfulfilled_deposit_events(&self) -> DbResult<Vec<PendingDepositEventRecord>> {
|
||||
self.dbio.get_pending_deposit_events()
|
||||
}
|
||||
|
||||
pub fn mark_unfulfilled_deposit_events_submitted(
|
||||
&self,
|
||||
deposit_op_ids: &[HashType],
|
||||
submitted_block_id: u64,
|
||||
) -> DbResult<usize> {
|
||||
self.dbio
|
||||
.mark_pending_deposit_events_submitted(deposit_op_ids, submitted_block_id)
|
||||
}
|
||||
|
||||
pub fn remove_fulfilled_unfulfilled_deposit_events_up_to_block(
|
||||
&self,
|
||||
finalized_block_id: u64,
|
||||
) -> DbResult<usize> {
|
||||
self.dbio
|
||||
.remove_fulfilled_pending_deposit_events_up_to_block(finalized_block_id)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn block_to_transactions_map(block: &Block) -> HashMap<HashType, u64> {
|
||||
@ -227,7 +220,9 @@ mod tests {
|
||||
assert_eq!(None, retrieved_tx);
|
||||
// Add the block with the transaction
|
||||
let dummy_state = V03State::new_with_genesis_accounts(&[], vec![], 0);
|
||||
node_store.update(&block, &dummy_state).unwrap();
|
||||
node_store
|
||||
.update(&block, &[], vec![], &dummy_state)
|
||||
.unwrap();
|
||||
// Try again
|
||||
let retrieved_tx = node_store.get_transaction_by_hash(tx.hash());
|
||||
assert_eq!(Some(tx), retrieved_tx);
|
||||
@ -292,7 +287,9 @@ mod tests {
|
||||
let block_hash = block.header.hash;
|
||||
|
||||
let dummy_state = V03State::new_with_genesis_accounts(&[], vec![], 0);
|
||||
node_store.update(&block, &dummy_state).unwrap();
|
||||
node_store
|
||||
.update(&block, &[], vec![], &dummy_state)
|
||||
.unwrap();
|
||||
|
||||
// Verify that the latest block meta now equals the new block's hash
|
||||
let latest_meta = node_store.latest_block_meta().unwrap();
|
||||
@ -328,7 +325,9 @@ mod tests {
|
||||
let block_id = block.header.block_id;
|
||||
|
||||
let dummy_state = V03State::new_with_genesis_accounts(&[], vec![], 0);
|
||||
node_store.update(&block, &dummy_state).unwrap();
|
||||
node_store
|
||||
.update(&block, &[], vec![], &dummy_state)
|
||||
.unwrap();
|
||||
|
||||
// Verify initial status is Pending
|
||||
let retrieved_block = node_store.get_block_at_id(block_id).unwrap().unwrap();
|
||||
@ -377,7 +376,12 @@ mod tests {
|
||||
// Add a new block
|
||||
let block = common::test_utils::produce_dummy_block(1, None, vec![tx.clone()]);
|
||||
node_store
|
||||
.update(&block, &V03State::new_with_genesis_accounts(&[], vec![], 0))
|
||||
.update(
|
||||
&block,
|
||||
&[],
|
||||
vec![],
|
||||
&V03State::new_with_genesis_accounts(&[], vec![], 0),
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
|
||||
@ -12,11 +12,16 @@ use lee::{AccountId, PublicTransaction, program::Program, public_transaction::Me
|
||||
use lee_core::GENESIS_BLOCK_ID;
|
||||
use log::{error, info, warn};
|
||||
use logos_blockchain_key_management_system_service::keys::{ED25519_SECRET_KEY_SIZE, Ed25519Key};
|
||||
use logos_blockchain_zone_sdk::sequencer::WithdrawArg;
|
||||
use mempool::{MemPool, MemPoolHandle};
|
||||
#[cfg(feature = "mock")]
|
||||
pub use mock::SequencerCoreWithMockClients;
|
||||
use num_bigint::BigUint;
|
||||
pub use storage::error::DbError;
|
||||
use storage::sequencer::sequencer_cells::PendingDepositEventRecord;
|
||||
use storage::sequencer::{
|
||||
RocksDBIO,
|
||||
sequencer_cells::{PendingDepositEventRecord, WithdrawalReconciliationKey},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
block_publisher::{BlockPublisherTrait, ZoneSdkPublisher},
|
||||
@ -126,112 +131,18 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
|
||||
.expect("Failed to load zone-sdk checkpoint");
|
||||
let is_fresh_start = initial_checkpoint.is_none();
|
||||
|
||||
let dbio_for_checkpoint = store.dbio();
|
||||
let on_checkpoint: block_publisher::CheckpointSink = Box::new(move |cp| {
|
||||
let bytes = match serde_json::to_vec(&cp) {
|
||||
Ok(b) => b,
|
||||
Err(err) => {
|
||||
error!("Failed to serialize zone-sdk checkpoint: {err:#}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Err(err) = dbio_for_checkpoint.put_zone_sdk_checkpoint_bytes(&bytes) {
|
||||
error!("Failed to persist zone-sdk checkpoint: {err:#}");
|
||||
}
|
||||
});
|
||||
|
||||
let dbio_for_finalized = store.dbio();
|
||||
let on_finalized_block: block_publisher::FinalizedBlockSink = Box::new(move |block_id| {
|
||||
// NOTE: Theoretically Zone SDK may report finalization happening multiple times for the
|
||||
// same block. In practice this is very unlikely to happen. For that to
|
||||
// happen Sequencer should crash between receiving Finalized and Checkpoint events while
|
||||
// these events happen very fast (because Checkpoints are generated by Zone SDK
|
||||
// locally).
|
||||
|
||||
if let Err(err) = dbio_for_finalized.clean_pending_blocks_up_to(block_id) {
|
||||
error!("Failed to mark pending blocks finalized up to {block_id}: {err:#}");
|
||||
}
|
||||
|
||||
match dbio_for_finalized.remove_fulfilled_pending_deposit_events_up_to_block(block_id) {
|
||||
Ok(0) => {}
|
||||
Ok(removed) => {
|
||||
info!(
|
||||
"Removed {removed} fulfilled pending deposit events up to finalized block {block_id}"
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Failed to remove fulfilled pending deposit events up to block {block_id}: {err:#}"
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let (mempool, mempool_handle) = MemPool::new(config.mempool_max_size);
|
||||
|
||||
replay_unfulfilled_deposit_events(&store, mempool_handle.clone());
|
||||
|
||||
let mempool_handle_for_deposit = mempool_handle.clone();
|
||||
let dbio_for_deposit = store.dbio();
|
||||
let on_deposit_event: block_publisher::OnDepositEventSink = Box::new(move |deposit| {
|
||||
// NOTE: Theoretically Zone SDK may report multiple identical deposits. In practice this
|
||||
// is very unlikely to happen. For that to happen Sequencer should crash
|
||||
// between receiving Deposit and Checkpoint events while these events happen
|
||||
// very fast (because Checkpoints are generated by Zone SDK locally).
|
||||
|
||||
let dbio_for_deposit = Arc::clone(&dbio_for_deposit);
|
||||
let mempool_handle_for_deposit = mempool_handle_for_deposit.clone();
|
||||
Box::pin(async move {
|
||||
let id_hex = hex::encode(deposit.op_id);
|
||||
info!("Observed Bedrock Deposit event with id: {id_hex}");
|
||||
|
||||
let event_record = pending_deposit_event_record(&deposit);
|
||||
|
||||
match dbio_for_deposit.add_pending_deposit_event(event_record.clone()) {
|
||||
Ok(true) => {}
|
||||
Ok(false) => {
|
||||
info!(
|
||||
"Deposit event {id_hex} already persisted as unfulfilled, skipping duplicate enqueue",
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Failed to persist unfulfilled deposit event {id_hex} before enqueue: {err:#}. Deposit will be lost.",
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let tx = match build_bridge_deposit_tx_from_event(&event_record) {
|
||||
Ok(tx) => tx,
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Failed to build transaction from Bedrock deposit event {id_hex}: {err:#}. Deposit will be lost.",
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) = mempool_handle_for_deposit
|
||||
.push((TransactionOrigin::Sequencer, tx))
|
||||
.await
|
||||
{
|
||||
error!(
|
||||
"Failed to queue sequencer transaction built from finalized Bedrock event: {err:#}. Deposit will be lost."
|
||||
);
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
let block_publisher = BP::new(
|
||||
&config.bedrock_config,
|
||||
bedrock_signing_key,
|
||||
config.retry_pending_blocks_timeout,
|
||||
initial_checkpoint,
|
||||
on_checkpoint,
|
||||
on_finalized_block,
|
||||
on_deposit_event,
|
||||
Self::on_checkpoint(store.dbio()),
|
||||
Self::on_finalized_block(store.dbio()),
|
||||
Self::on_deposit_event(store.dbio(), mempool_handle.clone()),
|
||||
Self::on_withdraw_event(store.dbio()),
|
||||
)
|
||||
.await
|
||||
.expect("Failed to initialize Block Publisher");
|
||||
@ -259,32 +170,163 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
|
||||
(sequencer_core, mempool_handle)
|
||||
}
|
||||
|
||||
fn on_checkpoint(dbio: Arc<RocksDBIO>) -> block_publisher::CheckpointSink {
|
||||
Box::new(move |cp| {
|
||||
let bytes = match serde_json::to_vec(&cp) {
|
||||
Ok(b) => b,
|
||||
Err(err) => {
|
||||
error!("Failed to serialize zone-sdk checkpoint: {err:#}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Err(err) = dbio.put_zone_sdk_checkpoint_bytes(&bytes) {
|
||||
error!("Failed to persist zone-sdk checkpoint: {err:#}");
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn on_finalized_block(dbio: Arc<RocksDBIO>) -> block_publisher::FinalizedBlockSink {
|
||||
Box::new(move |block_id| {
|
||||
// NOTE: Theoretically Zone SDK may report finalization happening multiple times for the
|
||||
// same block. In practice this is very unlikely to happen. For that to
|
||||
// happen Sequencer should crash between receiving Finalized and Checkpoint events while
|
||||
// these events happen very fast (because Checkpoints are generated by Zone SDK
|
||||
// locally).
|
||||
|
||||
if let Err(err) = dbio.clean_pending_blocks_up_to(block_id) {
|
||||
error!("Failed to mark pending blocks finalized up to {block_id}: {err:#}");
|
||||
}
|
||||
|
||||
match dbio.remove_fulfilled_pending_deposit_events_up_to_block(block_id) {
|
||||
Ok(0) => {}
|
||||
Ok(removed) => {
|
||||
info!(
|
||||
"Removed {removed} fulfilled pending deposit events up to finalized block {block_id}"
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Failed to remove fulfilled pending deposit events up to block {block_id}: {err:#}"
|
||||
);
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn on_deposit_event(
|
||||
dbio: Arc<RocksDBIO>,
|
||||
mempool_handle: MemPoolHandle<(TransactionOrigin, LeeTransaction)>,
|
||||
) -> block_publisher::OnDepositEventSink {
|
||||
Box::new(move |deposit| {
|
||||
// NOTE: Theoretically Zone SDK may report multiple identical deposits. In practice this
|
||||
// is very unlikely to happen. For that to happen Sequencer should crash
|
||||
// between receiving Deposit and Checkpoint events while these events happen
|
||||
// very fast (because Checkpoints are generated by Zone SDK locally).
|
||||
|
||||
let dbio = Arc::clone(&dbio);
|
||||
let mempool_handle = mempool_handle.clone();
|
||||
|
||||
Box::pin(async move {
|
||||
let id_hex = hex::encode(deposit.op_id);
|
||||
info!("Observed Bedrock Deposit event with id: {id_hex}");
|
||||
|
||||
let event_record = pending_deposit_event_record(&deposit);
|
||||
|
||||
match dbio.add_pending_deposit_event(event_record.clone()) {
|
||||
Ok(true) => {}
|
||||
Ok(false) => {
|
||||
info!(
|
||||
"Deposit event {id_hex} already persisted as unfulfilled, skipping duplicate enqueue",
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Failed to persist unfulfilled deposit event {id_hex} before enqueue: {err:#}. Deposit will be lost.",
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let tx = match build_bridge_deposit_tx_from_event(&event_record) {
|
||||
Ok(tx) => tx,
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Failed to build transaction from Bedrock deposit event {id_hex}: {err:#}. Deposit will be lost.",
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) = mempool_handle
|
||||
.push((TransactionOrigin::Sequencer, tx))
|
||||
.await
|
||||
{
|
||||
error!(
|
||||
"Failed to queue sequencer transaction built from finalized Bedrock event: {err:#}. Deposit will be lost."
|
||||
);
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
fn on_withdraw_event(dbio: Arc<RocksDBIO>) -> block_publisher::OnWithdrawEventSink {
|
||||
Box::new(move |withdraw| {
|
||||
let dbio = Arc::clone(&dbio);
|
||||
Box::pin(async move {
|
||||
let hash_encoded = hex::encode(withdraw.tx_hash.as_ref());
|
||||
let withdraw_key = match withdraw_event_reconciliation_key(&withdraw.op.outputs) {
|
||||
Ok(key) => key,
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Failed to build reconciliation key for Bedrock Withdraw event with tx_hash {hash_encoded}: {err:#}"
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match dbio.consume_unseen_withdraw_count(withdraw_key) {
|
||||
Ok(true) => {
|
||||
info!("Validated Bedrock Withdraw event with tx_hash: {hash_encoded}");
|
||||
}
|
||||
Ok(false) => warn!(
|
||||
"Unexpected Bedrock Withdraw event with tx_hash {hash_encoded}: no matching unseen withdraw found"
|
||||
),
|
||||
Err(err) => error!(
|
||||
"Failed to reconcile Bedrock Withdraw event with tx_hash {hash_encoded}: {err:#}"
|
||||
),
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/// Produces a new block from mempool transactions and publishes it via zone-sdk.
|
||||
pub async fn produce_new_block(&mut self) -> Result<u64> {
|
||||
let BlockWithMeta {
|
||||
block,
|
||||
deposit_event_ids,
|
||||
bridge_withdrawals,
|
||||
withdrawals,
|
||||
} = self
|
||||
.build_block_from_mempool()
|
||||
.context("Failed to build block from mempool transactions")?;
|
||||
|
||||
let withdrawal_reconciliation_keys = withdrawals
|
||||
.iter()
|
||||
.map(|withdraw| withdraw_event_reconciliation_key(&withdraw.outputs))
|
||||
.collect::<Result<_>>()
|
||||
.context("Failed to build reconciliation keys for block withdrawals")?;
|
||||
|
||||
self.block_publisher
|
||||
.publish_block(&block, bridge_withdrawals)
|
||||
.publish_block(&block, withdrawals)
|
||||
.await
|
||||
.context("Failed to publish block to Bedrock")?;
|
||||
|
||||
self.store.update(&block, &self.state)?;
|
||||
|
||||
let updated_deposits = self
|
||||
.store
|
||||
.mark_unfulfilled_deposit_events_submitted(&deposit_event_ids, block.header.block_id)?;
|
||||
if updated_deposits > 0 {
|
||||
info!(
|
||||
"Marked {updated_deposits} pending deposit events as submitted in block {}",
|
||||
block.header.block_id
|
||||
);
|
||||
}
|
||||
self.store.update(
|
||||
&block,
|
||||
&deposit_event_ids,
|
||||
withdrawal_reconciliation_keys,
|
||||
&self.state,
|
||||
)?;
|
||||
|
||||
Ok(self.chain_height)
|
||||
}
|
||||
@ -298,7 +340,7 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
|
||||
|
||||
let mut valid_transactions = Vec::new();
|
||||
let mut deposit_event_ids = Vec::new();
|
||||
let mut bridge_withdrawals = Vec::new();
|
||||
let mut withdrawals = Vec::new();
|
||||
|
||||
let max_block_size = usize::try_from(self.sequencer_config.max_block_size.as_u64())
|
||||
.expect("`max_block_size` should fit into usize");
|
||||
@ -364,7 +406,7 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
|
||||
};
|
||||
|
||||
if let Some(withdraw_data) = extract_bridge_withdraw_data(&tx) {
|
||||
bridge_withdrawals.push(withdraw_data);
|
||||
withdrawals.push(withdraw_data);
|
||||
}
|
||||
|
||||
self.state.apply_state_diff(validated_diff);
|
||||
@ -424,7 +466,7 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
|
||||
Ok(BlockWithMeta {
|
||||
block,
|
||||
deposit_event_ids,
|
||||
bridge_withdrawals,
|
||||
withdrawals,
|
||||
})
|
||||
}
|
||||
|
||||
@ -484,13 +526,7 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
|
||||
struct BlockWithMeta {
|
||||
block: Block,
|
||||
deposit_event_ids: Vec<HashType>,
|
||||
bridge_withdrawals: Vec<BridgeWithdrawData>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct BridgeWithdrawData {
|
||||
pub amount: u64,
|
||||
pub bedrock_account_pk: [u8; 32],
|
||||
withdrawals: Vec<WithdrawArg>,
|
||||
}
|
||||
|
||||
/// Checks the database for any pending deposit events that have not yet been marked as submitted in
|
||||
@ -681,7 +717,7 @@ fn extract_bridge_deposit_id(tx: &LeeTransaction) -> Option<HashType> {
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
fn extract_bridge_withdraw_data(tx: &LeeTransaction) -> Option<BridgeWithdrawData> {
|
||||
fn extract_bridge_withdraw_data(tx: &LeeTransaction) -> Option<WithdrawArg> {
|
||||
let LeeTransaction::Public(tx) = tx else {
|
||||
return None;
|
||||
};
|
||||
@ -699,16 +735,55 @@ fn extract_bridge_withdraw_data(tx: &LeeTransaction) -> Option<BridgeWithdrawDat
|
||||
bridge_core::Instruction::Withdraw {
|
||||
amount,
|
||||
bedrock_account_pk,
|
||||
} => Some(BridgeWithdrawData {
|
||||
amount,
|
||||
bedrock_account_pk,
|
||||
}),
|
||||
} => {
|
||||
let recipient_pk =
|
||||
logos_blockchain_key_management_system_service::keys::ZkPublicKey::from(
|
||||
BigUint::from_bytes_le(&bedrock_account_pk),
|
||||
);
|
||||
|
||||
Some(WithdrawArg {
|
||||
outputs: logos_blockchain_core::mantle::ledger::Outputs::new(
|
||||
logos_blockchain_core::mantle::Note::new(amount, recipient_pk),
|
||||
),
|
||||
})
|
||||
}
|
||||
bridge_core::Instruction::Deposit { .. } => unreachable!(
|
||||
"Deposit instructions from users should never pass validation, and thus should never be seen here"
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
fn withdraw_event_reconciliation_key(
|
||||
outputs: &logos_blockchain_core::mantle::ledger::Outputs,
|
||||
) -> Result<WithdrawalReconciliationKey> {
|
||||
let [note] = outputs.as_ref().as_slice() else {
|
||||
return Err(anyhow!(
|
||||
"Unsupported withdraw output count for reconciliation: {}",
|
||||
outputs.len()
|
||||
));
|
||||
};
|
||||
|
||||
// `extract_bridge_withdraw_data` maps [u8;32] LE -> BigUint -> ZkPublicKey.
|
||||
// Reconcile by reversing that direction here.
|
||||
let mut bedrock_account_pk = BigUint::from(note.pk.into_inner()).to_bytes_le();
|
||||
if bedrock_account_pk.len() > 32 {
|
||||
return Err(anyhow!(
|
||||
"Withdraw recipient public key is too large: {} bytes",
|
||||
bedrock_account_pk.len()
|
||||
));
|
||||
}
|
||||
bedrock_account_pk.resize(32, 0);
|
||||
|
||||
let bedrock_account_pk: [u8; 32] = bedrock_account_pk
|
||||
.try_into()
|
||||
.expect("Public key bytes were padded/truncated to 32 bytes");
|
||||
|
||||
Ok(WithdrawalReconciliationKey {
|
||||
amount: note.value,
|
||||
bedrock_account_pk,
|
||||
})
|
||||
}
|
||||
|
||||
/// Load signing key from file or generate a new one if it doesn't exist.
|
||||
fn load_or_create_signing_key(path: &Path) -> Result<Ed25519Key> {
|
||||
if path.exists() {
|
||||
@ -755,7 +830,7 @@ mod tests {
|
||||
system_bridge_account_id,
|
||||
};
|
||||
use lee_core::{
|
||||
Commitment, InputAccountIdentity, Nullifier,
|
||||
Commitment, EncryptedAccountData, InputAccountIdentity, Nullifier,
|
||||
account::{AccountWithMetadata, Nonce},
|
||||
};
|
||||
use logos_blockchain_core::mantle::ops::channel::ChannelId;
|
||||
@ -1586,6 +1661,11 @@ mod tests {
|
||||
instruction,
|
||||
vec![
|
||||
InputAccountIdentity::PrivateAuthorizedUpdate {
|
||||
epk: EphemeralPublicKey(vec![12_u8; 1088]),
|
||||
view_tag: EncryptedAccountData::compute_view_tag(
|
||||
&sender_keys.nullifier_public_key,
|
||||
&sender_keys.viewing_public_key,
|
||||
),
|
||||
ssk: shared_secret,
|
||||
nsk: sender_keys.private_key_holder.nullifier_secret_key,
|
||||
membership_proof: state
|
||||
@ -1599,17 +1679,8 @@ mod tests {
|
||||
)
|
||||
.expect("Execution should succeed");
|
||||
|
||||
let message = Message::try_from_circuit_output(
|
||||
vec![bridge_account_id],
|
||||
vec![],
|
||||
vec![(
|
||||
sender_keys.nullifier_public_key,
|
||||
sender_keys.viewing_public_key,
|
||||
EphemeralPublicKey(vec![12_u8; 1088]),
|
||||
)],
|
||||
output,
|
||||
)
|
||||
.expect("Message construction should succeed");
|
||||
let message = Message::try_from_circuit_output(vec![bridge_account_id], vec![], output)
|
||||
.expect("Message construction should succeed");
|
||||
let witness_set =
|
||||
lee::privacy_preserving_transaction::WitnessSet::for_message(&message, proof, &[]);
|
||||
let tx = LeeTransaction::PrivacyPreserving(PrivacyPreservingTransaction::new(
|
||||
|
||||
@ -3,12 +3,12 @@ use std::time::Duration;
|
||||
use anyhow::Result;
|
||||
use common::block::Block;
|
||||
use logos_blockchain_key_management_system_service::keys::Ed25519Key;
|
||||
use logos_blockchain_zone_sdk::sequencer::WithdrawArg;
|
||||
|
||||
use crate::{
|
||||
BridgeWithdrawData,
|
||||
block_publisher::{
|
||||
BlockPublisherTrait, CheckpointSink, FinalizedBlockSink, OnDepositEventSink,
|
||||
SequencerCheckpoint,
|
||||
OnWithdrawEventSink, SequencerCheckpoint,
|
||||
},
|
||||
config::BedrockConfig,
|
||||
};
|
||||
@ -27,6 +27,7 @@ impl BlockPublisherTrait for MockBlockPublisher {
|
||||
_on_checkpoint: CheckpointSink,
|
||||
_on_finalized_block: FinalizedBlockSink,
|
||||
_on_deposit_event: OnDepositEventSink,
|
||||
_on_withdraw_event: OnWithdrawEventSink,
|
||||
) -> Result<Self> {
|
||||
Ok(Self)
|
||||
}
|
||||
@ -34,7 +35,7 @@ impl BlockPublisherTrait for MockBlockPublisher {
|
||||
async fn publish_block(
|
||||
&self,
|
||||
_block: &Block,
|
||||
_bridge_withdrawals: Vec<BridgeWithdrawData>,
|
||||
_bridge_withdrawals: Vec<WithdrawArg>,
|
||||
) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -11,12 +11,16 @@ use rocksdb::{
|
||||
|
||||
use crate::{
|
||||
CF_BLOCK_NAME, CF_META_NAME, DB_META_FIRST_BLOCK_IN_DB_KEY, DBIO, DbResult,
|
||||
cells::shared_cells::{BlockCell, FirstBlockCell, FirstBlockSetCell, LastBlockCell},
|
||||
cells::{
|
||||
SimpleStorableCell,
|
||||
shared_cells::{BlockCell, FirstBlockCell, FirstBlockSetCell, LastBlockCell},
|
||||
},
|
||||
error::DbError,
|
||||
sequencer::sequencer_cells::{
|
||||
LEEStateCellOwned, LEEStateCellRef, LastFinalizedBlockIdCell, LatestBlockMetaCellOwned,
|
||||
LatestBlockMetaCellRef, PendingDepositEventRecord, PendingDepositEventsCellOwned,
|
||||
PendingDepositEventsCellRef, ZoneSdkCheckpointCellOwned, ZoneSdkCheckpointCellRef,
|
||||
PendingDepositEventsCellRef, UnseenWithdrawCountCell, WithdrawalReconciliationKey,
|
||||
ZoneSdkCheckpointCellOwned, ZoneSdkCheckpointCellRef,
|
||||
},
|
||||
};
|
||||
|
||||
@ -31,6 +35,8 @@ pub const DB_META_ZONE_SDK_CHECKPOINT_KEY: &str = "zone_sdk_checkpoint";
|
||||
/// Key base for storing queued deposit events that were not yet
|
||||
/// fulfilled on L2.
|
||||
pub const DB_META_PENDING_DEPOSIT_EVENTS_KEY: &str = "pending_deposit_events";
|
||||
/// Key base for counting unseen L2 withdraw intents.
|
||||
pub const DB_META_UNSEEN_WITHDRAW_COUNT_KEY: &str = "unseen_withdraw_count";
|
||||
|
||||
/// Key base for storing the LEE state.
|
||||
pub const DB_LEE_STATE_KEY: &str = "lee_state";
|
||||
@ -250,6 +256,14 @@ impl RocksDBIO {
|
||||
self.put(&PendingDepositEventsCellRef(records), ())
|
||||
}
|
||||
|
||||
fn put_pending_deposit_events_batch(
|
||||
&self,
|
||||
records: &[PendingDepositEventRecord],
|
||||
batch: &mut WriteBatch,
|
||||
) -> DbResult<()> {
|
||||
self.put_batch(&PendingDepositEventsCellRef(records), (), batch)
|
||||
}
|
||||
|
||||
pub fn add_pending_deposit_event(&self, event: PendingDepositEventRecord) -> DbResult<bool> {
|
||||
let mut records = self.get_pending_deposit_events()?;
|
||||
if records
|
||||
@ -263,10 +277,11 @@ impl RocksDBIO {
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
pub fn mark_pending_deposit_events_submitted(
|
||||
fn mark_pending_deposit_events_submitted(
|
||||
&self,
|
||||
deposit_op_ids: &[HashType],
|
||||
submitted_block_id: u64,
|
||||
batch: &mut WriteBatch,
|
||||
) -> DbResult<usize> {
|
||||
let mut records = self.get_pending_deposit_events()?;
|
||||
let mut updated: usize = 0;
|
||||
@ -280,7 +295,7 @@ impl RocksDBIO {
|
||||
}
|
||||
|
||||
if updated > 0 {
|
||||
self.put_pending_deposit_events(&records)?;
|
||||
self.put_pending_deposit_events_batch(&records, batch)?;
|
||||
}
|
||||
|
||||
Ok(updated)
|
||||
@ -306,6 +321,53 @@ impl RocksDBIO {
|
||||
Ok(removed)
|
||||
}
|
||||
|
||||
fn increment_unseen_withdraw_count(
|
||||
&self,
|
||||
withdrawal: WithdrawalReconciliationKey,
|
||||
batch: &mut WriteBatch,
|
||||
) -> DbResult<u64> {
|
||||
let current = self
|
||||
.get_opt::<UnseenWithdrawCountCell>(withdrawal)?
|
||||
.map_or(0, |cell| cell.0);
|
||||
|
||||
let next = current.checked_add(1).ok_or_else(|| {
|
||||
DbError::db_interaction_error("Unseen withdraw counter overflow".to_owned())
|
||||
})?;
|
||||
|
||||
self.put_batch(&UnseenWithdrawCountCell(next), withdrawal, batch)?;
|
||||
|
||||
Ok(next)
|
||||
}
|
||||
|
||||
pub fn consume_unseen_withdraw_count(
|
||||
&self,
|
||||
withdrawal: WithdrawalReconciliationKey,
|
||||
) -> DbResult<bool> {
|
||||
let Some(current) = self
|
||||
.get_opt::<UnseenWithdrawCountCell>(withdrawal)?
|
||||
.map(|cell| cell.0)
|
||||
else {
|
||||
return Ok(false);
|
||||
};
|
||||
|
||||
if let Some(next) = current.checked_sub(1) {
|
||||
self.put(&UnseenWithdrawCountCell(next), withdrawal)?;
|
||||
} else {
|
||||
let cf_meta = self.meta_column();
|
||||
let db_key =
|
||||
<UnseenWithdrawCountCell as SimpleStorableCell>::key_constructor(withdrawal)?;
|
||||
|
||||
self.db.delete_cf(&cf_meta, db_key).map_err(|rerr| {
|
||||
DbError::rocksdb_cast_message(
|
||||
rerr,
|
||||
Some("Failed to delete unseen withdraw count".to_owned()),
|
||||
)
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
pub fn put_block(&self, block: &Block, first: bool, batch: &mut WriteBatch) -> DbResult<()> {
|
||||
let cf_block = self.block_column();
|
||||
|
||||
@ -439,11 +501,26 @@ impl RocksDBIO {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn atomic_update(&self, block: &Block, state: &V03State) -> DbResult<()> {
|
||||
pub fn atomic_update(
|
||||
&self,
|
||||
block: &Block,
|
||||
deposit_op_ids: &[HashType],
|
||||
withdrawals: Vec<WithdrawalReconciliationKey>,
|
||||
state: &V03State,
|
||||
) -> DbResult<()> {
|
||||
let block_id = block.header.block_id;
|
||||
let mut batch = WriteBatch::default();
|
||||
|
||||
self.put_block(block, false, &mut batch)?;
|
||||
|
||||
self.mark_pending_deposit_events_submitted(deposit_op_ids, block_id, &mut batch)?;
|
||||
|
||||
for withdrawal in withdrawals {
|
||||
self.increment_unseen_withdraw_count(withdrawal, &mut batch)?;
|
||||
}
|
||||
|
||||
self.put_lee_state_in_db_batch(state, &mut batch)?;
|
||||
|
||||
self.db.write(batch).map_err(|rerr| {
|
||||
DbError::rocksdb_cast_message(
|
||||
rerr,
|
||||
|
||||
@ -9,7 +9,7 @@ use crate::{
|
||||
sequencer::{
|
||||
CF_LEE_STATE_NAME, DB_LEE_STATE_KEY, DB_META_LAST_FINALIZED_BLOCK_ID,
|
||||
DB_META_LATEST_BLOCK_META_KEY, DB_META_PENDING_DEPOSIT_EVENTS_KEY,
|
||||
DB_META_ZONE_SDK_CHECKPOINT_KEY,
|
||||
DB_META_UNSEEN_WITHDRAW_COUNT_KEY, DB_META_ZONE_SDK_CHECKPOINT_KEY,
|
||||
},
|
||||
};
|
||||
|
||||
@ -175,6 +175,52 @@ impl SimpleWritableCell for PendingDepositEventsCellRef<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct WithdrawalReconciliationKey {
|
||||
pub amount: u64,
|
||||
pub bedrock_account_pk: [u8; 32],
|
||||
}
|
||||
|
||||
#[derive(Debug, BorshSerialize, BorshDeserialize)]
|
||||
pub struct UnseenWithdrawCountCell(pub u64);
|
||||
|
||||
impl SimpleStorableCell for UnseenWithdrawCountCell {
|
||||
type KeyParams = WithdrawalReconciliationKey;
|
||||
|
||||
const CELL_NAME: &'static str = DB_META_UNSEEN_WITHDRAW_COUNT_KEY;
|
||||
const CF_NAME: &'static str = CF_META_NAME;
|
||||
|
||||
fn key_constructor(key_params: Self::KeyParams) -> DbResult<Vec<u8>> {
|
||||
let WithdrawalReconciliationKey {
|
||||
amount,
|
||||
bedrock_account_pk,
|
||||
} = key_params;
|
||||
|
||||
borsh::to_vec(&(Self::CELL_NAME, amount, bedrock_account_pk)).map_err(|err| {
|
||||
DbError::borsh_cast_message(
|
||||
err,
|
||||
Some(format!(
|
||||
"Failed to serialize {:?} key params",
|
||||
Self::CELL_NAME
|
||||
)),
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl SimpleReadableCell for UnseenWithdrawCountCell {}
|
||||
|
||||
impl SimpleWritableCell for UnseenWithdrawCountCell {
|
||||
fn value_constructor(&self) -> DbResult<Vec<u8>> {
|
||||
borsh::to_vec(&self).map_err(|err| {
|
||||
DbError::borsh_cast_message(
|
||||
err,
|
||||
Some("Failed to serialize unseen withdraw count".to_owned()),
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod uniform_tests {
|
||||
use crate::{
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user