From 9ecd738d6ef4247b7744ab51f71da8b74d5af18a Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Tue, 1 Oct 2024 14:44:13 +0200 Subject: [PATCH] DA: Executor network service (#795) * Refactor common things in backends * Further extract to common * Extract dial_peers * Pipe out executor events * Add wrapper settings * Dial up dispersal peers * Clippy happy * dial_peers -> dial_validator_subnetwork_peers * Add members list to membership trait * Implement peers selection and dialing up from specification * Fix tests * Fix tiny doc --- nomos-cli/src/test_utils.rs | 4 + .../protocols/dispersal/executor/behaviour.rs | 2 +- nomos-da/network/core/src/swarm/executor.rs | 10 +- nomos-da/network/core/src/test_utils.rs | 4 + .../subnetworks-assignations/src/lib.rs | 2 + .../src/versions/v1.rs | 4 + .../src/versions/v2.rs | 4 + .../data-availability/network/Cargo.toml | 2 +- .../network/src/backends/libp2p/common.rs | 165 +++++++++ .../network/src/backends/libp2p/executor.rs | 335 ++++++++++++++++++ .../network/src/backends/libp2p/mod.rs | 2 + .../network/src/backends/libp2p/validator.rs | 177 ++------- .../data-availability/sampling/src/lib.rs | 2 +- .../sampling/src/network/adapters/libp2p.rs | 5 +- .../sampling/src/network/mod.rs | 2 +- .../data-availability/tests/src/common.rs | 9 +- tests/src/nodes/nomos.rs | 6 +- 17 files changed, 560 insertions(+), 175 deletions(-) create mode 100644 nomos-services/data-availability/network/src/backends/libp2p/common.rs create mode 100644 nomos-services/data-availability/network/src/backends/libp2p/executor.rs diff --git a/nomos-cli/src/test_utils.rs b/nomos-cli/src/test_utils.rs index 55588604..122bb469 100644 --- a/nomos-cli/src/test_utils.rs +++ b/nomos-cli/src/test_utils.rs @@ -22,4 +22,8 @@ impl MembershipHandler for AllNeighbours { fn members_of(&self, _network_id: &Self::NetworkId) -> HashSet { self.neighbours.clone() } + + fn members(&self) -> HashSet { + self.neighbours.clone() + } } diff --git 
a/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs b/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs index 1f124c8a..5b6c7035 100644 --- a/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs +++ b/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs @@ -125,7 +125,7 @@ impl Clone for DispersalError { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum DispersalExecutorEvent { /// A blob successfully arrived its destination DispersalSuccess { diff --git a/nomos-da/network/core/src/swarm/executor.rs b/nomos-da/network/core/src/swarm/executor.rs index 410572ac..b0da6a0d 100644 --- a/nomos-da/network/core/src/swarm/executor.rs +++ b/nomos-da/network/core/src/swarm/executor.rs @@ -25,12 +25,12 @@ use crate::protocols::{ use crate::swarm::common::{ handle_replication_event, handle_sampling_event, handle_validator_dispersal_event, }; +use crate::swarm::validator::ValidatorEventsStream; use crate::SubnetworkId; use subnetworks_assignations::MembershipHandler; pub struct ExecutorEventsStream { - pub sampling_events_receiver: UnboundedReceiverStream, - pub validation_events_receiver: UnboundedReceiverStream, + pub validator_events_stream: ValidatorEventsStream, pub dispersal_events_receiver: UnboundedReceiverStream, } @@ -66,8 +66,10 @@ where dispersal_events_sender, }, ExecutorEventsStream { - sampling_events_receiver, - validation_events_receiver, + validator_events_stream: ValidatorEventsStream { + sampling_events_receiver, + validation_events_receiver, + }, dispersal_events_receiver, }, ) diff --git a/nomos-da/network/core/src/test_utils.rs b/nomos-da/network/core/src/test_utils.rs index 55588604..122bb469 100644 --- a/nomos-da/network/core/src/test_utils.rs +++ b/nomos-da/network/core/src/test_utils.rs @@ -22,4 +22,8 @@ impl MembershipHandler for AllNeighbours { fn members_of(&self, _network_id: &Self::NetworkId) -> HashSet { self.neighbours.clone() } + + fn members(&self) -> HashSet { + 
self.neighbours.clone() + } } diff --git a/nomos-da/network/subnetworks-assignations/src/lib.rs b/nomos-da/network/subnetworks-assignations/src/lib.rs index bfa343e5..dbb71575 100644 --- a/nomos-da/network/subnetworks-assignations/src/lib.rs +++ b/nomos-da/network/subnetworks-assignations/src/lib.rs @@ -23,4 +23,6 @@ pub trait MembershipHandler { // Returns the set of members in a subnetwork by its NetworkId fn members_of(&self, network_id: &Self::NetworkId) -> HashSet; + + fn members(&self) -> HashSet; } diff --git a/nomos-da/network/subnetworks-assignations/src/versions/v1.rs b/nomos-da/network/subnetworks-assignations/src/versions/v1.rs index 0530eff8..27c18dcf 100644 --- a/nomos-da/network/subnetworks-assignations/src/versions/v1.rs +++ b/nomos-da/network/subnetworks-assignations/src/versions/v1.rs @@ -69,6 +69,10 @@ impl MembershipHandler for FillFromNodeList { fn members_of(&self, network_id: &Self::NetworkId) -> HashSet { self.assignations[*network_id as usize].clone() } + + fn members(&self) -> HashSet { + self.assignations.iter().flatten().copied().collect() + } } #[cfg(test)] diff --git a/nomos-da/network/subnetworks-assignations/src/versions/v2.rs b/nomos-da/network/subnetworks-assignations/src/versions/v2.rs index 1babfedc..3e002d92 100644 --- a/nomos-da/network/subnetworks-assignations/src/versions/v2.rs +++ b/nomos-da/network/subnetworks-assignations/src/versions/v2.rs @@ -92,6 +92,10 @@ impl MembershipHandler for FillWithOriginalReplication { fn members_of(&self, network_id: &Self::NetworkId) -> HashSet { self.assignations[*network_id as usize].clone() } + + fn members(&self) -> HashSet { + self.assignations.iter().flatten().copied().collect() + } } #[cfg(test)] diff --git a/nomos-services/data-availability/network/Cargo.toml b/nomos-services/data-availability/network/Cargo.toml index 2e84acfd..ee375689 100644 --- a/nomos-services/data-availability/network/Cargo.toml +++ b/nomos-services/data-availability/network/Cargo.toml @@ -12,7 +12,7 @@ 
nomos-core = { path = "../../../nomos-core" } nomos-libp2p = { path = "../../../nomos-libp2p" } nomos-da-network-core = { path = "../../../nomos-da/network/core" } subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" } -libp2p = { version = "0.54", features = ["ed25519"] } +libp2p = { version = "0.53", features = ["ed25519"] } serde = { version = "1.0", features = ["derive"] } tokio = { version = "1", features = ["macros", "sync"] } tokio-stream = "0.1" diff --git a/nomos-services/data-availability/network/src/backends/libp2p/common.rs b/nomos-services/data-availability/network/src/backends/libp2p/common.rs new file mode 100644 index 00000000..fdc88b1d --- /dev/null +++ b/nomos-services/data-availability/network/src/backends/libp2p/common.rs @@ -0,0 +1,165 @@ +use futures::StreamExt; +use kzgrs_backend::common::blob::DaBlob; +use kzgrs_backend::common::ColumnIndex; +use libp2p::swarm::NetworkBehaviour; +use libp2p::Swarm; +use log::error; +use nomos_core::da::BlobId; +use nomos_da_network_core::protocols::sampling; +use nomos_da_network_core::protocols::sampling::behaviour::{ + BehaviourSampleReq, BehaviourSampleRes, SamplingError, +}; +use nomos_da_network_core::swarm::validator::ValidatorEventsStream; +use nomos_da_network_core::SubnetworkId; +use nomos_libp2p::secret_key_serde; +use nomos_libp2p::{ed25519, Multiaddr, PeerId}; +use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; +use std::fmt::Debug; +use subnetworks_assignations::MembershipHandler; +use tokio::sync::mpsc::error::SendError; +use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::{broadcast, mpsc}; + +pub(crate) const BROADCAST_CHANNEL_SIZE: usize = 128; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct DaNetworkBackendSettings { + // Identification Secp256k1 private key in Hex format (`0x123...abc`). Default random. 
+ #[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")] + pub node_key: ed25519::SecretKey, + /// Membership of DA network PoV set + pub membership: Membership, + pub addresses: HashMap, + pub listening_address: Multiaddr, +} + +/// Sampling events coming from da network +#[derive(Debug, Clone)] +pub enum SamplingEvent { + /// A success sampling + SamplingSuccess { blob_id: BlobId, blob: Box }, + /// Incoming sampling request + SamplingRequest { + blob_id: BlobId, + column_idx: ColumnIndex, + response_sender: mpsc::Sender>, + }, + /// A failed sampling error + SamplingError { error: SamplingError }, +} + +pub(crate) fn dial_validator_subnetwork_peers( + membership: &Membership, + addresses: &HashMap, + swarm: &mut Swarm, + local_peer_id: PeerId, +) -> HashSet +where + Membership: MembershipHandler + + Clone + + Debug + + Send + + Sync + + 'static, + Behaviour: NetworkBehaviour, +{ + let mut connected_peers = HashSet::new(); + membership + .membership(&local_peer_id) + .iter() + .flat_map(|subnet| membership.members_of(subnet)) + .filter(|peer| peer != &local_peer_id) + .filter_map(|peer| addresses.get(&peer).map(|addr| (peer, addr.clone()))) + .for_each(|(peer, addr)| { + // Only dial if we haven't already connected to this peer. + if connected_peers.insert(peer) { + swarm + .dial(addr) + .expect("Node should be able to dial peer in a subnet"); + } + }); + connected_peers +} + +/// Task that handles forwarding of events to the subscriptions channels/stream +pub(crate) async fn handle_validator_events_stream( + events_streams: ValidatorEventsStream, + sampling_broadcast_sender: broadcast::Sender, + validation_broadcast_sender: broadcast::Sender, +) { + let ValidatorEventsStream { + mut sampling_events_receiver, + mut validation_events_receiver, + } = events_streams; + #[allow(clippy::never_loop)] + loop { + // WARNING: `StreamExt::next` is cancellation safe. 
+ // If adding more branches check if such methods are within the cancellation safe set: + // https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety + tokio::select! { + Some(sampling_event) = StreamExt::next(&mut sampling_events_receiver) => { + match sampling_event { + sampling::behaviour::SamplingEvent::SamplingSuccess{ blob_id, blob , .. } => { + if let Err(e) = sampling_broadcast_sender.send(SamplingEvent::SamplingSuccess {blob_id, blob}){ + error!("Error in internal broadcast of sampling success: {e:?}"); + } + } + sampling::behaviour::SamplingEvent::IncomingSample{request_receiver, response_sender} => { + if let Ok(BehaviourSampleReq { blob_id, column_idx }) = request_receiver.await { + let (sampling_response_sender, mut sampling_response_receiver) = mpsc::channel(1); + + if let Err(e) = sampling_broadcast_sender + .send(SamplingEvent::SamplingRequest { blob_id, column_idx, response_sender: sampling_response_sender }) + { + error!("Error in internal broadcast of sampling request: {e:?}"); + sampling_response_receiver.close() + } + + if let Some(maybe_blob) = sampling_response_receiver.recv().await { + let result = match maybe_blob { + Some(blob) => BehaviourSampleRes::SamplingSuccess { + blob_id, + subnetwork_id: blob.column_idx as u32, + blob: Box::new(blob), + }, + None => BehaviourSampleRes::SampleNotFound { blob_id }, + }; + + if response_sender.send(result).is_err() { + error!("Error sending sampling success response"); + } + } else if response_sender + .send(BehaviourSampleRes::SampleNotFound { blob_id }) + .is_err() + { + error!("Error sending sampling success response"); + } + } + } + sampling::behaviour::SamplingEvent::SamplingError{ error } => { + if let Err(e) = sampling_broadcast_sender.send(SamplingEvent::SamplingError {error}) { + error!{"Error in internal broadcast of sampling error: {e:?}"}; + } + }} + } + Some(da_blob) = StreamExt::next(&mut validation_events_receiver)=> { + if let Err(error) = 
validation_broadcast_sender.send(da_blob) { + error!("Error in internal broadcast of validation for blob: {:?}", error.0); + } + } + } + } +} + +pub(crate) async fn handle_sample_request( + sampling_request_channel: &UnboundedSender<(SubnetworkId, BlobId)>, + subnetwork_id: SubnetworkId, + blob_id: BlobId, +) { + if let Err(SendError((subnetwork_id, blob_id))) = + sampling_request_channel.send((subnetwork_id, blob_id)) + { + error!("Error requesting sample for subnetwork id : {subnetwork_id}, blob_id: {blob_id:?}"); + } +} diff --git a/nomos-services/data-availability/network/src/backends/libp2p/executor.rs b/nomos-services/data-availability/network/src/backends/libp2p/executor.rs new file mode 100644 index 00000000..d293cb48 --- /dev/null +++ b/nomos-services/data-availability/network/src/backends/libp2p/executor.rs @@ -0,0 +1,335 @@ +use crate::backends::libp2p::common::{ + dial_validator_subnetwork_peers, handle_sample_request, handle_validator_events_stream, + DaNetworkBackendSettings, SamplingEvent, BROADCAST_CHANNEL_SIZE, +}; +use crate::backends::NetworkBackend; +use futures::{Stream, StreamExt}; +use kzgrs_backend::common::blob::DaBlob; +use libp2p::PeerId; +use log::error; +use nomos_core::da::BlobId; +use nomos_da_network_core::protocols::dispersal::executor::behaviour::DispersalExecutorEvent; +use nomos_da_network_core::swarm::executor::ExecutorSwarm; +use nomos_da_network_core::SubnetworkId; +use nomos_libp2p::ed25519; +use overwatch_rs::overwatch::handle::OverwatchHandle; +use overwatch_rs::services::state::NoState; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use std::fmt::Debug; +use std::marker::PhantomData; +use std::pin::Pin; +use subnetworks_assignations::MembershipHandler; +use tokio::sync::broadcast; +use tokio::sync::mpsc::UnboundedSender; +use tokio::task::JoinHandle; +use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream}; + +/// Message that the backend replies to +#[derive(Debug)] +pub enum 
DaNetworkMessage { + /// Kickstart a network sampling + RequestSample { + subnetwork_id: SubnetworkId, + blob_id: BlobId, + }, + RequestDispersal { + subnetwork_id: SubnetworkId, + da_blob: Box, + }, +} + +/// Event types to subscribe to +/// * Sampling: Incoming sampling events [success/fail] +/// * Incoming blobs to be verified +#[derive(Debug)] +pub enum DaNetworkEventKind { + Sampling, + Verifying, + Dispersal, +} + +/// DA network incoming events +#[derive(Debug)] +pub enum DaNetworkEvent { + Sampling(SamplingEvent), + Verifying(Box), + Dispersal(DispersalExecutorEvent), +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct DaNetworkExecutorBackendSettings { + pub validator_settings: DaNetworkBackendSettings, + pub num_subnets: u16, +} + +/// DA network backend for executors +/// Internally uses a libp2p swarm composed of the [`ExecutorBehaviour`] +/// It forwards network messages to the corresponding subscription channels/streams +pub struct DaNetworkExecutorBackend +where + Membership: MembershipHandler, +{ + // TODO: these join handles should be cancelable tasks. We should add a stop method for + // the `NetworkBackend` trait so if the service is stopped the backend can gracefully handle open + // sub-tasks as well.
+ #[allow(dead_code)] + task: JoinHandle<()>, + #[allow(dead_code)] + verifier_replies_task: JoinHandle<()>, + #[allow(dead_code)] + executor_replies_task: JoinHandle<()>, + sampling_request_channel: UnboundedSender<(SubnetworkId, BlobId)>, + sampling_broadcast_receiver: broadcast::Receiver, + verifying_broadcast_receiver: broadcast::Receiver, + dispersal_broadcast_receiver: broadcast::Receiver, + dispersal_blobs_sender: UnboundedSender<(Membership::NetworkId, DaBlob)>, + _membership: PhantomData, +} + +#[async_trait::async_trait] +impl NetworkBackend for DaNetworkExecutorBackend +where + Membership: MembershipHandler + + Clone + + Debug + + Send + + Sync + + 'static, +{ + type Settings = DaNetworkExecutorBackendSettings; + type State = NoState; + type Message = DaNetworkMessage; + type EventKind = DaNetworkEventKind; + type NetworkEvent = DaNetworkEvent; + + fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self { + let keypair = libp2p::identity::Keypair::from(ed25519::Keypair::from( + config.validator_settings.node_key.clone(), + )); + let (mut executor_swarm, executor_events_stream) = ExecutorSwarm::new( + keypair, + config.validator_settings.membership.clone(), + config + .validator_settings + .addresses + .clone() + .into_iter() + .collect(), + ); + let address = config.validator_settings.listening_address.clone(); + // put swarm to listen at the specified configuration address + executor_swarm + .protocol_swarm_mut() + .listen_on(address.clone()) + .unwrap_or_else(|e| { + panic!("Error listening on DA network with address {address}: {e}") + }); + // Dial peers in the same subnetworks (Node might participate in multiple). 
+ let local_peer_id = *executor_swarm.local_peer_id(); + + let validator_subnetworks_connected_peers = dial_validator_subnetwork_peers( + &config.validator_settings.membership, + &config.validator_settings.addresses, + executor_swarm.protocol_swarm_mut(), + local_peer_id, + ); + + let dispersal_peers = dial_dispersal_peers( + &mut executor_swarm, + &config, + &validator_subnetworks_connected_peers, + ); + + let sampling_request_channel = executor_swarm.sample_request_channel(); + + let dispersal_blobs_sender = executor_swarm.dispersal_blobs_channel(); + let executor_open_stream_sender = executor_swarm.dispersal_open_stream_sender(); + + let task = overwatch_handle.runtime().spawn(executor_swarm.run()); + + // open streams to dispersal peers + for peer_id in dispersal_peers.iter() { + executor_open_stream_sender.send(*peer_id).unwrap(); + } + + let (sampling_broadcast_sender, sampling_broadcast_receiver) = + broadcast::channel(BROADCAST_CHANNEL_SIZE); + let (verifying_broadcast_sender, verifying_broadcast_receiver) = + broadcast::channel(BROADCAST_CHANNEL_SIZE); + let (dispersal_broadcast_sender, dispersal_broadcast_receiver) = + broadcast::channel(BROADCAST_CHANNEL_SIZE); + let verifier_replies_task = + overwatch_handle + .runtime() + .spawn(handle_validator_events_stream( + executor_events_stream.validator_events_stream, + sampling_broadcast_sender, + verifying_broadcast_sender, + )); + let executor_replies_task = + overwatch_handle + .runtime() + .spawn(handle_executor_dispersal_events_stream( + executor_events_stream.dispersal_events_receiver, + dispersal_broadcast_sender, + )); + + Self { + task, + verifier_replies_task, + executor_replies_task, + sampling_request_channel, + sampling_broadcast_receiver, + verifying_broadcast_receiver, + dispersal_broadcast_receiver, + dispersal_blobs_sender, + _membership: Default::default(), + } + } + + async fn process(&self, msg: Self::Message) { + match msg { + DaNetworkMessage::RequestSample { + subnetwork_id, + blob_id, + 
} => { + handle_sample_request(&self.sampling_request_channel, subnetwork_id, blob_id).await; + } + DaNetworkMessage::RequestDispersal { + subnetwork_id, + da_blob, + } => { + if let Err(e) = self.dispersal_blobs_sender.send((subnetwork_id, *da_blob)) { + error!("Could not send internal blob to underlying dispersal behaviour: {e}"); + } + } + } + } + + async fn subscribe( + &mut self, + event: Self::EventKind, + ) -> Pin + Send>> { + match event { + DaNetworkEventKind::Sampling => Box::pin( + BroadcastStream::new(self.sampling_broadcast_receiver.resubscribe()) + .filter_map(|event| async { event.ok() }) + .map(Self::NetworkEvent::Sampling), + ), + DaNetworkEventKind::Verifying => Box::pin( + BroadcastStream::new(self.verifying_broadcast_receiver.resubscribe()) + .filter_map(|event| async { event.ok() }) + .map(|blob| Self::NetworkEvent::Verifying(Box::new(blob))), + ), + DaNetworkEventKind::Dispersal => Box::pin( + BroadcastStream::new(self.dispersal_broadcast_receiver.resubscribe()) + .filter_map(|event| async { event.ok() }) + .map(Self::NetworkEvent::Dispersal), + ), + } + } +} + +async fn handle_executor_dispersal_events_stream( + mut dispersal_events_receiver: UnboundedReceiverStream, + dispersal_broadcast_sender: broadcast::Sender, +) { + while let Some(event) = dispersal_events_receiver.next().await { + if let Err(e) = dispersal_broadcast_sender.send(event) { + error!("Error forwarding internal dispersal executor event: {e}"); + } + } +} + +fn dial_dispersal_peers( + executor_swarm: &mut ExecutorSwarm, + config: &DaNetworkExecutorBackendSettings, + validator_connected_peers: &HashSet, +) -> HashSet +where + Membership: MembershipHandler + + Clone + + Debug + + Send + + Sync + + 'static, +{ + let mut connected_peers = HashSet::new(); + + let local_peer_id = *executor_swarm.local_peer_id(); + let membership = &config.validator_settings.membership; + + // filter out which subnetworks we already have connections with + let connected_subnetworks: HashSet = 
(0..config.num_subnets as u32) + .filter(|subnetwork_id| { + !membership + .members_of(subnetwork_id) + .is_disjoint(validator_connected_peers) + }) + .collect(); + + let already_connected_peers: HashSet = membership + .members() + .intersection(validator_connected_peers) + .copied() + .collect(); + + // select dispersal peers from the subnetworks we are not connected yet + let selected_dispersal_peers = select_dispersal_peers( + &local_peer_id, + config, + &connected_subnetworks, + // + &HashSet::new(), + ); + + for peer_id in selected_dispersal_peers { + let addr = config + .validator_settings + .addresses + .get(&peer_id) + .expect("Peer address should be in the list") + .clone(); + + executor_swarm + .dial(addr) + .expect("Should schedule the dials"); + + connected_peers.insert(peer_id); + } + + // add peers from the subnetwork we are connected with + connected_peers + .union(&already_connected_peers) + .copied() + .collect() +} + +/// Use selection as per the base [specification](https://www.notion.so/NomosDA-Network-Specification-c6664294d630470ba20aefb21a218f8c?pvs=4#10e8f96fb65c803f9ed9d5a91df3ac83) +fn select_dispersal_peers( + local_peer_id: &PeerId, + config: &DaNetworkExecutorBackendSettings, + filtered_subnetworks: &HashSet, + filtered_peers: &HashSet, +) -> HashSet +where + Membership: MembershipHandler + + Clone + + Debug + + Send + + Sync + + 'static, +{ + let membership = &config.validator_settings.membership; + (0..config.num_subnets as u32) + .filter(|subnetwork_id| !filtered_subnetworks.contains(subnetwork_id)) + .filter_map(|subnetwork_id| { + membership + .members_of(&subnetwork_id) + .difference(filtered_peers) + .find(|&id| id != local_peer_id) + .copied() + }) + .collect() +} diff --git a/nomos-services/data-availability/network/src/backends/libp2p/mod.rs b/nomos-services/data-availability/network/src/backends/libp2p/mod.rs index fa199f24..d7b417bb 100644 --- a/nomos-services/data-availability/network/src/backends/libp2p/mod.rs +++ 
b/nomos-services/data-availability/network/src/backends/libp2p/mod.rs @@ -1 +1,3 @@ +pub mod common; +pub mod executor; pub mod validator; diff --git a/nomos-services/data-availability/network/src/backends/libp2p/validator.rs b/nomos-services/data-availability/network/src/backends/libp2p/validator.rs index 8a02dac0..1682595c 100644 --- a/nomos-services/data-availability/network/src/backends/libp2p/validator.rs +++ b/nomos-services/data-availability/network/src/backends/libp2p/validator.rs @@ -1,34 +1,26 @@ +use crate::backends::libp2p::common::{ + dial_validator_subnetwork_peers, handle_sample_request, handle_validator_events_stream, + DaNetworkBackendSettings, SamplingEvent, BROADCAST_CHANNEL_SIZE, +}; use crate::backends::NetworkBackend; use futures::{Stream, StreamExt}; use kzgrs_backend::common::blob::DaBlob; -use kzgrs_backend::common::ColumnIndex; -use libp2p::identity::ed25519; -use libp2p::{Multiaddr, PeerId}; -use log::error; +use libp2p::PeerId; use nomos_core::da::BlobId; -use nomos_da_network_core::protocols::sampling; -use nomos_da_network_core::protocols::sampling::behaviour::{ - BehaviourSampleReq, BehaviourSampleRes, SamplingError, -}; -use nomos_da_network_core::swarm::validator::{ValidatorEventsStream, ValidatorSwarm}; +use nomos_da_network_core::swarm::validator::ValidatorSwarm; use nomos_da_network_core::SubnetworkId; -use nomos_libp2p::secret_key_serde; +use nomos_libp2p::ed25519; use overwatch_rs::overwatch::handle::OverwatchHandle; use overwatch_rs::services::state::NoState; -use serde::{Deserialize, Serialize}; -use std::collections::HashSet; use std::fmt::Debug; use std::marker::PhantomData; use std::pin::Pin; use subnetworks_assignations::MembershipHandler; -use tokio::sync::mpsc::error::SendError; +use tokio::sync::broadcast; use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::{broadcast, mpsc}; use tokio::task::JoinHandle; use tokio_stream::wrappers::BroadcastStream; -const BROADCAST_CHANNEL_SIZE: usize = 128; - /// Message that 
the backend replies to #[derive(Debug)] pub enum DaNetworkMessage { @@ -48,21 +40,6 @@ pub enum DaNetworkEventKind { Verifying, } -/// Sampling events coming from da network -#[derive(Debug, Clone)] -pub enum SamplingEvent { - /// A success sampling - SamplingSuccess { blob_id: BlobId, blob: Box }, - /// Incoming sampling request - SamplingRequest { - blob_id: BlobId, - column_idx: ColumnIndex, - response_sender: mpsc::Sender>, - }, - /// A failed sampling error - SamplingError { error: SamplingError }, -} - /// DA network incoming events #[derive(Debug)] pub enum DaNetworkEvent { @@ -87,30 +64,6 @@ pub struct DaNetworkValidatorBackend { _membership: PhantomData, } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct DaNetworkValidatorBackendSettings { - // Identification Secp256k1 private key in Hex format (`0x123...abc`). Default random. - #[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")] - pub node_key: ed25519::SecretKey, - /// Membership of DA network PoV set - pub membership: Membership, - pub addresses: Vec<(PeerId, Multiaddr)>, - pub listening_address: Multiaddr, -} - -impl DaNetworkValidatorBackend { - /// Send the sampling request to the underlying sampling behaviour - async fn handle_sample_request(&self, subnetwork_id: SubnetworkId, blob_id: BlobId) { - if let Err(SendError((subnetwork_id, blob_id))) = - self.sampling_request_channel.send((subnetwork_id, blob_id)) - { - error!( - "Error requesting sample for subnetwork id : {subnetwork_id}, blob_id: {blob_id:?}" - ); - } - } -} - #[async_trait::async_trait] impl NetworkBackend for DaNetworkValidatorBackend where @@ -121,7 +74,7 @@ where + Sync + 'static, { - type Settings = DaNetworkValidatorBackendSettings; + type Settings = DaNetworkBackendSettings; type State = NoState; type Message = DaNetworkMessage; type EventKind = DaNetworkEventKind; @@ -130,16 +83,11 @@ where fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self { let keypair = 
libp2p::identity::Keypair::from(ed25519::Keypair::from(config.node_key.clone())); - let (mut validator_swarm, events_streams) = ValidatorSwarm::new( + let (mut validator_swarm, validator_events_stream) = ValidatorSwarm::new( keypair, config.membership.clone(), config.addresses.clone().into_iter().collect(), ); - let sampling_request_channel = validator_swarm - .protocol_swarm() - .behaviour() - .sampling_behaviour() - .sample_request_channel(); let address = config.listening_address; // put swarm to listen at the specified configuration address validator_swarm @@ -148,32 +96,17 @@ where .unwrap_or_else(|e| { panic!("Error listening on DA network with address {address}: {e}") }); - // Dial peers in the same subnetworks (Node might participate in multiple). let local_peer_id = *validator_swarm.local_peer_id(); - let mut connected_peers = HashSet::new(); - config - .membership - .membership(&local_peer_id) - .iter() - .flat_map(|subnet| config.membership.members_of(subnet)) - .filter(|peer| peer != &local_peer_id) - .filter_map(|peer| { - config - .addresses - .iter() - .find(|(p, _)| p == &peer) - .map(|(_, addr)| (peer, addr.clone())) - }) - .for_each(|(peer, addr)| { - // Only dial if we haven't already connected to this peer. 
- if connected_peers.insert(peer) { - validator_swarm - .dial(addr) - .expect("Node should be able to dial peer in a subnet"); - } - }); + dial_validator_subnetwork_peers( + &config.membership, + &config.addresses, + validator_swarm.protocol_swarm_mut(), + local_peer_id, + ); + + let sampling_request_channel = validator_swarm.sample_request_channel(); let task = overwatch_handle.runtime().spawn(validator_swarm.run()); let (sampling_broadcast_sender, sampling_broadcast_receiver) = @@ -183,7 +116,7 @@ where let replies_task = overwatch_handle .runtime() .spawn(handle_validator_events_stream( - events_streams, + validator_events_stream, sampling_broadcast_sender, verifying_broadcast_sender, )); @@ -204,7 +137,7 @@ where subnetwork_id, blob_id, } => { - self.handle_sample_request(subnetwork_id, blob_id).await; + handle_sample_request(&self.sampling_request_channel, subnetwork_id, blob_id).await; } } } @@ -227,73 +160,3 @@ where } } } - -/// Task that handles forwarding of events to the subscriptions channels/stream -async fn handle_validator_events_stream( - events_streams: ValidatorEventsStream, - sampling_broadcast_sender: broadcast::Sender, - validation_broadcast_sender: broadcast::Sender, -) { - let ValidatorEventsStream { - mut sampling_events_receiver, - mut validation_events_receiver, - } = events_streams; - #[allow(clippy::never_loop)] - loop { - // WARNING: `StreamExt::next` is cancellation safe. - // If adding more branches check if such methods are within the cancellation safe set: - // https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety - tokio::select! { - Some(sampling_event) = StreamExt::next(&mut sampling_events_receiver) => { - match sampling_event { - sampling::behaviour::SamplingEvent::SamplingSuccess{ blob_id, blob , .. 
} => { - if let Err(e) = sampling_broadcast_sender.send(SamplingEvent::SamplingSuccess {blob_id, blob}){ - error!("Error in internal broadcast of sampling success: {e:?}"); - } - } - sampling::behaviour::SamplingEvent::IncomingSample{request_receiver, response_sender} => { - if let Ok(BehaviourSampleReq { blob_id, column_idx }) = request_receiver.await { - let (sampling_response_sender, mut sampling_response_receiver) = mpsc::channel(1); - - if let Err(e) = sampling_broadcast_sender - .send(SamplingEvent::SamplingRequest { blob_id, column_idx, response_sender: sampling_response_sender }) - { - error!("Error in internal broadcast of sampling request: {e:?}"); - sampling_response_receiver.close() - } - - if let Some(maybe_blob) = sampling_response_receiver.recv().await { - let result = match maybe_blob { - Some(blob) => BehaviourSampleRes::SamplingSuccess { - blob_id, - subnetwork_id: blob.column_idx as u32, - blob: Box::new(blob), - }, - None => BehaviourSampleRes::SampleNotFound { blob_id }, - }; - - if response_sender.send(result).is_err() { - error!("Error sending sampling success response"); - } - } else if response_sender - .send(BehaviourSampleRes::SampleNotFound { blob_id }) - .is_err() - { - error!("Error sending sampling success response"); - } - } - } - sampling::behaviour::SamplingEvent::SamplingError{ error } => { - if let Err(e) = sampling_broadcast_sender.send(SamplingEvent::SamplingError {error}) { - error!{"Error in internal broadcast of sampling error: {e:?}"}; - } - }} - } - Some(da_blob) = StreamExt::next(&mut validation_events_receiver)=> { - if let Err(error) = validation_broadcast_sender.send(da_blob) { - error!("Error in internal broadcast of validation for blob: {:?}", error.0); - } - } - } - } -} diff --git a/nomos-services/data-availability/sampling/src/lib.rs b/nomos-services/data-availability/sampling/src/lib.rs index 1041230a..9e785534 100644 --- a/nomos-services/data-availability/sampling/src/lib.rs +++ 
b/nomos-services/data-availability/sampling/src/lib.rs @@ -9,7 +9,7 @@ use std::fmt::Debug; use kzgrs_backend::common::blob::DaBlob; use network::NetworkAdapter; use nomos_core::da::BlobId; -use nomos_da_network_service::backends::libp2p::validator::SamplingEvent; +use nomos_da_network_service::backends::libp2p::common::SamplingEvent; use nomos_da_network_service::NetworkService; use nomos_storage::StorageService; use overwatch_rs::services::handle::ServiceStateHandle; diff --git a/nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs b/nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs index fc2533b0..5ce7c959 100644 --- a/nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs +++ b/nomos-services/data-availability/sampling/src/network/adapters/libp2p.rs @@ -10,8 +10,9 @@ use tokio::sync::oneshot; use crate::network::NetworkAdapter; use nomos_core::da::BlobId; use nomos_da_network_core::SubnetworkId; -use nomos_da_network_service::backends::libp2p::validator::{ - DaNetworkEvent, DaNetworkEventKind, DaNetworkMessage, DaNetworkValidatorBackend, SamplingEvent, +use nomos_da_network_service::backends::libp2p::{ + common::SamplingEvent, + validator::{DaNetworkEvent, DaNetworkEventKind, DaNetworkMessage, DaNetworkValidatorBackend}, }; use nomos_da_network_service::{DaNetworkMsg, NetworkService}; use overwatch_rs::services::relay::OutboundRelay; diff --git a/nomos-services/data-availability/sampling/src/network/mod.rs b/nomos-services/data-availability/sampling/src/network/mod.rs index 0a2d0a13..4bc20e2e 100644 --- a/nomos-services/data-availability/sampling/src/network/mod.rs +++ b/nomos-services/data-availability/sampling/src/network/mod.rs @@ -3,7 +3,7 @@ pub mod adapters; use futures::Stream; use nomos_core::da::BlobId; use nomos_da_network_core::SubnetworkId; -use nomos_da_network_service::backends::libp2p::validator::SamplingEvent; +use nomos_da_network_service::backends::libp2p::common::SamplingEvent; use 
nomos_da_network_service::backends::NetworkBackend; use nomos_da_network_service::NetworkService; use overwatch_rs::services::relay::OutboundRelay; diff --git a/nomos-services/data-availability/tests/src/common.rs b/nomos-services/data-availability/tests/src/common.rs index 2ae86d0e..59c05435 100644 --- a/nomos-services/data-availability/tests/src/common.rs +++ b/nomos-services/data-availability/tests/src/common.rs @@ -1,4 +1,5 @@ // std +use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings; use std::path::PathBuf; use std::time::Duration; // crates @@ -23,9 +24,7 @@ use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapter as IndexerStorage use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageSettings; use nomos_da_indexer::DataIndexerService; use nomos_da_indexer::IndexerSettings; -use nomos_da_network_service::backends::libp2p::validator::{ - DaNetworkValidatorBackend, DaNetworkValidatorBackendSettings, -}; +use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBackend; use nomos_da_network_service::NetworkConfig as DaNetworkConfig; use nomos_da_network_service::NetworkService as DaNetworkService; use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings; @@ -191,7 +190,7 @@ pub fn new_node( }, }, da_network: DaNetworkConfig { - backend: DaNetworkValidatorBackendSettings { + backend: DaNetworkBackendSettings { node_key: da_network_settings.node_key, membership: FillFromNodeList::new( &da_network_settings @@ -202,7 +201,7 @@ pub fn new_node( da_network_settings.num_subnets.into(), da_network_settings.nodes_per_subnet.into(), ), - addresses: da_network_settings.peer_addresses, + addresses: da_network_settings.peer_addresses.into_iter().collect(), listening_address: da_network_settings.listening_address, }, }, diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index 669db1b2..d2657a87 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs 
@@ -20,7 +20,7 @@ use mixnet::{ use nomos_core::{block::Block, header::HeaderId, staking::NMO_UNIT}; use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageAdapterSettings; use nomos_da_indexer::IndexerSettings; -use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBackendSettings; +use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings; use nomos_da_network_service::NetworkConfig as DaNetworkConfig; use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings; use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as SamplingStorageAdapterSettings; @@ -351,7 +351,7 @@ impl Node for NomosNode { let subnetwork_ids = membership.membership(&local_peer_id); config.da_verifier.verifier_settings.index = subnetwork_ids; config.da_network.backend.membership = membership; - config.da_network.backend.addresses = peer_addresses.clone(); + config.da_network.backend.addresses = peer_addresses.iter().cloned().collect(); } #[cfg(feature = "mixnet")] @@ -485,7 +485,7 @@ fn create_node_config( blob_selector_settings: (), }, da_network: DaNetworkConfig { - backend: DaNetworkValidatorBackendSettings { + backend: DaNetworkBackendSettings { node_key: swarm_config.node_key, listening_address: Multiaddr::from_str(&format!( "/ip4/127.0.0.1/udp/{}/quic-v1",