From d747b51dab34308034222d7b4cd160fbcea0b93d Mon Sep 17 00:00:00 2001 From: Iryna Shustava Date: Fri, 22 Mar 2024 14:59:54 -0600 Subject: [PATCH] Handle ACL errors consistently when blocking query timeout is reached. (#20876) Currently, when a client starts a blocking query and an ACL token expires within that time, Consul will return an "ACL not found" error with a 403 status code. However, sometimes if an ACL token is invalidated at the same time as the query's deadline is reached, Consul will instead return an empty response with a 200 status code. This is because of the order in which the following events are executed. 1. Client issues a blocking query request with timeout `t`. 2. ACL is deleted. 3. Server detects a change in ACLs and force closes the gRPC stream. 4. Client resubscribes with the same token and resets its state (view). 5. Client sees "ACL not found" error. If the ACL is deleted before step 4, the client is unaware that the stream was closed due to an ACL error and will return an empty view (from the reset state) with the 200 status code. To fix this problem, we introduce another state to the subscription to indicate when a change to ACLs has occurred. If the server sees that there was an error due to an ACL change, it will re-authenticate the request and return an error if the token is no longer valid. 
Fixes #20790 --- .changelog/20876.txt | 3 + agent/consul/state/store_integration_test.go | 8 +-- agent/consul/stream/event_publisher.go | 2 +- agent/consul/stream/subscription.go | 22 +++++++- .../services/connectca/watch_roots.go | 4 +- .../services/serverdiscovery/watch_servers.go | 4 +- .../services/subscribe/subscribe.go | 8 +++ .../services/subscribe/subscribe_test.go | 56 +++++++++++++++++-- 8 files changed, 92 insertions(+), 15 deletions(-) create mode 100644 .changelog/20876.txt diff --git a/.changelog/20876.txt b/.changelog/20876.txt new file mode 100644 index 0000000000..70a2ad468e --- /dev/null +++ b/.changelog/20876.txt @@ -0,0 +1,3 @@ +```release-note:bug +streaming: Handle ACL errors consistently when blocking query timeout is reached. +``` diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index 25a91c5586..bf51039e25 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -72,7 +72,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) { // Ensure the reset event was sent. err = assertErr(t, eventCh) - require.Equal(t, stream.ErrSubForceClosed, err) + require.Equal(t, stream.ErrACLChanged, err) // Register another subscription. subscription2 := &stream.SubscribeRequest{ @@ -101,7 +101,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) { // Ensure the reset event was sent. err = assertErr(t, eventCh2) - require.Equal(t, stream.ErrSubForceClosed, err) + require.Equal(t, stream.ErrACLChanged, err) } func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { @@ -191,7 +191,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { // Ensure the reload event was sent. err = assertErr(t, eventCh) - require.Equal(t, stream.ErrSubForceClosed, err) + require.Equal(t, stream.ErrACLChanged, err) // Register another subscription. 
subscription3 := &stream.SubscribeRequest{ @@ -381,7 +381,7 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) { } } require.Error(t, next.Err) - require.Equal(t, stream.ErrSubForceClosed, next.Err) + require.Equal(t, stream.ErrACLChanged, next.Err) return case <-time.After(100 * time.Millisecond): t.Fatalf("no err after 100ms") diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index 04aa08334b..cf1454bce8 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -393,7 +393,7 @@ func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) { for _, secretID := range tokenSecretIDs { if subs, ok := s.byToken[secretID]; ok { for _, sub := range subs { - sub.forceClose() + sub.closeACLChanged() } } } diff --git a/agent/consul/stream/subscription.go b/agent/consul/stream/subscription.go index 23911eff2e..187b2c92d4 100644 --- a/agent/consul/stream/subscription.go +++ b/agent/consul/stream/subscription.go @@ -25,9 +25,13 @@ const ( // will not return new events. subStateUnsub = 2 - // subStateShutting down indicates the subscription was closed due to + // subStateShuttingDown indicates the subscription was closed due to // the server being shut down. subStateShuttingDown = 3 + + // subStateACLChanged indicates the subscription was closed due to + // a change in ACLs. + subStateACLChanged = 4 ) // ErrSubForceClosed is a error signalling the subscription has been @@ -39,6 +43,12 @@ var ErrSubForceClosed = errors.New("subscription closed by server, client must r // subscribe to a different server to get streaming event updates. var ErrShuttingDown = errors.New("subscription closed by server, server is shutting down") +// ErrACLChanged is an error to signal that the subscription has +// been closed because a change in ACL token or its associated roles or policies has occurred. 
+// If the token or policy is no longer valid, the client should resubscribe using a valid token. Otherwise, +// the client should resubscribe using the same token. +var ErrACLChanged = errors.New("subscription closed by server, ACL change occurred") + // Subscription provides events on a Topic. Events may be filtered by Key. // Events are returned by Next(), and may start with a Snapshot of events. type Subscription struct { @@ -131,6 +141,8 @@ func (s *Subscription) requireStateOpen() error { return ErrSubForceClosed case subStateShuttingDown: return ErrShuttingDown + case subStateACLChanged: + return ErrACLChanged case subStateUnsub: return fmt.Errorf("subscription was closed by unsubscribe") default: @@ -166,6 +178,14 @@ func (s *Subscription) shutDown() { } } +// Close the subscription and indicate that an ACL change occurred. This change may require +// a client to subscribe with a new token or re-subscribe with an existing token. +func (s *Subscription) closeACLChanged() { + if atomic.CompareAndSwapUint32(&s.state, subStateOpen, subStateACLChanged) { + close(s.closed) + } +} + // Unsubscribe the subscription, freeing resources. 
func (s *Subscription) Unsubscribe() { if atomic.CompareAndSwapUint32(&s.state, subStateOpen, subStateUnsub) { diff --git a/agent/grpc-external/services/connectca/watch_roots.go b/agent/grpc-external/services/connectca/watch_roots.go index ddd02ca56e..058fdd4134 100644 --- a/agent/grpc-external/services/connectca/watch_roots.go +++ b/agent/grpc-external/services/connectca/watch_roots.go @@ -47,7 +47,7 @@ func (s *Server) WatchRoots(_ *pbconnectca.WatchRootsRequest, serverStream pbcon for { var err error idx, err = s.serveRoots(options.Token, idx, serverStream, logger) - if errors.Is(err, stream.ErrSubForceClosed) { + if errors.Is(err, stream.ErrSubForceClosed) || errors.Is(err, stream.ErrACLChanged) { logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt to re-auth and resume") } else { return err @@ -90,7 +90,7 @@ func (s *Server) serveRoots( for { event, err := sub.Next(serverStream.Context()) switch { - case errors.Is(err, stream.ErrSubForceClosed): + case errors.Is(err, stream.ErrSubForceClosed) || errors.Is(err, stream.ErrACLChanged): // If the subscription was closed because the state store was abandoned (e.g. // following a snapshot restore) reset idx to ensure we don't skip over the // new store's events. 
diff --git a/agent/grpc-external/services/serverdiscovery/watch_servers.go b/agent/grpc-external/services/serverdiscovery/watch_servers.go index 24960336c8..15586111f5 100644 --- a/agent/grpc-external/services/serverdiscovery/watch_servers.go +++ b/agent/grpc-external/services/serverdiscovery/watch_servers.go @@ -40,7 +40,7 @@ func (s *Server) WatchServers(req *pbserverdiscovery.WatchServersRequest, server for { var err error idx, err = s.serveReadyServers(options.Token, idx, req, serverStream, logger) - if errors.Is(err, stream.ErrSubForceClosed) { + if errors.Is(err, stream.ErrSubForceClosed) || errors.Is(err, stream.ErrACLChanged) { logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt to re-auth and resume") } else { return err @@ -69,7 +69,7 @@ func (s *Server) serveReadyServers(token string, index uint64, req *pbserverdisc for { event, err := sub.Next(serverStream.Context()) switch { - case errors.Is(err, stream.ErrSubForceClosed): + case errors.Is(err, stream.ErrSubForceClosed) || errors.Is(err, stream.ErrACLChanged): return index, err case errors.Is(err, context.Canceled): return 0, nil diff --git a/agent/grpc-internal/services/subscribe/subscribe.go b/agent/grpc-internal/services/subscribe/subscribe.go index a728b0164c..5075d9e3dd 100644 --- a/agent/grpc-internal/services/subscribe/subscribe.go +++ b/agent/grpc-internal/services/subscribe/subscribe.go @@ -77,6 +77,14 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub case errors.Is(err, stream.ErrSubForceClosed): logger.Trace("subscription reset by server") return status.Error(codes.Aborted, err.Error()) + case errors.Is(err, stream.ErrACLChanged): + logger.Trace("ACL change occurred; re-authenticating") + _, authzErr := h.Backend.ResolveTokenAndDefaultMeta(req.Token, &entMeta, nil) + if authzErr != nil { + return authzErr + } + // Otherwise, abort the stream so the client re-subscribes. 
+ return status.Error(codes.Aborted, err.Error()) case err != nil: return err } diff --git a/agent/grpc-internal/services/subscribe/subscribe_test.go b/agent/grpc-internal/services/subscribe/subscribe_test.go index 9f6b550cc9..a574da3fa8 100644 --- a/agent/grpc-internal/services/subscribe/subscribe_test.go +++ b/agent/grpc-internal/services/subscribe/subscribe_test.go @@ -6,6 +6,7 @@ package subscribe import ( "context" "errors" + "fmt" "io" "net" "testing" @@ -323,17 +324,21 @@ func getEvent(t *testing.T, ch chan eventOrError) *pbsubscribe.Event { } type testBackend struct { - publisher *stream.EventPublisher - store *state.Store - authorizer func(token string, entMeta *acl.EnterpriseMeta) acl.Authorizer - forwardConn *gogrpc.ClientConn + publisher *stream.EventPublisher + store *state.Store + authorizer func(token string, entMeta *acl.EnterpriseMeta) acl.Authorizer + resolveTokenAndDefaultMeta func(token string, entMeta *acl.EnterpriseMeta, _ *acl.AuthorizerContext) (acl.Authorizer, error) + forwardConn *gogrpc.ClientConn } func (b testBackend) ResolveTokenAndDefaultMeta( token string, entMeta *acl.EnterpriseMeta, - _ *acl.AuthorizerContext, + authCtx *acl.AuthorizerContext, ) (acl.Authorizer, error) { + if b.resolveTokenAndDefaultMeta != nil { + return b.resolveTokenAndDefaultMeta(token, entMeta, authCtx) + } return b.authorizer(token, entMeta), nil } @@ -986,6 +991,47 @@ node "node1" { t.Fatalf("timeout waiting for aborted error") } }) + + // Re-subscribe because the previous test step terminated the stream. 
+ chEvents = make(chan eventOrError, 0) + streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn) + streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ + Topic: pbsubscribe.Topic_ServiceHealth, + Subject: &pbsubscribe.SubscribeRequest_NamedSubject{ + NamedSubject: &pbsubscribe.NamedSubject{ + Key: "foo", + }, + }, + Token: token, + }) + require.NoError(t, err) + + go recvEvents(chEvents, streamHandle) + + // Stub out token authn function so that the token is no longer considered valid. + backend.resolveTokenAndDefaultMeta = func(t string, entMeta *acl.EnterpriseMeta, _ *acl.AuthorizerContext) (acl.Authorizer, error) { + return nil, fmt.Errorf("ACL not found") + } + + testutil.RunStep(t, "invalid token should return an error", func(t *testing.T) { + // Force another ACL update. + tokenID, err := uuid.GenerateUUID() + require.NoError(t, err) + + aclToken := &structs.ACLToken{ + AccessorID: tokenID, + SecretID: token, + } + require.NoError(t, backend.store.ACLTokenSet(ids.Next("update"), aclToken)) + + select { + case item := <-chEvents: + require.Error(t, item.err, "got event instead of an error: %v", item.event) + require.EqualError(t, item.err, "rpc error: code = Unknown desc = ACL not found") + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for ACL not found error") + } + }) } func assertNoEvents(t *testing.T, chEvents chan eventOrError) {