From ae9c22896762f6d4a952df03103fd2b55321b9eb Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Wed, 22 Feb 2023 15:15:51 -0500 Subject: [PATCH] Rate limiter/add ip prefix (#16342) * add support for prefixes in the config tree * fix to use default config when the prefix have no config --- agent/consul/multilimiter/multilimiter.go | 38 ++++------ .../consul/multilimiter/multilimiter_test.go | 76 ++++++++++++++++--- 2 files changed, 80 insertions(+), 34 deletions(-) diff --git a/agent/consul/multilimiter/multilimiter.go b/agent/consul/multilimiter/multilimiter.go index c66948b263..21839d3fe0 100644 --- a/agent/consul/multilimiter/multilimiter.go +++ b/agent/consul/multilimiter/multilimiter.go @@ -15,14 +15,10 @@ var _ RateLimiter = &MultiLimiter{} const separator = "♣" -func makeKey(keys ...[]byte) KeyType { +func Key(keys ...[]byte) KeyType { return bytes.Join(keys, []byte(separator)) } -func Key(prefix, key []byte) KeyType { - return makeKey(prefix, key) -} - // RateLimiter is the interface implemented by MultiLimiter // //go:generate mockery --name RateLimiter --inpackage --filename mock_RateLimiter.go @@ -131,19 +127,9 @@ func (m *MultiLimiter) Run(ctx context.Context) { } -func splitKey(key []byte) ([]byte, []byte) { - - ret := bytes.SplitN(key, []byte(separator), 2) - if len(ret) != 2 { - return []byte(""), []byte("") - } - return ret[0], ret[1] -} - // Allow should be called by a request processor to check if the current request is Limited // The request processor should provide a LimitedEntity that implement the right Key() func (m *MultiLimiter) Allow(e LimitedEntity) bool { - prefix, _ := splitKey(e.Key()) limiters := m.limiters.Load() l, ok := limiters.Get(e.Key()) now := time.Now() @@ -157,14 +143,16 @@ func (m *MultiLimiter) Allow(e LimitedEntity) bool { } configs := m.limitersConfigs.Load() - c, okP := configs.Get(prefix) - var config = &m.defaultConfig.Load().LimiterConfig - if okP { - prefixConfig := c.(*LimiterConfig) - if prefixConfig != nil { - config = prefixConfig + config := &m.defaultConfig.Load().LimiterConfig + p, _, ok := configs.Root().LongestPrefix(e.Key()) + + if ok { + c, ok := configs.Get(p) + if ok && c != nil { + config = c.(*LimiterConfig) } } + limiter := &Limiter{limiter: rate.NewLimiter(config.Rate, config.Burst)} limiter.lastAccess.Store(unixNow) m.limiterCh <- &limiterWithKey{l: limiter, k: e.Key(), t: now} @@ -182,7 +170,6 @@ type tickerWrapper struct { func (t tickerWrapper) Ticker() <-chan time.Time { return t.ticker.C } - func (m *MultiLimiter) reconcile(ctx context.Context, waiter ticker, txn *radix.Txn, reconcileCheckLimit time.Duration) *radix.Txn { select { case <-waiter.Ticker(): @@ -223,8 +210,11 @@ func (m *MultiLimiter) reconcileConfig(txn *radix.Txn) { // find the prefix for the leaf and check if the defaultConfig is up-to-date // it's possible that the prefix is equal to the key - prefix, _ := splitKey(k) - v, ok := m.limitersConfigs.Load().Get(prefix) + p, _, ok := m.limitersConfigs.Load().Root().LongestPrefix(k) + if !ok { + continue + } + v, ok := m.limitersConfigs.Load().Get(p) if v == nil || !ok { continue } diff --git a/agent/consul/multilimiter/multilimiter_test.go b/agent/consul/multilimiter/multilimiter_test.go index b64f95febd..2b04f29eef 100644 --- a/agent/consul/multilimiter/multilimiter_test.go +++ b/agent/consul/multilimiter/multilimiter_test.go @@ -33,7 +33,7 @@ func TestNewMultiLimiter(t *testing.T) { func TestRateLimiterUpdate(t *testing.T) { c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 1 * time.Hour, ReconcileCheckInterval: 10 * time.Millisecond} m := NewMultiLimiter(c) - key := makeKey([]byte("test")) + key := Key([]byte("test")) //Allow a key m.Allow(Limited{key: key}) @@ -77,7 +77,7 @@ func TestRateLimiterCleanup(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() m.Run(ctx) - key := makeKey([]byte("test")) + key := Key([]byte("test")) m.Allow(Limited{key: key}) retry.RunWith(&retry.Timer{Wait: 100 * time.Millisecond, Timeout: 2 * time.Second}, t, func(r *retry.R) { l := m.limiters.Load() @@ -279,6 +279,53 @@ func TestRateLimiterUpdateConfig(t *testing.T) { limiter := l.(*Limiter) require.True(t, c1.isApplied(limiter.limiter)) }) + t.Run("Allow an IP with prefix and check prefix config is applied to new keys under that prefix", func(t *testing.T) { + c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 100 * time.Millisecond, ReconcileCheckInterval: 10 * time.Millisecond} + m := NewMultiLimiter(c) + require.Equal(t, *m.defaultConfig.Load(), c) + c1 := LimiterConfig{Rate: 3} + prefix := Key([]byte("ip.ratelimit"), []byte("127.0")) + m.UpdateConfig(c1, prefix) + ip := Key([]byte("ip.ratelimit"), []byte("127.0.0.1")) + m.Allow(ipLimited{key: ip}) + storeLimiter(m) + load := m.limiters.Load() + l, ok := load.Get(ip) + require.True(t, ok) + require.NotNil(t, l) + limiter := l.(*Limiter) + require.True(t, c1.isApplied(limiter.limiter)) + }) + + t.Run("Allow an IP with 2 prefixes and check prefix config is applied to new keys under that prefix", func(t *testing.T) { + c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 100 * time.Millisecond, ReconcileCheckInterval: 10 * time.Millisecond} + m := NewMultiLimiter(c) + require.Equal(t, *m.defaultConfig.Load(), c) + c1 := LimiterConfig{Rate: 3} + prefix := Key([]byte("ip.ratelimit"), []byte("127.0")) + m.UpdateConfig(c1, prefix) + prefix = Key([]byte("ip.ratelimit"), []byte("127.0.0")) + c2 := LimiterConfig{Rate: 6} + m.UpdateConfig(c2, prefix) + ip := Key([]byte("ip.ratelimit"), []byte("127.0.0.1")) + m.Allow(ipLimited{key: ip}) + storeLimiter(m) + load := m.limiters.Load() + l, ok := load.Get(ip) + require.True(t, ok) + require.NotNil(t, l) + limiter := l.(*Limiter) + require.True(t, c2.isApplied(limiter.limiter)) + ip = Key([]byte("ip.ratelimit"), []byte("127.0.1.1")) + m.Allow(ipLimited{key: ip}) + storeLimiter(m) + load = m.limiters.Load() + l, ok = load.Get(ip) + require.True(t, ok) + require.NotNil(t, l) + limiter = l.(*Limiter) + require.True(t, c1.isApplied(limiter.limiter)) + }) t.Run("Allow an IP with prefix and check after it's cleaned new Allow would give it the right defaultConfig", func(t *testing.T) { c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 100 * time.Millisecond, ReconcileCheckInterval: 10 * time.Millisecond} @@ -304,10 +351,10 @@ func FuzzSingleConfig(f *testing.F) { c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 100 * time.Millisecond, ReconcileCheckInterval: 10 * time.Millisecond} m := NewMultiLimiter(c) require.Equal(f, *m.defaultConfig.Load(), c) - f.Add(makeKey(randIP())) - f.Add(makeKey(randIP(), randIP())) - f.Add(makeKey(randIP(), randIP(), randIP())) - f.Add(makeKey(randIP(), randIP(), randIP(), randIP())) + f.Add(Key(randIP())) + f.Add(Key(randIP(), randIP())) + f.Add(Key(randIP(), randIP(), randIP())) + f.Add(Key(randIP(), randIP(), randIP(), randIP())) f.Fuzz(func(t *testing.T, ff []byte) { m.Allow(Limited{key: ff}) storeLimiter(m) @@ -317,9 +364,9 @@ func FuzzSingleConfig(f *testing.F) { } func FuzzSplitKey(f *testing.F) { - f.Add(makeKey(randIP(), randIP())) - f.Add(makeKey(randIP(), randIP(), randIP())) - f.Add(makeKey(randIP(), randIP(), randIP(), randIP())) + f.Add(Key(randIP(), randIP())) + f.Add(Key(randIP(), randIP(), randIP())) + f.Add(Key(randIP(), randIP(), randIP(), randIP())) f.Add([]byte("")) f.Fuzz(func(t *testing.T, ff []byte) { prefix, suffix := splitKey(ff) @@ -342,7 +389,7 @@ func checkLimiter(t require.TestingT, ff []byte, Tree *radix.Txn) { func FuzzUpdateConfig(f *testing.F) { - f.Add(bytes.Join([][]byte{[]byte(""), makeKey(randIP()), makeKey(randIP(), randIP()), makeKey(randIP(), randIP(), randIP()), makeKey(randIP(), randIP(), randIP(), randIP())}, []byte(","))) + f.Add(bytes.Join([][]byte{[]byte(""), Key(randIP()), Key(randIP(), randIP()), Key(randIP(), randIP(), randIP()), Key(randIP(), randIP(), randIP(), randIP())}, []byte(","))) f.Fuzz(func(t *testing.T, ff []byte) { cm := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 1 * time.Millisecond, ReconcileCheckInterval: 1 * time.Millisecond} m := NewMultiLimiter(cm) @@ -509,3 +556,12 @@ type mockTicker struct { func (m *mockTicker) Ticker() <-chan time.Time { return m.tickerCh } + +func splitKey(key []byte) ([]byte, []byte) { + + ret := bytes.SplitN(key, []byte(separator), 2) + if len(ret) != 2 { + return []byte(""), []byte("") + } + return ret[0], ret[1] +}