Remove unused fountain codes (#407)

* Remove unused fountain codes

* Refactor ProposalChunk to Proposal
This commit is contained in:
Daniel Sanchez 2023-09-15 09:39:53 +02:00 committed by GitHub
parent e1e2b84921
commit 04d07038dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 46 additions and 278 deletions

View File

@ -11,7 +11,6 @@ use nomos_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2p
#[cfg(feature = "waku")] #[cfg(feature = "waku")]
use nomos_consensus::network::adapters::waku::WakuAdapter as ConsensusWakuAdapter; use nomos_consensus::network::adapters::waku::WakuAdapter as ConsensusWakuAdapter;
use nomos_consensus::CarnotConsensus; use nomos_consensus::CarnotConsensus;
use nomos_core::fountain::mock::MockFountain;
use nomos_http::backends::axum::AxumBackend; use nomos_http::backends::axum::AxumBackend;
use nomos_http::bridge::HttpBridgeService; use nomos_http::bridge::HttpBridgeService;
use nomos_http::http::HttpService; use nomos_http::http::HttpService;
@ -41,7 +40,6 @@ pub type Carnot = CarnotConsensus<
ConsensusWakuAdapter, ConsensusWakuAdapter,
MockPool<Tx>, MockPool<Tx>,
MempoolWakuAdapter<Tx>, MempoolWakuAdapter<Tx>,
MockFountain,
FlatOverlay<RoundRobin, RandomBeaconState>, FlatOverlay<RoundRobin, RandomBeaconState>,
Blob, Blob,
>; >;
@ -51,7 +49,6 @@ pub type Carnot = CarnotConsensus<
ConsensusLibp2pAdapter, ConsensusLibp2pAdapter,
MockPool<Tx>, MockPool<Tx>,
MempoolLibp2pAdapter<Tx>, MempoolLibp2pAdapter<Tx>,
MockFountain,
FlatOverlay<RoundRobin, RandomBeaconState>, FlatOverlay<RoundRobin, RandomBeaconState>,
Blob, Blob,
>; >;

View File

@ -1,39 +0,0 @@
// std
// crates
use async_trait::async_trait;
use bytes::Bytes;
use futures::{Stream, StreamExt};
// internal
use crate::fountain::{FountainCode, FountainError};
/// Fountain code that does no protocol at all.
/// Just bypasses the raw bytes into a single chunk and reconstruct from it.
#[derive(Debug, Clone)]
pub struct MockFountain;
#[async_trait]
impl FountainCode for MockFountain {
type Settings = ();
fn new(_: Self::Settings) -> Self {
Self
}
fn encode(&self, block: &[u8]) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin> {
let data = block.to_vec();
Box::new(futures::stream::once(Box::pin(
async move { Bytes::from(data) },
)))
}
async fn decode(
&self,
mut stream: impl Stream<Item = Bytes> + Send + Sync + Unpin,
) -> Result<Bytes, FountainError> {
if let Some(chunk) = stream.next().await {
Ok(chunk)
} else {
Err("Stream ended before decoding was complete".into())
}
}
}

View File

@ -1,43 +0,0 @@
#[cfg(feature = "mock")]
pub mod mock;
#[cfg(feature = "raptor")]
pub mod raptorq;
// std
use std::error::Error;
// crates
use async_trait;
use bytes::Bytes;
use futures::Stream;
use thiserror::Error;
// internal
/// FountainCode trait main error type
/// Wrapper around generic whichever error type the fountain code implementation uses
#[derive(Error, Debug)]
#[error(transparent)]
pub struct FountainError(#[from] Box<dyn Error + Send + Sync>);
impl From<&str> for FountainError {
fn from(value: &str) -> Self {
FountainError(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
value,
)))
}
}
/// [FountainCode](https://en.wikipedia.org/wiki/Fountain_code)
/// Chop a block of data into chunks and reassembling trait
#[async_trait::async_trait]
pub trait FountainCode {
type Settings: Clone + Send + Sync + 'static;
fn new(settings: Self::Settings) -> Self;
/// Encode a block of data into a stream of chunks
fn encode(&self, block: &[u8]) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin>;
/// Decode a stream of chunks into a block of data
async fn decode(
&self,
stream: impl Stream<Item = Bytes> + Send + Sync + Unpin,
) -> Result<Bytes, FountainError>;
}

