diff --git a/carnot/carnot.py b/carnot/carnot.py index c67019e..532c4d9 100644 --- a/carnot/carnot.py +++ b/carnot/carnot.py @@ -139,6 +139,21 @@ class NewView: Quorum: TypeAlias = Set[Vote] | Set[NewView] +Payload: TypeAlias = Block | Vote | Timeout | NewView | TimeoutQc + +@dataclass +class BroadCast: + payload: Payload + + +@dataclass +class Send: + to: [Id] + payload: Payload + + +Event: TypeAlias = BroadCast | Send + class Overlay: """ Overlay structure for a View @@ -343,7 +358,7 @@ class Carnot: self.safe_blocks[block.id()] = block self.update_high_qc(block.qc) - def approve_block(self, block: Block, votes: Set[Vote]): + def approve_block(self, block: Block, votes: Set[Vote]) -> Optional[Event]: assert block.id() in self.safe_blocks assert len(votes) == self.overlay.super_majority_threshold(self.id) assert all(self.overlay.is_member_of_child_committee(self.id, vote.voter) for vote in votes) @@ -361,13 +376,14 @@ class Carnot: view=block.view, qc=qc ) - if self.overlay.is_member_of_root_committee(self.id): - self.send(vote, self.overlay.leader(block.view + 1)) - else: - self.send(vote, *self.overlay.parent_committee(self.id)) self.highest_voted_view = max(self.highest_voted_view, block.view) + if self.overlay.is_member_of_root_committee(self.id): + return Send(to=self.overlay.leader(block.view + 1), payload=vote) + else: + return Send(to=self.overlay.parent_committee(self.id), payload=vote) + def forward_vote(self, vote: Vote): assert vote.block in self.safe_blocks assert self.overlay.is_member_of_child_committee(self.id, vote.voter) @@ -375,16 +391,16 @@ class Carnot: assert self.highest_voted_view == vote.view if self.overlay.is_member_of_root_committee(self.id): - self.send(vote, self.overlay.leader(self.current_view + 1)) + return Send(to=self.overlay.leader(self.current_view + 1), payload=vote) - def forward_new_view(self, msg: NewView): + def forward_new_view(self, msg: NewView) -> Optional[Event]: assert msg.view == self.current_view assert self.overlay.is_member_of_child_committee(self.id, msg.sender) # we only forward votes after we've voted ourselves assert self.highest_voted_view == msg.view if self.overlay.is_member_of_root_committee(self.id): - self.send(msg, self.overlay.leader(self.current_view + 1)) + return Send(to=self.overlay.leader(self.current_view + 1), payload=msg) def build_qc(self, view: View, block: Optional[Block], new_views: Optional[Set[NewView]]) -> Qc: # unhappy path @@ -401,7 +417,7 @@ class Carnot: block=block.id() ) - def propose_block(self, view: View, quorum: Quorum): + def propose_block(self, view: View, quorum: Quorum) -> Event: assert self.overlay.is_leader(self.id) qc = None @@ -428,7 +444,7 @@ class Carnot: ) )) ) - self.broadcast(block) + return BroadCast(payload=block) def is_safe_to_timeout_invariant( self, @@ -462,7 +478,7 @@ class Carnot: (self.current_view == self.last_view_timeout_qc.view) ) - def local_timeout(self): + def local_timeout(self) -> Optional[Event]: """ Root committee changes for each failure, so repeated failure will be handled by different root committees @@ -479,9 +495,9 @@ class Carnot: timeout_qc=self.last_view_timeout_qc, sender=self.id ) - self.send(timeout_msg, *self.overlay.root_committee()) + return Send(payload=timeout_msg, to=self.overlay.root_committee()) - def timeout_detected(self, msgs: Set[Timeout]): + def timeout_detected(self, msgs: Set[Timeout]) -> Optional[Event]: """ Root committee detected that supermajority of root + its children has timed out The view has failed and this information is sent to all participants along with the information @@ -494,11 +510,11 @@ class Carnot: assert self.overlay.is_member_of_root_committee(self.id) timeout_qc = self.build_timeout_qc(msgs, self.id) - self.broadcast(timeout_qc) # we broadcast so all nodes can get ready for voting on a new view + return BroadCast(payload=timeout_qc) # we broadcast so all nodes can get ready for voting on a new view # Note that receive_timeout qc should be called for root nodes as well # noinspection PyTypeChecker - def approve_new_view(self, timeout_qc: TimeoutQc, new_views: Set[NewView]): + def approve_new_view(self, timeout_qc: TimeoutQc, new_views: Set[NewView]) -> Optional[Event]: """ We will always need for timeout_qc to have been preprocessed by the received_timeout_qc method when the event happens before approve_new_view is processed. @@ -530,15 +546,17 @@ class Carnot: timeout_qc=timeout_qc, ) - if self.overlay.is_member_of_root_committee(self.id): - self.send(timeout_msg, self.overlay.leader(self.current_view + 1)) - else: - self.send(timeout_msg, *self.overlay.parent_committee(self.id)) - # This checks if a node has already incremented its voted view by local_timeout. If not then it should # do it now to avoid voting in this view. self.highest_voted_view = max(self.highest_voted_view, view) + if self.overlay.is_member_of_root_committee(self.id): + return Send(payload=timeout_msg, to=[self.overlay.leader(self.current_view + 1)]) + else: + return Send(payload=timeout_msg, to=self.overlay.parent_committee(self.id)) + + + # Just a suggestion that received_timeout_qc can be reused by each node when the process timeout_qc of the NewView msg. def receive_timeout_qc(self, timeout_qc: TimeoutQc): assert timeout_qc.view >= self.current_view @@ -564,12 +582,6 @@ class Carnot: sender=sender, ) - def send(self, vote: Vote | Timeout | NewView | TimeoutQc, *ids: Id): - pass - - def broadcast(self, block): - pass - def update_current_view_from_timeout_qc(self, timeout_qc: TimeoutQc): self.current_view = timeout_qc.view + 1