From abde81a3e759b99aefab232d78fb871d9688f532 Mon Sep 17 00:00:00 2001 From: Pierre Souchay Date: Sun, 1 Jul 2018 12:50:53 +0200 Subject: [PATCH] Added async-cache with similar behaviour as extend-cache but asynchronously --- agent/acl.go | 1 + agent/acl_test.go | 133 ++++++++++++++++++++------------------- agent/config/runtime.go | 4 +- agent/consul/acl.go | 14 +++-- agent/consul/acl_test.go | 132 +++++++++++++++++++------------------- agent/consul/config.go | 6 +- 6 files changed, 153 insertions(+), 137 deletions(-) diff --git a/agent/acl.go b/agent/acl.go index e266feafa8..1143da97cc 100644 --- a/agent/acl.go +++ b/agent/acl.go @@ -104,6 +104,7 @@ func newACLManager(config *config.RuntimeConfig) (*aclManager, error) { case "deny": down = acl.DenyAll() case "extend-cache": + case "async-cache": // Leave the down policy as nil to signal this. default: return nil, fmt.Errorf("invalid ACL down policy %q", config.ACLDownPolicy) diff --git a/agent/acl_test.go b/agent/acl_test.go index 8b16269981..0932dc5254 100644 --- a/agent/acl_test.go +++ b/agent/acl_test.go @@ -274,79 +274,82 @@ func TestACL_Down_Allow(t *testing.T) { func TestACL_Down_Extend(t *testing.T) { t.Parallel() - a := NewTestAgent(t.Name(), TestACLConfig()+` - acl_down_policy = "extend-cache" + aclExtendPolicies := []string{"extend-cache", "async-cache"} + for _, aclDownPolicy := range aclExtendPolicies { + a := NewTestAgent(t.Name(), TestACLConfig()+` + acl_down_policy = "`+aclDownPolicy+`" acl_enforce_version_8 = true `) - defer a.Shutdown() + defer a.Shutdown() - m := MockServer{ - // Populate the cache for one of the tokens. - getPolicyFn: func(req *structs.ACLPolicyRequest, reply *structs.ACLPolicy) error { - *reply = structs.ACLPolicy{ - Parent: "allow", - Policy: &rawacl.Policy{ - Agents: []*rawacl.AgentPolicy{ - &rawacl.AgentPolicy{ - Node: a.config.NodeName, - Policy: "read", + m := MockServer{ + // Populate the cache for one of the tokens. + getPolicyFn: func(req *structs.ACLPolicyRequest, reply *structs.ACLPolicy) error { + *reply = structs.ACLPolicy{ + Parent: "allow", + Policy: &rawacl.Policy{ + Agents: []*rawacl.AgentPolicy{ + &rawacl.AgentPolicy{ + Node: a.config.NodeName, + Policy: "read", + }, }, }, - }, - } - return nil - }, - } - if err := a.registerEndpoint("ACL", &m); err != nil { - t.Fatalf("err: %v", err) - } + } + return nil + }, + } + if err := a.registerEndpoint("ACL", &m); err != nil { + t.Fatalf("err: %v", err) + } - acl, err := a.resolveToken("yep") - if err != nil { - t.Fatalf("err: %v", err) - } - if acl == nil { - t.Fatalf("should not be nil") - } - if !acl.AgentRead(a.config.NodeName) { - t.Fatalf("should allow") - } - if acl.AgentWrite(a.config.NodeName) { - t.Fatalf("should deny") - } + acl, err := a.resolveToken("yep") + if err != nil { + t.Fatalf("err: %v", err) + } + if acl == nil { + t.Fatalf("should not be nil") + } + if !acl.AgentRead(a.config.NodeName) { + t.Fatalf("should allow") + } + if acl.AgentWrite(a.config.NodeName) { + t.Fatalf("should deny") + } - // Now take down ACLs and make sure a new token fails to resolve. - m.getPolicyFn = func(*structs.ACLPolicyRequest, *structs.ACLPolicy) error { - return fmt.Errorf("ACLs are broken") - } - acl, err = a.resolveToken("nope") - if err != nil { - t.Fatalf("err: %v", err) - } - if acl == nil { - t.Fatalf("should not be nil") - } - if acl.AgentRead(a.config.NodeName) { - t.Fatalf("should deny") - } - if acl.AgentWrite(a.config.NodeName) { - t.Fatalf("should deny") - } + // Now take down ACLs and make sure a new token fails to resolve. + m.getPolicyFn = func(*structs.ACLPolicyRequest, *structs.ACLPolicy) error { + return fmt.Errorf("ACLs are broken") + } + acl, err = a.resolveToken("nope") + if err != nil { + t.Fatalf("err: %v", err) + } + if acl == nil { + t.Fatalf("should not be nil") + } + if acl.AgentRead(a.config.NodeName) { + t.Fatalf("should deny") + } + if acl.AgentWrite(a.config.NodeName) { + t.Fatalf("should deny") + } - // Read the token from the cache while ACLs are broken, which should - // extend. - acl, err = a.resolveToken("yep") - if err != nil { - t.Fatalf("err: %v", err) - } - if acl == nil { - t.Fatalf("should not be nil") - } - if !acl.AgentRead(a.config.NodeName) { - t.Fatalf("should allow") - } - if acl.AgentWrite(a.config.NodeName) { - t.Fatalf("should deny") + // Read the token from the cache while ACLs are broken, which should + // extend. + acl, err = a.resolveToken("yep") + if err != nil { + t.Fatalf("err: %v", err) + } + if acl == nil { + t.Fatalf("should not be nil") + } + if !acl.AgentRead(a.config.NodeName) { + t.Fatalf("should allow") + } + if acl.AgentWrite(a.config.NodeName) { + t.Fatalf("should deny") + } } } diff --git a/agent/config/runtime.go b/agent/config/runtime.go index 9674d14d5f..05ad8796ca 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -94,8 +94,10 @@ type RuntimeConfig struct { // ACL's to be used to service requests. This // is the default. If the ACL is not in the cache, // this acts like deny. + // * async-cache - Same behaviour as extend-cache, but perform ACL + // Lookups asynchronously when cache TTL is expired. // - // hcl: acl_down_policy = ("allow"|"deny"|"extend-cache") + // hcl: acl_down_policy = ("allow"|"deny"|"extend-cache"|"async-cache") ACLDownPolicy string // ACLEnforceVersion8 is used to gate a set of ACL policy features that diff --git a/agent/consul/acl.go b/agent/consul/acl.go index 0bf1dbb4cd..23bdbff356 100644 --- a/agent/consul/acl.go +++ b/agent/consul/acl.go @@ -146,10 +146,12 @@ func newACLCache(conf *Config, logger *log.Logger, rpc rpcFn, local acl.FaultFun if err != nil { return nil, fmt.Errorf("Failed to create ACL policy cache: %v", err) } + cache.fetchMap = make(map[string][]chan (RemoteACLResult)) return cache, nil } +// Result Type returned when fetching Remote ACLs asynchronously type RemoteACLResult struct { result acl.ACL err error @@ -179,8 +181,9 @@ func (c *aclCache) fireResult(id string, theACL acl.ACL, err error) { channels := c.fetchMap[id] delete(c.fetchMap, id) c.fetchMutex.Unlock() + aclResult := RemoteACLResult{theACL, err} for _, cx := range channels { - cx <- RemoteACLResult{theACL, err} + cx <- aclResult close(cx) } } @@ -231,7 +234,7 @@ func (c *aclCache) loadACLInChan(id, authDC string, cached *aclCacheEntry) { // local ACL fault function is registered to query replicated ACL data, // and the user's policy allows it, we will try locally before we give // up. - if c.local != nil && c.config.ACLDownPolicy == "extend-cache" { + if c.local != nil && (c.config.ACLDownPolicy == "extend-cache" || c.config.ACLDownPolicy == "async-cache") { parent, rules, err := c.local(id) if err != nil { // We don't make an exception here for ACLs that aren't @@ -274,6 +277,7 @@ ACL_DOWN: case "allow": c.fireResult(id, acl.AllowAll(), nil) return + case "async-cache": case "extend-cache": if cached != nil { c.fireResult(id, cached.ACL, nil) @@ -289,11 +293,11 @@ ACL_DOWN: func (c *aclCache) lookupACLRemote(id, authDC string, cached *aclCacheEntry) RemoteACLResult { // Attempt to refresh the policy from the ACL datacenter via an RPC. myChan := make(chan RemoteACLResult) - mustWaitForResult := cached == nil || c.config.ACLDownPolicy != "extend-cache" + mustWaitForResult := cached == nil || c.config.ACLDownPolicy != "async-cache" c.fetchMutex.Lock() clients, ok := c.fetchMap[id] - if !ok { - clients = make([]chan RemoteACLResult, 16) + if !ok || clients == nil { + clients = make([]chan RemoteACLResult, 0) } if mustWaitForResult { c.fetchMap[id] = append(clients, myChan) diff --git a/agent/consul/acl_test.go b/agent/consul/acl_test.go index ace1284a86..fd08b54ab8 100644 --- a/agent/consul/acl_test.go +++ b/agent/consul/acl_test.go @@ -508,78 +508,82 @@ func TestACL_DownPolicy_Allow(t *testing.T) { func TestACL_DownPolicy_ExtendCache(t *testing.T) { t.Parallel() - dir1, s1 := testServerWithConfig(t, func(c *Config) { - c.ACLDatacenter = "dc1" - c.ACLTTL = 0 - c.ACLDownPolicy = "extend-cache" - c.ACLMasterToken = "root" - }) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - client := rpcClient(t, s1) - defer client.Close() + aclExtendPolicies := []string{"extend-cache", "async-cache"} //"async-cache" - dir2, s2 := testServerWithConfig(t, func(c *Config) { - c.ACLDatacenter = "dc1" // Enable ACLs! - c.ACLTTL = 0 - c.ACLDownPolicy = "extend-cache" - c.Bootstrap = false // Disable bootstrap - }) - defer os.RemoveAll(dir2) - defer s2.Shutdown() + for _, aclDownPolicy := range aclExtendPolicies { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLTTL = 0 + c.ACLDownPolicy = aclDownPolicy + c.ACLMasterToken = "root" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() - // Try to join - joinLAN(t, s2, s1) - retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) }) + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" // Enable ACLs! + c.ACLTTL = 0 + c.ACLDownPolicy = aclDownPolicy + c.Bootstrap = false // Disable bootstrap + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() - testrpc.WaitForLeader(t, s1.RPC, "dc1") + // Try to join + joinLAN(t, s2, s1) + retry.Run(t, func(r *retry.R) { r.Check(wantRaft([]*Server{s1, s2})) }) - // Create a new token - arg := structs.ACLRequest{ - Datacenter: "dc1", - Op: structs.ACLSet, - ACL: structs.ACL{ - Name: "User token", - Type: structs.ACLTypeClient, - Rules: testACLPolicy, - }, - WriteRequest: structs.WriteRequest{Token: "root"}, - } - var id string - if err := s1.RPC("ACL.Apply", &arg, &id); err != nil { - t.Fatalf("err: %v", err) - } + testrpc.WaitForLeader(t, s1.RPC, "dc1") - // find the non-authoritative server - var nonAuth *Server - var auth *Server - if !s1.IsLeader() { - nonAuth = s1 - auth = s2 - } else { - nonAuth = s2 - auth = s1 - } + // Create a new token + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: testACLPolicy, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var id string + if err := s1.RPC("ACL.Apply", &arg, &id); err != nil { + t.Fatalf("err: %v", err) + } - // Warm the caches - aclR, err := nonAuth.resolveToken(id) - if err != nil { - t.Fatalf("err: %v", err) - } - if aclR == nil { - t.Fatalf("bad acl: %#v", aclR) - } + // find the non-authoritative server + var nonAuth *Server + var auth *Server + if !s1.IsLeader() { + nonAuth = s1 + auth = s2 + } else { + nonAuth = s2 + auth = s1 + } - // Kill the authoritative server - auth.Shutdown() + // Warm the caches + aclR, err := nonAuth.resolveToken(id) + if err != nil { + t.Fatalf("err: %v", err) + } + if aclR == nil { + t.Fatalf("bad acl: %#v", aclR) + } - // Token should resolve into cached copy - aclR2, err := nonAuth.resolveToken(id) - if err != nil { - t.Fatalf("err: %v", err) - } - if aclR2 != aclR { - t.Fatalf("bad acl: %#v", aclR) + // Kill the authoritative server + auth.Shutdown() + + // Token should resolve into cached copy + aclR2, err := nonAuth.resolveToken(id) + if err != nil { + t.Fatalf("err: %v", err) + } + if aclR2 != aclR { + t.Fatalf("bad acl: %#v", aclR) + } } } diff --git a/agent/consul/config.go b/agent/consul/config.go index 29a5245319..570a07c163 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -235,8 +235,9 @@ type Config struct { // ACLDownPolicy controls the behavior of ACLs if the ACLDatacenter // cannot be contacted. It can be either "deny" to deny all requests, - // or "extend-cache" which ignores the ACLCacheInterval and uses - // cached policies. If a policy is not in the cache, it acts like deny. + // "extend-cache" or "async-cache" which ignores the ACLCacheInterval and + // uses cached policies. + // If a policy is not in the cache, it acts like deny. // "allow" can be used to allow all requests. This is not recommended. ACLDownPolicy string @@ -378,6 +379,7 @@ func (c *Config) CheckACL() error { switch c.ACLDownPolicy { case "allow": case "deny": + case "async-cache": case "extend-cache": default: return fmt.Errorf("Unsupported down ACL policy: %s", c.ACLDownPolicy)