NET-4944 - wire up controllers with proxy tracker (#18603)

Co-authored-by: github-team-consul-core <github-team-consul-core@hashicorp.com>
This commit is contained in:
John Murret 2023-08-29 09:15:34 -06:00 committed by GitHub
parent 48c8a834f5
commit 0e606504bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 266 additions and 215 deletions

View File

@ -9,8 +9,8 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
proxytracker "github.com/hashicorp/consul/agent/proxy-tracker"
catalogproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/catalog" catalogproxycfg "github.com/hashicorp/consul/agent/proxycfg-sources/catalog"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
"github.com/hashicorp/consul/lib/stringslice" "github.com/hashicorp/consul/lib/stringslice"
"io" "io"
"net" "net"
@ -653,6 +653,47 @@ func (a *Agent) Start(ctx context.Context) error {
return fmt.Errorf("failed to start Consul enterprise component: %v", err) return fmt.Errorf("failed to start Consul enterprise component: %v", err)
} }
// Create proxy config manager now because it is a dependency of creating the proxyWatcher
// which will be passed to consul.NewServer so that it is then passed to the
// controller registration for the XDS controller in v2 mode, and the xds server in v1 and v2 mode.
var intentionDefaultAllow bool
switch a.config.ACLResolverSettings.ACLDefaultPolicy {
case "allow":
intentionDefaultAllow = true
case "deny":
intentionDefaultAllow = false
default:
return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy)
}
go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})
// Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
DataSources: a.proxyDataSources(),
Logger: a.logger.Named(logging.ProxyConfig),
Source: &structs.QuerySource{
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,
Node: a.config.NodeName,
NodePartition: a.config.PartitionOrEmpty(),
},
DNSConfig: proxycfg.DNSConfig{
Domain: a.config.DNSDomain,
AltDomain: a.config.DNSAltDomain,
},
TLSConfigurator: a.tlsConfigurator,
IntentionDefaultAllow: intentionDefaultAllow,
UpdateRateLimit: a.config.XDSUpdateRateLimit,
})
if err != nil {
return err
}
// proxyWatcher will be used in the creation of the XDS server and also
// in the registration of the xds controller.
proxyWatcher := a.getProxyWatcher()
// Setup either the client or the server. // Setup either the client or the server.
if c.ServerMode { if c.ServerMode {
serverLogger := a.baseDeps.Logger.NamedIntercept(logging.ConsulServer) serverLogger := a.baseDeps.Logger.NamedIntercept(logging.ConsulServer)
@ -686,7 +727,11 @@ func (a *Agent) Start(ctx context.Context) error {
incomingRPCLimiter, incomingRPCLimiter,
) )
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger) var pt *proxytracker.ProxyTracker
if a.useV2Resources() {
pt = proxyWatcher.(*proxytracker.ProxyTracker)
}
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps, a.externalGRPCServer, incomingRPCLimiter, serverLogger, pt)
if err != nil { if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err) return fmt.Errorf("Failed to start Consul server: %v", err)
} }
@ -753,40 +798,6 @@ func (a *Agent) Start(ctx context.Context) error {
return err return err
} }
var intentionDefaultAllow bool
switch a.config.ACLResolverSettings.ACLDefaultPolicy {
case "allow":
intentionDefaultAllow = true
case "deny":
intentionDefaultAllow = false
default:
return fmt.Errorf("unexpected ACL default policy value of %q", a.config.ACLResolverSettings.ACLDefaultPolicy)
}
go a.baseDeps.ViewStore.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})
// Start the proxy config manager.
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
DataSources: a.proxyDataSources(),
Logger: a.logger.Named(logging.ProxyConfig),
Source: &structs.QuerySource{
Datacenter: a.config.Datacenter,
Segment: a.config.SegmentName,
Node: a.config.NodeName,
NodePartition: a.config.PartitionOrEmpty(),
},
DNSConfig: proxycfg.DNSConfig{
Domain: a.config.DNSDomain,
AltDomain: a.config.DNSAltDomain,
},
TLSConfigurator: a.tlsConfigurator,
IntentionDefaultAllow: intentionDefaultAllow,
UpdateRateLimit: a.config.XDSUpdateRateLimit,
})
if err != nil {
return err
}
go localproxycfg.Sync( go localproxycfg.Sync(
&lib.StopChannelContext{StopCh: a.shutdownCh}, &lib.StopChannelContext{StopCh: a.shutdownCh},
localproxycfg.SyncConfig{ localproxycfg.SyncConfig{
@ -839,7 +850,7 @@ func (a *Agent) Start(ctx context.Context) error {
} }
// Start grpc and grpc_tls servers. // Start grpc and grpc_tls servers.
if err := a.listenAndServeGRPC(); err != nil { if err := a.listenAndServeGRPC(proxyWatcher); err != nil {
return err return err
} }
@ -924,11 +935,13 @@ func (a *Agent) useV2Resources() bool {
// it will return a ConfigSource. // it will return a ConfigSource.
func (a *Agent) getProxyWatcher() xds.ProxyWatcher { func (a *Agent) getProxyWatcher() xds.ProxyWatcher {
if a.useV2Resources() { if a.useV2Resources() {
a.logger.Trace("returning proxyTracker for getProxyWatcher")
return proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{ return proxytracker.NewProxyTracker(proxytracker.ProxyTrackerConfig{
Logger: a.proxyConfig.Logger.Named("proxy-tracker"), Logger: a.logger.Named("proxy-tracker"),
SessionLimiter: a.baseDeps.XDSStreamLimiter, SessionLimiter: a.baseDeps.XDSStreamLimiter,
}) })
} else { } else {
a.logger.Trace("returning configSource for getProxyWatcher")
return localproxycfg.NewConfigSource(a.proxyConfig) return localproxycfg.NewConfigSource(a.proxyConfig)
} }
} }
@ -936,16 +949,14 @@ func (a *Agent) getProxyWatcher() xds.ProxyWatcher {
// configureXDSServer configures an XDS server with the proper implementation of // configureXDSServer configures an XDS server with the proper implementation of
// the PRoxyWatcher interface and registers the XDS server with Consul's // the PRoxyWatcher interface and registers the XDS server with Consul's
// external facing GRPC server. // external facing GRPC server.
func (a *Agent) configureXDSServer() { func (a *Agent) configureXDSServer(proxyWatcher xds.ProxyWatcher) {
cfg := a.getProxyWatcher()
// TODO(agentless): rather than asserting the concrete type of delegate, we // TODO(agentless): rather than asserting the concrete type of delegate, we
// should add a method to the Delegate interface to build a ConfigSource. // should add a method to the Delegate interface to build a ConfigSource.
if server, ok := a.delegate.(*consul.Server); ok { if server, ok := a.delegate.(*consul.Server); ok {
catalogCfg := catalogproxycfg.NewConfigSource(catalogproxycfg.Config{ catalogCfg := catalogproxycfg.NewConfigSource(catalogproxycfg.Config{
NodeName: a.config.NodeName, NodeName: a.config.NodeName,
LocalState: a.State, LocalState: a.State,
LocalConfigSource: cfg, LocalConfigSource: proxyWatcher,
Manager: a.proxyConfig, Manager: a.proxyConfig,
GetStore: func() catalogproxycfg.Store { return server.FSM().State() }, GetStore: func() catalogproxycfg.Store { return server.FSM().State() },
Logger: a.proxyConfig.Logger.Named("server-catalog"), Logger: a.proxyConfig.Logger.Named("server-catalog"),
@ -955,12 +966,12 @@ func (a *Agent) configureXDSServer() {
<-a.shutdownCh <-a.shutdownCh
catalogCfg.Shutdown() catalogCfg.Shutdown()
}() }()
cfg = catalogCfg proxyWatcher = catalogCfg
} }
a.xdsServer = xds.NewServer( a.xdsServer = xds.NewServer(
a.config.NodeName, a.config.NodeName,
a.logger.Named(logging.Envoy), a.logger.Named(logging.Envoy),
cfg, proxyWatcher,
func(id string) (acl.Authorizer, error) { func(id string) (acl.Authorizer, error) {
return a.delegate.ResolveTokenAndDefaultMeta(id, nil, nil) return a.delegate.ResolveTokenAndDefaultMeta(id, nil, nil)
}, },
@ -969,12 +980,12 @@ func (a *Agent) configureXDSServer() {
a.xdsServer.Register(a.externalGRPCServer) a.xdsServer.Register(a.externalGRPCServer)
} }
func (a *Agent) listenAndServeGRPC() error { func (a *Agent) listenAndServeGRPC(proxyWatcher xds.ProxyWatcher) error {
if len(a.config.GRPCAddrs) < 1 && len(a.config.GRPCTLSAddrs) < 1 { if len(a.config.GRPCAddrs) < 1 && len(a.config.GRPCTLSAddrs) < 1 {
return nil return nil
} }
a.configureXDSServer() a.configureXDSServer(proxyWatcher)
// Attempt to spawn listeners // Attempt to spawn listeners
var listeners []net.Listener var listeners []net.Listener

View File

@ -15,10 +15,10 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/grpc-external/limiter"
proxytracker "github.com/hashicorp/consul/agent/proxy-tracker"
"github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/proxycfg-sources/local" "github.com/hashicorp/consul/agent/proxycfg-sources/local"
"github.com/hashicorp/consul/agent/xds" "github.com/hashicorp/consul/agent/xds"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
mathrand "math/rand" mathrand "math/rand"
"net" "net"
"net/http" "net/http"

View File

@ -566,7 +566,7 @@ func TestCAManager_Initialize_Logging(t *testing.T) {
deps := newDefaultDeps(t, conf1) deps := newDefaultDeps(t, conf1)
deps.Logger = logger deps.Logger = logger
s1, err := NewServer(conf1, deps, grpc.NewServer(), nil, logger) s1, err := NewServer(conf1, deps, grpc.NewServer(), nil, logger, nil)
require.NoError(t, err) require.NoError(t, err)
defer s1.Shutdown() defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1") testrpc.WaitForLeader(t, s1.RPC, "dc1")

View File

@ -1640,7 +1640,7 @@ func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) {
deps := newDefaultDeps(t, config) deps := newDefaultDeps(t, config)
deps.Logger = logger deps.Logger = logger
srv, err := NewServer(config, deps, grpc.NewServer(), nil, logger) srv, err := NewServer(config, deps, grpc.NewServer(), nil, logger, nil)
require.NoError(t, err) require.NoError(t, err)
defer srv.Shutdown() defer srv.Shutdown()

View File

@ -8,6 +8,7 @@ import (
"crypto/x509" "crypto/x509"
"errors" "errors"
"fmt" "fmt"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"io" "io"
"net" "net"
"os" "os"
@ -480,9 +481,21 @@ type connHandler interface {
Shutdown() error Shutdown() error
} }
// ProxyUpdater is an interface for ProxyTracker.
type ProxyUpdater interface {
// PushChange allows pushing a computed ProxyState to xds for xds resource generation to send to a proxy.
PushChange(id *pbresource.ID, snapshot proxysnapshot.ProxySnapshot) error
// ProxyConnectedToServer returns whether this id is connected to this server.
ProxyConnectedToServer(id *pbresource.ID) bool
EventChannel() chan controller.Event
}
// NewServer is used to construct a new Consul server from the configuration // NewServer is used to construct a new Consul server from the configuration
// and extra options, potentially returning an error. // and extra options, potentially returning an error.
func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incomingRPCLimiter rpcRate.RequestLimitsHandler, serverLogger hclog.InterceptLogger) (*Server, error) { func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server,
incomingRPCLimiter rpcRate.RequestLimitsHandler, serverLogger hclog.InterceptLogger, proxyUpdater ProxyUpdater) (*Server, error) {
logger := flat.Logger logger := flat.Logger
if err := config.CheckProtocolVersion(); err != nil { if err := config.CheckProtocolVersion(); err != nil {
return nil, err return nil, err
@ -822,7 +835,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
s.insecureResourceServiceClient, s.insecureResourceServiceClient,
logger.Named(logging.ControllerRuntime), logger.Named(logging.ControllerRuntime),
) )
s.registerControllers(flat) s.registerControllers(flat, proxyUpdater)
go s.controllerManager.Run(&lib.StopChannelContext{StopCh: shutdownCh}) go s.controllerManager.Run(&lib.StopChannelContext{StopCh: shutdownCh})
go s.trackLeaderChanges() go s.trackLeaderChanges()
@ -873,7 +886,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
return s, nil return s, nil
} }
func (s *Server) registerControllers(deps Deps) { func (s *Server) registerControllers(deps Deps, proxyUpdater ProxyUpdater) {
if stringslice.Contains(deps.Experiments, CatalogResourceExperimentName) { if stringslice.Contains(deps.Experiments, CatalogResourceExperimentName) {
catalog.RegisterControllers(s.controllerManager, catalog.DefaultControllerDependencies()) catalog.RegisterControllers(s.controllerManager, catalog.DefaultControllerDependencies())
mesh.RegisterControllers(s.controllerManager, mesh.ControllerDependencies{ mesh.RegisterControllers(s.controllerManager, mesh.ControllerDependencies{
@ -889,6 +902,7 @@ func (s *Server) registerControllers(deps Deps) {
} }
return &bundle, nil return &bundle, nil
}, },
ProxyUpdater: proxyUpdater,
}) })
} }

View File

@ -336,7 +336,7 @@ func newServerWithDeps(t *testing.T, c *Config, deps Deps) (*Server, error) {
} }
} }
grpcServer := external.NewServer(deps.Logger.Named("grpc.external"), nil, deps.TLSConfigurator, rpcRate.NullRequestLimitsHandler()) grpcServer := external.NewServer(deps.Logger.Named("grpc.external"), nil, deps.TLSConfigurator, rpcRate.NullRequestLimitsHandler())
srv, err := NewServer(c, deps, grpcServer, nil, deps.Logger) srv, err := NewServer(c, deps, grpcServer, nil, deps.Logger, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1243,7 +1243,7 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) {
} }
} }
s1, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger) s1, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger, nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -1281,7 +1281,7 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) {
return nil return nil
} }
s2, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger) s2, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger, nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -1315,7 +1315,7 @@ func TestServer_RPC_RequestRecorder(t *testing.T) {
deps := newDefaultDeps(t, conf) deps := newDefaultDeps(t, conf)
deps.NewRequestRecorderFunc = nil deps.NewRequestRecorderFunc = nil
s1, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger) s1, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger, nil)
require.Error(t, err, "need err when provider func is nil") require.Error(t, err, "need err when provider func is nil")
require.Equal(t, err.Error(), "cannot initialize server without an RPC request recorder provider") require.Equal(t, err.Error(), "cannot initialize server without an RPC request recorder provider")
@ -1334,7 +1334,7 @@ func TestServer_RPC_RequestRecorder(t *testing.T) {
return nil return nil
} }
s2, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger) s2, err := NewServer(conf, deps, grpc.NewServer(), nil, deps.Logger, nil)
require.Error(t, err, "need err when RequestRecorder is nil") require.Error(t, err, "need err when RequestRecorder is nil")
require.Equal(t, err.Error(), "cannot initialize server with a nil RPC request recorder") require.Equal(t, err.Error(), "cannot initialize server with a nil RPC request recorder")

