mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-04 05:43:06 +00:00
cache scores throughout the heartbeat
This commit is contained in:
parent
7890c5a458
commit
09e6d9e1e3
44
gossipsub.go
44
gossipsub.go
@ -1146,6 +1146,17 @@ func (gs *GossipSubRouter) heartbeat() {
|
|||||||
// ensure direct peers are connected
|
// ensure direct peers are connected
|
||||||
gs.directConnect()
|
gs.directConnect()
|
||||||
|
|
||||||
|
// cache scores throughout the heartbeat
|
||||||
|
scores := make(map[peer.ID]float64)
|
||||||
|
score := func(p peer.ID) float64 {
|
||||||
|
s, ok := scores[p]
|
||||||
|
if !ok {
|
||||||
|
s = gs.score.Score(p)
|
||||||
|
scores[p] = s
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
// maintain the mesh for topics we have joined
|
// maintain the mesh for topics we have joined
|
||||||
for topic, peers := range gs.mesh {
|
for topic, peers := range gs.mesh {
|
||||||
prunePeer := func(p peer.ID) {
|
prunePeer := func(p peer.ID) {
|
||||||
@ -1166,16 +1177,10 @@ func (gs *GossipSubRouter) heartbeat() {
|
|||||||
tograft[p] = append(topics, topic)
|
tograft[p] = append(topics, topic)
|
||||||
}
|
}
|
||||||
|
|
||||||
// compute mesh peer scores
|
|
||||||
scores := make(map[peer.ID]float64)
|
|
||||||
for p := range peers {
|
|
||||||
scores[p] = gs.score.Score(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
// drop all peers with negative score, without PX
|
// drop all peers with negative score, without PX
|
||||||
for p := range peers {
|
for p := range peers {
|
||||||
if scores[p] < 0 {
|
if score(p) < 0 {
|
||||||
log.Debugf("HEARTBEAT: Prune peer %s with negative score [score = %f, topic = %s]", p, scores[p], topic)
|
log.Debugf("HEARTBEAT: Prune peer %s with negative score [score = %f, topic = %s]", p, score(p), topic)
|
||||||
prunePeer(p)
|
prunePeer(p)
|
||||||
noPX[p] = true
|
noPX[p] = true
|
||||||
}
|
}
|
||||||
@ -1190,7 +1195,7 @@ func (gs *GossipSubRouter) heartbeat() {
|
|||||||
_, inMesh := peers[p]
|
_, inMesh := peers[p]
|
||||||
_, doBackoff := backoff[p]
|
_, doBackoff := backoff[p]
|
||||||
_, direct := gs.direct[p]
|
_, direct := gs.direct[p]
|
||||||
return !inMesh && !doBackoff && !direct && gs.score.Score(p) >= 0
|
return !inMesh && !doBackoff && !direct && score(p) >= 0
|
||||||
})
|
})
|
||||||
|
|
||||||
for _, p := range plst {
|
for _, p := range plst {
|
||||||
@ -1205,7 +1210,7 @@ func (gs *GossipSubRouter) heartbeat() {
|
|||||||
// sort by score (but shuffle first for the case we don't use the score)
|
// sort by score (but shuffle first for the case we don't use the score)
|
||||||
shufflePeers(plst)
|
shufflePeers(plst)
|
||||||
sort.Slice(plst, func(i, j int) bool {
|
sort.Slice(plst, func(i, j int) bool {
|
||||||
return scores[plst[i]] > scores[plst[j]]
|
return score(plst[i]) > score(plst[j])
|
||||||
})
|
})
|
||||||
|
|
||||||
// We keep the first D_score peers by score and the remaining up to D randomly
|
// We keep the first D_score peers by score and the remaining up to D randomly
|
||||||
@ -1262,7 +1267,7 @@ func (gs *GossipSubRouter) heartbeat() {
|
|||||||
_, inMesh := peers[p]
|
_, inMesh := peers[p]
|
||||||
_, doBackoff := backoff[p]
|
_, doBackoff := backoff[p]
|
||||||
_, direct := gs.direct[p]
|
_, direct := gs.direct[p]
|
||||||
return !inMesh && !doBackoff && !direct && gs.outbound[p] && gs.score.Score(p) >= 0
|
return !inMesh && !doBackoff && !direct && gs.outbound[p] && score(p) >= 0
|
||||||
})
|
})
|
||||||
|
|
||||||
for _, p := range plst {
|
for _, p := range plst {
|
||||||
@ -1280,19 +1285,10 @@ func (gs *GossipSubRouter) heartbeat() {
|
|||||||
// scoring peers that may have been gossiping at us. This allows us to get out of sticky
|
// scoring peers that may have been gossiping at us. This allows us to get out of sticky
|
||||||
// situations where we are stuck with poor peers and also recover from churn of good peers.
|
// situations where we are stuck with poor peers and also recover from churn of good peers.
|
||||||
|
|
||||||
// first cache scores for new peers that may have been added since the initial score computation
|
|
||||||
for p := range peers {
|
|
||||||
_, haveScore := scores[p]
|
|
||||||
if haveScore {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
scores[p] = gs.score.Score(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
// now compute the median peer score in the mesh
|
// now compute the median peer score in the mesh
|
||||||
plst := peerMapToList(peers)
|
plst := peerMapToList(peers)
|
||||||
sort.Slice(plst, func(i, j int) bool {
|
sort.Slice(plst, func(i, j int) bool {
|
||||||
return scores[plst[i]] < scores[plst[j]]
|
return score(plst[i]) < score(plst[j])
|
||||||
})
|
})
|
||||||
medianIndex := len(peers) / 2
|
medianIndex := len(peers) / 2
|
||||||
medianScore := scores[plst[medianIndex]]
|
medianScore := scores[plst[medianIndex]]
|
||||||
@ -1304,7 +1300,7 @@ func (gs *GossipSubRouter) heartbeat() {
|
|||||||
_, inMesh := peers[p]
|
_, inMesh := peers[p]
|
||||||
_, doBackoff := backoff[p]
|
_, doBackoff := backoff[p]
|
||||||
_, direct := gs.direct[p]
|
_, direct := gs.direct[p]
|
||||||
return !inMesh && !doBackoff && !direct && gs.score.Score(p) > medianScore
|
return !inMesh && !doBackoff && !direct && score(p) > medianScore
|
||||||
})
|
})
|
||||||
|
|
||||||
for _, p := range plst {
|
for _, p := range plst {
|
||||||
@ -1333,7 +1329,7 @@ func (gs *GossipSubRouter) heartbeat() {
|
|||||||
// check whether our peers are still in the topic and have a score above the publish threshold
|
// check whether our peers are still in the topic and have a score above the publish threshold
|
||||||
for p := range peers {
|
for p := range peers {
|
||||||
_, ok := gs.p.topics[topic][p]
|
_, ok := gs.p.topics[topic][p]
|
||||||
if !ok || gs.score.Score(p) < gs.publishThreshold {
|
if !ok || score(p) < gs.publishThreshold {
|
||||||
delete(peers, p)
|
delete(peers, p)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1345,7 +1341,7 @@ func (gs *GossipSubRouter) heartbeat() {
|
|||||||
// filter our current and direct peers and peers with score above the publish threshold
|
// filter our current and direct peers and peers with score above the publish threshold
|
||||||
_, inFanout := peers[p]
|
_, inFanout := peers[p]
|
||||||
_, direct := gs.direct[p]
|
_, direct := gs.direct[p]
|
||||||
return !inFanout && !direct && gs.score.Score(p) >= gs.publishThreshold
|
return !inFanout && !direct && score(p) >= gs.publishThreshold
|
||||||
})
|
})
|
||||||
|
|
||||||
for _, p := range plst {
|
for _, p := range plst {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user