Mockpool node (#53)

* Create nodes folder
Kickstart mockpool node

* Create nodes folder
Added bridges file

* Added metrics to mempool

* Remove metrics from node

* Added mempool metrics bridge

* Pipe in mempool_metric bridge

* Add wakuinfo to waku network service

* Add waku network info bridge

* Added waku info bridge to node

* Use mock Tx wrapper over a string

* Create add tx http bridge

* Add tx bridge to http config

* Use hash for Tx

* Remove tracing subscriber from binary

* Fix bridges routes

* Added mimimal configuration example

* Remove subscribing to default waku pubsub topic

* Use addtx payload for tx

* Remove pub serde mod from core transaction

* Clippy happy

* Id from &Tx instead of owned value

* Removed mempool metrics feature
This commit is contained in:
Daniel Sanchez 2023-01-25 16:24:33 +01:00 committed by GitHub
parent 915ec00b34
commit dbe36bba3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 322 additions and 24 deletions

View File

@ -8,5 +8,6 @@ members = [
"nomos-services/consensus",
"nomos-services/mempool",
"nomos-services/http",
"nomos-node"
"nodes/nomos-node",
"nodes/mockpool-node"
]

View File

@ -0,0 +1,25 @@
[package]
name = "mockpool-node"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
blake2 = "0.10"
bincode = "2.0.0-rc.2"
clap = { version = "4", features = ["derive"] }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
tracing = "0.1"
nomos-core = { path = "../../nomos-core" }
nomos-network = { path = "../../nomos-services/network", features = ["waku"] }
nomos-log = { path = "../../nomos-services/log" }
nomos-mempool = { path = "../../nomos-services/mempool", features = ["waku", "mock"] }
nomos-http = { path = "../../nomos-services/http", features = ["http"] }
tracing-subscriber = "0.3"
tokio = {version = "1.24", features = ["sync"] }
serde_json = "1.0"
serde_yaml = "0.9"
color-eyre = "0.6.0"
serde = "1"

View File

@ -0,0 +1,121 @@
// std
// crates
use tokio::sync::oneshot;
use tracing::debug;
// internal
use crate::tx::{Tx, TxId};
use nomos_http::backends::axum::AxumBackend;
use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner};
use nomos_http::http::{HttpMethod, HttpRequest};
use nomos_mempool::backend::mockpool::MockPool;
use nomos_mempool::network::adapters::waku::WakuAdapter;
use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService};
use nomos_network::backends::waku::{Waku, WakuBackendMessage, WakuInfo};
use nomos_network::{NetworkMsg, NetworkService};
pub fn mempool_metrics_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
let (mempool_channel, mut http_request_channel) = build_http_bridge::<
MempoolService<WakuAdapter<Tx>, MockPool<TxId, Tx>>,
AxumBackend,
_,
>(
handle, HttpMethod::GET, "metrics"
)
.await
.unwrap();
while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await {
let (sender, receiver) = oneshot::channel();
mempool_channel
.send(MempoolMsg::Metrics {
reply_channel: sender,
})
.await
.unwrap();
let metrics: MempoolMetrics = receiver.await.unwrap();
res_tx
// TODO: use serde to serialize metrics
.send(format!("{{\"pending_tx\": {}}}", metrics.pending_txs).into())
.await
.unwrap();
}
Ok(())
}))
}
pub fn mempool_add_tx_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
let (mempool_channel, mut http_request_channel) = build_http_bridge::<
MempoolService<WakuAdapter<Tx>, MockPool<TxId, Tx>>,
AxumBackend,
_,
>(
handle.clone(),
HttpMethod::POST,
"addtx",
)
.await
.unwrap();
while let Some(HttpRequest {
res_tx, payload, ..
}) = http_request_channel.recv().await
{
if let Some(data) = payload
.as_ref()
.and_then(|b| String::from_utf8(b.to_vec()).ok())
{
mempool_channel
.send(MempoolMsg::AddTx { tx: Tx(data) })
.await
.unwrap();
res_tx.send(b"".to_vec().into()).await.unwrap();
} else {
debug!(
"Invalid payload, {:?}. Empty or couldn't transform into a utf8 String",
payload
);
}
}
Ok(())
}))
}
pub fn waku_info_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
let (waku_channel, mut http_request_channel) = build_http_bridge::<
NetworkService<Waku>,
AxumBackend,
_,
>(handle, HttpMethod::GET, "info")
.await
.unwrap();
while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await {
let (sender, receiver) = oneshot::channel();
waku_channel
.send(NetworkMsg::Process(WakuBackendMessage::Info {
reply_channel: sender,
}))
.await
.unwrap();
let waku_info: WakuInfo = receiver.await.unwrap();
res_tx
.send(
serde_json::to_vec(&waku_info)
.expect("Serializing of waku info message should not fail")
.into(),
)
.await
.unwrap();
}
Ok(())
}))
}

