From 6ca609acd94a401ce85faa3836e2132d67f9961f Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Fri, 6 Dec 2024 13:52:30 +0900 Subject: [PATCH] Mix: add connection maintenance, excluding disconnecting --- simlib/blendnet-sims/Cargo.toml | 6 +- simlib/blendnet-sims/config/blendnet.json | 12 ++- simlib/blendnet-sims/src/main.rs | 10 +- simlib/blendnet-sims/src/node/mix/mod.rs | 110 +++++++++++++++++----- simlib/blendnet-sims/src/settings.rs | 7 +- 5 files changed, 109 insertions(+), 36 deletions(-) diff --git a/simlib/blendnet-sims/Cargo.toml b/simlib/blendnet-sims/Cargo.toml index 73b7f1e..0c51fbe 100644 --- a/simlib/blendnet-sims/Cargo.toml +++ b/simlib/blendnet-sims/Cargo.toml @@ -18,12 +18,12 @@ tracing = "0.1.40" tracing-subscriber = { version = "0.3", features = ["json", "env-filter", "tracing-log"] } netrunner = { path = "../netrunner" } nomos-tracing = { git = "https://github.com/logos-co/nomos-node.git" } -nomos-mix = { git = "https://github.com/logos-co/nomos-node", package = "nomos-mix" } -nomos-mix-message = { git = "https://github.com/logos-co/nomos-node", package = "nomos-mix-message" } +nomos-mix = { git = "https://github.com/logos-co/nomos-node", branch = "mix-conn-monitor-sim", package = "nomos-mix" } +nomos-mix-message = { git = "https://github.com/logos-co/nomos-node", branch = "mix-conn-monitor-sim", package = "nomos-mix-message" } futures = "0.3.31" rand_chacha = "0.3" -multiaddr = "0.18" sha2 = "0.10" uuid = { version = "1", features = ["fast-rng", "v4"] } tracing-appender = "0.2" cached = "0.54.0" +polars = "0.44" diff --git a/simlib/blendnet-sims/config/blendnet.json b/simlib/blendnet-sims/config/blendnet.json index 6636913..d34b79c 100644 --- a/simlib/blendnet-sims/config/blendnet.json +++ b/simlib/blendnet-sims/config/blendnet.json @@ -33,9 +33,19 @@ "sum": 10 } ], - "connected_peers_count": 4, "data_message_lottery_interval": "20s", "stake_proportion": 1.0, + "conn_maintenance": { + "peering_degree": 4, + "max_peering_degree": 8, + "monitor": { + "time_window": "20s", + "expected_effective_messages": 0.0, + "effective_message_tolerance": 0.0, + "expected_drop_messages": 0.0, + "drop_message_tolerance": 0.0 + } + }, "epoch_duration": "432000s", "slot_duration": "20s", "slots_per_epoch": 21600, diff --git a/simlib/blendnet-sims/src/main.rs b/simlib/blendnet-sims/src/main.rs index 1e9eb91..9f4050c 100644 --- a/simlib/blendnet-sims/src/main.rs +++ b/simlib/blendnet-sims/src/main.rs @@ -21,7 +21,6 @@ use nomos_mix::message_blend::{ CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, }; use parking_lot::Mutex; -use rand::prelude::IteratorRandom; use rand::seq::SliceRandom; use rand::{RngCore, SeedableRng}; use rand_chacha::ChaCha12Rng; @@ -87,7 +86,6 @@ impl SimulationApp { let behaviours = create_behaviours(&settings.simulation_settings.network_settings); let regions_data = RegionsData::new(regions, behaviours); - let ids = node_ids.clone(); let network = Arc::new(Mutex::new(Network::::new(regions_data, seed))); let nodes: Vec<_> = node_ids @@ -101,16 +99,13 @@ impl SimulationApp { settings.simulation_settings.clone(), no_netcap, MixnodeSettings { - connected_peers: ids - .iter() - .filter(|&id| id != &node_id) - .copied() - .choose_multiple(&mut rng, settings.connected_peers_count), + membership: node_ids.clone(), data_message_lottery_interval: settings.data_message_lottery_interval, stake_proportion: settings.stake_proportion / node_ids.len() as f64, seed: rng.next_u64(), epoch_duration: settings.epoch_duration, // 5 days seconds slot_duration: settings.slot_duration, + conn_maintenance: settings.conn_maintenance, persistent_transmission: settings.persistent_transmission, message_blend: MessageBlendSettings { cryptographic_processor: CryptographicProcessorSettings { @@ -127,7 +122,6 @@ impl SimulationApp { slots_per_epoch: settings.slots_per_epoch, network_size: node_ids.len(), }, - membership: node_ids.iter().map(|&id| id.into()).collect(), }, ) }) diff --git a/simlib/blendnet-sims/src/node/mix/mod.rs b/simlib/blendnet-sims/src/node/mix/mod.rs index cf5cca4..61aba82 100644 --- a/simlib/blendnet-sims/src/node/mix/mod.rs +++ b/simlib/blendnet-sims/src/node/mix/mod.rs @@ -11,13 +11,13 @@ use crossbeam::channel; use futures::Stream; use lottery::StakeLottery; use message::{Payload, PayloadId}; -use multiaddr::Multiaddr; use netrunner::network::NetworkMessage; use netrunner::node::{Node, NodeId, NodeIdExt}; use netrunner::{ network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}, warding::WardCondition, }; +use nomos_mix::conn_maintenance::{ConnectionMaintenance, ConnectionMaintenanceSettings}; use nomos_mix::{ cover_traffic::{CoverTraffic, CoverTrafficSettings}, membership::Membership, @@ -30,6 +30,7 @@ use nomos_mix::{ MixOutgoingMessage, }; use nomos_mix_message::mock::MockMixMessage; +use polars::series::Series; use rand::SeedableRng; use rand_chacha::ChaCha12Rng; use scheduler::{Interval, TemporalRelease}; @@ -50,16 +51,16 @@ impl PayloadSize for MixMessage { #[derive(Deserialize)] pub struct MixnodeSettings { - pub connected_peers: Vec, + pub membership: Vec, pub data_message_lottery_interval: Duration, pub stake_proportion: f64, pub seed: u64, pub epoch_duration: Duration, pub slot_duration: Duration, + pub conn_maintenance: ConnectionMaintenanceSettings, pub persistent_transmission: PersistentTransmissionSettings, pub message_blend: MessageBlendSettings, pub cover_traffic_settings: CoverTrafficSettings, - pub membership: Vec<::PublicKey>, } type Sha256Hash = [u8; 32]; @@ -68,7 +69,6 @@ type Sha256Hash = [u8; 32]; pub struct MixNode { id: NodeId, state: MixnodeState, - settings: MixnodeSettings, network_interface: InMemoryNetworkInterface, message_cache: TimedCache, @@ -76,6 +76,9 @@ pub struct MixNode { data_msg_lottery_interval: Interval, data_msg_lottery: StakeLottery, + conn_maintenance: ConnectionMaintenance, + conn_maintenance_update_time_sender: channel::Sender, + conn_maintenance_interval: Interval, persistent_sender: channel::Sender>, persistent_update_time_sender: channel::Sender, persistent_transmission_messages: PersistentTransmissionStream< @@ -84,12 +87,13 @@ pub struct MixNode { MockMixMessage, Interval, >, - crypto_processor: CryptographicProcessor, + crypto_processor: CryptographicProcessor, blend_sender: channel::Sender>, blend_update_time_sender: channel::Sender, blend_messages: MessageBlendStream< CrossbeamReceiverStream>, ChaCha12Rng, + NodeId, MockMixMessage, TemporalRelease, >, @@ -106,6 +110,22 @@ impl MixNode { ) -> Self { let mut rng_generator = ChaCha12Rng::seed_from_u64(settings.seed); + // Init Membership + let nodes: Vec< + nomos_mix::membership::Node< + NodeId, + ::PublicKey, + >, + > = settings + .membership + .iter() + .map(|&node_id| nomos_mix::membership::Node { + address: node_id, + public_key: node_id.into(), + }) + .collect(); + let membership = Membership::::new(nodes, id.into()); + // Init Interval for data message lottery let (data_msg_lottery_update_time_sender, data_msg_lottery_update_time_receiver) = channel::unbounded(); @@ -118,8 +138,23 @@ impl MixNode { settings.stake_proportion, ); - // Init Tier-1: Persistent transmission + // Init Tier-1: Connection maintenance and Persistent transmission + let mut conn_maintenance = ConnectionMaintenance::new( + settings.conn_maintenance, + membership.clone(), + ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), + ); + conn_maintenance + .bootstrap() + .into_iter() + .for_each(|peer| conn_maintenance.add_connected_peer(peer)); + let (conn_maintenance_update_time_sender, conn_maintenance_update_time_receiver) = + channel::unbounded(); let (persistent_sender, persistent_receiver) = channel::unbounded(); + let conn_maintenance_interval = Interval::new( + settings.conn_maintenance.monitor.unwrap().time_window, + conn_maintenance_update_time_receiver, + ); let (persistent_update_time_sender, persistent_update_time_receiver) = channel::unbounded(); let persistent_transmission_messages = CrossbeamReceiverStream::new(persistent_receiver) .persistent_transmission( @@ -136,19 +171,7 @@ impl MixNode { // Init Tier-2: message blend let (blend_sender, blend_receiver) = channel::unbounded(); let (blend_update_time_sender, blend_update_time_receiver) = channel::unbounded(); - let nodes: Vec< - nomos_mix::membership::Node< - ::PublicKey, - >, - > = settings - .membership - .iter() - .map(|&public_key| nomos_mix::membership::Node { - address: Multiaddr::empty(), - public_key, - }) - .collect(); - let membership = Membership::::new(nodes, id.into()); + let crypto_processor = CryptographicProcessor::new( settings.message_blend.cryptographic_processor.clone(), membership.clone(), @@ -188,7 +211,6 @@ impl MixNode { // We're not coupling this lifespan with the steps now, but it's okay // We expected that a message will be delivered to most of nodes within 60s. message_cache: TimedCache::with_lifespan(60), - settings, state: MixnodeState { node_id: id, step_id: 0, @@ -197,6 +219,9 @@ impl MixNode { data_msg_lottery_update_time_sender, data_msg_lottery_interval, data_msg_lottery, + conn_maintenance, + conn_maintenance_update_time_sender, + conn_maintenance_interval, persistent_sender, persistent_update_time_sender, persistent_transmission_messages, @@ -217,8 +242,8 @@ impl MixNode { log: Option, ) { for (i, node_id) in self - .settings - .connected_peers + .conn_maintenance + .connected_peers() .iter() .filter(|&id| Some(*id) != exclude_node) .enumerate() @@ -238,6 +263,9 @@ impl MixNode { self.network_interface .receive_messages() .into_iter() + .inspect(|msg| { + self.conn_maintenance.record_effective_message(&msg.from); + }) // Retain only messages that have not been seen before .filter(|msg| { self.message_cache @@ -257,6 +285,9 @@ impl MixNode { self.data_msg_lottery_update_time_sender .send(elapsed) .unwrap(); + self.conn_maintenance_update_time_sender + .send(elapsed) + .unwrap(); self.persistent_update_time_sender.send(elapsed).unwrap(); self.blend_update_time_sender.send(elapsed).unwrap(); self.epoch_update_sender.send(elapsed).unwrap(); @@ -284,6 +315,21 @@ impl MixNode { tracing::info!("Emission: {}", serde_json::to_string(log).unwrap()); } + fn log_monitor(&self, effective_messages_series: &Series) { + if effective_messages_series.is_empty() { + return; + } + + let log = SeriesLog { + series_type: "EffectiveMessages".to_string(), + min: effective_messages_series.min().unwrap().unwrap(), + avg: effective_messages_series.mean().unwrap(), + median: effective_messages_series.median().unwrap(), + max: effective_messages_series.max().unwrap().unwrap(), + }; + tracing::info!("Monitor: {}", serde_json::to_string(&log).unwrap()); + } + fn new_emission_log(&self, emission_type: &str) -> EmissionLog { EmissionLog { emission_type: emission_type.to_string(), @@ -324,6 +370,17 @@ impl Node for MixNode { } } + // Proceed connection maintenance if interval is reached. + if let Poll::Ready(Some(_)) = pin!(&mut self.conn_maintenance_interval).poll_next(&mut cx) { + let (monitors, _, _) = self.conn_maintenance.reset().unwrap(); + let effective_messages_series = Series::from_iter( + monitors + .values() + .map(|monitor| monitor.effective_messages as u64), + ); + self.log_monitor(&effective_messages_series); + } + // Handle incoming messages for network_message in self.receive() { self.forward( @@ -400,3 +457,12 @@ struct EmissionLog { step_id: usize, node_id: usize, } + +#[derive(Debug, Serialize, Deserialize)] +struct SeriesLog { + series_type: String, + min: u64, + avg: f64, + median: f64, + max: u64, +} diff --git a/simlib/blendnet-sims/src/settings.rs b/simlib/blendnet-sims/src/settings.rs index ba53b46..fdbc305 100644 --- a/simlib/blendnet-sims/src/settings.rs +++ b/simlib/blendnet-sims/src/settings.rs @@ -1,5 +1,8 @@ use netrunner::settings::SimulationSettings; -use nomos_mix::persistent_transmission::PersistentTransmissionSettings; +use nomos_mix::{ + conn_maintenance::ConnectionMaintenanceSettings, + persistent_transmission::PersistentTransmissionSettings, +}; use serde::{Deserialize, Deserializer}; use std::time::Duration; @@ -7,7 +10,6 @@ use std::time::Duration; pub struct SimSettings { #[serde(flatten)] pub simulation_settings: SimulationSettings, - pub connected_peers_count: usize, #[serde(deserialize_with = "deserialize_duration_with_human_time")] pub data_message_lottery_interval: Duration, pub stake_proportion: f64, @@ -19,6 +21,7 @@ pub struct SimSettings { pub slots_per_epoch: usize, pub number_of_hops: usize, // For tier 1 + pub conn_maintenance: ConnectionMaintenanceSettings, pub persistent_transmission: PersistentTransmissionSettings, // For tier 2 pub number_of_mix_layers: usize,