mirror of https://github.com/status-im/consul.git
Rate limiter/add ip prefix (#16342)
* add support for prefixes in the config tree * fix to use default config when the prefix have no config
This commit is contained in:
parent
641737f32b
commit
ae9c228967
|
@ -15,14 +15,10 @@ var _ RateLimiter = &MultiLimiter{}
|
||||||
|
|
||||||
const separator = "♣"
|
const separator = "♣"
|
||||||
|
|
||||||
func makeKey(keys ...[]byte) KeyType {
|
func Key(keys ...[]byte) KeyType {
|
||||||
return bytes.Join(keys, []byte(separator))
|
return bytes.Join(keys, []byte(separator))
|
||||||
}
|
}
|
||||||
|
|
||||||
func Key(prefix, key []byte) KeyType {
|
|
||||||
return makeKey(prefix, key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// RateLimiter is the interface implemented by MultiLimiter
|
// RateLimiter is the interface implemented by MultiLimiter
|
||||||
//
|
//
|
||||||
//go:generate mockery --name RateLimiter --inpackage --filename mock_RateLimiter.go
|
//go:generate mockery --name RateLimiter --inpackage --filename mock_RateLimiter.go
|
||||||
|
@ -131,19 +127,9 @@ func (m *MultiLimiter) Run(ctx context.Context) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func splitKey(key []byte) ([]byte, []byte) {
|
|
||||||
|
|
||||||
ret := bytes.SplitN(key, []byte(separator), 2)
|
|
||||||
if len(ret) != 2 {
|
|
||||||
return []byte(""), []byte("")
|
|
||||||
}
|
|
||||||
return ret[0], ret[1]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Allow should be called by a request processor to check if the current request is Limited
|
// Allow should be called by a request processor to check if the current request is Limited
|
||||||
// The request processor should provide a LimitedEntity that implement the right Key()
|
// The request processor should provide a LimitedEntity that implement the right Key()
|
||||||
func (m *MultiLimiter) Allow(e LimitedEntity) bool {
|
func (m *MultiLimiter) Allow(e LimitedEntity) bool {
|
||||||
prefix, _ := splitKey(e.Key())
|
|
||||||
limiters := m.limiters.Load()
|
limiters := m.limiters.Load()
|
||||||
l, ok := limiters.Get(e.Key())
|
l, ok := limiters.Get(e.Key())
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
@ -157,14 +143,16 @@ func (m *MultiLimiter) Allow(e LimitedEntity) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
configs := m.limitersConfigs.Load()
|
configs := m.limitersConfigs.Load()
|
||||||
c, okP := configs.Get(prefix)
|
config := &m.defaultConfig.Load().LimiterConfig
|
||||||
var config = &m.defaultConfig.Load().LimiterConfig
|
p, _, ok := configs.Root().LongestPrefix(e.Key())
|
||||||
if okP {
|
|
||||||
prefixConfig := c.(*LimiterConfig)
|
if ok {
|
||||||
if prefixConfig != nil {
|
c, ok := configs.Get(p)
|
||||||
config = prefixConfig
|
if ok && c != nil {
|
||||||
|
config = c.(*LimiterConfig)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
limiter := &Limiter{limiter: rate.NewLimiter(config.Rate, config.Burst)}
|
limiter := &Limiter{limiter: rate.NewLimiter(config.Rate, config.Burst)}
|
||||||
limiter.lastAccess.Store(unixNow)
|
limiter.lastAccess.Store(unixNow)
|
||||||
m.limiterCh <- &limiterWithKey{l: limiter, k: e.Key(), t: now}
|
m.limiterCh <- &limiterWithKey{l: limiter, k: e.Key(), t: now}
|
||||||
|
@ -182,7 +170,6 @@ type tickerWrapper struct {
|
||||||
func (t tickerWrapper) Ticker() <-chan time.Time {
|
func (t tickerWrapper) Ticker() <-chan time.Time {
|
||||||
return t.ticker.C
|
return t.ticker.C
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MultiLimiter) reconcile(ctx context.Context, waiter ticker, txn *radix.Txn, reconcileCheckLimit time.Duration) *radix.Txn {
|
func (m *MultiLimiter) reconcile(ctx context.Context, waiter ticker, txn *radix.Txn, reconcileCheckLimit time.Duration) *radix.Txn {
|
||||||
select {
|
select {
|
||||||
case <-waiter.Ticker():
|
case <-waiter.Ticker():
|
||||||
|
@ -223,8 +210,11 @@ func (m *MultiLimiter) reconcileConfig(txn *radix.Txn) {
|
||||||
|
|
||||||
// find the prefix for the leaf and check if the defaultConfig is up-to-date
|
// find the prefix for the leaf and check if the defaultConfig is up-to-date
|
||||||
// it's possible that the prefix is equal to the key
|
// it's possible that the prefix is equal to the key
|
||||||
prefix, _ := splitKey(k)
|
p, _, ok := m.limitersConfigs.Load().Root().LongestPrefix(k)
|
||||||
v, ok := m.limitersConfigs.Load().Get(prefix)
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
v, ok := m.limitersConfigs.Load().Get(p)
|
||||||
if v == nil || !ok {
|
if v == nil || !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,7 +33,7 @@ func TestNewMultiLimiter(t *testing.T) {
|
||||||
func TestRateLimiterUpdate(t *testing.T) {
|
func TestRateLimiterUpdate(t *testing.T) {
|
||||||
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 1 * time.Hour, ReconcileCheckInterval: 10 * time.Millisecond}
|
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 1 * time.Hour, ReconcileCheckInterval: 10 * time.Millisecond}
|
||||||
m := NewMultiLimiter(c)
|
m := NewMultiLimiter(c)
|
||||||
key := makeKey([]byte("test"))
|
key := Key([]byte("test"))
|
||||||
|
|
||||||
//Allow a key
|
//Allow a key
|
||||||
m.Allow(Limited{key: key})
|
m.Allow(Limited{key: key})
|
||||||
|
@ -77,7 +77,7 @@ func TestRateLimiterCleanup(t *testing.T) {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
m.Run(ctx)
|
m.Run(ctx)
|
||||||
key := makeKey([]byte("test"))
|
key := Key([]byte("test"))
|
||||||
m.Allow(Limited{key: key})
|
m.Allow(Limited{key: key})
|
||||||
retry.RunWith(&retry.Timer{Wait: 100 * time.Millisecond, Timeout: 2 * time.Second}, t, func(r *retry.R) {
|
retry.RunWith(&retry.Timer{Wait: 100 * time.Millisecond, Timeout: 2 * time.Second}, t, func(r *retry.R) {
|
||||||
l := m.limiters.Load()
|
l := m.limiters.Load()
|
||||||
|
@ -279,6 +279,53 @@ func TestRateLimiterUpdateConfig(t *testing.T) {
|
||||||
limiter := l.(*Limiter)
|
limiter := l.(*Limiter)
|
||||||
require.True(t, c1.isApplied(limiter.limiter))
|
require.True(t, c1.isApplied(limiter.limiter))
|
||||||
})
|
})
|
||||||
|
t.Run("Allow an IP with prefix and check prefix config is applied to new keys under that prefix", func(t *testing.T) {
|
||||||
|
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 100 * time.Millisecond, ReconcileCheckInterval: 10 * time.Millisecond}
|
||||||
|
m := NewMultiLimiter(c)
|
||||||
|
require.Equal(t, *m.defaultConfig.Load(), c)
|
||||||
|
c1 := LimiterConfig{Rate: 3}
|
||||||
|
prefix := Key([]byte("ip.ratelimit"), []byte("127.0"))
|
||||||
|
m.UpdateConfig(c1, prefix)
|
||||||
|
ip := Key([]byte("ip.ratelimit"), []byte("127.0.0.1"))
|
||||||
|
m.Allow(ipLimited{key: ip})
|
||||||
|
storeLimiter(m)
|
||||||
|
load := m.limiters.Load()
|
||||||
|
l, ok := load.Get(ip)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NotNil(t, l)
|
||||||
|
limiter := l.(*Limiter)
|
||||||
|
require.True(t, c1.isApplied(limiter.limiter))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Allow an IP with 2 prefixes and check prefix config is applied to new keys under that prefix", func(t *testing.T) {
|
||||||
|
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 100 * time.Millisecond, ReconcileCheckInterval: 10 * time.Millisecond}
|
||||||
|
m := NewMultiLimiter(c)
|
||||||
|
require.Equal(t, *m.defaultConfig.Load(), c)
|
||||||
|
c1 := LimiterConfig{Rate: 3}
|
||||||
|
prefix := Key([]byte("ip.ratelimit"), []byte("127.0"))
|
||||||
|
m.UpdateConfig(c1, prefix)
|
||||||
|
prefix = Key([]byte("ip.ratelimit"), []byte("127.0.0"))
|
||||||
|
c2 := LimiterConfig{Rate: 6}
|
||||||
|
m.UpdateConfig(c2, prefix)
|
||||||
|
ip := Key([]byte("ip.ratelimit"), []byte("127.0.0.1"))
|
||||||
|
m.Allow(ipLimited{key: ip})
|
||||||
|
storeLimiter(m)
|
||||||
|
load := m.limiters.Load()
|
||||||
|
l, ok := load.Get(ip)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NotNil(t, l)
|
||||||
|
limiter := l.(*Limiter)
|
||||||
|
require.True(t, c2.isApplied(limiter.limiter))
|
||||||
|
ip = Key([]byte("ip.ratelimit"), []byte("127.0.1.1"))
|
||||||
|
m.Allow(ipLimited{key: ip})
|
||||||
|
storeLimiter(m)
|
||||||
|
load = m.limiters.Load()
|
||||||
|
l, ok = load.Get(ip)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NotNil(t, l)
|
||||||
|
limiter = l.(*Limiter)
|
||||||
|
require.True(t, c1.isApplied(limiter.limiter))
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("Allow an IP with prefix and check after it's cleaned new Allow would give it the right defaultConfig", func(t *testing.T) {
|
t.Run("Allow an IP with prefix and check after it's cleaned new Allow would give it the right defaultConfig", func(t *testing.T) {
|
||||||
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 100 * time.Millisecond, ReconcileCheckInterval: 10 * time.Millisecond}
|
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 100 * time.Millisecond, ReconcileCheckInterval: 10 * time.Millisecond}
|
||||||
|
@ -304,10 +351,10 @@ func FuzzSingleConfig(f *testing.F) {
|
||||||
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 100 * time.Millisecond, ReconcileCheckInterval: 10 * time.Millisecond}
|
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 100 * time.Millisecond, ReconcileCheckInterval: 10 * time.Millisecond}
|
||||||
m := NewMultiLimiter(c)
|
m := NewMultiLimiter(c)
|
||||||
require.Equal(f, *m.defaultConfig.Load(), c)
|
require.Equal(f, *m.defaultConfig.Load(), c)
|
||||||
f.Add(makeKey(randIP()))
|
f.Add(Key(randIP()))
|
||||||
f.Add(makeKey(randIP(), randIP()))
|
f.Add(Key(randIP(), randIP()))
|
||||||
f.Add(makeKey(randIP(), randIP(), randIP()))
|
f.Add(Key(randIP(), randIP(), randIP()))
|
||||||
f.Add(makeKey(randIP(), randIP(), randIP(), randIP()))
|
f.Add(Key(randIP(), randIP(), randIP(), randIP()))
|
||||||
f.Fuzz(func(t *testing.T, ff []byte) {
|
f.Fuzz(func(t *testing.T, ff []byte) {
|
||||||
m.Allow(Limited{key: ff})
|
m.Allow(Limited{key: ff})
|
||||||
storeLimiter(m)
|
storeLimiter(m)
|
||||||
|
@ -317,9 +364,9 @@ func FuzzSingleConfig(f *testing.F) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func FuzzSplitKey(f *testing.F) {
|
func FuzzSplitKey(f *testing.F) {
|
||||||
f.Add(makeKey(randIP(), randIP()))
|
f.Add(Key(randIP(), randIP()))
|
||||||
f.Add(makeKey(randIP(), randIP(), randIP()))
|
f.Add(Key(randIP(), randIP(), randIP()))
|
||||||
f.Add(makeKey(randIP(), randIP(), randIP(), randIP()))
|
f.Add(Key(randIP(), randIP(), randIP(), randIP()))
|
||||||
f.Add([]byte(""))
|
f.Add([]byte(""))
|
||||||
f.Fuzz(func(t *testing.T, ff []byte) {
|
f.Fuzz(func(t *testing.T, ff []byte) {
|
||||||
prefix, suffix := splitKey(ff)
|
prefix, suffix := splitKey(ff)
|
||||||
|
@ -342,7 +389,7 @@ func checkLimiter(t require.TestingT, ff []byte, Tree *radix.Txn) {
|
||||||
|
|
||||||
func FuzzUpdateConfig(f *testing.F) {
|
func FuzzUpdateConfig(f *testing.F) {
|
||||||
|
|
||||||
f.Add(bytes.Join([][]byte{[]byte(""), makeKey(randIP()), makeKey(randIP(), randIP()), makeKey(randIP(), randIP(), randIP()), makeKey(randIP(), randIP(), randIP(), randIP())}, []byte(",")))
|
f.Add(bytes.Join([][]byte{[]byte(""), Key(randIP()), Key(randIP(), randIP()), Key(randIP(), randIP(), randIP()), Key(randIP(), randIP(), randIP(), randIP())}, []byte(",")))
|
||||||
f.Fuzz(func(t *testing.T, ff []byte) {
|
f.Fuzz(func(t *testing.T, ff []byte) {
|
||||||
cm := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 1 * time.Millisecond, ReconcileCheckInterval: 1 * time.Millisecond}
|
cm := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 1 * time.Millisecond, ReconcileCheckInterval: 1 * time.Millisecond}
|
||||||
m := NewMultiLimiter(cm)
|
m := NewMultiLimiter(cm)
|
||||||
|
@ -509,3 +556,12 @@ type mockTicker struct {
|
||||||
func (m *mockTicker) Ticker() <-chan time.Time {
|
func (m *mockTicker) Ticker() <-chan time.Time {
|
||||||
return m.tickerCh
|
return m.tickerCh
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func splitKey(key []byte) ([]byte, []byte) {
|
||||||
|
|
||||||
|
ret := bytes.SplitN(key, []byte(separator), 2)
|
||||||
|
if len(ret) != 2 {
|
||||||
|
return []byte(""), []byte("")
|
||||||
|
}
|
||||||
|
return ret[0], ret[1]
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue