diff --git a/consul/acl.go b/consul/acl.go index a4da8211cf..e583df5c8b 100644 --- a/consul/acl.go +++ b/consul/acl.go @@ -197,6 +197,18 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) { c.logger.Printf("[ERR] consul.acl: Failed to get policy from ACL datacenter: %v", err) } + // TODO (slackpad) - We could do a similar thing *within* the ACL + // datacenter if the leader isn't available. We have a local state + // store of the ACLs, so by populating the local member in this cache, + // it would fall back to the state store if there was a leader loss and + // the extend-cache policy was true. This feels subtle to explain and + // configure, and leader blips should be paved over by cache already, so + // we won't do this for now but should consider for the future. This is + // a lot different than the replication story where you might be cut off + // from the ACL datacenter for an extended period of time and need to + // carry on operating with the full set of ACLs as they were known + // before the partition. + // At this point we might have an expired cache entry and we know that // there was a problem getting the ACL from the ACL datacenter. If a // local ACL fault function is registered to query replicated ACL data, diff --git a/consul/acl_replication.go b/consul/acl_replication.go index b7e1609309..075a6b46dc 100644 --- a/consul/acl_replication.go +++ b/consul/acl_replication.go @@ -179,7 +179,7 @@ func (s *Server) updateLocalACLs(changes structs.ACLRequests) error { // operations and wait out to the second before we continue. If // it's going slower than that, the sleep time will be negative // so we will just keep going without delay. - if ops > s.config.ACLReplicationApplyLimit { + if ops >= s.config.ACLReplicationApplyLimit { elapsed := time.Now().Sub(start) time.Sleep(1*time.Second - elapsed) ops, start = 0, time.Now() diff --git a/consul/acl_replication_test.go b/consul/acl_replication_test.go index eaa9f0cee9..3f9b854f26 100644 --- a/consul/acl_replication_test.go +++ b/consul/acl_replication_test.go @@ -2,13 +2,16 @@ package consul import ( "fmt" + "os" "reflect" "sort" "strconv" "strings" "testing" + "time" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/testutil" ) func TestACLReplication_Sorter(t *testing.T) { @@ -203,7 +206,7 @@ func TestACLReplication_reconcileACLs(t *testing.T) { { local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X", remote: "aaa:99:X|bbb:3:X|ccx:101:X|ddd:103:Y|eee:11:X|fff:102:X", - lastRemoteIndex: 0, + lastRemoteIndex: 11, expected: "set:aaa:X|delete:ccc:X|set:ccx:X|set:ddd:Y|set:fff:X", }, } @@ -215,3 +218,215 @@ func TestACLReplication_reconcileACLs(t *testing.T) { } } } + +func TestACLReplication_updateLocalACLs_RateLimit(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.ACLDatacenter = "dc1" + c.ACLReplicationToken = "secret" + c.ACLReplicationApplyLimit = 1 + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC, "dc2") + + changes := structs.ACLRequests{ + &structs.ACLRequest{ + Op: structs.ACLSet, + ACL: structs.ACL{ + ID: "secret", + Type: "client", + }, + }, + } + + // Under the limit, should be quick. 
+ start := time.Now() + if err := s1.updateLocalACLs(changes); err != nil { + t.Fatalf("err: %v", err) + } + if dur := time.Now().Sub(start); dur > 500*time.Millisecond { + t.Fatalf("too slow: %9.6f", dur.Seconds()) + } + + changes = append(changes, + &structs.ACLRequest{ + Op: structs.ACLSet, + ACL: structs.ACL{ + ID: "secret", + Type: "client", + }, + }) + + // Over the limit, should be throttled. + start = time.Now() + if err := s1.updateLocalACLs(changes); err != nil { + t.Fatalf("err: %v", err) + } + if dur := time.Now().Sub(start); dur < 500*time.Millisecond { + t.Fatalf("too fast: %9.6f", dur.Seconds()) + } +} + +func TestACLReplication_IsACLReplicationEnabled(t *testing.T) { + // ACLs not enabled. + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + if s1.IsACLReplicationEnabled() { + t.Fatalf("should not be enabled") + } + + // ACLs enabled but not replication. + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.ACLDatacenter = "dc1" + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + if s2.IsACLReplicationEnabled() { + t.Fatalf("should not be enabled") + } + + // ACLs enabled with replication. + dir3, s3 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.ACLDatacenter = "dc1" + c.ACLReplicationToken = "secret" + }) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + if !s3.IsACLReplicationEnabled() { + t.Fatalf("should be enabled") + } + + // ACLs enabled and replication token set, but inside the ACL datacenter + // so replication should be disabled. + dir4, s4 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.ACLDatacenter = "dc1" + c.ACLReplicationToken = "secret" + }) + defer os.RemoveAll(dir4) + defer s4.Shutdown() + if s4.IsACLReplicationEnabled() { + t.Fatalf("should not be enabled") + } +} + +func TestACLReplication(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.ACLDatacenter = "dc1" + c.ACLReplicationToken = "root" + c.ACLReplicationInterval = 0 + c.ACLReplicationApplyLimit = 1000000 + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + // Try to join. + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfWANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinWAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + testutil.WaitForLeader(t, s1.RPC, "dc1") + testutil.WaitForLeader(t, s1.RPC, "dc2") + + // Create a bunch of new tokens. + var id string + for i := 0; i < 1000; i++ { + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: testACLPolicy, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + if err := s1.RPC("ACL.Apply", &arg, &id); err != nil { + t.Fatalf("err: %v", err) + } + } + + checkSame := func() (bool, error) { + _, remote, err := s1.fsm.State().ACLList() + if err != nil { + return false, err + } + _, local, err := s2.fsm.State().ACLList() + if err != nil { + return false, err + } + if len(remote) != len(local) { + return false, nil + } + for i, acl := range remote { + if !acl.IsSame(local[i]) { + return false, nil + } + } + return true, nil + } + + // Wait for the replica to converge. 
+ testutil.WaitForResult(checkSame, func(err error) { + t.Fatalf("ACLs didn't converge") + }) + + // Create more new tokens. + for i := 0; i < 1000; i++ { + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: testACLPolicy, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var dontCare string + if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Wait for the replica to converge. + testutil.WaitForResult(checkSame, func(err error) { + t.Fatalf("ACLs didn't converge") + }) + + // Delete a token. + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLDelete, + ACL: structs.ACL{ + ID: id, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var dontCare string + if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for the replica to converge. + testutil.WaitForResult(checkSame, func(err error) { + t.Fatalf("ACLs didn't converge") + }) +}
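
A note on the TODO added to lookupACL in consul/acl.go: it describes, deliberately without implementing it, a fallback for servers *inside* the ACL datacenter, where a lookup that fails because the leader is unavailable could be served from the locally replicated state store when the down policy is "extend-cache". The sketch below only illustrates that shape; every name in it is hypothetical and none of it is Consul's cache API.

package main

import (
	"errors"
	"fmt"
)

// lookupWithFallback illustrates the unimplemented idea from the TODO:
// try the leader first, and if it is unavailable, serve possibly stale
// rules from the local state store under the "extend-cache" down policy
// instead of failing the lookup outright.
func lookupWithFallback(
	fromLeader func(id string) (string, error),
	fromLocalState func(id string) (string, bool),
	downPolicy, id string,
) (string, error) {
	if rules, err := fromLeader(id); err == nil {
		return rules, nil
	}
	if downPolicy == "extend-cache" {
		if rules, ok := fromLocalState(id); ok {
			return rules, nil
		}
	}
	return "", errors.New("ACL lookup failed and no fallback available")
}

func main() {
	noLeader := func(id string) (string, error) { return "", errors.New("no leader") }
	local := func(id string) (string, bool) { return `key "" { policy = "read" }`, true }
	fmt.Println(lookupWithFallback(noLeader, local, "extend-cache", "anonymous"))
}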
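
The one-character fix in updateLocalACLs is what the new RateLimit test pins down: the throttle check runs ahead of each apply, so a single change at ACLReplicationApplyLimit = 1 returns immediately, while the old ops > limit comparison let limit+1 operations through per one-second window and two changes also went through unthrottled. With ops >= limit, the second change waits out the remainder of its window. A minimal, self-contained sketch of that loop, with applyOne standing in for the real ACL apply:

package main

import (
	"fmt"
	"time"
)

// applyLimited sketches the windowed throttle from updateLocalACLs. Before
// each operation, if the per-second allotment is used up, sleep out the
// remainder of the window and reset. When we're already running slower
// than one window per second, the computed sleep is negative and
// time.Sleep returns immediately, adding no delay.
func applyLimited(changes []string, limit int, applyOne func(string) error) error {
	ops := 0
	start := time.Now()
	for _, change := range changes {
		// >= (the fix above) throttles on the op that reaches the
		// limit; > would admit limit+1 ops per window.
		if ops >= limit {
			elapsed := time.Now().Sub(start)
			time.Sleep(1*time.Second - elapsed)
			ops, start = 0, time.Now()
		}
		if err := applyOne(change); err != nil {
			return err
		}
		ops++
	}
	return nil
}

func main() {
	apply := func(id string) error {
		fmt.Printf("applied %s at %s\n", id, time.Now().Format("15:04:05.000"))
		return nil
	}
	// With a limit of 1, "b" and "c" each wait out the rest of their
	// one-second window, mirroring the throttled case in the test.
	if err := applyLimited([]string{"a", "b", "c"}, 1, apply); err != nil {
		fmt.Println("err:", err)
	}
}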
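
Finally, the four cases in TestACLReplication_IsACLReplicationEnabled pin the predicate down: replication requires ACLs to be enabled, a replication token to be set, and the server to sit outside the ACL datacenter. A sketch of that check over the config fields the tests set (the real method hangs off *Server, and this Config is a stand-in, not Consul's type):

package main

import "fmt"

// Config mirrors only the fields the tests set.
type Config struct {
	Datacenter          string
	ACLDatacenter       string
	ACLReplicationToken string
}

// isACLReplicationEnabled expresses the predicate the tests exercise:
// ACLs enabled, a replication token configured, and the server outside
// the ACL datacenter (the source of truth does not replicate from itself).
func isACLReplicationEnabled(c *Config) bool {
	if c.ACLDatacenter == "" {
		return false // ACLs not enabled at all
	}
	if c.Datacenter == c.ACLDatacenter {
		return false // inside the ACL datacenter
	}
	return c.ACLReplicationToken != ""
}

func main() {
	cases := []*Config{
		{}, // ACLs off
		{Datacenter: "dc2", ACLDatacenter: "dc1"},                                // no token
		{Datacenter: "dc2", ACLDatacenter: "dc1", ACLReplicationToken: "secret"}, // enabled
		{Datacenter: "dc1", ACLDatacenter: "dc1", ACLReplicationToken: "secret"}, // in ACL dc
	}
	for i, c := range cases {
		// Prints false, false, true, false, matching the four tests.
		fmt.Printf("case %d: %v\n", i+1, isACLReplicationEnabled(c))
	}
}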