Empty mixnode in sim binary (#16)

* Empty mixnode in sim binary

* Add mixnode settings

* Network interface in the mixnode

* Runnable empty mixnode sim

* Use network interface and add readme

---------

Co-authored-by: Daniel Sanchez Quiros <sanchez.quiros.daniel@gmail.com>
This commit is contained in:
gusto 2024-11-06 04:40:38 +02:00 committed by GitHub
parent c1b6649bd2
commit 20f23f09ea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 165 additions and 66 deletions

8
network-runner/README.md Normal file
View File

@ -0,0 +1,8 @@
# Network Simulator
## Running simulations
To run the simulation use this command line:
```bash
cargo run -- --input-settings config/mixnode.json
```

View File

@ -17,9 +17,6 @@
"asia": 0.3
}
},
"overlay_settings": {
"number_of_committees": 3
},
"node_settings": {
"timeout": "1000ms"
},
@ -28,25 +25,7 @@
"stream_settings": {
"path": "test.csv"
},
"node_count": 3000,
"views_count": 3,
"leaders_count": 1,
"node_count": 3,
"seed": 0,
"wards": [
{"max_view": 1}
],
"record_settings": {
"node_id": true,
"current_view": true,
"highest_voted_view": true,
"local_high_qc": true,
"safe_blocks": true,
"last_view_timeout_qc": true,
"latest_committed_block": true,
"latest_committed_view": true,
"root_committee": true,
"parent_committee": true,
"child_committees": true,
"committed_blocks": true
}
"record_settings": {}
}

View File

@ -6,10 +6,14 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
// crates
use anyhow::Ok;
use clap::Parser;
use crossbeam::channel;
use nomos_simulations_network_runner::network::behaviour::create_behaviours;
use nomos_simulations_network_runner::network::regions::{create_regions, RegionsData};
use nomos_simulations_network_runner::network::Network;
use nomos_simulations_network_runner::node::NodeId;
use nomos_simulations_network_runner::network::{InMemoryNetworkInterface, Network};
use nomos_simulations_network_runner::node::mix::{
MixMessage, MixNode, MixNodeState, MixnodeSettings,
};
use nomos_simulations_network_runner::node::{NodeId, NodeIdExt};
use nomos_simulations_network_runner::output_processors::{OutData, Record};
use nomos_simulations_network_runner::runner::{BoxedNode, SimulationRunnerHandle};
#[cfg(feature = "polars")]
@ -18,10 +22,10 @@ use nomos_simulations_network_runner::streaming::{
io::IOSubscriber, naive::NaiveSubscriber, StreamType,
};
use parking_lot::Mutex;
use rand::prelude::IteratorRandom;
use rand::rngs::SmallRng;
use rand::seq::SliceRandom;
use rand::SeedableRng;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use serde::de::DeserializeOwned;
use serde::Serialize;
// internal
@ -42,8 +46,6 @@ pub struct SimulationApp {
#[clap(long, default_value = "stdout")]
log_to: log::LogOutput,
#[clap(long)]
dump_overlay_info: bool,
#[clap(long)]
no_netcap: bool,
}
@ -54,7 +56,6 @@ impl SimulationApp {
stream_type,
log_format: _,
log_to: _,
dump_overlay_info,
no_netcap,
} = self;
let simulation_settings: SimulationSettings = load_json_from_file(&input_settings)?;
@ -67,7 +68,7 @@ impl SimulationApp {
});
let mut rng = SmallRng::seed_from_u64(seed);
let mut node_ids: Vec<NodeId> = (0..simulation_settings.node_count)
.map(|_| todo!())
.map(NodeId::from_index)
.collect();
node_ids.shuffle(&mut rng);
@ -76,40 +77,81 @@ impl SimulationApp {
let regions_data = RegionsData::new(regions, behaviours);
let ids = node_ids.clone();
let network = Arc::new(Mutex::new(Network::<()>::new(regions_data, seed)));
let network = Arc::new(Mutex::new(Network::<MixMessage>::new(regions_data, seed)));
// if dump_overlay_info {
// dump_json_to_file(
// Path::new("overlay_info.json"),
// &overlay_node::overlay_info(
// node_ids.clone(),
// node_ids.first().copied().unwrap(),
// &simulation_settings.overlay_settings,
// ),
// )?;
// }
// let nodes: Vec<BoxedNode<_, _>> = node_ids
// .par_iter()
// .copied()
// .map(|node_id| todo!())
// .collect();
// let network = Arc::try_unwrap(network)
// .expect("network is not used anywhere else")
// .into_inner();
// run::<_, _, _>(network, nodes, simulation_settings, stream_type)?;
let nodes: Vec<_> = node_ids
.iter()
.copied()
.map(|node_id| {
let mut network = network.lock();
create_boxed_mixnode(
node_id,
&mut network,
simulation_settings.clone(),
no_netcap,
MixnodeSettings {
connected_peers: ids
.iter()
.filter(|&id| id != &node_id)
.copied()
.choose_multiple(&mut rng, 3),
},
)
})
.collect();
let network = Arc::try_unwrap(network)
.expect("network is not used anywhere else")
.into_inner();
run::<_, _, _>(network, nodes, simulation_settings, stream_type)?;
Ok(())
}
}
fn run<M: std::fmt::Debug, S, T>(
fn create_boxed_mixnode(
node_id: NodeId,
network: &mut Network<MixMessage>,
simulation_settings: SimulationSettings,
no_netcap: bool,
mixnode_settings: MixnodeSettings,
) -> BoxedNode<MixnodeSettings, MixNodeState> {
let (node_message_broadcast_sender, node_message_broadcast_receiver) = channel::unbounded();
let (node_message_sender, node_message_receiver) = channel::unbounded();
// Dividing milliseconds in second by milliseconds in the step.
let step_time_as_second_fraction =
simulation_settings.step_time.subsec_millis() as f32 / 1_000_000_f32;
let capacity_bps = if no_netcap {
None
} else {
simulation_settings
.node_settings
.network_capacity_kbps
.map(|c| (c as f32 * 1024.0 * step_time_as_second_fraction) as u32)
};
let network_message_receiver = {
network.connect(
node_id,
capacity_bps,
node_message_receiver,
node_message_broadcast_receiver,
)
};
let network_interface = InMemoryNetworkInterface::new(
node_id,
node_message_broadcast_sender,
node_message_sender,
network_message_receiver,
);
Box::new(MixNode::new(node_id, mixnode_settings, network_interface))
}
fn run<M, S, T>(
network: Network<M>,
nodes: Vec<BoxedNode<S, T>>,
settings: SimulationSettings,
stream_type: Option<StreamType>,
) -> anyhow::Result<()>
where
M: Clone + Send + Sync + 'static,
M: std::fmt::Debug + Clone + Send + Sync + 'static,
S: 'static,
T: Serialize + Clone + 'static,
{
@ -168,11 +210,6 @@ fn load_json_from_file<T: DeserializeOwned>(path: &Path) -> anyhow::Result<T> {
Ok(serde_json::from_reader(f)?)
}
fn dump_json_to_file<T: Serialize>(path: &Path, data: &T) -> anyhow::Result<()> {
let f = File::create(path).map_err(Box::new)?;
Ok(serde_json::to_writer(f, data)?)
}
fn main() -> anyhow::Result<()> {
let app: SimulationApp = SimulationApp::parse();
log::config_tracing(app.log_format, &app.log_to);

View File

@ -0,0 +1,77 @@
use super::{Node, NodeId};
use crate::network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize};
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)]
pub struct MixNodeState {
pub mock_counter: usize,
}
#[derive(Debug, Clone)]
pub enum MixMessage {
Dummy(String),
}
impl PayloadSize for MixMessage {
fn size_bytes(&self) -> u32 {
2208
}
}
pub struct MixnodeSettings {
pub connected_peers: Vec<NodeId>,
}
/// This node implementation only used for testing different streaming implementation purposes.
pub struct MixNode {
id: NodeId,
state: MixNodeState,
settings: MixnodeSettings,
network_interface: InMemoryNetworkInterface<MixMessage>,
}
impl MixNode {
pub fn new(
id: NodeId,
settings: MixnodeSettings,
network_interface: InMemoryNetworkInterface<MixMessage>,
) -> Self {
Self {
id,
network_interface,
settings,
state: MixNodeState::default(),
}
}
}
impl Node for MixNode {
type Settings = MixnodeSettings;
type State = MixNodeState;
fn id(&self) -> NodeId {
self.id
}
fn state(&self) -> &Self::State {
&self.state
}
fn step(&mut self, _: Duration) {
let messages = self.network_interface.receive_messages();
for message in messages {
println!(">>>>> Node {}, message: {message:?}", self.id);
}
self.state.mock_counter += 1;
for node_id in self.settings.connected_peers.iter() {
self.network_interface.send_message(
*node_id,
MixMessage::Dummy(format!("Hello from node: {}", self.id)),
)
}
}
}

