From 4981c724af41c831ce1e4f02232035810f4374f0 Mon Sep 17 00:00:00 2001 From: gusto Date: Mon, 6 Mar 2023 15:19:27 +0200 Subject: [PATCH] Http status codes (#88) * Include status codes in http error responses * Mockpool bridges error handling * Last TX in milliseconds in mempool metrics * u64 for last tx metrics --- .gitignore | 1 + config.yml.example | 1 + nodes/mockpool-node/Cargo.toml | 2 + nodes/mockpool-node/src/bridges.rs | 220 +++++++++++------- nomos-services/http/Cargo.toml | 1 + nomos-services/http/examples/axum.rs | 2 +- nomos-services/http/examples/graphql.rs | 2 +- nomos-services/http/src/backends/axum.rs | 12 +- nomos-services/http/src/http.rs | 5 +- .../mempool/src/backend/mockpool.rs | 14 +- nomos-services/mempool/src/backend/mod.rs | 1 + nomos-services/mempool/src/lib.rs | 2 + .../metrics/src/frontend/graphql/mod.rs | 2 +- 13 files changed, 177 insertions(+), 88 deletions(-) diff --git a/.gitignore b/.gitignore index 1bdce877..7fe3190a 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ Cargo.lock # These are backup files generated by rustfmt **/*.rs.bk config.yml +store.* diff --git a/config.yml.example b/config.yml.example index 99001893..7ff030c8 100644 --- a/config.yml.example +++ b/config.yml.example @@ -10,6 +10,7 @@ network: nodeKey: null discV5BootstrapNodes: [] initial_peers: [] + relayTopics: [] http: backend: address: 0.0.0.0:8080 diff --git a/nodes/mockpool-node/Cargo.toml b/nodes/mockpool-node/Cargo.toml index 6d080eeb..fdaed7e2 100644 --- a/nodes/mockpool-node/Cargo.toml +++ b/nodes/mockpool-node/Cargo.toml @@ -8,9 +8,11 @@ edition = "2021" [dependencies] blake2 = "0.10" bincode = "2.0.0-rc.2" +bytes = "1.3" clap = { version = "4", features = ["derive"] } chrono = "0.4" futures = "0.3" +http = "0.2.9" overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" } tracing = "0.1" diff --git a/nodes/mockpool-node/src/bridges.rs b/nodes/mockpool-node/src/bridges.rs index 3c42eabb..70c584a9 100644 --- a/nodes/mockpool-node/src/bridges.rs +++ b/nodes/mockpool-node/src/bridges.rs @@ -1,7 +1,10 @@ // std // crates +use bytes::Bytes; +use http::StatusCode; +use tokio::sync::mpsc::Sender; use tokio::sync::oneshot; -use tracing::debug; +use tracing::error; // internal use crate::tx::{Tx, TxId}; use futures::future::join_all; @@ -9,7 +12,7 @@ use multiaddr::Multiaddr; use nomos_core::wire; use nomos_http::backends::axum::AxumBackend; use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner}; -use nomos_http::http::{HttpMethod, HttpRequest}; +use nomos_http::http::{HttpMethod, HttpRequest, HttpResponse}; use nomos_mempool::backend::mockpool::MockPool; use nomos_mempool::network::adapters::waku::{ WakuAdapter, WAKU_CARNOT_PUB_SUB_TOPIC, WAKU_CARNOT_TX_CONTENT_TOPIC, @@ -35,19 +38,9 @@ pub fn mempool_metrics_bridge( .unwrap(); while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await { - let (sender, receiver) = oneshot::channel(); - mempool_channel - .send(MempoolMsg::Metrics { - reply_channel: sender, - }) - .await - .unwrap(); - let metrics: MempoolMetrics = receiver.await.unwrap(); - res_tx - // TODO: use serde to serialize metrics - .send(format!("{{\"pending_tx\": {}}}", metrics.pending_txs).into()) - .await - .unwrap(); + if let Err(e) = handle_metrics_req(&mempool_channel, res_tx).await { + error!(e); + } } Ok(()) })) @@ -68,38 +61,13 @@ pub fn mempool_add_tx_bridge( ) .await .unwrap(); + while let Some(HttpRequest { res_tx, payload, .. }) = http_request_channel.recv().await { - if let Some(data) = payload - .as_ref() - .and_then(|b| String::from_utf8(b.to_vec()).ok()) - { - let tx = Tx(data); - let (sender, receiver) = oneshot::channel(); - mempool_channel - .send(MempoolMsg::AddTx { - tx: tx.clone(), - reply_channel: sender, - }) - .await - .unwrap(); - if let Ok(()) = receiver.await.unwrap() { - // broadcast transaction to peers - let network_relay = handle - .relay::>() - .connect() - .await - .unwrap(); - send_transaction(network_relay, tx).await; - res_tx.send(b"".to_vec().into()).await.unwrap(); - } - } else { - debug!( - "Invalid payload, {:?}. Empty or couldn't transform into a utf8 String", - payload - ); + if let Err(e) = handle_add_tx_req(&handle, &mempool_channel, res_tx, payload).await { + error!(e); } } @@ -120,22 +88,9 @@ pub fn waku_info_bridge( .unwrap(); while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await { - let (sender, receiver) = oneshot::channel(); - waku_channel - .send(NetworkMsg::Process(WakuBackendMessage::Info { - reply_channel: sender, - })) - .await - .unwrap(); - let waku_info: WakuInfo = receiver.await.unwrap(); - res_tx - .send( - serde_json::to_vec(&waku_info) - .expect("Serializing of waku info message should not fail") - .into(), - ) - .await - .unwrap(); + if let Err(e) = handle_info_req(&waku_channel, &res_tx).await { + error!(e); + } } Ok(()) })) @@ -157,31 +112,136 @@ pub fn waku_add_conn_bridge( res_tx, payload, .. }) = http_request_channel.recv().await { - if let Some(payload) = payload { - if let Ok(addrs) = serde_json::from_slice::>(&payload) { - let reqs: Vec<_> = addrs - .into_iter() - .map(|addr| { - waku_channel.send(NetworkMsg::Process( - WakuBackendMessage::ConnectPeer { addr }, - )) - }) - .collect(); - - join_all(reqs).await; - } - res_tx.send(b"".to_vec().into()).await.unwrap(); - } else { - debug!("Invalid payload, {:?}. Should not be empty", payload); + if let Err(e) = handle_add_conn_req(&waku_channel, res_tx, payload).await { + error!(e); } } Ok(()) })) } -async fn send_transaction(network_relay: OutboundRelay>, tx: Tx) { +async fn handle_metrics_req( + mempool_channel: &OutboundRelay>, + res_tx: Sender, +) -> Result<(), overwatch_rs::DynError> { + let (sender, receiver) = oneshot::channel(); + mempool_channel + .send(MempoolMsg::Metrics { + reply_channel: sender, + }) + .await + .map_err(|(e, _)| e)?; + + let metrics: MempoolMetrics = receiver.await.unwrap(); + res_tx + // TODO: use serde to serialize metrics + .send(Ok(format!( + "{{\"pending_tx\": {}, \"last_tx\": {}}}", + metrics.pending_txs, metrics.last_tx_timestamp + ) + .into())) + .await?; + + Ok(()) +} + +async fn handle_add_tx_req( + handle: &overwatch_rs::overwatch::handle::OverwatchHandle, + mempool_channel: &OutboundRelay>, + res_tx: Sender, + payload: Option, +) -> Result<(), overwatch_rs::DynError> { + if let Some(data) = payload + .as_ref() + .and_then(|b| String::from_utf8(b.to_vec()).ok()) + { + let tx = Tx(data); + let (sender, receiver) = oneshot::channel(); + mempool_channel + .send(MempoolMsg::AddTx { + tx: tx.clone(), + reply_channel: sender, + }) + .await + .map_err(|(e, _)| e)?; + + match receiver.await { + Ok(Ok(())) => { + // broadcast transaction to peers + let network_relay = handle.relay::>().connect().await?; + send_transaction(network_relay, tx).await?; + Ok(res_tx.send(Ok(b"".to_vec().into())).await?) + } + Ok(Err(())) => Ok(res_tx + .send(Err(( + StatusCode::CONFLICT, + "error: unable to add tx".into(), + ))) + .await?), + Err(err) => Ok(res_tx + .send(Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))) + .await?), + } + } else { + Err( + format!("Invalid payload, {payload:?}. Empty or couldn't transform into a utf8 String") + .into(), + ) + } +} + +async fn handle_info_req( + waku_channel: &OutboundRelay>, + res_tx: &Sender, +) -> Result<(), overwatch_rs::DynError> { + let (sender, receiver) = oneshot::channel(); + waku_channel + .send(NetworkMsg::Process(WakuBackendMessage::Info { + reply_channel: sender, + })) + .await + .map_err(|(e, _)| e)?; + let waku_info: WakuInfo = receiver.await.unwrap(); + res_tx + .send(Ok(serde_json::to_vec(&waku_info)?.into())) + .await?; + + Ok(()) +} + +async fn handle_add_conn_req( + waku_channel: &OutboundRelay>, + res_tx: Sender, + payload: Option, +) -> Result<(), overwatch_rs::DynError> { + if let Some(payload) = payload { + if let Ok(addrs) = serde_json::from_slice::>(&payload) { + let reqs: Vec<_> = addrs + .into_iter() + .map(|addr| { + waku_channel.send(NetworkMsg::Process(WakuBackendMessage::ConnectPeer { + addr, + })) + }) + .collect(); + + join_all(reqs).await; + } + Ok(res_tx.send(Ok(b"".to_vec().into())).await?) + } else { + Err( + format!("Invalid payload, {payload:?}. Empty or couldn't transform into a utf8 String") + .into(), + ) + } +} + +async fn send_transaction( + network_relay: OutboundRelay>, + tx: Tx, +) -> Result<(), overwatch_rs::DynError> { let payload = wire::serialize(&tx).expect("Tx serialization failed"); - if let Err((_, _e)) = network_relay + network_relay .send(NetworkMsg::Process(WakuBackendMessage::Broadcast { message: WakuMessage::new( payload, @@ -192,7 +252,7 @@ async fn send_transaction(network_relay: OutboundRelay>, tx: Tx topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()), })) .await - { - todo!("log error"); - }; + .map_err(|(e, _)| e)?; + + Ok(()) } diff --git a/nomos-services/http/Cargo.toml b/nomos-services/http/Cargo.toml index 14aea1d3..b6f8fd6c 100644 --- a/nomos-services/http/Cargo.toml +++ b/nomos-services/http/Cargo.toml @@ -23,6 +23,7 @@ async-graphql = { version = "=5.0.5", optional = true } bytes = "1.3" clap = { version = "4", features = ["derive", "env"], optional = true } futures = "0.3" +http = "0.2.9" hyper = { version = "0.14", optional = true } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" } diff --git a/nomos-services/http/examples/axum.rs b/nomos-services/http/examples/axum.rs index cd5f20a0..82583577 100644 --- a/nomos-services/http/examples/axum.rs +++ b/nomos-services/http/examples/axum.rs @@ -94,7 +94,7 @@ where .unwrap(); let value = receiver.await.unwrap(); res_tx - .send(format!("Hello, world! {value}").into()) + .send(Ok(format!("Hello, world! {value}").into())) .await .unwrap(); } diff --git a/nomos-services/http/examples/graphql.rs b/nomos-services/http/examples/graphql.rs index 6dd72bff..9b9aca26 100644 --- a/nomos-services/http/examples/graphql.rs +++ b/nomos-services/http/examples/graphql.rs @@ -71,7 +71,7 @@ where } }; - res_tx.send(res.into()).await.unwrap(); + res_tx.send(Ok(res.into())).await.unwrap(); } Ok(()) })) diff --git a/nomos-services/http/src/backends/axum.rs b/nomos-services/http/src/backends/axum.rs index b9548c14..4b103c67 100644 --- a/nomos-services/http/src/backends/axum.rs +++ b/nomos-services/http/src/backends/axum.rs @@ -9,6 +9,7 @@ use axum::{ routing::{get, patch, post, put}, Router, }; +use http::StatusCode; use hyper::{ header::{CONTENT_TYPE, USER_AGENT}, Body, Request, @@ -145,7 +146,7 @@ async fn handle_req( req_stream: Sender, query: HashMap, payload: Option, -) -> Result { +) -> Result { let (tx, mut rx) = tokio::sync::mpsc::channel(1); match req_stream .send(HttpRequest { @@ -155,7 +156,12 @@ async fn handle_req( }) .await { - Ok(_) => rx.recv().await.ok_or_else(|| "".into()), - Err(e) => Err(AxumBackendError::SendError(e).to_string()), + Ok(_) => rx.recv().await.ok_or_else(|| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + "closed response channel".into(), + ) + })?, + Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())), } } diff --git a/nomos-services/http/src/http.rs b/nomos-services/http/src/http.rs index f2619350..0d0f54f8 100644 --- a/nomos-services/http/src/http.rs +++ b/nomos-services/http/src/http.rs @@ -7,6 +7,7 @@ use std::{ }; // crates use bytes::Bytes; +use http::StatusCode; use overwatch_rs::services::{ handle::ServiceStateHandle, relay::{InboundRelay, OutboundRelay, RelayMessage}, @@ -61,11 +62,13 @@ impl core::fmt::Debug for Route { } } +pub type HttpResponse = Result; + #[derive(Debug, Clone)] pub struct HttpRequest { pub query: HashMap, pub payload: Option, - pub res_tx: Sender, + pub res_tx: Sender, } // HttpMsg is a message that is sent via the relay to communicate with diff --git a/nomos-services/mempool/src/backend/mockpool.rs b/nomos-services/mempool/src/backend/mockpool.rs index 34822e4e..0ea679da 100644 --- a/nomos-services/mempool/src/backend/mockpool.rs +++ b/nomos-services/mempool/src/backend/mockpool.rs @@ -1,7 +1,8 @@ // std use linked_hash_map::LinkedHashMap; -use std::collections::BTreeMap; use std::hash::Hash; +use std::time::SystemTime; +use std::{collections::BTreeMap, time::UNIX_EPOCH}; // crates // internal use crate::backend::{MemPool, MempoolError}; @@ -12,6 +13,7 @@ pub struct MockPool { pending_txs: LinkedHashMap, in_block_txs: BTreeMap>, in_block_txs_by_id: BTreeMap, + last_tx_timestamp: u64, } impl Default for MockPool @@ -23,6 +25,7 @@ where pending_txs: LinkedHashMap::new(), in_block_txs: BTreeMap::new(), in_block_txs_by_id: BTreeMap::new(), + last_tx_timestamp: 0, } } } @@ -55,6 +58,11 @@ where return Err(MempoolError::ExistingTx); } self.pending_txs.insert(id, tx); + self.last_tx_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + Ok(()) } @@ -87,4 +95,8 @@ where fn pending_tx_count(&self) -> usize { self.pending_txs.len() } + + fn last_tx_timestamp(&self) -> u64 { + self.last_tx_timestamp + } } diff --git a/nomos-services/mempool/src/backend/mod.rs b/nomos-services/mempool/src/backend/mod.rs index 8a376835..6ec1a06c 100644 --- a/nomos-services/mempool/src/backend/mod.rs +++ b/nomos-services/mempool/src/backend/mod.rs @@ -37,4 +37,5 @@ pub trait MemPool { fn prune(&mut self, txs: &[Self::Id]); fn pending_tx_count(&self) -> usize; + fn last_tx_timestamp(&self) -> u64; } diff --git a/nomos-services/mempool/src/lib.rs b/nomos-services/mempool/src/lib.rs index ffb86025..359dbeed 100644 --- a/nomos-services/mempool/src/lib.rs +++ b/nomos-services/mempool/src/lib.rs @@ -34,6 +34,7 @@ where pub struct MempoolMetrics { pub pending_txs: usize, + pub last_tx_timestamp: u64, } pub enum MempoolMsg { @@ -155,6 +156,7 @@ where MempoolMsg::Metrics { reply_channel } => { let metrics = MempoolMetrics { pending_txs: pool.pending_tx_count(), + last_tx_timestamp: pool.last_tx_timestamp(), }; reply_channel.send(metrics).unwrap_or_else(|_| { tracing::debug!("could not send back mempool metrics") diff --git a/nomos-services/metrics/src/frontend/graphql/mod.rs b/nomos-services/metrics/src/frontend/graphql/mod.rs index 8b7461ed..39ffd60f 100644 --- a/nomos-services/metrics/src/frontend/graphql/mod.rs +++ b/nomos-services/metrics/src/frontend/graphql/mod.rs @@ -73,7 +73,7 @@ where } }; - res_tx.send(res.into()).await.unwrap(); + res_tx.send(Ok(res.into())).await.unwrap(); } Ok(()) }))