feature(wire): Wrap errors (#950)

This commit is contained in:
Álex 2024-12-19 19:11:21 +01:00 committed by GitHub
parent 739df1737a
commit 1260230898
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 223 additions and 202 deletions

View File

@ -1,160 +0,0 @@
//! Serializer and Deserializer for wire formats.
// TODO: we're using bincode for now, but might need strong guarantees about
// the underlying format in the future for standardization.
use bincode::{
config::{
Bounded, DefaultOptions, FixintEncoding, LittleEndian, RejectTrailing, WithOtherEndian,
WithOtherIntEncoding, WithOtherLimit, WithOtherTrailing,
},
de::read::SliceReader,
Options,
};
use once_cell::sync::Lazy;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
pub type Error = bincode::Error;
pub type ErrorKind = bincode::ErrorKind;
// type composition is cool but also makes naming types a bit akward
type BincodeOptions = WithOtherTrailing<
WithOtherIntEncoding<
WithOtherLimit<WithOtherEndian<DefaultOptions, LittleEndian>, Bounded>,
FixintEncoding,
>,
RejectTrailing,
>;
// TODO: Risc0 proofs are HUGE (220 Kb) and it's the only reason we need to have this
// limit so large. Remove this once we transition to smaller proofs.
const DATA_LIMIT: u64 = 1 << 18; // Do not serialize/deserialize more than 256 KiB
static OPTIONS: Lazy<BincodeOptions> = Lazy::new(|| {
bincode::DefaultOptions::new()
.with_little_endian()
.with_limit(DATA_LIMIT)
.with_fixint_encoding()
.reject_trailing_bytes()
});
type BincodeDeserializer<'de> = bincode::Deserializer<SliceReader<'de>, BincodeOptions>;
type BincodeSerializer<T> = bincode::Serializer<T, BincodeOptions>;
pub struct Deserializer<'de> {
inner: BincodeDeserializer<'de>,
}
pub struct Serializer<T> {
inner: BincodeSerializer<T>,
}
impl<'de> Deserializer<'de> {
pub fn get_deserializer(&mut self) -> impl serde::Deserializer<'de> + '_ {
&mut self.inner
}
pub fn deserialize<T: Deserialize<'de>>(&mut self) -> Result<T, Error> {
<T>::deserialize(&mut self.inner)
}
}
impl<T: std::io::Write> Serializer<T> {
pub fn get_serializer(&mut self) -> impl serde::Serializer + '_ {
&mut self.inner
}
pub fn serialize_into<U: Serialize>(
&mut self,
item: &U,
) -> Result<<&mut BincodeSerializer<T> as serde::Serializer>::Ok, Error> {
item.serialize(&mut self.inner)
}
}
/// Return a deserializer for wire format
///
/// We only operator on in-memory slices as to abstract
/// any underlying protocol. See https://sans-io.readthedocs.io/how-to-sans-io.html
pub fn deserializer(data: &[u8]) -> Deserializer<'_> {
Deserializer {
inner: bincode::de::Deserializer::from_slice(data, *OPTIONS),
}
}
/// Return a serializer for wire format.
///
/// We only operator on in-memory slices as to abstract
/// any underlying protocol. See https://sans-io.readthedocs.io/how-to-sans-io.html
pub fn serializer(buffer: &mut Vec<u8>) -> Serializer<&'_ mut Vec<u8>> {
Serializer {
inner: bincode::Serializer::new(buffer, *OPTIONS),
}
}
/// Return a serializer for wire format that overwrites (but now grow) the provided
/// buffer.
///
/// We only operator on in-memory slices as to abstract
/// any underlying protocol. See https://sans-io.readthedocs.io/how-to-sans-io.html
pub fn serializer_into_buffer(buffer: &mut [u8]) -> Serializer<&'_ mut [u8]> {
Serializer {
inner: bincode::Serializer::new(buffer, *OPTIONS),
}
}
/// Serialize an object directly into a vec
pub fn serialize<T: Serialize>(item: &T) -> Result<Vec<u8>, Error> {
let size = OPTIONS.serialized_size(item)?;
let mut buf = Vec::with_capacity(size as usize);
serializer(&mut buf).serialize_into(item)?;
Ok(buf)
}
/// Deserialize an object directly
pub fn deserialize<T: DeserializeOwned>(item: &[u8]) -> Result<T, Error> {
deserializer(item).deserialize()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn ser_de() {
let tmp = String::from("much wow, very cool");
let mut buf = Vec::new();
serializer(&mut buf).serialize_into(&tmp).unwrap();
let deserialized = deserializer(&buf).deserialize::<String>().unwrap();
assert_eq!(tmp, deserialized);
}
#[test]
fn ser_de_slice() {
let tmp = String::from("much wow, very cool");
let mut buf = vec![0; 1024];
serializer_into_buffer(&mut buf)
.serialize_into(&tmp)
.unwrap();
let deserialized = deserializer(&buf).deserialize::<String>().unwrap();
assert_eq!(tmp, deserialized);
}
#[test]
fn ser_de_owned() {
let tmp = String::from("much wow, very cool");
let serialized = serialize(&tmp).unwrap();
let deserialized = deserializer(&serialized).deserialize::<String>().unwrap();
assert_eq!(tmp, deserialized);
}
#[test]
fn ser_de_inner() {
let tmp = String::from("much wow, very cool");
let mut buf = Vec::new();
let mut serializer = serializer(&mut buf);
tmp.serialize(serializer.get_serializer()).unwrap();
let mut deserializer = deserializer(&buf);
let deserialized = <String>::deserialize(deserializer.get_deserializer()).unwrap();
assert_eq!(tmp, deserialized);
}
}

