Consensus backbone (#16)

* tmp

* add peers implem

* update return values

* Backport network consensus adapter (#26)

* Pipe network adapter with proper types over original implementation

* Hold relay on CarnotConsensus

* Scratch Network adapter methods

* Fix tests blocking CI

* Fix waku feature on network crate

* Fix waku_bindings refs

* Restructure consensus network

* Stream block chunk

* Pipe adapter creation with subscription

* Add placeholder proposal chunk and approval messages

* Implement waku backend

* Clippy happy

* Use full path for tokio oneshot and error types in message_subscriber_channel method

* Clean imports

* small fixes

Co-authored-by: Daniel Sanchez Quiros <sanchez.quiros.daniel@gmail.com>

Co-authored-by: Daniel Sanchez Quiros <sanchez.quiros.daniel@gmail.com>
This commit is contained in:
Giacomo Pasini 2022-12-13 11:15:54 +01:00 committed by GitHub
parent 5484cb7079
commit 7cc9181574
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 600 additions and 17 deletions

View File

@ -3,5 +3,6 @@
members = [
"nomos-services/log",
"nomos-services/network",
"nomos-services/storage"
"nomos-services/storage",
"nomos-services/consensus"
]

View File

@ -0,0 +1,23 @@
[package]
name = "nomos-consensus"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
chrono = "0.4"
rand_chacha = "0.3"
rand = "0.8"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
async-trait = "0.1"
nomos-network = { path = "../network" }
tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1"
futures = "0.3"
waku-bindings = { version = "0.1.0-beta1", optional = true}
once_cell = "1.16"
[features]
default = []
waku = ["nomos-network/waku", "waku-bindings"]

View File

@ -0,0 +1,182 @@
//! In this module, and children ones, the 'view lifetime is tied to a logical consensus view,
//! represented by the `View` struct.
//! This is done to ensure that all the different data structs used to represent various actors
//! are always synchronized (i.e. it cannot happen that we accidentally use committees from different views).
//! It's obviously extremely important that the information contained in `View` is synchronized across different
//! nodes, but that has to be achieved through different means.
mod network;
pub mod overlay;
use overlay::{Member, Overlay};
use overwatch_rs::services::{
handle::ServiceStateHandle,
relay::NoMessage,
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
};
// Raw bytes for now, could be a ed25519 public key
pub type NodeId = [u8; 32];
// Random seed for each round provided by the protocol
pub type Seed = [u8; 32];
pub type Stake = u64;
use crate::network::NetworkAdapter;
use nomos_network::NetworkService;
use overwatch_rs::services::relay::{OutboundRelay, Relay};
use std::collections::{BTreeMap, HashSet};
const COMMITTEE_SIZE: usize = 1;
#[derive(Clone)]
pub struct CarnotSettings {
private_key: [u8; 32],
}
pub struct CarnotConsensus<Network: NetworkAdapter + Send + Sync + 'static> {
service_state: ServiceStateHandle<Self>,
// underlying networking backend. We need this so we can relay and check the types properly
// when implementing ServiceCore for CarnotConsensus
network_relay: Relay<NetworkService<<Network as NetworkAdapter>::Backend>>,
}
impl<Network: NetworkAdapter + Send + Sync + 'static> ServiceData for CarnotConsensus<Network> {
const SERVICE_ID: ServiceId = "Carnot";
type Settings = CarnotSettings;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = NoMessage;
}
#[async_trait::async_trait]
impl<Network: NetworkAdapter + Send + Sync + 'static> ServiceCore for CarnotConsensus<Network> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay();
Ok(Self {
service_state,
network_relay,
})
}
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
let mut view_generator = self.view_generator().await;
let network_relay: OutboundRelay<_> = self
.network_relay
.connect()
.await
.expect("Relay connection with NetworkService should succeed");
let network_adapter = Network::new(network_relay).await;
// TODO: fix
let node_id = self
.service_state
.settings_reader
.get_updated_settings()
.private_key;
loop {
let view = view_generator.next().await;
// if we want to process multiple views at the same time this can
// be spawned as a separate future
// TODO: add leadership module
view.resolve::<Network, Member<'_, COMMITTEE_SIZE>>(node_id, &network_adapter)
.await;
}
}
}
impl<Network: NetworkAdapter + Send + Sync + 'static> CarnotConsensus<Network> {
// Build a service that generates new views as they become available
async fn view_generator(&self) -> ViewGenerator {
todo!()
}
}
/// Tracks new views and make them available as soon as they are available
///
/// A new view is normally generated as soon a a block is approved, but
/// additional logic is needed in failure cases, like when no new block is
/// approved for a long enough period of time
struct ViewGenerator;
impl ViewGenerator {
async fn next(&mut self) -> View {
todo!()
}
}
/// A block
#[derive(Clone)]
pub struct Block;
/// A block chunk, N pieces are necessary to reconstruct the full block
#[derive(Clone, Copy, Debug)]
pub struct BlockChunk {
index: u8,
}
impl Block {
/// Fake implementation of erasure coding protocol
pub fn chunk<const SIZE: usize>(self) -> [BlockChunk; SIZE] {
// TODO: this is a completely temporary and fake implementation
(0..SIZE)
.map(|i| BlockChunk { index: i as u8 })
.collect::<Vec<_>>()
.try_into()
.expect("This should not fail unless chunking exceed memory limits")
}
}
#[derive(Hash, Eq, PartialEq)]
pub struct Approval;
// Consensus round, also aids in guaranteeing synchronization
// between various data structures by means of lifetimes
pub struct View {
seed: Seed,
staking_keys: BTreeMap<NodeId, Stake>,
_view_n: u64,
}
impl View {
const APPROVAL_THRESHOLD: usize = 1;
// TODO: might want to encode steps in the type system
async fn resolve<'view, Network: NetworkAdapter, O: Overlay<'view, Network>>(
&'view self,
node_id: NodeId,
adapter: &Network,
) {
let overlay = O::new(self, node_id);
let block = overlay.reconstruct_proposal_block(adapter).await;
// TODO: verify?
overlay.broadcast_block(block.clone(), adapter).await;
self.approve(&overlay, block, adapter).await;
}
async fn approve<'view, Network: NetworkAdapter, O: Overlay<'view, Network>>(
&'view self,
overlay: &O,
block: Block,
adapter: &Network,
) {
// wait for approval in the overlay, if necessary
let mut approvals = HashSet::new();
let mut stream = overlay.collect_approvals(block, adapter).await;
while let Some(approval) = stream.recv().await {
approvals.insert(approval);
if approvals.len() > Self::APPROVAL_THRESHOLD {
let self_approval = self.craft_proof_of_approval(approvals.into_iter());
overlay.forward_approval(self_approval, adapter).await;
return;
}
}
}
fn craft_proof_of_approval(&self, _approvals: impl Iterator<Item = Approval>) -> Approval {
todo!()
}
}

