NET-6294 - v1 Agentless proxycfg datasource errors after v2 changes (#19365)

This commit is contained in:
John Murret 2023-10-27 14:06:38 -06:00 committed by GitHub
parent 1a6225ade2
commit f0cf8f2f40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 74 additions and 178 deletions

View File

@ -15,7 +15,7 @@ MOCKERY_VERSION='v2.20.0'
BUF_VERSION='v1.26.0'
PROTOC_GEN_GO_GRPC_VERSION="v1.2.0"
MOG_VERSION='v0.4.0'
MOG_VERSION='v0.4.1'
PROTOC_GO_INJECT_TAG_VERSION='v1.3.0'
PROTOC_GEN_GO_BINARY_VERSION="v0.1.0"
DEEP_COPY_VERSION='bc3f5aa5735d8a54961580a3a24422c308c831c2'

View File

@ -656,43 +656,12 @@ func (a *Agent) Start(ctx context.Context) error {
return fmt.Errorf("failed to start Consul enterprise component: %v", err)
}
// Create proxy config manager now because it is a dependency of creating the proxyWatcher
// which will be passed to consul.NewServer so that it is then passed to the
// controller registration for the XDS controller in v2 mode, and the xds server in v1 and v2 mode.
intentionDefaultAllow, err := a.config.ACLResolverSettings.IsDefaultAllow()
if err != nil {
return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy)
}
go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})
// Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
DataSources: a.proxyDataSources(),
Logger: a.logger.Named(logging.ProxyConfig),
Source: &structs.QuerySource{
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,
Node: a.config.NodeName,
NodePartition: a.config.PartitionOrEmpty(),
},
DNSConfig: proxycfg.DNSConfig{
Domain: a.config.DNSDomain,
AltDomain: a.config.DNSAltDomain,
},
TLSConfigurator: a.tlsConfigurator,
IntentionDefaultAllow: intentionDefaultAllow,
UpdateRateLimit: a.config.XDSUpdateRateLimit,
})
if err != nil {
return err
}
// proxyWatcher will be used in the creation of the XDS server and also
// in the registration of the xds controller.
proxyWatcher := a.getProxyWatcher()
// proxyTracker will be used in the creation of the XDS server and also
// in the registration of the v2 xds controller
var proxyTracker *proxytracker.ProxyTracker
// Setup either the client or the server.
var consulServer *consul.Server
if c.ServerMode {
serverLogger := a.baseDeps.Logger.NamedIntercept(logging.ConsulServer)
@ -729,16 +698,18 @@ func (a *Agent) Start(ctx context.Context) error {
},
)
var pt *proxytracker.ProxyTracker
if a.baseDeps.UseV2Resources() {
pt = proxyWatcher.(*proxytracker.ProxyTracker)
proxyTracker = proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{
Logger: a.logger.Named("proxy-tracker"),
SessionLimiter: a.baseDeps.XDSStreamLimiter,
})
}
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger, pt)
consulServer, err = consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger, proxyTracker)
if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err)
}
incomingRPCLimiter.Register(server)
a.delegate = server
incomingRPCLimiter.Register(consulServer)
a.delegate = consulServer
if a.config.PeeringEnabled && a.config.ConnectEnabled {
d := servercert.Deps{
@ -748,7 +719,7 @@ func (a *Agent) Start(ctx context.Context) error {
ACLsEnabled: a.config.ACLsEnabled,
},
LeafCertManager: a.leafCertManager,
GetStore: func() servercert.Store { return server.FSM().State() },
GetStore: func() servercert.Store { return consulServer.FSM().State() },
TLSConfigurator: a.tlsConfigurator,
}
a.certManager = servercert.NewCertManager(d)
@ -804,6 +775,35 @@ func (a *Agent) Start(ctx context.Context) error {
return err
}
intentionDefaultAllow, err := a.config.ACLResolverSettings.IsDefaultAllow()
if err != nil {
return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy)
}
go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})
// Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
DataSources: a.proxyDataSources(consulServer),
Logger: a.logger.Named(logging.ProxyConfig),
Source: &structs.QuerySource{
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,
Node: a.config.NodeName,
NodePartition: a.config.PartitionOrEmpty(),
},
DNSConfig: proxycfg.DNSConfig{
Domain: a.config.DNSDomain,
AltDomain: a.config.DNSAltDomain,
},
TLSConfigurator: a.tlsConfigurator,
IntentionDefaultAllow: intentionDefaultAllow,
UpdateRateLimit: a.config.XDSUpdateRateLimit,
})
if err != nil {
return err
}
go localproxycfg.Sync(
&lib.StopChannelContext{StopCh: a.shutdownCh},
localproxycfg.SyncConfig{
@ -856,7 +856,7 @@ func (a *Agent) Start(ctx context.Context) error {
}
// Start grpc and grpc_tls servers.
if err := a.listenAndServeGRPC(proxyWatcher); err != nil {
if err := a.listenAndServeGRPC(proxyTracker, consulServer); err != nil {
return err
}
@ -921,29 +921,13 @@ func (a *Agent) Failed() <-chan struct{} {
return a.apiServers.failed
}
// getProxyWatcher returns the proper implementation of the ProxyWatcher interface.
// It will return a ProxyTracker if "resource-apis" experiment is active. Otherwise,
// it will return a ConfigSource.
func (a *Agent) getProxyWatcher() xds.ProxyWatcher {
if a.baseDeps.UseV2Resources() {
a.logger.Trace("returning proxyTracker for getProxyWatcher")
return proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{
Logger: a.logger.Named("proxy-tracker"),
SessionLimiter: a.baseDeps.XDSStreamLimiter,
})
} else {
a.logger.Trace("returning configSource for getProxyWatcher")
return localproxycfg.NewConfigSource(a.proxyConfig)
}
}
// configureXDSServer configures an XDS server with the proper implementation of
// the PRoxyWatcher interface and registers the XDS server with Consul's
// external facing GRPC server.
func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher) {
func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher, server *consul.Server) {
// TODO(agentless): rather than asserting the concrete type of delegate, we
// should add a method to the Delegate interface to build a ConfigSource.
if server, ok := a.delegate.(*consul.Server); ok {
if server != nil {
switch proxyWatcher.(type) {
case *proxytracker.ProxyTracker:
go func() {
@ -979,12 +963,18 @@ func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher) {
a.xdsServer.Register(a.externalGRPCServer)
}
func (a *Agent) listenAndServeGRPC(proxyWatcher xds.ProxyWatcher) error {
func (a *Agent) listenAndServeGRPC(proxyTracker *proxytracker.ProxyTracker, server *consul.Server) error {
if len(a.config.GRPCAddrs) < 1 && len(a.config.GRPCTLSAddrs) < 1 {
return nil
}
var proxyWatcher xds.ProxyWatcher
if a.baseDeps.UseV2Resources() {
proxyWatcher = proxyTracker
} else {
proxyWatcher = localproxycfg.NewConfigSource(a.proxyConfig)
}
a.configureXDSServer(proxyWatcher)
a.configureXDSServer(proxyWatcher, server)
// Attempt to spawn listeners
var listeners []net.Listener
@ -4579,7 +4569,7 @@ func (a *Agent) listenerPortLocked(svcID structs.ServiceID, checkID structs.Chec
return port, nil
}
func (a *Agent) proxyDataSources() proxycfg.DataSources {
func (a *Agent) proxyDataSources(server *consul.Server) proxycfg.DataSources {
sources := proxycfg.DataSources{
CARoots: proxycfgglue.CacheCARoots(a.cache),
CompiledDiscoveryChain: proxycfgglue.CacheCompiledDiscoveryChain(a.cache),
@ -4606,7 +4596,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
ExportedPeeredServices: proxycfgglue.CacheExportedPeeredServices(a.cache),
}
if server, ok := a.delegate.(*consul.Server); ok {
if server != nil {
deps := proxycfgglue.ServerDataSourceDeps{
Datacenter: a.config.Datacenter,
EventPublisher: a.baseDeps.EventPublisher,

View File

@ -23,19 +23,12 @@ import (
"os"
"path"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/proxycfg-sources/local"
"github.com/hashicorp/consul/agent/xds"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/tcpproxy"
@ -6442,73 +6435,6 @@ func TestAgent_checkServerLastSeen(t *testing.T) {
})
}
func TestAgent_getProxyWatcher(t *testing.T) {
type testcase struct {
description string
getExperiments func() []string
expectedType xds.ProxyWatcher
}
testscases := []testcase{
{
description: "config source is returned when api-resources experiment is not configured",
expectedType: &local.ConfigSource{},
getExperiments: func() []string {
return []string{}
},
},
{
description: "proxy tracker is returned when api-resources experiment is configured",
expectedType: &proxytracker.ProxyTracker{},
getExperiments: func() []string {
return []string{consul.CatalogResourceExperimentName}
},
},
}
for _, tc := range testscases {
caConfig := tlsutil.Config{}
tlsConf, err := tlsutil.NewConfigurator(caConfig, hclog.New(nil))
require.NoError(t, err)
bd := BaseDeps{
Deps: consul.Deps{
Logger: hclog.NewInterceptLogger(nil),
Tokens: new(token.Store),
TLSConfigurator: tlsConf,
GRPCConnPool: &fakeGRPCConnPool{},
Registry: resource.NewRegistry(),
},
RuntimeConfig: &config.RuntimeConfig{
HTTPAddrs: []net.Addr{
&net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: freeport.GetOne(t)},
},
},
Cache: cache.New(cache.Options{}),
NetRPC: &LazyNetRPC{},
}
bd.XDSStreamLimiter = limiter.NewSessionLimiter()
bd.LeafCertManager = leafcert.NewManager(leafcert.Deps{
CertSigner: leafcert.NewNetRPCCertSigner(bd.NetRPC),
RootsReader: leafcert.NewCachedRootsReader(bd.Cache, "dc1"),
Config: leafcert.Config{},
})
cfg := config.RuntimeConfig{
BuildDate: time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC),
}
bd, err = initEnterpriseBaseDeps(bd, &cfg)
require.NoError(t, err)
bd.Experiments = tc.getExperiments()
agent, err := New(bd)
require.NoError(t, err)
agent.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{Logger: bd.Logger, Source: &structs.QuerySource{}})
require.NoError(t, err)
require.IsTypef(t, tc.expectedType, agent.getProxyWatcher(), fmt.Sprintf("Expected proxyWatcher to be of type %s", reflect.TypeOf(tc.expectedType)))
}
}
func getExpectedCaPoolByFile(t *testing.T) *x509.CertPool {
pool := x509.NewCertPool()
data, err := os.ReadFile("../test/ca/root.cer")

View File

@ -177,12 +177,9 @@ func httpRouteToDiscoveryChain(route structs.HTTPRouteConfigEntry) (*structs.Ser
}
if rule.Filters.RetryFilter != nil {
if rule.Filters.RetryFilter.NumRetries != nil {
destination.NumRetries = *rule.Filters.RetryFilter.NumRetries
}
if rule.Filters.RetryFilter.RetryOnConnectFailure != nil {
destination.RetryOnConnectFailure = *rule.Filters.RetryFilter.RetryOnConnectFailure
}
destination.NumRetries = rule.Filters.RetryFilter.NumRetries
destination.RetryOnConnectFailure = rule.Filters.RetryFilter.RetryOnConnectFailure
if len(rule.Filters.RetryFilter.RetryOn) > 0 {
destination.RetryOn = rule.Filters.RetryFilter.RetryOn

View File

@ -686,14 +686,16 @@ func (g *gatewayMeta) updateRouteBinding(route structs.BoundRoute) (bool, []stru
errors[ref] = err
}
isValidJWT := true
if httpRoute, ok := route.(*structs.HTTPRouteConfigEntry); ok {
var jwtErrors map[structs.ResourceReference]error
didBind, jwtErrors = g.validateJWTForRoute(httpRoute)
isValidJWT, jwtErrors = g.validateJWTForRoute(httpRoute)
for ref, err := range jwtErrors {
errors[ref] = err
}
}
if didBind {
if didBind && isValidJWT {
refDidBind = true
listenerBound[listener.Name] = true
}

View File

@ -475,10 +475,10 @@ type URLRewrite struct {
}
type RetryFilter struct {
NumRetries *uint32
NumRetries uint32
RetryOn []string
RetryOnStatusCodes []uint32
RetryOnConnectFailure *bool
RetryOnConnectFailure bool
}
type TimeoutFilter struct {

View File

@ -400,10 +400,6 @@ func (o *HTTPRouteConfigEntry) DeepCopy() *HTTPRouteConfigEntry {
if o.Rules[i2].Filters.RetryFilter != nil {
cp.Rules[i2].Filters.RetryFilter = new(RetryFilter)
*cp.Rules[i2].Filters.RetryFilter = *o.Rules[i2].Filters.RetryFilter
if o.Rules[i2].Filters.RetryFilter.NumRetries != nil {
cp.Rules[i2].Filters.RetryFilter.NumRetries = new(uint32)
*cp.Rules[i2].Filters.RetryFilter.NumRetries = *o.Rules[i2].Filters.RetryFilter.NumRetries
}
if o.Rules[i2].Filters.RetryFilter.RetryOn != nil {
cp.Rules[i2].Filters.RetryFilter.RetryOn = make([]string, len(o.Rules[i2].Filters.RetryFilter.RetryOn))
copy(cp.Rules[i2].Filters.RetryFilter.RetryOn, o.Rules[i2].Filters.RetryFilter.RetryOn)
@ -412,10 +408,6 @@ func (o *HTTPRouteConfigEntry) DeepCopy() *HTTPRouteConfigEntry {
cp.Rules[i2].Filters.RetryFilter.RetryOnStatusCodes = make([]uint32, len(o.Rules[i2].Filters.RetryFilter.RetryOnStatusCodes))
copy(cp.Rules[i2].Filters.RetryFilter.RetryOnStatusCodes, o.Rules[i2].Filters.RetryFilter.RetryOnStatusCodes)
}
if o.Rules[i2].Filters.RetryFilter.RetryOnConnectFailure != nil {
cp.Rules[i2].Filters.RetryFilter.RetryOnConnectFailure = new(bool)
*cp.Rules[i2].Filters.RetryFilter.RetryOnConnectFailure = *o.Rules[i2].Filters.RetryFilter.RetryOnConnectFailure
}
}
if o.Rules[i2].Filters.TimeoutFilter != nil {
cp.Rules[i2].Filters.TimeoutFilter = new(TimeoutFilter)
@ -493,10 +485,6 @@ func (o *HTTPRouteConfigEntry) DeepCopy() *HTTPRouteConfigEntry {
if o.Rules[i2].Services[i4].Filters.RetryFilter != nil {
cp.Rules[i2].Services[i4].Filters.RetryFilter = new(RetryFilter)
*cp.Rules[i2].Services[i4].Filters.RetryFilter = *o.Rules[i2].Services[i4].Filters.RetryFilter
if o.Rules[i2].Services[i4].Filters.RetryFilter.NumRetries != nil {
cp.Rules[i2].Services[i4].Filters.RetryFilter.NumRetries = new(uint32)
*cp.Rules[i2].Services[i4].Filters.RetryFilter.NumRetries = *o.Rules[i2].Services[i4].Filters.RetryFilter.NumRetries
}
if o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn != nil {
cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn = make([]string, len(o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn))
copy(cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn, o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn)
@ -505,10 +493,6 @@ func (o *HTTPRouteConfigEntry) DeepCopy() *HTTPRouteConfigEntry {
cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnStatusCodes = make([]uint32, len(o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnStatusCodes))
copy(cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnStatusCodes, o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnStatusCodes)
}
if o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnConnectFailure != nil {
cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnConnectFailure = new(bool)
*cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnConnectFailure = *o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnConnectFailure
}
}
if o.Rules[i2].Services[i4].Filters.TimeoutFilter != nil {
cp.Rules[i2].Services[i4].Filters.TimeoutFilter = new(TimeoutFilter)

View File

@ -14,8 +14,6 @@ import (
envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
envoy_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
envoy_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
"k8s.io/utils/pointer"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/xds/testcommon"
@ -571,10 +569,10 @@ func getAPIGatewayGoldenTestCases(t *testing.T) []goldenTestCase {
},
},
RetryFilter: &structs.RetryFilter{
NumRetries: pointer.Uint32(3),
NumRetries: 3,
RetryOn: []string{"cancelled"},
RetryOnStatusCodes: []uint32{500},
RetryOnConnectFailure: pointer.Bool(true),
RetryOnConnectFailure: true,
},
TimeoutFilter: &structs.TimeoutFilter{
IdleTimeout: time.Second * 30,

View File

@ -222,10 +222,10 @@ type URLRewrite struct {
}
type RetryFilter struct {
NumRetries *uint32
NumRetries uint32
RetryOn []string
RetryOnStatusCodes []uint32
RetryOnConnectFailure *bool
RetryOnConnectFailure bool
}
type TimeoutFilter struct {

2
go.mod
View File

@ -124,7 +124,6 @@ require (
k8s.io/api v0.26.2
k8s.io/apimachinery v0.26.2
k8s.io/client-go v0.26.2
k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5
)
require (
@ -275,6 +274,7 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.90.1 // indirect
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 // indirect
k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5 // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect

View File

@ -1822,19 +1822,19 @@ func RetryFilterToStructs(s *RetryFilter, t *structs.RetryFilter) {
if s == nil {
return
}
t.NumRetries = &s.NumRetries
t.NumRetries = s.NumRetries
t.RetryOn = s.RetryOn
t.RetryOnStatusCodes = s.RetryOnStatusCodes
t.RetryOnConnectFailure = &s.RetryOnConnectFailure
t.RetryOnConnectFailure = s.RetryOnConnectFailure
}
func RetryFilterFromStructs(t *structs.RetryFilter, s *RetryFilter) {
if s == nil {
return
}
s.NumRetries = *t.NumRetries
s.NumRetries = t.NumRetries
s.RetryOn = t.RetryOn
s.RetryOnStatusCodes = t.RetryOnStatusCodes
s.RetryOnConnectFailure = *t.RetryOnConnectFailure
s.RetryOnConnectFailure = t.RetryOnConnectFailure
}
func RetryPolicyBackOffToStructs(s *RetryPolicyBackOff, t *structs.RetryPolicyBackOff) {
if s == nil {

View File

@ -9,7 +9,6 @@ import (
"encoding/hex"
"fmt"
"github.com/testcontainers/testcontainers-go"
"k8s.io/utils/pointer"
"testing"
"time"
@ -869,7 +868,7 @@ func TestHTTPRouteRetryAndTimeout(t *testing.T) {
{
Filters: api.HTTPFilters{
RetryFilter: &api.RetryFilter{
NumRetries: pointer.Uint32(10),
NumRetries: 10,
RetryOnStatusCodes: []uint32{500},
},
},