1
0
mirror of synced 2025-01-11 00:05:48 +00:00

Simulation network broadcast (#225)

* Sim network broadcast method

* Remove list of all nodes in carnot node instance

* Use network broadcast method within nodes

* Add helper method for network payload
This commit is contained in:
gusto 2023-06-27 14:20:59 +03:00 committed by GitHub
parent 09370dcef8
commit 55648e3151
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 95 additions and 47 deletions

View File

@ -24,7 +24,7 @@
"step_time": "100ms",
"runner_settings": "Sync",
"stream_settings": {
"format": "json"
"type": "stdout"
},
"node_count": 3,
"views_count": 3,

View File

@ -98,7 +98,6 @@ impl SimulationApp {
CarnotNode::<FlatOverlay<RoundRobin>>::new(
node_id,
CarnotSettings::new(
nodes,
simulation_settings.node_settings.timeout,
simulation_settings.record_settings.clone(),
),

View File

@ -194,11 +194,35 @@ where
network_time: &NetworkTime,
message: &NetworkMessage<M>,
) -> bool {
if let Some(delay) = self.send_message_cost(rng, message.from, message.to) {
match message {
NetworkMessage::Adhoc(msg) => {
let recipient = msg.to.expect("Adhoc message has recipient");
let to_node = self.to_node_senders.get(&recipient).unwrap();
self.send_delayed(rng, recipient, to_node, network_time, msg)
}
NetworkMessage::Broadcast(msg) => {
let mut adhoc = msg.clone();
for (recipient, to_node) in self.to_node_senders.iter() {
adhoc.to = Some(*recipient);
self.send_delayed(rng, *recipient, to_node, network_time, &adhoc);
}
false
}
}
}
fn send_delayed<R: Rng>(
&self,
rng: &mut R,
to: NodeId,
to_node: &Sender<NetworkMessage<M>>,
network_time: &NetworkTime,
msg: &AdhocMessage<M>,
) -> bool {
if let Some(delay) = self.send_message_cost(rng, msg.from, to) {
if network_time.add(delay) <= self.network_time {
let to_node = self.to_node_senders.get(&message.to).unwrap();
to_node
.send(message.clone())
.send(NetworkMessage::Adhoc(msg.clone()))
.expect("Node should have connection");
return false;
} else {
@ -210,21 +234,47 @@ where
}
#[derive(Clone, Debug)]
pub struct NetworkMessage<M> {
pub struct AdhocMessage<M> {
pub from: NodeId,
pub to: NodeId,
pub to: Option<NodeId>,
pub payload: M,
}
#[derive(Clone, Debug)]
pub enum NetworkMessage<M> {
Adhoc(AdhocMessage<M>),
Broadcast(AdhocMessage<M>),
}
impl<M> NetworkMessage<M> {
pub fn new(from: NodeId, to: NodeId, payload: M) -> Self {
Self { from, to, payload }
pub fn adhoc(from: NodeId, to: NodeId, payload: M) -> Self {
Self::Adhoc(AdhocMessage {
from,
to: Some(to),
payload,
})
}
pub fn broadcast(from: NodeId, payload: M) -> Self {
Self::Broadcast(AdhocMessage {
from,
to: None,
payload,
})
}
pub fn get_payload(self) -> M {
match self {
NetworkMessage::Adhoc(AdhocMessage { payload, .. }) => payload,
NetworkMessage::Broadcast(AdhocMessage { payload, .. }) => payload,
}
}
}
pub trait NetworkInterface {
type Payload;
fn broadcast(&self, message: Self::Payload);
fn send_message(&self, address: NodeId, message: Self::Payload);
fn receive_messages(&self) -> Vec<NetworkMessage<Self::Payload>>;
}
@ -252,8 +302,13 @@ impl<M> InMemoryNetworkInterface<M> {
impl<M> NetworkInterface for InMemoryNetworkInterface<M> {
type Payload = M;
fn broadcast(&self, message: Self::Payload) {
let message = NetworkMessage::broadcast(self.id, message);
self.sender.send(message).unwrap();
}
fn send_message(&self, address: NodeId, message: Self::Payload) {
let message = NetworkMessage::new(self.id, address, message);
let message = NetworkMessage::adhoc(self.id, address, message);
self.sender.send(message).unwrap();
}
@ -296,8 +351,13 @@ mod tests {
impl NetworkInterface for MockNetworkInterface {
type Payload = ();
fn broadcast(&self, message: Self::Payload) {
let message = NetworkMessage::broadcast(self.id, message);
self.sender.send(message).unwrap();
}
fn send_message(&self, address: NodeId, message: Self::Payload) {
let message = NetworkMessage::new(self.id, address, message);
let message = NetworkMessage::adhoc(self.id, address, message);
self.sender.send(message).unwrap();
}

View File

@ -186,19 +186,13 @@ impl<O: Overlay> From<&Carnot<O>> for CarnotState {
#[derive(Clone, Default, Deserialize)]
pub struct CarnotSettings {
nodes: Vec<consensus_engine::NodeId>,
timeout: Duration,
record_settings: HashMap<String, bool>,
}
impl CarnotSettings {
pub fn new(
nodes: Vec<consensus_engine::NodeId>,
timeout: Duration,
record_settings: HashMap<String, bool>,
) -> Self {
pub fn new(timeout: Duration, record_settings: HashMap<String, bool>) -> Self {
Self {
nodes,
timeout,
record_settings,
}
@ -249,11 +243,6 @@ impl<O: Overlay> CarnotNode<O> {
this
}
pub(crate) fn send_message(&self, message: NetworkMessage<CarnotMessage>) {
self.network_interface
.send_message(self.id, message.payload);
}
fn handle_output(&self, output: Output<CarnotTx>) {
match output {
Output::Send(consensus_engine::Send {
@ -312,16 +301,12 @@ impl<O: Overlay> CarnotNode<O> {
);
}
Output::BroadcastProposal { proposal } => {
for node in &self.settings.nodes {
self.network_interface.send_message(
*node,
CarnotMessage::Proposal(ProposalChunkMsg {
chunk: proposal.as_bytes().to_vec().into(),
proposal: proposal.header().id,
view: proposal.header().view,
}),
)
}
self.network_interface
.broadcast(CarnotMessage::Proposal(ProposalChunkMsg {
chunk: proposal.as_bytes().to_vec().into(),
proposal: proposal.header().id,
view: proposal.header().view,
}))
}
}
}
@ -349,7 +334,7 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
.network_interface
.receive_messages()
.into_iter()
.map(|m| m.payload)
.map(NetworkMessage::get_payload)
.partition(|m| {
m.view() == self.engine.current_view()
|| matches!(m, CarnotMessage::Proposal(_) | CarnotMessage::TimeoutQc(_))

View File

@ -236,8 +236,8 @@ impl DummyNode {
// - Leader gets vote from root nodes that are in previous overlay.
// - Leader sends NewView message to all it's view nodes if it receives votes from all root
// nodes.
fn handle_leader(&mut self, message: &NetworkMessage<DummyMessage>) {
if let DummyMessage::Vote(vote) = &message.payload {
fn handle_leader(&mut self, payload: &DummyMessage) {
if let DummyMessage::Vote(vote) = payload {
// Internal node can be a leader in the next view, check if the vote traversed the
// whole tree.
if vote.intent != Intent::FromRootToLeader || vote.view < self.current_view() {
@ -261,8 +261,8 @@ impl DummyNode {
}
}
fn handle_root(&mut self, message: &NetworkMessage<DummyMessage>) {
if let DummyMessage::Vote(vote) = &message.payload {
fn handle_root(&mut self, payload: &DummyMessage) {
if let DummyMessage::Vote(vote) = payload {
// Root node can be a leader in the next view, check if the vote traversed the
// whole tree.
if vote.intent != Intent::FromInternalToInternal || vote.view != self.current_view() {
@ -282,8 +282,8 @@ impl DummyNode {
}
}
fn handle_internal(&mut self, message: &NetworkMessage<DummyMessage>) {
if let DummyMessage::Vote(vote) = &message.payload {
fn handle_internal(&mut self, message: &DummyMessage) {
if let DummyMessage::Vote(vote) = &message {
// Internal node can be a leader in the next view, check if the vote traversed the
// whole tree.
if vote.intent != Intent::FromLeafToInternal
@ -313,8 +313,8 @@ impl DummyNode {
}
}
fn handle_leaf(&mut self, message: &NetworkMessage<DummyMessage>) {
if let DummyMessage::Proposal(block) = &message.payload {
fn handle_leaf(&mut self, payload: &DummyMessage) {
if let DummyMessage::Proposal(block) = &payload {
if !self.is_vote_sent(block.view) {
let parents = &self.local_view.parents.as_ref().expect("leaf has parents");
parents.iter().for_each(|node_id| {
@ -326,9 +326,13 @@ impl DummyNode {
}
fn handle_message(&mut self, message: &NetworkMessage<DummyMessage>) {
let payload = match message {
NetworkMessage::Adhoc(m) => m.payload.clone(),
NetworkMessage::Broadcast(m) => m.payload.clone(),
};
// The view can change on any message, node needs to change its position
// and roles if the view changes during the message processing.
if let DummyMessage::Proposal(block) = &message.payload {
if let DummyMessage::Proposal(block) = &payload {
if block.view > self.current_view() {
self.update_view(block.view);
}
@ -337,10 +341,10 @@ impl DummyNode {
for role in roles.iter() {
match role {
DummyRole::Leader => self.handle_leader(message),
DummyRole::Root => self.handle_root(message),
DummyRole::Internal => self.handle_internal(message),
DummyRole::Leaf => self.handle_leaf(message),
DummyRole::Leader => self.handle_leader(&payload),
DummyRole::Root => self.handle_root(&payload),
DummyRole::Internal => self.handle_internal(&payload),
DummyRole::Leaf => self.handle_leaf(&payload),
DummyRole::Unknown => (),
}
}