[WIP] Use events instead of send/broadcast methods (#21)

* Remove send and broadcast, use events

* Adjust tests to use events

* Adjust unhappy path tests to use events

* Fix missing wrongly optional return types
This commit is contained in:
Daniel Sanchez 2023-04-21 13:22:34 +02:00 committed by GitHub
parent 354e9f3e9d
commit 436577b760
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 65 additions and 85 deletions

View File

@ -139,6 +139,21 @@ class NewView:
Quorum: TypeAlias = Set[Vote] | Set[NewView]
Payload: TypeAlias = Block | Vote | Timeout | NewView | TimeoutQc
@dataclass
class BroadCast:
payload: Payload
@dataclass
class Send:
to: [Id]
payload: Payload
Event: TypeAlias = BroadCast | Send
class Overlay:
"""
Overlay structure for a View
@ -343,7 +358,7 @@ class Carnot:
self.safe_blocks[block.id()] = block
self.update_high_qc(block.qc)
def approve_block(self, block: Block, votes: Set[Vote]):
def approve_block(self, block: Block, votes: Set[Vote]) -> Event:
assert block.id() in self.safe_blocks
assert len(votes) == self.overlay.super_majority_threshold(self.id)
assert all(self.overlay.is_member_of_child_committee(self.id, vote.voter) for vote in votes)
@ -361,30 +376,30 @@ class Carnot:
view=block.view,
qc=qc
)
if self.overlay.is_member_of_root_committee(self.id):
self.send(vote, self.overlay.leader(block.view + 1))
else:
self.send(vote, *self.overlay.parent_committee(self.id))
self.highest_voted_view = max(self.highest_voted_view, block.view)
def forward_vote(self, vote: Vote):
if self.overlay.is_member_of_root_committee(self.id):
return Send(to=self.overlay.leader(block.view + 1), payload=vote)
return Send(to=self.overlay.parent_committee(self.id), payload=vote)
def forward_vote(self, vote: Vote) -> Optional[Event]:
assert vote.block in self.safe_blocks
assert self.overlay.is_member_of_child_committee(self.id, vote.voter)
# we only forward votes after we've voted ourselves
assert self.highest_voted_view == vote.view
if self.overlay.is_member_of_root_committee(self.id):
self.send(vote, self.overlay.leader(self.current_view + 1))
return Send(to=self.overlay.leader(self.current_view + 1), payload=vote)
def forward_new_view(self, msg: NewView):
def forward_new_view(self, msg: NewView) -> Optional[Event]:
assert msg.view == self.current_view
assert self.overlay.is_member_of_child_committee(self.id, msg.sender)
# we only forward votes after we've voted ourselves
assert self.highest_voted_view == msg.view
if self.overlay.is_member_of_root_committee(self.id):
self.send(msg, self.overlay.leader(self.current_view + 1))
return Send(to=self.overlay.leader(self.current_view + 1), payload=msg)
def build_qc(self, view: View, block: Optional[Block], new_views: Optional[Set[NewView]]) -> Qc:
# unhappy path
@ -401,7 +416,7 @@ class Carnot:
block=block.id()
)
def propose_block(self, view: View, quorum: Quorum):
def propose_block(self, view: View, quorum: Quorum) -> Event:
assert self.overlay.is_leader(self.id)
qc = None
@ -428,7 +443,7 @@ class Carnot:
)
))
)
self.broadcast(block)
return BroadCast(payload=block)
def is_safe_to_timeout_invariant(
self,
@ -462,7 +477,7 @@ class Carnot:
(self.current_view == self.last_view_timeout_qc.view)
)
def local_timeout(self):
def local_timeout(self) -> Optional[Event]:
"""
Root committee changes for each failure, so repeated failure will be handled by different
root committees
@ -479,9 +494,9 @@ class Carnot:
timeout_qc=self.last_view_timeout_qc,
sender=self.id
)
self.send(timeout_msg, *self.overlay.root_committee())
return Send(payload=timeout_msg, to=self.overlay.root_committee())
def timeout_detected(self, msgs: Set[Timeout]):
def timeout_detected(self, msgs: Set[Timeout]) -> Event:
"""
Root committee detected that supermajority of root + its children has timed out
The view has failed and this information is sent to all participants along with the information
@ -494,11 +509,11 @@ class Carnot:
assert self.overlay.is_member_of_root_committee(self.id)
timeout_qc = self.build_timeout_qc(msgs, self.id)
self.broadcast(timeout_qc) # we broadcast so all nodes can get ready for voting on a new view
return BroadCast(payload=timeout_qc) # we broadcast so all nodes can get ready for voting on a new view
# Note that receive_timeout qc should be called for root nodes as well
# noinspection PyTypeChecker
def approve_new_view(self, timeout_qc: TimeoutQc, new_views: Set[NewView]):
def approve_new_view(self, timeout_qc: TimeoutQc, new_views: Set[NewView]) -> Event:
"""
We will always need for timeout_qc to have been preprocessed by the received_timeout_qc method when the event
happens before approve_new_view is processed.
@ -530,15 +545,16 @@ class Carnot:
timeout_qc=timeout_qc,
)
if self.overlay.is_member_of_root_committee(self.id):
self.send(timeout_msg, self.overlay.leader(self.current_view + 1))
else:
self.send(timeout_msg, *self.overlay.parent_committee(self.id))
# This checks if a node has already incremented its voted view by local_timeout. If not then it should
# do it now to avoid voting in this view.
self.highest_voted_view = max(self.highest_voted_view, view)
if self.overlay.is_member_of_root_committee(self.id):
return Send(payload=timeout_msg, to=[self.overlay.leader(self.current_view + 1)])
return Send(payload=timeout_msg, to=self.overlay.parent_committee(self.id))
# Just a suggestion that received_timeout_qc can be reused by each node when the process timeout_qc of the NewView msg.
def receive_timeout_qc(self, timeout_qc: TimeoutQc):
assert timeout_qc.view >= self.current_view
@ -564,12 +580,6 @@ class Carnot:
sender=sender,
)
def send(self, vote: Vote | Timeout | NewView | TimeoutQc, *ids: Id):
pass
def broadcast(self, block):
pass
def update_current_view_from_timeout_qc(self, timeout_qc: TimeoutQc):
self.current_view = timeout_qc.view + 1

