agent/consul: Catalog.ServiceNodes supports Connect filtering

This commit is contained in:
Mitchell Hashimoto 2018-03-09 08:34:55 -08:00
parent 2062e37270
commit e01914a025
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
4 changed files with 103 additions and 14 deletions

View File

@ -269,24 +269,37 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
return fmt.Errorf("Must provide service name")
}
// Determine the function we'll call
var f func(memdb.WatchSet, *state.Store) (uint64, structs.ServiceNodes, error)
switch {
case args.Connect:
f = func(ws memdb.WatchSet, s *state.Store) (uint64, structs.ServiceNodes, error) {
return s.ConnectServiceNodes(ws, args.ServiceName)
}
default:
f = func(ws memdb.WatchSet, s *state.Store) (uint64, structs.ServiceNodes, error) {
if args.ServiceAddress != "" {
return s.ServiceAddressNodes(ws, args.ServiceAddress)
}
if args.TagFilter {
return s.ServiceTagNodes(ws, args.ServiceName, args.ServiceTag)
}
return s.ServiceNodes(ws, args.ServiceName)
}
}
err := c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
var index uint64
var services structs.ServiceNodes
var err error
if args.TagFilter {
index, services, err = state.ServiceTagNodes(ws, args.ServiceName, args.ServiceTag)
} else {
index, services, err = state.ServiceNodes(ws, args.ServiceName)
}
if args.ServiceAddress != "" {
index, services, err = state.ServiceAddressNodes(ws, args.ServiceAddress)
}
index, services, err := f(ws, state)
if err != nil {
return err
}
reply.Index, reply.ServiceNodes = index, services
if len(args.NodeMetaFilters) > 0 {
var filtered structs.ServiceNodes
@ -305,17 +318,24 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
// Provide some metrics
if err == nil {
metrics.IncrCounterWithLabels([]string{"catalog", "service", "query"}, 1,
// For metrics, we separate Connect-based lookups from non-Connect
key := "service"
if args.Connect {
key = "connect"
}
metrics.IncrCounterWithLabels([]string{"catalog", key, "query"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
if args.ServiceTag != "" {
metrics.IncrCounterWithLabels([]string{"catalog", "service", "query-tag"}, 1,
metrics.IncrCounterWithLabels([]string{"catalog", key, "query-tag"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}})
}
if len(reply.ServiceNodes) == 0 {
metrics.IncrCounterWithLabels([]string{"catalog", "service", "not-found"}, 1,
metrics.IncrCounterWithLabels([]string{"catalog", key, "not-found"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
}
}
return err
}

View File

@ -1773,6 +1773,57 @@ func TestCatalog_ListServiceNodes_ConnectProxy(t *testing.T) {
assert.Equal(args.Service.ProxyDestination, v.ServiceProxyDestination)
}
func TestCatalog_ListServiceNodes_ConnectDestination(t *testing.T) {
t.Parallel()
assert := assert.New(t)
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Register the proxy service
args := structs.TestRegisterRequestProxy(t)
var out struct{}
assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.Register", args, &out))
// Register the service
{
dst := args.Service.ProxyDestination
args := structs.TestRegisterRequest(t)
args.Service.Service = dst
var out struct{}
assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.Register", args, &out))
}
// List
req := structs.ServiceSpecificRequest{
Connect: true,
Datacenter: "dc1",
ServiceName: args.Service.ProxyDestination,
}
var resp structs.IndexedServiceNodes
assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &req, &resp))
assert.Len(resp.ServiceNodes, 1)
v := resp.ServiceNodes[0]
assert.Equal(structs.ServiceKindConnectProxy, v.ServiceKind)
assert.Equal(args.Service.ProxyDestination, v.ServiceProxyDestination)
// List by non-Connect
req = structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: args.Service.ProxyDestination,
}
assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &req, &resp))
assert.Len(resp.ServiceNodes, 1)
v = resp.ServiceNodes[0]
assert.Equal(args.Service.ProxyDestination, v.ServiceName)
assert.Equal("", v.ServiceProxyDestination)
}
func TestCatalog_NodeServices(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)

View File

@ -284,6 +284,10 @@ type ServiceSpecificRequest struct {
ServiceAddress string
TagFilter bool // Controls tag filtering
Source QuerySource
// Connect if true will only search for Connect-compatible services.
Connect bool
QueryOptions
}

View File

@ -4,6 +4,20 @@ import (
"github.com/mitchellh/go-testing-interface"
)
// TestRegisterRequest returns a RegisterRequest for registering a typical service.
func TestRegisterRequest(t testing.T) *RegisterRequest {
return &RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &NodeService{
Service: "web",
Address: "",
Port: 80,
},
}
}
// TestRegisterRequestProxy returns a RegisterRequest for registering a
// Connect proxy.
func TestRegisterRequestProxy(t testing.T) *RegisterRequest {