consul/agent/agent_endpoint.go

1003 lines
28 KiB
Go
Raw Normal View History

2014-01-04 01:15:51 +00:00
package agent
import (
"fmt"
2016-11-16 21:45:26 +00:00
"log"
2014-01-04 01:15:51 +00:00
"net/http"
"net/url"
"strconv"
2014-01-04 01:15:51 +00:00
"strings"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/config"
2018-03-21 17:55:39 +00:00
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
2016-11-16 21:45:26 +00:00
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/types"
2016-11-16 21:45:26 +00:00
"github.com/hashicorp/logutils"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
2018-03-21 17:55:39 +00:00
// NOTE(mitcehllh): This is temporary while certs are stubbed out.
"github.com/mitchellh/go-testing-interface"
2014-01-04 01:15:51 +00:00
)
2017-04-21 00:46:29 +00:00
type Self struct {
Config interface{}
DebugConfig map[string]interface{}
Coord *coordinate.Coordinate
Member serf.Member
Stats map[string]map[string]string
Meta map[string]string
}
func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
rule, err := s.agent.resolveToken(token)
if err != nil {
return nil, err
}
if rule != nil && !rule.AgentRead(s.agent.config.NodeName) {
return nil, acl.ErrPermissionDenied
}
var cs lib.CoordinateSet
if !s.agent.config.DisableCoordinates {
var err error
if cs, err = s.agent.GetLANCoordinate(); err != nil {
return nil, err
}
}
config := struct {
Datacenter string
NodeName string
NodeID string
Revision string
Server bool
Version string
}{
Datacenter: s.agent.config.Datacenter,
NodeName: s.agent.config.NodeName,
NodeID: string(s.agent.config.NodeID),
Revision: s.agent.config.Revision,
Server: s.agent.config.ServerMode,
Version: s.agent.config.Version,
}
2017-04-21 00:46:29 +00:00
return Self{
Config: config,
DebugConfig: s.agent.config.Sanitized(),
Coord: cs[s.agent.config.SegmentName],
Member: s.agent.LocalMember(),
Stats: s.agent.Stats(),
2017-08-28 12:17:13 +00:00
Meta: s.agent.State.Metadata(),
}, nil
}
// enablePrometheusOutput will look for Prometheus mime-type or format Query parameter the same way as Nomad
func enablePrometheusOutput(req *http.Request) bool {
if format := req.URL.Query().Get("format"); format == "prometheus" {
return true
}
return false
}
2017-08-08 20:05:38 +00:00
func (s *HTTPServer) AgentMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
rule, err := s.agent.resolveToken(token)
2017-08-08 20:05:38 +00:00
if err != nil {
return nil, err
}
if rule != nil && !rule.AgentRead(s.agent.config.NodeName) {
return nil, acl.ErrPermissionDenied
2017-08-08 20:05:38 +00:00
}
if enablePrometheusOutput(req) {
if s.agent.config.TelemetryPrometheusRetentionTime < 1 {
resp.WriteHeader(http.StatusUnsupportedMediaType)
fmt.Fprint(resp, "Prometheus is not enable since its retention time is not positive")
return nil, nil
}
handlerOptions := promhttp.HandlerOpts{
ErrorLog: s.agent.logger,
ErrorHandling: promhttp.ContinueOnError,
}
2017-08-08 20:05:38 +00:00
handler := promhttp.HandlerFor(prometheus.DefaultGatherer, handlerOptions)
handler.ServeHTTP(resp, req)
return nil, nil
}
2017-08-08 20:05:38 +00:00
return s.agent.MemSink.DisplayMetrics(resp, req)
}
func (s *HTTPServer) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
rule, err := s.agent.resolveToken(token)
if err != nil {
return nil, err
}
if rule != nil && !rule.AgentWrite(s.agent.config.NodeName) {
return nil, acl.ErrPermissionDenied
}
// Trigger the reload
errCh := make(chan error, 0)
select {
case <-s.agent.shutdownCh:
return nil, fmt.Errorf("Agent was shutdown before reload could be completed")
case s.agent.reloadCh <- errCh:
}
// Wait for the result of the reload, or for the agent to shutdown
select {
case <-s.agent.shutdownCh:
return nil, fmt.Errorf("Agent was shutdown before reload could be completed")
case err := <-errCh:
return nil, err
}
}
2014-01-04 01:15:51 +00:00
func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any.
var token string
s.parseToken(req, &token)
2017-08-28 12:17:13 +00:00
services := s.agent.State.Services()
if err := s.agent.filterServices(token, &services); err != nil {
return nil, err
}
2017-04-28 01:22:07 +00:00
// Use empty list instead of nil
for id, s := range services {
if s.Tags == nil || s.Meta == nil {
clone := *s
if s.Tags == nil {
clone.Tags = make([]string, 0)
} else {
clone.Tags = s.Tags
}
if s.Meta == nil {
clone.Meta = make(map[string]string)
} else {
clone.Meta = s.Meta
}
services[id] = &clone
2017-04-28 01:22:07 +00:00
}
}
return services, nil
}
func (s *HTTPServer) AgentChecks(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any.
var token string
s.parseToken(req, &token)
2017-08-28 12:17:13 +00:00
checks := s.agent.State.Checks()
if err := s.agent.filterChecks(token, &checks); err != nil {
return nil, err
}
2017-04-28 01:22:07 +00:00
// Use empty list instead of nil
for id, c := range checks {
2017-04-28 01:22:07 +00:00
if c.ServiceTags == nil {
clone := *c
clone.ServiceTags = make([]string, 0)
checks[id] = &clone
2017-04-28 01:22:07 +00:00
}
}
return checks, nil
2014-01-04 01:15:51 +00:00
}
func (s *HTTPServer) AgentMembers(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any.
var token string
s.parseToken(req, &token)
2014-01-04 01:15:51 +00:00
// Check if the WAN is being queried
wan := false
if other := req.URL.Query().Get("wan"); other != "" {
wan = true
}
segment := req.URL.Query().Get("segment")
2017-09-05 20:40:19 +00:00
if wan {
switch segment {
case "", api.AllSegments:
// The zero value and the special "give me all members"
// key are ok, otherwise the argument doesn't apply to
// the WAN.
default:
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, "Cannot provide a segment with wan=true")
return nil, nil
}
}
var members []serf.Member
2014-01-04 01:15:51 +00:00
if wan {
members = s.agent.WANMembers()
} else {
var err error
if segment == api.AllSegments {
members, err = s.agent.delegate.LANMembersAllSegments()
} else {
members, err = s.agent.delegate.LANSegmentMembers(segment)
}
if err != nil {
return nil, err
}
}
if err := s.agent.filterMembers(token, &members); err != nil {
return nil, err
2014-01-04 01:15:51 +00:00
}
return members, nil
2014-01-04 01:15:51 +00:00
}
func (s *HTTPServer) AgentJoin(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
rule, err := s.agent.resolveToken(token)
if err != nil {
return nil, err
}
if rule != nil && !rule.AgentWrite(s.agent.config.NodeName) {
return nil, acl.ErrPermissionDenied
}
2014-01-04 01:15:51 +00:00
// Check if the WAN is being queried
wan := false
if other := req.URL.Query().Get("wan"); other != "" {
wan = true
}
// Get the address
addr := strings.TrimPrefix(req.URL.Path, "/v1/agent/join/")
if wan {
_, err = s.agent.JoinWAN([]string{addr})
2014-01-04 01:15:51 +00:00
} else {
_, err = s.agent.JoinLAN([]string{addr})
2014-01-04 01:15:51 +00:00
}
return nil, err
2014-01-04 01:15:51 +00:00
}
func (s *HTTPServer) AgentLeave(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
rule, err := s.agent.resolveToken(token)
if err != nil {
return nil, err
}
if rule != nil && !rule.AgentWrite(s.agent.config.NodeName) {
return nil, acl.ErrPermissionDenied
}
if err := s.agent.Leave(); err != nil {
return nil, err
}
return nil, s.agent.ShutdownAgent()
}
2014-01-04 01:15:51 +00:00
func (s *HTTPServer) AgentForceLeave(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
rule, err := s.agent.resolveToken(token)
if err != nil {
return nil, err
}
if rule != nil && !rule.AgentWrite(s.agent.config.NodeName) {
return nil, acl.ErrPermissionDenied
}
2014-01-04 01:15:51 +00:00
addr := strings.TrimPrefix(req.URL.Path, "/v1/agent/force-leave/")
2014-02-19 22:27:01 +00:00
return nil, s.agent.ForceLeave(addr)
2014-01-04 01:15:51 +00:00
}
// syncChanges is a helper function which wraps a blocking call to sync
// services and checks to the server. If the operation fails, we only
// only warn because the write did succeed and anti-entropy will sync later.
func (s *HTTPServer) syncChanges() {
2017-08-28 12:17:13 +00:00
if err := s.agent.State.SyncChanges(); err != nil {
s.agent.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
}
}
func (s *HTTPServer) AgentRegisterCheck(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var args structs.CheckDefinition
// Fixup the type decode of TTL or Interval.
decodeCB := func(raw interface{}) error {
return FixupCheckType(raw)
}
if err := decodeBody(req, &args, decodeCB); err != nil {
2017-08-23 19:19:11 +00:00
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Request decode failed: %v", err)
return nil, nil
}
// Verify the check has a name.
if args.Name == "" {
2017-08-23 19:19:11 +00:00
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, "Missing check name")
return nil, nil
}
if args.Status != "" && !structs.ValidStatus(args.Status) {
2017-08-23 19:19:11 +00:00
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, "Bad check status")
return nil, nil
}
// Construct the health check.
health := args.HealthCheck(s.agent.config.NodeName)
// Verify the check type.
chkType := args.CheckType()
err := chkType.Validate()
if err != nil {
2017-08-23 19:19:11 +00:00
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, fmt.Errorf("Invalid check: %v", err))
return nil, nil
}
// Get the provided token, if any, and vet against any ACL policies.
var token string
s.parseToken(req, &token)
if err := s.agent.vetCheckRegister(token, health); err != nil {
return nil, err
}
// Add the check.
if err := s.agent.AddCheck(health, chkType, true, token); err != nil {
return nil, err
}
s.syncChanges()
return nil, nil
}
func (s *HTTPServer) AgentDeregisterCheck(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/deregister/"))
// Get the provided token, if any, and vet against any ACL policies.
var token string
s.parseToken(req, &token)
if err := s.agent.vetCheckUpdate(token, checkID); err != nil {
return nil, err
}
if err := s.agent.RemoveCheck(checkID, true); err != nil {
return nil, err
}
s.syncChanges()
return nil, nil
}
func (s *HTTPServer) AgentCheckPass(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/pass/"))
note := req.URL.Query().Get("note")
// Get the provided token, if any, and vet against any ACL policies.
var token string
s.parseToken(req, &token)
if err := s.agent.vetCheckUpdate(token, checkID); err != nil {
return nil, err
}
if err := s.agent.updateTTLCheck(checkID, api.HealthPassing, note); err != nil {
return nil, err
}
s.syncChanges()
return nil, nil
}
func (s *HTTPServer) AgentCheckWarn(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/warn/"))
note := req.URL.Query().Get("note")
// Get the provided token, if any, and vet against any ACL policies.
var token string
s.parseToken(req, &token)
if err := s.agent.vetCheckUpdate(token, checkID); err != nil {
return nil, err
}
if err := s.agent.updateTTLCheck(checkID, api.HealthWarning, note); err != nil {
return nil, err
}
s.syncChanges()
return nil, nil
}
func (s *HTTPServer) AgentCheckFail(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/fail/"))
note := req.URL.Query().Get("note")
// Get the provided token, if any, and vet against any ACL policies.
var token string
s.parseToken(req, &token)
if err := s.agent.vetCheckUpdate(token, checkID); err != nil {
return nil, err
}
if err := s.agent.updateTTLCheck(checkID, api.HealthCritical, note); err != nil {
return nil, err
}
s.syncChanges()
return nil, nil
}
// checkUpdate is the payload for a PUT to AgentCheckUpdate.
type checkUpdate struct {
// Status us one of the api.Health* states, "passing", "warning", or
// "critical".
Status string
// Output is the information to post to the UI for operators as the
// output of the process that decided to hit the TTL check. This is
// different from the note field that's associated with the check
// itself.
Output string
}
// AgentCheckUpdate is a PUT-based alternative to the GET-based Pass/Warn/Fail
// APIs.
func (s *HTTPServer) AgentCheckUpdate(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var update checkUpdate
if err := decodeBody(req, &update, nil); err != nil {
2017-08-23 19:19:11 +00:00
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Request decode failed: %v", err)
return nil, nil
}
switch update.Status {
case api.HealthPassing:
case api.HealthWarning:
case api.HealthCritical:
default:
2017-08-23 19:19:11 +00:00
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Invalid check status: '%s'", update.Status)
return nil, nil
}
total := len(update.Output)
if total > checks.BufSize {
update.Output = fmt.Sprintf("%s ... (captured %d of %d bytes)",
update.Output[:checks.BufSize], checks.BufSize, total)
}
checkID := types.CheckID(strings.TrimPrefix(req.URL.Path, "/v1/agent/check/update/"))
// Get the provided token, if any, and vet against any ACL policies.
var token string
s.parseToken(req, &token)
if err := s.agent.vetCheckUpdate(token, checkID); err != nil {
return nil, err
}
if err := s.agent.updateTTLCheck(checkID, update.Status, update.Output); err != nil {
return nil, err
}
s.syncChanges()
return nil, nil
}
func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var args structs.ServiceDefinition
// Fixup the type decode of TTL or Interval if a check if provided.
decodeCB := func(raw interface{}) error {
rawMap, ok := raw.(map[string]interface{})
if !ok {
return nil
}
// see https://github.com/hashicorp/consul/pull/3557 why we need this
// and why we should get rid of it.
config.TranslateKeys(rawMap, map[string]string{
"enable_tag_override": "EnableTagOverride",
})
for k, v := range rawMap {
switch strings.ToLower(k) {
case "check":
if err := FixupCheckType(v); err != nil {
return err
}
case "checks":
chkTypes, ok := v.([]interface{})
if !ok {
continue
}
for _, chkType := range chkTypes {
if err := FixupCheckType(chkType); err != nil {
return err
}
}
}
}
return nil
}
if err := decodeBody(req, &args, decodeCB); err != nil {
2017-08-23 19:19:11 +00:00
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Request decode failed: %v", err)
return nil, nil
}
// Verify the service has a name.
if args.Name == "" {
2017-08-23 19:19:11 +00:00
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, "Missing service name")
return nil, nil
}
// Check the service address here and in the catalog RPC endpoint
// since service registration isn't synchronous.
if ipaddr.IsAny(args.Address) {
2017-08-23 19:19:11 +00:00
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Invalid service address")
return nil, nil
}
// Get the node service.
ns := args.NodeService()
if err := structs.ValidateMetadata(ns.Meta, false); err != nil {
2018-02-07 00:54:42 +00:00
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, fmt.Errorf("Invalid Service Meta: %v", err))
2018-02-07 00:54:42 +00:00
return nil, nil
}
// Run validation. This is the same validation that would happen on
// the catalog endpoint so it helps ensure the sync will work properly.
if err := ns.Validate(); err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, err.Error())
return nil, nil
}
// Verify the check type.
chkTypes, err := args.CheckTypes()
if err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, fmt.Errorf("Invalid check: %v", err))
return nil, nil
}
for _, check := range chkTypes {
if check.Status != "" && !structs.ValidStatus(check.Status) {
2017-08-23 19:19:11 +00:00
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, "Status for checks must 'passing', 'warning', 'critical'")
return nil, nil
}
}
// Get the provided token, if any, and vet against any ACL policies.
var token string
s.parseToken(req, &token)
if err := s.agent.vetServiceRegister(token, ns); err != nil {
return nil, err
}
// Add the service.
if err := s.agent.AddService(ns, chkTypes, true, token); err != nil {
return nil, err
}
s.syncChanges()
return nil, nil
}
func (s *HTTPServer) AgentDeregisterService(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
serviceID := strings.TrimPrefix(req.URL.Path, "/v1/agent/service/deregister/")
// Get the provided token, if any, and vet against any ACL policies.
var token string
s.parseToken(req, &token)
if err := s.agent.vetServiceUpdate(token, serviceID); err != nil {
return nil, err
}
if err := s.agent.RemoveService(serviceID, true); err != nil {
return nil, err
}
s.syncChanges()
return nil, nil
}
func (s *HTTPServer) AgentServiceMaintenance(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Ensure we have a service ID
serviceID := strings.TrimPrefix(req.URL.Path, "/v1/agent/service/maintenance/")
if serviceID == "" {
2017-08-23 19:19:11 +00:00
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, "Missing service ID")
return nil, nil
}
// Ensure we have some action
params := req.URL.Query()
if _, ok := params["enable"]; !ok {
2017-08-23 19:19:11 +00:00
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, "Missing value for enable")
return nil, nil
}
raw := params.Get("enable")
enable, err := strconv.ParseBool(raw)
if err != nil {
2017-08-23 19:19:11 +00:00
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Invalid value for enable: %q", raw)
return nil, nil
}
// Get the provided token, if any, and vet against any ACL policies.
var token string
s.parseToken(req, &token)
if err := s.agent.vetServiceUpdate(token, serviceID); err != nil {
return nil, err
}
if enable {
reason := params.Get("reason")
if err = s.agent.EnableServiceMaintenance(serviceID, reason, token); err != nil {
2017-08-23 19:19:11 +00:00
resp.WriteHeader(http.StatusNotFound)
fmt.Fprint(resp, err.Error())
return nil, nil
}
} else {
if err = s.agent.DisableServiceMaintenance(serviceID); err != nil {
2017-08-23 19:19:11 +00:00
resp.WriteHeader(http.StatusNotFound)
fmt.Fprint(resp, err.Error())
return nil, nil
}
}
s.syncChanges()
return nil, nil
}
2015-01-15 19:20:22 +00:00
func (s *HTTPServer) AgentNodeMaintenance(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Ensure we have some action
params := req.URL.Query()
if _, ok := params["enable"]; !ok {
2017-08-23 19:19:11 +00:00
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, "Missing value for enable")
2015-01-15 19:20:22 +00:00
return nil, nil
}
raw := params.Get("enable")
enable, err := strconv.ParseBool(raw)
if err != nil {
2017-08-23 19:19:11 +00:00
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Invalid value for enable: %q", raw)
2015-01-15 19:20:22 +00:00
return nil, nil
}
// Get the provided token, if any, and vet against any ACL policies.
var token string
s.parseToken(req, &token)
rule, err := s.agent.resolveToken(token)
if err != nil {
return nil, err
}
if rule != nil && !rule.NodeWrite(s.agent.config.NodeName, nil) {
return nil, acl.ErrPermissionDenied
}
2015-01-15 19:20:22 +00:00
if enable {
s.agent.EnableNodeMaintenance(params.Get("reason"), token)
2015-01-15 19:20:22 +00:00
} else {
s.agent.DisableNodeMaintenance()
}
s.syncChanges()
2015-01-15 19:20:22 +00:00
return nil, nil
}
2016-11-16 21:45:26 +00:00
func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
rule, err := s.agent.resolveToken(token)
if err != nil {
2016-11-28 21:08:31 +00:00
return nil, err
}
if rule != nil && !rule.AgentRead(s.agent.config.NodeName) {
return nil, acl.ErrPermissionDenied
}
2016-11-28 21:08:31 +00:00
// Get the provided loglevel.
2016-11-16 21:45:26 +00:00
logLevel := req.URL.Query().Get("loglevel")
if logLevel == "" {
logLevel = "INFO"
}
// Upper case the level since that's required by the filter.
2016-11-16 21:45:26 +00:00
logLevel = strings.ToUpper(logLevel)
// Create a level filter and flusher.
2016-11-16 21:45:26 +00:00
filter := logger.LevelFilter()
filter.MinLevel = logutils.LogLevel(logLevel)
if !logger.ValidateLevelFilter(filter.MinLevel, filter) {
2017-08-23 19:19:11 +00:00
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Unknown log level: %s", filter.MinLevel)
2016-11-16 21:45:26 +00:00
return nil, nil
}
flusher, ok := resp.(http.Flusher)
if !ok {
return nil, fmt.Errorf("Streaming not supported")
}
// Set up a log handler.
2016-11-16 21:45:26 +00:00
handler := &httpLogHandler{
filter: filter,
logCh: make(chan string, 512),
logger: s.agent.logger,
2016-11-16 21:45:26 +00:00
}
s.agent.LogWriter.RegisterHandler(handler)
defer s.agent.LogWriter.DeregisterHandler(handler)
2016-11-16 21:45:26 +00:00
notify := resp.(http.CloseNotifier).CloseNotify()
// Send header so client can start streaming body
resp.WriteHeader(http.StatusOK)
// 0 byte write is needed before the Flush call so that if we are using
// a gzip stream it will go ahead and write out the HTTP response header
resp.Write([]byte(""))
flusher.Flush()
// Stream logs until the connection is closed.
2016-11-16 21:45:26 +00:00
for {
select {
case <-notify:
s.agent.LogWriter.DeregisterHandler(handler)
2016-11-28 21:08:31 +00:00
if handler.droppedCount > 0 {
s.agent.logger.Printf("[WARN] agent: Dropped %d logs during monitor request", handler.droppedCount)
}
2016-11-16 21:45:26 +00:00
return nil, nil
case log := <-handler.logCh:
fmt.Fprintln(resp, log)
2016-11-16 21:45:26 +00:00
flusher.Flush()
}
}
}
type httpLogHandler struct {
2016-11-28 21:08:31 +00:00
filter *logutils.LevelFilter
logCh chan string
logger *log.Logger
droppedCount int
2016-11-16 21:45:26 +00:00
}
func (h *httpLogHandler) HandleLog(log string) {
// Check the log level
if !h.filter.Check([]byte(log)) {
return
}
// Do a non-blocking send
select {
case h.logCh <- log:
default:
2016-11-28 21:08:31 +00:00
// Just increment a counter for dropped logs to this handler; we can't log now
// because the lock is already held by the LogWriter invoking this
2017-04-20 19:00:03 +00:00
h.droppedCount++
2016-11-16 21:45:26 +00:00
}
}
func (s *HTTPServer) AgentToken(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if s.checkACLDisabled(resp, req) {
return nil, nil
}
// Fetch the ACL token, if any, and enforce agent policy.
var token string
s.parseToken(req, &token)
rule, err := s.agent.resolveToken(token)
if err != nil {
return nil, err
}
if rule != nil && !rule.AgentWrite(s.agent.config.NodeName) {
return nil, acl.ErrPermissionDenied
}
// The body is just the token, but it's in a JSON object so we can add
// fields to this later if needed.
var args api.AgentToken
if err := decodeBody(req, &args, nil); err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Request decode failed: %v", err)
return nil, nil
}
// Figure out the target token.
target := strings.TrimPrefix(req.URL.Path, "/v1/agent/token/")
switch target {
case "acl_token":
s.agent.tokens.UpdateUserToken(args.Token)
case "acl_agent_token":
s.agent.tokens.UpdateAgentToken(args.Token)
case "acl_agent_master_token":
s.agent.tokens.UpdateAgentMasterToken(args.Token)
case "acl_replication_token":
s.agent.tokens.UpdateACLReplicationToken(args.Token)
default:
resp.WriteHeader(http.StatusNotFound)
fmt.Fprintf(resp, "Token %q is unknown", target)
return nil, nil
}
2018-03-21 15:56:14 +00:00
s.agent.logger.Printf("[INFO] agent: Updated agent's ACL token %q", target)
return nil, nil
}
2018-03-17 04:39:26 +00:00
// AgentConnectCARoots returns the trusted CA roots.
func (s *HTTPServer) AgentConnectCARoots(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
2018-03-20 17:36:05 +00:00
// NOTE(mitchellh): for now this is identical to /v1/connect/ca/roots.
// In the future, we're going to do some agent-local caching and the
// behavior will differ.
return s.ConnectCARoots(resp, req)
2018-03-17 04:39:26 +00:00
}
2018-03-21 17:55:39 +00:00
// AgentConnectCALeafCert returns the certificate bundle for a service
// instance. This supports blocking queries to update the returned bundle.
func (s *HTTPServer) AgentConnectCALeafCert(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Get the service ID. Note that this is the ID of a service instance.
id := strings.TrimPrefix(req.URL.Path, "/v1/agent/connect/ca/leaf/")
// Retrieve the service specified
service := s.agent.State.Service(id)
if service == nil {
return nil, fmt.Errorf("unknown service ID: %s", id)
}
// Create a CSR.
// TODO(mitchellh): This is obviously not production ready!
csr, pk := connect.TestCSR(&testing.RuntimeT{}, &connect.SpiffeIDService{
Host: "1234.consul",
Namespace: "default",
Datacenter: s.agent.config.Datacenter,
Service: service.Service,
})
// Request signing
var reply structs.IssuedCert
args := structs.CASignRequest{CSR: csr}
if err := s.agent.RPC("ConnectCA.Sign", &args, &reply); err != nil {
return nil, err
}
reply.PrivateKeyPEM = pk
return &reply, nil
}
// AgentConnectAuthorize
//
// POST /v1/agent/connect/authorize
func (s *HTTPServer) AgentConnectAuthorize(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Fetch the token
var token string
s.parseToken(req, &token)
// Decode the request from the request body
var authReq structs.ConnectAuthorizeRequest
if err := decodeBody(req, &authReq, nil); err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Request decode failed: %v", err)
return nil, nil
}
// We need to have a target to check intentions
if authReq.Target == "" {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Target service must be specified")
return nil, nil
}
// Parse the certificate URI from the client ID
uriRaw, err := url.Parse(authReq.ClientID)
if err != nil {
return &connectAuthorizeResp{
Authorized: false,
Reason: fmt.Sprintf("Client ID must be a URI: %s", err),
}, nil
}
uri, err := connect.ParseCertURI(uriRaw)
if err != nil {
return &connectAuthorizeResp{
Authorized: false,
Reason: fmt.Sprintf("Invalid client ID: %s", err),
}, nil
}
uriService, ok := uri.(*connect.SpiffeIDService)
if !ok {
return &connectAuthorizeResp{
Authorized: false,
Reason: "Client ID must be a valid SPIFFE service URI",
}, nil
}
// We need to verify service:write permissions for the given token.
// We do this manually here since the RPC request below only verifies
// service:read.
rule, err := s.agent.resolveToken(token)
if err != nil {
return nil, err
}
if rule != nil && !rule.ServiceWrite(authReq.Target, nil) {
return nil, acl.ErrPermissionDenied
}
// Get the intentions for this target service.
args := &structs.IntentionQueryRequest{
Datacenter: s.agent.config.Datacenter,
Match: &structs.IntentionQueryMatch{
Type: structs.IntentionMatchDestination,
Entries: []structs.IntentionMatchEntry{
{
Namespace: structs.IntentionDefaultNamespace,
Name: authReq.Target,
},
},
},
}
args.Token = token
var reply structs.IndexedIntentionMatches
if err := s.agent.RPC("Intention.Match", args, &reply); err != nil {
return nil, err
}
if len(reply.Matches) != 1 {
return nil, fmt.Errorf("Internal error loading matches")
}
// Test the authorization for each match
for _, ixn := range reply.Matches[0] {
if auth, ok := uriService.Authorize(ixn); ok {
return &connectAuthorizeResp{
Authorized: auth,
Reason: fmt.Sprintf("Matched intention: %s", ixn.String()),
}, nil
}
}
// No match, we need to determine the default behavior. We do this by
// specifying the anonymous token token, which will get that behavior.
// The default behavior if ACLs are disabled is to allow connections
// to mimic the behavior of Consul itself: everything is allowed if
// ACLs are disabled.
rule, err = s.agent.resolveToken("")
if err != nil {
return nil, err
}
authz := true
reason := "ACLs disabled, access is allowed by default"
if rule != nil {
authz = rule.IntentionDefault()
reason = "Default behavior configured by ACLs"
}
return &connectAuthorizeResp{
Authorized: authz,
Reason: reason,
}, nil
}
// connectAuthorizeResp is the response format/structure for the
// /v1/agent/connect/authorize endpoint.
type connectAuthorizeResp struct {
Authorized bool // True if authorized, false if not
Reason string // Reason for the Authorized value (whether true or false)
}