Consensus service refactor (#176)

The consensus service was becoming a bit messy and difficult to maintain.
This PR moves handling invididual events to their own function and refactors the cancelabe task system in a separate struct.
This commit is contained in:
Giacomo Pasini 2023-06-13 12:11:55 +02:00 committed by GitHub
parent fe0361a5b8
commit 75025e8cf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 421 additions and 291 deletions

View File

@ -1,13 +1,7 @@
//! In this module, and children ones, the 'view lifetime is tied to a logical consensus view,
//! represented by the `View` struct.
//! This is done to ensure that all the different data structs used to represent various actors
//! are always synchronized (i.e. it cannot happen that we accidentally use committees from different views).
//! It's obviously extremely important that the information contained in `View` is synchronized across different
//! nodes, but that has to be achieved through different means.
mod leader_selection;
pub mod network;
mod tally;
mod view_cancel;
mod task_manager;
// std
use std::collections::HashSet;
@ -17,19 +11,20 @@ use std::pin::Pin;
use std::time::Duration;
// crates
use bls_signatures::PrivateKey;
use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
use futures::{Stream, StreamExt};
use leader_selection::UpdateableLeaderSelection;
use serde::Deserialize;
use serde::{de::DeserializeOwned, Serialize};
use tracing::instrument;
// internal
use crate::network::messages::{NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg};
use crate::network::NetworkAdapter;
use crate::tally::{happy::CarnotTally, unhappy::NewViewTally, CarnotTallySettings};
use crate::view_cancel::ViewCancelCache;
use consensus_engine::{
overlay::RandomBeaconState, AggregateQc, Carnot, Committee, LeaderProof, NewView, Overlay,
Payload, Qc, StandardQc, Timeout, TimeoutQc, Vote,
Payload, Qc, StandardQc, Timeout, TimeoutQc, View, Vote,
};
use task_manager::TaskManager;
use nomos_core::block::Block;
use nomos_core::crypto::PublicKey;
@ -178,117 +173,65 @@ where
leader_proof: LeaderProof::LeaderId { leader_id: [0; 32] },
};
let mut carnot = Carnot::from_genesis(private_key, genesis, overlay);
let network_adapter = A::new(network_relay).await;
let adapter = &network_adapter;
let _self_committee = carnot.self_committee();
let self_committee = &_self_committee;
let _leader_committee = [carnot.id()].into_iter().collect();
let leader_committee = &_leader_committee;
let adapter = A::new(network_relay).await;
let fountain = F::new(fountain_settings);
let _tally_settings = CarnotTallySettings {
let private_key = PrivateKey::new(private_key);
let self_committee = carnot.self_committee();
let leader_committee = [carnot.id()].into_iter().collect::<HashSet<_>>();
let tally_settings = CarnotTallySettings {
threshold: carnot.super_majority_threshold(),
participating_nodes: carnot.child_committees().into_iter().flatten().collect(),
};
let tally_settings = &_tally_settings;
let _leader_tally_settings = CarnotTallySettings {
let leader_tally_settings = CarnotTallySettings {
threshold: carnot.leader_super_majority_threshold(),
// TODO: add children of root committee
participating_nodes: carnot.root_committee(),
};
let leader_tally_settings = &_leader_tally_settings;
let mut view_cancel_cache = ViewCancelCache::new();
let mut task_manager = TaskManager::new();
let events: FuturesUnordered<Pin<Box<dyn Future<Output = Event<P::Tx>> + Send>>> =
FuturesUnordered::new();
let genesis_block = carnot.genesis_block();
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
task_manager.push(
genesis_block.view + 1,
Self::gather_block(adapter, genesis_block.view + 1),
)));
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
Self::gather_block(adapter.clone(), genesis_block.view + 1),
);
task_manager.push(
genesis_block.view,
Self::gather_votes(
adapter,
self_committee,
adapter.clone(),
self_committee.clone(),
genesis_block.clone(),
tally_settings.clone(),
),
)));
);
if carnot.is_next_leader() {
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
genesis_block.view + 1,
async move {
let Event::Approve { qc, .. } = Self::gather_votes(
adapter,
leader_committee,
genesis_block,
leader_tally_settings.clone(),
)
.await else { unreachable!() };
Event::ProposeBlock { qc }
},
)));
let network_adapter = adapter.clone();
task_manager.push(genesis_block.view + 1, async move {
let Event::Approve { qc, .. } = Self::gather_votes(
network_adapter,
leader_committee.clone(),
genesis_block,
leader_tally_settings.clone(),
)
.await else { unreachable!() };
Event::ProposeBlock { qc }
});
}
tokio::pin!(events);
while let Some(event) = events.next().await {
while let Some(event) = task_manager.next().await {
let mut output = None;
let prev_view = carnot.current_view();
match event {
Event::Proposal { block, mut stream } => {
tracing::debug!("received proposal {:?}", block);
let original_block = block;
let block = original_block.header().clone();
match carnot.receive_block(block.clone()) {
Ok(mut new_state) => {
let new_view = new_state.current_view();
if new_view != carnot.current_view() {
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
block.view,
Self::gather_votes(
adapter,
self_committee,
block.clone(),
tally_settings.clone(),
),
)));
new_state =
Self::update_leader_selection(new_state, |leader_selection| {
leader_selection.on_new_block_received(original_block)
});
} else {
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
block.view,
async move {
if let Some(block) = stream.next().await {
Event::Proposal { block, stream }
} else {
Event::None
}
},
)));
}
carnot = new_state;
}
Err(_) => tracing::debug!("invalid block {:?}", block),
}
if carnot.is_next_leader() {
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
block.view,
async move {
let Event::Approve { qc, .. } = Self::gather_votes(
adapter,
leader_committee,
block,
leader_tally_settings.clone(),
)
.await else { unreachable!() };
Event::ProposeBlock { qc }
},
)));
}
Event::Proposal { block, stream } => {
(carnot, output) = Self::process_block(
carnot,
block,
stream,
&mut task_manager,
adapter.clone(),
)
.await;
}
Event::Approve { block, .. } => {
tracing::debug!("approving proposal {:?}", block);
@ -306,131 +249,48 @@ where
timeout_qc,
new_views,
} => {
tracing::debug!("approving new view {:?}", timeout_qc);
let (new_carnot, out) = carnot.approve_new_view(timeout_qc.clone(), new_views);
carnot = new_carnot;
output = Some(Output::Send(out));
let new_view = timeout_qc.view + 1;
if carnot.is_next_leader() {
let high_qc = carnot.high_qc();
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
new_view,
async move {
let _votes = Self::gather_new_views(
adapter,
leader_committee,
timeout_qc.clone(),
leader_tally_settings.clone(),
)
.await;
Event::ProposeBlock {
qc: Qc::Aggregated(AggregateQc {
high_qc,
view: new_view,
}),
}
},
)));
}
(carnot, output) = Self::approve_new_view(
carnot,
timeout_qc,
new_views,
&mut task_manager,
adapter.clone(),
)
.await;
}
Event::TimeoutQc { timeout_qc } => {
tracing::debug!("timeout received {:?}", timeout_qc);
let mut new_state = carnot.receive_timeout_qc(timeout_qc.clone());
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
timeout_qc.view + 1,
Self::gather_new_views(
adapter,
self_committee,
timeout_qc.clone(),
tally_settings.clone(),
),
)));
if carnot.current_view() != new_state.current_view() {
new_state = Self::update_leader_selection(new_state, |leader_selection| {
leader_selection.on_timeout_qc_received(timeout_qc)
});
}
carnot = new_state;
(carnot, output) = Self::receive_timeout_qc(
carnot,
timeout_qc,
&mut task_manager,
adapter.clone(),
)
.await;
}
Event::RootTimeout { timeouts } => {
tracing::debug!("root timeout {:?}", timeouts);
// TODO: filter timeouts upon reception
assert!(timeouts.iter().all(|t| t.view == carnot.current_view()));
let high_qc = timeouts
.iter()
.map(|t| &t.high_qc)
.chain(std::iter::once(&carnot.high_qc()))
.max_by_key(|qc| qc.view)
.expect("empty root committee")
.clone();
if carnot.is_member_of_root_committee() {
let timeout_qc = TimeoutQc {
view: carnot.current_view(),
high_qc,
sender: carnot.id(),
};
output = Some(Output::BroadcastTimeoutQc { timeout_qc });
}
(carnot, output) = Self::process_root_timeout(carnot, timeouts).await;
}
Event::ProposeBlock { qc } => {
tracing::debug!("proposing block");
let (reply_channel, rx) = tokio::sync::oneshot::channel();
mempool_relay
.send(MempoolMsg::View {
ancestor_hint: [0; 32],
reply_channel,
})
.await
.unwrap_or_else(|(e, _)| {
eprintln!("Could not get transactions from mempool {e}")
});
match rx.await {
Ok(txs) => {
let beacon = RandomBeaconState::generate_happy(
qc.view(),
&PrivateKey::new(private_key),
);
let proposal = Block::new(qc.view() + 1, qc, txs, carnot.id(), beacon);
output = Some(Output::BroadcastProposal { proposal });
}
Err(e) => tracing::error!("Could not fetch txs {e}"),
}
output =
Self::propose_block(carnot.id(), private_key, qc, mempool_relay.clone())
.await;
}
Event::None => {}
_ => {}
}
let current_view = carnot.current_view();
if current_view != prev_view {
// First we cancel previous processing view tasks
view_cancel_cache.cancel(prev_view);
tracing::debug!("Advanced view from {prev_view} to {current_view}");
// View change!
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
current_view,
async {
tokio::time::sleep(TIMEOUT).await;
Event::LocalTimeout
},
)));
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
current_view + 1,
Self::gather_block(adapter, current_view + 1),
)));
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
current_view,
Self::gather_timeout_qc(adapter, current_view),
)));
if carnot.is_member_of_root_committee() {
let threshold = carnot.leader_super_majority_threshold();
events.push(Box::pin(view_cancel_cache.cancelable_event_future(
current_view,
Self::gather_timeout(adapter, self_committee, current_view, threshold),
)));
}
Self::process_view_change(
carnot.clone(),
prev_view,
&mut task_manager,
adapter.clone(),
)
.await;
}
if let Some(output) = output {
handle_output(adapter, &fountain, carnot.id(), output).await;
handle_output(&adapter, &fountain, carnot.id(), output).await;
}
}
@ -458,7 +318,226 @@ where
O: Overlay + Debug + Send + Sync + 'static,
O::LeaderSelection: UpdateableLeaderSelection,
{
async fn gather_timeout_qc(adapter: &A, view: consensus_engine::View) -> Event<P::Tx> {
#[instrument(level = "debug", skip(adapter, task_manager, stream))]
async fn process_block(
mut carnot: Carnot<O>,
block: Block<P::Tx>,
mut stream: Pin<Box<dyn Stream<Item = Block<P::Tx>> + Send>>,
task_manager: &mut TaskManager<View, Event<P::Tx>>,
adapter: A,
) -> (Carnot<O>, Option<Output<P::Tx>>) {
tracing::debug!("received proposal {:?}", block);
let original_block = block;
let block = original_block.header().clone();
let self_committee = carnot.self_committee();
let leader_committee = [carnot.id()].into_iter().collect();
let tally_settings = CarnotTallySettings {
threshold: carnot.super_majority_threshold(),
participating_nodes: carnot.child_committees().into_iter().flatten().collect(),
};
let leader_tally_settings = CarnotTallySettings {
threshold: carnot.leader_super_majority_threshold(),
// TODO: add children of root committee
participating_nodes: carnot.root_committee(),
};
match carnot.receive_block(block.clone()) {
Ok(mut new_state) => {
let new_view = new_state.current_view();
if new_view != carnot.current_view() {
task_manager.push(
block.view,
Self::gather_votes(
adapter.clone(),
self_committee,
block.clone(),
tally_settings,
),
);
new_state = Self::update_leader_selection(new_state, |leader_selection| {
leader_selection.on_new_block_received(original_block)
});
} else {
task_manager.push(block.view, async move {
if let Some(block) = stream.next().await {
Event::Proposal { block, stream }
} else {
Event::None
}
});
}
carnot = new_state;
}
Err(_) => tracing::debug!("invalid block {:?}", block),
}
if carnot.is_next_leader() {
task_manager.push(block.view, async move {
let Event::Approve { qc, .. } = Self::gather_votes(
adapter,
leader_committee,
block,
leader_tally_settings,
)
.await else { unreachable!() };
Event::ProposeBlock { qc }
});
}
(carnot, None)
}
#[instrument(level = "debug", skip(task_manager, adapter))]
async fn approve_new_view(
carnot: Carnot<O>,
timeout_qc: TimeoutQc,
new_views: HashSet<NewView>,
task_manager: &mut TaskManager<View, Event<P::Tx>>,
adapter: A,
) -> (Carnot<O>, Option<Output<P::Tx>>) {
let leader_committee = [carnot.id()].into_iter().collect();
let leader_tally_settings = CarnotTallySettings {
threshold: carnot.leader_super_majority_threshold(),
// TODO: add children of root committee
participating_nodes: carnot.root_committee(),
};
let (new_carnot, out) = carnot.approve_new_view(timeout_qc.clone(), new_views);
let next_view = timeout_qc.view + 2;
if carnot.is_next_leader() {
let high_qc = carnot.high_qc();
task_manager.push(timeout_qc.view + 1, async move {
let _votes = Self::gather_new_views(
adapter,
leader_committee,
timeout_qc,
leader_tally_settings.clone(),
)
.await;
Event::ProposeBlock {
qc: Qc::Aggregated(AggregateQc {
high_qc,
view: next_view,
}),
}
});
}
(new_carnot, Some(Output::Send(out)))
}
#[instrument(level = "debug", skip(task_manager, adapter))]
async fn receive_timeout_qc(
carnot: Carnot<O>,
timeout_qc: TimeoutQc,
task_manager: &mut TaskManager<View, Event<P::Tx>>,
adapter: A,
) -> (Carnot<O>, Option<Output<P::Tx>>) {
let mut new_state = carnot.receive_timeout_qc(timeout_qc.clone());
let self_committee = carnot.self_committee();
let tally_settings = CarnotTallySettings {
threshold: carnot.super_majority_threshold(),
participating_nodes: carnot.child_committees().into_iter().flatten().collect(),
};
task_manager.push(
timeout_qc.view + 1,
Self::gather_new_views(adapter, self_committee, timeout_qc.clone(), tally_settings),
);
if carnot.current_view() != new_state.current_view() {
new_state = Self::update_leader_selection(new_state, |leader_selection| {
leader_selection.on_timeout_qc_received(timeout_qc)
});
}
(new_state, None)
}
#[instrument(level = "debug")]
async fn process_root_timeout(
carnot: Carnot<O>,
timeouts: HashSet<Timeout>,
) -> (Carnot<O>, Option<Output<P::Tx>>) {
// TODO: filter timeouts upon reception
assert!(timeouts.iter().all(|t| t.view == carnot.current_view()));
let high_qc = timeouts
.iter()
.map(|t| &t.high_qc)
.chain(std::iter::once(&carnot.high_qc()))
.max_by_key(|qc| qc.view)
.expect("empty root committee")
.clone();
let mut output = None;
if carnot.is_member_of_root_committee() {
let timeout_qc = TimeoutQc {
view: carnot.current_view(),
high_qc,
sender: carnot.id(),
};
output = Some(Output::BroadcastTimeoutQc { timeout_qc });
}
(carnot, output)
}
#[instrument(level = "debug", skip(mempool_relay, private_key))]
async fn propose_block(
id: NodeId,
private_key: PrivateKey,
qc: Qc,
mempool_relay: OutboundRelay<MempoolMsg<P::Tx>>,
) -> Option<Output<P::Tx>> {
let (reply_channel, rx) = tokio::sync::oneshot::channel();
let mut output = None;
mempool_relay
.send(MempoolMsg::View {
ancestor_hint: [0; 32],
reply_channel,
})
.await
.unwrap_or_else(|(e, _)| eprintln!("Could not get transactions from mempool {e}"));
match rx.await {
Ok(txs) => {
let beacon = RandomBeaconState::generate_happy(qc.view(), &private_key);
let proposal = Block::new(qc.view() + 1, qc, txs, id, beacon);
output = Some(Output::BroadcastProposal { proposal });
}
Err(e) => tracing::error!("Could not fetch txs {e}"),
}
output
}
async fn process_view_change(
carnot: Carnot<O>,
prev_view: View,
task_manager: &mut TaskManager<View, Event<P::Tx>>,
adapter: A,
) {
let current_view = carnot.current_view();
// First we cancel previous processing view tasks
task_manager.cancel(prev_view);
tracing::debug!("Advanced view from {prev_view} to {current_view}");
// View change!
task_manager.push(current_view, async {
tokio::time::sleep(TIMEOUT).await;
Event::LocalTimeout
});
task_manager.push(
current_view + 1,
Self::gather_block(adapter.clone(), current_view + 1),
);
task_manager.push(
current_view,
Self::gather_timeout_qc(adapter.clone(), current_view),
);
if carnot.is_member_of_root_committee() {
let threshold = carnot.leader_super_majority_threshold();
task_manager.push(
current_view,
Self::gather_timeout(adapter, carnot.self_committee(), current_view, threshold),
);
}
}
async fn gather_timeout_qc(adapter: A, view: consensus_engine::View) -> Event<P::Tx> {
if let Some(timeout_qc) = adapter
.timeout_qc_stream(view)
.await
@ -473,13 +552,13 @@ where
}
async fn gather_votes(
adapter: &A,
committee: &Committee,
adapter: A,
committee: Committee,
block: consensus_engine::Block,
tally: CarnotTallySettings,
) -> Event<P::Tx> {
let tally = CarnotTally::new(tally);
let votes_stream = adapter.votes_stream(committee, block.view, block.id).await;
let votes_stream = adapter.votes_stream(&committee, block.view, block.id).await;
match tally.tally(block.clone(), votes_stream).await {
Ok((qc, votes)) => Event::Approve { qc, votes, block },
Err(_e) => {
@ -489,14 +568,14 @@ where
}
async fn gather_new_views(
adapter: &A,
committee: &Committee,
adapter: A,
committee: Committee,
timeout_qc: TimeoutQc,
tally: CarnotTallySettings,
) -> Event<P::Tx> {
let tally = NewViewTally::new(tally);
let stream = adapter
.new_view_stream(committee, timeout_qc.view + 1)
.new_view_stream(&committee, timeout_qc.view + 1)
.await;
match tally.tally(timeout_qc.clone(), stream).await {
Ok((_qc, new_views)) => Event::NewView {
@ -510,13 +589,13 @@ where
}
async fn gather_timeout(
adapter: &A,
committee: &Committee,
adapter: A,
committee: Committee,
view: consensus_engine::View,
threshold: usize,
) -> Event<P::Tx> {
let timeouts = adapter
.timeout_stream(committee, view)
.timeout_stream(&committee, view)
.await
.take(threshold)
.map(|msg| msg.vote)
@ -525,7 +604,7 @@ where
Event::RootTimeout { timeouts }
}
async fn gather_block(adapter: &A, view: consensus_engine::View) -> Event<P::Tx> {
async fn gather_block(adapter: A, view: consensus_engine::View) -> Event<P::Tx> {
let stream = adapter
.proposal_chunks_stream(view)
.await

View File

@ -0,0 +1,121 @@
use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
use std::collections::HashMap;
use std::hash::Hash;
use std::pin::Pin;
use tokio::select;
use tokio_util::sync::CancellationToken;
pub struct TaskManager<Group, Out> {
tasks: FuturesUnordered<Pin<Box<dyn Future<Output = Option<Out>> + Send>>>,
cancel_cache: CancelCache<Group>,
}
impl<Group, Out> TaskManager<Group, Out>
where
Group: Eq + PartialEq + Hash + 'static,
Out: 'static,
{
pub fn new() -> Self {
Self {
tasks: FuturesUnordered::new(),
cancel_cache: CancelCache::new(),
}
}
pub fn push(&mut self, group: Group, task: impl Future<Output = Out> + Send + 'static) {
self.tasks.push(Box::pin(
self.cancel_cache.cancelable_event_future(group, task),
));
}
pub fn cancel(&mut self, group: Group) {
self.cancel_cache.cancel(group);
}
}
impl<Group: Unpin, Out> Stream for TaskManager<Group, Out> {
type Item = Out;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
use std::task::Poll;
let tasks = &mut self.get_mut().tasks;
loop {
match tasks.poll_next_unpin(cx) {
// we need to remove the outer Option that was inserted by the cancelabl future
Poll::Ready(Some(Some(event))) => return Poll::Ready(Some(event)),
// an empty output means the task was cancelled, ignore it
Poll::Ready(Some(None)) => {}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
}
}
}
}
pub struct GroupCancel(CancellationToken);
impl GroupCancel {
pub fn new() -> Self {
Self(CancellationToken::new())
}
pub fn cancel(&self) {
self.0.cancel();
}
pub fn cancel_token(&self) -> CancellationToken {
self.0.clone()
}
}
impl Drop for GroupCancel {
fn drop(&mut self) {
if !self.0.is_cancelled() {
self.cancel();
}
}
}
pub struct CancelCache<Group> {
cancels: HashMap<Group, GroupCancel>,
}
impl<Group: Eq + PartialEq + Hash> CancelCache<Group> {
pub fn new() -> Self {
Self {
cancels: HashMap::new(),
}
}
pub fn cancel(&mut self, group: Group) {
if let Some(cancel) = self.cancels.remove(&group) {
cancel.cancel();
}
}
pub fn cancel_token(&mut self, group: Group) -> CancellationToken {
self.cancels
.entry(group)
.or_insert_with(GroupCancel::new)
.cancel_token()
}
pub(crate) fn cancelable_event_future<Out, F: Future<Output = Out>>(
&mut self,
group: Group,
f: F,
) -> impl Future<Output = Option<Out>> {
let token = self.cancel_token(group);
async move {
select! {
event = f => Some(event),
_ = token.cancelled() => None,
}
}
}
}

View File

@ -1,70 +0,0 @@
use crate::Event;
use consensus_engine::View;
use std::collections::HashMap;
use std::future::Future;
use std::hash::Hash;
use tokio::select;
use tokio_util::sync::CancellationToken;
pub struct ViewCancel(CancellationToken);
impl ViewCancel {
pub fn new() -> Self {
ViewCancel(CancellationToken::new())
}
pub fn cancel(&self) {
self.0.cancel();
}
pub fn cancel_token(&self) -> CancellationToken {
self.0.clone()
}
}
impl Drop for ViewCancel {
fn drop(&mut self) {
if !self.0.is_cancelled() {
self.cancel();
}
}
}
pub struct ViewCancelCache {
cancels: HashMap<View, ViewCancel>,
}
impl ViewCancelCache {
pub fn new() -> Self {
ViewCancelCache {
cancels: HashMap::new(),
}
}
pub fn cancel(&mut self, view: View) {
if let Some(cancel) = self.cancels.remove(&view) {
cancel.cancel();
}
}
pub fn cancel_token(&mut self, view: View) -> CancellationToken {
self.cancels
.entry(view)
.or_insert_with(ViewCancel::new)
.cancel_token()
}
pub(crate) fn cancelable_event_future<Tx: Clone + Hash + Eq, F: Future<Output = Event<Tx>>>(
&mut self,
view: View,
f: F,
) -> impl Future<Output = Event<Tx>> {
let token = self.cancel_token(view);
async move {
select! {
event = f => event,
_ = token.cancelled() => Event::None,
}
}
}
}