Add node metadata filtering to remaining health/catalog endpoints

This commit is contained in:
Kyle Havlovitz 2017-01-13 20:08:43 -05:00
parent 5efdec89ec
commit 5acd69b4fc
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
17 changed files with 1085 additions and 13 deletions

View File

@ -222,6 +222,36 @@ func TestCatalog_Service(t *testing.T) {
})
}
func TestCatalog_Service_NodeMetaFilter(t *testing.T) {
t.Parallel()
meta := map[string]string{"somekey": "somevalue"}
c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.NodeMeta = meta
})
defer s.Stop()
catalog := c.Catalog()
testutil.WaitForResult(func() (bool, error) {
services, meta, err := catalog.Service("consul", "", &QueryOptions{NodeMeta: meta})
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("Bad: %v", meta)
}
if len(services) == 0 {
return false, fmt.Errorf("Bad: %v", services)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestCatalog_Node(t *testing.T) {
t.Parallel()
c, s := makeClient(t)

View File

@ -208,6 +208,46 @@ func TestHealth_Checks(t *testing.T) {
})
}
func TestHealth_Checks_NodeMetaFilter(t *testing.T) {
t.Parallel()
meta := map[string]string{"somekey": "somevalue"}
c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.NodeMeta = meta
})
defer s.Stop()
agent := c.Agent()
health := c.Health()
// Make a service with a check
reg := &AgentServiceRegistration{
Name: "foo",
Check: &AgentServiceCheck{
TTL: "15s",
},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
defer agent.ServiceDeregister("foo")
testutil.WaitForResult(func() (bool, error) {
checks, meta, err := health.Checks("foo", &QueryOptions{NodeMeta: meta})
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("bad: %v", meta)
}
if len(checks) == 0 {
return false, fmt.Errorf("Bad: %v", checks)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestHealth_Service(t *testing.T) {
c, s := makeClient(t)
defer s.Stop()
@ -235,6 +275,36 @@ func TestHealth_Service(t *testing.T) {
})
}
func TestHealth_Service_NodeMetaFilter(t *testing.T) {
meta := map[string]string{"somekey": "somevalue"}
c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.NodeMeta = meta
})
defer s.Stop()
health := c.Health()
testutil.WaitForResult(func() (bool, error) {
// consul service should always exist...
checks, meta, err := health.Service("consul", "", true, &QueryOptions{NodeMeta: meta})
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("bad: %v", meta)
}
if len(checks) == 0 {
return false, fmt.Errorf("Bad: %v", checks)
}
if _, ok := checks[0].Node.TaggedAddresses["wan"]; !ok {
return false, fmt.Errorf("Bad: %v", checks[0].Node)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestHealth_State(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
@ -258,3 +328,30 @@ func TestHealth_State(t *testing.T) {
t.Fatalf("err: %s", err)
})
}
func TestHealth_State_NodeMetaFilter(t *testing.T) {
t.Parallel()
meta := map[string]string{"somekey": "somevalue"}
c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
conf.NodeMeta = meta
})
defer s.Stop()
health := c.Health()
testutil.WaitForResult(func() (bool, error) {
checks, meta, err := health.State("any", &QueryOptions{NodeMeta: meta})
if err != nil {
return false, err
}
if meta.LastIndex == 0 {
return false, fmt.Errorf("bad: %v", meta)
}
if len(checks) == 0 {
return false, fmt.Errorf("Bad: %v", checks)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}

View File

@ -103,6 +103,7 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req
// Set default DC
args := structs.ServiceSpecificRequest{}
s.parseSource(req, &args.Source)
args.NodeMetaFilters = s.parseMetaFilter(req)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}

View File

@ -608,6 +608,72 @@ func TestCatalogServiceNodes(t *testing.T) {
}
}
func TestCatalogServiceNodes_NodeMetaFilter(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
// Make sure an empty list is returned, not a nil
{
req, err := http.NewRequest("GET", "/v1/catalog/service/api?node-meta=somekey:somevalue", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.CatalogServiceNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
assertIndex(t, resp)
nodes := obj.(structs.ServiceNodes)
if nodes == nil || len(nodes) != 0 {
t.Fatalf("bad: %v", obj)
}
}
// Register node
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
NodeMeta: map[string]string{
"somekey": "somevalue",
},
Service: &structs.NodeService{
Service: "api",
},
}
var out struct{}
if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
req, err := http.NewRequest("GET", "/v1/catalog/service/api?node-meta=somekey:somevalue", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.CatalogServiceNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
assertIndex(t, resp)
nodes := obj.(structs.ServiceNodes)
if len(nodes) != 1 {
t.Fatalf("bad: %v", obj)
}
}
func TestCatalogServiceNodes_WanTranslation(t *testing.T) {
dir1, srv1 := makeHTTPServerWithConfig(t,
func(c *Config) {

View File

@ -11,6 +11,7 @@ func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Req
// Set default DC
args := structs.ChecksInStateRequest{}
s.parseSource(req, &args.Source)
args.NodeMetaFilters = s.parseMetaFilter(req)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
@ -70,6 +71,7 @@ func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Req
// Set default DC
args := structs.ServiceSpecificRequest{}
s.parseSource(req, &args.Source)
args.NodeMetaFilters = s.parseMetaFilter(req)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
@ -100,6 +102,7 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ
// Set default DC
args := structs.ServiceSpecificRequest{}
s.parseSource(req, &args.Source)
args.NodeMetaFilters = s.parseMetaFilter(req)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}

View File

@ -65,6 +65,49 @@ func TestHealthChecksInState(t *testing.T) {
})
}
func TestHealthChecksInState_NodeMetaFilter(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.1",
NodeMeta: map[string]string{"somekey": "somevalue"},
Check: &structs.HealthCheck{
Node: "bar",
Name: "node check",
Status: structs.HealthCritical,
},
}
var out struct{}
if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
req, err := http.NewRequest("GET", "/v1/health/state/critical?node-meta=somekey:somevalue", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(func() (bool, error) {
resp := httptest.NewRecorder()
obj, err := srv.HealthChecksInState(resp, req)
if err != nil {
return false, err
}
if err := checkIndex(resp); err != nil {
return false, err
}
// Should be 1 health check for the server
nodes := obj.(structs.HealthChecks)
if len(nodes) != 1 {
return false, fmt.Errorf("bad: %v", obj)
}
return true, nil
}, func(err error) { t.Fatalf("err: %v", err) })
})
}
func TestHealthChecksInState_DistanceSort(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
@ -258,6 +301,69 @@ func TestHealthServiceChecks(t *testing.T) {
}
}
func TestHealthServiceChecks_NodeMetaFilter(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
req, err := http.NewRequest("GET", "/v1/health/checks/consul?dc=dc1&node-meta=somekey:somevalue", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.HealthServiceChecks(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
assertIndex(t, resp)
// Should be a non-nil empty list
nodes := obj.(structs.HealthChecks)
if nodes == nil || len(nodes) != 0 {
t.Fatalf("bad: %v", obj)
}
// Create a service check
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: srv.agent.config.NodeName,
Address: "127.0.0.1",
NodeMeta: map[string]string{"somekey": "somevalue"},
Check: &structs.HealthCheck{
Node: srv.agent.config.NodeName,
Name: "consul check",
ServiceID: "consul",
},
}
var out struct{}
if err = srv.agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
req, err = http.NewRequest("GET", "/v1/health/checks/consul?dc=dc1&node-meta=somekey:somevalue", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = httptest.NewRecorder()
obj, err = srv.HealthServiceChecks(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
assertIndex(t, resp)
// Should be 1 health check for consul
nodes = obj.(structs.HealthChecks)
if len(nodes) != 1 {
t.Fatalf("bad: %v", obj)
}
}
func TestHealthServiceChecks_DistanceSort(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
@ -429,6 +535,69 @@ func TestHealthServiceNodes(t *testing.T) {
}
}
func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
req, err := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1&node-meta=somekey:somevalue", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.HealthServiceNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
assertIndex(t, resp)
// Should be a non-nil empty list
nodes := obj.(structs.CheckServiceNodes)
if nodes == nil || len(nodes) != 0 {
t.Fatalf("bad: %v", obj)
}
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.1",
NodeMeta: map[string]string{"somekey": "somevalue"},
Service: &structs.NodeService{
ID: "test",
Service: "test",
},
}
var out struct{}
if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
req, err = http.NewRequest("GET", "/v1/health/service/test?dc=dc1&node-meta=somekey:somevalue", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = httptest.NewRecorder()
obj, err = srv.HealthServiceNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
assertIndex(t, resp)
// Should be a non-nil empty list for checks
nodes = obj.(structs.CheckServiceNodes)
if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 {
t.Fatalf("bad: %v", obj)
}
}
func TestHealthServiceNodes_DistanceSort(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)

View File

@ -243,6 +243,15 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
return err
}
reply.Index, reply.ServiceNodes = index, services
if len(args.NodeMetaFilters) > 0 {
var filtered structs.ServiceNodes
for _, service := range services {
if structs.SatisfiesMetaFilters(service.NodeMeta, args.NodeMetaFilters) {
filtered = append(filtered, service)
}
}
reply.ServiceNodes = filtered
}
if err := c.srv.filterACL(args.Token, reply); err != nil {
return err
}

View File

@ -592,7 +592,7 @@ func TestCatalog_ListNodes(t *testing.T) {
}
}
func TestCatalog_ListNodes_MetaFilter(t *testing.T) {
func TestCatalog_ListNodes_NodeMetaFilter(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@ -1060,7 +1060,7 @@ func TestCatalog_ListServices(t *testing.T) {
}
}
func TestCatalog_ListServices_MetaFilter(t *testing.T) {
func TestCatalog_ListServices_NodeMetaFilter(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@ -1308,6 +1308,106 @@ func TestCatalog_ListServiceNodes(t *testing.T) {
}
}
func TestCatalog_ListServiceNodes_NodeMetaFilter(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Add 2 nodes with specific meta maps
node := &structs.Node{Node: "foo", Address: "127.0.0.1", Meta: map[string]string{"somekey": "somevalue", "common": "1"}}
if err := s1.fsm.State().EnsureNode(1, node); err != nil {
t.Fatalf("err: %v", err)
}
node2 := &structs.Node{Node: "bar", Address: "127.0.0.2", Meta: map[string]string{"common": "1"}}
if err := s1.fsm.State().EnsureNode(2, node2); err != nil {
t.Fatalf("err: %v", err)
}
if err := s1.fsm.State().EnsureService(3, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := s1.fsm.State().EnsureService(4, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"secondary"}, Address: "127.0.0.2", Port: 5000}); err != nil {
t.Fatalf("err: %v", err)
}
cases := []struct {
filters map[string]string
tag string
services structs.ServiceNodes
}{
// Basic meta filter
{
filters: map[string]string{"somekey": "somevalue"},
services: structs.ServiceNodes{&structs.ServiceNode{Node: "foo", ServiceID: "db"}},
},
// Basic meta filter, tag
{
filters: map[string]string{"somekey": "somevalue"},
tag: "primary",
services: structs.ServiceNodes{&structs.ServiceNode{Node: "foo", ServiceID: "db"}},
},
// Common meta filter
{
filters: map[string]string{"common": "1"},
services: structs.ServiceNodes{
&structs.ServiceNode{Node: "bar", ServiceID: "db2"},
&structs.ServiceNode{Node: "foo", ServiceID: "db"},
},
},
// Common meta filter, tag
{
filters: map[string]string{"common": "1"},
tag: "secondary",
services: structs.ServiceNodes{
&structs.ServiceNode{Node: "bar", ServiceID: "db2"},
},
},
// Invalid meta filter
{
filters: map[string]string{"invalid": "nope"},
services: structs.ServiceNodes{},
},
// Multiple filter values
{
filters: map[string]string{"somekey": "somevalue", "common": "1"},
services: structs.ServiceNodes{&structs.ServiceNode{Node: "foo", ServiceID: "db"}},
},
// Multiple filter values, tag
{
filters: map[string]string{"somekey": "somevalue", "common": "1"},
tag: "primary",
services: structs.ServiceNodes{&structs.ServiceNode{Node: "foo", ServiceID: "db"}},
},
}
for _, tc := range cases {
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
NodeMetaFilters: tc.filters,
ServiceName: "db",
ServiceTag: tc.tag,
TagFilter: tc.tag != "",
}
var out structs.IndexedServiceNodes
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out.ServiceNodes) != len(tc.services) {
t.Fatalf("bad: %v", out)
}
for i, serviceNode := range out.ServiceNodes {
if serviceNode.Node != tc.services[i].Node || serviceNode.ServiceID != tc.services[i].ServiceID {
t.Fatalf("bad: %v, %v filters: %v", serviceNode, tc.services[i], tc.filters)
}
}
}
}
func TestCatalog_ListServiceNodes_DistanceSort(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)

