From 8837907de4e62555c20672d6b4fd89b3bd6ecc9a Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Mon, 15 Jun 2020 11:01:25 -0400 Subject: [PATCH] Make the Agent Cache more Context aware (#8092) Blocking queries issues will still be uncancellable (that cannot be helped until we get rid of net/rpc). However this makes it so that if calling getWithIndex (like during a cache Notify go routine) we can cancell the outer routine. Previously it would keep issuing more blocking queries until the result state actually changed. --- agent/agent_endpoint.go | 4 +- agent/cache-types/connect_ca_leaf.go | 4 +- agent/cache/cache.go | 9 ++- agent/cache/cache_test.go | 90 +++++++++++++++++----------- agent/cache/testing.go | 3 +- agent/cache/watch.go | 4 +- agent/catalog_endpoint.go | 6 +- agent/connect_auth.go | 3 +- agent/discovery_chain_endpoint.go | 2 +- agent/dns.go | 7 ++- agent/health_endpoint.go | 2 +- agent/prepared_query_endpoint.go | 2 +- agent/service_manager.go | 6 +- 13 files changed, 86 insertions(+), 56 deletions(-) diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 6500aa1bd5..c383cad25e 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -1317,7 +1317,7 @@ func (s *HTTPServer) AgentConnectCARoots(resp http.ResponseWriter, req *http.Req return nil, nil } - raw, m, err := s.agent.cache.Get(cachetype.ConnectCARootName, &args) + raw, m, err := s.agent.cache.Get(req.Context(), cachetype.ConnectCARootName, &args) if err != nil { return nil, err } @@ -1359,7 +1359,7 @@ func (s *HTTPServer) AgentConnectCALeafCert(resp http.ResponseWriter, req *http. args.MaxQueryTime = qOpts.MaxQueryTime args.Token = qOpts.Token - raw, m, err := s.agent.cache.Get(cachetype.ConnectCALeafName, &args) + raw, m, err := s.agent.cache.Get(req.Context(), cachetype.ConnectCALeafName, &args) if err != nil { return nil, err } diff --git a/agent/cache-types/connect_ca_leaf.go b/agent/cache-types/connect_ca_leaf.go index b8b6575512..afda5a7822 100644 --- a/agent/cache-types/connect_ca_leaf.go +++ b/agent/cache-types/connect_ca_leaf.go @@ -469,7 +469,9 @@ func activeRootHasKey(roots *structs.IndexedCARoots, currentSigningKeyID string) } func (c *ConnectCALeaf) rootsFromCache() (*structs.IndexedCARoots, error) { - rawRoots, _, err := c.Cache.Get(ConnectCARootName, &structs.DCSpecificRequest{ + // Background is fine here because this isn't a blocking query as no index is set. + // Therefore this will just either be a cache hit or return once the non-blocking query returns. + rawRoots, _, err := c.Cache.Get(context.Background(), ConnectCARootName, &structs.DCSpecificRequest{ Datacenter: c.Datacenter, }) if err != nil { diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 6f9aec9126..9b8845b649 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -16,6 +16,7 @@ package cache import ( "container/heap" + "context" "fmt" "sync" "sync/atomic" @@ -216,7 +217,7 @@ func (c *Cache) RegisterType(n string, typ Type) { // index is retrieved, the last known value (maybe nil) is returned. No // error is returned on timeout. This matches the behavior of Consul blocking // queries. -func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) { +func (c *Cache) Get(ctx context.Context, t string, r Request) (interface{}, ResultMeta, error) { c.typesLock.RLock() tEntry, ok := c.types[t] c.typesLock.RUnlock() @@ -225,7 +226,7 @@ func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) { // once. But be robust against panics. return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t) } - return c.getWithIndex(newGetOptions(tEntry, r)) + return c.getWithIndex(ctx, newGetOptions(tEntry, r)) } // getOptions contains the arguments for a Get request. It is used in place of @@ -292,7 +293,7 @@ func entryExceedsMaxAge(maxAge time.Duration, entry cacheEntry) bool { // getWithIndex implements the main Get functionality but allows internal // callers (Watch) to manipulate the blocking index separately from the actual // request object. -func (c *Cache) getWithIndex(r getOptions) (interface{}, ResultMeta, error) { +func (c *Cache) getWithIndex(ctx context.Context, r getOptions) (interface{}, ResultMeta, error) { if r.Info.Key == "" { metrics.IncrCounter([]string{"consul", "cache", "bypass"}, 1) @@ -394,6 +395,8 @@ RETRY_GET: first = false select { + case <-ctx.Done(): + return nil, ResultMeta{}, ctx.Err() case <-waiterCh: // Our fetch returned, retry the get from the cache. r.Info.MustRevalidate = false diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index 32cc2dc417..5036a12ca5 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -1,6 +1,7 @@ package cache import ( + "context" "errors" "fmt" "sort" @@ -9,6 +10,7 @@ import ( "testing" "time" + "github.com/hashicorp/consul/sdk/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -30,13 +32,13 @@ func TestCacheGet_noIndex(t *testing.T) { // Get, should fetch req := TestRequest(t, RequestInfo{Key: "hello"}) - result, meta, err := c.Get("t", req) + result, meta, err := c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(42, result) require.False(meta.Hit) // Get, should not fetch since we already have a satisfying value - result, meta, err = c.Get("t", req) + result, meta, err = c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(42, result) require.True(meta.Hit) @@ -64,13 +66,13 @@ func TestCacheGet_initError(t *testing.T) { // Get, should fetch req := TestRequest(t, RequestInfo{Key: "hello"}) - result, meta, err := c.Get("t", req) + result, meta, err := c.Get(context.Background(), "t", req) require.Error(err) require.Nil(result) require.False(meta.Hit) // Get, should fetch again since our last fetch was an error - result, meta, err = c.Get("t", req) + result, meta, err = c.Get(context.Background(), "t", req) require.Error(err) require.Nil(result) require.False(meta.Hit) @@ -104,13 +106,13 @@ func TestCacheGet_cachedErrorsDontStick(t *testing.T) { // Get, should fetch and get error req := TestRequest(t, RequestInfo{Key: "hello"}) - result, meta, err := c.Get("t", req) + result, meta, err := c.Get(context.Background(), "t", req) require.Error(err) require.Nil(result) require.False(meta.Hit) // Get, should fetch again since our last fetch was an error, but get success - result, meta, err = c.Get("t", req) + result, meta, err = c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(42, result) require.False(meta.Hit) @@ -159,13 +161,13 @@ func TestCacheGet_blankCacheKey(t *testing.T) { // Get, should fetch req := TestRequest(t, RequestInfo{Key: ""}) - result, meta, err := c.Get("t", req) + result, meta, err := c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(42, result) require.False(meta.Hit) // Get, should not fetch since we already have a satisfying value - result, meta, err = c.Get("t", req) + result, meta, err = c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(42, result) require.False(meta.Hit) @@ -296,6 +298,26 @@ func TestCacheGet_blockingIndex(t *testing.T) { TestCacheGetChResult(t, resultCh, 42) } +func TestCacheGet_cancellation(t *testing.T) { + typ := TestType(t) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ) + + typ.Static(FetchResult{Value: 1, Index: 4}, nil).Times(0).WaitUntil(time.After(1 * time.Millisecond)) + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(50*time.Millisecond)) + // this is just to keep the linter happy + defer cancel() + + result, _, err := c.Get(ctx, "t", TestRequest(t, RequestInfo{ + Key: "hello", MinIndex: 5})) + + require.Nil(t, result) + require.Error(t, err) + testutil.RequireErrorContains(t, err, context.DeadlineExceeded.Error()) +} + // Test a get with an index set will timeout if the fetch doesn't return // anything. func TestCacheGet_blockingIndexTimeout(t *testing.T) { @@ -393,7 +415,7 @@ func TestCacheGet_emptyFetchResult(t *testing.T) { // Get, should fetch req := TestRequest(t, RequestInfo{Key: "hello"}) - result, meta, err := c.Get("t", req) + result, meta, err := c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(42, result) require.False(meta.Hit) @@ -401,7 +423,7 @@ func TestCacheGet_emptyFetchResult(t *testing.T) { // Get, should not fetch since we already have a satisfying value req = TestRequest(t, RequestInfo{ Key: "hello", MinIndex: 1, Timeout: 100 * time.Millisecond}) - result, meta, err = c.Get("t", req) + result, meta, err = c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(42, result) require.False(meta.Hit) @@ -418,7 +440,7 @@ func TestCacheGet_emptyFetchResult(t *testing.T) { // returns nil and so the previous result is used. req = TestRequest(t, RequestInfo{ Key: "hello", MinIndex: 1, Timeout: 100 * time.Millisecond}) - result, meta, err = c.Get("t", req) + result, meta, err = c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(42, result) require.False(meta.Hit) @@ -698,7 +720,7 @@ func TestCacheGet_fetchTimeout(t *testing.T) { // Get, should fetch req := TestRequest(t, RequestInfo{Key: "hello"}) - result, meta, err := c.Get("t", req) + result, meta, err := c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(42, result) require.False(meta.Hit) @@ -728,7 +750,7 @@ func TestCacheGet_expire(t *testing.T) { // Get, should fetch req := TestRequest(t, RequestInfo{Key: "hello"}) - result, meta, err := c.Get("t", req) + result, meta, err := c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(42, result) require.False(meta.Hit) @@ -741,7 +763,7 @@ func TestCacheGet_expire(t *testing.T) { // Get, should not fetch, verified via the mock assertions above req = TestRequest(t, RequestInfo{Key: "hello"}) - result, meta, err = c.Get("t", req) + result, meta, err = c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(42, result) require.True(meta.Hit) @@ -752,7 +774,7 @@ func TestCacheGet_expire(t *testing.T) { // Get, should fetch req = TestRequest(t, RequestInfo{Key: "hello"}) - result, meta, err = c.Get("t", req) + result, meta, err = c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(42, result) require.False(meta.Hit) @@ -784,7 +806,7 @@ func TestCacheGet_expireResetGet(t *testing.T) { // Get, should fetch req := TestRequest(t, RequestInfo{Key: "hello"}) - result, meta, err := c.Get("t", req) + result, meta, err := c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(42, result) require.False(meta.Hit) @@ -797,7 +819,7 @@ func TestCacheGet_expireResetGet(t *testing.T) { // Get, should not fetch req = TestRequest(t, RequestInfo{Key: "hello"}) - result, meta, err = c.Get("t", req) + result, meta, err = c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(42, result) require.True(meta.Hit) @@ -807,7 +829,7 @@ func TestCacheGet_expireResetGet(t *testing.T) { // Get, should fetch req = TestRequest(t, RequestInfo{Key: "hello"}) - result, meta, err = c.Get("t", req) + result, meta, err = c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(42, result) require.False(meta.Hit) @@ -840,21 +862,21 @@ func TestCacheGet_duplicateKeyDifferentType(t *testing.T) { // Get, should fetch req := TestRequest(t, RequestInfo{Key: "foo"}) - result, meta, err := c.Get("t", req) + result, meta, err := c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(100, result) require.False(meta.Hit) // Get from t2 with same key, should fetch req = TestRequest(t, RequestInfo{Key: "foo"}) - result, meta, err = c.Get("t2", req) + result, meta, err = c.Get(context.Background(), "t2", req) require.NoError(err) require.Equal(200, result) require.False(meta.Hit) // Get from t again with same key, should cache req = TestRequest(t, RequestInfo{Key: "foo"}) - result, meta, err = c.Get("t", req) + result, meta, err = c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(100, result) require.True(meta.Hit) @@ -974,7 +996,7 @@ func TestCacheGet_refreshAge(t *testing.T) { time.Sleep(2 * time.Millisecond) // Fetch again, non-blocking - result, meta, err := c.Get("t", TestRequest(t, RequestInfo{Key: "hello"})) + result, meta, err := c.Get(context.Background(), "t", TestRequest(t, RequestInfo{Key: "hello"})) require.NoError(err) require.Equal(8, result) require.True(meta.Hit) @@ -994,7 +1016,7 @@ func TestCacheGet_refreshAge(t *testing.T) { var lastAge time.Duration { - result, meta, err := c.Get("t", TestRequest(t, RequestInfo{Key: "hello"})) + result, meta, err := c.Get(context.Background(), "t", TestRequest(t, RequestInfo{Key: "hello"})) require.NoError(err) require.Equal(8, result) require.True(meta.Hit) @@ -1005,7 +1027,7 @@ func TestCacheGet_refreshAge(t *testing.T) { // Wait a bit longer - age should increase by at least this much time.Sleep(5 * time.Millisecond) { - result, meta, err := c.Get("t", TestRequest(t, RequestInfo{Key: "hello"})) + result, meta, err := c.Get(context.Background(), "t", TestRequest(t, RequestInfo{Key: "hello"})) require.NoError(err) require.Equal(8, result) require.True(meta.Hit) @@ -1027,7 +1049,7 @@ func TestCacheGet_refreshAge(t *testing.T) { // the test thread got down here relative to the failures. for attempts := 0; attempts < 50; attempts++ { time.Sleep(100 * time.Millisecond) - result, meta, err := c.Get("t", TestRequest(t, RequestInfo{Key: "hello"})) + result, meta, err := c.Get(context.Background(), "t", TestRequest(t, RequestInfo{Key: "hello"})) // Should never error even if background is failing as we have cached value require.NoError(err) require.True(meta.Hit) @@ -1080,7 +1102,7 @@ func TestCacheGet_nonRefreshAge(t *testing.T) { time.Sleep(5 * time.Millisecond) // Fetch again, non-blocking - result, meta, err := c.Get("t", TestRequest(t, RequestInfo{Key: "hello"})) + result, meta, err := c.Get(context.Background(), "t", TestRequest(t, RequestInfo{Key: "hello"})) require.NoError(err) require.Equal(8, result) require.True(meta.Hit) @@ -1092,7 +1114,7 @@ func TestCacheGet_nonRefreshAge(t *testing.T) { time.Sleep(200 * time.Millisecond) { - result, meta, err := c.Get("t", TestRequest(t, RequestInfo{Key: "hello"})) + result, meta, err := c.Get(context.Background(), "t", TestRequest(t, RequestInfo{Key: "hello"})) require.NoError(err) require.Equal(8, result) require.False(meta.Hit) @@ -1108,7 +1130,7 @@ func TestCacheGet_nonRefreshAge(t *testing.T) { time.Sleep(5 * time.Millisecond) // Fetch again, non-blocking - result, meta, err := c.Get("t", TestRequest(t, RequestInfo{Key: "hello"})) + result, meta, err := c.Get(context.Background(), "t", TestRequest(t, RequestInfo{Key: "hello"})) require.NoError(err) require.Equal(8, result) require.True(meta.Hit) @@ -1118,7 +1140,7 @@ func TestCacheGet_nonRefreshAge(t *testing.T) { // Now verify that setting MaxAge results in cache invalidation { - result, meta, err := c.Get("t", TestRequest(t, RequestInfo{ + result, meta, err := c.Get(context.Background(), "t", TestRequest(t, RequestInfo{ Key: "hello", MaxAge: 1 * time.Millisecond, })) @@ -1150,14 +1172,14 @@ func TestCacheGet_nonBlockingType(t *testing.T) { // Get, should fetch req := TestRequest(t, RequestInfo{Key: "hello"}) - result, meta, err := c.Get("t", req) + result, meta, err := c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(42, result) require.False(meta.Hit) // Get, should not fetch since we have a cached value req = TestRequest(t, RequestInfo{Key: "hello"}) - result, meta, err = c.Get("t", req) + result, meta, err = c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(42, result) require.True(meta.Hit) @@ -1171,7 +1193,7 @@ func TestCacheGet_nonBlockingType(t *testing.T) { MinIndex: 1, Timeout: 10 * time.Minute, }) - result, meta, err = c.Get("t", req) + result, meta, err = c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(42, result) require.True(meta.Hit) @@ -1180,14 +1202,14 @@ func TestCacheGet_nonBlockingType(t *testing.T) { // Get with a max age should fetch again req = TestRequest(t, RequestInfo{Key: "hello", MaxAge: 5 * time.Millisecond}) - result, meta, err = c.Get("t", req) + result, meta, err = c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(43, result) require.False(meta.Hit) // Get with a must revalidate should fetch again even without a delay. req = TestRequest(t, RequestInfo{Key: "hello", MustRevalidate: true}) - result, meta, err = c.Get("t", req) + result, meta, err = c.Get(context.Background(), "t", req) require.NoError(err) require.Equal(43, result) require.False(meta.Hit) diff --git a/agent/cache/testing.go b/agent/cache/testing.go index 47180dc3a6..edce5473bc 100644 --- a/agent/cache/testing.go +++ b/agent/cache/testing.go @@ -1,6 +1,7 @@ package cache import ( + "context" "reflect" "time" @@ -21,7 +22,7 @@ func TestCache(t testing.T) *Cache { func TestCacheGetCh(t testing.T, c *Cache, typ string, r Request) <-chan interface{} { resultCh := make(chan interface{}) go func() { - result, _, err := c.Get(typ, r) + result, _, err := c.Get(context.Background(), typ, r) if err != nil { t.Logf("Error: %s", err) close(resultCh) diff --git a/agent/cache/watch.go b/agent/cache/watch.go index baf2759f93..1fa228e39a 100644 --- a/agent/cache/watch.go +++ b/agent/cache/watch.go @@ -91,7 +91,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, r getOptions, correlati // Blocking request r.Info.MinIndex = index - res, meta, err := c.getWithIndex(r) + res, meta, err := c.getWithIndex(ctx, r) // Check context hasn't been canceled if ctx.Err() != nil { @@ -151,7 +151,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, r getOptions, correlatio // Make the request r.Info.MinIndex = index - res, meta, err := c.getWithIndex(r) + res, meta, err := c.getWithIndex(ctx, r) // Check context hasn't been canceled if ctx.Err() != nil { diff --git a/agent/catalog_endpoint.go b/agent/catalog_endpoint.go index 34464a24dc..153c721089 100644 --- a/agent/catalog_endpoint.go +++ b/agent/catalog_endpoint.go @@ -85,7 +85,7 @@ func (s *HTTPServer) CatalogDatacenters(resp http.ResponseWriter, req *http.Requ var out []string if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache { - raw, m, err := s.agent.cache.Get(cachetype.CatalogDatacentersName, &args) + raw, m, err := s.agent.cache.Get(req.Context(), cachetype.CatalogDatacentersName, &args) if err != nil { metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_datacenters"}, 1, []metrics.Label{{Name: "node", Value: s.nodeName()}}) @@ -167,7 +167,7 @@ func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request defer setMeta(resp, &out.QueryMeta) if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache { - raw, m, err := s.agent.cache.Get(cachetype.CatalogListServicesName, &args) + raw, m, err := s.agent.cache.Get(req.Context(), cachetype.CatalogListServicesName, &args) if err != nil { metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_services"}, 1, []metrics.Label{{Name: "node", Value: s.nodeName()}}) @@ -256,7 +256,7 @@ func (s *HTTPServer) catalogServiceNodes(resp http.ResponseWriter, req *http.Req defer setMeta(resp, &out.QueryMeta) if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache { - raw, m, err := s.agent.cache.Get(cachetype.CatalogServicesName, &args) + raw, m, err := s.agent.cache.Get(req.Context(), cachetype.CatalogServicesName, &args) if err != nil { metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_service_nodes"}, 1, []metrics.Label{{Name: "node", Value: s.nodeName()}}) diff --git a/agent/connect_auth.go b/agent/connect_auth.go index 43ee70c5ed..da3f05fa73 100644 --- a/agent/connect_auth.go +++ b/agent/connect_auth.go @@ -1,6 +1,7 @@ package agent import ( + "context" "fmt" "github.com/hashicorp/consul/acl" @@ -83,7 +84,7 @@ func (a *Agent) ConnectAuthorize(token string, QueryOptions: structs.QueryOptions{Token: token}, } - raw, meta, err := a.cache.Get(cachetype.IntentionMatchName, args) + raw, meta, err := a.cache.Get(context.TODO(), cachetype.IntentionMatchName, args) if err != nil { return returnErr(err) } diff --git a/agent/discovery_chain_endpoint.go b/agent/discovery_chain_endpoint.go index f85f7364b1..d6878ba7bd 100644 --- a/agent/discovery_chain_endpoint.go +++ b/agent/discovery_chain_endpoint.go @@ -60,7 +60,7 @@ func (s *HTTPServer) DiscoveryChainRead(resp http.ResponseWriter, req *http.Requ defer setMeta(resp, &out.QueryMeta) if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache { - raw, m, err := s.agent.cache.Get(cachetype.CompiledDiscoveryChainName, &args) + raw, m, err := s.agent.cache.Get(req.Context(), cachetype.CompiledDiscoveryChainName, &args) if err != nil { return nil, err } diff --git a/agent/dns.go b/agent/dns.go index 314ffdef03..8d339ffc8a 100644 --- a/agent/dns.go +++ b/agent/dns.go @@ -1,6 +1,7 @@ package agent import ( + "context" "encoding/hex" "fmt" "net" @@ -876,7 +877,7 @@ func (d *DNSServer) lookupNode(cfg *dnsConfig, args *structs.NodeSpecificRequest useCache := cfg.UseCache RPC: if useCache { - raw, _, err := d.agent.cache.Get(cachetype.NodeServicesName, args) + raw, _, err := d.agent.cache.Get(context.TODO(), cachetype.NodeServicesName, args) if err != nil { return nil, err } @@ -1154,7 +1155,7 @@ func (d *DNSServer) lookupServiceNodes(cfg *dnsConfig, lookup serviceLookup) (st var out structs.IndexedCheckServiceNodes if cfg.UseCache { - raw, m, err := d.agent.cache.Get(cachetype.HealthServicesName, &args) + raw, m, err := d.agent.cache.Get(context.TODO(), cachetype.HealthServicesName, &args) if err != nil { return out, err } @@ -1360,7 +1361,7 @@ func (d *DNSServer) lookupPreparedQuery(cfg *dnsConfig, args structs.PreparedQue RPC: if cfg.UseCache { - raw, m, err := d.agent.cache.Get(cachetype.PreparedQueryName, &args) + raw, m, err := d.agent.cache.Get(context.TODO(), cachetype.PreparedQueryName, &args) if err != nil { return nil, err } diff --git a/agent/health_endpoint.go b/agent/health_endpoint.go index e690ff6740..ac40dad09f 100644 --- a/agent/health_endpoint.go +++ b/agent/health_endpoint.go @@ -225,7 +225,7 @@ func (s *HTTPServer) healthServiceNodes(resp http.ResponseWriter, req *http.Requ defer setMeta(resp, &out.QueryMeta) if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache { - raw, m, err := s.agent.cache.Get(cachetype.HealthServicesName, &args) + raw, m, err := s.agent.cache.Get(req.Context(), cachetype.HealthServicesName, &args) if err != nil { return nil, err } diff --git a/agent/prepared_query_endpoint.go b/agent/prepared_query_endpoint.go index 9c238ea44c..5a33f89e7c 100644 --- a/agent/prepared_query_endpoint.go +++ b/agent/prepared_query_endpoint.go @@ -122,7 +122,7 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r defer setMeta(resp, &reply.QueryMeta) if s.agent.config.HTTPUseCache && args.QueryOptions.UseCache { - raw, m, err := s.agent.cache.Get(cachetype.PreparedQueryName, &args) + raw, m, err := s.agent.cache.Get(req.Context(), cachetype.PreparedQueryName, &args) if err != nil { // Don't return error if StaleIfError is set and we are within it and had // a cached value. diff --git a/agent/service_manager.go b/agent/service_manager.go index b693ec6f40..5a163f3596 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -252,7 +252,7 @@ func (w *serviceConfigWatch) RegisterAndStart( // operation. Either way the watcher will end up with something flagged // as defaults even if they don't actually reflect actual defaults. if waitForCentralConfig { - if err := w.fetchDefaults(); err != nil { + if err := w.fetchDefaults(ctx); err != nil { return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v", service.ID, err) } } else { @@ -290,10 +290,10 @@ func (w *serviceConfigWatch) RegisterAndStart( } // NOTE: this is called while holding the Agent.stateLock -func (w *serviceConfigWatch) fetchDefaults() error { +func (w *serviceConfigWatch) fetchDefaults(ctx context.Context) error { req := makeConfigRequest(w.agent, w.registration) - raw, _, err := w.agent.cache.Get(cachetype.ResolvedServiceConfigName, req) + raw, _, err := w.agent.cache.Get(ctx, cachetype.ResolvedServiceConfigName, req) if err != nil { return err }