From 0ac8ae6c3beb8641927f0880d5c8c4e9754e2d82 Mon Sep 17 00:00:00 2001 From: Derek Menteer <105233703+hashi-derek@users.noreply.github.com> Date: Fri, 15 Mar 2024 13:57:11 -0500 Subject: [PATCH] Fix xDS deadlock due to syncLoop termination. (#20867) * Fix xDS deadlock due to syncLoop termination. This fixes an issue where agentless xDS streams can deadlock permanently until a server is restarted. When this issue occurs, no new proxies are able to successfully connect to the server. Effectively, the trigger for this deadlock stems from the following return statement: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L199-L202 When this happens, the entire `syncLoop()` terminates and stops consuming from the following channel: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L182-L192 Which results in the `ConfigSource.cleanup()` function never receiving a response and holding a mutex indefinitely: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L241-L247 Because this mutex is shared, it effectively deadlocks the server's ability to process new xDS streams. ---- The fix to this issue involves removing the `chan chan struct{}` used like an RPC-over-channels pattern and replacing it with two distinct channels: + `stopSyncLoopCh` - indicates that the `syncLoop()` should terminate soon. + `syncLoopDoneCh` - indicates that the `syncLoop()` has terminated. Splitting these two concepts out and deferring a `close(syncLoopDoneCh)` in the `syncLoop()` function ensures that the deadlock above should no longer occur. We also now evict xDS connections of all proxies for the corresponding `syncLoop()` whenever it encounters an irrecoverable error. This is done by hoisting the new `syncLoopDoneCh` upwards so that it's visible to the xDS delta processing. Prior to this fix, the behavior was to simply orphan them so they would never receive catalog-registration or service-defaults updates. * Add changelog. --- .changelog/20867.txt | 3 + .../proxycfg-sources/catalog/config_source.go | 88 +++++---- .../catalog/config_source_test.go | 177 +++++++++++++++++- .../catalog/mock_ConfigManager.go | 2 +- .../catalog/mock_SessionLimiter.go | 2 +- .../proxycfg-sources/catalog/mock_Watcher.go | 31 ++- agent/proxycfg-sources/local/config_source.go | 11 +- agent/proxycfg/manager.go | 7 +- agent/proxycfg_test.go | 2 +- agent/xds/delta.go | 24 ++- agent/xds/delta_test.go | 28 +++ agent/xds/server.go | 5 +- agent/xds/xds_protocol_helpers_test.go | 35 +++- internal/mesh/proxy-tracker/proxy_tracker.go | 22 ++- .../mesh/proxy-tracker/proxy_tracker_test.go | 14 +- 15 files changed, 353 insertions(+), 98 deletions(-) create mode 100644 .changelog/20867.txt diff --git a/.changelog/20867.txt b/.changelog/20867.txt new file mode 100644 index 0000000000..3a39126c8a --- /dev/null +++ b/.changelog/20867.txt @@ -0,0 +1,3 @@ +```release-note:bug +connect: Fix xDS deadlock that could result in proxies being unable to start. +``` diff --git a/agent/proxycfg-sources/catalog/config_source.go b/agent/proxycfg-sources/catalog/config_source.go index 9ded9aa7fd..deb1bbeac8 100644 --- a/agent/proxycfg-sources/catalog/config_source.go +++ b/agent/proxycfg-sources/catalog/config_source.go @@ -34,9 +34,12 @@ type ConfigSource struct { shutdownCh chan struct{} } +var _ Watcher = (*ConfigSource)(nil) + type watch struct { - numWatchers int // guarded by ConfigSource.mu. 
- closeCh chan chan struct{} + numWatchers int // guarded by ConfigSource.mu. + stopSyncLoopCh chan struct{} + syncLoopDoneCh chan struct{} } // NewConfigSource creates a ConfigSource with the given configuration. @@ -50,7 +53,7 @@ func NewConfigSource(cfg Config) *ConfigSource { // Watch wraps the underlying proxycfg.Manager and dynamically registers // services from the catalog with it when requested by the xDS server. -func (m *ConfigSource) Watch(id *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) { +func (m *ConfigSource) Watch(id *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.SrcTerminatedChan, proxysnapshot.CancelFunc, error) { // Create service ID serviceID := structs.NewServiceID(id.Name, GetEnterpriseMetaFromResourceID(id)) // If the service is registered to the local agent, use the LocalConfigSource @@ -67,7 +70,7 @@ func (m *ConfigSource) Watch(id *pbresource.ID, nodeName string, token string) ( // See: https://github.com/hashicorp/consul/issues/15753 session, err := m.SessionLimiter.BeginSession() if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } proxyID := proxycfg.ProxyID{ @@ -79,6 +82,28 @@ func (m *ConfigSource) Watch(id *pbresource.ID, nodeName string, token string) ( // Start the watch on the real proxycfg Manager. snapCh, cancelWatch := m.Manager.Watch(proxyID) + m.mu.Lock() + defer m.mu.Unlock() + + w, ok := m.watches[proxyID] + if ok { + w.numWatchers++ + } else { + w = &watch{ + numWatchers: 1, + stopSyncLoopCh: make(chan struct{}), + syncLoopDoneCh: make(chan struct{}), + } + m.watches[proxyID] = w + + if err := m.startSync(w.stopSyncLoopCh, w.syncLoopDoneCh, proxyID); err != nil { + delete(m.watches, proxyID) + cancelWatch() + session.End() + return nil, nil, nil, nil, err + } + } + // Wrap the cancelWatch function with our bookkeeping. m.mu must be held when calling. var cancelOnce sync.Once cancel := func() { @@ -88,26 +113,7 @@ func (m *ConfigSource) Watch(id *pbresource.ID, nodeName string, token string) ( session.End() }) } - - m.mu.Lock() - defer m.mu.Unlock() - - w, ok := m.watches[proxyID] - if ok { - w.numWatchers++ - } else { - w = &watch{closeCh: make(chan chan struct{}), numWatchers: 1} - m.watches[proxyID] = w - - if err := m.startSync(w.closeCh, proxyID); err != nil { - delete(m.watches, proxyID) - cancelWatch() - session.End() - return nil, nil, nil, err - } - } - - return snapCh, session.Terminated(), cancel, nil + return snapCh, session.Terminated(), w.syncLoopDoneCh, cancel, nil } func (m *ConfigSource) Shutdown() { @@ -122,7 +128,11 @@ func (m *ConfigSource) Shutdown() { // // If the first attempt to fetch and register the service fails, startSync // will return an error (and no goroutine will be started). -func (m *ConfigSource) startSync(closeCh <-chan chan struct{}, proxyID proxycfg.ProxyID) error { +func (m *ConfigSource) startSync( + stopSyncLoopCh <-chan struct{}, + syncLoopDoneCh chan<- struct{}, + proxyID proxycfg.ProxyID, +) error { logger := m.Logger.With( "proxy_service_id", proxyID.ServiceID.String(), "node", proxyID.NodeName, @@ -170,7 +180,13 @@ func (m *ConfigSource) startSync(closeCh <-chan chan struct{}, proxyID proxycfg. syncLoop := func(ws memdb.WatchSet) { // Cancel the context on return to clean up the goroutine started by WatchCh. 
ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + defer func() { + cancel() + logger.Debug("de-registering service with proxycfg manager because all watchers have gone away") + m.Manager.Deregister(proxyID, source) + close(syncLoopDoneCh) + logger.Debug("sync-loop terminated") + }() for { select { @@ -179,16 +195,13 @@ func (m *ConfigSource) startSync(closeCh <-chan chan struct{}, proxyID proxycfg. // // It is expected that all other branches of this select will return and // cancel the context given to WatchCh (to clean up its goroutine). - case doneCh := <-closeCh: + case <-stopSyncLoopCh: // All watchers of this service (xDS streams) have gone away, so it's time // to free its resources. // // TODO(agentless): we should probably wait for a short grace period before // de-registering the service to allow clients to reconnect after a network // blip. - logger.Trace("de-registering service with proxycfg manager because all watchers have gone away") - m.Manager.Deregister(proxyID, source) - close(doneCh) return case <-m.shutdownCh: // Manager is shutting down, stop the goroutine. @@ -198,6 +211,7 @@ func (m *ConfigSource) startSync(closeCh <-chan chan struct{}, proxyID proxycfg. var err error ws, err = fetchAndRegister() if err != nil { + logger.Debug("error in syncLoop.fetchAndRegister", "err", err) return } } @@ -233,18 +247,14 @@ func (m *ConfigSource) cleanup(id proxycfg.ProxyID) { h.numWatchers-- if h.numWatchers == 0 { - // We wait for doneCh to be closed by the sync goroutine, so that the lock is + // Notify the sync loop that it should terminate. + close(h.stopSyncLoopCh) + // We wait for sync loop to be closed, so that the lock is // held until after the service is de-registered - this prevents a potential // race where another sync goroutine is started for the service and we undo // its call to register the service. - // - // This cannot deadlock because closeCh is unbuffered. Sending will only - // succeed if the sync goroutine is ready to receive (which always closes - // doneCh). - doneCh := make(chan struct{}) select { - case h.closeCh <- doneCh: - <-doneCh + case <-h.syncLoopDoneCh: case <-m.shutdownCh: // ConfigSource is shutting down, so the goroutine will be stopped anyway. 
} @@ -293,7 +303,7 @@ type Store interface { //go:generate mockery --name Watcher --inpackage type Watcher interface { - Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) + Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.SrcTerminatedChan, proxysnapshot.CancelFunc, error) } //go:generate mockery --name SessionLimiter --inpackage diff --git a/agent/proxycfg-sources/catalog/config_source_test.go b/agent/proxycfg-sources/catalog/config_source_test.go index 94d939e8ca..7b267023a6 100644 --- a/agent/proxycfg-sources/catalog/config_source_test.go +++ b/agent/proxycfg-sources/catalog/config_source_test.go @@ -6,6 +6,7 @@ package catalog import ( "context" "errors" + "fmt" "testing" "time" @@ -79,7 +80,7 @@ func TestConfigSource_Success(t *testing.T) { }) t.Cleanup(mgr.Shutdown) - snapCh, termCh, cancelWatch1, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token) + snapCh, termCh, _, cancelWatch1, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token) require.NoError(t, err) require.Equal(t, session1TermCh, termCh) @@ -136,7 +137,7 @@ func TestConfigSource_Success(t *testing.T) { } // Start another watch. - _, termCh2, cancelWatch2, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token) + _, termCh2, _, cancelWatch2, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token) require.NoError(t, err) require.Equal(t, session2TermCh, termCh2) @@ -179,7 +180,7 @@ func TestConfigSource_LocallyManagedService(t *testing.T) { localWatcher := NewMockWatcher(t) localWatcher.On("Watch", proxyID, nodeName, token). - Return(make(<-chan proxysnapshot.ProxySnapshot), nil, proxysnapshot.CancelFunc(func() {}), nil) + Return(make(<-chan proxysnapshot.ProxySnapshot), nil, nil, proxysnapshot.CancelFunc(func() {}), nil) mgr := NewConfigSource(Config{ NodeName: nodeName, @@ -191,7 +192,7 @@ func TestConfigSource_LocallyManagedService(t *testing.T) { }) t.Cleanup(mgr.Shutdown) - _, _, _, err := mgr.Watch(proxyID, nodeName, token) + _, _, _, _, err := mgr.Watch(proxyID, nodeName, token) require.NoError(t, err) } @@ -238,13 +239,173 @@ func TestConfigSource_ErrorRegisteringService(t *testing.T) { }) t.Cleanup(mgr.Shutdown) - _, _, _, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token) + _, _, _, _, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token) require.Error(t, err) require.True(t, canceledWatch, "watch should've been canceled") session.AssertCalled(t, "End") } +func TestConfigSource_ErrorInSyncLoop(t *testing.T) { + serviceID := structs.NewServiceID("web-sidecar-proxy-1", nil) + nodeName := "node-name" + token := "token" + + store := testStateStore(t) + + // Register the proxy in the catalog/state store at port 9999. 
+ require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{ + Node: nodeName, + Service: &structs.NodeService{ + ID: serviceID.ID, + Service: "web-sidecar-proxy", + Port: 9999, + Kind: structs.ServiceKindConnectProxy, + Proxy: structs.ConnectProxyConfig{ + Config: map[string]any{ + "local_connect_timeout_ms": 123, + }, + }, + }, + })) + + cfgMgr := NewMockConfigManager(t) + { + proxyID := proxycfg.ProxyID{ + ServiceID: serviceID, + NodeName: nodeName, + Token: token, + } + snapCh := make(chan proxysnapshot.ProxySnapshot, 1) + cfgMgr.On("Watch", proxyID). + Return((<-chan proxysnapshot.ProxySnapshot)(snapCh), proxysnapshot.CancelFunc(func() {}), nil) + + // Answer the register call successfully for session 1 starting (Repeatability = 1). + // Session 2 should not have caused a re-register to happen. + cfgMgr.On("Register", mock.Anything, mock.Anything, source, token, false). + Run(func(args mock.Arguments) { + id := args.Get(0).(proxycfg.ProxyID) + ns := args.Get(1).(*structs.NodeService) + + snapCh <- &proxycfg.ConfigSnapshot{ + ProxyID: id, + Port: ns.Port, + Proxy: ns.Proxy, + } + }). + Return(nil). + Repeatability = 1 + + // Error on subsequent registrations afterwards (during the sync loop). + cfgMgr.On("Register", mock.Anything, mock.Anything, source, token, false). + Return(fmt.Errorf("intentional registration error")) + + cfgMgr.On("Deregister", proxyID, source). + Run(func(mock.Arguments) { close(snapCh) }). + Return() + } + + lim := NewMockSessionLimiter(t) + session1TermCh := make(limiter.SessionTerminatedChan) + session2TermCh := make(limiter.SessionTerminatedChan) + { + session1 := newMockSession(t) + session1.On("Terminated").Return(session1TermCh) + session1.On("End").Return() + + session2 := newMockSession(t) + session2.On("Terminated").Return(session2TermCh) + session2.On("End").Return() + + lim.On("BeginSession").Return(session1, nil).Once() + lim.On("BeginSession").Return(session2, nil).Once() + } + + mgr := NewConfigSource(Config{ + Manager: cfgMgr, + LocalState: testLocalState(t), + Logger: hclog.NewNullLogger(), + GetStore: func() Store { return store }, + SessionLimiter: lim, + }) + t.Cleanup(mgr.Shutdown) + + snapCh, termCh, cfgSrcTerminated1, cancelWatch1, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token) + require.NoError(t, err) + require.Equal(t, session1TermCh, termCh) + + // Expect Register to have been called with the proxy's inital port. + select { + case snap := <-snapCh: + require.Equal(t, 9999, snap.(*proxycfg.ConfigSnapshot).Port) + require.Equal(t, token, snap.(*proxycfg.ConfigSnapshot).ProxyID.Token) + case <-time.After(100 * time.Millisecond): + t.Fatal("timeout waiting for snapshot") + } + + // Start another watch. + _, termCh2, cfgSrcTerminated2, cancelWatch2, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token) + require.NoError(t, err) + require.Equal(t, session2TermCh, termCh2) + + // Expect the service to have not been re-registered by the second watch. + select { + case <-snapCh: + t.Fatal("service shouldn't have been re-registered") + case <-time.After(100 * time.Millisecond): + } + + // Ensure that no config-source syncLoops were terminated. + select { + case <-cfgSrcTerminated1: + t.Fatal("unexpected config-source termination 1") + case <-cfgSrcTerminated2: + t.Fatal("unexpected config-source termination 2") + default: + } + + // Update the proxy's port to 8888. 
+ // This should trigger the config-source syncLoop termination channel due to an error. + require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{ + Node: nodeName, + Service: &structs.NodeService{ + ID: serviceID.ID, + Service: "web-sidecar-proxy", + Port: 8888, + Kind: structs.ServiceKindConnectProxy, + Proxy: structs.ConnectProxyConfig{ + Config: map[string]any{ + "local_connect_timeout_ms": 123, + }, + }, + }, + })) + + // Expect both config sources to have terminated when the syncLoop errors. + select { + case _, ok := <-cfgSrcTerminated1: + cancelWatch1() + require.False(t, ok) + case <-time.After(100 * time.Millisecond): + t.Fatal("timeout waiting for config-source termination 1") + } + select { + case _, ok := <-cfgSrcTerminated2: + cancelWatch2() + require.False(t, ok) + case <-time.After(100 * time.Millisecond): + t.Fatal("timeout waiting for config-source termination 2") + } + + // Expect the snap channels to have been closed. + select { + case _, ok := <-snapCh: + require.False(t, ok) + case <-time.After(100 * time.Millisecond): + t.Fatal("snap channel was not closed") + } +} + func TestConfigSource_NotProxyService(t *testing.T) { serviceID := structs.NewServiceID("web", nil) nodeName := "node-name" @@ -279,7 +440,7 @@ func TestConfigSource_NotProxyService(t *testing.T) { }) t.Cleanup(mgr.Shutdown) - _, _, _, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token) + _, _, _, _, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token) require.Error(t, err) require.Contains(t, err.Error(), "must be a sidecar proxy or gateway") require.True(t, canceledWatch, "watch should've been canceled") @@ -295,7 +456,7 @@ func TestConfigSource_SessionLimiterError(t *testing.T) { }) t.Cleanup(src.Shutdown) - _, _, _, err := src.Watch( + _, _, _, _, err := src.Watch( rtest.Resource(pbmesh.ProxyConfigurationType, "web-sidecar-proxy-1").ID(), "node-name", "token", @@ -303,7 +464,7 @@ func TestConfigSource_SessionLimiterError(t *testing.T) { require.Equal(t, limiter.ErrCapacityReached, err) } -func testConfigManager(t *testing.T, serviceID structs.ServiceID, nodeName string, token string) ConfigManager { +func testConfigManager(t *testing.T, serviceID structs.ServiceID, nodeName string, token string) *MockConfigManager { t.Helper() cfgMgr := NewMockConfigManager(t) diff --git a/agent/proxycfg-sources/catalog/mock_ConfigManager.go b/agent/proxycfg-sources/catalog/mock_ConfigManager.go index 37deffb022..2c1608f241 100644 --- a/agent/proxycfg-sources/catalog/mock_ConfigManager.go +++ b/agent/proxycfg-sources/catalog/mock_ConfigManager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.33.1. DO NOT EDIT. +// Code generated by mockery v2.37.1. DO NOT EDIT. package catalog diff --git a/agent/proxycfg-sources/catalog/mock_SessionLimiter.go b/agent/proxycfg-sources/catalog/mock_SessionLimiter.go index 39cd430f06..baa2d3591e 100644 --- a/agent/proxycfg-sources/catalog/mock_SessionLimiter.go +++ b/agent/proxycfg-sources/catalog/mock_SessionLimiter.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.33.1. DO NOT EDIT. +// Code generated by mockery v2.37.1. DO NOT EDIT. package catalog diff --git a/agent/proxycfg-sources/catalog/mock_Watcher.go b/agent/proxycfg-sources/catalog/mock_Watcher.go index b77be5d98e..f77ca13283 100644 --- a/agent/proxycfg-sources/catalog/mock_Watcher.go +++ b/agent/proxycfg-sources/catalog/mock_Watcher.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.33.1. DO NOT EDIT. 
+// Code generated by mockery v2.37.1. DO NOT EDIT. package catalog @@ -8,6 +8,8 @@ import ( pbresource "github.com/hashicorp/consul/proto-public/pbresource" + proxycfg "github.com/hashicorp/consul/agent/proxycfg" + proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" ) @@ -17,14 +19,15 @@ type MockWatcher struct { } // Watch provides a mock function with given fields: proxyID, nodeName, token -func (_m *MockWatcher) Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) { +func (_m *MockWatcher) Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.SrcTerminatedChan, proxysnapshot.CancelFunc, error) { ret := _m.Called(proxyID, nodeName, token) var r0 <-chan proxysnapshot.ProxySnapshot var r1 limiter.SessionTerminatedChan - var r2 proxysnapshot.CancelFunc - var r3 error - if rf, ok := ret.Get(0).(func(*pbresource.ID, string, string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error)); ok { + var r2 proxycfg.SrcTerminatedChan + var r3 proxysnapshot.CancelFunc + var r4 error + if rf, ok := ret.Get(0).(func(*pbresource.ID, string, string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.SrcTerminatedChan, proxysnapshot.CancelFunc, error)); ok { return rf(proxyID, nodeName, token) } if rf, ok := ret.Get(0).(func(*pbresource.ID, string, string) <-chan proxysnapshot.ProxySnapshot); ok { @@ -43,21 +46,29 @@ func (_m *MockWatcher) Watch(proxyID *pbresource.ID, nodeName string, token stri } } - if rf, ok := ret.Get(2).(func(*pbresource.ID, string, string) proxysnapshot.CancelFunc); ok { + if rf, ok := ret.Get(2).(func(*pbresource.ID, string, string) proxycfg.SrcTerminatedChan); ok { r2 = rf(proxyID, nodeName, token) } else { if ret.Get(2) != nil { - r2 = ret.Get(2).(proxysnapshot.CancelFunc) + r2 = ret.Get(2).(proxycfg.SrcTerminatedChan) } } - if rf, ok := ret.Get(3).(func(*pbresource.ID, string, string) error); ok { + if rf, ok := ret.Get(3).(func(*pbresource.ID, string, string) proxysnapshot.CancelFunc); ok { r3 = rf(proxyID, nodeName, token) } else { - r3 = ret.Error(3) + if ret.Get(3) != nil { + r3 = ret.Get(3).(proxysnapshot.CancelFunc) + } } - return r0, r1, r2, r3 + if rf, ok := ret.Get(4).(func(*pbresource.ID, string, string) error); ok { + r4 = rf(proxyID, nodeName, token) + } else { + r4 = ret.Error(4) + } + + return r0, r1, r2, r3, r4 } // NewMockWatcher creates a new instance of MockWatcher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. 
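
The heart of the fix is the replacement of the `chan chan struct{}` handshake with a stop/done channel pair, so that `cleanup()` can never block on a loop that has already exited. Below is a minimal standalone sketch of that pattern (not Consul code; `worker`, `stopCh`, `doneCh`, and `fetchAndRegister` are illustrative stand-ins) showing why deferring `close(doneCh)` keeps the cleanup path from hanging even when the loop bails out on an irrecoverable error.

```go
// Minimal sketch of the stop/done channel pattern adopted by this patch.
// Names here are illustrative, not Consul's identifiers.
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

type worker struct {
	mu     sync.Mutex
	stopCh chan struct{} // closed by cleanup to request termination
	doneCh chan struct{} // closed by the loop itself when it exits, for any reason
}

func newWorker() *worker {
	return &worker{
		stopCh: make(chan struct{}),
		doneCh: make(chan struct{}),
	}
}

// syncLoop stands in for ConfigSource.syncLoop: it may return early on an
// irrecoverable error, but the deferred close(doneCh) still runs, so waiters
// are never stranded.
func (w *worker) syncLoop() {
	defer close(w.doneCh)

	for i := 0; ; i++ {
		select {
		case <-w.stopCh:
			return
		case <-time.After(10 * time.Millisecond):
			if err := fetchAndRegister(i); err != nil {
				fmt.Println("loop exiting early:", err)
				return // early exit no longer strands cleanup
			}
		}
	}
}

// cleanup stands in for ConfigSource.cleanup: it holds the mutex while waiting
// for the loop to confirm it has stopped.
func (w *worker) cleanup() {
	w.mu.Lock()
	defer w.mu.Unlock()

	close(w.stopCh) // ask the loop to stop (harmless if it already returned)
	<-w.doneCh      // always returns, because syncLoop defers close(doneCh)
}

func fetchAndRegister(i int) error {
	if i == 2 {
		return errors.New("simulated irrecoverable registration error")
	}
	return nil
}

func main() {
	w := newWorker()
	go w.syncLoop()

	time.Sleep(50 * time.Millisecond) // let the loop hit the simulated error
	w.cleanup()                       // with the old chan-chan handshake this could hang
	fmt.Println("cleanup returned; no deadlock")
}
```

With the old handshake, the cleanup path sent a reply channel on `closeCh` and blocked until the loop answered, which hung forever once the loop had stopped receiving. Here it waits on a channel the loop is guaranteed to close on every exit path.
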
diff --git a/agent/proxycfg-sources/local/config_source.go b/agent/proxycfg-sources/local/config_source.go index 7b3a835fb8..d30edc1b7b 100644 --- a/agent/proxycfg-sources/local/config_source.go +++ b/agent/proxycfg-sources/local/config_source.go @@ -23,8 +23,13 @@ func NewConfigSource(cfgMgr ConfigManager) *ConfigSource { return &ConfigSource{cfgMgr} } -func (m *ConfigSource) Watch(proxyID *pbresource.ID, nodeName string, _ string) (<-chan proxysnapshot.ProxySnapshot, - limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) { +func (m *ConfigSource) Watch(proxyID *pbresource.ID, nodeName string, _ string) ( + <-chan proxysnapshot.ProxySnapshot, + limiter.SessionTerminatedChan, + proxycfg.SrcTerminatedChan, + proxysnapshot.CancelFunc, + error, +) { serviceID := structs.NewServiceID(proxyID.Name, catalog.GetEnterpriseMetaFromResourceID(proxyID)) watchCh, cancelWatch := m.manager.Watch(proxycfg.ProxyID{ ServiceID: serviceID, @@ -36,5 +41,5 @@ func (m *ConfigSource) Watch(proxyID *pbresource.ID, nodeName string, _ string) // is checked before the watch is created). Token: "", }) - return watchCh, nil, cancelWatch, nil + return watchCh, nil, nil, cancelWatch, nil } diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go index b01787f2c1..4d3dd6cbc7 100644 --- a/agent/proxycfg/manager.go +++ b/agent/proxycfg/manager.go @@ -5,10 +5,11 @@ package proxycfg import ( "errors" - "github.com/hashicorp/consul/lib/channels" "runtime/debug" "sync" + "github.com/hashicorp/consul/lib/channels" + "github.com/hashicorp/go-hclog" "golang.org/x/time/rate" @@ -38,6 +39,10 @@ type ProxyID struct { // from overwriting each other's registrations. type ProxySource string +// SrcTerminatedChan indicates that the config-source for the proxycfg is no longer running +// and will stop receiving updates when it is closed. +type SrcTerminatedChan <-chan struct{} + // Manager provides an API with which proxy services can be registered, and // coordinates the fetching (and refreshing) of intentions, upstreams, discovery // chain, certificates etc. diff --git a/agent/proxycfg_test.go b/agent/proxycfg_test.go index c8141e407f..568d616486 100644 --- a/agent/proxycfg_test.go +++ b/agent/proxycfg_test.go @@ -87,7 +87,7 @@ func TestAgent_local_proxycfg(t *testing.T) { // Prior to fixes in https://github.com/hashicorp/consul/pull/16497 // this call to Watch() would deadlock. var err error - ch, stc, cancel, err = cfg.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, sid.ID).ID(), a.config.NodeName, token) + ch, stc, _, cancel, err = cfg.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, sid.ID).ID(), a.config.NodeName, token) require.NoError(t, err) } select { diff --git a/agent/xds/delta.go b/agent/xds/delta.go index cd4d4fb164..266c9d76d8 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -43,6 +43,7 @@ import ( ) var errOverwhelmed = status.Error(codes.ResourceExhausted, "this server has too many xDS streams open, please try another") +var errConfigSyncError = status.Errorf(codes.Internal, "config-source sync loop terminated due to error") // xdsProtocolLegacyChildResend enables the legacy behavior for the `ensureChildResend` function. // This environment variable exists as an escape hatch so that users can disable the behavior, if needed. 
@@ -144,13 +145,14 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove // Loop state var ( - proxySnapshot proxysnapshot.ProxySnapshot - node *envoy_config_core_v3.Node - stateCh <-chan proxysnapshot.ProxySnapshot - drainCh limiter.SessionTerminatedChan - watchCancel func() - nonce uint64 // xDS requires a unique nonce to correlate response/request pairs - ready bool // set to true after the first snapshot arrives + proxySnapshot proxysnapshot.ProxySnapshot + node *envoy_config_core_v3.Node + stateCh <-chan proxysnapshot.ProxySnapshot + drainCh limiter.SessionTerminatedChan + cfgSrcTerminated proxycfg.SrcTerminatedChan + watchCancel func() + nonce uint64 // xDS requires a unique nonce to correlate response/request pairs + ready bool // set to true after the first snapshot arrives streamStartTime = time.Now() streamStartOnce sync.Once @@ -309,6 +311,12 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove resourceMap = newResourceMap currentVersions = newVersions ready = true + case <-cfgSrcTerminated: + // Ensure that we cancel and cleanup resources if the sync loop terminates for any reason. + // This is necessary to handle the scenario where an unexpected error occurs that the loop + // cannot recover from. + logger.Debug("config-source sync loop terminated due to error") + return errConfigSyncError } // Trigger state machine @@ -335,7 +343,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err) } - stateCh, drainCh, watchCancel, err = s.ProxyWatcher.Watch(proxyID, nodeName, options.Token) + stateCh, drainCh, cfgSrcTerminated, watchCancel, err = s.ProxyWatcher.Watch(proxyID, nodeName, options.Token) switch { case errors.Is(err, limiter.ErrCapacityReached): return errOverwhelmed diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index 34150cc30f..ab170f9d66 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -1608,6 +1608,34 @@ func TestServer_DeltaAggregatedResources_v3_CapacityReached(t *testing.T) { } } +func TestServer_DeltaAggregatedResources_v3_CfgSrcTerminated(t *testing.T) { + aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil } + + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) + mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy + + sid := structs.NewServiceID("web-sidecar-proxy", nil) + + mgr.RegisterProxy(t, sid) + + snap := newTestSnapshot(t, nil, "", nil) + envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + InitialResourceVersions: mustMakeVersionMap(t, + makeTestCluster(t, snap, "tcp:geo-cache"), + ), + }) + mgr.CfgSrcTerminate(sid) + + select { + case err := <-errCh: + require.Error(t, err) + require.Equal(t, codes.Internal.String(), status.Code(err).String()) + require.Equal(t, errConfigSyncError, err) + case <-time.After(50 * time.Millisecond): + t.Fatalf("timed out waiting for handler to finish") + } +} + type capacityReachedLimiter struct{} func (capacityReachedLimiter) BeginSession() (limiter.Session, error) { diff --git a/agent/xds/server.go b/agent/xds/server.go index 45c11fa0b3..71f7724a7d 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -20,9 +20,10 @@ import ( "github.com/hashicorp/consul/acl" external "github.com/hashicorp/consul/agent/grpc-external" "github.com/hashicorp/consul/agent/grpc-external/limiter" + 
"github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/xds/configfetcher" "github.com/hashicorp/consul/envoyextensions/xdscommon" - "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" + proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" "github.com/hashicorp/consul/proto-public/pbresource" ) @@ -83,7 +84,7 @@ type ACLResolverFunc func(id string) (acl.Authorizer, error) // ProxyConfigSource is the interface xds.Server requires to consume proxy // config updates. type ProxyWatcher interface { - Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) + Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.SrcTerminatedChan, proxysnapshot.CancelFunc, error) } // Server represents a gRPC server that can handle xDS requests from Envoy. All diff --git a/agent/xds/xds_protocol_helpers_test.go b/agent/xds/xds_protocol_helpers_test.go index 4b7fb90f4b..c2cd4e6e50 100644 --- a/agent/xds/xds_protocol_helpers_test.go +++ b/agent/xds/xds_protocol_helpers_test.go @@ -73,16 +73,18 @@ func newTestSnapshot( // testing. It also implements ConnectAuthz to allow control over authorization. type testManager struct { sync.Mutex - stateChans map[structs.ServiceID]chan proxysnapshot.ProxySnapshot - drainChans map[structs.ServiceID]chan struct{} - cancels chan structs.ServiceID + stateChans map[structs.ServiceID]chan proxysnapshot.ProxySnapshot + drainChans map[structs.ServiceID]chan struct{} + cfgSrcTerminateChans map[structs.ServiceID]chan struct{} + cancels chan structs.ServiceID } func newTestManager(t *testing.T) *testManager { return &testManager{ - stateChans: map[structs.ServiceID]chan proxysnapshot.ProxySnapshot{}, - drainChans: map[structs.ServiceID]chan struct{}{}, - cancels: make(chan structs.ServiceID, 10), + stateChans: map[structs.ServiceID]chan proxysnapshot.ProxySnapshot{}, + drainChans: map[structs.ServiceID]chan struct{}{}, + cfgSrcTerminateChans: map[structs.ServiceID]chan struct{}{}, + cancels: make(chan structs.ServiceID, 10), } } @@ -92,6 +94,7 @@ func (m *testManager) RegisterProxy(t *testing.T, proxyID structs.ServiceID) { defer m.Unlock() m.stateChans[proxyID] = make(chan proxysnapshot.ProxySnapshot, 1) m.drainChans[proxyID] = make(chan struct{}) + m.cfgSrcTerminateChans[proxyID] = make(chan struct{}) } // Deliver simulates a proxy registration @@ -121,9 +124,23 @@ func (m *testManager) DrainStreams(proxyID structs.ServiceID) { close(ch) } +// CfgSrcTerminate terminates any open streams for the given proxyID by indicating that the +// corresponding config-source terminated unexpectedly. 
+func (m *testManager) CfgSrcTerminate(proxyID structs.ServiceID) { + m.Lock() + defer m.Unlock() + + ch, ok := m.cfgSrcTerminateChans[proxyID] + if !ok { + ch = make(chan struct{}) + m.cfgSrcTerminateChans[proxyID] = ch + } + close(ch) +} + // Watch implements ConfigManager func (m *testManager) Watch(id *pbresource.ID, _ string, _ string) (<-chan proxysnapshot.ProxySnapshot, - limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) { + limiter.SessionTerminatedChan, proxycfg.SrcTerminatedChan, proxysnapshot.CancelFunc, error) { // Create service ID proxyID := structs.NewServiceID(id.Name, catalog.GetEnterpriseMetaFromResourceID(id)) m.Lock() @@ -133,12 +150,12 @@ func (m *testManager) Watch(id *pbresource.ID, _ string, _ string) (<-chan proxy drainCh := m.drainChans[proxyID] select { case <-drainCh: - return nil, nil, nil, limiter.ErrCapacityReached + return nil, nil, nil, nil, limiter.ErrCapacityReached default: } // ch might be nil but then it will just block forever - return m.stateChans[proxyID], drainCh, func() { + return m.stateChans[proxyID], drainCh, m.cfgSrcTerminateChans[proxyID], func() { m.cancels <- proxyID }, nil } diff --git a/internal/mesh/proxy-tracker/proxy_tracker.go b/internal/mesh/proxy-tracker/proxy_tracker.go index f60cacb5cd..f3d0c7fc80 100644 --- a/internal/mesh/proxy-tracker/proxy_tracker.go +++ b/internal/mesh/proxy-tracker/proxy_tracker.go @@ -6,12 +6,14 @@ package proxytracker import ( "errors" "fmt" - "github.com/hashicorp/consul/lib/channels" "sync" + "github.com/hashicorp/consul/lib/channels" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/consul/agent/grpc-external/limiter" + "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/internal/controller" proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" "github.com/hashicorp/consul/internal/resource" @@ -87,13 +89,17 @@ func NewProxyTracker(cfg ProxyTrackerConfig) *ProxyTracker { // Watch connects a proxy with ProxyTracker and returns the consumer a channel to receive updates, // a channel to notify of xDS terminated session, and a cancel function to cancel the watch. -func (pt *ProxyTracker) Watch(proxyID *pbresource.ID, - nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, - limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) { +func (pt *ProxyTracker) Watch(proxyID *pbresource.ID, nodeName string, token string) ( + <-chan proxysnapshot.ProxySnapshot, + limiter.SessionTerminatedChan, + proxycfg.SrcTerminatedChan, + proxysnapshot.CancelFunc, + error, +) { pt.config.Logger.Trace("watch initiated", "proxyID", proxyID, "nodeName", nodeName) if err := pt.validateWatchArgs(proxyID, nodeName); err != nil { pt.config.Logger.Error("args failed validation", err) - return nil, nil, nil, err + return nil, nil, nil, nil, err } // Begin a session with the xDS session concurrency limiter. 
// @@ -101,7 +107,7 @@ func (pt *ProxyTracker) Watch(proxyID *pbresource.ID, session, err := pt.config.SessionLimiter.BeginSession() if err != nil { pt.config.Logger.Error("failed to begin session with xDS session concurrency limiter", err) - return nil, nil, nil, err + return nil, nil, nil, nil, err } // This buffering is crucial otherwise we'd block immediately trying to @@ -132,11 +138,11 @@ func (pt *ProxyTracker) Watch(proxyID *pbresource.ID, if err != nil { pt.config.Logger.Error("failed to notify controller of new proxy connection", err) pt.cancelWatchLocked(proxyReferenceKey, watchData.notifyCh, session) - return nil, nil, nil, err + return nil, nil, nil, nil, err } pt.config.Logger.Trace("controller notified of watch created", "proxyID", proxyID, "nodeName", nodeName) - return proxyStateChan, session.Terminated(), cancel, nil + return proxyStateChan, session.Terminated(), nil, cancel, nil } // notifyNewProxyChannel attempts to send a message to newProxyConnectionCh and will return an error if there's no receiver. diff --git a/internal/mesh/proxy-tracker/proxy_tracker_test.go b/internal/mesh/proxy-tracker/proxy_tracker_test.go index d6c1fc1ce5..8b3108a5a6 100644 --- a/internal/mesh/proxy-tracker/proxy_tracker_test.go +++ b/internal/mesh/proxy-tracker/proxy_tracker_test.go @@ -37,7 +37,7 @@ func TestProxyTracker_Watch(t *testing.T) { }) // Watch() - proxyStateChan, _, cancelFunc, err := pt.Watch(resourceID, "node 1", "token") + proxyStateChan, _, _, cancelFunc, err := pt.Watch(resourceID, "node 1", "token") require.NoError(t, err) // ensure New Proxy Connection message is sent @@ -91,7 +91,7 @@ func TestProxyTracker_Watch_ErrorConsumerNotReady(t *testing.T) { } // Watch() - proxyStateChan, sessionTerminatedCh, cancelFunc, err := pt.Watch(resourceID, "node 1", "token") + proxyStateChan, sessionTerminatedCh, _, cancelFunc, err := pt.Watch(resourceID, "node 1", "token") require.Nil(t, cancelFunc) require.Nil(t, proxyStateChan) require.Nil(t, sessionTerminatedCh) @@ -146,7 +146,7 @@ func TestProxyTracker_Watch_ArgValidationErrors(t *testing.T) { }) // Watch() - proxyStateChan, sessionTerminateCh, cancelFunc, err := pt.Watch(tc.proxyID, tc.nodeName, tc.token) + proxyStateChan, sessionTerminateCh, _, cancelFunc, err := pt.Watch(tc.proxyID, tc.nodeName, tc.token) require.Error(t, err) require.Equal(t, tc.expectedError, err) require.Nil(t, proxyStateChan) @@ -166,7 +166,7 @@ func TestProxyTracker_Watch_SessionLimiterError(t *testing.T) { }) // Watch() - proxyStateChan, sessionTerminateCh, cancelFunc, err := pt.Watch(resourceID, "node 1", "token") + proxyStateChan, sessionTerminateCh, _, cancelFunc, err := pt.Watch(resourceID, "node 1", "token") require.Error(t, err) require.Equal(t, "kaboom", err.Error()) require.Nil(t, proxyStateChan) @@ -190,7 +190,7 @@ func TestProxyTracker_PushChange(t *testing.T) { }) // Watch() - proxyStateChan, _, _, err := pt.Watch(resourceID, "node 1", "token") + proxyStateChan, _, _, _, err := pt.Watch(resourceID, "node 1", "token") require.NoError(t, err) // PushChange @@ -255,7 +255,7 @@ func TestProxyTracker_ProxyConnectedToServer(t *testing.T) { session.On("Terminated").Return(channel).Maybe() session.On("End").Return().Maybe() limiter.On("BeginSession").Return(session, nil) - _, _, _, _ = pt.Watch(resourceID, "node 1", "token") + _, _, _, _, _ = pt.Watch(resourceID, "node 1", "token") }, }, } @@ -294,7 +294,7 @@ func TestProxyTracker_Shutdown(t *testing.T) { }) // Watch() - proxyStateChan, _, _, err := pt.Watch(resourceID, "node 1", "token") + proxyStateChan, 
_, _, _, err := pt.Watch(resourceID, "node 1", "token") require.NoError(t, err) pt.Shutdown()
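
The other half of the change lives in `delta.go`: the xDS stream now also selects on the new `SrcTerminatedChan` and returns `errConfigSyncError`, so proxies are evicted rather than orphaned when their sync loop dies. The standalone sketch below shows that consumer-side shape under simplified, assumed types (`snapshot`, `handleStream`, and `errConfigSync` are illustrative, not the real xDS server code):

```go
// Standalone sketch of a stream handler consuming the termination channel
// returned by Watch. Types and names are simplified stand-ins.
package main

import (
	"errors"
	"fmt"
	"time"
)

type snapshot struct{ version int }

var errConfigSync = errors.New("config-source sync loop terminated")

// handleStream mirrors the shape of the delta-stream select loop: it exits
// with an error (evicting the proxy) as soon as the config source reports
// that its sync loop has died, instead of silently never updating again.
func handleStream(
	snapCh <-chan snapshot,
	srcTerminated <-chan struct{},
	cancel func(),
) error {
	defer cancel()
	for {
		select {
		case snap, ok := <-snapCh:
			if !ok {
				return nil
			}
			fmt.Println("pushing snapshot version", snap.version)
		case <-srcTerminated:
			return errConfigSync
		}
	}
}

func main() {
	snapCh := make(chan snapshot, 1)
	srcTerminated := make(chan struct{})

	snapCh <- snapshot{version: 1}
	go func() {
		time.Sleep(20 * time.Millisecond)
		close(srcTerminated) // simulate the sync loop dying on an error
	}()

	err := handleStream(snapCh, srcTerminated, func() { fmt.Println("watch canceled") })
	fmt.Println("stream ended:", err)
}
```

Per the commit message, the intent is that an evicted proxy reconnects, at which point `Watch()` starts a fresh sync loop instead of leaving the stream permanently stale.
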