feat: node to sequencer interaction

This commit is contained in:
Oleksandr Pravdyvyi 2024-12-05 13:05:58 +02:00
parent a80d198e88
commit f92a4e82e3
10 changed files with 337 additions and 30 deletions

2
Cargo.lock generated
View File

@ -2668,6 +2668,8 @@ dependencies = [
"serde_json",
"sha2 0.10.8",
"storage",
"thiserror",
"tokio",
"utxo",
]

View File

@ -16,6 +16,8 @@ monotree.workspace = true
bincode.workspace = true
elliptic-curve.workspace = true
reqwest.workspace = true
thiserror.workspace = true
tokio.workspace = true
[dependencies.accounts]
path = "../accounts"

View File

@ -8,4 +8,8 @@ pub struct NodeConfig {
pub home: PathBuf,
///Override rust log (env var logging level)
pub override_rust_log: Option<String>,
///Sequencer URL
pub sequencer_addr: String,
///Sequencer polling duration for new blocks in seconds
pub seq_poll_timeout_secs: u64,
}

View File

@ -1,6 +1,14 @@
use accounts::account_core::AccountAddress;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use accounts::account_core::Account;
use anyhow::Result;
use config::NodeConfig;
use sequencer_client::SequencerClient;
use storage::NodeChainStore;
use tokio::{sync::Mutex, task::JoinHandle};
pub mod config;
pub mod executions;
@ -8,8 +16,70 @@ pub mod sequencer_client;
pub mod storage;
pub struct NodeCore {
pub storage: NodeChainStore,
pub curr_height: u64,
pub main_acc_addr: AccountAddress,
pub storage: Arc<Mutex<NodeChainStore>>,
pub curr_height: Arc<AtomicU64>,
pub main_acc: Account,
pub node_config: NodeConfig,
pub db_updater_handle: JoinHandle<Result<()>>,
}
impl NodeCore {
pub async fn start_from_config_update_chain(config: NodeConfig) -> Result<Self> {
let client = SequencerClient::new(config.clone())?;
let genesis_id = client.get_genesis_id().await?;
let genesis_block = client.get_block(genesis_id.genesis_id).await?.block;
let mut storage = NodeChainStore::new_with_genesis(&config.home, genesis_block);
let account = Account::new();
let mut chain_height = genesis_id.genesis_id;
//Chain update loop
loop {
let next_block = chain_height + 1;
if let Ok(block) = client.get_block(next_block).await {
storage.dissect_insert_block(block.block)?;
} else {
break;
}
chain_height += 1;
}
let wrapped_storage = Arc::new(Mutex::new(storage));
let chain_height_wrapped = Arc::new(AtomicU64::new(chain_height));
let wrapped_storage_thread = wrapped_storage.clone();
let wrapped_chain_height_thread = chain_height_wrapped.clone();
let client_thread = client.clone();
let updater_handle = tokio::spawn(async move {
loop {
let next_block = wrapped_chain_height_thread.load(Ordering::Relaxed) + 1;
if let Ok(block) = client_thread.get_block(next_block).await {
{
let mut storage_guard = wrapped_storage_thread.lock().await;
storage_guard.dissect_insert_block(block.block)?;
}
wrapped_chain_height_thread.store(next_block, Ordering::Relaxed);
} else {
tokio::time::sleep(std::time::Duration::from_secs(config.seq_poll_timeout_secs)).await;
}
}
});
Ok(Self {
storage: wrapped_storage,
curr_height: chain_height_wrapped,
main_acc: account,
node_config: config.clone(),
db_updater_handle: updater_handle,
})
}
}

View File

