From 04d07038dc2f622d1a36bc317a47fc5d2416b443 Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Fri, 15 Sep 2023 09:39:53 +0200 Subject: [PATCH] Remove unused fountain codes (#407) * Remove unused fountain codes * Refactor ProposalChunk to Proposal --- nodes/nomos-node/src/lib.rs | 3 - nomos-core/src/fountain/mock.rs | 39 ------ nomos-core/src/fountain/mod.rs | 43 ------ nomos-core/src/fountain/raptorq.rs | 122 ------------------ nomos-core/src/lib.rs | 1 - nomos-services/consensus/src/lib.rs | 61 +++------ .../consensus/src/network/adapters/libp2p.rs | 12 +- .../consensus/src/network/adapters/mock.rs | 6 +- .../consensus/src/network/adapters/waku.rs | 10 +- .../consensus/src/network/messages.rs | 8 +- nomos-services/consensus/src/network/mod.rs | 4 +- simulations/src/node/carnot/event_builder.rs | 2 +- simulations/src/node/carnot/messages.rs | 6 +- simulations/src/node/carnot/mod.rs | 6 +- tests/src/nodes/nomos.rs | 1 - 15 files changed, 46 insertions(+), 278 deletions(-) delete mode 100644 nomos-core/src/fountain/mock.rs delete mode 100644 nomos-core/src/fountain/mod.rs delete mode 100644 nomos-core/src/fountain/raptorq.rs diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index 184ab975..2a886b43 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -11,7 +11,6 @@ use nomos_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2p #[cfg(feature = "waku")] use nomos_consensus::network::adapters::waku::WakuAdapter as ConsensusWakuAdapter; use nomos_consensus::CarnotConsensus; -use nomos_core::fountain::mock::MockFountain; use nomos_http::backends::axum::AxumBackend; use nomos_http::bridge::HttpBridgeService; use nomos_http::http::HttpService; @@ -41,7 +40,6 @@ pub type Carnot = CarnotConsensus< ConsensusWakuAdapter, MockPool, MempoolWakuAdapter, - MockFountain, FlatOverlay, Blob, >; @@ -51,7 +49,6 @@ pub type Carnot = CarnotConsensus< ConsensusLibp2pAdapter, MockPool, MempoolLibp2pAdapter, - MockFountain, FlatOverlay, Blob, >; diff --git a/nomos-core/src/fountain/mock.rs b/nomos-core/src/fountain/mock.rs deleted file mode 100644 index 5017168d..00000000 --- a/nomos-core/src/fountain/mock.rs +++ /dev/null @@ -1,39 +0,0 @@ -// std -// crates -use async_trait::async_trait; -use bytes::Bytes; -use futures::{Stream, StreamExt}; -// internal -use crate::fountain::{FountainCode, FountainError}; - -/// Fountain code that does no protocol at all. -/// Just bypasses the raw bytes into a single chunk and reconstruct from it. -#[derive(Debug, Clone)] -pub struct MockFountain; - -#[async_trait] -impl FountainCode for MockFountain { - type Settings = (); - - fn new(_: Self::Settings) -> Self { - Self - } - - fn encode(&self, block: &[u8]) -> Box + Send + Sync + Unpin> { - let data = block.to_vec(); - Box::new(futures::stream::once(Box::pin( - async move { Bytes::from(data) }, - ))) - } - - async fn decode( - &self, - mut stream: impl Stream + Send + Sync + Unpin, - ) -> Result { - if let Some(chunk) = stream.next().await { - Ok(chunk) - } else { - Err("Stream ended before decoding was complete".into()) - } - } -} diff --git a/nomos-core/src/fountain/mod.rs b/nomos-core/src/fountain/mod.rs deleted file mode 100644 index c1720e9f..00000000 --- a/nomos-core/src/fountain/mod.rs +++ /dev/null @@ -1,43 +0,0 @@ -#[cfg(feature = "mock")] -pub mod mock; -#[cfg(feature = "raptor")] -pub mod raptorq; - -// std -use std::error::Error; -// crates -use async_trait; -use bytes::Bytes; -use futures::Stream; -use thiserror::Error; -// internal - -/// FountainCode trait main error type -/// Wrapper around generic whichever error type the fountain code implementation uses -#[derive(Error, Debug)] -#[error(transparent)] -pub struct FountainError(#[from] Box); - -impl From<&str> for FountainError { - fn from(value: &str) -> Self { - FountainError(Box::new(std::io::Error::new( - std::io::ErrorKind::Other, - value, - ))) - } -} - -/// [FountainCode](https://en.wikipedia.org/wiki/Fountain_code) -/// Chop a block of data into chunks and reassembling trait -#[async_trait::async_trait] -pub trait FountainCode { - type Settings: Clone + Send + Sync + 'static; - fn new(settings: Self::Settings) -> Self; - /// Encode a block of data into a stream of chunks - fn encode(&self, block: &[u8]) -> Box + Send + Sync + Unpin>; - /// Decode a stream of chunks into a block of data - async fn decode( - &self, - stream: impl Stream + Send + Sync + Unpin, - ) -> Result; -} diff --git a/nomos-core/src/fountain/raptorq.rs b/nomos-core/src/fountain/raptorq.rs deleted file mode 100644 index 9226b905..00000000 --- a/nomos-core/src/fountain/raptorq.rs +++ /dev/null @@ -1,122 +0,0 @@ -// std -// crates -use bytes::Bytes; -use futures::{Stream, StreamExt}; -use raptorq::{Decoder, Encoder, EncodingPacket, ObjectTransmissionInformation}; -// internal -use crate::fountain::{FountainCode, FountainError}; - -/// [RaptorQ](https://en.wikipedia.org/wiki/Raptor_code#RaptorQ_code) implementation of [`FountainCode`] trait -pub struct RaptorQFountain { - settings: RaptorQSettings, -} - -/// Settings for [`RaptorQFountain`] code -#[derive(Clone, Debug)] -pub struct RaptorQSettings { - pub transmission_information: ObjectTransmissionInformation, - pub repair_packets_per_block: u32, -} - -/// RaptorQ implementation of [`FountainCode`] trait -/// Wrapper around the [`raptorq`](https://crates.io/crates/raptorq) crate -#[async_trait::async_trait] -impl FountainCode for RaptorQFountain { - type Settings = RaptorQSettings; - fn new(settings: Self::Settings) -> Self { - Self { settings } - } - fn encode(&self, block: &[u8]) -> Box + Send + Sync + Unpin> { - let encoder = Encoder::new(block, self.settings.transmission_information); - Box::new(futures::stream::iter( - encoder - .get_encoded_packets(self.settings.repair_packets_per_block) - .into_iter() - .map(|packet| packet.serialize().into()), - )) - } - - async fn decode( - &self, - mut stream: impl Stream + Send + Sync + Unpin, - ) -> Result { - let mut decoder = Decoder::new(self.settings.transmission_information); - while let Some(chunk) = stream.next().await { - let packet = EncodingPacket::deserialize(&chunk); - if let Some(result) = decoder.decode(packet) { - return Ok(Bytes::from(result)); - } - } - Err("Stream ended before decoding was complete".into()) - } -} - -#[cfg(test)] -mod test { - use crate::fountain::raptorq::RaptorQFountain; - use crate::fountain::{FountainCode, FountainError}; - use bytes::Bytes; - use futures::StreamExt; - use rand::{RngCore, SeedableRng}; - - // completely random seed - const SEED: u64 = 1889; - - #[tokio::test] - async fn random_encode_decode() -> Result<(), FountainError> { - const TRANSFER_LENGTH: usize = 1024; - // build settings - let settings = super::RaptorQSettings { - transmission_information: raptorq::ObjectTransmissionInformation::with_defaults( - TRANSFER_LENGTH as u64, - 1000, - ), - repair_packets_per_block: 10, - }; - - let raptor = RaptorQFountain::new(settings); - - // create random payload - let mut payload = [0u8; TRANSFER_LENGTH]; - let mut r = rand::prelude::StdRng::seed_from_u64(SEED); - r.fill_bytes(&mut payload); - let payload = Bytes::from(payload.to_vec()); - - // encode payload - let encoded = raptor.encode(&payload); - - // reconstruct - let decoded = raptor.decode(encoded).await?; - - assert_eq!(decoded, payload); - Ok(()) - } - - #[tokio::test] - async fn random_encode_decode_fails() { - const TRANSFER_LENGTH: usize = 1024; - // build settings - let settings = super::RaptorQSettings { - transmission_information: raptorq::ObjectTransmissionInformation::with_defaults( - TRANSFER_LENGTH as u64, - 1000, - ), - repair_packets_per_block: 10, - }; - - // create random payload - let mut payload = [0u8; TRANSFER_LENGTH]; - let mut r = rand::prelude::StdRng::seed_from_u64(SEED); - r.fill_bytes(&mut payload); - let payload = Bytes::from(payload.to_vec()); - - let raptor = RaptorQFountain::new(settings); - - // encode payload - let encoded = raptor.encode(&payload); - - // reconstruct skipping packets, must fail - let decoded = raptor.decode(encoded.skip(400)).await; - assert!(decoded.is_err()); - } -} diff --git a/nomos-core/src/lib.rs b/nomos-core/src/lib.rs index 5c8de0c1..ed4174ef 100644 --- a/nomos-core/src/lib.rs +++ b/nomos-core/src/lib.rs @@ -2,7 +2,6 @@ pub mod account; pub mod block; pub mod crypto; pub mod da; -pub mod fountain; pub mod staking; pub mod tx; pub mod utils; diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 968f17cd..3391300e 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -22,7 +22,7 @@ use tokio::sync::oneshot::Sender; use tracing::instrument; // internal use crate::network::messages::{ - NetworkMessage, NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg, + NetworkMessage, NewViewMsg, ProposalMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg, }; use crate::network::NetworkAdapter; use crate::tally::{ @@ -36,7 +36,6 @@ use task_manager::TaskManager; use crate::committee_membership::UpdateableCommitteeMembership; use nomos_core::block::Block; -use nomos_core::fountain::FountainCode; use nomos_core::tx::Transaction; use nomos_core::vote::Tally; use nomos_mempool::{ @@ -60,45 +59,40 @@ fn default_timeout() -> Duration { pub type Seed = [u8; 32]; #[derive(Debug, Deserialize, Serialize)] -pub struct CarnotSettings { +pub struct CarnotSettings { pub private_key: [u8; 32], - pub fountain_settings: Fountain::Settings, pub overlay_settings: O::Settings, #[serde(default = "default_timeout")] pub timeout: Duration, } -impl Clone for CarnotSettings { +impl Clone for CarnotSettings { fn clone(&self) -> Self { Self { private_key: self.private_key, - fountain_settings: self.fountain_settings.clone(), overlay_settings: self.overlay_settings.clone(), timeout: self.timeout, } } } -impl CarnotSettings { +impl CarnotSettings { #[inline] pub const fn new( private_key: [u8; 32], - fountain_settings: Fountain::Settings, overlay_settings: O::Settings, timeout: Duration, ) -> Self { Self { private_key, - fountain_settings, overlay_settings, timeout, } } } -pub struct CarnotConsensus +pub struct CarnotConsensus where - F: FountainCode, A: NetworkAdapter, M: MempoolAdapter, P: MemPool, @@ -112,15 +106,13 @@ where // when implementing ServiceCore for CarnotConsensus network_relay: Relay>, mempool_relay: Relay>, - _fountain: std::marker::PhantomData, _overlay: std::marker::PhantomData, // this need to be substituted by some kind DA bo _blob: std::marker::PhantomData, } -impl ServiceData for CarnotConsensus +impl ServiceData for CarnotConsensus where - F: FountainCode, A: NetworkAdapter, P: MemPool, P::Tx: Transaction + Debug, @@ -129,16 +121,15 @@ where O: Overlay + Debug, { const SERVICE_ID: ServiceId = "Carnot"; - type Settings = CarnotSettings; + type Settings = CarnotSettings; type State = NoState; type StateOperator = NoOperator; type Message = ConsensusMsg; } #[async_trait::async_trait] -impl ServiceCore for CarnotConsensus +impl ServiceCore for CarnotConsensus where - F: FountainCode + Clone + Send + Sync + 'static, A: NetworkAdapter + Clone + Send + Sync + 'static, P: MemPool + Send + Sync + 'static, P::Settings: Send + Sync + 'static, @@ -157,7 +148,6 @@ where Ok(Self { service_state, network_relay, - _fountain: Default::default(), _overlay: Default::default(), _blob: Default::default(), mempool_relay, @@ -179,7 +169,6 @@ where let CarnotSettings { private_key, - fountain_settings, overlay_settings, timeout, } = self.service_state.settings_reader.get_updated_settings(); @@ -195,7 +184,6 @@ where }; let mut carnot = Carnot::from_genesis(NodeId::new(private_key), genesis, overlay); let adapter = A::new(network_relay).await; - let fountain = F::new(fountain_settings); let private_key = PrivateKey::new(private_key); let self_committee = carnot.self_committee(); let leader_committee = [carnot.id()].into_iter().collect::(); @@ -257,7 +245,6 @@ where adapter.clone(), private_key, mempool_relay.clone(), - &fountain, timeout, ) .await @@ -278,9 +265,8 @@ enum Output { BroadcastProposal { proposal: Block }, } -impl CarnotConsensus +impl CarnotConsensus where - F: FountainCode + Clone + Send + Sync + 'static, A: NetworkAdapter + Clone + Send + Sync + 'static, P: MemPool + Send + Sync + 'static, P::Settings: Send + Sync + 'static, @@ -320,7 +306,6 @@ where adapter: A, private_key: PrivateKey, mempool_relay: OutboundRelay>, - fountain: &F, timeout: Duration, ) -> Carnot { let mut output = None; @@ -387,7 +372,7 @@ where } if let Some(output) = output { - handle_output(&adapter, fountain, carnot.id(), output).await; + handle_output(&adapter, carnot.id(), output).await; } carnot @@ -719,7 +704,7 @@ where .await .filter_map(move |msg| { async move { - let proposal = Block::from_bytes(&msg.chunk); + let proposal = Block::from_bytes(&msg.data); if proposal.header().id == msg.proposal { // TODO: Leader is faulty? what should we do? Some(proposal) @@ -775,14 +760,9 @@ where } } -async fn handle_output( - adapter: &A, - fountain: &F, - node_id: NodeId, - output: Output, -) where +async fn handle_output(adapter: &A, node_id: NodeId, output: Output) +where A: NetworkAdapter, - F: FountainCode, Tx: Hash + Eq + Clone + Serialize + DeserializeOwned + Debug, B: Clone + Eq + Hash + Serialize + DeserializeOwned, { @@ -824,15 +804,12 @@ async fn handle_output( } }, Output::BroadcastProposal { proposal } => { - fountain - .encode(&proposal.as_bytes()) - .for_each(|chunk| { - adapter.broadcast(NetworkMessage::ProposalChunk(ProposalChunkMsg { - proposal: proposal.header().id, - chunk: chunk.to_vec().into_boxed_slice(), - view: proposal.header().view, - })) - }) + adapter + .broadcast(NetworkMessage::Proposal(ProposalMsg { + proposal: proposal.header().id, + data: proposal.as_bytes().to_vec().into_boxed_slice(), + view: proposal.header().view, + })) .await; } Output::BroadcastTimeoutQc { timeout_qc } => { diff --git a/nomos-services/consensus/src/network/adapters/libp2p.rs b/nomos-services/consensus/src/network/adapters/libp2p.rs index dc92b1af..8ae95cdb 100644 --- a/nomos-services/consensus/src/network/adapters/libp2p.rs +++ b/nomos-services/consensus/src/network/adapters/libp2p.rs @@ -10,7 +10,7 @@ use tokio_stream::wrappers::ReceiverStream; // internal use crate::network::messages::{NewViewMsg, TimeoutMsg, TimeoutQcMsg}; use crate::network::{ - messages::{NetworkMessage, ProposalChunkMsg, VoteMsg}, + messages::{NetworkMessage, ProposalMsg, VoteMsg}, BoxedStream, NetworkAdapter, }; use consensus_engine::{BlockId, Committee, CommitteeId, View}; @@ -93,7 +93,7 @@ impl Spsc { #[derive(Default)] struct Messages { - proposal_chunks: Spsc, + proposal_chunks: Spsc, votes: HashMap>>, new_views: HashMap>, timeouts: HashMap>, @@ -130,7 +130,7 @@ impl MessageCache { } // This will also advance the cache to use view - 1 as the current view - fn get_proposals(&self, view: View) -> Option> { + fn get_proposals(&self, view: View) -> Option> { let mut cache = self.cache.lock().unwrap(); let res = cache .get_mut(&view) @@ -255,7 +255,7 @@ impl NetworkAdapter for Libp2pAdapter { Ok(Event::Message(message)) => { match nomos_core::wire::deserialize(&message.data) { Ok(GossipsubMessage { to, message }) => match message { - NetworkMessage::ProposalChunk(msg) => { + NetworkMessage::Proposal(msg) => { tracing::debug!("received proposal chunk"); let mut cache = cache.cache.lock().unwrap(); let view = msg.view; @@ -326,10 +326,10 @@ impl NetworkAdapter for Libp2pAdapter { } } - async fn proposal_chunks_stream(&self, view: View) -> BoxedStream { + async fn proposal_chunks_stream(&self, view: View) -> BoxedStream { self.message_cache .get_proposals(view) - .map::, _>(|stream| Box::new(ReceiverStream::new(stream))) + .map::, _>(|stream| Box::new(ReceiverStream::new(stream))) .unwrap_or_else(|| Box::new(tokio_stream::empty())) } diff --git a/nomos-services/consensus/src/network/adapters/mock.rs b/nomos-services/consensus/src/network/adapters/mock.rs index f6b16be7..2ff75c2f 100644 --- a/nomos-services/consensus/src/network/adapters/mock.rs +++ b/nomos-services/consensus/src/network/adapters/mock.rs @@ -10,7 +10,7 @@ use tokio_stream::wrappers::BroadcastStream; use crate::network::messages::{NetworkMessage, NewViewMsg, TimeoutMsg, TimeoutQcMsg}; use crate::network::{ - messages::{ProposalChunkMsg, VoteMsg}, + messages::{ProposalMsg, VoteMsg}, BoxedStream, NetworkAdapter, }; use consensus_engine::{BlockId, Committee, View}; @@ -57,7 +57,7 @@ impl NetworkAdapter for MockAdapter { Self { network_relay } } - async fn proposal_chunks_stream(&self, _view: View) -> BoxedStream { + async fn proposal_chunks_stream(&self, _view: View) -> BoxedStream { let stream_channel = self .message_subscriber_channel() .await @@ -72,7 +72,7 @@ impl NetworkAdapter for MockAdapter { == message.content_topic().content_topic_name { let payload = message.payload(); - Some(ProposalChunkMsg::from_bytes(payload.as_bytes())) + Some(ProposalMsg::from_bytes(payload.as_bytes())) } else { None } diff --git a/nomos-services/consensus/src/network/adapters/waku.rs b/nomos-services/consensus/src/network/adapters/waku.rs index c2f87f59..bc75e945 100644 --- a/nomos-services/consensus/src/network/adapters/waku.rs +++ b/nomos-services/consensus/src/network/adapters/waku.rs @@ -6,7 +6,7 @@ use tokio_stream::wrappers::BroadcastStream; // internal use crate::network::messages::{NetworkMessage, NewViewMsg, TimeoutMsg, TimeoutQcMsg}; use crate::network::{ - messages::{ProposalChunkMsg, VoteMsg}, + messages::{ProposalMsg, VoteMsg}, BoxedStream, NetworkAdapter, }; use consensus_engine::{BlockId, Committee, View}; @@ -161,13 +161,13 @@ impl NetworkAdapter for WakuAdapter { Self { network_relay } } - async fn proposal_chunks_stream(&self, view: View) -> BoxedStream { + async fn proposal_chunks_stream(&self, view: View) -> BoxedStream { Box::new(Box::pin( self.cached_stream_with_content_topic(create_topic(PROPOSAL_TAG, None)) .await .filter_map(move |message| { let payload = message.payload(); - let proposal = ProposalChunkMsg::from_bytes(payload); + let proposal = ProposalMsg::from_bytes(payload); async move { if view == proposal.view { Some(proposal) @@ -291,7 +291,7 @@ fn create_topic(tag: &str, committee: Option<&Committee>) -> WakuContentTopic { fn unwrap_message_to_bytes(message: &NetworkMessage) -> Box<[u8]> { match message { NetworkMessage::NewView(msg) => msg.as_bytes(), - NetworkMessage::ProposalChunk(msg) => msg.as_bytes(), + NetworkMessage::Proposal(msg) => msg.as_bytes(), NetworkMessage::Vote(msg) => msg.as_bytes(), NetworkMessage::Timeout(msg) => msg.as_bytes(), NetworkMessage::TimeoutQc(msg) => msg.as_bytes(), @@ -301,7 +301,7 @@ fn unwrap_message_to_bytes(message: &NetworkMessage) -> Box<[u8]> { fn message_tag(message: &NetworkMessage) -> &str { match message { NetworkMessage::NewView(_) => NEW_VIEW_TAG, - NetworkMessage::ProposalChunk(_) => PROPOSAL_TAG, + NetworkMessage::Proposal(_) => PROPOSAL_TAG, NetworkMessage::Vote(_) => VOTE_TAG, NetworkMessage::Timeout(_) => TIMEOUT_TAG, NetworkMessage::TimeoutQc(_) => TIMEOUT_QC_TAG, diff --git a/nomos-services/consensus/src/network/messages.rs b/nomos-services/consensus/src/network/messages.rs index 6daf7708..6383da05 100644 --- a/nomos-services/consensus/src/network/messages.rs +++ b/nomos-services/consensus/src/network/messages.rs @@ -7,13 +7,13 @@ use consensus_engine::{BlockId, NewView, Qc, Timeout, TimeoutQc, View, Vote}; use nomos_core::wire; #[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)] -pub struct ProposalChunkMsg { - pub chunk: Box<[u8]>, +pub struct ProposalMsg { + pub data: Box<[u8]>, pub proposal: BlockId, pub view: View, } -impl ProposalChunkMsg { +impl ProposalMsg { pub fn as_bytes(&self) -> Box<[u8]> { wire::serialize(self).unwrap().into_boxed_slice() } @@ -90,7 +90,7 @@ pub enum NetworkMessage { TimeoutQc(TimeoutQcMsg), Vote(VoteMsg), NewView(NewViewMsg), - ProposalChunk(ProposalChunkMsg), + Proposal(ProposalMsg), } impl NetworkMessage { diff --git a/nomos-services/consensus/src/network/mod.rs b/nomos-services/consensus/src/network/mod.rs index 40d15770..a8647f9d 100644 --- a/nomos-services/consensus/src/network/mod.rs +++ b/nomos-services/consensus/src/network/mod.rs @@ -6,7 +6,7 @@ pub mod messages; use futures::Stream; // internal use crate::network::messages::{ - NetworkMessage, NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg, + NetworkMessage, NewViewMsg, ProposalMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg, }; use consensus_engine::{BlockId, Committee, View}; use nomos_network::backends::NetworkBackend; @@ -25,7 +25,7 @@ pub trait NetworkAdapter { async fn proposal_chunks_stream( &self, view: View, - ) -> Box + Send + Sync + Unpin>; + ) -> Box + Send + Sync + Unpin>; async fn broadcast(&self, message: NetworkMessage); async fn timeout_stream(&self, committee: &Committee, view: View) -> BoxedStream; async fn timeout_qc_stream(&self, view: View) -> BoxedStream; diff --git a/simulations/src/node/carnot/event_builder.rs b/simulations/src/node/carnot/event_builder.rs index e428a3c9..882fe6c1 100644 --- a/simulations/src/node/carnot/event_builder.rs +++ b/simulations/src/node/carnot/event_builder.rs @@ -93,7 +93,7 @@ impl EventBuilder { for message in messages { match message { CarnotMessage::Proposal(msg) => { - let block = Block::from_bytes(&msg.chunk); + let block = Block::from_bytes(&msg.data); tracing::info!( node=%self.id, current_view = %engine.current_view(), diff --git a/simulations/src/node/carnot/messages.rs b/simulations/src/node/carnot/messages.rs index 6873feb0..24f8b426 100644 --- a/simulations/src/node/carnot/messages.rs +++ b/simulations/src/node/carnot/messages.rs @@ -1,13 +1,13 @@ use consensus_engine::View; use nomos_consensus::network::messages::{ - NewViewMsg, ProposalChunkMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg, + NewViewMsg, ProposalMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg, }; use crate::network::PayloadSize; #[derive(Debug, Eq, PartialEq, Hash, Clone)] pub enum CarnotMessage { - Proposal(ProposalChunkMsg), + Proposal(ProposalMsg), Vote(VoteMsg), TimeoutQc(TimeoutQcMsg), Timeout(TimeoutMsg), @@ -30,7 +30,7 @@ impl PayloadSize for CarnotMessage { fn size_bytes(&self) -> u32 { match self { CarnotMessage::Proposal(p) => { - (std::mem::size_of::() + p.chunk.len()) as u32 + (std::mem::size_of::() + p.data.len()) as u32 } CarnotMessage::Vote(_) => std::mem::size_of::() as u32, CarnotMessage::TimeoutQc(_) => std::mem::size_of::() as u32, diff --git a/simulations/src/node/carnot/mod.rs b/simulations/src/node/carnot/mod.rs index 446b6de0..9dc4f091 100644 --- a/simulations/src/node/carnot/mod.rs +++ b/simulations/src/node/carnot/mod.rs @@ -34,7 +34,7 @@ use consensus_engine::{ Block, BlockId, Carnot, Committee, Overlay, Payload, Qc, StandardQc, TimeoutQc, View, Vote, }; use nomos_consensus::committee_membership::UpdateableCommitteeMembership; -use nomos_consensus::network::messages::{ProposalChunkMsg, TimeoutQcMsg}; +use nomos_consensus::network::messages::{ProposalMsg, TimeoutQcMsg}; use nomos_consensus::{ leader_selection::UpdateableLeaderSelection, network::messages::{NewViewMsg, TimeoutMsg, VoteMsg}, @@ -177,8 +177,8 @@ impl< } Output::BroadcastProposal { proposal } => { self.network_interface - .broadcast(CarnotMessage::Proposal(ProposalChunkMsg { - chunk: proposal.as_bytes().to_vec().into(), + .broadcast(CarnotMessage::Proposal(ProposalMsg { + data: proposal.as_bytes().to_vec().into(), proposal: proposal.header().id, view: proposal.header().view, })) diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index 91550dd8..bc0339c4 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -271,7 +271,6 @@ fn create_node_config( }, consensus: CarnotSettings { private_key, - fountain_settings: (), overlay_settings: FlatOverlaySettings { nodes, leader: RoundRobin::new(),