DA: Executor behaviour + swarm (#768)

* Add executor behaviour

* Implement executor behaviour

* Implement executor swarm

* Fix comment

* Clippy happy

* Bubbled up utility methods on swarms
This commit is contained in:
Daniel Sanchez 2024-09-26 11:58:49 +02:00 committed by GitHub
parent a19a8e8112
commit 02b6969967
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 403 additions and 43 deletions

View File

@ -0,0 +1,93 @@
// std
use libp2p::identity::Keypair;
use libp2p::PeerId;
// crates
use libp2p::swarm::NetworkBehaviour;
// internal
use crate::address_book::AddressBook;
use crate::{
protocols::dispersal::executor::behaviour::DispersalExecutorBehaviour,
protocols::dispersal::validator::behaviour::DispersalValidatorBehaviour,
protocols::replication::behaviour::ReplicationBehaviour,
protocols::sampling::behaviour::SamplingBehaviour,
};
use subnetworks_assignations::MembershipHandler;
/// Aggregated `NetworkBehaviour` composed of:
/// * Sampling
/// * Executor dispersal
/// * Validator dispersal
/// * Replication
/// WARNING: Order of internal protocols matters as the first one will be polled first until return
/// a `Poll::Pending`.
/// 1) Sampling is the crucial one as we have to be responsive for consensus.
/// 2) Dispersal so we do not bottleneck executors.
/// 3) Replication is the least important (and probably the least used), it is also dependant of dispersal.
#[derive(NetworkBehaviour)]
pub struct ExecutorBehaviour<Membership: MembershipHandler> {
sampling: SamplingBehaviour<Membership>,
executor_dispersal: DispersalExecutorBehaviour<Membership>,
validator_dispersal: DispersalValidatorBehaviour<Membership>,
replication: ReplicationBehaviour<Membership>,
}
impl<Membership> ExecutorBehaviour<Membership>
where
Membership: MembershipHandler + Clone + Send + 'static,
<Membership as MembershipHandler>::NetworkId: Send,
{
pub fn new(key: &Keypair, membership: Membership, addresses: AddressBook) -> Self {
let peer_id = PeerId::from_public_key(&key.public());
Self {
sampling: SamplingBehaviour::new(peer_id, membership.clone(), addresses),
executor_dispersal: DispersalExecutorBehaviour::new(membership.clone()),
validator_dispersal: DispersalValidatorBehaviour::new(membership.clone()),
replication: ReplicationBehaviour::new(peer_id, membership),
}
}
pub fn update_membership(&mut self, membership: Membership) {
// TODO: share membership
self.sampling.update_membership(membership.clone());
self.executor_dispersal
.update_membership(membership.clone());
self.replication.update_membership(membership);
}
pub fn sampling_behaviour(&self) -> &SamplingBehaviour<Membership> {
&self.sampling
}
pub fn dispersal_executor_behaviour(&self) -> &DispersalExecutorBehaviour<Membership> {
&self.executor_dispersal
}
pub fn dispersal_validator_behaviour(&self) -> &DispersalValidatorBehaviour<Membership> {
&self.validator_dispersal
}
pub fn replication_behaviour(&self) -> &ReplicationBehaviour<Membership> {
&self.replication
}
pub fn sampling_behaviour_mut(&mut self) -> &mut SamplingBehaviour<Membership> {
&mut self.sampling
}
pub fn dispersal_executor_behaviour_mut(
&mut self,
) -> &mut DispersalExecutorBehaviour<Membership> {
&mut self.executor_dispersal
}
pub fn dispersal_validator_behaviour_mut(
&mut self,
) -> &mut DispersalValidatorBehaviour<Membership> {
&mut self.validator_dispersal
}
pub fn replication_behaviour_mut(&mut self) -> &mut ReplicationBehaviour<Membership> {
&mut self.replication
}
}

View File

@ -1 +1,2 @@
pub mod executor;
pub mod validator; pub mod validator;

View File

@ -206,6 +206,10 @@ where
} }
} }
pub fn update_membership(&mut self, membership: Membership) {
self.membership = membership;
}
/// Open a new stream from the underlying control to the provided peer /// Open a new stream from the underlying control to the provided peer
async fn open_stream( async fn open_stream(
peer_id: PeerId, peer_id: PeerId,

View File

@ -0,0 +1,68 @@
use crate::protocols::dispersal::validator::behaviour::DispersalEvent;
use crate::protocols::replication::behaviour::{ReplicationBehaviour, ReplicationEvent};
use crate::protocols::sampling::behaviour::SamplingEvent;
use crate::SubnetworkId;
use kzgrs_backend::common::blob::DaBlob;
use libp2p::PeerId;
use log::{debug, error};
use nomos_da_messages::replication::ReplicationReq;
use subnetworks_assignations::MembershipHandler;
use tokio::sync::mpsc::UnboundedSender;
pub async fn handle_validator_dispersal_event<Membership>(
validation_events_sender: &mut UnboundedSender<DaBlob>,
replication_behaviour: &mut ReplicationBehaviour<Membership>,
event: DispersalEvent,
) where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>,
{
match event {
// Send message for replication
DispersalEvent::IncomingMessage { message } => {
if let Ok(blob) = bincode::deserialize::<DaBlob>(
message
.blob
.as_ref()
.expect("Message blob should not be empty")
.data
.as_slice(),
) {
if let Err(e) = validation_events_sender.send(blob) {
error!("Error sending blob to validation: {e:?}");
}
}
replication_behaviour.send_message(ReplicationReq {
blob: message.blob,
subnetwork_id: message.subnetwork_id,
});
}
}
}
pub async fn handle_sampling_event(
sampling_events_sender: &mut UnboundedSender<SamplingEvent>,
event: SamplingEvent,
) {
if let Err(e) = sampling_events_sender.send(event) {
debug!("Error distributing sampling message internally: {e:?}");
}
}
pub async fn handle_replication_event(
validation_events_sender: &mut UnboundedSender<DaBlob>,
event: ReplicationEvent,
) {
let ReplicationEvent::IncomingMessage { message, .. } = event;
if let Ok(blob) = bincode::deserialize::<DaBlob>(
message
.blob
.as_ref()
.expect("Message blob should not be empty")
.data
.as_slice(),
) {
if let Err(e) = validation_events_sender.send(blob) {
error!("Error sending blob to validation: {e:?}");
}
}
}

View File

@ -0,0 +1,205 @@
use std::io;
// std
use std::time::Duration;
// crates
use futures::StreamExt;
use kzgrs_backend::common::blob::DaBlob;
use libp2p::core::transport::ListenerId;
use libp2p::identity::Keypair;
use libp2p::swarm::{DialError, SwarmEvent};
use libp2p::{Multiaddr, PeerId, Swarm, SwarmBuilder, TransportError};
use log::debug;
use nomos_core::da::BlobId;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
// internal
use crate::address_book::AddressBook;
use crate::behaviour::executor::{ExecutorBehaviour, ExecutorBehaviourEvent};
use crate::protocols::{
dispersal::{
executor::behaviour::DispersalExecutorEvent, validator::behaviour::DispersalEvent,
},
replication::behaviour::ReplicationEvent,
sampling::behaviour::SamplingEvent,
};
use crate::swarm::common::{
handle_replication_event, handle_sampling_event, handle_validator_dispersal_event,
};
use crate::SubnetworkId;
use subnetworks_assignations::MembershipHandler;
pub struct ExecutorEventsStream {
pub sampling_events_receiver: UnboundedReceiverStream<SamplingEvent>,
pub validation_events_receiver: UnboundedReceiverStream<DaBlob>,
pub dispersal_events_receiver: UnboundedReceiverStream<DispersalExecutorEvent>,
}
pub struct ExecutorSwarm<
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId> + 'static,
> {
swarm: Swarm<ExecutorBehaviour<Membership>>,
sampling_events_sender: UnboundedSender<SamplingEvent>,
validation_events_sender: UnboundedSender<DaBlob>,
dispersal_events_sender: UnboundedSender<DispersalExecutorEvent>,
}
impl<Membership> ExecutorSwarm<Membership>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId> + Clone + Send,
{
pub fn new(
key: Keypair,
membership: Membership,
addresses: AddressBook,
) -> (Self, ExecutorEventsStream) {
let (sampling_events_sender, sampling_events_receiver) = unbounded_channel();
let (validation_events_sender, validation_events_receiver) = unbounded_channel();
let (dispersal_events_sender, dispersal_events_receiver) = unbounded_channel();
let sampling_events_receiver = UnboundedReceiverStream::new(sampling_events_receiver);
let validation_events_receiver = UnboundedReceiverStream::new(validation_events_receiver);
let dispersal_events_receiver = UnboundedReceiverStream::new(dispersal_events_receiver);
(
Self {
swarm: Self::build_swarm(key, membership, addresses),
sampling_events_sender,
validation_events_sender,
dispersal_events_sender,
},
ExecutorEventsStream {
sampling_events_receiver,
validation_events_receiver,
dispersal_events_receiver,
},
)
}
fn build_swarm(
key: Keypair,
membership: Membership,
addresses: AddressBook,
) -> Swarm<ExecutorBehaviour<Membership>> {
SwarmBuilder::with_existing_identity(key)
.with_tokio()
.with_quic()
.with_behaviour(|key| ExecutorBehaviour::new(key, membership, addresses))
.expect("Validator behaviour should build")
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX))
})
.build()
}
pub fn dial(&mut self, addr: Multiaddr) -> Result<(), DialError> {
self.swarm.dial(addr)?;
Ok(())
}
pub fn listen_on(
&mut self,
address: Multiaddr,
) -> Result<ListenerId, TransportError<io::Error>> {
self.swarm.listen_on(address)
}
pub fn sample_request_channel(&mut self) -> UnboundedSender<(Membership::NetworkId, BlobId)> {
self.swarm
.behaviour()
.sampling_behaviour()
.sample_request_channel()
}
pub fn dispersal_blobs_channel(&mut self) -> UnboundedSender<(Membership::NetworkId, DaBlob)> {
self.swarm
.behaviour()
.dispersal_executor_behaviour()
.blobs_sender()
}
pub fn dispersal_open_stream_sender(&mut self) -> UnboundedSender<PeerId> {
self.swarm
.behaviour()
.dispersal_executor_behaviour()
.open_stream_sender()
}
pub fn local_peer_id(&self) -> &PeerId {
self.swarm.local_peer_id()
}
pub fn protocol_swarm(&self) -> &Swarm<ExecutorBehaviour<Membership>> {
&self.swarm
}
pub fn protocol_swarm_mut(&mut self) -> &mut Swarm<ExecutorBehaviour<Membership>> {
&mut self.swarm
}
async fn handle_sampling_event(&mut self, event: SamplingEvent) {
handle_sampling_event(&mut self.sampling_events_sender, event).await
}
async fn handle_executor_dispersal_event(&mut self, event: DispersalExecutorEvent) {
if let Err(e) = self.dispersal_events_sender.send(event) {
debug!("Error distributing sampling message internally: {e:?}");
}
}
async fn handle_validator_dispersal_event(&mut self, event: DispersalEvent) {
handle_validator_dispersal_event(
&mut self.validation_events_sender,
self.swarm.behaviour_mut().replication_behaviour_mut(),
event,
)
.await
}
async fn handle_replication_event(&mut self, event: ReplicationEvent) {
handle_replication_event(&mut self.validation_events_sender, event).await
}
async fn handle_behaviour_event(&mut self, event: ExecutorBehaviourEvent<Membership>) {
match event {
ExecutorBehaviourEvent::Sampling(event) => {
self.handle_sampling_event(event).await;
}
ExecutorBehaviourEvent::ExecutorDispersal(event) => {
self.handle_executor_dispersal_event(event).await;
}
ExecutorBehaviourEvent::ValidatorDispersal(event) => {
self.handle_validator_dispersal_event(event).await;
}
ExecutorBehaviourEvent::Replication(event) => {
self.handle_replication_event(event).await;
}
}
}
pub async fn run(mut self) {
loop {
if let Some(event) = self.swarm.next().await {
debug!("Da swarm event received: {event:?}");
match event {
SwarmEvent::Behaviour(behaviour_event) => {
self.handle_behaviour_event(behaviour_event).await;
}
SwarmEvent::ConnectionEstablished { .. } => {}
SwarmEvent::ConnectionClosed { .. } => {}
SwarmEvent::IncomingConnection { .. } => {}
SwarmEvent::IncomingConnectionError { .. } => {}
SwarmEvent::OutgoingConnectionError { .. } => {}
SwarmEvent::NewListenAddr { .. } => {}
SwarmEvent::ExpiredListenAddr { .. } => {}
SwarmEvent::ListenerClosed { .. } => {}
SwarmEvent::ListenerError { .. } => {}
SwarmEvent::Dialing { .. } => {}
SwarmEvent::NewExternalAddrCandidate { .. } => {}
SwarmEvent::ExternalAddrConfirmed { .. } => {}
SwarmEvent::ExternalAddrExpired { .. } => {}
SwarmEvent::NewExternalAddrOfPeer { .. } => {}
event => {
debug!("Unsupported validator swarm event: {event:?}");
}
}
}
}
}
}

