Parallel node init (#300)

This commit is contained in:
gusto 2023-08-10 12:56:02 +03:00 committed by GitHub
parent 78c6566d8a
commit da60d8fc95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 11 deletions

View File

@ -1,22 +1,25 @@
// std // std
use anyhow::Ok;
use serde::Serialize;
use simulations::node::carnot::{CarnotSettings, CarnotState};
use std::fs::File; use std::fs::File;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
// crates // crates
use anyhow::Ok;
use clap::Parser; use clap::Parser;
use consensus_engine::overlay::RandomBeaconState; use consensus_engine::overlay::RandomBeaconState;
use consensus_engine::{Block, View}; use consensus_engine::{Block, View};
use crossbeam::channel; use crossbeam::channel;
use parking_lot::Mutex;
use rand::rngs::SmallRng; use rand::rngs::SmallRng;
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use rand::SeedableRng; use rand::SeedableRng;
use rayon::prelude::{IntoParallelRefIterator, ParallelIterator};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize;
use simulations::network::behaviour::create_behaviours; use simulations::network::behaviour::create_behaviours;
use simulations::network::regions::{create_regions, RegionsData}; use simulations::network::regions::{create_regions, RegionsData};
use simulations::network::{InMemoryNetworkInterface, Network}; use simulations::network::{InMemoryNetworkInterface, Network};
use simulations::node::carnot::{CarnotSettings, CarnotState};
use simulations::node::{NodeId, NodeIdExt}; use simulations::node::{NodeId, NodeIdExt};
use simulations::output_processors::Record; use simulations::output_processors::Record;
use simulations::runner::{BoxedNode, SimulationRunnerHandle}; use simulations::runner::{BoxedNode, SimulationRunnerHandle};
@ -72,9 +75,9 @@ impl SimulationApp {
let regions_data = RegionsData::new(regions, behaviours); let regions_data = RegionsData::new(regions, behaviours);
let ids = node_ids.clone(); let ids = node_ids.clone();
let mut network = Network::new(regions_data, seed); let network = Arc::new(Mutex::new(Network::new(regions_data, seed)));
let nodes: Vec<BoxedNode<CarnotSettings, CarnotState>> = node_ids let nodes: Vec<BoxedNode<CarnotSettings, CarnotState>> = node_ids
.iter() .par_iter()
.copied() .copied()
.map(|node_id| { .map(|node_id| {
let (node_message_broadcast_sender, node_message_broadcast_receiver) = let (node_message_broadcast_sender, node_message_broadcast_receiver) =
@ -86,12 +89,15 @@ impl SimulationApp {
let capacity_bps = simulation_settings.node_settings.network_capacity_kbps as f32 let capacity_bps = simulation_settings.node_settings.network_capacity_kbps as f32
* 1024.0 * 1024.0
* step_time_as_second_fraction; * step_time_as_second_fraction;
let network_message_receiver = network.connect( let network_message_receiver = {
node_id, let mut network = network.lock();
capacity_bps as u32, network.connect(
node_message_receiver, node_id,
node_message_broadcast_receiver, capacity_bps as u32,
); node_message_receiver,
node_message_broadcast_receiver,
)
};
let network_interface = InMemoryNetworkInterface::new( let network_interface = InMemoryNetworkInterface::new(
node_id, node_id,
node_message_broadcast_sender, node_message_broadcast_sender,
@ -111,6 +117,7 @@ impl SimulationApp {
entropy: Box::new([0; 32]), entropy: Box::new([0; 32]),
}, },
); );
let mut rng = SmallRng::seed_from_u64(seed);
overlay_node::to_overlay_node( overlay_node::to_overlay_node(
node_id, node_id,
nodes, nodes,
@ -122,6 +129,9 @@ impl SimulationApp {
) )
}) })
.collect(); .collect();
let network = Arc::try_unwrap(network)
.expect("network is not used anywhere else")
.into_inner();
run::<_, _, _>(network, nodes, simulation_settings, stream_type)?; run::<_, _, _>(network, nodes, simulation_settings, stream_type)?;
Ok(()) Ok(())
} }

View File

@ -107,6 +107,7 @@ mod network_behaviors_serde {
} }
/// Represents node network capacity and current load in bytes. /// Represents node network capacity and current load in bytes.
#[derive(Debug)]
struct NodeNetworkCapacity { struct NodeNetworkCapacity {
capacity_bps: u32, capacity_bps: u32,
current_load: Mutex<u32>, current_load: Mutex<u32>,
@ -143,6 +144,7 @@ impl NodeNetworkCapacity {
} }
} }
#[derive(Debug)]
pub struct Network<M: std::fmt::Debug> { pub struct Network<M: std::fmt::Debug> {
pub regions: regions::RegionsData, pub regions: regions::RegionsData,
network_time: NetworkTime, network_time: NetworkTime,