package agent import ( "encoding/json" "fmt" "net/http" "strconv" "strings" "time" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/mitchellh/hashstructure" "github.com/hashicorp/go-bexpr" "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/hashicorp/consul/acl" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/debug" "github.com/hashicorp/consul/agent/structs" token_store "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/xds/proxysupport" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/logging/monitor" "github.com/hashicorp/consul/types" ) type Self struct { Config interface{} DebugConfig map[string]interface{} Coord *coordinate.Coordinate Member serf.Member Stats map[string]map[string]string Meta map[string]string XDS *XDSSelf `json:"xDS,omitempty"` } type XDSSelf struct { SupportedProxies map[string][]string Port int } func (s *HTTPHandlers) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Fetch the ACL token, if any, and enforce agent policy. var token string s.parseToken(req, &token) rule, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) if err != nil { return nil, err } if rule.AgentRead(s.agent.config.NodeName, nil) != acl.Allow { return nil, acl.ErrPermissionDenied } var cs lib.CoordinateSet if !s.agent.config.DisableCoordinates { var err error if cs, err = s.agent.GetLANCoordinate(); err != nil { return nil, err } } var xds *XDSSelf if s.agent.grpcServer != nil { xds = &XDSSelf{ SupportedProxies: map[string][]string{ "envoy": proxysupport.EnvoyVersions, }, Port: s.agent.config.XDSPort, } } config := struct { Datacenter string PrimaryDatacenter string NodeName string NodeID string Revision string Server bool Version string }{ Datacenter: s.agent.config.Datacenter, PrimaryDatacenter: s.agent.config.PrimaryDatacenter, NodeName: s.agent.config.NodeName, NodeID: string(s.agent.config.NodeID), Revision: s.agent.config.Revision, Server: s.agent.config.ServerMode, Version: s.agent.config.Version, } debugConfig := s.agent.config.Sanitized() // Backwards compat for the envoy command. Never use DebugConfig for // programmatic access to data. debugConfig["GRPCPort"] = s.agent.config.XDSPort return Self{ Config: config, DebugConfig: debugConfig, Coord: cs[s.agent.config.SegmentName], Member: s.agent.LocalMember(), Stats: s.agent.Stats(), Meta: s.agent.State.Metadata(), XDS: xds, }, nil } // acceptsOpenMetricsMimeType returns true if mime type is Prometheus-compatible func acceptsOpenMetricsMimeType(acceptHeader string) bool { mimeTypes := strings.Split(acceptHeader, ",") for _, v := range mimeTypes { mimeInfo := strings.Split(v, ";") if len(mimeInfo) > 0 { rawMime := strings.ToLower(strings.Trim(mimeInfo[0], " ")) if rawMime == "application/openmetrics-text" { return true } if rawMime == "text/plain" && (len(mimeInfo) > 1 && strings.Trim(mimeInfo[1], " ") == "version=0.4.0") { return true } } } return false } // enablePrometheusOutput will look for Prometheus mime-type or format Query parameter the same way as Nomad func enablePrometheusOutput(req *http.Request) bool { if format := req.URL.Query().Get("format"); format == "prometheus" { return true } return acceptsOpenMetricsMimeType(req.Header.Get("Accept")) } func (s *HTTPHandlers) AgentMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Fetch the ACL token, if any, and enforce agent policy. var token string s.parseToken(req, &token) rule, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) if err != nil { return nil, err } if rule.AgentRead(s.agent.config.NodeName, nil) != acl.Allow { return nil, acl.ErrPermissionDenied } if enablePrometheusOutput(req) { if s.agent.config.Telemetry.PrometheusOpts.Expiration < 1 { resp.WriteHeader(http.StatusUnsupportedMediaType) fmt.Fprint(resp, "Prometheus is not enabled since its retention time is not positive") return nil, nil } handlerOptions := promhttp.HandlerOpts{ ErrorLog: s.agent.logger.StandardLogger(&hclog.StandardLoggerOptions{ InferLevels: true, }), ErrorHandling: promhttp.ContinueOnError, } handler := promhttp.HandlerFor(prometheus.DefaultGatherer, handlerOptions) handler.ServeHTTP(resp, req) return nil, nil } return s.agent.baseDeps.MetricsHandler.DisplayMetrics(resp, req) } func (s *HTTPHandlers) AgentMetricsStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Fetch the ACL token, if any, and enforce agent policy. var token string s.parseToken(req, &token) rule, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) switch { case err != nil: return nil, err case rule != nil && rule.AgentRead(s.agent.config.NodeName, nil) != acl.Allow: return nil, acl.ErrPermissionDenied } flusher, ok := resp.(http.Flusher) if !ok { return nil, fmt.Errorf("streaming not supported") } resp.WriteHeader(http.StatusOK) // 0 byte write is needed before the Flush call so that if we are using // a gzip stream it will go ahead and write out the HTTP response header resp.Write([]byte("")) flusher.Flush() enc := metricsEncoder{ logger: s.agent.logger, encoder: json.NewEncoder(resp), flusher: flusher, } enc.encoder.SetIndent("", " ") s.agent.baseDeps.MetricsHandler.Stream(req.Context(), enc) return nil, nil } type metricsEncoder struct { logger hclog.Logger encoder *json.Encoder flusher http.Flusher } func (m metricsEncoder) Encode(summary interface{}) error { if err := m.encoder.Encode(summary); err != nil { m.logger.Error("failed to encode metrics summary", "error", err) return err } m.flusher.Flush() return nil } func (s *HTTPHandlers) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Fetch the ACL token, if any, and enforce agent policy. var token string s.parseToken(req, &token) rule, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) if err != nil { return nil, err } if rule.AgentWrite(s.agent.config.NodeName, nil) != acl.Allow { return nil, acl.ErrPermissionDenied } return nil, s.agent.ReloadConfig() } func buildAgentService(s *structs.NodeService, dc string) api.AgentService { weights := api.AgentWeights{Passing: 1, Warning: 1} if s.Weights != nil { if s.Weights.Passing > 0 { weights.Passing = s.Weights.Passing } weights.Warning = s.Weights.Warning } var taggedAddrs map[string]api.ServiceAddress if len(s.TaggedAddresses) > 0 { taggedAddrs = make(map[string]api.ServiceAddress) for k, v := range s.TaggedAddresses { taggedAddrs[k] = v.ToAPIServiceAddress() } } as := api.AgentService{ Kind: api.ServiceKind(s.Kind), ID: s.ID, Service: s.Service, Tags: s.Tags, Meta: s.Meta, Port: s.Port, Address: s.Address, TaggedAddresses: taggedAddrs, EnableTagOverride: s.EnableTagOverride, CreateIndex: s.CreateIndex, ModifyIndex: s.ModifyIndex, Weights: weights, Datacenter: dc, } if as.Tags == nil { as.Tags = []string{} } if as.Meta == nil { as.Meta = map[string]string{} } // Attach Proxy config if exists if s.Kind == structs.ServiceKindConnectProxy || s.IsGateway() { as.Proxy = s.Proxy.ToAPI() } // Attach Connect configs if they exist. if s.Connect.Native { as.Connect = &api.AgentServiceConnect{ Native: true, } } fillAgentServiceEnterpriseMeta(&as, &s.EnterpriseMeta) return as } func (s *HTTPHandlers) AgentServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Fetch the ACL token, if any. var token string s.parseToken(req, &token) var entMeta structs.EnterpriseMeta if err := s.parseEntMetaNoWildcard(req, &entMeta); err != nil { return nil, err } var filterExpression string s.parseFilter(req, &filterExpression) authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &entMeta, nil) if err != nil { return nil, err } services := s.agent.State.Services(&entMeta) if err := s.agent.filterServicesWithAuthorizer(authz, &services); err != nil { return nil, err } // Convert into api.AgentService since that includes Connect config but so far // NodeService doesn't need to internally. They are otherwise identical since // that is the struct used in client for reading the one we output here // anyway. agentSvcs := make(map[string]*api.AgentService) dc := s.agent.config.Datacenter // Use empty list instead of nil for id, s := range services { agentService := buildAgentService(s, dc) agentSvcs[id.ID] = &agentService } filter, err := bexpr.CreateFilter(filterExpression, nil, agentSvcs) if err != nil { return nil, err } return filter.Execute(agentSvcs) } // GET /v1/agent/service/:service_id // // Returns the service definition for a single local services and allows // blocking watch using hash-based blocking. func (s *HTTPHandlers) AgentService(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Get the proxy ID. Note that this is the ID of a proxy's service instance. id := strings.TrimPrefix(req.URL.Path, "/v1/agent/service/") // Maybe block var queryOpts structs.QueryOptions if parseWait(resp, req, &queryOpts) { // parseWait returns an error itself return nil, nil } // Parse the token var token string s.parseToken(req, &token) var entMeta structs.EnterpriseMeta if err := s.parseEntMetaNoWildcard(req, &entMeta); err != nil { return nil, err } // need to resolve to default the meta _, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &entMeta, nil) if err != nil { return nil, err } // Parse hash specially. Eventually this should happen in parseWait and end up // in QueryOptions but I didn't want to make very general changes right away. hash := req.URL.Query().Get("hash") sid := structs.NewServiceID(id, &entMeta) dc := s.agent.config.Datacenter resultHash, service, err := s.agent.LocalBlockingQuery(false, hash, queryOpts.MaxQueryTime, func(ws memdb.WatchSet) (string, interface{}, error) { svcState := s.agent.State.ServiceState(sid) if svcState == nil { resp.WriteHeader(http.StatusNotFound) fmt.Fprintf(resp, "unknown service ID: %s", sid.String()) return "", nil, nil } svc := svcState.Service // Setup watch on the service ws.Add(svcState.WatchCh) // Check ACLs. authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) if err != nil { return "", nil, err } var authzContext acl.AuthorizerContext svc.FillAuthzContext(&authzContext) if authz != nil && authz.ServiceRead(svc.Service, &authzContext) != acl.Allow { return "", nil, acl.ErrPermissionDenied } // Calculate the content hash over the response, minus the hash field aSvc := buildAgentService(svc, dc) reply := &aSvc rawHash, err := hashstructure.Hash(reply, nil) if err != nil { return "", nil, err } // Include the ContentHash in the response body reply.ContentHash = fmt.Sprintf("%x", rawHash) return reply.ContentHash, reply, nil }, ) if resultHash != "" { resp.Header().Set("X-Consul-ContentHash", resultHash) } return service, err } func (s *HTTPHandlers) AgentChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Fetch the ACL token, if any. var token string s.parseToken(req, &token) var entMeta structs.EnterpriseMeta if err := s.parseEntMetaNoWildcard(req, &entMeta); err != nil { return nil, err } authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &entMeta, nil) if err != nil { return nil, err } var filterExpression string s.parseFilter(req, &filterExpression) filter, err := bexpr.CreateFilter(filterExpression, nil, nil) if err != nil { return nil, err } checks := s.agent.State.Checks(&entMeta) if err := s.agent.filterChecksWithAuthorizer(authz, &checks); err != nil { return nil, err } agentChecks := make(map[types.CheckID]*structs.HealthCheck) // Use empty list instead of nil for id, c := range checks { if c.ServiceTags == nil { clone := *c clone.ServiceTags = make([]string, 0) agentChecks[id.ID] = &clone } else { agentChecks[id.ID] = c } } return filter.Execute(agentChecks) } func (s *HTTPHandlers) AgentMembers(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Fetch the ACL token, if any. var token string s.parseToken(req, &token) // Check if the WAN is being queried wan := false if other := req.URL.Query().Get("wan"); other != "" { wan = true } segment := req.URL.Query().Get("segment") if wan { switch segment { case "", api.AllSegments: // The zero value and the special "give me all members" // key are ok, otherwise the argument doesn't apply to // the WAN. default: resp.WriteHeader(http.StatusBadRequest) fmt.Fprint(resp, "Cannot provide a segment with wan=true") return nil, nil } } var members []serf.Member if wan { members = s.agent.WANMembers() } else { var err error if segment == api.AllSegments { members, err = s.agent.delegate.LANMembersAllSegments() } else { members, err = s.agent.delegate.LANSegmentMembers(segment) } if err != nil { return nil, err } } if err := s.agent.filterMembers(token, &members); err != nil { return nil, err } return members, nil } func (s *HTTPHandlers) AgentJoin(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Fetch the ACL token, if any, and enforce agent policy. var token string s.parseToken(req, &token) rule, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) if err != nil { return nil, err } if rule.AgentWrite(s.agent.config.NodeName, nil) != acl.Allow { return nil, acl.ErrPermissionDenied } // Check if the WAN is being queried wan := false if other := req.URL.Query().Get("wan"); other != "" { wan = true } // Get the address addr := strings.TrimPrefix(req.URL.Path, "/v1/agent/join/") if wan { if s.agent.config.ConnectMeshGatewayWANFederationEnabled { return nil, fmt.Errorf("WAN join is disabled when wan federation via mesh gateways is enabled") } _, err = s.agent.JoinWAN([]string{addr}) } else { _, err = s.agent.JoinLAN([]string{addr}) } return nil, err } func (s *HTTPHandlers) AgentLeave(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Fetch the ACL token, if any, and enforce agent policy. var token string s.parseToken(req, &token) rule, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) if err != nil { return nil, err } if rule.AgentWrite(s.agent.config.NodeName, nil) != acl.Allow { return nil, acl.ErrPermissionDenied } if err := s.agent.Leave(); err != nil { return nil, err } return nil, s.agent.ShutdownAgent() } func (s *HTTPHandlers) AgentForceLeave(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Fetch the ACL token, if any, and enforce agent policy. var token string s.parseToken(req, &token) rule, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) if err != nil { return nil, err } if rule.OperatorWrite(nil) != acl.Allow { return nil, acl.ErrPermissionDenied } //Check the value of the prune query _, prune := req.URL.Query()["prune"] addr := strings.TrimPrefix(req.URL.Path, "/v1/agent/force-leave/") return nil, s.agent.ForceLeave(addr, prune) } // syncChanges is a helper function which wraps a blocking call to sync // services and checks to the server. If the operation fails, we only // only warn because the write did succeed and anti-entropy will sync later. func (s *HTTPHandlers) syncChanges() { if err := s.agent.State.SyncChanges(); err != nil { s.agent.logger.Error("failed to sync changes", "error", err) } } func (s *HTTPHandlers) AgentRegisterCheck(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var token string s.parseToken(req, &token) var args structs.CheckDefinition if err := s.parseEntMetaNoWildcard(req, &args.EnterpriseMeta); err != nil { return nil, err } if err := decodeBody(req.Body, &args); err != nil { resp.WriteHeader(http.StatusBadRequest) fmt.Fprintf(resp, "Request decode failed: %v", err) return nil, nil } // Verify the check has a name. if args.Name == "" { resp.WriteHeader(http.StatusBadRequest) fmt.Fprint(resp, "Missing check name") return nil, nil } if args.Status != "" && !structs.ValidStatus(args.Status) { resp.WriteHeader(http.StatusBadRequest) fmt.Fprint(resp, "Bad check status") return nil, nil } authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &args.EnterpriseMeta, nil) if err != nil { return nil, err } // Construct the health check. health := args.HealthCheck(s.agent.config.NodeName) // Verify the check type. chkType := args.CheckType() err = chkType.Validate() if err != nil { resp.WriteHeader(http.StatusBadRequest) fmt.Fprint(resp, fmt.Errorf("Invalid check: %v", err)) return nil, nil } // Store the type of check based on the definition health.Type = chkType.Type() if health.ServiceID != "" { // fixup the service name so that vetCheckRegister requires the right ACLs service := s.agent.State.Service(health.CompoundServiceID()) if service != nil { health.ServiceName = service.Service } } // Get the provided token, if any, and vet against any ACL policies. if err := s.agent.vetCheckRegisterWithAuthorizer(authz, health); err != nil { return nil, err } // Add the check. if err := s.agent.AddCheck(health, chkType, true, token, ConfigSourceRemote); err != nil { return nil, err } s.syncChanges() return nil, nil } func (s *HTTPHandlers) AgentDeregisterCheck(resp http.ResponseWriter, req *http.Request) (interface{}, error) { checkID := structs.NewCheckID(types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/deregister/")), nil) // Get the provided token, if any, and vet against any ACL policies. var token string s.parseToken(req, &token) if err := s.parseEntMetaNoWildcard(req, &checkID.EnterpriseMeta); err != nil { return nil, err } authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &checkID.EnterpriseMeta, nil) if err != nil { return nil, err } checkID.Normalize() if err := s.agent.vetCheckUpdateWithAuthorizer(authz, checkID); err != nil { return nil, err } if err := s.agent.RemoveCheck(checkID, true); err != nil { return nil, err } s.syncChanges() return nil, nil } func (s *HTTPHandlers) AgentCheckPass(resp http.ResponseWriter, req *http.Request) (interface{}, error) { checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/pass/")) note := req.URL.Query().Get("note") return s.agentCheckUpdate(resp, req, checkID, api.HealthPassing, note) } func (s *HTTPHandlers) AgentCheckWarn(resp http.ResponseWriter, req *http.Request) (interface{}, error) { checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/warn/")) note := req.URL.Query().Get("note") return s.agentCheckUpdate(resp, req, checkID, api.HealthWarning, note) } func (s *HTTPHandlers) AgentCheckFail(resp http.ResponseWriter, req *http.Request) (interface{}, error) { checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/fail/")) note := req.URL.Query().Get("note") return s.agentCheckUpdate(resp, req, checkID, api.HealthCritical, note) } // checkUpdate is the payload for a PUT to AgentCheckUpdate. type checkUpdate struct { // Status us one of the api.Health* states, "passing", "warning", or // "critical". Status string // Output is the information to post to the UI for operators as the // output of the process that decided to hit the TTL check. This is // different from the note field that's associated with the check // itself. Output string } // AgentCheckUpdate is a PUT-based alternative to the GET-based Pass/Warn/Fail // APIs. func (s *HTTPHandlers) AgentCheckUpdate(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var update checkUpdate if err := decodeBody(req.Body, &update); err != nil { resp.WriteHeader(http.StatusBadRequest) fmt.Fprintf(resp, "Request decode failed: %v", err) return nil, nil } switch update.Status { case api.HealthPassing: case api.HealthWarning: case api.HealthCritical: default: resp.WriteHeader(http.StatusBadRequest) fmt.Fprintf(resp, "Invalid check status: '%s'", update.Status) return nil, nil } checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/update/")) return s.agentCheckUpdate(resp, req, checkID, update.Status, update.Output) } func (s *HTTPHandlers) agentCheckUpdate(_resp http.ResponseWriter, req *http.Request, checkID types.CheckID, status string, output string) (interface{}, error) { cid := structs.NewCheckID(checkID, nil) // Get the provided token, if any, and vet against any ACL policies. var token string s.parseToken(req, &token) if err := s.parseEntMetaNoWildcard(req, &cid.EnterpriseMeta); err != nil { return nil, err } authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &cid.EnterpriseMeta, nil) if err != nil { return nil, err } cid.Normalize() if err := s.agent.vetCheckUpdateWithAuthorizer(authz, cid); err != nil { return nil, err } if err := s.agent.updateTTLCheck(cid, status, output); err != nil { return nil, err } s.syncChanges() return nil, nil } // agentHealthService Returns Health for a given service ID func agentHealthService(serviceID structs.ServiceID, s *HTTPHandlers) (int, string, api.HealthChecks) { checks := s.agent.State.ChecksForService(serviceID, true) serviceChecks := make(api.HealthChecks, 0) for _, c := range checks { // TODO: harmonize struct.HealthCheck and api.HealthCheck (or at least extract conversion function) healthCheck := &api.HealthCheck{ Node: c.Node, CheckID: string(c.CheckID), Name: c.Name, Status: c.Status, Notes: c.Notes, Output: c.Output, ServiceID: c.ServiceID, ServiceName: c.ServiceName, ServiceTags: c.ServiceTags, } fillHealthCheckEnterpriseMeta(healthCheck, &c.EnterpriseMeta) serviceChecks = append(serviceChecks, healthCheck) } status := serviceChecks.AggregatedStatus() switch status { case api.HealthWarning: return http.StatusTooManyRequests, status, serviceChecks case api.HealthPassing: return http.StatusOK, status, serviceChecks default: return http.StatusServiceUnavailable, status, serviceChecks } } func returnTextPlain(req *http.Request) bool { if contentType := req.Header.Get("Accept"); strings.HasPrefix(contentType, "text/plain") { return true } if format := req.URL.Query().Get("format"); format != "" { return format == "text" } return false } // AgentHealthServiceByID return the local Service Health given its ID func (s *HTTPHandlers) AgentHealthServiceByID(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Pull out the service id (service id since there may be several instance of the same service on this host) serviceID := strings.TrimPrefix(req.URL.Path, "/v1/agent/health/service/id/") if serviceID == "" { return nil, &BadRequestError{Reason: "Missing serviceID"} } var entMeta structs.EnterpriseMeta if err := s.parseEntMetaNoWildcard(req, &entMeta); err != nil { return nil, err } var token string s.parseToken(req, &token) // need to resolve to default the meta var authzContext acl.AuthorizerContext authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &entMeta, &authzContext) if err != nil { return nil, err } sid := structs.NewServiceID(serviceID, &entMeta) dc := s.agent.config.Datacenter if service := s.agent.State.Service(sid); service != nil { if authz != nil && authz.ServiceRead(service.Service, &authzContext) != acl.Allow { return nil, acl.ErrPermissionDenied } code, status, healthChecks := agentHealthService(sid, s) if returnTextPlain(req) { return status, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "text/plain"} } serviceInfo := buildAgentService(service, dc) result := &api.AgentServiceChecksInfo{ AggregatedStatus: status, Checks: healthChecks, Service: &serviceInfo, } return result, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "application/json"} } notFoundReason := fmt.Sprintf("ServiceId %s not found", sid.String()) if returnTextPlain(req) { return notFoundReason, CodeWithPayloadError{StatusCode: http.StatusNotFound, Reason: notFoundReason, ContentType: "application/json"} } return &api.AgentServiceChecksInfo{ AggregatedStatus: api.HealthCritical, Checks: nil, Service: nil, }, CodeWithPayloadError{StatusCode: http.StatusNotFound, Reason: notFoundReason, ContentType: "application/json"} } // AgentHealthServiceByName return the worse status of all the services with given name on an agent func (s *HTTPHandlers) AgentHealthServiceByName(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Pull out the service name serviceName := strings.TrimPrefix(req.URL.Path, "/v1/agent/health/service/name/") if serviceName == "" { return nil, &BadRequestError{Reason: "Missing service Name"} } var entMeta structs.EnterpriseMeta if err := s.parseEntMetaNoWildcard(req, &entMeta); err != nil { return nil, err } var token string s.parseToken(req, &token) // need to resolve to default the meta var authzContext acl.AuthorizerContext authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &entMeta, &authzContext) if err != nil { return nil, err } if authz != nil && authz.ServiceRead(serviceName, &authzContext) != acl.Allow { return nil, acl.ErrPermissionDenied } dc := s.agent.config.Datacenter code := http.StatusNotFound status := fmt.Sprintf("ServiceName %s Not Found", serviceName) services := s.agent.State.Services(&entMeta) result := make([]api.AgentServiceChecksInfo, 0, 16) for _, service := range services { if service.Service == serviceName { sid := structs.NewServiceID(service.ID, &entMeta) scode, sstatus, healthChecks := agentHealthService(sid, s) serviceInfo := buildAgentService(service, dc) res := api.AgentServiceChecksInfo{ AggregatedStatus: sstatus, Checks: healthChecks, Service: &serviceInfo, } result = append(result, res) // When service is not found, we ignore it and keep existing HTTP status if code == http.StatusNotFound { code = scode status = sstatus } // We take the worst of all statuses, so we keep iterating // passing: 200 < warning: 429 < critical: 503 if code < scode { code = scode status = sstatus } } } if returnTextPlain(req) { return status, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "text/plain"} } return result, CodeWithPayloadError{StatusCode: code, Reason: status, ContentType: "application/json"} } func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var args structs.ServiceDefinition // Fixup the type decode of TTL or Interval if a check if provided. if err := s.parseEntMetaNoWildcard(req, &args.EnterpriseMeta); err != nil { return nil, err } if err := decodeBody(req.Body, &args); err != nil { resp.WriteHeader(http.StatusBadRequest) fmt.Fprintf(resp, "Request decode failed: %v", err) return nil, nil } // Verify the service has a name. if args.Name == "" { resp.WriteHeader(http.StatusBadRequest) fmt.Fprint(resp, "Missing service name") return nil, nil } // Check the service address here and in the catalog RPC endpoint // since service registration isn't synchronous. if ipaddr.IsAny(args.Address) { resp.WriteHeader(http.StatusBadRequest) fmt.Fprintf(resp, "Invalid service address") return nil, nil } var token string s.parseToken(req, &token) authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &args.EnterpriseMeta, nil) if err != nil { return nil, err } // Get the node service. ns := args.NodeService() if ns.Weights != nil { if err := structs.ValidateWeights(ns.Weights); err != nil { resp.WriteHeader(http.StatusBadRequest) fmt.Fprint(resp, fmt.Errorf("Invalid Weights: %v", err)) return nil, nil } } if err := structs.ValidateServiceMetadata(ns.Kind, ns.Meta, false); err != nil { resp.WriteHeader(http.StatusBadRequest) fmt.Fprint(resp, fmt.Errorf("Invalid Service Meta: %v", err)) return nil, nil } // Run validation. This is the same validation that would happen on // the catalog endpoint so it helps ensure the sync will work properly. if err := ns.Validate(); err != nil { resp.WriteHeader(http.StatusBadRequest) fmt.Fprint(resp, err.Error()) return nil, nil } // Verify the check type. chkTypes, err := args.CheckTypes() if err != nil { resp.WriteHeader(http.StatusBadRequest) fmt.Fprint(resp, fmt.Errorf("Invalid check: %v", err)) return nil, nil } for _, check := range chkTypes { if check.Status != "" && !structs.ValidStatus(check.Status) { resp.WriteHeader(http.StatusBadRequest) fmt.Fprint(resp, "Status for checks must 'passing', 'warning', 'critical'") return nil, nil } } // Verify the sidecar check types if args.Connect != nil && args.Connect.SidecarService != nil { chkTypes, err := args.Connect.SidecarService.CheckTypes() if err != nil { return nil, &BadRequestError{ Reason: fmt.Sprintf("Invalid check in sidecar_service: %v", err), } } for _, check := range chkTypes { if check.Status != "" && !structs.ValidStatus(check.Status) { return nil, &BadRequestError{ Reason: "Status for checks must 'passing', 'warning', 'critical'", } } } } // Get the provided token, if any, and vet against any ACL policies. if err := s.agent.vetServiceRegisterWithAuthorizer(authz, ns); err != nil { return nil, err } // See if we have a sidecar to register too sidecar, sidecarChecks, sidecarToken, err := s.agent.sidecarServiceFromNodeService(ns, token) if err != nil { return nil, &BadRequestError{ Reason: fmt.Sprintf("Invalid SidecarService: %s", err)} } if sidecar != nil { // Make sure we are allowed to register the sidecar using the token // specified (might be specific to sidecar or the same one as the overall // request). if err := s.agent.vetServiceRegister(sidecarToken, sidecar); err != nil { return nil, err } // We parsed the sidecar registration, now remove it from the NodeService // for the actual service since it's done it's job and we don't want to // persist it in the actual state/catalog. SidecarService is meant to be a // registration syntax sugar so don't propagate it any further. ns.Connect.SidecarService = nil } // Add the service. replaceExistingChecks := false query := req.URL.Query() if len(query["replace-existing-checks"]) > 0 && (query.Get("replace-existing-checks") == "" || query.Get("replace-existing-checks") == "true") { replaceExistingChecks = true } addReq := AddServiceRequest{ Service: ns, chkTypes: chkTypes, persist: true, token: token, Source: ConfigSourceRemote, replaceExistingChecks: replaceExistingChecks, } if err := s.agent.AddService(addReq); err != nil { return nil, err } if sidecar != nil { addReq := AddServiceRequest{ Service: sidecar, chkTypes: sidecarChecks, persist: true, token: sidecarToken, Source: ConfigSourceRemote, replaceExistingChecks: replaceExistingChecks, } if err := s.agent.AddService(addReq); err != nil { return nil, err } } s.syncChanges() return nil, nil } func (s *HTTPHandlers) AgentDeregisterService(resp http.ResponseWriter, req *http.Request) (interface{}, error) { sid := structs.NewServiceID(strings.TrimPrefix(req.URL.Path, "/v1/agent/service/deregister/"), nil) // Get the provided token, if any, and vet against any ACL policies. var token string s.parseToken(req, &token) if err := s.parseEntMetaNoWildcard(req, &sid.EnterpriseMeta); err != nil { return nil, err } authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &sid.EnterpriseMeta, nil) if err != nil { return nil, err } sid.Normalize() if err := s.agent.vetServiceUpdateWithAuthorizer(authz, sid); err != nil { return nil, err } if err := s.agent.RemoveService(sid); err != nil { return nil, err } s.syncChanges() return nil, nil } func (s *HTTPHandlers) AgentServiceMaintenance(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Ensure we have a service ID sid := structs.NewServiceID(strings.TrimPrefix(req.URL.Path, "/v1/agent/service/maintenance/"), nil) if sid.ID == "" { resp.WriteHeader(http.StatusBadRequest) fmt.Fprint(resp, "Missing service ID") return nil, nil } // Ensure we have some action params := req.URL.Query() if _, ok := params["enable"]; !ok { resp.WriteHeader(http.StatusBadRequest) fmt.Fprint(resp, "Missing value for enable") return nil, nil } raw := params.Get("enable") enable, err := strconv.ParseBool(raw) if err != nil { resp.WriteHeader(http.StatusBadRequest) fmt.Fprintf(resp, "Invalid value for enable: %q", raw) return nil, nil } // Get the provided token, if any, and vet against any ACL policies. var token string s.parseToken(req, &token) if err := s.parseEntMetaNoWildcard(req, &sid.EnterpriseMeta); err != nil { return nil, err } authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, &sid.EnterpriseMeta, nil) if err != nil { return nil, err } sid.Normalize() if err := s.agent.vetServiceUpdateWithAuthorizer(authz, sid); err != nil { return nil, err } if enable { reason := params.Get("reason") if err = s.agent.EnableServiceMaintenance(sid, reason, token); err != nil { resp.WriteHeader(http.StatusNotFound) fmt.Fprint(resp, err.Error()) return nil, nil } } else { if err = s.agent.DisableServiceMaintenance(sid); err != nil { resp.WriteHeader(http.StatusNotFound) fmt.Fprint(resp, err.Error()) return nil, nil } } s.syncChanges() return nil, nil } func (s *HTTPHandlers) AgentNodeMaintenance(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Ensure we have some action params := req.URL.Query() if _, ok := params["enable"]; !ok { resp.WriteHeader(http.StatusBadRequest) fmt.Fprint(resp, "Missing value for enable") return nil, nil } raw := params.Get("enable") enable, err := strconv.ParseBool(raw) if err != nil { resp.WriteHeader(http.StatusBadRequest) fmt.Fprintf(resp, "Invalid value for enable: %q", raw) return nil, nil } // Get the provided token, if any, and vet against any ACL policies. var token string s.parseToken(req, &token) rule, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) if err != nil { return nil, err } if rule.NodeWrite(s.agent.config.NodeName, nil) != acl.Allow { return nil, acl.ErrPermissionDenied } if enable { s.agent.EnableNodeMaintenance(params.Get("reason"), token) } else { s.agent.DisableNodeMaintenance() } s.syncChanges() return nil, nil } func (s *HTTPHandlers) AgentMonitor(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Fetch the ACL token, if any, and enforce agent policy. var token string s.parseToken(req, &token) rule, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) if err != nil { return nil, err } if rule.AgentRead(s.agent.config.NodeName, nil) != acl.Allow { return nil, acl.ErrPermissionDenied } // Get the provided loglevel. logLevel := req.URL.Query().Get("loglevel") if logLevel == "" { logLevel = "INFO" } var logJSON bool if _, ok := req.URL.Query()["logjson"]; ok { logJSON = true } if !logging.ValidateLogLevel(logLevel) { return nil, BadRequestError{ Reason: fmt.Sprintf("Unknown log level: %s", logLevel), } } flusher, ok := resp.(http.Flusher) if !ok { return nil, fmt.Errorf("Streaming not supported") } monitor := monitor.New(monitor.Config{ BufferSize: 512, Logger: s.agent.logger, LoggerOptions: &hclog.LoggerOptions{ Level: logging.LevelFromString(logLevel), JSONFormat: logJSON, }, }) logsCh := monitor.Start() // Send header so client can start streaming body resp.WriteHeader(http.StatusOK) // 0 byte write is needed before the Flush call so that if we are using // a gzip stream it will go ahead and write out the HTTP response header resp.Write([]byte("")) flusher.Flush() const flushDelay = 200 * time.Millisecond flushTicker := time.NewTicker(flushDelay) defer flushTicker.Stop() // Stream logs until the connection is closed. for { select { case <-req.Context().Done(): droppedCount := monitor.Stop() if droppedCount > 0 { s.agent.logger.Warn("Dropped logs during monitor request", "dropped_count", droppedCount) } flusher.Flush() return nil, nil case log := <-logsCh: fmt.Fprint(resp, string(log)) case <-flushTicker.C: flusher.Flush() } } } func (s *HTTPHandlers) AgentToken(resp http.ResponseWriter, req *http.Request) (interface{}, error) { if s.checkACLDisabled(resp, req) { return nil, nil } // Fetch the ACL token, if any, and enforce agent policy. var token string s.parseToken(req, &token) rule, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) if err != nil { return nil, err } if rule.AgentWrite(s.agent.config.NodeName, nil) != acl.Allow { return nil, acl.ErrPermissionDenied } // The body is just the token, but it's in a JSON object so we can add // fields to this later if needed. var args api.AgentToken if err := decodeBody(req.Body, &args); err != nil { resp.WriteHeader(http.StatusBadRequest) fmt.Fprintf(resp, "Request decode failed: %v", err) return nil, nil } // Figure out the target token. target := strings.TrimPrefix(req.URL.Path, "/v1/agent/token/") err = s.agent.tokens.WithPersistenceLock(func() error { triggerAntiEntropySync := false switch target { case "acl_token", "default": changed := s.agent.tokens.UpdateUserToken(args.Token, token_store.TokenSourceAPI) if changed { triggerAntiEntropySync = true } case "acl_agent_token", "agent": changed := s.agent.tokens.UpdateAgentToken(args.Token, token_store.TokenSourceAPI) if changed { triggerAntiEntropySync = true } case "acl_agent_master_token", "agent_master": s.agent.tokens.UpdateAgentMasterToken(args.Token, token_store.TokenSourceAPI) case "acl_replication_token", "replication": s.agent.tokens.UpdateReplicationToken(args.Token, token_store.TokenSourceAPI) default: return NotFoundError{Reason: fmt.Sprintf("Token %q is unknown", target)} } // TODO: is it safe to move this out of WithPersistenceLock? if triggerAntiEntropySync { s.agent.sync.SyncFull.Trigger() } return nil }) if err != nil { return nil, err } s.agent.logger.Info("Updated agent's ACL token", "token", target) return nil, nil } // AgentConnectCARoots returns the trusted CA roots. func (s *HTTPHandlers) AgentConnectCARoots(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var args structs.DCSpecificRequest if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } raw, m, err := s.agent.cache.Get(req.Context(), cachetype.ConnectCARootName, &args) if err != nil { return nil, err } defer setCacheMeta(resp, &m) // Add cache hit reply, ok := raw.(*structs.IndexedCARoots) if !ok { // This should never happen, but we want to protect against panics return nil, fmt.Errorf("internal error: response type not correct") } defer setMeta(resp, &reply.QueryMeta) return *reply, nil } // AgentConnectCALeafCert returns the certificate bundle for a service // instance. This supports blocking queries to update the returned bundle. func (s *HTTPHandlers) AgentConnectCALeafCert(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Get the service name. Note that this is the name of the service, // not the ID of the service instance. serviceName := strings.TrimPrefix(req.URL.Path, "/v1/agent/connect/ca/leaf/") args := cachetype.ConnectCALeafRequest{ Service: serviceName, // Need name not ID } var qOpts structs.QueryOptions if err := s.parseEntMetaNoWildcard(req, &args.EnterpriseMeta); err != nil { return nil, err } // Store DC in the ConnectCALeafRequest but query opts separately if done := s.parse(resp, req, &args.Datacenter, &qOpts); done { return nil, nil } args.MinQueryIndex = qOpts.MinQueryIndex args.MaxQueryTime = qOpts.MaxQueryTime args.Token = qOpts.Token raw, m, err := s.agent.cache.Get(req.Context(), cachetype.ConnectCALeafName, &args) if err != nil { return nil, err } defer setCacheMeta(resp, &m) reply, ok := raw.(*structs.IssuedCert) if !ok { // This should never happen, but we want to protect against panics return nil, fmt.Errorf("internal error: response type not correct") } setIndex(resp, reply.ModifyIndex) return reply, nil } // AgentConnectAuthorize // // POST /v1/agent/connect/authorize // // NOTE: This endpoint treats any L7 intentions as DENY. // // Note: when this logic changes, consider if the Intention.Check RPC method // also needs to be updated. func (s *HTTPHandlers) AgentConnectAuthorize(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Fetch the token var token string s.parseToken(req, &token) var authReq structs.ConnectAuthorizeRequest if err := s.parseEntMetaNoWildcard(req, &authReq.EnterpriseMeta); err != nil { return nil, err } if err := decodeBody(req.Body, &authReq); err != nil { return nil, BadRequestError{fmt.Sprintf("Request decode failed: %v", err)} } authz, reason, cacheMeta, err := s.agent.ConnectAuthorize(token, &authReq) if err != nil { return nil, err } setCacheMeta(resp, cacheMeta) return &connectAuthorizeResp{ Authorized: authz, Reason: reason, }, nil } // connectAuthorizeResp is the response format/structure for the // /v1/agent/connect/authorize endpoint. type connectAuthorizeResp struct { Authorized bool // True if authorized, false if not Reason string // Reason for the Authorized value (whether true or false) } // AgentHost // // GET /v1/agent/host // // Retrieves information about resources available and in-use for the // host the agent is running on such as CPU, memory, and disk usage. Requires // a operator:read ACL token. func (s *HTTPHandlers) AgentHost(resp http.ResponseWriter, req *http.Request) (interface{}, error) { // Fetch the ACL token, if any, and enforce agent policy. var token string s.parseToken(req, &token) rule, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil) if err != nil { return nil, err } if rule.OperatorRead(nil) != acl.Allow { return nil, acl.ErrPermissionDenied } return debug.CollectHostInfo(), nil }