View File

@ -1 +1,3 @@
pub(crate) mod common;
pub mod executor;
pub mod validator; pub mod validator;

View File

@ -1,13 +1,15 @@
use std::io;
// std // std
use std::time::Duration; use std::time::Duration;
// crates // crates
use futures::StreamExt; use futures::StreamExt;
use kzgrs_backend::common::blob::DaBlob; use kzgrs_backend::common::blob::DaBlob;
use libp2p::core::transport::ListenerId;
use libp2p::identity::Keypair; use libp2p::identity::Keypair;
use libp2p::swarm::{DialError, SwarmEvent}; use libp2p::swarm::{DialError, SwarmEvent};
use libp2p::{Multiaddr, PeerId, Swarm, SwarmBuilder}; use libp2p::{Multiaddr, PeerId, Swarm, SwarmBuilder, TransportError};
use log::{debug, error}; use log::debug;
use nomos_da_messages::replication::ReplicationReq; use nomos_core::da::BlobId;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
// internal // internal
@ -17,6 +19,9 @@ use crate::protocols::{
dispersal::validator::behaviour::DispersalEvent, replication::behaviour::ReplicationEvent, dispersal::validator::behaviour::DispersalEvent, replication::behaviour::ReplicationEvent,
sampling::behaviour::SamplingEvent, sampling::behaviour::SamplingEvent,
}; };
use crate::swarm::common::{
handle_replication_event, handle_sampling_event, handle_validator_dispersal_event,
};
use crate::SubnetworkId; use crate::SubnetworkId;
use subnetworks_assignations::MembershipHandler; use subnetworks_assignations::MembershipHandler;
@ -80,6 +85,20 @@ where
Ok(()) Ok(())
} }
pub fn listen_on(
&mut self,
address: Multiaddr,
) -> Result<ListenerId, TransportError<io::Error>> {
self.swarm.listen_on(address)
}
pub fn sample_request_channel(&mut self) -> UnboundedSender<(Membership::NetworkId, BlobId)> {
self.swarm
.behaviour()
.sampling_behaviour()
.sample_request_channel()
}
pub fn local_peer_id(&self) -> &PeerId { pub fn local_peer_id(&self) -> &PeerId {
self.swarm.local_peer_id() self.swarm.local_peer_id()
} }
@ -93,52 +112,20 @@ where
} }
async fn handle_sampling_event(&mut self, event: SamplingEvent) { async fn handle_sampling_event(&mut self, event: SamplingEvent) {
if let Err(e) = self.sampling_events_sender.send(event) { handle_sampling_event(&mut self.sampling_events_sender, event).await
debug!("Error distributing sampling message internally: {e:?}");
}
} }
async fn handle_dispersal_event(&mut self, event: DispersalEvent) { async fn handle_dispersal_event(&mut self, event: DispersalEvent) {
match event { handle_validator_dispersal_event(
// Send message for replication &mut self.validation_events_sender,
DispersalEvent::IncomingMessage { message } => { self.swarm.behaviour_mut().replication_behaviour_mut(),
if let Ok(blob) = bincode::deserialize::<DaBlob>( event,
message )
.blob .await
.as_ref()
.expect("Message blob should not be empty")
.data
.as_slice(),
) {
if let Err(e) = self.validation_events_sender.send(blob) {
error!("Error sending blob to validation: {e:?}");
}
}
self.swarm
.behaviour_mut()
.replication_behaviour_mut()
.send_message(ReplicationReq {
blob: message.blob,
subnetwork_id: message.subnetwork_id,
});
}
}
} }
async fn handle_replication_event(&mut self, event: ReplicationEvent) { async fn handle_replication_event(&mut self, event: ReplicationEvent) {
let ReplicationEvent::IncomingMessage { message, .. } = event; handle_replication_event(&mut self.validation_events_sender, event).await
if let Ok(blob) = bincode::deserialize::<DaBlob>(
message
.blob
.as_ref()
.expect("Message blob should not be empty")
.data
.as_slice(),
) {
if let Err(e) = self.validation_events_sender.send(blob) {
error!("Error sending blob to validation: {e:?}");
}
}
} }
async fn handle_behaviour_event(&mut self, event: ValidatorBehaviourEvent<Membership>) { async fn handle_behaviour_event(&mut self, event: ValidatorBehaviourEvent<Membership>) {