fix: debug updates on node-sequencer synchronization

This commit is contained in:
Oleksandr Pravdyvyi 2024-12-29 14:11:47 +02:00
parent 1ce6141cc1
commit 90da8b45f4
19 changed files with 158 additions and 30 deletions

2
Cargo.lock generated
View File

@ -2729,6 +2729,7 @@ dependencies = [
"actix", "actix",
"actix-web", "actix-web",
"anyhow", "anyhow",
"clap",
"consensus", "consensus",
"env_logger", "env_logger",
"log", "log",
@ -4083,6 +4084,7 @@ dependencies = [
"consensus", "consensus",
"env_logger", "env_logger",
"futures", "futures",
"hex",
"log", "log",
"mempool", "mempool",
"networking", "networking",

View File

@ -12,4 +12,6 @@ pub struct NodeConfig {
pub sequencer_addr: String, pub sequencer_addr: String,
///Sequencer polling duration for new blocks in seconds ///Sequencer polling duration for new blocks in seconds
pub seq_poll_timeout_secs: u64, pub seq_poll_timeout_secs: u64,
///Port to listen
pub port: u16,
} }

View File

@ -72,6 +72,8 @@ impl NodeCore {
let client = Arc::new(SequencerClient::new(config.clone())?); let client = Arc::new(SequencerClient::new(config.clone())?);
let genesis_id = client.get_genesis_id().await?; let genesis_id = client.get_genesis_id().await?;
info!("Gesesis id is {genesis_id:?}");
let genesis_block = client.get_block(genesis_id.genesis_id).await?.block; let genesis_block = client.get_block(genesis_id.genesis_id).await?.block;
let mut storage = NodeChainStore::new_with_genesis(&config.home, genesis_block); let mut storage = NodeChainStore::new_with_genesis(&config.home, genesis_block);
@ -84,6 +86,7 @@ impl NodeCore {
if let Ok(block) = client.get_block(next_block).await { if let Ok(block) = client.get_block(next_block).await {
storage.dissect_insert_block(block.block)?; storage.dissect_insert_block(block.block)?;
info!("Preprocessed block with id {next_block:?}");
} else { } else {
break; break;
} }
@ -107,6 +110,7 @@ impl NodeCore {
let mut storage_guard = wrapped_storage_thread.write().await; let mut storage_guard = wrapped_storage_thread.write().await;
storage_guard.dissect_insert_block(block.block)?; storage_guard.dissect_insert_block(block.block)?;
info!("Processed block with id {next_block:?}");
} }
wrapped_chain_height_thread.store(next_block, Ordering::Relaxed); wrapped_chain_height_thread.store(next_block, Ordering::Relaxed);

View File

@ -67,3 +67,11 @@ impl SequencerRpcRequest {
} }
} }
} }
#[derive(Debug, Clone, Deserialize)]
pub struct SequencerRpcResponse {
pub jsonrpc: String,
pub result: serde_json::Value,
pub id: u64,
}

View File

