diff --git a/command/agent/agent.go b/command/agent/agent.go index 9b8c5be095..4b71fcce99 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -42,11 +42,26 @@ const ( "but no reason was provided. This is a default message." defaultServiceMaintReason = "Maintenance mode is enabled for this " + "service, but no reason was provided. This is a default message." + + // The meta key prefix reserved for Consul's internal use + metaKeyReservedPrefix = "consul-" + + // The maximum number of metadata key pairs allowed to be registered + metaMaxKeyPairs = 64 + + // The maximum allowed length of a metadata key + metaKeyMaxLength = 128 + + // The maximum allowed length of a metadata value + metaValueMaxLength = 512 ) var ( // dnsNameRe checks if a name or tag is dns-compatible. dnsNameRe = regexp.MustCompile(`^[a-zA-Z0-9\-]+$`) + + // metaKeyFormat checks if a metadata key string is valid + metaKeyFormat = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`).MatchString ) /* @@ -1686,15 +1701,13 @@ func (a *Agent) loadMetadata(conf *Config) error { a.state.Lock() defer a.state.Unlock() + if len(conf.Meta) > metaMaxKeyPairs { + return fmt.Errorf("Node metadata cannot contain more than %d key/value pairs", metaMaxKeyPairs) + } + for key, value := range conf.Meta { - if key == "" { - return fmt.Errorf("Key name cannot be blank") - } - if strings.Contains(key, ":") { - return fmt.Errorf("Key name cannot contain ':' character: %s", key) - } - if strings.HasPrefix(key, "consul-") { - return fmt.Errorf("Key prefix 'consul-' is reserved for internal use") + if err := validateMetaPair(key, value); err != nil { + return fmt.Errorf("Couldn't load metadata pair ('%s', '%s'): %s", key, value, err) } a.state.metadata[key] = value } @@ -1704,6 +1717,29 @@ func (a *Agent) loadMetadata(conf *Config) error { return nil } +// validateMetaPair checks that the given key/value pair is in a valid format +func validateMetaPair(key, value string) error { + if key == "" { + return fmt.Errorf("Key cannot be blank") + } + if !metaKeyFormat(key) { + return fmt.Errorf("Key contains invalid characters") + } + if len(key) > metaKeyMaxLength { + return fmt.Errorf("Key is longer than %d chars", metaKeyMaxLength) + } + if strings.Contains(key, ":") { + return fmt.Errorf("Key contains ':' character") + } + if strings.HasPrefix(key, metaKeyReservedPrefix) { + return fmt.Errorf("Key prefix '%s' is reserved for internal use", metaKeyReservedPrefix) + } + if len(value) > metaValueMaxLength { + return fmt.Errorf("Value is longer than %d characters", metaValueMaxLength) + } + return nil +} + // unloadMetadata resets the local metadata state func (a *Agent) unloadMetadata() error { a.state.Lock() diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 23a9ea0065..42e3269f74 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -1862,12 +1862,13 @@ func TestAgent_metadata(t *testing.T) { "key1": "value1", "key2": "value2", } + // Should succeed if err := agent.loadMetadata(config); err != nil { t.Fatalf("err: %s", err) } agent.unloadMetadata() - // Should fail, keys can't be blank + // Should get error config.Meta = map[string]string{ "": "value1", } @@ -1876,23 +1877,48 @@ func TestAgent_metadata(t *testing.T) { } agent.unloadMetadata() - // Should fail, keys can't contain ':' - config.Meta = map[string]string{ - "key:123": "value1", + // Should get error + tooManyKeys := make(map[string]string) + for i := 0; i < metaMaxKeyPairs+1; i++ { + tooManyKeys[string(i)] = "value" } if err := agent.loadMetadata(config); err == nil { t.Fatalf("should have failed") } - agent.unloadMetadata() +} - // Should fail, keys can't begin with 'consul-' - config.Meta = map[string]string{ - "consul-key": "value1", +func TestAgent_validateMetaPair(t *testing.T) { + longKey := fmt.Sprintf(fmt.Sprintf("%%%ds", metaKeyMaxLength+1), "") + longValue := fmt.Sprintf(fmt.Sprintf("%%%ds", metaValueMaxLength+1), "") + pairs := []struct { + Key string + Value string + Success bool + }{ + // valid pair + {"key", "value", true}, + // invalid, blank key + {"", "value", false}, + // allowed special chars in key name + {"k_e-y", "value", true}, + // ':' in key name + {"k:ey", "value", false}, + // disallowed special chars in key name + {"(%key&)", "value", false}, + // key too long + {longKey, "value", false}, + // reserved prefix + {metaKeyReservedPrefix + "key", "value", false}, + // value too long + {"key", longValue, false}, } - if err := agent.loadMetadata(config); err == nil { - t.Fatalf("should have failed") + + for _, pair := range pairs { + err := validateMetaPair(pair.Key, pair.Value) + if pair.Success != (err == nil) { + t.Fatalf("bad: %v, %v", pair, err) + } } - agent.unloadMetadata() } func TestAgent_GetCoordinate(t *testing.T) { diff --git a/command/agent/catalog_endpoint.go b/command/agent/catalog_endpoint.go index 13d4250442..05b431a923 100644 --- a/command/agent/catalog_endpoint.go +++ b/command/agent/catalog_endpoint.go @@ -64,17 +64,10 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) ( // Setup the request args := structs.DCSpecificRequest{} s.parseSource(req, &args.Source) + s.parseMetaFilter(req, &args.NodeMetaKey, &args.NodeMetaValue) if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } - // Try to parse node metadata filter params - if filter, ok := req.URL.Query()["node-meta"]; ok && len(filter) > 0 { - pair := strings.SplitN(filter[0], ":", 2) - args.NodeMetaKey = pair[0] - if len(pair) == 2 { - args.NodeMetaValue = pair[1] - } - } var out structs.IndexedNodes defer setMeta(resp, &out.QueryMeta) @@ -93,6 +86,7 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) ( func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Set default DC args := structs.DCSpecificRequest{} + s.parseMetaFilter(req, &args.NodeMetaKey, &args.NodeMetaValue) if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } diff --git a/command/agent/catalog_endpoint_test.go b/command/agent/catalog_endpoint_test.go index 97f706d8ad..62cd3bd904 100644 --- a/command/agent/catalog_endpoint_test.go +++ b/command/agent/catalog_endpoint_test.go @@ -145,7 +145,7 @@ func TestCatalogNodes(t *testing.T) { } } -func TestCatalogNodes_metaFilter(t *testing.T) { +func TestCatalogNodes_MetaFilter(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) defer srv.Shutdown() @@ -496,6 +496,54 @@ func TestCatalogServices(t *testing.T) { } } +func TestCatalogServices_NodeMetaFilter(t *testing.T) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + testutil.WaitForLeader(t, srv.agent.RPC, "dc1") + + // Register node + args := &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + NodeMeta: map[string]string{ + "somekey": "somevalue", + }, + Service: &structs.NodeService{ + Service: "api", + }, + } + + var out struct{} + if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + req, err := http.NewRequest("GET", "/v1/catalog/services?node-meta=somekey:somevalue", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.CatalogServices(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + assertIndex(t, resp) + + services := obj.(structs.Services) + if len(services) != 1 { + t.Fatalf("bad: %v", obj) + } + if _, ok := services[args.Service.Service]; !ok { + t.Fatalf("bad: %v", services) + } +} + func TestCatalogServiceNodes(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) diff --git a/command/agent/http.go b/command/agent/http.go index 7070706f11..4c4a3ab2a2 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -584,6 +584,18 @@ func (s *HTTPServer) parseSource(req *http.Request, source *structs.QuerySource) } } +// parseMetaFilter is used to parse the ?node-meta=key:value query parameter, used for +// filtering results to nodes with the given metadata key/value +func (s *HTTPServer) parseMetaFilter(req *http.Request, key *string, value *string) { + if filter, ok := req.URL.Query()["node-meta"]; ok && len(filter) > 0 { + pair := strings.SplitN(filter[0], ":", 2) + *key = pair[0] + if len(pair) == 2 { + *value = pair[1] + } + } +} + // parse is a convenience method for endpoints that need // to use both parseWait and parseDC. func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.QueryOptions) bool { diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 237d1b826c..2887559ac3 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -196,7 +196,14 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I &reply.QueryMeta, state.GetQueryWatch("Services"), func() error { - index, services, err := state.Services() + var index uint64 + var services structs.Services + var err error + if args.NodeMetaKey != "" { + index, services, err = state.ServicesByNodeMeta(args.NodeMetaKey, args.NodeMetaValue) + } else { + index, services, err = state.Services() + } if err != nil { return err } diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index f0152eece0..604b4ee25e 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -1062,6 +1062,72 @@ func TestCatalog_ListServices(t *testing.T) { } } +func TestCatalog_ListServices_MetaFilter(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + // Filter by a specific meta k/v pair + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + NodeMetaKey: "somekey", + NodeMetaValue: "somevalue", + } + var out structs.IndexedServices + err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out) + if err != nil { + t.Fatalf("err: %v", err) + } + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Add a new node with the right meta k/v pair + node := &structs.Node{Node: "foo", Address: "127.0.0.1", Meta: map[string]string{"somekey": "somevalue"}} + if err := s1.fsm.State().EnsureNode(1, node); err != nil { + t.Fatalf("err: %v", err) + } + // Add a service to the new node + if err := s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil { + t.Fatalf("err: %v", err) + } + + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + if len(out.Services) != 1 { + t.Fatalf("bad: %v", out) + } + if out.Services["db"] == nil { + t.Fatalf("bad: %v", out.Services["db"]) + } + if len(out.Services["db"]) != 1 { + t.Fatalf("bad: %v", out) + } + if out.Services["db"][0] != "primary" { + t.Fatalf("bad: %v", out) + } + + // Now filter on a nonexistent meta k/v pair + args = structs.DCSpecificRequest{ + Datacenter: "dc1", + NodeMetaKey: "somekey", + NodeMetaValue: "invalid", + } + out = structs.IndexedServices{} + err = msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, &out) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Should get an empty list of nodes back + if len(out.Services) != 0 { + t.Fatalf("bad: %v", out.Services) + } +} + func TestCatalog_ListServices_Blocking(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 88f811e819..eb2fcb8cee 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -781,6 +781,56 @@ func (s *StateStore) Services() (uint64, structs.Services, error) { return idx, results, nil } +// Services returns all services, filtered by given node metadata. +func (s *StateStore) ServicesByNodeMeta(key, value string) (uint64, structs.Services, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the table index. + idx := maxIndexTxn(tx, s.getWatchTables("ServiceNodes")...) + + // Retrieve all of the nodes with the meta k/v pair + nodes, err := tx.Get("nodes", "meta", key, value) + if err != nil { + return 0, nil, fmt.Errorf("failed nodes lookup: %s", err) + } + + // Populate the services map + unique := make(map[string]map[string]struct{}) + for node := nodes.Next(); node != nil; node = nodes.Next() { + n := node.(*structs.Node) + // List all the services on the node + services, err := tx.Get("services", "node", n.Node) + if err != nil { + return 0, nil, fmt.Errorf("failed querying services: %s", err) + } + + // Rip through the services and enumerate them and their unique set of + // tags. + for service := services.Next(); service != nil; service = services.Next() { + svc := service.(*structs.ServiceNode) + tags, ok := unique[svc.ServiceName] + if !ok { + unique[svc.ServiceName] = make(map[string]struct{}) + tags = unique[svc.ServiceName] + } + for _, tag := range svc.ServiceTags { + tags[tag] = struct{}{} + } + } + } + + // Generate the output structure. + var results = make(structs.Services) + for service, tags := range unique { + results[service] = make([]string, 0) + for tag, _ := range tags { + results[service] = append(results[service], tag) + } + } + return idx, results, nil +} + // ServiceNodes returns the nodes associated with a given service name. func (s *StateStore) ServiceNodes(serviceName string) (uint64, structs.ServiceNodes, error) { tx := s.db.Txn(false) diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index d932b1b24d..3277cb391a 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -775,10 +775,13 @@ func TestStateStore_GetNodesByMeta(t *testing.T) { } // Retrieve the node with role=client - _, nodes, err := s.NodesByMeta("role", "client") + idx, nodes, err := s.NodesByMeta("role", "client") if err != nil { t.Fatalf("err: %s", err) } + if idx != 1 { + t.Fatalf("bad index: %d", idx) + } // Only one node was returned if n := len(nodes); n != 1 { @@ -796,33 +799,14 @@ func TestStateStore_GetNodesByMeta(t *testing.T) { t.Fatalf("bad: %v != %v", nodes[0].Meta, node0.Meta) } - // Retrieve the node with role=server - _, nodes, err = s.NodesByMeta("role", "server") - if err != nil { - t.Fatalf("err: %s", err) - } - - // Only one node was returned - if n := len(nodes); n != 1 { - t.Fatalf("bad node count: %d", n) - } - - // Make sure the node is correct - if nodes[0].CreateIndex != 1 || nodes[0].ModifyIndex != 1 { - t.Fatalf("bad node index: %d, %d", nodes[0].CreateIndex, nodes[0].ModifyIndex) - } - if nodes[0].Node != "node1" { - t.Fatalf("bad: %#v", nodes[0]) - } - if !reflect.DeepEqual(nodes[0].Meta, node1.Meta) { - t.Fatalf("bad: %v != %v", nodes[0].Meta, node1.Meta) - } - // Retrieve both nodes via their common meta field - _, nodes, err = s.NodesByMeta("common", "1") + idx, nodes, err = s.NodesByMeta("common", "1") if err != nil { t.Fatalf("err: %s", err) } + if idx != 1 { + t.Fatalf("bad index: %d", idx) + } // All nodes were returned if n := len(nodes); n != 2 { @@ -1172,6 +1156,78 @@ func TestStateStore_Services(t *testing.T) { } } +func TestStateStore_ServicesByNodeMeta(t *testing.T) { + s := testStateStore(t) + + // Listing with no results returns nil + idx, res, err := s.ServicesByNodeMeta("somekey", "somevalue") + if idx != 0 || len(res) != 0 || err != nil { + t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) + } + + // Create some nodes and services in the state store + node0 := &structs.Node{Node: "node0", Address: "127.0.0.1", Meta: map[string]string{"role": "client", "common": "1"}} + if err := s.EnsureNode(0, node0); err != nil { + t.Fatalf("err: %v", err) + } + node1 := &structs.Node{Node: "node1", Address: "127.0.0.1", Meta: map[string]string{"role": "server", "common": "1"}} + if err := s.EnsureNode(1, node1); err != nil { + t.Fatalf("err: %v", err) + } + ns1 := &structs.NodeService{ + ID: "service1", + Service: "redis", + Tags: []string{"prod", "master"}, + Address: "1.1.1.1", + Port: 1111, + } + if err := s.EnsureService(2, "node0", ns1); err != nil { + t.Fatalf("err: %s", err) + } + ns2 := &structs.NodeService{ + ID: "service1", + Service: "redis", + Tags: []string{"prod", "slave"}, + Address: "1.1.1.1", + Port: 1111, + } + if err := s.EnsureService(3, "node1", ns2); err != nil { + t.Fatalf("err: %s", err) + } + + // Filter the services by the first node's meta value + idx, res, err = s.ServicesByNodeMeta("role", "client") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 3 { + t.Fatalf("bad index: %d", idx) + } + expected := structs.Services{ + "redis": []string{"prod", "master"}, + } + if !reflect.DeepEqual(res, expected) { + t.Fatalf("bad: %v %v", res, expected) + } + + // Get all services using the common meta value + idx, res, err = s.ServicesByNodeMeta("common", "1") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 3 { + t.Fatalf("bad index: %d", idx) + } + expected = structs.Services{ + "redis": []string{"prod", "master", "slave"}, + } + sort.Strings(res["redis"]) + sort.Strings(expected["redis"]) + if !reflect.DeepEqual(res, expected) { + t.Fatalf("bad: %v %v", res, expected) + } +} + func TestStateStore_ServiceNodes(t *testing.T) { s := testStateStore(t) diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 97030f285d..bb4f6472df 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -292,8 +292,8 @@ type Nodes []*Node // Maps service name to available tags type Services map[string][]string -// ServiceNode represents a node that is part of a service. Address and -// TaggedAddresses are node-related fields that are always empty in the state +// ServiceNode represents a node that is part of a service. Address, TaggedAddresses, +// and NodeMeta are node-related fields that are always empty in the state // store and are filled in on the way out by parseServiceNodes(). This is also // why PartialClone() skips them, because we know they are blank already so it // would be a waste of time to copy them. diff --git a/consul/structs/structs_test.go b/consul/structs/structs_test.go index a66d4b176e..13714e993e 100644 --- a/consul/structs/structs_test.go +++ b/consul/structs/structs_test.go @@ -110,12 +110,18 @@ func TestStructs_RegisterRequest_ChangesNode(t *testing.T) { Node: "test", Address: "127.0.0.1", TaggedAddresses: make(map[string]string), + NodeMeta: map[string]string{ + "role": "server", + }, } node := &Node{ Node: "test", Address: "127.0.0.1", TaggedAddresses: make(map[string]string), + Meta: map[string]string{ + "role": "server", + }, } check := func(twiddle, restore func()) { @@ -137,6 +143,7 @@ func TestStructs_RegisterRequest_ChangesNode(t *testing.T) { check(func() { req.Node = "nope" }, func() { req.Node = "test" }) check(func() { req.Address = "127.0.0.2" }, func() { req.Address = "127.0.0.1" }) check(func() { req.TaggedAddresses["wan"] = "nope" }, func() { delete(req.TaggedAddresses, "wan") }) + check(func() { req.NodeMeta["invalid"] = "nope" }, func() { delete(req.NodeMeta, "invalid")}) if !req.ChangesNode(nil) { t.Fatalf("should change")