mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-07 15:23:08 +00:00
configure the peer gater using a parameter object, docs and stuff
This commit is contained in:
parent
f595227544
commit
c242b2e7be
178
peer_gater.go
178
peer_gater.go
@ -16,46 +16,149 @@ import (
|
|||||||
manet "github.com/multiformats/go-multiaddr-net"
|
manet "github.com/multiformats/go-multiaddr-net"
|
||||||
)
|
)
|
||||||
|
|
||||||
var PeerGaterRetainStats = 6 * time.Hour
|
var (
|
||||||
var PeerGaterQuiet = time.Minute
|
DefaultPeerGaterRetainStats = 6 * time.Hour
|
||||||
|
DefaultPeerGaterQuiet = time.Minute
|
||||||
|
DefaultPeerGaterDuplicateWeight = 0.25
|
||||||
|
DefaultPeerGaterIgnoreWeight = 1.0
|
||||||
|
DefaultPeerGaterRejectWeight = 4.0
|
||||||
|
DefaultPeerGaterThreshold = 0.33
|
||||||
|
DefaultPeerGaterDecay = ScoreParameterDecay(time.Hour)
|
||||||
|
)
|
||||||
|
|
||||||
|
// PeerGaterParams groups together parameters that control the operation of the peer gater
|
||||||
|
type PeerGaterParams struct {
|
||||||
|
// when the ratio of throttled/validated messages exceeds this threshold, the gater turns on
|
||||||
|
Threshold float64
|
||||||
|
// (linear) decay parameter for gater counters
|
||||||
|
Decay float64
|
||||||
|
// decay interval
|
||||||
|
DecayInterval time.Duration
|
||||||
|
// counter zeroing threshold
|
||||||
|
DecayToZero float64
|
||||||
|
// how long to retain stats
|
||||||
|
RetainStats time.Duration
|
||||||
|
// quiet interval before turning off the gater; if there are no validation throttle events
|
||||||
|
// for this interval, the gater turns off
|
||||||
|
Quiet time.Duration
|
||||||
|
// weight of duplicate message deliveries
|
||||||
|
DuplicateWeight float64
|
||||||
|
// weight of ignored messages
|
||||||
|
IgnoreWeight float64
|
||||||
|
// weight of rejected messages
|
||||||
|
RejectWeight float64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PeerGaterParams) validate() error {
|
||||||
|
if p.Threshold <= 0 {
|
||||||
|
return fmt.Errorf("invalid Threshold; must be > 0")
|
||||||
|
}
|
||||||
|
if p.Decay <= 0 || p.Decay >= 1 {
|
||||||
|
return fmt.Errorf("invalid Decay; must be between 0 and 1")
|
||||||
|
}
|
||||||
|
if p.DecayInterval < time.Second {
|
||||||
|
return fmt.Errorf("invalid DecayInterval; must be at least 1s")
|
||||||
|
}
|
||||||
|
if p.DecayToZero <= 0 || p.DecayToZero >= 1 {
|
||||||
|
return fmt.Errorf("invalid DecayToZero; must be between 0 and 1")
|
||||||
|
}
|
||||||
|
// no need to check stats retention; a value of 0 means we don't retain stats
|
||||||
|
if p.Quiet < time.Second {
|
||||||
|
return fmt.Errorf("invalud Quiet interval; must be at least 1s")
|
||||||
|
}
|
||||||
|
if p.DuplicateWeight <= 0 {
|
||||||
|
return fmt.Errorf("invalid DuplicateWeight; must be > 0")
|
||||||
|
}
|
||||||
|
if p.IgnoreWeight < 1 {
|
||||||
|
return fmt.Errorf("invalid IgnoreWeight; must be >= 1")
|
||||||
|
}
|
||||||
|
if p.RejectWeight < 1 {
|
||||||
|
return fmt.Errorf("invalud RejectWeight; must be >= 1")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPeerGaterParams creates a new PeerGaterParams struct, using the specified threshold and decay
|
||||||
|
// parameters and default values for all other parameters.
|
||||||
|
func NewPeerGaterParams(threshold, decay float64) *PeerGaterParams {
|
||||||
|
return &PeerGaterParams{
|
||||||
|
Threshold: threshold,
|
||||||
|
Decay: decay,
|
||||||
|
DecayToZero: DefaultDecayToZero,
|
||||||
|
DecayInterval: DefaultDecayInterval,
|
||||||
|
RetainStats: DefaultPeerGaterRetainStats,
|
||||||
|
Quiet: DefaultPeerGaterQuiet,
|
||||||
|
DuplicateWeight: DefaultPeerGaterDuplicateWeight,
|
||||||
|
IgnoreWeight: DefaultPeerGaterIgnoreWeight,
|
||||||
|
RejectWeight: DefaultPeerGaterRejectWeight,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// DefaultPeerGaterParams creates a new PeerGaterParams struct using default values
|
||||||
|
func DefaultPeerGaterParams() *PeerGaterParams {
|
||||||
|
return NewPeerGaterParams(DefaultPeerGaterThreshold, DefaultPeerGaterDecay)
|
||||||
|
}
|
||||||
|
|
||||||
|
// the gater object.
|
||||||
type peerGater struct {
|
type peerGater struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
|
||||||
threshold, decay float64
|
host host.Host
|
||||||
|
|
||||||
|
// gater parameters
|
||||||
|
params *PeerGaterParams
|
||||||
|
|
||||||
|
// counters
|
||||||
validate, throttle float64
|
validate, throttle float64
|
||||||
|
|
||||||
|
// time of last validation throttle
|
||||||
lastThrottle time.Time
|
lastThrottle time.Time
|
||||||
|
|
||||||
|
// stats per peer.ID -- multiple peer IDs may share the same stats object if they are
|
||||||
|
// colocated in the same IP
|
||||||
peerStats map[peer.ID]*peerGaterStats
|
peerStats map[peer.ID]*peerGaterStats
|
||||||
ipStats map[string]*peerGaterStats
|
// stats per IP
|
||||||
|
ipStats map[string]*peerGaterStats
|
||||||
host host.Host
|
|
||||||
|
|
||||||
// for unit tests
|
// for unit tests
|
||||||
getIP func(peer.ID) string
|
getIP func(peer.ID) string
|
||||||
}
|
}
|
||||||
|
|
||||||
type peerGaterStats struct {
|
type peerGaterStats struct {
|
||||||
|
// number of connected peer IDs mapped to this stat object
|
||||||
connected int
|
connected int
|
||||||
expire time.Time
|
// stats expiration time -- only valid if connected = 0
|
||||||
|
expire time.Time
|
||||||
|
|
||||||
|
// counters
|
||||||
deliver, duplicate, ignore, reject float64
|
deliver, duplicate, ignore, reject float64
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithPeerGater is a gossipsub router option that enables reactive validation queue
|
// WithPeerGater is a gossipsub router option that enables reactive validation queue
|
||||||
// management.
|
// management.
|
||||||
// The threshold parameter is the threshold of throttled/validated messages before the gating
|
// The Gater is activated if the ratio of throttled/validated messages exceeds the specified
|
||||||
// kicks in.
|
// threshold.
|
||||||
// The decay parameter is the (linear) decay of counters per second.
|
// Once active, the Gater probabilistically throttles peers _before_ they enter the validation
|
||||||
func WithPeerGater(threshold, decay float64) Option {
|
// queue, performing Random Early Drop.
|
||||||
|
// The throttle decision is randomized, with the probability of allowing messages to enter the
|
||||||
|
// validation queue controlled by the statistical observations of the performance of all peers
|
||||||
|
// in the IP address of the gated peer.
|
||||||
|
// The Gater deactivates if there is no validation throttlinc occurring for the specified quiet
|
||||||
|
// interval.
|
||||||
|
func WithPeerGater(params *PeerGaterParams) Option {
|
||||||
return func(ps *PubSub) error {
|
return func(ps *PubSub) error {
|
||||||
gs, ok := ps.rt.(*GossipSubRouter)
|
gs, ok := ps.rt.(*GossipSubRouter)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("pubsub router is not gossipsub")
|
return fmt.Errorf("pubsub router is not gossipsub")
|
||||||
}
|
}
|
||||||
|
|
||||||
gs.gate = newPeerGater(ps.ctx, ps.host, threshold, decay)
|
err := params.validate()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
gs.gate = newPeerGater(ps.ctx, ps.host, params)
|
||||||
|
|
||||||
// hook the tracer
|
// hook the tracer
|
||||||
if ps.tracer != nil {
|
if ps.tracer != nil {
|
||||||
@ -72,10 +175,9 @@ func WithPeerGater(threshold, decay float64) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPeerGater(ctx context.Context, host host.Host, threshold, decay float64) *peerGater {
|
func newPeerGater(ctx context.Context, host host.Host, params *PeerGaterParams) *peerGater {
|
||||||
pg := &peerGater{
|
pg := &peerGater{
|
||||||
threshold: threshold,
|
params: params,
|
||||||
decay: decay,
|
|
||||||
peerStats: make(map[peer.ID]*peerGaterStats),
|
peerStats: make(map[peer.ID]*peerGaterStats),
|
||||||
ipStats: make(map[string]*peerGaterStats),
|
ipStats: make(map[string]*peerGaterStats),
|
||||||
host: host,
|
host: host,
|
||||||
@ -85,7 +187,7 @@ func newPeerGater(ctx context.Context, host host.Host, threshold, decay float64)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (pg *peerGater) background(ctx context.Context) {
|
func (pg *peerGater) background(ctx context.Context) {
|
||||||
tick := time.NewTicker(DefaultDecayInterval)
|
tick := time.NewTicker(pg.params.DecayInterval)
|
||||||
|
|
||||||
defer tick.Stop()
|
defer tick.Stop()
|
||||||
|
|
||||||
@ -103,36 +205,36 @@ func (pg *peerGater) decayStats() {
|
|||||||
pg.Lock()
|
pg.Lock()
|
||||||
defer pg.Unlock()
|
defer pg.Unlock()
|
||||||
|
|
||||||
pg.validate *= pg.decay
|
pg.validate *= pg.params.Decay
|
||||||
if pg.validate < DefaultDecayToZero {
|
if pg.validate < pg.params.DecayToZero {
|
||||||
pg.validate = 0
|
pg.validate = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
pg.throttle *= pg.throttle
|
pg.throttle *= pg.params.Decay
|
||||||
if pg.throttle < DefaultDecayToZero {
|
if pg.throttle < pg.params.DecayToZero {
|
||||||
pg.throttle = 0
|
pg.throttle = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
for ip, st := range pg.ipStats {
|
for ip, st := range pg.ipStats {
|
||||||
if st.connected > 0 {
|
if st.connected > 0 {
|
||||||
st.deliver *= pg.decay
|
st.deliver *= pg.params.Decay
|
||||||
if st.deliver < DefaultDecayToZero {
|
if st.deliver < pg.params.DecayToZero {
|
||||||
st.deliver = 0
|
st.deliver = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
st.duplicate *= pg.decay
|
st.duplicate *= pg.params.Decay
|
||||||
if st.duplicate < DefaultDecayToZero {
|
if st.duplicate < pg.params.DecayToZero {
|
||||||
st.duplicate = 0
|
st.duplicate = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
st.ignore *= pg.decay
|
st.ignore *= pg.params.Decay
|
||||||
if st.ignore < DefaultDecayToZero {
|
if st.ignore < pg.params.DecayToZero {
|
||||||
st.ignore = 0
|
st.ignore = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
st.reject *= pg.decay
|
st.reject *= pg.params.Decay
|
||||||
if st.reject < DefaultDecayToZero {
|
if st.reject < pg.params.DecayToZero {
|
||||||
st.reject = 0
|
st.reject = 0
|
||||||
}
|
}
|
||||||
} else if st.expire.Before(now) {
|
} else if st.expire.Before(now) {
|
||||||
@ -194,6 +296,7 @@ func (pg *peerGater) getPeerIP(p peer.ID) string {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// router interface
|
||||||
func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus {
|
func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus {
|
||||||
if pg == nil {
|
if pg == nil {
|
||||||
return AcceptAll
|
return AcceptAll
|
||||||
@ -202,26 +305,34 @@ func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus {
|
|||||||
pg.Lock()
|
pg.Lock()
|
||||||
defer pg.Unlock()
|
defer pg.Unlock()
|
||||||
|
|
||||||
if time.Since(pg.lastThrottle) > PeerGaterQuiet {
|
// check the quiet period; if the validation queue has not throttled for more than the Quiet
|
||||||
|
// interval, we turn off the circuit breaker and accept.
|
||||||
|
if time.Since(pg.lastThrottle) > pg.params.Quiet {
|
||||||
return AcceptAll
|
return AcceptAll
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// no throttle events -- or they have decayed; accept.
|
||||||
if pg.throttle == 0 {
|
if pg.throttle == 0 {
|
||||||
return AcceptAll
|
return AcceptAll
|
||||||
}
|
}
|
||||||
|
|
||||||
if pg.validate != 0 && pg.throttle/pg.validate < pg.threshold {
|
// check the throttle/validate ration; if it is below threshold we accept.
|
||||||
|
if pg.validate != 0 && pg.throttle/pg.validate < pg.params.Threshold {
|
||||||
return AcceptAll
|
return AcceptAll
|
||||||
}
|
}
|
||||||
|
|
||||||
st := pg.getPeerStats(p)
|
st := pg.getPeerStats(p)
|
||||||
|
|
||||||
total := st.deliver + 0.25*st.duplicate + st.ignore + 4*st.reject
|
// compute the goodput of the peer; the denominator is the weighted mix of message counters
|
||||||
|
total := st.deliver + pg.params.DuplicateWeight*st.duplicate + pg.params.IgnoreWeight*st.ignore + pg.params.RejectWeight*st.reject
|
||||||
if total == 0 {
|
if total == 0 {
|
||||||
return AcceptAll
|
return AcceptAll
|
||||||
}
|
}
|
||||||
|
|
||||||
// we make a randomized decision based on the goodput of the peer
|
// we make a randomized decision based on the goodput of the peer.
|
||||||
|
// the probabiity is biased by adding 1 to the delivery counter so that we don't unconditionally
|
||||||
|
// throttle in the first negative event; it also ensures that a peer always has a chance of being
|
||||||
|
// accepted; this is not a sinkhole/blacklist.
|
||||||
threshold := (1 + st.deliver) / (1 + total)
|
threshold := (1 + st.deliver) / (1 + total)
|
||||||
if rand.Float64() < threshold {
|
if rand.Float64() < threshold {
|
||||||
return AcceptAll
|
return AcceptAll
|
||||||
@ -231,6 +342,7 @@ func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus {
|
|||||||
return AcceptControl
|
return AcceptControl
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// tracer interface
|
||||||
func (pg *peerGater) AddPeer(p peer.ID, proto protocol.ID) {
|
func (pg *peerGater) AddPeer(p peer.ID, proto protocol.ID) {
|
||||||
pg.Lock()
|
pg.Lock()
|
||||||
defer pg.Unlock()
|
defer pg.Unlock()
|
||||||
@ -245,7 +357,7 @@ func (pg *peerGater) RemovePeer(p peer.ID) {
|
|||||||
|
|
||||||
st := pg.getPeerStats(p)
|
st := pg.getPeerStats(p)
|
||||||
st.connected--
|
st.connected--
|
||||||
st.expire = time.Now().Add(PeerGaterRetainStats)
|
st.expire = time.Now().Add(pg.params.RetainStats)
|
||||||
|
|
||||||
delete(pg.peerStats, p)
|
delete(pg.peerStats, p)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,7 +15,13 @@ func TestPeerGater(t *testing.T) {
|
|||||||
peerA := peer.ID("A")
|
peerA := peer.ID("A")
|
||||||
peerAip := "1.2.3.4"
|
peerAip := "1.2.3.4"
|
||||||
|
|
||||||
pg := newPeerGater(ctx, nil, .1, .9)
|
params := NewPeerGaterParams(.1, .9)
|
||||||
|
err := params.validate()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pg := newPeerGater(ctx, nil, params)
|
||||||
pg.getIP = func(p peer.ID) string {
|
pg.getIP = func(p peer.ID) string {
|
||||||
switch p {
|
switch p {
|
||||||
case peerA:
|
case peerA:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user