mirror of https://github.com/status-im/consul.git
agent/grpc: Add an integration test for ClientPool with TLS
Also deregister the resolver.Builder in tests.
This commit is contained in:
parent
e9479175a4
commit
8bcd5040c7
|
@ -5,14 +5,17 @@ import (
|
|||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/grpc/internal/testservice"
|
||||
"github.com/hashicorp/consul/agent/grpc/resolver"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
)
|
||||
|
||||
func TestNewDialer_WithTLSWrapper(t *testing.T) {
|
||||
|
@ -42,14 +45,43 @@ func TestNewDialer_WithTLSWrapper(t *testing.T) {
|
|||
require.True(t, called, "expected TLSWrapper to be called")
|
||||
}
|
||||
|
||||
// TODO: integration test TestNewDialer with TLS and rcp server, when the rpc
|
||||
// exists as an isolated component.
|
||||
func TestNewDialer_IntegrationWithTLSEnabledHandler(t *testing.T) {
|
||||
res := resolver.NewServerResolverBuilder(newConfig(t))
|
||||
registerWithGRPC(t, res)
|
||||
|
||||
srv := newTestServer(t, "server-1", "dc1")
|
||||
tlsConf, err := tlsutil.NewConfigurator(tlsutil.Config{
|
||||
VerifyIncoming: true,
|
||||
VerifyOutgoing: true,
|
||||
CAFile: "../../test/hostname/CertAuth.crt",
|
||||
CertFile: "../../test/hostname/Alice.crt",
|
||||
KeyFile: "../../test/hostname/Alice.key",
|
||||
}, hclog.New(nil))
|
||||
require.NoError(t, err)
|
||||
srv.rpc.tlsConf = tlsConf
|
||||
|
||||
res.AddServer(srv.Metadata())
|
||||
t.Cleanup(srv.shutdown)
|
||||
|
||||
pool := NewClientConnPool(res, TLSWrapper(tlsConf.OutgoingRPCWrapper()))
|
||||
|
||||
conn, err := pool.ClientConn("dc1")
|
||||
require.NoError(t, err)
|
||||
client := testservice.NewSimpleClient(conn)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
resp, err := client.Something(ctx, &testservice.Req{})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "server-1", resp.ServerName)
|
||||
require.True(t, atomic.LoadInt32(&srv.rpc.tlsConnEstablished) > 0)
|
||||
}
|
||||
|
||||
func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {
|
||||
count := 4
|
||||
cfg := resolver.Config{Scheme: newScheme(t.Name())}
|
||||
res := resolver.NewServerResolverBuilder(cfg)
|
||||
resolver.RegisterWithGRPC(res)
|
||||
res := resolver.NewServerResolverBuilder(newConfig(t))
|
||||
registerWithGRPC(t, res)
|
||||
pool := NewClientConnPool(res, nil)
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
|
@ -76,17 +108,17 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {
|
|||
require.NotEqual(t, resp.ServerName, first.ServerName)
|
||||
}
|
||||
|
||||
func newScheme(n string) string {
|
||||
func newConfig(t *testing.T) resolver.Config {
|
||||
n := t.Name()
|
||||
s := strings.Replace(n, "/", "", -1)
|
||||
s = strings.Replace(s, "_", "", -1)
|
||||
return strings.ToLower(s)
|
||||
return resolver.Config{Scheme: strings.ToLower(s)}
|
||||
}
|
||||
|
||||
func TestClientConnPool_IntegrationWithGRPCResolver_Rebalance(t *testing.T) {
|
||||
count := 5
|
||||
cfg := resolver.Config{Scheme: newScheme(t.Name())}
|
||||
res := resolver.NewServerResolverBuilder(cfg)
|
||||
resolver.RegisterWithGRPC(res)
|
||||
res := resolver.NewServerResolverBuilder(newConfig(t))
|
||||
registerWithGRPC(t, res)
|
||||
pool := NewClientConnPool(res, nil)
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
|
@ -134,9 +166,8 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Rebalance(t *testing.T) {
|
|||
func TestClientConnPool_IntegrationWithGRPCResolver_MultiDC(t *testing.T) {
|
||||
dcs := []string{"dc1", "dc2", "dc3"}
|
||||
|
||||
cfg := resolver.Config{Scheme: newScheme(t.Name())}
|
||||
res := resolver.NewServerResolverBuilder(cfg)
|
||||
resolver.RegisterWithGRPC(res)
|
||||
res := resolver.NewServerResolverBuilder(newConfig(t))
|
||||
registerWithGRPC(t, res)
|
||||
pool := NewClientConnPool(res, nil)
|
||||
|
||||
for _, dc := range dcs {
|
||||
|
|
|
@ -12,19 +12,6 @@ import (
|
|||
"github.com/hashicorp/consul/agent/metadata"
|
||||
)
|
||||
|
||||
var registerLock sync.Mutex
|
||||
|
||||
// RegisterWithGRPC registers the ServerResolverBuilder as a grpc/resolver.
|
||||
// This function exists to synchronize registrations with a lock.
|
||||
// grpc/resolver.Register expects all registration to happen at init and does
|
||||
// not allow for concurrent registration. This function exists to support
|
||||
// parallel testing.
|
||||
func RegisterWithGRPC(b *ServerResolverBuilder) {
|
||||
registerLock.Lock()
|
||||
defer registerLock.Unlock()
|
||||
resolver.Register(b)
|
||||
}
|
||||
|
||||
// ServerResolverBuilder tracks the current server list and keeps any
|
||||
// ServerResolvers updated when changes occur.
|
||||
type ServerResolverBuilder struct {
|
||||
|
|
|
@ -2,19 +2,23 @@ package grpc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/resolver"
|
||||
|
||||
"github.com/hashicorp/consul/agent/grpc/internal/testservice"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
)
|
||||
|
||||
type testServer struct {
|
||||
|
@ -22,10 +26,16 @@ type testServer struct {
|
|||
name string
|
||||
dc string
|
||||
shutdown func()
|
||||
rpc *fakeRPCListener
|
||||
}
|
||||
|
||||
func (s testServer) Metadata() *metadata.Server {
|
||||
return &metadata.Server{ID: s.name, Datacenter: s.dc, Addr: s.addr}
|
||||
return &metadata.Server{
|
||||
ID: s.name,
|
||||
Datacenter: s.dc,
|
||||
Addr: s.addr,
|
||||
UseTLS: s.rpc.tlsConf != nil,
|
||||
}
|
||||
}
|
||||
|
||||
func newTestServer(t *testing.T, name string, dc string) testServer {
|
||||
|
@ -56,6 +66,7 @@ func newTestServer(t *testing.T, name string, dc string) testServer {
|
|||
addr: lis.Addr(),
|
||||
name: name,
|
||||
dc: dc,
|
||||
rpc: rpc,
|
||||
shutdown: func() {
|
||||
rpc.shutdown = true
|
||||
if err := lis.Close(); err != nil {
|
||||
|
@ -97,9 +108,11 @@ func (s *simple) Something(_ context.Context, _ *testservice.Req) (*testservice.
|
|||
// For now, since this logic is in agent/consul, we can't easily use Server.listen
|
||||
// so we fake it.
|
||||
type fakeRPCListener struct {
|
||||
t *testing.T
|
||||
handler *Handler
|
||||
shutdown bool
|
||||
t *testing.T
|
||||
handler *Handler
|
||||
shutdown bool
|
||||
tlsConf *tlsutil.Configurator
|
||||
tlsConnEstablished int32
|
||||
}
|
||||
|
||||
func (f *fakeRPCListener) listen(listener net.Listener) error {
|
||||
|
@ -128,11 +141,36 @@ func (f *fakeRPCListener) handleConn(conn net.Conn) {
|
|||
}
|
||||
typ := pool.RPCType(buf[0])
|
||||
|
||||
if typ == pool.RPCGRPC {
|
||||
switch typ {
|
||||
|
||||
case pool.RPCGRPC:
|
||||
f.handler.Handle(conn)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Println("ERROR: unexpected byte", typ)
|
||||
conn.Close()
|
||||
case pool.RPCTLS:
|
||||
// occasionally we see a test client connecting to an rpc listener that
|
||||
// was created as part of another test, despite none of the tests running
|
||||
// in parallel.
|
||||
// Maybe some strange grpc behaviour? I'm not sure.
|
||||
if f.tlsConf == nil {
|
||||
fmt.Println("ERROR: tls is not configured")
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
atomic.AddInt32(&f.tlsConnEstablished, 1)
|
||||
conn = tls.Server(conn, f.tlsConf.IncomingRPCConfig())
|
||||
f.handleConn(conn)
|
||||
|
||||
default:
|
||||
fmt.Println("ERROR: unexpected byte", typ)
|
||||
conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func registerWithGRPC(t *testing.T, b resolver.Builder) {
|
||||
resolver.Register(b)
|
||||
t.Cleanup(func() {
|
||||
resolver.UnregisterForTesting(b.Scheme())
|
||||
})
|
||||
}
|
||||
|
|
|
@ -5,10 +5,12 @@ import (
|
|||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
grpcresolver "google.golang.org/grpc/resolver"
|
||||
|
||||
autoconf "github.com/hashicorp/consul/agent/auto-config"
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
|
@ -88,7 +90,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
|
|||
d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator)
|
||||
|
||||
builder := resolver.NewServerResolverBuilder(resolver.Config{})
|
||||
resolver.RegisterWithGRPC(builder)
|
||||
registerWithGRPC(builder)
|
||||
d.GRPCConnPool = grpc.NewClientConnPool(builder, grpc.TLSWrapper(d.TLSConfigurator.OutgoingRPCWrapper()))
|
||||
|
||||
d.Router = router.NewRouter(d.Logger, cfg.Datacenter, fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Datacenter), builder)
|
||||
|
@ -162,3 +164,16 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil
|
|||
}
|
||||
return pool
|
||||
}
|
||||
|
||||
var registerLock sync.Mutex
|
||||
|
||||
// registerWithGRPC registers the grpc/resolver.Builder as a grpc/resolver.
|
||||
// This function exists to synchronize registrations with a lock.
|
||||
// grpc/resolver.Register expects all registration to happen at init and does
|
||||
// not allow for concurrent registration. This function exists to support
|
||||
// parallel testing.
|
||||
func registerWithGRPC(b grpcresolver.Builder) {
|
||||
registerLock.Lock()
|
||||
defer registerLock.Unlock()
|
||||
grpcresolver.Register(b)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue