From 927f5847615b3e34670db2ed6811eca88f6b47c7 Mon Sep 17 00:00:00 2001 From: Kit Patella Date: Thu, 16 Apr 2020 15:07:52 -0700 Subject: [PATCH] agent: stub out auditing functionality in OSS --- agent/agent.go | 15 ++++++++- agent/agent_oss.go | 18 +++++++++-- agent/config/builder.go | 2 +- agent/config/config.go | 19 +++++++++++ agent/config/runtime_test.go | 29 +++++++++++++++++ agent/consul/config.go | 2 +- agent/http.go | 62 +++++++++++++++++++++++++----------- agent/http_oss.go | 11 +++++++ 8 files changed, 133 insertions(+), 25 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 581199f229..ec8196c189 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -16,6 +16,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/hashicorp/go-connlimit" @@ -178,6 +179,10 @@ type Agent struct { // In-memory sink used for collecting metrics MemSink *metrics.InmemSink + // Eventer provides a backend for handling event logging. APIs are provided on the agent for interacting with + // this reloadable type + Eventer atomic.Value + // delegate is either a *consul.Server or *consul.Client // depending on the configuration delegate delegate @@ -430,7 +435,10 @@ func (a *Agent) Start() error { // waiting to discover a consul server consulCfg.ServerUp = a.sync.SyncFull.Trigger - a.initEnterprise(consulCfg) + err = a.initEnterprise(consulCfg) + if err != nil { + return fmt.Errorf("failed to start Consul enterprise component: %v", err) + } tlsConfigurator, err := tlsutil.NewConfigurator(c.ToTLSUtilConfig(), a.logger) if err != nil { @@ -4101,6 +4109,11 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error { // concurrent due to both gaining a full lock on the stateLock a.config.ConfigEntryBootstrap = newCfg.ConfigEntryBootstrap + err := a.reloadEnterprise(newCfg) + if err != nil { + return err + } + // create the config for the rpc server/client consulCfg, err := a.consulConfig() if err != nil { diff --git a/agent/agent_oss.go b/agent/agent_oss.go index 97719b6e30..4ee4d7bccf 100644 --- a/agent/agent_oss.go +++ b/agent/agent_oss.go @@ -9,14 +9,26 @@ import ( "github.com/hashicorp/consul/api" ) -// fillAgentServiceEnterpriseMeta stub +// fillAgentServiceEnterpriseMeta is a noop stub for the func defined agent_ent.go func fillAgentServiceEnterpriseMeta(_ *api.AgentService, _ *structs.EnterpriseMeta) {} -// fillHealthCheckEnterpriseMeta stub +// fillHealthCheckEnterpriseMeta is a noop stub for the func defined agent_ent.go func fillHealthCheckEnterpriseMeta(_ *api.HealthCheck, _ *structs.EnterpriseMeta) {} -func (a *Agent) initEnterprise(consulCfg *consul.Config) { +// initEnterprise is a noop stub for the func defined agent_ent.go +func (a *Agent) initEnterprise(consulCfg *consul.Config) error { + return nil } +// loadEnterpriseTokens is a noop stub for the func defined agent_ent.go func (a *Agent) loadEnterpriseTokens(conf *config.RuntimeConfig) { } + +// reloadEnterprise is a noop stub for the func defined agent_ent.go +func (a *Agent) reloadEnterprise(conf *config.RuntimeConfig) error { + return nil +} + +// WriteEvent is a noop stub for the func defined agent_ent.go +func (a *Agent) WriteEvent(eventType string, payload interface{}) { +} diff --git a/agent/config/builder.go b/agent/config/builder.go index 48d18b2724..318d6d95e1 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -1004,7 +1004,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) { return rt, nil } -// Validate performs semantical validation of the runtime configuration. +// Validate performs semantic validation of the runtime configuration. func (b *Builder) Validate(rt RuntimeConfig) error { // reDatacenter defines a regexp for a valid datacenter name var reDatacenter = regexp.MustCompile("^[a-z0-9_-]+$") diff --git a/agent/config/config.go b/agent/config/config.go index 4de4b313a0..2eed906bb8 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -188,6 +188,7 @@ type Config struct { AdvertiseAddrWAN *string `json:"advertise_addr_wan,omitempty" hcl:"advertise_addr_wan" mapstructure:"advertise_addr_wan"` AdvertiseAddrWANIPv4 *string `json:"advertise_addr_wan_ipv4,omitempty" hcl:"advertise_addr_wan_ipv4" mapstructure:"advertise_addr_wan_ipv4"` AdvertiseAddrWANIPv6 *string `json:"advertise_addr_wan_ipv6,omitempty" hcl:"advertise_addr_wan_ipv6" mapstructure:"advertise_addr_ipv6"` + Audit Audit `json:"audit,omitempty" hcl:"audit" mapstructure:"audit"` Autopilot Autopilot `json:"autopilot,omitempty" hcl:"autopilot" mapstructure:"autopilot"` BindAddr *string `json:"bind_addr,omitempty" hcl:"bind_addr" mapstructure:"bind_addr"` Bootstrap *bool `json:"bootstrap,omitempty" hcl:"bootstrap" mapstructure:"bootstrap"` @@ -367,6 +368,24 @@ type AdvertiseAddrsConfig struct { SerfWAN *string `json:"serf_wan,omitempty" hcl:"serf_wan" mapstructure:"serf_wan"` } +// AuditSink can be provided multiple times to define pipelines for auditing +type AuditSink struct { + Name *string `json:"name,omitempty" hcl:"name" mapstructure:"name"` + Type *string `json:"type,omitempty" hcl:"type" mapstructure:"type"` + Format *string `json:"format,omitempty" hcl:"format" mapstructure:"format"` + Path *string `json:"path,omitempty" hcl:"path" mapstructure:"path"` + DeliveryGuarantee *string `json:"delivery_guarantee,omitempty" hcl:"delivery_guarantee" mapstructure:"delivery_guarantee"` + RotateBytes *int `json:"rotate_bytes,omitempty" hcl:"rotate_bytes" mapstructure:"rotate_bytes"` + RotateDuration *string `json:"rotate_duration,omitempty" hcl:"rotate_duration" mapstructure:"rotate_duration"` + RotateMaxFiles *int `json:"rotate_max_files,omitempty" hcl:"rotate_max_files" mapstructure:"rotate_max_files"` +} + +// Audit allows us to enable and define destinations for auditing +type Audit struct { + Enabled *bool `json:"enabled,omitempty" hcl:"enabled" mapstructure:"enabled"` + Sinks map[string]AuditSink `json:"sink,omitempty" hcl:"sink" mapstructure:"sink"` +} + type Autopilot struct { CleanupDeadServers *bool `json:"cleanup_dead_servers,omitempty" hcl:"cleanup_dead_servers" mapstructure:"cleanup_dead_servers"` DisableUpgradeMigration *bool `json:"disable_upgrade_migration,omitempty" hcl:"disable_upgrade_migration" mapstructure:"disable_upgrade_migration"` diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index ec1bac446c..e84c977890 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -3884,6 +3884,20 @@ func TestFullConfig(t *testing.T) { }, "advertise_addr": "17.99.29.16", "advertise_addr_wan": "78.63.37.19", + "audit": { + "enabled": true, + "sink": { + "test": { + "type": "file", + "format": "json", + "delivery_guarantee": "best-effort", + "path": "/test/path", + "rotate_bytes": 0, + "rotate_max_files": 0, + "rotate_duration": "0" + } + } + }, "autopilot": { "cleanup_dead_servers": true, "disable_upgrade_migration": true, @@ -4515,6 +4529,18 @@ func TestFullConfig(t *testing.T) { } advertise_addr = "17.99.29.16" advertise_addr_wan = "78.63.37.19" + audit { + enabled = true + sink "test" { + type = "file" + format = "json" + delivery_guarantee = "best-effort" + path = "/test/path" + rotate_bytes = 0 + rotate_max_files = 0 + rotate_duration = "0" + } + } autopilot = { cleanup_dead_servers = true disable_upgrade_migration = true @@ -6158,6 +6184,9 @@ func TestSanitize(t *testing.T) { "AEInterval": "0s", "AdvertiseAddrLAN": "", "AdvertiseAddrWAN": "", + "Audit": { + "enabled": false + }, "AutopilotCleanupDeadServers": false, "AutopilotDisableUpgradeMigration": false, "AutopilotLastContactThreshold": "0s", diff --git a/agent/consul/config.go b/agent/consul/config.go index e7df308d21..aa4ea7352c 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -523,7 +523,7 @@ func (c *Config) CheckACL() error { return nil } -// DefaultConfig returns a sane default configuration. +// DefaultConfig returns a default configuration. func DefaultConfig() *Config { hostname, err := os.Hostname() if err != nil { diff --git a/agent/http.go b/agent/http.go index 4a84260921..7fba3b0067 100644 --- a/agent/http.go +++ b/agent/http.go @@ -399,6 +399,10 @@ var ( func (s *HTTPServer) wrap(handler endpoint, methods []string) http.HandlerFunc { httpLogger := s.agent.logger.Named(logging.HTTP) return func(resp http.ResponseWriter, req *http.Request) { + + // Audit log the request + reqPayload := s.auditReq(req) + setHeaders(resp, s.agent.config.HTTPResponseHeaders) setTranslateAddr(resp, s.agent.config.TranslateWANAddrs) @@ -476,33 +480,44 @@ func (s *HTTPServer) wrap(handler endpoint, methods []string) http.HandlerFunc { "from", req.RemoteAddr, "error", err, ) + var httpCode int switch { case isForbidden(err): - resp.WriteHeader(http.StatusForbidden) + httpCode = http.StatusForbidden + resp.WriteHeader(httpCode) fmt.Fprint(resp, err.Error()) case structs.IsErrRPCRateExceeded(err): - resp.WriteHeader(http.StatusTooManyRequests) + httpCode = http.StatusTooManyRequests + resp.WriteHeader(httpCode) case isMethodNotAllowed(err): // RFC2616 states that for 405 Method Not Allowed the response // MUST include an Allow header containing the list of valid // methods for the requested resource. // https://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html addAllowHeader(err.(MethodNotAllowedError).Allow) - resp.WriteHeader(http.StatusMethodNotAllowed) // 405 + httpCode = http.StatusMethodNotAllowed + resp.WriteHeader(httpCode) // 405 fmt.Fprint(resp, err.Error()) case isBadRequest(err): - resp.WriteHeader(http.StatusBadRequest) + httpCode = http.StatusBadRequest + resp.WriteHeader(httpCode) fmt.Fprint(resp, err.Error()) case isNotFound(err): - resp.WriteHeader(http.StatusNotFound) + httpCode = http.StatusNotFound + resp.WriteHeader(httpCode) fmt.Fprintf(resp, err.Error()) case isTooManyRequests(err): - resp.WriteHeader(http.StatusTooManyRequests) + httpCode = http.StatusTooManyRequests + resp.WriteHeader(httpCode) fmt.Fprint(resp, err.Error()) default: - resp.WriteHeader(http.StatusInternalServerError) + httpCode = http.StatusInternalServerError + resp.WriteHeader(httpCode) fmt.Fprint(resp, err.Error()) } + + // Audit log the error response + s.auditResp(reqPayload, httpCode) } start := time.Now() @@ -577,6 +592,10 @@ func (s *HTTPServer) wrap(handler endpoint, methods []string) http.HandlerFunc { } resp.Header().Set("Content-Type", contentType) resp.WriteHeader(httpCode) + + // Audit log the success response + s.auditResp(reqPayload, httpCode) + resp.Write(buf) } } @@ -925,10 +944,7 @@ func (s *HTTPServer) parseDC(req *http.Request, dc *string) { } // parseTokenInternal is used to parse the ?token query param or the X-Consul-Token header or -// Authorization Bearer token (RFC6750) and -// optionally resolve proxy tokens to real ACL tokens. If the token is invalid or not specified it will populate -// the token with the agents UserToken (acl_token in the consul configuration) -// Parsing has the following priority: ?token, X-Consul-Token and last "Authorization: Bearer " +// Authorization Bearer token (RFC6750). func (s *HTTPServer) parseTokenInternal(req *http.Request, token *string) { tok := "" if other := req.URL.Query().Get("token"); other != "" { @@ -949,25 +965,33 @@ func (s *HTTPServer) parseTokenInternal(req *http.Request, token *string) { // must be "Bearer" if strings.ToLower(scheme) == "bearer" { - // Since Bearer tokens shouldnt contain spaces (rfc6750#section-2.1) + // Since Bearer tokens shouldn't contain spaces (rfc6750#section-2.1) // "value" is tokenized, only the first item is used tok = strings.TrimSpace(strings.Split(value, " ")[0]) } } } - if tok != "" { - *token = tok + *token = tok + return +} + +// parseTokenResolveProxy passes through to parseTokenInternal and optionally resolves proxy tokens to real ACL tokens. +// If the token is invalid or not specified it will populate the token with the agents UserToken (acl_token in the +// consul configuration) +func (s *HTTPServer) parseTokenResolveProxy(req *http.Request, token *string) { + s.parseTokenInternal(req, token) // parseTokenInternal modifies *token + if token != nil && *token == "" { + *token = s.agent.tokens.UserToken() return } - - *token = s.agent.tokens.UserToken() + return } // parseToken is used to parse the ?token query param or the X-Consul-Token header or -// Authorization Bearer token header (RFC6750) +// Authorization Bearer token header (RFC6750). This function is used widely in Consul's endpoints func (s *HTTPServer) parseToken(req *http.Request, token *string) { - s.parseTokenInternal(req, token) + s.parseTokenResolveProxy(req, token) } func sourceAddrFromRequest(req *http.Request) string { @@ -1027,7 +1051,7 @@ func (s *HTTPServer) parseMetaFilter(req *http.Request) map[string]string { func (s *HTTPServer) parseInternal(resp http.ResponseWriter, req *http.Request, dc *string, b structs.QueryOptionsCompat) bool { s.parseDC(req, dc) var token string - s.parseTokenInternal(req, &token) + s.parseTokenResolveProxy(req, &token) b.SetToken(token) var filter string s.parseFilter(req, &filter) diff --git a/agent/http_oss.go b/agent/http_oss.go index 9811a5685b..d264bf74d5 100644 --- a/agent/http_oss.go +++ b/agent/http_oss.go @@ -52,3 +52,14 @@ func parseACLAuthMethodEnterpriseMeta(req *http.Request, _ *structs.ACLAuthMetho return nil } + +// auditReq is a noop stub for the corresponding func in http_ent.go +func (s *HTTPServer) auditReq(req *http.Request) interface{} { + // note(kit): We return an nil here so we can pass it to auditResp. Auditing the response requires the + // request object for context, so we have it pass it even when it's disabled + return nil +} + +// auditResp is a noop stub for the corresponding func in http_ent.go +func (s *HTTPServer) auditResp(reqPayload interface{}, httpCode int) { +}