diff --git a/gossipsub.go b/gossipsub.go index a183f18..9de15d6 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -1146,6 +1146,17 @@ func (gs *GossipSubRouter) heartbeat() { // ensure direct peers are connected gs.directConnect() + // cache scores throughout the heartbeat + scores := make(map[peer.ID]float64) + score := func(p peer.ID) float64 { + s, ok := scores[p] + if !ok { + s = gs.score.Score(p) + scores[p] = s + } + return s + } + // maintain the mesh for topics we have joined for topic, peers := range gs.mesh { prunePeer := func(p peer.ID) { @@ -1166,16 +1177,10 @@ func (gs *GossipSubRouter) heartbeat() { tograft[p] = append(topics, topic) } - // compute mesh peer scores - scores := make(map[peer.ID]float64) - for p := range peers { - scores[p] = gs.score.Score(p) - } - // drop all peers with negative score, without PX for p := range peers { - if scores[p] < 0 { - log.Debugf("HEARTBEAT: Prune peer %s with negative score [score = %f, topic = %s]", p, scores[p], topic) + if score(p) < 0 { + log.Debugf("HEARTBEAT: Prune peer %s with negative score [score = %f, topic = %s]", p, score(p), topic) prunePeer(p) noPX[p] = true } @@ -1190,7 +1195,7 @@ func (gs *GossipSubRouter) heartbeat() { _, inMesh := peers[p] _, doBackoff := backoff[p] _, direct := gs.direct[p] - return !inMesh && !doBackoff && !direct && gs.score.Score(p) >= 0 + return !inMesh && !doBackoff && !direct && score(p) >= 0 }) for _, p := range plst { @@ -1205,7 +1210,7 @@ func (gs *GossipSubRouter) heartbeat() { // sort by score (but shuffle first for the case we don't use the score) shufflePeers(plst) sort.Slice(plst, func(i, j int) bool { - return scores[plst[i]] > scores[plst[j]] + return score(plst[i]) > score(plst[j]) }) // We keep the first D_score peers by score and the remaining up to D randomly @@ -1262,7 +1267,7 @@ func (gs *GossipSubRouter) heartbeat() { _, inMesh := peers[p] _, doBackoff := backoff[p] _, direct := gs.direct[p] - return !inMesh && !doBackoff && !direct && gs.outbound[p] && gs.score.Score(p) >= 0 + return !inMesh && !doBackoff && !direct && gs.outbound[p] && score(p) >= 0 }) for _, p := range plst { @@ -1280,19 +1285,10 @@ func (gs *GossipSubRouter) heartbeat() { // scoring peers that may have been gossiping at us. This allows us to get out of sticky // situations where we are stuck with poor peers and also recover from churn of good peers. - // first cache scores for new peers that may have been added since the initial score computation - for p := range peers { - _, haveScore := scores[p] - if haveScore { - continue - } - scores[p] = gs.score.Score(p) - } - // now compute the median peer score in the mesh plst := peerMapToList(peers) sort.Slice(plst, func(i, j int) bool { - return scores[plst[i]] < scores[plst[j]] + return score(plst[i]) < score(plst[j]) }) medianIndex := len(peers) / 2 medianScore := scores[plst[medianIndex]] @@ -1304,7 +1300,7 @@ func (gs *GossipSubRouter) heartbeat() { _, inMesh := peers[p] _, doBackoff := backoff[p] _, direct := gs.direct[p] - return !inMesh && !doBackoff && !direct && gs.score.Score(p) > medianScore + return !inMesh && !doBackoff && !direct && score(p) > medianScore }) for _, p := range plst { @@ -1333,7 +1329,7 @@ func (gs *GossipSubRouter) heartbeat() { // check whether our peers are still in the topic and have a score above the publish threshold for p := range peers { _, ok := gs.p.topics[topic][p] - if !ok || gs.score.Score(p) < gs.publishThreshold { + if !ok || score(p) < gs.publishThreshold { delete(peers, p) } } @@ -1345,7 +1341,7 @@ func (gs *GossipSubRouter) heartbeat() { // filter our current and direct peers and peers with score above the publish threshold _, inFanout := peers[p] _, direct := gs.direct[p] - return !inFanout && !direct && gs.score.Score(p) >= gs.publishThreshold + return !inFanout && !direct && score(p) >= gs.publishThreshold }) for _, p := range plst {