Test for partial message sending in simulation (#294)

* Test for partial message sending

* Test correctness, typos

* Fix the node capacity flushing

* Not process double timeout qcs in simulations

* Discard older view messages in simulations messages

* Refactor committed_blocks to latest_committed_blocks

* Remove tally default

* Fix condition to root committee parenting

* Bring back pruning

* Clippy happy

---------

Co-authored-by: danielsanchezq <sanchez.quiros.daniel@gmail.com>
This commit is contained in:
gusto 2023-08-08 18:00:08 +03:00 committed by GitHub
parent 4bdc3ed15a
commit d675585a0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 132 additions and 50 deletions

View File

@ -323,12 +323,17 @@ impl<O: Overlay> Carnot<O> {
self.latest_committed_block().view self.latest_committed_block().view
} }
pub fn committed_blocks(&self) -> Vec<BlockId> { pub fn latest_committed_blocks(&self) -> Vec<BlockId> {
let mut res = vec![]; let mut res = vec![];
let mut current = self.latest_committed_block(); let mut current = self.latest_committed_block();
while current != self.genesis_block() { while current != self.genesis_block() {
res.push(current.id); res.push(current.id);
current = self.safe_blocks.get(&current.parent()).unwrap().clone(); current = if let Some(new_current) = self.safe_blocks.get(&current.parent()) {
new_current.clone()
} else {
break;
};
// current = self.safe_blocks.get(&current.parent()).unwrap().clone();
} }
res.push(self.genesis_block().id); res.push(self.genesis_block().id);
res res
@ -403,7 +408,7 @@ impl<O: Overlay> Carnot<O> {
// do not remove genesis // do not remove genesis
let view_zero = View::new(0); let view_zero = View::new(0);
self.safe_blocks self.safe_blocks
.retain(|_, b| b.view > threshold_view && view_zero == b.view); .retain(|_, b| b.view > threshold_view || view_zero == b.view);
} }
} }
@ -481,7 +486,7 @@ mod test {
assert_eq!(engine.high_qc(), genesis.parent_qc.high_qc()); assert_eq!(engine.high_qc(), genesis.parent_qc.high_qc());
assert_eq!(engine.blocks_in_view(View(0)), vec![genesis.clone()]); assert_eq!(engine.blocks_in_view(View(0)), vec![genesis.clone()]);
assert_eq!(engine.last_view_timeout_qc(), None); assert_eq!(engine.last_view_timeout_qc(), None);
assert_eq!(engine.committed_blocks(), vec![genesis.id]); assert_eq!(engine.latest_committed_blocks(), vec![genesis.id]);
} }
#[test] #[test]
@ -575,7 +580,7 @@ mod test {
engine = engine.receive_block(block3.clone()).unwrap(); engine = engine.receive_block(block3.clone()).unwrap();
assert_eq!(engine.latest_committed_block(), block1); assert_eq!(engine.latest_committed_block(), block1);
assert_eq!( assert_eq!(
engine.committed_blocks(), engine.latest_committed_blocks(),
vec![block1.id, engine.genesis_block().id] // without block2 and block3 vec![block1.id, engine.genesis_block().id] // without block2 and block3
); );
engine = update_leader_selection(&engine); engine = update_leader_selection(&engine);
@ -584,7 +589,7 @@ mod test {
engine = engine.receive_block(block4).unwrap(); engine = engine.receive_block(block4).unwrap();
assert_eq!(engine.latest_committed_block(), block2); assert_eq!(engine.latest_committed_block(), block2);
assert_eq!( assert_eq!(
engine.committed_blocks(), engine.latest_committed_blocks(),
vec![block2.id, block1.id, engine.genesis_block().id] // without block3, block4 vec![block2.id, block1.id, engine.genesis_block().id] // without block3, block4
); );
} }

View File