View File

@ -25,7 +25,14 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest,
&reply.QueryMeta,
state.GetQueryWatch("ChecksInState"),
func() error {
index, checks, err := state.ChecksInState(args.State)
var index uint64
var checks structs.HealthChecks
var err error
if len(args.NodeMetaFilters) > 0 {
index, checks, err = state.ChecksInStateByNodeMeta(args.State, args.NodeMetaFilters)
} else {
index, checks, err = state.ChecksInState(args.State)
}
if err != nil {
return err
}
@ -80,7 +87,14 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest,
&reply.QueryMeta,
state.GetQueryWatch("ServiceChecks"),
func() error {
index, checks, err := state.ServiceChecks(args.ServiceName)
var index uint64
var checks structs.HealthChecks
var err error
if len(args.NodeMetaFilters) > 0 {
index, checks, err = state.ServiceChecksByNodeMeta(args.ServiceName, args.NodeMetaFilters)
} else {
index, checks, err = state.ServiceChecks(args.ServiceName)
}
if err != nil {
return err
}
@ -123,6 +137,15 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
}
reply.Index, reply.Nodes = index, nodes
if len(args.NodeMetaFilters) > 0 {
var filtered structs.CheckServiceNodes
for _, node := range nodes {
if structs.SatisfiesMetaFilters(node.Node.Meta, args.NodeMetaFilters) {
filtered = append(filtered, node)
}
}
reply.Nodes = filtered
}
if err := h.srv.filterACL(args.Token, reply); err != nil {
return err
}

View File

@ -57,6 +57,101 @@ func TestHealth_ChecksInState(t *testing.T) {
}
}
func TestHealth_ChecksInState_NodeMetaFilter(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
NodeMeta: map[string]string{
"somekey": "somevalue",
"common": "1",
},
Check: &structs.HealthCheck{
Name: "memory utilization",
Status: structs.HealthPassing,
},
}
var out struct{}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.2",
NodeMeta: map[string]string{
"common": "1",
},
Check: &structs.HealthCheck{
Name: "disk space",
Status: structs.HealthPassing,
},
}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
cases := []struct {
filters map[string]string
checkNames []string
}{
// Get foo's check by its unique meta value
{
filters: map[string]string{"somekey": "somevalue"},
checkNames: []string{"memory utilization"},
},
// Get both foo/bar's checks by their common meta value
{
filters: map[string]string{"common": "1"},
checkNames: []string{"disk space", "memory utilization"},
},
// Use an invalid meta value, should get empty result
{
filters: map[string]string{"invalid": "nope"},
checkNames: []string{},
},
// Use multiple filters to get foo's check
{
filters: map[string]string{
"somekey": "somevalue",
"common": "1",
},
checkNames: []string{"memory utilization"},
},
}
for _, tc := range cases {
var out structs.IndexedHealthChecks
inState := structs.ChecksInStateRequest{
Datacenter: "dc1",
NodeMetaFilters: tc.filters,
State: structs.HealthPassing,
}
if err := msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &inState, &out); err != nil {
t.Fatalf("err: %v", err)
}
checks := out.HealthChecks
if len(checks) != len(tc.checkNames) {
t.Fatalf("Bad: %v, %v", checks, tc.checkNames)
}
for i, check := range checks {
if tc.checkNames[i] != check.Name {
t.Fatalf("Bad: %v %v", checks, tc.checkNames)
}
}
}
}
func TestHealth_ChecksInState_DistanceSort(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
@ -221,6 +316,111 @@ func TestHealth_ServiceChecks(t *testing.T) {
}
}
func TestHealth_ServiceChecks_NodeMetaFilter(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
NodeMeta: map[string]string{
"somekey": "somevalue",
"common": "1",
},
Service: &structs.NodeService{
ID: "db",
Service: "db",
},
Check: &structs.HealthCheck{
Name: "memory utilization",
Status: structs.HealthPassing,
ServiceID: "db",
},
}
var out struct{}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.2",
NodeMeta: map[string]string{
"common": "1",
},
Service: &structs.NodeService{
ID: "db",
Service: "db",
},
Check: &structs.HealthCheck{
Name: "disk space",
Status: structs.HealthPassing,
ServiceID: "db",
},
}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
cases := []struct {
filters map[string]string
checkNames []string
}{
// Get foo's check by its unique meta value
{
filters: map[string]string{"somekey": "somevalue"},
checkNames: []string{"memory utilization"},
},
// Get both foo/bar's checks by their common meta value
{
filters: map[string]string{"common": "1"},
checkNames: []string{"disk space", "memory utilization"},
},
// Use an invalid meta value, should get empty result
{
filters: map[string]string{"invalid": "nope"},
checkNames: []string{},
},
// Use multiple filters to get foo's check
{
filters: map[string]string{
"somekey": "somevalue",
"common": "1",
},
checkNames: []string{"memory utilization"},
},
}
for _, tc := range cases {
var out structs.IndexedHealthChecks
inState := structs.ServiceSpecificRequest{
Datacenter: "dc1",
NodeMetaFilters: tc.filters,
ServiceName: "db",
}
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceChecks", &inState, &out); err != nil {
t.Fatalf("err: %v", err)
}
checks := out.HealthChecks
if len(checks) != len(tc.checkNames) {
t.Fatalf("Bad: %v, %v", checks, tc.checkNames)
}
for i, check := range checks {
if tc.checkNames[i] != check.Name {
t.Fatalf("Bad: %v %v", checks, tc.checkNames)
}
}
}
}
func TestHealth_ServiceChecks_DistanceSort(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
@ -392,6 +592,136 @@ func TestHealth_ServiceNodes(t *testing.T) {
}
}
func TestHealth_ServiceNodes_NodeMetaFilter(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
NodeMeta: map[string]string{
"somekey": "somevalue",
"common": "1",
},
Service: &structs.NodeService{
ID: "db",
Service: "db",
},
Check: &structs.HealthCheck{
Name: "memory utilization",
Status: structs.HealthPassing,
ServiceID: "db",
},
}
var out struct{}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
arg = structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.2",
NodeMeta: map[string]string{
"common": "1",
},
Service: &structs.NodeService{
ID: "db",
Service: "db",
},
Check: &structs.HealthCheck{
Name: "disk space",
Status: structs.HealthWarning,
ServiceID: "db",
},
}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
cases := []struct {
filters map[string]string
nodes structs.CheckServiceNodes
}{
// Get foo's check by its unique meta value
{
filters: map[string]string{"somekey": "somevalue"},
nodes: structs.CheckServiceNodes{
structs.CheckServiceNode{
Node: &structs.Node{Node: "foo"},
Checks: structs.HealthChecks{&structs.HealthCheck{Name: "memory utilization"}},
},
},
},
// Get both foo/bar's checks by their common meta value
{
filters: map[string]string{"common": "1"},
nodes: structs.CheckServiceNodes{
structs.CheckServiceNode{
Node: &structs.Node{Node: "bar"},
Checks: structs.HealthChecks{&structs.HealthCheck{Name: "disk space"}},
},
structs.CheckServiceNode{
Node: &structs.Node{Node: "foo"},
Checks: structs.HealthChecks{&structs.HealthCheck{Name: "memory utilization"}},
},
},
},
// Use an invalid meta value, should get empty result
{
filters: map[string]string{"invalid": "nope"},
nodes: structs.CheckServiceNodes{},
},
// Use multiple filters to get foo's check
{
filters: map[string]string{
"somekey": "somevalue",
"common": "1",
},
nodes: structs.CheckServiceNodes{
structs.CheckServiceNode{
Node: &structs.Node{Node: "foo"},
Checks: structs.HealthChecks{&structs.HealthCheck{Name: "memory utilization"}},
},
},
},
}
for _, tc := range cases {
var out structs.IndexedCheckServiceNodes
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
NodeMetaFilters: tc.filters,
ServiceName: "db",
}
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out.Nodes) != len(tc.nodes) {
t.Fatalf("bad: %v, %v, filters: %v", out.Nodes, tc.nodes, tc.filters)
}
for i, node := range out.Nodes {
checks := tc.nodes[i].Checks
if len(node.Checks) != len(checks) {
t.Fatalf("bad: %v, %v, filters: %v", node.Checks, checks, tc.filters)
}
for j, check := range node.Checks {
if check.Name != checks[j].Name {
t.Fatalf("bad: %v, %v, filters: %v", check, checks[j], tc.filters)
}
}
}
}
}
func TestHealth_ServiceNodes_DistanceSort(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)

