add persistent transmission

This commit is contained in:
Youngjoon Lee 2024-11-06 15:03:40 +07:00
parent af4c7e7ada
commit ae7f5e437a
No known key found for this signature in database
GPG Key ID: 25CA11F37F095E5D
6 changed files with 1014 additions and 39 deletions

File diff suppressed because it is too large Load Diff

View File

@ -26,7 +26,14 @@ humantime = "2.1"
humantime-serde = "1"
once_cell = "1.17"
parking_lot = "0.12"
polars = { version = "0.27", features = ["serde", "object", "json", "csv-file", "parquet", "dtype-struct"], optional = true }
polars = { version = "0.27", features = [
"serde",
"object",
"json",
"csv-file",
"parquet",
"dtype-struct",
], optional = true }
rand = { version = "0.8", features = ["small_rng"] }
rayon = "1.7"
scopeguard = "1"
@ -34,8 +41,18 @@ serde = { version = "1.0", features = ["derive", "rc"] }
serde_with = "2.3"
serde_json = "1.0"
thiserror = "1"
tracing = { version = "0.1", default-features = false, features = ["log", "attributes"] }
tracing-subscriber = { version = "0.3", features = ["json", "env-filter", "tracing-log"]}
tracing = { version = "0.1", default-features = false, features = [
"log",
"attributes",
] }
tracing-subscriber = { version = "0.3", features = [
"json",
"env-filter",
"tracing-log",
] }
nomos-mix = { git = "https://github.com/logos-co/nomos-node", rev = "9b29c17", package = "nomos-mix" }
nomos-mix-message = { git = "https://github.com/logos-co/nomos-node", rev = "9b29c17", package = "nomos-mix-message" }
rand_chacha = "0.3.1"
[target.'cfg(target_arch = "wasm32")'.dependencies]
getrandom = { version = "0.2", features = ["js"] }

View File

@ -7,6 +7,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use anyhow::Ok;
use clap::Parser;
use crossbeam::channel;
use nomos_mix::persistent_transmission::PersistentTransmissionSettings;
use nomos_simulations_network_runner::network::behaviour::create_behaviours;
use nomos_simulations_network_runner::network::regions::{create_regions, RegionsData};
use nomos_simulations_network_runner::network::{InMemoryNetworkInterface, Network};
@ -94,6 +95,11 @@ impl SimulationApp {
.filter(|&id| id != &node_id)
.copied()
.choose_multiple(&mut rng, 3),
seed: 0,
persistent_transmission: PersistentTransmissionSettings {
max_emission_frequency: 1.0,
drop_message_probability: 0.5,
},
},
)
})

View File

