proxycfg: ensure that an irrecoverable error in proxycfg closes the xds session and triggers a replacement proxycfg watcher (#16497)
Receiving an "acl not found" error from an RPC in the agent cache and the streaming/event components will cause any request loops to cease, under the assumption that they will never work again if the token was destroyed. This prevents log spam (#14144, #9738).

Unfortunately, the caller can receive an "acl not found" error *before* the token exists rather than only after it is destroyed, due to things like:

- authz requests going to stale servers that may not have witnessed the token creation yet
- authz requests in a secondary datacenter happening before the tokens get replicated to that datacenter
- authz requests from a primary TO a secondary datacenter happening before the tokens get replicated to that datacenter

In those cases the machinery added in the linked PRs kicks in and prevents the request loop from ever looping around again, even once the tokens actually exist.

For `consul-dataplane` usages, where xDS is served by the Consul servers rather than the client agents, this is ultimately not a problem: there the `agent/proxycfg` machinery is launched on demand by a new xDS stream needing data for a specific service in the catalog. If the watching goroutines are terminated, that ripples down and terminates the xDS stream, which CDP will eventually re-establish, restarting everything.

For Consul client usages, the `agent/proxycfg` machinery is launched ahead of time at service registration (called "local" in some of the proxycfg machinery), so when the xDS stream comes in the data is already ready to go. If the watching goroutines terminate, the xDS stream should terminate too, but there is no mechanism to re-spawn the watching goroutines. If the xDS stream reconnects it will see no `ConfigSnapshot` and will not get one again until the client agent is restarted, or the service is re-registered with something changed in it.

This PR fixes a few things in the machinery:

- There was an inadvertent deadlock when xDS fetched a snapshot from the proxycfg machinery: once the watching goroutine terminated, the snapshot could never be fetched, which left parts of the xDS machinery paused indefinitely so the teardown never finished properly.
- Every 30s we now attempt to re-insert all locally registered services into the proxycfg machinery.
- When services are re-inserted into the proxycfg machinery, we special-case "dead" ones and unilaterally replace them rather than doing so conditionally.
parent 8910002e8f
commit 9a485cdb49
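For client agents, the fix hinges on a periodic resync loop in the local proxycfg source. The sketch below is a minimal illustration of that loop shape under assumed stand-in names (`loop`, `doSync`, `stateCh`), not the real `localproxycfg.Sync` signature; the actual change appears in the Sync hunk further down the diff.

```go
package sketch

import (
	"context"
	"time"
)

// loop mirrors the shape of the Sync loop in this PR: it re-runs doSync on
// every local state change and at least once per resyncEvery interval.
// doSync and stateCh stand in for the real dependencies.
func loop(ctx context.Context, stateCh <-chan struct{}, resyncEvery time.Duration, doSync func()) {
	var resyncCh <-chan time.Time
	for {
		doSync() // re-insert all locally registered services into proxycfg

		// Arm the timer only when one isn't already pending and only when a
		// resync frequency was configured.
		if resyncCh == nil && resyncEvery > 0 {
			resyncCh = time.After(resyncEvery)
		}

		select {
		case <-stateCh: // local agent state changed
		case <-resyncCh:
			resyncCh = nil // fired; re-arm on the next pass
		case <-ctx.Done():
			return
		}
	}
}
```

Re-arming `time.After` on each pass, instead of keeping a ticker, means that a zero resync interval leaves `resyncCh` nil, and receiving from a nil channel never fires, so callers that do not configure a resync frequency behave exactly as before.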
@@ -0,0 +1,3 @@
+```release-note:bug
+proxycfg: ensure that an irrecoverable error in proxycfg closes the xds session and triggers a replacement proxycfg watcher
+```

@@ -721,11 +721,12 @@ func (a *Agent) Start(ctx context.Context) error {
     go localproxycfg.Sync(
         &lib.StopChannelContext{StopCh: a.shutdownCh},
         localproxycfg.SyncConfig{
             Manager:         a.proxyConfig,
             State:           a.State,
             Logger:          a.proxyConfig.Logger.Named("agent-state"),
             Tokens:          a.baseDeps.Tokens,
             NodeName:        a.config.NodeName,
+            ResyncFrequency: a.config.LocalProxyConfigResyncInterval,
         },
     )

@@ -1091,6 +1091,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
         Watches:                          c.Watches,
         XDSUpdateRateLimit:               limitVal(c.XDS.UpdateMaxPerSecond),
         AutoReloadConfigCoalesceInterval: 1 * time.Second,
+        LocalProxyConfigResyncInterval:   30 * time.Second,
     }
 
     rt.TLS, err = b.buildTLSConfig(rt, c.TLS)

@@ -1475,6 +1475,10 @@ type RuntimeConfig struct {
     // AutoReloadConfigCoalesceInterval Coalesce Interval for auto reload config
     AutoReloadConfigCoalesceInterval time.Duration
 
+    // LocalProxyConfigResyncInterval is not a user-configurable value and exists
+    // here so that tests can use a smaller value.
+    LocalProxyConfigResyncInterval time.Duration
+
     EnterpriseRuntimeConfig
 }

@@ -5995,12 +5995,13 @@ func TestLoad_FullConfig(t *testing.T) {
     nodeEntMeta := structs.NodeEnterpriseMetaInDefaultPartition()
     expected := &RuntimeConfig{
         // non-user configurable values
         AEInterval:                 time.Minute,
         CheckDeregisterIntervalMin: time.Minute,
         CheckReapInterval:          30 * time.Second,
         SegmentNameLimit:           64,
         SyncCoordinateIntervalMin:  15 * time.Second,
         SyncCoordinateRateTarget:   64,
+        LocalProxyConfigResyncInterval: 30 * time.Second,
 
         Revision: "JNtPSav3",
         Version:  "R909Hblt",

@@ -233,6 +233,7 @@
     "KVMaxValueSize": 1234567800000000,
     "LeaveDrainTime": "0s",
     "LeaveOnTerm": false,
+    "LocalProxyConfigResyncInterval": "0s",
     "Logging": {
         "EnableSyslog": false,
         "LogFilePath": "",

@@ -2,6 +2,7 @@ package local
 
 import (
     "context"
+    "time"
 
     "github.com/hashicorp/go-hclog"
 

@@ -11,6 +12,8 @@ import (
     "github.com/hashicorp/consul/agent/token"
 )
 
+const resyncFrequency = 30 * time.Second
+
 const source proxycfg.ProxySource = "local"
 
 // SyncConfig contains the dependencies required by Sync.

@@ -30,6 +33,10 @@ type SyncConfig struct {
 
     // Logger will be used to write log messages.
     Logger hclog.Logger
+
+    // ResyncFrequency is how often to do a resync and recreate any terminated
+    // watches.
+    ResyncFrequency time.Duration
 }
 
 // Sync watches the agent's local state and registers/deregisters services with

@@ -50,12 +57,19 @@ func Sync(ctx context.Context, cfg SyncConfig) {
     cfg.State.Notify(stateCh)
     defer cfg.State.StopNotify(stateCh)
 
+    var resyncCh <-chan time.Time
     for {
         sync(cfg)
 
+        if resyncCh == nil && cfg.ResyncFrequency > 0 {
+            resyncCh = time.After(cfg.ResyncFrequency)
+        }
+
         select {
         case <-stateCh:
             // Wait for a state change.
+        case <-resyncCh:
+            resyncCh = nil
         case <-ctx.Done():
             return
         }

@@ -158,7 +158,7 @@ func (m *Manager) Register(id ProxyID, ns *structs.NodeService, source ProxySour
 
 func (m *Manager) register(id ProxyID, ns *structs.NodeService, source ProxySource, token string, overwrite bool) error {
     state, ok := m.proxies[id]
-    if ok {
+    if ok && !state.stoppedRunning() {
         if state.source != source && !overwrite {
             // Registered by a different source, leave as-is.
             return nil

@@ -83,10 +83,20 @@ type state struct {
     ch     chan UpdateEvent
     snapCh chan ConfigSnapshot
     reqCh  chan chan *ConfigSnapshot
+    doneCh chan struct{}
 
     rateLimiter *rate.Limiter
 }
 
+func (s *state) stoppedRunning() bool {
+    select {
+    case <-s.doneCh:
+        return true
+    default:
+        return false
+    }
+}
+
 // failed returns whether run exited because a data source is in an
 // irrecoverable state.
 func (s *state) failed() bool {

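The liveness check introduced in the hunk above is the usual Go idiom of a non-blocking receive from a channel that the worker goroutine closes on exit. The sketch below uses assumed stand-in types (`watcher`, `registry`), not Consul's, to show how `Manager.register`-style code can leave a live watcher alone but unconditionally replace one whose goroutine has already died, which is what the `if ok && !state.stoppedRunning()` change above accomplishes.

```go
package sketch

// watcher stands in for proxycfg's per-service state; its goroutine closes
// doneCh when it exits.
type watcher struct {
	doneCh chan struct{}
}

// stopped is a non-blocking check: receiving from a closed channel succeeds
// immediately, while an open channel falls through to the default case.
func (w *watcher) stopped() bool {
	select {
	case <-w.doneCh:
		return true
	default:
		return false
	}
}

type registry struct {
	watchers map[string]*watcher
}

// register leaves a still-running watcher alone, but if the previous watcher
// already terminated (e.g. after an irrecoverable "acl not found" error) it is
// unconditionally replaced so the periodic resync can revive the service.
func (r *registry) register(id string) *watcher {
	if r.watchers == nil {
		r.watchers = make(map[string]*watcher)
	}
	if w, ok := r.watchers[id]; ok && !w.stopped() {
		return w // still running; nothing to do
	}
	w := &watcher{doneCh: make(chan struct{})}
	r.watchers[id] = w // respawn in place of the dead watcher
	return w
}
```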
@@ -182,6 +192,7 @@ func newState(id ProxyID, ns *structs.NodeService, source ProxySource, token str
         ch:     ch,
         snapCh: make(chan ConfigSnapshot, 1),
         reqCh:  make(chan chan *ConfigSnapshot, 1),
+        doneCh: make(chan struct{}),
         rateLimiter: rateLimiter,
     }, nil
 }

@@ -265,6 +276,9 @@ func (s *state) Watch() (<-chan ConfigSnapshot, error) {
 
 // Close discards the state and stops any long-running watches.
 func (s *state) Close(failed bool) error {
+    if s.stoppedRunning() {
+        return nil
+    }
     if s.cancel != nil {
         s.cancel()
     }

@@ -314,6 +328,9 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
 }
 
 func (s *state) unsafeRun(ctx context.Context, snap *ConfigSnapshot) {
+    // Closing the done channel signals that this entire state is no longer
+    // going to be updated.
+    defer close(s.doneCh)
     // Close the channel we return from Watch when we stop so consumers can stop
     // watching and clean up their goroutines. It's important we do this here and
     // not in Close since this routine sends on this chan and so might panic if it

@@ -429,9 +446,20 @@ func (s *state) unsafeRun(ctx context.Context, snap *ConfigSnapshot) {
 func (s *state) CurrentSnapshot() *ConfigSnapshot {
     // Make a chan for the response to be sent on
     ch := make(chan *ConfigSnapshot, 1)
-    s.reqCh <- ch
+    select {
+    case <-s.doneCh:
+        return nil
+    case s.reqCh <- ch:
+    }
 
     // Wait for the response
-    return <-ch
+    select {
+    case <-s.doneCh:
+        return nil
+    case resp := <-ch:
+        return resp
+    }
 }
 
 // Changed returns whether or not the passed NodeService has had any of the

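The hunk above is the deadlock fix called out in the commit message: the old `CurrentSnapshot` did an unconditional blocking send on `reqCh` followed by a blocking receive, so once the watching goroutine had exited nothing would ever service the request, the xDS caller hung, and teardown stalled. Racing both the send and the receive against the now-closed `doneCh` turns that hang into a `nil` snapshot. The stand-alone sketch below (an assumed `server` type, not Consul's code) shows both halves of the pattern.

```go
package sketch

// server stands in for the per-proxy state goroutine plus its request channel.
type server struct {
	reqCh  chan chan string
	doneCh chan struct{}
}

// serve answers requests until stop is closed, then closes doneCh on exit,
// mirroring unsafeRun's `defer close(s.doneCh)`.
func (s *server) serve(stop <-chan struct{}) {
	defer close(s.doneCh)
	for {
		select {
		case respCh := <-s.reqCh:
			respCh <- "snapshot"
		case <-stop:
			return
		}
	}
}

// request returns the current value, or "" if the server already exited. An
// unconditional `s.reqCh <- respCh` here is exactly the hang this PR fixes.
func (s *server) request() string {
	respCh := make(chan string, 1) // buffered so serve never blocks on reply
	select {
	case <-s.doneCh:
		return ""
	case s.reqCh <- respCh:
	}
	select {
	case <-s.doneCh:
		return ""
	case resp := <-respCh:
		return resp
	}
}
```

A caller that gets the empty result treats the watcher as gone and tears down its own stream, which is what lets the xDS session close and a replacement watcher be created on the next resync.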
@@ -0,0 +1,138 @@
+package agent
+
+import (
+    "encoding/json"
+    "net/http"
+    "net/http/httptest"
+    "testing"
+    "time"
+
+    "github.com/stretchr/testify/require"
+
+    "github.com/hashicorp/consul/agent/grpc-external/limiter"
+    "github.com/hashicorp/consul/agent/proxycfg"
+    "github.com/hashicorp/consul/agent/structs"
+    "github.com/hashicorp/consul/api"
+    "github.com/hashicorp/consul/testrpc"
+)
+
+func TestAgent_local_proxycfg(t *testing.T) {
+    a := NewTestAgent(t, TestACLConfig())
+    defer a.Shutdown()
+
+    testrpc.WaitForLeader(t, a.RPC, "dc1")
+
+    token := generateUUID()
+
+    svc := &structs.NodeService{
+        ID:             "db",
+        Service:        "db",
+        Port:           5000,
+        EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
+    }
+    require.NoError(t, a.State.AddServiceWithChecks(svc, nil, token, true))
+
+    proxy := &structs.NodeService{
+        Kind:    structs.ServiceKindConnectProxy,
+        ID:      "db-sidecar-proxy",
+        Service: "db-sidecar-proxy",
+        Port:    5000,
+        // Set this internal state that we expect sidecar registrations to have.
+        LocallyRegisteredAsSidecar: true,
+        Proxy: structs.ConnectProxyConfig{
+            DestinationServiceName: "db",
+            Upstreams:              structs.TestUpstreams(t),
+        },
+        EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
+    }
+    require.NoError(t, a.State.AddServiceWithChecks(proxy, nil, token, true))
+
+    // This is a little gross, but this gives us the layered pair of
+    // local/catalog sources for now.
+    cfg := a.xdsServer.CfgSrc
+
+    var (
+        timer      = time.After(100 * time.Millisecond)
+        timerFired = false
+        finalTimer <-chan time.Time
+    )
+
+    var (
+        firstTime = true
+        ch        <-chan *proxycfg.ConfigSnapshot
+        stc       limiter.SessionTerminatedChan
+        cancel    proxycfg.CancelFunc
+    )
+    defer func() {
+        if cancel != nil {
+            cancel()
+        }
+    }()
+    for {
+        if ch == nil {
+            // Sign up for a stream of config snapshots, in the same manner as the xds server.
+            sid := proxy.CompoundServiceID()
+
+            if firstTime {
+                firstTime = false
+            } else {
+                t.Logf("re-creating watch")
+            }
+
+            // Prior to fixes in https://github.com/hashicorp/consul/pull/16497
+            // this call to Watch() would deadlock.
+            var err error
+            ch, stc, cancel, err = cfg.Watch(sid, a.config.NodeName, token)
+            require.NoError(t, err)
+        }
+        select {
+        case <-stc:
+            t.Fatal("session unexpectedly terminated")
+        case snap, ok := <-ch:
+            if !ok {
+                t.Logf("channel is closed")
+                cancel()
+                ch, stc, cancel = nil, nil, nil
+                continue
+            }
+            require.NotNil(t, snap)
+            if !timerFired {
+                t.Fatal("should not have gotten snapshot until after we manifested the token")
+            }
+            return
+        case <-timer:
+            timerFired = true
+            finalTimer = time.After(1 * time.Second)
+
+            // This simulates the eventual consistency of a token
+            // showing up on a server after it's creation by
+            // pre-creating the UUID and later using that as the
+            // initial SecretID for a real token.
+            gotToken := testWriteToken(t, a, &api.ACLToken{
+                AccessorID:  generateUUID(),
+                SecretID:    token,
+                Description: "my token",
+                ServiceIdentities: []*api.ACLServiceIdentity{{
+                    ServiceName: "db",
+                }},
+            })
+            require.Equal(t, token, gotToken)
+        case <-finalTimer:
+            t.Fatal("did not receive a snapshot after the token manifested")
+        }
+    }
+}
+
+func testWriteToken(t *testing.T, a *TestAgent, tok *api.ACLToken) string {
+    req, _ := http.NewRequest("PUT", "/v1/acl/token", jsonReader(tok))
+    req.Header.Add("X-Consul-Token", "root")
+    resp := httptest.NewRecorder()
+    a.srv.h.ServeHTTP(resp, req)
+    require.Equal(t, http.StatusOK, resp.Code)
+
+    dec := json.NewDecoder(resp.Body)
+    aclResp := &structs.ACLToken{}
+    require.NoError(t, dec.Decode(aclResp))
+    return aclResp.SecretID
+}

@@ -214,6 +214,9 @@ func (a *TestAgent) Start(t *testing.T) error {
         // Lower the maximum backoff period of a cache refresh just for
         // tests see #14956 for more.
         result.RuntimeConfig.Cache.CacheRefreshMaxWait = 1 * time.Second
+
+        // Lower the resync interval for tests.
+        result.RuntimeConfig.LocalProxyConfigResyncInterval = 250 * time.Millisecond
     }
     return result, err
 }