mirror of https://github.com/status-im/consul.git
Streaming filter tags + case insensitive lookups for Service Names
Will fix: * https://github.com/hashicorp/consul/issues/9695 * https://github.com/hashicorp/consul/issues/9702
This commit is contained in:
parent
0d1301c408
commit
7a024ed074
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
streaming: lookup in health properly handle case-sensitivity and perform filtering based on tags
|
||||||
|
```
|
|
@ -1,6 +1,8 @@
|
||||||
package state
|
package state
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"strings"
|
||||||
|
|
||||||
memdb "github.com/hashicorp/go-memdb"
|
memdb "github.com/hashicorp/go-memdb"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
|
@ -42,7 +44,7 @@ func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace string) bool {
|
||||||
name = e.key
|
name = e.key
|
||||||
}
|
}
|
||||||
ns := e.Value.Service.EnterpriseMeta.GetNamespace()
|
ns := e.Value.Service.EnterpriseMeta.GetNamespace()
|
||||||
return (key == "" || key == name) && (namespace == "" || namespace == ns)
|
return (key == "" || strings.EqualFold(key, name)) && (namespace == "" || namespace == ns)
|
||||||
}
|
}
|
||||||
|
|
||||||
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
|
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
|
||||||
|
|
|
@ -3016,7 +3016,24 @@ func TestDNS_CaseInsensitiveServiceLookup(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
a := NewTestAgent(t, "")
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
config string
|
||||||
|
}{
|
||||||
|
// UDP + EDNS
|
||||||
|
{"normal", ""},
|
||||||
|
{"cache", `dns_config{ allow_stale=true, max_stale="3h", use_cache=true, "cache_max_age"="3h"}`},
|
||||||
|
{"cache-with-streaming", `
|
||||||
|
rpc{
|
||||||
|
enable_streaming=true
|
||||||
|
}
|
||||||
|
use_streaming_backend=true
|
||||||
|
dns_config{ allow_stale=true, max_stale="3h", use_cache=true, "cache_max_age"="3h"}
|
||||||
|
`},
|
||||||
|
}
|
||||||
|
for _, tst := range tests {
|
||||||
|
t.Run(fmt.Sprintf("A lookup %v", tst.name), func(t *testing.T) {
|
||||||
|
a := NewTestAgent(t, tst.config)
|
||||||
defer a.Shutdown()
|
defer a.Shutdown()
|
||||||
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
@ -3059,6 +3076,7 @@ func TestDNS_CaseInsensitiveServiceLookup(t *testing.T) {
|
||||||
|
|
||||||
// Try some variations to make sure case doesn't matter.
|
// Try some variations to make sure case doesn't matter.
|
||||||
questions := []string{
|
questions := []string{
|
||||||
|
"primary.Db.service.consul.",
|
||||||
"primary.db.service.consul.",
|
"primary.db.service.consul.",
|
||||||
"pRIMARY.dB.service.consul.",
|
"pRIMARY.dB.service.consul.",
|
||||||
"PRIMARY.dB.service.consul.",
|
"PRIMARY.dB.service.consul.",
|
||||||
|
@ -3069,11 +3087,13 @@ func TestDNS_CaseInsensitiveServiceLookup(t *testing.T) {
|
||||||
"SomeQuery.query.consul.",
|
"SomeQuery.query.consul.",
|
||||||
"SOMEQUERY.query.consul.",
|
"SOMEQUERY.query.consul.",
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, question := range questions {
|
for _, question := range questions {
|
||||||
m := new(dns.Msg)
|
m := new(dns.Msg)
|
||||||
m.SetQuestion(question, dns.TypeSRV)
|
m.SetQuestion(question, dns.TypeSRV)
|
||||||
|
|
||||||
c := new(dns.Client)
|
c := new(dns.Client)
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
in, _, err := c.Exchange(m, a.DNSAddr())
|
in, _, err := c.Exchange(m, a.DNSAddr())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
|
@ -3082,6 +3102,9 @@ func TestDNS_CaseInsensitiveServiceLookup(t *testing.T) {
|
||||||
if len(in.Answer) != 1 {
|
if len(in.Answer) != 1 {
|
||||||
t.Fatalf("empty lookup: %#v", in)
|
t.Fatalf("empty lookup: %#v", in)
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ package health
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
@ -49,6 +50,7 @@ func (c *Client) getServiceNodes(
|
||||||
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
|
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
|
||||||
var out structs.IndexedCheckServiceNodes
|
var out structs.IndexedCheckServiceNodes
|
||||||
|
|
||||||
|
req.ServiceName = strings.ToLower(req.ServiceName)
|
||||||
if !req.QueryOptions.UseCache {
|
if !req.QueryOptions.UseCache {
|
||||||
err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out)
|
err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out)
|
||||||
return out, cache.ResultMeta{}, err
|
return out, cache.ResultMeta{}, err
|
||||||
|
@ -68,5 +70,59 @@ func (c *Client) getServiceNodes(
|
||||||
if !ok {
|
if !ok {
|
||||||
panic("wrong response type for cachetype.HealthServicesName")
|
panic("wrong response type for cachetype.HealthServicesName")
|
||||||
}
|
}
|
||||||
return *value, md, nil
|
|
||||||
|
return filterTags(value, req), md, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func filterTags(out *structs.IndexedCheckServiceNodes, req structs.ServiceSpecificRequest) structs.IndexedCheckServiceNodes {
|
||||||
|
if len(req.ServiceTags) == 0 || len(out.Nodes) == 0 {
|
||||||
|
return *out
|
||||||
|
}
|
||||||
|
tags := make([]string, 0, len(req.ServiceTags))
|
||||||
|
for _, r := range req.ServiceTags {
|
||||||
|
// DNS has the bad habit to setting [""] for ServiceTags
|
||||||
|
if r != "" {
|
||||||
|
tags = append(tags, strings.ToLower(r))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// No need to filter
|
||||||
|
if len(tags) == 0 {
|
||||||
|
return *out
|
||||||
|
}
|
||||||
|
results := make(structs.CheckServiceNodes, 0, len(out.Nodes))
|
||||||
|
for _, service := range out.Nodes {
|
||||||
|
svc := service.Service
|
||||||
|
if !serviceTagsFilter(svc, tags) {
|
||||||
|
results = append(results, service)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out.Nodes = results
|
||||||
|
return *out
|
||||||
|
}
|
||||||
|
|
||||||
|
// serviceTagsFilter return true if service does not contains all the given tags
|
||||||
|
func serviceTagsFilter(sn *structs.NodeService, tags []string) bool {
|
||||||
|
for _, tag := range tags {
|
||||||
|
if serviceTagFilter(sn, tag) {
|
||||||
|
// If any one of the expected tags was not found, filter the service
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If all tags were found, don't filter the service
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// serviceTagFilter returns true (should filter) if the given service node
|
||||||
|
// doesn't contain the given tag.
|
||||||
|
func serviceTagFilter(sn *structs.NodeService, tag string) bool {
|
||||||
|
// Look for the lower cased version of the tag.
|
||||||
|
for _, t := range sn.Tags {
|
||||||
|
if strings.ToLower(t) == tag {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we didn't hit the tag above then we should filter.
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue