From 78c6566d8a795f7fac3180a820bb6206ad73287f Mon Sep 17 00:00:00 2001 From: Giacomo Pasini Date: Wed, 9 Aug 2023 07:42:08 +0200 Subject: [PATCH] Add libp2p backend to nomos-node (#285) * Add support for libp2p backend in integration tests * Add support for libp2p in nomos-node * change default to waku * add mutually exclusive features warning * disable default features to avoid unification * disable default features * remove leftover cargo build * Make sure we are subscribed to libp2p topic at startup * unify imports * typo in ci config * Sequential build and test steps for features * Add RandomBeaconState to libp2p carnot variant --------- Co-authored-by: Gusto --- ci/Jenkinsfile.nightly.integration | 2 +- ci/Jenkinsfile.prs.linux | 26 +- ci/Jenkinsfile.prs.macos | 30 +- nodes/nomos-node/Cargo.toml | 24 +- nodes/nomos-node/src/bridges.rs | 296 ------------------ nodes/nomos-node/src/bridges/libp2p.rs | 27 ++ nodes/nomos-node/src/bridges/mod.rs | 179 +++++++++++ nodes/nomos-node/src/bridges/waku.rs | 130 ++++++++ nodes/nomos-node/src/lib.rs | 45 ++- nodes/nomos-node/src/main.rs | 6 +- .../consensus/src/network/adapters/libp2p.rs | 2 +- tests/Cargo.toml | 11 +- tests/src/nodes/nomos.rs | 49 ++- 13 files changed, 474 insertions(+), 353 deletions(-) delete mode 100644 nodes/nomos-node/src/bridges.rs create mode 100644 nodes/nomos-node/src/bridges/libp2p.rs create mode 100644 nodes/nomos-node/src/bridges/mod.rs create mode 100644 nodes/nomos-node/src/bridges/waku.rs diff --git a/ci/Jenkinsfile.nightly.integration b/ci/Jenkinsfile.nightly.integration index e7b9b499..9c5646cb 100644 --- a/ci/Jenkinsfile.nightly.integration +++ b/ci/Jenkinsfile.nightly.integration @@ -89,7 +89,7 @@ def runTestCases(test_cases, iterations) { echo "Running iteration ${i + 1} of ${iterations}" for (test_case in test_cases) { - def test_cmd = "cargo test -p tests --features ${feature} ${test_case}" + def test_cmd = "cargo test -p tests --no-default-features --features ${feature} ${test_case}" if (sh(script: test_cmd, returnStatus: true) != 0) { error("Test '${test_case}' failed on iteration ${i + 1}") return diff --git a/ci/Jenkinsfile.prs.linux b/ci/Jenkinsfile.prs.linux index d5cd9649..0334b3c0 100644 --- a/ci/Jenkinsfile.prs.linux +++ b/ci/Jenkinsfile.prs.linux @@ -32,22 +32,28 @@ pipeline { stages { stage('Check') { steps { - sh "cargo check --all --features ${FEATURES}" + sh "cargo check --all --no-default-features --features ${FEATURES}" sh "cargo fmt -- --check" - sh "cargo clippy --all --features ${FEATURES} -- --deny warnings" + sh "cargo clippy --all --no-default-features --features ${FEATURES} -- --deny warnings" } } - stage('Build') { - steps { - sh "cargo build" - sh "cargo build --all --features ${FEATURES}" + stage('BuildAndTest') { + options { + lock("sync-features") } - } + stages { + stage('Build') { + steps { + sh "cargo build --all --no-default-features --features ${FEATURES}" + } + } - stage('Test') { - steps { - sh "cargo test --all --features ${FEATURES}" + stage('Test') { + steps { + sh "cargo test --all --no-default-features --features ${FEATURES}" + } + } } } } diff --git a/ci/Jenkinsfile.prs.macos b/ci/Jenkinsfile.prs.macos index ad1d50fe..587b5a7f 100644 --- a/ci/Jenkinsfile.prs.macos +++ b/ci/Jenkinsfile.prs.macos @@ -31,23 +31,29 @@ pipeline { stages { stage('Check') { steps { script { - nix.shell("cargo check --all --features ${FEATURES}") + nix.shell("cargo check --all --no-default-features --features ${FEATURES}") nix.shell("cargo fmt -- --check") - nix.shell("cargo clippy --all --features ${FEATURES} -- --deny warnings") + nix.shell("cargo clippy --all --no-default-features --features ${FEATURES} -- --deny warnings") } } } - stage('Build') { - steps { script { - nix.shell("cargo build") - nix.shell("cargo build --all --features ${FEATURES}") - } } - } + stage('BuildAndTest') { + options { + lock('sync-features') + } + stages { + stage('Build') { + steps { script { + nix.shell("cargo build --all --no-default-features --features ${FEATURES}") + } } + } - stage('Test') { - steps { script { - nix.shell("cargo test --all --features ${FEATURES}") - } } + stage('Test') { + steps { script { + nix.shell("cargo test --all --no-default-features --features ${FEATURES}") + } } + } + } } } } diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index cc1bef0b..ea500fb4 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -16,21 +16,13 @@ http = "0.2.9" overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" } tracing = "0.1" -multiaddr = "0.17" +multiaddr = "0.18" nomos-core = { path = "../../nomos-core" } -nomos-network = { path = "../../nomos-services/network", features = [ - "waku", - "libp2p", -] } +nomos-network = { path = "../../nomos-services/network" } nomos-log = { path = "../../nomos-services/log" } -nomos-mempool = { path = "../../nomos-services/mempool", features = [ - "waku", - "mock", -] } +nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock"] } nomos-http = { path = "../../nomos-services/http", features = ["http"] } -nomos-consensus = { path = "../../nomos-services/consensus", features = [ - "waku", -] } +nomos-consensus = { path = "../../nomos-services/consensus" } metrics = { path = "../../nomos-services/metrics", optional = true } tracing-subscriber = "0.3" consensus-engine = { path = "../../consensus-engine" } @@ -39,4 +31,10 @@ serde_json = "1.0" serde_yaml = "0.9" color-eyre = "0.6.0" serde = "1" -waku-bindings = "0.1.1" +waku-bindings = { version = "0.1.1", optional = true } + + +[features] +default = ["waku"] +waku = ["waku-bindings", "nomos-network/waku", "nomos-mempool/waku", "nomos-consensus/waku"] +libp2p = ["nomos-network/libp2p", "nomos-mempool/libp2p", "nomos-consensus/libp2p"] diff --git a/nodes/nomos-node/src/bridges.rs b/nodes/nomos-node/src/bridges.rs deleted file mode 100644 index 2f1d96d2..00000000 --- a/nodes/nomos-node/src/bridges.rs +++ /dev/null @@ -1,296 +0,0 @@ -// std -// crates -use bytes::Bytes; -use http::StatusCode; -use nomos_consensus::{CarnotInfo, ConsensusMsg}; - -use tokio::sync::mpsc::Sender; -use tokio::sync::oneshot; -use tracing::error; -// internal -use futures::future::join_all; -use multiaddr::Multiaddr; -use nomos_core::wire; -use nomos_http::backends::axum::AxumBackend; -use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner}; -use nomos_http::http::{HttpMethod, HttpRequest, HttpResponse}; -use nomos_mempool::backend::mockpool::MockPool; -use nomos_mempool::network::adapters::waku::{ - WakuAdapter, WAKU_CARNOT_PUB_SUB_TOPIC, WAKU_CARNOT_TX_CONTENT_TOPIC, -}; -use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService}; -use nomos_network::backends::waku::{Waku, WakuBackendMessage, WakuInfo}; -use nomos_network::{NetworkMsg, NetworkService}; -use nomos_node::{Carnot, Tx}; -use overwatch_rs::services::relay::OutboundRelay; -use waku_bindings::WakuMessage; - -pub fn carnot_info_bridge( - handle: overwatch_rs::overwatch::handle::OverwatchHandle, -) -> HttpBridgeRunner { - Box::new(Box::pin(async move { - let (carnot_channel, mut http_request_channel) = - build_http_bridge::(handle, HttpMethod::GET, "info") - .await - .unwrap(); - - while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await { - if let Err(e) = handle_carnot_info_req(&carnot_channel, &res_tx).await { - error!(e); - } - } - - Ok(()) - })) -} - -pub fn mempool_metrics_bridge( - handle: overwatch_rs::overwatch::handle::OverwatchHandle, -) -> HttpBridgeRunner { - Box::new(Box::pin(async move { - let (mempool_channel, mut http_request_channel) = - build_http_bridge::, MockPool>, AxumBackend, _>( - handle, - HttpMethod::GET, - "metrics", - ) - .await - .unwrap(); - - while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await { - if let Err(e) = handle_mempool_metrics_req(&mempool_channel, res_tx).await { - error!(e); - } - } - Ok(()) - })) -} - -pub fn mempool_add_tx_bridge( - handle: overwatch_rs::overwatch::handle::OverwatchHandle, -) -> HttpBridgeRunner { - Box::new(Box::pin(async move { - let (mempool_channel, mut http_request_channel) = - build_http_bridge::, MockPool>, AxumBackend, _>( - handle.clone(), - HttpMethod::POST, - "addtx", - ) - .await - .unwrap(); - - while let Some(HttpRequest { - res_tx, payload, .. - }) = http_request_channel.recv().await - { - if let Err(e) = - handle_mempool_add_tx_req(&handle, &mempool_channel, res_tx, payload).await - { - error!(e); - } - } - - Ok(()) - })) -} - -pub fn waku_info_bridge( - handle: overwatch_rs::overwatch::handle::OverwatchHandle, -) -> HttpBridgeRunner { - Box::new(Box::pin(async move { - let (waku_channel, mut http_request_channel) = build_http_bridge::< - NetworkService, - AxumBackend, - _, - >(handle, HttpMethod::GET, "info") - .await - .unwrap(); - - while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await { - if let Err(e) = handle_waku_info_req(&waku_channel, &res_tx).await { - error!(e); - } - } - Ok(()) - })) -} - -pub fn waku_add_conn_bridge( - handle: overwatch_rs::overwatch::handle::OverwatchHandle, -) -> HttpBridgeRunner { - Box::new(Box::pin(async move { - let (waku_channel, mut http_request_channel) = build_http_bridge::< - NetworkService, - AxumBackend, - _, - >(handle, HttpMethod::POST, "conn") - .await - .unwrap(); - - while let Some(HttpRequest { - res_tx, payload, .. - }) = http_request_channel.recv().await - { - if let Err(e) = handle_add_conn_req(&waku_channel, res_tx, payload).await { - error!(e); - } - } - Ok(()) - })) -} - -async fn handle_carnot_info_req( - carnot_channel: &OutboundRelay, - res_tx: &Sender, -) -> Result<(), overwatch_rs::DynError> { - let (sender, receiver) = oneshot::channel(); - carnot_channel - .send(ConsensusMsg::Info { tx: sender }) - .await - .map_err(|(e, _)| e)?; - let carnot_info: CarnotInfo = receiver.await.unwrap(); - res_tx - .send(Ok(serde_json::to_vec(&carnot_info)?.into())) - .await?; - - Ok(()) -} - -async fn handle_mempool_metrics_req( - mempool_channel: &OutboundRelay>, - res_tx: Sender, -) -> Result<(), overwatch_rs::DynError> { - let (sender, receiver) = oneshot::channel(); - mempool_channel - .send(MempoolMsg::Metrics { - reply_channel: sender, - }) - .await - .map_err(|(e, _)| e)?; - - let metrics: MempoolMetrics = receiver.await.unwrap(); - res_tx - // TODO: use serde to serialize metrics - .send(Ok(format!( - "{{\"pending_tx\": {}, \"last_tx\": {}}}", - metrics.pending_txs, metrics.last_tx_timestamp - ) - .into())) - .await?; - - Ok(()) -} - -async fn handle_mempool_add_tx_req( - handle: &overwatch_rs::overwatch::handle::OverwatchHandle, - mempool_channel: &OutboundRelay>, - res_tx: Sender, - payload: Option, -) -> Result<(), overwatch_rs::DynError> { - if let Some(data) = payload - .as_ref() - .and_then(|b| String::from_utf8(b.to_vec()).ok()) - { - let tx = Tx(data); - let (sender, receiver) = oneshot::channel(); - mempool_channel - .send(MempoolMsg::AddTx { - tx: tx.clone(), - reply_channel: sender, - }) - .await - .map_err(|(e, _)| e)?; - - match receiver.await { - Ok(Ok(())) => { - // broadcast transaction to peers - let network_relay = handle.relay::>().connect().await?; - send_transaction(network_relay, tx).await?; - Ok(res_tx.send(Ok(b"".to_vec().into())).await?) - } - Ok(Err(())) => Ok(res_tx - .send(Err(( - StatusCode::CONFLICT, - "error: unable to add tx".into(), - ))) - .await?), - Err(err) => Ok(res_tx - .send(Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))) - .await?), - } - } else { - Err( - format!("Invalid payload, {payload:?}. Empty or couldn't transform into a utf8 String") - .into(), - ) - } -} - -async fn handle_waku_info_req( - waku_channel: &OutboundRelay>, - res_tx: &Sender, -) -> Result<(), overwatch_rs::DynError> { - let (sender, receiver) = oneshot::channel(); - waku_channel - .send(NetworkMsg::Process(WakuBackendMessage::Info { - reply_channel: sender, - })) - .await - .map_err(|(e, _)| e)?; - let waku_info: WakuInfo = receiver.await.unwrap(); - res_tx - .send(Ok(serde_json::to_vec(&waku_info)?.into())) - .await?; - - Ok(()) -} - -async fn handle_add_conn_req( - waku_channel: &OutboundRelay>, - res_tx: Sender, - payload: Option, -) -> Result<(), overwatch_rs::DynError> { - if let Some(payload) = payload { - if let Ok(addrs) = serde_json::from_slice::>(&payload) { - let reqs: Vec<_> = addrs - .into_iter() - .map(|addr| { - waku_channel.send(NetworkMsg::Process(WakuBackendMessage::ConnectPeer { - addr, - })) - }) - .collect(); - - join_all(reqs).await; - } - Ok(res_tx.send(Ok(b"".to_vec().into())).await?) - } else { - Err( - format!("Invalid payload, {payload:?}. Empty or couldn't transform into a utf8 String") - .into(), - ) - } -} - -async fn send_transaction( - network_relay: OutboundRelay>, - tx: Tx, -) -> Result<(), overwatch_rs::DynError> { - let payload = wire::serialize(&tx).expect("Tx serialization failed"); - network_relay - .send(NetworkMsg::Process(WakuBackendMessage::Broadcast { - message: WakuMessage::new( - payload, - WAKU_CARNOT_TX_CONTENT_TOPIC.clone(), - 1, - chrono::Utc::now().timestamp_nanos() as usize, - [], - false, - ), - topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()), - })) - .await - .map_err(|(e, _)| e)?; - - Ok(()) -} diff --git a/nodes/nomos-node/src/bridges/libp2p.rs b/nodes/nomos-node/src/bridges/libp2p.rs new file mode 100644 index 00000000..f52dd107 --- /dev/null +++ b/nodes/nomos-node/src/bridges/libp2p.rs @@ -0,0 +1,27 @@ +// std +// crates +use tokio::sync::mpsc::Sender; +use tokio::sync::oneshot; +// internal +use nomos_http::http::HttpResponse; +use nomos_network::backends::libp2p::{Command, Libp2p}; +use nomos_network::NetworkMsg; +use overwatch_rs::services::relay::OutboundRelay; + +#[cfg(feature = "libp2p")] +pub(super) async fn handle_libp2p_info_req( + channel: &OutboundRelay>, + res_tx: Sender, +) -> Result<(), overwatch_rs::DynError> { + let (sender, receiver) = oneshot::channel(); + + channel + .send(NetworkMsg::Process(Command::Info { reply: sender })) + .await + .map_err(|(e, _)| e)?; + + let info = receiver.await.unwrap(); + res_tx.send(Ok(serde_json::to_vec(&info)?.into())).await?; + + Ok(()) +} diff --git a/nodes/nomos-node/src/bridges/mod.rs b/nodes/nomos-node/src/bridges/mod.rs new file mode 100644 index 00000000..971616fd --- /dev/null +++ b/nodes/nomos-node/src/bridges/mod.rs @@ -0,0 +1,179 @@ +// std +// crates +use nomos_consensus::{CarnotInfo, ConsensusMsg}; + +use tokio::sync::mpsc::Sender; +use tokio::sync::oneshot; +use tracing::error; +// internal +use nomos_http::backends::axum::AxumBackend; +use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner}; +use nomos_http::http::{HttpMethod, HttpRequest, HttpResponse}; +use nomos_mempool::backend::mockpool::MockPool; + +use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService}; + +#[cfg(feature = "libp2p")] +use nomos_mempool::network::adapters::libp2p::Libp2pAdapter; +#[cfg(feature = "waku")] +use nomos_mempool::network::adapters::waku::WakuAdapter; +#[cfg(feature = "libp2p")] +use nomos_network::backends::libp2p::Libp2p; +#[cfg(feature = "waku")] +use nomos_network::backends::waku::Waku; +use nomos_network::NetworkService; +use nomos_node::{Carnot, Tx}; +use overwatch_rs::services::relay::OutboundRelay; + +#[cfg(feature = "waku")] +mod waku; +#[cfg(feature = "waku")] +use waku::*; +#[cfg(feature = "libp2p")] +mod libp2p; +#[cfg(feature = "libp2p")] +use libp2p::*; + +macro_rules! get_handler { + ($handle:expr, $service:ty, $path:expr => $handler:tt) => {{ + let (channel, mut http_request_channel) = + build_http_bridge::<$service, AxumBackend, _>($handle, HttpMethod::GET, $path) + .await + .unwrap(); + while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await { + if let Err(e) = $handler(&channel, res_tx).await { + error!(e); + } + } + Ok(()) + }}; +} + +pub fn carnot_info_bridge( + handle: overwatch_rs::overwatch::handle::OverwatchHandle, +) -> HttpBridgeRunner { + Box::new(Box::pin(async move { + get_handler!(handle, Carnot, "info" => handle_carnot_info_req) + })) +} + +pub fn mempool_metrics_bridge( + handle: overwatch_rs::overwatch::handle::OverwatchHandle, +) -> HttpBridgeRunner { + Box::new(Box::pin(async move { + #[cfg(feature = "waku")] + { + get_handler!(handle, MempoolService, MockPool>, "metrics" => handle_mempool_metrics_req) + } + #[cfg(feature = "libp2p")] + get_handler!(handle, MempoolService, MockPool>, "metrics" => handle_mempool_metrics_req) + })) +} + +pub fn network_info_bridge( + handle: overwatch_rs::overwatch::handle::OverwatchHandle, +) -> HttpBridgeRunner { + Box::new(Box::pin(async move { + #[cfg(feature = "waku")] + { + get_handler!(handle, NetworkService, "info" => handle_waku_info_req) + } + #[cfg(feature = "libp2p")] + get_handler!(handle, NetworkService, "info" => handle_libp2p_info_req) + })) +} + +#[cfg(feature = "waku")] +pub fn mempool_add_tx_bridge( + handle: overwatch_rs::overwatch::handle::OverwatchHandle, +) -> HttpBridgeRunner { + Box::new(Box::pin(async move { + let (mempool_channel, mut http_request_channel) = + build_http_bridge::, MockPool>, AxumBackend, _>( + handle.clone(), + HttpMethod::POST, + "addtx", + ) + .await + .unwrap(); + + while let Some(HttpRequest { + res_tx, payload, .. + }) = http_request_channel.recv().await + { + if let Err(e) = + handle_mempool_add_tx_req(&handle, &mempool_channel, res_tx, payload).await + { + error!(e); + } + } + Ok(()) + })) +} + +#[cfg(feature = "waku")] +pub fn waku_add_conn_bridge( + handle: overwatch_rs::overwatch::handle::OverwatchHandle, +) -> HttpBridgeRunner { + Box::new(Box::pin(async move { + let (waku_channel, mut http_request_channel) = build_http_bridge::< + NetworkService, + AxumBackend, + _, + >(handle, HttpMethod::POST, "conn") + .await + .unwrap(); + + while let Some(HttpRequest { + res_tx, payload, .. + }) = http_request_channel.recv().await + { + if let Err(e) = handle_add_conn_req(&waku_channel, res_tx, payload).await { + error!(e); + } + } + Ok(()) + })) +} + +async fn handle_carnot_info_req( + carnot_channel: &OutboundRelay, + res_tx: Sender, +) -> Result<(), overwatch_rs::DynError> { + let (sender, receiver) = oneshot::channel(); + carnot_channel + .send(ConsensusMsg::Info { tx: sender }) + .await + .map_err(|(e, _)| e)?; + let carnot_info: CarnotInfo = receiver.await.unwrap(); + res_tx + .send(Ok(serde_json::to_vec(&carnot_info)?.into())) + .await?; + + Ok(()) +} + +async fn handle_mempool_metrics_req( + mempool_channel: &OutboundRelay>, + res_tx: Sender, +) -> Result<(), overwatch_rs::DynError> { + let (sender, receiver) = oneshot::channel(); + mempool_channel + .send(MempoolMsg::Metrics { + reply_channel: sender, + }) + .await + .map_err(|(e, _)| e)?; + + let metrics: MempoolMetrics = receiver.await.unwrap(); + res_tx + // TODO: use serde to serialize metrics + .send(Ok(format!( + "{{\"pending_tx\": {}, \"last_tx\": {}}}", + metrics.pending_txs, metrics.last_tx_timestamp + ) + .into())) + .await?; + + Ok(()) +} diff --git a/nodes/nomos-node/src/bridges/waku.rs b/nodes/nomos-node/src/bridges/waku.rs new file mode 100644 index 00000000..3c7cf557 --- /dev/null +++ b/nodes/nomos-node/src/bridges/waku.rs @@ -0,0 +1,130 @@ +use bytes::Bytes; +use http::StatusCode; +use tokio::sync::mpsc::Sender; +use tokio::sync::oneshot; +// internal +use futures::future::join_all; +use nomos_core::wire; +use nomos_http::http::HttpResponse; +use nomos_mempool::network::adapters::waku::{ + WAKU_CARNOT_PUB_SUB_TOPIC, WAKU_CARNOT_TX_CONTENT_TOPIC, +}; +use nomos_mempool::MempoolMsg; +use nomos_network::backends::waku::{Waku, WakuBackendMessage}; +use nomos_network::{NetworkMsg, NetworkService}; +use nomos_node::Tx; +use overwatch_rs::services::relay::OutboundRelay; +use waku_bindings::{Multiaddr, WakuMessage}; + +pub(super) async fn handle_mempool_add_tx_req( + handle: &overwatch_rs::overwatch::handle::OverwatchHandle, + mempool_channel: &OutboundRelay>, + res_tx: Sender, + payload: Option, +) -> Result<(), overwatch_rs::DynError> { + if let Some(data) = payload + .as_ref() + .and_then(|b| String::from_utf8(b.to_vec()).ok()) + { + let tx = Tx(data); + let (sender, receiver) = oneshot::channel(); + mempool_channel + .send(MempoolMsg::AddTx { + tx: tx.clone(), + reply_channel: sender, + }) + .await + .map_err(|(e, _)| e)?; + + match receiver.await { + Ok(Ok(())) => { + // broadcast transaction to peers + let network_relay = handle.relay::>().connect().await?; + send_transaction(network_relay, tx).await?; + Ok(res_tx.send(Ok(b"".to_vec().into())).await?) + } + Ok(Err(())) => Ok(res_tx + .send(Err(( + StatusCode::CONFLICT, + "error: unable to add tx".into(), + ))) + .await?), + Err(err) => Ok(res_tx + .send(Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))) + .await?), + } + } else { + Err( + format!("Invalid payload, {payload:?}. Empty or couldn't transform into a utf8 String") + .into(), + ) + } +} + +pub(super) async fn handle_waku_info_req( + channel: &OutboundRelay>, + res_tx: Sender, +) -> Result<(), overwatch_rs::DynError> { + let (sender, receiver) = oneshot::channel(); + + channel + .send(NetworkMsg::Process(WakuBackendMessage::Info { + reply_channel: sender, + })) + .await + .map_err(|(e, _)| e)?; + let info = receiver.await.unwrap(); + res_tx.send(Ok(serde_json::to_vec(&info)?.into())).await?; + + Ok(()) +} + +pub(super) async fn handle_add_conn_req( + waku_channel: &OutboundRelay>, + res_tx: Sender, + payload: Option, +) -> Result<(), overwatch_rs::DynError> { + if let Some(payload) = payload { + if let Ok(addrs) = serde_json::from_slice::>(&payload) { + let reqs: Vec<_> = addrs + .into_iter() + .map(|addr| { + waku_channel.send(NetworkMsg::Process(WakuBackendMessage::ConnectPeer { + addr, + })) + }) + .collect(); + + join_all(reqs).await; + } + Ok(res_tx.send(Ok(b"".to_vec().into())).await?) + } else { + Err( + format!("Invalid payload, {payload:?}. Empty or couldn't transform into a utf8 String") + .into(), + ) + } +} + +pub(super) async fn send_transaction( + network_relay: OutboundRelay>, + tx: Tx, +) -> Result<(), overwatch_rs::DynError> { + let payload = wire::serialize(&tx).expect("Tx serialization failed"); + network_relay + .send(NetworkMsg::Process(WakuBackendMessage::Broadcast { + message: WakuMessage::new( + payload, + WAKU_CARNOT_TX_CONTENT_TOPIC.clone(), + 1, + chrono::Utc::now().timestamp_nanos() as usize, + [], + false, + ), + topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()), + })) + .await + .map_err(|(e, _)| e)?; + + Ok(()) +} diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index 44a79bf6..204e5bf2 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -4,35 +4,49 @@ use color_eyre::eyre::Result; use consensus_engine::overlay::{FlatOverlay, RandomBeaconState, RoundRobin}; #[cfg(feature = "metrics")] use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsService}; -use nomos_consensus::{ - network::adapters::waku::WakuAdapter as ConsensusWakuAdapter, CarnotConsensus, -}; +#[cfg(feature = "libp2p")] +use nomos_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2pAdapter; +#[cfg(feature = "waku")] +use nomos_consensus::network::adapters::waku::WakuAdapter as ConsensusWakuAdapter; +use nomos_consensus::CarnotConsensus; use nomos_core::fountain::mock::MockFountain; use nomos_http::backends::axum::AxumBackend; use nomos_http::bridge::HttpBridgeService; use nomos_http::http::HttpService; use nomos_log::Logger; -use nomos_mempool::{ - backend::mockpool::MockPool, network::adapters::waku::WakuAdapter as MempoolWakuAdapter, - MempoolService, -}; -use nomos_network::{backends::waku::Waku, NetworkService}; +#[cfg(feature = "libp2p")] +use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolLibp2pAdapter; +#[cfg(feature = "waku")] +use nomos_mempool::network::adapters::waku::WakuAdapter as MempoolWakuAdapter; +use nomos_mempool::{backend::mockpool::MockPool, MempoolService}; +#[cfg(feature = "libp2p")] +use nomos_network::backends::libp2p::Libp2p; +#[cfg(feature = "waku")] +use nomos_network::backends::waku::Waku; +use nomos_network::NetworkService; use overwatch_derive::*; use overwatch_rs::services::{handle::ServiceHandle, ServiceData}; use serde::{Deserialize, Serialize}; pub use tx::Tx; +#[cfg(all(feature = "waku", feature = "libp2p"))] +compile_error!("feature \"waku\" and feature \"libp2p\" cannot be enabled at the same time"); + #[derive(Deserialize, Debug, Clone, Serialize)] pub struct Config { pub log: ::Settings, + #[cfg(feature = "waku")] pub network: as ServiceData>::Settings, + #[cfg(feature = "libp2p")] + pub network: as ServiceData>::Settings, pub http: as ServiceData>::Settings, pub consensus: ::Settings, #[cfg(feature = "metrics")] pub metrics: > as ServiceData>::Settings, } +#[cfg(feature = "waku")] pub type Carnot = CarnotConsensus< ConsensusWakuAdapter, MockPool, @@ -41,11 +55,26 @@ pub type Carnot = CarnotConsensus< FlatOverlay, >; +#[cfg(feature = "libp2p")] +pub type Carnot = CarnotConsensus< + ConsensusLibp2pAdapter, + MockPool, + MempoolLibp2pAdapter, + MockFountain, + FlatOverlay, +>; + #[derive(Services)] pub struct Nomos { logging: ServiceHandle, + #[cfg(feature = "waku")] network: ServiceHandle>, + #[cfg(feature = "libp2p")] + network: ServiceHandle>, + #[cfg(feature = "waku")] mockpool: ServiceHandle, MockPool>>, + #[cfg(feature = "libp2p")] + mockpool: ServiceHandle, MockPool>>, consensus: ServiceHandle, http: ServiceHandle>, bridges: ServiceHandle, diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index f3a75552..52377929 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -21,10 +21,12 @@ fn main() -> Result<()> { let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)?; let bridges: Vec = vec![ Arc::new(Box::new(bridges::carnot_info_bridge)), - Arc::new(Box::new(bridges::mempool_add_tx_bridge)), Arc::new(Box::new(bridges::mempool_metrics_bridge)), + Arc::new(Box::new(bridges::network_info_bridge)), + #[cfg(feature = "waku")] + Arc::new(Box::new(bridges::mempool_add_tx_bridge)), + #[cfg(feature = "waku")] Arc::new(Box::new(bridges::waku_add_conn_bridge)), - Arc::new(Box::new(bridges::waku_info_bridge)), ]; let app = OverwatchRunner::::run( NomosServiceSettings { diff --git a/nomos-services/consensus/src/network/adapters/libp2p.rs b/nomos-services/consensus/src/network/adapters/libp2p.rs index 03af2670..e92f5a87 100644 --- a/nomos-services/consensus/src/network/adapters/libp2p.rs +++ b/nomos-services/consensus/src/network/adapters/libp2p.rs @@ -230,9 +230,9 @@ impl NetworkAdapter for Libp2pAdapter { let message_cache = MessageCache::new(); let cache = message_cache.clone(); let relay = network_relay.clone(); + Self::subscribe(&relay, TOPIC).await; // TODO: maybe we need the runtime handle here? tokio::spawn(async move { - Self::subscribe(&relay, TOPIC).await; let (sender, receiver) = tokio::sync::oneshot::channel(); if let Err((e, _)) = relay .send(NetworkMsg::Subscribe { diff --git a/tests/Cargo.toml b/tests/Cargo.toml index b3bf7f81..de28f80d 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -5,21 +5,22 @@ edition = "2021" publish = false [dependencies] -nomos-node = { path = "../nodes/nomos-node" } +nomos-node = { path = "../nodes/nomos-node", default-features = false } nomos-consensus = { path = "../nomos-services/consensus" } -nomos-network = { path = "../nomos-services/network", features = ["waku"] } +nomos-network = { path = "../nomos-services/network" } nomos-log = { path = "../nomos-services/log" } nomos-http = { path = "../nomos-services/http", features = ["http"] } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } nomos-core = { path = "../nomos-core" } consensus-engine = { path = "../consensus-engine", features = ["serde"] } -nomos-mempool = { path = "../nomos-services/mempool", features = ["waku", "mock"] } +nomos-mempool = { path = "../nomos-services/mempool", features = ["mock"] } rand = "0.8" once_cell = "1" rand_xoshiro = "0.6" secp256k1 = { version = "0.26", features = ["rand"] } -waku-bindings = "0.1.1" +waku-bindings = { version = "0.1.1", optional = true } reqwest = { version = "0.11", features = ["json"] } +nomos-libp2p = { path = "../nomos-libp2p", optional = true } tempfile = "3.6" serde_yaml = "0.9" tokio = "1" @@ -38,3 +39,5 @@ path = "src/tests/unhappy.rs" [features] metrics = ["nomos-node/metrics"] +waku = ["nomos-network/waku", "nomos-mempool/waku", "waku-bindings"] +libp2p = ["nomos-network/libp2p", "nomos-mempool/libp2p", "nomos-libp2p"] \ No newline at end of file diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index 58179231..8009f521 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -8,12 +8,16 @@ use consensus_engine::overlay::{FlatOverlaySettings, RoundRobin}; use consensus_engine::NodeId; use nomos_consensus::{CarnotInfo, CarnotSettings}; use nomos_http::backends::axum::AxumBackendSettings; +#[cfg(feature = "libp2p")] +use nomos_libp2p::{Multiaddr, SwarmConfig}; use nomos_log::{LoggerBackend, LoggerFormat}; -use nomos_network::{ - backends::waku::{WakuConfig, WakuInfo}, - NetworkConfig, -}; +#[cfg(feature = "libp2p")] +use nomos_network::backends::libp2p::Libp2pInfo; +#[cfg(feature = "waku")] +use nomos_network::backends::waku::{WakuConfig, WakuInfo}; +use nomos_network::NetworkConfig; use nomos_node::Config; +#[cfg(feature = "waku")] use waku_bindings::{Multiaddr, PeerId}; // crates use fraction::Fraction; @@ -75,7 +79,12 @@ impl NomosNode { child, _tempdir: dir, }; - node.wait_online().await; + tokio::time::timeout(std::time::Duration::from_secs(10), async { + node.wait_online().await + }) + .await + .unwrap(); + node } @@ -92,6 +101,7 @@ impl NomosNode { } } + #[cfg(feature = "waku")] pub async fn peer_id(&self) -> PeerId { self.get(NETWORK_INFO_API) .await @@ -103,6 +113,7 @@ impl NomosNode { .unwrap() } + #[cfg(feature = "waku")] pub async fn get_listening_address(&self) -> Multiaddr { self.get(NETWORK_INFO_API) .await @@ -115,6 +126,18 @@ impl NomosNode { .swap_remove(0) } + #[cfg(feature = "libp2p")] + pub async fn get_listening_address(&self) -> Multiaddr { + self.get(NETWORK_INFO_API) + .await + .unwrap() + .json::() + .await + .unwrap() + .listen_addresses + .swap_remove(0) + } + // not async so that we can use this in `Drop` pub fn get_logs_from_file(&self) -> String { println!( @@ -200,10 +223,16 @@ fn create_node_config( ) -> Config { let mut config = Config { network: NetworkConfig { + #[cfg(feature = "waku")] backend: WakuConfig { initial_peers: vec![], inner: Default::default(), }, + #[cfg(feature = "libp2p")] + backend: SwarmConfig { + initial_peers: vec![], + ..Default::default() + }, }, consensus: CarnotSettings { private_key, @@ -230,6 +259,14 @@ fn create_node_config( #[cfg(feature = "metrics")] metrics: Default::default(), }; - config.network.backend.inner.port = Some(get_available_port() as usize); + #[cfg(feature = "waku")] + { + config.network.backend.inner.port = Some(get_available_port() as usize); + } + #[cfg(feature = "libp2p")] + { + config.network.backend.port = get_available_port(); + } + config }