From e01914a025d07b1ef8b57db08c2877f949a6a2d1 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Fri, 9 Mar 2018 08:34:55 -0800 Subject: [PATCH] agent/consul: Catalog.ServiceNodes supports Connect filtering --- agent/consul/catalog_endpoint.go | 48 +++++++++++++++++-------- agent/consul/catalog_endpoint_test.go | 51 +++++++++++++++++++++++++++ agent/structs/structs.go | 4 +++ agent/structs/testing_catalog.go | 14 ++++++++ 4 files changed, 103 insertions(+), 14 deletions(-) diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go index f6fb9a91d8..840b97fa6a 100644 --- a/agent/consul/catalog_endpoint.go +++ b/agent/consul/catalog_endpoint.go @@ -269,24 +269,37 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru return fmt.Errorf("Must provide service name") } + // Determine the function we'll call + var f func(memdb.WatchSet, *state.Store) (uint64, structs.ServiceNodes, error) + switch { + case args.Connect: + f = func(ws memdb.WatchSet, s *state.Store) (uint64, structs.ServiceNodes, error) { + return s.ConnectServiceNodes(ws, args.ServiceName) + } + + default: + f = func(ws memdb.WatchSet, s *state.Store) (uint64, structs.ServiceNodes, error) { + if args.ServiceAddress != "" { + return s.ServiceAddressNodes(ws, args.ServiceAddress) + } + + if args.TagFilter { + return s.ServiceTagNodes(ws, args.ServiceName, args.ServiceTag) + } + + return s.ServiceNodes(ws, args.ServiceName) + } + } + err := c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { - var index uint64 - var services structs.ServiceNodes - var err error - if args.TagFilter { - index, services, err = state.ServiceTagNodes(ws, args.ServiceName, args.ServiceTag) - } else { - index, services, err = state.ServiceNodes(ws, args.ServiceName) - } - if args.ServiceAddress != "" { - index, services, err = state.ServiceAddressNodes(ws, args.ServiceAddress) - } + index, services, err := f(ws, state) if err != nil { return err } + reply.Index, reply.ServiceNodes = index, services if len(args.NodeMetaFilters) > 0 { var filtered structs.ServiceNodes @@ -305,17 +318,24 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru // Provide some metrics if err == nil { - metrics.IncrCounterWithLabels([]string{"catalog", "service", "query"}, 1, + // For metrics, we separate Connect-based lookups from non-Connect + key := "service" + if args.Connect { + key = "connect" + } + + metrics.IncrCounterWithLabels([]string{"catalog", key, "query"}, 1, []metrics.Label{{Name: "service", Value: args.ServiceName}}) if args.ServiceTag != "" { - metrics.IncrCounterWithLabels([]string{"catalog", "service", "query-tag"}, 1, + metrics.IncrCounterWithLabels([]string{"catalog", key, "query-tag"}, 1, []metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}}) } if len(reply.ServiceNodes) == 0 { - metrics.IncrCounterWithLabels([]string{"catalog", "service", "not-found"}, 1, + metrics.IncrCounterWithLabels([]string{"catalog", key, "not-found"}, 1, []metrics.Label{{Name: "service", Value: args.ServiceName}}) } } + return err } diff --git a/agent/consul/catalog_endpoint_test.go b/agent/consul/catalog_endpoint_test.go index e810f3f636..b095c3f3a8 100644 --- a/agent/consul/catalog_endpoint_test.go +++ b/agent/consul/catalog_endpoint_test.go @@ -1773,6 +1773,57 @@ func TestCatalog_ListServiceNodes_ConnectProxy(t *testing.T) { assert.Equal(args.Service.ProxyDestination, v.ServiceProxyDestination) } +func TestCatalog_ListServiceNodes_ConnectDestination(t *testing.T) { + t.Parallel() + + assert := assert.New(t) + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // Register the proxy service + args := structs.TestRegisterRequestProxy(t) + var out struct{} + assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.Register", args, &out)) + + // Register the service + { + dst := args.Service.ProxyDestination + args := structs.TestRegisterRequest(t) + args.Service.Service = dst + var out struct{} + assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.Register", args, &out)) + } + + // List + req := structs.ServiceSpecificRequest{ + Connect: true, + Datacenter: "dc1", + ServiceName: args.Service.ProxyDestination, + } + var resp structs.IndexedServiceNodes + assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &req, &resp)) + assert.Len(resp.ServiceNodes, 1) + v := resp.ServiceNodes[0] + assert.Equal(structs.ServiceKindConnectProxy, v.ServiceKind) + assert.Equal(args.Service.ProxyDestination, v.ServiceProxyDestination) + + // List by non-Connect + req = structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: args.Service.ProxyDestination, + } + assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &req, &resp)) + assert.Len(resp.ServiceNodes, 1) + v = resp.ServiceNodes[0] + assert.Equal(args.Service.ProxyDestination, v.ServiceName) + assert.Equal("", v.ServiceProxyDestination) +} + func TestCatalog_NodeServices(t *testing.T) { t.Parallel() dir1, s1 := testServer(t) diff --git a/agent/structs/structs.go b/agent/structs/structs.go index e1ab91ab54..4301c7e93d 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -284,6 +284,10 @@ type ServiceSpecificRequest struct { ServiceAddress string TagFilter bool // Controls tag filtering Source QuerySource + + // Connect if true will only search for Connect-compatible services. + Connect bool + QueryOptions } diff --git a/agent/structs/testing_catalog.go b/agent/structs/testing_catalog.go index 4a55f1e3d3..d61266ad57 100644 --- a/agent/structs/testing_catalog.go +++ b/agent/structs/testing_catalog.go @@ -4,6 +4,20 @@ import ( "github.com/mitchellh/go-testing-interface" ) +// TestRegisterRequest returns a RegisterRequest for registering a typical service. +func TestRegisterRequest(t testing.T) *RegisterRequest { + return &RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + Service: &NodeService{ + Service: "web", + Address: "", + Port: 80, + }, + } +} + // TestRegisterRequestProxy returns a RegisterRequest for registering a // Connect proxy. func TestRegisterRequestProxy(t testing.T) *RegisterRequest {