1
0
mirror of synced 2025-01-11 00:05:48 +00:00

Add endpoint to make a new peer conn for mockpool node (#68)

* Add enpoint to make a new peer conn for mockpool node

* Pass multiple addresses in connection request

* Join async reqs to network service, cleanup

* Collect and join reqs
This commit is contained in:
gusto 2023-02-07 17:50:38 +02:00 committed by GitHub
parent 146001c9fe
commit 5f21a2734a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 62 additions and 7 deletions

View File

@ -9,9 +9,11 @@ edition = "2021"
blake2 = "0.10" blake2 = "0.10"
bincode = "2.0.0-rc.2" bincode = "2.0.0-rc.2"
clap = { version = "4", features = ["derive"] } clap = { version = "4", features = ["derive"] }
futures = "0.3"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
tracing = "0.1" tracing = "0.1"
multiaddr = "0.16"
nomos-core = { path = "../../nomos-core" } nomos-core = { path = "../../nomos-core" }
nomos-network = { path = "../../nomos-services/network", features = ["waku"] } nomos-network = { path = "../../nomos-services/network", features = ["waku"] }
nomos-log = { path = "../../nomos-services/log" } nomos-log = { path = "../../nomos-services/log" }

View File

@ -1,9 +1,8 @@
// std // std
// crates // crates
use tokio::sync::oneshot; use futures::future::join_all;
use tracing::debug; use multiaddr::Multiaddr;
// internal
use crate::tx::{Tx, TxId};
use nomos_http::backends::axum::AxumBackend; use nomos_http::backends::axum::AxumBackend;
use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner}; use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner};
use nomos_http::http::{HttpMethod, HttpRequest}; use nomos_http::http::{HttpMethod, HttpRequest};
@ -12,6 +11,11 @@ use nomos_mempool::network::adapters::waku::WakuAdapter;
use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService}; use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService};
use nomos_network::backends::waku::{Waku, WakuBackendMessage, WakuInfo}; use nomos_network::backends::waku::{Waku, WakuBackendMessage, WakuInfo};
use nomos_network::{NetworkMsg, NetworkService}; use nomos_network::{NetworkMsg, NetworkService};
use tokio::sync::oneshot;
use tracing::debug;
// internal
use crate::tx::{Tx, TxId};
pub fn mempool_metrics_bridge( pub fn mempool_metrics_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle, handle: overwatch_rs::overwatch::handle::OverwatchHandle,
@ -119,3 +123,41 @@ pub fn waku_info_bridge(
Ok(()) Ok(())
})) }))
} }
pub fn waku_add_conn_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
let (waku_channel, mut http_request_channel) = build_http_bridge::<
NetworkService<Waku>,
AxumBackend,
_,
>(handle, HttpMethod::POST, "conn")
.await
.unwrap();
while let Some(HttpRequest {
res_tx, payload, ..
}) = http_request_channel.recv().await
{
if let Some(payload) = payload {
if let Ok(addrs) = serde_json::from_slice::<Vec<Multiaddr>>(&payload) {
let reqs: Vec<_> = addrs
.into_iter()
.map(|addr| {
waku_channel.send(NetworkMsg::Process(
WakuBackendMessage::ConnectPeer { addr },
))
})
.collect();
join_all(reqs).await;
}
res_tx.send(b"".to_vec().into()).await.unwrap();
} else {
debug!("Invalid payload, {:?}. Should not be empty", payload);
}
}
Ok(())
}))
}

View File

@ -69,9 +69,10 @@ fn main() -> Result<()> {
let Args { config } = Args::parse(); let Args { config } = Args::parse();
let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)?; let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)?;
let bridges: Vec<HttpBridge> = vec![ let bridges: Vec<HttpBridge> = vec![
Arc::new(Box::new(bridges::mempool_metrics_bridge)),
Arc::new(Box::new(bridges::waku_info_bridge)),
Arc::new(Box::new(bridges::mempool_add_tx_bridge)), Arc::new(Box::new(bridges::mempool_add_tx_bridge)),
Arc::new(Box::new(bridges::mempool_metrics_bridge)),
Arc::new(Box::new(bridges::waku_add_conn_bridge)),
Arc::new(Box::new(bridges::waku_info_bridge)),
]; ];
let app = OverwatchRunner::<MockPoolNode>::run( let app = OverwatchRunner::<MockPoolNode>::run(
MockPoolNodeServiceSettings { MockPoolNodeServiceSettings {

View File

@ -36,6 +36,8 @@ pub enum WakuBackendMessage {
message: WakuMessage, message: WakuMessage,
topic: Option<WakuPubSubTopic>, topic: Option<WakuPubSubTopic>,
}, },
/// Make a connection to peer at provided multiaddress
ConnectPeer { addr: Multiaddr },
/// Subscribe to a particular Waku topic /// Subscribe to a particular Waku topic
RelaySubscribe { topic: WakuPubSubTopic }, RelaySubscribe { topic: WakuPubSubTopic },
/// Unsubscribe from a particular Waku topic /// Unsubscribe from a particular Waku topic
@ -118,6 +120,14 @@ impl NetworkBackend for Waku {
), ),
} }
} }
WakuBackendMessage::ConnectPeer { addr } => {
match self.waku.connect_peer_with_address(&addr, None) {
Ok(_) => debug!("successfully connected to {addr}"),
Err(e) => {
tracing::warn!("Could not connect to {addr}: {e}");
}
}
}
WakuBackendMessage::LightpushPublish { WakuBackendMessage::LightpushPublish {
message, message,
topic, topic,