feat: enforce TimeoutQc to be constructed only by new() (#229)

This commit is contained in:
Youngjoon Lee 2023-06-28 23:37:27 +09:00 committed by GitHub
parent 224a3a53f5
commit 98aa138b87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 141 additions and 72 deletions

View File

@ -96,13 +96,13 @@ impl<O: Overlay> Carnot<O> {
pub fn receive_timeout_qc(&self, timeout_qc: TimeoutQc) -> Self { pub fn receive_timeout_qc(&self, timeout_qc: TimeoutQc) -> Self {
let mut new_state = self.clone(); let mut new_state = self.clone();
if timeout_qc.view < new_state.current_view { if timeout_qc.view() < new_state.current_view {
return new_state; return new_state;
} }
new_state.update_high_qc(Qc::Standard(timeout_qc.high_qc.clone())); new_state.update_high_qc(Qc::Standard(timeout_qc.high_qc().clone()));
new_state.update_timeout_qc(timeout_qc.clone()); new_state.update_timeout_qc(timeout_qc.clone());
new_state.current_view = timeout_qc.view + 1; new_state.current_view = timeout_qc.view() + 1;
new_state.overlay.rebuild(timeout_qc); new_state.overlay.rebuild(timeout_qc);
new_state new_state
@ -159,13 +159,13 @@ impl<O: Overlay> Carnot<O> {
timeout_qc: TimeoutQc, timeout_qc: TimeoutQc,
new_views: HashSet<NewView>, new_views: HashSet<NewView>,
) -> (Self, Send) { ) -> (Self, Send) {
let new_view = timeout_qc.view + 1; let new_view = timeout_qc.view() + 1;
assert!( assert!(
new_view new_view
> self > self
.last_view_timeout_qc .last_view_timeout_qc
.as_ref() .as_ref()
.map(|qc| qc.view) .map(|qc| qc.view())
.unwrap_or(0), .unwrap_or(0),
"can't vote for a new view not bigger than the last timeout_qc" "can't vote for a new view not bigger than the last timeout_qc"
); );
@ -185,7 +185,7 @@ impl<O: Overlay> Carnot<O> {
let high_qc = new_views let high_qc = new_views
.iter() .iter()
.map(|nv| &nv.high_qc) .map(|nv| &nv.high_qc)
.chain(std::iter::once(&timeout_qc.high_qc)) .chain(std::iter::once(timeout_qc.high_qc()))
.max_by_key(|qc| qc.view) .max_by_key(|qc| qc.view)
.unwrap(); .unwrap();
new_state.update_high_qc(Qc::Standard(high_qc.clone())); new_state.update_high_qc(Qc::Standard(high_qc.clone()));
@ -267,7 +267,7 @@ impl<O: Overlay> Carnot<O> {
(None, timeout_qc) => { (None, timeout_qc) => {
self.last_view_timeout_qc = Some(timeout_qc); self.last_view_timeout_qc = Some(timeout_qc);
} }
(Some(current_qc), timeout_qc) if timeout_qc.view > current_qc.view => { (Some(current_qc), timeout_qc) if timeout_qc.view() > current_qc.view() => {
self.last_view_timeout_qc = Some(timeout_qc); self.last_view_timeout_qc = Some(timeout_qc);
} }
_ => {} _ => {}
@ -643,16 +643,16 @@ mod test {
let (mut engine, _) = engine.local_timeout(); let (mut engine, _) = engine.local_timeout();
assert_eq!(engine.current_view(), 1); assert_eq!(engine.current_view(), 1);
let timeout_qc = TimeoutQc { let timeout_qc = TimeoutQc::new(
view: 1, 1,
high_qc: StandardQc { StandardQc {
view: 0, // genesis view: 0, // genesis
id: [0; 32], id: [0; 32],
}, },
sender: [0; 32], [0; 32],
}; );
engine = engine.receive_timeout_qc(timeout_qc.clone()); engine = engine.receive_timeout_qc(timeout_qc.clone());
assert_eq!(engine.local_high_qc, timeout_qc.high_qc); assert_eq!(&engine.local_high_qc, timeout_qc.high_qc());
assert_eq!(engine.last_view_timeout_qc, Some(timeout_qc)); assert_eq!(engine.last_view_timeout_qc, Some(timeout_qc));
assert_eq!(engine.current_view(), 2); assert_eq!(engine.current_view(), 2);
} }
@ -667,16 +667,16 @@ mod test {
// before local_timeout occurs // before local_timeout occurs
assert_eq!(engine.current_view(), 1); assert_eq!(engine.current_view(), 1);
let timeout_qc = TimeoutQc { let timeout_qc = TimeoutQc::new(
view: 1, 1,
high_qc: StandardQc { StandardQc {
view: 0, // genesis view: 0, // genesis
id: [0; 32], id: [0; 32],
}, },
sender: [0; 32], [0; 32],
}; );
engine = engine.receive_timeout_qc(timeout_qc.clone()); engine = engine.receive_timeout_qc(timeout_qc.clone());
assert_eq!(engine.local_high_qc, timeout_qc.high_qc); assert_eq!(&engine.local_high_qc, timeout_qc.high_qc());
assert_eq!(engine.last_view_timeout_qc, Some(timeout_qc)); assert_eq!(engine.last_view_timeout_qc, Some(timeout_qc));
assert_eq!(engine.current_view(), 2); assert_eq!(engine.current_view(), 2);
} }
@ -689,22 +689,22 @@ mod test {
engine = engine.receive_block(block).unwrap(); // received but not approved yet engine = engine.receive_block(block).unwrap(); // received but not approved yet
assert_eq!(engine.current_view(), 1); // still waiting for a QC(view=1) assert_eq!(engine.current_view(), 1); // still waiting for a QC(view=1)
let timeout_qc = TimeoutQc { let timeout_qc = TimeoutQc::new(
view: 1, 1,
high_qc: StandardQc { StandardQc {
view: 0, // genesis view: 0, // genesis
id: [0; 32], id: [0; 32],
}, },
sender: [0; 32], [0; 32],
}; );
engine = engine.receive_timeout_qc(timeout_qc.clone()); engine = engine.receive_timeout_qc(timeout_qc.clone());
assert_eq!(engine.local_high_qc, timeout_qc.high_qc); assert_eq!(&engine.local_high_qc, timeout_qc.high_qc());
assert_eq!(engine.last_view_timeout_qc, Some(timeout_qc.clone())); assert_eq!(engine.last_view_timeout_qc, Some(timeout_qc.clone()));
assert_eq!(engine.current_view(), 2); assert_eq!(engine.current_view(), 2);
assert_eq!(engine.highest_voted_view, -1); // didn't vote on anything yet assert_eq!(engine.highest_voted_view, -1); // didn't vote on anything yet
let (engine, send) = engine.approve_new_view(timeout_qc.clone(), HashSet::new()); let (engine, send) = engine.approve_new_view(timeout_qc.clone(), HashSet::new());
assert_eq!(engine.high_qc(), timeout_qc.high_qc); assert_eq!(&engine.high_qc(), timeout_qc.high_qc());
assert_eq!(engine.current_view(), 2); // not changed assert_eq!(engine.current_view(), 2); // not changed
assert_eq!(engine.highest_voted_view, 2); assert_eq!(engine.highest_voted_view, 2);
assert_eq!( assert_eq!(
@ -713,7 +713,7 @@ mod test {
view: 2, view: 2,
sender: [0; 32], sender: [0; 32],
timeout_qc: timeout_qc.clone(), timeout_qc: timeout_qc.clone(),
high_qc: timeout_qc.high_qc, high_qc: timeout_qc.high_qc().clone(),
}) })
); );
} }
@ -726,26 +726,26 @@ mod test {
engine = engine.receive_block(block).unwrap(); // received but not approved yet engine = engine.receive_block(block).unwrap(); // received but not approved yet
assert_eq!(engine.current_view(), 1); assert_eq!(engine.current_view(), 1);
let timeout_qc1 = TimeoutQc { let timeout_qc1 = TimeoutQc::new(
view: 1, 1,
high_qc: StandardQc { StandardQc {
view: 0, // genesis view: 0, // genesis
id: [0; 32], id: [0; 32],
}, },
sender: [0; 32], [0; 32],
}; );
engine = engine.receive_timeout_qc(timeout_qc1.clone()); engine = engine.receive_timeout_qc(timeout_qc1.clone());
assert_eq!(engine.last_view_timeout_qc, Some(timeout_qc1.clone())); assert_eq!(engine.last_view_timeout_qc, Some(timeout_qc1.clone()));
// receiving a timeout_qc2 before approving new_view(timeout_qc1) // receiving a timeout_qc2 before approving new_view(timeout_qc1)
let timeout_qc2 = TimeoutQc { let timeout_qc2 = TimeoutQc::new(
view: 2, 2,
high_qc: StandardQc { StandardQc {
view: 0, // genesis view: 0, // genesis
id: [0; 32], id: [0; 32],
}, },
sender: [0; 32], [0; 32],
}; );
engine = engine.receive_timeout_qc(timeout_qc2.clone()); engine = engine.receive_timeout_qc(timeout_qc2.clone());
assert_eq!(engine.last_view_timeout_qc, Some(timeout_qc2)); assert_eq!(engine.last_view_timeout_qc, Some(timeout_qc2));

View File

@ -57,9 +57,38 @@ pub struct NewView {
#[derive(Debug, Clone, Eq, PartialEq, Hash)] #[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct TimeoutQc { pub struct TimeoutQc {
pub view: View, view: View,
pub high_qc: StandardQc, high_qc: StandardQc,
pub sender: NodeId, sender: NodeId,
}
impl TimeoutQc {
pub fn new(view: View, high_qc: StandardQc, sender: NodeId) -> Self {
assert!(
view >= high_qc.view,
"timeout_qc.view:{} shouldn't be lower than timeout_qc.high_qc.view:{}",
view,
high_qc.view,
);
Self {
view,
high_qc,
sender,
}
}
pub fn view(&self) -> View {
self.view
}
pub fn high_qc(&self) -> &StandardQc {
&self.high_qc
}
pub fn sender(&self) -> NodeId {
self.sender
}
} }
#[derive(Debug, Clone, Eq, PartialEq, Hash)] #[derive(Debug, Clone, Eq, PartialEq, Hash)]
@ -186,4 +215,48 @@ mod test {
assert_eq!(qc.block(), [0; 32]); assert_eq!(qc.block(), [0; 32]);
assert_eq!(qc.high_qc(), aggregated_qc.high_qc); assert_eq!(qc.high_qc(), aggregated_qc.high_qc);
} }
#[test]
fn new_timeout_qc() {
let timeout_qc = TimeoutQc::new(
2,
StandardQc {
view: 1,
id: [0; 32],
},
[0; 32],
);
assert_eq!(timeout_qc.view(), 2);
assert_eq!(timeout_qc.high_qc().view, 1);
assert_eq!(timeout_qc.high_qc().id, [0; 32]);
assert_eq!(timeout_qc.sender(), [0; 32]);
let timeout_qc = TimeoutQc::new(
2,
StandardQc {
view: 2,
id: [0; 32],
},
[0; 32],
);
assert_eq!(timeout_qc.view(), 2);
assert_eq!(timeout_qc.high_qc().view, 2);
assert_eq!(timeout_qc.high_qc().id, [0; 32]);
assert_eq!(timeout_qc.sender(), [0; 32]);
}
#[test]
#[should_panic(
expected = "timeout_qc.view:1 shouldn't be lower than timeout_qc.high_qc.view:2"
)]
fn new_timeout_qc_panic() {
let _ = TimeoutQc::new(
1,
StandardQc {
view: 2,
id: [0; 32],
},
[0; 32],
);
}
} }