@ -3,7 +3,7 @@ use anyhow::Result;
use json::{ use json::{
GetBlockDataRequest, GetBlockDataResponse, GetGenesisIdRequest, GetGenesisIdResponse, GetBlockDataRequest, GetBlockDataResponse, GetGenesisIdRequest, GetGenesisIdResponse,
RegisterAccountRequest, RegisterAccountResponse, SendTxRequest, SendTxResponse, RegisterAccountRequest, RegisterAccountResponse, SendTxRequest, SendTxResponse,
SequencerRpcRequest, SequencerRpcRequest, SequencerRpcResponse,
}; };
use k256::elliptic_curve::group::GroupEncoding; use k256::elliptic_curve::group::GroupEncoding;
use reqwest::Client; use reqwest::Client;
@ -62,9 +62,9 @@ impl SequencerClient {
let call_res = call_builder.json(&request).send().await?; let call_res = call_builder.json(&request).send().await?;
let response = call_res.json::<Value>().await?; let response = call_res.json::<SequencerRpcResponse>().await?;
Ok(response) Ok(response.result)
} }
pub async fn get_block( pub async fn get_block(
@ -121,11 +121,11 @@ impl SequencerClient {
pub async fn get_genesis_id(&self) -> Result<GetGenesisIdResponse, SequencerClientError> { pub async fn get_genesis_id(&self) -> Result<GetGenesisIdResponse, SequencerClientError> {
let genesis_req = GetGenesisIdRequest {}; let genesis_req = GetGenesisIdRequest {};
let req = serde_json::to_value(genesis_req)?; let req = serde_json::to_value(genesis_req).unwrap();
let resp = self.call_method_with_payload("get_genesis", req).await?; let resp = self.call_method_with_payload("get_genesis", req).await.unwrap();
let resp_deser = serde_json::from_value(resp)?; let resp_deser = serde_json::from_value(resp).unwrap();
Ok(resp_deser) Ok(resp_deser)
} }

View File

@ -1,3 +1,5 @@
use std::sync::atomic::Ordering;
use actix_web::Error as HttpError; use actix_web::Error as HttpError;
use serde_json::Value; use serde_json::Value;
@ -12,8 +14,7 @@ use crate::{
types::{ types::{
err_rpc::cast_seq_client_error_into_rpc_error, err_rpc::cast_seq_client_error_into_rpc_error,
rpc_structs::{ rpc_structs::{
ExecuteSubscenarioRequest, ExecuteSubscenarioResponse, RegisterAccountRequest, ExecuteSubscenarioRequest, ExecuteSubscenarioResponse, GetBlockDataRequest, GetBlockDataResponse, GetLastBlockRequest, GetLastBlockResponse, RegisterAccountRequest, RegisterAccountResponse, SendTxRequest
RegisterAccountResponse, SendTxRequest,
}, },
}, },
}; };
@ -96,12 +97,50 @@ impl JsonHandler {
respond(helperstruct) respond(helperstruct)
} }
async fn process_get_block_data(&self, request: Request) -> Result<Value, RpcErr> {
let req = GetBlockDataRequest::parse(Some(request.params))?;
let block = {
let guard = self.node_chain_store.lock().await;
{
let read_guard = guard.storage.read().await;
read_guard.block_store.get_block_at_id(req.block_id)?
}
};
let helperstruct = GetBlockDataResponse {
block,
};
respond(helperstruct)
}
async fn process_get_last_block(&self, request: Request) -> Result<Value, RpcErr> {
let _req = GetLastBlockRequest::parse(Some(request.params))?;
let last_block = {
let guard = self.node_chain_store.lock().await;
guard.curr_height.load(Ordering::Relaxed)
};
let helperstruct = GetLastBlockResponse {
last_block,
};
respond(helperstruct)
}
pub async fn process_request_internal(&self, request: Request) -> Result<Value, RpcErr> { pub async fn process_request_internal(&self, request: Request) -> Result<Value, RpcErr> {
match request.method.as_ref() { match request.method.as_ref() {
//Todo : Add handling of more JSON RPC methods //Todo : Add handling of more JSON RPC methods
"register_account" => self.process_register_account(request).await, "register_account" => self.process_register_account(request).await,
"execute_subscenario" => self.process_request_execute_subscenario(request).await, "execute_subscenario" => self.process_request_execute_subscenario(request).await,
"send_tx" => self.process_send_tx(request).await, "send_tx" => self.process_send_tx(request).await,
"get_block" => self.process_get_block_data(request).await,
"get_last_block" => self.process_get_last_block(request).await,
_ => Err(RpcErr(RpcError::method_not_found(request.method))), _ => Err(RpcErr(RpcError::method_not_found(request.method))),
} }
} }

View File

@ -28,11 +28,15 @@ pub struct ExecuteSubscenarioRequest {
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct GetGenesisIdRequest {} pub struct GetGenesisIdRequest {}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetLastBlockRequest {}
parse_request!(RegisterAccountRequest); parse_request!(RegisterAccountRequest);
parse_request!(SendTxRequest); parse_request!(SendTxRequest);
parse_request!(GetBlockDataRequest); parse_request!(GetBlockDataRequest);
parse_request!(GetGenesisIdRequest); parse_request!(GetGenesisIdRequest);
parse_request!(ExecuteSubscenarioRequest); parse_request!(ExecuteSubscenarioRequest);
parse_request!(GetLastBlockRequest);
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct HelloResponse { pub struct HelloResponse {
@ -63,3 +67,9 @@ pub struct ExecuteSubscenarioResponse {
pub struct GetGenesisIdResponse { pub struct GetGenesisIdResponse {
pub genesis_id: u64, pub genesis_id: u64,
} }
#[derive(Serialize, Deserialize, Debug)]
pub struct GetLastBlockResponse {
pub last_block: u64,
}

View File

@ -14,6 +14,10 @@ actix.workspace = true
actix-web.workspace = true actix-web.workspace = true
tokio.workspace = true tokio.workspace = true
[dependencies.clap]
features = ["derive", "env"]
workspace = true
[dependencies.accounts] [dependencies.accounts]
path = "../accounts" path = "../accounts"

View File

@ -0,0 +1,7 @@
{
"home": ".",
"override_rust_log": null,
"sequencer_addr": "http://127.0.0.1:3040",
"seq_poll_timeout_secs": 10,
"port": 3041
}

14
node_runner/src/config.rs Normal file
View File

@ -0,0 +1,14 @@
use std::path::PathBuf;
use anyhow::Result;
use node_core::config::NodeConfig;
use std::fs::File;
use std::io::BufReader;
pub fn from_file(config_home: PathBuf) -> Result<NodeConfig> {
let file = File::open(config_home)?;
let reader = BufReader::new(file);
Ok(serde_json::from_reader(reader)?)
}

View File

@ -1,31 +1,40 @@
use std::{path::PathBuf, sync::Arc}; use std::{path::PathBuf, sync::Arc};
use anyhow::Result; use anyhow::Result;
use clap::Parser;
use consensus::ConsensusManager; use consensus::ConsensusManager;
use log::info; use log::info;
use networking::peer_manager::PeerManager; use networking::peer_manager::PeerManager;
use node_core::{config::NodeConfig, NodeCore}; use node_core::NodeCore;
use node_rpc::new_http_server; use node_rpc::new_http_server;
use rpc_primitives::RpcConfig; use rpc_primitives::RpcConfig;
use tokio::sync::Mutex; use tokio::sync::Mutex;
pub mod config;
#[derive(Parser, Debug)]
#[clap(version)]
struct Args {
/// Path to configs
home_dir: PathBuf,
}
pub async fn main_runner() -> Result<()> { pub async fn main_runner() -> Result<()> {
env_logger::init(); env_logger::init();
//ToDo: Change it let args = Args::parse();
let node_config = NodeConfig { let Args { home_dir } = args;
home: PathBuf::new(),
override_rust_log: None,
sequencer_addr: "addr".to_string(),
seq_poll_timeout_secs: 1,
};
let node_core = NodeCore::start_from_config_update_chain(node_config.clone()).await?; let app_config = config::from_file(home_dir.join("node_config.json"))?;
let port = app_config.port;
let node_core = NodeCore::start_from_config_update_chain(app_config.clone()).await?;
let wrapped_node_core = Arc::new(Mutex::new(node_core)); let wrapped_node_core = Arc::new(Mutex::new(node_core));
let http_server = new_http_server( let http_server = new_http_server(
RpcConfig::default(), RpcConfig::with_port(port),
node_config.clone(), app_config.clone(),
wrapped_node_core.clone(), wrapped_node_core.clone(),
)?; )?;
info!("HTTP server started"); info!("HTTP server started");

View File

@ -1,4 +1,4 @@
use std::{path::PathBuf, time::Duration}; use std::path::PathBuf;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -15,5 +15,7 @@ pub struct SequencerConfig {
///Maximum number of transactions in block ///Maximum number of transactions in block
pub max_num_tx_in_block: usize, pub max_num_tx_in_block: usize,
///Interval in which blocks produced ///Interval in which blocks produced
pub block_create_timeout_millis: Duration, pub block_create_timeout_millis: u64,
///Port to listen
pub port: u16,
} }

View File

@ -181,7 +181,7 @@ impl SequencerCore {
.store .store
.block_store .block_store
.get_block_at_id(self.chain_height)? .get_block_at_id(self.chain_height)?
.prev_block_hash; .hash;
let hashable_data = HashableBlockData { let hashable_data = HashableBlockData {
block_id: self.chain_height + 1, block_id: self.chain_height + 1,

View File

@ -12,6 +12,7 @@ serde.workspace = true
actix.workspace = true actix.workspace = true
actix-cors.workspace = true actix-cors.workspace = true
futures.workspace = true futures.workspace = true
hex.workspace = true
actix-web.workspace = true actix-web.workspace = true
tokio.workspace = true tokio.workspace = true

View File

@ -11,9 +11,7 @@ use rpc_primitives::{
use crate::{ use crate::{
rpc_error_responce_inverter, rpc_error_responce_inverter,
types::rpc_structs::{ types::rpc_structs::{
GetBlockDataRequest, GetBlockDataResponse, GetGenesisIdRequest, GetGenesisIdResponse, GetBlockDataRequest, GetBlockDataResponse, GetGenesisIdRequest, GetGenesisIdResponse, GetLastBlockRequest, GetLastBlockResponse, HelloRequest, HelloResponse, RegisterAccountRequest, RegisterAccountResponse, SendTxRequest, SendTxResponse
HelloRequest, HelloResponse, RegisterAccountRequest, RegisterAccountResponse,
SendTxRequest, SendTxResponse,
}, },
}; };
@ -115,6 +113,20 @@ impl JsonHandler {
respond(helperstruct) respond(helperstruct)
} }
async fn process_get_last_block(&self, request: Request) -> Result<Value, RpcErr> {
let _get_last_block_req = GetLastBlockRequest::parse(Some(request.params))?;
let last_block = {
let state = self.sequencer_state.lock().await;
state.chain_height
};
let helperstruct = GetLastBlockResponse { last_block };
respond(helperstruct)
}
pub async fn process_request_internal(&self, request: Request) -> Result<Value, RpcErr> { pub async fn process_request_internal(&self, request: Request) -> Result<Value, RpcErr> {
match request.method.as_ref() { match request.method.as_ref() {
"hello" => self.process_temp_hello(request).await, "hello" => self.process_temp_hello(request).await,
@ -122,6 +134,7 @@ impl JsonHandler {
"send_tx" => self.process_send_tx(request).await, "send_tx" => self.process_send_tx(request).await,
"get_block" => self.process_get_block_data(request).await, "get_block" => self.process_get_block_data(request).await,
"get_genesis" => self.process_get_genesis(request).await, "get_genesis" => self.process_get_genesis(request).await,
"get_last_block" => self.process_get_last_block(request).await,
_ => Err(RpcErr(RpcError::method_not_found(request.method))), _ => Err(RpcErr(RpcError::method_not_found(request.method))),
} }
} }

View File

@ -6,6 +6,8 @@ use sequencer_core::transaction_mempool::TransactionMempool;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use storage::block::Block; use storage::block::Block;
use storage::block::BlockId;
use storage::transaction::Transaction;
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct HelloRequest {} pub struct HelloRequest {}
@ -30,11 +32,15 @@ pub struct GetBlockDataRequest {
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct GetGenesisIdRequest {} pub struct GetGenesisIdRequest {}
#[derive(Serialize, Deserialize, Debug)]
pub struct GetLastBlockRequest {}
parse_request!(HelloRequest); parse_request!(HelloRequest);
parse_request!(RegisterAccountRequest); parse_request!(RegisterAccountRequest);
parse_request!(SendTxRequest); parse_request!(SendTxRequest);
parse_request!(GetBlockDataRequest); parse_request!(GetBlockDataRequest);
parse_request!(GetGenesisIdRequest); parse_request!(GetGenesisIdRequest);
parse_request!(GetLastBlockRequest);
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct HelloResponse { pub struct HelloResponse {
@ -60,3 +66,8 @@ pub struct GetBlockDataResponse {
pub struct GetGenesisIdResponse { pub struct GetGenesisIdResponse {
pub genesis_id: u64, pub genesis_id: u64,
} }
#[derive(Serialize, Deserialize, Debug)]
pub struct GetLastBlockResponse {
pub last_block: u64,
}

View File

@ -4,5 +4,6 @@
"genesis_id": 1, "genesis_id": 1,
"is_genesis_random": true, "is_genesis_random": true,
"max_num_tx_in_block": 20, "max_num_tx_in_block": 20,
"block_create_timeout_millis": 10000 "block_create_timeout_millis": 10000,
"port": 3040
} }

View File

@ -24,6 +24,7 @@ pub async fn main_runner() -> Result<()> {
let app_config = config::from_file(home_dir.join("sequencer_config.json"))?; let app_config = config::from_file(home_dir.join("sequencer_config.json"))?;
let block_timeout = app_config.block_create_timeout_millis; let block_timeout = app_config.block_create_timeout_millis;
let port = app_config.port;
if let Some(ref rust_log) = app_config.override_rust_log { if let Some(ref rust_log) = app_config.override_rust_log {
info!("RUST_LOG env var set to {rust_log:?}"); info!("RUST_LOG env var set to {rust_log:?}");
@ -39,7 +40,7 @@ pub async fn main_runner() -> Result<()> {
let seq_core_wrapped = Arc::new(Mutex::new(sequencer_core)); let seq_core_wrapped = Arc::new(Mutex::new(sequencer_core));
let http_server = new_http_server(RpcConfig::default(), seq_core_wrapped.clone())?; let http_server = new_http_server(RpcConfig::with_port(port), seq_core_wrapped.clone())?;
info!("HTTP server started"); info!("HTTP server started");
let _http_server_handle = http_server.handle(); let _http_server_handle = http_server.handle();
tokio::spawn(http_server); tokio::spawn(http_server);
@ -48,7 +49,7 @@ pub async fn main_runner() -> Result<()> {
#[allow(clippy::empty_loop)] #[allow(clippy::empty_loop)]
loop { loop {
tokio::time::sleep(block_timeout).await; tokio::time::sleep(std::time::Duration::from_millis(block_timeout)).await;
info!("Collecting transactions from mempool, block creation"); info!("Collecting transactions from mempool, block creation");

View File

@ -845,9 +845,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_json" name = "serde_json"
version = "1.0.133" version = "1.0.134"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" checksum = "d00f4175c42ee48b15416f6193a959ba3a0d67fc699a0db9ad12df9f83991c7d"
dependencies = [ dependencies = [
"itoa", "itoa",
"memchr", "memchr",