mirror of https://github.com/status-im/consul.git
cache: fix bug where TTLs were ignored leading to leaked memory in client agents (#9978)
* Fix bug in cache where TTLs are effectively ignored This mostly affects streaming since streaming will immediately return from Fetch calls when the state is Closed on eviction which causes the race condition every time. However this also affects all other cache types if the fetch call happens to return between the eviction and then next time around the Get loop by any client. There is a separate bug that allows cache items to be evicted even when there are active clients which is the trigger here. * Add changelog entry * Update .changelog/9978.txt
This commit is contained in:
parent
0efdb9f0b5
commit
78c1528c48
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
cache: fix a bug in the client agent cache where streaming could potentially leak resources. [[GH-9978](https://github.com/hashicorp/consul/pull/9978)].
|
||||
```
|
|
@ -723,6 +723,22 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
|
|||
// Set our entry
|
||||
c.entriesLock.Lock()
|
||||
|
||||
if _, ok := c.entries[key]; !ok {
|
||||
// This entry was evicted during our fetch. DON'T re-insert it or fall
|
||||
// through to the refresh loop below otherwise it will live forever! In
|
||||
// theory there should not be any Get calls waiting on entry.Waiter since
|
||||
// they would have prevented the eviction, but in practice there may be
|
||||
// due to timing and the fact that we don't update the TTL on the entry if
|
||||
// errors are being returned for a while. So we do need to unblock them,
|
||||
// which will mean they recreate the entry again right away and so "reset"
|
||||
// to a good state anyway!
|
||||
c.entriesLock.Unlock()
|
||||
|
||||
// Trigger any waiters that are around.
|
||||
close(entry.Waiter)
|
||||
return
|
||||
}
|
||||
|
||||
// If this is a new entry (not in the heap yet), then setup the
|
||||
// initial expiry information and insert. If we're already in
|
||||
// the heap we do nothing since we're reusing the same entry.
|
||||
|
|
|
@ -762,7 +762,7 @@ func TestCacheGet_expire(t *testing.T) {
|
|||
// Wait for a non-trivial amount of time to sanity check the age increases at
|
||||
// least this amount. Note that this is not a fudge for some timing-dependent
|
||||
// background work it's just ensuring a non-trivial time elapses between the
|
||||
// request above and below serilaly in this thread so short time is OK.
|
||||
// request above and below serially in this thread so short time is OK.
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
// Get, should not fetch, verified via the mock assertions above
|
||||
|
@ -783,6 +783,160 @@ func TestCacheGet_expire(t *testing.T) {
|
|||
require.Equal(42, result)
|
||||
require.False(meta.Hit)
|
||||
|
||||
// Sleep a tiny bit just to let maybe some background calls happen then verify
|
||||
// that we still only got the one call
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
typ.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// Test that entries expire for background refresh types that cancel fetch on
|
||||
// eviction. This is really a special case of the test below where the close
|
||||
// behavior of the type forces the timing that causes the race but it's worth
|
||||
// keeping explicitly anyway to make sure this behavior is supported and
|
||||
// doesn't introduce any different races.
|
||||
func TestCacheGet_expireBackgroudRefreshCancel(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
typ := &MockType{}
|
||||
typ.On("RegisterOptions").Return(RegisterOptions{
|
||||
LastGetTTL: 400 * time.Millisecond,
|
||||
Refresh: true,
|
||||
RefreshTimer: 0,
|
||||
SupportsBlocking: true,
|
||||
})
|
||||
defer typ.AssertExpectations(t)
|
||||
c := New(Options{})
|
||||
|
||||
// Register the type with a timeout
|
||||
c.RegisterType("t", typ)
|
||||
|
||||
// Create a cache state that is a closer that cancels the context on close
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
closer := &testCloser{
|
||||
closeFn: func() {
|
||||
cancel()
|
||||
},
|
||||
}
|
||||
|
||||
// Configure the type
|
||||
typ.On("Fetch", mock.Anything, mock.Anything).
|
||||
Return(func(o FetchOptions, r Request) FetchResult {
|
||||
return FetchResult{Value: 8, Index: 4, State: closer}
|
||||
}, func(o FetchOptions, r Request) error {
|
||||
if o.MinIndex == 4 {
|
||||
// Simulate waiting for a new value on second call until the cache type
|
||||
// is evicted
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// Get, should fetch
|
||||
req := TestRequest(t, RequestInfo{Key: "hello"})
|
||||
result, meta, err := c.Get(context.Background(), "t", req)
|
||||
require.NoError(err)
|
||||
require.Equal(8, result)
|
||||
require.Equal(uint64(4), meta.Index)
|
||||
require.False(meta.Hit)
|
||||
|
||||
// Get, should not fetch, verified via the mock assertions above
|
||||
req = TestRequest(t, RequestInfo{Key: "hello"})
|
||||
result, meta, err = c.Get(context.Background(), "t", req)
|
||||
require.NoError(err)
|
||||
require.Equal(8, result)
|
||||
require.Equal(uint64(4), meta.Index)
|
||||
require.True(meta.Hit)
|
||||
|
||||
// Sleep for the expiry
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
// Get, should fetch
|
||||
req = TestRequest(t, RequestInfo{Key: "hello"})
|
||||
result, meta, err = c.Get(context.Background(), "t", req)
|
||||
require.NoError(err)
|
||||
require.Equal(8, result)
|
||||
require.Equal(uint64(4), meta.Index)
|
||||
require.False(meta.Hit, "the fetch should not have re-populated the cache "+
|
||||
"entry after it expired so this get should be a miss")
|
||||
|
||||
// Sleep a tiny bit just to let maybe some background calls happen
|
||||
// then verify that we still only got the one call
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
typ.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// Test that entries expire for background refresh types that return before any
|
||||
// watcher re-fetches.
|
||||
func TestCacheGet_expireBackgroudRefresh(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
typ := &MockType{}
|
||||
typ.On("RegisterOptions").Return(RegisterOptions{
|
||||
LastGetTTL: 400 * time.Millisecond,
|
||||
Refresh: true,
|
||||
RefreshTimer: 0,
|
||||
SupportsBlocking: true,
|
||||
})
|
||||
defer typ.AssertExpectations(t)
|
||||
c := New(Options{})
|
||||
|
||||
// Register the type with a timeout
|
||||
c.RegisterType("t", typ)
|
||||
|
||||
ctrlCh := make(chan struct{})
|
||||
|
||||
// Configure the type
|
||||
typ.On("Fetch", mock.Anything, mock.Anything).
|
||||
Return(func(o FetchOptions, r Request) FetchResult {
|
||||
if o.MinIndex == 4 {
|
||||
// Simulate returning from fetch (after a timeout with no value change)
|
||||
// at a time controlled by the test to ensure we interleave requests.
|
||||
<-ctrlCh
|
||||
}
|
||||
return FetchResult{Value: 8, Index: 4}
|
||||
}, func(o FetchOptions, r Request) error {
|
||||
return nil
|
||||
})
|
||||
|
||||
// Get, should fetch
|
||||
req := TestRequest(t, RequestInfo{Key: "hello"})
|
||||
result, meta, err := c.Get(context.Background(), "t", req)
|
||||
require.NoError(err)
|
||||
require.Equal(8, result)
|
||||
require.Equal(uint64(4), meta.Index)
|
||||
require.False(meta.Hit)
|
||||
|
||||
// Get, should not fetch, verified via the mock assertions above
|
||||
req = TestRequest(t, RequestInfo{Key: "hello"})
|
||||
result, meta, err = c.Get(context.Background(), "t", req)
|
||||
require.NoError(err)
|
||||
require.Equal(8, result)
|
||||
require.Equal(uint64(4), meta.Index)
|
||||
require.True(meta.Hit)
|
||||
|
||||
// Sleep for the expiry
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
// Now (after expiry) let the fetch call return
|
||||
close(ctrlCh)
|
||||
|
||||
// Get, should fetch (it didn't originally because the fetch return would
|
||||
// re-insert the value back into the cache and make it live forever).
|
||||
req = TestRequest(t, RequestInfo{Key: "hello"})
|
||||
result, meta, err = c.Get(context.Background(), "t", req)
|
||||
require.NoError(err)
|
||||
require.Equal(8, result)
|
||||
require.Equal(uint64(4), meta.Index)
|
||||
require.False(meta.Hit, "the fetch should not have re-populated the cache "+
|
||||
"entry after it expired so this get should be a miss")
|
||||
|
||||
// Sleep a tiny bit just to let maybe some background calls happen
|
||||
// then verify that we still only got the one call
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
@ -882,11 +1036,15 @@ func TestCacheGet_expireClose(t *testing.T) {
|
|||
}
|
||||
|
||||
type testCloser struct {
|
||||
closed uint32
|
||||
closed uint32
|
||||
closeFn func()
|
||||
}
|
||||
|
||||
func (t *testCloser) Close() error {
|
||||
atomic.SwapUint32(&t.closed, 1)
|
||||
if t.closeFn != nil {
|
||||
t.closeFn()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue