make maintenance optional

This commit is contained in:
Youngjoon Lee 2024-12-02 12:54:25 +09:00
parent 9fe53a7a60
commit 5a77ede4f5
No known key found for this signature in database
GPG Key ID: 303963A54A81DD4D
7 changed files with 46 additions and 35 deletions

View File

@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct ConnectionMaintenanceSettings {
/// Time interval to measure/evaluate the number of messages sent by each peer.
pub time_window: Duration,
/// The number of effective (data or cover) messages that a peer is expected to send in a given time window.
/// If the measured count is greater than (expected * (1 + tolerance)), the peer is considered malicious.

View File

@ -30,8 +30,10 @@ pub struct Behaviour<M, Interval> {
/// Peers that support the mix protocol, and their connection IDs
negotiated_peers: HashMap<PeerId, HashSet<ConnectionId>>,
/// Connection maintenance
conn_maintenance: ConnectionMaintenance<PeerId>,
conn_maintenance_interval: Interval,
/// NOTE: For now, this is optional because this may close too many connections
/// until we clearly figure out the optimal parameters.
conn_maintenance: Option<ConnectionMaintenance<PeerId>>,
conn_maintenance_interval: Option<Interval>,
/// Peers that should be excluded from connection establishments
blacklist_peers: HashSet<PeerId>,
/// To maintain address of peers because libp2p API uses only [`PeerId`] when handling
@ -51,7 +53,7 @@ pub struct Behaviour<M, Interval> {
pub struct Config {
pub max_peering_degree: usize,
pub duplicate_cache_lifespan: u64,
pub conn_maintenance_settings: ConnectionMaintenanceSettings,
pub conn_maintenance_settings: Option<ConnectionMaintenanceSettings>,
}
#[derive(Debug)]
@ -69,10 +71,15 @@ impl<M, Interval> Behaviour<M, Interval>
where
M: MixMessage,
{
pub fn new(config: Config, conn_maintenance_interval: Interval) -> Self {
pub fn new(config: Config, conn_maintenance_interval: Option<Interval>) -> Self {
assert_eq!(
config.conn_maintenance_settings.is_some(),
conn_maintenance_interval.is_some()
);
let conn_maintenance = config
.conn_maintenance_settings
.map(ConnectionMaintenance::<PeerId>::new);
let duplicate_cache = TimedCache::with_lifespan(config.duplicate_cache_lifespan);
let conn_maintenance =
ConnectionMaintenance::<PeerId>::new(config.conn_maintenance_settings);
Self {
config,
negotiated_peers: HashMap::new(),
@ -180,20 +187,22 @@ where
}
fn run_conn_maintenance(&mut self) {
let (malicious_peers, unhealthy_peers) = self.conn_maintenance.reset();
let num_closed_malicious = self.handle_malicious_peers(malicious_peers);
let num_connected_unhealthy = self.handle_unhealthy_peers(unhealthy_peers);
let mut num_to_dial = num_closed_malicious + num_connected_unhealthy;
if num_to_dial + self.negotiated_peers.len() > self.config.max_peering_degree {
tracing::warn!(
if let Some(conn_maintenance) = self.conn_maintenance.as_mut() {
let (malicious_peers, unhealthy_peers) = conn_maintenance.reset();
let num_closed_malicious = self.handle_malicious_peers(malicious_peers);
let num_connected_unhealthy = self.handle_unhealthy_peers(unhealthy_peers);
let mut num_to_dial = num_closed_malicious + num_connected_unhealthy;
if num_to_dial + self.negotiated_peers.len() > self.config.max_peering_degree {
tracing::warn!(
"Cannot establish {} new connections due to max_peering_degree:{}. connected_peers:{}",
num_to_dial,
self.config.max_peering_degree,
self.negotiated_peers.len(),
);
num_to_dial = self.config.max_peering_degree - self.negotiated_peers.len();
num_to_dial = self.config.max_peering_degree - self.negotiated_peers.len();
}
self.schedule_dials(num_to_dial);
}
self.schedule_dials(num_to_dial);
}
fn handle_malicious_peers(&mut self, peers: HashSet<PeerId>) -> usize {
@ -320,14 +329,15 @@ where
ToBehaviour::Message(message) => {
// Ignore drop message
if M::is_drop_message(&message) {
self.conn_maintenance.add_drop(peer_id);
if let Some(conn_maintenance) = self.conn_maintenance.as_mut() {
conn_maintenance.add_drop(peer_id);
}
return;
}
// TODO: Discuss about the spec again.
// Due to immediate forwarding (which bypass Persistent Transmission),
// the measured effective messages may be very higher than the expected.
self.conn_maintenance.add_effective(peer_id);
if let Some(conn_maintenance) = self.conn_maintenance.as_mut() {
conn_maintenance.add_effective(peer_id);
}
// Add the message to the cache. If it was already seen, ignore it.
if self
@ -377,11 +387,10 @@ where
cx: &mut Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
// Run connection maintenance if the interval is reached.
if pin!(&mut self.conn_maintenance_interval)
.poll_next(cx)
.is_ready()
{
self.run_conn_maintenance();
if let Some(interval) = self.conn_maintenance_interval.as_mut() {
if pin!(interval).poll_next(cx).is_ready() {
self.run_conn_maintenance();
}
}
if let Some(event) = self.events.pop_front() {

View File

@ -134,9 +134,9 @@ mod test {
Config {
max_peering_degree: 10,
duplicate_cache_lifespan: 60,
conn_maintenance_settings,
conn_maintenance_settings: Some(conn_maintenance_settings),
},
conn_maintenance_interval,
Some(conn_maintenance_interval),
),
)
}

View File

@ -331,13 +331,13 @@ pub fn new_mix_configs(listening_addresses: Vec<Multiaddr>) -> Vec<TestMixSettin
node_key: ed25519::SecretKey::generate(),
peering_degree: 1,
max_peering_degree: 1,
conn_maintenance: ConnectionMaintenanceSettings {
conn_maintenance: Some(ConnectionMaintenanceSettings {
time_window: Duration::from_secs(600),
expected_effective_messages: 600.0,
effective_message_tolerance: 0.1,
expected_drop_messages: 0.0,
drop_message_tolerance: 0.0,
},
}),
},
x25519_dalek::StaticSecret::random(),
)

View File

@ -36,7 +36,7 @@ pub struct Libp2pMixBackendSettings {
pub node_key: ed25519::SecretKey,
pub peering_degree: usize,
pub max_peering_degree: usize,
pub conn_maintenance: ConnectionMaintenanceSettings,
pub conn_maintenance: Option<ConnectionMaintenanceSettings>,
}
const CHANNEL_SIZE: usize = 64;
@ -126,8 +126,9 @@ where
.with_tokio()
.with_quic()
.with_behaviour(|_| {
let conn_maintenance_interval =
IntervalStream::new(tokio::time::interval(config.conn_maintenance.time_window));
let conn_maintenance_interval = config.conn_maintenance.map(|settings| {
IntervalStream::new(tokio::time::interval(settings.time_window))
});
nomos_mix_network::Behaviour::new(
nomos_mix_network::Config {
max_peering_degree: config.max_peering_degree,

View File

@ -110,13 +110,13 @@ fn new_mix_configs(listening_addresses: Vec<Multiaddr>) -> Vec<TestMixSettings>
node_key: ed25519::SecretKey::generate(),
peering_degree: PEERING_DEGREE,
max_peering_degree: PEERING_DEGREE + 5,
conn_maintenance: ConnectionMaintenanceSettings {
conn_maintenance: Some(ConnectionMaintenanceSettings {
time_window: Duration::from_secs(10),
expected_effective_messages: 5.0,
effective_message_tolerance: 0.1,
expected_drop_messages: 0.0,
drop_message_tolerance: 0.0,
},
}),
},
x25519_dalek::StaticSecret::random(),
)

View File

@ -32,13 +32,13 @@ pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec<GeneralMixConfig> {
node_key,
peering_degree: 1,
max_peering_degree: 3,
conn_maintenance: ConnectionMaintenanceSettings {
conn_maintenance: Some(ConnectionMaintenanceSettings {
time_window: Duration::from_secs(600),
expected_effective_messages: 600.0,
effective_message_tolerance: 0.1,
expected_drop_messages: 0.0,
drop_message_tolerance: 0.0,
},
}),
},
private_key: x25519_dalek::StaticSecret::random(),
membership: Vec::new(),