Merge branch 'master' into emission-logs

This commit is contained in:
Youngjoon Lee 2024-11-09 09:48:25 +07:00
commit 9863d7a0f0
No known key found for this signature in database
GPG Key ID: 25CA11F37F095E5D
9 changed files with 40 additions and 33 deletions

View File

@ -38,11 +38,12 @@
"stake_proportion": 1.0,
"epoch_duration": "432000s",
"slot_duration": "1s",
"slots_per_epoch": 432000,
"number_of_hops": 1,
"persistent_transmission": {
"max_emission_frequency": 1.0,
"drop_message_probability": 0.0
},
"number_of_mix_layers": 1,
"max_delay_seconds": 10,
"slots_per_epoch": 432000
"max_delay_seconds": 10
}

View File

@ -22,9 +22,9 @@ use nomos_mix::message_blend::{
};
use parking_lot::Mutex;
use rand::prelude::IteratorRandom;
use rand::rngs::SmallRng;
use rand::seq::SliceRandom;
use rand::{RngCore, SeedableRng};
use rand_chacha::ChaCha12Rng;
use serde::de::DeserializeOwned;
use serde::Serialize;
// internal
@ -73,7 +73,7 @@ impl SimulationApp {
.expect("Time went backwards")
.as_secs()
});
let mut rng = SmallRng::seed_from_u64(seed);
let mut rng = ChaCha12Rng::seed_from_u64(seed);
let mut node_ids: Vec<NodeId> = (0..settings.simulation_settings.node_count)
.map(NodeId::from_index)
.collect();
@ -123,7 +123,7 @@ impl SimulationApp {
},
cover_traffic_settings: CoverTrafficSettings {
node_id: node_id.0,
number_of_hops: settings.number_of_mix_layers,
number_of_hops: settings.number_of_hops,
slots_per_epoch: settings.slots_per_epoch,
network_size: node_ids.len(),
},

View File

@ -67,16 +67,18 @@ mod tests {
let (update_sender, update_receiver) = channel::unbounded();
let mut interval = CounterInterval::new(Duration::from_secs(1), update_receiver);
update_sender.send(Duration::from_secs(0)).unwrap();
assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(0)));
update_sender.send(Duration::from_secs(0)).unwrap();
assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Pending);
update_sender.send(Duration::from_millis(999)).unwrap();
assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Pending);
update_sender.send(Duration::from_millis(1)).unwrap();
assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(0)));
update_sender.send(Duration::from_secs(1)).unwrap();
assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(1)));
update_sender.send(Duration::from_secs(3)).unwrap();
update_sender.send(Duration::from_secs(1)).unwrap();
assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(2)));
update_sender.send(Duration::from_secs(3)).unwrap();
assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(3)));
}
#[test]
@ -87,17 +89,19 @@ mod tests {
let (update_sender, update_receiver) = channel::unbounded();
let mut slot = Slot::new(3, Duration::from_secs(1), update_receiver);
update_sender.send(Duration::from_secs(0)).unwrap();
assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Ready(Some(0)));
update_sender.send(Duration::from_secs(0)).unwrap();
assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Pending);
update_sender.send(Duration::from_millis(999)).unwrap();
assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Pending);
update_sender.send(Duration::from_millis(1)).unwrap();
assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Ready(Some(1)));
update_sender.send(Duration::from_secs(1)).unwrap();
assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Ready(Some(2)));
update_sender.send(Duration::from_secs(3)).unwrap();
assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Ready(Some(0)));
update_sender.send(Duration::from_secs(1)).unwrap();
assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Ready(Some(1)));
update_sender.send(Duration::from_secs(3)).unwrap();
assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Ready(Some(2)));
update_sender.send(Duration::from_secs(1)).unwrap();
assert_eq!(slot.poll_next_unpin(&mut cx), Poll::Ready(Some(0)));
}
}

View File