View File

@ -0,0 +1,46 @@
// Crates
use bincode::config::{
Bounded, FixintEncoding, LittleEndian, RejectTrailing, WithOtherEndian, WithOtherIntEncoding,
WithOtherLimit, WithOtherTrailing,
};
use bincode::de::read::SliceReader;
use bincode::Options;
use once_cell::sync::Lazy;
// Type composition is cool but also makes naming types a bit awkward
pub(crate) type BincodeOptions = WithOtherTrailing<
WithOtherIntEncoding<
WithOtherLimit<WithOtherEndian<bincode::DefaultOptions, LittleEndian>, Bounded>,
FixintEncoding,
>,
RejectTrailing,
>;
// TODO: Remove this once we transition to smaller proofs
// Risc0 proofs are HUGE (220 Kb) and it's the only reason we need to have this limit so large
pub(crate) const DATA_LIMIT: u64 = 1 << 18; // Do not serialize/deserialize more than 256 KiB
pub(crate) static OPTIONS: Lazy<BincodeOptions> = Lazy::new(|| {
bincode::DefaultOptions::new()
.with_little_endian()
.with_limit(DATA_LIMIT)
.with_fixint_encoding()
.reject_trailing_bytes()
});
pub(crate) type BincodeDeserializer<'de> = bincode::Deserializer<SliceReader<'de>, BincodeOptions>;
pub(crate) type BincodeSerializer<T> = bincode::Serializer<T, BincodeOptions>;
pub(crate) fn clone_bincode_error(error: &bincode::Error) -> bincode::Error {
use bincode::ErrorKind;
Box::new(match error.as_ref() {
ErrorKind::Io(error) => ErrorKind::Io(std::io::Error::new(error.kind(), error.to_string())),
ErrorKind::InvalidUtf8Encoding(error) => ErrorKind::InvalidUtf8Encoding(*error),
ErrorKind::InvalidBoolEncoding(bool) => ErrorKind::InvalidBoolEncoding(*bool),
ErrorKind::InvalidCharEncoding => ErrorKind::InvalidCharEncoding,
ErrorKind::InvalidTagEncoding(tag) => ErrorKind::InvalidTagEncoding(*tag),
ErrorKind::DeserializeAnyNotSupported => ErrorKind::DeserializeAnyNotSupported,
ErrorKind::SizeLimit => ErrorKind::SizeLimit,
ErrorKind::SequenceMustHaveLength => ErrorKind::SequenceMustHaveLength,
ErrorKind::Custom(custom) => ErrorKind::Custom(custom.clone()),
})
}

View File

