From 051f250edb23bf8c47fca965e80f9bd1eee73a26 Mon Sep 17 00:00:00 2001 From: John Murret Date: Thu, 24 Aug 2023 16:44:14 -0600 Subject: [PATCH] NET-5338 - NET-5338 - Run a v2 mode xds server (#18579) * NET-5338 - NET-5338 - Run a v2 mode xds server * fix linting --- agent/agent.go | 44 ++- agent/agent_test.go | 73 ++++ agent/config/default.go | 4 +- agent/config/runtime_test.go | 2 +- agent/consul/server.go | 4 +- agent/proxy-tracker/mock_Logger.go | 31 -- agent/proxy-tracker/proxy_tracker.go | 45 +-- agent/proxy-tracker/proxy_tracker_test.go | 35 +- .../proxycfg-sources/catalog/config_source.go | 11 +- .../catalog/config_source_oss.go | 13 + .../catalog/config_source_test.go | 35 +- .../catalog/mock_ConfigManager.go | 24 +- .../proxycfg-sources/catalog/mock_Watcher.go | 38 ++- agent/proxycfg-sources/local/config_source.go | 6 +- .../local/mock_ConfigManager.go | 24 +- agent/proxycfg-sources/local/sync.go | 2 +- agent/proxycfg/manager.go | 12 +- agent/proxycfg/manager_test.go | 8 +- agent/proxycfg/proxysnapshot.go | 76 +++++ agent/proxycfg/proxysnapshot_test.go | 311 ++++++++++++++++++ agent/proxycfg_test.go | 6 +- agent/xds/delta.go | 221 ++++++++----- agent/xds/delta_test.go | 21 +- agent/xds/protocol_trace.go | 15 +- agent/xds/proxystateconverter/converter.go | 21 +- agent/xds/proxystateconverter/endpoints.go | 8 +- agent/xds/server.go | 35 +- agent/xds/xds_protocol_helpers_test.go | 15 +- agent/xdsv2/endpoint_resources.go | 1 - agent/xdsv2/resources.go | 8 +- internal/mesh/proxy_state_exports.go | 42 +++ test/integration/connect/envoy/main_test.go | 39 ++- test/integration/connect/envoy/run-tests.sh | 12 + 33 files changed, 923 insertions(+), 319 deletions(-) delete mode 100644 agent/proxy-tracker/mock_Logger.go create mode 100644 agent/proxycfg-sources/catalog/config_source_oss.go create mode 100644 agent/proxycfg/proxysnapshot.go create mode 100644 agent/proxycfg/proxysnapshot_test.go create mode 100644 internal/mesh/proxy_state_exports.go diff --git a/agent/agent.go b/agent/agent.go index 229ec5b8fd..5f2e57c718 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -9,6 +9,9 @@ import ( "encoding/json" "errors" "fmt" + proxytracker "github.com/hashicorp/consul/agent/proxy-tracker" + catalogproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/catalog" + "github.com/hashicorp/consul/lib/stringslice" "io" "net" "net/http" @@ -54,7 +57,6 @@ import ( "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/proxycfg" proxycfgglue "github.com/hashicorp/consul/agent/proxycfg-glue" - catalogproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/catalog" localproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/local" "github.com/hashicorp/consul/agent/rpcclient" "github.com/hashicorp/consul/agent/rpcclient/configentry" @@ -908,13 +910,37 @@ func (a *Agent) Failed() <-chan struct{} { return a.apiServers.failed } -func (a *Agent) listenAndServeGRPC() error { - if len(a.config.GRPCAddrs) < 1 && len(a.config.GRPCTLSAddrs) < 1 { - return nil +// useV2Resources returns true if "resource-apis" is present in the Experiments +// array of the agent config. +func (a *Agent) useV2Resources() bool { + if stringslice.Contains(a.baseDeps.Experiments, consul.CatalogResourceExperimentName) { + return true } + return false +} + +// getProxyWatcher returns the proper implementation of the ProxyWatcher interface. +// It will return a ProxyTracker if "resource-apis" experiment is active. Otherwise, +// it will return a ConfigSource. +func (a *Agent) getProxyWatcher() xds.ProxyWatcher { + if a.useV2Resources() { + return proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{ + Logger: a.proxyConfig.Logger.Named("proxy-tracker"), + SessionLimiter: a.baseDeps.XDSStreamLimiter, + }) + } else { + return localproxycfg.NewConfigSource(a.proxyConfig) + } +} + +// configureXDSServer configures an XDS server with the proper implementation of +// the PRoxyWatcher interface and registers the XDS server with Consul's +// external facing GRPC server. +func (a *Agent) configureXDSServer() { + cfg := a.getProxyWatcher() + // TODO(agentless): rather than asserting the concrete type of delegate, we // should add a method to the Delegate interface to build a ConfigSource. - var cfg xds.ProxyConfigSource = localproxycfg.NewConfigSource(a.proxyConfig) if server, ok := a.delegate.(*consul.Server); ok { catalogCfg := catalogproxycfg.NewConfigSource(catalogproxycfg.Config{ NodeName: a.config.NodeName, @@ -941,6 +967,14 @@ func (a *Agent) listenAndServeGRPC() error { a, ) a.xdsServer.Register(a.externalGRPCServer) +} + +func (a *Agent) listenAndServeGRPC() error { + if len(a.config.GRPCAddrs) < 1 && len(a.config.GRPCTLSAddrs) < 1 { + return nil + } + + a.configureXDSServer() // Attempt to spawn listeners var listeners []net.Listener diff --git a/agent/agent_test.go b/agent/agent_test.go index 96dd1de125..174e8c8cdf 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -14,6 +14,11 @@ import ( "encoding/json" "errors" "fmt" + "github.com/hashicorp/consul/agent/grpc-external/limiter" + proxytracker "github.com/hashicorp/consul/agent/proxy-tracker" + "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/proxycfg-sources/local" + "github.com/hashicorp/consul/agent/xds" mathrand "math/rand" "net" "net/http" @@ -22,6 +27,7 @@ import ( "os" "path" "path/filepath" + "reflect" "strconv" "strings" "sync" @@ -6358,6 +6364,73 @@ func TestAgent_checkServerLastSeen(t *testing.T) { }) } +func TestAgent_getProxyWatcher(t *testing.T) { + type testcase struct { + description string + getExperiments func() []string + expectedType xds.ProxyWatcher + } + testscases := []testcase{ + { + description: "config source is returned when api-resources experiment is not configured", + expectedType: &local.ConfigSource{}, + getExperiments: func() []string { + return []string{} + }, + }, + { + description: "proxy tracker is returned when api-resources experiment is configured", + expectedType: &proxytracker.ProxyTracker{}, + getExperiments: func() []string { + return []string{consul.CatalogResourceExperimentName} + }, + }, + } + for _, tc := range testscases { + caConfig := tlsutil.Config{} + tlsConf, err := tlsutil.NewConfigurator(caConfig, hclog.New(nil)) + require.NoError(t, err) + + bd := BaseDeps{ + Deps: consul.Deps{ + Logger: hclog.NewInterceptLogger(nil), + Tokens: new(token.Store), + TLSConfigurator: tlsConf, + GRPCConnPool: &fakeGRPCConnPool{}, + Registry: resource.NewRegistry(), + }, + RuntimeConfig: &config.RuntimeConfig{ + HTTPAddrs: []net.Addr{ + &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: freeport.GetOne(t)}, + }, + }, + Cache: cache.New(cache.Options{}), + NetRPC: &LazyNetRPC{}, + } + + bd.XDSStreamLimiter = limiter.NewSessionLimiter() + bd.LeafCertManager = leafcert.NewManager(leafcert.Deps{ + CertSigner: leafcert.NewNetRPCCertSigner(bd.NetRPC), + RootsReader: leafcert.NewCachedRootsReader(bd.Cache, "dc1"), + Config: leafcert.Config{}, + }) + + cfg := config.RuntimeConfig{ + BuildDate: time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC), + } + bd, err = initEnterpriseBaseDeps(bd, &cfg) + require.NoError(t, err) + + bd.Experiments = tc.getExperiments() + + agent, err := New(bd) + require.NoError(t, err) + agent.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{Logger: bd.Logger, Source: &structs.QuerySource{}}) + require.NoError(t, err) + require.IsTypef(t, tc.expectedType, agent.getProxyWatcher(), fmt.Sprintf("Expected proxyWatcher to be of type %s", reflect.TypeOf(tc.expectedType))) + } + +} func getExpectedCaPoolByFile(t *testing.T) *x509.CertPool { pool := x509.NewCertPool() data, err := os.ReadFile("../test/ca/root.cer") diff --git a/agent/config/default.go b/agent/config/default.go index 275a32a33f..f5ef349b75 100644 --- a/agent/config/default.go +++ b/agent/config/default.go @@ -209,9 +209,7 @@ func DevSource() Source { ports = { grpc = 8502 } - experiments = [ - "resource-apis" - ] + experiments = [] `, } } diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index 54074bc195..e39557b3d4 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -324,7 +324,7 @@ func TestLoad_IntegrationWithFlags(t *testing.T) { rt.DevMode = true rt.DisableAnonymousSignature = true rt.DisableKeyringFile = true - rt.Experiments = []string{"resource-apis"} + rt.Experiments = nil rt.EnableDebug = true rt.UIConfig.Enabled = true rt.LeaveOnTerm = false diff --git a/agent/consul/server.go b/agent/consul/server.go index e440c6c27d..64c0c1f76a 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -134,7 +134,7 @@ const ( LeaderTransferMinVersion = "1.6.0" - catalogResourceExperimentName = "resource-apis" + CatalogResourceExperimentName = "resource-apis" ) const ( @@ -874,7 +874,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom } func (s *Server) registerControllers(deps Deps) { - if stringslice.Contains(deps.Experiments, catalogResourceExperimentName) { + if stringslice.Contains(deps.Experiments, CatalogResourceExperimentName) { catalog.RegisterControllers(s.controllerManager, catalog.DefaultControllerDependencies()) mesh.RegisterControllers(s.controllerManager, mesh.ControllerDependencies{ TrustBundleFetcher: func() (*pbproxystate.TrustBundle, error) { diff --git a/agent/proxy-tracker/mock_Logger.go b/agent/proxy-tracker/mock_Logger.go deleted file mode 100644 index b4d28b096e..0000000000 --- a/agent/proxy-tracker/mock_Logger.go +++ /dev/null @@ -1,31 +0,0 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. - -package proxytracker - -import mock "github.com/stretchr/testify/mock" - -// MockLogger is an autogenerated mock type for the Logger type -type MockLogger struct { - mock.Mock -} - -// Error provides a mock function with given fields: args -func (_m *MockLogger) Error(args ...interface{}) { - var _ca []interface{} - _ca = append(_ca, args...) - _m.Called(_ca...) -} - -// NewMockLogger creates a new instance of MockLogger. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewMockLogger(t interface { - mock.TestingT - Cleanup(func()) -}) *MockLogger { - mock := &MockLogger{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/agent/proxy-tracker/proxy_tracker.go b/agent/proxy-tracker/proxy_tracker.go index 00c0cf23f9..ca4478f237 100644 --- a/agent/proxy-tracker/proxy_tracker.go +++ b/agent/proxy-tracker/proxy_tracker.go @@ -6,6 +6,7 @@ package proxytracker import ( "errors" "fmt" + "github.com/hashicorp/go-hclog" "sync" "github.com/hashicorp/consul/internal/controller" @@ -14,7 +15,6 @@ import ( "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/proxycfg" - pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" "github.com/hashicorp/consul/proto-public/pbresource" ) @@ -35,9 +35,9 @@ func (e *ProxyConnection) Key() string { // when the ProxyState for that proxyID has changed. type proxyWatchData struct { // notifyCh is the channel that the watcher receives updates from ProxyTracker. - notifyCh chan *pbmesh.ProxyState + notifyCh chan proxycfg.ProxySnapshot // state is the current/last updated ProxyState for a given proxy. - state *pbmesh.ProxyState + state *mesh.ProxyState // token is the ACL token provided by the watcher. token string // nodeName is the node where the given proxy resides. @@ -46,7 +46,7 @@ type proxyWatchData struct { type ProxyTrackerConfig struct { // logger will be used to write log messages. - Logger Logger + Logger hclog.Logger // sessionLimiter is used to enforce xDS concurrency limits. SessionLimiter SessionLimiter @@ -87,10 +87,10 @@ func NewProxyTracker(cfg ProxyTrackerConfig) *ProxyTracker { // Watch connects a proxy with ProxyTracker and returns the consumer a channel to receive updates, // a channel to notify of xDS terminated session, and a cancel function to cancel the watch. func (pt *ProxyTracker) Watch(proxyID *pbresource.ID, - nodeName string, token string) (<-chan *pbmesh.ProxyState, + nodeName string, token string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { - - if err := validateArgs(proxyID, nodeName, token); err != nil { + pt.config.Logger.Trace("watch initiated", "proxyID", proxyID, "nodeName", nodeName) + if err := pt.validateWatchArgs(proxyID, nodeName); err != nil { pt.config.Logger.Error("args failed validation", err) return nil, nil, nil, err } @@ -105,7 +105,8 @@ func (pt *ProxyTracker) Watch(proxyID *pbresource.ID, // This buffering is crucial otherwise we'd block immediately trying to // deliver the current snapshot below if we already have one. - proxyStateChan := make(chan *pbmesh.ProxyState, 1) + + proxyStateChan := make(chan proxycfg.ProxySnapshot, 1) watchData := &proxyWatchData{ notifyCh: proxyStateChan, state: nil, @@ -128,9 +129,11 @@ func (pt *ProxyTracker) Watch(proxyID *pbresource.ID, //Send an event to the controller err = pt.notifyNewProxyChannel(proxyID) if err != nil { + pt.config.Logger.Error("failed to notify controller of new proxy connection", err) pt.cancelWatchLocked(proxyReferenceKey, watchData.notifyCh, session) return nil, nil, nil, err } + pt.config.Logger.Trace("controller notified of watch created", "proxyID", proxyID, "nodeName", nodeName) return proxyStateChan, session.Terminated(), cancel, nil } @@ -163,34 +166,37 @@ func (pt *ProxyTracker) notifyNewProxyChannel(proxyID *pbresource.ID) error { // - ends the session with xDS session limiter. // - closes the proxy state channel assigned to the proxy. // This function assumes the state lock is already held. -func (pt *ProxyTracker) cancelWatchLocked(proxyReferenceKey resource.ReferenceKey, proxyStateChan chan *pbmesh.ProxyState, session limiter.Session) { +func (pt *ProxyTracker) cancelWatchLocked(proxyReferenceKey resource.ReferenceKey, proxyStateChan chan proxycfg.ProxySnapshot, session limiter.Session) { delete(pt.proxies, proxyReferenceKey) session.End() close(proxyStateChan) + pt.config.Logger.Trace("watch cancelled", "proxyReferenceKey", proxyReferenceKey) } -func validateArgs(proxyID *pbresource.ID, - nodeName string, token string) error { +// validateWatchArgs checks the proxyIDand nodeName passed to Watch +// and returns an error if the args are not properly constructed. +func (pt *ProxyTracker) validateWatchArgs(proxyID *pbresource.ID, + nodeName string) error { if proxyID == nil { return errors.New("proxyID is required") - } else if proxyID.Type.Kind != mesh.ProxyStateTemplateConfigurationType.Kind { + } else if proxyID.GetType().GetKind() != mesh.ProxyStateTemplateConfigurationType.Kind { return fmt.Errorf("proxyID must be a %s", mesh.ProxyStateTemplateConfigurationType.GetKind()) } else if nodeName == "" { return errors.New("nodeName is required") - } else if token == "" { - return errors.New("token is required") } return nil } // PushChange allows pushing a computed ProxyState to xds for xds resource generation to send to a proxy. -func (pt *ProxyTracker) PushChange(proxyID *pbresource.ID, proxyState *pbmesh.ProxyState) error { +func (pt *ProxyTracker) PushChange(proxyID *pbresource.ID, proxyState *mesh.ProxyState) error { + pt.config.Logger.Trace("push change called for proxy", "proxyID", proxyID) proxyReferenceKey := resource.NewReferenceKey(proxyID) pt.mu.Lock() defer pt.mu.Unlock() if data, ok := pt.proxies[proxyReferenceKey]; ok { data.state = proxyState + pt.deliverLatest(proxyID, proxyState, data.notifyCh) } else { return errors.New("proxyState change could not be sent because proxy is not connected") @@ -199,7 +205,8 @@ func (pt *ProxyTracker) PushChange(proxyID *pbresource.ID, proxyState *pbmesh.Pr return nil } -func (pt *ProxyTracker) deliverLatest(proxyID *pbresource.ID, proxyState *pbmesh.ProxyState, ch chan *pbmesh.ProxyState) { +func (pt *ProxyTracker) deliverLatest(proxyID *pbresource.ID, proxyState *mesh.ProxyState, ch chan proxycfg.ProxySnapshot) { + pt.config.Logger.Trace("delivering latest proxy snapshot to proxy", "proxyID", proxyID) // Send if chan is empty select { case ch <- proxyState: @@ -254,6 +261,7 @@ func (pt *ProxyTracker) ProxyConnectedToServer(proxyID *pbresource.ID) bool { // Shutdown removes all state and close all channels. func (pt *ProxyTracker) Shutdown() { + pt.config.Logger.Info("proxy tracker shutdown initiated") pt.mu.Lock() defer pt.mu.Unlock() @@ -271,8 +279,3 @@ func (pt *ProxyTracker) Shutdown() { type SessionLimiter interface { BeginSession() (limiter.Session, error) } - -//go:generate mockery --name Logger --inpackage -type Logger interface { - Error(args ...any) -} diff --git a/agent/proxy-tracker/proxy_tracker_test.go b/agent/proxy-tracker/proxy_tracker_test.go index 3913f95b21..e799249d43 100644 --- a/agent/proxy-tracker/proxy_tracker_test.go +++ b/agent/proxy-tracker/proxy_tracker_test.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/consul/internal/resource/resourcetest" pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/sdk/testutil" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "testing" @@ -27,7 +28,7 @@ func TestProxyTracker_Watch(t *testing.T) { session1.On("Terminated").Return(session1TermCh) session1.On("End").Return() lim.On("BeginSession").Return(session1, nil) - logger := NewMockLogger(t) + logger := testutil.Logger(t) pt := NewProxyTracker(ProxyTrackerConfig{ Logger: logger, @@ -75,7 +76,7 @@ func TestProxyTracker_Watch_ErrorConsumerNotReady(t *testing.T) { session1 := newMockSession(t) session1.On("End").Return() lim.On("BeginSession").Return(session1, nil) - logger := NewMockLogger(t) + logger := testutil.Logger(t) pt := NewProxyTracker(ProxyTrackerConfig{ Logger: logger, @@ -125,13 +126,6 @@ func TestProxyTracker_Watch_ArgValidationErrors(t *testing.T) { token: "something", expectedError: errors.New("nodeName is required"), }, - { - description: "Empty token", - proxyID: resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, "test").ID(), - nodeName: "something", - token: "", - expectedError: errors.New("token is required"), - }, { description: "resource is not ProxyStateTemplate", proxyID: resourcetest.Resource(mesh.ProxyConfigurationType, "test").ID(), @@ -143,8 +137,7 @@ func TestProxyTracker_Watch_ArgValidationErrors(t *testing.T) { for _, tc := range testcases { lim := NewMockSessionLimiter(t) lim.On("BeginSession").Return(nil, nil).Maybe() - logger := NewMockLogger(t) - logger.On("Error", mock.Anything, mock.Anything).Return(nil) + logger := testutil.Logger(t) pt := NewProxyTracker(ProxyTrackerConfig{ Logger: logger, @@ -165,9 +158,7 @@ func TestProxyTracker_Watch_SessionLimiterError(t *testing.T) { resourceID := resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, "test").ID() lim := NewMockSessionLimiter(t) lim.On("BeginSession").Return(nil, errors.New("kaboom")) - logger := NewMockLogger(t) - logger.On("Error", mock.Anything, mock.Anything).Return(nil) - + logger := testutil.Logger(t) pt := NewProxyTracker(ProxyTrackerConfig{ Logger: logger, SessionLimiter: lim, @@ -190,7 +181,7 @@ func TestProxyTracker_PushChange(t *testing.T) { session1TermCh := make(limiter.SessionTerminatedChan) session1.On("Terminated").Return(session1TermCh) lim.On("BeginSession").Return(session1, nil) - logger := NewMockLogger(t) + logger := testutil.Logger(t) pt := NewProxyTracker(ProxyTrackerConfig{ Logger: logger, @@ -202,9 +193,9 @@ func TestProxyTracker_PushChange(t *testing.T) { require.NoError(t, err) // PushChange - proxyState := &pbmesh.ProxyState{ + proxyState := &mesh.ProxyState{ProxyState: &pbmesh.ProxyState{ IntentionDefaultAllow: true, - } + }} // using a goroutine so that the channel and main test thread do not cause // blocking issues with each other @@ -227,7 +218,7 @@ func TestProxyTracker_PushChange(t *testing.T) { func TestProxyTracker_PushChanges_ErrorProxyNotConnected(t *testing.T) { resourceID := resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, "test").ID() lim := NewMockSessionLimiter(t) - logger := NewMockLogger(t) + logger := testutil.Logger(t) pt := NewProxyTracker(ProxyTrackerConfig{ Logger: logger, @@ -235,9 +226,9 @@ func TestProxyTracker_PushChanges_ErrorProxyNotConnected(t *testing.T) { }) // PushChange - proxyState := &pbmesh.ProxyState{ + proxyState := &mesh.ProxyState{ProxyState: &pbmesh.ProxyState{ IntentionDefaultAllow: true, - } + }} err := pt.PushChange(resourceID, proxyState) require.Error(t, err) @@ -276,7 +267,7 @@ func TestProxyTracker_ProxyConnectedToServer(t *testing.T) { lim := NewMockSessionLimiter(t) session1 := newMockSession(t) session1TermCh := make(limiter.SessionTerminatedChan) - logger := NewMockLogger(t) + logger := testutil.Logger(t) pt := NewProxyTracker(ProxyTrackerConfig{ Logger: logger, @@ -297,7 +288,7 @@ func TestProxyTracker_Shutdown(t *testing.T) { session1.On("Terminated").Return(session1TermCh) session1.On("End").Return().Maybe() lim.On("BeginSession").Return(session1, nil) - logger := NewMockLogger(t) + logger := testutil.Logger(t) pt := NewProxyTracker(ProxyTrackerConfig{ Logger: logger, diff --git a/agent/proxycfg-sources/catalog/config_source.go b/agent/proxycfg-sources/catalog/config_source.go index fb1ceab316..a7205b6622 100644 --- a/agent/proxycfg-sources/catalog/config_source.go +++ b/agent/proxycfg-sources/catalog/config_source.go @@ -6,6 +6,7 @@ package catalog import ( "context" "errors" + "github.com/hashicorp/consul/proto-public/pbresource" "sync" "github.com/hashicorp/go-hclog" @@ -48,11 +49,13 @@ func NewConfigSource(cfg Config) *ConfigSource { // Watch wraps the underlying proxycfg.Manager and dynamically registers // services from the catalog with it when requested by the xDS server. -func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { +func (m *ConfigSource) Watch(id *pbresource.ID, nodeName string, token string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { + // Create service ID + serviceID := structs.NewServiceID(id.Name, GetEnterpriseMetaFromResourceID(id)) // If the service is registered to the local agent, use the LocalConfigSource // rather than trying to configure it from the catalog. if nodeName == m.NodeName && m.LocalState.ServiceExists(serviceID) { - return m.LocalConfigSource.Watch(serviceID, nodeName, token) + return m.LocalConfigSource.Watch(id, nodeName, token) } // Begin a session with the xDS session concurrency limiter. @@ -276,7 +279,7 @@ type Config struct { //go:generate mockery --name ConfigManager --inpackage type ConfigManager interface { - Watch(req proxycfg.ProxyID) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc) + Watch(req proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc) Register(proxyID proxycfg.ProxyID, service *structs.NodeService, source proxycfg.ProxySource, token string, overwrite bool) error Deregister(proxyID proxycfg.ProxyID, source proxycfg.ProxySource) } @@ -289,7 +292,7 @@ type Store interface { //go:generate mockery --name Watcher --inpackage type Watcher interface { - Watch(proxyID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) + Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) } //go:generate mockery --name SessionLimiter --inpackage diff --git a/agent/proxycfg-sources/catalog/config_source_oss.go b/agent/proxycfg-sources/catalog/config_source_oss.go new file mode 100644 index 0000000000..21ddede882 --- /dev/null +++ b/agent/proxycfg-sources/catalog/config_source_oss.go @@ -0,0 +1,13 @@ +//go:build !consulent +// +build !consulent + +package catalog + +import ( + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +func GetEnterpriseMetaFromResourceID(id *pbresource.ID) *acl.EnterpriseMeta { + return acl.DefaultEnterpriseMeta() +} diff --git a/agent/proxycfg-sources/catalog/config_source_test.go b/agent/proxycfg-sources/catalog/config_source_test.go index 0767466a6d..653ac66977 100644 --- a/agent/proxycfg-sources/catalog/config_source_test.go +++ b/agent/proxycfg-sources/catalog/config_source_test.go @@ -5,6 +5,7 @@ package catalog import ( "errors" + rtest "github.com/hashicorp/consul/internal/resource/resourcetest" "testing" "time" @@ -19,6 +20,7 @@ import ( "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" + "github.com/hashicorp/consul/internal/mesh" ) func TestConfigSource_Success(t *testing.T) { @@ -75,15 +77,15 @@ func TestConfigSource_Success(t *testing.T) { }) t.Cleanup(mgr.Shutdown) - snapCh, termCh, cancelWatch1, err := mgr.Watch(serviceID, nodeName, token) + snapCh, termCh, cancelWatch1, err := mgr.Watch(rtest.Resource(mesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token) require.NoError(t, err) require.Equal(t, session1TermCh, termCh) // Expect Register to have been called with the proxy's inital port. select { case snap := <-snapCh: - require.Equal(t, 9999, snap.Port) - require.Equal(t, token, snap.ProxyID.Token) + require.Equal(t, 9999, snap.(*proxycfg.ConfigSnapshot).Port) + require.Equal(t, token, snap.(*proxycfg.ConfigSnapshot).ProxyID.Token) case <-time.After(100 * time.Millisecond): t.Fatal("timeout waiting for snapshot") } @@ -107,7 +109,7 @@ func TestConfigSource_Success(t *testing.T) { // Expect Register to have been called again with the proxy's new port. select { case snap := <-snapCh: - require.Equal(t, 8888, snap.Port) + require.Equal(t, 8888, snap.(*proxycfg.ConfigSnapshot).Port) case <-time.After(100 * time.Millisecond): t.Fatal("timeout waiting for snapshot") } @@ -126,13 +128,13 @@ func TestConfigSource_Success(t *testing.T) { require.Equal(t, map[string]any{ "local_connect_timeout_ms": 123, "max_inbound_connections": 321, - }, snap.Proxy.Config) + }, snap.(*proxycfg.ConfigSnapshot).Proxy.Config) case <-time.After(100 * time.Millisecond): t.Fatal("timeout waiting for snapshot") } // Start another watch. - _, termCh2, cancelWatch2, err := mgr.Watch(serviceID, nodeName, token) + _, termCh2, cancelWatch2, err := mgr.Watch(rtest.Resource(mesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token) require.NoError(t, err) require.Equal(t, session2TermCh, termCh2) @@ -166,6 +168,7 @@ func TestConfigSource_Success(t *testing.T) { func TestConfigSource_LocallyManagedService(t *testing.T) { serviceID := structs.NewServiceID("web-sidecar-proxy-1", nil) + proxyID := rtest.Resource(mesh.ProxyConfigurationType, serviceID.ID).ID() nodeName := "node-1" token := "token" @@ -173,8 +176,8 @@ func TestConfigSource_LocallyManagedService(t *testing.T) { localState.AddServiceWithChecks(&structs.NodeService{ID: serviceID.ID}, nil, "", false) localWatcher := NewMockWatcher(t) - localWatcher.On("Watch", serviceID, nodeName, token). - Return(make(<-chan *proxycfg.ConfigSnapshot), nil, proxycfg.CancelFunc(func() {}), nil) + localWatcher.On("Watch", proxyID, nodeName, token). + Return(make(<-chan proxycfg.ProxySnapshot), nil, proxycfg.CancelFunc(func() {}), nil) mgr := NewConfigSource(Config{ NodeName: nodeName, @@ -186,7 +189,7 @@ func TestConfigSource_LocallyManagedService(t *testing.T) { }) t.Cleanup(mgr.Shutdown) - _, _, _, err := mgr.Watch(serviceID, nodeName, token) + _, _, _, err := mgr.Watch(proxyID, nodeName, token) require.NoError(t, err) } @@ -213,7 +216,7 @@ func TestConfigSource_ErrorRegisteringService(t *testing.T) { cfgMgr := NewMockConfigManager(t) cfgMgr.On("Watch", mock.Anything). - Return(make(<-chan *proxycfg.ConfigSnapshot), cancel) + Return(make(<-chan proxycfg.ProxySnapshot), cancel) cfgMgr.On("Register", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(errors.New("KABOOM")) @@ -233,7 +236,7 @@ func TestConfigSource_ErrorRegisteringService(t *testing.T) { }) t.Cleanup(mgr.Shutdown) - _, _, _, err := mgr.Watch(serviceID, nodeName, token) + _, _, _, err := mgr.Watch(rtest.Resource(mesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token) require.Error(t, err) require.True(t, canceledWatch, "watch should've been canceled") @@ -263,7 +266,7 @@ func TestConfigSource_NotProxyService(t *testing.T) { cfgMgr := NewMockConfigManager(t) cfgMgr.On("Watch", mock.Anything). - Return(make(<-chan *proxycfg.ConfigSnapshot), cancel) + Return(make(<-chan proxycfg.ProxySnapshot), cancel) mgr := NewConfigSource(Config{ Manager: cfgMgr, @@ -274,7 +277,7 @@ func TestConfigSource_NotProxyService(t *testing.T) { }) t.Cleanup(mgr.Shutdown) - _, _, _, err := mgr.Watch(serviceID, nodeName, token) + _, _, _, err := mgr.Watch(rtest.Resource(mesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token) require.Error(t, err) require.Contains(t, err.Error(), "must be a sidecar proxy or gateway") require.True(t, canceledWatch, "watch should've been canceled") @@ -291,7 +294,7 @@ func TestConfigSource_SessionLimiterError(t *testing.T) { t.Cleanup(src.Shutdown) _, _, _, err := src.Watch( - structs.NewServiceID("web-sidecar-proxy-1", nil), + rtest.Resource(mesh.ProxyConfigurationType, "web-sidecar-proxy-1").ID(), "node-name", "token", ) @@ -309,9 +312,9 @@ func testConfigManager(t *testing.T, serviceID structs.ServiceID, nodeName strin Token: token, } - snapCh := make(chan *proxycfg.ConfigSnapshot, 1) + snapCh := make(chan proxycfg.ProxySnapshot, 1) cfgMgr.On("Watch", proxyID). - Return((<-chan *proxycfg.ConfigSnapshot)(snapCh), proxycfg.CancelFunc(func() {}), nil) + Return((<-chan proxycfg.ProxySnapshot)(snapCh), proxycfg.CancelFunc(func() {}), nil) cfgMgr.On("Register", mock.Anything, mock.Anything, source, token, false). Run(func(args mock.Arguments) { diff --git a/agent/proxycfg-sources/catalog/mock_ConfigManager.go b/agent/proxycfg-sources/catalog/mock_ConfigManager.go index 3ae51c5f6a..b50d032d02 100644 --- a/agent/proxycfg-sources/catalog/mock_ConfigManager.go +++ b/agent/proxycfg-sources/catalog/mock_ConfigManager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.15.0. DO NOT EDIT. +// Code generated by mockery v2.32.4. DO NOT EDIT. package catalog @@ -34,19 +34,22 @@ func (_m *MockConfigManager) Register(proxyID proxycfg.ProxyID, service *structs } // Watch provides a mock function with given fields: req -func (_m *MockConfigManager) Watch(req proxycfg.ProxyID) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc) { +func (_m *MockConfigManager) Watch(req proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc) { ret := _m.Called(req) - var r0 <-chan *proxycfg.ConfigSnapshot - if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) <-chan *proxycfg.ConfigSnapshot); ok { + var r0 <-chan proxycfg.ProxySnapshot + var r1 proxycfg.CancelFunc + if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc)); ok { + return rf(req) + } + if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) <-chan proxycfg.ProxySnapshot); ok { r0 = rf(req) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(<-chan *proxycfg.ConfigSnapshot) + r0 = ret.Get(0).(<-chan proxycfg.ProxySnapshot) } } - var r1 proxycfg.CancelFunc if rf, ok := ret.Get(1).(func(proxycfg.ProxyID) proxycfg.CancelFunc); ok { r1 = rf(req) } else { @@ -58,13 +61,12 @@ func (_m *MockConfigManager) Watch(req proxycfg.ProxyID) (<-chan *proxycfg.Confi return r0, r1 } -type mockConstructorTestingTNewMockConfigManager interface { +// NewMockConfigManager creates a new instance of MockConfigManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockConfigManager(t interface { mock.TestingT Cleanup(func()) -} - -// NewMockConfigManager creates a new instance of MockConfigManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockConfigManager(t mockConstructorTestingTNewMockConfigManager) *MockConfigManager { +}) *MockConfigManager { mock := &MockConfigManager{} mock.Mock.Test(t) diff --git a/agent/proxycfg-sources/catalog/mock_Watcher.go b/agent/proxycfg-sources/catalog/mock_Watcher.go index d5ca046a40..7701fd7d38 100644 --- a/agent/proxycfg-sources/catalog/mock_Watcher.go +++ b/agent/proxycfg-sources/catalog/mock_Watcher.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.15.0. DO NOT EDIT. +// Code generated by mockery v2.32.4. DO NOT EDIT. package catalog @@ -6,9 +6,9 @@ import ( limiter "github.com/hashicorp/consul/agent/grpc-external/limiter" mock "github.com/stretchr/testify/mock" - proxycfg "github.com/hashicorp/consul/agent/proxycfg" + pbresource "github.com/hashicorp/consul/proto-public/pbresource" - structs "github.com/hashicorp/consul/agent/structs" + proxycfg "github.com/hashicorp/consul/agent/proxycfg" ) // MockWatcher is an autogenerated mock type for the Watcher type @@ -17,20 +17,25 @@ type MockWatcher struct { } // Watch provides a mock function with given fields: proxyID, nodeName, token -func (_m *MockWatcher) Watch(proxyID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { +func (_m *MockWatcher) Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { ret := _m.Called(proxyID, nodeName, token) - var r0 <-chan *proxycfg.ConfigSnapshot - if rf, ok := ret.Get(0).(func(structs.ServiceID, string, string) <-chan *proxycfg.ConfigSnapshot); ok { + var r0 <-chan proxycfg.ProxySnapshot + var r1 limiter.SessionTerminatedChan + var r2 proxycfg.CancelFunc + var r3 error + if rf, ok := ret.Get(0).(func(*pbresource.ID, string, string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error)); ok { + return rf(proxyID, nodeName, token) + } + if rf, ok := ret.Get(0).(func(*pbresource.ID, string, string) <-chan proxycfg.ProxySnapshot); ok { r0 = rf(proxyID, nodeName, token) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(<-chan *proxycfg.ConfigSnapshot) + r0 = ret.Get(0).(<-chan proxycfg.ProxySnapshot) } } - var r1 limiter.SessionTerminatedChan - if rf, ok := ret.Get(1).(func(structs.ServiceID, string, string) limiter.SessionTerminatedChan); ok { + if rf, ok := ret.Get(1).(func(*pbresource.ID, string, string) limiter.SessionTerminatedChan); ok { r1 = rf(proxyID, nodeName, token) } else { if ret.Get(1) != nil { @@ -38,8 +43,7 @@ func (_m *MockWatcher) Watch(proxyID structs.ServiceID, nodeName string, token s } } - var r2 proxycfg.CancelFunc - if rf, ok := ret.Get(2).(func(structs.ServiceID, string, string) proxycfg.CancelFunc); ok { + if rf, ok := ret.Get(2).(func(*pbresource.ID, string, string) proxycfg.CancelFunc); ok { r2 = rf(proxyID, nodeName, token) } else { if ret.Get(2) != nil { @@ -47,8 +51,7 @@ func (_m *MockWatcher) Watch(proxyID structs.ServiceID, nodeName string, token s } } - var r3 error - if rf, ok := ret.Get(3).(func(structs.ServiceID, string, string) error); ok { + if rf, ok := ret.Get(3).(func(*pbresource.ID, string, string) error); ok { r3 = rf(proxyID, nodeName, token) } else { r3 = ret.Error(3) @@ -57,13 +60,12 @@ func (_m *MockWatcher) Watch(proxyID structs.ServiceID, nodeName string, token s return r0, r1, r2, r3 } -type mockConstructorTestingTNewMockWatcher interface { +// NewMockWatcher creates a new instance of MockWatcher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockWatcher(t interface { mock.TestingT Cleanup(func()) -} - -// NewMockWatcher creates a new instance of MockWatcher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockWatcher(t mockConstructorTestingTNewMockWatcher) *MockWatcher { +}) *MockWatcher { mock := &MockWatcher{} mock.Mock.Test(t) diff --git a/agent/proxycfg-sources/local/config_source.go b/agent/proxycfg-sources/local/config_source.go index e104043bac..634a0f479e 100644 --- a/agent/proxycfg-sources/local/config_source.go +++ b/agent/proxycfg-sources/local/config_source.go @@ -6,7 +6,9 @@ package local import ( "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/proxycfg-sources/catalog" structs "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto-public/pbresource" ) // ConfigSource wraps a proxycfg.Manager to create watches on services @@ -20,7 +22,9 @@ func NewConfigSource(cfgMgr ConfigManager) *ConfigSource { return &ConfigSource{cfgMgr} } -func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, _ string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { +func (m *ConfigSource) Watch(proxyID *pbresource.ID, nodeName string, _ string) (<-chan proxycfg.ProxySnapshot, + limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { + serviceID := structs.NewServiceID(proxyID.Name, catalog.GetEnterpriseMetaFromResourceID(proxyID)) watchCh, cancelWatch := m.manager.Watch(proxycfg.ProxyID{ ServiceID: serviceID, NodeName: nodeName, diff --git a/agent/proxycfg-sources/local/mock_ConfigManager.go b/agent/proxycfg-sources/local/mock_ConfigManager.go index 8f2c8fc6c8..d16f0e98ee 100644 --- a/agent/proxycfg-sources/local/mock_ConfigManager.go +++ b/agent/proxycfg-sources/local/mock_ConfigManager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.15.0. DO NOT EDIT. +// Code generated by mockery v2.32.4. DO NOT EDIT. package local @@ -50,19 +50,22 @@ func (_m *MockConfigManager) RegisteredProxies(source proxycfg.ProxySource) []pr } // Watch provides a mock function with given fields: id -func (_m *MockConfigManager) Watch(id proxycfg.ProxyID) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc) { +func (_m *MockConfigManager) Watch(id proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc) { ret := _m.Called(id) - var r0 <-chan *proxycfg.ConfigSnapshot - if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) <-chan *proxycfg.ConfigSnapshot); ok { + var r0 <-chan proxycfg.ProxySnapshot + var r1 proxycfg.CancelFunc + if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc)); ok { + return rf(id) + } + if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) <-chan proxycfg.ProxySnapshot); ok { r0 = rf(id) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(<-chan *proxycfg.ConfigSnapshot) + r0 = ret.Get(0).(<-chan proxycfg.ProxySnapshot) } } - var r1 proxycfg.CancelFunc if rf, ok := ret.Get(1).(func(proxycfg.ProxyID) proxycfg.CancelFunc); ok { r1 = rf(id) } else { @@ -74,13 +77,12 @@ func (_m *MockConfigManager) Watch(id proxycfg.ProxyID) (<-chan *proxycfg.Config return r0, r1 } -type mockConstructorTestingTNewMockConfigManager interface { +// NewMockConfigManager creates a new instance of MockConfigManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockConfigManager(t interface { mock.TestingT Cleanup(func()) -} - -// NewMockConfigManager creates a new instance of MockConfigManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockConfigManager(t mockConstructorTestingTNewMockConfigManager) *MockConfigManager { +}) *MockConfigManager { mock := &MockConfigManager{} mock.Mock.Test(t) diff --git a/agent/proxycfg-sources/local/sync.go b/agent/proxycfg-sources/local/sync.go index 982e527f79..bb06cc8438 100644 --- a/agent/proxycfg-sources/local/sync.go +++ b/agent/proxycfg-sources/local/sync.go @@ -135,7 +135,7 @@ func sync(cfg SyncConfig) { //go:generate mockery --name ConfigManager --inpackage type ConfigManager interface { - Watch(id proxycfg.ProxyID) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc) + Watch(id proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc) Register(proxyID proxycfg.ProxyID, service *structs.NodeService, source proxycfg.ProxySource, token string, overwrite bool) error Deregister(proxyID proxycfg.ProxyID, source proxycfg.ProxySource) RegisteredProxies(source proxycfg.ProxySource) []proxycfg.ProxyID diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go index ac87dddc19..296bd831a8 100644 --- a/agent/proxycfg/manager.go +++ b/agent/proxycfg/manager.go @@ -55,7 +55,7 @@ type Manager struct { mu sync.Mutex proxies map[ProxyID]*state - watchers map[ProxyID]map[uint64]chan *ConfigSnapshot + watchers map[ProxyID]map[uint64]chan ProxySnapshot maxWatchID uint64 } @@ -106,7 +106,7 @@ func NewManager(cfg ManagerConfig) (*Manager, error) { m := &Manager{ ManagerConfig: cfg, proxies: make(map[ProxyID]*state), - watchers: make(map[ProxyID]map[uint64]chan *ConfigSnapshot), + watchers: make(map[ProxyID]map[uint64]chan ProxySnapshot), rateLimiter: rate.NewLimiter(cfg.UpdateRateLimit, 1), } return m, nil @@ -262,7 +262,7 @@ func (m *Manager) notify(snap *ConfigSnapshot) { // it will drain the chan and then re-attempt delivery so that a slow consumer // gets the latest config earlier. This MUST be called from a method where m.mu // is held to be safe since it assumes we are the only goroutine sending on ch. -func (m *Manager) deliverLatest(snap *ConfigSnapshot, ch chan *ConfigSnapshot) { +func (m *Manager) deliverLatest(snap *ConfigSnapshot, ch chan ProxySnapshot) { // Send if chan is empty select { case ch <- snap: @@ -299,16 +299,16 @@ OUTER: // will not fail, but no updates will be delivered until the proxy is // registered. If there is already a valid snapshot in memory, it will be // delivered immediately. -func (m *Manager) Watch(id ProxyID) (<-chan *ConfigSnapshot, CancelFunc) { +func (m *Manager) Watch(id ProxyID) (<-chan ProxySnapshot, CancelFunc) { m.mu.Lock() defer m.mu.Unlock() // This buffering is crucial otherwise we'd block immediately trying to // deliver the current snapshot below if we already have one. - ch := make(chan *ConfigSnapshot, 1) + ch := make(chan ProxySnapshot, 1) watchers, ok := m.watchers[id] if !ok { - watchers = make(map[uint64]chan *ConfigSnapshot) + watchers = make(map[uint64]chan ProxySnapshot) } watchID := m.maxWatchID m.maxWatchID++ diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index 72deaa73ab..c66352c1ec 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -469,7 +469,7 @@ func testManager_BasicLifecycle( require.Len(t, m.watchers, 0) } -func assertWatchChanBlocks(t *testing.T, ch <-chan *ConfigSnapshot) { +func assertWatchChanBlocks(t *testing.T, ch <-chan ProxySnapshot) { t.Helper() select { @@ -479,7 +479,7 @@ func assertWatchChanBlocks(t *testing.T, ch <-chan *ConfigSnapshot) { } } -func assertWatchChanRecvs(t *testing.T, ch <-chan *ConfigSnapshot, expect *ConfigSnapshot) { +func assertWatchChanRecvs(t *testing.T, ch <-chan ProxySnapshot, expect ProxySnapshot) { t.Helper() select { @@ -517,7 +517,7 @@ func TestManager_deliverLatest(t *testing.T) { } // test 1 buffered chan - ch1 := make(chan *ConfigSnapshot, 1) + ch1 := make(chan ProxySnapshot, 1) // Sending to an unblocked chan should work m.deliverLatest(snap1, ch1) @@ -533,7 +533,7 @@ func TestManager_deliverLatest(t *testing.T) { require.Equal(t, snap2, <-ch1) // Same again for 5-buffered chan - ch5 := make(chan *ConfigSnapshot, 5) + ch5 := make(chan ProxySnapshot, 5) // Sending to an unblocked chan should work m.deliverLatest(snap1, ch5) diff --git a/agent/proxycfg/proxysnapshot.go b/agent/proxycfg/proxysnapshot.go new file mode 100644 index 0000000000..fb1c79183c --- /dev/null +++ b/agent/proxycfg/proxysnapshot.go @@ -0,0 +1,76 @@ +package proxycfg + +import ( + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/logging" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// ProxySnapshot is an abstraction that allows interchangeability between +// Catalog V1 ConfigSnapshot and Catalog V2 ProxyState. +type ProxySnapshot interface { + AllowEmptyListeners() bool + AllowEmptyRoutes() bool + AllowEmptyClusters() bool + Authorize(authz acl.Authorizer) error + LoggerName() string +} + +// The below functions are added to ConfigSnapshot to allow it to conform to +// the ProxySnapshot interface. +func (s *ConfigSnapshot) AllowEmptyListeners() bool { + // Ingress and API gateways are allowed to inform LDS of no listeners. + return s.Kind == structs.ServiceKindIngressGateway || + s.Kind == structs.ServiceKindAPIGateway +} + +func (s *ConfigSnapshot) AllowEmptyRoutes() bool { + // Ingress and API gateways are allowed to inform RDS of no routes. + return s.Kind == structs.ServiceKindIngressGateway || + s.Kind == structs.ServiceKindAPIGateway +} + +func (s *ConfigSnapshot) AllowEmptyClusters() bool { + // Mesh, Ingress, API and Terminating gateways are allowed to inform CDS of no clusters. + return s.Kind == structs.ServiceKindMeshGateway || + s.Kind == structs.ServiceKindTerminatingGateway || + s.Kind == structs.ServiceKindIngressGateway || + s.Kind == structs.ServiceKindAPIGateway +} + +func (s *ConfigSnapshot) Authorize(authz acl.Authorizer) error { + var authzContext acl.AuthorizerContext + switch s.Kind { + case structs.ServiceKindConnectProxy: + s.ProxyID.EnterpriseMeta.FillAuthzContext(&authzContext) + if err := authz.ToAllowAuthorizer().ServiceWriteAllowed(s.Proxy.DestinationServiceName, &authzContext); err != nil { + return status.Errorf(codes.PermissionDenied, err.Error()) + } + case structs.ServiceKindMeshGateway, structs.ServiceKindTerminatingGateway, structs.ServiceKindIngressGateway, structs.ServiceKindAPIGateway: + s.ProxyID.EnterpriseMeta.FillAuthzContext(&authzContext) + if err := authz.ToAllowAuthorizer().ServiceWriteAllowed(s.Service, &authzContext); err != nil { + return status.Errorf(codes.PermissionDenied, err.Error()) + } + default: + return status.Errorf(codes.Internal, "Invalid service kind") + } + + // Authed OK! + return nil +} + +func (s *ConfigSnapshot) LoggerName() string { + switch s.Kind { + case structs.ServiceKindConnectProxy: + case structs.ServiceKindTerminatingGateway: + return logging.TerminatingGateway + case structs.ServiceKindMeshGateway: + return logging.MeshGateway + case structs.ServiceKindIngressGateway: + return logging.IngressGateway + } + + return "" +} diff --git a/agent/proxycfg/proxysnapshot_test.go b/agent/proxycfg/proxysnapshot_test.go new file mode 100644 index 0000000000..9a61a7f1c6 --- /dev/null +++ b/agent/proxycfg/proxysnapshot_test.go @@ -0,0 +1,311 @@ +package proxycfg + +import ( + "strings" + "testing" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/structs" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestConfigSnapshot_AllowEmptyClusters(t *testing.T) { + type testCase struct { + description string + cfgSnapshot *ConfigSnapshot + expectedResult bool + } + testsCases := []testCase{ + { + description: "Mesh proxies are not allowed", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindConnectProxy}, + expectedResult: false, + }, + { + description: "Ingress gateways are allowed", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindIngressGateway}, + expectedResult: true, + }, + { + description: "Terminating gateways are allowed", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindTerminatingGateway}, + expectedResult: true, + }, + { + description: "API Gateways are allowed", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindAPIGateway}, + expectedResult: true, + }, + { + description: "Mesh Gateways are allowed", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindMeshGateway}, + expectedResult: true, + }, + } + for _, tc := range testsCases { + t.Run(tc.description, func(t *testing.T) { + require.Equal(t, tc.expectedResult, tc.cfgSnapshot.AllowEmptyClusters()) + }) + } +} + +func TestConfigSnapshot_AllowEmptyListeners(t *testing.T) { + type testCase struct { + description string + cfgSnapshot *ConfigSnapshot + expectedResult bool + } + testsCases := []testCase{ + { + description: "Mesh proxies are not allowed", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindConnectProxy}, + expectedResult: false, + }, + { + description: "Ingress gateways are allowed", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindIngressGateway}, + expectedResult: true, + }, + { + description: "Terminating gateways are not allowed", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindTerminatingGateway}, + expectedResult: false, + }, + { + description: "API Gateways are allowed", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindAPIGateway}, + expectedResult: true, + }, + { + description: "Mesh Gateways are not allowed", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindMeshGateway}, + expectedResult: false, + }, + } + for _, tc := range testsCases { + t.Run(tc.description, func(t *testing.T) { + require.Equal(t, tc.expectedResult, tc.cfgSnapshot.AllowEmptyListeners()) + }) + } +} + +func TestConfigSnapshot_AllowEmptyRoutes(t *testing.T) { + type testCase struct { + description string + cfgSnapshot *ConfigSnapshot + expectedResult bool + } + testsCases := []testCase{ + { + description: "Mesh proxies are not allowed", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindConnectProxy}, + expectedResult: false, + }, + { + description: "Ingress gateways are allowed", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindIngressGateway}, + expectedResult: true, + }, + { + description: "Terminating gateways are not allowed", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindTerminatingGateway}, + expectedResult: false, + }, + { + description: "API Gateways are allowed", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindAPIGateway}, + expectedResult: true, + }, + { + description: "Mesh Gateways are not allowed", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindMeshGateway}, + expectedResult: false, + }, + } + for _, tc := range testsCases { + t.Run(tc.description, func(t *testing.T) { + require.Equal(t, tc.expectedResult, tc.cfgSnapshot.AllowEmptyRoutes()) + }) + } +} + +func TestConfigSnapshot_LoggerName(t *testing.T) { + type testCase struct { + description string + cfgSnapshot *ConfigSnapshot + expectedResult string + } + testsCases := []testCase{ + { + description: "Mesh proxies have a logger named ''", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindConnectProxy}, + expectedResult: "", + }, + { + description: "Ingress gateways have a logger named 'ingress_gateway'", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindIngressGateway}, + expectedResult: "ingress_gateway", + }, + { + description: "Terminating gateways have a logger named 'terminating_gateway'", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindTerminatingGateway}, + expectedResult: "terminating_gateway", + }, + { + description: "API Gateways have a logger named ''", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindAPIGateway}, + expectedResult: "", + }, + { + description: "Mesh Gateways have a logger named 'mesh_gateway'", + cfgSnapshot: &ConfigSnapshot{Kind: structs.ServiceKindMeshGateway}, + expectedResult: "mesh_gateway", + }, + } + for _, tc := range testsCases { + t.Run(tc.description, func(t *testing.T) { + require.Equal(t, tc.expectedResult, tc.cfgSnapshot.LoggerName()) + }) + } +} + +func TestConfigSnapshot_Authorize(t *testing.T) { + type testCase struct { + description string + cfgSnapshot *ConfigSnapshot + configureAuthorizer func(authorizer *acl.MockAuthorizer) + expectedErrorMessage string + } + testsCases := []testCase{ + { + description: "ConnectProxy - if service write is allowed for the DestinationService then allow.", + cfgSnapshot: &ConfigSnapshot{ + Kind: structs.ServiceKindConnectProxy, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "DestinationServiceName", + }, + }, + expectedErrorMessage: "", + configureAuthorizer: func(authz *acl.MockAuthorizer) { + authz.On("ServiceWrite", "DestinationServiceName", mock.Anything).Return(acl.Allow) + }, + }, + { + description: "ConnectProxy - if service write is not allowed for the DestinationService then deny.", + cfgSnapshot: &ConfigSnapshot{ + Kind: structs.ServiceKindConnectProxy, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "DestinationServiceName", + }, + }, + expectedErrorMessage: "rpc error: code = PermissionDenied desc = Permission denied: token with AccessorID '' lacks permission 'service:write' on \"DestinationServiceName\"", + configureAuthorizer: func(authz *acl.MockAuthorizer) { + authz.On("ServiceWrite", "DestinationServiceName", mock.Anything).Return(acl.Deny) + }, + }, + { + description: "Mesh Gateway - if service write is allowed for the Service then allow.", + cfgSnapshot: &ConfigSnapshot{ + Kind: structs.ServiceKindMeshGateway, + Service: "Service", + }, + expectedErrorMessage: "", + configureAuthorizer: func(authz *acl.MockAuthorizer) { + authz.On("ServiceWrite", "Service", mock.Anything).Return(acl.Allow) + }, + }, + { + description: "Mesh Gateway - if service write is not allowed for the Service then deny.", + cfgSnapshot: &ConfigSnapshot{ + Kind: structs.ServiceKindMeshGateway, + Service: "Service", + }, + expectedErrorMessage: "rpc error: code = PermissionDenied desc = Permission denied: token with AccessorID '' lacks permission 'service:write' on \"Service\"", + configureAuthorizer: func(authz *acl.MockAuthorizer) { + authz.On("ServiceWrite", "Service", mock.Anything).Return(acl.Deny) + }, + }, + { + description: "Terminating Gateway - if service write is allowed for the Service then allow.", + cfgSnapshot: &ConfigSnapshot{ + Kind: structs.ServiceKindTerminatingGateway, + Service: "Service", + }, + expectedErrorMessage: "rpc error: code = PermissionDenied desc = Permission denied: token with AccessorID '' lacks permission 'service:write' on \"Service\"", + configureAuthorizer: func(authz *acl.MockAuthorizer) { + authz.On("ServiceWrite", "Service", mock.Anything).Return(acl.Deny) + }, + }, + { + description: "Terminating Gateway - if service write is not allowed for the Service then deny.", + cfgSnapshot: &ConfigSnapshot{ + Kind: structs.ServiceKindTerminatingGateway, + Service: "Service", + }, + expectedErrorMessage: "rpc error: code = PermissionDenied desc = Permission denied: token with AccessorID '' lacks permission 'service:write' on \"Service\"", + configureAuthorizer: func(authz *acl.MockAuthorizer) { + authz.On("ServiceWrite", "Service", mock.Anything).Return(acl.Deny) + }, + }, + { + description: "Ingress Gateway - if service write is allowed for the Service then allow.", + cfgSnapshot: &ConfigSnapshot{ + Kind: structs.ServiceKindIngressGateway, + Service: "Service", + }, + expectedErrorMessage: "rpc error: code = PermissionDenied desc = Permission denied: token with AccessorID '' lacks permission 'service:write' on \"Service\"", + configureAuthorizer: func(authz *acl.MockAuthorizer) { + authz.On("ServiceWrite", "Service", mock.Anything).Return(acl.Deny) + }, + }, + { + description: "Ingress Gateway - if service write is not allowed for the Service then deny.", + cfgSnapshot: &ConfigSnapshot{ + Kind: structs.ServiceKindIngressGateway, + Service: "Service", + }, + expectedErrorMessage: "rpc error: code = PermissionDenied desc = Permission denied: token with AccessorID '' lacks permission 'service:write' on \"Service\"", + configureAuthorizer: func(authz *acl.MockAuthorizer) { + authz.On("ServiceWrite", "Service", mock.Anything).Return(acl.Deny) + }, + }, + { + description: "API Gateway - if service write is allowed for the Service then allow.", + cfgSnapshot: &ConfigSnapshot{ + Kind: structs.ServiceKindAPIGateway, + Service: "Service", + }, + expectedErrorMessage: "rpc error: code = PermissionDenied desc = Permission denied: token with AccessorID '' lacks permission 'service:write' on \"Service\"", + configureAuthorizer: func(authz *acl.MockAuthorizer) { + authz.On("ServiceWrite", "Service", mock.Anything).Return(acl.Deny) + }, + }, + { + description: "API Gateway - if service write is not allowed for the Service then deny.", + cfgSnapshot: &ConfigSnapshot{ + Kind: structs.ServiceKindAPIGateway, + Service: "Service", + }, + expectedErrorMessage: "rpc error: code = PermissionDenied desc = Permission denied: token with AccessorID '' lacks permission 'service:write' on \"Service\"", + configureAuthorizer: func(authz *acl.MockAuthorizer) { + authz.On("ServiceWrite", "Service", mock.Anything).Return(acl.Deny) + }, + }, + } + for _, tc := range testsCases { + t.Run(tc.description, func(t *testing.T) { + authz := &acl.MockAuthorizer{} + authz.On("ToAllow").Return(acl.AllowAuthorizer{Authorizer: authz}) + tc.configureAuthorizer(authz) + err := tc.cfgSnapshot.Authorize(authz) + errMsg := "" + if err != nil { + errMsg = err.Error() + } + // using contains because Enterprise tests append the parition and namespace + // information to the message. + require.True(t, strings.Contains(errMsg, tc.expectedErrorMessage)) + }) + } +} diff --git a/agent/proxycfg_test.go b/agent/proxycfg_test.go index 74584331ae..c51d9a64bc 100644 --- a/agent/proxycfg_test.go +++ b/agent/proxycfg_test.go @@ -5,6 +5,8 @@ package agent import ( "encoding/json" + "github.com/hashicorp/consul/internal/mesh" + rtest "github.com/hashicorp/consul/internal/resource/resourcetest" "net/http" "net/http/httptest" "testing" @@ -62,7 +64,7 @@ func TestAgent_local_proxycfg(t *testing.T) { var ( firstTime = true - ch <-chan *proxycfg.ConfigSnapshot + ch <-chan proxycfg.ProxySnapshot stc limiter.SessionTerminatedChan cancel proxycfg.CancelFunc ) @@ -85,7 +87,7 @@ func TestAgent_local_proxycfg(t *testing.T) { // Prior to fixes in https://github.com/hashicorp/consul/pull/16497 // this call to Watch() would deadlock. var err error - ch, stc, cancel, err = cfg.Watch(sid, a.config.NodeName, token) + ch, stc, cancel, err = cfg.Watch(rtest.Resource(mesh.ProxyConfigurationType, sid.ID).ID(), a.config.NodeName, token) require.NoError(t, err) } select { diff --git a/agent/xds/delta.go b/agent/xds/delta.go index d44b84fd4e..7d345ff770 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -8,6 +8,10 @@ import ( "encoding/hex" "errors" "fmt" + "github.com/hashicorp/consul/agent/xds/configfetcher" + "github.com/hashicorp/consul/agent/xdsv2" + "github.com/hashicorp/consul/internal/mesh" + "github.com/hashicorp/consul/proto-public/pbresource" "strconv" "sync" "sync/atomic" @@ -29,7 +33,6 @@ import ( external "github.com/hashicorp/consul/agent/grpc-external" "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/proxycfg" - "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/xds/extensionruntime" "github.com/hashicorp/consul/envoyextensions/extensioncommon" "github.com/hashicorp/consul/envoyextensions/xdscommon" @@ -84,6 +87,40 @@ func (s *Server) DeltaAggregatedResources(stream ADSDeltaStream) error { return err } +// getEnvoyConfiguration is a utility function that instantiates the proper +// Envoy resource generator based on whether it was passed a ConfigSource or +// ProxyState implementation of the ProxySnapshot interface and returns the +// generated Envoy configuration. +func getEnvoyConfiguration(proxySnapshot proxycfg.ProxySnapshot, logger hclog.Logger, cfgFetcher configfetcher.ConfigFetcher) (map[string][]proto.Message, error) { + switch proxySnapshot.(type) { + case *proxycfg.ConfigSnapshot: + logger.Trace("ProxySnapshot update channel received a ProxySnapshot of type ConfigSnapshot", + "proxySnapshot", proxySnapshot, + ) + generator := NewResourceGenerator( + logger, + cfgFetcher, + true, + ) + + c := proxySnapshot.(*proxycfg.ConfigSnapshot) + logger.Trace("ConfigSnapshot", c) + return generator.AllResourcesFromSnapshot(c) + case *mesh.ProxyState: + logger.Trace("ProxySnapshot update channel received a ProxySnapshot of type ProxyState", + "proxySnapshot", proxySnapshot, + ) + generator := xdsv2.NewResourceGenerator( + logger, + ) + c := proxySnapshot.(*mesh.ProxyState) + logger.Trace("ProxyState", c) + return generator.AllResourcesFromIR(c) + default: + return nil, errors.New("proxysnapshot must be of type ProxyState or ConfigSnapshot") + } +} + const ( stateDeltaInit int = iota stateDeltaPendingInitialConfig @@ -98,14 +135,13 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove // Loop state var ( - cfgSnap *proxycfg.ConfigSnapshot - node *envoy_config_core_v3.Node - stateCh <-chan *proxycfg.ConfigSnapshot - drainCh limiter.SessionTerminatedChan - watchCancel func() - proxyID structs.ServiceID - nonce uint64 // xDS requires a unique nonce to correlate response/request pairs - ready bool // set to true after the first snapshot arrives + proxySnapshot proxycfg.ProxySnapshot + node *envoy_config_core_v3.Node + stateCh <-chan proxycfg.ProxySnapshot + drainCh limiter.SessionTerminatedChan + watchCancel func() + nonce uint64 // xDS requires a unique nonce to correlate response/request pairs + ready bool // set to true after the first snapshot arrives streamStartTime = time.Now() streamStartOnce sync.Once @@ -123,36 +159,24 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove currentVersions = make(map[string]map[string]string) ) - generator := NewResourceGenerator( - s.Logger.Named(logging.XDS).With("xdsVersion", "v3"), - s.CfgFetcher, - true, - ) + logger := s.Logger.Named(logging.XDS).With("xdsVersion", "v3") // need to run a small state machine to get through initial authentication. var state = stateDeltaInit // Configure handlers for each type of request we currently care about. handlers := map[string]*xDSDeltaType{ - xdscommon.ListenerType: newDeltaType(generator, stream, xdscommon.ListenerType, func(kind structs.ServiceKind) bool { - // Ingress and API gateways are allowed to inform LDS of no listeners. - return cfgSnap.Kind == structs.ServiceKindIngressGateway || - cfgSnap.Kind == structs.ServiceKindAPIGateway + xdscommon.ListenerType: newDeltaType(logger, stream, xdscommon.ListenerType, func() bool { + return proxySnapshot.AllowEmptyListeners() }), - xdscommon.RouteType: newDeltaType(generator, stream, xdscommon.RouteType, func(kind structs.ServiceKind) bool { - // Ingress and API gateways are allowed to inform RDS of no routes. - return cfgSnap.Kind == structs.ServiceKindIngressGateway || - cfgSnap.Kind == structs.ServiceKindAPIGateway + xdscommon.RouteType: newDeltaType(logger, stream, xdscommon.RouteType, func() bool { + return proxySnapshot.AllowEmptyRoutes() }), - xdscommon.ClusterType: newDeltaType(generator, stream, xdscommon.ClusterType, func(kind structs.ServiceKind) bool { - // Mesh, Ingress, API and Terminating gateways are allowed to inform CDS of no clusters. - return cfgSnap.Kind == structs.ServiceKindMeshGateway || - cfgSnap.Kind == structs.ServiceKindTerminatingGateway || - cfgSnap.Kind == structs.ServiceKindIngressGateway || - cfgSnap.Kind == structs.ServiceKindAPIGateway + xdscommon.ClusterType: newDeltaType(logger, stream, xdscommon.ClusterType, func() bool { + return proxySnapshot.AllowEmptyClusters() }), - xdscommon.EndpointType: newDeltaType(generator, stream, xdscommon.EndpointType, nil), - xdscommon.SecretType: newDeltaType(generator, stream, xdscommon.SecretType, nil), // TODO allowEmptyFn + xdscommon.EndpointType: newDeltaType(logger, stream, xdscommon.EndpointType, nil), + xdscommon.SecretType: newDeltaType(logger, stream, xdscommon.SecretType, nil), // TODO allowEmptyFn } // Endpoints are stored within a Cluster (and Routes @@ -178,19 +202,19 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove authTimer = time.After(s.AuthCheckFrequency) } - checkStreamACLs := func(cfgSnap *proxycfg.ConfigSnapshot) error { - return s.authorize(stream.Context(), cfgSnap) + checkStreamACLs := func(proxySnap proxycfg.ProxySnapshot) error { + return s.authorize(stream.Context(), proxySnap) } for { select { case <-drainCh: - generator.Logger.Debug("draining stream to rebalance load") + logger.Debug("draining stream to rebalance load") metrics.IncrCounter([]string{"xds", "server", "streamDrained"}, 1) return errOverwhelmed case <-authTimer: // It's been too long since a Discovery{Request,Response} so recheck ACLs. - if err := checkStreamACLs(cfgSnap); err != nil { + if err := checkStreamACLs(proxySnapshot); err != nil { return err } extendAuthTimer() @@ -204,28 +228,29 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove return nil } - generator.logTraceRequest("Incremental xDS v3", req) + logTraceRequest(logger, "Incremental xDS v3", req) if req.TypeUrl == "" { return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") } + var proxyFeatures xdscommon.SupportedProxyFeatures if node == nil && req.Node != nil { node = req.Node var err error - generator.ProxyFeatures, err = xdscommon.DetermineSupportedProxyFeatures(req.Node) + proxyFeatures, err = xdscommon.DetermineSupportedProxyFeatures(req.Node) if err != nil { return status.Errorf(codes.InvalidArgument, err.Error()) } } if handler, ok := handlers[req.TypeUrl]; ok { - switch handler.Recv(req, generator.ProxyFeatures) { + switch handler.Recv(req, proxyFeatures) { case deltaRecvNewSubscription: - generator.Logger.Trace("subscribing to type", "typeUrl", req.TypeUrl) + logger.Trace("subscribing to type", "typeUrl", req.TypeUrl) case deltaRecvResponseNack: - generator.Logger.Trace("got nack response for type", "typeUrl", req.TypeUrl) + logger.Trace("got nack response for type", "typeUrl", req.TypeUrl) // There is no reason to believe that generating new xDS resources from the same snapshot // would lead to an ACK from Envoy. Instead we continue to the top of this for loop and wait @@ -244,21 +269,21 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove // would've already exited this loop. return status.Error(codes.Aborted, "xDS stream terminated due to an irrecoverable error, please try again") } - cfgSnap = cs + proxySnapshot = cs - newRes, err := generator.AllResourcesFromSnapshot(cfgSnap) + newRes, err := getEnvoyConfiguration(proxySnapshot, logger, s.CfgFetcher) if err != nil { return status.Errorf(codes.Unavailable, "failed to generate all xDS resources from the snapshot: %v", err) } // index and hash the xDS structures - newResourceMap := xdscommon.IndexResources(generator.Logger, newRes) + newResourceMap := xdscommon.IndexResources(logger, newRes) if s.ResourceMapMutateFn != nil { s.ResourceMapMutateFn(newResourceMap) } - if newResourceMap, err = s.applyEnvoyExtensions(newResourceMap, cfgSnap, node); err != nil { + if newResourceMap, err = s.applyEnvoyExtensions(newResourceMap, proxySnapshot, node); err != nil { // err is already the result of calling status.Errorf return err } @@ -292,7 +317,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove } // Start authentication process, we need the proxyID - proxyID = structs.NewServiceID(node.Id, parseEnterpriseMeta(node)) + proxyID := newResourceIDFromEnvoyNode(node) // Start watching config for that proxy var err error @@ -306,7 +331,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove case errors.Is(err, limiter.ErrCapacityReached): return errOverwhelmed case err != nil: - return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err) + return status.Errorf(codes.Internal, "failed to watch proxy: %s", err) } // Note that in this case we _intend_ the defer to only be triggered when // this whole process method ends (i.e. when streaming RPC aborts) not at @@ -315,14 +340,14 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove // state machine. defer watchCancel() - generator.Logger = generator.Logger.With("service_id", proxyID.String()) // enhance future logs + logger = logger.With("service_id", proxyID.Name) // enhance future logs - generator.Logger.Trace("watching proxy, pending initial proxycfg snapshot for xDS") + logger.Trace("watching proxy, pending initial proxycfg snapshot for xDS") // Now wait for the config so we can check ACL state = stateDeltaPendingInitialConfig case stateDeltaPendingInitialConfig: - if cfgSnap == nil { + if proxySnapshot == nil { // Nothing we can do until we get the initial config continue } @@ -331,23 +356,18 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove state = stateDeltaRunning // Upgrade the logger - switch cfgSnap.Kind { - case structs.ServiceKindConnectProxy: - case structs.ServiceKindTerminatingGateway: - generator.Logger = generator.Logger.Named(logging.TerminatingGateway) - case structs.ServiceKindMeshGateway: - generator.Logger = generator.Logger.Named(logging.MeshGateway) - case structs.ServiceKindIngressGateway: - generator.Logger = generator.Logger.Named(logging.IngressGateway) + loggerName := proxySnapshot.LoggerName() + if loggerName != "" { + logger = logger.Named(loggerName) } - generator.Logger.Trace("Got initial config snapshot") + logger.Trace("Got initial config snapshot") // Let's actually process the config we just got, or we'll miss responding fallthrough case stateDeltaRunning: // Check ACLs on every Discovery{Request,Response}. - if err := checkStreamACLs(cfgSnap); err != nil { + if err := checkStreamACLs(proxySnapshot); err != nil { return err } // For the first time through the state machine, this is when the @@ -355,11 +375,11 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove extendAuthTimer() if !ready { - generator.Logger.Trace("Skipping delta computation because we haven't gotten a snapshot yet") + logger.Trace("Skipping delta computation because we haven't gotten a snapshot yet") continue } - generator.Logger.Trace("Invoking all xDS resource handlers and sending changed data if there are any") + logger.Trace("Invoking all xDS resource handlers and sending changed data if there are any") streamStartOnce.Do(func() { metrics.MeasureSince([]string{"xds", "server", "streamStart"}, streamStartTime) @@ -368,7 +388,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove for _, op := range xDSUpdateOrder { if op.TypeUrl == xdscommon.ListenerType || op.TypeUrl == xdscommon.RouteType { if clusterHandler := handlers[xdscommon.ClusterType]; clusterHandler.registered && len(clusterHandler.pendingUpdates) > 0 { - generator.Logger.Trace("Skipping delta computation for resource because there are dependent updates pending", + logger.Trace("Skipping delta computation for resource because there are dependent updates pending", "typeUrl", op.TypeUrl, "dependent", xdscommon.ClusterType) // Receiving an ACK from Envoy will unblock the select statement above, @@ -376,7 +396,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove break } if endpointHandler := handlers[xdscommon.EndpointType]; endpointHandler.registered && len(endpointHandler.pendingUpdates) > 0 { - generator.Logger.Trace("Skipping delta computation for resource because there are dependent updates pending", + logger.Trace("Skipping delta computation for resource because there are dependent updates pending", "typeUrl", op.TypeUrl, "dependent", xdscommon.EndpointType) // Receiving an ACK from Envoy will unblock the select statement above, @@ -384,14 +404,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove break } } - err, _ := handlers[op.TypeUrl].SendIfNew( - cfgSnap.Kind, - currentVersions[op.TypeUrl], - resourceMap, - &nonce, - op.Upsert, - op.Remove, - ) + err, _ := handlers[op.TypeUrl].SendIfNew(currentVersions[op.TypeUrl], resourceMap, &nonce, op.Upsert, op.Remove) if err != nil { return status.Errorf(codes.Unavailable, "failed to send %sreply for type %q: %v", @@ -403,7 +416,37 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove } } -func (s *Server) applyEnvoyExtensions(resources *xdscommon.IndexedResources, cfgSnap *proxycfg.ConfigSnapshot, node *envoy_config_core_v3.Node) (*xdscommon.IndexedResources, error) { +// newResourceIDFromEnvoyNode is a utility function that allows creating a +// Resource ID from an Envoy proxy node so that existing delta calls can easily +// use ProxyWatcher interface arguments for Watch(). +func newResourceIDFromEnvoyNode(node *envoy_config_core_v3.Node) *pbresource.ID { + entMeta := parseEnterpriseMeta(node) + + return &pbresource.ID{ + Name: node.Id, + Tenancy: &pbresource.Tenancy{ + Namespace: entMeta.NamespaceOrDefault(), + Partition: entMeta.PartitionOrDefault(), + }, + Type: mesh.ProxyStateTemplateConfigurationV1Alpha1Type, + } +} + +func (s *Server) applyEnvoyExtensions(resources *xdscommon.IndexedResources, proxySnapshot proxycfg.ProxySnapshot, node *envoy_config_core_v3.Node) (*xdscommon.IndexedResources, error) { + // TODO(proxystate) + // This is a workaround for now as envoy extensions are not yet supported with ProxyState. + // For now, we cast to proxycfg.ConfigSnapshot and no-op if it's the pbmesh.ProxyState type. + var snapshot *proxycfg.ConfigSnapshot + switch proxySnapshot.(type) { + //TODO(proxystate): implement envoy extensions for ProxyState + case *mesh.ProxyState: + return resources, nil + case *proxycfg.ConfigSnapshot: + snapshot = proxySnapshot.(*proxycfg.ConfigSnapshot) + default: + return nil, status.Errorf(codes.InvalidArgument, + "unsupported config snapshot type to apply envoy extensions to %T", proxySnapshot) + } var err error envoyVersion := xdscommon.DetermineEnvoyVersionFromNode(node) consulVersion, err := goversion.NewVersion(version.Version) @@ -412,10 +455,10 @@ func (s *Server) applyEnvoyExtensions(resources *xdscommon.IndexedResources, cfg return nil, status.Errorf(codes.InvalidArgument, "failed to parse Consul version") } - serviceConfigs := extensionruntime.GetRuntimeConfigurations(cfgSnap) + serviceConfigs := extensionruntime.GetRuntimeConfigurations(snapshot) for _, cfgs := range serviceConfigs { for _, cfg := range cfgs { - resources, err = validateAndApplyEnvoyExtension(s.Logger, cfgSnap, resources, cfg, envoyVersion, consulVersion) + resources, err = validateAndApplyEnvoyExtension(s.Logger, snapshot, resources, cfg, envoyVersion, consulVersion) if err != nil { return nil, err @@ -625,10 +668,10 @@ type xDSDeltaChild struct { } type xDSDeltaType struct { - generator *ResourceGenerator + logger hclog.Logger stream ADSDeltaStream typeURL string - allowEmptyFn func(kind structs.ServiceKind) bool + allowEmptyFn func() bool // deltaChild contains data for an xDS child type if there is one. // For example, endpoints are a child type of clusters. @@ -677,13 +720,13 @@ type PendingUpdate struct { } func newDeltaType( - generator *ResourceGenerator, + logger hclog.Logger, stream ADSDeltaStream, typeUrl string, - allowEmptyFn func(kind structs.ServiceKind) bool, + allowEmptyFn func() bool, ) *xDSDeltaType { return &xDSDeltaType{ - generator: generator, + logger: logger, stream: stream, typeURL: typeUrl, allowEmptyFn: allowEmptyFn, @@ -700,7 +743,6 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf xd if t == nil { return deltaRecvUnknownType // not something we care about } - logger := t.generator.Logger.With("typeUrl", t.typeURL) registeredThisTime := false if !t.registered { @@ -737,10 +779,10 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf xd response_nonce, with presence of error_detail making it a NACK). */ if req.ErrorDetail == nil { - logger.Trace("got ok response from envoy proxy", "nonce", req.ResponseNonce) + t.logger.Trace("got ok response from envoy proxy", "nonce", req.ResponseNonce) t.ack(req.ResponseNonce) } else { - logger.Error("got error response from envoy proxy", "nonce", req.ResponseNonce, + t.logger.Error("got error response from envoy proxy", "nonce", req.ResponseNonce, "error", status.ErrorProto(req.ErrorDetail)) t.nack(req.ResponseNonce) return deltaRecvResponseNack @@ -756,7 +798,7 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf xd the client already possesses, using the initial_resource_versions field. */ - logger.Trace("setting initial resource versions for stream", + t.logger.Trace("setting initial resource versions for stream", "resources", req.InitialResourceVersions) t.resourceVersions = req.InitialResourceVersions if !t.wildcard { @@ -807,9 +849,9 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf xd } if alreadySubscribed { - logger.Trace("re-subscribing resource for stream", "resource", name) + t.logger.Trace("re-subscribing resource for stream", "resource", name) } else { - logger.Trace("subscribing resource for stream", "resource", name) + t.logger.Trace("subscribing resource for stream", "resource", name) } } @@ -818,7 +860,7 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf xd continue } delete(t.subscriptions, name) - logger.Trace("unsubscribing resource for stream", "resource", name) + t.logger.Trace("unsubscribing resource for stream", "resource", name) // NOTE: we'll let the normal differential comparison handle cleaning up resourceVersions } } @@ -852,7 +894,6 @@ func (t *xDSDeltaType) nack(nonce string) { } func (t *xDSDeltaType) SendIfNew( - kind structs.ServiceKind, currentVersions map[string]string, // type => name => version (as consul knows right now) resourceMap *xdscommon.IndexedResources, nonce *uint64, @@ -867,9 +908,9 @@ func (t *xDSDeltaType) SendIfNew( return nil, false } - logger := t.generator.Logger.With("typeUrl", t.typeURL) + logger := t.logger.With("typeUrl", t.typeURL) - allowEmpty := t.allowEmptyFn != nil && t.allowEmptyFn(kind) + allowEmpty := t.allowEmptyFn != nil && t.allowEmptyFn() // Zero length resource responses should be ignored and are the result of no // data yet. Notice that this caused a bug originally where we had zero @@ -894,7 +935,7 @@ func (t *xDSDeltaType) SendIfNew( *nonce++ resp.Nonce = fmt.Sprintf("%08x", *nonce) - t.generator.logTraceResponse("Incremental xDS v3", resp) + logTraceResponse(t.logger, "Incremental xDS v3", resp) logger.Trace("sending response", "nonce", resp.Nonce) if err := t.stream.Send(resp); err != nil { @@ -1046,7 +1087,7 @@ func (t *xDSDeltaType) ensureChildResend(parentName, childName string) { return } - t.generator.Logger.Trace( + t.logger.Trace( "triggering implicit update of resource", "typeUrl", t.typeURL, "resource", parentName, diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index f324e91934..d93d06271b 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -6,6 +6,7 @@ package xds import ( "errors" "fmt" + "github.com/hashicorp/consul/envoyextensions/xdscommon" "strconv" "strings" "sync" @@ -17,6 +18,15 @@ import ( envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/grpc-external/limiter" + "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/envoyextensions/extensioncommon" + "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/consul/sdk/testutil/retry" + "github.com/hashicorp/consul/version" "github.com/hashicorp/go-hclog" goversion "github.com/hashicorp/go-version" "github.com/stretchr/testify/require" @@ -24,17 +34,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" - - "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/grpc-external/limiter" - "github.com/hashicorp/consul/agent/proxycfg" - "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/envoyextensions/extensioncommon" - "github.com/hashicorp/consul/envoyextensions/xdscommon" - "github.com/hashicorp/consul/sdk/testutil" - "github.com/hashicorp/consul/sdk/testutil/retry" - "github.com/hashicorp/consul/version" ) // NOTE: For these tests, prefer not using xDS protobuf "factory" methods if diff --git a/agent/xds/protocol_trace.go b/agent/xds/protocol_trace.go index 76d4e8e6ee..cfe905d70b 100644 --- a/agent/xds/protocol_trace.go +++ b/agent/xds/protocol_trace.go @@ -5,22 +5,23 @@ package xds import ( envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/hashicorp/go-hclog" "github.com/mitchellh/copystructure" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" ) -func (s *ResourceGenerator) logTraceRequest(msg string, pb proto.Message) { - s.logTraceProto(msg, pb, false) +func logTraceRequest(logger hclog.Logger, msg string, pb proto.Message) { + logTraceProto(logger, msg, pb, false) } -func (s *ResourceGenerator) logTraceResponse(msg string, pb proto.Message) { - s.logTraceProto(msg, pb, true) +func logTraceResponse(logger hclog.Logger, msg string, pb proto.Message) { + logTraceProto(logger, msg, pb, true) } -func (s *ResourceGenerator) logTraceProto(msg string, pb proto.Message, response bool) { - if !s.Logger.IsTrace() { +func logTraceProto(logger hclog.Logger, msg string, pb proto.Message, response bool) { + if !logger.IsTrace() { return } @@ -55,5 +56,5 @@ func (s *ResourceGenerator) logTraceProto(msg string, pb proto.Message, response out = string(outBytes) } - s.Logger.Trace(msg, "direction", dir, "protobuf", out) + logger.Trace(msg, "direction", dir, "protobuf", out) } diff --git a/agent/xds/proxystateconverter/converter.go b/agent/xds/proxystateconverter/converter.go index 4cf9b95151..191b9084b8 100644 --- a/agent/xds/proxystateconverter/converter.go +++ b/agent/xds/proxystateconverter/converter.go @@ -5,21 +5,21 @@ package proxystateconverter import ( "fmt" - - "github.com/hashicorp/go-hclog" + "github.com/hashicorp/consul/internal/mesh" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/xds/configfetcher" pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" + "github.com/hashicorp/go-hclog" ) // Converter converts a single snapshot into a ProxyState. type Converter struct { Logger hclog.Logger CfgFetcher configfetcher.ConfigFetcher - proxyState *pbmesh.ProxyState + proxyState *mesh.ProxyState } func NewConverter( @@ -29,16 +29,18 @@ func NewConverter( return &Converter{ Logger: logger, CfgFetcher: cfgFetcher, - proxyState: &pbmesh.ProxyState{ - Listeners: make([]*pbproxystate.Listener, 0), - Clusters: make(map[string]*pbproxystate.Cluster), - Routes: make(map[string]*pbproxystate.Route), - Endpoints: make(map[string]*pbproxystate.Endpoints), + proxyState: &mesh.ProxyState{ + ProxyState: &pbmesh.ProxyState{ + Listeners: make([]*pbproxystate.Listener, 0), + Clusters: make(map[string]*pbproxystate.Cluster), + Routes: make(map[string]*pbproxystate.Route), + Endpoints: make(map[string]*pbproxystate.Endpoints), + }, }, } } -func (g *Converter) ProxyStateFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) (*pbmesh.ProxyState, error) { +func (g *Converter) ProxyStateFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) (*mesh.ProxyState, error) { err := g.resourcesFromSnapshot(cfgSnap) if err != nil { return nil, fmt.Errorf("failed to generate FullProxyState: %v", err) @@ -66,7 +68,6 @@ func (g *Converter) resourcesFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) erro if err != nil { return err } - err = g.routesFromSnapshot(cfgSnap) if err != nil { return err diff --git a/agent/xds/proxystateconverter/endpoints.go b/agent/xds/proxystateconverter/endpoints.go index 28156a9b03..78ebe95e2d 100644 --- a/agent/xds/proxystateconverter/endpoints.go +++ b/agent/xds/proxystateconverter/endpoints.go @@ -404,7 +404,7 @@ func (s *Converter) endpointsFromDiscoveryChain( clusterEndpoints := make(map[string][]*pbproxystate.Endpoint) - // TODO(jm): escape hatches will be implemented in the future + // TODO(proxystate): escape hatches will be implemented in the future //var escapeHatchCluster *pbproxystate.Cluster //if !forMeshGateway { @@ -465,7 +465,7 @@ func (s *Converter) endpointsFromDiscoveryChain( for _, groupedTarget := range targetGroups { clusterName := groupedTarget.ClusterName - // TODO(jm): escape hatches will be implemented in the future + // TODO(proxystate): escape hatches will be implemented in the future //if escapeHatchCluster != nil { // clusterName = escapeHatchCluster.Name //} @@ -532,7 +532,7 @@ func makeEndpointsForLoadAssignment(cfgSnap *proxycfg.ConfigSnapshot, localKey proxycfg.GatewayKey) []*pbproxystate.Endpoint { pbEndpoints := make([]*pbproxystate.Endpoint, 0, len(endpointGroups)) - // TODO(jm): make this work in xdsv2 + // TODO(proxystate): this will be added with property overrides having golden files with this //if len(endpointGroups) > 1 { // cla.Policy = &envoy_endpoint_v3.ClusterLoadAssignment_Policy{ // // We choose such a large value here that the failover math should @@ -567,7 +567,7 @@ func makeEndpointsForLoadAssignment(cfgSnap *proxycfg.ConfigSnapshot, pbEndpoints = append(pbEndpoints, endpoint) } - // TODO(jm): what do we do about priority downstream? + // TODO(proxystate): what do we do about priority downstream? //cla.Endpoints = append(cla.Endpoints, &envoy_endpoint_v3.LocalityLbEndpoints{ // Priority: priority, // LbEndpoints: es, diff --git a/agent/xds/server.go b/agent/xds/server.go index c8ffeef842..cc4220da28 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -6,6 +6,7 @@ package xds import ( "context" "errors" + "github.com/hashicorp/consul/proto-public/pbresource" "sync/atomic" "time" @@ -25,7 +26,6 @@ import ( external "github.com/hashicorp/consul/agent/grpc-external" "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/proxycfg" - "github.com/hashicorp/consul/agent/structs" ) var ( @@ -84,8 +84,8 @@ type ACLResolverFunc func(id string) (acl.Authorizer, error) // ProxyConfigSource is the interface xds.Server requires to consume proxy // config updates. -type ProxyConfigSource interface { - Watch(id structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) +type ProxyWatcher interface { + Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) } // Server represents a gRPC server that can handle xDS requests from Envoy. All @@ -96,7 +96,7 @@ type ProxyConfigSource interface { type Server struct { NodeName string Logger hclog.Logger - CfgSrc ProxyConfigSource + CfgSrc ProxyWatcher ResolveToken ACLResolverFunc CfgFetcher configfetcher.ConfigFetcher @@ -147,7 +147,7 @@ func (c *activeStreamCounters) Increment(ctx context.Context) func() { func NewServer( nodeName string, logger hclog.Logger, - cfgMgr ProxyConfigSource, + cfgMgr ProxyWatcher, resolveTokenSecret ACLResolverFunc, cfgFetcher configfetcher.ConfigFetcher, ) *Server { @@ -202,9 +202,9 @@ func (s *Server) authenticate(ctx context.Context) (acl.Authorizer, error) { // using a token with the same permissions, and that it stores the data by // proxy ID. We assume that any data in the snapshot was already filtered, // which allows this authorization to be a shallow authorization check -// for all the data in a ConfigSnapshot. -func (s *Server) authorize(ctx context.Context, cfgSnap *proxycfg.ConfigSnapshot) error { - if cfgSnap == nil { +// for all the data in a ProxySnapshot. +func (s *Server) authorize(ctx context.Context, proxySnapshot proxycfg.ProxySnapshot) error { + if proxySnapshot == nil { return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot") } @@ -213,22 +213,5 @@ func (s *Server) authorize(ctx context.Context, cfgSnap *proxycfg.ConfigSnapshot return err } - var authzContext acl.AuthorizerContext - switch cfgSnap.Kind { - case structs.ServiceKindConnectProxy: - cfgSnap.ProxyID.EnterpriseMeta.FillAuthzContext(&authzContext) - if err := authz.ToAllowAuthorizer().ServiceWriteAllowed(cfgSnap.Proxy.DestinationServiceName, &authzContext); err != nil { - return status.Errorf(codes.PermissionDenied, err.Error()) - } - case structs.ServiceKindMeshGateway, structs.ServiceKindTerminatingGateway, structs.ServiceKindIngressGateway, structs.ServiceKindAPIGateway: - cfgSnap.ProxyID.EnterpriseMeta.FillAuthzContext(&authzContext) - if err := authz.ToAllowAuthorizer().ServiceWriteAllowed(cfgSnap.Service, &authzContext); err != nil { - return status.Errorf(codes.PermissionDenied, err.Error()) - } - default: - return status.Errorf(codes.Internal, "Invalid service kind") - } - - // Authed OK! - return nil + return proxySnapshot.Authorize(authz) } diff --git a/agent/xds/xds_protocol_helpers_test.go b/agent/xds/xds_protocol_helpers_test.go index bf632217d7..671974f45b 100644 --- a/agent/xds/xds_protocol_helpers_test.go +++ b/agent/xds/xds_protocol_helpers_test.go @@ -4,7 +4,9 @@ package xds import ( + "github.com/hashicorp/consul/agent/proxycfg-sources/catalog" "github.com/hashicorp/consul/agent/xds/response" + "github.com/hashicorp/consul/proto-public/pbresource" "sort" "sync" "testing" @@ -72,14 +74,14 @@ func newTestSnapshot( // testing. It also implements ConnectAuthz to allow control over authorization. type testManager struct { sync.Mutex - stateChans map[structs.ServiceID]chan *proxycfg.ConfigSnapshot + stateChans map[structs.ServiceID]chan proxycfg.ProxySnapshot drainChans map[structs.ServiceID]chan struct{} cancels chan structs.ServiceID } func newTestManager(t *testing.T) *testManager { return &testManager{ - stateChans: map[structs.ServiceID]chan *proxycfg.ConfigSnapshot{}, + stateChans: map[structs.ServiceID]chan proxycfg.ProxySnapshot{}, drainChans: map[structs.ServiceID]chan struct{}{}, cancels: make(chan structs.ServiceID, 10), } @@ -89,12 +91,12 @@ func newTestManager(t *testing.T) *testManager { func (m *testManager) RegisterProxy(t *testing.T, proxyID structs.ServiceID) { m.Lock() defer m.Unlock() - m.stateChans[proxyID] = make(chan *proxycfg.ConfigSnapshot, 1) + m.stateChans[proxyID] = make(chan proxycfg.ProxySnapshot, 1) m.drainChans[proxyID] = make(chan struct{}) } // Deliver simulates a proxy registration -func (m *testManager) DeliverConfig(t *testing.T, proxyID structs.ServiceID, cfg *proxycfg.ConfigSnapshot) { +func (m *testManager) DeliverConfig(t *testing.T, proxyID structs.ServiceID, cfg proxycfg.ProxySnapshot) { t.Helper() m.Lock() defer m.Unlock() @@ -121,7 +123,10 @@ func (m *testManager) DrainStreams(proxyID structs.ServiceID) { } // Watch implements ConfigManager -func (m *testManager) Watch(proxyID structs.ServiceID, _ string, _ string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { +func (m *testManager) Watch(id *pbresource.ID, _ string, _ string) (<-chan proxycfg.ProxySnapshot, + limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { + // Create service ID + proxyID := structs.NewServiceID(id.Name, catalog.GetEnterpriseMetaFromResourceID(id)) m.Lock() defer m.Unlock() diff --git a/agent/xdsv2/endpoint_resources.go b/agent/xdsv2/endpoint_resources.go index c493c48f64..ba67364e9e 100644 --- a/agent/xdsv2/endpoint_resources.go +++ b/agent/xdsv2/endpoint_resources.go @@ -51,7 +51,6 @@ func (pr *ProxyResources) makeXDSEndpoints() ([]proto.Message, error) { endpoints := make([]proto.Message, 0) for clusterName, eps := range pr.proxyState.GetEndpoints() { - // TODO(jm): this does not seem like the best way. if clusterName != xdscommon.LocalAppClusterName { protoEndpoint := makeEnvoyClusterLoadAssignment(clusterName, eps.Endpoints) endpoints = append(endpoints, protoEndpoint) diff --git a/agent/xdsv2/resources.go b/agent/xdsv2/resources.go index 46bdd08d6c..349671ceb9 100644 --- a/agent/xdsv2/resources.go +++ b/agent/xdsv2/resources.go @@ -5,7 +5,8 @@ package xdsv2 import ( "fmt" - pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" + "github.com/hashicorp/consul/internal/mesh" + "github.com/hashicorp/go-hclog" "google.golang.org/protobuf/proto" @@ -28,11 +29,11 @@ func NewResourceGenerator( } type ProxyResources struct { - proxyState *pbmesh.ProxyState + proxyState *mesh.ProxyState envoyResources map[string][]proto.Message } -func (g *ResourceGenerator) AllResourcesFromIR(proxyState *pbmesh.ProxyState) (map[string][]proto.Message, error) { +func (g *ResourceGenerator) AllResourcesFromIR(proxyState *mesh.ProxyState) (map[string][]proto.Message, error) { pr := &ProxyResources{ proxyState: proxyState, envoyResources: make(map[string][]proto.Message), @@ -49,7 +50,6 @@ func (pr *ProxyResources) generateXDSResources() error { if err != nil { return err } - pr.envoyResources[xdscommon.ListenerType] = listeners pr.envoyResources[xdscommon.ListenerType] = listeners diff --git a/internal/mesh/proxy_state_exports.go b/internal/mesh/proxy_state_exports.go new file mode 100644 index 0000000000..2739e3a679 --- /dev/null +++ b/internal/mesh/proxy_state_exports.go @@ -0,0 +1,42 @@ +package mesh + +import ( + "github.com/hashicorp/consul/acl" + pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" +) + +// ProxyState is an implementation of the ProxySnapshot interface for pbmesh.ProxyState. +// It is a simple wrapper around pbmesh.ProxyState so that it can be used +// by the ProxyWatcher interface in XDS processing. This struct is necessary +// because pbmesh.ProxyState is a proto definition and there were complications +// adding these functions directly to that proto definition. +type ProxyState struct { + *pbmesh.ProxyState +} + +// TODO(proxystate): need to modify ProxyState to carry a type/kind (connect proxy, mesh gateway, etc.) +// for sidecar proxies, all Allow* functions +// should return false, but for different gateways we'd need to add it to IR. + +func (p *ProxyState) AllowEmptyListeners() bool { + return false +} + +func (p *ProxyState) AllowEmptyRoutes() bool { + return false +} + +func (p *ProxyState) AllowEmptyClusters() bool { + return false +} + +func (p *ProxyState) Authorize(authz acl.Authorizer) error { + // TODO(proxystate): we'll need to implement this once identity policy is implemented + + // Authed OK! + return nil +} + +func (p *ProxyState) LoggerName() string { + return "" +} diff --git a/test/integration/connect/envoy/main_test.go b/test/integration/connect/envoy/main_test.go index 52f3b74187..a1aaa060b8 100644 --- a/test/integration/connect/envoy/main_test.go +++ b/test/integration/connect/envoy/main_test.go @@ -20,7 +20,8 @@ import ( ) var ( - flagWin = flag.Bool("win", false, "Execute tests on windows") + flagWin = flag.Bool("win", false, "Execute tests on windows") + flagResourceAPIs = flag.Bool("enable-resource-apis", false, "Execute tests with resource apis enabled.") ) func TestEnvoy(t *testing.T) { @@ -31,7 +32,14 @@ func TestEnvoy(t *testing.T) { check_dir_files(dir) } - testcases, err := discoverCases() + var testcases []string + var err error + if *flagResourceAPIs == true { + os.Setenv("USE_RESOURCE_APIS", "true") + testcases, err = discoverResourceAPICases() + } else { + testcases, err = discoverCases() + } require.NoError(t, err) runCmd(t, "suite_setup") @@ -118,6 +126,33 @@ func discoverCases() ([]string, error) { return out, nil } +// discoverResourceAPICases will discover the Envoy tests case files but will contain +// a filter in it to only return those case for which functionality has been added +// to the V2 catalog resources. +func discoverResourceAPICases() ([]string, error) { + cwd, err := os.Getwd() + if err != nil { + return nil, err + } + + dirs, err := os.ReadDir(cwd) + if err != nil { + return nil, err + } + + var out []string + for _, fi := range dirs { + // TODO(proxystate): enable this to only include tests cases that are supported. + // Currently the work is in progress, so it is wired up in CI, but this excludes any tests from actually running. + if fi.IsDir() && strings.HasPrefix(fi.Name(), "case-don-match-me-on-anything-yet-because-i-am-not-ready") { + out = append(out, fi.Name()) + } + } + + sort.Strings(out) + return out, nil +} + // CRLF convert functions // Recursively iterates through the directory passed by parameter looking for the sh and bash files. // Upon finding them, it calls crlf_file_check. diff --git a/test/integration/connect/envoy/run-tests.sh b/test/integration/connect/envoy/run-tests.sh index 99c918a6ee..fa1d64312d 100755 --- a/test/integration/connect/envoy/run-tests.sh +++ b/test/integration/connect/envoy/run-tests.sh @@ -179,6 +179,14 @@ function start_consul { license=$(cat $CONSUL_LICENSE_PATH) fi + USE_RESOURCE_APIS=${USE_RESOURCE_APIS:-false} + + experiments="experiments=[]" + # set up consul to run in V1 or V2 catalog mode + if [[ "${USE_RESOURCE_APIS}" == true ]]; then + experiments="experiments=[\"resource-apis\"]" + fi + # We currently run these integration tests in two modes: one in which Envoy's # xDS sessions are served directly by a Consul server, and another in which it # goes through a client agent. @@ -262,6 +270,7 @@ function start_consul { agent -dev -datacenter "${DC}" \ -config-dir "/workdir/${DC}/consul" \ -config-dir "/workdir/${DC}/consul-server" \ + -hcl=${experiments} \ -client "0.0.0.0" >/dev/null fi } @@ -789,6 +798,9 @@ function common_run_container_gateway { function run_container_gateway-primary { common_run_container_gateway mesh-gateway primary } +function run_container_gateway-ap1 { + common_run_container_gateway mesh-gateway ap1 +} function run_container_gateway-secondary { common_run_container_gateway mesh-gateway secondary }