View File

@ -0,0 +1,2 @@
#[cfg(feature = "waku")]
pub mod waku;

View File

@ -0,0 +1,176 @@
// std
// crates
use futures::{Stream, StreamExt};
use once_cell::sync::Lazy;
use tokio_stream::wrappers::BroadcastStream;
// internal
use crate::network::{
messages::{ApprovalMsg, ProposalChunkMsg},
NetworkAdapter,
};
use crate::{Approval, BlockChunk, View};
use nomos_network::{
backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage},
NetworkMsg, NetworkService,
};
use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
use waku_bindings::{Encoding, WakuContentTopic, WakuMessage, WakuPubSubTopic};
static WAKU_CARNOT_PUB_SUB_TOPIC: Lazy<WakuPubSubTopic> =
Lazy::new(|| WakuPubSubTopic::new("CarnotSim".to_string(), Encoding::Proto));
static WAKU_CARNOT_BLOCK_CONTENT_TOPIC: Lazy<WakuContentTopic> = Lazy::new(|| WakuContentTopic {
application_name: "CarnotSim".to_string(),
version: 1,
content_topic_name: "CarnotBlock".to_string(),
encoding: Encoding::Proto,
});
static WAKU_CARNOT_APPROVAL_CONTENT_TOPIC: Lazy<WakuContentTopic> =
Lazy::new(|| WakuContentTopic {
application_name: "CarnotSim".to_string(),
version: 1,
content_topic_name: "CarnotApproval".to_string(),
encoding: Encoding::Proto,
});
// TODO: ehm...this should be here, but we will change it whenever the chunking is decided.
const CHUNK_SIZE: usize = 8;
pub struct WakuAdapter {
network_relay: OutboundRelay<<NetworkService<Waku> as ServiceData>::Message>,
}
impl WakuAdapter {
async fn message_subscriber_channel(
&self,
) -> Result<
tokio::sync::broadcast::Receiver<NetworkEvent>,
tokio::sync::oneshot::error::RecvError,
> {
let (sender, receiver) = tokio::sync::oneshot::channel();
if let Err((_, _e)) = self
.network_relay
.send(NetworkMsg::Subscribe {
kind: EventKind::Message,
sender,
})
.await
{
todo!("log error");
};
receiver.await
}
}
#[async_trait::async_trait]
impl NetworkAdapter for WakuAdapter {
type Backend = Waku;
async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self {
Self { network_relay }
}
async fn proposal_chunks_stream(&self) -> Box<dyn Stream<Item = BlockChunk>> {
let stream_channel = self
.message_subscriber_channel()
.await
.unwrap_or_else(|_e| todo!("handle error"));
Box::new(
BroadcastStream::new(stream_channel).filter_map(|msg| async move {
match msg {
Ok(event) => match event {
NetworkEvent::RawMessage(message) => {
// TODO: this should actually check the whole content topic,
// waiting for this [PR](https://github.com/waku-org/waku-rust-bindings/pull/28)
if WAKU_CARNOT_BLOCK_CONTENT_TOPIC.content_topic_name
== message.content_topic().content_topic_name
{
let payload = message.payload();
Some(
ProposalChunkMsg::from_bytes::<CHUNK_SIZE>(
payload.try_into().unwrap(),
)
.chunk,
)
} else {
None
}
}
},
Err(_e) => None,
}
}),
)
}
async fn broadcast_block_chunk(&self, _view: View, chunk_message: ProposalChunkMsg) {
// TODO: probably later, depending on the view we should map to different content topics
// but this is an ongoing idea that should/will be discus.
let message = WakuMessage::new::<[u8; CHUNK_SIZE]>(
chunk_message.as_bytes(),
WAKU_CARNOT_BLOCK_CONTENT_TOPIC.clone(),
1,
chrono::Utc::now().timestamp() as usize,
);
if let Err((_, _e)) = self
.network_relay
.send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
message,
topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
}))
.await
{
todo!("log error");
};
}
async fn approvals_stream(&self) -> Box<dyn Stream<Item = Approval>> {
let stream_channel = self
.message_subscriber_channel()
.await
.unwrap_or_else(|_e| todo!("handle error"));
Box::new(
BroadcastStream::new(stream_channel).filter_map(|msg| async move {
match msg {
Ok(event) => match event {
NetworkEvent::RawMessage(message) => {
// TODO: this should actually check the whole content topic,
// waiting for this [PR](https://github.com/waku-org/waku-rust-bindings/pull/28)
if WAKU_CARNOT_APPROVAL_CONTENT_TOPIC.content_topic_name
== message.content_topic().content_topic_name
{
let payload = message.payload();
Some(ApprovalMsg::from_bytes(payload).approval)
} else {
None
}
}
},
Err(_e) => None,
}
}),
)
}
async fn forward_approval(&self, approval_message: ApprovalMsg) {
let message = WakuMessage::new(
approval_message.as_bytes(),
WAKU_CARNOT_APPROVAL_CONTENT_TOPIC.clone(),
1,
chrono::Utc::now().timestamp() as usize,
);
if let Err((_, _e)) = self
.network_relay
.send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
message,
topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
}))
.await
{
todo!("log error");
};
}
}

