1
0
mirror of synced 2025-01-10 15:56:03 +00:00

Da: Network addresses (#703)

* Crate address book

* Add addressbook on necesary places
Incorporate to sampling test

* fmt happy
This commit is contained in:
Daniel Sanchez 2024-08-27 12:15:45 +02:00 committed by GitHub
parent 2f6d265aa6
commit c611319622
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 79 additions and 14 deletions

View File

@ -0,0 +1,25 @@
use libp2p::{Multiaddr, PeerId};
use std::collections::HashMap;
use std::sync::Arc;
/// Store for known peer addresses
/// It is a simple wrapper around a `HashMap` at the moment.
/// But it should be abstracted here to keep addresses in sync among different libp2p protocols
#[derive(Clone, Debug)]
pub struct AddressBook(Arc<HashMap<PeerId, Multiaddr>>);
impl AddressBook {
pub fn empty() -> Self {
Self(Arc::new(HashMap::new()))
}
pub fn get_address(&self, peer_id: &PeerId) -> Option<&Multiaddr> {
self.0.get(peer_id)
}
}
impl FromIterator<(PeerId, Multiaddr)> for AddressBook {
fn from_iter<T: IntoIterator<Item = (PeerId, Multiaddr)>>(iter: T) -> Self {
Self(Arc::new(iter.into_iter().collect()))
}
}

View File

@ -5,6 +5,7 @@ use libp2p::PeerId;
// crates
use libp2p::swarm::NetworkBehaviour;
// internal
use crate::address_book::AddressBook;
use crate::{
protocols::dispersal::validator::behaviour::DispersalValidatorBehaviour,
protocols::replication::behaviour::ReplicationBehaviour,
@ -33,10 +34,10 @@ where
Membership: MembershipHandler + Clone + Send + 'static,
<Membership as MembershipHandler>::NetworkId: Send,
{
pub fn new(key: &Keypair, membership: Membership) -> Self {
pub fn new(key: &Keypair, membership: Membership, addresses: AddressBook) -> Self {
let peer_id = PeerId::from_public_key(&key.public());
Self {
sampling: SamplingBehaviour::new(peer_id, membership.clone()),
sampling: SamplingBehaviour::new(peer_id, membership.clone(), addresses),
dispersal: DispersalValidatorBehaviour::new(membership.clone()),
replication: ReplicationBehaviour::new(peer_id, membership),
}

View File

@ -1,3 +1,4 @@
pub mod address_book;
pub mod behaviour;
pub mod protocol;
pub mod protocols;

View File

@ -10,6 +10,7 @@ use futures::stream::{BoxStream, FuturesUnordered};
use futures::{AsyncWriteExt, FutureExt, StreamExt};
use kzgrs_backend::common::blob::DaBlob;
use libp2p::core::Endpoint;
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::{
ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent,
THandlerOutEvent, ToSwarm,
@ -25,6 +26,7 @@ use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::error;
// internal
use crate::address_book::AddressBook;
use crate::protocol::SAMPLING_PROTOCOL;
use crate::protocols::clone_deserialize_error;
use crate::SubnetworkId;
@ -167,7 +169,10 @@ type IncomingStreamHandlerFuture = BoxFuture<'static, Result<SampleStream, Sampl
/// Executor sampling protocol
/// Takes care of sending and replying sampling requests
pub struct SamplingBehaviour<Membership: MembershipHandler> {
/// Self peer id
peer_id: PeerId,
/// Addresses of known peers in the DA network
addresses: AddressBook,
/// Underlying stream behaviour
stream_behaviour: libp2p_stream::Behaviour,
/// Incoming sample request streams
@ -195,7 +200,7 @@ where
Membership: MembershipHandler + 'static,
Membership::NetworkId: Send,
{
pub fn new(peer_id: PeerId, membership: Membership) -> Self {
pub fn new(peer_id: PeerId, membership: Membership, addresses: AddressBook) -> Self {
let stream_behaviour = libp2p_stream::Behaviour::new();
let mut control = stream_behaviour.new_control();
@ -213,6 +218,7 @@ where
let connected_peers = HashSet::new();
Self {
peer_id,
addresses,
stream_behaviour,
incoming_streams,
control,
@ -639,7 +645,18 @@ impl<M: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static> Netw
}
// Deal with connection as the underlying behaviour would do
match self.stream_behaviour.poll(cx) {
Poll::Ready(ToSwarm::Dial { opts }) => Poll::Ready(ToSwarm::Dial { opts }),
Poll::Ready(ToSwarm::Dial { mut opts }) => {
// attach known peer address if possible
if let Some(address) = opts
.get_peer_id()
.and_then(|peer_id: PeerId| self.addresses.get_address(&peer_id))
{
opts = DialOpts::peer_id(opts.get_peer_id().unwrap())
.addresses(vec![address.clone()])
.build();
}
Poll::Ready(ToSwarm::Dial { opts })
}
Poll::Pending => {
// TODO: probably must be smarter when to wake this
cx.waker().wake_by_ref();

View File

@ -2,6 +2,7 @@ pub mod behaviour;
#[cfg(test)]
mod test {
use crate::address_book::AddressBook;
use crate::protocols::sampling::behaviour::{SamplingBehaviour, SamplingEvent};
use crate::test_utils::AllNeighbours;
use crate::SubnetworkId;
@ -22,6 +23,7 @@ mod test {
pub fn sampling_swarm(
key: Keypair,
membership: impl MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static,
addresses: AddressBook,
) -> Swarm<
SamplingBehaviour<impl MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'static>,
> {
@ -30,7 +32,11 @@ mod test {
.with_other_transport(|key| quic::tokio::Transport::new(quic::Config::new(key)))
.unwrap()
.with_behaviour(|key| {
SamplingBehaviour::new(PeerId::from_public_key(&key.public()), membership)
SamplingBehaviour::new(
PeerId::from_public_key(&key.public()),
membership,
addresses,
)
})
.unwrap()
.with_swarm_config(|cfg| {
@ -66,8 +72,14 @@ mod test {
.unwrap()
.with_p2p(PeerId::from_public_key(&k2.public()))
.unwrap();
let mut p1 = sampling_swarm(k1.clone(), neighbours.clone());
let mut p2 = sampling_swarm(k2.clone(), neighbours);
let p1_addresses = vec![(PeerId::from_public_key(&k2.public()), p2_address.clone())];
let p2_addresses = vec![(PeerId::from_public_key(&k1.public()), p1_address.clone())];
let mut p1 = sampling_swarm(
k1.clone(),
neighbours.clone(),
p1_addresses.into_iter().collect(),
);
let mut p2 = sampling_swarm(k2.clone(), neighbours, p2_addresses.into_iter().collect());
let request_sender_1 = p1.behaviour().sample_request_channel();
let request_sender_2 = p2.behaviour().sample_request_channel();
@ -136,13 +148,11 @@ mod test {
let t1 = tokio::spawn(async move {
p1.listen_on(p1_address).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
p1.dial(_p2_address).unwrap();
test_sampling_swarm(p1).await
});
let t2 = tokio::spawn(async move {
p2.listen_on(p2_address).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
p2.dial(_p1_address).unwrap();
test_sampling_swarm(p2).await
});
tokio::time::sleep(Duration::from_secs(2)).await;

View File

@ -10,6 +10,7 @@ use nomos_da_messages::replication::ReplicationReq;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
// internal
use crate::address_book::AddressBook;
use crate::behaviour::validator::{ValidatorBehaviour, ValidatorBehaviourEvent};
use crate::protocols::{
dispersal::validator::behaviour::DispersalEvent, replication::behaviour::ReplicationEvent,
@ -35,7 +36,11 @@ impl<Membership> ValidatorSwarm<Membership>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId> + Clone + Send,
{
pub fn new(key: Keypair, membership: Membership) -> (Self, ValidatorEventsStream) {
pub fn new(
key: Keypair,
membership: Membership,
addresses: AddressBook,
) -> (Self, ValidatorEventsStream) {
let (sampling_events_sender, sampling_events_receiver) = unbounded_channel();
let (validation_events_sender, validation_events_receiver) = unbounded_channel();
@ -43,7 +48,7 @@ where
let validation_events_receiver = UnboundedReceiverStream::new(validation_events_receiver);
(
Self {
swarm: Self::build_swarm(key, membership),
swarm: Self::build_swarm(key, membership, addresses),
sampling_events_sender,
validation_events_sender,
},
@ -53,11 +58,15 @@ where
},
)
}
fn build_swarm(key: Keypair, membership: Membership) -> Swarm<ValidatorBehaviour<Membership>> {
fn build_swarm(
key: Keypair,
membership: Membership,
addresses: AddressBook,
) -> Swarm<ValidatorBehaviour<Membership>> {
SwarmBuilder::with_existing_identity(key)
.with_tokio()
.with_quic()
.with_behaviour(|key| ValidatorBehaviour::new(key, membership))
.with_behaviour(|key| ValidatorBehaviour::new(key, membership, addresses))
.expect("Validator behaviour should build")
.build()
}

View File

@ -5,6 +5,7 @@ use libp2p::identity::Keypair;
use libp2p::{Multiaddr, PeerId};
use log::error;
use nomos_core::da::BlobId;
use nomos_da_network_core::address_book::AddressBook;
use nomos_da_network_core::protocols::sampling;
use nomos_da_network_core::protocols::sampling::behaviour::SamplingError;
use nomos_da_network_core::swarm::validator::{ValidatorEventsStream, ValidatorSwarm};
@ -81,6 +82,7 @@ pub struct DaNetworkValidatorBackendSettings<Membership> {
key: Keypair,
/// Membership of DA network PoV set
membership: Membership,
addresses: AddressBook,
listening_address: Multiaddr,
}
@ -115,7 +117,7 @@ where
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self {
let (mut validator_swarm, events_streams) =
ValidatorSwarm::new(config.key, config.membership);
ValidatorSwarm::new(config.key, config.membership, config.addresses);
let sampling_request_channel = validator_swarm
.protocol_swarm()
.behaviour()