mirror of https://github.com/status-im/consul.git
acl: remove legacy ACL replication
This commit is contained in:
parent bffdce7c1e
commit d63cef1219
@@ -477,14 +477,6 @@ func (s *Server) replicateACLType(ctx context.Context, logger hclog.Logger, tr a
    return remoteIndex, false, nil
}

// IsACLReplicationEnabled returns true if ACL replication is enabled.
// DEPRECATED (ACL-Legacy-Compat) - with new ACLs at least policy replication is required
func (s *Server) IsACLReplicationEnabled() bool {
    authDC := s.config.PrimaryDatacenter
    return len(authDC) > 0 && (authDC != s.config.Datacenter) &&
        s.config.ACLTokenReplication
}

func (s *Server) updateACLReplicationStatusError(errorMsg string) {
    s.aclReplicationStatusLock.Lock()
    defer s.aclReplicationStatusLock.Unlock()
@@ -499,8 +491,6 @@ func (s *Server) updateACLReplicationStatusIndex(replicationType structs.ACLRepl

    s.aclReplicationStatus.LastSuccess = time.Now().Round(time.Second).UTC()
    switch replicationType {
    case structs.ACLReplicateLegacy:
        s.aclReplicationStatus.ReplicatedIndex = index
    case structs.ACLReplicateTokens:
        s.aclReplicationStatus.ReplicatedTokenIndex = index
    case structs.ACLReplicatePolicies:
@@ -1,276 +0,0 @@
package consul

import (
    "context"
    "fmt"
    "sort"
    "time"

    metrics "github.com/armon/go-metrics"
    "github.com/hashicorp/go-hclog"

    "github.com/hashicorp/consul/agent/structs"
)

// aclIterator simplifies the algorithm below by providing a basic iterator that
// moves through a list of ACLs and returns nil when it's exhausted. It also has
// methods for pre-sorting the ACLs being iterated over by ID, which should
// already be true, but since this is crucial for correctness and we are taking
// input from other servers, we sort to make sure.
type aclIterator struct {
    acls structs.ACLs

    // index is the current position of the iterator.
    index int
}

// newACLIterator returns a new ACL iterator.
func newACLIterator(acls structs.ACLs) *aclIterator {
    return &aclIterator{acls: acls}
}

// See sort.Interface.
func (a *aclIterator) Len() int {
    return len(a.acls)
}

// See sort.Interface.
func (a *aclIterator) Swap(i, j int) {
    a.acls[i], a.acls[j] = a.acls[j], a.acls[i]
}

// See sort.Interface.
func (a *aclIterator) Less(i, j int) bool {
    return a.acls[i].ID < a.acls[j].ID
}

// Front returns the item at index position, or nil if the list is exhausted.
func (a *aclIterator) Front() *structs.ACL {
    if a.index < len(a.acls) {
        return a.acls[a.index]
    }
    return nil
}

// Next advances the iterator to the next index.
func (a *aclIterator) Next() {
    a.index++
}
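For orientation, Front and Next form a cursor that signals exhaustion with a sentinel value. A minimal self-contained sketch of the same iterator shape over plain strings (illustrative names only, not part of this diff):

package main

import "fmt"

// strIterator mirrors aclIterator's shape: Front returns the current
// element or a zero value when exhausted, Next advances the cursor.
type strIterator struct {
    items []string
    index int
}

func (it *strIterator) Front() string {
    if it.index < len(it.items) {
        return it.items[it.index]
    }
    return "" // exhausted, analogous to aclIterator returning nil
}

func (it *strIterator) Next() { it.index++ }

func main() {
    it := &strIterator{items: []string{"a", "b", "c"}}
    for s := it.Front(); s != ""; s = it.Front() {
        fmt.Println(s)
        it.Next()
    }
}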

// reconcileLegacyACLs takes the local and remote ACL state, and produces a list of
// changes required in order to bring the local ACLs into sync with the remote
// ACLs. You can supply lastRemoteIndex as a hint that replication has succeeded
// up to that remote index and it will make this process more efficient by only
// comparing ACL entries modified after that index. Setting this to 0 will force
// a full compare of all existing ACLs.
func reconcileLegacyACLs(local, remote structs.ACLs, lastRemoteIndex uint64) structs.ACLRequests {
    // Since sorting the lists is crucial for correctness, we are depending
    // on data coming from other servers potentially running a different
    // version of Consul, and sorted-ness is kind of a subtle property of
    // the state store indexing, it's prudent to make sure things are sorted
    // before we begin.
    localIter, remoteIter := newACLIterator(local), newACLIterator(remote)
    sort.Sort(localIter)
    sort.Sort(remoteIter)

    // Run through both lists and reconcile them.
    var changes structs.ACLRequests
    for localIter.Front() != nil || remoteIter.Front() != nil {
        // If the local list is exhausted, then process this as a remote
        // add. We know from the loop condition that there's something
        // in the remote list.
        if localIter.Front() == nil {
            changes = append(changes, &structs.ACLRequest{
                Op:  structs.ACLSet,
                ACL: *(remoteIter.Front()),
            })
            remoteIter.Next()
            continue
        }

        // If the remote list is exhausted, then process this as a local
        // delete. We know from the loop condition that there's something
        // in the local list.
        if remoteIter.Front() == nil {
            changes = append(changes, &structs.ACLRequest{
                Op:  structs.ACLDelete,
                ACL: *(localIter.Front()),
            })
            localIter.Next()
            continue
        }

        // At this point we know there's something at the front of each
        // list we need to resolve.

        // If the remote list has something local doesn't, we add it.
        if localIter.Front().ID > remoteIter.Front().ID {
            changes = append(changes, &structs.ACLRequest{
                Op:  structs.ACLSet,
                ACL: *(remoteIter.Front()),
            })
            remoteIter.Next()
            continue
        }

        // If local has something remote doesn't, we delete it.
        if localIter.Front().ID < remoteIter.Front().ID {
            changes = append(changes, &structs.ACLRequest{
                Op:  structs.ACLDelete,
                ACL: *(localIter.Front()),
            })
            localIter.Next()
            continue
        }

        // Local and remote have an ACL with the same ID, so we might
        // need to compare them.
        l, r := localIter.Front(), remoteIter.Front()
        if r.RaftIndex.ModifyIndex > lastRemoteIndex && !r.IsSame(l) {
            changes = append(changes, &structs.ACLRequest{
                Op:  structs.ACLSet,
                ACL: *r,
            })
        }
        localIter.Next()
        remoteIter.Next()
    }
    return changes
}
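reconcileLegacyACLs is a classic two-pointer merge over two ID-sorted lists. A stripped-down, self-contained sketch of the same pattern over plain string IDs (hypothetical names, assuming only that IDs sort lexically):

package main

import (
    "fmt"
    "sort"
)

// diffSorted reports which IDs must be added locally (present only
// remotely) and which must be deleted (present only locally), using
// the same two-pointer walk as reconcileLegacyACLs above.
func diffSorted(local, remote []string) (adds, deletes []string) {
    sort.Strings(local)
    sort.Strings(remote)
    i, j := 0, 0
    for i < len(local) || j < len(remote) {
        switch {
        case i == len(local): // local exhausted: remote add
            adds = append(adds, remote[j])
            j++
        case j == len(remote): // remote exhausted: local delete
            deletes = append(deletes, local[i])
            i++
        case local[i] > remote[j]: // remote-only entry
            adds = append(adds, remote[j])
            j++
        case local[i] < remote[j]: // local-only entry
            deletes = append(deletes, local[i])
            i++
        default: // same ID on both sides; a content compare would go here
            i++
            j++
        }
    }
    return adds, deletes
}

func main() {
    adds, dels := diffSorted([]string{"bbb", "ccc"}, []string{"aaa", "ccc"})
    fmt.Println(adds, dels) // [aaa] [bbb]: aaa only remote, bbb only local
}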

// fetchLocalLegacyACLs returns the ACLs in the local state store.
func (s *Server) fetchLocalLegacyACLs() (structs.ACLs, error) {
    _, local, err := s.fsm.State().ACLTokenList(nil, false, true, "", "", "", nil, nil)
    if err != nil {
        return nil, err
    }

    now := time.Now()

    var acls structs.ACLs
    for _, token := range local {
        if token.IsExpired(now) {
            continue
        }
        if acl, err := token.Convert(); err == nil && acl != nil {
            acls = append(acls, acl)
        }
    }

    return acls, nil
}

// fetchRemoteLegacyACLs is used to get the remote set of ACLs from the ACL
// datacenter. The lastRemoteIndex parameter is a hint about which remote index we
// have replicated to, so this is expected to block until something changes.
func (s *Server) fetchRemoteLegacyACLs(lastRemoteIndex uint64) (*structs.IndexedACLs, error) {
    defer metrics.MeasureSince([]string{"leader", "fetchRemoteACLs"}, time.Now())

    args := structs.DCSpecificRequest{
        Datacenter: s.config.PrimaryDatacenter,
        QueryOptions: structs.QueryOptions{
            Token:         s.tokens.ReplicationToken(),
            MinQueryIndex: lastRemoteIndex,
            AllowStale:    true,
        },
    }
    var remote structs.IndexedACLs
    if err := s.RPC("ACL.List", &args, &remote); err != nil {
        return nil, err
    }
    return &remote, nil
}
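fetchRemoteLegacyACLs leans on Consul's blocking-query convention: passing the last replicated index as MinQueryIndex makes the RPC hold server-side until the remote index advances. A self-contained sketch of the resulting poll loop, with a stub standing in for the RPC (all names here are hypothetical, not from this diff):

package main

import (
    "fmt"
    "time"
)

type indexed struct {
    Index uint64
    Data  []string
}

// fetchRemote stands in for a Consul blocking query: it "blocks" until
// the server-side index exceeds lastIndex, then returns the new state.
func fetchRemote(lastIndex uint64) (indexed, error) {
    time.Sleep(50 * time.Millisecond) // simulate the server holding the request
    return indexed{Index: lastIndex + 1, Data: []string{"acl-a", "acl-b"}}, nil
}

func main() {
    var lastIndex uint64 // 0 forces an immediate full fetch
    for i := 0; i < 3; i++ {
        remote, err := fetchRemote(lastIndex)
        if err != nil {
            time.Sleep(time.Second) // back off and retry on error
            continue
        }
        fmt.Println("replicated through index", remote.Index, remote.Data)
        lastIndex = remote.Index // advance the blocking-query cursor
    }
}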

// updateLocalLegacyACLs is given a list of changes to apply in order to bring the
// local ACLs in-line with the remote ACLs from the ACL datacenter.
func (s *Server) updateLocalLegacyACLs(changes structs.ACLRequests, ctx context.Context) (bool, error) {
    defer metrics.MeasureSince([]string{"leader", "updateLocalACLs"}, time.Now())

    minTimePerOp := time.Second / time.Duration(s.config.ACLReplicationApplyLimit)
    for _, change := range changes {
        // Note that we are using the single ACL interface here and not
        // performing all this inside a single transaction. This is OK
        // for two reasons. First, there's nothing else other than this
        // replication routine that alters the local ACLs, so there's
        // nothing to contend with locally. Second, if an apply fails
        // in the middle (most likely due to losing leadership), the
        // next replication pass will clean up and check everything
        // again.
        var reply string
        start := time.Now()
        if err := aclApplyInternal(s, change, &reply); err != nil {
            return false, err
        }

        // Do a smooth rate limit to wait out the min time allowed for
        // each op. If this op took longer than the min, then the sleep
        // time will be negative and we will just move on.
        elapsed := time.Since(start)
        select {
        case <-ctx.Done():
            return true, nil
        case <-time.After(minTimePerOp - elapsed):
            // do nothing
        }
    }
    return false, nil
}
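The throttle above spreads writes evenly: each op is padded out to at least time.Second divided by the apply limit, and a slow op yields a negative pad, which makes time.After fire immediately. A self-contained sketch of that pattern (the limit of 4 ops/s is an arbitrary assumption for illustration):

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    const applyLimit = 4 // ops per second, analogous to ACLReplicationApplyLimit
    minTimePerOp := time.Second / time.Duration(applyLimit)

    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    for i := 0; i < 5; i++ {
        start := time.Now()
        fmt.Println("apply op", i) // stand-in for the real apply call

        // Pad each op out to minTimePerOp; a negative remainder fires
        // time.After immediately, so slow ops are never double-charged.
        select {
        case <-ctx.Done():
            return // caller asked us to stop mid-stream
        case <-time.After(minTimePerOp - time.Since(start)):
        }
    }
}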

// replicateLegacyACLs runs one pass of the algorithm for replicating ACLs from
// a remote ACL datacenter to local state. If there's any error, this will return
// 0 for the lastRemoteIndex, which will cause us to immediately do a full sync
// next time.
func (s *Server) replicateLegacyACLs(ctx context.Context, logger hclog.Logger, lastRemoteIndex uint64) (uint64, bool, error) {
    remote, err := s.fetchRemoteLegacyACLs(lastRemoteIndex)
    if err != nil {
        return 0, false, fmt.Errorf("failed to retrieve remote ACLs: %v", err)
    }

    // Need to check if we should be stopping. This will be common as the fetching process is a blocking
    // RPC which could have been hanging around for a long time and during that time leadership could
    // have been lost.
    select {
    case <-ctx.Done():
        return 0, true, nil
    default:
        // do nothing
    }

    // Measure everything after the remote query, which can block for long
    // periods of time. This metric is a good measure of how expensive the
    // replication process is.
    defer metrics.MeasureSince([]string{"leader", "replicateACLs"}, time.Now())

    local, err := s.fetchLocalLegacyACLs()
    if err != nil {
        return 0, false, fmt.Errorf("failed to retrieve local ACLs: %v", err)
    }

    // If the remote index ever goes backwards, it's a good indication that
    // the remote side was rebuilt and we should do a full sync since we
    // can't make any assumptions about what's going on.
    if remote.QueryMeta.Index < lastRemoteIndex {
        logger.Warn(
            "Legacy ACL replication remote index moved backwards, forcing a full ACL sync",
            "from", lastRemoteIndex,
            "to", remote.QueryMeta.Index,
        )
        lastRemoteIndex = 0
    }

    // Calculate the changes required to bring the state into sync and then
    // apply them.
    changes := reconcileLegacyACLs(local, remote.ACLs, lastRemoteIndex)
    exit, err := s.updateLocalLegacyACLs(changes, ctx)
    if exit {
        return 0, true, nil
    }

    if err != nil {
        return 0, false, fmt.Errorf("failed to sync ACL changes: %v", err)
    }

    // Return the index we got back from the remote side, since we've synced
    // up with the remote state as of that index.
    return remote.QueryMeta.Index, false, nil
}
@@ -1,491 +0,0 @@
package consul

import (
    "bytes"
    "context"
    "fmt"
    "os"
    "reflect"
    "sort"
    "strconv"
    "strings"
    "testing"
    "time"

    "github.com/hashicorp/consul/agent/structs"
    tokenStore "github.com/hashicorp/consul/agent/token"
    "github.com/hashicorp/consul/sdk/testutil/retry"
    "github.com/hashicorp/consul/testrpc"
)

func TestACLReplication_Sorter(t *testing.T) {
    t.Parallel()
    acls := structs.ACLs{
        &structs.ACL{ID: "a"},
        &structs.ACL{ID: "b"},
        &structs.ACL{ID: "c"},
    }

    sorter := &aclIterator{acls, 0}
    if len := sorter.Len(); len != 3 {
        t.Fatalf("bad: %d", len)
    }
    if !sorter.Less(0, 1) {
        t.Fatalf("should be less")
    }
    if sorter.Less(1, 0) {
        t.Fatalf("should not be less")
    }
    if !sort.IsSorted(sorter) {
        t.Fatalf("should be sorted")
    }

    expected := structs.ACLs{
        &structs.ACL{ID: "b"},
        &structs.ACL{ID: "a"},
        &structs.ACL{ID: "c"},
    }
    sorter.Swap(0, 1)
    if !reflect.DeepEqual(acls, expected) {
        t.Fatalf("bad: %v", acls)
    }
    if sort.IsSorted(sorter) {
        t.Fatalf("should not be sorted")
    }
    sort.Sort(sorter)
    if !sort.IsSorted(sorter) {
        t.Fatalf("should be sorted")
    }
}

func TestACLReplication_Iterator(t *testing.T) {
    t.Parallel()
    acls := structs.ACLs{}

    iter := newACLIterator(acls)
    if front := iter.Front(); front != nil {
        t.Fatalf("bad: %v", front)
    }
    iter.Next()
    if front := iter.Front(); front != nil {
        t.Fatalf("bad: %v", front)
    }

    acls = structs.ACLs{
        &structs.ACL{ID: "a"},
        &structs.ACL{ID: "b"},
        &structs.ACL{ID: "c"},
    }
    iter = newACLIterator(acls)
    if front := iter.Front(); front != acls[0] {
        t.Fatalf("bad: %v", front)
    }
    iter.Next()
    if front := iter.Front(); front != acls[1] {
        t.Fatalf("bad: %v", front)
    }
    iter.Next()
    if front := iter.Front(); front != acls[2] {
        t.Fatalf("bad: %v", front)
    }
    iter.Next()
    if front := iter.Front(); front != nil {
        t.Fatalf("bad: %v", front)
    }
}

func TestACLReplication_reconcileACLs(t *testing.T) {
    t.Parallel()
    parseACLs := func(raw string) structs.ACLs {
        var acls structs.ACLs
        for _, key := range strings.Split(raw, "|") {
            if len(key) == 0 {
                continue
            }

            tuple := strings.Split(key, ":")
            index, err := strconv.Atoi(tuple[1])
            if err != nil {
                t.Fatalf("err: %v", err)
            }
            acl := &structs.ACL{
                ID:    tuple[0],
                Rules: tuple[2],
                RaftIndex: structs.RaftIndex{
                    ModifyIndex: uint64(index),
                },
            }
            acls = append(acls, acl)
        }
        return acls
    }

    parseChanges := func(changes structs.ACLRequests) string {
        var ret string
        for i, change := range changes {
            if i > 0 {
                ret += "|"
            }
            ret += fmt.Sprintf("%s:%s:%s", change.Op, change.ACL.ID, change.ACL.Rules)
        }
        return ret
    }
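    // To read the table below: each "id:index:rules" tuple in local/remote
    // encodes one ACL ("bbb:3:X" means ID "bbb", ModifyIndex 3, Rules "X"),
    // and parseChanges renders each resulting request as "op:id:rules"
    // (e.g. "set:bbb:X"), so the expected strings are pipe-separated change lists.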

    tests := []struct {
        local           string
        remote          string
        lastRemoteIndex uint64
        expected        string
    }{
        // Everything empty.
        {
            local:           "",
            remote:          "",
            lastRemoteIndex: 0,
            expected:        "",
        },
        // First time with empty local.
        {
            local:           "",
            remote:          "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
            lastRemoteIndex: 0,
            expected:        "set:bbb:X|set:ccc:X|set:ddd:X|set:eee:X",
        },
        // Remote not sorted.
        {
            local:           "",
            remote:          "ddd:2:X|bbb:3:X|ccc:9:X|eee:11:X",
            lastRemoteIndex: 0,
            expected:        "set:bbb:X|set:ccc:X|set:ddd:X|set:eee:X",
        },
        // Neither side sorted.
        {
            local:           "ddd:2:X|bbb:3:X|ccc:9:X|eee:11:X",
            remote:          "ccc:9:X|bbb:3:X|ddd:2:X|eee:11:X",
            lastRemoteIndex: 0,
            expected:        "",
        },
        // Fully replicated, nothing to do.
        {
            local:           "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
            remote:          "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
            lastRemoteIndex: 0,
            expected:        "",
        },
        // Change an ACL.
        {
            local:           "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
            remote:          "bbb:3:X|ccc:33:Y|ddd:2:X|eee:11:X",
            lastRemoteIndex: 0,
            expected:        "set:ccc:Y",
        },
        // Change an ACL, but mask the change by the last replicated
        // index. This isn't how things work normally, but it proves
        // we are skipping the full compare based on the index.
        {
            local:           "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
            remote:          "bbb:3:X|ccc:33:Y|ddd:2:X|eee:11:X",
            lastRemoteIndex: 33,
            expected:        "",
        },
        // Empty everything out.
        {
            local:           "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
            remote:          "",
            lastRemoteIndex: 0,
            expected:        "delete:bbb:X|delete:ccc:X|delete:ddd:X|delete:eee:X",
        },
        // Adds on the ends and in the middle.
        {
            local:           "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
            remote:          "aaa:99:X|bbb:3:X|ccc:9:X|ccx:101:X|ddd:2:X|eee:11:X|fff:102:X",
            lastRemoteIndex: 0,
            expected:        "set:aaa:X|set:ccx:X|set:fff:X",
        },
        // Deletes on the ends and in the middle.
        {
            local:           "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
            remote:          "ccc:9:X",
            lastRemoteIndex: 0,
            expected:        "delete:bbb:X|delete:ddd:X|delete:eee:X",
        },
        // Everything.
        {
            local:           "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
            remote:          "aaa:99:X|bbb:3:X|ccx:101:X|ddd:103:Y|eee:11:X|fff:102:X",
            lastRemoteIndex: 11,
            expected:        "set:aaa:X|delete:ccc:X|set:ccx:X|set:ddd:Y|set:fff:X",
        },
    }
    for i, test := range tests {
        local, remote := parseACLs(test.local), parseACLs(test.remote)
        changes := reconcileLegacyACLs(local, remote, test.lastRemoteIndex)
        if actual := parseChanges(changes); actual != test.expected {
            t.Errorf("test case %d failed: %s", i, actual)
        }
    }
}

func TestACLReplication_updateLocalACLs_RateLimit(t *testing.T) {
    if testing.Short() {
        t.Skip("too slow for testing.Short")
    }

    t.Parallel()
    dir1, s1 := testServerWithConfig(t, func(c *Config) {
        c.Datacenter = "dc2"
        c.PrimaryDatacenter = "dc1"
        c.ACLsEnabled = true
        c.ACLReplicationApplyLimit = 1
    })
    s1.tokens.UpdateReplicationToken("secret", tokenStore.TokenSourceConfig)
    defer os.RemoveAll(dir1)
    defer s1.Shutdown()
    testrpc.WaitForLeader(t, s1.RPC, "dc2")

    changes := structs.ACLRequests{
        &structs.ACLRequest{
            Op: structs.ACLSet,
            ACL: structs.ACL{
                ID:   "secret",
                Type: "client",
            },
        },
    }

    // Should be throttled to 1 Hz.
    start := time.Now()
    if _, err := s1.updateLocalLegacyACLs(changes, context.Background()); err != nil {
        t.Fatalf("err: %v", err)
    }
    if dur := time.Since(start); dur < time.Second {
        t.Fatalf("too fast: %9.6f", dur.Seconds())
    }

    changes = append(changes,
        &structs.ACLRequest{
            Op: structs.ACLSet,
            ACL: structs.ACL{
                ID:   "secret",
                Type: "client",
            },
        })

    // Should be throttled to 1 Hz.
    start = time.Now()
    if _, err := s1.updateLocalLegacyACLs(changes, context.Background()); err != nil {
        t.Fatalf("err: %v", err)
    }
    if dur := time.Since(start); dur < 2*time.Second {
        t.Fatalf("too fast: %9.6f", dur.Seconds())
    }
}

func TestACLReplication_IsACLReplicationEnabled(t *testing.T) {
    if testing.Short() {
        t.Skip("too slow for testing.Short")
    }

    t.Parallel()
    // ACLs not enabled.
    dir1, s1 := testServerWithConfig(t, func(c *Config) {
        c.PrimaryDatacenter = ""
        c.ACLsEnabled = false
    })
    defer os.RemoveAll(dir1)
    defer s1.Shutdown()
    if s1.IsACLReplicationEnabled() {
        t.Fatalf("should not be enabled")
    }

    // ACLs enabled but not replication.
    dir2, s2 := testServerWithConfig(t, func(c *Config) {
        c.Datacenter = "dc2"
        c.PrimaryDatacenter = "dc1"
        c.ACLsEnabled = true
    })
    defer os.RemoveAll(dir2)
    defer s2.Shutdown()
    testrpc.WaitForLeader(t, s1.RPC, "dc1")
    testrpc.WaitForLeader(t, s2.RPC, "dc2")

    if s2.IsACLReplicationEnabled() {
        t.Fatalf("should not be enabled")
    }

    // ACLs enabled with replication.
    dir3, s3 := testServerWithConfig(t, func(c *Config) {
        c.Datacenter = "dc2"
        c.PrimaryDatacenter = "dc1"
        c.ACLsEnabled = true
        c.ACLTokenReplication = true
    })
    defer os.RemoveAll(dir3)
    defer s3.Shutdown()
    testrpc.WaitForLeader(t, s3.RPC, "dc2")
    if !s3.IsACLReplicationEnabled() {
        t.Fatalf("should be enabled")
    }

    // ACLs enabled with replication, but inside the ACL datacenter
    // so replication should be disabled.
    dir4, s4 := testServerWithConfig(t, func(c *Config) {
        c.Datacenter = "dc1"
        c.PrimaryDatacenter = "dc1"
        c.ACLsEnabled = true
        c.ACLTokenReplication = true
    })
    defer os.RemoveAll(dir4)
    defer s4.Shutdown()
    testrpc.WaitForLeader(t, s4.RPC, "dc1")
    if s4.IsACLReplicationEnabled() {
        t.Fatalf("should not be enabled")
    }
}

// Note that this test is testing that legacy token data is replicated, NOT
// directly testing the legacy acl replication goroutine code.
//
// Actually testing legacy replication is difficult to do without old binaries.
func TestACLReplication_LegacyTokens(t *testing.T) {
    if testing.Short() {
        t.Skip("too slow for testing.Short")
    }

    t.Parallel()
    dir1, s1 := testServerWithConfig(t, func(c *Config) {
        c.PrimaryDatacenter = "dc1"
        c.ACLsEnabled = true
        c.ACLMasterToken = "root"
    })
    defer os.RemoveAll(dir1)
    defer s1.Shutdown()
    testrpc.WaitForLeader(t, s1.RPC, "dc1")
    client := rpcClient(t, s1)
    defer client.Close()

    dir2, s2 := testServerWithConfig(t, func(c *Config) {
        c.Datacenter = "dc2"
        c.PrimaryDatacenter = "dc1"
        c.ACLsEnabled = true
        c.ACLTokenReplication = true
        c.ACLReplicationRate = 100
        c.ACLReplicationBurst = 100
        c.ACLReplicationApplyLimit = 1000000
    })
    s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig)
    testrpc.WaitForLeader(t, s2.RPC, "dc2")
    defer os.RemoveAll(dir2)
    defer s2.Shutdown()

    // Try to join.
    joinWAN(t, s2, s1)
    testrpc.WaitForLeader(t, s1.RPC, "dc1")
    testrpc.WaitForLeader(t, s1.RPC, "dc2")

    // Wait for legacy acls to be disabled so we are clear that
    // legacy replication isn't meddling.
    waitForNewACLs(t, s1)
    waitForNewACLs(t, s2)
    waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0)

    // Create a bunch of new tokens.
    var id string
    for i := 0; i < 50; i++ {
        arg := structs.ACLRequest{
            Datacenter: "dc1",
            Op:         structs.ACLSet,
            ACL: structs.ACL{
                Name:  "User token",
                Type:  structs.ACLTokenTypeClient,
                Rules: testACLPolicy,
            },
            WriteRequest: structs.WriteRequest{Token: "root"},
        }
        if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
            t.Fatalf("err: %v", err)
        }
    }

    checkSame := func() error {
        index, remote, err := s1.fsm.State().ACLTokenList(nil, true, true, "", "", "", nil, nil)
        if err != nil {
            return err
        }
        _, local, err := s2.fsm.State().ACLTokenList(nil, true, true, "", "", "", nil, nil)
        if err != nil {
            return err
        }

        if got, want := len(remote), len(local); got != want {
            return fmt.Errorf("got %d remote ACLs want %d", got, want)
        }
        for i, token := range remote {
            if !bytes.Equal(token.Hash, local[i].Hash) {
                return fmt.Errorf("ACLs differ")
            }
        }

        var status structs.ACLReplicationStatus
        s2.aclReplicationStatusLock.RLock()
        status = s2.aclReplicationStatus
        s2.aclReplicationStatusLock.RUnlock()
        if !status.Enabled || !status.Running ||
            status.ReplicatedTokenIndex != index ||
            status.SourceDatacenter != "dc1" {
            return fmt.Errorf("ACL replication status differs")
        }

        return nil
    }
    // Wait for the replica to converge.
    retry.Run(t, func(r *retry.R) {
        if err := checkSame(); err != nil {
            r.Fatal(err)
        }
    })

    // Create more new tokens.
    for i := 0; i < 50; i++ {
        arg := structs.ACLRequest{
            Datacenter: "dc1",
            Op:         structs.ACLSet,
            ACL: structs.ACL{
                Name:  "User token",
                Type:  structs.ACLTokenTypeClient,
                Rules: testACLPolicy,
            },
            WriteRequest: structs.WriteRequest{Token: "root"},
        }
        var dontCare string
        if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil {
            t.Fatalf("err: %v", err)
        }
    }
    // Wait for the replica to converge.
    retry.Run(t, func(r *retry.R) {
        if err := checkSame(); err != nil {
            r.Fatal(err)
        }
    })

    // Delete a token.
    arg := structs.ACLRequest{
        Datacenter: "dc1",
        Op:         structs.ACLDelete,
        ACL: structs.ACL{
            ID: id,
        },
        WriteRequest: structs.WriteRequest{Token: "root"},
    }
    var dontCare string
    if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil {
        t.Fatalf("err: %v", err)
    }
    // Wait for the replica to converge.
    retry.Run(t, func(r *retry.R) {
        if err := checkSame(); err != nil {
            r.Fatal(err)
        }
    })
}
@@ -674,15 +674,6 @@ func (s *Server) initializeACLs(ctx context.Context, upgrade bool) error {
        // launch the upgrade go routine to generate accessors for everything
        s.startACLUpgrade(ctx)
    } else {
        if s.UseLegacyACLs() && !upgrade {
            if s.IsACLReplicationEnabled() {
                s.startLegacyACLReplication(ctx)
            }
            // return early as we don't want to start new ACL replication
            // or ACL token reaping as these are new ACL features.
            return nil
        }

        if upgrade {
            s.stopACLReplication()
        }
@@ -783,67 +774,6 @@ func (s *Server) stopACLUpgrade() {
    s.leaderRoutineManager.Stop(aclUpgradeRoutineName)
}

// This function is only intended to be run as a managed go routine; it will block until
// the context passed in indicates that it should exit.
func (s *Server) runLegacyACLReplication(ctx context.Context) error {
    var lastRemoteIndex uint64
    legacyACLLogger := s.aclReplicationLogger(logging.Legacy)
    limiter := rate.NewLimiter(rate.Limit(s.config.ACLReplicationRate), s.config.ACLReplicationBurst)

    for {
        if err := limiter.Wait(ctx); err != nil {
            return err
        }

        if s.tokens.ReplicationToken() == "" {
            continue
        }

        index, exit, err := s.replicateLegacyACLs(ctx, legacyACLLogger, lastRemoteIndex)
        if exit {
            return nil
        }

        if err != nil {
            metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "status"},
                0,
            )
            lastRemoteIndex = 0
            s.updateACLReplicationStatusError(err.Error())
            legacyACLLogger.Warn("Legacy ACL replication error (will retry if still leader)", "error", err)
        } else {
            metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "status"},
                1,
            )
            metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "index"},
                float32(index),
            )
            lastRemoteIndex = index
            s.updateACLReplicationStatusIndex(structs.ACLReplicateLegacy, index)
            legacyACLLogger.Debug("Legacy ACL replication completed through remote index", "index", index)
        }
    }
}
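The loop above is paced by golang.org/x/time/rate: limiter.Wait blocks until a slot is free or the context is cancelled, which both bounds the replication rate and gives a clean shutdown path. A minimal self-contained sketch of that gating (the rate and burst values here are arbitrary, standing in for ACLReplicationRate and ACLReplicationBurst):

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/time/rate"
)

func main() {
    // Allow 2 passes per second with a burst of 1.
    limiter := rate.NewLimiter(rate.Limit(2), 1)

    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    for i := 0; ; i++ {
        // Wait blocks until the limiter grants a slot, or returns an
        // error once ctx is cancelled, ending the loop cleanly.
        if err := limiter.Wait(ctx); err != nil {
            fmt.Println("stopping:", err)
            return
        }
        fmt.Println("replication pass", i)
    }
}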

func (s *Server) startLegacyACLReplication(ctx context.Context) {
    if s.InACLDatacenter() {
        return
    }

    // unlike some other leader routines this initializes some extra state
    // and therefore we want to prevent re-initialization if things are already
    // running
    if s.leaderRoutineManager.IsRunning(legacyACLReplicationRoutineName) {
        return
    }

    s.initReplicationStatus()

    s.leaderRoutineManager.Start(ctx, legacyACLReplicationRoutineName, s.runLegacyACLReplication)
    s.logger.Info("started legacy ACL replication")
    s.updateACLReplicationStatusRunning(structs.ACLReplicateLegacy)
}

func (s *Server) startACLReplication(ctx context.Context) {
    if s.InACLDatacenter() {
        return
@@ -966,7 +896,6 @@ func (s *Server) aclReplicationLogger(singularNoun string) hclog.Logger {

func (s *Server) stopACLReplication() {
    // these will be no-ops when not started
    s.leaderRoutineManager.Stop(legacyACLReplicationRoutineName)
    s.leaderRoutineManager.Stop(aclPolicyReplicationRoutineName)
    s.leaderRoutineManager.Stop(aclRoleReplicationRoutineName)
    s.leaderRoutineManager.Stop(aclTokenReplicationRoutineName)
@@ -1542,30 +1542,6 @@ func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) {
    }
}

func TestLeader_ACLLegacyReplication(t *testing.T) {
    if testing.Short() {
        t.Skip("too slow for testing.Short")
    }

    t.Parallel()

    // This test relies on configuring a secondary DC with no route to the primary DC.
    // Having no route will cause the ACL mode checking of the primary to "fail". In this
    // scenario legacy ACL replication should be enabled without also running new ACL
    // replication routines.
    cb := func(c *Config) {
        c.Datacenter = "dc2"
        c.ACLTokenReplication = true
    }
    _, srv, _ := testACLServerWithConfig(t, cb, true)
    waitForLeaderEstablishment(t, srv)

    require.True(t, srv.leaderRoutineManager.IsRunning(legacyACLReplicationRoutineName))
    require.False(t, srv.leaderRoutineManager.IsRunning(aclPolicyReplicationRoutineName))
    require.False(t, srv.leaderRoutineManager.IsRunning(aclRoleReplicationRoutineName))
    require.False(t, srv.leaderRoutineManager.IsRunning(aclTokenReplicationRoutineName))
}

func TestDatacenterSupportsFederationStates(t *testing.T) {
    if testing.Short() {
        t.Skip("too slow for testing.Short")
@@ -95,7 +95,6 @@ const (
)

const (
    legacyACLReplicationRoutineName = "legacy ACL replication"
    aclPolicyReplicationRoutineName = "ACL policy replication"
    aclRoleReplicationRoutineName   = "ACL role replication"
    aclTokenReplicationRoutineName  = "ACL token replication"
@@ -800,7 +800,7 @@ func aclTokenGetTxn(tx ReadTxn, ws memdb.WatchSet, value, index string, entMeta
    return nil, nil
}

// ACLTokenList is used to list out all of the ACLs in the state store.
// ACLTokenList returns a list of ACL tokens that match the policy, role, and method.
func (s *Store) ACLTokenList(ws memdb.WatchSet, local, global bool, policy, role, methodName string, methodMeta, entMeta *structs.EnterpriseMeta) (uint64, structs.ACLTokens, error) {
    tx := s.db.Txn(false)
    defer tx.Abort()
@@ -1242,7 +1242,6 @@ func (m *ACLAuthMethod) UnmarshalJSON(data []byte) (err error) {
type ACLReplicationType string

const (
    ACLReplicateLegacy   ACLReplicationType = "legacy"
    ACLReplicatePolicies ACLReplicationType = "policies"
    ACLReplicateRoles    ACLReplicationType = "roles"
    ACLReplicateTokens   ACLReplicationType = "tokens"
@@ -1250,8 +1249,6 @@ const (

func (t ACLReplicationType) SingularNoun() string {
    switch t {
    case ACLReplicateLegacy:
        return "legacy"
    case ACLReplicatePolicies:
        return "policy"
    case ACLReplicateRoles:
@@ -90,6 +90,7 @@ func (a *ACL) Convert() *ACLToken {
}

// Convert attempts to convert an ACLToken into an ACLCompat.
// TODO(ACL-Legacy-Compat): remove
func (tok *ACLToken) Convert() (*ACL, error) {
    if tok.Type == "" {
        return nil, fmt.Errorf("Cannot convert ACLToken into compat token")