From 42cfac8a746f51d2ccee79d6992d68bd01bcafc9 Mon Sep 17 00:00:00 2001 From: Oleksandr Pravdyvyi Date: Mon, 30 Sep 2024 05:49:46 +0300 Subject: [PATCH 1/2] feat: adding rpc interfaces into sequencer and node --- Cargo.toml | 6 +- consensus/src/lib.rs | 2 +- mempool/src/lib.rs | 2 +- networking/src/lib.rs | 2 +- node_core/src/lib.rs | 2 +- node_rpc/Cargo.toml | 8 +- node_rpc/src/lib.rs | 40 +- node_rpc/src/net_utils.rs | 64 +++ node_rpc/src/process.rs | 53 +++ node_rpc/src/types/err_rpc.rs | 47 +++ node_rpc/src/types/mod.rs | 3 + node_rpc/src/types/parse.rs | 1 + node_rpc/src/types/rpc_structs.rs | 16 + node_runner/Cargo.toml | 6 +- node_runner/src/lib.rs | 17 + node_runner/src/main.rs | 17 +- rpc_primitives/Cargo.toml | 11 + rpc_primitives/src/errors.rs | 185 +++++++++ rpc_primitives/src/lib.rs | 72 ++++ rpc_primitives/src/message.rs | 551 +++++++++++++++++++++++++ rpc_primitives/src/parser.rs | 27 ++ sequencer_rpc/Cargo.toml | 8 +- sequencer_rpc/src/lib.rs | 40 +- sequencer_rpc/src/net_utils.rs | 64 +++ sequencer_rpc/src/process.rs | 53 +++ sequencer_rpc/src/types/err_rpc.rs | 47 +++ sequencer_rpc/src/types/mod.rs | 3 + sequencer_rpc/src/types/parse.rs | 1 + sequencer_rpc/src/types/rpc_structs.rs | 16 + sequencer_runner/Cargo.toml | 6 +- sequencer_runner/src/lib.rs | 17 + sequencer_runner/src/main.rs | 17 +- storage/src/lib.rs | 2 +- utxo/src/lib.rs | 2 +- zkvm/src/lib.rs | 2 +- 35 files changed, 1390 insertions(+), 20 deletions(-) create mode 100644 node_rpc/src/net_utils.rs create mode 100644 node_rpc/src/process.rs create mode 100644 node_rpc/src/types/err_rpc.rs create mode 100644 node_rpc/src/types/mod.rs create mode 100644 node_rpc/src/types/parse.rs create mode 100644 node_rpc/src/types/rpc_structs.rs create mode 100644 node_runner/src/lib.rs create mode 100644 rpc_primitives/Cargo.toml create mode 100644 rpc_primitives/src/errors.rs create mode 100644 rpc_primitives/src/lib.rs create mode 100644 rpc_primitives/src/message.rs create mode 100644 rpc_primitives/src/parser.rs create mode 100644 sequencer_rpc/src/net_utils.rs create mode 100644 sequencer_rpc/src/process.rs create mode 100644 sequencer_rpc/src/types/err_rpc.rs create mode 100644 sequencer_rpc/src/types/mod.rs create mode 100644 sequencer_rpc/src/types/parse.rs create mode 100644 sequencer_rpc/src/types/rpc_structs.rs create mode 100644 sequencer_runner/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 70cd909..cd337b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,8 @@ members = [ "mempool", "zkvm", "node_core", - "sequencer_core", + "sequencer_core", + "rpc_primitives", ] [workspace.dependencies] @@ -23,6 +24,9 @@ num_cpus = "1.13.1" openssl = { version = "0.10", features = ["vendored"] } openssl-probe = { version = "0.1.2" } serde_json = "1.0.81" +actix = "0.13.0" +actix-cors = "0.6.1" +futures = "0.3" env_logger = "0.10" log = "0.4" diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index bca8166..2517fb1 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -1 +1 @@ -//ToDo: Add consensus module \ No newline at end of file +//ToDo: Add consensus module diff --git a/mempool/src/lib.rs b/mempool/src/lib.rs index 9ac2d43..d12d785 100644 --- a/mempool/src/lib.rs +++ b/mempool/src/lib.rs @@ -1 +1 @@ -//ToDo: Add mempool module \ No newline at end of file +//ToDo: Add mempool module diff --git a/networking/src/lib.rs b/networking/src/lib.rs index 7ed5171..57df9c8 100644 --- a/networking/src/lib.rs +++ b/networking/src/lib.rs @@ -1 +1 @@ -//ToDo: Add networking module \ No newline at end of file +//ToDo: Add networking module diff --git a/node_core/src/lib.rs b/node_core/src/lib.rs index cd8cf43..7f92203 100644 --- a/node_core/src/lib.rs +++ b/node_core/src/lib.rs @@ -1 +1 @@ -//ToDo: Add node_core module \ No newline at end of file +//ToDo: Add node_core module diff --git a/node_rpc/Cargo.toml b/node_rpc/Cargo.toml index faaf3b9..ef65de0 100644 --- a/node_rpc/Cargo.toml +++ b/node_rpc/Cargo.toml @@ -9,6 +9,9 @@ serde_json.workspace = true env_logger.workspace = true log.workspace = true serde.workspace = true +actix.workspace = true +actix-cors.workspace = true +futures.workspace = true actix-web.workspace = true @@ -34,4 +37,7 @@ path = "../vm" path = "../zkvm" [dependencies.node_core] -path = "../node_core" \ No newline at end of file +path = "../node_core" + +[dependencies.rpc_primitives] +path = "../rpc_primitives" \ No newline at end of file diff --git a/node_rpc/src/lib.rs b/node_rpc/src/lib.rs index f2f198c..71a3b8f 100644 --- a/node_rpc/src/lib.rs +++ b/node_rpc/src/lib.rs @@ -1 +1,39 @@ -//ToDo: Add node_rpc module \ No newline at end of file +pub mod net_utils; +pub mod process; +pub mod types; + +use rpc_primitives::{ + errors::{RpcError, RpcErrorKind}, + RpcPollingConfig, +}; +use serde::Serialize; +use serde_json::Value; + +pub use net_utils::*; + +use self::types::err_rpc::RpcErr; + +//ToDo: Add necessary fields +pub struct JsonHandler { + pub polling_config: RpcPollingConfig, +} + +fn respond(val: T) -> Result { + Ok(serde_json::to_value(val)?) +} + +pub fn rpc_error_responce_inverter(err: RpcError) -> RpcError { + let mut content: Option = None; + if err.error_struct.is_some() { + content = match err.error_struct.clone().unwrap() { + RpcErrorKind::HandlerError(val) | RpcErrorKind::InternalError(val) => Some(val), + RpcErrorKind::RequestValidationError(vall) => Some(serde_json::to_value(vall).unwrap()), + }; + } + RpcError { + error_struct: None, + code: err.code, + message: err.message, + data: content, + } +} diff --git a/node_rpc/src/net_utils.rs b/node_rpc/src/net_utils.rs new file mode 100644 index 0000000..35c46ed --- /dev/null +++ b/node_rpc/src/net_utils.rs @@ -0,0 +1,64 @@ +use std::io; + +use actix_cors::Cors; +use actix_web::{http, middleware, web, App, Error as HttpError, HttpResponse, HttpServer}; +use futures::Future; +use futures::FutureExt; +use log::info; + +use rpc_primitives::message::Message; +use rpc_primitives::RpcConfig; + +use super::JsonHandler; + +pub const SHUTDOWN_TIMEOUT_SECS: u64 = 10; + +fn rpc_handler( + message: web::Json, + handler: web::Data, +) -> impl Future> { + let response = async move { + let message = handler.process(message.0).await?; + Ok(HttpResponse::Ok().json(&message)) + }; + response.boxed() +} + +fn get_cors(cors_allowed_origins: &[String]) -> Cors { + let mut cors = Cors::permissive(); + if cors_allowed_origins != ["*".to_string()] { + for origin in cors_allowed_origins { + cors = cors.allowed_origin(origin); + } + } + cors.allowed_methods(vec!["GET", "POST"]) + .allowed_headers(vec![http::header::AUTHORIZATION, http::header::ACCEPT]) + .allowed_header(http::header::CONTENT_TYPE) + .max_age(3600) +} + +#[allow(clippy::too_many_arguments)] +pub fn new_http_server(config: RpcConfig) -> io::Result { + let RpcConfig { + addr, + cors_allowed_origins, + polling_config, + limits_config, + } = config; + info!(target:"network", "Starting http server at {}", addr); + let handler = web::Data::new(JsonHandler { polling_config }); + + // HTTP server + Ok(HttpServer::new(move || { + App::new() + .wrap(get_cors(&cors_allowed_origins)) + .app_data(handler.clone()) + .app_data(web::JsonConfig::default().limit(limits_config.json_payload_max_size)) + .wrap(middleware::Logger::default()) + .service(web::resource("/").route(web::post().to(rpc_handler))) + }) + .bind(addr)? + .shutdown_timeout(SHUTDOWN_TIMEOUT_SECS) + .disable_signals() + .run()) +} diff --git a/node_rpc/src/process.rs b/node_rpc/src/process.rs new file mode 100644 index 0000000..aad6124 --- /dev/null +++ b/node_rpc/src/process.rs @@ -0,0 +1,53 @@ +use actix_web::Error as HttpError; +use serde_json::Value; + +use rpc_primitives::{ + errors::RpcError, + message::{Message, Request}, + parser::RpcRequest, +}; + +use crate::{ + rpc_error_responce_inverter, + types::rpc_structs::{HelloRequest, HelloResponse}, +}; + +use super::{respond, types::err_rpc::RpcErr, JsonHandler}; + +impl JsonHandler { + pub async fn process(&self, message: Message) -> Result { + let id = message.id(); + if let Message::Request(request) = message { + let message_inner = self + .process_request_internal(request) + .await + .map_err(|e| e.0) + .map_err(rpc_error_responce_inverter); + Ok(Message::response(id, message_inner)) + } else { + Ok(Message::error(RpcError::parse_error( + "JSON RPC Request format was expected".to_owned(), + ))) + } + } + + #[allow(clippy::unused_async)] + ///Example of request processing + async fn process_temp_hello(&self, request: Request) -> Result { + let _hello_request = HelloRequest::parse(Some(request.params))?; + + let helperstruct = HelloResponse { + greeting: "HELLO_FROM_NODE".to_string(), + }; + + respond(helperstruct) + } + + pub async fn process_request_internal(&self, request: Request) -> Result { + match request.method.as_ref() { + //Todo : Add handling of more JSON RPC methods + "hello" => self.process_temp_hello(request).await, + _ => Err(RpcErr(RpcError::method_not_found(request.method))), + } + } +} diff --git a/node_rpc/src/types/err_rpc.rs b/node_rpc/src/types/err_rpc.rs new file mode 100644 index 0000000..f5998d2 --- /dev/null +++ b/node_rpc/src/types/err_rpc.rs @@ -0,0 +1,47 @@ +use log::debug; + +use rpc_primitives::errors::{RpcError, RpcParseError}; + +pub struct RpcErr(pub RpcError); + +pub type RpcErrInternal = anyhow::Error; + +pub trait RpcErrKind: 'static { + fn into_rpc_err(self) -> RpcError; +} + +impl From for RpcErr { + fn from(e: T) -> Self { + Self(e.into_rpc_err()) + } +} + +macro_rules! standard_rpc_err_kind { + ($type_name:path) => { + impl RpcErrKind for $type_name { + fn into_rpc_err(self) -> RpcError { + self.into() + } + } + }; +} +standard_rpc_err_kind!(RpcError); +standard_rpc_err_kind!(RpcParseError); + +impl RpcErrKind for serde_json::Error { + fn into_rpc_err(self) -> RpcError { + RpcError::serialization_error(&self.to_string()) + } +} + +impl RpcErrKind for RpcErrInternal { + fn into_rpc_err(self) -> RpcError { + RpcError::new_internal_error(None, &format!("{self:#?}")) + } +} + +#[allow(clippy::needless_pass_by_value)] +pub fn from_rpc_err_into_anyhow_err(rpc_err: RpcError) -> anyhow::Error { + debug!("Rpc error cast to anyhow error : err {rpc_err:?}"); + anyhow::anyhow!(format!("{rpc_err:#?}")) +} diff --git a/node_rpc/src/types/mod.rs b/node_rpc/src/types/mod.rs new file mode 100644 index 0000000..6084ddd --- /dev/null +++ b/node_rpc/src/types/mod.rs @@ -0,0 +1,3 @@ +pub mod err_rpc; +pub mod parse; +pub mod rpc_structs; diff --git a/node_rpc/src/types/parse.rs b/node_rpc/src/types/parse.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/node_rpc/src/types/parse.rs @@ -0,0 +1 @@ + diff --git a/node_rpc/src/types/rpc_structs.rs b/node_rpc/src/types/rpc_structs.rs new file mode 100644 index 0000000..4d7a2c4 --- /dev/null +++ b/node_rpc/src/types/rpc_structs.rs @@ -0,0 +1,16 @@ +use rpc_primitives::errors::RpcParseError; +use rpc_primitives::parse_request; +use rpc_primitives::parser::parse_params; +use rpc_primitives::parser::RpcRequest; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Serialize, Deserialize, Debug)] +pub struct HelloRequest {} + +parse_request!(HelloRequest); + +#[derive(Serialize, Deserialize, Debug)] +pub struct HelloResponse { + pub greeting: String, +} diff --git a/node_runner/Cargo.toml b/node_runner/Cargo.toml index ba384a9..757081d 100644 --- a/node_runner/Cargo.toml +++ b/node_runner/Cargo.toml @@ -9,6 +9,7 @@ serde_json.workspace = true env_logger.workspace = true log.workspace = true serde.workspace = true +actix.workspace = true actix-web.workspace = true tokio.workspace = true @@ -38,4 +39,7 @@ path = "../zkvm" path = "../node_rpc" [dependencies.node_core] -path = "../node_core" \ No newline at end of file +path = "../node_core" + +[dependencies.rpc_primitives] +path = "../rpc_primitives" \ No newline at end of file diff --git a/node_runner/src/lib.rs b/node_runner/src/lib.rs new file mode 100644 index 0000000..d8a957c --- /dev/null +++ b/node_runner/src/lib.rs @@ -0,0 +1,17 @@ +use anyhow::Result; +use log::info; +use node_rpc::new_http_server; +use rpc_primitives::RpcConfig; + +pub async fn main_runner() -> Result<()> { + env_logger::init(); + + let http_server = new_http_server(RpcConfig::default())?; + info!("HTTP server started"); + let _http_server_handle = http_server.handle(); + tokio::spawn(http_server); + + loop { + //ToDo: Insert activity into main loop + } +} diff --git a/node_runner/src/main.rs b/node_runner/src/main.rs index c55cfd5..2bc288d 100644 --- a/node_runner/src/main.rs +++ b/node_runner/src/main.rs @@ -1,5 +1,16 @@ -//ToDo: Add node_runner module +use anyhow::Result; -fn main() { - println!("Hello, world!"); +use node_runner::main_runner; + +pub const NUM_THREADS: usize = 4; + +fn main() -> Result<()> { + actix::System::with_tokio_rt(|| { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(NUM_THREADS) + .enable_all() + .build() + .unwrap() + }) + .block_on(main_runner()) } diff --git a/rpc_primitives/Cargo.toml b/rpc_primitives/Cargo.toml new file mode 100644 index 0000000..f015302 --- /dev/null +++ b/rpc_primitives/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "rpc_primitives" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow.workspace = true +serde_json.workspace = true +env_logger.workspace = true +log.workspace = true +serde.workspace = true diff --git a/rpc_primitives/src/errors.rs b/rpc_primitives/src/errors.rs new file mode 100644 index 0000000..f718baf --- /dev/null +++ b/rpc_primitives/src/errors.rs @@ -0,0 +1,185 @@ +use serde_json::{to_value, Value}; +use std::fmt; + +#[derive(serde::Serialize)] +pub struct RpcParseError(pub String); + +/// This struct may be returned from JSON RPC server in case of error +/// It is expected that that this struct has impls From<_> all other RPC errors +/// like [`RpcBlockError`](crate::types::blocks::RpcBlockError) +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)] +#[serde(deny_unknown_fields)] +pub struct RpcError { + #[serde(flatten)] + pub error_struct: Option, + /// Deprecated please use the `error_struct` instead + pub code: i64, + /// Deprecated please use the `error_struct` instead + pub message: String, + /// Deprecated please use the `error_struct` instead + #[serde(skip_serializing_if = "Option::is_none")] + pub data: Option, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)] +#[serde(tag = "name", content = "cause", rename_all = "SCREAMING_SNAKE_CASE")] +pub enum RpcErrorKind { + RequestValidationError(RpcRequestValidationErrorKind), + HandlerError(Value), + InternalError(Value), +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)] +#[serde(tag = "name", content = "info", rename_all = "SCREAMING_SNAKE_CASE")] +pub enum RpcRequestValidationErrorKind { + MethodNotFound { method_name: String }, + ParseError { error_message: String }, +} + +/// A general Server Error +#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq, Clone)] +pub enum ServerError { + Timeout, + Closed, +} + +impl RpcError { + /// A generic constructor. + /// + /// Mostly for completeness, doesn't do anything but filling in the corresponding fields. + pub fn new(code: i64, message: String, data: Option) -> Self { + RpcError { + code, + message, + data, + error_struct: None, + } + } + + /// Create an Invalid Param error. + pub fn invalid_params(data: impl serde::Serialize) -> Self { + let value = match to_value(data) { + Ok(value) => value, + Err(err) => { + return Self::server_error(Some(format!( + "Failed to serialize invalid parameters error: {:?}", + err.to_string() + ))) + } + }; + RpcError::new(-32_602, "Invalid params".to_owned(), Some(value)) + } + + /// Create a server error. + pub fn server_error(e: Option) -> Self { + RpcError::new( + -32_000, + "Server error".to_owned(), + e.map(|v| to_value(v).expect("Must be representable in JSON")), + ) + } + + /// Create a parse error. + pub fn parse_error(e: String) -> Self { + RpcError { + code: -32_700, + message: "Parse error".to_owned(), + data: Some(Value::String(e.clone())), + error_struct: Some(RpcErrorKind::RequestValidationError( + RpcRequestValidationErrorKind::ParseError { error_message: e }, + )), + } + } + + pub fn serialization_error(e: &str) -> Self { + RpcError::new_internal_error(Some(Value::String(e.to_owned())), e) + } + + /// Helper method to define extract `INTERNAL_ERROR` in separate `RpcErrorKind` + /// Returns `HANDLER_ERROR` if the error is not internal one + pub fn new_internal_or_handler_error(error_data: Option, error_struct: Value) -> Self { + if error_struct["name"] == "INTERNAL_ERROR" { + let error_message = match error_struct["info"].get("error_message") { + Some(Value::String(error_message)) => error_message.as_str(), + _ => "InternalError happened during serializing InternalError", + }; + Self::new_internal_error(error_data, error_message) + } else { + Self::new_handler_error(error_data, error_struct) + } + } + + pub fn new_internal_error(error_data: Option, info: &str) -> Self { + RpcError { + code: -32_000, + message: "Server error".to_owned(), + data: error_data, + error_struct: Some(RpcErrorKind::InternalError(serde_json::json!({ + "name": "INTERNAL_ERROR", + "info": serde_json::json!({"error_message": info}) + }))), + } + } + + fn new_handler_error(error_data: Option, error_struct: Value) -> Self { + RpcError { + code: -32_000, + message: "Server error".to_owned(), + data: error_data, + error_struct: Some(RpcErrorKind::HandlerError(error_struct)), + } + } + + /// Create a method not found error. + pub fn method_not_found(method: String) -> Self { + RpcError { + code: -32_601, + message: "Method not found".to_owned(), + data: Some(Value::String(method.clone())), + error_struct: Some(RpcErrorKind::RequestValidationError( + RpcRequestValidationErrorKind::MethodNotFound { + method_name: method, + }, + )), + } + } +} + +impl fmt::Display for RpcError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{self:?}") + } +} + +impl From for RpcError { + fn from(parse_error: RpcParseError) -> Self { + Self::parse_error(parse_error.0) + } +} + +impl From for RpcError { + fn from(_: std::convert::Infallible) -> Self { + unsafe { core::hint::unreachable_unchecked() } + } +} + +impl fmt::Display for ServerError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ServerError::Timeout => write!(f, "ServerError: Timeout"), + ServerError::Closed => write!(f, "ServerError: Closed"), + } + } +} + +impl From for RpcError { + fn from(e: ServerError) -> RpcError { + let error_data = match to_value(&e) { + Ok(value) => value, + Err(_err) => { + return RpcError::new_internal_error(None, "Failed to serialize ServerError") + } + }; + RpcError::new_internal_error(Some(error_data), e.to_string().as_str()) + } +} diff --git a/rpc_primitives/src/lib.rs b/rpc_primitives/src/lib.rs new file mode 100644 index 0000000..5e3d870 --- /dev/null +++ b/rpc_primitives/src/lib.rs @@ -0,0 +1,72 @@ +use std::time::Duration; + +use serde::{Deserialize, Serialize}; + +pub mod errors; +pub mod message; +pub mod parser; + +#[derive(Serialize, Deserialize, Clone, Copy, Debug)] +pub struct RpcPollingConfig { + pub polling_interval: Duration, + pub polling_timeout: Duration, +} + +impl Default for RpcPollingConfig { + fn default() -> Self { + Self { + polling_interval: Duration::from_millis(500), + polling_timeout: Duration::from_secs(10), + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct RpcLimitsConfig { + /// Maximum byte size of the json payload. + pub json_payload_max_size: usize, +} + +impl Default for RpcLimitsConfig { + fn default() -> Self { + Self { + json_payload_max_size: 10 * 1024 * 1024, + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct RpcConfig { + pub addr: String, + pub cors_allowed_origins: Vec, + pub polling_config: RpcPollingConfig, + #[serde(default)] + pub limits_config: RpcLimitsConfig, +} + +impl Default for RpcConfig { + fn default() -> Self { + RpcConfig { + addr: "0.0.0.0:3040".to_owned(), + cors_allowed_origins: vec!["*".to_owned()], + polling_config: RpcPollingConfig::default(), + limits_config: RpcLimitsConfig::default(), + } + } +} + +impl RpcConfig { + pub fn new(addr: &str) -> Self { + RpcConfig { + addr: addr.to_owned(), + ..Default::default() + } + } + + pub fn with_port(port: u16) -> Self { + RpcConfig { + addr: format!("0.0.0.0:{port}"), + ..Default::default() + } + } +} diff --git a/rpc_primitives/src/message.rs b/rpc_primitives/src/message.rs new file mode 100644 index 0000000..523fcca --- /dev/null +++ b/rpc_primitives/src/message.rs @@ -0,0 +1,551 @@ +// Copyright 2017 tokio-jsonrpc Developers +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +//! JSON-RPC 2.0 messages. +//! +//! The main entrypoint here is the [Message](enum.Message.html). The others are just building +//! blocks and you should generally work with `Message` instead. +use serde::de::{Deserializer, Error, Unexpected, Visitor}; +use serde::ser::{SerializeStruct, Serializer}; +use serde_json::{Result as JsonResult, Value}; +use std::fmt::{Formatter, Result as FmtResult}; + +use super::errors::RpcError; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +struct Version; + +impl serde::Serialize for Version { + fn serialize(&self, serializer: S) -> Result { + serializer.serialize_str("2.0") + } +} + +impl<'de> serde::Deserialize<'de> for Version { + fn deserialize>(deserializer: D) -> Result { + struct VersionVisitor; + impl<'de> Visitor<'de> for VersionVisitor { + type Value = Version; + + fn expecting(&self, formatter: &mut Formatter<'_>) -> FmtResult { + formatter.write_str("a version string") + } + + fn visit_str(self, value: &str) -> Result { + match value { + "2.0" => Ok(Version), + _ => Err(E::invalid_value(Unexpected::Str(value), &"value 2.0")), + } + } + } + deserializer.deserialize_str(VersionVisitor) + } +} + +/// An RPC request. +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)] +#[serde(deny_unknown_fields)] +pub struct Request { + jsonrpc: Version, + pub method: String, + #[serde(default, skip_serializing_if = "Value::is_null")] + pub params: Value, + pub id: Value, +} + +impl Request { + /// Answer the request with a (positive) reply. + /// + /// The ID is taken from the request. + pub fn reply(&self, reply: Value) -> Message { + Message::Response(Response { + jsonrpc: Version, + result: Ok(reply), + id: self.id.clone(), + }) + } + /// Answer the request with an error. + pub fn error(&self, error: RpcError) -> Message { + Message::Response(Response { + jsonrpc: Version, + result: Err(error), + id: self.id.clone(), + }) + } +} + +/// A response to an RPC. +/// +/// It is created by the methods on [Request](struct.Request.html). +#[derive(Debug, Clone, PartialEq)] +pub struct Response { + jsonrpc: Version, + pub result: Result, + pub id: Value, +} + +impl serde::Serialize for Response { + fn serialize(&self, serializer: S) -> Result { + let mut sub = serializer.serialize_struct("Response", 3)?; + sub.serialize_field("jsonrpc", &self.jsonrpc)?; + match self.result { + Ok(ref value) => sub.serialize_field("result", value), + Err(ref err) => sub.serialize_field("error", err), + }?; + sub.serialize_field("id", &self.id)?; + sub.end() + } +} + +/// Deserializer for `Option` that produces `Some(Value::Null)`. +/// +/// The usual one produces None in that case. But we need to know the difference between +/// `{x: null}` and `{}`. +fn some_value<'de, D: Deserializer<'de>>(deserializer: D) -> Result, D::Error> { + serde::Deserialize::deserialize(deserializer).map(Some) +} + +/// A helper trick for deserialization. +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +struct WireResponse { + // It is actually used to eat and sanity check the deserialized text + #[allow(dead_code)] + jsonrpc: Version, + // Make sure we accept null as Some(Value::Null), instead of going to None + #[serde(default, deserialize_with = "some_value")] + result: Option, + error: Option, + id: Value, +} + +// Implementing deserialize is hard. We sidestep the difficulty by deserializing a similar +// structure that directly corresponds to whatever is on the wire and then convert it to our more +// convenient representation. +impl<'de> serde::Deserialize<'de> for Response { + fn deserialize>(deserializer: D) -> Result { + let wr: WireResponse = serde::Deserialize::deserialize(deserializer)?; + let result = match (wr.result, wr.error) { + (Some(res), None) => Ok(res), + (None, Some(err)) => Err(err), + _ => { + let err = D::Error::custom("Either 'error' or 'result' is expected, but not both"); + return Err(err); + } + }; + Ok(Response { + jsonrpc: Version, + result, + id: wr.id, + }) + } +} + +/// A notification (doesn't expect an answer). +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)] +#[serde(deny_unknown_fields)] +pub struct Notification { + jsonrpc: Version, + pub method: String, + #[serde(default, skip_serializing_if = "Value::is_null")] + pub params: Value, +} + +/// One message of the JSON RPC protocol. +/// +/// One message, directly mapped from the structures of the protocol. See the +/// [specification](http://www.jsonrpc.org/specification) for more details. +/// +/// Since the protocol allows one endpoint to be both client and server at the same time, the +/// message can decode and encode both directions of the protocol. +/// +/// The `Batch` variant is supposed to be created directly, without a constructor. +/// +/// The `UnmatchedSub` variant is used when a request is an array and some of the subrequests +/// aren't recognized as valid json rpc 2.0 messages. This is never returned as a top-level +/// element, it is returned as `Err(Broken::Unmatched)`. +#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] +#[serde(untagged)] +pub enum Message { + /// An RPC request. + Request(Request), + /// A response to a Request. + Response(Response), + /// A notification. + Notification(Notification), + /// A batch of more requests or responses. + /// + /// The protocol allows bundling multiple requests, notifications or responses to a single + /// message. + /// + /// This variant has no direct constructor and is expected to be constructed manually. + Batch(Vec), + /// An unmatched sub entry in a `Batch`. + /// + /// When there's a `Batch` and an element doesn't comform to the JSONRPC 2.0 format, that one + /// is represented by this. This is never produced as a top-level value when parsing, the + /// `Err(Broken::Unmatched)` is used instead. It is not possible to serialize. + #[serde(skip_serializing)] + UnmatchedSub(Value), +} + +impl Message { + /// A constructor for a request. + /// + /// The ID is auto-set to dontcare. + pub fn request(method: String, params: Value) -> Self { + let id = Value::from("dontcare"); + Message::Request(Request { + jsonrpc: Version, + method, + params, + id, + }) + } + /// Create a top-level error (without an ID). + pub fn error(error: RpcError) -> Self { + Message::Response(Response { + jsonrpc: Version, + result: Err(error), + id: Value::Null, + }) + } + /// A constructor for a notification. + pub fn notification(method: String, params: Value) -> Self { + Message::Notification(Notification { + jsonrpc: Version, + method, + params, + }) + } + /// A constructor for a response. + pub fn response(id: Value, result: Result) -> Self { + Message::Response(Response { + jsonrpc: Version, + result, + id, + }) + } + /// Returns id or Null if there is no id. + pub fn id(&self) -> Value { + match self { + Message::Request(req) => req.id.clone(), + _ => Value::Null, + } + } +} + +/// A broken message. +/// +/// Protocol-level errors. +#[derive(Debug, Clone, PartialEq, serde::Deserialize)] +#[serde(untagged)] +pub enum Broken { + /// It was valid JSON, but doesn't match the form of a JSONRPC 2.0 message. + Unmatched(Value), + /// Invalid JSON. + #[serde(skip_deserializing)] + SyntaxError(String), +} + +impl Broken { + /// Generate an appropriate error message. + /// + /// The error message for these things are specified in the RFC, so this just creates an error + /// with the right values. + pub fn reply(&self) -> Message { + match *self { + Broken::Unmatched(_) => Message::error(RpcError::parse_error( + "JSON RPC Request format was expected".to_owned(), + )), + Broken::SyntaxError(ref e) => Message::error(RpcError::parse_error(e.clone())), + } + } +} + +/// A trick to easily deserialize and detect valid JSON, but invalid Message. +#[derive(serde::Deserialize)] +#[serde(untagged)] +pub enum WireMessage { + Message(Message), + Broken(Broken), +} + +pub fn decoded_to_parsed(res: JsonResult) -> Parsed { + match res { + Ok(WireMessage::Message(Message::UnmatchedSub(value))) => Err(Broken::Unmatched(value)), + Ok(WireMessage::Message(m)) => Ok(m), + Ok(WireMessage::Broken(b)) => Err(b), + Err(e) => Err(Broken::SyntaxError(e.to_string())), + } +} + +pub type Parsed = Result; + +/// Read a [Message](enum.Message.html) from a slice. +/// +/// Invalid JSON or JSONRPC messages are reported as [Broken](enum.Broken.html). +pub fn from_slice(s: &[u8]) -> Parsed { + decoded_to_parsed(::serde_json::de::from_slice(s)) +} + +/// Read a [Message](enum.Message.html) from a string. +/// +/// Invalid JSON or JSONRPC messages are reported as [Broken](enum.Broken.html). +pub fn from_str(s: &str) -> Parsed { + from_slice(s.as_bytes()) +} + +impl From for String { + fn from(val: Message) -> Self { + ::serde_json::ser::to_string(&val).unwrap() + } +} + +impl From for Vec { + fn from(val: Message) -> Self { + ::serde_json::ser::to_vec(&val).unwrap() + } +} + +#[cfg(test)] +mod tests { + use serde_json::de::from_slice; + use serde_json::json; + use serde_json::ser::to_vec; + use serde_json::Value; + + use super::*; + + /// Test serialization and deserialization of the Message + /// + /// We first deserialize it from a string. That way we check deserialization works. + /// But since serialization doesn't have to produce the exact same result (order, spaces, …), + /// we then serialize and deserialize the thing again and check it matches. + #[test] + #[allow(clippy::too_many_lines)] + fn message_serde() { + // A helper for running one message test + fn one(input: &str, expected: &Message) { + let parsed: Message = from_str(input).unwrap(); + assert_eq!(*expected, parsed); + let serialized = to_vec(&parsed).unwrap(); + let deserialized: Message = from_slice(&serialized).unwrap(); + assert_eq!(parsed, deserialized); + } + + // A request without parameters + one( + r#"{"jsonrpc": "2.0", "method": "call", "id": 1}"#, + &Message::Request(Request { + jsonrpc: Version, + method: "call".to_owned(), + params: Value::Null, + id: json!(1), + }), + ); + // A request with parameters + one( + r#"{"jsonrpc": "2.0", "method": "call", "params": [1, 2, 3], "id": 2}"#, + &Message::Request(Request { + jsonrpc: Version, + method: "call".to_owned(), + params: json!([1, 2, 3]), + id: json!(2), + }), + ); + // A notification (with parameters) + one( + r#"{"jsonrpc": "2.0", "method": "notif", "params": {"x": "y"}}"#, + &Message::Notification(Notification { + jsonrpc: Version, + method: "notif".to_owned(), + params: json!({"x": "y"}), + }), + ); + // A successful response + one( + r#"{"jsonrpc": "2.0", "result": 42, "id": 3}"#, + &Message::Response(Response { + jsonrpc: Version, + result: Ok(json!(42)), + id: json!(3), + }), + ); + // A successful response + one( + r#"{"jsonrpc": "2.0", "result": null, "id": 3}"#, + &Message::Response(Response { + jsonrpc: Version, + result: Ok(Value::Null), + id: json!(3), + }), + ); + // An error + one( + r#"{"jsonrpc": "2.0", "error": {"code": 42, "message": "Wrong!"}, "id": null}"#, + &Message::Response(Response { + jsonrpc: Version, + result: Err(RpcError::new(42, "Wrong!".to_owned(), None)), + id: Value::Null, + }), + ); + // A batch + one( + r#"[ + {"jsonrpc": "2.0", "method": "notif"}, + {"jsonrpc": "2.0", "method": "call", "id": 42} + ]"#, + &Message::Batch(vec![ + Message::Notification(Notification { + jsonrpc: Version, + method: "notif".to_owned(), + params: Value::Null, + }), + Message::Request(Request { + jsonrpc: Version, + method: "call".to_owned(), + params: Value::Null, + id: json!(42), + }), + ]), + ); + // Some handling of broken messages inside a batch + let parsed = from_str( + r#"[ + {"jsonrpc": "2.0", "method": "notif"}, + {"jsonrpc": "2.0", "method": "call", "id": 42}, + true + ]"#, + ) + .unwrap(); + assert_eq!( + Message::Batch(vec![ + Message::Notification(Notification { + jsonrpc: Version, + method: "notif".to_owned(), + params: Value::Null, + }), + Message::Request(Request { + jsonrpc: Version, + method: "call".to_owned(), + params: Value::Null, + id: json!(42), + }), + Message::UnmatchedSub(Value::Bool(true)), + ]), + parsed + ); + to_vec(&Message::UnmatchedSub(Value::Null)).unwrap_err(); + } + + /// A helper for the `broken` test. + /// + /// Check that the given JSON string parses, but is not recognized as a valid RPC message. + + /// Test things that are almost but not entirely JSONRPC are rejected + /// + /// The reject is done by returning it as Unmatched. + #[test] + #[allow(clippy::panic)] + fn broken() { + // A helper with one test + fn one(input: &str) { + let msg = from_str(input); + match msg { + Err(Broken::Unmatched(_)) => (), + _ => panic!("{input} recognized as an RPC message: {msg:?}!"), + } + } + + // Missing the version + one(r#"{"method": "notif"}"#); + // Wrong version + one(r#"{"jsonrpc": 2.0, "method": "notif"}"#); + // A response with both result and error + one(r#"{"jsonrpc": "2.0", "result": 42, "error": {"code": 42, "message": "!"}, "id": 1}"#); + // A response without an id + one(r#"{"jsonrpc": "2.0", "result": 42}"#); + // An extra field + one(r#"{"jsonrpc": "2.0", "method": "weird", "params": 42, "others": 43, "id": 2}"#); + // Something completely different + one(r#"{"x": [1, 2, 3]}"#); + + match from_str(r#"{]"#) { + Err(Broken::SyntaxError(_)) => (), + other => panic!("Something unexpected: {other:?}"), + }; + } + + /// Test some non-trivial aspects of the constructors + /// + /// This doesn't have a full coverage, because there's not much to actually test there. + /// Most of it is related to the ids. + #[test] + #[allow(clippy::panic)] + #[ignore] + fn constructors() { + let msg1 = Message::request("call".to_owned(), json!([1, 2, 3])); + let msg2 = Message::request("call".to_owned(), json!([1, 2, 3])); + // They differ, even when created with the same parameters + assert_ne!(msg1, msg2); + // And, specifically, they differ in the ID's + let (req1, req2) = if let (Message::Request(req1), Message::Request(req2)) = (msg1, msg2) { + assert_ne!(req1.id, req2.id); + assert!(req1.id.is_string()); + assert!(req2.id.is_string()); + (req1, req2) + } else { + panic!("Non-request received"); + }; + let id1 = req1.id.clone(); + // When we answer a message, we get the same ID + if let Message::Response(ref resp) = req1.reply(json!([1, 2, 3])) { + assert_eq!( + *resp, + Response { + jsonrpc: Version, + result: Ok(json!([1, 2, 3])), + id: id1 + } + ); + } else { + panic!("Not a response"); + } + let id2 = req2.id.clone(); + // The same with an error + if let Message::Response(ref resp) = + req2.error(RpcError::new(42, "Wrong!".to_owned(), None)) + { + assert_eq!( + *resp, + Response { + jsonrpc: Version, + result: Err(RpcError::new(42, "Wrong!".to_owned(), None)), + id: id2, + } + ); + } else { + panic!("Not a response"); + } + // When we have unmatched, we generate a top-level error with Null id. + if let Message::Response(ref resp) = + Message::error(RpcError::new(43, "Also wrong!".to_owned(), None)) + { + assert_eq!( + *resp, + Response { + jsonrpc: Version, + result: Err(RpcError::new(43, "Also wrong!".to_owned(), None)), + id: Value::Null, + } + ); + } else { + panic!("Not a response"); + } + } +} diff --git a/rpc_primitives/src/parser.rs b/rpc_primitives/src/parser.rs new file mode 100644 index 0000000..fd50ea2 --- /dev/null +++ b/rpc_primitives/src/parser.rs @@ -0,0 +1,27 @@ +use serde::de::DeserializeOwned; +use serde_json::Value; + +use crate::errors::RpcParseError; + +pub trait RpcRequest: Sized { + fn parse(value: Option) -> Result; +} + +pub fn parse_params(value: Option) -> Result { + if let Some(value) = value { + serde_json::from_value(value) + .map_err(|err| RpcParseError(format!("Failed parsing args: {err}"))) + } else { + Err(RpcParseError("Require at least one parameter".to_owned())) + } +} +#[macro_export] +macro_rules! parse_request { + ($request_name:ty) => { + impl RpcRequest for $request_name { + fn parse(value: Option) -> Result { + parse_params::(value) + } + } + }; +} diff --git a/sequencer_rpc/Cargo.toml b/sequencer_rpc/Cargo.toml index e56b40f..b15efe3 100644 --- a/sequencer_rpc/Cargo.toml +++ b/sequencer_rpc/Cargo.toml @@ -9,6 +9,9 @@ serde_json.workspace = true env_logger.workspace = true log.workspace = true serde.workspace = true +actix.workspace = true +actix-cors.workspace = true +futures.workspace = true actix-web.workspace = true @@ -22,4 +25,7 @@ path = "../consensus" path = "../networking" [dependencies.sequencer_core] -path = "../sequencer_core" \ No newline at end of file +path = "../sequencer_core" + +[dependencies.rpc_primitives] +path = "../rpc_primitives" \ No newline at end of file diff --git a/sequencer_rpc/src/lib.rs b/sequencer_rpc/src/lib.rs index 5f44417..71a3b8f 100644 --- a/sequencer_rpc/src/lib.rs +++ b/sequencer_rpc/src/lib.rs @@ -1 +1,39 @@ -//ToDo: Add sequencer_rpc module \ No newline at end of file +pub mod net_utils; +pub mod process; +pub mod types; + +use rpc_primitives::{ + errors::{RpcError, RpcErrorKind}, + RpcPollingConfig, +}; +use serde::Serialize; +use serde_json::Value; + +pub use net_utils::*; + +use self::types::err_rpc::RpcErr; + +//ToDo: Add necessary fields +pub struct JsonHandler { + pub polling_config: RpcPollingConfig, +} + +fn respond(val: T) -> Result { + Ok(serde_json::to_value(val)?) +} + +pub fn rpc_error_responce_inverter(err: RpcError) -> RpcError { + let mut content: Option = None; + if err.error_struct.is_some() { + content = match err.error_struct.clone().unwrap() { + RpcErrorKind::HandlerError(val) | RpcErrorKind::InternalError(val) => Some(val), + RpcErrorKind::RequestValidationError(vall) => Some(serde_json::to_value(vall).unwrap()), + }; + } + RpcError { + error_struct: None, + code: err.code, + message: err.message, + data: content, + } +} diff --git a/sequencer_rpc/src/net_utils.rs b/sequencer_rpc/src/net_utils.rs new file mode 100644 index 0000000..35c46ed --- /dev/null +++ b/sequencer_rpc/src/net_utils.rs @@ -0,0 +1,64 @@ +use std::io; + +use actix_cors::Cors; +use actix_web::{http, middleware, web, App, Error as HttpError, HttpResponse, HttpServer}; +use futures::Future; +use futures::FutureExt; +use log::info; + +use rpc_primitives::message::Message; +use rpc_primitives::RpcConfig; + +use super::JsonHandler; + +pub const SHUTDOWN_TIMEOUT_SECS: u64 = 10; + +fn rpc_handler( + message: web::Json, + handler: web::Data, +) -> impl Future> { + let response = async move { + let message = handler.process(message.0).await?; + Ok(HttpResponse::Ok().json(&message)) + }; + response.boxed() +} + +fn get_cors(cors_allowed_origins: &[String]) -> Cors { + let mut cors = Cors::permissive(); + if cors_allowed_origins != ["*".to_string()] { + for origin in cors_allowed_origins { + cors = cors.allowed_origin(origin); + } + } + cors.allowed_methods(vec!["GET", "POST"]) + .allowed_headers(vec![http::header::AUTHORIZATION, http::header::ACCEPT]) + .allowed_header(http::header::CONTENT_TYPE) + .max_age(3600) +} + +#[allow(clippy::too_many_arguments)] +pub fn new_http_server(config: RpcConfig) -> io::Result { + let RpcConfig { + addr, + cors_allowed_origins, + polling_config, + limits_config, + } = config; + info!(target:"network", "Starting http server at {}", addr); + let handler = web::Data::new(JsonHandler { polling_config }); + + // HTTP server + Ok(HttpServer::new(move || { + App::new() + .wrap(get_cors(&cors_allowed_origins)) + .app_data(handler.clone()) + .app_data(web::JsonConfig::default().limit(limits_config.json_payload_max_size)) + .wrap(middleware::Logger::default()) + .service(web::resource("/").route(web::post().to(rpc_handler))) + }) + .bind(addr)? + .shutdown_timeout(SHUTDOWN_TIMEOUT_SECS) + .disable_signals() + .run()) +} diff --git a/sequencer_rpc/src/process.rs b/sequencer_rpc/src/process.rs new file mode 100644 index 0000000..dd6ebdf --- /dev/null +++ b/sequencer_rpc/src/process.rs @@ -0,0 +1,53 @@ +use actix_web::Error as HttpError; +use serde_json::Value; + +use rpc_primitives::{ + errors::RpcError, + message::{Message, Request}, + parser::RpcRequest, +}; + +use crate::{ + rpc_error_responce_inverter, + types::rpc_structs::{HelloRequest, HelloResponse}, +}; + +use super::{respond, types::err_rpc::RpcErr, JsonHandler}; + +impl JsonHandler { + pub async fn process(&self, message: Message) -> Result { + let id = message.id(); + if let Message::Request(request) = message { + let message_inner = self + .process_request_internal(request) + .await + .map_err(|e| e.0) + .map_err(rpc_error_responce_inverter); + Ok(Message::response(id, message_inner)) + } else { + Ok(Message::error(RpcError::parse_error( + "JSON RPC Request format was expected".to_owned(), + ))) + } + } + + #[allow(clippy::unused_async)] + ///Example of request processing + async fn process_temp_hello(&self, request: Request) -> Result { + let _hello_request = HelloRequest::parse(Some(request.params))?; + + let helperstruct = HelloResponse { + greeting: "HELLO_FROM_SEQUENCER".to_string(), + }; + + respond(helperstruct) + } + + pub async fn process_request_internal(&self, request: Request) -> Result { + match request.method.as_ref() { + //Todo : Add handling of more JSON RPC methods + "hello" => self.process_temp_hello(request).await, + _ => Err(RpcErr(RpcError::method_not_found(request.method))), + } + } +} diff --git a/sequencer_rpc/src/types/err_rpc.rs b/sequencer_rpc/src/types/err_rpc.rs new file mode 100644 index 0000000..f5998d2 --- /dev/null +++ b/sequencer_rpc/src/types/err_rpc.rs @@ -0,0 +1,47 @@ +use log::debug; + +use rpc_primitives::errors::{RpcError, RpcParseError}; + +pub struct RpcErr(pub RpcError); + +pub type RpcErrInternal = anyhow::Error; + +pub trait RpcErrKind: 'static { + fn into_rpc_err(self) -> RpcError; +} + +impl From for RpcErr { + fn from(e: T) -> Self { + Self(e.into_rpc_err()) + } +} + +macro_rules! standard_rpc_err_kind { + ($type_name:path) => { + impl RpcErrKind for $type_name { + fn into_rpc_err(self) -> RpcError { + self.into() + } + } + }; +} +standard_rpc_err_kind!(RpcError); +standard_rpc_err_kind!(RpcParseError); + +impl RpcErrKind for serde_json::Error { + fn into_rpc_err(self) -> RpcError { + RpcError::serialization_error(&self.to_string()) + } +} + +impl RpcErrKind for RpcErrInternal { + fn into_rpc_err(self) -> RpcError { + RpcError::new_internal_error(None, &format!("{self:#?}")) + } +} + +#[allow(clippy::needless_pass_by_value)] +pub fn from_rpc_err_into_anyhow_err(rpc_err: RpcError) -> anyhow::Error { + debug!("Rpc error cast to anyhow error : err {rpc_err:?}"); + anyhow::anyhow!(format!("{rpc_err:#?}")) +} diff --git a/sequencer_rpc/src/types/mod.rs b/sequencer_rpc/src/types/mod.rs new file mode 100644 index 0000000..6084ddd --- /dev/null +++ b/sequencer_rpc/src/types/mod.rs @@ -0,0 +1,3 @@ +pub mod err_rpc; +pub mod parse; +pub mod rpc_structs; diff --git a/sequencer_rpc/src/types/parse.rs b/sequencer_rpc/src/types/parse.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/sequencer_rpc/src/types/parse.rs @@ -0,0 +1 @@ + diff --git a/sequencer_rpc/src/types/rpc_structs.rs b/sequencer_rpc/src/types/rpc_structs.rs new file mode 100644 index 0000000..4d7a2c4 --- /dev/null +++ b/sequencer_rpc/src/types/rpc_structs.rs @@ -0,0 +1,16 @@ +use rpc_primitives::errors::RpcParseError; +use rpc_primitives::parse_request; +use rpc_primitives::parser::parse_params; +use rpc_primitives::parser::RpcRequest; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Serialize, Deserialize, Debug)] +pub struct HelloRequest {} + +parse_request!(HelloRequest); + +#[derive(Serialize, Deserialize, Debug)] +pub struct HelloResponse { + pub greeting: String, +} diff --git a/sequencer_runner/Cargo.toml b/sequencer_runner/Cargo.toml index 6356254..f2d5518 100644 --- a/sequencer_runner/Cargo.toml +++ b/sequencer_runner/Cargo.toml @@ -9,6 +9,7 @@ serde_json.workspace = true env_logger.workspace = true log.workspace = true serde.workspace = true +actix.workspace = true actix-web.workspace = true tokio.workspace = true @@ -26,4 +27,7 @@ path = "../networking" path = "../sequencer_rpc" [dependencies.sequencer_core] -path = "../sequencer_core" \ No newline at end of file +path = "../sequencer_core" + +[dependencies.rpc_primitives] +path = "../rpc_primitives" \ No newline at end of file diff --git a/sequencer_runner/src/lib.rs b/sequencer_runner/src/lib.rs new file mode 100644 index 0000000..181b8cf --- /dev/null +++ b/sequencer_runner/src/lib.rs @@ -0,0 +1,17 @@ +use anyhow::Result; +use log::info; +use rpc_primitives::RpcConfig; +use sequencer_rpc::new_http_server; + +pub async fn main_runner() -> Result<()> { + env_logger::init(); + + let http_server = new_http_server(RpcConfig::default())?; + info!("HTTP server started"); + let _http_server_handle = http_server.handle(); + tokio::spawn(http_server); + + loop { + //ToDo: Insert activity into main loop + } +} diff --git a/sequencer_runner/src/main.rs b/sequencer_runner/src/main.rs index 627f016..9de8835 100644 --- a/sequencer_runner/src/main.rs +++ b/sequencer_runner/src/main.rs @@ -1,5 +1,16 @@ -//ToDo: Add sequencer_runner module +use anyhow::Result; -fn main() { - println!("Hello, world!"); +use sequencer_runner::main_runner; + +pub const NUM_THREADS: usize = 4; + +fn main() -> Result<()> { + actix::System::with_tokio_rt(|| { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(NUM_THREADS) + .enable_all() + .build() + .unwrap() + }) + .block_on(main_runner()) } diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 83188f9..ee5bbfa 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -1 +1 @@ -//ToDo: Add storage module \ No newline at end of file +//ToDo: Add storage module diff --git a/utxo/src/lib.rs b/utxo/src/lib.rs index c516dd1..08193b8 100644 --- a/utxo/src/lib.rs +++ b/utxo/src/lib.rs @@ -1 +1 @@ -//ToDo: Add utxo module \ No newline at end of file +//ToDo: Add utxo module diff --git a/zkvm/src/lib.rs b/zkvm/src/lib.rs index 2ef34a8..3216739 100644 --- a/zkvm/src/lib.rs +++ b/zkvm/src/lib.rs @@ -1 +1 @@ -//ToDo: Add zkvm module \ No newline at end of file +//ToDo: Add zkvm module From b6c3556f11c416aff688544f3859993492f374f2 Mon Sep 17 00:00:00 2001 From: Oleksandr Pravdyvyi Date: Wed, 2 Oct 2024 14:46:20 +0300 Subject: [PATCH 2/2] fix: Cargo.lock added --- Cargo.lock | 158 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 158 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 904c394..71f4600 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13,6 +13,31 @@ dependencies = [ "serde_json", ] +[[package]] +name = "actix" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de7fa236829ba0841304542f7614c42b80fca007455315c45c785ccfa873a85b" +dependencies = [ + "actix-macros", + "actix-rt", + "actix_derive", + "bitflags 2.6.0", + "bytes", + "crossbeam-channel", + "futures-core", + "futures-sink", + "futures-task", + "futures-util", + "log", + "once_cell", + "parking_lot", + "pin-project-lite", + "smallvec", + "tokio", + "tokio-util", +] + [[package]] name = "actix-codec" version = "0.5.2" @@ -30,6 +55,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "actix-cors" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0346d8c1f762b41b458ed3145eea914966bb9ad20b9be0d6d463b20d45586370" +dependencies = [ + "actix-utils", + "actix-web", + "derive_more", + "futures-util", + "log", + "once_cell", + "smallvec", +] + [[package]] name = "actix-http" version = "3.9.0" @@ -66,6 +106,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "actix-macros" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "actix-router" version = "0.5.3" @@ -166,6 +216,17 @@ dependencies = [ "url", ] +[[package]] +name = "actix_derive" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6ac1e58cded18cb28ddc17143c4dea5345b3ad575e14f32f66e4054a56eb271" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "addr2line" version = "0.24.1" @@ -385,6 +446,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crypto-common" version = "0.1.6" @@ -470,12 +546,65 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -494,10 +623,16 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -815,13 +950,17 @@ name = "node_rpc" version = "0.1.0" dependencies = [ "accounts", + "actix", + "actix-cors", "actix-web", "anyhow", "consensus", "env_logger", + "futures", "log", "networking", "node_core", + "rpc_primitives", "serde", "serde_json", "storage", @@ -835,6 +974,7 @@ name = "node_runner" version = "0.1.0" dependencies = [ "accounts", + "actix", "actix-web", "anyhow", "consensus", @@ -843,6 +983,7 @@ dependencies = [ "networking", "node_core", "node_rpc", + "rpc_primitives", "serde", "serde_json", "storage", @@ -1078,6 +1219,17 @@ dependencies = [ "librocksdb-sys", ] +[[package]] +name = "rpc_primitives" +version = "0.1.0" +dependencies = [ + "anyhow", + "env_logger", + "log", + "serde", + "serde_json", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -1132,13 +1284,17 @@ dependencies = [ name = "sequencer_rpc" version = "0.1.0" dependencies = [ + "actix", + "actix-cors", "actix-web", "anyhow", "consensus", "env_logger", + "futures", "log", "mempool", "networking", + "rpc_primitives", "sequencer_core", "serde", "serde_json", @@ -1148,6 +1304,7 @@ dependencies = [ name = "sequencer_runner" version = "0.1.0" dependencies = [ + "actix", "actix-web", "anyhow", "consensus", @@ -1155,6 +1312,7 @@ dependencies = [ "log", "mempool", "networking", + "rpc_primitives", "sequencer_core", "sequencer_rpc", "serde",