From 2e223ea2b775e38fa38cff617ce069ba80839bfc Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Fri, 15 Jun 2018 21:03:19 +0100 Subject: [PATCH] Fix hot loop in cache for RPC returning zero index. --- agent/cache/cache.go | 40 +++++++++--- agent/cache/cache_test.go | 93 ++++++++++++++++++++++++---- agent/consul/state/intention.go | 9 +++ agent/consul/state/intention_test.go | 4 +- agent/testagent.go | 10 +++ 5 files changed, 133 insertions(+), 23 deletions(-) diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 172250d7ac..e566797beb 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -325,14 +325,6 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<- entry = cacheEntry{Valid: false, Waiter: make(chan struct{})} } - // We always specify an index greater than zero since index of zero - // means to always return immediately and we want to block if possible. - // Index 1 is always safe since Consul's own initialization always results - // in a higher index (around 10 or above). - if entry.Index == 0 { - entry.Index = 1 - } - // Set that we're fetching to true, which makes it so that future // identical calls to fetch will return the same waiter rather than // perform multiple fetches. @@ -355,6 +347,17 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<- // A new value was given, so we create a brand new entry. newEntry.Value = result.Value newEntry.Index = result.Index + if newEntry.Index < 1 { + // Less than one is invalid unless there was an error and in this case + // there wasn't since a value was returned. If a badly behaved RPC + // returns 0 when it has no data, we might get into a busy loop here. We + // set this to minimum of 1 which is safe because no valid user data can + // ever be written at raft index 1 due to the bootstrap process for + // raft. This insure that any subsequent background refresh request will + // always block, but allows the initial request to return immediately + // even if there is no data. + newEntry.Index = 1 + } // This is a valid entry with a result newEntry.Valid = true @@ -365,8 +368,25 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<- metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1) metrics.IncrCounter([]string{"consul", "cache", t, "fetch_success"}, 1) - // Reset the attempts counter so we don't have any backoff - attempt = 0 + if result.Index > 0 { + // Reset the attempts counter so we don't have any backoff + attempt = 0 + } else { + // Result having a zero index is an implicit error case. There was no + // actual error but it implies the RPC found in index (nothing written + // yet for that type) but didn't take care to return safe "1" index. We + // don't want to actually treat it like an error by setting + // newEntry.Error to something non-nil, but we should guard against 100% + // CPU burn hot loops caused by that case which will never block but + // also won't backoff either. So we treat it as a failed attempt so that + // at least the failure backoff will save our CPU while still + // periodically refreshing so normal service can resume when the servers + // actually have something to return from the RPC. If we get in this + // state it can be considered a bug in the RPC implementation (to ever + // return a zero index) however since it can happen this is a safety net + // for the future. + attempt++ + } } else { metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1) metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1) diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index 0faeae9e53..3c20c58a3e 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -471,10 +471,8 @@ func TestCacheGet_periodicRefreshErrorBackoff(t *testing.T) { require.True(t, actual < 10, fmt.Sprintf("actual: %d", actual)) } -// Test that fetching with no index actually sets to index to one, including -// for background refreshes. This ensures we don't end up with any index 0 -// loops. -func TestCacheGet_noIndexSetsOne(t *testing.T) { +// Test that a badly behaved RPC that returns 0 index will perform a backoff. +func TestCacheGet_periodicRefreshBadRPCZeroIndexErrorBackoff(t *testing.T) { t.Parallel() typ := TestType(t) @@ -487,18 +485,91 @@ func TestCacheGet_noIndexSetsOne(t *testing.T) { }) // Configure the type - fetchErr := fmt.Errorf("test fetch error") - typ.Static(FetchResult{Value: 42, Index: 0}, fetchErr).Run(func(args mock.Arguments) { - opts := args.Get(0).(FetchOptions) - assert.True(t, opts.MinIndex > 0, "minIndex > 0") + var retries uint32 + typ.Static(FetchResult{Value: 0, Index: 0}, nil).Run(func(args mock.Arguments) { + atomic.AddUint32(&retries, 1) }) // Fetch resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"})) - TestCacheGetChResult(t, resultCh, 42) + TestCacheGetChResult(t, resultCh, 0) - // Sleep a bit so background refresh happens - time.Sleep(100 * time.Millisecond) + // Sleep a bit. The refresh will quietly fail in the background. What we + // want to verify is that it doesn't retry too much. "Too much" is hard + // to measure since its CPU dependent if this test is failing. But due + // to the short sleep below, we can calculate about what we'd expect if + // backoff IS working. + time.Sleep(500 * time.Millisecond) + + // Fetch should work, we should get a 0 still. Errors are ignored. + resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"})) + TestCacheGetChResult(t, resultCh, 0) + + // Check the number + actual := atomic.LoadUint32(&retries) + require.True(t, actual < 10, fmt.Sprintf("%d retries, should be < 10", actual)) +} + +// Test that fetching with no index makes an initial request with no index, but +// then ensures all background refreshes have > 0. This ensures we don't end up +// with any index 0 loops from background refreshed while also returning +// immediately on the initial request if there is no data written to that table +// yet. +func TestCacheGet_noIndexSetsOne(t *testing.T) { + t.Parallel() + + typ := TestType(t) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ, &RegisterOptions{ + Refresh: true, + RefreshTimer: 0, + RefreshTimeout: 5 * time.Minute, + }) + + // Simulate "well behaved" RPC with no data yet but returning 1 + { + first := int32(1) + + typ.Static(FetchResult{Value: 0, Index: 1}, nil).Run(func(args mock.Arguments) { + opts := args.Get(0).(FetchOptions) + isFirst := atomic.SwapInt32(&first, 0) + if isFirst == 1 { + assert.Equal(t, uint64(0), opts.MinIndex) + } else { + assert.True(t, opts.MinIndex > 0, "minIndex > 0") + } + }) + + // Fetch + resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"})) + TestCacheGetChResult(t, resultCh, 0) + + // Sleep a bit so background refresh happens + time.Sleep(100 * time.Millisecond) + } + + // Same for "badly behaved" RPC that returns 0 index and no data + { + first := int32(1) + + typ.Static(FetchResult{Value: 0, Index: 0}, nil).Run(func(args mock.Arguments) { + opts := args.Get(0).(FetchOptions) + isFirst := atomic.SwapInt32(&first, 0) + if isFirst == 1 { + assert.Equal(t, uint64(0), opts.MinIndex) + } else { + assert.True(t, opts.MinIndex > 0, "minIndex > 0") + } + }) + + // Fetch + resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"})) + TestCacheGetChResult(t, resultCh, 0) + + // Sleep a bit so background refresh happens + time.Sleep(100 * time.Millisecond) + } } // Test that the backend fetch sets the proper timeout. diff --git a/agent/consul/state/intention.go b/agent/consul/state/intention.go index a29b9f63f7..bf113fd8b7 100644 --- a/agent/consul/state/intention.go +++ b/agent/consul/state/intention.go @@ -132,6 +132,9 @@ func (s *Store) Intentions(ws memdb.WatchSet) (uint64, structs.Intentions, error // Get the index idx := maxIndexTxn(tx, intentionsTableName) + if idx < 1 { + idx = 1 + } // Get all intentions iter, err := tx.Get(intentionsTableName, "id") @@ -228,6 +231,9 @@ func (s *Store) IntentionGet(ws memdb.WatchSet, id string) (uint64, *structs.Int // Get the table index. idx := maxIndexTxn(tx, intentionsTableName) + if idx < 1 { + idx = 1 + } // Look up by its ID. watchCh, intention, err := tx.FirstWatch(intentionsTableName, "id", id) @@ -295,6 +301,9 @@ func (s *Store) IntentionMatch(ws memdb.WatchSet, args *structs.IntentionQueryMa // Get the table index. idx := maxIndexTxn(tx, intentionsTableName) + if idx < 1 { + idx = 1 + } // Make all the calls and accumulate the results results := make([]structs.Intentions, len(args.Entries)) diff --git a/agent/consul/state/intention_test.go b/agent/consul/state/intention_test.go index 3e7df561c1..a68d6222d4 100644 --- a/agent/consul/state/intention_test.go +++ b/agent/consul/state/intention_test.go @@ -17,7 +17,7 @@ func TestStore_IntentionGet_none(t *testing.T) { // Querying with no results returns nil. ws := memdb.NewWatchSet() idx, res, err := s.IntentionGet(ws, testUUID()) - assert.Equal(idx, uint64(0)) + assert.Equal(uint64(1), idx) assert.Nil(res) assert.Nil(err) } @@ -231,7 +231,7 @@ func TestStore_IntentionsList(t *testing.T) { idx, res, err := s.Intentions(ws) assert.NoError(err) assert.Nil(res) - assert.Equal(idx, uint64(0)) + assert.Equal(uint64(1), idx) // Create some intentions ixns := structs.Intentions{ diff --git a/agent/testagent.go b/agent/testagent.go index 007d01322f..3703ded1ba 100644 --- a/agent/testagent.go +++ b/agent/testagent.go @@ -352,6 +352,16 @@ func TestConfig(sources ...config.Source) *config.RuntimeConfig { ca_config { cluster_id = "` + connect.TestClusterID + `" } + proxy_defaults { + // Generally we don't actually need to test real proxy startup except + // in the Daemon package which explicitly manages how it starts things + // so making this a no-op long running command like /bin/sleep would + // be fine, but there is no such thing on windows etc. We hackily rely + // on the fact that if the executable doesn't exist, the Daemon + // manager will get an exec error and retry it with a backoff beningly + // until tests pass. + daemon_command = ["/bin/sleep", "3600"] + } } performance { raft_multiplier = 1