consul/agent/proxycfg_test.go
Derek Menteer 0ac8ae6c3b
Fix xDS deadlock due to syncLoop termination. (#20867)
* Fix xDS deadlock due to syncLoop termination.

This fixes an issue where agentless xDS streams can deadlock permanently until
a server is restarted. When this issue occurs, no new proxies are able to
successfully connect to the server.

Effectively, the trigger for this deadlock stems from the following return
statement:
https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L199-L202

When this happens, the entire `syncLoop()` terminates and stops consuming from
the following channel:
https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L182-L192

Which results in the `ConfigSource.cleanup()` function never receiving a
response and holding a mutex indefinitely:
https://github.com/hashicorp/consul/blob/v1.18.0/agent/proxycfg-sources/catalog/config_source.go#L241-L247

Because this mutex is shared, it effectively deadlocks the server's ability to
process new xDS streams.

----

The fix to this issue involves removing the `chan chan struct{}` used like an
RPC-over-channels pattern and replacing it with two distinct channels:

+ `stopSyncLoopCh` - indicates that the `syncLoop()` should terminate soon.
+ `syncLoopDoneCh` - indicates that the `syncLoop()` has terminated.

Splitting these two concepts out and deferring a `close(syncLoopDoneCh)` in the
`syncLoop()` function ensures that the deadlock above should no longer occur.

We also now evict xDS connections of all proxies for the corresponding
`syncLoop()` whenever it encounters an irrecoverable error. This is done by
hoisting the new `syncLoopDoneCh` upwards so that it's visible to the xDS delta
processing. Prior to this fix, the behavior was to simply orphan them so they
would never receive catalog-registration or service-defaults updates.

* Add changelog.
2024-03-15 13:57:11 -05:00

144 lines
3.9 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package agent
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
rtest "github.com/hashicorp/consul/internal/resource/resourcetest"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
"github.com/hashicorp/consul/testrpc"
)
// TestAgent_local_proxycfg registers a service and its sidecar proxy in the
// agent's local state using a secret for which no ACL token has been written
// yet, then watches the proxy's config snapshots through the xDS server's
// ProxyWatcher. The token is written 100ms later; the test fails if a snapshot
// shows up before that point, if the session is terminated, or if no snapshot
// arrives within one second after the token was written.
func TestAgent_local_proxycfg(t *testing.T) {
	a := NewTestAgent(t, TestACLConfig())
	defer a.Shutdown()

	testrpc.WaitForLeader(t, a.RPC, "dc1")

	// Only the secret exists at this point; the token carrying it is written
	// later from inside the select loop.
	secretID := generateUUID()

	dbService := &structs.NodeService{
		ID:             "db",
		Service:        "db",
		Port:           5000,
		EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
	}
	require.NoError(t, a.State.AddServiceWithChecks(dbService, nil, secretID, true))

	sidecar := &structs.NodeService{
		Kind:    structs.ServiceKindConnectProxy,
		ID:      "db-sidecar-proxy",
		Service: "db-sidecar-proxy",
		Port:    5000,
		// Sidecar registrations are expected to carry this internal flag.
		LocallyRegisteredAsSidecar: true,
		Proxy: structs.ConnectProxyConfig{
			DestinationServiceName: "db",
			Upstreams:              structs.TestUpstreams(t, false),
		},
		EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
	}
	require.NoError(t, a.State.AddServiceWithChecks(sidecar, nil, secretID, true))

	// Not pretty, but reaching through the xDS server hands us the layered
	// pair of local/catalog sources for now.
	watcher := a.xdsServer.ProxyWatcher

	var (
		// writeTokenAt fires once, triggering the token write.
		writeTokenAt = time.After(100 * time.Millisecond)
		tokenWritten bool
		// deadline stays nil (blocks forever) until the token has been written.
		deadline <-chan time.Time

		watchedBefore bool
		snapshots     <-chan proxysnapshot.ProxySnapshot
		terminated    limiter.SessionTerminatedChan
		stopWatch     proxysnapshot.CancelFunc
	)
	defer func() {
		if stopWatch != nil {
			stopWatch()
		}
	}()

	for {
		if snapshots == nil {
			// Subscribe to config snapshots the same way the xds server does.
			proxyID := sidecar.CompoundServiceID()

			if watchedBefore {
				t.Logf("re-creating watch")
			}
			watchedBefore = true

			// Prior to fixes in https://github.com/hashicorp/consul/pull/16497
			// this call to Watch() would deadlock.
			resourceID := rtest.Resource(pbmesh.ProxyConfigurationType, proxyID.ID).ID()

			var err error
			snapshots, terminated, _, stopWatch, err = watcher.Watch(resourceID, a.config.NodeName, secretID)
			require.NoError(t, err)
		}

		select {
		case <-terminated:
			t.Fatal("session unexpectedly terminated")

		case snapshot, open := <-snapshots:
			if !open {
				// The watch went away; tear it down and subscribe again on
				// the next iteration.
				t.Logf("channel is closed")
				stopWatch()
				snapshots, terminated, stopWatch = nil, nil, nil
				continue
			}
			require.NotNil(t, snapshot)
			if !tokenWritten {
				t.Fatal("should not have gotten snapshot until after we manifested the token")
			}
			return

		case <-writeTokenAt:
			tokenWritten = true
			deadline = time.After(1 * time.Second)

			// This simulates the eventual consistency of a token showing up
			// on a server after its creation: the UUID was generated up
			// front and is now used as the initial SecretID of a real token.
			writtenSecret := testWriteToken(t, a, &api.ACLToken{
				AccessorID:  generateUUID(),
				SecretID:    secretID,
				Description: "my token",
				ServiceIdentities: []*api.ACLServiceIdentity{{
					ServiceName: "db",
				}},
			})
			require.Equal(t, secretID, writtenSecret)

		case <-deadline:
			t.Fatal("did not receive a snapshot after the token manifested")
		}
	}
}
// testWriteToken writes tok to the test agent by sending a PUT request to
// /v1/acl/token directly through the agent's HTTP handler, with the
// X-Consul-Token header set to "root". It returns the SecretID decoded from
// the response body.
//
// The test fails immediately if the request cannot be built, the handler
// responds with anything other than 200 OK, or the body cannot be decoded.
func testWriteToken(t *testing.T, a *TestAgent, tok *api.ACLToken) string {
	// Attribute assertion failures to the calling test line, not this helper.
	t.Helper()

	req, err := http.NewRequest(http.MethodPut, "/v1/acl/token", jsonReader(tok))
	require.NoError(t, err)
	req.Header.Add("X-Consul-Token", "root")

	resp := httptest.NewRecorder()
	a.srv.h.ServeHTTP(resp, req)
	require.Equal(t, http.StatusOK, resp.Code)

	aclResp := &structs.ACLToken{}
	require.NoError(t, json.NewDecoder(resp.Body).Decode(aclResp))
	return aclResp.SecretID
}