Refactor Gossipsub Parameters To Make Them More Configurable (#421)

Co-authored-by: nisdas <nishdas93@gmail.com>
This commit is contained in:
Steven Allen 2021-05-03 08:59:15 -07:00 committed by GitHub
parent cbb7bfc1f1
commit 0094708cc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 271 additions and 177 deletions

View File

@ -17,6 +17,8 @@ type gossipTracer struct {
msgID MsgIdFunction
followUpTime time.Duration
// promises for messages by message ID; for each message tracked, we track the promise
// expiration time for each peer.
promises map[string]map[peer.ID]time.Time
@ -39,6 +41,7 @@ func (gt *gossipTracer) Start(gs *GossipSubRouter) {
}
gt.msgID = gs.p.msgID
gt.followUpTime = gs.params.IWantFollowupTime
}
// track a promise to deliver a message from a list of msgIDs we are requesting
@ -61,7 +64,7 @@ func (gt *gossipTracer) AddPromise(p peer.ID, msgIDs []string) {
_, ok = promises[p]
if !ok {
promises[p] = time.Now().Add(GossipSubIWantFollowupTime)
promises[p] = time.Now().Add(gt.followUpTime)
peerPromises, ok := gt.peerPromises[p]
if !ok {
peerPromises = make(map[string]struct{})

View File

@ -11,13 +11,8 @@ import (
func TestBrokenPromises(t *testing.T) {
// tests that unfullfilled promises are tracked correctly
originalGossipSubIWantFollowupTime := GossipSubIWantFollowupTime
GossipSubIWantFollowupTime = 100 * time.Millisecond
defer func() {
GossipSubIWantFollowupTime = originalGossipSubIWantFollowupTime
}()
gt := newGossipTracer()
gt.followUpTime = 100 * time.Millisecond
peerA := peer.ID("A")
peerB := peer.ID("B")

View File

@ -28,175 +28,192 @@ const (
GossipSubID_v11 = protocol.ID("/meshsub/1.1.0")
)
// Defines the default gossipsub parameters.
var (
GossipSubD = 6
GossipSubDlo = 5
GossipSubDhi = 12
GossipSubDscore = 4
GossipSubDout = 2
GossipSubHistoryLength = 5
GossipSubHistoryGossip = 3
GossipSubDlazy = 6
GossipSubGossipFactor = 0.25
GossipSubGossipRetransmission = 3
GossipSubHeartbeatInitialDelay = 100 * time.Millisecond
GossipSubHeartbeatInterval = 1 * time.Second
GossipSubFanoutTTL = 60 * time.Second
GossipSubPrunePeers = 16
GossipSubPruneBackoff = time.Minute
GossipSubConnectors = 8
GossipSubMaxPendingConnections = 128
GossipSubConnectionTimeout = 30 * time.Second
GossipSubDirectConnectTicks uint64 = 300
GossipSubDirectConnectInitialDelay = time.Second
GossipSubOpportunisticGraftTicks uint64 = 60
GossipSubOpportunisticGraftPeers = 2
GossipSubGraftFloodThreshold = 10 * time.Second
GossipSubMaxIHaveLength = 5000
GossipSubMaxIHaveMessages = 10
GossipSubIWantFollowupTime = 3 * time.Second
)
// GossipSubParams defines all the gossipsub specific parameters.
type GossipSubParams struct {
// overlay parameters.
// GossipSubD sets the optimal degree for a GossipSub topic mesh. For example, if GossipSubD == 6,
// D sets the optimal degree for a GossipSub topic mesh. For example, if D == 6,
// each peer will want to have about six peers in their mesh for each topic they're subscribed to.
// GossipSubD should be set somewhere between GossipSubDlo and GossipSubDhi.
GossipSubD = 6
// D should be set somewhere between Dlo and Dhi.
D int
// GossipSubDlo sets the lower bound on the number of peers we keep in a GossipSub topic mesh.
// If we have fewer than GossipSubDlo peers, we will attempt to graft some more into the mesh at
// Dlo sets the lower bound on the number of peers we keep in a GossipSub topic mesh.
// If we have fewer than Dlo peers, we will attempt to graft some more into the mesh at
// the next heartbeat.
GossipSubDlo = 5
Dlo int
// GossipSubDhi sets the upper bound on the number of peers we keep in a GossipSub topic mesh.
// If we have more than GossipSubDhi peers, we will select some to prune from the mesh at the next heartbeat.
GossipSubDhi = 12
// Dhi sets the upper bound on the number of peers we keep in a GossipSub topic mesh.
// If we have more than Dhi peers, we will select some to prune from the mesh at the next heartbeat.
Dhi int
// GossipSubDscore affects how peers are selected when pruning a mesh due to over subscription.
// At least GossipSubDscore of the retained peers will be high-scoring, while the remainder are
// Dscore affects how peers are selected when pruning a mesh due to over subscription.
// At least Dscore of the retained peers will be high-scoring, while the remainder are
// chosen randomly.
GossipSubDscore = 4
Dscore int
// GossipSubDout sets the quota for the number of outbound connections to maintain in a topic mesh.
// Dout sets the quota for the number of outbound connections to maintain in a topic mesh.
// When the mesh is pruned due to over subscription, we make sure that we have outbound connections
// to at least GossipSubDout of the survivor peers. This prevents sybil attackers from overwhelming
// to at least Dout of the survivor peers. This prevents sybil attackers from overwhelming
// our mesh with incoming connections.
//
// GossipSubDout must be set below GossipSubDlo, and must not exceed GossipSubD / 2.
GossipSubDout = 2
// Dout must be set below Dlo, and must not exceed D / 2.
Dout int
// gossip parameters
// GossipSubHistoryLength controls the size of the message cache used for gossip.
// The message cache will remember messages for GossipSubHistoryLength heartbeats.
GossipSubHistoryLength = 5
// HistoryLength controls the size of the message cache used for gossip.
// The message cache will remember messages for HistoryLength heartbeats.
HistoryLength int
// GossipSubHistoryGossip controls how many cached message ids we will advertise in
// HistoryGossip controls how many cached message ids we will advertise in
// IHAVE gossip messages. When asked for our seen message IDs, we will return
// only those from the most recent GossipSubHistoryGossip heartbeats. The slack between
// GossipSubHistoryGossip and GossipSubHistoryLength allows us to avoid advertising messages
// only those from the most recent HistoryGossip heartbeats. The slack between
// HistoryGossip and HistoryLength allows us to avoid advertising messages
// that will be expired by the time they're requested.
//
// GossipSubHistoryGossip must be less than or equal to GossipSubHistoryLength to
// HistoryGossip must be less than or equal to HistoryLength to
// avoid a runtime panic.
GossipSubHistoryGossip = 3
HistoryGossip int
// GossipSubDlazy affects how many peers we will emit gossip to at each heartbeat.
// We will send gossip to at least GossipSubDlazy peers outside our mesh. The actual
// number may be more, depending on GossipSubGossipFactor and how many peers we're
// Dlazy affects how many peers we will emit gossip to at each heartbeat.
// We will send gossip to at least Dlazy peers outside our mesh. The actual
// number may be more, depending on GossipFactor and how many peers we're
// connected to.
GossipSubDlazy = 6
Dlazy int
// GossipSubGossipFactor affects how many peers we will emit gossip to at each heartbeat.
// We will send gossip to GossipSubGossipFactor * (total number of non-mesh peers), or
// GossipSubDlazy, whichever is greater.
GossipSubGossipFactor = 0.25
// GossipFactor affects how many peers we will emit gossip to at each heartbeat.
// We will send gossip to GossipFactor * (total number of non-mesh peers), or
// Dlazy, whichever is greater.
GossipFactor float64
// GossipSubGossipRetransmission controls how many times we will allow a peer to request
// GossipRetransmission controls how many times we will allow a peer to request
// the same message id through IWANT gossip before we start ignoring them. This is designed
// to prevent peers from spamming us with requests and wasting our resources.
GossipSubGossipRetransmission = 3
GossipRetransmission int
// heartbeat interval
// GossipSubHeartbeatInitialDelay is the short delay before the heartbeat timer begins
// HeartbeatInitialDelay is the short delay before the heartbeat timer begins
// after the router is initialized.
GossipSubHeartbeatInitialDelay = 100 * time.Millisecond
HeartbeatInitialDelay time.Duration
// GossipSubHeartbeatInterval controls the time between heartbeats.
GossipSubHeartbeatInterval = 1 * time.Second
// HeartbeatInterval controls the time between heartbeats.
HeartbeatInterval time.Duration
// GossipSubFanoutTTL controls how long we keep track of the fanout state. If it's been
// GossipSubFanoutTTL since we've published to a topic that we're not subscribed to,
// FanoutTTL controls how long we keep track of the fanout state. If it's been
// FanoutTTL since we've published to a topic that we're not subscribed to,
// we'll delete the fanout map for that topic.
GossipSubFanoutTTL = 60 * time.Second
FanoutTTL time.Duration
// GossipSubPrunePeers controls the number of peers to include in prune Peer eXchange.
// PrunePeers controls the number of peers to include in prune Peer eXchange.
// When we prune a peer that's eligible for PX (has a good score, etc), we will try to
// send them signed peer records for up to GossipSubPrunePeers other peers that we
// send them signed peer records for up to PrunePeers other peers that we
// know of.
GossipSubPrunePeers = 16
PrunePeers int
// GossipSubPruneBackoff controls the backoff time for pruned peers. This is how long
// PruneBackoff controls the backoff time for pruned peers. This is how long
// a peer must wait before attempting to graft into our mesh again after being pruned.
// When pruning a peer, we send them our value of GossipSubPruneBackoff so they know
// When pruning a peer, we send them our value of PruneBackoff so they know
// the minimum time to wait. Peers running older versions may not send a backoff time,
// so if we receive a prune message without one, we will wait at least GossipSubPruneBackoff
// so if we receive a prune message without one, we will wait at least PruneBackoff
// before attempting to re-graft.
GossipSubPruneBackoff = time.Minute
PruneBackoff time.Duration
// GossipSubConnectors controls the number of active connection attempts for peers obtained through PX.
GossipSubConnectors = 8
// Connectors controls the number of active connection attempts for peers obtained through PX.
Connectors int
// GossipSubMaxPendingConnections sets the maximum number of pending connections for peers attempted through px.
GossipSubMaxPendingConnections = 128
// MaxPendingConnections sets the maximum number of pending connections for peers attempted through px.
MaxPendingConnections int
// GossipSubConnectionTimeout controls the timeout for connection attempts.
GossipSubConnectionTimeout = 30 * time.Second
// ConnectionTimeout controls the timeout for connection attempts.
ConnectionTimeout time.Duration
// GossipSubDirectConnectTicks is the number of heartbeat ticks for attempting to reconnect direct peers
// DirectConnectTicks is the number of heartbeat ticks for attempting to reconnect direct peers
// that are not currently connected.
GossipSubDirectConnectTicks uint64 = 300
DirectConnectTicks uint64
// GossipSubDirectConnectInitialDelay is the initial delay before opening connections to direct peers
GossipSubDirectConnectInitialDelay = time.Second
// DirectConnectInitialDelay is the initial delay before opening connections to direct peers
DirectConnectInitialDelay time.Duration
// GossipSubOpportunisticGraftTicks is the number of heartbeat ticks for attempting to improve the mesh
// with opportunistic grafting. Every GossipSubOpportunisticGraftTicks we will attempt to select some
// OpportunisticGraftTicks is the number of heartbeat ticks for attempting to improve the mesh
// with opportunistic grafting. Every OpportunisticGraftTicks we will attempt to select some
// high-scoring mesh peers to replace lower-scoring ones, if the median score of our mesh peers falls
// below a threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds).
GossipSubOpportunisticGraftTicks uint64 = 60
OpportunisticGraftTicks uint64
// GossipSubOpportunisticGraftPeers is the number of peers to opportunistically graft.
GossipSubOpportunisticGraftPeers = 2
// OpportunisticGraftPeers is the number of peers to opportunistically graft.
OpportunisticGraftPeers int
// If a GRAFT comes before GossipSubGraftFloodThreshold has elapsed since the last PRUNE,
// If a GRAFT comes before GraftFloodThreshold has elapsed since the last PRUNE,
// then there is an extra score penalty applied to the peer through P7.
GossipSubGraftFloodThreshold = 10 * time.Second
GraftFloodThreshold time.Duration
// GossipSubMaxIHaveLength is the maximum number of messages to include in an IHAVE message.
// MaxIHaveLength is the maximum number of messages to include in an IHAVE message.
// Also controls the maximum number of IHAVE ids we will accept and request with IWANT from a
// peer within a heartbeat, to protect from IHAVE floods. You should adjust this value from the
// default if your system is pushing more than 5000 messages in GossipSubHistoryGossip heartbeats;
// default if your system is pushing more than 5000 messages in HistoryGossip heartbeats;
// with the defaults this is 1666 messages/s.
GossipSubMaxIHaveLength = 5000
MaxIHaveLength int
// GossipSubMaxIHaveMessages is the maximum number of IHAVE messages to accept from a peer within a heartbeat.
GossipSubMaxIHaveMessages = 10
// MaxIHaveMessages is the maximum number of IHAVE messages to accept from a peer within a heartbeat.
MaxIHaveMessages int
// Time to wait for a message requested through IWANT following an IHAVE advertisement.
// If the message is not received within this window, a broken promise is declared and
// the router may apply bahavioural penalties.
GossipSubIWantFollowupTime = 3 * time.Second
)
IWantFollowupTime time.Duration
}
// NewGossipSub returns a new PubSub object using GossipSubRouter as the router.
func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
params := DefaultGossipSubParams()
rt := &GossipSubRouter{
peers: make(map[peer.ID]protocol.ID),
mesh: make(map[string]map[peer.ID]struct{}),
fanout: make(map[string]map[peer.ID]struct{}),
lastpub: make(map[string]int64),
gossip: make(map[peer.ID][]*pb.ControlIHave),
control: make(map[peer.ID]*pb.ControlMessage),
backoff: make(map[string]map[peer.ID]time.Time),
peerhave: make(map[peer.ID]int),
iasked: make(map[peer.ID]int),
outbound: make(map[peer.ID]bool),
connect: make(chan connectInfo, GossipSubMaxPendingConnections),
mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength),
peers: make(map[peer.ID]protocol.ID),
mesh: make(map[string]map[peer.ID]struct{}),
fanout: make(map[string]map[peer.ID]struct{}),
lastpub: make(map[string]int64),
gossip: make(map[peer.ID][]*pb.ControlIHave),
control: make(map[peer.ID]*pb.ControlMessage),
backoff: make(map[string]map[peer.ID]time.Time),
peerhave: make(map[peer.ID]int),
iasked: make(map[peer.ID]int),
outbound: make(map[peer.ID]bool),
connect: make(chan connectInfo, params.MaxPendingConnections),
mcache: NewMessageCache(params.HistoryGossip, params.HistoryLength),
protos: GossipSubDefaultProtocols,
feature: GossipSubDefaultFeatures,
// these are configured per router to allow variation in tests
D: GossipSubD,
Dlo: GossipSubDlo,
Dhi: GossipSubDhi,
Dscore: GossipSubDscore,
Dout: GossipSubDout,
Dlazy: GossipSubDlazy,
// these must be pulled in to resolve races in tests... sigh.
directConnectTicks: GossipSubDirectConnectTicks,
opportunisticGraftTicks: GossipSubOpportunisticGraftTicks,
fanoutTTL: GossipSubFanoutTTL,
tagTracer: newTagTracer(h.ConnManager()),
params: params,
}
// hook the tag tracer
@ -204,6 +221,39 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
return NewPubSub(ctx, h, rt, opts...)
}
// DefaultGossipSubParams returns the default gossip sub parameters
// as a config.
func DefaultGossipSubParams() GossipSubParams {
return GossipSubParams{
D: GossipSubD,
Dlo: GossipSubDlo,
Dhi: GossipSubDhi,
Dscore: GossipSubDscore,
Dout: GossipSubDout,
HistoryLength: GossipSubHistoryLength,
HistoryGossip: GossipSubHistoryLength,
Dlazy: GossipSubDlazy,
GossipFactor: GossipSubGossipFactor,
GossipRetransmission: GossipSubGossipRetransmission,
HeartbeatInitialDelay: GossipSubHeartbeatInitialDelay,
HeartbeatInterval: GossipSubHeartbeatInterval,
FanoutTTL: GossipSubFanoutTTL,
PrunePeers: GossipSubPrunePeers,
PruneBackoff: GossipSubPruneBackoff,
Connectors: GossipSubConnectors,
MaxPendingConnections: GossipSubMaxPendingConnections,
ConnectionTimeout: GossipSubConnectionTimeout,
DirectConnectTicks: GossipSubDirectConnectTicks,
DirectConnectInitialDelay: GossipSubDirectConnectInitialDelay,
OpportunisticGraftTicks: GossipSubOpportunisticGraftTicks,
OpportunisticGraftPeers: GossipSubOpportunisticGraftPeers,
GraftFloodThreshold: GossipSubGraftFloodThreshold,
MaxIHaveLength: GossipSubMaxIHaveLength,
MaxIHaveMessages: GossipSubMaxIHaveMessages,
IWantFollowupTime: GossipSubIWantFollowupTime,
}
}
// WithPeerScore is a gossipsub router option that enables peer scoring.
func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Option {
return func(ps *PubSub) error {
@ -318,7 +368,24 @@ func WithDirectConnectTicks(t uint64) Option {
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
}
gs.directConnectTicks = t
gs.params.DirectConnectTicks = t
return nil
}
}
// WithGossipSubParams is a gossip sub router option that allows a custom
// config to be set when instantiating the gossipsub router.
func WithGossipSubParams(cfg GossipSubParams) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
if !ok {
return fmt.Errorf("pubsub router is not gossipsub")
}
// Overwrite current config and associated variables in the router.
gs.params = cfg
gs.connect = make(chan connectInfo, cfg.MaxPendingConnections)
gs.mcache = NewMessageCache(cfg.HistoryGossip, cfg.HistoryLength)
return nil
}
}
@ -355,6 +422,9 @@ type GossipSubRouter struct {
tagTracer *tagTracer
gate *peerGater
// config for gossipsub parameters
params GossipSubParams
// whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted
// nodes.
doPX bool
@ -384,19 +454,6 @@ type GossipSubRouter struct {
// number of heartbeats since the beginning of time; this allows us to amortize some resource
// clean up -- eg backoff clean up.
heartbeatTicks uint64
// overly parameter "constants"
// these are pulled from their global value or else the race detector is angry on travis
// it also allows us to change them per peer in tests, which is a plus
D, Dlo, Dhi, Dscore, Dout, Dlazy int
// tick "constants" for triggering direct connect and opportunistic grafting
// these are pulled from their global value or else the race detector is angry on travis
directConnectTicks, opportunisticGraftTicks uint64
// fanout expiry ttl "constant"
// this is pulled from its global value or else the race detector is angry on travis
fanoutTTL time.Duration
}
type connectInfo struct {
@ -428,15 +485,15 @@ func (gs *GossipSubRouter) Attach(p *PubSub) {
go gs.heartbeatTimer()
// start the PX connectors
for i := 0; i < GossipSubConnectors; i++ {
for i := 0; i < gs.params.Connectors; i++ {
go gs.connector()
}
// connect to direct peers
if len(gs.direct) > 0 {
go func() {
if GossipSubDirectConnectInitialDelay > 0 {
time.Sleep(GossipSubDirectConnectInitialDelay)
if gs.params.DirectConnectInitialDelay > 0 {
time.Sleep(gs.params.DirectConnectInitialDelay)
}
for p := range gs.direct {
gs.connect <- connectInfo{p: p}
@ -508,10 +565,10 @@ func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool {
gsPeers = len(gs.mesh[topic])
if suggested == 0 {
suggested = gs.Dlo
suggested = gs.params.Dlo
}
if fsPeers+gsPeers >= suggested || gsPeers >= gs.Dhi {
if fsPeers+gsPeers >= suggested || gsPeers >= gs.params.Dhi {
return true
}
@ -560,12 +617,12 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.
// IHAVE flood protection
gs.peerhave[p]++
if gs.peerhave[p] > GossipSubMaxIHaveMessages {
if gs.peerhave[p] > gs.params.MaxIHaveMessages {
log.Debugf("IHAVE: peer %s has advertised too many times (%d) within this heartbeat interval; ignoring", p, gs.peerhave[p])
return nil
}
if gs.iasked[p] >= GossipSubMaxIHaveLength {
if gs.iasked[p] >= gs.params.MaxIHaveLength {
log.Debugf("IHAVE: peer %s has already advertised too many messages (%d); ignoring", p, gs.iasked[p])
return nil
}
@ -591,8 +648,8 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.
}
iask := len(iwant)
if iask+gs.iasked[p] > GossipSubMaxIHaveLength {
iask = GossipSubMaxIHaveLength - gs.iasked[p]
if iask+gs.iasked[p] > gs.params.MaxIHaveLength {
iask = gs.params.MaxIHaveLength - gs.iasked[p]
}
log.Debugf("IHAVE: Asking for %d out of %d messages from %s", iask, len(iwant), p)
@ -630,7 +687,7 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.
continue
}
if count > GossipSubGossipRetransmission {
if count > gs.params.GossipRetransmission {
log.Debugf("IWANT: Peer %s has asked for message %s too many times; ignoring request", p, mid)
continue
}
@ -696,7 +753,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
// no PX
doPX = false
// check the flood cutoff -- is the GRAFT coming too fast?
floodCutoff := expire.Add(GossipSubGraftFloodThreshold - GossipSubPruneBackoff)
floodCutoff := expire.Add(gs.params.GraftFloodThreshold - gs.params.PruneBackoff)
if now.Before(floodCutoff) {
// extra penalty
gs.score.AddPenalty(p, 1)
@ -723,7 +780,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
// check the number of mesh peers; if it is at (or over) Dhi, we only accept grafts
// from peers with outbound connections; this is a defensive check to restrict potential
// mesh takeover attacks combined with love bombing
if len(peers) >= gs.Dhi && !gs.outbound[p] {
if len(peers) >= gs.params.Dhi && !gs.outbound[p] {
prune = append(prune, topic)
gs.addBackoff(p, topic)
continue
@ -781,7 +838,7 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
}
func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) {
gs.doAddBackoff(p, topic, GossipSubPruneBackoff)
gs.doAddBackoff(p, topic, gs.params.PruneBackoff)
}
func (gs *GossipSubRouter) doAddBackoff(p peer.ID, topic string, interval time.Duration) {
@ -797,9 +854,9 @@ func (gs *GossipSubRouter) doAddBackoff(p peer.ID, topic string, interval time.D
}
func (gs *GossipSubRouter) pxConnect(peers []*pb.PeerInfo) {
if len(peers) > GossipSubPrunePeers {
if len(peers) > gs.params.PrunePeers {
shufflePeerInfo(peers)
peers = peers[:GossipSubPrunePeers]
peers = peers[:gs.params.PrunePeers]
}
toconnect := make([]connectInfo, 0, len(peers))
@ -866,7 +923,7 @@ func (gs *GossipSubRouter) connector() {
}
}
ctx, cancel := context.WithTimeout(gs.p.ctx, GossipSubConnectionTimeout)
ctx, cancel := context.WithTimeout(gs.p.ctx, gs.params.ConnectionTimeout)
err := gs.p.host.Connect(ctx, peer.AddrInfo{ID: ci.p})
cancel()
if err != nil {
@ -923,7 +980,7 @@ func (gs *GossipSubRouter) Publish(msg *Message) {
gmap, ok = gs.fanout[topic]
if !ok || len(gmap) == 0 {
// we don't have any, pick some with score above the publish threshold
peers := gs.getPeers(topic, gs.D, func(p peer.ID) bool {
peers := gs.getPeers(topic, gs.params.D, func(p peer.ID) bool {
_, direct := gs.direct[p]
return !direct && gs.score.Score(p) >= gs.publishThreshold
})
@ -970,9 +1027,9 @@ func (gs *GossipSubRouter) Join(topic string) {
}
}
if len(gmap) < gs.D {
if len(gmap) < gs.params.D {
// we need more peers; eager, as this would get fixed in the next heartbeat
more := gs.getPeers(topic, gs.D-len(gmap), func(p peer.ID) bool {
more := gs.getPeers(topic, gs.params.D-len(gmap), func(p peer.ID) bool {
// filter our current peers, direct peers, and peers with negative scores
_, inMesh := gmap[p]
_, direct := gs.direct[p]
@ -986,7 +1043,7 @@ func (gs *GossipSubRouter) Join(topic string) {
delete(gs.fanout, topic)
delete(gs.lastpub, topic)
} else {
peers := gs.getPeers(topic, gs.D, func(p peer.ID) bool {
peers := gs.getPeers(topic, gs.params.D, func(p peer.ID) bool {
// filter direct peers and peers with negative score
_, direct := gs.direct[p]
return !direct && gs.score.Score(p) >= 0
@ -1215,14 +1272,14 @@ func fragmentMessageIds(msgIds []string, limit int) [][]string {
}
func (gs *GossipSubRouter) heartbeatTimer() {
time.Sleep(GossipSubHeartbeatInitialDelay)
time.Sleep(gs.params.HeartbeatInitialDelay)
select {
case gs.p.eval <- gs.heartbeat:
case <-gs.p.ctx.Done():
return
}
ticker := time.NewTicker(GossipSubHeartbeatInterval)
ticker := time.NewTicker(gs.params.HeartbeatInterval)
defer ticker.Stop()
for {
@ -1299,9 +1356,9 @@ func (gs *GossipSubRouter) heartbeat() {
}
// do we have enough peers?
if l := len(peers); l < gs.Dlo {
if l := len(peers); l < gs.params.Dlo {
backoff := gs.backoff[topic]
ineed := gs.D - l
ineed := gs.params.D - l
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
// filter our current and direct peers, peers we are backing off, and peers with negative score
_, inMesh := peers[p]
@ -1316,7 +1373,7 @@ func (gs *GossipSubRouter) heartbeat() {
}
// do we have too many peers?
if len(peers) > gs.Dhi {
if len(peers) > gs.params.Dhi {
plst := peerMapToList(peers)
// sort by score (but shuffle first for the case we don't use the score)
@ -1327,18 +1384,18 @@ func (gs *GossipSubRouter) heartbeat() {
// We keep the first D_score peers by score and the remaining up to D randomly
// under the constraint that we keep D_out peers in the mesh (if we have that many)
shufflePeers(plst[gs.Dscore:])
shufflePeers(plst[gs.params.Dscore:])
// count the outbound peers we are keeping
outbound := 0
for _, p := range plst[:gs.D] {
for _, p := range plst[:gs.params.D] {
if gs.outbound[p] {
outbound++
}
}
// if it's less than D_out, bubble up some outbound peers from the random selection
if outbound < gs.Dout {
if outbound < gs.params.Dout {
rotate := func(i int) {
// rotate the plst to the right and put the ith peer in the front
p := plst[i]
@ -1351,7 +1408,7 @@ func (gs *GossipSubRouter) heartbeat() {
// first bubble up all outbound peers already in the selection to the front
if outbound > 0 {
ihave := outbound
for i := 1; i < gs.D && ihave > 0; i++ {
for i := 1; i < gs.params.D && ihave > 0; i++ {
p := plst[i]
if gs.outbound[p] {
rotate(i)
@ -1361,8 +1418,8 @@ func (gs *GossipSubRouter) heartbeat() {
}
// now bubble up enough outbound peers outside the selection to the front
ineed := gs.Dout - outbound
for i := gs.D; i < len(plst) && ineed > 0; i++ {
ineed := gs.params.Dout - outbound
for i := gs.params.D; i < len(plst) && ineed > 0; i++ {
p := plst[i]
if gs.outbound[p] {
rotate(i)
@ -1372,14 +1429,14 @@ func (gs *GossipSubRouter) heartbeat() {
}
// prune the excess peers
for _, p := range plst[gs.D:] {
for _, p := range plst[gs.params.D:] {
log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", p, topic)
prunePeer(p)
}
}
// do we have enough outboud peers?
if len(peers) >= gs.Dlo {
if len(peers) >= gs.params.Dlo {
// count the outbound peers we have
outbound := 0
for p := range peers {
@ -1389,8 +1446,8 @@ func (gs *GossipSubRouter) heartbeat() {
}
// if it's less than D_out, select some peers with outbound connections and graft them
if outbound < gs.Dout {
ineed := gs.Dout - outbound
if outbound < gs.params.Dout {
ineed := gs.params.Dout - outbound
backoff := gs.backoff[topic]
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
// filter our current and direct peers, peers we are backing off, and peers with negative score
@ -1407,7 +1464,7 @@ func (gs *GossipSubRouter) heartbeat() {
}
// should we try to improve the mesh with opportunistic grafting?
if gs.heartbeatTicks%gs.opportunisticGraftTicks == 0 && len(peers) > 1 {
if gs.heartbeatTicks%gs.params.OpportunisticGraftTicks == 0 && len(peers) > 1 {
// Opportunistic grafting works as follows: we check the median score of peers in the
// mesh; if this score is below the opportunisticGraftThreshold, we select a few peers at
// random with score over the median.
@ -1426,7 +1483,7 @@ func (gs *GossipSubRouter) heartbeat() {
// if the median score is below the threshold, select a better peer (if any) and GRAFT
if medianScore < gs.opportunisticGraftThreshold {
backoff := gs.backoff[topic]
plst = gs.getPeers(topic, GossipSubOpportunisticGraftPeers, func(p peer.ID) bool {
plst = gs.getPeers(topic, gs.params.OpportunisticGraftPeers, func(p peer.ID) bool {
_, inMesh := peers[p]
_, doBackoff := backoff[p]
_, direct := gs.direct[p]
@ -1448,7 +1505,7 @@ func (gs *GossipSubRouter) heartbeat() {
// expire fanout for topics we haven't published to in a while
now := time.Now().UnixNano()
for topic, lastpub := range gs.lastpub {
if lastpub+int64(gs.fanoutTTL) < now {
if lastpub+int64(gs.params.FanoutTTL) < now {
delete(gs.fanout, topic)
delete(gs.lastpub, topic)
}
@ -1465,8 +1522,8 @@ func (gs *GossipSubRouter) heartbeat() {
}
// do we need more peers?
if len(peers) < gs.D {
ineed := gs.D - len(peers)
if len(peers) < gs.params.D {
ineed := gs.params.D - len(peers)
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
// filter our current and direct peers and peers with score above the publish threshold
_, inFanout := peers[p]
@ -1537,7 +1594,7 @@ func (gs *GossipSubRouter) clearBackoff() {
func (gs *GossipSubRouter) directConnect() {
// we donly do this every some ticks to allow pending connections to complete and account
// for restarts/downtime
if gs.heartbeatTicks%gs.directConnectTicks != 0 {
if gs.heartbeatTicks%gs.params.DirectConnectTicks != 0 {
return
}
@ -1608,7 +1665,7 @@ func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{}
shuffleStrings(mids)
// if we are emitting more than GossipSubMaxIHaveLength mids, truncate the list
if len(mids) > GossipSubMaxIHaveLength {
if len(mids) > gs.params.MaxIHaveLength {
// we do the truncation (with shuffling) per peer below
log.Debugf("too many messages for gossip; will truncate IHAVE list (%d messages)", len(mids))
}
@ -1626,8 +1683,8 @@ func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{}
}
}
target := gs.Dlazy
factor := int(GossipSubGossipFactor * float64(len(peers)))
target := gs.params.Dlazy
factor := int(gs.params.GossipFactor * float64(len(peers)))
if factor > target {
target = factor
}
@ -1642,11 +1699,11 @@ func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{}
// Emit the IHAVE gossip to the selected peers.
for _, p := range peers {
peerMids := mids
if len(mids) > GossipSubMaxIHaveLength {
if len(mids) > gs.params.MaxIHaveLength {
// we do this per peer so that we emit a different set for each peer.
// we have enough redundancy in the system that this will significantly increase the message
// coverage when we do truncate.
peerMids = make([]string, GossipSubMaxIHaveLength)
peerMids = make([]string, gs.params.MaxIHaveLength)
shuffleStrings(mids)
copy(peerMids, mids)
}
@ -1749,11 +1806,11 @@ func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool) *pb.Con
return &pb.ControlPrune{TopicID: &topic}
}
backoff := uint64(GossipSubPruneBackoff / time.Second)
backoff := uint64(gs.params.PruneBackoff / time.Second)
var px []*pb.PeerInfo
if doPX {
// select peers for Peer eXchange
peers := gs.getPeers(topic, GossipSubPrunePeers, func(xp peer.ID) bool {
peers := gs.getPeers(topic, gs.params.PrunePeers, func(xp peer.ID) bool {
return p != xp && gs.score.Score(xp) >= 0
})

View File

@ -967,10 +967,10 @@ func TestGossipsubStarTopology(t *testing.T) {
// configure the center of the star with a very low D
psubs[0].eval <- func() {
gs := psubs[0].rt.(*GossipSubRouter)
gs.D = 0
gs.Dlo = 0
gs.Dhi = 0
gs.Dscore = 0
gs.params.D = 0
gs.params.Dlo = 0
gs.params.Dhi = 0
gs.params.Dscore = 0
}
// add all peer addresses to the peerstores
@ -1051,10 +1051,10 @@ func TestGossipsubStarTopologyWithSignedPeerRecords(t *testing.T) {
// configure the center of the star with a very low D
psubs[0].eval <- func() {
gs := psubs[0].rt.(*GossipSubRouter)
gs.D = 0
gs.Dlo = 0
gs.Dhi = 0
gs.Dscore = 0
gs.params.D = 0
gs.params.Dlo = 0
gs.params.Dhi = 0
gs.params.Dscore = 0
}
// manually create signed peer records for each host and add them to the
@ -1346,6 +1346,45 @@ func TestGossipsubEnoughPeers(t *testing.T) {
}
}
func TestGossipsubCustomParams(t *testing.T) {
// in this test we score sinkhole a peer to exercise code paths relative to negative scores
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
params := DefaultGossipSubParams()
wantedFollowTime := 1 * time.Second
params.IWantFollowupTime = wantedFollowTime
customGossipFactor := 0.12
params.GossipFactor = customGossipFactor
wantedMaxPendingConns := 23
params.MaxPendingConnections = wantedMaxPendingConns
hosts := getNetHosts(t, ctx, 1)
psubs := getGossipsubs(ctx, hosts,
WithGossipSubParams(params))
if len(psubs) != 1 {
t.Fatalf("incorrect number of pusbub objects received: wanted %d but got %d", 1, len(psubs))
}
rt, ok := psubs[0].rt.(*GossipSubRouter)
if !ok {
t.Fatal("Did not get gossip sub router from pub sub object")
}
if rt.params.IWantFollowupTime != wantedFollowTime {
t.Errorf("Wanted %d of param GossipSubIWantFollowupTime but got %d", wantedFollowTime, rt.params.IWantFollowupTime)
}
if rt.params.GossipFactor != customGossipFactor {
t.Errorf("Wanted %f of param GossipSubGossipFactor but got %f", customGossipFactor, rt.params.GossipFactor)
}
if rt.params.MaxPendingConnections != wantedMaxPendingConns {
t.Errorf("Wanted %d of param GossipSubMaxPendingConnections but got %d", wantedMaxPendingConns, rt.params.MaxPendingConnections)
}
}
func TestGossipsubNegativeScore(t *testing.T) {
// in this test we score sinkhole a peer to exercise code paths relative to negative scores
ctx, cancel := context.WithCancel(context.Background())