diff --git a/score.go b/score.go index 55c9f6f..bfddde1 100644 --- a/score.go +++ b/score.go @@ -1,6 +1,7 @@ package pubsub import ( + "context" "sync" "time" @@ -180,13 +181,20 @@ const ( ) func newPeerScore(params *PeerScoreParams) *peerScore { - // TODO - return nil + return &peerScore{ + params: params, + peerStats: make(map[peer.ID]*peerStats), + peerIPs: make(map[string]map[peer.ID]struct{}), + deliveries: &messageDeliveries{records: make(map[string]*deliveryRecord)}, + msgID: DefaultMsgIdFn, + } } // router interface func (ps *peerScore) Start(gs *GossipSubRouter) { - // TODO + ps.msgID = gs.p.msgID + ps.host = gs.p.host + go ps.background(gs.p.ctx) } func (ps *peerScore) Score(p peer.ID) float64 { @@ -268,6 +276,33 @@ func (ps *peerScore) Score(p peer.ID) float64 { } // periodic maintenance +func (ps *peerScore) background(ctx context.Context) { + refreshScores := time.NewTicker(ps.params.DecayInterval) + defer refreshScores.Stop() + + refreshIPs := time.NewTicker(time.Minute) + defer refreshIPs.Stop() + + gcDeliveryRecords := time.NewTicker(time.Minute) + defer gcDeliveryRecords.Stop() + + for { + select { + case <-refreshScores.C: + ps.refreshScores() + + case <-refreshIPs.C: + ps.refreshIPs() + + case <-gcDeliveryRecords.C: + ps.gcDeliveryRecords() + + case <-ctx.Done(): + return + } + } +} + func (ps *peerScore) refreshScores() { ps.Lock() defer ps.Unlock() @@ -325,7 +360,10 @@ func (ps *peerScore) refreshScores() { } } -func (ps *peerScore) resfreshIPs() { +func (ps *peerScore) refreshIPs() { + ps.Lock() + defer ps.Unlock() + // peer IPs may change, so we periodically refresh them for p, pstats := range ps.peerStats { if pstats.connected { @@ -336,6 +374,13 @@ func (ps *peerScore) resfreshIPs() { } } +func (ps *peerScore) gcDeliveryRecords() { + ps.Lock() + defer ps.Unlock() + + ps.deliveries.gc() +} + // tracer interface func (ps *peerScore) AddPeer(p peer.ID, proto protocol.ID) { ps.Lock()