diff --git a/.changelog/9703.txt b/.changelog/9703.txt new file mode 100644 index 0000000000..cde68e58b3 --- /dev/null +++ b/.changelog/9703.txt @@ -0,0 +1,3 @@ +```release-note:bug +streaming: lookup in health properly handle case-sensitivity and perform filtering based on tags +``` diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 526eca3549..855d4a3cc1 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -1,6 +1,8 @@ package state import ( + "strings" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/consul/acl" @@ -42,7 +44,7 @@ func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace string) bool { name = e.key } ns := e.Value.Service.EnterpriseMeta.GetNamespace() - return (key == "" || key == name) && (namespace == "" || namespace == ns) + return (key == "" || strings.EqualFold(key, name)) && (namespace == "" || namespace == ns) } // serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot diff --git a/agent/dns_test.go b/agent/dns_test.go index fdc70118f2..e7506db0f9 100644 --- a/agent/dns_test.go +++ b/agent/dns_test.go @@ -3016,72 +3016,95 @@ func TestDNS_CaseInsensitiveServiceLookup(t *testing.T) { } t.Parallel() - a := NewTestAgent(t, "") - defer a.Shutdown() - testrpc.WaitForLeader(t, a.RPC, "dc1") - - // Register a node with a service. - { - args := &structs.RegisterRequest{ - Datacenter: "dc1", - Node: "foo", - Address: "127.0.0.1", - Service: &structs.NodeService{ - Service: "Db", - Tags: []string{"Primary"}, - Port: 12345, - }, - } - - var out struct{} - if err := a.RPC("Catalog.Register", args, &out); err != nil { - t.Fatalf("err: %v", err) - } + tests := []struct { + name string + config string + }{ + // UDP + EDNS + {"normal", ""}, + {"cache", `dns_config{ allow_stale=true, max_stale="3h", use_cache=true, "cache_max_age"="3h"}`}, + {"cache-with-streaming", ` + rpc{ + enable_streaming=true + } + use_streaming_backend=true + dns_config{ allow_stale=true, max_stale="3h", use_cache=true, "cache_max_age"="3h"} + `}, } + for _, tst := range tests { + t.Run(fmt.Sprintf("A lookup %v", tst.name), func(t *testing.T) { + a := NewTestAgent(t, tst.config) + defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") - // Register an equivalent prepared query, as well as a name. - var id string - { - args := &structs.PreparedQueryRequest{ - Datacenter: "dc1", - Op: structs.PreparedQueryCreate, - Query: &structs.PreparedQuery{ - Name: "somequery", - Service: structs.ServiceQuery{ - Service: "db", - }, - }, - } - if err := a.RPC("PreparedQuery.Apply", args, &id); err != nil { - t.Fatalf("err: %v", err) - } - } + // Register a node with a service. + { + args := &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &structs.NodeService{ + Service: "Db", + Tags: []string{"Primary"}, + Port: 12345, + }, + } - // Try some variations to make sure case doesn't matter. - questions := []string{ - "primary.db.service.consul.", - "pRIMARY.dB.service.consul.", - "PRIMARY.dB.service.consul.", - "db.service.consul.", - "DB.service.consul.", - "Db.service.consul.", - "somequery.query.consul.", - "SomeQuery.query.consul.", - "SOMEQUERY.query.consul.", - } - for _, question := range questions { - m := new(dns.Msg) - m.SetQuestion(question, dns.TypeSRV) + var out struct{} + if err := a.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + } - c := new(dns.Client) - in, _, err := c.Exchange(m, a.DNSAddr()) - if err != nil { - t.Fatalf("err: %v", err) - } + // Register an equivalent prepared query, as well as a name. + var id string + { + args := &structs.PreparedQueryRequest{ + Datacenter: "dc1", + Op: structs.PreparedQueryCreate, + Query: &structs.PreparedQuery{ + Name: "somequery", + Service: structs.ServiceQuery{ + Service: "db", + }, + }, + } + if err := a.RPC("PreparedQuery.Apply", args, &id); err != nil { + t.Fatalf("err: %v", err) + } + } - if len(in.Answer) != 1 { - t.Fatalf("empty lookup: %#v", in) - } + // Try some variations to make sure case doesn't matter. + questions := []string{ + "primary.Db.service.consul.", + "primary.db.service.consul.", + "pRIMARY.dB.service.consul.", + "PRIMARY.dB.service.consul.", + "db.service.consul.", + "DB.service.consul.", + "Db.service.consul.", + "somequery.query.consul.", + "SomeQuery.query.consul.", + "SOMEQUERY.query.consul.", + } + + for _, question := range questions { + m := new(dns.Msg) + m.SetQuestion(question, dns.TypeSRV) + + c := new(dns.Client) + retry.Run(t, func(r *retry.R) { + in, _, err := c.Exchange(m, a.DNSAddr()) + if err != nil { + t.Fatalf("err: %v", err) + } + + if len(in.Answer) != 1 { + t.Fatalf("empty lookup: %#v", in) + } + }) + } + }) } } diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index 0118a363cd..e2c6bd63e3 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -2,6 +2,7 @@ package health import ( "context" + "strings" "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/structs" @@ -49,6 +50,7 @@ func (c *Client) getServiceNodes( ) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) { var out structs.IndexedCheckServiceNodes + req.ServiceName = strings.ToLower(req.ServiceName) if !req.QueryOptions.UseCache { err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out) return out, cache.ResultMeta{}, err @@ -68,5 +70,59 @@ func (c *Client) getServiceNodes( if !ok { panic("wrong response type for cachetype.HealthServicesName") } - return *value, md, nil + + return filterTags(value, req), md, nil +} + +func filterTags(out *structs.IndexedCheckServiceNodes, req structs.ServiceSpecificRequest) structs.IndexedCheckServiceNodes { + if len(req.ServiceTags) == 0 || len(out.Nodes) == 0 { + return *out + } + tags := make([]string, 0, len(req.ServiceTags)) + for _, r := range req.ServiceTags { + // DNS has the bad habit to setting [""] for ServiceTags + if r != "" { + tags = append(tags, strings.ToLower(r)) + } + } + // No need to filter + if len(tags) == 0 { + return *out + } + results := make(structs.CheckServiceNodes, 0, len(out.Nodes)) + for _, service := range out.Nodes { + svc := service.Service + if !serviceTagsFilter(svc, tags) { + results = append(results, service) + } + } + out.Nodes = results + return *out +} + +// serviceTagsFilter return true if service does not contains all the given tags +func serviceTagsFilter(sn *structs.NodeService, tags []string) bool { + for _, tag := range tags { + if serviceTagFilter(sn, tag) { + // If any one of the expected tags was not found, filter the service + return true + } + } + + // If all tags were found, don't filter the service + return false +} + +// serviceTagFilter returns true (should filter) if the given service node +// doesn't contain the given tag. +func serviceTagFilter(sn *structs.NodeService, tag string) bool { + // Look for the lower cased version of the tag. + for _, t := range sn.Tags { + if strings.ToLower(t) == tag { + return false + } + } + + // If we didn't hit the tag above then we should filter. + return true }