diff --git a/.changelog/9967.txt b/.changelog/9967.txt new file mode 100644 index 0000000000..3db1a4e20b --- /dev/null +++ b/.changelog/9967.txt @@ -0,0 +1,3 @@ +```release-note:bug +api: ensure v1/health/ingress/:service endpoint works properly when streaming is enabled +``` diff --git a/agent/agent.go b/agent/agent.go index 1d48cd85de..ef41f24747 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -377,6 +377,7 @@ func New(bd BaseDeps) (*Agent, error) { CacheName: cacheName, // Temporarily until streaming supports all connect events CacheNameConnect: cachetype.HealthServicesName, + CacheNameIngress: cachetype.HealthServicesName, } a.serviceManager = NewServiceManager(&a) diff --git a/agent/health_endpoint.go b/agent/health_endpoint.go index b2b7cb086c..cdfccd39f2 100644 --- a/agent/health_endpoint.go +++ b/agent/health_endpoint.go @@ -219,7 +219,7 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re return nil, nil } - useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0 + useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0 && !args.Ingress args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && (args.QueryOptions.UseCache || useStreaming) if args.QueryOptions.UseCache && useStreaming && args.Source.Node != "" { diff --git a/agent/health_endpoint_test.go b/agent/health_endpoint_test.go index 3429260fcb..9e37f0e685 100644 --- a/agent/health_endpoint_test.go +++ b/agent/health_endpoint_test.go @@ -1310,9 +1310,16 @@ func TestHealthConnectServiceNodes(t *testing.T) { } func TestHealthIngressServiceNodes(t *testing.T) { - t.Parallel() + t.Run("no streaming", func(t *testing.T) { + testHealthIngressServiceNodes(t, ` rpc { enable_streaming = false } use_streaming_backend = false `) + }) + t.Run("cache with streaming", func(t *testing.T) { + testHealthIngressServiceNodes(t, ` rpc { enable_streaming = true } use_streaming_backend = true `) + }) +} - a := NewTestAgent(t, "") +func testHealthIngressServiceNodes(t *testing.T, agentHCL string) { + a := NewTestAgent(t, agentHCL) defer a.Shutdown() testrpc.WaitForLeader(t, a.RPC, "dc1") @@ -1349,34 +1356,64 @@ func TestHealthIngressServiceNodes(t *testing.T) { require.Nil(t, a.RPC("ConfigEntry.Apply", req, &outB)) require.True(t, outB) - t.Run("associated service", func(t *testing.T) { - assert := assert.New(t) - req, _ := http.NewRequest("GET", fmt.Sprintf( - "/v1/health/ingress/%s", args.Service.Service), nil) - resp := httptest.NewRecorder() - obj, err := a.srv.HealthIngressServiceNodes(resp, req) - assert.Nil(err) - assertIndex(t, resp) - + checkResults := func(t *testing.T, obj interface{}) { nodes := obj.(structs.CheckServiceNodes) require.Len(t, nodes, 1) require.Equal(t, structs.ServiceKindIngressGateway, nodes[0].Service.Kind) require.Equal(t, gatewayArgs.Service.Address, nodes[0].Service.Address) require.Equal(t, gatewayArgs.Service.Proxy, nodes[0].Service.Proxy) - }) + } - t.Run("non-associated service", func(t *testing.T) { - assert := assert.New(t) + require.True(t, t.Run("associated service", func(t *testing.T) { + req, _ := http.NewRequest("GET", fmt.Sprintf( + "/v1/health/ingress/%s", args.Service.Service), nil) + resp := httptest.NewRecorder() + obj, err := a.srv.HealthIngressServiceNodes(resp, req) + require.NoError(t, err) + assertIndex(t, resp) + + checkResults(t, obj) + })) + + require.True(t, t.Run("non-associated service", func(t *testing.T) { req, _ := http.NewRequest("GET", "/v1/health/connect/notexist", nil) resp := httptest.NewRecorder() obj, err := a.srv.HealthIngressServiceNodes(resp, req) - assert.Nil(err) + require.NoError(t, err) assertIndex(t, resp) nodes := obj.(structs.CheckServiceNodes) require.Len(t, nodes, 0) - }) + })) + + require.True(t, t.Run("test caching miss", func(t *testing.T) { + // List instances with cache enabled + req, _ := http.NewRequest("GET", fmt.Sprintf( + "/v1/health/ingress/%s?cached", args.Service.Service), nil) + resp := httptest.NewRecorder() + obj, err := a.srv.HealthIngressServiceNodes(resp, req) + require.NoError(t, err) + + checkResults(t, obj) + + // Should be a cache miss + require.Equal(t, "MISS", resp.Header().Get("X-Cache")) + })) + + require.True(t, t.Run("test caching hit", func(t *testing.T) { + // List instances with cache enabled + req, _ := http.NewRequest("GET", fmt.Sprintf( + "/v1/health/ingress/%s?cached", args.Service.Service), nil) + resp := httptest.NewRecorder() + obj, err := a.srv.HealthIngressServiceNodes(resp, req) + require.NoError(t, err) + + checkResults(t, obj) + + // Should be a cache HIT now! + require.Equal(t, "HIT", resp.Header().Get("X-Cache")) + })) } func TestHealthConnectServiceNodes_Filter(t *testing.T) { diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index 0118a363cd..4674990f9b 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -14,6 +14,9 @@ type Client struct { CacheName string // CacheNameConnect is the name of the cache to use for connect service health. CacheNameConnect string + // CacheNameIngress is the name of the cache type to use for ingress + // service health. + CacheNameIngress string } type NetRPC interface { @@ -58,6 +61,9 @@ func (c *Client) getServiceNodes( if req.Connect { cacheName = c.CacheNameConnect } + if req.Ingress { + cacheName = c.CacheNameIngress + } raw, md, err := c.Cache.Get(ctx, cacheName, &req) if err != nil {