make PX optional and disabled by default, gate by acceptPXThreshold
This commit is contained in:
parent
375b66b0c1
commit
f6f34cfc99
42
gossipsub.go
42
gossipsub.go
|
@ -89,7 +89,11 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
|
||||||
//
|
//
|
||||||
// These thresholds should generally be negative, allowing some information to disseminate from low
|
// These thresholds should generally be negative, allowing some information to disseminate from low
|
||||||
// scoring peers.
|
// scoring peers.
|
||||||
func WithPeerScore(params *PeerScoreParams, gossipThreshold, publishThreshold, graylistThreshold float64) Option {
|
//
|
||||||
|
// acceptPXThreshold is the score threshold below which PX will be ignored; this should be positive
|
||||||
|
// and limited to scores attainable by bootstrappers and other trusted nodes.
|
||||||
|
//
|
||||||
|
func WithPeerScore(params *PeerScoreParams, gossipThreshold, publishThreshold, graylistThreshold, acceptPXThreshold float64) Option {
|
||||||
return func(ps *PubSub) error {
|
return func(ps *PubSub) error {
|
||||||
gs, ok := ps.rt.(*GossipSubRouter)
|
gs, ok := ps.rt.(*GossipSubRouter)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -145,6 +149,22 @@ func WithFloodPublish(floodPublish bool) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithPeerExchange is a gossipsub router option that enables Peer eXchange on PRUNE.
|
||||||
|
// This should generally be enabled in bootstrappers and well connected/trusted nodes
|
||||||
|
// used for bootstrapping.
|
||||||
|
func WithPeerExchange(doPX bool) Option {
|
||||||
|
return func(ps *PubSub) error {
|
||||||
|
gs, ok := ps.rt.(*GossipSubRouter)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("pubsub router is not gossipsub")
|
||||||
|
}
|
||||||
|
|
||||||
|
gs.doPX = doPX
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// GossipSubRouter is a router that implements the gossipsub protocol.
|
// GossipSubRouter is a router that implements the gossipsub protocol.
|
||||||
// For each topic we have joined, we maintain an overlay through which
|
// For each topic we have joined, we maintain an overlay through which
|
||||||
// messages flow; this is the mesh map.
|
// messages flow; this is the mesh map.
|
||||||
|
@ -166,6 +186,14 @@ type GossipSubRouter struct {
|
||||||
tracer *pubsubTracer
|
tracer *pubsubTracer
|
||||||
score *peerScore
|
score *peerScore
|
||||||
|
|
||||||
|
// whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted
|
||||||
|
// nodes.
|
||||||
|
doPX bool
|
||||||
|
|
||||||
|
// threshold for accepting PX from a peer; this should be positive and limited to scores
|
||||||
|
// attainable by bootstrappers and trusted nodes
|
||||||
|
acceptPXThreshold float64
|
||||||
|
|
||||||
// threshold for peer score to emit/accept gossip
|
// threshold for peer score to emit/accept gossip
|
||||||
// If the peer score is below this threshold, we won't emit or accept gossip from the peer.
|
// If the peer score is below this threshold, we won't emit or accept gossip from the peer.
|
||||||
// When there is no score, this value is 0.
|
// When there is no score, this value is 0.
|
||||||
|
@ -369,7 +397,7 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.
|
||||||
func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlPrune {
|
func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlPrune {
|
||||||
var prune []string
|
var prune []string
|
||||||
|
|
||||||
doPX := true
|
doPX := gs.doPX
|
||||||
score := gs.score.Score(p)
|
score := gs.score.Score(p)
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
|
@ -443,9 +471,9 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
|
||||||
|
|
||||||
px := prune.GetPeers()
|
px := prune.GetPeers()
|
||||||
if len(px) > 0 {
|
if len(px) > 0 {
|
||||||
// we ignore PX from peers with negative score
|
// we ignore PX from peers with insufficient score
|
||||||
if score < 0 {
|
if score < gs.acceptPXThreshold {
|
||||||
log.Debugf("PRUNE: ignoring PX from peer %s with negative score [score = %f, topic = %s]", p, score, topic)
|
log.Debugf("PRUNE: ignoring PX from peer %s with insufficient score [score = %f, topic = %s]", p, score, topic)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -920,7 +948,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string,
|
||||||
delete(toprune, p)
|
delete(toprune, p)
|
||||||
prune = make([]*pb.ControlPrune, 0, len(pruning))
|
prune = make([]*pb.ControlPrune, 0, len(pruning))
|
||||||
for _, topic := range pruning {
|
for _, topic := range pruning {
|
||||||
prune = append(prune, gs.makePrune(p, topic, !noPX[p]))
|
prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p]))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -931,7 +959,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string,
|
||||||
for p, topics := range toprune {
|
for p, topics := range toprune {
|
||||||
prune := make([]*pb.ControlPrune, 0, len(topics))
|
prune := make([]*pb.ControlPrune, 0, len(topics))
|
||||||
for _, topic := range topics {
|
for _, topic := range topics {
|
||||||
prune = append(prune, gs.makePrune(p, topic, !noPX[p]))
|
prune = append(prune, gs.makePrune(p, topic, gs.doPX && !noPX[p]))
|
||||||
}
|
}
|
||||||
|
|
||||||
out := rpcWithControl(nil, nil, nil, nil, prune)
|
out := rpcWithControl(nil, nil, nil, nil, prune)
|
||||||
|
|
|
@ -932,7 +932,7 @@ func TestGossipsubStarTopology(t *testing.T) {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
hosts := getNetHosts(t, ctx, 20)
|
hosts := getNetHosts(t, ctx, 20)
|
||||||
psubs := getGossipsubs(ctx, hosts)
|
psubs := getGossipsubs(ctx, hosts, WithPeerExchange(true))
|
||||||
|
|
||||||
// add all peer addresses to the peerstores
|
// add all peer addresses to the peerstores
|
||||||
// this is necessary because we can't have signed address records witout identify
|
// this is necessary because we can't have signed address records witout identify
|
||||||
|
|
Loading…
Reference in New Issue