Http status codes (#88)

* Include status codes in http error responses

* Mockpool bridges error handling

* Last TX in milliseconds in mempool metrics

* u64 for last tx metrics
This commit is contained in:
gusto 2023-03-06 15:19:27 +02:00 committed by GitHub
parent 42ea8f9be3
commit 4981c724af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 177 additions and 88 deletions

1
.gitignore vendored
View File

@ -9,3 +9,4 @@ Cargo.lock
# These are backup files generated by rustfmt
**/*.rs.bk
config.yml
store.*

View File

@ -10,6 +10,7 @@ network:
nodeKey: null
discV5BootstrapNodes: []
initial_peers: []
relayTopics: []
http:
backend:
address: 0.0.0.0:8080

View File

@ -8,9 +8,11 @@ edition = "2021"
[dependencies]
blake2 = "0.10"
bincode = "2.0.0-rc.2"
bytes = "1.3"
clap = { version = "4", features = ["derive"] }
chrono = "0.4"
futures = "0.3"
http = "0.2.9"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
tracing = "0.1"

View File

@ -1,7 +1,10 @@
// std
// crates
use bytes::Bytes;
use http::StatusCode;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use tracing::debug;
use tracing::error;
// internal
use crate::tx::{Tx, TxId};
use futures::future::join_all;
@ -9,7 +12,7 @@ use multiaddr::Multiaddr;
use nomos_core::wire;
use nomos_http::backends::axum::AxumBackend;
use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner};
use nomos_http::http::{HttpMethod, HttpRequest};
use nomos_http::http::{HttpMethod, HttpRequest, HttpResponse};
use nomos_mempool::backend::mockpool::MockPool;
use nomos_mempool::network::adapters::waku::{
WakuAdapter, WAKU_CARNOT_PUB_SUB_TOPIC, WAKU_CARNOT_TX_CONTENT_TOPIC,
@ -35,19 +38,9 @@ pub fn mempool_metrics_bridge(
.unwrap();
while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await {
let (sender, receiver) = oneshot::channel();
mempool_channel
.send(MempoolMsg::Metrics {
reply_channel: sender,
})
.await
.unwrap();
let metrics: MempoolMetrics = receiver.await.unwrap();
res_tx
// TODO: use serde to serialize metrics
.send(format!("{{\"pending_tx\": {}}}", metrics.pending_txs).into())
.await
.unwrap();
if let Err(e) = handle_metrics_req(&mempool_channel, res_tx).await {
error!(e);
}
}
Ok(())
}))
@ -68,38 +61,13 @@ pub fn mempool_add_tx_bridge(
)
.await
.unwrap();
while let Some(HttpRequest {
res_tx, payload, ..
}) = http_request_channel.recv().await
{
if let Some(data) = payload
.as_ref()
.and_then(|b| String::from_utf8(b.to_vec()).ok())
{
let tx = Tx(data);
let (sender, receiver) = oneshot::channel();
mempool_channel
.send(MempoolMsg::AddTx {
tx: tx.clone(),
reply_channel: sender,
})
.await
.unwrap();
if let Ok(()) = receiver.await.unwrap() {
// broadcast transaction to peers
let network_relay = handle
.relay::<NetworkService<Waku>>()
.connect()
.await
.unwrap();
send_transaction(network_relay, tx).await;
res_tx.send(b"".to_vec().into()).await.unwrap();
}
} else {
debug!(
"Invalid payload, {:?}. Empty or couldn't transform into a utf8 String",
payload
);
if let Err(e) = handle_add_tx_req(&handle, &mempool_channel, res_tx, payload).await {
error!(e);
}
}
@ -120,22 +88,9 @@ pub fn waku_info_bridge(
.unwrap();
while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await {
let (sender, receiver) = oneshot::channel();
waku_channel
.send(NetworkMsg::Process(WakuBackendMessage::Info {
reply_channel: sender,
}))
.await
.unwrap();
let waku_info: WakuInfo = receiver.await.unwrap();
res_tx
.send(
serde_json::to_vec(&waku_info)
.expect("Serializing of waku info message should not fail")
.into(),
)
.await
.unwrap();
if let Err(e) = handle_info_req(&waku_channel, &res_tx).await {
error!(e);
}
}
Ok(())
}))
@ -157,31 +112,136 @@ pub fn waku_add_conn_bridge(
res_tx, payload, ..
}) = http_request_channel.recv().await
{
if let Some(payload) = payload {
if let Ok(addrs) = serde_json::from_slice::<Vec<Multiaddr>>(&payload) {
let reqs: Vec<_> = addrs
.into_iter()
.map(|addr| {
waku_channel.send(NetworkMsg::Process(
WakuBackendMessage::ConnectPeer { addr },
))
})
.collect();
join_all(reqs).await;
}
res_tx.send(b"".to_vec().into()).await.unwrap();
} else {
debug!("Invalid payload, {:?}. Should not be empty", payload);
if let Err(e) = handle_add_conn_req(&waku_channel, res_tx, payload).await {
error!(e);
}
}
Ok(())
}))
}
async fn send_transaction(network_relay: OutboundRelay<NetworkMsg<Waku>>, tx: Tx) {
async fn handle_metrics_req(
mempool_channel: &OutboundRelay<MempoolMsg<Tx, TxId>>,
res_tx: Sender<HttpResponse>,
) -> Result<(), overwatch_rs::DynError> {
let (sender, receiver) = oneshot::channel();
mempool_channel
.send(MempoolMsg::Metrics {
reply_channel: sender,
})
.await
.map_err(|(e, _)| e)?;
let metrics: MempoolMetrics = receiver.await.unwrap();
res_tx
// TODO: use serde to serialize metrics
.send(Ok(format!(
"{{\"pending_tx\": {}, \"last_tx\": {}}}",
metrics.pending_txs, metrics.last_tx_timestamp
)
.into()))
.await?;
Ok(())
}
async fn handle_add_tx_req(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
mempool_channel: &OutboundRelay<MempoolMsg<Tx, TxId>>,
res_tx: Sender<HttpResponse>,
payload: Option<Bytes>,
) -> Result<(), overwatch_rs::DynError> {
if let Some(data) = payload
.as_ref()
.and_then(|b| String::from_utf8(b.to_vec()).ok())
{
let tx = Tx(data);
let (sender, receiver) = oneshot::channel();
mempool_channel
.send(MempoolMsg::AddTx {
tx: tx.clone(),
reply_channel: sender,
})
.await
.map_err(|(e, _)| e)?;
match receiver.await {
Ok(Ok(())) => {
// broadcast transaction to peers
let network_relay = handle.relay::<NetworkService<Waku>>().connect().await?;
send_transaction(network_relay, tx).await?;
Ok(res_tx.send(Ok(b"".to_vec().into())).await?)
}
Ok(Err(())) => Ok(res_tx
.send(Err((
StatusCode::CONFLICT,
"error: unable to add tx".into(),
)))
.await?),
Err(err) => Ok(res_tx
.send(Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())))
.await?),
}
} else {
Err(
format!("Invalid payload, {payload:?}. Empty or couldn't transform into a utf8 String")
.into(),
)
}
}
async fn handle_info_req(
waku_channel: &OutboundRelay<NetworkMsg<Waku>>,
res_tx: &Sender<HttpResponse>,
) -> Result<(), overwatch_rs::DynError> {
let (sender, receiver) = oneshot::channel();
waku_channel
.send(NetworkMsg::Process(WakuBackendMessage::Info {
reply_channel: sender,
}))
.await
.map_err(|(e, _)| e)?;
let waku_info: WakuInfo = receiver.await.unwrap();
res_tx
.send(Ok(serde_json::to_vec(&waku_info)?.into()))
.await?;
Ok(())
}
async fn handle_add_conn_req(
waku_channel: &OutboundRelay<NetworkMsg<Waku>>,
res_tx: Sender<HttpResponse>,
payload: Option<Bytes>,
) -> Result<(), overwatch_rs::DynError> {
if let Some(payload) = payload {
if let Ok(addrs) = serde_json::from_slice::<Vec<Multiaddr>>(&payload) {
let reqs: Vec<_> = addrs
.into_iter()
.map(|addr| {
waku_channel.send(NetworkMsg::Process(WakuBackendMessage::ConnectPeer {
addr,
}))
})
.collect();
join_all(reqs).await;
}
Ok(res_tx.send(Ok(b"".to_vec().into())).await?)
} else {
Err(
format!("Invalid payload, {payload:?}. Empty or couldn't transform into a utf8 String")
.into(),
)
}
}
async fn send_transaction(
network_relay: OutboundRelay<NetworkMsg<Waku>>,
tx: Tx,
) -> Result<(), overwatch_rs::DynError> {
let payload = wire::serialize(&tx).expect("Tx serialization failed");
if let Err((_, _e)) = network_relay
network_relay
.send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
message: WakuMessage::new(
payload,
@ -192,7 +252,7 @@ async fn send_transaction(network_relay: OutboundRelay<NetworkMsg<Waku>>, tx: Tx
topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
}))
.await
{
todo!("log error");
};
.map_err(|(e, _)| e)?;
Ok(())
}

View File

@ -23,6 +23,7 @@ async-graphql = { version = "=5.0.5", optional = true }
bytes = "1.3"
clap = { version = "4", features = ["derive", "env"], optional = true }
futures = "0.3"
http = "0.2.9"
hyper = { version = "0.14", optional = true }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }

View File

@ -94,7 +94,7 @@ where
.unwrap();
let value = receiver.await.unwrap();
res_tx
.send(format!("Hello, world! {value}").into())
.send(Ok(format!("Hello, world! {value}").into()))
.await
.unwrap();
}

View File

@ -71,7 +71,7 @@ where
}
};
res_tx.send(res.into()).await.unwrap();
res_tx.send(Ok(res.into())).await.unwrap();
}
Ok(())
}))

View File

@ -9,6 +9,7 @@ use axum::{
routing::{get, patch, post, put},
Router,
};
use http::StatusCode;
use hyper::{
header::{CONTENT_TYPE, USER_AGENT},
Body, Request,
@ -145,7 +146,7 @@ async fn handle_req(
req_stream: Sender<HttpRequest>,
query: HashMap<String, String>,
payload: Option<Bytes>,
) -> Result<Bytes, String> {
) -> Result<Bytes, (StatusCode, String)> {
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
match req_stream
.send(HttpRequest {
@ -155,7 +156,12 @@ async fn handle_req(
})
.await
{
Ok(_) => rx.recv().await.ok_or_else(|| "".into()),
Err(e) => Err(AxumBackendError::SendError(e).to_string()),
Ok(_) => rx.recv().await.ok_or_else(|| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"closed response channel".into(),
)
})?,
Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())),
}
}

View File

@ -7,6 +7,7 @@ use std::{
};
// crates
use bytes::Bytes;
use http::StatusCode;
use overwatch_rs::services::{
handle::ServiceStateHandle,
relay::{InboundRelay, OutboundRelay, RelayMessage},
@ -61,11 +62,13 @@ impl core::fmt::Debug for Route {
}
}
pub type HttpResponse = Result<Bytes, (StatusCode, String)>;
#[derive(Debug, Clone)]
pub struct HttpRequest {
pub query: HashMap<String, String>,
pub payload: Option<bytes::Bytes>,
pub res_tx: Sender<bytes::Bytes>,
pub res_tx: Sender<HttpResponse>,
}
// HttpMsg is a message that is sent via the relay to communicate with

View File

@ -1,7 +1,8 @@
// std
use linked_hash_map::LinkedHashMap;
use std::collections::BTreeMap;
use std::hash::Hash;
use std::time::SystemTime;
use std::{collections::BTreeMap, time::UNIX_EPOCH};
// crates
// internal
use crate::backend::{MemPool, MempoolError};
@ -12,6 +13,7 @@ pub struct MockPool<Id, Tx> {
pending_txs: LinkedHashMap<Id, Tx>,
in_block_txs: BTreeMap<BlockId, Vec<Tx>>,
in_block_txs_by_id: BTreeMap<Id, BlockId>,
last_tx_timestamp: u64,
}
impl<Id, Tx> Default for MockPool<Id, Tx>
@ -23,6 +25,7 @@ where
pending_txs: LinkedHashMap::new(),
in_block_txs: BTreeMap::new(),
in_block_txs_by_id: BTreeMap::new(),
last_tx_timestamp: 0,
}
}
}
@ -55,6 +58,11 @@ where
return Err(MempoolError::ExistingTx);
}
self.pending_txs.insert(id, tx);
self.last_tx_timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
Ok(())
}
@ -87,4 +95,8 @@ where
fn pending_tx_count(&self) -> usize {
self.pending_txs.len()
}
fn last_tx_timestamp(&self) -> u64 {
self.last_tx_timestamp
}
}

View File

@ -37,4 +37,5 @@ pub trait MemPool {
fn prune(&mut self, txs: &[Self::Id]);
fn pending_tx_count(&self) -> usize;
fn last_tx_timestamp(&self) -> u64;
}

View File

@ -34,6 +34,7 @@ where
pub struct MempoolMetrics {
pub pending_txs: usize,
pub last_tx_timestamp: u64,
}
pub enum MempoolMsg<Tx, Id> {
@ -155,6 +156,7 @@ where
MempoolMsg::Metrics { reply_channel } => {
let metrics = MempoolMetrics {
pending_txs: pool.pending_tx_count(),
last_tx_timestamp: pool.last_tx_timestamp(),
};
reply_channel.send(metrics).unwrap_or_else(|_| {
tracing::debug!("could not send back mempool metrics")

View File

@ -73,7 +73,7 @@ where
}
};
res_tx.send(res.into()).await.unwrap();
res_tx.send(Ok(res.into())).await.unwrap();
}
Ok(())
}))