View File

@ -2,19 +2,6 @@ from carnot import *
from unittest import TestCase
class MockCarnot(Carnot):
def __init__(self, id):
super(MockCarnot, self).__init__(id)
self.proposed_block = None
self.latest_vote = None
def broadcast(self, block):
self.proposed_block = block
def send(self, vote: Vote | Timeout | TimeoutQc, *ids: Id):
self.latest_vote = vote
class TestCarnotHappyPath(TestCase):
@staticmethod
def add_genesis_block(carnot: Carnot) -> Block:
@ -299,7 +286,7 @@ class TestCarnotHappyPath(TestCase):
def parent_committee(self, _id: Id) -> Optional[Committee]:
return set()
carnot = MockCarnot(int_to_id(0))
carnot = Carnot(int_to_id(0))
carnot.overlay = MockOverlay()
genesis_block = self.add_genesis_block(carnot)
@ -316,8 +303,8 @@ class TestCarnotHappyPath(TestCase):
) for i in range(10)
)
# propose a new block
carnot.propose_block(view=1, quorum=votes)
proposed_block = carnot.proposed_block
proposed_block = carnot.propose_block(view=1, quorum=votes).payload
# process the proposed block as member of a committee
carnot.receive_block(proposed_block)
child_votes = set(
@ -382,7 +369,7 @@ class TestCarnotHappyPath(TestCase):
"""
Test that having a single committee (both root and leaf) and a leader is able to advance
"""
nodes = [MockCarnot(int_to_id(i)) for i in range(4)]
nodes = [Carnot(int_to_id(i)) for i in range(4)]
leader = nodes[0]
class MockOverlay(Overlay):
@ -433,15 +420,13 @@ class TestCarnotHappyPath(TestCase):
),
) for i in range(3)
)
leader.propose_block(1, votes)
proposed_block = leader.proposed_block
proposed_block = leader.propose_block(1, votes).payload
votes = []
for node in nodes:
node.receive_block(proposed_block)
node.approve_block(proposed_block, set())
votes.append(node.latest_vote)
leader.propose_block(2, set(votes))
next_proposed_block = leader.proposed_block
vote = node.approve_block(proposed_block, set())
votes.append(vote.payload)
next_proposed_block = leader.propose_block(2, set(votes)).payload
for node in nodes:
# A node receives the second proposed block
node.receive_block(next_proposed_block)

View File