@ -0,0 +1,33 @@
// STD
// Crates
use serde::de::DeserializeOwned;
use serde::Deserialize;
// Internal
use crate::wire::bincode::{BincodeDeserializer, OPTIONS};
use crate::wire::Error;
pub struct Deserializer<'de> {
inner: BincodeDeserializer<'de>,
}
impl<'de> Deserializer<'de> {
pub fn get_deserializer(&mut self) -> impl serde::Deserializer<'de> + '_ {
&mut self.inner
}
pub fn deserialize<T: Deserialize<'de>>(&mut self) -> crate::wire::Result<T> {
<T>::deserialize(&mut self.inner).map_err(Error::Deserialize)
}
}
/// Return a deserializer for wire format
///
/// We only operator on in-memory slices as to abstract
/// any underlying protocol. See https://sans-io.readthedocs.io/how-to-sans-io.html
pub fn deserializer(data: &[u8]) -> Deserializer<'_> {
Deserializer {
inner: bincode::de::Deserializer::from_slice(data, *OPTIONS),
}
}
/// Deserialize an object directly
pub fn deserialize<T: DeserializeOwned>(item: &[u8]) -> crate::wire::Result<T> {
deserializer(item).deserialize()
}

View File

@ -0,0 +1,26 @@
//STD
use crate::wire::bincode::clone_bincode_error;
use std::io;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Failed to serialize message: {0}")]
Serialize(bincode::Error),
#[error("Failed to deserialize message: {0}")]
Deserialize(bincode::Error),
}
impl From<Error> for io::Error {
fn from(value: Error) -> Self {
io::Error::new(io::ErrorKind::InvalidData, value)
}
}
impl Clone for Error {
fn clone(&self) -> Self {
match self {
Error::Serialize(error) => Error::Serialize(clone_bincode_error(error)),
Error::Deserialize(error) => Error::Deserialize(clone_bincode_error(error)),
}
}
}

View File

@ -0,0 +1,58 @@
//! Serializer for wire formats.
// TODO: we're using bincode for now, but might need strong guarantees about
// the underlying format in the future for standardization.
// Internals
pub(crate) mod bincode;
pub mod deserialization;
pub mod errors;
pub mod serialization;
// Exports
pub use deserialization::{deserialize, deserializer};
pub use errors::Error;
pub use serialization::{serialize, serializer, serializer_into_buffer};
pub type Result<T> = std::result::Result<T, Error>;
#[cfg(test)]
mod tests {
use super::*;
use serde::{Deserialize, Serialize};
#[test]
fn serialize_deserialize() {
let tmp = String::from("much wow, very cool");
let mut buf = Vec::new();
serializer(&mut buf).serialize_into(&tmp).unwrap();
let deserialized = deserializer(&buf).deserialize::<String>().unwrap();
assert_eq!(tmp, deserialized);
}
#[test]
fn serialize_deserialize_slice() {
let tmp = String::from("much wow, very cool");
let mut buf = vec![0; 1024];
serializer_into_buffer(&mut buf)
.serialize_into(&tmp)
.unwrap();
let deserialized = deserializer(&buf).deserialize::<String>().unwrap();
assert_eq!(tmp, deserialized);
}
#[test]
fn serialize_deserialize_owned() {
let tmp = String::from("much wow, very cool");
let serialized = serialize(&tmp).unwrap();
let deserialized = deserializer(&serialized).deserialize::<String>().unwrap();
assert_eq!(tmp, deserialized);
}
#[test]
fn serialize_deserialize_inner() {
let tmp = String::from("much wow, very cool");
let mut buf = Vec::new();
let mut serializer = serializer(&mut buf);
tmp.serialize(serializer.get_serializer()).unwrap();
let mut deserializer = deserializer(&buf);
let deserialized = <String>::deserialize(deserializer.get_deserializer()).unwrap();
assert_eq!(tmp, deserialized);
}
}

View File

