mirror of
https://github.com/status-im/consul.git
synced 2025-01-22 19:50:36 +00:00
Only send one single ACL cache refresh across network when TTL is over
It will allow the following: * when connectivity is limited (saturated linnks between DCs), only one single request to refresh an ACL will be sent to ACL master DC instead of statcking ACL refresh queries * when extend-cache is used for ACL, do not wait for result, but refresh the ACL asynchronously, so no delay is not impacting slave DC * When extend-cache is not used, keep the existing blocking mechanism, but only send a single refresh request. This will fix https://github.com/hashicorp/consul/issues/3524
This commit is contained in:
parent
d7c5da335b
commit
9406ca1c95
@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
@ -116,6 +117,9 @@ type aclCache struct {
|
||||
// local is a function used to look for an ACL locally if replication is
|
||||
// enabled. This will be nil if replication isn't enabled.
|
||||
local acl.FaultFunc
|
||||
|
||||
fetchMutex sync.RWMutex
|
||||
fetchMap map[string][]chan (RemoteACLResult)
|
||||
}
|
||||
|
||||
// newACLCache returns a new non-authoritative cache for ACLs. This is used for
|
||||
@ -146,6 +150,11 @@ func newACLCache(conf *Config, logger *log.Logger, rpc rpcFn, local acl.FaultFun
|
||||
return cache, nil
|
||||
}
|
||||
|
||||
type RemoteACLResult struct {
|
||||
result acl.ACL
|
||||
err error
|
||||
}
|
||||
|
||||
// lookupACL is used when we are non-authoritative, and need to resolve an ACL.
|
||||
func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
|
||||
// Check the cache for the ACL.
|
||||
@ -161,8 +170,22 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
|
||||
return cached.ACL, nil
|
||||
}
|
||||
metrics.IncrCounter([]string{"acl", "cache_miss"}, 1)
|
||||
res := c.lookupACLRemote(id, authDC, cached)
|
||||
return res.result, res.err
|
||||
}
|
||||
|
||||
// Attempt to refresh the policy from the ACL datacenter via an RPC.
|
||||
func (c *aclCache) fireResult(id string, theACL acl.ACL, err error) {
|
||||
c.fetchMutex.Lock()
|
||||
channels := c.fetchMap[id]
|
||||
delete(c.fetchMap, id)
|
||||
c.fetchMutex.Unlock()
|
||||
for _, cx := range channels {
|
||||
cx <- RemoteACLResult{theACL, err}
|
||||
close(cx)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *aclCache) loadACLInChan(id, authDC string, cached *aclCacheEntry) {
|
||||
args := structs.ACLPolicyRequest{
|
||||
Datacenter: authDC,
|
||||
ACL: id,
|
||||
@ -173,13 +196,21 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
|
||||
var reply structs.ACLPolicy
|
||||
err := c.rpc("ACL.GetPolicy", &args, &reply)
|
||||
if err == nil {
|
||||
return c.useACLPolicy(id, authDC, cached, &reply)
|
||||
theACL, theError := c.useACLPolicy(id, authDC, cached, &reply)
|
||||
if cached != nil && theACL != nil {
|
||||
cached.ACL = theACL
|
||||
cached.ETag = reply.ETag
|
||||
cached.Expires = time.Now().Add(c.config.ACLTTL)
|
||||
}
|
||||
c.fireResult(id, theACL, theError)
|
||||
return
|
||||
}
|
||||
|
||||
// Check for not-found, which will cause us to bail immediately. For any
|
||||
// other error we report it in the logs but can continue.
|
||||
if acl.IsErrNotFound(err) {
|
||||
return nil, acl.ErrNotFound
|
||||
c.fireResult(id, nil, acl.ErrNotFound)
|
||||
return
|
||||
}
|
||||
c.logger.Printf("[ERR] consul.acl: Failed to get policy from ACL datacenter: %v", err)
|
||||
|
||||
@ -227,24 +258,58 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
|
||||
reply.TTL = c.config.ACLTTL
|
||||
reply.Parent = parent
|
||||
reply.Policy = policy
|
||||
return c.useACLPolicy(id, authDC, cached, &reply)
|
||||
theACL, theError := c.useACLPolicy(id, authDC, cached, &reply)
|
||||
if cached != nil && theACL != nil {
|
||||
cached.ACL = theACL
|
||||
cached.ETag = reply.ETag
|
||||
cached.Expires = time.Now().Add(c.config.ACLTTL)
|
||||
}
|
||||
c.fireResult(id, theACL, theError)
|
||||
return
|
||||
}
|
||||
|
||||
ACL_DOWN:
|
||||
// Unable to refresh, apply the down policy.
|
||||
switch c.config.ACLDownPolicy {
|
||||
case "allow":
|
||||
return acl.AllowAll(), nil
|
||||
c.fireResult(id, acl.AllowAll(), nil)
|
||||
return
|
||||
case "extend-cache":
|
||||
if cached != nil {
|
||||
return cached.ACL, nil
|
||||
c.fireResult(id, cached.ACL, nil)
|
||||
return
|
||||
}
|
||||
fallthrough
|
||||
default:
|
||||
return acl.DenyAll(), nil
|
||||
c.fireResult(id, acl.DenyAll(), nil)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (c *aclCache) lookupACLRemote(id, authDC string, cached *aclCacheEntry) RemoteACLResult {
|
||||
// Attempt to refresh the policy from the ACL datacenter via an RPC.
|
||||
myChan := make(chan RemoteACLResult)
|
||||
mustWaitForResult := cached == nil || c.config.ACLDownPolicy != "extend-cache"
|
||||
c.fetchMutex.Lock()
|
||||
clients, ok := c.fetchMap[id]
|
||||
if !ok {
|
||||
clients = make([]chan RemoteACLResult, 16)
|
||||
}
|
||||
if mustWaitForResult {
|
||||
c.fetchMap[id] = append(clients, myChan)
|
||||
}
|
||||
c.fetchMutex.Unlock()
|
||||
|
||||
if !ok {
|
||||
go c.loadACLInChan(id, authDC, cached)
|
||||
}
|
||||
if !mustWaitForResult {
|
||||
return RemoteACLResult{cached.ACL, nil}
|
||||
}
|
||||
res := <-myChan
|
||||
return res
|
||||
}
|
||||
|
||||
// useACLPolicy handles an ACLPolicy response
|
||||
func (c *aclCache) useACLPolicy(id, authDC string, cached *aclCacheEntry, p *structs.ACLPolicy) (acl.ACL, error) {
|
||||
// Check if we can used the cached policy
|
||||
|
Loading…
x
Reference in New Issue
Block a user