consul/agent/proxycfg-sources/catalog/config_source_test.go

548 lines
15 KiB
Go
Raw Normal View History

// Copyright (c) HashiCorp, Inc.
[COMPLIANCE] License changes (#18443) * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
2023-08-11 13:12:13 +00:00
// SPDX-License-Identifier: BUSL-1.1
package catalog
import (
"context"
"errors"
Fix xDS deadlock due to syncLoop termination. (#20867) * Fix xDS deadlock due to syncLoop termination. This fixes an issue where agentless xDS streams can deadlock permanently until a server is restarted. When this issue occurs, no new proxies are able to successfully connect to the server. Effectively, the trigger for this deadlock stems from the following return statement: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L199-L202 When this happens, the entire `syncLoop()` terminates and stops consuming from the following channel: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L182-L192 Which results in the `ConfigSource.cleanup()` function never receiving a response and holding a mutex indefinitely: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L241-L247 Because this mutex is shared, it effectively deadlocks the server's ability to process new xDS streams. ---- The fix to this issue involves removing the `chan chan struct{}` used like an RPC-over-channels pattern and replacing it with two distinct channels: + `stopSyncLoopCh` - indicates that the `syncLoop()` should terminate soon. + `syncLoopDoneCh` - indicates that the `syncLoop()` has terminated. Splitting these two concepts out and deferring a `close(syncLoopDoneCh)` in the `syncLoop()` function ensures that the deadlock above should no longer occur. We also now evict xDS connections of all proxies for the corresponding `syncLoop()` whenever it encounters an irrecoverable error. This is done by hoisting the new `syncLoopDoneCh` upwards so that it's visible to the xDS delta processing. Prior to this fix, the behavior was to simply orphan them so they would never receive catalog-registration or service-defaults updates. * Add changelog.
2024-03-15 18:57:11 +00:00
"fmt"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
)
func TestConfigSource_Success(t *testing.T) {
serviceID := structs.NewServiceID("web-sidecar-proxy-1", nil)
nodeName := "node-name"
token := "token"
store := testStateStore(t)
// Register the proxy in the catalog/state store at port 9999.
require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
Node: nodeName,
Service: &structs.NodeService{
ID: serviceID.ID,
Service: "web-sidecar-proxy",
Port: 9999,
Kind: structs.ServiceKindConnectProxy,
Proxy: structs.ConnectProxyConfig{
Config: map[string]any{
"local_connect_timeout_ms": 123,
},
},
},
}))
// testConfigManager builds a ConfigManager that emits a ConfigSnapshot whenever
// Register is called, and closes the watch channel when Deregister is called.
//
// Though a little odd, this allows us to make assertions on the sync goroutine's
// behavior without sleeping which leads to slow/racy tests.
cfgMgr := testConfigManager(t, serviceID, nodeName, token)
lim := NewMockSessionLimiter(t)
session1 := newMockSession(t)
session1TermCh := make(limiter.SessionTerminatedChan)
session1.On("Terminated").Return(session1TermCh)
session1.On("End").Return()
session2 := newMockSession(t)
session2TermCh := make(limiter.SessionTerminatedChan)
session2.On("Terminated").Return(session2TermCh)
session2.On("End").Return()
lim.On("BeginSession").Return(session1, nil).Once()
lim.On("BeginSession").Return(session2, nil).Once()
mgr := NewConfigSource(Config{
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
SessionLimiter: lim,
})
t.Cleanup(mgr.Shutdown)
snapCh, termCh, _, cancelWatch1, err := mgr.Watch(serviceID, nodeName, token)
require.NoError(t, err)
require.Equal(t, session1TermCh, termCh)
// Expect Register to have been called with the proxy's inital port.
select {
case snap := <-snapCh:
require.Equal(t, 9999, snap.Port)
require.Equal(t, token, snap.ProxyID.Token)
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for snapshot")
}
// Update the proxy's port to 8888.
require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
Node: nodeName,
Service: &structs.NodeService{
ID: serviceID.ID,
Service: "web-sidecar-proxy",
Port: 8888,
Kind: structs.ServiceKindConnectProxy,
Proxy: structs.ConnectProxyConfig{
Config: map[string]any{
"local_connect_timeout_ms": 123,
},
},
},
}))
// Expect Register to have been called again with the proxy's new port.
select {
case snap := <-snapCh:
require.Equal(t, 8888, snap.Port)
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for snapshot")
}
// Update proxy-defaults.
require.NoError(t, store.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
Name: structs.ProxyConfigGlobal,
Config: map[string]any{
"max_inbound_connections": 321,
},
}))
// Expect Register to have been called again with the new merged config.
select {
case snap := <-snapCh:
require.Equal(t, map[string]any{
"local_connect_timeout_ms": 123,
"max_inbound_connections": 321,
}, snap.Proxy.Config)
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for snapshot")
}
// Start another watch.
_, termCh2, _, cancelWatch2, err := mgr.Watch(serviceID, nodeName, token)
require.NoError(t, err)
require.Equal(t, session2TermCh, termCh2)
// Expect the service to have not been re-registered by the second watch.
select {
case <-snapCh:
t.Fatal("service shouldn't have been re-registered")
case <-time.After(100 * time.Millisecond):
}
// Expect cancelling the first watch to *not* de-register the service.
cancelWatch1()
select {
case <-snapCh:
t.Fatal("service shouldn't have been de-registered until other watch went away")
case <-time.After(100 * time.Millisecond):
}
// Expect cancelling the other watch to de-register the service.
cancelWatch2()
select {
case _, ok := <-snapCh:
require.False(t, ok, "channel should've been closed")
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for service to be de-registered")
}
session1.AssertCalled(t, "End")
session2.AssertCalled(t, "End")
}
func TestConfigSource_LocallyManagedService(t *testing.T) {
serviceID := structs.NewServiceID("web-sidecar-proxy-1", nil)
proxyID := serviceID
nodeName := "node-1"
token := "token"
localState := testLocalState(t)
localState.AddServiceWithChecks(&structs.NodeService{ID: serviceID.ID}, nil, "", false)
localWatcher := NewMockWatcher(t)
localWatcher.On("Watch", proxyID, nodeName, token).
Return(make(<-chan *proxycfg.ConfigSnapshot), nil, nil, context.CancelFunc(func() {}), nil)
mgr := NewConfigSource(Config{
NodeName: nodeName,
LocalState: localState,
LocalConfigSource: localWatcher,
Logger: hclog.NewNullLogger(),
GetStore: func() Store { panic("state store shouldn't have been used") },
SessionLimiter: nullSessionLimiter{},
})
t.Cleanup(mgr.Shutdown)
Fix xDS deadlock due to syncLoop termination. (#20867) * Fix xDS deadlock due to syncLoop termination. This fixes an issue where agentless xDS streams can deadlock permanently until a server is restarted. When this issue occurs, no new proxies are able to successfully connect to the server. Effectively, the trigger for this deadlock stems from the following return statement: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L199-L202 When this happens, the entire `syncLoop()` terminates and stops consuming from the following channel: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L182-L192 Which results in the `ConfigSource.cleanup()` function never receiving a response and holding a mutex indefinitely: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L241-L247 Because this mutex is shared, it effectively deadlocks the server's ability to process new xDS streams. ---- The fix to this issue involves removing the `chan chan struct{}` used like an RPC-over-channels pattern and replacing it with two distinct channels: + `stopSyncLoopCh` - indicates that the `syncLoop()` should terminate soon. + `syncLoopDoneCh` - indicates that the `syncLoop()` has terminated. Splitting these two concepts out and deferring a `close(syncLoopDoneCh)` in the `syncLoop()` function ensures that the deadlock above should no longer occur. We also now evict xDS connections of all proxies for the corresponding `syncLoop()` whenever it encounters an irrecoverable error. This is done by hoisting the new `syncLoopDoneCh` upwards so that it's visible to the xDS delta processing. Prior to this fix, the behavior was to simply orphan them so they would never receive catalog-registration or service-defaults updates. * Add changelog.
2024-03-15 18:57:11 +00:00
_, _, _, _, err := mgr.Watch(proxyID, nodeName, token)
require.NoError(t, err)
}
func TestConfigSource_ErrorRegisteringService(t *testing.T) {
serviceID := structs.NewServiceID("web-sidecar-proxy-1", nil)
nodeName := "node-name"
token := "token"
store := testStateStore(t)
require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
Node: nodeName,
Service: &structs.NodeService{
ID: serviceID.ID,
Service: "web-sidecar-proxy",
Port: 9999,
Kind: structs.ServiceKindConnectProxy,
},
}))
var canceledWatch bool
cancel := context.CancelFunc(func() { canceledWatch = true })
cfgMgr := NewMockConfigManager(t)
cfgMgr.On("Watch", mock.Anything).
Return(make(<-chan *proxycfg.ConfigSnapshot), cancel)
cfgMgr.On("Register", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(errors.New("KABOOM"))
session := newMockSession(t)
session.On("End").Return()
lim := NewMockSessionLimiter(t)
lim.On("BeginSession").Return(session, nil)
mgr := NewConfigSource(Config{
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
SessionLimiter: lim,
})
t.Cleanup(mgr.Shutdown)
_, _, _, _, err := mgr.Watch(serviceID, nodeName, token)
require.Error(t, err)
require.True(t, canceledWatch, "watch should've been canceled")
session.AssertCalled(t, "End")
}
Fix xDS deadlock due to syncLoop termination. (#20867) * Fix xDS deadlock due to syncLoop termination. This fixes an issue where agentless xDS streams can deadlock permanently until a server is restarted. When this issue occurs, no new proxies are able to successfully connect to the server. Effectively, the trigger for this deadlock stems from the following return statement: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L199-L202 When this happens, the entire `syncLoop()` terminates and stops consuming from the following channel: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L182-L192 Which results in the `ConfigSource.cleanup()` function never receiving a response and holding a mutex indefinitely: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L241-L247 Because this mutex is shared, it effectively deadlocks the server's ability to process new xDS streams. ---- The fix to this issue involves removing the `chan chan struct{}` used like an RPC-over-channels pattern and replacing it with two distinct channels: + `stopSyncLoopCh` - indicates that the `syncLoop()` should terminate soon. + `syncLoopDoneCh` - indicates that the `syncLoop()` has terminated. Splitting these two concepts out and deferring a `close(syncLoopDoneCh)` in the `syncLoop()` function ensures that the deadlock above should no longer occur. We also now evict xDS connections of all proxies for the corresponding `syncLoop()` whenever it encounters an irrecoverable error. This is done by hoisting the new `syncLoopDoneCh` upwards so that it's visible to the xDS delta processing. Prior to this fix, the behavior was to simply orphan them so they would never receive catalog-registration or service-defaults updates. * Add changelog.
2024-03-15 18:57:11 +00:00
func TestConfigSource_ErrorInSyncLoop(t *testing.T) {
serviceID := structs.NewServiceID("web-sidecar-proxy-1", nil)
nodeName := "node-name"
token := "token"
store := testStateStore(t)
// Register the proxy in the catalog/state store at port 9999.
require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
Node: nodeName,
Service: &structs.NodeService{
ID: serviceID.ID,
Service: "web-sidecar-proxy",
Port: 9999,
Kind: structs.ServiceKindConnectProxy,
Proxy: structs.ConnectProxyConfig{
Config: map[string]any{
"local_connect_timeout_ms": 123,
},
},
},
}))
cfgMgr := NewMockConfigManager(t)
{
proxyID := proxycfg.ProxyID{
ServiceID: serviceID,
NodeName: nodeName,
Token: token,
}
snapCh := make(chan *proxycfg.ConfigSnapshot, 1)
Fix xDS deadlock due to syncLoop termination. (#20867) * Fix xDS deadlock due to syncLoop termination. This fixes an issue where agentless xDS streams can deadlock permanently until a server is restarted. When this issue occurs, no new proxies are able to successfully connect to the server. Effectively, the trigger for this deadlock stems from the following return statement: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L199-L202 When this happens, the entire `syncLoop()` terminates and stops consuming from the following channel: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L182-L192 Which results in the `ConfigSource.cleanup()` function never receiving a response and holding a mutex indefinitely: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L241-L247 Because this mutex is shared, it effectively deadlocks the server's ability to process new xDS streams. ---- The fix to this issue involves removing the `chan chan struct{}` used like an RPC-over-channels pattern and replacing it with two distinct channels: + `stopSyncLoopCh` - indicates that the `syncLoop()` should terminate soon. + `syncLoopDoneCh` - indicates that the `syncLoop()` has terminated. Splitting these two concepts out and deferring a `close(syncLoopDoneCh)` in the `syncLoop()` function ensures that the deadlock above should no longer occur. We also now evict xDS connections of all proxies for the corresponding `syncLoop()` whenever it encounters an irrecoverable error. This is done by hoisting the new `syncLoopDoneCh` upwards so that it's visible to the xDS delta processing. Prior to this fix, the behavior was to simply orphan them so they would never receive catalog-registration or service-defaults updates. * Add changelog.
2024-03-15 18:57:11 +00:00
cfgMgr.On("Watch", proxyID).
Return((<-chan *proxycfg.ConfigSnapshot)(snapCh), context.CancelFunc(func() {}), nil)
Fix xDS deadlock due to syncLoop termination. (#20867) * Fix xDS deadlock due to syncLoop termination. This fixes an issue where agentless xDS streams can deadlock permanently until a server is restarted. When this issue occurs, no new proxies are able to successfully connect to the server. Effectively, the trigger for this deadlock stems from the following return statement: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L199-L202 When this happens, the entire `syncLoop()` terminates and stops consuming from the following channel: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L182-L192 Which results in the `ConfigSource.cleanup()` function never receiving a response and holding a mutex indefinitely: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L241-L247 Because this mutex is shared, it effectively deadlocks the server's ability to process new xDS streams. ---- The fix to this issue involves removing the `chan chan struct{}` used like an RPC-over-channels pattern and replacing it with two distinct channels: + `stopSyncLoopCh` - indicates that the `syncLoop()` should terminate soon. + `syncLoopDoneCh` - indicates that the `syncLoop()` has terminated. Splitting these two concepts out and deferring a `close(syncLoopDoneCh)` in the `syncLoop()` function ensures that the deadlock above should no longer occur. We also now evict xDS connections of all proxies for the corresponding `syncLoop()` whenever it encounters an irrecoverable error. This is done by hoisting the new `syncLoopDoneCh` upwards so that it's visible to the xDS delta processing. Prior to this fix, the behavior was to simply orphan them so they would never receive catalog-registration or service-defaults updates. * Add changelog.
2024-03-15 18:57:11 +00:00
// Answer the register call successfully for session 1 starting (Repeatability = 1).
// Session 2 should not have caused a re-register to happen.
cfgMgr.On("Register", mock.Anything, mock.Anything, source, token, false).
Run(func(args mock.Arguments) {
id := args.Get(0).(proxycfg.ProxyID)
ns := args.Get(1).(*structs.NodeService)
snapCh <- &proxycfg.ConfigSnapshot{
ProxyID: id,
Port: ns.Port,
Proxy: ns.Proxy,
}
}).
Return(nil).
Repeatability = 1
// Error on subsequent registrations afterwards (during the sync loop).
cfgMgr.On("Register", mock.Anything, mock.Anything, source, token, false).
Return(fmt.Errorf("intentional registration error"))
cfgMgr.On("Deregister", proxyID, source).
Run(func(mock.Arguments) { close(snapCh) }).
Return()
}
lim := NewMockSessionLimiter(t)
session1TermCh := make(limiter.SessionTerminatedChan)
session2TermCh := make(limiter.SessionTerminatedChan)
{
session1 := newMockSession(t)
session1.On("Terminated").Return(session1TermCh)
session1.On("End").Return()
session2 := newMockSession(t)
session2.On("Terminated").Return(session2TermCh)
session2.On("End").Return()
lim.On("BeginSession").Return(session1, nil).Once()
lim.On("BeginSession").Return(session2, nil).Once()
}
mgr := NewConfigSource(Config{
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
SessionLimiter: lim,
})
t.Cleanup(mgr.Shutdown)
snapCh, termCh, cfgSrcTerminated1, cancelWatch1, err := mgr.Watch(serviceID, nodeName, token)
Fix xDS deadlock due to syncLoop termination. (#20867) * Fix xDS deadlock due to syncLoop termination. This fixes an issue where agentless xDS streams can deadlock permanently until a server is restarted. When this issue occurs, no new proxies are able to successfully connect to the server. Effectively, the trigger for this deadlock stems from the following return statement: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L199-L202 When this happens, the entire `syncLoop()` terminates and stops consuming from the following channel: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L182-L192 Which results in the `ConfigSource.cleanup()` function never receiving a response and holding a mutex indefinitely: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L241-L247 Because this mutex is shared, it effectively deadlocks the server's ability to process new xDS streams. ---- The fix to this issue involves removing the `chan chan struct{}` used like an RPC-over-channels pattern and replacing it with two distinct channels: + `stopSyncLoopCh` - indicates that the `syncLoop()` should terminate soon. + `syncLoopDoneCh` - indicates that the `syncLoop()` has terminated. Splitting these two concepts out and deferring a `close(syncLoopDoneCh)` in the `syncLoop()` function ensures that the deadlock above should no longer occur. We also now evict xDS connections of all proxies for the corresponding `syncLoop()` whenever it encounters an irrecoverable error. This is done by hoisting the new `syncLoopDoneCh` upwards so that it's visible to the xDS delta processing. Prior to this fix, the behavior was to simply orphan them so they would never receive catalog-registration or service-defaults updates. * Add changelog.
2024-03-15 18:57:11 +00:00
require.NoError(t, err)
require.Equal(t, session1TermCh, termCh)
// Expect Register to have been called with the proxy's inital port.
select {
case snap := <-snapCh:
require.Equal(t, 9999, snap.Port)
require.Equal(t, token, snap.ProxyID.Token)
Fix xDS deadlock due to syncLoop termination. (#20867) * Fix xDS deadlock due to syncLoop termination. This fixes an issue where agentless xDS streams can deadlock permanently until a server is restarted. When this issue occurs, no new proxies are able to successfully connect to the server. Effectively, the trigger for this deadlock stems from the following return statement: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L199-L202 When this happens, the entire `syncLoop()` terminates and stops consuming from the following channel: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L182-L192 Which results in the `ConfigSource.cleanup()` function never receiving a response and holding a mutex indefinitely: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L241-L247 Because this mutex is shared, it effectively deadlocks the server's ability to process new xDS streams. ---- The fix to this issue involves removing the `chan chan struct{}` used like an RPC-over-channels pattern and replacing it with two distinct channels: + `stopSyncLoopCh` - indicates that the `syncLoop()` should terminate soon. + `syncLoopDoneCh` - indicates that the `syncLoop()` has terminated. Splitting these two concepts out and deferring a `close(syncLoopDoneCh)` in the `syncLoop()` function ensures that the deadlock above should no longer occur. We also now evict xDS connections of all proxies for the corresponding `syncLoop()` whenever it encounters an irrecoverable error. This is done by hoisting the new `syncLoopDoneCh` upwards so that it's visible to the xDS delta processing. Prior to this fix, the behavior was to simply orphan them so they would never receive catalog-registration or service-defaults updates. * Add changelog.
2024-03-15 18:57:11 +00:00
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for snapshot")
}
// Start another watch.
_, termCh2, cfgSrcTerminated2, cancelWatch2, err := mgr.Watch(serviceID, nodeName, token)
Fix xDS deadlock due to syncLoop termination. (#20867) * Fix xDS deadlock due to syncLoop termination. This fixes an issue where agentless xDS streams can deadlock permanently until a server is restarted. When this issue occurs, no new proxies are able to successfully connect to the server. Effectively, the trigger for this deadlock stems from the following return statement: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L199-L202 When this happens, the entire `syncLoop()` terminates and stops consuming from the following channel: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L182-L192 Which results in the `ConfigSource.cleanup()` function never receiving a response and holding a mutex indefinitely: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L241-L247 Because this mutex is shared, it effectively deadlocks the server's ability to process new xDS streams. ---- The fix to this issue involves removing the `chan chan struct{}` used like an RPC-over-channels pattern and replacing it with two distinct channels: + `stopSyncLoopCh` - indicates that the `syncLoop()` should terminate soon. + `syncLoopDoneCh` - indicates that the `syncLoop()` has terminated. Splitting these two concepts out and deferring a `close(syncLoopDoneCh)` in the `syncLoop()` function ensures that the deadlock above should no longer occur. We also now evict xDS connections of all proxies for the corresponding `syncLoop()` whenever it encounters an irrecoverable error. This is done by hoisting the new `syncLoopDoneCh` upwards so that it's visible to the xDS delta processing. Prior to this fix, the behavior was to simply orphan them so they would never receive catalog-registration or service-defaults updates. * Add changelog.
2024-03-15 18:57:11 +00:00
require.NoError(t, err)
require.Equal(t, session2TermCh, termCh2)
// Expect the service to have not been re-registered by the second watch.
select {
case <-snapCh:
t.Fatal("service shouldn't have been re-registered")
case <-time.After(100 * time.Millisecond):
}
// Ensure that no config-source syncLoops were terminated.
select {
case <-cfgSrcTerminated1:
t.Fatal("unexpected config-source termination 1")
case <-cfgSrcTerminated2:
t.Fatal("unexpected config-source termination 2")
default:
}
// Update the proxy's port to 8888.
// This should trigger the config-source syncLoop termination channel due to an error.
require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
Node: nodeName,
Service: &structs.NodeService{
ID: serviceID.ID,
Service: "web-sidecar-proxy",
Port: 8888,
Kind: structs.ServiceKindConnectProxy,
Proxy: structs.ConnectProxyConfig{
Config: map[string]any{
"local_connect_timeout_ms": 123,
},
},
},
}))
// Expect both config sources to have terminated when the syncLoop errors.
select {
case _, ok := <-cfgSrcTerminated1:
cancelWatch1()
require.False(t, ok)
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for config-source termination 1")
}
select {
case _, ok := <-cfgSrcTerminated2:
cancelWatch2()
require.False(t, ok)
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for config-source termination 2")
}
// Expect the snap channels to have been closed.
select {
case _, ok := <-snapCh:
require.False(t, ok)
case <-time.After(100 * time.Millisecond):
t.Fatal("snap channel was not closed")
}
}
func TestConfigSource_NotProxyService(t *testing.T) {
serviceID := structs.NewServiceID("web", nil)
nodeName := "node-name"
token := "token"
store := testStateStore(t)
require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
Node: nodeName,
Service: &structs.NodeService{
ID: serviceID.ID,
Service: "web",
Port: 9999,
Kind: structs.ServiceKindTypical,
},
}))
var canceledWatch bool
cancel := context.CancelFunc(func() { canceledWatch = true })
cfgMgr := NewMockConfigManager(t)
cfgMgr.On("Watch", mock.Anything).
Return(make(<-chan *proxycfg.ConfigSnapshot), cancel)
mgr := NewConfigSource(Config{
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
SessionLimiter: nullSessionLimiter{},
})
t.Cleanup(mgr.Shutdown)
_, _, _, _, err := mgr.Watch(serviceID, nodeName, token)
require.Error(t, err)
require.Contains(t, err.Error(), "must be a sidecar proxy or gateway")
require.True(t, canceledWatch, "watch should've been canceled")
}
func TestConfigSource_SessionLimiterError(t *testing.T) {
lim := NewMockSessionLimiter(t)
lim.On("BeginSession").Return(nil, limiter.ErrCapacityReached)
src := NewConfigSource(Config{
LocalState: testLocalState(t),
SessionLimiter: lim,
})
t.Cleanup(src.Shutdown)
Fix xDS deadlock due to syncLoop termination. (#20867) * Fix xDS deadlock due to syncLoop termination. This fixes an issue where agentless xDS streams can deadlock permanently until a server is restarted. When this issue occurs, no new proxies are able to successfully connect to the server. Effectively, the trigger for this deadlock stems from the following return statement: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L199-L202 When this happens, the entire `syncLoop()` terminates and stops consuming from the following channel: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L182-L192 Which results in the `ConfigSource.cleanup()` function never receiving a response and holding a mutex indefinitely: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L241-L247 Because this mutex is shared, it effectively deadlocks the server's ability to process new xDS streams. ---- The fix to this issue involves removing the `chan chan struct{}` used like an RPC-over-channels pattern and replacing it with two distinct channels: + `stopSyncLoopCh` - indicates that the `syncLoop()` should terminate soon. + `syncLoopDoneCh` - indicates that the `syncLoop()` has terminated. Splitting these two concepts out and deferring a `close(syncLoopDoneCh)` in the `syncLoop()` function ensures that the deadlock above should no longer occur. We also now evict xDS connections of all proxies for the corresponding `syncLoop()` whenever it encounters an irrecoverable error. This is done by hoisting the new `syncLoopDoneCh` upwards so that it's visible to the xDS delta processing. Prior to this fix, the behavior was to simply orphan them so they would never receive catalog-registration or service-defaults updates. * Add changelog.
2024-03-15 18:57:11 +00:00
_, _, _, _, err := src.Watch(
structs.NewServiceID("web-sidecar-proxy-1", nil),
"node-name",
"token",
)
require.Equal(t, limiter.ErrCapacityReached, err)
}
Fix xDS deadlock due to syncLoop termination. (#20867) * Fix xDS deadlock due to syncLoop termination. This fixes an issue where agentless xDS streams can deadlock permanently until a server is restarted. When this issue occurs, no new proxies are able to successfully connect to the server. Effectively, the trigger for this deadlock stems from the following return statement: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L199-L202 When this happens, the entire `syncLoop()` terminates and stops consuming from the following channel: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L182-L192 Which results in the `ConfigSource.cleanup()` function never receiving a response and holding a mutex indefinitely: https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L241-L247 Because this mutex is shared, it effectively deadlocks the server's ability to process new xDS streams. ---- The fix to this issue involves removing the `chan chan struct{}` used like an RPC-over-channels pattern and replacing it with two distinct channels: + `stopSyncLoopCh` - indicates that the `syncLoop()` should terminate soon. + `syncLoopDoneCh` - indicates that the `syncLoop()` has terminated. Splitting these two concepts out and deferring a `close(syncLoopDoneCh)` in the `syncLoop()` function ensures that the deadlock above should no longer occur. We also now evict xDS connections of all proxies for the corresponding `syncLoop()` whenever it encounters an irrecoverable error. This is done by hoisting the new `syncLoopDoneCh` upwards so that it's visible to the xDS delta processing. Prior to this fix, the behavior was to simply orphan them so they would never receive catalog-registration or service-defaults updates. * Add changelog.
2024-03-15 18:57:11 +00:00
func testConfigManager(t *testing.T, serviceID structs.ServiceID, nodeName string, token string) *MockConfigManager {
t.Helper()
cfgMgr := NewMockConfigManager(t)
proxyID := proxycfg.ProxyID{
ServiceID: serviceID,
NodeName: nodeName,
Token: token,
}
snapCh := make(chan *proxycfg.ConfigSnapshot, 1)
cfgMgr.On("Watch", proxyID).
Return((<-chan *proxycfg.ConfigSnapshot)(snapCh), context.CancelFunc(func() {}), nil)
cfgMgr.On("Register", mock.Anything, mock.Anything, source, token, false).
Run(func(args mock.Arguments) {
id := args.Get(0).(proxycfg.ProxyID)
ns := args.Get(1).(*structs.NodeService)
snapCh <- &proxycfg.ConfigSnapshot{
ProxyID: id,
Port: ns.Port,
Proxy: ns.Proxy,
}
}).
Return(nil)
cfgMgr.On("Deregister", proxyID, source).
Run(func(mock.Arguments) { close(snapCh) }).
Return()
return cfgMgr
}
func testStateStore(t *testing.T) *state.Store {
t.Helper()
gc, err := state.NewTombstoneGC(time.Second, time.Millisecond)
require.NoError(t, err)
return state.NewStateStoreWithEventPublisher(gc, stream.NoOpEventPublisher{})
}
func testLocalState(t *testing.T) *local.State {
t.Helper()
l := local.NewState(local.Config{}, hclog.NewNullLogger(), &token.Store{})
l.TriggerSyncChanges = func() {}
return l
}
type nullSessionLimiter struct{}
func (nullSessionLimiter) BeginSession() (limiter.Session, error) {
return nullSession{}, nil
}
func (nullSessionLimiter) Run(ctx context.Context) {}
type nullSession struct{}
func (nullSession) End() {}
func (nullSession) Terminated() limiter.SessionTerminatedChan { return nil }
type mockSession struct {
mock.Mock
}
func newMockSession(t *testing.T) *mockSession {
m := &mockSession{}
m.Mock.Test(t)
t.Cleanup(func() { m.AssertExpectations(t) })
return m
}
func (m *mockSession) End() { m.Called() }
func (m *mockSession) Terminated() limiter.SessionTerminatedChan {
return m.Called().Get(0).(limiter.SessionTerminatedChan)
}