@ -1,15 +1,30 @@
pub mod state;
mod step_scheduler;
mod stream_wrapper;
use super::{Node, NodeId};
use crate::network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize};
use crossbeam::channel;
use futures::Stream;
use nomos_mix::persistent_transmission::{
PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream,
};
use nomos_mix_message::mock::MockMixMessage;
use rand::SeedableRng;
use rand_chacha::ChaCha12Rng;
use serde::Deserialize;
use state::MixnodeState;
use std::time::Duration;
use std::{
pin::{self},
task::Poll,
time::Duration,
};
use step_scheduler::Interval;
use stream_wrapper::CrossbeamReceiverStream;
#[derive(Debug, Clone)]
pub enum MixMessage {
Dummy(String),
Dummy(Vec<u8>),
}
impl PayloadSize for MixMessage {
@ -21,6 +36,8 @@ impl PayloadSize for MixMessage {
#[derive(Clone, Default, Deserialize)]
pub struct MixnodeSettings {
pub connected_peers: Vec<NodeId>,
pub seed: u64,
pub persistent_transmission: PersistentTransmissionSettings,
}
/// This node implementation only used for testing different streaming implementation purposes.
@ -29,6 +46,15 @@ pub struct MixNode {
state: MixnodeState,
settings: MixnodeSettings,
network_interface: InMemoryNetworkInterface<MixMessage>,
persistent_sender: channel::Sender<Vec<u8>>,
update_time_sender: channel::Sender<Duration>,
persistent_transmission_messages: PersistentTransmissionStream<
CrossbeamReceiverStream<Vec<u8>>,
ChaCha12Rng,
MockMixMessage,
Interval,
>,
}
impl MixNode {
@ -37,15 +63,41 @@ impl MixNode {
settings: MixnodeSettings,
network_interface: InMemoryNetworkInterface<MixMessage>,
) -> Self {
let state = MixnodeState {
node_id: id,
mock_counter: 0,
step_id: 0,
};
let (persistent_sender, persistent_receiver) = channel::unbounded();
let (update_time_sender, update_time_receiver) = channel::unbounded();
let persistent_transmission_messages = CrossbeamReceiverStream::new(persistent_receiver)
.persistent_transmission(
settings.persistent_transmission,
ChaCha12Rng::seed_from_u64(settings.seed),
Interval::new(
Duration::from_secs_f64(
1.0 / settings.persistent_transmission.max_emission_frequency,
),
update_time_receiver,
),
);
Self {
id,
network_interface,
settings,
state: MixnodeState {
node_id: id,
mock_counter: 0,
step_id: 0,
},
state,
persistent_sender,
update_time_sender,
persistent_transmission_messages,
}
}
fn forward(&self, message: MixMessage) {
for node_id in self.settings.connected_peers.iter() {
self.network_interface
.send_message(*node_id, message.clone())
}
}
}
@ -63,20 +115,31 @@ impl Node for MixNode {
&self.state
}
fn step(&mut self, _: Duration) {
fn step(&mut self, elapsed: Duration) {
let Self {
update_time_sender,
persistent_transmission_messages,
..
} = self;
let messages = self.network_interface.receive_messages();
for message in messages {
println!(">>>>> Node {}, message: {message:?}", self.id);
let MixMessage::Dummy(msg) = message.into_payload();
self.persistent_sender.send(msg).unwrap();
}
self.state.step_id += 1;
self.state.mock_counter += 1;
for node_id in self.settings.connected_peers.iter() {
self.network_interface.send_message(
*node_id,
MixMessage::Dummy(format!("Hello from node: {}", self.id)),
)
update_time_sender.send(elapsed).unwrap();
let waker = futures::task::noop_waker();
let mut cx = futures::task::Context::from_waker(&waker);
if let Poll::Ready(Some(msg)) =
pin::pin!(persistent_transmission_messages).poll_next(&mut cx)
{
self.forward(MixMessage::Dummy(msg));
}
}
}

View File

@ -1,18 +1,25 @@
use chrono::format::Item;
use crossbeam::channel;
use futures::Stream;
use rand::RngCore;
use std::pin::Pin;
use std::sync::mpsc;
use std::task::{Context, Poll};
use std::time::Duration;
struct Interval {
pub struct Interval {
duration: Duration,
current_elapsed: Duration,
update_time: mpsc::Receiver<Duration>,
update_time: channel::Receiver<Duration>,
}
impl Interval {
pub fn new(duration: Duration, update_time: channel::Receiver<Duration>) -> Self {
Self {
duration,
current_elapsed: Duration::from_secs(0),
update_time,
}
}
pub fn update(&mut self, elapsed: Duration) -> bool {
self.current_elapsed += elapsed;
if self.current_elapsed >= self.duration {
@ -41,13 +48,13 @@ struct TemporalRelease {
random_sleeps: Box<dyn Iterator<Item = Duration>>,
elapsed: Duration,
current_sleep: Duration,
update_time: mpsc::Receiver<Duration>,
update_time: channel::Receiver<Duration>,
}
impl TemporalRelease {
pub fn new<Rng: RngCore + 'static>(
mut rng: Rng,
update_time: mpsc::Receiver<Duration>,
update_time: channel::Receiver<Duration>,
(min_delay, max_delay): (u64, u64),
) -> Self {
let mut random_sleeps = Box::new(std::iter::repeat_with(move || {

View File

@ -0,0 +1,29 @@
use std::{
pin::Pin,
task::{Context, Poll},
};
use crossbeam::channel;
use futures::Stream;
pub struct CrossbeamReceiverStream<T> {
receiver: channel::Receiver<T>,
}
impl<T> CrossbeamReceiverStream<T> {
pub fn new(receiver: channel::Receiver<T>) -> Self {
Self { receiver }
}
}
impl<T> Stream for CrossbeamReceiverStream<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.receiver.try_recv() {
Ok(item) => Poll::Ready(Some(item)),
Err(channel::TryRecvError::Empty) => Poll::Pending,
Err(channel::TryRecvError::Disconnected) => Poll::Ready(None),
}
}
}