From 44ec8d94d264a10ea9e1ae9f1de2d3c2fb72620c Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Fri, 9 Mar 2018 10:01:42 -0800 Subject: [PATCH] agent: clean up connect/non-connect duplication by using shared methods --- agent/catalog_endpoint.go | 70 +++++++++-------------------------- agent/consul/state/catalog.go | 59 +++++++++++++---------------- 2 files changed, 43 insertions(+), 86 deletions(-) diff --git a/agent/catalog_endpoint.go b/agent/catalog_endpoint.go index 86e4e95ee8..4c0fd8f520 100644 --- a/agent/catalog_endpoint.go +++ b/agent/catalog_endpoint.go @@ -157,12 +157,27 @@ RETRY_ONCE: return out.Services, nil } +func (s *HTTPServer) CatalogConnectServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + return s.catalogServiceNodes(resp, req, true) +} + func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - metrics.IncrCounterWithLabels([]string{"client", "api", "catalog_service_nodes"}, 1, + return s.catalogServiceNodes(resp, req, false) +} + +func (s *HTTPServer) catalogServiceNodes(resp http.ResponseWriter, req *http.Request, connect bool) (interface{}, error) { + metricsKey := "catalog_service_nodes" + pathPrefix := "/v1/catalog/service/" + if connect { + metricsKey = "catalog_connect_service_nodes" + pathPrefix = "/v1/catalog/connect/" + } + + metrics.IncrCounterWithLabels([]string{"client", "api", metricsKey}, 1, []metrics.Label{{Name: "node", Value: s.nodeName()}}) // Set default DC - args := structs.ServiceSpecificRequest{} + args := structs.ServiceSpecificRequest{Connect: connect} s.parseSource(req, &args.Source) args.NodeMetaFilters = s.parseMetaFilter(req) if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { @@ -177,7 +192,7 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req } // Pull out the service name - args.ServiceName = strings.TrimPrefix(req.URL.Path, "/v1/catalog/service/") + args.ServiceName = strings.TrimPrefix(req.URL.Path, pathPrefix) if args.ServiceName == "" { resp.WriteHeader(http.StatusBadRequest) fmt.Fprint(resp, "Missing service name") @@ -217,55 +232,6 @@ RETRY_ONCE: return out.ServiceNodes, nil } -func (s *HTTPServer) CatalogConnectServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - metrics.IncrCounterWithLabels([]string{"client", "api", "catalog_connect_service_nodes"}, 1, - []metrics.Label{{Name: "node", Value: s.nodeName()}}) - if req.Method != "GET" { - return nil, MethodNotAllowedError{req.Method, []string{"GET"}} - } - - // Set default DC - args := structs.ServiceSpecificRequest{Connect: true} - s.parseSource(req, &args.Source) - args.NodeMetaFilters = s.parseMetaFilter(req) - if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { - return nil, nil - } - - // Pull out the service name - args.ServiceName = strings.TrimPrefix(req.URL.Path, "/v1/catalog/connect/") - if args.ServiceName == "" { - resp.WriteHeader(http.StatusBadRequest) - fmt.Fprint(resp, "Missing service name") - return nil, nil - } - - // Make the RPC request - var out structs.IndexedServiceNodes - defer setMeta(resp, &out.QueryMeta) - if err := s.agent.RPC("Catalog.ServiceNodes", &args, &out); err != nil { - metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_connect_service_nodes"}, 1, - []metrics.Label{{Name: "node", Value: s.nodeName()}}) - return nil, err - } - s.agent.TranslateAddresses(args.Datacenter, out.ServiceNodes) - - // Use empty list instead of nil - if out.ServiceNodes == nil { - out.ServiceNodes = make(structs.ServiceNodes, 0) - } - for i, s := range out.ServiceNodes { - if s.ServiceTags == nil { - clone := *s - clone.ServiceTags = make([]string, 0) - out.ServiceNodes[i] = &clone - } - } - metrics.IncrCounterWithLabels([]string{"client", "api", "success", "catalog_connect_service_nodes"}, 1, - []metrics.Label{{Name: "node", Value: s.nodeName()}}) - return out.ServiceNodes, nil -} - func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) { metrics.IncrCounterWithLabels([]string{"client", "api", "catalog_node_services"}, 1, []metrics.Label{{Name: "node", Value: s.nodeName()}}) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 2ce2da36b5..90a3dc5ebc 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -792,15 +792,39 @@ func maxIndexForService(tx *memdb.Txn, serviceName string, checks bool) uint64 { return maxIndexTxn(tx, "nodes", "services") } +// ConnectServiceNodes returns the nodes associated with a Connect +// compatible destination for the given service name. This will include +// both proxies and native integrations. +func (s *Store) ConnectServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) { + return s.serviceNodes(ws, serviceName, true) +} + // ServiceNodes returns the nodes associated with a given service name. func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) { + return s.serviceNodes(ws, serviceName, false) +} + +func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool) (uint64, structs.ServiceNodes, error) { tx := s.db.Txn(false) defer tx.Abort() // Get the table index. idx := maxIndexForService(tx, serviceName, false) + + // Function for lookup + var f func() (memdb.ResultIterator, error) + if !connect { + f = func() (memdb.ResultIterator, error) { + return tx.Get("services", "service", serviceName) + } + } else { + f = func() (memdb.ResultIterator, error) { + return tx.Get("services", "proxy_destination", serviceName) + } + } + // List all the services. - services, err := tx.Get("services", "service", serviceName) + services, err := f() if err != nil { return 0, nil, fmt.Errorf("failed service lookup: %s", err) } @@ -852,39 +876,6 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) ( return idx, results, nil } -// ConnectServiceNodes returns the nodes associated with a Connect -// compatible destination for the given service name. This will include -// both proxies and native integrations. -func (s *Store) ConnectServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) { - tx := s.db.Txn(false) - defer tx.Abort() - - // Get the table index. - idx := maxIndexForService(tx, serviceName, false) - - // Find all the proxies. When we support native integrations we'll have - // to perform another table lookup here. - services, err := tx.Get(servicesTableName, "proxy_destination", serviceName) - if err != nil { - return 0, nil, fmt.Errorf("failed service lookup: %s", err) - } - ws.Add(services.WatchCh()) - - // Store them - var results structs.ServiceNodes - for service := services.Next(); service != nil; service = services.Next() { - results = append(results, service.(*structs.ServiceNode)) - } - - // Fill in the node details. - results, err = s.parseServiceNodes(tx, ws, results) - if err != nil { - return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err) - } - - return idx, results, nil -} - // serviceTagFilter returns true (should filter) if the given service node // doesn't contain the given tag. func serviceTagFilter(sn *structs.ServiceNode, tag string) bool {