consul/agent/proxycfg/manager.go
R.B. Boyer 9a485cdb49
proxycfg: ensure that an irrecoverable error in proxycfg closes the xds session and triggers a replacement proxycfg watcher (#16497)
Receiving an "acl not found" error from an RPC in the agent cache and the
streaming/event components will cause any request loops to cease under the
assumption that they will never work again if the token was destroyed. This
prevents log spam (#14144, #9738).

Unfortunately due to things like:

- authz requests going to stale servers that may not have witnessed the token
  creation yet

- authz requests in a secondary datacenter happening before the tokens get
  replicated to that datacenter

- authz requests from a primary TO a secondary datacenter happening before the
  tokens get replicated to that datacenter

The caller will get an "acl not found" *before* the token exists, rather than
just after. The machinery added above in the linked PRs will kick in and
prevent the request loop from looping around again once the tokens actually
exist.

For `consul-dataplane` usages, where xDS is served by the Consul servers
rather than the clients ultimately this is not a problem because in that
scenario the `agent/proxycfg` machinery is on-demand and launched by a new xDS
stream needing data for a specific service in the catalog. If the watching
goroutines are terminated it ripples down and terminates the xDS stream, which
CDP will eventually re-establish and restart everything.

For Consul client usages, the `agent/proxycfg` machinery is ahead-of-time
launched at service registration time (called "local" in some of the proxycfg
machinery) so when the xDS stream comes in the data is already ready to go. If
the watching goroutines terminate it should terminate the xDS stream, but
there's no mechanism to re-spawn the watching goroutines. If the xDS stream
reconnects it will see no `ConfigSnapshot` and will not get one again until
the client agent is restarted, or the service is re-registered with something
changed in it.

This PR fixes a few things in the machinery:

- there was an inadvertent deadlock in fetching snapshot from the proxycfg
  machinery by xDS, such that when the watching goroutine terminated the
  snapshots would never be fetched. This caused some of the xDS machinery to
  get indefinitely paused and not finish the teardown properly.

- Every 30s we now attempt to re-insert all locally registered services into
  the proxycfg machinery.

- When services are re-inserted into the proxycfg machinery we special case
  "dead" ones such that we unilaterally replace them rather that doing that
  conditionally.
2023-03-03 14:27:53 -06:00

379 lines
11 KiB
Go

package proxycfg
import (
"errors"
"runtime/debug"
"sync"
"github.com/hashicorp/go-hclog"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/tlsutil"
)
// ProxyID is a handle on a proxy service instance being tracked by Manager.
type ProxyID struct {
structs.ServiceID
// NodeName identifies the node to which the proxy is registered.
NodeName string
// Token is used to track watches on the same proxy with different ACL tokens
// separately, to prevent accidental security bugs.
//
// Note: this can be different to the ACL token used for authorization that is
// passed to Register (e.g. agent-local services are registered ahead-of-time
// with a token that may be different to the one presented in the xDS stream).
Token string
}
// ProxySource identifies where a proxy service tracked by Manager came from,
// such as the agent's local state or the catalog. It's used to prevent sources
// from overwriting each other's registrations.
type ProxySource string
// CancelFunc is a type for a returned function that can be called to cancel a
// watch.
type CancelFunc func()
// Manager provides an API with which proxy services can be registered, and
// coordinates the fetching (and refreshing) of intentions, upstreams, discovery
// chain, certificates etc.
//
// Consumers such as the xDS server can then subscribe to receive snapshots of
// this data whenever it changes.
//
// See package docs for more detail.
type Manager struct {
ManagerConfig
rateLimiter *rate.Limiter
mu sync.Mutex
proxies map[ProxyID]*state
watchers map[ProxyID]map[uint64]chan *ConfigSnapshot
maxWatchID uint64
}
// ManagerConfig holds the required external dependencies for a Manager
// instance. All fields must be set to something valid or the manager will
// panic. The ManagerConfig is passed by value to NewManager so the passed value
// can be mutated safely.
type ManagerConfig struct {
// DataSources contains the dependencies used to consume data used to configure
// proxies.
DataSources DataSources
// source describes the current agent's identity, it's used directly for
// prepared query discovery but also indirectly as a way to pass current
// Datacenter name into other request types that need it. This is sufficient
// for now and cleaner than passing the entire RuntimeConfig.
Source *structs.QuerySource
// DNSConfig is the agent's relevant DNS config for any proxies.
DNSConfig DNSConfig
// logger is the agent's logger to be used for logging logs.
Logger hclog.Logger
TLSConfigurator *tlsutil.Configurator
// IntentionDefaultAllow is set by the agent so that we can pass this
// information to proxies that need to make intention decisions on their
// own.
IntentionDefaultAllow bool
// UpdateRateLimit controls the rate at which config snapshots are delivered
// when updates are received from data sources. This enables us to reduce the
// impact of updates to "global" resources (e.g. proxy-defaults and wildcard
// intentions) that could otherwise saturate system resources, and cause Raft
// or gossip instability.
//
// Defaults to rate.Inf (no rate limit).
UpdateRateLimit rate.Limit
}
// NewManager constructs a Manager.
func NewManager(cfg ManagerConfig) (*Manager, error) {
if cfg.Source == nil || cfg.Logger == nil {
return nil, errors.New("all ManagerConfig fields must be provided")
}
if cfg.UpdateRateLimit == 0 {
cfg.UpdateRateLimit = rate.Inf
}
m := &Manager{
ManagerConfig: cfg,
proxies: make(map[ProxyID]*state),
watchers: make(map[ProxyID]map[uint64]chan *ConfigSnapshot),
rateLimiter: rate.NewLimiter(cfg.UpdateRateLimit, 1),
}
return m, nil
}
// UpdateRateLimit returns the configured update rate limit (see ManagerConfig).
func (m *Manager) UpdateRateLimit() rate.Limit {
return m.rateLimiter.Limit()
}
// SetUpdateRateLimit configures the update rate limit (see ManagerConfig).
func (m *Manager) SetUpdateRateLimit(l rate.Limit) {
m.rateLimiter.SetLimit(l)
}
// RegisteredProxies returns a list of the proxies tracked by Manager, filtered
// by source.
func (m *Manager) RegisteredProxies(source ProxySource) []ProxyID {
m.mu.Lock()
defer m.mu.Unlock()
proxies := make([]ProxyID, 0, len(m.proxies))
for id, state := range m.proxies {
if state.source != source {
continue
}
proxies = append(proxies, id)
}
return proxies
}
// Register and start fetching resources for the given proxy service. If the
// given service was already registered by a different source (e.g. we began
// tracking it from the catalog, but then it was registered to the server
// agent locally) the service will be left as-is unless overwrite is true.
func (m *Manager) Register(id ProxyID, ns *structs.NodeService, source ProxySource, token string, overwrite bool) error {
m.mu.Lock()
defer m.mu.Unlock()
defer func() {
if r := recover(); r != nil {
m.Logger.Error("unexpected panic during service manager registration",
"node", id.NodeName,
"service", id.ServiceID,
"message", r,
"stacktrace", string(debug.Stack()),
)
}
}()
return m.register(id, ns, source, token, overwrite)
}
func (m *Manager) register(id ProxyID, ns *structs.NodeService, source ProxySource, token string, overwrite bool) error {
state, ok := m.proxies[id]
if ok && !state.stoppedRunning() {
if state.source != source && !overwrite {
// Registered by a different source, leave as-is.
return nil
}
if !state.Changed(ns, token) {
// No change
return nil
}
// We are updating the proxy, close its old state
state.Close(false)
}
// TODO: move to a function that translates ManagerConfig->stateConfig
stateConfig := stateConfig{
logger: m.Logger.With("service_id", id.String()),
dataSources: m.DataSources,
source: m.Source,
dnsConfig: m.DNSConfig,
intentionDefaultAllow: m.IntentionDefaultAllow,
}
if m.TLSConfigurator != nil {
stateConfig.serverSNIFn = m.TLSConfigurator.ServerSNI
}
var err error
state, err = newState(id, ns, source, token, stateConfig, m.rateLimiter)
if err != nil {
return err
}
if _, err = state.Watch(); err != nil {
return err
}
m.proxies[id] = state
// Start a goroutine that will wait for changes and broadcast them to watchers.
go m.notifyBroadcast(id, state)
return nil
}
// Deregister the given proxy service, but only if it was registered by the same
// source.
func (m *Manager) Deregister(id ProxyID, source ProxySource) {
m.mu.Lock()
defer m.mu.Unlock()
state, ok := m.proxies[id]
if !ok {
return
}
if state.source != source {
return
}
// Closing state will let the goroutine we started in Register finish since
// watch chan is closed
state.Close(false)
delete(m.proxies, id)
// We intentionally leave potential watchers hanging here - there is no new
// config for them and closing their channels might be indistinguishable from
// an error that they should retry. We rely for them to eventually give up
// (because they are in fact not running any more) and so the watches be
// cleaned up naturally.
}
func (m *Manager) notifyBroadcast(proxyID ProxyID, state *state) {
// Run until ch is closed (by a defer in state.run).
for snap := range state.snapCh {
m.notify(&snap)
}
// If state.run exited because of an irrecoverable error, close all of the
// watchers so that the consumers reconnect/retry at a higher level.
if state.failed() {
m.closeAllWatchers(proxyID)
}
}
func (m *Manager) notify(snap *ConfigSnapshot) {
m.mu.Lock()
defer m.mu.Unlock()
watchers, ok := m.watchers[snap.ProxyID]
if !ok {
return
}
for _, ch := range watchers {
m.deliverLatest(snap, ch)
}
}
// deliverLatest delivers the snapshot to a watch chan. If the delivery blocks,
// it will drain the chan and then re-attempt delivery so that a slow consumer
// gets the latest config earlier. This MUST be called from a method where m.mu
// is held to be safe since it assumes we are the only goroutine sending on ch.
func (m *Manager) deliverLatest(snap *ConfigSnapshot, ch chan *ConfigSnapshot) {
// Send if chan is empty
select {
case ch <- snap:
return
default:
}
// Not empty, drain the chan of older snapshots and redeliver. For now we only
// use 1-buffered chans but this will still work if we change that later.
OUTER:
for {
select {
case <-ch:
continue
default:
break OUTER
}
}
// Now send again
select {
case ch <- snap:
return
default:
// This should not be possible since we should be the only sender, enforced
// by m.mu but error and drop the update rather than panic.
m.Logger.Error("failed to deliver ConfigSnapshot to proxy",
"proxy", snap.ProxyID.String(),
)
}
}
// Watch registers a watch on a proxy. It might not exist yet in which case this
// will not fail, but no updates will be delivered until the proxy is
// registered. If there is already a valid snapshot in memory, it will be
// delivered immediately.
func (m *Manager) Watch(id ProxyID) (<-chan *ConfigSnapshot, CancelFunc) {
m.mu.Lock()
defer m.mu.Unlock()
// This buffering is crucial otherwise we'd block immediately trying to
// deliver the current snapshot below if we already have one.
ch := make(chan *ConfigSnapshot, 1)
watchers, ok := m.watchers[id]
if !ok {
watchers = make(map[uint64]chan *ConfigSnapshot)
}
watchID := m.maxWatchID
m.maxWatchID++
watchers[watchID] = ch
m.watchers[id] = watchers
// Deliver the current snapshot immediately if there is one ready
if state, ok := m.proxies[id]; ok {
if snap := state.CurrentSnapshot(); snap != nil {
// We rely on ch being buffered above and that it's not been passed
// anywhere so we must be the only writer so this will never block and
// deadlock.
ch <- snap
}
}
return ch, func() {
m.mu.Lock()
defer m.mu.Unlock()
m.closeWatchLocked(id, watchID)
}
}
func (m *Manager) closeAllWatchers(proxyID ProxyID) {
m.mu.Lock()
defer m.mu.Unlock()
watchers, ok := m.watchers[proxyID]
if !ok {
return
}
for watchID := range watchers {
m.closeWatchLocked(proxyID, watchID)
}
}
// closeWatchLocked cleans up state related to a single watcher. It assumes the
// lock is held.
func (m *Manager) closeWatchLocked(proxyID ProxyID, watchID uint64) {
if watchers, ok := m.watchers[proxyID]; ok {
if ch, ok := watchers[watchID]; ok {
delete(watchers, watchID)
close(ch)
if len(watchers) == 0 {
delete(m.watchers, proxyID)
}
}
}
}
// Close removes all state and stops all running goroutines.
func (m *Manager) Close() error {
m.mu.Lock()
defer m.mu.Unlock()
// Close all current watchers first
for proxyID, watchers := range m.watchers {
for watchID := range watchers {
m.closeWatchLocked(proxyID, watchID)
}
}
// Then close all states
for proxyID, state := range m.proxies {
state.Close(false)
delete(m.proxies, proxyID)
}
return nil
}