Mix: add connection maintenance, excluding disconnecting

This commit is contained in:
Youngjoon Lee 2024-12-06 13:52:30 +09:00
parent 8ef0adc996
commit 6ca609acd9
No known key found for this signature in database
GPG Key ID: 167546E2D1712F8C
5 changed files with 109 additions and 36 deletions

View File

@ -18,12 +18,12 @@ tracing = "0.1.40"
tracing-subscriber = { version = "0.3", features = ["json", "env-filter", "tracing-log"] }
netrunner = { path = "../netrunner" }
nomos-tracing = { git = "https://github.com/logos-co/nomos-node.git" }
nomos-mix = { git = "https://github.com/logos-co/nomos-node", package = "nomos-mix" }
nomos-mix-message = { git = "https://github.com/logos-co/nomos-node", package = "nomos-mix-message" }
nomos-mix = { git = "https://github.com/logos-co/nomos-node", branch = "mix-conn-monitor-sim", package = "nomos-mix" }
nomos-mix-message = { git = "https://github.com/logos-co/nomos-node", branch = "mix-conn-monitor-sim", package = "nomos-mix-message" }
futures = "0.3.31"
rand_chacha = "0.3"
multiaddr = "0.18"
sha2 = "0.10"
uuid = { version = "1", features = ["fast-rng", "v4"] }
tracing-appender = "0.2"
cached = "0.54.0"
polars = "0.44"

View File

@ -33,9 +33,19 @@
"sum": 10
}
],
"connected_peers_count": 4,
"data_message_lottery_interval": "20s",
"stake_proportion": 1.0,
"conn_maintenance": {
"peering_degree": 4,
"max_peering_degree": 8,
"monitor": {
"time_window": "20s",
"expected_effective_messages": 0.0,
"effective_message_tolerance": 0.0,
"expected_drop_messages": 0.0,
"drop_message_tolerance": 0.0
}
},
"epoch_duration": "432000s",
"slot_duration": "20s",
"slots_per_epoch": 21600,

View File

