From fc38f556a308e5d9ac891f46a22fe04a45635f53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 1 Apr 2020 23:20:53 +0100 Subject: [PATCH] comments and nits. --- gossipsub.go | 11 +++++----- score.go | 56 +++++++++++++++++++++++++++++++++++++++---------- score_params.go | 2 +- 3 files changed, 51 insertions(+), 18 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index 16c0774..dbc88d1 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -381,7 +381,7 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb. // we don't respond to IWANT requests from any peer whose score is below the gossip threshold score := gs.score.Score(p) if score < gs.gossipThreshold { - log.Debugf("IWANT: ignorin peer %s with score below threshold [score = %f]", p, score) + log.Debugf("IWANT: ignoring peer %s with score below threshold [score = %f]", p, score) return nil } @@ -429,7 +429,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb. if !ok { // don't do PX when there is an unknown topic to avoid leaking our peers doPX = false - // spam harndening: ignore GRAFTs for unknown topics + // spam hardening: ignore GRAFTs for unknown topics continue } @@ -467,11 +467,10 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb. continue } - log.Debugf("GRAFT: Add mesh link from %s in %s", p, topic) + log.Debugf("GRAFT: add mesh link from %s in %s", p, topic) gs.tracer.Graft(p, topic) peers[p] = struct{}{} gs.tagPeer(p, topic) - } if len(prune) == 0 { @@ -875,9 +874,9 @@ func (gs *GossipSubRouter) heartbeat() { } // do we have enough peers? - if len(peers) < GossipSubDlo { + if l := len(peers); l < GossipSubDlo { backoff := gs.backoff[topic] - ineed := GossipSubD - len(peers) + ineed := GossipSubD - l plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { // filter our current and direct peers, peers we are backing off, and peers with negative score _, inMesh := peers[p] diff --git a/score.go b/score.go index 4673e3e..f373181 100644 --- a/score.go +++ b/score.go @@ -64,7 +64,7 @@ type peerScore struct { // per peer stats for score calculation peerStats map[peer.ID]*peerStats - // IP colocation tracking + // IP colocation tracking; maps IP => set of peers. peerIPs map[string]map[peer.ID]struct{} // message delivery tracking @@ -78,6 +78,8 @@ type peerScore struct { inspectPeriod time.Duration } +var _ scoreTracer = (*peerScore)(nil) + type messageDeliveries struct { records map[string]*deliveryRecord @@ -173,9 +175,6 @@ func (ps *peerScore) score(p peer.ID) float64 { // topic scores for topic, tstats := range pstats.topics { - // the topic score - var topicScore float64 - // the topic parameters topicParams, ok := ps.params.Topics[topic] if !ok { @@ -183,6 +182,9 @@ func (ps *peerScore) score(p peer.ID) float64 { continue } + // the topic score + var topicScore float64 + // P1: time in Mesh if tstats.inMesh { p1 := float64(tstats.meshTime / topicParams.TimeInMeshQuantum) @@ -206,10 +208,12 @@ func (ps *peerScore) score(p peer.ID) float64 { } // P3b: + // NOTE: the weight of P3b is negative (validated in TopicScoreParams.validate), so this detracts. p3b := tstats.meshFailurePenalty topicScore += p3b * topicParams.MeshFailurePenaltyWeight // P4: invalid messages + // NOTE: the weight of P4 is negative (validated in TopicScoreParams.validate), so this detracts. p4 := tstats.invalidMessageDeliveries topicScore += p4 * topicParams.InvalidMessageDeliveriesWeight @@ -233,6 +237,10 @@ func (ps *peerScore) score(p peer.ID) float64 { continue } + // P6 has a cliff (IPColocationFactorThreshold); it's only applied iff + // at least that many peers are connected to us from that source IP + // addr. It is quadratic, and the weight is negative (validated by + // PeerScoreParams.validate). peersInIP := len(ps.peerIPs[ip]) if peersInIP > ps.params.IPColocationFactorThreshold { surpluss := float64(peersInIP - ps.params.IPColocationFactorThreshold) @@ -284,6 +292,7 @@ func (ps *peerScore) background(ctx context.Context) { } } +// inspectScores dumps all tracked scores into the inspect function. func (ps *peerScore) inspectScores() { ps.Lock() scores := make(map[peer.ID]float64, len(ps.peerStats)) @@ -292,9 +301,15 @@ func (ps *peerScore) inspectScores() { } ps.Unlock() - ps.inspect(scores) + // Since this is a user-injected function, it could be performing I/O, and + // we don't want to block the scorer's background loop. Therefore, we launch + // it in a separate goroutine. If the function needs to synchronise, it + // should do so locally. + go ps.inspect(scores) } +// refreshScores decays scores, and purges score records for disconnected peers, +// once their expiry has elapsed. func (ps *peerScore) refreshScores() { ps.Lock() defer ps.Unlock() @@ -352,11 +367,17 @@ func (ps *peerScore) refreshScores() { } } +// refreshIPs refreshes IPs we know of peers we're tracking. func (ps *peerScore) refreshIPs() { ps.Lock() defer ps.Unlock() // peer IPs may change, so we periodically refresh them + // + // TODO: it could be more efficient to collect connections for all peers + // from the Network, populate a new map, and replace it in place. We are + // incurring in those allocs anyway, and maybe even in more, in the form of + // slices. for p, pstats := range ps.peerStats { if pstats.connected { ips := ps.getIPs(p) @@ -619,7 +640,9 @@ func (d *messageDeliveries) gc() { } } -// utilities +// getTopicStats returns existing topic stats for a given a given (peer, topic) +// tuple, or initialises a new topicStats object and inserts it in the +// peerStats, iff the topic is scored. func (pstats *peerStats) getTopicStats(topic string, params *PeerScoreParams) (*topicStats, bool) { tstats, ok := pstats.topics[topic] if ok { @@ -637,6 +660,8 @@ func (pstats *peerStats) getTopicStats(topic string, params *PeerScoreParams) (* return tstats, true } +// markInvalidMessageDelivery increments the "invalid message deliveries" +// counter for all scored topics the message is published in. func (ps *peerScore) markInvalidMessageDelivery(p peer.ID, msg *Message) { pstats, ok := ps.peerStats[p] if !ok { @@ -653,6 +678,9 @@ func (ps *peerScore) markInvalidMessageDelivery(p peer.ID, msg *Message) { } } +// markFirstMessageDelivery increments the "first message deliveries" counter +// for all scored topics the message is published in, as well as the "mesh +// message deliveries" counter, if the peer is in the mesh for the topic. func (ps *peerScore) markFirstMessageDelivery(p peer.ID, msg *Message) { pstats, ok := ps.peerStats[p] if !ok { @@ -683,6 +711,9 @@ func (ps *peerScore) markFirstMessageDelivery(p peer.ID, msg *Message) { } } +// markDuplicateMessageDelivery increments the "mesh message deliveries" counter +// for messages we've seen before, as long the message was received within the +// P3 window. func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, validated time.Time) { var now time.Time @@ -705,14 +736,16 @@ func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, valid continue } + tparams := ps.params.Topics[topic] + // check against the mesh delivery window -- if the validated time is passed as 0, then // the message was received before we finished validation and thus falls within the mesh // delivery window. - if !validated.IsZero() && now.After(validated.Add(ps.params.Topics[topic].MeshMessageDeliveriesWindow)) { + if !validated.IsZero() && now.After(validated.Add(tparams.MeshMessageDeliveriesWindow)) { continue } - cap := ps.params.Topics[topic].MeshMessageDeliveriesCap + cap := tparams.MeshMessageDeliveriesCap tstats.meshMessageDeliveries += 1 if tstats.meshMessageDeliveries > cap { tstats.meshMessageDeliveries = cap @@ -720,7 +753,7 @@ func (ps *peerScore) markDuplicateMessageDelivery(p peer.ID, msg *Message, valid } } -// gets the current IPs for a peer +// getIPs gets the current IPs for a peer. func (ps *peerScore) getIPs(p peer.ID) []string { // in unit tests this can be nil if ps.host == nil { @@ -756,7 +789,8 @@ func (ps *peerScore) getIPs(p peer.ID) []string { return res } -// adds tracking for the new IPs in the list, and removes tracking from the obsolete ips. +// setIPs adds tracking for the new IPs in the list, and removes tracking from +// the obsolete IPs. func (ps *peerScore) setIPs(p peer.ID, newips, oldips []string) { addNewIPs: // add the new IPs to the tracking @@ -797,7 +831,7 @@ removeOldIPs: } } -// removes an IP list from the tracking list +// removeIPs removes an IP list from the tracking list for a peer. func (ps *peerScore) removeIPs(p peer.ID, ips []string) { for _, ip := range ips { peers, ok := ps.peerIPs[ip] diff --git a/score_params.go b/score_params.go index 5bbb344..0e537a3 100644 --- a/score_params.go +++ b/score_params.go @@ -21,7 +21,7 @@ type PeerScoreThresholds struct { GraylistThreshold float64 // acceptPXThreshold is the score threshold below which PX will be ignored; this should be positive - // and limited to scores attainable by bootstrappers and other trusted nodes. + // and limited to scores attainable by bootstrappers and other trusted nodes. AcceptPXThreshold float64 }