mirror of
https://github.com/logos-blockchain/lssa.git
synced 2026-01-02 13:23:10 +00:00
refactor: sequencer & mempool
This commit is contained in:
parent
6280110c9f
commit
c1fabb2181
@ -4,3 +4,7 @@ version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
tokio = { workspace = true, features = ["sync"] }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
|
||||
|
||||
@ -1,231 +1,99 @@
|
||||
use std::collections::VecDeque;
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
|
||||
pub struct MemPool<Item> {
|
||||
items: VecDeque<Item>,
|
||||
pub struct MemPool<T> {
|
||||
receiver: Receiver<T>,
|
||||
}
|
||||
|
||||
impl<Item> MemPool<Item> {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
items: VecDeque::new(),
|
||||
}
|
||||
impl<T> MemPool<T> {
|
||||
pub fn new(max_size: usize) -> (Self, MemPoolHandle<T>) {
|
||||
let (sender, receiver) = tokio::sync::mpsc::channel(max_size);
|
||||
|
||||
let mem_pool = Self { receiver };
|
||||
let sender = MemPoolHandle::new(sender);
|
||||
(mem_pool, sender)
|
||||
}
|
||||
|
||||
pub fn pop_last(&mut self) -> Option<Item> {
|
||||
self.items.pop_front()
|
||||
}
|
||||
pub fn pop(&mut self) -> Option<T> {
|
||||
use tokio::sync::mpsc::error::TryRecvError;
|
||||
|
||||
pub fn peek_last(&self) -> Option<&Item> {
|
||||
self.items.front()
|
||||
}
|
||||
|
||||
pub fn push_item(&mut self, item: Item) {
|
||||
self.items.push_back(item);
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.items.len()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.len() == 0
|
||||
}
|
||||
|
||||
pub fn pop_size(&mut self, size: usize) -> Vec<Item> {
|
||||
let mut ret_vec = vec![];
|
||||
|
||||
for _ in 0..size {
|
||||
let item = self.pop_last();
|
||||
|
||||
match item {
|
||||
Some(item) => ret_vec.push(item),
|
||||
None => break,
|
||||
match self.receiver.try_recv() {
|
||||
Ok(item) => Some(item),
|
||||
Err(TryRecvError::Empty) => None,
|
||||
Err(TryRecvError::Disconnected) => {
|
||||
panic!("Mempool senders disconnected, cannot receive items, this is a bug")
|
||||
}
|
||||
}
|
||||
|
||||
ret_vec
|
||||
}
|
||||
|
||||
pub fn drain_size(&mut self, remainder: usize) -> Vec<Item> {
|
||||
self.pop_size(self.len().saturating_sub(remainder))
|
||||
}
|
||||
}
|
||||
|
||||
impl<Item> Default for MemPool<Item> {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
pub struct MemPoolHandle<T> {
|
||||
sender: Sender<T>,
|
||||
}
|
||||
|
||||
impl<T> MemPoolHandle<T> {
|
||||
fn new(sender: Sender<T>) -> Self {
|
||||
Self { sender }
|
||||
}
|
||||
|
||||
/// Send an item to the mempool blocking if max size is reached
|
||||
pub async fn push(&self, item: T) -> Result<(), tokio::sync::mpsc::error::SendError<T>> {
|
||||
self.sender.send(item).await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::vec;
|
||||
|
||||
use super::*;
|
||||
|
||||
pub type ItemId = u64;
|
||||
use tokio::test;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct TestItem {
|
||||
id: ItemId,
|
||||
}
|
||||
|
||||
fn test_item_with_id(id: u64) -> TestItem {
|
||||
TestItem { id }
|
||||
#[test]
|
||||
async fn test_mempool_new() {
|
||||
let (mut pool, _handle): (MemPool<u64>, _) = MemPool::new(10);
|
||||
assert_eq!(pool.pop(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_empty_mempool() {
|
||||
let _: MemPool<TestItem> = MemPool::new();
|
||||
async fn test_push_and_pop() {
|
||||
let (mut pool, handle) = MemPool::new(10);
|
||||
|
||||
handle.push(1).await.unwrap();
|
||||
|
||||
let item = pool.pop();
|
||||
assert_eq!(item, Some(1));
|
||||
assert_eq!(pool.pop(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mempool_new() {
|
||||
let pool: MemPool<TestItem> = MemPool::new();
|
||||
assert!(pool.is_empty());
|
||||
assert_eq!(pool.len(), 0);
|
||||
async fn test_multiple_push_pop() {
|
||||
let (mut pool, handle) = MemPool::new(10);
|
||||
|
||||
handle.push(1).await.unwrap();
|
||||
handle.push(2).await.unwrap();
|
||||
handle.push(3).await.unwrap();
|
||||
|
||||
assert_eq!(pool.pop(), Some(1));
|
||||
assert_eq!(pool.pop(), Some(2));
|
||||
assert_eq!(pool.pop(), Some(3));
|
||||
assert_eq!(pool.pop(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_push_item() {
|
||||
let mut pool = MemPool::new();
|
||||
pool.push_item(test_item_with_id(1));
|
||||
assert!(!pool.is_empty());
|
||||
assert_eq!(pool.len(), 1);
|
||||
async fn test_pop_empty() {
|
||||
let (mut pool, _handle): (MemPool<u64>, _) = MemPool::new(10);
|
||||
assert_eq!(pool.pop(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pop_last() {
|
||||
let mut pool = MemPool::new();
|
||||
pool.push_item(test_item_with_id(1));
|
||||
pool.push_item(test_item_with_id(2));
|
||||
let item = pool.pop_last();
|
||||
assert_eq!(item, Some(test_item_with_id(1)));
|
||||
assert_eq!(pool.len(), 1);
|
||||
}
|
||||
async fn test_max_size() {
|
||||
let (mut pool, handle) = MemPool::new(2);
|
||||
|
||||
#[test]
|
||||
fn test_peek_last() {
|
||||
let mut pool = MemPool::new();
|
||||
pool.push_item(test_item_with_id(1));
|
||||
pool.push_item(test_item_with_id(2));
|
||||
let item = pool.peek_last();
|
||||
assert_eq!(item, Some(&test_item_with_id(1)));
|
||||
}
|
||||
handle.push(1).await.unwrap();
|
||||
handle.push(2).await.unwrap();
|
||||
|
||||
#[test]
|
||||
fn test_pop_size() {
|
||||
let mut pool = MemPool::new();
|
||||
pool.push_item(test_item_with_id(1));
|
||||
pool.push_item(test_item_with_id(2));
|
||||
pool.push_item(test_item_with_id(3));
|
||||
|
||||
let items = pool.pop_size(2);
|
||||
assert_eq!(items, vec![test_item_with_id(1), test_item_with_id(2)]);
|
||||
assert_eq!(pool.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_drain_size() {
|
||||
let mut pool = MemPool::new();
|
||||
pool.push_item(test_item_with_id(1));
|
||||
pool.push_item(test_item_with_id(2));
|
||||
pool.push_item(test_item_with_id(3));
|
||||
pool.push_item(test_item_with_id(4));
|
||||
|
||||
let items = pool.drain_size(2);
|
||||
assert_eq!(items, vec![test_item_with_id(1), test_item_with_id(2)]);
|
||||
assert_eq!(pool.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_default() {
|
||||
let pool: MemPool<TestItem> = MemPool::default();
|
||||
assert!(pool.is_empty());
|
||||
assert_eq!(pool.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_is_empty() {
|
||||
let mut pool = MemPool::new();
|
||||
assert!(pool.is_empty());
|
||||
pool.push_item(test_item_with_id(1));
|
||||
assert!(!pool.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_push_pop() {
|
||||
let mut mempool: MemPool<TestItem> = MemPool::new();
|
||||
|
||||
let items = vec![
|
||||
test_item_with_id(1),
|
||||
test_item_with_id(2),
|
||||
test_item_with_id(3),
|
||||
];
|
||||
|
||||
for item in items {
|
||||
mempool.push_item(item);
|
||||
}
|
||||
assert_eq!(mempool.len(), 3);
|
||||
|
||||
let item = mempool.pop_last();
|
||||
|
||||
assert_eq!(item, Some(TestItem { id: 1 }));
|
||||
assert_eq!(mempool.len(), 2);
|
||||
|
||||
let item = mempool.pop_last();
|
||||
|
||||
assert_eq!(item, Some(TestItem { id: 2 }));
|
||||
assert_eq!(mempool.len(), 1);
|
||||
|
||||
let item = mempool.pop_last();
|
||||
|
||||
assert_eq!(item, Some(TestItem { id: 3 }));
|
||||
assert_eq!(mempool.len(), 0);
|
||||
|
||||
let item = mempool.pop_last();
|
||||
|
||||
assert_eq!(item, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pop_many() {
|
||||
let mut mempool: MemPool<TestItem> = MemPool::new();
|
||||
|
||||
let mut items = vec![];
|
||||
|
||||
for i in 1..11 {
|
||||
items.push(test_item_with_id(i));
|
||||
}
|
||||
|
||||
for item in items {
|
||||
mempool.push_item(item);
|
||||
}
|
||||
|
||||
assert_eq!(mempool.len(), 10);
|
||||
|
||||
let items1 = mempool.pop_size(4);
|
||||
assert_eq!(
|
||||
items1,
|
||||
vec![
|
||||
test_item_with_id(1),
|
||||
test_item_with_id(2),
|
||||
test_item_with_id(3),
|
||||
test_item_with_id(4)
|
||||
]
|
||||
);
|
||||
assert_eq!(mempool.len(), 6);
|
||||
|
||||
let items2 = mempool.drain_size(2);
|
||||
assert_eq!(
|
||||
items2,
|
||||
vec![
|
||||
test_item_with_id(5),
|
||||
test_item_with_id(6),
|
||||
test_item_with_id(7),
|
||||
test_item_with_id(8)
|
||||
]
|
||||
);
|
||||
assert_eq!(mempool.len(), 2);
|
||||
// This should block if buffer is full, but we'll use try_send in a real scenario
|
||||
// For now, just verify we can pop items
|
||||
assert_eq!(pool.pop(), Some(1));
|
||||
assert_eq!(pool.pop(), Some(2));
|
||||
}
|
||||
}
|
||||
|
||||
@ -28,3 +28,7 @@ path = "../nssa"
|
||||
[features]
|
||||
default = []
|
||||
testnet = []
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
|
||||
futures.workspace = true
|
||||
|
||||
@ -4,15 +4,15 @@ use anyhow::Result;
|
||||
use common::{HashType, block::Block, transaction::EncodedTransaction};
|
||||
use storage::RocksDBIO;
|
||||
|
||||
pub struct SequecerBlockStore {
|
||||
pub struct SequencerBlockStore {
|
||||
dbio: RocksDBIO,
|
||||
// TODO: Consider adding the hashmap to the database for faster recovery.
|
||||
pub tx_hash_to_block_map: HashMap<HashType, u64>,
|
||||
pub genesis_id: u64,
|
||||
pub signing_key: nssa::PrivateKey,
|
||||
tx_hash_to_block_map: HashMap<HashType, u64>,
|
||||
genesis_id: u64,
|
||||
signing_key: nssa::PrivateKey,
|
||||
}
|
||||
|
||||
impl SequecerBlockStore {
|
||||
impl SequencerBlockStore {
|
||||
///Starting database at the start of new chain.
|
||||
/// Creates files if necessary.
|
||||
///
|
||||
@ -42,7 +42,7 @@ impl SequecerBlockStore {
|
||||
|
||||
///Reopening existing database
|
||||
pub fn open_db_restart(location: &Path, signing_key: nssa::PrivateKey) -> Result<Self> {
|
||||
SequecerBlockStore::open_db_with_genesis(location, None, signing_key)
|
||||
SequencerBlockStore::open_db_with_genesis(location, None, signing_key)
|
||||
}
|
||||
|
||||
pub fn get_block_at_id(&self, id: u64) -> Result<Block> {
|
||||
@ -69,6 +69,18 @@ impl SequecerBlockStore {
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, tx: &EncodedTransaction, block_id: u64) {
|
||||
self.tx_hash_to_block_map.insert(tx.hash(), block_id);
|
||||
}
|
||||
|
||||
pub fn genesis_id(&self) -> u64 {
|
||||
self.genesis_id
|
||||
}
|
||||
|
||||
pub fn signing_key(&self) -> &nssa::PrivateKey {
|
||||
&self.signing_key
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn block_to_transactions_map(block: &Block) -> HashMap<HashType, u64> {
|
||||
@ -104,7 +116,7 @@ mod tests {
|
||||
let genesis_block = genesis_block_hashable_data.into_block(&signing_key);
|
||||
// Start an empty node store
|
||||
let mut node_store =
|
||||
SequecerBlockStore::open_db_with_genesis(path, Some(genesis_block), signing_key)
|
||||
SequencerBlockStore::open_db_with_genesis(path, Some(genesis_block), signing_key)
|
||||
.unwrap();
|
||||
|
||||
let tx = common::test_utils::produce_dummy_empty_transaction();
|
||||
|
||||
@ -16,6 +16,7 @@ pub struct CommitmentsInitialData {
|
||||
pub account: nssa_core::account::Account,
|
||||
}
|
||||
|
||||
// TODO: Provide default values
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct SequencerConfig {
|
||||
///Home dir of sequencer storage
|
||||
|
||||
@ -10,25 +10,24 @@ use common::{
|
||||
};
|
||||
use config::SequencerConfig;
|
||||
use log::warn;
|
||||
use mempool::MemPool;
|
||||
use mempool::{MemPool, MemPoolHandle};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::block_store::SequecerBlockStore;
|
||||
use crate::block_store::SequencerBlockStore;
|
||||
|
||||
pub mod block_store;
|
||||
pub mod config;
|
||||
|
||||
pub struct SequencerCore {
|
||||
pub state: nssa::V02State,
|
||||
pub block_store: SequecerBlockStore,
|
||||
pub mempool: MemPool<EncodedTransaction>,
|
||||
pub sequencer_config: SequencerConfig,
|
||||
pub chain_height: u64,
|
||||
state: nssa::V02State,
|
||||
block_store: SequencerBlockStore,
|
||||
mempool: MemPool<EncodedTransaction>,
|
||||
sequencer_config: SequencerConfig,
|
||||
chain_height: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub enum TransactionMalformationError {
|
||||
MempoolFullForRound,
|
||||
InvalidSignature,
|
||||
FailedToDecode { tx: HashType },
|
||||
}
|
||||
@ -42,7 +41,8 @@ impl Display for TransactionMalformationError {
|
||||
impl std::error::Error for TransactionMalformationError {}
|
||||
|
||||
impl SequencerCore {
|
||||
pub fn start_from_config(config: SequencerConfig) -> Self {
|
||||
/// Start Sequencer from configuration and construct transaction sender
|
||||
pub fn start_from_config(config: SequencerConfig) -> (Self, MemPoolHandle<EncodedTransaction>) {
|
||||
let hashable_data = HashableBlockData {
|
||||
block_id: config.genesis_id,
|
||||
transactions: vec![],
|
||||
@ -55,7 +55,7 @@ impl SequencerCore {
|
||||
|
||||
//Sequencer should panic if unable to open db,
|
||||
//as fixing this issue may require actions non-native to program scope
|
||||
let block_store = SequecerBlockStore::open_db_with_genesis(
|
||||
let block_store = SequencerBlockStore::open_db_with_genesis(
|
||||
&config.home.join("rocksdb"),
|
||||
Some(genesis_block),
|
||||
signing_key,
|
||||
@ -86,17 +86,18 @@ impl SequencerCore {
|
||||
#[cfg(feature = "testnet")]
|
||||
state.add_pinata_program(PINATA_BASE58.parse().unwrap());
|
||||
|
||||
let (mempool, mempool_handle) = MemPool::new(config.mempool_max_size);
|
||||
let mut this = Self {
|
||||
state,
|
||||
block_store,
|
||||
mempool: MemPool::default(),
|
||||
mempool,
|
||||
chain_height: config.genesis_id,
|
||||
sequencer_config: config,
|
||||
};
|
||||
|
||||
this.sync_state_with_stored_blocks();
|
||||
|
||||
this
|
||||
(this, mempool_handle)
|
||||
}
|
||||
|
||||
/// If there are stored blocks ahead of the current height, this method will load and process all transaction
|
||||
@ -110,108 +111,50 @@ impl SequencerCore {
|
||||
self.execute_check_transaction_on_state(transaction)
|
||||
.unwrap();
|
||||
// Update the tx hash to block id map.
|
||||
self.block_store
|
||||
.tx_hash_to_block_map
|
||||
.insert(encoded_transaction.hash(), next_block_id);
|
||||
self.block_store.insert(&encoded_transaction, next_block_id);
|
||||
}
|
||||
self.chain_height = next_block_id;
|
||||
next_block_id += 1;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn transaction_pre_check(
|
||||
&mut self,
|
||||
tx: NSSATransaction,
|
||||
) -> Result<NSSATransaction, TransactionMalformationError> {
|
||||
// Stateless checks here
|
||||
match tx {
|
||||
NSSATransaction::Public(tx) => {
|
||||
if tx.witness_set().is_valid_for(tx.message()) {
|
||||
Ok(NSSATransaction::Public(tx))
|
||||
} else {
|
||||
Err(TransactionMalformationError::InvalidSignature)
|
||||
}
|
||||
}
|
||||
NSSATransaction::PrivacyPreserving(tx) => {
|
||||
if tx.witness_set().signatures_are_valid_for(tx.message()) {
|
||||
Ok(NSSATransaction::PrivacyPreserving(tx))
|
||||
} else {
|
||||
Err(TransactionMalformationError::InvalidSignature)
|
||||
}
|
||||
}
|
||||
NSSATransaction::ProgramDeployment(tx) => Ok(NSSATransaction::ProgramDeployment(tx)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push_tx_into_mempool_pre_check(
|
||||
&mut self,
|
||||
transaction: EncodedTransaction,
|
||||
) -> Result<(), TransactionMalformationError> {
|
||||
let transaction = NSSATransaction::try_from(&transaction).map_err(|_| {
|
||||
TransactionMalformationError::FailedToDecode {
|
||||
tx: transaction.hash(),
|
||||
}
|
||||
})?;
|
||||
|
||||
let mempool_size = self.mempool.len();
|
||||
if mempool_size >= self.sequencer_config.mempool_max_size {
|
||||
return Err(TransactionMalformationError::MempoolFullForRound);
|
||||
}
|
||||
|
||||
let authenticated_tx = self
|
||||
.transaction_pre_check(transaction)
|
||||
.inspect_err(|err| warn!("Error at pre_check {err:#?}"))?;
|
||||
|
||||
self.mempool.push_item(authenticated_tx.into());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn execute_check_transaction_on_state(
|
||||
&mut self,
|
||||
tx: NSSATransaction,
|
||||
) -> Result<NSSATransaction, nssa::error::NssaError> {
|
||||
match &tx {
|
||||
NSSATransaction::Public(tx) => {
|
||||
self.state
|
||||
.transition_from_public_transaction(tx)
|
||||
.inspect_err(|err| warn!("Error at transition {err:#?}"))?;
|
||||
}
|
||||
NSSATransaction::PrivacyPreserving(tx) => {
|
||||
self.state
|
||||
.transition_from_privacy_preserving_transaction(tx)
|
||||
.inspect_err(|err| warn!("Error at transition {err:#?}"))?;
|
||||
}
|
||||
NSSATransaction::ProgramDeployment(tx) => {
|
||||
self.state
|
||||
.transition_from_program_deployment_transaction(tx)
|
||||
.inspect_err(|err| warn!("Error at transition {err:#?}"))?;
|
||||
}
|
||||
NSSATransaction::Public(tx) => self.state.transition_from_public_transaction(tx),
|
||||
NSSATransaction::PrivacyPreserving(tx) => self
|
||||
.state
|
||||
.transition_from_privacy_preserving_transaction(tx),
|
||||
NSSATransaction::ProgramDeployment(tx) => self
|
||||
.state
|
||||
.transition_from_program_deployment_transaction(tx),
|
||||
}
|
||||
.inspect_err(|err| warn!("Error at transition {err:#?}"))?;
|
||||
|
||||
Ok(tx)
|
||||
}
|
||||
|
||||
///Produces new block from transactions in mempool
|
||||
/// Produces new block from transactions in mempool
|
||||
pub fn produce_new_block_with_mempool_transactions(&mut self) -> Result<u64> {
|
||||
let now = Instant::now();
|
||||
let new_block_height = self.chain_height + 1;
|
||||
|
||||
let mut num_valid_transactions_in_block = 0;
|
||||
let mut valid_transactions = vec![];
|
||||
|
||||
while let Some(tx) = self.mempool.pop_last() {
|
||||
while let Some(tx) = self.mempool.pop() {
|
||||
let nssa_transaction = NSSATransaction::try_from(&tx)
|
||||
.map_err(|_| TransactionMalformationError::FailedToDecode { tx: tx.hash() })?;
|
||||
|
||||
if let Ok(valid_tx) = self.execute_check_transaction_on_state(nssa_transaction) {
|
||||
valid_transactions.push(valid_tx.into());
|
||||
|
||||
num_valid_transactions_in_block += 1;
|
||||
|
||||
if num_valid_transactions_in_block >= self.sequencer_config.max_num_tx_in_block {
|
||||
if valid_transactions.len() >= self.sequencer_config.max_num_tx_in_block {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// Probably need to handle unsuccessful transaction execution?
|
||||
}
|
||||
}
|
||||
|
||||
@ -232,12 +175,17 @@ impl SequencerCore {
|
||||
timestamp: curr_time,
|
||||
};
|
||||
|
||||
let block = hashable_data.into_block(&self.block_store.signing_key);
|
||||
let block = hashable_data.into_block(self.block_store.signing_key());
|
||||
|
||||
self.block_store.put_block_at_id(block)?;
|
||||
|
||||
self.chain_height = new_block_height;
|
||||
|
||||
// TODO: Consider switching to `tracing` crate to have more structured and consistent logs e.g.
|
||||
//
|
||||
// ```
|
||||
// info!(num_txs = num_txs_in_block, time = now.elapsed(), "Created block");
|
||||
// ```
|
||||
log::info!(
|
||||
"Created block with {} transactions in {} seconds",
|
||||
num_txs_in_block,
|
||||
@ -246,10 +194,52 @@ impl SequencerCore {
|
||||
|
||||
Ok(self.chain_height)
|
||||
}
|
||||
|
||||
pub fn state(&self) -> &nssa::V02State {
|
||||
&self.state
|
||||
}
|
||||
|
||||
pub fn block_store(&self) -> &SequencerBlockStore {
|
||||
&self.block_store
|
||||
}
|
||||
|
||||
pub fn chain_height(&self) -> u64 {
|
||||
self.chain_height
|
||||
}
|
||||
|
||||
pub fn sequencer_config(&self) -> &SequencerConfig {
|
||||
&self.sequencer_config
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Introduce type-safe wrapper around checked transaction, e.g. AuthenticatedTransaction
|
||||
pub fn transaction_pre_check(
|
||||
tx: NSSATransaction,
|
||||
) -> Result<NSSATransaction, TransactionMalformationError> {
|
||||
// Stateless checks here
|
||||
match tx {
|
||||
NSSATransaction::Public(tx) => {
|
||||
if tx.witness_set().is_valid_for(tx.message()) {
|
||||
Ok(NSSATransaction::Public(tx))
|
||||
} else {
|
||||
Err(TransactionMalformationError::InvalidSignature)
|
||||
}
|
||||
}
|
||||
NSSATransaction::PrivacyPreserving(tx) => {
|
||||
if tx.witness_set().signatures_are_valid_for(tx.message()) {
|
||||
Ok(NSSATransaction::PrivacyPreserving(tx))
|
||||
} else {
|
||||
Err(TransactionMalformationError::InvalidSignature)
|
||||
}
|
||||
}
|
||||
NSSATransaction::ProgramDeployment(tx) => Ok(NSSATransaction::ProgramDeployment(tx)),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::pin::pin;
|
||||
|
||||
use base58::{FromBase58, ToBase58};
|
||||
use common::test_utils::sequencer_sign_key_for_testing;
|
||||
use nssa::PrivateKey;
|
||||
@ -319,19 +309,30 @@ mod tests {
|
||||
nssa::PrivateKey::try_new([2; 32]).unwrap()
|
||||
}
|
||||
|
||||
fn common_setup(sequencer: &mut SequencerCore) {
|
||||
async fn common_setup() -> (SequencerCore, MemPoolHandle<EncodedTransaction>) {
|
||||
let config = setup_sequencer_config();
|
||||
common_setup_with_config(config).await
|
||||
}
|
||||
|
||||
async fn common_setup_with_config(
|
||||
config: SequencerConfig,
|
||||
) -> (SequencerCore, MemPoolHandle<EncodedTransaction>) {
|
||||
let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config);
|
||||
|
||||
let tx = common::test_utils::produce_dummy_empty_transaction();
|
||||
sequencer.mempool.push_item(tx);
|
||||
mempool_handle.push(tx).await.unwrap();
|
||||
|
||||
sequencer
|
||||
.produce_new_block_with_mempool_transactions()
|
||||
.unwrap();
|
||||
|
||||
(sequencer, mempool_handle)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_start_from_config() {
|
||||
let config = setup_sequencer_config();
|
||||
let sequencer = SequencerCore::start_from_config(config.clone());
|
||||
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone());
|
||||
|
||||
assert_eq!(sequencer.chain_height, config.genesis_id);
|
||||
assert_eq!(sequencer.sequencer_config.max_num_tx_in_block, 10);
|
||||
@ -390,7 +391,7 @@ mod tests {
|
||||
let initial_accounts = vec![initial_acc1, initial_acc2];
|
||||
|
||||
let config = setup_sequencer_config_variable_initial_accounts(initial_accounts);
|
||||
let sequencer = SequencerCore::start_from_config(config.clone());
|
||||
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone());
|
||||
|
||||
let acc1_addr = config.initial_accounts[0]
|
||||
.addr
|
||||
@ -425,23 +426,15 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_transaction_pre_check_pass() {
|
||||
let config = setup_sequencer_config();
|
||||
let mut sequencer = SequencerCore::start_from_config(config);
|
||||
|
||||
common_setup(&mut sequencer);
|
||||
|
||||
let tx = common::test_utils::produce_dummy_empty_transaction();
|
||||
let result = sequencer.transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx));
|
||||
let result = transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx));
|
||||
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_transaction_pre_check_native_transfer_valid() {
|
||||
let config = setup_sequencer_config();
|
||||
let mut sequencer = SequencerCore::start_from_config(config);
|
||||
|
||||
common_setup(&mut sequencer);
|
||||
#[tokio::test]
|
||||
async fn test_transaction_pre_check_native_transfer_valid() {
|
||||
let (sequencer, _mempool_handle) = common_setup().await;
|
||||
|
||||
let acc1 = sequencer.sequencer_config.initial_accounts[0]
|
||||
.addr
|
||||
@ -463,17 +456,14 @@ mod tests {
|
||||
let tx = common::test_utils::create_transaction_native_token_transfer(
|
||||
acc1, 0, acc2, 10, sign_key1,
|
||||
);
|
||||
let result = sequencer.transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx));
|
||||
let result = transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx));
|
||||
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_transaction_pre_check_native_transfer_other_signature() {
|
||||
let config = setup_sequencer_config();
|
||||
let mut sequencer = SequencerCore::start_from_config(config);
|
||||
|
||||
common_setup(&mut sequencer);
|
||||
#[tokio::test]
|
||||
async fn test_transaction_pre_check_native_transfer_other_signature() {
|
||||
let (mut sequencer, _mempool_handle) = common_setup().await;
|
||||
|
||||
let acc1 = sequencer.sequencer_config.initial_accounts[0]
|
||||
.addr
|
||||
@ -497,9 +487,7 @@ mod tests {
|
||||
);
|
||||
|
||||
// Signature is valid, stateless check pass
|
||||
let tx = sequencer
|
||||
.transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx))
|
||||
.unwrap();
|
||||
let tx = transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx)).unwrap();
|
||||
|
||||
// Signature is not from sender. Execution fails
|
||||
let result = sequencer.execute_check_transaction_on_state(tx);
|
||||
@ -510,12 +498,9 @@ mod tests {
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_transaction_pre_check_native_transfer_sent_too_much() {
|
||||
let config = setup_sequencer_config();
|
||||
let mut sequencer = SequencerCore::start_from_config(config);
|
||||
|
||||
common_setup(&mut sequencer);
|
||||
#[tokio::test]
|
||||
async fn test_transaction_pre_check_native_transfer_sent_too_much() {
|
||||
let (mut sequencer, _mempool_handle) = common_setup().await;
|
||||
|
||||
let acc1 = sequencer.sequencer_config.initial_accounts[0]
|
||||
.addr
|
||||
@ -538,9 +523,9 @@ mod tests {
|
||||
acc1, 0, acc2, 10000000, sign_key1,
|
||||
);
|
||||
|
||||
let result = sequencer.transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx));
|
||||
let result = transaction_pre_check(parse_unwrap_tx_body_into_nssa_tx(tx));
|
||||
|
||||
//Passed pre-check
|
||||
// Passed pre-check
|
||||
assert!(result.is_ok());
|
||||
|
||||
let result = sequencer.execute_check_transaction_on_state(result.unwrap());
|
||||
@ -552,12 +537,9 @@ mod tests {
|
||||
assert!(is_failed_at_balance_mismatch);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_transaction_execute_native_transfer() {
|
||||
let config = setup_sequencer_config();
|
||||
let mut sequencer = SequencerCore::start_from_config(config);
|
||||
|
||||
common_setup(&mut sequencer);
|
||||
#[tokio::test]
|
||||
async fn test_transaction_execute_native_transfer() {
|
||||
let (mut sequencer, _mempool_handle) = common_setup().await;
|
||||
|
||||
let acc1 = sequencer.sequencer_config.initial_accounts[0]
|
||||
.addr
|
||||
@ -597,63 +579,49 @@ mod tests {
|
||||
assert_eq!(bal_to, 20100);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_push_tx_into_mempool_fails_mempool_full() {
|
||||
#[tokio::test]
|
||||
async fn test_push_tx_into_mempool_blocks_until_mempool_is_full() {
|
||||
let config = SequencerConfig {
|
||||
mempool_max_size: 1,
|
||||
..setup_sequencer_config()
|
||||
};
|
||||
let mut sequencer = SequencerCore::start_from_config(config);
|
||||
|
||||
common_setup(&mut sequencer);
|
||||
let (mut sequencer, mempool_handle) = common_setup_with_config(config).await;
|
||||
|
||||
let tx = common::test_utils::produce_dummy_empty_transaction();
|
||||
|
||||
// Fill the mempool
|
||||
sequencer.mempool.push_item(tx.clone());
|
||||
mempool_handle.push(tx.clone()).await.unwrap();
|
||||
|
||||
let result = sequencer.push_tx_into_mempool_pre_check(tx);
|
||||
// Check that pushing another transaction will block
|
||||
let mut push_fut = pin!(mempool_handle.push(tx.clone()));
|
||||
let poll = futures::poll!(push_fut.as_mut());
|
||||
assert!(poll.is_pending());
|
||||
|
||||
assert!(matches!(
|
||||
result,
|
||||
Err(TransactionMalformationError::MempoolFullForRound)
|
||||
));
|
||||
// Empty the mempool by producing a block
|
||||
sequencer
|
||||
.produce_new_block_with_mempool_transactions()
|
||||
.unwrap();
|
||||
|
||||
// Resolve the pending push
|
||||
assert!(push_fut.await.is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_push_tx_into_mempool_pre_check() {
|
||||
let config = setup_sequencer_config();
|
||||
let mut sequencer = SequencerCore::start_from_config(config);
|
||||
|
||||
common_setup(&mut sequencer);
|
||||
|
||||
let tx = common::test_utils::produce_dummy_empty_transaction();
|
||||
|
||||
let result = sequencer.push_tx_into_mempool_pre_check(tx);
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(sequencer.mempool.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_produce_new_block_with_mempool_transactions() {
|
||||
let config = setup_sequencer_config();
|
||||
let mut sequencer = SequencerCore::start_from_config(config);
|
||||
#[tokio::test]
|
||||
async fn test_produce_new_block_with_mempool_transactions() {
|
||||
let (mut sequencer, mempool_handle) = common_setup().await;
|
||||
let genesis_height = sequencer.chain_height;
|
||||
|
||||
let tx = common::test_utils::produce_dummy_empty_transaction();
|
||||
sequencer.mempool.push_item(tx);
|
||||
mempool_handle.push(tx).await.unwrap();
|
||||
|
||||
let block_id = sequencer.produce_new_block_with_mempool_transactions();
|
||||
assert!(block_id.is_ok());
|
||||
assert_eq!(block_id.unwrap(), genesis_height + 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_replay_transactions_are_rejected_in_the_same_block() {
|
||||
let config = setup_sequencer_config();
|
||||
let mut sequencer = SequencerCore::start_from_config(config);
|
||||
|
||||
common_setup(&mut sequencer);
|
||||
#[tokio::test]
|
||||
async fn test_replay_transactions_are_rejected_in_the_same_block() {
|
||||
let (mut sequencer, mempool_handle) = common_setup().await;
|
||||
|
||||
let acc1 = sequencer.sequencer_config.initial_accounts[0]
|
||||
.addr
|
||||
@ -679,8 +647,8 @@ mod tests {
|
||||
let tx_original = tx.clone();
|
||||
let tx_replay = tx.clone();
|
||||
// Pushing two copies of the same tx to the mempool
|
||||
sequencer.mempool.push_item(tx_original);
|
||||
sequencer.mempool.push_item(tx_replay);
|
||||
mempool_handle.push(tx_original).await.unwrap();
|
||||
mempool_handle.push(tx_replay).await.unwrap();
|
||||
|
||||
// Create block
|
||||
let current_height = sequencer
|
||||
@ -695,12 +663,9 @@ mod tests {
|
||||
assert_eq!(block.body.transactions, vec![tx.clone()]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_replay_transactions_are_rejected_in_different_blocks() {
|
||||
let config = setup_sequencer_config();
|
||||
let mut sequencer = SequencerCore::start_from_config(config);
|
||||
|
||||
common_setup(&mut sequencer);
|
||||
#[tokio::test]
|
||||
async fn test_replay_transactions_are_rejected_in_different_blocks() {
|
||||
let (mut sequencer, mempool_handle) = common_setup().await;
|
||||
|
||||
let acc1 = sequencer.sequencer_config.initial_accounts[0]
|
||||
.addr
|
||||
@ -724,7 +689,7 @@ mod tests {
|
||||
);
|
||||
|
||||
// The transaction should be included the first time
|
||||
sequencer.mempool.push_item(tx.clone());
|
||||
mempool_handle.push(tx.clone()).await.unwrap();
|
||||
let current_height = sequencer
|
||||
.produce_new_block_with_mempool_transactions()
|
||||
.unwrap();
|
||||
@ -735,7 +700,7 @@ mod tests {
|
||||
assert_eq!(block.body.transactions, vec![tx.clone()]);
|
||||
|
||||
// Add same transaction should fail
|
||||
sequencer.mempool.push_item(tx);
|
||||
mempool_handle.push(tx.clone()).await.unwrap();
|
||||
let current_height = sequencer
|
||||
.produce_new_block_with_mempool_transactions()
|
||||
.unwrap();
|
||||
@ -746,8 +711,8 @@ mod tests {
|
||||
assert!(block.body.transactions.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_restart_from_storage() {
|
||||
#[tokio::test]
|
||||
async fn test_restart_from_storage() {
|
||||
let config = setup_sequencer_config();
|
||||
let acc1_addr: nssa::Address = config.initial_accounts[0].addr.parse().unwrap();
|
||||
let acc2_addr: nssa::Address = config.initial_accounts[1].addr.parse().unwrap();
|
||||
@ -757,7 +722,7 @@ mod tests {
|
||||
// from `acc_1` to `acc_2`. The block created with that transaction will be kept stored in
|
||||
// the temporary directory for the block storage of this test.
|
||||
{
|
||||
let mut sequencer = SequencerCore::start_from_config(config.clone());
|
||||
let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config.clone());
|
||||
let signing_key = PrivateKey::try_new([1; 32]).unwrap();
|
||||
|
||||
let tx = common::test_utils::create_transaction_native_token_transfer(
|
||||
@ -768,7 +733,7 @@ mod tests {
|
||||
signing_key,
|
||||
);
|
||||
|
||||
sequencer.mempool.push_item(tx.clone());
|
||||
mempool_handle.push(tx.clone()).await.unwrap();
|
||||
let current_height = sequencer
|
||||
.produce_new_block_with_mempool_transactions()
|
||||
.unwrap();
|
||||
@ -781,7 +746,7 @@ mod tests {
|
||||
|
||||
// Instantiating a new sequencer from the same config. This should load the existing block
|
||||
// with the above transaction and update the state to reflect that.
|
||||
let sequencer = SequencerCore::start_from_config(config.clone());
|
||||
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone());
|
||||
let balance_acc_1 = sequencer.state.get_account_by_address(&acc1_addr).balance;
|
||||
let balance_acc_2 = sequencer.state.get_account_by_address(&acc2_addr).balance;
|
||||
|
||||
|
||||
@ -19,6 +19,8 @@ actix-web.workspace = true
|
||||
tokio.workspace = true
|
||||
borsh.workspace = true
|
||||
|
||||
# TODO: Move to workspace
|
||||
|
||||
[dependencies.sequencer_core]
|
||||
path = "../sequencer_core"
|
||||
|
||||
@ -27,3 +29,6 @@ path = "../common"
|
||||
|
||||
[dependencies.nssa]
|
||||
path = "../nssa"
|
||||
|
||||
[dependencies.mempool]
|
||||
path = "../mempool"
|
||||
|
||||
@ -4,10 +4,14 @@ pub mod types;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common::rpc_primitives::{
|
||||
RpcPollingConfig,
|
||||
errors::{RpcError, RpcErrorKind},
|
||||
use common::{
|
||||
rpc_primitives::{
|
||||
RpcPollingConfig,
|
||||
errors::{RpcError, RpcErrorKind},
|
||||
},
|
||||
transaction::EncodedTransaction,
|
||||
};
|
||||
use mempool::MemPoolHandle;
|
||||
use sequencer_core::SequencerCore;
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
@ -19,8 +23,13 @@ use self::types::err_rpc::RpcErr;
|
||||
|
||||
//ToDo: Add necessary fields
|
||||
pub struct JsonHandler {
|
||||
pub polling_config: RpcPollingConfig,
|
||||
pub sequencer_state: Arc<Mutex<SequencerCore>>,
|
||||
#[expect(
|
||||
dead_code,
|
||||
reason = "Decided to keep it just in case, should we remove it?"
|
||||
)]
|
||||
polling_config: RpcPollingConfig,
|
||||
sequencer_state: Arc<Mutex<SequencerCore>>,
|
||||
mempool_handle: MemPoolHandle<EncodedTransaction>,
|
||||
}
|
||||
|
||||
fn respond<T: Serialize>(val: T) -> Result<Value, RpcErr> {
|
||||
|
||||
@ -3,12 +3,14 @@ use std::sync::Arc;
|
||||
|
||||
use actix_cors::Cors;
|
||||
use actix_web::{App, Error as HttpError, HttpResponse, HttpServer, http, middleware, web};
|
||||
use common::transaction::EncodedTransaction;
|
||||
use futures::Future;
|
||||
use futures::FutureExt;
|
||||
use log::info;
|
||||
|
||||
use common::rpc_primitives::RpcConfig;
|
||||
use common::rpc_primitives::message::Message;
|
||||
use mempool::MemPoolHandle;
|
||||
use sequencer_core::SequencerCore;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
@ -46,6 +48,7 @@ fn get_cors(cors_allowed_origins: &[String]) -> Cors {
|
||||
pub fn new_http_server(
|
||||
config: RpcConfig,
|
||||
seuquencer_core: Arc<Mutex<SequencerCore>>,
|
||||
mempool_handle: MemPoolHandle<EncodedTransaction>,
|
||||
) -> io::Result<actix_web::dev::Server> {
|
||||
let RpcConfig {
|
||||
addr,
|
||||
@ -57,6 +60,7 @@ pub fn new_http_server(
|
||||
let handler = web::Data::new(JsonHandler {
|
||||
polling_config,
|
||||
sequencer_state: seuquencer_core.clone(),
|
||||
mempool_handle,
|
||||
});
|
||||
|
||||
// HTTP server
|
||||
|
||||
@ -3,8 +3,9 @@ use std::collections::HashMap;
|
||||
use actix_web::Error as HttpError;
|
||||
use base58::FromBase58;
|
||||
use base64::{Engine, engine::general_purpose};
|
||||
use log::warn;
|
||||
use nssa::{self, program::Program};
|
||||
use sequencer_core::config::AccountInitialData;
|
||||
use sequencer_core::{TransactionMalformationError, config::AccountInitialData};
|
||||
use serde_json::Value;
|
||||
|
||||
use common::{
|
||||
@ -22,7 +23,7 @@ use common::{
|
||||
GetTransactionByHashRequest, GetTransactionByHashResponse,
|
||||
},
|
||||
},
|
||||
transaction::EncodedTransaction,
|
||||
transaction::{EncodedTransaction, NSSATransaction},
|
||||
};
|
||||
|
||||
use common::rpc_primitives::requests::{
|
||||
@ -84,11 +85,17 @@ impl JsonHandler {
|
||||
let tx = borsh::from_slice::<EncodedTransaction>(&send_tx_req.transaction).unwrap();
|
||||
let tx_hash = hex::encode(tx.hash());
|
||||
|
||||
{
|
||||
let mut state = self.sequencer_state.lock().await;
|
||||
let transaction = NSSATransaction::try_from(&tx)
|
||||
.map_err(|_| TransactionMalformationError::FailedToDecode { tx: tx.hash() })?;
|
||||
|
||||
state.push_tx_into_mempool_pre_check(tx)?;
|
||||
}
|
||||
let authenticated_tx = sequencer_core::transaction_pre_check(transaction)
|
||||
.inspect_err(|err| warn!("Error at pre_check {err:#?}"))?;
|
||||
|
||||
// TODO: Do we need a timeout here? It will be usable if we have too many transactions to process
|
||||
self.mempool_handle
|
||||
.push(authenticated_tx.into())
|
||||
.await
|
||||
.expect("Mempool is closed, this is a bug");
|
||||
|
||||
let response = SendTxResponse {
|
||||
status: TRANSACTION_SUBMITTED.to_string(),
|
||||
@ -104,7 +111,9 @@ impl JsonHandler {
|
||||
let block = {
|
||||
let state = self.sequencer_state.lock().await;
|
||||
|
||||
state.block_store.get_block_at_id(get_block_req.block_id)?
|
||||
state
|
||||
.block_store()
|
||||
.get_block_at_id(get_block_req.block_id)?
|
||||
};
|
||||
|
||||
let response = GetBlockDataResponse {
|
||||
@ -120,7 +129,7 @@ impl JsonHandler {
|
||||
let genesis_id = {
|
||||
let state = self.sequencer_state.lock().await;
|
||||
|
||||
state.block_store.genesis_id
|
||||
state.block_store().genesis_id()
|
||||
};
|
||||
|
||||
let response = GetGenesisIdResponse { genesis_id };
|
||||
@ -134,7 +143,7 @@ impl JsonHandler {
|
||||
let last_block = {
|
||||
let state = self.sequencer_state.lock().await;
|
||||
|
||||
state.chain_height
|
||||
state.chain_height()
|
||||
};
|
||||
|
||||
let response = GetLastBlockResponse { last_block };
|
||||
@ -151,7 +160,7 @@ impl JsonHandler {
|
||||
let initial_accounts: Vec<AccountInitialData> = {
|
||||
let state = self.sequencer_state.lock().await;
|
||||
|
||||
state.sequencer_config.initial_accounts.clone()
|
||||
state.sequencer_config().initial_accounts.clone()
|
||||
};
|
||||
|
||||
respond(initial_accounts)
|
||||
@ -173,7 +182,7 @@ impl JsonHandler {
|
||||
|
||||
let balance = {
|
||||
let state = self.sequencer_state.lock().await;
|
||||
let account = state.state.get_account_by_address(&address);
|
||||
let account = state.state().get_account_by_address(&address);
|
||||
account.balance
|
||||
};
|
||||
|
||||
@ -200,7 +209,7 @@ impl JsonHandler {
|
||||
|
||||
addresses
|
||||
.into_iter()
|
||||
.map(|addr| state.state.get_account_by_address(&addr).nonce)
|
||||
.map(|addr| state.state().get_account_by_address(&addr).nonce)
|
||||
.collect()
|
||||
};
|
||||
|
||||
@ -222,7 +231,7 @@ impl JsonHandler {
|
||||
let account = {
|
||||
let state = self.sequencer_state.lock().await;
|
||||
|
||||
state.state.get_account_by_address(&address)
|
||||
state.state().get_account_by_address(&address)
|
||||
};
|
||||
|
||||
let response = GetAccountResponse { account };
|
||||
@ -243,7 +252,7 @@ impl JsonHandler {
|
||||
let transaction = {
|
||||
let state = self.sequencer_state.lock().await;
|
||||
state
|
||||
.block_store
|
||||
.block_store()
|
||||
.get_transaction_by_hash(hash)
|
||||
.map(|tx| borsh::to_vec(&tx).unwrap())
|
||||
};
|
||||
@ -261,7 +270,7 @@ impl JsonHandler {
|
||||
let membership_proof = {
|
||||
let state = self.sequencer_state.lock().await;
|
||||
state
|
||||
.state
|
||||
.state()
|
||||
.get_proof_for_commitment(&get_proof_req.commitment)
|
||||
};
|
||||
let response = GetProofForCommitmentResponse { membership_proof };
|
||||
@ -365,10 +374,10 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn components_for_tests() -> (JsonHandler, Vec<AccountInitialData>, EncodedTransaction) {
|
||||
async fn components_for_tests() -> (JsonHandler, Vec<AccountInitialData>, EncodedTransaction) {
|
||||
let config = sequencer_config_for_tests();
|
||||
let mut sequencer_core = SequencerCore::start_from_config(config);
|
||||
let initial_accounts = sequencer_core.sequencer_config.initial_accounts.clone();
|
||||
let (mut sequencer_core, mempool_handle) = SequencerCore::start_from_config(config);
|
||||
let initial_accounts = sequencer_core.sequencer_config().initial_accounts.clone();
|
||||
|
||||
let signing_key = nssa::PrivateKey::try_new([1; 32]).unwrap();
|
||||
let balance_to_move = 10;
|
||||
@ -383,9 +392,10 @@ mod tests {
|
||||
signing_key,
|
||||
);
|
||||
|
||||
sequencer_core
|
||||
.push_tx_into_mempool_pre_check(tx.clone())
|
||||
.unwrap();
|
||||
mempool_handle
|
||||
.push(tx.clone())
|
||||
.await
|
||||
.expect("Mempool is closed, this is a bug");
|
||||
|
||||
sequencer_core
|
||||
.produce_new_block_with_mempool_transactions()
|
||||
@ -397,6 +407,7 @@ mod tests {
|
||||
JsonHandler {
|
||||
polling_config: RpcPollingConfig::default(),
|
||||
sequencer_state: sequencer_core,
|
||||
mempool_handle,
|
||||
},
|
||||
initial_accounts,
|
||||
tx,
|
||||
@ -426,7 +437,7 @@ mod tests {
|
||||
|
||||
#[actix_web::test]
|
||||
async fn test_get_account_balance_for_non_existent_account() {
|
||||
let (json_handler, _, _) = components_for_tests();
|
||||
let (json_handler, _, _) = components_for_tests().await;
|
||||
let request = serde_json::json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "get_account_balance",
|
||||
@ -448,7 +459,7 @@ mod tests {
|
||||
|
||||
#[actix_web::test]
|
||||
async fn test_get_account_balance_for_invalid_base58() {
|
||||
let (json_handler, _, _) = components_for_tests();
|
||||
let (json_handler, _, _) = components_for_tests().await;
|
||||
let request = serde_json::json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "get_account_balance",
|
||||
@ -471,7 +482,7 @@ mod tests {
|
||||
|
||||
#[actix_web::test]
|
||||
async fn test_get_account_balance_for_invalid_length() {
|
||||
let (json_handler, _, _) = components_for_tests();
|
||||
let (json_handler, _, _) = components_for_tests().await;
|
||||
let request = serde_json::json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "get_account_balance",
|
||||
@ -494,7 +505,7 @@ mod tests {
|
||||
|
||||
#[actix_web::test]
|
||||
async fn test_get_account_balance_for_existing_account() {
|
||||
let (json_handler, initial_accounts, _) = components_for_tests();
|
||||
let (json_handler, initial_accounts, _) = components_for_tests().await;
|
||||
|
||||
let acc1_addr = initial_accounts[0].addr.clone();
|
||||
|
||||
@ -519,7 +530,7 @@ mod tests {
|
||||
|
||||
#[actix_web::test]
|
||||
async fn test_get_accounts_nonces_for_non_existent_account() {
|
||||
let (json_handler, _, _) = components_for_tests();
|
||||
let (json_handler, _, _) = components_for_tests().await;
|
||||
let request = serde_json::json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "get_accounts_nonces",
|
||||
@ -541,7 +552,7 @@ mod tests {
|
||||
|
||||
#[actix_web::test]
|
||||
async fn test_get_accounts_nonces_for_existent_account() {
|
||||
let (json_handler, initial_accounts, _) = components_for_tests();
|
||||
let (json_handler, initial_accounts, _) = components_for_tests().await;
|
||||
|
||||
let acc_1_addr = initial_accounts[0].addr.clone();
|
||||
let acc_2_addr = initial_accounts[1].addr.clone();
|
||||
@ -567,7 +578,7 @@ mod tests {
|
||||
|
||||
#[actix_web::test]
|
||||
async fn test_get_account_data_for_non_existent_account() {
|
||||
let (json_handler, _, _) = components_for_tests();
|
||||
let (json_handler, _, _) = components_for_tests().await;
|
||||
let request = serde_json::json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "get_account",
|
||||
@ -594,7 +605,7 @@ mod tests {
|
||||
|
||||
#[actix_web::test]
|
||||
async fn test_get_transaction_by_hash_for_non_existent_hash() {
|
||||
let (json_handler, _, _) = components_for_tests();
|
||||
let (json_handler, _, _) = components_for_tests().await;
|
||||
let request = serde_json::json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "get_transaction_by_hash",
|
||||
@ -616,7 +627,7 @@ mod tests {
|
||||
|
||||
#[actix_web::test]
|
||||
async fn test_get_transaction_by_hash_for_invalid_hex() {
|
||||
let (json_handler, _, _) = components_for_tests();
|
||||
let (json_handler, _, _) = components_for_tests().await;
|
||||
let request = serde_json::json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "get_transaction_by_hash",
|
||||
@ -640,7 +651,7 @@ mod tests {
|
||||
|
||||
#[actix_web::test]
|
||||
async fn test_get_transaction_by_hash_for_invalid_length() {
|
||||
let (json_handler, _, _) = components_for_tests();
|
||||
let (json_handler, _, _) = components_for_tests().await;
|
||||
let request = serde_json::json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "get_transaction_by_hash",
|
||||
@ -664,7 +675,7 @@ mod tests {
|
||||
|
||||
#[actix_web::test]
|
||||
async fn test_get_transaction_by_hash_for_existing_transaction() {
|
||||
let (json_handler, _, tx) = components_for_tests();
|
||||
let (json_handler, _, tx) = components_for_tests().await;
|
||||
let tx_hash_hex = hex::encode(tx.hash());
|
||||
let expected_base64_encoded = general_purpose::STANDARD.encode(borsh::to_vec(&tx).unwrap());
|
||||
|
||||
|
||||
@ -26,13 +26,17 @@ pub async fn startup_sequencer(
|
||||
let block_timeout = app_config.block_create_timeout_millis;
|
||||
let port = app_config.port;
|
||||
|
||||
let sequencer_core = SequencerCore::start_from_config(app_config);
|
||||
let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config);
|
||||
|
||||
info!("Sequencer core set up");
|
||||
|
||||
let seq_core_wrapped = Arc::new(Mutex::new(sequencer_core));
|
||||
|
||||
let http_server = new_http_server(RpcConfig::with_port(port), seq_core_wrapped.clone())?;
|
||||
let http_server = new_http_server(
|
||||
RpcConfig::with_port(port),
|
||||
Arc::clone(&seq_core_wrapped),
|
||||
mempool_handle,
|
||||
)?;
|
||||
info!("HTTP server started");
|
||||
let http_server_handle = http_server.handle();
|
||||
tokio::spawn(http_server);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user