mirror of https://github.com/status-im/consul.git
Remaining ACL Unit Tests (#4852)
* Add leader token upgrade test and fix various ACL enablement bugs * Update the leader ACL initialization tests. * Add a StateStore ACL tests for ACLTokenSet and ACLTokenGetBy* functions * Advertise the agents acl support status with the agent/self endpoint. * Make batch token upsert CAS’able to prevent consistency issues with token auto-upgrade * Finish up the ACL state store token tests * Finish the ACL state store unit tests Also rename some things to make them more consistent. * Do as much ACL replication testing as I can.
This commit is contained in:
parent
33ae0149ea
commit
f9cf0eb36e
|
@ -126,7 +126,7 @@ func (s *HTTPServer) ACLRulesTranslateLegacyToken(resp http.ResponseWriter, req
|
|||
return nil, BadRequestError{Reason: "Missing token ID"}
|
||||
}
|
||||
|
||||
args := structs.ACLTokenReadRequest{
|
||||
args := structs.ACLTokenGetRequest{
|
||||
Datacenter: s.agent.config.Datacenter,
|
||||
TokenID: tokenID,
|
||||
TokenIDType: structs.ACLTokenAccessor,
|
||||
|
@ -223,7 +223,7 @@ func (s *HTTPServer) ACLPolicyCRUD(resp http.ResponseWriter, req *http.Request)
|
|||
}
|
||||
|
||||
func (s *HTTPServer) ACLPolicyRead(resp http.ResponseWriter, req *http.Request, policyID string) (interface{}, error) {
|
||||
args := structs.ACLPolicyReadRequest{
|
||||
args := structs.ACLPolicyGetRequest{
|
||||
Datacenter: s.agent.config.Datacenter,
|
||||
PolicyID: policyID,
|
||||
}
|
||||
|
@ -284,7 +284,7 @@ func fixCreateTimeAndHash(raw interface{}) error {
|
|||
}
|
||||
|
||||
func (s *HTTPServer) ACLPolicyWrite(resp http.ResponseWriter, req *http.Request, policyID string) (interface{}, error) {
|
||||
args := structs.ACLPolicyUpsertRequest{
|
||||
args := structs.ACLPolicySetRequest{
|
||||
Datacenter: s.agent.config.Datacenter,
|
||||
}
|
||||
s.parseToken(req, &args.Token)
|
||||
|
@ -302,7 +302,7 @@ func (s *HTTPServer) ACLPolicyWrite(resp http.ResponseWriter, req *http.Request,
|
|||
}
|
||||
|
||||
var out structs.ACLPolicy
|
||||
if err := s.agent.RPC("ACL.PolicyUpsert", args, &out); err != nil {
|
||||
if err := s.agent.RPC("ACL.PolicySet", args, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -361,10 +361,10 @@ func (s *HTTPServer) ACLTokenCRUD(resp http.ResponseWriter, req *http.Request) (
|
|||
|
||||
switch req.Method {
|
||||
case "GET":
|
||||
fn = s.ACLTokenRead
|
||||
fn = s.ACLTokenGet
|
||||
|
||||
case "PUT":
|
||||
fn = s.ACLTokenWrite
|
||||
fn = s.ACLTokenSet
|
||||
|
||||
case "DELETE":
|
||||
fn = s.ACLTokenDelete
|
||||
|
@ -390,7 +390,7 @@ func (s *HTTPServer) ACLTokenSelf(resp http.ResponseWriter, req *http.Request) (
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
args := structs.ACLTokenReadRequest{
|
||||
args := structs.ACLTokenGetRequest{
|
||||
TokenIDType: structs.ACLTokenSecret,
|
||||
}
|
||||
|
||||
|
@ -423,11 +423,11 @@ func (s *HTTPServer) ACLTokenCreate(resp http.ResponseWriter, req *http.Request)
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
return s.ACLTokenWrite(resp, req, "")
|
||||
return s.ACLTokenSet(resp, req, "")
|
||||
}
|
||||
|
||||
func (s *HTTPServer) ACLTokenRead(resp http.ResponseWriter, req *http.Request, tokenID string) (interface{}, error) {
|
||||
args := structs.ACLTokenReadRequest{
|
||||
func (s *HTTPServer) ACLTokenGet(resp http.ResponseWriter, req *http.Request, tokenID string) (interface{}, error) {
|
||||
args := structs.ACLTokenGetRequest{
|
||||
Datacenter: s.agent.config.Datacenter,
|
||||
TokenID: tokenID,
|
||||
TokenIDType: structs.ACLTokenAccessor,
|
||||
|
@ -454,8 +454,8 @@ func (s *HTTPServer) ACLTokenRead(resp http.ResponseWriter, req *http.Request, t
|
|||
return out.Token, nil
|
||||
}
|
||||
|
||||
func (s *HTTPServer) ACLTokenWrite(resp http.ResponseWriter, req *http.Request, tokenID string) (interface{}, error) {
|
||||
args := structs.ACLTokenUpsertRequest{
|
||||
func (s *HTTPServer) ACLTokenSet(resp http.ResponseWriter, req *http.Request, tokenID string) (interface{}, error) {
|
||||
args := structs.ACLTokenSetRequest{
|
||||
Datacenter: s.agent.config.Datacenter,
|
||||
}
|
||||
s.parseToken(req, &args.Token)
|
||||
|
@ -471,7 +471,7 @@ func (s *HTTPServer) ACLTokenWrite(resp http.ResponseWriter, req *http.Request,
|
|||
}
|
||||
|
||||
var out structs.ACLToken
|
||||
if err := s.agent.RPC("ACL.TokenUpsert", args, &out); err != nil {
|
||||
if err := s.agent.RPC("ACL.TokenSet", args, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -497,7 +497,7 @@ func (s *HTTPServer) ACLTokenClone(resp http.ResponseWriter, req *http.Request,
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
args := structs.ACLTokenUpsertRequest{
|
||||
args := structs.ACLTokenSetRequest{
|
||||
Datacenter: s.agent.config.Datacenter,
|
||||
}
|
||||
|
||||
|
|
|
@ -378,7 +378,7 @@ func (r *ACLResolver) fireAsyncTokenResult(token string, identity structs.ACLIde
|
|||
}
|
||||
|
||||
func (r *ACLResolver) resolveIdentityFromTokenAsync(token string, cached *structs.IdentityCacheEntry) {
|
||||
req := structs.ACLTokenReadRequest{
|
||||
req := structs.ACLTokenGetRequest{
|
||||
Datacenter: r.delegate.ACLDatacenter(false),
|
||||
TokenID: token,
|
||||
TokenIDType: structs.ACLTokenSecret,
|
||||
|
@ -491,7 +491,7 @@ func (r *ACLResolver) fireAsyncPolicyResult(policyID string, policy *structs.ACL
|
|||
}
|
||||
|
||||
func (r *ACLResolver) resolvePoliciesAsyncForIdentity(identity structs.ACLIdentity, policyIDs []string, cached map[string]*structs.PolicyCacheEntry) {
|
||||
req := structs.ACLPolicyBatchReadRequest{
|
||||
req := structs.ACLPolicyBatchGetRequest{
|
||||
Datacenter: r.delegate.ACLDatacenter(false),
|
||||
PolicyIDs: policyIDs,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
|
@ -501,7 +501,7 @@ func (r *ACLResolver) resolvePoliciesAsyncForIdentity(identity structs.ACLIdenti
|
|||
}
|
||||
|
||||
found := make(map[string]struct{})
|
||||
var resp structs.ACLPoliciesResponse
|
||||
var resp structs.ACLPolicyBatchResponse
|
||||
err := r.delegate.RPC("ACL.PolicyResolve", &req, &resp)
|
||||
if err == nil {
|
||||
for _, policy := range resp.Policies {
|
||||
|
|
|
@ -175,7 +175,7 @@ func (a *ACL) BootstrapTokens(args *structs.DCSpecificRequest, reply *structs.AC
|
|||
return nil
|
||||
}
|
||||
|
||||
func (a *ACL) TokenRead(args *structs.ACLTokenReadRequest, reply *structs.ACLTokenResponse) error {
|
||||
func (a *ACL) TokenRead(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
|
||||
if err := a.aclPreCheck(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -227,7 +227,7 @@ func (a *ACL) TokenRead(args *structs.ACLTokenReadRequest, reply *structs.ACLTok
|
|||
})
|
||||
}
|
||||
|
||||
func (a *ACL) TokenClone(args *structs.ACLTokenUpsertRequest, reply *structs.ACLToken) error {
|
||||
func (a *ACL) TokenClone(args *structs.ACLTokenSetRequest, reply *structs.ACLToken) error {
|
||||
if err := a.aclPreCheck(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -265,7 +265,7 @@ func (a *ACL) TokenClone(args *structs.ACLTokenUpsertRequest, reply *structs.ACL
|
|||
return fmt.Errorf("Cannot clone a legacy ACL with this endpoint")
|
||||
}
|
||||
|
||||
cloneReq := structs.ACLTokenUpsertRequest{
|
||||
cloneReq := structs.ACLTokenSetRequest{
|
||||
Datacenter: args.Datacenter,
|
||||
ACLToken: structs.ACLToken{
|
||||
Policies: token.Policies,
|
||||
|
@ -279,10 +279,10 @@ func (a *ACL) TokenClone(args *structs.ACLTokenUpsertRequest, reply *structs.ACL
|
|||
cloneReq.ACLToken.Description = args.ACLToken.Description
|
||||
}
|
||||
|
||||
return a.tokenUpsertInternal(&cloneReq, reply, false)
|
||||
return a.tokenSetInternal(&cloneReq, reply, false)
|
||||
}
|
||||
|
||||
func (a *ACL) TokenUpsert(args *structs.ACLTokenUpsertRequest, reply *structs.ACLToken) error {
|
||||
func (a *ACL) TokenSet(args *structs.ACLTokenSetRequest, reply *structs.ACLToken) error {
|
||||
if err := a.aclPreCheck(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -294,7 +294,7 @@ func (a *ACL) TokenUpsert(args *structs.ACLTokenUpsertRequest, reply *structs.AC
|
|||
return fmt.Errorf("Local tokens are disabled")
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.TokenUpsert", args, args, reply); done {
|
||||
if done, err := a.srv.forward("ACL.TokenSet", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -307,10 +307,10 @@ func (a *ACL) TokenUpsert(args *structs.ACLTokenUpsertRequest, reply *structs.AC
|
|||
return acl.ErrPermissionDenied
|
||||
}
|
||||
|
||||
return a.tokenUpsertInternal(args, reply, false)
|
||||
return a.tokenSetInternal(args, reply, false)
|
||||
}
|
||||
|
||||
func (a *ACL) tokenUpsertInternal(args *structs.ACLTokenUpsertRequest, reply *structs.ACLToken, upgrade bool) error {
|
||||
func (a *ACL) tokenSetInternal(args *structs.ACLTokenSetRequest, reply *structs.ACLToken, upgrade bool) error {
|
||||
token := &args.ACLToken
|
||||
|
||||
if !a.srv.LocalTokensEnabled() {
|
||||
|
@ -420,12 +420,12 @@ func (a *ACL) tokenUpsertInternal(args *structs.ACLTokenUpsertRequest, reply *st
|
|||
|
||||
token.SetHash(true)
|
||||
|
||||
req := &structs.ACLTokenBatchUpsertRequest{
|
||||
Tokens: structs.ACLTokens{token},
|
||||
AllowCreate: true,
|
||||
req := &structs.ACLTokenBatchSetRequest{
|
||||
Tokens: structs.ACLTokens{token},
|
||||
CAS: false,
|
||||
}
|
||||
|
||||
resp, err := a.srv.raftApply(structs.ACLTokenUpsertRequestType, req)
|
||||
resp, err := a.srv.raftApply(structs.ACLTokenSetRequestType, req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to apply token write request: %v", err)
|
||||
}
|
||||
|
@ -553,7 +553,7 @@ func (a *ACL) TokenList(args *structs.ACLTokenListRequest, reply *structs.ACLTok
|
|||
})
|
||||
}
|
||||
|
||||
func (a *ACL) TokenBatchRead(args *structs.ACLTokenBatchReadRequest, reply *structs.ACLTokensResponse) error {
|
||||
func (a *ACL) TokenBatchRead(args *structs.ACLTokenBatchGetRequest, reply *structs.ACLTokenBatchResponse) error {
|
||||
if err := a.aclPreCheck(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -575,7 +575,7 @@ func (a *ACL) TokenBatchRead(args *structs.ACLTokenBatchReadRequest, reply *stru
|
|||
|
||||
return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta,
|
||||
func(ws memdb.WatchSet, state *state.Store) error {
|
||||
index, tokens, err := state.ACLTokenBatchRead(ws, args.AccessorIDs)
|
||||
index, tokens, err := state.ACLTokenBatchGet(ws, args.AccessorIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -587,7 +587,7 @@ func (a *ACL) TokenBatchRead(args *structs.ACLTokenBatchReadRequest, reply *stru
|
|||
})
|
||||
}
|
||||
|
||||
func (a *ACL) PolicyRead(args *structs.ACLPolicyReadRequest, reply *structs.ACLPolicyResponse) error {
|
||||
func (a *ACL) PolicyRead(args *structs.ACLPolicyGetRequest, reply *structs.ACLPolicyResponse) error {
|
||||
if err := a.aclPreCheck(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -615,7 +615,7 @@ func (a *ACL) PolicyRead(args *structs.ACLPolicyReadRequest, reply *structs.ACLP
|
|||
})
|
||||
}
|
||||
|
||||
func (a *ACL) PolicyBatchRead(args *structs.ACLPolicyBatchReadRequest, reply *structs.ACLPoliciesResponse) error {
|
||||
func (a *ACL) PolicyBatchRead(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
|
||||
if err := a.aclPreCheck(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -632,7 +632,7 @@ func (a *ACL) PolicyBatchRead(args *structs.ACLPolicyBatchReadRequest, reply *st
|
|||
|
||||
return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta,
|
||||
func(ws memdb.WatchSet, state *state.Store) error {
|
||||
index, policies, err := state.ACLPolicyBatchRead(ws, args.PolicyIDs)
|
||||
index, policies, err := state.ACLPolicyBatchGet(ws, args.PolicyIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -642,7 +642,7 @@ func (a *ACL) PolicyBatchRead(args *structs.ACLPolicyBatchReadRequest, reply *st
|
|||
})
|
||||
}
|
||||
|
||||
func (a *ACL) PolicyUpsert(args *structs.ACLPolicyUpsertRequest, reply *structs.ACLPolicy) error {
|
||||
func (a *ACL) PolicySet(args *structs.ACLPolicySetRequest, reply *structs.ACLPolicy) error {
|
||||
if err := a.aclPreCheck(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -651,7 +651,7 @@ func (a *ACL) PolicyUpsert(args *structs.ACLPolicyUpsertRequest, reply *structs.
|
|||
args.Datacenter = a.srv.config.ACLDatacenter
|
||||
}
|
||||
|
||||
if done, err := a.srv.forward("ACL.PolicyUpsert", args, args, reply); done {
|
||||
if done, err := a.srv.forward("ACL.PolicySet", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -736,11 +736,11 @@ func (a *ACL) PolicyUpsert(args *structs.ACLPolicyUpsertRequest, reply *structs.
|
|||
// calcualte the hash for this policy
|
||||
policy.SetHash(true)
|
||||
|
||||
req := &structs.ACLPolicyBatchUpsertRequest{
|
||||
req := &structs.ACLPolicyBatchSetRequest{
|
||||
Policies: structs.ACLPolicies{policy},
|
||||
}
|
||||
|
||||
resp, err := a.srv.raftApply(structs.ACLPolicyUpsertRequestType, req)
|
||||
resp, err := a.srv.raftApply(structs.ACLPolicySetRequestType, req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to apply policy upsert request: %v", err)
|
||||
}
|
||||
|
@ -837,7 +837,7 @@ func (a *ACL) PolicyList(args *structs.ACLPolicyListRequest, reply *structs.ACLP
|
|||
|
||||
return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta,
|
||||
func(ws memdb.WatchSet, state *state.Store) error {
|
||||
index, policies, err := state.ACLPolicyList(ws, args.DCScope)
|
||||
index, policies, err := state.ACLPolicyList(ws)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -854,7 +854,7 @@ func (a *ACL) PolicyList(args *structs.ACLPolicyListRequest, reply *structs.ACLP
|
|||
|
||||
// PolicyResolve is used to retrieve a subset of the policies associated with a given token
|
||||
// The policy ids in the args simply act as a filter on the policy set assigned to the token
|
||||
func (a *ACL) PolicyResolve(args *structs.ACLPolicyBatchReadRequest, reply *structs.ACLPoliciesResponse) error {
|
||||
func (a *ACL) PolicyResolve(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
|
||||
if err := a.aclPreCheck(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -145,7 +145,7 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
|
|||
defer metrics.MeasureSince([]string{"acl", "apply"}, time.Now())
|
||||
|
||||
// Verify we are allowed to serve this request
|
||||
if a.srv.config.ACLDatacenter != a.srv.config.Datacenter {
|
||||
if !a.srv.ACLsEnabled() {
|
||||
return acl.ErrDisabled
|
||||
}
|
||||
|
||||
|
@ -189,7 +189,7 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest,
|
|||
}
|
||||
|
||||
// Verify we are allowed to serve this request
|
||||
if a.srv.config.ACLDatacenter != a.srv.config.Datacenter {
|
||||
if !a.srv.ACLsEnabled() {
|
||||
return acl.ErrDisabled
|
||||
}
|
||||
|
||||
|
@ -226,7 +226,7 @@ func (a *ACL) List(args *structs.DCSpecificRequest,
|
|||
}
|
||||
|
||||
// Verify we are allowed to serve this request
|
||||
if a.srv.config.ACLDatacenter != a.srv.config.Datacenter {
|
||||
if !a.srv.ACLsEnabled() {
|
||||
return acl.ErrDisabled
|
||||
}
|
||||
|
||||
|
|
|
@ -648,7 +648,7 @@ func TestACLEndpoint_TokenRead(t *testing.T) {
|
|||
|
||||
// exists and matches what we created
|
||||
{
|
||||
req := structs.ACLTokenReadRequest{
|
||||
req := structs.ACLTokenGetRequest{
|
||||
Datacenter: "dc1",
|
||||
TokenID: token.AccessorID,
|
||||
TokenIDType: structs.ACLTokenAccessor,
|
||||
|
@ -670,7 +670,7 @@ func TestACLEndpoint_TokenRead(t *testing.T) {
|
|||
fakeID, err := uuid.GenerateUUID()
|
||||
assert.NoError(err)
|
||||
|
||||
req := structs.ACLTokenReadRequest{
|
||||
req := structs.ACLTokenGetRequest{
|
||||
Datacenter: "dc1",
|
||||
TokenID: fakeID,
|
||||
TokenIDType: structs.ACLTokenAccessor,
|
||||
|
@ -686,7 +686,7 @@ func TestACLEndpoint_TokenRead(t *testing.T) {
|
|||
|
||||
// validates ID format
|
||||
{
|
||||
req := structs.ACLTokenReadRequest{
|
||||
req := structs.ACLTokenGetRequest{
|
||||
Datacenter: "dc1",
|
||||
TokenID: "definitely-really-certainly-not-a-uuid",
|
||||
TokenIDType: structs.ACLTokenAccessor,
|
||||
|
@ -722,7 +722,7 @@ func TestACLEndpoint_TokenClone(t *testing.T) {
|
|||
|
||||
acl := ACL{srv: s1}
|
||||
|
||||
req := structs.ACLTokenUpsertRequest{
|
||||
req := structs.ACLTokenSetRequest{
|
||||
Datacenter: "dc1",
|
||||
ACLToken: structs.ACLToken{AccessorID: t1.AccessorID},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
|
@ -741,7 +741,7 @@ func TestACLEndpoint_TokenClone(t *testing.T) {
|
|||
assert.NotEqual(t1.SecretID, t2.SecretID)
|
||||
}
|
||||
|
||||
func TestACLEndpoint_TokenUpsert(t *testing.T) {
|
||||
func TestACLEndpoint_TokenSet(t *testing.T) {
|
||||
t.Parallel()
|
||||
assert := assert.New(t)
|
||||
|
||||
|
@ -762,7 +762,7 @@ func TestACLEndpoint_TokenUpsert(t *testing.T) {
|
|||
|
||||
// Create it
|
||||
{
|
||||
req := structs.ACLTokenUpsertRequest{
|
||||
req := structs.ACLTokenSetRequest{
|
||||
Datacenter: "dc1",
|
||||
ACLToken: structs.ACLToken{
|
||||
Description: "foobar",
|
||||
|
@ -774,7 +774,7 @@ func TestACLEndpoint_TokenUpsert(t *testing.T) {
|
|||
|
||||
resp := structs.ACLToken{}
|
||||
|
||||
err := acl.TokenUpsert(&req, &resp)
|
||||
err := acl.TokenSet(&req, &resp)
|
||||
assert.NoError(err)
|
||||
|
||||
// Get the token directly to validate that it exists
|
||||
|
@ -790,7 +790,7 @@ func TestACLEndpoint_TokenUpsert(t *testing.T) {
|
|||
}
|
||||
// Update it
|
||||
{
|
||||
req := structs.ACLTokenUpsertRequest{
|
||||
req := structs.ACLTokenSetRequest{
|
||||
Datacenter: "dc1",
|
||||
ACLToken: structs.ACLToken{
|
||||
Description: "new-description",
|
||||
|
@ -801,7 +801,7 @@ func TestACLEndpoint_TokenUpsert(t *testing.T) {
|
|||
|
||||
resp := structs.ACLToken{}
|
||||
|
||||
err := acl.TokenUpsert(&req, &resp)
|
||||
err := acl.TokenSet(&req, &resp)
|
||||
assert.NoError(err)
|
||||
|
||||
// Get the token directly to validate that it exists
|
||||
|
@ -814,7 +814,7 @@ func TestACLEndpoint_TokenUpsert(t *testing.T) {
|
|||
assert.Equal(token.AccessorID, resp.AccessorID)
|
||||
}
|
||||
}
|
||||
func TestACLEndpoint_TokenUpsert_anon(t *testing.T) {
|
||||
func TestACLEndpoint_TokenSet_anon(t *testing.T) {
|
||||
t.Parallel()
|
||||
assert := assert.New(t)
|
||||
|
||||
|
@ -835,7 +835,7 @@ func TestACLEndpoint_TokenUpsert_anon(t *testing.T) {
|
|||
acl := ACL{srv: s1}
|
||||
|
||||
// Assign the policies to a token
|
||||
tokenUpsertReq := structs.ACLTokenUpsertRequest{
|
||||
tokenUpsertReq := structs.ACLTokenSetRequest{
|
||||
Datacenter: "dc1",
|
||||
ACLToken: structs.ACLToken{
|
||||
AccessorID: structs.ACLTokenAnonymousID,
|
||||
|
@ -848,7 +848,7 @@ func TestACLEndpoint_TokenUpsert_anon(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
token := structs.ACLToken{}
|
||||
err = acl.TokenUpsert(&tokenUpsertReq, &token)
|
||||
err = acl.TokenSet(&tokenUpsertReq, &token)
|
||||
assert.NoError(err)
|
||||
assert.NotEmpty(token.SecretID)
|
||||
|
||||
|
@ -1021,13 +1021,13 @@ func TestACLEndpoint_TokenBatchRead(t *testing.T) {
|
|||
acl := ACL{srv: s1}
|
||||
tokens := []string{t1.AccessorID, t2.AccessorID}
|
||||
|
||||
req := structs.ACLTokenBatchReadRequest{
|
||||
req := structs.ACLTokenBatchGetRequest{
|
||||
Datacenter: "dc1",
|
||||
AccessorIDs: tokens,
|
||||
QueryOptions: structs.QueryOptions{Token: "root"},
|
||||
}
|
||||
|
||||
resp := structs.ACLTokensResponse{}
|
||||
resp := structs.ACLTokenBatchResponse{}
|
||||
|
||||
err = acl.TokenBatchRead(&req, &resp)
|
||||
assert.NoError(err)
|
||||
|
@ -1061,7 +1061,7 @@ func TestACLEndpoint_PolicyRead(t *testing.T) {
|
|||
|
||||
acl := ACL{srv: s1}
|
||||
|
||||
req := structs.ACLPolicyReadRequest{
|
||||
req := structs.ACLPolicyGetRequest{
|
||||
Datacenter: "dc1",
|
||||
PolicyID: policy.ID,
|
||||
QueryOptions: structs.QueryOptions{Token: "root"},
|
||||
|
@ -1104,13 +1104,13 @@ func TestACLEndpoint_PolicyBatchRead(t *testing.T) {
|
|||
acl := ACL{srv: s1}
|
||||
tokens := []string{t1.AccessorID, t2.AccessorID}
|
||||
|
||||
req := structs.ACLTokenBatchReadRequest{
|
||||
req := structs.ACLTokenBatchGetRequest{
|
||||
Datacenter: "dc1",
|
||||
AccessorIDs: tokens,
|
||||
QueryOptions: structs.QueryOptions{Token: "root"},
|
||||
}
|
||||
|
||||
resp := structs.ACLTokensResponse{}
|
||||
resp := structs.ACLTokenBatchResponse{}
|
||||
|
||||
err = acl.TokenBatchRead(&req, &resp)
|
||||
assert.NoError(err)
|
||||
|
@ -1123,7 +1123,7 @@ func TestACLEndpoint_PolicyBatchRead(t *testing.T) {
|
|||
assert.EqualValues(retrievedTokens, tokens)
|
||||
}
|
||||
|
||||
func TestACLEndpoint_PolicyUpsert(t *testing.T) {
|
||||
func TestACLEndpoint_PolicySet(t *testing.T) {
|
||||
t.Parallel()
|
||||
assert := assert.New(t)
|
||||
|
||||
|
@ -1144,7 +1144,7 @@ func TestACLEndpoint_PolicyUpsert(t *testing.T) {
|
|||
|
||||
// Create it
|
||||
{
|
||||
req := structs.ACLPolicyUpsertRequest{
|
||||
req := structs.ACLPolicySetRequest{
|
||||
Datacenter: "dc1",
|
||||
Policy: structs.ACLPolicy{
|
||||
Description: "foobar",
|
||||
|
@ -1155,7 +1155,7 @@ func TestACLEndpoint_PolicyUpsert(t *testing.T) {
|
|||
}
|
||||
resp := structs.ACLPolicy{}
|
||||
|
||||
err := acl.PolicyUpsert(&req, &resp)
|
||||
err := acl.PolicySet(&req, &resp)
|
||||
assert.NoError(err)
|
||||
assert.NotNil(resp.ID)
|
||||
|
||||
|
@ -1174,7 +1174,7 @@ func TestACLEndpoint_PolicyUpsert(t *testing.T) {
|
|||
|
||||
// Update it
|
||||
{
|
||||
req := structs.ACLPolicyUpsertRequest{
|
||||
req := structs.ACLPolicySetRequest{
|
||||
Datacenter: "dc1",
|
||||
Policy: structs.ACLPolicy{
|
||||
ID: policyID,
|
||||
|
@ -1186,7 +1186,7 @@ func TestACLEndpoint_PolicyUpsert(t *testing.T) {
|
|||
}
|
||||
resp := structs.ACLPolicy{}
|
||||
|
||||
err := acl.PolicyUpsert(&req, &resp)
|
||||
err := acl.PolicySet(&req, &resp)
|
||||
assert.NoError(err)
|
||||
assert.NotNil(resp.ID)
|
||||
|
||||
|
@ -1202,7 +1202,7 @@ func TestACLEndpoint_PolicyUpsert(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestACLEndpoint_PolicyUpsert_globalManagement(t *testing.T) {
|
||||
func TestACLEndpoint_PolicySet_globalManagement(t *testing.T) {
|
||||
t.Parallel()
|
||||
assert := assert.New(t)
|
||||
|
||||
|
@ -1223,7 +1223,7 @@ func TestACLEndpoint_PolicyUpsert_globalManagement(t *testing.T) {
|
|||
// Can't change the rules
|
||||
{
|
||||
|
||||
req := structs.ACLPolicyUpsertRequest{
|
||||
req := structs.ACLPolicySetRequest{
|
||||
Datacenter: "dc1",
|
||||
Policy: structs.ACLPolicy{
|
||||
ID: structs.ACLPolicyGlobalManagementID,
|
||||
|
@ -1234,13 +1234,13 @@ func TestACLEndpoint_PolicyUpsert_globalManagement(t *testing.T) {
|
|||
}
|
||||
resp := structs.ACLPolicy{}
|
||||
|
||||
err := acl.PolicyUpsert(&req, &resp)
|
||||
err := acl.PolicySet(&req, &resp)
|
||||
assert.EqualError(err, "Changing the Rules for the builtin global-management policy is not permitted")
|
||||
}
|
||||
|
||||
// Can rename it
|
||||
{
|
||||
req := structs.ACLPolicyUpsertRequest{
|
||||
req := structs.ACLPolicySetRequest{
|
||||
Datacenter: "dc1",
|
||||
Policy: structs.ACLPolicy{
|
||||
ID: structs.ACLPolicyGlobalManagementID,
|
||||
|
@ -1251,7 +1251,7 @@ func TestACLEndpoint_PolicyUpsert_globalManagement(t *testing.T) {
|
|||
}
|
||||
resp := structs.ACLPolicy{}
|
||||
|
||||
err := acl.PolicyUpsert(&req, &resp)
|
||||
err := acl.PolicySet(&req, &resp)
|
||||
assert.NoError(err)
|
||||
|
||||
// Get the policy again
|
||||
|
@ -1404,7 +1404,7 @@ func TestACLEndpoint_PolicyResolve(t *testing.T) {
|
|||
policies := []string{p1.ID, p2.ID}
|
||||
|
||||
// Assign the policies to a token
|
||||
tokenUpsertReq := structs.ACLTokenUpsertRequest{
|
||||
tokenUpsertReq := structs.ACLTokenSetRequest{
|
||||
Datacenter: "dc1",
|
||||
ACLToken: structs.ACLToken{
|
||||
Policies: []structs.ACLTokenPolicyLink{
|
||||
|
@ -1419,12 +1419,12 @@ func TestACLEndpoint_PolicyResolve(t *testing.T) {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
token := structs.ACLToken{}
|
||||
err = acl.TokenUpsert(&tokenUpsertReq, &token)
|
||||
err = acl.TokenSet(&tokenUpsertReq, &token)
|
||||
assert.NoError(err)
|
||||
assert.NotEmpty(token.SecretID)
|
||||
|
||||
resp := structs.ACLPoliciesResponse{}
|
||||
req := structs.ACLPolicyBatchReadRequest{
|
||||
resp := structs.ACLPolicyBatchResponse{}
|
||||
req := structs.ACLPolicyBatchGetRequest{
|
||||
Datacenter: "dc1",
|
||||
PolicyIDs: []string{p1.ID, p2.ID},
|
||||
QueryOptions: structs.QueryOptions{Token: token.SecretID},
|
||||
|
@ -1442,7 +1442,7 @@ func TestACLEndpoint_PolicyResolve(t *testing.T) {
|
|||
|
||||
// upsertTestToken creates a token for testing purposes
|
||||
func upsertTestToken(codec rpc.ClientCodec, masterToken string, datacenter string) (*structs.ACLToken, error) {
|
||||
arg := structs.ACLTokenUpsertRequest{
|
||||
arg := structs.ACLTokenSetRequest{
|
||||
Datacenter: datacenter,
|
||||
ACLToken: structs.ACLToken{
|
||||
Description: "User token",
|
||||
|
@ -1454,7 +1454,7 @@ func upsertTestToken(codec rpc.ClientCodec, masterToken string, datacenter strin
|
|||
|
||||
var out structs.ACLToken
|
||||
|
||||
err := msgpackrpc.CallWithCodec(codec, "ACL.TokenUpsert", &arg, &out)
|
||||
err := msgpackrpc.CallWithCodec(codec, "ACL.TokenSet", &arg, &out)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -1469,7 +1469,7 @@ func upsertTestToken(codec rpc.ClientCodec, masterToken string, datacenter strin
|
|||
|
||||
// retrieveTestToken returns a policy for testing purposes
|
||||
func retrieveTestToken(codec rpc.ClientCodec, masterToken string, datacenter string, id string) (*structs.ACLTokenResponse, error) {
|
||||
arg := structs.ACLTokenReadRequest{
|
||||
arg := structs.ACLTokenGetRequest{
|
||||
Datacenter: datacenter,
|
||||
TokenID: id,
|
||||
TokenIDType: structs.ACLTokenAccessor,
|
||||
|
@ -1495,7 +1495,7 @@ func upsertTestPolicy(codec rpc.ClientCodec, masterToken string, datacenter stri
|
|||
return nil, err
|
||||
}
|
||||
|
||||
arg := structs.ACLPolicyUpsertRequest{
|
||||
arg := structs.ACLPolicySetRequest{
|
||||
Datacenter: datacenter,
|
||||
Policy: structs.ACLPolicy{
|
||||
Name: fmt.Sprintf("test-policy-%s", policyUnq),
|
||||
|
@ -1505,7 +1505,7 @@ func upsertTestPolicy(codec rpc.ClientCodec, masterToken string, datacenter stri
|
|||
|
||||
var out structs.ACLPolicy
|
||||
|
||||
err = msgpackrpc.CallWithCodec(codec, "ACL.PolicyUpsert", &arg, &out)
|
||||
err = msgpackrpc.CallWithCodec(codec, "ACL.PolicySet", &arg, &out)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -1520,7 +1520,7 @@ func upsertTestPolicy(codec rpc.ClientCodec, masterToken string, datacenter stri
|
|||
|
||||
// retrieveTestPolicy returns a policy for testing purposes
|
||||
func retrieveTestPolicy(codec rpc.ClientCodec, masterToken string, datacenter string, id string) (*structs.ACLPolicyResponse, error) {
|
||||
arg := structs.ACLPolicyReadRequest{
|
||||
arg := structs.ACLPolicyGetRequest{
|
||||
Datacenter: datacenter,
|
||||
PolicyID: id,
|
||||
QueryOptions: structs.QueryOptions{Token: masterToken},
|
||||
|
|
|
@ -105,11 +105,11 @@ func (s *Server) updateLocalACLPolicies(policies structs.ACLPolicies, ctx contex
|
|||
batchSize += policies[batchEnd].EstimateSize()
|
||||
}
|
||||
|
||||
req := structs.ACLPolicyBatchUpsertRequest{
|
||||
req := structs.ACLPolicyBatchSetRequest{
|
||||
Policies: policies[batchStart:batchEnd],
|
||||
}
|
||||
|
||||
resp, err := s.raftApply(structs.ACLPolicyUpsertRequestType, &req)
|
||||
resp, err := s.raftApply(structs.ACLPolicySetRequestType, &req)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("Failed to apply policy upserts: %v", err)
|
||||
}
|
||||
|
@ -134,8 +134,8 @@ func (s *Server) updateLocalACLPolicies(policies structs.ACLPolicies, ctx contex
|
|||
return false, nil
|
||||
}
|
||||
|
||||
func (s *Server) fetchACLPoliciesBatch(policyIDs []string) (*structs.ACLPoliciesResponse, error) {
|
||||
req := structs.ACLPolicyBatchReadRequest{
|
||||
func (s *Server) fetchACLPoliciesBatch(policyIDs []string) (*structs.ACLPolicyBatchResponse, error) {
|
||||
req := structs.ACLPolicyBatchGetRequest{
|
||||
Datacenter: s.config.ACLDatacenter,
|
||||
PolicyIDs: policyIDs,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
|
@ -144,7 +144,7 @@ func (s *Server) fetchACLPoliciesBatch(policyIDs []string) (*structs.ACLPolicies
|
|||
},
|
||||
}
|
||||
|
||||
var response structs.ACLPoliciesResponse
|
||||
var response structs.ACLPolicyBatchResponse
|
||||
if err := s.RPC("ACL.PolicyBatchRead", &req, &response); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -261,12 +261,12 @@ func (s *Server) updateLocalACLTokens(tokens structs.ACLTokens, ctx context.Cont
|
|||
batchSize += tokens[batchEnd].EstimateSize()
|
||||
}
|
||||
|
||||
req := structs.ACLTokenBatchUpsertRequest{
|
||||
Tokens: tokens[batchStart:batchEnd],
|
||||
AllowCreate: true,
|
||||
req := structs.ACLTokenBatchSetRequest{
|
||||
Tokens: tokens[batchStart:batchEnd],
|
||||
CAS: false,
|
||||
}
|
||||
|
||||
resp, err := s.raftApply(structs.ACLTokenUpsertRequestType, &req)
|
||||
resp, err := s.raftApply(structs.ACLTokenSetRequestType, &req)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("Failed to apply token upserts: %v", err)
|
||||
}
|
||||
|
@ -292,8 +292,8 @@ func (s *Server) updateLocalACLTokens(tokens structs.ACLTokens, ctx context.Cont
|
|||
return false, nil
|
||||
}
|
||||
|
||||
func (s *Server) fetchACLTokensBatch(tokenIDs []string) (*structs.ACLTokensResponse, error) {
|
||||
req := structs.ACLTokenBatchReadRequest{
|
||||
func (s *Server) fetchACLTokensBatch(tokenIDs []string) (*structs.ACLTokenBatchResponse, error) {
|
||||
req := structs.ACLTokenBatchGetRequest{
|
||||
Datacenter: s.config.ACLDatacenter,
|
||||
AccessorIDs: tokenIDs,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
|
@ -302,7 +302,7 @@ func (s *Server) fetchACLTokensBatch(tokenIDs []string) (*structs.ACLTokensRespo
|
|||
},
|
||||
}
|
||||
|
||||
var response structs.ACLTokensResponse
|
||||
var response structs.ACLTokenBatchResponse
|
||||
if err := s.RPC("ACL.TokenBatchRead", &req, &response); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -354,7 +354,7 @@ func (s *Server) replicateACLPolicies(lastRemoteIndex uint64, ctx context.Contex
|
|||
// replication process is.
|
||||
defer metrics.MeasureSince([]string{"leader", "replication", "acl", "policy", "apply"}, time.Now())
|
||||
|
||||
_, local, err := s.fsm.State().ACLPolicyList(nil, "")
|
||||
_, local, err := s.fsm.State().ACLPolicyList(nil)
|
||||
if err != nil {
|
||||
return 0, false, fmt.Errorf("failed to retrieve local ACL policies: %v", err)
|
||||
}
|
||||
|
@ -374,7 +374,7 @@ func (s *Server) replicateACLPolicies(lastRemoteIndex uint64, ctx context.Contex
|
|||
|
||||
s.logger.Printf("[DEBUG] acl: policy replication - deletions: %d, updates: %d", len(deletions), len(updates))
|
||||
|
||||
var policies *structs.ACLPoliciesResponse
|
||||
var policies *structs.ACLPolicyBatchResponse
|
||||
if len(updates) > 0 {
|
||||
policies, err = s.fetchACLPoliciesBatch(updates)
|
||||
if err != nil {
|
||||
|
@ -456,7 +456,7 @@ func (s *Server) replicateACLTokens(lastRemoteIndex uint64, ctx context.Context)
|
|||
deletions, updates := diffACLTokens(local, remote.Tokens, lastRemoteIndex)
|
||||
s.logger.Printf("[DEBUG] acl: token replication - deletions: %d, updates: %d", len(deletions), len(updates))
|
||||
|
||||
var tokens *structs.ACLTokensResponse
|
||||
var tokens *structs.ACLTokenBatchResponse
|
||||
if len(updates) > 0 {
|
||||
tokens, err = s.fetchACLTokensBatch(updates)
|
||||
if err != nil {
|
||||
|
|
|
@ -0,0 +1,467 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
"github.com/hashicorp/consul/testutil/retry"
|
||||
)
|
||||
|
||||
func TestACLReplication_Sorter(t *testing.T) {
|
||||
t.Parallel()
|
||||
acls := structs.ACLs{
|
||||
&structs.ACL{ID: "a"},
|
||||
&structs.ACL{ID: "b"},
|
||||
&structs.ACL{ID: "c"},
|
||||
}
|
||||
|
||||
sorter := &aclIterator{acls, 0}
|
||||
if len := sorter.Len(); len != 3 {
|
||||
t.Fatalf("bad: %d", len)
|
||||
}
|
||||
if !sorter.Less(0, 1) {
|
||||
t.Fatalf("should be less")
|
||||
}
|
||||
if sorter.Less(1, 0) {
|
||||
t.Fatalf("should not be less")
|
||||
}
|
||||
if !sort.IsSorted(sorter) {
|
||||
t.Fatalf("should be sorted")
|
||||
}
|
||||
|
||||
expected := structs.ACLs{
|
||||
&structs.ACL{ID: "b"},
|
||||
&structs.ACL{ID: "a"},
|
||||
&structs.ACL{ID: "c"},
|
||||
}
|
||||
sorter.Swap(0, 1)
|
||||
if !reflect.DeepEqual(acls, expected) {
|
||||
t.Fatalf("bad: %v", acls)
|
||||
}
|
||||
if sort.IsSorted(sorter) {
|
||||
t.Fatalf("should not be sorted")
|
||||
}
|
||||
sort.Sort(sorter)
|
||||
if !sort.IsSorted(sorter) {
|
||||
t.Fatalf("should be sorted")
|
||||
}
|
||||
}
|
||||
|
||||
func TestACLReplication_Iterator(t *testing.T) {
|
||||
t.Parallel()
|
||||
acls := structs.ACLs{}
|
||||
|
||||
iter := newACLIterator(acls)
|
||||
if front := iter.Front(); front != nil {
|
||||
t.Fatalf("bad: %v", front)
|
||||
}
|
||||
iter.Next()
|
||||
if front := iter.Front(); front != nil {
|
||||
t.Fatalf("bad: %v", front)
|
||||
}
|
||||
|
||||
acls = structs.ACLs{
|
||||
&structs.ACL{ID: "a"},
|
||||
&structs.ACL{ID: "b"},
|
||||
&structs.ACL{ID: "c"},
|
||||
}
|
||||
iter = newACLIterator(acls)
|
||||
if front := iter.Front(); front != acls[0] {
|
||||
t.Fatalf("bad: %v", front)
|
||||
}
|
||||
iter.Next()
|
||||
if front := iter.Front(); front != acls[1] {
|
||||
t.Fatalf("bad: %v", front)
|
||||
}
|
||||
iter.Next()
|
||||
if front := iter.Front(); front != acls[2] {
|
||||
t.Fatalf("bad: %v", front)
|
||||
}
|
||||
iter.Next()
|
||||
if front := iter.Front(); front != nil {
|
||||
t.Fatalf("bad: %v", front)
|
||||
}
|
||||
}
|
||||
|
||||
func TestACLReplication_reconcileACLs(t *testing.T) {
|
||||
t.Parallel()
|
||||
parseACLs := func(raw string) structs.ACLs {
|
||||
var acls structs.ACLs
|
||||
for _, key := range strings.Split(raw, "|") {
|
||||
if len(key) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
tuple := strings.Split(key, ":")
|
||||
index, err := strconv.Atoi(tuple[1])
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
acl := &structs.ACL{
|
||||
ID: tuple[0],
|
||||
Rules: tuple[2],
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: uint64(index),
|
||||
},
|
||||
}
|
||||
acls = append(acls, acl)
|
||||
}
|
||||
return acls
|
||||
}
|
||||
|
||||
parseChanges := func(changes structs.ACLRequests) string {
|
||||
var ret string
|
||||
for i, change := range changes {
|
||||
if i > 0 {
|
||||
ret += "|"
|
||||
}
|
||||
ret += fmt.Sprintf("%s:%s:%s", change.Op, change.ACL.ID, change.ACL.Rules)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
local string
|
||||
remote string
|
||||
lastRemoteIndex uint64
|
||||
expected string
|
||||
}{
|
||||
// Everything empty.
|
||||
{
|
||||
local: "",
|
||||
remote: "",
|
||||
lastRemoteIndex: 0,
|
||||
expected: "",
|
||||
},
|
||||
// First time with empty local.
|
||||
{
|
||||
local: "",
|
||||
remote: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
||||
lastRemoteIndex: 0,
|
||||
expected: "set:bbb:X|set:ccc:X|set:ddd:X|set:eee:X",
|
||||
},
|
||||
// Remote not sorted.
|
||||
{
|
||||
local: "",
|
||||
remote: "ddd:2:X|bbb:3:X|ccc:9:X|eee:11:X",
|
||||
lastRemoteIndex: 0,
|
||||
expected: "set:bbb:X|set:ccc:X|set:ddd:X|set:eee:X",
|
||||
},
|
||||
// Neither side sorted.
|
||||
{
|
||||
local: "ddd:2:X|bbb:3:X|ccc:9:X|eee:11:X",
|
||||
remote: "ccc:9:X|bbb:3:X|ddd:2:X|eee:11:X",
|
||||
lastRemoteIndex: 0,
|
||||
expected: "",
|
||||
},
|
||||
// Fully replicated, nothing to do.
|
||||
{
|
||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
||||
remote: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
||||
lastRemoteIndex: 0,
|
||||
expected: "",
|
||||
},
|
||||
// Change an ACL.
|
||||
{
|
||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
||||
remote: "bbb:3:X|ccc:33:Y|ddd:2:X|eee:11:X",
|
||||
lastRemoteIndex: 0,
|
||||
expected: "set:ccc:Y",
|
||||
},
|
||||
// Change an ACL, but mask the change by the last replicated
|
||||
// index. This isn't how things work normally, but it proves
|
||||
// we are skipping the full compare based on the index.
|
||||
{
|
||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
||||
remote: "bbb:3:X|ccc:33:Y|ddd:2:X|eee:11:X",
|
||||
lastRemoteIndex: 33,
|
||||
expected: "",
|
||||
},
|
||||
// Empty everything out.
|
||||
{
|
||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
||||
remote: "",
|
||||
lastRemoteIndex: 0,
|
||||
expected: "delete:bbb:X|delete:ccc:X|delete:ddd:X|delete:eee:X",
|
||||
},
|
||||
// Adds on the ends and in the middle.
|
||||
{
|
||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
||||
remote: "aaa:99:X|bbb:3:X|ccc:9:X|ccx:101:X|ddd:2:X|eee:11:X|fff:102:X",
|
||||
lastRemoteIndex: 0,
|
||||
expected: "set:aaa:X|set:ccx:X|set:fff:X",
|
||||
},
|
||||
// Deletes on the ends and in the middle.
|
||||
{
|
||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
||||
remote: "ccc:9:X",
|
||||
lastRemoteIndex: 0,
|
||||
expected: "delete:bbb:X|delete:ddd:X|delete:eee:X",
|
||||
},
|
||||
// Everything.
|
||||
{
|
||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
||||
remote: "aaa:99:X|bbb:3:X|ccx:101:X|ddd:103:Y|eee:11:X|fff:102:X",
|
||||
lastRemoteIndex: 11,
|
||||
expected: "set:aaa:X|delete:ccc:X|set:ccx:X|set:ddd:Y|set:fff:X",
|
||||
},
|
||||
}
|
||||
for i, test := range tests {
|
||||
local, remote := parseACLs(test.local), parseACLs(test.remote)
|
||||
changes := reconcileLegacyACLs(local, remote, test.lastRemoteIndex)
|
||||
if actual := parseChanges(changes); actual != test.expected {
|
||||
t.Errorf("test case %d failed: %s", i, actual)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestACLReplication_updateLocalACLs_RateLimit(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLReplicationApplyLimit = 1
|
||||
})
|
||||
s1.tokens.UpdateACLReplicationToken("secret")
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc2")
|
||||
|
||||
changes := structs.ACLRequests{
|
||||
&structs.ACLRequest{
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
ID: "secret",
|
||||
Type: "client",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Should be throttled to 1 Hz.
|
||||
start := time.Now()
|
||||
if _, err := s1.updateLocalLegacyACLs(changes, context.Background()); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if dur := time.Since(start); dur < time.Second {
|
||||
t.Fatalf("too slow: %9.6f", dur.Seconds())
|
||||
}
|
||||
|
||||
changes = append(changes,
|
||||
&structs.ACLRequest{
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
ID: "secret",
|
||||
Type: "client",
|
||||
},
|
||||
})
|
||||
|
||||
// Should be throttled to 1 Hz.
|
||||
start = time.Now()
|
||||
if _, err := s1.updateLocalLegacyACLs(changes, context.Background()); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if dur := time.Since(start); dur < 2*time.Second {
|
||||
t.Fatalf("too fast: %9.6f", dur.Seconds())
|
||||
}
|
||||
}
|
||||
|
||||
func TestACLReplication_IsACLReplicationEnabled(t *testing.T) {
|
||||
t.Parallel()
|
||||
// ACLs not enabled.
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = ""
|
||||
c.ACLsEnabled = false
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
if s1.IsACLReplicationEnabled() {
|
||||
t.Fatalf("should not be enabled")
|
||||
}
|
||||
|
||||
// ACLs enabled but not replication.
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testrpc.WaitForLeader(t, s2.RPC, "dc2")
|
||||
|
||||
if s2.IsACLReplicationEnabled() {
|
||||
t.Fatalf("should not be enabled")
|
||||
}
|
||||
|
||||
// ACLs enabled with replication.
|
||||
dir3, s3 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLTokenReplication = true
|
||||
})
|
||||
defer os.RemoveAll(dir3)
|
||||
defer s3.Shutdown()
|
||||
testrpc.WaitForLeader(t, s3.RPC, "dc2")
|
||||
if !s3.IsACLReplicationEnabled() {
|
||||
t.Fatalf("should be enabled")
|
||||
}
|
||||
|
||||
// ACLs enabled with replication, but inside the ACL datacenter
|
||||
// so replication should be disabled.
|
||||
dir4, s4 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLTokenReplication = true
|
||||
})
|
||||
defer os.RemoveAll(dir4)
|
||||
defer s4.Shutdown()
|
||||
testrpc.WaitForLeader(t, s4.RPC, "dc1")
|
||||
if s4.IsACLReplicationEnabled() {
|
||||
t.Fatalf("should not be enabled")
|
||||
}
|
||||
}
|
||||
|
||||
func TestACLReplication_LegacyTokens(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLMasterToken = "root"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLTokenReplication = true
|
||||
c.ACLReplicationRate = 100
|
||||
c.ACLReplicationBurst = 100
|
||||
c.ACLReplicationApplyLimit = 1000000
|
||||
})
|
||||
s2.tokens.UpdateACLReplicationToken("root")
|
||||
testrpc.WaitForLeader(t, s2.RPC, "dc2")
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Try to join.
|
||||
joinWAN(t, s2, s1)
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc2")
|
||||
|
||||
// Create a bunch of new tokens.
|
||||
var id string
|
||||
for i := 0; i < 50; i++ {
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTokenTypeClient,
|
||||
Rules: testACLPolicy,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
checkSame := func() error {
|
||||
index, remote, err := s1.fsm.State().ACLTokenList(nil, true, true, "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, local, err := s2.fsm.State().ACLTokenList(nil, true, true, "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if got, want := len(remote), len(local); got != want {
|
||||
return fmt.Errorf("got %d remote ACLs want %d", got, want)
|
||||
}
|
||||
for i, token := range remote {
|
||||
if !bytes.Equal(token.Hash, local[i].Hash) {
|
||||
return fmt.Errorf("ACLs differ")
|
||||
}
|
||||
}
|
||||
|
||||
var status structs.ACLReplicationStatus
|
||||
s2.aclReplicationStatusLock.RLock()
|
||||
status = s2.aclReplicationStatus
|
||||
s2.aclReplicationStatusLock.RUnlock()
|
||||
if !status.Enabled || !status.Running ||
|
||||
status.ReplicatedTokenIndex != index ||
|
||||
status.SourceDatacenter != "dc1" {
|
||||
return fmt.Errorf("ACL replication status differs")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
// Wait for the replica to converge.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if err := checkSame(); err != nil {
|
||||
r.Fatal(err)
|
||||
}
|
||||
})
|
||||
|
||||
// Create more new tokens.
|
||||
for i := 0; i < 50; i++ {
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTokenTypeClient,
|
||||
Rules: testACLPolicy,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var dontCare string
|
||||
if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
// Wait for the replica to converge.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if err := checkSame(); err != nil {
|
||||
r.Fatal(err)
|
||||
}
|
||||
})
|
||||
|
||||
// Delete a token.
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLDelete,
|
||||
ACL: structs.ACL{
|
||||
ID: id,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var dontCare string
|
||||
if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
// Wait for the replica to converge.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if err := checkSame(); err != nil {
|
||||
r.Fatal(err)
|
||||
}
|
||||
})
|
||||
}
|
|
@ -1,16 +1,9 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
|
@ -19,455 +12,6 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestACLReplication_Sorter(t *testing.T) {
|
||||
t.Parallel()
|
||||
acls := structs.ACLs{
|
||||
&structs.ACL{ID: "a"},
|
||||
&structs.ACL{ID: "b"},
|
||||
&structs.ACL{ID: "c"},
|
||||
}
|
||||
|
||||
sorter := &aclIterator{acls, 0}
|
||||
if len := sorter.Len(); len != 3 {
|
||||
t.Fatalf("bad: %d", len)
|
||||
}
|
||||
if !sorter.Less(0, 1) {
|
||||
t.Fatalf("should be less")
|
||||
}
|
||||
if sorter.Less(1, 0) {
|
||||
t.Fatalf("should not be less")
|
||||
}
|
||||
if !sort.IsSorted(sorter) {
|
||||
t.Fatalf("should be sorted")
|
||||
}
|
||||
|
||||
expected := structs.ACLs{
|
||||
&structs.ACL{ID: "b"},
|
||||
&structs.ACL{ID: "a"},
|
||||
&structs.ACL{ID: "c"},
|
||||
}
|
||||
sorter.Swap(0, 1)
|
||||
if !reflect.DeepEqual(acls, expected) {
|
||||
t.Fatalf("bad: %v", acls)
|
||||
}
|
||||
if sort.IsSorted(sorter) {
|
||||
t.Fatalf("should not be sorted")
|
||||
}
|
||||
sort.Sort(sorter)
|
||||
if !sort.IsSorted(sorter) {
|
||||
t.Fatalf("should be sorted")
|
||||
}
|
||||
}
|
||||
|
||||
func TestACLReplication_Iterator(t *testing.T) {
|
||||
t.Parallel()
|
||||
acls := structs.ACLs{}
|
||||
|
||||
iter := newACLIterator(acls)
|
||||
if front := iter.Front(); front != nil {
|
||||
t.Fatalf("bad: %v", front)
|
||||
}
|
||||
iter.Next()
|
||||
if front := iter.Front(); front != nil {
|
||||
t.Fatalf("bad: %v", front)
|
||||
}
|
||||
|
||||
acls = structs.ACLs{
|
||||
&structs.ACL{ID: "a"},
|
||||
&structs.ACL{ID: "b"},
|
||||
&structs.ACL{ID: "c"},
|
||||
}
|
||||
iter = newACLIterator(acls)
|
||||
if front := iter.Front(); front != acls[0] {
|
||||
t.Fatalf("bad: %v", front)
|
||||
}
|
||||
iter.Next()
|
||||
if front := iter.Front(); front != acls[1] {
|
||||
t.Fatalf("bad: %v", front)
|
||||
}
|
||||
iter.Next()
|
||||
if front := iter.Front(); front != acls[2] {
|
||||
t.Fatalf("bad: %v", front)
|
||||
}
|
||||
iter.Next()
|
||||
if front := iter.Front(); front != nil {
|
||||
t.Fatalf("bad: %v", front)
|
||||
}
|
||||
}
|
||||
|
||||
func TestACLReplication_reconcileACLs(t *testing.T) {
|
||||
t.Parallel()
|
||||
parseACLs := func(raw string) structs.ACLs {
|
||||
var acls structs.ACLs
|
||||
for _, key := range strings.Split(raw, "|") {
|
||||
if len(key) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
tuple := strings.Split(key, ":")
|
||||
index, err := strconv.Atoi(tuple[1])
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
acl := &structs.ACL{
|
||||
ID: tuple[0],
|
||||
Rules: tuple[2],
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: uint64(index),
|
||||
},
|
||||
}
|
||||
acls = append(acls, acl)
|
||||
}
|
||||
return acls
|
||||
}
|
||||
|
||||
parseChanges := func(changes structs.ACLRequests) string {
|
||||
var ret string
|
||||
for i, change := range changes {
|
||||
if i > 0 {
|
||||
ret += "|"
|
||||
}
|
||||
ret += fmt.Sprintf("%s:%s:%s", change.Op, change.ACL.ID, change.ACL.Rules)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
local string
|
||||
remote string
|
||||
lastRemoteIndex uint64
|
||||
expected string
|
||||
}{
|
||||
// Everything empty.
|
||||
{
|
||||
local: "",
|
||||
remote: "",
|
||||
lastRemoteIndex: 0,
|
||||
expected: "",
|
||||
},
|
||||
// First time with empty local.
|
||||
{
|
||||
local: "",
|
||||
remote: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
||||
lastRemoteIndex: 0,
|
||||
expected: "set:bbb:X|set:ccc:X|set:ddd:X|set:eee:X",
|
||||
},
|
||||
// Remote not sorted.
|
||||
{
|
||||
local: "",
|
||||
remote: "ddd:2:X|bbb:3:X|ccc:9:X|eee:11:X",
|
||||
lastRemoteIndex: 0,
|
||||
expected: "set:bbb:X|set:ccc:X|set:ddd:X|set:eee:X",
|
||||
},
|
||||
// Neither side sorted.
|
||||
{
|
||||
local: "ddd:2:X|bbb:3:X|ccc:9:X|eee:11:X",
|
||||
remote: "ccc:9:X|bbb:3:X|ddd:2:X|eee:11:X",
|
||||
lastRemoteIndex: 0,
|
||||
expected: "",
|
||||
},
|
||||
// Fully replicated, nothing to do.
|
||||
{
|
||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
||||
remote: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
||||
lastRemoteIndex: 0,
|
||||
expected: "",
|
||||
},
|
||||
// Change an ACL.
|
||||
{
|
||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
||||
remote: "bbb:3:X|ccc:33:Y|ddd:2:X|eee:11:X",
|
||||
lastRemoteIndex: 0,
|
||||
expected: "set:ccc:Y",
|
||||
},
|
||||
// Change an ACL, but mask the change by the last replicated
|
||||
// index. This isn't how things work normally, but it proves
|
||||
// we are skipping the full compare based on the index.
|
||||
{
|
||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
||||
remote: "bbb:3:X|ccc:33:Y|ddd:2:X|eee:11:X",
|
||||
lastRemoteIndex: 33,
|
||||
expected: "",
|
||||
},
|
||||
// Empty everything out.
|
||||
{
|
||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
||||
remote: "",
|
||||
lastRemoteIndex: 0,
|
||||
expected: "delete:bbb:X|delete:ccc:X|delete:ddd:X|delete:eee:X",
|
||||
},
|
||||
// Adds on the ends and in the middle.
|
||||
{
|
||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
||||
remote: "aaa:99:X|bbb:3:X|ccc:9:X|ccx:101:X|ddd:2:X|eee:11:X|fff:102:X",
|
||||
lastRemoteIndex: 0,
|
||||
expected: "set:aaa:X|set:ccx:X|set:fff:X",
|
||||
},
|
||||
// Deletes on the ends and in the middle.
|
||||
{
|
||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
||||
remote: "ccc:9:X",
|
||||
lastRemoteIndex: 0,
|
||||
expected: "delete:bbb:X|delete:ddd:X|delete:eee:X",
|
||||
},
|
||||
// Everything.
|
||||
{
|
||||
local: "bbb:3:X|ccc:9:X|ddd:2:X|eee:11:X",
|
||||
remote: "aaa:99:X|bbb:3:X|ccx:101:X|ddd:103:Y|eee:11:X|fff:102:X",
|
||||
lastRemoteIndex: 11,
|
||||
expected: "set:aaa:X|delete:ccc:X|set:ccx:X|set:ddd:Y|set:fff:X",
|
||||
},
|
||||
}
|
||||
for i, test := range tests {
|
||||
local, remote := parseACLs(test.local), parseACLs(test.remote)
|
||||
changes := reconcileLegacyACLs(local, remote, test.lastRemoteIndex)
|
||||
if actual := parseChanges(changes); actual != test.expected {
|
||||
t.Errorf("test case %d failed: %s", i, actual)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestACLReplication_updateLocalACLs_RateLimit(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLReplicationApplyLimit = 1
|
||||
})
|
||||
s1.tokens.UpdateACLReplicationToken("secret")
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc2")
|
||||
|
||||
changes := structs.ACLRequests{
|
||||
&structs.ACLRequest{
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
ID: "secret",
|
||||
Type: "client",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Should be throttled to 1 Hz.
|
||||
start := time.Now()
|
||||
if _, err := s1.updateLocalLegacyACLs(changes, context.Background()); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if dur := time.Since(start); dur < time.Second {
|
||||
t.Fatalf("too slow: %9.6f", dur.Seconds())
|
||||
}
|
||||
|
||||
changes = append(changes,
|
||||
&structs.ACLRequest{
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
ID: "secret",
|
||||
Type: "client",
|
||||
},
|
||||
})
|
||||
|
||||
// Should be throttled to 1 Hz.
|
||||
start = time.Now()
|
||||
if _, err := s1.updateLocalLegacyACLs(changes, context.Background()); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if dur := time.Since(start); dur < 2*time.Second {
|
||||
t.Fatalf("too fast: %9.6f", dur.Seconds())
|
||||
}
|
||||
}
|
||||
|
||||
func TestACLReplication_IsACLReplicationEnabled(t *testing.T) {
|
||||
t.Parallel()
|
||||
// ACLs not enabled.
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = ""
|
||||
c.ACLsEnabled = false
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
if s1.IsACLReplicationEnabled() {
|
||||
t.Fatalf("should not be enabled")
|
||||
}
|
||||
|
||||
// ACLs enabled but not replication.
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testrpc.WaitForLeader(t, s2.RPC, "dc2")
|
||||
|
||||
if s2.IsACLReplicationEnabled() {
|
||||
t.Fatalf("should not be enabled")
|
||||
}
|
||||
|
||||
// ACLs enabled with replication.
|
||||
dir3, s3 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLTokenReplication = true
|
||||
})
|
||||
defer os.RemoveAll(dir3)
|
||||
defer s3.Shutdown()
|
||||
testrpc.WaitForLeader(t, s3.RPC, "dc2")
|
||||
if !s3.IsACLReplicationEnabled() {
|
||||
t.Fatalf("should be enabled")
|
||||
}
|
||||
|
||||
// ACLs enabled with replication, but inside the ACL datacenter
|
||||
// so replication should be disabled.
|
||||
dir4, s4 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLTokenReplication = true
|
||||
})
|
||||
defer os.RemoveAll(dir4)
|
||||
defer s4.Shutdown()
|
||||
testrpc.WaitForLeader(t, s4.RPC, "dc1")
|
||||
if s4.IsACLReplicationEnabled() {
|
||||
t.Fatalf("should not be enabled")
|
||||
}
|
||||
}
|
||||
|
||||
func TestACLReplication(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLMasterToken = "root"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLTokenReplication = true
|
||||
c.ACLReplicationRate = 100
|
||||
c.ACLReplicationBurst = 100
|
||||
c.ACLReplicationApplyLimit = 1000000
|
||||
})
|
||||
s2.tokens.UpdateACLReplicationToken("root")
|
||||
testrpc.WaitForLeader(t, s2.RPC, "dc2")
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Try to join.
|
||||
joinWAN(t, s2, s1)
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc2")
|
||||
|
||||
// Create a bunch of new tokens.
|
||||
var id string
|
||||
for i := 0; i < 50; i++ {
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTokenTypeClient,
|
||||
Rules: testACLPolicy,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
if err := s1.RPC("ACL.Apply", &arg, &id); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
checkSame := func() error {
|
||||
index, remote, err := s1.fsm.State().ACLTokenList(nil, true, true, "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, local, err := s2.fsm.State().ACLTokenList(nil, true, true, "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if got, want := len(remote), len(local); got != want {
|
||||
return fmt.Errorf("got %d remote ACLs want %d", got, want)
|
||||
}
|
||||
for i, token := range remote {
|
||||
if !bytes.Equal(token.Hash, local[i].Hash) {
|
||||
return fmt.Errorf("ACLs differ")
|
||||
}
|
||||
}
|
||||
|
||||
var status structs.ACLReplicationStatus
|
||||
s2.aclReplicationStatusLock.RLock()
|
||||
status = s2.aclReplicationStatus
|
||||
s2.aclReplicationStatusLock.RUnlock()
|
||||
if !status.Enabled || !status.Running ||
|
||||
status.ReplicatedTokenIndex != index ||
|
||||
status.SourceDatacenter != "dc1" {
|
||||
return fmt.Errorf("ACL replication status differs")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
// Wait for the replica to converge.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if err := checkSame(); err != nil {
|
||||
r.Fatal(err)
|
||||
}
|
||||
})
|
||||
|
||||
// Create more new tokens.
|
||||
for i := 0; i < 50; i++ {
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTokenTypeClient,
|
||||
Rules: testACLPolicy,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var dontCare string
|
||||
if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
// Wait for the replica to converge.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if err := checkSame(); err != nil {
|
||||
r.Fatal(err)
|
||||
}
|
||||
})
|
||||
|
||||
// Delete a token.
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLDelete,
|
||||
ACL: structs.ACL{
|
||||
ID: id,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var dontCare string
|
||||
if err := s1.RPC("ACL.Apply", &arg, &dontCare); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
// Wait for the replica to converge.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if err := checkSame(); err != nil {
|
||||
r.Fatal(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestACLReplication_diffACLPolicies(t *testing.T) {
|
||||
local := structs.ACLPolicies{
|
||||
&structs.ACLPolicy{
|
||||
|
@ -671,3 +215,293 @@ func TestACLReplication_diffACLTokens(t *testing.T) {
|
|||
"539f1cb6-40aa-464f-ae66-a900d26bc1b2",
|
||||
"c6e8fffd-cbd9-4ecd-99fe-ab2f200c7926"})
|
||||
}
|
||||
|
||||
func TestACLReplication_Tokens(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLMasterToken = "root"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLTokenReplication = true
|
||||
c.ACLReplicationRate = 100
|
||||
c.ACLReplicationBurst = 100
|
||||
c.ACLReplicationApplyLimit = 1000000
|
||||
})
|
||||
s2.tokens.UpdateACLReplicationToken("root")
|
||||
testrpc.WaitForLeader(t, s2.RPC, "dc2")
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Try to join.
|
||||
joinWAN(t, s2, s1)
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc2")
|
||||
|
||||
// Create a bunch of new tokens and policies
|
||||
var tokens structs.ACLTokens
|
||||
for i := 0; i < 50; i++ {
|
||||
arg := structs.ACLTokenSetRequest{
|
||||
Datacenter: "dc1",
|
||||
ACLToken: structs.ACLToken{
|
||||
Description: fmt.Sprintf("token-%d", i),
|
||||
Policies: []structs.ACLTokenPolicyLink{
|
||||
structs.ACLTokenPolicyLink{
|
||||
ID: structs.ACLPolicyGlobalManagementID,
|
||||
},
|
||||
},
|
||||
Local: false,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var token structs.ACLToken
|
||||
require.NoError(t, s1.RPC("ACL.TokenSet", &arg, &token))
|
||||
tokens = append(tokens, &token)
|
||||
}
|
||||
|
||||
checkSame := func(t *retry.R) error {
|
||||
// only account for global tokens - local tokens shouldn't be replicated
|
||||
index, remote, err := s1.fsm.State().ACLTokenList(nil, false, true, "")
|
||||
require.NoError(t, err)
|
||||
_, local, err := s2.fsm.State().ACLTokenList(nil, false, true, "")
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, local, len(remote))
|
||||
for i, token := range remote {
|
||||
require.Equal(t, token.Hash, local[i].Hash)
|
||||
}
|
||||
|
||||
var status structs.ACLReplicationStatus
|
||||
s2.aclReplicationStatusLock.RLock()
|
||||
status = s2.aclReplicationStatus
|
||||
s2.aclReplicationStatusLock.RUnlock()
|
||||
if !status.Enabled || !status.Running ||
|
||||
status.ReplicationType != structs.ACLReplicateTokens ||
|
||||
status.ReplicatedTokenIndex != index ||
|
||||
status.SourceDatacenter != "dc1" {
|
||||
return fmt.Errorf("ACL replication status differs")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
// Wait for the replica to converge.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
checkSame(r)
|
||||
})
|
||||
|
||||
// add some local tokens to the secondary DC
|
||||
// these shouldn't be deleted by replication
|
||||
for i := 0; i < 50; i++ {
|
||||
arg := structs.ACLTokenSetRequest{
|
||||
Datacenter: "dc2",
|
||||
ACLToken: structs.ACLToken{
|
||||
Description: fmt.Sprintf("token-%d", i),
|
||||
Policies: []structs.ACLTokenPolicyLink{
|
||||
structs.ACLTokenPolicyLink{
|
||||
ID: structs.ACLPolicyGlobalManagementID,
|
||||
},
|
||||
},
|
||||
Local: true,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var token structs.ACLToken
|
||||
require.NoError(t, s2.RPC("ACL.TokenSet", &arg, &token))
|
||||
}
|
||||
|
||||
// add some local tokens to the primary DC
|
||||
// these shouldn't be replicated to the secondary DC
|
||||
for i := 0; i < 50; i++ {
|
||||
arg := structs.ACLTokenSetRequest{
|
||||
Datacenter: "dc1",
|
||||
ACLToken: structs.ACLToken{
|
||||
Description: fmt.Sprintf("token-%d", i),
|
||||
Policies: []structs.ACLTokenPolicyLink{
|
||||
structs.ACLTokenPolicyLink{
|
||||
ID: structs.ACLPolicyGlobalManagementID,
|
||||
},
|
||||
},
|
||||
Local: true,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var token structs.ACLToken
|
||||
require.NoError(t, s1.RPC("ACL.TokenSet", &arg, &token))
|
||||
}
|
||||
|
||||
// Update those other tokens
|
||||
for i := 0; i < 50; i++ {
|
||||
arg := structs.ACLTokenSetRequest{
|
||||
Datacenter: "dc1",
|
||||
ACLToken: structs.ACLToken{
|
||||
AccessorID: tokens[i].AccessorID,
|
||||
SecretID: tokens[i].SecretID,
|
||||
Description: fmt.Sprintf("token-%d-modified", i),
|
||||
Policies: []structs.ACLTokenPolicyLink{
|
||||
structs.ACLTokenPolicyLink{
|
||||
ID: structs.ACLPolicyGlobalManagementID,
|
||||
},
|
||||
},
|
||||
Local: false,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var token structs.ACLToken
|
||||
require.NoError(t, s1.RPC("ACL.TokenSet", &arg, &token))
|
||||
}
|
||||
|
||||
// Wait for the replica to converge.
|
||||
// this time it also verifies the local tokens from the primary were not replicated.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
checkSame(r)
|
||||
})
|
||||
|
||||
// verify dc2 local tokens didn't get blown away
|
||||
_, local, err := s2.fsm.State().ACLTokenList(nil, true, false, "")
|
||||
require.NoError(t, err)
|
||||
require.Len(t, local, 50)
|
||||
|
||||
for _, token := range tokens {
|
||||
arg := structs.ACLTokenDeleteRequest{
|
||||
Datacenter: "dc1",
|
||||
TokenID: token.AccessorID,
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
|
||||
var dontCare string
|
||||
require.NoError(t, s1.RPC("ACL.TokenDelete", &arg, &dontCare))
|
||||
}
|
||||
|
||||
// Wait for the replica to converge.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
checkSame(r)
|
||||
})
|
||||
}
|
||||
|
||||
func TestACLReplication_Policies(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLMasterToken = "root"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
client := rpcClient(t, s1)
|
||||
defer client.Close()
|
||||
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLTokenReplication = false
|
||||
c.ACLReplicationRate = 100
|
||||
c.ACLReplicationBurst = 100
|
||||
c.ACLReplicationApplyLimit = 1000000
|
||||
})
|
||||
s2.tokens.UpdateACLReplicationToken("root")
|
||||
testrpc.WaitForLeader(t, s2.RPC, "dc2")
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Try to join.
|
||||
joinWAN(t, s2, s1)
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc2")
|
||||
|
||||
// Create a bunch of new policies
|
||||
var policies structs.ACLPolicies
|
||||
for i := 0; i < 50; i++ {
|
||||
arg := structs.ACLPolicySetRequest{
|
||||
Datacenter: "dc1",
|
||||
Policy: structs.ACLPolicy{
|
||||
Name: fmt.Sprintf("token-%d", i),
|
||||
Description: fmt.Sprintf("token-%d", i),
|
||||
Rules: fmt.Sprintf(`service "app-%d" { policy = "read" }`, i),
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var policy structs.ACLPolicy
|
||||
require.NoError(t, s1.RPC("ACL.PolicySet", &arg, &policy))
|
||||
policies = append(policies, &policy)
|
||||
}
|
||||
|
||||
checkSame := func(t *retry.R) error {
|
||||
// only account for global tokens - local tokens shouldn't be replicated
|
||||
index, remote, err := s1.fsm.State().ACLPolicyList(nil)
|
||||
require.NoError(t, err)
|
||||
_, local, err := s2.fsm.State().ACLPolicyList(nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, local, len(remote))
|
||||
for i, policy := range remote {
|
||||
require.Equal(t, policy.Hash, local[i].Hash)
|
||||
}
|
||||
|
||||
var status structs.ACLReplicationStatus
|
||||
s2.aclReplicationStatusLock.RLock()
|
||||
status = s2.aclReplicationStatus
|
||||
s2.aclReplicationStatusLock.RUnlock()
|
||||
if !status.Enabled || !status.Running ||
|
||||
status.ReplicationType != structs.ACLReplicatePolicies ||
|
||||
status.ReplicatedIndex != index ||
|
||||
status.SourceDatacenter != "dc1" {
|
||||
return fmt.Errorf("ACL replication status differs")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
// Wait for the replica to converge.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
checkSame(r)
|
||||
})
|
||||
|
||||
// Update those policies
|
||||
for i := 0; i < 50; i++ {
|
||||
arg := structs.ACLPolicySetRequest{
|
||||
Datacenter: "dc1",
|
||||
Policy: structs.ACLPolicy{
|
||||
ID: policies[i].ID,
|
||||
Name: fmt.Sprintf("token-%d-modified", i),
|
||||
Description: fmt.Sprintf("token-%d-modified", i),
|
||||
Rules: policies[i].Rules,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var policy structs.ACLPolicy
|
||||
require.NoError(t, s1.RPC("ACL.PolicySet", &arg, &policy))
|
||||
}
|
||||
|
||||
// Wait for the replica to converge.
|
||||
// this time it also verifies the local tokens from the primary were not replicated.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
checkSame(r)
|
||||
})
|
||||
|
||||
for _, policy := range policies {
|
||||
arg := structs.ACLPolicyDeleteRequest{
|
||||
Datacenter: "dc1",
|
||||
PolicyID: policy.ID,
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
|
||||
var dontCare string
|
||||
require.NoError(t, s1.RPC("ACL.PolicyDelete", &arg, &dontCare))
|
||||
}
|
||||
|
||||
// Wait for the replica to converge.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
checkSame(r)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -76,19 +76,19 @@ func (s *Server) canUpgradeToNewACLs(isLeader bool) bool {
|
|||
}
|
||||
|
||||
if !s.InACLDatacenter() {
|
||||
mode, _ := ServersGetACLMode(s.WANMembers(), "", s.config.ACLDatacenter)
|
||||
if mode != structs.ACLModeEnabled {
|
||||
numServers, mode, _ := ServersGetACLMode(s.WANMembers(), "", s.config.ACLDatacenter)
|
||||
if mode != structs.ACLModeEnabled || numServers == 0 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if isLeader {
|
||||
if mode, _ := ServersGetACLMode(s.LANMembers(), "", ""); mode == structs.ACLModeLegacy {
|
||||
if _, mode, _ := ServersGetACLMode(s.LANMembers(), "", ""); mode == structs.ACLModeLegacy {
|
||||
return true
|
||||
}
|
||||
} else {
|
||||
leader := string(s.raft.Leader())
|
||||
if _, leaderMode := ServersGetACLMode(s.LANMembers(), leader, ""); leaderMode == structs.ACLModeEnabled {
|
||||
if _, _, leaderMode := ServersGetACLMode(s.LANMembers(), leader, ""); leaderMode == structs.ACLModeEnabled {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
@ -97,7 +97,7 @@ func (s *Server) canUpgradeToNewACLs(isLeader bool) bool {
|
|||
}
|
||||
|
||||
func (s *Server) InACLDatacenter() bool {
|
||||
return s.config.Datacenter == s.config.ACLDatacenter
|
||||
return s.config.ACLDatacenter == "" || s.config.Datacenter == s.config.ACLDatacenter
|
||||
}
|
||||
|
||||
func (s *Server) UseLegacyACLs() bool {
|
||||
|
|
|
@ -163,8 +163,8 @@ type ACLResolverTestDelegate struct {
|
|||
localTokens bool
|
||||
localPolicies bool
|
||||
getPolicyFn func(*structs.ACLPolicyResolveLegacyRequest, *structs.ACLPolicyResolveLegacyResponse) error
|
||||
tokenReadFn func(*structs.ACLTokenReadRequest, *structs.ACLTokenResponse) error
|
||||
policyResolveFn func(*structs.ACLPolicyBatchReadRequest, *structs.ACLPoliciesResponse) error
|
||||
tokenReadFn func(*structs.ACLTokenGetRequest, *structs.ACLTokenResponse) error
|
||||
policyResolveFn func(*structs.ACLPolicyBatchGetRequest, *structs.ACLPolicyBatchResponse) error
|
||||
}
|
||||
|
||||
func (d *ACLResolverTestDelegate) ACLsEnabled() bool {
|
||||
|
@ -204,12 +204,12 @@ func (d *ACLResolverTestDelegate) RPC(method string, args interface{}, reply int
|
|||
panic("Bad Test Implmentation: should provide a getPolicyFn to the ACLResolverTestDelegate")
|
||||
case "ACL.TokenRead":
|
||||
if d.tokenReadFn != nil {
|
||||
return d.tokenReadFn(args.(*structs.ACLTokenReadRequest), reply.(*structs.ACLTokenResponse))
|
||||
return d.tokenReadFn(args.(*structs.ACLTokenGetRequest), reply.(*structs.ACLTokenResponse))
|
||||
}
|
||||
panic("Bad Test Implmentation: should provide a tokenReadFn to the ACLResolverTestDelegate")
|
||||
case "ACL.PolicyResolve":
|
||||
if d.policyResolveFn != nil {
|
||||
return d.policyResolveFn(args.(*structs.ACLPolicyBatchReadRequest), reply.(*structs.ACLPoliciesResponse))
|
||||
return d.policyResolveFn(args.(*structs.ACLPolicyBatchGetRequest), reply.(*structs.ACLPolicyBatchResponse))
|
||||
}
|
||||
panic("Bad Test Implmentation: should provide a policyResolveFn to the ACLResolverTestDelegate")
|
||||
}
|
||||
|
@ -300,7 +300,7 @@ func TestACLResolver_DownPolicy(t *testing.T) {
|
|||
legacy: false,
|
||||
localTokens: false,
|
||||
localPolicies: true,
|
||||
tokenReadFn: func(*structs.ACLTokenReadRequest, *structs.ACLTokenResponse) error {
|
||||
tokenReadFn: func(*structs.ACLTokenGetRequest, *structs.ACLTokenResponse) error {
|
||||
return fmt.Errorf("Induced RPC Error")
|
||||
},
|
||||
}
|
||||
|
@ -322,7 +322,7 @@ func TestACLResolver_DownPolicy(t *testing.T) {
|
|||
legacy: false,
|
||||
localTokens: false,
|
||||
localPolicies: true,
|
||||
tokenReadFn: func(*structs.ACLTokenReadRequest, *structs.ACLTokenResponse) error {
|
||||
tokenReadFn: func(*structs.ACLTokenGetRequest, *structs.ACLTokenResponse) error {
|
||||
return fmt.Errorf("Induced RPC Error")
|
||||
},
|
||||
}
|
||||
|
@ -345,7 +345,7 @@ func TestACLResolver_DownPolicy(t *testing.T) {
|
|||
legacy: false,
|
||||
localTokens: true,
|
||||
localPolicies: false,
|
||||
policyResolveFn: func(args *structs.ACLPolicyBatchReadRequest, reply *structs.ACLPoliciesResponse) error {
|
||||
policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
|
||||
if !policyCached {
|
||||
for _, policyID := range args.PolicyIDs {
|
||||
_, policy, _ := testPolicyForID(policyID)
|
||||
|
@ -388,7 +388,7 @@ func TestACLResolver_DownPolicy(t *testing.T) {
|
|||
legacy: false,
|
||||
localTokens: false,
|
||||
localPolicies: true,
|
||||
tokenReadFn: func(args *structs.ACLTokenReadRequest, reply *structs.ACLTokenResponse) error {
|
||||
tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
|
||||
if !cached {
|
||||
_, token, _ := testIdentityForToken("found")
|
||||
reply.Token = token.(*structs.ACLToken)
|
||||
|
@ -425,7 +425,7 @@ func TestACLResolver_DownPolicy(t *testing.T) {
|
|||
legacy: false,
|
||||
localTokens: true,
|
||||
localPolicies: false,
|
||||
policyResolveFn: func(args *structs.ACLPolicyBatchReadRequest, reply *structs.ACLPoliciesResponse) error {
|
||||
policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
|
||||
if !policyCached {
|
||||
for _, policyID := range args.PolicyIDs {
|
||||
_, policy, _ := testPolicyForID(policyID)
|
||||
|
@ -468,7 +468,7 @@ func TestACLResolver_DownPolicy(t *testing.T) {
|
|||
legacy: false,
|
||||
localTokens: true,
|
||||
localPolicies: false,
|
||||
policyResolveFn: func(args *structs.ACLPolicyBatchReadRequest, reply *structs.ACLPoliciesResponse) error {
|
||||
policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
|
||||
if !policyCached {
|
||||
for _, policyID := range args.PolicyIDs {
|
||||
_, policy, _ := testPolicyForID(policyID)
|
||||
|
@ -523,7 +523,7 @@ func TestACLResolver_DownPolicy(t *testing.T) {
|
|||
legacy: false,
|
||||
localTokens: false,
|
||||
localPolicies: false,
|
||||
tokenReadFn: func(args *structs.ACLTokenReadRequest, reply *structs.ACLTokenResponse) error {
|
||||
tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
|
||||
if !tokenCached {
|
||||
_, token, _ := testIdentityForToken("found")
|
||||
reply.Token = token.(*structs.ACLToken)
|
||||
|
@ -532,7 +532,7 @@ func TestACLResolver_DownPolicy(t *testing.T) {
|
|||
}
|
||||
return fmt.Errorf("Induced RPC Error")
|
||||
},
|
||||
policyResolveFn: func(args *structs.ACLPolicyBatchReadRequest, reply *structs.ACLPoliciesResponse) error {
|
||||
policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
|
||||
if !policyCached {
|
||||
for _, policyID := range args.PolicyIDs {
|
||||
_, policy, _ := testPolicyForID(policyID)
|
||||
|
@ -576,7 +576,7 @@ func TestACLResolver_DownPolicy(t *testing.T) {
|
|||
legacy: false,
|
||||
localTokens: false,
|
||||
localPolicies: true,
|
||||
tokenReadFn: func(args *structs.ACLTokenReadRequest, reply *structs.ACLTokenResponse) error {
|
||||
tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
|
||||
if !cached {
|
||||
_, token, _ := testIdentityForToken("found")
|
||||
reply.Token = token.(*structs.ACLToken)
|
||||
|
@ -725,7 +725,7 @@ func TestACLResolver_LocalPolicies(t *testing.T) {
|
|||
legacy: false,
|
||||
localTokens: false,
|
||||
localPolicies: true,
|
||||
tokenReadFn: func(args *structs.ACLTokenReadRequest, reply *structs.ACLTokenResponse) error {
|
||||
tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
|
||||
_, token, err := testIdentityForToken(args.TokenID)
|
||||
|
||||
if token != nil {
|
||||
|
|
|
@ -384,6 +384,16 @@ func (c *Client) Stats() map[string]map[string]string {
|
|||
"runtime": runtimeStats(),
|
||||
}
|
||||
|
||||
if c.ACLsEnabled() {
|
||||
if c.UseLegacyACLs() {
|
||||
stats["consul"]["acl"] = "legacy"
|
||||
} else {
|
||||
stats["consul"]["acl"] = "enabled"
|
||||
}
|
||||
} else {
|
||||
stats["consul"]["acl"] = "disabled"
|
||||
}
|
||||
|
||||
for outerKey, outerValue := range c.enterpriseStats() {
|
||||
if _, ok := stats[outerKey]; ok {
|
||||
for innerKey, innerValue := range outerValue {
|
||||
|
|
|
@ -23,10 +23,10 @@ func init() {
|
|||
registerCommand(structs.AutopilotRequestType, (*FSM).applyAutopilotUpdate)
|
||||
registerCommand(structs.IntentionRequestType, (*FSM).applyIntentionOperation)
|
||||
registerCommand(structs.ConnectCARequestType, (*FSM).applyConnectCAOperation)
|
||||
registerCommand(structs.ACLTokenUpsertRequestType, (*FSM).applyACLTokenUpsertOperation)
|
||||
registerCommand(structs.ACLTokenSetRequestType, (*FSM).applyACLTokenSetOperation)
|
||||
registerCommand(structs.ACLTokenDeleteRequestType, (*FSM).applyACLTokenDeleteOperation)
|
||||
registerCommand(structs.ACLBootstrapRequestType, (*FSM).applyACLTokenBootstrap)
|
||||
registerCommand(structs.ACLPolicyUpsertRequestType, (*FSM).applyACLPolicyUpsertOperation)
|
||||
registerCommand(structs.ACLPolicySetRequestType, (*FSM).applyACLPolicySetOperation)
|
||||
registerCommand(structs.ACLPolicyDeleteRequestType, (*FSM).applyACLPolicyDeleteOperation)
|
||||
}
|
||||
|
||||
|
@ -179,7 +179,7 @@ func (c *FSM) applyACLOperation(buf []byte, index uint64) interface{} {
|
|||
}
|
||||
return req.ACL.ID
|
||||
case structs.ACLDelete:
|
||||
return c.state.ACLTokenDeleteSecret(index, req.ACL.ID)
|
||||
return c.state.ACLTokenDeleteBySecret(index, req.ACL.ID)
|
||||
default:
|
||||
c.logger.Printf("[WARN] consul.fsm: Invalid ACL operation '%s'", req.Op)
|
||||
return fmt.Errorf("Invalid ACL operation '%s'", req.Op)
|
||||
|
@ -351,15 +351,15 @@ func (c *FSM) applyConnectCAOperation(buf []byte, index uint64) interface{} {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *FSM) applyACLTokenUpsertOperation(buf []byte, index uint64) interface{} {
|
||||
var req structs.ACLTokenBatchUpsertRequest
|
||||
func (c *FSM) applyACLTokenSetOperation(buf []byte, index uint64) interface{} {
|
||||
var req structs.ACLTokenBatchSetRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "token"}, time.Now(),
|
||||
[]metrics.Label{{Name: "op", Value: "upsert"}})
|
||||
|
||||
return c.state.ACLTokensUpsert(index, req.Tokens, req.AllowCreate)
|
||||
return c.state.ACLTokenBatchSet(index, req.Tokens, req.CAS)
|
||||
}
|
||||
|
||||
func (c *FSM) applyACLTokenDeleteOperation(buf []byte, index uint64) interface{} {
|
||||
|
@ -370,7 +370,7 @@ func (c *FSM) applyACLTokenDeleteOperation(buf []byte, index uint64) interface{}
|
|||
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "token"}, time.Now(),
|
||||
[]metrics.Label{{Name: "op", Value: "delete"}})
|
||||
|
||||
return c.state.ACLTokensDelete(index, req.TokenIDs)
|
||||
return c.state.ACLTokenBatchDelete(index, req.TokenIDs)
|
||||
}
|
||||
|
||||
func (c *FSM) applyACLTokenBootstrap(buf []byte, index uint64) interface{} {
|
||||
|
@ -383,15 +383,15 @@ func (c *FSM) applyACLTokenBootstrap(buf []byte, index uint64) interface{} {
|
|||
return c.state.ACLBootstrap(index, req.ResetIndex, &req.Token, false)
|
||||
}
|
||||
|
||||
func (c *FSM) applyACLPolicyUpsertOperation(buf []byte, index uint64) interface{} {
|
||||
var req structs.ACLPolicyBatchUpsertRequest
|
||||
func (c *FSM) applyACLPolicySetOperation(buf []byte, index uint64) interface{} {
|
||||
var req structs.ACLPolicyBatchSetRequest
|
||||
if err := structs.Decode(buf, &req); err != nil {
|
||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "policy"}, time.Now(),
|
||||
[]metrics.Label{{Name: "op", Value: "upsert"}})
|
||||
|
||||
return c.state.ACLPoliciesUpsert(index, req.Policies)
|
||||
return c.state.ACLPolicyBatchSet(index, req.Policies)
|
||||
}
|
||||
|
||||
func (c *FSM) applyACLPolicyDeleteOperation(buf []byte, index uint64) interface{} {
|
||||
|
@ -402,5 +402,5 @@ func (c *FSM) applyACLPolicyDeleteOperation(buf []byte, index uint64) interface{
|
|||
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "policy"}, time.Now(),
|
||||
[]metrics.Label{{Name: "op", Value: "delete"}})
|
||||
|
||||
return c.state.ACLPoliciesDelete(index, req.PolicyIDs)
|
||||
return c.state.ACLPolicyBatchDelete(index, req.PolicyIDs)
|
||||
}
|
||||
|
|
|
@ -25,8 +25,8 @@ func init() {
|
|||
registerRestorer(structs.ConnectCAProviderStateType, restoreConnectCAProviderState)
|
||||
registerRestorer(structs.ConnectCAConfigType, restoreConnectCAConfig)
|
||||
registerRestorer(structs.IndexRequestType, restoreIndex)
|
||||
registerRestorer(structs.ACLTokenUpsertRequestType, restoreToken)
|
||||
registerRestorer(structs.ACLPolicyUpsertRequestType, restorePolicy)
|
||||
registerRestorer(structs.ACLTokenSetRequestType, restoreToken)
|
||||
registerRestorer(structs.ACLPolicySetRequestType, restorePolicy)
|
||||
}
|
||||
|
||||
func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
||||
|
@ -173,7 +173,7 @@ func (s *snapshot) persistACLs(sink raft.SnapshotSink,
|
|||
}
|
||||
|
||||
for token := tokens.Next(); token != nil; token = tokens.Next() {
|
||||
if _, err := sink.Write([]byte{byte(structs.ACLTokenUpsertRequestType)}); err != nil {
|
||||
if _, err := sink.Write([]byte{byte(structs.ACLTokenSetRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(token.(*structs.ACLToken)); err != nil {
|
||||
|
@ -187,7 +187,7 @@ func (s *snapshot) persistACLs(sink raft.SnapshotSink,
|
|||
}
|
||||
|
||||
for policy := policies.Next(); policy != nil; policy = policies.Next() {
|
||||
if _, err := sink.Write([]byte{byte(structs.ACLPolicyUpsertRequestType)}); err != nil {
|
||||
if _, err := sink.Write([]byte{byte(structs.ACLPolicySetRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(policy.(*structs.ACLPolicy)); err != nil {
|
||||
|
|
|
@ -433,10 +433,10 @@ func (s *Server) initializeACLs(upgrade bool) error {
|
|||
}
|
||||
policy.SetHash(true)
|
||||
|
||||
req := structs.ACLPolicyBatchUpsertRequest{
|
||||
req := structs.ACLPolicyBatchSetRequest{
|
||||
Policies: structs.ACLPolicies{&policy},
|
||||
}
|
||||
_, err := s.raftApply(structs.ACLPolicyUpsertRequestType, &req)
|
||||
_, err := s.raftApply(structs.ACLPolicySetRequestType, &req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create global-management policy: %v", err)
|
||||
}
|
||||
|
@ -497,10 +497,11 @@ func (s *Server) initializeACLs(upgrade bool) error {
|
|||
|
||||
if !done {
|
||||
// either we didn't attempt to or setting the token with a bootstrap request failed.
|
||||
req := structs.ACLTokenBatchUpsertRequest{
|
||||
req := structs.ACLTokenBatchSetRequest{
|
||||
Tokens: structs.ACLTokens{&token},
|
||||
CAS: false,
|
||||
}
|
||||
if _, err := s.raftApply(structs.ACLTokenUpsertRequestType, &req); err != nil {
|
||||
if _, err := s.raftApply(structs.ACLTokenSetRequestType, &req); err != nil {
|
||||
return fmt.Errorf("failed to create master token: %v", err)
|
||||
}
|
||||
|
||||
|
@ -531,11 +532,11 @@ func (s *Server) initializeACLs(upgrade bool) error {
|
|||
}
|
||||
token.SetHash(true)
|
||||
|
||||
req := structs.ACLTokenBatchUpsertRequest{
|
||||
Tokens: structs.ACLTokens{token},
|
||||
AllowCreate: true,
|
||||
req := structs.ACLTokenBatchSetRequest{
|
||||
Tokens: structs.ACLTokens{token},
|
||||
CAS: false,
|
||||
}
|
||||
_, err := s.raftApply(structs.ACLTokenUpsertRequestType, &req)
|
||||
_, err := s.raftApply(structs.ACLTokenSetRequestType, &req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create anonymous token: %v", err)
|
||||
}
|
||||
|
@ -623,12 +624,16 @@ func (s *Server) startACLUpgrade() {
|
|||
newToken.Policies = append(newToken.Policies, structs.ACLTokenPolicyLink{ID: structs.ACLPolicyGlobalManagementID})
|
||||
}
|
||||
|
||||
// need to copy these as we are going to do a CAS operation.
|
||||
newToken.CreateIndex = token.CreateIndex
|
||||
newToken.ModifyIndex = token.ModifyIndex
|
||||
|
||||
newTokens = append(newTokens, &newToken)
|
||||
}
|
||||
|
||||
req := &structs.ACLTokenBatchUpsertRequest{Tokens: newTokens, AllowCreate: false}
|
||||
req := &structs.ACLTokenBatchSetRequest{Tokens: newTokens, CAS: true}
|
||||
|
||||
resp, err := s.raftApply(structs.ACLTokenUpsertRequestType, req)
|
||||
resp, err := s.raftApply(structs.ACLTokenSetRequestType, req)
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] acl: failed to apply acl token upgrade batch: %v", err)
|
||||
}
|
||||
|
|
|
@ -984,33 +984,21 @@ func TestLeader_ACL_Initialization(t *testing.T) {
|
|||
|
||||
if tt.master != "" {
|
||||
_, master, err := s1.fsm.State().ACLTokenGetBySecret(nil, tt.master)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if master == nil {
|
||||
t.Fatalf("master token wasn't created")
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, master)
|
||||
}
|
||||
|
||||
_, anon, err := s1.fsm.State().ACLTokenGetBySecret(nil, anonymousToken)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if anon == nil {
|
||||
t.Fatalf("anonymous token wasn't created")
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, anon)
|
||||
|
||||
canBootstrap, _, err := s1.fsm.State().CanBootstrapACLToken()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if tt.bootstrap {
|
||||
if !canBootstrap {
|
||||
t.Fatalf("bootstrap should be allowed")
|
||||
}
|
||||
} else if canBootstrap {
|
||||
t.Fatalf("bootstrap should not be allowed")
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tt.bootstrap, canBootstrap)
|
||||
|
||||
_, policy, err := s1.fsm.State().ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, policy)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -1156,3 +1144,65 @@ func TestLeader_PersistIntermediateCAs(t *testing.T) {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestLeader_ACLUpgrade(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLsEnabled = true
|
||||
c.ACLMasterToken = "root"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
// create a legacy management ACL
|
||||
mgmt := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "Management token",
|
||||
Type: structs.ACLTokenTypeManagement,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var mgmt_id string
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ACL.Apply", &mgmt, &mgmt_id))
|
||||
|
||||
// wait for it to be upgraded
|
||||
retry.Run(t, func(t *retry.R) {
|
||||
_, token, err := s1.fsm.State().ACLTokenGetBySecret(nil, mgmt_id)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, token)
|
||||
require.NotEqual(t, "", token.AccessorID)
|
||||
require.Equal(t, structs.ACLTokenTypeManagement, token.Type)
|
||||
require.Len(t, token.Policies, 1)
|
||||
require.Equal(t, structs.ACLPolicyGlobalManagementID, token.Policies[0].ID)
|
||||
})
|
||||
|
||||
// create a legacy management ACL
|
||||
client := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "Management token",
|
||||
Type: structs.ACLTokenTypeClient,
|
||||
Rules: `node "" { policy = "read"}`,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var client_id string
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ACL.Apply", &client, &client_id))
|
||||
|
||||
// wait for it to be upgraded
|
||||
retry.Run(t, func(t *retry.R) {
|
||||
_, token, err := s1.fsm.State().ACLTokenGetBySecret(nil, client_id)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, token)
|
||||
require.NotEqual(t, "", token.AccessorID)
|
||||
require.Len(t, token.Policies, 0)
|
||||
require.Equal(t, structs.ACLTokenTypeClient, token.Type)
|
||||
require.Equal(t, client.ACL.Rules, token.Rules)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1071,6 +1071,17 @@ func (s *Server) Stats() map[string]map[string]string {
|
|||
"serf_lan": s.serfLAN.Stats(),
|
||||
"runtime": runtimeStats(),
|
||||
}
|
||||
|
||||
if s.ACLsEnabled() {
|
||||
if s.UseLegacyACLs() {
|
||||
stats["consul"]["acl"] = "legacy"
|
||||
} else {
|
||||
stats["consul"]["acl"] = "enabled"
|
||||
}
|
||||
} else {
|
||||
stats["consul"]["acl"] = "disabled"
|
||||
}
|
||||
|
||||
if s.serfWAN != nil {
|
||||
stats["serf_wan"] = s.serfWAN.Stats()
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
|
|||
conf.Tags["use_tls"] = "1"
|
||||
}
|
||||
|
||||
if s.config.ACLDatacenter != "" {
|
||||
if s.acls.ACLsEnabled() {
|
||||
// we start in legacy mode and allow upgrading later
|
||||
conf.Tags["acls"] = string(structs.ACLModeLegacy)
|
||||
} else {
|
||||
|
|
|
@ -63,7 +63,8 @@ func tokensTableSchema() *memdb.TableSchema {
|
|||
Name: "acl-tokens",
|
||||
Indexes: map[string]*memdb.IndexSchema{
|
||||
"accessor": &memdb.IndexSchema{
|
||||
Name: "accessor",
|
||||
Name: "accessor",
|
||||
// DEPRECATED (ACL-Legacy-Compat) - we should not AllowMissing here once legacy compat is removed
|
||||
AllowMissing: true,
|
||||
Unique: true,
|
||||
Indexer: &memdb.UUIDFieldIndex{
|
||||
|
@ -141,15 +142,6 @@ func policiesTableSchema() *memdb.TableSchema {
|
|||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
"datacenters": &memdb.IndexSchema{
|
||||
Name: "datacenters",
|
||||
AllowMissing: true,
|
||||
Unique: false,
|
||||
Indexer: &memdb.StringSliceFieldIndex{
|
||||
Field: "Datacenters",
|
||||
Lowercase: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -220,7 +212,7 @@ func (s *Store) ACLBootstrap(idx, resetIndex uint64, token *structs.ACLToken, le
|
|||
}
|
||||
}
|
||||
|
||||
if err := s.aclTokenSetTxn(tx, idx, token, true, false, legacy); err != nil {
|
||||
if err := s.aclTokenSetTxn(tx, idx, token, false, false, legacy); err != nil {
|
||||
return fmt.Errorf("failed inserting bootstrap token: %v", err)
|
||||
}
|
||||
if err := indexUpdateMaxTxn(tx, idx, "acl-tokens"); err != nil {
|
||||
|
@ -280,7 +272,7 @@ func (s *Store) ACLTokenSet(idx uint64, token *structs.ACLToken, legacy bool) er
|
|||
defer tx.Abort()
|
||||
|
||||
// Call set on the ACL
|
||||
if err := s.aclTokenSetTxn(tx, idx, token, true, false, legacy); err != nil {
|
||||
if err := s.aclTokenSetTxn(tx, idx, token, false, false, legacy); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -292,14 +284,14 @@ func (s *Store) ACLTokenSet(idx uint64, token *structs.ACLToken, legacy bool) er
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) ACLTokensUpsert(idx uint64, tokens structs.ACLTokens, allowCreate bool) error {
|
||||
func (s *Store) ACLTokenBatchSet(idx uint64, tokens structs.ACLTokens, cas bool) error {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
for _, token := range tokens {
|
||||
// this is only used when doing batch insertions for upgrades and replication. Therefore
|
||||
// we take whatever those said.
|
||||
if err := s.aclTokenSetTxn(tx, idx, token, allowCreate, true, false); err != nil {
|
||||
if err := s.aclTokenSetTxn(tx, idx, token, cas, true, false); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -314,7 +306,7 @@ func (s *Store) ACLTokensUpsert(idx uint64, tokens structs.ACLTokens, allowCreat
|
|||
|
||||
// aclTokenSetTxn is the inner method used to insert an ACL token with the
|
||||
// proper indexes into the state store.
|
||||
func (s *Store) aclTokenSetTxn(tx *memdb.Txn, idx uint64, token *structs.ACLToken, allowCreate, allowMissingPolicyIDs, legacy bool) error {
|
||||
func (s *Store) aclTokenSetTxn(tx *memdb.Txn, idx uint64, token *structs.ACLToken, cas, allowMissingPolicyIDs, legacy bool) error {
|
||||
// Check that the ID is set
|
||||
if token.SecretID == "" {
|
||||
return ErrMissingACLTokenSecret
|
||||
|
@ -331,13 +323,29 @@ func (s *Store) aclTokenSetTxn(tx *memdb.Txn, idx uint64, token *structs.ACLToke
|
|||
return fmt.Errorf("failed token lookup: %s", err)
|
||||
}
|
||||
|
||||
if existing == nil && !allowCreate {
|
||||
return nil
|
||||
var original *structs.ACLToken
|
||||
|
||||
if existing != nil {
|
||||
original = existing.(*structs.ACLToken)
|
||||
}
|
||||
|
||||
if legacy && existing != nil {
|
||||
original := existing.(*structs.ACLToken)
|
||||
if len(original.Policies) > 0 {
|
||||
if cas {
|
||||
// set-if-unset case
|
||||
if token.ModifyIndex == 0 && original != nil {
|
||||
return nil
|
||||
}
|
||||
// token already deleted
|
||||
if token.ModifyIndex != 0 && original == nil {
|
||||
return nil
|
||||
}
|
||||
// check for other modifications
|
||||
if token.ModifyIndex != 0 && token.ModifyIndex != original.ModifyIndex {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
if legacy && original != nil {
|
||||
if len(original.Policies) > 0 || original.Type == "" {
|
||||
return fmt.Errorf("failed inserting acl token: cannot use legacy endpoint to modify a non-legacy token")
|
||||
}
|
||||
|
||||
|
@ -349,8 +357,16 @@ func (s *Store) aclTokenSetTxn(tx *memdb.Txn, idx uint64, token *structs.ACLToke
|
|||
}
|
||||
|
||||
// Set the indexes
|
||||
if existing != nil {
|
||||
token.CreateIndex = existing.(*structs.ACLToken).CreateIndex
|
||||
if original != nil {
|
||||
if original.AccessorID != "" && token.AccessorID != original.AccessorID {
|
||||
return fmt.Errorf("The ACL Token AccessorID field is immutable")
|
||||
}
|
||||
|
||||
if token.SecretID != original.SecretID {
|
||||
return fmt.Errorf("The ACL Token SecretID field is immutable")
|
||||
}
|
||||
|
||||
token.CreateIndex = original.CreateIndex
|
||||
token.ModifyIndex = idx
|
||||
} else {
|
||||
token.CreateIndex = idx
|
||||
|
@ -389,7 +405,7 @@ func (s *Store) aclTokenGet(ws memdb.WatchSet, value, index string) (uint64, *st
|
|||
return idx, token, nil
|
||||
}
|
||||
|
||||
func (s *Store) ACLTokenBatchRead(ws memdb.WatchSet, accessors []string) (uint64, structs.ACLTokens, error) {
|
||||
func (s *Store) ACLTokenBatchGet(ws memdb.WatchSet, accessors []string) (uint64, structs.ACLTokens, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
|
@ -511,19 +527,19 @@ func (s *Store) ACLTokenListUpgradeable(max int) (structs.ACLTokens, <-chan stru
|
|||
return tokens, iter.WatchCh(), nil
|
||||
}
|
||||
|
||||
// ACLTokenDeleteSecret is used to remove an existing ACL from the state store. If
|
||||
// ACLTokenDeleteBySecret is used to remove an existing ACL from the state store. If
|
||||
// the ACL does not exist this is a no-op and no error is returned.
|
||||
func (s *Store) ACLTokenDeleteSecret(idx uint64, secret string) error {
|
||||
func (s *Store) ACLTokenDeleteBySecret(idx uint64, secret string) error {
|
||||
return s.aclTokenDelete(idx, secret, "id")
|
||||
}
|
||||
|
||||
// ACLTokenDeleteAccessor is used to remove an existing ACL from the state store. If
|
||||
// ACLTokenDeleteByAccessor is used to remove an existing ACL from the state store. If
|
||||
// the ACL does not exist this is a no-op and no error is returned.
|
||||
func (s *Store) ACLTokenDeleteAccessor(idx uint64, accessor string) error {
|
||||
func (s *Store) ACLTokenDeleteByAccessor(idx uint64, accessor string) error {
|
||||
return s.aclTokenDelete(idx, accessor, "accessor")
|
||||
}
|
||||
|
||||
func (s *Store) ACLTokensDelete(idx uint64, tokenIDs []string) error {
|
||||
func (s *Store) ACLTokenBatchDelete(idx uint64, tokenIDs []string) error {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
|
@ -541,7 +557,9 @@ func (s *Store) aclTokenDelete(idx uint64, value, index string) error {
|
|||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
s.aclTokenDeleteTxn(tx, idx, value, index)
|
||||
if err := s.aclTokenDeleteTxn(tx, idx, value, index); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tx.Commit()
|
||||
return nil
|
||||
|
@ -558,6 +576,10 @@ func (s *Store) aclTokenDeleteTxn(tx *memdb.Txn, idx uint64, value, index string
|
|||
return nil
|
||||
}
|
||||
|
||||
if token.(*structs.ACLToken).AccessorID == structs.ACLTokenAnonymousID {
|
||||
return fmt.Errorf("Deletion of the builtin anonymous token is not permitted")
|
||||
}
|
||||
|
||||
if err := tx.Delete("acl-tokens", token); err != nil {
|
||||
return fmt.Errorf("failed deleting acl token: %v", err)
|
||||
}
|
||||
|
@ -567,7 +589,7 @@ func (s *Store) aclTokenDeleteTxn(tx *memdb.Txn, idx uint64, value, index string
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) ACLPoliciesUpsert(idx uint64, policies structs.ACLPolicies) error {
|
||||
func (s *Store) ACLPolicyBatchSet(idx uint64, policies structs.ACLPolicies) error {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
|
@ -664,7 +686,7 @@ func (s *Store) ACLPolicyGetByName(ws memdb.WatchSet, name string) (uint64, *str
|
|||
return s.aclPolicyGet(ws, name, "name")
|
||||
}
|
||||
|
||||
func (s *Store) ACLPolicyBatchRead(ws memdb.WatchSet, ids []string) (uint64, structs.ACLPolicies, error) {
|
||||
func (s *Store) ACLPolicyBatchGet(ws memdb.WatchSet, ids []string) (uint64, structs.ACLPolicies, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
|
@ -713,19 +735,11 @@ func (s *Store) aclPolicyGet(ws memdb.WatchSet, value, index string) (uint64, *s
|
|||
return idx, policy, nil
|
||||
}
|
||||
|
||||
func (s *Store) ACLPolicyList(ws memdb.WatchSet, datacenter string) (uint64, structs.ACLPolicies, error) {
|
||||
func (s *Store) ACLPolicyList(ws memdb.WatchSet) (uint64, structs.ACLPolicies, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
var iter memdb.ResultIterator
|
||||
var err error
|
||||
|
||||
if datacenter != "" {
|
||||
iter, err = tx.Get("acl-policies", "datacenters", datacenter)
|
||||
} else {
|
||||
iter, err = tx.Get("acl-policies", "id")
|
||||
}
|
||||
|
||||
iter, err := tx.Get("acl-policies", "id")
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed acl policy lookup: %v", err)
|
||||
}
|
||||
|
@ -750,7 +764,7 @@ func (s *Store) ACLPolicyDeleteByName(idx uint64, name string) error {
|
|||
return s.aclPolicyDelete(idx, name, "name")
|
||||
}
|
||||
|
||||
func (s *Store) ACLPoliciesDelete(idx uint64, policyIDs []string) error {
|
||||
func (s *Store) ACLPolicyBatchDelete(idx uint64, policyIDs []string) error {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -284,15 +284,19 @@ func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Versio
|
|||
return true
|
||||
}
|
||||
|
||||
func ServersGetACLMode(members []serf.Member, leader string, datacenter string) (mode structs.ACLMode, leaderMode structs.ACLMode) {
|
||||
func ServersGetACLMode(members []serf.Member, leader string, datacenter string) (numServers int, mode structs.ACLMode, leaderMode structs.ACLMode) {
|
||||
numServers = 0
|
||||
mode = structs.ACLModeEnabled
|
||||
leaderMode = structs.ACLModeDisabled
|
||||
leaderMode = structs.ACLModeUnknown
|
||||
for _, member := range members {
|
||||
if valid, parts := metadata.IsConsulServer(member); valid {
|
||||
|
||||
if datacenter != "" && parts.Datacenter != datacenter {
|
||||
continue
|
||||
}
|
||||
|
||||
numServers += 1
|
||||
|
||||
if memberAddr := (&net.TCPAddr{IP: member.Addr, Port: parts.Port}).String(); memberAddr == leader {
|
||||
leaderMode = parts.ACLs
|
||||
}
|
||||
|
|
|
@ -499,27 +499,27 @@ type ACLReplicationStatus struct {
|
|||
LastError time.Time
|
||||
}
|
||||
|
||||
// ACLTokenUpsertRequest is used for token creation and update operations
|
||||
// ACLTokenSetRequest is used for token creation and update operations
|
||||
// at the RPC layer
|
||||
type ACLTokenUpsertRequest struct {
|
||||
type ACLTokenSetRequest struct {
|
||||
ACLToken ACLToken // Token to manipulate - I really dislike this name but "Token" is taken in the WriteRequest
|
||||
Datacenter string // The datacenter to perform the request within
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
func (r *ACLTokenUpsertRequest) RequestDatacenter() string {
|
||||
func (r *ACLTokenSetRequest) RequestDatacenter() string {
|
||||
return r.Datacenter
|
||||
}
|
||||
|
||||
// ACLTokenReadRequest is used for token read operations at the RPC layer
|
||||
type ACLTokenReadRequest struct {
|
||||
// ACLTokenGetRequest is used for token read operations at the RPC layer
|
||||
type ACLTokenGetRequest struct {
|
||||
TokenID string // id used for the token lookup
|
||||
TokenIDType ACLTokenIDType // The Type of ID used to lookup the token
|
||||
Datacenter string // The datacenter to perform the request within
|
||||
QueryOptions
|
||||
}
|
||||
|
||||
func (r *ACLTokenReadRequest) RequestDatacenter() string {
|
||||
func (r *ACLTokenGetRequest) RequestDatacenter() string {
|
||||
return r.Datacenter
|
||||
}
|
||||
|
||||
|
@ -554,27 +554,27 @@ type ACLTokenListResponse struct {
|
|||
QueryMeta
|
||||
}
|
||||
|
||||
// ACLTokenBatchReadRequest is used for reading multiple tokens, this is
|
||||
// ACLTokenBatchGetRequest is used for reading multiple tokens, this is
|
||||
// different from the the token list request in that only tokens with the
|
||||
// the requested ids are returned
|
||||
type ACLTokenBatchReadRequest struct {
|
||||
type ACLTokenBatchGetRequest struct {
|
||||
AccessorIDs []string // List of accessor ids to fetch
|
||||
Datacenter string // The datacenter to perform the request within
|
||||
QueryOptions
|
||||
}
|
||||
|
||||
func (r *ACLTokenBatchReadRequest) RequestDatacenter() string {
|
||||
func (r *ACLTokenBatchGetRequest) RequestDatacenter() string {
|
||||
return r.Datacenter
|
||||
}
|
||||
|
||||
// ACLTokenBatchUpsertRequest is used only at the Raft layer
|
||||
// ACLTokenBatchSetRequest is used only at the Raft layer
|
||||
// for batching multiple token creation/update operations
|
||||
//
|
||||
// This is particularly useful during token replication and during
|
||||
// automatic legacy token upgrades.
|
||||
type ACLTokenBatchUpsertRequest struct {
|
||||
Tokens ACLTokens
|
||||
AllowCreate bool
|
||||
type ACLTokenBatchSetRequest struct {
|
||||
Tokens ACLTokens
|
||||
CAS bool
|
||||
}
|
||||
|
||||
// ACLTokenBatchDeleteRequest is used only at the Raft layer
|
||||
|
@ -603,20 +603,20 @@ type ACLTokenResponse struct {
|
|||
QueryMeta
|
||||
}
|
||||
|
||||
// ACLTokensResponse returns multiple Tokens associated with the same metadata
|
||||
type ACLTokensResponse struct {
|
||||
// ACLTokenBatchResponse returns multiple Tokens associated with the same metadata
|
||||
type ACLTokenBatchResponse struct {
|
||||
Tokens []*ACLToken
|
||||
QueryMeta
|
||||
}
|
||||
|
||||
// ACLPolicyUpsertRequest is used at the RPC layer for creation and update requests
|
||||
type ACLPolicyUpsertRequest struct {
|
||||
// ACLPolicySetRequest is used at the RPC layer for creation and update requests
|
||||
type ACLPolicySetRequest struct {
|
||||
Policy ACLPolicy // The policy to upsert
|
||||
Datacenter string // The datacenter to perform the request within
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
func (r *ACLPolicyUpsertRequest) RequestDatacenter() string {
|
||||
func (r *ACLPolicySetRequest) RequestDatacenter() string {
|
||||
return r.Datacenter
|
||||
}
|
||||
|
||||
|
@ -631,20 +631,19 @@ func (r *ACLPolicyDeleteRequest) RequestDatacenter() string {
|
|||
return r.Datacenter
|
||||
}
|
||||
|
||||
// ACLPolicyReadRequest is used at the RPC layer to perform policy read operations
|
||||
type ACLPolicyReadRequest struct {
|
||||
// ACLPolicyGetRequest is used at the RPC layer to perform policy read operations
|
||||
type ACLPolicyGetRequest struct {
|
||||
PolicyID string // id used for the policy lookup
|
||||
Datacenter string // The datacenter to perform the request within
|
||||
QueryOptions
|
||||
}
|
||||
|
||||
func (r *ACLPolicyReadRequest) RequestDatacenter() string {
|
||||
func (r *ACLPolicyGetRequest) RequestDatacenter() string {
|
||||
return r.Datacenter
|
||||
}
|
||||
|
||||
// ACLPolicyListRequest is used at the RPC layer to request a listing of policies
|
||||
type ACLPolicyListRequest struct {
|
||||
DCScope string
|
||||
Datacenter string // The datacenter to perform the request within
|
||||
QueryOptions
|
||||
}
|
||||
|
@ -658,15 +657,15 @@ type ACLPolicyListResponse struct {
|
|||
QueryMeta
|
||||
}
|
||||
|
||||
// ACLPolicyBatchReadRequest is used at the RPC layer to request a subset of
|
||||
// ACLPolicyBatchGetRequest is used at the RPC layer to request a subset of
|
||||
// the policies associated with the token used for retrieval
|
||||
type ACLPolicyBatchReadRequest struct {
|
||||
type ACLPolicyBatchGetRequest struct {
|
||||
PolicyIDs []string // List of policy ids to fetch
|
||||
Datacenter string // The datacenter to perform the request within
|
||||
QueryOptions
|
||||
}
|
||||
|
||||
func (r *ACLPolicyBatchReadRequest) RequestDatacenter() string {
|
||||
func (r *ACLPolicyBatchGetRequest) RequestDatacenter() string {
|
||||
return r.Datacenter
|
||||
}
|
||||
|
||||
|
@ -676,16 +675,16 @@ type ACLPolicyResponse struct {
|
|||
QueryMeta
|
||||
}
|
||||
|
||||
type ACLPoliciesResponse struct {
|
||||
type ACLPolicyBatchResponse struct {
|
||||
Policies []*ACLPolicy
|
||||
QueryMeta
|
||||
}
|
||||
|
||||
// ACLPolicyBatchUpsertRequest is used at the Raft layer for batching
|
||||
// ACLPolicyBatchSetRequest is used at the Raft layer for batching
|
||||
// multiple policy creations and updates
|
||||
//
|
||||
// This is particularly useful during replication
|
||||
type ACLPolicyBatchUpsertRequest struct {
|
||||
type ACLPolicyBatchSetRequest struct {
|
||||
Policies ACLPolicies
|
||||
}
|
||||
|
||||
|
|
|
@ -48,9 +48,9 @@ const (
|
|||
ConnectCAProviderStateType = 14
|
||||
ConnectCAConfigType = 15 // FSM snapshots only.
|
||||
IndexRequestType = 16 // FSM snapshots only.
|
||||
ACLTokenUpsertRequestType = 17
|
||||
ACLTokenSetRequestType = 17
|
||||
ACLTokenDeleteRequestType = 18
|
||||
ACLPolicyUpsertRequestType = 19
|
||||
ACLPolicySetRequestType = 19
|
||||
ACLPolicyDeleteRequestType = 20
|
||||
)
|
||||
|
||||
|
|
|
@ -56,9 +56,9 @@ func TestStructs_Implements(t *testing.T) {
|
|||
_ RPCInfo = &SessionSpecificRequest{}
|
||||
_ RPCInfo = &EventFireRequest{}
|
||||
_ RPCInfo = &ACLPolicyResolveLegacyRequest{}
|
||||
_ RPCInfo = &ACLPolicyBatchReadRequest{}
|
||||
_ RPCInfo = &ACLPolicyReadRequest{}
|
||||
_ RPCInfo = &ACLTokenReadRequest{}
|
||||
_ RPCInfo = &ACLPolicyBatchGetRequest{}
|
||||
_ RPCInfo = &ACLPolicyGetRequest{}
|
||||
_ RPCInfo = &ACLTokenGetRequest{}
|
||||
_ RPCInfo = &KeyringRequest{}
|
||||
_ CompoundResponse = &KeyringResponses{}
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue