Pass gossipsub params to go-waku node (#50)

* Pass gossipsub params to go-waku node

* Make gossipsub conf test run by default

* Use merge commit to master for go-waku

* Add tests for info logs
gusto 2023-03-13 10:51:15 +02:00 committed by GitHub
parent 31255ac4c0
commit 54a80f83cc
5 changed files with 205 additions and 7 deletions

View File

@@ -9,9 +9,9 @@ mod utils;
pub use node::{
waku_create_content_topic, waku_create_pubsub_topic, waku_dafault_pubsub_topic,
waku_dns_discovery, waku_new, Aes256Gcm, DnsInfo, Initialized, Key, Multiaddr, Protocol,
PublicKey, Running, SecretKey, WakuLogLevel, WakuNodeConfig, WakuNodeHandle, WakuPeerData,
WakuPeers,
waku_dns_discovery, waku_new, Aes256Gcm, DnsInfo, GossipSubParams, Initialized, Key, Multiaddr,
Protocol, PublicKey, Running, SecretKey, WakuLogLevel, WakuNodeConfig, WakuNodeHandle,
WakuPeerData, WakuPeers,
};
pub use general::{

View File

@@ -67,6 +67,129 @@ pub struct WakuNodeConfig {
#[default(Some(9000))]
#[serde(rename = "discV5UDPPort")]
pub discv5_udp_port: Option<u16>,
/// Gossipsub custom configuration.
pub gossipsub_params: Option<GossipSubParams>,
}
#[derive(Clone, SmartDefault, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct GossipSubParams {
/// Sets the optimal degree for a GossipSub topic mesh. For example, if `d` == 6,
/// each peer will want to have about six peers in their mesh for each topic they're subscribed to.
/// `d` should be set somewhere between `dlo` and `dhi`.
#[serde(rename = "d")]
pub d: Option<i32>,
/// Sets the lower bound on the number of peers we keep in a GossipSub topic mesh.
/// If we have fewer than dlo peers, we will attempt to graft some more into the mesh at
/// the next heartbeat.
#[serde(rename = "d_low")]
pub dlo: Option<i32>,
/// Sets the upper bound on the number of peers we keep in a GossipSub topic mesh.
/// If we have more than dhi peers, we will select some to prune from the mesh at the next heartbeat.
#[serde(rename = "d_high")]
pub dhi: Option<i32>,
/// `dscore` affects how peers are selected when pruning a mesh due to oversubscription.
/// At least dscore of the retained peers will be high-scoring, while the remainder are
/// chosen randomly.
#[serde(rename = "d_score")]
pub dscore: Option<i32>,
/// Sets the quota for the number of outbound connections to maintain in a topic mesh.
/// When the mesh is pruned due to oversubscription, we make sure that we have outbound connections
/// to at least dout of the surviving peers. This prevents Sybil attackers from overwhelming
/// our mesh with incoming connections.
///
/// `dout` must be set below `dlo` and must not exceed `d` / 2.
#[serde(rename = "d_out")]
pub dout: Option<i32>,
/// Controls the size of the message cache used for gossip.
/// The message cache will remember messages for history_length heartbeats.
pub history_length: Option<i32>,
/// Controls how many cached message ids we will advertise in
/// IHAVE gossip messages. When asked for our seen message IDs, we will return
/// only those from the most recent history_gossip heartbeats. The slack between
/// history_gossip and history_length allows us to avoid advertising messages
/// that will be expired by the time they're requested.
///
/// history_gossip must be less than or equal to history_length to
/// avoid a runtime panic.
pub history_gossip: Option<i32>,
/// dlazy affects how many peers we will emit gossip to at each heartbeat.
/// We will send gossip to at least dlazy peers outside our mesh. The actual
/// number may be more, depending on gossip_factor and how many peers we're
/// connected to.
pub dlazy: Option<i32>,
/// `gossip_factor` affects how many peers we will emit gossip to at each heartbeat.
/// We will send gossip to gossip_factor * (total number of non-mesh peers), or
/// `dlazy`, whichever is greater.
pub gossip_factor: Option<f64>,
/// Controls how many times we will allow a peer to request
/// the same message id through IWANT gossip before we start ignoring them. This is designed
/// to prevent peers from spamming us with requests and wasting our resources.
pub gossip_retransmission: Option<i32>,
/// Short delay before the heartbeat timer begins
/// after the router is initialized.
pub heartbeat_initial_delay_ms: Option<i32>,
/// Controls the time between heartbeats.
pub heartbeat_interval_seconds: Option<i32>,
/// Duration threshold for heartbeat processing before emitting
/// a warning; this would be indicative of an overloaded peer.
pub slow_heartbeat_warning: Option<f64>,
/// Controls how long we keep track of the fanout state. If it's been
/// fanout_ttl_seconds since we've published to a topic that we're not subscribed to,
/// we'll delete the fanout map for that topic.
pub fanout_ttl_seconds: Option<i32>,
/// Controls the number of peers to include in prune Peer eXchange.
/// When we prune a peer that's eligible for PX (has a good score, etc), we will try to
/// send them signed peer records for up to prune_peers other peers that we
/// know of.
pub prune_peers: Option<i32>,
/// Controls the backoff time for pruned peers. This is how long
/// a peer must wait before attempting to graft into our mesh again after being pruned.
/// When pruning a peer, we send them our value of prune_backoff_seconds so they know
/// the minimum time to wait. Peers running older versions may not send a backoff time,
/// so if we receive a prune message without one, we will wait at least prune_backoff_seconds
/// before attempting to re-graft.
pub prune_backoff_seconds: Option<i32>,
/// Controls the backoff time to use when unsubscribing
/// from a topic. A peer should not resubscribe to this topic before this
/// duration.
pub unsubscribe_backoff_seconds: Option<i32>,
/// Controls the number of active connection attempts for peers obtained through PX.
pub connectors: Option<i32>,
/// Sets the maximum number of pending connections for peers attempted through PX.
pub max_pending_connections: Option<i32>,
/// Controls the timeout for connection attempts.
pub connection_timeout_seconds: Option<i32>,
/// Number of heartbeat ticks for attempting to reconnect direct peers
/// that are not currently connected.
pub direct_connect_ticks: Option<u64>,
/// Initial delay before opening connections to direct peers.
pub direct_connect_initial_delay_seconds: Option<i32>,
/// Number of heartbeat ticks for attempting to improve the mesh
/// with opportunistic grafting. Every opportunistic_graft_ticks we will attempt to select some
/// high-scoring mesh peers to replace lower-scoring ones, if the median score of our mesh peers falls
/// below a threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds).
pub opportunistic_graft_ticks: Option<u64>,
/// Number of peers to opportunistically graft.
pub opportunistic_graft_peers: Option<i32>,
/// If a GRAFT comes before graft_flood_threshold_seconds has elapsed since the last PRUNE,
/// then there is an extra score penalty applied to the peer through P7.
pub graft_flood_threshold_seconds: Option<i32>,
/// Maximum number of messages to include in an IHAVE message.
/// Also controls the maximum number of IHAVE ids we will accept and request with IWANT from a
/// peer within a heartbeat, to protect from IHAVE floods. You should adjust this value from the
/// default if your system is pushing more than 5000 messages in history_gossip heartbeats;
/// with the defaults this is 1666 messages/s.
#[serde(rename = "maxIHaveLength")]
pub max_ihave_length: Option<i32>,
/// Maximum number of IHAVE messages to accept from a peer within a heartbeat.
#[serde(rename = "maxIHaveMessages")]
pub max_ihave_messages: Option<i32>,
/// Time to wait for a message requested through IWANT following an IHAVE advertisement.
/// If the message is not received within this window, a broken promise is declared and
/// the router may apply behavioural penalties.
#[serde(rename = "iwantFollowupTimeSeconds")]
pub iwant_followup_time_seconds: Option<i32>,
}
#[derive(Clone, Default, Serialize, Deserialize, Debug)]
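
The diff above adds `gossipsub_params` as an optional field on `WakuNodeConfig`, and every `GossipSubParams` field is itself an `Option` with a derived default. A minimal sketch of how a caller might override only a few parameters while leaving the rest at their defaults (the values and the helper name `custom_gossipsub_node` are illustrative assumptions, not part of this change):

// Sketch, not part of this commit: start a node with a partially customised
// gossipsub configuration; all unset parameters keep their defaults.
use waku_bindings::{waku_new, GossipSubParams, WakuNodeConfig};

fn custom_gossipsub_node() -> Result<(), String> {
    let gossipsub_params = GossipSubParams {
        // Keep the mesh degree bounds consistent: dlo <= d <= dhi.
        dlo: Some(4),
        d: Some(6),
        dhi: Some(12),
        // history_gossip must not exceed history_length.
        history_length: Some(6),
        history_gossip: Some(3),
        ..Default::default()
    };
    let config = WakuNodeConfig {
        gossipsub_params: Some(gossipsub_params),
        ..Default::default()
    };
    let node = waku_new(Some(config))?;
    let node = node.start()?;
    node.stop()?;
    Ok(())
}

Note that because of the serde renames in the struct, these mesh-degree fields serialize as "d", "d_low", "d_high", "d_score" and "d_out" when the configuration is handed to go-waku, while most other fields use camelCase.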

View File

@@ -24,7 +24,7 @@ use crate::general::{
WakuMessage, WakuPubSubTopic,
};
pub use config::{WakuLogLevel, WakuNodeConfig};
pub use config::{GossipSubParams, WakuLogLevel, WakuNodeConfig};
pub use discovery::{waku_dns_discovery, DnsInfo};
pub use peers::{Protocol, WakuPeerData, WakuPeers};
pub use relay::{waku_create_content_topic, waku_create_pubsub_topic, waku_dafault_pubsub_topic};

View File

@ -10,8 +10,9 @@ use std::{collections::HashSet, str::from_utf8};
use tokio::sync::mpsc::{self, Sender};
use tokio::time;
use waku_bindings::{
waku_new, waku_set_event_callback, Encoding, Event, Key, MessageId, ProtocolId, Running,
WakuContentTopic, WakuLogLevel, WakuMessage, WakuNodeConfig, WakuNodeHandle,
waku_new, waku_set_event_callback, Encoding, Event, GossipSubParams, Key, MessageId,
ProtocolId, Running, WakuContentTopic, WakuLogLevel, WakuMessage, WakuNodeConfig,
WakuNodeHandle,
};
const ECHO_TIMEOUT: u64 = 10;
@@ -248,6 +249,80 @@ async fn default_echo() -> Result<(), String> {
Ok(())
}
#[test]
#[serial]
fn gossipsub_config() -> Result<(), String> {
let params = GossipSubParams {
d: Some(6),
dlo: Some(3),
dhi: Some(12),
dscore: Some(10),
dout: Some(8),
history_length: Some(500),
history_gossip: Some(3),
dlazy: Some(12),
gossip_factor: Some(0.25),
gossip_retransmission: Some(4),
heartbeat_initial_delay_ms: Some(500),
heartbeat_interval_seconds: Some(60),
slow_heartbeat_warning: Some(0.5),
fanout_ttl_seconds: Some(60),
prune_peers: Some(3),
prune_backoff_seconds: Some(900),
unsubscribe_backoff_seconds: Some(60),
connectors: Some(3),
max_pending_connections: Some(50),
connection_timeout_seconds: Some(15),
direct_connect_ticks: Some(5),
direct_connect_initial_delay_seconds: Some(5),
opportunistic_graft_ticks: Some(8),
opportunistic_graft_peers: Some(2),
graft_flood_threshold_seconds: Some(120),
max_ihave_length: Some(32),
max_ihave_messages: Some(8),
iwant_followup_time_seconds: Some(120),
};
let config = WakuNodeConfig {
gossipsub_params: params.into(),
log_level: Some(WakuLogLevel::Error),
..Default::default()
};
let node = waku_new(Some(config))?;
let node = node.start()?;
node.stop()?;
Ok(())
}
#[test]
#[serial]
fn loglevel_error() -> Result<(), String> {
let config = WakuNodeConfig {
log_level: Some(WakuLogLevel::Error),
..Default::default()
};
let node = waku_new(Some(config))?;
let node = node.start()?;
node.stop()?;
Ok(())
}
#[test]
#[serial]
fn loglevel_info() -> Result<(), String> {
let config = WakuNodeConfig {
log_level: Some(WakuLogLevel::Info),
..Default::default()
};
let node = waku_new(Some(config))?;
let node = node.start()?;
node.stop()?;
Ok(())
}
#[test]
#[serial]
fn node_restart() {

@@ -1 +1 @@
Subproject commit cedaa670c75b74990571248edbd83dc445c99bac
Subproject commit 3c4a863cb7c898a6c551a51288a6badf809fd6f5