Voting core primitive (#82)

* Add tally trait
Implement mock tally

* Add tally to consensus

* Use tally in implemented overlays

* Clippy happy

* Scratch carnot voting as per specification

* Add missing derives

* Clippy happy

* Fix tests

* Add checks on valid votes

* Refactor ApprovalMsg to VoteMsg

* Remove no vote in MockVote and MockQC

* Remove no vote in MockVote and MockQC

* Remove timeout todo

* Fix tests
This commit is contained in:
Daniel Sanchez 2023-03-14 03:32:36 -07:00 committed by GitHub
parent ae5c5b9d4c
commit 274e8d55fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 400 additions and 119 deletions

View File

@ -4,4 +4,5 @@ pub mod crypto;
pub mod fountain;
pub mod staking;
pub mod tx;
pub mod vote;
pub mod wire;

View File

@ -0,0 +1,117 @@
#![allow(dead_code)]
// TODO: Well, remove this when we actually use the fields from the specification
// std
use std::collections::HashSet;
// crates
use futures::{Stream, StreamExt};
// internal
use crate::block::BlockId;
use crate::crypto::PublicKey;
use crate::vote::Tally;
pub type NodeId = PublicKey;
pub enum QuorumCertificate {
Simple(SimpleQuorumCertificate),
Aggregated(AggregatedQuorumCertificate),
}
impl QuorumCertificate {
pub fn view(&self) -> u64 {
match self {
QuorumCertificate::Simple(qc) => qc.view,
QuorumCertificate::Aggregated(qc) => qc.view,
}
}
}
pub struct SimpleQuorumCertificate {
view: u64,
block: BlockId,
}
pub struct AggregatedQuorumCertificate {
view: u64,
high_qh: Box<QuorumCertificate>,
}
pub struct Vote {
block: BlockId,
view: u64,
voter: NodeId, // TODO: this should be some id, probably the node pk
qc: Option<QuorumCertificate>,
}
impl Vote {
pub fn valid_view(&self, view: u64) -> bool {
self.view == view && self.qc.as_ref().map_or(true, |qc| qc.view() == view - 1)
}
}
#[derive(thiserror::Error, Debug)]
pub enum CarnotTallyError {
#[error("Received invalid vote: {0}")]
InvalidVote(String),
#[error("Did not receive enough votes")]
InsufficientVotes,
}
#[derive(Clone)]
pub struct CarnotTallySettings {
threshold: usize,
// TODO: this probably should be dynamic and should change with the view (?)
participating_nodes: HashSet<NodeId>,
}
pub struct CarnotTally {
settings: CarnotTallySettings,
}
#[async_trait::async_trait]
impl Tally for CarnotTally {
type Vote = Vote;
type Outcome = QuorumCertificate;
type TallyError = CarnotTallyError;
type Settings = CarnotTallySettings;
fn new(settings: Self::Settings) -> Self {
Self { settings }
}
async fn tally<S: Stream<Item = Self::Vote> + Unpin + Send>(
&self,
view: u64,
mut vote_stream: S,
) -> Result<Self::Outcome, Self::TallyError> {
let mut approved = 0usize;
let mut seen = HashSet::new();
while let Some(vote) = vote_stream.next().await {
// check vote view is valid
if !vote.valid_view(view) {
return Err(CarnotTallyError::InvalidVote("Invalid view".to_string()));
}
// check for duplicated votes
if seen.contains(&vote.voter) {
return Err(CarnotTallyError::InvalidVote(
"Double voted node".to_string(),
));
}
// check for individual nodes votes
if !self.settings.participating_nodes.contains(&vote.voter) {
return Err(CarnotTallyError::InvalidVote(
"Non-participating node".to_string(),
));
}
seen.insert(vote.voter);
approved += 1;
if approved >= self.settings.threshold {
return Ok(QuorumCertificate::Simple(SimpleQuorumCertificate {
view,
block: vote.block,
}));
}
}
Err(CarnotTallyError::InsufficientVotes)
}
}

View File

@ -0,0 +1,76 @@
// std
// crates
use futures::{Stream, StreamExt};
use serde::{Deserialize, Serialize};
// internal
use crate::vote::Tally;
#[derive(Serialize, Deserialize)]
pub struct MockVote {
view: u64,
}
impl MockVote {
pub fn view(&self) -> u64 {
self.view
}
}
#[allow(dead_code)]
pub struct MockQc {
count_votes: usize,
}
pub struct Error(String);
#[derive(Clone, Debug)]
pub struct MockTallySettings {
pub threshold: usize,
}
#[derive(Debug)]
pub struct MockTally {
threshold: usize,
}
impl MockQc {
pub fn new(count_votes: usize) -> Self {
Self { count_votes }
}
pub fn votes(&self) -> usize {
self.count_votes
}
}
#[async_trait::async_trait]
impl Tally for MockTally {
type Vote = MockVote;
type Outcome = MockQc;
type TallyError = Error;
type Settings = MockTallySettings;
fn new(settings: Self::Settings) -> Self {
let Self::Settings { threshold } = settings;
Self { threshold }
}
async fn tally<S: Stream<Item = Self::Vote> + Unpin + Send>(
&self,
view: u64,
mut vote_stream: S,
) -> Result<Self::Outcome, Self::TallyError> {
let mut count_votes = 0;
while let Some(vote) = vote_stream.next().await {
if vote.view() != view {
return Err(Error("Invalid vote".into()));
}
count_votes += 1;
}
if count_votes > self.threshold {
Ok(MockQc { count_votes })
} else {
Err(Error("Not enough votes".into()))
}
}
}

View File

@ -0,0 +1,18 @@
pub mod carnot;
pub mod mock;
use futures::Stream;
#[async_trait::async_trait]
pub trait Tally {
type Vote;
type Outcome;
type TallyError;
type Settings: Clone;
fn new(settings: Self::Settings) -> Self;
async fn tally<S: Stream<Item = Self::Vote> + Unpin + Send>(
&self,
view: u64,
vote_stream: S,
) -> Result<Self::Outcome, Self::TallyError>;
}

View File

@ -11,6 +11,7 @@ use bincode::{
Options,
};
use once_cell::sync::Lazy;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
pub type Error = bincode::Error;
@ -105,6 +106,11 @@ pub fn serialize<T: Serialize>(item: &T) -> Result<Vec<u8>, Error> {
Ok(buf)
}
/// Deserialize an object directly
pub fn deserialize<T: DeserializeOwned>(item: &[u8]) -> Result<T, Error> {
deserializer(item).deserialize()
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -6,18 +6,19 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1"
bytes = "1.3"
chrono = "0.4"
rand_chacha = "0.3"
rand = "0.8"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
async-trait = "0.1"
futures = "0.3"
nomos-network = { path = "../network" }
nomos-mempool = { path = "../mempool" }
nomos-core = { path = "../../nomos-core" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
rand_chacha = "0.3"
rand = "0.8"
serde = "1.0"
tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1"
futures = "0.3"
waku-bindings = { version = "0.1.0-rc.2", optional = true}
tracing = "0.1"

View File

@ -36,11 +36,11 @@ impl<Tx, Id> Leadership<Tx, Id> {
}
#[allow(unused, clippy::diverging_sub_expression)]
pub async fn try_propose_block<'view>(
pub async fn try_propose_block<'view, Qc>(
&self,
view: &'view View,
tip: &Tip,
qc: Approval,
qc: Qc,
) -> LeadershipResult<'view> {
let ancestor_hint = todo!("get the ancestor from the tip");
if view.is_leader(self.key.key) {

View File

@ -16,6 +16,7 @@ use std::collections::BTreeMap;
use std::error::Error;
use std::fmt::Debug;
// crates
use serde::{Deserialize, Serialize};
// internal
use crate::network::NetworkAdapter;
use leadership::{Leadership, LeadershipResult};
@ -23,6 +24,7 @@ use nomos_core::block::Block;
use nomos_core::crypto::PublicKey;
use nomos_core::fountain::FountainCode;
use nomos_core::staking::Stake;
use nomos_core::vote::Tally;
use nomos_mempool::{backend::MemPool, network::NetworkAdapter as MempoolAdapter, MempoolService};
use nomos_network::NetworkService;
use overlay::Overlay;
@ -40,37 +42,45 @@ pub type NodeId = PublicKey;
// Random seed for each round provided by the protocol
pub type Seed = [u8; 32];
pub struct CarnotSettings<Fountain: FountainCode> {
pub struct CarnotSettings<Fountain: FountainCode, VoteTally: Tally> {
private_key: [u8; 32],
fountain_settings: Fountain::Settings,
tally_settings: VoteTally::Settings,
}
impl<Fountain: FountainCode> Clone for CarnotSettings<Fountain> {
impl<Fountain: FountainCode, VoteTally: Tally> Clone for CarnotSettings<Fountain, VoteTally> {
fn clone(&self) -> Self {
Self {
private_key: self.private_key,
fountain_settings: self.fountain_settings.clone(),
tally_settings: self.tally_settings.clone(),
}
}
}
impl<Fountain: FountainCode> CarnotSettings<Fountain> {
impl<Fountain: FountainCode, VoteTally: Tally> CarnotSettings<Fountain, VoteTally> {
#[inline]
pub const fn new(private_key: [u8; 32], fountain_settings: Fountain::Settings) -> Self {
pub const fn new(
private_key: [u8; 32],
fountain_settings: Fountain::Settings,
tally_settings: VoteTally::Settings,
) -> Self {
Self {
private_key,
fountain_settings,
tally_settings,
}
}
}
pub struct CarnotConsensus<A, P, M, F, O>
pub struct CarnotConsensus<A, P, M, F, T, O>
where
F: FountainCode,
A: NetworkAdapter,
M: MempoolAdapter<Tx = P::Tx>,
P: MemPool,
O: Overlay<A, F>,
T: Tally,
O: Overlay<A, F, T>,
P::Tx: Debug + 'static,
P::Id: Debug + 'static,
A::Backend: 'static,
@ -81,37 +91,42 @@ where
network_relay: Relay<NetworkService<A::Backend>>,
mempool_relay: Relay<MempoolService<M, P>>,
_fountain: std::marker::PhantomData<F>,
_tally: std::marker::PhantomData<T>,
_overlay: std::marker::PhantomData<O>,
}
impl<A, P, M, F, O> ServiceData for CarnotConsensus<A, P, M, F, O>
impl<A, P, M, F, T, O> ServiceData for CarnotConsensus<A, P, M, F, T, O>
where
F: FountainCode,
A: NetworkAdapter,
P: MemPool,
T: Tally,
P::Tx: Debug,
P::Id: Debug,
M: MempoolAdapter<Tx = P::Tx>,
O: Overlay<A, F>,
O: Overlay<A, F, T>,
{
const SERVICE_ID: ServiceId = "Carnot";
type Settings = CarnotSettings<F>;
type Settings = CarnotSettings<F, T>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = NoMessage;
}
#[async_trait::async_trait]
impl<A, P, M, F, O> ServiceCore for CarnotConsensus<A, P, M, F, O>
impl<A, P, M, F, T, O> ServiceCore for CarnotConsensus<A, P, M, F, T, O>
where
F: FountainCode + Send + Sync + 'static,
A: NetworkAdapter + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static,
T: Tally + Send + Sync + 'static,
T::Settings: Send + Sync + 'static,
T::Outcome: Send + Sync,
P::Settings: Send + Sync + 'static,
P::Tx: Debug + Send + Sync + 'static,
P::Id: Debug + Send + Sync + 'static,
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
O: Overlay<A, F> + Send + Sync + 'static,
O: Overlay<A, F, T> + Send + Sync + 'static,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay();
@ -120,6 +135,7 @@ where
service_state,
network_relay,
_fountain: Default::default(),
_tally: Default::default(),
_overlay: Default::default(),
mempool_relay,
})
@ -141,6 +157,7 @@ where
let CarnotSettings {
private_key,
fountain_settings,
tally_settings,
} = self.service_state.settings_reader.get_updated_settings();
let network_adapter = A::new(network_relay).await;
@ -148,6 +165,7 @@ where
let tip = Tip;
let fountain = F::new(fountain_settings);
let tally = T::new(tally_settings);
let leadership = Leadership::new(private_key, mempool_relay);
// FIXME: this should be taken from config
@ -162,11 +180,12 @@ where
// FIXME: this should probably have a timer to detect failed rounds
let res = cur_view
.resolve::<A, O, _, _, _>(
.resolve::<A, O, _, _, _, _>(
private_key,
&tip,
&network_adapter,
&fountain,
&tally,
&leadership,
)
.await;
@ -185,7 +204,7 @@ where
}
}
#[derive(Hash, Eq, PartialEq)]
#[derive(Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct Approval;
// Consensus round, also aids in guaranteeing synchronization
@ -198,28 +217,33 @@ pub struct View {
impl View {
// TODO: might want to encode steps in the type system
pub async fn resolve<'view, A, O, F, Tx, Id>(
pub async fn resolve<'view, A, O, F, T, Tx, Id>(
&'view self,
node_id: NodeId,
tip: &Tip,
adapter: &A,
fountain: &F,
tally: &T,
leadership: &Leadership<Tx, Id>,
) -> Result<(Block, View), Box<dyn Error>>
where
A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode,
O: Overlay<A, F>,
T: Tally + Send + Sync + 'static,
T::Outcome: Send + Sync,
O: Overlay<A, F, T>,
{
let res = if self.is_leader(node_id) {
let block = self
.resolve_leader::<A, O, F, _, _>(node_id, tip, adapter, fountain, leadership)
.resolve_leader::<A, O, F, T, _, _>(
node_id, tip, adapter, fountain, tally, leadership,
)
.await
.unwrap(); // FIXME: handle sad path
let next_view = self.generate_next_view(&block);
(block, next_view)
} else {
self.resolve_non_leader::<A, O, F>(node_id, adapter, fountain)
self.resolve_non_leader::<A, O, F, T>(node_id, adapter, fountain, tally)
.await
.unwrap() // FIXME: handle sad path
};
@ -233,23 +257,26 @@ impl View {
Ok(res)
}
async fn resolve_leader<'view, A, O, F, Tx, Id>(
async fn resolve_leader<'view, A, O, F, T, Tx, Id>(
&'view self,
node_id: NodeId,
tip: &Tip,
adapter: &A,
fountain: &F,
tally: &T,
leadership: &Leadership<Tx, Id>,
) -> Result<Block, ()>
where
A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode,
O: Overlay<A, F>,
T: Tally + Send + Sync + 'static,
T::Outcome: Send + Sync,
O: Overlay<A, F, T>,
{
let overlay = O::new(self, node_id);
// We need to build the QC for the block we are proposing
let qc = overlay.build_qc(self, adapter).await;
let qc = overlay.build_qc(self, adapter, tally).await;
let LeadershipResult::Leader { block, _view } = leadership
.try_propose_block(self, tip, qc)
@ -262,16 +289,18 @@ impl View {
Ok(block)
}
async fn resolve_non_leader<'view, A, O, F>(
async fn resolve_non_leader<'view, A, O, F, T>(
&'view self,
node_id: NodeId,
adapter: &A,
fountain: &F,
tally: &T,
) -> Result<(Block, View), ()>
where
A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode,
O: Overlay<A, F>,
T: Tally + Send + Sync + 'static,
O: Overlay<A, F, T>,
{
let overlay = O::new(self, node_id);
// Consensus in Carnot is achieved in 2 steps from the point of view of a node:
@ -294,7 +323,7 @@ impl View {
// We only consider the happy path for now
if self.pipelined_safe_block(&block) {
overlay
.approve_and_forward(self, &block, adapter, &next_view)
.approve_and_forward(self, &block, adapter, tally, &next_view)
.await
.unwrap(); // FIXME: handle sad path
}

View File

@ -7,15 +7,17 @@ use nomos_network::{
NetworkMsg, NetworkService,
};
use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio_stream::{wrappers::BroadcastStream, Stream};
use crate::{
network::{
messages::{ApprovalMsg, ProposalChunkMsg},
messages::{ProposalChunkMsg, VoteMsg},
NetworkAdapter,
},
overlay::committees::Committee,
Approval, View,
View,
};
const MOCK_PUB_SUB_TOPIC: &str = "MockPubSubTopic";
@ -114,11 +116,11 @@ impl NetworkAdapter for MockAdapter {
};
}
async fn approvals_stream(
async fn votes_stream<Vote: DeserializeOwned>(
&self,
_committee: Committee,
_view: &View,
) -> Box<dyn Stream<Item = Approval> + Send> {
) -> Box<dyn Stream<Item = Vote> + Send> {
let stream_channel = self
.message_subscriber_channel()
.await
@ -132,7 +134,7 @@ impl NetworkAdapter for MockAdapter {
== message.content_topic().content_topic_name
{
let payload = message.payload();
Some(ApprovalMsg::from_bytes(payload.as_bytes()).approval)
Some(VoteMsg::from_bytes(payload.as_bytes()).vote)
} else {
None
}
@ -144,12 +146,14 @@ impl NetworkAdapter for MockAdapter {
)
}
async fn forward_approval(
async fn forward_approval<Vote: Serialize>(
&self,
_committee: Committee,
_view: &View,
approval_message: ApprovalMsg,
) {
approval_message: VoteMsg<Vote>,
) where
Vote: Send,
{
let message = MockMessage::new(
String::from_utf8_lossy(&approval_message.as_bytes()).to_string(),
MOCK_APPROVAL_CONTENT_TOPIC,

View File

@ -6,16 +6,18 @@ use futures::{Stream, StreamExt};
use tokio_stream::wrappers::BroadcastStream;
// internal
use crate::network::{
messages::{ApprovalMsg, ProposalChunkMsg},
messages::{ProposalChunkMsg, VoteMsg},
NetworkAdapter,
};
use crate::overlay::committees::Committee;
use crate::{Approval, View};
use crate::View;
use nomos_network::{
backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage},
NetworkMsg, NetworkService,
};
use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
use serde::de::DeserializeOwned;
use serde::Serialize;
use waku_bindings::{
ContentFilter, Encoding, StoreQuery, WakuContentTopic, WakuMessage, WakuPubSubTopic,
};
@ -161,27 +163,27 @@ impl NetworkAdapter for WakuAdapter {
};
}
async fn approvals_stream(
async fn votes_stream<Vote: DeserializeOwned>(
&self,
committee: Committee,
view: &View,
) -> Box<dyn Stream<Item = Approval> + Send> {
) -> Box<dyn Stream<Item = Vote> + Send> {
let content_topic = proposal_topic(committee, view);
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)
.await
.map(|message| {
let payload = message.payload();
ApprovalMsg::from_bytes(payload).approval
VoteMsg::from_bytes(payload).vote
}),
))
}
async fn forward_approval(
async fn forward_approval<Vote: Serialize + Send>(
&self,
committee: Committee,
view: &View,
approval_message: ApprovalMsg,
approval_message: VoteMsg<Vote>,
) {
let content_topic = approval_topic(committee, view);

View File

@ -1,8 +1,11 @@
// std
// crates
use bytes::Bytes;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
// internal
use crate::{Approval, NodeId};
use crate::NodeId;
use nomos_core::wire;
#[derive(Clone)]
pub struct ProposalChunkMsg {
@ -21,20 +24,26 @@ impl ProposalChunkMsg {
}
}
pub struct ApprovalMsg {
#[derive(Serialize, Deserialize)]
pub struct VoteMsg<Vote> {
pub source: NodeId,
pub approval: Approval,
pub vote: Vote,
}
impl ApprovalMsg {
impl<Vote> VoteMsg<Vote>
where
Vote: Serialize,
{
pub fn as_bytes(&self) -> Box<[u8]> {
self.source.into()
}
pub fn from_bytes(data: &[u8]) -> Self {
Self {
source: NodeId::try_from(data).unwrap(),
approval: Approval,
}
wire::serialize(self).unwrap().into_boxed_slice()
}
}
impl<Vote> VoteMsg<Vote>
where
Vote: DeserializeOwned,
{
pub fn from_bytes(data: &[u8]) -> Self {
wire::deserialize(data).unwrap()
}
}

View File

@ -6,13 +6,15 @@ use bytes::Bytes;
// crates
use futures::Stream;
// internal
use crate::network::messages::{ApprovalMsg, ProposalChunkMsg};
use crate::network::messages::{ProposalChunkMsg, VoteMsg};
use crate::overlay::committees::Committee;
use crate::{Approval, View};
use crate::View;
use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService;
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use serde::de::DeserializeOwned;
use serde::Serialize;
#[async_trait::async_trait]
pub trait NetworkAdapter {
@ -31,10 +33,15 @@ pub trait NetworkAdapter {
view: &View,
chunk_msg: ProposalChunkMsg,
);
async fn approvals_stream(
async fn votes_stream<Vote: DeserializeOwned>(
&self,
committee: Committee,
view: &View,
) -> Box<dyn Stream<Item = Approval> + Send>;
async fn forward_approval(&self, committee: Committee, view: &View, approval: ApprovalMsg);
) -> Box<dyn Stream<Item = Vote> + Send>;
async fn forward_approval<Vote: Serialize + Send>(
&self,
committee: Committee,
view: &View,
approval: VoteMsg<Vote>,
);
}

View File

@ -109,8 +109,12 @@ impl<const C: usize> Member<C> {
}
#[async_trait::async_trait]
impl<Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const C: usize>
Overlay<Network, Fountain> for Member<C>
impl<
Network: NetworkAdapter + Sync,
Fountain: FountainCode + Sync,
VoteTally: Tally + Sync,
const C: usize,
> Overlay<Network, Fountain, VoteTally> for Member<C>
{
// we still need view here to help us initialize
fn new(view: &View, node: NodeId) -> Self {
@ -164,6 +168,7 @@ impl<Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const C: usi
view: &View,
_block: &Block,
_adapter: &Network,
_tally: &VoteTally,
_next_view: &View,
) -> Result<(), Box<dyn Error>> {
assert_eq!(view.view_n, self.view_n, "view_n mismatch");
@ -177,7 +182,12 @@ impl<Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, const C: usi
todo!()
}
async fn build_qc(&self, view: &View, _adapter: &Network) -> Approval {
async fn build_qc(
&self,
view: &View,
_adapter: &Network,
_tally: &VoteTally,
) -> VoteTally::Outcome {
assert_eq!(view.view_n, self.view_n, "view_n mismatch");
// maybe the leader publishing the QC?
todo!()

View File

@ -1,29 +1,16 @@
use std::collections::HashSet;
// std
use std::error::Error;
// crates
use futures::StreamExt;
use serde::de::DeserializeOwned;
use serde::Serialize;
// internal
use super::*;
use crate::network::messages::{ApprovalMsg, ProposalChunkMsg};
use crate::network::messages::{ProposalChunkMsg, VoteMsg};
use crate::network::NetworkAdapter;
use crate::overlay::committees::Committee;
const DEFAULT_THRESHOLD: Threshold = Threshold::new(2, 3);
const FLAT_COMMITTEE: Committee = Committee::root();
/// The share of nodes that need to approve a block for it to be valid
/// expressed as a fraction of the total number of nodes
#[derive(Copy, Clone, Debug)]
pub struct Threshold {
num: u64,
den: u64,
}
impl Threshold {
pub const fn new(num: u64, den: u64) -> Self {
Self { num, den }
}
}
/// A flat overlay, everyone is in the same committee.
/// As far as the API is concerned, this should be equivalent to any other
@ -31,18 +18,13 @@ impl Threshold {
/// For this reason, this might act as a 'reference' overlay for testing.
pub struct Flat {
// TODO: this should be a const param, but we can't do that yet
threshold: Threshold,
node_id: NodeId,
view_n: u64,
}
impl Flat {
pub fn new(view_n: u64, node_id: NodeId) -> Self {
Self {
threshold: DEFAULT_THRESHOLD,
node_id,
view_n,
}
Self { node_id, view_n }
}
fn approve(&self, _block: &Block) -> Approval {
@ -52,8 +34,12 @@ impl Flat {
}
#[async_trait::async_trait]
impl<Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync> Overlay<Network, Fountain>
for Flat
impl<Network, Fountain, VoteTally> Overlay<Network, Fountain, VoteTally> for Flat
where
Network: NetworkAdapter + Sync,
Fountain: FountainCode + Sync,
VoteTally: Tally + Sync,
VoteTally::Vote: Serialize + DeserializeOwned + Send,
{
fn new(view: &View, node: NodeId) -> Self {
Flat::new(view.view_n, node)
@ -95,6 +81,7 @@ impl<Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync> Overlay<Netw
view: &View,
block: &Block,
adapter: &Network,
_tally: &VoteTally,
_next_view: &View,
) -> Result<(), Box<dyn Error>> {
assert_eq!(view.view_n, self.view_n, "view_n mismatch");
@ -104,8 +91,8 @@ impl<Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync> Overlay<Netw
.forward_approval(
FLAT_COMMITTEE,
view,
ApprovalMsg {
approval,
VoteMsg {
vote: approval,
source: self.node_id,
},
)
@ -113,30 +100,24 @@ impl<Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync> Overlay<Netw
Ok(())
}
async fn build_qc(&self, view: &View, adapter: &Network) -> Approval {
async fn build_qc(
&self,
view: &View,
adapter: &Network,
tally: &VoteTally,
) -> VoteTally::Outcome {
assert_eq!(view.view_n, self.view_n, "view_n mismatch");
// for now, let's pretend that consensus is reached as soon as the
// block is approved by a share of the nodes
let mut approvals = HashSet::new();
let mut stream = Box::into_pin(adapter.approvals_stream(FLAT_COMMITTEE, view).await);
let stream = Box::into_pin(adapter.votes_stream(FLAT_COMMITTEE, view).await);
// Shadow the original binding so that it can't be directly accessed
// ever again.
while let Some(approval) = stream.next().await {
// insert the approval in the map to deduplicate
// TODO: validate approval
approvals.insert(approval);
// ceil(num/den * n)
let threshold =
(self.threshold.num * view.staking_keys.len() as u64 + self.threshold.den - 1)
/ self.threshold.den;
if approvals.len() as u64 >= threshold {
// consensus reached
// FIXME: build a real QC
return Approval;
}
if let Ok(qc) = tally.tally(view.view_n, stream).await {
qc
} else {
unimplemented!("consensus not reached")
}
unimplemented!("consensus not reached")
}
}

View File

@ -10,10 +10,11 @@ use crate::network::NetworkAdapter;
pub use committees::Member;
use nomos_core::block::Block;
use nomos_core::fountain::{FountainCode, FountainError};
use nomos_core::vote::Tally;
/// Dissemination overlay, tied to a specific view
#[async_trait::async_trait]
pub trait Overlay<Network: NetworkAdapter, Fountain: FountainCode> {
pub trait Overlay<Network: NetworkAdapter, Fountain: FountainCode, VoteTally: Tally> {
fn new(view: &View, node: NodeId) -> Self;
async fn reconstruct_proposal_block(
@ -37,8 +38,14 @@ pub trait Overlay<Network: NetworkAdapter, Fountain: FountainCode> {
view: &View,
block: &Block,
adapter: &Network,
vote_tally: &VoteTally,
next_view: &View,
) -> Result<(), Box<dyn Error>>;
/// Wait for consensus on a block
async fn build_qc(&self, view: &View, adapter: &Network) -> Approval;
async fn build_qc(
&self,
view: &View,
adapter: &Network,
vote_tally: &VoteTally,
) -> VoteTally::Outcome;
}

View File

@ -7,9 +7,11 @@ use bytes::Bytes;
use futures::Stream;
use nomos_core::fountain::FountainError;
use nomos_core::fountain::{mock::MockFountain, FountainCode};
use nomos_core::vote::mock::{MockQc, MockTally, MockTallySettings};
use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService;
use overwatch_rs::services::relay::*;
use serde::de::DeserializeOwned;
use tokio::sync::broadcast::Receiver;
struct DummyOverlay;
@ -17,7 +19,7 @@ struct DummyAdapter;
struct DummyBackend;
#[async_trait]
impl<N: NetworkAdapter + Sync, F: FountainCode + Sync> Overlay<N, F> for DummyOverlay {
impl<N: NetworkAdapter + Sync, F: FountainCode + Sync> Overlay<N, F, MockTally> for DummyOverlay {
fn new(_: &View, _: NodeId) -> Self {
DummyOverlay
}
@ -38,13 +40,15 @@ impl<N: NetworkAdapter + Sync, F: FountainCode + Sync> Overlay<N, F> for DummyOv
_view: &View,
_block: &Block,
_adapter: &N,
_vote_tally: &MockTally,
_next_view: &View,
) -> Result<(), Box<dyn Error>> {
Ok(())
}
async fn build_qc(&self, _view: &View, _: &N) -> Approval {
Approval
async fn build_qc(&self, view: &View, _adapter: &N, _vote_tally: &MockTally) -> MockQc {
// TODO: mock the total votes
MockQc::new(0)
}
}
@ -66,14 +70,21 @@ impl NetworkAdapter for DummyAdapter {
async fn broadcast_block_chunk(&self, _: Committee, _: &View, _: ProposalChunkMsg) {
unimplemented!()
}
async fn approvals_stream(
async fn votes_stream<Vote: DeserializeOwned>(
&self,
_: Committee,
_: &View,
) -> Box<dyn Stream<Item = Approval> + Send> {
_committee: Committee,
_view: &View,
) -> Box<dyn Stream<Item = Vote> + Send> {
unimplemented!()
}
async fn forward_approval<Vote: Serialize + Send>(
&self,
_committee: Committee,
_view: &View,
_approval: VoteMsg<Vote>,
) {
unimplemented!()
}
async fn forward_approval(&self, _: Committee, _: &View, _: ApprovalMsg) {}
}
#[async_trait]
@ -100,11 +111,13 @@ async fn test_single_round_non_leader() {
staking_keys: BTreeMap::new(),
view_n: 0,
};
let mock_tally = MockTally::new(MockTallySettings { threshold: 0 });
let (_, next_view) = view
.resolve_non_leader::<DummyAdapter, DummyOverlay, MockFountain>(
.resolve_non_leader::<DummyAdapter, DummyOverlay, MockFountain, MockTally>(
[0; 32],
&DummyAdapter,
&MockFountain,
&mock_tally,
)
.await
.unwrap();