Iryna Shustava d747b51dab
Handle ACL errors consistently when blocking query timeout is reached. (#20876)
Currently, when a client starts a blocking query and an ACL token expires within
that time, Consul will return ACL not found error with a 403 status code. However,
sometimes if an ACL token is invalidated at the same time as the query's deadline is reached,
Consul will instead return an empty response with a 200 status code.

This is because of the events being executed.
1. Client issues a blocking query request with timeout `t`.
2. ACL is deleted.
3. Server detects a change in ACLs and force closes the gRPC stream.
4. Client resubscribes with the same token and resets its state (view).
5. Client sees "ACL not found" error.

If ACL is deleted before step 4, the client is unaware that the stream was closed due to
an ACL error and will return an empty view (from the reset state) with the 200 status code.

To fix this problem, we introduce another state to the subsciption to indicate when a change
to ACLs has occured. If the server sees that there was an error due to ACL change, it will
re-authenticate the request and return an error if the token is no longer valid.

Fixes #20790
2024-03-22 14:59:54 -06:00

142 lines
4.8 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package serverdiscovery
import (
"context"
"errors"
"math/rand"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/agent/consul/autopilotevents"
"github.com/hashicorp/consul/agent/consul/stream"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto-public/pbserverdiscovery"
)
// WatchServers provides a stream on which you can receive the list of servers
// that are ready to receive incoming requests including stale queries. The
// current set of ready servers are sent immediately at the start of the
// stream and new updates will be sent whenver the set of ready servers changes.
func (s *Server) WatchServers(req *pbserverdiscovery.WatchServersRequest, serverStream pbserverdiscovery.ServerDiscoveryService_WatchServersServer) error {
logger := s.Logger.Named("watch-servers").With("request_id", external.TraceID())
logger.Debug("starting stream")
defer logger.Trace("stream closed")
options, err := external.QueryOptionsFromContext(serverStream.Context())
if err != nil {
return err
}
// Serve the ready servers from an EventPublisher subscription. If the subscription is
// closed due to an ACL change, we'll attempt to re-authorize and resume it to
// prevent unnecessarily terminating the stream.
var idx uint64
for {
var err error
idx, err = s.serveReadyServers(options.Token, idx, req, serverStream, logger)
if errors.Is(err, stream.ErrSubForceClosed) || errors.Is(err, stream.ErrACLChanged) {
logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt to re-auth and resume")
} else {
return err
}
}
}
func (s *Server) serveReadyServers(token string, index uint64, req *pbserverdiscovery.WatchServersRequest, serverStream pbserverdiscovery.ServerDiscoveryService_WatchServersServer, logger hclog.Logger) (uint64, error) {
if err := external.RequireAnyValidACLToken(s.ACLResolver, token); err != nil {
return 0, err
}
// Start the subscription.
sub, err := s.Publisher.Subscribe(&stream.SubscribeRequest{
Topic: autopilotevents.EventTopicReadyServers,
Subject: stream.SubjectNone,
Token: token,
Index: index,
})
if err != nil {
logger.Error("failed to subscribe to server discovery events", "error", err)
return 0, status.Error(codes.Internal, "failed to subscribe to server discovery events")
}
defer sub.Unsubscribe()
for {
event, err := sub.Next(serverStream.Context())
switch {
case errors.Is(err, stream.ErrSubForceClosed) || errors.Is(err, stream.ErrACLChanged):
return index, err
case errors.Is(err, context.Canceled):
return 0, nil
case err != nil:
logger.Error("failed to read next event", "error", err)
return index, status.Error(codes.Internal, err.Error())
}
// We do not send framing events (e.g. EndOfSnapshot, NewSnapshotToFollow)
// because we send a full list of ready servers on every event, rather than expecting
// clients to maintain a state-machine in the way they do for service health.
if event.IsFramingEvent() {
continue
}
// Note: this check isn't strictly necessary because the event publishing
// machinery will ensure the index increases monotonically, but it can be
// tricky to faithfully reproduce this in tests (e.g. the EventPublisher
// garbage collects topic buffers and snapshots aggressively when streams
// disconnect) so this avoids a bunch of confusing setup code.
if event.Index <= index {
continue
}
index = event.Index
rsp, err := eventToResponse(req, event)
if err != nil {
logger.Error("failed to convert event to response", "error", err)
return index, status.Error(codes.Internal, err.Error())
}
if err := serverStream.Send(rsp); err != nil {
logger.Error("failed to send response", "error", err)
return index, err
}
}
}
func eventToResponse(req *pbserverdiscovery.WatchServersRequest, event stream.Event) (*pbserverdiscovery.WatchServersResponse, error) {
readyServers, err := autopilotevents.ExtractEventPayload(event)
if err != nil {
return nil, err
}
var servers []*pbserverdiscovery.Server
for _, srv := range readyServers {
addr := srv.Address
wanAddr, ok := srv.TaggedAddresses[structs.TaggedAddressWAN]
if req.Wan && ok {
addr = wanAddr
}
servers = append(servers, &pbserverdiscovery.Server{
Id: srv.ID,
Version: srv.Version,
Address: addr,
})
}
// Shuffle servers so that consul-dataplane doesn't consistently choose the same connections on startup.
rand.Shuffle(len(servers), func(i, j int) {
servers[i], servers[j] = servers[j], servers[i]
})
return &pbserverdiscovery.WatchServersResponse{
Servers: servers,
}, nil
}