From 9f69934fe2793f7db0ea533d4e8b4acb0baa2fe5 Mon Sep 17 00:00:00 2001 From: Youngjoon Lee Date: Mon, 9 Oct 2023 18:04:52 +0900 Subject: [PATCH] Make libp2p gossipsub config configurable (#454) * define GossipsubConfigDef to derive serde for gossipsub::Config * refactor: move config-related codes into config.rs --- nomos-libp2p/src/config.rs | 188 +++++++++++++++++++++++++++++++++++++ nomos-libp2p/src/lib.rs | 69 +------------- 2 files changed, 192 insertions(+), 65 deletions(-) create mode 100644 nomos-libp2p/src/config.rs diff --git a/nomos-libp2p/src/config.rs b/nomos-libp2p/src/config.rs new file mode 100644 index 00000000..87a77a93 --- /dev/null +++ b/nomos-libp2p/src/config.rs @@ -0,0 +1,188 @@ +use std::time::Duration; + +use libp2p::{gossipsub, identity::secp256k1}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SwarmConfig { + // Listening IPv4 address + pub host: std::net::Ipv4Addr, + // TCP listening port. Use 0 for random + pub port: u16, + // Secp256k1 private key in Hex format (`0x123...abc`). Default random + #[serde(with = "secret_key_serde", default = "secp256k1::SecretKey::generate")] + pub node_key: secp256k1::SecretKey, + // Gossipsub config + #[serde(with = "GossipsubConfigDef")] + pub gossipsub_config: gossipsub::Config, +} + +impl Default for SwarmConfig { + fn default() -> Self { + Self { + host: std::net::Ipv4Addr::new(0, 0, 0, 0), + port: 60000, + node_key: secp256k1::SecretKey::generate(), + gossipsub_config: gossipsub::Config::default(), + } + } +} + +// A partial copy of gossipsub::Config for deriving Serialize/Deserialize remotely +// https://serde.rs/remote-derive.html +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(remote = "gossipsub::Config")] +struct GossipsubConfigDef { + #[serde(getter = "gossipsub::Config::history_length")] + history_legnth: usize, + #[serde(getter = "gossipsub::Config::history_gossip")] + history_gossip: usize, + #[serde(getter = "gossipsub::Config::mesh_n")] + mesh_n: usize, + #[serde(getter = "gossipsub::Config::mesh_n_low")] + mesh_n_low: usize, + #[serde(getter = "gossipsub::Config::mesh_n_high")] + mesh_n_high: usize, + #[serde(getter = "gossipsub::Config::retain_scores")] + retain_scores: usize, + #[serde(getter = "gossipsub::Config::gossip_lazy")] + gossip_lazy: usize, + #[serde(getter = "gossipsub::Config::gossip_factor")] + gossip_factor: f64, + #[serde(getter = "gossipsub::Config::heartbeat_initial_delay")] + heartbeat_initial_delay: Duration, + #[serde(getter = "gossipsub::Config::heartbeat_interval")] + heartbeat_interval: Duration, + #[serde(getter = "gossipsub::Config::fanout_ttl")] + fanout_ttl: Duration, + #[serde(getter = "gossipsub::Config::check_explicit_peers_ticks")] + check_explicit_peers_ticks: u64, + #[serde(getter = "gossipsub::Config::idle_timeout")] + idle_timeout: Duration, + #[serde(getter = "gossipsub::Config::duplicate_cache_time")] + duplicate_cache_time: Duration, + #[serde(getter = "gossipsub::Config::validate_messages")] + validate_messages: bool, + #[serde(getter = "gossipsub::Config::allow_self_origin")] + allow_self_origin: bool, + #[serde(getter = "gossipsub::Config::do_px")] + do_px: bool, + #[serde(getter = "gossipsub::Config::prune_peers")] + prune_peers: usize, + #[serde(getter = "gossipsub::Config::prune_backoff")] + prune_backoff: Duration, + #[serde(getter = "gossipsub::Config::unsubscribe_backoff")] + unsubscribe_backoff: Duration, + #[serde(getter = "gossipsub::Config::backoff_slack")] + backoff_slack: u32, + #[serde(getter = "gossipsub::Config::flood_publish")] + flood_publish: bool, + #[serde(getter = "gossipsub::Config::graft_flood_threshold")] + graft_flood_threshold: Duration, + #[serde(getter = "gossipsub::Config::mesh_outbound_min")] + mesh_outbound_min: usize, + #[serde(getter = "gossipsub::Config::opportunistic_graft_ticks")] + opportunistic_graft_ticks: u64, + #[serde(getter = "gossipsub::Config::opportunistic_graft_peers")] + opportunistic_graft_peers: usize, + #[serde(getter = "gossipsub::Config::gossip_retransimission")] + gossip_retransimission: u32, + #[serde(getter = "gossipsub::Config::max_messages_per_rpc")] + max_messages_per_rpc: Option, + #[serde(getter = "gossipsub::Config::max_ihave_length")] + max_ihave_length: usize, + #[serde(getter = "gossipsub::Config::max_ihave_messages")] + max_ihave_messages: usize, + #[serde(getter = "gossipsub::Config::iwant_followup_time")] + iwant_followup_time: Duration, + #[serde(getter = "gossipsub::Config::published_message_ids_cache_time")] + published_message_ids_cache_time: Duration, +} + +impl From for gossipsub::Config { + fn from(def: GossipsubConfigDef) -> gossipsub::Config { + let mut builder = gossipsub::ConfigBuilder::default(); + let mut builder = builder + .history_length(def.history_legnth) + .history_gossip(def.history_gossip) + .mesh_n(def.mesh_n) + .mesh_n_low(def.mesh_n_low) + .mesh_n_high(def.mesh_n_high) + .retain_scores(def.retain_scores) + .gossip_lazy(def.gossip_lazy) + .gossip_factor(def.gossip_factor) + .heartbeat_initial_delay(def.heartbeat_initial_delay) + .heartbeat_interval(def.heartbeat_interval) + .fanout_ttl(def.fanout_ttl) + .check_explicit_peers_ticks(def.check_explicit_peers_ticks) + .idle_timeout(def.idle_timeout) + .duplicate_cache_time(def.duplicate_cache_time) + .allow_self_origin(def.allow_self_origin) + .prune_peers(def.prune_peers) + .prune_backoff(def.prune_backoff) + .unsubscribe_backoff(def.unsubscribe_backoff.as_secs()) + .backoff_slack(def.backoff_slack) + .flood_publish(def.flood_publish) + .graft_flood_threshold(def.graft_flood_threshold) + .mesh_outbound_min(def.mesh_outbound_min) + .opportunistic_graft_ticks(def.opportunistic_graft_ticks) + .opportunistic_graft_peers(def.opportunistic_graft_peers) + .gossip_retransimission(def.gossip_retransimission) + .max_messages_per_rpc(def.max_messages_per_rpc) + .max_ihave_length(def.max_ihave_length) + .max_ihave_messages(def.max_ihave_messages) + .iwant_followup_time(def.iwant_followup_time) + .published_message_ids_cache_time(def.published_message_ids_cache_time); + + if def.validate_messages { + builder = builder.validate_messages(); + } + if def.do_px { + builder = builder.do_px(); + } + + builder.build().unwrap() + } +} + +mod secret_key_serde { + use libp2p::identity::secp256k1; + use serde::de::Error; + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + + pub fn serialize(key: &secp256k1::SecretKey, serializer: S) -> Result + where + S: Serializer, + { + let hex_str = hex::encode(key.to_bytes()); + hex_str.serialize(serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let hex_str = String::deserialize(deserializer)?; + let mut key_bytes = hex::decode(hex_str).map_err(|e| D::Error::custom(format!("{e}")))?; + secp256k1::SecretKey::try_from_bytes(key_bytes.as_mut_slice()) + .map_err(|e| D::Error::custom(format!("{e}"))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn config_serde() { + let config: SwarmConfig = Default::default(); + + let serialized = serde_json::to_string(&config).unwrap(); + println!("{serialized}"); + + let deserialized: SwarmConfig = serde_json::from_str(serialized.as_str()).unwrap(); + assert_eq!(deserialized.host, config.host); + assert_eq!(deserialized.port, config.port); + assert_eq!(deserialized.node_key.to_bytes(), config.node_key.to_bytes()); + } +} diff --git a/nomos-libp2p/src/lib.rs b/nomos-libp2p/src/lib.rs index fe0d0432..af1298e9 100644 --- a/nomos-libp2p/src/lib.rs +++ b/nomos-libp2p/src/lib.rs @@ -1,9 +1,12 @@ +mod config; + use std::error::Error; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; +pub use config::SwarmConfig; pub use libp2p; use blake2::digest::{consts::U32, Digest}; @@ -23,7 +26,6 @@ pub use libp2p::{ }; use libp2p::{swarm::ConnectionId, tcp::tokio::Tcp}; pub use multiaddr::{multiaddr, Multiaddr, Protocol}; -use serde::{Deserialize, Serialize}; /// Wraps [`libp2p::Swarm`], and config it for use within Nomos. pub struct Swarm { @@ -36,27 +38,6 @@ pub struct Behaviour { gossipsub: gossipsub::Behaviour, } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct SwarmConfig { - // Listening IPv4 address - pub host: std::net::Ipv4Addr, - // TCP listening port. Use 0 for random - pub port: u16, - // Secp256k1 private key in Hex format (`0x123...abc`). Default random - #[serde(with = "secret_key_serde", default = "secp256k1::SecretKey::generate")] - pub node_key: secp256k1::SecretKey, -} - -impl Default for SwarmConfig { - fn default() -> Self { - Self { - host: std::net::Ipv4Addr::new(0, 0, 0, 0), - port: 60000, - node_key: secp256k1::SecretKey::generate(), - } - } -} - #[derive(thiserror::Error, Debug)] pub enum SwarmError { #[error("duplicate dialing")] @@ -93,7 +74,7 @@ impl Swarm { // to prevent all messages from a peer being filtered as duplicates. let gossipsub = gossipsub::Behaviour::new( gossipsub::MessageAuthenticity::Author(local_peer_id), - gossipsub::ConfigBuilder::default() + gossipsub::ConfigBuilder::from(config.gossipsub_config.clone()) .validation_mode(gossipsub::ValidationMode::None) .message_id_fn(compute_message_id) .build()?, @@ -186,45 +167,3 @@ fn compute_message_id(message: &Message) -> MessageId { hasher.update(&message.data); MessageId::from(hasher.finalize().to_vec()) } - -mod secret_key_serde { - use libp2p::identity::secp256k1; - use serde::de::Error; - use serde::{Deserialize, Deserializer, Serialize, Serializer}; - - pub fn serialize(key: &secp256k1::SecretKey, serializer: S) -> Result - where - S: Serializer, - { - let hex_str = hex::encode(key.to_bytes()); - hex_str.serialize(serializer) - } - - pub fn deserialize<'de, D>(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let hex_str = String::deserialize(deserializer)?; - let mut key_bytes = hex::decode(hex_str).map_err(|e| D::Error::custom(format!("{e}")))?; - secp256k1::SecretKey::try_from_bytes(key_bytes.as_mut_slice()) - .map_err(|e| D::Error::custom(format!("{e}"))) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn config_serde() { - let config: SwarmConfig = Default::default(); - - let serialized = serde_json::to_string(&config).unwrap(); - println!("{serialized}"); - - let deserialized: SwarmConfig = serde_json::from_str(serialized.as_str()).unwrap(); - assert_eq!(deserialized.host, config.host); - assert_eq!(deserialized.port, config.port); - assert_eq!(deserialized.node_key.to_bytes(), config.node_key.to_bytes()); - } -}