mirror of https://github.com/status-im/consul.git
agent: clean up connect/non-connect duplication by using shared methods
This commit is contained in:
parent
368137b81b
commit
44ec8d94d2
|
@ -157,12 +157,27 @@ RETRY_ONCE:
|
||||||
return out.Services, nil
|
return out.Services, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *HTTPServer) CatalogConnectServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||||
|
return s.catalogServiceNodes(resp, req, true)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||||
metrics.IncrCounterWithLabels([]string{"client", "api", "catalog_service_nodes"}, 1,
|
return s.catalogServiceNodes(resp, req, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *HTTPServer) catalogServiceNodes(resp http.ResponseWriter, req *http.Request, connect bool) (interface{}, error) {
|
||||||
|
metricsKey := "catalog_service_nodes"
|
||||||
|
pathPrefix := "/v1/catalog/service/"
|
||||||
|
if connect {
|
||||||
|
metricsKey = "catalog_connect_service_nodes"
|
||||||
|
pathPrefix = "/v1/catalog/connect/"
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics.IncrCounterWithLabels([]string{"client", "api", metricsKey}, 1,
|
||||||
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
||||||
|
|
||||||
// Set default DC
|
// Set default DC
|
||||||
args := structs.ServiceSpecificRequest{}
|
args := structs.ServiceSpecificRequest{Connect: connect}
|
||||||
s.parseSource(req, &args.Source)
|
s.parseSource(req, &args.Source)
|
||||||
args.NodeMetaFilters = s.parseMetaFilter(req)
|
args.NodeMetaFilters = s.parseMetaFilter(req)
|
||||||
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
||||||
|
@ -177,7 +192,7 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pull out the service name
|
// Pull out the service name
|
||||||
args.ServiceName = strings.TrimPrefix(req.URL.Path, "/v1/catalog/service/")
|
args.ServiceName = strings.TrimPrefix(req.URL.Path, pathPrefix)
|
||||||
if args.ServiceName == "" {
|
if args.ServiceName == "" {
|
||||||
resp.WriteHeader(http.StatusBadRequest)
|
resp.WriteHeader(http.StatusBadRequest)
|
||||||
fmt.Fprint(resp, "Missing service name")
|
fmt.Fprint(resp, "Missing service name")
|
||||||
|
@ -217,55 +232,6 @@ RETRY_ONCE:
|
||||||
return out.ServiceNodes, nil
|
return out.ServiceNodes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *HTTPServer) CatalogConnectServiceNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
|
||||||
metrics.IncrCounterWithLabels([]string{"client", "api", "catalog_connect_service_nodes"}, 1,
|
|
||||||
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
|
||||||
if req.Method != "GET" {
|
|
||||||
return nil, MethodNotAllowedError{req.Method, []string{"GET"}}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set default DC
|
|
||||||
args := structs.ServiceSpecificRequest{Connect: true}
|
|
||||||
s.parseSource(req, &args.Source)
|
|
||||||
args.NodeMetaFilters = s.parseMetaFilter(req)
|
|
||||||
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pull out the service name
|
|
||||||
args.ServiceName = strings.TrimPrefix(req.URL.Path, "/v1/catalog/connect/")
|
|
||||||
if args.ServiceName == "" {
|
|
||||||
resp.WriteHeader(http.StatusBadRequest)
|
|
||||||
fmt.Fprint(resp, "Missing service name")
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make the RPC request
|
|
||||||
var out structs.IndexedServiceNodes
|
|
||||||
defer setMeta(resp, &out.QueryMeta)
|
|
||||||
if err := s.agent.RPC("Catalog.ServiceNodes", &args, &out); err != nil {
|
|
||||||
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_connect_service_nodes"}, 1,
|
|
||||||
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
s.agent.TranslateAddresses(args.Datacenter, out.ServiceNodes)
|
|
||||||
|
|
||||||
// Use empty list instead of nil
|
|
||||||
if out.ServiceNodes == nil {
|
|
||||||
out.ServiceNodes = make(structs.ServiceNodes, 0)
|
|
||||||
}
|
|
||||||
for i, s := range out.ServiceNodes {
|
|
||||||
if s.ServiceTags == nil {
|
|
||||||
clone := *s
|
|
||||||
clone.ServiceTags = make([]string, 0)
|
|
||||||
out.ServiceNodes[i] = &clone
|
|
||||||
}
|
|
||||||
}
|
|
||||||
metrics.IncrCounterWithLabels([]string{"client", "api", "success", "catalog_connect_service_nodes"}, 1,
|
|
||||||
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
|
||||||
return out.ServiceNodes, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||||
metrics.IncrCounterWithLabels([]string{"client", "api", "catalog_node_services"}, 1,
|
metrics.IncrCounterWithLabels([]string{"client", "api", "catalog_node_services"}, 1,
|
||||||
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
||||||
|
|
|
@ -792,15 +792,39 @@ func maxIndexForService(tx *memdb.Txn, serviceName string, checks bool) uint64 {
|
||||||
return maxIndexTxn(tx, "nodes", "services")
|
return maxIndexTxn(tx, "nodes", "services")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ConnectServiceNodes returns the nodes associated with a Connect
|
||||||
|
// compatible destination for the given service name. This will include
|
||||||
|
// both proxies and native integrations.
|
||||||
|
func (s *Store) ConnectServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) {
|
||||||
|
return s.serviceNodes(ws, serviceName, true)
|
||||||
|
}
|
||||||
|
|
||||||
// ServiceNodes returns the nodes associated with a given service name.
|
// ServiceNodes returns the nodes associated with a given service name.
|
||||||
func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) {
|
func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) {
|
||||||
|
return s.serviceNodes(ws, serviceName, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool) (uint64, structs.ServiceNodes, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
// Get the table index.
|
// Get the table index.
|
||||||
idx := maxIndexForService(tx, serviceName, false)
|
idx := maxIndexForService(tx, serviceName, false)
|
||||||
|
|
||||||
|
// Function for lookup
|
||||||
|
var f func() (memdb.ResultIterator, error)
|
||||||
|
if !connect {
|
||||||
|
f = func() (memdb.ResultIterator, error) {
|
||||||
|
return tx.Get("services", "service", serviceName)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
f = func() (memdb.ResultIterator, error) {
|
||||||
|
return tx.Get("services", "proxy_destination", serviceName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// List all the services.
|
// List all the services.
|
||||||
services, err := tx.Get("services", "service", serviceName)
|
services, err := f()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -852,39 +876,6 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (
|
||||||
return idx, results, nil
|
return idx, results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConnectServiceNodes returns the nodes associated with a Connect
|
|
||||||
// compatible destination for the given service name. This will include
|
|
||||||
// both proxies and native integrations.
|
|
||||||
func (s *Store) ConnectServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) {
|
|
||||||
tx := s.db.Txn(false)
|
|
||||||
defer tx.Abort()
|
|
||||||
|
|
||||||
// Get the table index.
|
|
||||||
idx := maxIndexForService(tx, serviceName, false)
|
|
||||||
|
|
||||||
// Find all the proxies. When we support native integrations we'll have
|
|
||||||
// to perform another table lookup here.
|
|
||||||
services, err := tx.Get(servicesTableName, "proxy_destination", serviceName)
|
|
||||||
if err != nil {
|
|
||||||
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
|
||||||
}
|
|
||||||
ws.Add(services.WatchCh())
|
|
||||||
|
|
||||||
// Store them
|
|
||||||
var results structs.ServiceNodes
|
|
||||||
for service := services.Next(); service != nil; service = services.Next() {
|
|
||||||
results = append(results, service.(*structs.ServiceNode))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fill in the node details.
|
|
||||||
results, err = s.parseServiceNodes(tx, ws, results)
|
|
||||||
if err != nil {
|
|
||||||
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return idx, results, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// serviceTagFilter returns true (should filter) if the given service node
|
// serviceTagFilter returns true (should filter) if the given service node
|
||||||
// doesn't contain the given tag.
|
// doesn't contain the given tag.
|
||||||
func serviceTagFilter(sn *structs.ServiceNode, tag string) bool {
|
func serviceTagFilter(sn *structs.ServiceNode, tag string) bool {
|
||||||
|
|
Loading…
Reference in New Issue