This commit is contained in:
Youngjoon Lee 2024-12-13 13:10:03 +09:00
parent d2a88fc2e4
commit 53d0c2649d
No known key found for this signature in database
GPG Key ID: 303963A54A81DD4D
5 changed files with 93 additions and 31 deletions

View File

@ -27,6 +27,7 @@ uuid = { version = "1", features = ["fast-rng", "v4"] }
tracing-appender = "0.2"
cached = "0.54.0"
polars = "0.44"
humantime-serde = "1.1.1"
[dev-dependencies]
tracing-subscriber = "0.3"

View File

@ -4,7 +4,7 @@
"north america:north america": "50ms",
"north america:europe": "100ms",
"north america:asia": "120ms",
"europe:europe": "50ms",
"europe:europe": "10ms",
"europe:asia": "100ms",
"europe:north america": "120ms",
"asia:north america": "100ms",
@ -12,43 +12,43 @@
"asia:asia": "40ms"
},
"regions": {
"north america": 0.4,
"europe": 0.4,
"asia": 0.3
"north america": 0.0,
"europe": 1.0,
"asia": 0.0
}
},
"node_settings": {
"timeout": "1000ms"
},
"step_time": "40ms",
"step_time": "10ms",
"runner_settings": "Sync",
"stream_settings": {
"path": "test.json"
},
"node_count": 10,
"node_count": 100,
"seed": 0,
"record_settings": {},
"wards": [
{
"sum": 10
"sum": 1000
}
],
"data_message_lottery_interval": "20s",
"stake_proportion": 1.0,
"stake_proportion": 0.0,
"conn_maintenance": {
"peering_degree": 4,
"max_peering_degree": 8,
"peering_degree": 2,
"max_peering_degree": 2,
"monitor": {
"time_window": "20s",
"expected_effective_messages": 0.0,
"effective_message_tolerance": 0.0,
"expected_drop_messages": 0.0,
"drop_message_tolerance": 0.0
"time_window": "200s",
"expected_effective_messages": "0.0",
"effective_message_tolerance": "0.0",
"expected_drop_messages": "0.0",
"drop_message_tolerance": "0.0"
}
},
"epoch_duration": "432000s",
"slot_duration": "20s",
"slots_per_epoch": 21600,
"epoch_duration": "200s",
"slot_duration": "1s",
"slots_per_epoch": 200,
"number_of_hops": 2,
"persistent_transmission": {
"max_emission_frequency": 1.0,

View File

@ -106,6 +106,7 @@ impl SimulationApp {
settings.simulation_settings.clone(),
no_netcap,
MixnodeSettings {
step_time: settings.simulation_settings.step_time,
membership: node_ids.clone(),
topology: topology.clone(),
data_message_lottery_interval: settings.data_message_lottery_interval,

View File

@ -8,6 +8,7 @@ pub mod topology;
use crate::node::mix::consensus_streams::{Epoch, Slot};
use cached::{Cached, TimedCache};
use consensus_streams::CounterInterval;
use crossbeam::channel;
use futures::Stream;
use lottery::StakeLottery;
@ -39,6 +40,7 @@ use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use state::MixnodeState;
use std::collections::HashSet;
use std::ops::Mul;
use std::{pin::pin, task::Poll, time::Duration};
use stream_wrapper::CrossbeamReceiverStream;
use topology::Topology;
@ -54,6 +56,7 @@ impl PayloadSize for MixMessage {
#[derive(Deserialize)]
pub struct MixnodeSettings {
pub step_time: Duration,
pub membership: Vec<NodeId>,
pub topology: Topology,
pub data_message_lottery_interval: Duration,
@ -71,6 +74,7 @@ type Sha256Hash = [u8; 32];
/// This node implementation only used for testing different streaming implementation purposes.
pub struct MixNode {
step_time: Duration,
id: NodeId,
state: MixnodeState,
network_interface: InMemoryNetworkInterface<MixMessage>,
@ -82,7 +86,7 @@ pub struct MixNode {
conn_maintenance: ConnectionMaintenance<NodeId, MockMixMessage, ChaCha12Rng>,
conn_maintenance_update_time_sender: channel::Sender<Duration>,
conn_maintenance_interval: Interval,
conn_maintenance_interval: CounterInterval,
persistent_sender: channel::Sender<Vec<u8>>,
persistent_update_time_sender: channel::Sender<Duration>,
persistent_transmission_messages: PersistentTransmissionStream<
@ -103,7 +107,9 @@ pub struct MixNode {
>,
epoch_update_sender: channel::Sender<Duration>,
slot_update_sender: channel::Sender<Duration>,
slot_update_sender_new: channel::Sender<Duration>,
cover_traffic: CoverTraffic<Epoch, Slot, MockMixMessage>,
slot_scheduler: Slot,
}
impl MixNode {
@ -156,7 +162,7 @@ impl MixNode {
let (conn_maintenance_update_time_sender, conn_maintenance_update_time_receiver) =
channel::unbounded();
let (persistent_sender, persistent_receiver) = channel::unbounded();
let conn_maintenance_interval = Interval::new(
let conn_maintenance_interval = CounterInterval::new(
settings.conn_maintenance.monitor.unwrap().time_window,
conn_maintenance_update_time_receiver,
);
@ -186,8 +192,8 @@ impl MixNode {
ChaCha12Rng::from_rng(&mut rng_generator).unwrap(),
blend_update_time_receiver,
(
1,
settings.message_blend.temporal_processor.max_delay_seconds,
0,
settings.message_blend.temporal_processor.max_delay_seconds / 2,
),
);
let blend_messages = CrossbeamReceiverStream::new(blend_receiver).blend(
@ -200,6 +206,7 @@ impl MixNode {
// tier 3 cover traffic
let (epoch_update_sender, epoch_updater_update_receiver) = channel::unbounded();
let (slot_update_sender, slot_updater_update_receiver) = channel::unbounded();
let (slot_update_sender_new, slot_updater_update_receiver_new) = channel::unbounded();
let cover_traffic: CoverTraffic<Epoch, Slot, MockMixMessage> = CoverTraffic::new(
settings.cover_traffic_settings,
Epoch::new(settings.epoch_duration, epoch_updater_update_receiver),
@ -209,8 +216,14 @@ impl MixNode {
slot_updater_update_receiver,
),
);
let slot_scheduler = Slot::new(
settings.cover_traffic_settings.slots_per_epoch,
settings.slot_duration,
slot_updater_update_receiver_new,
);
Self {
step_time: settings.step_time,
id,
network_interface,
// We're not coupling this lifespan with the steps now, but it's okay
@ -236,7 +249,9 @@ impl MixNode {
blend_messages,
epoch_update_sender,
slot_update_sender,
slot_update_sender_new,
cover_traffic,
slot_scheduler,
}
}
@ -270,6 +285,16 @@ impl MixNode {
.into_iter()
.inspect(|msg| {
self.conn_maintenance.record_effective_message(&msg.from);
let log = MessageLog {
payload_id: "received".to_string(),
step_id: self.state.step_id,
elapsed: self.step_time.mul(self.state.step_id.try_into().unwrap()),
node_id: self.id.index(),
};
tracing::info!(
"CoverMessageReceived {}",
serde_json::to_string(&log).unwrap()
);
})
// Retain only messages that have not been seen before
.filter(|msg| {
@ -297,6 +322,7 @@ impl MixNode {
self.blend_update_time_sender.send(elapsed).unwrap();
self.epoch_update_sender.send(elapsed).unwrap();
self.slot_update_sender.send(elapsed).unwrap();
self.slot_update_sender_new.send(elapsed).unwrap();
}
fn log_message_generated(&self, msg_type: &str, payload: &Payload) {
@ -311,6 +337,7 @@ impl MixNode {
let log = MessageLog {
payload_id: payload.id(),
step_id: self.state.step_id,
elapsed: self.step_time.mul(self.state.step_id.try_into().unwrap()),
node_id: self.id.index(),
};
tracing::info!("{}: {}", tag, serde_json::to_string(&log).unwrap());
@ -389,14 +416,16 @@ impl Node for MixNode {
}
// Proceed connection maintenance if interval is reached.
if let Poll::Ready(Some(_)) = pin!(&mut self.conn_maintenance_interval).poll_next(&mut cx) {
let (monitors, _, _) = self.conn_maintenance.reset().unwrap();
let effective_messages_series = Series::from_iter(
monitors
.values()
.map(|monitor| monitor.effective_messages.to_num::<u64>()),
);
self.log_monitors(&effective_messages_series);
if let Poll::Ready(Some(i)) = pin!(&mut self.conn_maintenance_interval).poll_next(&mut cx) {
if i > 0 {
let (monitors, _, _) = self.conn_maintenance.reset().unwrap();
let effective_messages_series = Series::from_iter(
monitors
.values()
.map(|monitor| monitor.effective_messages.to_num::<u64>()),
);
self.log_monitors(&effective_messages_series);
}
}
// Handle incoming messages
@ -415,6 +444,13 @@ impl Node for MixNode {
if let Poll::Ready(Some(msg)) = pin!(&mut self.blend_messages).poll_next(&mut cx) {
match msg {
MixOutgoingMessage::Outbound(msg) => {
let log = MessageLog {
payload_id: "after_blend".to_string(),
step_id: self.state.step_id,
elapsed: self.step_time.mul(self.state.step_id.try_into().unwrap()),
node_id: self.id.index(),
};
tracing::info!("CoverAfterBlend: {}", serde_json::to_string(&log).unwrap());
self.persistent_sender.send(msg).unwrap();
}
MixOutgoingMessage::FullyUnwrapped(payload) => {
@ -427,6 +463,18 @@ impl Node for MixNode {
}
// Generate a cover message probabilistically
// if let Poll::Ready(Some(slot)) = pin!(&mut self.slot_scheduler).poll_next(&mut cx) {
// // if slot == self.id.index() {
// if slot == 0 {
// let payload = Payload::new();
// self.log_message_generated("Cover", &payload);
// let message = self
// .crypto_processor
// .wrap_message(payload.as_bytes())
// .unwrap();
// self.persistent_sender.send(message).unwrap();
// }
// }
if let Poll::Ready(Some(_)) = pin!(&mut self.cover_traffic).poll_next(&mut cx) {
let payload = Payload::new();
self.log_message_generated("Cover", &payload);
@ -466,6 +514,8 @@ impl Node for MixNode {
struct MessageLog {
payload_id: PayloadId,
step_id: usize,
#[serde(with = "humantime_serde")]
elapsed: Duration,
node_id: usize,
}

View File

@ -58,7 +58,12 @@ impl TemporalRelease {
(min_delay, max_delay): (u64, u64),
) -> Self {
let mut random_sleeps = Box::new(std::iter::repeat_with(move || {
Duration::from_secs((rng.next_u64() % (max_delay + 1)).max(min_delay))
if min_delay == max_delay {
tracing::info!("Temporal release: fixed delay: {}", min_delay);
Duration::from_secs(min_delay)
} else {
Duration::from_secs(rng.next_u64() % (max_delay - min_delay) + min_delay)
}
}));
let current_sleep = random_sleeps.next().unwrap();
Self {
@ -71,6 +76,11 @@ impl TemporalRelease {
pub fn update(&mut self, elapsed: Duration) -> bool {
self.elapsed += elapsed;
if self.elapsed >= self.current_sleep {
tracing::info!(
"Temporal update: elapsed:{:?}, current_sleep:{:?}",
self.elapsed,
self.current_sleep
);
self.elapsed = Duration::from_secs(0);
self.current_sleep = self.random_sleeps.next().unwrap();
true