Remove waku support in nomos-node binary (#429)

This commit is contained in:
Daniel Sanchez 2023-09-20 16:32:43 +02:00 committed by GitHub
parent 500683b0da
commit e71c359171
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 23 additions and 306 deletions

View File

@ -19,12 +19,12 @@ overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "ma
tracing = "0.1"
multiaddr = "0.18"
nomos-core = { path = "../../nomos-core" }
nomos-network = { path = "../../nomos-services/network" }
nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] }
nomos-log = { path = "../../nomos-services/log" }
nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock"] }
nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p"] }
nomos-http = { path = "../../nomos-services/http", features = ["http"] }
nomos-consensus = { path = "../../nomos-services/consensus" }
nomos-libp2p = { path = "../../nomos-libp2p", optional = true }
nomos-consensus = { path = "../../nomos-services/consensus", features = ["libp2p"] }
nomos-libp2p = { path = "../../nomos-libp2p" }
nomos-da = { path = "../../nomos-services/data-availability" }
metrics = { path = "../../nomos-services/metrics", optional = true }
tracing-subscriber = "0.3"
@ -34,10 +34,4 @@ serde_json = "1.0"
serde_yaml = "0.9"
color-eyre = "0.6.0"
serde = "1"
waku-bindings = { version = "0.1.1", optional = true }
full-replication = { path = "../../nomos-da/full-replication" }
[features]
default = ["libp2p"]
waku = ["waku-bindings", "nomos-network/waku", "nomos-mempool/waku", "nomos-consensus/waku"]
libp2p = ["nomos-libp2p", "nomos-network/libp2p", "nomos-mempool/libp2p", "nomos-consensus/libp2p"]

View File