View File

@ -102,10 +102,10 @@ impl ReferenceStateMachine for RefState {
Transition::ApprovePastBlock(block) => state.highest_voted_view >= block.view, Transition::ApprovePastBlock(block) => state.highest_voted_view >= block.view,
Transition::LocalTimeout => true, Transition::LocalTimeout => true,
Transition::ReceiveTimeoutQcForRecentView(timeout_qc) => { Transition::ReceiveTimeoutQcForRecentView(timeout_qc) => {
timeout_qc.view == state.current_view() timeout_qc.view() == state.current_view()
} }
Transition::ReceiveTimeoutQcForOldView(timeout_qc) => { Transition::ReceiveTimeoutQcForOldView(timeout_qc) => {
timeout_qc.view < state.current_view() timeout_qc.view() < state.current_view()
} }
Transition::ApproveNewViewWithLatestTimeoutQc(timeout_qc, _) => { Transition::ApproveNewViewWithLatestTimeoutQc(timeout_qc, _) => {
state.latest_timeout_qcs().contains(timeout_qc) state.latest_timeout_qcs().contains(timeout_qc)
@ -142,7 +142,7 @@ impl ReferenceStateMachine for RefState {
Transition::ReceiveTimeoutQcForRecentView(timeout_qc) => { Transition::ReceiveTimeoutQcForRecentView(timeout_qc) => {
state state
.chain .chain
.entry(timeout_qc.view) .entry(timeout_qc.view())
.or_default() .or_default()
.timeout_qcs .timeout_qcs
.insert(timeout_qc.clone()); .insert(timeout_qc.clone());
@ -262,14 +262,14 @@ impl RefState {
.prop_flat_map(move |block| { .prop_flat_map(move |block| {
(current_view..=current_view + delta) // including future views (current_view..=current_view + delta) // including future views
.prop_map(move |view| { .prop_map(move |view| {
Transition::ReceiveTimeoutQcForRecentView(TimeoutQc { Transition::ReceiveTimeoutQcForRecentView(TimeoutQc::new(
view, view,
high_qc: StandardQc { StandardQc {
view: block.view, view: block.view,
id: block.id, id: block.id,
}, },
sender: SENDER, SENDER,
}) ))
}) })
}) })
.boxed() .boxed()
@ -290,11 +290,11 @@ impl RefState {
} else { } else {
proptest::sample::select(old_view_entries) proptest::sample::select(old_view_entries)
.prop_map(move |(view, entry)| { .prop_map(move |(view, entry)| {
Transition::ReceiveTimeoutQcForOldView(TimeoutQc { Transition::ReceiveTimeoutQcForOldView(TimeoutQc::new(
view, view,
high_qc: entry.high_qc().unwrap(), entry.high_qc().unwrap(),
sender: SENDER, SENDER,
}) ))
}) })
.boxed() .boxed()
} }
@ -349,7 +349,7 @@ impl RefState {
} }
pub fn new_view_from(timeout_qc: &TimeoutQc) -> View { pub fn new_view_from(timeout_qc: &TimeoutQc) -> View {
timeout_qc.view + 1 timeout_qc.view() + 1
} }
pub fn high_qc(&self) -> StandardQc { pub fn high_qc(&self) -> StandardQc {
@ -415,7 +415,7 @@ impl ViewEntry {
let iter2 = self let iter2 = self
.timeout_qcs .timeout_qcs
.iter() .iter()
.map(|timeout_qc| timeout_qc.high_qc.clone()); .map(|timeout_qc| timeout_qc.high_qc().clone());
iter1.chain(iter2).max_by_key(|qc| qc.view) iter1.chain(iter2).max_by_key(|qc| qc.view)
} }
} }

View File

@ -18,5 +18,5 @@ prop_state_machine! {
#[test] #[test]
// run 100 state transitions per test case // run 100 state transitions per test case
fn consensus_engine_test(sequential 1..100 => ConsensusEngineTest); fn consensus_engine_test(sequential 1..30 => ConsensusEngineTest);
} }

View File

@ -42,6 +42,6 @@ impl UpdateableLeaderSelection for RandomBeaconState {
} }
fn on_timeout_qc_received(&self, qc: TimeoutQc) -> Result<Self, Self::Error> { fn on_timeout_qc_received(&self, qc: TimeoutQc) -> Result<Self, Self::Error> {
Ok(Self::generate_sad(qc.view, self)) Ok(Self::generate_sad(qc.view(), self))
} }
} }

