From e494313e7b7c46f474514893f08836f0b9d70508 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Mon, 5 Apr 2021 13:23:00 -0500 Subject: [PATCH] api: ensure v1/health/ingress/:service endpoint works properly when streaming is enabled (#9967) The streaming cache type for service health has no way to handle v1/health/ingress/:service queries as there is no equivalent topic that would return the appropriate data. Ensure that attempts to use this endpoint will use the old cache-type for now so that they return appropriate data when streaming is enabled. --- .changelog/9967.txt | 3 ++ agent/agent.go | 7 ++-- agent/health_endpoint.go | 2 +- agent/health_endpoint_test.go | 71 ++++++++++++++++++++++++-------- agent/rpcclient/health/health.go | 16 ++++++- 5 files changed, 76 insertions(+), 23 deletions(-) create mode 100644 .changelog/9967.txt diff --git a/.changelog/9967.txt b/.changelog/9967.txt new file mode 100644 index 0000000000..3db1a4e20b --- /dev/null +++ b/.changelog/9967.txt @@ -0,0 +1,3 @@ +```release-note:bug +api: ensure v1/health/ingress/:service endpoint works properly when streaming is enabled +``` diff --git a/agent/agent.go b/agent/agent.go index 6c811143ff..65f8e4643b 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -374,9 +374,10 @@ func New(bd BaseDeps) (*Agent, error) { cacheName = cachetype.StreamingHealthServicesName } a.rpcClientHealth = &health.Client{ - Cache: bd.Cache, - NetRPC: &a, - CacheName: cacheName, + Cache: bd.Cache, + NetRPC: &a, + CacheName: cacheName, + CacheNameIngress: cachetype.HealthServicesName, } a.serviceManager = NewServiceManager(&a) diff --git a/agent/health_endpoint.go b/agent/health_endpoint.go index b2b7cb086c..cdfccd39f2 100644 --- a/agent/health_endpoint.go +++ b/agent/health_endpoint.go @@ -219,7 +219,7 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re return nil, nil } - useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0 + useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0 && !args.Ingress args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && (args.QueryOptions.UseCache || useStreaming) if args.QueryOptions.UseCache && useStreaming && args.Source.Node != "" { diff --git a/agent/health_endpoint_test.go b/agent/health_endpoint_test.go index d6fc0be964..b36c23fa43 100644 --- a/agent/health_endpoint_test.go +++ b/agent/health_endpoint_test.go @@ -1418,13 +1418,20 @@ func TestHealthConnectServiceNodes(t *testing.T) { } func TestHealthIngressServiceNodes(t *testing.T) { + t.Run("no streaming", func(t *testing.T) { + testHealthIngressServiceNodes(t, ` rpc { enable_streaming = false } use_streaming_backend = false `) + }) + t.Run("cache with streaming", func(t *testing.T) { + testHealthIngressServiceNodes(t, ` rpc { enable_streaming = true } use_streaming_backend = true `) + }) +} + +func testHealthIngressServiceNodes(t *testing.T, agentHCL string) { if testing.Short() { t.Skip("too slow for testing.Short") } - t.Parallel() - - a := NewTestAgent(t, "") + a := NewTestAgent(t, agentHCL) defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") @@ -1461,34 +1468,64 @@ func TestHealthIngressServiceNodes(t *testing.T) { require.Nil(t, a.RPC("ConfigEntry.Apply", req, &outB)) require.True(t, outB) - t.Run("associated service", func(t *testing.T) { - assert := assert.New(t) - req, _ := http.NewRequest("GET", fmt.Sprintf( - "/v1/health/ingress/%s", args.Service.Service), nil) - resp := httptest.NewRecorder() - obj, err := a.srv.HealthIngressServiceNodes(resp, req) - assert.Nil(err) - assertIndex(t, resp) - + checkResults := func(t *testing.T, obj interface{}) { nodes := obj.(structs.CheckServiceNodes) require.Len(t, nodes, 1) require.Equal(t, structs.ServiceKindIngressGateway, nodes[0].Service.Kind) require.Equal(t, gatewayArgs.Service.Address, nodes[0].Service.Address) require.Equal(t, gatewayArgs.Service.Proxy, nodes[0].Service.Proxy) - }) + } - t.Run("non-associated service", func(t *testing.T) { - assert := assert.New(t) + require.True(t, t.Run("associated service", func(t *testing.T) { + req, _ := http.NewRequest("GET", fmt.Sprintf( + "/v1/health/ingress/%s", args.Service.Service), nil) + resp := httptest.NewRecorder() + obj, err := a.srv.HealthIngressServiceNodes(resp, req) + require.NoError(t, err) + assertIndex(t, resp) + + checkResults(t, obj) + })) + + require.True(t, t.Run("non-associated service", func(t *testing.T) { req, _ := http.NewRequest("GET", "/v1/health/connect/notexist", nil) resp := httptest.NewRecorder() obj, err := a.srv.HealthIngressServiceNodes(resp, req) - assert.Nil(err) + require.NoError(t, err) assertIndex(t, resp) nodes := obj.(structs.CheckServiceNodes) require.Len(t, nodes, 0) - }) + })) + + require.True(t, t.Run("test caching miss", func(t *testing.T) { + // List instances with cache enabled + req, _ := http.NewRequest("GET", fmt.Sprintf( + "/v1/health/ingress/%s?cached", args.Service.Service), nil) + resp := httptest.NewRecorder() + obj, err := a.srv.HealthIngressServiceNodes(resp, req) + require.NoError(t, err) + + checkResults(t, obj) + + // Should be a cache miss + require.Equal(t, "MISS", resp.Header().Get("X-Cache")) + })) + + require.True(t, t.Run("test caching hit", func(t *testing.T) { + // List instances with cache enabled + req, _ := http.NewRequest("GET", fmt.Sprintf( + "/v1/health/ingress/%s?cached", args.Service.Service), nil) + resp := httptest.NewRecorder() + obj, err := a.srv.HealthIngressServiceNodes(resp, req) + require.NoError(t, err) + + checkResults(t, obj) + + // Should be a cache HIT now! + require.Equal(t, "HIT", resp.Header().Get("X-Cache")) + })) } func TestHealthConnectServiceNodes_Filter(t *testing.T) { diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index 259c5b5f97..1ef29488ff 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -12,6 +12,9 @@ type Client struct { Cache CacheGetter // CacheName to use for service health. CacheName string + // CacheNameIngress is the name of the cache type to use for ingress + // service health. + CacheNameIngress string } type NetRPC interface { @@ -53,7 +56,12 @@ func (c *Client) getServiceNodes( return out, cache.ResultMeta{}, err } - raw, md, err := c.Cache.Get(ctx, c.CacheName, &req) + cacheName := c.CacheName + if req.Ingress { + cacheName = c.CacheNameIngress + } + + raw, md, err := c.Cache.Get(ctx, cacheName, &req) if err != nil { return out, md, err } @@ -72,5 +80,9 @@ func (c *Client) Notify( correlationID string, ch chan<- cache.UpdateEvent, ) error { - return c.Cache.Notify(ctx, c.CacheName, &req, correlationID, ch) + cacheName := c.CacheName + if req.Ingress { + cacheName = c.CacheNameIngress + } + return c.Cache.Notify(ctx, cacheName, &req, correlationID, ch) }