Add fountain codes to consensus (#38)

* Add basic encode/decode test

* Use Stream for trait instead of Iterator

* Removed unnecessary pin

* Add custom fountain error

* Add failing path to tests

* Added docs

* Normalized chunks to bytes

* Added settings initialization for fountain codes trait

* Pipe fountain code through consensus

* Implement broadcast block and block reconstruction for Member Overlay

* Fix failing raptor test

* Use seed for raptorq tests

* Use const for topics instead of static

* Clippy happy
This commit is contained in:
Daniel Sanchez 2023-01-10 12:58:51 +01:00 committed by GitHub
parent 15f97dcace
commit 7ff63d4824
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 182 additions and 127 deletions

View File

@ -1,3 +1,8 @@
// std
// crates
use bytes::Bytes;
// internal
/// A block
#[derive(Clone, Debug)]
pub struct Block;
@ -10,20 +15,13 @@ pub struct BlockHeader;
#[derive(Clone, Debug)]
pub struct BlockId;
/// A block chunk, N pieces are necessary to reconstruct the full block
#[derive(Clone, Copy, Debug)]
pub struct BlockChunk {
pub index: u8,
}
impl Block {
/// Fake implementation of erasure coding protocol
pub fn chunk<const SIZE: usize>(self) -> [BlockChunk; SIZE] {
// TODO: this is a completely temporary and fake implementation
(0..SIZE)
.map(|i| BlockChunk { index: i as u8 })
.collect::<Vec<_>>()
.try_into()
.expect("This should not fail unless chunking exceed memory limits")
/// Encode block into bytes
pub fn as_bytes(&self) -> Bytes {
Bytes::new()
}
pub fn from_bytes(_: Bytes) -> Self {
Self
}
}

View File

@ -29,15 +29,13 @@ impl From<&str> for FountainError {
/// Chop a block of data into chunks and reassembling trait
#[async_trait::async_trait]
pub trait FountainCode {
type Settings;
type Settings: Clone + Send + Sync + 'static;
fn new(settings: Self::Settings) -> Self;
/// Encode a block of data into a stream of chunks
fn encode(
block: &[u8],
settings: &Self::Settings,
) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin>;
fn encode(&self, block: &[u8]) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin>;
/// Decode a stream of chunks into a block of data
async fn decode(
&self,
stream: impl Stream<Item = Bytes> + Send + Sync + Unpin,
settings: &Self::Settings,
) -> Result<Bytes, FountainError>;
}

View File

@ -7,9 +7,12 @@ use raptorq::{Decoder, Encoder, EncodingPacket, ObjectTransmissionInformation};
use crate::fountain::{FountainCode, FountainError};
/// [RaptorQ](https://en.wikipedia.org/wiki/Raptor_code#RaptorQ_code) implementation of [`FountainCode`] trait
pub struct RaptorQFountain;
pub struct RaptorQFountain {
settings: RaptorQSettings,
}
/// Settings for [`RaptorQFountain`] code
#[derive(Clone, Debug)]
pub struct RaptorQSettings {
pub transmission_information: ObjectTransmissionInformation,
pub repair_packets_per_block: u32,
@ -20,24 +23,24 @@ pub struct RaptorQSettings {
#[async_trait::async_trait]
impl FountainCode for RaptorQFountain {
type Settings = RaptorQSettings;
fn encode(
block: &[u8],
settings: &Self::Settings,
) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin> {
let encoder = Encoder::new(block, settings.transmission_information);
fn new(settings: Self::Settings) -> Self {
Self { settings }
}
fn encode(&self, block: &[u8]) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin> {
let encoder = Encoder::new(block, self.settings.transmission_information);
Box::new(futures::stream::iter(
encoder
.get_encoded_packets(settings.repair_packets_per_block)
.get_encoded_packets(self.settings.repair_packets_per_block)
.into_iter()
.map(|packet| packet.serialize().into()),
))
}
async fn decode(
&self,
mut stream: impl Stream<Item = Bytes> + Send + Sync + Unpin,
settings: &Self::Settings,
) -> Result<Bytes, FountainError> {
let mut decoder = Decoder::new(settings.transmission_information);
let mut decoder = Decoder::new(self.settings.transmission_information);
while let Some(chunk) = stream.next().await {
let packet = EncodingPacket::deserialize(&chunk);
if let Some(result) = decoder.decode(packet) {
@ -54,7 +57,10 @@ mod test {
use crate::fountain::{FountainCode, FountainError};
use bytes::Bytes;
use futures::StreamExt;
use rand::RngCore;
use rand::{RngCore, SeedableRng};
// completely random seed
const SEED: u64 = 1889;
#[tokio::test]
async fn random_encode_decode() -> Result<(), FountainError> {
@ -68,16 +74,19 @@ mod test {
repair_packets_per_block: 10,
};
let raptor = RaptorQFountain::new(settings);
// create random payload
let mut payload = [0u8; TRANSFER_LENGTH];
rand::thread_rng().fill_bytes(&mut payload);
let mut r = rand::prelude::StdRng::seed_from_u64(SEED);
r.fill_bytes(&mut payload);
let payload = Bytes::from(payload.to_vec());
// encode payload
let encoded = RaptorQFountain::encode(&payload, &settings);
let encoded = raptor.encode(&payload);
// reconstruct
let decoded = RaptorQFountain::decode(encoded, &settings).await?;
let decoded = raptor.decode(encoded).await?;
assert_eq!(decoded, payload);
Ok(())
@ -97,14 +106,17 @@ mod test {
// create random payload
let mut payload = [0u8; TRANSFER_LENGTH];
rand::thread_rng().fill_bytes(&mut payload);
let mut r = rand::prelude::StdRng::seed_from_u64(SEED);
r.fill_bytes(&mut payload);
let payload = Bytes::from(payload.to_vec());
let raptor = RaptorQFountain::new(settings);
// encode payload
let encoded = RaptorQFountain::encode(&payload, &settings);
let encoded = raptor.encode(&payload);
// reconstruct skipping packets, must fail
let decoded = RaptorQFountain::decode(encoded.skip(400), &settings).await;
let decoded = raptor.decode(encoded.skip(400)).await;
assert!(decoded.is_err());
}
}

View File

@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bytes = "1.3"
chrono = "0.4"
rand_chacha = "0.3"
rand = "0.8"

View File

@ -18,6 +18,7 @@ use crate::network::NetworkAdapter;
use leadership::{Leadership, LeadershipResult};
use nomos_core::block::Block;
use nomos_core::crypto::PublicKey;
use nomos_core::fountain::FountainCode;
use nomos_core::staking::Stake;
use nomos_mempool::{backend::MemPool, network::NetworkAdapter as MempoolAdapter, MempoolService};
use nomos_network::NetworkService;
@ -38,13 +39,23 @@ pub type Seed = [u8; 32];
const COMMITTEE_SIZE: usize = 1;
#[derive(Clone)]
pub struct CarnotSettings {
pub struct CarnotSettings<Fountain: FountainCode> {
private_key: [u8; 32],
fountain_settings: Fountain::Settings,
}
pub struct CarnotConsensus<A, P, M>
impl<Fountain: FountainCode> Clone for CarnotSettings<Fountain> {
fn clone(&self) -> Self {
Self {
private_key: self.private_key,
fountain_settings: self.fountain_settings.clone(),
}
}
}
pub struct CarnotConsensus<A, P, M, F>
where
F: FountainCode + Send + Sync + 'static,
A: NetworkAdapter + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static,
P::Settings: Clone + Send + Sync + 'static,
@ -57,10 +68,12 @@ where
// when implementing ServiceCore for CarnotConsensus
network_relay: Relay<NetworkService<A::Backend>>,
mempool_relay: Relay<MempoolService<M, P>>,
_fountain: std::marker::PhantomData<F>,
}
impl<A, P, M> ServiceData for CarnotConsensus<A, P, M>
impl<A, P, M, F> ServiceData for CarnotConsensus<A, P, M, F>
where
F: FountainCode + Send + Sync + 'static,
A: NetworkAdapter + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static,
P::Settings: Clone + Send + Sync + 'static,
@ -69,15 +82,16 @@ where
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
{
const SERVICE_ID: ServiceId = "Carnot";
type Settings = CarnotSettings;
type Settings = CarnotSettings<F>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = NoMessage;
}
#[async_trait::async_trait]
impl<A, P, M> ServiceCore for CarnotConsensus<A, P, M>
impl<A, P, M, F> ServiceCore for CarnotConsensus<A, P, M, F>
where
F: FountainCode + Send + Sync + 'static,
A: NetworkAdapter + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static,
P::Settings: Clone + Send + Sync + 'static,
@ -91,6 +105,7 @@ where
Ok(Self {
service_state,
network_relay,
_fountain: Default::default(),
mempool_relay,
})
}
@ -110,28 +125,28 @@ where
.await
.expect("Relay connection with MemPoolService should succeed");
let CarnotSettings {
private_key,
fountain_settings,
} = self.service_state.settings_reader.get_updated_settings();
let network_adapter = A::new(network_relay).await;
let tip = Tip;
// TODO: fix
let priv_key = self
.service_state
.settings_reader
.get_updated_settings()
.private_key;
let node_id = priv_key;
let fountain = F::new(fountain_settings);
let leadership = Leadership::new(priv_key, mempool_relay);
let leadership = Leadership::new(private_key, mempool_relay);
loop {
let view = view_generator.next().await;
// if we want to process multiple views at the same time this can
// be spawned as a separate future
// TODO: add leadership module
view.resolve::<A, Member<'_, COMMITTEE_SIZE>, _, _>(
node_id,
view.resolve::<A, Member<'_, COMMITTEE_SIZE>, _, _, _>(
private_key,
&tip,
&network_adapter,
&fountain,
&leadership,
)
.await;
@ -139,8 +154,9 @@ where
}
}
impl<A, P, M> CarnotConsensus<A, P, M>
impl<A, P, M, F> CarnotConsensus<A, P, M, F>
where
F: FountainCode + Send + Sync + 'static,
A: NetworkAdapter + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static,
P::Settings: Clone + Send + Sync + 'static,
@ -182,31 +198,47 @@ impl View {
const APPROVAL_THRESHOLD: usize = 1;
// TODO: might want to encode steps in the type system
async fn resolve<'view, A, O, Tx, Id>(
async fn resolve<'view, A, O, F, Tx, Id>(
&'view self,
node_id: NodeId,
tip: &Tip,
adapter: &A,
fountain: &F,
leadership: &Leadership<Tx, Id>,
) where
A: NetworkAdapter + Send + Sync + 'static,
O: Overlay<'view, A>,
F: FountainCode,
O: Overlay<'view, A, F>,
{
let overlay = O::new(self, node_id);
let block = if let LeadershipResult::Leader { block, .. } =
leadership.try_propose_block(self, tip).await
{
block
Ok(block)
} else {
overlay.reconstruct_proposal_block(adapter).await
overlay.reconstruct_proposal_block(adapter, fountain).await
};
// TODO: verify?
overlay.broadcast_block(block.clone(), adapter).await;
self.approve(&overlay, block, adapter).await;
match block {
Ok(block) => {
// TODO: verify?
overlay
.broadcast_block(block.clone(), adapter, fountain)
.await;
self.approve(&overlay, block, adapter).await;
}
Err(_e) => {
// TODO: log error
}
}
}
async fn approve<'view, Network: NetworkAdapter, O: Overlay<'view, Network>>(
async fn approve<
'view,
Network: NetworkAdapter,
Fountain: FountainCode,
O: Overlay<'view, Network, Fountain>,
>(
&'view self,
overlay: &O,
block: Block,

View File

@ -1,5 +1,6 @@
// std
// crates
use bytes::Bytes;
use futures::{Stream, StreamExt};
use tokio_stream::wrappers::BroadcastStream;
// internal
@ -8,7 +9,6 @@ use crate::network::{
NetworkAdapter,
};
use crate::{Approval, View};
use nomos_core::block::BlockChunk;
use nomos_network::{
backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage},
NetworkMsg, NetworkService,
@ -16,18 +16,15 @@ use nomos_network::{
use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
use waku_bindings::{Encoding, WakuContentTopic, WakuMessage, WakuPubSubTopic};
static WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic =
const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic =
WakuPubSubTopic::new("CarnotSim", Encoding::Proto);
static WAKU_CARNOT_BLOCK_CONTENT_TOPIC: WakuContentTopic =
const WAKU_CARNOT_BLOCK_CONTENT_TOPIC: WakuContentTopic =
WakuContentTopic::new("CarnotSim", 1, "CarnotBlock", Encoding::Proto);
static WAKU_CARNOT_APPROVAL_CONTENT_TOPIC: WakuContentTopic =
const WAKU_CARNOT_APPROVAL_CONTENT_TOPIC: WakuContentTopic =
WakuContentTopic::new("CarnotSim", 1, "CarnotApproval", Encoding::Proto);
// TODO: ehm...this should be here, but we will change it whenever the chunking is decided.
const CHUNK_SIZE: usize = 8;
pub struct WakuAdapter {
network_relay: OutboundRelay<<NetworkService<Waku> as ServiceData>::Message>,
}
@ -64,43 +61,40 @@ impl NetworkAdapter for WakuAdapter {
Self { network_relay }
}
async fn proposal_chunks_stream(&self) -> Box<dyn Stream<Item = BlockChunk>> {
async fn proposal_chunks_stream(&self) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin> {
let stream_channel = self
.message_subscriber_channel()
.await
.unwrap_or_else(|_e| todo!("handle error"));
Box::new(
BroadcastStream::new(stream_channel).filter_map(|msg| async move {
Box::new(BroadcastStream::new(stream_channel).filter_map(|msg| {
Box::pin(async move {
match msg {
Ok(NetworkEvent::RawMessage(message)) => {
// TODO: this should actually check the whole content topic,
// waiting for this [PR](https://github.com/waku-org/waku-rust-bindings/pull/28)
if WAKU_CARNOT_BLOCK_CONTENT_TOPIC.content_topic_name
== message.content_topic().content_topic_name
{
let payload = message.payload();
Some(
ProposalChunkMsg::from_bytes::<CHUNK_SIZE>(
payload.try_into().unwrap(),
)
.chunk,
)
} else {
None
Ok(event) => match event {
NetworkEvent::RawMessage(message) => {
// TODO: this should actually check the whole content topic,
// waiting for this [PR](https://github.com/waku-org/waku-rust-bindings/pull/28)
if WAKU_CARNOT_BLOCK_CONTENT_TOPIC.content_topic_name
== message.content_topic().content_topic_name
{
let payload = message.payload();
Some(ProposalChunkMsg::from_bytes(payload).chunk)
} else {
None
}
}
}
},
Err(_e) => None,
}
}),
)
})
}))
}
async fn broadcast_block_chunk(&self, _view: View, chunk_message: ProposalChunkMsg) {
async fn broadcast_block_chunk(&self, _view: &View, chunk_message: ProposalChunkMsg) {
// TODO: probably later, depending on the view we should map to different content topics
// but this is an ongoing idea that should/will be discus.
let message = WakuMessage::new::<[u8; CHUNK_SIZE]>(
let message = WakuMessage::new(
chunk_message.as_bytes(),
WAKU_CARNOT_BLOCK_CONTENT_TOPIC.clone(),
WAKU_CARNOT_BLOCK_CONTENT_TOPIC,
1,
chrono::Utc::now().timestamp() as usize,
);
@ -147,7 +141,7 @@ impl NetworkAdapter for WakuAdapter {
async fn forward_approval(&self, approval_message: ApprovalMsg) {
let message = WakuMessage::new(
approval_message.as_bytes(),
WAKU_CARNOT_APPROVAL_CONTENT_TOPIC.clone(),
WAKU_CARNOT_APPROVAL_CONTENT_TOPIC,
1,
chrono::Utc::now().timestamp() as usize,
);

View File

@ -1,25 +1,21 @@
// std
// crates
use bytes::Bytes;
// internal
use crate::{Approval, NodeId};
use nomos_core::block::BlockChunk;
pub struct ProposalChunkMsg {
pub chunk: BlockChunk,
pub chunk: Bytes,
}
// TODO: this is completely temporal and match no reality at all, but it will help use fake some of the process
impl ProposalChunkMsg {
pub fn as_bytes<const SIZE: usize>(&self) -> [u8; SIZE] {
[self.chunk.index; SIZE]
pub fn as_bytes(&self) -> &[u8] {
&self.chunk
}
pub fn from_bytes<const SIZE: usize>(data: [u8; SIZE]) -> Self {
let index = data[0];
pub fn from_bytes(data: &[u8]) -> Self {
Self {
chunk: BlockChunk { index },
chunk: Bytes::from(data.to_vec()),
}
}
}

View File

@ -1,14 +1,13 @@
pub mod adapters;
mod messages;
pub mod messages;
// std
use bytes::Bytes;
// crates
use futures::Stream;
// internal
use crate::network::messages::{ApprovalMsg, ProposalChunkMsg};
use crate::{Approval, View};
use nomos_core::block::BlockChunk;
use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService;
use overwatch_rs::services::relay::OutboundRelay;
@ -20,8 +19,8 @@ pub trait NetworkAdapter {
async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self;
async fn proposal_chunks_stream(&self) -> Box<dyn Stream<Item = BlockChunk>>;
async fn broadcast_block_chunk(&self, view: View, chunk_msg: ProposalChunkMsg);
async fn proposal_chunks_stream(&self) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin>;
async fn broadcast_block_chunk(&self, view: &View, chunk_msg: ProposalChunkMsg);
async fn approvals_stream(&self) -> Box<dyn Stream<Item = Approval>>;
async fn forward_approval(&self, approval: ApprovalMsg);
}

View File

@ -1,9 +1,11 @@
// std
use std::pin::Pin;
// crates
use futures::StreamExt;
use rand::{seq::SliceRandom, SeedableRng};
// internal
use super::*;
use crate::network::messages::ProposalChunkMsg;
use crate::network::NetworkAdapter;
/// View of the tree overlay centered around a specific member
@ -93,20 +95,38 @@ impl<'view, const C: usize> Member<'view, C> {
}
#[async_trait::async_trait]
impl<'view, Network: NetworkAdapter + Send + Sync, const C: usize> Overlay<'view, Network>
for Member<'view, C>
impl<
'view,
Network: NetworkAdapter + Send + Sync,
Fountain: FountainCode + Send + Sync,
const C: usize,
> Overlay<'view, Network, Fountain> for Member<'view, C>
{
fn new(view: &'view View, node: NodeId) -> Self {
let committees = Committees::new(view);
committees.into_member(node).unwrap()
}
async fn reconstruct_proposal_block(&self, adapter: &Network) -> Block {
todo!()
async fn reconstruct_proposal_block(
&self,
adapter: &Network,
fountain: &Fountain,
) -> Result<Block, FountainError> {
let message_stream = adapter.proposal_chunks_stream().await;
fountain.decode(message_stream).await.map(Block::from_bytes)
}
async fn broadcast_block(&self, _block: Block, adapter: &Network) {
todo!()
async fn broadcast_block(&self, block: Block, adapter: &Network, fountain: &Fountain) {
let block_bytes = block.as_bytes();
let encoded_stream = fountain.encode(&block_bytes);
encoded_stream
.for_each_concurrent(None, |chunk| async move {
let message = ProposalChunkMsg { chunk };
adapter
.broadcast_block_chunk(self.committees.view, message)
.await;
})
.await;
}
async fn collect_approvals(

View File

@ -8,14 +8,19 @@ use super::{Approval, NodeId, View};
use crate::network::NetworkAdapter;
pub use committees::Member;
use nomos_core::block::Block;
use nomos_core::fountain::{FountainCode, FountainError};
// Dissamination overlay, tied to a specific view
/// Dissemination overlay, tied to a specific view
#[async_trait::async_trait]
pub trait Overlay<'view, Network: NetworkAdapter> {
pub trait Overlay<'view, Network: NetworkAdapter, Fountain: FountainCode> {
fn new(view: &'view View, node: NodeId) -> Self;
async fn reconstruct_proposal_block(&self, adapter: &Network) -> Block;
async fn broadcast_block(&self, block: Block, adapter: &Network);
async fn reconstruct_proposal_block(
&self,
adapter: &Network,
fountain: &Fountain,
) -> Result<Block, FountainError>;
async fn broadcast_block(&self, block: Block, adapter: &Network, fountain: &Fountain);
async fn collect_approvals(
&self,
block: Block,

View File

@ -14,10 +14,10 @@ use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use waku_bindings::{Encoding, WakuContentTopic, WakuPubSubTopic};
static WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic =
const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic =
WakuPubSubTopic::new("CarnotSim", Encoding::Proto);
static WAKU_CARNOT_TX_CONTENT_TOPIC: WakuContentTopic =
const WAKU_CARNOT_TX_CONTENT_TOPIC: WakuContentTopic =
WakuContentTopic::new("CarnotSim", 1, "CarnotTx", Encoding::Proto);
pub struct WakuAdapter<Tx> {

View File

@ -7,13 +7,13 @@ pub mod waku;
#[async_trait::async_trait]
pub trait NetworkBackend {
type Config: Clone + Debug + Send + Sync + 'static;
type State: ServiceState<Settings = Self::Config> + Clone;
type Settings: Clone + Debug + Send + Sync + 'static;
type State: ServiceState<Settings = Self::Settings> + Clone;
type Message: Debug + Send + Sync + 'static;
type EventKind: Debug + Send + Sync + 'static;
type NetworkEvent: Debug + Send + Sync + 'static;
fn new(config: Self::Config) -> Self;
fn new(config: Self::Settings) -> Self;
async fn process(&self, msg: Self::Message);
async fn subscribe(&mut self, event: Self::EventKind) -> Receiver<Self::NetworkEvent>;
}

View File

@ -60,13 +60,13 @@ pub enum NetworkEvent {
#[async_trait::async_trait]
impl NetworkBackend for Waku {
type Config = WakuConfig;
type Settings = WakuConfig;
type State = NoState<WakuConfig>;
type Message = WakuBackendMessage;
type EventKind = EventKind;
type NetworkEvent = NetworkEvent;
fn new(config: Self::Config) -> Self {
fn new(config: Self::Settings) -> Self {
let waku = waku_new(Some(config.inner)).unwrap().start().unwrap();
waku.relay_subscribe(None).unwrap();
tracing::info!("waku listening on {}", waku.listen_addresses().unwrap()[0]);

View File

@ -41,7 +41,7 @@ impl<T: NetworkBackend + 'static> RelayMessage for NetworkMsg<T> {}
#[derive(Serialize, Deserialize)]
pub struct NetworkConfig<B: NetworkBackend> {
pub backend: B::Config,
pub backend: B::Settings,
}
impl<B: NetworkBackend> Debug for NetworkConfig<B> {