Adds remaining core replication tests.

James Phillips 2016-08-04 16:33:40 -07:00
parent f44bc7e97a
commit 0a9060bb84
GPG Key ID: 77183E682AC5FC11
3 changed files with 229 additions and 2 deletions


@@ -197,6 +197,18 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
		c.logger.Printf("[ERR] consul.acl: Failed to get policy from ACL datacenter: %v", err)
	}

	// TODO (slackpad) - We could do a similar thing *within* the ACL
	// datacenter if the leader isn't available. We have a local state
	// store of the ACLs, so by populating the local member in this cache,
	// it would fall back to the state store if there was a leader loss and
	// the extend-cache policy was true. This feels subtle to explain and
	// configure, and leader blips should be paved over by cache already, so
	// we won't do this for now but should consider for the future. This is
	// a lot different than the replication story where you might be cut off
	// from the ACL datacenter for an extended period of time and need to
	// carry on operating with the full set of ACLs as they were known
	// before the partition.

	// At this point we might have an expired cache entry and we know that
	// there was a problem getting the ACL from the ACL datacenter. If a
	// local ACL fault function is registered to query replicated ACL data,
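
The fallback order described in the comment above comes down to: ask the ACL datacenter first, then a locally registered fault function that reads replicated ACL data, and only then lean on the (possibly extended) cached entry. The following is a minimal, self-contained sketch of that order; the names (policy, resolve, fromACLDatacenter, localFault) are illustrative only and are not the actual identifiers in acl.go.

package main

import (
	"errors"
	"fmt"
	"time"
)

// policy stands in for a compiled ACL; the real code returns acl.ACL.
type policy struct {
	rules   string
	expires time.Time
}

// resolve sketches the fallback order: ACL datacenter, then a fault function
// backed by replicated ACL data, then an unexpired (or extend-cache) entry.
func resolve(id string,
	fromACLDatacenter func(string) (policy, error),
	localFault func(string) (policy, error),
	cached *policy, extendCache bool) (policy, error) {

	if p, err := fromACLDatacenter(id); err == nil {
		return p, nil
	}
	if localFault != nil {
		if p, err := localFault(id); err == nil {
			return p, nil
		}
	}
	if cached != nil && (extendCache || time.Now().Before(cached.expires)) {
		return *cached, nil
	}
	return policy{}, errors.New("ACL not available")
}

func main() {
	down := func(string) (policy, error) {
		return policy{}, errors.New("ACL datacenter unreachable")
	}
	replicated := func(string) (policy, error) {
		return policy{rules: `key "" { policy = "read" }`}, nil
	}
	p, err := resolve("anonymous", down, replicated, nil, false)
	fmt.Println(p.rules, err)
}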


@@ -179,7 +179,7 @@ func (s *Server) updateLocalACLs(changes structs.ACLRequests) error {
		// operations and wait out to the second before we continue. If
		// it's going slower than that, the sleep time will be negative
		// so we will just keep going without delay.
-		if ops > s.config.ACLReplicationApplyLimit {
+		if ops >= s.config.ACLReplicationApplyLimit {
			elapsed := time.Now().Sub(start)
			time.Sleep(1*time.Second - elapsed)
			ops, start = 0, time.Now()
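
Changing > to >= makes the throttle kick in as soon as the configured limit is reached, which the rate-limit test below relies on with ACLReplicationApplyLimit = 1. Here is a rough, self-contained sketch of the throttle pattern; applyAll and its arguments are illustrative, not the real method.

package main

import (
	"fmt"
	"time"
)

// applyAll sketches the throttle in updateLocalACLs: at most limit apply
// operations per second, padding out the remainder of the second once the
// limit is reached. If a batch already took longer than a second, the
// computed sleep is negative and time.Sleep returns immediately.
func applyAll(changes []string, limit int, apply func(string)) {
	ops, start := 0, time.Now()
	for _, change := range changes {
		apply(change)
		ops++
		if ops >= limit {
			elapsed := time.Now().Sub(start)
			time.Sleep(1*time.Second - elapsed)
			ops, start = 0, time.Now()
		}
	}
}

func main() {
	begin := time.Now()
	applyAll([]string{"a", "b", "c"}, 1, func(string) {})
	// With a limit of 1 op/sec, three ops take roughly three seconds.
	fmt.Printf("took %v\n", time.Now().Sub(begin))
}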


@@ -2,13 +2,16 @@ package consul

import (
	"fmt"
	"os"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/hashicorp/consul/consul/structs"
	"github.com/hashicorp/consul/testutil"
)

func TestACLReplication_Sorter(t *testing.T) {
@@ -203,7 +206,7 @@ func TestACLReplication_reconcileACLs(t *testing.T) {
		{
			local:           "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
			remote:          "aaa:99:X|bbb:3:X|ccx:101:X|ddd:103:Y|eee:11:X|fff:102:X",
-			lastRemoteIndex: 0,
+			lastRemoteIndex: 11,
			expected:        "set:aaa:X|delete:ccc:X|set:ccx:X|set:ddd:Y|set:fff:X",
		},
	}
@@ -215,3 +218,215 @@ func TestACLReplication_reconcileACLs(t *testing.T) {
		}
	}
}

func TestACLReplication_updateLocalACLs_RateLimit(t *testing.T) {
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "dc2"
		c.ACLDatacenter = "dc1"
		c.ACLReplicationToken = "secret"
		c.ACLReplicationApplyLimit = 1
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	testutil.WaitForLeader(t, s1.RPC, "dc2")

	changes := structs.ACLRequests{
		&structs.ACLRequest{
			Op: structs.ACLSet,
			ACL: structs.ACL{
				ID:   "secret",
				Type: "client",
			},
		},
	}

	// Under the limit, should be quick.
	start := time.Now()
	if err := s1.updateLocalACLs(changes); err != nil {
		t.Fatalf("err: %v", err)
	}
	if dur := time.Now().Sub(start); dur > 500*time.Millisecond {
		t.Fatalf("too slow: %9.6f", dur.Seconds())
	}

	changes = append(changes,
		&structs.ACLRequest{
			Op: structs.ACLSet,
			ACL: structs.ACL{
				ID:   "secret",
				Type: "client",
			},
		})

	// Over the limit, should be throttled.
	start = time.Now()
	if err := s1.updateLocalACLs(changes); err != nil {
		t.Fatalf("err: %v", err)
	}
	if dur := time.Now().Sub(start); dur < 500*time.Millisecond {
		t.Fatalf("too fast: %9.6f", dur.Seconds())
	}
}

func TestACLReplication_IsACLReplicationEnabled(t *testing.T) {
	// ACLs not enabled.
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.ACLDatacenter = ""
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	if s1.IsACLReplicationEnabled() {
		t.Fatalf("should not be enabled")
	}

	// ACLs enabled but not replication.
	dir2, s2 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "dc2"
		c.ACLDatacenter = "dc1"
	})
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()
	if s2.IsACLReplicationEnabled() {
		t.Fatalf("should not be enabled")
	}

	// ACLs enabled with replication.
	dir3, s3 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "dc2"
		c.ACLDatacenter = "dc1"
		c.ACLReplicationToken = "secret"
	})
	defer os.RemoveAll(dir3)
	defer s3.Shutdown()
	if !s3.IsACLReplicationEnabled() {
		t.Fatalf("should be enabled")
	}

	// ACLs enabled and replication token set, but inside the ACL datacenter
	// so replication should be disabled.
	dir4, s4 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "dc1"
		c.ACLDatacenter = "dc1"
		c.ACLReplicationToken = "secret"
	})
	defer os.RemoveAll(dir4)
	defer s4.Shutdown()
	if s4.IsACLReplicationEnabled() {
		t.Fatalf("should not be enabled")
	}
}

func TestACLReplication(t *testing.T) {
	dir1, s1 := testServerWithConfig(t, func(c *Config) {
		c.ACLDatacenter = "dc1"
		c.ACLMasterToken = "root"
	})
	defer os.RemoveAll(dir1)
	defer s1.Shutdown()
	client := rpcClient(t, s1)
	defer client.Close()

	dir2, s2 := testServerWithConfig(t, func(c *Config) {
		c.Datacenter = "dc2"
		c.ACLDatacenter = "dc1"
		c.ACLReplicationToken = "root"
		c.ACLReplicationInterval = 0
		c.ACLReplicationApplyLimit = 1000000
	})
	defer os.RemoveAll(dir2)
	defer s2.Shutdown()

	// Try to join.
	addr := fmt.Sprintf("127.0.0.1:%d",
		s1.config.SerfWANConfig.MemberlistConfig.BindPort)
	if _, err := s2.JoinWAN([]string{addr}); err != nil {
		t.Fatalf("err: %v", err)
	}
	testutil.WaitForLeader(t, s1.RPC, "dc1")
	testutil.WaitForLeader(t, s1.RPC, "dc2")

	// Create a bunch of new tokens.
	var id string
	for i := 0; i < 1000; i++ {
		arg := structs.ACLRequest{
			Datacenter: "dc1",
			Op:         structs.ACLSet,
			ACL: structs.ACL{
				Name:  "User token",
				Type:  structs.ACLTypeClient,
				Rules: testACLPolicy,
			},
			WriteRequest: structs.WriteRequest{Token: "root"},
		}
		if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
			t.Fatalf("err: %v", err)
		}
	}

	checkSame := func() (bool, error) {
		_, remote, err := s1.fsm.State().ACLList()
		if err != nil {
			return false, err
		}
		_, local, err := s2.fsm.State().ACLList()
		if err != nil {
			return false, err
		}
		if len(remote) != len(local) {
			return false, nil
		}
		for i, acl := range remote {
			if !acl.IsSame(local[i]) {
				return false, nil
			}
		}
		return true, nil
	}

	// Wait for the replica to converge.
	testutil.WaitForResult(checkSame, func(err error) {
		t.Fatalf("ACLs didn't converge")
	})

	// Create more new tokens.
	for i := 0; i < 1000; i++ {
		arg := structs.ACLRequest{
			Datacenter: "dc1",
			Op:         structs.ACLSet,
			ACL: structs.ACL{
				Name:  "User token",
				Type:  structs.ACLTypeClient,
				Rules: testACLPolicy,
			},
			WriteRequest: structs.WriteRequest{Token: "root"},
		}
		var dontCare string
		if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil {
			t.Fatalf("err: %v", err)
		}
	}

	// Wait for the replica to converge.
	testutil.WaitForResult(checkSame, func(err error) {
		t.Fatalf("ACLs didn't converge")
	})

	// Delete a token.
	arg := structs.ACLRequest{
		Datacenter: "dc1",
		Op:         structs.ACLDelete,
		ACL: structs.ACL{
			ID: id,
		},
		WriteRequest: structs.WriteRequest{Token: "root"},
	}
	var dontCare string
	if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Wait for the replica to converge.
	testutil.WaitForResult(checkSame, func(err error) {
		t.Fatalf("ACLs didn't converge")
	})
}
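
For reference, the four cases in TestACLReplication_IsACLReplicationEnabled imply a predicate along these lines. The actual method body is not part of this diff, so the sketch below is an assumption reconstructed from the test, not the real implementation; the real method would read these values from the server's Config.

package main

import "fmt"

// isACLReplicationEnabled sketches the check the test exercises: ACLs
// configured, a replication token present, and the server outside the
// ACL datacenter.
func isACLReplicationEnabled(datacenter, aclDatacenter, replicationToken string) bool {
	return aclDatacenter != "" && // ACLs enabled at all
		aclDatacenter != datacenter && // not inside the ACL datacenter itself
		replicationToken != "" // replication token configured
}

func main() {
	fmt.Println(isACLReplicationEnabled("dc2", "", ""))          // false: ACLs not enabled
	fmt.Println(isACLReplicationEnabled("dc2", "dc1", ""))       // false: no replication token
	fmt.Println(isACLReplicationEnabled("dc2", "dc1", "secret")) // true
	fmt.Println(isACLReplicationEnabled("dc1", "dc1", "secret")) // false: inside the ACL datacenter
}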