mirror of
https://github.com/status-im/consul.git
synced 2025-02-10 12:46:32 +00:00
Merge pull request #8552 from pierresouchay/reload_cache_throttling_config
Ensure that Cache options are reloaded when `consul reload` is performed
This commit is contained in:
parent
295358044b
commit
607a494000
@ -3749,6 +3749,12 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if a.cache.ReloadOptions(newCfg.Cache) {
|
||||||
|
a.logger.Info("Cache options have been updated")
|
||||||
|
} else {
|
||||||
|
a.logger.Debug("Cache options have not been modified")
|
||||||
|
}
|
||||||
|
|
||||||
// Update filtered metrics
|
// Update filtered metrics
|
||||||
metrics.UpdateFilter(newCfg.Telemetry.AllowedPrefixes,
|
metrics.UpdateFilter(newCfg.Telemetry.AllowedPrefixes,
|
||||||
newCfg.Telemetry.BlockedPrefixes)
|
newCfg.Telemetry.BlockedPrefixes)
|
||||||
|
@ -42,6 +42,7 @@ import (
|
|||||||
"github.com/pascaldekloe/goe/verify"
|
"github.com/pascaldekloe/goe/verify"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
"gopkg.in/square/go-jose.v2/jwt"
|
"gopkg.in/square/go-jose.v2/jwt"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -764,10 +765,18 @@ func TestCacheRateLimit(test *testing.T) {
|
|||||||
test.Run(fmt.Sprintf("rate_limit_at_%v", currentTest.rateLimit), func(t *testing.T) {
|
test.Run(fmt.Sprintf("rate_limit_at_%v", currentTest.rateLimit), func(t *testing.T) {
|
||||||
tt := currentTest
|
tt := currentTest
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
a := NewTestAgent(t, fmt.Sprintf("cache = { entry_fetch_rate = %v, entry_fetch_max_burst = 1 }", tt.rateLimit))
|
a := NewTestAgent(t, "cache = { entry_fetch_rate = 1, entry_fetch_max_burst = 100 }")
|
||||||
defer a.Shutdown()
|
defer a.Shutdown()
|
||||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
cfg := a.config
|
||||||
|
require.Equal(t, rate.Limit(1), a.config.Cache.EntryFetchRate)
|
||||||
|
require.Equal(t, 100, a.config.Cache.EntryFetchMaxBurst)
|
||||||
|
cfg.Cache.EntryFetchRate = rate.Limit(tt.rateLimit)
|
||||||
|
cfg.Cache.EntryFetchMaxBurst = 1
|
||||||
|
a.reloadConfigInternal(cfg)
|
||||||
|
require.Equal(t, rate.Limit(tt.rateLimit), a.config.Cache.EntryFetchRate)
|
||||||
|
require.Equal(t, 1, a.config.Cache.EntryFetchMaxBurst)
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
stillProcessing := true
|
stillProcessing := true
|
||||||
|
|
||||||
|
38
agent/cache/cache.go
vendored
38
agent/cache/cache.go
vendored
@ -143,16 +143,26 @@ type Options struct {
|
|||||||
EntryFetchRate rate.Limit
|
EntryFetchRate rate.Limit
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new cache with the given RPC client and reasonable defaults.
|
// Equal return true if both options are equivalent
|
||||||
// Further settings can be tweaked on the returned value.
|
func (o Options) Equal(other Options) bool {
|
||||||
func New(options Options) *Cache {
|
return o.EntryFetchMaxBurst == other.EntryFetchMaxBurst && o.EntryFetchRate == other.EntryFetchRate
|
||||||
|
}
|
||||||
|
|
||||||
|
// applyDefaultValuesOnOptions set default values on options and returned updated value
|
||||||
|
func applyDefaultValuesOnOptions(options Options) Options {
|
||||||
if options.EntryFetchRate == 0.0 {
|
if options.EntryFetchRate == 0.0 {
|
||||||
options.EntryFetchRate = DefaultEntryFetchRate
|
options.EntryFetchRate = DefaultEntryFetchRate
|
||||||
}
|
}
|
||||||
if options.EntryFetchMaxBurst == 0 {
|
if options.EntryFetchMaxBurst == 0 {
|
||||||
options.EntryFetchMaxBurst = DefaultEntryFetchMaxBurst
|
options.EntryFetchMaxBurst = DefaultEntryFetchMaxBurst
|
||||||
}
|
}
|
||||||
|
return options
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates a new cache with the given RPC client and reasonable defaults.
|
||||||
|
// Further settings can be tweaked on the returned value.
|
||||||
|
func New(options Options) *Cache {
|
||||||
|
options = applyDefaultValuesOnOptions(options)
|
||||||
// Initialize the heap. The buffer of 1 is really important because
|
// Initialize the heap. The buffer of 1 is really important because
|
||||||
// its possible for the expiry loop to trigger the heap to update
|
// its possible for the expiry loop to trigger the heap to update
|
||||||
// itself and it'd block forever otherwise.
|
// itself and it'd block forever otherwise.
|
||||||
@ -232,6 +242,28 @@ func (c *Cache) RegisterType(n string, typ Type) {
|
|||||||
c.types[n] = typeEntry{Name: n, Type: typ, Opts: &opts}
|
c.types[n] = typeEntry{Name: n, Type: typ, Opts: &opts}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReloadOptions updates the cache with the new options
|
||||||
|
// return true if Cache is updated, false if already up to date
|
||||||
|
func (c *Cache) ReloadOptions(options Options) bool {
|
||||||
|
options = applyDefaultValuesOnOptions(options)
|
||||||
|
modified := !options.Equal(c.options)
|
||||||
|
if modified {
|
||||||
|
c.entriesLock.RLock()
|
||||||
|
defer c.entriesLock.RUnlock()
|
||||||
|
for _, entry := range c.entries {
|
||||||
|
if c.options.EntryFetchRate != options.EntryFetchRate {
|
||||||
|
entry.FetchRateLimiter.SetLimit(options.EntryFetchRate)
|
||||||
|
}
|
||||||
|
if c.options.EntryFetchMaxBurst != options.EntryFetchMaxBurst {
|
||||||
|
entry.FetchRateLimiter.SetBurst(options.EntryFetchMaxBurst)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.options.EntryFetchRate = options.EntryFetchRate
|
||||||
|
c.options.EntryFetchMaxBurst = options.EntryFetchMaxBurst
|
||||||
|
}
|
||||||
|
return modified
|
||||||
|
}
|
||||||
|
|
||||||
// Get loads the data for the given type and request. If data satisfying the
|
// Get loads the data for the given type and request. If data satisfying the
|
||||||
// minimum index is present in the cache, it is returned immediately. Otherwise,
|
// minimum index is present in the cache, it is returned immediately. Otherwise,
|
||||||
// this will block until the data is available or the request timeout is
|
// this will block until the data is available or the request timeout is
|
||||||
|
59
agent/cache/cache_test.go
vendored
59
agent/cache/cache_test.go
vendored
@ -14,6 +14,7 @@ import (
|
|||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Test a basic Get with no indexes (and therefore no blocking queries).
|
// Test a basic Get with no indexes (and therefore no blocking queries).
|
||||||
@ -1220,6 +1221,64 @@ func TestCacheGet_nonBlockingType(t *testing.T) {
|
|||||||
typ.AssertExpectations(t)
|
typ.AssertExpectations(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test a get with an index set will wait until an index that is higher
|
||||||
|
// is set in the cache.
|
||||||
|
func TestCacheReload(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
typ1 := TestType(t)
|
||||||
|
defer typ1.AssertExpectations(t)
|
||||||
|
|
||||||
|
c := New(Options{EntryFetchRate: rate.Limit(1), EntryFetchMaxBurst: 1})
|
||||||
|
c.RegisterType("t1", typ1)
|
||||||
|
typ1.Mock.On("Fetch", mock.Anything, mock.Anything).Return(FetchResult{Value: 42, Index: 42}, nil).Maybe()
|
||||||
|
|
||||||
|
require.False(t, c.ReloadOptions(Options{EntryFetchRate: rate.Limit(1), EntryFetchMaxBurst: 1}), "Value should not be reloaded")
|
||||||
|
|
||||||
|
_, meta, err := c.Get(context.Background(), "t1", TestRequest(t, RequestInfo{Key: "hello1", MinIndex: uint64(1)}))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, meta.Index, uint64(42))
|
||||||
|
|
||||||
|
testEntry := func(t *testing.T, doTest func(t *testing.T, entry cacheEntry)) {
|
||||||
|
c.entriesLock.Lock()
|
||||||
|
tEntry, ok := c.types["t1"]
|
||||||
|
require.True(t, ok)
|
||||||
|
keyName := makeEntryKey("t1", "", "", "hello1")
|
||||||
|
ok, entryValid, entry := c.getEntryLocked(tEntry, keyName, RequestInfo{})
|
||||||
|
require.True(t, ok)
|
||||||
|
require.True(t, entryValid)
|
||||||
|
doTest(t, entry)
|
||||||
|
c.entriesLock.Unlock()
|
||||||
|
|
||||||
|
}
|
||||||
|
testEntry(t, func(t *testing.T, entry cacheEntry) {
|
||||||
|
require.Equal(t, entry.FetchRateLimiter.Limit(), rate.Limit(1))
|
||||||
|
require.Equal(t, entry.FetchRateLimiter.Burst(), 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Modify only rateLimit
|
||||||
|
require.True(t, c.ReloadOptions(Options{EntryFetchRate: rate.Limit(100), EntryFetchMaxBurst: 1}))
|
||||||
|
testEntry(t, func(t *testing.T, entry cacheEntry) {
|
||||||
|
require.Equal(t, entry.FetchRateLimiter.Limit(), rate.Limit(100))
|
||||||
|
require.Equal(t, entry.FetchRateLimiter.Burst(), 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Modify only Burst
|
||||||
|
require.True(t, c.ReloadOptions(Options{EntryFetchRate: rate.Limit(100), EntryFetchMaxBurst: 5}))
|
||||||
|
testEntry(t, func(t *testing.T, entry cacheEntry) {
|
||||||
|
require.Equal(t, entry.FetchRateLimiter.Limit(), rate.Limit(100))
|
||||||
|
require.Equal(t, entry.FetchRateLimiter.Burst(), 5)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Modify only Burst and Limit at the same time
|
||||||
|
require.True(t, c.ReloadOptions(Options{EntryFetchRate: rate.Limit(1000), EntryFetchMaxBurst: 42}))
|
||||||
|
|
||||||
|
testEntry(t, func(t *testing.T, entry cacheEntry) {
|
||||||
|
require.Equal(t, entry.FetchRateLimiter.Limit(), rate.Limit(1000))
|
||||||
|
require.Equal(t, entry.FetchRateLimiter.Burst(), 42)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// TestCacheThrottle checks the assumptions for the cache throttling. It sets
|
// TestCacheThrottle checks the assumptions for the cache throttling. It sets
|
||||||
// up a cache with Options{EntryFetchRate: 10.0, EntryFetchMaxBurst: 1}, which
|
// up a cache with Options{EntryFetchRate: 10.0, EntryFetchMaxBurst: 1}, which
|
||||||
// allows for 10req/s, or one request every 100ms.
|
// allows for 10req/s, or one request every 100ms.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user