From 5f21a2734a3c8ecf880a52a1259f01f039c7939c Mon Sep 17 00:00:00 2001 From: gusto Date: Tue, 7 Feb 2023 17:50:38 +0200 Subject: [PATCH] Add endpoint to make a new peer conn for mockpool node (#68) * Add enpoint to make a new peer conn for mockpool node * Pass multiple addresses in connection request * Join async reqs to network service, cleanup * Collect and join reqs --- nodes/mockpool-node/Cargo.toml | 4 +- nodes/mockpool-node/src/bridges.rs | 50 +++++++++++++++++++-- nodes/mockpool-node/src/main.rs | 5 ++- nomos-services/network/src/backends/waku.rs | 10 +++++ 4 files changed, 62 insertions(+), 7 deletions(-) diff --git a/nodes/mockpool-node/Cargo.toml b/nodes/mockpool-node/Cargo.toml index 456a3b7c..e145c5af 100644 --- a/nodes/mockpool-node/Cargo.toml +++ b/nodes/mockpool-node/Cargo.toml @@ -9,9 +9,11 @@ edition = "2021" blake2 = "0.10" bincode = "2.0.0-rc.2" clap = { version = "4", features = ["derive"] } +futures = "0.3" overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" } tracing = "0.1" +multiaddr = "0.16" nomos-core = { path = "../../nomos-core" } nomos-network = { path = "../../nomos-services/network", features = ["waku"] } nomos-log = { path = "../../nomos-services/log" } @@ -22,4 +24,4 @@ tokio = {version = "1.24", features = ["sync"] } serde_json = "1.0" serde_yaml = "0.9" color-eyre = "0.6.0" -serde = "1" \ No newline at end of file +serde = "1" diff --git a/nodes/mockpool-node/src/bridges.rs b/nodes/mockpool-node/src/bridges.rs index 0563c56e..215775d8 100644 --- a/nodes/mockpool-node/src/bridges.rs +++ b/nodes/mockpool-node/src/bridges.rs @@ -1,9 +1,8 @@ // std + // crates -use tokio::sync::oneshot; -use tracing::debug; -// internal -use crate::tx::{Tx, TxId}; +use futures::future::join_all; +use multiaddr::Multiaddr; use nomos_http::backends::axum::AxumBackend; use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner}; use nomos_http::http::{HttpMethod, HttpRequest}; @@ -12,6 +11,11 @@ use nomos_mempool::network::adapters::waku::WakuAdapter; use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService}; use nomos_network::backends::waku::{Waku, WakuBackendMessage, WakuInfo}; use nomos_network::{NetworkMsg, NetworkService}; +use tokio::sync::oneshot; +use tracing::debug; + +// internal +use crate::tx::{Tx, TxId}; pub fn mempool_metrics_bridge( handle: overwatch_rs::overwatch::handle::OverwatchHandle, @@ -119,3 +123,41 @@ pub fn waku_info_bridge( Ok(()) })) } + +pub fn waku_add_conn_bridge( + handle: overwatch_rs::overwatch::handle::OverwatchHandle, +) -> HttpBridgeRunner { + Box::new(Box::pin(async move { + let (waku_channel, mut http_request_channel) = build_http_bridge::< + NetworkService, + AxumBackend, + _, + >(handle, HttpMethod::POST, "conn") + .await + .unwrap(); + + while let Some(HttpRequest { + res_tx, payload, .. + }) = http_request_channel.recv().await + { + if let Some(payload) = payload { + if let Ok(addrs) = serde_json::from_slice::>(&payload) { + let reqs: Vec<_> = addrs + .into_iter() + .map(|addr| { + waku_channel.send(NetworkMsg::Process( + WakuBackendMessage::ConnectPeer { addr }, + )) + }) + .collect(); + + join_all(reqs).await; + } + res_tx.send(b"".to_vec().into()).await.unwrap(); + } else { + debug!("Invalid payload, {:?}. Should not be empty", payload); + } + } + Ok(()) + })) +} diff --git a/nodes/mockpool-node/src/main.rs b/nodes/mockpool-node/src/main.rs index a01e2d0b..5f11a62d 100644 --- a/nodes/mockpool-node/src/main.rs +++ b/nodes/mockpool-node/src/main.rs @@ -69,9 +69,10 @@ fn main() -> Result<()> { let Args { config } = Args::parse(); let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)?; let bridges: Vec = vec![ - Arc::new(Box::new(bridges::mempool_metrics_bridge)), - Arc::new(Box::new(bridges::waku_info_bridge)), Arc::new(Box::new(bridges::mempool_add_tx_bridge)), + Arc::new(Box::new(bridges::mempool_metrics_bridge)), + Arc::new(Box::new(bridges::waku_add_conn_bridge)), + Arc::new(Box::new(bridges::waku_info_bridge)), ]; let app = OverwatchRunner::::run( MockPoolNodeServiceSettings { diff --git a/nomos-services/network/src/backends/waku.rs b/nomos-services/network/src/backends/waku.rs index e4b798e7..d1fa341e 100644 --- a/nomos-services/network/src/backends/waku.rs +++ b/nomos-services/network/src/backends/waku.rs @@ -36,6 +36,8 @@ pub enum WakuBackendMessage { message: WakuMessage, topic: Option, }, + /// Make a connection to peer at provided multiaddress + ConnectPeer { addr: Multiaddr }, /// Subscribe to a particular Waku topic RelaySubscribe { topic: WakuPubSubTopic }, /// Unsubscribe from a particular Waku topic @@ -118,6 +120,14 @@ impl NetworkBackend for Waku { ), } } + WakuBackendMessage::ConnectPeer { addr } => { + match self.waku.connect_peer_with_address(&addr, None) { + Ok(_) => debug!("successfully connected to {addr}"), + Err(e) => { + tracing::warn!("Could not connect to {addr}: {e}"); + } + } + } WakuBackendMessage::LightpushPublish { message, topic,