mirror of https://github.com/status-im/consul.git
Fix several ACL token/policy resolution issues. (#5246)
* Fix 2 remote ACL policy resolution issues 1 - Use the right method to fire async not found errors when the ACL.PolicyResolve RPC returns that error. This was previously accidentally firing a token result instead of a policy result which would have effectively done nothing (unless there happened to be a token with a secret id == the policy id being resolved. 2. When concurrent policy resolution is being done we single flight the requests. The bug before was that for the policy resolution that was going to piggy back on anothers RPC results it wasn’t waiting long enough for the results to come back due to looping with the wrong variable. * Fix a handful of other edge case ACL scenarios The main issue was that token specific issues (not able to access a particular policy or the token being deleted after initial fetching) were poisoning the policy cache. A second issue was that for concurrent token resolutions, the first resolution to get started would go fetch all the policies. If before the policies were retrieved a second resolution request came in, the new request would register watchers for those policies but then never block waiting for them to complete. This resulted in using the default policy when it shouldn't have.
This commit is contained in:
parent
ef9f27cbc8
commit
579a8b32ed
|
@ -52,6 +52,10 @@ const (
|
|||
// aclModeCheckMaxInterval controls the maximum interval for how often the agent
|
||||
// checks if it should be using the new or legacy ACL system.
|
||||
aclModeCheckMaxInterval = 30 * time.Second
|
||||
|
||||
// Maximum number of re-resolution requests to be made if the token is modified between
|
||||
// resolving the token and resolving its policies that would remove one of its policies.
|
||||
tokenPolicyResolutionMaxRetries = 5
|
||||
)
|
||||
|
||||
func minTTL(a time.Duration, b time.Duration) time.Duration {
|
||||
|
@ -99,6 +103,15 @@ type remoteACLPolicyResult struct {
|
|||
err error
|
||||
}
|
||||
|
||||
type policyTokenError struct {
|
||||
Err error
|
||||
token string
|
||||
}
|
||||
|
||||
func (e policyTokenError) Error() string {
|
||||
return e.Err.Error()
|
||||
}
|
||||
|
||||
// ACLResolverConfig holds all the configuration necessary to create an ACLResolver
|
||||
type ACLResolverConfig struct {
|
||||
Config *Config
|
||||
|
@ -472,9 +485,11 @@ func (r *ACLResolver) resolveIdentityFromToken(token string) (structs.ACLIdentit
|
|||
}
|
||||
|
||||
// fireAsyncPolicyResult is used to notify all waiters that policy resolution is complete.
|
||||
func (r *ACLResolver) fireAsyncPolicyResult(policyID string, policy *structs.ACLPolicy, err error) {
|
||||
func (r *ACLResolver) fireAsyncPolicyResult(policyID string, policy *structs.ACLPolicy, err error, updateCache bool) {
|
||||
if updateCache {
|
||||
// cache the result: positive or negative
|
||||
r.cache.PutPolicy(policyID, policy)
|
||||
}
|
||||
|
||||
// get the list of channels to send the result to
|
||||
r.asyncPolicyResultsMutex.Lock()
|
||||
|
@ -505,22 +520,48 @@ func (r *ACLResolver) resolvePoliciesAsyncForIdentity(identity structs.ACLIdenti
|
|||
err := r.delegate.RPC("ACL.PolicyResolve", &req, &resp)
|
||||
if err == nil {
|
||||
for _, policy := range resp.Policies {
|
||||
r.fireAsyncPolicyResult(policy.ID, policy, nil)
|
||||
r.fireAsyncPolicyResult(policy.ID, policy, nil, true)
|
||||
found[policy.ID] = struct{}{}
|
||||
}
|
||||
|
||||
for _, policyID := range policyIDs {
|
||||
if _, ok := found[policyID]; !ok {
|
||||
r.fireAsyncPolicyResult(policyID, nil, acl.ErrNotFound)
|
||||
r.fireAsyncPolicyResult(policyID, nil, acl.ErrNotFound, true)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if acl.IsErrNotFound(err) {
|
||||
// make sure to indicate that this identity is no longer valid within
|
||||
// the cache
|
||||
//
|
||||
// Note - This must be done before firing the results or else it would
|
||||
// be possible for waiters to get woken up an get the cached identity
|
||||
// again
|
||||
r.cache.PutIdentity(identity.SecretToken(), nil)
|
||||
for _, policyID := range policyIDs {
|
||||
// Make sure to remove from the cache if it was deleted
|
||||
r.fireAsyncTokenResult(policyID, nil, acl.ErrNotFound)
|
||||
// Do not touch the cache. Getting a top level ACL not found error
|
||||
// only indicates that the secret token used in the request
|
||||
// no longer exists
|
||||
r.fireAsyncPolicyResult(policyID, nil, &policyTokenError{acl.ErrNotFound, identity.SecretToken()}, false)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if acl.IsErrPermissionDenied(err) {
|
||||
// invalidate our ID cache so that identity resolution will take place
|
||||
// again in the future
|
||||
//
|
||||
// Note - This must be done before firing the results or else it would
|
||||
// be possible for waiters to get woken up and get the cached identity
|
||||
// again
|
||||
r.cache.RemoveIdentity(identity.SecretToken())
|
||||
|
||||
for _, policyID := range policyIDs {
|
||||
// Do not remove from the cache for permission denied
|
||||
// what this does indicate is that our view of the token is out of date
|
||||
r.fireAsyncPolicyResult(policyID, nil, &policyTokenError{acl.ErrPermissionDenied, identity.SecretToken()}, false)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@ -530,9 +571,9 @@ func (r *ACLResolver) resolvePoliciesAsyncForIdentity(identity structs.ACLIdenti
|
|||
extendCache := r.config.ACLDownPolicy == "extend-cache" || r.config.ACLDownPolicy == "async-cache"
|
||||
for _, policyID := range policyIDs {
|
||||
if entry, ok := cached[policyID]; extendCache && ok {
|
||||
r.fireAsyncPolicyResult(policyID, entry.Policy, nil)
|
||||
r.fireAsyncPolicyResult(policyID, entry.Policy, nil, true)
|
||||
} else {
|
||||
r.fireAsyncPolicyResult(policyID, nil, ACLRemoteError{Err: err})
|
||||
r.fireAsyncPolicyResult(policyID, nil, ACLRemoteError{Err: err}, true)
|
||||
}
|
||||
}
|
||||
return
|
||||
|
@ -659,12 +700,26 @@ func (r *ACLResolver) resolvePoliciesForIdentity(identity structs.ACLIdentity) (
|
|||
return r.filterPoliciesByScope(policies), nil
|
||||
}
|
||||
|
||||
for i := 0; i < len(newAsyncFetchIds); i++ {
|
||||
for i := 0; i < len(fetchIDs); i++ {
|
||||
res := <-waitChan
|
||||
|
||||
if res.err != nil {
|
||||
if _, ok := res.err.(*policyTokenError); ok {
|
||||
// always return token errors
|
||||
return nil, res.err
|
||||
} else if !acl.IsErrNotFound(res.err) {
|
||||
// ignore regular not found errors for policies
|
||||
return nil, res.err
|
||||
}
|
||||
}
|
||||
|
||||
// we probably could handle a special case where we
|
||||
// get a permission denied error due to another requests
|
||||
// issues and spawn the go routine to resolve it ourselves.
|
||||
// however this should be exceedingly rare and in this case
|
||||
// we can just kick the can down the road and retry the whole
|
||||
// token/policy resolution. All the remaining good bits that
|
||||
// we need will already be cached anyways.
|
||||
|
||||
if res.policy != nil {
|
||||
policies = append(policies, res.policy)
|
||||
|
@ -675,16 +730,45 @@ func (r *ACLResolver) resolvePoliciesForIdentity(identity structs.ACLIdentity) (
|
|||
}
|
||||
|
||||
func (r *ACLResolver) resolveTokenToPolicies(token string) (structs.ACLPolicies, error) {
|
||||
_, policies, err := r.resolveTokenToIdentityAndPolicies(token)
|
||||
return policies, err
|
||||
}
|
||||
|
||||
func (r *ACLResolver) resolveTokenToIdentityAndPolicies(token string) (structs.ACLIdentity, structs.ACLPolicies, error) {
|
||||
var lastErr error
|
||||
var lastIdentity structs.ACLIdentity
|
||||
|
||||
for i := 0; i < tokenPolicyResolutionMaxRetries; i++ {
|
||||
// Resolve the token to an ACLIdentity
|
||||
identity, err := r.resolveIdentityFromToken(token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
} else if identity == nil {
|
||||
return nil, acl.ErrNotFound
|
||||
return nil, nil, acl.ErrNotFound
|
||||
}
|
||||
|
||||
// Resolve the ACLIdentity to ACLPolicies
|
||||
return r.resolvePoliciesForIdentity(identity)
|
||||
lastIdentity = identity
|
||||
|
||||
policies, err := r.resolvePoliciesForIdentity(identity)
|
||||
if err == nil {
|
||||
return identity, policies, nil
|
||||
}
|
||||
lastErr = err
|
||||
|
||||
if tokenErr, ok := err.(*policyTokenError); ok {
|
||||
if acl.IsErrNotFound(err) && tokenErr.token == identity.SecretToken() {
|
||||
// token was deleted while resolving policies
|
||||
return nil, nil, acl.ErrNotFound
|
||||
}
|
||||
|
||||
// other types of policyTokenErrors should cause retrying the whole token
|
||||
// resolution process
|
||||
} else {
|
||||
return identity, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return lastIdentity, nil, lastErr
|
||||
}
|
||||
|
||||
func (r *ACLResolver) disableACLsWhenUpstreamDisabled(err error) error {
|
||||
|
|
|
@ -870,21 +870,32 @@ func (a *ACL) PolicyResolve(args *structs.ACLPolicyBatchGetRequest, reply *struc
|
|||
}
|
||||
|
||||
// get full list of policies for this token
|
||||
policies, err := a.srv.acls.resolveTokenToPolicies(args.Token)
|
||||
identity, policies, err := a.srv.acls.resolveTokenToIdentityAndPolicies(args.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
idMap := make(map[string]*structs.ACLPolicy)
|
||||
for _, policyID := range identity.PolicyIDs() {
|
||||
idMap[policyID] = nil
|
||||
}
|
||||
for _, policy := range policies {
|
||||
idMap[policy.ID] = policy
|
||||
}
|
||||
|
||||
for _, policyID := range args.PolicyIDs {
|
||||
if policy, ok := idMap[policyID]; ok {
|
||||
// only add non-deleted policies
|
||||
if policy != nil {
|
||||
reply.Policies = append(reply.Policies, policy)
|
||||
}
|
||||
} else {
|
||||
// send a permission denied to indicate that the request included
|
||||
// policy ids not associated with this token
|
||||
return acl.ErrPermissionDenied
|
||||
}
|
||||
}
|
||||
|
||||
a.srv.setQueryMeta(&reply.QueryMeta)
|
||||
|
||||
return nil
|
||||
|
|
|
@ -6,7 +6,9 @@ import (
|
|||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
|
@ -33,6 +35,16 @@ key_prefix "foo/" {
|
|||
}
|
||||
`
|
||||
|
||||
type asyncResolutionResult struct {
|
||||
authz acl.Authorizer
|
||||
err error
|
||||
}
|
||||
|
||||
func resolveTokenAsync(r *ACLResolver, token string, ch chan *asyncResolutionResult) {
|
||||
authz, err := r.ResolveToken(token)
|
||||
ch <- &asyncResolutionResult{authz: authz, err: err}
|
||||
}
|
||||
|
||||
func testIdentityForToken(token string) (bool, structs.ACLIdentity, error) {
|
||||
switch token {
|
||||
case "missing-policy":
|
||||
|
@ -94,6 +106,55 @@ func testIdentityForToken(token string) (bool, structs.ACLIdentity, error) {
|
|||
},
|
||||
},
|
||||
}, nil
|
||||
case "racey-unmodified":
|
||||
return true, &structs.ACLToken{
|
||||
AccessorID: "5f57c1f6-6a89-4186-9445-531b316e01df",
|
||||
SecretID: "a1a54629-5050-4d17-8a4e-560d2423f835",
|
||||
Policies: []structs.ACLTokenPolicyLink{
|
||||
structs.ACLTokenPolicyLink{
|
||||
ID: "node-wr",
|
||||
},
|
||||
structs.ACLTokenPolicyLink{
|
||||
ID: "acl-wr",
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
case "racey-modified":
|
||||
return true, &structs.ACLToken{
|
||||
AccessorID: "5f57c1f6-6a89-4186-9445-531b316e01df",
|
||||
SecretID: "a1a54629-5050-4d17-8a4e-560d2423f835",
|
||||
Policies: []structs.ACLTokenPolicyLink{
|
||||
structs.ACLTokenPolicyLink{
|
||||
ID: "node-wr",
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
case "concurrent-resolve-1":
|
||||
return true, &structs.ACLToken{
|
||||
AccessorID: "5f57c1f6-6a89-4186-9445-531b316e01df",
|
||||
SecretID: "a1a54629-5050-4d17-8a4e-560d2423f835",
|
||||
Policies: []structs.ACLTokenPolicyLink{
|
||||
structs.ACLTokenPolicyLink{
|
||||
ID: "node-wr",
|
||||
},
|
||||
structs.ACLTokenPolicyLink{
|
||||
ID: "acl-wr",
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
case "concurrent-resolve-2":
|
||||
return true, &structs.ACLToken{
|
||||
AccessorID: "296bbe10-01aa-437e-ac3b-3ecdc00ea65c",
|
||||
SecretID: "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7",
|
||||
Policies: []structs.ACLTokenPolicyLink{
|
||||
structs.ACLTokenPolicyLink{
|
||||
ID: "node-wr",
|
||||
},
|
||||
structs.ACLTokenPolicyLink{
|
||||
ID: "acl-wr",
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
case anonymousToken:
|
||||
return true, &structs.ACLToken{
|
||||
AccessorID: "00000000-0000-0000-0000-000000000002",
|
||||
|
@ -657,6 +718,414 @@ func TestACLResolver_DatacenterScoping(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestACLResolver_Client(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("Racey-Token-Mod-Policy-Resolve", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
var tokenReads int32
|
||||
var policyResolves int32
|
||||
modified := false
|
||||
deleted := false
|
||||
delegate := &ACLResolverTestDelegate{
|
||||
enabled: true,
|
||||
datacenter: "dc1",
|
||||
legacy: false,
|
||||
localTokens: false,
|
||||
localPolicies: false,
|
||||
tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
|
||||
atomic.AddInt32(&tokenReads, 1)
|
||||
if deleted {
|
||||
return acl.ErrNotFound
|
||||
} else if modified {
|
||||
_, token, _ := testIdentityForToken("racey-modified")
|
||||
reply.Token = token.(*structs.ACLToken)
|
||||
} else {
|
||||
_, token, _ := testIdentityForToken("racey-unmodified")
|
||||
reply.Token = token.(*structs.ACLToken)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
|
||||
atomic.AddInt32(&policyResolves, 1)
|
||||
if deleted {
|
||||
return acl.ErrNotFound
|
||||
} else if !modified {
|
||||
modified = true
|
||||
return acl.ErrPermissionDenied
|
||||
} else {
|
||||
deleted = true
|
||||
for _, policyID := range args.PolicyIDs {
|
||||
_, policy, _ := testPolicyForID(policyID)
|
||||
if policy != nil {
|
||||
reply.Policies = append(reply.Policies, policy)
|
||||
}
|
||||
}
|
||||
|
||||
modified = true
|
||||
return nil
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
r := newTestACLResolver(t, delegate, func(config *ACLResolverConfig) {
|
||||
config.Config.ACLTokenTTL = 600 * time.Second
|
||||
config.Config.ACLPolicyTTL = 30 * time.Millisecond
|
||||
config.Config.ACLDownPolicy = "extend-cache"
|
||||
})
|
||||
|
||||
// resolves the token
|
||||
// gets a permission denied resolving the policies - token updated
|
||||
// invalidates the token
|
||||
// refetches the token
|
||||
// fetches the policies from the modified token
|
||||
// creates the authorizers
|
||||
//
|
||||
// Must use the token secret here in order for the cached identity
|
||||
// to be removed properly. Many other tests just resolve some other
|
||||
// random name and it wont matter but this one cannot.
|
||||
authz, err := r.ResolveToken("a1a54629-5050-4d17-8a4e-560d2423f835")
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, authz)
|
||||
require.True(t, authz.NodeWrite("foo", nil))
|
||||
require.False(t, authz.ACLRead())
|
||||
require.True(t, modified)
|
||||
require.True(t, deleted)
|
||||
require.Equal(t, int32(2), tokenReads)
|
||||
require.Equal(t, int32(2), policyResolves)
|
||||
|
||||
// sleep long enough for the policy cache to expire
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
// this round the identity will be resolved from the cache
|
||||
// then the policy will be resolved but resolution will return ACL not found
|
||||
// resolution will stop with the not found error (even though we still have the
|
||||
// policies within the cache)
|
||||
authz, err = r.ResolveToken("a1a54629-5050-4d17-8a4e-560d2423f835")
|
||||
require.EqualError(t, err, acl.ErrNotFound.Error())
|
||||
require.Nil(t, authz)
|
||||
|
||||
require.True(t, modified)
|
||||
require.True(t, deleted)
|
||||
require.Equal(t, tokenReads, int32(2))
|
||||
require.Equal(t, policyResolves, int32(3))
|
||||
})
|
||||
|
||||
t.Run("Concurrent-Token-Resolve", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var tokenReads int32
|
||||
var policyResolves int32
|
||||
readyCh := make(chan struct{})
|
||||
|
||||
delegate := &ACLResolverTestDelegate{
|
||||
enabled: true,
|
||||
datacenter: "dc1",
|
||||
legacy: false,
|
||||
localTokens: false,
|
||||
localPolicies: false,
|
||||
tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
|
||||
atomic.AddInt32(&tokenReads, 1)
|
||||
|
||||
switch args.TokenID {
|
||||
case "a1a54629-5050-4d17-8a4e-560d2423f835":
|
||||
_, token, _ := testIdentityForToken("concurrent-resolve-1")
|
||||
reply.Token = token.(*structs.ACLToken)
|
||||
default:
|
||||
return acl.ErrNotFound
|
||||
}
|
||||
|
||||
select {
|
||||
case <-readyCh:
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
return nil
|
||||
},
|
||||
policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
|
||||
atomic.AddInt32(&policyResolves, 1)
|
||||
for _, policyID := range args.PolicyIDs {
|
||||
_, policy, _ := testPolicyForID(policyID)
|
||||
if policy != nil {
|
||||
reply.Policies = append(reply.Policies, policy)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
r := newTestACLResolver(t, delegate, func(config *ACLResolverConfig) {
|
||||
// effectively disable caching - so the only way we end up with 1 token read is if they were
|
||||
// being resolved concurrently
|
||||
config.Config.ACLTokenTTL = 0 * time.Second
|
||||
config.Config.ACLPolicyTTL = 30 * time.Millisecond
|
||||
config.Config.ACLDownPolicy = "extend-cache"
|
||||
})
|
||||
|
||||
ch1 := make(chan *asyncResolutionResult)
|
||||
ch2 := make(chan *asyncResolutionResult)
|
||||
go resolveTokenAsync(r, "a1a54629-5050-4d17-8a4e-560d2423f835", ch1)
|
||||
go resolveTokenAsync(r, "a1a54629-5050-4d17-8a4e-560d2423f835", ch2)
|
||||
close(readyCh)
|
||||
|
||||
res1 := <-ch1
|
||||
res2 := <-ch2
|
||||
require.NoError(t, res1.err)
|
||||
require.NoError(t, res2.err)
|
||||
require.Equal(t, res1.authz, res2.authz)
|
||||
require.Equal(t, int32(1), tokenReads)
|
||||
require.Equal(t, int32(1), policyResolves)
|
||||
})
|
||||
|
||||
t.Run("Concurrent-Policy-Resolve", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var tokenReads int32
|
||||
var policyResolves int32
|
||||
delegate := &ACLResolverTestDelegate{
|
||||
enabled: true,
|
||||
datacenter: "dc1",
|
||||
legacy: false,
|
||||
localTokens: false,
|
||||
localPolicies: false,
|
||||
tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
|
||||
atomic.AddInt32(&tokenReads, 1)
|
||||
|
||||
switch args.TokenID {
|
||||
case "a1a54629-5050-4d17-8a4e-560d2423f835":
|
||||
_, token, _ := testIdentityForToken("concurrent-resolve-1")
|
||||
reply.Token = token.(*structs.ACLToken)
|
||||
case "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7":
|
||||
_, token, _ := testIdentityForToken("concurrent-resolve-2")
|
||||
reply.Token = token.(*structs.ACLToken)
|
||||
default:
|
||||
return acl.ErrNotFound
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
|
||||
atomic.AddInt32(&policyResolves, 1)
|
||||
// waits until both tokens have been read for up to 1 second
|
||||
for i := 0; i < 100; i++ {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
reads := atomic.LoadInt32(&tokenReads)
|
||||
if reads >= 2 {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
for _, policyID := range args.PolicyIDs {
|
||||
_, policy, _ := testPolicyForID(policyID)
|
||||
if policy != nil {
|
||||
reply.Policies = append(reply.Policies, policy)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
r := newTestACLResolver(t, delegate, func(config *ACLResolverConfig) {
|
||||
config.Config.ACLTokenTTL = 600 * time.Second
|
||||
// effectively disables the cache - therefore the only way we end up
|
||||
// with 1 policy resolution is if they get single flighted
|
||||
config.Config.ACLPolicyTTL = 0 * time.Millisecond
|
||||
config.Config.ACLDownPolicy = "extend-cache"
|
||||
})
|
||||
|
||||
ch1 := make(chan *asyncResolutionResult)
|
||||
ch2 := make(chan *asyncResolutionResult)
|
||||
|
||||
go resolveTokenAsync(r, "a1a54629-5050-4d17-8a4e-560d2423f835", ch1)
|
||||
go resolveTokenAsync(r, "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7", ch2)
|
||||
|
||||
res1 := <-ch1
|
||||
res2 := <-ch2
|
||||
|
||||
require.NoError(t, res1.err)
|
||||
require.NoError(t, res2.err)
|
||||
require.Equal(t, res1.authz, res2.authz)
|
||||
require.Equal(t, int32(2), tokenReads)
|
||||
require.Equal(t, int32(1), policyResolves)
|
||||
})
|
||||
|
||||
t.Run("Concurrent-Policy-Resolve-Permission-Denied", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var waitReady int32 = 1
|
||||
var tokenReads int32
|
||||
var policyResolves int32
|
||||
delegate := &ACLResolverTestDelegate{
|
||||
enabled: true,
|
||||
datacenter: "dc1",
|
||||
legacy: false,
|
||||
localTokens: false,
|
||||
localPolicies: false,
|
||||
tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
|
||||
atomic.AddInt32(&tokenReads, 1)
|
||||
|
||||
switch args.TokenID {
|
||||
case "a1a54629-5050-4d17-8a4e-560d2423f835":
|
||||
_, token, _ := testIdentityForToken("concurrent-resolve-1")
|
||||
reply.Token = token.(*structs.ACLToken)
|
||||
case "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7":
|
||||
_, token, _ := testIdentityForToken("concurrent-resolve-2")
|
||||
reply.Token = token.(*structs.ACLToken)
|
||||
default:
|
||||
return acl.ErrNotFound
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
|
||||
atomic.AddInt32(&policyResolves, 1)
|
||||
|
||||
if atomic.CompareAndSwapInt32(&waitReady, 1, 0) {
|
||||
// waits until both tokens have been read for up to 1 second
|
||||
for i := 0; i < 100; i++ {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
reads := atomic.LoadInt32(&tokenReads)
|
||||
if reads >= 2 {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return acl.ErrPermissionDenied
|
||||
}
|
||||
|
||||
for _, policyID := range args.PolicyIDs {
|
||||
_, policy, _ := testPolicyForID(policyID)
|
||||
if policy != nil {
|
||||
reply.Policies = append(reply.Policies, policy)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
r := newTestACLResolver(t, delegate, func(config *ACLResolverConfig) {
|
||||
config.Config.ACLTokenTTL = 600 * time.Second
|
||||
config.Config.ACLPolicyTTL = 600 * time.Second
|
||||
config.Config.ACLDownPolicy = "extend-cache"
|
||||
})
|
||||
|
||||
ch1 := make(chan *asyncResolutionResult)
|
||||
ch2 := make(chan *asyncResolutionResult)
|
||||
|
||||
go resolveTokenAsync(r, "a1a54629-5050-4d17-8a4e-560d2423f835", ch1)
|
||||
go resolveTokenAsync(r, "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7", ch2)
|
||||
|
||||
res1 := <-ch1
|
||||
res2 := <-ch2
|
||||
|
||||
require.NoError(t, res1.err)
|
||||
require.NoError(t, res2.err)
|
||||
require.Equal(t, res1.authz, res2.authz)
|
||||
// 2 reads for 1 token (cache gets invalidated and only 1 for the other)
|
||||
require.Equal(t, int32(3), tokenReads)
|
||||
require.Equal(t, int32(2), policyResolves)
|
||||
require.True(t, res1.authz.ACLRead())
|
||||
require.True(t, res1.authz.NodeWrite("foo", nil))
|
||||
})
|
||||
|
||||
t.Run("Concurrent-Policy-Resolve-Not-Found", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var waitReady int32 = 1
|
||||
var tokenReads int32
|
||||
var policyResolves int32
|
||||
var tokenNotAllowed string
|
||||
delegate := &ACLResolverTestDelegate{
|
||||
enabled: true,
|
||||
datacenter: "dc1",
|
||||
legacy: false,
|
||||
localTokens: false,
|
||||
localPolicies: false,
|
||||
tokenReadFn: func(args *structs.ACLTokenGetRequest, reply *structs.ACLTokenResponse) error {
|
||||
atomic.AddInt32(&tokenReads, 1)
|
||||
|
||||
switch args.TokenID {
|
||||
case "a1a54629-5050-4d17-8a4e-560d2423f835":
|
||||
_, token, _ := testIdentityForToken("concurrent-resolve-1")
|
||||
reply.Token = token.(*structs.ACLToken)
|
||||
case "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7":
|
||||
_, token, _ := testIdentityForToken("concurrent-resolve-2")
|
||||
reply.Token = token.(*structs.ACLToken)
|
||||
default:
|
||||
return acl.ErrNotFound
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
policyResolveFn: func(args *structs.ACLPolicyBatchGetRequest, reply *structs.ACLPolicyBatchResponse) error {
|
||||
atomic.AddInt32(&policyResolves, 1)
|
||||
|
||||
if atomic.CompareAndSwapInt32(&waitReady, 1, 0) {
|
||||
// waits until both tokens have been read for up to 1 second
|
||||
for i := 0; i < 100; i++ {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
reads := atomic.LoadInt32(&tokenReads)
|
||||
if reads >= 2 {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
tokenNotAllowed = args.Token
|
||||
return acl.ErrNotFound
|
||||
}
|
||||
|
||||
for _, policyID := range args.PolicyIDs {
|
||||
_, policy, _ := testPolicyForID(policyID)
|
||||
if policy != nil {
|
||||
reply.Policies = append(reply.Policies, policy)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
r := newTestACLResolver(t, delegate, func(config *ACLResolverConfig) {
|
||||
config.Config.ACLTokenTTL = 600 * time.Second
|
||||
config.Config.ACLPolicyTTL = 600 * time.Second
|
||||
config.Config.ACLDownPolicy = "extend-cache"
|
||||
})
|
||||
|
||||
ch1 := make(chan *asyncResolutionResult)
|
||||
ch2 := make(chan *asyncResolutionResult)
|
||||
|
||||
go resolveTokenAsync(r, "a1a54629-5050-4d17-8a4e-560d2423f835", ch1)
|
||||
go resolveTokenAsync(r, "cc58f0f3-2273-42a7-8b4a-2bef9d2863d7", ch2)
|
||||
|
||||
res1 := <-ch1
|
||||
res2 := <-ch2
|
||||
|
||||
var errResult *asyncResolutionResult
|
||||
var goodResult *asyncResolutionResult
|
||||
|
||||
// can't be sure which token resolution is going to be the one that does the first policy resolution
|
||||
// so we record it and then determine here how the results should be validated
|
||||
if tokenNotAllowed == "a1a54629-5050-4d17-8a4e-560d2423f835" {
|
||||
errResult = res1
|
||||
goodResult = res2
|
||||
} else {
|
||||
errResult = res2
|
||||
goodResult = res1
|
||||
}
|
||||
|
||||
require.Error(t, errResult.err)
|
||||
require.Nil(t, errResult.authz)
|
||||
require.EqualError(t, errResult.err, acl.ErrNotFound.Error())
|
||||
require.NoError(t, goodResult.err)
|
||||
require.Equal(t, int32(2), tokenReads)
|
||||
require.Equal(t, int32(2), policyResolves)
|
||||
require.NotNil(t, goodResult.authz)
|
||||
require.True(t, goodResult.authz.ACLRead())
|
||||
require.True(t, goodResult.authz.NodeWrite("foo", nil))
|
||||
})
|
||||
}
|
||||
|
||||
func TestACLResolver_LocalTokensAndPolicies(t *testing.T) {
|
||||
t.Parallel()
|
||||
delegate := &ACLResolverTestDelegate{
|
||||
|
|
Loading…
Reference in New Issue