diff --git a/peer_gater.go b/peer_gater.go index 37d9ab7..54d7fe3 100644 --- a/peer_gater.go +++ b/peer_gater.go @@ -16,46 +16,149 @@ import ( manet "github.com/multiformats/go-multiaddr-net" ) -var PeerGaterRetainStats = 6 * time.Hour -var PeerGaterQuiet = time.Minute +var ( + DefaultPeerGaterRetainStats = 6 * time.Hour + DefaultPeerGaterQuiet = time.Minute + DefaultPeerGaterDuplicateWeight = 0.25 + DefaultPeerGaterIgnoreWeight = 1.0 + DefaultPeerGaterRejectWeight = 4.0 + DefaultPeerGaterThreshold = 0.33 + DefaultPeerGaterDecay = ScoreParameterDecay(time.Hour) +) +// PeerGaterParams groups together parameters that control the operation of the peer gater +type PeerGaterParams struct { + // when the ratio of throttled/validated messages exceeds this threshold, the gater turns on + Threshold float64 + // (linear) decay parameter for gater counters + Decay float64 + // decay interval + DecayInterval time.Duration + // counter zeroing threshold + DecayToZero float64 + // how long to retain stats + RetainStats time.Duration + // quiet interval before turning off the gater; if there are no validation throttle events + // for this interval, the gater turns off + Quiet time.Duration + // weight of duplicate message deliveries + DuplicateWeight float64 + // weight of ignored messages + IgnoreWeight float64 + // weight of rejected messages + RejectWeight float64 +} + +func (p *PeerGaterParams) validate() error { + if p.Threshold <= 0 { + return fmt.Errorf("invalid Threshold; must be > 0") + } + if p.Decay <= 0 || p.Decay >= 1 { + return fmt.Errorf("invalid Decay; must be between 0 and 1") + } + if p.DecayInterval < time.Second { + return fmt.Errorf("invalid DecayInterval; must be at least 1s") + } + if p.DecayToZero <= 0 || p.DecayToZero >= 1 { + return fmt.Errorf("invalid DecayToZero; must be between 0 and 1") + } + // no need to check stats retention; a value of 0 means we don't retain stats + if p.Quiet < time.Second { + return fmt.Errorf("invalud Quiet interval; must be at least 1s") + } + if p.DuplicateWeight <= 0 { + return fmt.Errorf("invalid DuplicateWeight; must be > 0") + } + if p.IgnoreWeight < 1 { + return fmt.Errorf("invalid IgnoreWeight; must be >= 1") + } + if p.RejectWeight < 1 { + return fmt.Errorf("invalud RejectWeight; must be >= 1") + } + + return nil +} + +// NewPeerGaterParams creates a new PeerGaterParams struct, using the specified threshold and decay +// parameters and default values for all other parameters. +func NewPeerGaterParams(threshold, decay float64) *PeerGaterParams { + return &PeerGaterParams{ + Threshold: threshold, + Decay: decay, + DecayToZero: DefaultDecayToZero, + DecayInterval: DefaultDecayInterval, + RetainStats: DefaultPeerGaterRetainStats, + Quiet: DefaultPeerGaterQuiet, + DuplicateWeight: DefaultPeerGaterDuplicateWeight, + IgnoreWeight: DefaultPeerGaterIgnoreWeight, + RejectWeight: DefaultPeerGaterRejectWeight, + } +} + +// DefaultPeerGaterParams creates a new PeerGaterParams struct using default values +func DefaultPeerGaterParams() *PeerGaterParams { + return NewPeerGaterParams(DefaultPeerGaterThreshold, DefaultPeerGaterDecay) +} + +// the gater object. type peerGater struct { sync.Mutex - threshold, decay float64 + host host.Host + + // gater parameters + params *PeerGaterParams + + // counters validate, throttle float64 + // time of last validation throttle lastThrottle time.Time + // stats per peer.ID -- multiple peer IDs may share the same stats object if they are + // colocated in the same IP peerStats map[peer.ID]*peerGaterStats - ipStats map[string]*peerGaterStats - - host host.Host + // stats per IP + ipStats map[string]*peerGaterStats // for unit tests getIP func(peer.ID) string } type peerGaterStats struct { + // number of connected peer IDs mapped to this stat object connected int - expire time.Time + // stats expiration time -- only valid if connected = 0 + expire time.Time + // counters deliver, duplicate, ignore, reject float64 } // WithPeerGater is a gossipsub router option that enables reactive validation queue // management. -// The threshold parameter is the threshold of throttled/validated messages before the gating -// kicks in. -// The decay parameter is the (linear) decay of counters per second. -func WithPeerGater(threshold, decay float64) Option { +// The Gater is activated if the ratio of throttled/validated messages exceeds the specified +// threshold. +// Once active, the Gater probabilistically throttles peers _before_ they enter the validation +// queue, performing Random Early Drop. +// The throttle decision is randomized, with the probability of allowing messages to enter the +// validation queue controlled by the statistical observations of the performance of all peers +// in the IP address of the gated peer. +// The Gater deactivates if there is no validation throttlinc occurring for the specified quiet +// interval. +func WithPeerGater(params *PeerGaterParams) Option { return func(ps *PubSub) error { gs, ok := ps.rt.(*GossipSubRouter) if !ok { return fmt.Errorf("pubsub router is not gossipsub") } - gs.gate = newPeerGater(ps.ctx, ps.host, threshold, decay) + err := params.validate() + if err != nil { + return err + } + + gs.gate = newPeerGater(ps.ctx, ps.host, params) // hook the tracer if ps.tracer != nil { @@ -72,10 +175,9 @@ func WithPeerGater(threshold, decay float64) Option { } } -func newPeerGater(ctx context.Context, host host.Host, threshold, decay float64) *peerGater { +func newPeerGater(ctx context.Context, host host.Host, params *PeerGaterParams) *peerGater { pg := &peerGater{ - threshold: threshold, - decay: decay, + params: params, peerStats: make(map[peer.ID]*peerGaterStats), ipStats: make(map[string]*peerGaterStats), host: host, @@ -85,7 +187,7 @@ func newPeerGater(ctx context.Context, host host.Host, threshold, decay float64) } func (pg *peerGater) background(ctx context.Context) { - tick := time.NewTicker(DefaultDecayInterval) + tick := time.NewTicker(pg.params.DecayInterval) defer tick.Stop() @@ -103,36 +205,36 @@ func (pg *peerGater) decayStats() { pg.Lock() defer pg.Unlock() - pg.validate *= pg.decay - if pg.validate < DefaultDecayToZero { + pg.validate *= pg.params.Decay + if pg.validate < pg.params.DecayToZero { pg.validate = 0 } - pg.throttle *= pg.throttle - if pg.throttle < DefaultDecayToZero { + pg.throttle *= pg.params.Decay + if pg.throttle < pg.params.DecayToZero { pg.throttle = 0 } now := time.Now() for ip, st := range pg.ipStats { if st.connected > 0 { - st.deliver *= pg.decay - if st.deliver < DefaultDecayToZero { + st.deliver *= pg.params.Decay + if st.deliver < pg.params.DecayToZero { st.deliver = 0 } - st.duplicate *= pg.decay - if st.duplicate < DefaultDecayToZero { + st.duplicate *= pg.params.Decay + if st.duplicate < pg.params.DecayToZero { st.duplicate = 0 } - st.ignore *= pg.decay - if st.ignore < DefaultDecayToZero { + st.ignore *= pg.params.Decay + if st.ignore < pg.params.DecayToZero { st.ignore = 0 } - st.reject *= pg.decay - if st.reject < DefaultDecayToZero { + st.reject *= pg.params.Decay + if st.reject < pg.params.DecayToZero { st.reject = 0 } } else if st.expire.Before(now) { @@ -194,6 +296,7 @@ func (pg *peerGater) getPeerIP(p peer.ID) string { } } +// router interface func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus { if pg == nil { return AcceptAll @@ -202,26 +305,34 @@ func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus { pg.Lock() defer pg.Unlock() - if time.Since(pg.lastThrottle) > PeerGaterQuiet { + // check the quiet period; if the validation queue has not throttled for more than the Quiet + // interval, we turn off the circuit breaker and accept. + if time.Since(pg.lastThrottle) > pg.params.Quiet { return AcceptAll } + // no throttle events -- or they have decayed; accept. if pg.throttle == 0 { return AcceptAll } - if pg.validate != 0 && pg.throttle/pg.validate < pg.threshold { + // check the throttle/validate ration; if it is below threshold we accept. + if pg.validate != 0 && pg.throttle/pg.validate < pg.params.Threshold { return AcceptAll } st := pg.getPeerStats(p) - total := st.deliver + 0.25*st.duplicate + st.ignore + 4*st.reject + // compute the goodput of the peer; the denominator is the weighted mix of message counters + total := st.deliver + pg.params.DuplicateWeight*st.duplicate + pg.params.IgnoreWeight*st.ignore + pg.params.RejectWeight*st.reject if total == 0 { return AcceptAll } - // we make a randomized decision based on the goodput of the peer + // we make a randomized decision based on the goodput of the peer. + // the probabiity is biased by adding 1 to the delivery counter so that we don't unconditionally + // throttle in the first negative event; it also ensures that a peer always has a chance of being + // accepted; this is not a sinkhole/blacklist. threshold := (1 + st.deliver) / (1 + total) if rand.Float64() < threshold { return AcceptAll @@ -231,6 +342,7 @@ func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus { return AcceptControl } +// tracer interface func (pg *peerGater) AddPeer(p peer.ID, proto protocol.ID) { pg.Lock() defer pg.Unlock() @@ -245,7 +357,7 @@ func (pg *peerGater) RemovePeer(p peer.ID) { st := pg.getPeerStats(p) st.connected-- - st.expire = time.Now().Add(PeerGaterRetainStats) + st.expire = time.Now().Add(pg.params.RetainStats) delete(pg.peerStats, p) } diff --git a/peer_gater_test.go b/peer_gater_test.go index 73a68fd..2ba22e4 100644 --- a/peer_gater_test.go +++ b/peer_gater_test.go @@ -15,7 +15,13 @@ func TestPeerGater(t *testing.T) { peerA := peer.ID("A") peerAip := "1.2.3.4" - pg := newPeerGater(ctx, nil, .1, .9) + params := NewPeerGaterParams(.1, .9) + err := params.validate() + if err != nil { + t.Fatal(err) + } + + pg := newPeerGater(ctx, nil, params) pg.getIP = func(p peer.ID) string { switch p { case peerA: