From 53d0c2649d78a6f68d906188dec650e80081061f Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Fri, 13 Dec 2024 13:10:03 +0900 Subject: [PATCH] wip --- simlib/blendnet-sims/Cargo.toml | 1 + simlib/blendnet-sims/config/blendnet.json | 36 ++++----- simlib/blendnet-sims/src/main.rs | 1 + simlib/blendnet-sims/src/node/mix/mod.rs | 74 ++++++++++++++++--- .../blendnet-sims/src/node/mix/scheduler.rs | 12 ++- 5 files changed, 93 insertions(+), 31 deletions(-) diff --git a/simlib/blendnet-sims/Cargo.toml b/simlib/blendnet-sims/Cargo.toml index 3f6cd3b..777bd64 100644 --- a/simlib/blendnet-sims/Cargo.toml +++ b/simlib/blendnet-sims/Cargo.toml @@ -27,6 +27,7 @@ uuid = { version = "1", features = ["fast-rng", "v4"] } tracing-appender = "0.2" cached = "0.54.0" polars = "0.44" +humantime-serde = "1.1.1" [dev-dependencies] tracing-subscriber = "0.3" diff --git a/simlib/blendnet-sims/config/blendnet.json b/simlib/blendnet-sims/config/blendnet.json index d34b79c..ee1a6e3 100644 --- a/simlib/blendnet-sims/config/blendnet.json +++ b/simlib/blendnet-sims/config/blendnet.json @@ -4,7 +4,7 @@ "north america:north america": "50ms", "north america:europe": "100ms", "north america:asia": "120ms", - "europe:europe": "50ms", + "europe:europe": "10ms", "europe:asia": "100ms", "europe:north america": "120ms", "asia:north america": "100ms", @@ -12,43 +12,43 @@ "asia:asia": "40ms" }, "regions": { - "north america": 0.4, - "europe": 0.4, - "asia": 0.3 + "north america": 0.0, + "europe": 1.0, + "asia": 0.0 } }, "node_settings": { "timeout": "1000ms" }, - "step_time": "40ms", + "step_time": "10ms", "runner_settings": "Sync", "stream_settings": { "path": "test.json" }, - "node_count": 10, + "node_count": 100, "seed": 0, "record_settings": {}, "wards": [ { - "sum": 10 + "sum": 1000 } ], "data_message_lottery_interval": "20s", - "stake_proportion": 1.0, + "stake_proportion": 0.0, "conn_maintenance": { - "peering_degree": 4, - "max_peering_degree": 8, + "peering_degree": 2, + "max_peering_degree": 2, "monitor": { - "time_window": "20s", - "expected_effective_messages": 0.0, - "effective_message_tolerance": 0.0, - "expected_drop_messages": 0.0, - "drop_message_tolerance": 0.0 + "time_window": "200s", + "expected_effective_messages": "0.0", + "effective_message_tolerance": "0.0", + "expected_drop_messages": "0.0", + "drop_message_tolerance": "0.0" } }, - "epoch_duration": "432000s", - "slot_duration": "20s", - "slots_per_epoch": 21600, + "epoch_duration": "200s", + "slot_duration": "1s", + "slots_per_epoch": 200, "number_of_hops": 2, "persistent_transmission": { "max_emission_frequency": 1.0, diff --git a/simlib/blendnet-sims/src/main.rs b/simlib/blendnet-sims/src/main.rs index 0bb099a..5793341 100644 --- a/simlib/blendnet-sims/src/main.rs +++ b/simlib/blendnet-sims/src/main.rs @@ -106,6 +106,7 @@ impl SimulationApp { settings.simulation_settings.clone(), no_netcap, MixnodeSettings { + step_time: settings.simulation_settings.step_time, membership: node_ids.clone(), topology: topology.clone(), data_message_lottery_interval: settings.data_message_lottery_interval, diff --git a/simlib/blendnet-sims/src/node/mix/mod.rs b/simlib/blendnet-sims/src/node/mix/mod.rs index 1f2a9fd..81378c7 100644 --- a/simlib/blendnet-sims/src/node/mix/mod.rs +++ b/simlib/blendnet-sims/src/node/mix/mod.rs @@ -8,6 +8,7 @@ pub mod topology; use crate::node::mix::consensus_streams::{Epoch, Slot}; use cached::{Cached, TimedCache}; +use consensus_streams::CounterInterval; use crossbeam::channel; use futures::Stream; use lottery::StakeLottery; @@ -39,6 +40,7 @@ use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use state::MixnodeState; use std::collections::HashSet; +use std::ops::Mul; use std::{pin::pin, task::Poll, time::Duration}; use stream_wrapper::CrossbeamReceiverStream; use topology::Topology; @@ -54,6 +56,7 @@ impl PayloadSize for MixMessage { #[derive(Deserialize)] pub struct MixnodeSettings { + pub step_time: Duration, pub membership: Vec, pub topology: Topology, pub data_message_lottery_interval: Duration, @@ -71,6 +74,7 @@ type Sha256Hash = [u8; 32]; /// This node implementation only used for testing different streaming implementation purposes. pub struct MixNode { + step_time: Duration, id: NodeId, state: MixnodeState, network_interface: InMemoryNetworkInterface, @@ -82,7 +86,7 @@ pub struct MixNode { conn_maintenance: ConnectionMaintenance, conn_maintenance_update_time_sender: channel::Sender, - conn_maintenance_interval: Interval, + conn_maintenance_interval: CounterInterval, persistent_sender: channel::Sender>, persistent_update_time_sender: channel::Sender, persistent_transmission_messages: PersistentTransmissionStream< @@ -103,7 +107,9 @@ pub struct MixNode { >, epoch_update_sender: channel::Sender, slot_update_sender: channel::Sender, + slot_update_sender_new: channel::Sender, cover_traffic: CoverTraffic, + slot_scheduler: Slot, } impl MixNode { @@ -156,7 +162,7 @@ impl MixNode { let (conn_maintenance_update_time_sender, conn_maintenance_update_time_receiver) = channel::unbounded(); let (persistent_sender, persistent_receiver) = channel::unbounded(); - let conn_maintenance_interval = Interval::new( + let conn_maintenance_interval = CounterInterval::new( settings.conn_maintenance.monitor.unwrap().time_window, conn_maintenance_update_time_receiver, ); @@ -186,8 +192,8 @@ impl MixNode { ChaCha12Rng::from_rng(&mut rng_generator).unwrap(), blend_update_time_receiver, ( - 1, - settings.message_blend.temporal_processor.max_delay_seconds, + 0, + settings.message_blend.temporal_processor.max_delay_seconds / 2, ), ); let blend_messages = CrossbeamReceiverStream::new(blend_receiver).blend( @@ -200,6 +206,7 @@ impl MixNode { // tier 3 cover traffic let (epoch_update_sender, epoch_updater_update_receiver) = channel::unbounded(); let (slot_update_sender, slot_updater_update_receiver) = channel::unbounded(); + let (slot_update_sender_new, slot_updater_update_receiver_new) = channel::unbounded(); let cover_traffic: CoverTraffic = CoverTraffic::new( settings.cover_traffic_settings, Epoch::new(settings.epoch_duration, epoch_updater_update_receiver), @@ -209,8 +216,14 @@ impl MixNode { slot_updater_update_receiver, ), ); + let slot_scheduler = Slot::new( + settings.cover_traffic_settings.slots_per_epoch, + settings.slot_duration, + slot_updater_update_receiver_new, + ); Self { + step_time: settings.step_time, id, network_interface, // We're not coupling this lifespan with the steps now, but it's okay @@ -236,7 +249,9 @@ impl MixNode { blend_messages, epoch_update_sender, slot_update_sender, + slot_update_sender_new, cover_traffic, + slot_scheduler, } } @@ -270,6 +285,16 @@ impl MixNode { .into_iter() .inspect(|msg| { self.conn_maintenance.record_effective_message(&msg.from); + let log = MessageLog { + payload_id: "received".to_string(), + step_id: self.state.step_id, + elapsed: self.step_time.mul(self.state.step_id.try_into().unwrap()), + node_id: self.id.index(), + }; + tracing::info!( + "CoverMessageReceived {}", + serde_json::to_string(&log).unwrap() + ); }) // Retain only messages that have not been seen before .filter(|msg| { @@ -297,6 +322,7 @@ impl MixNode { self.blend_update_time_sender.send(elapsed).unwrap(); self.epoch_update_sender.send(elapsed).unwrap(); self.slot_update_sender.send(elapsed).unwrap(); + self.slot_update_sender_new.send(elapsed).unwrap(); } fn log_message_generated(&self, msg_type: &str, payload: &Payload) { @@ -311,6 +337,7 @@ impl MixNode { let log = MessageLog { payload_id: payload.id(), step_id: self.state.step_id, + elapsed: self.step_time.mul(self.state.step_id.try_into().unwrap()), node_id: self.id.index(), }; tracing::info!("{}: {}", tag, serde_json::to_string(&log).unwrap()); @@ -389,14 +416,16 @@ impl Node for MixNode { } // Proceed connection maintenance if interval is reached. - if let Poll::Ready(Some(_)) = pin!(&mut self.conn_maintenance_interval).poll_next(&mut cx) { - let (monitors, _, _) = self.conn_maintenance.reset().unwrap(); - let effective_messages_series = Series::from_iter( - monitors - .values() - .map(|monitor| monitor.effective_messages.to_num::()), - ); - self.log_monitors(&effective_messages_series); + if let Poll::Ready(Some(i)) = pin!(&mut self.conn_maintenance_interval).poll_next(&mut cx) { + if i > 0 { + let (monitors, _, _) = self.conn_maintenance.reset().unwrap(); + let effective_messages_series = Series::from_iter( + monitors + .values() + .map(|monitor| monitor.effective_messages.to_num::()), + ); + self.log_monitors(&effective_messages_series); + } } // Handle incoming messages @@ -415,6 +444,13 @@ impl Node for MixNode { if let Poll::Ready(Some(msg)) = pin!(&mut self.blend_messages).poll_next(&mut cx) { match msg { MixOutgoingMessage::Outbound(msg) => { + let log = MessageLog { + payload_id: "after_blend".to_string(), + step_id: self.state.step_id, + elapsed: self.step_time.mul(self.state.step_id.try_into().unwrap()), + node_id: self.id.index(), + }; + tracing::info!("CoverAfterBlend: {}", serde_json::to_string(&log).unwrap()); self.persistent_sender.send(msg).unwrap(); } MixOutgoingMessage::FullyUnwrapped(payload) => { @@ -427,6 +463,18 @@ impl Node for MixNode { } // Generate a cover message probabilistically + // if let Poll::Ready(Some(slot)) = pin!(&mut self.slot_scheduler).poll_next(&mut cx) { + // // if slot == self.id.index() { + // if slot == 0 { + // let payload = Payload::new(); + // self.log_message_generated("Cover", &payload); + // let message = self + // .crypto_processor + // .wrap_message(payload.as_bytes()) + // .unwrap(); + // self.persistent_sender.send(message).unwrap(); + // } + // } if let Poll::Ready(Some(_)) = pin!(&mut self.cover_traffic).poll_next(&mut cx) { let payload = Payload::new(); self.log_message_generated("Cover", &payload); @@ -466,6 +514,8 @@ impl Node for MixNode { struct MessageLog { payload_id: PayloadId, step_id: usize, + #[serde(with = "humantime_serde")] + elapsed: Duration, node_id: usize, } diff --git a/simlib/blendnet-sims/src/node/mix/scheduler.rs b/simlib/blendnet-sims/src/node/mix/scheduler.rs index ac403c8..51dff81 100644 --- a/simlib/blendnet-sims/src/node/mix/scheduler.rs +++ b/simlib/blendnet-sims/src/node/mix/scheduler.rs @@ -58,7 +58,12 @@ impl TemporalRelease { (min_delay, max_delay): (u64, u64), ) -> Self { let mut random_sleeps = Box::new(std::iter::repeat_with(move || { - Duration::from_secs((rng.next_u64() % (max_delay + 1)).max(min_delay)) + if min_delay == max_delay { + tracing::info!("Temporal release: fixed delay: {}", min_delay); + Duration::from_secs(min_delay) + } else { + Duration::from_secs(rng.next_u64() % (max_delay - min_delay) + min_delay) + } })); let current_sleep = random_sleeps.next().unwrap(); Self { @@ -71,6 +76,11 @@ impl TemporalRelease { pub fn update(&mut self, elapsed: Duration) -> bool { self.elapsed += elapsed; if self.elapsed >= self.current_sleep { + tracing::info!( + "Temporal update: elapsed:{:?}, current_sleep:{:?}", + self.elapsed, + self.current_sleep + ); self.elapsed = Duration::from_secs(0); self.current_sleep = self.random_sleeps.next().unwrap(); true