mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 22:06:20 +00:00
Merge pull request #8552 from pierresouchay/reload_cache_throttling_config
Ensure that Cache options are reloaded when `consul reload` is performed
This commit is contained in:
commit
72bf350069
@ -3764,6 +3764,12 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if a.cache.ReloadOptions(newCfg.Cache) {
|
||||
a.logger.Info("Cache options have been updated")
|
||||
} else {
|
||||
a.logger.Debug("Cache options have not been modified")
|
||||
}
|
||||
|
||||
// Update filtered metrics
|
||||
metrics.UpdateFilter(newCfg.Telemetry.AllowedPrefixes,
|
||||
newCfg.Telemetry.BlockedPrefixes)
|
||||
|
@ -43,6 +43,7 @@ import (
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/time/rate"
|
||||
"gopkg.in/square/go-jose.v2/jwt"
|
||||
)
|
||||
|
||||
@ -765,10 +766,18 @@ func TestCacheRateLimit(test *testing.T) {
|
||||
test.Run(fmt.Sprintf("rate_limit_at_%v", currentTest.rateLimit), func(t *testing.T) {
|
||||
tt := currentTest
|
||||
t.Parallel()
|
||||
a := NewTestAgent(t, fmt.Sprintf("cache = { entry_fetch_rate = %v, entry_fetch_max_burst = 1 }", tt.rateLimit))
|
||||
a := NewTestAgent(t, "cache = { entry_fetch_rate = 1, entry_fetch_max_burst = 100 }")
|
||||
defer a.Shutdown()
|
||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
||||
|
||||
cfg := a.config
|
||||
require.Equal(t, rate.Limit(1), a.config.Cache.EntryFetchRate)
|
||||
require.Equal(t, 100, a.config.Cache.EntryFetchMaxBurst)
|
||||
cfg.Cache.EntryFetchRate = rate.Limit(tt.rateLimit)
|
||||
cfg.Cache.EntryFetchMaxBurst = 1
|
||||
a.reloadConfigInternal(cfg)
|
||||
require.Equal(t, rate.Limit(tt.rateLimit), a.config.Cache.EntryFetchRate)
|
||||
require.Equal(t, 1, a.config.Cache.EntryFetchMaxBurst)
|
||||
var wg sync.WaitGroup
|
||||
stillProcessing := true
|
||||
|
||||
|
38
agent/cache/cache.go
vendored
38
agent/cache/cache.go
vendored
@ -144,16 +144,26 @@ type Options struct {
|
||||
EntryFetchRate rate.Limit
|
||||
}
|
||||
|
||||
// New creates a new cache with the given RPC client and reasonable defaults.
|
||||
// Further settings can be tweaked on the returned value.
|
||||
func New(options Options) *Cache {
|
||||
// Equal return true if both options are equivalent
|
||||
func (o Options) Equal(other Options) bool {
|
||||
return o.EntryFetchMaxBurst == other.EntryFetchMaxBurst && o.EntryFetchRate == other.EntryFetchRate
|
||||
}
|
||||
|
||||
// applyDefaultValuesOnOptions set default values on options and returned updated value
|
||||
func applyDefaultValuesOnOptions(options Options) Options {
|
||||
if options.EntryFetchRate == 0.0 {
|
||||
options.EntryFetchRate = DefaultEntryFetchRate
|
||||
}
|
||||
if options.EntryFetchMaxBurst == 0 {
|
||||
options.EntryFetchMaxBurst = DefaultEntryFetchMaxBurst
|
||||
}
|
||||
return options
|
||||
}
|
||||
|
||||
// New creates a new cache with the given RPC client and reasonable defaults.
|
||||
// Further settings can be tweaked on the returned value.
|
||||
func New(options Options) *Cache {
|
||||
options = applyDefaultValuesOnOptions(options)
|
||||
// Initialize the heap. The buffer of 1 is really important because
|
||||
// its possible for the expiry loop to trigger the heap to update
|
||||
// itself and it'd block forever otherwise.
|
||||
@ -234,6 +244,28 @@ func (c *Cache) RegisterType(n string, typ Type) {
|
||||
c.types[n] = typeEntry{Name: n, Type: typ, Opts: &opts}
|
||||
}
|
||||
|
||||
// ReloadOptions updates the cache with the new options
|
||||
// return true if Cache is updated, false if already up to date
|
||||
func (c *Cache) ReloadOptions(options Options) bool {
|
||||
options = applyDefaultValuesOnOptions(options)
|
||||
modified := !options.Equal(c.options)
|
||||
if modified {
|
||||
c.entriesLock.RLock()
|
||||
defer c.entriesLock.RUnlock()
|
||||
for _, entry := range c.entries {
|
||||
if c.options.EntryFetchRate != options.EntryFetchRate {
|
||||
entry.FetchRateLimiter.SetLimit(options.EntryFetchRate)
|
||||
}
|
||||
if c.options.EntryFetchMaxBurst != options.EntryFetchMaxBurst {
|
||||
entry.FetchRateLimiter.SetBurst(options.EntryFetchMaxBurst)
|
||||
}
|
||||
}
|
||||
c.options.EntryFetchRate = options.EntryFetchRate
|
||||
c.options.EntryFetchMaxBurst = options.EntryFetchMaxBurst
|
||||
}
|
||||
return modified
|
||||
}
|
||||
|
||||
// Get loads the data for the given type and request. If data satisfying the
|
||||
// minimum index is present in the cache, it is returned immediately. Otherwise,
|
||||
// this will block until the data is available or the request timeout is
|
||||
|
59
agent/cache/cache_test.go
vendored
59
agent/cache/cache_test.go
vendored
@ -14,6 +14,7 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// Test a basic Get with no indexes (and therefore no blocking queries).
|
||||
@ -1220,6 +1221,64 @@ func TestCacheGet_nonBlockingType(t *testing.T) {
|
||||
typ.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// Test a get with an index set will wait until an index that is higher
|
||||
// is set in the cache.
|
||||
func TestCacheReload(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
typ1 := TestType(t)
|
||||
defer typ1.AssertExpectations(t)
|
||||
|
||||
c := New(Options{EntryFetchRate: rate.Limit(1), EntryFetchMaxBurst: 1})
|
||||
c.RegisterType("t1", typ1)
|
||||
typ1.Mock.On("Fetch", mock.Anything, mock.Anything).Return(FetchResult{Value: 42, Index: 42}, nil).Maybe()
|
||||
|
||||
require.False(t, c.ReloadOptions(Options{EntryFetchRate: rate.Limit(1), EntryFetchMaxBurst: 1}), "Value should not be reloaded")
|
||||
|
||||
_, meta, err := c.Get(context.Background(), "t1", TestRequest(t, RequestInfo{Key: "hello1", MinIndex: uint64(1)}))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, meta.Index, uint64(42))
|
||||
|
||||
testEntry := func(t *testing.T, doTest func(t *testing.T, entry cacheEntry)) {
|
||||
c.entriesLock.Lock()
|
||||
tEntry, ok := c.types["t1"]
|
||||
require.True(t, ok)
|
||||
keyName := makeEntryKey("t1", "", "", "hello1")
|
||||
ok, entryValid, entry := c.getEntryLocked(tEntry, keyName, RequestInfo{})
|
||||
require.True(t, ok)
|
||||
require.True(t, entryValid)
|
||||
doTest(t, entry)
|
||||
c.entriesLock.Unlock()
|
||||
|
||||
}
|
||||
testEntry(t, func(t *testing.T, entry cacheEntry) {
|
||||
require.Equal(t, entry.FetchRateLimiter.Limit(), rate.Limit(1))
|
||||
require.Equal(t, entry.FetchRateLimiter.Burst(), 1)
|
||||
})
|
||||
|
||||
// Modify only rateLimit
|
||||
require.True(t, c.ReloadOptions(Options{EntryFetchRate: rate.Limit(100), EntryFetchMaxBurst: 1}))
|
||||
testEntry(t, func(t *testing.T, entry cacheEntry) {
|
||||
require.Equal(t, entry.FetchRateLimiter.Limit(), rate.Limit(100))
|
||||
require.Equal(t, entry.FetchRateLimiter.Burst(), 1)
|
||||
})
|
||||
|
||||
// Modify only Burst
|
||||
require.True(t, c.ReloadOptions(Options{EntryFetchRate: rate.Limit(100), EntryFetchMaxBurst: 5}))
|
||||
testEntry(t, func(t *testing.T, entry cacheEntry) {
|
||||
require.Equal(t, entry.FetchRateLimiter.Limit(), rate.Limit(100))
|
||||
require.Equal(t, entry.FetchRateLimiter.Burst(), 5)
|
||||
})
|
||||
|
||||
// Modify only Burst and Limit at the same time
|
||||
require.True(t, c.ReloadOptions(Options{EntryFetchRate: rate.Limit(1000), EntryFetchMaxBurst: 42}))
|
||||
|
||||
testEntry(t, func(t *testing.T, entry cacheEntry) {
|
||||
require.Equal(t, entry.FetchRateLimiter.Limit(), rate.Limit(1000))
|
||||
require.Equal(t, entry.FetchRateLimiter.Burst(), 42)
|
||||
})
|
||||
}
|
||||
|
||||
// TestCacheThrottle checks the assumptions for the cache throttling. It sets
|
||||
// up a cache with Options{EntryFetchRate: 10.0, EntryFetchMaxBurst: 1}, which
|
||||
// allows for 10req/s, or one request every 100ms.
|
||||
|
Loading…
x
Reference in New Issue
Block a user