From da60d8fc955f11a5eedfe84f246f6e80260e51df Mon Sep 17 00:00:00 2001 From: gusto Date: Thu, 10 Aug 2023 12:56:02 +0300 Subject: [PATCH] Parallel node init (#300) --- simulations/src/bin/app/main.rs | 32 +++++++++++++++++++++----------- simulations/src/network/mod.rs | 2 ++ 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/simulations/src/bin/app/main.rs b/simulations/src/bin/app/main.rs index 508f0bb2..d9fea65e 100644 --- a/simulations/src/bin/app/main.rs +++ b/simulations/src/bin/app/main.rs @@ -1,22 +1,25 @@ // std -use anyhow::Ok; -use serde::Serialize; -use simulations::node::carnot::{CarnotSettings, CarnotState}; use std::fs::File; use std::path::{Path, PathBuf}; +use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; // crates +use anyhow::Ok; use clap::Parser; use consensus_engine::overlay::RandomBeaconState; use consensus_engine::{Block, View}; use crossbeam::channel; +use parking_lot::Mutex; use rand::rngs::SmallRng; use rand::seq::SliceRandom; use rand::SeedableRng; +use rayon::prelude::{IntoParallelRefIterator, ParallelIterator}; use serde::de::DeserializeOwned; +use serde::Serialize; use simulations::network::behaviour::create_behaviours; use simulations::network::regions::{create_regions, RegionsData}; use simulations::network::{InMemoryNetworkInterface, Network}; +use simulations::node::carnot::{CarnotSettings, CarnotState}; use simulations::node::{NodeId, NodeIdExt}; use simulations::output_processors::Record; use simulations::runner::{BoxedNode, SimulationRunnerHandle}; @@ -72,9 +75,9 @@ impl SimulationApp { let regions_data = RegionsData::new(regions, behaviours); let ids = node_ids.clone(); - let mut network = Network::new(regions_data, seed); + let network = Arc::new(Mutex::new(Network::new(regions_data, seed))); let nodes: Vec> = node_ids - .iter() + .par_iter() .copied() .map(|node_id| { let (node_message_broadcast_sender, node_message_broadcast_receiver) = @@ -86,12 +89,15 @@ impl SimulationApp { let capacity_bps = simulation_settings.node_settings.network_capacity_kbps as f32 * 1024.0 * step_time_as_second_fraction; - let network_message_receiver = network.connect( - node_id, - capacity_bps as u32, - node_message_receiver, - node_message_broadcast_receiver, - ); + let network_message_receiver = { + let mut network = network.lock(); + network.connect( + node_id, + capacity_bps as u32, + node_message_receiver, + node_message_broadcast_receiver, + ) + }; let network_interface = InMemoryNetworkInterface::new( node_id, node_message_broadcast_sender, @@ -111,6 +117,7 @@ impl SimulationApp { entropy: Box::new([0; 32]), }, ); + let mut rng = SmallRng::seed_from_u64(seed); overlay_node::to_overlay_node( node_id, nodes, @@ -122,6 +129,9 @@ impl SimulationApp { ) }) .collect(); + let network = Arc::try_unwrap(network) + .expect("network is not used anywhere else") + .into_inner(); run::<_, _, _>(network, nodes, simulation_settings, stream_type)?; Ok(()) } diff --git a/simulations/src/network/mod.rs b/simulations/src/network/mod.rs index 6bc62a7c..6ef5f451 100644 --- a/simulations/src/network/mod.rs +++ b/simulations/src/network/mod.rs @@ -107,6 +107,7 @@ mod network_behaviors_serde { } /// Represents node network capacity and current load in bytes. +#[derive(Debug)] struct NodeNetworkCapacity { capacity_bps: u32, current_load: Mutex, @@ -143,6 +144,7 @@ impl NodeNetworkCapacity { } } +#[derive(Debug)] pub struct Network { pub regions: regions::RegionsData, network_time: NetworkTime,