Network interface in the mixnode

This commit is contained in:
Gusto 2024-11-05 13:49:46 +02:00
parent 92a77fdff7
commit f63db0e46f
No known key found for this signature in database
6 changed files with 68 additions and 30 deletions

View File

@ -6,13 +6,14 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
// crates // crates
use anyhow::Ok; use anyhow::Ok;
use clap::Parser; use clap::Parser;
use crossbeam::channel;
use nomos_simulations_network_runner::network::behaviour::create_behaviours; use nomos_simulations_network_runner::network::behaviour::create_behaviours;
use nomos_simulations_network_runner::network::regions::{create_regions, RegionsData}; use nomos_simulations_network_runner::network::regions::{create_regions, RegionsData};
use nomos_simulations_network_runner::network::Network; use nomos_simulations_network_runner::network::{InMemoryNetworkInterface, Network};
use nomos_simulations_network_runner::node::mix::{ use nomos_simulations_network_runner::node::mix::{
MixMessage, MixNode, MixNodeState, MixnodeSettings, MixMessage, MixNode, MixNodeState, MixnodeSettings,
}; };
use nomos_simulations_network_runner::node::{Node, NodeId, NodeIdExt}; use nomos_simulations_network_runner::node::{NodeId, NodeIdExt};
use nomos_simulations_network_runner::output_processors::{OutData, Record}; use nomos_simulations_network_runner::output_processors::{OutData, Record};
use nomos_simulations_network_runner::runner::{BoxedNode, SimulationRunnerHandle}; use nomos_simulations_network_runner::runner::{BoxedNode, SimulationRunnerHandle};
#[cfg(feature = "polars")] #[cfg(feature = "polars")]
@ -25,7 +26,6 @@ use rand::prelude::IteratorRandom;
use rand::rngs::SmallRng; use rand::rngs::SmallRng;
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use rand::SeedableRng; use rand::SeedableRng;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
// internal // internal
@ -46,8 +46,6 @@ pub struct SimulationApp {
#[clap(long, default_value = "stdout")] #[clap(long, default_value = "stdout")]
log_to: log::LogOutput, log_to: log::LogOutput,
#[clap(long)] #[clap(long)]
dump_overlay_info: bool,
#[clap(long)]
no_netcap: bool, no_netcap: bool,
} }
@ -58,7 +56,6 @@ impl SimulationApp {
stream_type, stream_type,
log_format: _, log_format: _,
log_to: _, log_to: _,
dump_overlay_info,
no_netcap, no_netcap,
} = self; } = self;
let simulation_settings: SimulationSettings = load_json_from_file(&input_settings)?; let simulation_settings: SimulationSettings = load_json_from_file(&input_settings)?;
@ -82,23 +79,16 @@ impl SimulationApp {
let ids = node_ids.clone(); let ids = node_ids.clone();
let network = Arc::new(Mutex::new(Network::<MixMessage>::new(regions_data, seed))); let network = Arc::new(Mutex::new(Network::<MixMessage>::new(regions_data, seed)));
// if dump_overlay_info {
// dump_json_to_file(
// Path::new("overlay_info.json"),
// &overlay_node::overlay_info(
// node_ids.clone(),
// node_ids.first().copied().unwrap(),
// &simulation_settings.overlay_settings,
// ),
// )?;
// }
let nodes: Vec<_> = node_ids let nodes: Vec<_> = node_ids
.iter() .iter()
.copied() .copied()
.map(|node_id| { .map(|node_id| {
let mut network = network.lock();
create_boxed_mixnode( create_boxed_mixnode(
node_id, node_id,
&mut network,
simulation_settings.clone(),
no_netcap,
MixnodeSettings { MixnodeSettings {
connected_peers: ids connected_peers: ids
.iter() .iter()
@ -119,9 +109,39 @@ impl SimulationApp {
fn create_boxed_mixnode( fn create_boxed_mixnode(
node_id: NodeId, node_id: NodeId,
settings: MixnodeSettings, network: &mut Network<MixMessage>,
simulation_settings: SimulationSettings,
no_netcap: bool,
mixnode_settings: MixnodeSettings,
) -> BoxedNode<MixnodeSettings, MixNodeState> { ) -> BoxedNode<MixnodeSettings, MixNodeState> {
Box::new(MixNode::new(node_id, settings)) let (node_message_broadcast_sender, node_message_broadcast_receiver) = channel::unbounded();
let (node_message_sender, node_message_receiver) = channel::unbounded();
// Dividing milliseconds in second by milliseconds in the step.
let step_time_as_second_fraction =
simulation_settings.step_time.subsec_millis() as f32 / 1_000_000_f32;
let capacity_bps = if no_netcap {
None
} else {
simulation_settings
.node_settings
.network_capacity_kbps
.map(|c| (c as f32 * 1024.0 * step_time_as_second_fraction) as u32)
};
let network_message_receiver = {
network.connect(
node_id,
capacity_bps,
node_message_receiver,
node_message_broadcast_receiver,
)
};
let network_interface = InMemoryNetworkInterface::new(
node_id,
node_message_broadcast_sender,
node_message_sender,
network_message_receiver,
);
Box::new(MixNode::new(node_id, mixnode_settings, network_interface))
} }
fn run<M: std::fmt::Debug, S, T>( fn run<M: std::fmt::Debug, S, T>(
@ -190,11 +210,6 @@ fn load_json_from_file<T: DeserializeOwned>(path: &Path) -> anyhow::Result<T> {
Ok(serde_json::from_reader(f)?) Ok(serde_json::from_reader(f)?)
} }
fn dump_json_to_file<T: Serialize>(path: &Path, data: &T) -> anyhow::Result<()> {
let f = File::create(path).map_err(Box::new)?;
Ok(serde_json::to_writer(f, data)?)
}
fn main() -> anyhow::Result<()> { fn main() -> anyhow::Result<()> {
let app: SimulationApp = SimulationApp::parse(); let app: SimulationApp = SimulationApp::parse();
log::config_tracing(app.log_format, &app.log_to); log::config_tracing(app.log_format, &app.log_to);

View File

@ -1,6 +1,8 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::time::Duration; use std::time::Duration;
use crate::network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize};
use super::{Node, NodeId}; use super::{Node, NodeId};
#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)] #[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)]
@ -13,6 +15,12 @@ pub enum MixMessage {
Dummy(String), Dummy(String),
} }
impl PayloadSize for MixMessage {
fn size_bytes(&self) -> u32 {
todo!()
}
}
pub struct MixnodeSettings { pub struct MixnodeSettings {
pub connected_peers: Vec<NodeId>, pub connected_peers: Vec<NodeId>,
} }
@ -21,15 +29,21 @@ pub struct MixnodeSettings {
pub struct MixNode { pub struct MixNode {
id: NodeId, id: NodeId,
state: MixNodeState, state: MixNodeState,
settings: MixnodeSettings, _settings: MixnodeSettings,
network_interface: InMemoryNetworkInterface<MixMessage>,
} }
impl MixNode { impl MixNode {
pub fn new(id: NodeId, settings: MixnodeSettings) -> Self { pub fn new(
id: NodeId,
settings: MixnodeSettings,
network_interface: InMemoryNetworkInterface<MixMessage>,
) -> Self {
Self { Self {
id, id,
network_interface,
_settings: settings,
state: MixNodeState::default(), state: MixNodeState::default(),
settings,
} }
} }
} }
@ -48,6 +62,12 @@ impl Node for MixNode {
} }
fn step(&mut self, _: Duration) { fn step(&mut self, _: Duration) {
todo!() let _messages = self.network_interface.receive_messages();
// Do stuff on the messages;
// Network interface can be passed into the functions for outputting the messages:
// ```rust
// self.network_interface.send_message(receiving_node_id, payload);
// ```
} }
} }

View File

@ -4,7 +4,6 @@ pub mod mix;
// std // std
use std::{ use std::{
collections::HashMap,
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
sync::Arc, sync::Arc,
time::Duration, time::Duration,

View File

@ -166,6 +166,7 @@ where
match self.runner_settings.clone() { match self.runner_settings.clone() {
RunnerSettings::Sync => sync_runner::simulate(self, step_time), RunnerSettings::Sync => sync_runner::simulate(self, step_time),
RunnerSettings::Async { chunks } => async_runner::simulate(self, chunks, step_time),
} }
} }
} }

View File

@ -9,6 +9,9 @@ use serde::{Deserialize, Serialize};
pub enum RunnerSettings { pub enum RunnerSettings {
#[default] #[default]
Sync, Sync,
Async {
chunks: usize,
},
} }
#[derive(Clone, Debug, Default, Serialize, Deserialize)] #[derive(Clone, Debug, Default, Serialize, Deserialize)]

View File

@ -11,7 +11,7 @@ pub struct MaxViewWard {
impl<S, T> SimulationWard<S, T> for MaxViewWard { impl<S, T> SimulationWard<S, T> for MaxViewWard {
type SimulationState = SimulationState<S, T>; type SimulationState = SimulationState<S, T>;
fn analyze(&mut self, state: &Self::SimulationState) -> bool { fn analyze(&mut self, _state: &Self::SimulationState) -> bool {
// state.nodes.read().iter(); // state.nodes.read().iter();
//.all(|n| n.current_view() >= self.max_count) //.all(|n| n.current_view() >= self.max_count)
todo!() todo!()