From 82946b8637374f64df43ca8cac087d6a101da01f Mon Sep 17 00:00:00 2001 From: Youngjoon Lee Date: Sat, 16 Sep 2023 00:23:21 +0900 Subject: [PATCH] etry connecting with initial peers (#410) --- nodes/nomos-node/src/config.rs | 2 +- nomos-libp2p/src/lib.rs | 25 ++-- nomos-services/network/src/backends/libp2p.rs | 115 +++++++++++++++--- tests/src/nodes/nomos.rs | 18 +-- 4 files changed, 113 insertions(+), 47 deletions(-) diff --git a/nodes/nomos-node/src/config.rs b/nodes/nomos-node/src/config.rs index 216f895b..e967b323 100644 --- a/nodes/nomos-node/src/config.rs +++ b/nodes/nomos-node/src/config.rs @@ -230,7 +230,7 @@ impl Config { } if let Some(peers) = initial_peers { - self.network.backend.inner.initial_peers = peers; + self.network.backend.initial_peers = peers; } Ok(self) diff --git a/nomos-libp2p/src/lib.rs b/nomos-libp2p/src/lib.rs index fb41f373..fe0d0432 100644 --- a/nomos-libp2p/src/lib.rs +++ b/nomos-libp2p/src/lib.rs @@ -10,16 +10,18 @@ use blake2::digest::{consts::U32, Digest}; use blake2::Blake2b; use libp2p::gossipsub::{Message, MessageId, TopicHash}; -use libp2p::tcp::tokio::Tcp; pub use libp2p::{ core::upgrade, dns, gossipsub::{self, PublishError, SubscriptionError}, identity::{self, secp256k1}, plaintext::PlainText2Config, - swarm::{DialError, NetworkBehaviour, SwarmBuilder, SwarmEvent, THandlerErr}, + swarm::{ + dial_opts::DialOpts, DialError, NetworkBehaviour, SwarmBuilder, SwarmEvent, THandlerErr, + }, tcp, yamux, PeerId, Transport, }; +use libp2p::{swarm::ConnectionId, tcp::tokio::Tcp}; pub use multiaddr::{multiaddr, Multiaddr, Protocol}; use serde::{Deserialize, Serialize}; @@ -43,9 +45,6 @@ pub struct SwarmConfig { // Secp256k1 private key in Hex format (`0x123...abc`). Default random #[serde(with = "secret_key_serde", default = "secp256k1::SecretKey::generate")] pub node_key: secp256k1::SecretKey, - // Initial peers to connect to - #[serde(default)] - pub initial_peers: Vec, } impl Default for SwarmConfig { @@ -54,7 +53,6 @@ impl Default for SwarmConfig { host: std::net::Ipv4Addr::new(0, 0, 0, 0), port: 60000, node_key: secp256k1::SecretKey::generate(), - initial_peers: Vec::new(), } } } @@ -110,18 +108,17 @@ impl Swarm { swarm.listen_on(multiaddr!(Ip4(config.host), Tcp(config.port)))?; - for peer in &config.initial_peers { - swarm.dial(peer.clone())?; - } - Ok(Swarm { swarm }) } /// Initiates a connection attempt to a peer - pub fn connect(&mut self, peer_id: PeerId, peer_addr: Multiaddr) -> Result<(), DialError> { - tracing::debug!("attempting to dial {peer_id}"); - self.swarm.dial(peer_addr.with(Protocol::P2p(peer_id)))?; - Ok(()) + pub fn connect(&mut self, peer_addr: Multiaddr) -> Result { + let opt = DialOpts::from(peer_addr.clone()); + let connection_id = opt.connection_id(); + + tracing::debug!("attempting to dial {peer_addr}. connection_id:{connection_id:?}",); + self.swarm.dial(opt)?; + Ok(connection_id) } /// Subscribes to a topic diff --git a/nomos-services/network/src/backends/libp2p.rs b/nomos-services/network/src/backends/libp2p.rs index 4ab1cbe2..ce3e62c2 100644 --- a/nomos-services/network/src/backends/libp2p.rs +++ b/nomos-services/network/src/backends/libp2p.rs @@ -1,12 +1,13 @@ // std -use std::{error::Error, ops::Range, time::Duration}; +use std::{collections::HashMap, error::Error, ops::Range, time::Duration}; // internal use super::NetworkBackend; use mixnet_client::{MixnetClient, MixnetClientConfig}; use nomos_core::wire; pub use nomos_libp2p::libp2p::gossipsub::{Message, TopicHash}; use nomos_libp2p::{ - libp2p::{gossipsub, Multiaddr, PeerId}, + gossipsub, + libp2p::{swarm::ConnectionId, Multiaddr}, BehaviourEvent, Swarm, SwarmConfig, SwarmEvent, }; // crates @@ -33,6 +34,9 @@ pub struct Libp2p { pub struct Libp2pConfig { #[serde(flatten)] pub inner: SwarmConfig, + // Initial peers to connect to + #[serde(default)] + pub initial_peers: Vec, pub mixnet_client: MixnetClientConfig, #[serde(with = "humantime")] pub mixnet_delay: Range, @@ -92,13 +96,15 @@ pub enum EventKind { } const BUFFER_SIZE: usize = 64; +// TODO: make this configurable const BACKOFF: u64 = 5; +// TODO: make this configurable const MAX_RETRY: usize = 3; #[derive(Debug)] #[non_exhaustive] pub enum Command { - Connect(PeerId, Multiaddr), + Connect(Dial), Broadcast { topic: Topic, message: Box<[u8]>, @@ -113,10 +119,16 @@ pub enum Command { DirectBroadcastAndRetry { topic: Topic, message: Box<[u8]>, - retry: usize, + retry_count: usize, }, } +#[derive(Debug, Clone)] +pub struct Dial { + addr: Multiaddr, + retry_count: usize, +} + pub type Topic = String; pub type CommandResultSender = oneshot::Sender>>; @@ -177,7 +189,7 @@ impl NetworkBackend for Libp2p { .send(Command::DirectBroadcastAndRetry { topic, message, - retry: 0, + retry_count: 0, }) .await .unwrap_or_else(|_| tracing::error!("could not schedule broadcast")); @@ -193,6 +205,16 @@ impl NetworkBackend for Libp2p { overwatch_handle.runtime().spawn(async move { let mut swarm = Swarm::build(&config.inner).unwrap(); let mut mixnet_client = MixnetClient::new(config.mixnet_client, OsRng); + + // Keep the dialing history since swarm.connect doesn't return the result synchronously + // TODO: Refactor: have a proper struct which holds this + let mut pending_dials = HashMap::::new(); + + for initial_peer in config.initial_peers { + let dial = Dial { addr: initial_peer, retry_count: 0 }; + schedule_connect(dial, cmd_tx.clone()).await; + } + loop { tokio::select! { Some(event) = swarm.next() => { @@ -208,9 +230,13 @@ impl NetworkBackend for Libp2p { SwarmEvent::ConnectionEstablished { peer_id, connection_id, + endpoint, .. } => { - tracing::debug!("connected to peer: {peer_id} {connection_id:?}"); + tracing::debug!("connected to peer:{peer_id}, connection_id:{connection_id:?}"); + if endpoint.is_dialer() { + complete_connect(connection_id, &mut pending_dials); + } } SwarmEvent::ConnectionClosed { peer_id, @@ -226,16 +252,16 @@ impl NetworkBackend for Libp2p { error, .. } => { - tracing::debug!("failed to connect to peer: {peer_id:?} {connection_id:?} due to: {error}"); + tracing::error!("Failed to connect to peer: {peer_id:?} {connection_id:?} due to: {error}"); + retry_connect(connection_id, &mut pending_dials, cmd_tx.clone()); } _ => {} } } Some(command) = commands_rx.recv() => { match command { - Command::Connect(peer_id, peer_addr) => { - tracing::debug!("connecting to peer: {peer_id} {peer_addr}"); - log_error!(swarm.connect(peer_id, peer_addr)); + Command::Connect(dial) => { + connect(&mut swarm, dial, &mut pending_dials); } Command::Broadcast { topic, message } => { tracing::debug!("sending message to mixnet client"); @@ -263,8 +289,8 @@ impl NetworkBackend for Libp2p { }; log_error!(reply.send(info)); } - Command::DirectBroadcastAndRetry { topic, message, retry } => { - broadcast_and_retry(topic, message, retry, cmd_tx.clone(), &mut swarm, notify.clone()).await; + Command::DirectBroadcastAndRetry { topic, message, retry_count } => { + broadcast_and_retry(topic, message, retry_count, cmd_tx.clone(), &mut swarm, notify.clone()).await; } }; } @@ -303,18 +329,66 @@ fn random_delay(range: &Range) -> Duration { thread_rng().gen_range(range.start, range.end) } +async fn schedule_connect(dial: Dial, commands_tx: mpsc::Sender) { + commands_tx + .send(Command::Connect(dial)) + .await + .unwrap_or_else(|_| tracing::error!("could not schedule connect")); +} + +fn connect(swarm: &mut Swarm, dial: Dial, pending_dials: &mut HashMap) { + tracing::debug!("Connecting to {}", dial.addr); + + match swarm.connect(dial.addr.clone()) { + Ok(connection_id) => { + // Dialing has been scheduled. The result will be notified as a SwarmEvent. + pending_dials.insert(connection_id, dial); + } + Err(e) => { + tracing::error!( + "Failed to connect to {} with unretriable error: {e}", + dial.addr + ); + } + } +} + +fn complete_connect(connection_id: ConnectionId, pending_dials: &mut HashMap) { + pending_dials.remove(&connection_id); +} + +fn retry_connect( + connection_id: ConnectionId, + pending_dials: &mut HashMap, + commands_tx: mpsc::Sender, +) { + if let Some(mut dial) = pending_dials.remove(&connection_id) { + dial.retry_count += 1; + if dial.retry_count > MAX_RETRY { + tracing::debug!("Max retry({MAX_RETRY}) has been reached: {dial:?}"); + return; + } + + let wait = exp_backoff(dial.retry_count); + tracing::debug!("Retry dialing in {wait:?}: {dial:?}"); + + tokio::spawn(async move { + tokio::time::sleep(wait).await; + schedule_connect(dial, commands_tx).await; + }); + } +} + async fn broadcast_and_retry( topic: Topic, message: Box<[u8]>, - retry: usize, + retry_count: usize, commands_tx: mpsc::Sender, swarm: &mut Swarm, events_tx: broadcast::Sender, ) { tracing::debug!("broadcasting message to topic: {topic}"); - let wait = BACKOFF.pow(retry as u32); - match swarm.broadcast(&topic, message.to_vec()) { Ok(id) => { tracing::debug!("broadcasted message with id: {id} tp topic: {topic}"); @@ -328,16 +402,17 @@ async fn broadcast_and_retry( }))); } } - Err(gossipsub::PublishError::InsufficientPeers) if retry < MAX_RETRY => { + Err(gossipsub::PublishError::InsufficientPeers) if retry_count < MAX_RETRY => { + let wait = exp_backoff(retry_count); tracing::error!("failed to broadcast message to topic due to insufficient peers, trying again in {wait:?}"); tokio::spawn(async move { - tokio::time::sleep(std::time::Duration::from_secs(wait)).await; + tokio::time::sleep(wait).await; commands_tx .send(Command::DirectBroadcastAndRetry { topic, message, - retry: retry + 1, + retry_count: retry_count + 1, }) .await .unwrap_or_else(|_| tracing::error!("could not schedule retry")); @@ -349,6 +424,10 @@ async fn broadcast_and_retry( } } +fn exp_backoff(retry: usize) -> Duration { + std::time::Duration::from_secs(BACKOFF.pow(retry as u32)) +} + #[cfg(test)] mod tests { use std::time::Duration; diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index bc0339c4..3e75d6f1 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -13,7 +13,7 @@ use mixnet_topology::MixnetTopology; use nomos_consensus::{CarnotInfo, CarnotSettings}; use nomos_http::backends::axum::AxumBackendSettings; #[cfg(feature = "libp2p")] -use nomos_libp2p::{Multiaddr, SwarmConfig}; +use nomos_libp2p::Multiaddr; use nomos_log::{LoggerBackend, LoggerFormat}; #[cfg(feature = "libp2p")] use nomos_network::backends::libp2p::{Libp2pConfig, Libp2pInfo}; @@ -198,18 +198,10 @@ impl Node for NomosNode { let mut nodes = vec![Self::spawn(configs.swap_remove(0)).await]; let listening_addr = nodes[0].get_listening_address().await; for mut conf in configs { - #[cfg(feature = "waku")] conf.network .backend .initial_peers .push(listening_addr.clone()); - #[cfg(feature = "libp2p")] - // TODO: Consider having `initial_peers` outside of `inner`, as WakuConfig does - conf.network - .backend - .inner - .initial_peers - .push(listening_addr.clone()); nodes.push(Self::spawn(conf).await); } @@ -252,15 +244,13 @@ fn create_node_config( network: NetworkConfig { #[cfg(feature = "waku")] backend: WakuConfig { - initial_peers: vec![], inner: Default::default(), + initial_peers: vec![], }, #[cfg(feature = "libp2p")] backend: Libp2pConfig { - inner: SwarmConfig { - initial_peers: vec![], - ..Default::default() - }, + inner: Default::default(), + initial_peers: vec![], mixnet_client: MixnetClientConfig { mode: mixnet_client_mode, topology: mixnet_topology,