1
0
mirror of synced 2025-01-11 00:05:48 +00:00

etry connecting with initial peers (#410)

This commit is contained in:
Youngjoon Lee 2023-09-16 00:23:21 +09:00 committed by GitHub
parent 97c653efe3
commit 82946b8637
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 113 additions and 47 deletions

View File

@ -230,7 +230,7 @@ impl Config {
}
if let Some(peers) = initial_peers {
self.network.backend.inner.initial_peers = peers;
self.network.backend.initial_peers = peers;
}
Ok(self)

View File

@ -10,16 +10,18 @@ use blake2::digest::{consts::U32, Digest};
use blake2::Blake2b;
use libp2p::gossipsub::{Message, MessageId, TopicHash};
use libp2p::tcp::tokio::Tcp;
pub use libp2p::{
core::upgrade,
dns,
gossipsub::{self, PublishError, SubscriptionError},
identity::{self, secp256k1},
plaintext::PlainText2Config,
swarm::{DialError, NetworkBehaviour, SwarmBuilder, SwarmEvent, THandlerErr},
swarm::{
dial_opts::DialOpts, DialError, NetworkBehaviour, SwarmBuilder, SwarmEvent, THandlerErr,
},
tcp, yamux, PeerId, Transport,
};
use libp2p::{swarm::ConnectionId, tcp::tokio::Tcp};
pub use multiaddr::{multiaddr, Multiaddr, Protocol};
use serde::{Deserialize, Serialize};
@ -43,9 +45,6 @@ pub struct SwarmConfig {
// Secp256k1 private key in Hex format (`0x123...abc`). Default random
#[serde(with = "secret_key_serde", default = "secp256k1::SecretKey::generate")]
pub node_key: secp256k1::SecretKey,
// Initial peers to connect to
#[serde(default)]
pub initial_peers: Vec<Multiaddr>,
}
impl Default for SwarmConfig {
@ -54,7 +53,6 @@ impl Default for SwarmConfig {
host: std::net::Ipv4Addr::new(0, 0, 0, 0),
port: 60000,
node_key: secp256k1::SecretKey::generate(),
initial_peers: Vec::new(),
}
}
}
@ -110,18 +108,17 @@ impl Swarm {
swarm.listen_on(multiaddr!(Ip4(config.host), Tcp(config.port)))?;
for peer in &config.initial_peers {
swarm.dial(peer.clone())?;
}
Ok(Swarm { swarm })
}
/// Initiates a connection attempt to a peer
pub fn connect(&mut self, peer_id: PeerId, peer_addr: Multiaddr) -> Result<(), DialError> {
tracing::debug!("attempting to dial {peer_id}");
self.swarm.dial(peer_addr.with(Protocol::P2p(peer_id)))?;
Ok(())
pub fn connect(&mut self, peer_addr: Multiaddr) -> Result<ConnectionId, DialError> {
let opt = DialOpts::from(peer_addr.clone());
let connection_id = opt.connection_id();
tracing::debug!("attempting to dial {peer_addr}. connection_id:{connection_id:?}",);
self.swarm.dial(opt)?;
Ok(connection_id)
}
/// Subscribes to a topic

View File

@ -1,12 +1,13 @@
// std
use std::{error::Error, ops::Range, time::Duration};
use std::{collections::HashMap, error::Error, ops::Range, time::Duration};
// internal
use super::NetworkBackend;
use mixnet_client::{MixnetClient, MixnetClientConfig};
use nomos_core::wire;
pub use nomos_libp2p::libp2p::gossipsub::{Message, TopicHash};
use nomos_libp2p::{
libp2p::{gossipsub, Multiaddr, PeerId},
gossipsub,
libp2p::{swarm::ConnectionId, Multiaddr},
BehaviourEvent, Swarm, SwarmConfig, SwarmEvent,
};
// crates
@ -33,6 +34,9 @@ pub struct Libp2p {
pub struct Libp2pConfig {
#[serde(flatten)]
pub inner: SwarmConfig,
// Initial peers to connect to
#[serde(default)]
pub initial_peers: Vec<Multiaddr>,
pub mixnet_client: MixnetClientConfig,
#[serde(with = "humantime")]
pub mixnet_delay: Range<Duration>,
@ -92,13 +96,15 @@ pub enum EventKind {
}
const BUFFER_SIZE: usize = 64;
// TODO: make this configurable
const BACKOFF: u64 = 5;
// TODO: make this configurable
const MAX_RETRY: usize = 3;
#[derive(Debug)]
#[non_exhaustive]
pub enum Command {
Connect(PeerId, Multiaddr),
Connect(Dial),
Broadcast {
topic: Topic,
message: Box<[u8]>,
@ -113,10 +119,16 @@ pub enum Command {
DirectBroadcastAndRetry {
topic: Topic,
message: Box<[u8]>,
retry: usize,
retry_count: usize,
},
}
#[derive(Debug, Clone)]
pub struct Dial {
addr: Multiaddr,
retry_count: usize,
}
pub type Topic = String;
pub type CommandResultSender = oneshot::Sender<Result<(), Box<dyn Error + Send>>>;
@ -177,7 +189,7 @@ impl NetworkBackend for Libp2p {
.send(Command::DirectBroadcastAndRetry {
topic,
message,
retry: 0,
retry_count: 0,
})
.await
.unwrap_or_else(|_| tracing::error!("could not schedule broadcast"));
@ -193,6 +205,16 @@ impl NetworkBackend for Libp2p {
overwatch_handle.runtime().spawn(async move {
let mut swarm = Swarm::build(&config.inner).unwrap();
let mut mixnet_client = MixnetClient::new(config.mixnet_client, OsRng);
// Keep the dialing history since swarm.connect doesn't return the result synchronously
// TODO: Refactor: have a proper struct which holds this
let mut pending_dials = HashMap::<ConnectionId, Dial>::new();
for initial_peer in config.initial_peers {
let dial = Dial { addr: initial_peer, retry_count: 0 };
schedule_connect(dial, cmd_tx.clone()).await;
}
loop {
tokio::select! {
Some(event) = swarm.next() => {
@ -208,9 +230,13 @@ impl NetworkBackend for Libp2p {
SwarmEvent::ConnectionEstablished {
peer_id,
connection_id,
endpoint,
..
} => {
tracing::debug!("connected to peer: {peer_id} {connection_id:?}");
tracing::debug!("connected to peer:{peer_id}, connection_id:{connection_id:?}");
if endpoint.is_dialer() {
complete_connect(connection_id, &mut pending_dials);
}
}
SwarmEvent::ConnectionClosed {
peer_id,
@ -226,16 +252,16 @@ impl NetworkBackend for Libp2p {
error,
..
} => {
tracing::debug!("failed to connect to peer: {peer_id:?} {connection_id:?} due to: {error}");
tracing::error!("Failed to connect to peer: {peer_id:?} {connection_id:?} due to: {error}");
retry_connect(connection_id, &mut pending_dials, cmd_tx.clone());
}
_ => {}
}
}
Some(command) = commands_rx.recv() => {
match command {
Command::Connect(peer_id, peer_addr) => {
tracing::debug!("connecting to peer: {peer_id} {peer_addr}");
log_error!(swarm.connect(peer_id, peer_addr));
Command::Connect(dial) => {
connect(&mut swarm, dial, &mut pending_dials);
}
Command::Broadcast { topic, message } => {
tracing::debug!("sending message to mixnet client");
@ -263,8 +289,8 @@ impl NetworkBackend for Libp2p {
};
log_error!(reply.send(info));
}
Command::DirectBroadcastAndRetry { topic, message, retry } => {
broadcast_and_retry(topic, message, retry, cmd_tx.clone(), &mut swarm, notify.clone()).await;
Command::DirectBroadcastAndRetry { topic, message, retry_count } => {
broadcast_and_retry(topic, message, retry_count, cmd_tx.clone(), &mut swarm, notify.clone()).await;
}
};
}
@ -303,18 +329,66 @@ fn random_delay(range: &Range<Duration>) -> Duration {
thread_rng().gen_range(range.start, range.end)
}
async fn schedule_connect(dial: Dial, commands_tx: mpsc::Sender<Command>) {
commands_tx
.send(Command::Connect(dial))
.await
.unwrap_or_else(|_| tracing::error!("could not schedule connect"));
}
fn connect(swarm: &mut Swarm, dial: Dial, pending_dials: &mut HashMap<ConnectionId, Dial>) {
tracing::debug!("Connecting to {}", dial.addr);
match swarm.connect(dial.addr.clone()) {
Ok(connection_id) => {
// Dialing has been scheduled. The result will be notified as a SwarmEvent.
pending_dials.insert(connection_id, dial);
}
Err(e) => {
tracing::error!(
"Failed to connect to {} with unretriable error: {e}",
dial.addr
);
}
}
}
fn complete_connect(connection_id: ConnectionId, pending_dials: &mut HashMap<ConnectionId, Dial>) {
pending_dials.remove(&connection_id);
}
fn retry_connect(
connection_id: ConnectionId,
pending_dials: &mut HashMap<ConnectionId, Dial>,
commands_tx: mpsc::Sender<Command>,
) {
if let Some(mut dial) = pending_dials.remove(&connection_id) {
dial.retry_count += 1;
if dial.retry_count > MAX_RETRY {
tracing::debug!("Max retry({MAX_RETRY}) has been reached: {dial:?}");
return;
}
let wait = exp_backoff(dial.retry_count);
tracing::debug!("Retry dialing in {wait:?}: {dial:?}");
tokio::spawn(async move {
tokio::time::sleep(wait).await;
schedule_connect(dial, commands_tx).await;
});
}
}
async fn broadcast_and_retry(
topic: Topic,
message: Box<[u8]>,
retry: usize,
retry_count: usize,
commands_tx: mpsc::Sender<Command>,
swarm: &mut Swarm,
events_tx: broadcast::Sender<Event>,
) {
tracing::debug!("broadcasting message to topic: {topic}");
let wait = BACKOFF.pow(retry as u32);
match swarm.broadcast(&topic, message.to_vec()) {
Ok(id) => {
tracing::debug!("broadcasted message with id: {id} tp topic: {topic}");
@ -328,16 +402,17 @@ async fn broadcast_and_retry(
})));
}
}
Err(gossipsub::PublishError::InsufficientPeers) if retry < MAX_RETRY => {
Err(gossipsub::PublishError::InsufficientPeers) if retry_count < MAX_RETRY => {
let wait = exp_backoff(retry_count);
tracing::error!("failed to broadcast message to topic due to insufficient peers, trying again in {wait:?}");
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(wait)).await;
tokio::time::sleep(wait).await;
commands_tx
.send(Command::DirectBroadcastAndRetry {
topic,
message,
retry: retry + 1,
retry_count: retry_count + 1,
})
.await
.unwrap_or_else(|_| tracing::error!("could not schedule retry"));
@ -349,6 +424,10 @@ async fn broadcast_and_retry(
}
}
fn exp_backoff(retry: usize) -> Duration {
std::time::Duration::from_secs(BACKOFF.pow(retry as u32))
}
#[cfg(test)]
mod tests {
use std::time::Duration;

View File

@ -13,7 +13,7 @@ use mixnet_topology::MixnetTopology;
use nomos_consensus::{CarnotInfo, CarnotSettings};
use nomos_http::backends::axum::AxumBackendSettings;
#[cfg(feature = "libp2p")]
use nomos_libp2p::{Multiaddr, SwarmConfig};
use nomos_libp2p::Multiaddr;
use nomos_log::{LoggerBackend, LoggerFormat};
#[cfg(feature = "libp2p")]
use nomos_network::backends::libp2p::{Libp2pConfig, Libp2pInfo};
@ -198,18 +198,10 @@ impl Node for NomosNode {
let mut nodes = vec![Self::spawn(configs.swap_remove(0)).await];
let listening_addr = nodes[0].get_listening_address().await;
for mut conf in configs {
#[cfg(feature = "waku")]
conf.network
.backend
.initial_peers
.push(listening_addr.clone());
#[cfg(feature = "libp2p")]
// TODO: Consider having `initial_peers` outside of `inner`, as WakuConfig does
conf.network
.backend
.inner
.initial_peers
.push(listening_addr.clone());
nodes.push(Self::spawn(conf).await);
}
@ -252,15 +244,13 @@ fn create_node_config(
network: NetworkConfig {
#[cfg(feature = "waku")]
backend: WakuConfig {
initial_peers: vec![],
inner: Default::default(),
initial_peers: vec![],
},
#[cfg(feature = "libp2p")]
backend: Libp2pConfig {
inner: SwarmConfig {
initial_peers: vec![],
..Default::default()
},
inner: Default::default(),
initial_peers: vec![],
mixnet_client: MixnetClientConfig {
mode: mixnet_client_mode,
topology: mixnet_topology,