@ -10,7 +10,6 @@ use nomos_network::NetworkMsg;
use nomos_node::Tx;
use overwatch_rs::services::relay::OutboundRelay;
#[cfg(feature = "libp2p")]
pub(super) async fn handle_libp2p_info_req(
channel: &OutboundRelay<NetworkMsg<Libp2p>>,
res_tx: Sender<HttpResponse>,
@ -28,7 +27,6 @@ pub(super) async fn handle_libp2p_info_req(
Ok(())
}
#[cfg(feature = "libp2p")]
pub(super) async fn libp2p_send_transaction(
network_relay: OutboundRelay<NetworkMsg<Libp2p>>,
tx: Tx,

View File

@ -1,9 +1,10 @@
use bytes::Bytes;
use http::StatusCode;
mod libp2p;
// std
// crates
use bytes::Bytes;
use http::StatusCode;
use nomos_consensus::{CarnotInfo, ConsensusMsg};
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use tracing::error;
@ -15,25 +16,13 @@ use nomos_mempool::backend::mockpool::MockPool;
use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService};
#[cfg(feature = "libp2p")]
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter;
#[cfg(feature = "waku")]
use nomos_mempool::network::adapters::waku::WakuAdapter;
#[cfg(feature = "libp2p")]
use nomos_network::backends::libp2p::Libp2p;
#[cfg(feature = "waku")]
use nomos_network::backends::waku::Waku;
use nomos_network::NetworkService;
use nomos_node::{Carnot, Tx};
use overwatch_rs::services::relay::OutboundRelay;
#[cfg(feature = "waku")]
mod waku;
#[cfg(feature = "waku")]
use waku::*;
#[cfg(feature = "libp2p")]
mod libp2p;
#[cfg(feature = "libp2p")]
use libp2p::*;
use nomos_mempool::network::NetworkAdapter;
use nomos_network::backends::NetworkBackend;
@ -65,11 +54,6 @@ pub fn mempool_metrics_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
#[cfg(feature = "waku")]
{
get_handler!(handle, MempoolService<WakuAdapter<Tx>, MockPool<Tx>>, "metrics" => handle_mempool_metrics_req)
}
#[cfg(feature = "libp2p")]
get_handler!(handle, MempoolService<Libp2pAdapter<Tx>, MockPool<Tx>>, "metrics" => handle_mempool_metrics_req)
}))
}
@ -78,11 +62,6 @@ pub fn network_info_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
#[cfg(feature = "waku")]
{
get_handler!(handle, NetworkService<Waku>, "info" => handle_waku_info_req)
}
#[cfg(feature = "libp2p")]
get_handler!(handle, NetworkService<Libp2p>, "info" => handle_libp2p_info_req)
}))
}
@ -117,31 +96,6 @@ pub fn mempool_add_tx_bridge<
}))
}
#[cfg(feature = "waku")]
pub fn waku_add_conn_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
let (waku_channel, mut http_request_channel) = build_http_bridge::<
NetworkService<Waku>,
AxumBackend,
_,
>(handle, HttpMethod::POST, "conn")
.await
.unwrap();
while let Some(HttpRequest {
res_tx, payload, ..
}) = http_request_channel.recv().await
{
if let Err(e) = handle_add_conn_req(&waku_channel, res_tx, payload).await {
error!(e);
}
}
Ok(())
}))
}
async fn handle_carnot_info_req(
carnot_channel: &OutboundRelay<ConsensusMsg>,
res_tx: Sender<HttpResponse>,
@ -207,16 +161,8 @@ pub(super) async fn handle_mempool_add_tx_req(
match receiver.await {
Ok(Ok(())) => {
// broadcast transaction to peers
#[cfg(feature = "waku")]
{
let network_relay = handle.relay::<NetworkService<Waku>>().connect().await?;
waku_send_transaction(network_relay, tx).await?;
}
#[cfg(feature = "libp2p")]
{
let network_relay = handle.relay::<NetworkService<Libp2p>>().connect().await?;
libp2p_send_transaction(network_relay, tx).await?;
}
let network_relay = handle.relay::<NetworkService<Libp2p>>().connect().await?;
libp2p_send_transaction(network_relay, tx).await?;
Ok(res_tx.send(Ok(b"".to_vec().into())).await?)
}
Ok(Err(())) => Ok(res_tx

View File

@ -1,85 +0,0 @@
use bytes::Bytes;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
// internal
use futures::future::join_all;
use nomos_core::wire;
use nomos_http::http::HttpResponse;
use nomos_mempool::network::adapters::waku::{
WAKU_CARNOT_PUB_SUB_TOPIC, WAKU_CARNOT_TX_CONTENT_TOPIC,
};
use nomos_network::backends::waku::{Waku, WakuBackendMessage};
use nomos_network::NetworkMsg;
use nomos_node::Tx;
use overwatch_rs::services::relay::OutboundRelay;
use waku_bindings::{Multiaddr, WakuMessage};
pub(super) async fn handle_waku_info_req(
channel: &OutboundRelay<NetworkMsg<Waku>>,
res_tx: Sender<HttpResponse>,
) -> Result<(), overwatch_rs::DynError> {
let (sender, receiver) = oneshot::channel();
channel
.send(NetworkMsg::Process(WakuBackendMessage::Info {
reply_channel: sender,
}))
.await
.map_err(|(e, _)| e)?;
let info = receiver.await.unwrap();
res_tx.send(Ok(serde_json::to_vec(&info)?.into())).await?;
Ok(())
}
pub(super) async fn handle_add_conn_req(
waku_channel: &OutboundRelay<NetworkMsg<Waku>>,
res_tx: Sender<HttpResponse>,
payload: Option<Bytes>,
) -> Result<(), overwatch_rs::DynError> {
if let Some(payload) = payload {
if let Ok(addrs) = serde_json::from_slice::<Vec<Multiaddr>>(&payload) {
let reqs: Vec<_> = addrs
.into_iter()
.map(|addr| {
waku_channel.send(NetworkMsg::Process(WakuBackendMessage::ConnectPeer {
addr,
}))
})
.collect();
join_all(reqs).await;
}
Ok(res_tx.send(Ok(b"".to_vec().into())).await?)
} else {
Err(
format!("Invalid payload, {payload:?}. Empty or couldn't transform into a utf8 String")
.into(),
)
}
}
pub(super) async fn waku_send_transaction(
network_relay: OutboundRelay<NetworkMsg<Waku>>,
tx: Tx,
) -> Result<(), overwatch_rs::DynError> {
let payload = wire::serialize(&tx).expect("Tx serialization failed");
network_relay
.send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
message: WakuMessage::new(
payload,
WAKU_CARNOT_TX_CONTENT_TOPIC.clone(),
1,
chrono::Utc::now()
.timestamp_nanos_opt()
.expect("timestamp should be in valid range") as usize,
[],
false,
),
topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
}))
.await
.map_err(|(e, _)| e)?;
Ok(())
}

View File

@ -5,7 +5,6 @@ use std::{
};
use crate::Carnot;
#[cfg(feature = "libp2p")]
use crate::DataAvailability;
use clap::{Parser, ValueEnum};
use color_eyre::eyre::{self, eyre, Result};
@ -13,19 +12,13 @@ use hex::FromHex;
#[cfg(feature = "metrics")]
use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsService};
use nomos_http::{backends::axum::AxumBackend, http::HttpService};
#[cfg(feature = "libp2p")]
use nomos_libp2p::{secp256k1::SecretKey, Multiaddr};
use nomos_log::{Logger, LoggerBackend, LoggerFormat};
#[cfg(feature = "libp2p")]
use nomos_network::backends::libp2p::Libp2p;
#[cfg(feature = "waku")]
use nomos_network::backends::waku::Waku;
use nomos_network::NetworkService;
use overwatch_rs::services::ServiceData;
use serde::{Deserialize, Serialize};
use tracing::Level;
#[cfg(feature = "waku")]
use waku_bindings::{Multiaddr, SecretKey};
#[derive(ValueEnum, Clone, Debug, Default)]
pub enum LoggerBackendType {
@ -123,15 +116,11 @@ pub struct OverlayArgs {
#[derive(Deserialize, Debug, Clone, Serialize)]
pub struct Config {
pub log: <Logger as ServiceData>::Settings,
#[cfg(feature = "waku")]
pub network: <NetworkService<Waku> as ServiceData>::Settings,
#[cfg(feature = "libp2p")]
pub network: <NetworkService<Libp2p> as ServiceData>::Settings,
pub http: <HttpService<AxumBackend> as ServiceData>::Settings,
pub consensus: <Carnot as ServiceData>::Settings,
#[cfg(feature = "metrics")]
pub metrics: <MetricsService<MapMetricsBackend<MetricsData>> as ServiceData>::Settings,
#[cfg(feature = "libp2p")]
pub da: <DataAvailability as ServiceData>::Settings,
}
@ -178,37 +167,6 @@ impl Config {
}
Ok(self)
}
#[cfg(feature = "waku")]
pub fn update_network(mut self, network_args: NetworkArgs) -> Result<Self> {
let NetworkArgs {
host,
port,
node_key,
initial_peers,
} = network_args;
if let Some(host) = host {
self.network.backend.inner.host = Some(host);
}
if let Some(port) = port {
self.network.backend.inner.port = Some(port);
}
if let Some(node_key) = node_key {
use std::str::FromStr;
self.network.backend.inner.node_key = Some(SecretKey::from_str(&node_key)?);
}
if let Some(peers) = initial_peers {
self.network.backend.initial_peers = peers;
}
Ok(self)
}
#[cfg(feature = "libp2p")]
pub fn update_network(mut self, network_args: NetworkArgs) -> Result<Self> {
let NetworkArgs {
host,

View File

@ -4,16 +4,14 @@ mod tx;
use color_eyre::eyre::Result;
use consensus_engine::overlay::{FlatOverlay, RandomBeaconState, RoundRobin};
use full_replication::Blob;
#[cfg(feature = "libp2p")]
use full_replication::{AbsoluteNumber, Attestation, Certificate, FullReplication};
#[cfg(feature = "metrics")]
use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsService};
#[cfg(feature = "libp2p")]
use nomos_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2pAdapter;
#[cfg(feature = "waku")]
use nomos_consensus::network::adapters::waku::WakuAdapter as ConsensusWakuAdapter;
use nomos_consensus::CarnotConsensus;
#[cfg(feature = "libp2p")]
use nomos_da::{
backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter,
DataAvailabilityService,
@ -22,15 +20,11 @@ use nomos_http::backends::axum::AxumBackend;
use nomos_http::bridge::HttpBridgeService;
use nomos_http::http::HttpService;
use nomos_log::Logger;
#[cfg(feature = "libp2p")]
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolLibp2pAdapter;
#[cfg(feature = "waku")]
use nomos_mempool::network::adapters::waku::WakuAdapter as MempoolWakuAdapter;
use nomos_mempool::{backend::mockpool::MockPool, MempoolService};
#[cfg(feature = "libp2p")]
use nomos_network::backends::libp2p::Libp2p;
#[cfg(feature = "waku")]
use nomos_network::backends::waku::Waku;
use nomos_network::NetworkService;
use overwatch_derive::*;
use overwatch_rs::services::handle::ServiceHandle;
@ -41,23 +35,8 @@ use nomos_core::{
};
pub use tx::Tx;
#[cfg(all(feature = "waku", feature = "libp2p"))]
compile_error!("feature \"waku\" and feature \"libp2p\" cannot be enabled at the same time");
#[cfg(feature = "waku")]
pub type Carnot = CarnotConsensus<
ConsensusWakuAdapter,
MockPool<Tx>,
MempoolWakuAdapter<Tx>,
FlatOverlay<RoundRobin, RandomBeaconState>,
Blob,
FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobs<MB16, Blob>,
>;
const MB16: usize = 1024 * 1024 * 16;
#[cfg(feature = "libp2p")]
pub type Carnot = CarnotConsensus<
ConsensusLibp2pAdapter,
MockPool<Tx>,
@ -68,7 +47,6 @@ pub type Carnot = CarnotConsensus<
FillSizeWithBlobs<MB16, Blob>,
>;
#[cfg(feature = "libp2p")]
type DataAvailability = DataAvailabilityService<
FullReplication<AbsoluteNumber<Attestation, Certificate>>,
BlobCache<<Blob as nomos_core::da::blob::Blob>::Hash, Blob>,
@ -78,19 +56,12 @@ type DataAvailability = DataAvailabilityService<
#[derive(Services)]
pub struct Nomos {
logging: ServiceHandle<Logger>,
#[cfg(feature = "waku")]
network: ServiceHandle<NetworkService<Waku>>,
#[cfg(feature = "libp2p")]
network: ServiceHandle<NetworkService<Libp2p>>,
#[cfg(feature = "waku")]
mockpool: ServiceHandle<MempoolService<MempoolWakuAdapter<Tx>, MockPool<Tx>>>,
#[cfg(feature = "libp2p")]
mockpool: ServiceHandle<MempoolService<MempoolLibp2pAdapter<Tx>, MockPool<Tx>>>,
consensus: ServiceHandle<Carnot>,
http: ServiceHandle<HttpService<AxumBackend>>,
bridges: ServiceHandle<HttpBridgeService>,
#[cfg(feature = "metrics")]
metrics: ServiceHandle<MetricsService<MapMetricsBackend<MetricsData>>>,
#[cfg(feature = "libp2p")]
da: ServiceHandle<DataAvailability>,
}

View File

@ -8,15 +8,8 @@ mod bridges;
use clap::Parser;
use color_eyre::eyre::{eyre, Result};
use nomos_http::bridge::{HttpBridge, HttpBridgeSettings};
#[cfg(feature = "libp2p")]
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter;
#[cfg(feature = "waku")]
use nomos_mempool::network::adapters::waku::WakuAdapter;
#[cfg(feature = "libp2p")]
use nomos_network::backends::libp2p::Libp2p;
#[cfg(feature = "waku")]
use nomos_network::backends::waku::Waku;
use overwatch_rs::overwatch::*;
use std::sync::Arc;
@ -62,16 +55,9 @@ fn main() -> Result<()> {
Arc::new(Box::new(bridges::carnot_info_bridge)),
Arc::new(Box::new(bridges::mempool_metrics_bridge)),
Arc::new(Box::new(bridges::network_info_bridge)),
#[cfg(feature = "waku")]
Arc::new(Box::new(
bridges::mempool_add_tx_bridge::<Waku, WakuAdapter<Tx>>,
)),
#[cfg(feature = "libp2p")]
Arc::new(Box::new(
bridges::mempool_add_tx_bridge::<Libp2p, Libp2pAdapter<Tx>>,
)),
#[cfg(feature = "waku")]
Arc::new(Box::new(bridges::waku_add_conn_bridge)),
];
let app = OverwatchRunner::<Nomos>::run(
NomosServiceSettings {
@ -83,7 +69,6 @@ fn main() -> Result<()> {
bridges: HttpBridgeSettings { bridges },
#[cfg(feature = "metrics")]
metrics: config.metrics,
#[cfg(feature = "libp2p")]
da: config.da,
},
None,

View File

@ -7,13 +7,13 @@ publish = false
[dependencies]
nomos-node = { path = "../nodes/nomos-node", default-features = false }
nomos-consensus = { path = "../nomos-services/consensus" }
nomos-network = { path = "../nomos-services/network" }
nomos-network = { path = "../nomos-services/network", features = ["libp2p"]}
nomos-log = { path = "../nomos-services/log" }
nomos-http = { path = "../nomos-services/http", features = ["http"] }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
nomos-core = { path = "../nomos-core" }
consensus-engine = { path = "../consensus-engine", features = ["serde"] }
nomos-mempool = { path = "../nomos-services/mempool", features = ["mock"] }
nomos-mempool = { path = "../nomos-services/mempool", features = ["mock", "libp2p"] }
nomos-da = { path = "../nomos-services/data-availability" }
full-replication = { path = "../nomos-da/full-replication" }
mixnode = { path = "../nodes/mixnode" }
@ -27,7 +27,7 @@ once_cell = "1"
secp256k1 = { version = "0.26", features = ["rand"] }
waku-bindings = { version = "0.1.1", optional = true }
reqwest = { version = "0.11", features = ["json"] }
nomos-libp2p = { path = "../nomos-libp2p", optional = true }
nomos-libp2p = { path = "../nomos-libp2p" }
tempfile = "3.6"
serde_yaml = "0.9"
tokio = "1"
@ -57,5 +57,3 @@ harness = false
[features]
metrics = ["nomos-node/metrics"]
waku = ["nomos-network/waku", "nomos-mempool/waku", "nomos-node/waku", "waku-bindings"]
libp2p = ["nomos-network/libp2p", "nomos-mempool/libp2p", "nomos-libp2p", "nomos-node/libp2p"]

View File

@ -6,23 +6,18 @@ use std::time::Duration;
use crate::{get_available_port, Node, SpawnConfig};
use consensus_engine::overlay::{FlatOverlaySettings, RoundRobin};
use consensus_engine::NodeId;
#[cfg(feature = "libp2p")]
use mixnet_client::{MixnetClientConfig, MixnetClientMode};
use mixnet_node::MixnetNodeConfig;
use mixnet_topology::MixnetTopology;
use nomos_consensus::{CarnotInfo, CarnotSettings};
use nomos_http::backends::axum::AxumBackendSettings;
#[cfg(feature = "libp2p")]
use nomos_libp2p::Multiaddr;
use nomos_log::{LoggerBackend, LoggerFormat};
#[cfg(feature = "libp2p")]
use nomos_network::backends::libp2p::{Libp2pConfig, Libp2pInfo};
#[cfg(feature = "waku")]
use nomos_network::backends::waku::{WakuConfig, WakuInfo};
use nomos_network::NetworkConfig;
use nomos_node::Config;
#[cfg(feature = "waku")]
use waku_bindings::{Multiaddr, PeerId};
// crates
use fraction::Fraction;
use once_cell::sync::Lazy;
@ -104,33 +99,6 @@ impl NomosNode {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
#[cfg(feature = "waku")]
pub async fn peer_id(&self) -> PeerId {
self.get(NETWORK_INFO_API)
.await
.unwrap()
.json::<WakuInfo>()
.await
.unwrap()
.peer_id
.unwrap()
}
#[cfg(feature = "waku")]
pub async fn get_listening_address(&self) -> Multiaddr {
self.get(NETWORK_INFO_API)
.await
.unwrap()
.json::<WakuInfo>()
.await
.unwrap()
.listen_addresses
.unwrap()
.swap_remove(0)
}
#[cfg(feature = "libp2p")]
pub async fn get_listening_address(&self) -> Multiaddr {
self.get(NETWORK_INFO_API)
.await
@ -229,12 +197,9 @@ fn create_node_config(
private_key: [u8; 32],
threshold: Fraction,
timeout: Duration,
#[cfg(feature = "libp2p")] mixnet_node_config: Option<MixnetNodeConfig>,
#[cfg(feature = "waku")] _mixnet_node_config: Option<MixnetNodeConfig>,
#[cfg(feature = "libp2p")] mixnet_topology: MixnetTopology,
#[cfg(feature = "waku")] _mixnet_topology: MixnetTopology,
mixnet_node_config: Option<MixnetNodeConfig>,
mixnet_topology: MixnetTopology,
) -> Config {
#[cfg(feature = "libp2p")]
let mixnet_client_mode = match mixnet_node_config {
Some(node_config) => MixnetClientMode::SenderReceiver(node_config.client_listen_address),
None => MixnetClientMode::Sender,
@ -242,12 +207,6 @@ fn create_node_config(
let mut config = Config {
network: NetworkConfig {
#[cfg(feature = "waku")]
backend: WakuConfig {
inner: Default::default(),
initial_peers: vec![],
},
#[cfg(feature = "libp2p")]
backend: Libp2pConfig {
inner: Default::default(),
initial_peers: vec![],
@ -286,7 +245,6 @@ fn create_node_config(
},
#[cfg(feature = "metrics")]
metrics: Default::default(),
#[cfg(feature = "libp2p")]
da: nomos_da::Settings {
da_protocol: full_replication::Settings {
num_attestations: 1,
@ -297,14 +255,8 @@ fn create_node_config(
},
},
};
#[cfg(feature = "waku")]
{
config.network.backend.inner.port = Some(get_available_port() as usize);
}
#[cfg(feature = "libp2p")]
{
config.network.backend.inner.port = get_available_port();
}
config.network.backend.inner.port = get_available_port();
config
}