View File

@ -1,9 +1,9 @@
#[cfg(test)]
pub mod dummy_streaming;
pub mod mix;
// std
use std::{
collections::HashMap,
ops::{Deref, DerefMut},
sync::Arc,
time::Duration,

View File

@ -134,14 +134,11 @@ where
let nodes = Arc::new(RwLock::new(nodes));
let SimulationSettings {
wards,
overlay_settings: _,
node_settings: _,
runner_settings,
stream_settings: _,
node_count: _,
seed: _,
views_count: _,
leaders_count: _,
network_settings: _,
step_time,
record_settings: _,
@ -166,6 +163,7 @@ where
match self.runner_settings.clone() {
RunnerSettings::Sync => sync_runner::simulate(self, step_time),
RunnerSettings::Async { chunks } => async_runner::simulate(self, chunks, step_time),
}
}
}

View File

@ -9,6 +9,9 @@ use serde::{Deserialize, Serialize};
pub enum RunnerSettings {
#[default]
Sync,
Async {
chunks: usize,
},
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
@ -44,7 +47,6 @@ pub struct SimulationSettings {
#[serde(default)]
pub record_settings: BTreeMap<String, bool>,
pub network_settings: NetworkSettings,
pub overlay_settings: OverlaySettings,
pub node_settings: NodeSettings,
#[serde(default)]
pub runner_settings: RunnerSettings,
@ -52,7 +54,5 @@ pub struct SimulationSettings {
#[serde(with = "humantime_serde")]
pub step_time: std::time::Duration,
pub node_count: usize,
pub views_count: usize,
pub leaders_count: usize,
pub seed: Option<u64>,
}

View File

@ -11,7 +11,7 @@ pub struct MaxViewWard {
impl<S, T> SimulationWard<S, T> for MaxViewWard {
type SimulationState = SimulationState<S, T>;
fn analyze(&mut self, state: &Self::SimulationState) -> bool {
fn analyze(&mut self, _state: &Self::SimulationState) -> bool {
// state.nodes.read().iter();
//.all(|n| n.current_view() >= self.max_count)
todo!()