diff --git a/nomos-da/network/core/src/address_book.rs b/nomos-da/network/core/src/address_book.rs new file mode 100644 index 00000000..06705619 --- /dev/null +++ b/nomos-da/network/core/src/address_book.rs @@ -0,0 +1,25 @@ +use libp2p::{Multiaddr, PeerId}; +use std::collections::HashMap; +use std::sync::Arc; + +/// Store for known peer addresses +/// It is a simple wrapper around a `HashMap` at the moment. +/// But it should be abstracted here to keep addresses in sync among different libp2p protocols +#[derive(Clone, Debug)] +pub struct AddressBook(Arc>); + +impl AddressBook { + pub fn empty() -> Self { + Self(Arc::new(HashMap::new())) + } + + pub fn get_address(&self, peer_id: &PeerId) -> Option<&Multiaddr> { + self.0.get(peer_id) + } +} + +impl FromIterator<(PeerId, Multiaddr)> for AddressBook { + fn from_iter>(iter: T) -> Self { + Self(Arc::new(iter.into_iter().collect())) + } +} diff --git a/nomos-da/network/core/src/behaviour/validator.rs b/nomos-da/network/core/src/behaviour/validator.rs index 870b88a9..0124a6b0 100644 --- a/nomos-da/network/core/src/behaviour/validator.rs +++ b/nomos-da/network/core/src/behaviour/validator.rs @@ -5,6 +5,7 @@ use libp2p::PeerId; // crates use libp2p::swarm::NetworkBehaviour; // internal +use crate::address_book::AddressBook; use crate::{ protocols::dispersal::validator::behaviour::DispersalValidatorBehaviour, protocols::replication::behaviour::ReplicationBehaviour, @@ -33,10 +34,10 @@ where Membership: MembershipHandler + Clone + Send + 'static, ::NetworkId: Send, { - pub fn new(key: &Keypair, membership: Membership) -> Self { + pub fn new(key: &Keypair, membership: Membership, addresses: AddressBook) -> Self { let peer_id = PeerId::from_public_key(&key.public()); Self { - sampling: SamplingBehaviour::new(peer_id, membership.clone()), + sampling: SamplingBehaviour::new(peer_id, membership.clone(), addresses), dispersal: DispersalValidatorBehaviour::new(membership.clone()), replication: ReplicationBehaviour::new(peer_id, membership), } diff --git a/nomos-da/network/core/src/lib.rs b/nomos-da/network/core/src/lib.rs index 56e71e83..ea0b9f27 100644 --- a/nomos-da/network/core/src/lib.rs +++ b/nomos-da/network/core/src/lib.rs @@ -1,3 +1,4 @@ +pub mod address_book; pub mod behaviour; pub mod protocol; pub mod protocols; diff --git a/nomos-da/network/core/src/protocols/sampling/behaviour.rs b/nomos-da/network/core/src/protocols/sampling/behaviour.rs index 483458b3..10679d1b 100644 --- a/nomos-da/network/core/src/protocols/sampling/behaviour.rs +++ b/nomos-da/network/core/src/protocols/sampling/behaviour.rs @@ -10,6 +10,7 @@ use futures::stream::{BoxStream, FuturesUnordered}; use futures::{AsyncWriteExt, FutureExt, StreamExt}; use kzgrs_backend::common::blob::DaBlob; use libp2p::core::Endpoint; +use libp2p::swarm::dial_opts::DialOpts; use libp2p::swarm::{ ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, @@ -25,6 +26,7 @@ use tokio::sync::mpsc::UnboundedSender; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::error; // internal +use crate::address_book::AddressBook; use crate::protocol::SAMPLING_PROTOCOL; use crate::protocols::clone_deserialize_error; use crate::SubnetworkId; @@ -167,7 +169,10 @@ type IncomingStreamHandlerFuture = BoxFuture<'static, Result { + /// Self peer id peer_id: PeerId, + /// Addresses of known peers in the DA network + addresses: AddressBook, /// Underlying stream behaviour stream_behaviour: libp2p_stream::Behaviour, /// Incoming sample request streams @@ -195,7 +200,7 @@ where Membership: MembershipHandler + 'static, Membership::NetworkId: Send, { - pub fn new(peer_id: PeerId, membership: Membership) -> Self { + pub fn new(peer_id: PeerId, membership: Membership, addresses: AddressBook) -> Self { let stream_behaviour = libp2p_stream::Behaviour::new(); let mut control = stream_behaviour.new_control(); @@ -213,6 +218,7 @@ where let connected_peers = HashSet::new(); Self { peer_id, + addresses, stream_behaviour, incoming_streams, control, @@ -639,7 +645,18 @@ impl + 'static> Netw } // Deal with connection as the underlying behaviour would do match self.stream_behaviour.poll(cx) { - Poll::Ready(ToSwarm::Dial { opts }) => Poll::Ready(ToSwarm::Dial { opts }), + Poll::Ready(ToSwarm::Dial { mut opts }) => { + // attach known peer address if possible + if let Some(address) = opts + .get_peer_id() + .and_then(|peer_id: PeerId| self.addresses.get_address(&peer_id)) + { + opts = DialOpts::peer_id(opts.get_peer_id().unwrap()) + .addresses(vec![address.clone()]) + .build(); + } + Poll::Ready(ToSwarm::Dial { opts }) + } Poll::Pending => { // TODO: probably must be smarter when to wake this cx.waker().wake_by_ref(); diff --git a/nomos-da/network/core/src/protocols/sampling/mod.rs b/nomos-da/network/core/src/protocols/sampling/mod.rs index b76bdc1e..da295211 100644 --- a/nomos-da/network/core/src/protocols/sampling/mod.rs +++ b/nomos-da/network/core/src/protocols/sampling/mod.rs @@ -2,6 +2,7 @@ pub mod behaviour; #[cfg(test)] mod test { + use crate::address_book::AddressBook; use crate::protocols::sampling::behaviour::{SamplingBehaviour, SamplingEvent}; use crate::test_utils::AllNeighbours; use crate::SubnetworkId; @@ -22,6 +23,7 @@ mod test { pub fn sampling_swarm( key: Keypair, membership: impl MembershipHandler + 'static, + addresses: AddressBook, ) -> Swarm< SamplingBehaviour + 'static>, > { @@ -30,7 +32,11 @@ mod test { .with_other_transport(|key| quic::tokio::Transport::new(quic::Config::new(key))) .unwrap() .with_behaviour(|key| { - SamplingBehaviour::new(PeerId::from_public_key(&key.public()), membership) + SamplingBehaviour::new( + PeerId::from_public_key(&key.public()), + membership, + addresses, + ) }) .unwrap() .with_swarm_config(|cfg| { @@ -66,8 +72,14 @@ mod test { .unwrap() .with_p2p(PeerId::from_public_key(&k2.public())) .unwrap(); - let mut p1 = sampling_swarm(k1.clone(), neighbours.clone()); - let mut p2 = sampling_swarm(k2.clone(), neighbours); + let p1_addresses = vec![(PeerId::from_public_key(&k2.public()), p2_address.clone())]; + let p2_addresses = vec![(PeerId::from_public_key(&k1.public()), p1_address.clone())]; + let mut p1 = sampling_swarm( + k1.clone(), + neighbours.clone(), + p1_addresses.into_iter().collect(), + ); + let mut p2 = sampling_swarm(k2.clone(), neighbours, p2_addresses.into_iter().collect()); let request_sender_1 = p1.behaviour().sample_request_channel(); let request_sender_2 = p2.behaviour().sample_request_channel(); @@ -136,13 +148,11 @@ mod test { let t1 = tokio::spawn(async move { p1.listen_on(p1_address).unwrap(); tokio::time::sleep(Duration::from_secs(1)).await; - p1.dial(_p2_address).unwrap(); test_sampling_swarm(p1).await }); let t2 = tokio::spawn(async move { p2.listen_on(p2_address).unwrap(); tokio::time::sleep(Duration::from_secs(1)).await; - p2.dial(_p1_address).unwrap(); test_sampling_swarm(p2).await }); tokio::time::sleep(Duration::from_secs(2)).await; diff --git a/nomos-da/network/core/src/swarm/validator.rs b/nomos-da/network/core/src/swarm/validator.rs index a59eb083..2a17cef4 100644 --- a/nomos-da/network/core/src/swarm/validator.rs +++ b/nomos-da/network/core/src/swarm/validator.rs @@ -10,6 +10,7 @@ use nomos_da_messages::replication::ReplicationReq; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio_stream::wrappers::UnboundedReceiverStream; // internal +use crate::address_book::AddressBook; use crate::behaviour::validator::{ValidatorBehaviour, ValidatorBehaviourEvent}; use crate::protocols::{ dispersal::validator::behaviour::DispersalEvent, replication::behaviour::ReplicationEvent, @@ -35,7 +36,11 @@ impl ValidatorSwarm where Membership: MembershipHandler + Clone + Send, { - pub fn new(key: Keypair, membership: Membership) -> (Self, ValidatorEventsStream) { + pub fn new( + key: Keypair, + membership: Membership, + addresses: AddressBook, + ) -> (Self, ValidatorEventsStream) { let (sampling_events_sender, sampling_events_receiver) = unbounded_channel(); let (validation_events_sender, validation_events_receiver) = unbounded_channel(); @@ -43,7 +48,7 @@ where let validation_events_receiver = UnboundedReceiverStream::new(validation_events_receiver); ( Self { - swarm: Self::build_swarm(key, membership), + swarm: Self::build_swarm(key, membership, addresses), sampling_events_sender, validation_events_sender, }, @@ -53,11 +58,15 @@ where }, ) } - fn build_swarm(key: Keypair, membership: Membership) -> Swarm> { + fn build_swarm( + key: Keypair, + membership: Membership, + addresses: AddressBook, + ) -> Swarm> { SwarmBuilder::with_existing_identity(key) .with_tokio() .with_quic() - .with_behaviour(|key| ValidatorBehaviour::new(key, membership)) + .with_behaviour(|key| ValidatorBehaviour::new(key, membership, addresses)) .expect("Validator behaviour should build") .build() } diff --git a/nomos-services/data-availability/network/src/backends/libp2p/validator.rs b/nomos-services/data-availability/network/src/backends/libp2p/validator.rs index c2e59b64..ffc222cb 100644 --- a/nomos-services/data-availability/network/src/backends/libp2p/validator.rs +++ b/nomos-services/data-availability/network/src/backends/libp2p/validator.rs @@ -5,6 +5,7 @@ use libp2p::identity::Keypair; use libp2p::{Multiaddr, PeerId}; use log::error; use nomos_core::da::BlobId; +use nomos_da_network_core::address_book::AddressBook; use nomos_da_network_core::protocols::sampling; use nomos_da_network_core::protocols::sampling::behaviour::SamplingError; use nomos_da_network_core::swarm::validator::{ValidatorEventsStream, ValidatorSwarm}; @@ -81,6 +82,7 @@ pub struct DaNetworkValidatorBackendSettings { key: Keypair, /// Membership of DA network PoV set membership: Membership, + addresses: AddressBook, listening_address: Multiaddr, } @@ -115,7 +117,7 @@ where fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self { let (mut validator_swarm, events_streams) = - ValidatorSwarm::new(config.key, config.membership); + ValidatorSwarm::new(config.key, config.membership, config.addresses); let sampling_request_channel = validator_swarm .protocol_swarm() .behaviour()