From dbe36bba3c2f4a859cbf999404b415f9e9a446bc Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Wed, 25 Jan 2023 16:24:33 +0100 Subject: [PATCH] Mockpool node (#53) * Create nodes folder Kickstart mockpool node * Create nodes folder Added bridges file * Added metrics to mempool * Remove metrics from node * Added mempool metrics bridge * Pipe in mempool_metric bridge * Add wakuinfo to waku network service * Add waku network info bridge * Added waku info bridge to node * Use mock Tx wrapper over a string * Create add tx http bridge * Add tx bridge to http config * Use hash for Tx * Remove tracing subscriber from binary * Fix bridges routes * Added mimimal configuration example * Remove subscribing to default waku pubsub topic * Use addtx payload for tx * Remove pub serde mod from core transaction * Clippy happy * Id from &Tx instead of owned value * Removed mempool metrics feature --- Cargo.toml | 3 +- nodes/mockpool-node/Cargo.toml | 25 ++++ nodes/mockpool-node/src/bridges.rs | 121 ++++++++++++++++++ nodes/mockpool-node/src/main.rs | 89 +++++++++++++ nodes/mockpool-node/src/tx.rs | 19 +++ .../nomos-node}/.cargo/config.toml | 0 {nomos-node => nodes/nomos-node}/Cargo.toml | 6 +- {nomos-node => nodes/nomos-node}/src/main.rs | 0 nomos-services/http/examples/axum.rs | 4 +- nomos-services/http/examples/graphql.rs | 4 +- nomos-services/http/src/backends/axum.rs | 4 +- nomos-services/http/src/backends/mod.rs | 6 +- nomos-services/http/src/bridge.rs | 6 +- nomos-services/http/src/http.rs | 8 +- nomos-services/mempool/Cargo.toml | 2 +- .../mempool/src/backend/mockpool.rs | 8 +- nomos-services/mempool/src/backend/mod.rs | 2 + nomos-services/mempool/src/lib.rs | 16 +++ nomos-services/network/src/backends/waku.rs | 23 +++- 19 files changed, 322 insertions(+), 24 deletions(-) create mode 100644 nodes/mockpool-node/Cargo.toml create mode 100644 nodes/mockpool-node/src/bridges.rs create mode 100644 nodes/mockpool-node/src/main.rs create mode 100644 nodes/mockpool-node/src/tx.rs rename {nomos-node => nodes/nomos-node}/.cargo/config.toml (100%) rename {nomos-node => nodes/nomos-node}/Cargo.toml (71%) rename {nomos-node => nodes/nomos-node}/src/main.rs (100%) diff --git a/Cargo.toml b/Cargo.toml index 1e9e6ae1..1a1d0f8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,5 +8,6 @@ members = [ "nomos-services/consensus", "nomos-services/mempool", "nomos-services/http", - "nomos-node" + "nodes/nomos-node", + "nodes/mockpool-node" ] diff --git a/nodes/mockpool-node/Cargo.toml b/nodes/mockpool-node/Cargo.toml new file mode 100644 index 00000000..456a3b7c --- /dev/null +++ b/nodes/mockpool-node/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "mockpool-node" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +blake2 = "0.10" +bincode = "2.0.0-rc.2" +clap = { version = "4", features = ["derive"] } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } +overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" } +tracing = "0.1" +nomos-core = { path = "../../nomos-core" } +nomos-network = { path = "../../nomos-services/network", features = ["waku"] } +nomos-log = { path = "../../nomos-services/log" } +nomos-mempool = { path = "../../nomos-services/mempool", features = ["waku", "mock"] } +nomos-http = { path = "../../nomos-services/http", features = ["http"] } +tracing-subscriber = "0.3" +tokio = {version = "1.24", features = ["sync"] } +serde_json = "1.0" +serde_yaml = "0.9" +color-eyre = "0.6.0" +serde = "1" \ No newline at end of file diff --git a/nodes/mockpool-node/src/bridges.rs b/nodes/mockpool-node/src/bridges.rs new file mode 100644 index 00000000..0563c56e --- /dev/null +++ b/nodes/mockpool-node/src/bridges.rs @@ -0,0 +1,121 @@ +// std +// crates +use tokio::sync::oneshot; +use tracing::debug; +// internal +use crate::tx::{Tx, TxId}; +use nomos_http::backends::axum::AxumBackend; +use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner}; +use nomos_http::http::{HttpMethod, HttpRequest}; +use nomos_mempool::backend::mockpool::MockPool; +use nomos_mempool::network::adapters::waku::WakuAdapter; +use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService}; +use nomos_network::backends::waku::{Waku, WakuBackendMessage, WakuInfo}; +use nomos_network::{NetworkMsg, NetworkService}; + +pub fn mempool_metrics_bridge( + handle: overwatch_rs::overwatch::handle::OverwatchHandle, +) -> HttpBridgeRunner { + Box::new(Box::pin(async move { + let (mempool_channel, mut http_request_channel) = build_http_bridge::< + MempoolService, MockPool>, + AxumBackend, + _, + >( + handle, HttpMethod::GET, "metrics" + ) + .await + .unwrap(); + + while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await { + let (sender, receiver) = oneshot::channel(); + mempool_channel + .send(MempoolMsg::Metrics { + reply_channel: sender, + }) + .await + .unwrap(); + let metrics: MempoolMetrics = receiver.await.unwrap(); + res_tx + // TODO: use serde to serialize metrics + .send(format!("{{\"pending_tx\": {}}}", metrics.pending_txs).into()) + .await + .unwrap(); + } + Ok(()) + })) +} + +pub fn mempool_add_tx_bridge( + handle: overwatch_rs::overwatch::handle::OverwatchHandle, +) -> HttpBridgeRunner { + Box::new(Box::pin(async move { + let (mempool_channel, mut http_request_channel) = build_http_bridge::< + MempoolService, MockPool>, + AxumBackend, + _, + >( + handle.clone(), + HttpMethod::POST, + "addtx", + ) + .await + .unwrap(); + while let Some(HttpRequest { + res_tx, payload, .. + }) = http_request_channel.recv().await + { + if let Some(data) = payload + .as_ref() + .and_then(|b| String::from_utf8(b.to_vec()).ok()) + { + mempool_channel + .send(MempoolMsg::AddTx { tx: Tx(data) }) + .await + .unwrap(); + res_tx.send(b"".to_vec().into()).await.unwrap(); + } else { + debug!( + "Invalid payload, {:?}. Empty or couldn't transform into a utf8 String", + payload + ); + } + } + + Ok(()) + })) +} + +pub fn waku_info_bridge( + handle: overwatch_rs::overwatch::handle::OverwatchHandle, +) -> HttpBridgeRunner { + Box::new(Box::pin(async move { + let (waku_channel, mut http_request_channel) = build_http_bridge::< + NetworkService, + AxumBackend, + _, + >(handle, HttpMethod::GET, "info") + .await + .unwrap(); + + while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await { + let (sender, receiver) = oneshot::channel(); + waku_channel + .send(NetworkMsg::Process(WakuBackendMessage::Info { + reply_channel: sender, + })) + .await + .unwrap(); + let waku_info: WakuInfo = receiver.await.unwrap(); + res_tx + .send( + serde_json::to_vec(&waku_info) + .expect("Serializing of waku info message should not fail") + .into(), + ) + .await + .unwrap(); + } + Ok(()) + })) +} diff --git a/nodes/mockpool-node/src/main.rs b/nodes/mockpool-node/src/main.rs new file mode 100644 index 00000000..a01e2d0b --- /dev/null +++ b/nodes/mockpool-node/src/main.rs @@ -0,0 +1,89 @@ +mod bridges; +mod tx; + +use clap::Parser; +use color_eyre::eyre::{eyre, Result}; +use nomos_http::backends::axum::AxumBackend; +use nomos_http::bridge::{HttpBridge, HttpBridgeService, HttpBridgeSettings}; +use nomos_http::http::HttpService; +use nomos_log::Logger; +use nomos_mempool::backend::mockpool::MockPool; +use nomos_mempool::network::adapters::waku::WakuAdapter; +use nomos_mempool::MempoolService; +use nomos_network::{backends::waku::Waku, NetworkService}; +use overwatch_derive::*; +use overwatch_rs::{ + overwatch::*, + services::{handle::ServiceHandle, ServiceData}, +}; +use serde::Deserialize; +use std::sync::Arc; +use tx::{Tx, TxId}; + +/// Simple program to greet a person +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Path for a yaml-encoded network config file + config: std::path::PathBuf, +} + +#[derive(Deserialize)] +struct Config { + log: ::Settings, + network: as ServiceData>::Settings, + http: as ServiceData>::Settings, +} + +#[derive(Services)] +struct MockPoolNode { + logging: ServiceHandle, + network: ServiceHandle>, + mockpool: ServiceHandle, MockPool>>, + http: ServiceHandle>, + bridges: ServiceHandle, +} +/// Mockpool node +/// Minimal configuration file: +/// +/// ```yaml +/// log: +/// backend: "Stdout" +/// format: "Json" +/// level: "debug" +/// network: +/// backend: +/// host: 0.0.0.0 +/// port: 3000 +/// log_level: "fatal" +/// nodeKey: null +/// discV5BootstrapNodes: [] +/// initial_peers: [] +/// http: +/// backend: +/// address: 0.0.0.0:8080 +/// cors_origins: [] +/// +/// ``` +fn main() -> Result<()> { + let Args { config } = Args::parse(); + let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)?; + let bridges: Vec = vec![ + Arc::new(Box::new(bridges::mempool_metrics_bridge)), + Arc::new(Box::new(bridges::waku_info_bridge)), + Arc::new(Box::new(bridges::mempool_add_tx_bridge)), + ]; + let app = OverwatchRunner::::run( + MockPoolNodeServiceSettings { + network: config.network, + logging: config.log, + http: config.http, + mockpool: (), + bridges: HttpBridgeSettings { bridges }, + }, + None, + ) + .map_err(|e| eyre!("Error encountered: {}", e))?; + app.wait_finished(); + Ok(()) +} diff --git a/nodes/mockpool-node/src/tx.rs b/nodes/mockpool-node/src/tx.rs new file mode 100644 index 00000000..664c5106 --- /dev/null +++ b/nodes/mockpool-node/src/tx.rs @@ -0,0 +1,19 @@ +use blake2::{Blake2b512, Digest}; +use serde::{Deserialize, Serialize}; +use std::hash::Hash; + +#[derive(Clone, Debug, Serialize, Deserialize, Hash)] +pub struct Tx(pub String); + +#[derive(Debug, Eq, Hash, PartialEq, Ord, Clone, PartialOrd)] +pub struct TxId([u8; 32]); + +impl From<&Tx> for TxId { + fn from(tx: &Tx) -> Self { + let mut hasher = Blake2b512::new(); + hasher.update(bincode::serde::encode_to_vec(tx, bincode::config::standard()).unwrap()); + let mut id = [0u8; 32]; + id.copy_from_slice(hasher.finalize().as_slice()); + Self(id) + } +} diff --git a/nomos-node/.cargo/config.toml b/nodes/nomos-node/.cargo/config.toml similarity index 100% rename from nomos-node/.cargo/config.toml rename to nodes/nomos-node/.cargo/config.toml diff --git a/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml similarity index 71% rename from nomos-node/Cargo.toml rename to nodes/nomos-node/Cargo.toml index 342d4052..37a36264 100644 --- a/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -10,9 +10,9 @@ clap = { version = "4", features = ["derive"] } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" } tracing = "0.1" -nomos-network = { path = "../nomos-services/network", features = ["waku"] } -metrics = { path = "../nomos-services/metrics", optional = true } -nomos-log = { path = "../nomos-services/log" } +nomos-network = { path = "../../nomos-services/network", features = ["waku"] } +metrics = { path = "../../nomos-services/metrics", optional = true } +nomos-log = { path = "../../nomos-services/log" } tracing-subscriber = "0.3" serde_yaml = "0.9" color-eyre = "0.6.0" diff --git a/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs similarity index 100% rename from nomos-node/src/main.rs rename to nodes/nomos-node/src/main.rs diff --git a/nomos-services/http/examples/axum.rs b/nomos-services/http/examples/axum.rs index 16767660..ec71fb84 100644 --- a/nomos-services/http/examples/axum.rs +++ b/nomos-services/http/examples/axum.rs @@ -121,11 +121,11 @@ fn main() -> Result<(), Box> { let settings = Args::parse(); let app = OverwatchRunner::::run( ServicesServiceSettings { - http: nomos_http::http::Config { + http: nomos_http::http::HttpServiceSettings { backend: settings.http, }, router: nomos_http::bridge::HttpBridgeSettings { - runners: vec![Arc::new(Box::new(dummy_router::))], + bridges: vec![Arc::new(Box::new(dummy_router::))], }, dummy: (), }, diff --git a/nomos-services/http/examples/graphql.rs b/nomos-services/http/examples/graphql.rs index 28fb3a27..6dd72bff 100644 --- a/nomos-services/http/examples/graphql.rs +++ b/nomos-services/http/examples/graphql.rs @@ -185,11 +185,11 @@ fn main() -> Result<(), overwatch_rs::DynError> { let settings = Args::parse(); let app = OverwatchRunner::::run( ServicesServiceSettings { - http: nomos_http::http::Config { + http: nomos_http::http::HttpServiceSettings { backend: settings.http, }, router: nomos_http::bridge::HttpBridgeSettings { - runners: vec![Arc::new(Box::new(dummy_graphql_router::))], + bridges: vec![Arc::new(Box::new(dummy_graphql_router::))], }, dummy_graphql: (), }, diff --git a/nomos-services/http/src/backends/axum.rs b/nomos-services/http/src/backends/axum.rs index 9fe4aa7d..b9548c14 100644 --- a/nomos-services/http/src/backends/axum.rs +++ b/nomos-services/http/src/backends/axum.rs @@ -62,11 +62,11 @@ pub struct AxumBackend { #[async_trait::async_trait] impl HttpBackend for AxumBackend { - type Config = AxumBackendSettings; + type Settings = AxumBackendSettings; type State = NoState; type Error = AxumBackendError; - fn new(config: Self::Config) -> Result + fn new(config: Self::Settings) -> Result where Self: Sized, { diff --git a/nomos-services/http/src/backends/mod.rs b/nomos-services/http/src/backends/mod.rs index 163aa965..f3ac40ed 100644 --- a/nomos-services/http/src/backends/mod.rs +++ b/nomos-services/http/src/backends/mod.rs @@ -10,11 +10,11 @@ use crate::http::{HttpRequest, Route}; #[async_trait::async_trait] pub trait HttpBackend { - type Config: Clone + Debug + Send + Sync + 'static; - type State: ServiceState + Clone; + type Settings: Clone + Debug + Send + Sync + 'static; + type State: ServiceState + Clone; type Error: std::fmt::Display; - fn new(config: Self::Config) -> Result + fn new(config: Self::Settings) -> Result where Self: Sized; diff --git a/nomos-services/http/src/bridge.rs b/nomos-services/http/src/bridge.rs index d987205a..04bc0ad4 100644 --- a/nomos-services/http/src/bridge.rs +++ b/nomos-services/http/src/bridge.rs @@ -68,13 +68,13 @@ pub struct HttpBridgeService { #[derive(Clone)] pub struct HttpBridgeSettings { - pub runners: Vec, + pub bridges: Vec, } impl Debug for HttpBridgeSettings { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("RouterSettings") - .field("runners len", &self.runners.len()) + .field("runners len", &self.bridges.len()) .finish() } } @@ -90,7 +90,7 @@ impl ServiceData for HttpBridgeService { #[async_trait] impl ServiceCore for HttpBridgeService { fn init(service_state: ServiceStateHandle) -> Result { - let runners = service_state.settings_reader.get_updated_settings().runners; + let runners = service_state.settings_reader.get_updated_settings().bridges; let runners: Vec<_> = runners .into_iter() .map(|r| (r)(service_state.overwatch_handle.clone())) diff --git a/nomos-services/http/src/http.rs b/nomos-services/http/src/http.rs index 9ed3aa8d..9260a5b2 100644 --- a/nomos-services/http/src/http.rs +++ b/nomos-services/http/src/http.rs @@ -21,8 +21,8 @@ use tokio::sync::{mpsc::Sender, oneshot}; use crate::backends::HttpBackend; #[derive(Serialize, Deserialize, Debug)] -pub struct Config { - pub backend: B::Config, +pub struct HttpServiceSettings { + pub backend: B::Settings, } pub struct HttpService { @@ -32,7 +32,7 @@ pub struct HttpService { impl ServiceData for HttpService { const SERVICE_ID: ServiceId = "Http"; - type Settings = Config; + type Settings = HttpServiceSettings; type State = NoState; type StateOperator = NoOperator; type Message = HttpMsg; @@ -171,7 +171,7 @@ where } } -impl Clone for Config { +impl Clone for HttpServiceSettings { fn clone(&self) -> Self { Self { backend: self.backend.clone(), diff --git a/nomos-services/mempool/Cargo.toml b/nomos-services/mempool/Cargo.toml index 1ed49459..2708dfb4 100644 --- a/nomos-services/mempool/Cargo.toml +++ b/nomos-services/mempool/Cargo.toml @@ -23,4 +23,4 @@ waku-bindings = { version = "0.1.0-beta2", optional = true} [features] default = [] waku = ["nomos-network/waku", "waku-bindings"] -mock = ["linked-hash-map"] +mock = ["linked-hash-map"] \ No newline at end of file diff --git a/nomos-services/mempool/src/backend/mockpool.rs b/nomos-services/mempool/src/backend/mockpool.rs index 11583207..9a074d1d 100644 --- a/nomos-services/mempool/src/backend/mockpool.rs +++ b/nomos-services/mempool/src/backend/mockpool.rs @@ -38,7 +38,7 @@ where impl MemPool for MockPool where - Id: From + PartialOrd + Ord + Eq + Hash + Clone, + Id: for<'t> From<&'t Tx> + PartialOrd + Ord + Eq + Hash + Clone, Tx: Clone + Send + Sync + 'static + Hash, { type Settings = (); @@ -50,7 +50,7 @@ where } fn add_tx(&mut self, tx: Self::Tx) -> Result<(), overwatch_rs::DynError> { - let id = Id::from(tx.clone()); + let id = Id::from(&tx); if self.pending_txs.contains_key(&id) || self.in_block_txs_by_id.contains_key(&id) { return Ok(()); } @@ -83,4 +83,8 @@ where self.pending_txs.remove(tx_id); } } + + fn pending_tx_count(&self) -> usize { + self.pending_txs.len() + } } diff --git a/nomos-services/mempool/src/backend/mod.rs b/nomos-services/mempool/src/backend/mod.rs index 4a98be8d..706a99de 100644 --- a/nomos-services/mempool/src/backend/mod.rs +++ b/nomos-services/mempool/src/backend/mod.rs @@ -27,4 +27,6 @@ pub trait MemPool { /// Signal that a set of transactions can't be possibly requested anymore and can be /// discarded. fn prune(&mut self, txs: &[Self::Id]); + + fn pending_tx_count(&self) -> usize; } diff --git a/nomos-services/mempool/src/lib.rs b/nomos-services/mempool/src/lib.rs index 60c097d9..060130be 100644 --- a/nomos-services/mempool/src/lib.rs +++ b/nomos-services/mempool/src/lib.rs @@ -33,6 +33,10 @@ where pool: P, } +pub struct MempoolMetrics { + pub pending_txs: usize, +} + pub enum MempoolMsg { AddTx { tx: Tx, @@ -48,6 +52,9 @@ pub enum MempoolMsg { ids: Vec, block: BlockHeader, }, + Metrics { + reply_channel: Sender, + }, } impl Debug for MempoolMsg { @@ -69,6 +76,7 @@ impl Debug for MempoolMsg { ids, block ) } + Self::Metrics { .. } => write!(f, "MempoolMsg::Metrics"), } } } @@ -142,6 +150,14 @@ where pool.mark_in_block(&ids, block); } MempoolMsg::Prune { ids } => { pool.prune(&ids); }, + MempoolMsg::Metrics { reply_channel } => { + let metrics = MempoolMetrics { + pending_txs: pool.pending_tx_count(), + }; + reply_channel.send(metrics).unwrap_or_else(|_| { + tracing::debug!("could not send back mempool metrics") + }); + } } } Some(tx) = network_txs.next() => { diff --git a/nomos-services/network/src/backends/waku.rs b/nomos-services/network/src/backends/waku.rs index 70059e33..e4b798e7 100644 --- a/nomos-services/network/src/backends/waku.rs +++ b/nomos-services/network/src/backends/waku.rs @@ -15,6 +15,12 @@ pub struct Waku { message_event: Sender, } +#[derive(Serialize, Deserialize, Debug)] +pub struct WakuInfo { + pub listen_addresses: Option>, + pub peer_id: Option, +} + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct WakuConfig { #[serde(flatten)] @@ -46,6 +52,9 @@ pub enum WakuBackendMessage { topic: Option, peer_id: PeerId, }, + Info { + reply_channel: oneshot::Sender, + }, } #[derive(Debug)] @@ -68,7 +77,6 @@ impl NetworkBackend for Waku { fn new(config: Self::Settings) -> Self { let waku = waku_new(Some(config.inner)).unwrap().start().unwrap(); - waku.relay_subscribe(None).unwrap(); tracing::info!("waku listening on {}", waku.listen_addresses().unwrap()[0]); for peer in &config.initial_peers { if let Err(e) = waku.connect_peer_with_address(peer, None) { @@ -161,6 +169,19 @@ impl NetworkBackend for Waku { ) } }, + WakuBackendMessage::Info { reply_channel } => { + let listen_addresses = self.waku.listen_addresses().ok(); + let peer_id = self.waku.peer_id().ok(); + if reply_channel + .send(WakuInfo { + listen_addresses, + peer_id, + }) + .is_err() + { + error!("could not send waku info"); + } + } }; }