diff --git a/Cargo.toml b/Cargo.toml
index 31cf67a8..73a0edfd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,5 +3,6 @@
 members = [
   "nomos-services/log",
   "nomos-services/network",
-  "nomos-services/storage"
+  "nomos-services/storage",
+  "nomos-services/consensus"
 ]
\ No newline at end of file
diff --git a/nomos-services/consensus/Cargo.toml b/nomos-services/consensus/Cargo.toml
new file mode 100644
index 00000000..ed4ffa9c
--- /dev/null
+++ b/nomos-services/consensus/Cargo.toml
@@ -0,0 +1,23 @@
+[package]
+name = "nomos-consensus"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+chrono = "0.4"
+rand_chacha = "0.3"
+rand = "0.8"
+overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
+async-trait = "0.1"
+nomos-network = { path = "../network" }
+tokio = { version = "1", features = ["sync"] }
+tokio-stream = "0.1"
+futures = "0.3"
+waku-bindings = { version = "0.1.0-beta1", optional = true }
+once_cell = "1.16"
+
+[features]
+default = []
+waku = ["nomos-network/waku", "waku-bindings"]
\ No newline at end of file
diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs
new file mode 100644
index 00000000..0e5c34b0
--- /dev/null
+++ b/nomos-services/consensus/src/lib.rs
@@ -0,0 +1,182 @@
+//! In this module, and children ones, the 'view lifetime is tied to a logical consensus view,
+//! represented by the `View` struct.
+//! This is done to ensure that all the different data structs used to represent various actors
+//! are always synchronized (i.e. it cannot happen that we accidentally use committees from different views).
+//! It's obviously extremely important that the information contained in `View` is synchronized across different
+//! nodes, but that has to be achieved through different means.
+mod network;
+pub mod overlay;
+
+use overlay::{Member, Overlay};
+use overwatch_rs::services::{
+    handle::ServiceStateHandle,
+    relay::NoMessage,
+    state::{NoOperator, NoState},
+    ServiceCore, ServiceData, ServiceId,
+};
+
+// Raw bytes for now, could be an ed25519 public key
+pub type NodeId = [u8; 32];
+// Random seed for each round provided by the protocol
+pub type Seed = [u8; 32];
+pub type Stake = u64;
+
+use crate::network::NetworkAdapter;
+use nomos_network::NetworkService;
+use overwatch_rs::services::relay::{OutboundRelay, Relay};
+use std::collections::{BTreeMap, HashSet};
+
+const COMMITTEE_SIZE: usize = 1;
+
+#[derive(Clone)]
+pub struct CarnotSettings {
+    private_key: [u8; 32],
+}
+
+pub struct CarnotConsensus<Network: NetworkAdapter + Send + Sync + 'static> {
+    service_state: ServiceStateHandle<Self>,
+    // underlying networking backend. We need this so we can relay and check the types properly
+    // when implementing ServiceCore for CarnotConsensus
+    network_relay: Relay<NetworkService<Network::Backend>>,
+}
+
+impl<Network: NetworkAdapter + Send + Sync + 'static> ServiceData for CarnotConsensus<Network> {
+    const SERVICE_ID: ServiceId = "Carnot";
+    type Settings = CarnotSettings;
+    type State = NoState<Self::Settings>;
+    type StateOperator = NoOperator<Self::State>;
+    type Message = NoMessage;
+}
+
+#[async_trait::async_trait]
+impl<Network: NetworkAdapter + Send + Sync + 'static> ServiceCore for CarnotConsensus<Network> {
+    fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
+        let network_relay = service_state.overwatch_handle.relay();
+        Ok(Self {
+            service_state,
+            network_relay,
+        })
+    }
+
+    async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
+        let mut view_generator = self.view_generator().await;
+
+        let network_relay: OutboundRelay<_> = self
+            .network_relay
+            .connect()
+            .await
+            .expect("Relay connection with NetworkService should succeed");
+
+        let network_adapter = Network::new(network_relay).await;
+
+        // TODO: fix
+        let node_id = self
+            .service_state
+            .settings_reader
+            .get_updated_settings()
+            .private_key;
+
+        loop {
+            let view = view_generator.next().await;
+            // if we want to process multiple views at the same time this can
+            // be spawned as a separate future
+            // TODO: add leadership module
+            view.resolve::<Network, Member<'_, COMMITTEE_SIZE>>(node_id, &network_adapter)
+                .await;
+        }
+    }
+}
+
+impl<Network: NetworkAdapter + Send + Sync + 'static> CarnotConsensus<Network> {
+    // Build a service that generates new views as they become available
+    async fn view_generator(&self) -> ViewGenerator {
+        todo!()
+    }
+}
+
+/// Tracks new views and makes them available as soon as they are ready
+///
+/// A new view is normally generated as soon as a block is approved, but
+/// additional logic is needed in failure cases, like when no new block is
+/// approved for a long enough period of time
+struct ViewGenerator;
+
+impl ViewGenerator {
+    async fn next(&mut self) -> View {
+        todo!()
+    }
+}
+
+/// A block
+#[derive(Clone)]
+pub struct Block;
+
+/// A block chunk, N pieces are necessary to reconstruct the full block
+#[derive(Clone, Copy, Debug)]
+pub struct BlockChunk {
+    index: u8,
+}
+
+impl Block {
+    /// Fake implementation of erasure coding protocol
+    pub fn chunk<const SIZE: usize>(self) -> [BlockChunk; SIZE] {
+        // TODO: this is a completely temporary and fake implementation
+        (0..SIZE)
+            .map(|i| BlockChunk { index: i as u8 })
+            .collect::<Vec<_>>()
+            .try_into()
+            .expect("This should not fail unless chunking exceeds memory limits")
+    }
+}
+
+#[derive(Hash, Eq, PartialEq)]
+pub struct Approval;
+
+// Consensus round, also aids in guaranteeing synchronization
+// between various data structures by means of lifetimes
+pub struct View {
+    seed: Seed,
+    staking_keys: BTreeMap<NodeId, Stake>,
+    _view_n: u64,
+}
+
+impl View {
+    const APPROVAL_THRESHOLD: usize = 1;
+
+    // TODO: might want to encode steps in the type system
+    async fn resolve<'view, Network: NetworkAdapter, O: Overlay<'view, Network>>(
+        &'view self,
+        node_id: NodeId,
+        adapter: &Network,
+    ) {
+        let overlay = O::new(self, node_id);
+
+        let block = overlay.reconstruct_proposal_block(adapter).await;
+        // TODO: verify?
+        overlay.broadcast_block(block.clone(), adapter).await;
+        self.approve(&overlay, block, adapter).await;
+    }
+
+    async fn approve<'view, Network: NetworkAdapter, O: Overlay<'view, Network>>(
+        &'view self,
+        overlay: &O,
+        block: Block,
+        adapter: &Network,
+    ) {
+        // wait for approval in the overlay, if necessary
+        let mut approvals = HashSet::new();
+        let mut stream = overlay.collect_approvals(block, adapter).await;
+        while let Some(approval) = stream.recv().await {
+            approvals.insert(approval);
+            if approvals.len() > Self::APPROVAL_THRESHOLD {
+                let self_approval = self.craft_proof_of_approval(approvals.into_iter());
+                overlay.forward_approval(self_approval, adapter).await;
+                return;
+            }
+        }
+    }
+
+    fn craft_proof_of_approval(&self, _approvals: impl Iterator<Item = Approval>) -> Approval {
+        todo!()
+    }
+}
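Aside: the chunking API above is driven by a const generic, so the caller picks the chunk count at the call site and gets a fixed-size array back. A minimal sketch of that usage (not part of this diff; it assumes the `nomos-consensus` crate is available as a dependency and that `chunk` keeps the signature shown above):

    use nomos_consensus::Block;

    fn main() {
        // The caller chooses SIZE; here the fake block is split into 3 chunks.
        let chunks = Block.chunk::<3>();
        assert_eq!(chunks.len(), 3);
    }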
diff --git a/nomos-services/consensus/src/network/adapters/mod.rs b/nomos-services/consensus/src/network/adapters/mod.rs
new file mode 100644
index 00000000..ac25906e
--- /dev/null
+++ b/nomos-services/consensus/src/network/adapters/mod.rs
@@ -0,0 +1,2 @@
+#[cfg(feature = "waku")]
+pub mod waku;
diff --git a/nomos-services/consensus/src/network/adapters/waku.rs b/nomos-services/consensus/src/network/adapters/waku.rs
new file mode 100644
index 00000000..0164a30d
--- /dev/null
+++ b/nomos-services/consensus/src/network/adapters/waku.rs
@@ -0,0 +1,176 @@
+// std
+// crates
+use futures::{Stream, StreamExt};
+use once_cell::sync::Lazy;
+use tokio_stream::wrappers::BroadcastStream;
+// internal
+use crate::network::{
+    messages::{ApprovalMsg, ProposalChunkMsg},
+    NetworkAdapter,
+};
+use crate::{Approval, BlockChunk, View};
+use nomos_network::{
+    backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage},
+    NetworkMsg, NetworkService,
+};
+use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
+use waku_bindings::{Encoding, WakuContentTopic, WakuMessage, WakuPubSubTopic};
+
+static WAKU_CARNOT_PUB_SUB_TOPIC: Lazy<WakuPubSubTopic> =
+    Lazy::new(|| WakuPubSubTopic::new("CarnotSim".to_string(), Encoding::Proto));
+
+static WAKU_CARNOT_BLOCK_CONTENT_TOPIC: Lazy<WakuContentTopic> = Lazy::new(|| WakuContentTopic {
+    application_name: "CarnotSim".to_string(),
+    version: 1,
+    content_topic_name: "CarnotBlock".to_string(),
+    encoding: Encoding::Proto,
+});
+
+static WAKU_CARNOT_APPROVAL_CONTENT_TOPIC: Lazy<WakuContentTopic> =
+    Lazy::new(|| WakuContentTopic {
+        application_name: "CarnotSim".to_string(),
+        version: 1,
+        content_topic_name: "CarnotApproval".to_string(),
+        encoding: Encoding::Proto,
+    });
+
+// TODO: ehm...this should be here, but we will change it whenever the chunking is decided.
+const CHUNK_SIZE: usize = 8;
+
+pub struct WakuAdapter {
+    network_relay: OutboundRelay<<NetworkService<Waku> as ServiceData>::Message>,
+}
+
+impl WakuAdapter {
+    async fn message_subscriber_channel(
+        &self,
+    ) -> Result<
+        tokio::sync::broadcast::Receiver<NetworkEvent>,
+        tokio::sync::oneshot::error::RecvError,
+    > {
+        let (sender, receiver) = tokio::sync::oneshot::channel();
+        if let Err((_, _e)) = self
+            .network_relay
+            .send(NetworkMsg::Subscribe {
+                kind: EventKind::Message,
+                sender,
+            })
+            .await
+        {
+            todo!("log error");
+        };
+        receiver.await
+    }
+}
+
+#[async_trait::async_trait]
+impl NetworkAdapter for WakuAdapter {
+    type Backend = Waku;
+
+    async fn new(
+        network_relay: OutboundRelay<<NetworkService<Waku> as ServiceData>::Message>,
+    ) -> Self {
+        Self { network_relay }
+    }
+
+    async fn proposal_chunks_stream(&self) -> Box<dyn Stream<Item = BlockChunk> + Send> {
+        let stream_channel = self
+            .message_subscriber_channel()
+            .await
+            .unwrap_or_else(|_e| todo!("handle error"));
+        Box::new(
+            BroadcastStream::new(stream_channel).filter_map(|msg| async move {
+                match msg {
+                    Ok(event) => match event {
+                        NetworkEvent::RawMessage(message) => {
+                            // TODO: this should actually check the whole content topic,
+                            // waiting for this [PR](https://github.com/waku-org/waku-rust-bindings/pull/28)
+                            if WAKU_CARNOT_BLOCK_CONTENT_TOPIC.content_topic_name
+                                == message.content_topic().content_topic_name
+                            {
+                                let payload = message.payload();
+                                Some(
+                                    ProposalChunkMsg::from_bytes::<CHUNK_SIZE>(
+                                        payload.try_into().unwrap(),
+                                    )
+                                    .chunk,
+                                )
+                            } else {
+                                None
+                            }
+                        }
+                    },
+                    Err(_e) => None,
+                }
+            }),
+        )
+    }
+
+    async fn broadcast_block_chunk(&self, _view: View, chunk_message: ProposalChunkMsg) {
+        // TODO: probably later, depending on the view, we should map to different content topics,
+        // but this is an ongoing idea that should/will be discussed.
+        let message = WakuMessage::new::<[u8; CHUNK_SIZE]>(
+            chunk_message.as_bytes(),
+            WAKU_CARNOT_BLOCK_CONTENT_TOPIC.clone(),
+            1,
+            chrono::Utc::now().timestamp() as usize,
+        );
+        if let Err((_, _e)) = self
+            .network_relay
+            .send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
+                message,
+                topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
+            }))
+            .await
+        {
+            todo!("log error");
+        };
+    }
+
+    async fn approvals_stream(&self) -> Box<dyn Stream<Item = Approval> + Send> {
+        let stream_channel = self
+            .message_subscriber_channel()
+            .await
+            .unwrap_or_else(|_e| todo!("handle error"));
+        Box::new(
+            BroadcastStream::new(stream_channel).filter_map(|msg| async move {
+                match msg {
+                    Ok(event) => match event {
+                        NetworkEvent::RawMessage(message) => {
+                            // TODO: this should actually check the whole content topic,
+                            // waiting for this [PR](https://github.com/waku-org/waku-rust-bindings/pull/28)
+                            if WAKU_CARNOT_APPROVAL_CONTENT_TOPIC.content_topic_name
+                                == message.content_topic().content_topic_name
+                            {
+                                let payload = message.payload();
+                                Some(ApprovalMsg::from_bytes(payload).approval)
+                            } else {
+                                None
+                            }
+                        }
+                    },
+                    Err(_e) => None,
+                }
+            }),
+        )
+    }
+
+    async fn forward_approval(&self, approval_message: ApprovalMsg) {
+        let message = WakuMessage::new(
+            approval_message.as_bytes(),
+            WAKU_CARNOT_APPROVAL_CONTENT_TOPIC.clone(),
+            1,
+            chrono::Utc::now().timestamp() as usize,
+        );
+        if let Err((_, _e)) = self
+            .network_relay
+            .send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
+                message,
+                topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
+            }))
+            .await
+        {
+            todo!("log error");
+        };
+    }
+}
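Aside: both `*_stream` methods hand back a boxed `Stream` that overlay code is expected to drain. A crate-internal sketch of that consumption pattern (not part of this diff; the helper name is illustrative, and the stream is pinned because the boxed trait object is not guaranteed to be `Unpin`):

    use futures::StreamExt;

    use crate::network::NetworkAdapter;
    use crate::BlockChunk;

    // Illustrative helper: wait for the first proposal chunk the adapter delivers.
    async fn first_chunk<N: NetworkAdapter>(adapter: &N) -> Option<BlockChunk> {
        // Pinning makes `next()` available even for non-`Unpin` streams.
        let mut chunks = Box::into_pin(adapter.proposal_chunks_stream().await);
        chunks.next().await
    }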
diff --git a/nomos-services/consensus/src/network/messages.rs b/nomos-services/consensus/src/network/messages.rs
new file mode 100644
index 00000000..db35a74d
--- /dev/null
+++ b/nomos-services/consensus/src/network/messages.rs
@@ -0,0 +1,37 @@
+use crate::{Approval, BlockChunk, NodeId};
+
+pub struct ProposalChunkMsg {
+    pub chunk: BlockChunk,
+}
+
+// TODO: this is completely temporary and matches no reality at all, but it will help us fake some of the process
+impl ProposalChunkMsg {
+    pub fn as_bytes<const SIZE: usize>(&self) -> [u8; SIZE] {
+        [self.chunk.index; SIZE]
+    }
+
+    pub fn from_bytes<const SIZE: usize>(data: [u8; SIZE]) -> Self {
+        let index = data[0];
+        Self {
+            chunk: BlockChunk { index },
+        }
+    }
+}
+
+pub struct ApprovalMsg {
+    pub source: NodeId,
+    pub approval: Approval,
+}
+
+impl ApprovalMsg {
+    pub fn as_bytes(&self) -> Box<[u8]> {
+        self.source.into()
+    }
+
+    pub fn from_bytes(data: &[u8]) -> Self {
+        Self {
+            source: NodeId::try_from(data).unwrap(),
+            approval: Approval,
+        }
+    }
+}
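Aside: since the byte encoding above is a placeholder, a pair of round-trip checks pins down its current contract. A sketch of unit tests as they could sit at the bottom of messages.rs (not part of this diff; names and values are illustrative):

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn proposal_chunk_roundtrip() {
            let msg = ProposalChunkMsg {
                chunk: BlockChunk { index: 7 },
            };
            // The chunk index is simply repeated across the fixed-size buffer.
            let bytes = msg.as_bytes::<8>();
            assert_eq!(ProposalChunkMsg::from_bytes(bytes).chunk.index, 7);
        }

        #[test]
        fn approval_roundtrip() {
            let source: NodeId = [1u8; 32];
            let msg = ApprovalMsg {
                source,
                approval: Approval,
            };
            // Only the source id is encoded; `Approval` itself carries no data yet.
            assert_eq!(ApprovalMsg::from_bytes(&msg.as_bytes()).source, source);
        }
    }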
diff --git a/nomos-services/consensus/src/network/mod.rs b/nomos-services/consensus/src/network/mod.rs
new file mode 100644
index 00000000..31bdc8dc
--- /dev/null
+++ b/nomos-services/consensus/src/network/mod.rs
@@ -0,0 +1,22 @@
+pub mod adapters;
+mod messages;
+
+use crate::network::messages::{ApprovalMsg, ProposalChunkMsg};
+use crate::{Approval, BlockChunk, View};
+use futures::Stream;
+use nomos_network::backends::NetworkBackend;
+use nomos_network::NetworkService;
+use overwatch_rs::services::relay::OutboundRelay;
+use overwatch_rs::services::ServiceData;
+
+#[async_trait::async_trait]
+pub trait NetworkAdapter {
+    type Backend: NetworkBackend + Send + Sync + 'static;
+    async fn new(
+        network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
+    ) -> Self;
+    async fn proposal_chunks_stream(&self) -> Box<dyn Stream<Item = BlockChunk> + Send>;
+    async fn broadcast_block_chunk(&self, view: View, chunk_msg: ProposalChunkMsg);
+    async fn approvals_stream(&self) -> Box<dyn Stream<Item = Approval> + Send>;
+    async fn forward_approval(&self, approval: ApprovalMsg);
+}
diff --git a/nomos-services/consensus/src/overlay/committees.rs b/nomos-services/consensus/src/overlay/committees.rs
new file mode 100644
index 00000000..8b933f0e
--- /dev/null
+++ b/nomos-services/consensus/src/overlay/committees.rs
@@ -0,0 +1,119 @@
+use super::*;
+use crate::network::NetworkAdapter;
+use rand::{seq::SliceRandom, SeedableRng};
+
+/// View of the tree overlay centered around a specific member
+pub struct Member<'view, const C: usize> {
+    id: NodeId,
+    committee: Committee,
+    committees: Committees<'view, C>,
+}
+
+/// Just a newtype index to be able to implement parent/children methods
+#[derive(Copy, Clone)]
+pub struct Committee(usize);
+
+pub struct Committees<'view, const C: usize> {
+    view: &'view View,
+    nodes: Box<[NodeId]>,
+}
+
+impl<'view, const C: usize> Committees<'view, C> {
+    pub fn new(view: &'view View) -> Self {
+        let mut nodes = view.staking_keys.keys().cloned().collect::<Box<[NodeId]>>();
+        let mut rng = rand_chacha::ChaCha20Rng::from_seed(view.seed);
+        nodes.shuffle(&mut rng);
+        Self { nodes, view }
+    }
+
+    pub fn into_member(self, id: NodeId) -> Option<Member<'view, C>> {
+        let member_idx = self.nodes.iter().position(|m| m == &id)?;
+        Some(Member {
+            committee: Committee(member_idx / C),
+            committees: self,
+            id,
+        })
+    }
+
+    fn get_committee_members(&self, committee: Committee) -> Option<&[NodeId]> {
+        let leftb = committee.0 * C;
+        let rightb = std::cmp::min(self.nodes.len(), leftb + C);
+
+        if leftb < rightb {
+            Some(&self.nodes[leftb..rightb])
+        } else {
+            None
+        }
+    }
+}
+
+impl Committee {
+    /// Return the left and right children committee, if any
+    pub fn children(&self) -> (Committee, Committee) {
+        (
+            // left child
+            Committee(self.0 * 2 + 1),
+            // right child
+            Committee(self.0 * 2 + 2),
+        )
+    }
+
+    /// Return the parent committee, if any
+    pub fn parent(&self) -> Option<Committee> {
+        if self.0 == 0 {
+            None
+        } else {
+            Some(Committee((self.0 - 1) / 2))
+        }
+    }
+}
+
+impl<'view, const C: usize> Member<'view, C> {
+    /// Return other members of this committee
+    pub fn peers(&self) -> &[NodeId] {
+        self.committees
+            .get_committee_members(self.committee)
+            .unwrap()
+    }
+
+    /// Return the participant in the parent committee this member should interact
+    /// with
+    pub fn parent_committee(&self) -> Option<Committee> {
+        self.committee.parent()
+    }
+
+    /// Return participants in the children committees this member should interact with
+    pub fn children_committees(&self) -> (Committee, Committee) {
+        self.committee.children()
+    }
+}
+
+#[async_trait::async_trait]
+impl<'view, Network: NetworkAdapter + Send + Sync, const C: usize> Overlay<'view, Network>
+    for Member<'view, C>
+{
+    fn new(view: &'view View, node: NodeId) -> Self {
+        let committees = Committees::new(view);
+        committees.into_member(node).unwrap()
+    }
+
+    async fn reconstruct_proposal_block(&self, adapter: &Network) -> Block {
+        todo!()
+    }
+
+    async fn broadcast_block(&self, _block: Block, adapter: &Network) {
+        todo!()
+    }
+
+    async fn collect_approvals(
+        &self,
+        _block: Block,
+        adapter: &Network,
+    ) -> tokio::sync::mpsc::Receiver<Approval> {
+        todo!()
+    }
+
+    async fn forward_approval(&self, _approval: Approval, adapter: &Network) {
+        todo!()
+    }
+}
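Aside: the committee indexing above is the standard array layout of a binary tree: committee i has children 2i + 1 and 2i + 2, and every committee except the root has parent (i - 1) / 2. A self-contained sketch of that arithmetic (not part of this diff; plain usize indices stand in for the Committee newtype):

    // Children of committee `i` in the array-backed binary tree.
    fn children(i: usize) -> (usize, usize) {
        (2 * i + 1, 2 * i + 2)
    }

    // Parent of committee `i`; the root committee (index 0) has none.
    fn parent(i: usize) -> Option<usize> {
        (i != 0).then(|| (i - 1) / 2)
    }

    fn main() {
        assert_eq!(children(0), (1, 2));
        assert_eq!(children(1), (3, 4));
        assert_eq!(parent(4), Some(1));
        assert_eq!(parent(0), None);
    }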
diff --git a/nomos-services/consensus/src/overlay/mod.rs b/nomos-services/consensus/src/overlay/mod.rs
new file mode 100644
index 00000000..6a45c8b3
--- /dev/null
+++ b/nomos-services/consensus/src/overlay/mod.rs
@@ -0,0 +1,22 @@
+use super::{Approval, Block, NodeId, View};
+
+#[allow(unused)]
+mod committees;
+
+use crate::network::NetworkAdapter;
+pub use committees::Member;
+
+// Dissemination overlay, tied to a specific view
+#[async_trait::async_trait]
+pub trait Overlay<'view, Network: NetworkAdapter> {
+    fn new(view: &'view View, node: NodeId) -> Self;
+
+    async fn reconstruct_proposal_block(&self, adapter: &Network) -> Block;
+    async fn broadcast_block(&self, block: Block, adapter: &Network);
+    async fn collect_approvals(
+        &self,
+        block: Block,
+        adapter: &Network,
+    ) -> tokio::sync::mpsc::Receiver<Approval>;
+    async fn forward_approval(&self, approval: Approval, adapter: &Network);
+}
diff --git a/nomos-services/log/Cargo.toml b/nomos-services/log/Cargo.toml
index bb192ea0..feb91f08 100644
--- a/nomos-services/log/Cargo.toml
+++ b/nomos-services/log/Cargo.toml
@@ -8,7 +8,7 @@ edition = "2021"
 [dependencies]
 async-trait = "0.1"
 overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
-serde = "1.0"
+serde = { version = "1.0", features = ["derive"] }
 tracing = "0.1"
 tracing-appender = "0.2"
 tracing-subscriber = { version = "0.3", features = ["json"] }
diff --git a/nomos-services/network/Cargo.toml b/nomos-services/network/Cargo.toml
index d1f361ba..ed8f0072 100644
--- a/nomos-services/network/Cargo.toml
+++ b/nomos-services/network/Cargo.toml
@@ -22,4 +22,5 @@ tracing-gelf = "0.7"
 futures = "0.3"
 
 [features]
-default = ["waku-bindings"]
+default = []
+waku = ["waku-bindings"]
diff --git a/nomos-services/network/src/backends/mod.rs b/nomos-services/network/src/backends/mod.rs
index 4eec4ddf..86ce8e41 100644
--- a/nomos-services/network/src/backends/mod.rs
+++ b/nomos-services/network/src/backends/mod.rs
@@ -3,9 +3,7 @@
 use overwatch_rs::services::state::ServiceState;
 use tokio::sync::broadcast::Receiver;
 
 #[cfg(feature = "waku")]
-mod waku;
-#[cfg(feature = "waku")]
-pub use self::waku::Waku;
+pub mod waku;
 #[async_trait::async_trait]
 pub trait NetworkBackend {
diff --git a/nomos-services/network/src/backends/waku.rs b/nomos-services/network/src/backends/waku.rs
index 5f31d555..106b0c46 100644
--- a/nomos-services/network/src/backends/waku.rs
+++ b/nomos-services/network/src/backends/waku.rs
@@ -1,5 +1,4 @@
 use super::*;
-use ::waku_bindings::*;
 use overwatch_rs::services::state::NoState;
 use serde::{Deserialize, Serialize};
 use tokio::sync::{
@@ -7,6 +6,7 @@ use tokio::sync::{
     oneshot,
 };
 use tracing::{debug, error};
+use waku_bindings::*;
 
 const BROADCAST_CHANNEL_BUF: usize = 16;
 
@@ -96,15 +96,6 @@ impl NetworkBackend for Waku {
         }
     }
 
-    async fn subscribe(&mut self, kind: Self::EventKind) -> Receiver<Self::NetworkEvent> {
-        match kind {
-            EventKind::Message => {
-                debug!("processed subscription to incoming messages");
-                self.message_event.subscribe()
-            }
-        }
-    }
-
     async fn process(&self, msg: Self::Message) {
         match msg {
             WakuBackendMessage::Broadcast { message, topic } => {
@@ -172,4 +163,13 @@
             },
         };
     }
+
+    async fn subscribe(&mut self, kind: Self::EventKind) -> Receiver<Self::NetworkEvent> {
+        match kind {
+            EventKind::Message => {
+                debug!("processed subscription to incoming messages");
+                self.message_event.subscribe()
+            }
+        }
+    }
 }
diff --git a/nomos-services/storage/src/backends/mod.rs b/nomos-services/storage/src/backends/mod.rs
index 4cc92bc5..ac640857 100644
--- a/nomos-services/storage/src/backends/mod.rs
+++ b/nomos-services/storage/src/backends/mod.rs
@@ -52,7 +52,7 @@ pub trait StorageBackend: Sized {
 
 #[cfg(test)]
 pub mod testing {
-    use crate::backends::StorageSerde;
+    use super::StorageSerde;
     use bytes::Bytes;
    use serde::de::DeserializeOwned;
    use serde::Serialize;