From 636f76f6f144ab63fc417b552a3b80b6ec70492e Mon Sep 17 00:00:00 2001
From: Daniel Nephin
Date: Mon, 14 Sep 2020 13:16:13 -0400
Subject: [PATCH] agent/grpc: make TestHandler_EmitsStats predictable

Occasionally this test would flake. The flakes were fixed by:

1. Stopping the service and retrying to check on metrics. This way we
   also include the active_streams going to 0 in the metric calls.
2. Using a reference to the global Metrics. This way when other tests
   have background goroutines that are still shutting down, they won't
   emit metrics to the metric instance with the fake Sink. The stats
   test can patch the local reference to the global, so the existing
   statsHandlers will continue to emit to the global, but the stats test
   will send all metrics to the replacement.
---
 agent/grpc/handler.go     |  2 +-
 agent/grpc/server_test.go | 28 +++++++++++++++++++++
 agent/grpc/stats.go       | 15 ++++++++---
 agent/grpc/stats_test.go  | 53 ++++++++++++++++++---------------------
 4 files changed, 65 insertions(+), 33 deletions(-)
 create mode 100644 agent/grpc/server_test.go

diff --git a/agent/grpc/handler.go b/agent/grpc/handler.go
index ab23537ffa..c3af7f38c4 100644
--- a/agent/grpc/handler.go
+++ b/agent/grpc/handler.go
@@ -15,7 +15,7 @@ func NewHandler(addr net.Addr) *Handler {
 	// We don't need to pass tls.Config to the server since it's multiplexed
 	// behind the RPC listener, which already has TLS configured.
srv := grpc.NewServer( - grpc.StatsHandler(&statsHandler{}), + grpc.StatsHandler(newStatsHandler()), grpc.StreamInterceptor((&activeStreamCounter{}).Intercept), ) diff --git a/agent/grpc/server_test.go b/agent/grpc/server_test.go new file mode 100644 index 0000000000..b7843ff011 --- /dev/null +++ b/agent/grpc/server_test.go @@ -0,0 +1,28 @@ +package grpc + +import ( + "context" + "time" + + "github.com/hashicorp/consul/agent/grpc/internal/testservice" +) + +type simple struct { + name string + dc string +} + +func (s *simple) Flow(_ *testservice.Req, flow testservice.Simple_FlowServer) error { + for flow.Context().Err() == nil { + resp := &testservice.Resp{ServerName: "one", Datacenter: s.dc} + if err := flow.Send(resp); err != nil { + return err + } + time.Sleep(time.Millisecond) + } + return nil +} + +func (s *simple) Something(_ context.Context, _ *testservice.Req) (*testservice.Resp, error) { + return &testservice.Resp{ServerName: s.name, Datacenter: s.dc}, nil +} diff --git a/agent/grpc/stats.go b/agent/grpc/stats.go index cbf443878e..d25048110d 100644 --- a/agent/grpc/stats.go +++ b/agent/grpc/stats.go @@ -9,12 +9,19 @@ import ( "google.golang.org/grpc/stats" ) +var defaultMetrics = metrics.Default() + // statsHandler is a grpc/stats.StatsHandler which emits connection and // request metrics to go-metrics. 
type statsHandler struct { + metrics *metrics.Metrics activeConns uint64 // must be 8-byte aligned for atomic access } +func newStatsHandler() *statsHandler { + return &statsHandler{metrics: defaultMetrics} +} + // TagRPC implements grpcStats.StatsHandler func (c *statsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { // No-op @@ -29,7 +36,7 @@ func (c *statsHandler) HandleRPC(_ context.Context, s stats.RPCStats) { } switch s.(type) { case *stats.InHeader: - metrics.IncrCounter([]string{"grpc", label, "request"}, 1) + c.metrics.IncrCounter([]string{"grpc", label, "request"}, 1) } } @@ -53,7 +60,7 @@ func (c *statsHandler) HandleConn(_ context.Context, s stats.ConnStats) { // Decrement! count = atomic.AddUint64(&c.activeConns, ^uint64(0)) } - metrics.SetGauge([]string{"grpc", label, "active_conns"}, float32(count)) + c.metrics.SetGauge([]string{"grpc", label, "active_conns"}, float32(count)) } type activeStreamCounter struct { @@ -71,10 +78,10 @@ func (i *activeStreamCounter) Intercept( handler grpc.StreamHandler, ) error { count := atomic.AddUint64(&i.count, 1) - metrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count)) + defaultMetrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count)) defer func() { count := atomic.AddUint64(&i.count, ^uint64(0)) - metrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count)) + defaultMetrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count)) }() return handler(srv, ss) diff --git a/agent/grpc/stats_test.go b/agent/grpc/stats_test.go index cc99100701..05a9f30365 100644 --- a/agent/grpc/stats_test.go +++ b/agent/grpc/stats_test.go @@ -8,6 +8,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/agent/grpc/internal/testservice" + "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -18,12 +19,11 @@ func 
TestHandler_EmitsStats(t *testing.T) { addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")} handler := NewHandler(addr) - testservice.RegisterSimpleServer(handler.srv, &simple{}) lis, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - defer lis.Close() + t.Cleanup(logError(t, lis.Close)) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -43,7 +43,7 @@ func TestHandler_EmitsStats(t *testing.T) { conn, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithInsecure()) require.NoError(t, err) - defer conn.Close() + t.Cleanup(logError(t, conn.Close)) client := testservice.NewSimpleClient(conn) fClient, err := client.Flow(ctx, &testservice.Req{Datacenter: "mine"}) @@ -53,36 +53,24 @@ func TestHandler_EmitsStats(t *testing.T) { _, err = fClient.Recv() require.NoError(t, err) + cancel() + // Wait for the server to stop so that active_streams is predictable. + retry.RunWith(fastRetry, t, func(r *retry.R) { + expectedGauge := []metricCall{ + {key: []string{"testing", "grpc", "server", "active_conns"}, val: 1}, + {key: []string{"testing", "grpc", "server", "active_streams"}, val: 1}, + {key: []string{"testing", "grpc", "server", "active_streams"}, val: 0}, + } + require.Equal(r, expectedGauge, sink.gaugeCalls) + }) + expectedCounter := []metricCall{ {key: []string{"testing", "grpc", "server", "request"}, val: 1}, } require.Equal(t, expectedCounter, sink.incrCounterCalls) - expectedGauge := []metricCall{ - {key: []string{"testing", "grpc", "server", "active_conns"}, val: 1}, - {key: []string{"testing", "grpc", "server", "active_streams"}, val: 1}, - // TODO: why is the count reset to 0 before the client receives the second message? 
- {key: []string{"testing", "grpc", "server", "active_streams"}, val: 0}, - } - require.Equal(t, expectedGauge, sink.gaugeCalls) } -type simple struct { - name string -} - -func (s *simple) Flow(_ *testservice.Req, flow testservice.Simple_FlowServer) error { - if err := flow.Send(&testservice.Resp{ServerName: "one"}); err != nil { - return err - } - if err := flow.Send(&testservice.Resp{ServerName: "two"}); err != nil { - return err - } - return nil -} - -func (s *simple) Something(_ context.Context, _ *testservice.Req) (*testservice.Resp, error) { - return &testservice.Resp{ServerName: "the-fake-service-name"}, nil -} +var fastRetry = &retry.Timer{Timeout: 7 * time.Second, Wait: 2 * time.Millisecond} func patchGlobalMetrics(t *testing.T) *fakeMetricsSink { t.Helper() @@ -94,7 +82,8 @@ func patchGlobalMetrics(t *testing.T) *fakeMetricsSink { ProfileInterval: time.Second, // Poll runtime every second FilterDefault: true, } - _, err := metrics.NewGlobal(cfg, sink) + var err error + defaultMetrics, err = metrics.New(cfg, sink) require.NoError(t, err) t.Cleanup(func() { _, err = metrics.NewGlobal(cfg, &metrics.BlackholeSink{}) @@ -122,3 +111,11 @@ type metricCall struct { val float32 labels []metrics.Label } + +func logError(t *testing.T, f func() error) func() { + return func() { + if err := f(); err != nil { + t.Logf(err.Error()) + } + } +}