mirror of https://github.com/status-im/consul.git
health: ensure /v1/health/service/:service endpoint returns the most recent results when a filter is used with streaming (#12640)
The primary bug here is in the streaming subsystem that makes the overall v1/health/service/:service request behave incorrectly when servicing a blocking request with a filter provided. There is a secondary non-streaming bug being fixed here that is much less obvious related to when to update the `reply` variable in a `blockingQuery` evaluation. It is unlikely that it is triggerable in practical environments and I could not actually get the bug to manifest, but I fixed it anyway while investigating the original issue. Simple reproduction (streaming): 1. Register a service with a tag. curl -sL --request PUT 'http://localhost:8500/v1/agent/service/register' \ --header 'Content-Type: application/json' \ --data-raw '{ "ID": "ID1", "Name": "test", "Tags":[ "a" ], "EnableTagOverride": true }' 2. Do an initial filter query that matches on the tag. curl -sLi --get 'http://localhost:8500/v1/health/service/test' --data-urlencode 'filter=a in Service.Tags' 3. Note you get one result. Use the `X-Consul-Index` header to establish a blocking query in another terminal, this should not return yet. curl -sLi --get 'http://localhost:8500/v1/health/service/test?index=$INDEX' --data-urlencode 'filter=a in Service.Tags' 4. Re-register that service with a different tag. curl -sL --request PUT 'http://localhost:8500/v1/agent/service/register' \ --header 'Content-Type: application/json' \ --data-raw '{ "ID": "ID1", "Name": "test", "Tags":[ "b" ], "EnableTagOverride": true }' 5. Your blocking query from (3) should return with a header `X-Consul-Query-Backend: streaming` and empty results if it works correctly `[]`. Attempts to reproduce with non-streaming failed (where you add `&near=_agent` to the read queries and ensure `X-Consul-Query-Backend: blocking-query` shows up in the results).
This commit is contained in:
parent
1a491886fa
commit
11213ae180
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
health: ensure /v1/health/service/:service endpoint returns the most recent results when a filter is used with streaming #12640
|
||||||
|
```
|
|
@ -236,30 +236,38 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.Store) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
|
var thisReply structs.IndexedCheckServiceNodes
|
||||||
|
|
||||||
index, nodes, err := f(ws, state, args)
|
index, nodes, err := f(ws, state, args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
reply.Index, reply.Nodes = index, nodes
|
thisReply.Index, thisReply.Nodes = index, nodes
|
||||||
|
|
||||||
if len(args.NodeMetaFilters) > 0 {
|
if len(args.NodeMetaFilters) > 0 {
|
||||||
reply.Nodes = nodeMetaFilter(args.NodeMetaFilters, reply.Nodes)
|
thisReply.Nodes = nodeMetaFilter(args.NodeMetaFilters, thisReply.Nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
raw, err := filter.Execute(reply.Nodes)
|
raw, err := filter.Execute(thisReply.Nodes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
reply.Nodes = raw.(structs.CheckServiceNodes)
|
thisReply.Nodes = raw.(structs.CheckServiceNodes)
|
||||||
|
|
||||||
// Note: we filter the results with ACLs *after* applying the user-supplied
|
// Note: we filter the results with ACLs *after* applying the user-supplied
|
||||||
// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
|
// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include
|
||||||
// results that would be filtered out even if the user did have permission.
|
// results that would be filtered out even if the user did have permission.
|
||||||
if err := h.srv.filterACL(args.Token, reply); err != nil {
|
if err := h.srv.filterACL(args.Token, &thisReply); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return h.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes)
|
if err := h.srv.sortNodesByDistanceFrom(args.Source, thisReply.Nodes); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
*reply = thisReply
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
// Provide some metrics
|
// Provide some metrics
|
||||||
|
|
|
@ -663,6 +663,85 @@ func TestHealth_ServiceNodes(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHealth_ServiceNodes_BlockingQuery_withFilter(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
_, s1 := testServer(t)
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
|
||||||
|
waitForLeaderEstablishment(t, s1)
|
||||||
|
|
||||||
|
register := func(t *testing.T, name, tag string) {
|
||||||
|
arg := structs.RegisterRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ID: types.NodeID("43d419c0-433b-42c3-bf8a-193eba0b41a3"),
|
||||||
|
Node: "node1",
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
ID: name,
|
||||||
|
Service: name,
|
||||||
|
Tags: []string{tag},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var out struct{}
|
||||||
|
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out))
|
||||||
|
}
|
||||||
|
|
||||||
|
register(t, "web", "foo")
|
||||||
|
|
||||||
|
var lastIndex uint64
|
||||||
|
runStep(t, "read original", func(t *testing.T) {
|
||||||
|
var out structs.IndexedCheckServiceNodes
|
||||||
|
req := structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "web",
|
||||||
|
QueryOptions: structs.QueryOptions{
|
||||||
|
Filter: "foo in Service.Tags",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out))
|
||||||
|
|
||||||
|
require.Len(t, out.Nodes, 1)
|
||||||
|
node := out.Nodes[0]
|
||||||
|
require.Equal(t, "node1", node.Node.Node)
|
||||||
|
require.Equal(t, "web", node.Service.Service)
|
||||||
|
require.Equal(t, []string{"foo"}, node.Service.Tags)
|
||||||
|
|
||||||
|
require.Equal(t, structs.QueryBackendBlocking, out.Backend)
|
||||||
|
lastIndex = out.Index
|
||||||
|
})
|
||||||
|
|
||||||
|
runStep(t, "read blocking query result", func(t *testing.T) {
|
||||||
|
req := structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ServiceName: "web",
|
||||||
|
QueryOptions: structs.QueryOptions{
|
||||||
|
Filter: "foo in Service.Tags",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
req.MinQueryIndex = lastIndex
|
||||||
|
|
||||||
|
var out structs.IndexedCheckServiceNodes
|
||||||
|
errCh := channelCallRPC(s1, "Health.ServiceNodes", &req, &out, nil)
|
||||||
|
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
|
||||||
|
// Change the tags
|
||||||
|
register(t, "web", "bar")
|
||||||
|
|
||||||
|
if err := <-errCh; err != nil {
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, structs.QueryBackendBlocking, out.Backend)
|
||||||
|
require.Len(t, out.Nodes, 0)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func TestHealth_ServiceNodes_MultipleServiceTags(t *testing.T) {
|
func TestHealth_ServiceNodes_MultipleServiceTags(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("too slow for testing.Short")
|
t.Skip("too slow for testing.Short")
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
"github.com/hashicorp/consul/testrpc"
|
"github.com/hashicorp/consul/testrpc"
|
||||||
|
"github.com/hashicorp/consul/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestHealthChecksInState(t *testing.T) {
|
func TestHealthChecksInState(t *testing.T) {
|
||||||
|
@ -936,6 +937,135 @@ use_streaming_backend = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHealthServiceNodes_Blocking_withFilter(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
hcl string
|
||||||
|
queryBackend string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no streaming",
|
||||||
|
queryBackend: "blocking-query",
|
||||||
|
hcl: `use_streaming_backend = false`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "streaming",
|
||||||
|
hcl: `
|
||||||
|
rpc { enable_streaming = true }
|
||||||
|
use_streaming_backend = true
|
||||||
|
`,
|
||||||
|
queryBackend: "streaming",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
runStep := func(t *testing.T, name string, fn func(t *testing.T)) {
|
||||||
|
t.Helper()
|
||||||
|
if !t.Run(name, fn) {
|
||||||
|
t.FailNow()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
register := func(t *testing.T, a *TestAgent, name, tag string) {
|
||||||
|
args := &structs.RegisterRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
ID: types.NodeID("43d419c0-433b-42c3-bf8a-193eba0b41a3"),
|
||||||
|
Node: "node1",
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
ID: name,
|
||||||
|
Service: name,
|
||||||
|
Tags: []string{tag},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var out struct{}
|
||||||
|
require.NoError(t, a.RPC("Catalog.Register", args, &out))
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range cases {
|
||||||
|
tc := tc
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
a := NewTestAgent(t, tc.hcl)
|
||||||
|
defer a.Shutdown()
|
||||||
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
// Register one with a tag.
|
||||||
|
register(t, a, "web", "foo")
|
||||||
|
|
||||||
|
filterUrlPart := "filter=" + url.QueryEscape("foo in Service.Tags")
|
||||||
|
|
||||||
|
// TODO: use other call format
|
||||||
|
|
||||||
|
// Initial request with a filter should return one.
|
||||||
|
var lastIndex uint64
|
||||||
|
runStep(t, "read original", func(t *testing.T) {
|
||||||
|
req, err := http.NewRequest("GET", "/v1/health/service/web?dc=dc1&"+filterUrlPart, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := a.srv.HealthServiceNodes(resp, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
nodes := obj.(structs.CheckServiceNodes)
|
||||||
|
|
||||||
|
require.Len(t, nodes, 1)
|
||||||
|
|
||||||
|
node := nodes[0]
|
||||||
|
require.Equal(t, "node1", node.Node.Node)
|
||||||
|
require.Equal(t, "web", node.Service.Service)
|
||||||
|
require.Equal(t, []string{"foo"}, node.Service.Tags)
|
||||||
|
|
||||||
|
require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend"))
|
||||||
|
|
||||||
|
idx := getIndex(t, resp)
|
||||||
|
require.True(t, idx > 0)
|
||||||
|
|
||||||
|
lastIndex = idx
|
||||||
|
})
|
||||||
|
|
||||||
|
const timeout = 30 * time.Second
|
||||||
|
runStep(t, "read blocking query result", func(t *testing.T) {
|
||||||
|
var (
|
||||||
|
// out and resp are not safe to read until reading from errCh
|
||||||
|
out structs.CheckServiceNodes
|
||||||
|
resp = httptest.NewRecorder()
|
||||||
|
errCh = make(chan error, 1)
|
||||||
|
)
|
||||||
|
go func() {
|
||||||
|
url := fmt.Sprintf("/v1/health/service/web?dc=dc1&index=%d&wait=%s&%s", lastIndex, timeout, filterUrlPart)
|
||||||
|
req, err := http.NewRequest("GET", url, nil)
|
||||||
|
if err != nil {
|
||||||
|
errCh <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
obj, err := a.srv.HealthServiceNodes(resp, req)
|
||||||
|
if err != nil {
|
||||||
|
errCh <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
nodes := obj.(structs.CheckServiceNodes)
|
||||||
|
out = nodes
|
||||||
|
errCh <- nil
|
||||||
|
}()
|
||||||
|
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
|
||||||
|
// Change the tags.
|
||||||
|
register(t, a, "web", "bar")
|
||||||
|
|
||||||
|
if err := <-errCh; err != nil {
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Len(t, out, 0)
|
||||||
|
require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
|
func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("too slow for testing.Short")
|
t.Skip("too slow for testing.Short")
|
||||||
|
|
|
@ -80,11 +80,12 @@ func (s *healthView) Update(events []*pbsubscribe.Event) error {
|
||||||
return errors.New("check service node was unexpectedly nil")
|
return errors.New("check service node was unexpectedly nil")
|
||||||
}
|
}
|
||||||
passed, err := s.filter.Evaluate(*csn)
|
passed, err := s.filter.Evaluate(*csn)
|
||||||
switch {
|
if err != nil {
|
||||||
case err != nil:
|
|
||||||
return err
|
return err
|
||||||
case passed:
|
} else if passed {
|
||||||
s.state[id] = *csn
|
s.state[id] = *csn
|
||||||
|
} else {
|
||||||
|
delete(s.state, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
case pbsubscribe.CatalogOp_Deregister:
|
case pbsubscribe.CatalogOp_Deregister:
|
||||||
|
|
|
@ -398,6 +398,8 @@ const (
|
||||||
QueryBackendStreaming
|
QueryBackendStreaming
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (q QueryBackend) GoString() string { return q.String() }
|
||||||
|
|
||||||
func (q QueryBackend) String() string {
|
func (q QueryBackend) String() string {
|
||||||
switch q {
|
switch q {
|
||||||
case QueryBackendBlocking:
|
case QueryBackendBlocking:
|
||||||
|
|
Loading…
Reference in New Issue