consul/agent/grpc/handler.go
Daniel Nephin 636f76f6f1 agent/grpc: make TestHandler_EmitsStats predictable
Occasionally this test would flake. The flakes were fixed by:

1. Stopping the service and retrying to check on metrics. This way we
   also include the active_streams going to 0 in the metric calls.

2. Using a reference to the global Metrics. This way when other tests
   have background goroutines that are still shutting down, they won't
   emit metrics to the metric instance with the fake Sink. The stats
   test can patch the local reference to the global, so the existing
   statHandlers will continue to emit to the global, but the stats
   test will send all metrics to the replacement.
2020-09-14 19:05:22 -04:00

110 lines
2.6 KiB
Go

/*
Package grpc provides a Handler and client for agent gRPC connections.
*/
package grpc
import (
"fmt"
"net"
"google.golang.org/grpc"
)
// NewHandler returns a gRPC server that accepts connections from Handle(conn).
func NewHandler(addr net.Addr) *Handler {
// We don't need to pass tls.Config to the server since it's multiplexed
// behind the RPC listener, which already has TLS configured.
srv := grpc.NewServer(
grpc.StatsHandler(newStatsHandler()),
grpc.StreamInterceptor((&activeStreamCounter{}).Intercept),
)
// TODO(streaming): add gRPC services to srv here
return &Handler{
srv: srv,
listener: &chanListener{addr: addr, conns: make(chan net.Conn)},
}
}
// Handler implements a handler for the rpc server listener, and the
// agent.Component interface for managing the lifecycle of the grpc.Server.
type Handler struct {
srv *grpc.Server
listener *chanListener
}
// Handle the connection by sending it to a channel for the grpc.Server to receive.
func (h *Handler) Handle(conn net.Conn) {
h.listener.conns <- conn
}
func (h *Handler) Run() error {
return h.srv.Serve(h.listener)
}
func (h *Handler) Shutdown() error {
h.srv.Stop()
return nil
}
// chanListener implements net.Listener for grpc.Server.
type chanListener struct {
conns chan net.Conn
addr net.Addr
}
// Accept blocks until a connection is received from Handle, and then returns the
// connection. Accept implements part of the net.Listener interface for grpc.Server.
func (l *chanListener) Accept() (net.Conn, error) {
return <-l.conns, nil
}
func (l *chanListener) Addr() net.Addr {
return l.addr
}
// Close does nothing. The connections are managed by the caller.
func (l *chanListener) Close() error {
return nil
}
// NoOpHandler implements the same methods as Handler, but performs no handling.
// It may be used in place of Handler to disable the grpc server.
type NoOpHandler struct {
Logger Logger
}
type Logger interface {
Error(string, ...interface{})
}
func (h NoOpHandler) Handle(conn net.Conn) {
h.Logger.Error("gRPC conn opened but gRPC RPC is disabled, closing",
"conn", logConn(conn))
_ = conn.Close()
}
func (h NoOpHandler) Run() error {
return nil
}
func (h NoOpHandler) Shutdown() error {
return nil
}
// logConn is a local copy of github.com/hashicorp/memberlist.LogConn, to avoid
// a large dependency for a minor formatting function.
// logConn is used to keep log formatting consistent.
func logConn(conn net.Conn) string {
if conn == nil {
return "from=<unknown address>"
}
addr := conn.RemoteAddr()
if addr == nil {
return "from=<unknown address>"
}
return fmt.Sprintf("from=%s", addr.String())
}