Merge pull request #167 from vacp2p/iss145-fix-mempool-locks

Fix locking on high transaction amount, refactor sequencer & mempool
This commit is contained in:
Daniil Polyakov 2025-11-21 13:22:10 +03:00 committed by GitHub
commit 3355d23218
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 329 additions and 468 deletions

View File

@ -1,5 +1,3 @@
use std::time::Duration;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
pub mod errors; pub mod errors;
@ -7,21 +5,6 @@ pub mod message;
pub mod parser; pub mod parser;
pub mod requests; pub mod requests;
#[derive(Serialize, Deserialize, Clone, Copy, Debug)]
pub struct RpcPollingConfig {
pub polling_interval: Duration,
pub polling_timeout: Duration,
}
impl Default for RpcPollingConfig {
fn default() -> Self {
Self {
polling_interval: Duration::from_millis(500),
polling_timeout: Duration::from_secs(10),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
pub struct RpcLimitsConfig { pub struct RpcLimitsConfig {
/// Maximum byte size of the json payload. /// Maximum byte size of the json payload.
@ -40,7 +23,6 @@ impl Default for RpcLimitsConfig {
pub struct RpcConfig { pub struct RpcConfig {
pub addr: String, pub addr: String,
pub cors_allowed_origins: Vec<String>, pub cors_allowed_origins: Vec<String>,
pub polling_config: RpcPollingConfig,
#[serde(default)] #[serde(default)]
pub limits_config: RpcLimitsConfig, pub limits_config: RpcLimitsConfig,
} }
@ -50,7 +32,6 @@ impl Default for RpcConfig {
RpcConfig { RpcConfig {
addr: "0.0.0.0:3040".to_owned(), addr: "0.0.0.0:3040".to_owned(),
cors_allowed_origins: vec!["*".to_owned()], cors_allowed_origins: vec!["*".to_owned()],
polling_config: RpcPollingConfig::default(),
limits_config: RpcLimitsConfig::default(), limits_config: RpcLimitsConfig::default(),
} }
} }

View File

@ -1664,7 +1664,7 @@ pub async fn tps_test() {
for (i, tx_hash) in tx_hashes.iter().enumerate() { for (i, tx_hash) in tx_hashes.iter().enumerate() {
loop { loop {
if now.elapsed().as_millis() > target_time.as_millis() { if now.elapsed().as_millis() > target_time.as_millis() {
panic!("TPS test failed by timout"); panic!("TPS test failed by timeout");
} }
let tx_obj = seq_client let tx_obj = seq_client

View File

@ -4,3 +4,7 @@ version = "0.1.0"
edition = "2024" edition = "2024"
[dependencies] [dependencies]
tokio = { workspace = true, features = ["sync"] }
[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }

View File

@ -1,231 +1,99 @@
use std::collections::VecDeque; use tokio::sync::mpsc::{Receiver, Sender};
pub struct MemPool<Item> { pub struct MemPool<T> {
items: VecDeque<Item>, receiver: Receiver<T>,
} }
impl<Item> MemPool<Item> { impl<T> MemPool<T> {
pub fn new() -> Self { pub fn new(max_size: usize) -> (Self, MemPoolHandle<T>) {
Self { let (sender, receiver) = tokio::sync::mpsc::channel(max_size);
items: VecDeque::new(),
} let mem_pool = Self { receiver };
let sender = MemPoolHandle::new(sender);
(mem_pool, sender)
} }
pub fn pop_last(&mut self) -> Option<Item> { pub fn pop(&mut self) -> Option<T> {
self.items.pop_front() use tokio::sync::mpsc::error::TryRecvError;
}
pub fn peek_last(&self) -> Option<&Item> { match self.receiver.try_recv() {
self.items.front() Ok(item) => Some(item),
} Err(TryRecvError::Empty) => None,
Err(TryRecvError::Disconnected) => {
pub fn push_item(&mut self, item: Item) { panic!("Mempool senders disconnected, cannot receive items, this is a bug")
self.items.push_back(item);
}
pub fn len(&self) -> usize {
self.items.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn pop_size(&mut self, size: usize) -> Vec<Item> {
let mut ret_vec = vec![];
for _ in 0..size {
let item = self.pop_last();
match item {
Some(item) => ret_vec.push(item),
None => break,
} }
} }
ret_vec
}
pub fn drain_size(&mut self, remainder: usize) -> Vec<Item> {
self.pop_size(self.len().saturating_sub(remainder))
} }
} }
impl<Item> Default for MemPool<Item> { pub struct MemPoolHandle<T> {
fn default() -> Self { sender: Sender<T>,
Self::new() }
impl<T> MemPoolHandle<T> {
fn new(sender: Sender<T>) -> Self {
Self { sender }
}
/// Send an item to the mempool blocking if max size is reached
pub async fn push(&self, item: T) -> Result<(), tokio::sync::mpsc::error::SendError<T>> {
self.sender.send(item).await
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::vec;
use super::*; use super::*;
pub type ItemId = u64; use tokio::test;
#[derive(Debug, PartialEq, Eq)] #[test]
pub struct TestItem { async fn test_mempool_new() {
id: ItemId, let (mut pool, _handle): (MemPool<u64>, _) = MemPool::new(10);
} assert_eq!(pool.pop(), None);
fn test_item_with_id(id: u64) -> TestItem {
TestItem { id }
} }
#[test] #[test]
fn test_create_empty_mempool() { async fn test_push_and_pop() {
let _: MemPool<TestItem> = MemPool::new(); let (mut pool, handle) = MemPool::new(10);
handle.push(1).await.unwrap();
let item = pool.pop();
assert_eq!(item, Some(1));
assert_eq!(pool.pop(), None);
} }
#[test] #[test]
fn test_mempool_new() { async fn test_multiple_push_pop() {
let pool: MemPool<TestItem> = MemPool::new(); let (mut pool, handle) = MemPool::new(10);
assert!(pool.is_empty());
assert_eq!(pool.len(), 0); handle.push(1).await.unwrap();
handle.push(2).await.unwrap();
handle.push(3).await.unwrap();
assert_eq!(pool.pop(), Some(1));
assert_eq!(pool.pop(), Some(2));
assert_eq!(pool.pop(), Some(3));
assert_eq!(pool.pop(), None);
} }
#[test] #[test]
fn test_push_item() { async fn test_pop_empty() {
let mut pool = MemPool::new(); let (mut pool, _handle): (MemPool<u64>, _) = MemPool::new(10);
pool.push_item(test_item_with_id(1)); assert_eq!(pool.pop(), None);
assert!(!pool.is_empty());
assert_eq!(pool.len(), 1);
} }
#[test] #[test]
fn test_pop_last() { async fn test_max_size() {
let mut pool = MemPool::new(); let (mut pool, handle) = MemPool::new(2);
pool.push_item(test_item_with_id(1));
pool.push_item(test_item_with_id(2));
let item = pool.pop_last();
assert_eq!(item, Some(test_item_with_id(1)));
assert_eq!(pool.len(), 1);
}
#[test] handle.push(1).await.unwrap();
fn test_peek_last() { handle.push(2).await.unwrap();
let mut pool = MemPool::new();
pool.push_item(test_item_with_id(1));
pool.push_item(test_item_with_id(2));
let item = pool.peek_last();
assert_eq!(item, Some(&test_item_with_id(1)));
}
#[test] // This should block if buffer is full, but we'll use try_send in a real scenario
fn test_pop_size() { // For now, just verify we can pop items
let mut pool = MemPool::new(); assert_eq!(pool.pop(), Some(1));
pool.push_item(test_item_with_id(1)); assert_eq!(pool.pop(), Some(2));
pool.push_item(test_item_with_id(2));
pool.push_item(test_item_with_id(3));
let items = pool.pop_size(2);
assert_eq!(items, vec![test_item_with_id(1), test_item_with_id(2)]);
assert_eq!(pool.len(), 1);
}
#[test]
fn test_drain_size() {
let mut pool = MemPool::new();
pool.push_item(test_item_with_id(1));
pool.push_item(test_item_with_id(2));
pool.push_item(test_item_with_id(3));
pool.push_item(test_item_with_id(4));
let items = pool.drain_size(2);
assert_eq!(items, vec![test_item_with_id(1), test_item_with_id(2)]);
assert_eq!(pool.len(), 2);
}
#[test]
fn test_default() {
let pool: MemPool<TestItem> = MemPool::default();
assert!(pool.is_empty());
assert_eq!(pool.len(), 0);
}
#[test]
fn test_is_empty() {
let mut pool = MemPool::new();
assert!(pool.is_empty());
pool.push_item(test_item_with_id(1));
assert!(!pool.is_empty());
}
#[test]
fn test_push_pop() {
let mut mempool: MemPool<TestItem> = MemPool::new();
let items = vec![
test_item_with_id(1),
test_item_with_id(2),
test_item_with_id(3),
];
for item in items {
mempool.push_item(item);
}
assert_eq!(mempool.len(), 3);
let item = mempool.pop_last();
assert_eq!(item, Some(TestItem { id: 1 }));
assert_eq!(mempool.len(), 2);
let item = mempool.pop_last();
assert_eq!(item, Some(TestItem { id: 2 }));
assert_eq!(mempool.len(), 1);
let item = mempool.pop_last();
assert_eq!(item, Some(TestItem { id: 3 }));
assert_eq!(mempool.len(), 0);
let item = mempool.pop_last();
assert_eq!(item, None);
}
#[test]
fn test_pop_many() {
let mut mempool: MemPool<TestItem> = MemPool::new();
let mut items = vec![];
for i in 1..11 {
items.push(test_item_with_id(i));
}
for item in items {
mempool.push_item(item);
}
assert_eq!(mempool.len(), 10);
let items1 = mempool.pop_size(4);
assert_eq!(
items1,
vec![
test_item_with_id(1),
test_item_with_id(2),
test_item_with_id(3),
test_item_with_id(4)
]
);
assert_eq!(mempool.len(), 6);
let items2 = mempool.drain_size(2);
assert_eq!(
items2,
vec![
test_item_with_id(5),
test_item_with_id(6),
test_item_with_id(7),
test_item_with_id(8)
]
);
assert_eq!(mempool.len(), 2);
} }
} }

View File

@ -28,3 +28,7 @@ path = "../nssa"
[features] [features]
default = [] default = []
testnet = [] testnet = []
[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
futures.workspace = true

View File

@ -4,15 +4,15 @@ use anyhow::Result;
use common::{HashType, block::Block, transaction::EncodedTransaction}; use common::{HashType, block::Block, transaction::EncodedTransaction};
use storage::RocksDBIO; use storage::RocksDBIO;
pub struct SequecerBlockStore { pub struct SequencerBlockStore {
dbio: RocksDBIO, dbio: RocksDBIO,
// TODO: Consider adding the hashmap to the database for faster recovery. // TODO: Consider adding the hashmap to the database for faster recovery.
pub tx_hash_to_block_map: HashMap<HashType, u64>, tx_hash_to_block_map: HashMap<HashType, u64>,
pub genesis_id: u64, genesis_id: u64,
pub signing_key: nssa::PrivateKey, signing_key: nssa::PrivateKey,
} }
impl SequecerBlockStore { impl SequencerBlockStore {
///Starting database at the start of new chain. ///Starting database at the start of new chain.
/// Creates files if necessary. /// Creates files if necessary.
/// ///
@ -42,7 +42,7 @@ impl SequecerBlockStore {
///Reopening existing database ///Reopening existing database
pub fn open_db_restart(location: &Path, signing_key: nssa::PrivateKey) -> Result<Self> { pub fn open_db_restart(location: &Path, signing_key: nssa::PrivateKey) -> Result<Self> {
SequecerBlockStore::open_db_with_genesis(location, None, signing_key) SequencerBlockStore::open_db_with_genesis(location, None, signing_key)
} }
pub fn get_block_at_id(&self, id: u64) -> Result<Block> { pub fn get_block_at_id(&self, id: u64) -> Result<Block> {
@ -69,6 +69,18 @@ impl SequecerBlockStore {
} }
None None
} }
pub fn insert(&mut self, tx: &EncodedTransaction, block_id: u64) {
self.tx_hash_to_block_map.insert(tx.hash(), block_id);
}
pub fn genesis_id(&self) -> u64 {
self.genesis_id
}
pub fn signing_key(&self) -> &nssa::PrivateKey {
&self.signing_key
}
} }
pub(crate) fn block_to_transactions_map(block: &Block) -> HashMap<HashType, u64> { pub(crate) fn block_to_transactions_map(block: &Block) -> HashMap<HashType, u64> {
@ -104,7 +116,7 @@ mod tests {
let genesis_block = genesis_block_hashable_data.into_block(&signing_key); let genesis_block = genesis_block_hashable_data.into_block(&signing_key);
// Start an empty node store // Start an empty node store
let mut node_store = let mut node_store =
SequecerBlockStore::open_db_with_genesis(path, Some(genesis_block), signing_key) SequencerBlockStore::open_db_with_genesis(path, Some(genesis_block), signing_key)
.unwrap(); .unwrap();
let tx = common::test_utils::produce_dummy_empty_transaction(); let tx = common::test_utils::produce_dummy_empty_transaction();

View File

@ -16,6 +16,7 @@ pub struct CommitmentsInitialData {
pub account: nssa_core::account::Account, pub account: nssa_core::account::Account,
} }
// TODO: Provide default values
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize)]
pub struct SequencerConfig { pub struct SequencerConfig {
///Home dir of sequencer storage ///Home dir of sequencer storage

View File

@ -10,25 +10,24 @@ use common::{
}; };
use config::SequencerConfig; use config::SequencerConfig;
use log::warn; use log::warn;
use mempool::MemPool; use mempool::{MemPool, MemPoolHandle};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::block_store::SequecerBlockStore; use crate::block_store::SequencerBlockStore;
pub mod block_store; pub mod block_store;
pub mod config; pub mod config;
pub struct SequencerCore { pub struct SequencerCore {
pub state: nssa::V02State, state: nssa::V02State,
pub block_store: SequecerBlockStore, block_store: SequencerBlockStore,
pub mempool: MemPool<EncodedTransaction>, mempool: MemPool<EncodedTransaction>,
pub sequencer_config: SequencerConfig, sequencer_config: SequencerConfig,
pub chain_height: u64, chain_height: u64,
} }
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TransactionMalformationError { pub enum TransactionMalformationError {
MempoolFullForRound,
InvalidSignature, InvalidSignature,
FailedToDecode { tx: HashType }, FailedToDecode { tx: HashType },
} }
@ -42,7 +41,8 @@ impl Display for TransactionMalformationError {
impl std::error::Error for TransactionMalformationError {} impl std::error::Error for TransactionMalformationError {}
impl SequencerCore { impl SequencerCore {
pub fn start_from_config(config: SequencerConfig) -> Self { /// Start Sequencer from configuration and construct transaction sender
pub fn start_from_config(config: SequencerConfig) -> (Self, MemPoolHandle<EncodedTransaction>) {
let hashable_data = HashableBlockData { let hashable_data = HashableBlockData {
block_id: config.genesis_id, block_id: config.genesis_id,
transactions: vec![], transactions: vec![],
@ -55,7 +55,7 @@ impl SequencerCore {
//Sequencer should panic if unable to open db, //Sequencer should panic if unable to open db,
//as fixing this issue may require actions non-native to program scope //as fixing this issue may require actions non-native to program scope
let block_store = SequecerBlockStore::open_db_with_genesis( let block_store = SequencerBlockStore::open_db_with_genesis(
&config.home.join("rocksdb"), &config.home.join("rocksdb"),
Some(genesis_block), Some(genesis_block),
signing_key, signing_key,
@ -86,17 +86,18 @@ impl SequencerCore {
#[cfg(feature = "testnet")] #[cfg(feature = "testnet")]
state.add_pinata_program(PINATA_BASE58.parse().unwrap()); state.add_pinata_program(PINATA_BASE58.parse().unwrap());
let (mempool, mempool_handle) = MemPool::new(config.mempool_max_size);
let mut this = Self { let mut this = Self {
state, state,
block_store, block_store,
mempool: MemPool::default(), mempool,
chain_height: config.genesis_id, chain_height: config.genesis_id,
sequencer_config: config, sequencer_config: config,
}; };
this.sync_state_with_stored_blocks(); this.sync_state_with_stored_blocks();
this (this, mempool_handle)
} }
/// If there are stored blocks ahead of the current height, this method will load and process all transaction /// If there are stored blocks ahead of the current height, this method will load and process all transaction
@ -110,108 +111,50 @@ impl SequencerCore {
self.execute_check_transaction_on_state(transaction) self.execute_check_transaction_on_state(transaction)
.unwrap(); .unwrap();
// Update the tx hash to block id map. // Update the tx hash to block id map.
self.block_store self.block_store.insert(&encoded_transaction, next_block_id);
.tx_hash_to_block_map
.insert(encoded_transaction.hash(), next_block_id);
} }
self.chain_height = next_block_id; self.chain_height = next_block_id;
next_block_id += 1; next_block_id += 1;
} }
} }
pub fn transaction_pre_check(
&mut self,
tx: NSSATransaction,
) -> Result<NSSATransaction, TransactionMalformationError> {
// Stateless checks here
match tx {
NSSATransaction::Public(tx) => {
if tx.witness_set().is_valid_for(tx.message()) {
Ok(NSSATransaction::Public(tx))
} else {
Err(TransactionMalformationError::InvalidSignature)
}
}
NSSATransaction::PrivacyPreserving(tx) => {
if tx.witness_set().signatures_are_valid_for(tx.message()) {
Ok(NSSATransaction::PrivacyPreserving(tx))
} else {
Err(TransactionMalformationError::InvalidSignature)
}
}
NSSATransaction::ProgramDeployment(tx) => Ok(NSSATransaction::ProgramDeployment(tx)),
}
}
pub fn push_tx_into_mempool_pre_check(
&mut self,
transaction: EncodedTransaction,
) -> Result<(), TransactionMalformationError> {
let transaction = NSSATransaction::try_from(&transaction).map_err(|_| {
TransactionMalformationError::FailedToDecode {
tx: transaction.hash(),
}
})?;
let mempool_size = self.mempool.len();
if mempool_size >= self.sequencer_config.mempool_max_size {
return Err(TransactionMalformationError::MempoolFullForRound);
}
let authenticated_tx = self
.transaction_pre_check(transaction)
.inspect_err(|err| warn!("Error at pre_check {err:#?}"))?;
self.mempool.push_item(authenticated_tx.into());
Ok(())
}
fn execute_check_transaction_on_state( fn execute_check_transaction_on_state(
&mut self, &mut self,
tx: NSSATransaction, tx: NSSATransaction,
) -> Result<NSSATransaction, nssa::error::NssaError> { ) -> Result<NSSATransaction, nssa::error::NssaError> {
match &tx { match &tx {
NSSATransaction::Public(tx) => { NSSATransaction::Public(tx) => self.state.transition_from_public_transaction(tx),
self.state NSSATransaction::PrivacyPreserving(tx) => self
.transition_from_public_transaction(tx) .state
.transition_from_privacy_preserving_transaction(tx),
NSSATransaction::ProgramDeployment(tx) => self
.state
.transition_from_program_deployment_transaction(tx),
}
.inspect_err(|err| warn!("Error at transition {err:#?}"))?; .inspect_err(|err| warn!("Error at transition {err:#?}"))?;
}
NSSATransaction::PrivacyPreserving(tx) => {
self.state
.transition_from_privacy_preserving_transaction(tx)
.inspect_err(|err| warn!("Error at transition {err:#?}"))?;
}
NSSATransaction::ProgramDeployment(tx) => {
self.state
.transition_from_program_deployment_transaction(tx)
.inspect_err(|err| warn!("Error at transition {err:#?}"))?;
}
}
Ok(tx) Ok(tx)
} }
///Produces new block from transactions in mempool /// Produces new block from transactions in mempool
pub fn produce_new_block_with_mempool_transactions(&mut self) -> Result<u64> { pub fn produce_new_block_with_mempool_transactions(&mut self) -> Result<u64> {
let now = Instant::now(); let now = Instant::now();
let new_block_height = self.chain_height + 1; let new_block_height = self.chain_height + 1;
let mut num_valid_transactions_in_block = 0;
let mut valid_transactions = vec![]; let mut valid_transactions = vec![];
while let Some(tx) = self.mempool.pop_last() { while let Some(tx) = self.mempool.pop() {
let nssa_transaction = NSSATransaction::try_from(&tx) let nssa_transaction = NSSATransaction::try_from(&tx)
.map_err(|_| TransactionMalformationError::FailedToDecode { tx: tx.hash() })?; .map_err(|_| TransactionMalformationError::FailedToDecode { tx: tx.hash() })?;
if let Ok(valid_tx) = self.execute_check_transaction_on_state(nssa_transaction) { if let Ok(valid_tx) = self.execute_check_transaction_on_state(nssa_transaction) {
valid_transactions.push(valid_tx.into()); valid_transactions.push(valid_tx.into());
num_valid_transactions_in_block += 1; if valid_transactions.len() >= self.sequencer_config.max_num_tx_in_block {
if num_valid_transactions_in_block >= self.sequencer_config.max_num_tx_in_block {
break; break;
} }
} else {
// Probably need to handle unsuccessful transaction execution?
} }
} }
@ -232,12 +175,17 @@ impl SequencerCore {
timestamp: curr_time, timestamp: curr_time,
}; };
let block = hashable_data.into_block(&self.block_store.signing_key); let block = hashable_data.into_block(self.block_store.signing_key());
self.block_store.put_block_at_id(block)?; self.block_store.put_block_at_id(block)?;
self.chain_height = new_block_height; self.chain_height = new_block_height;
// TODO: Consider switching to `tracing` crate to have more structured and consistent logs e.g.
//
// ```
// info!(num_txs = num_txs_in_block, time = now.elapsed(), "Created block");
// ```
log::info!( log::info!(
"Created block with {} transactions in {} seconds", "Created block with {} transactions in {} seconds",
num_txs_in_block, num_txs_in_block,
@ -246,10 +194,52 @@ impl SequencerCore {
Ok(self.chain_height) Ok(self.chain_height)
} }
pub fn state(&self) -> &nssa::V02State {
&self.state
}
pub fn block_store(&self) -> &SequencerBlockStore {
&self.block_store
}
pub fn chain_height(&self) -> u64 {
self.chain_height
}
pub fn sequencer_config(&self) -> &SequencerConfig {
&self.sequencer_config
}
}
// TODO: Introduce type-safe wrapper around checked transaction, e.g. AuthenticatedTransaction
pub fn transaction_pre_check(
tx: NSSATransaction,
) -> Result<NSSATransaction, TransactionMalformationError> {
// Stateless checks here
match tx {
NSSATransaction::Public(tx) => {
if tx.witness_set().is_valid_for(tx.message()) {
Ok(NSSATransaction::Public(tx))
} else {
Err(TransactionMalformationError::InvalidSignature)
}
}
NSSATransaction::PrivacyPreserving(tx) => {
if tx.witness_set().signatures_are_valid_for(tx.message()) {
Ok(NSSATransaction::PrivacyPreserving(tx))
} else {
Err(TransactionMalformationError::InvalidSignature)
}
}
NSSATransaction::ProgramDeployment(tx) => Ok(NSSATransaction::ProgramDeployment(tx)),
}
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::pin::pin;
use base58::{FromBase58, ToBase58}; use base58::{FromBase58, ToBase58};
use common::test_utils::sequencer_sign_key_for_testing; use common::test_utils::sequencer_sign_key_for_testing;
use nssa::PrivateKey; use nssa::PrivateKey;
@ -319,19 +309,30 @@ mod tests {
nssa::PrivateKey::try_new([2; 32]).unwrap() nssa::PrivateKey::try_new([2; 32]).unwrap()
} }
fn common_setup(sequencer: &mut SequencerCore) { async fn common_setup() -> (SequencerCore, MemPoolHandle<EncodedTransaction>) {
let config = setup_sequencer_config();
common_setup_with_config(config).await
}
async fn common_setup_with_config(
config: SequencerConfig,
) -> (SequencerCore, MemPoolHandle<EncodedTransaction>) {
let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config);
let tx = common::test_utils::produce_dummy_empty_transaction(); let tx = common::test_utils::produce_dummy_empty_transaction();
sequencer.mempool.push_item(tx); mempool_handle.push(tx).await.unwrap();
sequencer sequencer
.produce_new_block_with_mempool_transactions() .produce_new_block_with_mempool_transactions()
.unwrap(); .unwrap();
(sequencer, mempool_handle)
} }
#[test] #[test]
fn test_start_from_config() { fn test_start_from_config() {
let config = setup_sequencer_config(); let config = setup_sequencer_config();
let sequencer = SequencerCore::start_from_config(config.clone()); let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone());
assert_eq!(sequencer.chain_height, config.genesis_id); assert_eq!(sequencer.chain_height, config.genesis_id);
assert_eq!(sequencer.sequencer_config.max_num_tx_in_block, 10); assert_eq!(sequencer.sequencer_config.max_num_tx_in_block, 10);
@ -390,7 +391,7 @@ mod tests {
let initial_accounts = vec![initial_acc1, initial_acc2]; let initial_accounts = vec![initial_acc1, initial_acc2];
let config = setup_sequencer_config_variable_initial_accounts(initial_accounts); let config = setup_sequencer_config_variable_initial_accounts(initial_accounts);
let sequencer = SequencerCore::start_from_config(config.clone()); let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone());
let acc1_addr = config.initial_accounts[0] let acc1_addr = config.initial_accounts[0]
.addr .addr
@ -425,23 +426,15 @@ mod tests {
#[test] #[test]
fn test_transaction_pre_check_pass() { fn test_transaction_pre_check_pass() {
let config = setup_sequencer_config();
let mut sequencer = SequencerCore::start_from_config(config);
common_setup(&mut sequencer);
let tx = common::test_utils::produce_dummy_empty_transaction(); let tx = common::test_utils::produce_dummy_empty_transaction();
let result = sequencer.transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx)); let result = transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx));
assert!(result.is_ok()); assert!(result.is_ok());
} }
#[test] #[tokio::test]
fn test_transaction_pre_check_native_transfer_valid() { async fn test_transaction_pre_check_native_transfer_valid() {
let config = setup_sequencer_config(); let (sequencer, _mempool_handle) = common_setup().await;
let mut sequencer = SequencerCore::start_from_config(config);
common_setup(&mut sequencer);
let acc1 = sequencer.sequencer_config.initial_accounts[0] let acc1 = sequencer.sequencer_config.initial_accounts[0]
.addr .addr
@ -463,17 +456,14 @@ mod tests {
let tx = common::test_utils::create_transaction_native_token_transfer( let tx = common::test_utils::create_transaction_native_token_transfer(
acc1, 0, acc2, 10, sign_key1, acc1, 0, acc2, 10, sign_key1,
); );
let result = sequencer.transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx)); let result = transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx));
assert!(result.is_ok()); assert!(result.is_ok());
} }
#[test] #[tokio::test]
fn test_transaction_pre_check_native_transfer_other_signature() { async fn test_transaction_pre_check_native_transfer_other_signature() {
let config = setup_sequencer_config(); let (mut sequencer, _mempool_handle) = common_setup().await;
let mut sequencer = SequencerCore::start_from_config(config);
common_setup(&mut sequencer);
let acc1 = sequencer.sequencer_config.initial_accounts[0] let acc1 = sequencer.sequencer_config.initial_accounts[0]
.addr .addr
@ -497,9 +487,7 @@ mod tests {
); );
// Signature is valid, stateless check pass // Signature is valid, stateless check pass
let tx = sequencer let tx = transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx)).unwrap();
.transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx))
.unwrap();
// Signature is not from sender. Execution fails // Signature is not from sender. Execution fails
let result = sequencer.execute_check_transaction_on_state(tx); let result = sequencer.execute_check_transaction_on_state(tx);
@ -510,12 +498,9 @@ mod tests {
)); ));
} }
#[test] #[tokio::test]
fn test_transaction_pre_check_native_transfer_sent_too_much() { async fn test_transaction_pre_check_native_transfer_sent_too_much() {
let config = setup_sequencer_config(); let (mut sequencer, _mempool_handle) = common_setup().await;
let mut sequencer = SequencerCore::start_from_config(config);
common_setup(&mut sequencer);
let acc1 = sequencer.sequencer_config.initial_accounts[0] let acc1 = sequencer.sequencer_config.initial_accounts[0]
.addr .addr
@ -538,9 +523,9 @@ mod tests {
acc1, 0, acc2, 10000000, sign_key1, acc1, 0, acc2, 10000000, sign_key1,
); );
let result = sequencer.transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx)); let result = transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx));
//Passed pre-check // Passed pre-check
assert!(result.is_ok()); assert!(result.is_ok());
let result = sequencer.execute_check_transaction_on_state(result.unwrap()); let result = sequencer.execute_check_transaction_on_state(result.unwrap());
@ -552,12 +537,9 @@ mod tests {
assert!(is_failed_at_balance_mismatch); assert!(is_failed_at_balance_mismatch);
} }
#[test] #[tokio::test]
fn test_transaction_execute_native_transfer() { async fn test_transaction_execute_native_transfer() {
let config = setup_sequencer_config(); let (mut sequencer, _mempool_handle) = common_setup().await;
let mut sequencer = SequencerCore::start_from_config(config);
common_setup(&mut sequencer);
let acc1 = sequencer.sequencer_config.initial_accounts[0] let acc1 = sequencer.sequencer_config.initial_accounts[0]
.addr .addr
@ -597,63 +579,49 @@ mod tests {
assert_eq!(bal_to, 20100); assert_eq!(bal_to, 20100);
} }
#[test] #[tokio::test]
fn test_push_tx_into_mempool_fails_mempool_full() { async fn test_push_tx_into_mempool_blocks_until_mempool_is_full() {
let config = SequencerConfig { let config = SequencerConfig {
mempool_max_size: 1, mempool_max_size: 1,
..setup_sequencer_config() ..setup_sequencer_config()
}; };
let mut sequencer = SequencerCore::start_from_config(config); let (mut sequencer, mempool_handle) = common_setup_with_config(config).await;
common_setup(&mut sequencer);
let tx = common::test_utils::produce_dummy_empty_transaction(); let tx = common::test_utils::produce_dummy_empty_transaction();
// Fill the mempool // Fill the mempool
sequencer.mempool.push_item(tx.clone()); mempool_handle.push(tx.clone()).await.unwrap();
let result = sequencer.push_tx_into_mempool_pre_check(tx); // Check that pushing another transaction will block
let mut push_fut = pin!(mempool_handle.push(tx.clone()));
let poll = futures::poll!(push_fut.as_mut());
assert!(poll.is_pending());
assert!(matches!( // Empty the mempool by producing a block
result, sequencer
Err(TransactionMalformationError::MempoolFullForRound) .produce_new_block_with_mempool_transactions()
)); .unwrap();
// Resolve the pending push
assert!(push_fut.await.is_ok());
} }
#[test] #[tokio::test]
fn test_push_tx_into_mempool_pre_check() { async fn test_produce_new_block_with_mempool_transactions() {
let config = setup_sequencer_config(); let (mut sequencer, mempool_handle) = common_setup().await;
let mut sequencer = SequencerCore::start_from_config(config);
common_setup(&mut sequencer);
let tx = common::test_utils::produce_dummy_empty_transaction();
let result = sequencer.push_tx_into_mempool_pre_check(tx);
assert!(result.is_ok());
assert_eq!(sequencer.mempool.len(), 1);
}
#[test]
fn test_produce_new_block_with_mempool_transactions() {
let config = setup_sequencer_config();
let mut sequencer = SequencerCore::start_from_config(config);
let genesis_height = sequencer.chain_height; let genesis_height = sequencer.chain_height;
let tx = common::test_utils::produce_dummy_empty_transaction(); let tx = common::test_utils::produce_dummy_empty_transaction();
sequencer.mempool.push_item(tx); mempool_handle.push(tx).await.unwrap();
let block_id = sequencer.produce_new_block_with_mempool_transactions(); let block_id = sequencer.produce_new_block_with_mempool_transactions();
assert!(block_id.is_ok()); assert!(block_id.is_ok());
assert_eq!(block_id.unwrap(), genesis_height + 1); assert_eq!(block_id.unwrap(), genesis_height + 1);
} }
#[test] #[tokio::test]
fn test_replay_transactions_are_rejected_in_the_same_block() { async fn test_replay_transactions_are_rejected_in_the_same_block() {
let config = setup_sequencer_config(); let (mut sequencer, mempool_handle) = common_setup().await;
let mut sequencer = SequencerCore::start_from_config(config);
common_setup(&mut sequencer);
let acc1 = sequencer.sequencer_config.initial_accounts[0] let acc1 = sequencer.sequencer_config.initial_accounts[0]
.addr .addr
@ -679,8 +647,8 @@ mod tests {
let tx_original = tx.clone(); let tx_original = tx.clone();
let tx_replay = tx.clone(); let tx_replay = tx.clone();
// Pushing two copies of the same tx to the mempool // Pushing two copies of the same tx to the mempool
sequencer.mempool.push_item(tx_original); mempool_handle.push(tx_original).await.unwrap();
sequencer.mempool.push_item(tx_replay); mempool_handle.push(tx_replay).await.unwrap();
// Create block // Create block
let current_height = sequencer let current_height = sequencer
@ -695,12 +663,9 @@ mod tests {
assert_eq!(block.body.transactions, vec![tx.clone()]); assert_eq!(block.body.transactions, vec![tx.clone()]);
} }
#[test] #[tokio::test]
fn test_replay_transactions_are_rejected_in_different_blocks() { async fn test_replay_transactions_are_rejected_in_different_blocks() {
let config = setup_sequencer_config(); let (mut sequencer, mempool_handle) = common_setup().await;
let mut sequencer = SequencerCore::start_from_config(config);
common_setup(&mut sequencer);
let acc1 = sequencer.sequencer_config.initial_accounts[0] let acc1 = sequencer.sequencer_config.initial_accounts[0]
.addr .addr
@ -724,7 +689,7 @@ mod tests {
); );
// The transaction should be included the first time // The transaction should be included the first time
sequencer.mempool.push_item(tx.clone()); mempool_handle.push(tx.clone()).await.unwrap();
let current_height = sequencer let current_height = sequencer
.produce_new_block_with_mempool_transactions() .produce_new_block_with_mempool_transactions()
.unwrap(); .unwrap();
@ -735,7 +700,7 @@ mod tests {
assert_eq!(block.body.transactions, vec![tx.clone()]); assert_eq!(block.body.transactions, vec![tx.clone()]);
// Add same transaction should fail // Add same transaction should fail
sequencer.mempool.push_item(tx); mempool_handle.push(tx.clone()).await.unwrap();
let current_height = sequencer let current_height = sequencer
.produce_new_block_with_mempool_transactions() .produce_new_block_with_mempool_transactions()
.unwrap(); .unwrap();
@ -746,8 +711,8 @@ mod tests {
assert!(block.body.transactions.is_empty()); assert!(block.body.transactions.is_empty());
} }
#[test] #[tokio::test]
fn test_restart_from_storage() { async fn test_restart_from_storage() {
let config = setup_sequencer_config(); let config = setup_sequencer_config();
let acc1_addr: nssa::Address = config.initial_accounts[0].addr.parse().unwrap(); let acc1_addr: nssa::Address = config.initial_accounts[0].addr.parse().unwrap();
let acc2_addr: nssa::Address = config.initial_accounts[1].addr.parse().unwrap(); let acc2_addr: nssa::Address = config.initial_accounts[1].addr.parse().unwrap();
@ -757,7 +722,7 @@ mod tests {
// from `acc_1` to `acc_2`. The block created with that transaction will be kept stored in // from `acc_1` to `acc_2`. The block created with that transaction will be kept stored in
// the temporary directory for the block storage of this test. // the temporary directory for the block storage of this test.
{ {
let mut sequencer = SequencerCore::start_from_config(config.clone()); let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config.clone());
let signing_key = PrivateKey::try_new([1; 32]).unwrap(); let signing_key = PrivateKey::try_new([1; 32]).unwrap();
let tx = common::test_utils::create_transaction_native_token_transfer( let tx = common::test_utils::create_transaction_native_token_transfer(
@ -768,7 +733,7 @@ mod tests {
signing_key, signing_key,
); );
sequencer.mempool.push_item(tx.clone()); mempool_handle.push(tx.clone()).await.unwrap();
let current_height = sequencer let current_height = sequencer
.produce_new_block_with_mempool_transactions() .produce_new_block_with_mempool_transactions()
.unwrap(); .unwrap();
@ -781,7 +746,7 @@ mod tests {
// Instantiating a new sequencer from the same config. This should load the existing block // Instantiating a new sequencer from the same config. This should load the existing block
// with the above transaction and update the state to reflect that. // with the above transaction and update the state to reflect that.
let sequencer = SequencerCore::start_from_config(config.clone()); let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone());
let balance_acc_1 = sequencer.state.get_account_by_address(&acc1_addr).balance; let balance_acc_1 = sequencer.state.get_account_by_address(&acc1_addr).balance;
let balance_acc_2 = sequencer.state.get_account_by_address(&acc2_addr).balance; let balance_acc_2 = sequencer.state.get_account_by_address(&acc2_addr).balance;

View File

@ -19,6 +19,8 @@ actix-web.workspace = true
tokio.workspace = true tokio.workspace = true
borsh.workspace = true borsh.workspace = true
# TODO: Move to workspace
[dependencies.sequencer_core] [dependencies.sequencer_core]
path = "../sequencer_core" path = "../sequencer_core"
@ -27,3 +29,6 @@ path = "../common"
[dependencies.nssa] [dependencies.nssa]
path = "../nssa" path = "../nssa"
[dependencies.mempool]
path = "../mempool"

View File

@ -4,10 +4,11 @@ pub mod types;
use std::sync::Arc; use std::sync::Arc;
use common::rpc_primitives::{ use common::{
RpcPollingConfig, rpc_primitives::errors::{RpcError, RpcErrorKind},
errors::{RpcError, RpcErrorKind}, transaction::EncodedTransaction,
}; };
use mempool::MemPoolHandle;
use sequencer_core::SequencerCore; use sequencer_core::SequencerCore;
use serde::Serialize; use serde::Serialize;
use serde_json::Value; use serde_json::Value;
@ -19,8 +20,8 @@ use self::types::err_rpc::RpcErr;
//ToDo: Add necessary fields //ToDo: Add necessary fields
pub struct JsonHandler { pub struct JsonHandler {
pub polling_config: RpcPollingConfig, sequencer_state: Arc<Mutex<SequencerCore>>,
pub sequencer_state: Arc<Mutex<SequencerCore>>, mempool_handle: MemPoolHandle<EncodedTransaction>,
} }
fn respond<T: Serialize>(val: T) -> Result<Value, RpcErr> { fn respond<T: Serialize>(val: T) -> Result<Value, RpcErr> {

View File

@ -3,12 +3,14 @@ use std::sync::Arc;
use actix_cors::Cors; use actix_cors::Cors;
use actix_web::{App, Error as HttpError, HttpResponse, HttpServer, http, middleware, web}; use actix_web::{App, Error as HttpError, HttpResponse, HttpServer, http, middleware, web};
use common::transaction::EncodedTransaction;
use futures::Future; use futures::Future;
use futures::FutureExt; use futures::FutureExt;
use log::info; use log::info;
use common::rpc_primitives::RpcConfig; use common::rpc_primitives::RpcConfig;
use common::rpc_primitives::message::Message; use common::rpc_primitives::message::Message;
use mempool::MemPoolHandle;
use sequencer_core::SequencerCore; use sequencer_core::SequencerCore;
use tokio::sync::Mutex; use tokio::sync::Mutex;
@ -46,17 +48,17 @@ fn get_cors(cors_allowed_origins: &[String]) -> Cors {
pub fn new_http_server( pub fn new_http_server(
config: RpcConfig, config: RpcConfig,
seuquencer_core: Arc<Mutex<SequencerCore>>, seuquencer_core: Arc<Mutex<SequencerCore>>,
mempool_handle: MemPoolHandle<EncodedTransaction>,
) -> io::Result<actix_web::dev::Server> { ) -> io::Result<actix_web::dev::Server> {
let RpcConfig { let RpcConfig {
addr, addr,
cors_allowed_origins, cors_allowed_origins,
polling_config,
limits_config, limits_config,
} = config; } = config;
info!(target:NETWORK, "Starting http server at {addr}"); info!(target:NETWORK, "Starting http server at {addr}");
let handler = web::Data::new(JsonHandler { let handler = web::Data::new(JsonHandler {
polling_config,
sequencer_state: seuquencer_core.clone(), sequencer_state: seuquencer_core.clone(),
mempool_handle,
}); });
// HTTP server // HTTP server

View File

@ -3,8 +3,9 @@ use std::collections::HashMap;
use actix_web::Error as HttpError; use actix_web::Error as HttpError;
use base58::FromBase58; use base58::FromBase58;
use base64::{Engine, engine::general_purpose}; use base64::{Engine, engine::general_purpose};
use log::warn;
use nssa::{self, program::Program}; use nssa::{self, program::Program};
use sequencer_core::config::AccountInitialData; use sequencer_core::{TransactionMalformationError, config::AccountInitialData};
use serde_json::Value; use serde_json::Value;
use common::{ use common::{
@ -22,7 +23,7 @@ use common::{
GetTransactionByHashRequest, GetTransactionByHashResponse, GetTransactionByHashRequest, GetTransactionByHashResponse,
}, },
}, },
transaction::EncodedTransaction, transaction::{EncodedTransaction, NSSATransaction},
}; };
use common::rpc_primitives::requests::{ use common::rpc_primitives::requests::{
@ -67,16 +68,16 @@ impl JsonHandler {
} }
} }
/// Example of request processing
#[allow(clippy::unused_async)] #[allow(clippy::unused_async)]
///Example of request processing
async fn process_temp_hello(&self, request: Request) -> Result<Value, RpcErr> { async fn process_temp_hello(&self, request: Request) -> Result<Value, RpcErr> {
let _hello_request = HelloRequest::parse(Some(request.params))?; let _hello_request = HelloRequest::parse(Some(request.params))?;
let helperstruct = HelloResponse { let response = HelloResponse {
greeting: HELLO_FROM_SEQUENCER.to_string(), greeting: HELLO_FROM_SEQUENCER.to_string(),
}; };
respond(helperstruct) respond(response)
} }
async fn process_send_tx(&self, request: Request) -> Result<Value, RpcErr> { async fn process_send_tx(&self, request: Request) -> Result<Value, RpcErr> {
@ -84,18 +85,24 @@ impl JsonHandler {
let tx = borsh::from_slice::<EncodedTransaction>(&send_tx_req.transaction).unwrap(); let tx = borsh::from_slice::<EncodedTransaction>(&send_tx_req.transaction).unwrap();
let tx_hash = hex::encode(tx.hash()); let tx_hash = hex::encode(tx.hash());
{ let transaction = NSSATransaction::try_from(&tx)
let mut state = self.sequencer_state.lock().await; .map_err(|_| TransactionMalformationError::FailedToDecode { tx: tx.hash() })?;
state.push_tx_into_mempool_pre_check(tx)?; let authenticated_tx = sequencer_core::transaction_pre_check(transaction)
} .inspect_err(|err| warn!("Error at pre_check {err:#?}"))?;
let helperstruct = SendTxResponse { // TODO: Do we need a timeout here? It will be usable if we have too many transactions to process
self.mempool_handle
.push(authenticated_tx.into())
.await
.expect("Mempool is closed, this is a bug");
let response = SendTxResponse {
status: TRANSACTION_SUBMITTED.to_string(), status: TRANSACTION_SUBMITTED.to_string(),
tx_hash, tx_hash,
}; };
respond(helperstruct) respond(response)
} }
async fn process_get_block_data(&self, request: Request) -> Result<Value, RpcErr> { async fn process_get_block_data(&self, request: Request) -> Result<Value, RpcErr> {
@ -104,14 +111,16 @@ impl JsonHandler {
let block = { let block = {
let state = self.sequencer_state.lock().await; let state = self.sequencer_state.lock().await;
state.block_store.get_block_at_id(get_block_req.block_id)? state
.block_store()
.get_block_at_id(get_block_req.block_id)?
}; };
let helperstruct = GetBlockDataResponse { let response = GetBlockDataResponse {
block: borsh::to_vec(&HashableBlockData::from(block)).unwrap(), block: borsh::to_vec(&HashableBlockData::from(block)).unwrap(),
}; };
respond(helperstruct) respond(response)
} }
async fn process_get_genesis(&self, request: Request) -> Result<Value, RpcErr> { async fn process_get_genesis(&self, request: Request) -> Result<Value, RpcErr> {
@ -120,12 +129,12 @@ impl JsonHandler {
let genesis_id = { let genesis_id = {
let state = self.sequencer_state.lock().await; let state = self.sequencer_state.lock().await;
state.block_store.genesis_id state.block_store().genesis_id()
}; };
let helperstruct = GetGenesisIdResponse { genesis_id }; let response = GetGenesisIdResponse { genesis_id };
respond(helperstruct) respond(response)
} }
async fn process_get_last_block(&self, request: Request) -> Result<Value, RpcErr> { async fn process_get_last_block(&self, request: Request) -> Result<Value, RpcErr> {
@ -134,12 +143,12 @@ impl JsonHandler {
let last_block = { let last_block = {
let state = self.sequencer_state.lock().await; let state = self.sequencer_state.lock().await;
state.chain_height state.chain_height()
}; };
let helperstruct = GetLastBlockResponse { last_block }; let response = GetLastBlockResponse { last_block };
respond(helperstruct) respond(response)
} }
/// Returns the initial accounts for testnet /// Returns the initial accounts for testnet
@ -151,7 +160,7 @@ impl JsonHandler {
let initial_accounts: Vec<AccountInitialData> = { let initial_accounts: Vec<AccountInitialData> = {
let state = self.sequencer_state.lock().await; let state = self.sequencer_state.lock().await;
state.sequencer_config.initial_accounts.clone() state.sequencer_config().initial_accounts.clone()
}; };
respond(initial_accounts) respond(initial_accounts)
@ -173,13 +182,13 @@ impl JsonHandler {
let balance = { let balance = {
let state = self.sequencer_state.lock().await; let state = self.sequencer_state.lock().await;
let account = state.state.get_account_by_address(&address); let account = state.state().get_account_by_address(&address);
account.balance account.balance
}; };
let helperstruct = GetAccountBalanceResponse { balance }; let response = GetAccountBalanceResponse { balance };
respond(helperstruct) respond(response)
} }
/// Returns the nonces of the accounts at the given addresses. /// Returns the nonces of the accounts at the given addresses.
@ -200,13 +209,13 @@ impl JsonHandler {
addresses addresses
.into_iter() .into_iter()
.map(|addr| state.state.get_account_by_address(&addr).nonce) .map(|addr| state.state().get_account_by_address(&addr).nonce)
.collect() .collect()
}; };
let helperstruct = GetAccountsNoncesResponse { nonces }; let response = GetAccountsNoncesResponse { nonces };
respond(helperstruct) respond(response)
} }
/// Returns account struct for given address. /// Returns account struct for given address.
@ -222,12 +231,12 @@ impl JsonHandler {
let account = { let account = {
let state = self.sequencer_state.lock().await; let state = self.sequencer_state.lock().await;
state.state.get_account_by_address(&address) state.state().get_account_by_address(&address)
}; };
let helperstruct = GetAccountResponse { account }; let response = GetAccountResponse { account };
respond(helperstruct) respond(response)
} }
/// Returns the transaction corresponding to the given hash, if it exists in the blockchain. /// Returns the transaction corresponding to the given hash, if it exists in the blockchain.
@ -243,15 +252,15 @@ impl JsonHandler {
let transaction = { let transaction = {
let state = self.sequencer_state.lock().await; let state = self.sequencer_state.lock().await;
state state
.block_store .block_store()
.get_transaction_by_hash(hash) .get_transaction_by_hash(hash)
.map(|tx| borsh::to_vec(&tx).unwrap()) .map(|tx| borsh::to_vec(&tx).unwrap())
}; };
let base64_encoded = transaction.map(|tx| general_purpose::STANDARD.encode(tx)); let base64_encoded = transaction.map(|tx| general_purpose::STANDARD.encode(tx));
let helperstruct = GetTransactionByHashResponse { let response = GetTransactionByHashResponse {
transaction: base64_encoded, transaction: base64_encoded,
}; };
respond(helperstruct) respond(response)
} }
/// Returns the commitment proof, corresponding to commitment /// Returns the commitment proof, corresponding to commitment
@ -261,11 +270,11 @@ impl JsonHandler {
let membership_proof = { let membership_proof = {
let state = self.sequencer_state.lock().await; let state = self.sequencer_state.lock().await;
state state
.state .state()
.get_proof_for_commitment(&get_proof_req.commitment) .get_proof_for_commitment(&get_proof_req.commitment)
}; };
let helperstruct = GetProofForCommitmentResponse { membership_proof }; let response = GetProofForCommitmentResponse { membership_proof };
respond(helperstruct) respond(response)
} }
async fn process_get_program_ids(&self, request: Request) -> Result<Value, RpcErr> { async fn process_get_program_ids(&self, request: Request) -> Result<Value, RpcErr> {
@ -282,8 +291,8 @@ impl JsonHandler {
"privacy_preserving_circuit".to_string(), "privacy_preserving_circuit".to_string(),
nssa::PRIVACY_PRESERVING_CIRCUIT_ID, nssa::PRIVACY_PRESERVING_CIRCUIT_ID,
); );
let helperstruct = GetProgramIdsResponse { program_ids }; let response = GetProgramIdsResponse { program_ids };
respond(helperstruct) respond(response)
} }
pub async fn process_request_internal(&self, request: Request) -> Result<Value, RpcErr> { pub async fn process_request_internal(&self, request: Request) -> Result<Value, RpcErr> {
@ -312,10 +321,7 @@ mod tests {
use crate::{JsonHandler, rpc_handler}; use crate::{JsonHandler, rpc_handler};
use base58::ToBase58; use base58::ToBase58;
use base64::{Engine, engine::general_purpose}; use base64::{Engine, engine::general_purpose};
use common::{ use common::{test_utils::sequencer_sign_key_for_testing, transaction::EncodedTransaction};
rpc_primitives::RpcPollingConfig, test_utils::sequencer_sign_key_for_testing,
transaction::EncodedTransaction,
};
use sequencer_core::{ use sequencer_core::{
SequencerCore, SequencerCore,
@ -365,10 +371,10 @@ mod tests {
} }
} }
fn components_for_tests() -> (JsonHandler, Vec<AccountInitialData>, EncodedTransaction) { async fn components_for_tests() -> (JsonHandler, Vec<AccountInitialData>, EncodedTransaction) {
let config = sequencer_config_for_tests(); let config = sequencer_config_for_tests();
let mut sequencer_core = SequencerCore::start_from_config(config); let (mut sequencer_core, mempool_handle) = SequencerCore::start_from_config(config);
let initial_accounts = sequencer_core.sequencer_config.initial_accounts.clone(); let initial_accounts = sequencer_core.sequencer_config().initial_accounts.clone();
let signing_key = nssa::PrivateKey::try_new([1; 32]).unwrap(); let signing_key = nssa::PrivateKey::try_new([1; 32]).unwrap();
let balance_to_move = 10; let balance_to_move = 10;
@ -383,9 +389,10 @@ mod tests {
signing_key, signing_key,
); );
sequencer_core mempool_handle
.push_tx_into_mempool_pre_check(tx.clone()) .push(tx.clone())
.unwrap(); .await
.expect("Mempool is closed, this is a bug");
sequencer_core sequencer_core
.produce_new_block_with_mempool_transactions() .produce_new_block_with_mempool_transactions()
@ -395,8 +402,8 @@ mod tests {
( (
JsonHandler { JsonHandler {
polling_config: RpcPollingConfig::default(),
sequencer_state: sequencer_core, sequencer_state: sequencer_core,
mempool_handle,
}, },
initial_accounts, initial_accounts,
tx, tx,
@ -426,7 +433,7 @@ mod tests {
#[actix_web::test] #[actix_web::test]
async fn test_get_account_balance_for_non_existent_account() { async fn test_get_account_balance_for_non_existent_account() {
let (json_handler, _, _) = components_for_tests(); let (json_handler, _, _) = components_for_tests().await;
let request = serde_json::json!({ let request = serde_json::json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "get_account_balance", "method": "get_account_balance",
@ -448,7 +455,7 @@ mod tests {
#[actix_web::test] #[actix_web::test]
async fn test_get_account_balance_for_invalid_base58() { async fn test_get_account_balance_for_invalid_base58() {
let (json_handler, _, _) = components_for_tests(); let (json_handler, _, _) = components_for_tests().await;
let request = serde_json::json!({ let request = serde_json::json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "get_account_balance", "method": "get_account_balance",
@ -471,7 +478,7 @@ mod tests {
#[actix_web::test] #[actix_web::test]
async fn test_get_account_balance_for_invalid_length() { async fn test_get_account_balance_for_invalid_length() {
let (json_handler, _, _) = components_for_tests(); let (json_handler, _, _) = components_for_tests().await;
let request = serde_json::json!({ let request = serde_json::json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "get_account_balance", "method": "get_account_balance",
@ -494,7 +501,7 @@ mod tests {
#[actix_web::test] #[actix_web::test]
async fn test_get_account_balance_for_existing_account() { async fn test_get_account_balance_for_existing_account() {
let (json_handler, initial_accounts, _) = components_for_tests(); let (json_handler, initial_accounts, _) = components_for_tests().await;
let acc1_addr = initial_accounts[0].addr.clone(); let acc1_addr = initial_accounts[0].addr.clone();
@ -519,7 +526,7 @@ mod tests {
#[actix_web::test] #[actix_web::test]
async fn test_get_accounts_nonces_for_non_existent_account() { async fn test_get_accounts_nonces_for_non_existent_account() {
let (json_handler, _, _) = components_for_tests(); let (json_handler, _, _) = components_for_tests().await;
let request = serde_json::json!({ let request = serde_json::json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "get_accounts_nonces", "method": "get_accounts_nonces",
@ -541,7 +548,7 @@ mod tests {
#[actix_web::test] #[actix_web::test]
async fn test_get_accounts_nonces_for_existent_account() { async fn test_get_accounts_nonces_for_existent_account() {
let (json_handler, initial_accounts, _) = components_for_tests(); let (json_handler, initial_accounts, _) = components_for_tests().await;
let acc_1_addr = initial_accounts[0].addr.clone(); let acc_1_addr = initial_accounts[0].addr.clone();
let acc_2_addr = initial_accounts[1].addr.clone(); let acc_2_addr = initial_accounts[1].addr.clone();
@ -567,7 +574,7 @@ mod tests {
#[actix_web::test] #[actix_web::test]
async fn test_get_account_data_for_non_existent_account() { async fn test_get_account_data_for_non_existent_account() {
let (json_handler, _, _) = components_for_tests(); let (json_handler, _, _) = components_for_tests().await;
let request = serde_json::json!({ let request = serde_json::json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "get_account", "method": "get_account",
@ -594,7 +601,7 @@ mod tests {
#[actix_web::test] #[actix_web::test]
async fn test_get_transaction_by_hash_for_non_existent_hash() { async fn test_get_transaction_by_hash_for_non_existent_hash() {
let (json_handler, _, _) = components_for_tests(); let (json_handler, _, _) = components_for_tests().await;
let request = serde_json::json!({ let request = serde_json::json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "get_transaction_by_hash", "method": "get_transaction_by_hash",
@ -616,7 +623,7 @@ mod tests {
#[actix_web::test] #[actix_web::test]
async fn test_get_transaction_by_hash_for_invalid_hex() { async fn test_get_transaction_by_hash_for_invalid_hex() {
let (json_handler, _, _) = components_for_tests(); let (json_handler, _, _) = components_for_tests().await;
let request = serde_json::json!({ let request = serde_json::json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "get_transaction_by_hash", "method": "get_transaction_by_hash",
@ -640,7 +647,7 @@ mod tests {
#[actix_web::test] #[actix_web::test]
async fn test_get_transaction_by_hash_for_invalid_length() { async fn test_get_transaction_by_hash_for_invalid_length() {
let (json_handler, _, _) = components_for_tests(); let (json_handler, _, _) = components_for_tests().await;
let request = serde_json::json!({ let request = serde_json::json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "get_transaction_by_hash", "method": "get_transaction_by_hash",
@ -664,7 +671,7 @@ mod tests {
#[actix_web::test] #[actix_web::test]
async fn test_get_transaction_by_hash_for_existing_transaction() { async fn test_get_transaction_by_hash_for_existing_transaction() {
let (json_handler, _, tx) = components_for_tests(); let (json_handler, _, tx) = components_for_tests().await;
let tx_hash_hex = hex::encode(tx.hash()); let tx_hash_hex = hex::encode(tx.hash());
let expected_base64_encoded = general_purpose::STANDARD.encode(borsh::to_vec(&tx).unwrap()); let expected_base64_encoded = general_purpose::STANDARD.encode(borsh::to_vec(&tx).unwrap());

View File

@ -26,13 +26,17 @@ pub async fn startup_sequencer(
let block_timeout = app_config.block_create_timeout_millis; let block_timeout = app_config.block_create_timeout_millis;
let port = app_config.port; let port = app_config.port;
let sequencer_core = SequencerCore::start_from_config(app_config); let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config);
info!("Sequencer core set up"); info!("Sequencer core set up");
let seq_core_wrapped = Arc::new(Mutex::new(sequencer_core)); let seq_core_wrapped = Arc::new(Mutex::new(sequencer_core));
let http_server = new_http_server(RpcConfig::with_port(port), seq_core_wrapped.clone())?; let http_server = new_http_server(
RpcConfig::with_port(port),
Arc::clone(&seq_core_wrapped),
mempool_handle,
)?;
info!("HTTP server started"); info!("HTTP server started");
let http_server_handle = http_server.handle(); let http_server_handle = http_server.handle();
tokio::spawn(http_server); tokio::spawn(http_server);

View File

@ -4,6 +4,7 @@ use sequencer_runner::main_runner;
pub const NUM_THREADS: usize = 4; pub const NUM_THREADS: usize = 4;
// TODO: Why it requires config as a directory and not as a file?
fn main() -> Result<()> { fn main() -> Result<()> {
actix::System::with_tokio_rt(|| { actix::System::with_tokio_rt(|| {
tokio::runtime::Builder::new_multi_thread() tokio::runtime::Builder::new_multi_thread()

View File

@ -5,6 +5,10 @@ use wallet::{Args, execute_continious_run, execute_subcommand};
pub const NUM_THREADS: usize = 2; pub const NUM_THREADS: usize = 2;
// TODO #169: We have sample configs for sequencer, but not for wallet
// TODO #168: Why it requires config as a directory? Maybe better to deduce directory from config file path?
// TODO #172: Why it requires config as env var while sequencer_runner accepts as argument?
// TODO #171: Running pinata doesn't give output about transaction hash and etc.
fn main() -> Result<()> { fn main() -> Result<()> {
let runtime = Builder::new_multi_thread() let runtime = Builder::new_multi_thread()
.worker_threads(NUM_THREADS) .worker_threads(NUM_THREADS)
@ -18,6 +22,7 @@ fn main() -> Result<()> {
runtime.block_on(async move { runtime.block_on(async move {
if let Some(command) = args.command { if let Some(command) = args.command {
// TODO: It should return error, not panic
execute_subcommand(command).await.unwrap(); execute_subcommand(command).await.unwrap();
} else if args.continious_run { } else if args.continious_run {
execute_continious_run().await.unwrap(); execute_continious_run().await.unwrap();

View File

@ -11,6 +11,7 @@ use crate::config::WalletConfig;
pub struct TxPoller { pub struct TxPoller {
pub polling_max_blocks_to_query: usize, pub polling_max_blocks_to_query: usize,
pub polling_max_error_attempts: u64, pub polling_max_error_attempts: u64,
// TODO: This should be Duration
pub polling_error_delay_millis: u64, pub polling_error_delay_millis: u64,
pub polling_delay_millis: u64, pub polling_delay_millis: u64,
pub client: Arc<SequencerClient>, pub client: Arc<SequencerClient>,