Use par_iter in network module (#116)

* Use par_iter in network module

* Pass rng from filter function

* Remove locking for network messages
This commit is contained in:
gusto 2023-04-13 17:24:27 +03:00 committed by GitHub
parent a87418c908
commit 2206d7a291
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 71 additions and 52 deletions

View File

@ -6,14 +6,15 @@ use std::{
}; };
// crates // crates
use crossbeam::channel::{self, Receiver, Sender}; use crossbeam::channel::{self, Receiver, Sender};
use rand::Rng; use rand::{rngs::ThreadRng, Rng};
use rayon::prelude::*;
// internal // internal
use crate::node::NodeId; use crate::node::NodeId;
pub mod behaviour; pub mod behaviour;
pub mod regions; pub mod regions;
pub type NetworkTime = Instant; type NetworkTime = Instant;
pub struct Network<M> { pub struct Network<M> {
pub regions: regions::RegionsData, pub regions: regions::RegionsData,
@ -25,7 +26,7 @@ pub struct Network<M> {
impl<M> Network<M> impl<M> Network<M>
where where
M: Clone, M: Send + Sync + Clone,
{ {
pub fn new(regions: regions::RegionsData) -> Self { pub fn new(regions: regions::RegionsData) -> Self {
Self { Self {
@ -62,39 +63,64 @@ where
} }
/// Collects and dispatches messages to connected interfaces. /// Collects and dispatches messages to connected interfaces.
pub fn step<R: Rng>(&mut self, rng: &mut R, time_passed: Duration) { pub fn step(&mut self, time_passed: Duration) {
self.collect_messages(); self.collect_messages();
self.dispatch_after(rng, time_passed); self.dispatch_after(time_passed);
} }
/// Receive and store all messages from nodes. /// Receive and store all messages from nodes.
pub fn collect_messages(&mut self) { pub fn collect_messages(&mut self) {
self.from_node_receivers.iter().for_each(|(_, from_node)| { let mut new_messages = self
while let Ok(message) = from_node.try_recv() { .from_node_receivers
self.messages.push((self.network_time, message)); .par_iter()
} .flat_map(|(_, from_node)| {
}); from_node
.try_iter()
.map(|msg| (self.network_time, msg))
.collect::<Vec<_>>()
})
.collect();
self.messages.append(&mut new_messages);
} }
/// Reiterate all messages and send to appropriate nodes if simulated /// Reiterate all messages and send to appropriate nodes if simulated
/// delay has passed. /// delay has passed.
pub fn dispatch_after<R: Rng>(&mut self, rng: &mut R, time_passed: Duration) { pub fn dispatch_after(&mut self, time_passed: Duration) {
self.network_time += time_passed; self.network_time += time_passed;
let mut delayed = vec![]; let delayed = self
while let Some((network_time, message)) = self.messages.pop() { .messages
// If cost is None, message won't get sent nor it will be .par_iter()
// readded to the pending messages list. .filter(|(network_time, message)| {
let mut rng = ThreadRng::default();
self.send_or_drop_message(&mut rng, network_time, message)
})
.cloned()
.collect();
self.messages = delayed;
}
/// Returns true if message needs to be delayed and be dispatched in future.
fn send_or_drop_message<R: Rng>(
&self,
rng: &mut R,
network_time: &NetworkTime,
message: &NetworkMessage<M>,
) -> bool {
if let Some(delay) = self.send_message_cost(rng, message.from, message.to) { if let Some(delay) = self.send_message_cost(rng, message.from, message.to) {
if network_time.add(delay) <= self.network_time { if network_time.add(delay) <= self.network_time {
let to_node = self.to_node_senders.get(&message.to).unwrap(); let to_node = self.to_node_senders.get(&message.to).unwrap();
to_node.send(message).expect("Node should have connection"); to_node
.send(message.clone())
.expect("Node should have connection");
return false;
} else { } else {
delayed.push((network_time, message)); return true;
} }
} }
} false
self.messages = delayed;
} }
} }
@ -127,7 +153,6 @@ mod tests {
}; };
use crate::node::NodeId; use crate::node::NodeId;
use crossbeam::channel::{self, Receiver, Sender}; use crossbeam::channel::{self, Receiver, Sender};
use rand::rngs::mock::StepRng;
use std::{collections::HashMap, time::Duration}; use std::{collections::HashMap, time::Duration};
struct MockNetworkInterface { struct MockNetworkInterface {
@ -165,7 +190,6 @@ mod tests {
#[test] #[test]
fn send_receive_messages() { fn send_receive_messages() {
let mut rng = StepRng::new(1, 0);
let node_a = 0.into(); let node_a = 0.into();
let node_b = 1.into(); let node_b = 1.into();
@ -191,15 +215,15 @@ mod tests {
assert_eq!(a.receive_messages().len(), 0); assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 0); assert_eq!(b.receive_messages().len(), 0);
network.step(&mut rng, Duration::from_millis(0)); network.step(Duration::from_millis(0));
assert_eq!(a.receive_messages().len(), 0); assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 0); assert_eq!(b.receive_messages().len(), 0);
network.step(&mut rng, Duration::from_millis(100)); network.step(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 0); assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 1); assert_eq!(b.receive_messages().len(), 1);
network.step(&mut rng, Duration::from_millis(100)); network.step(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 0); assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 0); assert_eq!(b.receive_messages().len(), 0);
@ -208,14 +232,13 @@ mod tests {
b.send_message(node_a, ()); b.send_message(node_a, ());
network.collect_messages(); network.collect_messages();
network.dispatch_after(&mut rng, Duration::from_millis(100)); network.dispatch_after(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 3); assert_eq!(a.receive_messages().len(), 3);
assert_eq!(b.receive_messages().len(), 0); assert_eq!(b.receive_messages().len(), 0);
} }
#[test] #[test]
fn regions_send_receive_messages() { fn regions_send_receive_messages() {
let mut rng = StepRng::new(1, 0);
let node_a = 0.into(); let node_a = 0.into();
let node_b = 1.into(); let node_b = 1.into();
let node_c = 2.into(); let node_c = 2.into();
@ -265,7 +288,7 @@ mod tests {
c.send_message(node_b, ()); c.send_message(node_b, ());
network.collect_messages(); network.collect_messages();
network.dispatch_after(&mut rng, Duration::from_millis(100)); network.dispatch_after(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 1); assert_eq!(a.receive_messages().len(), 1);
assert_eq!(b.receive_messages().len(), 1); assert_eq!(b.receive_messages().len(), 1);
assert_eq!(c.receive_messages().len(), 0); assert_eq!(c.receive_messages().len(), 0);
@ -274,12 +297,12 @@ mod tests {
b.send_message(node_c, ()); b.send_message(node_c, ());
network.collect_messages(); network.collect_messages();
network.dispatch_after(&mut rng, Duration::from_millis(400)); network.dispatch_after(Duration::from_millis(400));
assert_eq!(a.receive_messages().len(), 1); // c to a assert_eq!(a.receive_messages().len(), 1); // c to a
assert_eq!(b.receive_messages().len(), 2); // c to b && a to b assert_eq!(b.receive_messages().len(), 2); // c to b && a to b
assert_eq!(c.receive_messages().len(), 2); // a to c && b to c assert_eq!(c.receive_messages().len(), 2); // a to c && b to c
network.dispatch_after(&mut rng, Duration::from_millis(100)); network.dispatch_after(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 0); assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 0); assert_eq!(b.receive_messages().len(), 0);
assert_eq!(c.receive_messages().len(), 1); // b to c assert_eq!(c.receive_messages().len(), 1); // b to c

View File

@ -315,9 +315,6 @@ impl DummyNode {
fn handle_leaf(&mut self, message: &NetworkMessage<DummyMessage>) { fn handle_leaf(&mut self, message: &NetworkMessage<DummyMessage>) {
if let DummyMessage::Proposal(block) = &message.payload { if let DummyMessage::Proposal(block) = &message.payload {
if block.view > self.current_view() {
self.update_view(block.view);
}
if !self.is_vote_sent(block.view) { if !self.is_vote_sent(block.view) {
let parents = &self.local_view.parents.as_ref().expect("leaf has parents"); let parents = &self.local_view.parents.as_ref().expect("leaf has parents");
parents.iter().for_each(|node_id| { parents.iter().for_each(|node_id| {
@ -601,7 +598,7 @@ mod tests {
} }
// 1. Leaders receive vote and broadcast new Proposal(Block) to all nodes. // 1. Leaders receive vote and broadcast new Proposal(Block) to all nodes.
network.dispatch_after(&mut rng, Duration::from_millis(100)); network.dispatch_after(Duration::from_millis(100));
nodes.iter_mut().for_each(|(_, node)| { nodes.iter_mut().for_each(|(_, node)| {
node.step(); node.step();
}); });
@ -609,7 +606,7 @@ mod tests {
// 2. a) All nodes received proposal block. // 2. a) All nodes received proposal block.
// b) Leaf nodes send vote to internal nodes. // b) Leaf nodes send vote to internal nodes.
network.dispatch_after(&mut rng, Duration::from_millis(100)); network.dispatch_after(Duration::from_millis(100));
nodes.iter_mut().for_each(|(_, node)| { nodes.iter_mut().for_each(|(_, node)| {
node.step(); node.step();
}); });
@ -632,7 +629,7 @@ mod tests {
assert!(nodes[&6.into()].state().view_state[&1].vote_sent); // Leaf assert!(nodes[&6.into()].state().view_state[&1].vote_sent); // Leaf
// 3. Internal nodes send vote to root node. // 3. Internal nodes send vote to root node.
network.dispatch_after(&mut rng, Duration::from_millis(100)); network.dispatch_after(Duration::from_millis(100));
nodes.iter_mut().for_each(|(_, node)| { nodes.iter_mut().for_each(|(_, node)| {
node.step(); node.step();
}); });
@ -650,7 +647,7 @@ mod tests {
assert!(nodes[&6.into()].state().view_state[&1].vote_sent); // Leaf assert!(nodes[&6.into()].state().view_state[&1].vote_sent); // Leaf
// 4. Root node send vote to next view leader nodes. // 4. Root node send vote to next view leader nodes.
network.dispatch_after(&mut rng, Duration::from_millis(100)); network.dispatch_after(Duration::from_millis(100));
nodes.iter_mut().for_each(|(_, node)| { nodes.iter_mut().for_each(|(_, node)| {
node.step(); node.step();
}); });
@ -666,7 +663,7 @@ mod tests {
assert!(nodes[&6.into()].state().view_state[&1].vote_sent); // Leaf assert!(nodes[&6.into()].state().view_state[&1].vote_sent); // Leaf
// 5. Leaders receive vote and broadcast new Proposal(Block) to all nodes. // 5. Leaders receive vote and broadcast new Proposal(Block) to all nodes.
network.dispatch_after(&mut rng, Duration::from_millis(100)); network.dispatch_after(Duration::from_millis(100));
nodes.iter_mut().for_each(|(_, node)| { nodes.iter_mut().for_each(|(_, node)| {
node.step(); node.step();
}); });
@ -679,7 +676,7 @@ mod tests {
// 6. a) All nodes received proposal block. // 6. a) All nodes received proposal block.
// b) Leaf nodes send vote to internal nodes. // b) Leaf nodes send vote to internal nodes.
network.dispatch_after(&mut rng, Duration::from_millis(100)); network.dispatch_after(Duration::from_millis(100));
nodes.iter_mut().for_each(|(_, node)| { nodes.iter_mut().for_each(|(_, node)| {
node.step(); node.step();
}); });
@ -740,7 +737,7 @@ mod tests {
} }
for _ in 0..7 { for _ in 0..7 {
network.dispatch_after(&mut rng, Duration::from_millis(100)); network.dispatch_after(Duration::from_millis(100));
nodes.iter_mut().for_each(|(_, node)| { nodes.iter_mut().for_each(|(_, node)| {
node.step(); node.step();
}); });
@ -789,7 +786,7 @@ mod tests {
} }
for _ in 0..7 { for _ in 0..7 {
network.dispatch_after(&mut rng, Duration::from_millis(100)); network.dispatch_after(Duration::from_millis(100));
nodes.iter_mut().for_each(|(_, node)| { nodes.iter_mut().for_each(|(_, node)| {
node.step(); node.step();
}); });
@ -835,7 +832,7 @@ mod tests {
let nodes = Arc::new(RwLock::new(nodes)); let nodes = Arc::new(RwLock::new(nodes));
for _ in 0..9 { for _ in 0..9 {
network.dispatch_after(&mut rng, Duration::from_millis(100)); network.dispatch_after(Duration::from_millis(100));
nodes.write().unwrap().par_iter_mut().for_each(|(_, node)| { nodes.write().unwrap().par_iter_mut().for_each(|(_, node)| {
node.step(); node.step();
}); });

View File

@ -15,7 +15,7 @@ pub fn simulate<M, N: Node, O: Overlay>(
mut out_data: Option<&mut Vec<OutData>>, mut out_data: Option<&mut Vec<OutData>>,
) -> anyhow::Result<()> ) -> anyhow::Result<()>
where where
M: Clone, M: Send + Sync + Clone,
N::Settings: Clone, N::Settings: Clone,
N: Send + Sync, N: Send + Sync,
N::State: Serialize, N::State: Serialize,

View File

@ -16,7 +16,7 @@ pub fn simulate<M, N: Node, O: Overlay>(
mut out_data: Option<&mut Vec<OutData>>, mut out_data: Option<&mut Vec<OutData>>,
) -> anyhow::Result<()> ) -> anyhow::Result<()>
where where
M: Clone, M: Send + Sync + Clone,
N: Send + Sync, N: Send + Sync,
N::Settings: Clone, N::Settings: Clone,
N::State: Serialize, N::State: Serialize,

View File

@ -50,7 +50,7 @@ pub fn simulate<M, N: Node, O: Overlay>(
mut out_data: Option<&mut Vec<OutData>>, mut out_data: Option<&mut Vec<OutData>>,
) -> anyhow::Result<()> ) -> anyhow::Result<()>
where where
M: Clone, M: Send + Sync + Clone,
N: Send + Sync, N: Send + Sync,
N::Settings: Clone, N::Settings: Clone,
N::State: Serialize, N::State: Serialize,

View File

@ -37,7 +37,7 @@ where
impl<M, N: Node, O: Overlay> SimulationRunner<M, N, O> impl<M, N: Node, O: Overlay> SimulationRunner<M, N, O>
where where
M: Clone, M: Send + Sync + Clone,
N: Send + Sync, N: Send + Sync,
N::Settings: Clone, N::Settings: Clone,
N::State: Serialize, N::State: Serialize,
@ -101,8 +101,7 @@ where
} }
fn step(&mut self) { fn step(&mut self) {
self.network self.network.dispatch_after(Duration::from_millis(100));
.dispatch_after(&mut self.rng, Duration::from_millis(100));
self.nodes self.nodes
.write() .write()
.expect("Single access to nodes vector") .expect("Single access to nodes vector")

View File

@ -13,7 +13,7 @@ pub fn simulate<M, N: Node, O: Overlay>(
mut out_data: Option<&mut Vec<OutData>>, mut out_data: Option<&mut Vec<OutData>>,
) -> anyhow::Result<()> ) -> anyhow::Result<()>
where where
M: Clone, M: Send + Sync + Clone,
N: Send + Sync, N: Send + Sync,
N::Settings: Clone, N::Settings: Clone,
N::State: Serialize, N::State: Serialize,