@ -0,0 +1,53 @@
use bincode::Options;
// STD
// Crates
use serde::Serialize;
// Internal
use crate::wire::bincode::{BincodeSerializer, OPTIONS};
use crate::wire::{Error, Result};
pub struct Serializer<T> {
inner: BincodeSerializer<T>,
}
impl<T: std::io::Write> Serializer<T> {
pub fn get_serializer(&mut self) -> impl serde::Serializer + '_ {
&mut self.inner
}
pub fn serialize_into<U: Serialize>(
&mut self,
item: &U,
) -> Result<<&mut BincodeSerializer<T> as serde::Serializer>::Ok> {
item.serialize(&mut self.inner).map_err(Error::Serialize)
}
}
/// Return a serializer for wire format.
///
/// We only operator on in-memory slices as to abstract
/// any underlying protocol. See https://sans-io.readthedocs.io/how-to-sans-io.html
pub fn serializer(buffer: &mut Vec<u8>) -> Serializer<&'_ mut Vec<u8>> {
Serializer {
inner: bincode::Serializer::new(buffer, *OPTIONS),
}
}
/// Return a serializer for wire format that overwrites (but now grow) the provided
/// buffer.
///
/// We only operator on in-memory slices as to abstract
/// any underlying protocol. See https://sans-io.readthedocs.io/how-to-sans-io.html
pub fn serializer_into_buffer(buffer: &mut [u8]) -> Serializer<&'_ mut [u8]> {
Serializer {
inner: bincode::Serializer::new(buffer, *OPTIONS),
}
}
/// Serialize an object directly into a vec
pub fn serialize<T: Serialize>(item: &T) -> Result<Vec<u8>> {
let size = OPTIONS.serialized_size(item).map_err(Error::Serialize)?;
let mut buf = Vec::with_capacity(size as usize);
serializer(&mut buf).serialize_into(item)?;
Ok(buf)
}

View File

@ -29,3 +29,4 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
libp2p = { version = "0.53", features = ["ed25519", "ping", "macros", "quic", "tcp", "yamux", "noise"] } libp2p = { version = "0.53", features = ["ed25519", "ping", "macros", "quic", "tcp", "yamux", "noise"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] }
kzgrs-backend = { path = "../../kzgrs-backend", features = ["testutils"] } kzgrs-backend = { path = "../../kzgrs-backend", features = ["testutils"] }
blake2 = "0.10"

View File

@ -25,7 +25,6 @@ use tracing::error;
// internal // internal
use crate::address_book::AddressBook; use crate::address_book::AddressBook;
use crate::protocol::DISPERSAL_PROTOCOL; use crate::protocol::DISPERSAL_PROTOCOL;
use crate::protocols::clone_deserialize_error;
use crate::SubnetworkId; use crate::SubnetworkId;
use kzgrs_backend::common::blob::DaBlob; use kzgrs_backend::common::blob::DaBlob;
use nomos_core::da::BlobId; use nomos_core::da::BlobId;
@ -101,7 +100,7 @@ impl Clone for DispersalError {
blob_id, blob_id,
subnetwork_id, subnetwork_id,
} => DispersalError::Serialization { } => DispersalError::Serialization {
error: clone_deserialize_error(error), error: error.clone(),
blob_id: *blob_id, blob_id: *blob_id,
subnetwork_id: *subnetwork_id, subnetwork_id: *subnetwork_id,
}, },

View File

@ -1,19 +1,3 @@
pub mod dispersal; pub mod dispersal;
pub mod replication; pub mod replication;
pub mod sampling; pub mod sampling;
use nomos_core::wire;
fn clone_deserialize_error(error: &wire::Error) -> wire::Error {
use wire::ErrorKind;
Box::new(match error.as_ref() {
ErrorKind::Io(error) => ErrorKind::Io(std::io::Error::new(error.kind(), error.to_string())),
ErrorKind::InvalidUtf8Encoding(error) => ErrorKind::InvalidUtf8Encoding(*error),
ErrorKind::InvalidBoolEncoding(bool) => ErrorKind::InvalidBoolEncoding(*bool),
ErrorKind::InvalidCharEncoding => ErrorKind::InvalidCharEncoding,
ErrorKind::InvalidTagEncoding(tag) => ErrorKind::InvalidTagEncoding(*tag),
ErrorKind::DeserializeAnyNotSupported => ErrorKind::DeserializeAnyNotSupported,
ErrorKind::SizeLimit => ErrorKind::SizeLimit,
ErrorKind::SequenceMustHaveLength => ErrorKind::SequenceMustHaveLength,
ErrorKind::Custom(custom) => ErrorKind::Custom(custom.clone()),
})
}

View File

