diff --git a/nomos-mix/core/src/conn_maintenance.rs b/nomos-mix/core/src/conn_maintenance.rs new file mode 100644 index 00000000..70542c71 --- /dev/null +++ b/nomos-mix/core/src/conn_maintenance.rs @@ -0,0 +1,168 @@ +use std::{ + collections::{HashMap, HashSet}, + fmt::Debug, + hash::Hash, + time::Duration, +}; + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub struct ConnectionMaintenanceSettings { + /// Time interval to measure/evaluate the number of messages sent by each peer. + pub time_window: Duration, + /// The number of effective (data or cover) messages that a peer is expected to send in a given time window. + /// If the measured count is greater than (expected * (1 + tolerance)), the peer is considered malicious. + /// If the measured count is less than (expected * (1 - tolerance)), the peer is considered unhealthy. + pub expected_effective_messages: f32, + pub effective_message_tolerance: f32, + /// The number of drop messages that a peer is expected to send in a given time window. + /// If the measured count is greater than (expected * (1 + tolerance)), the peer is considered malicious. + /// If the measured count is less than (expected * (1 - tolerance)), the peer is considered unhealthy. + pub expected_drop_messages: f32, + pub drop_message_tolerance: f32, +} + +/// Connection maintenance to detect malicious and unhealthy peers +/// based on the number of messages sent by each peer in time windows +pub struct ConnectionMaintenance { + settings: ConnectionMaintenanceSettings, + /// Meters to count the number of effective and drop messages sent by each peer + meters: HashMap, +} + +impl ConnectionMaintenance +where + Peer: Debug + Eq + Hash + Clone, +{ + pub fn new(settings: ConnectionMaintenanceSettings) -> Self { + Self { + settings, + meters: HashMap::new(), + } + } + + /// Record a effective message sent by the [`peer`] + pub fn add_effective(&mut self, peer: Peer) { + self.meter(peer).effective_messages += 1; + } + + /// Record a drop message sent by the [`peer`] + pub fn add_drop(&mut self, peer: Peer) { + self.meter(peer).drop_messages += 1; + } + + fn meter(&mut self, peer: Peer) -> &mut ConnectionMeter { + self.meters.entry(peer).or_insert_with(ConnectionMeter::new) + } + + /// Return malicious peers and unhealthy peers, and reset the counters of effective/drop messages. + /// This function must be called at the end of each time window. + /// The set of malicious peers doesn't have the intersection with the set of unhealthy peers. + pub fn reset(&mut self) -> (HashSet, HashSet) { + let mut malicious_peers = HashSet::new(); + let mut unhealthy_peers = HashSet::new(); + + self.meters.iter().for_each(|(peer, meter)| { + if meter.is_malicious(&self.settings) { + malicious_peers.insert(peer.clone()); + } else if meter.is_unhealthy(&self.settings) { + unhealthy_peers.insert(peer.clone()); + } + }); + self.meters.clear(); + + (malicious_peers, unhealthy_peers) + } +} + +/// Meter to count the number of effective and drop messages sent by a peer +#[derive(Debug)] +struct ConnectionMeter { + effective_messages: usize, + drop_messages: usize, +} + +impl ConnectionMeter { + fn new() -> Self { + Self { + effective_messages: 0, + drop_messages: 0, + } + } + + /// Check if the peer is malicious based on the number of effective and drop messages sent + fn is_malicious(&self, settings: &ConnectionMaintenanceSettings) -> bool { + let effective_threshold = + settings.expected_effective_messages * (1.0 + settings.effective_message_tolerance); + let drop_threshold = + settings.expected_drop_messages * (1.0 + settings.drop_message_tolerance); + self.effective_messages as f32 > effective_threshold + || self.drop_messages as f32 > drop_threshold + } + + /// Check if the peer is unhealthy based on the number of effective and drop messages sent + fn is_unhealthy(&self, settings: &ConnectionMaintenanceSettings) -> bool { + let effective_threshold = + settings.expected_effective_messages * (1.0 - settings.effective_message_tolerance); + let drop_threshold = + settings.expected_drop_messages * (1.0 - settings.drop_message_tolerance); + effective_threshold > self.effective_messages as f32 + || drop_threshold > self.drop_messages as f32 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn malicious_and_unhealthy_by_effective() { + let settings = ConnectionMaintenanceSettings { + time_window: Duration::from_secs(1), + expected_effective_messages: 2.0, + effective_message_tolerance: 0.1, + expected_drop_messages: 0.0, + drop_message_tolerance: 0.0, + }; + let mut maintenance = ConnectionMaintenance::::new(settings); + // Peer 0 sends 3 effective messages, more than expected + maintenance.add_effective(0); + maintenance.add_effective(0); + maintenance.add_effective(0); + // Peer 1 sends 2 effective messages, as expected + maintenance.add_effective(1); + maintenance.add_effective(1); + // Peer 2 sends 1 effective messages, less than expected + maintenance.add_effective(2); + + let (malicious, unhealthy) = maintenance.reset(); + assert_eq!(malicious, HashSet::from_iter(vec![0])); + assert_eq!(unhealthy, HashSet::from_iter(vec![2])); + } + + #[test] + fn malicious_and_unhealthy_by_drop() { + let settings = ConnectionMaintenanceSettings { + time_window: Duration::from_secs(1), + expected_effective_messages: 0.0, + effective_message_tolerance: 0.0, + expected_drop_messages: 2.0, + drop_message_tolerance: 0.1, + }; + let mut maintenance = ConnectionMaintenance::::new(settings); + // Peer 0 sends 3 drop messages, more than expected + maintenance.add_drop(0); + maintenance.add_drop(0); + maintenance.add_drop(0); + // Peer 1 sends 2 drop messages, as expected + maintenance.add_drop(1); + maintenance.add_drop(1); + // Peer 2 sends 1 drop messages, less than expected + maintenance.add_drop(2); + + let (malicious, unhealthy) = maintenance.reset(); + assert_eq!(malicious, HashSet::from_iter(vec![0])); + assert_eq!(unhealthy, HashSet::from_iter(vec![2])); + } +} diff --git a/nomos-mix/core/src/lib.rs b/nomos-mix/core/src/lib.rs index 53f33bf1..650b4299 100644 --- a/nomos-mix/core/src/lib.rs +++ b/nomos-mix/core/src/lib.rs @@ -1,3 +1,4 @@ +pub mod conn_maintenance; pub mod cover_traffic; pub mod membership; pub mod message_blend; diff --git a/nomos-mix/core/src/membership.rs b/nomos-mix/core/src/membership.rs index f1bda2ca..04dd8386 100644 --- a/nomos-mix/core/src/membership.rs +++ b/nomos-mix/core/src/membership.rs @@ -1,6 +1,11 @@ +use std::collections::HashSet; + use multiaddr::Multiaddr; use nomos_mix_message::MixMessage; -use rand::{seq::SliceRandom, Rng}; +use rand::{ + seq::{IteratorRandom, SliceRandom}, + Rng, +}; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug)] @@ -48,6 +53,18 @@ where self.remote_nodes.choose_multiple(rng, amount).collect() } + pub fn filter_and_choose_remote_nodes( + &self, + rng: &mut R, + amount: usize, + exclude_addrs: &HashSet, + ) -> Vec<&Node> { + self.remote_nodes + .iter() + .filter(|node| !exclude_addrs.contains(&node.address)) + .choose_multiple(rng, amount) + } + pub fn local_node(&self) -> &Node { &self.local_node } diff --git a/nomos-mix/network/Cargo.toml b/nomos-mix/network/Cargo.toml index c8dfdc24..837c68d7 100644 --- a/nomos-mix/network/Cargo.toml +++ b/nomos-mix/network/Cargo.toml @@ -15,5 +15,6 @@ sha2 = "0.10" [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } +tokio-stream = "0.1" libp2p = { version = "0.53", features = ["ed25519", "tokio", "quic"] } tracing-subscriber = "0.3.18" diff --git a/nomos-mix/network/src/behaviour.rs b/nomos-mix/network/src/behaviour.rs index d2e04d84..200beebe 100644 --- a/nomos-mix/network/src/behaviour.rs +++ b/nomos-mix/network/src/behaviour.rs @@ -3,6 +3,7 @@ use crate::{ handler::{FromBehaviour, MixConnectionHandler, ToBehaviour}, }; use cached::{Cached, TimedCache}; +use futures::Stream; use libp2p::{ core::Endpoint, swarm::{ @@ -11,21 +12,32 @@ use libp2p::{ }, Multiaddr, PeerId, }; +use nomos_mix::conn_maintenance::{ConnectionMaintenance, ConnectionMaintenanceSettings}; use nomos_mix_message::MixMessage; use sha2::{Digest, Sha256}; use std::marker::PhantomData; use std::{ collections::{HashMap, HashSet, VecDeque}, + pin::pin, task::{Context, Poll, Waker}, }; /// A [`NetworkBehaviour`]: /// - forwards messages to all connected peers with deduplication. /// - receives messages from all connected peers. -pub struct Behaviour { - config: Config, +pub struct Behaviour { + config: Config, /// Peers that support the mix protocol, and their connection IDs negotiated_peers: HashMap>, + /// Connection maintenance + /// NOTE: For now, this is optional because this may close too many connections + /// until we clearly figure out the optimal parameters. + conn_maintenance: Option>, + /// Peers that should be excluded from connection establishments + blacklist_peers: HashSet, + /// To maintain address of peers because libp2p API uses only [`PeerId`] when handling + /// connection events. But, we need to know [`Multiaddr`] of peers to dial to them. + peer_addresses: HashMap, /// Queue of events to yield to the swarm. events: VecDeque>, /// Waker that handles polling @@ -37,26 +49,48 @@ pub struct Behaviour { } #[derive(Debug)] -pub struct Config { +pub struct Config { + pub max_peering_degree: usize, pub duplicate_cache_lifespan: u64, + pub conn_maintenance_settings: Option, + pub conn_maintenance_interval: Option, } #[derive(Debug)] pub enum Event { /// A message received from one of the peers. Message(Vec), + /// Request establishing the `[amount]` number of new connections, + /// excluding the peers with the given addresses. + EstabalishNewConnections { + amount: usize, + excludes: HashSet, + }, Error(Error), } -impl Behaviour +impl Behaviour where M: MixMessage, { - pub fn new(config: Config) -> Self { + pub fn new(config: Config) -> Self { + let Config { + conn_maintenance_settings, + conn_maintenance_interval, + .. + } = &config; + assert_eq!( + conn_maintenance_settings.is_some(), + conn_maintenance_interval.is_some() + ); + let conn_maintenance = conn_maintenance_settings.map(ConnectionMaintenance::::new); let duplicate_cache = TimedCache::with_lifespan(config.duplicate_cache_lifespan); Self { config, negotiated_peers: HashMap::new(), + conn_maintenance, + peer_addresses: HashMap::new(), + blacklist_peers: HashSet::new(), events: VecDeque::new(), waker: None, duplicate_cache, @@ -115,6 +149,8 @@ where Ok(()) } + /// Add a peer to the list of connected peers that have completed negotiations + /// to verify [`Behaviour`] support. fn add_negotiated_peer(&mut self, peer_id: PeerId, connection_id: ConnectionId) -> bool { tracing::debug!( "Adding to connected_peers: peer_id:{:?}, connection_id:{:?}", @@ -127,20 +163,120 @@ where .insert(connection_id) } - fn remove_negotiated_peer(&mut self, peer_id: &PeerId, connection_id: &ConnectionId) { - if let Some(connections) = self.negotiated_peers.get_mut(peer_id) { - tracing::debug!( - "Removing from connected_peers: peer:{:?}, connection_id:{:?}", - peer_id, - connection_id - ); - connections.remove(connection_id); - if connections.is_empty() { - self.negotiated_peers.remove(peer_id); - } + /// Remove a peer from the list of connected peers. + /// If the peer (or the connection of the peer) is found and removed, return `false`. + /// If not, return `true`. + fn remove_negotiated_peer( + &mut self, + peer_id: &PeerId, + connection_id: Option<&ConnectionId>, + ) -> bool { + match connection_id { + Some(connection_id) => match self.negotiated_peers.get_mut(peer_id) { + Some(connections) => { + tracing::debug!( + "Removing from connected_peers: peer:{:?}, connection_id:{:?}", + peer_id, + connection_id + ); + let removed = connections.remove(connection_id); + if connections.is_empty() { + self.negotiated_peers.remove(peer_id); + } + removed + } + None => false, + }, + None => self.negotiated_peers.remove(peer_id).is_some(), } } + fn is_negotiated_peer(&self, peer_id: &PeerId) -> bool { + self.negotiated_peers.contains_key(peer_id) + } + + /// Handle newly detected malicious and unhealthy peers and schedule new connection establishments. + /// This must be called at the end of each time window. + fn run_conn_maintenance(&mut self) { + if let Some(conn_maintenance) = self.conn_maintenance.as_mut() { + let (malicious_peers, unhealthy_peers) = conn_maintenance.reset(); + let num_closed_malicious = self.handle_malicious_peers(malicious_peers); + let num_connected_unhealthy = self.handle_unhealthy_peers(unhealthy_peers); + let mut num_to_dial = num_closed_malicious + num_connected_unhealthy; + if num_to_dial + self.negotiated_peers.len() > self.config.max_peering_degree { + tracing::warn!( + "Cannot establish {} new connections due to max_peering_degree:{}. connected_peers:{}", + num_to_dial, + self.config.max_peering_degree, + self.negotiated_peers.len(), + ); + num_to_dial = self.config.max_peering_degree - self.negotiated_peers.len(); + } + self.schedule_dials(num_to_dial); + } + } + + /// Add newly detected malicious peers to the blacklist, + /// and schedule connection closures if the connections exist. + /// This returns the number of connection closures that were actually scheduled. + fn handle_malicious_peers(&mut self, peers: HashSet) -> usize { + peers + .into_iter() + .map(|peer_id| { + self.blacklist_peers.insert(peer_id); + // Close the connection only if the peer was already connected. + if self.remove_negotiated_peer(&peer_id, None) { + self.events.push_back(ToSwarm::CloseConnection { + peer_id, + connection: libp2p::swarm::CloseConnection::All, + }); + 1 + } else { + 0 + } + }) + .sum() + } + + /// Add newly detected unhealthy peers to the blacklist, + /// and return the number of connections that actually exist to the unhealthy peers. + fn handle_unhealthy_peers(&mut self, peers: HashSet) -> usize { + peers + .into_iter() + .map(|peer_id| { + self.blacklist_peers.insert(peer_id); + if self.is_negotiated_peer(&peer_id) { + 1 + } else { + 0 + } + }) + .sum() + } + + /// Schedule new connection establishments, excluding the blacklisted peers. + fn schedule_dials(&mut self, amount: usize) { + if amount == 0 { + return; + } + + let excludes = self + .blacklist_peers + .iter() + .filter_map(|peer_id| self.peer_addresses.get(peer_id).cloned()) + .collect(); + tracing::info!( + "Scheduling {} new dialings: excludes:{:?}", + amount, + excludes + ); + self.events + .push_back(ToSwarm::GenerateEvent(Event::EstabalishNewConnections { + amount, + excludes, + })); + } + /// SHA-256 hash of the message fn message_id(message: &[u8]) -> Vec { let mut hasher = Sha256::new(); @@ -155,9 +291,10 @@ where } } -impl NetworkBehaviour for Behaviour +impl NetworkBehaviour for Behaviour where M: MixMessage + 'static, + Interval: Stream + Unpin + 'static, { type ConnectionHandler = MixConnectionHandler; type ToSwarm = Event; @@ -165,21 +302,25 @@ where fn handle_established_inbound_connection( &mut self, _: ConnectionId, - _: PeerId, - _: &Multiaddr, + peer_id: PeerId, _: &Multiaddr, + remote_addr: &Multiaddr, ) -> Result, ConnectionDenied> { - Ok(MixConnectionHandler::new(&self.config)) + // Keep the address of the peer + self.peer_addresses.insert(peer_id, remote_addr.clone()); + Ok(MixConnectionHandler::new()) } fn handle_established_outbound_connection( &mut self, _: ConnectionId, - _: PeerId, - _: &Multiaddr, + peer_id: PeerId, + addr: &Multiaddr, _: Endpoint, ) -> Result, ConnectionDenied> { - Ok(MixConnectionHandler::new(&self.config)) + // Keep the address of the peer + self.peer_addresses.insert(peer_id, addr.clone()); + Ok(MixConnectionHandler::new()) } /// Informs the behaviour about an event from the [`Swarm`]. @@ -190,7 +331,7 @@ where .. }) = event { - self.remove_negotiated_peer(&peer_id, &connection_id); + self.remove_negotiated_peer(&peer_id, Some(&connection_id)); } } @@ -207,9 +348,16 @@ where ToBehaviour::Message(message) => { // Ignore drop message if M::is_drop_message(&message) { + if let Some(conn_maintenance) = self.conn_maintenance.as_mut() { + conn_maintenance.add_drop(peer_id); + } return; } + if let Some(conn_maintenance) = self.conn_maintenance.as_mut() { + conn_maintenance.add_effective(peer_id); + } + // Add the message to the cache. If it was already seen, ignore it. if self .duplicate_cache @@ -236,7 +384,7 @@ where self.add_negotiated_peer(peer_id, connection_id); } ToBehaviour::NegotiationFailed => { - self.remove_negotiated_peer(&peer_id, &connection_id); + self.remove_negotiated_peer(&peer_id, Some(&connection_id)); } ToBehaviour::IOError(error) => { // TODO: Consider removing the peer from the connected_peers and closing the connection @@ -257,6 +405,13 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll>> { + // Run connection maintenance if the interval is reached. + if let Some(interval) = self.config.conn_maintenance_interval.as_mut() { + if pin!(interval).poll_next(cx).is_ready() { + self.run_conn_maintenance(); + } + } + if let Some(event) = self.events.pop_front() { Poll::Ready(event) } else { diff --git a/nomos-mix/network/src/handler.rs b/nomos-mix/network/src/handler.rs index e6bddc64..d0c36ba4 100644 --- a/nomos-mix/network/src/handler.rs +++ b/nomos-mix/network/src/handler.rs @@ -14,8 +14,6 @@ use libp2p::{ Stream, StreamProtocol, }; -use crate::behaviour::Config; - const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/nomos/mix/0.1.0"); // TODO: Consider replacing this struct with libp2p_stream ConnectionHandler @@ -42,7 +40,7 @@ enum OutboundSubstreamState { } impl MixConnectionHandler { - pub fn new(_config: &Config) -> Self { + pub fn new() -> Self { Self { inbound_substream: None, outbound_substream: None, diff --git a/nomos-mix/network/src/lib.rs b/nomos-mix/network/src/lib.rs index 6db526e4..417e7570 100644 --- a/nomos-mix/network/src/lib.rs +++ b/nomos-mix/network/src/lib.rs @@ -14,8 +14,10 @@ mod test { swarm::{dummy, NetworkBehaviour, SwarmEvent}, Multiaddr, PeerId, Swarm, SwarmBuilder, }; + use nomos_mix::conn_maintenance::ConnectionMaintenanceSettings; use nomos_mix_message::mock::MockMixMessage; use tokio::select; + use tokio_stream::wrappers::IntervalStream; use crate::{behaviour::Config, error::Error, Behaviour, Event}; @@ -116,11 +118,23 @@ mod test { } } - fn new_swarm(key: Keypair) -> Swarm> { + fn new_swarm(key: Keypair) -> Swarm> { + let conn_maintenance_settings = ConnectionMaintenanceSettings { + time_window: Duration::from_secs(60), + expected_effective_messages: 1.0, + effective_message_tolerance: 0.1, + expected_drop_messages: 1.0, + drop_message_tolerance: 0.1, + }; + let conn_maintenance_interval = + IntervalStream::new(tokio::time::interval(conn_maintenance_settings.time_window)); new_swarm_with_behaviour( key, Behaviour::new(Config { + max_peering_degree: 10, duplicate_cache_lifespan: 60, + conn_maintenance_settings: Some(conn_maintenance_settings), + conn_maintenance_interval: Some(conn_maintenance_interval), }), ) } diff --git a/nomos-services/data-availability/tests/src/common.rs b/nomos-services/data-availability/tests/src/common.rs index 90c1d734..55f6bf7a 100644 --- a/nomos-services/data-availability/tests/src/common.rs +++ b/nomos-services/data-availability/tests/src/common.rs @@ -1,10 +1,10 @@ use cryptarchia_consensus::LeaderConfig; // std use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings; -use nomos_mix::membership::Node; use nomos_mix::message_blend::{ CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, }; +use nomos_mix::{conn_maintenance::ConnectionMaintenanceSettings, membership::Node}; use nomos_mix_message::{sphinx::SphinxMessage, MixMessage}; use std::path::PathBuf; use std::time::Duration; @@ -330,6 +330,14 @@ pub fn new_mix_configs(listening_addresses: Vec) -> Vec, } const CHANNEL_SIZE: usize = 64; @@ -44,38 +45,26 @@ const CHANNEL_SIZE: usize = 64; impl MixBackend for Libp2pMixBackend { type Settings = Libp2pMixBackendSettings; - fn new( + fn new( config: Self::Settings, overwatch_handle: OverwatchHandle, membership: Membership, - mut rng: R, - ) -> Self { + rng: R, + ) -> Self + where + R: Rng + Send + 'static, + { let (swarm_message_sender, swarm_message_receiver) = mpsc::channel(CHANNEL_SIZE); let (incoming_message_sender, _) = broadcast::channel(CHANNEL_SIZE); - let keypair = Keypair::from(ed25519::Keypair::from(config.node_key.clone())); let mut swarm = MixSwarm::new( - keypair, + config, swarm_message_receiver, incoming_message_sender.clone(), + membership, + rng, ); - swarm - .listen_on(config.listening_address) - .unwrap_or_else(|e| { - panic!("Failed to listen on Mix network: {e:?}"); - }); - - // Randomly select peering_degree number of peers, and dial to them - membership - .choose_remote_nodes(&mut rng, config.peering_degree) - .iter() - .for_each(|node| { - if let Err(e) = swarm.dial(node.address.clone()) { - tracing::error!("failed to dial to {:?}: {:?}", node.address, e); - } - }); - let task = overwatch_handle.runtime().spawn(async move { swarm.run().await; }); @@ -105,10 +94,15 @@ impl MixBackend for Libp2pMixBackend { } } -struct MixSwarm { - swarm: Swarm>, +struct MixSwarm +where + R: Send, +{ + swarm: Swarm>, swarm_messages_receiver: mpsc::Receiver, incoming_message_sender: broadcast::Sender>, + membership: Membership, + rng: R, } #[derive(Debug)] @@ -116,18 +110,30 @@ pub enum MixSwarmMessage { Publish(Vec), } -impl MixSwarm { +impl MixSwarm +where + R: Rng + Send, +{ fn new( - keypair: Keypair, + config: Libp2pMixBackendSettings, swarm_messages_receiver: mpsc::Receiver, incoming_message_sender: broadcast::Sender>, + membership: Membership, + rng: R, ) -> Self { - let swarm = SwarmBuilder::with_existing_identity(keypair) + let keypair = Keypair::from(ed25519::Keypair::from(config.node_key.clone())); + let mut swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() .with_quic() .with_behaviour(|_| { + let conn_maintenance_interval = config.conn_maintenance.map(|settings| { + IntervalStream::new(tokio::time::interval(settings.time_window)) + }); nomos_mix_network::Behaviour::new(nomos_mix_network::Config { + max_peering_degree: config.max_peering_degree, duplicate_cache_lifespan: 60, + conn_maintenance_settings: config.conn_maintenance, + conn_maintenance_interval, }) }) .expect("Mix Behaviour should be built") @@ -136,19 +142,31 @@ impl MixSwarm { }) .build(); - Self { + swarm + .listen_on(config.listening_address) + .unwrap_or_else(|e| { + panic!("Failed to listen on Mix network: {e:?}"); + }); + + let mut mix_swarm = Self { swarm, swarm_messages_receiver, incoming_message_sender, + membership, + rng, + }; + + // Randomly select peering_degree number of peers, and dial to them + let num_dialed = mix_swarm.dial_to_peers(config.peering_degree, None); + if num_dialed < config.peering_degree { + tracing::warn!( + "Failed to dial to enough peers. Expected: {}, Dialed: {}", + config.peering_degree, + num_dialed + ); } - } - fn listen_on(&mut self, addr: Multiaddr) -> Result> { - self.swarm.listen_on(addr) - } - - fn dial(&mut self, addr: Multiaddr) -> Result<(), DialError> { - self.swarm.dial(DialOpts::from(addr)) + mix_swarm } async fn run(&mut self) { @@ -182,6 +200,22 @@ impl MixSwarm { tracing::error!("Failed to send incoming message to channel: {e}"); } } + SwarmEvent::Behaviour(nomos_mix_network::Event::EstabalishNewConnections { + amount, + excludes, + }) => { + tracing::debug!( + "Establishing {amount} new connections excluding peers: {excludes:?}" + ); + let successful_dials = self.dial_to_peers(amount, Some(&excludes)); + if successful_dials < amount { + tracing::warn!( + "Tried to establish {} new connections, but only {} succeeded", + amount, + successful_dials + ); + } + } SwarmEvent::Behaviour(nomos_mix_network::Event::Error(e)) => { tracing::error!("Received error from mix network: {e:?}"); } @@ -190,4 +224,29 @@ impl MixSwarm { } } } + + fn dial_to_peers(&mut self, amount: usize, excludes: Option<&HashSet>) -> usize { + let mut successful_dials = 0; + + let nodes = match excludes { + Some(excludes) => { + self.membership + .filter_and_choose_remote_nodes(&mut self.rng, amount, excludes) + } + None => self.membership.choose_remote_nodes(&mut self.rng, amount), + }; + + nodes + .iter() + .for_each(|node| match self.swarm.dial(node.address.clone()) { + Ok(_) => { + successful_dials += 1; + } + Err(e) => { + tracing::error!("Failed to dial to {:?}: {e}", node.address); + } + }); + + successful_dials + } } diff --git a/nomos-services/mix/src/backends/mod.rs b/nomos-services/mix/src/backends/mod.rs index 8c1ee8ef..da9dfc15 100644 --- a/nomos-services/mix/src/backends/mod.rs +++ b/nomos-services/mix/src/backends/mod.rs @@ -21,7 +21,7 @@ pub trait MixBackend { rng: R, ) -> Self where - R: Rng; + R: Rng + Send + 'static; /// Publish a message to the mix network. async fn publish(&self, msg: Vec); /// Listen to messages received from the mix network. diff --git a/tests/src/topology/configs/mix.rs b/tests/src/topology/configs/mix.rs index 16454b64..0017183a 100644 --- a/tests/src/topology/configs/mix.rs +++ b/tests/src/topology/configs/mix.rs @@ -31,6 +31,8 @@ pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec { .unwrap(), node_key, peering_degree: 1, + max_peering_degree: 3, + conn_maintenance: None, }, private_key: x25519_dalek::StaticSecret::random(), membership: Vec::new(),