feature(da): Rust-based protobuf-like protocol (#945)

parent 0e273c3f44
commit d5d87b094c
@@ -35,7 +35,7 @@ fn verify<const SIZE: usize>(bencher: Bencher, column_size: usize) {
     let sk = SecretKey::key_gen(&buff, &[]).unwrap();
     let verifier = DaVerifier::new(
         sk.clone(),
-        (0..column_size as u32).collect(),
+        (0..column_size as u16).collect(),
         GLOBAL_PARAMETERS.clone(),
     );
     let da_blob = DaBlob {

@@ -13,7 +13,7 @@ use crate::common::{
     deserialize_canonical, deserialize_vec_canonical, serialize_canonical, serialize_vec_canonical,
 };

-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
 pub struct DaBlob {
     pub column: Column,
     pub column_idx: ColumnIndex,

@@ -54,6 +54,10 @@ impl DaBlob {
         hasher.update(self.column.as_bytes());
         hasher.finalize().as_slice().to_vec()
     }

+    pub fn column_len(&self) -> usize {
+        self.column.as_bytes().len()
+    }
+
 }

 impl blob::Blob for DaBlob {
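The new `column_len` accessor lets handlers report a blob's payload size without serializing it first; the dispersal and replication event hunks further down switch their `blob_size` implementations to it. A minimal sketch of the call shape, assuming a `DaBlob` built as elsewhere in this diff:

```rust
use kzgrs_backend::common::blob::DaBlob;

// blob_size-style reporting now reads the column bytes directly
// instead of measuring a bincode-serialized Vec<u8>.
fn report_size(blob: &DaBlob) -> Option<usize> {
    Some(blob.column_len())
}
```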
@@ -16,7 +16,7 @@ use kzgrs::Commitment;
 #[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
 pub struct Chunk(pub Vec<u8>);
 pub struct Row(pub Vec<Chunk>);
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
 pub struct Column(pub Vec<Chunk>);
 pub struct ChunksMatrix(pub Vec<Row>);
 pub type ColumnIndex = u16;

@@ -161,7 +161,7 @@ mod tests {
             .into_iter()
             .enumerate()
             .map(|(index, sk)| {
-                DaVerifier::new(sk, [index as u32].into(), GLOBAL_PARAMETERS.clone())
+                DaVerifier::new(sk, [index as u16].into(), GLOBAL_PARAMETERS.clone())
             })
             .collect();

@@ -18,12 +18,12 @@ use crate::encoder::DaEncoderParams;
 pub struct DaVerifier {
     // TODO: substitute this for an abstraction to sign things over
     pub sk: SecretKey,
-    pub index: HashSet<u32>,
+    pub index: HashSet<u16>,
     global_parameters: GlobalParameters,
 }

 impl DaVerifier {
-    pub fn new(sk: SecretKey, index: HashSet<u32>, global_parameters: GlobalParameters) -> Self {
+    pub fn new(sk: SecretKey, index: HashSet<u16>, global_parameters: GlobalParameters) -> Self {
         Self {
             sk,
             index,

@@ -118,7 +118,7 @@ impl DaVerifier {
     pub fn verify(&self, blob: &DaBlob, rows_domain_size: usize) -> bool {
         let rows_domain = PolynomialEvaluationDomain::new(rows_domain_size)
             .expect("Domain should be able to build");
-        let blob_col_idx = &u16::from_be_bytes(blob.column_idx()).into();
+        let blob_col_idx = &u16::from_be_bytes(blob.column_idx());
         let index = self.index.get(blob_col_idx).unwrap();

         let is_column_verified = DaVerifier::verify_column(

@@ -380,7 +380,7 @@ mod test {
             .into_iter()
             .enumerate()
             .map(|(index, sk)| {
-                DaVerifier::new(sk, [index as u32].into(), GLOBAL_PARAMETERS.clone())
+                DaVerifier::new(sk, [index as u16].into(), GLOBAL_PARAMETERS.clone())
             })
             .collect();
         let encoded_data = encoder.encode(&data).unwrap();

@@ -6,5 +6,5 @@ pub mod swarm;
 #[cfg(test)]
 pub mod test_utils;

-pub type SubnetworkId = u32;
+pub type SubnetworkId = u16;
 pub use libp2p::PeerId;
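Narrowing `SubnetworkId` from u32 to u16 aligns it with `ColumnIndex` (changed to u16 in the kzgrs-backend hunks above), so column indices and subnetwork ids interconvert without lossy casts; a small sketch of the invariant this buys:

```rust
// After this change both aliases are 16-bit, so an index can serve as both.
type ColumnIndex = u16; // kzgrs-backend
type SubnetworkId = u16; // nomos-da-network-core

fn column_for_subnetwork(id: SubnetworkId) -> ColumnIndex {
    id // no `as` cast or TryFrom needed once the widths agree
}
```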
@@ -5,7 +5,7 @@ use std::task::{Context, Poll};
 use either::Either;
 use futures::future::BoxFuture;
 use futures::stream::{BoxStream, FuturesUnordered};
-use futures::{AsyncWriteExt, FutureExt, StreamExt};
+use futures::{AsyncWriteExt, FutureExt, StreamExt, TryFutureExt};
 use libp2p::core::Endpoint;
 use libp2p::swarm::behaviour::{ConnectionClosed, ConnectionEstablished};
 use libp2p::swarm::dial_opts::DialOpts;

@@ -30,9 +30,8 @@ use crate::SubnetworkId;
 use kzgrs_backend::common::blob::DaBlob;
 use nomos_core::da::BlobId;
 use nomos_da_messages::common::Blob;
-use nomos_da_messages::dispersal::dispersal_res::MessageType;
-use nomos_da_messages::dispersal::{DispersalErr, DispersalReq, DispersalRes};
-use nomos_da_messages::{pack_message, unpack_from_reader};
+use nomos_da_messages::dispersal;
+use nomos_da_messages::packing::{pack_to_writer, unpack_from_reader};
 use subnetworks_assignations::MembershipHandler;

 #[derive(Debug, Error)]

@@ -52,7 +51,7 @@ pub enum DispersalError {
     #[error("Dispersal response error: {error:?}")]
     Protocol {
         subnetwork_id: SubnetworkId,
-        error: DispersalErr,
+        error: dispersal::DispersalError,
     },
     #[error("Error dialing peer [{peer_id}]: {error}")]
     OpenStreamError {

@@ -67,9 +66,9 @@ impl DispersalError {
             DispersalError::Io { blob_id, .. } => Some(*blob_id),
             DispersalError::Serialization { blob_id, .. } => Some(*blob_id),
             DispersalError::Protocol {
-                error: DispersalErr { blob_id, .. },
+                error: dispersal::DispersalError { blob_id, .. },
                 ..
-            } => Some(blob_id.clone().try_into().unwrap()),
+            } => Some(*blob_id),
             DispersalError::OpenStreamError { .. } => None,
         }
     }

@@ -147,7 +146,12 @@ struct DispersalStream {
     peer_id: PeerId,
 }

-type StreamHandlerFutureSuccess = (BlobId, SubnetworkId, DispersalRes, DispersalStream);
+type StreamHandlerFutureSuccess = (
+    BlobId,
+    SubnetworkId,
+    dispersal::DispersalResponse,
+    DispersalStream,
+);
 type StreamHandlerFuture = BoxFuture<'static, Result<StreamHandlerFutureSuccess, DispersalError>>;

 /// Executor dispersal protocol

@@ -260,52 +264,33 @@ where
         subnetwork_id: SubnetworkId,
     ) -> Result<StreamHandlerFutureSuccess, DispersalError> {
         let blob_id = message.id();
-        let blob = bincode::serialize(&message).map_err(|error| DispersalError::Serialization {
-            error,
-            blob_id: blob_id.clone().try_into().unwrap(),
-            subnetwork_id,
-        })?;
-        let message = DispersalReq {
-            blob: Some(Blob {
-                blob_id: blob_id.clone(),
-                data: blob,
-            }),
-            subnetwork_id,
-        };
-        stream
-            .stream
-            .write_all(&pack_message(&message).map_err(|error| DispersalError::Io {
-                error,
-                blob_id: blob_id.clone().try_into().unwrap(),
-                subnetwork_id,
-            })?)
-            .await
+        let blob_id: BlobId = blob_id.clone().try_into().unwrap();
+        let message = dispersal::DispersalRequest::new(Blob::new(blob_id, message), subnetwork_id);
+        pack_to_writer(&message, &mut stream.stream)
             .map_err(|error| DispersalError::Io {
                 error,
-                blob_id: blob_id.clone().try_into().unwrap(),
+                blob_id,
                 subnetwork_id,
-            })?;
+            })
+            .await?;
         stream
             .stream
             .flush()
             .await
             .map_err(|error| DispersalError::Io {
                 error,
-                blob_id: blob_id.clone().try_into().unwrap(),
+                blob_id,
                 subnetwork_id,
             })?;
-        let response: DispersalRes =
-            unpack_from_reader(&mut stream.stream)
+        let response: dispersal::DispersalResponse = unpack_from_reader(&mut stream.stream)
             .await
             .map_err(|error| DispersalError::Io {
                 error,
-                blob_id: blob_id.clone().try_into().unwrap(),
+                blob_id,
                 subnetwork_id,
             })?;
-        // Safety: blob_id should always be a 32bytes hash, currently is abstracted into a `Vec<u8>`
-        // but probably we should have a `[u8; 32]` wrapped in a custom type `BlobId`
-        // TODO: use blob_id when changing types to [u8; 32]
-        Ok((blob_id.try_into().unwrap(), subnetwork_id, response, stream))
+        // Safety: blob_id should always be a 32bytes hash
+        Ok((blob_id, subnetwork_id, response, stream))
     }

     /// Run when a stream gets free, if there is a pending task for the stream it will get scheduled to run
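A side note on the rewritten send path: `.map_err(...)` is called on the future returned by `pack_to_writer` before `.await`, which is what the added `futures::TryFutureExt` import provides. A minimal sketch of the pattern, with a hypothetical error type:

```rust
use futures::TryFutureExt;

async fn write_packed() -> Result<(), std::io::Error> {
    Ok(()) // stand-in for pack_to_writer(&message, &mut stream)
}

async fn demo() -> Result<(), String> {
    // TryFutureExt::map_err converts the future's error type up front,
    // so the call site can finish with a plain `.await?`.
    write_packed().map_err(|e| format!("io: {e}")).await?;
    Ok(())
}
```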
@@ -571,10 +556,7 @@ impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> Netw
                 // handle the free stream then return the success
                 Self::handle_stream(tasks, to_disperse, idle_streams, stream);
                 // return an error if there was an error on the other side of the wire
-                if let DispersalRes {
-                    message_type: Some(MessageType::Err(error)),
-                } = dispersal_response
-                {
+                if let dispersal::DispersalResponse::Error(error) = dispersal_response {
                     return Poll::Ready(ToSwarm::GenerateEvent(
                         DispersalExecutorEvent::DispersalError {
                             error: DispersalError::Protocol {

@@ -9,6 +9,7 @@ pub mod test {
         DispersalEvent, DispersalValidatorBehaviour,
     };
     use crate::test_utils::AllNeighbours;
+    use crate::SubnetworkId;
     use futures::StreamExt;
     use kzgrs_backend::common::blob::DaBlob;
     use kzgrs_backend::common::Column;

@@ -23,9 +24,9 @@ pub mod test {
     pub fn executor_swarm(
         addressbook: AddressBook,
         key: Keypair,
-        membership: impl MembershipHandler<NetworkId = u32, Id = PeerId> + 'static,
+        membership: impl MembershipHandler<NetworkId = SubnetworkId, Id = PeerId> + 'static,
     ) -> libp2p::Swarm<
-        DispersalExecutorBehaviour<impl MembershipHandler<NetworkId = u32, Id = PeerId>>,
+        DispersalExecutorBehaviour<impl MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>>,
     > {
         let peer_id = PeerId::from_public_key(&key.public());
         libp2p::SwarmBuilder::with_existing_identity(key)

@@ -44,9 +45,9 @@ pub mod test {

     pub fn validator_swarm(
         key: Keypair,
-        membership: impl MembershipHandler<NetworkId = u32, Id = PeerId> + 'static,
+        membership: impl MembershipHandler<NetworkId = SubnetworkId, Id = PeerId> + 'static,
     ) -> libp2p::Swarm<
-        DispersalValidatorBehaviour<impl MembershipHandler<NetworkId = u32, Id = PeerId>>,
+        DispersalValidatorBehaviour<impl MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>>,
     > {
         libp2p::SwarmBuilder::with_existing_identity(key)
             .with_tokio()

@@ -12,9 +12,8 @@ use libp2p::swarm::{
 use libp2p::{Multiaddr, PeerId, Stream};
 use libp2p_stream::IncomingStreams;
 use log::debug;
-use nomos_da_messages::dispersal::dispersal_res::MessageType;
-use nomos_da_messages::dispersal::{DispersalReq, DispersalRes};
-use nomos_da_messages::{pack_message, unpack_from_reader};
+use nomos_da_messages::dispersal;
+use nomos_da_messages::packing::{pack_to_writer, unpack_from_reader};
 use std::io::Error;
 use std::task::{Context, Poll};
 use subnetworks_assignations::MembershipHandler;

@@ -22,15 +21,15 @@ use subnetworks_assignations::MembershipHandler;
 #[derive(Debug)]
 pub enum DispersalEvent {
     /// Received a n
-    IncomingMessage { message: DispersalReq },
+    IncomingMessage {
+        message: dispersal::DispersalRequest,
+    },
 }

 impl DispersalEvent {
     pub fn blob_size(&self) -> Option<usize> {
         match self {
-            DispersalEvent::IncomingMessage { message } => {
-                message.blob.as_ref().map(|blob| blob.data.len())
-            }
+            DispersalEvent::IncomingMessage { message } => Some(message.blob.data.column_len()),
         }
     }
 }

@@ -38,7 +37,8 @@ impl DispersalEvent {
 pub struct DispersalValidatorBehaviour<Membership> {
     stream_behaviour: libp2p_stream::Behaviour,
     incoming_streams: IncomingStreams,
-    tasks: FuturesUnordered<BoxFuture<'static, Result<(DispersalReq, Stream), Error>>>,
+    tasks:
+        FuturesUnordered<BoxFuture<'static, Result<(dispersal::DispersalRequest, Stream), Error>>>,
     membership: Membership,
 }

@@ -65,14 +65,13 @@ impl<Membership: MembershipHandler> DispersalValidatorBehaviour<Membership> {
     /// Stream handling messages task.
     /// This task handles a single message receive. Then it writes up the acknowledgment into the same
     /// stream as response and finish.
-    async fn handle_new_stream(mut stream: Stream) -> Result<(DispersalReq, Stream), Error> {
-        let message: DispersalReq = unpack_from_reader(&mut stream).await?;
-        let blob_id = message.blob.clone().unwrap().blob_id;
-        let response = DispersalRes {
-            message_type: Some(MessageType::BlobId(blob_id)),
-        };
-        let message_bytes = pack_message(&response)?;
-        stream.write_all(&message_bytes).await?;
+    async fn handle_new_stream(
+        mut stream: Stream,
+    ) -> Result<(dispersal::DispersalRequest, Stream), Error> {
+        let message: dispersal::DispersalRequest = unpack_from_reader(&mut stream).await?;
+        let blob_id = message.blob.blob_id;
+        let response = dispersal::DispersalResponse::BlobId(blob_id);
+        pack_to_writer(&response, &mut stream).await?;
         stream.flush().await?;
         Ok((message, stream))
     }

@@ -34,7 +34,7 @@ impl ReplicationEvent {
     pub fn blob_size(&self) -> Option<usize> {
         match self {
             ReplicationEvent::IncomingMessage { message, .. } => {
-                message.blob.as_ref().map(|blob| blob.data.len())
+                Some(message.blob.data.column_len())
             }
         }
     }

@@ -104,10 +104,7 @@ where
     }

     fn replicate_message(&mut self, message: DaMessage) {
-        let message_id = (
-            message.blob.as_ref().unwrap().blob_id.clone(),
-            message.subnetwork_id,
-        );
+        let message_id = (message.blob.blob_id.to_vec(), message.subnetwork_id);
         if self.seen_message_cache.contains(&message_id) {
             return;
         }

@@ -246,12 +243,52 @@ where
 mod tests {
     use super::*;
     use futures::task::{waker_ref, ArcWake};
+    use kzgrs_backend::common::blob::DaBlob;
+    use kzgrs_backend::encoder;
+    use kzgrs_backend::encoder::DaEncoderParams;
     use libp2p::{identity, PeerId};
+    use nomos_core::da::{BlobId, DaEncoder};
     use nomos_da_messages::common::Blob;
     use std::collections::HashSet;
     use std::sync::Arc;
     use std::task::{Context, Poll};

+    fn get_encoder() -> encoder::DaEncoder {
+        const DOMAIN_SIZE: usize = 16;
+        let params = DaEncoderParams::default_with(DOMAIN_SIZE);
+        encoder::DaEncoder::new(params)
+    }
+
+    fn get_da_blob() -> DaBlob {
+        let encoder = get_encoder();
+        let data = vec![
+            49u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
+            0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
+        ];
+
+        let encoded_data = encoder.encode(&data).unwrap();
+        let columns: Vec<_> = encoded_data.extended_data.columns().collect();
+
+        let index = 0;
+        let da_blob = DaBlob {
+            column: columns[index].clone(),
+            column_idx: index
+                .try_into()
+                .expect("Column index shouldn't overflow the target type"),
+            column_commitment: encoded_data.column_commitments[index],
+            aggregated_column_commitment: encoded_data.aggregated_column_commitment,
+            aggregated_column_proof: encoded_data.aggregated_column_proofs[index],
+            rows_commitments: encoded_data.row_commitments.clone(),
+            rows_proofs: encoded_data
+                .rows_proofs
+                .iter()
+                .map(|proofs| proofs.get(index).cloned().unwrap())
+                .collect(),
+        };
+
+        da_blob
+    }
+
     #[derive(Clone, Debug)]
     struct MockMembershipHandler {
         membership: HashMap<PeerId, HashSet<SubnetworkId>>,

@@ -295,7 +332,7 @@ mod tests {

     fn create_replication_behaviours(
         num_instances: usize,
-        subnet_id: u32,
+        subnetwork_id: SubnetworkId,
         membership: &mut HashMap<PeerId, HashSet<SubnetworkId>>,
     ) -> Vec<ReplicationBehaviour<MockMembershipHandler>> {
         let mut behaviours = Vec::new();

@@ -308,7 +345,7 @@ mod tests {
         }

         for peer_id in &peer_ids {
-            membership.insert(*peer_id, HashSet::from([subnet_id]));
+            membership.insert(*peer_id, HashSet::from([subnetwork_id]));
         }

         let membership_handler = MockMembershipHandler {

@@ -397,13 +434,7 @@ mod tests {
         }

         // Simulate sending a message from the first behavior.
-        let message = DaMessage {
-            blob: Some(Blob {
-                blob_id: vec![1, 2, 3],
-                data: vec![4, 5, 6],
-            }),
-            subnetwork_id: 0,
-        };
+        let message = DaMessage::new(Blob::new(BlobId::from([0; 32]), get_da_blob()), 0);
         all_behaviours[0].replicate_message(message.clone());

         let waker = Arc::new(TestWaker);

@@ -473,7 +504,7 @@ mod tests {
         for behaviour in &subnet_0_behaviours {
             assert!(behaviour
                 .seen_message_cache
-                .contains(&(vec![1, 2, 3], message.subnetwork_id)));
+                .contains(&([0; 32].to_vec(), message.subnetwork_id)));
         }

         // Assert that no members of other subnets have received the message.
@@ -1,26 +1,24 @@
 // std
 use std::io::Error;
 use std::task::{Context, Poll};

-use futures::Future;
 // crates
 use futures::future::BoxFuture;
 use futures::prelude::*;
+use futures::Future;
 use libp2p::core::upgrade::ReadyUpgrade;
 use libp2p::swarm::handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound};
 use libp2p::swarm::{ConnectionHandler, ConnectionHandlerEvent, SubstreamProtocol};
 use libp2p::{Stream, StreamProtocol};
 use log::trace;
+use nomos_da_messages::packing::{pack_to_writer, unpack_from_reader};
 use tracing::error;

 // internal
-use nomos_da_messages::{pack_message, unpack_from_reader};

 use crate::protocol::REPLICATION_PROTOCOL;

-pub type DaMessage = nomos_da_messages::replication::ReplicationReq;
+pub type DaMessage = nomos_da_messages::replication::ReplicationRequest;

 /// Events that bubbles up from the `BroadcastHandler` to the `NetworkBehaviour`
+#[allow(clippy::large_enum_variant)]
 #[derive(Debug)]
 pub enum HandlerEventToBehaviour {
     IncomingMessage { message: DaMessage },

@@ -92,11 +90,19 @@ impl ReplicationHandler {
         std::mem::swap(&mut self.outgoing_messages, &mut pending_messages);
         async {
             trace!("Writing {} messages", pending_messages.len());

             for message in pending_messages {
-                let bytes = pack_message(&message)?;
-                stream.write_all(&bytes).await?;
+                pack_to_writer(&message, &mut stream)
+                    .await
+                    .unwrap_or_else(|_| {
+                        panic!(
+                            "Message should always be serializable.\nMessage: '{:?}'",
+                            message
+                        )
+                    });
                 stream.flush().await?;
             }

             Ok(stream)
         }
     }

@@ -107,8 +113,8 @@ impl ReplicationHandler {
     ) -> impl Future<Output = Result<(DaMessage, Stream), Error>> {
         trace!("Reading messages");
         async move {
-            let msg: DaMessage = unpack_from_reader(&mut stream).await?;
-            Ok((msg, stream))
+            let unpacked_message = unpack_from_reader(&mut stream).await?;
+            Ok((unpacked_message, stream))
         }
     }

@@ -3,20 +3,61 @@ pub mod handler;

 #[cfg(test)]
 mod test {
+    use crate::protocols::replication::behaviour::{ReplicationBehaviour, ReplicationEvent};
+    use crate::protocols::replication::handler::DaMessage;
+    use crate::test_utils::AllNeighbours;
     use futures::StreamExt;
+    use kzgrs_backend::common::blob::DaBlob;
+    use kzgrs_backend::encoder;
+    use kzgrs_backend::encoder::DaEncoderParams;
     use libp2p::identity::Keypair;
     use libp2p::swarm::SwarmEvent;
     use libp2p::{quic, Multiaddr, PeerId, Swarm};
     use log::info;
+    use nomos_core::da::{BlobId, DaEncoder};
+    use nomos_da_messages::common::Blob;
     use std::time::Duration;
     use tracing_subscriber::fmt::TestWriter;
     use tracing_subscriber::EnvFilter;

-    use nomos_da_messages::common::Blob;
-
-    use crate::protocols::replication::behaviour::{ReplicationBehaviour, ReplicationEvent};
-    use crate::protocols::replication::handler::DaMessage;
-    use crate::test_utils::AllNeighbours;
+    fn get_encoder() -> encoder::DaEncoder {
+        const DOMAIN_SIZE: usize = 16;
+        let params = DaEncoderParams::default_with(DOMAIN_SIZE);
+        encoder::DaEncoder::new(params)
+    }
+
+    fn get_da_blob(data: Option<Vec<u8>>) -> DaBlob {
+        let encoder = get_encoder();
+
+        let data = data.unwrap_or_else(|| {
+            vec![
+                49u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
+                0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
+            ]
+        });
+
+        let encoded_data = encoder.encode(&data).unwrap();
+        let columns: Vec<_> = encoded_data.extended_data.columns().collect();
+
+        let index = 0;
+        let da_blob = DaBlob {
+            column: columns[index].clone(),
+            column_idx: index
+                .try_into()
+                .expect("Column index shouldn't overflow the target type"),
+            column_commitment: encoded_data.column_commitments[index],
+            aggregated_column_commitment: encoded_data.aggregated_column_commitment,
+            aggregated_column_proof: encoded_data.aggregated_column_proofs[index],
+            rows_commitments: encoded_data.row_commitments.clone(),
+            rows_proofs: encoded_data
+                .rows_proofs
+                .iter()
+                .map(|proofs| proofs.get(index).cloned().unwrap())
+                .collect(),
+        };
+
+        da_blob
+    }
+
     #[tokio::test]
     async fn test_connects_and_receives_replication_messages() {

@@ -94,13 +135,19 @@ mod test {
             tokio::select! {
                 // send a message everytime that the channel ticks
                 _ = receiver.recv() => {
-                    swarm_2.behaviour_mut().send_message(DaMessage {
-                        blob: Some(Blob {
-                            blob_id: i.to_be_bytes().to_vec(),
-                            data: i.to_be_bytes().to_vec(),
-                        }),
-                        subnetwork_id: 0,
-                    });
+                    // let blob_id_bytes: [u8; 32] = i.to_be_bytes().to_vec().try_into().unwrap();
+                    let mut blob_id_bytes = [0; 32];
+                    let b = i.to_be_bytes();
+                    assert!(b.len() <= blob_id_bytes.len());
+                    blob_id_bytes[0..b.len()].copy_from_slice(&b);
+                    assert_eq!(blob_id_bytes.len(), 32);
+
+                    let blob = Blob::new(
+                        BlobId::from(blob_id_bytes),
+                        get_da_blob(None)
+                    );
+                    swarm_2.behaviour_mut().send_message(DaMessage::new(blob, 0));
                     i += 1;
                 }
                 // print out events
@@ -7,7 +7,7 @@ use futures::channel::oneshot;
 use futures::channel::oneshot::{Canceled, Receiver, Sender};
 use futures::future::BoxFuture;
 use futures::stream::{BoxStream, FuturesUnordered};
-use futures::{AsyncWriteExt, FutureExt, StreamExt};
+use futures::{AsyncWriteExt, FutureExt, StreamExt, TryFutureExt};
 use kzgrs_backend::common::blob::DaBlob;
 use kzgrs_backend::common::ColumnIndex;
 use libp2p::core::Endpoint;

@@ -19,9 +19,8 @@ use libp2p::swarm::{
 use libp2p::{Multiaddr, PeerId, Stream};
 use libp2p_stream::{Control, IncomingStreams, OpenStreamError};
 use nomos_core::da::BlobId;
-use nomos_da_messages::sampling::sample_err::SampleErrType;
-use nomos_da_messages::sampling::{sample_res, SampleErr, SampleReq, SampleRes};
-use nomos_da_messages::{common, pack_message, unpack_from_reader};
+use nomos_da_messages::packing::{pack_to_writer, unpack_from_reader};
+use nomos_da_messages::{common, sampling};
 use subnetworks_assignations::MembershipHandler;
 use thiserror::Error;
 use tokio::sync::mpsc;

@@ -45,7 +44,7 @@ pub enum SamplingError {
     Protocol {
         subnetwork_id: SubnetworkId,
         peer_id: PeerId,
-        error: SampleErr,
+        error: sampling::SampleError,
     },
     #[error("Error dialing peer [{peer_id}]: {error}")]
     OpenStream {

@@ -162,12 +161,14 @@ pub struct BehaviourSampleReq {
     pub column_idx: ColumnIndex,
 }

-impl TryFrom<SampleReq> for BehaviourSampleReq {
+impl TryFrom<sampling::SampleRequest> for BehaviourSampleReq {
     type Error = Vec<u8>;

-    fn try_from(req: SampleReq) -> Result<Self, Self::Error> {
-        let blob_id: BlobId = req.blob_id.try_into()?;
-        let column_idx = req.column_idx as u16; // TODO: This doesn't handle the overflow.
+    fn try_from(req: sampling::SampleRequest) -> Result<Self, Self::Error> {
+        let sampling::SampleRequest {
+            blob_id,
+            column_idx,
+        } = req;

         Ok(Self {
             blob_id,

@@ -188,23 +189,19 @@ pub enum BehaviourSampleRes {
     },
 }

-impl From<BehaviourSampleRes> for SampleRes {
+impl From<BehaviourSampleRes> for sampling::SampleResponse {
     fn from(res: BehaviourSampleRes) -> Self {
         match res {
-            BehaviourSampleRes::SamplingSuccess { blob, blob_id, .. } => SampleRes {
-                message_type: Some(sample_res::MessageType::Blob(common::Blob {
-                    blob_id: blob_id.to_vec(),
-                    data: bincode::serialize(&blob)
-                        .expect("Blob from service should be serializable"),
-                })),
-            },
-            BehaviourSampleRes::SampleNotFound { blob_id } => SampleRes {
-                message_type: Some(sample_res::MessageType::Err(SampleErr {
-                    blob_id: blob_id.into(),
-                    err_type: SampleErrType::NotFound.into(),
-                    err_description: "Sample not found".to_string(),
-                })),
-            },
+            BehaviourSampleRes::SamplingSuccess { blob, blob_id, .. } => {
+                sampling::SampleResponse::Blob(common::Blob::new(blob_id, *blob))
+            }
+            BehaviourSampleRes::SampleNotFound { blob_id } => {
+                sampling::SampleResponse::Error(sampling::SampleError::new(
+                    blob_id,
+                    sampling::SampleErrorType::NotFound,
+                    "Sample not found",
+                ))
+            }
         }
     }
 }

@@ -238,7 +235,7 @@ struct ResponseChannel {
     response_receiver: Receiver<BehaviourSampleRes>,
 }

-type StreamHandlerFutureSuccess = (BlobId, SubnetworkId, SampleRes, SampleStream);
+type StreamHandlerFutureSuccess = (BlobId, SubnetworkId, sampling::SampleResponse, SampleStream);
 type OutgoingStreamHandlerFuture =
     BoxFuture<'static, Result<StreamHandlerFutureSuccess, SamplingError>>;
 type IncomingStreamHandlerFuture = BoxFuture<'static, Result<SampleStream, SamplingError>>;

@@ -333,40 +330,24 @@ where
     /// Writes the request to the stream and waits for a response
     async fn stream_sample(
         mut stream: SampleStream,
-        message: SampleReq,
+        message: sampling::SampleRequest,
         subnetwork_id: SubnetworkId,
         blob_id: BlobId,
     ) -> Result<StreamHandlerFutureSuccess, SamplingError> {
-        let bytes = pack_message(&message).map_err(|error| SamplingError::Io {
-            peer_id: stream.peer_id,
-            error,
-        })?;
-        stream
-            .stream
-            .write_all(&bytes)
-            .await
-            .map_err(|error| SamplingError::Io {
-                peer_id: stream.peer_id,
-                error,
-            })?;
-        stream
-            .stream
-            .flush()
-            .await
-            .map_err(|error| SamplingError::Io {
-                peer_id: stream.peer_id,
-                error,
-            })?;
-        let response: SampleRes =
-            unpack_from_reader(&mut stream.stream)
-                .await
-                .map_err(|error| SamplingError::Io {
-                    peer_id: stream.peer_id,
-                    error,
-                })?;
-        // Safety: blob_id should always be a 32bytes hash, currently is abstracted into a `Vec<u8>`
-        // but probably we should have a `[u8; 32]` wrapped in a custom type `BlobId`
-        // TODO: use blob_id when changing types to [u8; 32]
+        let into_sampling_error = |error: std::io::Error| -> SamplingError {
+            SamplingError::Io {
+                peer_id: stream.peer_id,
+                error,
+            }
+        };
+        pack_to_writer(&message, &mut stream.stream)
+            .map_err(into_sampling_error)
+            .await?;
+        stream.stream.flush().await.map_err(into_sampling_error)?;
+        let response: sampling::SampleResponse = unpack_from_reader(&mut stream.stream)
+            .await
+            .map_err(into_sampling_error)?;
+        // Safety: blob_id should always be a 32bytes hash
         Ok((blob_id, subnetwork_id, response, stream))
     }

@@ -389,12 +370,10 @@ where
         mut stream: SampleStream,
     ) {
         let peer = stream.peer_id;

        // If there is a pending task schedule next one
        if let Some((subnetwork_id, blob_id)) = Self::next_request(&peer, to_sample) {
-            let sample_request = SampleReq {
-                blob_id: blob_id.to_vec(),
-                column_idx: subnetwork_id,
-            };
+            let sample_request = sampling::SampleRequest::new(blob_id, subnetwork_id);
             outgoing_tasks
                 .push(Self::stream_sample(stream, sample_request, subnetwork_id, blob_id).boxed());
         // if not pop stream from connected ones

@@ -414,7 +393,7 @@ where
         mut stream: SampleStream,
         channel: ResponseChannel,
     ) -> Result<SampleStream, SamplingError> {
-        let request: SampleReq = unpack_from_reader(&mut stream.stream)
+        let request: sampling::SampleRequest = unpack_from_reader(&mut stream.stream)
             .await
             .map_err(|error| SamplingError::Io {
                 peer_id: stream.peer_id,

@@ -433,7 +412,7 @@ where
                 request,
                 peer_id: stream.peer_id,
             })?;
-        let response: SampleRes = channel
+        let response: sampling::SampleResponse = channel
             .response_receiver
             .await
             .map_err(|error| SamplingError::ResponseChannel {

@@ -441,18 +420,12 @@ where
                 peer_id: stream.peer_id,
             })?
             .into();
-        let bytes = pack_message(&response).map_err(|error| SamplingError::Io {
-            peer_id: stream.peer_id,
-            error,
-        })?;
-        stream
-            .stream
-            .write_all(&bytes)
-            .await
+        pack_to_writer(&response, &mut stream.stream)
             .map_err(|error| SamplingError::Io {
                 peer_id: stream.peer_id,
                 error,
-            })?;
+            })
+            .await?;
         stream
             .stream
             .flush()

@@ -516,10 +489,7 @@ impl<Membership: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'sta
         } else {
             connected_peers.insert(peer);
             let control = control.clone();
-            let sample_request = SampleReq {
-                blob_id: blob_id.to_vec(),
-                column_idx: subnetwork_id,
-            };
+            let sample_request = sampling::SampleRequest::new(blob_id, subnetwork_id);
             let with_dial_task: OutgoingStreamHandlerFuture = async move {
                 let stream = Self::open_stream(peer, control).await?;
                 Self::stream_sample(stream, sample_request, subnetwork_id, blob_id).await

@@ -533,13 +503,11 @@ impl<Membership: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'sta
     fn handle_sample_response(
         blob_id: BlobId,
         subnetwork_id: SubnetworkId,
-        sample_response: SampleRes,
+        sample_response: sampling::SampleResponse,
         peer_id: PeerId,
     ) -> Option<Poll<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>>> {
         match sample_response {
-            SampleRes {
-                message_type: Some(sample_res::MessageType::Err(error)),
-            } => Some(Poll::Ready(ToSwarm::GenerateEvent(
+            sampling::SampleResponse::Error(error) => Some(Poll::Ready(ToSwarm::GenerateEvent(
                 SamplingEvent::SamplingError {
                     error: SamplingError::Protocol {
                         subnetwork_id,

@@ -548,35 +516,13 @@ impl<Membership: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'sta
                     },
                 },
             ))),
-            SampleRes {
-                message_type: Some(sample_res::MessageType::Blob(da_blob)),
-            } => {
-                let blob =
-                    bincode::deserialize::<DaBlob>(da_blob.data.as_slice()).map_err(|error| {
-                        SamplingError::Deserialize {
-                            blob_id,
-                            subnetwork_id,
-                            peer_id,
-                            error,
-                        }
-                    });
-                match blob {
-                    Ok(blob) => Some(Poll::Ready(ToSwarm::GenerateEvent(
-                        SamplingEvent::SamplingSuccess {
-                            blob_id,
-                            subnetwork_id,
-                            blob: Box::new(blob),
-                        },
-                    ))),
-                    Err(error) => Some(Poll::Ready(ToSwarm::GenerateEvent(
-                        SamplingEvent::SamplingError { error },
-                    ))),
-                }
-            }
-            _ => {
-                error!("Invalid sampling response received, empty body");
-                None
-            }
+            sampling::SampleResponse::Blob(blob) => Some(Poll::Ready(ToSwarm::GenerateEvent(
+                SamplingEvent::SamplingSuccess {
+                    blob_id,
+                    subnetwork_id,
+                    blob: Box::new(blob.data),
+                },
+            ))),
         }
     }
 }
@@ -5,7 +5,7 @@ use crate::SubnetworkId;
 use kzgrs_backend::common::blob::DaBlob;
 use libp2p::PeerId;
 use log::{debug, error};
-use nomos_da_messages::replication::ReplicationReq;
+use nomos_da_messages::replication;
 use subnetworks_assignations::MembershipHandler;
 use tokio::sync::mpsc::UnboundedSender;

@@ -19,22 +19,14 @@ pub async fn handle_validator_dispersal_event<Membership>(
     match event {
         // Send message for replication
         DispersalEvent::IncomingMessage { message } => {
-            if let Ok(blob) = bincode::deserialize::<DaBlob>(
-                message
-                    .blob
-                    .as_ref()
-                    .expect("Message blob should not be empty")
-                    .data
-                    .as_slice(),
-            ) {
-                if let Err(e) = validation_events_sender.send(blob) {
-                    error!("Error sending blob to validation: {e:?}");
-                }
-            }
-            replication_behaviour.send_message(ReplicationReq {
-                blob: message.blob,
-                subnetwork_id: message.subnetwork_id,
-            });
+            let blob_message = message.blob;
+            if let Err(e) = validation_events_sender.send(blob_message.data.clone()) {
+                error!("Error sending blob to validation: {e:?}");
+            }
+            replication_behaviour.send_message(replication::ReplicationRequest::new(
+                blob_message,
+                message.subnetwork_id,
+            ));
         }
     }
 }

@@ -53,16 +45,7 @@ pub async fn handle_replication_event(
     event: ReplicationEvent,
 ) {
     let ReplicationEvent::IncomingMessage { message, .. } = event;
-    if let Ok(blob) = bincode::deserialize::<DaBlob>(
-        message
-            .blob
-            .as_ref()
-            .expect("Message blob should not be empty")
-            .data
-            .as_slice(),
-    ) {
-        if let Err(e) = validation_events_sender.send(blob) {
-            error!("Error sending blob to validation: {e:?}");
-        }
+    if let Err(e) = validation_events_sender.send(message.blob.data) {
+        error!("Error sending blob to validation: {e:?}");
     }
-    }
 }

@@ -1,3 +1,4 @@
+use crate::SubnetworkId;
 use libp2p::PeerId;
 use std::collections::HashSet;
 use subnetworks_assignations::MembershipHandler;

@@ -8,7 +9,7 @@ pub struct AllNeighbours {
 }

 impl MembershipHandler for AllNeighbours {
-    type NetworkId = u32;
+    type NetworkId = SubnetworkId;
     type Id = PeerId;

     fn membership(&self, _self_id: &Self::Id) -> HashSet<Self::NetworkId> {

@@ -4,12 +4,11 @@ version = "0.1.0"
 edition = "2021"

 [dependencies]
-bytes = "1.6.1"
-futures = "0.3.30"
-prost = "0.13.1"
-
-[build-dependencies]
-prost-build = "0.13.1"
+nomos-core = { path = "../../../nomos-core/chain-defs" }
+kzgrs-backend = { path = "../../kzgrs-backend" }
+serde = { version = "1.0.215", features = ["derive"] }
+futures = "0.3.31"
+tokio = "1"

 [dev-dependencies]
-tokio = { version = "1", features = ["rt", "macros"] }
+tokio = { version = "1", features = ["macros"] }

@@ -1,16 +0,0 @@
-use std::{env, path::PathBuf};
-
-fn main() {
-    let project_dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR").unwrap());
-    let proto_includes = project_dir.join("proto");
-
-    let dispersal_proto = project_dir.join("proto/dispersal.proto");
-    let replication_proto = project_dir.join("proto/replication.proto");
-    let sampling_proto = project_dir.join("proto/sampling.proto");
-
-    prost_build::compile_protos(
-        &[dispersal_proto, replication_proto, sampling_proto],
-        &[proto_includes],
-    )
-    .unwrap();
-}
@@ -0,0 +1,36 @@
+use kzgrs_backend::common::blob::DaBlob;
+use nomos_core::da::BlobId;
+use serde::{Deserialize, Serialize};
+
+#[repr(C)]
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
+pub struct Blob {
+    pub blob_id: BlobId,
+    pub data: DaBlob,
+}
+
+impl Blob {
+    pub fn new(blob_id: BlobId, data: DaBlob) -> Self {
+        Self { blob_id, data }
+    }
+}
+
+#[repr(u8)]
+#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub enum CloseMessageReason {
+    GracefulShutdown = 0,
+    SubnetChange = 1,
+    SubnetSampleFailure = 2,
+}
+
+#[repr(C)]
+#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub struct CloseMessage {
+    pub reason: CloseMessageReason,
+}
+
+impl CloseMessage {
+    pub fn new(reason: CloseMessageReason) -> Self {
+        CloseMessage { reason }
+    }
+}
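For orientation, a minimal construction sketch for the new `common::Blob` wrapper; the `DaBlob` value is assumed to come from the kzgrs encoder (as in this diff's test helpers), and the all-zero id is illustrative only:

```rust
use kzgrs_backend::common::blob::DaBlob;
use nomos_core::da::BlobId;
use nomos_da_messages::common::Blob;

fn wrap(da_blob: DaBlob) -> Blob {
    // Real ids are 32-byte hashes; zeroes keep the sketch self-contained.
    Blob::new(BlobId::from([0u8; 32]), da_blob)
}
```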
@@ -1,29 +1,56 @@
-use crate::{common, impl_from_for_message};
-
-include!(concat!(env!("OUT_DIR"), "/nomos.da.v1.dispersal.rs"));
-
-impl_from_for_message!(
-    Message,
-    DispersalReq => DispersalReq,
-    DispersalRes => DispersalRes,
-    common::SessionReq => SessionReq,
-);
-
-#[cfg(test)]
-mod tests {
-    use crate::{common, dispersal};
-
-    #[test]
-    fn dispersal_message() {
-        let blob = common::Blob {
-            blob_id: vec![0; 32],
-            data: vec![1; 32],
-        };
-        let req = dispersal::DispersalReq {
-            blob: Some(blob),
-            subnetwork_id: 0,
-        };
-
-        assert_eq!(req.blob.unwrap().blob_id, vec![0; 32]);
-    }
-}
+use crate::common::Blob;
+use crate::SubnetworkId;
+use nomos_core::da::BlobId;
+use serde::{Deserialize, Serialize};
+
+#[repr(C)]
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub enum DispersalErrorType {
+    ChunkSize,
+    Verification,
+}
+
+#[repr(C)]
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub struct DispersalError {
+    pub blob_id: BlobId,
+    pub error_type: DispersalErrorType,
+    pub error_description: String,
+}
+
+impl DispersalError {
+    pub fn new(
+        blob_id: BlobId,
+        error_type: DispersalErrorType,
+        error_description: impl Into<String>,
+    ) -> Self {
+        Self {
+            blob_id,
+            error_type,
+            error_description: error_description.into(),
+        }
+    }
+}
+
+#[repr(C)]
+#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub struct DispersalRequest {
+    pub blob: Blob,
+    pub subnetwork_id: SubnetworkId,
+}
+
+impl DispersalRequest {
+    pub fn new(blob: Blob, subnetwork_id: SubnetworkId) -> Self {
+        Self {
+            blob,
+            subnetwork_id,
+        }
+    }
+}
+
+#[repr(C)]
+#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub enum DispersalResponse {
+    BlobId(BlobId),
+    Error(DispersalError),
+}
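A quick sketch of how the dispersal types compose: the executor packs a `DispersalRequest`, and the validator answers with either an acknowledgment or a typed error (values illustrative):

```rust
use nomos_da_messages::dispersal::{
    DispersalError, DispersalErrorType, DispersalRequest, DispersalResponse,
};

fn ack_or_reject(request: DispersalRequest, verified: bool) -> DispersalResponse {
    if verified {
        // Acknowledge with the id that was received.
        DispersalResponse::BlobId(request.blob.blob_id)
    } else {
        DispersalResponse::Error(DispersalError::new(
            request.blob.blob_id,
            DispersalErrorType::Verification,
            "column verification failed", // hypothetical description
        ))
    }
}
```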
@@ -1,97 +1,8 @@
-use std::io;
-
-use bytes::Bytes;
-use futures::AsyncReadExt;
-use prost::Message;
-
+pub mod common;
 pub mod dispersal;
+pub mod packing;
 pub mod replication;
 pub mod sampling;

-const MAX_MSG_LEN_BYTES: usize = 2;
-
-pub mod common {
-    include!(concat!(env!("OUT_DIR"), "/nomos.da.v1.common.rs"));
-}
-
-pub fn pack_message(message: &impl Message) -> Result<Vec<u8>, io::Error> {
-    let data_len = message.encoded_len();
-
-    if data_len > (1 << (MAX_MSG_LEN_BYTES * 8)) {
-        return Err(io::Error::new(
-            io::ErrorKind::InvalidData,
-            "Message too large",
-        ));
-    }
-
-    let mut buf = Vec::with_capacity(MAX_MSG_LEN_BYTES + data_len);
-    buf.extend_from_slice(&(data_len as u16).to_be_bytes());
-    message.encode(&mut buf).unwrap();
-
-    Ok(buf)
-}
-
-pub async fn unpack_from_reader<M, R>(reader: &mut R) -> Result<M, io::Error>
-where
-    M: Message + Default,
-    R: AsyncReadExt + Unpin,
-{
-    let mut length_prefix = [0u8; MAX_MSG_LEN_BYTES];
-    reader.read_exact(&mut length_prefix).await?;
-    let data_length = u16::from_be_bytes(length_prefix) as usize;
-
-    let mut data = vec![0u8; data_length];
-    reader.read_exact(&mut data).await?;
-    M::decode(Bytes::from(data)).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
-}
-
-/// Macro to implement From trait for Wrapper Messages.
-///
-/// Usage:
-/// ```ignore
-/// impl_from_for_message!(
-///     WrapperMessage, // impl From<Req> for WrappedMessage {
-///     Req => WrappedReq, // .. return WrappedMsg::MessageType::WrappedReq(Req);
-/// );
-/// ```
-#[macro_export]
-macro_rules! impl_from_for_message {
-    ($message:path, $($type:path => $variant:ident),+ $(,)?) => {
-        $(
-            impl From<$type> for $message {
-                fn from(msg: $type) -> Self {
-                    $message {
-                        message_type: Some(message::MessageType::$variant(msg)),
-                    }
-                }
-            }
-        )+
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use futures::io::BufReader;
-
-    use crate::{common, dispersal, pack_message, unpack_from_reader};
-
-    #[tokio::test]
-    async fn pack_and_unpack_from_reader() {
-        let blob = common::Blob {
-            blob_id: vec![0; 32],
-            data: vec![1; 32],
-        };
-        let message: dispersal::Message = dispersal::DispersalReq {
-            blob: Some(blob),
-            subnetwork_id: 0,
-        }
-        .into();
-
-        let packed = pack_message(&message).unwrap();
-
-        let mut reader = BufReader::new(&packed[..]);
-        let unpacked: dispersal::Message = unpack_from_reader(&mut reader).await.unwrap();
-
-        assert_eq!(message, unpacked);
-    }
-}
+type Result<T> = std::result::Result<T, std::io::Error>;
+type SubnetworkId = u16; // Must match `nomos-da-network-core::SubnetworkId`
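After the rewrite, the crate surface is plain serde-derived types plus one packing module, and the comment above records the invariant that the local `SubnetworkId` alias must stay in sync with `nomos-da-network-core`. A hypothetical consumer's import list, using only paths introduced in this diff:

```rust
use nomos_da_messages::common::Blob;
use nomos_da_messages::dispersal::{DispersalRequest, DispersalResponse};
use nomos_da_messages::packing::{pack_to_writer, unpack_from_reader};
use nomos_da_messages::replication::ReplicationRequest;
use nomos_da_messages::sampling::{SampleRequest, SampleResponse};
```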
@ -0,0 +1,192 @@
// STD
use std::io;
// Crates
use futures::{AsyncReadExt, AsyncWriteExt};
use nomos_core::wire;
use serde::de::DeserializeOwned;
use serde::Serialize;
// Internal
use crate::Result;

type LenType = u16;
const MAX_MSG_LEN_BYTES: usize = size_of::<LenType>();
const MAX_MSG_LEN: usize = 1 << (MAX_MSG_LEN_BYTES * 8);

fn into_failed_to_serialize(error: wire::Error) -> io::Error {
    io::Error::new(
        io::ErrorKind::InvalidData,
        format!("Failed to serialize message: {}", error),
    )
}

fn into_failed_to_deserialize(error: wire::Error) -> io::Error {
    io::Error::new(
        io::ErrorKind::InvalidData,
        format!("Failed to deserialize message: {}", error),
    )
}

struct MessageTooLargeError(usize);

impl From<MessageTooLargeError> for io::Error {
    fn from(value: MessageTooLargeError) -> Self {
        io::Error::new(
            io::ErrorKind::InvalidData,
            format!(
                "Message too large. Maximum size is {}. Actual size is {}",
                MAX_MSG_LEN, value.0
            ),
        )
    }
}

pub fn pack<Message>(message: &Message) -> Result<Vec<u8>>
where
    Message: Serialize,
{
    wire::serialize(message).map_err(into_failed_to_serialize)
}

fn get_packed_message_size(packed_message: &[u8]) -> Result<usize> {
    let data_length = packed_message.len();
    if data_length > MAX_MSG_LEN {
        return Err(MessageTooLargeError(data_length).into());
    }
    Ok(data_length)
}

fn prepare_message_for_writer(packed_message: &[u8]) -> Result<Vec<u8>> {
    let data_length = get_packed_message_size(packed_message)?;
    let mut buffer = Vec::with_capacity(MAX_MSG_LEN_BYTES + data_length);
    buffer.extend_from_slice(&(data_length as LenType).to_be_bytes());
    buffer.extend_from_slice(packed_message);
    Ok(buffer)
}

pub async fn pack_to_writer<Message, Writer>(message: &Message, writer: &mut Writer) -> Result<()>
where
    Message: Serialize,
    Writer: AsyncWriteExt + Unpin,
{
    let packed_message = pack(message)?;
    let prepared_packed_message = prepare_message_for_writer(&packed_message)?;
    writer.write_all(&prepared_packed_message).await
}

async fn read_data_length<R>(reader: &mut R) -> Result<usize>
where
    R: AsyncReadExt + Unpin,
{
    let mut length_prefix = [0u8; MAX_MSG_LEN_BYTES];
    reader.read_exact(&mut length_prefix).await?;
    Ok(LenType::from_be_bytes(length_prefix) as usize)
}

pub fn unpack<M: DeserializeOwned>(data: &[u8]) -> Result<M> {
    wire::deserialize(data).map_err(into_failed_to_deserialize)
}

pub async fn unpack_from_reader<Message, R>(reader: &mut R) -> Result<Message>
where
    Message: DeserializeOwned,
    R: AsyncReadExt + Unpin,
{
    let data_length = read_data_length(reader).await?;
    let mut data = vec![0u8; data_length];
    reader.read_exact(&mut data).await?;
    unpack(&data)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::common::Blob;
    use crate::dispersal::{DispersalError, DispersalErrorType, DispersalRequest};
    use futures::io::BufReader;
    use kzgrs_backend::common::blob::DaBlob;
    use kzgrs_backend::encoder::{self, DaEncoderParams};
    use nomos_core::da::{BlobId, DaEncoder};

    fn get_encoder() -> encoder::DaEncoder {
        const DOMAIN_SIZE: usize = 16;
        let params = DaEncoderParams::default_with(DOMAIN_SIZE);
        encoder::DaEncoder::new(params)
    }

    fn get_da_blob() -> DaBlob {
        let encoder = get_encoder();
        let data = vec![
            49u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
            0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
        ];

        let encoded_data = encoder.encode(&data).unwrap();
        let columns: Vec<_> = encoded_data.extended_data.columns().collect();

        let index = 0;
        DaBlob {
            column: columns[index].clone(),
            column_idx: index
                .try_into()
                .expect("Column index shouldn't overflow the target type"),
            column_commitment: encoded_data.column_commitments[index],
            aggregated_column_commitment: encoded_data.aggregated_column_commitment,
            aggregated_column_proof: encoded_data.aggregated_column_proofs[index],
            rows_commitments: encoded_data.row_commitments.clone(),
            rows_proofs: encoded_data
                .rows_proofs
                .iter()
                .map(|proofs| proofs.get(index).cloned().unwrap())
                .collect(),
        }
    }

    #[tokio::test]
    async fn pack_and_unpack() -> Result<()> {
        let blob_id = BlobId::from([0; 32]);
        let data = get_da_blob();
        let blob = Blob::new(blob_id, data);
        let subnetwork_id = 0;
        let message = DispersalRequest::new(blob, subnetwork_id);

        let packed_message = pack(&message)?;
        let unpacked_message: DispersalRequest = unpack(&packed_message)?;

        assert_eq!(message, unpacked_message);
        Ok(())
    }

    #[tokio::test]
    async fn pack_to_writer_and_unpack_from_reader() -> Result<()> {
        let blob_id = BlobId::from([0; 32]);
        let data = get_da_blob();
        let blob = Blob::new(blob_id, data);
        let subnetwork_id = 0;
        let message = DispersalRequest::new(blob, subnetwork_id);

        let mut writer = Vec::new();
        pack_to_writer(&message, &mut writer).await?;

        let mut reader = BufReader::new(writer.as_slice());
        let unpacked_message: DispersalRequest = unpack_from_reader(&mut reader).await?;

        assert_eq!(message, unpacked_message);
        Ok(())
    }

    #[tokio::test]
    async fn pack_to_writer_too_large_message() {
        let blob_id = BlobId::from([0; 32]);
        let error_description = ["."; MAX_MSG_LEN].concat();
        let message =
            DispersalError::new(blob_id, DispersalErrorType::ChunkSize, error_description);

        let mut writer = Vec::new();
        let res = pack_to_writer(&message, &mut writer).await;
        assert!(res.is_err());
        assert_eq!(res.unwrap_err().kind(), io::ErrorKind::InvalidData);
    }
}
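The tests above exercise full DA blobs; the framing itself is easier to see with a tiny payload. A minimal sketch (not part of the commit) showing that `pack_to_writer` emits a 2-byte big-endian length prefix followed by the `wire`-serialized payload, assuming it sits in the same module so `pack`, `pack_to_writer`, and `Result` are in scope:

// Hypothetical extra test: the length prefix must equal the payload length.
#[tokio::test]
async fn length_prefix_matches_payload() -> Result<()> {
    let message = String::from("hello");

    let payload = pack(&message)?; // wire-serialized bytes, no framing
    let mut framed = Vec::new();
    pack_to_writer(&message, &mut framed).await?; // 2-byte BE length + payload

    assert_eq!(framed.len(), 2 + payload.len());
    assert_eq!(&framed[..2], &(payload.len() as u16).to_be_bytes());
    assert_eq!(&framed[2..], payload.as_slice());
    Ok(())
}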
@ -1,9 +1,19 @@
-use crate::{common, impl_from_for_message};
+use crate::common::Blob;
+use crate::SubnetworkId;
+use serde::{Deserialize, Serialize};

-include!(concat!(env!("OUT_DIR"), "/nomos.da.v1.replication.rs"));
+#[repr(C)]
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
+pub struct ReplicationRequest {
+    pub blob: Blob,
+    pub subnetwork_id: SubnetworkId,
+}

-impl_from_for_message!(
-    Message,
-    ReplicationReq => ReplicationReq,
-    common::SessionReq => SessionReq,
-);
+impl ReplicationRequest {
+    pub fn new(blob: Blob, subnetwork_id: SubnetworkId) -> Self {
+        Self {
+            blob,
+            subnetwork_id,
+        }
+    }
+}
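With the generated wrappers gone, replication messages round-trip through the same framing helpers as everything else. A minimal sketch (illustrative only, assuming the new framing module is mounted as `crate::packing` and that `Blob` construction follows the packing tests above):

use futures::io::BufReader;

// Hypothetical helper: frame a replication request and read it back.
async fn replication_round_trip(blob: Blob, subnetwork_id: SubnetworkId) -> crate::Result<()> {
    let request = ReplicationRequest::new(blob, subnetwork_id);

    let mut wire_bytes = Vec::new();
    crate::packing::pack_to_writer(&request, &mut wire_bytes).await?;

    let mut reader = BufReader::new(wire_bytes.as_slice());
    let decoded: ReplicationRequest = crate::packing::unpack_from_reader(&mut reader).await?;
    assert_eq!(request, decoded);
    Ok(())
}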
@ -1,9 +1,56 @@
-use crate::impl_from_for_message;
+use crate::common::Blob;
+use kzgrs_backend::common::ColumnIndex;
+use nomos_core::da::BlobId;
+use serde::{Deserialize, Serialize};

-include!(concat!(env!("OUT_DIR"), "/nomos.da.v1.sampling.rs"));
+#[repr(C)]
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
+pub enum SampleErrorType {
+    NotFound,
+}

-impl_from_for_message!(
-    Message,
-    SampleReq => SampleReq,
-    SampleRes => SampleRes,
-);
+#[repr(C)]
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
+pub struct SampleError {
+    pub blob_id: BlobId,
+    pub error_type: SampleErrorType,
+    pub error_description: String,
+}
+
+impl SampleError {
+    pub fn new(
+        blob_id: BlobId,
+        error_type: SampleErrorType,
+        error_description: impl Into<String>,
+    ) -> Self {
+        Self {
+            blob_id,
+            error_type,
+            error_description: error_description.into(),
+        }
+    }
+}
+
+#[repr(C)]
+#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub struct SampleRequest {
+    pub blob_id: BlobId,
+    pub column_idx: ColumnIndex,
+}
+
+impl SampleRequest {
+    pub fn new(blob_id: BlobId, column_idx: ColumnIndex) -> Self {
+        Self {
+            blob_id,
+            column_idx,
+        }
+    }
+}
+
+#[allow(clippy::large_enum_variant)]
+#[repr(C)]
+#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
+pub enum SampleResponse {
+    Blob(Blob),
+    Error(SampleError),
+}
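On the receiving side, a sampling response is now handled with an ordinary `match` instead of a prost oneof. A minimal sketch (hypothetical helper, not part of the commit):

// Convert a SampleResponse into a plain Result for upstream callers.
fn into_sample_result(response: SampleResponse) -> std::result::Result<Blob, SampleError> {
    match response {
        SampleResponse::Blob(blob) => Ok(blob),
        SampleResponse::Error(error) => Err(error),
    }
}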
@ -42,7 +42,7 @@ impl FillFromNodeList {
     }
 }

 impl MembershipHandler for FillFromNodeList {
-    type NetworkId = u32;
+    type NetworkId = u16;
     type Id = PeerId;

     fn membership(&self, id: &Self::Id) -> HashSet<Self::NetworkId> {
@ -65,7 +65,7 @@ impl FillWithOriginalReplication {
     }
 }

 impl MembershipHandler for FillWithOriginalReplication {
-    type NetworkId = u32;
+    type NetworkId = u16;
     type Id = PeerId;

     fn membership(&self, id: &Self::Id) -> HashSet<Self::NetworkId> {
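Both membership handlers narrow `NetworkId` from `u32` to `u16`, so the subnet ids they hand out can be used directly as the wire-level `SubnetworkId`. A toy illustration of the effect (hypothetical assignment data):

use std::collections::HashSet;

fn main() {
    // Hypothetical subnet assignment for one peer.
    let assignments: Vec<u16> = vec![0, 3, 7];

    // With NetworkId = u16, no cast is needed to treat these as SubnetworkId.
    let membership: HashSet<u16> = assignments.into_iter().collect();
    assert!(membership.contains(&3u16));
}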
@ -46,7 +46,7 @@ pub struct DispersalFromAdapter<Adapter> {
 impl<Adapter> DaDispersal for DispersalFromAdapter<Adapter>
 where
     Adapter: DispersalNetworkAdapter + Send + Sync,
-    Adapter::SubnetworkId: From<u32> + Send + Sync,
+    Adapter::SubnetworkId: From<u16> + Send + Sync,
 {
     type EncodedData = EncodedData;
     type Error = DynError;
@ -62,7 +62,7 @@ where
         let reponses_stream = adapter.dispersal_events_stream().await?;
         for (subnetwork_id, blob) in encoded_data_to_da_blobs(encoded_data).enumerate() {
             adapter
-                .disperse((subnetwork_id as u32).into(), blob)
+                .disperse((subnetwork_id as u16).into(), blob)
                 .await?;
         }
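The `From<u16>` bound exists because blob indices come out of `enumerate()` as `usize` and are narrowed to the wire's `u16` before being converted into the adapter's id type. A toy sketch of the conversion (the `SubnetId` type below is hypothetical):

// Hypothetical adapter id type satisfying the `From<u16>` bound.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SubnetId(u16);

impl From<u16> for SubnetId {
    fn from(value: u16) -> Self {
        SubnetId(value)
    }
}

fn main() {
    let blobs = ["blob-a", "blob-b"];
    for (subnetwork_id, blob) in blobs.iter().enumerate() {
        // Mirrors `(subnetwork_id as u16).into()` in the hunk above.
        let id: SubnetId = (subnetwork_id as u16).into();
        println!("dispersing {blob} to {id:?}");
    }
}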
@ -88,7 +88,7 @@ impl<NetworkAdapter, MempoolAdapter> DispersalBackend
 for DispersalKZGRSBackend<NetworkAdapter, MempoolAdapter>
 where
     NetworkAdapter: DispersalNetworkAdapter + Send + Sync,
-    NetworkAdapter::SubnetworkId: From<u32> + Send + Sync,
+    NetworkAdapter::SubnetworkId: From<u16> + Send + Sync,
     MempoolAdapter: DaMempoolAdapter<BlobId = BlobId, Metadata = dispersal::Metadata> + Send + Sync,
 {
     type Settings = DispersalKZGRSBackendSettings;
@ -120,7 +120,7 @@ pub(crate) async fn handle_validator_events_stream(
         let result = match maybe_blob {
             Some(blob) => BehaviourSampleRes::SamplingSuccess {
                 blob_id,
-                subnetwork_id: blob.column_idx as u32,
+                subnetwork_id: blob.column_idx,
                 blob: Box::new(blob),
             },
             None => BehaviourSampleRes::SampleNotFound { blob_id },
@ -290,7 +290,7 @@ where
     let membership = &config.validator_settings.membership;

     // filter out which subnetworks we already have connections with
-    let connected_subnetworks: HashSet<SubnetworkId> = (0..config.num_subnets as u32)
+    let connected_subnetworks: HashSet<SubnetworkId> = (0..config.num_subnets)
         .filter(|subnetwork_id| {
             !membership
                 .members_of(subnetwork_id)
@ -351,7 +351,7 @@ where
         + 'static,
 {
     let membership = &config.validator_settings.membership;
-    (0..config.num_subnets as u32)
+    (0..config.num_subnets)
         .filter(|subnetwork_id| !filtered_subnetworks.contains(subnetwork_id))
         .filter_map(|subnetwork_id| {
             membership
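Since `num_subnets` is already a `u16`, `0..config.num_subnets` now yields `SubnetworkId` values directly and the old `as u32` widening disappears. A self-contained sketch of the filtering pattern (values are hypothetical):

use std::collections::HashSet;

fn main() {
    let num_subnets: u16 = 4;
    let filtered_subnetworks: HashSet<u16> = HashSet::from([1u16]);

    // `0..num_subnets` iterates u16 directly; no cast needed.
    let remaining: Vec<u16> = (0..num_subnets)
        .filter(|subnetwork_id| !filtered_subnetworks.contains(subnetwork_id))
        .collect();

    assert_eq!(remaining, vec![0, 2, 3]);
}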
@ -66,6 +66,6 @@ impl DaVerifier for KzgrsDaVerifier {
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct KzgrsDaVerifierSettings {
     pub sk: String,
-    pub index: HashSet<u32>,
+    pub index: HashSet<u16>,
     pub global_params_path: String,
 }
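Configuration follows the same narrowing: a verifier's assigned subnet indices are now `u16`. A minimal construction sketch (all field values below are placeholders):

use std::collections::HashSet;

// Hypothetical settings value; `sk` and the params path are illustrative.
fn example_settings() -> KzgrsDaVerifierSettings {
    KzgrsDaVerifierSettings {
        sk: "hex-encoded-secret-key".to_string(),
        index: (0u16..4).collect::<HashSet<u16>>(),
        global_params_path: "/tmp/global_params".to_string(),
    }
}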
@ -59,7 +59,7 @@ pub struct GeneralDaConfig {
     pub blob_storage_directory: PathBuf,
     pub global_params_path: String,
     pub verifier_sk: String,
-    pub verifier_index: HashSet<u32>,
+    pub verifier_index: HashSet<u16>,
     pub num_samples: u16,
     pub num_subnets: u16,
     pub old_blobs_check_interval: Duration,