From b105b2541c8e82f1a415d788c45f211aa06f68c0 Mon Sep 17 00:00:00 2001 From: Gusto Date: Thu, 5 Sep 2024 20:04:15 +0300 Subject: [PATCH] WIP: Sample message to behaviour massage translation --- .../core/src/protocols/sampling/behaviour.rs | 101 ++++++++++++++---- .../network/src/backends/libp2p/validator.rs | 37 ++++++- .../data-availability/sampling/src/lib.rs | 4 + .../verifier/src/network/adapters/libp2p.rs | 2 - 4 files changed, 120 insertions(+), 24 deletions(-) diff --git a/nomos-da/network/core/src/protocols/sampling/behaviour.rs b/nomos-da/network/core/src/protocols/sampling/behaviour.rs index 53f8184f..da699ff0 100644 --- a/nomos-da/network/core/src/protocols/sampling/behaviour.rs +++ b/nomos-da/network/core/src/protocols/sampling/behaviour.rs @@ -17,8 +17,10 @@ use libp2p::swarm::{ }; use libp2p::{Multiaddr, PeerId, Stream}; use libp2p_stream::{Control, IncomingStreams, OpenStreamError}; +use nomos_core::da::BlobId; +use nomos_da_messages::sampling::sample_err::SampleErrType; use nomos_da_messages::sampling::{sample_res, SampleErr, SampleReq, SampleRes}; -use nomos_da_messages::{pack_message, unpack_from_reader}; +use nomos_da_messages::{common, pack_message, unpack_from_reader}; use subnetworks_assignations::MembershipHandler; use thiserror::Error; use tokio::sync::mpsc; @@ -57,7 +59,12 @@ pub enum SamplingError { error: bincode::Error, }, #[error("Error sending request: {request:?}")] - RequestChannel { request: SampleReq, peer_id: PeerId }, + RequestChannel { + request: BehaviourSampleReq, + peer_id: PeerId, + }, + #[error("Malformed blob id: {blob_id:?}")] + InvalidBlobId { peer_id: PeerId, blob_id: Vec }, #[error("Canceled response: {error}")] ResponseChannel { error: Canceled, peer_id: PeerId }, } @@ -71,6 +78,7 @@ impl SamplingError { SamplingError::Deserialize { peer_id, .. } => peer_id, SamplingError::RequestChannel { peer_id, .. } => peer_id, SamplingError::ResponseChannel { peer_id, .. } => peer_id, + SamplingError::InvalidBlobId { peer_id, .. } => peer_id, } } @@ -132,13 +140,64 @@ impl Clone for SamplingError { peer_id: *peer_id, error: *error, }, + SamplingError::InvalidBlobId { blob_id, peer_id } => SamplingError::InvalidBlobId { + peer_id: *peer_id, + blob_id: blob_id.clone(), + }, } } } -/// Inner type representation of a Blob ID -// TODO: Use a proper type that is common to the codebase -type BlobId = [u8; 32]; +#[derive(Debug, Clone)] +pub struct BehaviourSampleReq { + pub blob_id: BlobId, +} + +impl TryFrom for BehaviourSampleReq { + type Error = Vec; + + fn try_from(req: SampleReq) -> Result { + let blob_id: BlobId = req.blob_id.try_into()?; + Ok(Self { blob_id }) + } +} + +#[derive(Debug)] +pub enum BehaviourSampleRes { + SamplingSuccess { + blob_id: BlobId, + subnetwork_id: SubnetworkId, + blob: Box, + }, + SamplingError { + error: SamplingError, + }, +} + +impl From for SampleRes { + fn from(res: BehaviourSampleRes) -> Self { + match res { + // Handle the success case by consuming the fields + BehaviourSampleRes::SamplingSuccess { blob, blob_id, .. } => SampleRes { + message_type: Some(sample_res::MessageType::Blob(common::Blob { + blob_id: blob_id.to_vec(), + data: bincode::serialize(&blob).unwrap(), + })), + }, + // Handle the error case by consuming the error + BehaviourSampleRes::SamplingError { error } => { + let blob_id = error.blob_id().map(|id| id.to_vec()).unwrap_or(vec![0; 32]); + SampleRes { + message_type: Some(sample_res::MessageType::Err(SampleErr { + blob_id, + err_type: SampleErrType::NotFound.into(), + err_description: error.to_string(), + })), + } + } + } + } +} #[derive(Debug)] pub enum SamplingEvent { @@ -149,8 +208,8 @@ pub enum SamplingEvent { blob: Box, }, IncomingSample { - request_receiver: Receiver, - response_sender: Sender, + request_receiver: Receiver, + response_sender: Sender, }, SamplingError { error: SamplingError, @@ -165,8 +224,8 @@ struct SampleStream { /// Auxiliary struct that binds where to send a request and the pair channel to listen for a response struct ResponseChannel { - request_sender: Sender, - response_receiver: Receiver, + request_sender: Sender, + response_receiver: Receiver, } type StreamHandlerFutureSuccess = (BlobId, SubnetworkId, SampleRes, SampleStream); @@ -350,6 +409,12 @@ where peer_id: stream.peer_id, error, })?; + let request = BehaviourSampleReq::try_from(request).map_err(|blob_id| { + SamplingError::InvalidBlobId { + peer_id: stream.peer_id, + blob_id, + } + })?; channel .request_sender .send(request) @@ -357,14 +422,14 @@ where request, peer_id: stream.peer_id, })?; - let response = - channel - .response_receiver - .await - .map_err(|error| SamplingError::ResponseChannel { - error, - peer_id: stream.peer_id, - })?; + let response: SampleRes = channel + .response_receiver + .await + .map_err(|error| SamplingError::ResponseChannel { + error, + peer_id: stream.peer_id, + })? + .into(); let bytes = pack_message(&response).map_err(|error| SamplingError::Io { peer_id: stream.peer_id, error, @@ -394,7 +459,7 @@ where fn schedule_incoming_stream_task( incoming_tasks: &mut FuturesUnordered, sample_stream: SampleStream, - ) -> (Receiver, Sender) { + ) -> (Receiver, Sender) { let (request_sender, request_receiver) = oneshot::channel(); let (response_sender, response_receiver) = oneshot::channel(); let channel = ResponseChannel { diff --git a/nomos-services/data-availability/network/src/backends/libp2p/validator.rs b/nomos-services/data-availability/network/src/backends/libp2p/validator.rs index 33d0c8ec..12eea250 100644 --- a/nomos-services/data-availability/network/src/backends/libp2p/validator.rs +++ b/nomos-services/data-availability/network/src/backends/libp2p/validator.rs @@ -6,7 +6,9 @@ use libp2p::{Multiaddr, PeerId}; use log::error; use nomos_core::da::BlobId; use nomos_da_network_core::protocols::sampling; -use nomos_da_network_core::protocols::sampling::behaviour::SamplingError; +use nomos_da_network_core::protocols::sampling::behaviour::{ + BehaviourSampleReq, BehaviourSampleRes, SamplingError, +}; use nomos_da_network_core::swarm::validator::{ValidatorEventsStream, ValidatorSwarm}; use nomos_da_network_core::SubnetworkId; use nomos_libp2p::secret_key_serde; @@ -17,9 +19,9 @@ use std::fmt::Debug; use std::marker::PhantomData; use std::pin::Pin; use subnetworks_assignations::MembershipHandler; -use tokio::sync::broadcast; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::{broadcast, mpsc}; use tokio::task::JoinHandle; use tokio_stream::wrappers::BroadcastStream; @@ -49,6 +51,11 @@ pub enum DaNetworkEventKind { pub enum SamplingEvent { /// A success sampling SamplingSuccess { blob_id: BlobId, blob: Box }, + /// Incoming sampling request + SamplingRequest { + blob_id: BlobId, + response_sender: mpsc::Sender, + }, /// A failed sampling error SamplingError { error: SamplingError }, } @@ -214,8 +221,30 @@ async fn handle_validator_events_stream( error!("Error in internal broadcast of sampling success: {e:?}"); } } - sampling::behaviour::SamplingEvent::IncomingSample{ .. } => { - unimplemented!("Handle request/response from Sampling service"); + sampling::behaviour::SamplingEvent::IncomingSample{request_receiver, response_sender} => { + if let Ok(BehaviourSampleReq { blob_id }) = request_receiver.await { + let (sampling_response_sender, mut sampling_response_receiver) = mpsc::channel(1); + + if let Err(e) = sampling_broadcast_sender + .send(SamplingEvent::SamplingRequest { blob_id, response_sender: sampling_response_sender }) + { + error!("Error in internal broadcast of sampling request: {e:?}"); + sampling_response_receiver.close() + } + + if let Some(blob) = sampling_response_receiver.recv().await { + if response_sender + .send(BehaviourSampleRes::SamplingSuccess { + blob_id, + subnetwork_id: blob.column_idx as u32, + blob: Box::new(blob), + }) + .is_err() + { + error!("Error sending sampling success response"); + } + } + } } sampling::behaviour::SamplingEvent::SamplingError{ error } => { if let Err(e) = sampling_broadcast_sender.send(SamplingEvent::SamplingError {error}) { diff --git a/nomos-services/data-availability/sampling/src/lib.rs b/nomos-services/data-availability/sampling/src/lib.rs index 4bb3b6b3..07d7a8cd 100644 --- a/nomos-services/data-availability/sampling/src/lib.rs +++ b/nomos-services/data-availability/sampling/src/lib.rs @@ -129,6 +129,10 @@ where } error!("Error while sampling: {error}"); } + SamplingEvent::SamplingRequest { + blob_id, + response_sender, + } => todo!(), } } } diff --git a/nomos-services/data-availability/verifier/src/network/adapters/libp2p.rs b/nomos-services/data-availability/verifier/src/network/adapters/libp2p.rs index 80ac05dd..137caef8 100644 --- a/nomos-services/data-availability/verifier/src/network/adapters/libp2p.rs +++ b/nomos-services/data-availability/verifier/src/network/adapters/libp2p.rs @@ -17,8 +17,6 @@ use tokio_stream::StreamExt; // internal use crate::network::NetworkAdapter; -pub const NOMOS_DA_TOPIC: &str = "NomosDa"; - pub struct Libp2pAdapter where M: MembershipHandler