@ -6,6 +6,7 @@ mod test {
use crate::protocols::replication::behaviour::{ReplicationBehaviour, ReplicationEvent}; use crate::protocols::replication::behaviour::{ReplicationBehaviour, ReplicationEvent};
use crate::protocols::replication::handler::DaMessage; use crate::protocols::replication::handler::DaMessage;
use crate::test_utils::AllNeighbours; use crate::test_utils::AllNeighbours;
use blake2::{Blake2s256, Digest};
use futures::StreamExt; use futures::StreamExt;
use kzgrs_backend::testutils::get_da_blob; use kzgrs_backend::testutils::get_da_blob;
@ -95,12 +96,7 @@ mod test {
tokio::select! { tokio::select! {
// send a message everytime that the channel ticks // send a message everytime that the channel ticks
_ = receiver.recv() => { _ = receiver.recv() => {
// let blob_id_bytes: [u8; 32] = i.to_be_bytes().to_vec().try_into().unwrap(); let blob_id_bytes = Blake2s256::digest(i.to_be_bytes().as_slice());
let mut blob_id_bytes = [0; 32];
let b = i.to_be_bytes();
assert!(b.len() <= blob_id_bytes.len());
blob_id_bytes[0..b.len()].copy_from_slice(&b);
assert_eq!(blob_id_bytes.len(), 32); assert_eq!(blob_id_bytes.len(), 32);
let blob = Blob::new( let blob = Blob::new(

View File

@ -31,7 +31,6 @@ use tracing::error;
// internal // internal
use crate::address_book::AddressBook; use crate::address_book::AddressBook;
use crate::protocol::SAMPLING_PROTOCOL; use crate::protocol::SAMPLING_PROTOCOL;
use crate::protocols::clone_deserialize_error;
use crate::SubnetworkId; use crate::SubnetworkId;
#[derive(Debug, Error)] #[derive(Debug, Error)]
@ -134,7 +133,7 @@ impl Clone for SamplingError {
blob_id: *blob_id, blob_id: *blob_id,
subnetwork_id: *subnetwork_id, subnetwork_id: *subnetwork_id,
peer_id: *peer_id, peer_id: *peer_id,
error: clone_deserialize_error(error), error: error.clone(),
}, },
SamplingError::RequestChannel { request, peer_id } => SamplingError::RequestChannel { SamplingError::RequestChannel { request, peer_id } => SamplingError::RequestChannel {
request: request.clone(), request: request.clone(),

View File

@ -12,20 +12,6 @@ type LenType = u16;
const MAX_MSG_LEN_BYTES: usize = size_of::<LenType>(); const MAX_MSG_LEN_BYTES: usize = size_of::<LenType>();
const MAX_MSG_LEN: usize = 1 << (MAX_MSG_LEN_BYTES * 8); const MAX_MSG_LEN: usize = 1 << (MAX_MSG_LEN_BYTES * 8);
fn into_failed_to_serialize(error: wire::Error) -> io::Error {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Failed to serialize message: {}", error),
)
}
fn into_failed_to_deserialize(error: wire::Error) -> io::Error {
io::Error::new(
io::ErrorKind::InvalidData,
format!("Failed to deserialize message: {}", error),
)
}
struct MessageTooLargeError(usize); struct MessageTooLargeError(usize);
impl From<MessageTooLargeError> for io::Error { impl From<MessageTooLargeError> for io::Error {
@ -44,7 +30,7 @@ pub fn pack<Message>(message: &Message) -> Result<Vec<u8>>
where where
Message: Serialize, Message: Serialize,
{ {
wire::serialize(message).map_err(into_failed_to_serialize) wire::serialize(message).map_err(io::Error::from)
} }
fn get_packed_message_size(packed_message: &[u8]) -> Result<usize> { fn get_packed_message_size(packed_message: &[u8]) -> Result<usize> {
@ -84,7 +70,7 @@ where
} }
pub fn unpack<M: DeserializeOwned>(data: &[u8]) -> Result<M> { pub fn unpack<M: DeserializeOwned>(data: &[u8]) -> Result<M> {
wire::deserialize(data).map_err(into_failed_to_deserialize) wire::deserialize(data).map_err(io::Error::from)
} }
pub async fn unpack_from_reader<Message, R>(reader: &mut R) -> Result<Message> pub async fn unpack_from_reader<Message, R>(reader: &mut R) -> Result<Message>