Merge pull request #9461 from hashicorp/dnephin/xds-server

xds: enable race detector and some small cleanup
This commit is contained in:
Daniel Nephin 2021-01-07 18:29:18 -05:00 committed by GitHub
commit 6f996ad41e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 40 additions and 66 deletions

View File

@ -237,7 +237,7 @@ jobs:
command: |
mkdir -p $TEST_RESULTS_DIR /tmp/jsonfile
pkgs="$(go list ./... | \
grep -E -v '^github.com/hashicorp/consul/agent(/consul|/local|/xds|/routine-leak-checker)?$' | \
grep -E -v '^github.com/hashicorp/consul/agent(/consul|/local|/routine-leak-checker)?$' | \
grep -E -v '^github.com/hashicorp/consul/command/')"
gotestsum \
--jsonfile /tmp/jsonfile/go-test-race.log \

View File

@ -650,22 +650,22 @@ func (a *Agent) listenAndServeGRPC() error {
}
xdsServer := &xds.Server{
Logger: a.logger,
CfgMgr: a.proxyConfig,
ResolveToken: a.resolveToken,
CheckFetcher: a,
CfgFetcher: a,
Logger: a.logger.Named(logging.Envoy),
CfgMgr: a.proxyConfig,
ResolveToken: a.resolveToken,
CheckFetcher: a,
CfgFetcher: a,
AuthCheckFrequency: xds.DefaultAuthCheckFrequency,
}
xdsServer.Initialize()
var err error
if a.config.HTTPSPort > 0 {
// gRPC uses the same TLS settings as the HTTPS API. If HTTPS is
// enabled then gRPC will require HTTPS as well.
a.grpcServer, err = xdsServer.GRPCServer(a.tlsConfigurator)
} else {
a.grpcServer, err = xdsServer.GRPCServer(nil)
tlsConfig := a.tlsConfigurator
// gRPC uses the same TLS settings as the HTTPS API. If HTTPS is not enabled
// then gRPC should not use TLS.
if a.config.HTTPSPort <= 0 {
tlsConfig = nil
}
var err error
a.grpcServer, err = xdsServer.GRPCServer(tlsConfig)
if err != nil {
return err
}

View File

@ -10,12 +10,13 @@ import (
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/ptypes/wrappers"
testinf "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/proxysupport"
"github.com/hashicorp/consul/sdk/testutil"
testinf "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
)
func TestClustersFromSnapshot(t *testing.T) {
@ -665,10 +666,7 @@ func TestClustersFromSnapshot(t *testing.T) {
}
// Need server just for logger dependency
logger := testutil.Logger(t)
s := Server{
Logger: logger,
}
s := Server{Logger: testutil.Logger(t)}
cInfo := connectionInfo{
Token: "my-token",

View File

@ -12,11 +12,12 @@ import (
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
testinf "github.com/mitchellh/go-testing-interface"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/proxysupport"
"github.com/hashicorp/consul/sdk/testutil"
testinf "github.com/mitchellh/go-testing-interface"
)
func Test_makeLoadAssignment(t *testing.T) {
@ -579,10 +580,7 @@ func Test_endpointsFromSnapshot(t *testing.T) {
}
// Need server just for logger dependency
logger := testutil.Logger(t)
s := Server{
Logger: logger,
}
s := Server{Logger: testutil.Logger(t)}
cInfo := connectionInfo{
Token: "my-token",

View File

@ -9,12 +9,13 @@ import (
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
testinf "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/proxysupport"
"github.com/hashicorp/consul/sdk/testutil"
testinf "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
)
func TestListenersFromSnapshot(t *testing.T) {
@ -508,10 +509,7 @@ func TestListenersFromSnapshot(t *testing.T) {
}
// Need server just for logger dependency
logger := testutil.Logger(t)
s := Server{
Logger: logger,
}
s := Server{Logger: testutil.Logger(t)}
cInfo := connectionInfo{
Token: "my-token",

View File

@ -9,14 +9,15 @@ import (
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoyroute "github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
"github.com/golang/protobuf/ptypes"
testinf "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/proxysupport"
"github.com/hashicorp/consul/sdk/testutil"
testinf "github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
)
func TestRoutesFromSnapshot(t *testing.T) {
@ -256,10 +257,7 @@ func TestRoutesFromSnapshot(t *testing.T) {
tt.setup(snap)
}
logger := testutil.Logger(t)
s := Server{
Logger: logger,
}
s := Server{Logger: testutil.Logger(t)}
cInfo := connectionInfo{
Token: "my-token",
ProxyFeatures: sf,

View File

@ -11,17 +11,17 @@ import (
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoydisco "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
"github.com/golang/protobuf/proto"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/tlsutil"
)
// ADSStream is a shorter way of referring to this thing...
@ -125,14 +125,6 @@ type Server struct {
CfgFetcher ConfigFetcher
}
// Initialize will finish configuring the Server for first use.
func (s *Server) Initialize() {
if s.AuthCheckFrequency == 0 {
s.AuthCheckFrequency = DefaultAuthCheckFrequency
}
s.Logger = s.Logger.Named(logging.Envoy)
}
// StreamAggregatedResources implements
// envoydisco.AggregatedDiscoveryServiceServer. This is the ADS endpoint which is
// the only xDS API we directly support for now.

View File

@ -89,7 +89,6 @@ func (m *testManager) AssertWatchCancelled(t *testing.T, proxyID structs.Service
}
func TestServer_StreamAggregatedResources_BasicProtocol(t *testing.T) {
logger := testutil.Logger(t)
mgr := newTestManager(t)
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
@ -99,11 +98,10 @@ func TestServer_StreamAggregatedResources_BasicProtocol(t *testing.T) {
defer envoy.Close()
s := Server{
Logger: logger,
Logger: testutil.Logger(t),
CfgMgr: mgr,
ResolveToken: aclResolve,
}
s.Initialize()
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -430,7 +428,6 @@ func TestServer_StreamAggregatedResources_ACLEnforcement(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger := testutil.Logger(t)
mgr := newTestManager(t)
aclResolve := func(id string) (acl.Authorizer, error) {
if !tt.defaultDeny {
@ -452,11 +449,10 @@ func TestServer_StreamAggregatedResources_ACLEnforcement(t *testing.T) {
defer envoy.Close()
s := Server{
Logger: logger,
Logger: testutil.Logger(t),
CfgMgr: mgr,
ResolveToken: aclResolve,
}
s.Initialize()
errCh := make(chan error, 1)
go func() {
@ -513,7 +509,6 @@ func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedDuring
var validToken atomic.Value
validToken.Store(token)
logger := testutil.Logger(t)
mgr := newTestManager(t)
aclResolve := func(id string) (acl.Authorizer, error) {
if token := validToken.Load(); token == nil || id != token.(string) {
@ -526,12 +521,11 @@ func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedDuring
defer envoy.Close()
s := Server{
Logger: logger,
Logger: testutil.Logger(t),
CfgMgr: mgr,
ResolveToken: aclResolve,
AuthCheckFrequency: 1 * time.Hour, // make sure this doesn't kick in
}
s.Initialize()
errCh := make(chan error, 1)
go func() {
@ -608,7 +602,6 @@ func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedInBack
var validToken atomic.Value
validToken.Store(token)
logger := testutil.Logger(t)
mgr := newTestManager(t)
aclResolve := func(id string) (acl.Authorizer, error) {
if token := validToken.Load(); token == nil || id != token.(string) {
@ -621,12 +614,11 @@ func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedInBack
defer envoy.Close()
s := Server{
Logger: logger,
Logger: testutil.Logger(t),
CfgMgr: mgr,
ResolveToken: aclResolve,
AuthCheckFrequency: 100 * time.Millisecond, // Make this short.
}
s.Initialize()
errCh := make(chan error, 1)
go func() {
@ -698,7 +690,6 @@ func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedInBack
}
func TestServer_StreamAggregatedResources_IngressEmptyResponse(t *testing.T) {
logger := testutil.Logger(t)
mgr := newTestManager(t)
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
@ -708,11 +699,10 @@ func TestServer_StreamAggregatedResources_IngressEmptyResponse(t *testing.T) {
defer envoy.Close()
s := Server{
Logger: logger,
Logger: testutil.Logger(t),
CfgMgr: mgr,
ResolveToken: aclResolve,
}
s.Initialize()
sid := structs.NewServiceID("ingress-gateway", nil)

View File

@ -189,7 +189,7 @@ func (e *TestEnvoy) Close() error {
// unblock the recv chan to simulate recv error when client disconnects
if e.stream != nil && e.stream.recvCh != nil {
close(e.stream.recvCh)
e.stream.recvCh = nil
e.stream = nil
}
if e.cancel != nil {
e.cancel()