diff --git a/simulations/Cargo.toml b/simulations/Cargo.toml index 7c9c407d..68946543 100644 --- a/simulations/Cargo.toml +++ b/simulations/Cargo.toml @@ -13,6 +13,7 @@ crc32fast = "1.3" crossbeam = { version = "0.8.2", features = ["crossbeam-channel"] } fixed-slice-deque = "0.1.0-beta2" nomos-core = { path = "../nomos-core" } +parking_lot = "0.12" polars = { version = "0.27", features = ["serde", "object", "json", "csv-file", "parquet", "dtype-struct"] } rand = { version = "0.8", features = ["small_rng"] } rayon = "1.7" diff --git a/simulations/src/bin/app.rs b/simulations/src/bin/app.rs index 70c5bdd7..eab07267 100644 --- a/simulations/src/bin/app.rs +++ b/simulations/src/bin/app.rs @@ -7,11 +7,12 @@ use simulations::streaming::polars::PolarsSubscriber; use std::collections::BTreeMap; use std::fs::File; use std::path::{Path, PathBuf}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; // crates use clap::Parser; use crossbeam::channel; +use parking_lot::RwLock; use rand::rngs::SmallRng; use rand::seq::SliceRandom; use rand::{Rng, SeedableRng}; diff --git a/simulations/src/node/dummy.rs b/simulations/src/node/dummy.rs index b6dcf2c4..9921e258 100644 --- a/simulations/src/node/dummy.rs +++ b/simulations/src/node/dummy.rs @@ -419,11 +419,12 @@ fn get_roles( mod tests { use std::{ collections::{BTreeMap, BTreeSet, HashMap}, - sync::{Arc, RwLock}, + sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; use crossbeam::channel; + use parking_lot::RwLock; use rand::{ rngs::{mock::StepRng, SmallRng}, Rng, SeedableRng, @@ -803,13 +804,13 @@ mod tests { let nodes = Arc::new(RwLock::new(nodes)); for _ in 0..9 { network.dispatch_after(Duration::from_millis(100)); - nodes.write().unwrap().par_iter_mut().for_each(|(_, node)| { + nodes.write().par_iter_mut().for_each(|(_, node)| { node.step(); }); network.collect_messages(); } - for (_, node) in nodes.read().unwrap().iter() { + for (_, node) in nodes.read().iter() { assert_eq!(node.current_view(), 2); } } diff --git a/simulations/src/node/mod.rs b/simulations/src/node/mod.rs index 83fcc3d6..81eec2a7 100644 --- a/simulations/src/node/mod.rs +++ b/simulations/src/node/mod.rs @@ -8,10 +8,11 @@ pub mod dummy_streaming; use std::{ collections::BTreeMap, ops::{Deref, DerefMut}, - sync::{Arc, RwLock}, + sync::Arc, time::Duration, }; // crates +use parking_lot::RwLock; use serde::{Deserialize, Serialize}; // internal use crate::overlay::{Layout, OverlaySettings, SimulationOverlay}; @@ -155,12 +156,12 @@ pub trait OverlayGetter { impl OverlayGetter for SharedState { fn get_view(&self, index: usize) -> Option { - let overlay_state = self.read().unwrap(); + let overlay_state = self.read(); overlay_state.overlays.get(&index).cloned() } fn get_all_nodes(&self) -> Vec { - let overlay_state = self.read().unwrap(); + let overlay_state = self.read(); overlay_state.all_nodes.clone() } } diff --git a/simulations/src/output_processors/mod.rs b/simulations/src/output_processors/mod.rs index 6956723c..25fb2707 100644 --- a/simulations/src/output_processors/mod.rs +++ b/simulations/src/output_processors/mod.rs @@ -22,17 +22,9 @@ where type Error = anyhow::Error; fn try_from(state: &crate::warding::SimulationState) -> Result { - serde_json::to_value( - state - .nodes - .read() - .expect("simulations: SimulationState panic when requiring a read lock") - .iter() - .map(N::state) - .collect::>(), - ) - .map(OutData::new) - .map_err(From::from) + serde_json::to_value(state.nodes.read().iter().map(N::state).collect::>()) + .map(OutData::new) + .map_err(From::from) } } diff --git a/simulations/src/runner/async_runner.rs b/simulations/src/runner/async_runner.rs index 27fd1656..1752e38e 100644 --- a/simulations/src/runner/async_runner.rs +++ b/simulations/src/runner/async_runner.rs @@ -27,13 +27,7 @@ where nodes: Arc::clone(&runner.nodes), }; - let mut node_ids: Vec = runner - .nodes - .read() - .expect("Read access to nodes vector") - .iter() - .map(N::id) - .collect(); + let mut node_ids: Vec = runner.nodes.read().iter().map(N::id).collect(); let inner_runner = runner.inner.clone(); let nodes = runner.nodes; @@ -47,13 +41,12 @@ where return Ok(()); } default => { - let mut inner_runner = inner_runner.write().expect("Write access to inner in async runner"); + let mut inner_runner = inner_runner.write(); node_ids.shuffle(&mut inner_runner.rng); for ids_chunk in node_ids.chunks(chunk_size) { let ids: HashSet = ids_chunk.iter().copied().collect(); nodes .write() - .expect("Write access to nodes vector") .par_iter_mut() .filter(|n| ids.contains(&n.id())) .for_each(N::step); diff --git a/simulations/src/runner/glauber_runner.rs b/simulations/src/runner/glauber_runner.rs index b6f49613..015608dd 100644 --- a/simulations/src/runner/glauber_runner.rs +++ b/simulations/src/runner/glauber_runner.rs @@ -31,17 +31,14 @@ where let inner_runner = runner.inner.clone(); let nodes = runner.nodes; - let nodes_remaining: BTreeSet = - (0..nodes.read().expect("Read access to nodes vector").len()) - .map(From::from) - .collect(); + let nodes_remaining: BTreeSet = (0..nodes.read().len()).map(From::from).collect(); let iterations: Vec<_> = (0..maximum_iterations).collect(); let (stop_tx, stop_rx) = bounded(1); let p = runner.producer.clone(); let p1 = runner.producer; let handle = std::thread::spawn(move || { - let mut inner_runner: std::sync::RwLockWriteGuard> = - inner_runner.write().expect("Locking runner"); + let mut inner_runner: parking_lot::RwLockWriteGuard> = + inner_runner.write(); 'main: for chunk in iterations.chunks(update_rate) { select! { @@ -57,7 +54,7 @@ where ); { - let mut shared_nodes = nodes.write().expect("Write access to nodes vector"); + let mut shared_nodes = nodes.write(); let node: &mut N = shared_nodes .get_mut(node_id.inner()) .expect("Node should be present"); diff --git a/simulations/src/runner/layered_runner.rs b/simulations/src/runner/layered_runner.rs index 699b15ae..f37755b7 100644 --- a/simulations/src/runner/layered_runner.rs +++ b/simulations/src/runner/layered_runner.rs @@ -81,7 +81,7 @@ where break; } default => { - let mut inner_runner = inner_runner.write().expect("Lock runner"); + let mut inner_runner = inner_runner.write(); let (group_index, node_id) = choose_random_layer_and_node_id(&mut inner_runner.rng, &distribution, &layers, &mut deque); @@ -89,7 +89,7 @@ where deque.get_mut(group_index).unwrap().remove(&node_id); { - let mut shared_nodes = nodes.write().expect("Write access to nodes vector"); + let mut shared_nodes = nodes.write(); let node: &mut N = shared_nodes .get_mut(node_id.inner()) .expect("Node should be present"); @@ -168,13 +168,7 @@ where // add a +1 so we always have let mut deque = FixedSliceDeque::new(gap + 1); // push first layer - let node_ids: BTreeSet = runner - .nodes - .write() - .expect("Single access to runner nodes") - .iter() - .map(|node| node.id()) - .collect(); + let node_ids: BTreeSet = runner.nodes.write().iter().map(|node| node.id()).collect(); deque.push_back(node_ids); // allocate default sets diff --git a/simulations/src/runner/mod.rs b/simulations/src/runner/mod.rs index 4e7e27f4..dc846ee9 100644 --- a/simulations/src/runner/mod.rs +++ b/simulations/src/runner/mod.rs @@ -4,12 +4,13 @@ mod layered_runner; mod sync_runner; // std -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; // crates use crate::streaming::{StreamProducer, Subscriber, SubscriberHandle}; use crossbeam::channel::Sender; +use parking_lot::RwLock; use rand::rngs::SmallRng; use rand::{RngCore, SeedableRng}; use rayon::prelude::*; diff --git a/simulations/src/runner/sync_runner.rs b/simulations/src/runner/sync_runner.rs index cf548799..ad6f1ec4 100644 --- a/simulations/src/runner/sync_runner.rs +++ b/simulations/src/runner/sync_runner.rs @@ -35,13 +35,13 @@ where return Ok(()); } default => { - let mut inner_runner = inner_runner.write().expect("Write access to inner simulation state"); + let mut inner_runner = inner_runner.write(); // we must use a code block to make sure once the step call is finished then the write lock will be released, because in Record::try_from(&state), // we need to call the read lock, if we do not release the write lock, // then dead lock will occur { - let mut nodes = nodes.write().expect("Write access to nodes vector"); + let mut nodes = nodes.write(); inner_runner.step(&mut nodes); } @@ -80,10 +80,11 @@ mod tests { streaming::StreamProducer, }; use crossbeam::channel; + use parking_lot::RwLock; use rand::rngs::mock::StepRng; use std::{ collections::{BTreeMap, HashMap}, - sync::{Arc, RwLock}, + sync::Arc, time::Duration, }; @@ -142,11 +143,11 @@ mod tests { let producer = StreamProducer::default(); let runner: SimulationRunner = SimulationRunner::new(network, nodes, producer, settings); - let mut nodes = runner.nodes.write().unwrap(); - runner.inner.write().unwrap().step(&mut nodes); + let mut nodes = runner.nodes.write(); + runner.inner.write().step(&mut nodes); drop(nodes); - let nodes = runner.nodes.read().unwrap(); + let nodes = runner.nodes.read(); for node in nodes.iter() { assert_eq!(node.current_view(), 0); } @@ -189,11 +190,11 @@ mod tests { let runner: SimulationRunner = SimulationRunner::new(network, nodes, Default::default(), settings); - let mut nodes = runner.nodes.write().unwrap(); - runner.inner.write().unwrap().step(&mut nodes); + let mut nodes = runner.nodes.write(); + runner.inner.write().step(&mut nodes); drop(nodes); - let nodes = runner.nodes.read().unwrap(); + let nodes = runner.nodes.read(); let state = nodes[1].state(); assert_eq!(state.message_count, 10); } diff --git a/simulations/src/streaming/io.rs b/simulations/src/streaming/io.rs index ef6ca755..356d46b2 100644 --- a/simulations/src/streaming/io.rs +++ b/simulations/src/streaming/io.rs @@ -1,10 +1,7 @@ -use std::{ - any::Any, - io::stdout, - sync::{Arc, Mutex}, -}; +use std::{any::Any, io::stdout, sync::Arc}; use super::{Receivers, StreamSettings, Subscriber}; +use parking_lot::Mutex; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Default, Deserialize)] @@ -98,13 +95,7 @@ where } fn sink(&self, state: Arc) -> anyhow::Result<()> { - serde_json::to_writer( - &mut *self - .writer - .lock() - .expect("fail to lock writer in io subscriber"), - &state, - )?; + serde_json::to_writer(&mut *self.writer.lock(), &state)?; Ok(()) } } @@ -135,7 +126,7 @@ mod tests { type Error = anyhow::Error; fn try_from(value: &SimulationState>) -> Result { - let nodes = value.nodes.read().expect("failed to read nodes"); + let nodes = value.nodes.read(); Ok(Self { states: nodes .iter() diff --git a/simulations/src/streaming/naive.rs b/simulations/src/streaming/naive.rs index e4275c59..0035a4f2 100644 --- a/simulations/src/streaming/naive.rs +++ b/simulations/src/streaming/naive.rs @@ -1,13 +1,13 @@ +use super::{Receivers, StreamSettings, Subscriber}; +use parking_lot::Mutex; +use serde::{Deserialize, Serialize}; use std::{ fs::{File, OpenOptions}, io::Write, path::PathBuf, - sync::{Arc, Mutex}, + sync::Arc, }; -use super::{Receivers, StreamSettings, Subscriber}; -use serde::{Deserialize, Serialize}; - #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NaiveSettings { pub path: PathBuf, @@ -94,7 +94,7 @@ where } fn sink(&self, state: Arc) -> anyhow::Result<()> { - let mut file = self.file.lock().expect("failed to lock file"); + let mut file = self.file.lock(); serde_json::to_writer(&mut *file, &state)?; file.write_all(b",\n")?; Ok(()) @@ -131,7 +131,6 @@ mod tests { states: value .nodes .read() - .expect("failed to read nodes") .iter() .map(|node| (node.id(), node.current_view())) .collect(), diff --git a/simulations/src/streaming/polars.rs b/simulations/src/streaming/polars.rs index a961d73e..d5652730 100644 --- a/simulations/src/streaming/polars.rs +++ b/simulations/src/streaming/polars.rs @@ -1,3 +1,4 @@ +use parking_lot::Mutex; use polars::prelude::*; use serde::{Deserialize, Serialize}; use std::{ @@ -5,7 +6,6 @@ use std::{ io::Cursor, path::{Path, PathBuf}, str::FromStr, - sync::Mutex, }; use super::{Receivers, StreamSettings}; @@ -72,10 +72,7 @@ where R: Serialize, { fn persist(&self) -> anyhow::Result<()> { - let data = self - .data - .lock() - .expect("failed to lock data in PolarsSubscriber pesist"); + let data = self.data.lock(); let mut cursor = Cursor::new(Vec::new()); serde_json::to_writer(&mut cursor, &*data).expect("Dump data to json "); let mut data = JsonReader::new(cursor) @@ -137,10 +134,7 @@ where } fn sink(&self, state: Arc) -> anyhow::Result<()> { - self.data - .lock() - .expect("failed to lock data in PolarsSubscriber") - .push(state); + self.data.lock().push(state); Ok(()) } } diff --git a/simulations/src/warding/minmax.rs b/simulations/src/warding/minmax.rs index bfed2080..3151242b 100644 --- a/simulations/src/warding/minmax.rs +++ b/simulations/src/warding/minmax.rs @@ -14,10 +14,7 @@ impl SimulationWard for MinMaxViewWard { fn analyze(&mut self, state: &Self::SimulationState) -> bool { let mut min = usize::MAX; let mut max = 0; - let nodes = state - .nodes - .read() - .expect("simulations: MinMaxViewWard panic when requiring a read lock"); + let nodes = state.nodes.read(); for node in nodes.iter() { let view = node.current_view(); min = min.min(view); @@ -31,7 +28,8 @@ impl SimulationWard for MinMaxViewWard { mod test { use crate::warding::minmax::MinMaxViewWard; use crate::warding::{SimulationState, SimulationWard}; - use std::sync::{Arc, RwLock}; + use parking_lot::RwLock; + use std::sync::Arc; #[test] fn rebase_threshold() { @@ -43,7 +41,7 @@ mod test { assert!(!minmax.analyze(&state)); // push a new node with 10 - state.nodes.write().unwrap().push(20); + state.nodes.write().push(20); // we now have two nodes and the max - min is 10 > max_gap 5, so true assert!(minmax.analyze(&state)); } diff --git a/simulations/src/warding/mod.rs b/simulations/src/warding/mod.rs index a1a9a4d2..5a9d588e 100644 --- a/simulations/src/warding/mod.rs +++ b/simulations/src/warding/mod.rs @@ -1,6 +1,7 @@ // std -use std::sync::{Arc, RwLock}; +use std::sync::Arc; // crates +use parking_lot::RwLock; use serde::Deserialize; // internal use crate::node::Node; diff --git a/simulations/src/warding/stalled.rs b/simulations/src/warding/stalled.rs index 19c2d6fd..7612c7a3 100644 --- a/simulations/src/warding/stalled.rs +++ b/simulations/src/warding/stalled.rs @@ -34,10 +34,7 @@ impl StalledViewWard { impl SimulationWard for StalledViewWard { type SimulationState = SimulationState; fn analyze(&mut self, state: &Self::SimulationState) -> bool { - let nodes = state - .nodes - .read() - .expect("simulations: StalledViewWard panic when requiring a read lock"); + let nodes = state.nodes.read(); self.update_state(checksum(nodes.as_slice())); self.criterion >= self.threshold } @@ -57,7 +54,8 @@ fn checksum(nodes: &[N]) -> u32 { #[cfg(test)] mod test { use super::*; - use std::sync::{Arc, RwLock}; + use parking_lot::RwLock; + use std::sync::Arc; #[test] fn rebase_threshold() { @@ -78,7 +76,7 @@ mod test { assert!(stalled.analyze(&state)); // push a new one, so the criterion is reset to 0 - state.nodes.write().unwrap().push(20); + state.nodes.write().push(20); assert!(!stalled.analyze(&state)); // increase the criterion, 2 diff --git a/simulations/src/warding/ttf.rs b/simulations/src/warding/ttf.rs index 6ab27694..c9a54597 100644 --- a/simulations/src/warding/ttf.rs +++ b/simulations/src/warding/ttf.rs @@ -15,7 +15,6 @@ impl SimulationWard for MaxViewWard { state .nodes .read() - .expect("simulations: MaxViewWard panic when requiring a read lock") .iter() .all(|n| n.current_view() >= self.max_view) } @@ -25,7 +24,8 @@ impl SimulationWard for MaxViewWard { mod test { use crate::warding::ttf::MaxViewWard; use crate::warding::{SimulationState, SimulationWard}; - use std::sync::{Arc, RwLock}; + use parking_lot::RwLock; + use std::sync::Arc; #[test] fn rebase_threshold() { @@ -37,7 +37,7 @@ mod test { }; assert!(ttf.analyze(&state)); - state.nodes.write().unwrap().push(9); + state.nodes.write().push(9); assert!(!ttf.analyze(&state)); } }