chore: replace std locks with parking_lot locks in simulations (#141)

* replace std locks with parking_lot locks
This commit is contained in:
Al Liu 2023-05-31 12:57:42 +08:00 committed by GitHub
parent 2d60ce9921
commit d864fecd07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 60 additions and 97 deletions

View File

@ -13,6 +13,7 @@ crc32fast = "1.3"
crossbeam = { version = "0.8.2", features = ["crossbeam-channel"] }
fixed-slice-deque = "0.1.0-beta2"
nomos-core = { path = "../nomos-core" }
parking_lot = "0.12"
polars = { version = "0.27", features = ["serde", "object", "json", "csv-file", "parquet", "dtype-struct"] }
rand = { version = "0.8", features = ["small_rng"] }
rayon = "1.7"

View File

@ -7,11 +7,12 @@ use simulations::streaming::polars::PolarsSubscriber;
use std::collections::BTreeMap;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
// crates
use clap::Parser;
use crossbeam::channel;
use parking_lot::RwLock;
use rand::rngs::SmallRng;
use rand::seq::SliceRandom;
use rand::{Rng, SeedableRng};

View File

@ -419,11 +419,12 @@ fn get_roles(
mod tests {
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
sync::{Arc, RwLock},
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use crossbeam::channel;
use parking_lot::RwLock;
use rand::{
rngs::{mock::StepRng, SmallRng},
Rng, SeedableRng,
@ -803,13 +804,13 @@ mod tests {
let nodes = Arc::new(RwLock::new(nodes));
for _ in 0..9 {
network.dispatch_after(Duration::from_millis(100));
nodes.write().unwrap().par_iter_mut().for_each(|(_, node)| {
nodes.write().par_iter_mut().for_each(|(_, node)| {
node.step();
});
network.collect_messages();
}
for (_, node) in nodes.read().unwrap().iter() {
for (_, node) in nodes.read().iter() {
assert_eq!(node.current_view(), 2);
}
}

View File

@ -8,10 +8,11 @@ pub mod dummy_streaming;
use std::{
collections::BTreeMap,
ops::{Deref, DerefMut},
sync::{Arc, RwLock},
sync::Arc,
time::Duration,
};
// crates
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
// internal
use crate::overlay::{Layout, OverlaySettings, SimulationOverlay};
@ -155,12 +156,12 @@ pub trait OverlayGetter {
impl OverlayGetter for SharedState<OverlayState> {
fn get_view(&self, index: usize) -> Option<ViewOverlay> {
let overlay_state = self.read().unwrap();
let overlay_state = self.read();
overlay_state.overlays.get(&index).cloned()
}
fn get_all_nodes(&self) -> Vec<NodeId> {
let overlay_state = self.read().unwrap();
let overlay_state = self.read();
overlay_state.all_nodes.clone()
}
}

View File

@ -22,17 +22,9 @@ where
type Error = anyhow::Error;
fn try_from(state: &crate::warding::SimulationState<N>) -> Result<Self, Self::Error> {
serde_json::to_value(
state
.nodes
.read()
.expect("simulations: SimulationState panic when requiring a read lock")
.iter()
.map(N::state)
.collect::<Vec<_>>(),
)
.map(OutData::new)
.map_err(From::from)
serde_json::to_value(state.nodes.read().iter().map(N::state).collect::<Vec<_>>())
.map(OutData::new)
.map_err(From::from)
}
}

View File

@ -27,13 +27,7 @@ where
nodes: Arc::clone(&runner.nodes),
};
let mut node_ids: Vec<NodeId> = runner
.nodes
.read()
.expect("Read access to nodes vector")
.iter()
.map(N::id)
.collect();
let mut node_ids: Vec<NodeId> = runner.nodes.read().iter().map(N::id).collect();
let inner_runner = runner.inner.clone();
let nodes = runner.nodes;
@ -47,13 +41,12 @@ where
return Ok(());
}
default => {
let mut inner_runner = inner_runner.write().expect("Write access to inner in async runner");
let mut inner_runner = inner_runner.write();
node_ids.shuffle(&mut inner_runner.rng);
for ids_chunk in node_ids.chunks(chunk_size) {
let ids: HashSet<NodeId> = ids_chunk.iter().copied().collect();
nodes
.write()
.expect("Write access to nodes vector")
.par_iter_mut()
.filter(|n| ids.contains(&n.id()))
.for_each(N::step);

View File

@ -31,17 +31,14 @@ where
let inner_runner = runner.inner.clone();
let nodes = runner.nodes;
let nodes_remaining: BTreeSet<NodeId> =
(0..nodes.read().expect("Read access to nodes vector").len())
.map(From::from)
.collect();
let nodes_remaining: BTreeSet<NodeId> = (0..nodes.read().len()).map(From::from).collect();
let iterations: Vec<_> = (0..maximum_iterations).collect();
let (stop_tx, stop_rx) = bounded(1);
let p = runner.producer.clone();
let p1 = runner.producer;
let handle = std::thread::spawn(move || {
let mut inner_runner: std::sync::RwLockWriteGuard<super::SimulationRunnerInner<M>> =
inner_runner.write().expect("Locking runner");
let mut inner_runner: parking_lot::RwLockWriteGuard<super::SimulationRunnerInner<M>> =
inner_runner.write();
'main: for chunk in iterations.chunks(update_rate) {
select! {
@ -57,7 +54,7 @@ where
);
{
let mut shared_nodes = nodes.write().expect("Write access to nodes vector");
let mut shared_nodes = nodes.write();
let node: &mut N = shared_nodes
.get_mut(node_id.inner())
.expect("Node should be present");

View File

@ -81,7 +81,7 @@ where
break;
}
default => {
let mut inner_runner = inner_runner.write().expect("Lock runner");
let mut inner_runner = inner_runner.write();
let (group_index, node_id) =
choose_random_layer_and_node_id(&mut inner_runner.rng, &distribution, &layers, &mut deque);
@ -89,7 +89,7 @@ where
deque.get_mut(group_index).unwrap().remove(&node_id);
{
let mut shared_nodes = nodes.write().expect("Write access to nodes vector");
let mut shared_nodes = nodes.write();
let node: &mut N = shared_nodes
.get_mut(node_id.inner())
.expect("Node should be present");
@ -168,13 +168,7 @@ where
// add a +1 so we always have
let mut deque = FixedSliceDeque::new(gap + 1);
// push first layer
let node_ids: BTreeSet<NodeId> = runner
.nodes
.write()
.expect("Single access to runner nodes")
.iter()
.map(|node| node.id())
.collect();
let node_ids: BTreeSet<NodeId> = runner.nodes.write().iter().map(|node| node.id()).collect();
deque.push_back(node_ids);
// allocate default sets

View File

@ -4,12 +4,13 @@ mod layered_runner;
mod sync_runner;
// std
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::time::Duration;
// crates
use crate::streaming::{StreamProducer, Subscriber, SubscriberHandle};
use crossbeam::channel::Sender;
use parking_lot::RwLock;
use rand::rngs::SmallRng;
use rand::{RngCore, SeedableRng};
use rayon::prelude::*;

View File

@ -35,13 +35,13 @@ where
return Ok(());
}
default => {
let mut inner_runner = inner_runner.write().expect("Write access to inner simulation state");
let mut inner_runner = inner_runner.write();
// we must use a code block to make sure the write lock is released once the step call finishes, because Record::try_from(&state)
// needs to acquire the read lock; if the write lock is not released first,
// a deadlock will occur
{
let mut nodes = nodes.write().expect("Write access to nodes vector");
let mut nodes = nodes.write();
inner_runner.step(&mut nodes);
}
@ -80,10 +80,11 @@ mod tests {
streaming::StreamProducer,
};
use crossbeam::channel;
use parking_lot::RwLock;
use rand::rngs::mock::StepRng;
use std::{
collections::{BTreeMap, HashMap},
sync::{Arc, RwLock},
sync::Arc,
time::Duration,
};
@ -142,11 +143,11 @@ mod tests {
let producer = StreamProducer::default();
let runner: SimulationRunner<DummyMessage, DummyNode, OutData> =
SimulationRunner::new(network, nodes, producer, settings);
let mut nodes = runner.nodes.write().unwrap();
runner.inner.write().unwrap().step(&mut nodes);
let mut nodes = runner.nodes.write();
runner.inner.write().step(&mut nodes);
drop(nodes);
let nodes = runner.nodes.read().unwrap();
let nodes = runner.nodes.read();
for node in nodes.iter() {
assert_eq!(node.current_view(), 0);
}
@ -189,11 +190,11 @@ mod tests {
let runner: SimulationRunner<DummyMessage, DummyNode, OutData> =
SimulationRunner::new(network, nodes, Default::default(), settings);
let mut nodes = runner.nodes.write().unwrap();
runner.inner.write().unwrap().step(&mut nodes);
let mut nodes = runner.nodes.write();
runner.inner.write().step(&mut nodes);
drop(nodes);
let nodes = runner.nodes.read().unwrap();
let nodes = runner.nodes.read();
let state = nodes[1].state();
assert_eq!(state.message_count, 10);
}

View File

@ -1,10 +1,7 @@
use std::{
any::Any,
io::stdout,
sync::{Arc, Mutex},
};
use std::{any::Any, io::stdout, sync::Arc};
use super::{Receivers, StreamSettings, Subscriber};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Default, Deserialize)]
@ -98,13 +95,7 @@ where
}
fn sink(&self, state: Arc<Self::Record>) -> anyhow::Result<()> {
serde_json::to_writer(
&mut *self
.writer
.lock()
.expect("fail to lock writer in io subscriber"),
&state,
)?;
serde_json::to_writer(&mut *self.writer.lock(), &state)?;
Ok(())
}
}
@ -135,7 +126,7 @@ mod tests {
type Error = anyhow::Error;
fn try_from(value: &SimulationState<DummyStreamingNode<()>>) -> Result<Self, Self::Error> {
let nodes = value.nodes.read().expect("failed to read nodes");
let nodes = value.nodes.read();
Ok(Self {
states: nodes
.iter()

View File

@ -1,13 +1,13 @@
use super::{Receivers, StreamSettings, Subscriber};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::{
fs::{File, OpenOptions},
io::Write,
path::PathBuf,
sync::{Arc, Mutex},
sync::Arc,
};
use super::{Receivers, StreamSettings, Subscriber};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NaiveSettings {
pub path: PathBuf,
@ -94,7 +94,7 @@ where
}
fn sink(&self, state: Arc<Self::Record>) -> anyhow::Result<()> {
let mut file = self.file.lock().expect("failed to lock file");
let mut file = self.file.lock();
serde_json::to_writer(&mut *file, &state)?;
file.write_all(b",\n")?;
Ok(())
@ -131,7 +131,6 @@ mod tests {
states: value
.nodes
.read()
.expect("failed to read nodes")
.iter()
.map(|node| (node.id(), node.current_view()))
.collect(),

View File

@ -1,3 +1,4 @@
use parking_lot::Mutex;
use polars::prelude::*;
use serde::{Deserialize, Serialize};
use std::{
@ -5,7 +6,6 @@ use std::{
io::Cursor,
path::{Path, PathBuf},
str::FromStr,
sync::Mutex,
};
use super::{Receivers, StreamSettings};
@ -72,10 +72,7 @@ where
R: Serialize,
{
fn persist(&self) -> anyhow::Result<()> {
let data = self
.data
.lock()
.expect("failed to lock data in PolarsSubscriber pesist");
let data = self.data.lock();
let mut cursor = Cursor::new(Vec::new());
serde_json::to_writer(&mut cursor, &*data).expect("Dump data to json ");
let mut data = JsonReader::new(cursor)
@ -137,10 +134,7 @@ where
}
fn sink(&self, state: Arc<Self::Record>) -> anyhow::Result<()> {
self.data
.lock()
.expect("failed to lock data in PolarsSubscriber")
.push(state);
self.data.lock().push(state);
Ok(())
}
}

View File

@ -14,10 +14,7 @@ impl<N: Node> SimulationWard<N> for MinMaxViewWard {
fn analyze(&mut self, state: &Self::SimulationState) -> bool {
let mut min = usize::MAX;
let mut max = 0;
let nodes = state
.nodes
.read()
.expect("simulations: MinMaxViewWard panic when requiring a read lock");
let nodes = state.nodes.read();
for node in nodes.iter() {
let view = node.current_view();
min = min.min(view);
@ -31,7 +28,8 @@ impl<N: Node> SimulationWard<N> for MinMaxViewWard {
mod test {
use crate::warding::minmax::MinMaxViewWard;
use crate::warding::{SimulationState, SimulationWard};
use std::sync::{Arc, RwLock};
use parking_lot::RwLock;
use std::sync::Arc;
#[test]
fn rebase_threshold() {
@ -43,7 +41,7 @@ mod test {
assert!(!minmax.analyze(&state));
// push a new node with 10
state.nodes.write().unwrap().push(20);
state.nodes.write().push(20);
// we now have two nodes and the max - min is 10 > max_gap 5, so true
assert!(minmax.analyze(&state));
}

View File

@ -1,6 +1,7 @@
// std
use std::sync::{Arc, RwLock};
use std::sync::Arc;
// crates
use parking_lot::RwLock;
use serde::Deserialize;
// internal
use crate::node::Node;

View File

@ -34,10 +34,7 @@ impl StalledViewWard {
impl<N: Node> SimulationWard<N> for StalledViewWard {
type SimulationState = SimulationState<N>;
fn analyze(&mut self, state: &Self::SimulationState) -> bool {
let nodes = state
.nodes
.read()
.expect("simulations: StalledViewWard panic when requiring a read lock");
let nodes = state.nodes.read();
self.update_state(checksum(nodes.as_slice()));
self.criterion >= self.threshold
}
@ -57,7 +54,8 @@ fn checksum<N: Node>(nodes: &[N]) -> u32 {
#[cfg(test)]
mod test {
use super::*;
use std::sync::{Arc, RwLock};
use parking_lot::RwLock;
use std::sync::Arc;
#[test]
fn rebase_threshold() {
@ -78,7 +76,7 @@ mod test {
assert!(stalled.analyze(&state));
// push a new one, so the criterion is reset to 0
state.nodes.write().unwrap().push(20);
state.nodes.write().push(20);
assert!(!stalled.analyze(&state));
// increase the criterion, 2

View File

@ -15,7 +15,6 @@ impl<N: Node> SimulationWard<N> for MaxViewWard {
state
.nodes
.read()
.expect("simulations: MaxViewWard panic when requiring a read lock")
.iter()
.all(|n| n.current_view() >= self.max_view)
}
@ -25,7 +24,8 @@ impl<N: Node> SimulationWard<N> for MaxViewWard {
mod test {
use crate::warding::ttf::MaxViewWard;
use crate::warding::{SimulationState, SimulationWard};
use std::sync::{Arc, RwLock};
use parking_lot::RwLock;
use std::sync::Arc;
#[test]
fn rebase_threshold() {
@ -37,7 +37,7 @@ mod test {
};
assert!(ttf.analyze(&state));
state.nodes.write().unwrap().push(9);
state.nodes.write().push(9);
assert!(!ttf.analyze(&state));
}
}