proxycfg: terminate stream on irrecoverable errors

This is the OSS portion of enterprise PR 2339.

It improves our handling of "irrecoverable" errors in proxycfg data sources.

The canonical example of this is what happens when the ACL token presented by
Envoy is deleted/revoked. Previously, the stream would get "stuck" until the
xDS server re-checked the token (after 5 minutes) and terminated the stream.

Materializers would also sit burning resources retrying something that could
never succeed.

Now, it is possible for data sources to mark errors as "terminal" which causes
the xDS stream to be closed immediately. Similarly, the submatview.Store will
evict materializers when it observes they have encountered such an error.
This commit is contained in:
Daniel Upton 2022-08-11 10:19:36 +01:00 committed by Dan Upton
parent 24a3975494
commit 13c04a13af
11 changed files with 267 additions and 47 deletions

View File

@ -124,15 +124,21 @@ func (c *cacheProxyDataSource[ReqType]) Notify(
func dispatchCacheUpdate(ch chan<- proxycfg.UpdateEvent) cache.Callback {
return func(ctx context.Context, e cache.UpdateEvent) {
u := proxycfg.UpdateEvent{
CorrelationID: e.CorrelationID,
Result: e.Result,
Err: e.Err,
}
select {
case ch <- u:
case ch <- newUpdateEvent(e.CorrelationID, e.Result, e.Err):
case <-ctx.Done():
}
}
}
func newUpdateEvent(correlationID string, result any, err error) proxycfg.UpdateEvent {
// This roughly matches the logic in agent/submatview.LocalMaterializer.isTerminalError.
if acl.IsErrNotFound(err) {
err = proxycfg.TerminalError(err)
}
return proxycfg.UpdateEvent{
CorrelationID: correlationID,
Result: result,
Err: err,
}
}

View File

@ -54,13 +54,8 @@ func (s serverIntentionUpstreams) Notify(ctx context.Context, req *structs.Servi
func dispatchBlockingQueryUpdate[ResultType any](ch chan<- proxycfg.UpdateEvent) func(context.Context, string, ResultType, error) {
return func(ctx context.Context, correlationID string, result ResultType, err error) {
event := proxycfg.UpdateEvent{
CorrelationID: correlationID,
Result: result,
Err: err,
}
select {
case ch <- event:
case ch <- newUpdateEvent(correlationID, result, err):
case <-ctx.Done():
}
}

View File

@ -39,12 +39,8 @@ func (c cacheIntentions) Notify(ctx context.Context, req *structs.ServiceSpecifi
QueryOptions: structs.QueryOptions{Token: req.QueryOptions.Token},
}
return c.c.NotifyCallback(ctx, cachetype.IntentionMatchName, query, correlationID, func(ctx context.Context, event cache.UpdateEvent) {
e := proxycfg.UpdateEvent{
CorrelationID: correlationID,
Err: event.Err,
}
if e.Err == nil {
var result any
if event.Err == nil {
rsp, ok := event.Result.(*structs.IndexedIntentionMatches)
if !ok {
return
@ -54,11 +50,11 @@ func (c cacheIntentions) Notify(ctx context.Context, req *structs.ServiceSpecifi
if len(rsp.Matches) != 0 {
matches = rsp.Matches[0]
}
e.Result = matches
result = matches
}
select {
case ch <- e:
case ch <- newUpdateEvent(correlationID, result, event.Err):
case <-ctx.Done():
}
})
@ -110,10 +106,7 @@ func (s *serverIntentions) Notify(ctx context.Context, req *structs.ServiceSpeci
sort.Sort(structs.IntentionPrecedenceSorter(intentions))
return proxycfg.UpdateEvent{
CorrelationID: correlationID,
Result: intentions,
}, true
return newUpdateEvent(correlationID, intentions, nil), true
}
for subjectIdx, subject := range subjects {

View File

@ -2,6 +2,7 @@ package proxycfg
import (
"context"
"errors"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs"
@ -15,6 +16,28 @@ type UpdateEvent struct {
Err error
}
// TerminalError wraps the given error to indicate that the data source is in
// an irrecoverably broken state (e.g. because the given ACL token has been
// deleted).
//
// Setting UpdateEvent.Err to a TerminalError causes all watches to be canceled
// which, in turn, terminates the xDS streams.
func TerminalError(err error) error {
return terminalError{err}
}
// IsTerminalError returns whether the given error indicates that the data
// source is in an irrecoverably broken state so watches should be torn down
// and retried at a higher level.
func IsTerminalError(err error) bool {
return errors.As(err, &terminalError{})
}
type terminalError struct{ err error }
func (e terminalError) Error() string { return e.err.Error() }
func (e terminalError) Unwrap() error { return e.err }
// DataSources contains the dependencies used to consume data used to configure
// proxies.
type DataSources struct {

View File

@ -127,7 +127,7 @@ func (m *Manager) Register(id ProxyID, ns *structs.NodeService, source ProxySour
}
// We are updating the proxy, close its old state
state.Close()
state.Close(false)
}
// TODO: move to a function that translates ManagerConfig->stateConfig
@ -148,14 +148,13 @@ func (m *Manager) Register(id ProxyID, ns *structs.NodeService, source ProxySour
return err
}
ch, err := state.Watch()
if err != nil {
if _, err = state.Watch(); err != nil {
return err
}
m.proxies[id] = state
// Start a goroutine that will wait for changes and broadcast them to watchers.
go m.notifyBroadcast(ch)
go m.notifyBroadcast(id, state)
return nil
}
@ -175,8 +174,8 @@ func (m *Manager) Deregister(id ProxyID, source ProxySource) {
}
// Closing state will let the goroutine we started in Register finish since
// watch chan is closed.
state.Close()
// watch chan is closed
state.Close(false)
delete(m.proxies, id)
// We intentionally leave potential watchers hanging here - there is no new
@ -186,11 +185,17 @@ func (m *Manager) Deregister(id ProxyID, source ProxySource) {
// cleaned up naturally.
}
func (m *Manager) notifyBroadcast(ch <-chan ConfigSnapshot) {
// Run until ch is closed
for snap := range ch {
func (m *Manager) notifyBroadcast(proxyID ProxyID, state *state) {
// Run until ch is closed (by a defer in state.run).
for snap := range state.snapCh {
m.notify(&snap)
}
// If state.run exited because of an irrecoverable error, close all of the
// watchers so that the consumers reconnect/retry at a higher level.
if state.failed() {
m.closeAllWatchers(proxyID)
}
}
func (m *Manager) notify(snap *ConfigSnapshot) {
@ -281,6 +286,20 @@ func (m *Manager) Watch(id ProxyID) (<-chan *ConfigSnapshot, CancelFunc) {
}
}
func (m *Manager) closeAllWatchers(proxyID ProxyID) {
m.mu.Lock()
defer m.mu.Unlock()
watchers, ok := m.watchers[proxyID]
if !ok {
return
}
for watchID := range watchers {
m.closeWatchLocked(proxyID, watchID)
}
}
// closeWatchLocked cleans up state related to a single watcher. It assumes the
// lock is held.
func (m *Manager) closeWatchLocked(proxyID ProxyID, watchID uint64) {
@ -309,7 +328,7 @@ func (m *Manager) Close() error {
// Then close all states
for proxyID, state := range m.proxies {
state.Close()
state.Close(false)
delete(m.proxies, proxyID)
}
return nil

View File

@ -6,6 +6,7 @@ import (
"fmt"
"net"
"reflect"
"sync/atomic"
"time"
"github.com/hashicorp/go-hclog"
@ -70,11 +71,21 @@ type state struct {
// in Watch.
cancel func()
// failedFlag is (atomically) set to 1 (by Close) when run exits because a data
// source is in an irrecoverable state. It can be read with failed.
failedFlag int32
ch chan UpdateEvent
snapCh chan ConfigSnapshot
reqCh chan chan *ConfigSnapshot
}
// failed returns whether run exited because a data source is in an
// irrecoverable state.
func (s *state) failed() bool {
return atomic.LoadInt32(&s.failedFlag) == 1
}
type DNSConfig struct {
Domain string
AltDomain string
@ -250,10 +261,13 @@ func (s *state) Watch() (<-chan ConfigSnapshot, error) {
}
// Close discards the state and stops any long-running watches.
func (s *state) Close() error {
func (s *state) Close(failed bool) error {
if s.cancel != nil {
s.cancel()
}
if failed {
atomic.StoreInt32(&s.failedFlag, 1)
}
return nil
}
@ -300,7 +314,13 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
case <-ctx.Done():
return
case u := <-s.ch:
s.logger.Trace("A blocking query returned; handling snapshot update", "correlationID", u.CorrelationID)
s.logger.Trace("Data source returned; handling snapshot update", "correlationID", u.CorrelationID)
if IsTerminalError(u.Err) {
s.logger.Error("Data source in an irrecoverable state; exiting", "error", u.Err, "correlationID", u.CorrelationID)
s.Close(true)
return
}
if err := s.handler.handleUpdate(ctx, u, snap); err != nil {
s.logger.Error("Failed to handle update from watch",

View File

@ -66,6 +66,10 @@ func (m *LocalMaterializer) Run(ctx context.Context) {
if ctx.Err() != nil {
return
}
if m.isTerminalError(err) {
return
}
m.mat.handleError(req, err)
if err := m.mat.retryWaiter.Wait(ctx); err != nil {
@ -74,6 +78,14 @@ func (m *LocalMaterializer) Run(ctx context.Context) {
}
}
// isTerminalError determines whether the given error cannot be recovered from
// and should cause the materializer to halt and be evicted from the view store.
//
// This roughly matches the logic in agent/proxycfg-glue.newUpdateEvent.
func (m *LocalMaterializer) isTerminalError(err error) bool {
return acl.IsErrNotFound(err)
}
// subscribeOnce opens a new subscription to a local backend and runs
// for its lifetime or until the view is closed.
func (m *LocalMaterializer) subscribeOnce(ctx context.Context, req *pbsubscribe.SubscribeRequest) error {

View File

@ -47,6 +47,9 @@ type entry struct {
// requests is the count of active requests using this entry. This entry will
// remain in the store as long as this count remains > 0.
requests int
// evicting is used to mark an entry that will be evicted when the current in-
// flight requests finish.
evicting bool
}
// NewStore creates and returns a Store that is ready for use. The caller must
@ -89,6 +92,7 @@ func (s *Store) Run(ctx context.Context) {
// Only stop the materializer if there are no active requests.
if e.requests == 0 {
s.logger.Trace("evicting item from store", "key", he.Key())
e.stop()
delete(s.byKey, he.Key())
}
@ -187,13 +191,13 @@ func (s *Store) NotifyCallback(
"error", err,
"request-type", req.Type(),
"index", index)
continue
}
index = result.Index
cb(ctx, cache.UpdateEvent{
CorrelationID: correlationID,
Result: result.Value,
Err: err,
Meta: cache.ResultMeta{Index: result.Index, Hit: result.Cached},
})
}
@ -211,6 +215,9 @@ func (s *Store) readEntry(req Request) (string, Materializer, error) {
defer s.lock.Unlock()
e, ok := s.byKey[key]
if ok {
if e.evicting {
return "", nil, errors.New("item is marked for eviction")
}
e.requests++
s.byKey[key] = e
return key, e.materializer, nil
@ -222,7 +229,18 @@ func (s *Store) readEntry(req Request) (string, Materializer, error) {
}
ctx, cancel := context.WithCancel(context.Background())
go mat.Run(ctx)
go func() {
mat.Run(ctx)
// Materializers run until they either reach their TTL and are evicted (which
// cancels the given context) or encounter an irrecoverable error.
//
// If the context hasn't been canceled, we know it's the error case so we
// trigger an immediate eviction.
if ctx.Err() == nil {
s.evictNow(key)
}
}()
e = entry{
materializer: mat,
@ -233,6 +251,28 @@ func (s *Store) readEntry(req Request) (string, Materializer, error) {
return key, e.materializer, nil
}
// evictNow causes the item with the given key to be evicted immediately.
//
// If there are requests in-flight, the item is marked for eviction such that
// once the requests have been served releaseEntry will move it to the top of
// the expiry heap. If there are no requests in-flight, evictNow will move the
// item to the top of the expiry heap itself.
//
// In either case, the entry's evicting flag prevents it from being served by
// readEntry (and thereby gaining new in-flight requests).
func (s *Store) evictNow(key string) {
s.lock.Lock()
defer s.lock.Unlock()
e := s.byKey[key]
e.evicting = true
s.byKey[key] = e
if e.requests == 0 {
s.expireNowLocked(key)
}
}
// releaseEntry decrements the request count and starts an expiry timer if the
// count has reached 0. Must be called once for every call to readEntry.
func (s *Store) releaseEntry(key string) {
@ -246,6 +286,11 @@ func (s *Store) releaseEntry(key string) {
return
}
if e.evicting {
s.expireNowLocked(key)
return
}
if e.expiry.Index() == ttlcache.NotIndexed {
e.expiry = s.expiryHeap.Add(key, s.idleTTL)
s.byKey[key] = e
@ -255,6 +300,17 @@ func (s *Store) releaseEntry(key string) {
s.expiryHeap.Update(e.expiry.Index(), s.idleTTL)
}
// expireNowLocked moves the item with the given key to the top of the expiry
// heap, causing it to be picked up by the expiry loop and evicted immediately.
func (s *Store) expireNowLocked(key string) {
e := s.byKey[key]
if idx := e.expiry.Index(); idx != ttlcache.NotIndexed {
s.expiryHeap.Remove(idx)
}
e.expiry = s.expiryHeap.Add(key, time.Duration(0))
s.byKey[key] = e
}
// makeEntryKey matches agent/cache.makeEntryKey, but may change in the future.
func makeEntryKey(typ string, r cache.RequestInfo) string {
return fmt.Sprintf("%s/%s/%s/%s", typ, r.Datacenter, r.Token, r.Key)

View File

@ -509,3 +509,75 @@ func TestStore_Run_ExpiresEntries(t *testing.T) {
require.Len(t, store.byKey, 0)
require.Equal(t, ttlcache.NotIndexed, e.expiry.Index())
}
func TestStore_Run_FailingMaterializer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
store := NewStore(hclog.NewNullLogger())
store.idleTTL = 24 * time.Hour
go store.Run(ctx)
t.Run("with an in-flight request", func(t *testing.T) {
req := &failingMaterializerRequest{
doneCh: make(chan struct{}),
}
ch := make(chan cache.UpdateEvent)
reqCtx, reqCancel := context.WithCancel(context.Background())
t.Cleanup(reqCancel)
require.NoError(t, store.Notify(reqCtx, req, "", ch))
assertRequestCount(t, store, req, 1)
// Cause the materializer to "fail" (exit before its context is canceled).
close(req.doneCh)
// End the in-flight request.
reqCancel()
// Check that the item was evicted.
retry.Run(t, func(r *retry.R) {
store.lock.Lock()
defer store.lock.Unlock()
require.Len(r, store.byKey, 0)
})
})
t.Run("with no in-flight requests", func(t *testing.T) {
req := &failingMaterializerRequest{
doneCh: make(chan struct{}),
}
// Cause the materializer to "fail" (exit before its context is canceled).
close(req.doneCh)
// Check that the item was evicted.
retry.Run(t, func(r *retry.R) {
store.lock.Lock()
defer store.lock.Unlock()
require.Len(r, store.byKey, 0)
})
})
}
type failingMaterializerRequest struct {
doneCh chan struct{}
}
func (failingMaterializerRequest) CacheInfo() cache.RequestInfo { return cache.RequestInfo{} }
func (failingMaterializerRequest) Type() string { return "test.FailingMaterializerRequest" }
func (r *failingMaterializerRequest) NewMaterializer() (Materializer, error) {
return &failingMaterializer{doneCh: r.doneCh}, nil
}
type failingMaterializer struct {
doneCh <-chan struct{}
}
func (failingMaterializer) Query(context.Context, uint64) (Result, error) { return Result{}, nil }
func (m *failingMaterializer) Run(context.Context) { <-m.doneCh }

View File

@ -81,6 +81,11 @@ const (
)
func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discovery_v3.DeltaDiscoveryRequest) error {
// Handle invalid ACL tokens up-front.
if _, err := s.authenticate(stream.Context()); err != nil {
return err
}
// Loop state
var (
cfgSnap *proxycfg.ConfigSnapshot
@ -200,7 +205,18 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
}
}
case cfgSnap = <-stateCh:
case cs, ok := <-stateCh:
if !ok {
// stateCh is closed either when *we* cancel the watch (on-exit via defer)
// or by the proxycfg.Manager when an irrecoverable error is encountered
// such as the ACL token getting deleted.
//
// We know for sure that this is the latter case, because in the former we
// would've already exited this loop.
return status.Error(codes.Aborted, "xDS stream terminated due to an irrecoverable error, please try again")
}
cfgSnap = cs
newRes, err := generator.allResourcesFromSnapshot(cfgSnap)
if err != nil {
return status.Errorf(codes.Unavailable, "failed to generate all xDS resources from the snapshot: %v", err)

View File

@ -186,6 +186,18 @@ func (s *Server) Register(srv *grpc.Server) {
envoy_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv, s)
}
func (s *Server) authenticate(ctx context.Context) (acl.Authorizer, error) {
authz, err := s.ResolveToken(external.TokenFromContext(ctx))
if acl.IsErrNotFound(err) {
return nil, status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err)
} else if acl.IsErrPermissionDenied(err) {
return nil, status.Error(codes.PermissionDenied, err.Error())
} else if err != nil {
return nil, status.Errorf(codes.Internal, "error resolving acl token: %v", err)
}
return authz, nil
}
// authorize the xDS request using the token stored in ctx. This authorization is
// a bit different from most interfaces. Instead of explicitly authorizing or
// filtering each piece of data in the response, the request is authorized
@ -201,13 +213,9 @@ func (s *Server) authorize(ctx context.Context, cfgSnap *proxycfg.ConfigSnapshot
return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot")
}
authz, err := s.ResolveToken(external.TokenFromContext(ctx))
if acl.IsErrNotFound(err) {
return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err)
} else if acl.IsErrPermissionDenied(err) {
return status.Error(codes.PermissionDenied, err.Error())
} else if err != nil {
return status.Errorf(codes.Internal, "error resolving acl token: %v", err)
authz, err := s.authenticate(ctx)
if err != nil {
return err
}
var authzContext acl.AuthorizerContext