View File

@ -0,0 +1,37 @@
use crate::{Approval, BlockChunk, NodeId};
pub struct ProposalChunkMsg {
pub chunk: BlockChunk,
}
// TODO: this is completely temporal and match no reality at all, but it will help use fake some of the process
impl ProposalChunkMsg {
pub fn as_bytes<const SIZE: usize>(&self) -> [u8; SIZE] {
[self.chunk.index; SIZE]
}
pub fn from_bytes<const SIZE: usize>(data: [u8; SIZE]) -> Self {
let index = data[0];
Self {
chunk: BlockChunk { index },
}
}
}
pub struct ApprovalMsg {
pub source: NodeId,
pub approval: Approval,
}
impl ApprovalMsg {
pub fn as_bytes(&self) -> Box<[u8]> {
self.source.into()
}
pub fn from_bytes(data: &[u8]) -> Self {
Self {
source: NodeId::try_from(data).unwrap(),
approval: Approval,
}
}
}

View File

@ -0,0 +1,22 @@
pub mod adapters;
mod messages;
use crate::network::messages::{ApprovalMsg, ProposalChunkMsg};
use crate::{Approval, BlockChunk, View};
use futures::Stream;
use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService;
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
#[async_trait::async_trait]
pub trait NetworkAdapter {
type Backend: NetworkBackend + Send + Sync + 'static;
async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self;
async fn proposal_chunks_stream(&self) -> Box<dyn Stream<Item = BlockChunk>>;
async fn broadcast_block_chunk(&self, view: View, chunk_msg: ProposalChunkMsg);
async fn approvals_stream(&self) -> Box<dyn Stream<Item = Approval>>;
async fn forward_approval(&self, approval: ApprovalMsg);
}

