mirror of https://github.com/status-im/consul.git
add multilimiter and tests (#15467)
* add multilimiter and tests * exporting LimitedEntity * go mod tidy * Apply suggestions from code review Co-authored-by: John Murret <john.murret@hashicorp.com> * add config update and rename config params * add doc string and split config * Apply suggestions from code review Co-authored-by: Dan Upton <daniel@floppy.co> * use timer to avoid go routine leak and change the interface * add comments to tests * fix failing test * add prefix with config edge, refactor tests * Apply suggestions from code review Co-authored-by: Dan Upton <daniel@floppy.co> * refactor to apply configs for limiters under a prefix * add fuzz tests and fix bugs found. Refactor reconcile loop to have a simpler logic * make KeyType an exported type * split the config and limiter trees to fix race conditions in config update * rename variables * fix race in test and remove dead code * fix reconcile loop to not create a timer on each loop * add extra benchmark tests and fix tests * fix benchmark test to pass value to func * use a separate go routine to write limiters (#15643) * use a separate go routine to write limiters * Add updating limiter when another limiter is created * fix waiter to be a ticker, so we commit more than once. * fix tests and add tests for coverage * unexport members and add tests * make UpdateConfig thread safe and multi call to Run safe * replace swith with if * fix review comments * replace time.sleep with retries * fix flaky test and remove unnecessary init * fix test races * remove unnecessary negative test case * remove fixed todo Co-authored-by: John Murret <john.murret@hashicorp.com> Co-authored-by: Dan Upton <daniel@floppy.co>
This commit is contained in:
parent
eb3ff8cf73
commit
81e40c1fac
|
@ -0,0 +1,276 @@
|
||||||
|
package multilimiter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
radix "github.com/hashicorp/go-immutable-radix"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ RateLimiter = &MultiLimiter{}
|
||||||
|
|
||||||
|
const separator = "♣"
|
||||||
|
|
||||||
|
func makeKey(keys ...[]byte) KeyType {
|
||||||
|
return bytes.Join(keys, []byte(separator))
|
||||||
|
}
|
||||||
|
|
||||||
|
func Key(prefix, key []byte) KeyType {
|
||||||
|
return makeKey(prefix, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RateLimiter is the interface implemented by MultiLimiter
|
||||||
|
type RateLimiter interface {
|
||||||
|
Run(ctx context.Context)
|
||||||
|
Allow(entity LimitedEntity) bool
|
||||||
|
UpdateConfig(c LimiterConfig, prefix []byte)
|
||||||
|
}
|
||||||
|
|
||||||
|
type limiterWithKey struct {
|
||||||
|
l *Limiter
|
||||||
|
k []byte
|
||||||
|
t time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// MultiLimiter implement RateLimiter interface and represent a set of rate limiters
|
||||||
|
// specific to different LimitedEntities and queried by a LimitedEntities.Key()
|
||||||
|
type MultiLimiter struct {
|
||||||
|
limiters *atomic.Pointer[radix.Tree]
|
||||||
|
limitersConfigs *atomic.Pointer[radix.Tree]
|
||||||
|
defaultConfig *atomic.Pointer[Config]
|
||||||
|
limiterCh chan *limiterWithKey
|
||||||
|
configsLock sync.Mutex
|
||||||
|
once sync.Once
|
||||||
|
}
|
||||||
|
|
||||||
|
type KeyType = []byte
|
||||||
|
|
||||||
|
// LimitedEntity is an interface used by MultiLimiter.Allow to determine
|
||||||
|
// which rate limiter to use to allow the request
|
||||||
|
type LimitedEntity interface {
|
||||||
|
Key() KeyType
|
||||||
|
}
|
||||||
|
|
||||||
|
// Limiter define a limiter to be part of the MultiLimiter structure
|
||||||
|
type Limiter struct {
|
||||||
|
limiter *rate.Limiter
|
||||||
|
lastAccess atomic.Int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// LimiterConfig is a Limiter configuration
|
||||||
|
type LimiterConfig struct {
|
||||||
|
Rate rate.Limit
|
||||||
|
Burst int
|
||||||
|
}
|
||||||
|
|
||||||
|
// Config is a MultiLimiter configuration
|
||||||
|
type Config struct {
|
||||||
|
LimiterConfig
|
||||||
|
ReconcileCheckLimit time.Duration
|
||||||
|
ReconcileCheckInterval time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateConfig will update the MultiLimiter Config
|
||||||
|
// which will cascade to all the Limiter(s) LimiterConfig
|
||||||
|
func (m *MultiLimiter) UpdateConfig(c LimiterConfig, prefix []byte) {
|
||||||
|
m.configsLock.Lock()
|
||||||
|
defer m.configsLock.Unlock()
|
||||||
|
if prefix == nil {
|
||||||
|
prefix = []byte("")
|
||||||
|
}
|
||||||
|
configs := m.limitersConfigs.Load()
|
||||||
|
newConfigs, _, _ := configs.Insert(prefix, &c)
|
||||||
|
m.limitersConfigs.Store(newConfigs)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMultiLimiter create a new MultiLimiter
|
||||||
|
func NewMultiLimiter(c Config) *MultiLimiter {
|
||||||
|
limiters := atomic.Pointer[radix.Tree]{}
|
||||||
|
configs := atomic.Pointer[radix.Tree]{}
|
||||||
|
config := atomic.Pointer[Config]{}
|
||||||
|
config.Store(&c)
|
||||||
|
limiters.Store(radix.New())
|
||||||
|
configs.Store(radix.New())
|
||||||
|
if c.ReconcileCheckLimit == 0 {
|
||||||
|
c.ReconcileCheckLimit = 30 * time.Millisecond
|
||||||
|
}
|
||||||
|
if c.ReconcileCheckInterval == 0 {
|
||||||
|
c.ReconcileCheckLimit = 1 * time.Second
|
||||||
|
}
|
||||||
|
chLimiter := make(chan *limiterWithKey, 100)
|
||||||
|
m := &MultiLimiter{limiters: &limiters, defaultConfig: &config, limitersConfigs: &configs, limiterCh: chLimiter}
|
||||||
|
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run the cleanup routine to remove old entries of Limiters based on ReconcileCheckLimit and ReconcileCheckInterval.
|
||||||
|
func (m *MultiLimiter) Run(ctx context.Context) {
|
||||||
|
m.once.Do(func() {
|
||||||
|
go func() {
|
||||||
|
writeTimeout := 10 * time.Millisecond
|
||||||
|
limiters := m.limiters.Load()
|
||||||
|
txn := limiters.Txn()
|
||||||
|
waiter := time.NewTicker(writeTimeout)
|
||||||
|
wt := tickerWrapper{ticker: waiter}
|
||||||
|
defer waiter.Stop()
|
||||||
|
for {
|
||||||
|
if txn = m.runStoreOnce(ctx, wt, txn); txn == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
go func() {
|
||||||
|
waiter := time.NewTimer(0)
|
||||||
|
for {
|
||||||
|
c := m.defaultConfig.Load()
|
||||||
|
waiter.Reset(c.ReconcileCheckInterval)
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
waiter.Stop()
|
||||||
|
return
|
||||||
|
case now := <-waiter.C:
|
||||||
|
m.reconcileLimitedOnce(now, c.ReconcileCheckLimit)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func splitKey(key []byte) ([]byte, []byte) {
|
||||||
|
|
||||||
|
ret := bytes.SplitN(key, []byte(separator), 2)
|
||||||
|
if len(ret) != 2 {
|
||||||
|
return []byte(""), []byte("")
|
||||||
|
}
|
||||||
|
return ret[0], ret[1]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Allow should be called by a request processor to check if the current request is Limited
|
||||||
|
// The request processor should provide a LimitedEntity that implement the right Key()
|
||||||
|
func (m *MultiLimiter) Allow(e LimitedEntity) bool {
|
||||||
|
prefix, _ := splitKey(e.Key())
|
||||||
|
limiters := m.limiters.Load()
|
||||||
|
l, ok := limiters.Get(e.Key())
|
||||||
|
now := time.Now()
|
||||||
|
unixNow := time.Now().UnixMilli()
|
||||||
|
if ok {
|
||||||
|
limiter := l.(*Limiter)
|
||||||
|
if limiter.limiter != nil {
|
||||||
|
limiter.lastAccess.Store(unixNow)
|
||||||
|
return limiter.limiter.Allow()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
configs := m.limitersConfigs.Load()
|
||||||
|
c, okP := configs.Get(prefix)
|
||||||
|
var config = &m.defaultConfig.Load().LimiterConfig
|
||||||
|
if okP {
|
||||||
|
prefixConfig := c.(*LimiterConfig)
|
||||||
|
if prefixConfig != nil {
|
||||||
|
config = prefixConfig
|
||||||
|
}
|
||||||
|
}
|
||||||
|
limiter := &Limiter{limiter: rate.NewLimiter(config.Rate, config.Burst)}
|
||||||
|
limiter.lastAccess.Store(unixNow)
|
||||||
|
m.limiterCh <- &limiterWithKey{l: limiter, k: e.Key(), t: now}
|
||||||
|
return limiter.limiter.Allow()
|
||||||
|
}
|
||||||
|
|
||||||
|
type ticker interface {
|
||||||
|
Ticker() <-chan time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
type tickerWrapper struct {
|
||||||
|
ticker *time.Ticker
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t tickerWrapper) Ticker() <-chan time.Time {
|
||||||
|
return t.ticker.C
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MultiLimiter) runStoreOnce(ctx context.Context, waiter ticker, txn *radix.Txn) *radix.Txn {
|
||||||
|
select {
|
||||||
|
case <-waiter.Ticker():
|
||||||
|
tree := txn.Commit()
|
||||||
|
m.limiters.Store(tree)
|
||||||
|
txn = tree.Txn()
|
||||||
|
|
||||||
|
case lk := <-m.limiterCh:
|
||||||
|
v, ok := txn.Get(lk.k)
|
||||||
|
if !ok {
|
||||||
|
txn.Insert(lk.k, lk.l)
|
||||||
|
} else {
|
||||||
|
if l, ok := v.(*Limiter); ok {
|
||||||
|
l.lastAccess.Store(lk.t.Unix())
|
||||||
|
l.limiter.AllowN(lk.t, 1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return txn
|
||||||
|
}
|
||||||
|
|
||||||
|
// reconcileLimitedOnce is called by the MultiLimiter clean up routine to remove old Limited entries
|
||||||
|
// it will wait for ReconcileCheckInterval before traversing the radix tree and removing all entries
|
||||||
|
// with lastAccess > ReconcileCheckLimit
|
||||||
|
func (m *MultiLimiter) reconcileLimitedOnce(now time.Time, reconcileCheckLimit time.Duration) {
|
||||||
|
limiters := m.limiters.Load()
|
||||||
|
storedLimiters := limiters
|
||||||
|
iter := limiters.Root().Iterator()
|
||||||
|
k, v, ok := iter.Next()
|
||||||
|
var txn *radix.Txn
|
||||||
|
txn = limiters.Txn()
|
||||||
|
// remove all expired limiters
|
||||||
|
for ok {
|
||||||
|
if t, ok := v.(*Limiter); ok {
|
||||||
|
if t.limiter != nil {
|
||||||
|
lastAccess := t.lastAccess.Load()
|
||||||
|
lastAccessT := time.UnixMilli(lastAccess)
|
||||||
|
diff := now.Sub(lastAccessT)
|
||||||
|
|
||||||
|
if diff > reconcileCheckLimit {
|
||||||
|
txn.Delete(k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
k, v, ok = iter.Next()
|
||||||
|
}
|
||||||
|
iter = txn.Root().Iterator()
|
||||||
|
k, v, ok = iter.Next()
|
||||||
|
|
||||||
|
// make sure all limiters have the latest defaultConfig of their prefix
|
||||||
|
for ok {
|
||||||
|
if pl, ok := v.(*Limiter); ok {
|
||||||
|
// check if it has a limiter, if so that's a lead
|
||||||
|
if pl.limiter != nil {
|
||||||
|
// find the prefix for the leaf and check if the defaultConfig is up-to-date
|
||||||
|
// it's possible that the prefix is equal to the key
|
||||||
|
prefix, _ := splitKey(k)
|
||||||
|
v, ok := m.limitersConfigs.Load().Get(prefix)
|
||||||
|
if ok {
|
||||||
|
if cl, ok := v.(*LimiterConfig); ok {
|
||||||
|
if cl != nil {
|
||||||
|
if !cl.isApplied(pl.limiter) {
|
||||||
|
limiter := Limiter{limiter: rate.NewLimiter(cl.Rate, cl.Burst)}
|
||||||
|
limiter.lastAccess.Store(pl.lastAccess.Load())
|
||||||
|
txn.Insert(k, &limiter)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
k, v, ok = iter.Next()
|
||||||
|
}
|
||||||
|
limiters = txn.Commit()
|
||||||
|
m.limiters.CompareAndSwap(storedLimiters, limiters)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lc *LimiterConfig) isApplied(l *rate.Limiter) bool {
|
||||||
|
return l.Limit() == lc.Rate && l.Burst() == lc.Burst
|
||||||
|
}
|
|
@ -0,0 +1,502 @@
|
||||||
|
package multilimiter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
|
radix "github.com/hashicorp/go-immutable-radix"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
"math/rand"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Limited struct {
|
||||||
|
key KeyType
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l Limited) Key() []byte {
|
||||||
|
return l.key
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewMultiLimiter(t *testing.T) {
|
||||||
|
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}}
|
||||||
|
m := NewMultiLimiter(c)
|
||||||
|
require.NotNil(t, m)
|
||||||
|
require.NotNil(t, m.limiters)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRateLimiterUpdate(t *testing.T) {
|
||||||
|
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 1 * time.Hour, ReconcileCheckInterval: 10 * time.Millisecond}
|
||||||
|
m := NewMultiLimiter(c)
|
||||||
|
key := makeKey([]byte("test"))
|
||||||
|
|
||||||
|
//Allow a key
|
||||||
|
m.Allow(Limited{key: key})
|
||||||
|
storeLimiter(m)
|
||||||
|
limiters := m.limiters.Load()
|
||||||
|
l1, ok1 := limiters.Get(key)
|
||||||
|
retry.RunWith(&retry.Timer{Wait: 100 * time.Millisecond, Timeout: 2 * time.Second}, t, func(r *retry.R) {
|
||||||
|
limiters = m.limiters.Load()
|
||||||
|
l1, ok1 = limiters.Get(key)
|
||||||
|
// check key exist
|
||||||
|
require.True(r, ok1)
|
||||||
|
require.NotNil(r, l1)
|
||||||
|
})
|
||||||
|
|
||||||
|
la1 := l1.(*Limiter).lastAccess.Load()
|
||||||
|
|
||||||
|
// Sleep a bit just to make sure time change
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
// allow same key again
|
||||||
|
m.Allow(Limited{key: key})
|
||||||
|
limiters = m.limiters.Load()
|
||||||
|
l2, ok2 := limiters.Get(key)
|
||||||
|
|
||||||
|
// check it exist and it's same key
|
||||||
|
require.True(t, ok2)
|
||||||
|
require.NotNil(t, l2)
|
||||||
|
require.Equal(t, l1, l2)
|
||||||
|
|
||||||
|
// last access should be different
|
||||||
|
la2 := l1.(*Limiter).lastAccess.Load()
|
||||||
|
require.NotEqual(t, la1, la2)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRateLimiterCleanup(t *testing.T) {
|
||||||
|
|
||||||
|
// Create a limiter and Allow a key, check the key exists
|
||||||
|
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 1 * time.Second, ReconcileCheckInterval: 10 * time.Millisecond}
|
||||||
|
m := NewMultiLimiter(c)
|
||||||
|
limiters := m.limiters.Load()
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
m.Run(ctx)
|
||||||
|
key := makeKey([]byte("test"))
|
||||||
|
m.Allow(Limited{key: key})
|
||||||
|
retry.RunWith(&retry.Timer{Wait: 100 * time.Millisecond, Timeout: 2 * time.Second}, t, func(r *retry.R) {
|
||||||
|
l := m.limiters.Load()
|
||||||
|
require.NotEqual(r, limiters, l)
|
||||||
|
limiters = l
|
||||||
|
})
|
||||||
|
|
||||||
|
l, ok := limiters.Get(key)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NotNil(t, l)
|
||||||
|
|
||||||
|
time.Sleep(c.ReconcileCheckInterval)
|
||||||
|
// Wait > ReconcileCheckInterval and check that the key was cleaned up
|
||||||
|
retry.RunWith(&retry.Timer{Wait: 100 * time.Millisecond, Timeout: 2 * time.Second}, t, func(r *retry.R) {
|
||||||
|
l := m.limiters.Load()
|
||||||
|
require.NotEqual(r, limiters, l)
|
||||||
|
limiters = l
|
||||||
|
})
|
||||||
|
l, ok = limiters.Get(key)
|
||||||
|
require.False(t, ok)
|
||||||
|
require.Nil(t, l)
|
||||||
|
}
|
||||||
|
|
||||||
|
func storeLimiter(m *MultiLimiter) {
|
||||||
|
txn := m.limiters.Load().Txn()
|
||||||
|
mockTicker := mockTicker{tickerCh: make(chan time.Time, 1)}
|
||||||
|
ctx := context.Background()
|
||||||
|
m.runStoreOnce(ctx, &mockTicker, txn)
|
||||||
|
mockTicker.tickerCh <- time.Now()
|
||||||
|
m.runStoreOnce(ctx, &mockTicker, txn)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRateLimiterStore(t *testing.T) {
|
||||||
|
// Create a MultiLimiter m with a defaultConfig c and check the defaultConfig is applied
|
||||||
|
|
||||||
|
t.Run("Store multiple transactions", func(t *testing.T) {
|
||||||
|
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 100 * time.Millisecond, ReconcileCheckInterval: 10 * time.Millisecond}
|
||||||
|
m := NewMultiLimiter(c)
|
||||||
|
require.Equal(t, *m.defaultConfig.Load(), c)
|
||||||
|
ipNoPrefix1 := Key([]byte(""), []byte("127.0.0.1"))
|
||||||
|
ipNoPrefix2 := Key([]byte(""), []byte("127.0.0.2"))
|
||||||
|
{
|
||||||
|
m.Allow(ipLimited{key: ipNoPrefix1})
|
||||||
|
storeLimiter(m)
|
||||||
|
l, ok := m.limiters.Load().Get(ipNoPrefix1)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NotNil(t, l)
|
||||||
|
limiter := l.(*Limiter)
|
||||||
|
require.True(t, c.isApplied(limiter.limiter))
|
||||||
|
}
|
||||||
|
{
|
||||||
|
m.Allow(ipLimited{key: ipNoPrefix2})
|
||||||
|
storeLimiter(m)
|
||||||
|
l, ok := m.limiters.Load().Get(ipNoPrefix2)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NotNil(t, l)
|
||||||
|
limiter := l.(*Limiter)
|
||||||
|
require.True(t, c.isApplied(limiter.limiter))
|
||||||
|
l, ok = m.limiters.Load().Get(ipNoPrefix1)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NotNil(t, l)
|
||||||
|
limiter = l.(*Limiter)
|
||||||
|
require.True(t, c.isApplied(limiter.limiter))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
t.Run("runStore store multiple Limiters", func(t *testing.T) {
|
||||||
|
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 10 * time.Second, ReconcileCheckInterval: 10 * time.Millisecond}
|
||||||
|
m := NewMultiLimiter(c)
|
||||||
|
require.Equal(t, *m.defaultConfig.Load(), c)
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
m.Run(ctx)
|
||||||
|
defer cancel()
|
||||||
|
ipNoPrefix1 := Key([]byte(""), []byte("127.0.0.1"))
|
||||||
|
ipNoPrefix2 := Key([]byte(""), []byte("127.0.0.2"))
|
||||||
|
limiters := m.limiters.Load()
|
||||||
|
m.Allow(ipLimited{key: ipNoPrefix1})
|
||||||
|
retry.RunWith(&retry.Timer{Wait: 1 * time.Second, Timeout: 5 * time.Second}, t, func(r *retry.R) {
|
||||||
|
l := m.limiters.Load()
|
||||||
|
require.NotEqual(r, limiters, l)
|
||||||
|
limiters = l
|
||||||
|
})
|
||||||
|
l, ok := m.limiters.Load().Get(ipNoPrefix1)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NotNil(t, l)
|
||||||
|
limiter := l.(*Limiter)
|
||||||
|
require.True(t, c.isApplied(limiter.limiter))
|
||||||
|
m.Allow(ipLimited{key: ipNoPrefix2})
|
||||||
|
retry.RunWith(&retry.Timer{Wait: 1 * time.Second, Timeout: 5 * time.Second}, t, func(r *retry.R) {
|
||||||
|
l := m.limiters.Load()
|
||||||
|
require.NotEqual(r, limiters, l)
|
||||||
|
limiters = l
|
||||||
|
})
|
||||||
|
l, ok = m.limiters.Load().Get(ipNoPrefix1)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NotNil(t, l)
|
||||||
|
limiter = l.(*Limiter)
|
||||||
|
require.True(t, c.isApplied(limiter.limiter))
|
||||||
|
l, ok = m.limiters.Load().Get(ipNoPrefix2)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NotNil(t, l)
|
||||||
|
limiter = l.(*Limiter)
|
||||||
|
require.True(t, c.isApplied(limiter.limiter))
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRateLimiterUpdateConfig(t *testing.T) {
|
||||||
|
|
||||||
|
// Create a MultiLimiter m with a defaultConfig c and check the defaultConfig is applied
|
||||||
|
|
||||||
|
t.Run("Allow a key and check defaultConfig is applied to that key", func(t *testing.T) {
|
||||||
|
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 100 * time.Millisecond, ReconcileCheckInterval: 10 * time.Millisecond}
|
||||||
|
m := NewMultiLimiter(c)
|
||||||
|
require.Equal(t, *m.defaultConfig.Load(), c)
|
||||||
|
ipNoPrefix := Key([]byte(""), []byte("127.0.0.1"))
|
||||||
|
m.Allow(ipLimited{key: ipNoPrefix})
|
||||||
|
storeLimiter(m)
|
||||||
|
l, ok := m.limiters.Load().Get(ipNoPrefix)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NotNil(t, l)
|
||||||
|
limiter := l.(*Limiter)
|
||||||
|
require.True(t, c.isApplied(limiter.limiter))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Update nil prefix and make sure it's written in the root", func(t *testing.T) {
|
||||||
|
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 100 * time.Millisecond, ReconcileCheckInterval: 10 * time.Millisecond}
|
||||||
|
m := NewMultiLimiter(c)
|
||||||
|
require.Equal(t, *m.defaultConfig.Load(), c)
|
||||||
|
prefix := []byte(nil)
|
||||||
|
c1 := LimiterConfig{Rate: 2}
|
||||||
|
m.UpdateConfig(c1, prefix)
|
||||||
|
v, ok := m.limitersConfigs.Load().Get([]byte(""))
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NotNil(t, v)
|
||||||
|
c2 := v.(*LimiterConfig)
|
||||||
|
require.Equal(t, c1, *c2)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Allow 2 keys with prefix and check defaultConfig is applied to those keys", func(t *testing.T) {
|
||||||
|
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 100 * time.Millisecond, ReconcileCheckInterval: 10 * time.Millisecond}
|
||||||
|
m := NewMultiLimiter(c)
|
||||||
|
require.Equal(t, *m.defaultConfig.Load(), c)
|
||||||
|
prefix := []byte("namespace.write")
|
||||||
|
ip := Key(prefix, []byte("127.0.0.1"))
|
||||||
|
m.Allow(ipLimited{key: ip})
|
||||||
|
storeLimiter(m)
|
||||||
|
ip2 := Key(prefix, []byte("127.0.0.2"))
|
||||||
|
m.Allow(ipLimited{key: ip2})
|
||||||
|
l, ok := m.limiters.Load().Get(ip)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NotNil(t, l)
|
||||||
|
limiter := l.(*Limiter)
|
||||||
|
require.True(t, c.LimiterConfig.isApplied(limiter.limiter))
|
||||||
|
})
|
||||||
|
t.Run("Apply a defaultConfig to 'namespace.write' check the defaultConfig is applied to existing keys under that prefix", func(t *testing.T) {
|
||||||
|
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 100 * time.Millisecond, ReconcileCheckInterval: 10 * time.Millisecond}
|
||||||
|
m := NewMultiLimiter(c)
|
||||||
|
require.Equal(t, *m.defaultConfig.Load(), c)
|
||||||
|
prefix := []byte("namespace.write")
|
||||||
|
ip := Key(prefix, []byte("127.0.0.1"))
|
||||||
|
c3 := LimiterConfig{Rate: 2}
|
||||||
|
m.UpdateConfig(c3, prefix)
|
||||||
|
// call reconcileLimitedOnce to make sure the update is applied
|
||||||
|
m.reconcileLimitedOnce(time.Now(), 100*time.Millisecond)
|
||||||
|
m.Allow(ipLimited{key: ip})
|
||||||
|
storeLimiter(m)
|
||||||
|
l3, ok3 := m.limiters.Load().Get(ip)
|
||||||
|
require.True(t, ok3)
|
||||||
|
require.NotNil(t, l3)
|
||||||
|
limiter3 := l3.(*Limiter)
|
||||||
|
require.True(t, c3.isApplied(limiter3.limiter))
|
||||||
|
})
|
||||||
|
t.Run("Allow an IP with prefix and check prefix defaultConfig is applied to new keys under that prefix", func(t *testing.T) {
|
||||||
|
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 100 * time.Millisecond, ReconcileCheckInterval: 10 * time.Millisecond}
|
||||||
|
m := NewMultiLimiter(c)
|
||||||
|
require.Equal(t, *m.defaultConfig.Load(), c)
|
||||||
|
c1 := LimiterConfig{Rate: 3}
|
||||||
|
prefix := []byte("namespace.read")
|
||||||
|
m.UpdateConfig(c1, prefix)
|
||||||
|
ip := Key(prefix, []byte("127.0.0.1"))
|
||||||
|
m.Allow(ipLimited{key: ip})
|
||||||
|
storeLimiter(m)
|
||||||
|
l, ok := m.limiters.Load().Get(ip)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NotNil(t, l)
|
||||||
|
limiter := l.(*Limiter)
|
||||||
|
require.True(t, c1.isApplied(limiter.limiter))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Allow an IP with prefix and check after it's cleaned new Allow would give it the right defaultConfig", func(t *testing.T) {
|
||||||
|
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 100 * time.Millisecond, ReconcileCheckInterval: 10 * time.Millisecond}
|
||||||
|
m := NewMultiLimiter(c)
|
||||||
|
require.Equal(t, *m.defaultConfig.Load(), c)
|
||||||
|
prefix := []byte("namespace.read")
|
||||||
|
ip := Key(prefix, []byte("127.0.0.1"))
|
||||||
|
c1 := LimiterConfig{Rate: 1}
|
||||||
|
m.UpdateConfig(c1, prefix)
|
||||||
|
// call reconcileLimitedOnce to make sure the update is applied
|
||||||
|
m.reconcileLimitedOnce(time.Now(), 100*time.Millisecond)
|
||||||
|
m.Allow(ipLimited{key: ip})
|
||||||
|
storeLimiter(m)
|
||||||
|
l, ok := m.limiters.Load().Get(ip)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NotNil(t, l)
|
||||||
|
limiter := l.(*Limiter)
|
||||||
|
require.True(t, c1.isApplied(limiter.limiter))
|
||||||
|
m.reconcileLimitedOnce(time.Now().Add(100*time.Millisecond), 100*time.Millisecond)
|
||||||
|
l, ok = m.limiters.Load().Get(ip)
|
||||||
|
require.False(t, ok)
|
||||||
|
require.Nil(t, l)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func FuzzSingleConfig(f *testing.F) {
|
||||||
|
c := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 100 * time.Millisecond, ReconcileCheckInterval: 10 * time.Millisecond}
|
||||||
|
m := NewMultiLimiter(c)
|
||||||
|
require.Equal(f, *m.defaultConfig.Load(), c)
|
||||||
|
f.Add(makeKey(randIP()))
|
||||||
|
f.Add(makeKey(randIP(), randIP()))
|
||||||
|
f.Add(makeKey(randIP(), randIP(), randIP()))
|
||||||
|
f.Add(makeKey(randIP(), randIP(), randIP(), randIP()))
|
||||||
|
f.Fuzz(func(t *testing.T, ff []byte) {
|
||||||
|
m.Allow(Limited{key: ff})
|
||||||
|
storeLimiter(m)
|
||||||
|
checkLimiter(t, ff, m.limiters.Load().Txn())
|
||||||
|
checkTree(t, m.limiters.Load().Txn())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func FuzzSplitKey(f *testing.F) {
|
||||||
|
f.Add(makeKey(randIP(), randIP()))
|
||||||
|
f.Add(makeKey(randIP(), randIP(), randIP()))
|
||||||
|
f.Add(makeKey(randIP(), randIP(), randIP(), randIP()))
|
||||||
|
f.Add([]byte(""))
|
||||||
|
f.Fuzz(func(t *testing.T, ff []byte) {
|
||||||
|
prefix, suffix := splitKey(ff)
|
||||||
|
require.NotNil(t, prefix)
|
||||||
|
require.NotNil(t, suffix)
|
||||||
|
if len(prefix) == 0 && len(suffix) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
joined := bytes.Join([][]byte{prefix, suffix}, []byte(separator))
|
||||||
|
require.Equal(t, ff, joined)
|
||||||
|
require.False(t, bytes.Contains(prefix, []byte(separator)))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkLimiter(t require.TestingT, ff []byte, Tree *radix.Txn) {
|
||||||
|
v, ok := Tree.Get(ff)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NotNil(t, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
func FuzzUpdateConfig(f *testing.F) {
|
||||||
|
|
||||||
|
f.Add(bytes.Join([][]byte{[]byte(""), makeKey(randIP()), makeKey(randIP(), randIP()), makeKey(randIP(), randIP(), randIP()), makeKey(randIP(), randIP(), randIP(), randIP())}, []byte(",")))
|
||||||
|
f.Fuzz(func(t *testing.T, ff []byte) {
|
||||||
|
cm := Config{LimiterConfig: LimiterConfig{Rate: 0.1}, ReconcileCheckLimit: 1 * time.Millisecond, ReconcileCheckInterval: 1 * time.Millisecond}
|
||||||
|
m := NewMultiLimiter(cm)
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
m.Run(ctx)
|
||||||
|
defer cancel()
|
||||||
|
keys := bytes.Split(ff, []byte(","))
|
||||||
|
for _, f := range keys {
|
||||||
|
prefix, _ := splitKey(f)
|
||||||
|
c := LimiterConfig{Rate: rate.Limit(rand.Float64()), Burst: rand.Int()}
|
||||||
|
m.UpdateConfig(c, prefix)
|
||||||
|
go m.Allow(Limited{key: f})
|
||||||
|
}
|
||||||
|
m.reconcileLimitedOnce(time.Now(), 1*time.Millisecond)
|
||||||
|
checkTree(t, m.limiters.Load().Txn())
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkTree(t require.TestingT, tree *radix.Txn) {
|
||||||
|
iterator := tree.Root().Iterator()
|
||||||
|
kp, v, ok := iterator.Next()
|
||||||
|
for ok {
|
||||||
|
switch c := v.(type) {
|
||||||
|
case *Limiter:
|
||||||
|
if c.limiter != nil {
|
||||||
|
prefix, _ := splitKey(kp)
|
||||||
|
v, _ := tree.Get(prefix)
|
||||||
|
switch c2 := v.(type) {
|
||||||
|
case *LimiterConfig:
|
||||||
|
if c2 != nil {
|
||||||
|
applied := c2.isApplied(c.limiter)
|
||||||
|
require.True(t, applied, fmt.Sprintf("%v,%v", kp, prefix))
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
require.Nil(t, v)
|
||||||
|
}
|
||||||
|
kp, v, ok = iterator.Next()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type ipLimited struct {
|
||||||
|
key []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i ipLimited) Key() []byte {
|
||||||
|
return i.key
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkTestRateLimiterFixedIP(b *testing.B) {
|
||||||
|
var Config = Config{LimiterConfig: LimiterConfig{Rate: 1.0, Burst: 500}, ReconcileCheckLimit: time.Microsecond, ReconcileCheckInterval: time.Millisecond}
|
||||||
|
m := NewMultiLimiter(Config)
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
m.Run(ctx)
|
||||||
|
defer cancel()
|
||||||
|
ip := []byte{244, 233, 0, 1}
|
||||||
|
for j := 0; j < b.N; j++ {
|
||||||
|
m.Allow(ipLimited{key: ip})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkTestRateLimiterAllowPrefill(b *testing.B) {
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
prefill uint64
|
||||||
|
}{
|
||||||
|
{name: "no prefill", prefill: 0},
|
||||||
|
{name: "prefill with 1K keys", prefill: 1000},
|
||||||
|
{name: "prefill with 10K keys", prefill: 10_000},
|
||||||
|
{name: "prefill with 100K keys", prefill: 100_000},
|
||||||
|
}
|
||||||
|
for _, tc := range cases {
|
||||||
|
|
||||||
|
b.Run(tc.name, func(b *testing.B) {
|
||||||
|
var Config = Config{LimiterConfig: LimiterConfig{Rate: 1.0, Burst: 500}, ReconcileCheckLimit: time.Second, ReconcileCheckInterval: time.Second}
|
||||||
|
m := NewMultiLimiter(Config)
|
||||||
|
var i uint64
|
||||||
|
for i = 0xdeaddead; i < 0xdeaddead+tc.prefill; i++ {
|
||||||
|
buf := make([]byte, 8)
|
||||||
|
binary.LittleEndian.PutUint64(buf, i)
|
||||||
|
m.Allow(ipLimited{key: buf})
|
||||||
|
}
|
||||||
|
b.ResetTimer()
|
||||||
|
for j := 0; j < b.N; j++ {
|
||||||
|
buf := make([]byte, 4)
|
||||||
|
binary.LittleEndian.PutUint32(buf, uint32(j))
|
||||||
|
m.Allow(ipLimited{key: buf})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkTestRateLimiterAllowConcurrencyPrefill(b *testing.B) {
|
||||||
|
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
prefill uint64
|
||||||
|
}{
|
||||||
|
{name: "no prefill", prefill: 0},
|
||||||
|
{name: "prefill with 1K keys", prefill: 1000},
|
||||||
|
{name: "prefill with 10K keys", prefill: 10_000},
|
||||||
|
{name: "prefill with 100K keys", prefill: 100_000},
|
||||||
|
}
|
||||||
|
for _, tc := range cases {
|
||||||
|
|
||||||
|
b.Run(tc.name, func(b *testing.B) {
|
||||||
|
var Config = Config{LimiterConfig: LimiterConfig{Rate: 1.0, Burst: 500}, ReconcileCheckLimit: time.Second, ReconcileCheckInterval: 100 * time.Second}
|
||||||
|
m := NewMultiLimiter(Config)
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
m.Run(ctx)
|
||||||
|
defer cancel()
|
||||||
|
var i uint64
|
||||||
|
for i = 0xdeaddead; i < 0xdeaddead+tc.prefill; i++ {
|
||||||
|
buf := make([]byte, 8)
|
||||||
|
binary.LittleEndian.PutUint64(buf, i)
|
||||||
|
m.Allow(ipLimited{key: buf})
|
||||||
|
}
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
b.ResetTimer()
|
||||||
|
for j := 0; j < b.N; j++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(n int) {
|
||||||
|
buf := make([]byte, 4)
|
||||||
|
binary.LittleEndian.PutUint32(buf, uint32(n))
|
||||||
|
m.Allow(ipLimited{key: buf})
|
||||||
|
wg.Done()
|
||||||
|
}(j)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkTestRateLimiterRandomIP(b *testing.B) {
|
||||||
|
var Config = Config{LimiterConfig: LimiterConfig{Rate: 1.0, Burst: 500}, ReconcileCheckLimit: time.Microsecond, ReconcileCheckInterval: time.Millisecond}
|
||||||
|
m := NewMultiLimiter(Config)
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
m.Run(ctx)
|
||||||
|
defer cancel()
|
||||||
|
for j := 0; j < b.N; j++ {
|
||||||
|
m.Allow(ipLimited{key: randIP()})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func randIP() []byte {
|
||||||
|
buf := make([]byte, 4)
|
||||||
|
|
||||||
|
ip := rand.Uint32()
|
||||||
|
|
||||||
|
binary.LittleEndian.PutUint32(buf, ip)
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
|
||||||
|
type mockTicker struct {
|
||||||
|
tickerCh chan time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockTicker) Ticker() <-chan time.Time {
|
||||||
|
return m.tickerCh
|
||||||
|
}
|
|
@ -0,0 +1,2 @@
|
||||||
|
go test fuzz v1
|
||||||
|
[]byte("\x0e0,\x0e♣")
|
|
@ -0,0 +1,2 @@
|
||||||
|
go test fuzz v1
|
||||||
|
[]byte("0")
|
|
@ -0,0 +1,2 @@
|
||||||
|
go test fuzz v1
|
||||||
|
[]byte("0,0")
|
|
@ -0,0 +1,2 @@
|
||||||
|
go test fuzz v1
|
||||||
|
[]byte("")
|
|
@ -0,0 +1,2 @@
|
||||||
|
go test fuzz v1
|
||||||
|
[]byte(",0♣")
|
2
go.mod
2
go.mod
|
@ -43,6 +43,7 @@ require (
|
||||||
github.com/hashicorp/go-connlimit v0.3.0
|
github.com/hashicorp/go-connlimit v0.3.0
|
||||||
github.com/hashicorp/go-discover v0.0.0-20220411141802-20db45f7f0f9
|
github.com/hashicorp/go-discover v0.0.0-20220411141802-20db45f7f0f9
|
||||||
github.com/hashicorp/go-hclog v1.2.1
|
github.com/hashicorp/go-hclog v1.2.1
|
||||||
|
github.com/hashicorp/go-immutable-radix v1.3.0
|
||||||
github.com/hashicorp/go-memdb v1.3.4
|
github.com/hashicorp/go-memdb v1.3.4
|
||||||
github.com/hashicorp/go-multierror v1.1.1
|
github.com/hashicorp/go-multierror v1.1.1
|
||||||
github.com/hashicorp/go-raftchunking v0.6.2
|
github.com/hashicorp/go-raftchunking v0.6.2
|
||||||
|
@ -152,7 +153,6 @@ require (
|
||||||
github.com/googleapis/gnostic v0.2.0 // indirect
|
github.com/googleapis/gnostic v0.2.0 // indirect
|
||||||
github.com/gophercloud/gophercloud v0.3.0 // indirect
|
github.com/gophercloud/gophercloud v0.3.0 // indirect
|
||||||
github.com/hashicorp/errwrap v1.0.0 // indirect
|
github.com/hashicorp/errwrap v1.0.0 // indirect
|
||||||
github.com/hashicorp/go-immutable-radix v1.3.0 // indirect
|
|
||||||
github.com/hashicorp/go-msgpack v0.5.5 // indirect
|
github.com/hashicorp/go-msgpack v0.5.5 // indirect
|
||||||
github.com/hashicorp/go-msgpack/v2 v2.0.0 // indirect
|
github.com/hashicorp/go-msgpack/v2 v2.0.0 // indirect
|
||||||
github.com/hashicorp/go-retryablehttp v0.6.7 // indirect
|
github.com/hashicorp/go-retryablehttp v0.6.7 // indirect
|
||||||
|
|
Loading…
Reference in New Issue