From 588f7b85c0b471205800a2f76eaf0ac8a51ff5cc Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 17 Mar 2020 14:14:56 +0200 Subject: [PATCH] add option to periodically inspect peer scores for debugging purposes --- score.go | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/score.go b/score.go index 4a681d3..c5f5e37 100644 --- a/score.go +++ b/score.go @@ -151,6 +151,10 @@ type peerScore struct { msgID MsgIdFunction host host.Host + + // debugging inspection + inspect PeerScoreInspectFn + inspectPeriod time.Duration } type messageDeliveries struct { @@ -181,6 +185,31 @@ const ( delivery_throttled // we can't tell if it is valid because validation throttled ) +type PeerScoreInspectFn func(map[peer.ID]float64) + +// WithPeerScoreDebug is a gossipsub router option that enables peer score debugging. +// When this option is enabled, the supplied function will be invoked periodically to allow +// the application to inspec or dump the scores for connected peers. +// This option must be passed _after_ the WithPeerScore option. +func WithPeerScoreDebug(inspect PeerScoreInspectFn, period time.Duration) Option { + return func(ps *PubSub) error { + gs, ok := ps.rt.(*GossipSubRouter) + if !ok { + return fmt.Errorf("pubsub router is not gossipsub") + } + + if gs.score == nil { + return fmt.Errorf("peer scoring is not enabled") + } + + gs.score.inspect = inspect + gs.score.inspectPeriod = period + + return nil + } +} + +// implementation func newPeerScore(params *PeerScoreParams) *peerScore { return &peerScore{ params: params, @@ -311,6 +340,10 @@ func (ps *peerScore) Score(p peer.ID) float64 { ps.Lock() defer ps.Unlock() + return ps.score(p) +} + +func (ps *peerScore) score(p peer.ID) float64 { pstats, ok := ps.peerStats[p] if !ok { return 0 @@ -392,6 +425,16 @@ func (ps *peerScore) background(ctx context.Context) { gcDeliveryRecords := time.NewTicker(time.Minute) defer gcDeliveryRecords.Stop() + var inspectScores <-chan time.Time + if ps.inspect != nil { + ticker := time.NewTicker(ps.inspectPeriod) + defer ticker.Stop() + inspectScores = ticker.C + } else { + // never fires + inspectScores = make(chan time.Time) + } + for { select { case <-refreshScores.C: @@ -403,12 +446,26 @@ func (ps *peerScore) background(ctx context.Context) { case <-gcDeliveryRecords.C: ps.gcDeliveryRecords() + case <-inspectScores: + ps.inspectScores() + case <-ctx.Done(): return } } } +func (ps *peerScore) inspectScores() { + ps.Lock() + scores := make(map[peer.ID]float64, len(ps.peerStats)) + for p := range ps.peerStats { + scores[p] = ps.score(p) + } + ps.Unlock() + + ps.inspect(scores) +} + func (ps *peerScore) refreshScores() { ps.Lock() defer ps.Unlock()