feat: storage and merkle tree added

This commit is contained in:
Oleksandr Pravdyvyi 2024-10-10 14:09:31 +03:00
parent 3e84d130b1
commit e19d9227d5
14 changed files with 511 additions and 5 deletions

67
Cargo.lock generated
View File

@ -688,6 +688,15 @@ dependencies = [
"tracing",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
dependencies = [
"ahash 0.7.8",
]
[[package]]
name = "hashbrown"
version = "0.14.5"
@ -752,7 +761,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5"
dependencies = [
"equivalent",
"hashbrown",
"hashbrown 0.14.5",
]
[[package]]
@ -873,6 +882,15 @@ version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "lru"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999beba7b6e8345721bd280141ed958096a2e4abdf74f67ff4ce49b4b54e47a"
dependencies = [
"hashbrown 0.12.3",
]
[[package]]
name = "memchr"
version = "2.7.4"
@ -944,6 +962,7 @@ dependencies = [
"log",
"serde",
"serde_json",
"storage",
]
[[package]]
@ -1231,6 +1250,15 @@ dependencies = [
"serde_json",
]
[[package]]
name = "rs_merkle"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b241d2e59b74ef9e98d94c78c47623d04c8392abaf82014dfd372a16041128f"
dependencies = [
"sha2",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
@ -1277,8 +1305,10 @@ dependencies = [
"anyhow",
"env_logger",
"log",
"mempool",
"serde",
"serde_json",
"storage",
]
[[package]]
@ -1376,6 +1406,17 @@ dependencies = [
"digest",
]
[[package]]
name = "sha2"
version = "0.10.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]]
name = "shlex"
version = "1.3.0"
@ -1433,9 +1474,13 @@ dependencies = [
"anyhow",
"env_logger",
"log",
"lru",
"rocksdb",
"rs_merkle",
"serde",
"serde_json",
"sha2",
"thiserror",
]
[[package]]
@ -1458,6 +1503,26 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "thiserror"
version = "1.0.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "time"
version = "0.3.36"

View File

@ -30,6 +30,10 @@ futures = "0.3"
env_logger = "0.10"
log = "0.4"
lru = "0.7.8"
thiserror = "1.0"
rs_merkle = "1.4"
sha2 = "0.10.8"
rocksdb = { version = "0.21.0", default-features = false, features = ["snappy"] }

View File

@ -8,4 +8,7 @@ anyhow.workspace = true
serde_json.workspace = true
env_logger.workspace = true
log.workspace = true
serde.workspace = true
serde.workspace = true
[dependencies.storage]
path = "../storage"

View File

@ -9,3 +9,9 @@ serde_json.workspace = true
env_logger.workspace = true
log.workspace = true
serde.workspace = true
[dependencies.storage]
path = "../storage"
[dependencies.mempool]
path = "../mempool"

View File

@ -1 +1 @@
//ToDo: Add sequencer_core module
pub mod transaction_mempool;

View File

@ -0,0 +1,37 @@
use mempool::mempoolitem::MemPoolItem;
use serde::{Deserialize, Serialize};
use storage::transaction::{Transaction, TxHash};
#[derive(Debug)]
/// Mempool wrapper around a [`Transaction`].
///
/// Serde passthrough impls below make the wrapper (de)serialize exactly as
/// the inner transaction does; the tx hash serves as the mempool identifier.
pub struct TransactionMempool {
    // The wrapped transaction (public so callers can unwrap it directly).
    pub tx: Transaction,
}
/// Serialize exactly like the wrapped [`Transaction`] — the wrapper adds no
/// fields of its own on the wire.
impl Serialize for TransactionMempool {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // Delegate straight to the inner transaction's serde implementation.
        let inner = &self.tx;
        inner.serialize(serializer)
    }
}
/// Deserialize a [`Transaction`] and wrap it; any serde error from the inner
/// type is propagated unchanged.
impl<'de> Deserialize<'de> for TransactionMempool {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // `map` replaces the hand-written Ok/Err match that merely re-wrapped
        // both arms — identical behavior, idiomatic form.
        Transaction::deserialize(deserializer).map(|tx| TransactionMempool { tx })
    }
}
/// A transaction is identified in the mempool by its hash.
impl MemPoolItem for TransactionMempool {
    type Identifier = TxHash;

