diff --git a/acl/cache.go b/acl/cache.go index 069d13ba75..0debd1d20f 100644 --- a/acl/cache.go +++ b/acl/cache.go @@ -8,7 +8,7 @@ import ( ) // FaultFunc is a function used to fault in the parent, -// rules for an ACL given it's ID +// rules for an ACL given its ID type FaultFunc func(id string) (string, string, error) // aclEntry allows us to store the ACL with it's policy ID @@ -46,7 +46,7 @@ func NewCache(size int, faultfn FaultFunc) (*Cache, error) { // GetPolicy is used to get a potentially cached policy set. // If not cached, it will be parsed, and then cached. func (c *Cache) GetPolicy(rules string) (*Policy, error) { - return c.getPolicy(c.ruleID(rules), rules) + return c.getPolicy(RuleID(rules), rules) } // getPolicy is an internal method to get a cached policy, @@ -66,8 +66,8 @@ func (c *Cache) getPolicy(id, rules string) (*Policy, error) { } -// ruleID is used to generate an ID for a rule -func (c *Cache) ruleID(rules string) string { +// RuleID is used to generate an ID for a rule +func RuleID(rules string) string { return fmt.Sprintf("%x", md5.Sum([]byte(rules))) } @@ -112,7 +112,7 @@ func (c *Cache) GetACL(id string) (ACL, error) { if err != nil { return nil, err } - ruleID := c.ruleID(rules) + ruleID := RuleID(rules) // Check for a compiled ACL policyID := c.policyID(parentID, ruleID) diff --git a/consul/acl.go b/consul/acl.go index d2a6461731..7a472134fb 100644 --- a/consul/acl.go +++ b/consul/acl.go @@ -44,7 +44,7 @@ var ( permissionDeniedErr = errors.New(permissionDenied) ) -// aclCacheEntry is used to cache non-authoritative ACL's +// aclCacheEntry is used to cache non-authoritative ACLs // If non-authoritative, then we must respect a TTL type aclCacheEntry struct { ACL acl.ACL @@ -52,9 +52,14 @@ type aclCacheEntry struct { ETag string } -// aclFault is used to fault in the rules for an ACL if we take a miss -func (s *Server) aclFault(id string) (string, string, error) { +// aclLocalFault is used by the authoritative ACL cache to fault in the rules +// for an ACL if we take a miss. This goes directly to the state store, so it +// assumes its running in the ACL datacenter, or in a non-ACL datacenter when +// using its replicated ACLs during an outage. +func (s *Server) aclLocalFault(id string) (string, string, error) { defer metrics.MeasureSince([]string{"consul", "acl", "fault"}, time.Now()) + + // Query the state store. state := s.fsm.State() _, acl, err := state.ACLGet(id) if err != nil { @@ -64,19 +69,23 @@ func (s *Server) aclFault(id string) (string, string, error) { return "", "", errors.New(aclNotFound) } - // Management tokens have no policy and inherit from the - // 'manage' root policy + // Management tokens have no policy and inherit from the 'manage' root + // policy. if acl.Type == structs.ACLTypeManagement { return "manage", "", nil } - // Otherwise use the base policy + // Otherwise use the default policy. return s.config.ACLDefaultPolicy, acl.Rules, nil } -// resolveToken is used to resolve an ACL if any is appropriate +// resolveToken is the primary interface used by ACL-checkers (such as an +// endpoint handling a request) to resolve a token. If ACLs aren't enabled +// then this will return a nil token, otherwise it will attempt to use local +// cache and ultimately the ACL datacenter to get the policy associated with the +// token. func (s *Server) resolveToken(id string) (acl.ACL, error) { - // Check if there is no ACL datacenter (ACL's disabled) + // Check if there is no ACL datacenter (ACLs disabled) authDC := s.config.ACLDatacenter if len(authDC) == 0 { return nil, nil @@ -108,23 +117,30 @@ type aclCache struct { config *Config logger *log.Logger - // acls is a non-authoritative ACL cache + // acls is a non-authoritative ACL cache. acls *lru.Cache - // aclPolicyCache is a policy cache + // aclPolicyCache is a non-authoritative policy cache. policies *lru.Cache - // The RPC function used to talk to the client/server + // rpc is a function used to talk to the client/server. rpc rpcFn + + // local is a function used to look for an ACL locally if replication is + // enabled. This will be nil if replication isn't enabled. + local acl.FaultFunc } -// newAclCache returns a new cache layer for ACLs and policies -func newAclCache(conf *Config, logger *log.Logger, rpc rpcFn) (*aclCache, error) { +// newAclCache returns a new non-authoritative cache for ACLs. This is used for +// performance, and is used inside the ACL datacenter on non-leader servers, and +// outside the ACL datacenter everywhere. +func newAclCache(conf *Config, logger *log.Logger, rpc rpcFn, local acl.FaultFunc) (*aclCache, error) { var err error cache := &aclCache{ config: conf, logger: logger, rpc: rpc, + local: local, } // Initialize the non-authoritative ACL cache @@ -142,17 +158,16 @@ func newAclCache(conf *Config, logger *log.Logger, rpc rpcFn) (*aclCache, error) return cache, nil } -// lookupACL is used when we are non-authoritative, and need -// to resolve an ACL +// lookupACL is used when we are non-authoritative, and need to resolve an ACL. func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) { - // Check the cache for the ACL + // Check the cache for the ACL. var cached *aclCacheEntry raw, ok := c.acls.Get(id) if ok { cached = raw.(*aclCacheEntry) } - // Check for live cache + // Check for live cache. if cached != nil && time.Now().Before(cached.Expires) { metrics.IncrCounter([]string{"consul", "acl", "cache_hit"}, 1) return cached.ACL, nil @@ -160,7 +175,7 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) { metrics.IncrCounter([]string{"consul", "acl", "cache_miss"}, 1) } - // Attempt to refresh the policy + // Attempt to refresh the policy from the ACL datacenter via an RPC. args := structs.ACLPolicyRequest{ Datacenter: authDC, ACL: id, @@ -168,29 +183,62 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) { if cached != nil { args.ETag = cached.ETag } - var out structs.ACLPolicy - err := c.rpc("ACL.GetPolicy", &args, &out) - - // Handle the happy path + var reply structs.ACLPolicy + err := c.rpc("ACL.GetPolicy", &args, &reply) if err == nil { - return c.useACLPolicy(id, authDC, cached, &out) + return c.useACLPolicy(id, authDC, cached, &reply) } - // Check for not-found + // Check for not-found, which will cause us to bail immediately. For any + // other error we report it in the logs but can continue. if strings.Contains(err.Error(), aclNotFound) { return nil, errors.New(aclNotFound) } else { - s := id - // Print last 3 chars of the token if long enough, otherwise completly hide it - if len(s) > 3 { - s = fmt.Sprintf("token ending in '%s'", s[len(s)-3:]) - } else { - s = redactedToken - } - c.logger.Printf("[ERR] consul.acl: Failed to get policy for %s: %v", s, err) + // TODO (slackpad) - We used to print a few characters of the + // token here if the token was long enough. This was bugging me + // so I deleted it. We should probably print a hash of the token, + // or better yet let's add another ID to tokens to identify them + // without giving away their privileges. + c.logger.Printf("[ERR] consul.acl: Failed to get policy from ACL datacenter: %v", err) } - // Unable to refresh, apply the down policy + // At this point we might have an expired cache entry and we know that + // there was a problem getting the ACL from the ACL datacenter. If a + // local ACL fault function is registered to query replicated ACL data, + // and the user's policy allows it, we will try locally before we give + // up. + if c.local != nil && c.config.ACLDownPolicy == "extend-cache" { + parent, rules, err := c.local(id) + if err != nil { + // We don't make an exception here for ACLs that aren't + // found locally. It seems more robust to use an expired + // cached entry (if we have one) rather than ignore it + // for the case that replication was a bit behind and + // didn't have the ACL yet. + c.logger.Printf("[DEBUG] consul.acl: Failed to get policy from replicated ACLs: %v", err) + goto ACL_DOWN + } + + policy, err := acl.Parse(rules) + if err != nil { + c.logger.Printf("[DEBUG] consul.acl: Failed to parse policy for replicated ACL: %v", err) + goto ACL_DOWN + } + policy.ID = acl.RuleID(rules) + + // Fake up an ACL datacenter reply and inject it into the cache. + // Note we use the local TTL here, so this'll be used for that + // amount of time even once the ACL datacenter becomes available. + metrics.IncrCounter([]string{"consul", "acl", "replication_hit"}, 1) + reply.ETag = makeACLETag(parent, policy) + reply.TTL = c.config.ACLTTL + reply.Parent = parent + reply.Policy = policy + return c.useACLPolicy(id, authDC, cached, &reply) + } + +ACL_DOWN: + // Unable to refresh, apply the down policy. switch c.config.ACLDownPolicy { case "allow": return acl.AllowAll(), nil diff --git a/consul/acl_endpoint.go b/consul/acl_endpoint.go index d14c90d289..19776ac5ea 100644 --- a/consul/acl_endpoint.go +++ b/consul/acl_endpoint.go @@ -161,6 +161,11 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest, }) } +// makeACLETag returns an ETag for the given parent and policy. +func makeACLETag(parent string, policy *acl.Policy) string { + return fmt.Sprintf("%s:%s", parent, policy.ID) +} + // GetPolicy is used to retrieve a compiled policy object with a TTL. Does not // support a blocking query. func (a *ACL) GetPolicy(args *structs.ACLPolicyRequest, reply *structs.ACLPolicy) error { @@ -181,7 +186,7 @@ func (a *ACL) GetPolicy(args *structs.ACLPolicyRequest, reply *structs.ACLPolicy // Generate an ETag conf := a.srv.config - etag := fmt.Sprintf("%s:%s", parent, policy.ID) + etag := makeACLETag(parent, policy) // Setup the response reply.ETag = etag diff --git a/consul/acl_replication.go b/consul/acl_replication.go index af4aebffb7..ca530453f7 100644 --- a/consul/acl_replication.go +++ b/consul/acl_replication.go @@ -179,6 +179,7 @@ func (s *Server) updateLocalACLs(changes structs.ACLRequests) error { time.Sleep(1*time.Second - elapsed) ops, start = 0, time.Now() } + ops++ // Note that we are using the single ACL interface here and not // performing all this inside a single transaction. This is OK @@ -192,7 +193,6 @@ func (s *Server) updateLocalACLs(changes structs.ACLRequests) error { if err := aclApplyInternal(s, change, &reply); err != nil { return err } - ops++ } return nil } @@ -232,6 +232,8 @@ func (s *Server) replicateACLs(lastRemoteIndex uint64) (uint64, error) { lastRemoteIndex = 0 } + // Calculate the changes required to bring the state into sync and then + // apply them. changes := reconcileACLs(local, remote.ACLs, lastRemoteIndex) if err := s.updateLocalACLs(changes); err != nil { return 0, fmt.Errorf("failed to sync ACL changes: %v", err) @@ -252,8 +254,6 @@ func (s *Server) IsACLReplicationEnabled() bool { // runACLReplication is a long-running goroutine that will attempt to replicate // ACLs while the server is the leader, until the shutdown channel closes. func (s *Server) runACLReplication() { - defer s.shutdownWait.Done() - // Give each server's replicator a random initial phase for good // measure. select { diff --git a/consul/server.go b/consul/server.go index 02490c6ae0..5419f0b6ee 100644 --- a/consul/server.go +++ b/consul/server.go @@ -67,7 +67,7 @@ const ( // Server is Consul server which manages the service discovery, // health checking, DC forwarding, Raft, and multiple Serf pools. type Server struct { - // aclAuthCache is the authoritative ACL cache + // aclAuthCache is the authoritative ACL cache. aclAuthCache *acl.Cache // aclCache is the non-authoritative ACL cache. @@ -151,15 +151,10 @@ type Server struct { // shutdown and the associated members here are used in orchestrating // a clean shutdown. The shutdownCh is never written to, only closed to - // indicate a shutdown has been initiated. The shutdownWait group will - // be waited on after closing the shutdownCh, but before any other - // shutdown activities take place in the server. Anything added to the - // shutdownWait group will block all the rest of shutdown, so use this - // sparingly and carefully. + // indicate a shutdown has been initiated. shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex - shutdownWait sync.WaitGroup } // Holds the RPC endpoints @@ -236,16 +231,21 @@ func NewServer(config *Config) (*Server, error) { } // Initialize the authoritative ACL cache. - s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclFault) + s.aclAuthCache, err = acl.NewCache(aclCacheSize, s.aclLocalFault) if err != nil { s.Shutdown() - return nil, fmt.Errorf("Failed to create ACL cache: %v", err) + return nil, fmt.Errorf("Failed to create authoritative ACL cache: %v", err) } - // Set up the non-authoritative ACL cache. - if s.aclCache, err = newAclCache(config, logger, s.RPC); err != nil { + // Set up the non-authoritative ACL cache. A nil local function is given + // if ACL replication isn't enabled. + var local acl.FaultFunc + if s.IsACLReplicationEnabled() { + local = s.aclLocalFault + } + if s.aclCache, err = newAclCache(config, logger, s.RPC, local); err != nil { s.Shutdown() - return nil, err + return nil, fmt.Errorf("Failed to create non-authoritative ACL cache: %v", err) } // Initialize the RPC layer. @@ -280,7 +280,6 @@ func NewServer(config *Config) (*Server, error) { // Start ACL replication. if s.IsACLReplicationEnabled() { - s.shutdownWait.Add(1) go s.runACLReplication() } @@ -509,7 +508,6 @@ func (s *Server) Shutdown() error { s.shutdown = true close(s.shutdownCh) - s.shutdownWait.Wait() if s.serfLAN != nil { s.serfLAN.Shutdown()