@ -21,7 +21,6 @@ use nomos_mix::message_blend::{
CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings,
};
use parking_lot::Mutex;
use rand::prelude::IteratorRandom;
use rand::seq::SliceRandom;
use rand::{RngCore, SeedableRng};
use rand_chacha::ChaCha12Rng;
@ -87,7 +86,6 @@ impl SimulationApp {
let behaviours = create_behaviours(&settings.simulation_settings.network_settings);
let regions_data = RegionsData::new(regions, behaviours);
let ids = node_ids.clone();
let network = Arc::new(Mutex::new(Network::<MixMessage>::new(regions_data, seed)));
let nodes: Vec<_> = node_ids
@ -101,16 +99,13 @@ impl SimulationApp {
settings.simulation_settings.clone(),
no_netcap,
MixnodeSettings {
connected_peers: ids
.iter()
.filter(|&id| id != &node_id)
.copied()
.choose_multiple(&mut rng, settings.connected_peers_count),
membership: node_ids.clone(),
data_message_lottery_interval: settings.data_message_lottery_interval,
stake_proportion: settings.stake_proportion / node_ids.len() as f64,
seed: rng.next_u64(),
epoch_duration: settings.epoch_duration, // 5 days seconds
slot_duration: settings.slot_duration,
conn_maintenance: settings.conn_maintenance,
persistent_transmission: settings.persistent_transmission,
message_blend: MessageBlendSettings {
cryptographic_processor: CryptographicProcessorSettings {
@ -127,7 +122,6 @@ impl SimulationApp {
slots_per_epoch: settings.slots_per_epoch,
network_size: node_ids.len(),
},
membership: node_ids.iter().map(|&id| id.into()).collect(),
},
)
})

View File

@ -11,13 +11,13 @@ use crossbeam::channel;
use futures::Stream;
use lottery::StakeLottery;
use message::{Payload, PayloadId};
use multiaddr::Multiaddr;
use netrunner::network::NetworkMessage;
use netrunner::node::{Node, NodeId, NodeIdExt};
use netrunner::{
network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize},
warding::WardCondition,
};
use nomos_mix::conn_maintenance::{ConnectionMaintenance, ConnectionMaintenanceSettings};
use nomos_mix::{
cover_traffic::{CoverTraffic, CoverTrafficSettings},
membership::Membership,
@ -30,6 +30,7 @@ use nomos_mix::{
MixOutgoingMessage,
};
use nomos_mix_message::mock::MockMixMessage;
use polars::series::Series;
use rand::SeedableRng;
use rand_chacha::ChaCha12Rng;
use scheduler::{Interval, TemporalRelease};
@ -50,16 +51,16 @@ impl PayloadSize for MixMessage {
#[derive(Deserialize)]
pub struct MixnodeSettings {
pub connected_peers: Vec<NodeId>,
pub membership: Vec<NodeId>,
pub data_message_lottery_interval: Duration,
pub stake_proportion: f64,
pub seed: u64,
pub epoch_duration: Duration,
pub slot_duration: Duration,
pub conn_maintenance: ConnectionMaintenanceSettings,
pub persistent_transmission: PersistentTransmissionSettings,
pub message_blend: MessageBlendSettings<MockMixMessage>,
pub cover_traffic_settings: CoverTrafficSettings,
pub membership: Vec<<MockMixMessage as nomos_mix_message::MixMessage>::PublicKey>,
}
type Sha256Hash = [u8; 32];
@ -68,7 +69,6 @@ type Sha256Hash = [u8; 32];
pub struct MixNode {
id: NodeId,
state: MixnodeState,
settings: MixnodeSettings,
network_interface: InMemoryNetworkInterface<MixMessage>,
message_cache: TimedCache<Sha256Hash, ()>,
@ -76,6 +76,9 @@ pub struct MixNode {
data_msg_lottery_interval: Interval,
data_msg_lottery: StakeLottery<ChaCha12Rng>,
conn_maintenance: ConnectionMaintenance<NodeId, MockMixMessage, ChaCha12Rng>,
conn_maintenance_update_time_sender: channel::Sender<Duration>,
conn_maintenance_interval: Interval,
persistent_sender: channel::Sender<Vec<u8>>,
persistent_update_time_sender: channel::Sender<Duration>,
persistent_transmission_messages: PersistentTransmissionStream<
@ -84,12 +87,13 @@ pub struct MixNode {
MockMixMessage,
Interval,
>,
crypto_processor: CryptographicProcessor<ChaCha12Rng, MockMixMessage>,
crypto_processor: CryptographicProcessor<ChaCha12Rng, NodeId, MockMixMessage>,
blend_sender: channel::Sender<Vec<u8>>,
blend_update_time_sender: channel::Sender<Duration>,
blend_messages: MessageBlendStream<
CrossbeamReceiverStream<Vec<u8>>,
ChaCha12Rng,
NodeId,
MockMixMessage,
TemporalRelease,
>,
@ -106,6 +110,22 @@ impl MixNode {
) -> Self {
let mut rng_generator = ChaCha12Rng::seed_from_u64(settings.seed);
// Init Membership
let nodes: Vec<
nomos_mix::membership::Node<
NodeId,
<MockMixMessage as nomos_mix_message::MixMessage>::PublicKey,
>,
> = settings
.membership
.iter()
.map(|&node_id| nomos_mix::membership::Node {
address: node_id,
public_key: node_id.into(),
})
.collect();
let membership = Membership::<NodeId, MockMixMessage>::new(nodes, id.into());
// Init Interval for data message lottery
let (data_msg_lottery_update_time_sender, data_msg_lottery_update_time_receiver) =
channel::unbounded();
@ -118,8 +138,23 @@ impl MixNode {
settings.stake_proportion,
);
// Init Tier-1: Persistent transmission
// Init Tier-1: Connection maintenance and Persistent transmission
let mut conn_maintenance = ConnectionMaintenance::new(
settings.conn_maintenance,
membership.clone(),
ChaCha12Rng::from_rng(&mut rng_generator).unwrap(),
);
conn_maintenance
.bootstrap()
.into_iter()
.for_each(|peer| conn_maintenance.add_connected_peer(peer));
let (conn_maintenance_update_time_sender, conn_maintenance_update_time_receiver) =
channel::unbounded();
let (persistent_sender, persistent_receiver) = channel::unbounded();
let conn_maintenance_interval = Interval::new(
settings.conn_maintenance.monitor.unwrap().time_window,
conn_maintenance_update_time_receiver,
);
let (persistent_update_time_sender, persistent_update_time_receiver) = channel::unbounded();
let persistent_transmission_messages = CrossbeamReceiverStream::new(persistent_receiver)
.persistent_transmission(
@ -136,19 +171,7 @@ impl MixNode {
// Init Tier-2: message blend
let (blend_sender, blend_receiver) = channel::unbounded();
let (blend_update_time_sender, blend_update_time_receiver) = channel::unbounded();
let nodes: Vec<
nomos_mix::membership::Node<
<MockMixMessage as nomos_mix_message::MixMessage>::PublicKey,
>,
> = settings
.membership
.iter()
.map(|&public_key| nomos_mix::membership::Node {
address: Multiaddr::empty(),
public_key,
})
.collect();
let membership = Membership::<MockMixMessage>::new(nodes, id.into());
let crypto_processor = CryptographicProcessor::new(
settings.message_blend.cryptographic_processor.clone(),
membership.clone(),
@ -188,7 +211,6 @@ impl MixNode {
// We're not coupling this lifespan with the steps now, but it's okay
// We expected that a message will be delivered to most of nodes within 60s.
message_cache: TimedCache::with_lifespan(60),
settings,
state: MixnodeState {
node_id: id,
step_id: 0,
@ -197,6 +219,9 @@ impl MixNode {
data_msg_lottery_update_time_sender,
data_msg_lottery_interval,
data_msg_lottery,
conn_maintenance,
conn_maintenance_update_time_sender,
conn_maintenance_interval,
persistent_sender,
persistent_update_time_sender,
persistent_transmission_messages,
@ -217,8 +242,8 @@ impl MixNode {
log: Option<EmissionLog>,
) {
for (i, node_id) in self
.settings
.connected_peers
.conn_maintenance
.connected_peers()
.iter()
.filter(|&id| Some(*id) != exclude_node)
.enumerate()
@ -238,6 +263,9 @@ impl MixNode {
self.network_interface
.receive_messages()
.into_iter()
.inspect(|msg| {
self.conn_maintenance.record_effective_message(&msg.from);
})
// Retain only messages that have not been seen before
.filter(|msg| {
self.message_cache
@ -257,6 +285,9 @@ impl MixNode {
self.data_msg_lottery_update_time_sender
.send(elapsed)
.unwrap();
self.conn_maintenance_update_time_sender
.send(elapsed)
.unwrap();
self.persistent_update_time_sender.send(elapsed).unwrap();
self.blend_update_time_sender.send(elapsed).unwrap();
self.epoch_update_sender.send(elapsed).unwrap();
@ -284,6 +315,21 @@ impl MixNode {
tracing::info!("Emission: {}", serde_json::to_string(log).unwrap());
}
fn log_monitor(&self, effective_messages_series: &Series) {
if effective_messages_series.is_empty() {
return;
}
let log = SeriesLog {
series_type: "EffectiveMessages".to_string(),
min: effective_messages_series.min().unwrap().unwrap(),
avg: effective_messages_series.mean().unwrap(),
median: effective_messages_series.median().unwrap(),
max: effective_messages_series.max().unwrap().unwrap(),
};
tracing::info!("Monitor: {}", serde_json::to_string(&log).unwrap());
}
fn new_emission_log(&self, emission_type: &str) -> EmissionLog {
EmissionLog {
emission_type: emission_type.to_string(),
@ -324,6 +370,17 @@ impl Node for MixNode {
}
}
// Proceed connection maintenance if interval is reached.
if let Poll::Ready(Some(_)) = pin!(&mut self.conn_maintenance_interval).poll_next(&mut cx) {
let (monitors, _, _) = self.conn_maintenance.reset().unwrap();
let effective_messages_series = Series::from_iter(
monitors
.values()
.map(|monitor| monitor.effective_messages as u64),
);
self.log_monitor(&effective_messages_series);
}
// Handle incoming messages
for network_message in self.receive() {
self.forward(
@ -400,3 +457,12 @@ struct EmissionLog {
step_id: usize,
node_id: usize,
}
#[derive(Debug, Serialize, Deserialize)]
struct SeriesLog {
series_type: String,
min: u64,
avg: f64,
median: f64,
max: u64,
}

View File

@ -1,5 +1,8 @@
use netrunner::settings::SimulationSettings;
use nomos_mix::persistent_transmission::PersistentTransmissionSettings;
use nomos_mix::{
conn_maintenance::ConnectionMaintenanceSettings,
persistent_transmission::PersistentTransmissionSettings,
};
use serde::{Deserialize, Deserializer};
use std::time::Duration;
@ -7,7 +10,6 @@ use std::time::Duration;
pub struct SimSettings {
#[serde(flatten)]
pub simulation_settings: SimulationSettings,
pub connected_peers_count: usize,
#[serde(deserialize_with = "deserialize_duration_with_human_time")]
pub data_message_lottery_interval: Duration,
pub stake_proportion: f64,
@ -19,6 +21,7 @@ pub struct SimSettings {
pub slots_per_epoch: usize,
pub number_of_hops: usize,
// For tier 1
pub conn_maintenance: ConnectionMaintenanceSettings,
pub persistent_transmission: PersistentTransmissionSettings,
// For tier 2
pub number_of_mix_layers: usize,