From 11245e53a749eacc6c832139ebdc802b70bda5c1 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Fri, 2 Jun 2017 15:30:03 +0200 Subject: [PATCH] agent: move telemetry out of the run method --- command/agent/command.go | 203 ++++++++++++++++++++++----------------- 1 file changed, 113 insertions(+), 90 deletions(-) diff --git a/command/agent/command.go b/command/agent/command.go index 2b389bc4f2..17b41df6cb 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -520,6 +520,116 @@ func (cmd *Command) startupJoinWan(agent *Agent, cfg *Config) error { return nil } +func statsiteSink(config *Config, hostname string) (metrics.MetricSink, error) { + if config.Telemetry.StatsiteAddr == "" { + return nil, nil + } + return metrics.NewStatsiteSink(config.Telemetry.StatsiteAddr) +} + +func statsdSink(config *Config, hostname string) (metrics.MetricSink, error) { + if config.Telemetry.StatsdAddr == "" { + return nil, nil + } + return metrics.NewStatsdSink(config.Telemetry.StatsdAddr) +} + +func dogstatdSink(config *Config, hostname string) (metrics.MetricSink, error) { + if config.Telemetry.DogStatsdAddr == "" { + return nil, nil + } + sink, err := datadog.NewDogStatsdSink(config.Telemetry.DogStatsdAddr, hostname) + if err != nil { + return nil, err + } + sink.SetTags(config.Telemetry.DogStatsdTags) + return sink, nil +} + +func circonusSink(config *Config, hostname string) (metrics.MetricSink, error) { + if config.Telemetry.CirconusAPIToken == "" && config.Telemetry.CirconusCheckSubmissionURL == "" { + return nil, nil + } + + cfg := &circonus.Config{} + cfg.Interval = config.Telemetry.CirconusSubmissionInterval + cfg.CheckManager.API.TokenKey = config.Telemetry.CirconusAPIToken + cfg.CheckManager.API.TokenApp = config.Telemetry.CirconusAPIApp + cfg.CheckManager.API.URL = config.Telemetry.CirconusAPIURL + cfg.CheckManager.Check.SubmissionURL = config.Telemetry.CirconusCheckSubmissionURL + cfg.CheckManager.Check.ID = config.Telemetry.CirconusCheckID + cfg.CheckManager.Check.ForceMetricActivation = config.Telemetry.CirconusCheckForceMetricActivation + cfg.CheckManager.Check.InstanceID = config.Telemetry.CirconusCheckInstanceID + cfg.CheckManager.Check.SearchTag = config.Telemetry.CirconusCheckSearchTag + cfg.CheckManager.Check.DisplayName = config.Telemetry.CirconusCheckDisplayName + cfg.CheckManager.Check.Tags = config.Telemetry.CirconusCheckTags + cfg.CheckManager.Broker.ID = config.Telemetry.CirconusBrokerID + cfg.CheckManager.Broker.SelectTag = config.Telemetry.CirconusBrokerSelectTag + + if cfg.CheckManager.Check.DisplayName == "" { + cfg.CheckManager.Check.DisplayName = "Consul" + } + + if cfg.CheckManager.API.TokenApp == "" { + cfg.CheckManager.API.TokenApp = "consul" + } + + if cfg.CheckManager.Check.SearchTag == "" { + cfg.CheckManager.Check.SearchTag = "service:consul" + } + + sink, err := circonus.NewCirconusSink(cfg) + if err != nil { + return nil, err + } + sink.Start() + return sink, nil +} + +func startupTelemetry(config *Config) error { + // Setup telemetry + // Aggregate on 10 second intervals for 1 minute. Expose the + // metrics over stderr when there is a SIGUSR1 received. + memSink := metrics.NewInmemSink(10*time.Second, time.Minute) + metrics.DefaultInmemSignal(memSink) + metricsConf := metrics.DefaultConfig(config.Telemetry.StatsitePrefix) + metricsConf.EnableHostname = !config.Telemetry.DisableHostname + + var sinks metrics.FanoutSink + addSink := func(name string, fn func(*Config, string) (metrics.MetricSink, error)) error { + s, err := fn(config, metricsConf.HostName) + if err != nil { + return err + } + if s != nil { + sinks = append(sinks, s) + } + return nil + } + + if err := addSink("statsite", statsiteSink); err != nil { + return err + } + if err := addSink("statsd", statsdSink); err != nil { + return err + } + if err := addSink("dogstatd", dogstatdSink); err != nil { + return err + } + if err := addSink("circonus", circonusSink); err != nil { + return err + } + + if len(sinks) > 0 { + sinks = append(sinks, memSink) + metrics.NewGlobal(metricsConf, sinks) + } else { + metricsConf.EnableHostname = false + metrics.NewGlobal(metricsConf, memSink) + } + return nil +} + func (cmd *Command) Run(args []string) int { cmd.UI = &cli.PrefixedUi{ OutputPrefix: "==> ", @@ -548,96 +658,9 @@ func (cmd *Command) Run(args []string) int { cmd.logFilter = logFilter cmd.logOutput = logOutput - // Setup telemetry - // Aggregate on 10 second intervals for 1 minute. Expose the - // metrics over stderr when there is a SIGUSR1 received. - inm := metrics.NewInmemSink(10*time.Second, time.Minute) - metrics.DefaultInmemSignal(inm) - metricsConf := metrics.DefaultConfig(config.Telemetry.StatsitePrefix) - metricsConf.EnableHostname = !config.Telemetry.DisableHostname - - // Configure the statsite sink - var fanout metrics.FanoutSink - if config.Telemetry.StatsiteAddr != "" { - sink, err := metrics.NewStatsiteSink(config.Telemetry.StatsiteAddr) - if err != nil { - cmd.UI.Error(fmt.Sprintf("Failed to start statsite sink. Got: %s", err)) - return 1 - } - fanout = append(fanout, sink) - } - - // Configure the statsd sink - if config.Telemetry.StatsdAddr != "" { - sink, err := metrics.NewStatsdSink(config.Telemetry.StatsdAddr) - if err != nil { - cmd.UI.Error(fmt.Sprintf("Failed to start statsd sink. Got: %s", err)) - return 1 - } - fanout = append(fanout, sink) - } - - // Configure the DogStatsd sink - if config.Telemetry.DogStatsdAddr != "" { - var tags []string - - if config.Telemetry.DogStatsdTags != nil { - tags = config.Telemetry.DogStatsdTags - } - - sink, err := datadog.NewDogStatsdSink(config.Telemetry.DogStatsdAddr, metricsConf.HostName) - if err != nil { - cmd.UI.Error(fmt.Sprintf("Failed to start DogStatsd sink. Got: %s", err)) - return 1 - } - sink.SetTags(tags) - fanout = append(fanout, sink) - } - - if config.Telemetry.CirconusAPIToken != "" || config.Telemetry.CirconusCheckSubmissionURL != "" { - cfg := &circonus.Config{} - cfg.Interval = config.Telemetry.CirconusSubmissionInterval - cfg.CheckManager.API.TokenKey = config.Telemetry.CirconusAPIToken - cfg.CheckManager.API.TokenApp = config.Telemetry.CirconusAPIApp - cfg.CheckManager.API.URL = config.Telemetry.CirconusAPIURL - cfg.CheckManager.Check.SubmissionURL = config.Telemetry.CirconusCheckSubmissionURL - cfg.CheckManager.Check.ID = config.Telemetry.CirconusCheckID - cfg.CheckManager.Check.ForceMetricActivation = config.Telemetry.CirconusCheckForceMetricActivation - cfg.CheckManager.Check.InstanceID = config.Telemetry.CirconusCheckInstanceID - cfg.CheckManager.Check.SearchTag = config.Telemetry.CirconusCheckSearchTag - cfg.CheckManager.Check.DisplayName = config.Telemetry.CirconusCheckDisplayName - cfg.CheckManager.Check.Tags = config.Telemetry.CirconusCheckTags - cfg.CheckManager.Broker.ID = config.Telemetry.CirconusBrokerID - cfg.CheckManager.Broker.SelectTag = config.Telemetry.CirconusBrokerSelectTag - - if cfg.CheckManager.Check.DisplayName == "" { - cfg.CheckManager.Check.DisplayName = "Consul" - } - - if cfg.CheckManager.API.TokenApp == "" { - cfg.CheckManager.API.TokenApp = "consul" - } - - if cfg.CheckManager.Check.SearchTag == "" { - cfg.CheckManager.Check.SearchTag = "service:consul" - } - - sink, err := circonus.NewCirconusSink(cfg) - if err != nil { - cmd.UI.Error(fmt.Sprintf("Failed to start Circonus sink. Got: %s", err)) - return 1 - } - sink.Start() - fanout = append(fanout, sink) - } - - // Initialize the global sink - if len(fanout) > 0 { - fanout = append(fanout, inm) - metrics.NewGlobal(metricsConf, fanout) - } else { - metricsConf.EnableHostname = false - metrics.NewGlobal(metricsConf, inm) + if err := startupTelemetry(config); err != nil { + cmd.UI.Error(err.Error()) + return 1 } // Create the agent