From 5a77ede4f5595fd60cd43b41e7f5bdf8ee31eaba Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Mon, 2 Dec 2024 12:54:25 +0900 Subject: [PATCH] make maintenance optional --- nomos-mix/core/src/conn_maintenance.rs | 1 + nomos-mix/network/src/behaviour.rs | 57 +++++++++++-------- nomos-mix/network/src/lib.rs | 4 +- .../data-availability/tests/src/common.rs | 4 +- nomos-services/mix/src/backends/libp2p.rs | 7 ++- nomos-services/mix/tests/frequency.rs | 4 +- tests/src/topology/configs/mix.rs | 4 +- 7 files changed, 46 insertions(+), 35 deletions(-) diff --git a/nomos-mix/core/src/conn_maintenance.rs b/nomos-mix/core/src/conn_maintenance.rs index 5f724280..dc6cee39 100644 --- a/nomos-mix/core/src/conn_maintenance.rs +++ b/nomos-mix/core/src/conn_maintenance.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Copy, Debug, Serialize, Deserialize)] pub struct ConnectionMaintenanceSettings { + /// Time interval to measure/evaluate the number of messages sent by each peer. pub time_window: Duration, /// The number of effective (data or cover) messages that a peer is expected to send in a given time window. /// If the measured count is greater than (expected * (1 + tolerance)), the peer is considered malicious. diff --git a/nomos-mix/network/src/behaviour.rs b/nomos-mix/network/src/behaviour.rs index 0f301739..eba24a76 100644 --- a/nomos-mix/network/src/behaviour.rs +++ b/nomos-mix/network/src/behaviour.rs @@ -30,8 +30,10 @@ pub struct Behaviour { /// Peers that support the mix protocol, and their connection IDs negotiated_peers: HashMap>, /// Connection maintenance - conn_maintenance: ConnectionMaintenance, - conn_maintenance_interval: Interval, + /// NOTE: For now, this is optional because this may close too many connections + /// until we clearly figure out the optimal parameters. + conn_maintenance: Option>, + conn_maintenance_interval: Option, /// Peers that should be excluded from connection establishments blacklist_peers: HashSet, /// To maintain address of peers because libp2p API uses only [`PeerId`] when handling @@ -51,7 +53,7 @@ pub struct Behaviour { pub struct Config { pub max_peering_degree: usize, pub duplicate_cache_lifespan: u64, - pub conn_maintenance_settings: ConnectionMaintenanceSettings, + pub conn_maintenance_settings: Option, } #[derive(Debug)] @@ -69,10 +71,15 @@ impl Behaviour where M: MixMessage, { - pub fn new(config: Config, conn_maintenance_interval: Interval) -> Self { + pub fn new(config: Config, conn_maintenance_interval: Option) -> Self { + assert_eq!( + config.conn_maintenance_settings.is_some(), + conn_maintenance_interval.is_some() + ); + let conn_maintenance = config + .conn_maintenance_settings + .map(ConnectionMaintenance::::new); let duplicate_cache = TimedCache::with_lifespan(config.duplicate_cache_lifespan); - let conn_maintenance = - ConnectionMaintenance::::new(config.conn_maintenance_settings); Self { config, negotiated_peers: HashMap::new(), @@ -180,20 +187,22 @@ where } fn run_conn_maintenance(&mut self) { - let (malicious_peers, unhealthy_peers) = self.conn_maintenance.reset(); - let num_closed_malicious = self.handle_malicious_peers(malicious_peers); - let num_connected_unhealthy = self.handle_unhealthy_peers(unhealthy_peers); - let mut num_to_dial = num_closed_malicious + num_connected_unhealthy; - if num_to_dial + self.negotiated_peers.len() > self.config.max_peering_degree { - tracing::warn!( + if let Some(conn_maintenance) = self.conn_maintenance.as_mut() { + let (malicious_peers, unhealthy_peers) = conn_maintenance.reset(); + let num_closed_malicious = self.handle_malicious_peers(malicious_peers); + let num_connected_unhealthy = self.handle_unhealthy_peers(unhealthy_peers); + let mut num_to_dial = num_closed_malicious + num_connected_unhealthy; + if num_to_dial + self.negotiated_peers.len() > self.config.max_peering_degree { + tracing::warn!( "Cannot establish {} new connections due to max_peering_degree:{}. connected_peers:{}", num_to_dial, self.config.max_peering_degree, self.negotiated_peers.len(), ); - num_to_dial = self.config.max_peering_degree - self.negotiated_peers.len(); + num_to_dial = self.config.max_peering_degree - self.negotiated_peers.len(); + } + self.schedule_dials(num_to_dial); } - self.schedule_dials(num_to_dial); } fn handle_malicious_peers(&mut self, peers: HashSet) -> usize { @@ -320,14 +329,15 @@ where ToBehaviour::Message(message) => { // Ignore drop message if M::is_drop_message(&message) { - self.conn_maintenance.add_drop(peer_id); + if let Some(conn_maintenance) = self.conn_maintenance.as_mut() { + conn_maintenance.add_drop(peer_id); + } return; } - // TODO: Discuss about the spec again. - // Due to immediate forwarding (which bypass Persistent Transmission), - // the measured effective messages may be very higher than the expected. - self.conn_maintenance.add_effective(peer_id); + if let Some(conn_maintenance) = self.conn_maintenance.as_mut() { + conn_maintenance.add_effective(peer_id); + } // Add the message to the cache. If it was already seen, ignore it. if self @@ -377,11 +387,10 @@ where cx: &mut Context<'_>, ) -> Poll>> { // Run connection maintenance if the interval is reached. - if pin!(&mut self.conn_maintenance_interval) - .poll_next(cx) - .is_ready() - { - self.run_conn_maintenance(); + if let Some(interval) = self.conn_maintenance_interval.as_mut() { + if pin!(interval).poll_next(cx).is_ready() { + self.run_conn_maintenance(); + } } if let Some(event) = self.events.pop_front() { diff --git a/nomos-mix/network/src/lib.rs b/nomos-mix/network/src/lib.rs index 3ac45609..b814ecda 100644 --- a/nomos-mix/network/src/lib.rs +++ b/nomos-mix/network/src/lib.rs @@ -134,9 +134,9 @@ mod test { Config { max_peering_degree: 10, duplicate_cache_lifespan: 60, - conn_maintenance_settings, + conn_maintenance_settings: Some(conn_maintenance_settings), }, - conn_maintenance_interval, + Some(conn_maintenance_interval), ), ) } diff --git a/nomos-services/data-availability/tests/src/common.rs b/nomos-services/data-availability/tests/src/common.rs index cca0255f..55f6bf7a 100644 --- a/nomos-services/data-availability/tests/src/common.rs +++ b/nomos-services/data-availability/tests/src/common.rs @@ -331,13 +331,13 @@ pub fn new_mix_configs(listening_addresses: Vec) -> Vec, } const CHANNEL_SIZE: usize = 64; @@ -126,8 +126,9 @@ where .with_tokio() .with_quic() .with_behaviour(|_| { - let conn_maintenance_interval = - IntervalStream::new(tokio::time::interval(config.conn_maintenance.time_window)); + let conn_maintenance_interval = config.conn_maintenance.map(|settings| { + IntervalStream::new(tokio::time::interval(settings.time_window)) + }); nomos_mix_network::Behaviour::new( nomos_mix_network::Config { max_peering_degree: config.max_peering_degree, diff --git a/nomos-services/mix/tests/frequency.rs b/nomos-services/mix/tests/frequency.rs index cc058c03..c8a417c5 100644 --- a/nomos-services/mix/tests/frequency.rs +++ b/nomos-services/mix/tests/frequency.rs @@ -110,13 +110,13 @@ fn new_mix_configs(listening_addresses: Vec) -> Vec node_key: ed25519::SecretKey::generate(), peering_degree: PEERING_DEGREE, max_peering_degree: PEERING_DEGREE + 5, - conn_maintenance: ConnectionMaintenanceSettings { + conn_maintenance: Some(ConnectionMaintenanceSettings { time_window: Duration::from_secs(10), expected_effective_messages: 5.0, effective_message_tolerance: 0.1, expected_drop_messages: 0.0, drop_message_tolerance: 0.0, - }, + }), }, x25519_dalek::StaticSecret::random(), ) diff --git a/tests/src/topology/configs/mix.rs b/tests/src/topology/configs/mix.rs index ae121192..1ccb47fa 100644 --- a/tests/src/topology/configs/mix.rs +++ b/tests/src/topology/configs/mix.rs @@ -32,13 +32,13 @@ pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec { node_key, peering_degree: 1, max_peering_degree: 3, - conn_maintenance: ConnectionMaintenanceSettings { + conn_maintenance: Some(ConnectionMaintenanceSettings { time_window: Duration::from_secs(600), expected_effective_messages: 600.0, effective_message_tolerance: 0.1, expected_drop_messages: 0.0, drop_message_tolerance: 0.0, - }, + }), }, private_key: x25519_dalek::StaticSecret::random(), membership: Vec::new(),