diff --git a/mempool/Cargo.toml b/mempool/Cargo.toml index c47d2b0..4601438 100644 --- a/mempool/Cargo.toml +++ b/mempool/Cargo.toml @@ -4,3 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] +tokio = { workspace = true, features = ["sync"] } + +[dev-dependencies] +tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } diff --git a/mempool/src/lib.rs b/mempool/src/lib.rs index ab1c5d3..8fb87dc 100644 --- a/mempool/src/lib.rs +++ b/mempool/src/lib.rs @@ -1,231 +1,99 @@ -use std::collections::VecDeque; +use tokio::sync::mpsc::{Receiver, Sender}; -pub struct MemPool { - items: VecDeque, +pub struct MemPool { + receiver: Receiver, } -impl MemPool { - pub fn new() -> Self { - Self { - items: VecDeque::new(), - } +impl MemPool { + pub fn new(max_size: usize) -> (Self, MemPoolHandle) { + let (sender, receiver) = tokio::sync::mpsc::channel(max_size); + + let mem_pool = Self { receiver }; + let sender = MemPoolHandle::new(sender); + (mem_pool, sender) } - pub fn pop_last(&mut self) -> Option { - self.items.pop_front() - } + pub fn pop(&mut self) -> Option { + use tokio::sync::mpsc::error::TryRecvError; - pub fn peek_last(&self) -> Option<&Item> { - self.items.front() - } - - pub fn push_item(&mut self, item: Item) { - self.items.push_back(item); - } - - pub fn len(&self) -> usize { - self.items.len() - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - pub fn pop_size(&mut self, size: usize) -> Vec { - let mut ret_vec = vec![]; - - for _ in 0..size { - let item = self.pop_last(); - - match item { - Some(item) => ret_vec.push(item), - None => break, + match self.receiver.try_recv() { + Ok(item) => Some(item), + Err(TryRecvError::Empty) => None, + Err(TryRecvError::Disconnected) => { + panic!("Mempool senders disconnected, cannot receive items, this is a bug") } } - - ret_vec - } - - pub fn drain_size(&mut self, remainder: usize) -> Vec { - self.pop_size(self.len().saturating_sub(remainder)) } } -impl Default for MemPool { - fn default() -> Self { - Self::new() +pub struct MemPoolHandle { + sender: Sender, +} + +impl MemPoolHandle { + fn new(sender: Sender) -> Self { + Self { sender } + } + + /// Send an item to the mempool blocking if max size is reached + pub async fn push(&self, item: T) -> Result<(), tokio::sync::mpsc::error::SendError> { + self.sender.send(item).await } } #[cfg(test)] mod tests { - use std::vec; - use super::*; - pub type ItemId = u64; + use tokio::test; - #[derive(Debug, PartialEq, Eq)] - pub struct TestItem { - id: ItemId, - } - - fn test_item_with_id(id: u64) -> TestItem { - TestItem { id } + #[test] + async fn test_mempool_new() { + let (mut pool, _handle): (MemPool, _) = MemPool::new(10); + assert_eq!(pool.pop(), None); } #[test] - fn test_create_empty_mempool() { - let _: MemPool = MemPool::new(); + async fn test_push_and_pop() { + let (mut pool, handle) = MemPool::new(10); + + handle.push(1).await.unwrap(); + + let item = pool.pop(); + assert_eq!(item, Some(1)); + assert_eq!(pool.pop(), None); } #[test] - fn test_mempool_new() { - let pool: MemPool = MemPool::new(); - assert!(pool.is_empty()); - assert_eq!(pool.len(), 0); + async fn test_multiple_push_pop() { + let (mut pool, handle) = MemPool::new(10); + + handle.push(1).await.unwrap(); + handle.push(2).await.unwrap(); + handle.push(3).await.unwrap(); + + assert_eq!(pool.pop(), Some(1)); + assert_eq!(pool.pop(), Some(2)); + assert_eq!(pool.pop(), Some(3)); + assert_eq!(pool.pop(), None); } #[test] - fn test_push_item() { - let mut pool = MemPool::new(); - pool.push_item(test_item_with_id(1)); - assert!(!pool.is_empty()); - assert_eq!(pool.len(), 1); + async fn test_pop_empty() { + let (mut pool, _handle): (MemPool, _) = MemPool::new(10); + assert_eq!(pool.pop(), None); } #[test] - fn test_pop_last() { - let mut pool = MemPool::new(); - pool.push_item(test_item_with_id(1)); - pool.push_item(test_item_with_id(2)); - let item = pool.pop_last(); - assert_eq!(item, Some(test_item_with_id(1))); - assert_eq!(pool.len(), 1); - } + async fn test_max_size() { + let (mut pool, handle) = MemPool::new(2); - #[test] - fn test_peek_last() { - let mut pool = MemPool::new(); - pool.push_item(test_item_with_id(1)); - pool.push_item(test_item_with_id(2)); - let item = pool.peek_last(); - assert_eq!(item, Some(&test_item_with_id(1))); - } + handle.push(1).await.unwrap(); + handle.push(2).await.unwrap(); - #[test] - fn test_pop_size() { - let mut pool = MemPool::new(); - pool.push_item(test_item_with_id(1)); - pool.push_item(test_item_with_id(2)); - pool.push_item(test_item_with_id(3)); - - let items = pool.pop_size(2); - assert_eq!(items, vec![test_item_with_id(1), test_item_with_id(2)]); - assert_eq!(pool.len(), 1); - } - - #[test] - fn test_drain_size() { - let mut pool = MemPool::new(); - pool.push_item(test_item_with_id(1)); - pool.push_item(test_item_with_id(2)); - pool.push_item(test_item_with_id(3)); - pool.push_item(test_item_with_id(4)); - - let items = pool.drain_size(2); - assert_eq!(items, vec![test_item_with_id(1), test_item_with_id(2)]); - assert_eq!(pool.len(), 2); - } - - #[test] - fn test_default() { - let pool: MemPool = MemPool::default(); - assert!(pool.is_empty()); - assert_eq!(pool.len(), 0); - } - - #[test] - fn test_is_empty() { - let mut pool = MemPool::new(); - assert!(pool.is_empty()); - pool.push_item(test_item_with_id(1)); - assert!(!pool.is_empty()); - } - - #[test] - fn test_push_pop() { - let mut mempool: MemPool = MemPool::new(); - - let items = vec![ - test_item_with_id(1), - test_item_with_id(2), - test_item_with_id(3), - ]; - - for item in items { - mempool.push_item(item); - } - assert_eq!(mempool.len(), 3); - - let item = mempool.pop_last(); - - assert_eq!(item, Some(TestItem { id: 1 })); - assert_eq!(mempool.len(), 2); - - let item = mempool.pop_last(); - - assert_eq!(item, Some(TestItem { id: 2 })); - assert_eq!(mempool.len(), 1); - - let item = mempool.pop_last(); - - assert_eq!(item, Some(TestItem { id: 3 })); - assert_eq!(mempool.len(), 0); - - let item = mempool.pop_last(); - - assert_eq!(item, None); - } - - #[test] - fn test_pop_many() { - let mut mempool: MemPool = MemPool::new(); - - let mut items = vec![]; - - for i in 1..11 { - items.push(test_item_with_id(i)); - } - - for item in items { - mempool.push_item(item); - } - - assert_eq!(mempool.len(), 10); - - let items1 = mempool.pop_size(4); - assert_eq!( - items1, - vec![ - test_item_with_id(1), - test_item_with_id(2), - test_item_with_id(3), - test_item_with_id(4) - ] - ); - assert_eq!(mempool.len(), 6); - - let items2 = mempool.drain_size(2); - assert_eq!( - items2, - vec![ - test_item_with_id(5), - test_item_with_id(6), - test_item_with_id(7), - test_item_with_id(8) - ] - ); - assert_eq!(mempool.len(), 2); + // This should block if buffer is full, but we'll use try_send in a real scenario + // For now, just verify we can pop items + assert_eq!(pool.pop(), Some(1)); + assert_eq!(pool.pop(), Some(2)); } } diff --git a/sequencer_core/Cargo.toml b/sequencer_core/Cargo.toml index 6e9979c..32c263c 100644 --- a/sequencer_core/Cargo.toml +++ b/sequencer_core/Cargo.toml @@ -28,3 +28,7 @@ path = "../nssa" [features] default = [] testnet = [] + +[dev-dependencies] +tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } +futures.workspace = true diff --git a/sequencer_core/src/block_store.rs b/sequencer_core/src/block_store.rs index 6fd9fb7..83a94b3 100644 --- a/sequencer_core/src/block_store.rs +++ b/sequencer_core/src/block_store.rs @@ -4,15 +4,15 @@ use anyhow::Result; use common::{HashType, block::Block, transaction::EncodedTransaction}; use storage::RocksDBIO; -pub struct SequecerBlockStore { +pub struct SequencerBlockStore { dbio: RocksDBIO, // TODO: Consider adding the hashmap to the database for faster recovery. - pub tx_hash_to_block_map: HashMap, - pub genesis_id: u64, - pub signing_key: nssa::PrivateKey, + tx_hash_to_block_map: HashMap, + genesis_id: u64, + signing_key: nssa::PrivateKey, } -impl SequecerBlockStore { +impl SequencerBlockStore { ///Starting database at the start of new chain. /// Creates files if necessary. /// @@ -42,7 +42,7 @@ impl SequecerBlockStore { ///Reopening existing database pub fn open_db_restart(location: &Path, signing_key: nssa::PrivateKey) -> Result { - SequecerBlockStore::open_db_with_genesis(location, None, signing_key) + SequencerBlockStore::open_db_with_genesis(location, None, signing_key) } pub fn get_block_at_id(&self, id: u64) -> Result { @@ -69,6 +69,18 @@ impl SequecerBlockStore { } None } + + pub fn insert(&mut self, tx: &EncodedTransaction, block_id: u64) { + self.tx_hash_to_block_map.insert(tx.hash(), block_id); + } + + pub fn genesis_id(&self) -> u64 { + self.genesis_id + } + + pub fn signing_key(&self) -> &nssa::PrivateKey { + &self.signing_key + } } pub(crate) fn block_to_transactions_map(block: &Block) -> HashMap { @@ -104,7 +116,7 @@ mod tests { let genesis_block = genesis_block_hashable_data.into_block(&signing_key); // Start an empty node store let mut node_store = - SequecerBlockStore::open_db_with_genesis(path, Some(genesis_block), signing_key) + SequencerBlockStore::open_db_with_genesis(path, Some(genesis_block), signing_key) .unwrap(); let tx = common::test_utils::produce_dummy_empty_transaction(); diff --git a/sequencer_core/src/config.rs b/sequencer_core/src/config.rs index a86bcb3..9927df1 100644 --- a/sequencer_core/src/config.rs +++ b/sequencer_core/src/config.rs @@ -16,6 +16,7 @@ pub struct CommitmentsInitialData { pub account: nssa_core::account::Account, } +// TODO: Provide default values #[derive(Clone, Serialize, Deserialize)] pub struct SequencerConfig { ///Home dir of sequencer storage diff --git a/sequencer_core/src/lib.rs b/sequencer_core/src/lib.rs index 479a830..4127fd0 100644 --- a/sequencer_core/src/lib.rs +++ b/sequencer_core/src/lib.rs @@ -10,25 +10,24 @@ use common::{ }; use config::SequencerConfig; use log::warn; -use mempool::MemPool; +use mempool::{MemPool, MemPoolHandle}; use serde::{Deserialize, Serialize}; -use crate::block_store::SequecerBlockStore; +use crate::block_store::SequencerBlockStore; pub mod block_store; pub mod config; pub struct SequencerCore { - pub state: nssa::V02State, - pub block_store: SequecerBlockStore, - pub mempool: MemPool, - pub sequencer_config: SequencerConfig, - pub chain_height: u64, + state: nssa::V02State, + block_store: SequencerBlockStore, + mempool: MemPool, + sequencer_config: SequencerConfig, + chain_height: u64, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum TransactionMalformationError { - MempoolFullForRound, InvalidSignature, FailedToDecode { tx: HashType }, } @@ -42,7 +41,8 @@ impl Display for TransactionMalformationError { impl std::error::Error for TransactionMalformationError {} impl SequencerCore { - pub fn start_from_config(config: SequencerConfig) -> Self { + /// Start Sequencer from configuration and construct transaction sender + pub fn start_from_config(config: SequencerConfig) -> (Self, MemPoolHandle) { let hashable_data = HashableBlockData { block_id: config.genesis_id, transactions: vec![], @@ -55,7 +55,7 @@ impl SequencerCore { //Sequencer should panic if unable to open db, //as fixing this issue may require actions non-native to program scope - let block_store = SequecerBlockStore::open_db_with_genesis( + let block_store = SequencerBlockStore::open_db_with_genesis( &config.home.join("rocksdb"), Some(genesis_block), signing_key, @@ -86,17 +86,18 @@ impl SequencerCore { #[cfg(feature = "testnet")] state.add_pinata_program(PINATA_BASE58.parse().unwrap()); + let (mempool, mempool_handle) = MemPool::new(config.mempool_max_size); let mut this = Self { state, block_store, - mempool: MemPool::default(), + mempool, chain_height: config.genesis_id, sequencer_config: config, }; this.sync_state_with_stored_blocks(); - this + (this, mempool_handle) } /// If there are stored blocks ahead of the current height, this method will load and process all transaction @@ -110,108 +111,50 @@ impl SequencerCore { self.execute_check_transaction_on_state(transaction) .unwrap(); // Update the tx hash to block id map. - self.block_store - .tx_hash_to_block_map - .insert(encoded_transaction.hash(), next_block_id); + self.block_store.insert(&encoded_transaction, next_block_id); } self.chain_height = next_block_id; next_block_id += 1; } } - pub fn transaction_pre_check( - &mut self, - tx: NSSATransaction, - ) -> Result { - // Stateless checks here - match tx { - NSSATransaction::Public(tx) => { - if tx.witness_set().is_valid_for(tx.message()) { - Ok(NSSATransaction::Public(tx)) - } else { - Err(TransactionMalformationError::InvalidSignature) - } - } - NSSATransaction::PrivacyPreserving(tx) => { - if tx.witness_set().signatures_are_valid_for(tx.message()) { - Ok(NSSATransaction::PrivacyPreserving(tx)) - } else { - Err(TransactionMalformationError::InvalidSignature) - } - } - NSSATransaction::ProgramDeployment(tx) => Ok(NSSATransaction::ProgramDeployment(tx)), - } - } - - pub fn push_tx_into_mempool_pre_check( - &mut self, - transaction: EncodedTransaction, - ) -> Result<(), TransactionMalformationError> { - let transaction = NSSATransaction::try_from(&transaction).map_err(|_| { - TransactionMalformationError::FailedToDecode { - tx: transaction.hash(), - } - })?; - - let mempool_size = self.mempool.len(); - if mempool_size >= self.sequencer_config.mempool_max_size { - return Err(TransactionMalformationError::MempoolFullForRound); - } - - let authenticated_tx = self - .transaction_pre_check(transaction) - .inspect_err(|err| warn!("Error at pre_check {err:#?}"))?; - - self.mempool.push_item(authenticated_tx.into()); - - Ok(()) - } - fn execute_check_transaction_on_state( &mut self, tx: NSSATransaction, ) -> Result { match &tx { - NSSATransaction::Public(tx) => { - self.state - .transition_from_public_transaction(tx) - .inspect_err(|err| warn!("Error at transition {err:#?}"))?; - } - NSSATransaction::PrivacyPreserving(tx) => { - self.state - .transition_from_privacy_preserving_transaction(tx) - .inspect_err(|err| warn!("Error at transition {err:#?}"))?; - } - NSSATransaction::ProgramDeployment(tx) => { - self.state - .transition_from_program_deployment_transaction(tx) - .inspect_err(|err| warn!("Error at transition {err:#?}"))?; - } + NSSATransaction::Public(tx) => self.state.transition_from_public_transaction(tx), + NSSATransaction::PrivacyPreserving(tx) => self + .state + .transition_from_privacy_preserving_transaction(tx), + NSSATransaction::ProgramDeployment(tx) => self + .state + .transition_from_program_deployment_transaction(tx), } + .inspect_err(|err| warn!("Error at transition {err:#?}"))?; Ok(tx) } - ///Produces new block from transactions in mempool + /// Produces new block from transactions in mempool pub fn produce_new_block_with_mempool_transactions(&mut self) -> Result { let now = Instant::now(); let new_block_height = self.chain_height + 1; - let mut num_valid_transactions_in_block = 0; let mut valid_transactions = vec![]; - while let Some(tx) = self.mempool.pop_last() { + while let Some(tx) = self.mempool.pop() { let nssa_transaction = NSSATransaction::try_from(&tx) .map_err(|_| TransactionMalformationError::FailedToDecode { tx: tx.hash() })?; if let Ok(valid_tx) = self.execute_check_transaction_on_state(nssa_transaction) { valid_transactions.push(valid_tx.into()); - num_valid_transactions_in_block += 1; - - if num_valid_transactions_in_block >= self.sequencer_config.max_num_tx_in_block { + if valid_transactions.len() >= self.sequencer_config.max_num_tx_in_block { break; } + } else { + // Probably need to handle unsuccessful transaction execution? } } @@ -232,12 +175,17 @@ impl SequencerCore { timestamp: curr_time, }; - let block = hashable_data.into_block(&self.block_store.signing_key); + let block = hashable_data.into_block(self.block_store.signing_key()); self.block_store.put_block_at_id(block)?; self.chain_height = new_block_height; + // TODO: Consider switching to `tracing` crate to have more structured and consistent logs e.g. + // + // ``` + // info!(num_txs = num_txs_in_block, time = now.elapsed(), "Created block"); + // ``` log::info!( "Created block with {} transactions in {} seconds", num_txs_in_block, @@ -246,10 +194,52 @@ impl SequencerCore { Ok(self.chain_height) } + + pub fn state(&self) -> &nssa::V02State { + &self.state + } + + pub fn block_store(&self) -> &SequencerBlockStore { + &self.block_store + } + + pub fn chain_height(&self) -> u64 { + self.chain_height + } + + pub fn sequencer_config(&self) -> &SequencerConfig { + &self.sequencer_config + } +} + +// TODO: Introduce type-safe wrapper around checked transaction, e.g. AuthenticatedTransaction +pub fn transaction_pre_check( + tx: NSSATransaction, +) -> Result { + // Stateless checks here + match tx { + NSSATransaction::Public(tx) => { + if tx.witness_set().is_valid_for(tx.message()) { + Ok(NSSATransaction::Public(tx)) + } else { + Err(TransactionMalformationError::InvalidSignature) + } + } + NSSATransaction::PrivacyPreserving(tx) => { + if tx.witness_set().signatures_are_valid_for(tx.message()) { + Ok(NSSATransaction::PrivacyPreserving(tx)) + } else { + Err(TransactionMalformationError::InvalidSignature) + } + } + NSSATransaction::ProgramDeployment(tx) => Ok(NSSATransaction::ProgramDeployment(tx)), + } } #[cfg(test)] mod tests { + use std::pin::pin; + use base58::{FromBase58, ToBase58}; use common::test_utils::sequencer_sign_key_for_testing; use nssa::PrivateKey; @@ -319,19 +309,30 @@ mod tests { nssa::PrivateKey::try_new([2; 32]).unwrap() } - fn common_setup(sequencer: &mut SequencerCore) { + async fn common_setup() -> (SequencerCore, MemPoolHandle) { + let config = setup_sequencer_config(); + common_setup_with_config(config).await + } + + async fn common_setup_with_config( + config: SequencerConfig, + ) -> (SequencerCore, MemPoolHandle) { + let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config); + let tx = common::test_utils::produce_dummy_empty_transaction(); - sequencer.mempool.push_item(tx); + mempool_handle.push(tx).await.unwrap(); sequencer .produce_new_block_with_mempool_transactions() .unwrap(); + + (sequencer, mempool_handle) } #[test] fn test_start_from_config() { let config = setup_sequencer_config(); - let sequencer = SequencerCore::start_from_config(config.clone()); + let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()); assert_eq!(sequencer.chain_height, config.genesis_id); assert_eq!(sequencer.sequencer_config.max_num_tx_in_block, 10); @@ -390,7 +391,7 @@ mod tests { let initial_accounts = vec![initial_acc1, initial_acc2]; let config = setup_sequencer_config_variable_initial_accounts(initial_accounts); - let sequencer = SequencerCore::start_from_config(config.clone()); + let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()); let acc1_addr = config.initial_accounts[0] .addr @@ -425,23 +426,15 @@ mod tests { #[test] fn test_transaction_pre_check_pass() { - let config = setup_sequencer_config(); - let mut sequencer = SequencerCore::start_from_config(config); - - common_setup(&mut sequencer); - let tx = common::test_utils::produce_dummy_empty_transaction(); - let result = sequencer.transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx)); + let result = transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx)); assert!(result.is_ok()); } - #[test] - fn test_transaction_pre_check_native_transfer_valid() { - let config = setup_sequencer_config(); - let mut sequencer = SequencerCore::start_from_config(config); - - common_setup(&mut sequencer); + #[tokio::test] + async fn test_transaction_pre_check_native_transfer_valid() { + let (sequencer, _mempool_handle) = common_setup().await; let acc1 = sequencer.sequencer_config.initial_accounts[0] .addr @@ -463,17 +456,14 @@ mod tests { let tx = common::test_utils::create_transaction_native_token_transfer( acc1, 0, acc2, 10, sign_key1, ); - let result = sequencer.transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx)); + let result = transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx)); assert!(result.is_ok()); } - #[test] - fn test_transaction_pre_check_native_transfer_other_signature() { - let config = setup_sequencer_config(); - let mut sequencer = SequencerCore::start_from_config(config); - - common_setup(&mut sequencer); + #[tokio::test] + async fn test_transaction_pre_check_native_transfer_other_signature() { + let (mut sequencer, _mempool_handle) = common_setup().await; let acc1 = sequencer.sequencer_config.initial_accounts[0] .addr @@ -497,9 +487,7 @@ mod tests { ); // Signature is valid, stateless check pass - let tx = sequencer - .transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx)) - .unwrap(); + let tx = transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx)).unwrap(); // Signature is not from sender. Execution fails let result = sequencer.execute_check_transaction_on_state(tx); @@ -510,12 +498,9 @@ mod tests { )); } - #[test] - fn test_transaction_pre_check_native_transfer_sent_too_much() { - let config = setup_sequencer_config(); - let mut sequencer = SequencerCore::start_from_config(config); - - common_setup(&mut sequencer); + #[tokio::test] + async fn test_transaction_pre_check_native_transfer_sent_too_much() { + let (mut sequencer, _mempool_handle) = common_setup().await; let acc1 = sequencer.sequencer_config.initial_accounts[0] .addr @@ -538,9 +523,9 @@ mod tests { acc1, 0, acc2, 10000000, sign_key1, ); - let result = sequencer.transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx)); + let result = transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx)); - //Passed pre-check + // Passed pre-check assert!(result.is_ok()); let result = sequencer.execute_check_transaction_on_state(result.unwrap()); @@ -552,12 +537,9 @@ mod tests { assert!(is_failed_at_balance_mismatch); } - #[test] - fn test_transaction_execute_native_transfer() { - let config = setup_sequencer_config(); - let mut sequencer = SequencerCore::start_from_config(config); - - common_setup(&mut sequencer); + #[tokio::test] + async fn test_transaction_execute_native_transfer() { + let (mut sequencer, _mempool_handle) = common_setup().await; let acc1 = sequencer.sequencer_config.initial_accounts[0] .addr @@ -597,63 +579,49 @@ mod tests { assert_eq!(bal_to, 20100); } - #[test] - fn test_push_tx_into_mempool_fails_mempool_full() { + #[tokio::test] + async fn test_push_tx_into_mempool_blocks_until_mempool_is_full() { let config = SequencerConfig { mempool_max_size: 1, ..setup_sequencer_config() }; - let mut sequencer = SequencerCore::start_from_config(config); - - common_setup(&mut sequencer); + let (mut sequencer, mempool_handle) = common_setup_with_config(config).await; let tx = common::test_utils::produce_dummy_empty_transaction(); // Fill the mempool - sequencer.mempool.push_item(tx.clone()); + mempool_handle.push(tx.clone()).await.unwrap(); - let result = sequencer.push_tx_into_mempool_pre_check(tx); + // Check that pushing another transaction will block + let mut push_fut = pin!(mempool_handle.push(tx.clone())); + let poll = futures::poll!(push_fut.as_mut()); + assert!(poll.is_pending()); - assert!(matches!( - result, - Err(TransactionMalformationError::MempoolFullForRound) - )); + // Empty the mempool by producing a block + sequencer + .produce_new_block_with_mempool_transactions() + .unwrap(); + + // Resolve the pending push + assert!(push_fut.await.is_ok()); } - #[test] - fn test_push_tx_into_mempool_pre_check() { - let config = setup_sequencer_config(); - let mut sequencer = SequencerCore::start_from_config(config); - - common_setup(&mut sequencer); - - let tx = common::test_utils::produce_dummy_empty_transaction(); - - let result = sequencer.push_tx_into_mempool_pre_check(tx); - assert!(result.is_ok()); - assert_eq!(sequencer.mempool.len(), 1); - } - - #[test] - fn test_produce_new_block_with_mempool_transactions() { - let config = setup_sequencer_config(); - let mut sequencer = SequencerCore::start_from_config(config); + #[tokio::test] + async fn test_produce_new_block_with_mempool_transactions() { + let (mut sequencer, mempool_handle) = common_setup().await; let genesis_height = sequencer.chain_height; let tx = common::test_utils::produce_dummy_empty_transaction(); - sequencer.mempool.push_item(tx); + mempool_handle.push(tx).await.unwrap(); let block_id = sequencer.produce_new_block_with_mempool_transactions(); assert!(block_id.is_ok()); assert_eq!(block_id.unwrap(), genesis_height + 1); } - #[test] - fn test_replay_transactions_are_rejected_in_the_same_block() { - let config = setup_sequencer_config(); - let mut sequencer = SequencerCore::start_from_config(config); - - common_setup(&mut sequencer); + #[tokio::test] + async fn test_replay_transactions_are_rejected_in_the_same_block() { + let (mut sequencer, mempool_handle) = common_setup().await; let acc1 = sequencer.sequencer_config.initial_accounts[0] .addr @@ -679,8 +647,8 @@ mod tests { let tx_original = tx.clone(); let tx_replay = tx.clone(); // Pushing two copies of the same tx to the mempool - sequencer.mempool.push_item(tx_original); - sequencer.mempool.push_item(tx_replay); + mempool_handle.push(tx_original).await.unwrap(); + mempool_handle.push(tx_replay).await.unwrap(); // Create block let current_height = sequencer @@ -695,12 +663,9 @@ mod tests { assert_eq!(block.body.transactions, vec![tx.clone()]); } - #[test] - fn test_replay_transactions_are_rejected_in_different_blocks() { - let config = setup_sequencer_config(); - let mut sequencer = SequencerCore::start_from_config(config); - - common_setup(&mut sequencer); + #[tokio::test] + async fn test_replay_transactions_are_rejected_in_different_blocks() { + let (mut sequencer, mempool_handle) = common_setup().await; let acc1 = sequencer.sequencer_config.initial_accounts[0] .addr @@ -724,7 +689,7 @@ mod tests { ); // The transaction should be included the first time - sequencer.mempool.push_item(tx.clone()); + mempool_handle.push(tx.clone()).await.unwrap(); let current_height = sequencer .produce_new_block_with_mempool_transactions() .unwrap(); @@ -735,7 +700,7 @@ mod tests { assert_eq!(block.body.transactions, vec![tx.clone()]); // Add same transaction should fail - sequencer.mempool.push_item(tx); + mempool_handle.push(tx.clone()).await.unwrap(); let current_height = sequencer .produce_new_block_with_mempool_transactions() .unwrap(); @@ -746,8 +711,8 @@ mod tests { assert!(block.body.transactions.is_empty()); } - #[test] - fn test_restart_from_storage() { + #[tokio::test] + async fn test_restart_from_storage() { let config = setup_sequencer_config(); let acc1_addr: nssa::Address = config.initial_accounts[0].addr.parse().unwrap(); let acc2_addr: nssa::Address = config.initial_accounts[1].addr.parse().unwrap(); @@ -757,7 +722,7 @@ mod tests { // from `acc_1` to `acc_2`. The block created with that transaction will be kept stored in // the temporary directory for the block storage of this test. { - let mut sequencer = SequencerCore::start_from_config(config.clone()); + let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config.clone()); let signing_key = PrivateKey::try_new([1; 32]).unwrap(); let tx = common::test_utils::create_transaction_native_token_transfer( @@ -768,7 +733,7 @@ mod tests { signing_key, ); - sequencer.mempool.push_item(tx.clone()); + mempool_handle.push(tx.clone()).await.unwrap(); let current_height = sequencer .produce_new_block_with_mempool_transactions() .unwrap(); @@ -781,7 +746,7 @@ mod tests { // Instantiating a new sequencer from the same config. This should load the existing block // with the above transaction and update the state to reflect that. - let sequencer = SequencerCore::start_from_config(config.clone()); + let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()); let balance_acc_1 = sequencer.state.get_account_by_address(&acc1_addr).balance; let balance_acc_2 = sequencer.state.get_account_by_address(&acc2_addr).balance; diff --git a/sequencer_rpc/Cargo.toml b/sequencer_rpc/Cargo.toml index 557ce6a..242e8b2 100644 --- a/sequencer_rpc/Cargo.toml +++ b/sequencer_rpc/Cargo.toml @@ -19,6 +19,8 @@ actix-web.workspace = true tokio.workspace = true borsh.workspace = true +# TODO: Move to workspace + [dependencies.sequencer_core] path = "../sequencer_core" @@ -27,3 +29,6 @@ path = "../common" [dependencies.nssa] path = "../nssa" + +[dependencies.mempool] +path = "../mempool" diff --git a/sequencer_rpc/src/lib.rs b/sequencer_rpc/src/lib.rs index 2973c42..02d62f2 100644 --- a/sequencer_rpc/src/lib.rs +++ b/sequencer_rpc/src/lib.rs @@ -4,10 +4,14 @@ pub mod types; use std::sync::Arc; -use common::rpc_primitives::{ - RpcPollingConfig, - errors::{RpcError, RpcErrorKind}, +use common::{ + rpc_primitives::{ + RpcPollingConfig, + errors::{RpcError, RpcErrorKind}, + }, + transaction::EncodedTransaction, }; +use mempool::MemPoolHandle; use sequencer_core::SequencerCore; use serde::Serialize; use serde_json::Value; @@ -19,8 +23,13 @@ use self::types::err_rpc::RpcErr; //ToDo: Add necessary fields pub struct JsonHandler { - pub polling_config: RpcPollingConfig, - pub sequencer_state: Arc>, + #[expect( + dead_code, + reason = "Decided to keep it just in case, should we remove it?" + )] + polling_config: RpcPollingConfig, + sequencer_state: Arc>, + mempool_handle: MemPoolHandle, } fn respond(val: T) -> Result { diff --git a/sequencer_rpc/src/net_utils.rs b/sequencer_rpc/src/net_utils.rs index b373823..9e41fc6 100644 --- a/sequencer_rpc/src/net_utils.rs +++ b/sequencer_rpc/src/net_utils.rs @@ -3,12 +3,14 @@ use std::sync::Arc; use actix_cors::Cors; use actix_web::{App, Error as HttpError, HttpResponse, HttpServer, http, middleware, web}; +use common::transaction::EncodedTransaction; use futures::Future; use futures::FutureExt; use log::info; use common::rpc_primitives::RpcConfig; use common::rpc_primitives::message::Message; +use mempool::MemPoolHandle; use sequencer_core::SequencerCore; use tokio::sync::Mutex; @@ -46,6 +48,7 @@ fn get_cors(cors_allowed_origins: &[String]) -> Cors { pub fn new_http_server( config: RpcConfig, seuquencer_core: Arc>, + mempool_handle: MemPoolHandle, ) -> io::Result { let RpcConfig { addr, @@ -57,6 +60,7 @@ pub fn new_http_server( let handler = web::Data::new(JsonHandler { polling_config, sequencer_state: seuquencer_core.clone(), + mempool_handle, }); // HTTP server diff --git a/sequencer_rpc/src/process.rs b/sequencer_rpc/src/process.rs index 5e27bdb..7f7146a 100644 --- a/sequencer_rpc/src/process.rs +++ b/sequencer_rpc/src/process.rs @@ -3,8 +3,9 @@ use std::collections::HashMap; use actix_web::Error as HttpError; use base58::FromBase58; use base64::{Engine, engine::general_purpose}; +use log::warn; use nssa::{self, program::Program}; -use sequencer_core::config::AccountInitialData; +use sequencer_core::{TransactionMalformationError, config::AccountInitialData}; use serde_json::Value; use common::{ @@ -22,7 +23,7 @@ use common::{ GetTransactionByHashRequest, GetTransactionByHashResponse, }, }, - transaction::EncodedTransaction, + transaction::{EncodedTransaction, NSSATransaction}, }; use common::rpc_primitives::requests::{ @@ -84,11 +85,17 @@ impl JsonHandler { let tx = borsh::from_slice::(&send_tx_req.transaction).unwrap(); let tx_hash = hex::encode(tx.hash()); - { - let mut state = self.sequencer_state.lock().await; + let transaction = NSSATransaction::try_from(&tx) + .map_err(|_| TransactionMalformationError::FailedToDecode { tx: tx.hash() })?; - state.push_tx_into_mempool_pre_check(tx)?; - } + let authenticated_tx = sequencer_core::transaction_pre_check(transaction) + .inspect_err(|err| warn!("Error at pre_check {err:#?}"))?; + + // TODO: Do we need a timeout here? It will be usable if we have too many transactions to process + self.mempool_handle + .push(authenticated_tx.into()) + .await + .expect("Mempool is closed, this is a bug"); let response = SendTxResponse { status: TRANSACTION_SUBMITTED.to_string(), @@ -104,7 +111,9 @@ impl JsonHandler { let block = { let state = self.sequencer_state.lock().await; - state.block_store.get_block_at_id(get_block_req.block_id)? + state + .block_store() + .get_block_at_id(get_block_req.block_id)? }; let response = GetBlockDataResponse { @@ -120,7 +129,7 @@ impl JsonHandler { let genesis_id = { let state = self.sequencer_state.lock().await; - state.block_store.genesis_id + state.block_store().genesis_id() }; let response = GetGenesisIdResponse { genesis_id }; @@ -134,7 +143,7 @@ impl JsonHandler { let last_block = { let state = self.sequencer_state.lock().await; - state.chain_height + state.chain_height() }; let response = GetLastBlockResponse { last_block }; @@ -151,7 +160,7 @@ impl JsonHandler { let initial_accounts: Vec = { let state = self.sequencer_state.lock().await; - state.sequencer_config.initial_accounts.clone() + state.sequencer_config().initial_accounts.clone() }; respond(initial_accounts) @@ -173,7 +182,7 @@ impl JsonHandler { let balance = { let state = self.sequencer_state.lock().await; - let account = state.state.get_account_by_address(&address); + let account = state.state().get_account_by_address(&address); account.balance }; @@ -200,7 +209,7 @@ impl JsonHandler { addresses .into_iter() - .map(|addr| state.state.get_account_by_address(&addr).nonce) + .map(|addr| state.state().get_account_by_address(&addr).nonce) .collect() }; @@ -222,7 +231,7 @@ impl JsonHandler { let account = { let state = self.sequencer_state.lock().await; - state.state.get_account_by_address(&address) + state.state().get_account_by_address(&address) }; let response = GetAccountResponse { account }; @@ -243,7 +252,7 @@ impl JsonHandler { let transaction = { let state = self.sequencer_state.lock().await; state - .block_store + .block_store() .get_transaction_by_hash(hash) .map(|tx| borsh::to_vec(&tx).unwrap()) }; @@ -261,7 +270,7 @@ impl JsonHandler { let membership_proof = { let state = self.sequencer_state.lock().await; state - .state + .state() .get_proof_for_commitment(&get_proof_req.commitment) }; let response = GetProofForCommitmentResponse { membership_proof }; @@ -365,10 +374,10 @@ mod tests { } } - fn components_for_tests() -> (JsonHandler, Vec, EncodedTransaction) { + async fn components_for_tests() -> (JsonHandler, Vec, EncodedTransaction) { let config = sequencer_config_for_tests(); - let mut sequencer_core = SequencerCore::start_from_config(config); - let initial_accounts = sequencer_core.sequencer_config.initial_accounts.clone(); + let (mut sequencer_core, mempool_handle) = SequencerCore::start_from_config(config); + let initial_accounts = sequencer_core.sequencer_config().initial_accounts.clone(); let signing_key = nssa::PrivateKey::try_new([1; 32]).unwrap(); let balance_to_move = 10; @@ -383,9 +392,10 @@ mod tests { signing_key, ); - sequencer_core - .push_tx_into_mempool_pre_check(tx.clone()) - .unwrap(); + mempool_handle + .push(tx.clone()) + .await + .expect("Mempool is closed, this is a bug"); sequencer_core .produce_new_block_with_mempool_transactions() @@ -397,6 +407,7 @@ mod tests { JsonHandler { polling_config: RpcPollingConfig::default(), sequencer_state: sequencer_core, + mempool_handle, }, initial_accounts, tx, @@ -426,7 +437,7 @@ mod tests { #[actix_web::test] async fn test_get_account_balance_for_non_existent_account() { - let (json_handler, _, _) = components_for_tests(); + let (json_handler, _, _) = components_for_tests().await; let request = serde_json::json!({ "jsonrpc": "2.0", "method": "get_account_balance", @@ -448,7 +459,7 @@ mod tests { #[actix_web::test] async fn test_get_account_balance_for_invalid_base58() { - let (json_handler, _, _) = components_for_tests(); + let (json_handler, _, _) = components_for_tests().await; let request = serde_json::json!({ "jsonrpc": "2.0", "method": "get_account_balance", @@ -471,7 +482,7 @@ mod tests { #[actix_web::test] async fn test_get_account_balance_for_invalid_length() { - let (json_handler, _, _) = components_for_tests(); + let (json_handler, _, _) = components_for_tests().await; let request = serde_json::json!({ "jsonrpc": "2.0", "method": "get_account_balance", @@ -494,7 +505,7 @@ mod tests { #[actix_web::test] async fn test_get_account_balance_for_existing_account() { - let (json_handler, initial_accounts, _) = components_for_tests(); + let (json_handler, initial_accounts, _) = components_for_tests().await; let acc1_addr = initial_accounts[0].addr.clone(); @@ -519,7 +530,7 @@ mod tests { #[actix_web::test] async fn test_get_accounts_nonces_for_non_existent_account() { - let (json_handler, _, _) = components_for_tests(); + let (json_handler, _, _) = components_for_tests().await; let request = serde_json::json!({ "jsonrpc": "2.0", "method": "get_accounts_nonces", @@ -541,7 +552,7 @@ mod tests { #[actix_web::test] async fn test_get_accounts_nonces_for_existent_account() { - let (json_handler, initial_accounts, _) = components_for_tests(); + let (json_handler, initial_accounts, _) = components_for_tests().await; let acc_1_addr = initial_accounts[0].addr.clone(); let acc_2_addr = initial_accounts[1].addr.clone(); @@ -567,7 +578,7 @@ mod tests { #[actix_web::test] async fn test_get_account_data_for_non_existent_account() { - let (json_handler, _, _) = components_for_tests(); + let (json_handler, _, _) = components_for_tests().await; let request = serde_json::json!({ "jsonrpc": "2.0", "method": "get_account", @@ -594,7 +605,7 @@ mod tests { #[actix_web::test] async fn test_get_transaction_by_hash_for_non_existent_hash() { - let (json_handler, _, _) = components_for_tests(); + let (json_handler, _, _) = components_for_tests().await; let request = serde_json::json!({ "jsonrpc": "2.0", "method": "get_transaction_by_hash", @@ -616,7 +627,7 @@ mod tests { #[actix_web::test] async fn test_get_transaction_by_hash_for_invalid_hex() { - let (json_handler, _, _) = components_for_tests(); + let (json_handler, _, _) = components_for_tests().await; let request = serde_json::json!({ "jsonrpc": "2.0", "method": "get_transaction_by_hash", @@ -640,7 +651,7 @@ mod tests { #[actix_web::test] async fn test_get_transaction_by_hash_for_invalid_length() { - let (json_handler, _, _) = components_for_tests(); + let (json_handler, _, _) = components_for_tests().await; let request = serde_json::json!({ "jsonrpc": "2.0", "method": "get_transaction_by_hash", @@ -664,7 +675,7 @@ mod tests { #[actix_web::test] async fn test_get_transaction_by_hash_for_existing_transaction() { - let (json_handler, _, tx) = components_for_tests(); + let (json_handler, _, tx) = components_for_tests().await; let tx_hash_hex = hex::encode(tx.hash()); let expected_base64_encoded = general_purpose::STANDARD.encode(borsh::to_vec(&tx).unwrap()); diff --git a/sequencer_runner/src/lib.rs b/sequencer_runner/src/lib.rs index 068d3d7..212066d 100644 --- a/sequencer_runner/src/lib.rs +++ b/sequencer_runner/src/lib.rs @@ -26,13 +26,17 @@ pub async fn startup_sequencer( let block_timeout = app_config.block_create_timeout_millis; let port = app_config.port; - let sequencer_core = SequencerCore::start_from_config(app_config); + let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config); info!("Sequencer core set up"); let seq_core_wrapped = Arc::new(Mutex::new(sequencer_core)); - let http_server = new_http_server(RpcConfig::with_port(port), seq_core_wrapped.clone())?; + let http_server = new_http_server( + RpcConfig::with_port(port), + Arc::clone(&seq_core_wrapped), + mempool_handle, + )?; info!("HTTP server started"); let http_server_handle = http_server.handle(); tokio::spawn(http_server);