@ -1 +1,68 @@
use serde::{Deserialize, Serialize};
use storage::{block::Block, transaction::Transaction};
//Requests
#[derive(Serialize, Deserialize, Debug)]
pub struct RegisterAccountRequest {
pub nullifier_public_key: Vec<u8>,
pub viewing_public_key: Vec<u8>,
pub address: [u8; 32],
}
#[derive(Serialize, Deserialize, Debug)]
pub struct SendTxRequest {
pub transaction: Transaction,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetBlockDataRequest {
pub block_id: u64,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetGenesisIdRequest {}
//Responses
#[derive(Serialize, Deserialize, Debug)]
pub struct RegisterAccountResponse {
pub status: String,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct SendTxResponse {
pub status: String,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetBlockDataResponse {
pub block: Block,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetGenesisIdResponse {
pub genesis_id: u64,
}
//General
#[derive(Debug, Clone, Serialize)]
pub struct SequencerRpcRequest {
jsonrpc: String,
pub method: String,
pub params: serde_json::Value,
pub id: u64,
}
impl SequencerRpcRequest {
pub fn from_payload_version_2_0(method: String, payload: serde_json::Value) -> Self {
Self {
jsonrpc: "2.0".to_string(),
method,
params: payload,
//ToDo: Correct checking of id
id: 1,
}
}
}

View File

@ -1,8 +1,132 @@
use accounts::account_core::Account;
use anyhow::Result;
use json::{
GetBlockDataRequest, GetBlockDataResponse, GetGenesisIdRequest, GetGenesisIdResponse,
RegisterAccountRequest, RegisterAccountResponse, SendTxRequest, SendTxResponse,
SequencerRpcRequest,
};
use k256::elliptic_curve::group::GroupEncoding;
use reqwest::Client;
use serde_json::Value;
use storage::transaction::Transaction;
use crate::config::NodeConfig;
pub mod json;
#[derive(Clone)]
pub struct SequencerClient {
pub client: reqwest::Client,
pub config: NodeConfig,
}
#[derive(thiserror::Error, Debug)]
pub enum SequencerClientError {
#[error("HTTP error")]
HTTPError(reqwest::Error),
#[error("Serde error")]
SerdeError(serde_json::Error),
}
impl From<reqwest::Error> for SequencerClientError {
fn from(value: reqwest::Error) -> Self {
SequencerClientError::HTTPError(value)
}
}
impl From<serde_json::Error> for SequencerClientError {
fn from(value: serde_json::Error) -> Self {
SequencerClientError::SerdeError(value)
}
}
impl SequencerClient {
pub fn new(config: NodeConfig) -> Result<Self> {
Ok(Self {
client: Client::builder()
//Add more fiedls if needed
.timeout(std::time::Duration::from_secs(60))
.build()?,
config,
})
}
pub async fn call_method_with_payload(
&self,
method: &str,
payload: Value,
) -> Result<Value, SequencerClientError> {
let request = SequencerRpcRequest::from_payload_version_2_0(method.to_string(), payload);
let call_builder = self.client.post(&self.config.sequencer_addr);
let call_res = call_builder.json(&request).send().await?;
let response = call_res.json::<Value>().await?;
Ok(response)
}
pub async fn get_block(
&self,
block_id: u64,
) -> Result<GetBlockDataResponse, SequencerClientError> {
let block_req = GetBlockDataRequest { block_id };
let req = serde_json::to_value(block_req)?;
let resp = self.call_method_with_payload("get_block", req).await?;
let resp_deser = serde_json::from_value(resp)?;
Ok(resp_deser)
}
pub async fn send_tx(
&self,
transaction: Transaction,
) -> Result<SendTxResponse, SequencerClientError> {
let tx_req = SendTxRequest { transaction };
let req = serde_json::to_value(tx_req)?;
let resp = self.call_method_with_payload("send_tx", req).await?;
let resp_deser = serde_json::from_value(resp)?;
Ok(resp_deser)
}
pub async fn register_account(
&self,
account: &Account,
) -> Result<RegisterAccountResponse, SequencerClientError> {
let acc_req = RegisterAccountRequest {
nullifier_public_key: account.key_holder.nullifer_public_key.to_bytes().to_vec(),
viewing_public_key: account.key_holder.viewing_public_key.to_bytes().to_vec(),
address: account.address,
};
let req = serde_json::to_value(acc_req)?;
let resp = self
.call_method_with_payload("register_account", req)
.await?;
let resp_deser = serde_json::from_value(resp)?;
Ok(resp_deser)
}
pub async fn get_genesis_id(&self) -> Result<GetGenesisIdResponse, SequencerClientError> {
let genesis_req = GetGenesisIdRequest {};
let req = serde_json::to_value(genesis_req)?;
let resp = self.call_method_with_payload("get_genesis", req).await?;
let resp_deser = serde_json::from_value(resp)?;
Ok(resp_deser)
}
}

View File

@ -2,12 +2,14 @@ use std::path::Path;
use accounts::account_core::{Account, AccountAddress};
use accounts_store::NodeAccountsStore;
use anyhow::Result;
use block_store::NodeBlockStore;
use rand::{rngs::OsRng, RngCore};
use storage::{
block::{Block, HashableBlockData},
block::Block,
merkle_tree_public::merkle_tree::{PublicTransactionMerkleTree, UTXOCommitmentsMerkleTree},
nullifier::UTXONullifier,
nullifier_sparse_merkle_tree::NullifierSparseMerkleTree,
utxo_commitment::UTXOCommitment,
};
pub mod accounts_store;
@ -25,30 +27,12 @@ pub struct NodeChainStore {
}
impl NodeChainStore {
pub fn new_with_genesis(home_dir: &Path, genesis_id: u64, is_genesis_random: bool) -> Self {
pub fn new_with_genesis(home_dir: &Path, genesis_block: Block) -> Self {
let acc_store = NodeAccountsStore::default();
let nullifier_store = NullifierSparseMerkleTree::default();
let utxo_commitments_store = UTXOCommitmentsMerkleTree::new(vec![]);
let pub_tx_store = PublicTransactionMerkleTree::new(vec![]);
let mut data = [0; 32];
let mut prev_block_hash = [0; 32];
if is_genesis_random {
OsRng.fill_bytes(&mut data);
OsRng.fill_bytes(&mut prev_block_hash);
}
let hashable_data = HashableBlockData {
block_id: genesis_id,
prev_block_id: genesis_id.saturating_sub(1),
transactions: vec![],
data: data.to_vec(),
prev_block_hash,
};
let genesis_block = Block::produce_block_from_hashable_data(hashable_data);
//Sequencer should panic if unable to open db,
//as fixing this issue may require actions non-native to program scope
let block_store =
@ -68,4 +52,30 @@ impl NodeChainStore {
pub fn get_main_account_addr(&self) -> AccountAddress {
self.node_main_account_info.address
}
pub fn dissect_insert_block(&mut self, block: Block) -> Result<()> {
for tx in &block.transactions {
self.utxo_commitments_store.add_tx_multiple(
tx.utxo_commitments_created_hashes
.clone()
.into_iter()
.map(|hash| UTXOCommitment { hash })
.collect(),
);
self.nullifier_store.insert_items(
tx.nullifier_created_hashes
.clone()
.into_iter()
.map(|hash| UTXONullifier { utxo_hash: hash })
.collect(),
)?;
self.pub_tx_store.add_tx(tx.clone());
}
self.block_store.put_block_at_id(block)?;
Ok(())
}
}

View File

@ -5,6 +5,7 @@ use storage::{block::Block, RocksDBIO};
pub struct SequecerBlockStore {
dbio: RocksDBIO,
pub genesis_id: u64,
}
impl SequecerBlockStore {
@ -13,9 +14,11 @@ impl SequecerBlockStore {
///
/// ATTENTION: Will overwrite genesis block.
pub fn open_db_with_genesis(location: &Path, genesis_block: Option<Block>) -> Result<Self> {
Ok(Self {
dbio: RocksDBIO::new(location, genesis_block)?,
})
let dbio = RocksDBIO::new(location, genesis_block)?;
let genesis_id = dbio.get_meta_first_block_in_db()?;
Ok(Self { dbio, genesis_id })
}
///Reopening existing database

View File

@ -11,8 +11,9 @@ use rpc_primitives::{
use crate::{
rpc_error_responce_inverter,
types::rpc_structs::{
GetBlockDataRequest, GetBlockDataResponse, HelloRequest, HelloResponse,
RegisterAccountRequest, RegisterAccountResponse, SendTxRequest, SendTxResponse,
GetBlockDataRequest, GetBlockDataResponse, GetGenesisIdRequest, GetGenesisIdResponse,
HelloRequest, HelloResponse, RegisterAccountRequest, RegisterAccountResponse,
SendTxRequest, SendTxResponse,
},
};
@ -100,12 +101,27 @@ impl JsonHandler {
respond(helperstruct)
}
async fn process_get_genesis(&self, request: Request) -> Result<Value, RpcErr> {
let _get_genesis_req = GetGenesisIdRequest::parse(Some(request.params))?;
let genesis_id = {
let state = self.sequencer_state.lock().await;
state.store.block_store.genesis_id
};
let helperstruct = GetGenesisIdResponse { genesis_id };
respond(helperstruct)
}
pub async fn process_request_internal(&self, request: Request) -> Result<Value, RpcErr> {
match request.method.as_ref() {
"hello" => self.process_temp_hello(request).await,
"register_account" => self.process_register_account_request(request).await,
"send_tx" => self.process_send_tx(request).await,
"get_block" => self.process_get_block_data(request).await,
"get_genesis" => self.process_get_genesis(request).await,
_ => Err(RpcErr(RpcError::method_not_found(request.method))),
}
}

View File

@ -27,10 +27,14 @@ pub struct GetBlockDataRequest {
pub block_id: u64,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetGenesisIdRequest {}
parse_request!(HelloRequest);
parse_request!(RegisterAccountRequest);
parse_request!(SendTxRequest);
parse_request!(GetBlockDataRequest);
parse_request!(GetGenesisIdRequest);
#[derive(Serialize, Deserialize, Debug)]
pub struct HelloResponse {
@ -51,3 +55,8 @@ pub struct SendTxResponse {
pub struct GetBlockDataResponse {
pub block: Block,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetGenesisIdResponse {
pub genesis_id: u64,
}