mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-03 05:13:07 +00:00
implement peer gater
This commit is contained in:
parent
2bc51e0cf2
commit
45272722d2
205
peer_gater.go
205
peer_gater.go
@ -1,25 +1,50 @@
|
||||
package pubsub
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
)
|
||||
|
||||
var PeerGaterRetainStats = 6 * time.Hour
|
||||
var PeerGaterQuiet = time.Minute
|
||||
|
||||
type peerGater struct {
|
||||
sync.Mutex
|
||||
|
||||
threshold, decay float64
|
||||
validate, throttle float64
|
||||
|
||||
lastThrottle time.Time
|
||||
|
||||
stats map[peer.ID]*peerGaterStats
|
||||
}
|
||||
|
||||
type peerGaterStats struct {
|
||||
connected bool
|
||||
expire time.Time
|
||||
|
||||
deliver, ignore, reject float64
|
||||
}
|
||||
|
||||
// WithPeerGater is a gossipsub router option that enables reactive validation queue
|
||||
// management.
|
||||
func WithPeerGater() Option {
|
||||
// The threshold parameter is the threshold of throttled/validated messages before the gating
|
||||
// kicks in.
|
||||
// The decay parameter is the (linear) decay of counters per second.
|
||||
func WithPeerGater(threshold, decay float64) Option {
|
||||
return func(ps *PubSub) error {
|
||||
gs, ok := ps.rt.(*GossipSubRouter)
|
||||
if !ok {
|
||||
return fmt.Errorf("pubsub router is not gossipsub")
|
||||
}
|
||||
|
||||
gs.gate = newPeerGater()
|
||||
gs.gate = newPeerGater(ps.ctx, threshold, decay)
|
||||
|
||||
// hook the tracer
|
||||
if ps.tracer != nil {
|
||||
@ -36,8 +61,71 @@ func WithPeerGater() Option {
|
||||
}
|
||||
}
|
||||
|
||||
func newPeerGater() *peerGater {
|
||||
return &peerGater{}
|
||||
func newPeerGater(ctx context.Context, threshold, decay float64) *peerGater {
|
||||
pg := &peerGater{threshold: threshold, decay: decay}
|
||||
go pg.background(ctx)
|
||||
return pg
|
||||
}
|
||||
|
||||
func (pg *peerGater) background(ctx context.Context) {
|
||||
tick := time.NewTicker(DefaultDecayInterval)
|
||||
|
||||
defer tick.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-tick.C:
|
||||
pg.decayStats()
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (pg *peerGater) decayStats() {
|
||||
pg.Lock()
|
||||
defer pg.Unlock()
|
||||
|
||||
pg.validate *= pg.decay
|
||||
if pg.validate < DefaultDecayToZero {
|
||||
pg.validate = 0
|
||||
}
|
||||
|
||||
pg.throttle *= pg.throttle
|
||||
if pg.throttle < DefaultDecayToZero {
|
||||
pg.throttle = 0
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
for p, st := range pg.stats {
|
||||
if st.connected {
|
||||
st.deliver *= pg.decay
|
||||
if st.deliver < DefaultDecayToZero {
|
||||
st.deliver = 0
|
||||
}
|
||||
|
||||
st.ignore *= pg.decay
|
||||
if st.ignore < DefaultDecayToZero {
|
||||
st.ignore = 0
|
||||
}
|
||||
|
||||
st.reject *= pg.decay
|
||||
if st.reject < DefaultDecayToZero {
|
||||
st.reject = 0
|
||||
}
|
||||
} else if st.expire.Before(now) {
|
||||
delete(pg.stats, p)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (pg *peerGater) getPeerStats(p peer.ID) *peerGaterStats {
|
||||
st, ok := pg.stats[p]
|
||||
if !ok {
|
||||
st = &peerGaterStats{}
|
||||
pg.stats[p] = st
|
||||
}
|
||||
return st
|
||||
}
|
||||
|
||||
func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus {
|
||||
@ -45,16 +133,103 @@ func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus {
|
||||
return AcceptAll
|
||||
}
|
||||
|
||||
return AcceptAll
|
||||
pg.Lock()
|
||||
defer pg.Unlock()
|
||||
|
||||
if time.Since(pg.lastThrottle) > PeerGaterQuiet {
|
||||
return AcceptAll
|
||||
}
|
||||
|
||||
if pg.throttle == 0 {
|
||||
return AcceptAll
|
||||
}
|
||||
|
||||
if pg.validate != 0 && pg.throttle/pg.validate < pg.threshold {
|
||||
return AcceptAll
|
||||
}
|
||||
|
||||
st := pg.getPeerStats(p)
|
||||
|
||||
total := st.deliver + st.ignore + st.reject
|
||||
if total == 0 {
|
||||
return AcceptAll
|
||||
}
|
||||
|
||||
// we make a randomized decision based on the goodput of the peer
|
||||
goodput := st.deliver / total
|
||||
if rand.Float64() < goodput {
|
||||
return AcceptAll
|
||||
}
|
||||
|
||||
log.Debugf("throttling peer %s with goodput %f", p, goodput)
|
||||
return AcceptControl
|
||||
}
|
||||
|
||||
func (pg *peerGater) AddPeer(p peer.ID, proto protocol.ID) {}
|
||||
func (pg *peerGater) RemovePeer(p peer.ID) {}
|
||||
func (pg *peerGater) Join(topic string) {}
|
||||
func (pg *peerGater) Leave(topic string) {}
|
||||
func (pg *peerGater) Graft(p peer.ID, topic string) {}
|
||||
func (pg *peerGater) Prune(p peer.ID, topic string) {}
|
||||
func (pg *peerGater) ValidateMessage(msg *Message) {}
|
||||
func (pg *peerGater) DeliverMessage(msg *Message) {}
|
||||
func (pg *peerGater) RejectMessage(msg *Message, reason string) {}
|
||||
func (pg *peerGater) DuplicateMessage(msg *Message) {}
|
||||
func (pg *peerGater) AddPeer(p peer.ID, proto protocol.ID) {
|
||||
pg.Lock()
|
||||
defer pg.Unlock()
|
||||
|
||||
st := pg.getPeerStats(p)
|
||||
st.connected = true
|
||||
st.expire = time.Time{}
|
||||
}
|
||||
|
||||
func (pg *peerGater) RemovePeer(p peer.ID) {
|
||||
pg.Lock()
|
||||
defer pg.Unlock()
|
||||
|
||||
st := pg.getPeerStats(p)
|
||||
st.connected = false
|
||||
st.expire = time.Now().Add(PeerGaterRetainStats)
|
||||
}
|
||||
|
||||
func (pg *peerGater) Join(topic string) {}
|
||||
func (pg *peerGater) Leave(topic string) {}
|
||||
func (pg *peerGater) Graft(p peer.ID, topic string) {}
|
||||
func (pg *peerGater) Prune(p peer.ID, topic string) {}
|
||||
|
||||
func (pg *peerGater) ValidateMessage(msg *Message) {
|
||||
pg.Lock()
|
||||
defer pg.Unlock()
|
||||
|
||||
pg.validate++
|
||||
}
|
||||
|
||||
func (pg *peerGater) DeliverMessage(msg *Message) {
|
||||
pg.Lock()
|
||||
defer pg.Unlock()
|
||||
|
||||
st := pg.getPeerStats(msg.ReceivedFrom)
|
||||
st.deliver++
|
||||
}
|
||||
|
||||
func (pg *peerGater) RejectMessage(msg *Message, reason string) {
|
||||
pg.Lock()
|
||||
defer pg.Unlock()
|
||||
|
||||
switch reason {
|
||||
case rejectValidationQueueFull:
|
||||
fallthrough
|
||||
case rejectValidationThrottled:
|
||||
pg.lastThrottle = time.Now()
|
||||
pg.throttle++
|
||||
|
||||
case rejectValidationIgnored:
|
||||
st := pg.getPeerStats(msg.ReceivedFrom)
|
||||
st.ignore++
|
||||
|
||||
case rejectMissingSignature:
|
||||
fallthrough
|
||||
case rejectUnexpectedSignature:
|
||||
fallthrough
|
||||
case rejectUnexpectedAuthInfo:
|
||||
fallthrough
|
||||
case rejectInvalidSignature:
|
||||
fallthrough
|
||||
case rejectValidationFailed:
|
||||
st := pg.getPeerStats(msg.ReceivedFrom)
|
||||
st.reject++
|
||||
}
|
||||
}
|
||||
|
||||
func (pg *peerGater) DuplicateMessage(msg *Message) {}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user