Simulation network broadcast fix (#262)

* Replace network broadcast msg type to a dedicated channel

* Update tests with broadcast chan

* Replace threadrng with seedable smallrng

* Simplify the broadcast loop
This commit is contained in:
gusto 2023-07-18 15:01:29 +03:00 committed by GitHub
parent 94e384f609
commit 9f71dbb24c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 337 additions and 296 deletions

View File

@ -72,15 +72,22 @@ impl SimulationApp {
let regions_data = RegionsData::new(regions, behaviours);
let ids = node_ids.clone();
let mut network = Network::new(regions_data);
let mut network = Network::new(regions_data, seed);
let nodes: Vec<BoxedNode<CarnotSettings, CarnotState>> = node_ids
.iter()
.copied()
.map(|node_id| {
let (node_message_broadcast_sender, node_message_broadcast_receiver) =
channel::unbounded();
let (node_message_sender, node_message_receiver) = channel::unbounded();
let network_message_receiver = network.connect(node_id, node_message_receiver);
let network_message_receiver = network.connect(
node_id,
node_message_receiver,
node_message_broadcast_receiver,
);
let network_interface = InMemoryNetworkInterface::new(
node_id,
node_message_broadcast_sender,
node_message_sender,
network_message_receiver,
);

View File

@ -7,7 +7,7 @@ use std::{
};
// crates
use crossbeam::channel::{self, Receiver, Sender};
use rand::{rngs::ThreadRng, Rng};
use rand::{rngs::SmallRng, Rng, SeedableRng};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
// internal
@ -106,20 +106,24 @@ pub struct Network<M> {
network_time: NetworkTime,
messages: Vec<(NetworkTime, NetworkMessage<M>)>,
from_node_receivers: HashMap<NodeId, Receiver<NetworkMessage<M>>>,
from_node_broadcast_receivers: HashMap<NodeId, Receiver<NetworkMessage<M>>>,
to_node_senders: HashMap<NodeId, Sender<NetworkMessage<M>>>,
seed: u64,
}
impl<M> Network<M>
where
M: Send + Sync + Clone,
{
pub fn new(regions: regions::RegionsData) -> Self {
pub fn new(regions: regions::RegionsData, seed: u64) -> Self {
Self {
regions,
network_time: Instant::now(),
messages: Vec::new(),
from_node_receivers: HashMap::new(),
from_node_broadcast_receivers: HashMap::new(),
to_node_senders: HashMap::new(),
seed,
}
}
@ -139,10 +143,13 @@ where
&mut self,
node_id: NodeId,
node_message_receiver: Receiver<NetworkMessage<M>>,
node_message_broadcast_receiver: Receiver<NetworkMessage<M>>,
) -> Receiver<NetworkMessage<M>> {
let (to_node_sender, from_network_receiver) = channel::unbounded();
self.from_node_receivers
.insert(node_id, node_message_receiver);
self.from_node_broadcast_receivers
.insert(node_id, node_message_broadcast_receiver);
self.to_node_senders.insert(node_id, to_node_sender);
from_network_receiver
}
@ -155,7 +162,7 @@ where
/// Receive and store all messages from nodes.
pub fn collect_messages(&mut self) {
let mut new_messages = self
let mut adhoc_messages = self
.from_node_receivers
.par_iter()
.flat_map(|(_, from_node)| {
@ -165,8 +172,23 @@ where
.collect::<Vec<_>>()
})
.collect();
self.messages.append(&mut adhoc_messages);
self.messages.append(&mut new_messages);
let mut broadcast_messages = self
.from_node_broadcast_receivers
.iter()
.flat_map(|(_, from_node)| {
from_node.try_iter().flat_map(|msg| {
self.to_node_senders.keys().map(move |recipient| {
let mut m = msg.clone();
m.to = Some(*recipient);
m
})
})
})
.map(|m| (self.network_time, m))
.collect::<Vec<_>>();
self.messages.append(&mut broadcast_messages);
}
/// Reiterate all messages and send to appropriate nodes if simulated
@ -178,7 +200,7 @@ where
.messages
.par_iter()
.filter(|(network_time, message)| {
let mut rng = ThreadRng::default();
let mut rng = SmallRng::seed_from_u64(self.seed);
self.send_or_drop_message(&mut rng, network_time, message)
})
.cloned()
@ -194,35 +216,12 @@ where
network_time: &NetworkTime,
message: &NetworkMessage<M>,
) -> bool {
match message {
NetworkMessage::Adhoc(msg) => {
let recipient = msg.to.expect("Adhoc message has recipient");
let to_node = self.to_node_senders.get(&recipient).unwrap();
self.send_delayed(rng, recipient, to_node, network_time, msg)
}
NetworkMessage::Broadcast(msg) => {
let mut adhoc = msg.clone();
for (recipient, to_node) in self.to_node_senders.iter() {
adhoc.to = Some(*recipient);
self.send_delayed(rng, *recipient, to_node, network_time, &adhoc);
}
false
}
}
}
fn send_delayed<R: Rng>(
&self,
rng: &mut R,
to: NodeId,
to_node: &Sender<NetworkMessage<M>>,
network_time: &NetworkTime,
msg: &AdhocMessage<M>,
) -> bool {
if let Some(delay) = self.send_message_cost(rng, msg.from, to) {
let to = message.to.expect("adhoc message has recipient");
if let Some(delay) = self.send_message_cost(rng, message.from, to) {
if network_time.add(delay) <= self.network_time {
let to_node = self.to_node_senders.get(&to).unwrap();
to_node
.send(NetworkMessage::Adhoc(msg.clone()))
.send(message.clone())
.expect("Node should have connection");
return false;
} else {
@ -234,40 +233,19 @@ where
}
#[derive(Clone, Debug)]
pub struct AdhocMessage<M> {
pub struct NetworkMessage<M> {
pub from: NodeId,
pub to: Option<NodeId>,
pub payload: M,
}
#[derive(Clone, Debug)]
pub enum NetworkMessage<M> {
Adhoc(AdhocMessage<M>),
Broadcast(AdhocMessage<M>),
}
impl<M> NetworkMessage<M> {
pub fn adhoc(from: NodeId, to: NodeId, payload: M) -> Self {
Self::Adhoc(AdhocMessage {
from,
to: Some(to),
payload,
})
}
pub fn broadcast(from: NodeId, payload: M) -> Self {
Self::Broadcast(AdhocMessage {
from,
to: None,
payload,
})
pub fn new(from: NodeId, to: Option<NodeId>, payload: M) -> Self {
Self { from, to, payload }
}
pub fn get_payload(self) -> M {
match self {
NetworkMessage::Adhoc(AdhocMessage { payload, .. }) => payload,
NetworkMessage::Broadcast(AdhocMessage { payload, .. }) => payload,
}
self.payload
}
}
@ -281,6 +259,7 @@ pub trait NetworkInterface {
pub struct InMemoryNetworkInterface<M> {
id: NodeId,
broadcast: Sender<NetworkMessage<M>>,
sender: Sender<NetworkMessage<M>>,
receiver: Receiver<NetworkMessage<M>>,
}
@ -288,11 +267,13 @@ pub struct InMemoryNetworkInterface<M> {
impl<M> InMemoryNetworkInterface<M> {
pub fn new(
id: NodeId,
broadcast: Sender<NetworkMessage<M>>,
sender: Sender<NetworkMessage<M>>,
receiver: Receiver<NetworkMessage<M>>,
) -> Self {
Self {
id,
broadcast,
sender,
receiver,
}
@ -303,12 +284,12 @@ impl<M> NetworkInterface for InMemoryNetworkInterface<M> {
type Payload = M;
fn broadcast(&self, message: Self::Payload) {
let message = NetworkMessage::broadcast(self.id, message);
self.sender.send(message).unwrap();
let message = NetworkMessage::new(self.id, None, message);
self.broadcast.send(message).unwrap();
}
fn send_message(&self, address: NodeId, message: Self::Payload) {
let message = NetworkMessage::adhoc(self.id, address, message);
let message = NetworkMessage::new(self.id, Some(address), message);
self.sender.send(message).unwrap();
}
@ -333,6 +314,7 @@ mod tests {
struct MockNetworkInterface {
id: NodeId,
broadcast: Sender<NetworkMessage<()>>,
sender: Sender<NetworkMessage<()>>,
receiver: Receiver<NetworkMessage<()>>,
}
@ -340,11 +322,13 @@ mod tests {
impl MockNetworkInterface {
pub fn new(
id: NodeId,
broadcast: Sender<NetworkMessage<()>>,
sender: Sender<NetworkMessage<()>>,
receiver: Receiver<NetworkMessage<()>>,
) -> Self {
Self {
id,
broadcast,
sender,
receiver,
}
@ -355,12 +339,12 @@ mod tests {
type Payload = ();
fn broadcast(&self, message: Self::Payload) {
let message = NetworkMessage::broadcast(self.id, message);
self.sender.send(message).unwrap();
let message = NetworkMessage::new(self.id, None, message);
self.broadcast.send(message).unwrap();
}
fn send_message(&self, address: NodeId, message: Self::Payload) {
let message = NetworkMessage::adhoc(self.id, address, message);
let message = NetworkMessage::new(self.id, Some(address), message);
self.sender.send(message).unwrap();
}
@ -380,15 +364,27 @@ mod tests {
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
)]);
let regions_data = RegionsData::new(regions, behaviour);
let mut network = Network::new(regions_data);
let mut network = Network::new(regions_data, 0);
let (from_a_sender, from_a_receiver) = channel::unbounded();
let to_a_receiver = network.connect(node_a, from_a_receiver);
let a = MockNetworkInterface::new(node_a, from_a_sender, to_a_receiver);
let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded();
let to_a_receiver = network.connect(node_a, from_a_receiver, from_a_broadcast_receiver);
let a = MockNetworkInterface::new(
node_a,
from_a_broadcast_sender,
from_a_sender,
to_a_receiver,
);
let (from_b_sender, from_b_receiver) = channel::unbounded();
let to_b_receiver = network.connect(node_b, from_b_receiver);
let b = MockNetworkInterface::new(node_b, from_b_sender, to_b_receiver);
let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded();
let to_b_receiver = network.connect(node_b, from_b_receiver, from_b_broadcast_receiver);
let b = MockNetworkInterface::new(
node_b,
from_b_broadcast_sender,
from_b_sender,
to_b_receiver,
);
a.send_message(node_b, ());
network.collect_messages();
@ -443,19 +439,37 @@ mod tests {
),
]);
let regions_data = RegionsData::new(regions, behaviour);
let mut network = Network::new(regions_data);
let mut network = Network::new(regions_data, 0);
let (from_a_sender, from_a_receiver) = channel::unbounded();
let to_a_receiver = network.connect(node_a, from_a_receiver);
let a = MockNetworkInterface::new(node_a, from_a_sender, to_a_receiver);
let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded();
let to_a_receiver = network.connect(node_a, from_a_receiver, from_a_broadcast_receiver);
let a = MockNetworkInterface::new(
node_a,
from_a_broadcast_sender,
from_a_sender,
to_a_receiver,
);
let (from_b_sender, from_b_receiver) = channel::unbounded();
let to_b_receiver = network.connect(node_b, from_b_receiver);
let b = MockNetworkInterface::new(node_b, from_b_sender, to_b_receiver);
let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded();
let to_b_receiver = network.connect(node_b, from_b_receiver, from_b_broadcast_receiver);
let b = MockNetworkInterface::new(
node_b,
from_b_broadcast_sender,
from_b_sender,
to_b_receiver,
);
let (from_c_sender, from_c_receiver) = channel::unbounded();
let to_c_receiver = network.connect(node_c, from_c_receiver);
let c = MockNetworkInterface::new(node_c, from_c_sender, to_c_receiver);
let (from_c_broadcast_sender, from_c_broadcast_receiver) = channel::unbounded();
let to_c_receiver = network.connect(node_c, from_c_receiver, from_c_broadcast_receiver);
let c = MockNetworkInterface::new(
node_c,
from_c_broadcast_sender,
from_c_sender,
to_c_receiver,
);
a.send_message(node_b, ());
a.send_message(node_c, ());

View File

@ -291,13 +291,11 @@ impl<O: Overlay> CarnotNode<O> {
}
}
Output::BroadcastTimeoutQc { timeout_qc } => {
self.network_interface.send_message(
self.id,
CarnotMessage::TimeoutQc(TimeoutQcMsg {
self.network_interface
.broadcast(CarnotMessage::TimeoutQc(TimeoutQcMsg {
source: self.id,
qc: timeout_qc,
}),
);
}));
}
Output::BroadcastProposal { proposal } => {
self.network_interface

View File

@ -8,7 +8,7 @@ pub(crate) struct Tally<T: core::hash::Hash + Eq> {
impl<T: core::hash::Hash + Eq> Default for Tally<T> {
fn default() -> Self {
Self::new(0)
Self::new(2)
}
}

View File

@ -326,13 +326,9 @@ impl DummyNode {
}
fn handle_message(&mut self, message: &NetworkMessage<DummyMessage>) {
let payload = match message {
NetworkMessage::Adhoc(m) => m.payload.clone(),
NetworkMessage::Broadcast(m) => m.payload.clone(),
};
// The view can change on any message, node needs to change its position
// and roles if the view changes during the message processing.
if let DummyMessage::Proposal(block) = &payload {
if let DummyMessage::Proposal(block) = &message.payload {
if block.view > self.current_view() {
self.update_view(block.view);
}
@ -341,10 +337,10 @@ impl DummyNode {
for role in roles.iter() {
match role {
DummyRole::Leader => self.handle_leader(&payload),
DummyRole::Root => self.handle_root(&payload),
DummyRole::Internal => self.handle_internal(&payload),
DummyRole::Leaf => self.handle_leaf(&payload),
DummyRole::Leader => self.handle_leader(&message.payload),
DummyRole::Root => self.handle_root(&message.payload),
DummyRole::Internal => self.handle_internal(&message.payload),
DummyRole::Leaf => self.handle_leaf(&message.payload),
DummyRole::Unknown => (),
}
}
@ -461,7 +457,7 @@ mod tests {
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
)]);
let regions_data = RegionsData::new(regions, behaviour);
Network::new(regions_data)
Network::new(regions_data, 0)
}
fn init_dummy_nodes(
@ -473,9 +469,16 @@ mod tests {
.iter()
.map(|node_id| {
let (node_message_sender, node_message_receiver) = channel::unbounded();
let network_message_receiver = network.connect(*node_id, node_message_receiver);
let (node_message_broadcast_sender, node_message_broadcast_receiver) =
channel::unbounded();
let network_message_receiver = network.connect(
*node_id,
node_message_receiver,
node_message_broadcast_receiver,
);
let network_interface = InMemoryNetworkInterface::new(
*node_id,
node_message_broadcast_sender,
node_message_sender,
network_message_receiver,
);

View File

@ -99,7 +99,7 @@ mod tests {
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
)]);
let regions_data = RegionsData::new(regions, behaviour);
Network::new(regions_data)
Network::new(regions_data, 0)
}
fn init_dummy_nodes(
@ -111,9 +111,16 @@ mod tests {
.iter()
.map(|node_id| {
let (node_message_sender, node_message_receiver) = channel::unbounded();
let network_message_receiver = network.connect(*node_id, node_message_receiver);
let (node_message_broadcast_sender, node_message_broadcast_receiver) =
channel::unbounded();
let network_message_receiver = network.connect(
*node_id,
node_message_receiver,
node_message_broadcast_receiver,
);
let network_interface = InMemoryNetworkInterface::new(
*node_id,
node_message_broadcast_sender,
node_message_sender,
network_message_receiver,
);

View File

@ -164,7 +164,8 @@ mod tests {
>
})
.collect::<Vec<_>>();
let network = Network::new(RegionsData {
let network = Network::new(
RegionsData {
regions: (0..6)
.map(|idx| {
let region = match idx % 6 {
@ -213,7 +214,9 @@ mod tests {
)
})
.collect(),
});
},
0,
);
let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> =
SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap();
simulation_runner

View File

@ -169,7 +169,8 @@ mod tests {
>
})
.collect::<Vec<_>>();
let network = Network::new(RegionsData {
let network = Network::new(
RegionsData {
regions: (0..6)
.map(|idx| {
let region = match idx % 6 {
@ -218,7 +219,9 @@ mod tests {
)
})
.collect(),
});
},
0,
);
let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> =
SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap();
simulation_runner.simulate().unwrap();

View File

@ -154,7 +154,8 @@ mod tests {
>
})
.collect::<Vec<_>>();
let network = Network::new(RegionsData {
let network = Network::new(
RegionsData {
regions: (0..6)
.map(|idx| {
let region = match idx % 6 {
@ -203,7 +204,9 @@ mod tests {
)
})
.collect(),
});
},
0,
);
let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> =
SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap();
simulation_runner.simulate().unwrap();

View File

@ -154,7 +154,8 @@ mod tests {
>
})
.collect::<Vec<_>>();
let network = Network::new(RegionsData {
let network = Network::new(
RegionsData {
regions: (0..6)
.map(|idx| {
let region = match idx % 6 {
@ -203,7 +204,9 @@ mod tests {
)
})
.collect(),
});
},
0,
);
let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> =
SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap();
simulation_runner.simulate().unwrap();