View File

@ -0,0 +1,119 @@
use super::*;
use crate::network::NetworkAdapter;
use rand::{seq::SliceRandom, SeedableRng};
/// View of the tree overlay centered around a specific member
pub struct Member<'view, const C: usize> {
id: NodeId,
committee: Committee,
committees: Committees<'view, C>,
}
/// #Just a newtype index to be able to implement parent/children methods
#[derive(Copy, Clone)]
pub struct Committee(usize);
pub struct Committees<'view, const C: usize> {
view: &'view View,
nodes: Box<[NodeId]>,
}
impl<'view, const C: usize> Committees<'view, C> {
pub fn new(view: &'view View) -> Self {
let mut nodes = view.staking_keys.keys().cloned().collect::<Box<[NodeId]>>();
let mut rng = rand_chacha::ChaCha20Rng::from_seed(view.seed);
nodes.shuffle(&mut rng);
Self { nodes, view }
}
pub fn into_member(self, id: NodeId) -> Option<Member<'view, C>> {
let member_idx = self.nodes.iter().position(|m| m == &id)?;
Some(Member {
committee: Committee(member_idx / C),
committees: self,
id,
})
}
fn get_committee_members(&self, committee: Committee) -> Option<&[NodeId]> {
let leftb = committee.0 * C;
let rightb = std::cmp::min(self.nodes.len(), leftb + C);
if leftb < rightb {
Some(&self.nodes[leftb..rightb])
} else {
None
}
}
}
impl Committee {
/// Return the left and right children committee, if any
pub fn children(&self) -> (Committee, Committee) {
(
// left child
Committee(self.0 * 2 + 1),
// right child
Committee(self.0 + 2 + 2),
)
}
/// Return the parent committee, if any
pub fn parent(&self) -> Option<Committee> {
if self.0 == 0 {
None
} else {
Some(Committee((self.0 - 1) / 2))
}
}
}
impl<'view, const C: usize> Member<'view, C> {
/// Return other members of this committee
pub fn peers(&self) -> &[NodeId] {
self.committees
.get_committee_members(self.committee)
.unwrap()
}
/// Return the participant in the parent committee this member should interact
/// with
pub fn parent_committee(&self) -> Option<Committee> {
self.committee.parent()
}
// Return participants in the children committees this member should interact with
pub fn children_committes(&self) -> (Committee, Committee) {
self.committee.children()
}
}
#[async_trait::async_trait]
impl<'view, Network: NetworkAdapter + Send + Sync, const C: usize> Overlay<'view, Network>
for Member<'view, C>
{
fn new(view: &'view View, node: NodeId) -> Self {
let committees = Committees::new(view);
committees.into_member(node).unwrap()
}
async fn reconstruct_proposal_block(&self, adapter: &Network) -> Block {
todo!()
}
async fn broadcast_block(&self, _block: Block, adapter: &Network) {
todo!()
}
async fn collect_approvals(
&self,
_block: Block,
adapter: &Network,
) -> tokio::sync::mpsc::Receiver<Approval> {
todo!()
}
async fn forward_approval(&self, _approval: Approval, adapter: &Network) {
todo!()
}
}

