diff --git a/Cargo.lock b/Cargo.lock index e2db10b..51e2333 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -688,6 +688,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash 0.7.8", +] + [[package]] name = "hashbrown" version = "0.14.5" @@ -752,7 +761,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.14.5", ] [[package]] @@ -873,6 +882,15 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "lru" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999beba7b6e8345721bd280141ed958096a2e4abdf74f67ff4ce49b4b54e47a" +dependencies = [ + "hashbrown 0.12.3", +] + [[package]] name = "memchr" version = "2.7.4" @@ -944,6 +962,7 @@ dependencies = [ "log", "serde", "serde_json", + "storage", ] [[package]] @@ -1231,6 +1250,15 @@ dependencies = [ "serde_json", ] +[[package]] +name = "rs_merkle" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b241d2e59b74ef9e98d94c78c47623d04c8392abaf82014dfd372a16041128f" +dependencies = [ + "sha2", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -1277,8 +1305,10 @@ dependencies = [ "anyhow", "env_logger", "log", + "mempool", "serde", "serde_json", + "storage", ] [[package]] @@ -1376,6 +1406,17 @@ dependencies = [ "digest", ] +[[package]] +name = "sha2" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" +dependencies = [ + "cfg-if", + 
"cpufeatures", + "digest", +] + [[package]] name = "shlex" version = "1.3.0" @@ -1433,9 +1474,13 @@ dependencies = [ "anyhow", "env_logger", "log", + "lru", "rocksdb", + "rs_merkle", "serde", "serde_json", + "sha2", + "thiserror", ] [[package]] @@ -1458,6 +1503,26 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "thiserror" +version = "1.0.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "time" version = "0.3.36" diff --git a/Cargo.toml b/Cargo.toml index 3afa16b..52f7a96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,10 @@ futures = "0.3" env_logger = "0.10" log = "0.4" +lru = "0.7.8" +thiserror = "1.0" +rs_merkle = "1.4" +sha2 = "0.10.8" rocksdb = { version = "0.21.0", default-features = false, features = ["snappy"] } diff --git a/node_core/Cargo.toml b/node_core/Cargo.toml index bb6e3cd..8b5f0c1 100644 --- a/node_core/Cargo.toml +++ b/node_core/Cargo.toml @@ -8,4 +8,7 @@ anyhow.workspace = true serde_json.workspace = true env_logger.workspace = true log.workspace = true -serde.workspace = true \ No newline at end of file +serde.workspace = true + +[dependencies.storage] +path = "../storage" \ No newline at end of file diff --git a/sequencer_core/Cargo.toml b/sequencer_core/Cargo.toml index b35700d..0de8d48 100644 --- a/sequencer_core/Cargo.toml +++ b/sequencer_core/Cargo.toml @@ -9,3 +9,9 @@ serde_json.workspace = true env_logger.workspace = true log.workspace = true serde.workspace = true + +[dependencies.storage] +path = "../storage" + +[dependencies.mempool] +path = "../mempool" \ No newline at end of file diff --git 
a/sequencer_core/src/lib.rs b/sequencer_core/src/lib.rs index 4208023..e4fc834 100644 --- a/sequencer_core/src/lib.rs +++ b/sequencer_core/src/lib.rs @@ -1 +1 @@ -//ToDo: Add sequencer_core module +pub mod transaction_mempool; diff --git a/sequencer_core/src/transaction_mempool.rs b/sequencer_core/src/transaction_mempool.rs new file mode 100644 index 0000000..7029c6c --- /dev/null +++ b/sequencer_core/src/transaction_mempool.rs @@ -0,0 +1,37 @@ +use mempool::mempoolitem::MemPoolItem; +use serde::{Deserialize, Serialize}; +use storage::transaction::{Transaction, TxHash}; + +#[derive(Debug)] +pub struct TransactionMempool { + pub tx: Transaction, +} + +impl Serialize for TransactionMempool { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + self.tx.serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for TransactionMempool { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + match Transaction::deserialize(deserializer) { + Ok(tx) => Ok(TransactionMempool { tx }), + Err(err) => Err(err), + } + } +} + +impl MemPoolItem for TransactionMempool { + type Identifier = TxHash; + + fn identifier(&self) -> Self::Identifier { + self.tx.hash + } +} diff --git a/storage/Cargo.toml b/storage/Cargo.toml index ae3afdd..90f96e0 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -9,5 +9,9 @@ serde_json.workspace = true env_logger.workspace = true log.workspace = true serde.workspace = true +lru.workspace = true +thiserror.workspace = true -rocksdb.workspace = true \ No newline at end of file +rocksdb.workspace = true +rs_merkle.workspace = true +sha2.workspace = true \ No newline at end of file diff --git a/storage/src/block.rs b/storage/src/block.rs new file mode 100644 index 0000000..954e5f0 --- /dev/null +++ b/storage/src/block.rs @@ -0,0 +1,16 @@ +use serde::{Deserialize, Serialize}; + +use crate::transaction::Transaction; + +pub type BlockHash = [u8; 32]; +pub type Data = Vec<u8>; +pub type 
BlockId = u64; + +//ToDo: Add fields to block when model is clear +#[derive(Debug, Serialize, Deserialize)] +pub struct Block { + pub block_id: BlockId, + pub hash: BlockHash, + pub transactions: Vec<Transaction>, + pub data: Data, +} diff --git a/storage/src/error.rs b/storage/src/error.rs new file mode 100644 index 0000000..feb36f6 --- /dev/null +++ b/storage/src/error.rs @@ -0,0 +1,37 @@ +#[derive(thiserror::Error, Debug)] +pub enum DbError { + #[error("RocksDb error")] + RocksDbError { + error: rocksdb::Error, + additional_info: Option<String>, + }, + #[error("Serialization error")] + SerializationError { + error: serde_json::Error, + additional_info: Option<String>, + }, + #[error("Logic Error")] + DbInteractionError { additional_info: String }, +} + +impl DbError { + pub fn rocksdb_cast_message(rerr: rocksdb::Error, message: Option<String>) -> Self { + Self::RocksDbError { + error: rerr, + additional_info: message, + } + } + + pub fn serde_cast_message(serr: serde_json::Error, message: Option<String>) -> Self { + Self::SerializationError { + error: serr, + additional_info: message, + } + } + + pub fn db_interaction_error(message: String) -> Self { + Self::DbInteractionError { + additional_info: message, + } + } +} diff --git a/storage/src/lib.rs b/storage/src/lib.rs index ee5bbfa..6baa60a 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -1 +1,208 @@ -//ToDo: Add storage module +use std::{path::Path, sync::Arc}; + +use block::Block; +use error::DbError; +use log::warn; +use rocksdb::{ + BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, +}; + +pub mod block; +pub mod error; +pub mod merkle_tree_public; +pub mod transaction; + +///Maximal size of stored blocks in base +/// +///Used to control db size +/// +///Currently effectively unbounded. 
+pub const BUFF_SIZE_ROCKSDB: usize = usize::MAX; + +///Size of stored blocks cache in memory +/// +///Keeping small to not run out of memory +pub const CACHE_SIZE: usize = 1000; + +///Key base for storing metainformation about id of first block in db +pub const DB_META_FIRST_BLOCK_IN_DB_KEY: &str = "first_block_in_db"; +///Key base for storing metainformation about id of last current block in db +pub const DB_META_LAST_BLOCK_IN_DB_KEY: &str = "last_block_in_db"; +///Key base for storing metainformation which describe if first block has been set +pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set"; + +///Name of block column family +pub const CF_BLOCK_NAME: &str = "cf_block"; +///Name of meta column family +pub const CF_META_NAME: &str = "cf_meta"; + +pub type DbResult<T> = Result<T, DbError>; + +pub struct RocksDBIO { + pub db: DBWithThreadMode<MultiThreaded>, +} + +impl RocksDBIO { + pub fn new(path: &Path, start_block: Option<Block>) -> DbResult<Self> { + let mut cf_opts = Options::default(); + cf_opts.set_max_write_buffer_number(16); + //ToDo: Add more column families for different data + let cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone()); + let cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone()); + + let mut db_opts = Options::default(); + db_opts.create_missing_column_families(true); + db_opts.create_if_missing(true); + let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors( + &db_opts, + path, + vec![cfb, cfmeta], + ); + + let dbio = Self { + //There is no point in handling this from runner code + db: db.unwrap(), + }; + + let is_start_set = dbio.get_meta_is_first_block_set()?; + + if is_start_set { + Ok(dbio) + } else if let Some(block) = start_block { + dbio.put_meta_first_block_in_db(block)?; + dbio.put_meta_is_first_block_set()?; + + Ok(dbio) + } else { + warn!("Starting db in unset mode, will have to set starting block manually"); + + Ok(dbio) + } + } + + pub fn meta_column(&self) -> Arc<BoundColumnFamily> { + self.db.cf_handle(CF_META_NAME).unwrap() + } + + pub fn 
block_column(&self) -> Arc<BoundColumnFamily> { + self.db.cf_handle(CF_BLOCK_NAME).unwrap() + } + + pub fn get_meta_first_block_in_db(&self) -> DbResult<u64> { + let cf_meta = self.meta_column(); + let res = self + .db + .get_cf(&cf_meta, DB_META_FIRST_BLOCK_IN_DB_KEY) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + + if let Some(data) = res { + Ok(u64::from_be_bytes(data.try_into().unwrap())) + } else { + Err(DbError::db_interaction_error( + "First block not found".to_string(), + )) + } + } + + pub fn get_meta_last_block_in_db(&self) -> DbResult<u64> { + let cf_meta = self.meta_column(); + let res = self + .db + .get_cf(&cf_meta, DB_META_LAST_BLOCK_IN_DB_KEY) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + + if let Some(data) = res { + Ok(u64::from_be_bytes(data.try_into().unwrap())) + } else { + Err(DbError::db_interaction_error( + "Last block not found".to_string(), + )) + } + } + + pub fn get_meta_is_first_block_set(&self) -> DbResult<bool> { + let cf_meta = self.meta_column(); + let res = self + .db + .get_cf(&cf_meta, DB_META_FIRST_BLOCK_SET_KEY) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + + Ok(res.is_some()) + } + + pub fn put_meta_first_block_in_db(&self, block: Block) -> DbResult<()> { + let cf_meta = self.meta_column(); + self.db + .put_cf( + &cf_meta, + DB_META_FIRST_BLOCK_IN_DB_KEY.as_bytes(), + block.block_id.to_be_bytes(), + ) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + + self.put_block(block)?; + Ok(()) + } + + pub fn put_meta_last_block_in_db(&self, block_id: u64) -> DbResult<()> { + let cf_meta = self.meta_column(); + self.db + .put_cf( + &cf_meta, + DB_META_LAST_BLOCK_IN_DB_KEY.as_bytes(), + block_id.to_be_bytes(), + ) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + Ok(()) + } + + pub fn put_meta_is_first_block_set(&self) -> DbResult<()> { + let cf_meta = self.meta_column(); + self.db + .put_cf(&cf_meta, DB_META_FIRST_BLOCK_SET_KEY.as_bytes(), [1u8; 1]) + .map_err(|rerr| 
DbError::rocksdb_cast_message(rerr, None))?; + Ok(()) + } + + pub fn put_block(&self, block: Block) -> DbResult<()> { + let cf_block = self.block_column(); + + let last_curr_block = self.get_meta_last_block_in_db()?; + + if block.block_id > last_curr_block { + self.put_meta_last_block_in_db(block.block_id)?; + } + + self.db + .put_cf( + &cf_block, + block.block_id.to_be_bytes(), + serde_json::to_vec(&block).map_err(|serr| { + DbError::serde_cast_message( + serr, + Some("Block Serialization failed".to_string()), + ) + })?, + ) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + Ok(()) + } + + pub fn get_block(&self, block_id: u64) -> DbResult<Block> { + let cf_block = self.block_column(); + let res = self + .db + .get_cf(&cf_block, block_id.to_be_bytes()) + .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?; + + if let Some(data) = res { + Ok(serde_json::from_slice::<Block>(&data).map_err(|serr| { + DbError::serde_cast_message(serr, Some("Block Deserialization failed".to_string())) + })?) + } else { + Err(DbError::db_interaction_error( + "Block on this id not found".to_string(), + )) + } + } +} diff --git a/storage/src/merkle_tree_public/hasher.rs b/storage/src/merkle_tree_public/hasher.rs new file mode 100644 index 0000000..d3f6e6b --- /dev/null +++ b/storage/src/merkle_tree_public/hasher.rs @@ -0,0 +1,20 @@ +use rs_merkle::Hasher; +use sha2::{digest::FixedOutput, Digest, Sha256}; + +use super::HashType; + +#[derive(Debug, Clone)] +///Our own hasher. +/// Currently it is SHA256 hasher wrapper. May change in a future. 
+pub struct OwnHasher {} + +impl Hasher for OwnHasher { + type Hash = HashType; + + fn hash(data: &[u8]) -> HashType { + let mut hasher = Sha256::new(); + + hasher.update(data); + <HashType>::from(hasher.finalize_fixed()) + } +} diff --git a/storage/src/merkle_tree_public/merkle_tree.rs b/storage/src/merkle_tree_public/merkle_tree.rs new file mode 100644 index 0000000..e160a2d --- /dev/null +++ b/storage/src/merkle_tree_public/merkle_tree.rs @@ -0,0 +1,93 @@ +use std::collections::HashMap; + +use rs_merkle::{MerkleProof, MerkleTree}; + +use crate::transaction::Transaction; + +use super::{hasher::OwnHasher, HashType}; + +pub struct PublicTransactionsMerkleTree { + leaves: HashMap<usize, Transaction>, + hash_to_id_map: HashMap<HashType, usize>, + tree: MerkleTree<OwnHasher>, +} + +impl PublicTransactionsMerkleTree { + pub fn new(leaves_vec: Vec<Transaction>) -> Self { + let mut leaves_map = HashMap::new(); + let mut hash_to_id_map = HashMap::new(); + + let leaves_hashed: Vec<HashType> = leaves_vec + .iter() + .enumerate() + .map(|(id, tx)| { + leaves_map.insert(id, tx.clone()); + hash_to_id_map.insert(tx.hash, id); + tx.hash + }) + .collect(); + Self { + leaves: leaves_map, + hash_to_id_map, + tree: MerkleTree::from_leaves(&leaves_hashed), + } + } + + pub fn get_tx(&self, hash: HashType) -> Option<&Transaction> { + self.hash_to_id_map + .get(&hash) + .and_then(|id| self.leaves.get(id)) + } + + pub fn get_root(&self) -> Option<HashType> { + self.tree.root() + } + + pub fn get_proof(&self, hash: HashType) -> Option<MerkleProof<OwnHasher>> { + self.hash_to_id_map + .get(&hash) + .map(|id| self.tree.proof(&[*id])) + } + + pub fn get_proof_multiple(&self, hashes: &[HashType]) -> Option<MerkleProof<OwnHasher>> { + let ids_opt: Vec<Option<&usize>> = hashes + .iter() + .map(|hash| self.hash_to_id_map.get(hash)) + .collect(); + + let is_valid = ids_opt.iter().all(|el| el.is_some()); + + if is_valid { + let ids: Vec<usize> = ids_opt.into_iter().map(|el| *el.unwrap()).collect(); + + Some(self.tree.proof(&ids)) + } else { + None + } + } + + pub fn add_tx(&mut self, tx: Transaction) { + let last = self.leaves.len(); + 
self.leaves.insert(last, tx.clone()); + self.hash_to_id_map.insert(tx.hash, last); + + self.tree.insert(tx.hash); + + self.tree.commit(); + } + + pub fn add_tx_multiple(&mut self, txs: Vec<Transaction>) { + for tx in txs.iter() { + let last = self.leaves.len(); + + self.leaves.insert(last, tx.clone()); + self.hash_to_id_map.insert(tx.hash, last); + } + + self.tree + .append(&mut txs.iter().map(|tx| tx.hash).collect()); + + self.tree.commit(); + } +} diff --git a/storage/src/merkle_tree_public/mod.rs b/storage/src/merkle_tree_public/mod.rs new file mode 100644 index 0000000..1732066 --- /dev/null +++ b/storage/src/merkle_tree_public/mod.rs @@ -0,0 +1,4 @@ +pub mod hasher; +pub mod merkle_tree; + +pub type HashType = [u8; 32]; diff --git a/storage/src/transaction.rs b/storage/src/transaction.rs new file mode 100644 index 0000000..2588e95 --- /dev/null +++ b/storage/src/transaction.rs @@ -0,0 +1,10 @@ +use serde::{Deserialize, Serialize}; + +pub type TxHash = [u8; 32]; + +//ToDo: Update Tx model, when it is clear +#[derive(Debug, Serialize, Deserialize, Clone)] +///General transaction object +pub struct Transaction { + pub hash: TxHash, +}