Fix xDS deadlock due to syncLoop termination. (#20867)

* Fix xDS deadlock due to syncLoop termination.

This fixes an issue where agentless xDS streams can deadlock permanently until
a server is restarted. When this issue occurs, no new proxies are able to
successfully connect to the server.

The deadlock is triggered by the following return statement:
https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L199-L202

When this happens, the entire `syncLoop()` terminates and stops consuming from
the following channel:
https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L182-L192

As a result, the `ConfigSource.cleanup()` function never receives a
response and holds a mutex indefinitely:
https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L241-L247

Because this mutex is shared, the server effectively loses the ability to
process new xDS streams. A simplified sketch of the failing pattern follows.
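
For illustration, here is a minimal, self-contained sketch of the pre-fix
pattern. The struct and function names mirror `config_source.go`, but the
bodies are heavily simplified stand-ins, not the real implementation:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// watch mirrors the old shape: a single chan chan struct{} used as an
// RPC-over-channels handshake between cleanup() and syncLoop().
type watch struct {
	closeCh chan chan struct{}
}

type source struct {
	mu      sync.Mutex
	watches map[string]*watch
}

// syncLoop normally receives a doneCh from cleanup() and closes it, but an
// irrecoverable error makes it return early and never service closeCh again.
func (s *source) syncLoop(w *watch) {
	for {
		select {
		case doneCh := <-w.closeCh:
			close(doneCh)
			return
		default:
			// Stand-in for fetchAndRegister() failing irrecoverably.
			if err := errors.New("registration failed"); err != nil {
				return
			}
		}
	}
}

// cleanup holds the shared mutex while waiting for the handshake; if the
// loop has already returned, the send below blocks forever with mu held.
func (s *source) cleanup(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	doneCh := make(chan struct{})
	s.watches[id].closeCh <- doneCh // no receiver left: blocks forever
	<-doneCh
}

func main() {
	s := &source{watches: map[string]*watch{}}
	w := &watch{closeCh: make(chan chan struct{})}
	s.watches["web-sidecar-proxy"] = w

	go s.syncLoop(w)
	time.Sleep(50 * time.Millisecond) // let the loop hit the error and exit

	go s.cleanup("web-sidecar-proxy") // never returns; mu stays locked
	time.Sleep(50 * time.Millisecond)

	if !s.mu.TryLock() {
		fmt.Println("mu still held: new Watch() calls would now deadlock too")
	}
}
```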

----

The fix to this issue involves removing the `chan chan struct{}` used like an
RPC-over-channels pattern and replacing it with two distinct channels:

+ `stopSyncLoopCh` - indicates that the `syncLoop()` should terminate soon.
+ `syncLoopDoneCh` - indicates that the `syncLoop()` has terminated.

Splitting these two concepts apart and deferring a `close(syncLoopDoneCh)` in the
`syncLoop()` function ensures that the deadlock described above can no longer
occur, as sketched below.
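
A minimal sketch of the fixed shape, again with simplified stand-in bodies:
the deferred `close(syncLoopDoneCh)` runs on every exit path, so `cleanup()`
can always observe termination instead of blocking on a send:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// watch now carries two one-way signals instead of a handshake channel.
type watch struct {
	stopSyncLoopCh chan struct{} // closed by cleanup(): "please terminate"
	syncLoopDoneCh chan struct{} // closed by syncLoop(): "I have terminated"
}

type source struct {
	mu         sync.Mutex
	watches    map[string]*watch
	shutdownCh chan struct{}
}

func (s *source) syncLoop(w *watch) {
	// The deferred close runs on every exit path, including error returns.
	defer close(w.syncLoopDoneCh)

	for {
		select {
		case <-w.stopSyncLoopCh:
			return
		case <-s.shutdownCh:
			return
		default:
			// Stand-in for fetchAndRegister() failing irrecoverably.
			if err := errors.New("registration failed"); err != nil {
				return
			}
		}
	}
}

func (s *source) cleanup(id string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	w := s.watches[id]
	close(w.stopSyncLoopCh) // ask the loop to stop...
	select {
	case <-w.syncLoopDoneCh: // ...and wait until it actually has
	case <-s.shutdownCh:
	}
}

func main() {
	s := &source{watches: map[string]*watch{}, shutdownCh: make(chan struct{})}
	w := &watch{
		stopSyncLoopCh: make(chan struct{}),
		syncLoopDoneCh: make(chan struct{}),
	}
	s.watches["web-sidecar-proxy"] = w

	go s.syncLoop(w)
	time.Sleep(50 * time.Millisecond) // the loop has already exited on the error

	s.cleanup("web-sidecar-proxy") // returns immediately: no deadlock
	fmt.Println("cleanup returned; mu released")
}
```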

We also now evict the xDS connections of all proxies attached to a
`syncLoop()` whenever that loop encounters an irrecoverable error, as sketched
below. This is done by hoisting the new `syncLoopDoneCh` upwards so that it is
visible to the xDS delta processing. Prior to this fix, those proxies were
simply orphaned and would never receive catalog-registration or
service-defaults updates.
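
Roughly, the xDS stream handler can now select on this termination channel and
end the stream. The real wiring is in the `processDelta` hunk further down;
everything here apart from that idea is a simplified stand-in (the real server
returns a gRPC status error rather than a plain error):

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for the server's sentinel error; see the delta.go hunk below for
// the actual gRPC status error.
var errConfigSyncError = errors.New("config-source sync loop terminated due to error")

// handleStream drains snapshots until either the client goes away or the
// config-source sync loop reports that it has terminated.
func handleStream(stateCh <-chan string, cfgSrcTerminated <-chan struct{}) error {
	for {
		select {
		case snap, ok := <-stateCh:
			if !ok {
				return nil
			}
			fmt.Println("pushing snapshot to Envoy:", snap)
		case <-cfgSrcTerminated:
			// Evict the stream so the proxy reconnects instead of being
			// orphaned with a dead sync loop and stale configuration.
			return errConfigSyncError
		}
	}
}

func main() {
	stateCh := make(chan string, 1)
	stateCh <- "initial-snapshot"

	cfgSrcTerminated := make(chan struct{})
	close(cfgSrcTerminated) // simulate the sync loop dying

	fmt.Println("stream ended:", handleStream(stateCh, cfgSrcTerminated))
}
```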

* Add changelog.
Derek Menteer, 2024-03-15 13:57:11 -05:00, committed via GitHub
parent eabff257d7 · commit 0ac8ae6c3b
15 changed files with 353 additions and 98 deletions

.changelog/20867.txt (new file)

@ -0,0 +1,3 @@
```release-note:bug
connect: Fix xDS deadlock that could result in proxies being unable to start.
```


@ -34,9 +34,12 @@ type ConfigSource struct {
shutdownCh chan struct{}
}
var _ Watcher = (*ConfigSource)(nil)
type watch struct {
numWatchers int // guarded by ConfigSource.mu.
closeCh chan chan struct{}
numWatchers int // guarded by ConfigSource.mu.
stopSyncLoopCh chan struct{}
syncLoopDoneCh chan struct{}
}
// NewConfigSource creates a ConfigSource with the given configuration.
@ -50,7 +53,7 @@ func NewConfigSource(cfg Config) *ConfigSource {
// Watch wraps the underlying proxycfg.Manager and dynamically registers
// services from the catalog with it when requested by the xDS server.
func (m *ConfigSource) Watch(id *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) {
func (m *ConfigSource) Watch(id *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.SrcTerminatedChan, proxysnapshot.CancelFunc, error) {
// Create service ID
serviceID := structs.NewServiceID(id.Name, GetEnterpriseMetaFromResourceID(id))
// If the service is registered to the local agent, use the LocalConfigSource
@ -67,7 +70,7 @@ func (m *ConfigSource) Watch(id *pbresource.ID, nodeName string, token string) (
// See: https://github.com/hashicorp/consul/issues/15753
session, err := m.SessionLimiter.BeginSession()
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
proxyID := proxycfg.ProxyID{
@ -79,6 +82,28 @@ func (m *ConfigSource) Watch(id *pbresource.ID, nodeName string, token string) (
// Start the watch on the real proxycfg Manager.
snapCh, cancelWatch := m.Manager.Watch(proxyID)
m.mu.Lock()
defer m.mu.Unlock()
w, ok := m.watches[proxyID]
if ok {
w.numWatchers++
} else {
w = &watch{
numWatchers: 1,
stopSyncLoopCh: make(chan struct{}),
syncLoopDoneCh: make(chan struct{}),
}
m.watches[proxyID] = w
if err := m.startSync(w.stopSyncLoopCh, w.syncLoopDoneCh, proxyID); err != nil {
delete(m.watches, proxyID)
cancelWatch()
session.End()
return nil, nil, nil, nil, err
}
}
// Wrap the cancelWatch function with our bookkeeping. m.mu must be held when calling.
var cancelOnce sync.Once
cancel := func() {
@ -88,26 +113,7 @@ func (m *ConfigSource) Watch(id *pbresource.ID, nodeName string, token string) (
session.End()
})
}
m.mu.Lock()
defer m.mu.Unlock()
w, ok := m.watches[proxyID]
if ok {
w.numWatchers++
} else {
w = &watch{closeCh: make(chan chan struct{}), numWatchers: 1}
m.watches[proxyID] = w
if err := m.startSync(w.closeCh, proxyID); err != nil {
delete(m.watches, proxyID)
cancelWatch()
session.End()
return nil, nil, nil, err
}
}
return snapCh, session.Terminated(), cancel, nil
return snapCh, session.Terminated(), w.syncLoopDoneCh, cancel, nil
}
func (m *ConfigSource) Shutdown() {
@ -122,7 +128,11 @@ func (m *ConfigSource) Shutdown() {
//
// If the first attempt to fetch and register the service fails, startSync
// will return an error (and no goroutine will be started).
func (m *ConfigSource) startSync(closeCh <-chan chan struct{}, proxyID proxycfg.ProxyID) error {
func (m *ConfigSource) startSync(
stopSyncLoopCh <-chan struct{},
syncLoopDoneCh chan<- struct{},
proxyID proxycfg.ProxyID,
) error {
logger := m.Logger.With(
"proxy_service_id", proxyID.ServiceID.String(),
"node", proxyID.NodeName,
@ -170,7 +180,13 @@ func (m *ConfigSource) startSync(closeCh <-chan chan struct{}, proxyID proxycfg.
syncLoop := func(ws memdb.WatchSet) {
// Cancel the context on return to clean up the goroutine started by WatchCh.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
defer func() {
cancel()
logger.Debug("de-registering service with proxycfg manager because all watchers have gone away")
m.Manager.Deregister(proxyID, source)
close(syncLoopDoneCh)
logger.Debug("sync-loop terminated")
}()
for {
select {
@ -179,16 +195,13 @@ func (m *ConfigSource) startSync(closeCh <-chan chan struct{}, proxyID proxycfg.
//
// It is expected that all other branches of this select will return and
// cancel the context given to WatchCh (to clean up its goroutine).
case doneCh := <-closeCh:
case <-stopSyncLoopCh:
// All watchers of this service (xDS streams) have gone away, so it's time
// to free its resources.
//
// TODO(agentless): we should probably wait for a short grace period before
// de-registering the service to allow clients to reconnect after a network
// blip.
logger.Trace("de-registering service with proxycfg manager because all watchers have gone away")
m.Manager.Deregister(proxyID, source)
close(doneCh)
return
case <-m.shutdownCh:
// Manager is shutting down, stop the goroutine.
@ -198,6 +211,7 @@ func (m *ConfigSource) startSync(closeCh <-chan chan struct{}, proxyID proxycfg.
var err error
ws, err = fetchAndRegister()
if err != nil {
logger.Debug("error in syncLoop.fetchAndRegister", "err", err)
return
}
}
@ -233,18 +247,14 @@ func (m *ConfigSource) cleanup(id proxycfg.ProxyID) {
h.numWatchers--
if h.numWatchers == 0 {
// We wait for doneCh to be closed by the sync goroutine, so that the lock is
// Notify the sync loop that it should terminate.
close(h.stopSyncLoopCh)
// We wait for sync loop to be closed, so that the lock is
// held until after the service is de-registered - this prevents a potential
// race where another sync goroutine is started for the service and we undo
// its call to register the service.
//
// This cannot deadlock because closeCh is unbuffered. Sending will only
// succeed if the sync goroutine is ready to receive (which always closes
// doneCh).
doneCh := make(chan struct{})
select {
case h.closeCh <- doneCh:
<-doneCh
case <-h.syncLoopDoneCh:
case <-m.shutdownCh:
// ConfigSource is shutting down, so the goroutine will be stopped anyway.
}
@ -293,7 +303,7 @@ type Store interface {
//go:generate mockery --name Watcher --inpackage
type Watcher interface {
Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error)
Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.SrcTerminatedChan, proxysnapshot.CancelFunc, error)
}
//go:generate mockery --name SessionLimiter --inpackage


@ -6,6 +6,7 @@ package catalog
import (
"context"
"errors"
"fmt"
"testing"
"time"
@ -79,7 +80,7 @@ func TestConfigSource_Success(t *testing.T) {
})
t.Cleanup(mgr.Shutdown)
snapCh, termCh, cancelWatch1, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
snapCh, termCh, _, cancelWatch1, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
require.NoError(t, err)
require.Equal(t, session1TermCh, termCh)
@ -136,7 +137,7 @@ func TestConfigSource_Success(t *testing.T) {
}
// Start another watch.
_, termCh2, cancelWatch2, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
_, termCh2, _, cancelWatch2, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
require.NoError(t, err)
require.Equal(t, session2TermCh, termCh2)
@ -179,7 +180,7 @@ func TestConfigSource_LocallyManagedService(t *testing.T) {
localWatcher := NewMockWatcher(t)
localWatcher.On("Watch", proxyID, nodeName, token).
Return(make(<-chan proxysnapshot.ProxySnapshot), nil, proxysnapshot.CancelFunc(func() {}), nil)
Return(make(<-chan proxysnapshot.ProxySnapshot), nil, nil, proxysnapshot.CancelFunc(func() {}), nil)
mgr := NewConfigSource(Config{
NodeName: nodeName,
@ -191,7 +192,7 @@ func TestConfigSource_LocallyManagedService(t *testing.T) {
})
t.Cleanup(mgr.Shutdown)
_, _, _, err := mgr.Watch(proxyID, nodeName, token)
_, _, _, _, err := mgr.Watch(proxyID, nodeName, token)
require.NoError(t, err)
}
@ -238,13 +239,173 @@ func TestConfigSource_ErrorRegisteringService(t *testing.T) {
})
t.Cleanup(mgr.Shutdown)
_, _, _, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
_, _, _, _, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
require.Error(t, err)
require.True(t, canceledWatch, "watch should've been canceled")
session.AssertCalled(t, "End")
}
func TestConfigSource_ErrorInSyncLoop(t *testing.T) {
serviceID := structs.NewServiceID("web-sidecar-proxy-1", nil)
nodeName := "node-name"
token := "token"
store := testStateStore(t)
// Register the proxy in the catalog/state store at port 9999.
require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
Node: nodeName,
Service: &structs.NodeService{
ID: serviceID.ID,
Service: "web-sidecar-proxy",
Port: 9999,
Kind: structs.ServiceKindConnectProxy,
Proxy: structs.ConnectProxyConfig{
Config: map[string]any{
"local_connect_timeout_ms": 123,
},
},
},
}))
cfgMgr := NewMockConfigManager(t)
{
proxyID := proxycfg.ProxyID{
ServiceID: serviceID,
NodeName: nodeName,
Token: token,
}
snapCh := make(chan proxysnapshot.ProxySnapshot, 1)
cfgMgr.On("Watch", proxyID).
Return((<-chan proxysnapshot.ProxySnapshot)(snapCh), proxysnapshot.CancelFunc(func() {}), nil)
// Answer the register call successfully for session 1 starting (Repeatability = 1).
// Session 2 should not have caused a re-register to happen.
cfgMgr.On("Register", mock.Anything, mock.Anything, source, token, false).
Run(func(args mock.Arguments) {
id := args.Get(0).(proxycfg.ProxyID)
ns := args.Get(1).(*structs.NodeService)
snapCh <- &proxycfg.ConfigSnapshot{
ProxyID: id,
Port: ns.Port,
Proxy: ns.Proxy,
}
}).
Return(nil).
Repeatability = 1
// Error on subsequent registrations afterwards (during the sync loop).
cfgMgr.On("Register", mock.Anything, mock.Anything, source, token, false).
Return(fmt.Errorf("intentional registration error"))
cfgMgr.On("Deregister", proxyID, source).
Run(func(mock.Arguments) { close(snapCh) }).
Return()
}
lim := NewMockSessionLimiter(t)
session1TermCh := make(limiter.SessionTerminatedChan)
session2TermCh := make(limiter.SessionTerminatedChan)
{
session1 := newMockSession(t)
session1.On("Terminated").Return(session1TermCh)
session1.On("End").Return()
session2 := newMockSession(t)
session2.On("Terminated").Return(session2TermCh)
session2.On("End").Return()
lim.On("BeginSession").Return(session1, nil).Once()
lim.On("BeginSession").Return(session2, nil).Once()
}
mgr := NewConfigSource(Config{
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
SessionLimiter: lim,
})
t.Cleanup(mgr.Shutdown)
snapCh, termCh, cfgSrcTerminated1, cancelWatch1, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
require.NoError(t, err)
require.Equal(t, session1TermCh, termCh)
// Expect Register to have been called with the proxy's initial port.
select {
case snap := <-snapCh:
require.Equal(t, 9999, snap.(*proxycfg.ConfigSnapshot).Port)
require.Equal(t, token, snap.(*proxycfg.ConfigSnapshot).ProxyID.Token)
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for snapshot")
}
// Start another watch.
_, termCh2, cfgSrcTerminated2, cancelWatch2, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
require.NoError(t, err)
require.Equal(t, session2TermCh, termCh2)
// Expect the service to have not been re-registered by the second watch.
select {
case <-snapCh:
t.Fatal("service shouldn't have been re-registered")
case <-time.After(100 * time.Millisecond):
}
// Ensure that no config-source syncLoops were terminated.
select {
case <-cfgSrcTerminated1:
t.Fatal("unexpected config-source termination 1")
case <-cfgSrcTerminated2:
t.Fatal("unexpected config-source termination 2")
default:
}
// Update the proxy's port to 8888.
// This should trigger the config-source syncLoop termination channel due to an error.
require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
Node: nodeName,
Service: &structs.NodeService{
ID: serviceID.ID,
Service: "web-sidecar-proxy",
Port: 8888,
Kind: structs.ServiceKindConnectProxy,
Proxy: structs.ConnectProxyConfig{
Config: map[string]any{
"local_connect_timeout_ms": 123,
},
},
},
}))
// Expect both config sources to have terminated when the syncLoop errors.
select {
case _, ok := <-cfgSrcTerminated1:
cancelWatch1()
require.False(t, ok)
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for config-source termination 1")
}
select {
case _, ok := <-cfgSrcTerminated2:
cancelWatch2()
require.False(t, ok)
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for config-source termination 2")
}
// Expect the snap channels to have been closed.
select {
case _, ok := <-snapCh:
require.False(t, ok)
case <-time.After(100 * time.Millisecond):
t.Fatal("snap channel was not closed")
}
}
func TestConfigSource_NotProxyService(t *testing.T) {
serviceID := structs.NewServiceID("web", nil)
nodeName := "node-name"
@ -279,7 +440,7 @@ func TestConfigSource_NotProxyService(t *testing.T) {
})
t.Cleanup(mgr.Shutdown)
_, _, _, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
_, _, _, _, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
require.Error(t, err)
require.Contains(t, err.Error(), "must be a sidecar proxy or gateway")
require.True(t, canceledWatch, "watch should've been canceled")
@ -295,7 +456,7 @@ func TestConfigSource_SessionLimiterError(t *testing.T) {
})
t.Cleanup(src.Shutdown)
_, _, _, err := src.Watch(
_, _, _, _, err := src.Watch(
rtest.Resource(pbmesh.ProxyConfigurationType, "web-sidecar-proxy-1").ID(),
"node-name",
"token",
@ -303,7 +464,7 @@ func TestConfigSource_SessionLimiterError(t *testing.T) {
require.Equal(t, limiter.ErrCapacityReached, err)
}
func testConfigManager(t *testing.T, serviceID structs.ServiceID, nodeName string, token string) ConfigManager {
func testConfigManager(t *testing.T, serviceID structs.ServiceID, nodeName string, token string) *MockConfigManager {
t.Helper()
cfgMgr := NewMockConfigManager(t)


@ -1,4 +1,4 @@
// Code generated by mockery v2.33.1. DO NOT EDIT.
// Code generated by mockery v2.37.1. DO NOT EDIT.
package catalog


@ -1,4 +1,4 @@
// Code generated by mockery v2.33.1. DO NOT EDIT.
// Code generated by mockery v2.37.1. DO NOT EDIT.
package catalog


@ -1,4 +1,4 @@
// Code generated by mockery v2.33.1. DO NOT EDIT.
// Code generated by mockery v2.37.1. DO NOT EDIT.
package catalog
@ -8,6 +8,8 @@ import (
pbresource "github.com/hashicorp/consul/proto-public/pbresource"
proxycfg "github.com/hashicorp/consul/agent/proxycfg"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
)
@ -17,14 +19,15 @@ type MockWatcher struct {
}
// Watch provides a mock function with given fields: proxyID, nodeName, token
func (_m *MockWatcher) Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) {
func (_m *MockWatcher) Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.SrcTerminatedChan, proxysnapshot.CancelFunc, error) {
ret := _m.Called(proxyID, nodeName, token)
var r0 <-chan proxysnapshot.ProxySnapshot
var r1 limiter.SessionTerminatedChan
var r2 proxysnapshot.CancelFunc
var r3 error
if rf, ok := ret.Get(0).(func(*pbresource.ID, string, string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error)); ok {
var r2 proxycfg.SrcTerminatedChan
var r3 proxysnapshot.CancelFunc
var r4 error
if rf, ok := ret.Get(0).(func(*pbresource.ID, string, string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.SrcTerminatedChan, proxysnapshot.CancelFunc, error)); ok {
return rf(proxyID, nodeName, token)
}
if rf, ok := ret.Get(0).(func(*pbresource.ID, string, string) <-chan proxysnapshot.ProxySnapshot); ok {
@ -43,21 +46,29 @@ func (_m *MockWatcher) Watch(proxyID *pbresource.ID, nodeName string, token stri
}
}
if rf, ok := ret.Get(2).(func(*pbresource.ID, string, string) proxysnapshot.CancelFunc); ok {
if rf, ok := ret.Get(2).(func(*pbresource.ID, string, string) proxycfg.SrcTerminatedChan); ok {
r2 = rf(proxyID, nodeName, token)
} else {
if ret.Get(2) != nil {
r2 = ret.Get(2).(proxysnapshot.CancelFunc)
r2 = ret.Get(2).(proxycfg.SrcTerminatedChan)
}
}
if rf, ok := ret.Get(3).(func(*pbresource.ID, string, string) error); ok {
if rf, ok := ret.Get(3).(func(*pbresource.ID, string, string) proxysnapshot.CancelFunc); ok {
r3 = rf(proxyID, nodeName, token)
} else {
r3 = ret.Error(3)
if ret.Get(3) != nil {
r3 = ret.Get(3).(proxysnapshot.CancelFunc)
}
}
return r0, r1, r2, r3
if rf, ok := ret.Get(4).(func(*pbresource.ID, string, string) error); ok {
r4 = rf(proxyID, nodeName, token)
} else {
r4 = ret.Error(4)
}
return r0, r1, r2, r3, r4
}
// NewMockWatcher creates a new instance of MockWatcher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.


@ -23,8 +23,13 @@ func NewConfigSource(cfgMgr ConfigManager) *ConfigSource {
return &ConfigSource{cfgMgr}
}
func (m *ConfigSource) Watch(proxyID *pbresource.ID, nodeName string, _ string) (<-chan proxysnapshot.ProxySnapshot,
limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) {
func (m *ConfigSource) Watch(proxyID *pbresource.ID, nodeName string, _ string) (
<-chan proxysnapshot.ProxySnapshot,
limiter.SessionTerminatedChan,
proxycfg.SrcTerminatedChan,
proxysnapshot.CancelFunc,
error,
) {
serviceID := structs.NewServiceID(proxyID.Name, catalog.GetEnterpriseMetaFromResourceID(proxyID))
watchCh, cancelWatch := m.manager.Watch(proxycfg.ProxyID{
ServiceID: serviceID,
@ -36,5 +41,5 @@ func (m *ConfigSource) Watch(proxyID *pbresource.ID, nodeName string, _ string)
// is checked before the watch is created).
Token: "",
})
return watchCh, nil, cancelWatch, nil
return watchCh, nil, nil, cancelWatch, nil
}


@ -5,10 +5,11 @@ package proxycfg
import (
"errors"
"github.com/hashicorp/consul/lib/channels"
"runtime/debug"
"sync"
"github.com/hashicorp/consul/lib/channels"
"github.com/hashicorp/go-hclog"
"golang.org/x/time/rate"
@ -38,6 +39,10 @@ type ProxyID struct {
// from overwriting each other's registrations.
type ProxySource string
// SrcTerminatedChan indicates that the config-source for the proxycfg is no longer running
// and will stop receiving updates when it is closed.
type SrcTerminatedChan <-chan struct{}
// Manager provides an API with which proxy services can be registered, and
// coordinates the fetching (and refreshing) of intentions, upstreams, discovery
// chain, certificates etc.


@ -87,7 +87,7 @@ func TestAgent_local_proxycfg(t *testing.T) {
// Prior to fixes in https://github.com/hashicorp/consul/pull/16497
// this call to Watch() would deadlock.
var err error
ch, stc, cancel, err = cfg.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, sid.ID).ID(), a.config.NodeName, token)
ch, stc, _, cancel, err = cfg.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, sid.ID).ID(), a.config.NodeName, token)
require.NoError(t, err)
}
select {


@ -43,6 +43,7 @@ import (
)
var errOverwhelmed = status.Error(codes.ResourceExhausted, "this server has too many xDS streams open, please try another")
var errConfigSyncError = status.Errorf(codes.Internal, "config-source sync loop terminated due to error")
// xdsProtocolLegacyChildResend enables the legacy behavior for the `ensureChildResend` function.
// This environment variable exists as an escape hatch so that users can disable the behavior, if needed.
@ -144,13 +145,14 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
// Loop state
var (
proxySnapshot proxysnapshot.ProxySnapshot
node *envoy_config_core_v3.Node
stateCh <-chan proxysnapshot.ProxySnapshot
drainCh limiter.SessionTerminatedChan
watchCancel func()
nonce uint64 // xDS requires a unique nonce to correlate response/request pairs
ready bool // set to true after the first snapshot arrives
proxySnapshot proxysnapshot.ProxySnapshot
node *envoy_config_core_v3.Node
stateCh <-chan proxysnapshot.ProxySnapshot
drainCh limiter.SessionTerminatedChan
cfgSrcTerminated proxycfg.SrcTerminatedChan
watchCancel func()
nonce uint64 // xDS requires a unique nonce to correlate response/request pairs
ready bool // set to true after the first snapshot arrives
streamStartTime = time.Now()
streamStartOnce sync.Once
@ -309,6 +311,12 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
resourceMap = newResourceMap
currentVersions = newVersions
ready = true
case <-cfgSrcTerminated:
// Ensure that we cancel and cleanup resources if the sync loop terminates for any reason.
// This is necessary to handle the scenario where an unexpected error occurs that the loop
// cannot recover from.
logger.Debug("config-source sync loop terminated due to error")
return errConfigSyncError
}
// Trigger state machine
@ -335,7 +343,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err)
}
stateCh, drainCh, watchCancel, err = s.ProxyWatcher.Watch(proxyID, nodeName, options.Token)
stateCh, drainCh, cfgSrcTerminated, watchCancel, err = s.ProxyWatcher.Watch(proxyID, nodeName, options.Token)
switch {
case errors.Is(err, limiter.ErrCapacityReached):
return errOverwhelmed


@ -1608,6 +1608,34 @@ func TestServer_DeltaAggregatedResources_v3_CapacityReached(t *testing.T) {
}
}
func TestServer_DeltaAggregatedResources_v3_CfgSrcTerminated(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil }
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
mgr.RegisterProxy(t, sid)
snap := newTestSnapshot(t, nil, "", nil)
envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
InitialResourceVersions: mustMakeVersionMap(t,
makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
mgr.CfgSrcTerminate(sid)
select {
case err := <-errCh:
require.Error(t, err)
require.Equal(t, codes.Internal.String(), status.Code(err).String())
require.Equal(t, errConfigSyncError, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}
type capacityReachedLimiter struct{}
func (capacityReachedLimiter) BeginSession() (limiter.Session, error) {


@ -20,9 +20,10 @@ import (
"github.com/hashicorp/consul/acl"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/xds/configfetcher"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
"github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/consul/proto-public/pbresource"
)
@ -83,7 +84,7 @@ type ACLResolverFunc func(id string) (acl.Authorizer, error)
// ProxyConfigSource is the interface xds.Server requires to consume proxy
// config updates.
type ProxyWatcher interface {
Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error)
Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.SrcTerminatedChan, proxysnapshot.CancelFunc, error)
}
// Server represents a gRPC server that can handle xDS requests from Envoy. All


@ -73,16 +73,18 @@ func newTestSnapshot(
// testing. It also implements ConnectAuthz to allow control over authorization.
type testManager struct {
sync.Mutex
stateChans map[structs.ServiceID]chan proxysnapshot.ProxySnapshot
drainChans map[structs.ServiceID]chan struct{}
cancels chan structs.ServiceID
stateChans map[structs.ServiceID]chan proxysnapshot.ProxySnapshot
drainChans map[structs.ServiceID]chan struct{}
cfgSrcTerminateChans map[structs.ServiceID]chan struct{}
cancels chan structs.ServiceID
}
func newTestManager(t *testing.T) *testManager {
return &testManager{
stateChans: map[structs.ServiceID]chan proxysnapshot.ProxySnapshot{},
drainChans: map[structs.ServiceID]chan struct{}{},
cancels: make(chan structs.ServiceID, 10),
stateChans: map[structs.ServiceID]chan proxysnapshot.ProxySnapshot{},
drainChans: map[structs.ServiceID]chan struct{}{},
cfgSrcTerminateChans: map[structs.ServiceID]chan struct{}{},
cancels: make(chan structs.ServiceID, 10),
}
}
@ -92,6 +94,7 @@ func (m *testManager) RegisterProxy(t *testing.T, proxyID structs.ServiceID) {
defer m.Unlock()
m.stateChans[proxyID] = make(chan proxysnapshot.ProxySnapshot, 1)
m.drainChans[proxyID] = make(chan struct{})
m.cfgSrcTerminateChans[proxyID] = make(chan struct{})
}
// Deliver simulates a proxy registration
@ -121,9 +124,23 @@ func (m *testManager) DrainStreams(proxyID structs.ServiceID) {
close(ch)
}
// CfgSrcTerminate terminates any open streams for the given proxyID by indicating that the
// corresponding config-source terminated unexpectedly.
func (m *testManager) CfgSrcTerminate(proxyID structs.ServiceID) {
m.Lock()
defer m.Unlock()
ch, ok := m.cfgSrcTerminateChans[proxyID]
if !ok {
ch = make(chan struct{})
m.cfgSrcTerminateChans[proxyID] = ch
}
close(ch)
}
// Watch implements ConfigManager
func (m *testManager) Watch(id *pbresource.ID, _ string, _ string) (<-chan proxysnapshot.ProxySnapshot,
limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) {
limiter.SessionTerminatedChan, proxycfg.SrcTerminatedChan, proxysnapshot.CancelFunc, error) {
// Create service ID
proxyID := structs.NewServiceID(id.Name, catalog.GetEnterpriseMetaFromResourceID(id))
m.Lock()
@ -133,12 +150,12 @@ func (m *testManager) Watch(id *pbresource.ID, _ string, _ string) (<-chan proxy
drainCh := m.drainChans[proxyID]
select {
case <-drainCh:
return nil, nil, nil, limiter.ErrCapacityReached
return nil, nil, nil, nil, limiter.ErrCapacityReached
default:
}
// ch might be nil but then it will just block forever
return m.stateChans[proxyID], drainCh, func() {
return m.stateChans[proxyID], drainCh, m.cfgSrcTerminateChans[proxyID], func() {
m.cancels <- proxyID
}, nil
}


@ -6,12 +6,14 @@ package proxytracker
import (
"errors"
"fmt"
"github.com/hashicorp/consul/lib/channels"
"sync"
"github.com/hashicorp/consul/lib/channels"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/internal/controller"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/consul/internal/resource"
@ -87,13 +89,17 @@ func NewProxyTracker(cfg ProxyTrackerConfig) *ProxyTracker {
// Watch connects a proxy with ProxyTracker and returns the consumer a channel to receive updates,
// a channel to notify of xDS terminated session, and a cancel function to cancel the watch.
func (pt *ProxyTracker) Watch(proxyID *pbresource.ID,
nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot,
limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) {
func (pt *ProxyTracker) Watch(proxyID *pbresource.ID, nodeName string, token string) (
<-chan proxysnapshot.ProxySnapshot,
limiter.SessionTerminatedChan,
proxycfg.SrcTerminatedChan,
proxysnapshot.CancelFunc,
error,
) {
pt.config.Logger.Trace("watch initiated", "proxyID", proxyID, "nodeName", nodeName)
if err := pt.validateWatchArgs(proxyID, nodeName); err != nil {
pt.config.Logger.Error("args failed validation", err)
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
// Begin a session with the xDS session concurrency limiter.
//
@ -101,7 +107,7 @@ func (pt *ProxyTracker) Watch(proxyID *pbresource.ID,
session, err := pt.config.SessionLimiter.BeginSession()
if err != nil {
pt.config.Logger.Error("failed to begin session with xDS session concurrency limiter", err)
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
// This buffering is crucial otherwise we'd block immediately trying to
@ -132,11 +138,11 @@ func (pt *ProxyTracker) Watch(proxyID *pbresource.ID,
if err != nil {
pt.config.Logger.Error("failed to notify controller of new proxy connection", err)
pt.cancelWatchLocked(proxyReferenceKey, watchData.notifyCh, session)
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
pt.config.Logger.Trace("controller notified of watch created", "proxyID", proxyID, "nodeName", nodeName)
return proxyStateChan, session.Terminated(), cancel, nil
return proxyStateChan, session.Terminated(), nil, cancel, nil
}
// notifyNewProxyChannel attempts to send a message to newProxyConnectionCh and will return an error if there's no receiver.


@ -37,7 +37,7 @@ func TestProxyTracker_Watch(t *testing.T) {
})
// Watch()
proxyStateChan, _, cancelFunc, err := pt.Watch(resourceID, "node 1", "token")
proxyStateChan, _, _, cancelFunc, err := pt.Watch(resourceID, "node 1", "token")
require.NoError(t, err)
// ensure New Proxy Connection message is sent
@ -91,7 +91,7 @@ func TestProxyTracker_Watch_ErrorConsumerNotReady(t *testing.T) {
}
// Watch()
proxyStateChan, sessionTerminatedCh, cancelFunc, err := pt.Watch(resourceID, "node 1", "token")
proxyStateChan, sessionTerminatedCh, _, cancelFunc, err := pt.Watch(resourceID, "node 1", "token")
require.Nil(t, cancelFunc)
require.Nil(t, proxyStateChan)
require.Nil(t, sessionTerminatedCh)
@ -146,7 +146,7 @@ func TestProxyTracker_Watch_ArgValidationErrors(t *testing.T) {
})
// Watch()
proxyStateChan, sessionTerminateCh, cancelFunc, err := pt.Watch(tc.proxyID, tc.nodeName, tc.token)
proxyStateChan, sessionTerminateCh, _, cancelFunc, err := pt.Watch(tc.proxyID, tc.nodeName, tc.token)
require.Error(t, err)
require.Equal(t, tc.expectedError, err)
require.Nil(t, proxyStateChan)
@ -166,7 +166,7 @@ func TestProxyTracker_Watch_SessionLimiterError(t *testing.T) {
})
// Watch()
proxyStateChan, sessionTerminateCh, cancelFunc, err := pt.Watch(resourceID, "node 1", "token")
proxyStateChan, sessionTerminateCh, _, cancelFunc, err := pt.Watch(resourceID, "node 1", "token")
require.Error(t, err)
require.Equal(t, "kaboom", err.Error())
require.Nil(t, proxyStateChan)
@ -190,7 +190,7 @@ func TestProxyTracker_PushChange(t *testing.T) {
})
// Watch()
proxyStateChan, _, _, err := pt.Watch(resourceID, "node 1", "token")
proxyStateChan, _, _, _, err := pt.Watch(resourceID, "node 1", "token")
require.NoError(t, err)
// PushChange
@ -255,7 +255,7 @@ func TestProxyTracker_ProxyConnectedToServer(t *testing.T) {
session.On("Terminated").Return(channel).Maybe()
session.On("End").Return().Maybe()
limiter.On("BeginSession").Return(session, nil)
_, _, _, _ = pt.Watch(resourceID, "node 1", "token")
_, _, _, _, _ = pt.Watch(resourceID, "node 1", "token")
},
},
}
@ -294,7 +294,7 @@ func TestProxyTracker_Shutdown(t *testing.T) {
})
// Watch()
proxyStateChan, _, _, err := pt.Watch(resourceID, "node 1", "token")
proxyStateChan, _, _, _, err := pt.Watch(resourceID, "node 1", "token")
require.NoError(t, err)
pt.Shutdown()