View File

@ -0,0 +1,22 @@
use super::{Approval, Block, NodeId, View};
#[allow(unused)]
mod committees;
use crate::network::NetworkAdapter;
pub use committees::Member;
// Dissamination overlay, tied to a specific view
#[async_trait::async_trait]
pub trait Overlay<'view, Network: NetworkAdapter> {
fn new(view: &'view View, node: NodeId) -> Self;
async fn reconstruct_proposal_block(&self, adapter: &Network) -> Block;
async fn broadcast_block(&self, block: Block, adapter: &Network);
async fn collect_approvals(
&self,
block: Block,
adapter: &Network,
) -> tokio::sync::mpsc::Receiver<Approval>;
async fn forward_approval(&self, approval: Approval, adapter: &Network);
}

View File

@ -8,7 +8,7 @@ edition = "2021"
[dependencies]
async-trait = "0.1"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
serde = "1.0"
serde = { version = "1.0", features = ["derive"] }
tracing = "0.1"
tracing-appender = "0.2"
tracing-subscriber = { version = "0.3", features = ["json"] }

View File

@ -22,4 +22,5 @@ tracing-gelf = "0.7"
futures = "0.3"
[features]
default = ["waku-bindings"]
default = []
waku = ["waku-bindings"]

View File

@ -3,9 +3,7 @@ use overwatch_rs::services::state::ServiceState;
use tokio::sync::broadcast::Receiver;
#[cfg(feature = "waku")]
mod waku;
#[cfg(feature = "waku")]
pub use self::waku::Waku;
pub mod waku;
#[async_trait::async_trait]
pub trait NetworkBackend {

View File

@ -1,5 +1,4 @@
use super::*;
use ::waku_bindings::*;
use overwatch_rs::services::state::NoState;
use serde::{Deserialize, Serialize};
use tokio::sync::{
@ -7,6 +6,7 @@ use tokio::sync::{
oneshot,
};
use tracing::{debug, error};
use waku_bindings::*;
const BROADCAST_CHANNEL_BUF: usize = 16;
@ -96,15 +96,6 @@ impl NetworkBackend for Waku {
}
}
async fn subscribe(&mut self, kind: Self::EventKind) -> Receiver<Self::NetworkEvent> {
match kind {
EventKind::Message => {
debug!("processed subscription to incoming messages");
self.message_event.subscribe()
}
}
}
async fn process(&self, msg: Self::Message) {
match msg {
WakuBackendMessage::Broadcast { message, topic } => {
@ -172,4 +163,13 @@ impl NetworkBackend for Waku {
},
};
}
async fn subscribe(&mut self, kind: Self::EventKind) -> Receiver<Self::NetworkEvent> {
match kind {
EventKind::Message => {
debug!("processed subscription to incoming messages");
self.message_event.subscribe()
}
}
}
}

View File

@ -52,7 +52,7 @@ pub trait StorageBackend: Sized {
#[cfg(test)]
pub mod testing {
use crate::backends::StorageSerde;
use super::StorageSerde;
use bytes::Bytes;
use serde::de::DeserializeOwned;
use serde::Serialize;