mod async_runner; mod sync_runner; // std use std::sync::Arc; use std::time::Duration; use crate::output_processors::Record; // crates use crate::streaming::{ runtime_subscriber::RuntimeSubscriber, settings_subscriber::SettingsSubscriber, StreamProducer, Subscriber, SubscriberHandle, }; use crossbeam::channel::Sender; use parking_lot::RwLock; use rand::rngs::SmallRng; use rand::{RngCore, SeedableRng}; use rayon::prelude::*; use serde::Serialize; // internal use crate::network::Network; use crate::node::Node; use crate::settings::{RunnerSettings, SimulationSettings}; use crate::warding::{SimulationState, SimulationWard, Ward}; pub type BoxedNode = Box + Send + Sync>; pub struct SimulationRunnerHandle { producer: StreamProducer, stop_tx: Sender<()>, handle: std::thread::JoinHandle>, } impl SimulationRunnerHandle { pub fn stop_after(self, duration: Duration) -> anyhow::Result<()> { std::thread::sleep(duration); self.stop() } pub fn stop(&self) -> anyhow::Result<()> { if !self.handle.is_finished() { self.stop_tx.send(())?; self.shutdown()?; } Ok(()) } pub fn subscribe>( &self, settings: S::Settings, ) -> anyhow::Result> { self.producer.subscribe(settings) } pub fn is_finished(&self) -> bool { self.handle.is_finished() } pub fn shutdown(&self) -> anyhow::Result<()> { self.producer.stop() } pub fn join(self) -> anyhow::Result<()> { self.handle.join().expect("Join simulation thread") } } pub(crate) struct SimulationRunnerInner { network: Network, wards: Vec, rng: SmallRng, } impl SimulationRunnerInner where M: std::fmt::Debug + Send + Sync + Clone, { fn check_wards(&mut self, state: &SimulationState) -> bool { self.wards .par_iter_mut() .map(|ward| ward.analyze(state)) .any(|x| x) } fn step(&mut self, nodes: &mut [BoxedNode], elapsed: Duration) { self.network.dispatch_after(elapsed); nodes.par_iter_mut().for_each(|node| { node.step(elapsed); }); self.network.collect_messages(); } } /// Encapsulation solution for the simulations runner /// Holds the network state, the simulating nodes and the simulation settings. pub struct SimulationRunner { inner: SimulationRunnerInner, nodes: Arc>>>, runner_settings: RunnerSettings, producer: StreamProducer, step_time: Duration, } impl SimulationRunner where M: std::fmt::Debug + Clone + Send + Sync + 'static, R: Record + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + Send + Sync + 'static, S: 'static, T: Serialize + Clone + 'static, { pub fn new( network: Network, nodes: Vec>, producer: StreamProducer, mut settings: SimulationSettings, ) -> anyhow::Result { let seed = settings .seed .unwrap_or_else(|| rand::thread_rng().next_u64()); settings .seed .get_or_insert_with(|| rand::thread_rng().next_u64()); // Store the settings to the producer so that we can collect them later producer.send(R::from(settings.clone()))?; let rng = SmallRng::seed_from_u64(seed); let nodes = Arc::new(RwLock::new(nodes)); let SimulationSettings { wards, node_settings: _, runner_settings, stream_settings: _, node_count: _, seed: _, network_settings: _, step_time, record_settings: _, } = settings; Ok(Self { runner_settings, inner: SimulationRunnerInner { network, rng, wards, }, nodes, producer, step_time, }) } pub fn simulate(self) -> anyhow::Result> { // init the start time let _ = *crate::START_TIME; let step_time = self.step_time; match self.runner_settings.clone() { RunnerSettings::Sync => sync_runner::simulate(self, step_time), RunnerSettings::Async { chunks } => async_runner::simulate(self, chunks, step_time), } } } impl SimulationRunner where M: std::fmt::Debug + Clone + Send + Sync + 'static, R: Record + serde::Serialize + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + Send + Sync + 'static, S: 'static, T: Serialize + Clone + 'static, { pub fn simulate_and_subscribe( self, settings: B::Settings, ) -> anyhow::Result> where B: Subscriber + Send + Sync + 'static, { let handle = self.simulate()?; let mut data_subscriber_handle = handle.subscribe::(settings)?; let mut runtime_subscriber_handle = handle.subscribe::>(Default::default())?; let mut settings_subscriber_handle = handle.subscribe::>(Default::default())?; std::thread::scope(|s| { s.spawn(move || { data_subscriber_handle.run(); }); s.spawn(move || { runtime_subscriber_handle.run(); }); s.spawn(move || { settings_subscriber_handle.run(); }); }); Ok(handle) } }