Handle ACL errors consistently when blocking query timeout is reached. (#20876)

Currently, when a client starts a blocking query and an ACL token expires within
that time, Consul will return ACL not found error with a 403 status code. However,
sometimes if an ACL token is invalidated at the same time as the query's deadline is reached,
Consul will instead return an empty response with a 200 status code.

This is because of the events being executed.
1. Client issues a blocking query request with timeout `t`.
2. ACL is deleted.
3. Server detects a change in ACLs and force closes the gRPC stream.
4. Client resubscribes with the same token and resets its state (view).
5. Client sees "ACL not found" error.

If ACL is deleted before step 4, the client is unaware that the stream was closed due to
an ACL error and will return an empty view (from the reset state) with the 200 status code.

To fix this problem, we introduce another state to the subsciption to indicate when a change
to ACLs has occured. If the server sees that there was an error due to ACL change, it will
re-authenticate the request and return an error if the token is no longer valid.

Fixes #20790
This commit is contained in:
Iryna Shustava 2024-03-22 14:59:54 -06:00 committed by GitHub
parent 12fd9db45d
commit d747b51dab
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 92 additions and 15 deletions

3
.changelog/20876.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
streaming: Handle ACL errors consistently when blocking query timeout is reached.
```

View File

@ -72,7 +72,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
// Ensure the reset event was sent. // Ensure the reset event was sent.
err = assertErr(t, eventCh) err = assertErr(t, eventCh)
require.Equal(t, stream.ErrSubForceClosed, err) require.Equal(t, stream.ErrACLChanged, err)
// Register another subscription. // Register another subscription.
subscription2 := &stream.SubscribeRequest{ subscription2 := &stream.SubscribeRequest{
@ -101,7 +101,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
// Ensure the reset event was sent. // Ensure the reset event was sent.
err = assertErr(t, eventCh2) err = assertErr(t, eventCh2)
require.Equal(t, stream.ErrSubForceClosed, err) require.Equal(t, stream.ErrACLChanged, err)
} }
func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
@ -191,7 +191,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
// Ensure the reload event was sent. // Ensure the reload event was sent.
err = assertErr(t, eventCh) err = assertErr(t, eventCh)
require.Equal(t, stream.ErrSubForceClosed, err) require.Equal(t, stream.ErrACLChanged, err)
// Register another subscription. // Register another subscription.
subscription3 := &stream.SubscribeRequest{ subscription3 := &stream.SubscribeRequest{
@ -381,7 +381,7 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
} }
} }
require.Error(t, next.Err) require.Error(t, next.Err)
require.Equal(t, stream.ErrSubForceClosed, next.Err) require.Equal(t, stream.ErrACLChanged, next.Err)
return return
case <-time.After(100 * time.Millisecond): case <-time.After(100 * time.Millisecond):
t.Fatalf("no err after 100ms") t.Fatalf("no err after 100ms")

View File

@ -393,7 +393,7 @@ func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
for _, secretID := range tokenSecretIDs { for _, secretID := range tokenSecretIDs {
if subs, ok := s.byToken[secretID]; ok { if subs, ok := s.byToken[secretID]; ok {
for _, sub := range subs { for _, sub := range subs {
sub.forceClose() sub.closeACLChanged()
} }
} }
} }

View File

@ -25,9 +25,13 @@ const (
// will not return new events. // will not return new events.
subStateUnsub = 2 subStateUnsub = 2
// subStateShutting down indicates the subscription was closed due to // subStateShuttingDown indicates the subscription was closed due to
// the server being shut down. // the server being shut down.
subStateShuttingDown = 3 subStateShuttingDown = 3
// subStateACLChanged indicates the subscription was closed due to
// a change in ACLs.
subStateACLChanged = 4
) )
// ErrSubForceClosed is a error signalling the subscription has been // ErrSubForceClosed is a error signalling the subscription has been
@ -39,6 +43,12 @@ var ErrSubForceClosed = errors.New("subscription closed by server, client must r
// subscribe to a different server to get streaming event updates. // subscribe to a different server to get streaming event updates.
var ErrShuttingDown = errors.New("subscription closed by server, server is shutting down") var ErrShuttingDown = errors.New("subscription closed by server, server is shutting down")
// ErrACLChanged is an error to signal that the subscription has
// been closed because a change in ACL token or its associated roles or policies has occurred.
// If the token or policy is no longer valid, the client should resubscribe using a valid token. Otherwise,
// the client should resubscribe using the same token.
var ErrACLChanged = errors.New("subscription closed by server, ACL change occurred")
// Subscription provides events on a Topic. Events may be filtered by Key. // Subscription provides events on a Topic. Events may be filtered by Key.
// Events are returned by Next(), and may start with a Snapshot of events. // Events are returned by Next(), and may start with a Snapshot of events.
type Subscription struct { type Subscription struct {
@ -131,6 +141,8 @@ func (s *Subscription) requireStateOpen() error {
return ErrSubForceClosed return ErrSubForceClosed
case subStateShuttingDown: case subStateShuttingDown:
return ErrShuttingDown return ErrShuttingDown
case subStateACLChanged:
return ErrACLChanged
case subStateUnsub: case subStateUnsub:
return fmt.Errorf("subscription was closed by unsubscribe") return fmt.Errorf("subscription was closed by unsubscribe")
default: default:
@ -166,6 +178,14 @@ func (s *Subscription) shutDown() {
} }
} }
// Close the subscription and indicate that an ACL change occurred. This change may require
// a client to subscribe with a new token or re-subscribe with an existing token.
func (s *Subscription) closeACLChanged() {
if atomic.CompareAndSwapUint32(&s.state, subStateOpen, subStateACLChanged) {
close(s.closed)
}
}
// Unsubscribe the subscription, freeing resources. // Unsubscribe the subscription, freeing resources.
func (s *Subscription) Unsubscribe() { func (s *Subscription) Unsubscribe() {
if atomic.CompareAndSwapUint32(&s.state, subStateOpen, subStateUnsub) { if atomic.CompareAndSwapUint32(&s.state, subStateOpen, subStateUnsub) {

View File

@ -47,7 +47,7 @@ func (s *Server) WatchRoots(_ *pbconnectca.WatchRootsRequest, serverStream pbcon
for { for {
var err error var err error
idx, err = s.serveRoots(options.Token, idx, serverStream, logger) idx, err = s.serveRoots(options.Token, idx, serverStream, logger)
if errors.Is(err, stream.ErrSubForceClosed) { if errors.Is(err, stream.ErrSubForceClosed) || errors.Is(err, stream.ErrACLChanged) {
logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt to re-auth and resume") logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt to re-auth and resume")
} else { } else {
return err return err
@ -90,7 +90,7 @@ func (s *Server) serveRoots(
for { for {
event, err := sub.Next(serverStream.Context()) event, err := sub.Next(serverStream.Context())
switch { switch {
case errors.Is(err, stream.ErrSubForceClosed): case errors.Is(err, stream.ErrSubForceClosed) || errors.Is(err, stream.ErrACLChanged):
// If the subscription was closed because the state store was abandoned (e.g. // If the subscription was closed because the state store was abandoned (e.g.
// following a snapshot restore) reset idx to ensure we don't skip over the // following a snapshot restore) reset idx to ensure we don't skip over the
// new store's events. // new store's events.

View File

@ -40,7 +40,7 @@ func (s *Server) WatchServers(req *pbserverdiscovery.WatchServersRequest, server
for { for {
var err error var err error
idx, err = s.serveReadyServers(options.Token, idx, req, serverStream, logger) idx, err = s.serveReadyServers(options.Token, idx, req, serverStream, logger)
if errors.Is(err, stream.ErrSubForceClosed) { if errors.Is(err, stream.ErrSubForceClosed) || errors.Is(err, stream.ErrACLChanged) {
logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt to re-auth and resume") logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt to re-auth and resume")
} else { } else {
return err return err
@ -69,7 +69,7 @@ func (s *Server) serveReadyServers(token string, index uint64, req *pbserverdisc
for { for {
event, err := sub.Next(serverStream.Context()) event, err := sub.Next(serverStream.Context())
switch { switch {
case errors.Is(err, stream.ErrSubForceClosed): case errors.Is(err, stream.ErrSubForceClosed) || errors.Is(err, stream.ErrACLChanged):
return index, err return index, err
case errors.Is(err, context.Canceled): case errors.Is(err, context.Canceled):
return 0, nil return 0, nil

View File

@ -77,6 +77,14 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
case errors.Is(err, stream.ErrSubForceClosed): case errors.Is(err, stream.ErrSubForceClosed):
logger.Trace("subscription reset by server") logger.Trace("subscription reset by server")
return status.Error(codes.Aborted, err.Error()) return status.Error(codes.Aborted, err.Error())
case errors.Is(err, stream.ErrACLChanged):
logger.Trace("ACL change occurred; re-authenticating")
_, authzErr := h.Backend.ResolveTokenAndDefaultMeta(req.Token, &entMeta, nil)
if authzErr != nil {
return authzErr
}
// Otherwise, abort the stream so the client re-subscribes.
return status.Error(codes.Aborted, err.Error())
case err != nil: case err != nil:
return err return err
} }

View File

@ -6,6 +6,7 @@ package subscribe
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"io" "io"
"net" "net"
"testing" "testing"
@ -323,17 +324,21 @@ func getEvent(t *testing.T, ch chan eventOrError) *pbsubscribe.Event {
} }
type testBackend struct { type testBackend struct {
publisher *stream.EventPublisher publisher *stream.EventPublisher
store *state.Store store *state.Store
authorizer func(token string, entMeta *acl.EnterpriseMeta) acl.Authorizer authorizer func(token string, entMeta *acl.EnterpriseMeta) acl.Authorizer
forwardConn *gogrpc.ClientConn resolveTokenAndDefaultMeta func(token string, entMeta *acl.EnterpriseMeta, _ *acl.AuthorizerContext) (acl.Authorizer, error)
forwardConn *gogrpc.ClientConn
} }
func (b testBackend) ResolveTokenAndDefaultMeta( func (b testBackend) ResolveTokenAndDefaultMeta(
token string, token string,
entMeta *acl.EnterpriseMeta, entMeta *acl.EnterpriseMeta,
_ *acl.AuthorizerContext, authCtx *acl.AuthorizerContext,
) (acl.Authorizer, error) { ) (acl.Authorizer, error) {
if b.resolveTokenAndDefaultMeta != nil {
return b.resolveTokenAndDefaultMeta(token, entMeta, authCtx)
}
return b.authorizer(token, entMeta), nil return b.authorizer(token, entMeta), nil
} }
@ -986,6 +991,47 @@ node "node1" {
t.Fatalf("timeout waiting for aborted error") t.Fatalf("timeout waiting for aborted error")
} }
}) })
// Re-subscribe because the previous test step terminated the stream.
chEvents = make(chan eventOrError, 0)
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth,
Subject: &pbsubscribe.SubscribeRequest_NamedSubject{
NamedSubject: &pbsubscribe.NamedSubject{
Key: "foo",
},
},
Token: token,
})
require.NoError(t, err)
go recvEvents(chEvents, streamHandle)
// Stub out token authn function so that the token is no longer considered valid.
backend.resolveTokenAndDefaultMeta = func(t string, entMeta *acl.EnterpriseMeta, _ *acl.AuthorizerContext) (acl.Authorizer, error) {
return nil, fmt.Errorf("ACL not found")
}
testutil.RunStep(t, "invalid token should return an error", func(t *testing.T) {
// Force another ACL update.
tokenID, err := uuid.GenerateUUID()
require.NoError(t, err)
aclToken := &structs.ACLToken{
AccessorID: tokenID,
SecretID: token,
}
require.NoError(t, backend.store.ACLTokenSet(ids.Next("update"), aclToken))
select {
case item := <-chEvents:
require.Error(t, item.err, "got event instead of an error: %v", item.event)
require.EqualError(t, item.err, "rpc error: code = Unknown desc = ACL not found")
case <-time.After(2 * time.Second):
t.Fatalf("timeout waiting for ACL not found error")
}
})
} }
func assertNoEvents(t *testing.T, chEvents chan eventOrError) { func assertNoEvents(t *testing.T, chEvents chan eventOrError) {