comments and nits.

This commit is contained in:
Raúl Kripalani 2020-04-01 23:20:53 +01:00 committed by vyzo
parent 8809484a47
commit fc38f556a3
3 changed files with 51 additions and 18 deletions

View File

@ -381,7 +381,7 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.
// we don't respond to IWANT requests from any peer whose score is below the gossip threshold // we don't respond to IWANT requests from any peer whose score is below the gossip threshold
score := gs.score.Score(p) score := gs.score.Score(p)
if score < gs.gossipThreshold { if score < gs.gossipThreshold {
log.Debugf("IWANT: ignorin peer %s with score below threshold [score = %f]", p, score) log.Debugf("IWANT: ignoring peer %s with score below threshold [score = %f]", p, score)
return nil return nil
} }
@ -429,7 +429,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
if !ok { if !ok {
// don't do PX when there is an unknown topic to avoid leaking our peers // don't do PX when there is an unknown topic to avoid leaking our peers
doPX = false doPX = false
// spam harndening: ignore GRAFTs for unknown topics // spam hardening: ignore GRAFTs for unknown topics
continue continue
} }
@ -467,11 +467,10 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
continue continue
} }
log.Debugf("GRAFT: Add mesh link from %s in %s", p, topic) log.Debugf("GRAFT: add mesh link from %s in %s", p, topic)
gs.tracer.Graft(p, topic) gs.tracer.Graft(p, topic)
peers[p] = struct{}{} peers[p] = struct{}{}
gs.tagPeer(p, topic) gs.tagPeer(p, topic)
} }
if len(prune) == 0 { if len(prune) == 0 {
@ -875,9 +874,9 @@ func (gs *GossipSubRouter) heartbeat() {
} }
// do we have enough peers? // do we have enough peers?
if len(peers) < GossipSubDlo { if l := len(peers); l < GossipSubDlo {
backoff := gs.backoff[topic] backoff := gs.backoff[topic]
ineed := GossipSubD - len(peers) ineed := GossipSubD - l
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
// filter our current and direct peers, peers we are backing off, and peers with negative score // filter our current and direct peers, peers we are backing off, and peers with negative score
_, inMesh := peers[p] _, inMesh := peers[p]

View File

@ -64,7 +64,7 @@ type peerScore struct {
// per peer stats for score calculation // per peer stats for score calculation
peerStats map[peer.ID]*peerStats peerStats map[peer.ID]*peerStats
// IP colocation tracking // IP colocation tracking; maps IP => set of peers.
peerIPs map[string]map[peer.ID]struct{} peerIPs map[string]map[peer.ID]struct{}
// message delivery tracking // message delivery tracking
@ -78,6 +78,8 @@ type peerScore struct {
inspectPeriod time.Duration inspectPeriod time.Duration
} }
var _ scoreTracer = (*peerScore)(nil)
type messageDeliveries struct { type messageDeliveries struct {
records map[string]*deliveryRecord records map[string]*deliveryRecord
@ -173,9 +175,6 @@ func (ps *peerScore) score(p peer.ID) float64 {
// topic scores // topic scores
for topic, tstats := range pstats.topics { for topic, tstats := range pstats.topics {
// the topic score
var topicScore float64
// the topic parameters // the topic parameters
topicParams, ok := ps.params.Topics[topic] topicParams, ok := ps.params.Topics[topic]
if !ok { if !ok {
@ -183,6 +182,9 @@ func (ps *peerScore) score(p peer.ID) float64 {
continue continue
} }
// the topic score
var topicScore float64
// P1: time in Mesh // P1: time in Mesh
if tstats.inMesh { if tstats.inMesh {
p1 := float64(tstats.meshTime / topicParams.TimeInMeshQuantum) p1 := float64(tstats.meshTime / topicParams.TimeInMeshQuantum)
@ -206,10 +208,12 @@ func (ps *peerScore) score(p peer.ID) float64 {
} }
// P3b: // P3b:
// NOTE: the weight of P3b is negative (validated in TopicScoreParams.validate), so this detracts.
p3b := tstats.meshFailurePenalty p3b := tstats.meshFailurePenalty
topicScore += p3b * topicParams.MeshFailurePenaltyWeight topicScore += p3b * topicParams.MeshFailurePenaltyWeight
// P4: invalid messages // P4: invalid messages
// NOTE: the weight of P4 is negative (validated in TopicScoreParams.validate), so this detracts.
p4 := tstats.invalidMessageDeliveries p4 := tstats.invalidMessageDeliveries
topicScore += p4 * topicParams.InvalidMessageDeliveriesWeight topicScore += p4 * topicParams.InvalidMessageDeliveriesWeight
@ -233,6 +237,10 @@ func (ps *peerScore) score(p peer.ID) float64 {
continue continue
} }
// P6 has a cliff (IPColocationFactorThreshold); it's only applied iff
// at least that many peers are connected to us from that source IP
// addr. It is quadratic, and the weight is negative (validated by
// PeerScoreParams.validate).
peersInIP := len(ps.peerIPs[ip]) peersInIP := len(ps.peerIPs[ip])
if peersInIP > ps.params.IPColocationFactorThreshold { if peersInIP > ps.params.IPColocationFactorThreshold {
surpluss := float64(peersInIP - ps.params.IPColocationFactorThreshold) surpluss := float64(peersInIP - ps.params.IPColocationFactorThreshold)
@ -284,6 +292,7 @@ func (ps *peerScore) background(ctx context.Context) {
} }
} }
// inspectScores dumps all tracked scores into the inspect function.
func (ps *peerScore) inspectScores() { func (ps *peerScore) inspectScores() {
ps.Lock() ps.Lock()
scores := make(map[peer.ID]float64, len(ps.peerStats)) scores := make(map[peer.ID]float64, len(ps.peerStats))
@ -292,9 +301,15 @@ func (ps *peerScore) inspectScores() {
} }
ps.Unlock() ps.Unlock()
ps.inspect(scores) // Since this is a user-injected function, it could be performing I/O, and
// we don't want to block the scorer's background loop. Therefore, we launch
// it in a separate goroutine. If the function needs to synchronise, it
// should do so locally.
go ps.inspect(scores)
} }
// refreshScores decays scores, and purges score records for disconnected peers,
// once their expiry has elapsed.
func (ps *peerScore) refreshScores() { func (ps *peerScore) refreshScores() {
ps.Lock() ps.Lock()
defer ps.Unlock() defer ps.Unlock()
@ -352,11 +367,17 @@ func (ps *peerScore) refreshScores() {
} }
} }
// refreshIPs refreshes IPs we know of peers we're tracking.
func (ps *peerScore) refreshIPs() { func (ps *peerScore) refreshIPs() {
ps.Lock() ps.Lock()
defer ps.Unlock() defer ps.Unlock()
// peer IPs may change, so we periodically refresh them // peer IPs may change, so we periodically refresh them
//
// TODO: it could be more efficient to collect connections for all peers
// from the Network, populate a new map, and replace it in place. We are
// incurring in those allocs anyway, and maybe even in more, in the form of
// slices.
for p, pstats := range ps.peerStats { for p, pstats := range ps.peerStats {
if pstats.connected { if pstats.connected {
ips := ps.getIPs(p) ips := ps.getIPs(p)
@ -619,7 +640,9 @@ func (d *messageDeliveries) gc() {
} }
} }
// utilities // getTopicStats returns existing topic stats for a given a given (peer, topic)
// tuple, or initialises a new topicStats object and inserts it in the
// peerStats, iff the topic is scored.
func (pstats *peerStats) getTopicStats(topic string, params *PeerScoreParams) (*topicStats, bool) { func (pstats *peerStats) getTopicStats(topic string, params *PeerScoreParams) (*topicStats, bool) {
tstats, ok := pstats.topics[topic] tstats, ok := pstats.topics[topic]
if ok { if ok {
@ -637,6 +660,8 @@ func (pstats *peerStats) getTopicStats(topic string, params *PeerScoreParams) (*
return tstats, true return tstats, true
} }
// markInvalidMessageDelivery increments the "invalid message deliveries"
// counter for all scored topics the message is published in.
func (ps *peerScore) markInvalidMessageDelivery(p peer.ID, msg *Message) { func (ps *peerScore) markInvalidMessageDelivery(p peer.ID, msg *Message) {
pstats, ok := ps.peerStats[p] pstats, ok := ps.peerStats[p]
if !ok { if !ok {
@ -653,6 +678,9 @@ func (ps *peerScore) markInvalidMessageDelivery(p peer.ID, msg *Message) {
} }
} }
// markFirstMessageDelivery increments the "first message deliveries" counter
// for all scored topics the message is published in, as well as the "mesh
// message deliveries" counter, if the peer is in the mesh for the topic.
func (ps *peerScore) markFirstMessageDelivery(p peer.ID, msg *Message) { func (ps *peerScore) markFirstMessageDelivery(p peer.ID, msg *Message) {
pstats, ok := ps.peerStats[p] pstats, ok := ps.peerStats[p]
if !ok { if !ok {
@ -683,6 +711,9 @@ func (ps *peerScore) markFirstMessageDelivery(p peer.ID, msg *Message) {
} }
} }
// markDuplicateMessageDelivery increments the "mesh message deliveries" counter
// for messages we've seen before, as long the message was received within the
// P3 window.
func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, validated time.Time) { func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, validated time.Time) {
var now time.Time var now time.Time
@ -705,14 +736,16 @@ func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, valid
continue continue
} }
tparams := ps.params.Topics[topic]
// check against the mesh delivery window -- if the validated time is passed as 0, then // check against the mesh delivery window -- if the validated time is passed as 0, then
// the message was received before we finished validation and thus falls within the mesh // the message was received before we finished validation and thus falls within the mesh
// delivery window. // delivery window.
if !validated.IsZero() && now.After(validated.Add(ps.params.Topics[topic].MeshMessageDeliveriesWindow)) { if !validated.IsZero() && now.After(validated.Add(tparams.MeshMessageDeliveriesWindow)) {
continue continue
} }
cap := ps.params.Topics[topic].MeshMessageDeliveriesCap cap := tparams.MeshMessageDeliveriesCap
tstats.meshMessageDeliveries += 1 tstats.meshMessageDeliveries += 1
if tstats.meshMessageDeliveries > cap { if tstats.meshMessageDeliveries > cap {
tstats.meshMessageDeliveries = cap tstats.meshMessageDeliveries = cap
@ -720,7 +753,7 @@ func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, valid
} }
} }
// gets the current IPs for a peer // getIPs gets the current IPs for a peer.
func (ps *peerScore) getIPs(p peer.ID) []string { func (ps *peerScore) getIPs(p peer.ID) []string {
// in unit tests this can be nil // in unit tests this can be nil
if ps.host == nil { if ps.host == nil {
@ -756,7 +789,8 @@ func (ps *peerScore) getIPs(p peer.ID) []string {
return res return res
} }
// adds tracking for the new IPs in the list, and removes tracking from the obsolete ips. // setIPs adds tracking for the new IPs in the list, and removes tracking from
// the obsolete IPs.
func (ps *peerScore) setIPs(p peer.ID, newips, oldips []string) { func (ps *peerScore) setIPs(p peer.ID, newips, oldips []string) {
addNewIPs: addNewIPs:
// add the new IPs to the tracking // add the new IPs to the tracking
@ -797,7 +831,7 @@ removeOldIPs:
} }
} }
// removes an IP list from the tracking list // removeIPs removes an IP list from the tracking list for a peer.
func (ps *peerScore) removeIPs(p peer.ID, ips []string) { func (ps *peerScore) removeIPs(p peer.ID, ips []string) {
for _, ip := range ips { for _, ip := range ips {
peers, ok := ps.peerIPs[ip] peers, ok := ps.peerIPs[ip]

View File

@ -21,7 +21,7 @@ type PeerScoreThresholds struct {
GraylistThreshold float64 GraylistThreshold float64
// acceptPXThreshold is the score threshold below which PX will be ignored; this should be positive // acceptPXThreshold is the score threshold below which PX will be ignored; this should be positive
// and limited to scores attainable by bootstrappers and other trusted nodes. // and limited to scores attainable by bootstrappers and other trusted nodes.
AcceptPXThreshold float64 AcceptPXThreshold float64
} }