View File

@ -1,122 +0,0 @@
// std
// crates
use bytes::Bytes;
use futures::{Stream, StreamExt};
use raptorq::{Decoder, Encoder, EncodingPacket, ObjectTransmissionInformation};
// internal
use crate::fountain::{FountainCode, FountainError};
/// [RaptorQ](https://en.wikipedia.org/wiki/Raptor_code#RaptorQ_code) implementation of [`FountainCode`] trait
pub struct RaptorQFountain {
settings: RaptorQSettings,
}
/// Settings for [`RaptorQFountain`] code
#[derive(Clone, Debug)]
pub struct RaptorQSettings {
pub transmission_information: ObjectTransmissionInformation,
pub repair_packets_per_block: u32,
}
/// RaptorQ implementation of [`FountainCode`] trait
/// Wrapper around the [`raptorq`](https://crates.io/crates/raptorq) crate
#[async_trait::async_trait]
impl FountainCode for RaptorQFountain {
type Settings = RaptorQSettings;
fn new(settings: Self::Settings) -> Self {
Self { settings }
}
fn encode(&self, block: &[u8]) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin> {
let encoder = Encoder::new(block, self.settings.transmission_information);
Box::new(futures::stream::iter(
encoder
.get_encoded_packets(self.settings.repair_packets_per_block)
.into_iter()
.map(|packet| packet.serialize().into()),
))
}
async fn decode(
&self,
mut stream: impl Stream<Item = Bytes> + Send + Sync + Unpin,
) -> Result<Bytes, FountainError> {
let mut decoder = Decoder::new(self.settings.transmission_information);
while let Some(chunk) = stream.next().await {
let packet = EncodingPacket::deserialize(&chunk);
if let Some(result) = decoder.decode(packet) {
return Ok(Bytes::from(result));
}
}
Err("Stream ended before decoding was complete".into())
}
}
#[cfg(test)]
mod test {
use crate::fountain::raptorq::RaptorQFountain;
use crate::fountain::{FountainCode, FountainError};
use bytes::Bytes;
use futures::StreamExt;
use rand::{RngCore, SeedableRng};
// completely random seed
const SEED: u64 = 1889;
#[tokio::test]
async fn random_encode_decode() -> Result<(), FountainError> {
const TRANSFER_LENGTH: usize = 1024;
// build settings
let settings = super::RaptorQSettings {
transmission_information: raptorq::ObjectTransmissionInformation::with_defaults(
TRANSFER_LENGTH as u64,
1000,
),
repair_packets_per_block: 10,
};
let raptor = RaptorQFountain::new(settings);
// create random payload
let mut payload = [0u8; TRANSFER_LENGTH];
let mut r = rand::prelude::StdRng::seed_from_u64(SEED);
r.fill_bytes(&mut payload);
let payload = Bytes::from(payload.to_vec());
// encode payload
let encoded = raptor.encode(&payload);
// reconstruct
let decoded = raptor.decode(encoded).await?;
assert_eq!(decoded, payload);
Ok(())
}
#[tokio::test]
async fn random_encode_decode_fails() {
const TRANSFER_LENGTH: usize = 1024;
// build settings
let settings = super::RaptorQSettings {
transmission_information: raptorq::ObjectTransmissionInformation::with_defaults(
TRANSFER_LENGTH as u64,
1000,
),
repair_packets_per_block: 10,
};
// create random payload
let mut payload = [0u8; TRANSFER_LENGTH];
let mut r = rand::prelude::StdRng::seed_from_u64(SEED);
r.fill_bytes(&mut payload);
let payload = Bytes::from(payload.to_vec());
let raptor = RaptorQFountain::new(settings);
// encode payload
let encoded = raptor.encode(&payload);
// reconstruct skipping packets, must fail
let decoded = raptor.decode(encoded.skip(400)).await;
assert!(decoded.is_err());
}
}

View File

@ -2,7 +2,6 @@ pub mod account;
pub mod block; pub mod block;
pub mod crypto; pub mod crypto;
pub mod da; pub mod da;
pub mod fountain;
pub mod staking; pub mod staking;
pub mod tx; pub mod tx;
pub mod utils; pub mod utils;

View File

