From cce1f8a107d9a28560982a74df5d8a24b2f3ac72 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 22 Jul 2020 21:26:44 +0300 Subject: [PATCH] extended peer score inspection --- score.go | 89 +++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 85 insertions(+), 4 deletions(-) diff --git a/score.go b/score.go index 6c5738b..778d833 100644 --- a/score.go +++ b/score.go @@ -78,6 +78,7 @@ type peerScore struct { // debugging inspection inspect PeerScoreInspectFn + inspectEx ExtendedPeerScoreInspectFn inspectPeriod time.Duration } @@ -114,12 +115,33 @@ const ( ) type PeerScoreInspectFn func(map[peer.ID]float64) +type ExtendedPeerScoreInspectFn func(map[peer.ID]*PeerScoreSnapshot) + +type PeerScoreSnapshot struct { + Score float64 + Topics map[string]*TopicScoreSnapshot + AppSpecificScore float64 + IPColocationFactor int + BehaviourPenalty float64 +} + +type TopicScoreSnapshot struct { + TimeInMesh time.Duration + FirstMessageDeliveries float64 + MeshMessageDeliveries float64 + InvalidMessageDeliveries float64 +} // WithPeerScoreInspect is a gossipsub router option that enables peer score debugging. // When this option is enabled, the supplied function will be invoked periodically to allow -// the application to inspec or dump the scores for connected peers. +// the application to inspect or dump the scores for connected peers. +// The supplied function can have one of two signatures: +// - PeerScoreInspectFn, which takes a map of peer IDs to score. +// - ExtendedPeerScoreInspectFn, which takes a map of peer IDs to +// PeerScoreSnapshots and allows inspection of individual score +// components for debugging peer scoring. // This option must be passed _after_ the WithPeerScore option. -func WithPeerScoreInspect(inspect PeerScoreInspectFn, period time.Duration) Option { +func WithPeerScoreInspect(inspect interface{}, period time.Duration) Option { return func(ps *PubSub) error { gs, ok := ps.rt.(*GossipSubRouter) if !ok { @@ -130,7 +152,19 @@ func WithPeerScoreInspect(inspect PeerScoreInspectFn, period time.Duration) Opti return fmt.Errorf("peer scoring is not enabled") } - gs.score.inspect = inspect + switch i := inspect.(type) { + case PeerScoreInspectFn: + gs.score.inspect = i + case func(map[peer.ID]float64): + gs.score.inspect = PeerScoreInspectFn(i) + case ExtendedPeerScoreInspectFn: + gs.score.inspectEx = i + case func(map[peer.ID]*PeerScoreSnapshot): + gs.score.inspectEx = ExtendedPeerScoreInspectFn(i) + default: + return fmt.Errorf("unknown peer score insector type: %v", inspect) + } + gs.score.inspectPeriod = period return nil @@ -290,7 +324,7 @@ func (ps *peerScore) background(ctx context.Context) { defer gcDeliveryRecords.Stop() var inspectScores <-chan time.Time - if ps.inspect != nil { + if ps.inspect != nil || ps.inspectEx != nil { ticker := time.NewTicker(ps.inspectPeriod) defer ticker.Stop() // also dump at exit for one final sample @@ -320,6 +354,14 @@ func (ps *peerScore) background(ctx context.Context) { // inspectScores dumps all tracked scores into the inspect function. func (ps *peerScore) inspectScores() { + if ps.inspect != nil { + ps.inspectScoresSimple() + } else if ps.inspectEx != nil { + ps.inspectScoresExtended() + } +} + +func (ps *peerScore) inspectScoresSimple() { ps.Lock() scores := make(map[peer.ID]float64, len(ps.peerStats)) for p := range ps.peerStats { @@ -334,6 +376,45 @@ func (ps *peerScore) inspectScores() { go ps.inspect(scores) } +func (ps *peerScore) inspectScoresExtended() { + ps.Lock() + scores := make(map[peer.ID]*PeerScoreSnapshot, len(ps.peerStats)) + for p, pstats := range ps.peerStats { + pss := new(PeerScoreSnapshot) + pss.Score = ps.score(p) + if len(pstats.topics) > 0 { + pss.Topics = make(map[string]*TopicScoreSnapshot, len(pstats.topics)) + for t, ts := range pstats.topics { + pss.Topics[t] = &TopicScoreSnapshot{ + FirstMessageDeliveries: ts.firstMessageDeliveries, + MeshMessageDeliveries: ts.meshMessageDeliveries, + InvalidMessageDeliveries: ts.invalidMessageDeliveries, + } + if ts.inMesh { + pss.Topics[t].TimeInMesh = ts.meshTime + } + } + } + pss.AppSpecificScore = ps.params.AppSpecificScore(p) + for _, ip := range pstats.ips { + _, whitelisted := ps.params.IPColocationFactorWhitelist[ip] + if whitelisted { + continue + } + peersInIP := len(ps.peerIPs[ip]) + if peersInIP > ps.params.IPColocationFactorThreshold { + surpluss := peersInIP - ps.params.IPColocationFactorThreshold + pss.IPColocationFactor += surpluss + } + } + pss.BehaviourPenalty = pstats.behaviourPenalty + scores[p] = pss + } + ps.Unlock() + + go ps.inspectEx(scores) +} + // refreshScores decays scores, and purges score records for disconnected peers, // once their expiry has elapsed. func (ps *peerScore) refreshScores() {