Added async-cache with similar behaviour as extend-cache but asynchronously

This commit is contained in:
Pierre Souchay 2018-07-01 12:50:53 +02:00
parent bfc83ce045
commit abde81a3e7
6 changed files with 153 additions and 137 deletions

View File

@ -104,6 +104,7 @@ func newACLManager(config *config.RuntimeConfig) (*aclManager, error) {
case "deny": case "deny":
down = acl.DenyAll() down = acl.DenyAll()
case "extend-cache": case "extend-cache":
case "async-cache":
// Leave the down policy as nil to signal this. // Leave the down policy as nil to signal this.
default: default:
return nil, fmt.Errorf("invalid ACL down policy %q", config.ACLDownPolicy) return nil, fmt.Errorf("invalid ACL down policy %q", config.ACLDownPolicy)

View File

@ -274,79 +274,82 @@ func TestACL_Down_Allow(t *testing.T) {
func TestACL_Down_Extend(t *testing.T) { func TestACL_Down_Extend(t *testing.T) {
t.Parallel() t.Parallel()
a := NewTestAgent(t.Name(), TestACLConfig()+` aclExtendPolicies := []string{"extend-cache", "async-cache"}
acl_down_policy = "extend-cache" for _, aclDownPolicy := range aclExtendPolicies {
a := NewTestAgent(t.Name(), TestACLConfig()+`
acl_down_policy = "`+aclDownPolicy+`"
acl_enforce_version_8 = true acl_enforce_version_8 = true
`) `)
defer a.Shutdown() defer a.Shutdown()
m := MockServer{ m := MockServer{
// Populate the cache for one of the tokens. // Populate the cache for one of the tokens.
getPolicyFn: func(req *structs.ACLPolicyRequest, reply *structs.ACLPolicy) error { getPolicyFn: func(req *structs.ACLPolicyRequest, reply *structs.ACLPolicy) error {
*reply = structs.ACLPolicy{ *reply = structs.ACLPolicy{
Parent: "allow", Parent: "allow",
Policy: &rawacl.Policy{ Policy: &rawacl.Policy{
Agents: []*rawacl.AgentPolicy{ Agents: []*rawacl.AgentPolicy{
&rawacl.AgentPolicy{ &rawacl.AgentPolicy{
Node: a.config.NodeName, Node: a.config.NodeName,
Policy: "read", Policy: "read",
},
}, },
}, },
}, }
} return nil
return nil },
}, }
} if err := a.registerEndpoint("ACL", &m); err != nil {
if err := a.registerEndpoint("ACL", &m); err != nil { t.Fatalf("err: %v", err)
t.Fatalf("err: %v", err) }
}
acl, err := a.resolveToken("yep") acl, err := a.resolveToken("yep")
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if acl == nil { if acl == nil {
t.Fatalf("should not be nil") t.Fatalf("should not be nil")
} }
if !acl.AgentRead(a.config.NodeName) { if !acl.AgentRead(a.config.NodeName) {
t.Fatalf("should allow") t.Fatalf("should allow")
} }
if acl.AgentWrite(a.config.NodeName) { if acl.AgentWrite(a.config.NodeName) {
t.Fatalf("should deny") t.Fatalf("should deny")
} }
// Now take down ACLs and make sure a new token fails to resolve. // Now take down ACLs and make sure a new token fails to resolve.
m.getPolicyFn = func(*structs.ACLPolicyRequest, *structs.ACLPolicy) error { m.getPolicyFn = func(*structs.ACLPolicyRequest, *structs.ACLPolicy) error {
return fmt.Errorf("ACLs are broken") return fmt.Errorf("ACLs are broken")
} }
acl, err = a.resolveToken("nope") acl, err = a.resolveToken("nope")
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if acl == nil { if acl == nil {
t.Fatalf("should not be nil") t.Fatalf("should not be nil")
} }
if acl.AgentRead(a.config.NodeName) { if acl.AgentRead(a.config.NodeName) {
t.Fatalf("should deny") t.Fatalf("should deny")
} }
if acl.AgentWrite(a.config.NodeName) { if acl.AgentWrite(a.config.NodeName) {
t.Fatalf("should deny") t.Fatalf("should deny")
} }
// Read the token from the cache while ACLs are broken, which should // Read the token from the cache while ACLs are broken, which should
// extend. // extend.
acl, err = a.resolveToken("yep") acl, err = a.resolveToken("yep")
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if acl == nil { if acl == nil {
t.Fatalf("should not be nil") t.Fatalf("should not be nil")
} }
if !acl.AgentRead(a.config.NodeName) { if !acl.AgentRead(a.config.NodeName) {
t.Fatalf("should allow") t.Fatalf("should allow")
} }
if acl.AgentWrite(a.config.NodeName) { if acl.AgentWrite(a.config.NodeName) {
t.Fatalf("should deny") t.Fatalf("should deny")
}
} }
} }

View File

@ -94,8 +94,10 @@ type RuntimeConfig struct {
// ACL's to be used to service requests. This // ACL's to be used to service requests. This
// is the default. If the ACL is not in the cache, // is the default. If the ACL is not in the cache,
// this acts like deny. // this acts like deny.
// * async-cache - Same behaviour as extend-cache, but perform ACL
// Lookups asynchronously when cache TTL is expired.
// //
// hcl: acl_down_policy = ("allow"|"deny"|"extend-cache") // hcl: acl_down_policy = ("allow"|"deny"|"extend-cache"|"async-cache")
ACLDownPolicy string ACLDownPolicy string
// ACLEnforceVersion8 is used to gate a set of ACL policy features that // ACLEnforceVersion8 is used to gate a set of ACL policy features that

View File

@ -146,10 +146,12 @@ func newACLCache(conf *Config, logger *log.Logger, rpc rpcFn, local acl.FaultFun
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to create ACL policy cache: %v", err) return nil, fmt.Errorf("Failed to create ACL policy cache: %v", err)
} }
cache.fetchMap = make(map[string][]chan (RemoteACLResult))
return cache, nil return cache, nil
} }
// Result Type returned when fetching Remote ACLs asynchronously
type RemoteACLResult struct { type RemoteACLResult struct {
result acl.ACL result acl.ACL
err error err error
@ -179,8 +181,9 @@ func (c *aclCache) fireResult(id string, theACL acl.ACL, err error) {
channels := c.fetchMap[id] channels := c.fetchMap[id]
delete(c.fetchMap, id) delete(c.fetchMap, id)
c.fetchMutex.Unlock() c.fetchMutex.Unlock()
aclResult := RemoteACLResult{theACL, err}
for _, cx := range channels { for _, cx := range channels {
cx <- RemoteACLResult{theACL, err} cx <- aclResult
close(cx) close(cx)
} }
} }
@ -231,7 +234,7 @@ func (c *aclCache) loadACLInChan(id, authDC string, cached *aclCacheEntry) {
// local ACL fault function is registered to query replicated ACL data, // local ACL fault function is registered to query replicated ACL data,
// and the user's policy allows it, we will try locally before we give // and the user's policy allows it, we will try locally before we give
// up. // up.
if c.local != nil && c.config.ACLDownPolicy == "extend-cache" { if c.local != nil && (c.config.ACLDownPolicy == "extend-cache" || c.config.ACLDownPolicy == "async-cache") {
parent, rules, err := c.local(id) parent, rules, err := c.local(id)
if err != nil { if err != nil {
// We don't make an exception here for ACLs that aren't // We don't make an exception here for ACLs that aren't
@ -274,6 +277,7 @@ ACL_DOWN:
case "allow": case "allow":
c.fireResult(id, acl.AllowAll(), nil) c.fireResult(id, acl.AllowAll(), nil)
return return
case "async-cache":
case "extend-cache": case "extend-cache":
if cached != nil { if cached != nil {
c.fireResult(id, cached.ACL, nil) c.fireResult(id, cached.ACL, nil)
@ -289,11 +293,11 @@ ACL_DOWN:
func (c *aclCache) lookupACLRemote(id, authDC string, cached *aclCacheEntry) RemoteACLResult { func (c *aclCache) lookupACLRemote(id, authDC string, cached *aclCacheEntry) RemoteACLResult {
// Attempt to refresh the policy from the ACL datacenter via an RPC. // Attempt to refresh the policy from the ACL datacenter via an RPC.
myChan := make(chan RemoteACLResult) myChan := make(chan RemoteACLResult)
mustWaitForResult := cached == nil || c.config.ACLDownPolicy != "extend-cache" mustWaitForResult := cached == nil || c.config.ACLDownPolicy != "async-cache"
c.fetchMutex.Lock() c.fetchMutex.Lock()
clients, ok := c.fetchMap[id] clients, ok := c.fetchMap[id]
if !ok { if !ok || clients == nil {
clients = make([]chan RemoteACLResult, 16) clients = make([]chan RemoteACLResult, 0)
} }
if mustWaitForResult { if mustWaitForResult {
c.fetchMap[id] = append(clients, myChan) c.fetchMap[id] = append(clients, myChan)

View File

@ -508,78 +508,82 @@ func TestACL_DownPolicy_Allow(t *testing.T) {
func TestACL_DownPolicy_ExtendCache(t *testing.T) { func TestACL_DownPolicy_ExtendCache(t *testing.T) {
t.Parallel() t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) { aclExtendPolicies := []string{"extend-cache", "async-cache"} //"async-cache"
c.ACLDatacenter = "dc1"
c.ACLTTL = 0
c.ACLDownPolicy = "extend-cache"
c.ACLMasterToken = "root"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
dir2, s2 := testServerWithConfig(t, func(c *Config) { for _, aclDownPolicy := range aclExtendPolicies {
c.ACLDatacenter = "dc1" // Enable ACLs! dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLTTL = 0 c.ACLDatacenter = "dc1"
c.ACLDownPolicy = "extend-cache" c.ACLTTL = 0
c.Bootstrap = false // Disable bootstrap c.ACLDownPolicy = aclDownPolicy
}) c.ACLMasterToken = "root"
defer os.RemoveAll(dir2) })
defer s2.Shutdown() defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
// Try to join dir2, s2 := testServerWithConfig(t, func(c *Config) {
joinLAN(t, s2, s1) c.ACLDatacenter = "dc1" // Enable ACLs!
retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) }) c.ACLTTL = 0
c.ACLDownPolicy = aclDownPolicy
c.Bootstrap = false // Disable bootstrap
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1") // Try to join
joinLAN(t, s2, s1)
retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) })
// Create a new token testrpc.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: testACLPolicy,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var id string
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
// find the non-authoritative server // Create a new token
var nonAuth *Server arg := structs.ACLRequest{
var auth *Server Datacenter: "dc1",
if !s1.IsLeader() { Op: structs.ACLSet,
nonAuth = s1 ACL: structs.ACL{
auth = s2 Name: "User token",
} else { Type: structs.ACLTypeClient,
nonAuth = s2 Rules: testACLPolicy,
auth = s1 },
} WriteRequest: structs.WriteRequest{Token: "root"},
}
var id string
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
// Warm the caches // find the non-authoritative server
aclR, err := nonAuth.resolveToken(id) var nonAuth *Server
if err != nil { var auth *Server
t.Fatalf("err: %v", err) if !s1.IsLeader() {
} nonAuth = s1
if aclR == nil { auth = s2
t.Fatalf("bad acl: %#v", aclR) } else {
} nonAuth = s2
auth = s1
}
// Kill the authoritative server // Warm the caches
auth.Shutdown() aclR, err := nonAuth.resolveToken(id)
if err != nil {
t.Fatalf("err: %v", err)
}
if aclR == nil {
t.Fatalf("bad acl: %#v", aclR)
}
// Token should resolve into cached copy // Kill the authoritative server
aclR2, err := nonAuth.resolveToken(id) auth.Shutdown()
if err != nil {
t.Fatalf("err: %v", err) // Token should resolve into cached copy
} aclR2, err := nonAuth.resolveToken(id)
if aclR2 != aclR { if err != nil {
t.Fatalf("bad acl: %#v", aclR) t.Fatalf("err: %v", err)
}
if aclR2 != aclR {
t.Fatalf("bad acl: %#v", aclR)
}
} }
} }

View File

@ -235,8 +235,9 @@ type Config struct {
// ACLDownPolicy controls the behavior of ACLs if the ACLDatacenter // ACLDownPolicy controls the behavior of ACLs if the ACLDatacenter
// cannot be contacted. It can be either "deny" to deny all requests, // cannot be contacted. It can be either "deny" to deny all requests,
// or "extend-cache" which ignores the ACLCacheInterval and uses // "extend-cache" or "async-cache" which ignores the ACLCacheInterval and
// cached policies. If a policy is not in the cache, it acts like deny. // uses cached policies.
// If a policy is not in the cache, it acts like deny.
// "allow" can be used to allow all requests. This is not recommended. // "allow" can be used to allow all requests. This is not recommended.
ACLDownPolicy string ACLDownPolicy string
@ -378,6 +379,7 @@ func (c *Config) CheckACL() error {
switch c.ACLDownPolicy { switch c.ACLDownPolicy {
case "allow": case "allow":
case "deny": case "deny":
case "async-cache":
case "extend-cache": case "extend-cache":
default: default:
return fmt.Errorf("Unsupported down ACL policy: %s", c.ACLDownPolicy) return fmt.Errorf("Unsupported down ACL policy: %s", c.ACLDownPolicy)