proxycfg: Use streaming in connect state

This commit is contained in:
Daniel Nephin 2020-11-10 18:57:35 -05:00
parent 1a764553c0
commit 906834ce8e
5 changed files with 38 additions and 16 deletions

View File

@ -536,6 +536,10 @@ func (a *Agent) Start(ctx context.Context) error {
} }
// Start the proxy config manager. // Start the proxy config manager.
cacheName := cachetype.HealthServicesName
if a.config.UseStreamingBackend {
cacheName = cachetype.StreamingHealthServicesName
}
a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{ a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{
Cache: a.cache, Cache: a.cache,
Logger: a.logger.Named(logging.ProxyConfig), Logger: a.logger.Named(logging.ProxyConfig),
@ -551,6 +555,7 @@ func (a *Agent) Start(ctx context.Context) error {
}, },
TLSConfigurator: a.tlsConfigurator, TLSConfigurator: a.tlsConfigurator,
IntentionDefaultAllow: intentionDefaultAllow, IntentionDefaultAllow: intentionDefaultAllow,
ServiceHealthCacheName: cacheName,
}) })
if err != nil { if err != nil {
return err return err

View File

@ -4,11 +4,12 @@ import (
"errors" "errors"
"sync" "sync"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-hclog"
) )
var ( var (
@ -71,6 +72,9 @@ type ManagerConfig struct {
Logger hclog.Logger Logger hclog.Logger
TLSConfigurator *tlsutil.Configurator TLSConfigurator *tlsutil.Configurator
// TODO: replace this field with a type that exposes Notify
ServiceHealthCacheName string
// IntentionDefaultAllow is set by the agent so that we can pass this // IntentionDefaultAllow is set by the agent so that we can pass this
// information to proxies that need to make intention decisions on their // information to proxies that need to make intention decisions on their
// own. // own.
@ -187,7 +191,7 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, token string
} }
var err error var err error
state, err = newState(ns, token) state, err = newState(ns, token, m.ManagerConfig.ServiceHealthCacheName)
if err != nil { if err != nil {
return err return err
} }

View File

@ -342,7 +342,13 @@ func testManager_BasicLifecycle(
state.TriggerSyncChanges = func() {} state.TriggerSyncChanges = func() {}
// Create manager // Create manager
m, err := NewManager(ManagerConfig{c, state, source, DNSConfig{}, logger, nil, false}) m, err := NewManager(ManagerConfig{
Cache: c,
State: state,
Source: source,
Logger: logger,
ServiceHealthCacheName: cachetype.HealthServicesName,
})
require.NoError(err) require.NoError(err)
// And run it // And run it

View File

@ -9,13 +9,14 @@ import (
"strings" "strings"
"time" "time"
"github.com/hashicorp/go-hclog"
"github.com/mitchellh/copystructure"
"github.com/mitchellh/mapstructure"
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types" cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/logging"
"github.com/hashicorp/go-hclog"
"github.com/mitchellh/copystructure"
"github.com/mitchellh/mapstructure"
) )
type CacheNotifier interface { type CacheNotifier interface {
@ -72,6 +73,9 @@ type state struct {
proxyCfg structs.ConnectProxyConfig proxyCfg structs.ConnectProxyConfig
token string token string
// TODO: replace this field with a type that exposes Notify
serviceHealthCacheName string
ch chan cache.UpdateEvent ch chan cache.UpdateEvent
snapCh chan ConfigSnapshot snapCh chan ConfigSnapshot
reqCh chan chan *ConfigSnapshot reqCh chan chan *ConfigSnapshot
@ -120,7 +124,7 @@ func copyProxyConfig(ns *structs.NodeService) (structs.ConnectProxyConfig, error
// //
// The returned state needs its required dependencies to be set before Watch // The returned state needs its required dependencies to be set before Watch
// can be called. // can be called.
func newState(ns *structs.NodeService, token string) (*state, error) { func newState(ns *structs.NodeService, token string, serviceHealthCacheName string) (*state, error) {
switch ns.Kind { switch ns.Kind {
case structs.ServiceKindConnectProxy: case structs.ServiceKindConnectProxy:
case structs.ServiceKindTerminatingGateway: case structs.ServiceKindTerminatingGateway:
@ -155,6 +159,8 @@ func newState(ns *structs.NodeService, token string) (*state, error) {
taggedAddresses: taggedAddresses, taggedAddresses: taggedAddresses,
proxyCfg: proxyCfg, proxyCfg: proxyCfg,
token: token, token: token,
serviceHealthCacheName: serviceHealthCacheName,
// 10 is fairly arbitrary here but allow for the 3 mandatory and a // 10 is fairly arbitrary here but allow for the 3 mandatory and a
// reasonable number of upstream watches to all deliver their initial // reasonable number of upstream watches to all deliver their initial
// messages in parallel without blocking the cache.Notify loops. It's not a // messages in parallel without blocking the cache.Notify loops. It's not a
@ -225,7 +231,7 @@ func (s *state) watchConnectProxyService(ctx context.Context, correlationId stri
var finalMeta structs.EnterpriseMeta var finalMeta structs.EnterpriseMeta
finalMeta.Merge(entMeta) finalMeta.Merge(entMeta)
return s.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{ return s.cache.Notify(ctx, s.serviceHealthCacheName, &structs.ServiceSpecificRequest{
Datacenter: dc, Datacenter: dc,
QueryOptions: structs.QueryOptions{ QueryOptions: structs.QueryOptions{
Token: s.token, Token: s.token,
@ -443,7 +449,7 @@ func (s *state) initWatchesMeshGateway() error {
return err return err
} }
err = s.cache.Notify(s.ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{ err = s.cache.Notify(s.ctx, s.serviceHealthCacheName, &structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter, Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token}, QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: structs.ConsulServiceName, ServiceName: structs.ConsulServiceName,
@ -969,7 +975,7 @@ func (s *state) handleUpdateTerminatingGateway(u cache.UpdateEvent, snap *Config
// Watch the health endpoint to discover endpoints for the service // Watch the health endpoint to discover endpoints for the service
if _, ok := snap.TerminatingGateway.WatchedServices[svc.Service]; !ok { if _, ok := snap.TerminatingGateway.WatchedServices[svc.Service]; !ok {
ctx, cancel := context.WithCancel(s.ctx) ctx, cancel := context.WithCancel(s.ctx)
err := s.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{ err := s.cache.Notify(ctx, s.serviceHealthCacheName, &structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter, Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token}, QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: svc.Service.Name, ServiceName: svc.Service.Name,
@ -1267,7 +1273,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
if _, ok := snap.MeshGateway.WatchedServices[svc]; !ok { if _, ok := snap.MeshGateway.WatchedServices[svc]; !ok {
ctx, cancel := context.WithCancel(s.ctx) ctx, cancel := context.WithCancel(s.ctx)
err := s.cache.Notify(ctx, cachetype.HealthServicesName, &structs.ServiceSpecificRequest{ err := s.cache.Notify(ctx, s.serviceHealthCacheName, &structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter, Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token}, QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: svc.Name, ServiceName: svc.Name,

View File

@ -6,12 +6,13 @@ import (
"sync" "sync"
"testing" "testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types" cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/require"
) )
func TestStateChanged(t *testing.T) { func TestStateChanged(t *testing.T) {
@ -111,7 +112,7 @@ func TestStateChanged(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
require := require.New(t) require := require.New(t)
state, err := newState(tt.ns, tt.token) state, err := newState(tt.ns, tt.token, cachetype.HealthServicesName)
require.NoError(err) require.NoError(err)
otherNS, otherToken := tt.mutate(*tt.ns, tt.token) otherNS, otherToken := tt.mutate(*tt.ns, tt.token)
require.Equal(tt.want, state.Changed(otherNS, otherToken)) require.Equal(tt.want, state.Changed(otherNS, otherToken))
@ -1509,7 +1510,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
for name, tc := range cases { for name, tc := range cases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
state, err := newState(&tc.ns, "") state, err := newState(&tc.ns, "", cachetype.HealthServicesName)
// verify building the initial state worked // verify building the initial state worked
require.NoError(t, err) require.NoError(t, err)