diff --git a/nomos-core/Cargo.toml b/nomos-core/Cargo.toml index ea193c62..ab03b4ad 100644 --- a/nomos-core/Cargo.toml +++ b/nomos-core/Cargo.toml @@ -9,3 +9,17 @@ authors = [ # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = { version = "0.1" } +bytes = "1.3" +futures = "0.3" +raptorq = { version = "1.7", optional = true } +thiserror = "1.0" + +[dev-dependencies] +rand = "0.8" +tokio = { version = "1.23", features = ["macros", "rt"] } + + +[features] +default = [] +raptor = ["raptorq"] \ No newline at end of file diff --git a/nomos-core/src/fountain/mod.rs b/nomos-core/src/fountain/mod.rs new file mode 100644 index 00000000..cafcee30 --- /dev/null +++ b/nomos-core/src/fountain/mod.rs @@ -0,0 +1,43 @@ +#[cfg(feature = "raptor")] +pub mod raptorq; + +// std +use std::error::Error; +// crates +use async_trait; +use bytes::Bytes; +use futures::Stream; +use thiserror::Error; +// internal + +/// FountainCode trait main error type +/// Wrapper around generic whichever error type the fountain code implementation uses +#[derive(Error, Debug)] +#[error(transparent)] +pub struct FountainError(#[from] Box); + +impl From<&str> for FountainError { + fn from(value: &str) -> Self { + FountainError(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + value, + ))) + } +} + +/// [FountainCode](https://en.wikipedia.org/wiki/Fountain_code) +/// Chop a block of data into chunks and reassembling trait +#[async_trait::async_trait] +pub trait FountainCode { + type Settings; + /// Encode a block of data into a stream of chunks + fn encode( + block: &[u8], + settings: &Self::Settings, + ) -> Box + Send + Sync + Unpin>; + /// Decode a stream of chunks into a block of data + async fn decode( + stream: impl Stream + Send + Sync + Unpin, + settings: &Self::Settings, + ) -> Result; +} diff --git a/nomos-core/src/fountain/raptorq.rs b/nomos-core/src/fountain/raptorq.rs new file mode 100644 index 00000000..e90a9ed5 --- /dev/null +++ b/nomos-core/src/fountain/raptorq.rs @@ -0,0 +1,110 @@ +// std +// crates +use bytes::Bytes; +use futures::{Stream, StreamExt}; +use raptorq::{Decoder, Encoder, EncodingPacket, ObjectTransmissionInformation}; +// internal +use crate::fountain::{FountainCode, FountainError}; + +/// [RaptorQ](https://en.wikipedia.org/wiki/Raptor_code#RaptorQ_code) implementation of [`FountainCode`] trait +pub struct RaptorQFountain; + +/// Settings for [`RaptorQFountain`] code +pub struct RaptorQSettings { + pub transmission_information: ObjectTransmissionInformation, + pub repair_packets_per_block: u32, +} + +/// RaptorQ implementation of [`FountainCode`] trait +/// Wrapper around the [`raptorq`](https://crates.io/crates/raptorq) crate +#[async_trait::async_trait] +impl FountainCode for RaptorQFountain { + type Settings = RaptorQSettings; + fn encode( + block: &[u8], + settings: &Self::Settings, + ) -> Box + Send + Sync + Unpin> { + let encoder = Encoder::new(block, settings.transmission_information); + Box::new(futures::stream::iter( + encoder + .get_encoded_packets(settings.repair_packets_per_block) + .into_iter() + .map(|packet| packet.serialize().into()), + )) + } + + async fn decode( + mut stream: impl Stream + Send + Sync + Unpin, + settings: &Self::Settings, + ) -> Result { + let mut decoder = Decoder::new(settings.transmission_information); + while let Some(chunk) = stream.next().await { + let packet = EncodingPacket::deserialize(&chunk); + if let Some(result) = decoder.decode(packet) { + return Ok(Bytes::from(result)); + } + } + Err("Stream ended before decoding was complete".into()) + } +} + +#[cfg(test)] +mod test { + use crate::fountain::raptorq::RaptorQFountain; + use crate::fountain::{FountainCode, FountainError}; + use bytes::Bytes; + use futures::StreamExt; + use rand::RngCore; + + #[tokio::test] + async fn random_encode_decode() -> Result<(), FountainError> { + const TRANSFER_LENGTH: usize = 1024; + // build settings + let settings = super::RaptorQSettings { + transmission_information: raptorq::ObjectTransmissionInformation::with_defaults( + TRANSFER_LENGTH as u64, + 1000, + ), + repair_packets_per_block: 10, + }; + + // create random payload + let mut payload = [0u8; TRANSFER_LENGTH]; + rand::thread_rng().fill_bytes(&mut payload); + let payload = Bytes::from(payload.to_vec()); + + // encode payload + let encoded = RaptorQFountain::encode(&payload, &settings); + + // reconstruct + let decoded = RaptorQFountain::decode(encoded, &settings).await?; + + assert_eq!(decoded, payload); + Ok(()) + } + + #[tokio::test] + async fn random_encode_decode_fails() { + const TRANSFER_LENGTH: usize = 1024; + // build settings + let settings = super::RaptorQSettings { + transmission_information: raptorq::ObjectTransmissionInformation::with_defaults( + TRANSFER_LENGTH as u64, + 1000, + ), + repair_packets_per_block: 10, + }; + + // create random payload + let mut payload = [0u8; TRANSFER_LENGTH]; + rand::thread_rng().fill_bytes(&mut payload); + let payload = Bytes::from(payload.to_vec()); + + // encode payload + let encoded = RaptorQFountain::encode(&payload, &settings); + + // reconstruct skipping packets, must fail + let decoded = RaptorQFountain::decode(encoded.skip(400), &settings).await; + assert!(decoded.is_err()); + } +} diff --git a/nomos-core/src/lib.rs b/nomos-core/src/lib.rs index c4198d78..616b68c0 100644 --- a/nomos-core/src/lib.rs +++ b/nomos-core/src/lib.rs @@ -1,3 +1,4 @@ pub mod block; pub mod crypto; +pub mod fountain; pub mod staking;