mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-03 05:13:07 +00:00
hook score into heartbeat maintenance
This commit is contained in:
parent
487bbaf09f
commit
80f3b8c45b
73
gossipsub.go
73
gossipsub.go
@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"github.com/libp2p/go-libp2p-core/record"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
||||
@ -24,9 +25,10 @@ const (
|
||||
|
||||
var (
|
||||
// overlay parameters
|
||||
GossipSubD = 6
|
||||
GossipSubDlo = 4
|
||||
GossipSubDhi = 12
|
||||
GossipSubD = 6
|
||||
GossipSubDlo = 5
|
||||
GossipSubDhi = 12
|
||||
GossipSubDscore = 3
|
||||
|
||||
// gossip parameters
|
||||
GossipSubHistoryLength = 5
|
||||
@ -649,23 +651,46 @@ func (gs *GossipSubRouter) heartbeat() {
|
||||
|
||||
tograft := make(map[peer.ID][]string)
|
||||
toprune := make(map[peer.ID][]string)
|
||||
doPX := make(map[peer.ID]bool)
|
||||
noPX := make(map[peer.ID]bool)
|
||||
|
||||
// clean up expired backoffs
|
||||
gs.clearBackoff()
|
||||
|
||||
// maintain the mesh for topics we have joined
|
||||
for topic, peers := range gs.mesh {
|
||||
prunePeer := func(p peer.ID) {
|
||||
gs.tracer.Prune(p, topic)
|
||||
delete(peers, p)
|
||||
gs.untagPeer(p, topic)
|
||||
topics := toprune[p]
|
||||
toprune[p] = append(topics, topic)
|
||||
}
|
||||
|
||||
// compute mesh peer scores
|
||||
scores := make(map[peer.ID]float64)
|
||||
for p := range peers {
|
||||
scores[p] = gs.score.Score(p)
|
||||
}
|
||||
|
||||
// drop all peers with negative score, without PX
|
||||
for p := range peers {
|
||||
if scores[p] < 0 {
|
||||
log.Debugf("HEARTBEAT: Prune peer %s with negative score [score = %f, topic = %s]", p, scores[p], topic)
|
||||
prunePeer(p)
|
||||
noPX[p] = true
|
||||
}
|
||||
}
|
||||
|
||||
// do we have enough peers?
|
||||
if len(peers) < GossipSubDlo {
|
||||
backoff := gs.backoff[topic]
|
||||
ineed := GossipSubD - len(peers)
|
||||
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
|
||||
// filter our current peers and peers we are backing off
|
||||
// filter our current peers, peers we are backing off, and peers with negative score
|
||||
_, inMesh := peers[p]
|
||||
_, doBackoff := backoff[p]
|
||||
return !inMesh && !doBackoff
|
||||
score := gs.score.Score(p)
|
||||
return !inMesh && !doBackoff && score >= 0
|
||||
})
|
||||
|
||||
for _, p := range plst {
|
||||
@ -680,17 +705,19 @@ func (gs *GossipSubRouter) heartbeat() {
|
||||
|
||||
// do we have too many peers?
|
||||
if len(peers) > GossipSubDhi {
|
||||
idontneed := len(peers) - GossipSubD
|
||||
plst := peerMapToList(peers)
|
||||
shufflePeers(plst)
|
||||
|
||||
for _, p := range plst[:idontneed] {
|
||||
// sort by score (but shuffle first for the case we don't use the score)
|
||||
shufflePeers(plst)
|
||||
sort.Slice(plst, func(i, j int) bool {
|
||||
return scores[plst[i]] > scores[plst[j]]
|
||||
})
|
||||
|
||||
// We keep the first D_score peers by score and the remaining up to D_lo randomly
|
||||
shufflePeers(plst[GossipSubDscore:])
|
||||
for _, p := range plst[GossipSubDlo:] {
|
||||
log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", p, topic)
|
||||
gs.tracer.Prune(p, topic)
|
||||
delete(peers, p)
|
||||
gs.untagPeer(p, topic)
|
||||
topics := toprune[p]
|
||||
toprune[p] = append(topics, topic)
|
||||
prunePeer(p)
|
||||
}
|
||||
}
|
||||
|
||||
@ -722,9 +749,10 @@ func (gs *GossipSubRouter) heartbeat() {
|
||||
if len(peers) < GossipSubD {
|
||||
ineed := GossipSubD - len(peers)
|
||||
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
|
||||
// filter our current peers
|
||||
// filter our current peers and peers with negative score
|
||||
_, ok := peers[p]
|
||||
return !ok
|
||||
score := gs.score.Score(p)
|
||||
return !ok && score >= 0
|
||||
})
|
||||
|
||||
for _, p := range plst {
|
||||
@ -738,7 +766,7 @@ func (gs *GossipSubRouter) heartbeat() {
|
||||
}
|
||||
|
||||
// send coalesced GRAFT/PRUNE messages (will piggyback gossip)
|
||||
gs.sendGraftPrune(tograft, toprune, doPX)
|
||||
gs.sendGraftPrune(tograft, toprune, noPX)
|
||||
|
||||
// advance the message history window
|
||||
gs.mcache.Shift()
|
||||
@ -758,7 +786,7 @@ func (gs *GossipSubRouter) clearBackoff() {
|
||||
}
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string, doPX map[peer.ID]bool) {
|
||||
func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string, noPX map[peer.ID]bool) {
|
||||
for p, topics := range tograft {
|
||||
graft := make([]*pb.ControlGraft, 0, len(topics))
|
||||
for _, topic := range topics {
|
||||
@ -771,7 +799,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string,
|
||||
delete(toprune, p)
|
||||
prune = make([]*pb.ControlPrune, 0, len(pruning))
|
||||
for _, topic := range pruning {
|
||||
prune = append(prune, gs.makePrune(p, topic, doPX[p]))
|
||||
prune = append(prune, gs.makePrune(p, topic, !noPX[p]))
|
||||
}
|
||||
}
|
||||
|
||||
@ -782,7 +810,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string,
|
||||
for p, topics := range toprune {
|
||||
prune := make([]*pb.ControlPrune, 0, len(topics))
|
||||
for _, topic := range topics {
|
||||
prune = append(prune, gs.makePrune(p, topic, doPX[p]))
|
||||
prune = append(prune, gs.makePrune(p, topic, !noPX[p]))
|
||||
}
|
||||
|
||||
out := rpcWithControl(nil, nil, nil, nil, prune)
|
||||
@ -799,10 +827,11 @@ func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{}
|
||||
return
|
||||
}
|
||||
|
||||
// Send gossip to D peers, skipping over the exclude set.
|
||||
// Send gossip to D peers, skipping over the exclude set and peers with score below the threshold
|
||||
gpeers := gs.getPeers(topic, GossipSubD, func(p peer.ID) bool {
|
||||
_, ok := exclude[p]
|
||||
return !ok
|
||||
score := gs.score.Score(p)
|
||||
return !ok && score >= gs.gossipThreshold
|
||||
})
|
||||
|
||||
// Emit the IHAVE gossip to the selected peers.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user