View File

@ -434,7 +434,7 @@ where
participating_nodes: carnot.root_committee(), participating_nodes: carnot.root_committee(),
}; };
let (new_carnot, out) = carnot.approve_new_view(timeout_qc.clone(), new_views); let (new_carnot, out) = carnot.approve_new_view(timeout_qc.clone(), new_views);
let new_view = timeout_qc.view + 1; let new_view = timeout_qc.view() + 1;
if carnot.is_next_leader() { if carnot.is_next_leader() {
let high_qc = carnot.high_qc(); let high_qc = carnot.high_qc();
task_manager.push(new_view, async move { task_manager.push(new_view, async move {
@ -471,7 +471,7 @@ where
participating_nodes: carnot.child_committees().into_iter().flatten().collect(), participating_nodes: carnot.child_committees().into_iter().flatten().collect(),
}; };
task_manager.push( task_manager.push(
timeout_qc.view + 1, timeout_qc.view() + 1,
Self::gather_new_views(adapter, self_committee, timeout_qc.clone(), tally_settings), Self::gather_new_views(adapter, self_committee, timeout_qc.clone(), tally_settings),
); );
if carnot.current_view() != new_state.current_view() { if carnot.current_view() != new_state.current_view() {
@ -498,11 +498,7 @@ where
.clone(); .clone();
let mut output = None; let mut output = None;
if carnot.is_member_of_root_committee() { if carnot.is_member_of_root_committee() {
let timeout_qc = TimeoutQc { let timeout_qc = TimeoutQc::new(carnot.current_view(), high_qc, carnot.id());
view: carnot.current_view(),
high_qc,
sender: carnot.id(),
};
output = Some(Output::BroadcastTimeoutQc { timeout_qc }); output = Some(Output::BroadcastTimeoutQc { timeout_qc });
} }
(carnot, output) (carnot, output)
@ -606,7 +602,7 @@ where
) -> Event<P::Tx> { ) -> Event<P::Tx> {
let tally = NewViewTally::new(tally); let tally = NewViewTally::new(tally);
let stream = adapter let stream = adapter
.new_view_stream(&committee, timeout_qc.view + 1) .new_view_stream(&committee, timeout_qc.view() + 1)
.await; .await;
match tally.tally(timeout_qc.clone(), stream).await { match tally.tally(timeout_qc.clone(), stream).await {
Ok((_qc, new_views)) => Event::NewView { Ok((_qc, new_views)) => Event::NewView {

View File

@ -247,7 +247,7 @@ impl NetworkAdapter for WakuAdapter {
let payload = message.payload(); let payload = message.payload();
let qc = TimeoutQcMsg::from_bytes(payload); let qc = TimeoutQcMsg::from_bytes(payload);
async move { async move {
if qc.qc.view > view { if qc.qc.view() > view {
Some(qc) Some(qc)
} else { } else {
None None

View File

@ -48,7 +48,7 @@ impl Tally for NewViewTally {
while let Some(vote) = vote_stream.next().await { while let Some(vote) = vote_stream.next().await {
// check vote view is valid // check vote view is valid
if !vote.vote.view != timeout_qc.view { if !vote.vote.view != timeout_qc.view() {
continue; continue;
} }

View File

@ -17,7 +17,7 @@ impl CarnotMessage {
match self { match self {
CarnotMessage::Proposal(msg) => msg.view, CarnotMessage::Proposal(msg) => msg.view,
CarnotMessage::Vote(msg) => msg.vote.view, CarnotMessage::Vote(msg) => msg.vote.view,
CarnotMessage::TimeoutQc(msg) => msg.qc.view, CarnotMessage::TimeoutQc(msg) => msg.qc.view(),
CarnotMessage::Timeout(msg) => msg.vote.view, CarnotMessage::Timeout(msg) => msg.vote.view,
CarnotMessage::NewView(msg) => msg.vote.view, CarnotMessage::NewView(msg) => msg.vote.view,
} }

View File

@ -428,7 +428,7 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
tracing::info!( tracing::info!(
node = parse_idx(&self.id), node = parse_idx(&self.id),
current_view = self.engine.current_view(), current_view = self.engine.current_view(),
timeout_view = timeout_qc.view, timeout_view = timeout_qc.view(),
"receive new view message" "receive new view message"
); );
let (new, out) = self.engine.approve_new_view(timeout_qc.clone(), new_views); let (new, out) = self.engine.approve_new_view(timeout_qc.clone(), new_views);
@ -439,7 +439,7 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
tracing::info!( tracing::info!(
node = parse_idx(&self.id), node = parse_idx(&self.id),
current_view = self.engine.current_view(), current_view = self.engine.current_view(),
timeout_view = timeout_qc.view, timeout_view = timeout_qc.view(),
"receive timeout qc message" "receive timeout qc message"
); );
self.engine = self.engine.receive_timeout_qc(timeout_qc.clone()); self.engine = self.engine.receive_timeout_qc(timeout_qc.clone());
@ -457,11 +457,11 @@ impl<L: UpdateableLeaderSelection, O: Overlay<LeaderSelection = L>> Node for Car
.max_by_key(|qc| qc.view) .max_by_key(|qc| qc.view)
.expect("empty root committee") .expect("empty root committee")
.clone(); .clone();
let timeout_qc = TimeoutQc { let timeout_qc = TimeoutQc::new(
view: timeouts.iter().next().unwrap().view, timeouts.iter().next().unwrap().view,
high_qc, high_qc,
sender: self.id(), self.id(),
}; );
output.push(Output::BroadcastTimeoutQc { timeout_qc }); output.push(Output::BroadcastTimeoutQc { timeout_qc });
} }
} }