    /// Returns the wrapped transaction's hash.
    ///
    /// NOTE(review): `TxHash` is `[u8; 32]` (Copy), so returning it by value
    /// is cheap.
    fn identifier(&self) -> Self::Identifier {
        self.tx.hash
    }
}

View File

@ -9,5 +9,9 @@ serde_json.workspace = true
env_logger.workspace = true
log.workspace = true
serde.workspace = true
lru.workspace = true
thiserror.workspace = true
rocksdb.workspace = true
rocksdb.workspace = true
rs_merkle.workspace = true
sha2.workspace = true

16
storage/src/block.rs Normal file
View File

@ -0,0 +1,16 @@
use serde::{Deserialize, Serialize};
use crate::transaction::Transaction;
/// 32-byte block hash.
pub type BlockHash = [u8; 32];
/// Opaque block payload bytes.
pub type Data = Vec<u8>;
/// Monotonically increasing block number; used as the big-endian RocksDB key
/// in the storage layer.
pub type BlockId = u64;
//ToDo: Add fields to block when model is clear
#[derive(Debug, Serialize, Deserialize)]
pub struct Block {
    // Sequential id of the block; also the storage key.
    pub block_id: BlockId,
    // Hash of the block.
    pub hash: BlockHash,
    // Transactions included in this block.
    pub transactions: Vec<Transaction>,
    // Arbitrary extra payload (model still in flux, see ToDo above).
    pub data: Data,
}

37
storage/src/error.rs Normal file
View File

@ -0,0 +1,37 @@
#[derive(thiserror::Error, Debug)]
pub enum DbError {
#[error("RocksDb error")]
RocksDbError {
error: rocksdb::Error,
additional_info: Option<String>,
},
#[error("Serialization error")]
SerializationError {
error: serde_json::Error,
additional_info: Option<String>,
},
#[error("Logic Error")]
DbInteractionError { additional_info: String },
}
impl DbError {
pub fn rocksdb_cast_message(rerr: rocksdb::Error, message: Option<String>) -> Self {
Self::RocksDbError {
error: rerr,
additional_info: message,
}
}
pub fn serde_cast_message(serr: serde_json::Error, message: Option<String>) -> Self {
Self::SerializationError {
error: serr,
additional_info: message,
}
}
pub fn db_interaction_error(message: String) -> Self {
Self::DbInteractionError {
additional_info: message,
}
}
}

View File

@ -1 +1,208 @@
//ToDo: Add storage module
use std::{path::Path, sync::Arc};
use block::Block;
use error::DbError;
use log::warn;
use rocksdb::{
BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options,
};
pub mod block;
pub mod error;
pub mod merkle_tree_public;
pub mod transaction;
///Maximal size of stored blocks in base
///
///Used to control db size
///
///Currently effectively unbounded.
pub const BUFF_SIZE_ROCKSDB: usize = usize::MAX;
///Size of stored blocks cache in memory
///
///Keeping small to not run out of memory
pub const CACHE_SIZE: usize = 1000;
///Key base for storing metainformation about id of first block in db
pub const DB_META_FIRST_BLOCK_IN_DB_KEY: &str = "first_block_in_db";
///Key base for storing metainformation about id of last current block in db
pub const DB_META_LAST_BLOCK_IN_DB_KEY: &str = "last_block_in_db";
///Key base for storing metainformation which describe if first block has been set
pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set";
///Name of block column family
pub const CF_BLOCK_NAME: &str = "cf_block";
///Name of meta column family
pub const CF_META_NAME: &str = "cf_meta";
pub type DbResult<T> = Result<T, DbError>;
///Thin wrapper around a multi-threaded RocksDB handle with dedicated block
///and meta column families (see `CF_BLOCK_NAME` / `CF_META_NAME`).
pub struct RocksDBIO {
    // Underlying RocksDB instance, opened in multi-threaded CF mode.
    pub db: DBWithThreadMode<MultiThreaded>,
}
impl RocksDBIO {
    ///Open (or create) the database at `path` with the block and meta column
    ///families.
    ///
    ///If the "first block set" marker is absent and `start_block` is given,
    ///that block is stored as the first block; with no `start_block` the db
    ///starts in unset mode and the first block must be set manually later.
    pub fn new(path: &Path, start_block: Option<Block>) -> DbResult<Self> {
        let mut cf_opts = Options::default();
        cf_opts.set_max_write_buffer_number(16);
        //ToDo: Add more column families for different data
        let cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
        let cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());

