From 79c5932e801606444aafca3b7c40d6ab2ba8ac5d Mon Sep 17 00:00:00 2001 From: Daniil Polyakov Date: Thu, 21 May 2026 15:52:09 +0300 Subject: [PATCH] fix: differentiate between user and sequencer transactions in mempool --- .../tests/auth_transfer/private.rs | 63 ++------ sequencer/core/src/lib.rs | 148 +++++++++++++----- sequencer/service/src/lib.rs | 7 +- sequencer/service/src/service.rs | 10 +- test_fixtures/src/config.rs | 6 +- 5 files changed, 139 insertions(+), 95 deletions(-) diff --git a/integration_tests/tests/auth_transfer/private.rs b/integration_tests/tests/auth_transfer/private.rs index feb5e5e8..59be32da 100644 --- a/integration_tests/tests/auth_transfer/private.rs +++ b/integration_tests/tests/auth_transfer/private.rs @@ -7,8 +7,14 @@ use integration_tests::{ public_mention, verify_commitment_is_in_state, }; use log::info; -use nssa::{AccountId, program::Program}; -use nssa_core::{NullifierPublicKey, encryption::shared_key_derivation::Secp256k1Point}; +use nssa::{ + AccountId, SharedSecretKey, execute_and_prove, + privacy_preserving_transaction::circuit::ProgramWithDependencies, program::Program, +}; +use nssa_core::{ + InputAccountIdentity, NullifierPublicKey, account::AccountWithMetadata, + encryption::shared_key_derivation::Secp256k1Point, +}; use sequencer_service_rpc::RpcClient as _; use tokio::test; use wallet::{ @@ -626,13 +632,7 @@ async fn shielded_transfers_to_two_identifiers_same_npk() -> Result<()> { } #[test] -async fn ppt_that_chain_calls_faucet_is_dropped() -> Result<()> { - use nssa::{ - EphemeralPublicKey, SharedSecretKey, execute_and_prove, - privacy_preserving_transaction::{self, circuit::ProgramWithDependencies}, - }; - use nssa_core::{InputAccountIdentity, account::AccountWithMetadata}; - +async fn ppt_cant_chain_call_faucet() -> Result<()> { let ctx = TestContext::new().await?; let binary = std::fs::read( @@ -656,7 +656,6 @@ async fn ppt_that_chain_calls_faucet_is_dropped() -> Result<()> { let npk = NullifierPublicKey::from(&nsk); let vpk = Secp256k1Point::from_scalar([4; 32]); let ssk = SharedSecretKey::new([55; 32], &vpk); - let epk = EphemeralPublicKey::from_scalar([55; 32]); let attacker_vault_id = { let seed = vault_core::compute_vault_seed(attacker_id); AccountId::for_private_pda(&vault_program_id, &seed, &npk, 1337) @@ -695,7 +694,7 @@ async fn ppt_that_chain_calls_faucet_is_dropped() -> Result<()> { let instruction = Program::serialize_instruction((faucet_program_id, vault_program_id, attacker_id, amount))?; - let (output, proof) = execute_and_prove( + let res = execute_and_prove( vec![faucet_pre, vault_pda_pre], instruction, vec![ @@ -707,47 +706,9 @@ async fn ppt_that_chain_calls_faucet_is_dropped() -> Result<()> { }, ], &program_with_deps, - )?; + ); - let message = privacy_preserving_transaction::Message::try_from_circuit_output( - vec![faucet_account_id], - vec![], - vec![(npk, vpk, epk)], - output, - )?; - let witness_set = privacy_preserving_transaction::WitnessSet::for_message(&message, proof, &[]); - let attack_ppt = NSSATransaction::PrivacyPreserving(nssa::PrivacyPreservingTransaction::new( - message, - witness_set, - )); - - let faucet_balance_before = ctx - .sequencer_client() - .get_account_balance(faucet_account_id) - .await?; - let vault_balance_before = ctx - .sequencer_client() - .get_account_balance(attacker_vault_id) - .await?; - - let tx_hash = ctx.sequencer_client().send_transaction(attack_ppt).await?; - - info!("Waiting for next block creation"); - tokio::time::sleep(Duration::from_secs(TIME_TO_WAIT_FOR_BLOCK_SECONDS)).await; - - let faucet_balance_after = ctx - .sequencer_client() - .get_account_balance(faucet_account_id) - .await?; - let vault_balance_after = ctx - .sequencer_client() - .get_account_balance(attacker_vault_id) - .await?; - let tx_on_chain = ctx.sequencer_client().get_transaction(tx_hash).await?; - - assert_eq!(faucet_balance_after, faucet_balance_before); - assert_eq!(vault_balance_after, vault_balance_before); - assert!(tx_on_chain.is_none()); + assert!(res.is_err()); Ok(()) } diff --git a/sequencer/core/src/lib.rs b/sequencer/core/src/lib.rs index 17421d4b..3425db2a 100644 --- a/sequencer/core/src/lib.rs +++ b/sequencer/core/src/lib.rs @@ -28,10 +28,18 @@ pub mod config; #[cfg(feature = "mock")] pub mod mock; +/// The origin of a transaction. +pub enum TransactionOrigin { + /// Basic transactions submitted by users via RPC. + User, + /// Transactions generated by the sequencer itself. + Sequencer, +} + pub struct SequencerCore { state: nssa::V03State, store: SequencerStore, - mempool: MemPool, + mempool: MemPool<(TransactionOrigin, NSSATransaction)>, sequencer_config: SequencerConfig, chain_height: u64, block_publisher: BP, @@ -45,7 +53,7 @@ impl SequencerCore { /// initializing its state with the accounts defined in the configuration file. pub async fn start_from_config( config: SequencerConfig, - ) -> (Self, MemPoolHandle) { + ) -> (Self, MemPoolHandle<(TransactionOrigin, NSSATransaction)>) { let signing_key = nssa::PrivateKey::try_new(config.signing_key).unwrap(); let bedrock_signing_key = @@ -207,7 +215,7 @@ impl SequencerCore { let clock_tx = clock_invocation(new_block_timestamp); let clock_nssa_tx = NSSATransaction::Public(clock_tx.clone()); - while let Some(tx) = self.mempool.pop() { + while let Some((origin, tx)) = self.mempool.pop() { let tx_hash = tx.hash(); // Check if block size exceeds limit (including the mandatory clock tx). @@ -235,25 +243,41 @@ impl SequencerCore { block size {block_size} bytes would exceed limit of {max_block_size} bytes", ); - self.mempool.push_front(tx); + self.mempool.push_front((origin, tx)); break; } - let validated_diff = match tx.validate_on_state( - &self.state, - new_block_height, - new_block_timestamp, - ) { - Ok(diff) => diff, - Err(err) => { - error!( - "Transaction with hash {tx_hash} failed execution check with error: {err:#?}, skipping it", - ); - continue; - } - }; + match origin { + TransactionOrigin::User => { + let validated_diff = match tx.validate_on_state( + &self.state, + new_block_height, + new_block_timestamp, + ) { + Ok(diff) => diff, + Err(err) => { + error!( + "Transaction with hash {tx_hash} failed execution check with error: {err:#?}, skipping it", + ); + continue; + } + }; - self.state.apply_state_diff(validated_diff); + self.state.apply_state_diff(validated_diff); + } + TransactionOrigin::Sequencer => { + let NSSATransaction::Public(public_tx) = &tx else { + panic!("Sequencer may only generate Public transactions, found {tx:#?}"); + }; + self.state + .transition_from_public_transaction( + public_tx, + new_block_height, + new_block_timestamp, + ) + .context("Failed to execute sequencer-generated transaction")?; + } + } valid_transactions.push(tx); info!("Validated transaction with hash {tx_hash}, including it in block"); @@ -460,6 +484,7 @@ mod tests { use testnet_initial_state::{initial_accounts, initial_pub_accounts_private_keys}; use crate::{ + TransactionOrigin, block_store::SequencerStore, build_genesis_state, config::{BedrockConfig, SequencerConfig}, @@ -495,19 +520,28 @@ mod tests { initial_pub_accounts_private_keys()[1].pub_sign_key.clone() } - async fn common_setup() -> (SequencerCoreWithMockClients, MemPoolHandle) { + async fn common_setup() -> ( + SequencerCoreWithMockClients, + MemPoolHandle<(TransactionOrigin, NSSATransaction)>, + ) { let config = setup_sequencer_config(); common_setup_with_config(config).await } async fn common_setup_with_config( config: SequencerConfig, - ) -> (SequencerCoreWithMockClients, MemPoolHandle) { + ) -> ( + SequencerCoreWithMockClients, + MemPoolHandle<(TransactionOrigin, NSSATransaction)>, + ) { let (mut sequencer, mempool_handle) = SequencerCoreWithMockClients::start_from_config(config).await; let tx = common::test_utils::produce_dummy_empty_transaction(); - mempool_handle.push(tx).await.unwrap(); + mempool_handle + .push((TransactionOrigin::User, tx)) + .await + .unwrap(); sequencer.produce_new_block().await.unwrap(); @@ -697,10 +731,13 @@ mod tests { let tx = common::test_utils::produce_dummy_empty_transaction(); // Fill the mempool - mempool_handle.push(tx.clone()).await.unwrap(); + mempool_handle + .push((TransactionOrigin::User, tx.clone())) + .await + .unwrap(); // Check that pushing another transaction will block - let mut push_fut = pin!(mempool_handle.push(tx.clone())); + let mut push_fut = pin!(mempool_handle.push((TransactionOrigin::User, tx.clone()))); let poll = futures::poll!(push_fut.as_mut()); assert!(poll.is_pending()); @@ -717,7 +754,10 @@ mod tests { let genesis_height = sequencer.chain_height; let tx = common::test_utils::produce_dummy_empty_transaction(); - mempool_handle.push(tx).await.unwrap(); + mempool_handle + .push((TransactionOrigin::User, tx)) + .await + .unwrap(); let result = sequencer.build_block_from_mempool(); assert!(result.is_ok()); @@ -740,8 +780,14 @@ mod tests { let tx_original = tx.clone(); let tx_replay = tx.clone(); // Pushing two copies of the same tx to the mempool - mempool_handle.push(tx_original).await.unwrap(); - mempool_handle.push(tx_replay).await.unwrap(); + mempool_handle + .push((TransactionOrigin::User, tx_original)) + .await + .unwrap(); + mempool_handle + .push((TransactionOrigin::User, tx_replay)) + .await + .unwrap(); // Create block sequencer.produce_new_block().await.unwrap(); @@ -775,7 +821,10 @@ mod tests { ); // The transaction should be included the first time - mempool_handle.push(tx.clone()).await.unwrap(); + mempool_handle + .push((TransactionOrigin::User, tx.clone())) + .await + .unwrap(); sequencer.produce_new_block().await.unwrap(); let block = sequencer .store @@ -791,7 +840,10 @@ mod tests { ); // Add same transaction should fail - mempool_handle.push(tx.clone()).await.unwrap(); + mempool_handle + .push((TransactionOrigin::User, tx.clone())) + .await + .unwrap(); sequencer.produce_new_block().await.unwrap(); let block = sequencer .store @@ -830,7 +882,10 @@ mod tests { &signing_key, ); - mempool_handle.push(tx.clone()).await.unwrap(); + mempool_handle + .push((TransactionOrigin::User, tx.clone())) + .await + .unwrap(); sequencer.produce_new_block().await.unwrap(); let block = sequencer .store @@ -914,7 +969,10 @@ mod tests { &signing_key, ); - mempool_handle.push(tx).await.unwrap(); + mempool_handle + .push((TransactionOrigin::User, tx)) + .await + .unwrap(); sequencer.produce_new_block().await.unwrap(); // Get the metadata of the last block produced @@ -935,7 +993,10 @@ mod tests { &signing_key, ); - mempool_handle.push(tx.clone()).await.unwrap(); + mempool_handle + .push((TransactionOrigin::User, tx.clone())) + .await + .unwrap(); // Step 4: Produce new block sequencer.produce_new_block().await.unwrap(); @@ -981,10 +1042,16 @@ mod tests { )) }; mempool_handle - .push(NSSATransaction::Public(clock_invocation(0))) + .push(( + TransactionOrigin::User, + NSSATransaction::Public(clock_invocation(0)), + )) + .await + .unwrap(); + mempool_handle + .push((TransactionOrigin::User, crafted_clock_tx)) .await .unwrap(); - mempool_handle.push(crafted_clock_tx).await.unwrap(); sequencer.produce_new_block().await.unwrap(); let block = sequencer @@ -1013,7 +1080,10 @@ mod tests { test_program_methods::CLOCK_CHAIN_CALLER_ELF.to_vec(), ), )); - mempool_handle.push(deploy_tx).await.unwrap(); + mempool_handle + .push((TransactionOrigin::User, deploy_tx)) + .await + .unwrap(); sequencer.produce_new_block().await.unwrap(); // Build a user transaction that invokes clock_chain_caller, which in turn chain-calls the @@ -1038,7 +1108,10 @@ mod tests { nssa::public_transaction::WitnessSet::from_raw_parts(vec![]), )); - mempool_handle.push(user_tx).await.unwrap(); + mempool_handle + .push((TransactionOrigin::User, user_tx)) + .await + .unwrap(); sequencer.produce_new_block().await.unwrap(); let block = sequencer @@ -1070,7 +1143,10 @@ mod tests { // Push a dummy transaction so the mempool is non-empty. let tx = common::test_utils::produce_dummy_empty_transaction(); - mempool_handle.push(tx).await.unwrap(); + mempool_handle + .push((TransactionOrigin::User, tx)) + .await + .unwrap(); // Block production must fail because the appended clock tx cannot execute. let result = sequencer.produce_new_block().await; diff --git a/sequencer/service/src/lib.rs b/sequencer/service/src/lib.rs index cb3208fe..30c1e366 100644 --- a/sequencer/service/src/lib.rs +++ b/sequencer/service/src/lib.rs @@ -25,6 +25,7 @@ use mempool::MemPoolHandle; use sequencer_core::SequencerCore; #[cfg(feature = "standalone")] use sequencer_core::SequencerCoreWithMockClients as SequencerCore; +use sequencer_core::TransactionOrigin; pub use sequencer_core::config::*; use sequencer_service_rpc::RpcServer as _; use tokio::{sync::Mutex, task::JoinHandle}; @@ -205,7 +206,7 @@ pub async fn run(config: SequencerConfig, port: u16) -> Result async fn run_server( sequencer: Arc>, - mempool_handle: MemPoolHandle, + mempool_handle: MemPoolHandle<(TransactionOrigin, NSSATransaction)>, port: u16, max_block_size: u64, ) -> Result<(ServerHandle, SocketAddr)> { @@ -253,7 +254,7 @@ async fn main_loop(seq_core: Arc>, block_timeout: Duration) #[cfg(not(feature = "standalone"))] async fn bedrock_deposit_loop( bedrock_config: BedrockConfig, - mempool_handle: MemPoolHandle, + mempool_handle: MemPoolHandle<(TransactionOrigin, NSSATransaction)>, ) -> Result { let basic_auth = bedrock_config.auth.map(Into::into); let node = NodeHttpClient::new(CommonHttpClient::new(basic_auth), bedrock_config.node_url); @@ -320,7 +321,7 @@ async fn bedrock_deposit_loop( recipient_id = metadata.recipient_id ); mempool_handle - .push(tx) + .push((TransactionOrigin::Sequencer, tx)) .await .context("Mempool is closed while pushing Bedrock Deposit transaction")?; } diff --git a/sequencer/service/src/service.rs b/sequencer/service/src/service.rs index 0bb8e1dd..4a478d56 100644 --- a/sequencer/service/src/service.rs +++ b/sequencer/service/src/service.rs @@ -8,7 +8,9 @@ use jsonrpsee::{ use log::warn; use mempool::MemPoolHandle; use nssa::{self, program::Program}; -use sequencer_core::{DbError, SequencerCore, block_publisher::BlockPublisherTrait}; +use sequencer_core::{ + DbError, SequencerCore, TransactionOrigin, block_publisher::BlockPublisherTrait, +}; use sequencer_service_protocol::{ Account, AccountId, Block, BlockId, Commitment, HashType, MembershipProof, Nonce, ProgramId, }; @@ -18,14 +20,14 @@ const NOT_FOUND_ERROR_CODE: i32 = -31999; pub struct SequencerService { sequencer: Arc>>, - mempool_handle: MemPoolHandle, + mempool_handle: MemPoolHandle<(TransactionOrigin, NSSATransaction)>, max_block_size: u64, } impl SequencerService { pub const fn new( sequencer: Arc>>, - mempool_handle: MemPoolHandle, + mempool_handle: MemPoolHandle<(TransactionOrigin, NSSATransaction)>, max_block_size: u64, ) -> Self { Self { @@ -72,7 +74,7 @@ impl sequencer_service_rpc::RpcServer })?; self.mempool_handle - .push(authenticated_tx) + .push((TransactionOrigin::User, authenticated_tx)) .await .expect("Mempool is closed, this is a bug"); diff --git a/test_fixtures/src/config.rs b/test_fixtures/src/config.rs index de7c2644..3119a195 100644 --- a/test_fixtures/src/config.rs +++ b/test_fixtures/src/config.rs @@ -191,5 +191,9 @@ pub fn addr_to_url(protocol: UrlProtocol, addr: SocketAddr) -> Result { #[must_use] pub fn bedrock_channel_id() -> ChannelId { - ChannelId::from([0_u8; 32]) + let channel_id: [u8; 32] = [0_u8, 1] + .repeat(16) + .try_into() + .unwrap_or_else(|_| unreachable!()); + ChannelId::from(channel_id) }