Generate data message by running stake lottery (#25)

Co-authored-by: Daniel Sanchez <3danimanimal@gmail.com>
This commit is contained in:
Youngjoon Lee 2024-11-07 13:43:46 +09:00 committed by GitHub
parent 5ab816a3b7
commit 1efc5d9c0d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 63 additions and 12 deletions

View File

@ -101,6 +101,8 @@ impl SimulationApp {
.filter(|&id| id != &node_id)
.copied()
.choose_multiple(&mut rng, 3),
data_message_lottery_interval: Duration::from_secs(20),
stake_proportion: 1.0 / node_ids.len() as f64,
seed: 0,
persistent_transmission: PersistentTransmissionSettings {
max_emission_frequency: 1.0,

View File

@ -0,0 +1,22 @@
use rand::Rng;
pub struct StakeLottery<R> {
rng: R,
stake_proportion: f64,
}
impl<R> StakeLottery<R>
where
R: Rng,
{
pub fn new(rng: R, stake_proportion: f64) -> Self {
Self {
rng,
stake_proportion,
}
}
pub fn run(&mut self) -> bool {
self.rng.gen_range(0.0..1.0) < self.stake_proportion
}
}

View File

@ -1,3 +1,4 @@
mod lottery;
mod consensus_streams;
mod scheduler;
pub mod state;
@ -7,6 +8,7 @@ use super::{Node, NodeId};
use crate::network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize};
use crossbeam::channel;
use futures::Stream;
use lottery::StakeLottery;
use multiaddr::Multiaddr;
use nomos_mix::{
membership::Membership,
@ -19,7 +21,7 @@ use nomos_mix::{
MixOutgoingMessage,
};
use nomos_mix_message::mock::MockMixMessage;
use rand::{Rng, RngCore, SeedableRng};
use rand::SeedableRng;
use rand_chacha::ChaCha12Rng;
use scheduler::{Interval, TemporalRelease};
use serde::Deserialize;
@ -43,6 +45,8 @@ impl PayloadSize for MixMessage {
#[derive(Clone, Deserialize)]
pub struct MixnodeSettings {
pub connected_peers: Vec<NodeId>,
pub data_message_lottery_interval: Duration,
pub stake_proportion: f64,
pub seed: u64,
pub persistent_transmission: PersistentTransmissionSettings,
pub message_blend: MessageBlendSettings<MockMixMessage>,
@ -56,7 +60,9 @@ pub struct MixNode {
settings: MixnodeSettings,
network_interface: InMemoryNetworkInterface<MixMessage>,
msg_gen_rng: ChaCha12Rng,
data_msg_lottery_update_time_sender: channel::Sender<Duration>,
data_msg_lottery_interval: Interval,
data_msg_lottery: StakeLottery<ChaCha12Rng>,
persistent_sender: channel::Sender<Vec<u8>>,
persistent_update_time_sender: channel::Sender<Duration>,
@ -85,6 +91,18 @@ impl MixNode {
) -> Self {
let mut rng_generator = ChaCha12Rng::seed_from_u64(settings.seed);
// Init Interval for data message lottery
let (data_msg_lottery_update_time_sender, data_msg_lottery_update_time_receiver) =
channel::unbounded();
let data_msg_lottery_interval = Interval::new(
settings.data_message_lottery_interval,
data_msg_lottery_update_time_receiver,
);
let data_msg_lottery = StakeLottery::new(
ChaCha12Rng::from_rng(&mut rng_generator).unwrap(),
settings.stake_proportion,
);
// Init Tier-1: Persistent transmission
let (persistent_sender, persistent_receiver) = channel::unbounded();
let (persistent_update_time_sender, persistent_update_time_receiver) = channel::unbounded();
@ -146,7 +164,9 @@ impl MixNode {
step_id: 0,
num_messages_broadcasted: 0,
},
msg_gen_rng: ChaCha12Rng::from_rng(&mut rng_generator).unwrap(),
data_msg_lottery_update_time_sender,
data_msg_lottery_interval,
data_msg_lottery,
persistent_sender,
persistent_update_time_sender,
persistent_transmission_messages,
@ -165,6 +185,9 @@ impl MixNode {
}
fn update_time(&mut self, elapsed: Duration) {
self.data_msg_lottery_update_time_sender
.send(elapsed)
.unwrap();
self.persistent_update_time_sender.send(elapsed).unwrap();
self.blend_update_time_sender.send(elapsed).unwrap();
}
@ -187,6 +210,8 @@ impl Node for MixNode {
self.update_time(elapsed);
let Self {
data_msg_lottery_interval,
data_msg_lottery,
persistent_sender,
persistent_transmission_messages,
crypto_processor,
@ -194,24 +219,26 @@ impl Node for MixNode {
blend_messages,
..
} = self;
let waker = futures::task::noop_waker();
let mut cx = futures::task::Context::from_waker(&waker);
// Generate a message probabilistically (1 % chance)
// TODO: Replace this with the actual cover message generation
if self.msg_gen_rng.gen_range(0..100) == 0 {
let mut payload = [0u8; 1024];
self.msg_gen_rng.fill_bytes(&mut payload);
let message = crypto_processor.wrap_message(&payload).unwrap();
persistent_sender.send(message).unwrap();
if let Poll::Ready(Some(_)) = pin::pin!(data_msg_lottery_interval).poll_next(&mut cx) {
if data_msg_lottery.run() {
// TODO: Include a meaningful information in the payload (such as, step_id) to
// measure the latency until the message reaches the last mix node.
let message = crypto_processor.wrap_message(&[1u8; 1024]).unwrap();
persistent_sender.send(message).unwrap();
}
}
// TODO: Generate cover message with probability
let messages = self.network_interface.receive_messages();
for message in messages {
println!(">>>>> Node {}, message: {message:?}", self.id);
blend_sender.send(message.into_payload().0).unwrap();
}
let waker = futures::task::noop_waker();
let mut cx = futures::task::Context::from_waker(&waker);
// Proceed message blend
if let Poll::Ready(Some(msg)) = pin::pin!(blend_messages).poll_next(&mut cx) {
match msg {