        let mut db_opts = Options::default();
        db_opts.create_missing_column_families(true);
        db_opts.create_if_missing(true);

        //Propagate open failures instead of panicking (was `unwrap()`) —
        //`new` already returns `DbResult`, so the caller can decide.
        let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
            &db_opts,
            path,
            vec![cfb, cfmeta],
        )
        .map_err(|rerr| {
            DbError::rocksdb_cast_message(rerr, Some("Failed to open database".to_string()))
        })?;
        let dbio = Self { db };

        let is_start_set = dbio.get_meta_is_first_block_set()?;
        if is_start_set {
            Ok(dbio)
        } else if let Some(block) = start_block {
            dbio.put_meta_first_block_in_db(block)?;
            dbio.put_meta_is_first_block_set()?;
            Ok(dbio)
        } else {
            warn!("Starting db in unset mode, will have to set starting block manually");
            Ok(dbio)
        }
    }

    ///Handle to the meta column family.
    ///
    ///# Panics
    ///Only if the column family is missing, which would break the invariant
    ///established in `new` (both families are always created there).
    pub fn meta_column(&self) -> Arc<BoundColumnFamily> {
        self.db
            .cf_handle(CF_META_NAME)
            .expect("meta column family is created in RocksDBIO::new")
    }

    ///Handle to the block column family.
    ///
    ///# Panics
    ///Only if the column family is missing (same invariant as `meta_column`).
    pub fn block_column(&self) -> Arc<BoundColumnFamily> {
        self.db
            .cf_handle(CF_BLOCK_NAME)
            .expect("block column family is created in RocksDBIO::new")
    }

    ///Decode an 8-byte big-endian block id, returning a `DbError` on corrupt
    ///data instead of panicking (was `try_into().unwrap()`).
    fn decode_block_id(data: Vec<u8>, key: &str) -> DbResult<u64> {
        let bytes: [u8; 8] = data.try_into().map_err(|_| {
            DbError::db_interaction_error(format!(
                "Corrupt meta value for {}: expected 8 bytes",
                key
            ))
        })?;
        Ok(u64::from_be_bytes(bytes))
    }

    ///Id of the first block stored in the db, or an error when it was never set.
    pub fn get_meta_first_block_in_db(&self) -> DbResult<u64> {
        let cf_meta = self.meta_column();
        let res = self
            .db
            .get_cf(&cf_meta, DB_META_FIRST_BLOCK_IN_DB_KEY)
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
        if let Some(data) = res {
            Self::decode_block_id(data, DB_META_FIRST_BLOCK_IN_DB_KEY)
        } else {
            Err(DbError::db_interaction_error(
                "First block not found".to_string(),
            ))
        }
    }

    ///Id of the last (newest) block stored in the db, or an error when unset.
    pub fn get_meta_last_block_in_db(&self) -> DbResult<u64> {
        let cf_meta = self.meta_column();
        let res = self
            .db
            .get_cf(&cf_meta, DB_META_LAST_BLOCK_IN_DB_KEY)
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
        if let Some(data) = res {
            Self::decode_block_id(data, DB_META_LAST_BLOCK_IN_DB_KEY)
        } else {
            Err(DbError::db_interaction_error(
                "Last block not found".to_string(),
            ))
        }
    }

    ///Whether the "first block set" marker has been written.
    pub fn get_meta_is_first_block_set(&self) -> DbResult<bool> {
        let cf_meta = self.meta_column();
        let res = self
            .db
            .get_cf(&cf_meta, DB_META_FIRST_BLOCK_SET_KEY)
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
        //Presence of the key is the flag; the stored value is ignored.
        Ok(res.is_some())
    }

    ///Record `block` as the first block: writes the first-block meta entry,
    ///then stores the block itself.
    pub fn put_meta_first_block_in_db(&self, block: Block) -> DbResult<()> {
        let cf_meta = self.meta_column();
        self.db
            .put_cf(
                &cf_meta,
                DB_META_FIRST_BLOCK_IN_DB_KEY.as_bytes(),
                block.block_id.to_be_bytes(),
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
        self.put_block(block)?;
        Ok(())
    }

    ///Update the last-block meta entry to `block_id`.
    pub fn put_meta_last_block_in_db(&self, block_id: u64) -> DbResult<()> {
        let cf_meta = self.meta_column();
        self.db
            .put_cf(
                &cf_meta,
                DB_META_LAST_BLOCK_IN_DB_KEY.as_bytes(),
                block_id.to_be_bytes(),
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
        Ok(())
    }

    ///Write the "first block set" marker (the stored byte is a placeholder;
    ///only the key's presence matters).
    pub fn put_meta_is_first_block_set(&self) -> DbResult<()> {
        let cf_meta = self.meta_column();
        self.db
            .put_cf(&cf_meta, DB_META_FIRST_BLOCK_SET_KEY.as_bytes(), [1u8; 1])
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
        Ok(())
    }

    ///Store `block` (JSON-encoded) under its big-endian id, bumping the
    ///last-block meta entry when this block is the newest one seen.
    pub fn put_block(&self, block: Block) -> DbResult<()> {
        let cf_block = self.block_column();
        //Bug fix: the very first `put_block` runs before any last-block meta
        //entry exists. The original propagated that "not found" error, which
        //made initializing the db with a start block impossible. A missing
        //entry now simply means this block is the newest so far; genuine
        //RocksDB failures are still propagated.
        let must_update_last = match self.get_meta_last_block_in_db() {
            Ok(last_curr_block) => block.block_id > last_curr_block,
            Err(DbError::DbInteractionError { .. }) => true,
            Err(other) => return Err(other),
        };
        if must_update_last {
            self.put_meta_last_block_in_db(block.block_id)?;
        }
        self.db
            .put_cf(
                &cf_block,
                block.block_id.to_be_bytes(),
                serde_json::to_vec(&block).map_err(|serr| {
                    DbError::serde_cast_message(
                        serr,
                        Some("Block Serialization failed".to_string()),
                    )
                })?,
            )
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
        Ok(())
    }

    ///Fetch and JSON-decode the block stored under `block_id`.
    pub fn get_block(&self, block_id: u64) -> DbResult<Block> {
        let cf_block = self.block_column();
        let res = self
            .db
            .get_cf(&cf_block, block_id.to_be_bytes())
            .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
        if let Some(data) = res {
            //Return the mapped Result directly (was a redundant `Ok(…?)`).
            serde_json::from_slice::<Block>(&data).map_err(|serr| {
                DbError::serde_cast_message(serr, Some("Block Deserialization failed".to_string()))
            })
        } else {
            Err(DbError::db_interaction_error(
                "Block on this id not found".to_string(),
            ))
        }
    }
}

View File

@ -0,0 +1,20 @@
use rs_merkle::Hasher;
use sha2::{digest::FixedOutput, Digest, Sha256};
use super::HashType;
#[derive(Debug, Clone)]
///Project-wide Merkle-leaf hasher.
///
///Currently a thin SHA-256 wrapper; the algorithm may change in the future.
pub struct OwnHasher {}
impl Hasher for OwnHasher {
type Hash = HashType;
fn hash(data: &[u8]) -> HashType {
let mut hasher = Sha256::new();
hasher.update(data);
<HashType>::from(hasher.finalize_fixed())
}
}

View File

@ -0,0 +1,93 @@
use std::collections::HashMap;
use rs_merkle::{MerkleProof, MerkleTree};
use crate::transaction::Transaction;
use super::{hasher::OwnHasher, HashType};
///Merkle tree over the public transaction set.
///
///Keeps the transactions themselves (keyed by leaf index) and a hash→index
///map alongside the `rs_merkle` tree, so inclusion proofs can be produced
///directly from a transaction hash.
pub struct PublicTransactionsMerkleTree {
    // Transactions keyed by leaf index (insertion order).
    leaves: HashMap<usize, Transaction>,
    // Reverse lookup: transaction hash -> leaf index.
    hash_to_id_map: HashMap<HashType, usize>,
    // Underlying Merkle tree built over the transaction hashes.
    tree: MerkleTree<OwnHasher>,
}
impl PublicTransactionsMerkleTree {
    ///Build a tree from `leaves_vec`, assigning leaf indices in input order.
    ///
    ///NOTE(review): duplicate transaction hashes would overwrite each other
    ///in the hash→index map — confirm callers never pass duplicates.
    pub fn new(leaves_vec: Vec<Transaction>) -> Self {
        let mut leaves_map = HashMap::new();
        let mut hash_to_id_map = HashMap::new();
        let leaves_hashed: Vec<HashType> = leaves_vec
            .iter()
            .enumerate()
            .map(|(id, tx)| {
                leaves_map.insert(id, tx.clone());
                hash_to_id_map.insert(tx.hash, id);
                tx.hash
            })
            .collect();
        Self {
            leaves: leaves_map,
            hash_to_id_map,
            tree: MerkleTree::from_leaves(&leaves_hashed),
        }
    }

    ///Look up a stored transaction by its hash.
    pub fn get_tx(&self, hash: HashType) -> Option<&Transaction> {
        self.hash_to_id_map
            .get(&hash)
            .and_then(|id| self.leaves.get(id))
    }

    ///Current Merkle root, or `None` for an empty tree.
    pub fn get_root(&self) -> Option<HashType> {
        self.tree.root()
    }

    ///Inclusion proof for one transaction hash, or `None` if it is unknown.
    pub fn get_proof(&self, hash: HashType) -> Option<MerkleProof<OwnHasher>> {
        self.hash_to_id_map
            .get(&hash)
            .map(|id| self.tree.proof(&[*id]))
    }

    ///Inclusion proof covering all `hashes`; `None` if any hash is unknown.
    pub fn get_proof_multiple(&self, hashes: &[HashType]) -> Option<MerkleProof<OwnHasher>> {
        //Collecting into Option<Vec<_>> short-circuits to None on the first
        //unknown hash — replaces the manual is_some()/unwrap() scan.
        let ids: Option<Vec<usize>> = hashes
            .iter()
            .map(|hash| self.hash_to_id_map.get(hash).copied())
            .collect();
        ids.map(|ids| self.tree.proof(&ids))
    }

    ///Register `tx` in the lookup maps under the next leaf index and return
    ///its hash. Shared by `add_tx` and `add_tx_multiple`, which previously
    ///duplicated this logic.
    fn register_leaf(&mut self, tx: &Transaction) -> HashType {
        let id = self.leaves.len();
        self.leaves.insert(id, tx.clone());
        self.hash_to_id_map.insert(tx.hash, id);
        tx.hash
    }

    ///Append one transaction and commit the tree.
    pub fn add_tx(&mut self, tx: Transaction) {
        let hash = self.register_leaf(&tx);
        self.tree.insert(hash);
        self.tree.commit();
    }

    ///Append several transactions and commit once at the end.
    pub fn add_tx_multiple(&mut self, txs: Vec<Transaction>) {
        let mut hashes: Vec<HashType> =
            txs.iter().map(|tx| self.register_leaf(tx)).collect();
        self.tree.append(&mut hashes);
        self.tree.commit();
    }
}

View File

@ -0,0 +1,4 @@
pub mod hasher;
pub mod merkle_tree;
pub type HashType = [u8; 32];

View File

@ -0,0 +1,10 @@
use serde::{Deserialize, Serialize};
/// 32-byte transaction hash.
pub type TxHash = [u8; 32];
//ToDo: Update Tx model, when it is clear
#[derive(Debug, Serialize, Deserialize, Clone)]
///General transaction object
pub struct Transaction {
    // Unique hash identifying the transaction (the only field for now,
    // pending the final tx model — see ToDo above).
    pub hash: TxHash,
}