Merge pull request #8679 from hashicorp/streaming/fix-TestHandler_EmitsStats

streaming: Fix TestHandler_EmitsStats
This commit is contained in:
Daniel Nephin 2020-09-15 17:04:55 -04:00 committed by GitHub
commit d5edce269e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 142 additions and 49 deletions

View File

@ -15,7 +15,7 @@ func NewHandler(addr net.Addr) *Handler {
// We don't need to pass tls.Config to the server since it's multiplexed
// behind the RPC listener, which already has TLS configured.
srv := grpc.NewServer(
grpc.StatsHandler(&statsHandler{}),
grpc.StatsHandler(newStatsHandler()),
grpc.StreamInterceptor((&activeStreamCounter{}).Intercept),
)

View File

@ -75,6 +75,7 @@ func (m *Req) GetDatacenter() string {
type Resp struct {
ServerName string `protobuf:"bytes,1,opt,name=ServerName,proto3" json:"ServerName,omitempty"`
Datacenter string `protobuf:"bytes,2,opt,name=Datacenter,proto3" json:"Datacenter,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -120,6 +121,13 @@ func (m *Resp) GetServerName() string {
return ""
}
func (m *Resp) GetDatacenter() string {
if m != nil {
return m.Datacenter
}
return ""
}
func init() {
proto.RegisterType((*Req)(nil), "testservice.Req")
proto.RegisterType((*Resp)(nil), "testservice.Resp")
@ -130,20 +138,20 @@ func init() {
}
var fileDescriptor_3009a77c573f826d = []byte{
// 200 bytes of a gzipped FileDescriptorProto
// 206 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x49, 0x4c, 0x4f, 0xcd,
0x2b, 0xd1, 0x4f, 0x2f, 0x2a, 0x48, 0xd6, 0xcf, 0xcc, 0x2b, 0x49, 0x2d, 0xca, 0x4b, 0xcc, 0xd1,
0x2f, 0x49, 0x2d, 0x2e, 0x29, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0xd5, 0x2f, 0xce, 0xcc, 0x2d,
0xc8, 0x49, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x46, 0x92, 0x51, 0x52, 0xe5, 0x62,
0x0e, 0x4a, 0x2d, 0x14, 0x92, 0xe3, 0xe2, 0x72, 0x49, 0x2c, 0x49, 0x4c, 0x4e, 0x05, 0xe9, 0x96,
0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x42, 0x12, 0x51, 0x52, 0xe3, 0x62, 0x09, 0x4a, 0x2d, 0x2e,
0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x42, 0x12, 0x51, 0x72, 0xe3, 0x62, 0x09, 0x4a, 0x2d, 0x2e,
0x00, 0xa9, 0x0b, 0x4e, 0x2d, 0x2a, 0x4b, 0x2d, 0xf2, 0x4b, 0xcc, 0x4d, 0x85, 0xa9, 0x43, 0x88,
0x18, 0xe5, 0x72, 0xb1, 0x05, 0x83, 0xed, 0x12, 0x32, 0xe2, 0xe2, 0x0c, 0xce, 0xcf, 0x4d, 0x2d,
0xc9, 0xc8, 0xcc, 0x4b, 0x17, 0x12, 0xd0, 0x43, 0xb2, 0x53, 0x2f, 0x28, 0xb5, 0x50, 0x4a, 0x10,
0x4d, 0xa4, 0xb8, 0x40, 0x89, 0x41, 0x48, 0x9f, 0x8b, 0xc5, 0x2d, 0x27, 0xbf, 0x9c, 0x48, 0xe5,
0x06, 0x8c, 0x4e, 0x02, 0x27, 0x1e, 0xc9, 0x31, 0x5e, 0x78, 0x24, 0xc7, 0xf8, 0xe0, 0x91, 0x1c,
0xe3, 0x8c, 0xc7, 0x72, 0x0c, 0x49, 0x6c, 0x60, 0x3f, 0x1a, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff,
0x61, 0xd3, 0x5e, 0xba, 0x13, 0x01, 0x00, 0x00,
0xa0, 0x99, 0xc3, 0x84, 0x6e, 0x8e, 0x51, 0x2e, 0x17, 0x5b, 0x30, 0xd8, 0x2d, 0x42, 0x46, 0x5c,
0x9c, 0xc1, 0xf9, 0xb9, 0xa9, 0x25, 0x19, 0x99, 0x79, 0xe9, 0x42, 0x02, 0x7a, 0x48, 0x6e, 0xd2,
0x0b, 0x4a, 0x2d, 0x94, 0x12, 0x44, 0x13, 0x29, 0x2e, 0x50, 0x62, 0x10, 0xd2, 0xe7, 0x62, 0x71,
0xcb, 0xc9, 0x2f, 0x27, 0x52, 0xb9, 0x01, 0xa3, 0x93, 0xc0, 0x89, 0x47, 0x72, 0x8c, 0x17, 0x1e,
0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0x38, 0xe3, 0xb1, 0x1c, 0x43, 0x12, 0x1b, 0x38, 0x0c, 0x8c,
0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0xe7, 0x4b, 0x16, 0x40, 0x33, 0x01, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -348,6 +356,13 @@ func (m *Resp) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Datacenter) > 0 {
i -= len(m.Datacenter)
copy(dAtA[i:], m.Datacenter)
i = encodeVarintSimple(dAtA, i, uint64(len(m.Datacenter)))
i--
dAtA[i] = 0x12
}
if len(m.ServerName) > 0 {
i -= len(m.ServerName)
copy(dAtA[i:], m.ServerName)
@ -395,6 +410,10 @@ func (m *Resp) Size() (n int) {
if l > 0 {
n += 1 + l + sovSimple(uint64(l))
}
l = len(m.Datacenter)
if l > 0 {
n += 1 + l + sovSimple(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
@ -554,6 +573,38 @@ func (m *Resp) Unmarshal(dAtA []byte) error {
}
m.ServerName = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Datacenter", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSimple
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthSimple
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthSimple
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Datacenter = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipSimple(dAtA[iNdEx:])

View File

@ -14,4 +14,5 @@ message Req {
message Resp {
string ServerName = 1;
string Datacenter = 2;
}

28
agent/grpc/server_test.go Normal file
View File

@ -0,0 +1,28 @@
package grpc
import (
"context"
"time"
"github.com/hashicorp/consul/agent/grpc/internal/testservice"
)
type simple struct {
name string
dc string
}
func (s *simple) Flow(_ *testservice.Req, flow testservice.Simple_FlowServer) error {
for flow.Context().Err() == nil {
resp := &testservice.Resp{ServerName: "one", Datacenter: s.dc}
if err := flow.Send(resp); err != nil {
return err
}
time.Sleep(time.Millisecond)
}
return nil
}
func (s *simple) Something(_ context.Context, _ *testservice.Req) (*testservice.Resp, error) {
return &testservice.Resp{ServerName: s.name, Datacenter: s.dc}, nil
}

View File

@ -9,12 +9,19 @@ import (
"google.golang.org/grpc/stats"
)
var defaultMetrics = metrics.Default()
// statsHandler is a grpc/stats.StatsHandler which emits connection and
// request metrics to go-metrics.
type statsHandler struct {
metrics *metrics.Metrics
activeConns uint64 // must be 8-byte aligned for atomic access
}
func newStatsHandler() *statsHandler {
return &statsHandler{metrics: defaultMetrics}
}
// TagRPC implements grpcStats.StatsHandler
func (c *statsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
// No-op
@ -29,7 +36,7 @@ func (c *statsHandler) HandleRPC(_ context.Context, s stats.RPCStats) {
}
switch s.(type) {
case *stats.InHeader:
metrics.IncrCounter([]string{"grpc", label, "request"}, 1)
c.metrics.IncrCounter([]string{"grpc", label, "request"}, 1)
}
}
@ -53,7 +60,7 @@ func (c *statsHandler) HandleConn(_ context.Context, s stats.ConnStats) {
// Decrement!
count = atomic.AddUint64(&c.activeConns, ^uint64(0))
}
metrics.SetGauge([]string{"grpc", label, "active_conns"}, float32(count))
c.metrics.SetGauge([]string{"grpc", label, "active_conns"}, float32(count))
}
type activeStreamCounter struct {
@ -71,10 +78,10 @@ func (i *activeStreamCounter) Intercept(
handler grpc.StreamHandler,
) error {
count := atomic.AddUint64(&i.count, 1)
metrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count))
defaultMetrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count))
defer func() {
count := atomic.AddUint64(&i.count, ^uint64(0))
metrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count))
defaultMetrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count))
}()
return handler(srv, ss)

View File

@ -8,6 +8,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/grpc/internal/testservice"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
@ -18,12 +19,11 @@ func TestHandler_EmitsStats(t *testing.T) {
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
handler := NewHandler(addr)
testservice.RegisterSimpleServer(handler.srv, &simple{})
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
defer lis.Close()
t.Cleanup(logError(t, lis.Close))
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
@ -43,7 +43,7 @@ func TestHandler_EmitsStats(t *testing.T) {
conn, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithInsecure())
require.NoError(t, err)
defer conn.Close()
t.Cleanup(logError(t, conn.Close))
client := testservice.NewSimpleClient(conn)
fClient, err := client.Flow(ctx, &testservice.Req{Datacenter: "mine"})
@ -53,36 +53,24 @@ func TestHandler_EmitsStats(t *testing.T) {
_, err = fClient.Recv()
require.NoError(t, err)
cancel()
// Wait for the server to stop so that active_streams is predictable.
retry.RunWith(fastRetry, t, func(r *retry.R) {
expectedGauge := []metricCall{
{key: []string{"testing", "grpc", "server", "active_conns"}, val: 1},
{key: []string{"testing", "grpc", "server", "active_streams"}, val: 1},
{key: []string{"testing", "grpc", "server", "active_streams"}, val: 0},
}
require.Equal(r, expectedGauge, sink.gaugeCalls)
})
expectedCounter := []metricCall{
{key: []string{"testing", "grpc", "server", "request"}, val: 1},
}
require.Equal(t, expectedCounter, sink.incrCounterCalls)
expectedGauge := []metricCall{
{key: []string{"testing", "grpc", "server", "active_conns"}, val: 1},
{key: []string{"testing", "grpc", "server", "active_streams"}, val: 1},
// TODO: why is the count reset to 0 before the client receives the second message?
{key: []string{"testing", "grpc", "server", "active_streams"}, val: 0},
}
require.Equal(t, expectedGauge, sink.gaugeCalls)
}
type simple struct {
name string
}
func (s *simple) Flow(_ *testservice.Req, flow testservice.Simple_FlowServer) error {
if err := flow.Send(&testservice.Resp{ServerName: "one"}); err != nil {
return err
}
if err := flow.Send(&testservice.Resp{ServerName: "two"}); err != nil {
return err
}
return nil
}
func (s *simple) Something(_ context.Context, _ *testservice.Req) (*testservice.Resp, error) {
return &testservice.Resp{ServerName: "the-fake-service-name"}, nil
}
var fastRetry = &retry.Timer{Timeout: 7 * time.Second, Wait: 2 * time.Millisecond}
func patchGlobalMetrics(t *testing.T) *fakeMetricsSink {
t.Helper()
@ -94,7 +82,8 @@ func patchGlobalMetrics(t *testing.T) *fakeMetricsSink {
ProfileInterval: time.Second, // Poll runtime every second
FilterDefault: true,
}
_, err := metrics.NewGlobal(cfg, sink)
var err error
defaultMetrics, err = metrics.New(cfg, sink)
require.NoError(t, err)
t.Cleanup(func() {
_, err = metrics.NewGlobal(cfg, &metrics.BlackholeSink{})
@ -122,3 +111,11 @@ type metricCall struct {
val float32
labels []metrics.Label
}
func logError(t *testing.T, f func() error) func() {
return func() {
if err := f(); err != nil {
t.Logf(err.Error())
}
}
}

2
go.mod
View File

@ -13,7 +13,7 @@ require (
github.com/NYTimes/gziphandler v1.0.1
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e
github.com/armon/go-metrics v0.3.4
github.com/armon/go-metrics v0.3.5-0.20200914211745-2bc64ebd2914
github.com/armon/go-radix v1.0.0
github.com/aws/aws-sdk-go v1.25.41
github.com/coredns/coredns v1.1.2

6
go.sum
View File

@ -34,7 +34,6 @@ github.com/Azure/go-autorest/tracing v0.5.0 h1:TRn4WjSnkcSy5AEG3pnbtFSwNtwzjr4VY
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/datadog-go v2.2.0+incompatible h1:V5BKkxACZLjzHjSgBbr2gvLA2Ae49yhc6CSY7MLy5k4=
github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/datadog-go v3.2.0+incompatible h1:qSG2N4FghB1He/r2mFrWKCaL7dXCilEuNEeAn20fdD4=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
@ -58,10 +57,9 @@ github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM=
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
github.com/armon/go-metrics v0.3.4 h1:Xqf+7f2Vhl9tsqDYmXhnXInUdcrtgpRNpIA15/uldSc=
github.com/armon/go-metrics v0.3.4/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
github.com/armon/go-metrics v0.3.5-0.20200914211745-2bc64ebd2914 h1:Yiw8vrY+7jX6pCOdAkIUNU8QBS9c6HJAct+K36MeANw=
github.com/armon/go-metrics v0.3.5-0.20200914211745-2bc64ebd2914/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=

View File

@ -28,6 +28,7 @@ type PrometheusOpts struct {
// Expiration is the duration a metric is valid for, after which it will be
// untracked. If the value is zero, a metric is never expired.
Expiration time.Duration
Registerer prometheus.Registerer
}
type PrometheusSink struct {
@ -67,7 +68,12 @@ func NewPrometheusSinkFrom(opts PrometheusOpts) (*PrometheusSink, error) {
expiration: opts.Expiration,
}
return sink, prometheus.Register(sink)
reg := opts.Registerer
if reg == nil {
reg = prometheus.DefaultRegisterer
}
return sink, reg.Register(sink)
}
// Describe is needed to meet the Collector interface.

View File

@ -6,7 +6,7 @@ import (
"sync/atomic"
"time"
"github.com/hashicorp/go-immutable-radix"
iradix "github.com/hashicorp/go-immutable-radix"
)
// Config is used to configure metrics settings
@ -48,6 +48,11 @@ func init() {
globalMetrics.Store(&Metrics{sink: &BlackholeSink{}})
}
// Default returns the shared global metrics instance.
func Default() *Metrics {
return globalMetrics.Load().(*Metrics)
}
// DefaultConfig provides a sane default configuration
func DefaultConfig(serviceName string) *Config {
c := &Config{

2
vendor/modules.txt vendored
View File

@ -32,7 +32,7 @@ github.com/NYTimes/gziphandler
github.com/StackExchange/wmi
# github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e
github.com/armon/circbuf
# github.com/armon/go-metrics v0.3.4
# github.com/armon/go-metrics v0.3.5-0.20200914211745-2bc64ebd2914
github.com/armon/go-metrics
github.com/armon/go-metrics/circonus
github.com/armon/go-metrics/datadog