mirror of https://github.com/status-im/consul.git
api: ensure v1/health/ingress/:service endpoint works properly when streaming is enabled (#9967)
The streaming cache type for service health has no way to handle v1/health/ingress/:service queries as there is no equivalent topic that would return the appropriate data. Ensure that attempts to use this endpoint will use the old cache-type for now so that they return appropriate data when streaming is enabled.
This commit is contained in:
parent
63374bf4c1
commit
e494313e7b
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
api: ensure v1/health/ingress/:service endpoint works properly when streaming is enabled
|
||||||
|
```
|
|
@ -374,9 +374,10 @@ func New(bd BaseDeps) (*Agent, error) {
|
||||||
cacheName = cachetype.StreamingHealthServicesName
|
cacheName = cachetype.StreamingHealthServicesName
|
||||||
}
|
}
|
||||||
a.rpcClientHealth = &health.Client{
|
a.rpcClientHealth = &health.Client{
|
||||||
Cache: bd.Cache,
|
Cache: bd.Cache,
|
||||||
NetRPC: &a,
|
NetRPC: &a,
|
||||||
CacheName: cacheName,
|
CacheName: cacheName,
|
||||||
|
CacheNameIngress: cachetype.HealthServicesName,
|
||||||
}
|
}
|
||||||
|
|
||||||
a.serviceManager = NewServiceManager(&a)
|
a.serviceManager = NewServiceManager(&a)
|
||||||
|
|
|
@ -219,7 +219,7 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0
|
useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0 && !args.Ingress
|
||||||
args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && (args.QueryOptions.UseCache || useStreaming)
|
args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && (args.QueryOptions.UseCache || useStreaming)
|
||||||
|
|
||||||
if args.QueryOptions.UseCache && useStreaming && args.Source.Node != "" {
|
if args.QueryOptions.UseCache && useStreaming && args.Source.Node != "" {
|
||||||
|
|
|
@ -1418,13 +1418,20 @@ func TestHealthConnectServiceNodes(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHealthIngressServiceNodes(t *testing.T) {
|
func TestHealthIngressServiceNodes(t *testing.T) {
|
||||||
|
t.Run("no streaming", func(t *testing.T) {
|
||||||
|
testHealthIngressServiceNodes(t, ` rpc { enable_streaming = false } use_streaming_backend = false `)
|
||||||
|
})
|
||||||
|
t.Run("cache with streaming", func(t *testing.T) {
|
||||||
|
testHealthIngressServiceNodes(t, ` rpc { enable_streaming = true } use_streaming_backend = true `)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func testHealthIngressServiceNodes(t *testing.T, agentHCL string) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("too slow for testing.Short")
|
t.Skip("too slow for testing.Short")
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Parallel()
|
a := NewTestAgent(t, agentHCL)
|
||||||
|
|
||||||
a := NewTestAgent(t, "")
|
|
||||||
defer a.Shutdown()
|
defer a.Shutdown()
|
||||||
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
@ -1461,34 +1468,64 @@ func TestHealthIngressServiceNodes(t *testing.T) {
|
||||||
require.Nil(t, a.RPC("ConfigEntry.Apply", req, &outB))
|
require.Nil(t, a.RPC("ConfigEntry.Apply", req, &outB))
|
||||||
require.True(t, outB)
|
require.True(t, outB)
|
||||||
|
|
||||||
t.Run("associated service", func(t *testing.T) {
|
checkResults := func(t *testing.T, obj interface{}) {
|
||||||
assert := assert.New(t)
|
|
||||||
req, _ := http.NewRequest("GET", fmt.Sprintf(
|
|
||||||
"/v1/health/ingress/%s", args.Service.Service), nil)
|
|
||||||
resp := httptest.NewRecorder()
|
|
||||||
obj, err := a.srv.HealthIngressServiceNodes(resp, req)
|
|
||||||
assert.Nil(err)
|
|
||||||
assertIndex(t, resp)
|
|
||||||
|
|
||||||
nodes := obj.(structs.CheckServiceNodes)
|
nodes := obj.(structs.CheckServiceNodes)
|
||||||
require.Len(t, nodes, 1)
|
require.Len(t, nodes, 1)
|
||||||
require.Equal(t, structs.ServiceKindIngressGateway, nodes[0].Service.Kind)
|
require.Equal(t, structs.ServiceKindIngressGateway, nodes[0].Service.Kind)
|
||||||
require.Equal(t, gatewayArgs.Service.Address, nodes[0].Service.Address)
|
require.Equal(t, gatewayArgs.Service.Address, nodes[0].Service.Address)
|
||||||
require.Equal(t, gatewayArgs.Service.Proxy, nodes[0].Service.Proxy)
|
require.Equal(t, gatewayArgs.Service.Proxy, nodes[0].Service.Proxy)
|
||||||
})
|
}
|
||||||
|
|
||||||
t.Run("non-associated service", func(t *testing.T) {
|
require.True(t, t.Run("associated service", func(t *testing.T) {
|
||||||
assert := assert.New(t)
|
req, _ := http.NewRequest("GET", fmt.Sprintf(
|
||||||
|
"/v1/health/ingress/%s", args.Service.Service), nil)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := a.srv.HealthIngressServiceNodes(resp, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assertIndex(t, resp)
|
||||||
|
|
||||||
|
checkResults(t, obj)
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.True(t, t.Run("non-associated service", func(t *testing.T) {
|
||||||
req, _ := http.NewRequest("GET",
|
req, _ := http.NewRequest("GET",
|
||||||
"/v1/health/connect/notexist", nil)
|
"/v1/health/connect/notexist", nil)
|
||||||
resp := httptest.NewRecorder()
|
resp := httptest.NewRecorder()
|
||||||
obj, err := a.srv.HealthIngressServiceNodes(resp, req)
|
obj, err := a.srv.HealthIngressServiceNodes(resp, req)
|
||||||
assert.Nil(err)
|
require.NoError(t, err)
|
||||||
assertIndex(t, resp)
|
assertIndex(t, resp)
|
||||||
|
|
||||||
nodes := obj.(structs.CheckServiceNodes)
|
nodes := obj.(structs.CheckServiceNodes)
|
||||||
require.Len(t, nodes, 0)
|
require.Len(t, nodes, 0)
|
||||||
})
|
}))
|
||||||
|
|
||||||
|
require.True(t, t.Run("test caching miss", func(t *testing.T) {
|
||||||
|
// List instances with cache enabled
|
||||||
|
req, _ := http.NewRequest("GET", fmt.Sprintf(
|
||||||
|
"/v1/health/ingress/%s?cached", args.Service.Service), nil)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := a.srv.HealthIngressServiceNodes(resp, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
checkResults(t, obj)
|
||||||
|
|
||||||
|
// Should be a cache miss
|
||||||
|
require.Equal(t, "MISS", resp.Header().Get("X-Cache"))
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.True(t, t.Run("test caching hit", func(t *testing.T) {
|
||||||
|
// List instances with cache enabled
|
||||||
|
req, _ := http.NewRequest("GET", fmt.Sprintf(
|
||||||
|
"/v1/health/ingress/%s?cached", args.Service.Service), nil)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := a.srv.HealthIngressServiceNodes(resp, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
checkResults(t, obj)
|
||||||
|
|
||||||
|
// Should be a cache HIT now!
|
||||||
|
require.Equal(t, "HIT", resp.Header().Get("X-Cache"))
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHealthConnectServiceNodes_Filter(t *testing.T) {
|
func TestHealthConnectServiceNodes_Filter(t *testing.T) {
|
||||||
|
|
|
@ -12,6 +12,9 @@ type Client struct {
|
||||||
Cache CacheGetter
|
Cache CacheGetter
|
||||||
// CacheName to use for service health.
|
// CacheName to use for service health.
|
||||||
CacheName string
|
CacheName string
|
||||||
|
// CacheNameIngress is the name of the cache type to use for ingress
|
||||||
|
// service health.
|
||||||
|
CacheNameIngress string
|
||||||
}
|
}
|
||||||
|
|
||||||
type NetRPC interface {
|
type NetRPC interface {
|
||||||
|
@ -53,7 +56,12 @@ func (c *Client) getServiceNodes(
|
||||||
return out, cache.ResultMeta{}, err
|
return out, cache.ResultMeta{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
raw, md, err := c.Cache.Get(ctx, c.CacheName, &req)
|
cacheName := c.CacheName
|
||||||
|
if req.Ingress {
|
||||||
|
cacheName = c.CacheNameIngress
|
||||||
|
}
|
||||||
|
|
||||||
|
raw, md, err := c.Cache.Get(ctx, cacheName, &req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return out, md, err
|
return out, md, err
|
||||||
}
|
}
|
||||||
|
@ -72,5 +80,9 @@ func (c *Client) Notify(
|
||||||
correlationID string,
|
correlationID string,
|
||||||
ch chan<- cache.UpdateEvent,
|
ch chan<- cache.UpdateEvent,
|
||||||
) error {
|
) error {
|
||||||
return c.Cache.Notify(ctx, c.CacheName, &req, correlationID, ch)
|
cacheName := c.CacheName
|
||||||
|
if req.Ingress {
|
||||||
|
cacheName = c.CacheNameIngress
|
||||||
|
}
|
||||||
|
return c.Cache.Notify(ctx, cacheName, &req, correlationID, ch)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue