mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-02 21:03:07 +00:00
opportunistic grafting
This commit is contained in:
parent
8e969ba29e
commit
2fdf6cdaed
68
gossipsub.go
68
gossipsub.go
@ -63,6 +63,10 @@ var (
|
||||
// Number of heartbeat ticks for attempting to reconnect direct peers that are not
|
||||
// currently connected
|
||||
GossipSubDirectConnectTicks uint64 = 300
|
||||
|
||||
// Number of heartbeat ticks for attempting to improve the mesh with opportunistic
|
||||
// grafting
|
||||
GossipSubOpportunisticGraftTicks uint64 = 60
|
||||
)
|
||||
|
||||
// NewGossipSub returns a new PubSub object using GossipSubRouter as the router.
|
||||
@ -106,6 +110,7 @@ func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Opt
|
||||
gs.publishThreshold = thresholds.PublishThreshold
|
||||
gs.graylistThreshold = thresholds.GraylistThreshold
|
||||
gs.acceptPXThreshold = thresholds.AcceptPXThreshold
|
||||
gs.opportunisticGraftThreshold = thresholds.OpportunisticGraftThreshold
|
||||
|
||||
// hook the tracer
|
||||
if ps.tracer != nil {
|
||||
@ -216,6 +221,9 @@ type GossipSubRouter struct {
|
||||
// threshold for peer score before we graylist the peer and silently ignore its RPCs
|
||||
graylistThreshold float64
|
||||
|
||||
// threshold for media peer score before triggering opportunistic grafting
|
||||
opportunisticGraftThreshold float64
|
||||
|
||||
// whether to use flood publishing
|
||||
floodPublish bool
|
||||
|
||||
@ -858,6 +866,15 @@ func (gs *GossipSubRouter) heartbeat() {
|
||||
toprune[p] = append(topics, topic)
|
||||
}
|
||||
|
||||
graftPeer := func(p peer.ID) {
|
||||
log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic)
|
||||
gs.tracer.Graft(p, topic)
|
||||
peers[p] = struct{}{}
|
||||
gs.tagPeer(p, topic)
|
||||
topics := tograft[p]
|
||||
tograft[p] = append(topics, topic)
|
||||
}
|
||||
|
||||
// compute mesh peer scores
|
||||
scores := make(map[peer.ID]float64)
|
||||
for p := range peers {
|
||||
@ -886,12 +903,7 @@ func (gs *GossipSubRouter) heartbeat() {
|
||||
})
|
||||
|
||||
for _, p := range plst {
|
||||
log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic)
|
||||
gs.tracer.Graft(p, topic)
|
||||
peers[p] = struct{}{}
|
||||
gs.tagPeer(p, topic)
|
||||
topics := tograft[p]
|
||||
tograft[p] = append(topics, topic)
|
||||
graftPeer(p)
|
||||
}
|
||||
}
|
||||
|
||||
@ -913,6 +925,50 @@ func (gs *GossipSubRouter) heartbeat() {
|
||||
}
|
||||
}
|
||||
|
||||
// should we try to improve the mesh with opportunistic grafting?
|
||||
if gs.heartbeatTicks%GossipSubOpportunisticGraftTicks == 0 && len(peers) > 1 {
|
||||
// Opportunistic grafting works as follows: we check the median score of peers in the
|
||||
// mesh; if this score is below the opportunisticGraftThreshold, we select a peer at
|
||||
// random with score over the median.
|
||||
// The intention is to (slowly) improve an underperforming mesh by introducing good
|
||||
// scoring peers that may have been gossiping at us. This allows us to get out of sticky
|
||||
// situations where we are stuck with poor peers and also recover from churn of good peers.
|
||||
|
||||
// first cache scores for new peers that may have been added since the initial score computation
|
||||
for p := range peers {
|
||||
_, haveScore := scores[p]
|
||||
if haveScore {
|
||||
continue
|
||||
}
|
||||
scores[p] = gs.score.Score(p)
|
||||
}
|
||||
|
||||
// now compute the median peer score in the mesh
|
||||
plst := peerMapToList(peers)
|
||||
sort.Slice(plst, func(i, j int) bool {
|
||||
return scores[plst[i]] < scores[plst[j]]
|
||||
})
|
||||
medianIndex := len(peers) / 2
|
||||
medianScore := scores[plst[medianIndex]]
|
||||
|
||||
// if the media score is below the threshold, select a better peer (if any) and GRAFT
|
||||
if medianScore < gs.opportunisticGraftThreshold {
|
||||
backoff := gs.backoff[topic]
|
||||
plst = gs.getPeers(topic, 1, func(p peer.ID) bool {
|
||||
_, inMesh := peers[p]
|
||||
_, doBackoff := backoff[p]
|
||||
_, direct := gs.direct[p]
|
||||
return !inMesh && !doBackoff && !direct && gs.score.Score(p) > medianScore
|
||||
})
|
||||
|
||||
if len(plst) != 0 {
|
||||
p := plst[0]
|
||||
log.Debugf("HEARTBEAT: Opportunistically graft peer %s on topic %s", p, topic)
|
||||
graftPeer(p)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 2nd arg are mesh peers excluded from gossip. We already push
|
||||
// messages to them, so its redundant to gossip IHAVEs.
|
||||
gs.emitGossip(topic, peers)
|
||||
|
||||
@ -20,9 +20,13 @@ type PeerScoreThresholds struct {
|
||||
// implementing an effective graylist according to peer score; should be negative and <= PublisThreshold.
|
||||
GraylistThreshold float64
|
||||
|
||||
// acceptPXThreshold is the score threshold below which PX will be ignored; this should be positive
|
||||
// AcceptPXThreshold is the score threshold below which PX will be ignored; this should be positive
|
||||
// and limited to scores attainable by bootstrappers and other trusted nodes.
|
||||
AcceptPXThreshold float64
|
||||
|
||||
// OpportunisticGraftThreshold is the median mesh score threshold before triggering opportunistic
|
||||
// grafting; this should have a small positive value.
|
||||
OpportunisticGraftThreshold float64
|
||||
}
|
||||
|
||||
func (p *PeerScoreThresholds) validate() error {
|
||||
@ -38,6 +42,9 @@ func (p *PeerScoreThresholds) validate() error {
|
||||
if p.AcceptPXThreshold < 0 {
|
||||
return fmt.Errorf("invalid accept PX threshold; it must be >= 0")
|
||||
}
|
||||
if p.OpportunisticGraftThreshold < 0 {
|
||||
return fmt.Errorf("invalid opportunistic grafting threshold; it must be >= 0")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user