Merge pull request #8667 from hashicorp/streaming/grpc-client-conn-router

streaming: grpc.ClientConnPool and grpc resolver
This commit is contained in:
Daniel Nephin 2020-09-24 13:26:50 -04:00 committed by GitHub
commit 4d04139678
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 694 additions and 24 deletions

View File

@ -480,7 +480,7 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
tls, err := tlsutil.NewConfigurator(c.ToTLSUtilConfig(), logger)
require.NoError(t, err, "failed to create tls configuration")
r := router.NewRouter(logger, c.Datacenter, fmt.Sprintf("%s.%s", c.NodeName, c.Datacenter))
r := router.NewRouter(logger, c.Datacenter, fmt.Sprintf("%s.%s", c.NodeName, c.Datacenter), nil)
connPool := &pool.ConnPool{
Server: false,

121
agent/grpc/client.go Normal file
View File

@ -0,0 +1,121 @@
package grpc
import (
"context"
"fmt"
"net"
"sync"
"google.golang.org/grpc"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
)
// ClientConnPool creates and stores a connection for each datacenter.
type ClientConnPool struct {
dialer dialer
servers ServerLocator
conns map[string]*grpc.ClientConn
connsLock sync.Mutex
}
type ServerLocator interface {
// ServerForAddr is used to look up server metadata from an address.
ServerForAddr(addr string) (*metadata.Server, error)
// Scheme returns the url scheme to use to dial the server. This is primarily
// needed for testing multiple agents in parallel, because gRPC requires the
// resolver to be registered globally.
Scheme() string
}
// TLSWrapper wraps a non-TLS connection and returns a connection with TLS
// enabled.
type TLSWrapper func(dc string, conn net.Conn) (net.Conn, error)
type dialer func(context.Context, string) (net.Conn, error)
func NewClientConnPool(servers ServerLocator, tls TLSWrapper) *ClientConnPool {
return &ClientConnPool{
dialer: newDialer(servers, tls),
servers: servers,
conns: make(map[string]*grpc.ClientConn),
}
}
// ClientConn returns a grpc.ClientConn for the datacenter. If there are no
// existing connections in the pool, a new one will be created, stored in the pool,
// then returned.
func (c *ClientConnPool) ClientConn(datacenter string) (*grpc.ClientConn, error) {
c.connsLock.Lock()
defer c.connsLock.Unlock()
if conn, ok := c.conns[datacenter]; ok {
return conn, nil
}
conn, err := grpc.Dial(
fmt.Sprintf("%s:///server.%s", c.servers.Scheme(), datacenter),
// use WithInsecure mode here because we handle the TLS wrapping in the
// custom dialer based on logic around whether the server has TLS enabled.
grpc.WithInsecure(),
grpc.WithContextDialer(c.dialer),
grpc.WithDisableRetry(),
// TODO: previously this statsHandler was shared with the Handler. Is that necessary?
grpc.WithStatsHandler(newStatsHandler()),
// nolint:staticcheck // there is no other supported alternative to WithBalancerName
grpc.WithBalancerName("pick_first"))
if err != nil {
return nil, err
}
c.conns[datacenter] = conn
return conn, nil
}
// newDialer returns a gRPC dialer function that conditionally wraps the connection
// with TLS based on the Server.useTLS value.
func newDialer(servers ServerLocator, wrapper TLSWrapper) func(context.Context, string) (net.Conn, error) {
return func(ctx context.Context, addr string) (net.Conn, error) {
d := net.Dialer{}
conn, err := d.DialContext(ctx, "tcp", addr)
if err != nil {
return nil, err
}
server, err := servers.ServerForAddr(addr)
if err != nil {
conn.Close()
return nil, err
}
if server.UseTLS {
if wrapper == nil {
conn.Close()
return nil, fmt.Errorf("TLS enabled but got nil TLS wrapper")
}
// Switch the connection into TLS mode
if _, err := conn.Write([]byte{byte(pool.RPCTLS)}); err != nil {
conn.Close()
return nil, err
}
// Wrap the connection in a TLS client
tlsConn, err := wrapper(server.Datacenter, conn)
if err != nil {
conn.Close()
return nil, err
}
conn = tlsConn
}
_, err = conn.Write([]byte{pool.RPCGRPC})
if err != nil {
conn.Close()
return nil, err
}
return conn, nil
}
}

157
agent/grpc/client_test.go Normal file
View File

@ -0,0 +1,157 @@
package grpc
import (
"context"
"fmt"
"net"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/agent/grpc/internal/testservice"
"github.com/hashicorp/consul/agent/grpc/resolver"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/stretchr/testify/require"
)
func TestNewDialer_WithTLSWrapper(t *testing.T) {
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
t.Cleanup(logError(t, lis.Close))
builder := resolver.NewServerResolverBuilder(resolver.Config{})
builder.AddServer(&metadata.Server{
Name: "server-1",
ID: "ID1",
Datacenter: "dc1",
Addr: lis.Addr(),
UseTLS: true,
})
var called bool
wrapper := func(_ string, conn net.Conn) (net.Conn, error) {
called = true
return conn, nil
}
dial := newDialer(builder, wrapper)
ctx := context.Background()
conn, err := dial(ctx, lis.Addr().String())
require.NoError(t, err)
require.NoError(t, conn.Close())
require.True(t, called, "expected TLSWrapper to be called")
}
// TODO: integration test TestNewDialer with TLS and rcp server, when the rpc
// exists as an isolated component.
func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {
count := 4
cfg := resolver.Config{Scheme: newScheme(t.Name())}
res := resolver.NewServerResolverBuilder(cfg)
resolver.RegisterWithGRPC(res)
pool := NewClientConnPool(res, nil)
for i := 0; i < count; i++ {
name := fmt.Sprintf("server-%d", i)
srv := newTestServer(t, name, "dc1")
res.AddServer(srv.Metadata())
t.Cleanup(srv.shutdown)
}
conn, err := pool.ClientConn("dc1")
require.NoError(t, err)
client := testservice.NewSimpleClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
t.Cleanup(cancel)
first, err := client.Something(ctx, &testservice.Req{})
require.NoError(t, err)
res.RemoveServer(&metadata.Server{ID: first.ServerName, Datacenter: "dc1"})
resp, err := client.Something(ctx, &testservice.Req{})
require.NoError(t, err)
require.NotEqual(t, resp.ServerName, first.ServerName)
}
func newScheme(n string) string {
s := strings.Replace(n, "/", "", -1)
s = strings.Replace(s, "_", "", -1)
return strings.ToLower(s)
}
func TestClientConnPool_IntegrationWithGRPCResolver_Rebalance(t *testing.T) {
count := 4
cfg := resolver.Config{Scheme: newScheme(t.Name())}
res := resolver.NewServerResolverBuilder(cfg)
resolver.RegisterWithGRPC(res)
pool := NewClientConnPool(res, nil)
for i := 0; i < count; i++ {
name := fmt.Sprintf("server-%d", i)
srv := newTestServer(t, name, "dc1")
res.AddServer(srv.Metadata())
t.Cleanup(srv.shutdown)
}
conn, err := pool.ClientConn("dc1")
require.NoError(t, err)
client := testservice.NewSimpleClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
t.Cleanup(cancel)
first, err := client.Something(ctx, &testservice.Req{})
require.NoError(t, err)
t.Run("rebalance a different DC, does nothing", func(t *testing.T) {
res.NewRebalancer("dc-other")()
resp, err := client.Something(ctx, &testservice.Req{})
require.NoError(t, err)
require.Equal(t, resp.ServerName, first.ServerName)
})
t.Run("rebalance the dc", func(t *testing.T) {
// Rebalance is random, but if we repeat it a few times it should give us a
// new server.
retry.RunWith(fastRetry, t, func(r *retry.R) {
res.NewRebalancer("dc1")()
resp, err := client.Something(ctx, &testservice.Req{})
require.NoError(r, err)
require.NotEqual(r, resp.ServerName, first.ServerName)
})
})
}
func TestClientConnPool_IntegrationWithGRPCResolver_MultiDC(t *testing.T) {
dcs := []string{"dc1", "dc2", "dc3"}
cfg := resolver.Config{Scheme: newScheme(t.Name())}
res := resolver.NewServerResolverBuilder(cfg)
resolver.RegisterWithGRPC(res)
pool := NewClientConnPool(res, nil)
for _, dc := range dcs {
name := "server-0-" + dc
srv := newTestServer(t, name, dc)
res.AddServer(srv.Metadata())
t.Cleanup(srv.shutdown)
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
t.Cleanup(cancel)
for _, dc := range dcs {
conn, err := pool.ClientConn(dc)
require.NoError(t, err)
client := testservice.NewSimpleClient(conn)
resp, err := client.Something(ctx, &testservice.Req{})
require.NoError(t, err)
require.Equal(t, resp.Datacenter, dc)
}
}

View File

@ -21,10 +21,8 @@ func NewHandler(addr net.Addr) *Handler {
// TODO(streaming): add gRPC services to srv here
return &Handler{
srv: srv,
listener: &chanListener{addr: addr, conns: make(chan net.Conn)},
}
lis := &chanListener{addr: addr, conns: make(chan net.Conn)}
return &Handler{srv: srv, listener: lis}
}
// Handler implements a handler for the rpc server listener, and the
@ -57,15 +55,26 @@ type chanListener struct {
// Accept blocks until a connection is received from Handle, and then returns the
// connection. Accept implements part of the net.Listener interface for grpc.Server.
func (l *chanListener) Accept() (net.Conn, error) {
return <-l.conns, nil
select {
case c, ok := <-l.conns:
if !ok {
return nil, &net.OpError{
Op: "accept",
Net: l.addr.Network(),
Addr: l.addr,
Err: fmt.Errorf("listener closed"),
}
}
return c, nil
}
}
func (l *chanListener) Addr() net.Addr {
return l.addr
}
// Close does nothing. The connections are managed by the caller.
func (l *chanListener) Close() error {
close(l.conns)
return nil
}

View File

@ -0,0 +1,233 @@
package resolver
import (
"fmt"
"math/rand"
"strings"
"sync"
"time"
"github.com/hashicorp/consul/agent/metadata"
"google.golang.org/grpc/resolver"
)
var registerLock sync.Mutex
// RegisterWithGRPC registers the ServerResolverBuilder as a grpc/resolver.
// This function exists to synchronize registrations with a lock.
// grpc/resolver.Register expects all registration to happen at init and does
// not allow for concurrent registration. This function exists to support
// parallel testing.
func RegisterWithGRPC(b *ServerResolverBuilder) {
registerLock.Lock()
defer registerLock.Unlock()
resolver.Register(b)
}
// ServerResolverBuilder tracks the current server list and keeps any
// ServerResolvers updated when changes occur.
type ServerResolverBuilder struct {
// scheme used to query the server. Defaults to consul. Used to support
// parallel testing because gRPC registers resolvers globally.
scheme string
// servers is an index of Servers by Server.ID. The map contains server IDs
// for all datacenters, so it assumes the ID is globally unique.
servers map[string]*metadata.Server
// resolvers is an index of connections to the serverResolver which manages
// addresses of servers for that connection.
resolvers map[resolver.ClientConn]*serverResolver
// lock for servers and resolvers.
lock sync.RWMutex
}
var _ resolver.Builder = (*ServerResolverBuilder)(nil)
type Config struct {
// Scheme used to connect to the server. Defaults to consul.
Scheme string
}
func NewServerResolverBuilder(cfg Config) *ServerResolverBuilder {
if cfg.Scheme == "" {
cfg.Scheme = "consul"
}
return &ServerResolverBuilder{
scheme: cfg.Scheme,
servers: make(map[string]*metadata.Server),
resolvers: make(map[resolver.ClientConn]*serverResolver),
}
}
// Rebalance shuffles the server list for resolvers in all datacenters.
func (s *ServerResolverBuilder) NewRebalancer(dc string) func() {
shuffler := rand.New(rand.NewSource(time.Now().UnixNano()))
return func() {
s.lock.RLock()
defer s.lock.RUnlock()
for _, resolver := range s.resolvers {
if resolver.datacenter != dc {
continue
}
// Shuffle the list of addresses using the last list given to the resolver.
resolver.addrLock.Lock()
addrs := resolver.addrs
shuffler.Shuffle(len(addrs), func(i, j int) {
addrs[i], addrs[j] = addrs[j], addrs[i]
})
// Pass the shuffled list to the resolver.
resolver.updateAddrsLocked(addrs)
resolver.addrLock.Unlock()
}
}
}
// ServerForAddr returns server metadata for a server with the specified address.
func (s *ServerResolverBuilder) ServerForAddr(addr string) (*metadata.Server, error) {
s.lock.RLock()
defer s.lock.RUnlock()
for _, server := range s.servers {
if server.Addr.String() == addr {
return server, nil
}
}
return nil, fmt.Errorf("failed to find Consul server for address %q", addr)
}
// Build returns a new serverResolver for the given ClientConn. The resolver
// will keep the ClientConn's state updated based on updates from Serf.
func (s *ServerResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOption) (resolver.Resolver, error) {
s.lock.Lock()
defer s.lock.Unlock()
// If there's already a resolver for this connection, return it.
// TODO(streaming): how would this happen since we already cache connections in ClientConnPool?
if resolver, ok := s.resolvers[cc]; ok {
return resolver, nil
}
// Make a new resolver for the dc and add it to the list of active ones.
datacenter := strings.TrimPrefix(target.Endpoint, "server.")
resolver := &serverResolver{
datacenter: datacenter,
clientConn: cc,
close: func() {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.resolvers, cc)
},
}
resolver.updateAddrs(s.getDCAddrs(datacenter))
s.resolvers[cc] = resolver
return resolver, nil
}
func (s *ServerResolverBuilder) Scheme() string { return s.scheme }
// AddServer updates the resolvers' states to include the new server's address.
func (s *ServerResolverBuilder) AddServer(server *metadata.Server) {
s.lock.Lock()
defer s.lock.Unlock()
s.servers[server.ID] = server
addrs := s.getDCAddrs(server.Datacenter)
for _, resolver := range s.resolvers {
if resolver.datacenter == server.Datacenter {
resolver.updateAddrs(addrs)
}
}
}
// RemoveServer updates the resolvers' states with the given server removed.
func (s *ServerResolverBuilder) RemoveServer(server *metadata.Server) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.servers, server.ID)
addrs := s.getDCAddrs(server.Datacenter)
for _, resolver := range s.resolvers {
if resolver.datacenter == server.Datacenter {
resolver.updateAddrs(addrs)
}
}
}
// getDCAddrs returns a list of the server addresses for the given datacenter.
// This method requires that lock is held for reads.
func (s *ServerResolverBuilder) getDCAddrs(dc string) []resolver.Address {
var addrs []resolver.Address
for _, server := range s.servers {
if server.Datacenter != dc {
continue
}
addrs = append(addrs, resolver.Address{
Addr: server.Addr.String(),
Type: resolver.Backend,
ServerName: server.Name,
})
}
return addrs
}
// serverResolver is a grpc Resolver that will keep a grpc.ClientConn up to date
// on the list of server addresses to use.
type serverResolver struct {
// datacenter that can be reached by the clientConn. Used by ServerResolverBuilder
// to filter resolvers for those in a specific datacenter.
datacenter string
// clientConn that this resolver is providing addresses for.
clientConn resolver.ClientConn
// close is used by ServerResolverBuilder to remove this resolver from the
// index of resolvers. It is called by grpc when the connection is closed.
close func()
// addrs stores the list of addresses passed to updateAddrs, so that they
// can be rebalanced periodically by ServerResolverBuilder.
addrs []resolver.Address
addrLock sync.Mutex
}
var _ resolver.Resolver = (*serverResolver)(nil)
// updateAddrs updates this serverResolver's ClientConn to use the given set of
// addrs.
func (r *serverResolver) updateAddrs(addrs []resolver.Address) {
r.addrLock.Lock()
defer r.addrLock.Unlock()
r.updateAddrsLocked(addrs)
}
// updateAddrsLocked updates this serverResolver's ClientConn to use the given
// set of addrs. addrLock must be held by caller.
func (r *serverResolver) updateAddrsLocked(addrs []resolver.Address) {
// Only pass the first address initially, which will cause the
// balancer to spin down the connection for its previous first address
// if it is different. If we don't do this, it will keep using the old
// first address as long as it is still in the list, making it impossible to
// rebalance until that address is removed.
var firstAddr []resolver.Address
if len(addrs) > 0 {
firstAddr = []resolver.Address{addrs[0]}
}
r.clientConn.UpdateState(resolver.State{Addresses: firstAddr})
// Call UpdateState again with the entire list of addrs in case we need them
// for failover.
r.clientConn.UpdateState(resolver.State{Addresses: addrs})
r.addrs = addrs
}
func (r *serverResolver) Close() {
r.close()
}
// ResolveNow is not used
func (*serverResolver) ResolveNow(_ resolver.ResolveNowOption) {}

