diff --git a/Cargo.lock b/Cargo.lock index c6bb207..a53f375 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2668,6 +2668,8 @@ dependencies = [ "serde_json", "sha2 0.10.8", "storage", + "thiserror", + "tokio", "utxo", ] diff --git a/node_core/Cargo.toml b/node_core/Cargo.toml index 4ac9e9c..a93bd40 100644 --- a/node_core/Cargo.toml +++ b/node_core/Cargo.toml @@ -16,6 +16,8 @@ monotree.workspace = true bincode.workspace = true elliptic-curve.workspace = true reqwest.workspace = true +thiserror.workspace = true +tokio.workspace = true [dependencies.accounts] path = "../accounts" diff --git a/node_core/src/config.rs b/node_core/src/config.rs index f6cb680..435de4c 100644 --- a/node_core/src/config.rs +++ b/node_core/src/config.rs @@ -8,4 +8,8 @@ pub struct NodeConfig { pub home: PathBuf, ///Override rust log (env var logging level) pub override_rust_log: Option, + ///Sequencer URL + pub sequencer_addr: String, + ///Sequencer polling duration for new blocks in seconds + pub seq_poll_timeout_secs: u64, } diff --git a/node_core/src/lib.rs b/node_core/src/lib.rs index a59c5f9..3fd76a3 100644 --- a/node_core/src/lib.rs +++ b/node_core/src/lib.rs @@ -1,6 +1,14 @@ -use accounts::account_core::AccountAddress; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; + +use accounts::account_core::Account; +use anyhow::Result; use config::NodeConfig; +use sequencer_client::SequencerClient; use storage::NodeChainStore; +use tokio::{sync::Mutex, task::JoinHandle}; pub mod config; pub mod executions; @@ -8,8 +16,70 @@ pub mod sequencer_client; pub mod storage; pub struct NodeCore { - pub storage: NodeChainStore, - pub curr_height: u64, - pub main_acc_addr: AccountAddress, + pub storage: Arc>, + pub curr_height: Arc, + pub main_acc: Account, pub node_config: NodeConfig, + pub db_updater_handle: JoinHandle>, +} + +impl NodeCore { + pub async fn start_from_config_update_chain(config: NodeConfig) -> Result { + let client = SequencerClient::new(config.clone())?; + + let genesis_id = client.get_genesis_id().await?; + let genesis_block = client.get_block(genesis_id.genesis_id).await?.block; + + let mut storage = NodeChainStore::new_with_genesis(&config.home, genesis_block); + + let account = Account::new(); + + let mut chain_height = genesis_id.genesis_id; + + //Chain update loop + loop { + let next_block = chain_height + 1; + + if let Ok(block) = client.get_block(next_block).await { + storage.dissect_insert_block(block.block)?; + } else { + break; + } + + chain_height += 1; + } + + let wrapped_storage = Arc::new(Mutex::new(storage)); + let chain_height_wrapped = Arc::new(AtomicU64::new(chain_height)); + + let wrapped_storage_thread = wrapped_storage.clone(); + let wrapped_chain_height_thread = chain_height_wrapped.clone(); + let client_thread = client.clone(); + + let updater_handle = tokio::spawn(async move { + loop { + let next_block = wrapped_chain_height_thread.load(Ordering::Relaxed) + 1; + + if let Ok(block) = client_thread.get_block(next_block).await { + { + let mut storage_guard = wrapped_storage_thread.lock().await; + + storage_guard.dissect_insert_block(block.block)?; + } + + wrapped_chain_height_thread.store(next_block, Ordering::Relaxed); + } else { + tokio::time::sleep(std::time::Duration::from_secs(config.seq_poll_timeout_secs)).await; + } + } + }); + + Ok(Self { + storage: wrapped_storage, + curr_height: chain_height_wrapped, + main_acc: account, + node_config: config.clone(), + db_updater_handle: updater_handle, + }) + } } diff --git a/node_core/src/sequencer_client/json.rs b/node_core/src/sequencer_client/json.rs index 8b13789..d36b695 100644 --- a/node_core/src/sequencer_client/json.rs +++ b/node_core/src/sequencer_client/json.rs @@ -1 +1,68 @@ +use serde::{Deserialize, Serialize}; +use storage::{block::Block, transaction::Transaction}; +//Requests + +#[derive(Serialize, Deserialize, Debug)] +pub struct RegisterAccountRequest { + pub nullifier_public_key: Vec, + pub viewing_public_key: Vec, + pub address: [u8; 32], +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct SendTxRequest { + pub transaction: Transaction, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct GetBlockDataRequest { + pub block_id: u64, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct GetGenesisIdRequest {} + +//Responses + +#[derive(Serialize, Deserialize, Debug)] +pub struct RegisterAccountResponse { + pub status: String, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct SendTxResponse { + pub status: String, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct GetBlockDataResponse { + pub block: Block, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct GetGenesisIdResponse { + pub genesis_id: u64, +} + +//General + +#[derive(Debug, Clone, Serialize)] +pub struct SequencerRpcRequest { + jsonrpc: String, + pub method: String, + pub params: serde_json::Value, + pub id: u64, +} + +impl SequencerRpcRequest { + pub fn from_payload_version_2_0(method: String, payload: serde_json::Value) -> Self { + Self { + jsonrpc: "2.0".to_string(), + method, + params: payload, + //ToDo: Correct checking of id + id: 1, + } + } +} diff --git a/node_core/src/sequencer_client/mod.rs b/node_core/src/sequencer_client/mod.rs index f265ced..0fe041d 100644 --- a/node_core/src/sequencer_client/mod.rs +++ b/node_core/src/sequencer_client/mod.rs @@ -1,8 +1,132 @@ +use accounts::account_core::Account; +use anyhow::Result; +use json::{ + GetBlockDataRequest, GetBlockDataResponse, GetGenesisIdRequest, GetGenesisIdResponse, + RegisterAccountRequest, RegisterAccountResponse, SendTxRequest, SendTxResponse, + SequencerRpcRequest, +}; +use k256::elliptic_curve::group::GroupEncoding; +use reqwest::Client; +use serde_json::Value; +use storage::transaction::Transaction; + use crate::config::NodeConfig; pub mod json; +#[derive(Clone)] pub struct SequencerClient { pub client: reqwest::Client, pub config: NodeConfig, } + +#[derive(thiserror::Error, Debug)] +pub enum SequencerClientError { + #[error("HTTP error")] + HTTPError(reqwest::Error), + #[error("Serde error")] + SerdeError(serde_json::Error), +} + +impl From for SequencerClientError { + fn from(value: reqwest::Error) -> Self { + SequencerClientError::HTTPError(value) + } +} + +impl From for SequencerClientError { + fn from(value: serde_json::Error) -> Self { + SequencerClientError::SerdeError(value) + } +} + +impl SequencerClient { + pub fn new(config: NodeConfig) -> Result { + Ok(Self { + client: Client::builder() + //Add more fiedls if needed + .timeout(std::time::Duration::from_secs(60)) + .build()?, + config, + }) + } + + pub async fn call_method_with_payload( + &self, + method: &str, + payload: Value, + ) -> Result { + let request = SequencerRpcRequest::from_payload_version_2_0(method.to_string(), payload); + + let call_builder = self.client.post(&self.config.sequencer_addr); + + let call_res = call_builder.json(&request).send().await?; + + let response = call_res.json::().await?; + + Ok(response) + } + + pub async fn get_block( + &self, + block_id: u64, + ) -> Result { + let block_req = GetBlockDataRequest { block_id }; + + let req = serde_json::to_value(block_req)?; + + let resp = self.call_method_with_payload("get_block", req).await?; + + let resp_deser = serde_json::from_value(resp)?; + + Ok(resp_deser) + } + + pub async fn send_tx( + &self, + transaction: Transaction, + ) -> Result { + let tx_req = SendTxRequest { transaction }; + + let req = serde_json::to_value(tx_req)?; + + let resp = self.call_method_with_payload("send_tx", req).await?; + + let resp_deser = serde_json::from_value(resp)?; + + Ok(resp_deser) + } + + pub async fn register_account( + &self, + account: &Account, + ) -> Result { + let acc_req = RegisterAccountRequest { + nullifier_public_key: account.key_holder.nullifer_public_key.to_bytes().to_vec(), + viewing_public_key: account.key_holder.viewing_public_key.to_bytes().to_vec(), + address: account.address, + }; + + let req = serde_json::to_value(acc_req)?; + + let resp = self + .call_method_with_payload("register_account", req) + .await?; + + let resp_deser = serde_json::from_value(resp)?; + + Ok(resp_deser) + } + + pub async fn get_genesis_id(&self) -> Result { + let genesis_req = GetGenesisIdRequest {}; + + let req = serde_json::to_value(genesis_req)?; + + let resp = self.call_method_with_payload("get_genesis", req).await?; + + let resp_deser = serde_json::from_value(resp)?; + + Ok(resp_deser) + } +} diff --git a/node_core/src/storage/mod.rs b/node_core/src/storage/mod.rs index 0f48c35..3e16864 100644 --- a/node_core/src/storage/mod.rs +++ b/node_core/src/storage/mod.rs @@ -2,12 +2,14 @@ use std::path::Path; use accounts::account_core::{Account, AccountAddress}; use accounts_store::NodeAccountsStore; +use anyhow::Result; use block_store::NodeBlockStore; -use rand::{rngs::OsRng, RngCore}; use storage::{ - block::{Block, HashableBlockData}, + block::Block, merkle_tree_public::merkle_tree::{PublicTransactionMerkleTree, UTXOCommitmentsMerkleTree}, + nullifier::UTXONullifier, nullifier_sparse_merkle_tree::NullifierSparseMerkleTree, + utxo_commitment::UTXOCommitment, }; pub mod accounts_store; @@ -25,30 +27,12 @@ pub struct NodeChainStore { } impl NodeChainStore { - pub fn new_with_genesis(home_dir: &Path, genesis_id: u64, is_genesis_random: bool) -> Self { + pub fn new_with_genesis(home_dir: &Path, genesis_block: Block) -> Self { let acc_store = NodeAccountsStore::default(); let nullifier_store = NullifierSparseMerkleTree::default(); let utxo_commitments_store = UTXOCommitmentsMerkleTree::new(vec![]); let pub_tx_store = PublicTransactionMerkleTree::new(vec![]); - let mut data = [0; 32]; - let mut prev_block_hash = [0; 32]; - - if is_genesis_random { - OsRng.fill_bytes(&mut data); - OsRng.fill_bytes(&mut prev_block_hash); - } - - let hashable_data = HashableBlockData { - block_id: genesis_id, - prev_block_id: genesis_id.saturating_sub(1), - transactions: vec![], - data: data.to_vec(), - prev_block_hash, - }; - - let genesis_block = Block::produce_block_from_hashable_data(hashable_data); - //Sequencer should panic if unable to open db, //as fixing this issue may require actions non-native to program scope let block_store = @@ -68,4 +52,30 @@ impl NodeChainStore { pub fn get_main_account_addr(&self) -> AccountAddress { self.node_main_account_info.address } + + pub fn dissect_insert_block(&mut self, block: Block) -> Result<()> { + for tx in &block.transactions { + self.utxo_commitments_store.add_tx_multiple( + tx.utxo_commitments_created_hashes + .clone() + .into_iter() + .map(|hash| UTXOCommitment { hash }) + .collect(), + ); + + self.nullifier_store.insert_items( + tx.nullifier_created_hashes + .clone() + .into_iter() + .map(|hash| UTXONullifier { utxo_hash: hash }) + .collect(), + )?; + + self.pub_tx_store.add_tx(tx.clone()); + } + + self.block_store.put_block_at_id(block)?; + + Ok(()) + } } diff --git a/sequencer_core/src/sequecer_store/block_store.rs b/sequencer_core/src/sequecer_store/block_store.rs index 69c889a..5f668d4 100644 --- a/sequencer_core/src/sequecer_store/block_store.rs +++ b/sequencer_core/src/sequecer_store/block_store.rs @@ -5,6 +5,7 @@ use storage::{block::Block, RocksDBIO}; pub struct SequecerBlockStore { dbio: RocksDBIO, + pub genesis_id: u64, } impl SequecerBlockStore { @@ -13,9 +14,11 @@ impl SequecerBlockStore { /// /// ATTENTION: Will overwrite genesis block. pub fn open_db_with_genesis(location: &Path, genesis_block: Option) -> Result { - Ok(Self { - dbio: RocksDBIO::new(location, genesis_block)?, - }) + let dbio = RocksDBIO::new(location, genesis_block)?; + + let genesis_id = dbio.get_meta_first_block_in_db()?; + + Ok(Self { dbio, genesis_id }) } ///Reopening existing database diff --git a/sequencer_rpc/src/process.rs b/sequencer_rpc/src/process.rs index 5314f62..8e17f6f 100644 --- a/sequencer_rpc/src/process.rs +++ b/sequencer_rpc/src/process.rs @@ -11,8 +11,9 @@ use rpc_primitives::{ use crate::{ rpc_error_responce_inverter, types::rpc_structs::{ - GetBlockDataRequest, GetBlockDataResponse, HelloRequest, HelloResponse, - RegisterAccountRequest, RegisterAccountResponse, SendTxRequest, SendTxResponse, + GetBlockDataRequest, GetBlockDataResponse, GetGenesisIdRequest, GetGenesisIdResponse, + HelloRequest, HelloResponse, RegisterAccountRequest, RegisterAccountResponse, + SendTxRequest, SendTxResponse, }, }; @@ -100,12 +101,27 @@ impl JsonHandler { respond(helperstruct) } + async fn process_get_genesis(&self, request: Request) -> Result { + let _get_genesis_req = GetGenesisIdRequest::parse(Some(request.params))?; + + let genesis_id = { + let state = self.sequencer_state.lock().await; + + state.store.block_store.genesis_id + }; + + let helperstruct = GetGenesisIdResponse { genesis_id }; + + respond(helperstruct) + } + pub async fn process_request_internal(&self, request: Request) -> Result { match request.method.as_ref() { "hello" => self.process_temp_hello(request).await, "register_account" => self.process_register_account_request(request).await, "send_tx" => self.process_send_tx(request).await, "get_block" => self.process_get_block_data(request).await, + "get_genesis" => self.process_get_genesis(request).await, _ => Err(RpcErr(RpcError::method_not_found(request.method))), } } diff --git a/sequencer_rpc/src/types/rpc_structs.rs b/sequencer_rpc/src/types/rpc_structs.rs index c8dc5d3..3e09507 100644 --- a/sequencer_rpc/src/types/rpc_structs.rs +++ b/sequencer_rpc/src/types/rpc_structs.rs @@ -27,10 +27,14 @@ pub struct GetBlockDataRequest { pub block_id: u64, } +#[derive(Serialize, Deserialize, Debug)] +pub struct GetGenesisIdRequest {} + parse_request!(HelloRequest); parse_request!(RegisterAccountRequest); parse_request!(SendTxRequest); parse_request!(GetBlockDataRequest); +parse_request!(GetGenesisIdRequest); #[derive(Serialize, Deserialize, Debug)] pub struct HelloResponse { @@ -51,3 +55,8 @@ pub struct SendTxResponse { pub struct GetBlockDataResponse { pub block: Block, } + +#[derive(Serialize, Deserialize, Debug)] +pub struct GetGenesisIdResponse { + pub genesis_id: u64, +}