fix: differentiate between user and sequencer transactions in mempool

This commit is contained in:
Daniil Polyakov 2026-05-21 15:52:09 +03:00
parent 64a2e5c5cb
commit 79c5932e80
5 changed files with 139 additions and 95 deletions

View File

@ -7,8 +7,14 @@ use integration_tests::{
public_mention, verify_commitment_is_in_state,
};
use log::info;
use nssa::{AccountId, program::Program};
use nssa_core::{NullifierPublicKey, encryption::shared_key_derivation::Secp256k1Point};
use nssa::{
AccountId, SharedSecretKey, execute_and_prove,
privacy_preserving_transaction::circuit::ProgramWithDependencies, program::Program,
};
use nssa_core::{
InputAccountIdentity, NullifierPublicKey, account::AccountWithMetadata,
encryption::shared_key_derivation::Secp256k1Point,
};
use sequencer_service_rpc::RpcClient as _;
use tokio::test;
use wallet::{
@ -626,13 +632,7 @@ async fn shielded_transfers_to_two_identifiers_same_npk() -> Result<()> {
}
#[test]
async fn ppt_that_chain_calls_faucet_is_dropped() -> Result<()> {
use nssa::{
EphemeralPublicKey, SharedSecretKey, execute_and_prove,
privacy_preserving_transaction::{self, circuit::ProgramWithDependencies},
};
use nssa_core::{InputAccountIdentity, account::AccountWithMetadata};
async fn ppt_cant_chain_call_faucet() -> Result<()> {
let ctx = TestContext::new().await?;
let binary = std::fs::read(
@ -656,7 +656,6 @@ async fn ppt_that_chain_calls_faucet_is_dropped() -> Result<()> {
let npk = NullifierPublicKey::from(&nsk);
let vpk = Secp256k1Point::from_scalar([4; 32]);
let ssk = SharedSecretKey::new([55; 32], &vpk);
let epk = EphemeralPublicKey::from_scalar([55; 32]);
let attacker_vault_id = {
let seed = vault_core::compute_vault_seed(attacker_id);
AccountId::for_private_pda(&vault_program_id, &seed, &npk, 1337)
@ -695,7 +694,7 @@ async fn ppt_that_chain_calls_faucet_is_dropped() -> Result<()> {
let instruction =
Program::serialize_instruction((faucet_program_id, vault_program_id, attacker_id, amount))?;
let (output, proof) = execute_and_prove(
let res = execute_and_prove(
vec![faucet_pre, vault_pda_pre],
instruction,
vec![
@ -707,47 +706,9 @@ async fn ppt_that_chain_calls_faucet_is_dropped() -> Result<()> {
},
],
&program_with_deps,
)?;
);
let message = privacy_preserving_transaction::Message::try_from_circuit_output(
vec![faucet_account_id],
vec![],
vec![(npk, vpk, epk)],
output,
)?;
let witness_set = privacy_preserving_transaction::WitnessSet::for_message(&message, proof, &[]);
let attack_ppt = NSSATransaction::PrivacyPreserving(nssa::PrivacyPreservingTransaction::new(
message,
witness_set,
));
let faucet_balance_before = ctx
.sequencer_client()
.get_account_balance(faucet_account_id)
.await?;
let vault_balance_before = ctx
.sequencer_client()
.get_account_balance(attacker_vault_id)
.await?;
let tx_hash = ctx.sequencer_client().send_transaction(attack_ppt).await?;
info!("Waiting for next block creation");
tokio::time::sleep(Duration::from_secs(TIME_TO_WAIT_FOR_BLOCK_SECONDS)).await;
let faucet_balance_after = ctx
.sequencer_client()
.get_account_balance(faucet_account_id)
.await?;
let vault_balance_after = ctx
.sequencer_client()
.get_account_balance(attacker_vault_id)
.await?;
let tx_on_chain = ctx.sequencer_client().get_transaction(tx_hash).await?;
assert_eq!(faucet_balance_after, faucet_balance_before);
assert_eq!(vault_balance_after, vault_balance_before);
assert!(tx_on_chain.is_none());
assert!(res.is_err());
Ok(())
}

View File

@ -28,10 +28,18 @@ pub mod config;
#[cfg(feature = "mock")]
pub mod mock;
/// The origin of a transaction.
pub enum TransactionOrigin {
/// Basic transactions submitted by users via RPC.
User,
/// Transactions generated by the sequencer itself.
Sequencer,
}
pub struct SequencerCore<BP: BlockPublisherTrait = ZoneSdkPublisher> {
state: nssa::V03State,
store: SequencerStore,
mempool: MemPool<NSSATransaction>,
mempool: MemPool<(TransactionOrigin, NSSATransaction)>,
sequencer_config: SequencerConfig,
chain_height: u64,
block_publisher: BP,
@ -45,7 +53,7 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
/// initializing its state with the accounts defined in the configuration file.
pub async fn start_from_config(
config: SequencerConfig,
) -> (Self, MemPoolHandle<NSSATransaction>) {
) -> (Self, MemPoolHandle<(TransactionOrigin, NSSATransaction)>) {
let signing_key = nssa::PrivateKey::try_new(config.signing_key).unwrap();
let bedrock_signing_key =
@ -207,7 +215,7 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
let clock_tx = clock_invocation(new_block_timestamp);
let clock_nssa_tx = NSSATransaction::Public(clock_tx.clone());
while let Some(tx) = self.mempool.pop() {
while let Some((origin, tx)) = self.mempool.pop() {
let tx_hash = tx.hash();
// Check if block size exceeds limit (including the mandatory clock tx).
@ -235,25 +243,41 @@ impl<BP: BlockPublisherTrait> SequencerCore<BP> {
block size {block_size} bytes would exceed limit of {max_block_size} bytes",
);
self.mempool.push_front(tx);
self.mempool.push_front((origin, tx));
break;
}
let validated_diff = match tx.validate_on_state(
&self.state,
new_block_height,
new_block_timestamp,
) {
Ok(diff) => diff,
Err(err) => {
error!(
"Transaction with hash {tx_hash} failed execution check with error: {err:#?}, skipping it",
);
continue;
}
};
match origin {
TransactionOrigin::User => {
let validated_diff = match tx.validate_on_state(
&self.state,
new_block_height,
new_block_timestamp,
) {
Ok(diff) => diff,
Err(err) => {
error!(
"Transaction with hash {tx_hash} failed execution check with error: {err:#?}, skipping it",
);
continue;
}
};
self.state.apply_state_diff(validated_diff);
self.state.apply_state_diff(validated_diff);
}
TransactionOrigin::Sequencer => {
let NSSATransaction::Public(public_tx) = &tx else {
panic!("Sequencer may only generate Public transactions, found {tx:#?}");
};
self.state
.transition_from_public_transaction(
public_tx,
new_block_height,
new_block_timestamp,
)
.context("Failed to execute sequencer-generated transaction")?;
}
}
valid_transactions.push(tx);
info!("Validated transaction with hash {tx_hash}, including it in block");
@ -460,6 +484,7 @@ mod tests {
use testnet_initial_state::{initial_accounts, initial_pub_accounts_private_keys};
use crate::{
TransactionOrigin,
block_store::SequencerStore,
build_genesis_state,
config::{BedrockConfig, SequencerConfig},
@ -495,19 +520,28 @@ mod tests {
initial_pub_accounts_private_keys()[1].pub_sign_key.clone()
}
async fn common_setup() -> (SequencerCoreWithMockClients, MemPoolHandle<NSSATransaction>) {
async fn common_setup() -> (
SequencerCoreWithMockClients,
MemPoolHandle<(TransactionOrigin, NSSATransaction)>,
) {
let config = setup_sequencer_config();
common_setup_with_config(config).await
}
async fn common_setup_with_config(
config: SequencerConfig,
) -> (SequencerCoreWithMockClients, MemPoolHandle<NSSATransaction>) {
) -> (
SequencerCoreWithMockClients,
MemPoolHandle<(TransactionOrigin, NSSATransaction)>,
) {
let (mut sequencer, mempool_handle) =
SequencerCoreWithMockClients::start_from_config(config).await;
let tx = common::test_utils::produce_dummy_empty_transaction();
mempool_handle.push(tx).await.unwrap();
mempool_handle
.push((TransactionOrigin::User, tx))
.await
.unwrap();
sequencer.produce_new_block().await.unwrap();
@ -697,10 +731,13 @@ mod tests {
let tx = common::test_utils::produce_dummy_empty_transaction();
// Fill the mempool
mempool_handle.push(tx.clone()).await.unwrap();
mempool_handle
.push((TransactionOrigin::User, tx.clone()))
.await
.unwrap();
// Check that pushing another transaction will block
let mut push_fut = pin!(mempool_handle.push(tx.clone()));
let mut push_fut = pin!(mempool_handle.push((TransactionOrigin::User, tx.clone())));
let poll = futures::poll!(push_fut.as_mut());
assert!(poll.is_pending());
@ -717,7 +754,10 @@ mod tests {
let genesis_height = sequencer.chain_height;
let tx = common::test_utils::produce_dummy_empty_transaction();
mempool_handle.push(tx).await.unwrap();
mempool_handle
.push((TransactionOrigin::User, tx))
.await
.unwrap();
let result = sequencer.build_block_from_mempool();
assert!(result.is_ok());
@ -740,8 +780,14 @@ mod tests {
let tx_original = tx.clone();
let tx_replay = tx.clone();
// Pushing two copies of the same tx to the mempool
mempool_handle.push(tx_original).await.unwrap();
mempool_handle.push(tx_replay).await.unwrap();
mempool_handle
.push((TransactionOrigin::User, tx_original))
.await
.unwrap();
mempool_handle
.push((TransactionOrigin::User, tx_replay))
.await
.unwrap();
// Create block
sequencer.produce_new_block().await.unwrap();
@ -775,7 +821,10 @@ mod tests {
);
// The transaction should be included the first time
mempool_handle.push(tx.clone()).await.unwrap();
mempool_handle
.push((TransactionOrigin::User, tx.clone()))
.await
.unwrap();
sequencer.produce_new_block().await.unwrap();
let block = sequencer
.store
@ -791,7 +840,10 @@ mod tests {
);
// Add same transaction should fail
mempool_handle.push(tx.clone()).await.unwrap();
mempool_handle
.push((TransactionOrigin::User, tx.clone()))
.await
.unwrap();
sequencer.produce_new_block().await.unwrap();
let block = sequencer
.store
@ -830,7 +882,10 @@ mod tests {
&signing_key,
);
mempool_handle.push(tx.clone()).await.unwrap();
mempool_handle
.push((TransactionOrigin::User, tx.clone()))
.await
.unwrap();
sequencer.produce_new_block().await.unwrap();
let block = sequencer
.store
@ -914,7 +969,10 @@ mod tests {
&signing_key,
);
mempool_handle.push(tx).await.unwrap();
mempool_handle
.push((TransactionOrigin::User, tx))
.await
.unwrap();
sequencer.produce_new_block().await.unwrap();
// Get the metadata of the last block produced
@ -935,7 +993,10 @@ mod tests {
&signing_key,
);
mempool_handle.push(tx.clone()).await.unwrap();
mempool_handle
.push((TransactionOrigin::User, tx.clone()))
.await
.unwrap();
// Step 4: Produce new block
sequencer.produce_new_block().await.unwrap();
@ -981,10 +1042,16 @@ mod tests {
))
};
mempool_handle
.push(NSSATransaction::Public(clock_invocation(0)))
.push((
TransactionOrigin::User,
NSSATransaction::Public(clock_invocation(0)),
))
.await
.unwrap();
mempool_handle
.push((TransactionOrigin::User, crafted_clock_tx))
.await
.unwrap();
mempool_handle.push(crafted_clock_tx).await.unwrap();
sequencer.produce_new_block().await.unwrap();
let block = sequencer
@ -1013,7 +1080,10 @@ mod tests {
test_program_methods::CLOCK_CHAIN_CALLER_ELF.to_vec(),
),
));
mempool_handle.push(deploy_tx).await.unwrap();
mempool_handle
.push((TransactionOrigin::User, deploy_tx))
.await
.unwrap();
sequencer.produce_new_block().await.unwrap();
// Build a user transaction that invokes clock_chain_caller, which in turn chain-calls the
@ -1038,7 +1108,10 @@ mod tests {
nssa::public_transaction::WitnessSet::from_raw_parts(vec![]),
));
mempool_handle.push(user_tx).await.unwrap();
mempool_handle
.push((TransactionOrigin::User, user_tx))
.await
.unwrap();
sequencer.produce_new_block().await.unwrap();
let block = sequencer
@ -1070,7 +1143,10 @@ mod tests {
// Push a dummy transaction so the mempool is non-empty.
let tx = common::test_utils::produce_dummy_empty_transaction();
mempool_handle.push(tx).await.unwrap();
mempool_handle
.push((TransactionOrigin::User, tx))
.await
.unwrap();
// Block production must fail because the appended clock tx cannot execute.
let result = sequencer.produce_new_block().await;

View File

@ -25,6 +25,7 @@ use mempool::MemPoolHandle;
use sequencer_core::SequencerCore;
#[cfg(feature = "standalone")]
use sequencer_core::SequencerCoreWithMockClients as SequencerCore;
use sequencer_core::TransactionOrigin;
pub use sequencer_core::config::*;
use sequencer_service_rpc::RpcServer as _;
use tokio::{sync::Mutex, task::JoinHandle};
@ -205,7 +206,7 @@ pub async fn run(config: SequencerConfig, port: u16) -> Result<SequencerHandle>
async fn run_server(
sequencer: Arc<Mutex<SequencerCore>>,
mempool_handle: MemPoolHandle<NSSATransaction>,
mempool_handle: MemPoolHandle<(TransactionOrigin, NSSATransaction)>,
port: u16,
max_block_size: u64,
) -> Result<(ServerHandle, SocketAddr)> {
@ -253,7 +254,7 @@ async fn main_loop(seq_core: Arc<Mutex<SequencerCore>>, block_timeout: Duration)
#[cfg(not(feature = "standalone"))]
async fn bedrock_deposit_loop(
bedrock_config: BedrockConfig,
mempool_handle: MemPoolHandle<NSSATransaction>,
mempool_handle: MemPoolHandle<(TransactionOrigin, NSSATransaction)>,
) -> Result<Never> {
let basic_auth = bedrock_config.auth.map(Into::into);
let node = NodeHttpClient::new(CommonHttpClient::new(basic_auth), bedrock_config.node_url);
@ -320,7 +321,7 @@ async fn bedrock_deposit_loop(
recipient_id = metadata.recipient_id
);
mempool_handle
.push(tx)
.push((TransactionOrigin::Sequencer, tx))
.await
.context("Mempool is closed while pushing Bedrock Deposit transaction")?;
}

View File

@ -8,7 +8,9 @@ use jsonrpsee::{
use log::warn;
use mempool::MemPoolHandle;
use nssa::{self, program::Program};
use sequencer_core::{DbError, SequencerCore, block_publisher::BlockPublisherTrait};
use sequencer_core::{
DbError, SequencerCore, TransactionOrigin, block_publisher::BlockPublisherTrait,
};
use sequencer_service_protocol::{
Account, AccountId, Block, BlockId, Commitment, HashType, MembershipProof, Nonce, ProgramId,
};
@ -18,14 +20,14 @@ const NOT_FOUND_ERROR_CODE: i32 = -31999;
pub struct SequencerService<BC: BlockPublisherTrait> {
sequencer: Arc<Mutex<SequencerCore<BC>>>,
mempool_handle: MemPoolHandle<NSSATransaction>,
mempool_handle: MemPoolHandle<(TransactionOrigin, NSSATransaction)>,
max_block_size: u64,
}
impl<BC: BlockPublisherTrait> SequencerService<BC> {
pub const fn new(
sequencer: Arc<Mutex<SequencerCore<BC>>>,
mempool_handle: MemPoolHandle<NSSATransaction>,
mempool_handle: MemPoolHandle<(TransactionOrigin, NSSATransaction)>,
max_block_size: u64,
) -> Self {
Self {
@ -72,7 +74,7 @@ impl<BC: BlockPublisherTrait + Send + 'static> sequencer_service_rpc::RpcServer
})?;
self.mempool_handle
.push(authenticated_tx)
.push((TransactionOrigin::User, authenticated_tx))
.await
.expect("Mempool is closed, this is a bug");

View File

@ -191,5 +191,9 @@ pub fn addr_to_url(protocol: UrlProtocol, addr: SocketAddr) -> Result<Url> {
#[must_use]
pub fn bedrock_channel_id() -> ChannelId {
ChannelId::from([0_u8; 32])
let channel_id: [u8; 32] = [0_u8, 1]
.repeat(16)
.try_into()
.unwrap_or_else(|_| unreachable!());
ChannelId::from(channel_id)
}