mirror of https://github.com/status-im/consul.git
Merge pull request #9461 from hashicorp/dnephin/xds-server
xds: enable race detector and some small cleanup
This commit is contained in:
parent
7292fe7db0
commit
36193c17d1
|
@ -237,7 +237,7 @@ jobs:
|
|||
command: |
|
||||
mkdir -p $TEST_RESULTS_DIR /tmp/jsonfile
|
||||
pkgs="$(go list ./... | \
|
||||
grep -E -v '^github.com/hashicorp/consul/agent(/consul|/local|/xds|/routine-leak-checker)?$' | \
|
||||
grep -E -v '^github.com/hashicorp/consul/agent(/consul|/local|/routine-leak-checker)?$' | \
|
||||
grep -E -v '^github.com/hashicorp/consul/command/')"
|
||||
gotestsum \
|
||||
--jsonfile /tmp/jsonfile/go-test-race.log \
|
||||
|
|
|
@ -650,22 +650,22 @@ func (a *Agent) listenAndServeGRPC() error {
|
|||
}
|
||||
|
||||
xdsServer := &xds.Server{
|
||||
Logger: a.logger,
|
||||
CfgMgr: a.proxyConfig,
|
||||
ResolveToken: a.resolveToken,
|
||||
CheckFetcher: a,
|
||||
CfgFetcher: a,
|
||||
Logger: a.logger.Named(logging.Envoy),
|
||||
CfgMgr: a.proxyConfig,
|
||||
ResolveToken: a.resolveToken,
|
||||
CheckFetcher: a,
|
||||
CfgFetcher: a,
|
||||
AuthCheckFrequency: xds.DefaultAuthCheckFrequency,
|
||||
}
|
||||
xdsServer.Initialize()
|
||||
|
||||
var err error
|
||||
if a.config.HTTPSPort > 0 {
|
||||
// gRPC uses the same TLS settings as the HTTPS API. If HTTPS is
|
||||
// enabled then gRPC will require HTTPS as well.
|
||||
a.grpcServer, err = xdsServer.GRPCServer(a.tlsConfigurator)
|
||||
} else {
|
||||
a.grpcServer, err = xdsServer.GRPCServer(nil)
|
||||
tlsConfig := a.tlsConfigurator
|
||||
// gRPC uses the same TLS settings as the HTTPS API. If HTTPS is not enabled
|
||||
// then gRPC should not use TLS.
|
||||
if a.config.HTTPSPort <= 0 {
|
||||
tlsConfig = nil
|
||||
}
|
||||
var err error
|
||||
a.grpcServer, err = xdsServer.GRPCServer(tlsConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -10,12 +10,13 @@ import (
|
|||
|
||||
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
||||
"github.com/golang/protobuf/ptypes/wrappers"
|
||||
testinf "github.com/mitchellh/go-testing-interface"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/xds/proxysupport"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
testinf "github.com/mitchellh/go-testing-interface"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestClustersFromSnapshot(t *testing.T) {
|
||||
|
@ -662,10 +663,7 @@ func TestClustersFromSnapshot(t *testing.T) {
|
|||
}
|
||||
|
||||
// Need server just for logger dependency
|
||||
logger := testutil.Logger(t)
|
||||
s := Server{
|
||||
Logger: logger,
|
||||
}
|
||||
s := Server{Logger: testutil.Logger(t)}
|
||||
|
||||
cInfo := connectionInfo{
|
||||
Token: "my-token",
|
||||
|
|
|
@ -12,11 +12,12 @@ import (
|
|||
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
||||
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
|
||||
envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
|
||||
testinf "github.com/mitchellh/go-testing-interface"
|
||||
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/xds/proxysupport"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
testinf "github.com/mitchellh/go-testing-interface"
|
||||
)
|
||||
|
||||
func Test_makeLoadAssignment(t *testing.T) {
|
||||
|
@ -576,10 +577,7 @@ func Test_endpointsFromSnapshot(t *testing.T) {
|
|||
}
|
||||
|
||||
// Need server just for logger dependency
|
||||
logger := testutil.Logger(t)
|
||||
s := Server{
|
||||
Logger: logger,
|
||||
}
|
||||
s := Server{Logger: testutil.Logger(t)}
|
||||
|
||||
cInfo := connectionInfo{
|
||||
Token: "my-token",
|
||||
|
|
|
@ -9,12 +9,13 @@ import (
|
|||
|
||||
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
||||
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
|
||||
testinf "github.com/mitchellh/go-testing-interface"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/xds/proxysupport"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
testinf "github.com/mitchellh/go-testing-interface"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestListenersFromSnapshot(t *testing.T) {
|
||||
|
@ -505,10 +506,7 @@ func TestListenersFromSnapshot(t *testing.T) {
|
|||
}
|
||||
|
||||
// Need server just for logger dependency
|
||||
logger := testutil.Logger(t)
|
||||
s := Server{
|
||||
Logger: logger,
|
||||
}
|
||||
s := Server{Logger: testutil.Logger(t)}
|
||||
|
||||
cInfo := connectionInfo{
|
||||
Token: "my-token",
|
||||
|
|
|
@ -9,14 +9,15 @@ import (
|
|||
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
||||
envoyroute "github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
testinf "github.com/mitchellh/go-testing-interface"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/consul/discoverychain"
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/xds/proxysupport"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
testinf "github.com/mitchellh/go-testing-interface"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestRoutesFromSnapshot(t *testing.T) {
|
||||
|
@ -256,10 +257,7 @@ func TestRoutesFromSnapshot(t *testing.T) {
|
|||
tt.setup(snap)
|
||||
}
|
||||
|
||||
logger := testutil.Logger(t)
|
||||
s := Server{
|
||||
Logger: logger,
|
||||
}
|
||||
s := Server{Logger: testutil.Logger(t)}
|
||||
cInfo := connectionInfo{
|
||||
Token: "my-token",
|
||||
ProxyFeatures: sf,
|
||||
|
|
|
@ -11,17 +11,17 @@ import (
|
|||
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
|
||||
envoydisco "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
)
|
||||
|
||||
// ADSStream is a shorter way of referring to this thing...
|
||||
|
@ -125,14 +125,6 @@ type Server struct {
|
|||
CfgFetcher ConfigFetcher
|
||||
}
|
||||
|
||||
// Initialize will finish configuring the Server for first use.
|
||||
func (s *Server) Initialize() {
|
||||
if s.AuthCheckFrequency == 0 {
|
||||
s.AuthCheckFrequency = DefaultAuthCheckFrequency
|
||||
}
|
||||
s.Logger = s.Logger.Named(logging.Envoy)
|
||||
}
|
||||
|
||||
// StreamAggregatedResources implements
|
||||
// envoydisco.AggregatedDiscoveryServiceServer. This is the ADS endpoint which is
|
||||
// the only xDS API we directly support for now.
|
||||
|
|
|
@ -89,7 +89,6 @@ func (m *testManager) AssertWatchCancelled(t *testing.T, proxyID structs.Service
|
|||
}
|
||||
|
||||
func TestServer_StreamAggregatedResources_BasicProtocol(t *testing.T) {
|
||||
logger := testutil.Logger(t)
|
||||
mgr := newTestManager(t)
|
||||
aclResolve := func(id string) (acl.Authorizer, error) {
|
||||
// Allow all
|
||||
|
@ -99,11 +98,10 @@ func TestServer_StreamAggregatedResources_BasicProtocol(t *testing.T) {
|
|||
defer envoy.Close()
|
||||
|
||||
s := Server{
|
||||
Logger: logger,
|
||||
Logger: testutil.Logger(t),
|
||||
CfgMgr: mgr,
|
||||
ResolveToken: aclResolve,
|
||||
}
|
||||
s.Initialize()
|
||||
|
||||
sid := structs.NewServiceID("web-sidecar-proxy", nil)
|
||||
|
||||
|
@ -430,7 +428,6 @@ func TestServer_StreamAggregatedResources_ACLEnforcement(t *testing.T) {
|
|||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
logger := testutil.Logger(t)
|
||||
mgr := newTestManager(t)
|
||||
aclResolve := func(id string) (acl.Authorizer, error) {
|
||||
if !tt.defaultDeny {
|
||||
|
@ -452,11 +449,10 @@ func TestServer_StreamAggregatedResources_ACLEnforcement(t *testing.T) {
|
|||
defer envoy.Close()
|
||||
|
||||
s := Server{
|
||||
Logger: logger,
|
||||
Logger: testutil.Logger(t),
|
||||
CfgMgr: mgr,
|
||||
ResolveToken: aclResolve,
|
||||
}
|
||||
s.Initialize()
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
|
@ -513,7 +509,6 @@ func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedDuring
|
|||
var validToken atomic.Value
|
||||
validToken.Store(token)
|
||||
|
||||
logger := testutil.Logger(t)
|
||||
mgr := newTestManager(t)
|
||||
aclResolve := func(id string) (acl.Authorizer, error) {
|
||||
if token := validToken.Load(); token == nil || id != token.(string) {
|
||||
|
@ -526,12 +521,11 @@ func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedDuring
|
|||
defer envoy.Close()
|
||||
|
||||
s := Server{
|
||||
Logger: logger,
|
||||
Logger: testutil.Logger(t),
|
||||
CfgMgr: mgr,
|
||||
ResolveToken: aclResolve,
|
||||
AuthCheckFrequency: 1 * time.Hour, // make sure this doesn't kick in
|
||||
}
|
||||
s.Initialize()
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
|
@ -604,7 +598,6 @@ func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedInBack
|
|||
var validToken atomic.Value
|
||||
validToken.Store(token)
|
||||
|
||||
logger := testutil.Logger(t)
|
||||
mgr := newTestManager(t)
|
||||
aclResolve := func(id string) (acl.Authorizer, error) {
|
||||
if token := validToken.Load(); token == nil || id != token.(string) {
|
||||
|
@ -617,12 +610,11 @@ func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedInBack
|
|||
defer envoy.Close()
|
||||
|
||||
s := Server{
|
||||
Logger: logger,
|
||||
Logger: testutil.Logger(t),
|
||||
CfgMgr: mgr,
|
||||
ResolveToken: aclResolve,
|
||||
AuthCheckFrequency: 100 * time.Millisecond, // Make this short.
|
||||
}
|
||||
s.Initialize()
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
|
@ -694,7 +686,6 @@ func TestServer_StreamAggregatedResources_ACLTokenDeleted_StreamTerminatedInBack
|
|||
}
|
||||
|
||||
func TestServer_StreamAggregatedResources_IngressEmptyResponse(t *testing.T) {
|
||||
logger := testutil.Logger(t)
|
||||
mgr := newTestManager(t)
|
||||
aclResolve := func(id string) (acl.Authorizer, error) {
|
||||
// Allow all
|
||||
|
@ -704,11 +695,10 @@ func TestServer_StreamAggregatedResources_IngressEmptyResponse(t *testing.T) {
|
|||
defer envoy.Close()
|
||||
|
||||
s := Server{
|
||||
Logger: logger,
|
||||
Logger: testutil.Logger(t),
|
||||
CfgMgr: mgr,
|
||||
ResolveToken: aclResolve,
|
||||
}
|
||||
s.Initialize()
|
||||
|
||||
sid := structs.NewServiceID("ingress-gateway", nil)
|
||||
|
||||
|
|
|
@ -189,7 +189,7 @@ func (e *TestEnvoy) Close() error {
|
|||
// unblock the recv chan to simulate recv error when client disconnects
|
||||
if e.stream != nil && e.stream.recvCh != nil {
|
||||
close(e.stream.recvCh)
|
||||
e.stream.recvCh = nil
|
||||
e.stream = nil
|
||||
}
|
||||
if e.cancel != nil {
|
||||
e.cancel()
|
||||
|
|
Loading…
Reference in New Issue