
DA: Executor network service (#795)

* Refactor common things in backends

* Further extract to common

* Extract dial_peers

* Pipe out executor events

* Add wrapper settings

* Dial up dispersal peers

* Clippy happy

* dial_peers -> dial_validator_subnetwork_peers

* Add members list to membership trait

* Implement peers selection and dialing up from specification

* Fix tests

* Fix tiny doc
Daniel Sanchez 2024-10-01 14:44:13 +02:00 committed by GitHub
parent 2672b7bde4
commit 9ecd738d6e
17 changed files with 560 additions and 175 deletions

View File

@@ -22,4 +22,8 @@ impl MembershipHandler for AllNeighbours {
fn members_of(&self, _network_id: &Self::NetworkId) -> HashSet<Self::Id> {
self.neighbours.clone()
}
fn members(&self) -> HashSet<Self::Id> {
self.neighbours.clone()
}
}

View File

@@ -125,7 +125,7 @@ impl Clone for DispersalError {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum DispersalExecutorEvent {
/// A blob successfully arrived at its destination
DispersalSuccess {

View File

@@ -25,12 +25,12 @@ use crate::protocols::{
use crate::swarm::common::{
handle_replication_event, handle_sampling_event, handle_validator_dispersal_event,
};
use crate::swarm::validator::ValidatorEventsStream;
use crate::SubnetworkId;
use subnetworks_assignations::MembershipHandler;
pub struct ExecutorEventsStream {
pub sampling_events_receiver: UnboundedReceiverStream<SamplingEvent>,
pub validation_events_receiver: UnboundedReceiverStream<DaBlob>,
pub validator_events_stream: ValidatorEventsStream,
pub dispersal_events_receiver: UnboundedReceiverStream<DispersalExecutorEvent>,
}
@@ -66,8 +66,10 @@ where
dispersal_events_sender,
},
ExecutorEventsStream {
validator_events_stream: ValidatorEventsStream {
sampling_events_receiver,
validation_events_receiver,
},
dispersal_events_receiver,
},
)

View File

@@ -22,4 +22,8 @@ impl MembershipHandler for AllNeighbours {
fn members_of(&self, _network_id: &Self::NetworkId) -> HashSet<Self::Id> {
self.neighbours.clone()
}
fn members(&self) -> HashSet<Self::Id> {
self.neighbours.clone()
}
}

View File

@@ -23,4 +23,6 @@ pub trait MembershipHandler {
// Returns the set of members in a subnetwork by its NetworkId
fn members_of(&self, network_id: &Self::NetworkId) -> HashSet<Self::Id>;
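// Returns the set of all members across every subnetwork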
fn members(&self) -> HashSet<Self::Id>;
}
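
For orientation, a minimal sketch of an implementor of the extended trait, reduced to the two methods visible in this hunk (the real trait has additional requirements not shown here); `StaticAssignations` and the integer id types are hypothetical:

use std::collections::HashSet;

// Reduced version of the trait: only the two methods shown in this hunk.
trait MembershipHandler {
    type NetworkId;
    type Id;
    // Returns the set of members in a subnetwork by its NetworkId
    fn members_of(&self, network_id: &Self::NetworkId) -> HashSet<Self::Id>;
    // Returns the set of all members across every subnetwork
    fn members(&self) -> HashSet<Self::Id>;
}

// Hypothetical implementor mirroring how `FillFromNodeList` stores
// its `assignations` as one peer set per subnetwork.
struct StaticAssignations {
    assignations: Vec<HashSet<u64>>,
}

impl MembershipHandler for StaticAssignations {
    type NetworkId = u32;
    type Id = u64;

    fn members_of(&self, network_id: &Self::NetworkId) -> HashSet<Self::Id> {
        self.assignations[*network_id as usize].clone()
    }

    fn members(&self) -> HashSet<Self::Id> {
        // Flatten every subnetwork into a single deduplicated set.
        self.assignations.iter().flatten().copied().collect()
    }
}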

View File

@@ -69,6 +69,10 @@ impl MembershipHandler for FillFromNodeList {
fn members_of(&self, network_id: &Self::NetworkId) -> HashSet<Self::Id> {
self.assignations[*network_id as usize].clone()
}
fn members(&self) -> HashSet<Self::Id> {
self.assignations.iter().flatten().copied().collect()
}
}
#[cfg(test)]

View File

@@ -92,6 +92,10 @@ impl MembershipHandler for FillWithOriginalReplication {
fn members_of(&self, network_id: &Self::NetworkId) -> HashSet<Self::Id> {
self.assignations[*network_id as usize].clone()
}
fn members(&self) -> HashSet<Self::Id> {
self.assignations.iter().flatten().copied().collect()
}
}
#[cfg(test)]

View File

@@ -12,7 +12,7 @@ nomos-core = { path = "../../../nomos-core" }
nomos-libp2p = { path = "../../../nomos-libp2p" }
nomos-da-network-core = { path = "../../../nomos-da/network/core" }
subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" }
libp2p = { version = "0.54", features = ["ed25519"] }
libp2p = { version = "0.53", features = ["ed25519"] }
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1", features = ["macros", "sync"] }
tokio-stream = "0.1"

View File

@@ -0,0 +1,165 @@
use futures::StreamExt;
use kzgrs_backend::common::blob::DaBlob;
use kzgrs_backend::common::ColumnIndex;
use libp2p::swarm::NetworkBehaviour;
use libp2p::Swarm;
use log::error;
use nomos_core::da::BlobId;
use nomos_da_network_core::protocols::sampling;
use nomos_da_network_core::protocols::sampling::behaviour::{
BehaviourSampleReq, BehaviourSampleRes, SamplingError,
};
use nomos_da_network_core::swarm::validator::ValidatorEventsStream;
use nomos_da_network_core::SubnetworkId;
use nomos_libp2p::secret_key_serde;
use nomos_libp2p::{ed25519, Multiaddr, PeerId};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use subnetworks_assignations::MembershipHandler;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{broadcast, mpsc};
pub(crate) const BROADCAST_CHANNEL_SIZE: usize = 128;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DaNetworkBackendSettings<Membership> {
// Identification ed25519 private key in Hex format (`0x123...abc`). Default random.
#[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")]
pub node_key: ed25519::SecretKey,
/// Membership of DA network PoV set
pub membership: Membership,
pub addresses: HashMap<PeerId, Multiaddr>,
pub listening_address: Multiaddr,
}
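
As a rough usage sketch (not part of this diff), the settings might be assembled like this; it assumes this module's imports plus the `AllNeighbours` test membership extended earlier in this commit, and the listening address is an arbitrary example value:

// Hypothetical setup code.
fn example_settings(neighbours: HashSet<PeerId>) -> DaNetworkBackendSettings<AllNeighbours> {
    DaNetworkBackendSettings {
        // Fresh random identity key, matching the serde default.
        node_key: ed25519::SecretKey::generate(),
        membership: AllNeighbours { neighbours },
        // PeerId -> Multiaddr map consulted when dialing subnetwork peers.
        addresses: HashMap::new(),
        // Example QUIC listening address.
        listening_address: "/ip4/127.0.0.1/udp/0/quic-v1".parse().unwrap(),
    }
}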
/// Sampling events coming from the DA network
#[derive(Debug, Clone)]
pub enum SamplingEvent {
/// A successful sampling
SamplingSuccess { blob_id: BlobId, blob: Box<DaBlob> },
/// Incoming sampling request
SamplingRequest {
blob_id: BlobId,
column_idx: ColumnIndex,
response_sender: mpsc::Sender<Option<DaBlob>>,
},
/// A sampling failure
SamplingError { error: SamplingError },
}
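
Consumers that subscribe to these events are expected to answer a `SamplingRequest` through its bundled `response_sender`, replying `None` when the sample is not held locally. A hedged sketch of that contract, where `fetch_blob` is an assumed lookup and not part of this diff:

// Hypothetical consumer loop for sampling events.
async fn serve_sampling_requests(
    mut events: broadcast::Receiver<SamplingEvent>,
    // `fetch_blob` stands in for whatever storage lookup the node uses.
    fetch_blob: impl Fn(BlobId, ColumnIndex) -> Option<DaBlob>,
) {
    while let Ok(event) = events.recv().await {
        if let SamplingEvent::SamplingRequest { blob_id, column_idx, response_sender } = event {
            // `None` signals back to the protocol that the sample was not found.
            if response_sender.send(fetch_blob(blob_id, column_idx)).await.is_err() {
                error!("Sampling response receiver was dropped");
            }
        }
    }
}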
pub(crate) fn dial_validator_subnetwork_peers<Membership, Behaviour>(
membership: &Membership,
addresses: &HashMap<PeerId, Multiaddr>,
swarm: &mut Swarm<Behaviour>,
local_peer_id: PeerId,
) -> HashSet<PeerId>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Clone
+ Debug
+ Send
+ Sync
+ 'static,
Behaviour: NetworkBehaviour,
{
let mut connected_peers = HashSet::new();
membership
.membership(&local_peer_id)
.iter()
.flat_map(|subnet| membership.members_of(subnet))
.filter(|peer| peer != &local_peer_id)
.filter_map(|peer| addresses.get(&peer).map(|addr| (peer, addr.clone())))
.for_each(|(peer, addr)| {
// Only dial if we haven't already connected to this peer.
if connected_peers.insert(peer) {
swarm
.dial(addr)
.expect("Node should be able to dial peer in a subnet");
}
});
connected_peers
}
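
The pipeline above can be exercised without a swarm; a small self-contained sketch with integers standing in for `PeerId`s shows how the `HashSet` guard prevents dialing a peer twice when it serves several of our subnetworks:

use std::collections::{HashMap, HashSet};

fn main() {
    let local: u64 = 1;
    // Two subnetworks we belong to; peer 2 is a member of both.
    let our_subnets: Vec<HashSet<u64>> = vec![
        [1, 2, 3].into_iter().collect(),
        [2, 4].into_iter().collect(),
    ];
    let addresses: HashMap<u64, &str> = [
        (2, "/ip4/10.0.0.2/udp/7000/quic-v1"),
        (3, "/ip4/10.0.0.3/udp/7000/quic-v1"),
        (4, "/ip4/10.0.0.4/udp/7000/quic-v1"),
    ]
    .into_iter()
    .collect();

    let mut connected_peers = HashSet::new();
    our_subnets
        .iter()
        .flatten()
        .filter(|peer| **peer != local)
        .filter_map(|peer| addresses.get(peer).map(|addr| (*peer, *addr)))
        .for_each(|(peer, addr)| {
            // Only dial if we haven't already connected to this peer.
            if connected_peers.insert(peer) {
                println!("dialing {peer} at {addr}");
            }
        });
    // Peer 2 is dialed exactly once despite appearing in both subnetworks.
    assert_eq!(connected_peers, [2, 3, 4].into_iter().collect());
}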
/// Task that handles forwarding of events to the subscriptions channels/stream
pub(crate) async fn handle_validator_events_stream(
events_streams: ValidatorEventsStream,
sampling_broadcast_sender: broadcast::Sender<SamplingEvent>,
validation_broadcast_sender: broadcast::Sender<DaBlob>,
) {
let ValidatorEventsStream {
mut sampling_events_receiver,
mut validation_events_receiver,
} = events_streams;
#[allow(clippy::never_loop)]
loop {
// WARNING: `StreamExt::next` is cancellation safe.
// If adding more branches check if such methods are within the cancellation safe set:
// https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
tokio::select! {
Some(sampling_event) = StreamExt::next(&mut sampling_events_receiver) => {
match sampling_event {
sampling::behaviour::SamplingEvent::SamplingSuccess{ blob_id, blob , .. } => {
if let Err(e) = sampling_broadcast_sender.send(SamplingEvent::SamplingSuccess {blob_id, blob}){
error!("Error in internal broadcast of sampling success: {e:?}");
}
}
sampling::behaviour::SamplingEvent::IncomingSample{request_receiver, response_sender} => {
if let Ok(BehaviourSampleReq { blob_id, column_idx }) = request_receiver.await {
let (sampling_response_sender, mut sampling_response_receiver) = mpsc::channel(1);
if let Err(e) = sampling_broadcast_sender
.send(SamplingEvent::SamplingRequest { blob_id, column_idx, response_sender: sampling_response_sender })
{
error!("Error in internal broadcast of sampling request: {e:?}");
sampling_response_receiver.close()
}
if let Some(maybe_blob) = sampling_response_receiver.recv().await {
let result = match maybe_blob {
Some(blob) => BehaviourSampleRes::SamplingSuccess {
blob_id,
subnetwork_id: blob.column_idx as u32,
blob: Box::new(blob),
},
None => BehaviourSampleRes::SampleNotFound { blob_id },
};
if response_sender.send(result).is_err() {
error!("Error sending sampling success response");
}
} else if response_sender
.send(BehaviourSampleRes::SampleNotFound { blob_id })
.is_err()
{
error!("Error sending sampling success response");
}
}
}
sampling::behaviour::SamplingEvent::SamplingError{ error } => {
if let Err(e) = sampling_broadcast_sender.send(SamplingEvent::SamplingError {error}) {
error!{"Error in internal broadcast of sampling error: {e:?}"};
}
}}
}
Some(da_blob) = StreamExt::next(&mut validation_events_receiver)=> {
if let Err(error) = validation_broadcast_sender.send(da_blob) {
error!("Error in internal broadcast of validation for blob: {:?}", error.0);
}
}
}
}
}
pub(crate) async fn handle_sample_request(
sampling_request_channel: &UnboundedSender<(SubnetworkId, BlobId)>,
subnetwork_id: SubnetworkId,
blob_id: BlobId,
) {
if let Err(SendError((subnetwork_id, blob_id))) =
sampling_request_channel.send((subnetwork_id, blob_id))
{
error!("Error requesting sample for subnetwork id : {subnetwork_id}, blob_id: {blob_id:?}");
}
}
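
Everything above fans out through `tokio::sync::broadcast` channels of capacity `BROADCAST_CHANNEL_SIZE`; a receiver that falls more than that many events behind loses the oldest ones and observes a `Lagged` error rather than blocking the sender. A standalone sketch of that behavior with a deliberately tiny capacity:

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // Capacity 2 instead of 128 to make the lagging behavior visible.
    let (tx, mut rx) = broadcast::channel::<u32>(2);
    for i in 0..4 {
        tx.send(i).unwrap();
    }
    // Events 0 and 1 were overwritten; the receiver is told it lagged by 2.
    assert!(matches!(
        rx.recv().await,
        Err(broadcast::error::RecvError::Lagged(2))
    ));
    // The remaining events are still delivered in order.
    assert_eq!(rx.recv().await.unwrap(), 2);
    assert_eq!(rx.recv().await.unwrap(), 3);
}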

View File

@@ -0,0 +1,335 @@
use crate::backends::libp2p::common::{
dial_validator_subnetwork_peers, handle_sample_request, handle_validator_events_stream,
DaNetworkBackendSettings, SamplingEvent, BROADCAST_CHANNEL_SIZE,
};
use crate::backends::NetworkBackend;
use futures::{Stream, StreamExt};
use kzgrs_backend::common::blob::DaBlob;
use libp2p::PeerId;
use log::error;
use nomos_core::da::BlobId;
use nomos_da_network_core::protocols::dispersal::executor::behaviour::DispersalExecutorEvent;
use nomos_da_network_core::swarm::executor::ExecutorSwarm;
use nomos_da_network_core::SubnetworkId;
use nomos_libp2p::ed25519;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use overwatch_rs::services::state::NoState;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::pin::Pin;
use subnetworks_assignations::MembershipHandler;
use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream};
/// Message that the backend replies to
#[derive(Debug)]
pub enum DaNetworkMessage {
/// Kickstart a network sampling
RequestSample {
subnetwork_id: SubnetworkId,
blob_id: BlobId,
},
RequestDispersal {
subnetwork_id: SubnetworkId,
da_blob: Box<DaBlob>,
},
}
/// Event types to subscribe to
/// * Sampling: Incoming sampling events [success/fail]
/// * Verifying: Incoming blobs to be verified
/// * Dispersal: Dispersal executor events [success/fail]
#[derive(Debug)]
pub enum DaNetworkEventKind {
Sampling,
Verifying,
Dispersal,
}
/// DA network incoming events
#[derive(Debug)]
pub enum DaNetworkEvent {
Sampling(SamplingEvent),
Verifying(Box<DaBlob>),
Dispersal(DispersalExecutorEvent),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DaNetworkExecutorBackendSettings<Membership> {
pub validator_settings: DaNetworkBackendSettings<Membership>,
pub num_subnets: u16,
}
/// DA network backend for the executor node
/// Internally uses a libp2p swarm composed of the [`ExecutorBehaviour`]
/// It forwards network messages to the corresponding subscription channels/streams
pub struct DaNetworkExecutorBackend<Membership>
where
Membership: MembershipHandler,
{
// TODO: these join handles should be cancellable tasks. We should add a stop method to
// the `NetworkBackend` trait so that if the service is stopped the backend can gracefully
// wind down its open sub-tasks as well.
#[allow(dead_code)]
task: JoinHandle<()>,
#[allow(dead_code)]
verifier_replies_task: JoinHandle<()>,
#[allow(dead_code)]
executor_replies_task: JoinHandle<()>,
sampling_request_channel: UnboundedSender<(SubnetworkId, BlobId)>,
sampling_broadcast_receiver: broadcast::Receiver<SamplingEvent>,
verifying_broadcast_receiver: broadcast::Receiver<DaBlob>,
dispersal_broadcast_receiver: broadcast::Receiver<DispersalExecutorEvent>,
dispersal_blobs_sender: UnboundedSender<(Membership::NetworkId, DaBlob)>,
_membership: PhantomData<Membership>,
}
#[async_trait::async_trait]
impl<Membership> NetworkBackend for DaNetworkExecutorBackend<Membership>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Clone
+ Debug
+ Send
+ Sync
+ 'static,
{
type Settings = DaNetworkExecutorBackendSettings<Membership>;
type State = NoState<Self::Settings>;
type Message = DaNetworkMessage;
type EventKind = DaNetworkEventKind;
type NetworkEvent = DaNetworkEvent;
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self {
let keypair = libp2p::identity::Keypair::from(ed25519::Keypair::from(
config.validator_settings.node_key.clone(),
));
let (mut executor_swarm, executor_events_stream) = ExecutorSwarm::new(
keypair,
config.validator_settings.membership.clone(),
config
.validator_settings
.addresses
.clone()
.into_iter()
.collect(),
);
let address = config.validator_settings.listening_address.clone();
// Put the swarm to listen on the configured address
executor_swarm
.protocol_swarm_mut()
.listen_on(address.clone())
.unwrap_or_else(|e| {
panic!("Error listening on DA network with address {address}: {e}")
});
// Dial peers in the same subnetworks (Node might participate in multiple).
let local_peer_id = *executor_swarm.local_peer_id();
let validator_subnetworks_connected_peers = dial_validator_subnetwork_peers(
&config.validator_settings.membership,
&config.validator_settings.addresses,
executor_swarm.protocol_swarm_mut(),
local_peer_id,
);
let dispersal_peers = dial_dispersal_peers(
&mut executor_swarm,
&config,
&validator_subnetworks_connected_peers,
);
let sampling_request_channel = executor_swarm.sample_request_channel();
let dispersal_blobs_sender = executor_swarm.dispersal_blobs_channel();
let executor_open_stream_sender = executor_swarm.dispersal_open_stream_sender();
let task = overwatch_handle.runtime().spawn(executor_swarm.run());
// open streams to dispersal peers
for peer_id in dispersal_peers.iter() {
executor_open_stream_sender.send(*peer_id).unwrap();
}
let (sampling_broadcast_sender, sampling_broadcast_receiver) =
broadcast::channel(BROADCAST_CHANNEL_SIZE);
let (verifying_broadcast_sender, verifying_broadcast_receiver) =
broadcast::channel(BROADCAST_CHANNEL_SIZE);
let (dispersal_broadcast_sender, dispersal_broadcast_receiver) =
broadcast::channel(BROADCAST_CHANNEL_SIZE);
let verifier_replies_task =
overwatch_handle
.runtime()
.spawn(handle_validator_events_stream(
executor_events_stream.validator_events_stream,
sampling_broadcast_sender,
verifying_broadcast_sender,
));
let executor_replies_task =
overwatch_handle
.runtime()
.spawn(handle_executor_dispersal_events_stream(
executor_events_stream.dispersal_events_receiver,
dispersal_broadcast_sender,
));
Self {
task,
verifier_replies_task,
executor_replies_task,
sampling_request_channel,
sampling_broadcast_receiver,
verifying_broadcast_receiver,
dispersal_broadcast_receiver,
dispersal_blobs_sender,
_membership: Default::default(),
}
}
async fn process(&self, msg: Self::Message) {
match msg {
DaNetworkMessage::RequestSample {
subnetwork_id,
blob_id,
} => {
handle_sample_request(&self.sampling_request_channel, subnetwork_id, blob_id).await;
}
DaNetworkMessage::RequestDispersal {
subnetwork_id,
da_blob,
} => {
if let Err(e) = self.dispersal_blobs_sender.send((subnetwork_id, *da_blob)) {
error!("Could not send internal blob to underlying dispersal behaviour: {e}");
}
}
}
}
async fn subscribe(
&mut self,
event: Self::EventKind,
) -> Pin<Box<dyn Stream<Item = Self::NetworkEvent> + Send>> {
match event {
DaNetworkEventKind::Sampling => Box::pin(
BroadcastStream::new(self.sampling_broadcast_receiver.resubscribe())
.filter_map(|event| async { event.ok() })
.map(Self::NetworkEvent::Sampling),
),
DaNetworkEventKind::Verifying => Box::pin(
BroadcastStream::new(self.verifying_broadcast_receiver.resubscribe())
.filter_map(|event| async { event.ok() })
.map(|blob| Self::NetworkEvent::Verifying(Box::new(blob))),
),
DaNetworkEventKind::Dispersal => Box::pin(
BroadcastStream::new(self.dispersal_broadcast_receiver.resubscribe())
.filter_map(|event| async { event.ok() })
.map(Self::NetworkEvent::Dispersal),
),
}
}
}
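
All three `subscribe` arms share one pattern: `resubscribe` to the broadcast channel, silently drop lagged-receiver errors, and box the result up as a `Stream`. A reduced standalone sketch of that pattern, with a `u32` payload standing in for the event types:

use futures::{Stream, StreamExt};
use std::pin::Pin;
use tokio::sync::broadcast;
use tokio_stream::wrappers::BroadcastStream;

fn subscribe_events(receiver: &broadcast::Receiver<u32>) -> Pin<Box<dyn Stream<Item = u32> + Send>> {
    Box::pin(
        // `resubscribe` yields a fresh receiver that sees events from now on.
        BroadcastStream::new(receiver.resubscribe())
            // `BroadcastStream` items are `Result`s; lag errors are skipped.
            .filter_map(|event| async { event.ok() }),
    )
}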
async fn handle_executor_dispersal_events_stream(
mut dispersal_events_receiver: UnboundedReceiverStream<DispersalExecutorEvent>,
dispersal_broadcast_sender: broadcast::Sender<DispersalExecutorEvent>,
) {
while let Some(event) = dispersal_events_receiver.next().await {
if let Err(e) = dispersal_broadcast_sender.send(event) {
error!("Error forwarding internal dispersal executor event: {e}");
}
}
}
fn dial_dispersal_peers<Membership>(
executor_swarm: &mut ExecutorSwarm<Membership>,
config: &DaNetworkExecutorBackendSettings<Membership>,
validator_connected_peers: &HashSet<PeerId>,
) -> HashSet<PeerId>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Clone
+ Debug
+ Send
+ Sync
+ 'static,
{
let mut connected_peers = HashSet::new();
let local_peer_id = *executor_swarm.local_peer_id();
let membership = &config.validator_settings.membership;
// Find the subnetworks we already have connections with, so they can be filtered out
let connected_subnetworks: HashSet<SubnetworkId> = (0..config.num_subnets as u32)
.filter(|subnetwork_id| {
!membership
.members_of(subnetwork_id)
.is_disjoint(validator_connected_peers)
})
.collect();
let already_connected_peers: HashSet<PeerId> = membership
.members()
.intersection(validator_connected_peers)
.copied()
.collect();
// select dispersal peers from the subnetworks we are not yet connected to
let selected_dispersal_peers = select_dispersal_peers(
&local_peer_id,
config,
&connected_subnetworks,
// No peers are filtered out on this first selection pass.
&HashSet::new(),
);
for peer_id in selected_dispersal_peers {
let addr = config
.validator_settings
.addresses
.get(&peer_id)
.expect("Peer address should be in the list")
.clone();
executor_swarm
.dial(addr)
.expect("Should schedule the dials");
connected_peers.insert(peer_id);
}
// add peers from the subnetworks we are already connected to
connected_peers
.union(&already_connected_peers)
.copied()
.collect()
}
/// Use selection as per the base [specification](https://www.notion.so/NomosDA-Network-Specification-c6664294d630470ba20aefb21a218f8c?pvs=4#10e8f96fb65c803f9ed9d5a91df3ac83)
fn select_dispersal_peers<Membership>(
local_peer_id: &PeerId,
config: &DaNetworkExecutorBackendSettings<Membership>,
filtered_subnetworks: &HashSet<SubnetworkId>,
filtered_peers: &HashSet<PeerId>,
) -> HashSet<PeerId>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Clone
+ Debug
+ Send
+ Sync
+ 'static,
{
let membership = &config.validator_settings.membership;
(0..config.num_subnets as u32)
.filter(|subnetwork_id| !filtered_subnetworks.contains(subnetwork_id))
.filter_map(|subnetwork_id| {
membership
.members_of(&subnetwork_id)
.difference(filtered_peers)
.find(|&id| id != local_peer_id)
.copied()
})
.collect()
}
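
Combining the two helpers: a subnetwork counts as covered when any of its members is in the validator-connected set (`is_disjoint`), and one fresh peer is then picked per uncovered subnetwork, skipping the local id. A self-contained sketch over integer ids (standing in for `PeerId`s) under those assumptions:

use std::collections::HashSet;

fn main() {
    let local: u64 = 1;
    // Three subnetworks (ids 0, 1, 2) and their members.
    let assignations: Vec<HashSet<u64>> = vec![
        [1, 2].into_iter().collect(), // subnet 0
        [3, 4].into_iter().collect(), // subnet 1
        [5].into_iter().collect(),    // subnet 2
    ];
    // Validator-side dialing already reached peer 2, covering subnet 0.
    let validator_connected: HashSet<u64> = [2].into_iter().collect();

    // Subnetworks already covered by an existing connection.
    let covered: HashSet<usize> = (0..assignations.len())
        .filter(|id| !assignations[*id].is_disjoint(&validator_connected))
        .collect();
    assert_eq!(covered, [0].into_iter().collect());

    // One not-yet-connected peer per uncovered subnetwork, never ourselves.
    let selected: HashSet<u64> = (0..assignations.len())
        .filter(|id| !covered.contains(id))
        .filter_map(|id| assignations[id].iter().find(|&&p| p != local).copied())
        .collect();
    // Subnet 1 contributes one of {3, 4}; subnet 2 contributes peer 5.
    assert!(selected.contains(&5) && selected.len() == 2);
}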

View File

@@ -1 +1,3 @@
pub mod common;
pub mod executor;
pub mod validator;

View File

@@ -1,34 +1,26 @@
use crate::backends::libp2p::common::{
dial_validator_subnetwork_peers, handle_sample_request, handle_validator_events_stream,
DaNetworkBackendSettings, SamplingEvent, BROADCAST_CHANNEL_SIZE,
};
use crate::backends::NetworkBackend;
use futures::{Stream, StreamExt};
use kzgrs_backend::common::blob::DaBlob;
use kzgrs_backend::common::ColumnIndex;
use libp2p::identity::ed25519;
use libp2p::{Multiaddr, PeerId};
use log::error;
use libp2p::PeerId;
use nomos_core::da::BlobId;
use nomos_da_network_core::protocols::sampling;
use nomos_da_network_core::protocols::sampling::behaviour::{
BehaviourSampleReq, BehaviourSampleRes, SamplingError,
};
use nomos_da_network_core::swarm::validator::{ValidatorEventsStream, ValidatorSwarm};
use nomos_da_network_core::swarm::validator::ValidatorSwarm;
use nomos_da_network_core::SubnetworkId;
use nomos_libp2p::secret_key_serde;
use nomos_libp2p::ed25519;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use overwatch_rs::services::state::NoState;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::pin::Pin;
use subnetworks_assignations::MembershipHandler;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{broadcast, mpsc};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::BroadcastStream;
const BROADCAST_CHANNEL_SIZE: usize = 128;
/// Message that the backend replies to
#[derive(Debug)]
pub enum DaNetworkMessage {
@@ -48,21 +40,6 @@ pub enum DaNetworkEventKind {
Verifying,
}
/// Sampling events coming from da network
#[derive(Debug, Clone)]
pub enum SamplingEvent {
/// A success sampling
SamplingSuccess { blob_id: BlobId, blob: Box<DaBlob> },
/// Incoming sampling request
SamplingRequest {
blob_id: BlobId,
column_idx: ColumnIndex,
response_sender: mpsc::Sender<Option<DaBlob>>,
},
/// A failed sampling error
SamplingError { error: SamplingError },
}
/// DA network incoming events
#[derive(Debug)]
pub enum DaNetworkEvent {
@@ -87,30 +64,6 @@ pub struct DaNetworkValidatorBackend<Membership> {
_membership: PhantomData<Membership>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DaNetworkValidatorBackendSettings<Membership> {
// Identification Secp256k1 private key in Hex format (`0x123...abc`). Default random.
#[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")]
pub node_key: ed25519::SecretKey,
/// Membership of DA network PoV set
pub membership: Membership,
pub addresses: Vec<(PeerId, Multiaddr)>,
pub listening_address: Multiaddr,
}
impl<Membership> DaNetworkValidatorBackend<Membership> {
/// Send the sampling request to the underlying sampling behaviour
async fn handle_sample_request(&self, subnetwork_id: SubnetworkId, blob_id: BlobId) {
if let Err(SendError((subnetwork_id, blob_id))) =
self.sampling_request_channel.send((subnetwork_id, blob_id))
{
error!(
"Error requesting sample for subnetwork id : {subnetwork_id}, blob_id: {blob_id:?}"
);
}
}
}
#[async_trait::async_trait]
impl<Membership> NetworkBackend for DaNetworkValidatorBackend<Membership>
where
@@ -121,7 +74,7 @@ where
+ Sync
+ 'static,
{
type Settings = DaNetworkValidatorBackendSettings<Membership>;
type Settings = DaNetworkBackendSettings<Membership>;
type State = NoState<Self::Settings>;
type Message = DaNetworkMessage;
type EventKind = DaNetworkEventKind;
@@ -130,16 +83,11 @@ where
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self {
let keypair =
libp2p::identity::Keypair::from(ed25519::Keypair::from(config.node_key.clone()));
let (mut validator_swarm, events_streams) = ValidatorSwarm::new(
let (mut validator_swarm, validator_events_stream) = ValidatorSwarm::new(
keypair,
config.membership.clone(),
config.addresses.clone().into_iter().collect(),
);
let sampling_request_channel = validator_swarm
.protocol_swarm()
.behaviour()
.sampling_behaviour()
.sample_request_channel();
let address = config.listening_address;
// Put the swarm to listen on the configured address
validator_swarm
@@ -148,32 +96,17 @@ where
.unwrap_or_else(|e| {
panic!("Error listening on DA network with address {address}: {e}")
});
// Dial peers in the same subnetworks (Node might participate in multiple).
let local_peer_id = *validator_swarm.local_peer_id();
let mut connected_peers = HashSet::new();
config
.membership
.membership(&local_peer_id)
.iter()
.flat_map(|subnet| config.membership.members_of(subnet))
.filter(|peer| peer != &local_peer_id)
.filter_map(|peer| {
config
.addresses
.iter()
.find(|(p, _)| p == &peer)
.map(|(_, addr)| (peer, addr.clone()))
})
.for_each(|(peer, addr)| {
// Only dial if we haven't already connected to this peer.
if connected_peers.insert(peer) {
validator_swarm
.dial(addr)
.expect("Node should be able to dial peer in a subnet");
}
});
dial_validator_subnetwork_peers(
&config.membership,
&config.addresses,
validator_swarm.protocol_swarm_mut(),
local_peer_id,
);
let sampling_request_channel = validator_swarm.sample_request_channel();
let task = overwatch_handle.runtime().spawn(validator_swarm.run());
let (sampling_broadcast_sender, sampling_broadcast_receiver) =
@@ -183,7 +116,7 @@ where
let replies_task = overwatch_handle
.runtime()
.spawn(handle_validator_events_stream(
events_streams,
validator_events_stream,
sampling_broadcast_sender,
verifying_broadcast_sender,
));
@@ -204,7 +137,7 @@ where
subnetwork_id,
blob_id,
} => {
self.handle_sample_request(subnetwork_id, blob_id).await;
handle_sample_request(&self.sampling_request_channel, subnetwork_id, blob_id).await;
}
}
}
@@ -227,73 +160,3 @@ where
}
}
}
/// Task that handles forwarding of events to the subscriptions channels/stream
async fn handle_validator_events_stream(
events_streams: ValidatorEventsStream,
sampling_broadcast_sender: broadcast::Sender<SamplingEvent>,
validation_broadcast_sender: broadcast::Sender<DaBlob>,
) {
let ValidatorEventsStream {
mut sampling_events_receiver,
mut validation_events_receiver,
} = events_streams;
#[allow(clippy::never_loop)]
loop {
// WARNING: `StreamExt::next` is cancellation safe.
// If adding more branches check if such methods are within the cancellation safe set:
// https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
tokio::select! {
Some(sampling_event) = StreamExt::next(&mut sampling_events_receiver) => {
match sampling_event {
sampling::behaviour::SamplingEvent::SamplingSuccess{ blob_id, blob , .. } => {
if let Err(e) = sampling_broadcast_sender.send(SamplingEvent::SamplingSuccess {blob_id, blob}){
error!("Error in internal broadcast of sampling success: {e:?}");
}
}
sampling::behaviour::SamplingEvent::IncomingSample{request_receiver, response_sender} => {
if let Ok(BehaviourSampleReq { blob_id, column_idx }) = request_receiver.await {
let (sampling_response_sender, mut sampling_response_receiver) = mpsc::channel(1);
if let Err(e) = sampling_broadcast_sender
.send(SamplingEvent::SamplingRequest { blob_id, column_idx, response_sender: sampling_response_sender })
{
error!("Error in internal broadcast of sampling request: {e:?}");
sampling_response_receiver.close()
}
if let Some(maybe_blob) = sampling_response_receiver.recv().await {
let result = match maybe_blob {
Some(blob) => BehaviourSampleRes::SamplingSuccess {
blob_id,
subnetwork_id: blob.column_idx as u32,
blob: Box::new(blob),
},
None => BehaviourSampleRes::SampleNotFound { blob_id },
};
if response_sender.send(result).is_err() {
error!("Error sending sampling success response");
}
} else if response_sender
.send(BehaviourSampleRes::SampleNotFound { blob_id })
.is_err()
{
error!("Error sending sampling success response");
}
}
}
sampling::behaviour::SamplingEvent::SamplingError{ error } => {
if let Err(e) = sampling_broadcast_sender.send(SamplingEvent::SamplingError {error}) {
error!{"Error in internal broadcast of sampling error: {e:?}"};
}
}}
}
Some(da_blob) = StreamExt::next(&mut validation_events_receiver)=> {
if let Err(error) = validation_broadcast_sender.send(da_blob) {
error!("Error in internal broadcast of validation for blob: {:?}", error.0);
}
}
}
}
}

View File

@@ -9,7 +9,7 @@ use std::fmt::Debug;
use kzgrs_backend::common::blob::DaBlob;
use network::NetworkAdapter;
use nomos_core::da::BlobId;
use nomos_da_network_service::backends::libp2p::validator::SamplingEvent;
use nomos_da_network_service::backends::libp2p::common::SamplingEvent;
use nomos_da_network_service::NetworkService;
use nomos_storage::StorageService;
use overwatch_rs::services::handle::ServiceStateHandle;

View File

@@ -10,8 +10,9 @@ use tokio::sync::oneshot;
use crate::network::NetworkAdapter;
use nomos_core::da::BlobId;
use nomos_da_network_core::SubnetworkId;
use nomos_da_network_service::backends::libp2p::validator::{
DaNetworkEvent, DaNetworkEventKind, DaNetworkMessage, DaNetworkValidatorBackend, SamplingEvent,
use nomos_da_network_service::backends::libp2p::{
common::SamplingEvent,
validator::{DaNetworkEvent, DaNetworkEventKind, DaNetworkMessage, DaNetworkValidatorBackend},
};
use nomos_da_network_service::{DaNetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay;

View File

@@ -3,7 +3,7 @@ pub mod adapters;
use futures::Stream;
use nomos_core::da::BlobId;
use nomos_da_network_core::SubnetworkId;
use nomos_da_network_service::backends::libp2p::validator::SamplingEvent;
use nomos_da_network_service::backends::libp2p::common::SamplingEvent;
use nomos_da_network_service::backends::NetworkBackend;
use nomos_da_network_service::NetworkService;
use overwatch_rs::services::relay::OutboundRelay;

View File

@@ -1,4 +1,5 @@
// std
use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings;
use std::path::PathBuf;
use std::time::Duration;
// crates
@@ -23,9 +24,7 @@ use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapter as IndexerStorage
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageSettings;
use nomos_da_indexer::DataIndexerService;
use nomos_da_indexer::IndexerSettings;
use nomos_da_network_service::backends::libp2p::validator::{
DaNetworkValidatorBackend, DaNetworkValidatorBackendSettings,
};
use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBackend;
use nomos_da_network_service::NetworkConfig as DaNetworkConfig;
use nomos_da_network_service::NetworkService as DaNetworkService;
use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings;
@@ -191,7 +190,7 @@ pub fn new_node(
},
},
da_network: DaNetworkConfig {
backend: DaNetworkValidatorBackendSettings {
backend: DaNetworkBackendSettings {
node_key: da_network_settings.node_key,
membership: FillFromNodeList::new(
&da_network_settings
@@ -202,7 +201,7 @@ pub fn new_node(
da_network_settings.num_subnets.into(),
da_network_settings.nodes_per_subnet.into(),
),
addresses: da_network_settings.peer_addresses,
addresses: da_network_settings.peer_addresses.into_iter().collect(),
listening_address: da_network_settings.listening_address,
},
},

View File

@@ -20,7 +20,7 @@ use mixnet::{
use nomos_core::{block::Block, header::HeaderId, staking::NMO_UNIT};
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageAdapterSettings;
use nomos_da_indexer::IndexerSettings;
use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBackendSettings;
use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings;
use nomos_da_network_service::NetworkConfig as DaNetworkConfig;
use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings;
use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as SamplingStorageAdapterSettings;
@@ -351,7 +351,7 @@ impl Node for NomosNode {
let subnetwork_ids = membership.membership(&local_peer_id);
config.da_verifier.verifier_settings.index = subnetwork_ids;
config.da_network.backend.membership = membership;
config.da_network.backend.addresses = peer_addresses.clone();
config.da_network.backend.addresses = peer_addresses.iter().cloned().collect();
}
#[cfg(feature = "mixnet")]
@@ -485,7 +485,7 @@ fn create_node_config(
blob_selector_settings: (),
},
da_network: DaNetworkConfig {
backend: DaNetworkValidatorBackendSettings {
backend: DaNetworkBackendSettings {
node_key: swarm_config.node_key,
listening_address: Multiaddr::from_str(&format!(
"/ip4/127.0.0.1/udp/{}/quic-v1",