Make libp2p gossipsub config configurable (#454)
* define GossipsubConfigDef to derive serde for gossipsub::Config * refactor: move config-related codes into config.rs
This commit is contained in:
parent
2016f75213
commit
9f69934fe2
|
@ -0,0 +1,188 @@
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use libp2p::{gossipsub, identity::secp256k1};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct SwarmConfig {
|
||||||
|
// Listening IPv4 address
|
||||||
|
pub host: std::net::Ipv4Addr,
|
||||||
|
// TCP listening port. Use 0 for random
|
||||||
|
pub port: u16,
|
||||||
|
// Secp256k1 private key in Hex format (`0x123...abc`). Default random
|
||||||
|
#[serde(with = "secret_key_serde", default = "secp256k1::SecretKey::generate")]
|
||||||
|
pub node_key: secp256k1::SecretKey,
|
||||||
|
// Gossipsub config
|
||||||
|
#[serde(with = "GossipsubConfigDef")]
|
||||||
|
pub gossipsub_config: gossipsub::Config,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for SwarmConfig {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
host: std::net::Ipv4Addr::new(0, 0, 0, 0),
|
||||||
|
port: 60000,
|
||||||
|
node_key: secp256k1::SecretKey::generate(),
|
||||||
|
gossipsub_config: gossipsub::Config::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// A partial copy of gossipsub::Config for deriving Serialize/Deserialize remotely
|
||||||
|
// https://serde.rs/remote-derive.html
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(remote = "gossipsub::Config")]
|
||||||
|
struct GossipsubConfigDef {
|
||||||
|
#[serde(getter = "gossipsub::Config::history_length")]
|
||||||
|
history_legnth: usize,
|
||||||
|
#[serde(getter = "gossipsub::Config::history_gossip")]
|
||||||
|
history_gossip: usize,
|
||||||
|
#[serde(getter = "gossipsub::Config::mesh_n")]
|
||||||
|
mesh_n: usize,
|
||||||
|
#[serde(getter = "gossipsub::Config::mesh_n_low")]
|
||||||
|
mesh_n_low: usize,
|
||||||
|
#[serde(getter = "gossipsub::Config::mesh_n_high")]
|
||||||
|
mesh_n_high: usize,
|
||||||
|
#[serde(getter = "gossipsub::Config::retain_scores")]
|
||||||
|
retain_scores: usize,
|
||||||
|
#[serde(getter = "gossipsub::Config::gossip_lazy")]
|
||||||
|
gossip_lazy: usize,
|
||||||
|
#[serde(getter = "gossipsub::Config::gossip_factor")]
|
||||||
|
gossip_factor: f64,
|
||||||
|
#[serde(getter = "gossipsub::Config::heartbeat_initial_delay")]
|
||||||
|
heartbeat_initial_delay: Duration,
|
||||||
|
#[serde(getter = "gossipsub::Config::heartbeat_interval")]
|
||||||
|
heartbeat_interval: Duration,
|
||||||
|
#[serde(getter = "gossipsub::Config::fanout_ttl")]
|
||||||
|
fanout_ttl: Duration,
|
||||||
|
#[serde(getter = "gossipsub::Config::check_explicit_peers_ticks")]
|
||||||
|
check_explicit_peers_ticks: u64,
|
||||||
|
#[serde(getter = "gossipsub::Config::idle_timeout")]
|
||||||
|
idle_timeout: Duration,
|
||||||
|
#[serde(getter = "gossipsub::Config::duplicate_cache_time")]
|
||||||
|
duplicate_cache_time: Duration,
|
||||||
|
#[serde(getter = "gossipsub::Config::validate_messages")]
|
||||||
|
validate_messages: bool,
|
||||||
|
#[serde(getter = "gossipsub::Config::allow_self_origin")]
|
||||||
|
allow_self_origin: bool,
|
||||||
|
#[serde(getter = "gossipsub::Config::do_px")]
|
||||||
|
do_px: bool,
|
||||||
|
#[serde(getter = "gossipsub::Config::prune_peers")]
|
||||||
|
prune_peers: usize,
|
||||||
|
#[serde(getter = "gossipsub::Config::prune_backoff")]
|
||||||
|
prune_backoff: Duration,
|
||||||
|
#[serde(getter = "gossipsub::Config::unsubscribe_backoff")]
|
||||||
|
unsubscribe_backoff: Duration,
|
||||||
|
#[serde(getter = "gossipsub::Config::backoff_slack")]
|
||||||
|
backoff_slack: u32,
|
||||||
|
#[serde(getter = "gossipsub::Config::flood_publish")]
|
||||||
|
flood_publish: bool,
|
||||||
|
#[serde(getter = "gossipsub::Config::graft_flood_threshold")]
|
||||||
|
graft_flood_threshold: Duration,
|
||||||
|
#[serde(getter = "gossipsub::Config::mesh_outbound_min")]
|
||||||
|
mesh_outbound_min: usize,
|
||||||
|
#[serde(getter = "gossipsub::Config::opportunistic_graft_ticks")]
|
||||||
|
opportunistic_graft_ticks: u64,
|
||||||
|
#[serde(getter = "gossipsub::Config::opportunistic_graft_peers")]
|
||||||
|
opportunistic_graft_peers: usize,
|
||||||
|
#[serde(getter = "gossipsub::Config::gossip_retransimission")]
|
||||||
|
gossip_retransimission: u32,
|
||||||
|
#[serde(getter = "gossipsub::Config::max_messages_per_rpc")]
|
||||||
|
max_messages_per_rpc: Option<usize>,
|
||||||
|
#[serde(getter = "gossipsub::Config::max_ihave_length")]
|
||||||
|
max_ihave_length: usize,
|
||||||
|
#[serde(getter = "gossipsub::Config::max_ihave_messages")]
|
||||||
|
max_ihave_messages: usize,
|
||||||
|
#[serde(getter = "gossipsub::Config::iwant_followup_time")]
|
||||||
|
iwant_followup_time: Duration,
|
||||||
|
#[serde(getter = "gossipsub::Config::published_message_ids_cache_time")]
|
||||||
|
published_message_ids_cache_time: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<GossipsubConfigDef> for gossipsub::Config {
|
||||||
|
fn from(def: GossipsubConfigDef) -> gossipsub::Config {
|
||||||
|
let mut builder = gossipsub::ConfigBuilder::default();
|
||||||
|
let mut builder = builder
|
||||||
|
.history_length(def.history_legnth)
|
||||||
|
.history_gossip(def.history_gossip)
|
||||||
|
.mesh_n(def.mesh_n)
|
||||||
|
.mesh_n_low(def.mesh_n_low)
|
||||||
|
.mesh_n_high(def.mesh_n_high)
|
||||||
|
.retain_scores(def.retain_scores)
|
||||||
|
.gossip_lazy(def.gossip_lazy)
|
||||||
|
.gossip_factor(def.gossip_factor)
|
||||||
|
.heartbeat_initial_delay(def.heartbeat_initial_delay)
|
||||||
|
.heartbeat_interval(def.heartbeat_interval)
|
||||||
|
.fanout_ttl(def.fanout_ttl)
|
||||||
|
.check_explicit_peers_ticks(def.check_explicit_peers_ticks)
|
||||||
|
.idle_timeout(def.idle_timeout)
|
||||||
|
.duplicate_cache_time(def.duplicate_cache_time)
|
||||||
|
.allow_self_origin(def.allow_self_origin)
|
||||||
|
.prune_peers(def.prune_peers)
|
||||||
|
.prune_backoff(def.prune_backoff)
|
||||||
|
.unsubscribe_backoff(def.unsubscribe_backoff.as_secs())
|
||||||
|
.backoff_slack(def.backoff_slack)
|
||||||
|
.flood_publish(def.flood_publish)
|
||||||
|
.graft_flood_threshold(def.graft_flood_threshold)
|
||||||
|
.mesh_outbound_min(def.mesh_outbound_min)
|
||||||
|
.opportunistic_graft_ticks(def.opportunistic_graft_ticks)
|
||||||
|
.opportunistic_graft_peers(def.opportunistic_graft_peers)
|
||||||
|
.gossip_retransimission(def.gossip_retransimission)
|
||||||
|
.max_messages_per_rpc(def.max_messages_per_rpc)
|
||||||
|
.max_ihave_length(def.max_ihave_length)
|
||||||
|
.max_ihave_messages(def.max_ihave_messages)
|
||||||
|
.iwant_followup_time(def.iwant_followup_time)
|
||||||
|
.published_message_ids_cache_time(def.published_message_ids_cache_time);
|
||||||
|
|
||||||
|
if def.validate_messages {
|
||||||
|
builder = builder.validate_messages();
|
||||||
|
}
|
||||||
|
if def.do_px {
|
||||||
|
builder = builder.do_px();
|
||||||
|
}
|
||||||
|
|
||||||
|
builder.build().unwrap()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mod secret_key_serde {
|
||||||
|
use libp2p::identity::secp256k1;
|
||||||
|
use serde::de::Error;
|
||||||
|
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||||
|
|
||||||
|
pub fn serialize<S>(key: &secp256k1::SecretKey, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
let hex_str = hex::encode(key.to_bytes());
|
||||||
|
hex_str.serialize(serializer)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deserialize<'de, D>(deserializer: D) -> Result<secp256k1::SecretKey, D::Error>
|
||||||
|
where
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
let hex_str = String::deserialize(deserializer)?;
|
||||||
|
let mut key_bytes = hex::decode(hex_str).map_err(|e| D::Error::custom(format!("{e}")))?;
|
||||||
|
secp256k1::SecretKey::try_from_bytes(key_bytes.as_mut_slice())
|
||||||
|
.map_err(|e| D::Error::custom(format!("{e}")))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn config_serde() {
|
||||||
|
let config: SwarmConfig = Default::default();
|
||||||
|
|
||||||
|
let serialized = serde_json::to_string(&config).unwrap();
|
||||||
|
println!("{serialized}");
|
||||||
|
|
||||||
|
let deserialized: SwarmConfig = serde_json::from_str(serialized.as_str()).unwrap();
|
||||||
|
assert_eq!(deserialized.host, config.host);
|
||||||
|
assert_eq!(deserialized.port, config.port);
|
||||||
|
assert_eq!(deserialized.node_key.to_bytes(), config.node_key.to_bytes());
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,9 +1,12 @@
|
||||||
|
mod config;
|
||||||
|
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
pub use config::SwarmConfig;
|
||||||
pub use libp2p;
|
pub use libp2p;
|
||||||
|
|
||||||
use blake2::digest::{consts::U32, Digest};
|
use blake2::digest::{consts::U32, Digest};
|
||||||
|
@ -23,7 +26,6 @@ pub use libp2p::{
|
||||||
};
|
};
|
||||||
use libp2p::{swarm::ConnectionId, tcp::tokio::Tcp};
|
use libp2p::{swarm::ConnectionId, tcp::tokio::Tcp};
|
||||||
pub use multiaddr::{multiaddr, Multiaddr, Protocol};
|
pub use multiaddr::{multiaddr, Multiaddr, Protocol};
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
|
|
||||||
/// Wraps [`libp2p::Swarm`], and config it for use within Nomos.
|
/// Wraps [`libp2p::Swarm`], and config it for use within Nomos.
|
||||||
pub struct Swarm {
|
pub struct Swarm {
|
||||||
|
@ -36,27 +38,6 @@ pub struct Behaviour {
|
||||||
gossipsub: gossipsub::Behaviour,
|
gossipsub: gossipsub::Behaviour,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
pub struct SwarmConfig {
|
|
||||||
// Listening IPv4 address
|
|
||||||
pub host: std::net::Ipv4Addr,
|
|
||||||
// TCP listening port. Use 0 for random
|
|
||||||
pub port: u16,
|
|
||||||
// Secp256k1 private key in Hex format (`0x123...abc`). Default random
|
|
||||||
#[serde(with = "secret_key_serde", default = "secp256k1::SecretKey::generate")]
|
|
||||||
pub node_key: secp256k1::SecretKey,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for SwarmConfig {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
host: std::net::Ipv4Addr::new(0, 0, 0, 0),
|
|
||||||
port: 60000,
|
|
||||||
node_key: secp256k1::SecretKey::generate(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(thiserror::Error, Debug)]
|
#[derive(thiserror::Error, Debug)]
|
||||||
pub enum SwarmError {
|
pub enum SwarmError {
|
||||||
#[error("duplicate dialing")]
|
#[error("duplicate dialing")]
|
||||||
|
@ -93,7 +74,7 @@ impl Swarm {
|
||||||
// to prevent all messages from a peer being filtered as duplicates.
|
// to prevent all messages from a peer being filtered as duplicates.
|
||||||
let gossipsub = gossipsub::Behaviour::new(
|
let gossipsub = gossipsub::Behaviour::new(
|
||||||
gossipsub::MessageAuthenticity::Author(local_peer_id),
|
gossipsub::MessageAuthenticity::Author(local_peer_id),
|
||||||
gossipsub::ConfigBuilder::default()
|
gossipsub::ConfigBuilder::from(config.gossipsub_config.clone())
|
||||||
.validation_mode(gossipsub::ValidationMode::None)
|
.validation_mode(gossipsub::ValidationMode::None)
|
||||||
.message_id_fn(compute_message_id)
|
.message_id_fn(compute_message_id)
|
||||||
.build()?,
|
.build()?,
|
||||||
|
@ -186,45 +167,3 @@ fn compute_message_id(message: &Message) -> MessageId {
|
||||||
hasher.update(&message.data);
|
hasher.update(&message.data);
|
||||||
MessageId::from(hasher.finalize().to_vec())
|
MessageId::from(hasher.finalize().to_vec())
|
||||||
}
|
}
|
||||||
|
|
||||||
mod secret_key_serde {
|
|
||||||
use libp2p::identity::secp256k1;
|
|
||||||
use serde::de::Error;
|
|
||||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
|
||||||
|
|
||||||
pub fn serialize<S>(key: &secp256k1::SecretKey, serializer: S) -> Result<S::Ok, S::Error>
|
|
||||||
where
|
|
||||||
S: Serializer,
|
|
||||||
{
|
|
||||||
let hex_str = hex::encode(key.to_bytes());
|
|
||||||
hex_str.serialize(serializer)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn deserialize<'de, D>(deserializer: D) -> Result<secp256k1::SecretKey, D::Error>
|
|
||||||
where
|
|
||||||
D: Deserializer<'de>,
|
|
||||||
{
|
|
||||||
let hex_str = String::deserialize(deserializer)?;
|
|
||||||
let mut key_bytes = hex::decode(hex_str).map_err(|e| D::Error::custom(format!("{e}")))?;
|
|
||||||
secp256k1::SecretKey::try_from_bytes(key_bytes.as_mut_slice())
|
|
||||||
.map_err(|e| D::Error::custom(format!("{e}")))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn config_serde() {
|
|
||||||
let config: SwarmConfig = Default::default();
|
|
||||||
|
|
||||||
let serialized = serde_json::to_string(&config).unwrap();
|
|
||||||
println!("{serialized}");
|
|
||||||
|
|
||||||
let deserialized: SwarmConfig = serde_json::from_str(serialized.as_str()).unwrap();
|
|
||||||
assert_eq!(deserialized.host, config.host);
|
|
||||||
assert_eq!(deserialized.port, config.port);
|
|
||||||
assert_eq!(deserialized.node_key.to_bytes(), config.node_key.to_bytes());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue