diff --git a/Makefile b/Makefile index 4d80406237..a7c275511c 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,7 @@ MOCKERY_VERSION='v2.20.0' BUF_VERSION='v1.26.0' PROTOC_GEN_GO_GRPC_VERSION="v1.2.0" -MOG_VERSION='v0.4.0' +MOG_VERSION='v0.4.1' PROTOC_GO_INJECT_TAG_VERSION='v1.3.0' PROTOC_GEN_GO_BINARY_VERSION="v0.1.0" DEEP_COPY_VERSION='bc3f5aa5735d8a54961580a3a24422c308c831c2' diff --git a/agent/agent.go b/agent/agent.go index 9277443777..dab7d542c0 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -656,43 +656,12 @@ func (a *Agent) Start(ctx context.Context) error { return fmt.Errorf("failed to start Consul enterprise component: %v", err) } - // Create proxy config manager now because it is a dependency of creating the proxyWatcher - // which will be passed to consul.NewServer so that it is then passed to the - // controller registration for the XDS controller in v2 mode, and the xds server in v1 and v2 mode. - intentionDefaultAllow, err := a.config.ACLResolverSettings.IsDefaultAllow() - if err != nil { - return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy) - } - - go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh}) - - // Start the proxy config manager. - a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ - DataSources: a.proxyDataSources(), - Logger: a.logger.Named(logging.ProxyConfig), - Source: &structs.QuerySource{ - Datacenter: a.config.Datacenter, - Segment: a.config.SegmentName, - Node: a.config.NodeName, - NodePartition: a.config.PartitionOrEmpty(), - }, - DNSConfig: proxycfg.DNSConfig{ - Domain: a.config.DNSDomain, - AltDomain: a.config.DNSAltDomain, - }, - TLSConfigurator: a.tlsConfigurator, - IntentionDefaultAllow: intentionDefaultAllow, - UpdateRateLimit: a.config.XDSUpdateRateLimit, - }) - if err != nil { - return err - } - - // proxyWatcher will be used in the creation of the XDS server and also - // in the registration of the xds controller. - proxyWatcher := a.getProxyWatcher() + // proxyTracker will be used in the creation of the XDS server and also + // in the registration of the v2 xds controller + var proxyTracker *proxytracker.ProxyTracker // Setup either the client or the server. + var consulServer *consul.Server if c.ServerMode { serverLogger := a.baseDeps.Logger.NamedIntercept(logging.ConsulServer) @@ -729,16 +698,18 @@ func (a *Agent) Start(ctx context.Context) error { }, ) - var pt *proxytracker.ProxyTracker if a.baseDeps.UseV2Resources() { - pt = proxyWatcher.(*proxytracker.ProxyTracker) + proxyTracker = proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{ + Logger: a.logger.Named("proxy-tracker"), + SessionLimiter: a.baseDeps.XDSStreamLimiter, + }) } - server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger, pt) + consulServer, err = consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger, proxyTracker) if err != nil { return fmt.Errorf("Failed to start Consul server: %v", err) } - incomingRPCLimiter.Register(server) - a.delegate = server + incomingRPCLimiter.Register(consulServer) + a.delegate = consulServer if a.config.PeeringEnabled && a.config.ConnectEnabled { d := servercert.Deps{ @@ -748,7 +719,7 @@ func (a *Agent) Start(ctx context.Context) error { ACLsEnabled: a.config.ACLsEnabled, }, LeafCertManager: a.leafCertManager, - GetStore: func() servercert.Store { return server.FSM().State() }, + GetStore: func() servercert.Store { return consulServer.FSM().State() }, TLSConfigurator: a.tlsConfigurator, } a.certManager = servercert.NewCertManager(d) @@ -804,6 +775,35 @@ func (a *Agent) Start(ctx context.Context) error { return err } + intentionDefaultAllow, err := a.config.ACLResolverSettings.IsDefaultAllow() + if err != nil { + return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy) + } + + go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh}) + + // Start the proxy config manager. + a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ + DataSources: a.proxyDataSources(consulServer), + Logger: a.logger.Named(logging.ProxyConfig), + Source: &structs.QuerySource{ + Datacenter: a.config.Datacenter, + Segment: a.config.SegmentName, + Node: a.config.NodeName, + NodePartition: a.config.PartitionOrEmpty(), + }, + DNSConfig: proxycfg.DNSConfig{ + Domain: a.config.DNSDomain, + AltDomain: a.config.DNSAltDomain, + }, + TLSConfigurator: a.tlsConfigurator, + IntentionDefaultAllow: intentionDefaultAllow, + UpdateRateLimit: a.config.XDSUpdateRateLimit, + }) + if err != nil { + return err + } + go localproxycfg.Sync( &lib.StopChannelContext{StopCh: a.shutdownCh}, localproxycfg.SyncConfig{ @@ -856,7 +856,7 @@ func (a *Agent) Start(ctx context.Context) error { } // Start grpc and grpc_tls servers. - if err := a.listenAndServeGRPC(proxyWatcher); err != nil { + if err := a.listenAndServeGRPC(proxyTracker, consulServer); err != nil { return err } @@ -921,29 +921,13 @@ func (a *Agent) Failed() <-chan struct{} { return a.apiServers.failed } -// getProxyWatcher returns the proper implementation of the ProxyWatcher interface. -// It will return a ProxyTracker if "resource-apis" experiment is active. Otherwise, -// it will return a ConfigSource. -func (a *Agent) getProxyWatcher() xds.ProxyWatcher { - if a.baseDeps.UseV2Resources() { - a.logger.Trace("returning proxyTracker for getProxyWatcher") - return proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{ - Logger: a.logger.Named("proxy-tracker"), - SessionLimiter: a.baseDeps.XDSStreamLimiter, - }) - } else { - a.logger.Trace("returning configSource for getProxyWatcher") - return localproxycfg.NewConfigSource(a.proxyConfig) - } -} - // configureXDSServer configures an XDS server with the proper implementation of // the PRoxyWatcher interface and registers the XDS server with Consul's // external facing GRPC server. -func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher) { +func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher, server *consul.Server) { // TODO(agentless): rather than asserting the concrete type of delegate, we // should add a method to the Delegate interface to build a ConfigSource. - if server, ok := a.delegate.(*consul.Server); ok { + if server != nil { switch proxyWatcher.(type) { case *proxytracker.ProxyTracker: go func() { @@ -979,12 +963,18 @@ func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher) { a.xdsServer.Register(a.externalGRPCServer) } -func (a *Agent) listenAndServeGRPC(proxyWatcher xds.ProxyWatcher) error { +func (a *Agent) listenAndServeGRPC(proxyTracker *proxytracker.ProxyTracker, server *consul.Server) error { if len(a.config.GRPCAddrs) < 1 && len(a.config.GRPCTLSAddrs) < 1 { return nil } + var proxyWatcher xds.ProxyWatcher + if a.baseDeps.UseV2Resources() { + proxyWatcher = proxyTracker + } else { + proxyWatcher = localproxycfg.NewConfigSource(a.proxyConfig) + } - a.configureXDSServer(proxyWatcher) + a.configureXDSServer(proxyWatcher, server) // Attempt to spawn listeners var listeners []net.Listener @@ -4579,7 +4569,7 @@ func (a *Agent) listenerPortLocked(svcID structs.ServiceID, checkID structs.Chec return port, nil } -func (a *Agent) proxyDataSources() proxycfg.DataSources { +func (a *Agent) proxyDataSources(server *consul.Server) proxycfg.DataSources { sources := proxycfg.DataSources{ CARoots: proxycfgglue.CacheCARoots(a.cache), CompiledDiscoveryChain: proxycfgglue.CacheCompiledDiscoveryChain(a.cache), @@ -4606,7 +4596,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources { ExportedPeeredServices: proxycfgglue.CacheExportedPeeredServices(a.cache), } - if server, ok := a.delegate.(*consul.Server); ok { + if server != nil { deps := proxycfgglue.ServerDataSourceDeps{ Datacenter: a.config.Datacenter, EventPublisher: a.baseDeps.EventPublisher, diff --git a/agent/agent_test.go b/agent/agent_test.go index 455a8bbb3e..0b17190dd6 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -23,19 +23,12 @@ import ( "os" "path" "path/filepath" - "reflect" "strconv" "strings" "sync" "testing" "time" - "github.com/hashicorp/consul/agent/grpc-external/limiter" - "github.com/hashicorp/consul/agent/proxycfg" - "github.com/hashicorp/consul/agent/proxycfg-sources/local" - "github.com/hashicorp/consul/agent/xds" - proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker" - "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/tcpproxy" @@ -6442,73 +6435,6 @@ func TestAgent_checkServerLastSeen(t *testing.T) { }) } -func TestAgent_getProxyWatcher(t *testing.T) { - type testcase struct { - description string - getExperiments func() []string - expectedType xds.ProxyWatcher - } - testscases := []testcase{ - { - description: "config source is returned when api-resources experiment is not configured", - expectedType: &local.ConfigSource{}, - getExperiments: func() []string { - return []string{} - }, - }, - { - description: "proxy tracker is returned when api-resources experiment is configured", - expectedType: &proxytracker.ProxyTracker{}, - getExperiments: func() []string { - return []string{consul.CatalogResourceExperimentName} - }, - }, - } - for _, tc := range testscases { - caConfig := tlsutil.Config{} - tlsConf, err := tlsutil.NewConfigurator(caConfig, hclog.New(nil)) - require.NoError(t, err) - - bd := BaseDeps{ - Deps: consul.Deps{ - Logger: hclog.NewInterceptLogger(nil), - Tokens: new(token.Store), - TLSConfigurator: tlsConf, - GRPCConnPool: &fakeGRPCConnPool{}, - Registry: resource.NewRegistry(), - }, - RuntimeConfig: &config.RuntimeConfig{ - HTTPAddrs: []net.Addr{ - &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: freeport.GetOne(t)}, - }, - }, - Cache: cache.New(cache.Options{}), - NetRPC: &LazyNetRPC{}, - } - - bd.XDSStreamLimiter = limiter.NewSessionLimiter() - bd.LeafCertManager = leafcert.NewManager(leafcert.Deps{ - CertSigner: leafcert.NewNetRPCCertSigner(bd.NetRPC), - RootsReader: leafcert.NewCachedRootsReader(bd.Cache, "dc1"), - Config: leafcert.Config{}, - }) - - cfg := config.RuntimeConfig{ - BuildDate: time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC), - } - bd, err = initEnterpriseBaseDeps(bd, &cfg) - require.NoError(t, err) - - bd.Experiments = tc.getExperiments() - - agent, err := New(bd) - require.NoError(t, err) - agent.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{Logger: bd.Logger, Source: &structs.QuerySource{}}) - require.NoError(t, err) - require.IsTypef(t, tc.expectedType, agent.getProxyWatcher(), fmt.Sprintf("Expected proxyWatcher to be of type %s", reflect.TypeOf(tc.expectedType))) - } - -} func getExpectedCaPoolByFile(t *testing.T) *x509.CertPool { pool := x509.NewCertPool() data, err := os.ReadFile("../test/ca/root.cer") diff --git a/agent/consul/discoverychain/gateway_httproute.go b/agent/consul/discoverychain/gateway_httproute.go index 3a4acf48e4..c4816e0274 100644 --- a/agent/consul/discoverychain/gateway_httproute.go +++ b/agent/consul/discoverychain/gateway_httproute.go @@ -177,12 +177,9 @@ func httpRouteToDiscoveryChain(route structs.HTTPRouteConfigEntry) (*structs.Ser } if rule.Filters.RetryFilter != nil { - if rule.Filters.RetryFilter.NumRetries != nil { - destination.NumRetries = *rule.Filters.RetryFilter.NumRetries - } - if rule.Filters.RetryFilter.RetryOnConnectFailure != nil { - destination.RetryOnConnectFailure = *rule.Filters.RetryFilter.RetryOnConnectFailure - } + + destination.NumRetries = rule.Filters.RetryFilter.NumRetries + destination.RetryOnConnectFailure = rule.Filters.RetryFilter.RetryOnConnectFailure if len(rule.Filters.RetryFilter.RetryOn) > 0 { destination.RetryOn = rule.Filters.RetryFilter.RetryOn diff --git a/agent/consul/gateways/controller_gateways.go b/agent/consul/gateways/controller_gateways.go index 24e1dd4c27..ae82bdddc4 100644 --- a/agent/consul/gateways/controller_gateways.go +++ b/agent/consul/gateways/controller_gateways.go @@ -686,14 +686,16 @@ func (g *gatewayMeta) updateRouteBinding(route structs.BoundRoute) (bool, []stru errors[ref] = err } + isValidJWT := true if httpRoute, ok := route.(*structs.HTTPRouteConfigEntry); ok { var jwtErrors map[structs.ResourceReference]error - didBind, jwtErrors = g.validateJWTForRoute(httpRoute) + isValidJWT, jwtErrors = g.validateJWTForRoute(httpRoute) for ref, err := range jwtErrors { errors[ref] = err } } - if didBind { + + if didBind && isValidJWT { refDidBind = true listenerBound[listener.Name] = true } diff --git a/agent/structs/config_entry_routes.go b/agent/structs/config_entry_routes.go index 7f764c93c7..741404769e 100644 --- a/agent/structs/config_entry_routes.go +++ b/agent/structs/config_entry_routes.go @@ -475,10 +475,10 @@ type URLRewrite struct { } type RetryFilter struct { - NumRetries *uint32 + NumRetries uint32 RetryOn []string RetryOnStatusCodes []uint32 - RetryOnConnectFailure *bool + RetryOnConnectFailure bool } type TimeoutFilter struct { diff --git a/agent/structs/structs.deepcopy.go b/agent/structs/structs.deepcopy.go index 8a2b95f296..017b0c943c 100644 --- a/agent/structs/structs.deepcopy.go +++ b/agent/structs/structs.deepcopy.go @@ -400,10 +400,6 @@ func (o *HTTPRouteConfigEntry) DeepCopy() *HTTPRouteConfigEntry { if o.Rules[i2].Filters.RetryFilter != nil { cp.Rules[i2].Filters.RetryFilter = new(RetryFilter) *cp.Rules[i2].Filters.RetryFilter = *o.Rules[i2].Filters.RetryFilter - if o.Rules[i2].Filters.RetryFilter.NumRetries != nil { - cp.Rules[i2].Filters.RetryFilter.NumRetries = new(uint32) - *cp.Rules[i2].Filters.RetryFilter.NumRetries = *o.Rules[i2].Filters.RetryFilter.NumRetries - } if o.Rules[i2].Filters.RetryFilter.RetryOn != nil { cp.Rules[i2].Filters.RetryFilter.RetryOn = make([]string, len(o.Rules[i2].Filters.RetryFilter.RetryOn)) copy(cp.Rules[i2].Filters.RetryFilter.RetryOn, o.Rules[i2].Filters.RetryFilter.RetryOn) @@ -412,10 +408,6 @@ func (o *HTTPRouteConfigEntry) DeepCopy() *HTTPRouteConfigEntry { cp.Rules[i2].Filters.RetryFilter.RetryOnStatusCodes = make([]uint32, len(o.Rules[i2].Filters.RetryFilter.RetryOnStatusCodes)) copy(cp.Rules[i2].Filters.RetryFilter.RetryOnStatusCodes, o.Rules[i2].Filters.RetryFilter.RetryOnStatusCodes) } - if o.Rules[i2].Filters.RetryFilter.RetryOnConnectFailure != nil { - cp.Rules[i2].Filters.RetryFilter.RetryOnConnectFailure = new(bool) - *cp.Rules[i2].Filters.RetryFilter.RetryOnConnectFailure = *o.Rules[i2].Filters.RetryFilter.RetryOnConnectFailure - } } if o.Rules[i2].Filters.TimeoutFilter != nil { cp.Rules[i2].Filters.TimeoutFilter = new(TimeoutFilter) @@ -493,10 +485,6 @@ func (o *HTTPRouteConfigEntry) DeepCopy() *HTTPRouteConfigEntry { if o.Rules[i2].Services[i4].Filters.RetryFilter != nil { cp.Rules[i2].Services[i4].Filters.RetryFilter = new(RetryFilter) *cp.Rules[i2].Services[i4].Filters.RetryFilter = *o.Rules[i2].Services[i4].Filters.RetryFilter - if o.Rules[i2].Services[i4].Filters.RetryFilter.NumRetries != nil { - cp.Rules[i2].Services[i4].Filters.RetryFilter.NumRetries = new(uint32) - *cp.Rules[i2].Services[i4].Filters.RetryFilter.NumRetries = *o.Rules[i2].Services[i4].Filters.RetryFilter.NumRetries - } if o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn != nil { cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn = make([]string, len(o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn)) copy(cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn, o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOn) @@ -505,10 +493,6 @@ func (o *HTTPRouteConfigEntry) DeepCopy() *HTTPRouteConfigEntry { cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnStatusCodes = make([]uint32, len(o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnStatusCodes)) copy(cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnStatusCodes, o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnStatusCodes) } - if o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnConnectFailure != nil { - cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnConnectFailure = new(bool) - *cp.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnConnectFailure = *o.Rules[i2].Services[i4].Filters.RetryFilter.RetryOnConnectFailure - } } if o.Rules[i2].Services[i4].Filters.TimeoutFilter != nil { cp.Rules[i2].Services[i4].Filters.TimeoutFilter = new(TimeoutFilter) diff --git a/agent/xds/resources_test.go b/agent/xds/resources_test.go index 69a704386b..b47edcbac3 100644 --- a/agent/xds/resources_test.go +++ b/agent/xds/resources_test.go @@ -14,8 +14,6 @@ import ( envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" envoy_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" envoy_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" - "k8s.io/utils/pointer" - "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/xds/testcommon" @@ -571,10 +569,10 @@ func getAPIGatewayGoldenTestCases(t *testing.T) []goldenTestCase { }, }, RetryFilter: &structs.RetryFilter{ - NumRetries: pointer.Uint32(3), + NumRetries: 3, RetryOn: []string{"cancelled"}, RetryOnStatusCodes: []uint32{500}, - RetryOnConnectFailure: pointer.Bool(true), + RetryOnConnectFailure: true, }, TimeoutFilter: &structs.TimeoutFilter{ IdleTimeout: time.Second * 30, diff --git a/api/config_entry_routes.go b/api/config_entry_routes.go index 1918386576..bbaa032d50 100644 --- a/api/config_entry_routes.go +++ b/api/config_entry_routes.go @@ -222,10 +222,10 @@ type URLRewrite struct { } type RetryFilter struct { - NumRetries *uint32 + NumRetries uint32 RetryOn []string RetryOnStatusCodes []uint32 - RetryOnConnectFailure *bool + RetryOnConnectFailure bool } type TimeoutFilter struct { diff --git a/go.mod b/go.mod index 2187645660..5a1417eea3 100644 --- a/go.mod +++ b/go.mod @@ -124,7 +124,6 @@ require ( k8s.io/api v0.26.2 k8s.io/apimachinery v0.26.2 k8s.io/client-go v0.26.2 - k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5 ) require ( @@ -275,6 +274,7 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/klog/v2 v2.90.1 // indirect k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 // indirect + k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5 // indirect sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect sigs.k8s.io/yaml v1.3.0 // indirect diff --git a/proto/private/pbconfigentry/config_entry.gen.go b/proto/private/pbconfigentry/config_entry.gen.go index 258d8dbb6d..6f9b53e01d 100644 --- a/proto/private/pbconfigentry/config_entry.gen.go +++ b/proto/private/pbconfigentry/config_entry.gen.go @@ -1822,19 +1822,19 @@ func RetryFilterToStructs(s *RetryFilter, t *structs.RetryFilter) { if s == nil { return } - t.NumRetries = &s.NumRetries + t.NumRetries = s.NumRetries t.RetryOn = s.RetryOn t.RetryOnStatusCodes = s.RetryOnStatusCodes - t.RetryOnConnectFailure = &s.RetryOnConnectFailure + t.RetryOnConnectFailure = s.RetryOnConnectFailure } func RetryFilterFromStructs(t *structs.RetryFilter, s *RetryFilter) { if s == nil { return } - s.NumRetries = *t.NumRetries + s.NumRetries = t.NumRetries s.RetryOn = t.RetryOn s.RetryOnStatusCodes = t.RetryOnStatusCodes - s.RetryOnConnectFailure = *t.RetryOnConnectFailure + s.RetryOnConnectFailure = t.RetryOnConnectFailure } func RetryPolicyBackOffToStructs(s *RetryPolicyBackOff, t *structs.RetryPolicyBackOff) { if s == nil { diff --git a/test/integration/consul-container/test/gateways/http_route_test.go b/test/integration/consul-container/test/gateways/http_route_test.go index 96c93da955..bf2c7d6956 100644 --- a/test/integration/consul-container/test/gateways/http_route_test.go +++ b/test/integration/consul-container/test/gateways/http_route_test.go @@ -9,7 +9,6 @@ import ( "encoding/hex" "fmt" "github.com/testcontainers/testcontainers-go" - "k8s.io/utils/pointer" "testing" "time" @@ -869,7 +868,7 @@ func TestHTTPRouteRetryAndTimeout(t *testing.T) { { Filters: api.HTTPFilters{ RetryFilter: &api.RetryFilter{ - NumRetries: pointer.Uint32(10), + NumRetries: 10, RetryOnStatusCodes: []uint32{500}, }, },