Merge pull request #2193 from hashicorp/pr-2188-slackpad

Adds Circonus support for telemetry metrics.
This commit is contained in:
James Phillips 2016-07-19 17:15:29 -07:00 committed by GitHub
commit cfbe8f430e
42 changed files with 3995 additions and 94 deletions

28
Godeps/Godeps.json generated
View File

@ -13,11 +13,15 @@
}, },
{ {
"ImportPath": "github.com/armon/go-metrics", "ImportPath": "github.com/armon/go-metrics",
"Rev": "345426c77237ece5dab0e1605c3e4b35c3f54757" "Rev": "3df31a1ada83e310c2e24b267c8e8b68836547b4"
},
{
"ImportPath": "github.com/armon/go-metrics/circonus",
"Rev": "3df31a1ada83e310c2e24b267c8e8b68836547b4"
}, },
{ {
"ImportPath": "github.com/armon/go-metrics/datadog", "ImportPath": "github.com/armon/go-metrics/datadog",
"Rev": "345426c77237ece5dab0e1605c3e4b35c3f54757" "Rev": "3df31a1ada83e310c2e24b267c8e8b68836547b4"
}, },
{ {
"ImportPath": "github.com/armon/go-radix", "ImportPath": "github.com/armon/go-radix",
@ -32,6 +36,22 @@
"Comment": "v1.2.0", "Comment": "v1.2.0",
"Rev": "c6ba97b89e0454fec9aa92e1d33a4e2c5fc1f631" "Rev": "c6ba97b89e0454fec9aa92e1d33a4e2c5fc1f631"
}, },
{
"ImportPath": "github.com/circonus-labs/circonus-gometrics",
"Rev": "8e7296e1945cf2ac4adc0a08df3eb52419b227c3"
},
{
"ImportPath": "github.com/circonus-labs/circonus-gometrics/api",
"Rev": "8e7296e1945cf2ac4adc0a08df3eb52419b227c3"
},
{
"ImportPath": "github.com/circonus-labs/circonus-gometrics/checkmgr",
"Rev": "8e7296e1945cf2ac4adc0a08df3eb52419b227c3"
},
{
"ImportPath": "github.com/circonus-labs/circonusllhist",
"Rev": "d724266ae5270ae8b87a5d2e8081f04e307c3c18"
},
{ {
"ImportPath": "github.com/elazarl/go-bindata-assetfs", "ImportPath": "github.com/elazarl/go-bindata-assetfs",
"Rev": "57eb5e1fc594ad4b0b1dbea7b286d299e0cb43c2" "Rev": "57eb5e1fc594ad4b0b1dbea7b286d299e0cb43c2"
@ -140,6 +160,10 @@
"ImportPath": "github.com/hashicorp/go-reap", "ImportPath": "github.com/hashicorp/go-reap",
"Rev": "2d85522212dcf5a84c6b357094f5c44710441912" "Rev": "2d85522212dcf5a84c6b357094f5c44710441912"
}, },
{
"ImportPath": "github.com/hashicorp/go-retryablehttp",
"Rev": "886ce0458bc81ccca0fb7044c1be0e9ab590bed7"
},
{ {
"ImportPath": "github.com/hashicorp/go-syslog", "ImportPath": "github.com/hashicorp/go-syslog",
"Rev": "42a2b573b664dbf281bd48c3cc12c086b17a39ba" "Rev": "42a2b573b664dbf281bd48c3cc12c086b17a39ba"

View File

@ -82,3 +82,21 @@ See also [golang/winstrap](https://github.com/golang/winstrap) and
for more information of how to set up a general Go build environment on Windows for more information of how to set up a general Go build environment on Windows
with MinGW. with MinGW.
## Vendoring
Consul currently uses [Godep](https://github.com/tools/godep) for vendoring. These
steps can be used to update dependencies in a controlled way.
Start by running a clean golang container:
```docker run -i -t -v `pwd`:/go/src/github.com/hashicorp/consul golang sh```
After that, you'll get a shell inside the container:
1. Run `go get github.com/tools/godep` to install Godep.
2. Run `cd /go/src/github.com/hashicorp/consul` to change to the Consul repo. Note
that we mounted that as a volume above into the `GOPATH`.
3. Run `godep restore` to update the container with the current state of dependencies
based on what's vendored.
4. Update dependencies as needed.
5. Run `godep save` and look at the results carefully before committing.

View File

@ -15,6 +15,7 @@ import (
"time" "time"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/armon/go-metrics/circonus"
"github.com/armon/go-metrics/datadog" "github.com/armon/go-metrics/datadog"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/watch" "github.com/hashicorp/consul/watch"
@ -704,6 +705,41 @@ func (c *Command) Run(args []string) int {
fanout = append(fanout, sink) fanout = append(fanout, sink)
} }
if config.Telemetry.CirconusAPIToken != "" || config.Telemetry.CirconusCheckSubmissionURL != "" {
cfg := &circonus.Config{}
cfg.Interval = config.Telemetry.CirconusSubmissionInterval
cfg.CheckManager.API.TokenKey = config.Telemetry.CirconusAPIToken
cfg.CheckManager.API.TokenApp = config.Telemetry.CirconusAPIApp
cfg.CheckManager.API.URL = config.Telemetry.CirconusAPIURL
cfg.CheckManager.Check.SubmissionURL = config.Telemetry.CirconusCheckSubmissionURL
cfg.CheckManager.Check.ID = config.Telemetry.CirconusCheckID
cfg.CheckManager.Check.ForceMetricActivation = config.Telemetry.CirconusCheckForceMetricActivation
cfg.CheckManager.Check.InstanceID = config.Telemetry.CirconusCheckInstanceID
cfg.CheckManager.Check.SearchTag = config.Telemetry.CirconusCheckSearchTag
cfg.CheckManager.Broker.ID = config.Telemetry.CirconusBrokerID
cfg.CheckManager.Broker.SelectTag = config.Telemetry.CirconusBrokerSelectTag
if cfg.CheckManager.API.TokenApp == "" {
cfg.CheckManager.API.TokenApp = "consul"
}
if cfg.CheckManager.Check.InstanceID == "" {
cfg.CheckManager.Check.InstanceID = fmt.Sprintf("%s:%s", config.NodeName, config.Datacenter)
}
if cfg.CheckManager.Check.SearchTag == "" {
cfg.CheckManager.Check.SearchTag = "service:consul"
}
sink, err := circonus.NewCirconusSink(cfg)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to start Circonus sink. Got: %s", err))
return 1
}
sink.Start()
fanout = append(fanout, sink)
}
// Initialize the global sink // Initialize the global sink
if len(fanout) > 0 { if len(fanout) > 0 {
fanout = append(fanout, inm) fanout = append(fanout, inm)

View File

@ -130,6 +130,70 @@ type Telemetry struct {
// DogStatsdTags are the global tags that should be sent with each packet to dogstatsd // DogStatsdTags are the global tags that should be sent with each packet to dogstatsd
// It is a list of strings, where each string looks like "my_tag_name:my_tag_value" // It is a list of strings, where each string looks like "my_tag_name:my_tag_value"
DogStatsdTags []string `mapstructure:"dogstatsd_tags"` DogStatsdTags []string `mapstructure:"dogstatsd_tags"`
// Circonus: see https://github.com/circonus-labs/circonus-gometrics
// for more details on the various configuration options.
// Valid configuration combinations:
// - CirconusAPIToken
// metric management enabled (search for existing check or create a new one)
// - CirconusSubmissionUrl
// metric management disabled (use check with specified submission_url,
// broker must be using a public SSL certificate)
// - CirconusAPIToken + CirconusCheckSubmissionURL
// metric management enabled (use check with specified submission_url)
// - CirconusAPIToken + CirconusCheckID
// metric management enabled (use check with specified id)
// CirconusAPIToken is a valid API Token used to create/manage check. If provided,
// metric management is enabled.
// Default: none
CirconusAPIToken string `mapstructure:"circonus_api_token"`
// CirconusAPIApp is an app name associated with API token.
// Default: "consul"
CirconusAPIApp string `mapstructure:"circonus_api_app"`
// CirconusAPIURL is the base URL to use for contacting the Circonus API.
// Default: "https://api.circonus.com/v2"
CirconusAPIURL string `mapstructure:"circonus_api_url"`
// CirconusSubmissionInterval is the interval at which metrics are submitted to Circonus.
// Default: 10s
CirconusSubmissionInterval string `mapstructure:"circonus_submission_interval"`
// CirconusCheckSubmissionURL is the check.config.submission_url field from a
// previously created HTTPTRAP check.
// Default: none
CirconusCheckSubmissionURL string `mapstructure:"circonus_submission_url"`
// CirconusCheckID is the check id (not check bundle id) from a previously created
// HTTPTRAP check. The numeric portion of the check._cid field.
// Default: none
CirconusCheckID string `mapstructure:"circonus_check_id"`
// CirconusCheckForceMetricActivation will force enabling metrics, as they are encountered,
// if the metric already exists and is NOT active. If check management is enabled, the default
// behavior is to add new metrics as they are encoutered. If the metric already exists in the
// check, it will *NOT* be activated. This setting overrides that behavior.
// Default: "false"
CirconusCheckForceMetricActivation string `mapstructure:"circonus_check_force_metric_activation"`
// CirconusCheckInstanceID serves to uniquely identify the metrics coming from this "instance".
// It can be used to maintain metric continuity with transient or ephemeral instances as
// they move around within an infrastructure.
// Default: hostname:app
CirconusCheckInstanceID string `mapstructure:"circonus_check_instance_id"`
// CirconusCheckSearchTag is a special tag which, when coupled with the instance id, helps to
// narrow down the search results when neither a Submission URL or Check ID is provided.
// Default: service:app (e.g. service:consul)
CirconusCheckSearchTag string `mapstructure:"circonus_check_search_tag"`
// CirconusBrokerID is an explicit broker to use when creating a new check. The numeric portion
// of broker._cid. If metric management is enabled and neither a Submission URL nor Check ID
// is provided, an attempt will be made to search for an existing check using Instance ID and
// Search Tag. If one is not found, a new HTTPTRAP check will be created.
// Default: use Select Tag if provided, otherwise, a random Enterprise Broker associated
// with the specified API token or the default Circonus Broker.
// Default: none
CirconusBrokerID string `mapstructure:"circonus_broker_id"`
// CirconusBrokerSelectTag is a special tag which will be used to select a broker when
// a Broker ID is not provided. The best use of this is to as a hint for which broker
// should be used based on *where* this particular instance is running.
// (e.g. a specific geo location or datacenter, dc:sfo)
// Default: none
CirconusBrokerSelectTag string `mapstructure:"circonus_broker_select_tag"`
} }
// Config is the configuration that can be set for an Agent. // Config is the configuration that can be set for an Agent.
@ -708,7 +772,9 @@ func DecodeConfig(r io.Reader) (*Config, error) {
// Check unused fields and verify that no bad configuration options were // Check unused fields and verify that no bad configuration options were
// passed to Consul. There are a few additional fields which don't directly // passed to Consul. There are a few additional fields which don't directly
// use mapstructure decoding, so we need to account for those as well. // use mapstructure decoding, so we need to account for those as well. These
// telemetry-related fields used to be available as top-level keys, so they
// are here for backward compatibility with the old format.
allowedKeys := []string{ allowedKeys := []string{
"service", "services", "check", "checks", "statsd_addr", "statsite_addr", "statsite_prefix", "service", "services", "check", "checks", "statsd_addr", "statsite_addr", "statsite_prefix",
"dogstatsd_addr", "dogstatsd_tags", "dogstatsd_addr", "dogstatsd_tags",
@ -1071,6 +1137,39 @@ func MergeConfig(a, b *Config) *Config {
if b.Telemetry.DogStatsdTags != nil { if b.Telemetry.DogStatsdTags != nil {
result.Telemetry.DogStatsdTags = b.Telemetry.DogStatsdTags result.Telemetry.DogStatsdTags = b.Telemetry.DogStatsdTags
} }
if b.Telemetry.CirconusAPIToken != "" {
result.Telemetry.CirconusAPIToken = b.Telemetry.CirconusAPIToken
}
if b.Telemetry.CirconusAPIApp != "" {
result.Telemetry.CirconusAPIApp = b.Telemetry.CirconusAPIApp
}
if b.Telemetry.CirconusAPIURL != "" {
result.Telemetry.CirconusAPIURL = b.Telemetry.CirconusAPIURL
}
if b.Telemetry.CirconusCheckSubmissionURL != "" {
result.Telemetry.CirconusCheckSubmissionURL = b.Telemetry.CirconusCheckSubmissionURL
}
if b.Telemetry.CirconusSubmissionInterval != "" {
result.Telemetry.CirconusSubmissionInterval = b.Telemetry.CirconusSubmissionInterval
}
if b.Telemetry.CirconusCheckID != "" {
result.Telemetry.CirconusCheckID = b.Telemetry.CirconusCheckID
}
if b.Telemetry.CirconusCheckForceMetricActivation != "" {
result.Telemetry.CirconusCheckForceMetricActivation = b.Telemetry.CirconusCheckForceMetricActivation
}
if b.Telemetry.CirconusCheckInstanceID != "" {
result.Telemetry.CirconusCheckInstanceID = b.Telemetry.CirconusCheckInstanceID
}
if b.Telemetry.CirconusCheckSearchTag != "" {
result.Telemetry.CirconusCheckSearchTag = b.Telemetry.CirconusCheckSearchTag
}
if b.Telemetry.CirconusBrokerID != "" {
result.Telemetry.CirconusBrokerID = b.Telemetry.CirconusBrokerID
}
if b.Telemetry.CirconusBrokerSelectTag != "" {
result.Telemetry.CirconusBrokerSelectTag = b.Telemetry.CirconusBrokerSelectTag
}
if b.EnableDebug { if b.EnableDebug {
result.EnableDebug = true result.EnableDebug = true
} }

View File

@ -725,6 +725,51 @@ func TestDecodeConfig(t *testing.T) {
t.Fatalf("bad: %#v", config) t.Fatalf("bad: %#v", config)
} }
// Circonus settings
input = `{"telemetry": {"circonus_api_token": "12345678-1234-1234-12345678", "circonus_api_app": "testApp",
"circonus_api_url": "https://api.host.foo/v2", "circonus_submission_interval": "15s",
"circonus_submission_url": "https://submit.host.bar:123/one/two/three",
"circonus_check_id": "12345", "circonus_check_force_metric_activation": "true",
"circonus_check_instance_id": "a:b", "circonus_check_search_tag": "c:d",
"circonus_broker_id": "6789", "circonus_broker_select_tag": "e:f"} }`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if config.Telemetry.CirconusAPIToken != "12345678-1234-1234-12345678" {
t.Fatalf("bad: %#v", config)
}
if config.Telemetry.CirconusAPIApp != "testApp" {
t.Fatalf("bad: %#v", config)
}
if config.Telemetry.CirconusAPIURL != "https://api.host.foo/v2" {
t.Fatalf("bad: %#v", config)
}
if config.Telemetry.CirconusSubmissionInterval != "15s" {
t.Fatalf("bad: %#v", config)
}
if config.Telemetry.CirconusCheckSubmissionURL != "https://submit.host.bar:123/one/two/three" {
t.Fatalf("bad: %#v", config)
}
if config.Telemetry.CirconusCheckID != "12345" {
t.Fatalf("bad: %#v", config)
}
if config.Telemetry.CirconusCheckForceMetricActivation != "true" {
t.Fatalf("bad: %#v", config)
}
if config.Telemetry.CirconusCheckInstanceID != "a:b" {
t.Fatalf("bad: %#v", config)
}
if config.Telemetry.CirconusCheckSearchTag != "c:d" {
t.Fatalf("bad: %#v", config)
}
if config.Telemetry.CirconusBrokerID != "6789" {
t.Fatalf("bad: %#v", config)
}
if config.Telemetry.CirconusBrokerSelectTag != "e:f" {
t.Fatalf("bad: %#v", config)
}
// New telemetry // New telemetry
input = `{"telemetry": { "statsite_prefix": "my_prefix", "statsite_address": "127.0.0.1:7250", "statsd_address":"127.0.0.1:7251", "disable_hostname": true, "dogstatsd_addr": "1.1.1.1:111", "dogstatsd_tags": [ "tag_1:val_1" ] } }` input = `{"telemetry": { "statsite_prefix": "my_prefix", "statsite_address": "127.0.0.1:7250", "statsd_address":"127.0.0.1:7251", "disable_hostname": true, "dogstatsd_addr": "1.1.1.1:111", "dogstatsd_tags": [ "tag_1:val_1" ] } }`
config, err = DecodeConfig(bytes.NewReader([]byte(input))) config, err = DecodeConfig(bytes.NewReader([]byte(input)))

2
vendor/github.com/armon/go-metrics/.gitignore generated vendored Normal file → Executable file
View File

@ -20,3 +20,5 @@ _cgo_export.*
_testmain.go _testmain.go
*.exe *.exe
/metrics.out

View File

@ -0,0 +1,92 @@
// Circonus Metrics Sink
package circonus
import (
"strings"
cgm "github.com/circonus-labs/circonus-gometrics"
)
// CirconusSink provides an interface to forward metrics to Circonus with
// automatic check creation and metric management
type CirconusSink struct {
metrics *cgm.CirconusMetrics
}
// Config options for CirconusSink
// See https://github.com/circonus-labs/circonus-gometrics for configuration options
type Config cgm.Config
// NewCirconusSink - create new metric sink for circonus
//
// one of the following must be supplied:
// - API Token - search for an existing check or create a new check
// - API Token + Check Id - the check identified by check id will be used
// - API Token + Check Submission URL - the check identified by the submission url will be used
// - Check Submission URL - the check identified by the submission url will be used
// metric management will be *disabled*
//
// Note: If submission url is supplied w/o an api token, the public circonus ca cert will be used
// to verify the broker for metrics submission.
func NewCirconusSink(cc *Config) (*CirconusSink, error) {
cfg := cgm.Config{}
if cc != nil {
cfg = cgm.Config(*cc)
}
metrics, err := cgm.NewCirconusMetrics(&cfg)
if err != nil {
return nil, err
}
return &CirconusSink{
metrics: metrics,
}, nil
}
// Start submitting metrics to Circonus (flush every SubmitInterval)
func (s *CirconusSink) Start() {
s.metrics.Start()
}
// Flush manually triggers metric submission to Circonus
func (s *CirconusSink) Flush() {
s.metrics.Flush()
}
// SetGauge sets value for a gauge metric
func (s *CirconusSink) SetGauge(key []string, val float32) {
flatKey := s.flattenKey(key)
s.metrics.SetGauge(flatKey, int64(val))
}
// EmitKey is not implemented in circonus
func (s *CirconusSink) EmitKey(key []string, val float32) {
// NOP
}
// IncrCounter increments a counter metric
func (s *CirconusSink) IncrCounter(key []string, val float32) {
flatKey := s.flattenKey(key)
s.metrics.IncrementByValue(flatKey, uint64(val))
}
// AddSample adds a sample to a histogram metric
func (s *CirconusSink) AddSample(key []string, val float32) {
flatKey := s.flattenKey(key)
s.metrics.RecordValue(flatKey, float64(val))
}
// Flattens key to Circonus metric name
func (s *CirconusSink) flattenKey(parts []string) string {
joined := strings.Join(parts, "`")
return strings.Map(func(r rune) rune {
switch r {
case ' ':
return '_'
default:
return r
}
}, joined)
}

View File

@ -25,6 +25,8 @@ type InmemSink struct {
// intervals is a slice of the retained intervals // intervals is a slice of the retained intervals
intervals []*IntervalMetrics intervals []*IntervalMetrics
intervalLock sync.RWMutex intervalLock sync.RWMutex
rateDenom float64
} }
// IntervalMetrics stores the aggregated metrics // IntervalMetrics stores the aggregated metrics
@ -66,6 +68,7 @@ func NewIntervalMetrics(intv time.Time) *IntervalMetrics {
// about a sample // about a sample
type AggregateSample struct { type AggregateSample struct {
Count int // The count of emitted pairs Count int // The count of emitted pairs
Rate float64 // The count of emitted pairs per time unit (usually 1 second)
Sum float64 // The sum of values Sum float64 // The sum of values
SumSq float64 // The sum of squared values SumSq float64 // The sum of squared values
Min float64 // Minimum value Min float64 // Minimum value
@ -92,7 +95,7 @@ func (a *AggregateSample) Mean() float64 {
} }
// Ingest is used to update a sample // Ingest is used to update a sample
func (a *AggregateSample) Ingest(v float64) { func (a *AggregateSample) Ingest(v float64, rateDenom float64) {
a.Count++ a.Count++
a.Sum += v a.Sum += v
a.SumSq += (v * v) a.SumSq += (v * v)
@ -102,6 +105,7 @@ func (a *AggregateSample) Ingest(v float64) {
if v > a.Max || a.Count == 1 { if v > a.Max || a.Count == 1 {
a.Max = v a.Max = v
} }
a.Rate = float64(a.Count)/rateDenom
a.LastUpdated = time.Now() a.LastUpdated = time.Now()
} }
@ -119,10 +123,12 @@ func (a *AggregateSample) String() string {
// NewInmemSink is used to construct a new in-memory sink. // NewInmemSink is used to construct a new in-memory sink.
// Uses an aggregation interval and maximum retention period. // Uses an aggregation interval and maximum retention period.
func NewInmemSink(interval, retain time.Duration) *InmemSink { func NewInmemSink(interval, retain time.Duration) *InmemSink {
rateTimeUnit := time.Second
i := &InmemSink{ i := &InmemSink{
interval: interval, interval: interval,
retain: retain, retain: retain,
maxIntervals: int(retain / interval), maxIntervals: int(retain / interval),
rateDenom: float64(interval.Nanoseconds()) / float64(rateTimeUnit.Nanoseconds()),
} }
i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals) i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals)
return i return i
@ -159,7 +165,7 @@ func (i *InmemSink) IncrCounter(key []string, val float32) {
agg = &AggregateSample{} agg = &AggregateSample{}
intv.Counters[k] = agg intv.Counters[k] = agg
} }
agg.Ingest(float64(val)) agg.Ingest(float64(val), i.rateDenom)
} }
func (i *InmemSink) AddSample(key []string, val float32) { func (i *InmemSink) AddSample(key []string, val float32) {
@ -174,7 +180,7 @@ func (i *InmemSink) AddSample(key []string, val float32) {
agg = &AggregateSample{} agg = &AggregateSample{}
intv.Samples[k] = agg intv.Samples[k] = agg
} }
agg.Ingest(float64(val)) agg.Ingest(float64(val), i.rateDenom)
} }
// Data is used to retrieve all the aggregated metrics // Data is used to retrieve all the aggregated metrics

0
vendor/github.com/armon/go-metrics/metrics.go generated vendored Normal file → Executable file
View File

View File

@ -1,88 +0,0 @@
// +build go1.3
package prometheus
import (
"strings"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
)
type PrometheusSink struct {
mu sync.Mutex
gauges map[string]prometheus.Gauge
summaries map[string]prometheus.Summary
counters map[string]prometheus.Counter
}
func NewPrometheusSink() (*PrometheusSink, error) {
return &PrometheusSink{
gauges: make(map[string]prometheus.Gauge),
summaries: make(map[string]prometheus.Summary),
counters: make(map[string]prometheus.Counter),
}, nil
}
func (p *PrometheusSink) flattenKey(parts []string) string {
joined := strings.Join(parts, "_")
joined = strings.Replace(joined, " ", "_", -1)
joined = strings.Replace(joined, ".", "_", -1)
joined = strings.Replace(joined, "-", "_", -1)
return joined
}
func (p *PrometheusSink) SetGauge(parts []string, val float32) {
p.mu.Lock()
defer p.mu.Unlock()
key := p.flattenKey(parts)
g, ok := p.gauges[key]
if !ok {
g = prometheus.NewGauge(prometheus.GaugeOpts{
Name: key,
Help: key,
})
prometheus.MustRegister(g)
p.gauges[key] = g
}
g.Set(float64(val))
}
func (p *PrometheusSink) AddSample(parts []string, val float32) {
p.mu.Lock()
defer p.mu.Unlock()
key := p.flattenKey(parts)
g, ok := p.summaries[key]
if !ok {
g = prometheus.NewSummary(prometheus.SummaryOpts{
Name: key,
Help: key,
MaxAge: 10 * time.Second,
})
prometheus.MustRegister(g)
p.summaries[key] = g
}
g.Observe(float64(val))
}
// EmitKey is not implemented. Prometheus doesnt offer a type for which an
// arbitrary number of values is retained, as Prometheus works with a pull
// model, rather than a push model.
func (p *PrometheusSink) EmitKey(key []string, val float32) {
}
func (p *PrometheusSink) IncrCounter(parts []string, val float32) {
p.mu.Lock()
defer p.mu.Unlock()
key := p.flattenKey(parts)
g, ok := p.counters[key]
if !ok {
g = prometheus.NewCounter(prometheus.CounterOpts{
Name: key,
Help: key,
})
prometheus.MustRegister(g)
p.counters[key] = g
}
g.Add(float64(val))
}

0
vendor/github.com/armon/go-metrics/sink.go generated vendored Normal file → Executable file
View File

0
vendor/github.com/armon/go-metrics/start.go generated vendored Normal file → Executable file
View File

0
vendor/github.com/armon/go-metrics/statsite.go generated vendored Normal file → Executable file
View File

View File

@ -0,0 +1,2 @@
.DS_Store
env.sh

View File

@ -0,0 +1,69 @@
# Setting up dev/test environment
Get go installed and environment configured
```sh
cd $GOPATH
mkdir -pv src/github.com/{hashicorp,armon,circonus-labs}
cd $GOPATH/src/github.com/hashicorp
git clone https://github.com/maier/consul.git
cd $GOPATH/src/github.com/armon
git clone https://github.com/maier/go-metrics.git
cd $GOPATH/src/github.com/circonus-labs
git clone https://github.com/maier/circonus-gometrics.git
cd $GOPATH/src/github.com/hashicorp/consul
make dev
```
In `$GOPATH/src/github.com/hashicorp/consul/bin` is the binary just created.
Create a consul configuration file somewhere (e.g. ~/testconfig.json) and add any applicable configuration settings. As an example:
```json
{
"datacenter": "mcfl",
"server": true,
"log_level": "debug",
"telemetry": {
"statsd_address": "127.0.0.1:8125",
"circonus_api_token": "...",
"circonus_api_host": "..."
}
}
```
StatsD was used as a check to see what metrics consul was sending and what metrics circonus was receiving. So, it can safely be elided.
Fill in appropriate cirocnus specific settings:
* circonus_api_token - required
* circonus_api_app - optional, default is circonus-gometrics
* circonus_api_host - optional, default is api.circonus.com (for dev stuff yon can use "http://..." to circumvent ssl)
* circonus_submission_url - optional
* circonus_submission_interval - optional, seconds, defaults to 10 seconds
* circonus_check_id - optional
* circonus_broker_id - optional (unless you want to use the public one, then add it)
The actual circonus-gometrics package has more configuraiton options, the above are exposed in the consul configuration.
CirconusMetrics.InstanceId is derived from consul's config.NodeName and config.Datacenter
CirconusMetrics.SearchTag is hardcoded as 'service:consul'
The defaults are taken for other options.
---
To run after creating the config:
`$GOPATH/src/github.com/hashicorp/consul/bin/consul agent -dev -config-file <config file>`
or, to add the ui (localhost:8500)
`$GOPATH/src/github.com/hashicorp/consul/bin/consul agent -dev -ui -config-file <config file>`

View File

@ -0,0 +1,175 @@
# Circonus metrics tracking for Go applications
This library supports named counters, gauges and histograms.
It also provides convenience wrappers for registering latency
instrumented functions with Go's builtin http server.
Initializing only requires setting an ApiToken.
## Example
**rough and simple**
```go
package main
import (
"log"
"math/rand"
"os"
"time"
cgm "github.com/circonus-labs/circonus-gometrics"
)
func main() {
log.Println("Configuring cgm")
cmc := &cgm.Config{}
// Interval at which metrics are submitted to Circonus, default: 10 seconds
cmc.Interval = "10s" // 10 seconds
// Enable debug messages, default: false
cmc.Debug = false
// Send debug messages to specific log.Logger instance
// default: if debug stderr, else, discard
//cmc.CheckManager.Log = ...
// Circonus API configuration options
//
// Token, no default (blank disables check manager)
cmc.CheckManager.API.TokenKey = os.Getenv("CIRCONUS_API_TOKEN")
// App name, default: circonus-gometrics
cmc.CheckManager.API.TokenApp = os.Getenv("CIRCONUS_API_APP")
// URL, default: https://api.circonus.com/v2
cmc.CheckManager.API.URL = os.Getenv("CIRCONUS_API_URL")
// Check configuration options
//
// precedence 1 - explicit submission_url
// precedence 2 - specific check id (note: not a check bundle id)
// precedence 3 - search using instanceId and searchTag
// otherwise: if an applicable check is NOT specified or found, an
// attempt will be made to automatically create one
//
// Pre-existing httptrap check submission_url
cmc.CheckManager.Check.SubmissionURL = os.Getenv("CIRCONUS_SUBMISION_URL")
// Pre-existing httptrap check id (check not check bundle)
cmc.CheckManager.Check.ID = ""
// if neither a submission url nor check id are provided, an attempt will be made to find an existing
// httptrap check by using the circonus api to search for a check matching the following criteria:
// an active check,
// of type httptrap,
// where the target/host is equal to InstanceId - see below
// and the check has a tag equal to SearchTag - see below
// Instance ID - an identifier for the 'group of metrics emitted by this process or service'
// this is used as the value for check.target (aka host)
// default: 'hostname':'program name'
// note: for a persistent instance that is ephemeral or transient where metric continuity is
// desired set this explicitly so that the current hostname will not be used.
cmc.CheckManager.Check.InstanceID = ""
// Search tag - a specific tag which, when coupled with the instanceId serves to identify the
// origin and/or grouping of the metrics
// default: service:application name (e.g. service:consul)
cmc.CheckManager.Check.SearchTag = ""
// Check secret, default: generated when a check needs to be created
cmc.CheckManager.Check.Secret = ""
// Check tags, array of strings, additional tags to add to a new check, default: none
//cmc.CheckManager.Check.Tags = []string{"category:tagname"}
// max amount of time to to hold on to a submission url
// when a given submission fails (due to retries) if the
// time the url was last updated is > than this, the trap
// url will be refreshed (e.g. if the broker is changed
// in the UI) default 5 minutes
cmc.CheckManager.Check.MaxURLAge = "5m"
// custom display name for check, default: "InstanceId /cgm"
cmc.CheckManager.Check.DisplayName = ""
// force metric activation - if a metric has been disabled via the UI
// the default behavior is to *not* re-activate the metric; this setting
// overrides the behavior and will re-activate the metric when it is
// encountered. "(true|false)", default "false"
cmc.CheckManager.Check.ForceMetricActivation = "false"
// Broker configuration options
//
// Broker ID of specific broker to use, default: random enterprise broker or
// Circonus default if no enterprise brokers are available.
// default: only used if set
cmc.CheckManager.Broker.ID = ""
// used to select a broker with the same tag (e.g. can be used to dictate that a broker
// serving a specific location should be used. "dc:sfo", "location:new_york", "zone:us-west")
// if more than one broker has the tag, one will be selected randomly from the resulting list
// default: not used unless != ""
cmc.CheckManager.Broker.SelectTag = ""
// longest time to wait for a broker connection (if latency is > the broker will
// be considered invalid and not available for selection.), default: 500 milliseconds
cmc.CheckManager.Broker.MaxResponseTime = "500ms"
// if broker Id or SelectTag are not specified, a broker will be selected randomly
// from the list of brokers available to the api token. enterprise brokers take precedence
// viable brokers are "active", have the "httptrap" module enabled, are reachable and respond
// within MaxResponseTime.
log.Println("Creating new cgm instance")
metrics, err := cgm.NewCirconusMetrics(cmc)
if err != nil {
panic(err)
}
src := rand.NewSource(time.Now().UnixNano())
rnd := rand.New(src)
log.Println("Starting cgm internal auto-flush timer")
metrics.Start()
log.Println("Starting to send metrics")
// number of "sets" of metrics to send (a minute worth)
max := 60
for i := 1; i < max; i++ {
log.Printf("\tmetric set %d of %d", i, 60)
metrics.Timing("ding", rnd.Float64()*10)
metrics.Increment("dong")
metrics.Gauge("dang", 10)
time.Sleep(1000 * time.Millisecond)
}
log.Println("Flushing any outstanding metrics manually")
metrics.Flush()
}
```
# untested
### HTTP Handler wrapping
```
http.HandleFunc("/", metrics.TrackHTTPLatency("/", handler_func))
```
### HTTP latency example
```
package main
import (
"fmt"
"net/http"
metrics "github.com/circonus-labs/circonus-gometrics"
)
func main() {
metrics.WithAuthToken("9fdd5432-5308-4691-acd1-6bf1f7a20f73")
metrics.WithCheckId(115010)
metrics.Start()
http.HandleFunc("/", metrics.TrackHTTPLatency("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:])
}))
http.ListenAndServe(":8080", http.DefaultServeMux)
}
```

View File

@ -0,0 +1,201 @@
// Package api provides methods for interacting with the Circonus API
package api
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"strings"
"time"
"github.com/hashicorp/go-retryablehttp"
)
const (
// a few sensible defaults
defaultAPIURL = "https://api.circonus.com/v2"
defaultAPIApp = "circonus-gometrics"
minRetryWait = 10 * time.Millisecond
maxRetryWait = 50 * time.Millisecond
maxRetries = 3
)
// TokenKeyType - Circonus API Token key
type TokenKeyType string
// TokenAppType - Circonus API Token app name
type TokenAppType string
// IDType Circonus object id (numeric portion of cid)
type IDType int
// CIDType Circonus object cid
type CIDType string
// URLType submission url type
type URLType string
// SearchQueryType search query
type SearchQueryType string
// SearchTagType search/select tag type
type SearchTagType string
// Config options for Circonus API
type Config struct {
URL string
TokenKey string
TokenApp string
Log *log.Logger
Debug bool
}
// API Circonus API
type API struct {
apiURL *url.URL
key TokenKeyType
app TokenAppType
Debug bool
Log *log.Logger
}
// NewAPI returns a new Circonus API
func NewAPI(ac *Config) (*API, error) {
if ac == nil {
return nil, errors.New("Invalid API configuration (nil)")
}
key := TokenKeyType(ac.TokenKey)
if key == "" {
return nil, errors.New("API Token is required")
}
app := TokenAppType(ac.TokenApp)
if app == "" {
app = defaultAPIApp
}
au := string(ac.URL)
if au == "" {
au = defaultAPIURL
}
if !strings.Contains(au, "/") {
// if just a hostname is passed, ASSume "https" and a path prefix of "/v2"
au = fmt.Sprintf("https://%s/v2", ac.URL)
}
if last := len(au) - 1; last >= 0 && au[last] == '/' {
au = au[:last]
}
apiURL, err := url.Parse(au)
if err != nil {
return nil, err
}
a := &API{apiURL, key, app, ac.Debug, ac.Log}
if a.Log == nil {
if a.Debug {
a.Log = log.New(os.Stderr, "", log.LstdFlags)
} else {
a.Log = log.New(ioutil.Discard, "", log.LstdFlags)
}
}
return a, nil
}
// Get API request
func (a *API) Get(reqPath string) ([]byte, error) {
return a.apiCall("GET", reqPath, nil)
}
// Delete API request
func (a *API) Delete(reqPath string) ([]byte, error) {
return a.apiCall("DELETE", reqPath, nil)
}
// Post API request
func (a *API) Post(reqPath string, data []byte) ([]byte, error) {
return a.apiCall("POST", reqPath, data)
}
// Put API request
func (a *API) Put(reqPath string, data []byte) ([]byte, error) {
return a.apiCall("PUT", reqPath, data)
}
// apiCall call Circonus API
func (a *API) apiCall(reqMethod string, reqPath string, data []byte) ([]byte, error) {
dataReader := bytes.NewReader(data)
reqURL := a.apiURL.String()
if reqPath[:1] != "/" {
reqURL += "/"
}
if reqPath[:3] == "/v2" {
reqURL += reqPath[3:len(reqPath)]
} else {
reqURL += reqPath
}
req, err := retryablehttp.NewRequest(reqMethod, reqURL, dataReader)
if err != nil {
return nil, fmt.Errorf("[ERROR] creating API request: %s %+v", reqURL, err)
}
req.Header.Add("Accept", "application/json")
req.Header.Add("X-Circonus-Auth-Token", string(a.key))
req.Header.Add("X-Circonus-App-Name", string(a.app))
client := retryablehttp.NewClient()
client.RetryWaitMin = minRetryWait
client.RetryWaitMax = maxRetryWait
client.RetryMax = maxRetries
client.Logger = a.Log
resp, err := client.Do(req)
if err != nil {
stdClient := &http.Client{}
dataReader.Seek(0, 0)
stdRequest, _ := http.NewRequest(reqMethod, reqURL, dataReader)
stdRequest.Header.Add("Accept", "application/json")
stdRequest.Header.Add("X-Circonus-Auth-Token", string(a.key))
stdRequest.Header.Add("X-Circonus-App-Name", string(a.app))
res, errSC := stdClient.Do(stdRequest)
if errSC != nil {
return nil, fmt.Errorf("[ERROR] fetching %s: %s", reqURL, errSC)
}
if res != nil && res.Body != nil {
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
if a.Debug {
a.Log.Printf("[DEBUG] %v\n", string(body))
}
return nil, fmt.Errorf("[ERROR] %s", string(body))
}
return nil, fmt.Errorf("[ERROR] fetching %s: %s", reqURL, err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("[ERROR] reading body %+v", err)
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
msg := fmt.Sprintf("API response code %d: %s", resp.StatusCode, string(body))
if a.Debug {
a.Log.Printf("[DEBUG] %s\n", msg)
}
return nil, fmt.Errorf("[ERROR] %s", msg)
}
return body, nil
}

View File

@ -0,0 +1,85 @@
package api
import (
"encoding/json"
"fmt"
)
// BrokerDetail instance attributes
type BrokerDetail struct {
CN string `json:"cn"`
IP string `json:"ipaddress"`
MinVer int `json:"minimum_version_required"`
Modules []string `json:"modules"`
Port int `json:"port"`
Skew string `json:"skew"`
Status string `json:"status"`
Version int `json:"version"`
}
// Broker definition
type Broker struct {
Cid string `json:"_cid"`
Details []BrokerDetail `json:"_details"`
Latitude string `json:"_latitude"`
Longitude string `json:"_longitude"`
Name string `json:"_name"`
Tags []string `json:"_tags"`
Type string `json:"_type"`
}
// FetchBrokerByID fetch a broker configuration by [group]id
func (a *API) FetchBrokerByID(id IDType) (*Broker, error) {
cid := CIDType(fmt.Sprintf("/broker/%d", id))
return a.FetchBrokerByCID(cid)
}
// FetchBrokerByCID fetch a broker configuration by cid
func (a *API) FetchBrokerByCID(cid CIDType) (*Broker, error) {
result, err := a.Get(string(cid))
if err != nil {
return nil, err
}
response := new(Broker)
if err := json.Unmarshal(result, &response); err != nil {
return nil, err
}
return response, nil
}
// FetchBrokerListByTag return list of brokers with a specific tag
func (a *API) FetchBrokerListByTag(searchTag SearchTagType) ([]Broker, error) {
query := SearchQueryType(fmt.Sprintf("f__tags_has=%s", searchTag))
return a.BrokerSearch(query)
}
// BrokerSearch return a list of brokers matching a query/filter
func (a *API) BrokerSearch(query SearchQueryType) ([]Broker, error) {
queryURL := fmt.Sprintf("/broker?%s", string(query))
result, err := a.Get(queryURL)
if err != nil {
return nil, err
}
var brokers []Broker
json.Unmarshal(result, &brokers)
return brokers, nil
}
// FetchBrokerList return list of all brokers available to the api token/app
func (a *API) FetchBrokerList() ([]Broker, error) {
result, err := a.Get("/broker")
if err != nil {
return nil, err
}
var response []Broker
json.Unmarshal(result, &response)
return response, nil
}

View File

@ -0,0 +1,109 @@
package api
import (
"encoding/json"
"fmt"
"net/url"
"strings"
)
// CheckDetails is an arbitrary json structure, we would only care about submission_url
type CheckDetails struct {
SubmissionURL string `json:"submission_url"`
}
// Check definition
type Check struct {
Cid string `json:"_cid"`
Active bool `json:"_active"`
BrokerCid string `json:"_broker"`
CheckBundleCid string `json:"_check_bundle"`
CheckUUID string `json:"_check_uuid"`
Details CheckDetails `json:"_details"`
}
// FetchCheckByID fetch a check configuration by id
func (a *API) FetchCheckByID(id IDType) (*Check, error) {
cid := CIDType(fmt.Sprintf("/check/%d", int(id)))
return a.FetchCheckByCID(cid)
}
// FetchCheckByCID fetch a check configuration by cid
func (a *API) FetchCheckByCID(cid CIDType) (*Check, error) {
result, err := a.Get(string(cid))
if err != nil {
return nil, err
}
check := new(Check)
json.Unmarshal(result, check)
return check, nil
}
// FetchCheckBySubmissionURL fetch a check configuration by submission_url
func (a *API) FetchCheckBySubmissionURL(submissionURL URLType) (*Check, error) {
u, err := url.Parse(string(submissionURL))
if err != nil {
return nil, err
}
// valid trap url: scheme://host[:port]/module/httptrap/UUID/secret
// does it smell like a valid trap url path
if u.Path[:17] != "/module/httptrap/" {
return nil, fmt.Errorf("[ERROR] Invalid submission URL '%s', unrecognized path", submissionURL)
}
// extract uuid/secret
pathParts := strings.Split(u.Path[17:len(u.Path)], "/")
if len(pathParts) != 2 {
return nil, fmt.Errorf("[ERROR] Invalid submission URL '%s', UUID not where expected", submissionURL)
}
uuid := pathParts[0]
query := SearchQueryType(fmt.Sprintf("f__check_uuid=%s", uuid))
checks, err := a.CheckSearch(query)
if err != nil {
return nil, err
}
if len(checks) == 0 {
return nil, fmt.Errorf("[ERROR] No checks found with UUID %s", uuid)
}
numActive := 0
checkID := -1
for idx, check := range checks {
if check.Active {
numActive++
checkID = idx
}
}
if numActive > 1 {
return nil, fmt.Errorf("[ERROR] Multiple checks with same UUID %s", uuid)
}
return &checks[checkID], nil
}
// CheckSearch returns a list of checks matching a query/filter
func (a *API) CheckSearch(query SearchQueryType) ([]Check, error) {
queryURL := fmt.Sprintf("/check?%s", string(query))
result, err := a.Get(queryURL)
if err != nil {
return nil, err
}
var checks []Check
json.Unmarshal(result, &checks)
return checks, nil
}

View File

@ -0,0 +1,128 @@
package api
import (
"encoding/json"
"fmt"
)
// CheckBundleConfig configuration specific to check type
type CheckBundleConfig struct {
AsyncMetrics bool `json:"async_metrics"`
Secret string `json:"secret"`
SubmissionURL string `json:"submission_url"`
}
// CheckBundleMetric individual metric configuration
type CheckBundleMetric struct {
Name string `json:"name"`
Type string `json:"type"`
Units string `json:"units"`
Status string `json:"status"`
}
// CheckBundle definition
type CheckBundle struct {
CheckUUIDs []string `json:"_check_uuids,omitempty"`
Checks []string `json:"_checks,omitempty"`
Cid string `json:"_cid,omitempty"`
Created int `json:"_created,omitempty"`
LastModified int `json:"_last_modified,omitempty"`
LastModifedBy string `json:"_last_modifed_by,omitempty"`
ReverseConnectUrls []string `json:"_reverse_connection_urls,omitempty"`
Brokers []string `json:"brokers"`
Config CheckBundleConfig `json:"config"`
DisplayName string `json:"display_name"`
Metrics []CheckBundleMetric `json:"metrics"`
MetricLimit int `json:"metric_limit"`
Notes string `json:"notes"`
Period int `json:"period"`
Status string `json:"status"`
Tags []string `json:"tags"`
Target string `json:"target"`
Timeout int `json:"timeout"`
Type string `json:"type"`
}
// FetchCheckBundleByID fetch a check bundle configuration by id
func (a *API) FetchCheckBundleByID(id IDType) (*CheckBundle, error) {
cid := CIDType(fmt.Sprintf("/check_bundle/%d", id))
return a.FetchCheckBundleByCID(cid)
}
// FetchCheckBundleByCID fetch a check bundle configuration by id
func (a *API) FetchCheckBundleByCID(cid CIDType) (*CheckBundle, error) {
result, err := a.Get(string(cid))
if err != nil {
return nil, err
}
checkBundle := &CheckBundle{}
json.Unmarshal(result, checkBundle)
return checkBundle, nil
}
// CheckBundleSearch returns list of check bundles matching a search query
// - a search query not a filter (see: https://login.circonus.com/resources/api#searching)
func (a *API) CheckBundleSearch(searchCriteria SearchQueryType) ([]CheckBundle, error) {
apiPath := fmt.Sprintf("/check_bundle?search=%s", searchCriteria)
response, err := a.Get(apiPath)
if err != nil {
return nil, fmt.Errorf("[ERROR] API call error %+v", err)
}
var results []CheckBundle
err = json.Unmarshal(response, &results)
if err != nil {
return nil, fmt.Errorf("[ERROR] Parsing JSON response %+v", err)
}
return results, nil
}
// CreateCheckBundle create a new check bundle (check)
func (a *API) CreateCheckBundle(config CheckBundle) (*CheckBundle, error) {
cfgJSON, err := json.Marshal(config)
if err != nil {
return nil, err
}
response, err := a.Post("/check_bundle", cfgJSON)
if err != nil {
return nil, err
}
checkBundle := &CheckBundle{}
err = json.Unmarshal(response, checkBundle)
if err != nil {
return nil, err
}
return checkBundle, nil
}
// UpdateCheckBundle updates a check bundle configuration
func (a *API) UpdateCheckBundle(config *CheckBundle) (*CheckBundle, error) {
if a.Debug {
a.Log.Printf("[DEBUG] Updating check bundle with new metrics.")
}
cfgJSON, err := json.Marshal(config)
if err != nil {
return nil, err
}
response, err := a.Put(config.Cid, cfgJSON)
if err != nil {
return nil, err
}
checkBundle := &CheckBundle{}
err = json.Unmarshal(response, checkBundle)
if err != nil {
return nil, err
}
return checkBundle, nil
}

View File

@ -0,0 +1,195 @@
package checkmgr
import (
"fmt"
"math/rand"
"net"
"net/url"
"reflect"
"strings"
"time"
"github.com/circonus-labs/circonus-gometrics/api"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
// Get Broker to use when creating a check
func (cm *CheckManager) getBroker() (*api.Broker, error) {
if cm.brokerID != 0 {
broker, err := cm.apih.FetchBrokerByID(cm.brokerID)
if err != nil {
return nil, err
}
if !cm.isValidBroker(broker) {
return nil, fmt.Errorf(
"[ERROR] designated broker %d [%s] is invalid (not active, does not support required check type, or connectivity issue)",
cm.brokerID,
broker.Name)
}
return broker, nil
}
broker, err := cm.selectBroker()
if err != nil {
return nil, fmt.Errorf("[ERROR] Unable to fetch suitable broker %s", err)
}
return broker, nil
}
// Get CN of Broker associated with submission_url to satisfy no IP SANS in certs
func (cm *CheckManager) getBrokerCN(broker *api.Broker, submissionURL api.URLType) (string, error) {
u, err := url.Parse(string(submissionURL))
if err != nil {
return "", err
}
hostParts := strings.Split(u.Host, ":")
host := hostParts[0]
if net.ParseIP(host) == nil { // it's a non-ip string
return u.Host, nil
}
cn := ""
for _, detail := range broker.Details {
if detail.IP == host {
cn = detail.CN
break
}
}
if cn == "" {
return "", fmt.Errorf("[ERROR] Unable to match URL host (%s) to Broker", u.Host)
}
return cn, nil
}
// Select a broker for use when creating a check, if a specific broker
// was not specified.
func (cm *CheckManager) selectBroker() (*api.Broker, error) {
var brokerList []api.Broker
var err error
if cm.brokerSelectTag != "" {
brokerList, err = cm.apih.FetchBrokerListByTag(cm.brokerSelectTag)
if err != nil {
return nil, err
}
} else {
brokerList, err = cm.apih.FetchBrokerList()
if err != nil {
return nil, err
}
}
if len(brokerList) == 0 {
return nil, fmt.Errorf("zero brokers found")
}
validBrokers := make(map[string]api.Broker)
haveEnterprise := false
for _, broker := range brokerList {
if cm.isValidBroker(&broker) {
validBrokers[broker.Cid] = broker
if broker.Type == "enterprise" {
haveEnterprise = true
}
}
}
if haveEnterprise { // eliminate non-enterprise brokers from valid brokers
for k, v := range validBrokers {
if v.Type != "enterprise" {
delete(validBrokers, k)
}
}
}
if len(validBrokers) == 0 {
return nil, fmt.Errorf("found %d broker(s), zero are valid", len(brokerList))
}
validBrokerKeys := reflect.ValueOf(validBrokers).MapKeys()
selectedBroker := validBrokers[validBrokerKeys[rand.Intn(len(validBrokerKeys))].String()]
if cm.Debug {
cm.Log.Printf("[DEBUG] Selected broker '%s'\n", selectedBroker.Name)
}
return &selectedBroker, nil
}
// Verify broker supports the check type to be used
func (cm *CheckManager) brokerSupportsCheckType(checkType CheckTypeType, details *api.BrokerDetail) bool {
for _, module := range details.Modules {
if CheckTypeType(module) == checkType {
return true
}
}
return false
}
// Is the broker valid (active, supports check type, and reachable)
func (cm *CheckManager) isValidBroker(broker *api.Broker) bool {
brokerPort := 0
valid := false
for _, detail := range broker.Details {
brokerPort = 43191
// broker must be active
if detail.Status != statusActive {
if cm.Debug {
cm.Log.Printf("[DEBUG] Broker '%s' is not active.\n", broker.Name)
}
continue
}
// broker must have module loaded for the check type to be used
if !cm.brokerSupportsCheckType(cm.checkType, &detail) {
if cm.Debug {
cm.Log.Printf("[DEBUG] Broker '%s' does not support '%s' checks.\n", broker.Name, cm.checkType)
}
continue
}
// broker must be reachable and respond within designated time
conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", detail.IP, brokerPort), cm.brokerMaxResponseTime)
if err != nil {
if detail.CN != "trap.noit.circonus.net" {
if cm.Debug {
cm.Log.Printf("[DEBUG] Broker '%s' unable to connect, %v\n", broker.Name, err)
}
continue // not able to reach the broker (or respone slow enough for it to be considered not usable)
}
// if circonus trap broker, try port 443
brokerPort = 443
conn, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", detail.CN, brokerPort), cm.brokerMaxResponseTime)
if err != nil {
if cm.Debug {
cm.Log.Printf("[DEBUG] Broker '%s' unable to connect %v\n", broker.Name, err)
}
continue // not able to reach the broker on 443 either (or respone slow enough for it to be considered not usable)
}
}
conn.Close()
if cm.Debug {
cm.Log.Printf("[DEBUG] Broker '%s' is valid\n", broker.Name)
}
valid = true
break
}
return valid
}

View File

@ -0,0 +1,83 @@
package checkmgr
import (
"crypto/x509"
"encoding/json"
"fmt"
)
// Default Circonus CA certificate
var circonusCA = []byte(`-----BEGIN CERTIFICATE-----
MIID4zCCA0ygAwIBAgIJAMelf8skwVWPMA0GCSqGSIb3DQEBBQUAMIGoMQswCQYD
VQQGEwJVUzERMA8GA1UECBMITWFyeWxhbmQxETAPBgNVBAcTCENvbHVtYmlhMRcw
FQYDVQQKEw5DaXJjb251cywgSW5jLjERMA8GA1UECxMIQ2lyY29udXMxJzAlBgNV
BAMTHkNpcmNvbnVzIENlcnRpZmljYXRlIEF1dGhvcml0eTEeMBwGCSqGSIb3DQEJ
ARYPY2FAY2lyY29udXMubmV0MB4XDTA5MTIyMzE5MTcwNloXDTE5MTIyMTE5MTcw
NlowgagxCzAJBgNVBAYTAlVTMREwDwYDVQQIEwhNYXJ5bGFuZDERMA8GA1UEBxMI
Q29sdW1iaWExFzAVBgNVBAoTDkNpcmNvbnVzLCBJbmMuMREwDwYDVQQLEwhDaXJj
b251czEnMCUGA1UEAxMeQ2lyY29udXMgQ2VydGlmaWNhdGUgQXV0aG9yaXR5MR4w
HAYJKoZIhvcNAQkBFg9jYUBjaXJjb251cy5uZXQwgZ8wDQYJKoZIhvcNAQEBBQAD
gY0AMIGJAoGBAKz2X0/0vJJ4ad1roehFyxUXHdkjJA9msEKwT2ojummdUB3kK5z6
PDzDL9/c65eFYWqrQWVWZSLQK1D+v9xJThCe93v6QkSJa7GZkCq9dxClXVtBmZH3
hNIZZKVC6JMA9dpRjBmlFgNuIdN7q5aJsv8VZHH+QrAyr9aQmhDJAmk1AgMBAAGj
ggERMIIBDTAdBgNVHQ4EFgQUyNTsgZHSkhhDJ5i+6IFlPzKYxsUwgd0GA1UdIwSB
1TCB0oAUyNTsgZHSkhhDJ5i+6IFlPzKYxsWhga6kgaswgagxCzAJBgNVBAYTAlVT
MREwDwYDVQQIEwhNYXJ5bGFuZDERMA8GA1UEBxMIQ29sdW1iaWExFzAVBgNVBAoT
DkNpcmNvbnVzLCBJbmMuMREwDwYDVQQLEwhDaXJjb251czEnMCUGA1UEAxMeQ2ly
Y29udXMgQ2VydGlmaWNhdGUgQXV0aG9yaXR5MR4wHAYJKoZIhvcNAQkBFg9jYUBj
aXJjb251cy5uZXSCCQDHpX/LJMFVjzAMBgNVHRMEBTADAQH/MA0GCSqGSIb3DQEB
BQUAA4GBAAHBtl15BwbSyq0dMEBpEdQYhHianU/rvOMe57digBmox7ZkPEbB/baE
sYJysziA2raOtRxVRtcxuZSMij2RiJDsLxzIp1H60Xhr8lmf7qF6Y+sZl7V36KZb
n2ezaOoRtsQl9dhqEMe8zgL76p9YZ5E69Al0mgiifTteyNjjMuIW
-----END CERTIFICATE-----`)
// CACert contains cert returned from Circonus API
type CACert struct {
Contents string `json:"contents"`
}
// loadCACert loads the CA cert for the broker designated by the submission url
func (cm *CheckManager) loadCACert() {
if cm.certPool != nil {
return
}
cm.certPool = x509.NewCertPool()
cert, err := cm.fetchCert()
if err != nil {
if cm.Debug {
cm.Log.Printf("[DEBUG] Unable to fetch ca.crt, using default. %+v\n", err)
}
}
if cert == nil {
cert = circonusCA
}
cm.certPool.AppendCertsFromPEM(cert)
}
// fetchCert fetches CA certificate using Circonus API
func (cm *CheckManager) fetchCert() ([]byte, error) {
if !cm.enabled {
return circonusCA, nil
}
response, err := cm.apih.Get("/pki/ca.crt")
if err != nil {
return nil, err
}
cadata := new(CACert)
err = json.Unmarshal(response, cadata)
if err != nil {
return nil, err
}
if cadata.Contents == "" {
return nil, fmt.Errorf("[ERROR] Unable to find ca cert %+v", cadata)
}
return []byte(cadata.Contents), nil
}

View File

@ -0,0 +1,207 @@
package checkmgr
import (
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/circonus-labs/circonus-gometrics/api"
)
// Initialize CirconusMetrics instance. Attempt to find a check otherwise create one.
// use cases:
//
// check [bundle] by submission url
// check [bundle] by *check* id (note, not check_bundle id)
// check [bundle] by search
// create check [bundle]
func (cm *CheckManager) initializeTrapURL() error {
if cm.trapURL != "" {
return nil
}
cm.trapmu.Lock()
defer cm.trapmu.Unlock()
if cm.checkSubmissionURL != "" {
if !cm.enabled {
cm.trapURL = cm.checkSubmissionURL
cm.trapLastUpdate = time.Now()
return nil
}
}
if !cm.enabled {
return errors.New("Unable to initialize trap, check manager is disabled.")
}
var err error
var check *api.Check
var checkBundle *api.CheckBundle
var broker *api.Broker
if cm.checkSubmissionURL != "" {
check, err = cm.apih.FetchCheckBySubmissionURL(cm.checkSubmissionURL)
if err != nil {
return err
}
// extract check id from check object returned from looking up using submission url
// set m.CheckId to the id
// set m.SubmissionUrl to "" to prevent trying to search on it going forward
// use case: if the broker is changed in the UI metrics would stop flowing
// unless the new submission url can be fetched with the API (which is no
// longer possible using the original submission url)
var id int
id, err = strconv.Atoi(strings.Replace(check.Cid, "/check/", "", -1))
if err == nil {
cm.checkID = api.IDType(id)
cm.checkSubmissionURL = ""
} else {
cm.Log.Printf(
"[WARN] SubmissionUrl check to Check ID: unable to convert %s to int %q\n",
check.Cid, err)
}
} else if cm.checkID > 0 {
check, err = cm.apih.FetchCheckByID(cm.checkID)
if err != nil {
return err
}
} else {
searchCriteria := fmt.Sprintf(
"(active:1)(host:\"%s\")(type:\"%s\")(tags:%s)",
cm.checkInstanceID, cm.checkType, cm.checkSearchTag)
checkBundle, err = cm.checkBundleSearch(searchCriteria)
if err != nil {
return err
}
if checkBundle == nil {
// err==nil && checkBundle==nil is "no check bundles matched"
// an error *should* be returned for any other invalid scenario
checkBundle, broker, err = cm.createNewCheck()
if err != nil {
return err
}
}
}
if checkBundle == nil {
if check != nil {
checkBundle, err = cm.apih.FetchCheckBundleByCID(api.CIDType(check.CheckBundleCid))
if err != nil {
return err
}
} else {
return fmt.Errorf("[ERROR] Unable to retrieve, find, or create check")
}
}
if broker == nil {
broker, err = cm.apih.FetchBrokerByCID(api.CIDType(checkBundle.Brokers[0]))
if err != nil {
return err
}
}
// retain to facilitate metric management (adding new metrics specifically)
cm.checkBundle = checkBundle
cm.inventoryMetrics()
// url to which metrics should be PUT
cm.trapURL = api.URLType(checkBundle.Config.SubmissionURL)
// used when sending as "ServerName" get around certs not having IP SANS
// (cert created with server name as CN but IP used in trap url)
cn, err := cm.getBrokerCN(broker, cm.trapURL)
if err != nil {
return err
}
cm.trapCN = BrokerCNType(cn)
cm.trapLastUpdate = time.Now()
return nil
}
// Search for a check bundle given a predetermined set of criteria
func (cm *CheckManager) checkBundleSearch(criteria string) (*api.CheckBundle, error) {
checkBundles, err := cm.apih.CheckBundleSearch(api.SearchQueryType(criteria))
if err != nil {
return nil, err
}
if len(checkBundles) == 0 {
return nil, nil // trigger creation of a new check
}
numActive := 0
checkID := -1
for idx, check := range checkBundles {
if check.Status == statusActive {
numActive++
checkID = idx
}
}
if numActive > 1 {
return nil, fmt.Errorf("[ERROR] Multiple possibilities multiple check bundles match criteria %s\n", criteria)
}
return &checkBundles[checkID], nil
}
// Create a new check to receive metrics
func (cm *CheckManager) createNewCheck() (*api.CheckBundle, *api.Broker, error) {
checkSecret := string(cm.checkSecret)
if checkSecret == "" {
secret, err := cm.makeSecret()
if err != nil {
secret = "myS3cr3t"
}
checkSecret = secret
}
broker, err := cm.getBroker()
if err != nil {
return nil, nil, err
}
config := api.CheckBundle{
Brokers: []string{broker.Cid},
Config: api.CheckBundleConfig{AsyncMetrics: true, Secret: checkSecret},
DisplayName: string(cm.checkDisplayName),
Metrics: []api.CheckBundleMetric{},
MetricLimit: 0,
Notes: "",
Period: 60,
Status: statusActive,
Tags: append([]string{string(cm.checkSearchTag)}, cm.checkTags...),
Target: string(cm.checkInstanceID),
Timeout: 10,
Type: string(cm.checkType),
}
checkBundle, err := cm.apih.CreateCheckBundle(config)
if err != nil {
return nil, nil, err
}
return checkBundle, broker, nil
}
// Create a dynamic secret to use with a new check
func (cm *CheckManager) makeSecret() (string, error) {
hash := sha256.New()
x := make([]byte, 2048)
if _, err := rand.Read(x); err != nil {
return "", err
}
hash.Write(x)
return hex.EncodeToString(hash.Sum(nil))[0:16], nil
}

View File

@ -0,0 +1,361 @@
// Package checkmgr provides a check management interace to circonus-gometrics
package checkmgr
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"log"
"net/url"
"os"
"path"
"strconv"
"sync"
"time"
"github.com/circonus-labs/circonus-gometrics/api"
)
// Check management offers:
//
// Create a check if one cannot be found matching specific criteria
// Manage metrics in the supplied check (enabling new metrics as they are submitted)
//
// To disable check management, leave Config.Api.Token.Key blank
//
// use cases:
// configure without api token - check management disabled
// - configuration parameters other than Check.SubmissionUrl, Debug and Log are ignored
// - note: SubmissionUrl is **required** in this case as there is no way to derive w/o api
// configure with api token - check management enabled
// - all otehr configuration parameters affect how the trap url is obtained
// 1. provided (Check.SubmissionUrl)
// 2. via check lookup (CheckConfig.Id)
// 3. via a search using CheckConfig.InstanceId + CheckConfig.SearchTag
// 4. a new check is created
const (
defaultCheckType = "httptrap"
defaultTrapMaxURLAge = "60s" // 60 seconds
defaultBrokerMaxResponseTime = "500ms" // 500 milliseconds
defaultForceMetricActivation = "false"
statusActive = "active"
)
// CheckConfig options for check
type CheckConfig struct {
// a specific submission url
SubmissionURL string
// a specific check id (not check bundle id)
ID string
// unique instance id string
// used to search for a check to use
// used as check.target when creating a check
InstanceID string
// unique check searching tag
// used to search for a check to use (combined with instanceid)
// used as a regular tag when creating a check
SearchTag string
// a custom display name for the check (as viewed in UI Checks)
DisplayName string
// httptrap check secret (for creating a check)
Secret string
// additional tags to add to a check (when creating a check)
// these tags will not be added to an existing check
Tags []string
// max amount of time to to hold on to a submission url
// when a given submission fails (due to retries) if the
// time the url was last updated is > than this, the trap
// url will be refreshed (e.g. if the broker is changed
// in the UI) **only relevant when check management is enabled**
// e.g. 5m, 30m, 1h, etc.
MaxURLAge string
// force metric activation - if a metric has been disabled via the UI
// the default behavior is to *not* re-activate the metric; this setting
// overrides the behavior and will re-activate the metric when it is
// encountered. "(true|false)", default "false"
ForceMetricActivation string
}
// BrokerConfig options for broker
type BrokerConfig struct {
// a specific broker id (numeric portion of cid)
ID string
// a tag that can be used to select 1-n brokers from which to select
// when creating a new check (e.g. datacenter:abc)
SelectTag string
// for a broker to be considered viable it must respond to a
// connection attempt within this amount of time e.g. 200ms, 2s, 1m
MaxResponseTime string
}
// Config options
type Config struct {
Log *log.Logger
Debug bool
// Circonus API config
API api.Config
// Check specific configuration options
Check CheckConfig
// Broker specific configuration options
Broker BrokerConfig
}
// CheckTypeType check type
type CheckTypeType string
// CheckInstanceIDType check instance id
type CheckInstanceIDType string
// CheckSecretType check secret
type CheckSecretType string
// CheckTagsType check tags
type CheckTagsType []string
// CheckDisplayNameType check display name
type CheckDisplayNameType string
// BrokerCNType broker common name
type BrokerCNType string
// CheckManager settings
type CheckManager struct {
enabled bool
Log *log.Logger
Debug bool
apih *api.API
// check
checkType CheckTypeType
checkID api.IDType
checkInstanceID CheckInstanceIDType
checkSearchTag api.SearchTagType
checkSecret CheckSecretType
checkTags CheckTagsType
checkSubmissionURL api.URLType
checkDisplayName CheckDisplayNameType
forceMetricActivation bool
// broker
brokerID api.IDType
brokerSelectTag api.SearchTagType
brokerMaxResponseTime time.Duration
// state
checkBundle *api.CheckBundle
availableMetrics map[string]bool
trapURL api.URLType
trapCN BrokerCNType
trapLastUpdate time.Time
trapMaxURLAge time.Duration
trapmu sync.Mutex
certPool *x509.CertPool
}
// Trap config
type Trap struct {
URL *url.URL
TLS *tls.Config
}
// NewCheckManager returns a new check manager
func NewCheckManager(cfg *Config) (*CheckManager, error) {
if cfg == nil {
return nil, errors.New("Invalid Check Manager configuration (nil).")
}
cm := &CheckManager{
enabled: false,
}
cm.Debug = cfg.Debug
cm.Log = cfg.Log
if cm.Log == nil {
if cm.Debug {
cm.Log = log.New(os.Stderr, "", log.LstdFlags)
} else {
cm.Log = log.New(ioutil.Discard, "", log.LstdFlags)
}
}
if cfg.Check.SubmissionURL != "" {
cm.checkSubmissionURL = api.URLType(cfg.Check.SubmissionURL)
}
// Blank API Token *disables* check management
if cfg.API.TokenKey == "" {
if cm.checkSubmissionURL == "" {
return nil, errors.New("Invalid check manager configuration (no API token AND no submission url).")
}
if err := cm.initializeTrapURL(); err != nil {
return nil, err
}
return cm, nil
}
// enable check manager
cm.enabled = true
// initialize api handle
cfg.API.Debug = cm.Debug
cfg.API.Log = cm.Log
apih, err := api.NewAPI(&cfg.API)
if err != nil {
return nil, err
}
cm.apih = apih
// initialize check related data
cm.checkType = defaultCheckType
idSetting := "0"
if cfg.Check.ID != "" {
idSetting = cfg.Check.ID
}
id, err := strconv.Atoi(idSetting)
if err != nil {
return nil, err
}
cm.checkID = api.IDType(id)
cm.checkInstanceID = CheckInstanceIDType(cfg.Check.InstanceID)
cm.checkDisplayName = CheckDisplayNameType(cfg.Check.DisplayName)
cm.checkSearchTag = api.SearchTagType(cfg.Check.SearchTag)
cm.checkSecret = CheckSecretType(cfg.Check.Secret)
cm.checkTags = cfg.Check.Tags
fma := defaultForceMetricActivation
if cfg.Check.ForceMetricActivation != "" {
fma = cfg.Check.ForceMetricActivation
}
fm, err := strconv.ParseBool(fma)
if err != nil {
return nil, err
}
cm.forceMetricActivation = fm
_, an := path.Split(os.Args[0])
hn, err := os.Hostname()
if err != nil {
hn = "unknown"
}
if cm.checkInstanceID == "" {
cm.checkInstanceID = CheckInstanceIDType(fmt.Sprintf("%s:%s", hn, an))
}
if cm.checkSearchTag == "" {
cm.checkSearchTag = api.SearchTagType(fmt.Sprintf("service:%s", an))
}
if cm.checkDisplayName == "" {
cm.checkDisplayName = CheckDisplayNameType(fmt.Sprintf("%s /cgm", string(cm.checkInstanceID)))
}
dur := cfg.Check.MaxURLAge
if dur == "" {
dur = defaultTrapMaxURLAge
}
maxDur, err := time.ParseDuration(dur)
if err != nil {
return nil, err
}
cm.trapMaxURLAge = maxDur
// setup broker
idSetting = "0"
if cfg.Broker.ID != "" {
idSetting = cfg.Broker.ID
}
id, err = strconv.Atoi(idSetting)
if err != nil {
return nil, err
}
cm.brokerID = api.IDType(id)
cm.brokerSelectTag = api.SearchTagType(cfg.Broker.SelectTag)
dur = cfg.Broker.MaxResponseTime
if dur == "" {
dur = defaultBrokerMaxResponseTime
}
maxDur, err = time.ParseDuration(dur)
if err != nil {
return nil, err
}
cm.brokerMaxResponseTime = maxDur
// metrics
cm.availableMetrics = make(map[string]bool)
if err := cm.initializeTrapURL(); err != nil {
return nil, err
}
return cm, nil
}
// GetTrap return the trap url
func (cm *CheckManager) GetTrap() (*Trap, error) {
if cm.trapURL == "" {
if err := cm.initializeTrapURL(); err != nil {
return nil, err
}
}
trap := &Trap{}
u, err := url.Parse(string(cm.trapURL))
if err != nil {
return nil, err
}
trap.URL = u
if u.Scheme == "https" {
if cm.certPool == nil {
cm.loadCACert()
}
t := &tls.Config{
RootCAs: cm.certPool,
}
if cm.trapCN != "" {
t.ServerName = string(cm.trapCN)
}
trap.TLS = t
}
return trap, nil
}
// ResetTrap URL, force request to the API for the submission URL and broker ca cert
func (cm *CheckManager) ResetTrap() error {
if cm.trapURL == "" {
return nil
}
cm.trapURL = ""
cm.certPool = nil
err := cm.initializeTrapURL()
return err
}
// RefreshTrap check when the last time the URL was reset, reset if needed
func (cm *CheckManager) RefreshTrap() {
if cm.trapURL == "" {
return
}
if time.Since(cm.trapLastUpdate) >= cm.trapMaxURLAge {
cm.ResetTrap()
}
}

View File

@ -0,0 +1,75 @@
package checkmgr
import (
"github.com/circonus-labs/circonus-gometrics/api"
)
// IsMetricActive checks whether a given metric name is currently active(enabled)
func (cm *CheckManager) IsMetricActive(name string) bool {
active, _ := cm.availableMetrics[name]
return active
}
// ActivateMetric determines if a given metric should be activated
func (cm *CheckManager) ActivateMetric(name string) bool {
active, exists := cm.availableMetrics[name]
if !exists {
return true
}
if !active && cm.forceMetricActivation {
return true
}
return false
}
// AddNewMetrics updates a check bundle with new metrics
func (cm *CheckManager) AddNewMetrics(newMetrics map[string]*api.CheckBundleMetric) {
// only if check manager is enabled
if !cm.enabled {
return
}
// only if checkBundle has been populated
if cm.checkBundle == nil {
return
}
newCheckBundle := cm.checkBundle
numCurrMetrics := len(newCheckBundle.Metrics)
numNewMetrics := len(newMetrics)
if numCurrMetrics+numNewMetrics >= cap(newCheckBundle.Metrics) {
nm := make([]api.CheckBundleMetric, numCurrMetrics+numNewMetrics)
copy(nm, newCheckBundle.Metrics)
newCheckBundle.Metrics = nm
}
newCheckBundle.Metrics = newCheckBundle.Metrics[0 : numCurrMetrics+numNewMetrics]
i := 0
for _, metric := range newMetrics {
newCheckBundle.Metrics[numCurrMetrics+i] = *metric
i++
}
checkBundle, err := cm.apih.UpdateCheckBundle(newCheckBundle)
if err != nil {
cm.Log.Printf("[ERROR] updating check bundle with new metrics %v", err)
return
}
cm.checkBundle = checkBundle
cm.inventoryMetrics()
}
// inventoryMetrics creates list of active metrics in check bundle
func (cm *CheckManager) inventoryMetrics() {
availableMetrics := make(map[string]bool)
for _, metric := range cm.checkBundle.Metrics {
availableMetrics[metric.Name] = metric.Status == "active"
}
cm.availableMetrics = availableMetrics
}

View File

@ -0,0 +1,250 @@
// Package circonusgometrics provides instrumentation for your applications in the form
// of counters, gauges and histograms and allows you to publish them to
// Circonus
//
// Counters
//
// A counter is a monotonically-increasing, unsigned, 64-bit integer used to
// represent the number of times an event has occurred. By tracking the deltas
// between measurements of a counter over intervals of time, an aggregation
// layer can derive rates, acceleration, etc.
//
// Gauges
//
// A gauge returns instantaneous measurements of something using signed, 64-bit
// integers. This value does not need to be monotonic.
//
// Histograms
//
// A histogram tracks the distribution of a stream of values (e.g. the number of
// seconds it takes to handle requests). Circonus can calculate complex
// analytics on these.
//
// Reporting
//
// A period push to a Circonus httptrap is confgurable.
package circonusgometrics
import (
"errors"
"io/ioutil"
"log"
"os"
"sync"
"time"
"github.com/circonus-labs/circonus-gometrics/api"
"github.com/circonus-labs/circonus-gometrics/checkmgr"
)
const (
defaultFlushInterval = "10s" // 10 * time.Second
)
// Config options for circonus-gometrics
type Config struct {
Log *log.Logger
Debug bool
// API, Check and Broker configuration options
CheckManager checkmgr.Config
// how frequenly to submit metrics to Circonus, default 10 seconds
Interval string
}
// CirconusMetrics state
type CirconusMetrics struct {
Log *log.Logger
Debug bool
flushInterval time.Duration
flushing bool
flushmu sync.Mutex
check *checkmgr.CheckManager
counters map[string]uint64
cm sync.Mutex
counterFuncs map[string]func() uint64
cfm sync.Mutex
gauges map[string]int64
gm sync.Mutex
gaugeFuncs map[string]func() int64
gfm sync.Mutex
histograms map[string]*Histogram
hm sync.Mutex
text map[string]string
tm sync.Mutex
textFuncs map[string]func() string
tfm sync.Mutex
}
// NewCirconusMetrics returns a CirconusMetrics instance
func NewCirconusMetrics(cfg *Config) (*CirconusMetrics, error) {
if cfg == nil {
return nil, errors.New("Invalid configuration (nil).")
}
cm := &CirconusMetrics{
counters: make(map[string]uint64),
counterFuncs: make(map[string]func() uint64),
gauges: make(map[string]int64),
gaugeFuncs: make(map[string]func() int64),
histograms: make(map[string]*Histogram),
text: make(map[string]string),
textFuncs: make(map[string]func() string),
}
cm.Debug = cfg.Debug
if cm.Debug {
if cfg.Log == nil {
cm.Log = log.New(os.Stderr, "", log.LstdFlags)
} else {
cm.Log = cfg.Log
}
}
if cm.Log == nil {
cm.Log = log.New(ioutil.Discard, "", log.LstdFlags)
}
fi := defaultFlushInterval
if cfg.Interval != "" {
fi = cfg.Interval
}
dur, err := time.ParseDuration(fi)
if err != nil {
return nil, err
}
cm.flushInterval = dur
cfg.CheckManager.Debug = cm.Debug
cfg.CheckManager.Log = cm.Log
check, err := checkmgr.NewCheckManager(&cfg.CheckManager)
if err != nil {
return nil, err
}
cm.check = check
if _, err := cm.check.GetTrap(); err != nil {
return nil, err
}
return cm, nil
}
// Start initializes the CirconusMetrics instance based on
// configuration settings and sets the httptrap check url to
// which metrics should be sent. It then starts a perdiodic
// submission process of all metrics collected.
func (m *CirconusMetrics) Start() {
go func() {
for _ = range time.NewTicker(m.flushInterval).C {
m.Flush()
}
}()
}
// Flush metrics kicks off the process of sending metrics to Circonus
func (m *CirconusMetrics) Flush() {
if m.flushing {
return
}
m.flushmu.Lock()
m.flushing = true
m.flushmu.Unlock()
if m.Debug {
m.Log.Println("[DEBUG] Flushing metrics")
}
// check for new metrics and enable them automatically
newMetrics := make(map[string]*api.CheckBundleMetric)
counters, gauges, histograms, text := m.snapshot()
output := make(map[string]interface{})
for name, value := range counters {
send := m.check.IsMetricActive(name)
if !send && m.check.ActivateMetric(name) {
send = true
newMetrics[name] = &api.CheckBundleMetric{
Name: name,
Type: "numeric",
Status: "active",
}
}
if send {
output[name] = map[string]interface{}{
"_type": "n",
"_value": value,
}
}
}
for name, value := range gauges {
send := m.check.IsMetricActive(name)
if !send && m.check.ActivateMetric(name) {
send = true
newMetrics[name] = &api.CheckBundleMetric{
Name: name,
Type: "numeric",
Status: "active",
}
}
if send {
output[name] = map[string]interface{}{
"_type": "n",
"_value": value,
}
}
}
for name, value := range histograms {
send := m.check.IsMetricActive(name)
if !send && m.check.ActivateMetric(name) {
send = true
newMetrics[name] = &api.CheckBundleMetric{
Name: name,
Type: "histogram",
Status: "active",
}
}
if send {
output[name] = map[string]interface{}{
"_type": "n",
"_value": value.DecStrings(),
}
}
}
for name, value := range text {
send := m.check.IsMetricActive(name)
if !send && m.check.ActivateMetric(name) {
send = true
newMetrics[name] = &api.CheckBundleMetric{
Name: name,
Type: "text",
Status: "active",
}
}
if send {
output[name] = map[string]interface{}{
"_type": "s",
"_value": value,
}
}
}
m.submit(output, newMetrics)
m.flushmu.Lock()
m.flushing = false
m.flushmu.Unlock()
}

View File

@ -0,0 +1,44 @@
package circonusgometrics
// A Counter is a monotonically increasing unsigned integer.
//
// Use a counter to derive rates (e.g., record total number of requests, derive
// requests per second).
// Increment counter by 1
func (m *CirconusMetrics) Increment(metric string) {
m.Add(metric, 1)
}
// IncrementByValue updates counter by supplied value
func (m *CirconusMetrics) IncrementByValue(metric string, val uint64) {
m.Add(metric, val)
}
// Add updates counter by supplied value
func (m *CirconusMetrics) Add(metric string, val uint64) {
m.cm.Lock()
defer m.cm.Unlock()
m.counters[metric] += val
}
// RemoveCounter removes the named counter
func (m *CirconusMetrics) RemoveCounter(metric string) {
m.cm.Lock()
defer m.cm.Unlock()
delete(m.counters, metric)
}
// SetCounterFunc set counter to a function [called at flush interval]
func (m *CirconusMetrics) SetCounterFunc(metric string, fn func() uint64) {
m.cfm.Lock()
defer m.cfm.Unlock()
m.counterFuncs[metric] = fn
}
// RemoveCounterFunc removes the named counter function
func (m *CirconusMetrics) RemoveCounterFunc(metric string) {
m.cfm.Lock()
defer m.cfm.Unlock()
delete(m.counterFuncs, metric)
}

View File

@ -0,0 +1,39 @@
package circonusgometrics
// A Gauge is an instantaneous measurement of a value.
//
// Use a gauge to track metrics which increase and decrease (e.g., amount of
// free memory).
// Gauge sets a gauge to a value
func (m *CirconusMetrics) Gauge(metric string, val int64) {
m.SetGauge(metric, val)
}
// SetGauge sets a gauge to a value
func (m *CirconusMetrics) SetGauge(metric string, val int64) {
m.gm.Lock()
defer m.gm.Unlock()
m.gauges[metric] = val
}
// RemoveGauge removes a gauge
func (m *CirconusMetrics) RemoveGauge(metric string) {
m.gm.Lock()
defer m.gm.Unlock()
delete(m.gauges, metric)
}
// SetGaugeFunc sets a gauge to a function [called at flush interval]
func (m *CirconusMetrics) SetGaugeFunc(metric string, fn func() int64) {
m.gfm.Lock()
defer m.gfm.Unlock()
m.gaugeFuncs[metric] = fn
}
// RemoveGaugeFunc removes a gauge function
func (m *CirconusMetrics) RemoveGaugeFunc(metric string) {
m.gfm.Lock()
defer m.gfm.Unlock()
delete(m.gaugeFuncs, metric)
}

View File

@ -0,0 +1,73 @@
package circonusgometrics
import (
"sync"
"github.com/circonus-labs/circonusllhist"
)
// Histogram measures the distribution of a stream of values.
type Histogram struct {
name string
hist *circonusllhist.Histogram
rw sync.RWMutex
}
// Timing adds a value to a histogram
func (m *CirconusMetrics) Timing(metric string, val float64) {
m.SetHistogramValue(metric, val)
}
// RecordValue adds a value to a histogram
func (m *CirconusMetrics) RecordValue(metric string, val float64) {
m.SetHistogramValue(metric, val)
}
// SetHistogramValue adds a value to a histogram
func (m *CirconusMetrics) SetHistogramValue(metric string, val float64) {
m.NewHistogram(metric)
m.histograms[metric].rw.Lock()
defer m.histograms[metric].rw.Unlock()
m.histograms[metric].hist.RecordValue(val)
}
// RemoveHistogram removes a histogram
func (m *CirconusMetrics) RemoveHistogram(metric string) {
m.hm.Lock()
defer m.hm.Unlock()
delete(m.histograms, metric)
}
// NewHistogram returns a histogram instance.
func (m *CirconusMetrics) NewHistogram(metric string) *Histogram {
m.hm.Lock()
defer m.hm.Unlock()
if hist, ok := m.histograms[metric]; ok {
return hist
}
hist := &Histogram{
name: metric,
hist: circonusllhist.New(),
}
m.histograms[metric] = hist
return hist
}
// Name returns the name from a histogram instance
func (h *Histogram) Name() string {
return h.name
}
// RecordValue records the given value to a histogram instance
func (h *Histogram) RecordValue(v float64) {
h.rw.Lock()
defer h.rw.Unlock()
h.hist.RecordValue(v)
}

View File

@ -0,0 +1,122 @@
package circonusgometrics
import (
"bytes"
"encoding/json"
"errors"
"io/ioutil"
"log"
"net"
"net/http"
"strconv"
"time"
"github.com/circonus-labs/circonus-gometrics/api"
"github.com/hashicorp/go-retryablehttp"
)
func (m *CirconusMetrics) submit(output map[string]interface{}, newMetrics map[string]*api.CheckBundleMetric) {
if len(newMetrics) > 0 {
m.check.AddNewMetrics(newMetrics)
}
str, err := json.Marshal(output)
if err != nil {
m.Log.Printf("[ERROR] marshling output %+v", err)
return
}
numStats, err := m.trapCall(str)
if err != nil {
m.Log.Printf("[ERROR] %+v\n", err)
return
}
if m.Debug {
m.Log.Printf("[DEBUG] %d stats sent\n", numStats)
}
}
func (m *CirconusMetrics) trapCall(payload []byte) (int, error) {
trap, err := m.check.GetTrap()
if err != nil {
return 0, err
}
dataReader := bytes.NewReader(payload)
req, err := retryablehttp.NewRequest("PUT", trap.URL.String(), dataReader)
if err != nil {
return 0, err
}
req.Header.Add("Accept", "application/json")
client := retryablehttp.NewClient()
if trap.URL.Scheme == "https" {
client.HTTPClient.Transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: trap.TLS,
DisableKeepAlives: true,
MaxIdleConnsPerHost: -1,
DisableCompression: true,
}
} else {
client.HTTPClient.Transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
DisableKeepAlives: true,
MaxIdleConnsPerHost: -1,
DisableCompression: true,
}
}
client.RetryWaitMin = 10 * time.Millisecond
client.RetryWaitMax = 50 * time.Millisecond
client.RetryMax = 3
client.Logger = m.Log
attempts := -1
client.RequestLogHook = func(logger *log.Logger, req *http.Request, retryNumber int) {
attempts = retryNumber
}
resp, err := client.Do(req)
if err != nil {
if attempts == client.RetryMax {
m.check.RefreshTrap()
}
return 0, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
m.Log.Printf("[ERROR] reading body, proceeding. %s\n", err)
}
var response map[string]interface{}
err = json.Unmarshal(body, &response)
if err != nil {
m.Log.Printf("[ERROR] parsing body, proceeding. %s\n", err)
}
if resp.StatusCode != 200 {
return 0, errors.New("[ERROR] bad response code: " + strconv.Itoa(resp.StatusCode))
}
switch v := response["stats"].(type) {
case float64:
return int(v), nil
case int:
return v, nil
default:
}
return 0, errors.New("[ERROR] bad response type")
}

View File

@ -0,0 +1,37 @@
package circonusgometrics
// A Text metric is an arbitrary string
//
// SetText sets a text metric
func (m *CirconusMetrics) SetText(metric string, val string) {
m.SetTextValue(metric, val)
}
// SetTextValue sets a text metric
func (m *CirconusMetrics) SetTextValue(metric string, val string) {
m.tm.Lock()
defer m.tm.Unlock()
m.text[metric] = val
}
// RemoveText removes a text metric
func (m *CirconusMetrics) RemoveText(metric string) {
m.tm.Lock()
defer m.tm.Unlock()
delete(m.text, metric)
}
// SetTextFunc sets a text metric to a function [called at flush interval]
func (m *CirconusMetrics) SetTextFunc(metric string, fn func() string) {
m.tfm.Lock()
defer m.tfm.Unlock()
m.textFuncs[metric] = fn
}
// RemoveTextFunc a text metric function
func (m *CirconusMetrics) RemoveTextFunc(metric string) {
m.tfm.Lock()
defer m.tfm.Unlock()
delete(m.textFuncs, metric)
}

View File

@ -0,0 +1,19 @@
package circonusgometrics
import (
"net/http"
"time"
)
// TrackHTTPLatency wraps Handler functions registered with an http.ServerMux tracking latencies.
// Metrics are of the for go`HTTP`<method>`<name>`latency and are tracked in a histogram in units
// of seconds (as a float64) providing nanosecond ganularity.
func (m *CirconusMetrics) TrackHTTPLatency(name string, handler func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) {
return func(rw http.ResponseWriter, req *http.Request) {
start := time.Now().UnixNano()
handler(rw, req)
elapsed := time.Now().UnixNano() - start
//hist := m.NewHistogram("go`HTTP`" + req.Method + "`" + name + "`latency")
m.RecordValue("go`HTTP`"+req.Method+"`"+name+"`latency", float64(elapsed)/float64(time.Second))
}
}

View File

@ -0,0 +1,95 @@
package circonusgometrics
import (
"github.com/circonus-labs/circonusllhist"
)
// Reset removes all existing counters and gauges.
func (m *CirconusMetrics) Reset() {
m.cm.Lock()
defer m.cm.Unlock()
m.cfm.Lock()
defer m.cfm.Unlock()
m.gm.Lock()
defer m.gm.Unlock()
m.gfm.Lock()
defer m.gfm.Unlock()
m.hm.Lock()
defer m.hm.Unlock()
m.tm.Lock()
defer m.tm.Unlock()
m.tfm.Lock()
defer m.tfm.Unlock()
m.counters = make(map[string]uint64)
m.counterFuncs = make(map[string]func() uint64)
m.gauges = make(map[string]int64)
m.gaugeFuncs = make(map[string]func() int64)
m.histograms = make(map[string]*Histogram)
m.text = make(map[string]string)
m.textFuncs = make(map[string]func() string)
}
// snapshot returns a copy of the values of all registered counters and gauges.
func (m *CirconusMetrics) snapshot() (c map[string]uint64, g map[string]int64, h map[string]*circonusllhist.Histogram, t map[string]string) {
m.cm.Lock()
defer m.cm.Unlock()
m.cfm.Lock()
defer m.cfm.Unlock()
m.gm.Lock()
defer m.gm.Unlock()
m.gfm.Lock()
defer m.gfm.Unlock()
m.hm.Lock()
defer m.hm.Unlock()
m.tm.Lock()
defer m.tm.Unlock()
m.tfm.Lock()
defer m.tfm.Unlock()
c = make(map[string]uint64, len(m.counters)+len(m.counterFuncs))
for n, v := range m.counters {
c[n] = v
}
for n, f := range m.counterFuncs {
c[n] = f()
}
g = make(map[string]int64, len(m.gauges)+len(m.gaugeFuncs))
for n, v := range m.gauges {
g[n] = v
}
for n, f := range m.gaugeFuncs {
g[n] = f()
}
h = make(map[string]*circonusllhist.Histogram, len(m.histograms))
for n, hist := range m.histograms {
h[n] = hist.hist.CopyAndReset()
}
t = make(map[string]string, len(m.text)+len(m.textFuncs))
for n, v := range m.text {
t[n] = v
}
for n, f := range m.textFuncs {
t[n] = f()
}
return
}

28
vendor/github.com/circonus-labs/circonusllhist/LICENSE generated vendored Normal file
View File

@ -0,0 +1,28 @@
Copyright (c) 2016 Circonus, Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following
disclaimer in the documentation and/or other materials provided
with the distribution.
* Neither the name Circonus, Inc. nor the names of its contributors
may be used to endorse or promote products derived from this
software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -0,0 +1,555 @@
// Copyright 2016, Circonus, Inc. All rights reserved.
// See the LICENSE file.
// Package circllhist provides an implementation of Circonus' fixed log-linear
// histogram data structure. This allows tracking of histograms in a
// composable way such that accurate error can be reasoned about.
package circonusllhist
import (
"bytes"
"errors"
"fmt"
"math"
"sync"
)
const (
DEFAULT_HIST_SIZE = int16(100)
)
var power_of_ten = [...]float64{
1, 10, 100, 1000, 10000, 100000, 1e+06, 1e+07, 1e+08, 1e+09, 1e+10,
1e+11, 1e+12, 1e+13, 1e+14, 1e+15, 1e+16, 1e+17, 1e+18, 1e+19, 1e+20,
1e+21, 1e+22, 1e+23, 1e+24, 1e+25, 1e+26, 1e+27, 1e+28, 1e+29, 1e+30,
1e+31, 1e+32, 1e+33, 1e+34, 1e+35, 1e+36, 1e+37, 1e+38, 1e+39, 1e+40,
1e+41, 1e+42, 1e+43, 1e+44, 1e+45, 1e+46, 1e+47, 1e+48, 1e+49, 1e+50,
1e+51, 1e+52, 1e+53, 1e+54, 1e+55, 1e+56, 1e+57, 1e+58, 1e+59, 1e+60,
1e+61, 1e+62, 1e+63, 1e+64, 1e+65, 1e+66, 1e+67, 1e+68, 1e+69, 1e+70,
1e+71, 1e+72, 1e+73, 1e+74, 1e+75, 1e+76, 1e+77, 1e+78, 1e+79, 1e+80,
1e+81, 1e+82, 1e+83, 1e+84, 1e+85, 1e+86, 1e+87, 1e+88, 1e+89, 1e+90,
1e+91, 1e+92, 1e+93, 1e+94, 1e+95, 1e+96, 1e+97, 1e+98, 1e+99, 1e+100,
1e+101, 1e+102, 1e+103, 1e+104, 1e+105, 1e+106, 1e+107, 1e+108, 1e+109,
1e+110, 1e+111, 1e+112, 1e+113, 1e+114, 1e+115, 1e+116, 1e+117, 1e+118,
1e+119, 1e+120, 1e+121, 1e+122, 1e+123, 1e+124, 1e+125, 1e+126, 1e+127,
1e-128, 1e-127, 1e-126, 1e-125, 1e-124, 1e-123, 1e-122, 1e-121, 1e-120,
1e-119, 1e-118, 1e-117, 1e-116, 1e-115, 1e-114, 1e-113, 1e-112, 1e-111,
1e-110, 1e-109, 1e-108, 1e-107, 1e-106, 1e-105, 1e-104, 1e-103, 1e-102,
1e-101, 1e-100, 1e-99, 1e-98, 1e-97, 1e-96,
1e-95, 1e-94, 1e-93, 1e-92, 1e-91, 1e-90, 1e-89, 1e-88, 1e-87, 1e-86,
1e-85, 1e-84, 1e-83, 1e-82, 1e-81, 1e-80, 1e-79, 1e-78, 1e-77, 1e-76,
1e-75, 1e-74, 1e-73, 1e-72, 1e-71, 1e-70, 1e-69, 1e-68, 1e-67, 1e-66,
1e-65, 1e-64, 1e-63, 1e-62, 1e-61, 1e-60, 1e-59, 1e-58, 1e-57, 1e-56,
1e-55, 1e-54, 1e-53, 1e-52, 1e-51, 1e-50, 1e-49, 1e-48, 1e-47, 1e-46,
1e-45, 1e-44, 1e-43, 1e-42, 1e-41, 1e-40, 1e-39, 1e-38, 1e-37, 1e-36,
1e-35, 1e-34, 1e-33, 1e-32, 1e-31, 1e-30, 1e-29, 1e-28, 1e-27, 1e-26,
1e-25, 1e-24, 1e-23, 1e-22, 1e-21, 1e-20, 1e-19, 1e-18, 1e-17, 1e-16,
1e-15, 1e-14, 1e-13, 1e-12, 1e-11, 1e-10, 1e-09, 1e-08, 1e-07, 1e-06,
1e-05, 0.0001, 0.001, 0.01, 0.1,
}
// A Bracket is a part of a cumulative distribution.
type Bin struct {
val int8
exp int8
count uint64
}
func NewBinRaw(val int8, exp int8, count uint64) *Bin {
return &Bin{
val: val,
exp: exp,
count: count,
}
}
func NewBin() *Bin {
return NewBinRaw(0, 0, 0)
}
func NewBinFromFloat64(d float64) *Bin {
hb := NewBinRaw(0, 0, 0)
hb.SetFromFloat64(d)
return hb
}
func (hb *Bin) SetFromFloat64(d float64) *Bin {
hb.val = -1
if math.IsInf(d, 0) || math.IsNaN(d) {
return hb
}
if d == 0.0 {
hb.val = 0
return hb
}
sign := 1
if math.Signbit(d) {
sign = -1
}
d = math.Abs(d)
big_exp := int(math.Floor(math.Log10(d)))
hb.exp = int8(big_exp)
if int(hb.exp) != big_exp { //rolled
hb.exp = 0
if big_exp < 0 {
hb.val = 0
}
return hb
}
d = d / hb.PowerOfTen()
d = d * 10
hb.val = int8(sign * int(math.Floor(d+1e-13)))
if hb.val == 100 || hb.val == -100 {
if hb.exp < 127 {
hb.val = hb.val / 10
hb.exp++
} else {
hb.val = 0
hb.exp = 0
}
}
if hb.val == 0 {
hb.exp = 0
return hb
}
if !((hb.val >= 10 && hb.val < 100) ||
(hb.val <= -10 && hb.val > -100)) {
hb.val = -1
hb.exp = 0
}
return hb
}
func (hb *Bin) PowerOfTen() float64 {
idx := int(hb.exp)
if idx < 0 {
idx = 256 + idx
}
return power_of_ten[idx]
}
func (hb *Bin) IsNaN() bool {
if hb.val > 99 || hb.val < -99 {
return true
}
return false
}
func (hb *Bin) Val() int8 {
return hb.val
}
func (hb *Bin) Exp() int8 {
return hb.exp
}
func (hb *Bin) Count() uint64 {
return hb.count
}
func (hb *Bin) Value() float64 {
if hb.IsNaN() {
return math.NaN()
}
if hb.val < 10 && hb.val > -10 {
return 0.0
}
return (float64(hb.val) / 10.0) * hb.PowerOfTen()
}
func (hb *Bin) BinWidth() float64 {
if hb.IsNaN() {
return math.NaN()
}
if hb.val < 10 && hb.val > -10 {
return 0.0
}
return hb.PowerOfTen() / 10.0
}
func (hb *Bin) Midpoint() float64 {
if hb.IsNaN() {
return math.NaN()
}
out := hb.Value()
if out == 0 {
return 0
}
interval := hb.BinWidth()
if out < 0 {
interval = interval * -1
}
return out + interval/2.0
}
func (hb *Bin) Left() float64 {
if hb.IsNaN() {
return math.NaN()
}
out := hb.Value()
if out >= 0 {
return out
}
return out - hb.BinWidth()
}
func (h1 *Bin) Compare(h2 *Bin) int {
if h1.val == h2.val && h1.exp == h2.exp {
return 0
}
if h1.val == -1 {
return 1
}
if h2.val == -1 {
return -1
}
if h1.val == 0 {
if h2.val > 0 {
return 1
}
return -1
}
if h2.val == 0 {
if h1.val < 0 {
return 1
}
return -1
}
if h1.val < 0 && h2.val > 0 {
return 1
}
if h1.val > 0 && h2.val < 0 {
return -1
}
if h1.exp == h2.exp {
if h1.val < h2.val {
return 1
}
return -1
}
if h1.exp > h2.exp {
if h1.val < 0 {
return 1
}
return -1
}
if h1.exp < h2.exp {
if h1.val < 0 {
return -1
}
return 1
}
return 0
}
// This histogram structure tracks values are two decimal digits of precision
// with a bounded error that remains bounded upon composition
type Histogram struct {
mutex sync.Mutex
bvs []Bin
used int16
allocd int16
}
// New returns a new Histogram
func New() *Histogram {
return &Histogram{
allocd: DEFAULT_HIST_SIZE,
used: 0,
bvs: make([]Bin, DEFAULT_HIST_SIZE),
}
}
// Max returns the approximate maximum recorded value.
func (h *Histogram) Max() float64 {
return h.ValueAtQuantile(1.0)
}
// Min returns the approximate minimum recorded value.
func (h *Histogram) Min() float64 {
return h.ValueAtQuantile(0.0)
}
// Mean returns the approximate arithmetic mean of the recorded values.
func (h *Histogram) Mean() float64 {
return h.ApproxMean()
}
// Reset forgets all bins in the histogram (they remain allocated)
func (h *Histogram) Reset() {
h.mutex.Lock()
h.used = 0
h.mutex.Unlock()
}
// RecordValue records the given value, returning an error if the value is out
// of range.
func (h *Histogram) RecordValue(v float64) error {
return h.RecordValues(v, 1)
}
// RecordCorrectedValue records the given value, correcting for stalls in the
// recording process. This only works for processes which are recording values
// at an expected interval (e.g., doing jitter analysis). Processes which are
// recording ad-hoc values (e.g., latency for incoming requests) can't take
// advantage of this.
// CH Compat
func (h *Histogram) RecordCorrectedValue(v, expectedInterval int64) error {
if err := h.RecordValue(float64(v)); err != nil {
return err
}
if expectedInterval <= 0 || v <= expectedInterval {
return nil
}
missingValue := v - expectedInterval
for missingValue >= expectedInterval {
if err := h.RecordValue(float64(missingValue)); err != nil {
return err
}
missingValue -= expectedInterval
}
return nil
}
// find where a new bin should go
func (h *Histogram) InternalFind(hb *Bin) (bool, int16) {
if h.used == 0 {
return false, 0
}
rv := -1
idx := int16(0)
l := int16(0)
r := h.used - 1
for l < r {
check := (r + l) / 2
rv = h.bvs[check].Compare(hb)
if rv == 0 {
l = check
r = check
} else if rv > 0 {
l = check + 1
} else {
r = check - 1
}
}
if rv != 0 {
rv = h.bvs[l].Compare(hb)
}
idx = l
if rv == 0 {
return true, idx
}
if rv < 0 {
return false, idx
}
idx++
return false, idx
}
func (h *Histogram) InsertBin(hb *Bin, count int64) uint64 {
h.mutex.Lock()
defer h.mutex.Unlock()
if count == 0 {
return 0
}
found, idx := h.InternalFind(hb)
if !found {
if h.used == h.allocd {
new_bvs := make([]Bin, h.allocd+DEFAULT_HIST_SIZE)
if idx > 0 {
copy(new_bvs[0:], h.bvs[0:idx])
}
if idx < h.used {
copy(new_bvs[idx+1:], h.bvs[idx:])
}
h.allocd = h.allocd + DEFAULT_HIST_SIZE
h.bvs = new_bvs
} else {
copy(h.bvs[idx+1:], h.bvs[idx:h.used])
}
h.bvs[idx].val = hb.val
h.bvs[idx].exp = hb.exp
h.bvs[idx].count = uint64(count)
h.used++
return h.bvs[idx].count
}
var newval uint64
if count < 0 {
newval = h.bvs[idx].count - uint64(-count)
} else {
newval = h.bvs[idx].count + uint64(count)
}
if newval < h.bvs[idx].count { //rolled
newval = ^uint64(0)
}
h.bvs[idx].count = newval
return newval - h.bvs[idx].count
}
// RecordValues records n occurrences of the given value, returning an error if
// the value is out of range.
func (h *Histogram) RecordValues(v float64, n int64) error {
var hb Bin
hb.SetFromFloat64(v)
h.InsertBin(&hb, n)
return nil
}
// Approximate mean
func (h *Histogram) ApproxMean() float64 {
h.mutex.Lock()
defer h.mutex.Unlock()
divisor := 0.0
sum := 0.0
for i := int16(0); i < h.used; i++ {
midpoint := h.bvs[i].Midpoint()
cardinality := float64(h.bvs[i].count)
divisor += cardinality
sum += midpoint * cardinality
}
if divisor == 0.0 {
return math.NaN()
}
return sum / divisor
}
// Approximate sum
func (h *Histogram) ApproxSum() float64 {
h.mutex.Lock()
defer h.mutex.Unlock()
sum := 0.0
for i := int16(0); i < h.used; i++ {
midpoint := h.bvs[i].Midpoint()
cardinality := float64(h.bvs[i].count)
sum += midpoint * cardinality
}
return sum
}
func (h *Histogram) ApproxQuantile(q_in []float64) ([]float64, error) {
h.mutex.Lock()
defer h.mutex.Unlock()
q_out := make([]float64, len(q_in))
i_q, i_b := 0, int16(0)
total_cnt, bin_width, bin_left, lower_cnt, upper_cnt := 0.0, 0.0, 0.0, 0.0, 0.0
if len(q_in) == 0 {
return q_out, nil
}
// Make sure the requested quantiles are in order
for i_q = 1; i_q < len(q_in); i_q++ {
if q_in[i_q-1] > q_in[i_q] {
return nil, errors.New("out of order")
}
}
// Add up the bins
for i_b = 0; i_b < h.used; i_b++ {
if !h.bvs[i_b].IsNaN() {
total_cnt += float64(h.bvs[i_b].count)
}
}
if total_cnt == 0.0 {
return nil, errors.New("empty_histogram")
}
for i_q = 0; i_q < len(q_in); i_q++ {
if q_in[i_q] < 0.0 || q_in[i_q] > 1.0 {
return nil, errors.New("out of bound quantile")
}
q_out[i_q] = total_cnt * q_in[i_q]
}
for i_b = 0; i_b < h.used; i_b++ {
if h.bvs[i_b].IsNaN() {
continue
}
bin_width = h.bvs[i_b].BinWidth()
bin_left = h.bvs[i_b].Left()
lower_cnt = upper_cnt
upper_cnt = lower_cnt + float64(h.bvs[i_b].count)
break
}
for i_q = 0; i_q < len(q_in); i_q++ {
for i_b < (h.used-1) && upper_cnt < q_out[i_q] {
i_b++
bin_width = h.bvs[i_b].BinWidth()
bin_left = h.bvs[i_b].Left()
lower_cnt = upper_cnt
upper_cnt = lower_cnt + float64(h.bvs[i_b].count)
}
if lower_cnt == q_out[i_q] {
q_out[i_q] = bin_left
} else if upper_cnt == q_out[i_q] {
q_out[i_q] = bin_left + bin_width
} else {
if bin_width == 0 {
q_out[i_q] = bin_left
} else {
q_out[i_q] = bin_left + (q_out[i_q]-lower_cnt)/(upper_cnt-lower_cnt)*bin_width
}
}
}
return q_out, nil
}
// ValueAtQuantile returns the recorded value at the given quantile (0..1).
func (h *Histogram) ValueAtQuantile(q float64) float64 {
h.mutex.Lock()
defer h.mutex.Unlock()
q_in := make([]float64, 1)
q_in[0] = q
q_out, err := h.ApproxQuantile(q_in)
if err == nil && len(q_out) == 1 {
return q_out[0]
}
return math.NaN()
}
// SignificantFigures returns the significant figures used to create the
// histogram
// CH Compat
func (h *Histogram) SignificantFigures() int64 {
return 2
}
// Equals returns true if the two Histograms are equivalent, false if not.
func (h *Histogram) Equals(other *Histogram) bool {
h.mutex.Lock()
other.mutex.Lock()
defer h.mutex.Unlock()
defer other.mutex.Unlock()
switch {
case
h.used != other.used:
return false
default:
for i := int16(0); i < h.used; i++ {
if h.bvs[i].Compare(&other.bvs[i]) != 0 {
return false
}
if h.bvs[i].count != other.bvs[i].count {
return false
}
}
}
return true
}
func (h *Histogram) CopyAndReset() *Histogram {
h.mutex.Lock()
defer h.mutex.Unlock()
newhist := &Histogram{
allocd: h.allocd,
used: h.used,
bvs: h.bvs,
}
h.allocd = DEFAULT_HIST_SIZE
h.bvs = make([]Bin, DEFAULT_HIST_SIZE)
h.used = 0
return newhist
}
func (h *Histogram) DecStrings() []string {
h.mutex.Lock()
defer h.mutex.Unlock()
out := make([]string, h.used)
for i, bin := range h.bvs[0:h.used] {
var buffer bytes.Buffer
buffer.WriteString("H[")
buffer.WriteString(fmt.Sprintf("%3.1e", bin.Value()))
buffer.WriteString("]=")
buffer.WriteString(fmt.Sprintf("%v", bin.count))
out[i] = buffer.String()
}
return out
}

View File

@ -0,0 +1,3 @@
.idea/
*.iml
*.test

View File

@ -0,0 +1,12 @@
sudo: false
language: go
go:
- 1.5.1
branches:
only:
- master
script: make updatedeps test

363
vendor/github.com/hashicorp/go-retryablehttp/LICENSE generated vendored Normal file
View File

@ -0,0 +1,363 @@
Mozilla Public License, version 2.0
1. Definitions
1.1. "Contributor"
means each individual or legal entity that creates, contributes to the
creation of, or owns Covered Software.
1.2. "Contributor Version"
means the combination of the Contributions of others (if any) used by a
Contributor and that particular Contributor's Contribution.
1.3. "Contribution"
means Covered Software of a particular Contributor.
1.4. "Covered Software"
means Source Code Form to which the initial Contributor has attached the
notice in Exhibit A, the Executable Form of such Source Code Form, and
Modifications of such Source Code Form, in each case including portions
thereof.
1.5. "Incompatible With Secondary Licenses"
means
a. that the initial Contributor has attached the notice described in
Exhibit B to the Covered Software; or
b. that the Covered Software was made available under the terms of
version 1.1 or earlier of the License, but not also under the terms of
a Secondary License.
1.6. "Executable Form"
means any form of the work other than Source Code Form.
1.7. "Larger Work"
means a work that combines Covered Software with other material, in a
separate file or files, that is not Covered Software.
1.8. "License"
means this document.
1.9. "Licensable"
means having the right to grant, to the maximum extent possible, whether
at the time of the initial grant or subsequently, any and all of the
rights conveyed by this License.
1.10. "Modifications"
means any of the following:
a. any file in Source Code Form that results from an addition to,
deletion from, or modification of the contents of Covered Software; or
b. any new file in Source Code Form that contains any Covered Software.
1.11. "Patent Claims" of a Contributor
means any patent claim(s), including without limitation, method,
process, and apparatus claims, in any patent Licensable by such
Contributor that would be infringed, but for the grant of the License,
by the making, using, selling, offering for sale, having made, import,
or transfer of either its Contributions or its Contributor Version.
1.12. "Secondary License"
means either the GNU General Public License, Version 2.0, the GNU Lesser
General Public License, Version 2.1, the GNU Affero General Public
License, Version 3.0, or any later versions of those licenses.
1.13. "Source Code Form"
means the form of the work preferred for making modifications.
1.14. "You" (or "Your")
means an individual or a legal entity exercising rights under this
License. For legal entities, "You" includes any entity that controls, is
controlled by, or is under common control with You. For purposes of this
definition, "control" means (a) the power, direct or indirect, to cause
the direction or management of such entity, whether by contract or
otherwise, or (b) ownership of more than fifty percent (50%) of the
outstanding shares or beneficial ownership of such entity.
2. License Grants and Conditions
2.1. Grants
Each Contributor hereby grants You a world-wide, royalty-free,
non-exclusive license:
a. under intellectual property rights (other than patent or trademark)
Licensable by such Contributor to use, reproduce, make available,
modify, display, perform, distribute, and otherwise exploit its
Contributions, either on an unmodified basis, with Modifications, or
as part of a Larger Work; and
b. under Patent Claims of such Contributor to make, use, sell, offer for
sale, have made, import, and otherwise transfer either its
Contributions or its Contributor Version.
2.2. Effective Date
The licenses granted in Section 2.1 with respect to any Contribution
become effective for each Contribution on the date the Contributor first
distributes such Contribution.
2.3. Limitations on Grant Scope
The licenses granted in this Section 2 are the only rights granted under
this License. No additional rights or licenses will be implied from the
distribution or licensing of Covered Software under this License.
Notwithstanding Section 2.1(b) above, no patent license is granted by a
Contributor:
a. for any code that a Contributor has removed from Covered Software; or
b. for infringements caused by: (i) Your and any other third party's
modifications of Covered Software, or (ii) the combination of its
Contributions with other software (except as part of its Contributor
Version); or
c. under Patent Claims infringed by Covered Software in the absence of
its Contributions.
This License does not grant any rights in the trademarks, service marks,
or logos of any Contributor (except as may be necessary to comply with
the notice requirements in Section 3.4).
2.4. Subsequent Licenses
No Contributor makes additional grants as a result of Your choice to
distribute the Covered Software under a subsequent version of this
License (see Section 10.2) or under the terms of a Secondary License (if
permitted under the terms of Section 3.3).
2.5. Representation
Each Contributor represents that the Contributor believes its
Contributions are its original creation(s) or it has sufficient rights to
grant the rights to its Contributions conveyed by this License.
2.6. Fair Use
This License is not intended to limit any rights You have under
applicable copyright doctrines of fair use, fair dealing, or other
equivalents.
2.7. Conditions
Sections 3.1, 3.2, 3.3, and 3.4 are conditions of the licenses granted in
Section 2.1.
3. Responsibilities
3.1. Distribution of Source Form
All distribution of Covered Software in Source Code Form, including any
Modifications that You create or to which You contribute, must be under
the terms of this License. You must inform recipients that the Source
Code Form of the Covered Software is governed by the terms of this
License, and how they can obtain a copy of this License. You may not
attempt to alter or restrict the recipients' rights in the Source Code
Form.
3.2. Distribution of Executable Form
If You distribute Covered Software in Executable Form then:
a. such Covered Software must also be made available in Source Code Form,
as described in Section 3.1, and You must inform recipients of the
Executable Form how they can obtain a copy of such Source Code Form by
reasonable means in a timely manner, at a charge no more than the cost
of distribution to the recipient; and
b. You may distribute such Executable Form under the terms of this
License, or sublicense it under different terms, provided that the
license for the Executable Form does not attempt to limit or alter the
recipients' rights in the Source Code Form under this License.
3.3. Distribution of a Larger Work
You may create and distribute a Larger Work under terms of Your choice,
provided that You also comply with the requirements of this License for
the Covered Software. If the Larger Work is a combination of Covered
Software with a work governed by one or more Secondary Licenses, and the
Covered Software is not Incompatible With Secondary Licenses, this
License permits You to additionally distribute such Covered Software
under the terms of such Secondary License(s), so that the recipient of
the Larger Work may, at their option, further distribute the Covered
Software under the terms of either this License or such Secondary
License(s).
3.4. Notices
You may not remove or alter the substance of any license notices
(including copyright notices, patent notices, disclaimers of warranty, or
limitations of liability) contained within the Source Code Form of the
Covered Software, except that You may alter any license notices to the
extent required to remedy known factual inaccuracies.
3.5. Application of Additional Terms
You may choose to offer, and to charge a fee for, warranty, support,
indemnity or liability obligations to one or more recipients of Covered
Software. However, You may do so only on Your own behalf, and not on
behalf of any Contributor. You must make it absolutely clear that any
such warranty, support, indemnity, or liability obligation is offered by
You alone, and You hereby agree to indemnify every Contributor for any
liability incurred by such Contributor as a result of warranty, support,
indemnity or liability terms You offer. You may include additional
disclaimers of warranty and limitations of liability specific to any
jurisdiction.
4. Inability to Comply Due to Statute or Regulation
If it is impossible for You to comply with any of the terms of this License
with respect to some or all of the Covered Software due to statute,
judicial order, or regulation then You must: (a) comply with the terms of
this License to the maximum extent possible; and (b) describe the
limitations and the code they affect. Such description must be placed in a
text file included with all distributions of the Covered Software under
this License. Except to the extent prohibited by statute or regulation,
such description must be sufficiently detailed for a recipient of ordinary
skill to be able to understand it.
5. Termination
5.1. The rights granted under this License will terminate automatically if You
fail to comply with any of its terms. However, if You become compliant,
then the rights granted under this License from a particular Contributor
are reinstated (a) provisionally, unless and until such Contributor
explicitly and finally terminates Your grants, and (b) on an ongoing
basis, if such Contributor fails to notify You of the non-compliance by
some reasonable means prior to 60 days after You have come back into
compliance. Moreover, Your grants from a particular Contributor are
reinstated on an ongoing basis if such Contributor notifies You of the
non-compliance by some reasonable means, this is the first time You have
received notice of non-compliance with this License from such
Contributor, and You become compliant prior to 30 days after Your receipt
of the notice.
5.2. If You initiate litigation against any entity by asserting a patent
infringement claim (excluding declaratory judgment actions,
counter-claims, and cross-claims) alleging that a Contributor Version
directly or indirectly infringes any patent, then the rights granted to
You by any and all Contributors for the Covered Software under Section
2.1 of this License shall terminate.
5.3. In the event of termination under Sections 5.1 or 5.2 above, all end user
license agreements (excluding distributors and resellers) which have been
validly granted by You or Your distributors under this License prior to
termination shall survive termination.
6. Disclaimer of Warranty
Covered Software is provided under this License on an "as is" basis,
without warranty of any kind, either expressed, implied, or statutory,
including, without limitation, warranties that the Covered Software is free
of defects, merchantable, fit for a particular purpose or non-infringing.
The entire risk as to the quality and performance of the Covered Software
is with You. Should any Covered Software prove defective in any respect,
You (not any Contributor) assume the cost of any necessary servicing,
repair, or correction. This disclaimer of warranty constitutes an essential
part of this License. No use of any Covered Software is authorized under
this License except under this disclaimer.
7. Limitation of Liability
Under no circumstances and under no legal theory, whether tort (including
negligence), contract, or otherwise, shall any Contributor, or anyone who
distributes Covered Software as permitted above, be liable to You for any
direct, indirect, special, incidental, or consequential damages of any
character including, without limitation, damages for lost profits, loss of
goodwill, work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses, even if such party shall have been
informed of the possibility of such damages. This limitation of liability
shall not apply to liability for death or personal injury resulting from
such party's negligence to the extent applicable law prohibits such
limitation. Some jurisdictions do not allow the exclusion or limitation of
incidental or consequential damages, so this exclusion and limitation may
not apply to You.
8. Litigation
Any litigation relating to this License may be brought only in the courts
of a jurisdiction where the defendant maintains its principal place of
business and such litigation shall be governed by laws of that
jurisdiction, without reference to its conflict-of-law provisions. Nothing
in this Section shall prevent a party's ability to bring cross-claims or
counter-claims.
9. Miscellaneous
This License represents the complete agreement concerning the subject
matter hereof. If any provision of this License is held to be
unenforceable, such provision shall be reformed only to the extent
necessary to make it enforceable. Any law or regulation which provides that
the language of a contract shall be construed against the drafter shall not
be used to construe this License against a Contributor.
10. Versions of the License
10.1. New Versions
Mozilla Foundation is the license steward. Except as provided in Section
10.3, no one other than the license steward has the right to modify or
publish new versions of this License. Each version will be given a
distinguishing version number.
10.2. Effect of New Versions
You may distribute the Covered Software under the terms of the version
of the License under which You originally received the Covered Software,
or under the terms of any subsequent version published by the license
steward.
10.3. Modified Versions
If you create software not governed by this License, and you want to
create a new license for such software, you may create and use a
modified version of this License if you rename the license and remove
any references to the name of the license steward (except to note that
such modified license differs from this License).
10.4. Distributing Source Code Form that is Incompatible With Secondary
Licenses If You choose to distribute Source Code Form that is
Incompatible With Secondary Licenses under the terms of this version of
the License, the notice described in Exhibit B of this License must be
attached.
Exhibit A - Source Code Form License Notice
This Source Code Form is subject to the
terms of the Mozilla Public License, v.
2.0. If a copy of the MPL was not
distributed with this file, You can
obtain one at
http://mozilla.org/MPL/2.0/.
If it is not possible or desirable to put the notice in a particular file,
then You may include the notice in a location (such as a LICENSE file in a
relevant directory) where a recipient would be likely to look for such a
notice.
You may add additional accurate notices of copyright ownership.
Exhibit B - "Incompatible With Secondary Licenses" Notice
This Source Code Form is "Incompatible
With Secondary Licenses", as defined by
the Mozilla Public License, v. 2.0.

11
vendor/github.com/hashicorp/go-retryablehttp/Makefile generated vendored Normal file
View File

@ -0,0 +1,11 @@
default: test
test:
go vet ./...
go test -race ./...
updatedeps:
go get -f -t -u ./...
go get -f -u ./...
.PHONY: default test updatedeps

43
vendor/github.com/hashicorp/go-retryablehttp/README.md generated vendored Normal file
View File

@ -0,0 +1,43 @@
go-retryablehttp
================
[![Build Status](http://img.shields.io/travis/hashicorp/go-retryablehttp.svg?style=flat-square)][travis]
[![Go Documentation](http://img.shields.io/badge/go-documentation-blue.svg?style=flat-square)][godocs]
[travis]: http://travis-ci.org/hashicorp/go-retryablehttp
[godocs]: http://godoc.org/github.com/hashicorp/go-retryablehttp
The `retryablehttp` package provides a familiar HTTP client interface with
automatic retries and exponential backoff. It is a thin wrapper over the
standard `net/http` client library and exposes nearly the same public API. This
makes `retryablehttp` very easy to drop into existing programs.
`retryablehttp` performs automatic retries under certain conditions. Mainly, if
an error is returned by the client (connection errors, etc.), or if a 500-range
response code is received, then a retry is invoked after a wait period.
Otherwise, the response is returned and left to the caller to interpret.
The main difference from `net/http` is that requests which take a request body
(POST/PUT et. al) require an `io.ReadSeeker` to be provided. This enables the
request body to be "rewound" if the initial request fails so that the full
request can be attempted again.
Example Use
===========
Using this library should look almost identical to what you would do with
`net/http`. The most simple example of a GET request is shown below:
```go
resp, err := retryablehttp.Get("/foo")
if err != nil {
panic(err)
}
```
The returned response object is an `*http.Response`, the same thing you would
usually get from `net/http`. Had the request failed one or more times, the above
call would block and retry with exponential backoff.
For more usage and examples see the
[godoc](http://godoc.org/github.com/hashicorp/go-retryablehttp).

250
vendor/github.com/hashicorp/go-retryablehttp/client.go generated vendored Normal file
View File

@ -0,0 +1,250 @@
// The retryablehttp package provides a familiar HTTP client interface with
// automatic retries and exponential backoff. It is a thin wrapper over the
// standard net/http client library and exposes nearly the same public API.
// This makes retryablehttp very easy to drop into existing programs.
//
// retryablehttp performs automatic retries under certain conditions. Mainly, if
// an error is returned by the client (connection errors etc), or if a 500-range
// response is received, then a retry is invoked. Otherwise, the response is
// returned and left to the caller to interpret.
//
// The main difference from net/http is that requests which take a request body
// (POST/PUT et. al) require an io.ReadSeeker to be provided. This enables the
// request body to be "rewound" if the initial request fails so that the full
// request can be attempted again.
package retryablehttp
import (
"fmt"
"io"
"io/ioutil"
"log"
"math"
"net/http"
"net/url"
"os"
"strings"
"time"
"github.com/hashicorp/go-cleanhttp"
)
var (
// Default retry configuration
defaultRetryWaitMin = 1 * time.Second
defaultRetryWaitMax = 5 * time.Minute
defaultRetryMax = 32
// defaultClient is used for performing requests without explicitly making
// a new client. It is purposely private to avoid modifications.
defaultClient = NewClient()
)
// LenReader is an interface implemented by many in-memory io.Reader's. Used
// for automatically sending the right Content-Length header when possible.
type LenReader interface {
Len() int
}
// Request wraps the metadata needed to create HTTP requests.
type Request struct {
// body is a seekable reader over the request body payload. This is
// used to rewind the request data in between retries.
body io.ReadSeeker
// Embed an HTTP request directly. This makes a *Request act exactly
// like an *http.Request so that all meta methods are supported.
*http.Request
}
// NewRequest creates a new wrapped request.
func NewRequest(method, url string, body io.ReadSeeker) (*Request, error) {
// Wrap the body in a noop ReadCloser if non-nil. This prevents the
// reader from being closed by the HTTP client.
var rcBody io.ReadCloser
if body != nil {
rcBody = ioutil.NopCloser(body)
}
// Make the request with the noop-closer for the body.
httpReq, err := http.NewRequest(method, url, rcBody)
if err != nil {
return nil, err
}
// Check if we can set the Content-Length automatically.
if lr, ok := body.(LenReader); ok {
httpReq.ContentLength = int64(lr.Len())
}
return &Request{body, httpReq}, nil
}
// RequestLogHook allows a function to run before each retry. The HTTP
// request which will be made, and the retry number (0 for the initial
// request) are available to users. The internal logger is exposed to
// consumers.
type RequestLogHook func(*log.Logger, *http.Request, int)
// ResponseLogHook is like RequestLogHook, but allows running a function
// on each HTTP response. This function will be invoked at the end of
// every HTTP request executed, regardless of whether a subsequent retry
// needs to be performed or not. If the response body is read or closed
// from this method, this will affect the response returned from Do().
type ResponseLogHook func(*log.Logger, *http.Response)
// Client is used to make HTTP requests. It adds additional functionality
// like automatic retries to tolerate minor outages.
type Client struct {
HTTPClient *http.Client // Internal HTTP client.
Logger *log.Logger // Customer logger instance.
RetryWaitMin time.Duration // Minimum time to wait
RetryWaitMax time.Duration // Maximum time to wait
RetryMax int // Maximum number of retries
// RequestLogHook allows a user-supplied function to be called
// before each retry.
RequestLogHook RequestLogHook
// ResponseLogHook allows a user-supplied function to be called
// with the response from each HTTP request executed.
ResponseLogHook ResponseLogHook
}
// NewClient creates a new Client with default settings.
func NewClient() *Client {
return &Client{
HTTPClient: cleanhttp.DefaultClient(),
Logger: log.New(os.Stderr, "", log.LstdFlags),
RetryWaitMin: defaultRetryWaitMin,
RetryWaitMax: defaultRetryWaitMax,
RetryMax: defaultRetryMax,
}
}
// Do wraps calling an HTTP method with retries.
func (c *Client) Do(req *Request) (*http.Response, error) {
c.Logger.Printf("[DEBUG] %s %s", req.Method, req.URL)
for i := 0; ; i++ {
var code int // HTTP response code
// Always rewind the request body when non-nil.
if req.body != nil {
if _, err := req.body.Seek(0, 0); err != nil {
return nil, fmt.Errorf("failed to seek body: %v", err)
}
}
if c.RequestLogHook != nil {
c.RequestLogHook(c.Logger, req.Request, i)
}
// Attempt the request
resp, err := c.HTTPClient.Do(req.Request)
if err != nil {
c.Logger.Printf("[ERR] %s %s request failed: %v", req.Method, req.URL, err)
goto RETRY
}
code = resp.StatusCode
// Call the response logger function if provided.
if c.ResponseLogHook != nil {
c.ResponseLogHook(c.Logger, resp)
}
// Check the response code. We retry on 500-range responses to allow
// the server time to recover, as 500's are typically not permanent
// errors and may relate to outages on the server side.
if code%500 < 100 {
resp.Body.Close()
goto RETRY
}
return resp, nil
RETRY:
remain := c.RetryMax - i
if remain == 0 {
break
}
wait := backoff(c.RetryWaitMin, c.RetryWaitMax, i)
desc := fmt.Sprintf("%s %s", req.Method, req.URL)
if code > 0 {
desc = fmt.Sprintf("%s (status: %d)", desc, code)
}
c.Logger.Printf("[DEBUG] %s: retrying in %s (%d left)", desc, wait, remain)
time.Sleep(wait)
}
// Return an error if we fall out of the retry loop
return nil, fmt.Errorf("%s %s giving up after %d attempts",
req.Method, req.URL, c.RetryMax+1)
}
// Get is a shortcut for doing a GET request without making a new client.
func Get(url string) (*http.Response, error) {
return defaultClient.Get(url)
}
// Get is a convenience helper for doing simple GET requests.
func (c *Client) Get(url string) (*http.Response, error) {
req, err := NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
return c.Do(req)
}
// Head is a shortcut for doing a HEAD request without making a new client.
func Head(url string) (*http.Response, error) {
return defaultClient.Head(url)
}
// Head is a convenience method for doing simple HEAD requests.
func (c *Client) Head(url string) (*http.Response, error) {
req, err := NewRequest("HEAD", url, nil)
if err != nil {
return nil, err
}
return c.Do(req)
}
// Post is a shortcut for doing a POST request without making a new client.
func Post(url, bodyType string, body io.ReadSeeker) (*http.Response, error) {
return defaultClient.Post(url, bodyType, body)
}
// Post is a convenience method for doing simple POST requests.
func (c *Client) Post(url, bodyType string, body io.ReadSeeker) (*http.Response, error) {
req, err := NewRequest("POST", url, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", bodyType)
return c.Do(req)
}
// PostForm is a shortcut to perform a POST with form data without creating
// a new client.
func PostForm(url string, data url.Values) (*http.Response, error) {
return defaultClient.PostForm(url, data)
}
// PostForm is a convenience method for doing simple POST operations using
// pre-filled url.Values form data.
func (c *Client) PostForm(url string, data url.Values) (*http.Response, error) {
return c.Post(url, "application/x-www-form-urlencoded", strings.NewReader(data.Encode()))
}
// backoff is used to calculate how long to sleep before retrying
// after observing failures. It takes the minimum/maximum wait time and
// iteration, and returns the duration to wait.
func backoff(min, max time.Duration, iter int) time.Duration {
mult := math.Pow(2, float64(iter)) * float64(min)
sleep := time.Duration(mult)
if float64(sleep) != mult || sleep > max {
sleep = max
}
return sleep
}

View File

@ -681,6 +681,39 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
* <a name="telemetry-disable_hostname"></a><a href="#telemetry-disable_hostname">`disable_hostname`</a> * <a name="telemetry-disable_hostname"></a><a href="#telemetry-disable_hostname">`disable_hostname`</a>
This controls whether or not to prepend runtime telemetry with the machine's hostname, defaults to false. This controls whether or not to prepend runtime telemetry with the machine's hostname, defaults to false.
* <a name="telemetry-circonus_api_token"></a><a href="#telemetry-circonus_api_token">`circonus_api_token`</a>
A valid API Token used to create/manage check. If provided, metric management is enabled.
* <a name="telemetry-circonus_api_app"></a><a href="#telemetry-circonus_api_app">`circonus_api_app`</a>
A valid app name associated with the API token. By default, this is set to "consul".
* <a name="telemetry-circonus_api_url"></a><a href="#telemetry-circonus_api_url">`circonus_api_url`</a>
The base URL to use for contacting the Circonus API. By default, this is set to "https://api.circonus.com/v2".
* <a name="telemetry-circonus_submission_interval"></a><a href="#telemetry-circonus_submission_interval">`circonus_submission_interval`</a>
The interval at which metrics are submitted to Circonus. By default, this is set to "10s" (ten seconds).
* <a name="telemetry-circonus_submission_url"></a><a href="#telemetry-circonus_submission_url">`circonus_submission_url`</a>
The `check.config.submission_url` field, of a Check API object, from a previously created HTTPTRAP check.
* <a name="telemetry-circonus_check_id"></a><a href="#telemetry-circonus_check_id">`circonus_check_id`</a>
The Check ID (not **check bundle**) from a previously created HTTPTRAP check. The numeric portion of the `check._cid` field in the Check API object.
* <a name="telemetry-circonus_check_force_metric_activation"></a><a href="#telemetry-circonus_check_force_metric_activation">`circonus_check_force_metric_activation`</a>
Force activation of metrics which already exist and are not currently active. If check management is enabled, the default behavior is to add new metrics as they are encoutered. If the metric already exists in the check, it will **not** be activated. This setting overrides that behavior. By default, this is set to false.
* <a name="telemetry-circonus_check_instance_id"></a><a href="#telemetry-circonus_check_instance_id">`circonus_check_instance_id`</a>
Uniquely identifies the metrics coming from this *instance*. It can be used to maintain metric continuity with transient or ephemeral instances as they move around within an infrastructure. By default, this is set to hostname:application name (e.g. "host123:consul").
* <a name="telemetry-circonus_check_search_tag"></a><a href="#telemetry-circonus_check_search_tag">`circonus_check_search_tag`</a>
A special tag which, when coupled with the instance id, helps to narrow down the search results when neither a Submission URL or Check ID is provided. By default, this is set to service:application name (e.g. "service:consul").
* <a name="telemetry-circonus_broker_id"></a><a href="#telemetry-circonus_broker_id">`circonus_broker_id`</a>
The ID of a specific Circonus Broker to use when creating a new check. The numeric portion of `broker._cid` field in a Broker API object. If metric management is enabled and neither a Submission URL nor Check ID is provided, an attempt will be made to search for an existing check using Instance ID and Search Tag. If one is not found, a new HTTPTRAP check will be created. By default, this is not used and a random Enterprise Broker is selected, or the default Circonus Public Broker.
* <a name="telemetry-circonus_broker_search_tag"></a><a href="#telemetry-circonus_broker_search_tag">`circonus_broker_search_tag`</a>
A special tag which will be used to select a Circonus Broker when a Broker ID is not provided. The best use of this is to as a hint for which broker should be used based on *where* this particular instance is running (e.g. a specific geo location or datacenter, dc:sfo). By default, this is left blank and not used.
* <a name="statsd_addr"></a><a href="#statsd_addr">`statsd_addr`</a> Deprecated, see * <a name="statsd_addr"></a><a href="#statsd_addr">`statsd_addr`</a> Deprecated, see
the <a href="#telemetry">telemetry</a> structure the <a href="#telemetry">telemetry</a> structure