mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-04 05:43:06 +00:00
add option to periodically inspect peer scores for debugging purposes
This commit is contained in:
parent
244a39f419
commit
588f7b85c0
57
score.go
57
score.go
@ -151,6 +151,10 @@ type peerScore struct {
|
|||||||
|
|
||||||
msgID MsgIdFunction
|
msgID MsgIdFunction
|
||||||
host host.Host
|
host host.Host
|
||||||
|
|
||||||
|
// debugging inspection
|
||||||
|
inspect PeerScoreInspectFn
|
||||||
|
inspectPeriod time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type messageDeliveries struct {
|
type messageDeliveries struct {
|
||||||
@ -181,6 +185,31 @@ const (
|
|||||||
delivery_throttled // we can't tell if it is valid because validation throttled
|
delivery_throttled // we can't tell if it is valid because validation throttled
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type PeerScoreInspectFn func(map[peer.ID]float64)
|
||||||
|
|
||||||
|
// WithPeerScoreDebug is a gossipsub router option that enables peer score debugging.
|
||||||
|
// When this option is enabled, the supplied function will be invoked periodically to allow
|
||||||
|
// the application to inspec or dump the scores for connected peers.
|
||||||
|
// This option must be passed _after_ the WithPeerScore option.
|
||||||
|
func WithPeerScoreDebug(inspect PeerScoreInspectFn, period time.Duration) Option {
|
||||||
|
return func(ps *PubSub) error {
|
||||||
|
gs, ok := ps.rt.(*GossipSubRouter)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("pubsub router is not gossipsub")
|
||||||
|
}
|
||||||
|
|
||||||
|
if gs.score == nil {
|
||||||
|
return fmt.Errorf("peer scoring is not enabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
gs.score.inspect = inspect
|
||||||
|
gs.score.inspectPeriod = period
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// implementation
|
||||||
func newPeerScore(params *PeerScoreParams) *peerScore {
|
func newPeerScore(params *PeerScoreParams) *peerScore {
|
||||||
return &peerScore{
|
return &peerScore{
|
||||||
params: params,
|
params: params,
|
||||||
@ -311,6 +340,10 @@ func (ps *peerScore) Score(p peer.ID) float64 {
|
|||||||
ps.Lock()
|
ps.Lock()
|
||||||
defer ps.Unlock()
|
defer ps.Unlock()
|
||||||
|
|
||||||
|
return ps.score(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *peerScore) score(p peer.ID) float64 {
|
||||||
pstats, ok := ps.peerStats[p]
|
pstats, ok := ps.peerStats[p]
|
||||||
if !ok {
|
if !ok {
|
||||||
return 0
|
return 0
|
||||||
@ -392,6 +425,16 @@ func (ps *peerScore) background(ctx context.Context) {
|
|||||||
gcDeliveryRecords := time.NewTicker(time.Minute)
|
gcDeliveryRecords := time.NewTicker(time.Minute)
|
||||||
defer gcDeliveryRecords.Stop()
|
defer gcDeliveryRecords.Stop()
|
||||||
|
|
||||||
|
var inspectScores <-chan time.Time
|
||||||
|
if ps.inspect != nil {
|
||||||
|
ticker := time.NewTicker(ps.inspectPeriod)
|
||||||
|
defer ticker.Stop()
|
||||||
|
inspectScores = ticker.C
|
||||||
|
} else {
|
||||||
|
// never fires
|
||||||
|
inspectScores = make(chan time.Time)
|
||||||
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-refreshScores.C:
|
case <-refreshScores.C:
|
||||||
@ -403,12 +446,26 @@ func (ps *peerScore) background(ctx context.Context) {
|
|||||||
case <-gcDeliveryRecords.C:
|
case <-gcDeliveryRecords.C:
|
||||||
ps.gcDeliveryRecords()
|
ps.gcDeliveryRecords()
|
||||||
|
|
||||||
|
case <-inspectScores:
|
||||||
|
ps.inspectScores()
|
||||||
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ps *peerScore) inspectScores() {
|
||||||
|
ps.Lock()
|
||||||
|
scores := make(map[peer.ID]float64, len(ps.peerStats))
|
||||||
|
for p := range ps.peerStats {
|
||||||
|
scores[p] = ps.score(p)
|
||||||
|
}
|
||||||
|
ps.Unlock()
|
||||||
|
|
||||||
|
ps.inspect(scores)
|
||||||
|
}
|
||||||
|
|
||||||
func (ps *peerScore) refreshScores() {
|
func (ps *peerScore) refreshScores() {
|
||||||
ps.Lock()
|
ps.Lock()
|
||||||
defer ps.Unlock()
|
defer ps.Unlock()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user