Added ratelimit to handle throtling cache (#8226)

This implements a solution for #7863

It does:

    Add a new config cache.entry_fetch_rate to limit the number of calls/s for a given cache entry, default value = rate.Inf
    Add cache.entry_fetch_max_burst size of rate limit (default value = 2)

The new configuration now supports the following syntax for instance to allow 1 query every 3s:

    command line HCL: -hcl 'cache = { entry_fetch_rate = 0.333}'
    in JSON

{
  "cache": {
    "entry_fetch_rate": 0.333
  }
}
This commit is contained in:
Pierre Souchay 2020-07-27 23:11:11 +02:00 committed by GitHub
parent efb1aae70d
commit 505de6dc29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 226 additions and 19 deletions

View File

@ -664,7 +664,7 @@ func (a *Agent) Start(ctx context.Context) error {
a.sync = ae.NewStateSyncer(a.State, c.AEInterval, a.shutdownCh, a.logger) a.sync = ae.NewStateSyncer(a.State, c.AEInterval, a.shutdownCh, a.logger)
// create the cache // create the cache
a.cache = cache.New(nil) a.cache = cache.New(c.Cache)
// create the config for the rpc server/client // create the config for the rpc server/client
consulCfg, err := a.consulConfig() consulCfg, err := a.consulConfig()

View File

@ -17,6 +17,7 @@ import (
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
@ -857,6 +858,98 @@ func testAgent_AddServiceNoRemoteExec(t *testing.T, extraHCL string) {
} }
} }
func TestCacheRateLimit(test *testing.T) {
test.Parallel()
tests := []struct {
// count := number of updates performed (1 every 10ms)
count int
// rateLimit rate limiting of cache
rateLimit float64
// Minimum number of updates to see from a cache perspective
// We add a value with tolerance to work even on a loaded CI
minUpdates int
}{
// 250 => we have a test running for at least 2.5s
{250, 0.5, 1},
{250, 1, 1},
{300, 2, 2},
}
for _, currentTest := range tests {
test.Run(fmt.Sprintf("rate_limit_at_%v", currentTest.rateLimit), func(t *testing.T) {
tt := currentTest
t.Parallel()
a := NewTestAgent(t, fmt.Sprintf("cache = { entry_fetch_rate = %v, entry_fetch_max_burst = 1 }", tt.rateLimit))
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
var wg sync.WaitGroup
stillProcessing := true
injectService := func(i int) {
srv := &structs.NodeService{
Service: "redis",
ID: "redis",
Port: 1024 + i,
Address: fmt.Sprintf("10.0.1.%d", i%255),
}
err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
require.Nil(t, err)
}
runUpdates := func() {
wg.Add(tt.count)
for i := 0; i < tt.count; i++ {
time.Sleep(10 * time.Millisecond)
injectService(i)
wg.Done()
}
stillProcessing = false
}
getIndex := func(t *testing.T, oldIndex int) int {
req, err := http.NewRequest("GET", fmt.Sprintf("/v1/health/service/redis?cached&wait=5s&index=%d", oldIndex), nil)
require.NoError(t, err)
resp := httptest.NewRecorder()
a.srv.Handler.ServeHTTP(resp, req)
// Key doesn't actually exist so we should get 404
if got, want := resp.Code, http.StatusOK; got != want {
t.Fatalf("bad response code got %d want %d", got, want)
}
index, err := strconv.Atoi(resp.Header().Get("X-Consul-Index"))
require.NoError(t, err)
return index
}
{
start := time.Now()
injectService(0)
// Get the first index
index := getIndex(t, 0)
require.Greater(t, index, 2)
go runUpdates()
numberOfUpdates := 0
for stillProcessing {
oldIndex := index
index = getIndex(t, oldIndex)
require.GreaterOrEqual(t, index, oldIndex, "index must be increasing only")
numberOfUpdates++
}
elapsed := time.Since(start)
qps := float64(time.Second) * float64(numberOfUpdates) / float64(elapsed)
summary := fmt.Sprintf("received %v updates in %v aka %f qps, target max was: %f qps", numberOfUpdates, elapsed, qps, tt.rateLimit)
// We must never go beyond the limit, we give 10% margin to avoid having values like 1.05 instead of 1 due to precision of clock
require.LessOrEqual(t, qps, 1.1*tt.rateLimit, fmt.Sprintf("it should never get more requests than ratelimit, had: %s", summary))
// We must have at least being notified a few times
require.GreaterOrEqual(t, numberOfUpdates, tt.minUpdates, fmt.Sprintf("It should have received a minimum of %d updates, had: %s", tt.minUpdates, summary))
}
wg.Wait()
})
}
}
func TestAddServiceIPv4TaggedDefault(t *testing.T) { func TestAddServiceIPv4TaggedDefault(t *testing.T) {
t.Helper() t.Helper()

39
agent/cache/cache.go vendored
View File

@ -25,6 +25,7 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"golang.org/x/time/rate"
) )
//go:generate mockery -all -inpkg //go:generate mockery -all -inpkg
@ -81,6 +82,10 @@ type Cache struct {
stopped uint32 stopped uint32
// stopCh is closed when Close is called // stopCh is closed when Close is called
stopCh chan struct{} stopCh chan struct{}
// options includes a per Cache Rate limiter specification to avoid performing too many queries
options Options
rateLimitContext context.Context
rateLimitCancel context.CancelFunc
} }
// typeEntry is a single type that is registered with a Cache. // typeEntry is a single type that is registered with a Cache.
@ -122,23 +127,29 @@ type ResultMeta struct {
// Options are options for the Cache. // Options are options for the Cache.
type Options struct { type Options struct {
// Nothing currently, reserved. // EntryFetchMaxBurst max burst size of RateLimit for a single cache entry
EntryFetchMaxBurst int
// EntryFetchRate represents the max calls/sec for a single cache entry
EntryFetchRate rate.Limit
} }
// New creates a new cache with the given RPC client and reasonable defaults. // New creates a new cache with the given RPC client and reasonable defaults.
// Further settings can be tweaked on the returned value. // Further settings can be tweaked on the returned value.
func New(*Options) *Cache { func New(options Options) *Cache {
// Initialize the heap. The buffer of 1 is really important because // Initialize the heap. The buffer of 1 is really important because
// its possible for the expiry loop to trigger the heap to update // its possible for the expiry loop to trigger the heap to update
// itself and it'd block forever otherwise. // itself and it'd block forever otherwise.
h := &expiryHeap{NotifyCh: make(chan struct{}, 1)} h := &expiryHeap{NotifyCh: make(chan struct{}, 1)}
heap.Init(h) heap.Init(h)
ctx, cancel := context.WithCancel(context.Background())
c := &Cache{ c := &Cache{
types: make(map[string]typeEntry), types: make(map[string]typeEntry),
entries: make(map[string]cacheEntry), entries: make(map[string]cacheEntry),
entriesExpiryHeap: h, entriesExpiryHeap: h,
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
options: options,
rateLimitContext: ctx,
rateLimitCancel: cancel,
} }
// Start the expiry watcher // Start the expiry watcher
@ -454,7 +465,14 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
// If we don't have an entry, then create it. The entry must be marked // If we don't have an entry, then create it. The entry must be marked
// as invalid so that it isn't returned as a valid value for a zero index. // as invalid so that it isn't returned as a valid value for a zero index.
if !ok { if !ok {
entry = cacheEntry{Valid: false, Waiter: make(chan struct{})} entry = cacheEntry{
Valid: false,
Waiter: make(chan struct{}),
FetchRateLimiter: rate.NewLimiter(
c.options.EntryFetchRate,
c.options.EntryFetchMaxBurst,
),
}
} }
// Set that we're fetching to true, which makes it so that future // Set that we're fetching to true, which makes it so that future
@ -504,7 +522,13 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
Index: entry.Index, Index: entry.Index,
} }
} }
if err := entry.FetchRateLimiter.Wait(c.rateLimitContext); err != nil {
if connectedTimer != nil {
connectedTimer.Stop()
}
entry.Error = fmt.Errorf("rateLimitContext canceled: %s", err.Error())
return
}
// Start building the new entry by blocking on the fetch. // Start building the new entry by blocking on the fetch.
result, err := r.Fetch(fOpts) result, err := r.Fetch(fOpts)
if connectedTimer != nil { if connectedTimer != nil {
@ -728,6 +752,7 @@ func (c *Cache) Close() error {
if wasStopped == 0 { if wasStopped == 0 {
// First time only, close stop chan // First time only, close stop chan
close(c.stopCh) close(c.stopCh)
c.rateLimitCancel()
} }
return nil return nil
} }
@ -747,6 +772,10 @@ func (c *Cache) Prepopulate(t string, res FetchResult, dc, token, k string) erro
FetchedAt: time.Now(), FetchedAt: time.Now(),
Waiter: make(chan struct{}), Waiter: make(chan struct{}),
Expiry: &cacheEntryExpiry{Key: key}, Expiry: &cacheEntryExpiry{Key: key},
FetchRateLimiter: rate.NewLimiter(
c.options.EntryFetchRate,
c.options.EntryFetchMaxBurst,
),
} }
c.entriesLock.Lock() c.entriesLock.Lock()
c.entries[key] = newEntry c.entries[key] = newEntry