View File

@ -2,11 +2,66 @@ package grpc
import (
"context"
"fmt"
"io"
"net"
"testing"
"time"
"github.com/hashicorp/consul/agent/grpc/internal/testservice"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
type testServer struct {
addr net.Addr
name string
dc string
shutdown func()
}
func (s testServer) Metadata() *metadata.Server {
return &metadata.Server{ID: s.name, Datacenter: s.dc, Addr: s.addr}
}
func newTestServer(t *testing.T, name string, dc string) testServer {
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
handler := NewHandler(addr)
testservice.RegisterSimpleServer(handler.srv, &simple{name: name, dc: dc})
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
rpc := &fakeRPCListener{t: t, handler: handler}
g := errgroup.Group{}
g.Go(func() error {
return rpc.listen(lis)
})
g.Go(func() error {
return handler.Run()
})
return testServer{
addr: lis.Addr(),
name: name,
dc: dc,
shutdown: func() {
if err := lis.Close(); err != nil {
t.Logf("listener closed with error: %v", err)
}
if err := handler.Shutdown(); err != nil {
t.Logf("grpc server shutdown: %v", err)
}
if err := g.Wait(); err != nil {
t.Logf("grpc server error: %v", err)
}
},
}
}
type simple struct {
name string
dc string
@ -26,3 +81,45 @@ func (s *simple) Flow(_ *testservice.Req, flow testservice.Simple_FlowServer) er
func (s *simple) Something(_ context.Context, _ *testservice.Req) (*testservice.Resp, error) {
return &testservice.Resp{ServerName: s.name, Datacenter: s.dc}, nil
}
// fakeRPCListener mimics agent/consul.Server.listen to handle the RPCType byte.
// In the future we should be able to refactor Server and extract this RPC
// handling logic so that we don't need to use a fake.
// For now, since this logic is in agent/consul, we can't easily use Server.listen
// so we fake it.
type fakeRPCListener struct {
t *testing.T
handler *Handler
}
func (f *fakeRPCListener) listen(listener net.Listener) error {
for {
conn, err := listener.Accept()
if err != nil {
return err
}
go f.handleConn(conn)
}
}
func (f *fakeRPCListener) handleConn(conn net.Conn) {
buf := make([]byte, 1)
if _, err := conn.Read(buf); err != nil {
if err != io.EOF {
fmt.Println("ERROR", err.Error())
}
conn.Close()
return
}
typ := pool.RPCType(buf[0])
if typ == pool.RPCGRPC {
f.handler.Handle(conn)
return
}
fmt.Println("ERROR: unexpected byte", typ)
conn.Close()
}

30
agent/router/grpc.go Normal file
View File

@ -0,0 +1,30 @@
package router
import "github.com/hashicorp/consul/agent/metadata"
// ServerTracker is called when Router is notified of a server being added or
// removed.
type ServerTracker interface {
NewRebalancer(dc string) func()
AddServer(*metadata.Server)
RemoveServer(*metadata.Server)
}
// Rebalancer is called periodically to re-order the servers so that the load on the
// servers is evenly balanced.
type Rebalancer func()
// NoOpServerTracker is a ServerTracker that does nothing. Used when gRPC is not
// enabled.
type NoOpServerTracker struct{}
// Rebalance does nothing
func (NoOpServerTracker) NewRebalancer(string) func() {
return func() {}
}
// AddServer does nothing
func (NoOpServerTracker) AddServer(*metadata.Server) {}
// RemoveServer does nothing
func (NoOpServerTracker) RemoveServer(*metadata.Server) {}

View File

@ -98,6 +98,8 @@ type Manager struct {
// client.ConnPool.
connPoolPinger Pinger
rebalancer Rebalancer
// serverName has the name of the managers's server. This is used to
// short-circuit pinging to itself.
serverName string
@ -267,7 +269,7 @@ func (m *Manager) saveServerList(l serverList) {
}
// New is the only way to safely create a new Manager struct.
func New(logger hclog.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCluster, connPoolPinger Pinger, serverName string) (m *Manager) {
func New(logger hclog.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfCluster, connPoolPinger Pinger, serverName string, rb Rebalancer) (m *Manager) {
if logger == nil {
logger = hclog.New(&hclog.LoggerOptions{})
}
@ -278,6 +280,7 @@ func New(logger hclog.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfC
m.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle
m.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration)
m.shutdownCh = shutdownCh
m.rebalancer = rb
m.serverName = serverName
atomic.StoreInt32(&m.offline, 1)
@ -529,6 +532,7 @@ func (m *Manager) Start() {
for {
select {
case <-m.rebalanceTimer.C:
m.rebalancer()
m.RebalanceServers()
m.refreshServerRebalanceTimer()

View File

@ -54,14 +54,16 @@ func (s *fauxSerf) NumNodes() int {
func testManager() (m *Manager) {
logger := GetBufferedLogger()
shutdownCh := make(chan struct{})
m = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{}, "")
m = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{}, "", noopRebalancer)
return m
}
func noopRebalancer() {}
func testManagerFailProb(failPct float64) (m *Manager) {
logger := GetBufferedLogger()
shutdownCh := make(chan struct{})
m = New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}, "")
m = New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}, "", noopRebalancer)
return m
}
@ -300,7 +302,7 @@ func TestManagerInternal_refreshServerRebalanceTimer(t *testing.T) {
shutdownCh := make(chan struct{})
for _, s := range clusters {
m := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{}, "")
m := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{}, "", noopRebalancer)
for i := 0; i < s.numServers; i++ {
nodeName := fmt.Sprintf("s%02d", i)
m.AddServer(&metadata.Server{Name: nodeName})

View File

@ -57,21 +57,23 @@ func (s *fauxSerf) NumNodes() int {
func testManager(t testing.TB) (m *router.Manager) {
logger := testutil.Logger(t)
shutdownCh := make(chan struct{})
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}, "")
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}, "", noopRebalancer)
return m
}
func noopRebalancer() {}
func testManagerFailProb(t testing.TB, failPct float64) (m *router.Manager) {
logger := testutil.Logger(t)
shutdownCh := make(chan struct{})
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}, "")
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}, "", noopRebalancer)
return m
}
func testManagerFailAddr(t testing.TB, failAddr net.Addr) (m *router.Manager) {
logger := testutil.Logger(t)
shutdownCh := make(chan struct{})
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failAddr: failAddr}, "")
m = router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failAddr: failAddr}, "", noopRebalancer)
return m
}
@ -195,7 +197,7 @@ func TestServers_FindServer(t *testing.T) {
func TestServers_New(t *testing.T) {
logger := testutil.Logger(t)
shutdownCh := make(chan struct{})
m := router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}, "")
m := router.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}, "", noopRebalancer)
if m == nil {
t.Fatalf("Manager nil")
}