View File

@ -6,6 +6,8 @@ package catalog
import ( import (
"context" "context"
"errors" "errors"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/proto-public/pbresource"
"sync" "sync"
@ -14,7 +16,6 @@ import (
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry" "github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -49,7 +50,7 @@ func NewConfigSource(cfg Config) *ConfigSource {
// Watch wraps the underlying proxycfg.Manager and dynamically registers // Watch wraps the underlying proxycfg.Manager and dynamically registers
// services from the catalog with it when requested by the xDS server. // services from the catalog with it when requested by the xDS server.
func (m *ConfigSource) Watch(id *pbresource.ID, nodeName string, token string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { func (m *ConfigSource) Watch(id *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) {
// Create service ID // Create service ID
serviceID := structs.NewServiceID(id.Name, GetEnterpriseMetaFromResourceID(id)) serviceID := structs.NewServiceID(id.Name, GetEnterpriseMetaFromResourceID(id))
// If the service is registered to the local agent, use the LocalConfigSource // If the service is registered to the local agent, use the LocalConfigSource
@ -279,7 +280,7 @@ type Config struct {
//go:generate mockery --name ConfigManager --inpackage //go:generate mockery --name ConfigManager --inpackage
type ConfigManager interface { type ConfigManager interface {
Watch(req proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc) Watch(req proxycfg.ProxyID) (<-chan proxysnapshot.ProxySnapshot, proxysnapshot.CancelFunc)
Register(proxyID proxycfg.ProxyID, service *structs.NodeService, source proxycfg.ProxySource, token string, overwrite bool) error Register(proxyID proxycfg.ProxyID, service *structs.NodeService, source proxycfg.ProxySource, token string, overwrite bool) error
Deregister(proxyID proxycfg.ProxyID, source proxycfg.ProxySource) Deregister(proxyID proxycfg.ProxyID, source proxycfg.ProxySource)
} }
@ -292,10 +293,11 @@ type Store interface {
//go:generate mockery --name Watcher --inpackage //go:generate mockery --name Watcher --inpackage
type Watcher interface { type Watcher interface {
Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error)
} }
//go:generate mockery --name SessionLimiter --inpackage //go:generate mockery --name SessionLimiter --inpackage
type SessionLimiter interface { type SessionLimiter interface {
BeginSession() (limiter.Session, error) BeginSession() (limiter.Session, error)
Run(ctx context.Context)
} }

View File

@ -4,7 +4,9 @@
package catalog package catalog
import ( import (
"context"
"errors" "errors"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
rtest "github.com/hashicorp/consul/internal/resource/resourcetest" rtest "github.com/hashicorp/consul/internal/resource/resourcetest"
"testing" "testing"
"time" "time"
@ -177,7 +179,7 @@ func TestConfigSource_LocallyManagedService(t *testing.T) {
localWatcher := NewMockWatcher(t) localWatcher := NewMockWatcher(t)
localWatcher.On("Watch", proxyID, nodeName, token). localWatcher.On("Watch", proxyID, nodeName, token).
Return(make(<-chan proxycfg.ProxySnapshot), nil, proxycfg.CancelFunc(func() {}), nil) Return(make(<-chan proxysnapshot.ProxySnapshot), nil, proxysnapshot.CancelFunc(func() {}), nil)
mgr := NewConfigSource(Config{ mgr := NewConfigSource(Config{
NodeName: nodeName, NodeName: nodeName,
@ -211,12 +213,12 @@ func TestConfigSource_ErrorRegisteringService(t *testing.T) {
})) }))
var canceledWatch bool var canceledWatch bool
cancel := proxycfg.CancelFunc(func() { canceledWatch = true }) cancel := proxysnapshot.CancelFunc(func() { canceledWatch = true })
cfgMgr := NewMockConfigManager(t) cfgMgr := NewMockConfigManager(t)
cfgMgr.On("Watch", mock.Anything). cfgMgr.On("Watch", mock.Anything).
Return(make(<-chan proxycfg.ProxySnapshot), cancel) Return(make(<-chan proxysnapshot.ProxySnapshot), cancel)
cfgMgr.On("Register", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). cfgMgr.On("Register", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(errors.New("KABOOM")) Return(errors.New("KABOOM"))
@ -261,12 +263,12 @@ func TestConfigSource_NotProxyService(t *testing.T) {
})) }))
var canceledWatch bool var canceledWatch bool
cancel := proxycfg.CancelFunc(func() { canceledWatch = true }) cancel := proxysnapshot.CancelFunc(func() { canceledWatch = true })
cfgMgr := NewMockConfigManager(t) cfgMgr := NewMockConfigManager(t)
cfgMgr.On("Watch", mock.Anything). cfgMgr.On("Watch", mock.Anything).
Return(make(<-chan proxycfg.ProxySnapshot), cancel) Return(make(<-chan proxysnapshot.ProxySnapshot), cancel)
mgr := NewConfigSource(Config{ mgr := NewConfigSource(Config{
Manager: cfgMgr, Manager: cfgMgr,
@ -312,9 +314,9 @@ func testConfigManager(t *testing.T, serviceID structs.ServiceID, nodeName strin
Token: token, Token: token,
} }
snapCh := make(chan proxycfg.ProxySnapshot, 1) snapCh := make(chan proxysnapshot.ProxySnapshot, 1)
cfgMgr.On("Watch", proxyID). cfgMgr.On("Watch", proxyID).
Return((<-chan proxycfg.ProxySnapshot)(snapCh), proxycfg.CancelFunc(func() {}), nil) Return((<-chan proxysnapshot.ProxySnapshot)(snapCh), proxysnapshot.CancelFunc(func() {}), nil)
cfgMgr.On("Register", mock.Anything, mock.Anything, source, token, false). cfgMgr.On("Register", mock.Anything, mock.Anything, source, token, false).
Run(func(args mock.Arguments) { Run(func(args mock.Arguments) {
@ -358,6 +360,8 @@ func (nullSessionLimiter) BeginSession() (limiter.Session, error) {
return nullSession{}, nil return nullSession{}, nil
} }
func (nullSessionLimiter) Run(ctx context.Context) {}
type nullSession struct{} type nullSession struct{}
func (nullSession) End() {} func (nullSession) End() {}

View File

@ -4,6 +4,7 @@ package catalog
import ( import (
proxycfg "github.com/hashicorp/consul/agent/proxycfg" proxycfg "github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
mock "github.com/stretchr/testify/mock" mock "github.com/stretchr/testify/mock"
structs "github.com/hashicorp/consul/agent/structs" structs "github.com/hashicorp/consul/agent/structs"
@ -34,27 +35,27 @@ func (_m *MockConfigManager) Register(proxyID proxycfg.ProxyID, service *structs
} }
// Watch provides a mock function with given fields: req // Watch provides a mock function with given fields: req
func (_m *MockConfigManager) Watch(req proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc) { func (_m *MockConfigManager) Watch(req proxycfg.ProxyID) (<-chan proxysnapshot.ProxySnapshot, proxysnapshot.CancelFunc) {
ret := _m.Called(req) ret := _m.Called(req)
var r0 <-chan proxycfg.ProxySnapshot var r0 <-chan proxysnapshot.ProxySnapshot
var r1 proxycfg.CancelFunc var r1 proxysnapshot.CancelFunc
if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc)); ok { if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) (<-chan proxysnapshot.ProxySnapshot, proxysnapshot.CancelFunc)); ok {
return rf(req) return rf(req)
} }
if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) <-chan proxycfg.ProxySnapshot); ok { if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) <-chan proxysnapshot.ProxySnapshot); ok {
r0 = rf(req) r0 = rf(req)
} else { } else {
if ret.Get(0) != nil { if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan proxycfg.ProxySnapshot) r0 = ret.Get(0).(<-chan proxysnapshot.ProxySnapshot)
} }
} }
if rf, ok := ret.Get(1).(func(proxycfg.ProxyID) proxycfg.CancelFunc); ok { if rf, ok := ret.Get(1).(func(proxycfg.ProxyID) proxysnapshot.CancelFunc); ok {
r1 = rf(req) r1 = rf(req)
} else { } else {
if ret.Get(1) != nil { if ret.Get(1) != nil {
r1 = ret.Get(1).(proxycfg.CancelFunc) r1 = ret.Get(1).(proxysnapshot.CancelFunc)
} }
} }

View File

@ -1,9 +1,11 @@
// Code generated by mockery v2.15.0. DO NOT EDIT. // Code generated by mockery v2.32.4. DO NOT EDIT.
package catalog package catalog
import ( import (
limiter "github.com/hashicorp/consul/agent/grpc-external/limiter" context "context"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
mock "github.com/stretchr/testify/mock" mock "github.com/stretchr/testify/mock"
) )
@ -17,6 +19,10 @@ func (_m *MockSessionLimiter) BeginSession() (limiter.Session, error) {
ret := _m.Called() ret := _m.Called()
var r0 limiter.Session var r0 limiter.Session
var r1 error
if rf, ok := ret.Get(0).(func() (limiter.Session, error)); ok {
return rf()
}
if rf, ok := ret.Get(0).(func() limiter.Session); ok { if rf, ok := ret.Get(0).(func() limiter.Session); ok {
r0 = rf() r0 = rf()
} else { } else {
@ -25,7 +31,6 @@ func (_m *MockSessionLimiter) BeginSession() (limiter.Session, error) {
} }
} }
var r1 error
if rf, ok := ret.Get(1).(func() error); ok { if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf() r1 = rf()
} else { } else {
@ -35,13 +40,17 @@ func (_m *MockSessionLimiter) BeginSession() (limiter.Session, error) {
return r0, r1 return r0, r1
} }
type mockConstructorTestingTNewMockSessionLimiter interface { // Run provides a mock function with given fields: ctx
mock.TestingT func (_m *MockSessionLimiter) Run(ctx context.Context) {
Cleanup(func()) _m.Called(ctx)
} }
// NewMockSessionLimiter creates a new instance of MockSessionLimiter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // NewMockSessionLimiter creates a new instance of MockSessionLimiter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockSessionLimiter(t mockConstructorTestingTNewMockSessionLimiter) *MockSessionLimiter { // The first argument is typically a *testing.T value.
func NewMockSessionLimiter(t interface {
mock.TestingT
Cleanup(func())
}) *MockSessionLimiter {
mock := &MockSessionLimiter{} mock := &MockSessionLimiter{}
mock.Mock.Test(t) mock.Mock.Test(t)

View File

@ -3,12 +3,11 @@
package catalog package catalog
import ( import (
limiter "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
mock "github.com/stretchr/testify/mock" mock "github.com/stretchr/testify/mock"
pbresource "github.com/hashicorp/consul/proto-public/pbresource" pbresource "github.com/hashicorp/consul/proto-public/pbresource"
proxycfg "github.com/hashicorp/consul/agent/proxycfg"
) )
// MockWatcher is an autogenerated mock type for the Watcher type // MockWatcher is an autogenerated mock type for the Watcher type
@ -17,21 +16,21 @@ type MockWatcher struct {
} }
// Watch provides a mock function with given fields: proxyID, nodeName, token // Watch provides a mock function with given fields: proxyID, nodeName, token
func (_m *MockWatcher) Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { func (_m *MockWatcher) Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) {
ret := _m.Called(proxyID, nodeName, token) ret := _m.Called(proxyID, nodeName, token)
var r0 <-chan proxycfg.ProxySnapshot var r0 <-chan proxysnapshot.ProxySnapshot
var r1 limiter.SessionTerminatedChan var r1 limiter.SessionTerminatedChan
var r2 proxycfg.CancelFunc var r2 proxysnapshot.CancelFunc
var r3 error var r3 error
if rf, ok := ret.Get(0).(func(*pbresource.ID, string, string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error)); ok { if rf, ok := ret.Get(0).(func(*pbresource.ID, string, string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error)); ok {
return rf(proxyID, nodeName, token) return rf(proxyID, nodeName, token)
} }
if rf, ok := ret.Get(0).(func(*pbresource.ID, string, string) <-chan proxycfg.ProxySnapshot); ok { if rf, ok := ret.Get(0).(func(*pbresource.ID, string, string) <-chan proxysnapshot.ProxySnapshot); ok {
r0 = rf(proxyID, nodeName, token) r0 = rf(proxyID, nodeName, token)
} else { } else {
if ret.Get(0) != nil { if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan proxycfg.ProxySnapshot) r0 = ret.Get(0).(<-chan proxysnapshot.ProxySnapshot)
} }
} }
@ -43,11 +42,11 @@ func (_m *MockWatcher) Watch(proxyID *pbresource.ID, nodeName string, token stri
} }
} }
if rf, ok := ret.Get(2).(func(*pbresource.ID, string, string) proxycfg.CancelFunc); ok { if rf, ok := ret.Get(2).(func(*pbresource.ID, string, string) proxysnapshot.CancelFunc); ok {
r2 = rf(proxyID, nodeName, token) r2 = rf(proxyID, nodeName, token)
} else { } else {
if ret.Get(2) != nil { if ret.Get(2) != nil {
r2 = ret.Get(2).(proxycfg.CancelFunc) r2 = ret.Get(2).(proxysnapshot.CancelFunc)
} }
} }

View File

@ -8,6 +8,7 @@ import (
"github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/proxycfg-sources/catalog" "github.com/hashicorp/consul/agent/proxycfg-sources/catalog"
structs "github.com/hashicorp/consul/agent/structs" structs "github.com/hashicorp/consul/agent/structs"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/proto-public/pbresource"
) )
@ -22,8 +23,8 @@ func NewConfigSource(cfgMgr ConfigManager) *ConfigSource {
return &ConfigSource{cfgMgr} return &ConfigSource{cfgMgr}
} }
func (m *ConfigSource) Watch(proxyID *pbresource.ID, nodeName string, _ string) (<-chan proxycfg.ProxySnapshot, func (m *ConfigSource) Watch(proxyID *pbresource.ID, nodeName string, _ string) (<-chan proxysnapshot.ProxySnapshot,
limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) {
serviceID := structs.NewServiceID(proxyID.Name, catalog.GetEnterpriseMetaFromResourceID(proxyID)) serviceID := structs.NewServiceID(proxyID.Name, catalog.GetEnterpriseMetaFromResourceID(proxyID))
watchCh, cancelWatch := m.manager.Watch(proxycfg.ProxyID{ watchCh, cancelWatch := m.manager.Watch(proxycfg.ProxyID{
ServiceID: serviceID, ServiceID: serviceID,

View File

@ -4,6 +4,7 @@ package local
import ( import (
proxycfg "github.com/hashicorp/consul/agent/proxycfg" proxycfg "github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
mock "github.com/stretchr/testify/mock" mock "github.com/stretchr/testify/mock"
structs "github.com/hashicorp/consul/agent/structs" structs "github.com/hashicorp/consul/agent/structs"
@ -50,27 +51,27 @@ func (_m *MockConfigManager) RegisteredProxies(source proxycfg.ProxySource) []pr
} }
// Watch provides a mock function with given fields: id // Watch provides a mock function with given fields: id
func (_m *MockConfigManager) Watch(id proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc) { func (_m *MockConfigManager) Watch(id proxycfg.ProxyID) (<-chan proxysnapshot.ProxySnapshot, proxysnapshot.CancelFunc) {
ret := _m.Called(id) ret := _m.Called(id)
var r0 <-chan proxycfg.ProxySnapshot var r0 <-chan proxysnapshot.ProxySnapshot
var r1 proxycfg.CancelFunc var r1 proxysnapshot.CancelFunc
if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc)); ok { if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) (<-chan proxysnapshot.ProxySnapshot, proxysnapshot.CancelFunc)); ok {
return rf(id) return rf(id)
} }
if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) <-chan proxycfg.ProxySnapshot); ok { if rf, ok := ret.Get(0).(func(proxycfg.ProxyID) <-chan proxysnapshot.ProxySnapshot); ok {
r0 = rf(id) r0 = rf(id)
} else { } else {
if ret.Get(0) != nil { if ret.Get(0) != nil {
r0 = ret.Get(0).(<-chan proxycfg.ProxySnapshot) r0 = ret.Get(0).(<-chan proxysnapshot.ProxySnapshot)
} }
} }
if rf, ok := ret.Get(1).(func(proxycfg.ProxyID) proxycfg.CancelFunc); ok { if rf, ok := ret.Get(1).(func(proxycfg.ProxyID) proxysnapshot.CancelFunc); ok {
r1 = rf(id) r1 = rf(id)
} else { } else {
if ret.Get(1) != nil { if ret.Get(1) != nil {
r1 = ret.Get(1).(proxycfg.CancelFunc) r1 = ret.Get(1).(proxysnapshot.CancelFunc)
} }
} }

View File

@ -5,6 +5,7 @@ package local
import ( import (
"context" "context"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"time" "time"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
@ -135,7 +136,7 @@ func sync(cfg SyncConfig) {
//go:generate mockery --name ConfigManager --inpackage //go:generate mockery --name ConfigManager --inpackage
type ConfigManager interface { type ConfigManager interface {
Watch(id proxycfg.ProxyID) (<-chan proxycfg.ProxySnapshot, proxycfg.CancelFunc) Watch(id proxycfg.ProxyID) (<-chan proxysnapshot.ProxySnapshot, proxysnapshot.CancelFunc)
Register(proxyID proxycfg.ProxyID, service *structs.NodeService, source proxycfg.ProxySource, token string, overwrite bool) error Register(proxyID proxycfg.ProxyID, service *structs.NodeService, source proxycfg.ProxySource, token string, overwrite bool) error
Deregister(proxyID proxycfg.ProxyID, source proxycfg.ProxySource) Deregister(proxyID proxycfg.ProxyID, source proxycfg.ProxySource)
RegisteredProxies(source proxycfg.ProxySource) []proxycfg.ProxyID RegisteredProxies(source proxycfg.ProxySource) []proxycfg.ProxyID

View File

@ -8,16 +8,6 @@ import (
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
// ProxySnapshot is an abstraction that allows interchangeability between
// Catalog V1 ConfigSnapshot and Catalog V2 ProxyState.
type ProxySnapshot interface {
AllowEmptyListeners() bool
AllowEmptyRoutes() bool
AllowEmptyClusters() bool
Authorize(authz acl.Authorizer) error
LoggerName() string
}
// The below functions are added to ConfigSnapshot to allow it to conform to // The below functions are added to ConfigSnapshot to allow it to conform to
// the ProxySnapshot interface. // the ProxySnapshot interface.
func (s *ConfigSnapshot) AllowEmptyListeners() bool { func (s *ConfigSnapshot) AllowEmptyListeners() bool {

View File

@ -4,10 +4,11 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
) )
func TestConfigSnapshot_AllowEmptyClusters(t *testing.T) { func TestConfigSnapshot_AllowEmptyClusters(t *testing.T) {

View File

@ -5,6 +5,7 @@ package proxycfg
import ( import (
"errors" "errors"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"runtime/debug" "runtime/debug"
"sync" "sync"
@ -36,10 +37,6 @@ type ProxyID struct {
// from overwriting each other's registrations. // from overwriting each other's registrations.
type ProxySource string type ProxySource string
// CancelFunc is a type for a returned function that can be called to cancel a
// watch.
type CancelFunc func()
// Manager provides an API with which proxy services can be registered, and // Manager provides an API with which proxy services can be registered, and
// coordinates the fetching (and refreshing) of intentions, upstreams, discovery // coordinates the fetching (and refreshing) of intentions, upstreams, discovery
// chain, certificates etc. // chain, certificates etc.
@ -55,7 +52,7 @@ type Manager struct {
mu sync.Mutex mu sync.Mutex
proxies map[ProxyID]*state proxies map[ProxyID]*state
watchers map[ProxyID]map[uint64]chan ProxySnapshot watchers map[ProxyID]map[uint64]chan proxysnapshot.ProxySnapshot
maxWatchID uint64 maxWatchID uint64
} }
@ -106,7 +103,7 @@ func NewManager(cfg ManagerConfig) (*Manager, error) {
m := &Manager{ m := &Manager{
ManagerConfig: cfg, ManagerConfig: cfg,
proxies: make(map[ProxyID]*state), proxies: make(map[ProxyID]*state),
watchers: make(map[ProxyID]map[uint64]chan ProxySnapshot), watchers: make(map[ProxyID]map[uint64]chan proxysnapshot.ProxySnapshot),
rateLimiter: rate.NewLimiter(cfg.UpdateRateLimit, 1), rateLimiter: rate.NewLimiter(cfg.UpdateRateLimit, 1),
} }
return m, nil return m, nil
@ -262,7 +259,7 @@ func (m *Manager) notify(snap *ConfigSnapshot) {
// it will drain the chan and then re-attempt delivery so that a slow consumer // it will drain the chan and then re-attempt delivery so that a slow consumer
// gets the latest config earlier. This MUST be called from a method where m.mu // gets the latest config earlier. This MUST be called from a method where m.mu
// is held to be safe since it assumes we are the only goroutine sending on ch. // is held to be safe since it assumes we are the only goroutine sending on ch.
func (m *Manager) deliverLatest(snap *ConfigSnapshot, ch chan ProxySnapshot) { func (m *Manager) deliverLatest(snap *ConfigSnapshot, ch chan proxysnapshot.ProxySnapshot) {
// Send if chan is empty // Send if chan is empty
select { select {
case ch <- snap: case ch <- snap:
@ -299,16 +296,16 @@ OUTER:
// will not fail, but no updates will be delivered until the proxy is // will not fail, but no updates will be delivered until the proxy is
// registered. If there is already a valid snapshot in memory, it will be // registered. If there is already a valid snapshot in memory, it will be
// delivered immediately. // delivered immediately.
func (m *Manager) Watch(id ProxyID) (<-chan ProxySnapshot, CancelFunc) { func (m *Manager) Watch(id ProxyID) (<-chan proxysnapshot.ProxySnapshot, proxysnapshot.CancelFunc) {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
// This buffering is crucial otherwise we'd block immediately trying to // This buffering is crucial otherwise we'd block immediately trying to
// deliver the current snapshot below if we already have one. // deliver the current snapshot below if we already have one.
ch := make(chan ProxySnapshot, 1) ch := make(chan proxysnapshot.ProxySnapshot, 1)
watchers, ok := m.watchers[id] watchers, ok := m.watchers[id]
if !ok { if !ok {
watchers = make(map[uint64]chan ProxySnapshot) watchers = make(map[uint64]chan proxysnapshot.ProxySnapshot)
} }
watchID := m.maxWatchID watchID := m.maxWatchID
m.maxWatchID++ m.maxWatchID++

View File

@ -4,6 +4,7 @@
package proxycfg package proxycfg
import ( import (
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"testing" "testing"
"time" "time"
@ -469,7 +470,7 @@ func testManager_BasicLifecycle(
require.Len(t, m.watchers, 0) require.Len(t, m.watchers, 0)
} }
func assertWatchChanBlocks(t *testing.T, ch <-chan ProxySnapshot) { func assertWatchChanBlocks(t *testing.T, ch <-chan proxysnapshot.ProxySnapshot) {
t.Helper() t.Helper()
select { select {
@ -479,7 +480,7 @@ func assertWatchChanBlocks(t *testing.T, ch <-chan ProxySnapshot) {
} }
} }
func assertWatchChanRecvs(t *testing.T, ch <-chan ProxySnapshot, expect ProxySnapshot) { func assertWatchChanRecvs(t *testing.T, ch <-chan proxysnapshot.ProxySnapshot, expect proxysnapshot.ProxySnapshot) {
t.Helper() t.Helper()
select { select {
@ -517,7 +518,7 @@ func TestManager_deliverLatest(t *testing.T) {
} }
// test 1 buffered chan // test 1 buffered chan
ch1 := make(chan ProxySnapshot, 1) ch1 := make(chan proxysnapshot.ProxySnapshot, 1)
// Sending to an unblocked chan should work // Sending to an unblocked chan should work
m.deliverLatest(snap1, ch1) m.deliverLatest(snap1, ch1)
@ -533,7 +534,7 @@ func TestManager_deliverLatest(t *testing.T) {
require.Equal(t, snap2, <-ch1) require.Equal(t, snap2, <-ch1)
// Same again for 5-buffered chan // Same again for 5-buffered chan
ch5 := make(chan ProxySnapshot, 5) ch5 := make(chan proxysnapshot.ProxySnapshot, 5)
// Sending to an unblocked chan should work // Sending to an unblocked chan should work
m.deliverLatest(snap1, ch5) m.deliverLatest(snap1, ch5)

View File

@ -5,7 +5,9 @@ package agent
import ( import (
"encoding/json" "encoding/json"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/internal/mesh" "github.com/hashicorp/consul/internal/mesh"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
rtest "github.com/hashicorp/consul/internal/resource/resourcetest" rtest "github.com/hashicorp/consul/internal/resource/resourcetest"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
@ -14,8 +16,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
@ -54,7 +54,7 @@ func TestAgent_local_proxycfg(t *testing.T) {
// This is a little gross, but this gives us the layered pair of // This is a little gross, but this gives us the layered pair of
// local/catalog sources for now. // local/catalog sources for now.
cfg := a.xdsServer.CfgSrc cfg := a.xdsServer.ProxyWatcher
var ( var (
timer = time.After(100 * time.Millisecond) timer = time.After(100 * time.Millisecond)
@ -64,9 +64,9 @@ func TestAgent_local_proxycfg(t *testing.T) {
var ( var (
firstTime = true firstTime = true
ch <-chan proxycfg.ProxySnapshot ch <-chan proxysnapshot.ProxySnapshot
stc limiter.SessionTerminatedChan stc limiter.SessionTerminatedChan
cancel proxycfg.CancelFunc cancel proxysnapshot.CancelFunc
) )
defer func() { defer func() {
if cancel != nil { if cancel != nil {

View File

@ -1820,7 +1820,7 @@ func newTestServer(t *testing.T, cb func(conf *consul.Config)) testingServer {
deps := newDefaultDeps(t, conf) deps := newDefaultDeps(t, conf)
externalGRPCServer := external.NewServer(deps.Logger, nil, deps.TLSConfigurator, rate.NullRequestLimitsHandler()) externalGRPCServer := external.NewServer(deps.Logger, nil, deps.TLSConfigurator, rate.NullRequestLimitsHandler())
server, err := consul.NewServer(conf, deps, externalGRPCServer, nil, deps.Logger) server, err := consul.NewServer(conf, deps, externalGRPCServer, nil, deps.Logger, nil)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, server.Shutdown()) require.NoError(t, server.Shutdown())

View File

@ -11,6 +11,8 @@ import (
"github.com/hashicorp/consul/agent/xds/configfetcher" "github.com/hashicorp/consul/agent/xds/configfetcher"
"github.com/hashicorp/consul/agent/xdsv2" "github.com/hashicorp/consul/agent/xdsv2"
"github.com/hashicorp/consul/internal/mesh" "github.com/hashicorp/consul/internal/mesh"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
"github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/proto-public/pbresource"
"strconv" "strconv"
"sync" "sync"
@ -91,7 +93,7 @@ func (s *Server) DeltaAggregatedResources(stream ADSDeltaStream) error {
// Envoy resource generator based on whether it was passed a ConfigSource or // Envoy resource generator based on whether it was passed a ConfigSource or
// ProxyState implementation of the ProxySnapshot interface and returns the // ProxyState implementation of the ProxySnapshot interface and returns the
// generated Envoy configuration. // generated Envoy configuration.
func getEnvoyConfiguration(proxySnapshot proxycfg.ProxySnapshot, logger hclog.Logger, cfgFetcher configfetcher.ConfigFetcher) (map[string][]proto.Message, error) { func getEnvoyConfiguration(proxySnapshot proxysnapshot.ProxySnapshot, logger hclog.Logger, cfgFetcher configfetcher.ConfigFetcher) (map[string][]proto.Message, error) {
switch proxySnapshot.(type) { switch proxySnapshot.(type) {
case *proxycfg.ConfigSnapshot: case *proxycfg.ConfigSnapshot:
logger.Trace("ProxySnapshot update channel received a ProxySnapshot of type ConfigSnapshot", logger.Trace("ProxySnapshot update channel received a ProxySnapshot of type ConfigSnapshot",
@ -106,14 +108,14 @@ func getEnvoyConfiguration(proxySnapshot proxycfg.ProxySnapshot, logger hclog.Lo
c := proxySnapshot.(*proxycfg.ConfigSnapshot) c := proxySnapshot.(*proxycfg.ConfigSnapshot)
logger.Trace("ConfigSnapshot", c) logger.Trace("ConfigSnapshot", c)
return generator.AllResourcesFromSnapshot(c) return generator.AllResourcesFromSnapshot(c)
case *mesh.ProxyState: case *proxytracker.ProxyState:
logger.Trace("ProxySnapshot update channel received a ProxySnapshot of type ProxyState", logger.Trace("ProxySnapshot update channel received a ProxySnapshot of type ProxyState",
"proxySnapshot", proxySnapshot, "proxySnapshot", proxySnapshot,
) )
generator := xdsv2.NewResourceGenerator( generator := xdsv2.NewResourceGenerator(
logger, logger,
) )
c := proxySnapshot.(*mesh.ProxyState) c := proxySnapshot.(*proxytracker.ProxyState)
logger.Trace("ProxyState", c) logger.Trace("ProxyState", c)
return generator.AllResourcesFromIR(c) return generator.AllResourcesFromIR(c)
default: default:
@ -135,9 +137,9 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
// Loop state // Loop state
var ( var (
proxySnapshot proxycfg.ProxySnapshot proxySnapshot proxysnapshot.ProxySnapshot
node *envoy_config_core_v3.Node node *envoy_config_core_v3.Node
stateCh <-chan proxycfg.ProxySnapshot stateCh <-chan proxysnapshot.ProxySnapshot
drainCh limiter.SessionTerminatedChan drainCh limiter.SessionTerminatedChan
watchCancel func() watchCancel func()
nonce uint64 // xDS requires a unique nonce to correlate response/request pairs nonce uint64 // xDS requires a unique nonce to correlate response/request pairs
@ -202,7 +204,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
authTimer = time.After(s.AuthCheckFrequency) authTimer = time.After(s.AuthCheckFrequency)
} }
checkStreamACLs := func(proxySnap proxycfg.ProxySnapshot) error { checkStreamACLs := func(proxySnap proxysnapshot.ProxySnapshot) error {
return s.authorize(stream.Context(), proxySnap) return s.authorize(stream.Context(), proxySnap)
} }
@ -326,7 +328,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err) return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err)
} }
stateCh, drainCh, watchCancel, err = s.CfgSrc.Watch(proxyID, nodeName, options.Token) stateCh, drainCh, watchCancel, err = s.ProxyWatcher.Watch(proxyID, nodeName, options.Token)
switch { switch {
case errors.Is(err, limiter.ErrCapacityReached): case errors.Is(err, limiter.ErrCapacityReached):
return errOverwhelmed return errOverwhelmed
@ -432,14 +434,14 @@ func newResourceIDFromEnvoyNode(node *envoy_config_core_v3.Node) *pbresource.ID
} }
} }
func (s *Server) applyEnvoyExtensions(resources *xdscommon.IndexedResources, proxySnapshot proxycfg.ProxySnapshot, node *envoy_config_core_v3.Node) (*xdscommon.IndexedResources, error) { func (s *Server) applyEnvoyExtensions(resources *xdscommon.IndexedResources, proxySnapshot proxysnapshot.ProxySnapshot, node *envoy_config_core_v3.Node) (*xdscommon.IndexedResources, error) {
// TODO(proxystate) // TODO(proxystate)
// This is a workaround for now as envoy extensions are not yet supported with ProxyState. // This is a workaround for now as envoy extensions are not yet supported with ProxyState.
// For now, we cast to proxycfg.ConfigSnapshot and no-op if it's the pbmesh.ProxyState type. // For now, we cast to proxycfg.ConfigSnapshot and no-op if it's the pbmesh.ProxyState type.
var snapshot *proxycfg.ConfigSnapshot var snapshot *proxycfg.ConfigSnapshot
switch proxySnapshot.(type) { switch proxySnapshot.(type) {
//TODO(proxystate): implement envoy extensions for ProxyState //TODO(proxystate): implement envoy extensions for ProxyState
case *mesh.ProxyState: case *proxytracker.ProxyState:
return resources, nil return resources, nil
case *proxycfg.ConfigSnapshot: case *proxycfg.ConfigSnapshot:
snapshot = proxySnapshot.(*proxycfg.ConfigSnapshot) snapshot = proxySnapshot.(*proxycfg.ConfigSnapshot)

View File

@ -5,11 +5,10 @@ package proxystateconverter
import ( import (
"fmt" "fmt"
"github.com/hashicorp/consul/internal/mesh"
"github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/configfetcher" "github.com/hashicorp/consul/agent/xds/configfetcher"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
@ -19,7 +18,7 @@ import (
type Converter struct { type Converter struct {
Logger hclog.Logger Logger hclog.Logger
CfgFetcher configfetcher.ConfigFetcher CfgFetcher configfetcher.ConfigFetcher
proxyState *mesh.ProxyState proxyState *proxytracker.ProxyState
} }
func NewConverter( func NewConverter(
@ -29,7 +28,7 @@ func NewConverter(
return &Converter{ return &Converter{
Logger: logger, Logger: logger,
CfgFetcher: cfgFetcher, CfgFetcher: cfgFetcher,
proxyState: &mesh.ProxyState{ proxyState: &proxytracker.ProxyState{
ProxyState: &pbmesh.ProxyState{ ProxyState: &pbmesh.ProxyState{
Listeners: make([]*pbproxystate.Listener, 0), Listeners: make([]*pbproxystate.Listener, 0),
Clusters: make(map[string]*pbproxystate.Cluster), Clusters: make(map[string]*pbproxystate.Cluster),
@ -40,7 +39,7 @@ func NewConverter(
} }
} }
func (g *Converter) ProxyStateFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) (*mesh.ProxyState, error) { func (g *Converter) ProxyStateFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) (*proxytracker.ProxyState, error) {
err := g.resourcesFromSnapshot(cfgSnap) err := g.resourcesFromSnapshot(cfgSnap)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to generate FullProxyState: %v", err) return nil, fmt.Errorf("failed to generate FullProxyState: %v", err)

View File

@ -6,6 +6,8 @@ package xds
import ( import (
"context" "context"
"errors" "errors"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/proto-public/pbresource"
"sync/atomic" "sync/atomic"
"time" "time"
@ -24,8 +26,6 @@ import (
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
external "github.com/hashicorp/consul/agent/grpc-external" external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
) )
var ( var (
@ -85,7 +85,7 @@ type ACLResolverFunc func(id string) (acl.Authorizer, error)
// ProxyConfigSource is the interface xds.Server requires to consume proxy // ProxyConfigSource is the interface xds.Server requires to consume proxy
// config updates. // config updates.
type ProxyWatcher interface { type ProxyWatcher interface {
Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxycfg.ProxySnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) Watch(proxyID *pbresource.ID, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot, limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error)
} }
// Server represents a gRPC server that can handle xDS requests from Envoy. All // Server represents a gRPC server that can handle xDS requests from Envoy. All
@ -96,7 +96,7 @@ type ProxyWatcher interface {
type Server struct { type Server struct {
NodeName string NodeName string
Logger hclog.Logger Logger hclog.Logger
CfgSrc ProxyWatcher ProxyWatcher ProxyWatcher
ResolveToken ACLResolverFunc ResolveToken ACLResolverFunc
CfgFetcher configfetcher.ConfigFetcher CfgFetcher configfetcher.ConfigFetcher
@ -147,14 +147,14 @@ func (c *activeStreamCounters) Increment(ctx context.Context) func() {
func NewServer( func NewServer(
nodeName string, nodeName string,
logger hclog.Logger, logger hclog.Logger,
cfgMgr ProxyWatcher, proxyWatcher ProxyWatcher,
resolveTokenSecret ACLResolverFunc, resolveTokenSecret ACLResolverFunc,
cfgFetcher configfetcher.ConfigFetcher, cfgFetcher configfetcher.ConfigFetcher,
) *Server { ) *Server {
return &Server{ return &Server{
NodeName: nodeName, NodeName: nodeName,
Logger: logger, Logger: logger,
CfgSrc: cfgMgr, ProxyWatcher: proxyWatcher,
ResolveToken: resolveTokenSecret, ResolveToken: resolveTokenSecret,
CfgFetcher: cfgFetcher, CfgFetcher: cfgFetcher,
AuthCheckFrequency: DefaultAuthCheckFrequency, AuthCheckFrequency: DefaultAuthCheckFrequency,
@ -203,7 +203,7 @@ func (s *Server) authenticate(ctx context.Context) (acl.Authorizer, error) {
// proxy ID. We assume that any data in the snapshot was already filtered, // proxy ID. We assume that any data in the snapshot was already filtered,
// which allows this authorization to be a shallow authorization check // which allows this authorization to be a shallow authorization check
// for all the data in a ProxySnapshot. // for all the data in a ProxySnapshot.
func (s *Server) authorize(ctx context.Context, proxySnapshot proxycfg.ProxySnapshot) error { func (s *Server) authorize(ctx context.Context, proxySnapshot proxysnapshot.ProxySnapshot) error {
if proxySnapshot == nil { if proxySnapshot == nil {
return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot") return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot")
} }

View File

@ -6,6 +6,7 @@ package xds
import ( import (
"github.com/hashicorp/consul/agent/proxycfg-sources/catalog" "github.com/hashicorp/consul/agent/proxycfg-sources/catalog"
"github.com/hashicorp/consul/agent/xds/response" "github.com/hashicorp/consul/agent/xds/response"
"github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/proto-public/pbresource"
"sort" "sort"
"sync" "sync"
@ -74,14 +75,14 @@ func newTestSnapshot(
// testing. It also implements ConnectAuthz to allow control over authorization. // testing. It also implements ConnectAuthz to allow control over authorization.
type testManager struct { type testManager struct {
sync.Mutex sync.Mutex
stateChans map[structs.ServiceID]chan proxycfg.ProxySnapshot stateChans map[structs.ServiceID]chan proxysnapshot.ProxySnapshot
drainChans map[structs.ServiceID]chan struct{} drainChans map[structs.ServiceID]chan struct{}
cancels chan structs.ServiceID cancels chan structs.ServiceID
} }
func newTestManager(t *testing.T) *testManager { func newTestManager(t *testing.T) *testManager {
return &testManager{ return &testManager{
stateChans: map[structs.ServiceID]chan proxycfg.ProxySnapshot{}, stateChans: map[structs.ServiceID]chan proxysnapshot.ProxySnapshot{},
drainChans: map[structs.ServiceID]chan struct{}{}, drainChans: map[structs.ServiceID]chan struct{}{},
cancels: make(chan structs.ServiceID, 10), cancels: make(chan structs.ServiceID, 10),
} }
@ -91,12 +92,12 @@ func newTestManager(t *testing.T) *testManager {
func (m *testManager) RegisterProxy(t *testing.T, proxyID structs.ServiceID) { func (m *testManager) RegisterProxy(t *testing.T, proxyID structs.ServiceID) {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
m.stateChans[proxyID] = make(chan proxycfg.ProxySnapshot, 1) m.stateChans[proxyID] = make(chan proxysnapshot.ProxySnapshot, 1)
m.drainChans[proxyID] = make(chan struct{}) m.drainChans[proxyID] = make(chan struct{})
} }
// Deliver simulates a proxy registration // Deliver simulates a proxy registration
func (m *testManager) DeliverConfig(t *testing.T, proxyID structs.ServiceID, cfg proxycfg.ProxySnapshot) { func (m *testManager) DeliverConfig(t *testing.T, proxyID structs.ServiceID, cfg proxysnapshot.ProxySnapshot) {
t.Helper() t.Helper()
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
@ -123,8 +124,8 @@ func (m *testManager) DrainStreams(proxyID structs.ServiceID) {
} }
// Watch implements ConfigManager // Watch implements ConfigManager
func (m *testManager) Watch(id *pbresource.ID, _ string, _ string) (<-chan proxycfg.ProxySnapshot, func (m *testManager) Watch(id *pbresource.ID, _ string, _ string) (<-chan proxysnapshot.ProxySnapshot,
limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) {
// Create service ID // Create service ID
proxyID := structs.NewServiceID(id.Name, catalog.GetEnterpriseMetaFromResourceID(id)) proxyID := structs.NewServiceID(id.Name, catalog.GetEnterpriseMetaFromResourceID(id))
m.Lock() m.Lock()

View File

@ -5,8 +5,7 @@ package xdsv2
import ( import (
"fmt" "fmt"
"github.com/hashicorp/consul/internal/mesh" proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
@ -29,11 +28,11 @@ func NewResourceGenerator(
} }
type ProxyResources struct { type ProxyResources struct {
proxyState *mesh.ProxyState proxyState *proxytracker.ProxyState
envoyResources map[string][]proto.Message envoyResources map[string][]proto.Message
} }
func (g *ResourceGenerator) AllResourcesFromIR(proxyState *mesh.ProxyState) (map[string][]proto.Message, error) { func (g *ResourceGenerator) AllResourcesFromIR(proxyState *proxytracker.ProxyState) (map[string][]proto.Message, error) {
pr := &ProxyResources{ pr := &ProxyResources{
proxyState: proxyState, proxyState: proxyState,
envoyResources: make(map[string][]proto.Message), envoyResources: make(map[string][]proto.Message),

View File

@ -13,10 +13,10 @@ import (
type Dependencies struct { type Dependencies struct {
TrustBundleFetcher xds.TrustBundleFetcher TrustBundleFetcher xds.TrustBundleFetcher
ProxyUpdater xds.ProxyUpdater
} }
func Register(mgr *controller.Manager, deps Dependencies) { func Register(mgr *controller.Manager, deps Dependencies) {
mapper := bimapper.New(types.ProxyStateTemplateType, catalog.ServiceEndpointsType) mapper := bimapper.New(types.ProxyStateTemplateType, catalog.ServiceEndpointsType)
// TODO: Pass in a "real" updater once proxy tracker work has completed. mgr.Register(xds.Controller(mapper, deps.ProxyUpdater, deps.TrustBundleFetcher))
mgr.Register(xds.Controller(mapper, nil, deps.TrustBundleFetcher))
} }

View File

@ -5,14 +5,14 @@ package xds
import ( import (
"context" "context"
"github.com/hashicorp/consul/internal/catalog" "github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh/internal/controllers/xds/status" "github.com/hashicorp/consul/internal/mesh/internal/controllers/xds/status"
"github.com/hashicorp/consul/internal/mesh/internal/types" "github.com/hashicorp/consul/internal/mesh/internal/types"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
"github.com/hashicorp/consul/internal/resource" "github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/mappers/bimapper" "github.com/hashicorp/consul/internal/resource/mappers/bimapper"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate"
"github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/proto-public/pbresource"
) )
@ -43,7 +43,7 @@ type TrustBundleFetcher func() (*pbproxystate.TrustBundle, error)
// and also check its connectivity to the server. // and also check its connectivity to the server.
type ProxyUpdater interface { type ProxyUpdater interface {
// PushChange allows pushing a computed ProxyState to xds for xds resource generation to send to a proxy. // PushChange allows pushing a computed ProxyState to xds for xds resource generation to send to a proxy.
PushChange(id *pbresource.ID, snapshot *pbmesh.ProxyState) error PushChange(id *pbresource.ID, snapshot proxysnapshot.ProxySnapshot) error
// ProxyConnectedToServer returns whether this id is connected to this server. // ProxyConnectedToServer returns whether this id is connected to this server.
ProxyConnectedToServer(id *pbresource.ID) bool ProxyConnectedToServer(id *pbresource.ID) bool
@ -156,7 +156,7 @@ func (r *xdsReconciler) Reconcile(ctx context.Context, rt controller.Runtime, re
computedProxyState := proxyStateTemplate.Template.ProxyState computedProxyState := proxyStateTemplate.Template.ProxyState
err = r.updater.PushChange(req.ID, computedProxyState) err = r.updater.PushChange(req.ID, &proxytracker.ProxyState{ProxyState: computedProxyState})
if err != nil { if err != nil {
// Set the status. // Set the status.
statusCondition = status.ConditionRejectedPushChangeFailed(status.KeyFromID(req.ID)) statusCondition = status.ConditionRejectedPushChangeFailed(status.KeyFromID(req.ID))

View File

@ -60,7 +60,7 @@ func (suite *xdsControllerTestSuite) SetupTest() {
suite.fetcher = mockFetcher suite.fetcher = mockFetcher
suite.mapper = bimapper.New(types.ProxyStateTemplateType, catalog.ServiceEndpointsType) suite.mapper = bimapper.New(types.ProxyStateTemplateType, catalog.ServiceEndpointsType)
suite.updater = NewMockUpdater() suite.updater = newMockUpdater()
suite.ctl = &xdsReconciler{ suite.ctl = &xdsReconciler{
bimapper: suite.mapper, bimapper: suite.mapper,

View File

@ -5,9 +5,10 @@ package xds
import ( import (
"fmt" "fmt"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
"sync" "sync"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate" "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate"
"github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/proto-public/pbresource"
@ -19,14 +20,14 @@ type mockUpdater struct {
lock sync.Mutex lock sync.Mutex
// latestPs is a map from a ProxyStateTemplate's id.Name in string form to the last computed ProxyState for that // latestPs is a map from a ProxyStateTemplate's id.Name in string form to the last computed ProxyState for that
// ProxyStateTemplate. // ProxyStateTemplate.
latestPs map[string]*pbmesh.ProxyState latestPs map[string]proxysnapshot.ProxySnapshot
notConnected bool notConnected bool
pushChangeError bool pushChangeError bool
} }
func NewMockUpdater() *mockUpdater { func newMockUpdater() *mockUpdater {
return &mockUpdater{ return &mockUpdater{
latestPs: make(map[string]*pbmesh.ProxyState), latestPs: make(map[string]proxysnapshot.ProxySnapshot),
} }
} }
@ -42,13 +43,13 @@ func (m *mockUpdater) SetProxyAsNotConnected() {
m.notConnected = true m.notConnected = true
} }
func (m *mockUpdater) PushChange(id *pbresource.ID, snapshot *pbmesh.ProxyState) error { func (m *mockUpdater) PushChange(id *pbresource.ID, snapshot proxysnapshot.ProxySnapshot) error {
m.lock.Lock() m.lock.Lock()
defer m.lock.Unlock() defer m.lock.Unlock()
if m.pushChangeError { if m.pushChangeError {
return fmt.Errorf("mock push change error") return fmt.Errorf("mock push change error")
} else { } else {
m.setUnsafe(id.Name, snapshot) m.setUnsafe(id.Name, snapshot.(*proxytracker.ProxyState))
} }
return nil return nil
} }
@ -62,12 +63,12 @@ func (m *mockUpdater) ProxyConnectedToServer(_ *pbresource.ID) bool {
return true return true
} }
func (p *mockUpdater) Get(name string) *pbmesh.ProxyState { func (p *mockUpdater) Get(name string) *proxytracker.ProxyState {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
ps, ok := p.latestPs[name] ps, ok := p.latestPs[name]
if ok { if ok {
return ps return ps.(*proxytracker.ProxyState)
} }
return nil return nil
} }
@ -77,7 +78,7 @@ func (p *mockUpdater) GetEndpoints(name string) map[string]*pbproxystate.Endpoin
defer p.lock.Unlock() defer p.lock.Unlock()
ps, ok := p.latestPs[name] ps, ok := p.latestPs[name]
if ok { if ok {
return ps.Endpoints return ps.(*proxytracker.ProxyState).Endpoints
} }
return nil return nil
} }
@ -87,17 +88,17 @@ func (p *mockUpdater) GetTrustBundle(name string) map[string]*pbproxystate.Trust
defer p.lock.Unlock() defer p.lock.Unlock()
ps, ok := p.latestPs[name] ps, ok := p.latestPs[name]
if ok { if ok {
return ps.TrustBundles return ps.(*proxytracker.ProxyState).TrustBundles
} }
return nil return nil
} }
func (p *mockUpdater) Set(name string, ps *pbmesh.ProxyState) { func (p *mockUpdater) Set(name string, ps *proxytracker.ProxyState) {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
p.setUnsafe(name, ps) p.setUnsafe(name, ps)
} }
func (p *mockUpdater) setUnsafe(name string, ps *pbmesh.ProxyState) { func (p *mockUpdater) setUnsafe(name string, ps *proxytracker.ProxyState) {
p.latestPs[name] = ps p.latestPs[name] = ps
} }

View File

@ -0,0 +1,17 @@
package proxysnapshot
import "github.com/hashicorp/consul/acl"
// ProxySnapshot is an abstraction that allows interchangeability between
// Catalog V1 ConfigSnapshot and Catalog V2 ProxyState.
type ProxySnapshot interface {
AllowEmptyListeners() bool
AllowEmptyRoutes() bool
AllowEmptyClusters() bool
Authorize(authz acl.Authorizer) error
LoggerName() string
}
// CancelFunc is a type for a returned function that can be called to cancel a
// watch.
type CancelFunc func()

View File

@ -3,7 +3,7 @@
package proxytracker package proxytracker
import ( import (
limiter "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/grpc-external/limiter"
mock "github.com/stretchr/testify/mock" mock "github.com/stretchr/testify/mock"
) )

View File

@ -1,4 +1,4 @@
package mesh package proxytracker
import ( import (
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"

View File

@ -6,15 +6,15 @@ package proxytracker
import ( import (
"errors" "errors"
"fmt" "fmt"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/internal/mesh/internal/types"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"sync" "sync"
"github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh"
"github.com/hashicorp/consul/internal/resource" "github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/proto-public/pbresource"
) )
@ -35,9 +35,9 @@ func (e *ProxyConnection) Key() string {
// when the ProxyState for that proxyID has changed. // when the ProxyState for that proxyID has changed.
type proxyWatchData struct { type proxyWatchData struct {
// notifyCh is the channel that the watcher receives updates from ProxyTracker. // notifyCh is the channel that the watcher receives updates from ProxyTracker.
notifyCh chan proxycfg.ProxySnapshot notifyCh chan proxysnapshot.ProxySnapshot
// state is the current/last updated ProxyState for a given proxy. // state is the current/last updated ProxyState for a given proxy.
state *mesh.ProxyState state proxysnapshot.ProxySnapshot
// token is the ACL token provided by the watcher. // token is the ACL token provided by the watcher.
token string token string
// nodeName is the node where the given proxy resides. // nodeName is the node where the given proxy resides.
@ -87,8 +87,8 @@ func NewProxyTracker(cfg ProxyTrackerConfig) *ProxyTracker {
// Watch connects a proxy with ProxyTracker and returns the consumer a channel to receive updates, // Watch connects a proxy with ProxyTracker and returns the consumer a channel to receive updates,
// a channel to notify of xDS terminated session, and a cancel function to cancel the watch. // a channel to notify of xDS terminated session, and a cancel function to cancel the watch.
func (pt *ProxyTracker) Watch(proxyID *pbresource.ID, func (pt *ProxyTracker) Watch(proxyID *pbresource.ID,
nodeName string, token string) (<-chan proxycfg.ProxySnapshot, nodeName string, token string) (<-chan proxysnapshot.ProxySnapshot,
limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { limiter.SessionTerminatedChan, proxysnapshot.CancelFunc, error) {
pt.config.Logger.Trace("watch initiated", "proxyID", proxyID, "nodeName", nodeName) pt.config.Logger.Trace("watch initiated", "proxyID", proxyID, "nodeName", nodeName)
if err := pt.validateWatchArgs(proxyID, nodeName); err != nil { if err := pt.validateWatchArgs(proxyID, nodeName); err != nil {
pt.config.Logger.Error("args failed validation", err) pt.config.Logger.Error("args failed validation", err)
@ -106,7 +106,7 @@ func (pt *ProxyTracker) Watch(proxyID *pbresource.ID,
// This buffering is crucial otherwise we'd block immediately trying to // This buffering is crucial otherwise we'd block immediately trying to
// deliver the current snapshot below if we already have one. // deliver the current snapshot below if we already have one.
proxyStateChan := make(chan proxycfg.ProxySnapshot, 1) proxyStateChan := make(chan proxysnapshot.ProxySnapshot, 1)
watchData := &proxyWatchData{ watchData := &proxyWatchData{
notifyCh: proxyStateChan, notifyCh: proxyStateChan,
state: nil, state: nil,
@ -166,7 +166,7 @@ func (pt *ProxyTracker) notifyNewProxyChannel(proxyID *pbresource.ID) error {
// - ends the session with xDS session limiter. // - ends the session with xDS session limiter.
// - closes the proxy state channel assigned to the proxy. // - closes the proxy state channel assigned to the proxy.
// This function assumes the state lock is already held. // This function assumes the state lock is already held.
func (pt *ProxyTracker) cancelWatchLocked(proxyReferenceKey resource.ReferenceKey, proxyStateChan chan proxycfg.ProxySnapshot, session limiter.Session) { func (pt *ProxyTracker) cancelWatchLocked(proxyReferenceKey resource.ReferenceKey, proxyStateChan chan proxysnapshot.ProxySnapshot, session limiter.Session) {
delete(pt.proxies, proxyReferenceKey) delete(pt.proxies, proxyReferenceKey)
session.End() session.End()
close(proxyStateChan) close(proxyStateChan)
@ -179,8 +179,8 @@ func (pt *ProxyTracker) validateWatchArgs(proxyID *pbresource.ID,
nodeName string) error { nodeName string) error {
if proxyID == nil { if proxyID == nil {
return errors.New("proxyID is required") return errors.New("proxyID is required")
} else if proxyID.GetType().GetKind() != mesh.ProxyStateTemplateConfigurationType.Kind { } else if proxyID.GetType().GetKind() != types.ProxyStateTemplateType.Kind {
return fmt.Errorf("proxyID must be a %s", mesh.ProxyStateTemplateConfigurationType.GetKind()) return fmt.Errorf("proxyID must be a %s", types.ProxyStateTemplateType.GetKind())
} else if nodeName == "" { } else if nodeName == "" {
return errors.New("nodeName is required") return errors.New("nodeName is required")
} }
@ -189,7 +189,7 @@ func (pt *ProxyTracker) validateWatchArgs(proxyID *pbresource.ID,
} }
// PushChange allows pushing a computed ProxyState to xds for xds resource generation to send to a proxy. // PushChange allows pushing a computed ProxyState to xds for xds resource generation to send to a proxy.
func (pt *ProxyTracker) PushChange(proxyID *pbresource.ID, proxyState *mesh.ProxyState) error { func (pt *ProxyTracker) PushChange(proxyID *pbresource.ID, proxyState proxysnapshot.ProxySnapshot) error {
pt.config.Logger.Trace("push change called for proxy", "proxyID", proxyID) pt.config.Logger.Trace("push change called for proxy", "proxyID", proxyID)
proxyReferenceKey := resource.NewReferenceKey(proxyID) proxyReferenceKey := resource.NewReferenceKey(proxyID)
pt.mu.Lock() pt.mu.Lock()
@ -205,7 +205,7 @@ func (pt *ProxyTracker) PushChange(proxyID *pbresource.ID, proxyState *mesh.Prox
return nil return nil
} }
func (pt *ProxyTracker) deliverLatest(proxyID *pbresource.ID, proxyState *mesh.ProxyState, ch chan proxycfg.ProxySnapshot) { func (pt *ProxyTracker) deliverLatest(proxyID *pbresource.ID, proxyState proxysnapshot.ProxySnapshot, ch chan proxysnapshot.ProxySnapshot) {
pt.config.Logger.Trace("delivering latest proxy snapshot to proxy", "proxyID", proxyID) pt.config.Logger.Trace("delivering latest proxy snapshot to proxy", "proxyID", proxyID)
// Send if chan is empty // Send if chan is empty
select { select {

View File

@ -8,7 +8,7 @@ import (
"fmt" "fmt"
"github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/controller"
"github.com/hashicorp/consul/internal/mesh" "github.com/hashicorp/consul/internal/mesh/internal/types"
"github.com/hashicorp/consul/internal/resource" "github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/resourcetest" "github.com/hashicorp/consul/internal/resource/resourcetest"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1" pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1"
@ -20,7 +20,7 @@ import (
) )
func TestProxyTracker_Watch(t *testing.T) { func TestProxyTracker_Watch(t *testing.T) {
resourceID := resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, "test").ID() resourceID := resourcetest.Resource(types.ProxyStateTemplateType, "test").ID()
proxyReferenceKey := resource.NewReferenceKey(resourceID) proxyReferenceKey := resource.NewReferenceKey(resourceID)
lim := NewMockSessionLimiter(t) lim := NewMockSessionLimiter(t)
session1 := newMockSession(t) session1 := newMockSession(t)
@ -70,7 +70,7 @@ func TestProxyTracker_Watch(t *testing.T) {
} }
func TestProxyTracker_Watch_ErrorConsumerNotReady(t *testing.T) { func TestProxyTracker_Watch_ErrorConsumerNotReady(t *testing.T) {
resourceID := resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, "test").ID() resourceID := resourcetest.Resource(types.ProxyStateTemplateType, "test").ID()
proxyReferenceKey := resource.NewReferenceKey(resourceID) proxyReferenceKey := resource.NewReferenceKey(resourceID)
lim := NewMockSessionLimiter(t) lim := NewMockSessionLimiter(t)
session1 := newMockSession(t) session1 := newMockSession(t)
@ -85,7 +85,7 @@ func TestProxyTracker_Watch_ErrorConsumerNotReady(t *testing.T) {
//fill up buffered channel while the consumer is not ready to simulate the error //fill up buffered channel while the consumer is not ready to simulate the error
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {
event := controller.Event{Obj: &ProxyConnection{ProxyID: resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, fmt.Sprintf("test%d", i)).ID()}} event := controller.Event{Obj: &ProxyConnection{ProxyID: resourcetest.Resource(types.ProxyStateTemplateType, fmt.Sprintf("test%d", i)).ID()}}
pt.newProxyConnectionCh <- event pt.newProxyConnectionCh <- event
} }
@ -121,14 +121,14 @@ func TestProxyTracker_Watch_ArgValidationErrors(t *testing.T) {
}, },
{ {
description: "Empty nodeName", description: "Empty nodeName",
proxyID: resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, "test").ID(), proxyID: resourcetest.Resource(types.ProxyStateTemplateType, "test").ID(),
nodeName: "", nodeName: "",
token: "something", token: "something",
expectedError: errors.New("nodeName is required"), expectedError: errors.New("nodeName is required"),
}, },
{ {
description: "resource is not ProxyStateTemplate", description: "resource is not ProxyStateTemplate",
proxyID: resourcetest.Resource(mesh.ProxyConfigurationType, "test").ID(), proxyID: resourcetest.Resource(types.ProxyConfigurationType, "test").ID(),
nodeName: "something", nodeName: "something",
token: "something else", token: "something else",
expectedError: errors.New("proxyID must be a ProxyStateTemplate"), expectedError: errors.New("proxyID must be a ProxyStateTemplate"),
@ -155,7 +155,7 @@ func TestProxyTracker_Watch_ArgValidationErrors(t *testing.T) {
} }
func TestProxyTracker_Watch_SessionLimiterError(t *testing.T) { func TestProxyTracker_Watch_SessionLimiterError(t *testing.T) {
resourceID := resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, "test").ID() resourceID := resourcetest.Resource(types.ProxyStateTemplateType, "test").ID()
lim := NewMockSessionLimiter(t) lim := NewMockSessionLimiter(t)
lim.On("BeginSession").Return(nil, errors.New("kaboom")) lim.On("BeginSession").Return(nil, errors.New("kaboom"))
logger := testutil.Logger(t) logger := testutil.Logger(t)
@ -174,7 +174,7 @@ func TestProxyTracker_Watch_SessionLimiterError(t *testing.T) {
} }
func TestProxyTracker_PushChange(t *testing.T) { func TestProxyTracker_PushChange(t *testing.T) {
resourceID := resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, "test").ID() resourceID := resourcetest.Resource(types.ProxyStateTemplateType, "test").ID()
proxyReferenceKey := resource.NewReferenceKey(resourceID) proxyReferenceKey := resource.NewReferenceKey(resourceID)
lim := NewMockSessionLimiter(t) lim := NewMockSessionLimiter(t)
session1 := newMockSession(t) session1 := newMockSession(t)
@ -193,7 +193,7 @@ func TestProxyTracker_PushChange(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// PushChange // PushChange
proxyState := &mesh.ProxyState{ProxyState: &pbmesh.ProxyState{ proxyState := &ProxyState{ProxyState: &pbmesh.ProxyState{
IntentionDefaultAllow: true, IntentionDefaultAllow: true,
}} }}
@ -216,7 +216,7 @@ func TestProxyTracker_PushChange(t *testing.T) {
} }
func TestProxyTracker_PushChanges_ErrorProxyNotConnected(t *testing.T) { func TestProxyTracker_PushChanges_ErrorProxyNotConnected(t *testing.T) {
resourceID := resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, "test").ID() resourceID := resourcetest.Resource(types.ProxyStateTemplateType, "test").ID()
lim := NewMockSessionLimiter(t) lim := NewMockSessionLimiter(t)
logger := testutil.Logger(t) logger := testutil.Logger(t)
@ -226,7 +226,7 @@ func TestProxyTracker_PushChanges_ErrorProxyNotConnected(t *testing.T) {
}) })
// PushChange // PushChange
proxyState := &mesh.ProxyState{ProxyState: &pbmesh.ProxyState{ proxyState := &ProxyState{ProxyState: &pbmesh.ProxyState{
IntentionDefaultAllow: true, IntentionDefaultAllow: true,
}} }}
@ -273,14 +273,14 @@ func TestProxyTracker_ProxyConnectedToServer(t *testing.T) {
Logger: logger, Logger: logger,
SessionLimiter: lim, SessionLimiter: lim,
}) })
resourceID := resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, "test").ID() resourceID := resourcetest.Resource(types.ProxyStateTemplateType, "test").ID()
tc.preProcessingFunc(pt, resourceID, lim, session1, session1TermCh) tc.preProcessingFunc(pt, resourceID, lim, session1, session1TermCh)
require.Equal(t, tc.shouldExist, pt.ProxyConnectedToServer(resourceID)) require.Equal(t, tc.shouldExist, pt.ProxyConnectedToServer(resourceID))
} }
} }
func TestProxyTracker_Shutdown(t *testing.T) { func TestProxyTracker_Shutdown(t *testing.T) {
resourceID := resourcetest.Resource(mesh.ProxyStateTemplateConfigurationType, "test").ID() resourceID := resourcetest.Resource(types.ProxyStateTemplateType, "test").ID()
proxyReferenceKey := resource.NewReferenceKey(resourceID) proxyReferenceKey := resource.NewReferenceKey(resourceID)
lim := NewMockSessionLimiter(t) lim := NewMockSessionLimiter(t)
session1 := newMockSession(t) session1 := newMockSession(t)