View File

@ -0,0 +1,89 @@
mod bridges;
mod tx;
use clap::Parser;
use color_eyre::eyre::{eyre, Result};
use nomos_http::backends::axum::AxumBackend;
use nomos_http::bridge::{HttpBridge, HttpBridgeService, HttpBridgeSettings};
use nomos_http::http::HttpService;
use nomos_log::Logger;
use nomos_mempool::backend::mockpool::MockPool;
use nomos_mempool::network::adapters::waku::WakuAdapter;
use nomos_mempool::MempoolService;
use nomos_network::{backends::waku::Waku, NetworkService};
use overwatch_derive::*;
use overwatch_rs::{
overwatch::*,
services::{handle::ServiceHandle, ServiceData},
};
use serde::Deserialize;
use std::sync::Arc;
use tx::{Tx, TxId};
/// Simple program to greet a person
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Path for a yaml-encoded network config file
config: std::path::PathBuf,
}
#[derive(Deserialize)]
struct Config {
log: <Logger as ServiceData>::Settings,
network: <NetworkService<Waku> as ServiceData>::Settings,
http: <HttpService<AxumBackend> as ServiceData>::Settings,
}
#[derive(Services)]
struct MockPoolNode {
logging: ServiceHandle<Logger>,
network: ServiceHandle<NetworkService<Waku>>,
mockpool: ServiceHandle<MempoolService<WakuAdapter<Tx>, MockPool<TxId, Tx>>>,
http: ServiceHandle<HttpService<AxumBackend>>,
bridges: ServiceHandle<HttpBridgeService>,
}
/// Mockpool node
/// Minimal configuration file:
///
/// ```yaml
/// log:
/// backend: "Stdout"
/// format: "Json"
/// level: "debug"
/// network:
/// backend:
/// host: 0.0.0.0
/// port: 3000
/// log_level: "fatal"
/// nodeKey: null
/// discV5BootstrapNodes: []
/// initial_peers: []
/// http:
/// backend:
/// address: 0.0.0.0:8080
/// cors_origins: []
///
/// ```
fn main() -> Result<()> {
let Args { config } = Args::parse();
let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)?;
let bridges: Vec<HttpBridge> = vec![
Arc::new(Box::new(bridges::mempool_metrics_bridge)),
Arc::new(Box::new(bridges::waku_info_bridge)),
Arc::new(Box::new(bridges::mempool_add_tx_bridge)),
];
let app = OverwatchRunner::<MockPoolNode>::run(
MockPoolNodeServiceSettings {
network: config.network,
logging: config.log,
http: config.http,
mockpool: (),
bridges: HttpBridgeSettings { bridges },
},
None,
)
.map_err(|e| eyre!("Error encountered: {}", e))?;
app.wait_finished();
Ok(())
}

View File

@ -0,0 +1,19 @@
use blake2::{Blake2b512, Digest};
use serde::{Deserialize, Serialize};
use std::hash::Hash;
#[derive(Clone, Debug, Serialize, Deserialize, Hash)]
pub struct Tx(pub String);
#[derive(Debug, Eq, Hash, PartialEq, Ord, Clone, PartialOrd)]
pub struct TxId([u8; 32]);
impl From<&Tx> for TxId {
fn from(tx: &Tx) -> Self {
let mut hasher = Blake2b512::new();
hasher.update(bincode::serde::encode_to_vec(tx, bincode::config::standard()).unwrap());
let mut id = [0u8; 32];
id.copy_from_slice(hasher.finalize().as_slice());
Self(id)
}
}

View File

