Replace mixnet backend with mixnet addon (#615)
This commit is contained in:
parent
ed81577ab5
commit
d449114044
|
@ -16,7 +16,7 @@ jobs:
|
|||
strategy:
|
||||
fail-fast: true
|
||||
matrix:
|
||||
feature: [libp2p, mixnet]
|
||||
feature: [libp2p, "libp2p,mixnet"]
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
with:
|
||||
|
@ -37,7 +37,7 @@ jobs:
|
|||
strategy:
|
||||
fail-fast: false # all OSes should be tested even if one fails (default: true)
|
||||
matrix:
|
||||
feature: [libp2p, mixnet]
|
||||
feature: [libp2p, "libp2p,mixnet"]
|
||||
os: [ubuntu-latest, windows-latest, macos-latest]
|
||||
runs-on: ${{ matrix.os }}
|
||||
steps:
|
||||
|
@ -70,15 +70,13 @@ jobs:
|
|||
command: build
|
||||
args: --all --no-default-features --features ${{ matrix.feature }}
|
||||
- name: Cargo test (Other OSes)
|
||||
# TODO: enable tests for mixnet
|
||||
if: matrix.os != 'windows-latest' && matrix.feature != 'mixnet'
|
||||
if: matrix.os != 'windows-latest'
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: test
|
||||
args: --all --no-default-features --features ${{ matrix.feature }}
|
||||
- name: Cargo test (Windows)
|
||||
# TODO: enable tests for mixnet
|
||||
if: matrix.os == 'windows-latest' && matrix.feature != 'mixnet'
|
||||
if: matrix.os == 'windows-latest'
|
||||
uses: actions-rs/cargo@v1
|
||||
env:
|
||||
# Because Windows runners in Github Actions tend to be slow.
|
||||
|
@ -97,7 +95,7 @@ jobs:
|
|||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
feature: [libp2p, mixnet]
|
||||
feature: [libp2p, "libp2p,mixnet"]
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
with:
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
use sphinx_packet::crypto::{PrivateKey, PublicKey, PRIVATE_KEY_SIZE, PUBLIC_KEY_SIZE};
|
||||
|
||||
/// Converts a mixnode private key to a public key
|
||||
pub fn public_key_from(private_key: [u8; PRIVATE_KEY_SIZE]) -> [u8; PUBLIC_KEY_SIZE] {
|
||||
*PublicKey::from(&PrivateKey::from(private_key)).as_bytes()
|
||||
}
|
|
@ -6,6 +6,8 @@
|
|||
pub mod address;
|
||||
/// Mix client
|
||||
pub mod client;
|
||||
/// Mixnet cryptography
|
||||
pub mod crypto;
|
||||
/// Mixnet errors
|
||||
pub mod error;
|
||||
mod fragment;
|
||||
|
|
|
@ -20,19 +20,24 @@ overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d
|
|||
tracing = "0.1"
|
||||
multiaddr = "0.18"
|
||||
nomos-core = { path = "../../nomos-core" }
|
||||
nomos-network = { path = "../../nomos-services/network" }
|
||||
nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] }
|
||||
nomos-api = { path = "../../nomos-services/api" }
|
||||
nomos-log = { path = "../../nomos-services/log" }
|
||||
nomos-mempool = { path = "../../nomos-services/mempool", features = [
|
||||
"mock",
|
||||
"libp2p",
|
||||
"metrics",
|
||||
] }
|
||||
nomos-metrics = { path = "../../nomos-metrics" }
|
||||
nomos-http = { path = "../../nomos-services/http", features = ["http"] }
|
||||
carnot-consensus = { path = "../../nomos-services/carnot-consensus" }
|
||||
carnot-consensus = { path = "../../nomos-services/carnot-consensus", features = [
|
||||
"libp2p",
|
||||
] }
|
||||
nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] }
|
||||
nomos-libp2p = { path = "../../nomos-libp2p" }
|
||||
nomos-da = { path = "../../nomos-services/data-availability" }
|
||||
nomos-da = { path = "../../nomos-services/data-availability", features = [
|
||||
"libp2p",
|
||||
] }
|
||||
nomos-system-sig = { path = "../../nomos-services/system-sig" }
|
||||
metrics = { path = "../../nomos-services/metrics", optional = true }
|
||||
tracing-subscriber = "0.3"
|
||||
|
@ -53,15 +58,4 @@ tower-http = { version = "0.4", features = ["cors", "trace"] }
|
|||
|
||||
[features]
|
||||
default = []
|
||||
libp2p = [
|
||||
"nomos-network/libp2p",
|
||||
"nomos-mempool/libp2p",
|
||||
"nomos-da/libp2p",
|
||||
"carnot-consensus/libp2p",
|
||||
]
|
||||
mixnet = [
|
||||
"nomos-network/mixnet",
|
||||
"nomos-mempool/mixnet",
|
||||
"nomos-da/mixnet",
|
||||
"carnot-consensus/mixnet",
|
||||
]
|
||||
mixnet = ["nomos-network/mixnet"]
|
||||
|
|
|
@ -22,12 +22,10 @@ use utoipa_swagger_ui::SwaggerUi;
|
|||
use full_replication::{Blob, Certificate};
|
||||
use nomos_core::{da::blob, header::HeaderId, tx::Transaction};
|
||||
use nomos_mempool::{
|
||||
network::adapters::p2p::P2pAdapter as MempoolNetworkAdapter, openapi::Status, MempoolMetrics,
|
||||
network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, openapi::Status,
|
||||
MempoolMetrics,
|
||||
};
|
||||
#[cfg(feature = "libp2p")]
|
||||
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
|
||||
#[cfg(feature = "mixnet")]
|
||||
use nomos_network::backends::mixnet::MixnetNetworkBackend as NetworkBackend;
|
||||
use nomos_storage::backends::StorageSerde;
|
||||
|
||||
use nomos_api::{
|
||||
|
|
|
@ -13,7 +13,7 @@ use hex::FromHex;
|
|||
use nomos_api::ApiService;
|
||||
use nomos_libp2p::{secp256k1::SecretKey, Multiaddr};
|
||||
use nomos_log::{Logger, LoggerBackend, LoggerFormat};
|
||||
use nomos_network::backends::libp2p::Libp2p;
|
||||
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
|
||||
use nomos_network::NetworkService;
|
||||
use overwatch_rs::services::ServiceData;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
@ -122,7 +122,7 @@ pub struct MetricsArgs {
|
|||
#[derive(Deserialize, Debug, Clone, Serialize)]
|
||||
pub struct Config {
|
||||
pub log: <Logger as ServiceData>::Settings,
|
||||
pub network: <NetworkService<Libp2p> as ServiceData>::Settings,
|
||||
pub network: <NetworkService<NetworkBackend> as ServiceData>::Settings,
|
||||
pub http: <ApiService<AxumBackend<Tx, Wire, MB16>> as ServiceData>::Settings,
|
||||
pub consensus: <Carnot as ServiceData>::Settings,
|
||||
pub da: <DataAvailability as ServiceData>::Settings,
|
||||
|
@ -171,6 +171,7 @@ impl Config {
|
|||
}
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
pub fn update_network(mut self, network_args: NetworkArgs) -> Result<Self> {
|
||||
let NetworkArgs {
|
||||
host,
|
||||
|
@ -199,6 +200,8 @@ impl Config {
|
|||
self.network.backend.initial_peers = peers;
|
||||
}
|
||||
|
||||
// TODO: configure mixclient and mixnode if the mixnet feature is enabled
|
||||
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@ pub mod api;
|
|||
mod config;
|
||||
mod tx;
|
||||
|
||||
use carnot_consensus::network::adapters::p2p::P2pAdapter as ConsensusNetworkAdapter;
|
||||
use carnot_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusNetworkAdapter;
|
||||
use carnot_engine::overlay::{RandomBeaconState, RoundRobin, TreeOverlay};
|
||||
use color_eyre::eyre::Result;
|
||||
use full_replication::Certificate;
|
||||
|
@ -21,18 +21,18 @@ use nomos_core::{
|
|||
wire,
|
||||
};
|
||||
use nomos_da::{
|
||||
backend::memory_cache::BlobCache, network::adapters::p2p::P2pAdapter as DaNetworkAdapter,
|
||||
backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaNetworkAdapter,
|
||||
DataAvailabilityService,
|
||||
};
|
||||
use nomos_log::Logger;
|
||||
use nomos_mempool::network::adapters::p2p::P2pAdapter as MempoolNetworkAdapter;
|
||||
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter;
|
||||
use nomos_mempool::{
|
||||
backend::mockpool::MockPool, Certificate as CertDiscriminant, MempoolService,
|
||||
Transaction as TxDiscriminant,
|
||||
};
|
||||
#[cfg(feature = "metrics")]
|
||||
use nomos_metrics::Metrics;
|
||||
use nomos_network::backends::libp2p::Libp2p;
|
||||
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
|
||||
use nomos_storage::{
|
||||
backends::{sled::SledBackend, StorageSerde},
|
||||
StorageService,
|
||||
|
@ -87,7 +87,7 @@ type Mempool<K, V, D> = MempoolService<MempoolNetworkAdapter<K, V>, MockPool<Hea
|
|||
#[derive(Services)]
|
||||
pub struct Nomos {
|
||||
logging: ServiceHandle<Logger>,
|
||||
network: ServiceHandle<NetworkService<Libp2p>>,
|
||||
network: ServiceHandle<NetworkService<NetworkBackend>>,
|
||||
cl_mempool: ServiceHandle<Mempool<Tx, <Tx as Transaction>::Hash, TxDiscriminant>>,
|
||||
da_mempool: ServiceHandle<
|
||||
Mempool<
|
||||
|
|
|
@ -13,7 +13,7 @@ use nomos_core::{
|
|||
tx::Transaction,
|
||||
};
|
||||
|
||||
use nomos_mempool::network::adapters::p2p::Settings as AdapterSettings;
|
||||
use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings;
|
||||
|
||||
use overwatch_rs::overwatch::*;
|
||||
|
||||
|
|
|
@ -18,8 +18,10 @@ futures = "0.3"
|
|||
tokio = { version = "1", features = ["sync"] }
|
||||
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
|
||||
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
|
||||
nomos-network = { path = "../nomos-services/network" }
|
||||
nomos-da = { path = "../nomos-services/data-availability" }
|
||||
nomos-network = { path = "../nomos-services/network", features = ["libp2p"] }
|
||||
nomos-da = { path = "../nomos-services/data-availability", features = [
|
||||
"libp2p",
|
||||
] }
|
||||
carnot-consensus = { path = "../nomos-services/carnot-consensus" }
|
||||
nomos-log = { path = "../nomos-services/log" }
|
||||
nomos-libp2p = { path = "../nomos-libp2p" }
|
||||
|
@ -37,8 +39,3 @@ ratatui = "0.24"
|
|||
tui-input = "0.8"
|
||||
ansi-to-tui = "3"
|
||||
rand = "0.8"
|
||||
|
||||
[features]
|
||||
default = []
|
||||
libp2p = ["nomos-network/libp2p", "nomos-da/libp2p"]
|
||||
mixnet = ["nomos-network/mixnet", "nomos-da/mixnet"]
|
||||
|
|
|
@ -20,10 +20,7 @@ use full_replication::{
|
|||
use futures::{stream, StreamExt};
|
||||
use nomos_core::{da::DaProtocol, header::HeaderId, wire};
|
||||
use nomos_log::{LoggerBackend, LoggerSettings, SharedWriter};
|
||||
#[cfg(feature = "libp2p")]
|
||||
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
|
||||
#[cfg(feature = "mixnet")]
|
||||
use nomos_network::backends::mixnet::MixnetNetworkBackend as NetworkBackend;
|
||||
use nomos_network::NetworkService;
|
||||
use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData};
|
||||
use reqwest::Url;
|
||||
|
|
|
@ -3,10 +3,7 @@ use crate::da::disseminate::{
|
|||
};
|
||||
use clap::Args;
|
||||
use nomos_log::{LoggerBackend, LoggerSettings};
|
||||
#[cfg(feature = "libp2p")]
|
||||
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
|
||||
#[cfg(feature = "mixnet")]
|
||||
use nomos_network::backends::mixnet::MixnetNetworkBackend as NetworkBackend;
|
||||
use nomos_network::NetworkService;
|
||||
use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData};
|
||||
use reqwest::Url;
|
||||
|
|
|
@ -4,12 +4,9 @@ use full_replication::{AbsoluteNumber, Attestation, Certificate, FullReplication
|
|||
use futures::StreamExt;
|
||||
use hex::FromHex;
|
||||
use nomos_core::{da::DaProtocol, wire};
|
||||
use nomos_da::network::{adapters::p2p::P2pAdapter as DaNetworkAdapter, NetworkAdapter};
|
||||
use nomos_da::network::{adapters::libp2p::Libp2pAdapter as DaNetworkAdapter, NetworkAdapter};
|
||||
use nomos_log::Logger;
|
||||
#[cfg(feature = "libp2p")]
|
||||
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
|
||||
#[cfg(feature = "mixnet")]
|
||||
use nomos_network::backends::mixnet::MixnetNetworkBackend as NetworkBackend;
|
||||
use nomos_network::NetworkService;
|
||||
use overwatch_derive::*;
|
||||
use overwatch_rs::{
|
||||
|
|
|
@ -6,8 +6,6 @@ edition = "2021"
|
|||
[features]
|
||||
default = ["axum"]
|
||||
axum = ["dep:axum", "dep:hyper", "dep:tower-http", "utoipa-swagger-ui/axum"]
|
||||
libp2p = ["nomos-mempool/libp2p"]
|
||||
mixnet = ["nomos-mempool/mixnet"]
|
||||
|
||||
[dependencies]
|
||||
async-trait = "0.1"
|
||||
|
@ -22,6 +20,7 @@ nomos-network = { path = "../../nomos-services/network" }
|
|||
nomos-da = { path = "../../nomos-services/data-availability" }
|
||||
nomos-mempool = { path = "../../nomos-services/mempool", features = [
|
||||
"mock",
|
||||
"libp2p",
|
||||
"openapi",
|
||||
] }
|
||||
nomos-metrics = { path = "../../nomos-metrics" }
|
||||
|
|
|
@ -4,7 +4,7 @@ use nomos_core::header::HeaderId;
|
|||
use nomos_core::tx::Transaction;
|
||||
use nomos_mempool::{
|
||||
backend::mockpool::MockPool,
|
||||
network::adapters::p2p::P2pAdapter as MempoolNetworkAdapter,
|
||||
network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
|
||||
openapi::{MempoolMetrics, Status},
|
||||
MempoolMsg, MempoolService, Transaction as TxDiscriminant,
|
||||
};
|
||||
|
|
|
@ -5,8 +5,8 @@ use serde::{de::DeserializeOwned, Serialize};
|
|||
use tokio::sync::oneshot;
|
||||
|
||||
use carnot_consensus::{
|
||||
network::adapters::p2p::P2pAdapter as ConsensusNetworkAdapter, CarnotConsensus, CarnotInfo,
|
||||
ConsensusMsg,
|
||||
network::adapters::libp2p::Libp2pAdapter as ConsensusNetworkAdapter, CarnotConsensus,
|
||||
CarnotInfo, ConsensusMsg,
|
||||
};
|
||||
use carnot_engine::{
|
||||
overlay::{RandomBeaconState, RoundRobin, TreeOverlay},
|
||||
|
@ -22,7 +22,7 @@ use nomos_core::{
|
|||
tx::{select::FillSize as FillSizeWithTx, Transaction},
|
||||
};
|
||||
use nomos_mempool::{
|
||||
backend::mockpool::MockPool, network::adapters::p2p::P2pAdapter as MempoolNetworkAdapter,
|
||||
backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
|
||||
};
|
||||
use nomos_storage::backends::{sled::SledBackend, StorageSerde};
|
||||
|
||||
|
|
|
@ -2,12 +2,12 @@ use full_replication::{AbsoluteNumber, Attestation, Blob, Certificate, FullRepli
|
|||
use nomos_core::da::blob;
|
||||
use nomos_core::header::HeaderId;
|
||||
use nomos_da::{
|
||||
backend::memory_cache::BlobCache, network::adapters::p2p::P2pAdapter as DaNetworkAdapter,
|
||||
backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaNetworkAdapter,
|
||||
DaMsg, DataAvailabilityService,
|
||||
};
|
||||
use nomos_mempool::{
|
||||
backend::mockpool::MockPool,
|
||||
network::adapters::p2p::P2pAdapter as MempoolNetworkAdapter,
|
||||
network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
|
||||
openapi::{MempoolMetrics, Status},
|
||||
Certificate as CertDiscriminant, MempoolMsg, MempoolService,
|
||||
};
|
||||
|
|
|
@ -36,7 +36,6 @@ serde_json = { version = "1", optional = true }
|
|||
default = []
|
||||
mock = ["nomos-network/mock"]
|
||||
libp2p = ["nomos-network/libp2p", "nomos-libp2p"]
|
||||
mixnet = ["nomos-network/mixnet", "nomos-libp2p"]
|
||||
openapi = ["dep:utoipa", "serde_json"]
|
||||
|
||||
[dev-dependencies]
|
||||
|
|
|
@ -15,12 +15,8 @@ use crate::network::{
|
|||
};
|
||||
use carnot_engine::{Committee, CommitteeId, View};
|
||||
use nomos_core::{header::HeaderId, wire};
|
||||
#[cfg(feature = "libp2p")]
|
||||
use nomos_network::backends::libp2p::Libp2p as Backend;
|
||||
#[cfg(feature = "mixnet")]
|
||||
use nomos_network::backends::mixnet::MixnetNetworkBackend as Backend;
|
||||
use nomos_network::{
|
||||
backends::libp2p::{Command, Event, EventKind},
|
||||
backends::libp2p::{Command, Event, EventKind, Libp2p},
|
||||
NetworkMsg, NetworkService,
|
||||
};
|
||||
use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
|
||||
|
@ -107,8 +103,8 @@ struct Messages {
|
|||
/// Requesting the same stream type multiple times will re-initialize it and new items will only be forwarded to the latest one.
|
||||
/// It's required for the consumer to keep the stream around for the time it's necessary
|
||||
#[derive(Clone)]
|
||||
pub struct P2pAdapter {
|
||||
network_relay: OutboundRelay<<NetworkService<Backend> as ServiceData>::Message>,
|
||||
pub struct Libp2pAdapter {
|
||||
network_relay: OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>,
|
||||
message_cache: MessageCache,
|
||||
}
|
||||
|
||||
|
@ -202,7 +198,7 @@ impl GossipsubMessage {
|
|||
}
|
||||
}
|
||||
|
||||
impl P2pAdapter {
|
||||
impl Libp2pAdapter {
|
||||
async fn broadcast(&self, message: GossipsubMessage, topic: &str) {
|
||||
if let Err((e, message)) = self
|
||||
.network_relay
|
||||
|
@ -216,7 +212,7 @@ impl P2pAdapter {
|
|||
};
|
||||
}
|
||||
|
||||
async fn subscribe(relay: &Relay<Backend>, topic: &str) {
|
||||
async fn subscribe(relay: &Relay<Libp2p>, topic: &str) {
|
||||
if let Err((e, _)) = relay
|
||||
.send(NetworkMsg::Process(Command::Subscribe(topic.into())))
|
||||
.await
|
||||
|
@ -227,10 +223,10 @@ impl P2pAdapter {
|
|||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl NetworkAdapter for P2pAdapter {
|
||||
type Backend = Backend;
|
||||
impl NetworkAdapter for Libp2pAdapter {
|
||||
type Backend = Libp2p;
|
||||
|
||||
async fn new(network_relay: Relay<Backend>) -> Self {
|
||||
async fn new(network_relay: Relay<Self::Backend>) -> Self {
|
||||
let message_cache = MessageCache::new();
|
||||
let cache = message_cache.clone();
|
||||
let relay = network_relay.clone();
|
|
@ -1,4 +1,4 @@
|
|||
#[cfg(feature = "libp2p")]
|
||||
pub mod libp2p;
|
||||
#[cfg(feature = "mock")]
|
||||
pub mod mock;
|
||||
#[cfg(any(feature = "libp2p", feature = "mixnet"))]
|
||||
pub mod p2p;
|
||||
|
|
|
@ -19,4 +19,3 @@ tokio-stream = "0.1"
|
|||
|
||||
[features]
|
||||
libp2p = ["nomos-network/libp2p"]
|
||||
mixnet = ["nomos-network/mixnet"]
|
||||
|
|
|
@ -7,11 +7,7 @@ use std::marker::PhantomData;
|
|||
// internal
|
||||
use crate::network::NetworkAdapter;
|
||||
use nomos_core::wire;
|
||||
#[cfg(feature = "libp2p")]
|
||||
use nomos_network::backends::libp2p::Libp2p as Backend;
|
||||
use nomos_network::backends::libp2p::{Command, Event, EventKind, Message, TopicHash};
|
||||
#[cfg(feature = "mixnet")]
|
||||
use nomos_network::backends::mixnet::MixnetNetworkBackend as Backend;
|
||||
use nomos_network::backends::libp2p::{Command, Event, EventKind, Libp2p, Message, TopicHash};
|
||||
use nomos_network::{NetworkMsg, NetworkService};
|
||||
use overwatch_rs::services::relay::OutboundRelay;
|
||||
use overwatch_rs::services::ServiceData;
|
||||
|
@ -23,13 +19,13 @@ use tracing::debug;
|
|||
|
||||
pub const NOMOS_DA_TOPIC: &str = "NomosDa";
|
||||
|
||||
pub struct P2pAdapter<B, A> {
|
||||
network_relay: OutboundRelay<<NetworkService<Backend> as ServiceData>::Message>,
|
||||
pub struct Libp2pAdapter<B, A> {
|
||||
network_relay: OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>,
|
||||
_blob: PhantomData<B>,
|
||||
_attestation: PhantomData<A>,
|
||||
}
|
||||
|
||||
impl<B, A> P2pAdapter<B, A>
|
||||
impl<B, A> Libp2pAdapter<B, A>
|
||||
where
|
||||
B: Serialize + DeserializeOwned + Send + Sync + 'static,
|
||||
A: Serialize + DeserializeOwned + Send + Sync + 'static,
|
||||
|
@ -74,12 +70,12 @@ where
|
|||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<B, A> NetworkAdapter for P2pAdapter<B, A>
|
||||
impl<B, A> NetworkAdapter for Libp2pAdapter<B, A>
|
||||
where
|
||||
B: Serialize + DeserializeOwned + Send + Sync + 'static,
|
||||
A: Serialize + DeserializeOwned + Send + Sync + 'static,
|
||||
{
|
||||
type Backend = Backend;
|
||||
type Backend = Libp2p;
|
||||
type Blob = B;
|
||||
type Attestation = A;
|
||||
|
|
@ -1,2 +1,2 @@
|
|||
#[cfg(any(feature = "libp2p", feature = "mixnet"))]
|
||||
pub mod p2p;
|
||||
#[cfg(feature = "libp2p")]
|
||||
pub mod libp2p;
|
||||
|
|
|
@ -35,7 +35,6 @@ blake2 = "0.10"
|
|||
default = []
|
||||
mock = ["linked-hash-map", "nomos-network/mock", "rand", "nomos-core/mock"]
|
||||
libp2p = ["nomos-network/libp2p"]
|
||||
mixnet = ["nomos-network/mixnet"]
|
||||
metrics = []
|
||||
|
||||
# enable to help generate OpenAPI
|
||||
|
|
|
@ -6,27 +6,23 @@ use tokio_stream::StreamExt;
|
|||
// internal
|
||||
use crate::network::NetworkAdapter;
|
||||
use nomos_core::wire;
|
||||
#[cfg(feature = "libp2p")]
|
||||
use nomos_network::backends::libp2p::Libp2p as Backend;
|
||||
use nomos_network::backends::libp2p::{Command, Event, EventKind, Message, TopicHash};
|
||||
#[cfg(feature = "mixnet")]
|
||||
use nomos_network::backends::mixnet::MixnetNetworkBackend as Backend;
|
||||
use nomos_network::backends::libp2p::{Command, Event, EventKind, Libp2p, Message, TopicHash};
|
||||
use nomos_network::{NetworkMsg, NetworkService};
|
||||
use overwatch_rs::services::relay::OutboundRelay;
|
||||
use overwatch_rs::services::ServiceData;
|
||||
|
||||
pub struct P2pAdapter<Item, Key> {
|
||||
network_relay: OutboundRelay<<NetworkService<Backend> as ServiceData>::Message>,
|
||||
pub struct Libp2pAdapter<Item, Key> {
|
||||
network_relay: OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>,
|
||||
settings: Settings<Key, Item>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<Item, Key> NetworkAdapter for P2pAdapter<Item, Key>
|
||||
impl<Item, Key> NetworkAdapter for Libp2pAdapter<Item, Key>
|
||||
where
|
||||
Item: DeserializeOwned + Serialize + Send + Sync + 'static + Clone,
|
||||
Key: Clone + Send + Sync + 'static,
|
||||
{
|
||||
type Backend = Backend;
|
||||
type Backend = Libp2p;
|
||||
type Settings = Settings<Key, Item>;
|
||||
type Item = Item;
|
||||
type Key = Key;
|
|
@ -1,5 +1,5 @@
|
|||
#[cfg(any(feature = "libp2p", feature = "mixnet"))]
|
||||
pub mod p2p;
|
||||
#[cfg(feature = "libp2p")]
|
||||
pub mod libp2p;
|
||||
|
||||
#[cfg(feature = "mock")]
|
||||
pub mod mock;
|
||||
|
|
|
@ -33,6 +33,6 @@ tokio = { version = "1", features = ["full"] }
|
|||
[features]
|
||||
default = []
|
||||
libp2p = ["nomos-libp2p", "rand", "humantime-serde"]
|
||||
mixnet = ["dep:mixnet", "nomos-libp2p", "rand", "humantime-serde"]
|
||||
mixnet = ["dep:mixnet"]
|
||||
mock = ["rand", "chrono"]
|
||||
openapi = ["dep:utoipa", "serde_json"]
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
use nomos_libp2p::{Multiaddr, SwarmConfig};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[cfg(feature = "mixnet")]
|
||||
use crate::backends::libp2p::mixnet::MixnetConfig;
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct Libp2pConfig {
|
||||
#[serde(flatten)]
|
||||
|
@ -8,4 +11,6 @@ pub struct Libp2pConfig {
|
|||
// Initial peers to connect to
|
||||
#[serde(default)]
|
||||
pub initial_peers: Vec<Multiaddr>,
|
||||
#[cfg(feature = "mixnet")]
|
||||
pub mixnet: MixnetConfig,
|
||||
}
|
||||
|
|
|
@ -0,0 +1,201 @@
|
|||
use std::net::SocketAddr;
|
||||
|
||||
use futures::StreamExt;
|
||||
use mixnet::{
|
||||
address::NodeAddress,
|
||||
client::{MessageQueue, MixClient, MixClientConfig},
|
||||
node::{MixNode, MixNodeConfig, Output, PacketQueue},
|
||||
packet::PacketBody,
|
||||
};
|
||||
use nomos_core::wire;
|
||||
use nomos_libp2p::{
|
||||
libp2p::{Stream, StreamProtocol},
|
||||
libp2p_stream::IncomingStreams,
|
||||
Multiaddr, Protocol,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::{
|
||||
runtime::Handle,
|
||||
sync::{mpsc, oneshot},
|
||||
};
|
||||
|
||||
use crate::backends::libp2p::{Command, Dial, Topic};
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct MixnetConfig {
|
||||
pub mixclient: MixClientConfig,
|
||||
pub mixnode: MixNodeConfig,
|
||||
}
|
||||
|
||||
pub(crate) const STREAM_PROTOCOL: StreamProtocol = StreamProtocol::new("/mixnet");
|
||||
|
||||
pub(crate) fn init_mixnet(
|
||||
config: MixnetConfig,
|
||||
runtime_handle: Handle,
|
||||
cmd_tx: mpsc::Sender<Command>,
|
||||
incoming_streams: IncomingStreams,
|
||||
) -> MessageQueue {
|
||||
// Run mixnode
|
||||
let (mixnode, packet_queue) = MixNode::new(config.mixnode).unwrap();
|
||||
let libp2p_cmd_tx = cmd_tx.clone();
|
||||
let queue = packet_queue.clone();
|
||||
runtime_handle.spawn(async move {
|
||||
run_mixnode(mixnode, queue, libp2p_cmd_tx).await;
|
||||
});
|
||||
let handle = runtime_handle.clone();
|
||||
let queue = packet_queue.clone();
|
||||
runtime_handle.spawn(async move {
|
||||
handle_incoming_streams(incoming_streams, queue, handle).await;
|
||||
});
|
||||
|
||||
// Run mixclient
|
||||
let (mixclient, message_queue) = MixClient::new(config.mixclient).unwrap();
|
||||
runtime_handle.spawn(async move {
|
||||
run_mixclient(mixclient, packet_queue, cmd_tx).await;
|
||||
});
|
||||
|
||||
message_queue
|
||||
}
|
||||
|
||||
async fn run_mixnode(
|
||||
mut mixnode: MixNode,
|
||||
packet_queue: PacketQueue,
|
||||
cmd_tx: mpsc::Sender<Command>,
|
||||
) {
|
||||
while let Some(output) = mixnode.next().await {
|
||||
match output {
|
||||
Output::Forward(packet) => {
|
||||
stream_send(packet.address(), packet.body(), &cmd_tx, &packet_queue).await;
|
||||
}
|
||||
Output::ReconstructedMessage(message) => match MixnetMessage::from_bytes(&message) {
|
||||
Ok(msg) => {
|
||||
cmd_tx
|
||||
.send(Command::Broadcast {
|
||||
topic: msg.topic,
|
||||
message: msg.message,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("failed to parse message received from mixnet: {e}");
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_mixclient(
|
||||
mut mixclient: MixClient,
|
||||
packet_queue: PacketQueue,
|
||||
cmd_tx: mpsc::Sender<Command>,
|
||||
) {
|
||||
while let Some(packet) = mixclient.next().await {
|
||||
stream_send(packet.address(), packet.body(), &cmd_tx, &packet_queue).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_incoming_streams(
|
||||
mut incoming_streams: IncomingStreams,
|
||||
packet_queue: PacketQueue,
|
||||
runtime_handle: Handle,
|
||||
) {
|
||||
while let Some((_, stream)) = incoming_streams.next().await {
|
||||
let queue = packet_queue.clone();
|
||||
runtime_handle.spawn(async move {
|
||||
if let Err(e) = handle_stream(stream, queue).await {
|
||||
tracing::warn!("stream closed: {e}");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_stream(mut stream: Stream, packet_queue: PacketQueue) -> std::io::Result<()> {
|
||||
loop {
|
||||
match PacketBody::read_from(&mut stream).await? {
|
||||
Ok(packet_body) => {
|
||||
packet_queue
|
||||
.send(packet_body)
|
||||
.await
|
||||
.expect("The receiving half of packet queue should be always open");
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
"failed to parse packet body. continuing reading the next packet: {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn stream_send(
|
||||
addr: NodeAddress,
|
||||
packet_body: PacketBody,
|
||||
cmd_tx: &mpsc::Sender<Command>,
|
||||
packet_queue: &PacketQueue,
|
||||
) {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
cmd_tx
|
||||
.send(Command::Connect(Dial {
|
||||
addr: multiaddr_from(addr),
|
||||
retry_count: 3,
|
||||
result_sender: tx,
|
||||
}))
|
||||
.await
|
||||
.expect("Command receiver should be always open");
|
||||
|
||||
match rx.await {
|
||||
Ok(Ok(peer_id)) => {
|
||||
cmd_tx
|
||||
.send(Command::StreamSend {
|
||||
peer_id,
|
||||
protocol: STREAM_PROTOCOL,
|
||||
data: packet_body.bytes(),
|
||||
})
|
||||
.await
|
||||
.expect("Command receiver should be always open");
|
||||
}
|
||||
Ok(Err(e)) => match e {
|
||||
nomos_libp2p::DialError::NoAddresses => {
|
||||
tracing::debug!("Dialing failed because the peer is the local node. Sending msg directly to the queue");
|
||||
packet_queue
|
||||
.send(packet_body)
|
||||
.await
|
||||
.expect("The receiving half of packet queue should be always open");
|
||||
}
|
||||
_ => tracing::error!("failed to dial with unrecoverable error: {e}"),
|
||||
},
|
||||
Err(e) => {
|
||||
tracing::error!("channel closed before receiving: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn multiaddr_from(addr: NodeAddress) -> Multiaddr {
|
||||
match SocketAddr::from(addr) {
|
||||
SocketAddr::V4(addr) => Multiaddr::empty()
|
||||
.with(Protocol::Ip4(*addr.ip()))
|
||||
.with(Protocol::Udp(addr.port()))
|
||||
.with(Protocol::QuicV1),
|
||||
SocketAddr::V6(addr) => Multiaddr::empty()
|
||||
.with(Protocol::Ip6(*addr.ip()))
|
||||
.with(Protocol::Udp(addr.port()))
|
||||
.with(Protocol::QuicV1),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub(crate) struct MixnetMessage {
|
||||
pub topic: Topic,
|
||||
pub message: Box<[u8]>,
|
||||
}
|
||||
|
||||
impl MixnetMessage {
|
||||
pub fn as_bytes(&self) -> Vec<u8> {
|
||||
wire::serialize(self).expect("Couldn't serialize MixnetMessage")
|
||||
}
|
||||
|
||||
pub fn from_bytes(data: &[u8]) -> Result<Self, wire::Error> {
|
||||
wire::deserialize(data)
|
||||
}
|
||||
}
|
|
@ -1,5 +1,7 @@
|
|||
mod command;
|
||||
mod config;
|
||||
#[cfg(feature = "mixnet")]
|
||||
pub mod mixnet;
|
||||
pub(crate) mod swarm;
|
||||
|
||||
// std
|
||||
|
@ -9,6 +11,10 @@ use self::swarm::SwarmHandler;
|
|||
|
||||
// internal
|
||||
use super::NetworkBackend;
|
||||
#[cfg(feature = "mixnet")]
|
||||
use crate::backends::libp2p::mixnet::{init_mixnet, MixnetMessage, STREAM_PROTOCOL};
|
||||
#[cfg(feature = "mixnet")]
|
||||
use ::mixnet::client::MessageQueue;
|
||||
pub use nomos_libp2p::libp2p::gossipsub::{Message, TopicHash};
|
||||
// crates
|
||||
use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState};
|
||||
|
@ -17,6 +23,8 @@ use tokio::sync::{broadcast, mpsc};
|
|||
pub struct Libp2p {
|
||||
events_tx: broadcast::Sender<Event>,
|
||||
commands_tx: mpsc::Sender<Command>,
|
||||
#[cfg(feature = "mixnet")]
|
||||
mixclient_message_queue: MessageQueue,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -46,6 +54,15 @@ impl NetworkBackend for Libp2p {
|
|||
|
||||
let mut swarm_handler =
|
||||
SwarmHandler::new(&config, commands_tx.clone(), commands_rx, events_tx.clone());
|
||||
|
||||
#[cfg(feature = "mixnet")]
|
||||
let mixclient_message_queue = init_mixnet(
|
||||
config.mixnet,
|
||||
overwatch_handle.runtime().clone(),
|
||||
commands_tx.clone(),
|
||||
swarm_handler.incoming_streams(STREAM_PROTOCOL),
|
||||
);
|
||||
|
||||
overwatch_handle.runtime().spawn(async move {
|
||||
swarm_handler.run(config.initial_peers).await;
|
||||
});
|
||||
|
@ -53,15 +70,35 @@ impl NetworkBackend for Libp2p {
|
|||
Self {
|
||||
events_tx,
|
||||
commands_tx,
|
||||
#[cfg(feature = "mixnet")]
|
||||
mixclient_message_queue,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "mixnet"))]
|
||||
async fn process(&self, msg: Self::Message) {
|
||||
if let Err(e) = self.commands_tx.send(msg).await {
|
||||
tracing::error!("failed to send command to nomos-libp2p: {e:?}");
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "mixnet")]
|
||||
async fn process(&self, msg: Self::Message) {
|
||||
match msg {
|
||||
Command::Broadcast { topic, message } => {
|
||||
let msg = MixnetMessage { topic, message };
|
||||
if let Err(e) = self.mixclient_message_queue.send(msg.as_bytes()).await {
|
||||
tracing::error!("failed to send messasge to mixclient: {e}");
|
||||
}
|
||||
}
|
||||
cmd => {
|
||||
if let Err(e) = self.commands_tx.send(cmd).await {
|
||||
tracing::error!("failed to send command to libp2p swarm: {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn subscribe(
|
||||
&mut self,
|
||||
kind: Self::EventKind,
|
||||
|
|
|
@ -4,10 +4,12 @@ use std::{
|
|||
};
|
||||
|
||||
use futures::AsyncWriteExt;
|
||||
#[cfg(feature = "mixnet")]
|
||||
use nomos_libp2p::libp2p_stream::IncomingStreams;
|
||||
use nomos_libp2p::{
|
||||
gossipsub,
|
||||
libp2p::{swarm::ConnectionId, Stream, StreamProtocol},
|
||||
libp2p_stream::{Control, IncomingStreams, OpenStreamError},
|
||||
libp2p_stream::{Control, OpenStreamError},
|
||||
BehaviourEvent, Multiaddr, PeerId, Swarm, SwarmEvent,
|
||||
};
|
||||
use tokio::sync::{broadcast, mpsc, oneshot};
|
||||
|
@ -290,7 +292,7 @@ impl SwarmHandler {
|
|||
std::time::Duration::from_secs(BACKOFF.pow(retry as u32))
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[cfg(feature = "mixnet")]
|
||||
pub fn incoming_streams(&mut self, protocol: StreamProtocol) -> IncomingStreams {
|
||||
self.stream_control.accept(protocol).unwrap()
|
||||
}
|
||||
|
|
|
@ -1,291 +0,0 @@
|
|||
use std::net::SocketAddr;
|
||||
|
||||
// internal
|
||||
use super::{
|
||||
libp2p::{self, swarm::SwarmHandler, Libp2pConfig, Topic},
|
||||
NetworkBackend,
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use mixnet::{
|
||||
address::NodeAddress,
|
||||
client::{MessageQueue, MixClient, MixClientConfig},
|
||||
node::{MixNode, MixNodeConfig, Output, PacketQueue},
|
||||
packet::PacketBody,
|
||||
};
|
||||
use nomos_core::wire;
|
||||
use nomos_libp2p::{
|
||||
libp2p::{Stream, StreamProtocol},
|
||||
libp2p_stream::IncomingStreams,
|
||||
Multiaddr, Protocol,
|
||||
};
|
||||
// crates
|
||||
use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::{
|
||||
runtime::Handle,
|
||||
sync::{broadcast, mpsc, oneshot},
|
||||
};
|
||||
|
||||
/// A Mixnet network backend broadcasts messages to the network with mixing packets through mixnet,
|
||||
/// and receives messages broadcasted from the network.
|
||||
pub struct MixnetNetworkBackend {
|
||||
libp2p_events_tx: broadcast::Sender<libp2p::Event>,
|
||||
libp2p_commands_tx: mpsc::Sender<libp2p::Command>,
|
||||
|
||||
mixclient_message_queue: MessageQueue,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct MixnetConfig {
|
||||
libp2p_config: Libp2pConfig,
|
||||
mixclient_config: MixClientConfig,
|
||||
mixnode_config: MixNodeConfig,
|
||||
}
|
||||
|
||||
const BUFFER_SIZE: usize = 64;
|
||||
const STREAM_PROTOCOL: StreamProtocol = StreamProtocol::new("/mixnet");
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl NetworkBackend for MixnetNetworkBackend {
|
||||
type Settings = MixnetConfig;
|
||||
type State = NoState<MixnetConfig>;
|
||||
type Message = libp2p::Command;
|
||||
type EventKind = libp2p::EventKind;
|
||||
type NetworkEvent = libp2p::Event;
|
||||
|
||||
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self {
|
||||
// TODO: One important task that should be spawned is
|
||||
// subscribing NewEntropy events that will be emitted from the consensus service soon.
|
||||
// so that new topology can be built internally.
|
||||
// In the mixnet spec, the robustness layer is responsible for this task.
|
||||
// We can implement the robustness layer in the mixnet-specific crate,
|
||||
// that we're going to define at the root of the project.
|
||||
|
||||
let (libp2p_commands_tx, libp2p_commands_rx) = tokio::sync::mpsc::channel(BUFFER_SIZE);
|
||||
let (libp2p_events_tx, _) = tokio::sync::broadcast::channel(BUFFER_SIZE);
|
||||
|
||||
let mut swarm_handler = SwarmHandler::new(
|
||||
&config.libp2p_config,
|
||||
libp2p_commands_tx.clone(),
|
||||
libp2p_commands_rx,
|
||||
libp2p_events_tx.clone(),
|
||||
);
|
||||
|
||||
// Run mixnode
|
||||
let (mixnode, packet_queue) = MixNode::new(config.mixnode_config).unwrap();
|
||||
let libp2p_cmd_tx = libp2p_commands_tx.clone();
|
||||
let queue = packet_queue.clone();
|
||||
overwatch_handle.runtime().spawn(async move {
|
||||
Self::run_mixnode(mixnode, queue, libp2p_cmd_tx).await;
|
||||
});
|
||||
let incoming_streams = swarm_handler.incoming_streams(STREAM_PROTOCOL);
|
||||
let runtime_handle = overwatch_handle.runtime().clone();
|
||||
let queue = packet_queue.clone();
|
||||
overwatch_handle.runtime().spawn(async move {
|
||||
Self::handle_incoming_streams(incoming_streams, queue, runtime_handle).await;
|
||||
});
|
||||
|
||||
// Run mixclient
|
||||
let (mixclient, message_queue) = MixClient::new(config.mixclient_config).unwrap();
|
||||
let libp2p_cmd_tx = libp2p_commands_tx.clone();
|
||||
overwatch_handle.runtime().spawn(async move {
|
||||
Self::run_mixclient(mixclient, packet_queue, libp2p_cmd_tx).await;
|
||||
});
|
||||
|
||||
// Run libp2p swarm to make progress
|
||||
overwatch_handle.runtime().spawn(async move {
|
||||
swarm_handler.run(config.libp2p_config.initial_peers).await;
|
||||
});
|
||||
|
||||
Self {
|
||||
libp2p_events_tx,
|
||||
libp2p_commands_tx,
|
||||
|
||||
mixclient_message_queue: message_queue,
|
||||
}
|
||||
}
|
||||
|
||||
async fn process(&self, msg: Self::Message) {
|
||||
match msg {
|
||||
libp2p::Command::Broadcast { topic, message } => {
|
||||
let msg = MixnetMessage { topic, message };
|
||||
if let Err(e) = self.mixclient_message_queue.send(msg.as_bytes()).await {
|
||||
tracing::error!("failed to send messasge to mixclient: {e}");
|
||||
}
|
||||
}
|
||||
cmd => {
|
||||
if let Err(e) = self.libp2p_commands_tx.send(cmd).await {
|
||||
tracing::error!("failed to send command to libp2p swarm: {e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn subscribe(
|
||||
&mut self,
|
||||
kind: Self::EventKind,
|
||||
) -> broadcast::Receiver<Self::NetworkEvent> {
|
||||
match kind {
|
||||
libp2p::EventKind::Message => self.libp2p_events_tx.subscribe(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MixnetNetworkBackend {
|
||||
async fn run_mixnode(
|
||||
mut mixnode: MixNode,
|
||||
packet_queue: PacketQueue,
|
||||
swarm_commands_tx: mpsc::Sender<libp2p::Command>,
|
||||
) {
|
||||
while let Some(output) = mixnode.next().await {
|
||||
match output {
|
||||
Output::Forward(packet) => {
|
||||
Self::stream_send(
|
||||
packet.address(),
|
||||
packet.body(),
|
||||
&swarm_commands_tx,
|
||||
&packet_queue,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Output::ReconstructedMessage(message) => {
|
||||
match MixnetMessage::from_bytes(&message) {
|
||||
Ok(msg) => {
|
||||
swarm_commands_tx
|
||||
.send(libp2p::Command::Broadcast {
|
||||
topic: msg.topic,
|
||||
message: msg.message,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("failed to parse message received from mixnet: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_mixclient(
|
||||
mut mixclient: MixClient,
|
||||
packet_queue: PacketQueue,
|
||||
swarm_commands_tx: mpsc::Sender<libp2p::Command>,
|
||||
) {
|
||||
while let Some(packet) = mixclient.next().await {
|
||||
Self::stream_send(
|
||||
packet.address(),
|
||||
packet.body(),
|
||||
&swarm_commands_tx,
|
||||
&packet_queue,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_incoming_streams(
|
||||
mut incoming_streams: IncomingStreams,
|
||||
packet_queue: PacketQueue,
|
||||
runtime_handle: Handle,
|
||||
) {
|
||||
while let Some((_, stream)) = incoming_streams.next().await {
|
||||
let queue = packet_queue.clone();
|
||||
runtime_handle.spawn(async move {
|
||||
if let Err(e) = Self::handle_stream(stream, queue).await {
|
||||
tracing::warn!("stream closed: {e}");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_stream(mut stream: Stream, packet_queue: PacketQueue) -> std::io::Result<()> {
|
||||
loop {
|
||||
match PacketBody::read_from(&mut stream).await? {
|
||||
Ok(packet_body) => {
|
||||
packet_queue
|
||||
.send(packet_body)
|
||||
.await
|
||||
.expect("The receiving half of packet queue should be always open");
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
"failed to parse packet body. continuing reading the next packet: {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn stream_send(
|
||||
addr: NodeAddress,
|
||||
packet_body: PacketBody,
|
||||
swarm_commands_tx: &mpsc::Sender<libp2p::Command>,
|
||||
packet_queue: &PacketQueue,
|
||||
) {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
swarm_commands_tx
|
||||
.send(libp2p::Command::Connect(libp2p::Dial {
|
||||
addr: Self::multiaddr_from(addr),
|
||||
retry_count: 3,
|
||||
result_sender: tx,
|
||||
}))
|
||||
.await
|
||||
.expect("Command receiver should be always open");
|
||||
|
||||
match rx.await {
|
||||
Ok(Ok(peer_id)) => {
|
||||
swarm_commands_tx
|
||||
.send(libp2p::Command::StreamSend {
|
||||
peer_id,
|
||||
protocol: STREAM_PROTOCOL,
|
||||
data: packet_body.bytes(),
|
||||
})
|
||||
.await
|
||||
.expect("Command receiver should be always open");
|
||||
}
|
||||
Ok(Err(e)) => match e {
|
||||
nomos_libp2p::DialError::NoAddresses => {
|
||||
tracing::debug!("Dialing failed because the peer is the local node. Sending msg directly to the queue");
|
||||
packet_queue
|
||||
.send(packet_body)
|
||||
.await
|
||||
.expect("The receiving half of packet queue should be always open");
|
||||
}
|
||||
_ => tracing::error!("failed to dial with unrecoverable error: {e}"),
|
||||
},
|
||||
Err(e) => {
|
||||
tracing::error!("channel closed before receiving: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn multiaddr_from(addr: NodeAddress) -> Multiaddr {
|
||||
match SocketAddr::from(addr) {
|
||||
SocketAddr::V4(addr) => Multiaddr::empty()
|
||||
.with(Protocol::Ip4(*addr.ip()))
|
||||
.with(Protocol::Udp(addr.port()))
|
||||
.with(Protocol::QuicV1),
|
||||
SocketAddr::V6(addr) => Multiaddr::empty()
|
||||
.with(Protocol::Ip6(*addr.ip()))
|
||||
.with(Protocol::Udp(addr.port()))
|
||||
.with(Protocol::QuicV1),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct MixnetMessage {
|
||||
pub topic: Topic,
|
||||
pub message: Box<[u8]>,
|
||||
}
|
||||
|
||||
impl MixnetMessage {
|
||||
pub fn as_bytes(&self) -> Vec<u8> {
|
||||
wire::serialize(self).expect("Couldn't serialize MixnetMessage")
|
||||
}
|
||||
|
||||
pub fn from_bytes(data: &[u8]) -> Result<Self, wire::Error> {
|
||||
wire::deserialize(data)
|
||||
}
|
||||
}
|
|
@ -2,12 +2,9 @@ use super::*;
|
|||
use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::ServiceState};
|
||||
use tokio::sync::broadcast::Receiver;
|
||||
|
||||
#[cfg(any(feature = "libp2p", feature = "mixnet"))]
|
||||
#[cfg(feature = "libp2p")]
|
||||
pub mod libp2p;
|
||||
|
||||
#[cfg(feature = "mixnet")]
|
||||
pub mod mixnet;
|
||||
|
||||
#[cfg(feature = "mock")]
|
||||
pub mod mock;
|
||||
|
||||
|
|
|
@ -7,14 +7,17 @@ publish = false
|
|||
[dependencies]
|
||||
nomos-node = { path = "../nodes/nomos-node", default-features = false }
|
||||
carnot-consensus = { path = "../nomos-services/carnot-consensus" }
|
||||
nomos-network = { path = "../nomos-services/network" }
|
||||
nomos-network = { path = "../nomos-services/network", features = ["libp2p"] }
|
||||
mixnet = { path = "../mixnet" }
|
||||
nomos-log = { path = "../nomos-services/log" }
|
||||
nomos-api = { path = "../nomos-services/api" }
|
||||
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
|
||||
nomos-core = { path = "../nomos-core" }
|
||||
carnot-engine = { path = "../consensus/carnot-engine", features = ["serde"] }
|
||||
nomos-mempool = { path = "../nomos-services/mempool", features = ["mock"] }
|
||||
nomos-mempool = { path = "../nomos-services/mempool", features = [
|
||||
"mock",
|
||||
"libp2p",
|
||||
] }
|
||||
nomos-da = { path = "../nomos-services/data-availability" }
|
||||
full-replication = { path = "../nomos-da/full-replication" }
|
||||
rand = "0.8"
|
||||
|
@ -48,6 +51,5 @@ path = "src/tests/cli.rs"
|
|||
|
||||
|
||||
[features]
|
||||
libp2p = ["nomos-network/libp2p", "nomos-mempool/libp2p"]
|
||||
mixnet = ["nomos-network/mixnet", "nomos-mempool/mixnet"]
|
||||
mixnet = ["nomos-network/mixnet"]
|
||||
metrics = ["nomos-node/metrics"]
|
||||
|
|
|
@ -2,6 +2,11 @@
|
|||
use std::net::SocketAddr;
|
||||
use std::process::{Child, Command, Stdio};
|
||||
use std::time::Duration;
|
||||
#[cfg(feature = "mixnet")]
|
||||
use std::{
|
||||
net::{IpAddr, Ipv4Addr},
|
||||
num::NonZeroU8,
|
||||
};
|
||||
// internal
|
||||
use super::{create_tempdir, persist_tempdir, LOGS_PREFIX};
|
||||
use crate::{adjust_timeout, get_available_port, ConsensusConfig, Node, SpawnConfig};
|
||||
|
@ -9,12 +14,20 @@ use carnot_consensus::{CarnotInfo, CarnotSettings};
|
|||
use carnot_engine::overlay::{RandomBeaconState, RoundRobin, TreeOverlay, TreeOverlaySettings};
|
||||
use carnot_engine::{NodeId, Overlay};
|
||||
use full_replication::Certificate;
|
||||
#[cfg(feature = "mixnet")]
|
||||
use mixnet::{
|
||||
address::NodeAddress,
|
||||
client::MixClientConfig,
|
||||
node::MixNodeConfig,
|
||||
topology::{MixNodeInfo, MixnetTopology},
|
||||
};
|
||||
use nomos_core::{block::Block, header::HeaderId};
|
||||
use nomos_libp2p::{Multiaddr, Swarm};
|
||||
use nomos_log::{LoggerBackend, LoggerFormat};
|
||||
use nomos_mempool::MempoolMetrics;
|
||||
use nomos_network::backends::libp2p::Libp2pConfig;
|
||||
use nomos_network::NetworkConfig;
|
||||
#[cfg(feature = "mixnet")]
|
||||
use nomos_network::backends::libp2p::mixnet::MixnetConfig;
|
||||
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
|
||||
use nomos_node::{api::AxumBackendSettings, Config, Tx};
|
||||
// crates
|
||||
use fraction::Fraction;
|
||||
|
@ -220,7 +233,6 @@ impl Node for NomosNode {
|
|||
.backend
|
||||
.initial_peers
|
||||
.push(first_node_addr.clone());
|
||||
|
||||
node_configs.push(conf);
|
||||
}
|
||||
node_configs
|
||||
|
@ -231,9 +243,11 @@ impl Node for NomosNode {
|
|||
let mut prev_node_addr = node_address(&next_leader_config);
|
||||
let mut node_configs = vec![next_leader_config];
|
||||
for mut conf in configs {
|
||||
conf.network.backend.initial_peers.push(prev_node_addr);
|
||||
conf.network
|
||||
.backend
|
||||
.initial_peers
|
||||
.push(prev_node_addr.clone());
|
||||
prev_node_addr = node_address(&conf);
|
||||
|
||||
node_configs.push(conf);
|
||||
}
|
||||
node_configs
|
||||
|
@ -251,6 +265,9 @@ impl Node for NomosNode {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "mixnet")]
|
||||
const NUM_MIXNODE_CANDIDATES: usize = 2;
|
||||
|
||||
/// Returns the config of the next leader and all other nodes.
|
||||
///
|
||||
/// Depending on the network topology, the next leader must be spawned first,
|
||||
|
@ -263,14 +280,23 @@ fn create_node_configs(consensus: ConsensusConfig) -> (Config, Vec<Config>) {
|
|||
thread_rng().fill(id);
|
||||
}
|
||||
|
||||
#[cfg(feature = "mixnet")]
|
||||
let (mixclient_config, mixnode_configs) = create_mixnet_config(&ids);
|
||||
|
||||
let mut configs = ids
|
||||
.iter()
|
||||
.map(|id| {
|
||||
.enumerate()
|
||||
.map(|(_i, id)| {
|
||||
create_node_config(
|
||||
ids.iter().copied().map(NodeId::new).collect(),
|
||||
*id,
|
||||
consensus.threshold,
|
||||
consensus.timeout,
|
||||
#[cfg(feature = "mixnet")]
|
||||
MixnetConfig {
|
||||
mixclient: mixclient_config.clone(),
|
||||
mixnode: mixnode_configs[_i].clone(),
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
@ -282,9 +308,31 @@ fn create_node_configs(consensus: ConsensusConfig) -> (Config, Vec<Config>) {
|
|||
.position(|&id| NodeId::from(id) == next_leader)
|
||||
.unwrap();
|
||||
|
||||
let next_leader_config = configs.swap_remove(next_leader_idx);
|
||||
#[cfg(not(feature = "mixnet"))]
|
||||
{
|
||||
let next_leader_config = configs.swap_remove(next_leader_idx);
|
||||
(next_leader_config, configs)
|
||||
}
|
||||
#[cfg(feature = "mixnet")]
|
||||
{
|
||||
let mut next_leader_config = configs.swap_remove(next_leader_idx);
|
||||
|
||||
(next_leader_config, configs)
|
||||
// Build a topology using only a subset of nodes.
|
||||
let mut mixnode_candidates = vec![&next_leader_config];
|
||||
configs
|
||||
.iter()
|
||||
.take(NUM_MIXNODE_CANDIDATES - 1)
|
||||
.for_each(|config| mixnode_candidates.push(config));
|
||||
let topology = build_mixnet_topology(&mixnode_candidates);
|
||||
|
||||
// Set the topology to all configs
|
||||
next_leader_config.network.backend.mixnet.mixclient.topology = topology.clone();
|
||||
configs.iter_mut().for_each(|config| {
|
||||
config.network.backend.mixnet.mixclient.topology = topology.clone();
|
||||
});
|
||||
|
||||
(next_leader_config, configs)
|
||||
}
|
||||
}
|
||||
|
||||
fn create_node_config(
|
||||
|
@ -292,12 +340,15 @@ fn create_node_config(
|
|||
id: [u8; 32],
|
||||
threshold: Fraction,
|
||||
timeout: Duration,
|
||||
#[cfg(feature = "mixnet")] mixnet_config: MixnetConfig,
|
||||
) -> Config {
|
||||
let mut config = Config {
|
||||
network: NetworkConfig {
|
||||
backend: Libp2pConfig {
|
||||
inner: Default::default(),
|
||||
initial_peers: vec![],
|
||||
#[cfg(feature = "mixnet")]
|
||||
mixnet: mixnet_config,
|
||||
},
|
||||
},
|
||||
consensus: CarnotSettings {
|
||||
|
@ -343,6 +394,49 @@ fn create_node_config(
|
|||
config
|
||||
}
|
||||
|
||||
#[cfg(feature = "mixnet")]
|
||||
fn create_mixnet_config(ids: &[[u8; 32]]) -> (MixClientConfig, Vec<MixNodeConfig>) {
|
||||
let mixnode_configs: Vec<MixNodeConfig> = ids
|
||||
.iter()
|
||||
.map(|id| MixNodeConfig {
|
||||
encryption_private_key: *id,
|
||||
delay_rate_per_min: 100000000.0,
|
||||
})
|
||||
.collect();
|
||||
// Build an empty topology because it will be constructed with meaningful node infos later
|
||||
let topology = MixnetTopology::new(Vec::new(), 0, 0, [1u8; 32]).unwrap();
|
||||
|
||||
(
|
||||
MixClientConfig {
|
||||
topology,
|
||||
emission_rate_per_min: 120.0,
|
||||
redundancy: NonZeroU8::new(1).unwrap(),
|
||||
},
|
||||
mixnode_configs,
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(feature = "mixnet")]
|
||||
fn build_mixnet_topology(mixnode_candidates: &[&Config]) -> MixnetTopology {
|
||||
use mixnet::crypto::public_key_from;
|
||||
|
||||
let candidates = mixnode_candidates
|
||||
.iter()
|
||||
.map(|config| {
|
||||
MixNodeInfo::new(
|
||||
NodeAddress::from(SocketAddr::new(
|
||||
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
|
||||
config.network.backend.inner.port,
|
||||
)),
|
||||
public_key_from(config.network.backend.mixnet.mixnode.encryption_private_key),
|
||||
)
|
||||
.unwrap()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let num_layers = candidates.len();
|
||||
MixnetTopology::new(candidates, num_layers, 1, [1u8; 32]).unwrap()
|
||||
}
|
||||
|
||||
fn node_address(config: &Config) -> Multiaddr {
|
||||
Swarm::multiaddr(
|
||||
std::net::Ipv4Addr::new(127, 0, 0, 1),
|
||||
|
|
|
@ -7,7 +7,7 @@ use nomos_cli::{
|
|||
use nomos_core::da::{blob::Blob as _, DaProtocol};
|
||||
use std::{io::Write, time::Duration};
|
||||
use tempfile::NamedTempFile;
|
||||
use tests::{adjust_timeout, get_available_port, nodes::nomos::Pool, Node, NomosNode, SpawnConfig};
|
||||
use tests::{adjust_timeout, nodes::nomos::Pool, Node, NomosNode, SpawnConfig};
|
||||
|
||||
const CLI_BIN: &str = "../target/debug/nomos-cli";
|
||||
|
||||
|
@ -36,13 +36,9 @@ async fn disseminate(config: &mut Disseminate) {
|
|||
let node_configs = NomosNode::node_configs(SpawnConfig::chain_happy(2));
|
||||
let first_node = NomosNode::spawn(node_configs[0].clone()).await;
|
||||
|
||||
let mut network_config = node_configs[1].network.clone();
|
||||
// use a new port because the old port is sometimes not closed immediately
|
||||
network_config.backend.inner.port = get_available_port();
|
||||
|
||||
let mut file = NamedTempFile::new().unwrap();
|
||||
let config_path = file.path().to_owned();
|
||||
serde_yaml::to_writer(&mut file, &network_config).unwrap();
|
||||
serde_yaml::to_writer(&mut file, &node_configs[1].network).unwrap();
|
||||
let da_protocol = DaProtocolChoice {
|
||||
da_protocol: Protocol::FullReplication,
|
||||
settings: ProtocolSettings {
|
||||
|
|
|
@ -14,7 +14,7 @@ struct Info {
|
|||
}
|
||||
|
||||
async fn happy_test(nodes: &[NomosNode]) {
|
||||
let timeout = adjust_timeout(Duration::from_secs(30));
|
||||
let timeout = adjust_timeout(Duration::from_secs(60));
|
||||
let timeout = tokio::time::sleep(timeout);
|
||||
tokio::select! {
|
||||
_ = timeout => panic!("timed out waiting for nodes to reach view {}", TARGET_VIEW),
|
||||
|
|
Loading…
Reference in New Issue