From 5acd69b4fc43f943739ad98d4490160f9f574d74 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Fri, 13 Jan 2017 20:08:43 -0500 Subject: [PATCH] Add node metadata filtering to remaining health/catalog endpoints --- api/catalog_test.go | 30 ++ api/health_test.go | 97 +++++ command/agent/catalog_endpoint.go | 1 + command/agent/catalog_endpoint_test.go | 66 ++++ command/agent/health_endpoint.go | 3 + command/agent/health_endpoint_test.go | 169 +++++++++ consul/catalog_endpoint.go | 9 + consul/catalog_endpoint_test.go | 104 +++++- consul/health_endpoint.go | 27 +- consul/health_endpoint_test.go | 330 ++++++++++++++++++ consul/state/catalog.go | 67 ++++ consul/state/catalog_test.go | 140 ++++++++ consul/state/state_store.go | 2 + consul/state/state_store_test.go | 6 +- consul/structs/structs.go | 27 +- .../docs/agent/http/catalog.html.markdown | 5 + .../docs/agent/http/health.html.markdown | 15 + 17 files changed, 1085 insertions(+), 13 deletions(-) diff --git a/api/catalog_test.go b/api/catalog_test.go index 669e861132..ed70e38586 100644 --- a/api/catalog_test.go +++ b/api/catalog_test.go @@ -222,6 +222,36 @@ func TestCatalog_Service(t *testing.T) { }) } +func TestCatalog_Service_NodeMetaFilter(t *testing.T) { + t.Parallel() + meta := map[string]string{"somekey": "somevalue"} + c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) { + conf.NodeMeta = meta + }) + defer s.Stop() + + catalog := c.Catalog() + + testutil.WaitForResult(func() (bool, error) { + services, meta, err := catalog.Service("consul", "", &QueryOptions{NodeMeta: meta}) + if err != nil { + return false, err + } + + if meta.LastIndex == 0 { + return false, fmt.Errorf("Bad: %v", meta) + } + + if len(services) == 0 { + return false, fmt.Errorf("Bad: %v", services) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) +} + func TestCatalog_Node(t *testing.T) { t.Parallel() c, s := makeClient(t) diff --git a/api/health_test.go b/api/health_test.go index e971d8023b..4140a2811a 100644 --- a/api/health_test.go +++ b/api/health_test.go @@ -208,6 +208,46 @@ func TestHealth_Checks(t *testing.T) { }) } +func TestHealth_Checks_NodeMetaFilter(t *testing.T) { + t.Parallel() + meta := map[string]string{"somekey": "somevalue"} + c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) { + conf.NodeMeta = meta + }) + defer s.Stop() + + agent := c.Agent() + health := c.Health() + + // Make a service with a check + reg := &AgentServiceRegistration{ + Name: "foo", + Check: &AgentServiceCheck{ + TTL: "15s", + }, + } + if err := agent.ServiceRegister(reg); err != nil { + t.Fatalf("err: %v", err) + } + defer agent.ServiceDeregister("foo") + + testutil.WaitForResult(func() (bool, error) { + checks, meta, err := health.Checks("foo", &QueryOptions{NodeMeta: meta}) + if err != nil { + return false, err + } + if meta.LastIndex == 0 { + return false, fmt.Errorf("bad: %v", meta) + } + if len(checks) == 0 { + return false, fmt.Errorf("Bad: %v", checks) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) +} + func TestHealth_Service(t *testing.T) { c, s := makeClient(t) defer s.Stop() @@ -235,6 +275,36 @@ func TestHealth_Service(t *testing.T) { }) } +func TestHealth_Service_NodeMetaFilter(t *testing.T) { + meta := map[string]string{"somekey": "somevalue"} + c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) { + conf.NodeMeta = meta + }) + defer s.Stop() + + health := c.Health() + + testutil.WaitForResult(func() (bool, error) { + // consul service should always exist... + checks, meta, err := health.Service("consul", "", true, &QueryOptions{NodeMeta: meta}) + if err != nil { + return false, err + } + if meta.LastIndex == 0 { + return false, fmt.Errorf("bad: %v", meta) + } + if len(checks) == 0 { + return false, fmt.Errorf("Bad: %v", checks) + } + if _, ok := checks[0].Node.TaggedAddresses["wan"]; !ok { + return false, fmt.Errorf("Bad: %v", checks[0].Node) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) +} + func TestHealth_State(t *testing.T) { t.Parallel() c, s := makeClient(t) @@ -258,3 +328,30 @@ func TestHealth_State(t *testing.T) { t.Fatalf("err: %s", err) }) } + +func TestHealth_State_NodeMetaFilter(t *testing.T) { + t.Parallel() + meta := map[string]string{"somekey": "somevalue"} + c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) { + conf.NodeMeta = meta + }) + defer s.Stop() + + health := c.Health() + + testutil.WaitForResult(func() (bool, error) { + checks, meta, err := health.State("any", &QueryOptions{NodeMeta: meta}) + if err != nil { + return false, err + } + if meta.LastIndex == 0 { + return false, fmt.Errorf("bad: %v", meta) + } + if len(checks) == 0 { + return false, fmt.Errorf("Bad: %v", checks) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) +} diff --git a/command/agent/catalog_endpoint.go b/command/agent/catalog_endpoint.go index 913b3af1ef..4c7f59db97 100644 --- a/command/agent/catalog_endpoint.go +++ b/command/agent/catalog_endpoint.go @@ -103,6 +103,7 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req // Set default DC args := structs.ServiceSpecificRequest{} s.parseSource(req, &args.Source) + args.NodeMetaFilters = s.parseMetaFilter(req) if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } diff --git a/command/agent/catalog_endpoint_test.go b/command/agent/catalog_endpoint_test.go index 62cd3bd904..d0d636fc85 100644 --- a/command/agent/catalog_endpoint_test.go +++ b/command/agent/catalog_endpoint_test.go @@ -608,6 +608,72 @@ func TestCatalogServiceNodes(t *testing.T) { } } +func TestCatalogServiceNodes_NodeMetaFilter(t *testing.T) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + testutil.WaitForLeader(t, srv.agent.RPC, "dc1") + + // Make sure an empty list is returned, not a nil + { + req, err := http.NewRequest("GET", "/v1/catalog/service/api?node-meta=somekey:somevalue", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.CatalogServiceNodes(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + assertIndex(t, resp) + + nodes := obj.(structs.ServiceNodes) + if nodes == nil || len(nodes) != 0 { + t.Fatalf("bad: %v", obj) + } + } + + // Register node + args := &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + NodeMeta: map[string]string{ + "somekey": "somevalue", + }, + Service: &structs.NodeService{ + Service: "api", + }, + } + + var out struct{} + if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + req, err := http.NewRequest("GET", "/v1/catalog/service/api?node-meta=somekey:somevalue", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.CatalogServiceNodes(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + assertIndex(t, resp) + + nodes := obj.(structs.ServiceNodes) + if len(nodes) != 1 { + t.Fatalf("bad: %v", obj) + } +} + func TestCatalogServiceNodes_WanTranslation(t *testing.T) { dir1, srv1 := makeHTTPServerWithConfig(t, func(c *Config) { diff --git a/command/agent/health_endpoint.go b/command/agent/health_endpoint.go index eeb26edbe4..f201c3bc19 100644 --- a/command/agent/health_endpoint.go +++ b/command/agent/health_endpoint.go @@ -11,6 +11,7 @@ func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Req // Set default DC args := structs.ChecksInStateRequest{} s.parseSource(req, &args.Source) + args.NodeMetaFilters = s.parseMetaFilter(req) if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } @@ -70,6 +71,7 @@ func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Req // Set default DC args := structs.ServiceSpecificRequest{} s.parseSource(req, &args.Source) + args.NodeMetaFilters = s.parseMetaFilter(req) if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } @@ -100,6 +102,7 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ // Set default DC args := structs.ServiceSpecificRequest{} s.parseSource(req, &args.Source) + args.NodeMetaFilters = s.parseMetaFilter(req) if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } diff --git a/command/agent/health_endpoint_test.go b/command/agent/health_endpoint_test.go index 02bf3faa64..a0f2a62e6e 100644 --- a/command/agent/health_endpoint_test.go +++ b/command/agent/health_endpoint_test.go @@ -65,6 +65,49 @@ func TestHealthChecksInState(t *testing.T) { }) } +func TestHealthChecksInState_NodeMetaFilter(t *testing.T) { + httpTest(t, func(srv *HTTPServer) { + args := &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "bar", + Address: "127.0.0.1", + NodeMeta: map[string]string{"somekey": "somevalue"}, + Check: &structs.HealthCheck{ + Node: "bar", + Name: "node check", + Status: structs.HealthCritical, + }, + } + var out struct{} + if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + req, err := http.NewRequest("GET", "/v1/health/state/critical?node-meta=somekey:somevalue", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + testutil.WaitForResult(func() (bool, error) { + resp := httptest.NewRecorder() + obj, err := srv.HealthChecksInState(resp, req) + if err != nil { + return false, err + } + if err := checkIndex(resp); err != nil { + return false, err + } + + // Should be 1 health check for the server + nodes := obj.(structs.HealthChecks) + if len(nodes) != 1 { + return false, fmt.Errorf("bad: %v", obj) + } + return true, nil + }, func(err error) { t.Fatalf("err: %v", err) }) + }) +} + func TestHealthChecksInState_DistanceSort(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) @@ -258,6 +301,69 @@ func TestHealthServiceChecks(t *testing.T) { } } +func TestHealthServiceChecks_NodeMetaFilter(t *testing.T) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + testutil.WaitForLeader(t, srv.agent.RPC, "dc1") + + req, err := http.NewRequest("GET", "/v1/health/checks/consul?dc=dc1&node-meta=somekey:somevalue", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.HealthServiceChecks(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + assertIndex(t, resp) + + // Should be a non-nil empty list + nodes := obj.(structs.HealthChecks) + if nodes == nil || len(nodes) != 0 { + t.Fatalf("bad: %v", obj) + } + + // Create a service check + args := &structs.RegisterRequest{ + Datacenter: "dc1", + Node: srv.agent.config.NodeName, + Address: "127.0.0.1", + NodeMeta: map[string]string{"somekey": "somevalue"}, + Check: &structs.HealthCheck{ + Node: srv.agent.config.NodeName, + Name: "consul check", + ServiceID: "consul", + }, + } + + var out struct{} + if err = srv.agent.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + req, err = http.NewRequest("GET", "/v1/health/checks/consul?dc=dc1&node-meta=somekey:somevalue", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp = httptest.NewRecorder() + obj, err = srv.HealthServiceChecks(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + assertIndex(t, resp) + + // Should be 1 health check for consul + nodes = obj.(structs.HealthChecks) + if len(nodes) != 1 { + t.Fatalf("bad: %v", obj) + } +} + func TestHealthServiceChecks_DistanceSort(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) @@ -429,6 +535,69 @@ func TestHealthServiceNodes(t *testing.T) { } } +func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + testutil.WaitForLeader(t, srv.agent.RPC, "dc1") + + req, err := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1&node-meta=somekey:somevalue", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp := httptest.NewRecorder() + obj, err := srv.HealthServiceNodes(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + assertIndex(t, resp) + + // Should be a non-nil empty list + nodes := obj.(structs.CheckServiceNodes) + if nodes == nil || len(nodes) != 0 { + t.Fatalf("bad: %v", obj) + } + + args := &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "bar", + Address: "127.0.0.1", + NodeMeta: map[string]string{"somekey": "somevalue"}, + Service: &structs.NodeService{ + ID: "test", + Service: "test", + }, + } + + var out struct{} + if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + req, err = http.NewRequest("GET", "/v1/health/service/test?dc=dc1&node-meta=somekey:somevalue", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + resp = httptest.NewRecorder() + obj, err = srv.HealthServiceNodes(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + assertIndex(t, resp) + + // Should be a non-nil empty list for checks + nodes = obj.(structs.CheckServiceNodes) + if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 { + t.Fatalf("bad: %v", obj) + } +} + func TestHealthServiceNodes_DistanceSort(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index c19082a8f5..55c28c0045 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -243,6 +243,15 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru return err } reply.Index, reply.ServiceNodes = index, services + if len(args.NodeMetaFilters) > 0 { + var filtered structs.ServiceNodes + for _, service := range services { + if structs.SatisfiesMetaFilters(service.NodeMeta, args.NodeMetaFilters) { + filtered = append(filtered, service) + } + } + reply.ServiceNodes = filtered + } if err := c.srv.filterACL(args.Token, reply); err != nil { return err } diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index b983eb59a2..6749d5b203 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -592,7 +592,7 @@ func TestCatalog_ListNodes(t *testing.T) { } } -func TestCatalog_ListNodes_MetaFilter(t *testing.T) { +func TestCatalog_ListNodes_NodeMetaFilter(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -1060,7 +1060,7 @@ func TestCatalog_ListServices(t *testing.T) { } } -func TestCatalog_ListServices_MetaFilter(t *testing.T) { +func TestCatalog_ListServices_NodeMetaFilter(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -1308,6 +1308,106 @@ func TestCatalog_ListServiceNodes(t *testing.T) { } } +func TestCatalog_ListServiceNodes_NodeMetaFilter(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Add 2 nodes with specific meta maps + node := &structs.Node{Node: "foo", Address: "127.0.0.1", Meta: map[string]string{"somekey": "somevalue", "common": "1"}} + if err := s1.fsm.State().EnsureNode(1, node); err != nil { + t.Fatalf("err: %v", err) + } + node2 := &structs.Node{Node: "bar", Address: "127.0.0.2", Meta: map[string]string{"common": "1"}} + if err := s1.fsm.State().EnsureNode(2, node2); err != nil { + t.Fatalf("err: %v", err) + } + if err := s1.fsm.State().EnsureService(3, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil { + t.Fatalf("err: %v", err) + } + if err := s1.fsm.State().EnsureService(4, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"secondary"}, Address: "127.0.0.2", Port: 5000}); err != nil { + t.Fatalf("err: %v", err) + } + + cases := []struct { + filters map[string]string + tag string + services structs.ServiceNodes + }{ + // Basic meta filter + { + filters: map[string]string{"somekey": "somevalue"}, + services: structs.ServiceNodes{&structs.ServiceNode{Node: "foo", ServiceID: "db"}}, + }, + // Basic meta filter, tag + { + filters: map[string]string{"somekey": "somevalue"}, + tag: "primary", + services: structs.ServiceNodes{&structs.ServiceNode{Node: "foo", ServiceID: "db"}}, + }, + // Common meta filter + { + filters: map[string]string{"common": "1"}, + services: structs.ServiceNodes{ + &structs.ServiceNode{Node: "bar", ServiceID: "db2"}, + &structs.ServiceNode{Node: "foo", ServiceID: "db"}, + }, + }, + // Common meta filter, tag + { + filters: map[string]string{"common": "1"}, + tag: "secondary", + services: structs.ServiceNodes{ + &structs.ServiceNode{Node: "bar", ServiceID: "db2"}, + }, + }, + // Invalid meta filter + { + filters: map[string]string{"invalid": "nope"}, + services: structs.ServiceNodes{}, + }, + // Multiple filter values + { + filters: map[string]string{"somekey": "somevalue", "common": "1"}, + services: structs.ServiceNodes{&structs.ServiceNode{Node: "foo", ServiceID: "db"}}, + }, + // Multiple filter values, tag + { + filters: map[string]string{"somekey": "somevalue", "common": "1"}, + tag: "primary", + services: structs.ServiceNodes{&structs.ServiceNode{Node: "foo", ServiceID: "db"}}, + }, + } + + for _, tc := range cases { + args := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + NodeMetaFilters: tc.filters, + ServiceName: "db", + ServiceTag: tc.tag, + TagFilter: tc.tag != "", + } + var out structs.IndexedServiceNodes + if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + if len(out.ServiceNodes) != len(tc.services) { + t.Fatalf("bad: %v", out) + } + + for i, serviceNode := range out.ServiceNodes { + if serviceNode.Node != tc.services[i].Node || serviceNode.ServiceID != tc.services[i].ServiceID { + t.Fatalf("bad: %v, %v filters: %v", serviceNode, tc.services[i], tc.filters) + } + } + } +} + func TestCatalog_ListServiceNodes_DistanceSort(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) diff --git a/consul/health_endpoint.go b/consul/health_endpoint.go index e5aa5fec24..242309b290 100644 --- a/consul/health_endpoint.go +++ b/consul/health_endpoint.go @@ -25,7 +25,14 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, &reply.QueryMeta, state.GetQueryWatch("ChecksInState"), func() error { - index, checks, err := state.ChecksInState(args.State) + var index uint64 + var checks structs.HealthChecks + var err error + if len(args.NodeMetaFilters) > 0 { + index, checks, err = state.ChecksInStateByNodeMeta(args.State, args.NodeMetaFilters) + } else { + index, checks, err = state.ChecksInState(args.State) + } if err != nil { return err } @@ -80,7 +87,14 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, &reply.QueryMeta, state.GetQueryWatch("ServiceChecks"), func() error { - index, checks, err := state.ServiceChecks(args.ServiceName) + var index uint64 + var checks structs.HealthChecks + var err error + if len(args.NodeMetaFilters) > 0 { + index, checks, err = state.ServiceChecksByNodeMeta(args.ServiceName, args.NodeMetaFilters) + } else { + index, checks, err = state.ServiceChecks(args.ServiceName) + } if err != nil { return err } @@ -123,6 +137,15 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc } reply.Index, reply.Nodes = index, nodes + if len(args.NodeMetaFilters) > 0 { + var filtered structs.CheckServiceNodes + for _, node := range nodes { + if structs.SatisfiesMetaFilters(node.Node.Meta, args.NodeMetaFilters) { + filtered = append(filtered, node) + } + } + reply.Nodes = filtered + } if err := h.srv.filterACL(args.Token, reply); err != nil { return err } diff --git a/consul/health_endpoint_test.go b/consul/health_endpoint_test.go index b5db5b9f21..dec7f61a26 100644 --- a/consul/health_endpoint_test.go +++ b/consul/health_endpoint_test.go @@ -57,6 +57,101 @@ func TestHealth_ChecksInState(t *testing.T) { } } +func TestHealth_ChecksInState_NodeMetaFilter(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + arg := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + NodeMeta: map[string]string{ + "somekey": "somevalue", + "common": "1", + }, + Check: &structs.HealthCheck{ + Name: "memory utilization", + Status: structs.HealthPassing, + }, + } + var out struct{} + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + arg = structs.RegisterRequest{ + Datacenter: "dc1", + Node: "bar", + Address: "127.0.0.2", + NodeMeta: map[string]string{ + "common": "1", + }, + Check: &structs.HealthCheck{ + Name: "disk space", + Status: structs.HealthPassing, + }, + } + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + cases := []struct { + filters map[string]string + checkNames []string + }{ + // Get foo's check by its unique meta value + { + filters: map[string]string{"somekey": "somevalue"}, + checkNames: []string{"memory utilization"}, + }, + // Get both foo/bar's checks by their common meta value + { + filters: map[string]string{"common": "1"}, + checkNames: []string{"disk space", "memory utilization"}, + }, + // Use an invalid meta value, should get empty result + { + filters: map[string]string{"invalid": "nope"}, + checkNames: []string{}, + }, + // Use multiple filters to get foo's check + { + filters: map[string]string{ + "somekey": "somevalue", + "common": "1", + }, + checkNames: []string{"memory utilization"}, + }, + } + + for _, tc := range cases { + var out structs.IndexedHealthChecks + inState := structs.ChecksInStateRequest{ + Datacenter: "dc1", + NodeMetaFilters: tc.filters, + State: structs.HealthPassing, + } + if err := msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &inState, &out); err != nil { + t.Fatalf("err: %v", err) + } + + checks := out.HealthChecks + if len(checks) != len(tc.checkNames) { + t.Fatalf("Bad: %v, %v", checks, tc.checkNames) + } + + for i, check := range checks { + if tc.checkNames[i] != check.Name { + t.Fatalf("Bad: %v %v", checks, tc.checkNames) + } + } + } +} + func TestHealth_ChecksInState_DistanceSort(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) @@ -221,6 +316,111 @@ func TestHealth_ServiceChecks(t *testing.T) { } } +func TestHealth_ServiceChecks_NodeMetaFilter(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + arg := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + NodeMeta: map[string]string{ + "somekey": "somevalue", + "common": "1", + }, + Service: &structs.NodeService{ + ID: "db", + Service: "db", + }, + Check: &structs.HealthCheck{ + Name: "memory utilization", + Status: structs.HealthPassing, + ServiceID: "db", + }, + } + var out struct{} + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + arg = structs.RegisterRequest{ + Datacenter: "dc1", + Node: "bar", + Address: "127.0.0.2", + NodeMeta: map[string]string{ + "common": "1", + }, + Service: &structs.NodeService{ + ID: "db", + Service: "db", + }, + Check: &structs.HealthCheck{ + Name: "disk space", + Status: structs.HealthPassing, + ServiceID: "db", + }, + } + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + cases := []struct { + filters map[string]string + checkNames []string + }{ + // Get foo's check by its unique meta value + { + filters: map[string]string{"somekey": "somevalue"}, + checkNames: []string{"memory utilization"}, + }, + // Get both foo/bar's checks by their common meta value + { + filters: map[string]string{"common": "1"}, + checkNames: []string{"disk space", "memory utilization"}, + }, + // Use an invalid meta value, should get empty result + { + filters: map[string]string{"invalid": "nope"}, + checkNames: []string{}, + }, + // Use multiple filters to get foo's check + { + filters: map[string]string{ + "somekey": "somevalue", + "common": "1", + }, + checkNames: []string{"memory utilization"}, + }, + } + + for _, tc := range cases { + var out structs.IndexedHealthChecks + inState := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + NodeMetaFilters: tc.filters, + ServiceName: "db", + } + if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceChecks", &inState, &out); err != nil { + t.Fatalf("err: %v", err) + } + + checks := out.HealthChecks + if len(checks) != len(tc.checkNames) { + t.Fatalf("Bad: %v, %v", checks, tc.checkNames) + } + + for i, check := range checks { + if tc.checkNames[i] != check.Name { + t.Fatalf("Bad: %v %v", checks, tc.checkNames) + } + } + } +} + func TestHealth_ServiceChecks_DistanceSort(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) @@ -392,6 +592,136 @@ func TestHealth_ServiceNodes(t *testing.T) { } } +func TestHealth_ServiceNodes_NodeMetaFilter(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + arg := structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + NodeMeta: map[string]string{ + "somekey": "somevalue", + "common": "1", + }, + Service: &structs.NodeService{ + ID: "db", + Service: "db", + }, + Check: &structs.HealthCheck{ + Name: "memory utilization", + Status: structs.HealthPassing, + ServiceID: "db", + }, + } + var out struct{} + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + arg = structs.RegisterRequest{ + Datacenter: "dc1", + Node: "bar", + Address: "127.0.0.2", + NodeMeta: map[string]string{ + "common": "1", + }, + Service: &structs.NodeService{ + ID: "db", + Service: "db", + }, + Check: &structs.HealthCheck{ + Name: "disk space", + Status: structs.HealthWarning, + ServiceID: "db", + }, + } + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil { + t.Fatalf("err: %v", err) + } + + cases := []struct { + filters map[string]string + nodes structs.CheckServiceNodes + }{ + // Get foo's check by its unique meta value + { + filters: map[string]string{"somekey": "somevalue"}, + nodes: structs.CheckServiceNodes{ + structs.CheckServiceNode{ + Node: &structs.Node{Node: "foo"}, + Checks: structs.HealthChecks{&structs.HealthCheck{Name: "memory utilization"}}, + }, + }, + }, + // Get both foo/bar's checks by their common meta value + { + filters: map[string]string{"common": "1"}, + nodes: structs.CheckServiceNodes{ + structs.CheckServiceNode{ + Node: &structs.Node{Node: "bar"}, + Checks: structs.HealthChecks{&structs.HealthCheck{Name: "disk space"}}, + }, + structs.CheckServiceNode{ + Node: &structs.Node{Node: "foo"}, + Checks: structs.HealthChecks{&structs.HealthCheck{Name: "memory utilization"}}, + }, + }, + }, + // Use an invalid meta value, should get empty result + { + filters: map[string]string{"invalid": "nope"}, + nodes: structs.CheckServiceNodes{}, + }, + // Use multiple filters to get foo's check + { + filters: map[string]string{ + "somekey": "somevalue", + "common": "1", + }, + nodes: structs.CheckServiceNodes{ + structs.CheckServiceNode{ + Node: &structs.Node{Node: "foo"}, + Checks: structs.HealthChecks{&structs.HealthCheck{Name: "memory utilization"}}, + }, + }, + }, + } + + for _, tc := range cases { + var out structs.IndexedCheckServiceNodes + req := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + NodeMetaFilters: tc.filters, + ServiceName: "db", + } + if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out); err != nil { + t.Fatalf("err: %v", err) + } + + if len(out.Nodes) != len(tc.nodes) { + t.Fatalf("bad: %v, %v, filters: %v", out.Nodes, tc.nodes, tc.filters) + } + + for i, node := range out.Nodes { + checks := tc.nodes[i].Checks + if len(node.Checks) != len(checks) { + t.Fatalf("bad: %v, %v, filters: %v", node.Checks, checks, tc.filters) + } + for j, check := range node.Checks { + if check.Name != checks[j].Name { + t.Fatalf("bad: %v, %v, filters: %v", check, checks[j], tc.filters) + } + } + } + } +} + func TestHealth_ServiceNodes_DistanceSort(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) diff --git a/consul/state/catalog.go b/consul/state/catalog.go index 56191dc91f..b5bcbd780d 100644 --- a/consul/state/catalog.go +++ b/consul/state/catalog.go @@ -877,6 +877,24 @@ func (s *StateStore) ServiceChecks(serviceName string) (uint64, structs.HealthCh return s.parseChecks(idx, checks) } +// ServiceChecksByNodeMeta is used to get all checks associated with a +// given service ID. The query is performed against a service +// _name_ instead of a service ID. +func (s *StateStore) ServiceChecksByNodeMeta(serviceName string, filters map[string]string) (uint64, structs.HealthChecks, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the table index. + idx := maxIndexTxn(tx, s.getWatchTables("ServiceChecksByNodeMeta")...) + + // Return the checks. + checks, err := tx.Get("checks", "service", serviceName) + if err != nil { + return 0, nil, fmt.Errorf("failed check lookup: %s", err) + } + return s.parseChecksByNodeMeta(idx, checks, tx, filters) +} + // ChecksInState is used to query the state store for all checks // which are in the provided state. func (s *StateStore) ChecksInState(state string) (uint64, structs.HealthChecks, error) { @@ -903,6 +921,34 @@ func (s *StateStore) ChecksInState(state string) (uint64, structs.HealthChecks, return s.parseChecks(idx, checks) } +// ChecksInState is used to query the state store for all checks +// which are in the provided state. +func (s *StateStore) ChecksInStateByNodeMeta(state string, filters map[string]string) (uint64, structs.HealthChecks, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the table index. + idx := maxIndexTxn(tx, s.getWatchTables("ChecksInStateByNodeMeta")...) + + // Query all checks if HealthAny is passed + var checks memdb.ResultIterator + var err error + if state == structs.HealthAny { + checks, err = tx.Get("checks", "status") + if err != nil { + return 0, nil, fmt.Errorf("failed check lookup: %s", err) + } + } else { + // Any other state we need to query for explicitly + checks, err = tx.Get("checks", "status", state) + if err != nil { + return 0, nil, fmt.Errorf("failed check lookup: %s", err) + } + } + + return s.parseChecksByNodeMeta(idx, checks, tx, filters) +} + // parseChecks is a helper function used to deduplicate some // repetitive code for returning health checks. func (s *StateStore) parseChecks(idx uint64, iter memdb.ResultIterator) (uint64, structs.HealthChecks, error) { @@ -914,6 +960,27 @@ func (s *StateStore) parseChecks(idx uint64, iter memdb.ResultIterator) (uint64, return idx, results, nil } +// parseChecksByNodeMeta is a helper function used to deduplicate some +// repetitive code for returning health checks filtered by node metadata fields. +func (s *StateStore) parseChecksByNodeMeta(idx uint64, iter memdb.ResultIterator, tx *memdb.Txn, + filters map[string]string) (uint64, structs.HealthChecks, error) { + var results structs.HealthChecks + for check := iter.Next(); check != nil; check = iter.Next() { + healthCheck := check.(*structs.HealthCheck) + node, err := tx.First("nodes", "id", healthCheck.Node) + if err != nil { + return 0, nil, fmt.Errorf("failed node lookup: %s", err) + } + if node == nil { + return 0, nil, ErrMissingNode + } + if structs.SatisfiesMetaFilters(node.(*structs.Node).Meta, filters) { + results = append(results, healthCheck) + } + } + return idx, results, nil +} + // DeleteCheck is used to delete a health check registration. func (s *StateStore) DeleteCheck(idx uint64, node string, checkID types.CheckID) error { tx := s.db.Txn(true) diff --git a/consul/state/catalog_test.go b/consul/state/catalog_test.go index 43631e19a5..bccc7482d0 100644 --- a/consul/state/catalog_test.go +++ b/consul/state/catalog_test.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/types" ) func TestStateStore_EnsureRegistration(t *testing.T) { @@ -1546,6 +1547,63 @@ func TestStateStore_ServiceChecks(t *testing.T) { } } +func TestStateStore_ServiceChecksByNodeMeta(t *testing.T) { + s := testStateStore(t) + + // Create the first node and service with some checks + testRegisterNodeWithMeta(t, s, 0, "node1", map[string]string{"somekey": "somevalue", "common": "1"}) + testRegisterService(t, s, 1, "node1", "service1") + testRegisterCheck(t, s, 2, "node1", "service1", "check1", structs.HealthPassing) + testRegisterCheck(t, s, 3, "node1", "service1", "check2", structs.HealthPassing) + + // Create a second node/service with a different set of checks + testRegisterNodeWithMeta(t, s, 4, "node2", map[string]string{"common": "1"}) + testRegisterService(t, s, 5, "node2", "service1") + testRegisterCheck(t, s, 6, "node2", "service1", "check3", structs.HealthPassing) + + cases := []struct { + filters map[string]string + checks []string + }{ + // Basic meta filter + { + filters: map[string]string{"somekey": "somevalue"}, + checks: []string{"check1", "check2"}, + }, + // Common meta field + { + filters: map[string]string{"common": "1"}, + checks: []string{"check1", "check2", "check3"}, + }, + // Invalid meta filter + { + filters: map[string]string{"invalid": "nope"}, + checks: []string{}, + }, + // Multiple filters + { + filters: map[string]string{"somekey": "somevalue", "common": "1"}, + checks: []string{"check1", "check2"}, + }, + } + + // Try querying for all checks associated with service1 + for _, tc := range cases { + _, checks, err := s.ServiceChecksByNodeMeta("service1", tc.filters) + if err != nil { + t.Fatalf("err: %s", err) + } + if len(checks) != len(tc.checks) { + t.Fatalf("bad checks: %#v", checks) + } + for i, check := range checks { + if check.CheckID != types.CheckID(tc.checks[i]) { + t.Fatalf("bad checks: %#v", checks) + } + } + } +} + func TestStateStore_ChecksInState(t *testing.T) { s := testStateStore(t) @@ -1585,6 +1643,88 @@ func TestStateStore_ChecksInState(t *testing.T) { } } +func TestStateStore_ChecksInStateByNodeMeta(t *testing.T) { + s := testStateStore(t) + + // Querying with no results returns nil + idx, res, err := s.ChecksInStateByNodeMeta(structs.HealthPassing, nil) + if idx != 0 || res != nil || err != nil { + t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) + } + + // Register a node with checks in varied states + testRegisterNodeWithMeta(t, s, 0, "node1", map[string]string{"somekey": "somevalue", "common": "1"}) + testRegisterCheck(t, s, 1, "node1", "", "check1", structs.HealthPassing) + testRegisterCheck(t, s, 2, "node1", "", "check2", structs.HealthCritical) + + testRegisterNodeWithMeta(t, s, 3, "node2", map[string]string{"common": "1"}) + testRegisterCheck(t, s, 4, "node2", "", "check3", structs.HealthPassing) + + cases := []struct { + filters map[string]string + state string + checks []string + }{ + // Basic meta filter, any status + { + filters: map[string]string{"somekey": "somevalue"}, + state: structs.HealthAny, + checks: []string{"check2", "check1"}, + }, + // Basic meta filter, only passing + { + filters: map[string]string{"somekey": "somevalue"}, + state: structs.HealthPassing, + checks: []string{"check1"}, + }, + // Common meta filter, any status + { + filters: map[string]string{"common": "1"}, + state: structs.HealthAny, + checks: []string{"check2", "check1", "check3"}, + }, + // Common meta filter, only passing + { + filters: map[string]string{"common": "1"}, + state: structs.HealthPassing, + checks: []string{"check1", "check3"}, + }, + // Invalid meta filter + { + filters: map[string]string{"invalid": "nope"}, + checks: []string{}, + }, + // Multiple filters, any status + { + filters: map[string]string{"somekey": "somevalue", "common": "1"}, + state: structs.HealthAny, + checks: []string{"check2", "check1"}, + }, + // Multiple filters, only passing + { + filters: map[string]string{"somekey": "somevalue", "common": "1"}, + state: structs.HealthPassing, + checks: []string{"check1"}, + }, + } + + // Try querying for all checks associated with service1 + for _, tc := range cases { + _, checks, err := s.ChecksInStateByNodeMeta(tc.state, tc.filters) + if err != nil { + t.Fatalf("err: %s", err) + } + if len(checks) != len(tc.checks) { + t.Fatalf("bad checks: %#v", checks) + } + for i, check := range checks { + if check.CheckID != types.CheckID(tc.checks[i]) { + t.Fatalf("bad checks: %#v, %v", checks, tc.checks) + } + } + } +} + func TestStateStore_DeleteCheck(t *testing.T) { s := testStateStore(t) diff --git a/consul/state/state_store.go b/consul/state/state_store.go index b9b6467cbc..dc72726564 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -222,6 +222,8 @@ func (s *StateStore) getWatchTables(method string) []string { return []string{"nodes", "services"} case "NodeCheck", "NodeChecks", "ServiceChecks", "ChecksInState": return []string{"checks"} + case "ChecksInStateByNodeMeta", "ServiceChecksByNodeMeta": + return []string{"nodes", "checks"} case "CheckServiceNodes", "NodeInfo", "NodeDump": return []string{"nodes", "services", "checks"} case "SessionGet", "SessionList", "NodeSessions": diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index 687cc4c582..5a3c781719 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -35,7 +35,11 @@ func testStateStore(t *testing.T) *StateStore { } func testRegisterNode(t *testing.T, s *StateStore, idx uint64, nodeID string) { - node := &structs.Node{Node: nodeID} + testRegisterNodeWithMeta(t, s, idx, nodeID, nil) +} + +func testRegisterNodeWithMeta(t *testing.T, s *StateStore, idx uint64, nodeID string, meta map[string]string) { + node := &structs.Node{Node: nodeID, Meta: meta} if err := s.EnsureNode(idx, node); err != nil { t.Fatalf("err: %s", err) } diff --git a/consul/structs/structs.go b/consul/structs/structs.go index e34a8b6353..bf09ca3770 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -241,11 +241,12 @@ func (r *DCSpecificRequest) RequestDatacenter() string { // ServiceSpecificRequest is used to query about a specific service type ServiceSpecificRequest struct { - Datacenter string - ServiceName string - ServiceTag string - TagFilter bool // Controls tag filtering - Source QuerySource + Datacenter string + NodeMetaFilters map[string]string + ServiceName string + ServiceTag string + TagFilter bool // Controls tag filtering + Source QuerySource QueryOptions } @@ -266,9 +267,10 @@ func (r *NodeSpecificRequest) RequestDatacenter() string { // ChecksInStateRequest is used to query for nodes in a state type ChecksInStateRequest struct { - Datacenter string - State string - Source QuerySource + Datacenter string + NodeMetaFilters map[string]string + State string + Source QuerySource QueryOptions } @@ -287,6 +289,15 @@ type Node struct { } type Nodes []*Node +func SatisfiesMetaFilters(meta map[string]string, filters map[string]string) bool { + for key, value := range filters { + if v, ok := meta[key]; !ok || v != value { + return false + } + } + return true +} + // Used to return information about a provided services. // Maps service name to available tags type Services map[string][]string diff --git a/website/source/docs/agent/http/catalog.html.markdown b/website/source/docs/agent/http/catalog.html.markdown index 0a7a066295..e98a9de208 100644 --- a/website/source/docs/agent/http/catalog.html.markdown +++ b/website/source/docs/agent/http/catalog.html.markdown @@ -276,6 +276,11 @@ the node list in ascending order based on the estimated round trip time from that node. Passing `?near=_agent` will use the agent's node for the sort. +In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be +provided with a desired node metadata key/value pair of the form `key:value`. +This parameter can be specified multiple times, and will filter the results to +service entries on nodes with the specified key/value pair(s). + It returns a JSON body like this: ```javascript diff --git a/website/source/docs/agent/http/health.html.markdown b/website/source/docs/agent/http/health.html.markdown index e9f18c6418..69bfc332ee 100644 --- a/website/source/docs/agent/http/health.html.markdown +++ b/website/source/docs/agent/http/health.html.markdown @@ -75,6 +75,11 @@ the node list in ascending order based on the estimated round trip time from that node. Passing `?near=_agent` will use the agent's node for the sort. +In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be +provided with a desired node metadata key/value pair of the form `key:value`. +This parameter can be specified multiple times, and will filter the results to +health checks on nodes with the specified key/value pair(s). + It returns a JSON body like this: ```javascript @@ -112,6 +117,11 @@ Providing the `?passing` query parameter, added in Consul 0.2, will filter results to only nodes with all checks in the `passing` state. This can be used to avoid extra filtering logic on the client side. +In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be +provided with a desired node metadata key/value pair of the form `key:value`. +This parameter can be specified multiple times, and will filter the results to +nodes with the specified key/value pair(s). + This endpoint is very similar to the `/v1/catalog/service` endpoint; however, this endpoint automatically returns the status of the associated health check as well as any system level health checks. This allows a client to avoid @@ -182,6 +192,11 @@ the node list in ascending order based on the estimated round trip time from that node. Passing `?near=_agent` will use the agent's node for the sort. +In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be +provided with a desired node metadata key/value pair of the form `key:value`. +This parameter can be specified multiple times, and will filter the results to +health checks on nodes with the specified key/value pair(s). + The supported states are `any`, `passing`, `warning`, or `critical`. The `any` state is a wildcard that can be used to return all checks.