mirror of
https://github.com/status-im/consul.git
synced 2025-02-23 19:08:22 +00:00
Merge pull request #2668 from hashicorp/f-prepared-query-nodemeta
Node metadata support in prepared queries
This commit is contained in:
commit
ebced2db68
@ -43,6 +43,11 @@ type ServiceQuery struct {
|
|||||||
// this list it must be present. If the tag is preceded with "!" then
|
// this list it must be present. If the tag is preceded with "!" then
|
||||||
// it is disallowed.
|
// it is disallowed.
|
||||||
Tags []string
|
Tags []string
|
||||||
|
|
||||||
|
// NodeMeta is a map of required node metadata fields. If a key/value
|
||||||
|
// pair is in this map it must be present on the node in order for the
|
||||||
|
// service entry to be returned.
|
||||||
|
NodeMeta map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryTemplate carries the arguments for creating a templated query.
|
// QueryTemplate carries the arguments for creating a templated query.
|
||||||
|
@ -20,6 +20,7 @@ func TestPreparedQuery(t *testing.T) {
|
|||||||
TaggedAddresses: map[string]string{
|
TaggedAddresses: map[string]string{
|
||||||
"wan": "127.0.0.1",
|
"wan": "127.0.0.1",
|
||||||
},
|
},
|
||||||
|
NodeMeta: map[string]string{"somekey": "somevalue"},
|
||||||
Service: &AgentService{
|
Service: &AgentService{
|
||||||
ID: "redis1",
|
ID: "redis1",
|
||||||
Service: "redis",
|
Service: "redis",
|
||||||
@ -48,6 +49,7 @@ func TestPreparedQuery(t *testing.T) {
|
|||||||
Name: "test",
|
Name: "test",
|
||||||
Service: ServiceQuery{
|
Service: ServiceQuery{
|
||||||
Service: "redis",
|
Service: "redis",
|
||||||
|
NodeMeta: map[string]string{"somekey": "somevalue"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -42,26 +42,11 @@ const (
|
|||||||
"but no reason was provided. This is a default message."
|
"but no reason was provided. This is a default message."
|
||||||
defaultServiceMaintReason = "Maintenance mode is enabled for this " +
|
defaultServiceMaintReason = "Maintenance mode is enabled for this " +
|
||||||
"service, but no reason was provided. This is a default message."
|
"service, but no reason was provided. This is a default message."
|
||||||
|
|
||||||
// The meta key prefix reserved for Consul's internal use
|
|
||||||
metaKeyReservedPrefix = "consul-"
|
|
||||||
|
|
||||||
// The maximum number of metadata key pairs allowed to be registered
|
|
||||||
metaMaxKeyPairs = 64
|
|
||||||
|
|
||||||
// The maximum allowed length of a metadata key
|
|
||||||
metaKeyMaxLength = 128
|
|
||||||
|
|
||||||
// The maximum allowed length of a metadata value
|
|
||||||
metaValueMaxLength = 512
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// dnsNameRe checks if a name or tag is dns-compatible.
|
// dnsNameRe checks if a name or tag is dns-compatible.
|
||||||
dnsNameRe = regexp.MustCompile(`^[a-zA-Z0-9\-]+$`)
|
dnsNameRe = regexp.MustCompile(`^[a-zA-Z0-9\-]+$`)
|
||||||
|
|
||||||
// metaKeyFormat checks if a metadata key string is valid
|
|
||||||
metaKeyFormat = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`).MatchString
|
|
||||||
)
|
)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -1789,41 +1774,6 @@ func parseMetaPair(raw string) (string, string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// validateMeta validates a set of key/value pairs from the agent config
|
|
||||||
func validateMetadata(meta map[string]string) error {
|
|
||||||
if len(meta) > metaMaxKeyPairs {
|
|
||||||
return fmt.Errorf("Node metadata cannot contain more than %d key/value pairs", metaMaxKeyPairs)
|
|
||||||
}
|
|
||||||
|
|
||||||
for key, value := range meta {
|
|
||||||
if err := validateMetaPair(key, value); err != nil {
|
|
||||||
return fmt.Errorf("Couldn't load metadata pair ('%s', '%s'): %s", key, value, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// validateMetaPair checks that the given key/value pair is in a valid format
|
|
||||||
func validateMetaPair(key, value string) error {
|
|
||||||
if key == "" {
|
|
||||||
return fmt.Errorf("Key cannot be blank")
|
|
||||||
}
|
|
||||||
if !metaKeyFormat(key) {
|
|
||||||
return fmt.Errorf("Key contains invalid characters")
|
|
||||||
}
|
|
||||||
if len(key) > metaKeyMaxLength {
|
|
||||||
return fmt.Errorf("Key is too long (limit: %d characters)", metaKeyMaxLength)
|
|
||||||
}
|
|
||||||
if strings.HasPrefix(key, metaKeyReservedPrefix) {
|
|
||||||
return fmt.Errorf("Key prefix '%s' is reserved for internal use", metaKeyReservedPrefix)
|
|
||||||
}
|
|
||||||
if len(value) > metaValueMaxLength {
|
|
||||||
return fmt.Errorf("Value is too long (limit: %d characters)", metaValueMaxLength)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// unloadMetadata resets the local metadata state
|
// unloadMetadata resets the local metadata state
|
||||||
func (a *Agent) unloadMetadata() {
|
func (a *Agent) unloadMetadata() {
|
||||||
a.state.Lock()
|
a.state.Lock()
|
||||||
|
@ -10,6 +10,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -21,7 +22,6 @@ import (
|
|||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
"github.com/hashicorp/go-uuid"
|
"github.com/hashicorp/go-uuid"
|
||||||
"github.com/hashicorp/raft"
|
"github.com/hashicorp/raft"
|
||||||
"strings"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -1919,69 +1919,6 @@ func TestAgent_purgeCheckState(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAgent_metadata(t *testing.T) {
|
|
||||||
// Load a valid set of key/value pairs
|
|
||||||
meta := map[string]string{
|
|
||||||
"key1": "value1",
|
|
||||||
"key2": "value2",
|
|
||||||
}
|
|
||||||
// Should succeed
|
|
||||||
if err := validateMetadata(meta); err != nil {
|
|
||||||
t.Fatalf("err: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Should get error
|
|
||||||
meta = map[string]string{
|
|
||||||
"": "value1",
|
|
||||||
}
|
|
||||||
if err := validateMetadata(meta); !strings.Contains(err.Error(), "Couldn't load metadata pair") {
|
|
||||||
t.Fatalf("should have failed")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Should get error
|
|
||||||
meta = make(map[string]string)
|
|
||||||
for i := 0; i < metaMaxKeyPairs+1; i++ {
|
|
||||||
meta[string(i)] = "value"
|
|
||||||
}
|
|
||||||
if err := validateMetadata(meta); !strings.Contains(err.Error(), "cannot contain more than") {
|
|
||||||
t.Fatalf("should have failed")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAgent_validateMetaPair(t *testing.T) {
|
|
||||||
longKey := strings.Repeat("a", metaKeyMaxLength+1)
|
|
||||||
longValue := strings.Repeat("b", metaValueMaxLength+1)
|
|
||||||
pairs := []struct {
|
|
||||||
Key string
|
|
||||||
Value string
|
|
||||||
Error string
|
|
||||||
}{
|
|
||||||
// valid pair
|
|
||||||
{"key", "value", ""},
|
|
||||||
// invalid, blank key
|
|
||||||
{"", "value", "cannot be blank"},
|
|
||||||
// allowed special chars in key name
|
|
||||||
{"k_e-y", "value", ""},
|
|
||||||
// disallowed special chars in key name
|
|
||||||
{"(%key&)", "value", "invalid characters"},
|
|
||||||
// key too long
|
|
||||||
{longKey, "value", "Key is too long"},
|
|
||||||
// reserved prefix
|
|
||||||
{metaKeyReservedPrefix + "key", "value", "reserved for internal use"},
|
|
||||||
// value too long
|
|
||||||
{"key", longValue, "Value is too long"},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, pair := range pairs {
|
|
||||||
err := validateMetaPair(pair.Key, pair.Value)
|
|
||||||
if pair.Error == "" && err != nil {
|
|
||||||
t.Fatalf("should have succeeded: %v, %v", pair, err)
|
|
||||||
} else if pair.Error != "" && !strings.Contains(err.Error(), pair.Error) {
|
|
||||||
t.Fatalf("should have failed: %v, %v", pair, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAgent_GetCoordinate(t *testing.T) {
|
func TestAgent_GetCoordinate(t *testing.T) {
|
||||||
check := func(server bool) {
|
check := func(server bool) {
|
||||||
config := nextConfig()
|
config := nextConfig()
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
"github.com/aws/aws-sdk-go/aws/ec2metadata"
|
"github.com/aws/aws-sdk-go/aws/ec2metadata"
|
||||||
"github.com/aws/aws-sdk-go/aws/session"
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
"github.com/aws/aws-sdk-go/service/ec2"
|
"github.com/aws/aws-sdk-go/service/ec2"
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib"
|
||||||
"github.com/hashicorp/consul/logger"
|
"github.com/hashicorp/consul/logger"
|
||||||
"github.com/hashicorp/consul/watch"
|
"github.com/hashicorp/consul/watch"
|
||||||
@ -397,7 +398,7 @@ func (c *Command) readConfig() *Config {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Verify the node metadata entries are valid
|
// Verify the node metadata entries are valid
|
||||||
if err := validateMetadata(config.Meta); err != nil {
|
if err := structs.ValidateMetadata(config.Meta); err != nil {
|
||||||
c.Ui.Error(fmt.Sprintf("Failed to parse node metadata: %v", err))
|
c.Ui.Error(fmt.Sprintf("Failed to parse node metadata: %v", err))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -90,6 +90,7 @@ func TestPreparedQuery_Create(t *testing.T) {
|
|||||||
},
|
},
|
||||||
OnlyPassing: true,
|
OnlyPassing: true,
|
||||||
Tags: []string{"foo", "bar"},
|
Tags: []string{"foo", "bar"},
|
||||||
|
NodeMeta: map[string]string{"somekey": "somevalue"},
|
||||||
},
|
},
|
||||||
DNS: structs.QueryDNSOptions{
|
DNS: structs.QueryDNSOptions{
|
||||||
TTL: "10s",
|
TTL: "10s",
|
||||||
@ -120,6 +121,7 @@ func TestPreparedQuery_Create(t *testing.T) {
|
|||||||
},
|
},
|
||||||
"OnlyPassing": true,
|
"OnlyPassing": true,
|
||||||
"Tags": []string{"foo", "bar"},
|
"Tags": []string{"foo", "bar"},
|
||||||
|
"NodeMeta": map[string]string{"somekey": "somevalue"},
|
||||||
},
|
},
|
||||||
"DNS": map[string]interface{}{
|
"DNS": map[string]interface{}{
|
||||||
"TTL": "10s",
|
"TTL": "10s",
|
||||||
@ -645,6 +647,7 @@ func TestPreparedQuery_Update(t *testing.T) {
|
|||||||
},
|
},
|
||||||
OnlyPassing: true,
|
OnlyPassing: true,
|
||||||
Tags: []string{"foo", "bar"},
|
Tags: []string{"foo", "bar"},
|
||||||
|
NodeMeta: map[string]string{"somekey": "somevalue"},
|
||||||
},
|
},
|
||||||
DNS: structs.QueryDNSOptions{
|
DNS: structs.QueryDNSOptions{
|
||||||
TTL: "10s",
|
TTL: "10s",
|
||||||
@ -676,6 +679,7 @@ func TestPreparedQuery_Update(t *testing.T) {
|
|||||||
},
|
},
|
||||||
"OnlyPassing": true,
|
"OnlyPassing": true,
|
||||||
"Tags": []string{"foo", "bar"},
|
"Tags": []string{"foo", "bar"},
|
||||||
|
"NodeMeta": map[string]string{"somekey": "somevalue"},
|
||||||
},
|
},
|
||||||
"DNS": map[string]interface{}{
|
"DNS": map[string]interface{}{
|
||||||
"TTL": "10s",
|
"TTL": "10s",
|
||||||
|
@ -138,13 +138,7 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
|
|||||||
|
|
||||||
reply.Index, reply.Nodes = index, nodes
|
reply.Index, reply.Nodes = index, nodes
|
||||||
if len(args.NodeMetaFilters) > 0 {
|
if len(args.NodeMetaFilters) > 0 {
|
||||||
var filtered structs.CheckServiceNodes
|
reply.Nodes = nodeMetaFilter(args.NodeMetaFilters, reply.Nodes)
|
||||||
for _, node := range nodes {
|
|
||||||
if structs.SatisfiesMetaFilters(node.Node.Meta, args.NodeMetaFilters) {
|
|
||||||
filtered = append(filtered, node)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
reply.Nodes = filtered
|
|
||||||
}
|
}
|
||||||
if err := h.srv.filterACL(args.Token, reply); err != nil {
|
if err := h.srv.filterACL(args.Token, reply); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -38,6 +38,11 @@ var (
|
|||||||
"${match(1)}",
|
"${match(1)}",
|
||||||
"${match(2)}",
|
"${match(2)}",
|
||||||
},
|
},
|
||||||
|
NodeMeta: map[string]string{
|
||||||
|
"foo": "${name.prefix}",
|
||||||
|
"bar": "${match(0)}",
|
||||||
|
"baz": "${match(1)}",
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -222,6 +227,7 @@ func TestTemplate_Render(t *testing.T) {
|
|||||||
"${match(4)}",
|
"${match(4)}",
|
||||||
"${40 + 2}",
|
"${40 + 2}",
|
||||||
},
|
},
|
||||||
|
NodeMeta: map[string]string{"foo": "${match(1)}"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
ct, err := Compile(query)
|
ct, err := Compile(query)
|
||||||
@ -252,6 +258,7 @@ func TestTemplate_Render(t *testing.T) {
|
|||||||
"",
|
"",
|
||||||
"42",
|
"42",
|
||||||
},
|
},
|
||||||
|
NodeMeta: map[string]string{"foo": "hello"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(actual, expected) {
|
if !reflect.DeepEqual(actual, expected) {
|
||||||
@ -282,6 +289,7 @@ func TestTemplate_Render(t *testing.T) {
|
|||||||
"",
|
"",
|
||||||
"42",
|
"42",
|
||||||
},
|
},
|
||||||
|
NodeMeta: map[string]string{"foo": ""},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(actual, expected) {
|
if !reflect.DeepEqual(actual, expected) {
|
||||||
|
@ -34,6 +34,20 @@ func visit(path string, v reflect.Value, t reflect.Type, fn visitor) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
case reflect.Map:
|
||||||
|
for _, key := range v.MapKeys() {
|
||||||
|
value := v.MapIndex(key)
|
||||||
|
|
||||||
|
newValue := reflect.New(value.Type()).Elem()
|
||||||
|
newValue.SetString(value.String())
|
||||||
|
|
||||||
|
if err := visit(fmt.Sprintf("%s[%s]", path, key.String()), newValue, newValue.Type(), fn); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// overwrite the entry in case it was modified by the callback
|
||||||
|
v.SetMapIndex(key, newValue)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/consul/structs"
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
"sort"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestWalk_ServiceQuery(t *testing.T) {
|
func TestWalk_ServiceQuery(t *testing.T) {
|
||||||
@ -22,20 +23,24 @@ func TestWalk_ServiceQuery(t *testing.T) {
|
|||||||
},
|
},
|
||||||
Near: "_agent",
|
Near: "_agent",
|
||||||
Tags: []string{"tag1", "tag2", "tag3"},
|
Tags: []string{"tag1", "tag2", "tag3"},
|
||||||
|
NodeMeta: map[string]string{"foo": "bar", "role": "server"},
|
||||||
}
|
}
|
||||||
if err := walk(service, fn); err != nil {
|
if err := walk(service, fn); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
expected := []string{
|
expected := []string{
|
||||||
".Service:the-service",
|
|
||||||
".Failover.Datacenters[0]:dc1",
|
".Failover.Datacenters[0]:dc1",
|
||||||
".Failover.Datacenters[1]:dc2",
|
".Failover.Datacenters[1]:dc2",
|
||||||
".Near:_agent",
|
".Near:_agent",
|
||||||
|
".NodeMeta[foo]:bar",
|
||||||
|
".NodeMeta[role]:server",
|
||||||
|
".Service:the-service",
|
||||||
".Tags[0]:tag1",
|
".Tags[0]:tag1",
|
||||||
".Tags[1]:tag2",
|
".Tags[1]:tag2",
|
||||||
".Tags[2]:tag3",
|
".Tags[2]:tag3",
|
||||||
}
|
}
|
||||||
|
sort.Strings(actual)
|
||||||
if !reflect.DeepEqual(actual, expected) {
|
if !reflect.DeepEqual(actual, expected) {
|
||||||
t.Fatalf("bad: %#v", actual)
|
t.Fatalf("bad: %#v", actual)
|
||||||
}
|
}
|
||||||
|
@ -178,6 +178,11 @@ func parseService(svc *structs.ServiceQuery) error {
|
|||||||
return fmt.Errorf("Bad NearestN '%d', must be >= 0", svc.Failover.NearestN)
|
return fmt.Errorf("Bad NearestN '%d', must be >= 0", svc.Failover.NearestN)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Make sure the metadata filters are valid
|
||||||
|
if err := structs.ValidateMetadata(svc.NodeMeta); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// We skip a few fields:
|
// We skip a few fields:
|
||||||
// - There's no validation for Datacenters; we skip any unknown entries
|
// - There's no validation for Datacenters; we skip any unknown entries
|
||||||
// at execution time.
|
// at execution time.
|
||||||
@ -492,6 +497,11 @@ func (p *PreparedQuery) execute(query *structs.PreparedQuery,
|
|||||||
// Filter out any unhealthy nodes.
|
// Filter out any unhealthy nodes.
|
||||||
nodes = nodes.Filter(query.Service.OnlyPassing)
|
nodes = nodes.Filter(query.Service.OnlyPassing)
|
||||||
|
|
||||||
|
// Apply the node metadata filters, if any.
|
||||||
|
if len(query.Service.NodeMeta) > 0 {
|
||||||
|
nodes = nodeMetaFilter(query.Service.NodeMeta, nodes)
|
||||||
|
}
|
||||||
|
|
||||||
// Apply the tag filters, if any.
|
// Apply the tag filters, if any.
|
||||||
if len(query.Service.Tags) > 0 {
|
if len(query.Service.Tags) > 0 {
|
||||||
nodes = tagFilter(query.Service.Tags, nodes)
|
nodes = tagFilter(query.Service.Tags, nodes)
|
||||||
@ -562,6 +572,18 @@ func tagFilter(tags []string, nodes structs.CheckServiceNodes) structs.CheckServ
|
|||||||
return nodes[:n]
|
return nodes[:n]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// nodeMetaFilter returns a list of the nodes who satisfy the given metadata filters. Nodes
|
||||||
|
// must have ALL the given tags.
|
||||||
|
func nodeMetaFilter(filters map[string]string, nodes structs.CheckServiceNodes) structs.CheckServiceNodes {
|
||||||
|
var filtered structs.CheckServiceNodes
|
||||||
|
for _, node := range nodes {
|
||||||
|
if structs.SatisfiesMetaFilters(node.Node.Meta, filters) {
|
||||||
|
filtered = append(filtered, node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return filtered
|
||||||
|
}
|
||||||
|
|
||||||
// queryServer is a wrapper that makes it easier to test the failover logic.
|
// queryServer is a wrapper that makes it easier to test the failover logic.
|
||||||
type queryServer interface {
|
type queryServer interface {
|
||||||
GetLogger() *log.Logger
|
GetLogger() *log.Logger
|
||||||
|
@ -604,6 +604,17 @@ func TestPreparedQuery_parseQuery(t *testing.T) {
|
|||||||
if err := parseQuery(query, version8); err != nil {
|
if err := parseQuery(query, version8); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
query.Service.NodeMeta = map[string]string{"": "somevalue"}
|
||||||
|
err = parseQuery(query, version8)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "cannot be blank") {
|
||||||
|
t.Fatalf("bad: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
query.Service.NodeMeta = map[string]string{"somekey": "somevalue"}
|
||||||
|
if err := parseQuery(query, version8); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1482,6 +1493,10 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||||||
Datacenter: dc,
|
Datacenter: dc,
|
||||||
Node: fmt.Sprintf("node%d", i+1),
|
Node: fmt.Sprintf("node%d", i+1),
|
||||||
Address: fmt.Sprintf("127.0.0.%d", i+1),
|
Address: fmt.Sprintf("127.0.0.%d", i+1),
|
||||||
|
NodeMeta: map[string]string{
|
||||||
|
"group": fmt.Sprintf("%d", i/5),
|
||||||
|
"instance_type": "t2.micro",
|
||||||
|
},
|
||||||
Service: &structs.NodeService{
|
Service: &structs.NodeService{
|
||||||
Service: "foo",
|
Service: "foo",
|
||||||
Port: 8000,
|
Port: 8000,
|
||||||
@ -1489,6 +1504,9 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||||||
},
|
},
|
||||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||||
}
|
}
|
||||||
|
if i == 0 {
|
||||||
|
req.NodeMeta["unique"] = "true"
|
||||||
|
}
|
||||||
|
|
||||||
var codec rpc.ClientCodec
|
var codec rpc.ClientCodec
|
||||||
if dc == "dc1" {
|
if dc == "dc1" {
|
||||||
@ -1587,6 +1605,72 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Run various service queries with node metadata filters.
|
||||||
|
if false {
|
||||||
|
cases := []struct {
|
||||||
|
filters map[string]string
|
||||||
|
numNodes int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
filters: map[string]string{},
|
||||||
|
numNodes: 10,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
filters: map[string]string{"instance_type": "t2.micro"},
|
||||||
|
numNodes: 10,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
filters: map[string]string{"group": "1"},
|
||||||
|
numNodes: 5,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
filters: map[string]string{"group": "0", "unique": "true"},
|
||||||
|
numNodes: 1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range cases {
|
||||||
|
nodeMetaQuery := structs.PreparedQueryRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.PreparedQueryCreate,
|
||||||
|
Query: &structs.PreparedQuery{
|
||||||
|
Service: structs.ServiceQuery{
|
||||||
|
Service: "foo",
|
||||||
|
NodeMeta: tc.filters,
|
||||||
|
},
|
||||||
|
DNS: structs.QueryDNSOptions{
|
||||||
|
TTL: "10s",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||||
|
}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &nodeMetaQuery, &nodeMetaQuery.Query.ID); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: nodeMetaQuery.Query.ID,
|
||||||
|
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(reply.Nodes) != tc.numNodes {
|
||||||
|
t.Fatalf("bad: %v, %v", len(reply.Nodes), tc.numNodes)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, node := range reply.Nodes {
|
||||||
|
if !structs.SatisfiesMetaFilters(node.Node.Meta, tc.filters) {
|
||||||
|
t.Fatalf("bad: %v", node.Node.Meta)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Push a coordinate for one of the nodes so we can try an RTT sort. We
|
// Push a coordinate for one of the nodes so we can try an RTT sort. We
|
||||||
// have to sleep a little while for the coordinate batch to get flushed.
|
// have to sleep a little while for the coordinate batch to get flushed.
|
||||||
{
|
{
|
||||||
@ -1690,9 +1774,8 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||||
}
|
}
|
||||||
|
|
||||||
var reply structs.PreparedQueryExecuteResponse
|
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
@ -1725,10 +1808,9 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||||
}
|
}
|
||||||
|
|
||||||
var reply structs.PreparedQueryExecuteResponse
|
|
||||||
|
|
||||||
shuffled := false
|
shuffled := false
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
@ -1759,9 +1841,8 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||||
}
|
}
|
||||||
|
|
||||||
var reply structs.PreparedQueryExecuteResponse
|
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
@ -1792,9 +1873,8 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||||
}
|
}
|
||||||
|
|
||||||
var reply structs.PreparedQueryExecuteResponse
|
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
@ -1821,12 +1901,11 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||||
}
|
}
|
||||||
|
|
||||||
var reply structs.PreparedQueryExecuteResponse
|
|
||||||
|
|
||||||
// Expect the set to be shuffled since we have no coordinates
|
// Expect the set to be shuffled since we have no coordinates
|
||||||
// on the "foo" node.
|
// on the "foo" node.
|
||||||
shuffled := false
|
shuffled := false
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
@ -1861,10 +1940,9 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||||
}
|
}
|
||||||
|
|
||||||
var reply structs.PreparedQueryExecuteResponse
|
|
||||||
|
|
||||||
shuffled := false
|
shuffled := false
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -44,6 +44,11 @@ type ServiceQuery struct {
|
|||||||
// this list it must be present. If the tag is preceded with "!" then
|
// this list it must be present. If the tag is preceded with "!" then
|
||||||
// it is disallowed.
|
// it is disallowed.
|
||||||
Tags []string
|
Tags []string
|
||||||
|
|
||||||
|
// NodeMeta is a map of required node metadata fields. If a key/value
|
||||||
|
// pair is in this map it must be present on the node in order for the
|
||||||
|
// service entry to be returned.
|
||||||
|
NodeMeta map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -11,6 +11,8 @@ import (
|
|||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
"github.com/hashicorp/go-msgpack/codec"
|
"github.com/hashicorp/go-msgpack/codec"
|
||||||
"github.com/hashicorp/serf/coordinate"
|
"github.com/hashicorp/serf/coordinate"
|
||||||
|
"regexp"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -67,6 +69,25 @@ const (
|
|||||||
ServiceMaintPrefix = "_service_maintenance:"
|
ServiceMaintPrefix = "_service_maintenance:"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// The meta key prefix reserved for Consul's internal use
|
||||||
|
metaKeyReservedPrefix = "consul-"
|
||||||
|
|
||||||
|
// The maximum number of metadata key pairs allowed to be registered
|
||||||
|
metaMaxKeyPairs = 64
|
||||||
|
|
||||||
|
// The maximum allowed length of a metadata key
|
||||||
|
metaKeyMaxLength = 128
|
||||||
|
|
||||||
|
// The maximum allowed length of a metadata value
|
||||||
|
metaValueMaxLength = 512
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// metaKeyFormat checks if a metadata key string is valid
|
||||||
|
metaKeyFormat = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`).MatchString
|
||||||
|
)
|
||||||
|
|
||||||
func ValidStatus(s string) bool {
|
func ValidStatus(s string) bool {
|
||||||
return s == HealthPassing ||
|
return s == HealthPassing ||
|
||||||
s == HealthWarning ||
|
s == HealthWarning ||
|
||||||
@ -292,6 +313,41 @@ type Node struct {
|
|||||||
}
|
}
|
||||||
type Nodes []*Node
|
type Nodes []*Node
|
||||||
|
|
||||||
|
// ValidateMeta validates a set of key/value pairs from the agent config
|
||||||
|
func ValidateMetadata(meta map[string]string) error {
|
||||||
|
if len(meta) > metaMaxKeyPairs {
|
||||||
|
return fmt.Errorf("Node metadata cannot contain more than %d key/value pairs", metaMaxKeyPairs)
|
||||||
|
}
|
||||||
|
|
||||||
|
for key, value := range meta {
|
||||||
|
if err := validateMetaPair(key, value); err != nil {
|
||||||
|
return fmt.Errorf("Couldn't load metadata pair ('%s', '%s'): %s", key, value, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// validateMetaPair checks that the given key/value pair is in a valid format
|
||||||
|
func validateMetaPair(key, value string) error {
|
||||||
|
if key == "" {
|
||||||
|
return fmt.Errorf("Key cannot be blank")
|
||||||
|
}
|
||||||
|
if !metaKeyFormat(key) {
|
||||||
|
return fmt.Errorf("Key contains invalid characters")
|
||||||
|
}
|
||||||
|
if len(key) > metaKeyMaxLength {
|
||||||
|
return fmt.Errorf("Key is too long (limit: %d characters)", metaKeyMaxLength)
|
||||||
|
}
|
||||||
|
if strings.HasPrefix(key, metaKeyReservedPrefix) {
|
||||||
|
return fmt.Errorf("Key prefix '%s' is reserved for internal use", metaKeyReservedPrefix)
|
||||||
|
}
|
||||||
|
if len(value) > metaValueMaxLength {
|
||||||
|
return fmt.Errorf("Value is too long (limit: %d characters)", metaValueMaxLength)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// SatisfiesMetaFilters returns true if the metadata map contains the given filters
|
// SatisfiesMetaFilters returns true if the metadata map contains the given filters
|
||||||
func SatisfiesMetaFilters(meta map[string]string, filters map[string]string) bool {
|
func SatisfiesMetaFilters(meta map[string]string, filters map[string]string) bool {
|
||||||
for key, value := range filters {
|
for key, value := range filters {
|
||||||
|
@ -501,3 +501,66 @@ func TestStructs_DirEntry_Clone(t *testing.T) {
|
|||||||
t.Fatalf("clone wasn't independent of the original")
|
t.Fatalf("clone wasn't independent of the original")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStructs_ValidateMetadata(t *testing.T) {
|
||||||
|
// Load a valid set of key/value pairs
|
||||||
|
meta := map[string]string{
|
||||||
|
"key1": "value1",
|
||||||
|
"key2": "value2",
|
||||||
|
}
|
||||||
|
// Should succeed
|
||||||
|
if err := ValidateMetadata(meta); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should get error
|
||||||
|
meta = map[string]string{
|
||||||
|
"": "value1",
|
||||||
|
}
|
||||||
|
if err := ValidateMetadata(meta); !strings.Contains(err.Error(), "Couldn't load metadata pair") {
|
||||||
|
t.Fatalf("should have failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should get error
|
||||||
|
meta = make(map[string]string)
|
||||||
|
for i := 0; i < metaMaxKeyPairs+1; i++ {
|
||||||
|
meta[string(i)] = "value"
|
||||||
|
}
|
||||||
|
if err := ValidateMetadata(meta); !strings.Contains(err.Error(), "cannot contain more than") {
|
||||||
|
t.Fatalf("should have failed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStructs_validateMetaPair(t *testing.T) {
|
||||||
|
longKey := strings.Repeat("a", metaKeyMaxLength+1)
|
||||||
|
longValue := strings.Repeat("b", metaValueMaxLength+1)
|
||||||
|
pairs := []struct {
|
||||||
|
Key string
|
||||||
|
Value string
|
||||||
|
Error string
|
||||||
|
}{
|
||||||
|
// valid pair
|
||||||
|
{"key", "value", ""},
|
||||||
|
// invalid, blank key
|
||||||
|
{"", "value", "cannot be blank"},
|
||||||
|
// allowed special chars in key name
|
||||||
|
{"k_e-y", "value", ""},
|
||||||
|
// disallowed special chars in key name
|
||||||
|
{"(%key&)", "value", "invalid characters"},
|
||||||
|
// key too long
|
||||||
|
{longKey, "value", "Key is too long"},
|
||||||
|
// reserved prefix
|
||||||
|
{metaKeyReservedPrefix + "key", "value", "reserved for internal use"},
|
||||||
|
// value too long
|
||||||
|
{"key", longValue, "Value is too long"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, pair := range pairs {
|
||||||
|
err := validateMetaPair(pair.Key, pair.Value)
|
||||||
|
if pair.Error == "" && err != nil {
|
||||||
|
t.Fatalf("should have succeeded: %v, %v", pair, err)
|
||||||
|
} else if pair.Error != "" && !strings.Contains(err.Error(), pair.Error) {
|
||||||
|
t.Fatalf("should have failed: %v, %v", pair, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -78,7 +78,8 @@ query, like this example:
|
|||||||
},
|
},
|
||||||
"Near": "node1",
|
"Near": "node1",
|
||||||
"OnlyPassing": false,
|
"OnlyPassing": false,
|
||||||
"Tags": ["primary", "!experimental"]
|
"Tags": ["primary", "!experimental"],
|
||||||
|
"NodeMeta": {"instance_type": "m3.large"}
|
||||||
},
|
},
|
||||||
"DNS": {
|
"DNS": {
|
||||||
"TTL": "10s"
|
"TTL": "10s"
|
||||||
@ -162,6 +163,10 @@ to pass the tag filter it must have *all* of the required tags, and *none* of th
|
|||||||
excluded tags (prefixed with `!`). The default value is an empty list, which does
|
excluded tags (prefixed with `!`). The default value is an empty list, which does
|
||||||
no tag filtering.
|
no tag filtering.
|
||||||
|
|
||||||
|
`NodeMeta` provides a list of user-defined key/value pairs that will be used for
|
||||||
|
filtering the query results to nodes with the given metadata values present. This
|
||||||
|
was added in Consul 0.7.3.
|
||||||
|
|
||||||
`TTL` in the `DNS` structure is a duration string that can use `s` as a
|
`TTL` in the `DNS` structure is a duration string that can use `s` as a
|
||||||
suffix for seconds. It controls how the TTL is set when query results are served
|
suffix for seconds. It controls how the TTL is set when query results are served
|
||||||
over DNS. If this isn't specified, then the Consul agent configuration for the given
|
over DNS. If this isn't specified, then the Consul agent configuration for the given
|
||||||
@ -199,7 +204,8 @@ and features. Here's an example:
|
|||||||
"Datacenters": ["dc1", "dc2"]
|
"Datacenters": ["dc1", "dc2"]
|
||||||
},
|
},
|
||||||
"OnlyPassing": true,
|
"OnlyPassing": true,
|
||||||
"Tags": ["${match(2)}"]
|
"Tags": ["${match(2)}"],
|
||||||
|
"NodeMeta": {"instance_type": "m3.large"}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
@ -303,7 +309,8 @@ This returns a JSON list of prepared queries, which looks like:
|
|||||||
"Datacenters": ["dc1", "dc2"]
|
"Datacenters": ["dc1", "dc2"]
|
||||||
},
|
},
|
||||||
"OnlyPassing": false,
|
"OnlyPassing": false,
|
||||||
"Tags": ["primary", "!experimental"]
|
"Tags": ["primary", "!experimental"],
|
||||||
|
"NodeMeta": {"instance_type": "m3.large"}
|
||||||
},
|
},
|
||||||
"DNS": {
|
"DNS": {
|
||||||
"TTL": "10s"
|
"TTL": "10s"
|
||||||
@ -408,7 +415,8 @@ a JSON body will be returned like this:
|
|||||||
"TaggedAddresses": {
|
"TaggedAddresses": {
|
||||||
"lan": "10.1.10.12",
|
"lan": "10.1.10.12",
|
||||||
"wan": "10.1.10.12"
|
"wan": "10.1.10.12"
|
||||||
}
|
},
|
||||||
|
"NodeMeta": {"instance_type": "m3.large"}
|
||||||
},
|
},
|
||||||
"Service": {
|
"Service": {
|
||||||
"ID": "redis",
|
"ID": "redis",
|
||||||
@ -500,7 +508,8 @@ a JSON body will be returned like this:
|
|||||||
"Datacenters": ["dc1", "dc2"]
|
"Datacenters": ["dc1", "dc2"]
|
||||||
},
|
},
|
||||||
"OnlyPassing": true,
|
"OnlyPassing": true,
|
||||||
"Tags": ["primary"]
|
"Tags": ["primary"],
|
||||||
|
"NodeMeta": {"instance_type": "m3.large"}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
Loading…
x
Reference in New Issue
Block a user