mirror of https://github.com/status-im/consul.git
Make the Agent Cache more Context aware (#8092)
Blocking queries issues will still be uncancellable (that cannot be helped until we get rid of net/rpc). However this makes it so that if calling getWithIndex (like during a cache Notify go routine) we can cancell the outer routine. Previously it would keep issuing more blocking queries until the result state actually changed.
This commit is contained in:
parent
965c80e2eb
commit
8837907de4
|
@ -1317,7 +1317,7 @@ func (s *HTTPServer) AgentConnectCARoots(resp http.ResponseWriter, req *http.Req
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
raw, m, err := s.agent.cache.Get(cachetype.ConnectCARootName, &args)
|
raw, m, err := s.agent.cache.Get(req.Context(), cachetype.ConnectCARootName, &args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -1359,7 +1359,7 @@ func (s *HTTPServer) AgentConnectCALeafCert(resp http.ResponseWriter, req *http.
|
||||||
args.MaxQueryTime = qOpts.MaxQueryTime
|
args.MaxQueryTime = qOpts.MaxQueryTime
|
||||||
args.Token = qOpts.Token
|
args.Token = qOpts.Token
|
||||||
|
|
||||||
raw, m, err := s.agent.cache.Get(cachetype.ConnectCALeafName, &args)
|
raw, m, err := s.agent.cache.Get(req.Context(), cachetype.ConnectCALeafName, &args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -469,7 +469,9 @@ func activeRootHasKey(roots *structs.IndexedCARoots, currentSigningKeyID string)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConnectCALeaf) rootsFromCache() (*structs.IndexedCARoots, error) {
|
func (c *ConnectCALeaf) rootsFromCache() (*structs.IndexedCARoots, error) {
|
||||||
rawRoots, _, err := c.Cache.Get(ConnectCARootName, &structs.DCSpecificRequest{
|
// Background is fine here because this isn't a blocking query as no index is set.
|
||||||
|
// Therefore this will just either be a cache hit or return once the non-blocking query returns.
|
||||||
|
rawRoots, _, err := c.Cache.Get(context.Background(), ConnectCARootName, &structs.DCSpecificRequest{
|
||||||
Datacenter: c.Datacenter,
|
Datacenter: c.Datacenter,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -16,6 +16,7 @@ package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/heap"
|
"container/heap"
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
@ -216,7 +217,7 @@ func (c *Cache) RegisterType(n string, typ Type) {
|
||||||
// index is retrieved, the last known value (maybe nil) is returned. No
|
// index is retrieved, the last known value (maybe nil) is returned. No
|
||||||
// error is returned on timeout. This matches the behavior of Consul blocking
|
// error is returned on timeout. This matches the behavior of Consul blocking
|
||||||
// queries.
|
// queries.
|
||||||
func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) {
|
func (c *Cache) Get(ctx context.Context, t string, r Request) (interface{}, ResultMeta, error) {
|
||||||
c.typesLock.RLock()
|
c.typesLock.RLock()
|
||||||
tEntry, ok := c.types[t]
|
tEntry, ok := c.types[t]
|
||||||
c.typesLock.RUnlock()
|
c.typesLock.RUnlock()
|
||||||
|
@ -225,7 +226,7 @@ func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) {
|
||||||
// once. But be robust against panics.
|
// once. But be robust against panics.
|
||||||
return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t)
|
return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t)
|
||||||
}
|
}
|
||||||
return c.getWithIndex(newGetOptions(tEntry, r))
|
return c.getWithIndex(ctx, newGetOptions(tEntry, r))
|
||||||
}
|
}
|
||||||
|
|
||||||
// getOptions contains the arguments for a Get request. It is used in place of
|
// getOptions contains the arguments for a Get request. It is used in place of
|
||||||
|
@ -292,7 +293,7 @@ func entryExceedsMaxAge(maxAge time.Duration, entry cacheEntry) bool {
|
||||||
// getWithIndex implements the main Get functionality but allows internal
|
// getWithIndex implements the main Get functionality but allows internal
|
||||||
// callers (Watch) to manipulate the blocking index separately from the actual
|
// callers (Watch) to manipulate the blocking index separately from the actual
|
||||||
// request object.
|
// request object.
|
||||||
func (c *Cache) getWithIndex(r getOptions) (interface{}, ResultMeta, error) {
|
func (c *Cache) getWithIndex(ctx context.Context, r getOptions) (interface{}, ResultMeta, error) {
|
||||||
if r.Info.Key == "" {
|
if r.Info.Key == "" {
|
||||||
metrics.IncrCounter([]string{"consul", "cache", "bypass"}, 1)
|
metrics.IncrCounter([]string{"consul", "cache", "bypass"}, 1)
|
||||||
|
|
||||||
|
@ -394,6 +395,8 @@ RETRY_GET:
|
||||||
first = false
|
first = false
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ResultMeta{}, ctx.Err()
|
||||||
case <-waiterCh:
|
case <-waiterCh:
|
||||||
// Our fetch returned, retry the get from the cache.
|
// Our fetch returned, retry the get from the cache.
|
||||||
r.Info.MustRevalidate = false
|
r.Info.MustRevalidate = false
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
|
@ -9,6 +10,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -30,13 +32,13 @@ func TestCacheGet_noIndex(t *testing.T) {
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req := TestRequest(t, RequestInfo{Key: "hello"})
|
req := TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, meta, err := c.Get("t", req)
|
result, meta, err := c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Get, should not fetch since we already have a satisfying value
|
// Get, should not fetch since we already have a satisfying value
|
||||||
result, meta, err = c.Get("t", req)
|
result, meta, err = c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
require.True(meta.Hit)
|
require.True(meta.Hit)
|
||||||
|
@ -64,13 +66,13 @@ func TestCacheGet_initError(t *testing.T) {
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req := TestRequest(t, RequestInfo{Key: "hello"})
|
req := TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, meta, err := c.Get("t", req)
|
result, meta, err := c.Get(context.Background(), "t", req)
|
||||||
require.Error(err)
|
require.Error(err)
|
||||||
require.Nil(result)
|
require.Nil(result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Get, should fetch again since our last fetch was an error
|
// Get, should fetch again since our last fetch was an error
|
||||||
result, meta, err = c.Get("t", req)
|
result, meta, err = c.Get(context.Background(), "t", req)
|
||||||
require.Error(err)
|
require.Error(err)
|
||||||
require.Nil(result)
|
require.Nil(result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
@ -104,13 +106,13 @@ func TestCacheGet_cachedErrorsDontStick(t *testing.T) {
|
||||||
|
|
||||||
// Get, should fetch and get error
|
// Get, should fetch and get error
|
||||||
req := TestRequest(t, RequestInfo{Key: "hello"})
|
req := TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, meta, err := c.Get("t", req)
|
result, meta, err := c.Get(context.Background(), "t", req)
|
||||||
require.Error(err)
|
require.Error(err)
|
||||||
require.Nil(result)
|
require.Nil(result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Get, should fetch again since our last fetch was an error, but get success
|
// Get, should fetch again since our last fetch was an error, but get success
|
||||||
result, meta, err = c.Get("t", req)
|
result, meta, err = c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
@ -159,13 +161,13 @@ func TestCacheGet_blankCacheKey(t *testing.T) {
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req := TestRequest(t, RequestInfo{Key: ""})
|
req := TestRequest(t, RequestInfo{Key: ""})
|
||||||
result, meta, err := c.Get("t", req)
|
result, meta, err := c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Get, should not fetch since we already have a satisfying value
|
// Get, should not fetch since we already have a satisfying value
|
||||||
result, meta, err = c.Get("t", req)
|
result, meta, err = c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
@ -296,6 +298,26 @@ func TestCacheGet_blockingIndex(t *testing.T) {
|
||||||
TestCacheGetChResult(t, resultCh, 42)
|
TestCacheGetChResult(t, resultCh, 42)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCacheGet_cancellation(t *testing.T) {
|
||||||
|
typ := TestType(t)
|
||||||
|
defer typ.AssertExpectations(t)
|
||||||
|
c := TestCache(t)
|
||||||
|
c.RegisterType("t", typ)
|
||||||
|
|
||||||
|
typ.Static(FetchResult{Value: 1, Index: 4}, nil).Times(0).WaitUntil(time.After(1 * time.Millisecond))
|
||||||
|
|
||||||
|
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(50*time.Millisecond))
|
||||||
|
// this is just to keep the linter happy
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
result, _, err := c.Get(ctx, "t", TestRequest(t, RequestInfo{
|
||||||
|
Key: "hello", MinIndex: 5}))
|
||||||
|
|
||||||
|
require.Nil(t, result)
|
||||||
|
require.Error(t, err)
|
||||||
|
testutil.RequireErrorContains(t, err, context.DeadlineExceeded.Error())
|
||||||
|
}
|
||||||
|
|
||||||
// Test a get with an index set will timeout if the fetch doesn't return
|
// Test a get with an index set will timeout if the fetch doesn't return
|
||||||
// anything.
|
// anything.
|
||||||
func TestCacheGet_blockingIndexTimeout(t *testing.T) {
|
func TestCacheGet_blockingIndexTimeout(t *testing.T) {
|
||||||
|
@ -393,7 +415,7 @@ func TestCacheGet_emptyFetchResult(t *testing.T) {
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req := TestRequest(t, RequestInfo{Key: "hello"})
|
req := TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, meta, err := c.Get("t", req)
|
result, meta, err := c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
@ -401,7 +423,7 @@ func TestCacheGet_emptyFetchResult(t *testing.T) {
|
||||||
// Get, should not fetch since we already have a satisfying value
|
// Get, should not fetch since we already have a satisfying value
|
||||||
req = TestRequest(t, RequestInfo{
|
req = TestRequest(t, RequestInfo{
|
||||||
Key: "hello", MinIndex: 1, Timeout: 100 * time.Millisecond})
|
Key: "hello", MinIndex: 1, Timeout: 100 * time.Millisecond})
|
||||||
result, meta, err = c.Get("t", req)
|
result, meta, err = c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
@ -418,7 +440,7 @@ func TestCacheGet_emptyFetchResult(t *testing.T) {
|
||||||
// returns nil and so the previous result is used.
|
// returns nil and so the previous result is used.
|
||||||
req = TestRequest(t, RequestInfo{
|
req = TestRequest(t, RequestInfo{
|
||||||
Key: "hello", MinIndex: 1, Timeout: 100 * time.Millisecond})
|
Key: "hello", MinIndex: 1, Timeout: 100 * time.Millisecond})
|
||||||
result, meta, err = c.Get("t", req)
|
result, meta, err = c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
@ -698,7 +720,7 @@ func TestCacheGet_fetchTimeout(t *testing.T) {
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req := TestRequest(t, RequestInfo{Key: "hello"})
|
req := TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, meta, err := c.Get("t", req)
|
result, meta, err := c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
@ -728,7 +750,7 @@ func TestCacheGet_expire(t *testing.T) {
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req := TestRequest(t, RequestInfo{Key: "hello"})
|
req := TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, meta, err := c.Get("t", req)
|
result, meta, err := c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
@ -741,7 +763,7 @@ func TestCacheGet_expire(t *testing.T) {
|
||||||
|
|
||||||
// Get, should not fetch, verified via the mock assertions above
|
// Get, should not fetch, verified via the mock assertions above
|
||||||
req = TestRequest(t, RequestInfo{Key: "hello"})
|
req = TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, meta, err = c.Get("t", req)
|
result, meta, err = c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
require.True(meta.Hit)
|
require.True(meta.Hit)
|
||||||
|
@ -752,7 +774,7 @@ func TestCacheGet_expire(t *testing.T) {
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req = TestRequest(t, RequestInfo{Key: "hello"})
|
req = TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, meta, err = c.Get("t", req)
|
result, meta, err = c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
@ -784,7 +806,7 @@ func TestCacheGet_expireResetGet(t *testing.T) {
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req := TestRequest(t, RequestInfo{Key: "hello"})
|
req := TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, meta, err := c.Get("t", req)
|
result, meta, err := c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
@ -797,7 +819,7 @@ func TestCacheGet_expireResetGet(t *testing.T) {
|
||||||
|
|
||||||
// Get, should not fetch
|
// Get, should not fetch
|
||||||
req = TestRequest(t, RequestInfo{Key: "hello"})
|
req = TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, meta, err = c.Get("t", req)
|
result, meta, err = c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
require.True(meta.Hit)
|
require.True(meta.Hit)
|
||||||
|
@ -807,7 +829,7 @@ func TestCacheGet_expireResetGet(t *testing.T) {
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req = TestRequest(t, RequestInfo{Key: "hello"})
|
req = TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, meta, err = c.Get("t", req)
|
result, meta, err = c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
@ -840,21 +862,21 @@ func TestCacheGet_duplicateKeyDifferentType(t *testing.T) {
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req := TestRequest(t, RequestInfo{Key: "foo"})
|
req := TestRequest(t, RequestInfo{Key: "foo"})
|
||||||
result, meta, err := c.Get("t", req)
|
result, meta, err := c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(100, result)
|
require.Equal(100, result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Get from t2 with same key, should fetch
|
// Get from t2 with same key, should fetch
|
||||||
req = TestRequest(t, RequestInfo{Key: "foo"})
|
req = TestRequest(t, RequestInfo{Key: "foo"})
|
||||||
result, meta, err = c.Get("t2", req)
|
result, meta, err = c.Get(context.Background(), "t2", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(200, result)
|
require.Equal(200, result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Get from t again with same key, should cache
|
// Get from t again with same key, should cache
|
||||||
req = TestRequest(t, RequestInfo{Key: "foo"})
|
req = TestRequest(t, RequestInfo{Key: "foo"})
|
||||||
result, meta, err = c.Get("t", req)
|
result, meta, err = c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(100, result)
|
require.Equal(100, result)
|
||||||
require.True(meta.Hit)
|
require.True(meta.Hit)
|
||||||
|
@ -974,7 +996,7 @@ func TestCacheGet_refreshAge(t *testing.T) {
|
||||||
time.Sleep(2 * time.Millisecond)
|
time.Sleep(2 * time.Millisecond)
|
||||||
|
|
||||||
// Fetch again, non-blocking
|
// Fetch again, non-blocking
|
||||||
result, meta, err := c.Get("t", TestRequest(t, RequestInfo{Key: "hello"}))
|
result, meta, err := c.Get(context.Background(), "t", TestRequest(t, RequestInfo{Key: "hello"}))
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(8, result)
|
require.Equal(8, result)
|
||||||
require.True(meta.Hit)
|
require.True(meta.Hit)
|
||||||
|
@ -994,7 +1016,7 @@ func TestCacheGet_refreshAge(t *testing.T) {
|
||||||
|
|
||||||
var lastAge time.Duration
|
var lastAge time.Duration
|
||||||
{
|
{
|
||||||
result, meta, err := c.Get("t", TestRequest(t, RequestInfo{Key: "hello"}))
|
result, meta, err := c.Get(context.Background(), "t", TestRequest(t, RequestInfo{Key: "hello"}))
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(8, result)
|
require.Equal(8, result)
|
||||||
require.True(meta.Hit)
|
require.True(meta.Hit)
|
||||||
|
@ -1005,7 +1027,7 @@ func TestCacheGet_refreshAge(t *testing.T) {
|
||||||
// Wait a bit longer - age should increase by at least this much
|
// Wait a bit longer - age should increase by at least this much
|
||||||
time.Sleep(5 * time.Millisecond)
|
time.Sleep(5 * time.Millisecond)
|
||||||
{
|
{
|
||||||
result, meta, err := c.Get("t", TestRequest(t, RequestInfo{Key: "hello"}))
|
result, meta, err := c.Get(context.Background(), "t", TestRequest(t, RequestInfo{Key: "hello"}))
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(8, result)
|
require.Equal(8, result)
|
||||||
require.True(meta.Hit)
|
require.True(meta.Hit)
|
||||||
|
@ -1027,7 +1049,7 @@ func TestCacheGet_refreshAge(t *testing.T) {
|
||||||
// the test thread got down here relative to the failures.
|
// the test thread got down here relative to the failures.
|
||||||
for attempts := 0; attempts < 50; attempts++ {
|
for attempts := 0; attempts < 50; attempts++ {
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
result, meta, err := c.Get("t", TestRequest(t, RequestInfo{Key: "hello"}))
|
result, meta, err := c.Get(context.Background(), "t", TestRequest(t, RequestInfo{Key: "hello"}))
|
||||||
// Should never error even if background is failing as we have cached value
|
// Should never error even if background is failing as we have cached value
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.True(meta.Hit)
|
require.True(meta.Hit)
|
||||||
|
@ -1080,7 +1102,7 @@ func TestCacheGet_nonRefreshAge(t *testing.T) {
|
||||||
time.Sleep(5 * time.Millisecond)
|
time.Sleep(5 * time.Millisecond)
|
||||||
|
|
||||||
// Fetch again, non-blocking
|
// Fetch again, non-blocking
|
||||||
result, meta, err := c.Get("t", TestRequest(t, RequestInfo{Key: "hello"}))
|
result, meta, err := c.Get(context.Background(), "t", TestRequest(t, RequestInfo{Key: "hello"}))
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(8, result)
|
require.Equal(8, result)
|
||||||
require.True(meta.Hit)
|
require.True(meta.Hit)
|
||||||
|
@ -1092,7 +1114,7 @@ func TestCacheGet_nonRefreshAge(t *testing.T) {
|
||||||
time.Sleep(200 * time.Millisecond)
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
|
||||||
{
|
{
|
||||||
result, meta, err := c.Get("t", TestRequest(t, RequestInfo{Key: "hello"}))
|
result, meta, err := c.Get(context.Background(), "t", TestRequest(t, RequestInfo{Key: "hello"}))
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(8, result)
|
require.Equal(8, result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
@ -1108,7 +1130,7 @@ func TestCacheGet_nonRefreshAge(t *testing.T) {
|
||||||
time.Sleep(5 * time.Millisecond)
|
time.Sleep(5 * time.Millisecond)
|
||||||
|
|
||||||
// Fetch again, non-blocking
|
// Fetch again, non-blocking
|
||||||
result, meta, err := c.Get("t", TestRequest(t, RequestInfo{Key: "hello"}))
|
result, meta, err := c.Get(context.Background(), "t", TestRequest(t, RequestInfo{Key: "hello"}))
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(8, result)
|
require.Equal(8, result)
|
||||||
require.True(meta.Hit)
|
require.True(meta.Hit)
|
||||||
|
@ -1118,7 +1140,7 @@ func TestCacheGet_nonRefreshAge(t *testing.T) {
|
||||||
|
|
||||||
// Now verify that setting MaxAge results in cache invalidation
|
// Now verify that setting MaxAge results in cache invalidation
|
||||||
{
|
{
|
||||||
result, meta, err := c.Get("t", TestRequest(t, RequestInfo{
|
result, meta, err := c.Get(context.Background(), "t", TestRequest(t, RequestInfo{
|
||||||
Key: "hello",
|
Key: "hello",
|
||||||
MaxAge: 1 * time.Millisecond,
|
MaxAge: 1 * time.Millisecond,
|
||||||
}))
|
}))
|
||||||
|
@ -1150,14 +1172,14 @@ func TestCacheGet_nonBlockingType(t *testing.T) {
|
||||||
|
|
||||||
// Get, should fetch
|
// Get, should fetch
|
||||||
req := TestRequest(t, RequestInfo{Key: "hello"})
|
req := TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, meta, err := c.Get("t", req)
|
result, meta, err := c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Get, should not fetch since we have a cached value
|
// Get, should not fetch since we have a cached value
|
||||||
req = TestRequest(t, RequestInfo{Key: "hello"})
|
req = TestRequest(t, RequestInfo{Key: "hello"})
|
||||||
result, meta, err = c.Get("t", req)
|
result, meta, err = c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
require.True(meta.Hit)
|
require.True(meta.Hit)
|
||||||
|
@ -1171,7 +1193,7 @@ func TestCacheGet_nonBlockingType(t *testing.T) {
|
||||||
MinIndex: 1,
|
MinIndex: 1,
|
||||||
Timeout: 10 * time.Minute,
|
Timeout: 10 * time.Minute,
|
||||||
})
|
})
|
||||||
result, meta, err = c.Get("t", req)
|
result, meta, err = c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(42, result)
|
require.Equal(42, result)
|
||||||
require.True(meta.Hit)
|
require.True(meta.Hit)
|
||||||
|
@ -1180,14 +1202,14 @@ func TestCacheGet_nonBlockingType(t *testing.T) {
|
||||||
|
|
||||||
// Get with a max age should fetch again
|
// Get with a max age should fetch again
|
||||||
req = TestRequest(t, RequestInfo{Key: "hello", MaxAge: 5 * time.Millisecond})
|
req = TestRequest(t, RequestInfo{Key: "hello", MaxAge: 5 * time.Millisecond})
|
||||||
result, meta, err = c.Get("t", req)
|
result, meta, err = c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(43, result)
|
require.Equal(43, result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
|
||||||
// Get with a must revalidate should fetch again even without a delay.
|
// Get with a must revalidate should fetch again even without a delay.
|
||||||
req = TestRequest(t, RequestInfo{Key: "hello", MustRevalidate: true})
|
req = TestRequest(t, RequestInfo{Key: "hello", MustRevalidate: true})
|
||||||
result, meta, err = c.Get("t", req)
|
result, meta, err = c.Get(context.Background(), "t", req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(43, result)
|
require.Equal(43, result)
|
||||||
require.False(meta.Hit)
|
require.False(meta.Hit)
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"reflect"
|
"reflect"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -21,7 +22,7 @@ func TestCache(t testing.T) *Cache {
|
||||||
func TestCacheGetCh(t testing.T, c *Cache, typ string, r Request) <-chan interface{} {
|
func TestCacheGetCh(t testing.T, c *Cache, typ string, r Request) <-chan interface{} {
|
||||||
resultCh := make(chan interface{})
|
resultCh := make(chan interface{})
|
||||||
go func() {
|
go func() {
|
||||||
result, _, err := c.Get(typ, r)
|
result, _, err := c.Get(context.Background(), typ, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Logf("Error: %s", err)
|
t.Logf("Error: %s", err)
|
||||||
close(resultCh)
|
close(resultCh)
|
||||||
|
|
|
@ -91,7 +91,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, r getOptions, correlati
|
||||||
|
|
||||||
// Blocking request
|
// Blocking request
|
||||||
r.Info.MinIndex = index
|
r.Info.MinIndex = index
|
||||||
res, meta, err := c.getWithIndex(r)
|
res, meta, err := c.getWithIndex(ctx, r)
|
||||||
|
|
||||||
// Check context hasn't been canceled
|
// Check context hasn't been canceled
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
|
@ -151,7 +151,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, r getOptions, correlatio
|
||||||
|
|
||||||
// Make the request
|
// Make the request
|
||||||
r.Info.MinIndex = index
|
r.Info.MinIndex = index
|
||||||
res, meta, err := c.getWithIndex(r)
|
res, meta, err := c.getWithIndex(ctx, r)
|
||||||
|
|
||||||
// Check context hasn't been canceled
|
// Check context hasn't been canceled
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
|
|
|
@ -85,7 +85,7 @@ func (s *HTTPServer) CatalogDatacenters(resp http.ResponseWriter, req *http.Requ
|
||||||
var out []string
|
var out []string
|
||||||
|
|
||||||
if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache {
|
if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache {
|
||||||
raw, m, err := s.agent.cache.Get(cachetype.CatalogDatacentersName, &args)
|
raw, m, err := s.agent.cache.Get(req.Context(), cachetype.CatalogDatacentersName, &args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_datacenters"}, 1,
|
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_datacenters"}, 1,
|
||||||
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
||||||
|
@ -167,7 +167,7 @@ func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request
|
||||||
defer setMeta(resp, &out.QueryMeta)
|
defer setMeta(resp, &out.QueryMeta)
|
||||||
|
|
||||||
if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache {
|
if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache {
|
||||||
raw, m, err := s.agent.cache.Get(cachetype.CatalogListServicesName, &args)
|
raw, m, err := s.agent.cache.Get(req.Context(), cachetype.CatalogListServicesName, &args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_services"}, 1,
|
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_services"}, 1,
|
||||||
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
||||||
|
@ -256,7 +256,7 @@ func (s *HTTPServer) catalogServiceNodes(resp http.ResponseWriter, req *http.Req
|
||||||
defer setMeta(resp, &out.QueryMeta)
|
defer setMeta(resp, &out.QueryMeta)
|
||||||
|
|
||||||
if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache {
|
if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache {
|
||||||
raw, m, err := s.agent.cache.Get(cachetype.CatalogServicesName, &args)
|
raw, m, err := s.agent.cache.Get(req.Context(), cachetype.CatalogServicesName, &args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_service_nodes"}, 1,
|
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_service_nodes"}, 1,
|
||||||
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package agent
|
package agent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
|
@ -83,7 +84,7 @@ func (a *Agent) ConnectAuthorize(token string,
|
||||||
QueryOptions: structs.QueryOptions{Token: token},
|
QueryOptions: structs.QueryOptions{Token: token},
|
||||||
}
|
}
|
||||||
|
|
||||||
raw, meta, err := a.cache.Get(cachetype.IntentionMatchName, args)
|
raw, meta, err := a.cache.Get(context.TODO(), cachetype.IntentionMatchName, args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return returnErr(err)
|
return returnErr(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,7 +60,7 @@ func (s *HTTPServer) DiscoveryChainRead(resp http.ResponseWriter, req *http.Requ
|
||||||
defer setMeta(resp, &out.QueryMeta)
|
defer setMeta(resp, &out.QueryMeta)
|
||||||
|
|
||||||
if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache {
|
if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache {
|
||||||
raw, m, err := s.agent.cache.Get(cachetype.CompiledDiscoveryChainName, &args)
|
raw, m, err := s.agent.cache.Get(req.Context(), cachetype.CompiledDiscoveryChainName, &args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package agent
|
package agent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
@ -876,7 +877,7 @@ func (d *DNSServer) lookupNode(cfg *dnsConfig, args *structs.NodeSpecificRequest
|
||||||
useCache := cfg.UseCache
|
useCache := cfg.UseCache
|
||||||
RPC:
|
RPC:
|
||||||
if useCache {
|
if useCache {
|
||||||
raw, _, err := d.agent.cache.Get(cachetype.NodeServicesName, args)
|
raw, _, err := d.agent.cache.Get(context.TODO(), cachetype.NodeServicesName, args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -1154,7 +1155,7 @@ func (d *DNSServer) lookupServiceNodes(cfg *dnsConfig, lookup serviceLookup) (st
|
||||||
var out structs.IndexedCheckServiceNodes
|
var out structs.IndexedCheckServiceNodes
|
||||||
|
|
||||||
if cfg.UseCache {
|
if cfg.UseCache {
|
||||||
raw, m, err := d.agent.cache.Get(cachetype.HealthServicesName, &args)
|
raw, m, err := d.agent.cache.Get(context.TODO(), cachetype.HealthServicesName, &args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return out, err
|
return out, err
|
||||||
}
|
}
|
||||||
|
@ -1360,7 +1361,7 @@ func (d *DNSServer) lookupPreparedQuery(cfg *dnsConfig, args structs.PreparedQue
|
||||||
|
|
||||||
RPC:
|
RPC:
|
||||||
if cfg.UseCache {
|
if cfg.UseCache {
|
||||||
raw, m, err := d.agent.cache.Get(cachetype.PreparedQueryName, &args)
|
raw, m, err := d.agent.cache.Get(context.TODO(), cachetype.PreparedQueryName, &args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -225,7 +225,7 @@ func (s *HTTPServer) healthServiceNodes(resp http.ResponseWriter, req *http.Requ
|
||||||
defer setMeta(resp, &out.QueryMeta)
|
defer setMeta(resp, &out.QueryMeta)
|
||||||
|
|
||||||
if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache {
|
if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache {
|
||||||
raw, m, err := s.agent.cache.Get(cachetype.HealthServicesName, &args)
|
raw, m, err := s.agent.cache.Get(req.Context(), cachetype.HealthServicesName, &args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -122,7 +122,7 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r
|
||||||
defer setMeta(resp, &reply.QueryMeta)
|
defer setMeta(resp, &reply.QueryMeta)
|
||||||
|
|
||||||
if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache {
|
if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache {
|
||||||
raw, m, err := s.agent.cache.Get(cachetype.PreparedQueryName, &args)
|
raw, m, err := s.agent.cache.Get(req.Context(), cachetype.PreparedQueryName, &args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Don't return error if StaleIfError is set and we are within it and had
|
// Don't return error if StaleIfError is set and we are within it and had
|
||||||
// a cached value.
|
// a cached value.
|
||||||
|
|
|
@ -252,7 +252,7 @@ func (w *serviceConfigWatch) RegisterAndStart(
|
||||||
// operation. Either way the watcher will end up with something flagged
|
// operation. Either way the watcher will end up with something flagged
|
||||||
// as defaults even if they don't actually reflect actual defaults.
|
// as defaults even if they don't actually reflect actual defaults.
|
||||||
if waitForCentralConfig {
|
if waitForCentralConfig {
|
||||||
if err := w.fetchDefaults(); err != nil {
|
if err := w.fetchDefaults(ctx); err != nil {
|
||||||
return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", service.ID, err)
|
return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", service.ID, err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -290,10 +290,10 @@ func (w *serviceConfigWatch) RegisterAndStart(
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: this is called while holding the Agent.stateLock
|
// NOTE: this is called while holding the Agent.stateLock
|
||||||
func (w *serviceConfigWatch) fetchDefaults() error {
|
func (w *serviceConfigWatch) fetchDefaults(ctx context.Context) error {
|
||||||
req := makeConfigRequest(w.agent, w.registration)
|
req := makeConfigRequest(w.agent, w.registration)
|
||||||
|
|
||||||
raw, _, err := w.agent.cache.Get(cachetype.ResolvedServiceConfigName, req)
|
raw, _, err := w.agent.cache.Get(ctx, cachetype.ResolvedServiceConfigName, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue