diff --git a/nomos-mix/core/Cargo.toml b/nomos-mix/core/Cargo.toml index 4a228442..190f0bee 100644 --- a/nomos-mix/core/Cargo.toml +++ b/nomos-mix/core/Cargo.toml @@ -13,7 +13,6 @@ rand = "0.8" serde = { version = "1.0", features = ["derive"] } nomos-mix-message = { path = "../message" } futures = "0.3" -multiaddr = "0.18" x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] } fixed = { version = "1", features = ["serde-str"] } diff --git a/nomos-mix/core/src/conn_maintenance.rs b/nomos-mix/core/src/conn_maintenance.rs index ae2cb928..cc4d66c8 100644 --- a/nomos-mix/core/src/conn_maintenance.rs +++ b/nomos-mix/core/src/conn_maintenance.rs @@ -1,11 +1,11 @@ use std::{ collections::{HashMap, HashSet}, fmt::Debug, + hash::Hash, time::Duration, }; use fixed::types::U57F7; -use multiaddr::Multiaddr; use nomos_mix_message::MixMessage; use rand::RngCore; use serde::{Deserialize, Serialize}; @@ -22,28 +22,33 @@ pub struct ConnectionMaintenanceSettings { /// Connection maintenance to detect malicious and unhealthy peers /// based on the number of messages sent by each peer in time windows -pub struct ConnectionMaintenance +pub struct ConnectionMaintenance where M: MixMessage, R: RngCore, { settings: ConnectionMaintenanceSettings, - membership: Membership, + membership: Membership, rng: R, - connected_peers: HashSet, - malicious_peers: HashSet, + connected_peers: HashSet
, + malicious_peers: HashSet
, /// Monitors to measure the number of effective and drop messages sent by each peer /// NOTE: We keep this optional until we gain confidence in parameter values that don't cause false detection. - monitors: Option>, + monitors: Option>, } -impl ConnectionMaintenance +impl ConnectionMaintenance where + Address: Eq + Hash + Clone + Debug, M: MixMessage, M::PublicKey: PartialEq, R: RngCore, { - pub fn new(settings: ConnectionMaintenanceSettings, membership: Membership, rng: R) -> Self { + pub fn new( + settings: ConnectionMaintenanceSettings, + membership: Membership, + rng: R, + ) -> Self { Self { settings, membership, @@ -55,7 +60,7 @@ where } /// Choose the `peering_degree` number of remote nodes to connect to. - pub fn bootstrap(&mut self) -> Vec { + pub fn bootstrap(&mut self) -> Vec
{ self.membership .choose_remote_nodes(&mut self.rng, self.settings.peering_degree) .iter() @@ -65,24 +70,24 @@ where } /// Add a peer, which is fully connected, to the list of connected peers. - pub fn add_connected_peer(&mut self, peer: Multiaddr) { + pub fn add_connected_peer(&mut self, peer: Address) { self.connected_peers.insert(peer); } /// Remove a peer that has been disconnected. - pub fn remove_connected_peer(&mut self, peer: &Multiaddr) { + pub fn remove_connected_peer(&mut self, peer: &Address) { self.connected_peers.remove(peer); } /// Return the set of connected peers. - pub fn connected_peers(&self) -> &HashSet { + pub fn connected_peers(&self) -> &HashSet
{ &self.connected_peers } /// Record a effective message sent by the [`peer`]. /// If the peer was added during the current time window, the peer is not monitored /// until the next time window, to avoid false detection. - pub fn record_effective_message(&mut self, peer: &Multiaddr) { + pub fn record_effective_message(&mut self, peer: &Address) { if let Some(monitors) = self.monitors.as_mut() { if let Some(monitor) = monitors.get_mut(peer) { monitor.effective_messages = monitor @@ -102,7 +107,7 @@ where /// Record a drop message sent by the [`peer`]. /// If the peer was added during the current time window, the peer is not monitored /// until the next time window, to avoid false detection. - pub fn record_drop_message(&mut self, peer: &Multiaddr) { + pub fn record_drop_message(&mut self, peer: &Address) { if let Some(monitors) = self.monitors.as_mut() { if let Some(monitor) = monitors.get_mut(peer) { monitor.drop_messages = monitor @@ -125,9 +130,9 @@ where pub fn reset( &mut self, ) -> Option<( - HashMap, - HashSet, - HashSet, + HashMap, + HashSet
, + HashSet
, )> { let (malicious_peers, unhealthy_peers) = self.analyze_monitors(); @@ -157,7 +162,7 @@ where /// Find malicious peers and unhealthy peers by analyzing connection monitors. /// The set of malicious peers is disjoint from the set of unhealthy peers. - fn analyze_monitors(&mut self) -> (HashSet, HashSet) { + fn analyze_monitors(&mut self) -> (HashSet
, HashSet
) { let mut malicious_peers = HashSet::new(); let mut unhealthy_peers = HashSet::new(); @@ -185,7 +190,7 @@ where /// Reset monitors for all connected peers to be monitored in the next time window. /// To avoid false detection, we only monitor peers that were already connected /// before the time window started. - fn reset_monitors(&mut self) -> Option> { + fn reset_monitors(&mut self) -> Option> { match self.monitors.take() { Some(old_monitors) => { self.monitors = Some( @@ -410,9 +415,9 @@ mod tests { fn init_maintenance( settings: ConnectionMaintenanceSettings, node_count: usize, - ) -> ConnectionMaintenance { + ) -> ConnectionMaintenance { let nodes = nodes(node_count); - let mut maintenance = ConnectionMaintenance::::new( + let mut maintenance = ConnectionMaintenance::::new( settings, Membership::new(nodes.clone(), nodes[0].public_key), thread_rng(), @@ -424,12 +429,10 @@ mod tests { maintenance } - fn nodes(count: usize) -> Vec::PublicKey>> { + fn nodes(count: usize) -> Vec::PublicKey>> { (0..count) .map(|i| Node { - address: format!("/ip4/127.0.0.1/udp/{}/quic-v1", 1000 + i) - .parse() - .unwrap(), + address: i, public_key: [i as u8; 32], }) .collect() diff --git a/nomos-mix/core/src/membership.rs b/nomos-mix/core/src/membership.rs index 04dd8386..6e6e9af6 100644 --- a/nomos-mix/core/src/membership.rs +++ b/nomos-mix/core/src/membership.rs @@ -1,6 +1,5 @@ -use std::collections::HashSet; +use std::{collections::HashSet, hash::Hash}; -use multiaddr::Multiaddr; use nomos_mix_message::MixMessage; use rand::{ seq::{IteratorRandom, SliceRandom}, @@ -9,26 +8,27 @@ use rand::{ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug)] -pub struct Membership +pub struct Membership where M: MixMessage, { - remote_nodes: Vec>, - local_node: Node, + remote_nodes: Vec>, + local_node: Node, } #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Node { - pub address: Multiaddr, +pub struct Node { + pub address: Address, pub public_key: K, } -impl Membership +impl Membership where + Address: Eq + Hash, M: MixMessage, M::PublicKey: PartialEq, { - pub fn new(nodes: Vec>, local_public_key: M::PublicKey) -> Self { + pub fn new(nodes: Vec>, local_public_key: M::PublicKey) -> Self { let mut remote_nodes = Vec::with_capacity(nodes.len() - 1); let mut local_node = None; nodes.into_iter().for_each(|node| { @@ -49,7 +49,7 @@ where &self, rng: &mut R, amount: usize, - ) -> Vec<&Node> { + ) -> Vec<&Node> { self.remote_nodes.choose_multiple(rng, amount).collect() } @@ -57,15 +57,15 @@ where &self, rng: &mut R, amount: usize, - exclude_addrs: &HashSet, - ) -> Vec<&Node> { + exclude_addrs: &HashSet
, + ) -> Vec<&Node> { self.remote_nodes .iter() .filter(|node| !exclude_addrs.contains(&node.address)) .choose_multiple(rng, amount) } - pub fn local_node(&self) -> &Node { + pub fn local_node(&self) -> &Node { &self.local_node } diff --git a/nomos-mix/core/src/message_blend/crypto.rs b/nomos-mix/core/src/message_blend/crypto.rs index 62a8a865..aa6b1074 100644 --- a/nomos-mix/core/src/message_blend/crypto.rs +++ b/nomos-mix/core/src/message_blend/crypto.rs @@ -1,3 +1,5 @@ +use std::hash::Hash; + use crate::membership::Membership; use nomos_mix_message::MixMessage; use rand::RngCore; @@ -5,12 +7,12 @@ use serde::{Deserialize, Serialize}; /// [`CryptographicProcessor`] is responsible for wrapping and unwrapping messages /// for the message indistinguishability. -pub struct CryptographicProcessor +pub struct CryptographicProcessor where M: MixMessage, { settings: CryptographicProcessorSettings, - membership: Membership, + membership: Membership, rng: R, } @@ -20,15 +22,16 @@ pub struct CryptographicProcessorSettings { pub num_mix_layers: usize, } -impl CryptographicProcessor +impl CryptographicProcessor where R: RngCore, + Address: Eq + Hash, M: MixMessage, M::PublicKey: Clone + PartialEq, { pub fn new( settings: CryptographicProcessorSettings, - membership: Membership, + membership: Membership, rng: R, ) -> Self { Self { diff --git a/nomos-mix/core/src/message_blend/mod.rs b/nomos-mix/core/src/message_blend/mod.rs index 2cc988b6..76deddce 100644 --- a/nomos-mix/core/src/message_blend/mod.rs +++ b/nomos-mix/core/src/message_blend/mod.rs @@ -5,6 +5,7 @@ pub use crypto::CryptographicProcessorSettings; use futures::{Stream, StreamExt}; use rand::RngCore; use std::fmt::Debug; +use std::hash::Hash; use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; @@ -34,22 +35,23 @@ where /// [`MessageBlendStream`] handles the entire mixing tiers process /// - Unwraps incoming messages received from network using [`CryptographicProcessor`] /// - Pushes unwrapped messages to [`TemporalProcessor`] -pub struct MessageBlendStream +pub struct MessageBlendStream where M: MixMessage, { input_stream: S, output_stream: Pin + Send + Sync + 'static>>, temporal_sender: UnboundedSender, - cryptographic_processor: CryptographicProcessor, + cryptographic_processor: CryptographicProcessor, _rng: PhantomData, _scheduler: PhantomData, } -impl MessageBlendStream +impl MessageBlendStream where S: Stream>, Rng: RngCore + Unpin + Send + 'static, + Address: Eq + Hash, M: MixMessage, M::PrivateKey: Serialize + DeserializeOwned, M::PublicKey: Clone + PartialEq, @@ -59,7 +61,7 @@ where pub fn new( input_stream: S, settings: MessageBlendSettings, - membership: Membership, + membership: Membership, scheduler: Scheduler, cryptographic_processor_rng: Rng, ) -> Self { @@ -100,10 +102,11 @@ where } } -impl Stream for MessageBlendStream +impl Stream for MessageBlendStream where S: Stream> + Unpin, Rng: RngCore + Unpin + Send + 'static, + Address: Eq + Hash + Unpin, M: MixMessage + Unpin, M::PrivateKey: Serialize + DeserializeOwned + Unpin, M::PublicKey: Clone + PartialEq + Unpin, @@ -120,9 +123,10 @@ where } } -pub trait MessageBlendExt: Stream> +pub trait MessageBlendExt: Stream> where Rng: RngCore + Send + Unpin + 'static, + Address: Eq + Hash, M: MixMessage, M::PrivateKey: Serialize + DeserializeOwned, M::PublicKey: Clone + PartialEq, @@ -132,10 +136,10 @@ where fn blend( self, message_blend_settings: MessageBlendSettings, - membership: Membership, + membership: Membership, scheduler: Scheduler, cryptographic_processor_rng: Rng, - ) -> MessageBlendStream + ) -> MessageBlendStream where Self: Sized + Unpin, { @@ -149,10 +153,11 @@ where } } -impl MessageBlendExt for T +impl MessageBlendExt for T where T: Stream>, Rng: RngCore + Unpin + Send + 'static, + Address: Eq + Hash, M: MixMessage, M::PrivateKey: Clone + Serialize + DeserializeOwned + PartialEq, M::PublicKey: Clone + Serialize + DeserializeOwned + PartialEq, diff --git a/nomos-mix/network/src/behaviour.rs b/nomos-mix/network/src/behaviour.rs index 9b41c4c7..d0c15563 100644 --- a/nomos-mix/network/src/behaviour.rs +++ b/nomos-mix/network/src/behaviour.rs @@ -37,7 +37,7 @@ where { config: Config, /// Connection maintenance - conn_maintenance: ConnectionMaintenance, + conn_maintenance: ConnectionMaintenance, peer_address_map: PeerAddressMap, /// Queue of events to yield to the swarm. events: VecDeque>, @@ -69,9 +69,12 @@ where M::PublicKey: PartialEq, R: RngCore, { - pub fn new(config: Config, membership: Membership, rng: R) -> Self { - let mut conn_maintenance = - ConnectionMaintenance::::new(config.conn_maintenance_settings, membership, rng); + pub fn new(config: Config, membership: Membership, rng: R) -> Self { + let mut conn_maintenance = ConnectionMaintenance::::new( + config.conn_maintenance_settings, + membership, + rng, + ); // Bootstrap connections with initial peers randomly chosen. let peer_addrs: Vec = conn_maintenance.bootstrap(); diff --git a/nomos-mix/network/src/lib.rs b/nomos-mix/network/src/lib.rs index 9cc4420b..cf2f4645 100644 --- a/nomos-mix/network/src/lib.rs +++ b/nomos-mix/network/src/lib.rs @@ -95,7 +95,7 @@ mod test { } fn new_swarm( - membership: Membership, + membership: Membership, ) -> Swarm> { let conn_maintenance_settings = ConnectionMaintenanceSettings { peering_degree: membership.size() - 1, // excluding the local node @@ -151,7 +151,7 @@ mod test { fn nodes( count: usize, base_port: usize, - ) -> Vec::PublicKey>> { + ) -> Vec::PublicKey>> { (0..count) .map(|i| Node { address: format!("/ip4/127.0.0.1/udp/{}/quic-v1", base_port + i) diff --git a/nomos-services/data-availability/tests/src/common.rs b/nomos-services/data-availability/tests/src/common.rs index 849a7b66..8cc5bb83 100644 --- a/nomos-services/data-availability/tests/src/common.rs +++ b/nomos-services/data-availability/tests/src/common.rs @@ -191,7 +191,7 @@ pub struct TestDaNetworkSettings { pub struct TestMixSettings { pub backend: Libp2pMixBackendSettings, pub private_key: x25519_dalek::StaticSecret, - pub membership: Vec::PublicKey>>, + pub membership: Vec::PublicKey>>, } pub fn new_node( diff --git a/nomos-services/mix/src/backends/libp2p.rs b/nomos-services/mix/src/backends/libp2p.rs index 962b7f68..41ecd685 100644 --- a/nomos-services/mix/src/backends/libp2p.rs +++ b/nomos-services/mix/src/backends/libp2p.rs @@ -42,11 +42,12 @@ const CHANNEL_SIZE: usize = 64; #[async_trait] impl MixBackend for Libp2pMixBackend { type Settings = Libp2pMixBackendSettings; + type Address = Multiaddr; fn new( config: Self::Settings, overwatch_handle: OverwatchHandle, - membership: Membership, + membership: Membership, rng: R, ) -> Self where @@ -112,7 +113,7 @@ where { fn new( config: Libp2pMixBackendSettings, - membership: Membership, + membership: Membership, rng: R, swarm_messages_receiver: mpsc::Receiver, incoming_message_sender: broadcast::Sender>, diff --git a/nomos-services/mix/src/backends/mod.rs b/nomos-services/mix/src/backends/mod.rs index 8bab1449..2d0eb7ab 100644 --- a/nomos-services/mix/src/backends/mod.rs +++ b/nomos-services/mix/src/backends/mod.rs @@ -1,7 +1,7 @@ #[cfg(feature = "libp2p")] pub mod libp2p; -use std::{fmt::Debug, pin::Pin}; +use std::{fmt::Debug, hash::Hash, pin::Pin}; use futures::Stream; use nomos_mix::membership::Membership; @@ -13,11 +13,12 @@ use rand::RngCore; #[async_trait::async_trait] pub trait MixBackend { type Settings: Clone + Debug + Send + Sync + 'static; + type Address: Eq + Hash + Clone; fn new( config: Self::Settings, overwatch_handle: OverwatchHandle, - membership: Membership, + membership: Membership, rng: R, ) -> Self where diff --git a/nomos-services/mix/src/lib.rs b/nomos-services/mix/src/lib.rs index 2338fbc4..9fc76939 100644 --- a/nomos-services/mix/src/lib.rs +++ b/nomos-services/mix/src/lib.rs @@ -30,6 +30,7 @@ use rand::SeedableRng; use rand_chacha::ChaCha12Rng; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::fmt::Debug; +use std::hash::Hash; use std::time::Duration; use tokio::sync::mpsc; use tokio::time; @@ -50,7 +51,7 @@ where backend: Backend, service_state: ServiceStateHandle, network_relay: Relay>, - membership: Membership, + membership: Membership, } impl ServiceData for MixService @@ -61,7 +62,7 @@ where Network::BroadcastSettings: Clone + Debug + Serialize + DeserializeOwned, { const SERVICE_ID: ServiceId = "Mix"; - type Settings = MixConfig; + type Settings = MixConfig; type State = NoState; type StateOperator = NoOperator; type Message = ServiceMessage; @@ -72,6 +73,7 @@ impl ServiceCore for MixService where Backend: MixBackend + Send + 'static, Backend::Settings: Clone, + Backend::Address: Unpin + Send + Sync + 'static, Network: NetworkAdapter + Send + Sync + 'static, Network::BroadcastSettings: Clone + Debug + Serialize + DeserializeOwned + Send + Sync + 'static, @@ -224,7 +226,11 @@ where fn wrap_and_send_to_persistent_transmission( message: Vec, - cryptographic_processor: &mut CryptographicProcessor, + cryptographic_processor: &mut CryptographicProcessor< + ChaCha12Rng, + Backend::Address, + SphinxMessage, + >, persistent_sender: &mpsc::UnboundedSender>, ) { match cryptographic_processor.wrap_message(&message) { @@ -241,12 +247,12 @@ where } #[derive(Serialize, Deserialize, Clone, Debug)] -pub struct MixConfig { +pub struct MixConfig { pub backend: BackendSettings, pub message_blend: MessageBlendSettings, pub persistent_transmission: PersistentTransmissionSettings, pub cover_traffic: CoverTrafficExtSettings, - pub membership: Vec::PublicKey>>, + pub membership: Vec::PublicKey>>, } #[derive(Serialize, Deserialize, Clone, Debug)] @@ -256,13 +262,16 @@ pub struct CoverTrafficExtSettings { } impl CoverTrafficExtSettings { - fn cover_traffic_settings( + fn cover_traffic_settings
( &self, - membership: &Membership, + membership: &Membership, cryptographic_processor_settings: &CryptographicProcessorSettings< ::PrivateKey, >, - ) -> CoverTrafficSettings { + ) -> CoverTrafficSettings + where + Address: Eq + Hash, + { CoverTrafficSettings { node_id: membership.local_node().public_key, number_of_hops: cryptographic_processor_settings.num_mix_layers, @@ -301,8 +310,11 @@ impl CoverTrafficExtSettings { } } -impl MixConfig { - fn membership(&self) -> Membership { +impl MixConfig +where + Address: Eq + Hash + Clone, +{ + fn membership(&self) -> Membership { let public_key = x25519_dalek::PublicKey::from(&x25519_dalek::StaticSecret::from( self.message_blend.cryptographic_processor.private_key, )) diff --git a/testnet/cfgsync/src/config.rs b/testnet/cfgsync/src/config.rs index 44a5c978..d4403097 100644 --- a/testnet/cfgsync/src/config.rs +++ b/testnet/cfgsync/src/config.rs @@ -176,8 +176,8 @@ fn update_da_peer_addresses( fn update_mix_membership( hosts: Vec, - membership: Vec::PublicKey>>, -) -> Vec::PublicKey>> { + membership: Vec::PublicKey>>, +) -> Vec::PublicKey>> { membership .into_iter() .zip(hosts) diff --git a/tests/src/topology/configs/mix.rs b/tests/src/topology/configs/mix.rs index b8ad089d..4e6fb50f 100644 --- a/tests/src/topology/configs/mix.rs +++ b/tests/src/topology/configs/mix.rs @@ -11,7 +11,7 @@ use crate::get_available_port; pub struct GeneralMixConfig { pub backend: Libp2pMixBackendSettings, pub private_key: x25519_dalek::StaticSecret, - pub membership: Vec::PublicKey>>, + pub membership: Vec::PublicKey>>, } pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec { @@ -50,7 +50,9 @@ pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec { configs } -fn mix_nodes(configs: &[GeneralMixConfig]) -> Vec::PublicKey>> { +fn mix_nodes( + configs: &[GeneralMixConfig], +) -> Vec::PublicKey>> { configs .iter() .map(|config| Node {