View File

@ -877,6 +877,24 @@ func (s *StateStore) ServiceChecks(serviceName string) (uint64, structs.HealthCh
return s.parseChecks(idx, checks)
}
// ServiceChecksByNodeMeta is used to get all checks associated with a
// given service ID. The query is performed against a service
// _name_ instead of a service ID.
func (s *StateStore) ServiceChecksByNodeMeta(serviceName string, filters map[string]string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ServiceChecksByNodeMeta")...)
// Return the checks.
checks, err := tx.Get("checks", "service", serviceName)
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
return s.parseChecksByNodeMeta(idx, checks, tx, filters)
}
// ChecksInState is used to query the state store for all checks
// which are in the provided state.
func (s *StateStore) ChecksInState(state string) (uint64, structs.HealthChecks, error) {
@ -903,6 +921,34 @@ func (s *StateStore) ChecksInState(state string) (uint64, structs.HealthChecks,
return s.parseChecks(idx, checks)
}
// ChecksInState is used to query the state store for all checks
// which are in the provided state.
func (s *StateStore) ChecksInStateByNodeMeta(state string, filters map[string]string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ChecksInStateByNodeMeta")...)
// Query all checks if HealthAny is passed
var checks memdb.ResultIterator
var err error
if state == structs.HealthAny {
checks, err = tx.Get("checks", "status")
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
} else {
// Any other state we need to query for explicitly
checks, err = tx.Get("checks", "status", state)
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
}
return s.parseChecksByNodeMeta(idx, checks, tx, filters)
}
// parseChecks is a helper function used to deduplicate some
// repetitive code for returning health checks.
func (s *StateStore) parseChecks(idx uint64, iter memdb.ResultIterator) (uint64, structs.HealthChecks, error) {
@ -914,6 +960,27 @@ func (s *StateStore) parseChecks(idx uint64, iter memdb.ResultIterator) (uint64,
return idx, results, nil
}
// parseChecksByNodeMeta is a helper function used to deduplicate some
// repetitive code for returning health checks filtered by node metadata fields.
func (s *StateStore) parseChecksByNodeMeta(idx uint64, iter memdb.ResultIterator, tx *memdb.Txn,
filters map[string]string) (uint64, structs.HealthChecks, error) {
var results structs.HealthChecks
for check := iter.Next(); check != nil; check = iter.Next() {
healthCheck := check.(*structs.HealthCheck)
node, err := tx.First("nodes", "id", healthCheck.Node)
if err != nil {
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
}
if node == nil {
return 0, nil, ErrMissingNode
}
if structs.SatisfiesMetaFilters(node.(*structs.Node).Meta, filters) {
results = append(results, healthCheck)
}
}
return idx, results, nil
}
// DeleteCheck is used to delete a health check registration.
func (s *StateStore) DeleteCheck(idx uint64, node string, checkID types.CheckID) error {
tx := s.db.Txn(true)

View File

@ -8,6 +8,7 @@ import (
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/types"
)
func TestStateStore_EnsureRegistration(t *testing.T) {
@ -1546,6 +1547,63 @@ func TestStateStore_ServiceChecks(t *testing.T) {
}
}
func TestStateStore_ServiceChecksByNodeMeta(t *testing.T) {
s := testStateStore(t)
// Create the first node and service with some checks
testRegisterNodeWithMeta(t, s, 0, "node1", map[string]string{"somekey": "somevalue", "common": "1"})
testRegisterService(t, s, 1, "node1", "service1")
testRegisterCheck(t, s, 2, "node1", "service1", "check1", structs.HealthPassing)
testRegisterCheck(t, s, 3, "node1", "service1", "check2", structs.HealthPassing)
// Create a second node/service with a different set of checks
testRegisterNodeWithMeta(t, s, 4, "node2", map[string]string{"common": "1"})
testRegisterService(t, s, 5, "node2", "service1")
testRegisterCheck(t, s, 6, "node2", "service1", "check3", structs.HealthPassing)
cases := []struct {
filters map[string]string
checks []string
}{
// Basic meta filter
{
filters: map[string]string{"somekey": "somevalue"},
checks: []string{"check1", "check2"},
},
// Common meta field
{
filters: map[string]string{"common": "1"},
checks: []string{"check1", "check2", "check3"},
},
// Invalid meta filter
{
filters: map[string]string{"invalid": "nope"},
checks: []string{},
},
// Multiple filters
{
filters: map[string]string{"somekey": "somevalue", "common": "1"},
checks: []string{"check1", "check2"},
},
}
// Try querying for all checks associated with service1
for _, tc := range cases {
_, checks, err := s.ServiceChecksByNodeMeta("service1", tc.filters)
if err != nil {
t.Fatalf("err: %s", err)
}
if len(checks) != len(tc.checks) {
t.Fatalf("bad checks: %#v", checks)
}
for i, check := range checks {
if check.CheckID != types.CheckID(tc.checks[i]) {
t.Fatalf("bad checks: %#v", checks)
}
}
}
}
func TestStateStore_ChecksInState(t *testing.T) {
s := testStateStore(t)
@ -1585,6 +1643,88 @@ func TestStateStore_ChecksInState(t *testing.T) {
}
}
func TestStateStore_ChecksInStateByNodeMeta(t *testing.T) {
s := testStateStore(t)
// Querying with no results returns nil
idx, res, err := s.ChecksInStateByNodeMeta(structs.HealthPassing, nil)
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
// Register a node with checks in varied states
testRegisterNodeWithMeta(t, s, 0, "node1", map[string]string{"somekey": "somevalue", "common": "1"})
testRegisterCheck(t, s, 1, "node1", "", "check1", structs.HealthPassing)
testRegisterCheck(t, s, 2, "node1", "", "check2", structs.HealthCritical)
testRegisterNodeWithMeta(t, s, 3, "node2", map[string]string{"common": "1"})
testRegisterCheck(t, s, 4, "node2", "", "check3", structs.HealthPassing)
cases := []struct {
filters map[string]string
state string
checks []string
}{
// Basic meta filter, any status
{
filters: map[string]string{"somekey": "somevalue"},
state: structs.HealthAny,
checks: []string{"check2", "check1"},
},
// Basic meta filter, only passing
{
filters: map[string]string{"somekey": "somevalue"},
state: structs.HealthPassing,
checks: []string{"check1"},
},
// Common meta filter, any status
{
filters: map[string]string{"common": "1"},
state: structs.HealthAny,
checks: []string{"check2", "check1", "check3"},
},
// Common meta filter, only passing
{
filters: map[string]string{"common": "1"},
state: structs.HealthPassing,
checks: []string{"check1", "check3"},
},
// Invalid meta filter
{
filters: map[string]string{"invalid": "nope"},
checks: []string{},
},
// Multiple filters, any status
{
filters: map[string]string{"somekey": "somevalue", "common": "1"},
state: structs.HealthAny,
checks: []string{"check2", "check1"},
},
// Multiple filters, only passing
{
filters: map[string]string{"somekey": "somevalue", "common": "1"},
state: structs.HealthPassing,
checks: []string{"check1"},
},
}
// Try querying for all checks associated with service1
for _, tc := range cases {
_, checks, err := s.ChecksInStateByNodeMeta(tc.state, tc.filters)
if err != nil {
t.Fatalf("err: %s", err)
}
if len(checks) != len(tc.checks) {
t.Fatalf("bad checks: %#v", checks)
}
for i, check := range checks {
if check.CheckID != types.CheckID(tc.checks[i]) {
t.Fatalf("bad checks: %#v, %v", checks, tc.checks)
}
}
}
}
func TestStateStore_DeleteCheck(t *testing.T) {
s := testStateStore(t)

View File

@ -222,6 +222,8 @@ func (s *StateStore) getWatchTables(method string) []string {
return []string{"nodes", "services"}
case "NodeCheck", "NodeChecks", "ServiceChecks", "ChecksInState":
return []string{"checks"}
case "ChecksInStateByNodeMeta", "ServiceChecksByNodeMeta":
return []string{"nodes", "checks"}
case "CheckServiceNodes", "NodeInfo", "NodeDump":
return []string{"nodes", "services", "checks"}
case "SessionGet", "SessionList", "NodeSessions":

View File

@ -35,7 +35,11 @@ func testStateStore(t *testing.T) *StateStore {
}
func testRegisterNode(t *testing.T, s *StateStore, idx uint64, nodeID string) {
node := &structs.Node{Node: nodeID}
testRegisterNodeWithMeta(t, s, idx, nodeID, nil)
}
func testRegisterNodeWithMeta(t *testing.T, s *StateStore, idx uint64, nodeID string, meta map[string]string) {
node := &structs.Node{Node: nodeID, Meta: meta}
if err := s.EnsureNode(idx, node); err != nil {
t.Fatalf("err: %s", err)
}

View File

@ -241,11 +241,12 @@ func (r *DCSpecificRequest) RequestDatacenter() string {
// ServiceSpecificRequest is used to query about a specific service
type ServiceSpecificRequest struct {
Datacenter string
ServiceName string
ServiceTag string
TagFilter bool // Controls tag filtering
Source QuerySource
Datacenter string
NodeMetaFilters map[string]string
ServiceName string
ServiceTag string
TagFilter bool // Controls tag filtering
Source QuerySource
QueryOptions
}
@ -266,9 +267,10 @@ func (r *NodeSpecificRequest) RequestDatacenter() string {
// ChecksInStateRequest is used to query for nodes in a state
type ChecksInStateRequest struct {
Datacenter string
State string
Source QuerySource
Datacenter string
NodeMetaFilters map[string]string
State string
Source QuerySource
QueryOptions
}
@ -287,6 +289,15 @@ type Node struct {
}
type Nodes []*Node
func SatisfiesMetaFilters(meta map[string]string, filters map[string]string) bool {
for key, value := range filters {
if v, ok := meta[key]; !ok || v != value {
return false
}
}
return true
}
// Used to return information about a provided services.
// Maps service name to available tags
type Services map[string][]string

View File

@ -276,6 +276,11 @@ the node list in ascending order based on the estimated round trip
time from that node. Passing `?near=_agent` will use the agent's
node for the sort.
In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be
provided with a desired node metadata key/value pair of the form `key:value`.
This parameter can be specified multiple times, and will filter the results to
service entries on nodes with the specified key/value pair(s).
It returns a JSON body like this:
```javascript

View File

@ -75,6 +75,11 @@ the node list in ascending order based on the estimated round trip
time from that node. Passing `?near=_agent` will use the agent's
node for the sort.
In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be
provided with a desired node metadata key/value pair of the form `key:value`.
This parameter can be specified multiple times, and will filter the results to
health checks on nodes with the specified key/value pair(s).
It returns a JSON body like this:
```javascript
@ -112,6 +117,11 @@ Providing the `?passing` query parameter, added in Consul 0.2, will
filter results to only nodes with all checks in the `passing` state.
This can be used to avoid extra filtering logic on the client side.
In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be
provided with a desired node metadata key/value pair of the form `key:value`.
This parameter can be specified multiple times, and will filter the results to
nodes with the specified key/value pair(s).
This endpoint is very similar to the `/v1/catalog/service` endpoint; however, this
endpoint automatically returns the status of the associated health check
as well as any system level health checks. This allows a client to avoid
@ -182,6 +192,11 @@ the node list in ascending order based on the estimated round trip
time from that node. Passing `?near=_agent` will use the agent's
node for the sort.
In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be
provided with a desired node metadata key/value pair of the form `key:value`.
This parameter can be specified multiple times, and will filter the results to
health checks on nodes with the specified key/value pair(s).
The supported states are `any`, `passing`, `warning`, or `critical`.
The `any` state is a wildcard that can be used to return all checks.