Fountain codes (#37)

* Add fountain module and raptorq basic implementation

* Add basic encode/decode test

* Use Stream for trait instead of Iterator

* Removed unnecessary pin

* Add custom fountain error

* Add failing path to tests

* Added docs
This commit is contained in:
Daniel Sanchez 2023-01-05 10:35:12 +01:00 committed by GitHub
parent 52708b8253
commit fc75ad1732
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 168 additions and 0 deletions

View File

@ -9,3 +9,17 @@ authors = [
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = { version = "0.1" }
bytes = "1.3"
futures = "0.3"
raptorq = { version = "1.7", optional = true }
thiserror = "1.0"
[dev-dependencies]
rand = "0.8"
tokio = { version = "1.23", features = ["macros", "rt"] }
[features]
default = []
raptor = ["raptorq"]

View File

@ -0,0 +1,43 @@
#[cfg(feature = "raptor")]
pub mod raptorq;
// std
use std::error::Error;
// crates
use async_trait;
use bytes::Bytes;
use futures::Stream;
use thiserror::Error;
// internal
/// FountainCode trait main error type
/// Wrapper around generic whichever error type the fountain code implementation uses
#[derive(Error, Debug)]
#[error(transparent)]
pub struct FountainError(#[from] Box<dyn Error + Send + Sync>);
impl From<&str> for FountainError {
fn from(value: &str) -> Self {
FountainError(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
value,
)))
}
}
/// [FountainCode](https://en.wikipedia.org/wiki/Fountain_code)
/// Chop a block of data into chunks and reassembling trait
#[async_trait::async_trait]
pub trait FountainCode {
type Settings;
/// Encode a block of data into a stream of chunks
fn encode(
block: &[u8],
settings: &Self::Settings,
) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin>;
/// Decode a stream of chunks into a block of data
async fn decode(
stream: impl Stream<Item = Bytes> + Send + Sync + Unpin,
settings: &Self::Settings,
) -> Result<Bytes, FountainError>;
}

View File

@ -0,0 +1,110 @@
// std
// crates
use bytes::Bytes;
use futures::{Stream, StreamExt};
use raptorq::{Decoder, Encoder, EncodingPacket, ObjectTransmissionInformation};
// internal
use crate::fountain::{FountainCode, FountainError};
/// [RaptorQ](https://en.wikipedia.org/wiki/Raptor_code#RaptorQ_code) implementation of [`FountainCode`] trait
pub struct RaptorQFountain;
/// Settings for [`RaptorQFountain`] code
pub struct RaptorQSettings {
pub transmission_information: ObjectTransmissionInformation,
pub repair_packets_per_block: u32,
}
/// RaptorQ implementation of [`FountainCode`] trait
/// Wrapper around the [`raptorq`](https://crates.io/crates/raptorq) crate
#[async_trait::async_trait]
impl FountainCode for RaptorQFountain {
type Settings = RaptorQSettings;
fn encode(
block: &[u8],
settings: &Self::Settings,
) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin> {
let encoder = Encoder::new(block, settings.transmission_information);
Box::new(futures::stream::iter(
encoder
.get_encoded_packets(settings.repair_packets_per_block)
.into_iter()
.map(|packet| packet.serialize().into()),
))
}
async fn decode(
mut stream: impl Stream<Item = Bytes> + Send + Sync + Unpin,
settings: &Self::Settings,
) -> Result<Bytes, FountainError> {
let mut decoder = Decoder::new(settings.transmission_information);
while let Some(chunk) = stream.next().await {
let packet = EncodingPacket::deserialize(&chunk);
if let Some(result) = decoder.decode(packet) {
return Ok(Bytes::from(result));
}
}
Err("Stream ended before decoding was complete".into())
}
}
#[cfg(test)]
mod test {
use crate::fountain::raptorq::RaptorQFountain;
use crate::fountain::{FountainCode, FountainError};
use bytes::Bytes;
use futures::StreamExt;
use rand::RngCore;
#[tokio::test]
async fn random_encode_decode() -> Result<(), FountainError> {
const TRANSFER_LENGTH: usize = 1024;
// build settings
let settings = super::RaptorQSettings {
transmission_information: raptorq::ObjectTransmissionInformation::with_defaults(
TRANSFER_LENGTH as u64,
1000,
),
repair_packets_per_block: 10,
};
// create random payload
let mut payload = [0u8; TRANSFER_LENGTH];
rand::thread_rng().fill_bytes(&mut payload);
let payload = Bytes::from(payload.to_vec());
// encode payload
let encoded = RaptorQFountain::encode(&payload, &settings);
// reconstruct
let decoded = RaptorQFountain::decode(encoded, &settings).await?;
assert_eq!(decoded, payload);
Ok(())
}
#[tokio::test]
async fn random_encode_decode_fails() {
const TRANSFER_LENGTH: usize = 1024;
// build settings
let settings = super::RaptorQSettings {
transmission_information: raptorq::ObjectTransmissionInformation::with_defaults(
TRANSFER_LENGTH as u64,
1000,
),
repair_packets_per_block: 10,
};
// create random payload
let mut payload = [0u8; TRANSFER_LENGTH];
rand::thread_rng().fill_bytes(&mut payload);
let payload = Bytes::from(payload.to_vec());
// encode payload
let encoded = RaptorQFountain::encode(&payload, &settings);
// reconstruct skipping packets, must fail
let decoded = RaptorQFountain::decode(encoded.skip(400), &settings).await;
assert!(decoded.is_err());
}
}

View File

@ -1,3 +1,4 @@
pub mod block;
pub mod crypto;
pub mod fountain;
pub mod staking;