View File

@ -41,6 +41,10 @@ type Router struct {
// routeFn is a hook to actually do the routing.
routeFn func(datacenter string) (*Manager, *metadata.Server, bool)
// grpcServerTracker is used to balance grpc connections across servers,
// and has callbacks for adding or removing a server.
grpcServerTracker ServerTracker
// isShutdown prevents adding new routes to a router after it is shut
// down.
isShutdown bool
@ -87,17 +91,21 @@ type areaInfo struct {
}
// NewRouter returns a new Router with the given configuration.
func NewRouter(logger hclog.Logger, localDatacenter, serverName string) *Router {
func NewRouter(logger hclog.Logger, localDatacenter, serverName string, tracker ServerTracker) *Router {
if logger == nil {
logger = hclog.New(&hclog.LoggerOptions{})
}
if tracker == nil {
tracker = NoOpServerTracker{}
}
router := &Router{
logger: logger.Named(logging.Router),
localDatacenter: localDatacenter,
serverName: serverName,
areas: make(map[types.AreaID]*areaInfo),
managers: make(map[string][]*Manager),
logger: logger.Named(logging.Router),
localDatacenter: localDatacenter,
serverName: serverName,
areas: make(map[types.AreaID]*areaInfo),
managers: make(map[string][]*Manager),
grpcServerTracker: tracker,
}
// Hook the direct route lookup by default.
@ -251,7 +259,8 @@ func (r *Router) maybeInitializeManager(area *areaInfo, dc string) *Manager {
}
shutdownCh := make(chan struct{})
manager := New(r.logger, shutdownCh, area.cluster, area.pinger, r.serverName)
rb := r.grpcServerTracker.NewRebalancer(dc)
manager := New(r.logger, shutdownCh, area.cluster, area.pinger, r.serverName, rb)
info = &managerInfo{
manager: manager,
shutdownCh: shutdownCh,
@ -278,6 +287,7 @@ func (r *Router) addServer(area *areaInfo, s *metadata.Server) error {
}
manager.AddServer(s)
r.grpcServerTracker.AddServer(s)
return nil
}
@ -313,6 +323,7 @@ func (r *Router) RemoveServer(areaID types.AreaID, s *metadata.Server) error {
return nil
}
info.manager.RemoveServer(s)
r.grpcServerTracker.RemoveServer(s)
// If this manager is empty then remove it so we don't accumulate cruft
// and waste time during request routing.

View File

@ -117,7 +117,7 @@ func testCluster(self string) *mockCluster {
func testRouter(t testing.TB, dc string) *Router {
logger := testutil.Logger(t)
return NewRouter(logger, dc, "")
return NewRouter(logger, dc, "", nil)
}
func TestRouter_Shutdown(t *testing.T) {

View File

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/grpc/resolver"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/token"
@ -82,7 +83,10 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
d.Cache = cache.New(cfg.Cache)
d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator)
d.Router = router.NewRouter(d.Logger, cfg.Datacenter, fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Datacenter))
// TODO(streaming): setConfig.Scheme name for tests
builder := resolver.NewServerResolverBuilder(resolver.Config{})
resolver.RegisterWithGRPC(builder)
d.Router = router.NewRouter(d.Logger, cfg.Datacenter, fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Datacenter), builder)
acConf := autoconf.Config{
DirectRPC: d.ConnPool,