mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-05 22:33:10 +00:00
implement periodic maintenance
This commit is contained in:
parent
f550385493
commit
4175342bc8
53
score.go
53
score.go
@ -1,6 +1,7 @@
|
|||||||
package pubsub
|
package pubsub
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -180,13 +181,20 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func newPeerScore(params *PeerScoreParams) *peerScore {
|
func newPeerScore(params *PeerScoreParams) *peerScore {
|
||||||
// TODO
|
return &peerScore{
|
||||||
return nil
|
params: params,
|
||||||
|
peerStats: make(map[peer.ID]*peerStats),
|
||||||
|
peerIPs: make(map[string]map[peer.ID]struct{}),
|
||||||
|
deliveries: &messageDeliveries{records: make(map[string]*deliveryRecord)},
|
||||||
|
msgID: DefaultMsgIdFn,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// router interface
|
// router interface
|
||||||
func (ps *peerScore) Start(gs *GossipSubRouter) {
|
func (ps *peerScore) Start(gs *GossipSubRouter) {
|
||||||
// TODO
|
ps.msgID = gs.p.msgID
|
||||||
|
ps.host = gs.p.host
|
||||||
|
go ps.background(gs.p.ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *peerScore) Score(p peer.ID) float64 {
|
func (ps *peerScore) Score(p peer.ID) float64 {
|
||||||
@ -268,6 +276,33 @@ func (ps *peerScore) Score(p peer.ID) float64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// periodic maintenance
|
// periodic maintenance
|
||||||
|
func (ps *peerScore) background(ctx context.Context) {
|
||||||
|
refreshScores := time.NewTicker(ps.params.DecayInterval)
|
||||||
|
defer refreshScores.Stop()
|
||||||
|
|
||||||
|
refreshIPs := time.NewTicker(time.Minute)
|
||||||
|
defer refreshIPs.Stop()
|
||||||
|
|
||||||
|
gcDeliveryRecords := time.NewTicker(time.Minute)
|
||||||
|
defer gcDeliveryRecords.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-refreshScores.C:
|
||||||
|
ps.refreshScores()
|
||||||
|
|
||||||
|
case <-refreshIPs.C:
|
||||||
|
ps.refreshIPs()
|
||||||
|
|
||||||
|
case <-gcDeliveryRecords.C:
|
||||||
|
ps.gcDeliveryRecords()
|
||||||
|
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (ps *peerScore) refreshScores() {
|
func (ps *peerScore) refreshScores() {
|
||||||
ps.Lock()
|
ps.Lock()
|
||||||
defer ps.Unlock()
|
defer ps.Unlock()
|
||||||
@ -325,7 +360,10 @@ func (ps *peerScore) refreshScores() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *peerScore) resfreshIPs() {
|
func (ps *peerScore) refreshIPs() {
|
||||||
|
ps.Lock()
|
||||||
|
defer ps.Unlock()
|
||||||
|
|
||||||
// peer IPs may change, so we periodically refresh them
|
// peer IPs may change, so we periodically refresh them
|
||||||
for p, pstats := range ps.peerStats {
|
for p, pstats := range ps.peerStats {
|
||||||
if pstats.connected {
|
if pstats.connected {
|
||||||
@ -336,6 +374,13 @@ func (ps *peerScore) resfreshIPs() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ps *peerScore) gcDeliveryRecords() {
|
||||||
|
ps.Lock()
|
||||||
|
defer ps.Unlock()
|
||||||
|
|
||||||
|
ps.deliveries.gc()
|
||||||
|
}
|
||||||
|
|
||||||
// tracer interface
|
// tracer interface
|
||||||
func (ps *peerScore) AddPeer(p peer.ID, proto protocol.ID) {
|
func (ps *peerScore) AddPeer(p peer.ID, proto protocol.ID) {
|
||||||
ps.Lock()
|
ps.Lock()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user