diff --git a/agent/grpc/handler.go b/agent/grpc/handler.go index ab23537ffa..c3af7f38c4 100644 --- a/agent/grpc/handler.go +++ b/agent/grpc/handler.go @@ -15,7 +15,7 @@ func NewHandler(addr net.Addr) *Handler { // We don't need to pass tls.Config to the server since it's multiplexed // behind the RPC listener, which already has TLS configured. srv := grpc.NewServer( - grpc.StatsHandler(&statsHandler{}), + grpc.StatsHandler(newStatsHandler()), grpc.StreamInterceptor((&activeStreamCounter{}).Intercept), ) diff --git a/agent/grpc/server_test.go b/agent/grpc/server_test.go new file mode 100644 index 0000000000..b7843ff011 --- /dev/null +++ b/agent/grpc/server_test.go @@ -0,0 +1,28 @@ +package grpc + +import ( + "context" + "time" + + "github.com/hashicorp/consul/agent/grpc/internal/testservice" +) + +type simple struct { + name string + dc string +} + +func (s *simple) Flow(_ *testservice.Req, flow testservice.Simple_FlowServer) error { + for flow.Context().Err() == nil { + resp := &testservice.Resp{ServerName: "one", Datacenter: s.dc} + if err := flow.Send(resp); err != nil { + return err + } + time.Sleep(time.Millisecond) + } + return nil +} + +func (s *simple) Something(_ context.Context, _ *testservice.Req) (*testservice.Resp, error) { + return &testservice.Resp{ServerName: s.name, Datacenter: s.dc}, nil +} diff --git a/agent/grpc/stats.go b/agent/grpc/stats.go index cbf443878e..d25048110d 100644 --- a/agent/grpc/stats.go +++ b/agent/grpc/stats.go @@ -9,12 +9,19 @@ import ( "google.golang.org/grpc/stats" ) +var defaultMetrics = metrics.Default() + // statsHandler is a grpc/stats.StatsHandler which emits connection and // request metrics to go-metrics. type statsHandler struct { + metrics *metrics.Metrics activeConns uint64 // must be 8-byte aligned for atomic access } +func newStatsHandler() *statsHandler { + return &statsHandler{metrics: defaultMetrics} +} + // TagRPC implements grpcStats.StatsHandler func (c *statsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { // No-op @@ -29,7 +36,7 @@ func (c *statsHandler) HandleRPC(_ context.Context, s stats.RPCStats) { } switch s.(type) { case *stats.InHeader: - metrics.IncrCounter([]string{"grpc", label, "request"}, 1) + c.metrics.IncrCounter([]string{"grpc", label, "request"}, 1) } } @@ -53,7 +60,7 @@ func (c *statsHandler) HandleConn(_ context.Context, s stats.ConnStats) { // Decrement! count = atomic.AddUint64(&c.activeConns, ^uint64(0)) } - metrics.SetGauge([]string{"grpc", label, "active_conns"}, float32(count)) + c.metrics.SetGauge([]string{"grpc", label, "active_conns"}, float32(count)) } type activeStreamCounter struct { @@ -71,10 +78,10 @@ func (i *activeStreamCounter) Intercept( handler grpc.StreamHandler, ) error { count := atomic.AddUint64(&i.count, 1) - metrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count)) + defaultMetrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count)) defer func() { count := atomic.AddUint64(&i.count, ^uint64(0)) - metrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count)) + defaultMetrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count)) }() return handler(srv, ss) diff --git a/agent/grpc/stats_test.go b/agent/grpc/stats_test.go index cc99100701..05a9f30365 100644 --- a/agent/grpc/stats_test.go +++ b/agent/grpc/stats_test.go @@ -8,6 +8,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/agent/grpc/internal/testservice" + "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -18,12 +19,11 @@ func TestHandler_EmitsStats(t *testing.T) { addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")} handler := NewHandler(addr) - testservice.RegisterSimpleServer(handler.srv, &simple{}) lis, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - defer lis.Close() + t.Cleanup(logError(t, lis.Close)) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -43,7 +43,7 @@ func TestHandler_EmitsStats(t *testing.T) { conn, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithInsecure()) require.NoError(t, err) - defer conn.Close() + t.Cleanup(logError(t, conn.Close)) client := testservice.NewSimpleClient(conn) fClient, err := client.Flow(ctx, &testservice.Req{Datacenter: "mine"}) @@ -53,36 +53,24 @@ func TestHandler_EmitsStats(t *testing.T) { _, err = fClient.Recv() require.NoError(t, err) + cancel() + // Wait for the server to stop so that active_streams is predictable. + retry.RunWith(fastRetry, t, func(r *retry.R) { + expectedGauge := []metricCall{ + {key: []string{"testing", "grpc", "server", "active_conns"}, val: 1}, + {key: []string{"testing", "grpc", "server", "active_streams"}, val: 1}, + {key: []string{"testing", "grpc", "server", "active_streams"}, val: 0}, + } + require.Equal(r, expectedGauge, sink.gaugeCalls) + }) + expectedCounter := []metricCall{ {key: []string{"testing", "grpc", "server", "request"}, val: 1}, } require.Equal(t, expectedCounter, sink.incrCounterCalls) - expectedGauge := []metricCall{ - {key: []string{"testing", "grpc", "server", "active_conns"}, val: 1}, - {key: []string{"testing", "grpc", "server", "active_streams"}, val: 1}, - // TODO: why is the count reset to 0 before the client receives the second message? - {key: []string{"testing", "grpc", "server", "active_streams"}, val: 0}, - } - require.Equal(t, expectedGauge, sink.gaugeCalls) } -type simple struct { - name string -} - -func (s *simple) Flow(_ *testservice.Req, flow testservice.Simple_FlowServer) error { - if err := flow.Send(&testservice.Resp{ServerName: "one"}); err != nil { - return err - } - if err := flow.Send(&testservice.Resp{ServerName: "two"}); err != nil { - return err - } - return nil -} - -func (s *simple) Something(_ context.Context, _ *testservice.Req) (*testservice.Resp, error) { - return &testservice.Resp{ServerName: "the-fake-service-name"}, nil -} +var fastRetry = &retry.Timer{Timeout: 7 * time.Second, Wait: 2 * time.Millisecond} func patchGlobalMetrics(t *testing.T) *fakeMetricsSink { t.Helper() @@ -94,7 +82,8 @@ func patchGlobalMetrics(t *testing.T) *fakeMetricsSink { ProfileInterval: time.Second, // Poll runtime every second FilterDefault: true, } - _, err := metrics.NewGlobal(cfg, sink) + var err error + defaultMetrics, err = metrics.New(cfg, sink) require.NoError(t, err) t.Cleanup(func() { _, err = metrics.NewGlobal(cfg, &metrics.BlackholeSink{}) @@ -122,3 +111,11 @@ type metricCall struct { val float32 labels []metrics.Label } + +func logError(t *testing.T, f func() error) func() { + return func() { + if err := f(); err != nil { + t.Logf(err.Error()) + } + } +}