From 364d4f5efed113b0aa810825f4cb539400ab7bc6 Mon Sep 17 00:00:00 2001 From: cskh Date: Thu, 19 May 2022 16:03:46 -0400 Subject: [PATCH] Retry on bad dogstatsd connection (#13091) - Introduce a new configurable telemetry parameter, retry_failed_connection. When set to true, the Consul agent continues its startup process even if the connection to the Datadog server fails; when set to false, the agent stops on a failed connection. The default is true. Co-authored-by: Dan Upton Co-authored-by: Evan Culver --- .changelog/13091.txt | 5 + agent/acl_test.go | 4 +- agent/agent.go | 1 + agent/agent_endpoint.go | 4 +- agent/agent_endpoint_test.go | 9 +- agent/config/builder.go | 1 + agent/config/config.go | 1 + agent/config/default.go | 1 + agent/config/runtime_test.go | 1 + .../TestRuntimeConfig_Sanitize.golden | 1 + agent/config/testdata/full-config.hcl | 1 + agent/config/testdata/full-config.json | 1 + agent/setup.go | 24 +-- agent/testagent.go | 5 +- connect/proxy/proxy.go | 2 +- lib/telemetry.go | 141 ++++++++++++++---- lib/telemetry_test.go | 66 ++++++++ 17 files changed, 212 insertions(+), 56 deletions(-) create mode 100644 .changelog/13091.txt create mode 100644 lib/telemetry_test.go diff --git a/.changelog/13091.txt b/.changelog/13091.txt new file mode 100644 index 0000000000..54b642f3d8 --- /dev/null +++ b/.changelog/13091.txt @@ -0,0 +1,5 @@ +```release-note:improvement +config: introduce `telemetry.retry_failed_connection` in agent configuration to +retry on failed connection to any telemetry backend. This prevents the agent from +exiting if the given DogStatsD DNS name is unresolvable, for example. +``` \ No newline at end of file diff --git a/agent/acl_test.go b/agent/acl_test.go index 995a3b6e6e..3ed7ce325c 100644 --- a/agent/acl_test.go +++ b/agent/acl_test.go @@ -71,7 +71,9 @@ func NewTestACLAgent(t *testing.T, name string, hcl string, resolveAuthz authzRe Output: logBuffer, TimeFormat: "04:05.000", }) - bd.MetricsHandler = metrics.NewInmemSink(1*time.Second, time.Minute) + bd.MetricsConfig = &lib.MetricsConfig{ + Handler: metrics.NewInmemSink(1*time.Second, time.Minute), + } agent, err := New(bd) require.NoError(t, err) diff --git a/agent/agent.go b/agent/agent.go index b1e5fd4aae..86917c53d8 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1429,6 +1429,7 @@ func (a *Agent) ShutdownAgent() error { // this would be cancelled anyways (by the closing of the shutdown ch) but // this should help them to be stopped more quickly a.baseDeps.AutoConfig.Stop() + a.baseDeps.MetricsConfig.Cancel() a.stateLock.Lock() defer a.stateLock.Unlock() diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 385e08bfdb..87805a24ad 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -173,7 +173,7 @@ func (s *HTTPHandlers) AgentMetrics(resp http.ResponseWriter, req *http.Request) handler.ServeHTTP(resp, req) return nil, nil } - return s.agent.baseDeps.MetricsHandler.DisplayMetrics(resp, req) + return s.agent.baseDeps.MetricsConfig.Handler.DisplayMetrics(resp, req) } func (s *HTTPHandlers) AgentMetricsStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) { @@ -210,7 +210,7 @@ func (s *HTTPHandlers) AgentMetricsStream(resp http.ResponseWriter, req *http.Re flusher: flusher, } enc.encoder.SetIndent("", " ") - s.agent.baseDeps.MetricsHandler.Stream(req.Context(), enc) + s.agent.baseDeps.MetricsConfig.Handler.Stream(req.Context(), enc) return nil, nil } diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index 8fccd20930..05c3e59ac6
100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -39,6 +39,7 @@ import ( tokenStore "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/xds/proxysupport" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" @@ -1563,7 +1564,9 @@ func TestHTTPHandlers_AgentMetricsStream_ACLDeny(t *testing.T) { bd := BaseDeps{} bd.Tokens = new(tokenStore.Store) sink := metrics.NewInmemSink(30*time.Millisecond, time.Second) - bd.MetricsHandler = sink + bd.MetricsConfig = &lib.MetricsConfig{ + Handler: sink, + } d := fakeResolveTokenDelegate{authorizer: acl.DenyAll()} agent := &Agent{ baseDeps: bd, @@ -1590,7 +1593,9 @@ func TestHTTPHandlers_AgentMetricsStream(t *testing.T) { bd := BaseDeps{} bd.Tokens = new(tokenStore.Store) sink := metrics.NewInmemSink(20*time.Millisecond, time.Second) - bd.MetricsHandler = sink + bd.MetricsConfig = &lib.MetricsConfig{ + Handler: sink, + } d := fakeResolveTokenDelegate{authorizer: acl.ManageAll()} agent := &Agent{ baseDeps: bd, diff --git a/agent/config/builder.go b/agent/config/builder.go index 9a814bcb5c..741aa06b12 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -917,6 +917,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) { DisableHostname: boolVal(c.Telemetry.DisableHostname), DogstatsdAddr: stringVal(c.Telemetry.DogstatsdAddr), DogstatsdTags: c.Telemetry.DogstatsdTags, + RetryFailedConfiguration: boolVal(c.Telemetry.RetryFailedConfiguration), FilterDefault: boolVal(c.Telemetry.FilterDefault), AllowedPrefixes: telemetryAllowedPrefixes, BlockedPrefixes: telemetryBlockedPrefixes, diff --git a/agent/config/config.go b/agent/config/config.go index d78e7098dd..0bb16cda55 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -674,6 +674,7 @@ type Telemetry struct { DisableHostname *bool `mapstructure:"disable_hostname"` DogstatsdAddr *string `mapstructure:"dogstatsd_addr"` DogstatsdTags []string `mapstructure:"dogstatsd_tags"` + RetryFailedConfiguration *bool `mapstructure:"retry_failed_connection"` FilterDefault *bool `mapstructure:"filter_default"` PrefixFilter []string `mapstructure:"prefix_filter"` MetricsPrefix *string `mapstructure:"metrics_prefix"` diff --git a/agent/config/default.go b/agent/config/default.go index 9355ec7ff0..8d1846e99a 100644 --- a/agent/config/default.go +++ b/agent/config/default.go @@ -128,6 +128,7 @@ func DefaultSource() Source { metrics_prefix = "consul" filter_default = true prefix_filter = [] + retry_failed_connection = true } raft_snapshot_threshold = ` + strconv.Itoa(int(cfg.RaftConfig.SnapshotThreshold)) + ` raft_snapshot_interval = "` + cfg.RaftConfig.SnapshotInterval.String() + `" diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index 719d8ddeba..cdbcb50bc9 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -6306,6 +6306,7 @@ func TestLoad_FullConfig(t *testing.T) { DisableHostname: true, DogstatsdAddr: "0wSndumK", DogstatsdTags: []string{"3N81zSUB", "Xtj8AnXZ"}, + RetryFailedConfiguration: true, FilterDefault: true, AllowedPrefixes: []string{"oJotS8XJ"}, BlockedPrefixes: []string{"cazlEhGn", "ftO6DySn.rpc.server.call"}, diff --git a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden index 6cff27c9b0..159f600d29 100644 --- 
a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden +++ b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden @@ -418,6 +418,7 @@ "DisableHostname": false, "DogstatsdAddr": "", "DogstatsdTags": [], + "RetryFailedConfiguration": false, "FilterDefault": false, "MetricsPrefix": "", "PrometheusOpts": { diff --git a/agent/config/testdata/full-config.hcl b/agent/config/testdata/full-config.hcl index c52488751c..670d30a16f 100644 --- a/agent/config/testdata/full-config.hcl +++ b/agent/config/testdata/full-config.hcl @@ -647,6 +647,7 @@ telemetry { disable_hostname = true dogstatsd_addr = "0wSndumK" dogstatsd_tags = [ "3N81zSUB","Xtj8AnXZ" ] + retry_failed_connection = true filter_default = true prefix_filter = [ "+oJotS8XJ","-cazlEhGn" ] metrics_prefix = "ftO6DySn" diff --git a/agent/config/testdata/full-config.json b/agent/config/testdata/full-config.json index f051b9d817..3b08e0f1c0 100644 --- a/agent/config/testdata/full-config.json +++ b/agent/config/testdata/full-config.json @@ -644,6 +644,7 @@ "disable_hostname": true, "dogstatsd_addr": "0wSndumK", "dogstatsd_tags": [ "3N81zSUB","Xtj8AnXZ" ], + "retry_failed_connection": true, "filter_default": true, "prefix_filter": [ "+oJotS8XJ","-cazlEhGn" ], "metrics_prefix": "ftO6DySn", diff --git a/agent/setup.go b/agent/setup.go index bbe54ae06e..6e96f468fc 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -1,15 +1,12 @@ package agent import ( - "context" "fmt" "io" "net" - "net/http" "sync" "time" - "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/go-hclog" "google.golang.org/grpc/grpclog" @@ -41,18 +38,12 @@ import ( type BaseDeps struct { consul.Deps // TODO: un-embed - RuntimeConfig *config.RuntimeConfig - MetricsHandler MetricsHandler - AutoConfig *autoconf.AutoConfig // TODO: use an interface - Cache *cache.Cache - ViewStore *submatview.Store - WatchedFiles []string -} - -// MetricsHandler provides an http.Handler for displaying metrics. 
-type MetricsHandler interface { - DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) - Stream(ctx context.Context, encoder metrics.Encoder) + RuntimeConfig *config.RuntimeConfig + MetricsConfig *lib.MetricsConfig + AutoConfig *autoconf.AutoConfig // TODO: use an interface + Cache *cache.Cache + ViewStore *submatview.Store + WatchedFiles []string } type ConfigLoader func(source config.Source) (config.LoadResult, error) @@ -90,7 +81,8 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) cfg.Telemetry.PrometheusOpts.GaugeDefinitions = gauges cfg.Telemetry.PrometheusOpts.CounterDefinitions = counters cfg.Telemetry.PrometheusOpts.SummaryDefinitions = summaries - d.MetricsHandler, err = lib.InitTelemetry(cfg.Telemetry) + + d.MetricsConfig, err = lib.InitTelemetry(cfg.Telemetry, d.Logger) if err != nil { return d, fmt.Errorf("failed to initialize telemetry: %w", err) } diff --git a/agent/testagent.go b/agent/testagent.go index 4dbf859bc8..6b1c2ed518 100644 --- a/agent/testagent.go +++ b/agent/testagent.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/sdk/freeport" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" @@ -216,7 +217,9 @@ func (a *TestAgent) Start(t *testing.T) error { bd.Logger = logger // if we are not testing telemetry things, let's use a "mock" sink for metrics if bd.RuntimeConfig.Telemetry.Disable { - bd.MetricsHandler = metrics.NewInmemSink(1*time.Second, time.Minute) + bd.MetricsConfig = &lib.MetricsConfig{ + Handler: metrics.NewInmemSink(1*time.Second, time.Minute), + } } if a.Config != nil && bd.RuntimeConfig.AutoReloadConfigCoalesceInterval == 0 { diff --git a/connect/proxy/proxy.go b/connect/proxy/proxy.go index 86cc24d464..f77d96ca93 100644 --- a/connect/proxy/proxy.go +++ b/connect/proxy/proxy.go @@ -57,7 +57,7 @@ func (p *Proxy) Serve() error { // Setup telemetry if configured // NOTE(kit): As far as I can tell, all of the metrics in the proxy are generated at runtime, so we // don't have any static metrics we initialize at start. - _, err := lib.InitTelemetry(newCfg.Telemetry) + _, err := lib.InitTelemetry(newCfg.Telemetry, p.logger) if err != nil { p.logger.Error("proxy telemetry config error", "error", err) } diff --git a/lib/telemetry.go b/lib/telemetry.go index d74edb37cb..6183c6fac0 100644 --- a/lib/telemetry.go +++ b/lib/telemetry.go @@ -1,12 +1,20 @@ package lib import ( + "context" + "errors" + "net" + "net/http" + "sync" "time" "github.com/armon/go-metrics" "github.com/armon/go-metrics/circonus" "github.com/armon/go-metrics/datadog" "github.com/armon/go-metrics/prometheus" + "github.com/hashicorp/consul/lib/retry" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-multierror" ) // TelemetryConfig is embedded in config.RuntimeConfig and holds the @@ -153,6 +161,11 @@ type TelemetryConfig struct { // hcl: telemetry { dogstatsd_tags = []string } DogstatsdTags []string `json:"dogstatsd_tags,omitempty" mapstructure:"dogstatsd_tags"` + // RetryFailedConfiguration retries transient errors when setting up sinks (e.g. network errors when connecting to telemetry backends). 
+ // + // hcl: telemetry { retry_failed_connection = (true|false) } + RetryFailedConfiguration bool `json:"retry_failed_connection,omitempty" mapstructure:"retry_failed_connection"` + // FilterDefault is the default for whether to allow a metric that's not // covered by the filter. // @@ -199,6 +212,27 @@ type TelemetryConfig struct { PrometheusOpts prometheus.PrometheusOpts } +// MetricsHandler provides an http.Handler for displaying metrics. +type MetricsHandler interface { + DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) + Stream(ctx context.Context, encoder metrics.Encoder) +} + +type MetricsConfig struct { + Handler MetricsHandler + mu sync.Mutex + cancelFn context.CancelFunc +} + +func (cfg *MetricsConfig) Cancel() { + cfg.mu.Lock() + defer cfg.mu.Unlock() + + if cfg.cancelFn != nil { + cfg.cancelFn() + } +} + func statsiteSink(cfg TelemetryConfig, hostname string) (metrics.MetricSink, error) { addr := cfg.StatsiteAddr if addr == "" { @@ -283,17 +317,7 @@ func circonusSink(cfg TelemetryConfig, hostname string) (metrics.MetricSink, err return sink, nil } -// InitTelemetry configures go-metrics based on map of telemetry config -// values as returned by Runtimecfg.Config(). -func InitTelemetry(cfg TelemetryConfig) (*metrics.InmemSink, error) { - if cfg.Disable { - return nil, nil - } - // Setup telemetry - // Aggregate on 10 second intervals for 1 minute. Expose the - // metrics over stderr when there is a SIGUSR1 received. - memSink := metrics.NewInmemSink(10*time.Second, time.Minute) - metrics.DefaultInmemSignal(memSink) +func configureSinks(cfg TelemetryConfig, hostName string, memSink metrics.MetricSink) (metrics.FanoutSink, error) { metricsConf := metrics.DefaultConfig(cfg.MetricsPrefix) metricsConf.EnableHostname = !cfg.DisableHostname metricsConf.FilterDefault = cfg.FilterDefault @@ -301,35 +325,24 @@ func InitTelemetry(cfg TelemetryConfig) (*metrics.InmemSink, error) { metricsConf.BlockedPrefixes = cfg.BlockedPrefixes var sinks metrics.FanoutSink - addSink := func(fn func(TelemetryConfig, string) (metrics.MetricSink, error)) error { + var errors error + addSink := func(fn func(TelemetryConfig, string) (metrics.MetricSink, error)) { s, err := fn(cfg, metricsConf.HostName) if err != nil { - return err + errors = multierror.Append(errors, err) + return } if s != nil { sinks = append(sinks, s) } - return nil } - if err := addSink(statsiteSink); err != nil { - return nil, err - } - if err := addSink(statsdSink); err != nil { - return nil, err - } - if err := addSink(dogstatdSink); err != nil { - return nil, err - } - if err := addSink(circonusSink); err != nil { - return nil, err - } - if err := addSink(circonusSink); err != nil { - return nil, err - } - if err := addSink(prometheusSink); err != nil { - return nil, err - } + addSink(statsiteSink) + addSink(statsdSink) + addSink(dogstatdSink) + addSink(circonusSink) + addSink(circonusSink) + addSink(prometheusSink) if len(sinks) > 0 { sinks = append(sinks, memSink) @@ -338,5 +351,67 @@ func InitTelemetry(cfg TelemetryConfig) (*metrics.InmemSink, error) { metricsConf.EnableHostname = false metrics.NewGlobal(metricsConf, memSink) } - return memSink, nil + return sinks, errors +} + +// InitTelemetry configures go-metrics based on map of telemetry config +// values as returned by Runtimecfg.Config(). +// InitTelemetry retries configuring the sinks when the error is retriable +// and retry_failed_connection is set to true.
+func InitTelemetry(cfg TelemetryConfig, logger hclog.Logger) (*MetricsConfig, error) { + if cfg.Disable { + return nil, nil + } + + memSink := metrics.NewInmemSink(10*time.Second, time.Minute) + metrics.DefaultInmemSignal(memSink) + metricsConf := metrics.DefaultConfig(cfg.MetricsPrefix) + + metricsConfig := &MetricsConfig{ + Handler: memSink, + } + + var cancel context.CancelFunc + var ctx context.Context + retryWithBackoff := func() { + waiter := &retry.Waiter{ + MaxWait: 5 * time.Minute, + } + for { + logger.Warn("retrying to configure metrics sinks", "retries", waiter.Failures()) + _, err := configureSinks(cfg, metricsConf.HostName, memSink) + if err == nil { + logger.Info("successfully configured metrics sinks") + return + } + logger.Error("failed to configure sinks", "error", multierror.Flatten(err)) + + if err := waiter.Wait(ctx); err != nil { + logger.Trace("stop retrying to configure metrics sinks") + } + } + } + + if _, errs := configureSinks(cfg, metricsConf.HostName, memSink); errs != nil { + if isRetriableError(errs) && cfg.RetryFailedConfiguration { + logger.Warn("failed to configure sinks", "error", multierror.Flatten(errs)) + ctx, cancel = context.WithCancel(context.Background()) + + metricsConfig.mu.Lock() + metricsConfig.cancelFn = cancel + metricsConfig.mu.Unlock() + go retryWithBackoff() + } else { + return nil, errs + } + } + return metricsConfig, nil +} + +func isRetriableError(errs error) bool { + var dnsError *net.DNSError + if errors.As(errs, &dnsError) && dnsError.IsNotFound { + return true + } + return false } diff --git a/lib/telemetry_test.go b/lib/telemetry_test.go new file mode 100644 index 0000000000..8f0ec176de --- /dev/null +++ b/lib/telemetry_test.go @@ -0,0 +1,66 @@ +package lib + +import ( + "errors" + "net" + "os" + "testing" + + "github.com/hashicorp/consul/logging" + "github.com/hashicorp/go-multierror" + "github.com/stretchr/testify/require" +) + +func newCfg() TelemetryConfig { + return TelemetryConfig{ + StatsdAddr: "statsd.host:1234", + StatsiteAddr: "statsite.host:1234", + DogstatsdAddr: "mydog.host:8125", + } +} + +func TestConfigureSinks(t *testing.T) { + cfg := newCfg() + sinks, err := configureSinks(cfg, "hostname", nil) + require.Error(t, err) + // 3 sinks: statsd, statsite, inmem + require.Equal(t, 3, len(sinks)) + + cfg = TelemetryConfig{ + DogstatsdAddr: "", + } + _, err = configureSinks(cfg, "hostname", nil) + require.NoError(t, err) + +} + +func TestIsRetriableError(t *testing.T) { + var err error + err = multierror.Append(err, errors.New("an error")) + r := isRetriableError(err) + require.False(t, r) + + err = multierror.Append(err, &net.DNSError{ + IsNotFound: true, + }) + r = isRetriableError(err) + require.True(t, r) +} + +func TestInitTelemetryRetrySuccess(t *testing.T) { + logger, err := logging.Setup(logging.Config{ + LogLevel: "INFO", + }, os.Stdout) + require.NoError(t, err) + cfg := newCfg() + _, err = InitTelemetry(cfg, logger) + require.Error(t, err) + + cfg.RetryFailedConfiguration = true + metricsCfg, err := InitTelemetry(cfg, logger) + require.NoError(t, err) + // TODO: we couldn't extract the metrics sinks from the + // global metrics due to its limitations + // fanoutSink := metrics.Default() + metricsCfg.cancelFn() +}
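
For illustration, here is a minimal caller-side sketch, not part of the patch, of the new `InitTelemetry` signature and the `MetricsConfig.Cancel` hook introduced above. It mirrors how `agent/setup.go` initializes telemetry and how `Agent.ShutdownAgent` cancels the background retry. The standalone `main`, the logger name, and the DogStatsD address are placeholder assumptions, not values taken from this change:

```go
package main

import (
	"os"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/lib"
)

func main() {
	logger := hclog.New(&hclog.LoggerOptions{
		Name:   "telemetry-example", // hypothetical logger name
		Output: os.Stdout,
	})

	// "dd-agent.invalid:8125" is a placeholder DogStatsD address standing in
	// for a host whose DNS name may not resolve when the agent starts.
	cfg := lib.TelemetryConfig{
		MetricsPrefix:            "consul",
		DogstatsdAddr:            "dd-agent.invalid:8125",
		FilterDefault:            true,
		RetryFailedConfiguration: true, // hcl: telemetry { retry_failed_connection = true }
	}

	metricsCfg, err := lib.InitTelemetry(cfg, logger)
	if err != nil {
		// Non-retriable errors, or any error when retries are disabled,
		// still fail startup here, matching the pre-patch behavior.
		logger.Error("failed to initialize telemetry", "error", err)
		os.Exit(1)
	}

	// metricsCfg.Handler is the in-memory sink; the agent serves
	// /v1/agent/metrics from it even while sink setup is still being retried.
	_ = metricsCfg.Handler

	// ... run the agent ...

	// On shutdown, cancel any background retry loop (see Agent.ShutdownAgent).
	metricsCfg.Cancel()
}
```

With `retry_failed_connection = true` (the new default in `agent/config/default.go`), a DNS lookup failure such as `net.DNSError{IsNotFound: true}` is treated as retriable: `InitTelemetry` still returns a usable in-memory sink, so the metrics endpoints in `agent_endpoint.go` keep working, while a background goroutine retries `configureSinks` with backoff; `Agent.ShutdownAgent` calls `Cancel` to cancel that retry context on shutdown.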