From 2fdf6cdaedf3cdd447fa4acbf13901a0d6496d0b Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 10 Apr 2020 14:21:10 +0300 Subject: [PATCH] opportunistic grafting --- gossipsub.go | 68 ++++++++++++++++++++++++++++++++++++++++++++----- score_params.go | 9 ++++++- 2 files changed, 70 insertions(+), 7 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index 3e079e9..e488ac2 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -63,6 +63,10 @@ var ( // Number of heartbeat ticks for attempting to reconnect direct peers that are not // currently connected GossipSubDirectConnectTicks uint64 = 300 + + // Number of heartbeat ticks for attempting to improve the mesh with opportunistic + // grafting + GossipSubOpportunisticGraftTicks uint64 = 60 ) // NewGossipSub returns a new PubSub object using GossipSubRouter as the router. @@ -106,6 +110,7 @@ func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Opt gs.publishThreshold = thresholds.PublishThreshold gs.graylistThreshold = thresholds.GraylistThreshold gs.acceptPXThreshold = thresholds.AcceptPXThreshold + gs.opportunisticGraftThreshold = thresholds.OpportunisticGraftThreshold // hook the tracer if ps.tracer != nil { @@ -216,6 +221,9 @@ type GossipSubRouter struct { // threshold for peer score before we graylist the peer and silently ignore its RPCs graylistThreshold float64 + // threshold for media peer score before triggering opportunistic grafting + opportunisticGraftThreshold float64 + // whether to use flood publishing floodPublish bool @@ -858,6 +866,15 @@ func (gs *GossipSubRouter) heartbeat() { toprune[p] = append(topics, topic) } + graftPeer := func(p peer.ID) { + log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic) + gs.tracer.Graft(p, topic) + peers[p] = struct{}{} + gs.tagPeer(p, topic) + topics := tograft[p] + tograft[p] = append(topics, topic) + } + // compute mesh peer scores scores := make(map[peer.ID]float64) for p := range peers { @@ -886,12 +903,7 @@ func (gs *GossipSubRouter) heartbeat() { }) for _, p := range plst { - log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic) - gs.tracer.Graft(p, topic) - peers[p] = struct{}{} - gs.tagPeer(p, topic) - topics := tograft[p] - tograft[p] = append(topics, topic) + graftPeer(p) } } @@ -913,6 +925,50 @@ func (gs *GossipSubRouter) heartbeat() { } } + // should we try to improve the mesh with opportunistic grafting? + if gs.heartbeatTicks%GossipSubOpportunisticGraftTicks == 0 && len(peers) > 1 { + // Opportunistic grafting works as follows: we check the median score of peers in the + // mesh; if this score is below the opportunisticGraftThreshold, we select a peer at + // random with score over the median. + // The intention is to (slowly) improve an underperforming mesh by introducing good + // scoring peers that may have been gossiping at us. This allows us to get out of sticky + // situations where we are stuck with poor peers and also recover from churn of good peers. + + // first cache scores for new peers that may have been added since the initial score computation + for p := range peers { + _, haveScore := scores[p] + if haveScore { + continue + } + scores[p] = gs.score.Score(p) + } + + // now compute the median peer score in the mesh + plst := peerMapToList(peers) + sort.Slice(plst, func(i, j int) bool { + return scores[plst[i]] < scores[plst[j]] + }) + medianIndex := len(peers) / 2 + medianScore := scores[plst[medianIndex]] + + // if the media score is below the threshold, select a better peer (if any) and GRAFT + if medianScore < gs.opportunisticGraftThreshold { + backoff := gs.backoff[topic] + plst = gs.getPeers(topic, 1, func(p peer.ID) bool { + _, inMesh := peers[p] + _, doBackoff := backoff[p] + _, direct := gs.direct[p] + return !inMesh && !doBackoff && !direct && gs.score.Score(p) > medianScore + }) + + if len(plst) != 0 { + p := plst[0] + log.Debugf("HEARTBEAT: Opportunistically graft peer %s on topic %s", p, topic) + graftPeer(p) + } + } + } + // 2nd arg are mesh peers excluded from gossip. We already push // messages to them, so its redundant to gossip IHAVEs. gs.emitGossip(topic, peers) diff --git a/score_params.go b/score_params.go index 0e537a3..65a1919 100644 --- a/score_params.go +++ b/score_params.go @@ -20,9 +20,13 @@ type PeerScoreThresholds struct { // implementing an effective graylist according to peer score; should be negative and <= PublisThreshold. GraylistThreshold float64 - // acceptPXThreshold is the score threshold below which PX will be ignored; this should be positive + // AcceptPXThreshold is the score threshold below which PX will be ignored; this should be positive // and limited to scores attainable by bootstrappers and other trusted nodes. AcceptPXThreshold float64 + + // OpportunisticGraftThreshold is the median mesh score threshold before triggering opportunistic + // grafting; this should have a small positive value. + OpportunisticGraftThreshold float64 } func (p *PeerScoreThresholds) validate() error { @@ -38,6 +42,9 @@ func (p *PeerScoreThresholds) validate() error { if p.AcceptPXThreshold < 0 { return fmt.Errorf("invalid accept PX threshold; it must be >= 0") } + if p.OpportunisticGraftThreshold < 0 { + return fmt.Errorf("invalid opportunistic grafting threshold; it must be >= 0") + } return nil }