421 lines
12 KiB
Go
Raw Normal View History

// +build go1.9
2019-03-26 17:50:42 -04:00
package prometheus
import (
"fmt"
"log"
2020-11-05 11:51:58 -08:00
"math"
2020-08-11 11:17:43 -05:00
"regexp"
"strings"
"sync"
"time"
"github.com/armon/go-metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/push"
)
// DefaultPrometheusOpts is the option set NewPrometheusSink uses: a metric that
// has not been updated for 60 seconds is considered expired by Collect.
var DefaultPrometheusOpts = PrometheusOpts{
	Expiration: 60 * time.Second,
}
// PrometheusOpts is used to configure the Prometheus Sink
type PrometheusOpts struct {
// Expiration is the duration a metric is valid for, after which it will be
// untracked. If the value is zero, a metric is never expired.
Expiration time.Duration
Registerer prometheus.Registerer
2020-11-05 11:51:58 -08:00
// Gauges, Summaries, and Counters allow us to pre-declare metrics by giving their Name, Help, and ConstLabels to
// the PrometheusSink when it is created. Metrics declared in this way will be initialized at zero and will not be
// deleted when their expiry is reached.
// - Gauges and Summaries will be set to NaN when they expire.
// - Counters continue to Collect their last known value.
// Ex:
// PrometheusOpts{
// Expiration: 10 * time.Second,
// Gauges: []GaugeDefinition{
// {
// Name: []string{ "application", "component", "measurement"},
// Help: "application_component_measurement provides an example of how to declare static metrics",
// ConstLabels: []metrics.Label{ { Name: "my_label", Value: "does_not_change" }, },
// },
// },
// }
GaugeDefinitions []GaugeDefinition
SummaryDefinitions []SummaryDefinition
CounterDefinitions []CounterDefinition
}
// PrometheusSink keeps gauges, summaries and counters keyed by metric name plus
// labels, and exposes them to Prometheus through its Describe and Collect
// methods.
type PrometheusSink struct {
	// The three maps hold *gauge, *summary and *counter values respectively.
	// They are embedded by value, so a PrometheusSink must not be copied; if
	// copying is ever needed, convert them to *sync.Map and initialize them.
	gauges, summaries, counters sync.Map

	// expiration is how long an entry may go without updates before Collect
	// treats it as stale; zero disables expiry.
	expiration time.Duration
}
2020-11-05 11:51:58 -08:00
// GaugeDefinition can be provided to PrometheusOpts to declare a constant gauge that is not deleted on expiry.
type GaugeDefinition struct {
Name []string
ConstLabels []metrics.Label
Help string
}
type gauge struct {
2020-08-11 11:17:43 -05:00
prometheus.Gauge
updatedAt time.Time
2020-11-05 11:51:58 -08:00
// canDelete is set if the metric is created during runtime so we know it's ephemeral and can delete it on expiry.
canDelete bool
}
// SummaryDefinition can be provided to PrometheusOpts to declare a constant summary that is not deleted on expiry.
type SummaryDefinition struct {
Name []string
ConstLabels []metrics.Label
Help string
2020-08-11 11:17:43 -05:00
}
2020-11-05 11:51:58 -08:00
type summary struct {
2020-08-11 11:17:43 -05:00
prometheus.Summary
updatedAt time.Time
2020-11-05 11:51:58 -08:00
canDelete bool
}
// CounterDefinition can be provided to PrometheusOpts to declare a constant counter that is not deleted on expiry.
type CounterDefinition struct {
Name []string
ConstLabels []metrics.Label
Help string
2020-08-11 11:17:43 -05:00
}
2020-11-05 11:51:58 -08:00
type counter struct {
2020-08-11 11:17:43 -05:00
prometheus.Counter
updatedAt time.Time
2020-11-05 11:51:58 -08:00
canDelete bool
2020-08-11 11:17:43 -05:00
}
// NewPrometheusSink creates a new PrometheusSink configured with
// DefaultPrometheusOpts and registers it as NewPrometheusSinkFrom does.
func NewPrometheusSink() (*PrometheusSink, error) {
	opts := DefaultPrometheusOpts
	return NewPrometheusSinkFrom(opts)
}
// NewPrometheusSinkFrom creates a new PrometheusSink using the passed options.
func NewPrometheusSinkFrom(opts PrometheusOpts) (*PrometheusSink, error) {
sink := &PrometheusSink{
gauges: sync.Map{},
summaries: sync.Map{},
counters: sync.Map{},
expiration: opts.Expiration,
}
2020-11-05 11:51:58 -08:00
initGauges(&sink.gauges, opts.GaugeDefinitions)
initSummaries(&sink.summaries, opts.SummaryDefinitions)
initCounters(&sink.counters, opts.CounterDefinitions)
reg := opts.Registerer
if reg == nil {
reg = prometheus.DefaultRegisterer
}
return sink, reg.Register(sink)
}
// Describe is needed to meet the prometheus.Collector interface.
//
// Some description must be emitted, otherwise an error is returned. A
// placeholder gauge description named "Dummy" is sent for that purpose; it
// isn't shown to the user, because the real metrics come from Collect.
func (p *PrometheusSink) Describe(c chan<- *prometheus.Desc) {
	placeholder := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "Dummy",
		Help: "Dummy",
	})
	placeholder.Describe(c)
}
// Collect meets the collection interface and allows us to enforce our expiration
// logic to clean up ephemeral metrics if their value haven't been set for a
// duration exceeding our allowed expiration time.
func (p *PrometheusSink) Collect(c chan<- prometheus.Metric) {
expire := p.expiration != 0
now := time.Now()
p.gauges.Range(func(k, v interface{}) bool {
2020-11-05 11:51:58 -08:00
if v == nil {
return true
}
g := v.(*gauge)
lastUpdate := g.updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) {
if g.canDelete {
2020-08-11 11:17:43 -05:00
p.gauges.Delete(k)
2020-11-05 11:51:58 -08:00
return true
2020-08-11 11:17:43 -05:00
}
2020-11-05 11:51:58 -08:00
// We have not observed the gauge this interval so we don't know its value.
g.Set(math.NaN())
}
2020-11-05 11:51:58 -08:00
g.Collect(c)
return true
})
p.summaries.Range(func(k, v interface{}) bool {
2020-11-05 11:51:58 -08:00
if v == nil {
return true
}
s := v.(*summary)
lastUpdate := s.updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) {
if s.canDelete {
2020-08-11 11:17:43 -05:00
p.summaries.Delete(k)
2020-11-05 11:51:58 -08:00
return true
2020-08-11 11:17:43 -05:00
}
2020-11-05 11:51:58 -08:00
// We have observed nothing in this interval.
s.Observe(math.NaN())
}
2020-11-05 11:51:58 -08:00
s.Collect(c)
return true
})
p.counters.Range(func(k, v interface{}) bool {
2020-11-05 11:51:58 -08:00
if v == nil {
return true
}
count := v.(*counter)
lastUpdate := count.updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) {
if count.canDelete {
2020-08-11 11:17:43 -05:00
p.counters.Delete(k)
2020-11-05 11:51:58 -08:00
return true
2020-08-11 11:17:43 -05:00
}
2020-11-05 11:51:58 -08:00
// Counters remain at their previous value when not observed, so we do not set it to NaN.
}
2020-11-05 11:51:58 -08:00
count.Collect(c)
return true
})
}
2020-11-05 11:51:58 -08:00
func initGauges(m *sync.Map, gauges []GaugeDefinition) {
for _, g := range gauges {
key, hash := flattenKey(g.Name, g.ConstLabels)
pG := prometheus.NewGauge(prometheus.GaugeOpts{
Name: key,
Help: g.Help,
ConstLabels: prometheusLabels(g.ConstLabels),
})
m.Store(hash, &gauge{ Gauge: pG })
}
return
}
// initSummaries stores an empty, non-deletable summary in m for each
// pre-declared definition.
//
// Entries are keyed with flattenKey, like AddSampleWithLabels. Each summary
// uses a 10s MaxAge and the 0.5/0.9/0.99 objectives.
func initSummaries(m *sync.Map, summaries []SummaryDefinition) {
	for i := range summaries {
		def := summaries[i]
		name, hash := flattenKey(def.Name, def.ConstLabels)
		m.Store(hash, &summary{
			Summary: prometheus.NewSummary(prometheus.SummaryOpts{
				Name:        name,
				Help:        def.Help,
				MaxAge:      10 * time.Second,
				ConstLabels: prometheusLabels(def.ConstLabels),
				Objectives:  map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
			}),
		})
	}
}
// initCounters stores a zero-valued, non-deletable counter in m for each
// pre-declared definition.
//
// Entries are keyed with flattenKey, like IncrCounterWithLabels.
func initCounters(m *sync.Map, counters []CounterDefinition) {
	for i := range counters {
		def := counters[i]
		name, hash := flattenKey(def.Name, def.ConstLabels)
		m.Store(hash, &counter{
			Counter: prometheus.NewCounter(prometheus.CounterOpts{
				Name:        name,
				Help:        def.Help,
				ConstLabels: prometheusLabels(def.ConstLabels),
			}),
		})
	}
}
// forbiddenChars matches the characters (space, '.', '=', '-', '/') that
// flattenKey replaces with '_' when building a metric name.
var forbiddenChars = regexp.MustCompile(`[ .=\-/]`)
2020-11-05 11:51:58 -08:00
func flattenKey(parts []string, labels []metrics.Label) (string, string) {
key := strings.Join(parts, "_")
key = forbiddenChars.ReplaceAllString(key, "_")
hash := key
for _, label := range labels {
hash += fmt.Sprintf(";%s=%s", label.Name, label.Value)
}
return key, hash
}
// prometheusLabels converts go-metrics labels into a prometheus.Labels map.
//
// The result is always non-nil. If two labels share a name, the later one
// wins.
func prometheusLabels(labels []metrics.Label) prometheus.Labels {
	out := make(prometheus.Labels, len(labels))
	for i := range labels {
		out[labels[i].Name] = labels[i].Value
	}
	return out
}
// SetGauge sets the unlabeled gauge named by parts to val.
// It is shorthand for SetGaugeWithLabels with no labels.
func (p *PrometheusSink) SetGauge(parts []string, val float32) {
	p.SetGaugeWithLabels(parts, val, nil)
}
func (p *PrometheusSink) SetGaugeWithLabels(parts []string, val float32, labels []metrics.Label) {
2020-11-05 11:51:58 -08:00
key, hash := flattenKey(parts, labels)
2020-08-11 11:17:43 -05:00
pg, ok := p.gauges.Load(hash)
// The sync.Map underlying gauges stores pointers to our structs. If we need to make updates,
// rather than modifying the underlying value directly, which would be racy, we make a local
// copy by dereferencing the pointer we get back, making the appropriate changes, and then
// storing a pointer to our local copy. The underlying Prometheus types are threadsafe,
// so there's no issues there. It's possible for racy updates to occur to the updatedAt
// value, but since we're always setting it to time.Now(), it doesn't really matter.
if ok {
2020-11-05 11:51:58 -08:00
localGauge := *pg.(*gauge)
2020-08-11 11:17:43 -05:00
localGauge.Set(float64(val))
localGauge.updatedAt = time.Now()
p.gauges.Store(hash, &localGauge)
2020-11-05 11:51:58 -08:00
// The gauge does not exist, create the gauge and allow it to be deleted
2020-08-11 11:17:43 -05:00
} else {
g := prometheus.NewGauge(prometheus.GaugeOpts{
Name: key,
Help: key,
ConstLabels: prometheusLabels(labels),
})
2020-08-11 11:17:43 -05:00
g.Set(float64(val))
2020-11-05 11:51:58 -08:00
pg = &gauge{
Gauge: g,
updatedAt: time.Now(),
canDelete: true,
2020-08-11 11:17:43 -05:00
}
p.gauges.Store(hash, pg)
}
}
// AddSample records val on the unlabeled summary named by parts.
// It is shorthand for AddSampleWithLabels with no labels.
func (p *PrometheusSink) AddSample(parts []string, val float32) {
	p.AddSampleWithLabels(parts, val, nil)
}
func (p *PrometheusSink) AddSampleWithLabels(parts []string, val float32, labels []metrics.Label) {
2020-11-05 11:51:58 -08:00
key, hash := flattenKey(parts, labels)
2020-08-11 11:17:43 -05:00
ps, ok := p.summaries.Load(hash)
2020-11-05 11:51:58 -08:00
// Does the summary already exist for this sample type?
2020-08-11 11:17:43 -05:00
if ok {
2020-11-05 11:51:58 -08:00
localSummary := *ps.(*summary)
2020-08-11 11:17:43 -05:00
localSummary.Observe(float64(val))
localSummary.updatedAt = time.Now()
p.summaries.Store(hash, &localSummary)
2020-11-05 11:51:58 -08:00
// The summary does not exist, create the Summary and allow it to be deleted
2020-08-11 11:17:43 -05:00
} else {
s := prometheus.NewSummary(prometheus.SummaryOpts{
Name: key,
Help: key,
MaxAge: 10 * time.Second,
ConstLabels: prometheusLabels(labels),
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
})
2020-08-11 11:17:43 -05:00
s.Observe(float64(val))
2020-11-05 11:51:58 -08:00
ps = &summary{
Summary: s,
updatedAt: time.Now(),
canDelete: true,
2020-08-11 11:17:43 -05:00
}
p.summaries.Store(hash, ps)
}
}
// EmitKey is not implemented; it silently discards its input.
//
// Prometheus doesn't offer a type for which an arbitrary number of values is
// retained, because Prometheus works with a pull model rather than a push
// model.
func (p *PrometheusSink) EmitKey(key []string, val float32) {
	// Intentionally a no-op.
}
// IncrCounter adds val to the unlabeled counter named by parts.
// It is shorthand for IncrCounterWithLabels with no labels.
func (p *PrometheusSink) IncrCounter(parts []string, val float32) {
	p.IncrCounterWithLabels(parts, val, nil)
}
func (p *PrometheusSink) IncrCounterWithLabels(parts []string, val float32, labels []metrics.Label) {
2020-11-05 11:51:58 -08:00
key, hash := flattenKey(parts, labels)
2020-08-11 11:17:43 -05:00
pc, ok := p.counters.Load(hash)
2020-11-05 11:51:58 -08:00
// Does the counter exist?
2020-08-11 11:17:43 -05:00
if ok {
2020-11-05 11:51:58 -08:00
localCounter := *pc.(*counter)
2020-08-11 11:17:43 -05:00
localCounter.Add(float64(val))
localCounter.updatedAt = time.Now()
p.counters.Store(hash, &localCounter)
2020-11-05 11:51:58 -08:00
// The counter does not exist yet, create it and allow it to be deleted
2020-08-11 11:17:43 -05:00
} else {
c := prometheus.NewCounter(prometheus.CounterOpts{
Name: key,
Help: key,
ConstLabels: prometheusLabels(labels),
})
2020-08-11 11:17:43 -05:00
c.Add(float64(val))
2020-11-05 11:51:58 -08:00
pc = &counter{
Counter: c,
updatedAt: time.Now(),
canDelete: true,
2020-08-11 11:17:43 -05:00
}
p.counters.Store(hash, pc)
}
}
2020-11-05 11:51:58 -08:00
// PrometheusPushSink wraps a normal prometheus sink and provides an address and facilities to export it to an address
// on an interval.
type PrometheusPushSink struct {
*PrometheusSink
pusher *push.Pusher
address string
pushInterval time.Duration
stopChan chan struct{}
}
2020-11-05 11:51:58 -08:00
// NewPrometheusPushSink creates a PrometheusPushSink by taking an address, interval, and destination name.
func NewPrometheusPushSink(address string, pushInterval time.Duration, name string) (*PrometheusPushSink, error) {
promSink := &PrometheusSink{
gauges: sync.Map{},
summaries: sync.Map{},
counters: sync.Map{},
expiration: 60 * time.Second,
}
pusher := push.New(address, name).Collector(promSink)
sink := &PrometheusPushSink{
promSink,
pusher,
address,
2020-11-05 11:51:58 -08:00
pushInterval,
make(chan struct{}),
}
sink.flushMetrics()
return sink, nil
}
// flushMetrics starts a background goroutine that pushes the sink's metrics
// every pushInterval until stopChan is closed.
//
// A failed push is logged, and the loop carries on with the next tick. The
// ticker is created before the goroutine starts and stopped when the loop
// exits.
func (s *PrometheusPushSink) flushMetrics() {
	ticker := time.NewTicker(s.pushInterval)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-s.stopChan:
				return
			case <-ticker.C:
				if err := s.pusher.Push(); err != nil {
					log.Printf("[ERR] Error pushing to Prometheus! Err: %s", err)
				}
			}
		}
	}()
}
// Shutdown stops the background push loop started by NewPrometheusPushSink.
//
// A repeated call is a no-op rather than a panic from closing an
// already-closed channel.
//
// NOTE(review): this guard does not make concurrent Shutdown calls safe; two
// simultaneous callers can still both reach close.
func (s *PrometheusPushSink) Shutdown() {
	select {
	case <-s.stopChan:
		// Already shut down.
	default:
		close(s.stopChan)
	}
}