WIP: Sample message to behaviour massage translation

This commit is contained in:
Gusto 2024-09-05 20:04:15 +03:00
parent b31e867f7f
commit b105b2541c
4 changed files with 120 additions and 24 deletions

View File

@ -17,8 +17,10 @@ use libp2p::swarm::{
}; };
use libp2p::{Multiaddr, PeerId, Stream}; use libp2p::{Multiaddr, PeerId, Stream};
use libp2p_stream::{Control, IncomingStreams, OpenStreamError}; use libp2p_stream::{Control, IncomingStreams, OpenStreamError};
use nomos_core::da::BlobId;
use nomos_da_messages::sampling::sample_err::SampleErrType;
use nomos_da_messages::sampling::{sample_res, SampleErr, SampleReq, SampleRes}; use nomos_da_messages::sampling::{sample_res, SampleErr, SampleReq, SampleRes};
use nomos_da_messages::{pack_message, unpack_from_reader}; use nomos_da_messages::{common, pack_message, unpack_from_reader};
use subnetworks_assignations::MembershipHandler; use subnetworks_assignations::MembershipHandler;
use thiserror::Error; use thiserror::Error;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -57,7 +59,12 @@ pub enum SamplingError {
error: bincode::Error, error: bincode::Error,
}, },
#[error("Error sending request: {request:?}")] #[error("Error sending request: {request:?}")]
RequestChannel { request: SampleReq, peer_id: PeerId }, RequestChannel {
request: BehaviourSampleReq,
peer_id: PeerId,
},
#[error("Malformed blob id: {blob_id:?}")]
InvalidBlobId { peer_id: PeerId, blob_id: Vec<u8> },
#[error("Canceled response: {error}")] #[error("Canceled response: {error}")]
ResponseChannel { error: Canceled, peer_id: PeerId }, ResponseChannel { error: Canceled, peer_id: PeerId },
} }
@ -71,6 +78,7 @@ impl SamplingError {
SamplingError::Deserialize { peer_id, .. } => peer_id, SamplingError::Deserialize { peer_id, .. } => peer_id,
SamplingError::RequestChannel { peer_id, .. } => peer_id, SamplingError::RequestChannel { peer_id, .. } => peer_id,
SamplingError::ResponseChannel { peer_id, .. } => peer_id, SamplingError::ResponseChannel { peer_id, .. } => peer_id,
SamplingError::InvalidBlobId { peer_id, .. } => peer_id,
} }
} }
@ -132,13 +140,64 @@ impl Clone for SamplingError {
peer_id: *peer_id, peer_id: *peer_id,
error: *error, error: *error,
}, },
SamplingError::InvalidBlobId { blob_id, peer_id } => SamplingError::InvalidBlobId {
peer_id: *peer_id,
blob_id: blob_id.clone(),
},
} }
} }
} }
/// Inner type representation of a Blob ID #[derive(Debug, Clone)]
// TODO: Use a proper type that is common to the codebase pub struct BehaviourSampleReq {
type BlobId = [u8; 32]; pub blob_id: BlobId,
}
impl TryFrom<SampleReq> for BehaviourSampleReq {
type Error = Vec<u8>;
fn try_from(req: SampleReq) -> Result<Self, Self::Error> {
let blob_id: BlobId = req.blob_id.try_into()?;
Ok(Self { blob_id })
}
}
#[derive(Debug)]
pub enum BehaviourSampleRes {
SamplingSuccess {
blob_id: BlobId,
subnetwork_id: SubnetworkId,
blob: Box<DaBlob>,
},
SamplingError {
error: SamplingError,
},
}
impl From<BehaviourSampleRes> for SampleRes {
fn from(res: BehaviourSampleRes) -> Self {
match res {
// Handle the success case by consuming the fields
BehaviourSampleRes::SamplingSuccess { blob, blob_id, .. } => SampleRes {
message_type: Some(sample_res::MessageType::Blob(common::Blob {
blob_id: blob_id.to_vec(),
data: bincode::serialize(&blob).unwrap(),
})),
},
// Handle the error case by consuming the error
BehaviourSampleRes::SamplingError { error } => {
let blob_id = error.blob_id().map(|id| id.to_vec()).unwrap_or(vec![0; 32]);
SampleRes {
message_type: Some(sample_res::MessageType::Err(SampleErr {
blob_id,
err_type: SampleErrType::NotFound.into(),
err_description: error.to_string(),
})),
}
}
}
}
}
#[derive(Debug)] #[derive(Debug)]
pub enum SamplingEvent { pub enum SamplingEvent {
@ -149,8 +208,8 @@ pub enum SamplingEvent {
blob: Box<DaBlob>, blob: Box<DaBlob>,
}, },
IncomingSample { IncomingSample {
request_receiver: Receiver<SampleReq>, request_receiver: Receiver<BehaviourSampleReq>,
response_sender: Sender<SampleRes>, response_sender: Sender<BehaviourSampleRes>,
}, },
SamplingError { SamplingError {
error: SamplingError, error: SamplingError,
@ -165,8 +224,8 @@ struct SampleStream {
/// Auxiliary struct that binds where to send a request and the pair channel to listen for a response /// Auxiliary struct that binds where to send a request and the pair channel to listen for a response
struct ResponseChannel { struct ResponseChannel {
request_sender: Sender<SampleReq>, request_sender: Sender<BehaviourSampleReq>,
response_receiver: Receiver<SampleRes>, response_receiver: Receiver<BehaviourSampleRes>,
} }
type StreamHandlerFutureSuccess = (BlobId, SubnetworkId, SampleRes, SampleStream); type StreamHandlerFutureSuccess = (BlobId, SubnetworkId, SampleRes, SampleStream);
@ -350,6 +409,12 @@ where
peer_id: stream.peer_id, peer_id: stream.peer_id,
error, error,
})?; })?;
let request = BehaviourSampleReq::try_from(request).map_err(|blob_id| {
SamplingError::InvalidBlobId {
peer_id: stream.peer_id,
blob_id,
}
})?;
channel channel
.request_sender .request_sender
.send(request) .send(request)
@ -357,14 +422,14 @@ where
request, request,
peer_id: stream.peer_id, peer_id: stream.peer_id,
})?; })?;
let response = let response: SampleRes = channel
channel .response_receiver
.response_receiver .await
.await .map_err(|error| SamplingError::ResponseChannel {
.map_err(|error| SamplingError::ResponseChannel { error,
error, peer_id: stream.peer_id,
peer_id: stream.peer_id, })?
})?; .into();
let bytes = pack_message(&response).map_err(|error| SamplingError::Io { let bytes = pack_message(&response).map_err(|error| SamplingError::Io {
peer_id: stream.peer_id, peer_id: stream.peer_id,
error, error,
@ -394,7 +459,7 @@ where
fn schedule_incoming_stream_task( fn schedule_incoming_stream_task(
incoming_tasks: &mut FuturesUnordered<IncomingStreamHandlerFuture>, incoming_tasks: &mut FuturesUnordered<IncomingStreamHandlerFuture>,
sample_stream: SampleStream, sample_stream: SampleStream,
) -> (Receiver<SampleReq>, Sender<SampleRes>) { ) -> (Receiver<BehaviourSampleReq>, Sender<BehaviourSampleRes>) {
let (request_sender, request_receiver) = oneshot::channel(); let (request_sender, request_receiver) = oneshot::channel();
let (response_sender, response_receiver) = oneshot::channel(); let (response_sender, response_receiver) = oneshot::channel();
let channel = ResponseChannel { let channel = ResponseChannel {

View File

@ -6,7 +6,9 @@ use libp2p::{Multiaddr, PeerId};
use log::error; use log::error;
use nomos_core::da::BlobId; use nomos_core::da::BlobId;
use nomos_da_network_core::protocols::sampling; use nomos_da_network_core::protocols::sampling;
use nomos_da_network_core::protocols::sampling::behaviour::SamplingError; use nomos_da_network_core::protocols::sampling::behaviour::{
BehaviourSampleReq, BehaviourSampleRes, SamplingError,
};
use nomos_da_network_core::swarm::validator::{ValidatorEventsStream, ValidatorSwarm}; use nomos_da_network_core::swarm::validator::{ValidatorEventsStream, ValidatorSwarm};
use nomos_da_network_core::SubnetworkId; use nomos_da_network_core::SubnetworkId;
use nomos_libp2p::secret_key_serde; use nomos_libp2p::secret_key_serde;
@ -17,9 +19,9 @@ use std::fmt::Debug;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use subnetworks_assignations::MembershipHandler; use subnetworks_assignations::MembershipHandler;
use tokio::sync::broadcast;
use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{broadcast, mpsc};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio_stream::wrappers::BroadcastStream; use tokio_stream::wrappers::BroadcastStream;
@ -49,6 +51,11 @@ pub enum DaNetworkEventKind {
pub enum SamplingEvent { pub enum SamplingEvent {
/// A success sampling /// A success sampling
SamplingSuccess { blob_id: BlobId, blob: Box<DaBlob> }, SamplingSuccess { blob_id: BlobId, blob: Box<DaBlob> },
/// Incoming sampling request
SamplingRequest {
blob_id: BlobId,
response_sender: mpsc::Sender<DaBlob>,
},
/// A failed sampling error /// A failed sampling error
SamplingError { error: SamplingError }, SamplingError { error: SamplingError },
} }
@ -214,8 +221,30 @@ async fn handle_validator_events_stream(
error!("Error in internal broadcast of sampling success: {e:?}"); error!("Error in internal broadcast of sampling success: {e:?}");
} }
} }
sampling::behaviour::SamplingEvent::IncomingSample{ .. } => { sampling::behaviour::SamplingEvent::IncomingSample{request_receiver, response_sender} => {
unimplemented!("Handle request/response from Sampling service"); if let Ok(BehaviourSampleReq { blob_id }) = request_receiver.await {
let (sampling_response_sender, mut sampling_response_receiver) = mpsc::channel(1);
if let Err(e) = sampling_broadcast_sender
.send(SamplingEvent::SamplingRequest { blob_id, response_sender: sampling_response_sender })
{
error!("Error in internal broadcast of sampling request: {e:?}");
sampling_response_receiver.close()
}
if let Some(blob) = sampling_response_receiver.recv().await {
if response_sender
.send(BehaviourSampleRes::SamplingSuccess {
blob_id,
subnetwork_id: blob.column_idx as u32,
blob: Box::new(blob),
})
.is_err()
{
error!("Error sending sampling success response");
}
}
}
} }
sampling::behaviour::SamplingEvent::SamplingError{ error } => { sampling::behaviour::SamplingEvent::SamplingError{ error } => {
if let Err(e) = sampling_broadcast_sender.send(SamplingEvent::SamplingError {error}) { if let Err(e) = sampling_broadcast_sender.send(SamplingEvent::SamplingError {error}) {

View File

@ -129,6 +129,10 @@ where
} }
error!("Error while sampling: {error}"); error!("Error while sampling: {error}");
} }
SamplingEvent::SamplingRequest {
blob_id,
response_sender,
} => todo!(),
} }
} }
} }

View File

@ -17,8 +17,6 @@ use tokio_stream::StreamExt;
// internal // internal
use crate::network::NetworkAdapter; use crate::network::NetworkAdapter;
pub const NOMOS_DA_TOPIC: &str = "NomosDa";
pub struct Libp2pAdapter<M> pub struct Libp2pAdapter<M>
where where
M: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId> M: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>