@ -22,7 +22,7 @@ use tokio::sync::oneshot::Sender;
use tracing::instrument; use tracing::instrument;
// internal // internal
use crate::network::messages::{ use crate::network::messages::{
NetworkMessage, NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg, NetworkMessage, NewViewMsg, ProposalMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg,
}; };
use crate::network::NetworkAdapter; use crate::network::NetworkAdapter;
use crate::tally::{ use crate::tally::{
@ -36,7 +36,6 @@ use task_manager::TaskManager;
use crate::committee_membership::UpdateableCommitteeMembership; use crate::committee_membership::UpdateableCommitteeMembership;
use nomos_core::block::Block; use nomos_core::block::Block;
use nomos_core::fountain::FountainCode;
use nomos_core::tx::Transaction; use nomos_core::tx::Transaction;
use nomos_core::vote::Tally; use nomos_core::vote::Tally;
use nomos_mempool::{ use nomos_mempool::{
@ -60,45 +59,40 @@ fn default_timeout() -> Duration {
pub type Seed = [u8; 32]; pub type Seed = [u8; 32];
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
pub struct CarnotSettings<Fountain: FountainCode, O: Overlay> { pub struct CarnotSettings<O: Overlay> {
pub private_key: [u8; 32], pub private_key: [u8; 32],
pub fountain_settings: Fountain::Settings,
pub overlay_settings: O::Settings, pub overlay_settings: O::Settings,
#[serde(default = "default_timeout")] #[serde(default = "default_timeout")]
pub timeout: Duration, pub timeout: Duration,
} }
impl<Fountain: FountainCode, O: Overlay> Clone for CarnotSettings<Fountain, O> { impl<O: Overlay> Clone for CarnotSettings<O> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
private_key: self.private_key, private_key: self.private_key,
fountain_settings: self.fountain_settings.clone(),
overlay_settings: self.overlay_settings.clone(), overlay_settings: self.overlay_settings.clone(),
timeout: self.timeout, timeout: self.timeout,
} }
} }
} }
impl<Fountain: FountainCode, O: Overlay> CarnotSettings<Fountain, O> { impl<O: Overlay> CarnotSettings<O> {
#[inline] #[inline]
pub const fn new( pub const fn new(
private_key: [u8; 32], private_key: [u8; 32],
fountain_settings: Fountain::Settings,
overlay_settings: O::Settings, overlay_settings: O::Settings,
timeout: Duration, timeout: Duration,
) -> Self { ) -> Self {
Self { Self {
private_key, private_key,
fountain_settings,
overlay_settings, overlay_settings,
timeout, timeout,
} }
} }
} }
pub struct CarnotConsensus<A, P, M, F, O, B> pub struct CarnotConsensus<A, P, M, O, B>
where where
F: FountainCode,
A: NetworkAdapter, A: NetworkAdapter,
M: MempoolAdapter<Tx = P::Tx>, M: MempoolAdapter<Tx = P::Tx>,
P: MemPool, P: MemPool,
@ -112,15 +106,13 @@ where
// when implementing ServiceCore for CarnotConsensus // when implementing ServiceCore for CarnotConsensus
network_relay: Relay<NetworkService<A::Backend>>, network_relay: Relay<NetworkService<A::Backend>>,
mempool_relay: Relay<MempoolService<M, P>>, mempool_relay: Relay<MempoolService<M, P>>,
_fountain: std::marker::PhantomData<F>,
_overlay: std::marker::PhantomData<O>, _overlay: std::marker::PhantomData<O>,
// this need to be substituted by some kind DA bo // this need to be substituted by some kind DA bo
_blob: std::marker::PhantomData<B>, _blob: std::marker::PhantomData<B>,
} }
impl<A, P, M, F, O, B> ServiceData for CarnotConsensus<A, P, M, F, O, B> impl<A, P, M, O, B> ServiceData for CarnotConsensus<A, P, M, O, B>
where where
F: FountainCode,
A: NetworkAdapter, A: NetworkAdapter,
P: MemPool, P: MemPool,
P::Tx: Transaction + Debug, P::Tx: Transaction + Debug,
@ -129,16 +121,15 @@ where
O: Overlay + Debug, O: Overlay + Debug,
{ {
const SERVICE_ID: ServiceId = "Carnot"; const SERVICE_ID: ServiceId = "Carnot";
type Settings = CarnotSettings<F, O>; type Settings = CarnotSettings<O>;
type State = NoState<Self::Settings>; type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>; type StateOperator = NoOperator<Self::State>;
type Message = ConsensusMsg; type Message = ConsensusMsg;
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl<A, P, M, F, O, B> ServiceCore for CarnotConsensus<A, P, M, F, O, B> impl<A, P, M, O, B> ServiceCore for CarnotConsensus<A, P, M, O, B>
where where
F: FountainCode + Clone + Send + Sync + 'static,
A: NetworkAdapter + Clone + Send + Sync + 'static, A: NetworkAdapter + Clone + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static, P: MemPool + Send + Sync + 'static,
P::Settings: Send + Sync + 'static, P::Settings: Send + Sync + 'static,
@ -157,7 +148,6 @@ where
Ok(Self { Ok(Self {
service_state, service_state,
network_relay, network_relay,
_fountain: Default::default(),
_overlay: Default::default(), _overlay: Default::default(),
_blob: Default::default(), _blob: Default::default(),
mempool_relay, mempool_relay,
@ -179,7 +169,6 @@ where
let CarnotSettings { let CarnotSettings {
private_key, private_key,
fountain_settings,
overlay_settings, overlay_settings,
timeout, timeout,
} = self.service_state.settings_reader.get_updated_settings(); } = self.service_state.settings_reader.get_updated_settings();
@ -195,7 +184,6 @@ where
}; };
let mut carnot = Carnot::from_genesis(NodeId::new(private_key), genesis, overlay); let mut carnot = Carnot::from_genesis(NodeId::new(private_key), genesis, overlay);
let adapter = A::new(network_relay).await; let adapter = A::new(network_relay).await;
let fountain = F::new(fountain_settings);
let private_key = PrivateKey::new(private_key); let private_key = PrivateKey::new(private_key);
let self_committee = carnot.self_committee(); let self_committee = carnot.self_committee();
let leader_committee = [carnot.id()].into_iter().collect::<Committee>(); let leader_committee = [carnot.id()].into_iter().collect::<Committee>();
@ -257,7 +245,6 @@ where
adapter.clone(), adapter.clone(),
private_key, private_key,
mempool_relay.clone(), mempool_relay.clone(),
&fountain,
timeout, timeout,
) )
.await .await
@ -278,9 +265,8 @@ enum Output<Tx: Clone + Eq + Hash, Blob: Clone + Eq + Hash> {
BroadcastProposal { proposal: Block<Tx, Blob> }, BroadcastProposal { proposal: Block<Tx, Blob> },
} }
impl<A, P, M, F, O, B> CarnotConsensus<A, P, M, F, O, B> impl<A, P, M, O, B> CarnotConsensus<A, P, M, O, B>
where where
F: FountainCode + Clone + Send + Sync + 'static,
A: NetworkAdapter + Clone + Send + Sync + 'static, A: NetworkAdapter + Clone + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static, P: MemPool + Send + Sync + 'static,
P::Settings: Send + Sync + 'static, P::Settings: Send + Sync + 'static,
@ -320,7 +306,6 @@ where
adapter: A, adapter: A,
private_key: PrivateKey, private_key: PrivateKey,
mempool_relay: OutboundRelay<MempoolMsg<P::Tx>>, mempool_relay: OutboundRelay<MempoolMsg<P::Tx>>,
fountain: &F,
timeout: Duration, timeout: Duration,
) -> Carnot<O> { ) -> Carnot<O> {
let mut output = None; let mut output = None;
@ -387,7 +372,7 @@ where
} }
if let Some(output) = output { if let Some(output) = output {
handle_output(&adapter, fountain, carnot.id(), output).await; handle_output(&adapter, carnot.id(), output).await;
} }
carnot carnot
@ -719,7 +704,7 @@ where
.await .await
.filter_map(move |msg| { .filter_map(move |msg| {
async move { async move {
let proposal = Block::from_bytes(&msg.chunk); let proposal = Block::from_bytes(&msg.data);
if proposal.header().id == msg.proposal { if proposal.header().id == msg.proposal {
// TODO: Leader is faulty? what should we do? // TODO: Leader is faulty? what should we do?
Some(proposal) Some(proposal)
@ -775,14 +760,9 @@ where
} }
} }
async fn handle_output<A, F, Tx, B>( async fn handle_output<A, Tx, B>(adapter: &A, node_id: NodeId, output: Output<Tx, B>)
adapter: &A, where
fountain: &F,
node_id: NodeId,
output: Output<Tx, B>,
) where
A: NetworkAdapter, A: NetworkAdapter,
F: FountainCode,
Tx: Hash + Eq + Clone + Serialize + DeserializeOwned + Debug, Tx: Hash + Eq + Clone + Serialize + DeserializeOwned + Debug,
B: Clone + Eq + Hash + Serialize + DeserializeOwned, B: Clone + Eq + Hash + Serialize + DeserializeOwned,
{ {
@ -824,15 +804,12 @@ async fn handle_output<A, F, Tx, B>(
} }
}, },
Output::BroadcastProposal { proposal } => { Output::BroadcastProposal { proposal } => {
fountain adapter
.encode(&proposal.as_bytes()) .broadcast(NetworkMessage::Proposal(ProposalMsg {
.for_each(|chunk| {
adapter.broadcast(NetworkMessage::ProposalChunk(ProposalChunkMsg {
proposal: proposal.header().id, proposal: proposal.header().id,
chunk: chunk.to_vec().into_boxed_slice(), data: proposal.as_bytes().to_vec().into_boxed_slice(),
view: proposal.header().view, view: proposal.header().view,
})) }))
})
.await; .await;
} }
Output::BroadcastTimeoutQc { timeout_qc } => { Output::BroadcastTimeoutQc { timeout_qc } => {

View File

@ -10,7 +10,7 @@ use tokio_stream::wrappers::ReceiverStream;
// internal // internal
use crate::network::messages::{NewViewMsg, TimeoutMsg, TimeoutQcMsg}; use crate::network::messages::{NewViewMsg, TimeoutMsg, TimeoutQcMsg};
use crate::network::{ use crate::network::{
messages::{NetworkMessage, ProposalChunkMsg, VoteMsg}, messages::{NetworkMessage, ProposalMsg, VoteMsg},
BoxedStream, NetworkAdapter, BoxedStream, NetworkAdapter,
}; };
use consensus_engine::{BlockId, Committee, CommitteeId, View}; use consensus_engine::{BlockId, Committee, CommitteeId, View};
@ -93,7 +93,7 @@ impl<T> Spsc<T> {
#[derive(Default)] #[derive(Default)]
struct Messages { struct Messages {
proposal_chunks: Spsc<ProposalChunkMsg>, proposal_chunks: Spsc<ProposalMsg>,
votes: HashMap<CommitteeId, HashMap<BlockId, Spsc<VoteMsg>>>, votes: HashMap<CommitteeId, HashMap<BlockId, Spsc<VoteMsg>>>,
new_views: HashMap<CommitteeId, Spsc<NewViewMsg>>, new_views: HashMap<CommitteeId, Spsc<NewViewMsg>>,
timeouts: HashMap<CommitteeId, Spsc<TimeoutMsg>>, timeouts: HashMap<CommitteeId, Spsc<TimeoutMsg>>,
@ -130,7 +130,7 @@ impl MessageCache {
} }
// This will also advance the cache to use view - 1 as the current view // This will also advance the cache to use view - 1 as the current view
fn get_proposals(&self, view: View) -> Option<Receiver<ProposalChunkMsg>> { fn get_proposals(&self, view: View) -> Option<Receiver<ProposalMsg>> {
let mut cache = self.cache.lock().unwrap(); let mut cache = self.cache.lock().unwrap();
let res = cache let res = cache
.get_mut(&view) .get_mut(&view)
@ -255,7 +255,7 @@ impl NetworkAdapter for Libp2pAdapter {
Ok(Event::Message(message)) => { Ok(Event::Message(message)) => {
match nomos_core::wire::deserialize(&message.data) { match nomos_core::wire::deserialize(&message.data) {
Ok(GossipsubMessage { to, message }) => match message { Ok(GossipsubMessage { to, message }) => match message {
NetworkMessage::ProposalChunk(msg) => { NetworkMessage::Proposal(msg) => {
tracing::debug!("received proposal chunk"); tracing::debug!("received proposal chunk");
let mut cache = cache.cache.lock().unwrap(); let mut cache = cache.cache.lock().unwrap();
let view = msg.view; let view = msg.view;
@ -326,10 +326,10 @@ impl NetworkAdapter for Libp2pAdapter {
} }
} }
async fn proposal_chunks_stream(&self, view: View) -> BoxedStream<ProposalChunkMsg> { async fn proposal_chunks_stream(&self, view: View) -> BoxedStream<ProposalMsg> {
self.message_cache self.message_cache
.get_proposals(view) .get_proposals(view)
.map::<BoxedStream<ProposalChunkMsg>, _>(|stream| Box::new(ReceiverStream::new(stream))) .map::<BoxedStream<ProposalMsg>, _>(|stream| Box::new(ReceiverStream::new(stream)))
.unwrap_or_else(|| Box::new(tokio_stream::empty())) .unwrap_or_else(|| Box::new(tokio_stream::empty()))
} }

View File

@ -10,7 +10,7 @@ use tokio_stream::wrappers::BroadcastStream;
use crate::network::messages::{NetworkMessage, NewViewMsg, TimeoutMsg, TimeoutQcMsg}; use crate::network::messages::{NetworkMessage, NewViewMsg, TimeoutMsg, TimeoutQcMsg};
use crate::network::{ use crate::network::{
messages::{ProposalChunkMsg, VoteMsg}, messages::{ProposalMsg, VoteMsg},
BoxedStream, NetworkAdapter, BoxedStream, NetworkAdapter,
}; };
use consensus_engine::{BlockId, Committee, View}; use consensus_engine::{BlockId, Committee, View};
@ -57,7 +57,7 @@ impl NetworkAdapter for MockAdapter {
Self { network_relay } Self { network_relay }
} }
async fn proposal_chunks_stream(&self, _view: View) -> BoxedStream<ProposalChunkMsg> { async fn proposal_chunks_stream(&self, _view: View) -> BoxedStream<ProposalMsg> {
let stream_channel = self let stream_channel = self
.message_subscriber_channel() .message_subscriber_channel()
.await .await
@ -72,7 +72,7 @@ impl NetworkAdapter for MockAdapter {
== message.content_topic().content_topic_name == message.content_topic().content_topic_name
{ {
let payload = message.payload(); let payload = message.payload();
Some(ProposalChunkMsg::from_bytes(payload.as_bytes())) Some(ProposalMsg::from_bytes(payload.as_bytes()))
} else { } else {
None None
} }

View File

@ -6,7 +6,7 @@ use tokio_stream::wrappers::BroadcastStream;
// internal // internal
use crate::network::messages::{NetworkMessage, NewViewMsg, TimeoutMsg, TimeoutQcMsg}; use crate::network::messages::{NetworkMessage, NewViewMsg, TimeoutMsg, TimeoutQcMsg};
use crate::network::{ use crate::network::{
messages::{ProposalChunkMsg, VoteMsg}, messages::{ProposalMsg, VoteMsg},
BoxedStream, NetworkAdapter, BoxedStream, NetworkAdapter,
}; };
use consensus_engine::{BlockId, Committee, View}; use consensus_engine::{BlockId, Committee, View};
@ -161,13 +161,13 @@ impl NetworkAdapter for WakuAdapter {
Self { network_relay } Self { network_relay }
} }
async fn proposal_chunks_stream(&self, view: View) -> BoxedStream<ProposalChunkMsg> { async fn proposal_chunks_stream(&self, view: View) -> BoxedStream<ProposalMsg> {
Box::new(Box::pin( Box::new(Box::pin(
self.cached_stream_with_content_topic(create_topic(PROPOSAL_TAG, None)) self.cached_stream_with_content_topic(create_topic(PROPOSAL_TAG, None))
.await .await
.filter_map(move |message| { .filter_map(move |message| {
let payload = message.payload(); let payload = message.payload();
let proposal = ProposalChunkMsg::from_bytes(payload); let proposal = ProposalMsg::from_bytes(payload);
async move { async move {
if view == proposal.view { if view == proposal.view {
Some(proposal) Some(proposal)
@ -291,7 +291,7 @@ fn create_topic(tag: &str, committee: Option<&Committee>) -> WakuContentTopic {
fn unwrap_message_to_bytes(message: &NetworkMessage) -> Box<[u8]> { fn unwrap_message_to_bytes(message: &NetworkMessage) -> Box<[u8]> {
match message { match message {
NetworkMessage::NewView(msg) => msg.as_bytes(), NetworkMessage::NewView(msg) => msg.as_bytes(),
NetworkMessage::ProposalChunk(msg) => msg.as_bytes(), NetworkMessage::Proposal(msg) => msg.as_bytes(),
NetworkMessage::Vote(msg) => msg.as_bytes(), NetworkMessage::Vote(msg) => msg.as_bytes(),
NetworkMessage::Timeout(msg) => msg.as_bytes(), NetworkMessage::Timeout(msg) => msg.as_bytes(),
NetworkMessage::TimeoutQc(msg) => msg.as_bytes(), NetworkMessage::TimeoutQc(msg) => msg.as_bytes(),
@ -301,7 +301,7 @@ fn unwrap_message_to_bytes(message: &NetworkMessage) -> Box<[u8]> {
fn message_tag(message: &NetworkMessage) -> &str { fn message_tag(message: &NetworkMessage) -> &str {
match message { match message {
NetworkMessage::NewView(_) => NEW_VIEW_TAG, NetworkMessage::NewView(_) => NEW_VIEW_TAG,
NetworkMessage::ProposalChunk(_) => PROPOSAL_TAG, NetworkMessage::Proposal(_) => PROPOSAL_TAG,
NetworkMessage::Vote(_) => VOTE_TAG, NetworkMessage::Vote(_) => VOTE_TAG,
NetworkMessage::Timeout(_) => TIMEOUT_TAG, NetworkMessage::Timeout(_) => TIMEOUT_TAG,
NetworkMessage::TimeoutQc(_) => TIMEOUT_QC_TAG, NetworkMessage::TimeoutQc(_) => TIMEOUT_QC_TAG,

View File

@ -7,13 +7,13 @@ use consensus_engine::{BlockId, NewView, Qc, Timeout, TimeoutQc, View, Vote};
use nomos_core::wire; use nomos_core::wire;
#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
pub struct ProposalChunkMsg { pub struct ProposalMsg {
pub chunk: Box<[u8]>, pub data: Box<[u8]>,
pub proposal: BlockId, pub proposal: BlockId,
pub view: View, pub view: View,
} }
impl ProposalChunkMsg { impl ProposalMsg {
pub fn as_bytes(&self) -> Box<[u8]> { pub fn as_bytes(&self) -> Box<[u8]> {
wire::serialize(self).unwrap().into_boxed_slice() wire::serialize(self).unwrap().into_boxed_slice()
} }
@ -90,7 +90,7 @@ pub enum NetworkMessage {
TimeoutQc(TimeoutQcMsg), TimeoutQc(TimeoutQcMsg),
Vote(VoteMsg), Vote(VoteMsg),
NewView(NewViewMsg), NewView(NewViewMsg),
ProposalChunk(ProposalChunkMsg), Proposal(ProposalMsg),
} }
impl NetworkMessage { impl NetworkMessage {

View File

@ -6,7 +6,7 @@ pub mod messages;
use futures::Stream; use futures::Stream;
// internal // internal
use crate::network::messages::{ use crate::network::messages::{
NetworkMessage, NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg, NetworkMessage, NewViewMsg, ProposalMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg,
}; };
use consensus_engine::{BlockId, Committee, View}; use consensus_engine::{BlockId, Committee, View};
use nomos_network::backends::NetworkBackend; use nomos_network::backends::NetworkBackend;
@ -25,7 +25,7 @@ pub trait NetworkAdapter {
async fn proposal_chunks_stream( async fn proposal_chunks_stream(
&self, &self,
view: View, view: View,
) -> Box<dyn Stream<Item = ProposalChunkMsg> + Send + Sync + Unpin>; ) -> Box<dyn Stream<Item = ProposalMsg> + Send + Sync + Unpin>;
async fn broadcast(&self, message: NetworkMessage); async fn broadcast(&self, message: NetworkMessage);
async fn timeout_stream(&self, committee: &Committee, view: View) -> BoxedStream<TimeoutMsg>; async fn timeout_stream(&self, committee: &Committee, view: View) -> BoxedStream<TimeoutMsg>;
async fn timeout_qc_stream(&self, view: View) -> BoxedStream<TimeoutQcMsg>; async fn timeout_qc_stream(&self, view: View) -> BoxedStream<TimeoutQcMsg>;

View File

@ -93,7 +93,7 @@ impl EventBuilder {
for message in messages { for message in messages {
match message { match message {
CarnotMessage::Proposal(msg) => { CarnotMessage::Proposal(msg) => {
let block = Block::from_bytes(&msg.chunk); let block = Block::from_bytes(&msg.data);
tracing::info!( tracing::info!(
node=%self.id, node=%self.id,
current_view = %engine.current_view(), current_view = %engine.current_view(),

View File

@ -1,13 +1,13 @@
use consensus_engine::View; use consensus_engine::View;
use nomos_consensus::network::messages::{ use nomos_consensus::network::messages::{
NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg, NewViewMsg, ProposalMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg,
}; };
use crate::network::PayloadSize; use crate::network::PayloadSize;
#[derive(Debug, Eq, PartialEq, Hash, Clone)] #[derive(Debug, Eq, PartialEq, Hash, Clone)]
pub enum CarnotMessage { pub enum CarnotMessage {
Proposal(ProposalChunkMsg), Proposal(ProposalMsg),
Vote(VoteMsg), Vote(VoteMsg),
TimeoutQc(TimeoutQcMsg), TimeoutQc(TimeoutQcMsg),
Timeout(TimeoutMsg), Timeout(TimeoutMsg),
@ -30,7 +30,7 @@ impl PayloadSize for CarnotMessage {
fn size_bytes(&self) -> u32 { fn size_bytes(&self) -> u32 {
match self { match self {
CarnotMessage::Proposal(p) => { CarnotMessage::Proposal(p) => {
(std::mem::size_of::<ProposalChunkMsg>() + p.chunk.len()) as u32 (std::mem::size_of::<ProposalMsg>() + p.data.len()) as u32
} }
CarnotMessage::Vote(_) => std::mem::size_of::<VoteMsg>() as u32, CarnotMessage::Vote(_) => std::mem::size_of::<VoteMsg>() as u32,
CarnotMessage::TimeoutQc(_) => std::mem::size_of::<TimeoutQcMsg>() as u32, CarnotMessage::TimeoutQc(_) => std::mem::size_of::<TimeoutQcMsg>() as u32,

View File

@ -34,7 +34,7 @@ use consensus_engine::{
Block, BlockId, Carnot, Committee, Overlay, Payload, Qc, StandardQc, TimeoutQc, View, Vote, Block, BlockId, Carnot, Committee, Overlay, Payload, Qc, StandardQc, TimeoutQc, View, Vote,
}; };
use nomos_consensus::committee_membership::UpdateableCommitteeMembership; use nomos_consensus::committee_membership::UpdateableCommitteeMembership;
use nomos_consensus::network::messages::{ProposalChunkMsg, TimeoutQcMsg}; use nomos_consensus::network::messages::{ProposalMsg, TimeoutQcMsg};
use nomos_consensus::{ use nomos_consensus::{
leader_selection::UpdateableLeaderSelection, leader_selection::UpdateableLeaderSelection,
network::messages::{NewViewMsg, TimeoutMsg, VoteMsg}, network::messages::{NewViewMsg, TimeoutMsg, VoteMsg},
@ -177,8 +177,8 @@ impl<
} }
Output::BroadcastProposal { proposal } => { Output::BroadcastProposal { proposal } => {
self.network_interface self.network_interface
.broadcast(CarnotMessage::Proposal(ProposalChunkMsg { .broadcast(CarnotMessage::Proposal(ProposalMsg {
chunk: proposal.as_bytes().to_vec().into(), data: proposal.as_bytes().to_vec().into(),
proposal: proposal.header().id, proposal: proposal.header().id,
view: proposal.header().view, view: proposal.header().view,
})) }))

View File

@ -271,7 +271,6 @@ fn create_node_config(
}, },
consensus: CarnotSettings { consensus: CarnotSettings {
private_key, private_key,
fountain_settings: (),
overlay_settings: FlatOverlaySettings { overlay_settings: FlatOverlaySettings {
nodes, nodes,
leader: RoundRobin::new(), leader: RoundRobin::new(),