mirror of https://github.com/status-im/consul.git
Merge pull request #10985 from hashicorp/dnephin/acl-legacy-remove-replication
acl: remove legacy ACL replication
This commit is contained in:
commit
aee8a9511d
|
@ -477,14 +477,6 @@ func (s *Server) replicateACLType(ctx context.Context, logger hclog.Logger, tr a
|
||||||
return remoteIndex, false, nil
|
return remoteIndex, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsACLReplicationEnabled returns true if ACL replication is enabled.
|
|
||||||
// DEPRECATED (ACL-Legacy-Compat) - with new ACLs at least policy replication is required
|
|
||||||
func (s *Server) IsACLReplicationEnabled() bool {
|
|
||||||
authDC := s.config.PrimaryDatacenter
|
|
||||||
return len(authDC) > 0 && (authDC != s.config.Datacenter) &&
|
|
||||||
s.config.ACLTokenReplication
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) updateACLReplicationStatusError(errorMsg string) {
|
func (s *Server) updateACLReplicationStatusError(errorMsg string) {
|
||||||
s.aclReplicationStatusLock.Lock()
|
s.aclReplicationStatusLock.Lock()
|
||||||
defer s.aclReplicationStatusLock.Unlock()
|
defer s.aclReplicationStatusLock.Unlock()
|
||||||
|
@ -499,8 +491,6 @@ func (s *Server) updateACLReplicationStatusIndex(replicationType structs.ACLRepl
|
||||||
|
|
||||||
s.aclReplicationStatus.LastSuccess = time.Now().Round(time.Second).UTC()
|
s.aclReplicationStatus.LastSuccess = time.Now().Round(time.Second).UTC()
|
||||||
switch replicationType {
|
switch replicationType {
|
||||||
case structs.ACLReplicateLegacy:
|
|
||||||
s.aclReplicationStatus.ReplicatedIndex = index
|
|
||||||
case structs.ACLReplicateTokens:
|
case structs.ACLReplicateTokens:
|
||||||
s.aclReplicationStatus.ReplicatedTokenIndex = index
|
s.aclReplicationStatus.ReplicatedTokenIndex = index
|
||||||
case structs.ACLReplicatePolicies:
|
case structs.ACLReplicatePolicies:
|
||||||
|
|
|
@ -1,276 +0,0 @@
|
||||||
package consul
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"sort"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
metrics "github.com/armon/go-metrics"
|
|
||||||
"github.com/hashicorp/go-hclog"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
|
||||||
)
|
|
||||||
|
|
||||||
// aclIterator simplifies the algorithm below by providing a basic iterator that
|
|
||||||
// moves through a list of ACLs and returns nil when it's exhausted. It also has
|
|
||||||
// methods for pre-sorting the ACLs being iterated over by ID, which should
|
|
||||||
// already be true, but since this is crucial for correctness and we are taking
|
|
||||||
// input from other servers, we sort to make sure.
|
|
||||||
type aclIterator struct {
|
|
||||||
acls structs.ACLs
|
|
||||||
|
|
||||||
// index is the current position of the iterator.
|
|
||||||
index int
|
|
||||||
}
|
|
||||||
|
|
||||||
// newACLIterator returns a new ACL iterator.
|
|
||||||
func newACLIterator(acls structs.ACLs) *aclIterator {
|
|
||||||
return &aclIterator{acls: acls}
|
|
||||||
}
|
|
||||||
|
|
||||||
// See sort.Interface.
|
|
||||||
func (a *aclIterator) Len() int {
|
|
||||||
return len(a.acls)
|
|
||||||
}
|
|
||||||
|
|
||||||
// See sort.Interface.
|
|
||||||
func (a *aclIterator) Swap(i, j int) {
|
|
||||||
a.acls[i], a.acls[j] = a.acls[j], a.acls[i]
|
|
||||||
}
|
|
||||||
|
|
||||||
// See sort.Interface.
|
|
||||||
func (a *aclIterator) Less(i, j int) bool {
|
|
||||||
return a.acls[i].ID < a.acls[j].ID
|
|
||||||
}
|
|
||||||
|
|
||||||
// Front returns the item at index position, or nil if the list is exhausted.
|
|
||||||
func (a *aclIterator) Front() *structs.ACL {
|
|
||||||
if a.index < len(a.acls) {
|
|
||||||
return a.acls[a.index]
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Next advances the iterator to the next index.
|
|
||||||
func (a *aclIterator) Next() {
|
|
||||||
a.index++
|
|
||||||
}
|
|
||||||
|
|
||||||
// reconcileACLs takes the local and remote ACL state, and produces a list of
|
|
||||||
// changes required in order to bring the local ACLs into sync with the remote
|
|
||||||
// ACLs. You can supply lastRemoteIndex as a hint that replication has succeeded
|
|
||||||
// up to that remote index and it will make this process more efficient by only
|
|
||||||
// comparing ACL entries modified after that index. Setting this to 0 will force
|
|
||||||
// a full compare of all existing ACLs.
|
|
||||||
func reconcileLegacyACLs(local, remote structs.ACLs, lastRemoteIndex uint64) structs.ACLRequests {
|
|
||||||
// Since sorting the lists is crucial for correctness, we are depending
|
|
||||||
// on data coming from other servers potentially running a different,
|
|
||||||
// version of Consul, and sorted-ness is kind of a subtle property of
|
|
||||||
// the state store indexing, it's prudent to make sure things are sorted
|
|
||||||
// before we begin.
|
|
||||||
localIter, remoteIter := newACLIterator(local), newACLIterator(remote)
|
|
||||||
sort.Sort(localIter)
|
|
||||||
sort.Sort(remoteIter)
|
|
||||||
|
|
||||||
// Run through both lists and reconcile them.
|
|
||||||
var changes structs.ACLRequests
|
|
||||||
for localIter.Front() != nil || remoteIter.Front() != nil {
|
|
||||||
// If the local list is exhausted, then process this as a remote
|
|
||||||
// add. We know from the loop condition that there's something
|
|
||||||
// in the remote list.
|
|
||||||
if localIter.Front() == nil {
|
|
||||||
changes = append(changes, &structs.ACLRequest{
|
|
||||||
Op: structs.ACLSet,
|
|
||||||
ACL: *(remoteIter.Front()),
|
|
||||||
})
|
|
||||||
remoteIter.Next()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the remote list is exhausted, then process this as a local
|
|
||||||
// delete. We know from the loop condition that there's something
|
|
||||||
// in the local list.
|
|
||||||
if remoteIter.Front() == nil {
|
|
||||||
changes = append(changes, &structs.ACLRequest{
|
|
||||||
Op: structs.ACLDelete,
|
|
||||||
ACL: *(localIter.Front()),
|
|
||||||
})
|
|
||||||
localIter.Next()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// At this point we know there's something at the front of each
|
|
||||||
// list we need to resolve.
|
|
||||||
|
|
||||||
// If the remote list has something local doesn't, we add it.
|
|
||||||
if localIter.Front().ID > remoteIter.Front().ID {
|
|
||||||
changes = append(changes, &structs.ACLRequest{
|
|
||||||
Op: structs.ACLSet,
|
|
||||||
ACL: *(remoteIter.Front()),
|
|
||||||
})
|
|
||||||
remoteIter.Next()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// If local has something remote doesn't, we delete it.
|
|
||||||
if localIter.Front().ID < remoteIter.Front().ID {
|
|
||||||
changes = append(changes, &structs.ACLRequest{
|
|
||||||
Op: structs.ACLDelete,
|
|
||||||
ACL: *(localIter.Front()),
|
|
||||||
})
|
|
||||||
localIter.Next()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Local and remote have an ACL with the same ID, so we might
|
|
||||||
// need to compare them.
|
|
||||||
l, r := localIter.Front(), remoteIter.Front()
|
|
||||||
if r.RaftIndex.ModifyIndex > lastRemoteIndex && !r.IsSame(l) {
|
|
||||||
changes = append(changes, &structs.ACLRequest{
|
|
||||||
Op: structs.ACLSet,
|
|
||||||
ACL: *r,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
localIter.Next()
|
|
||||||
remoteIter.Next()
|
|
||||||
}
|
|
||||||
return changes
|
|
||||||
}
|
|
||||||
|
|
||||||
// FetchLocalACLs returns the ACLs in the local state store.
|
|
||||||
func (s *Server) fetchLocalLegacyACLs() (structs.ACLs, error) {
|
|
||||||
_, local, err := s.fsm.State().ACLTokenList(nil, false, true, "", "", "", nil, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
|
|
||||||
var acls structs.ACLs
|
|
||||||
for _, token := range local {
|
|
||||||
if token.IsExpired(now) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if acl, err := token.Convert(); err == nil && acl != nil {
|
|
||||||
acls = append(acls, acl)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return acls, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FetchRemoteACLs is used to get the remote set of ACLs from the ACL
|
|
||||||
// datacenter. The lastIndex parameter is a hint about which remote index we
|
|
||||||
// have replicated to, so this is expected to block until something changes.
|
|
||||||
func (s *Server) fetchRemoteLegacyACLs(lastRemoteIndex uint64) (*structs.IndexedACLs, error) {
|
|
||||||
defer metrics.MeasureSince([]string{"leader", "fetchRemoteACLs"}, time.Now())
|
|
||||||
|
|
||||||
args := structs.DCSpecificRequest{
|
|
||||||
Datacenter: s.config.PrimaryDatacenter,
|
|
||||||
QueryOptions: structs.QueryOptions{
|
|
||||||
Token: s.tokens.ReplicationToken(),
|
|
||||||
MinQueryIndex: lastRemoteIndex,
|
|
||||||
AllowStale: true,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
var remote structs.IndexedACLs
|
|
||||||
if err := s.RPC("ACL.List", &args, &remote); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &remote, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpdateLocalACLs is given a list of changes to apply in order to bring the
|
|
||||||
// local ACLs in-line with the remote ACLs from the ACL datacenter.
|
|
||||||
func (s *Server) updateLocalLegacyACLs(changes structs.ACLRequests, ctx context.Context) (bool, error) {
|
|
||||||
defer metrics.MeasureSince([]string{"leader", "updateLocalACLs"}, time.Now())
|
|
||||||
|
|
||||||
minTimePerOp := time.Second / time.Duration(s.config.ACLReplicationApplyLimit)
|
|
||||||
for _, change := range changes {
|
|
||||||
// Note that we are using the single ACL interface here and not
|
|
||||||
// performing all this inside a single transaction. This is OK
|
|
||||||
// for two reasons. First, there's nothing else other than this
|
|
||||||
// replication routine that alters the local ACLs, so there's
|
|
||||||
// nothing to contend with locally. Second, if an apply fails
|
|
||||||
// in the middle (most likely due to losing leadership), the
|
|
||||||
// next replication pass will clean up and check everything
|
|
||||||
// again.
|
|
||||||
var reply string
|
|
||||||
start := time.Now()
|
|
||||||
if err := aclApplyInternal(s, change, &reply); err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Do a smooth rate limit to wait out the min time allowed for
|
|
||||||
// each op. If this op took longer than the min, then the sleep
|
|
||||||
// time will be negative and we will just move on.
|
|
||||||
elapsed := time.Since(start)
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return true, nil
|
|
||||||
case <-time.After(minTimePerOp - elapsed):
|
|
||||||
// do nothing
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// replicateACLs is a runs one pass of the algorithm for replicating ACLs from
|
|
||||||
// a remote ACL datacenter to local state. If there's any error, this will return
|
|
||||||
// 0 for the lastRemoteIndex, which will cause us to immediately do a full sync
|
|
||||||
// next time.
|
|
||||||
func (s *Server) replicateLegacyACLs(ctx context.Context, logger hclog.Logger, lastRemoteIndex uint64) (uint64, bool, error) {
|
|
||||||
remote, err := s.fetchRemoteLegacyACLs(lastRemoteIndex)
|
|
||||||
if err != nil {
|
|
||||||
return 0, false, fmt.Errorf("failed to retrieve remote ACLs: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Need to check if we should be stopping. This will be common as the fetching process is a blocking
|
|
||||||
// RPC which could have been hanging around for a long time and during that time leadership could
|
|
||||||
// have been lost.
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return 0, true, nil
|
|
||||||
default:
|
|
||||||
// do nothing
|
|
||||||
}
|
|
||||||
|
|
||||||
// Measure everything after the remote query, which can block for long
|
|
||||||
// periods of time. This metric is a good measure of how expensive the
|
|
||||||
// replication process is.
|
|
||||||
defer metrics.MeasureSince([]string{"leader", "replicateACLs"}, time.Now())
|
|
||||||
|
|
||||||
local, err := s.fetchLocalLegacyACLs()
|
|
||||||
if err != nil {
|
|
||||||
return 0, false, fmt.Errorf("failed to retrieve local ACLs: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the remote index ever goes backwards, it's a good indication that
|
|
||||||
// the remote side was rebuilt and we should do a full sync since we
|
|
||||||
// can't make any assumptions about what's going on.
|
|
||||||
if remote.QueryMeta.Index < lastRemoteIndex {
|
|
||||||
logger.Warn(
|
|
||||||
"Legacy ACL replication remote index moved backwards, forcing a full ACL sync",
|
|
||||||
"from", lastRemoteIndex,
|
|
||||||
"to", remote.QueryMeta.Index,
|
|
||||||
)
|
|
||||||
lastRemoteIndex = 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// Calculate the changes required to bring the state into sync and then
|
|
||||||
// apply them.
|
|
||||||
changes := reconcileLegacyACLs(local, remote.ACLs, lastRemoteIndex)
|
|
||||||
exit, err := s.updateLocalLegacyACLs(changes, ctx)
|
|
||||||
if exit {
|
|
||||||
return 0, true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return 0, false, fmt.Errorf("failed to sync ACL changes: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return the index we got back from the remote side, since we've synced
|
|
||||||
// up with the remote state as of that index.
|
|
||||||
return remote.QueryMeta.Index, false, nil
|
|
||||||
}
|
|
|
@ -1,491 +0,0 @@
|
||||||
package consul
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"reflect"
|
|
||||||
"sort"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
|
||||||
tokenStore "github.com/hashicorp/consul/agent/token"
|
|
||||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
|
||||||
"github.com/hashicorp/consul/testrpc"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestACLReplication_Sorter(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
acls := structs.ACLs{
|
|
||||||
&structs.ACL{ID: "a"},
|
|
||||||
&structs.ACL{ID: "b"},
|
|
||||||
&structs.ACL{ID: "c"},
|
|
||||||
}
|
|
||||||
|
|
||||||
sorter := &aclIterator{acls, 0}
|
|
||||||
if len := sorter.Len(); len != 3 {
|
|
||||||
t.Fatalf("bad: %d", len)
|
|
||||||
}
|
|
||||||
if !sorter.Less(0, 1) {
|
|
||||||
t.Fatalf("should be less")
|
|
||||||
}
|
|
||||||
if sorter.Less(1, 0) {
|
|
||||||
t.Fatalf("should not be less")
|
|
||||||
}
|
|
||||||
if !sort.IsSorted(sorter) {
|
|
||||||
t.Fatalf("should be sorted")
|
|
||||||
}
|
|
||||||
|
|
||||||
expected := structs.ACLs{
|
|
||||||
&structs.ACL{ID: "b"},
|
|
||||||
&structs.ACL{ID: "a"},
|
|
||||||
&structs.ACL{ID: "c"},
|
|
||||||
}
|
|
||||||
sorter.Swap(0, 1)
|
|
||||||
if !reflect.DeepEqual(acls, expected) {
|
|
||||||
t.Fatalf("bad: %v", acls)
|
|
||||||
}
|
|
||||||
if sort.IsSorted(sorter) {
|
|
||||||
t.Fatalf("should not be sorted")
|
|
||||||
}
|
|
||||||
sort.Sort(sorter)
|
|
||||||
if !sort.IsSorted(sorter) {
|
|
||||||
t.Fatalf("should be sorted")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestACLReplication_Iterator(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
acls := structs.ACLs{}
|
|
||||||
|
|
||||||
iter := newACLIterator(acls)
|
|
||||||
if front := iter.Front(); front != nil {
|
|
||||||
t.Fatalf("bad: %v", front)
|
|
||||||
}
|
|
||||||
iter.Next()
|
|
||||||
if front := iter.Front(); front != nil {
|
|
||||||
t.Fatalf("bad: %v", front)
|
|
||||||
}
|
|
||||||
|
|
||||||
acls = structs.ACLs{
|
|
||||||
&structs.ACL{ID: "a"},
|
|
||||||
&structs.ACL{ID: "b"},
|
|
||||||
&structs.ACL{ID: "c"},
|
|
||||||
}
|
|
||||||
iter = newACLIterator(acls)
|
|
||||||
if front := iter.Front(); front != acls[0] {
|
|
||||||
t.Fatalf("bad: %v", front)
|
|
||||||
}
|
|
||||||
iter.Next()
|
|
||||||
if front := iter.Front(); front != acls[1] {
|
|
||||||
t.Fatalf("bad: %v", front)
|
|
||||||
}
|
|
||||||
iter.Next()
|
|
||||||
if front := iter.Front(); front != acls[2] {
|
|
||||||
t.Fatalf("bad: %v", front)
|
|
||||||
}
|
|
||||||
iter.Next()
|
|
||||||
if front := iter.Front(); front != nil {
|
|
||||||
t.Fatalf("bad: %v", front)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestACLReplication_reconcileACLs(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
parseACLs := func(raw string) structs.ACLs {
|
|
||||||
var acls structs.ACLs
|
|
||||||
for _, key := range strings.Split(raw, "|") {
|
|
||||||
if len(key) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
tuple := strings.Split(key, ":")
|
|
||||||
index, err := strconv.Atoi(tuple[1])
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
acl := &structs.ACL{
|
|
||||||
ID: tuple[0],
|
|
||||||
Rules: tuple[2],
|
|
||||||
RaftIndex: structs.RaftIndex{
|
|
||||||
ModifyIndex: uint64(index),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
acls = append(acls, acl)
|
|
||||||
}
|
|
||||||
return acls
|
|
||||||
}
|
|
||||||
|
|
||||||
parseChanges := func(changes structs.ACLRequests) string {
|
|
||||||
var ret string
|
|
||||||
for i, change := range changes {
|
|
||||||
if i > 0 {
|
|
||||||
ret += "|"
|
|
||||||
}
|
|
||||||
ret += fmt.Sprintf("%s:%s:%s", change.Op, change.ACL.ID, change.ACL.Rules)
|
|
||||||
}
|
|
||||||
return ret
|
|
||||||
}
|
|
||||||
|
|
||||||
tests := []struct {
|
|
||||||
local string
|
|
||||||
remote string
|
|
||||||
lastRemoteIndex uint64
|
|
||||||
expected string
|
|
||||||
}{
|
|
||||||
// Everything empty.
|
|
||||||
{
|
|
||||||
local: "",
|
|
||||||
remote: "",
|
|
||||||
lastRemoteIndex: 0,
|
|
||||||
expected: "",
|
|
||||||
},
|
|
||||||
// First time with empty local.
|
|
||||||
{
|
|
||||||
local: "",
|
|
||||||
remote: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
|
||||||
lastRemoteIndex: 0,
|
|
||||||
expected: "set:bbb:X|set:ccc:X|set:ddd:X|set:eee:X",
|
|
||||||
},
|
|
||||||
// Remote not sorted.
|
|
||||||
{
|
|
||||||
local: "",
|
|
||||||
remote: "ddd:2:X|bbb:3:X|ccc:9:X|eee:11:X",
|
|
||||||
lastRemoteIndex: 0,
|
|
||||||
expected: "set:bbb:X|set:ccc:X|set:ddd:X|set:eee:X",
|
|
||||||
},
|
|
||||||
// Neither side sorted.
|
|
||||||
{
|
|
||||||
local: "ddd:2:X|bbb:3:X|ccc:9:X|eee:11:X",
|
|
||||||
remote: "ccc:9:X|bbb:3:X|ddd:2:X|eee:11:X",
|
|
||||||
lastRemoteIndex: 0,
|
|
||||||
expected: "",
|
|
||||||
},
|
|
||||||
// Fully replicated, nothing to do.
|
|
||||||
{
|
|
||||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
|
||||||
remote: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
|
||||||
lastRemoteIndex: 0,
|
|
||||||
expected: "",
|
|
||||||
},
|
|
||||||
// Change an ACL.
|
|
||||||
{
|
|
||||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
|
||||||
remote: "bbb:3:X|ccc:33:Y|ddd:2:X|eee:11:X",
|
|
||||||
lastRemoteIndex: 0,
|
|
||||||
expected: "set:ccc:Y",
|
|
||||||
},
|
|
||||||
// Change an ACL, but mask the change by the last replicated
|
|
||||||
// index. This isn't how things work normally, but it proves
|
|
||||||
// we are skipping the full compare based on the index.
|
|
||||||
{
|
|
||||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
|
||||||
remote: "bbb:3:X|ccc:33:Y|ddd:2:X|eee:11:X",
|
|
||||||
lastRemoteIndex: 33,
|
|
||||||
expected: "",
|
|
||||||
},
|
|
||||||
// Empty everything out.
|
|
||||||
{
|
|
||||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
|
||||||
remote: "",
|
|
||||||
lastRemoteIndex: 0,
|
|
||||||
expected: "delete:bbb:X|delete:ccc:X|delete:ddd:X|delete:eee:X",
|
|
||||||
},
|
|
||||||
// Adds on the ends and in the middle.
|
|
||||||
{
|
|
||||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
|
||||||
remote: "aaa:99:X|bbb:3:X|ccc:9:X|ccx:101:X|ddd:2:X|eee:11:X|fff:102:X",
|
|
||||||
lastRemoteIndex: 0,
|
|
||||||
expected: "set:aaa:X|set:ccx:X|set:fff:X",
|
|
||||||
},
|
|
||||||
// Deletes on the ends and in the middle.
|
|
||||||
{
|
|
||||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
|
||||||
remote: "ccc:9:X",
|
|
||||||
lastRemoteIndex: 0,
|
|
||||||
expected: "delete:bbb:X|delete:ddd:X|delete:eee:X",
|
|
||||||
},
|
|
||||||
// Everything.
|
|
||||||
{
|
|
||||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
|
||||||
remote: "aaa:99:X|bbb:3:X|ccx:101:X|ddd:103:Y|eee:11:X|fff:102:X",
|
|
||||||
lastRemoteIndex: 11,
|
|
||||||
expected: "set:aaa:X|delete:ccc:X|set:ccx:X|set:ddd:Y|set:fff:X",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
for i, test := range tests {
|
|
||||||
local, remote := parseACLs(test.local), parseACLs(test.remote)
|
|
||||||
changes := reconcileLegacyACLs(local, remote, test.lastRemoteIndex)
|
|
||||||
if actual := parseChanges(changes); actual != test.expected {
|
|
||||||
t.Errorf("test case %d failed: %s", i, actual)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestACLReplication_updateLocalACLs_RateLimit(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("too slow for testing.Short")
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Parallel()
|
|
||||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
||||||
c.Datacenter = "dc2"
|
|
||||||
c.PrimaryDatacenter = "dc1"
|
|
||||||
c.ACLsEnabled = true
|
|
||||||
c.ACLReplicationApplyLimit = 1
|
|
||||||
})
|
|
||||||
s1.tokens.UpdateReplicationToken("secret", tokenStore.TokenSourceConfig)
|
|
||||||
defer os.RemoveAll(dir1)
|
|
||||||
defer s1.Shutdown()
|
|
||||||
testrpc.WaitForLeader(t, s1.RPC, "dc2")
|
|
||||||
|
|
||||||
changes := structs.ACLRequests{
|
|
||||||
&structs.ACLRequest{
|
|
||||||
Op: structs.ACLSet,
|
|
||||||
ACL: structs.ACL{
|
|
||||||
ID: "secret",
|
|
||||||
Type: "client",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
// Should be throttled to 1 Hz.
|
|
||||||
start := time.Now()
|
|
||||||
if _, err := s1.updateLocalLegacyACLs(changes, context.Background()); err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
if dur := time.Since(start); dur < time.Second {
|
|
||||||
t.Fatalf("too slow: %9.6f", dur.Seconds())
|
|
||||||
}
|
|
||||||
|
|
||||||
changes = append(changes,
|
|
||||||
&structs.ACLRequest{
|
|
||||||
Op: structs.ACLSet,
|
|
||||||
ACL: structs.ACL{
|
|
||||||
ID: "secret",
|
|
||||||
Type: "client",
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
// Should be throttled to 1 Hz.
|
|
||||||
start = time.Now()
|
|
||||||
if _, err := s1.updateLocalLegacyACLs(changes, context.Background()); err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
if dur := time.Since(start); dur < 2*time.Second {
|
|
||||||
t.Fatalf("too fast: %9.6f", dur.Seconds())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestACLReplication_IsACLReplicationEnabled(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("too slow for testing.Short")
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Parallel()
|
|
||||||
// ACLs not enabled.
|
|
||||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
||||||
c.PrimaryDatacenter = ""
|
|
||||||
c.ACLsEnabled = false
|
|
||||||
})
|
|
||||||
defer os.RemoveAll(dir1)
|
|
||||||
defer s1.Shutdown()
|
|
||||||
if s1.IsACLReplicationEnabled() {
|
|
||||||
t.Fatalf("should not be enabled")
|
|
||||||
}
|
|
||||||
|
|
||||||
// ACLs enabled but not replication.
|
|
||||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
|
||||||
c.Datacenter = "dc2"
|
|
||||||
c.PrimaryDatacenter = "dc1"
|
|
||||||
c.ACLsEnabled = true
|
|
||||||
})
|
|
||||||
defer os.RemoveAll(dir2)
|
|
||||||
defer s2.Shutdown()
|
|
||||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
||||||
testrpc.WaitForLeader(t, s2.RPC, "dc2")
|
|
||||||
|
|
||||||
if s2.IsACLReplicationEnabled() {
|
|
||||||
t.Fatalf("should not be enabled")
|
|
||||||
}
|
|
||||||
|
|
||||||
// ACLs enabled with replication.
|
|
||||||
dir3, s3 := testServerWithConfig(t, func(c *Config) {
|
|
||||||
c.Datacenter = "dc2"
|
|
||||||
c.PrimaryDatacenter = "dc1"
|
|
||||||
c.ACLsEnabled = true
|
|
||||||
c.ACLTokenReplication = true
|
|
||||||
})
|
|
||||||
defer os.RemoveAll(dir3)
|
|
||||||
defer s3.Shutdown()
|
|
||||||
testrpc.WaitForLeader(t, s3.RPC, "dc2")
|
|
||||||
if !s3.IsACLReplicationEnabled() {
|
|
||||||
t.Fatalf("should be enabled")
|
|
||||||
}
|
|
||||||
|
|
||||||
// ACLs enabled with replication, but inside the ACL datacenter
|
|
||||||
// so replication should be disabled.
|
|
||||||
dir4, s4 := testServerWithConfig(t, func(c *Config) {
|
|
||||||
c.Datacenter = "dc1"
|
|
||||||
c.PrimaryDatacenter = "dc1"
|
|
||||||
c.ACLsEnabled = true
|
|
||||||
c.ACLTokenReplication = true
|
|
||||||
})
|
|
||||||
defer os.RemoveAll(dir4)
|
|
||||||
defer s4.Shutdown()
|
|
||||||
testrpc.WaitForLeader(t, s4.RPC, "dc1")
|
|
||||||
if s4.IsACLReplicationEnabled() {
|
|
||||||
t.Fatalf("should not be enabled")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Note that this test is testing that legacy token data is replicated, NOT
|
|
||||||
// directly testing the legacy acl replication goroutine code.
|
|
||||||
//
|
|
||||||
// Actually testing legacy replication is difficult to do without old binaries.
|
|
||||||
func TestACLReplication_LegacyTokens(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("too slow for testing.Short")
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Parallel()
|
|
||||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
|
||||||
c.PrimaryDatacenter = "dc1"
|
|
||||||
c.ACLsEnabled = true
|
|
||||||
c.ACLMasterToken = "root"
|
|
||||||
})
|
|
||||||
defer os.RemoveAll(dir1)
|
|
||||||
defer s1.Shutdown()
|
|
||||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
||||||
client := rpcClient(t, s1)
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
|
||||||
c.Datacenter = "dc2"
|
|
||||||
c.PrimaryDatacenter = "dc1"
|
|
||||||
c.ACLsEnabled = true
|
|
||||||
c.ACLTokenReplication = true
|
|
||||||
c.ACLReplicationRate = 100
|
|
||||||
c.ACLReplicationBurst = 100
|
|
||||||
c.ACLReplicationApplyLimit = 1000000
|
|
||||||
})
|
|
||||||
s2.tokens.UpdateReplicationToken("root", tokenStore.TokenSourceConfig)
|
|
||||||
testrpc.WaitForLeader(t, s2.RPC, "dc2")
|
|
||||||
defer os.RemoveAll(dir2)
|
|
||||||
defer s2.Shutdown()
|
|
||||||
|
|
||||||
// Try to join.
|
|
||||||
joinWAN(t, s2, s1)
|
|
||||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
|
||||||
testrpc.WaitForLeader(t, s1.RPC, "dc2")
|
|
||||||
|
|
||||||
// Wait for legacy acls to be disabled so we are clear that
|
|
||||||
// legacy replication isn't meddling.
|
|
||||||
waitForNewACLs(t, s1)
|
|
||||||
waitForNewACLs(t, s2)
|
|
||||||
waitForNewACLReplication(t, s2, structs.ACLReplicateTokens, 1, 1, 0)
|
|
||||||
|
|
||||||
// Create a bunch of new tokens.
|
|
||||||
var id string
|
|
||||||
for i := 0; i < 50; i++ {
|
|
||||||
arg := structs.ACLRequest{
|
|
||||||
Datacenter: "dc1",
|
|
||||||
Op: structs.ACLSet,
|
|
||||||
ACL: structs.ACL{
|
|
||||||
Name: "User token",
|
|
||||||
Type: structs.ACLTokenTypeClient,
|
|
||||||
Rules: testACLPolicy,
|
|
||||||
},
|
|
||||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
|
||||||
}
|
|
||||||
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
checkSame := func() error {
|
|
||||||
index, remote, err := s1.fsm.State().ACLTokenList(nil, true, true, "", "", "", nil, nil)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
_, local, err := s2.fsm.State().ACLTokenList(nil, true, true, "", "", "", nil, nil)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if got, want := len(remote), len(local); got != want {
|
|
||||||
return fmt.Errorf("got %d remote ACLs want %d", got, want)
|
|
||||||
}
|
|
||||||
for i, token := range remote {
|
|
||||||
if !bytes.Equal(token.Hash, local[i].Hash) {
|
|
||||||
return fmt.Errorf("ACLs differ")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var status structs.ACLReplicationStatus
|
|
||||||
s2.aclReplicationStatusLock.RLock()
|
|
||||||
status = s2.aclReplicationStatus
|
|
||||||
s2.aclReplicationStatusLock.RUnlock()
|
|
||||||
if !status.Enabled || !status.Running ||
|
|
||||||
status.ReplicatedTokenIndex != index ||
|
|
||||||
status.SourceDatacenter != "dc1" {
|
|
||||||
return fmt.Errorf("ACL replication status differs")
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
// Wait for the replica to converge.
|
|
||||||
retry.Run(t, func(r *retry.R) {
|
|
||||||
if err := checkSame(); err != nil {
|
|
||||||
r.Fatal(err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// Create more new tokens.
|
|
||||||
for i := 0; i < 50; i++ {
|
|
||||||
arg := structs.ACLRequest{
|
|
||||||
Datacenter: "dc1",
|
|
||||||
Op: structs.ACLSet,
|
|
||||||
ACL: structs.ACL{
|
|
||||||
Name: "User token",
|
|
||||||
Type: structs.ACLTokenTypeClient,
|
|
||||||
Rules: testACLPolicy,
|
|
||||||
},
|
|
||||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
|
||||||
}
|
|
||||||
var dontCare string
|
|
||||||
if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Wait for the replica to converge.
|
|
||||||
retry.Run(t, func(r *retry.R) {
|
|
||||||
if err := checkSame(); err != nil {
|
|
||||||
r.Fatal(err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// Delete a token.
|
|
||||||
arg := structs.ACLRequest{
|
|
||||||
Datacenter: "dc1",
|
|
||||||
Op: structs.ACLDelete,
|
|
||||||
ACL: structs.ACL{
|
|
||||||
ID: id,
|
|
||||||
},
|
|
||||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
|
||||||
}
|
|
||||||
var dontCare string
|
|
||||||
if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
// Wait for the replica to converge.
|
|
||||||
retry.Run(t, func(r *retry.R) {
|
|
||||||
if err := checkSame(); err != nil {
|
|
||||||
r.Fatal(err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
|
@ -674,15 +674,6 @@ func (s *Server) initializeACLs(ctx context.Context, upgrade bool) error {
|
||||||
// launch the upgrade go routine to generate accessors for everything
|
// launch the upgrade go routine to generate accessors for everything
|
||||||
s.startACLUpgrade(ctx)
|
s.startACLUpgrade(ctx)
|
||||||
} else {
|
} else {
|
||||||
if s.UseLegacyACLs() && !upgrade {
|
|
||||||
if s.IsACLReplicationEnabled() {
|
|
||||||
s.startLegacyACLReplication(ctx)
|
|
||||||
}
|
|
||||||
// return early as we don't want to start new ACL replication
|
|
||||||
// or ACL token reaping as these are new ACL features.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if upgrade {
|
if upgrade {
|
||||||
s.stopACLReplication()
|
s.stopACLReplication()
|
||||||
}
|
}
|
||||||
|
@ -783,67 +774,6 @@ func (s *Server) stopACLUpgrade() {
|
||||||
s.leaderRoutineManager.Stop(aclUpgradeRoutineName)
|
s.leaderRoutineManager.Stop(aclUpgradeRoutineName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// This function is only intended to be run as a managed go routine, it will block until
|
|
||||||
// the context passed in indicates that it should exit.
|
|
||||||
func (s *Server) runLegacyACLReplication(ctx context.Context) error {
|
|
||||||
var lastRemoteIndex uint64
|
|
||||||
legacyACLLogger := s.aclReplicationLogger(logging.Legacy)
|
|
||||||
limiter := rate.NewLimiter(rate.Limit(s.config.ACLReplicationRate), s.config.ACLReplicationBurst)
|
|
||||||
|
|
||||||
for {
|
|
||||||
if err := limiter.Wait(ctx); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if s.tokens.ReplicationToken() == "" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
index, exit, err := s.replicateLegacyACLs(ctx, legacyACLLogger, lastRemoteIndex)
|
|
||||||
if exit {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "status"},
|
|
||||||
0,
|
|
||||||
)
|
|
||||||
lastRemoteIndex = 0
|
|
||||||
s.updateACLReplicationStatusError(err.Error())
|
|
||||||
legacyACLLogger.Warn("Legacy ACL replication error (will retry if still leader)", "error", err)
|
|
||||||
} else {
|
|
||||||
metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "status"},
|
|
||||||
1,
|
|
||||||
)
|
|
||||||
metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "index"},
|
|
||||||
float32(index),
|
|
||||||
)
|
|
||||||
lastRemoteIndex = index
|
|
||||||
s.updateACLReplicationStatusIndex(structs.ACLReplicateLegacy, index)
|
|
||||||
legacyACLLogger.Debug("Legacy ACL replication completed through remote index", "index", index)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) startLegacyACLReplication(ctx context.Context) {
|
|
||||||
if s.InACLDatacenter() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// unlike some other leader routines this initializes some extra state
|
|
||||||
// and therefore we want to prevent re-initialization if things are already
|
|
||||||
// running
|
|
||||||
if s.leaderRoutineManager.IsRunning(legacyACLReplicationRoutineName) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
s.initReplicationStatus()
|
|
||||||
|
|
||||||
s.leaderRoutineManager.Start(ctx, legacyACLReplicationRoutineName, s.runLegacyACLReplication)
|
|
||||||
s.logger.Info("started legacy ACL replication")
|
|
||||||
s.updateACLReplicationStatusRunning(structs.ACLReplicateLegacy)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) startACLReplication(ctx context.Context) {
|
func (s *Server) startACLReplication(ctx context.Context) {
|
||||||
if s.InACLDatacenter() {
|
if s.InACLDatacenter() {
|
||||||
return
|
return
|
||||||
|
@ -966,7 +896,6 @@ func (s *Server) aclReplicationLogger(singularNoun string) hclog.Logger {
|
||||||
|
|
||||||
func (s *Server) stopACLReplication() {
|
func (s *Server) stopACLReplication() {
|
||||||
// these will be no-ops when not started
|
// these will be no-ops when not started
|
||||||
s.leaderRoutineManager.Stop(legacyACLReplicationRoutineName)
|
|
||||||
s.leaderRoutineManager.Stop(aclPolicyReplicationRoutineName)
|
s.leaderRoutineManager.Stop(aclPolicyReplicationRoutineName)
|
||||||
s.leaderRoutineManager.Stop(aclRoleReplicationRoutineName)
|
s.leaderRoutineManager.Stop(aclRoleReplicationRoutineName)
|
||||||
s.leaderRoutineManager.Stop(aclTokenReplicationRoutineName)
|
s.leaderRoutineManager.Stop(aclTokenReplicationRoutineName)
|
||||||
|
|
|
@ -1542,30 +1542,6 @@ func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLeader_ACLLegacyReplication(t *testing.T) {
|
|
||||||
if testing.Short() {
|
|
||||||
t.Skip("too slow for testing.Short")
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Parallel()
|
|
||||||
|
|
||||||
// This test relies on configuring a secondary DC with no route to the primary DC
|
|
||||||
// Having no route will cause the ACL mode checking of the primary to "fail". In this
|
|
||||||
// scenario legacy ACL replication should be enabled without also running new ACL
|
|
||||||
// replication routines.
|
|
||||||
cb := func(c *Config) {
|
|
||||||
c.Datacenter = "dc2"
|
|
||||||
c.ACLTokenReplication = true
|
|
||||||
}
|
|
||||||
_, srv, _ := testACLServerWithConfig(t, cb, true)
|
|
||||||
waitForLeaderEstablishment(t, srv)
|
|
||||||
|
|
||||||
require.True(t, srv.leaderRoutineManager.IsRunning(legacyACLReplicationRoutineName))
|
|
||||||
require.False(t, srv.leaderRoutineManager.IsRunning(aclPolicyReplicationRoutineName))
|
|
||||||
require.False(t, srv.leaderRoutineManager.IsRunning(aclRoleReplicationRoutineName))
|
|
||||||
require.False(t, srv.leaderRoutineManager.IsRunning(aclTokenReplicationRoutineName))
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDatacenterSupportsFederationStates(t *testing.T) {
|
func TestDatacenterSupportsFederationStates(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("too slow for testing.Short")
|
t.Skip("too slow for testing.Short")
|
||||||
|
|
|
@ -95,7 +95,6 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
legacyACLReplicationRoutineName = "legacy ACL replication"
|
|
||||||
aclPolicyReplicationRoutineName = "ACL policy replication"
|
aclPolicyReplicationRoutineName = "ACL policy replication"
|
||||||
aclRoleReplicationRoutineName = "ACL role replication"
|
aclRoleReplicationRoutineName = "ACL role replication"
|
||||||
aclTokenReplicationRoutineName = "ACL token replication"
|
aclTokenReplicationRoutineName = "ACL token replication"
|
||||||
|
|
|
@ -649,7 +649,7 @@ func aclTokenGetTxn(tx ReadTxn, ws memdb.WatchSet, value, index string, entMeta
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ACLTokenList is used to list out all of the ACLs in the state store.
|
// ACLTokenList return a list of ACL Tokens that match the policy, role, and method.
|
||||||
func (s *Store) ACLTokenList(ws memdb.WatchSet, local, global bool, policy, role, methodName string, methodMeta, entMeta *structs.EnterpriseMeta) (uint64, structs.ACLTokens, error) {
|
func (s *Store) ACLTokenList(ws memdb.WatchSet, local, global bool, policy, role, methodName string, methodMeta, entMeta *structs.EnterpriseMeta) (uint64, structs.ACLTokens, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
|
@ -1242,7 +1242,6 @@ func (m *ACLAuthMethod) UnmarshalJSON(data []byte) (err error) {
|
||||||
type ACLReplicationType string
|
type ACLReplicationType string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
ACLReplicateLegacy ACLReplicationType = "legacy"
|
|
||||||
ACLReplicatePolicies ACLReplicationType = "policies"
|
ACLReplicatePolicies ACLReplicationType = "policies"
|
||||||
ACLReplicateRoles ACLReplicationType = "roles"
|
ACLReplicateRoles ACLReplicationType = "roles"
|
||||||
ACLReplicateTokens ACLReplicationType = "tokens"
|
ACLReplicateTokens ACLReplicationType = "tokens"
|
||||||
|
@ -1250,8 +1249,6 @@ const (
|
||||||
|
|
||||||
func (t ACLReplicationType) SingularNoun() string {
|
func (t ACLReplicationType) SingularNoun() string {
|
||||||
switch t {
|
switch t {
|
||||||
case ACLReplicateLegacy:
|
|
||||||
return "legacy"
|
|
||||||
case ACLReplicatePolicies:
|
case ACLReplicatePolicies:
|
||||||
return "policy"
|
return "policy"
|
||||||
case ACLReplicateRoles:
|
case ACLReplicateRoles:
|
||||||
|
|
|
@ -90,6 +90,7 @@ func (a *ACL) Convert() *ACLToken {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert attempts to convert an ACLToken into an ACLCompat.
|
// Convert attempts to convert an ACLToken into an ACLCompat.
|
||||||
|
// TODO(ACL-Legacy-Compat): remove
|
||||||
func (tok *ACLToken) Convert() (*ACL, error) {
|
func (tok *ACLToken) Convert() (*ACL, error) {
|
||||||
if tok.Type == "" {
|
if tok.Type == "" {
|
||||||
return nil, fmt.Errorf("Cannot convert ACLToken into compat token")
|
return nil, fmt.Errorf("Cannot convert ACLToken into compat token")
|
||||||
|
@ -105,21 +106,6 @@ func (tok *ACLToken) Convert() (*ACL, error) {
|
||||||
return compat, nil
|
return compat, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsSame checks if one ACL is the same as another, without looking
|
|
||||||
// at the Raft information (that's why we didn't call it IsEqual). This is
|
|
||||||
// useful for seeing if an update would be idempotent for all the functional
|
|
||||||
// parts of the structure.
|
|
||||||
func (a *ACL) IsSame(other *ACL) bool {
|
|
||||||
if a.ID != other.ID ||
|
|
||||||
a.Name != other.Name ||
|
|
||||||
a.Type != other.Type ||
|
|
||||||
a.Rules != other.Rules {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// ACLRequest is used to create, update or delete an ACL
|
// ACLRequest is used to create, update or delete an ACL
|
||||||
type ACLRequest struct {
|
type ACLRequest struct {
|
||||||
Datacenter string
|
Datacenter string
|
||||||
|
|
|
@ -6,53 +6,6 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestStructs_ACL_IsSame(t *testing.T) {
|
|
||||||
acl := &ACL{
|
|
||||||
ID: "guid",
|
|
||||||
Name: "An ACL for testing",
|
|
||||||
Type: "client",
|
|
||||||
Rules: "service \"\" { policy = \"read\" }",
|
|
||||||
}
|
|
||||||
if !acl.IsSame(acl) {
|
|
||||||
t.Fatalf("should be equal to itself")
|
|
||||||
}
|
|
||||||
|
|
||||||
other := &ACL{
|
|
||||||
ID: "guid",
|
|
||||||
Name: "An ACL for testing",
|
|
||||||
Type: "client",
|
|
||||||
Rules: "service \"\" { policy = \"read\" }",
|
|
||||||
RaftIndex: RaftIndex{
|
|
||||||
CreateIndex: 1,
|
|
||||||
ModifyIndex: 2,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
if !acl.IsSame(other) || !other.IsSame(acl) {
|
|
||||||
t.Fatalf("should not care about Raft fields")
|
|
||||||
}
|
|
||||||
|
|
||||||
check := func(twiddle, restore func()) {
|
|
||||||
if !acl.IsSame(other) || !other.IsSame(acl) {
|
|
||||||
t.Fatalf("should be the same")
|
|
||||||
}
|
|
||||||
|
|
||||||
twiddle()
|
|
||||||
if acl.IsSame(other) || other.IsSame(acl) {
|
|
||||||
t.Fatalf("should not be the same")
|
|
||||||
}
|
|
||||||
|
|
||||||
restore()
|
|
||||||
if !acl.IsSame(other) || !other.IsSame(acl) {
|
|
||||||
t.Fatalf("should be the same")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
check(func() { other.ID = "nope" }, func() { other.ID = "guid" })
|
|
||||||
check(func() { other.Name = "nope" }, func() { other.Name = "An ACL for testing" })
|
|
||||||
check(func() { other.Type = "management" }, func() { other.Type = "client" })
|
|
||||||
check(func() { other.Rules = "" }, func() { other.Rules = "service \"\" { policy = \"read\" }" })
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestStructs_ACL_Convert(t *testing.T) {
|
func TestStructs_ACL_Convert(t *testing.T) {
|
||||||
|
|
||||||
acl := &ACL{
|
acl := &ACL{
|
||||||
|
|
Loading…
Reference in New Issue