@ -211,13 +211,6 @@ impl MixNode {
}
fn forward(&mut self, message: MixMessage, exclude_node: Option<NodeId>, log: EmissionLog) {
if self
.message_cache
.cache_set(Self::sha256(&message.0), ())
.is_some()
{
return;
}
for (i, node_id) in self
.settings
.connected_peers
@ -231,6 +224,7 @@ impl MixNode {
self.network_interface
.send_message(*node_id, message.clone())
}
self.message_cache.cache_set(Self::sha256(&message.0), ());
}
fn receive(&mut self) -> Vec<NetworkMessage<MixMessage>> {
@ -262,8 +256,8 @@ impl MixNode {
self.slot_update_sender.send(elapsed).unwrap();
}
fn log_message_generated(&self, payload: &Payload) {
self.log_message("MessageGenerated", payload);
fn log_message_generated(&self, msg_type: &str, payload: &Payload) {
self.log_message(format!("{}MessageGenerated", msg_type).as_str(), payload);
}
fn log_message_fully_unwrapped(&self, payload: &Payload) {
@ -314,7 +308,7 @@ impl Node for MixNode {
if let Poll::Ready(Some(_)) = pin!(&mut self.data_msg_lottery_interval).poll_next(&mut cx) {
if self.data_msg_lottery.run() {
let payload = Payload::new();
self.log_message_generated(&payload);
self.log_message_generated("Data", &payload);
let message = self
.crypto_processor
.wrap_message(payload.as_bytes())
@ -353,7 +347,7 @@ impl Node for MixNode {
// Generate a cover message probabilistically
if let Poll::Ready(Some(_)) = pin!(&mut self.cover_traffic).poll_next(&mut cx) {
let payload = Payload::new();
self.log_message_generated(&payload);
self.log_message_generated("Cover", &payload);
let message = self
.crypto_processor
.wrap_message(payload.as_bytes())

View File

@ -15,7 +15,7 @@ impl Interval {
pub fn new(duration: Duration, update_time: channel::Receiver<Duration>) -> Self {
Self {
duration,
current_elapsed: Duration::from_secs(0),
current_elapsed: duration, // to immediately release at the interval 0
update_time,
}
}
@ -104,7 +104,7 @@ mod tests {
let (_tx, rx) = channel::unbounded();
let mut interval = Interval::new(Duration::from_secs(2), rx);
assert!(!interval.update(Duration::from_secs(0)));
assert!(interval.update(Duration::from_secs(0)));
assert!(!interval.update(Duration::from_secs(1)));
assert!(interval.update(Duration::from_secs(1)));
assert!(interval.update(Duration::from_secs(3)));
@ -118,6 +118,8 @@ mod tests {
let (tx, rx) = channel::unbounded();
let mut interval = Interval::new(Duration::from_secs(2), rx);
tx.send(Duration::from_secs(0)).unwrap();
assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Ready(Some(())));
tx.send(Duration::from_secs(0)).unwrap();
assert_eq!(interval.poll_next_unpin(&mut cx), Poll::Pending);
tx.send(Duration::from_secs(1)).unwrap();
@ -132,7 +134,7 @@ mod tests {
fn temporal_release_update() {
let (_tx, rx) = channel::unbounded();
let mut temporal_release =
TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 2));
TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1));
assert!(!temporal_release.update(Duration::from_secs(0)));
assert!(!temporal_release.update(Duration::from_millis(999)));
@ -147,7 +149,7 @@ mod tests {
let (tx, rx) = channel::unbounded();
let mut temporal_release =
TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 2));
TemporalRelease::new(rand_chacha::ChaCha8Rng::from_entropy(), rx, (1, 1));
tx.send(Duration::from_secs(0)).unwrap();
assert_eq!(temporal_release.poll_next_unpin(&mut cx), Poll::Pending);

View File

@ -11,14 +11,18 @@ pub struct SimSettings {
#[serde(deserialize_with = "deserialize_duration_with_human_time")]
pub data_message_lottery_interval: Duration,
pub stake_proportion: f64,
// For tier 3: cover traffic
#[serde(deserialize_with = "deserialize_duration_with_human_time")]
pub epoch_duration: Duration,
#[serde(deserialize_with = "deserialize_duration_with_human_time")]
pub slot_duration: Duration,
pub slots_per_epoch: usize,
pub number_of_hops: usize,
// For tier 1
pub persistent_transmission: PersistentTransmissionSettings,
// For tier 2
pub number_of_mix_layers: usize,
pub max_delay_seconds: u64,
pub slots_per_epoch: usize,
}
fn deserialize_duration_with_human_time<'de, D>(deserializer: D) -> Result<Duration, D::Error>

View File

@ -28,6 +28,7 @@ serde_with = "2.3"
serde_json = "1.0"
thiserror = "1"
tracing = { version = "0.1", default-features = false, features = ["log", "attributes"] }
rand_chacha = "0.3"
[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2", features = ["js"] }

View File

@ -12,7 +12,8 @@ use std::{
// crates
use crossbeam::channel::{self, Receiver, Sender};
use parking_lot::Mutex;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha12Rng;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
// internal
@ -284,7 +285,7 @@ where
.messages
.par_iter()
.filter(|(network_time, message)| {
let mut rng = SmallRng::seed_from_u64(self.seed);
let mut rng = ChaCha12Rng::seed_from_u64(self.seed);
self.send_or_drop_message(&mut rng, network_time, message)
})
.cloned()

View File

@ -13,8 +13,8 @@ use crate::streaming::{
};
use crossbeam::channel::Sender;
use parking_lot::RwLock;
use rand::rngs::SmallRng;
use rand::{RngCore, SeedableRng};
use rand_chacha::ChaCha12Rng;
use rayon::prelude::*;
use serde::Serialize;
@ -72,7 +72,7 @@ where
{
network: Network<M>,
wards: Vec<Ward>,
rng: SmallRng,
rng: ChaCha12Rng,
}
impl<M> SimulationRunnerInner<M>
@ -136,7 +136,7 @@ where
// Store the settings to the producer so that we can collect them later
producer.send(R::from(settings.clone()))?;
let rng = SmallRng::seed_from_u64(seed);
let rng = ChaCha12Rng::seed_from_u64(seed);
let nodes = Arc::new(RwLock::new(nodes));
let SimulationSettings {
wards,