@ -298,7 +298,7 @@ where
local_high_qc: carnot.high_qc(), local_high_qc: carnot.high_qc(),
safe_blocks: carnot.safe_blocks().clone(), safe_blocks: carnot.safe_blocks().clone(),
last_view_timeout_qc: carnot.last_view_timeout_qc(), last_view_timeout_qc: carnot.last_view_timeout_qc(),
committed_blocks: carnot.committed_blocks(), committed_blocks: carnot.latest_committed_blocks(),
}; };
tx.send(info).unwrap_or_else(|e| { tx.send(info).unwrap_or_else(|e| {
tracing::error!("Could not send consensus info through channel: {:?}", e) tracing::error!("Could not send consensus info through channel: {:?}", e)

View File

@ -270,9 +270,9 @@ where
let to = message.to.expect("adhoc message has recipient"); let to = message.to.expect("adhoc message has recipient");
if let Some(delay) = self.send_message_cost(rng, message.from, to) { if let Some(delay) = self.send_message_cost(rng, message.from, to) {
let node_capacity = self.node_network_capacity.get(&to).unwrap(); let node_capacity = self.node_network_capacity.get(&to).unwrap();
let should_delay = network_time.add(delay) <= self.network_time; let should_send = network_time.add(delay) <= self.network_time;
let remaining_size = message.remaining_size(); let remaining_size = message.remaining_size();
if should_delay && node_capacity.increase_load(remaining_size) { if should_send && node_capacity.increase_load(remaining_size) {
let to_node = self.to_node_senders.get(&to).unwrap(); let to_node = self.to_node_senders.get(&to).unwrap();
to_node to_node
.send(message.clone()) .send(message.clone())
@ -282,7 +282,7 @@ where
} else { } else {
// if we do not need to delay, then we should check if the msg is too large // if we do not need to delay, then we should check if the msg is too large
// if so, we mock the partial sending message behavior // if so, we mock the partial sending message behavior
if should_delay { if should_send {
// if remaining is 0, we should send without delay // if remaining is 0, we should send without delay
return self.try_partial_send(node_capacity, message, &to) != 0; return self.try_partial_send(node_capacity, message, &to) != 0;
} }
@ -303,12 +303,14 @@ where
let sent = node_capacity.capacity_bps - *cap; let sent = node_capacity.capacity_bps - *cap;
*cap = node_capacity.capacity_bps; *cap = node_capacity.capacity_bps;
let remaining = message.partial_send(sent); let remaining = message.partial_send(sent);
// Message is partially sent, the node capacity needs to be flushed at the end of step even
// if the whole message is not sent.
node_capacity.decrease_load(sent);
if remaining == 0 { if remaining == 0 {
let to_node = self.to_node_senders.get(to).unwrap(); let to_node = self.to_node_senders.get(to).unwrap();
to_node to_node
.send(message.clone()) .send(message.clone())
.expect("node should have connection"); .expect("node should have connection");
node_capacity.decrease_load(sent);
} }
remaining remaining
} }
@ -426,6 +428,7 @@ mod tests {
broadcast: Sender<NetworkMessage<()>>, broadcast: Sender<NetworkMessage<()>>,
sender: Sender<NetworkMessage<()>>, sender: Sender<NetworkMessage<()>>,
receiver: Receiver<NetworkMessage<()>>, receiver: Receiver<NetworkMessage<()>>,
message_size: u32,
} }
impl MockNetworkInterface { impl MockNetworkInterface {
@ -434,12 +437,14 @@ mod tests {
broadcast: Sender<NetworkMessage<()>>, broadcast: Sender<NetworkMessage<()>>,
sender: Sender<NetworkMessage<()>>, sender: Sender<NetworkMessage<()>>,
receiver: Receiver<NetworkMessage<()>>, receiver: Receiver<NetworkMessage<()>>,
message_size: u32,
) -> Self { ) -> Self {
Self { Self {
id, id,
broadcast, broadcast,
sender, sender,
receiver, receiver,
message_size,
} }
} }
} }
@ -448,12 +453,12 @@ mod tests {
type Payload = (); type Payload = ();
fn broadcast(&self, message: Self::Payload) { fn broadcast(&self, message: Self::Payload) {
let message = NetworkMessage::new(self.id, None, message, 1); let message = NetworkMessage::new(self.id, None, message, self.message_size);
self.broadcast.send(message).unwrap(); self.broadcast.send(message).unwrap();
} }
fn send_message(&self, address: NodeId, message: Self::Payload) { fn send_message(&self, address: NodeId, message: Self::Payload) {
let message = NetworkMessage::new(self.id, Some(address), message, 1); let message = NetworkMessage::new(self.id, Some(address), message, self.message_size);
self.sender.send(message).unwrap(); self.sender.send(message).unwrap();
} }
@ -483,6 +488,7 @@ mod tests {
from_a_broadcast_sender, from_a_broadcast_sender,
from_a_sender, from_a_sender,
to_a_receiver, to_a_receiver,
1,
); );
let (from_b_sender, from_b_receiver) = channel::unbounded(); let (from_b_sender, from_b_receiver) = channel::unbounded();
@ -493,6 +499,7 @@ mod tests {
from_b_broadcast_sender, from_b_broadcast_sender,
from_b_sender, from_b_sender,
to_b_receiver, to_b_receiver,
1,
); );
a.send_message(node_b, ()); a.send_message(node_b, ());
@ -558,6 +565,7 @@ mod tests {
from_a_broadcast_sender, from_a_broadcast_sender,
from_a_sender, from_a_sender,
to_a_receiver, to_a_receiver,
1,
); );
let (from_b_sender, from_b_receiver) = channel::unbounded(); let (from_b_sender, from_b_receiver) = channel::unbounded();
@ -568,6 +576,7 @@ mod tests {
from_b_broadcast_sender, from_b_broadcast_sender,
from_b_sender, from_b_sender,
to_b_receiver, to_b_receiver,
1,
); );
let (from_c_sender, from_c_receiver) = channel::unbounded(); let (from_c_sender, from_c_receiver) = channel::unbounded();
@ -578,6 +587,7 @@ mod tests {
from_c_broadcast_sender, from_c_broadcast_sender,
from_c_sender, from_c_sender,
to_c_receiver, to_c_receiver,
1,
); );
a.send_message(node_b, ()); a.send_message(node_b, ());
@ -633,6 +643,7 @@ mod tests {
from_a_broadcast_sender, from_a_broadcast_sender,
from_a_sender, from_a_sender,
to_a_receiver, to_a_receiver,
1,
); );
let (from_b_sender, from_b_receiver) = channel::unbounded(); let (from_b_sender, from_b_receiver) = channel::unbounded();
@ -643,6 +654,7 @@ mod tests {
from_b_broadcast_sender, from_b_broadcast_sender,
from_b_sender, from_b_sender,
to_b_receiver, to_b_receiver,
1,
); );
for _ in 0..6 { for _ in 0..6 {
@ -662,4 +674,71 @@ mod tests {
assert_eq!(a.receive_messages().len(), 0); assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 2); assert_eq!(b.receive_messages().len(), 2);
} }
#[test]
fn node_network_message_partial_send() {
let node_a = NodeId::from_index(0);
let node_b = NodeId::from_index(1);
let regions = HashMap::from([(Region::Europe, vec![node_a, node_b])]);
let behaviour = HashMap::from([(
NetworkBehaviourKey::new(Region::Europe, Region::Europe),
NetworkBehaviour::new(Duration::from_millis(100), 0.0),
)]);
let regions_data = RegionsData::new(regions, behaviour);
let mut network = Network::new(regions_data, 0);
let (from_a_sender, from_a_receiver) = channel::unbounded();
let (from_a_broadcast_sender, from_a_broadcast_receiver) = channel::unbounded();
// Node A is connected to the network with throuput of 5.
let to_a_receiver = network.connect(node_a, 5, from_a_receiver, from_a_broadcast_receiver);
// Every message sent **from** Node A will be of size 15.
let a = MockNetworkInterface::new(
node_a,
from_a_broadcast_sender,
from_a_sender,
to_a_receiver,
2,
);
let (from_b_sender, from_b_receiver) = channel::unbounded();
let (from_b_broadcast_sender, from_b_broadcast_receiver) = channel::unbounded();
// Node B is connected to the network with throuput of 1.
let to_b_receiver = network.connect(node_b, 1, from_b_receiver, from_b_broadcast_receiver);
// Every message sent **from** Node B will be of size 2.
let b = MockNetworkInterface::new(
node_b,
from_b_broadcast_sender,
from_b_sender,
to_b_receiver,
15,
);
// Node A sends message of size 2 to Node B.
a.send_message(node_b, ());
// Node B sends message of size 15 to Node A.
b.send_message(node_a, ());
// Step duration matches the latency between nodes, thus Node A can receive 5 units of a
// message, Node B - 1 unit of a message during the step.
network.step(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 0);
// Node B should receive a message during the second step, because it's throughput during the
// step is 1, but the message size it receives is 2.
network.step(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 0);
assert_eq!(b.receive_messages().len(), 1);
// Node A should receive a message during the third step, because it's throughput during the
// step is 5, but the message it recieves is of size 15.
network.step(Duration::from_millis(100));
assert_eq!(a.receive_messages().len(), 1);
assert_eq!(b.receive_messages().len(), 0);
}
} }

