remove RwLock from SimmulationRunnerInner (#189)

This commit is contained in:
Al Liu 2023-06-15 17:16:28 +08:00 committed by GitHub
parent 27d9f72035
commit e57cb7b3cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 11 additions and 18 deletions

View File

@ -34,7 +34,7 @@ where
let mut node_ids: Vec<NodeId> = runner.nodes.read().iter().map(N::id).collect(); let mut node_ids: Vec<NodeId> = runner.nodes.read().iter().map(N::id).collect();
let inner_runner = runner.inner.clone(); let mut inner_runner = runner.inner;
let nodes = runner.nodes; let nodes = runner.nodes;
let (stop_tx, stop_rx) = bounded(1); let (stop_tx, stop_rx) = bounded(1);
let p = runner.producer.clone(); let p = runner.producer.clone();
@ -46,7 +46,6 @@ where
return Ok(()); return Ok(());
} }
default => { default => {
let mut inner_runner = inner_runner.write();
node_ids.shuffle(&mut inner_runner.rng); node_ids.shuffle(&mut inner_runner.rng);
for ids_chunk in node_ids.chunks(chunk_size) { for ids_chunk in node_ids.chunks(chunk_size) {
let ids: HashSet<NodeId> = ids_chunk.iter().copied().collect(); let ids: HashSet<NodeId> = ids_chunk.iter().copied().collect();

View File

@ -35,7 +35,7 @@ where
nodes: Arc::clone(&runner.nodes), nodes: Arc::clone(&runner.nodes),
}; };
let inner_runner = runner.inner.clone(); let mut inner_runner = runner.inner;
let nodes = runner.nodes; let nodes = runner.nodes;
let nodes_remaining: BTreeSet<NodeId> = (0..nodes.read().len()).map(node_id).collect(); let nodes_remaining: BTreeSet<NodeId> = (0..nodes.read().len()).map(node_id).collect();
let iterations: Vec<_> = (0..maximum_iterations).collect(); let iterations: Vec<_> = (0..maximum_iterations).collect();
@ -43,9 +43,6 @@ where
let p = runner.producer.clone(); let p = runner.producer.clone();
let p1 = runner.producer; let p1 = runner.producer;
let handle = std::thread::spawn(move || { let handle = std::thread::spawn(move || {
let mut inner_runner: parking_lot::RwLockWriteGuard<super::SimulationRunnerInner<M>> =
inner_runner.write();
'main: for chunk in iterations.chunks(update_rate) { 'main: for chunk in iterations.chunks(update_rate) {
select! { select! {
recv(stop_rx) -> _ => break 'main, recv(stop_rx) -> _ => break 'main,

View File

@ -75,7 +75,7 @@ where
nodes: Arc::clone(&runner.nodes), nodes: Arc::clone(&runner.nodes),
}; };
let inner_runner = runner.inner.clone(); let mut inner_runner = runner.inner;
let nodes = runner.nodes; let nodes = runner.nodes;
let (stop_tx, stop_rx) = bounded(1); let (stop_tx, stop_rx) = bounded(1);
let p = runner.producer.clone(); let p = runner.producer.clone();
@ -87,7 +87,6 @@ where
break; break;
} }
default => { default => {
let mut inner_runner = inner_runner.write();
let (group_index, node_id) = let (group_index, node_id) =
choose_random_layer_and_node_id(&mut inner_runner.rng, &distribution, &layers, &mut deque); choose_random_layer_and_node_id(&mut inner_runner.rng, &distribution, &layers, &mut deque);

View File

@ -97,7 +97,7 @@ pub struct SimulationRunner<M, N, R>
where where
N: Node, N: Node,
{ {
inner: Arc<RwLock<SimulationRunnerInner<M>>>, inner: SimulationRunnerInner<M>,
nodes: Arc<RwLock<Vec<N>>>, nodes: Arc<RwLock<Vec<N>>>,
runner_settings: RunnerSettings, runner_settings: RunnerSettings,
producer: StreamProducer<R>, producer: StreamProducer<R>,
@ -148,11 +148,11 @@ where
} = settings; } = settings;
Ok(Self { Ok(Self {
runner_settings, runner_settings,
inner: Arc::new(RwLock::new(SimulationRunnerInner { inner: SimulationRunnerInner {
network, network,
rng, rng,
wards, wards,
})), },
nodes, nodes,
producer, producer,
}) })

View File

@ -25,7 +25,7 @@ where
nodes: Arc::clone(&runner.nodes), nodes: Arc::clone(&runner.nodes),
}; };
let inner_runner = runner.inner.clone(); let mut inner_runner = runner.inner;
let nodes = runner.nodes; let nodes = runner.nodes;
let (stop_tx, stop_rx) = bounded(1); let (stop_tx, stop_rx) = bounded(1);
@ -39,8 +39,6 @@ where
return Ok(()); return Ok(());
} }
default => { default => {
let mut inner_runner = inner_runner.write();
// we must use a code block to make sure once the step call is finished then the write lock will be released, because in Record::try_from(&state), // we must use a code block to make sure once the step call is finished then the write lock will be released, because in Record::try_from(&state),
// we need to call the read lock, if we do not release the write lock, // we need to call the read lock, if we do not release the write lock,
// then dead lock will occur // then dead lock will occur
@ -146,10 +144,10 @@ mod tests {
let nodes = init_dummy_nodes(&node_ids, &mut network, overlay_state); let nodes = init_dummy_nodes(&node_ids, &mut network, overlay_state);
let producer = StreamProducer::default(); let producer = StreamProducer::default();
let runner: SimulationRunner<DummyMessage, DummyNode, OutData> = let mut runner: SimulationRunner<DummyMessage, DummyNode, OutData> =
SimulationRunner::new(network, nodes, producer, settings).unwrap(); SimulationRunner::new(network, nodes, producer, settings).unwrap();
let mut nodes = runner.nodes.write(); let mut nodes = runner.nodes.write();
runner.inner.write().step(&mut nodes); runner.inner.step(&mut nodes);
drop(nodes); drop(nodes);
let nodes = runner.nodes.read(); let nodes = runner.nodes.read();
@ -192,11 +190,11 @@ mod tests {
} }
network.collect_messages(); network.collect_messages();
let runner: SimulationRunner<DummyMessage, DummyNode, OutData> = let mut runner: SimulationRunner<DummyMessage, DummyNode, OutData> =
SimulationRunner::new(network, nodes, Default::default(), settings).unwrap(); SimulationRunner::new(network, nodes, Default::default(), settings).unwrap();
let mut nodes = runner.nodes.write(); let mut nodes = runner.nodes.write();
runner.inner.write().step(&mut nodes); runner.inner.step(&mut nodes);
drop(nodes); drop(nodes);
let nodes = runner.nodes.read(); let nodes = runner.nodes.read();