consul/agent/grpc/client.go
Daniel Nephin 29e93f6338 grpc: fix a data race by using a static resolver
We have seen test flakes caused by 'concurrent map read and map write', and the race detector
reports the problem as well (prevent us from running some tests with -race).

The root of the problem is the grpc expects resolvers to be registered at init time
before any requests are made, but we were using a separate resolver for each test.

This commit introduces a resolver registry. The registry is registered as the single
resolver for the consul scheme. Each test uses the Authority section of the target
(instead of the scheme) to identify the resolver that should be used for the test.
The scheme is used for lookup, which is why it can no longer be used as the unique
key.

This allows us to use a lock around the map of resolvers, preventing the data race.
2021-06-02 11:35:38 -04:00

139 lines
4.3 KiB
Go

package grpc
import (
"context"
"fmt"
"net"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
)
// ClientConnPool creates and stores a connection for each datacenter.
type ClientConnPool struct {
dialer dialer
servers ServerLocator
conns map[string]*grpc.ClientConn
connsLock sync.Mutex
}
type ServerLocator interface {
// ServerForAddr is used to look up server metadata from an address.
ServerForAddr(addr string) (*metadata.Server, error)
// Authority returns the target authority to use to dial the server. This is primarily
// needed for testing multiple agents in parallel, because gRPC requires the
// resolver to be registered globally.
Authority() string
}
// TLSWrapper wraps a non-TLS connection and returns a connection with TLS
// enabled.
type TLSWrapper func(dc string, conn net.Conn) (net.Conn, error)
type dialer func(context.Context, string) (net.Conn, error)
// NewClientConnPool create new GRPC client pool to connect to servers using GRPC over RPC
func NewClientConnPool(servers ServerLocator, tls TLSWrapper, useTLSForDC func(dc string) bool) *ClientConnPool {
return &ClientConnPool{
dialer: newDialer(servers, tls, useTLSForDC),
servers: servers,
conns: make(map[string]*grpc.ClientConn),
}
}
// ClientConn returns a grpc.ClientConn for the datacenter. If there are no
// existing connections in the pool, a new one will be created, stored in the pool,
// then returned.
func (c *ClientConnPool) ClientConn(datacenter string) (*grpc.ClientConn, error) {
c.connsLock.Lock()
defer c.connsLock.Unlock()
if conn, ok := c.conns[datacenter]; ok {
return conn, nil
}
conn, err := grpc.Dial(
fmt.Sprintf("consul://%s/server.%s", c.servers.Authority(), datacenter),
// use WithInsecure mode here because we handle the TLS wrapping in the
// custom dialer based on logic around whether the server has TLS enabled.
grpc.WithInsecure(),
grpc.WithContextDialer(c.dialer),
grpc.WithDisableRetry(),
grpc.WithStatsHandler(newStatsHandler(defaultMetrics())),
// nolint:staticcheck // there is no other supported alternative to WithBalancerName
grpc.WithBalancerName("pick_first"),
// Keep alive parameters are based on the same default ones we used for
// Yamux. These are somewhat arbitrary but we did observe in scale testing
// that the gRPC defaults (servers send keepalives only every 2 hours,
// clients never) seemed to result in TCP drops going undetected until
// actual updates needed to be sent which caused unnecessary delays for
// deliveries. These settings should be no more work for servers than
// existing yamux clients but hopefully allow TCP drops to be detected
// earlier and so have a smaller chance of going unnoticed until there are
// actual updates to send out from the servers. The servers have a policy to
// not accept pings any faster than once every 15 seconds to protect against
// abuse.
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second,
Timeout: 10 * time.Second,
}))
if err != nil {
return nil, err
}
c.conns[datacenter] = conn
return conn, nil
}
// newDialer returns a gRPC dialer function that conditionally wraps the connection
// with TLS based on the Server.useTLS value.
func newDialer(servers ServerLocator, wrapper TLSWrapper, useTLSForDC func(dc string) bool) func(context.Context, string) (net.Conn, error) {
return func(ctx context.Context, addr string) (net.Conn, error) {
d := net.Dialer{}
conn, err := d.DialContext(ctx, "tcp", addr)
if err != nil {
return nil, err
}
server, err := servers.ServerForAddr(addr)
if err != nil {
conn.Close()
return nil, err
}
if server.UseTLS && useTLSForDC(server.Datacenter) {
if wrapper == nil {
conn.Close()
return nil, fmt.Errorf("TLS enabled but got nil TLS wrapper")
}
// Switch the connection into TLS mode
if _, err := conn.Write([]byte{byte(pool.RPCTLS)}); err != nil {
conn.Close()
return nil, err
}
// Wrap the connection in a TLS client
tlsConn, err := wrapper(server.Datacenter, conn)
if err != nil {
conn.Close()
return nil, err
}
conn = tlsConn
}
_, err = conn.Write([]byte{pool.RPCGRPC})
if err != nil {
conn.Close()
return nil, err
}
return conn, nil
}
}