From 13c04a13af70bcd8c00fcf55003212f584a92799 Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Thu, 11 Aug 2022 10:19:36 +0100 Subject: [PATCH] proxycfg: terminate stream on irrecoverable errors This is the OSS portion of enterprise PR 2339. It improves our handling of "irrecoverable" errors in proxycfg data sources. The canonical example of this is what happens when the ACL token presented by Envoy is deleted/revoked. Previously, the stream would get "stuck" until the xDS server re-checked the token (after 5 minutes) and terminated the stream. Materializers would also sit burning resources retrying something that could never succeed. Now, it is possible for data sources to mark errors as "terminal" which causes the xDS stream to be closed immediately. Similarly, the submatview.Store will evict materializers when it observes they have encountered such an error. --- agent/proxycfg-glue/glue.go | 20 +++--- agent/proxycfg-glue/intention_upstreams.go | 7 +-- agent/proxycfg-glue/intentions.go | 17 ++--- agent/proxycfg/data_sources.go | 23 +++++++ agent/proxycfg/manager.go | 39 +++++++++--- agent/proxycfg/state.go | 24 +++++++- agent/submatview/local_materializer.go | 12 ++++ agent/submatview/store.go | 60 +++++++++++++++++- agent/submatview/store_test.go | 72 ++++++++++++++++++++++ agent/xds/delta.go | 18 +++++- agent/xds/server.go | 22 ++++--- 11 files changed, 267 insertions(+), 47 deletions(-) diff --git a/agent/proxycfg-glue/glue.go b/agent/proxycfg-glue/glue.go index 86badf67e4..1b22b02bd4 100644 --- a/agent/proxycfg-glue/glue.go +++ b/agent/proxycfg-glue/glue.go @@ -124,15 +124,21 @@ func (c *cacheProxyDataSource[ReqType]) Notify( func dispatchCacheUpdate(ch chan<- proxycfg.UpdateEvent) cache.Callback { return func(ctx context.Context, e cache.UpdateEvent) { - u := proxycfg.UpdateEvent{ - CorrelationID: e.CorrelationID, - Result: e.Result, - Err: e.Err, - } - select { - case ch <- u: + case ch <- newUpdateEvent(e.CorrelationID, e.Result, e.Err): case <-ctx.Done(): } } } + +func newUpdateEvent(correlationID string, result any, err error) proxycfg.UpdateEvent { + // This roughly matches the logic in agent/submatview.LocalMaterializer.isTerminalError. + if acl.IsErrNotFound(err) { + err = proxycfg.TerminalError(err) + } + return proxycfg.UpdateEvent{ + CorrelationID: correlationID, + Result: result, + Err: err, + } +} diff --git a/agent/proxycfg-glue/intention_upstreams.go b/agent/proxycfg-glue/intention_upstreams.go index 186d91b357..a694d033b4 100644 --- a/agent/proxycfg-glue/intention_upstreams.go +++ b/agent/proxycfg-glue/intention_upstreams.go @@ -54,13 +54,8 @@ func (s serverIntentionUpstreams) Notify(ctx context.Context, req *structs.Servi func dispatchBlockingQueryUpdate[ResultType any](ch chan<- proxycfg.UpdateEvent) func(context.Context, string, ResultType, error) { return func(ctx context.Context, correlationID string, result ResultType, err error) { - event := proxycfg.UpdateEvent{ - CorrelationID: correlationID, - Result: result, - Err: err, - } select { - case ch <- event: + case ch <- newUpdateEvent(correlationID, result, err): case <-ctx.Done(): } } diff --git a/agent/proxycfg-glue/intentions.go b/agent/proxycfg-glue/intentions.go index 57f48bdae9..69652d922d 100644 --- a/agent/proxycfg-glue/intentions.go +++ b/agent/proxycfg-glue/intentions.go @@ -39,12 +39,8 @@ func (c cacheIntentions) Notify(ctx context.Context, req *structs.ServiceSpecifi QueryOptions: structs.QueryOptions{Token: req.QueryOptions.Token}, } return c.c.NotifyCallback(ctx, cachetype.IntentionMatchName, query, correlationID, func(ctx context.Context, event cache.UpdateEvent) { - e := proxycfg.UpdateEvent{ - CorrelationID: correlationID, - Err: event.Err, - } - - if e.Err == nil { + var result any + if event.Err == nil { rsp, ok := event.Result.(*structs.IndexedIntentionMatches) if !ok { return @@ -54,11 +50,11 @@ func (c cacheIntentions) Notify(ctx context.Context, req *structs.ServiceSpecifi if len(rsp.Matches) != 0 { matches = rsp.Matches[0] } - e.Result = matches + result = matches } select { - case ch <- e: + case ch <- newUpdateEvent(correlationID, result, event.Err): case <-ctx.Done(): } }) @@ -110,10 +106,7 @@ func (s *serverIntentions) Notify(ctx context.Context, req *structs.ServiceSpeci sort.Sort(structs.IntentionPrecedenceSorter(intentions)) - return proxycfg.UpdateEvent{ - CorrelationID: correlationID, - Result: intentions, - }, true + return newUpdateEvent(correlationID, intentions, nil), true } for subjectIdx, subject := range subjects { diff --git a/agent/proxycfg/data_sources.go b/agent/proxycfg/data_sources.go index bda0226ffb..3649bed2d3 100644 --- a/agent/proxycfg/data_sources.go +++ b/agent/proxycfg/data_sources.go @@ -2,6 +2,7 @@ package proxycfg import ( "context" + "errors" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/structs" @@ -15,6 +16,28 @@ type UpdateEvent struct { Err error } +// TerminalError wraps the given error to indicate that the data source is in +// an irrecoverably broken state (e.g. because the given ACL token has been +// deleted). +// +// Setting UpdateEvent.Err to a TerminalError causes all watches to be canceled +// which, in turn, terminates the xDS streams. +func TerminalError(err error) error { + return terminalError{err} +} + +// IsTerminalError returns whether the given error indicates that the data +// source is in an irrecoverably broken state so watches should be torn down +// and retried at a higher level. +func IsTerminalError(err error) bool { + return errors.As(err, &terminalError{}) +} + +type terminalError struct{ err error } + +func (e terminalError) Error() string { return e.err.Error() } +func (e terminalError) Unwrap() error { return e.err } + // DataSources contains the dependencies used to consume data used to configure // proxies. type DataSources struct { diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go index 3de11b3f8a..efdfe4b724 100644 --- a/agent/proxycfg/manager.go +++ b/agent/proxycfg/manager.go @@ -127,7 +127,7 @@ func (m *Manager) Register(id ProxyID, ns *structs.NodeService, source ProxySour } // We are updating the proxy, close its old state - state.Close() + state.Close(false) } // TODO: move to a function that translates ManagerConfig->stateConfig @@ -148,14 +148,13 @@ func (m *Manager) Register(id ProxyID, ns *structs.NodeService, source ProxySour return err } - ch, err := state.Watch() - if err != nil { + if _, err = state.Watch(); err != nil { return err } m.proxies[id] = state // Start a goroutine that will wait for changes and broadcast them to watchers. - go m.notifyBroadcast(ch) + go m.notifyBroadcast(id, state) return nil } @@ -175,8 +174,8 @@ func (m *Manager) Deregister(id ProxyID, source ProxySource) { } // Closing state will let the goroutine we started in Register finish since - // watch chan is closed. - state.Close() + // watch chan is closed + state.Close(false) delete(m.proxies, id) // We intentionally leave potential watchers hanging here - there is no new @@ -186,11 +185,17 @@ func (m *Manager) Deregister(id ProxyID, source ProxySource) { // cleaned up naturally. } -func (m *Manager) notifyBroadcast(ch <-chan ConfigSnapshot) { - // Run until ch is closed - for snap := range ch { +func (m *Manager) notifyBroadcast(proxyID ProxyID, state *state) { + // Run until ch is closed (by a defer in state.run). + for snap := range state.snapCh { m.notify(&snap) } + + // If state.run exited because of an irrecoverable error, close all of the + // watchers so that the consumers reconnect/retry at a higher level. + if state.failed() { + m.closeAllWatchers(proxyID) + } } func (m *Manager) notify(snap *ConfigSnapshot) { @@ -281,6 +286,20 @@ func (m *Manager) Watch(id ProxyID) (<-chan *ConfigSnapshot, CancelFunc) { } } +func (m *Manager) closeAllWatchers(proxyID ProxyID) { + m.mu.Lock() + defer m.mu.Unlock() + + watchers, ok := m.watchers[proxyID] + if !ok { + return + } + + for watchID := range watchers { + m.closeWatchLocked(proxyID, watchID) + } +} + // closeWatchLocked cleans up state related to a single watcher. It assumes the // lock is held. func (m *Manager) closeWatchLocked(proxyID ProxyID, watchID uint64) { @@ -309,7 +328,7 @@ func (m *Manager) Close() error { // Then close all states for proxyID, state := range m.proxies { - state.Close() + state.Close(false) delete(m.proxies, proxyID) } return nil diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 13b22c4fd2..34d3364356 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "reflect" + "sync/atomic" "time" "github.com/hashicorp/go-hclog" @@ -70,11 +71,21 @@ type state struct { // in Watch. cancel func() + // failedFlag is (atomically) set to 1 (by Close) when run exits because a data + // source is in an irrecoverable state. It can be read with failed. + failedFlag int32 + ch chan UpdateEvent snapCh chan ConfigSnapshot reqCh chan chan *ConfigSnapshot } +// failed returns whether run exited because a data source is in an +// irrecoverable state. +func (s *state) failed() bool { + return atomic.LoadInt32(&s.failedFlag) == 1 +} + type DNSConfig struct { Domain string AltDomain string @@ -250,10 +261,13 @@ func (s *state) Watch() (<-chan ConfigSnapshot, error) { } // Close discards the state and stops any long-running watches. -func (s *state) Close() error { +func (s *state) Close(failed bool) error { if s.cancel != nil { s.cancel() } + if failed { + atomic.StoreInt32(&s.failedFlag, 1) + } return nil } @@ -300,7 +314,13 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) { case <-ctx.Done(): return case u := <-s.ch: - s.logger.Trace("A blocking query returned; handling snapshot update", "correlationID", u.CorrelationID) + s.logger.Trace("Data source returned; handling snapshot update", "correlationID", u.CorrelationID) + + if IsTerminalError(u.Err) { + s.logger.Error("Data source in an irrecoverable state; exiting", "error", u.Err, "correlationID", u.CorrelationID) + s.Close(true) + return + } if err := s.handler.handleUpdate(ctx, u, snap); err != nil { s.logger.Error("Failed to handle update from watch", diff --git a/agent/submatview/local_materializer.go b/agent/submatview/local_materializer.go index 6e32b36025..b3d4480bda 100644 --- a/agent/submatview/local_materializer.go +++ b/agent/submatview/local_materializer.go @@ -66,6 +66,10 @@ func (m *LocalMaterializer) Run(ctx context.Context) { if ctx.Err() != nil { return } + if m.isTerminalError(err) { + return + } + m.mat.handleError(req, err) if err := m.mat.retryWaiter.Wait(ctx); err != nil { @@ -74,6 +78,14 @@ func (m *LocalMaterializer) Run(ctx context.Context) { } } +// isTerminalError determines whether the given error cannot be recovered from +// and should cause the materializer to halt and be evicted from the view store. +// +// This roughly matches the logic in agent/proxycfg-glue.newUpdateEvent. +func (m *LocalMaterializer) isTerminalError(err error) bool { + return acl.IsErrNotFound(err) +} + // subscribeOnce opens a new subscription to a local backend and runs // for its lifetime or until the view is closed. func (m *LocalMaterializer) subscribeOnce(ctx context.Context, req *pbsubscribe.SubscribeRequest) error { diff --git a/agent/submatview/store.go b/agent/submatview/store.go index 242a0d70d7..dacf2d8bae 100644 --- a/agent/submatview/store.go +++ b/agent/submatview/store.go @@ -47,6 +47,9 @@ type entry struct { // requests is the count of active requests using this entry. This entry will // remain in the store as long as this count remains > 0. requests int + // evicting is used to mark an entry that will be evicted when the current in- + // flight requests finish. + evicting bool } // NewStore creates and returns a Store that is ready for use. The caller must @@ -89,6 +92,7 @@ func (s *Store) Run(ctx context.Context) { // Only stop the materializer if there are no active requests. if e.requests == 0 { + s.logger.Trace("evicting item from store", "key", he.Key()) e.stop() delete(s.byKey, he.Key()) } @@ -187,13 +191,13 @@ func (s *Store) NotifyCallback( "error", err, "request-type", req.Type(), "index", index) - continue } index = result.Index cb(ctx, cache.UpdateEvent{ CorrelationID: correlationID, Result: result.Value, + Err: err, Meta: cache.ResultMeta{Index: result.Index, Hit: result.Cached}, }) } @@ -211,6 +215,9 @@ func (s *Store) readEntry(req Request) (string, Materializer, error) { defer s.lock.Unlock() e, ok := s.byKey[key] if ok { + if e.evicting { + return "", nil, errors.New("item is marked for eviction") + } e.requests++ s.byKey[key] = e return key, e.materializer, nil @@ -222,7 +229,18 @@ func (s *Store) readEntry(req Request) (string, Materializer, error) { } ctx, cancel := context.WithCancel(context.Background()) - go mat.Run(ctx) + go func() { + mat.Run(ctx) + + // Materializers run until they either reach their TTL and are evicted (which + // cancels the given context) or encounter an irrecoverable error. + // + // If the context hasn't been canceled, we know it's the error case so we + // trigger an immediate eviction. + if ctx.Err() == nil { + s.evictNow(key) + } + }() e = entry{ materializer: mat, @@ -233,6 +251,28 @@ func (s *Store) readEntry(req Request) (string, Materializer, error) { return key, e.materializer, nil } +// evictNow causes the item with the given key to be evicted immediately. +// +// If there are requests in-flight, the item is marked for eviction such that +// once the requests have been served releaseEntry will move it to the top of +// the expiry heap. If there are no requests in-flight, evictNow will move the +// item to the top of the expiry heap itself. +// +// In either case, the entry's evicting flag prevents it from being served by +// readEntry (and thereby gaining new in-flight requests). +func (s *Store) evictNow(key string) { + s.lock.Lock() + defer s.lock.Unlock() + + e := s.byKey[key] + e.evicting = true + s.byKey[key] = e + + if e.requests == 0 { + s.expireNowLocked(key) + } +} + // releaseEntry decrements the request count and starts an expiry timer if the // count has reached 0. Must be called once for every call to readEntry. func (s *Store) releaseEntry(key string) { @@ -246,6 +286,11 @@ func (s *Store) releaseEntry(key string) { return } + if e.evicting { + s.expireNowLocked(key) + return + } + if e.expiry.Index() == ttlcache.NotIndexed { e.expiry = s.expiryHeap.Add(key, s.idleTTL) s.byKey[key] = e @@ -255,6 +300,17 @@ func (s *Store) releaseEntry(key string) { s.expiryHeap.Update(e.expiry.Index(), s.idleTTL) } +// expireNowLocked moves the item with the given key to the top of the expiry +// heap, causing it to be picked up by the expiry loop and evicted immediately. +func (s *Store) expireNowLocked(key string) { + e := s.byKey[key] + if idx := e.expiry.Index(); idx != ttlcache.NotIndexed { + s.expiryHeap.Remove(idx) + } + e.expiry = s.expiryHeap.Add(key, time.Duration(0)) + s.byKey[key] = e +} + // makeEntryKey matches agent/cache.makeEntryKey, but may change in the future. func makeEntryKey(typ string, r cache.RequestInfo) string { return fmt.Sprintf("%s/%s/%s/%s", typ, r.Datacenter, r.Token, r.Key) diff --git a/agent/submatview/store_test.go b/agent/submatview/store_test.go index 1d5789c054..aab0995998 100644 --- a/agent/submatview/store_test.go +++ b/agent/submatview/store_test.go @@ -509,3 +509,75 @@ func TestStore_Run_ExpiresEntries(t *testing.T) { require.Len(t, store.byKey, 0) require.Equal(t, ttlcache.NotIndexed, e.expiry.Index()) } + +func TestStore_Run_FailingMaterializer(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + store := NewStore(hclog.NewNullLogger()) + store.idleTTL = 24 * time.Hour + go store.Run(ctx) + + t.Run("with an in-flight request", func(t *testing.T) { + req := &failingMaterializerRequest{ + doneCh: make(chan struct{}), + } + + ch := make(chan cache.UpdateEvent) + reqCtx, reqCancel := context.WithCancel(context.Background()) + t.Cleanup(reqCancel) + require.NoError(t, store.Notify(reqCtx, req, "", ch)) + + assertRequestCount(t, store, req, 1) + + // Cause the materializer to "fail" (exit before its context is canceled). + close(req.doneCh) + + // End the in-flight request. + reqCancel() + + // Check that the item was evicted. + retry.Run(t, func(r *retry.R) { + store.lock.Lock() + defer store.lock.Unlock() + + require.Len(r, store.byKey, 0) + }) + }) + + t.Run("with no in-flight requests", func(t *testing.T) { + req := &failingMaterializerRequest{ + doneCh: make(chan struct{}), + } + + // Cause the materializer to "fail" (exit before its context is canceled). + close(req.doneCh) + + // Check that the item was evicted. + retry.Run(t, func(r *retry.R) { + store.lock.Lock() + defer store.lock.Unlock() + + require.Len(r, store.byKey, 0) + }) + }) +} + +type failingMaterializerRequest struct { + doneCh chan struct{} +} + +func (failingMaterializerRequest) CacheInfo() cache.RequestInfo { return cache.RequestInfo{} } +func (failingMaterializerRequest) Type() string { return "test.FailingMaterializerRequest" } + +func (r *failingMaterializerRequest) NewMaterializer() (Materializer, error) { + return &failingMaterializer{doneCh: r.doneCh}, nil +} + +type failingMaterializer struct { + doneCh <-chan struct{} +} + +func (failingMaterializer) Query(context.Context, uint64) (Result, error) { return Result{}, nil } + +func (m *failingMaterializer) Run(context.Context) { <-m.doneCh } diff --git a/agent/xds/delta.go b/agent/xds/delta.go index 701c04f2ed..71c1edcb0f 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -81,6 +81,11 @@ const ( ) func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discovery_v3.DeltaDiscoveryRequest) error { + // Handle invalid ACL tokens up-front. + if _, err := s.authenticate(stream.Context()); err != nil { + return err + } + // Loop state var ( cfgSnap *proxycfg.ConfigSnapshot @@ -200,7 +205,18 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove } } - case cfgSnap = <-stateCh: + case cs, ok := <-stateCh: + if !ok { + // stateCh is closed either when *we* cancel the watch (on-exit via defer) + // or by the proxycfg.Manager when an irrecoverable error is encountered + // such as the ACL token getting deleted. + // + // We know for sure that this is the latter case, because in the former we + // would've already exited this loop. + return status.Error(codes.Aborted, "xDS stream terminated due to an irrecoverable error, please try again") + } + cfgSnap = cs + newRes, err := generator.allResourcesFromSnapshot(cfgSnap) if err != nil { return status.Errorf(codes.Unavailable, "failed to generate all xDS resources from the snapshot: %v", err) diff --git a/agent/xds/server.go b/agent/xds/server.go index cc27f3fde7..3ee42e77b0 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -186,6 +186,18 @@ func (s *Server) Register(srv *grpc.Server) { envoy_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv, s) } +func (s *Server) authenticate(ctx context.Context) (acl.Authorizer, error) { + authz, err := s.ResolveToken(external.TokenFromContext(ctx)) + if acl.IsErrNotFound(err) { + return nil, status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err) + } else if acl.IsErrPermissionDenied(err) { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } else if err != nil { + return nil, status.Errorf(codes.Internal, "error resolving acl token: %v", err) + } + return authz, nil +} + // authorize the xDS request using the token stored in ctx. This authorization is // a bit different from most interfaces. Instead of explicitly authorizing or // filtering each piece of data in the response, the request is authorized @@ -201,13 +213,9 @@ func (s *Server) authorize(ctx context.Context, cfgSnap *proxycfg.ConfigSnapshot return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot") } - authz, err := s.ResolveToken(external.TokenFromContext(ctx)) - if acl.IsErrNotFound(err) { - return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err) - } else if acl.IsErrPermissionDenied(err) { - return status.Error(codes.PermissionDenied, err.Error()) - } else if err != nil { - return status.Errorf(codes.Internal, "error resolving acl token: %v", err) + authz, err := s.authenticate(ctx) + if err != nil { + return err } var authzContext acl.AuthorizerContext