@ -6,13 +6,6 @@ from itertools import chain
class MockCarnot(Carnot):
def __init__(self, id):
super(MockCarnot, self).__init__(id)
self.latest_event = None
def broadcast(self, block):
self.latest_event = block
def send(self, vote: Vote | Timeout | TimeoutQc, *ids: Id):
self.latest_event = vote
def rebuild_overlay_from_timeout_qc(self, timeout_qc: TimeoutQc):
pass
@ -118,8 +111,7 @@ def setup_initial_setup(test_case: TestCase, overlay: MockOverlay, size: int) ->
),
) for i in range(5)
)
leader.propose_block(1, genesis_votes)
proposed_block = leader.latest_event
proposed_block = leader.propose_block(1, genesis_votes).payload
test_case.assertIsNotNone(proposed_block)
return nodes, leader, proposed_block
@ -143,20 +135,20 @@ def succeed(test_case: TestCase, overlay: MockOverlay, nodes: Dict[Id, MockCarno
childs_ids = list(chain.from_iterable(overlay.leaf_committees()))
leafs = [nodes[_id] for _id in childs_ids]
for node in leafs:
node.approve_block(proposed_block, set())
votes[node.id] = node.latest_event
vote = node.approve_block(proposed_block, set()).payload
votes[node.id] = vote
while len(parents := parents_from_childs(overlay, childs_ids)) != 0:
for node_id in parents:
node = nodes[node_id]
child_votes = [votes[_id] for _id in votes.keys() if overlay.is_member_of_child_committee(node_id, _id)]
if len(child_votes) == overlay.super_majority_threshold(node_id) and node_id not in votes:
node.approve_block(proposed_block, child_votes)
votes[node_id] = node.latest_event
vote = node.approve_block(proposed_block, child_votes).payload
votes[node_id] = vote
childs_ids = list(set(parents))
root_votes = [
nodes[node_id].latest_event
votes[node_id]
for node_id in nodes
if overlay.is_member_of_root_committee(node_id) or overlay.is_child_of_root_committee(node_id)
]
@ -171,12 +163,11 @@ def fail(test_case: TestCase, overlay: MockOverlay, nodes: Dict[Id, MockCarnot],
node: MockCarnot
timeouts = []
for node in (nodes[_id] for _id in nodes if overlay.is_member_of_root_committee(_id) or overlay.is_child_of_root_committee(_id)):
node.local_timeout()
timeouts.append(node.latest_event)
timeout = node.local_timeout().payload
timeouts.append(timeout)
root_member = next(nodes[_id] for _id in nodes if overlay.is_member_of_root_committee(_id))
root_member.timeout_detected(timeouts)
timeout_qc = root_member.latest_event
timeout_qc = root_member.timeout_detected(timeouts).payload
for node in nodes.values():
node.receive_timeout_qc(timeout_qc)
@ -185,20 +176,20 @@ def fail(test_case: TestCase, overlay: MockOverlay, nodes: Dict[Id, MockCarnot],
childs_ids = list(chain.from_iterable(overlay.leaf_committees()))
leafs = [nodes[_id] for _id in childs_ids]
for node in leafs:
node.approve_new_view(timeout_qc, set())
votes[node.id] = node.latest_event
vote = node.approve_new_view(timeout_qc, set()).payload
votes[node.id] = vote
while len(parents := parents_from_childs(overlay, childs_ids)) != 0:
for node_id in parents:
node = nodes[node_id]
child_votes = [votes[_id] for _id in votes.keys() if overlay.is_member_of_child_committee(node_id, _id)]
if len(child_votes) == overlay.super_majority_threshold(node_id) and node_id not in votes:
node.approve_new_view(timeout_qc, child_votes)
votes[node_id] = node.latest_event
vote = node.approve_new_view(timeout_qc, child_votes).payload
votes[node_id] = vote
childs_ids = list(set(parents))
root_votes = [
nodes[node_id].latest_event
votes[node_id]
for node_id in nodes
if overlay.is_member_of_root_committee(node_id) or overlay.is_child_of_root_committee(node_id)
]
@ -224,10 +215,9 @@ class TestCarnotUnhappyPath(TestCase):
# while aggregating votes for the high qc. Those votes are then forwarded to the leader of view v + 2
# which can propose a block with those aggregate votes as proof of the previous round completion.
root_votes = fail(self, overlay, nodes, proposed_block)
leader.propose_block(view+2, root_votes)
proposed_block = leader.propose_block(view+2, root_votes).payload
# Add final assertions on nodes
proposed_block = leader.latest_event
# Thus, the first block that can be proposed is 2 views after the timeout
self.assertEqual(proposed_block.view, view + 2)
# Its qc is always for the view before the block is proposed for
@ -253,26 +243,21 @@ class TestCarnotUnhappyPath(TestCase):
for view in range(2, 5):
root_votes = succeed(self, overlay, nodes, proposed_block)
leader.propose_block(view, root_votes)
proposed_block = leader.latest_event
proposed_block = leader.propose_block(view, root_votes).payload
root_votes = fail(self, overlay, nodes, proposed_block)
leader.propose_block(6, root_votes)
proposed_block = leader.latest_event
proposed_block = leader.propose_block(6, root_votes).payload
for view in range(7, 8):
root_votes = succeed(self, overlay, nodes, proposed_block)
leader.propose_block(view, root_votes)
proposed_block = leader.latest_event
proposed_block = leader.propose_block(view, root_votes).payload
root_votes = fail(self, overlay, nodes, proposed_block)
leader.propose_block(9, root_votes)
proposed_block = leader.latest_event
proposed_block = leader.propose_block(9, root_votes).payload
for view in range(10, 15):
root_votes = succeed(self, overlay, nodes, proposed_block)
leader.propose_block(view, root_votes)
proposed_block = leader.latest_event
proposed_block = leader.propose_block(view, root_votes).payload
committed_blocks = [view for view in range(1, 11) if view not in (4, 5, 7, 8)]
for node in nodes.values():