From 66de571bfa7d6a01c68edf55f546ebefc1995f73 Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Thu, 22 Aug 2024 12:14:42 +0200 Subject: [PATCH] Da: network service backend (#695) * Sketch up types and backend skeleton * Pipe up sampling events * Pipe up validation blobs events * Added docs * Cargo fmt * Clippy happy * Debug log events in validator swarm run method --- .../network/core/src/behaviour/validator.rs | 24 ++ .../core/src/protocols/sampling/behaviour.rs | 69 ++++++ nomos-da/network/core/src/swarm/validator.rs | 90 +++++--- .../data-availability/network/Cargo.toml | 4 + .../network/src/backends/libp2p/mod.rs | 1 + .../network/src/backends/libp2p/validator.rs | 217 ++++++++++++++++++ .../network/src/backends/mock/executor.rs | 15 +- .../network/src/backends/mod.rs | 9 +- .../data-availability/network/src/lib.rs | 17 +- 9 files changed, 405 insertions(+), 41 deletions(-) create mode 100644 nomos-services/data-availability/network/src/backends/libp2p/mod.rs create mode 100644 nomos-services/data-availability/network/src/backends/libp2p/validator.rs diff --git a/nomos-da/network/core/src/behaviour/validator.rs b/nomos-da/network/core/src/behaviour/validator.rs index 9ee3f2ae..870b88a9 100644 --- a/nomos-da/network/core/src/behaviour/validator.rs +++ b/nomos-da/network/core/src/behaviour/validator.rs @@ -48,4 +48,28 @@ where self.dispersal.update_membership(membership.clone()); self.replication.update_membership(membership); } + + pub fn sampling_behaviour(&self) -> &SamplingBehaviour { + &self.sampling + } + + pub fn dispersal_behaviour(&self) -> &DispersalValidatorBehaviour { + &self.dispersal + } + + pub fn replication_behaviour(&self) -> &ReplicationBehaviour { + &self.replication + } + + pub fn sampling_behaviour_mut(&mut self) -> &mut SamplingBehaviour { + &mut self.sampling + } + + pub fn dispersal_behaviour_mut(&mut self) -> &mut DispersalValidatorBehaviour { + &mut self.dispersal + } + + pub fn replication_behaviour_mut(&mut self) -> &mut ReplicationBehaviour { + &mut self.replication + } } diff --git a/nomos-da/network/core/src/protocols/sampling/behaviour.rs b/nomos-da/network/core/src/protocols/sampling/behaviour.rs index 96514fda..de129aff 100644 --- a/nomos-da/network/core/src/protocols/sampling/behaviour.rs +++ b/nomos-da/network/core/src/protocols/sampling/behaviour.rs @@ -1,4 +1,5 @@ // std +use bincode::ErrorKind; use std::collections::{HashMap, HashSet, VecDeque}; use std::task::{Context, Poll}; // crates @@ -72,6 +73,74 @@ impl SamplingError { } } +impl Clone for SamplingError { + fn clone(&self) -> Self { + match self { + SamplingError::Io { peer_id, error } => SamplingError::Io { + peer_id: *peer_id, + error: std::io::Error::new(error.kind(), error.to_string()), + }, + SamplingError::Protocol { + subnetwork_id, + peer_id, + error, + } => SamplingError::Protocol { + subnetwork_id: *subnetwork_id, + peer_id: *peer_id, + error: error.clone(), + }, + SamplingError::OpenStream { peer_id, error } => SamplingError::OpenStream { + peer_id: *peer_id, + error: match error { + OpenStreamError::UnsupportedProtocol(protocol) => { + OpenStreamError::UnsupportedProtocol(protocol.clone()) + } + OpenStreamError::Io(error) => { + OpenStreamError::Io(std::io::Error::new(error.kind(), error.to_string())) + } + err => OpenStreamError::Io(std::io::Error::new( + std::io::ErrorKind::Other, + err.to_string(), + )), + }, + }, + SamplingError::Deserialize { + blob_id, + subnetwork_id, + peer_id, + error, + } => SamplingError::Deserialize { + blob_id: *blob_id, + subnetwork_id: 
*subnetwork_id, + peer_id: *peer_id, + error: clone_deserialize_error(error), + }, + SamplingError::RequestChannel { request, peer_id } => SamplingError::RequestChannel { + request: request.clone(), + peer_id: *peer_id, + }, + SamplingError::ResponseChannel { error, peer_id } => SamplingError::ResponseChannel { + peer_id: *peer_id, + error: *error, + }, + } + } +} + +fn clone_deserialize_error(error: &bincode::Error) -> bincode::Error { + Box::new(match error.as_ref() { + ErrorKind::Io(error) => ErrorKind::Io(std::io::Error::new(error.kind(), error.to_string())), + ErrorKind::InvalidUtf8Encoding(error) => ErrorKind::InvalidUtf8Encoding(*error), + ErrorKind::InvalidBoolEncoding(bool) => ErrorKind::InvalidBoolEncoding(*bool), + ErrorKind::InvalidCharEncoding => ErrorKind::InvalidCharEncoding, + ErrorKind::InvalidTagEncoding(tag) => ErrorKind::InvalidTagEncoding(*tag), + ErrorKind::DeserializeAnyNotSupported => ErrorKind::DeserializeAnyNotSupported, + ErrorKind::SizeLimit => ErrorKind::SizeLimit, + ErrorKind::SequenceMustHaveLength => ErrorKind::SequenceMustHaveLength, + ErrorKind::Custom(custom) => ErrorKind::Custom(custom.clone()), + }) +} + /// Inner type representation of a Blob ID // TODO: Use a proper type that is common to the codebase type BlobId = [u8; 32]; diff --git a/nomos-da/network/core/src/swarm/validator.rs b/nomos-da/network/core/src/swarm/validator.rs index ba8c0dfa..a59eb083 100644 --- a/nomos-da/network/core/src/swarm/validator.rs +++ b/nomos-da/network/core/src/swarm/validator.rs @@ -1,10 +1,12 @@ // std // crates use futures::StreamExt; +use kzgrs_backend::common::blob::DaBlob; use libp2p::identity::Keypair; use libp2p::swarm::SwarmEvent; use libp2p::{PeerId, Swarm, SwarmBuilder}; -use log::debug; +use log::{debug, error}; +use nomos_da_messages::replication::ReplicationReq; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio_stream::wrappers::UnboundedReceiverStream; // internal @@ -18,6 +20,7 @@ use subnetworks_assignations::MembershipHandler; pub struct ValidatorEventsStream { pub sampling_events_receiver: UnboundedReceiverStream, + pub validation_events_receiver: UnboundedReceiverStream, } pub struct ValidatorSwarm< @@ -25,6 +28,7 @@ pub struct ValidatorSwarm< > { swarm: Swarm>, sampling_events_sender: UnboundedSender, + validation_events_sender: UnboundedSender, } impl ValidatorSwarm @@ -33,14 +37,19 @@ where { pub fn new(key: Keypair, membership: Membership) -> (Self, ValidatorEventsStream) { let (sampling_events_sender, sampling_events_receiver) = unbounded_channel(); + let (validation_events_sender, validation_events_receiver) = unbounded_channel(); + let sampling_events_receiver = UnboundedReceiverStream::new(sampling_events_receiver); + let validation_events_receiver = UnboundedReceiverStream::new(validation_events_receiver); ( Self { swarm: Self::build_swarm(key, membership), sampling_events_sender, + validation_events_sender, }, ValidatorEventsStream { sampling_events_receiver, + validation_events_receiver, }, ) } @@ -53,21 +62,48 @@ where .build() } + pub fn protocol_swarm(&self) -> &Swarm> { + &self.swarm + } + + pub fn protocol_swarm_mut(&mut self) -> &mut Swarm> { + &mut self.swarm + } + async fn handle_sampling_event(&mut self, event: SamplingEvent) { if let Err(e) = self.sampling_events_sender.send(event) { debug!("Error distributing sampling message internally: {e:?}"); } } - async fn handle_dispersal_event(&mut self, _event: DispersalEvent) { - // TODO: hook incoming dispersal events => to replication - unimplemented!() + async fn 
handle_dispersal_event(&mut self, event: DispersalEvent) { + match event { + // Send message for replication + DispersalEvent::IncomingMessage { message } => { + if let Ok(blob) = bincode::deserialize::( + message + .blob + .as_ref() + .expect("Message blob should not be empty") + .data + .as_slice(), + ) { + if let Err(e) = self.validation_events_sender.send(blob) { + error!("Error sending blob to validation: {e:?}"); + } + } + self.swarm + .behaviour_mut() + .replication_behaviour_mut() + .send_message(ReplicationReq { + blob: message.blob, + subnetwork_id: message.subnetwork_id, + }); + } + } } - async fn handle_replication_event(&mut self, _event: ReplicationEvent) { - // TODO: Hook incoming blobs from replication protocol - unimplemented!() - } + async fn handle_replication_event(&mut self, _event: ReplicationEvent) {} async fn handle_behaviour_event(&mut self, event: ValidatorBehaviourEvent) { match event { @@ -83,33 +119,33 @@ where } } - pub async fn step(&mut self) { - tokio::select! { - Some(event) = self.swarm.next() => { + pub async fn run(mut self) { + loop { + if let Some(event) = self.swarm.next().await { + debug!("Da swarm event received: {event:?}"); match event { SwarmEvent::Behaviour(behaviour_event) => { self.handle_behaviour_event(behaviour_event).await; - }, - SwarmEvent::ConnectionEstablished{ .. } => {} - SwarmEvent::ConnectionClosed{ .. } => {} - SwarmEvent::IncomingConnection{ .. } => {} - SwarmEvent::IncomingConnectionError{ .. } => {} - SwarmEvent::OutgoingConnectionError{ .. } => {} - SwarmEvent::NewListenAddr{ .. } => {} - SwarmEvent::ExpiredListenAddr{ .. } => {} - SwarmEvent::ListenerClosed{ .. } => {} - SwarmEvent::ListenerError{ .. } => {} - SwarmEvent::Dialing{ .. } => {} - SwarmEvent::NewExternalAddrCandidate{ .. } => {} - SwarmEvent::ExternalAddrConfirmed{ .. } => {} - SwarmEvent::ExternalAddrExpired{ .. } => {} - SwarmEvent::NewExternalAddrOfPeer{ .. } => {} + } + SwarmEvent::ConnectionEstablished { .. } => {} + SwarmEvent::ConnectionClosed { .. } => {} + SwarmEvent::IncomingConnection { .. } => {} + SwarmEvent::IncomingConnectionError { .. } => {} + SwarmEvent::OutgoingConnectionError { .. } => {} + SwarmEvent::NewListenAddr { .. } => {} + SwarmEvent::ExpiredListenAddr { .. } => {} + SwarmEvent::ListenerClosed { .. } => {} + SwarmEvent::ListenerError { .. } => {} + SwarmEvent::Dialing { .. } => {} + SwarmEvent::NewExternalAddrCandidate { .. } => {} + SwarmEvent::ExternalAddrConfirmed { .. } => {} + SwarmEvent::ExternalAddrExpired { .. } => {} + SwarmEvent::NewExternalAddrOfPeer { .. 
} => {}
                     event => {
                         debug!("Unsupported validator swarm event: {event:?}");
                     }
                 }
             }
         }
     }
 }
diff --git a/nomos-services/data-availability/network/Cargo.toml b/nomos-services/data-availability/network/Cargo.toml
index a6692624..01cb72b2 100644
--- a/nomos-services/data-availability/network/Cargo.toml
+++ b/nomos-services/data-availability/network/Cargo.toml
@@ -8,7 +8,11 @@ async-trait = "0.1"
 futures = "0.3"
 kzgrs-backend = { path = "../../../nomos-da/kzgrs-backend" }
 overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
+nomos-da-network-core = { path = "../../../nomos-da/network/core" }
+subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" }
+libp2p = { version = "0.54", features = ["ed25519"] }
 serde = { version = "1.0", features = ["derive"] }
 tokio = { version = "1", features = ["macros", "sync"] }
 tokio-stream = "0.1"
 tracing = "0.1"
+log = "0.4.22"
diff --git a/nomos-services/data-availability/network/src/backends/libp2p/mod.rs b/nomos-services/data-availability/network/src/backends/libp2p/mod.rs
new file mode 100644
index 00000000..fa199f24
--- /dev/null
+++ b/nomos-services/data-availability/network/src/backends/libp2p/mod.rs
@@ -0,0 +1 @@
+pub mod validator;
diff --git a/nomos-services/data-availability/network/src/backends/libp2p/validator.rs b/nomos-services/data-availability/network/src/backends/libp2p/validator.rs
new file mode 100644
index 00000000..5de06b83
--- /dev/null
+++ b/nomos-services/data-availability/network/src/backends/libp2p/validator.rs
@@ -0,0 +1,217 @@
+use crate::backends::NetworkBackend;
+use futures::{Stream, StreamExt};
+use kzgrs_backend::common::blob::DaBlob;
+use libp2p::identity::Keypair;
+use libp2p::PeerId;
+use log::error;
+use nomos_da_network_core::protocols::sampling;
+use nomos_da_network_core::protocols::sampling::behaviour::SamplingError;
+use nomos_da_network_core::swarm::validator::{ValidatorEventsStream, ValidatorSwarm};
+use nomos_da_network_core::SubnetworkId;
+use overwatch_rs::overwatch::handle::OverwatchHandle;
+use overwatch_rs::services::state::NoState;
+use std::fmt::Debug;
+use std::marker::PhantomData;
+use std::pin::Pin;
+use subnetworks_assignations::MembershipHandler;
+use tokio::sync::broadcast;
+use tokio::sync::mpsc::error::SendError;
+use tokio::sync::mpsc::UnboundedSender;
+use tokio::task::JoinHandle;
+use tokio_stream::wrappers::BroadcastStream;
+
+type BlobId = [u8; 32];
+type ColumnIdx = u32;
+
+const BROADCAST_CHANNEL_SIZE: usize = 128;
+
+/// Messages that the backend processes
+#[derive(Debug)]
+pub enum DaNetworkMessage {
+    /// Kickstart a network sampling
+    RequestSample {
+        subnetwork_id: ColumnIdx,
+        blob_id: BlobId,
+    },
+}
+
+/// Event types to subscribe to
+/// * Sampling: incoming sampling events [success/fail]
+/// * Verifying: incoming blobs to be verified
+#[derive(Debug)]
+pub enum DaNetworkEventKind {
+    Sampling,
+    Verifying,
+}
+
+/// Sampling events coming from the DA network
+#[derive(Debug, Clone)]
+pub enum SamplingEvent {
+    /// A successful sampling
+    SamplingSuccess { blob_id: BlobId, blob: Box<DaBlob> },
+    /// A failed sampling, with its error
+    SamplingError { error: SamplingError },
+}
+
+/// DA network incoming events
+#[derive(Debug)]
+pub enum DaNetworkEvent {
+    Sampling(SamplingEvent),
+    Verifying(Box<DaBlob>),
+}
+
+/// DA network backend for validators
+/// Internally uses a libp2p swarm composed of the [`ValidatorBehaviour`]
+/// It forwards network messages to the corresponding subscription channels/streams
+pub struct DaNetworkValidatorBackend<Membership> {
+    // TODO: these join handles should be cancellable tasks. We should add a stop method to
+    // the `NetworkBackend` trait so that, if the service is stopped, the backend can gracefully
+    // handle open sub-tasks as well.
+    #[allow(dead_code)]
+    task: JoinHandle<()>,
+    #[allow(dead_code)]
+    replies_task: JoinHandle<()>,
+    sampling_request_channel: UnboundedSender<(SubnetworkId, BlobId)>,
+    sampling_broadcast_receiver: broadcast::Receiver<SamplingEvent>,
+    verifying_broadcast_receiver: broadcast::Receiver<DaBlob>,
+    _membership: PhantomData<Membership>,
+}
+
+#[derive(Clone, Debug)]
+pub struct DaNetworkValidatorBackendSettings<Membership> {
+    /// Identification key
+    key: Keypair,
+    /// Membership of the DA network PoV set
+    membership: Membership,
+}
+
+impl<Membership> DaNetworkValidatorBackend<Membership> {
+    /// Send the sampling request to the underlying sampling behaviour
+    async fn handle_sample_request(&self, subnetwork_id: SubnetworkId, blob_id: BlobId) {
+        if let Err(SendError((subnetwork_id, blob_id))) =
+            self.sampling_request_channel.send((subnetwork_id, blob_id))
+        {
+            error!(
+                "Error requesting sample for subnetwork id: {subnetwork_id}, blob_id: {blob_id:?}"
+            );
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl<Membership> NetworkBackend for DaNetworkValidatorBackend<Membership>
+where
+    Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+        + Clone
+        + Debug
+        + Send
+        + Sync
+        + 'static,
+{
+    type Settings = DaNetworkValidatorBackendSettings<Membership>;
+    type State = NoState<Self::Settings>;
+    type Message = DaNetworkMessage;
+    type EventKind = DaNetworkEventKind;
+    type NetworkEvent = DaNetworkEvent;
+
+    fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self {
+        let (validator_swarm, events_streams) = ValidatorSwarm::new(config.key, config.membership);
+        let sampling_request_channel = validator_swarm
+            .protocol_swarm()
+            .behaviour()
+            .sampling_behaviour()
+            .sample_request_channel();
+
+        let task = overwatch_handle.runtime().spawn(validator_swarm.run());
+        let (sampling_broadcast_sender, sampling_broadcast_receiver) =
+            broadcast::channel(BROADCAST_CHANNEL_SIZE);
+        let (verifying_broadcast_sender, verifying_broadcast_receiver) =
+            broadcast::channel(BROADCAST_CHANNEL_SIZE);
+        let replies_task = overwatch_handle
+            .runtime()
+            .spawn(handle_validator_events_stream(
+                events_streams,
+                sampling_broadcast_sender,
+                verifying_broadcast_sender,
+            ));
+
+        Self {
+            task,
+            replies_task,
+            sampling_request_channel,
+            sampling_broadcast_receiver,
+            verifying_broadcast_receiver,
+            _membership: Default::default(),
+        }
+    }
+
+    async fn process(&self, msg: Self::Message) {
+        match msg {
+            DaNetworkMessage::RequestSample {
+                subnetwork_id,
+                blob_id,
+            } => {
+                self.handle_sample_request(subnetwork_id, blob_id).await;
+            }
+        }
+    }
+
+    async fn subscribe(
+        &mut self,
+        event: Self::EventKind,
+    ) -> Pin<Box<dyn Stream<Item = Self::NetworkEvent> + Send>> {
+        match event {
+            DaNetworkEventKind::Sampling => Box::pin(
+                BroadcastStream::new(self.sampling_broadcast_receiver.resubscribe())
+                    .filter_map(|event| async { event.ok() })
+                    .map(Self::NetworkEvent::Sampling),
+            ),
+            DaNetworkEventKind::Verifying => Box::pin(
+                BroadcastStream::new(self.verifying_broadcast_receiver.resubscribe())
+                    .filter_map(|event| async { event.ok() })
+                    .map(|blob| Self::NetworkEvent::Verifying(Box::new(blob))),
+            ),
+        }
+    }
+}
+
+/// Task that handles forwarding of events to the subscription channels/streams
+async fn handle_validator_events_stream(
+    events_streams: ValidatorEventsStream,
+    sampling_broadcast_sender: broadcast::Sender<SamplingEvent>,
+    validation_broadcast_sender: broadcast::Sender<DaBlob>,
+) {
+    let ValidatorEventsStream {
+        mut sampling_events_receiver,
+        mut validation_events_receiver,
+    } = 
events_streams; + #[allow(clippy::never_loop)] + loop { + // WARNING: `StreamExt::next` is cancellation safe. + // If adding more branches check if such methods are within the cancellation safe set: + // https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety + tokio::select! { + Some(sampling_event) = StreamExt::next(&mut sampling_events_receiver) => { + match sampling_event { + sampling::behaviour::SamplingEvent::SamplingSuccess{ blob_id, blob , .. } => { + if let Err(e) = sampling_broadcast_sender.send(SamplingEvent::SamplingSuccess {blob_id, blob}){ + error!("Error in internal broadcast of sampling success: {e:?}"); + } + } + sampling::behaviour::SamplingEvent::IncomingSample{ .. } => { + unimplemented!("Handle request/response from Sampling service"); + } + sampling::behaviour::SamplingEvent::SamplingError{ error } => { + if let Err(e) = sampling_broadcast_sender.send(SamplingEvent::SamplingError {error}) { + error!{"Error in internal broadcast of sampling error: {e:?}"}; + } + }} + } + Some(da_blob) = StreamExt::next(&mut validation_events_receiver)=> { + if let Err(error) = validation_broadcast_sender.send(da_blob) { + error!("Error in internal broadcast of validation for blob: {:?}", error.0); + } + } + } + } +} diff --git a/nomos-services/data-availability/network/src/backends/mock/executor.rs b/nomos-services/data-availability/network/src/backends/mock/executor.rs index 687ff806..180dd6ee 100644 --- a/nomos-services/data-availability/network/src/backends/mock/executor.rs +++ b/nomos-services/data-availability/network/src/backends/mock/executor.rs @@ -1,10 +1,13 @@ +use futures::{Stream, StreamExt}; use kzgrs_backend::common::{blob::DaBlob, build_blob_id}; use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::NoState}; use serde::{Deserialize, Serialize}; +use std::pin::Pin; use tokio::sync::{ - broadcast::{self, Receiver}, + broadcast::{self}, mpsc, }; +use tokio_stream::wrappers::BroadcastStream; use crate::backends::NetworkBackend; @@ -96,9 +99,15 @@ impl NetworkBackend for MockExecutorBackend { } } - async fn subscribe(&mut self, kind: Self::EventKind) -> Receiver { + async fn subscribe( + &mut self, + kind: Self::EventKind, + ) -> Pin + Send>> { match kind { - EventKind::Dispersal | EventKind::Sample => self.events_tx.subscribe(), + EventKind::Dispersal | EventKind::Sample => Box::pin( + BroadcastStream::new(self.events_tx.subscribe()) + .filter_map(|event| async { event.ok() }), + ), } } } diff --git a/nomos-services/data-availability/network/src/backends/mod.rs b/nomos-services/data-availability/network/src/backends/mod.rs index 675fe6c8..95338a4e 100644 --- a/nomos-services/data-availability/network/src/backends/mod.rs +++ b/nomos-services/data-availability/network/src/backends/mod.rs @@ -1,8 +1,10 @@ +pub mod libp2p; pub mod mock; use super::*; +use futures::Stream; use overwatch_rs::{overwatch::handle::OverwatchHandle, services::state::ServiceState}; -use tokio::sync::broadcast::Receiver; +use std::pin::Pin; #[async_trait::async_trait] pub trait NetworkBackend { @@ -14,5 +16,8 @@ pub trait NetworkBackend { fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self; async fn process(&self, msg: Self::Message); - async fn subscribe(&mut self, event: Self::EventKind) -> Receiver; + async fn subscribe( + &mut self, + event: Self::EventKind, + ) -> Pin + Send>>; } diff --git a/nomos-services/data-availability/network/src/lib.rs b/nomos-services/data-availability/network/src/lib.rs index ce6b4bc2..294721f2 100644 --- 
a/nomos-services/data-availability/network/src/lib.rs +++ b/nomos-services/data-availability/network/src/lib.rs @@ -2,10 +2,11 @@ pub mod backends; // std use std::fmt::{self, Debug}; +use std::pin::Pin; // crates use async_trait::async_trait; use backends::NetworkBackend; -use futures::StreamExt; +use futures::{Stream, StreamExt}; use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::{ handle::ServiceStateHandle, @@ -14,7 +15,6 @@ use overwatch_rs::services::{ ServiceCore, ServiceData, ServiceId, }; use serde::{Deserialize, Serialize}; -use tokio::sync::broadcast; use tokio::sync::oneshot; use tracing::error; // internal @@ -23,7 +23,7 @@ pub enum DaNetworkMsg { Process(B::Message), Subscribe { kind: B::EventKind, - sender: oneshot::Sender>, + sender: oneshot::Sender + Send>>>, }, } @@ -31,10 +31,9 @@ impl Debug for DaNetworkMsg { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { match self { Self::Process(msg) => write!(fmt, "DaNetworkMsg::Process({msg:?})"), - Self::Subscribe { kind, sender } => write!( - fmt, - "DaNetworkMsg::Subscribe{{ kind: {kind:?}, sender: {sender:?}}}" - ), + Self::Subscribe { kind, .. } => { + write!(fmt, "DaNetworkMsg::Subscribe{{ kind: {kind:?}}}") + } } } } @@ -52,7 +51,7 @@ impl Debug for NetworkConfig { } } -pub struct NetworkService { +pub struct NetworkService { backend: B, service_state: ServiceStateHandle, } @@ -61,7 +60,7 @@ pub struct NetworkState { _backend: B::State, } -impl ServiceData for NetworkService { +impl ServiceData for NetworkService { const SERVICE_ID: ServiceId = "DaNetwork"; type Settings = NetworkConfig; type State = NetworkState;
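
Reviewer note (sketch, not part of the diff): roughly how another service is expected to drive this backend through the `DaNetworkMsg` API added in lib.rs. The `network_relay` handle (an Overwatch relay to this `NetworkService`), the concrete `Membership` type, and the `blob_id` value are placeholders for illustration; error handling and shutdown are elided.

    use futures::StreamExt;
    use tokio::sync::oneshot;

    // Subscribe to sampling results before issuing requests.
    let (sender, receiver) = oneshot::channel();
    network_relay
        .send(DaNetworkMsg::Subscribe { kind: DaNetworkEventKind::Sampling, sender })
        .await
        .expect("da network service should be running");
    let mut sampling_events = receiver.await.expect("subscription stream");

    // Kick off sampling of one column of a blob.
    network_relay
        .send(DaNetworkMsg::Process(DaNetworkMessage::RequestSample { subnetwork_id: 0, blob_id }))
        .await
        .expect("da network service should be running");

    // React to outcomes forwarded from the libp2p swarm via the broadcast channels.
    while let Some(DaNetworkEvent::Sampling(event)) = sampling_events.next().await {
        match event {
            SamplingEvent::SamplingSuccess { blob_id, .. } => tracing::info!("sampled {blob_id:?}"),
            SamplingEvent::SamplingError { error } => tracing::warn!("sampling failed: {error:?}"),
        }
    }

The `Verifying` subscription is consumed the same way and yields `Box<DaBlob>` items forwarded from incoming dispersal messages, ready to hand to a verifier service.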