View File

@ -3,6 +3,8 @@ package cache
import ( import (
"container/heap" "container/heap"
"time" "time"
"golang.org/x/time/rate"
) )
// cacheEntry stores a single cache entry. // cacheEntry stores a single cache entry.
@ -41,6 +43,8 @@ type cacheEntry struct {
// background request has be blocking for at least 5 seconds, which ever // background request has be blocking for at least 5 seconds, which ever
// happens first. // happens first.
RefreshLostContact time.Time RefreshLostContact time.Time
// FetchRateLimiter limits the rate at which fetch is called for this entry.
FetchRateLimiter *rate.Limiter
} }
// cacheEntryExpiry contains the expiration information for a cache // cacheEntryExpiry contains the expiration information for a cache

View File

@ -8,12 +8,13 @@ import (
"github.com/mitchellh/go-testing-interface" "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/time/rate"
) )
// TestCache returns a Cache instance configuring for testing. // TestCache returns a Cache instance configuring for testing.
func TestCache(t testing.T) *Cache { func TestCache(t testing.T) *Cache {
// Simple but lets us do some fine-tuning later if we want to. // Simple but lets us do some fine-tuning later if we want to.
return New(nil) return New(Options{EntryFetchRate: rate.Inf, EntryFetchMaxBurst: 2})
} }
// TestCacheGetCh returns a channel that returns the result of the Get call. // TestCacheGetCh returns a channel that returns the result of the Get call.

