From d4491140443eb2b52171ede1c6ed663b0ceb5c70 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Tue, 19 Mar 2024 19:15:09 +0900 Subject: [PATCH] Replace mixnet backend with mixnet addon (#615) --- .github/workflows/build-test.yml | 12 +- mixnet/src/crypto.rs | 6 + mixnet/src/lib.rs | 2 + nodes/nomos-node/Cargo.toml | 24 +- nodes/nomos-node/src/api.rs | 6 +- nodes/nomos-node/src/config.rs | 7 +- nodes/nomos-node/src/lib.rs | 10 +- nodes/nomos-node/src/main.rs | 2 +- nomos-cli/Cargo.toml | 11 +- nomos-cli/src/cmds/chat/mod.rs | 3 - nomos-cli/src/cmds/disseminate/mod.rs | 3 - nomos-cli/src/da/disseminate.rs | 5 +- nomos-services/api/Cargo.toml | 3 +- nomos-services/api/src/http/cl.rs | 2 +- nomos-services/api/src/http/consensus.rs | 6 +- nomos-services/api/src/http/da.rs | 4 +- nomos-services/carnot-consensus/Cargo.toml | 1 - .../network/adapters/{p2p.rs => libp2p.rs} | 20 +- .../src/network/adapters/mod.rs | 4 +- nomos-services/data-availability/Cargo.toml | 1 - .../network/adapters/{p2p.rs => libp2p.rs} | 16 +- .../src/network/adapters/mod.rs | 4 +- nomos-services/mempool/Cargo.toml | 1 - .../network/adapters/{p2p.rs => libp2p.rs} | 14 +- .../mempool/src/network/adapters/mod.rs | 4 +- nomos-services/network/Cargo.toml | 2 +- .../network/src/backends/libp2p/config.rs | 5 + .../network/src/backends/libp2p/mixnet.rs | 201 ++++++++++++ .../network/src/backends/libp2p/mod.rs | 37 +++ .../network/src/backends/libp2p/swarm.rs | 6 +- .../network/src/backends/mixnet/mod.rs | 291 ------------------ nomos-services/network/src/backends/mod.rs | 5 +- tests/Cargo.toml | 10 +- tests/src/nodes/nomos.rs | 110 ++++++- tests/src/tests/cli.rs | 8 +- tests/src/tests/happy.rs | 2 +- 36 files changed, 432 insertions(+), 416 deletions(-) create mode 100644 mixnet/src/crypto.rs rename nomos-services/carnot-consensus/src/network/adapters/{p2p.rs => libp2p.rs} (96%) rename nomos-services/data-availability/src/network/adapters/{p2p.rs => libp2p.rs} (89%) rename nomos-services/mempool/src/network/adapters/{p2p.rs => libp2p.rs} (87%) create mode 100644 nomos-services/network/src/backends/libp2p/mixnet.rs delete mode 100644 nomos-services/network/src/backends/mixnet/mod.rs diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 362e75c6..c748d1f3 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -16,7 +16,7 @@ jobs: strategy: fail-fast: true matrix: - feature: [libp2p, mixnet] + feature: [libp2p, "libp2p,mixnet"] steps: - uses: actions/checkout@v2 with: @@ -37,7 +37,7 @@ jobs: strategy: fail-fast: false # all OSes should be tested even if one fails (default: true) matrix: - feature: [libp2p, mixnet] + feature: [libp2p, "libp2p,mixnet"] os: [ubuntu-latest, windows-latest, macos-latest] runs-on: ${{ matrix.os }} steps: @@ -70,15 +70,13 @@ jobs: command: build args: --all --no-default-features --features ${{ matrix.feature }} - name: Cargo test (Other OSes) - # TODO: enable tests for mixnet - if: matrix.os != 'windows-latest' && matrix.feature != 'mixnet' + if: matrix.os != 'windows-latest' uses: actions-rs/cargo@v1 with: command: test args: --all --no-default-features --features ${{ matrix.feature }} - name: Cargo test (Windows) - # TODO: enable tests for mixnet - if: matrix.os == 'windows-latest' && matrix.feature != 'mixnet' + if: matrix.os == 'windows-latest' uses: actions-rs/cargo@v1 env: # Because Windows runners in Github Actions tend to be slow. @@ -97,7 +95,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - feature: [libp2p, mixnet] + feature: [libp2p, "libp2p,mixnet"] steps: - uses: actions/checkout@v2 with: diff --git a/mixnet/src/crypto.rs b/mixnet/src/crypto.rs new file mode 100644 index 00000000..158f777a --- /dev/null +++ b/mixnet/src/crypto.rs @@ -0,0 +1,6 @@ +use sphinx_packet::crypto::{PrivateKey, PublicKey, PRIVATE_KEY_SIZE, PUBLIC_KEY_SIZE}; + +/// Converts a mixnode private key to a public key +pub fn public_key_from(private_key: [u8; PRIVATE_KEY_SIZE]) -> [u8; PUBLIC_KEY_SIZE] { + *PublicKey::from(&PrivateKey::from(private_key)).as_bytes() +} diff --git a/mixnet/src/lib.rs b/mixnet/src/lib.rs index c835d847..a80817d5 100644 --- a/mixnet/src/lib.rs +++ b/mixnet/src/lib.rs @@ -6,6 +6,8 @@ pub mod address; /// Mix client pub mod client; +/// Mixnet cryptography +pub mod crypto; /// Mixnet errors pub mod error; mod fragment; diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index d9dc9327..2c58db76 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -20,19 +20,24 @@ overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d tracing = "0.1" multiaddr = "0.18" nomos-core = { path = "../../nomos-core" } -nomos-network = { path = "../../nomos-services/network" } +nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] } nomos-api = { path = "../../nomos-services/api" } nomos-log = { path = "../../nomos-services/log" } nomos-mempool = { path = "../../nomos-services/mempool", features = [ "mock", + "libp2p", "metrics", ] } nomos-metrics = { path = "../../nomos-metrics" } nomos-http = { path = "../../nomos-services/http", features = ["http"] } -carnot-consensus = { path = "../../nomos-services/carnot-consensus" } +carnot-consensus = { path = "../../nomos-services/carnot-consensus", features = [ + "libp2p", +] } nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] } nomos-libp2p = { path = "../../nomos-libp2p" } -nomos-da = { path = "../../nomos-services/data-availability" } +nomos-da = { path = "../../nomos-services/data-availability", features = [ + "libp2p", +] } nomos-system-sig = { path = "../../nomos-services/system-sig" } metrics = { path = "../../nomos-services/metrics", optional = true } tracing-subscriber = "0.3" @@ -53,15 +58,4 @@ tower-http = { version = "0.4", features = ["cors", "trace"] } [features] default = [] -libp2p = [ - "nomos-network/libp2p", - "nomos-mempool/libp2p", - "nomos-da/libp2p", - "carnot-consensus/libp2p", -] -mixnet = [ - "nomos-network/mixnet", - "nomos-mempool/mixnet", - "nomos-da/mixnet", - "carnot-consensus/mixnet", -] +mixnet = ["nomos-network/mixnet"] diff --git a/nodes/nomos-node/src/api.rs b/nodes/nomos-node/src/api.rs index d19046dd..6680eacc 100644 --- a/nodes/nomos-node/src/api.rs +++ b/nodes/nomos-node/src/api.rs @@ -22,12 +22,10 @@ use utoipa_swagger_ui::SwaggerUi; use full_replication::{Blob, Certificate}; use nomos_core::{da::blob, header::HeaderId, tx::Transaction}; use nomos_mempool::{ - network::adapters::p2p::P2pAdapter as MempoolNetworkAdapter, openapi::Status, MempoolMetrics, + network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, openapi::Status, + MempoolMetrics, }; -#[cfg(feature = "libp2p")] use nomos_network::backends::libp2p::Libp2p as NetworkBackend; -#[cfg(feature = "mixnet")] -use nomos_network::backends::mixnet::MixnetNetworkBackend as NetworkBackend; use nomos_storage::backends::StorageSerde; use nomos_api::{ diff --git a/nodes/nomos-node/src/config.rs b/nodes/nomos-node/src/config.rs index 04db7eba..a84cd739 100644 --- a/nodes/nomos-node/src/config.rs +++ b/nodes/nomos-node/src/config.rs @@ -13,7 +13,7 @@ use hex::FromHex; use nomos_api::ApiService; use nomos_libp2p::{secp256k1::SecretKey, Multiaddr}; use nomos_log::{Logger, LoggerBackend, LoggerFormat}; -use nomos_network::backends::libp2p::Libp2p; +use nomos_network::backends::libp2p::Libp2p as NetworkBackend; use nomos_network::NetworkService; use overwatch_rs::services::ServiceData; use serde::{Deserialize, Serialize}; @@ -122,7 +122,7 @@ pub struct MetricsArgs { #[derive(Deserialize, Debug, Clone, Serialize)] pub struct Config { pub log: ::Settings, - pub network: as ServiceData>::Settings, + pub network: as ServiceData>::Settings, pub http: > as ServiceData>::Settings, pub consensus: ::Settings, pub da: ::Settings, @@ -171,6 +171,7 @@ impl Config { } Ok(self) } + pub fn update_network(mut self, network_args: NetworkArgs) -> Result { let NetworkArgs { host, @@ -199,6 +200,8 @@ impl Config { self.network.backend.initial_peers = peers; } + // TODO: configure mixclient and mixnode if the mixnet feature is enabled + Ok(self) } diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index d00594b1..829020e7 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -2,7 +2,7 @@ pub mod api; mod config; mod tx; -use carnot_consensus::network::adapters::p2p::P2pAdapter as ConsensusNetworkAdapter; +use carnot_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusNetworkAdapter; use carnot_engine::overlay::{RandomBeaconState, RoundRobin, TreeOverlay}; use color_eyre::eyre::Result; use full_replication::Certificate; @@ -21,18 +21,18 @@ use nomos_core::{ wire, }; use nomos_da::{ - backend::memory_cache::BlobCache, network::adapters::p2p::P2pAdapter as DaNetworkAdapter, + backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaNetworkAdapter, DataAvailabilityService, }; use nomos_log::Logger; -use nomos_mempool::network::adapters::p2p::P2pAdapter as MempoolNetworkAdapter; +use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter; use nomos_mempool::{ backend::mockpool::MockPool, Certificate as CertDiscriminant, MempoolService, Transaction as TxDiscriminant, }; #[cfg(feature = "metrics")] use nomos_metrics::Metrics; -use nomos_network::backends::libp2p::Libp2p; +use nomos_network::backends::libp2p::Libp2p as NetworkBackend; use nomos_storage::{ backends::{sled::SledBackend, StorageSerde}, StorageService, @@ -87,7 +87,7 @@ type Mempool = MempoolService, MockPool, - network: ServiceHandle>, + network: ServiceHandle>, cl_mempool: ServiceHandle::Hash, TxDiscriminant>>, da_mempool: ServiceHandle< Mempool< diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index e50a7829..25bc5787 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -13,7 +13,7 @@ use nomos_core::{ tx::Transaction, }; -use nomos_mempool::network::adapters::p2p::Settings as AdapterSettings; +use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings; use overwatch_rs::overwatch::*; diff --git a/nomos-cli/Cargo.toml b/nomos-cli/Cargo.toml index c7de8e3b..7a6775f1 100644 --- a/nomos-cli/Cargo.toml +++ b/nomos-cli/Cargo.toml @@ -18,8 +18,10 @@ futures = "0.3" tokio = { version = "1", features = ["sync"] } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } -nomos-network = { path = "../nomos-services/network" } -nomos-da = { path = "../nomos-services/data-availability" } +nomos-network = { path = "../nomos-services/network", features = ["libp2p"] } +nomos-da = { path = "../nomos-services/data-availability", features = [ + "libp2p", +] } carnot-consensus = { path = "../nomos-services/carnot-consensus" } nomos-log = { path = "../nomos-services/log" } nomos-libp2p = { path = "../nomos-libp2p" } @@ -37,8 +39,3 @@ ratatui = "0.24" tui-input = "0.8" ansi-to-tui = "3" rand = "0.8" - -[features] -default = [] -libp2p = ["nomos-network/libp2p", "nomos-da/libp2p"] -mixnet = ["nomos-network/mixnet", "nomos-da/mixnet"] diff --git a/nomos-cli/src/cmds/chat/mod.rs b/nomos-cli/src/cmds/chat/mod.rs index d02cb81e..794dc09c 100644 --- a/nomos-cli/src/cmds/chat/mod.rs +++ b/nomos-cli/src/cmds/chat/mod.rs @@ -20,10 +20,7 @@ use full_replication::{ use futures::{stream, StreamExt}; use nomos_core::{da::DaProtocol, header::HeaderId, wire}; use nomos_log::{LoggerBackend, LoggerSettings, SharedWriter}; -#[cfg(feature = "libp2p")] use nomos_network::backends::libp2p::Libp2p as NetworkBackend; -#[cfg(feature = "mixnet")] -use nomos_network::backends::mixnet::MixnetNetworkBackend as NetworkBackend; use nomos_network::NetworkService; use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData}; use reqwest::Url; diff --git a/nomos-cli/src/cmds/disseminate/mod.rs b/nomos-cli/src/cmds/disseminate/mod.rs index ad15d1ed..d5290f43 100644 --- a/nomos-cli/src/cmds/disseminate/mod.rs +++ b/nomos-cli/src/cmds/disseminate/mod.rs @@ -3,10 +3,7 @@ use crate::da::disseminate::{ }; use clap::Args; use nomos_log::{LoggerBackend, LoggerSettings}; -#[cfg(feature = "libp2p")] use nomos_network::backends::libp2p::Libp2p as NetworkBackend; -#[cfg(feature = "mixnet")] -use nomos_network::backends::mixnet::MixnetNetworkBackend as NetworkBackend; use nomos_network::NetworkService; use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData}; use reqwest::Url; diff --git a/nomos-cli/src/da/disseminate.rs b/nomos-cli/src/da/disseminate.rs index da53ca8c..ec4fc5bd 100644 --- a/nomos-cli/src/da/disseminate.rs +++ b/nomos-cli/src/da/disseminate.rs @@ -4,12 +4,9 @@ use full_replication::{AbsoluteNumber, Attestation, Certificate, FullReplication use futures::StreamExt; use hex::FromHex; use nomos_core::{da::DaProtocol, wire}; -use nomos_da::network::{adapters::p2p::P2pAdapter as DaNetworkAdapter, NetworkAdapter}; +use nomos_da::network::{adapters::libp2p::Libp2pAdapter as DaNetworkAdapter, NetworkAdapter}; use nomos_log::Logger; -#[cfg(feature = "libp2p")] use nomos_network::backends::libp2p::Libp2p as NetworkBackend; -#[cfg(feature = "mixnet")] -use nomos_network::backends::mixnet::MixnetNetworkBackend as NetworkBackend; use nomos_network::NetworkService; use overwatch_derive::*; use overwatch_rs::{ diff --git a/nomos-services/api/Cargo.toml b/nomos-services/api/Cargo.toml index b72e1e7e..cddac679 100644 --- a/nomos-services/api/Cargo.toml +++ b/nomos-services/api/Cargo.toml @@ -6,8 +6,6 @@ edition = "2021" [features] default = ["axum"] axum = ["dep:axum", "dep:hyper", "dep:tower-http", "utoipa-swagger-ui/axum"] -libp2p = ["nomos-mempool/libp2p"] -mixnet = ["nomos-mempool/mixnet"] [dependencies] async-trait = "0.1" @@ -22,6 +20,7 @@ nomos-network = { path = "../../nomos-services/network" } nomos-da = { path = "../../nomos-services/data-availability" } nomos-mempool = { path = "../../nomos-services/mempool", features = [ "mock", + "libp2p", "openapi", ] } nomos-metrics = { path = "../../nomos-metrics" } diff --git a/nomos-services/api/src/http/cl.rs b/nomos-services/api/src/http/cl.rs index 17927582..83223a76 100644 --- a/nomos-services/api/src/http/cl.rs +++ b/nomos-services/api/src/http/cl.rs @@ -4,7 +4,7 @@ use nomos_core::header::HeaderId; use nomos_core::tx::Transaction; use nomos_mempool::{ backend::mockpool::MockPool, - network::adapters::p2p::P2pAdapter as MempoolNetworkAdapter, + network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, openapi::{MempoolMetrics, Status}, MempoolMsg, MempoolService, Transaction as TxDiscriminant, }; diff --git a/nomos-services/api/src/http/consensus.rs b/nomos-services/api/src/http/consensus.rs index 6c824998..02001741 100644 --- a/nomos-services/api/src/http/consensus.rs +++ b/nomos-services/api/src/http/consensus.rs @@ -5,8 +5,8 @@ use serde::{de::DeserializeOwned, Serialize}; use tokio::sync::oneshot; use carnot_consensus::{ - network::adapters::p2p::P2pAdapter as ConsensusNetworkAdapter, CarnotConsensus, CarnotInfo, - ConsensusMsg, + network::adapters::libp2p::Libp2pAdapter as ConsensusNetworkAdapter, CarnotConsensus, + CarnotInfo, ConsensusMsg, }; use carnot_engine::{ overlay::{RandomBeaconState, RoundRobin, TreeOverlay}, @@ -22,7 +22,7 @@ use nomos_core::{ tx::{select::FillSize as FillSizeWithTx, Transaction}, }; use nomos_mempool::{ - backend::mockpool::MockPool, network::adapters::p2p::P2pAdapter as MempoolNetworkAdapter, + backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, }; use nomos_storage::backends::{sled::SledBackend, StorageSerde}; diff --git a/nomos-services/api/src/http/da.rs b/nomos-services/api/src/http/da.rs index b96a5475..d2f44f3a 100644 --- a/nomos-services/api/src/http/da.rs +++ b/nomos-services/api/src/http/da.rs @@ -2,12 +2,12 @@ use full_replication::{AbsoluteNumber, Attestation, Blob, Certificate, FullRepli use nomos_core::da::blob; use nomos_core::header::HeaderId; use nomos_da::{ - backend::memory_cache::BlobCache, network::adapters::p2p::P2pAdapter as DaNetworkAdapter, + backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaNetworkAdapter, DaMsg, DataAvailabilityService, }; use nomos_mempool::{ backend::mockpool::MockPool, - network::adapters::p2p::P2pAdapter as MempoolNetworkAdapter, + network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, openapi::{MempoolMetrics, Status}, Certificate as CertDiscriminant, MempoolMsg, MempoolService, }; diff --git a/nomos-services/carnot-consensus/Cargo.toml b/nomos-services/carnot-consensus/Cargo.toml index 0c20b8b0..e84e3fb3 100644 --- a/nomos-services/carnot-consensus/Cargo.toml +++ b/nomos-services/carnot-consensus/Cargo.toml @@ -36,7 +36,6 @@ serde_json = { version = "1", optional = true } default = [] mock = ["nomos-network/mock"] libp2p = ["nomos-network/libp2p", "nomos-libp2p"] -mixnet = ["nomos-network/mixnet", "nomos-libp2p"] openapi = ["dep:utoipa", "serde_json"] [dev-dependencies] diff --git a/nomos-services/carnot-consensus/src/network/adapters/p2p.rs b/nomos-services/carnot-consensus/src/network/adapters/libp2p.rs similarity index 96% rename from nomos-services/carnot-consensus/src/network/adapters/p2p.rs rename to nomos-services/carnot-consensus/src/network/adapters/libp2p.rs index 5527522b..fa836086 100644 --- a/nomos-services/carnot-consensus/src/network/adapters/p2p.rs +++ b/nomos-services/carnot-consensus/src/network/adapters/libp2p.rs @@ -15,12 +15,8 @@ use crate::network::{ }; use carnot_engine::{Committee, CommitteeId, View}; use nomos_core::{header::HeaderId, wire}; -#[cfg(feature = "libp2p")] -use nomos_network::backends::libp2p::Libp2p as Backend; -#[cfg(feature = "mixnet")] -use nomos_network::backends::mixnet::MixnetNetworkBackend as Backend; use nomos_network::{ - backends::libp2p::{Command, Event, EventKind}, + backends::libp2p::{Command, Event, EventKind, Libp2p}, NetworkMsg, NetworkService, }; use overwatch_rs::services::{relay::OutboundRelay, ServiceData}; @@ -107,8 +103,8 @@ struct Messages { /// Requesting the same stream type multiple times will re-initialize it and new items will only be forwarded to the latest one. /// It's required for the consumer to keep the stream around for the time it's necessary #[derive(Clone)] -pub struct P2pAdapter { - network_relay: OutboundRelay< as ServiceData>::Message>, +pub struct Libp2pAdapter { + network_relay: OutboundRelay< as ServiceData>::Message>, message_cache: MessageCache, } @@ -202,7 +198,7 @@ impl GossipsubMessage { } } -impl P2pAdapter { +impl Libp2pAdapter { async fn broadcast(&self, message: GossipsubMessage, topic: &str) { if let Err((e, message)) = self .network_relay @@ -216,7 +212,7 @@ impl P2pAdapter { }; } - async fn subscribe(relay: &Relay, topic: &str) { + async fn subscribe(relay: &Relay, topic: &str) { if let Err((e, _)) = relay .send(NetworkMsg::Process(Command::Subscribe(topic.into()))) .await @@ -227,10 +223,10 @@ impl P2pAdapter { } #[async_trait::async_trait] -impl NetworkAdapter for P2pAdapter { - type Backend = Backend; +impl NetworkAdapter for Libp2pAdapter { + type Backend = Libp2p; - async fn new(network_relay: Relay) -> Self { + async fn new(network_relay: Relay) -> Self { let message_cache = MessageCache::new(); let cache = message_cache.clone(); let relay = network_relay.clone(); diff --git a/nomos-services/carnot-consensus/src/network/adapters/mod.rs b/nomos-services/carnot-consensus/src/network/adapters/mod.rs index 4bcf9519..9cddaf9d 100644 --- a/nomos-services/carnot-consensus/src/network/adapters/mod.rs +++ b/nomos-services/carnot-consensus/src/network/adapters/mod.rs @@ -1,4 +1,4 @@ +#[cfg(feature = "libp2p")] +pub mod libp2p; #[cfg(feature = "mock")] pub mod mock; -#[cfg(any(feature = "libp2p", feature = "mixnet"))] -pub mod p2p; diff --git a/nomos-services/data-availability/Cargo.toml b/nomos-services/data-availability/Cargo.toml index 519d04b7..069ae646 100644 --- a/nomos-services/data-availability/Cargo.toml +++ b/nomos-services/data-availability/Cargo.toml @@ -19,4 +19,3 @@ tokio-stream = "0.1" [features] libp2p = ["nomos-network/libp2p"] -mixnet = ["nomos-network/mixnet"] diff --git a/nomos-services/data-availability/src/network/adapters/p2p.rs b/nomos-services/data-availability/src/network/adapters/libp2p.rs similarity index 89% rename from nomos-services/data-availability/src/network/adapters/p2p.rs rename to nomos-services/data-availability/src/network/adapters/libp2p.rs index 96f86ac8..b219833f 100644 --- a/nomos-services/data-availability/src/network/adapters/p2p.rs +++ b/nomos-services/data-availability/src/network/adapters/libp2p.rs @@ -7,11 +7,7 @@ use std::marker::PhantomData; // internal use crate::network::NetworkAdapter; use nomos_core::wire; -#[cfg(feature = "libp2p")] -use nomos_network::backends::libp2p::Libp2p as Backend; -use nomos_network::backends::libp2p::{Command, Event, EventKind, Message, TopicHash}; -#[cfg(feature = "mixnet")] -use nomos_network::backends::mixnet::MixnetNetworkBackend as Backend; +use nomos_network::backends::libp2p::{Command, Event, EventKind, Libp2p, Message, TopicHash}; use nomos_network::{NetworkMsg, NetworkService}; use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::ServiceData; @@ -23,13 +19,13 @@ use tracing::debug; pub const NOMOS_DA_TOPIC: &str = "NomosDa"; -pub struct P2pAdapter { - network_relay: OutboundRelay< as ServiceData>::Message>, +pub struct Libp2pAdapter { + network_relay: OutboundRelay< as ServiceData>::Message>, _blob: PhantomData, _attestation: PhantomData, } -impl P2pAdapter +impl Libp2pAdapter where B: Serialize + DeserializeOwned + Send + Sync + 'static, A: Serialize + DeserializeOwned + Send + Sync + 'static, @@ -74,12 +70,12 @@ where } #[async_trait::async_trait] -impl NetworkAdapter for P2pAdapter +impl NetworkAdapter for Libp2pAdapter where B: Serialize + DeserializeOwned + Send + Sync + 'static, A: Serialize + DeserializeOwned + Send + Sync + 'static, { - type Backend = Backend; + type Backend = Libp2p; type Blob = B; type Attestation = A; diff --git a/nomos-services/data-availability/src/network/adapters/mod.rs b/nomos-services/data-availability/src/network/adapters/mod.rs index 27e673cb..a22ade97 100644 --- a/nomos-services/data-availability/src/network/adapters/mod.rs +++ b/nomos-services/data-availability/src/network/adapters/mod.rs @@ -1,2 +1,2 @@ -#[cfg(any(feature = "libp2p", feature = "mixnet"))] -pub mod p2p; +#[cfg(feature = "libp2p")] +pub mod libp2p; diff --git a/nomos-services/mempool/Cargo.toml b/nomos-services/mempool/Cargo.toml index 0d8ac241..d1a4e895 100644 --- a/nomos-services/mempool/Cargo.toml +++ b/nomos-services/mempool/Cargo.toml @@ -35,7 +35,6 @@ blake2 = "0.10" default = [] mock = ["linked-hash-map", "nomos-network/mock", "rand", "nomos-core/mock"] libp2p = ["nomos-network/libp2p"] -mixnet = ["nomos-network/mixnet"] metrics = [] # enable to help generate OpenAPI diff --git a/nomos-services/mempool/src/network/adapters/p2p.rs b/nomos-services/mempool/src/network/adapters/libp2p.rs similarity index 87% rename from nomos-services/mempool/src/network/adapters/p2p.rs rename to nomos-services/mempool/src/network/adapters/libp2p.rs index 7c77fe19..e6dd6bca 100644 --- a/nomos-services/mempool/src/network/adapters/p2p.rs +++ b/nomos-services/mempool/src/network/adapters/libp2p.rs @@ -6,27 +6,23 @@ use tokio_stream::StreamExt; // internal use crate::network::NetworkAdapter; use nomos_core::wire; -#[cfg(feature = "libp2p")] -use nomos_network::backends::libp2p::Libp2p as Backend; -use nomos_network::backends::libp2p::{Command, Event, EventKind, Message, TopicHash}; -#[cfg(feature = "mixnet")] -use nomos_network::backends::mixnet::MixnetNetworkBackend as Backend; +use nomos_network::backends::libp2p::{Command, Event, EventKind, Libp2p, Message, TopicHash}; use nomos_network::{NetworkMsg, NetworkService}; use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::ServiceData; -pub struct P2pAdapter { - network_relay: OutboundRelay< as ServiceData>::Message>, +pub struct Libp2pAdapter { + network_relay: OutboundRelay< as ServiceData>::Message>, settings: Settings, } #[async_trait::async_trait] -impl NetworkAdapter for P2pAdapter +impl NetworkAdapter for Libp2pAdapter where Item: DeserializeOwned + Serialize + Send + Sync + 'static + Clone, Key: Clone + Send + Sync + 'static, { - type Backend = Backend; + type Backend = Libp2p; type Settings = Settings; type Item = Item; type Key = Key; diff --git a/nomos-services/mempool/src/network/adapters/mod.rs b/nomos-services/mempool/src/network/adapters/mod.rs index 5bcfdd34..9c581242 100644 --- a/nomos-services/mempool/src/network/adapters/mod.rs +++ b/nomos-services/mempool/src/network/adapters/mod.rs @@ -1,5 +1,5 @@ -#[cfg(any(feature = "libp2p", feature = "mixnet"))] -pub mod p2p; +#[cfg(feature = "libp2p")] +pub mod libp2p; #[cfg(feature = "mock")] pub mod mock; diff --git a/nomos-services/network/Cargo.toml b/nomos-services/network/Cargo.toml index 28dca191..2d630a60 100644 --- a/nomos-services/network/Cargo.toml +++ b/nomos-services/network/Cargo.toml @@ -33,6 +33,6 @@ tokio = { version = "1", features = ["full"] } [features] default = [] libp2p = ["nomos-libp2p", "rand", "humantime-serde"] -mixnet = ["dep:mixnet", "nomos-libp2p", "rand", "humantime-serde"] +mixnet = ["dep:mixnet"] mock = ["rand", "chrono"] openapi = ["dep:utoipa", "serde_json"] diff --git a/nomos-services/network/src/backends/libp2p/config.rs b/nomos-services/network/src/backends/libp2p/config.rs index a45d4c4b..bcee26e8 100644 --- a/nomos-services/network/src/backends/libp2p/config.rs +++ b/nomos-services/network/src/backends/libp2p/config.rs @@ -1,6 +1,9 @@ use nomos_libp2p::{Multiaddr, SwarmConfig}; use serde::{Deserialize, Serialize}; +#[cfg(feature = "mixnet")] +use crate::backends::libp2p::mixnet::MixnetConfig; + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Libp2pConfig { #[serde(flatten)] @@ -8,4 +11,6 @@ pub struct Libp2pConfig { // Initial peers to connect to #[serde(default)] pub initial_peers: Vec, + #[cfg(feature = "mixnet")] + pub mixnet: MixnetConfig, } diff --git a/nomos-services/network/src/backends/libp2p/mixnet.rs b/nomos-services/network/src/backends/libp2p/mixnet.rs new file mode 100644 index 00000000..fc32a2ca --- /dev/null +++ b/nomos-services/network/src/backends/libp2p/mixnet.rs @@ -0,0 +1,201 @@ +use std::net::SocketAddr; + +use futures::StreamExt; +use mixnet::{ + address::NodeAddress, + client::{MessageQueue, MixClient, MixClientConfig}, + node::{MixNode, MixNodeConfig, Output, PacketQueue}, + packet::PacketBody, +}; +use nomos_core::wire; +use nomos_libp2p::{ + libp2p::{Stream, StreamProtocol}, + libp2p_stream::IncomingStreams, + Multiaddr, Protocol, +}; +use serde::{Deserialize, Serialize}; +use tokio::{ + runtime::Handle, + sync::{mpsc, oneshot}, +}; + +use crate::backends::libp2p::{Command, Dial, Topic}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct MixnetConfig { + pub mixclient: MixClientConfig, + pub mixnode: MixNodeConfig, +} + +pub(crate) const STREAM_PROTOCOL: StreamProtocol = StreamProtocol::new("/mixnet"); + +pub(crate) fn init_mixnet( + config: MixnetConfig, + runtime_handle: Handle, + cmd_tx: mpsc::Sender, + incoming_streams: IncomingStreams, +) -> MessageQueue { + // Run mixnode + let (mixnode, packet_queue) = MixNode::new(config.mixnode).unwrap(); + let libp2p_cmd_tx = cmd_tx.clone(); + let queue = packet_queue.clone(); + runtime_handle.spawn(async move { + run_mixnode(mixnode, queue, libp2p_cmd_tx).await; + }); + let handle = runtime_handle.clone(); + let queue = packet_queue.clone(); + runtime_handle.spawn(async move { + handle_incoming_streams(incoming_streams, queue, handle).await; + }); + + // Run mixclient + let (mixclient, message_queue) = MixClient::new(config.mixclient).unwrap(); + runtime_handle.spawn(async move { + run_mixclient(mixclient, packet_queue, cmd_tx).await; + }); + + message_queue +} + +async fn run_mixnode( + mut mixnode: MixNode, + packet_queue: PacketQueue, + cmd_tx: mpsc::Sender, +) { + while let Some(output) = mixnode.next().await { + match output { + Output::Forward(packet) => { + stream_send(packet.address(), packet.body(), &cmd_tx, &packet_queue).await; + } + Output::ReconstructedMessage(message) => match MixnetMessage::from_bytes(&message) { + Ok(msg) => { + cmd_tx + .send(Command::Broadcast { + topic: msg.topic, + message: msg.message, + }) + .await + .unwrap(); + } + Err(e) => { + tracing::error!("failed to parse message received from mixnet: {e}"); + } + }, + } + } +} + +async fn run_mixclient( + mut mixclient: MixClient, + packet_queue: PacketQueue, + cmd_tx: mpsc::Sender, +) { + while let Some(packet) = mixclient.next().await { + stream_send(packet.address(), packet.body(), &cmd_tx, &packet_queue).await; + } +} + +async fn handle_incoming_streams( + mut incoming_streams: IncomingStreams, + packet_queue: PacketQueue, + runtime_handle: Handle, +) { + while let Some((_, stream)) = incoming_streams.next().await { + let queue = packet_queue.clone(); + runtime_handle.spawn(async move { + if let Err(e) = handle_stream(stream, queue).await { + tracing::warn!("stream closed: {e}"); + } + }); + } +} + +async fn handle_stream(mut stream: Stream, packet_queue: PacketQueue) -> std::io::Result<()> { + loop { + match PacketBody::read_from(&mut stream).await? { + Ok(packet_body) => { + packet_queue + .send(packet_body) + .await + .expect("The receiving half of packet queue should be always open"); + } + Err(e) => { + tracing::error!( + "failed to parse packet body. continuing reading the next packet: {e}" + ); + } + } + } +} + +async fn stream_send( + addr: NodeAddress, + packet_body: PacketBody, + cmd_tx: &mpsc::Sender, + packet_queue: &PacketQueue, +) { + let (tx, rx) = oneshot::channel(); + cmd_tx + .send(Command::Connect(Dial { + addr: multiaddr_from(addr), + retry_count: 3, + result_sender: tx, + })) + .await + .expect("Command receiver should be always open"); + + match rx.await { + Ok(Ok(peer_id)) => { + cmd_tx + .send(Command::StreamSend { + peer_id, + protocol: STREAM_PROTOCOL, + data: packet_body.bytes(), + }) + .await + .expect("Command receiver should be always open"); + } + Ok(Err(e)) => match e { + nomos_libp2p::DialError::NoAddresses => { + tracing::debug!("Dialing failed because the peer is the local node. Sending msg directly to the queue"); + packet_queue + .send(packet_body) + .await + .expect("The receiving half of packet queue should be always open"); + } + _ => tracing::error!("failed to dial with unrecoverable error: {e}"), + }, + Err(e) => { + tracing::error!("channel closed before receiving: {e}"); + } + } +} + +fn multiaddr_from(addr: NodeAddress) -> Multiaddr { + match SocketAddr::from(addr) { + SocketAddr::V4(addr) => Multiaddr::empty() + .with(Protocol::Ip4(*addr.ip())) + .with(Protocol::Udp(addr.port())) + .with(Protocol::QuicV1), + SocketAddr::V6(addr) => Multiaddr::empty() + .with(Protocol::Ip6(*addr.ip())) + .with(Protocol::Udp(addr.port())) + .with(Protocol::QuicV1), + } +} + +#[derive(Serialize, Deserialize, Clone, Debug)] +pub(crate) struct MixnetMessage { + pub topic: Topic, + pub message: Box<[u8]>, +} + +impl MixnetMessage { + pub fn as_bytes(&self) -> Vec { + wire::serialize(self).expect("Couldn't serialize MixnetMessage") + } + + pub fn from_bytes(data: &[u8]) -> Result { + wire::deserialize(data) + } +} diff --git a/nomos-services/network/src/backends/libp2p/mod.rs b/nomos-services/network/src/backends/libp2p/mod.rs index 57de8b34..39aa88c6 100644 --- a/nomos-services/network/src/backends/libp2p/mod.rs +++ b/nomos-services/network/src/backends/libp2p/mod.rs @@ -1,5 +1,7 @@ mod command; mod config; +#[cfg(feature = "mixnet")] +pub mod mixnet; pub(crate) mod swarm; // std @@ -9,6 +11,10 @@ use self::swarm::SwarmHandler; // internal use super::NetworkBackend; +#[cfg(feature = "mixnet")] +use crate::backends::libp2p::mixnet::{init_mixnet, MixnetMessage, STREAM_PROTOCOL}; +#[cfg(feature = "mixnet")] +use ::mixnet::client::MessageQueue; pub use nomos_libp2p::libp2p::gossipsub::{Message, TopicHash}; // crates use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState}; @@ -17,6 +23,8 @@ use tokio::sync::{broadcast, mpsc}; pub struct Libp2p { events_tx: broadcast::Sender, commands_tx: mpsc::Sender, + #[cfg(feature = "mixnet")] + mixclient_message_queue: MessageQueue, } #[derive(Debug)] @@ -46,6 +54,15 @@ impl NetworkBackend for Libp2p { let mut swarm_handler = SwarmHandler::new(&config, commands_tx.clone(), commands_rx, events_tx.clone()); + + #[cfg(feature = "mixnet")] + let mixclient_message_queue = init_mixnet( + config.mixnet, + overwatch_handle.runtime().clone(), + commands_tx.clone(), + swarm_handler.incoming_streams(STREAM_PROTOCOL), + ); + overwatch_handle.runtime().spawn(async move { swarm_handler.run(config.initial_peers).await; }); @@ -53,15 +70,35 @@ impl NetworkBackend for Libp2p { Self { events_tx, commands_tx, + #[cfg(feature = "mixnet")] + mixclient_message_queue, } } + #[cfg(not(feature = "mixnet"))] async fn process(&self, msg: Self::Message) { if let Err(e) = self.commands_tx.send(msg).await { tracing::error!("failed to send command to nomos-libp2p: {e:?}"); } } + #[cfg(feature = "mixnet")] + async fn process(&self, msg: Self::Message) { + match msg { + Command::Broadcast { topic, message } => { + let msg = MixnetMessage { topic, message }; + if let Err(e) = self.mixclient_message_queue.send(msg.as_bytes()).await { + tracing::error!("failed to send messasge to mixclient: {e}"); + } + } + cmd => { + if let Err(e) = self.commands_tx.send(cmd).await { + tracing::error!("failed to send command to libp2p swarm: {e:?}"); + } + } + } + } + async fn subscribe( &mut self, kind: Self::EventKind, diff --git a/nomos-services/network/src/backends/libp2p/swarm.rs b/nomos-services/network/src/backends/libp2p/swarm.rs index 8d8dee45..72580a5b 100644 --- a/nomos-services/network/src/backends/libp2p/swarm.rs +++ b/nomos-services/network/src/backends/libp2p/swarm.rs @@ -4,10 +4,12 @@ use std::{ }; use futures::AsyncWriteExt; +#[cfg(feature = "mixnet")] +use nomos_libp2p::libp2p_stream::IncomingStreams; use nomos_libp2p::{ gossipsub, libp2p::{swarm::ConnectionId, Stream, StreamProtocol}, - libp2p_stream::{Control, IncomingStreams, OpenStreamError}, + libp2p_stream::{Control, OpenStreamError}, BehaviourEvent, Multiaddr, PeerId, Swarm, SwarmEvent, }; use tokio::sync::{broadcast, mpsc, oneshot}; @@ -290,7 +292,7 @@ impl SwarmHandler { std::time::Duration::from_secs(BACKOFF.pow(retry as u32)) } - #[allow(dead_code)] + #[cfg(feature = "mixnet")] pub fn incoming_streams(&mut self, protocol: StreamProtocol) -> IncomingStreams { self.stream_control.accept(protocol).unwrap() } diff --git a/nomos-services/network/src/backends/mixnet/mod.rs b/nomos-services/network/src/backends/mixnet/mod.rs deleted file mode 100644 index b7af3c87..00000000 --- a/nomos-services/network/src/backends/mixnet/mod.rs +++ /dev/null @@ -1,291 +0,0 @@ -use std::net::SocketAddr; - -// internal -use super::{ - libp2p::{self, swarm::SwarmHandler, Libp2pConfig, Topic}, - NetworkBackend, -}; -use futures::StreamExt; -use mixnet::{ - address::NodeAddress, - client::{MessageQueue, MixClient, MixClientConfig}, - node::{MixNode, MixNodeConfig, Output, PacketQueue}, - packet::PacketBody, -}; -use nomos_core::wire; -use nomos_libp2p::{ - libp2p::{Stream, StreamProtocol}, - libp2p_stream::IncomingStreams, - Multiaddr, Protocol, -}; -// crates -use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState}; -use serde::{Deserialize, Serialize}; -use tokio::{ - runtime::Handle, - sync::{broadcast, mpsc, oneshot}, -}; - -/// A Mixnet network backend broadcasts messages to the network with mixing packets through mixnet, -/// and receives messages broadcasted from the network. -pub struct MixnetNetworkBackend { - libp2p_events_tx: broadcast::Sender, - libp2p_commands_tx: mpsc::Sender, - - mixclient_message_queue: MessageQueue, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct MixnetConfig { - libp2p_config: Libp2pConfig, - mixclient_config: MixClientConfig, - mixnode_config: MixNodeConfig, -} - -const BUFFER_SIZE: usize = 64; -const STREAM_PROTOCOL: StreamProtocol = StreamProtocol::new("/mixnet"); - -#[async_trait::async_trait] -impl NetworkBackend for MixnetNetworkBackend { - type Settings = MixnetConfig; - type State = NoState; - type Message = libp2p::Command; - type EventKind = libp2p::EventKind; - type NetworkEvent = libp2p::Event; - - fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self { - // TODO: One important task that should be spawned is - // subscribing NewEntropy events that will be emitted from the consensus service soon. - // so that new topology can be built internally. - // In the mixnet spec, the robustness layer is responsible for this task. - // We can implement the robustness layer in the mixnet-specific crate, - // that we're going to define at the root of the project. - - let (libp2p_commands_tx, libp2p_commands_rx) = tokio::sync::mpsc::channel(BUFFER_SIZE); - let (libp2p_events_tx, _) = tokio::sync::broadcast::channel(BUFFER_SIZE); - - let mut swarm_handler = SwarmHandler::new( - &config.libp2p_config, - libp2p_commands_tx.clone(), - libp2p_commands_rx, - libp2p_events_tx.clone(), - ); - - // Run mixnode - let (mixnode, packet_queue) = MixNode::new(config.mixnode_config).unwrap(); - let libp2p_cmd_tx = libp2p_commands_tx.clone(); - let queue = packet_queue.clone(); - overwatch_handle.runtime().spawn(async move { - Self::run_mixnode(mixnode, queue, libp2p_cmd_tx).await; - }); - let incoming_streams = swarm_handler.incoming_streams(STREAM_PROTOCOL); - let runtime_handle = overwatch_handle.runtime().clone(); - let queue = packet_queue.clone(); - overwatch_handle.runtime().spawn(async move { - Self::handle_incoming_streams(incoming_streams, queue, runtime_handle).await; - }); - - // Run mixclient - let (mixclient, message_queue) = MixClient::new(config.mixclient_config).unwrap(); - let libp2p_cmd_tx = libp2p_commands_tx.clone(); - overwatch_handle.runtime().spawn(async move { - Self::run_mixclient(mixclient, packet_queue, libp2p_cmd_tx).await; - }); - - // Run libp2p swarm to make progress - overwatch_handle.runtime().spawn(async move { - swarm_handler.run(config.libp2p_config.initial_peers).await; - }); - - Self { - libp2p_events_tx, - libp2p_commands_tx, - - mixclient_message_queue: message_queue, - } - } - - async fn process(&self, msg: Self::Message) { - match msg { - libp2p::Command::Broadcast { topic, message } => { - let msg = MixnetMessage { topic, message }; - if let Err(e) = self.mixclient_message_queue.send(msg.as_bytes()).await { - tracing::error!("failed to send messasge to mixclient: {e}"); - } - } - cmd => { - if let Err(e) = self.libp2p_commands_tx.send(cmd).await { - tracing::error!("failed to send command to libp2p swarm: {e:?}"); - } - } - } - } - - async fn subscribe( - &mut self, - kind: Self::EventKind, - ) -> broadcast::Receiver { - match kind { - libp2p::EventKind::Message => self.libp2p_events_tx.subscribe(), - } - } -} - -impl MixnetNetworkBackend { - async fn run_mixnode( - mut mixnode: MixNode, - packet_queue: PacketQueue, - swarm_commands_tx: mpsc::Sender, - ) { - while let Some(output) = mixnode.next().await { - match output { - Output::Forward(packet) => { - Self::stream_send( - packet.address(), - packet.body(), - &swarm_commands_tx, - &packet_queue, - ) - .await; - } - Output::ReconstructedMessage(message) => { - match MixnetMessage::from_bytes(&message) { - Ok(msg) => { - swarm_commands_tx - .send(libp2p::Command::Broadcast { - topic: msg.topic, - message: msg.message, - }) - .await - .unwrap(); - } - Err(e) => { - tracing::error!("failed to parse message received from mixnet: {e}"); - } - } - } - } - } - } - - async fn run_mixclient( - mut mixclient: MixClient, - packet_queue: PacketQueue, - swarm_commands_tx: mpsc::Sender, - ) { - while let Some(packet) = mixclient.next().await { - Self::stream_send( - packet.address(), - packet.body(), - &swarm_commands_tx, - &packet_queue, - ) - .await; - } - } - - async fn handle_incoming_streams( - mut incoming_streams: IncomingStreams, - packet_queue: PacketQueue, - runtime_handle: Handle, - ) { - while let Some((_, stream)) = incoming_streams.next().await { - let queue = packet_queue.clone(); - runtime_handle.spawn(async move { - if let Err(e) = Self::handle_stream(stream, queue).await { - tracing::warn!("stream closed: {e}"); - } - }); - } - } - - async fn handle_stream(mut stream: Stream, packet_queue: PacketQueue) -> std::io::Result<()> { - loop { - match PacketBody::read_from(&mut stream).await? { - Ok(packet_body) => { - packet_queue - .send(packet_body) - .await - .expect("The receiving half of packet queue should be always open"); - } - Err(e) => { - tracing::error!( - "failed to parse packet body. continuing reading the next packet: {e}" - ); - } - } - } - } - - async fn stream_send( - addr: NodeAddress, - packet_body: PacketBody, - swarm_commands_tx: &mpsc::Sender, - packet_queue: &PacketQueue, - ) { - let (tx, rx) = oneshot::channel(); - swarm_commands_tx - .send(libp2p::Command::Connect(libp2p::Dial { - addr: Self::multiaddr_from(addr), - retry_count: 3, - result_sender: tx, - })) - .await - .expect("Command receiver should be always open"); - - match rx.await { - Ok(Ok(peer_id)) => { - swarm_commands_tx - .send(libp2p::Command::StreamSend { - peer_id, - protocol: STREAM_PROTOCOL, - data: packet_body.bytes(), - }) - .await - .expect("Command receiver should be always open"); - } - Ok(Err(e)) => match e { - nomos_libp2p::DialError::NoAddresses => { - tracing::debug!("Dialing failed because the peer is the local node. Sending msg directly to the queue"); - packet_queue - .send(packet_body) - .await - .expect("The receiving half of packet queue should be always open"); - } - _ => tracing::error!("failed to dial with unrecoverable error: {e}"), - }, - Err(e) => { - tracing::error!("channel closed before receiving: {e}"); - } - } - } - - fn multiaddr_from(addr: NodeAddress) -> Multiaddr { - match SocketAddr::from(addr) { - SocketAddr::V4(addr) => Multiaddr::empty() - .with(Protocol::Ip4(*addr.ip())) - .with(Protocol::Udp(addr.port())) - .with(Protocol::QuicV1), - SocketAddr::V6(addr) => Multiaddr::empty() - .with(Protocol::Ip6(*addr.ip())) - .with(Protocol::Udp(addr.port())) - .with(Protocol::QuicV1), - } - } -} - -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct MixnetMessage { - pub topic: Topic, - pub message: Box<[u8]>, -} - -impl MixnetMessage { - pub fn as_bytes(&self) -> Vec { - wire::serialize(self).expect("Couldn't serialize MixnetMessage") - } - - pub fn from_bytes(data: &[u8]) -> Result { - wire::deserialize(data) - } -} diff --git a/nomos-services/network/src/backends/mod.rs b/nomos-services/network/src/backends/mod.rs index eb7a49a2..e3b451c2 100644 --- a/nomos-services/network/src/backends/mod.rs +++ b/nomos-services/network/src/backends/mod.rs @@ -2,12 +2,9 @@ use super::*; use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::ServiceState}; use tokio::sync::broadcast::Receiver; -#[cfg(any(feature = "libp2p", feature = "mixnet"))] +#[cfg(feature = "libp2p")] pub mod libp2p; -#[cfg(feature = "mixnet")] -pub mod mixnet; - #[cfg(feature = "mock")] pub mod mock; diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 57fa060c..27440b49 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -7,14 +7,17 @@ publish = false [dependencies] nomos-node = { path = "../nodes/nomos-node", default-features = false } carnot-consensus = { path = "../nomos-services/carnot-consensus" } -nomos-network = { path = "../nomos-services/network" } +nomos-network = { path = "../nomos-services/network", features = ["libp2p"] } mixnet = { path = "../mixnet" } nomos-log = { path = "../nomos-services/log" } nomos-api = { path = "../nomos-services/api" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } nomos-core = { path = "../nomos-core" } carnot-engine = { path = "../consensus/carnot-engine", features = ["serde"] } -nomos-mempool = { path = "../nomos-services/mempool", features = ["mock"] } +nomos-mempool = { path = "../nomos-services/mempool", features = [ + "mock", + "libp2p", +] } nomos-da = { path = "../nomos-services/data-availability" } full-replication = { path = "../nomos-da/full-replication" } rand = "0.8" @@ -48,6 +51,5 @@ path = "src/tests/cli.rs" [features] -libp2p = ["nomos-network/libp2p", "nomos-mempool/libp2p"] -mixnet = ["nomos-network/mixnet", "nomos-mempool/mixnet"] +mixnet = ["nomos-network/mixnet"] metrics = ["nomos-node/metrics"] diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index 2bd3ccbe..8ad807bb 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -2,6 +2,11 @@ use std::net::SocketAddr; use std::process::{Child, Command, Stdio}; use std::time::Duration; +#[cfg(feature = "mixnet")] +use std::{ + net::{IpAddr, Ipv4Addr}, + num::NonZeroU8, +}; // internal use super::{create_tempdir, persist_tempdir, LOGS_PREFIX}; use crate::{adjust_timeout, get_available_port, ConsensusConfig, Node, SpawnConfig}; @@ -9,12 +14,20 @@ use carnot_consensus::{CarnotInfo, CarnotSettings}; use carnot_engine::overlay::{RandomBeaconState, RoundRobin, TreeOverlay, TreeOverlaySettings}; use carnot_engine::{NodeId, Overlay}; use full_replication::Certificate; +#[cfg(feature = "mixnet")] +use mixnet::{ + address::NodeAddress, + client::MixClientConfig, + node::MixNodeConfig, + topology::{MixNodeInfo, MixnetTopology}, +}; use nomos_core::{block::Block, header::HeaderId}; use nomos_libp2p::{Multiaddr, Swarm}; use nomos_log::{LoggerBackend, LoggerFormat}; use nomos_mempool::MempoolMetrics; -use nomos_network::backends::libp2p::Libp2pConfig; -use nomos_network::NetworkConfig; +#[cfg(feature = "mixnet")] +use nomos_network::backends::libp2p::mixnet::MixnetConfig; +use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig}; use nomos_node::{api::AxumBackendSettings, Config, Tx}; // crates use fraction::Fraction; @@ -220,7 +233,6 @@ impl Node for NomosNode { .backend .initial_peers .push(first_node_addr.clone()); - node_configs.push(conf); } node_configs @@ -231,9 +243,11 @@ impl Node for NomosNode { let mut prev_node_addr = node_address(&next_leader_config); let mut node_configs = vec![next_leader_config]; for mut conf in configs { - conf.network.backend.initial_peers.push(prev_node_addr); + conf.network + .backend + .initial_peers + .push(prev_node_addr.clone()); prev_node_addr = node_address(&conf); - node_configs.push(conf); } node_configs @@ -251,6 +265,9 @@ impl Node for NomosNode { } } +#[cfg(feature = "mixnet")] +const NUM_MIXNODE_CANDIDATES: usize = 2; + /// Returns the config of the next leader and all other nodes. /// /// Depending on the network topology, the next leader must be spawned first, @@ -263,14 +280,23 @@ fn create_node_configs(consensus: ConsensusConfig) -> (Config, Vec) { thread_rng().fill(id); } + #[cfg(feature = "mixnet")] + let (mixclient_config, mixnode_configs) = create_mixnet_config(&ids); + let mut configs = ids .iter() - .map(|id| { + .enumerate() + .map(|(_i, id)| { create_node_config( ids.iter().copied().map(NodeId::new).collect(), *id, consensus.threshold, consensus.timeout, + #[cfg(feature = "mixnet")] + MixnetConfig { + mixclient: mixclient_config.clone(), + mixnode: mixnode_configs[_i].clone(), + }, ) }) .collect::>(); @@ -282,9 +308,31 @@ fn create_node_configs(consensus: ConsensusConfig) -> (Config, Vec) { .position(|&id| NodeId::from(id) == next_leader) .unwrap(); - let next_leader_config = configs.swap_remove(next_leader_idx); + #[cfg(not(feature = "mixnet"))] + { + let next_leader_config = configs.swap_remove(next_leader_idx); + (next_leader_config, configs) + } + #[cfg(feature = "mixnet")] + { + let mut next_leader_config = configs.swap_remove(next_leader_idx); - (next_leader_config, configs) + // Build a topology using only a subset of nodes. + let mut mixnode_candidates = vec![&next_leader_config]; + configs + .iter() + .take(NUM_MIXNODE_CANDIDATES - 1) + .for_each(|config| mixnode_candidates.push(config)); + let topology = build_mixnet_topology(&mixnode_candidates); + + // Set the topology to all configs + next_leader_config.network.backend.mixnet.mixclient.topology = topology.clone(); + configs.iter_mut().for_each(|config| { + config.network.backend.mixnet.mixclient.topology = topology.clone(); + }); + + (next_leader_config, configs) + } } fn create_node_config( @@ -292,12 +340,15 @@ fn create_node_config( id: [u8; 32], threshold: Fraction, timeout: Duration, + #[cfg(feature = "mixnet")] mixnet_config: MixnetConfig, ) -> Config { let mut config = Config { network: NetworkConfig { backend: Libp2pConfig { inner: Default::default(), initial_peers: vec![], + #[cfg(feature = "mixnet")] + mixnet: mixnet_config, }, }, consensus: CarnotSettings { @@ -343,6 +394,49 @@ fn create_node_config( config } +#[cfg(feature = "mixnet")] +fn create_mixnet_config(ids: &[[u8; 32]]) -> (MixClientConfig, Vec) { + let mixnode_configs: Vec = ids + .iter() + .map(|id| MixNodeConfig { + encryption_private_key: *id, + delay_rate_per_min: 100000000.0, + }) + .collect(); + // Build an empty topology because it will be constructed with meaningful node infos later + let topology = MixnetTopology::new(Vec::new(), 0, 0, [1u8; 32]).unwrap(); + + ( + MixClientConfig { + topology, + emission_rate_per_min: 120.0, + redundancy: NonZeroU8::new(1).unwrap(), + }, + mixnode_configs, + ) +} + +#[cfg(feature = "mixnet")] +fn build_mixnet_topology(mixnode_candidates: &[&Config]) -> MixnetTopology { + use mixnet::crypto::public_key_from; + + let candidates = mixnode_candidates + .iter() + .map(|config| { + MixNodeInfo::new( + NodeAddress::from(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + config.network.backend.inner.port, + )), + public_key_from(config.network.backend.mixnet.mixnode.encryption_private_key), + ) + .unwrap() + }) + .collect::>(); + let num_layers = candidates.len(); + MixnetTopology::new(candidates, num_layers, 1, [1u8; 32]).unwrap() +} + fn node_address(config: &Config) -> Multiaddr { Swarm::multiaddr( std::net::Ipv4Addr::new(127, 0, 0, 1), diff --git a/tests/src/tests/cli.rs b/tests/src/tests/cli.rs index 294a5632..81e9c632 100644 --- a/tests/src/tests/cli.rs +++ b/tests/src/tests/cli.rs @@ -7,7 +7,7 @@ use nomos_cli::{ use nomos_core::da::{blob::Blob as _, DaProtocol}; use std::{io::Write, time::Duration}; use tempfile::NamedTempFile; -use tests::{adjust_timeout, get_available_port, nodes::nomos::Pool, Node, NomosNode, SpawnConfig}; +use tests::{adjust_timeout, nodes::nomos::Pool, Node, NomosNode, SpawnConfig}; const CLI_BIN: &str = "../target/debug/nomos-cli"; @@ -36,13 +36,9 @@ async fn disseminate(config: &mut Disseminate) { let node_configs = NomosNode::node_configs(SpawnConfig::chain_happy(2)); let first_node = NomosNode::spawn(node_configs[0].clone()).await; - let mut network_config = node_configs[1].network.clone(); - // use a new port because the old port is sometimes not closed immediately - network_config.backend.inner.port = get_available_port(); - let mut file = NamedTempFile::new().unwrap(); let config_path = file.path().to_owned(); - serde_yaml::to_writer(&mut file, &network_config).unwrap(); + serde_yaml::to_writer(&mut file, &node_configs[1].network).unwrap(); let da_protocol = DaProtocolChoice { da_protocol: Protocol::FullReplication, settings: ProtocolSettings { diff --git a/tests/src/tests/happy.rs b/tests/src/tests/happy.rs index fa3e50c2..76f4a01a 100644 --- a/tests/src/tests/happy.rs +++ b/tests/src/tests/happy.rs @@ -14,7 +14,7 @@ struct Info { } async fn happy_test(nodes: &[NomosNode]) { - let timeout = adjust_timeout(Duration::from_secs(30)); + let timeout = adjust_timeout(Duration::from_secs(60)); let timeout = tokio::time::sleep(timeout); tokio::select! { _ = timeout => panic!("timed out waiting for nodes to reach view {}", TARGET_VIEW),