DA: Executor dispersal auto stream (#826)

* Re-structure connection handling properly

* Handle event based connections

* Fix test

* Handling auto reconnection for dispersal behaviour

* Ignore cli test
This commit is contained in:
Daniel Sanchez 2024-10-17 16:10:28 +02:00 committed by GitHub
parent 8dbcf560f9
commit 41a9387b4b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 241 additions and 56 deletions

View File

@ -1,5 +1,4 @@
// std
use std::time::Duration;
// crates
use kzgrs_backend::common::blob::DaBlob;
use libp2p::futures::StreamExt;
@ -62,18 +61,19 @@ where
}
fn build_swarm(
key: Keypair,
membership: Membership,
_key: Keypair,
_membership: Membership,
) -> Swarm<DispersalExecutorBehaviour<Membership>> {
libp2p::SwarmBuilder::with_existing_identity(key)
.with_tokio()
.with_quic()
.with_behaviour(|_key| DispersalExecutorBehaviour::new(membership))
.expect("Validator behaviour should build")
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX))
})
.build()
todo!("CLI will be removed");
// libp2p::SwarmBuilder::with_existing_identity(key)
// .with_tokio()
// .with_quic()
// .with_behaviour(|_key| DispersalExecutorBehaviour::new(membership))
// .expect("Validator behaviour should build")
// .with_swarm_config(|cfg| {
// cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX))
// })
// .build()
}
pub fn dial(&mut self, addr: Multiaddr) -> Result<(), DialError> {
@ -167,6 +167,7 @@ pub mod test {
use tracing_subscriber::EnvFilter;
#[tokio::test]
#[ignore]
async fn test_dispersal_with_swarms() {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())

View File

@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2021"
[dependencies]
bincode = "1.3"
libp2p = { version = "0.53", features = ["macros", "tokio", "quic"] }
libp2p-stream = "0.1.0-alpha"
futures = "0.3"
@ -18,7 +19,8 @@ void = "1.0.2"
either = "1.13.0"
log = "0.4"
serde = "1.0"
bincode = "1.3"
rand = "0.8"
rand_chacha = "0.3"
tokio = { version = "1.39" }
tokio-stream = "0.1"
thiserror = "1.0"

View File

@ -40,8 +40,12 @@ where
pub fn new(key: &Keypair, membership: Membership, addresses: AddressBook) -> Self {
let peer_id = PeerId::from_public_key(&key.public());
Self {
sampling: SamplingBehaviour::new(peer_id, membership.clone(), addresses),
executor_dispersal: DispersalExecutorBehaviour::new(membership.clone()),
sampling: SamplingBehaviour::new(peer_id, membership.clone(), addresses.clone()),
executor_dispersal: DispersalExecutorBehaviour::new(
peer_id,
membership.clone(),
addresses.clone(),
),
validator_dispersal: DispersalValidatorBehaviour::new(membership.clone()),
replication: ReplicationBehaviour::new(peer_id, membership),
}

View File

@ -1,33 +1,39 @@
// std
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::task::{Context, Poll};
// crates
use either::Either;
use futures::future::BoxFuture;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{AsyncWriteExt, FutureExt, StreamExt};
use kzgrs_backend::common::blob::DaBlob;
use libp2p::core::Endpoint;
use libp2p::swarm::behaviour::{ConnectionClosed, ConnectionEstablished};
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::{
ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent,
THandlerOutEvent, ToSwarm,
};
use libp2p::{Multiaddr, PeerId, Stream};
use libp2p_stream::{Control, OpenStreamError};
use rand::prelude::IteratorRandom;
use rand::SeedableRng;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::error;
// internal
use crate::address_book::AddressBook;
use crate::protocol::DISPERSAL_PROTOCOL;
use crate::protocols::clone_deserialize_error;
use crate::SubnetworkId;
use kzgrs_backend::common::blob::DaBlob;
use nomos_core::da::BlobId;
use nomos_da_messages::common::Blob;
use nomos_da_messages::dispersal::dispersal_res::MessageType;
use nomos_da_messages::dispersal::{DispersalErr, DispersalReq, DispersalRes};
use nomos_da_messages::{pack_message, unpack_from_reader};
use subnetworks_assignations::MembershipHandler;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::wrappers::UnboundedReceiverStream;
// internal
use crate::protocol::DISPERSAL_PROTOCOL;
use crate::protocols::clone_deserialize_error;
use crate::SubnetworkId;
#[derive(Debug, Error)]
pub enum DispersalError {
@ -149,6 +155,8 @@ type StreamHandlerFuture = BoxFuture<'static, Result<StreamHandlerFutureSuccess,
/// It takes care of sending blobs to different subnetworks.
/// Bubbles up events with the success or error when dispersing
pub struct DispersalExecutorBehaviour<Membership: MembershipHandler> {
/// Self id
local_peer_id: PeerId,
/// Underlying stream behaviour
stream_behaviour: libp2p_stream::Behaviour,
/// Pending running tasks (one task per stream)
@ -157,10 +165,16 @@ pub struct DispersalExecutorBehaviour<Membership: MembershipHandler> {
idle_streams: HashMap<PeerId, DispersalStream>,
/// Subnetworks membership information
membership: Membership,
/// Addresses of known peers in the DA network
addresses: AddressBook,
/// Pending blobs that need to be dispersed by PeerId
to_disperse: HashMap<PeerId, VecDeque<(Membership::NetworkId, DaBlob)>>,
/// Pending blobs from disconnected networks
disconnected_pending_blobs: HashMap<Membership::NetworkId, VecDeque<DaBlob>>,
/// Already connected peers connection Ids
connected_subnetworks: HashMap<PeerId, ConnectionId>,
connected_peers: HashMap<PeerId, ConnectionId>,
/// Subnetwork working streams
subnetwork_open_streams: HashSet<SubnetworkId>,
/// Sender hook of peers to open streams channel
pending_out_streams_sender: UnboundedSender<PeerId>,
/// Pending to open streams
@ -176,11 +190,12 @@ where
Membership: MembershipHandler + 'static,
Membership::NetworkId: Send,
{
pub fn new(membership: Membership) -> Self {
pub fn new(local_peer_id: PeerId, membership: Membership, addresses: AddressBook) -> Self {
let stream_behaviour = libp2p_stream::Behaviour::new();
let tasks = FuturesUnordered::new();
let to_disperse = HashMap::new();
let connected_subnetworks = HashMap::new();
let connected_peers = HashMap::new();
let subnetwork_open_streams = HashSet::new();
let idle_streams = HashMap::new();
let (pending_out_streams_sender, receiver) = mpsc::unbounded_channel();
let control = stream_behaviour.new_control();
@ -191,13 +206,18 @@ where
let (pending_blobs_sender, receiver) = mpsc::unbounded_channel();
let pending_blobs_stream = UnboundedReceiverStream::new(receiver).boxed();
let disconnected_pending_blobs = HashMap::new();
Self {
local_peer_id,
stream_behaviour,
tasks,
membership,
addresses,
to_disperse,
connected_subnetworks,
disconnected_pending_blobs,
connected_peers,
subnetwork_open_streams,
idle_streams,
pending_out_streams_sender,
pending_out_streams,
@ -326,8 +346,8 @@ impl<Membership: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'sta
fn disperse_blob(
tasks: &mut FuturesUnordered<StreamHandlerFuture>,
idle_streams: &mut HashMap<Membership::Id, DispersalStream>,
membership: &mut Membership,
connected_subnetworks: &mut HashMap<PeerId, ConnectionId>,
membership: &Membership,
connected_peers: &HashMap<PeerId, ConnectionId>,
to_disperse: &mut HashMap<PeerId, VecDeque<(Membership::NetworkId, DaBlob)>>,
subnetwork_id: SubnetworkId,
blob: DaBlob,
@ -335,7 +355,8 @@ impl<Membership: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'sta
let members = membership.members_of(&subnetwork_id);
let peers = members
.iter()
.filter(|peer_id| connected_subnetworks.contains_key(peer_id));
.filter(|peer_id| connected_peers.contains_key(peer_id));
// We may be connected to more than a single node. Usually will be one, but that is an
// internal decision of the executor itself.
for peer in peers {
@ -352,6 +373,117 @@ impl<Membership: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'sta
}
}
}
/// Move blobs that were parked while a peer's subnetworks were disconnected
/// back into that peer's dispersal queue, now that a stream to the peer is
/// open again.
fn reschedule_blobs_for_peer_stream(
    stream: &DispersalStream,
    membership: &Membership,
    to_disperse: &mut HashMap<PeerId, VecDeque<(SubnetworkId, DaBlob)>>,
    disconnected_pending_blobs: &mut HashMap<SubnetworkId, VecDeque<DaBlob>>,
) {
    let peer_id = stream.peer_id;
    let subnetworks = membership.membership(&peer_id);
    // Drain every parked queue belonging to a subnetwork this peer serves.
    let recovered: Vec<(SubnetworkId, DaBlob)> = subnetworks
        .into_iter()
        .filter_map(|subnetwork| {
            disconnected_pending_blobs
                .remove(&subnetwork)
                .map(|blobs| blobs.into_iter().map(move |blob| (subnetwork, blob)))
        })
        .flatten()
        .collect();
    // Only touch the map when something was actually recovered, so we don't
    // leave empty queues behind for peers with no pending blobs (the original
    // `entry(..).or_default()` inserted an entry unconditionally).
    if !recovered.is_empty() {
        to_disperse.entry(peer_id).or_default().extend(recovered);
    }
}
/// For each subnetwork id, yield the set of its member peers, excluding both
/// the given `peer_id` and this node itself.
fn filter_peers_for_subnetworks<'s>(
    &'s self,
    peer_id: PeerId,
    subnetworks: impl Iterator<Item = SubnetworkId> + 's,
) -> impl Iterator<Item = HashSet<PeerId>> + 's {
    // Copy the local id so the closure doesn't need to reborrow `self` for it.
    let local_peer_id = self.local_peer_id;
    subnetworks.map(move |subnetwork_id| {
        let members = self.membership.members_of(&subnetwork_id);
        members
            .iter()
            .copied()
            .filter(|peer| *peer != peer_id && *peer != local_peer_id)
            .collect::<HashSet<_>>()
    })
}
/// Find candidate peers that can cover the given subnetworks, excluding
/// `peer_id` (and ourselves). Prefers peers shared by all subnetworks; if no
/// such peer exists, falls back to the union of all per-subnetwork candidates.
fn find_subnetworks_candidates_excluding_peer(
    &self,
    peer_id: PeerId,
    subnetworks: &HashSet<SubnetworkId>,
) -> HashSet<PeerId> {
    // Compute the per-subnetwork candidate sets once; the original filtered
    // the membership a second time for the fallback pass below.
    let candidate_sets: Vec<HashSet<PeerId>> = self
        .filter_peers_for_subnetworks(peer_id, subnetworks.iter().copied())
        .collect();
    // Ideal case: peers that are members of every affected subnetwork.
    let shared = candidate_sets
        .iter()
        .cloned()
        .reduce(|h1, h2| h1.intersection(&h2).copied().collect())
        .unwrap_or_default();
    if !shared.is_empty() {
        return shared;
    }
    // No single peer covers all subnetworks: take the union of candidates.
    candidate_sets
        .into_iter()
        .reduce(|h1, h2| h1.union(&h2).copied().collect())
        .unwrap_or_default()
}
fn open_streams_for_disconnected_subnetworks_selected_peer(&mut self, peer_id: PeerId) {
let subnetworks = self.membership.membership(&peer_id);
// open stream will result in dialing if we are not yet connected to the peer
for peer in self.find_subnetworks_candidates_excluding_peer(peer_id, &subnetworks) {
if let Err(e) = self.pending_out_streams_sender.send(peer) {
error!("Error requesting stream for peer {peer_id}: {e}");
}
}
}
/// Detach and return all blobs currently queued for dispersal to `peer_id`,
/// leaving no entry for the peer in the queue map.
fn prune_blobs_for_peer(&mut self, peer_id: PeerId) -> VecDeque<(SubnetworkId, DaBlob)> {
    match self.to_disperse.remove(&peer_id) {
        Some(pending) => pending,
        None => VecDeque::new(),
    }
}
/// Park the blobs that were queued for a now-disconnected peer, keyed by
/// subnetwork, so they can be re-dispersed once a stream for that subnetwork
/// is available again.
fn recover_blobs_for_disconnected_subnetworks(&mut self, peer_id: PeerId) {
    // Pull the peer's queue out of the dispersal map and re-file each blob
    // under its subnetwork id.
    let orphaned = self.prune_blobs_for_peer(peer_id);
    for (subnetwork_id, blob) in orphaned.into_iter() {
        let queue = self
            .disconnected_pending_blobs
            .entry(subnetwork_id)
            .or_default();
        queue.push_back(blob);
    }
}
/// Pick a random member of `subnetwork_id` other than ourselves and request an
/// outbound stream to it, so the subnetwork becomes reachable for dispersal.
fn try_ensure_stream_from_missing_subnetwork(
    local_peer_id: &PeerId,
    pending_out_streams_sender: &mut UnboundedSender<PeerId>,
    membership: &Membership,
    subnetwork_id: &SubnetworkId,
) {
    let mut rng = rand_chacha::ChaCha20Rng::from_entropy();
    // Choose a random subnetwork member that is not us.
    let candidate = membership
        .members_of(subnetwork_id)
        .iter()
        .copied()
        .filter(|peer| peer != local_peer_id)
        .choose(&mut rng);
    // If a candidate exists, request a stream (which dials if necessary).
    if let Some(peer) = candidate {
        if let Err(e) = pending_out_streams_sender.send(peer) {
            error!("Error requesting stream for peer {peer}: {e}");
        }
    }
}
/// Record a newly established connection for `peer_id`, keeping the most
/// recent connection id per peer.
fn handle_connection_established(&mut self, peer_id: PeerId, connection_id: ConnectionId) {
    // Any previous connection id for this peer is intentionally superseded.
    let _previous = self.connected_peers.insert(peer_id, connection_id);
}
/// React to a closed connection: drop the peer's subnetworks from the set of
/// open streams and, if we were tracking the peer, reroute its pending blobs
/// through replacement peers.
fn handle_connection_closed(&mut self, peer_id: PeerId) {
    // NOTE(review): this removes every subnetwork the peer belongs to from
    // `subnetwork_open_streams`, even if another connected peer still serves
    // one of them — confirm that is the intended behaviour.
    let closed_subnetworks = self.membership.membership(&peer_id);
    self.subnetwork_open_streams
        .retain(|subnetwork| !closed_subnetworks.contains(subnetwork));
    let was_connected = self.connected_peers.remove(&peer_id).is_some();
    if was_connected {
        // Park the peer's queued blobs and try to open streams to other
        // members of the affected subnetworks.
        self.recover_blobs_for_disconnected_subnetworks(peer_id);
        self.open_streams_for_disconnected_subnetworks_selected_peer(peer_id);
    }
}
}
impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> NetworkBehaviour
@ -380,14 +512,26 @@ impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> Netw
addr: &Multiaddr,
role_override: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
self.connected_subnetworks.insert(peer, connection_id);
self.stream_behaviour
.handle_established_outbound_connection(connection_id, peer, addr, role_override)
.map(Either::Left)
}
fn on_swarm_event(&mut self, event: FromSwarm) {
self.stream_behaviour.on_swarm_event(event)
self.stream_behaviour.on_swarm_event(event);
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id,
..
}) => {
self.handle_connection_established(peer_id, connection_id);
}
FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, .. }) => {
self.handle_connection_closed(peer_id);
}
_ => {}
}
}
fn on_connection_handler_event(
@ -408,13 +552,18 @@ impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> Netw
cx: &mut Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
let Self {
local_peer_id,
tasks,
to_disperse,
disconnected_pending_blobs,
idle_streams,
pending_out_streams,
pending_out_streams_sender,
pending_blobs_stream,
membership,
connected_subnetworks,
addresses,
connected_peers,
subnetwork_open_streams,
..
} = self;
// poll pending tasks
@ -454,20 +603,38 @@ impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> Netw
}
// poll pending blobs
if let Poll::Ready(Some((subnetwork_id, blob))) = pending_blobs_stream.poll_next_unpin(cx) {
if subnetwork_open_streams.contains(&subnetwork_id) {
Self::disperse_blob(
tasks,
idle_streams,
membership,
connected_subnetworks,
connected_peers,
to_disperse,
subnetwork_id,
blob,
);
} else {
let entry = disconnected_pending_blobs.entry(subnetwork_id).or_default();
entry.push_back(blob);
Self::try_ensure_stream_from_missing_subnetwork(
local_peer_id,
pending_out_streams_sender,
membership,
&subnetwork_id,
);
}
}
// poll pending streams
if let Poll::Ready(Some(res)) = pending_out_streams.poll_next_unpin(cx) {
match res {
Ok(stream) => {
subnetwork_open_streams.extend(membership.membership(&stream.peer_id));
Self::reschedule_blobs_for_peer_stream(
&stream,
membership,
to_disperse,
disconnected_pending_blobs,
);
Self::handle_stream(tasks, to_disperse, idle_streams, stream);
}
Err(error) => {
@ -479,7 +646,18 @@ impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> Netw
}
// Deal with connection as the underlying behaviour would do
match self.stream_behaviour.poll(cx) {
Poll::Ready(ToSwarm::Dial { opts }) => Poll::Ready(ToSwarm::Dial { opts }),
Poll::Ready(ToSwarm::Dial { mut opts }) => {
// attach known peer address if possible
if let Some(address) = opts
.get_peer_id()
.and_then(|peer_id: PeerId| addresses.get_address(&peer_id))
{
opts = DialOpts::peer_id(opts.get_peer_id().unwrap())
.addresses(vec![address.clone()])
.build();
}
Poll::Ready(ToSwarm::Dial { opts })
}
Poll::Pending => {
// TODO: probably must be smarter when to wake this
cx.waker().wake_by_ref();

View File

@ -3,6 +3,7 @@ pub mod validator;
#[cfg(test)]
pub mod test {
use crate::address_book::AddressBook;
use crate::protocols::dispersal::executor::behaviour::DispersalExecutorBehaviour;
use crate::protocols::dispersal::validator::behaviour::{
DispersalEvent, DispersalValidatorBehaviour,
@ -15,22 +16,25 @@ pub mod test {
use libp2p::swarm::SwarmEvent;
use libp2p::{quic, Multiaddr, PeerId};
use log::info;
use std::time::Duration;
use subnetworks_assignations::MembershipHandler;
use tracing_subscriber::fmt::TestWriter;
use tracing_subscriber::EnvFilter;
pub fn executor_swarm(
addressbook: AddressBook,
key: Keypair,
membership: impl MembershipHandler<NetworkId = u32, Id = PeerId> + 'static,
) -> libp2p::Swarm<
DispersalExecutorBehaviour<impl MembershipHandler<NetworkId = u32, Id = PeerId>>,
> {
let peer_id = PeerId::from_public_key(&key.public());
libp2p::SwarmBuilder::with_existing_identity(key)
.with_tokio()
.with_other_transport(|keypair| quic::tokio::Transport::new(quic::Config::new(keypair)))
.unwrap()
.with_behaviour(|_key| DispersalExecutorBehaviour::new(membership))
.with_behaviour(|_key| {
DispersalExecutorBehaviour::new(peer_id, membership, addressbook)
})
.unwrap()
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(std::time::Duration::from_secs(u64::MAX))
@ -74,12 +78,14 @@ pub mod test {
.into_iter()
.collect(),
};
let mut executor = executor_swarm(k1, neighbours.clone());
let addr: Multiaddr = "/ip4/127.0.0.1/udp/5063/quic-v1".parse().unwrap();
let addr2 = addr.clone().with_p2p(validator_peer).unwrap();
let addressbook =
AddressBook::from_iter([(PeerId::from_public_key(&k2.public()), addr2.clone())]);
let mut executor = executor_swarm(addressbook, k1, neighbours.clone());
let mut validator = validator_swarm(k2, neighbours);
let msg_count = 10usize;
let addr: Multiaddr = "/ip4/127.0.0.1/udp/5063/quic-v1".parse().unwrap();
let addr2 = addr.clone().with_p2p(validator_peer).unwrap();
let validator_task = async move {
validator.listen_on(addr).unwrap();
@ -100,10 +106,6 @@ pub mod test {
res
};
let join_validator = tokio::spawn(validator_task);
tokio::time::sleep(Duration::from_secs(1)).await;
executor.dial(addr2).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
let executor_open_stream_sender = executor.behaviour().open_stream_sender();
let executor_disperse_blob_sender = executor.behaviour().blobs_sender();
let (sender, mut receiver) = tokio::sync::oneshot::channel();
let executor_poll = async move {
@ -119,8 +121,6 @@ pub mod test {
}
};
let executor_task = tokio::spawn(executor_poll);
executor_open_stream_sender.send(validator_peer).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
for i in 0..10 {
info!("Sending blob: {i}");
executor_disperse_blob_sender