View File

@ -14,6 +14,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/connect/ca" "github.com/hashicorp/consul/agent/connect/ca"
"github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/consul"
@ -32,6 +33,15 @@ import (
"golang.org/x/time/rate" "golang.org/x/time/rate"
) )
// The following constants are default values for some settings
// Ensure to update documentation if you modify those values
const (
// DefaultEntryFetchMaxBurst is the default value for cache.entry_fetch_max_burst
DefaultEntryFetchMaxBurst = 2
// DefaultEntryFetchRate is the default value for cache.entry_fetch_rate
DefaultEntryFetchRate = float64(rate.Inf)
)
// Builder constructs a valid runtime configuration from multiple // Builder constructs a valid runtime configuration from multiple
// configuration sources. // configuration sources.
// //
@ -887,6 +897,12 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
BindAddr: bindAddr, BindAddr: bindAddr,
Bootstrap: b.boolVal(c.Bootstrap), Bootstrap: b.boolVal(c.Bootstrap),
BootstrapExpect: b.intVal(c.BootstrapExpect), BootstrapExpect: b.intVal(c.BootstrapExpect),
Cache: cache.Options{
EntryFetchRate: rate.Limit(
b.float64ValWithDefault(c.Cache.EntryFetchRate, DefaultEntryFetchRate)),
EntryFetchMaxBurst: b.intValWithDefault(
c.Cache.EntryFetchMaxBurst, DefaultEntryFetchMaxBurst),
},
CAFile: b.stringVal(c.CAFile), CAFile: b.stringVal(c.CAFile),
CAPath: b.stringVal(c.CAPath), CAPath: b.stringVal(c.CAPath),
CertFile: b.stringVal(c.CertFile), CertFile: b.stringVal(c.CertFile),
@ -1014,6 +1030,13 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
Watches: c.Watches, Watches: c.Watches,
} }
if rt.Cache.EntryFetchMaxBurst <= 0 {
return RuntimeConfig{}, fmt.Errorf("cache.entry_fetch_max_burst must be strictly positive, was: %v", rt.Cache.EntryFetchMaxBurst)
}
if rt.Cache.EntryFetchRate <= 0 {
return RuntimeConfig{}, fmt.Errorf("cache.entry_fetch_rate must be strictly positive, was: %v", rt.Cache.EntryFetchRate)
}
if entCfg, err := b.BuildEnterpriseRuntimeConfig(&c); err != nil { if entCfg, err := b.BuildEnterpriseRuntimeConfig(&c); err != nil {
return RuntimeConfig{}, err return RuntimeConfig{}, err
} else { } else {
@ -1645,14 +1668,18 @@ func (b *Builder) stringVal(v *string) string {
return b.stringValWithDefault(v, "") return b.stringValWithDefault(v, "")
} }
func (b *Builder) float64Val(v *float64) float64 { func (b *Builder) float64ValWithDefault(v *float64, defaultVal float64) float64 {
if v == nil { if v == nil {
return 0 return defaultVal
} }
return *v return *v
} }
func (b *Builder) float64Val(v *float64) float64 {
return b.float64ValWithDefault(v, 0)
}
func (b *Builder) cidrsVal(name string, v []string) (nets []*net.IPNet) { func (b *Builder) cidrsVal(name string, v []string) (nets []*net.IPNet) {
if v == nil { if v == nil {
return return

View File

@ -58,6 +58,14 @@ func Parse(data string, format string) (c Config, md mapstructure.Metadata, err
return c, md, nil return c, md, nil
} }
// Cache is the tunning configuration for cache, values are optional
type Cache struct {
// EntryFetchMaxBurst max burst size of RateLimit for a single cache entry
EntryFetchMaxBurst *int `json:"entry_fetch_max_burst,omitempty" hcl:"entry_fetch_max_burst" mapstructure:"entry_fetch_max_burst"`
// EntryFetchRate represents the max calls/sec for a single cache entry
EntryFetchRate *float64 `json:"entry_fetch_rate,omitempty" hcl:"entry_fetch_rate" mapstructure:"entry_fetch_rate"`
}
// Config defines the format of a configuration file in either JSON or // Config defines the format of a configuration file in either JSON or
// HCL format. // HCL format.
// //
@ -101,6 +109,7 @@ type Config struct {
BindAddr *string `json:"bind_addr,omitempty" hcl:"bind_addr" mapstructure:"bind_addr"` BindAddr *string `json:"bind_addr,omitempty" hcl:"bind_addr" mapstructure:"bind_addr"`
Bootstrap *bool `json:"bootstrap,omitempty" hcl:"bootstrap" mapstructure:"bootstrap"` Bootstrap *bool `json:"bootstrap,omitempty" hcl:"bootstrap" mapstructure:"bootstrap"`
BootstrapExpect *int `json:"bootstrap_expect,omitempty" hcl:"bootstrap_expect" mapstructure:"bootstrap_expect"` BootstrapExpect *int `json:"bootstrap_expect,omitempty" hcl:"bootstrap_expect" mapstructure:"bootstrap_expect"`
Cache Cache `json:"cache,omitempty" hcl:"cache" mapstructure:"cache"`
CAFile *string `json:"ca_file,omitempty" hcl:"ca_file" mapstructure:"ca_file"` CAFile *string `json:"ca_file,omitempty" hcl:"ca_file" mapstructure:"ca_file"`
CAPath *string `json:"ca_path,omitempty" hcl:"ca_path" mapstructure:"ca_path"` CAPath *string `json:"ca_path,omitempty" hcl:"ca_path" mapstructure:"ca_path"`
CertFile *string `json:"cert_file,omitempty" hcl:"cert_file" mapstructure:"cert_file"` CertFile *string `json:"cert_file,omitempty" hcl:"cert_file" mapstructure:"cert_file"`

View File

@ -7,6 +7,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
@ -444,6 +445,9 @@ type RuntimeConfig struct {
// flag: -bootstrap-expect=int // flag: -bootstrap-expect=int
BootstrapExpect int BootstrapExpect int
// Cache represent cache configuration of agent
Cache cache.Options
// CAFile is a path to a certificate authority file. This is used with // CAFile is a path to a certificate authority file. This is used with
// VerifyIncoming or VerifyOutgoing to verify the TLS connection. // VerifyIncoming or VerifyOutgoing to verify the TLS connection.
// //

View File

@ -18,6 +18,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
@ -4411,6 +4412,10 @@ func TestFullConfig(t *testing.T) {
"bind_addr": "16.99.34.17", "bind_addr": "16.99.34.17",
"bootstrap": true, "bootstrap": true,
"bootstrap_expect": 53, "bootstrap_expect": 53,
"cache": {
"entry_fetch_max_burst": 42,
"entry_fetch_rate": 0.334
},
"ca_file": "erA7T0PM", "ca_file": "erA7T0PM",
"ca_path": "mQEN1Mfp", "ca_path": "mQEN1Mfp",
"cert_file": "7s4QAzDk", "cert_file": "7s4QAzDk",
@ -5071,6 +5076,10 @@ func TestFullConfig(t *testing.T) {
bind_addr = "16.99.34.17" bind_addr = "16.99.34.17"
bootstrap = true bootstrap = true
bootstrap_expect = 53 bootstrap_expect = 53
cache = {
entry_fetch_max_burst = 42
entry_fetch_rate = 0.334
},
ca_file = "erA7T0PM" ca_file = "erA7T0PM"
ca_path = "mQEN1Mfp" ca_path = "mQEN1Mfp"
cert_file = "7s4QAzDk" cert_file = "7s4QAzDk"
@ -5797,6 +5806,10 @@ func TestFullConfig(t *testing.T) {
BindAddr: ipAddr("16.99.34.17"), BindAddr: ipAddr("16.99.34.17"),
Bootstrap: true, Bootstrap: true,
BootstrapExpect: 53, BootstrapExpect: 53,
Cache: cache.Options{
EntryFetchMaxBurst: 42,
EntryFetchRate: 0.334,
},
CAFile: "erA7T0PM", CAFile: "erA7T0PM",
CAPath: "mQEN1Mfp", CAPath: "mQEN1Mfp",
CertFile: "7s4QAzDk", CertFile: "7s4QAzDk",
@ -6679,6 +6692,10 @@ func TestSanitize(t *testing.T) {
&net.TCPAddr{IP: net.ParseIP("1.2.3.4"), Port: 5678}, &net.TCPAddr{IP: net.ParseIP("1.2.3.4"), Port: 5678},
&net.UnixAddr{Name: "/var/run/foo"}, &net.UnixAddr{Name: "/var/run/foo"},
}, },
Cache: cache.Options{
EntryFetchMaxBurst: 42,
EntryFetchRate: 0.334,
},
ConsulCoordinateUpdatePeriod: 15 * time.Second, ConsulCoordinateUpdatePeriod: 15 * time.Second,
RetryJoinLAN: []string{ RetryJoinLAN: []string{
"foo=bar key=baz secret=boom bang=bar", "foo=bar key=baz secret=boom bang=bar",
@ -6749,6 +6766,10 @@ func TestSanitize(t *testing.T) {
"BindAddr": "127.0.0.1", "BindAddr": "127.0.0.1",
"Bootstrap": false, "Bootstrap": false,
"BootstrapExpect": 0, "BootstrapExpect": 0,
"Cache": {
"EntryFetchMaxBurst": 42,
"EntryFetchRate": 0.334
},
"CAFile": "", "CAFile": "",
"CAPath": "", "CAPath": "",
"CertFile": "", "CertFile": "",

View File

@ -7,6 +7,7 @@ import (
"github.com/mitchellh/copystructure" "github.com/mitchellh/copystructure"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types" cachetype "github.com/hashicorp/consul/agent/cache-types"
@ -461,7 +462,7 @@ func TestManager_deliverLatest(t *testing.T) {
// None of these need to do anything to test this method just be valid // None of these need to do anything to test this method just be valid
logger := testutil.Logger(t) logger := testutil.Logger(t)
cfg := ManagerConfig{ cfg := ManagerConfig{
Cache: cache.New(nil), Cache: cache.New(cache.Options{EntryFetchRate: rate.Inf, EntryFetchMaxBurst: 2}),
State: local.NewState(local.Config{}, logger, &token.Store{}), State: local.NewState(local.Config{}, logger, &token.Store{}),
Source: &structs.QuerySource{ Source: &structs.QuerySource{
Node: "node1", Node: "node1",

View File

@ -959,6 +959,24 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
- `bind_addr` Equivalent to the [`-bind` command-line flag](#_bind). - `bind_addr` Equivalent to the [`-bind` command-line flag](#_bind).
- `cache` Cache configuration of agent. The configurable values are the following:
- `entry_fetch_max_burst`: The size of the token bucket used to recharge the rate-limit per
cache entry. The default value is 2 and means that when cache has not been updated
for a long time, 2 successive queries can be made as long as the rate-limit is not
reached.
- `entry_fetch_rate`: configures the rate-limit at which the cache may refresh a single
entry. On a cluster with many changes/s, watching changes in the cache might put high
pressure on the servers. This ensures the number of requests for a single cache entry
will never go beyond this limit, even when a given service changes every 1/100s.
Since this is a per cache entry limit, having a highly unstable service will only rate
limit the watched on this service, but not the other services/entries.
The value is strictly positive, expressed in queries per second as a float,
1 means 1 query per second, 0.1 mean 1 request every 10s maximum.
The default value is "No limit" and should be tuned on large
clusters to avoid performing too many RPCs on entries changing a lot.
- `ca_file` This provides a file path to a PEM-encoded certificate - `ca_file` This provides a file path to a PEM-encoded certificate
authority. The certificate authority is used to check the authenticity of client authority. The certificate authority is used to check the authenticity of client
and server connections with the appropriate [`verify_incoming`](#verify_incoming) and server connections with the appropriate [`verify_incoming`](#verify_incoming)