diff --git a/nomos-da/kzgrs-backend/benches/verifier.rs b/nomos-da/kzgrs-backend/benches/verifier.rs index 9b0e4ed0..7896aec6 100644 --- a/nomos-da/kzgrs-backend/benches/verifier.rs +++ b/nomos-da/kzgrs-backend/benches/verifier.rs @@ -35,7 +35,7 @@ fn verify(bencher: Bencher, column_size: usize) { let sk = SecretKey::key_gen(&buff, &[]).unwrap(); let verifier = DaVerifier::new( sk.clone(), - (0..column_size as u32).collect(), + (0..column_size as u16).collect(), GLOBAL_PARAMETERS.clone(), ); let da_blob = DaBlob { diff --git a/nomos-da/kzgrs-backend/src/common/blob.rs b/nomos-da/kzgrs-backend/src/common/blob.rs index 707847af..f0042b94 100644 --- a/nomos-da/kzgrs-backend/src/common/blob.rs +++ b/nomos-da/kzgrs-backend/src/common/blob.rs @@ -13,7 +13,7 @@ use crate::common::{ deserialize_canonical, deserialize_vec_canonical, serialize_canonical, serialize_vec_canonical, }; -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct DaBlob { pub column: Column, pub column_idx: ColumnIndex, @@ -54,6 +54,10 @@ impl DaBlob { hasher.update(self.column.as_bytes()); hasher.finalize().as_slice().to_vec() } + + pub fn column_len(&self) -> usize { + self.column.as_bytes().len() + } } impl blob::Blob for DaBlob { diff --git a/nomos-da/kzgrs-backend/src/common/mod.rs b/nomos-da/kzgrs-backend/src/common/mod.rs index d0cb622a..3f1ac6c5 100644 --- a/nomos-da/kzgrs-backend/src/common/mod.rs +++ b/nomos-da/kzgrs-backend/src/common/mod.rs @@ -16,7 +16,7 @@ use kzgrs::Commitment; #[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] pub struct Chunk(pub Vec); pub struct Row(pub Vec); -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct Column(pub Vec); pub struct ChunksMatrix(pub Vec); pub type ColumnIndex = u16; diff --git a/nomos-da/kzgrs-backend/src/dispersal.rs b/nomos-da/kzgrs-backend/src/dispersal.rs index 9761b951..ae10ed9f 100644 --- a/nomos-da/kzgrs-backend/src/dispersal.rs +++ b/nomos-da/kzgrs-backend/src/dispersal.rs @@ -161,7 +161,7 @@ mod tests { .into_iter() .enumerate() .map(|(index, sk)| { - DaVerifier::new(sk, [index as u32].into(), GLOBAL_PARAMETERS.clone()) + DaVerifier::new(sk, [index as u16].into(), GLOBAL_PARAMETERS.clone()) }) .collect(); diff --git a/nomos-da/kzgrs-backend/src/verifier.rs b/nomos-da/kzgrs-backend/src/verifier.rs index fe25af6d..6bc6bda3 100644 --- a/nomos-da/kzgrs-backend/src/verifier.rs +++ b/nomos-da/kzgrs-backend/src/verifier.rs @@ -18,12 +18,12 @@ use crate::encoder::DaEncoderParams; pub struct DaVerifier { // TODO: substitute this for an abstraction to sign things over pub sk: SecretKey, - pub index: HashSet, + pub index: HashSet, global_parameters: GlobalParameters, } impl DaVerifier { - pub fn new(sk: SecretKey, index: HashSet, global_parameters: GlobalParameters) -> Self { + pub fn new(sk: SecretKey, index: HashSet, global_parameters: GlobalParameters) -> Self { Self { sk, index, @@ -118,7 +118,7 @@ impl DaVerifier { pub fn verify(&self, blob: &DaBlob, rows_domain_size: usize) -> bool { let rows_domain = PolynomialEvaluationDomain::new(rows_domain_size) .expect("Domain should be able to build"); - let blob_col_idx = &u16::from_be_bytes(blob.column_idx()).into(); + let blob_col_idx = &u16::from_be_bytes(blob.column_idx()); let index = self.index.get(blob_col_idx).unwrap(); let is_column_verified = DaVerifier::verify_column( @@ -380,7 +380,7 @@ mod test { .into_iter() .enumerate() .map(|(index, sk)| { - DaVerifier::new(sk, [index as u32].into(), GLOBAL_PARAMETERS.clone()) + DaVerifier::new(sk, [index as u16].into(), GLOBAL_PARAMETERS.clone()) }) .collect(); let encoded_data = encoder.encode(&data).unwrap(); diff --git a/nomos-da/network/core/src/lib.rs b/nomos-da/network/core/src/lib.rs index 1f716e25..42c78f57 100644 --- a/nomos-da/network/core/src/lib.rs +++ b/nomos-da/network/core/src/lib.rs @@ -6,5 +6,5 @@ pub mod swarm; #[cfg(test)] pub mod test_utils; -pub type SubnetworkId = u32; +pub type SubnetworkId = u16; pub use libp2p::PeerId; diff --git a/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs b/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs index afe2e2e2..233d2f98 100644 --- a/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs +++ b/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs @@ -5,7 +5,7 @@ use std::task::{Context, Poll}; use either::Either; use futures::future::BoxFuture; use futures::stream::{BoxStream, FuturesUnordered}; -use futures::{AsyncWriteExt, FutureExt, StreamExt}; +use futures::{AsyncWriteExt, FutureExt, StreamExt, TryFutureExt}; use libp2p::core::Endpoint; use libp2p::swarm::behaviour::{ConnectionClosed, ConnectionEstablished}; use libp2p::swarm::dial_opts::DialOpts; @@ -30,9 +30,8 @@ use crate::SubnetworkId; use kzgrs_backend::common::blob::DaBlob; use nomos_core::da::BlobId; use nomos_da_messages::common::Blob; -use nomos_da_messages::dispersal::dispersal_res::MessageType; -use nomos_da_messages::dispersal::{DispersalErr, DispersalReq, DispersalRes}; -use nomos_da_messages::{pack_message, unpack_from_reader}; +use nomos_da_messages::dispersal; +use nomos_da_messages::packing::{pack_to_writer, unpack_from_reader}; use subnetworks_assignations::MembershipHandler; #[derive(Debug, Error)] @@ -52,7 +51,7 @@ pub enum DispersalError { #[error("Dispersal response error: {error:?}")] Protocol { subnetwork_id: SubnetworkId, - error: DispersalErr, + error: dispersal::DispersalError, }, #[error("Error dialing peer [{peer_id}]: {error}")] OpenStreamError { @@ -67,9 +66,9 @@ impl DispersalError { DispersalError::Io { blob_id, .. } => Some(*blob_id), DispersalError::Serialization { blob_id, .. } => Some(*blob_id), DispersalError::Protocol { - error: DispersalErr { blob_id, .. }, + error: dispersal::DispersalError { blob_id, .. }, .. - } => Some(blob_id.clone().try_into().unwrap()), + } => Some(*blob_id), DispersalError::OpenStreamError { .. } => None, } } @@ -147,7 +146,12 @@ struct DispersalStream { peer_id: PeerId, } -type StreamHandlerFutureSuccess = (BlobId, SubnetworkId, DispersalRes, DispersalStream); +type StreamHandlerFutureSuccess = ( + BlobId, + SubnetworkId, + dispersal::DispersalResponse, + DispersalStream, +); type StreamHandlerFuture = BoxFuture<'static, Result>; /// Executor dispersal protocol @@ -260,52 +264,33 @@ where subnetwork_id: SubnetworkId, ) -> Result { let blob_id = message.id(); - let blob = bincode::serialize(&message).map_err(|error| DispersalError::Serialization { - error, - blob_id: blob_id.clone().try_into().unwrap(), - subnetwork_id, - })?; - let message = DispersalReq { - blob: Some(Blob { - blob_id: blob_id.clone(), - data: blob, - }), - subnetwork_id, - }; - stream - .stream - .write_all(&pack_message(&message).map_err(|error| DispersalError::Io { - error, - blob_id: blob_id.clone().try_into().unwrap(), - subnetwork_id, - })?) - .await + let blob_id: BlobId = blob_id.clone().try_into().unwrap(); + let message = dispersal::DispersalRequest::new(Blob::new(blob_id, message), subnetwork_id); + pack_to_writer(&message, &mut stream.stream) .map_err(|error| DispersalError::Io { error, - blob_id: blob_id.clone().try_into().unwrap(), + blob_id, subnetwork_id, - })?; + }) + .await?; stream .stream .flush() .await .map_err(|error| DispersalError::Io { error, - blob_id: blob_id.clone().try_into().unwrap(), + blob_id, subnetwork_id, })?; - let response: DispersalRes = - unpack_from_reader(&mut stream.stream) - .await - .map_err(|error| DispersalError::Io { - error, - blob_id: blob_id.clone().try_into().unwrap(), - subnetwork_id, - })?; - // Safety: blob_id should always be a 32bytes hash, currently is abstracted into a `Vec` - // but probably we should have a `[u8; 32]` wrapped in a custom type `BlobId` - // TODO: use blob_id when changing types to [u8; 32] - Ok((blob_id.try_into().unwrap(), subnetwork_id, response, stream)) + let response: dispersal::DispersalResponse = unpack_from_reader(&mut stream.stream) + .await + .map_err(|error| DispersalError::Io { + error, + blob_id, + subnetwork_id, + })?; + // Safety: blob_id should always be a 32bytes hash + Ok((blob_id, subnetwork_id, response, stream)) } /// Run when a stream gets free, if there is a pending task for the stream it will get scheduled to run @@ -571,10 +556,7 @@ impl + 'static> Netw // handle the free stream then return the success Self::handle_stream(tasks, to_disperse, idle_streams, stream); // return an error if there was an error on the other side of the wire - if let DispersalRes { - message_type: Some(MessageType::Err(error)), - } = dispersal_response - { + if let dispersal::DispersalResponse::Error(error) = dispersal_response { return Poll::Ready(ToSwarm::GenerateEvent( DispersalExecutorEvent::DispersalError { error: DispersalError::Protocol { diff --git a/nomos-da/network/core/src/protocols/dispersal/mod.rs b/nomos-da/network/core/src/protocols/dispersal/mod.rs index b03859b1..073a63df 100644 --- a/nomos-da/network/core/src/protocols/dispersal/mod.rs +++ b/nomos-da/network/core/src/protocols/dispersal/mod.rs @@ -9,6 +9,7 @@ pub mod test { DispersalEvent, DispersalValidatorBehaviour, }; use crate::test_utils::AllNeighbours; + use crate::SubnetworkId; use futures::StreamExt; use kzgrs_backend::common::blob::DaBlob; use kzgrs_backend::common::Column; @@ -23,9 +24,9 @@ pub mod test { pub fn executor_swarm( addressbook: AddressBook, key: Keypair, - membership: impl MembershipHandler + 'static, + membership: impl MembershipHandler + 'static, ) -> libp2p::Swarm< - DispersalExecutorBehaviour>, + DispersalExecutorBehaviour>, > { let peer_id = PeerId::from_public_key(&key.public()); libp2p::SwarmBuilder::with_existing_identity(key) @@ -44,9 +45,9 @@ pub mod test { pub fn validator_swarm( key: Keypair, - membership: impl MembershipHandler + 'static, + membership: impl MembershipHandler + 'static, ) -> libp2p::Swarm< - DispersalValidatorBehaviour>, + DispersalValidatorBehaviour>, > { libp2p::SwarmBuilder::with_existing_identity(key) .with_tokio() diff --git a/nomos-da/network/core/src/protocols/dispersal/validator/behaviour.rs b/nomos-da/network/core/src/protocols/dispersal/validator/behaviour.rs index a7b5f533..cb2f9447 100644 --- a/nomos-da/network/core/src/protocols/dispersal/validator/behaviour.rs +++ b/nomos-da/network/core/src/protocols/dispersal/validator/behaviour.rs @@ -12,9 +12,8 @@ use libp2p::swarm::{ use libp2p::{Multiaddr, PeerId, Stream}; use libp2p_stream::IncomingStreams; use log::debug; -use nomos_da_messages::dispersal::dispersal_res::MessageType; -use nomos_da_messages::dispersal::{DispersalReq, DispersalRes}; -use nomos_da_messages::{pack_message, unpack_from_reader}; +use nomos_da_messages::dispersal; +use nomos_da_messages::packing::{pack_to_writer, unpack_from_reader}; use std::io::Error; use std::task::{Context, Poll}; use subnetworks_assignations::MembershipHandler; @@ -22,15 +21,15 @@ use subnetworks_assignations::MembershipHandler; #[derive(Debug)] pub enum DispersalEvent { /// Received a n - IncomingMessage { message: DispersalReq }, + IncomingMessage { + message: dispersal::DispersalRequest, + }, } impl DispersalEvent { pub fn blob_size(&self) -> Option { match self { - DispersalEvent::IncomingMessage { message } => { - message.blob.as_ref().map(|blob| blob.data.len()) - } + DispersalEvent::IncomingMessage { message } => Some(message.blob.data.column_len()), } } } @@ -38,7 +37,8 @@ impl DispersalEvent { pub struct DispersalValidatorBehaviour { stream_behaviour: libp2p_stream::Behaviour, incoming_streams: IncomingStreams, - tasks: FuturesUnordered>>, + tasks: + FuturesUnordered>>, membership: Membership, } @@ -65,14 +65,13 @@ impl DispersalValidatorBehaviour { /// Stream handling messages task. /// This task handles a single message receive. Then it writes up the acknowledgment into the same /// stream as response and finish. - async fn handle_new_stream(mut stream: Stream) -> Result<(DispersalReq, Stream), Error> { - let message: DispersalReq = unpack_from_reader(&mut stream).await?; - let blob_id = message.blob.clone().unwrap().blob_id; - let response = DispersalRes { - message_type: Some(MessageType::BlobId(blob_id)), - }; - let message_bytes = pack_message(&response)?; - stream.write_all(&message_bytes).await?; + async fn handle_new_stream( + mut stream: Stream, + ) -> Result<(dispersal::DispersalRequest, Stream), Error> { + let message: dispersal::DispersalRequest = unpack_from_reader(&mut stream).await?; + let blob_id = message.blob.blob_id; + let response = dispersal::DispersalResponse::BlobId(blob_id); + pack_to_writer(&response, &mut stream).await?; stream.flush().await?; Ok((message, stream)) } diff --git a/nomos-da/network/core/src/protocols/replication/behaviour.rs b/nomos-da/network/core/src/protocols/replication/behaviour.rs index 07f7aa4a..39e209d2 100644 --- a/nomos-da/network/core/src/protocols/replication/behaviour.rs +++ b/nomos-da/network/core/src/protocols/replication/behaviour.rs @@ -34,7 +34,7 @@ impl ReplicationEvent { pub fn blob_size(&self) -> Option { match self { ReplicationEvent::IncomingMessage { message, .. } => { - message.blob.as_ref().map(|blob| blob.data.len()) + Some(message.blob.data.column_len()) } } } @@ -104,10 +104,7 @@ where } fn replicate_message(&mut self, message: DaMessage) { - let message_id = ( - message.blob.as_ref().unwrap().blob_id.clone(), - message.subnetwork_id, - ); + let message_id = (message.blob.blob_id.to_vec(), message.subnetwork_id); if self.seen_message_cache.contains(&message_id) { return; } @@ -246,12 +243,52 @@ where mod tests { use super::*; use futures::task::{waker_ref, ArcWake}; + use kzgrs_backend::common::blob::DaBlob; + use kzgrs_backend::encoder; + use kzgrs_backend::encoder::DaEncoderParams; use libp2p::{identity, PeerId}; + use nomos_core::da::{BlobId, DaEncoder}; use nomos_da_messages::common::Blob; use std::collections::HashSet; use std::sync::Arc; use std::task::{Context, Poll}; + fn get_encoder() -> encoder::DaEncoder { + const DOMAIN_SIZE: usize = 16; + let params = DaEncoderParams::default_with(DOMAIN_SIZE); + encoder::DaEncoder::new(params) + } + + fn get_da_blob() -> DaBlob { + let encoder = get_encoder(); + let data = vec![ + 49u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, + 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, + ]; + + let encoded_data = encoder.encode(&data).unwrap(); + let columns: Vec<_> = encoded_data.extended_data.columns().collect(); + + let index = 0; + let da_blob = DaBlob { + column: columns[index].clone(), + column_idx: index + .try_into() + .expect("Column index shouldn't overflow the target type"), + column_commitment: encoded_data.column_commitments[index], + aggregated_column_commitment: encoded_data.aggregated_column_commitment, + aggregated_column_proof: encoded_data.aggregated_column_proofs[index], + rows_commitments: encoded_data.row_commitments.clone(), + rows_proofs: encoded_data + .rows_proofs + .iter() + .map(|proofs| proofs.get(index).cloned().unwrap()) + .collect(), + }; + + da_blob + } + #[derive(Clone, Debug)] struct MockMembershipHandler { membership: HashMap>, @@ -295,7 +332,7 @@ mod tests { fn create_replication_behaviours( num_instances: usize, - subnet_id: u32, + subnetwork_id: SubnetworkId, membership: &mut HashMap>, ) -> Vec> { let mut behaviours = Vec::new(); @@ -308,7 +345,7 @@ mod tests { } for peer_id in &peer_ids { - membership.insert(*peer_id, HashSet::from([subnet_id])); + membership.insert(*peer_id, HashSet::from([subnetwork_id])); } let membership_handler = MockMembershipHandler { @@ -397,13 +434,7 @@ mod tests { } // Simulate sending a message from the first behavior. - let message = DaMessage { - blob: Some(Blob { - blob_id: vec![1, 2, 3], - data: vec![4, 5, 6], - }), - subnetwork_id: 0, - }; + let message = DaMessage::new(Blob::new(BlobId::from([0; 32]), get_da_blob()), 0); all_behaviours[0].replicate_message(message.clone()); let waker = Arc::new(TestWaker); @@ -473,7 +504,7 @@ mod tests { for behaviour in &subnet_0_behaviours { assert!(behaviour .seen_message_cache - .contains(&(vec![1, 2, 3], message.subnetwork_id))); + .contains(&([0; 32].to_vec(), message.subnetwork_id))); } // Assert that no members of other subnets have received the message. diff --git a/nomos-da/network/core/src/protocols/replication/handler.rs b/nomos-da/network/core/src/protocols/replication/handler.rs index 9e3c5769..1dc1a1b7 100644 --- a/nomos-da/network/core/src/protocols/replication/handler.rs +++ b/nomos-da/network/core/src/protocols/replication/handler.rs @@ -1,26 +1,24 @@ // std use std::io::Error; use std::task::{Context, Poll}; - -use futures::Future; // crates use futures::future::BoxFuture; use futures::prelude::*; +use futures::Future; use libp2p::core::upgrade::ReadyUpgrade; use libp2p::swarm::handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound}; use libp2p::swarm::{ConnectionHandler, ConnectionHandlerEvent, SubstreamProtocol}; use libp2p::{Stream, StreamProtocol}; use log::trace; +use nomos_da_messages::packing::{pack_to_writer, unpack_from_reader}; use tracing::error; - // internal -use nomos_da_messages::{pack_message, unpack_from_reader}; - use crate::protocol::REPLICATION_PROTOCOL; -pub type DaMessage = nomos_da_messages::replication::ReplicationReq; +pub type DaMessage = nomos_da_messages::replication::ReplicationRequest; -/// Events that bubbles up from the `BroadcastHandler` to the `NetworkBehaviour` +/// Events that bubbles up from the `BroadcastHandler` to the `NetworkBehaviour +#[allow(clippy::large_enum_variant)] #[derive(Debug)] pub enum HandlerEventToBehaviour { IncomingMessage { message: DaMessage }, @@ -92,11 +90,19 @@ impl ReplicationHandler { std::mem::swap(&mut self.outgoing_messages, &mut pending_messages); async { trace!("Writing {} messages", pending_messages.len()); + for message in pending_messages { - let bytes = pack_message(&message)?; - stream.write_all(&bytes).await?; + pack_to_writer(&message, &mut stream) + .await + .unwrap_or_else(|_| { + panic!( + "Message should always be serializable.\nMessage: '{:?}'", + message + ) + }); stream.flush().await?; } + Ok(stream) } } @@ -107,8 +113,8 @@ impl ReplicationHandler { ) -> impl Future> { trace!("Reading messages"); async move { - let msg: DaMessage = unpack_from_reader(&mut stream).await?; - Ok((msg, stream)) + let unpacked_message = unpack_from_reader(&mut stream).await?; + Ok((unpacked_message, stream)) } } diff --git a/nomos-da/network/core/src/protocols/replication/mod.rs b/nomos-da/network/core/src/protocols/replication/mod.rs index 6f26ab62..46ebfa9e 100644 --- a/nomos-da/network/core/src/protocols/replication/mod.rs +++ b/nomos-da/network/core/src/protocols/replication/mod.rs @@ -3,20 +3,61 @@ pub mod handler; #[cfg(test)] mod test { + use crate::protocols::replication::behaviour::{ReplicationBehaviour, ReplicationEvent}; + use crate::protocols::replication::handler::DaMessage; + use crate::test_utils::AllNeighbours; use futures::StreamExt; + use kzgrs_backend::common::blob::DaBlob; + use kzgrs_backend::encoder; + use kzgrs_backend::encoder::DaEncoderParams; use libp2p::identity::Keypair; use libp2p::swarm::SwarmEvent; use libp2p::{quic, Multiaddr, PeerId, Swarm}; use log::info; + use nomos_core::da::{BlobId, DaEncoder}; + use nomos_da_messages::common::Blob; use std::time::Duration; use tracing_subscriber::fmt::TestWriter; use tracing_subscriber::EnvFilter; - use nomos_da_messages::common::Blob; + fn get_encoder() -> encoder::DaEncoder { + const DOMAIN_SIZE: usize = 16; + let params = DaEncoderParams::default_with(DOMAIN_SIZE); + encoder::DaEncoder::new(params) + } - use crate::protocols::replication::behaviour::{ReplicationBehaviour, ReplicationEvent}; - use crate::protocols::replication::handler::DaMessage; - use crate::test_utils::AllNeighbours; + fn get_da_blob(data: Option>) -> DaBlob { + let encoder = get_encoder(); + + let data = data.unwrap_or_else(|| { + vec![ + 49u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, + 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, + ] + }); + + let encoded_data = encoder.encode(&data).unwrap(); + let columns: Vec<_> = encoded_data.extended_data.columns().collect(); + + let index = 0; + let da_blob = DaBlob { + column: columns[index].clone(), + column_idx: index + .try_into() + .expect("Column index shouldn't overflow the target type"), + column_commitment: encoded_data.column_commitments[index], + aggregated_column_commitment: encoded_data.aggregated_column_commitment, + aggregated_column_proof: encoded_data.aggregated_column_proofs[index], + rows_commitments: encoded_data.row_commitments.clone(), + rows_proofs: encoded_data + .rows_proofs + .iter() + .map(|proofs| proofs.get(index).cloned().unwrap()) + .collect(), + }; + + da_blob + } #[tokio::test] async fn test_connects_and_receives_replication_messages() { @@ -94,13 +135,19 @@ mod test { tokio::select! { // send a message everytime that the channel ticks _ = receiver.recv() => { - swarm_2.behaviour_mut().send_message(DaMessage { - blob: Some(Blob { - blob_id: i.to_be_bytes().to_vec(), - data: i.to_be_bytes().to_vec(), - }), - subnetwork_id: 0, - }); + // let blob_id_bytes: [u8; 32] = i.to_be_bytes().to_vec().try_into().unwrap(); + + let mut blob_id_bytes = [0; 32]; + let b = i.to_be_bytes(); + assert!(b.len() <= blob_id_bytes.len()); + blob_id_bytes[0..b.len()].copy_from_slice(&b); + assert_eq!(blob_id_bytes.len(), 32); + + let blob = Blob::new( + BlobId::from(blob_id_bytes), + get_da_blob(None) + ); + swarm_2.behaviour_mut().send_message(DaMessage::new(blob, 0)); i += 1; } // print out events diff --git a/nomos-da/network/core/src/protocols/sampling/behaviour.rs b/nomos-da/network/core/src/protocols/sampling/behaviour.rs index d4e2a986..ff22e9d0 100644 --- a/nomos-da/network/core/src/protocols/sampling/behaviour.rs +++ b/nomos-da/network/core/src/protocols/sampling/behaviour.rs @@ -7,7 +7,7 @@ use futures::channel::oneshot; use futures::channel::oneshot::{Canceled, Receiver, Sender}; use futures::future::BoxFuture; use futures::stream::{BoxStream, FuturesUnordered}; -use futures::{AsyncWriteExt, FutureExt, StreamExt}; +use futures::{AsyncWriteExt, FutureExt, StreamExt, TryFutureExt}; use kzgrs_backend::common::blob::DaBlob; use kzgrs_backend::common::ColumnIndex; use libp2p::core::Endpoint; @@ -19,9 +19,8 @@ use libp2p::swarm::{ use libp2p::{Multiaddr, PeerId, Stream}; use libp2p_stream::{Control, IncomingStreams, OpenStreamError}; use nomos_core::da::BlobId; -use nomos_da_messages::sampling::sample_err::SampleErrType; -use nomos_da_messages::sampling::{sample_res, SampleErr, SampleReq, SampleRes}; -use nomos_da_messages::{common, pack_message, unpack_from_reader}; +use nomos_da_messages::packing::{pack_to_writer, unpack_from_reader}; +use nomos_da_messages::{common, sampling}; use subnetworks_assignations::MembershipHandler; use thiserror::Error; use tokio::sync::mpsc; @@ -45,7 +44,7 @@ pub enum SamplingError { Protocol { subnetwork_id: SubnetworkId, peer_id: PeerId, - error: SampleErr, + error: sampling::SampleError, }, #[error("Error dialing peer [{peer_id}]: {error}")] OpenStream { @@ -162,12 +161,14 @@ pub struct BehaviourSampleReq { pub column_idx: ColumnIndex, } -impl TryFrom for BehaviourSampleReq { +impl TryFrom for BehaviourSampleReq { type Error = Vec; - fn try_from(req: SampleReq) -> Result { - let blob_id: BlobId = req.blob_id.try_into()?; - let column_idx = req.column_idx as u16; // TODO: This doesn't handle the overflow. + fn try_from(req: sampling::SampleRequest) -> Result { + let sampling::SampleRequest { + blob_id, + column_idx, + } = req; Ok(Self { blob_id, @@ -188,23 +189,19 @@ pub enum BehaviourSampleRes { }, } -impl From for SampleRes { +impl From for sampling::SampleResponse { fn from(res: BehaviourSampleRes) -> Self { match res { - BehaviourSampleRes::SamplingSuccess { blob, blob_id, .. } => SampleRes { - message_type: Some(sample_res::MessageType::Blob(common::Blob { - blob_id: blob_id.to_vec(), - data: bincode::serialize(&blob) - .expect("Blob from service should be serializable"), - })), - }, - BehaviourSampleRes::SampleNotFound { blob_id } => SampleRes { - message_type: Some(sample_res::MessageType::Err(SampleErr { - blob_id: blob_id.into(), - err_type: SampleErrType::NotFound.into(), - err_description: "Sample not found".to_string(), - })), - }, + BehaviourSampleRes::SamplingSuccess { blob, blob_id, .. } => { + sampling::SampleResponse::Blob(common::Blob::new(blob_id, *blob)) + } + BehaviourSampleRes::SampleNotFound { blob_id } => { + sampling::SampleResponse::Error(sampling::SampleError::new( + blob_id, + sampling::SampleErrorType::NotFound, + "Sample not found", + )) + } } } } @@ -238,7 +235,7 @@ struct ResponseChannel { response_receiver: Receiver, } -type StreamHandlerFutureSuccess = (BlobId, SubnetworkId, SampleRes, SampleStream); +type StreamHandlerFutureSuccess = (BlobId, SubnetworkId, sampling::SampleResponse, SampleStream); type OutgoingStreamHandlerFuture = BoxFuture<'static, Result>; type IncomingStreamHandlerFuture = BoxFuture<'static, Result>; @@ -333,40 +330,24 @@ where /// Writes the request to the stream and waits for a response async fn stream_sample( mut stream: SampleStream, - message: SampleReq, + message: sampling::SampleRequest, subnetwork_id: SubnetworkId, blob_id: BlobId, ) -> Result { - let bytes = pack_message(&message).map_err(|error| SamplingError::Io { - peer_id: stream.peer_id, - error, - })?; - stream - .stream - .write_all(&bytes) - .await - .map_err(|error| SamplingError::Io { + let into_sampling_error = |error: std::io::Error| -> SamplingError { + SamplingError::Io { peer_id: stream.peer_id, error, - })?; - stream - .stream - .flush() + } + }; + pack_to_writer(&message, &mut stream.stream) + .map_err(into_sampling_error) + .await?; + stream.stream.flush().await.map_err(into_sampling_error)?; + let response: sampling::SampleResponse = unpack_from_reader(&mut stream.stream) .await - .map_err(|error| SamplingError::Io { - peer_id: stream.peer_id, - error, - })?; - let response: SampleRes = - unpack_from_reader(&mut stream.stream) - .await - .map_err(|error| SamplingError::Io { - peer_id: stream.peer_id, - error, - })?; - // Safety: blob_id should always be a 32bytes hash, currently is abstracted into a `Vec` - // but probably we should have a `[u8; 32]` wrapped in a custom type `BlobId` - // TODO: use blob_id when changing types to [u8; 32] + .map_err(into_sampling_error)?; + // Safety: blob_id should always be a 32bytes hash Ok((blob_id, subnetwork_id, response, stream)) } @@ -389,12 +370,10 @@ where mut stream: SampleStream, ) { let peer = stream.peer_id; + // If there is a pending task schedule next one if let Some((subnetwork_id, blob_id)) = Self::next_request(&peer, to_sample) { - let sample_request = SampleReq { - blob_id: blob_id.to_vec(), - column_idx: subnetwork_id, - }; + let sample_request = sampling::SampleRequest::new(blob_id, subnetwork_id); outgoing_tasks .push(Self::stream_sample(stream, sample_request, subnetwork_id, blob_id).boxed()); // if not pop stream from connected ones @@ -414,7 +393,7 @@ where mut stream: SampleStream, channel: ResponseChannel, ) -> Result { - let request: SampleReq = unpack_from_reader(&mut stream.stream) + let request: sampling::SampleRequest = unpack_from_reader(&mut stream.stream) .await .map_err(|error| SamplingError::Io { peer_id: stream.peer_id, @@ -433,7 +412,7 @@ where request, peer_id: stream.peer_id, })?; - let response: SampleRes = channel + let response: sampling::SampleResponse = channel .response_receiver .await .map_err(|error| SamplingError::ResponseChannel { @@ -441,18 +420,12 @@ where peer_id: stream.peer_id, })? .into(); - let bytes = pack_message(&response).map_err(|error| SamplingError::Io { - peer_id: stream.peer_id, - error, - })?; - stream - .stream - .write_all(&bytes) - .await + pack_to_writer(&response, &mut stream.stream) .map_err(|error| SamplingError::Io { peer_id: stream.peer_id, error, - })?; + }) + .await?; stream .stream .flush() @@ -516,10 +489,7 @@ impl + 'sta } else { connected_peers.insert(peer); let control = control.clone(); - let sample_request = SampleReq { - blob_id: blob_id.to_vec(), - column_idx: subnetwork_id, - }; + let sample_request = sampling::SampleRequest::new(blob_id, subnetwork_id); let with_dial_task: OutgoingStreamHandlerFuture = async move { let stream = Self::open_stream(peer, control).await?; Self::stream_sample(stream, sample_request, subnetwork_id, blob_id).await @@ -533,13 +503,11 @@ impl + 'sta fn handle_sample_response( blob_id: BlobId, subnetwork_id: SubnetworkId, - sample_response: SampleRes, + sample_response: sampling::SampleResponse, peer_id: PeerId, ) -> Option::ToSwarm, THandlerInEvent>>> { match sample_response { - SampleRes { - message_type: Some(sample_res::MessageType::Err(error)), - } => Some(Poll::Ready(ToSwarm::GenerateEvent( + sampling::SampleResponse::Error(error) => Some(Poll::Ready(ToSwarm::GenerateEvent( SamplingEvent::SamplingError { error: SamplingError::Protocol { subnetwork_id, @@ -548,35 +516,13 @@ impl + 'sta }, }, ))), - SampleRes { - message_type: Some(sample_res::MessageType::Blob(da_blob)), - } => { - let blob = - bincode::deserialize::(da_blob.data.as_slice()).map_err(|error| { - SamplingError::Deserialize { - blob_id, - subnetwork_id, - peer_id, - error, - } - }); - match blob { - Ok(blob) => Some(Poll::Ready(ToSwarm::GenerateEvent( - SamplingEvent::SamplingSuccess { - blob_id, - subnetwork_id, - blob: Box::new(blob), - }, - ))), - Err(error) => Some(Poll::Ready(ToSwarm::GenerateEvent( - SamplingEvent::SamplingError { error }, - ))), - } - } - _ => { - error!("Invalid sampling response received, empty body"); - None - } + sampling::SampleResponse::Blob(blob) => Some(Poll::Ready(ToSwarm::GenerateEvent( + SamplingEvent::SamplingSuccess { + blob_id, + subnetwork_id, + blob: Box::new(blob.data), + }, + ))), } } } diff --git a/nomos-da/network/core/src/swarm/common.rs b/nomos-da/network/core/src/swarm/common.rs index 91e2d021..734708e1 100644 --- a/nomos-da/network/core/src/swarm/common.rs +++ b/nomos-da/network/core/src/swarm/common.rs @@ -5,7 +5,7 @@ use crate::SubnetworkId; use kzgrs_backend::common::blob::DaBlob; use libp2p::PeerId; use log::{debug, error}; -use nomos_da_messages::replication::ReplicationReq; +use nomos_da_messages::replication; use subnetworks_assignations::MembershipHandler; use tokio::sync::mpsc::UnboundedSender; @@ -19,22 +19,14 @@ pub async fn handle_validator_dispersal_event( match event { // Send message for replication DispersalEvent::IncomingMessage { message } => { - if let Ok(blob) = bincode::deserialize::( - message - .blob - .as_ref() - .expect("Message blob should not be empty") - .data - .as_slice(), - ) { - if let Err(e) = validation_events_sender.send(blob) { - error!("Error sending blob to validation: {e:?}"); - } + let blob_message = message.blob; + if let Err(e) = validation_events_sender.send(blob_message.data.clone()) { + error!("Error sending blob to validation: {e:?}"); } - replication_behaviour.send_message(ReplicationReq { - blob: message.blob, - subnetwork_id: message.subnetwork_id, - }); + replication_behaviour.send_message(replication::ReplicationRequest::new( + blob_message, + message.subnetwork_id, + )); } } } @@ -53,16 +45,7 @@ pub async fn handle_replication_event( event: ReplicationEvent, ) { let ReplicationEvent::IncomingMessage { message, .. } = event; - if let Ok(blob) = bincode::deserialize::( - message - .blob - .as_ref() - .expect("Message blob should not be empty") - .data - .as_slice(), - ) { - if let Err(e) = validation_events_sender.send(blob) { - error!("Error sending blob to validation: {e:?}"); - } + if let Err(e) = validation_events_sender.send(message.blob.data) { + error!("Error sending blob to validation: {e:?}"); } } diff --git a/nomos-da/network/core/src/test_utils.rs b/nomos-da/network/core/src/test_utils.rs index 122bb469..0fb74589 100644 --- a/nomos-da/network/core/src/test_utils.rs +++ b/nomos-da/network/core/src/test_utils.rs @@ -1,3 +1,4 @@ +use crate::SubnetworkId; use libp2p::PeerId; use std::collections::HashSet; use subnetworks_assignations::MembershipHandler; @@ -8,7 +9,7 @@ pub struct AllNeighbours { } impl MembershipHandler for AllNeighbours { - type NetworkId = u32; + type NetworkId = SubnetworkId; type Id = PeerId; fn membership(&self, _self_id: &Self::Id) -> HashSet { diff --git a/nomos-da/network/messages/Cargo.toml b/nomos-da/network/messages/Cargo.toml index e650e1a0..43faf0ed 100644 --- a/nomos-da/network/messages/Cargo.toml +++ b/nomos-da/network/messages/Cargo.toml @@ -4,12 +4,11 @@ version = "0.1.0" edition = "2021" [dependencies] -bytes = "1.6.1" -futures = "0.3.30" -prost = "0.13.1" - -[build-dependencies] -prost-build = "0.13.1" +nomos-core = { path = "../../../nomos-core/chain-defs" } +kzgrs-backend = { path = "../../kzgrs-backend" } +serde = { version = "1.0.215", features = ["derive"] } +futures = "0.3.31" +tokio = "1" [dev-dependencies] -tokio = { version = "1", features = ["rt", "macros"] } +tokio = { version = "1", features = ["macros"] } diff --git a/nomos-da/network/messages/build.rs b/nomos-da/network/messages/build.rs deleted file mode 100644 index 23db0cfb..00000000 --- a/nomos-da/network/messages/build.rs +++ /dev/null @@ -1,16 +0,0 @@ -use std::{env, path::PathBuf}; - -fn main() { - let project_dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR").unwrap()); - let proto_includes = project_dir.join("proto"); - - let dispersal_proto = project_dir.join("proto/dispersal.proto"); - let replication_proto = project_dir.join("proto/replication.proto"); - let sampling_proto = project_dir.join("proto/sampling.proto"); - - prost_build::compile_protos( - &[dispersal_proto, replication_proto, sampling_proto], - &[proto_includes], - ) - .unwrap(); -} diff --git a/nomos-da/network/messages/src/common.rs b/nomos-da/network/messages/src/common.rs new file mode 100644 index 00000000..e51436ce --- /dev/null +++ b/nomos-da/network/messages/src/common.rs @@ -0,0 +1,36 @@ +use kzgrs_backend::common::blob::DaBlob; +use nomos_core::da::BlobId; +use serde::{Deserialize, Serialize}; + +#[repr(C)] +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub struct Blob { + pub blob_id: BlobId, + pub data: DaBlob, +} + +impl Blob { + pub fn new(blob_id: BlobId, data: DaBlob) -> Self { + Self { blob_id, data } + } +} + +#[repr(u8)] +#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)] +pub enum CloseMessageReason { + GracefulShutdown = 0, + SubnetChange = 1, + SubnetSampleFailure = 2, +} + +#[repr(C)] +#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct CloseMessage { + pub reason: CloseMessageReason, +} + +impl CloseMessage { + pub fn new(reason: CloseMessageReason) -> Self { + CloseMessage { reason } + } +} diff --git a/nomos-da/network/messages/src/dispersal.rs b/nomos-da/network/messages/src/dispersal.rs index 72279339..fa9808ba 100644 --- a/nomos-da/network/messages/src/dispersal.rs +++ b/nomos-da/network/messages/src/dispersal.rs @@ -1,29 +1,56 @@ -use crate::{common, impl_from_for_message}; +use crate::common::Blob; +use crate::SubnetworkId; +use nomos_core::da::BlobId; +use serde::{Deserialize, Serialize}; -include!(concat!(env!("OUT_DIR"), "/nomos.da.v1.dispersal.rs")); +#[repr(C)] +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub enum DispersalErrorType { + ChunkSize, + Verification, +} -impl_from_for_message!( - Message, - DispersalReq => DispersalReq, - DispersalRes => DispersalRes, - common::SessionReq => SessionReq, -); +#[repr(C)] +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct DispersalError { + pub blob_id: BlobId, + pub error_type: DispersalErrorType, + pub error_description: String, +} -#[cfg(test)] -mod tests { - use crate::{common, dispersal}; - - #[test] - fn dispersal_message() { - let blob = common::Blob { - blob_id: vec![0; 32], - data: vec![1; 32], - }; - let req = dispersal::DispersalReq { - blob: Some(blob), - subnetwork_id: 0, - }; - - assert_eq!(req.blob.unwrap().blob_id, vec![0; 32]); +impl DispersalError { + pub fn new( + blob_id: BlobId, + error_type: DispersalErrorType, + error_description: impl Into, + ) -> Self { + Self { + blob_id, + error_type, + error_description: error_description.into(), + } } } + +#[repr(C)] +#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct DispersalRequest { + pub blob: Blob, + pub subnetwork_id: SubnetworkId, +} + +impl DispersalRequest { + pub fn new(blob: Blob, subnetwork_id: SubnetworkId) -> Self { + Self { + blob, + subnetwork_id, + } + } +} + +#[repr(C)] +#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)] +pub enum DispersalResponse { + BlobId(BlobId), + Error(DispersalError), +} diff --git a/nomos-da/network/messages/src/lib.rs b/nomos-da/network/messages/src/lib.rs index e06b3342..033752a7 100644 --- a/nomos-da/network/messages/src/lib.rs +++ b/nomos-da/network/messages/src/lib.rs @@ -1,97 +1,8 @@ -use std::io; - -use bytes::Bytes; -use futures::AsyncReadExt; -use prost::Message; - +pub mod common; pub mod dispersal; +pub mod packing; pub mod replication; pub mod sampling; -const MAX_MSG_LEN_BYTES: usize = 2; - -pub mod common { - include!(concat!(env!("OUT_DIR"), "/nomos.da.v1.common.rs")); -} - -pub fn pack_message(message: &impl Message) -> Result, io::Error> { - let data_len = message.encoded_len(); - - if data_len > (1 << (MAX_MSG_LEN_BYTES * 8)) { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "Message too large", - )); - } - - let mut buf = Vec::with_capacity(MAX_MSG_LEN_BYTES + data_len); - buf.extend_from_slice(&(data_len as u16).to_be_bytes()); - message.encode(&mut buf).unwrap(); - - Ok(buf) -} - -pub async fn unpack_from_reader(reader: &mut R) -> Result -where - M: Message + Default, - R: AsyncReadExt + Unpin, -{ - let mut length_prefix = [0u8; MAX_MSG_LEN_BYTES]; - reader.read_exact(&mut length_prefix).await?; - let data_length = u16::from_be_bytes(length_prefix) as usize; - - let mut data = vec![0u8; data_length]; - reader.read_exact(&mut data).await?; - M::decode(Bytes::from(data)).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) -} - -/// Macro to implement From trait for Wrapper Messages. -/// -/// Usage: -/// ```ignore -/// impl_from_for_message!( -/// WrapperMessage, // impl From for WrappedMessage { -/// Req => WrappedReq, // .. return WrappedMsg::MessageType::WrappedReq(Req); -/// ); -/// ``` -#[macro_export] -macro_rules! impl_from_for_message { - ($message:path, $($type:path => $variant:ident),+ $(,)?) => { - $( - impl From<$type> for $message { - fn from(msg: $type) -> Self { - $message { - message_type: Some(message::MessageType::$variant(msg)), - } - } - } - )+ - } -} - -#[cfg(test)] -mod tests { - use futures::io::BufReader; - - use crate::{common, dispersal, pack_message, unpack_from_reader}; - - #[tokio::test] - async fn pack_and_unpack_from_reader() { - let blob = common::Blob { - blob_id: vec![0; 32], - data: vec![1; 32], - }; - let message: dispersal::Message = dispersal::DispersalReq { - blob: Some(blob), - subnetwork_id: 0, - } - .into(); - - let packed = pack_message(&message).unwrap(); - - let mut reader = BufReader::new(&packed[..]); - let unpacked: dispersal::Message = unpack_from_reader(&mut reader).await.unwrap(); - - assert_eq!(message, unpacked); - } -} +type Result = std::result::Result; +type SubnetworkId = u16; // Must match `nomos-da-network-core::SubnetworkId` diff --git a/nomos-da/network/messages/src/packing.rs b/nomos-da/network/messages/src/packing.rs new file mode 100644 index 00000000..2efff203 --- /dev/null +++ b/nomos-da/network/messages/src/packing.rs @@ -0,0 +1,192 @@ +// STD +use futures::{AsyncReadExt, AsyncWriteExt}; +use std::io; +// Crates +use nomos_core::wire; +use serde::de::DeserializeOwned; +use serde::Serialize; +// Internal +use crate::Result; + +type LenType = u16; +const MAX_MSG_LEN_BYTES: usize = size_of::(); +const MAX_MSG_LEN: usize = 1 << (MAX_MSG_LEN_BYTES * 8); + +fn into_failed_to_serialize(error: wire::Error) -> io::Error { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Failed to serialize message: {}", error), + ) +} + +fn into_failed_to_deserialize(error: wire::Error) -> io::Error { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Failed to deserialize message: {}", error), + ) +} + +struct MessageTooLargeError(usize); + +impl From for io::Error { + fn from(value: MessageTooLargeError) -> Self { + io::Error::new( + io::ErrorKind::InvalidData, + format!( + "Message too large. Maximum size is {}. Actual size is {}", + MAX_MSG_LEN, value.0 + ), + ) + } +} + +pub fn pack(message: &Message) -> Result> +where + Message: Serialize, +{ + wire::serialize(message).map_err(into_failed_to_serialize) +} + +fn get_packed_message_size(packed_message: &[u8]) -> Result { + let data_length = packed_message.len(); + if data_length > MAX_MSG_LEN { + return Err(MessageTooLargeError(data_length).into()); + } + Ok(data_length) +} + +fn prepare_message_for_writer(packed_message: &[u8]) -> Result> { + let data_length = get_packed_message_size(packed_message)?; + let mut buffer = Vec::with_capacity(MAX_MSG_LEN_BYTES + data_length); + buffer.extend_from_slice(&(data_length as LenType).to_be_bytes()); + buffer.extend_from_slice(packed_message); + Ok(buffer) +} + +pub async fn pack_to_writer(message: &Message, writer: &mut Writer) -> Result<()> +where + Message: Serialize, + Writer: AsyncWriteExt + Unpin, +{ + let packed_message = pack(message)?; + let prepared_packed_message = prepare_message_for_writer(&packed_message)?; + writer.write_all(&prepared_packed_message).await +} + +async fn read_data_length(reader: &mut R) -> Result +where + R: AsyncReadExt + Unpin, +{ + let mut length_prefix = [0u8; MAX_MSG_LEN_BYTES]; + reader.read_exact(&mut length_prefix).await?; + let s = LenType::from_be_bytes(length_prefix) as usize; + Ok(s) +} + +pub fn unpack(data: &[u8]) -> Result { + wire::deserialize(data).map_err(into_failed_to_deserialize) +} + +pub async fn unpack_from_reader(reader: &mut R) -> Result +where + Message: DeserializeOwned, + R: AsyncReadExt + Unpin, +{ + let data_length = read_data_length(reader).await?; + let mut data = vec![0u8; data_length]; + reader.read_exact(&mut data).await?; + unpack(&data) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::common::Blob; + use crate::dispersal::{DispersalError, DispersalErrorType, DispersalRequest}; + use futures::io::BufReader; + use kzgrs_backend::common::blob::DaBlob; + use kzgrs_backend::encoder::{self, DaEncoderParams}; + use nomos_core::da::{BlobId, DaEncoder}; + + fn get_encoder() -> encoder::DaEncoder { + const DOMAIN_SIZE: usize = 16; + let params = DaEncoderParams::default_with(DOMAIN_SIZE); + encoder::DaEncoder::new(params) + } + + fn get_da_blob() -> DaBlob { + let encoder = get_encoder(); + let data = vec![ + 49u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, + 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, + ]; + + let encoded_data = encoder.encode(&data).unwrap(); + let columns: Vec<_> = encoded_data.extended_data.columns().collect(); + + let index = 0; + let da_blob = DaBlob { + column: columns[index].clone(), + column_idx: index + .try_into() + .expect("Column index shouldn't overflow the target type"), + column_commitment: encoded_data.column_commitments[index], + aggregated_column_commitment: encoded_data.aggregated_column_commitment, + aggregated_column_proof: encoded_data.aggregated_column_proofs[index], + rows_commitments: encoded_data.row_commitments.clone(), + rows_proofs: encoded_data + .rows_proofs + .iter() + .map(|proofs| proofs.get(index).cloned().unwrap()) + .collect(), + }; + + da_blob + } + + #[tokio::test] + async fn pack_and_unpack() -> Result<()> { + let blob_id = BlobId::from([0; 32]); + let data = get_da_blob(); + let blob = Blob::new(blob_id, data); + let subnetwork_id = 0; + let message = DispersalRequest::new(blob, subnetwork_id); + + let packed_message = pack(&message)?; + let unpacked_message: DispersalRequest = unpack(&packed_message)?; + + assert_eq!(message, unpacked_message); + Ok(()) + } + + #[tokio::test] + async fn pack_to_writer_and_unpack_from_reader() -> Result<()> { + let blob_id = BlobId::from([0; 32]); + let data = get_da_blob(); + let blob = Blob::new(blob_id, data); + let subnetwork_id = 0; + let message = DispersalRequest::new(blob, subnetwork_id); + + let mut writer = Vec::new(); + pack_to_writer(&message, &mut writer).await?; + + let mut reader = BufReader::new(writer.as_slice()); + let unpacked_message: DispersalRequest = unpack_from_reader(&mut reader).await?; + + assert_eq!(message, unpacked_message); + Ok(()) + } + + #[tokio::test] + async fn pack_to_writer_too_large_message() { + let blob_id = BlobId::from([0; 32]); + let error_description = ["."; MAX_MSG_LEN].concat(); + let message = + DispersalError::new(blob_id, DispersalErrorType::ChunkSize, error_description); + + let mut writer = Vec::new(); + let res = pack_to_writer(&message, &mut writer).await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), io::ErrorKind::InvalidData); + } +} diff --git a/nomos-da/network/messages/src/replication.rs b/nomos-da/network/messages/src/replication.rs index b0216fe6..d4af8c16 100644 --- a/nomos-da/network/messages/src/replication.rs +++ b/nomos-da/network/messages/src/replication.rs @@ -1,9 +1,19 @@ -use crate::{common, impl_from_for_message}; +use crate::common::Blob; +use crate::SubnetworkId; +use serde::{Deserialize, Serialize}; -include!(concat!(env!("OUT_DIR"), "/nomos.da.v1.replication.rs")); +#[repr(C)] +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub struct ReplicationRequest { + pub blob: Blob, + pub subnetwork_id: SubnetworkId, +} -impl_from_for_message!( - Message, - ReplicationReq => ReplicationReq, - common::SessionReq => SessionReq, -); +impl ReplicationRequest { + pub fn new(blob: Blob, subnetwork_id: SubnetworkId) -> Self { + Self { + blob, + subnetwork_id, + } + } +} diff --git a/nomos-da/network/messages/src/sampling.rs b/nomos-da/network/messages/src/sampling.rs index afe6cbe4..a92af17c 100644 --- a/nomos-da/network/messages/src/sampling.rs +++ b/nomos-da/network/messages/src/sampling.rs @@ -1,9 +1,56 @@ -use crate::impl_from_for_message; +use crate::common::Blob; +use kzgrs_backend::common::ColumnIndex; +use nomos_core::da::BlobId; +use serde::{Deserialize, Serialize}; -include!(concat!(env!("OUT_DIR"), "/nomos.da.v1.sampling.rs")); +#[repr(C)] +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub enum SampleErrorType { + NotFound, +} -impl_from_for_message!( - Message, - SampleReq => SampleReq, - SampleRes => SampleRes, -); +#[repr(C)] +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub struct SampleError { + pub blob_id: BlobId, + pub error_type: SampleErrorType, + pub error_description: String, +} + +impl SampleError { + pub fn new( + blob_id: BlobId, + error_type: SampleErrorType, + error_description: impl Into, + ) -> Self { + Self { + blob_id, + error_type, + error_description: error_description.into(), + } + } +} + +#[repr(C)] +#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct SampleRequest { + pub blob_id: BlobId, + pub column_idx: ColumnIndex, +} + +impl SampleRequest { + pub fn new(blob_id: BlobId, column_idx: ColumnIndex) -> Self { + Self { + blob_id, + column_idx, + } + } +} + +#[allow(clippy::large_enum_variant)] +#[repr(C)] +#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)] +pub enum SampleResponse { + Blob(Blob), + Error(SampleError), +} diff --git a/nomos-da/network/subnetworks-assignations/src/versions/v1.rs b/nomos-da/network/subnetworks-assignations/src/versions/v1.rs index 27c18dcf..b21bdefc 100644 --- a/nomos-da/network/subnetworks-assignations/src/versions/v1.rs +++ b/nomos-da/network/subnetworks-assignations/src/versions/v1.rs @@ -42,7 +42,7 @@ impl FillFromNodeList { } impl MembershipHandler for FillFromNodeList { - type NetworkId = u32; + type NetworkId = u16; type Id = PeerId; fn membership(&self, id: &Self::Id) -> HashSet { diff --git a/nomos-da/network/subnetworks-assignations/src/versions/v2.rs b/nomos-da/network/subnetworks-assignations/src/versions/v2.rs index 3e002d92..f2fad238 100644 --- a/nomos-da/network/subnetworks-assignations/src/versions/v2.rs +++ b/nomos-da/network/subnetworks-assignations/src/versions/v2.rs @@ -65,7 +65,7 @@ impl FillWithOriginalReplication { } impl MembershipHandler for FillWithOriginalReplication { - type NetworkId = u32; + type NetworkId = u16; type Id = PeerId; fn membership(&self, id: &Self::Id) -> HashSet { diff --git a/nomos-services/data-availability/dispersal/src/backend/kzgrs.rs b/nomos-services/data-availability/dispersal/src/backend/kzgrs.rs index 45040d31..a44cce1f 100644 --- a/nomos-services/data-availability/dispersal/src/backend/kzgrs.rs +++ b/nomos-services/data-availability/dispersal/src/backend/kzgrs.rs @@ -46,7 +46,7 @@ pub struct DispersalFromAdapter { impl DaDispersal for DispersalFromAdapter where Adapter: DispersalNetworkAdapter + Send + Sync, - Adapter::SubnetworkId: From + Send + Sync, + Adapter::SubnetworkId: From + Send + Sync, { type EncodedData = EncodedData; type Error = DynError; @@ -62,7 +62,7 @@ where let reponses_stream = adapter.dispersal_events_stream().await?; for (subnetwork_id, blob) in encoded_data_to_da_blobs(encoded_data).enumerate() { adapter - .disperse((subnetwork_id as u32).into(), blob) + .disperse((subnetwork_id as u16).into(), blob) .await?; } @@ -88,7 +88,7 @@ impl DispersalBackend for DispersalKZGRSBackend where NetworkAdapter: DispersalNetworkAdapter + Send + Sync, - NetworkAdapter::SubnetworkId: From + Send + Sync, + NetworkAdapter::SubnetworkId: From + Send + Sync, MempoolAdapter: DaMempoolAdapter + Send + Sync, { type Settings = DispersalKZGRSBackendSettings; diff --git a/nomos-services/data-availability/network/src/backends/libp2p/common.rs b/nomos-services/data-availability/network/src/backends/libp2p/common.rs index fdc88b1d..8f8d20bc 100644 --- a/nomos-services/data-availability/network/src/backends/libp2p/common.rs +++ b/nomos-services/data-availability/network/src/backends/libp2p/common.rs @@ -120,7 +120,7 @@ pub(crate) async fn handle_validator_events_stream( let result = match maybe_blob { Some(blob) => BehaviourSampleRes::SamplingSuccess { blob_id, - subnetwork_id: blob.column_idx as u32, + subnetwork_id: blob.column_idx, blob: Box::new(blob), }, None => BehaviourSampleRes::SampleNotFound { blob_id }, diff --git a/nomos-services/data-availability/network/src/backends/libp2p/executor.rs b/nomos-services/data-availability/network/src/backends/libp2p/executor.rs index 9330a63d..02f79490 100644 --- a/nomos-services/data-availability/network/src/backends/libp2p/executor.rs +++ b/nomos-services/data-availability/network/src/backends/libp2p/executor.rs @@ -290,7 +290,7 @@ where let membership = &config.validator_settings.membership; // filter out which subnetworks we already have connections with - let connected_subnetworks: HashSet = (0..config.num_subnets as u32) + let connected_subnetworks: HashSet = (0..config.num_subnets) .filter(|subnetwork_id| { !membership .members_of(subnetwork_id) @@ -351,7 +351,7 @@ where + 'static, { let membership = &config.validator_settings.membership; - (0..config.num_subnets as u32) + (0..config.num_subnets) .filter(|subnetwork_id| !filtered_subnetworks.contains(subnetwork_id)) .filter_map(|subnetwork_id| { membership diff --git a/nomos-services/data-availability/verifier/src/backend/kzgrs.rs b/nomos-services/data-availability/verifier/src/backend/kzgrs.rs index 59d5e9b0..6c576080 100644 --- a/nomos-services/data-availability/verifier/src/backend/kzgrs.rs +++ b/nomos-services/data-availability/verifier/src/backend/kzgrs.rs @@ -66,6 +66,6 @@ impl DaVerifier for KzgrsDaVerifier { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct KzgrsDaVerifierSettings { pub sk: String, - pub index: HashSet, + pub index: HashSet, pub global_params_path: String, } diff --git a/tests/src/topology/configs/da.rs b/tests/src/topology/configs/da.rs index 2abb9d84..cc6b089f 100644 --- a/tests/src/topology/configs/da.rs +++ b/tests/src/topology/configs/da.rs @@ -59,7 +59,7 @@ pub struct GeneralDaConfig { pub blob_storage_directory: PathBuf, pub global_params_path: String, pub verifier_sk: String, - pub verifier_index: HashSet, + pub verifier_index: HashSet, pub num_samples: u16, pub num_subnets: u16, pub old_blobs_check_interval: Duration,