From 80f3b8c45b6f9690b0ccd02aa56fd47af16ac91c Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 2 Mar 2020 15:55:25 +0200 Subject: [PATCH] hook score into heartbeat maintenance --- gossipsub.go | 73 ++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 22 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index e86c33b..4699978 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/libp2p/go-libp2p-core/record" "math/rand" + "sort" "time" pb "github.com/libp2p/go-libp2p-pubsub/pb" @@ -24,9 +25,10 @@ const ( var ( // overlay parameters - GossipSubD = 6 - GossipSubDlo = 4 - GossipSubDhi = 12 + GossipSubD = 6 + GossipSubDlo = 5 + GossipSubDhi = 12 + GossipSubDscore = 3 // gossip parameters GossipSubHistoryLength = 5 @@ -649,23 +651,46 @@ func (gs *GossipSubRouter) heartbeat() { tograft := make(map[peer.ID][]string) toprune := make(map[peer.ID][]string) - doPX := make(map[peer.ID]bool) + noPX := make(map[peer.ID]bool) // clean up expired backoffs gs.clearBackoff() // maintain the mesh for topics we have joined for topic, peers := range gs.mesh { + prunePeer := func(p peer.ID) { + gs.tracer.Prune(p, topic) + delete(peers, p) + gs.untagPeer(p, topic) + topics := toprune[p] + toprune[p] = append(topics, topic) + } + + // compute mesh peer scores + scores := make(map[peer.ID]float64) + for p := range peers { + scores[p] = gs.score.Score(p) + } + + // drop all peers with negative score, without PX + for p := range peers { + if scores[p] < 0 { + log.Debugf("HEARTBEAT: Prune peer %s with negative score [score = %f, topic = %s]", p, scores[p], topic) + prunePeer(p) + noPX[p] = true + } + } // do we have enough peers? if len(peers) < GossipSubDlo { backoff := gs.backoff[topic] ineed := GossipSubD - len(peers) plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { - // filter our current peers and peers we are backing off + // filter our current peers, peers we are backing off, and peers with negative score _, inMesh := peers[p] _, doBackoff := backoff[p] - return !inMesh && !doBackoff + score := gs.score.Score(p) + return !inMesh && !doBackoff && score >= 0 }) for _, p := range plst { @@ -680,17 +705,19 @@ func (gs *GossipSubRouter) heartbeat() { // do we have too many peers? if len(peers) > GossipSubDhi { - idontneed := len(peers) - GossipSubD plst := peerMapToList(peers) - shufflePeers(plst) - for _, p := range plst[:idontneed] { + // sort by score (but shuffle first for the case we don't use the score) + shufflePeers(plst) + sort.Slice(plst, func(i, j int) bool { + return scores[plst[i]] > scores[plst[j]] + }) + + // We keep the first D_score peers by score and the remaining up to D_lo randomly + shufflePeers(plst[GossipSubDscore:]) + for _, p := range plst[GossipSubDlo:] { log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", p, topic) - gs.tracer.Prune(p, topic) - delete(peers, p) - gs.untagPeer(p, topic) - topics := toprune[p] - toprune[p] = append(topics, topic) + prunePeer(p) } } @@ -722,9 +749,10 @@ func (gs *GossipSubRouter) heartbeat() { if len(peers) < GossipSubD { ineed := GossipSubD - len(peers) plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { - // filter our current peers + // filter our current peers and peers with negative score _, ok := peers[p] - return !ok + score := gs.score.Score(p) + return !ok && score >= 0 }) for _, p := range plst { @@ -738,7 +766,7 @@ func (gs *GossipSubRouter) heartbeat() { } // send coalesced GRAFT/PRUNE messages (will piggyback gossip) - gs.sendGraftPrune(tograft, toprune, doPX) + gs.sendGraftPrune(tograft, toprune, noPX) // advance the message history window gs.mcache.Shift() @@ -758,7 +786,7 @@ func (gs *GossipSubRouter) clearBackoff() { } } -func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string, doPX map[peer.ID]bool) { +func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string, noPX map[peer.ID]bool) { for p, topics := range tograft { graft := make([]*pb.ControlGraft, 0, len(topics)) for _, topic := range topics { @@ -771,7 +799,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string, delete(toprune, p) prune = make([]*pb.ControlPrune, 0, len(pruning)) for _, topic := range pruning { - prune = append(prune, gs.makePrune(p, topic, doPX[p])) + prune = append(prune, gs.makePrune(p, topic, !noPX[p])) } } @@ -782,7 +810,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string, for p, topics := range toprune { prune := make([]*pb.ControlPrune, 0, len(topics)) for _, topic := range topics { - prune = append(prune, gs.makePrune(p, topic, doPX[p])) + prune = append(prune, gs.makePrune(p, topic, !noPX[p])) } out := rpcWithControl(nil, nil, nil, nil, prune) @@ -799,10 +827,11 @@ func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{} return } - // Send gossip to D peers, skipping over the exclude set. + // Send gossip to D peers, skipping over the exclude set and peers with score below the threshold gpeers := gs.getPeers(topic, GossipSubD, func(p peer.ID) bool { _, ok := exclude[p] - return !ok + score := gs.score.Score(p) + return !ok && score >= gs.gossipThreshold }) // Emit the IHAVE gossip to the selected peers.