@ -10,9 +10,9 @@ clap = { version = "4", features = ["derive"] }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
tracing = "0.1"
nomos-network = { path = "../nomos-services/network", features = ["waku"] }
metrics = { path = "../nomos-services/metrics", optional = true }
nomos-log = { path = "../nomos-services/log" }
nomos-network = { path = "../../nomos-services/network", features = ["waku"] }
metrics = { path = "../../nomos-services/metrics", optional = true }
nomos-log = { path = "../../nomos-services/log" }
tracing-subscriber = "0.3"
serde_yaml = "0.9"
color-eyre = "0.6.0"

View File

@ -121,11 +121,11 @@ fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let settings = Args::parse();
let app = OverwatchRunner::<Services>::run(
ServicesServiceSettings {
http: nomos_http::http::Config {
http: nomos_http::http::HttpServiceSettings {
backend: settings.http,
},
router: nomos_http::bridge::HttpBridgeSettings {
runners: vec![Arc::new(Box::new(dummy_router::<AxumBackend>))],
bridges: vec![Arc::new(Box::new(dummy_router::<AxumBackend>))],
},
dummy: (),
},

View File

@ -185,11 +185,11 @@ fn main() -> Result<(), overwatch_rs::DynError> {
let settings = Args::parse();
let app = OverwatchRunner::<Services>::run(
ServicesServiceSettings {
http: nomos_http::http::Config {
http: nomos_http::http::HttpServiceSettings {
backend: settings.http,
},
router: nomos_http::bridge::HttpBridgeSettings {
runners: vec![Arc::new(Box::new(dummy_graphql_router::<AxumBackend>))],
bridges: vec![Arc::new(Box::new(dummy_graphql_router::<AxumBackend>))],
},
dummy_graphql: (),
},

View File

@ -62,11 +62,11 @@ pub struct AxumBackend {
#[async_trait::async_trait]
impl HttpBackend for AxumBackend {
type Config = AxumBackendSettings;
type Settings = AxumBackendSettings;
type State = NoState<AxumBackendSettings>;
type Error = AxumBackendError;
fn new(config: Self::Config) -> Result<Self, Self::Error>
fn new(config: Self::Settings) -> Result<Self, Self::Error>
where
Self: Sized,
{

View File

@ -10,11 +10,11 @@ use crate::http::{HttpRequest, Route};
#[async_trait::async_trait]
pub trait HttpBackend {
type Config: Clone + Debug + Send + Sync + 'static;
type State: ServiceState<Settings = Self::Config> + Clone;
type Settings: Clone + Debug + Send + Sync + 'static;
type State: ServiceState<Settings = Self::Settings> + Clone;
type Error: std::fmt::Display;
fn new(config: Self::Config) -> Result<Self, Self::Error>
fn new(config: Self::Settings) -> Result<Self, Self::Error>
where
Self: Sized;

View File

@ -68,13 +68,13 @@ pub struct HttpBridgeService {
#[derive(Clone)]
pub struct HttpBridgeSettings {
pub runners: Vec<HttpBridge>,
pub bridges: Vec<HttpBridge>,
}
impl Debug for HttpBridgeSettings {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RouterSettings")
.field("runners len", &self.runners.len())
.field("runners len", &self.bridges.len())
.finish()
}
}
@ -90,7 +90,7 @@ impl ServiceData for HttpBridgeService {
#[async_trait]
impl ServiceCore for HttpBridgeService {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
let runners = service_state.settings_reader.get_updated_settings().runners;
let runners = service_state.settings_reader.get_updated_settings().bridges;
let runners: Vec<_> = runners
.into_iter()
.map(|r| (r)(service_state.overwatch_handle.clone()))

View File

@ -21,8 +21,8 @@ use tokio::sync::{mpsc::Sender, oneshot};
use crate::backends::HttpBackend;
#[derive(Serialize, Deserialize, Debug)]
pub struct Config<B: HttpBackend> {
pub backend: B::Config,
pub struct HttpServiceSettings<B: HttpBackend> {
pub backend: B::Settings,
}
pub struct HttpService<B: HttpBackend> {
@ -32,7 +32,7 @@ pub struct HttpService<B: HttpBackend> {
impl<B: HttpBackend + 'static> ServiceData for HttpService<B> {
const SERVICE_ID: ServiceId = "Http";
type Settings = Config<B>;
type Settings = HttpServiceSettings<B>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = HttpMsg;
@ -171,7 +171,7 @@ where
}
}
impl<B: HttpBackend> Clone for Config<B> {
impl<B: HttpBackend> Clone for HttpServiceSettings<B> {
fn clone(&self) -> Self {
Self {
backend: self.backend.clone(),

View File

@ -23,4 +23,4 @@ waku-bindings = { version = "0.1.0-beta2", optional = true}
[features]
default = []
waku = ["nomos-network/waku", "waku-bindings"]
mock = ["linked-hash-map"]
mock = ["linked-hash-map"]

View File

@ -38,7 +38,7 @@ where
impl<Id, Tx> MemPool for MockPool<Id, Tx>
where
Id: From<Tx> + PartialOrd + Ord + Eq + Hash + Clone,
Id: for<'t> From<&'t Tx> + PartialOrd + Ord + Eq + Hash + Clone,
Tx: Clone + Send + Sync + 'static + Hash,
{
type Settings = ();
@ -50,7 +50,7 @@ where
}
fn add_tx(&mut self, tx: Self::Tx) -> Result<(), overwatch_rs::DynError> {
let id = Id::from(tx.clone());
let id = Id::from(&tx);
if self.pending_txs.contains_key(&id) || self.in_block_txs_by_id.contains_key(&id) {
return Ok(());
}
@ -83,4 +83,8 @@ where
self.pending_txs.remove(tx_id);
}
}
fn pending_tx_count(&self) -> usize {
self.pending_txs.len()
}
}

View File

@ -27,4 +27,6 @@ pub trait MemPool {
/// Signal that a set of transactions can't be possibly requested anymore and can be
/// discarded.
fn prune(&mut self, txs: &[Self::Id]);
fn pending_tx_count(&self) -> usize;
}

View File

@ -33,6 +33,10 @@ where
pool: P,
}
pub struct MempoolMetrics {
pub pending_txs: usize,
}
pub enum MempoolMsg<Tx, Id> {
AddTx {
tx: Tx,
@ -48,6 +52,9 @@ pub enum MempoolMsg<Tx, Id> {
ids: Vec<Id>,
block: BlockHeader,
},
Metrics {
reply_channel: Sender<MempoolMetrics>,
},
}
impl<Tx: Debug, Id: Debug> Debug for MempoolMsg<Tx, Id> {
@ -69,6 +76,7 @@ impl<Tx: Debug, Id: Debug> Debug for MempoolMsg<Tx, Id> {
ids, block
)
}
Self::Metrics { .. } => write!(f, "MempoolMsg::Metrics"),
}
}
}
@ -142,6 +150,14 @@ where
pool.mark_in_block(&ids, block);
}
MempoolMsg::Prune { ids } => { pool.prune(&ids); },
MempoolMsg::Metrics { reply_channel } => {
let metrics = MempoolMetrics {
pending_txs: pool.pending_tx_count(),
};
reply_channel.send(metrics).unwrap_or_else(|_| {
tracing::debug!("could not send back mempool metrics")
});
}
}
}
Some(tx) = network_txs.next() => {

View File

@ -15,6 +15,12 @@ pub struct Waku {
message_event: Sender<NetworkEvent>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct WakuInfo {
pub listen_addresses: Option<Vec<Multiaddr>>,
pub peer_id: Option<PeerId>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct WakuConfig {
#[serde(flatten)]
@ -46,6 +52,9 @@ pub enum WakuBackendMessage {
topic: Option<WakuPubSubTopic>,
peer_id: PeerId,
},
Info {
reply_channel: oneshot::Sender<WakuInfo>,
},
}
#[derive(Debug)]
@ -68,7 +77,6 @@ impl NetworkBackend for Waku {
fn new(config: Self::Settings) -> Self {
let waku = waku_new(Some(config.inner)).unwrap().start().unwrap();
waku.relay_subscribe(None).unwrap();
tracing::info!("waku listening on {}", waku.listen_addresses().unwrap()[0]);
for peer in &config.initial_peers {
if let Err(e) = waku.connect_peer_with_address(peer, None) {
@ -161,6 +169,19 @@ impl NetworkBackend for Waku {
)
}
},
WakuBackendMessage::Info { reply_channel } => {
let listen_addresses = self.waku.listen_addresses().ok();
let peer_id = self.waku.peer_id().ok();
if reply_channel
.send(WakuInfo {
listen_addresses,
peer_id,
})
.is_err()
{
error!("could not send waku info");
}
}
};
}