From 2fe3ab7db08c34ded21f203cdb10f7c356ea72bb Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Mon, 8 Feb 2021 17:53:18 +0100
Subject: [PATCH] [Streaming] Properly filters node-meta queries on health
This wil fix https://github.com/hashicorp/consul/issues/9730
---
agent/health_endpoint_test.go | 115 ++++++++++++++++++++-----------
agent/rpcclient/health/health.go | 17 ++++-
2 files changed, 92 insertions(+), 40 deletions(-)
diff --git a/agent/health_endpoint_test.go b/agent/health_endpoint_test.go
index 975c42d556..edf6187ec5 100644
--- a/agent/health_endpoint_test.go
+++ b/agent/health_endpoint_test.go
@@ -8,6 +8,7 @@ import (
"net/http/httptest"
"net/url"
"reflect"
+ "strconv"
"testing"
"github.com/hashicorp/consul/agent/structs"
@@ -733,54 +734,90 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
}
t.Parallel()
- a := NewTestAgent(t, "")
- defer a.Shutdown()
- testrpc.WaitForLeader(t, a.RPC, "dc1")
- req, _ := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1&node-meta=somekey:somevalue", nil)
- resp := httptest.NewRecorder()
- obj, err := a.srv.HealthServiceNodes(resp, req)
- if err != nil {
- t.Fatalf("err: %v", err)
+ tests := []struct {
+ name string
+ config string
+ }{
+ {"normal", ""},
+ {"cache-with-streaming", `
+ rpc{
+ enable_streaming=true
+ }
+ use_streaming_backend=true
+ `},
}
+ for _, tst := range tests {
+ t.Run(tst.name, func(t *testing.T) {
- assertIndex(t, resp)
+ a := NewTestAgent(t, tst.config)
+ defer a.Shutdown()
+ testrpc.WaitForLeader(t, a.RPC, "dc1")
- // Should be a non-nil empty list
- nodes := obj.(structs.CheckServiceNodes)
- if nodes == nil || len(nodes) != 0 {
- t.Fatalf("bad: %v", obj)
- }
+ req, _ := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1&node-meta=somekey:somevalue", nil)
+ resp := httptest.NewRecorder()
+ obj, err := a.srv.HealthServiceNodes(resp, req)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
- args := &structs.RegisterRequest{
- Datacenter: "dc1",
- Node: "bar",
- Address: "127.0.0.1",
- NodeMeta: map[string]string{"somekey": "somevalue"},
- Service: &structs.NodeService{
- ID: "test",
- Service: "test",
- },
- }
+ assertIndex(t, resp)
- var out struct{}
- if err := a.RPC("Catalog.Register", args, &out); err != nil {
- t.Fatalf("err: %v", err)
- }
+ cIndex, err := strconv.ParseUint(resp.Header().Get("X-Consul-Index"), 10, 64)
+ require.NoError(t, err)
- req, _ = http.NewRequest("GET", "/v1/health/service/test?dc=dc1&node-meta=somekey:somevalue", nil)
- resp = httptest.NewRecorder()
- obj, err = a.srv.HealthServiceNodes(resp, req)
- if err != nil {
- t.Fatalf("err: %v", err)
- }
+ // Should be a non-nil empty list
+ nodes := obj.(structs.CheckServiceNodes)
+ if nodes == nil || len(nodes) != 0 {
+ t.Fatalf("bad: %v", obj)
+ }
- assertIndex(t, resp)
+ args := &structs.RegisterRequest{
+ Datacenter: "dc1",
+ Node: "bar",
+ Address: "127.0.0.1",
+ NodeMeta: map[string]string{"somekey": "somevalue"},
+ Service: &structs.NodeService{
+ ID: "test",
+ Service: "test",
+ },
+ }
- // Should be a non-nil empty list for checks
- nodes = obj.(structs.CheckServiceNodes)
- if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 {
- t.Fatalf("bad: %v", obj)
+ var out struct{}
+ if err := a.RPC("Catalog.Register", args, &out); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ args = &structs.RegisterRequest{
+ Datacenter: "dc1",
+ Node: "bar2",
+ Address: "127.0.0.1",
+ NodeMeta: map[string]string{"somekey": "othervalue"},
+ Service: &structs.NodeService{
+ ID: "test2",
+ Service: "test",
+ },
+ }
+
+ if err := a.RPC("Catalog.Register", args, &out); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ req, _ = http.NewRequest("GET", fmt.Sprintf("/v1/health/service/test?dc=dc1&node-meta=somekey:somevalue&index=%d&wait=10ms", cIndex), nil)
+ resp = httptest.NewRecorder()
+ obj, err = a.srv.HealthServiceNodes(resp, req)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ assertIndex(t, resp)
+
+ // Should be a non-nil empty list for checks
+ nodes = obj.(structs.CheckServiceNodes)
+ if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 {
+ t.Fatalf("bad: %v", obj)
+ }
+ })
}
}
diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go
index e2c6bd63e3..c374bbb3fb 100644
--- a/agent/rpcclient/health/health.go
+++ b/agent/rpcclient/health/health.go
@@ -71,7 +71,7 @@ func (c *Client) getServiceNodes(
panic("wrong response type for cachetype.HealthServicesName")
}
- return filterTags(value, req), md, nil
+ return filterTags(filterNodeMeta(value, req), req), md, nil
}
func filterTags(out *structs.IndexedCheckServiceNodes, req structs.ServiceSpecificRequest) structs.IndexedCheckServiceNodes {
@@ -126,3 +126,18 @@ func serviceTagFilter(sn *structs.NodeService, tag string) bool {
// If we didn't hit the tag above then we should filter.
return true
}
+
+func filterNodeMeta(out *structs.IndexedCheckServiceNodes, req structs.ServiceSpecificRequest) *structs.IndexedCheckServiceNodes {
+ if len(req.NodeMetaFilters) == 0 || len(out.Nodes) == 0 {
+ return out
+ }
+ results := make(structs.CheckServiceNodes, 0, len(out.Nodes))
+ for _, service := range out.Nodes {
+ serviceNode := service.Node
+ if structs.SatisfiesMetaFilters(serviceNode.Meta, req.NodeMetaFilters) {
+ results = append(results, service)
+ }
+ }
+ out.Nodes = results
+ return out
+}