Make carnot overlay generic (#87)

* Make carnot overlay generic

* support generic

* add back 'view for View fns

* add assertion on view number

* remove unused comments

* fix fmt

---------

Co-authored-by: al8n <scygliu1@gmail.com>
This commit is contained in:
Daniel Sanchez 2023-03-02 17:23:51 +01:00 committed by GitHub
parent dfe17696e6
commit c6bc35a931
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 109 additions and 66 deletions

View File

@ -25,7 +25,7 @@ use nomos_core::fountain::FountainCode;
use nomos_core::staking::Stake; use nomos_core::staking::Stake;
use nomos_mempool::{backend::MemPool, network::NetworkAdapter as MempoolAdapter, MempoolService}; use nomos_mempool::{backend::MemPool, network::NetworkAdapter as MempoolAdapter, MempoolService};
use nomos_network::NetworkService; use nomos_network::NetworkService;
use overlay::{Member, Overlay}; use overlay::Overlay;
use overwatch_rs::services::relay::{OutboundRelay, Relay}; use overwatch_rs::services::relay::{OutboundRelay, Relay};
use overwatch_rs::services::{ use overwatch_rs::services::{
handle::ServiceStateHandle, handle::ServiceStateHandle,
@ -40,8 +40,6 @@ pub type NodeId = PublicKey;
// Random seed for each round provided by the protocol // Random seed for each round provided by the protocol
pub type Seed = [u8; 32]; pub type Seed = [u8; 32];
const COMMITTEE_SIZE: usize = 1;
pub struct CarnotSettings<Fountain: FountainCode> { pub struct CarnotSettings<Fountain: FountainCode> {
private_key: [u8; 32], private_key: [u8; 32],
fountain_settings: Fountain::Settings, fountain_settings: Fountain::Settings,
@ -66,12 +64,13 @@ impl<Fountain: FountainCode> CarnotSettings<Fountain> {
} }
} }
pub struct CarnotConsensus<A, P, M, F> pub struct CarnotConsensus<A, P, M, F, O>
where where
F: FountainCode, F: FountainCode,
A: NetworkAdapter, A: NetworkAdapter,
M: MempoolAdapter<Tx = P::Tx>, M: MempoolAdapter<Tx = P::Tx>,
P: MemPool, P: MemPool,
O: Overlay<A, F>,
P::Tx: Debug + 'static, P::Tx: Debug + 'static,
P::Id: Debug + 'static, P::Id: Debug + 'static,
A::Backend: 'static, A::Backend: 'static,
@ -82,9 +81,10 @@ where
network_relay: Relay<NetworkService<A::Backend>>, network_relay: Relay<NetworkService<A::Backend>>,
mempool_relay: Relay<MempoolService<M, P>>, mempool_relay: Relay<MempoolService<M, P>>,
_fountain: std::marker::PhantomData<F>, _fountain: std::marker::PhantomData<F>,
_overlay: std::marker::PhantomData<O>,
} }
impl<A, P, M, F> ServiceData for CarnotConsensus<A, P, M, F> impl<A, P, M, F, O> ServiceData for CarnotConsensus<A, P, M, F, O>
where where
F: FountainCode, F: FountainCode,
A: NetworkAdapter, A: NetworkAdapter,
@ -92,6 +92,7 @@ where
P::Tx: Debug, P::Tx: Debug,
P::Id: Debug, P::Id: Debug,
M: MempoolAdapter<Tx = P::Tx>, M: MempoolAdapter<Tx = P::Tx>,
O: Overlay<A, F>,
{ {
const SERVICE_ID: ServiceId = "Carnot"; const SERVICE_ID: ServiceId = "Carnot";
type Settings = CarnotSettings<F>; type Settings = CarnotSettings<F>;
@ -101,7 +102,7 @@ where
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl<A, P, M, F> ServiceCore for CarnotConsensus<A, P, M, F> impl<A, P, M, F, O> ServiceCore for CarnotConsensus<A, P, M, F, O>
where where
F: FountainCode + Send + Sync + 'static, F: FountainCode + Send + Sync + 'static,
A: NetworkAdapter + Send + Sync + 'static, A: NetworkAdapter + Send + Sync + 'static,
@ -110,6 +111,7 @@ where
P::Tx: Debug + Send + Sync + 'static, P::Tx: Debug + Send + Sync + 'static,
P::Id: Debug + Send + Sync + 'static, P::Id: Debug + Send + Sync + 'static,
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static, M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
O: Overlay<A, F> + Send + Sync + 'static,
{ {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> { fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay(); let network_relay = service_state.overwatch_handle.relay();
@ -118,6 +120,7 @@ where
service_state, service_state,
network_relay, network_relay,
_fountain: Default::default(), _fountain: Default::default(),
_overlay: Default::default(),
mempool_relay, mempool_relay,
}) })
} }
@ -159,7 +162,7 @@ where
// FIXME: this should probably have a timer to detect failed rounds // FIXME: this should probably have a timer to detect failed rounds
let res = cur_view let res = cur_view
.resolve::<A, Member<'_, COMMITTEE_SIZE>, _, _, _>( .resolve::<A, O, _, _, _>(
private_key, private_key,
&tip, &tip,
&network_adapter, &network_adapter,
@ -206,7 +209,7 @@ impl View {
where where
A: NetworkAdapter + Send + Sync + 'static, A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode, F: FountainCode,
O: Overlay<'view, A, F>, O: Overlay<A, F>,
{ {
let res = if self.is_leader(node_id) { let res = if self.is_leader(node_id) {
let block = self let block = self
@ -241,19 +244,19 @@ impl View {
where where
A: NetworkAdapter + Send + Sync + 'static, A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode, F: FountainCode,
O: Overlay<'view, A, F>, O: Overlay<A, F>,
{ {
let overlay = O::new(self, node_id); let overlay = O::new(self, node_id);
// We need to build the QC for the block we are proposing // We need to build the QC for the block we are proposing
let qc = overlay.build_qc(adapter).await; let qc = overlay.build_qc(self, adapter).await;
let LeadershipResult::Leader { block, _view } = leadership let LeadershipResult::Leader { block, _view } = leadership
.try_propose_block(self, tip, qc) .try_propose_block(self, tip, qc)
.await else { panic!("we are leader")}; .await else { panic!("we are leader")};
overlay overlay
.broadcast_block(block.clone(), adapter, fountain) .broadcast_block(self, block.clone(), adapter, fountain)
.await; .await;
Ok(block) Ok(block)
@ -268,7 +271,7 @@ impl View {
where where
A: NetworkAdapter + Send + Sync + 'static, A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode, F: FountainCode,
O: Overlay<'view, A, F>, O: Overlay<A, F>,
{ {
let overlay = O::new(self, node_id); let overlay = O::new(self, node_id);
// Consensus in Carnot is achieved in 2 steps from the point of view of a node: // Consensus in Carnot is achieved in 2 steps from the point of view of a node:
@ -279,7 +282,7 @@ impl View {
// 1) Collect and verify block proposal. // 1) Collect and verify block proposal.
let block = overlay let block = overlay
.reconstruct_proposal_block(adapter, fountain) .reconstruct_proposal_block(self, adapter, fountain)
.await .await
.unwrap(); // FIXME: handle sad path .unwrap(); // FIXME: handle sad path
@ -291,7 +294,7 @@ impl View {
// We only consider the happy path for now // We only consider the happy path for now
if self.pipelined_safe_block(&block) { if self.pipelined_safe_block(&block) {
overlay overlay
.approve_and_forward(&block, adapter, &next_view) .approve_and_forward(self, &block, adapter, &next_view)
.await .await
.unwrap(); // FIXME: handle sad path .unwrap(); // FIXME: handle sad path
} }

View File

@ -8,37 +8,38 @@ use crate::network::messages::ProposalChunkMsg;
use crate::network::NetworkAdapter; use crate::network::NetworkAdapter;
/// View of the tree overlay centered around a specific member /// View of the tree overlay centered around a specific member
pub struct Member<'view, const C: usize> { pub struct Member<const C: usize> {
// id is not used now, but gonna probably used it for later checking later on // id is not used now, but gonna probably used it for later checking later on
#[allow(dead_code)] #[allow(dead_code)]
id: NodeId, id: NodeId,
committee: Committee, committee: Committee,
committees: Committees<'view, C>, committees: Committees<C>,
view_n: u64,
} }
/// #Just a newtype index to be able to implement parent/children methods /// #Just a newtype index to be able to implement parent/children methods
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub struct Committee(usize); pub struct Committee(usize);
pub struct Committees<'view, const C: usize> { pub struct Committees<const C: usize> {
view: &'view View,
nodes: Box<[NodeId]>, nodes: Box<[NodeId]>,
} }
impl<'view, const C: usize> Committees<'view, C> { impl<const C: usize> Committees<C> {
pub fn new(view: &'view View) -> Self { pub fn new(view: &View) -> Self {
let mut nodes = view.staking_keys.keys().cloned().collect::<Box<[NodeId]>>(); let mut nodes = view.staking_keys.keys().cloned().collect::<Box<[NodeId]>>();
let mut rng = rand_chacha::ChaCha20Rng::from_seed(view.seed); let mut rng = rand_chacha::ChaCha20Rng::from_seed(view.seed);
nodes.shuffle(&mut rng); nodes.shuffle(&mut rng);
Self { nodes, view } Self { nodes }
} }
pub fn into_member(self, id: NodeId) -> Option<Member<'view, C>> { pub fn into_member(self, id: NodeId, view: &View) -> Option<Member<C>> {
let member_idx = self.nodes.iter().position(|m| m == &id)?; let member_idx = self.nodes.iter().position(|m| m == &id)?;
Some(Member { Some(Member {
committee: Committee(member_idx / C), committee: Committee(member_idx / C),
committees: self, committees: self,
id, id,
view_n: view.view_n,
}) })
} }
@ -83,7 +84,7 @@ impl Committee {
} }
} }
impl<'view, const C: usize> Member<'view, C> { impl<const C: usize> Member<C> {
/// Return other members of this committee /// Return other members of this committee
pub fn peers(&self) -> &[NodeId] { pub fn peers(&self) -> &[NodeId] {
self.committees self.committees
@ -108,28 +109,36 @@ impl<'view, const C: usize> Member<'view, C> {
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const C: usize> impl<Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const C: usize>
Overlay<'view, Network, Fountain> for Member<'view, C> Overlay<Network, Fountain> for Member<C>
{ {
fn new(view: &'view View, node: NodeId) -> Self { // we still need view here to help us initialize
fn new(view: &View, node: NodeId) -> Self {
let committees = Committees::new(view); let committees = Committees::new(view);
committees.into_member(node).unwrap() committees.into_member(node, view).unwrap()
} }
async fn reconstruct_proposal_block( async fn reconstruct_proposal_block(
&self, &self,
view: &View,
adapter: &Network, adapter: &Network,
fountain: &Fountain, fountain: &Fountain,
) -> Result<Block, FountainError> { ) -> Result<Block, FountainError> {
assert_eq!(view.view_n, self.view_n, "view_n mismatch");
let committee = self.committee; let committee = self.committee;
let view = self.committees.view;
let message_stream = adapter.proposal_chunks_stream(committee, view).await; let message_stream = adapter.proposal_chunks_stream(committee, view).await;
fountain.decode(message_stream).await.map(Block::from_bytes) fountain.decode(message_stream).await.map(Block::from_bytes)
} }
async fn broadcast_block(&self, block: Block, adapter: &Network, fountain: &Fountain) { async fn broadcast_block(
&self,
view: &View,
block: Block,
adapter: &Network,
fountain: &Fountain,
) {
assert_eq!(view.view_n, self.view_n, "view_n mismatch");
let (left_child, right_child) = self.children_committes(); let (left_child, right_child) = self.children_committes();
let view = self.committees.view;
let block_bytes = block.as_bytes(); let block_bytes = block.as_bytes();
let encoded_stream = fountain.encode(&block_bytes); let encoded_stream = fountain.encode(&block_bytes);
encoded_stream encoded_stream
@ -152,10 +161,12 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const
async fn approve_and_forward( async fn approve_and_forward(
&self, &self,
view: &View,
_block: &Block, _block: &Block,
_adapter: &Network, _adapter: &Network,
_next_view: &View, _next_view: &View,
) -> Result<(), Box<dyn Error>> { ) -> Result<(), Box<dyn Error>> {
assert_eq!(view.view_n, self.view_n, "view_n mismatch");
// roughly, we want to do something like this: // roughly, we want to do something like this:
// 1. wait for left and right children committees to approve // 1. wait for left and right children committees to approve
// 2. approve the block // 2. approve the block
@ -166,7 +177,8 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const
todo!() todo!()
} }
async fn build_qc(&self, _adapter: &Network) -> Approval { async fn build_qc(&self, view: &View, _adapter: &Network) -> Approval {
assert_eq!(view.view_n, self.view_n, "view_n mismatch");
// maybe the leader publishing the QC? // maybe the leader publishing the QC?
todo!() todo!()
} }

View File

@ -29,19 +29,19 @@ impl Threshold {
/// As far as the API is concerned, this should be equivalent to any other /// As far as the API is concerned, this should be equivalent to any other
/// overlay and far simpler to implement. /// overlay and far simpler to implement.
/// For this reason, this might act as a 'reference' overlay for testing. /// For this reason, this might act as a 'reference' overlay for testing.
pub struct Flat<'view> { pub struct Flat {
view: &'view View,
// TODO: this should be a const param, but we can't do that yet // TODO: this should be a const param, but we can't do that yet
threshold: Threshold, threshold: Threshold,
node_id: NodeId, node_id: NodeId,
view_n: u64,
} }
impl<'view> Flat<'view> { impl Flat {
pub fn new(view: &'view View, node_id: NodeId) -> Self { pub fn new(view_n: u64, node_id: NodeId) -> Self {
Self { Self {
view,
threshold: DEFAULT_THRESHOLD, threshold: DEFAULT_THRESHOLD,
node_id, node_id,
view_n,
} }
} }
@ -52,32 +52,39 @@ impl<'view> Flat<'view> {
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync> impl<Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync> Overlay<Network, Fountain>
Overlay<'view, Network, Fountain> for Flat<'view> for Flat
{ {
fn new(view: &'view View, node: NodeId) -> Self { fn new(view: &View, node: NodeId) -> Self {
Flat::new(view, node) Flat::new(view.view_n, node)
} }
async fn reconstruct_proposal_block( async fn reconstruct_proposal_block(
&self, &self,
view: &View,
adapter: &Network, adapter: &Network,
fountain: &Fountain, fountain: &Fountain,
) -> Result<Block, FountainError> { ) -> Result<Block, FountainError> {
let message_stream = adapter assert_eq!(view.view_n, self.view_n, "view_n mismatch");
.proposal_chunks_stream(FLAT_COMMITTEE, self.view) let message_stream = adapter.proposal_chunks_stream(FLAT_COMMITTEE, view).await;
.await;
fountain.decode(message_stream).await.map(Block::from_bytes) fountain.decode(message_stream).await.map(Block::from_bytes)
} }
async fn broadcast_block(&self, block: Block, adapter: &Network, fountain: &Fountain) { async fn broadcast_block(
&self,
view: &View,
block: Block,
adapter: &Network,
fountain: &Fountain,
) {
assert_eq!(view.view_n, self.view_n, "view_n mismatch");
let block_bytes = block.as_bytes(); let block_bytes = block.as_bytes();
let encoded_stream = fountain.encode(&block_bytes); let encoded_stream = fountain.encode(&block_bytes);
encoded_stream encoded_stream
.for_each_concurrent(None, |chunk| async move { .for_each_concurrent(None, |chunk| async move {
let message = ProposalChunkMsg { chunk }; let message = ProposalChunkMsg { chunk };
adapter adapter
.broadcast_block_chunk(FLAT_COMMITTEE, self.view, message) .broadcast_block_chunk(FLAT_COMMITTEE, view, message)
.await; .await;
}) })
.await; .await;
@ -85,16 +92,18 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync>
async fn approve_and_forward( async fn approve_and_forward(
&self, &self,
view: &View,
block: &Block, block: &Block,
adapter: &Network, adapter: &Network,
_next_view: &View, _next_view: &View,
) -> Result<(), Box<dyn Error>> { ) -> Result<(), Box<dyn Error>> {
assert_eq!(view.view_n, self.view_n, "view_n mismatch");
// in the flat overlay, there's no need to wait for anyone before approving the block // in the flat overlay, there's no need to wait for anyone before approving the block
let approval = self.approve(block); let approval = self.approve(block);
adapter adapter
.forward_approval( .forward_approval(
FLAT_COMMITTEE, FLAT_COMMITTEE,
self.view, view,
ApprovalMsg { ApprovalMsg {
approval, approval,
source: self.node_id, source: self.node_id,
@ -104,11 +113,13 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync>
Ok(()) Ok(())
} }
async fn build_qc(&self, adapter: &Network) -> Approval { async fn build_qc(&self, view: &View, adapter: &Network) -> Approval {
assert_eq!(view.view_n, self.view_n, "view_n mismatch");
// for now, let's pretend that consensus is reached as soon as the // for now, let's pretend that consensus is reached as soon as the
// block is approved by a share of the nodes // block is approved by a share of the nodes
let mut approvals = HashSet::new(); let mut approvals = HashSet::new();
let mut stream = Box::into_pin(adapter.approvals_stream(FLAT_COMMITTEE, self.view).await); let mut stream = Box::into_pin(adapter.approvals_stream(FLAT_COMMITTEE, view).await);
// Shadow the original binding so that it can't be directly accessed // Shadow the original binding so that it can't be directly accessed
// ever again. // ever again.
@ -118,7 +129,7 @@ impl<'view, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync>
approvals.insert(approval); approvals.insert(approval);
// ceil(num/den * n) // ceil(num/den * n)
let threshold = let threshold =
(self.threshold.num * self.view.staking_keys.len() as u64 + self.threshold.den - 1) (self.threshold.num * view.staking_keys.len() as u64 + self.threshold.den - 1)
/ self.threshold.den; / self.threshold.den;
if approvals.len() as u64 >= threshold { if approvals.len() as u64 >= threshold {
// consensus reached // consensus reached

View File

@ -13,24 +13,32 @@ use nomos_core::fountain::{FountainCode, FountainError};
/// Dissemination overlay, tied to a specific view /// Dissemination overlay, tied to a specific view
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait Overlay<'view, Network: NetworkAdapter, Fountain: FountainCode> { pub trait Overlay<Network: NetworkAdapter, Fountain: FountainCode> {
fn new(view: &'view View, node: NodeId) -> Self; fn new(view: &View, node: NodeId) -> Self;
async fn reconstruct_proposal_block( async fn reconstruct_proposal_block(
&self, &self,
view: &View,
adapter: &Network, adapter: &Network,
fountain: &Fountain, fountain: &Fountain,
) -> Result<Block, FountainError>; ) -> Result<Block, FountainError>;
async fn broadcast_block(&self, block: Block, adapter: &Network, fountain: &Fountain); async fn broadcast_block(
&self,
view: &View,
block: Block,
adapter: &Network,
fountain: &Fountain,
);
/// Different overlays might have different needs or the same overlay might /// Different overlays might have different needs or the same overlay might
/// require different steps depending on the node role /// require different steps depending on the node role
/// For now let's put this responsibility on the overlay /// For now let's put this responsibility on the overlay
async fn approve_and_forward( async fn approve_and_forward(
&self, &self,
view: &View,
block: &Block, block: &Block,
adapter: &Network, adapter: &Network,
next_view: &View, next_view: &View,
) -> Result<(), Box<dyn Error>>; ) -> Result<(), Box<dyn Error>>;
/// Wait for consensus on a block /// Wait for consensus on a block
async fn build_qc(&self, adapter: &Network) -> Approval; async fn build_qc(&self, view: &View, adapter: &Network) -> Approval;
} }

View File

@ -5,8 +5,8 @@ use crate::*;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use futures::Stream; use futures::Stream;
use nomos_core::block::*; use nomos_core::fountain::FountainError;
use nomos_core::fountain::{mock::MockFountain, FountainCode, FountainError}; use nomos_core::fountain::{mock::MockFountain, FountainCode};
use nomos_network::backends::NetworkBackend; use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService; use nomos_network::NetworkService;
use overwatch_rs::services::relay::*; use overwatch_rs::services::relay::*;
@ -17,26 +17,35 @@ struct DummyAdapter;
struct DummyBackend; struct DummyBackend;
#[async_trait] #[async_trait]
impl<'view, N: NetworkAdapter + Sync, F: FountainCode + Sync> Overlay<'view, N, F> impl<N: NetworkAdapter + Sync, F: FountainCode + Sync> Overlay<N, F> for DummyOverlay {
for DummyOverlay
{
fn new(_: &View, _: NodeId) -> Self { fn new(_: &View, _: NodeId) -> Self {
DummyOverlay DummyOverlay
} }
async fn build_qc(&self, _: &N) -> Approval { async fn reconstruct_proposal_block(
Approval &self,
} _view: &View,
_adapter: &N,
async fn broadcast_block(&self, _: Block, _: &N, _: &F) {} _fountain: &F,
) -> Result<Block, FountainError> {
async fn reconstruct_proposal_block(&self, _: &N, _: &F) -> Result<Block, FountainError> {
Ok(Block) Ok(Block)
} }
async fn approve_and_forward(&self, _: &Block, _: &N, _: &View) -> Result<(), Box<dyn Error>> { async fn broadcast_block(&self, _view: &View, _block: Block, _adapter: &N, _fountain: &F) {}
async fn approve_and_forward(
&self,
_view: &View,
_block: &Block,
_adapter: &N,
_next_view: &View,
) -> Result<(), Box<dyn Error>> {
Ok(()) Ok(())
} }
async fn build_qc(&self, _view: &View, _: &N) -> Approval {
Approval
}
} }
#[async_trait] #[async_trait]