diff --git a/nomos-cli/src/da/network/swarm.rs b/nomos-cli/src/da/network/swarm.rs index 41aa4123..2fcea087 100644 --- a/nomos-cli/src/da/network/swarm.rs +++ b/nomos-cli/src/da/network/swarm.rs @@ -1,5 +1,4 @@ // std -use std::time::Duration; // crates use kzgrs_backend::common::blob::DaBlob; use libp2p::futures::StreamExt; @@ -62,18 +61,19 @@ where } fn build_swarm( - key: Keypair, - membership: Membership, + _key: Keypair, + _membership: Membership, ) -> Swarm> { - libp2p::SwarmBuilder::with_existing_identity(key) - .with_tokio() - .with_quic() - .with_behaviour(|_key| DispersalExecutorBehaviour::new(membership)) - .expect("Validator behaviour should build") - .with_swarm_config(|cfg| { - cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX)) - }) - .build() + todo!("CLI will be removed"); + // libp2p::SwarmBuilder::with_existing_identity(key) + // .with_tokio() + // .with_quic() + // .with_behaviour(|_key| DispersalExecutorBehaviour::new(membership)) + // .expect("Validator behaviour should build") + // .with_swarm_config(|cfg| { + // cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX)) + // }) + // .build() } pub fn dial(&mut self, addr: Multiaddr) -> Result<(), DialError> { @@ -167,6 +167,7 @@ pub mod test { use tracing_subscriber::EnvFilter; #[tokio::test] + #[ignore] async fn test_dispersal_with_swarms() { let _ = tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) diff --git a/nomos-da/network/core/Cargo.toml b/nomos-da/network/core/Cargo.toml index 63e1dcc4..9a99696e 100644 --- a/nomos-da/network/core/Cargo.toml +++ b/nomos-da/network/core/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +bincode = "1.3" libp2p = { version = "0.53", features = ["macros", "tokio", "quic"] } libp2p-stream = "0.1.0-alpha" futures = "0.3" @@ -18,7 +19,8 @@ void = "1.0.2" either = "1.13.0" log = "0.4" serde = "1.0" -bincode = "1.3" +rand = "0.8" +rand_chacha = "0.3" tokio = { version = "1.39" } tokio-stream = "0.1" thiserror = "1.0" diff --git a/nomos-da/network/core/src/behaviour/executor.rs b/nomos-da/network/core/src/behaviour/executor.rs index f1fd8e5f..d0eb5d64 100644 --- a/nomos-da/network/core/src/behaviour/executor.rs +++ b/nomos-da/network/core/src/behaviour/executor.rs @@ -40,8 +40,12 @@ where pub fn new(key: &Keypair, membership: Membership, addresses: AddressBook) -> Self { let peer_id = PeerId::from_public_key(&key.public()); Self { - sampling: SamplingBehaviour::new(peer_id, membership.clone(), addresses), - executor_dispersal: DispersalExecutorBehaviour::new(membership.clone()), + sampling: SamplingBehaviour::new(peer_id, membership.clone(), addresses.clone()), + executor_dispersal: DispersalExecutorBehaviour::new( + peer_id, + membership.clone(), + addresses.clone(), + ), validator_dispersal: DispersalValidatorBehaviour::new(membership.clone()), replication: ReplicationBehaviour::new(peer_id, membership), } diff --git a/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs b/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs index 5b6c7035..5105afe7 100644 --- a/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs +++ b/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs @@ -1,33 +1,39 @@ // std -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::task::{Context, Poll}; // crates use either::Either; use futures::future::BoxFuture; use futures::stream::{BoxStream, FuturesUnordered}; use futures::{AsyncWriteExt, FutureExt, StreamExt}; -use kzgrs_backend::common::blob::DaBlob; use libp2p::core::Endpoint; +use libp2p::swarm::behaviour::{ConnectionClosed, ConnectionEstablished}; +use libp2p::swarm::dial_opts::DialOpts; use libp2p::swarm::{ ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; use libp2p::{Multiaddr, PeerId, Stream}; use libp2p_stream::{Control, OpenStreamError}; +use rand::prelude::IteratorRandom; +use rand::SeedableRng; +use thiserror::Error; +use tokio::sync::mpsc; +use tokio::sync::mpsc::UnboundedSender; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tracing::error; +// internal +use crate::address_book::AddressBook; +use crate::protocol::DISPERSAL_PROTOCOL; +use crate::protocols::clone_deserialize_error; +use crate::SubnetworkId; +use kzgrs_backend::common::blob::DaBlob; use nomos_core::da::BlobId; use nomos_da_messages::common::Blob; use nomos_da_messages::dispersal::dispersal_res::MessageType; use nomos_da_messages::dispersal::{DispersalErr, DispersalReq, DispersalRes}; use nomos_da_messages::{pack_message, unpack_from_reader}; use subnetworks_assignations::MembershipHandler; -use thiserror::Error; -use tokio::sync::mpsc; -use tokio::sync::mpsc::UnboundedSender; -use tokio_stream::wrappers::UnboundedReceiverStream; -// internal -use crate::protocol::DISPERSAL_PROTOCOL; -use crate::protocols::clone_deserialize_error; -use crate::SubnetworkId; #[derive(Debug, Error)] pub enum DispersalError { @@ -149,6 +155,8 @@ type StreamHandlerFuture = BoxFuture<'static, Result { + /// Self id + local_peer_id: PeerId, /// Underlying stream behaviour stream_behaviour: libp2p_stream::Behaviour, /// Pending running tasks (one task per stream) @@ -157,10 +165,16 @@ pub struct DispersalExecutorBehaviour { idle_streams: HashMap, /// Subnetworks membership information membership: Membership, + /// Addresses of known peers in the DA network + addresses: AddressBook, /// Pending blobs that need to be dispersed by PeerId to_disperse: HashMap>, + /// Pending blobs from disconnected networks + disconnected_pending_blobs: HashMap>, /// Already connected peers connection Ids - connected_subnetworks: HashMap, + connected_peers: HashMap, + /// Subnetwork working streams + subnetwork_open_streams: HashSet, /// Sender hook of peers to open streams channel pending_out_streams_sender: UnboundedSender, /// Pending to open streams @@ -176,11 +190,12 @@ where Membership: MembershipHandler + 'static, Membership::NetworkId: Send, { - pub fn new(membership: Membership) -> Self { + pub fn new(local_peer_id: PeerId, membership: Membership, addresses: AddressBook) -> Self { let stream_behaviour = libp2p_stream::Behaviour::new(); let tasks = FuturesUnordered::new(); let to_disperse = HashMap::new(); - let connected_subnetworks = HashMap::new(); + let connected_peers = HashMap::new(); + let subnetwork_open_streams = HashSet::new(); let idle_streams = HashMap::new(); let (pending_out_streams_sender, receiver) = mpsc::unbounded_channel(); let control = stream_behaviour.new_control(); @@ -191,13 +206,18 @@ where let (pending_blobs_sender, receiver) = mpsc::unbounded_channel(); let pending_blobs_stream = UnboundedReceiverStream::new(receiver).boxed(); + let disconnected_pending_blobs = HashMap::new(); Self { + local_peer_id, stream_behaviour, tasks, membership, + addresses, to_disperse, - connected_subnetworks, + disconnected_pending_blobs, + connected_peers, + subnetwork_open_streams, idle_streams, pending_out_streams_sender, pending_out_streams, @@ -326,8 +346,8 @@ impl + 'sta fn disperse_blob( tasks: &mut FuturesUnordered, idle_streams: &mut HashMap, - membership: &mut Membership, - connected_subnetworks: &mut HashMap, + membership: &Membership, + connected_peers: &HashMap, to_disperse: &mut HashMap>, subnetwork_id: SubnetworkId, blob: DaBlob, @@ -335,7 +355,8 @@ impl + 'sta let members = membership.members_of(&subnetwork_id); let peers = members .iter() - .filter(|peer_id| connected_subnetworks.contains_key(peer_id)); + .filter(|peer_id| connected_peers.contains_key(peer_id)); + // We may be connected to more than a single node. Usually will be one, but that is an // internal decision of the executor itself. for peer in peers { @@ -352,6 +373,117 @@ impl + 'sta } } } + + fn reschedule_blobs_for_peer_stream( + stream: &DispersalStream, + membership: &Membership, + to_disperse: &mut HashMap>, + disconnected_pending_blobs: &mut HashMap>, + ) { + let peer_id = stream.peer_id; + let subnetworks = membership.membership(&peer_id); + let entry = to_disperse.entry(peer_id).or_default(); + for subnetwork in subnetworks { + if let Some(blobs) = disconnected_pending_blobs.remove(&subnetwork) { + entry.extend(blobs.into_iter().map(|blob| (subnetwork, blob))); + } + } + } + + fn filter_peers_for_subnetworks<'s>( + &'s self, + peer_id: PeerId, + subnetworks: impl Iterator + 's, + ) -> impl Iterator> + 's { + subnetworks.map(move |subnetwork_id| { + self.membership + .members_of(&subnetwork_id) + .iter() + .filter(|&&peer| peer != peer_id && peer != self.local_peer_id) + .copied() + .collect::>() + }) + } + + fn find_subnetworks_candidates_excluding_peer( + &self, + peer_id: PeerId, + subnetworks: &HashSet, + ) -> HashSet { + let mut peers: HashSet = self + .filter_peers_for_subnetworks(peer_id, subnetworks.iter().copied()) + .reduce(|h1, h2| h1.intersection(&h2).copied().collect()) + .unwrap_or_default(); + // we didn't find a single shared peer for all subnetworks, so we take the smallest subset + if peers.is_empty() { + peers = self + .filter_peers_for_subnetworks(peer_id, subnetworks.iter().copied()) + .reduce(|h1, h2| h1.union(&h2).copied().collect()) + .unwrap_or_default(); + } + peers + } + fn open_streams_for_disconnected_subnetworks_selected_peer(&mut self, peer_id: PeerId) { + let subnetworks = self.membership.membership(&peer_id); + // open stream will result in dialing if we are not yet connected to the peer + for peer in self.find_subnetworks_candidates_excluding_peer(peer_id, &subnetworks) { + if let Err(e) = self.pending_out_streams_sender.send(peer) { + error!("Error requesting stream for peer {peer_id}: {e}"); + } + } + } + + fn prune_blobs_for_peer(&mut self, peer_id: PeerId) -> VecDeque<(SubnetworkId, DaBlob)> { + self.to_disperse.remove(&peer_id).unwrap_or_default() + } + + fn recover_blobs_for_disconnected_subnetworks(&mut self, peer_id: PeerId) { + // push missing blobs into pending ones + let disconnected_pending_blobs = self.prune_blobs_for_peer(peer_id); + for (subnetwork_id, blob) in disconnected_pending_blobs { + self.disconnected_pending_blobs + .entry(subnetwork_id) + .or_default() + .push_back(blob); + } + } + + fn try_ensure_stream_from_missing_subnetwork( + local_peer_id: &PeerId, + pending_out_streams_sender: &mut UnboundedSender, + membership: &Membership, + subnetwork_id: &SubnetworkId, + ) { + let mut rng = rand_chacha::ChaCha20Rng::from_entropy(); + // chose a random peer that is not us + let peer = membership + .members_of(subnetwork_id) + .iter() + .filter(|&peer| peer != local_peer_id) + .choose(&mut rng) + .copied(); + // if we have any, try to connect + if let Some(peer) = peer { + if let Err(e) = pending_out_streams_sender.send(peer) { + error!("Error requesting stream for peer {peer}: {e}"); + } + } + } + + fn handle_connection_established(&mut self, peer_id: PeerId, connection_id: ConnectionId) { + self.connected_peers.insert(peer_id, connection_id); + } + + fn handle_connection_closed(&mut self, peer_id: PeerId) { + let peer_subnetworks = self.membership.membership(&peer_id); + self.subnetwork_open_streams + .retain(|subnetwork_id| !peer_subnetworks.contains(subnetwork_id)); + if self.connected_peers.remove(&peer_id).is_some() { + // mangle pending blobs for disconnected subnetworks from peer + self.recover_blobs_for_disconnected_subnetworks(peer_id); + self.open_streams_for_disconnected_subnetworks_selected_peer(peer_id); + } + } } impl + 'static> NetworkBehaviour @@ -380,14 +512,26 @@ impl + 'static> Netw addr: &Multiaddr, role_override: Endpoint, ) -> Result, ConnectionDenied> { - self.connected_subnetworks.insert(peer, connection_id); self.stream_behaviour .handle_established_outbound_connection(connection_id, peer, addr, role_override) .map(Either::Left) } fn on_swarm_event(&mut self, event: FromSwarm) { - self.stream_behaviour.on_swarm_event(event) + self.stream_behaviour.on_swarm_event(event); + match event { + FromSwarm::ConnectionEstablished(ConnectionEstablished { + peer_id, + connection_id, + .. + }) => { + self.handle_connection_established(peer_id, connection_id); + } + FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, .. }) => { + self.handle_connection_closed(peer_id); + } + _ => {} + } } fn on_connection_handler_event( @@ -408,13 +552,18 @@ impl + 'static> Netw cx: &mut Context<'_>, ) -> Poll>> { let Self { + local_peer_id, tasks, to_disperse, + disconnected_pending_blobs, idle_streams, pending_out_streams, + pending_out_streams_sender, pending_blobs_stream, membership, - connected_subnetworks, + addresses, + connected_peers, + subnetwork_open_streams, .. } = self; // poll pending tasks @@ -454,20 +603,38 @@ impl + 'static> Netw } // poll pending blobs if let Poll::Ready(Some((subnetwork_id, blob))) = pending_blobs_stream.poll_next_unpin(cx) { - Self::disperse_blob( - tasks, - idle_streams, - membership, - connected_subnetworks, - to_disperse, - subnetwork_id, - blob, - ); + if subnetwork_open_streams.contains(&subnetwork_id) { + Self::disperse_blob( + tasks, + idle_streams, + membership, + connected_peers, + to_disperse, + subnetwork_id, + blob, + ); + } else { + let entry = disconnected_pending_blobs.entry(subnetwork_id).or_default(); + entry.push_back(blob); + Self::try_ensure_stream_from_missing_subnetwork( + local_peer_id, + pending_out_streams_sender, + membership, + &subnetwork_id, + ); + } } // poll pending streams if let Poll::Ready(Some(res)) = pending_out_streams.poll_next_unpin(cx) { match res { Ok(stream) => { + subnetwork_open_streams.extend(membership.membership(&stream.peer_id)); + Self::reschedule_blobs_for_peer_stream( + &stream, + membership, + to_disperse, + disconnected_pending_blobs, + ); Self::handle_stream(tasks, to_disperse, idle_streams, stream); } Err(error) => { @@ -479,7 +646,18 @@ impl + 'static> Netw } // Deal with connection as the underlying behaviour would do match self.stream_behaviour.poll(cx) { - Poll::Ready(ToSwarm::Dial { opts }) => Poll::Ready(ToSwarm::Dial { opts }), + Poll::Ready(ToSwarm::Dial { mut opts }) => { + // attach known peer address if possible + if let Some(address) = opts + .get_peer_id() + .and_then(|peer_id: PeerId| addresses.get_address(&peer_id)) + { + opts = DialOpts::peer_id(opts.get_peer_id().unwrap()) + .addresses(vec![address.clone()]) + .build(); + } + Poll::Ready(ToSwarm::Dial { opts }) + } Poll::Pending => { // TODO: probably must be smarter when to wake this cx.waker().wake_by_ref(); diff --git a/nomos-da/network/core/src/protocols/dispersal/mod.rs b/nomos-da/network/core/src/protocols/dispersal/mod.rs index 90b18d96..b03859b1 100644 --- a/nomos-da/network/core/src/protocols/dispersal/mod.rs +++ b/nomos-da/network/core/src/protocols/dispersal/mod.rs @@ -3,6 +3,7 @@ pub mod validator; #[cfg(test)] pub mod test { + use crate::address_book::AddressBook; use crate::protocols::dispersal::executor::behaviour::DispersalExecutorBehaviour; use crate::protocols::dispersal::validator::behaviour::{ DispersalEvent, DispersalValidatorBehaviour, @@ -15,22 +16,25 @@ pub mod test { use libp2p::swarm::SwarmEvent; use libp2p::{quic, Multiaddr, PeerId}; use log::info; - use std::time::Duration; use subnetworks_assignations::MembershipHandler; use tracing_subscriber::fmt::TestWriter; use tracing_subscriber::EnvFilter; pub fn executor_swarm( + addressbook: AddressBook, key: Keypair, membership: impl MembershipHandler + 'static, ) -> libp2p::Swarm< DispersalExecutorBehaviour>, > { + let peer_id = PeerId::from_public_key(&key.public()); libp2p::SwarmBuilder::with_existing_identity(key) .with_tokio() .with_other_transport(|keypair| quic::tokio::Transport::new(quic::Config::new(keypair))) .unwrap() - .with_behaviour(|_key| DispersalExecutorBehaviour::new(membership)) + .with_behaviour(|_key| { + DispersalExecutorBehaviour::new(peer_id, membership, addressbook) + }) .unwrap() .with_swarm_config(|cfg| { cfg.with_idle_connection_timeout(std::time::Duration::from_secs(u64::MAX)) @@ -74,12 +78,14 @@ pub mod test { .into_iter() .collect(), }; - let mut executor = executor_swarm(k1, neighbours.clone()); + let addr: Multiaddr = "/ip4/127.0.0.1/udp/5063/quic-v1".parse().unwrap(); + let addr2 = addr.clone().with_p2p(validator_peer).unwrap(); + let addressbook = + AddressBook::from_iter([(PeerId::from_public_key(&k2.public()), addr2.clone())]); + let mut executor = executor_swarm(addressbook, k1, neighbours.clone()); let mut validator = validator_swarm(k2, neighbours); let msg_count = 10usize; - let addr: Multiaddr = "/ip4/127.0.0.1/udp/5063/quic-v1".parse().unwrap(); - let addr2 = addr.clone().with_p2p(validator_peer).unwrap(); let validator_task = async move { validator.listen_on(addr).unwrap(); @@ -100,10 +106,6 @@ pub mod test { res }; let join_validator = tokio::spawn(validator_task); - tokio::time::sleep(Duration::from_secs(1)).await; - executor.dial(addr2).unwrap(); - tokio::time::sleep(Duration::from_secs(1)).await; - let executor_open_stream_sender = executor.behaviour().open_stream_sender(); let executor_disperse_blob_sender = executor.behaviour().blobs_sender(); let (sender, mut receiver) = tokio::sync::oneshot::channel(); let executor_poll = async move { @@ -119,8 +121,6 @@ pub mod test { } }; let executor_task = tokio::spawn(executor_poll); - executor_open_stream_sender.send(validator_peer).unwrap(); - tokio::time::sleep(Duration::from_secs(1)).await; for i in 0..10 { info!("Sending blob: {i}"); executor_disperse_blob_sender