View File

@ -25,11 +25,11 @@ pub(crate) struct EventBuilder {
impl EventBuilder { impl EventBuilder {
pub fn new(id: NodeId, timeout: Duration) -> Self { pub fn new(id: NodeId, timeout: Duration) -> Self {
Self { Self {
vote_message: Default::default(), vote_message: Tally::new(),
leader_vote_message: Default::default(), leader_vote_message: Tally::new(),
timeout_message: Default::default(), timeout_message: Tally::new(),
leader_new_view_message: Default::default(), leader_new_view_message: Tally::new(),
new_view_message: Default::default(), new_view_message: Tally::new(),
current_view: View::default(), current_view: View::default(),
id, id,
timeout_handler: TimeoutHandler::new(timeout), timeout_handler: TimeoutHandler::new(timeout),
@ -165,7 +165,11 @@ impl EventBuilder {
} }
CarnotMessage::Timeout(msg) => { CarnotMessage::Timeout(msg) => {
let msg_view = msg.vote.view; let msg_view = msg.vote.view;
if let Some(timeouts) = self.timeout_message.tally(msg_view, msg) { if let Some(timeouts) = self.timeout_message.tally_by(
msg_view,
msg,
engine.overlay().super_majority_threshold(self.id),
) {
events.push(Event::RootTimeout { events.push(Event::RootTimeout {
timeouts: timeouts.into_iter().map(|v| v.vote).collect(), timeouts: timeouts.into_iter().map(|v| v.vote).collect(),
}) })

View File

@ -190,7 +190,7 @@ impl<O: Overlay> From<&Carnot<O>> for CarnotState {
.map(|b| (b.id, b)) .map(|b| (b.id, b))
.collect(), .collect(),
last_view_timeout_qc: value.last_view_timeout_qc(), last_view_timeout_qc: value.last_view_timeout_qc(),
committed_blocks: value.committed_blocks(), committed_blocks: value.latest_committed_blocks(),
highest_voted_view: Default::default(), highest_voted_view: Default::default(),
step_duration: Default::default(), step_duration: Default::default(),
} }
@ -344,20 +344,21 @@ impl<
"receive block proposal", "receive block proposal",
); );
match self.engine.receive_block(block.header().clone()) { match self.engine.receive_block(block.header().clone()) {
Ok(mut new) => { Ok(new) => {
if self.engine.current_view() != new.current_view() { if self.engine.current_view() != new.current_view() {
new = new // TODO: Refactor this into a method, use for timeout qc as well
.update_overlay(|overlay| { // new = new
let overlay = overlay // .update_overlay(|overlay| {
.update_leader_selection(|leader_selection| { // let overlay = overlay
leader_selection.on_new_block_received(&block) // .update_leader_selection(|leader_selection| {
}) // leader_selection.on_new_block_received(&block)
.expect("Leader selection update should succeed"); // })
overlay.update_committees(|committee_membership| { // .expect("Leader selection update should succeed");
committee_membership.on_new_block_received(&block) // overlay.update_committees(|committee_membership| {
}) // committee_membership.on_new_block_received(&block)
}) // })
.unwrap_or(new); // })
// .unwrap_or(new);
self.engine = new; self.engine = new;
} }
} }
@ -373,7 +374,7 @@ impl<
if self.engine.overlay().is_member_of_leaf_committee(self.id) { if self.engine.overlay().is_member_of_leaf_committee(self.id) {
// Check if we are also a member of the parent committee, this is a special case for the flat committee // Check if we are also a member of the parent committee, this is a special case for the flat committee
let to = if self.engine.overlay().is_child_of_root_committee(self.id) { let to = if self.engine.overlay().is_member_of_root_committee(self.id) {
[self.engine.overlay().next_leader()].into_iter().collect() [self.engine.overlay().next_leader()].into_iter().collect()
} else { } else {
self.engine.parent_committee().expect( self.engine.parent_committee().expect(
@ -440,9 +441,12 @@ impl<
timeout_view = %timeout_qc.view(), timeout_view = %timeout_qc.view(),
"receive new view message" "receive new view message"
); );
let (new, out) = self.engine.approve_new_view(timeout_qc, new_views); // just process timeout if node have not already process it
output = Some(Output::Send(out)); if timeout_qc.view() == self.engine.current_view() {
self.engine = new; let (new, out) = self.engine.approve_new_view(timeout_qc, new_views);
output = Some(Output::Send(out));
self.engine = new;
}
} }
Event::TimeoutQc { timeout_qc } => { Event::TimeoutQc { timeout_qc } => {
tracing::info!( tracing::info!(
@ -522,6 +526,8 @@ impl<
.receive_messages() .receive_messages()
.into_iter() .into_iter()
.map(NetworkMessage::into_payload) .map(NetworkMessage::into_payload)
// do not care for older view messages
.filter(|m| m.view() >= self.engine.current_view())
.partition(|m| { .partition(|m| {
m.view() == self.engine.current_view() m.view() == self.engine.current_view()
|| matches!(m, CarnotMessage::Proposal(_) | CarnotMessage::TimeoutQc(_)) || matches!(m, CarnotMessage::Proposal(_) | CarnotMessage::TimeoutQc(_))

View File

@ -3,27 +3,15 @@ use std::collections::{HashMap, HashSet};
pub(crate) struct Tally<T: core::hash::Hash + Eq + Clone> { pub(crate) struct Tally<T: core::hash::Hash + Eq + Clone> {
cache: HashMap<View, HashSet<T>>, cache: HashMap<View, HashSet<T>>,
threshold: usize,
}
impl<T: core::hash::Hash + Eq + Clone> Default for Tally<T> {
fn default() -> Self {
Self::new(2)
}
} }
impl<T: core::hash::Hash + Eq + Clone> Tally<T> { impl<T: core::hash::Hash + Eq + Clone> Tally<T> {
pub fn new(threshold: usize) -> Self { pub fn new() -> Self {
Self { Self {
cache: Default::default(), cache: Default::default(),
threshold,
} }
} }
pub fn tally(&mut self, view: View, message: T) -> Option<HashSet<T>> {
self.tally_by(view, message, self.threshold)
}
pub fn tally_by(&mut self, view: View, message: T, threshold: usize) -> Option<HashSet<T>> { pub fn tally_by(&mut self, view: View, message: T, threshold: usize) -> Option<HashSet<T>> {
let entries = self.cache.entry(view).or_default(); let entries = self.cache.entry(view).or_default();
entries.insert(message); entries.insert(message);