408 lines
9.3 KiB
Go

// Copyright 2016 Circonus, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package circonusgometrics provides instrumentation for your applications in the form
// of counters, gauges and histograms and allows you to publish them to
// Circonus
//
// Counters
//
// A counter is a monotonically-increasing, unsigned, 64-bit integer used to
// represent the number of times an event has occurred. By tracking the deltas
// between measurements of a counter over intervals of time, an aggregation
// layer can derive rates, acceleration, etc.
//
// Gauges
//
// A gauge returns instantaneous measurements of something using signed, 64-bit
// integers. This value does not need to be monotonic.
//
// Histograms
//
// A histogram tracks the distribution of a stream of values (e.g. the number of
// seconds it takes to handle requests). Circonus can calculate complex
// analytics on these.
//
// Reporting
//
// A periodic push to a Circonus httptrap is configurable.
package circonusgometrics
import (
"bufio"
"bytes"
"fmt"
"io/ioutil"
"log"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/circonus-labs/circonus-gometrics/api"
"github.com/circonus-labs/circonus-gometrics/checkmgr"
"github.com/pkg/errors"
)
const (
// defaultFlushInterval is the flush interval New uses when Config.Interval
// is empty. It is kept as a string because New parses it with
// time.ParseDuration, the same way it parses a user-supplied interval.
defaultFlushInterval = "10s" // 10 * time.Second
)
// Metric defines an individual metric. It is encoded to JSON with the field
// names "_type" and "_value".
type Metric struct {
// Type is the metric type code. packageMetrics emits "L" for counters,
// "n" for histograms, "s" for text, and the result of getGaugeType for
// gauges.
Type string `json:"_type"`
// Value is the metric's value; its concrete type varies by metric kind.
Value interface{} `json:"_value"`
}
// Metrics holds host metrics, keyed by metric name.
type Metrics map[string]Metric
// Config options for circonus-gometrics.
//
// The Reset* options and Interval are strings: New parses the former with
// strconv.ParseBool and the latter with time.ParseDuration, and returns an
// error if a non-empty value does not parse. An empty string selects the
// default.
type Config struct {
// Log is the logger to use. When nil, New installs a logger writing to
// stderr if Debug is set, otherwise one that discards all output.
Log *log.Logger
// Debug enables debug logging.
Debug bool
ResetCounters string // reset/delete counters on flush (default true)
ResetGauges string // reset/delete gauges on flush (default true)
ResetHistograms string // reset/delete histograms on flush (default true)
ResetText string // reset/delete text on flush (default true)
// API, Check and Broker configuration options.
// Note: New overwrites CheckManager.Debug and CheckManager.Log with the
// effective Debug and Log values before creating the check manager.
CheckManager checkmgr.Config
// How frequently to submit metrics to Circonus, as a duration string
// (e.g. "10s"); default 10 seconds.
// Set to 0 to disable automatic flushes and call Flush manually.
Interval string
}
// prevMetrics records the most recently packaged set of metrics together with
// the time it was recorded. packageMetrics writes it and PromOutput reads it.
type prevMetrics struct {
// metrics is the last packaged set; nil until one has been stored.
metrics *Metrics
// metricsmu guards metrics and ts.
metricsmu sync.Mutex
// ts is the time at which metrics was stored.
ts time.Time
}
// CirconusMetrics holds the state of one metrics instance: logging, reset and
// flush settings, the check manager, and the registered metrics of each kind.
// Instances are created with New.
type CirconusMetrics struct {
// Log is never nil on an instance returned by New.
Log *log.Logger
// Debug enables "[DEBUG]" log messages.
Debug bool
// Reset settings parsed from Config by New (each defaults to true).
// NOTE(review): the code that consumes them is outside this section.
resetCounters bool
resetGauges bool
resetHistograms bool
resetText bool
// flushInterval is the automatic flush period; New only starts the
// background flusher when it is greater than zero.
flushInterval time.Duration
// flushing is true while Flush or FlushMetrics is running; flushmu guards it.
flushing bool
flushmu sync.Mutex
// packagingmu serializes calls to packageMetrics.
packagingmu sync.Mutex
// check is the check manager created by New from Config.CheckManager.
check *checkmgr.CheckManager
// lastMetrics holds the most recently packaged metrics (read by PromOutput).
lastMetrics *prevMetrics
// Metric stores. Each map is followed by a mutex that presumably guards it.
// NOTE(review): the locking code is not visible in this section — confirm.
counters map[string]uint64
cm sync.Mutex
counterFuncs map[string]func() uint64
cfm sync.Mutex
gauges map[string]interface{}
gm sync.Mutex
gaugeFuncs map[string]func() int64
gfm sync.Mutex
histograms map[string]*Histogram
hm sync.Mutex
text map[string]string
tm sync.Mutex
textFuncs map[string]func() string
tfm sync.Mutex
}
// NewCirconusMetrics returns a CirconusMetrics instance. It is equivalent to
// New: it passes cfg straight through and returns New's results unchanged.
func NewCirconusMetrics(cfg *Config) (*CirconusMetrics, error) {
return New(cfg)
}
// New returns a CirconusMetrics instance built from cfg.
//
// It returns an error when cfg is nil, when Interval or any non-empty Reset*
// option fails to parse, or when the check manager cannot be created. Empty
// options fall back to their defaults: a 10s flush interval and resets
// enabled.
//
// Side effects: cfg.CheckManager.Debug and cfg.CheckManager.Log are
// overwritten with the effective settings, check initialization is started,
// and, when the flush interval is greater than zero, a goroutine is started
// that calls Flush on every tick. That goroutine is never stopped.
func New(cfg *Config) (*CirconusMetrics, error) {
	if cfg == nil {
		return nil, errors.New("invalid configuration (nil)")
	}

	cm := &CirconusMetrics{
		counters:     make(map[string]uint64),
		counterFuncs: make(map[string]func() uint64),
		gauges:       make(map[string]interface{}),
		gaugeFuncs:   make(map[string]func() int64),
		histograms:   make(map[string]*Histogram),
		text:         make(map[string]string),
		textFuncs:    make(map[string]func() string),
		lastMetrics:  &prevMetrics{},
	}

	// Logging: without a caller-supplied logger, debug output goes to stderr
	// and everything else is discarded.
	cm.Debug = cfg.Debug
	cm.Log = cfg.Log
	if cm.Log == nil {
		sink := ioutil.Discard
		if cm.Debug {
			sink = os.Stderr
		}
		cm.Log = log.New(sink, "", log.LstdFlags)
	}

	// Flush interval: an empty setting selects the default.
	interval := cfg.Interval
	if interval == "" {
		interval = defaultFlushInterval
	}
	dur, err := time.ParseDuration(interval)
	if err != nil {
		return nil, errors.Wrap(err, "parsing flush interval")
	}
	cm.flushInterval = dur

	// Metric resets: each defaults to true and is overridden only by a
	// non-empty, parseable setting. Checked in declaration order so the first
	// invalid setting is the one reported.
	resets := []struct {
		raw    string
		target *bool
		errMsg string
	}{
		{cfg.ResetCounters, &cm.resetCounters, "parsing reset counters"},
		{cfg.ResetGauges, &cm.resetGauges, "parsing reset gauges"},
		{cfg.ResetHistograms, &cm.resetHistograms, "parsing reset histograms"},
		{cfg.ResetText, &cm.resetText, "parsing reset text"},
	}
	for _, r := range resets {
		*r.target = true
		if r.raw == "" {
			continue
		}
		parsed, perr := strconv.ParseBool(r.raw)
		if perr != nil {
			return nil, errors.Wrap(perr, r.errMsg)
		}
		*r.target = parsed
	}

	// Check manager: shares this instance's debug flag and logger.
	cfg.CheckManager.Debug = cm.Debug
	cfg.CheckManager.Log = cm.Log
	check, err := checkmgr.New(&cfg.CheckManager)
	if err != nil {
		return nil, errors.Wrap(err, "creating new check manager")
	}
	cm.check = check

	// start background initialization
	cm.check.Initialize()

	// if automatic flush is enabled, start it.
	// NOTE: submit will jettison metrics until initialization has completed.
	if cm.flushInterval > 0 {
		go func() {
			ticker := time.NewTicker(cm.flushInterval)
			for range ticker.C {
				cm.Flush()
			}
		}()
	}

	return cm, nil
}
// Start does nothing.
//
// Deprecated: Start is a NOP; automatic flush is started in New if the flush
// interval is > 0.
func (m *CirconusMetrics) Start() {
// nop
}
// Ready returns true or false indicating if the check is ready to accept
// metrics. The answer comes directly from the check manager's IsReady.
func (m *CirconusMetrics) Ready() bool {
return m.check.IsReady()
}
// packageMetrics converts the values returned by snapshot into a Metrics map
// and records that map, with the current time, in lastMetrics.
//
// A metric is included only if the check manager reports it active or agrees
// to activate it. Metrics activated during this call are also returned, keyed
// by name, as check bundle metrics with status "active". Calls are serialized
// by packagingmu.
func (m *CirconusMetrics) packageMetrics() (map[string]*api.CheckBundleMetric, Metrics) {
	m.packagingmu.Lock()
	defer m.packagingmu.Unlock()

	if m.Debug {
		m.Log.Println("[DEBUG] Packaging metrics")
	}

	counters, gauges, histograms, text := m.snapshot()
	total := len(counters) + len(gauges) + len(histograms) + len(text)

	activated := make(map[string]*api.CheckBundleMetric)
	packaged := make(Metrics, total)

	// wanted reports whether the named metric should be sent. An inactive
	// metric is offered to the check manager for activation; if accepted it
	// is recorded in activated with the given bundle type.
	wanted := func(name, bundleType string) bool {
		if m.check.IsMetricActive(name) {
			return true
		}
		if !m.check.ActivateMetric(name) {
			return false
		}
		activated[name] = &api.CheckBundleMetric{
			Name:   name,
			Type:   bundleType,
			Status: "active",
		}
		return true
	}

	for name, count := range counters {
		if wanted(name, "numeric") {
			packaged[name] = Metric{Type: "L", Value: count}
		}
	}

	for name, reading := range gauges {
		if wanted(name, "numeric") {
			packaged[name] = Metric{Type: m.getGaugeType(reading), Value: reading}
		}
	}

	for name, hist := range histograms {
		if wanted(name, "histogram") {
			packaged[name] = Metric{Type: "n", Value: hist.DecStrings()}
		}
	}

	for name, str := range text {
		if wanted(name, "text") {
			packaged[name] = Metric{Type: "s", Value: str}
		}
	}

	// Remember this set so PromOutput can serve it later.
	prev := m.lastMetrics
	prev.metricsmu.Lock()
	prev.metrics = &packaged
	prev.ts = time.Now()
	prev.metricsmu.Unlock()

	return activated, packaged
}
// PromOutput returns lines of metrics in prom format, built from the most
// recently packaged metrics.
//
// Each emitted line is "<name> <value> <timestamp>", where the timestamp is
// the time the metrics were packaged, in milliseconds since the Unix epoch.
// Text metrics (type "s") are skipped, as are type "n" metrics whose
// formatted value starts with "[H[" (Circonus histograms). Lines follow map
// iteration order, so their order is unspecified.
//
// It returns an error if no metrics have been packaged yet or if flushing the
// output buffer fails.
func (m *CirconusMetrics) PromOutput() (*bytes.Buffer, error) {
	last := m.lastMetrics
	last.metricsmu.Lock()
	defer last.metricsmu.Unlock()

	if last.metrics == nil {
		return nil, errors.New("no metrics available")
	}

	var buf bytes.Buffer
	out := bufio.NewWriter(&buf)
	stamp := last.ts.UnixNano() / int64(time.Millisecond)

	for name, metric := range *last.metrics {
		if metric.Type == "s" {
			continue // text metrics unsupported
		}
		if metric.Type == "n" && strings.HasPrefix(fmt.Sprintf("%v", metric.Value), "[H[") {
			continue // circonus histogram != prom "histogram" (aka percentile)
		}
		fmt.Fprintf(out, "%s %v %d\n", name, metric.Value, stamp)
	}

	if err := out.Flush(); err != nil {
		return nil, errors.Wrap(err, "flushing metric buffer")
	}
	return &buf, nil
}
// FlushMetrics flushes current metrics to a structure and returns it (does
// NOT send to Circonus).
//
// Only one flush runs at a time: if a flush (Flush or FlushMetrics) is already
// in progress, an empty, non-nil Metrics is returned immediately. The
// in-progress flag is cleared in a deferred function so that a panic while
// packaging cannot leave it set and silently turn every later flush into a
// no-op.
func (m *CirconusMetrics) FlushMetrics() *Metrics {
	m.flushmu.Lock()
	if m.flushing {
		m.flushmu.Unlock()
		return &Metrics{}
	}
	m.flushing = true
	m.flushmu.Unlock()

	// Registered only after this call has claimed the flag, so the early
	// return above never clears a flag owned by another flush.
	defer func() {
		m.flushmu.Lock()
		m.flushing = false
		m.flushmu.Unlock()
	}()

	_, output := m.packageMetrics()
	return &output
}
// Flush kicks off the process of sending metrics to Circonus: it packages the
// current metrics and, when there is at least one, passes them to submit.
//
// Only one flush runs at a time: if a flush (Flush or FlushMetrics) is already
// in progress, the call returns immediately without doing anything. The
// in-progress flag is cleared in a deferred function so that a panic during
// packaging or submission cannot leave it set and silently disable every
// later flush, including the automatic ones.
func (m *CirconusMetrics) Flush() {
	m.flushmu.Lock()
	if m.flushing {
		m.flushmu.Unlock()
		return
	}
	m.flushing = true
	m.flushmu.Unlock()

	// Registered only after this call has claimed the flag, so the early
	// return above never clears a flag owned by another flush.
	defer func() {
		m.flushmu.Lock()
		m.flushing = false
		m.flushmu.Unlock()
	}()

	newMetrics, output := m.packageMetrics()
	if len(output) == 0 {
		if m.Debug {
			m.Log.Println("[DEBUG] No metrics to send, skipping")
		}
		return
	}
	m.submit(output, newMetrics)
}