emit metrics for global rate limiting (#15891)

This commit is contained in:
Semir Patel 2023-01-06 17:49:33 -06:00 committed by GitHub
parent 8bfdc0c02f
commit 40c0bb24ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 197 additions and 141 deletions

View File

@ -9,9 +9,9 @@ import (
"reflect"
"sync/atomic"
"github.com/hashicorp/go-hclog"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/consul/multilimiter"
"github.com/hashicorp/go-hclog"
)
var (
@ -214,6 +214,21 @@ func (h *Handler) Allow(op Operation) error {
"limit_enforced", enforced,
)
metrics.IncrCounterWithLabels([]string{"consul", "rate_limit"}, 1, []metrics.Label{
{
Name: "limit_type",
Value: l.desc,
},
{
Name: "op",
Value: op.Name,
},
{
Name: "mode",
Value: l.mode.String(),
},
})
if enforced {
// TODO(NET-1382) - use the logger to print rate limiter logs.
if h.leaderStatusProvider.IsLeader() && op.Type == OperationTypeWrite {

View File

@ -13,6 +13,7 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/consul/multilimiter"
"github.com/hashicorp/consul/agent/metrics"
)
//
@ -48,6 +49,9 @@ func TestHandler(t *testing.T) {
isLeader bool
expectErr error
expectLog bool
expectMetric bool
expectMetricName string
expectMetricCount float64
}{
"operation exempt from limiting": {
op: Operation{
@ -59,6 +63,7 @@ func TestHandler(t *testing.T) {
checks: []limitCheck{},
expectErr: nil,
expectLog: false,
expectMetric: false,
},
"global write limit disabled": {
op: Operation{
@ -70,6 +75,7 @@ func TestHandler(t *testing.T) {
checks: []limitCheck{},
expectErr: nil,
expectLog: false,
expectMetric: false,
},
"global write limit within allowance": {
op: Operation{
@ -83,6 +89,7 @@ func TestHandler(t *testing.T) {
},
expectErr: nil,
expectLog: false,
expectMetric: false,
},
"global write limit exceeded (permissive)": {
op: Operation{
@ -96,6 +103,9 @@ func TestHandler(t *testing.T) {
},
expectErr: nil,
expectLog: true,
expectMetric: true,
expectMetricName: "consul.rate_limit;limit_type=global/write;op=Foo.Bar;mode=permissive",
expectMetricCount: 1,
},
"global write limit exceeded (enforcing, leader)": {
op: Operation{
@ -110,6 +120,9 @@ func TestHandler(t *testing.T) {
isLeader: true,
expectErr: ErrRetryLater,
expectLog: true,
expectMetric: true,
expectMetricName: "consul.rate_limit;limit_type=global/write;op=Foo.Bar;mode=enforcing",
expectMetricCount: 1,
},
"global write limit exceeded (enforcing, follower)": {
op: Operation{
@ -124,6 +137,9 @@ func TestHandler(t *testing.T) {
isLeader: false,
expectErr: ErrRetryElsewhere,
expectLog: true,
expectMetric: true,
expectMetricName: "consul.rate_limit;limit_type=global/write;op=Foo.Bar;mode=enforcing",
expectMetricCount: 1,
},
"global read limit disabled": {
op: Operation{
@ -135,6 +151,7 @@ func TestHandler(t *testing.T) {
checks: []limitCheck{},
expectErr: nil,
expectLog: false,
expectMetric: false,
},
"global read limit within allowance": {
op: Operation{
@ -148,6 +165,7 @@ func TestHandler(t *testing.T) {
},
expectErr: nil,
expectLog: false,
expectMetric: false,
},
"global read limit exceeded (permissive)": {
op: Operation{
@ -161,6 +179,9 @@ func TestHandler(t *testing.T) {
},
expectErr: nil,
expectLog: true,
expectMetric: true,
expectMetricName: "consul.rate_limit;limit_type=global/read;op=Foo.Bar;mode=permissive",
expectMetricCount: 1,
},
"global read limit exceeded (enforcing, leader)": {
op: Operation{
@ -175,6 +196,9 @@ func TestHandler(t *testing.T) {
isLeader: true,
expectErr: ErrRetryElsewhere,
expectLog: true,
expectMetric: true,
expectMetricName: "consul.rate_limit;limit_type=global/read;op=Foo.Bar;mode=enforcing",
expectMetricCount: 1,
},
"global read limit exceeded (enforcing, follower)": {
op: Operation{
@ -189,10 +213,14 @@ func TestHandler(t *testing.T) {
isLeader: false,
expectErr: ErrRetryElsewhere,
expectLog: true,
expectMetric: true,
expectMetricName: "consul.rate_limit;limit_type=global/read;op=Foo.Bar;mode=enforcing",
expectMetricCount: 1,
},
}
for desc, tc := range testCases {
t.Run(desc, func(t *testing.T) {
sink := metrics.TestSetupMetrics(t, "")
limiter := newMockLimiter(t)
limiter.On("UpdateConfig", mock.Anything, mock.Anything).Return()
for _, c := range tc.checks {
@ -224,6 +252,10 @@ func TestHandler(t *testing.T) {
} else {
require.Zero(t, output.Len(), "expected no logs to be emitted")
}
if tc.expectMetric {
metrics.AssertCounter(t, sink, tc.expectMetricName, tc.expectMetricCount)
}
})
}
}

101
agent/metrics/testing.go Normal file
View File

@ -0,0 +1,101 @@
package metrics
import (
"bytes"
"fmt"
"testing"
"time"
"github.com/armon/go-metrics"
"github.com/stretchr/testify/assert"
)
// TestSetupMetrics installs a fresh in-memory sink as the global metrics
// instance and returns it so tests can assert on emitted metrics.
// Tests using this must not enable t.Parallel() because the global metrics
// instance is shared process-wide.
func TestSetupMetrics(t *testing.T, serviceName string) *metrics.InmemSink {
	// Retain samples for ages (5 mins) so assertions won't fail on silly
	// long test runs due to dropped data.
	sink := metrics.NewInmemSink(10*time.Second, 300*time.Second)

	conf := metrics.DefaultConfig(serviceName)
	conf.EnableHostname = false
	conf.EnableRuntimeMetrics = false
	metrics.NewGlobal(conf, sink)

	return sink
}
// AssertCounter fails the test unless the counter metric with the given name
// has accumulated exactly value across every interval recorded by sink.
//
// InmemSink stores counters as SampledValues and treats the Sum (not the
// Count) as the metric's value, so totals are computed from Sum.
func AssertCounter(t *testing.T, sink *metrics.InmemSink, name string, value float64) {
	t.Helper()

	intervals := sink.Data()

	var total float64
	for _, intv := range intervals {
		intv.RLock()
		if c, ok := intv.Counters[name]; ok {
			total += c.Sum
		}
		intv.RUnlock()
	}

	if assert.Equal(t, value, total) {
		return
	}

	// On failure, dump everything the sink recorded to aid debugging. This
	// mirrors the private routine InmemSink uses to dump to stdout on SIGUSR1.
	var buf bytes.Buffer
	for _, intv := range intervals {
		intv.RLock()
		for n, g := range intv.Gauges {
			fmt.Fprintf(&buf, "[%v][G] '%s': %0.3f\n", intv.Interval, n, g.Value)
		}
		for n, pts := range intv.Points {
			for _, p := range pts {
				fmt.Fprintf(&buf, "[%v][P] '%s': %0.3f\n", intv.Interval, n, p)
			}
		}
		for n, c := range intv.Counters {
			fmt.Fprintf(&buf, "[%v][C] '%s': %s\n", intv.Interval, n, c.AggregateSample)
		}
		for n, s := range intv.Samples {
			fmt.Fprintf(&buf, "[%v][S] '%s': %s\n", intv.Interval, n, s.AggregateSample)
		}
		intv.RUnlock()
	}
	t.Log(buf.String())
}
// AssertGauge fails the test unless the gauge metric with the given name
// reports value in the most recent non-empty interval.
//
// Intervals are scanned backward until one contains gauge data: a gauge may
// be recorded into one interval while the assertion runs during the next, so
// this avoids flakiness at interval boundaries.
func AssertGauge(t *testing.T, sink *metrics.InmemSink, name string, value float32) {
	t.Helper()

	intervals := sink.Data()

	var got float32
	for i := len(intervals) - 1; i >= 0; i-- {
		intv := intervals[i]
		intv.RLock()
		if len(intv.Gauges) > 0 {
			got = intv.Gauges[name].Value
			intv.RUnlock()
			break
		}
		intv.RUnlock()
	}

	if assert.Equal(t, value, got) {
		return
	}

	// On failure, dump every recorded gauge to aid debugging.
	var buf bytes.Buffer
	for _, intv := range intervals {
		intv.RLock()
		for n, g := range intv.Gauges {
			fmt.Fprintf(&buf, "[%v][G] '%s': %0.3f\n", intv.Interval, n, g.Value)
		}
		intv.RUnlock()
	}
	t.Log(buf.String())
}

View File

@ -1,113 +1,21 @@
package proxy
import (
"bytes"
"context"
"fmt"
"net"
"testing"
"time"
"github.com/hashicorp/consul/connect"
metrics "github.com/armon/go-metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
agConnect "github.com/hashicorp/consul/agent/connect"
agMetrics "github.com/hashicorp/consul/agent/metrics"
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/sdk/freeport"
"github.com/hashicorp/consul/sdk/testutil"
)
// testSetupMetrics installs a fresh in-memory sink as the global metrics
// instance (service name "consul.proxy.test") and returns it so tests can
// assert on emitted metrics. Incompatible with t.Parallel().
func testSetupMetrics(t *testing.T) *metrics.InmemSink {
	// Retain samples for ages (5 mins) so assertions won't fail on silly
	// long test runs due to dropped data.
	sink := metrics.NewInmemSink(10*time.Second, 300*time.Second)

	conf := metrics.DefaultConfig("consul.proxy.test")
	conf.EnableHostname = false
	conf.EnableRuntimeMetrics = false
	metrics.NewGlobal(conf, sink)

	return sink
}
// assertCurrentGaugeValue asserts that the gauge metric named name reports
// value in the most recent non-empty interval of the given in-memory sink.
// On mismatch it logs every recorded gauge to aid debugging.
func assertCurrentGaugeValue(t *testing.T, sink *metrics.InmemSink,
	name string, value float32) {
	t.Helper()

	data := sink.Data()

	// Loop backward through intervals until there is a non-empty one
	// Addresses flakiness around recording to one interval but accessing during the next
	var got float32
	for i := len(data) - 1; i >= 0; i-- {
		currentInterval := data[i]
		currentInterval.RLock()
		if len(currentInterval.Gauges) > 0 {
			got = currentInterval.Gauges[name].Value
			currentInterval.RUnlock()
			break
		}
		currentInterval.RUnlock()
	}

	if !assert.Equal(t, value, got) {
		// Dump all gauges across all intervals so the failure is debuggable.
		buf := bytes.NewBuffer(nil)
		for _, intv := range data {
			intv.RLock()
			for name, val := range intv.Gauges {
				fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value)
			}
			intv.RUnlock()
		}
		t.Log(buf.String())
	}
}
// assertAllTimeCounterValue asserts that the counter metric named name has
// accumulated exactly value summed across every interval recorded by sink.
// On mismatch it dumps the sink's full contents to aid debugging.
func assertAllTimeCounterValue(t *testing.T, sink *metrics.InmemSink,
	name string, value float64) {
	t.Helper()

	data := sink.Data()

	var got float64
	for _, intv := range data {
		intv.RLock()
		// Note that InMemSink uses SampledValue and treats the _Sum_ not the Count
		// as the entire value.
		if sample, ok := intv.Counters[name]; ok {
			got += sample.Sum
		}
		intv.RUnlock()
	}

	if !assert.Equal(t, value, got) {
		// no nice way to dump this - this is copied from private method in
		// InMemSink used for dumping to stdout on SIGUSR1.
		buf := bytes.NewBuffer(nil)
		for _, intv := range data {
			intv.RLock()
			for name, val := range intv.Gauges {
				fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value)
			}
			for name, vals := range intv.Points {
				for _, val := range vals {
					fmt.Fprintf(buf, "[%v][P] '%s': %0.3f\n", intv.Interval, name, val)
				}
			}
			for name, agg := range intv.Counters {
				fmt.Fprintf(buf, "[%v][C] '%s': %s\n", intv.Interval, name, agg.AggregateSample)
			}
			for name, agg := range intv.Samples {
				fmt.Fprintf(buf, "[%v][S] '%s': %s\n", intv.Interval, name, agg.AggregateSample)
			}
			intv.RUnlock()
		}
		t.Log(buf.String())
	}
}
func TestPublicListener(t *testing.T) {
// Can't enable t.Parallel since we rely on the global metrics instance.
@ -125,7 +33,7 @@ func TestPublicListener(t *testing.T) {
}
// Setup metrics to test they are recorded
sink := testSetupMetrics(t)
sink := agMetrics.TestSetupMetrics(t, "consul.proxy.test")
svc := connect.TestService(t, "db", ca)
l := NewPublicListener(svc, cfg, testutil.Logger(t))
@ -150,14 +58,14 @@ func TestPublicListener(t *testing.T) {
TestEchoConn(t, conn, "")
// Check active conn is tracked in gauges
assertCurrentGaugeValue(t, sink, "consul.proxy.test.inbound.conns;dst=db", 1)
agMetrics.AssertGauge(t, sink, "consul.proxy.test.inbound.conns;dst=db", 1)
// Close listener to ensure all conns are closed and have reported their metrics
l.Close()
// Check all the tx/rx counters got added
assertAllTimeCounterValue(t, sink, "consul.proxy.test.inbound.tx_bytes;dst=db", 11)
assertAllTimeCounterValue(t, sink, "consul.proxy.test.inbound.rx_bytes;dst=db", 11)
agMetrics.AssertCounter(t, sink, "consul.proxy.test.inbound.tx_bytes;dst=db", 11)
agMetrics.AssertCounter(t, sink, "consul.proxy.test.inbound.rx_bytes;dst=db", 11)
}
func TestUpstreamListener(t *testing.T) {
@ -183,7 +91,7 @@ func TestUpstreamListener(t *testing.T) {
}
// Setup metrics to test they are recorded
sink := testSetupMetrics(t)
sink := agMetrics.TestSetupMetrics(t, "consul.proxy.test")
svc := connect.TestService(t, "web", ca)
@ -214,12 +122,12 @@ func TestUpstreamListener(t *testing.T) {
TestEchoConn(t, conn, "")
// Check active conn is tracked in gauges
assertCurrentGaugeValue(t, sink, "consul.proxy.test.upstream.conns;src=web;dst_type=service;dst=db", 1)
agMetrics.AssertGauge(t, sink, "consul.proxy.test.upstream.conns;src=web;dst_type=service;dst=db", 1)
// Close listener to ensure all conns are closed and have reported their metrics
l.Close()
// Check all the tx/rx counters got added
assertAllTimeCounterValue(t, sink, "consul.proxy.test.upstream.tx_bytes;src=web;dst_type=service;dst=db", 11)
assertAllTimeCounterValue(t, sink, "consul.proxy.test.upstream.rx_bytes;src=web;dst_type=service;dst=db", 11)
agMetrics.AssertCounter(t, sink, "consul.proxy.test.upstream.tx_bytes;src=web;dst_type=service;dst=db", 11)
agMetrics.AssertCounter(t, sink, "consul.proxy.test.upstream.rx_bytes;src=web;dst_type=service;dst=db", 11)
}