proxycfg: introduce explicit UpstreamID in lieu of bare string (#12125)

The gist here is that now we use a value-type struct proxycfg.UpstreamID
as the map key in ConfigSnapshot maps where we used to use "upstream
id-ish" strings. These are internal only and used just for bidirectional
trips through the agent cache keyspace (like the discovery chain target
struct).

For the few places where the upstream id needs to be projected into xDS,
that's what (proxycfg.UpstreamID).EnvoyID() is for. This lets us ALWAYS
inject the partition and namespace into these things without making
stuff like the golden testdata diverge.
This commit is contained in:
R.B. Boyer 2022-01-20 10:12:04 -06:00 committed by GitHub
parent ca3aca92c4
commit 424f3cdd2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 843 additions and 436 deletions

View File

@ -18,16 +18,16 @@ type handlerConnectProxy struct {
// state.
func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, error) {
snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig)
snap.ConnectProxy.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain)
snap.ConnectProxy.WatchedDiscoveryChains = make(map[string]context.CancelFunc)
snap.ConnectProxy.WatchedUpstreams = make(map[string]map[string]context.CancelFunc)
snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedGateways = make(map[string]map[string]context.CancelFunc)
snap.ConnectProxy.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
snap.ConnectProxy.DiscoveryChain = make(map[UpstreamID]*structs.CompiledDiscoveryChain)
snap.ConnectProxy.WatchedDiscoveryChains = make(map[UpstreamID]context.CancelFunc)
snap.ConnectProxy.WatchedUpstreams = make(map[UpstreamID]map[string]context.CancelFunc)
snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedGateways = make(map[UpstreamID]map[string]context.CancelFunc)
snap.ConnectProxy.WatchedGatewayEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType)
snap.ConnectProxy.PreparedQueryEndpoints = make(map[string]structs.CheckServiceNodes)
snap.ConnectProxy.UpstreamConfig = make(map[string]*structs.Upstream)
snap.ConnectProxy.PassthroughUpstreams = make(map[string]ServicePassthroughAddrs)
snap.ConnectProxy.PreparedQueryEndpoints = make(map[UpstreamID]structs.CheckServiceNodes)
snap.ConnectProxy.UpstreamConfig = make(map[UpstreamID]*structs.Upstream)
snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]ServicePassthroughAddrs)
// Watch for root changes
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
@ -106,9 +106,11 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
for i := range s.proxyCfg.Upstreams {
u := s.proxyCfg.Upstreams[i]
uid := NewUpstreamID(&u)
// Store defaults keyed under wildcard so they can be applied to centrally configured upstreams
if u.DestinationName == structs.WildcardSpecifier {
snap.ConnectProxy.UpstreamConfig[u.DestinationID().String()] = &u
snap.ConnectProxy.UpstreamConfig[uid] = &u
continue
}
@ -117,7 +119,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
if u.CentrallyConfigured {
continue
}
snap.ConnectProxy.UpstreamConfig[u.Identifier()] = &u
snap.ConnectProxy.UpstreamConfig[uid] = &u
dc := s.source.Datacenter
if u.Datacenter != "" {
@ -144,7 +146,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
// the plain discovery chain if there is an error so it's safe to
// continue.
s.logger.Warn("failed to parse upstream config",
"upstream", u.Identifier(),
"upstream", uid.String(),
"error", err,
)
}
@ -157,7 +159,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
QueryIDOrName: u.DestinationName,
Connect: true,
Source: *s.source,
}, "upstream:"+u.Identifier(), s.ch)
}, "upstream:"+uid.String(), s.ch)
if err != nil {
return snap, err
}
@ -176,9 +178,9 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
OverrideMeshGateway: s.proxyCfg.MeshGateway.OverlayWith(u.MeshGateway),
OverrideProtocol: cfg.Protocol,
OverrideConnectTimeout: cfg.ConnectTimeout(),
}, "discovery-chain:"+u.Identifier(), s.ch)
}, "discovery-chain:"+uid.String(), s.ch)
if err != nil {
return snap, fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
return snap, fmt.Errorf("failed to watch discovery chain for %s: %v", uid.String(), err)
}
default:
@ -220,12 +222,14 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv
return fmt.Errorf("invalid type for response %T", u.Result)
}
seenServices := make(map[string]struct{})
seenUpstreams := make(map[UpstreamID]struct{})
for _, svc := range resp.Services {
seenServices[svc.String()] = struct{}{}
uid := NewUpstreamIDFromServiceName(svc)
seenUpstreams[uid] = struct{}{}
cfgMap := make(map[string]interface{})
u, ok := snap.ConnectProxy.UpstreamConfig[svc.String()]
u, ok := snap.ConnectProxy.UpstreamConfig[uid]
if ok {
cfgMap = u.Config
} else {
@ -233,11 +237,12 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv
// This is only relevant to upstreams from intentions because for explicit upstreams the defaulting is handled
// by the ResolveServiceConfig endpoint.
wildcardSID := structs.NewServiceID(structs.WildcardSpecifier, s.proxyID.WithWildcardNamespace())
defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardSID.String()]
wildcardUID := NewUpstreamIDFromServiceID(wildcardSID)
defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardUID]
if ok {
u = defaults
cfgMap = defaults.Config
snap.ConnectProxy.UpstreamConfig[svc.String()] = defaults
snap.ConnectProxy.UpstreamConfig[uid] = defaults
}
}
@ -247,7 +252,7 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv
// the plain discovery chain if there is an error so it's safe to
// continue.
s.logger.Warn("failed to parse upstream config",
"upstream", u.Identifier(),
"upstream", uid,
"error", err,
)
}
@ -257,7 +262,7 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv
meshGateway = meshGateway.OverlayWith(u.MeshGateway)
}
watchOpts := discoveryChainWatchOpts{
id: svc.String(),
id: NewUpstreamIDFromServiceName(svc),
name: svc.Name,
namespace: svc.NamespaceOrDefault(),
partition: svc.PartitionOrDefault(),
@ -268,69 +273,69 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv
up := &handlerUpstreams{handlerState: s.handlerState}
err = up.watchDiscoveryChain(ctx, snap, watchOpts)
if err != nil {
return fmt.Errorf("failed to watch discovery chain for %s: %v", svc.String(), err)
return fmt.Errorf("failed to watch discovery chain for %s: %v", uid, err)
}
}
snap.ConnectProxy.IntentionUpstreams = seenServices
snap.ConnectProxy.IntentionUpstreams = seenUpstreams
// Clean up data from services that were not in the update
for sn, targets := range snap.ConnectProxy.WatchedUpstreams {
if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
for uid, targets := range snap.ConnectProxy.WatchedUpstreams {
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
if _, ok := seenUpstreams[uid]; !ok {
for _, cancelFn := range targets {
cancelFn()
}
delete(snap.ConnectProxy.WatchedUpstreams, sn)
delete(snap.ConnectProxy.WatchedUpstreams, uid)
}
}
for sn := range snap.ConnectProxy.WatchedUpstreamEndpoints {
if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
for uid := range snap.ConnectProxy.WatchedUpstreamEndpoints {
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedUpstreamEndpoints, sn)
if _, ok := seenUpstreams[uid]; !ok {
delete(snap.ConnectProxy.WatchedUpstreamEndpoints, uid)
}
}
for sn, cancelMap := range snap.ConnectProxy.WatchedGateways {
if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
for uid, cancelMap := range snap.ConnectProxy.WatchedGateways {
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
if _, ok := seenUpstreams[uid]; !ok {
for _, cancelFn := range cancelMap {
cancelFn()
}
delete(snap.ConnectProxy.WatchedGateways, sn)
delete(snap.ConnectProxy.WatchedGateways, uid)
}
}
for sn := range snap.ConnectProxy.WatchedGatewayEndpoints {
if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
for uid := range snap.ConnectProxy.WatchedGatewayEndpoints {
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedGatewayEndpoints, sn)
if _, ok := seenUpstreams[uid]; !ok {
delete(snap.ConnectProxy.WatchedGatewayEndpoints, uid)
}
}
for sn, cancelFn := range snap.ConnectProxy.WatchedDiscoveryChains {
if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
for uid, cancelFn := range snap.ConnectProxy.WatchedDiscoveryChains {
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
if _, ok := seenUpstreams[uid]; !ok {
cancelFn()
delete(snap.ConnectProxy.WatchedDiscoveryChains, sn)
delete(snap.ConnectProxy.WatchedDiscoveryChains, uid)
}
}
// These entries are intentionally handled separately from the WatchedDiscoveryChains above.
// There have been situations where a discovery watch was cancelled, then fired.
// That update event then re-populated the DiscoveryChain map entry, which wouldn't get cleaned up
// since there was no known watch for it.
for sn := range snap.ConnectProxy.DiscoveryChain {
if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
for uid := range snap.ConnectProxy.DiscoveryChain {
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
continue
}
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.DiscoveryChain, sn)
if _, ok := seenUpstreams[uid]; !ok {
delete(snap.ConnectProxy.DiscoveryChain, uid)
}
}
@ -340,7 +345,8 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv
return fmt.Errorf("invalid type for response: %T", u.Result)
}
pq := strings.TrimPrefix(u.CorrelationID, "upstream:")
snap.ConnectProxy.PreparedQueryEndpoints[pq] = resp.Nodes
uid := UpstreamIDFromString(pq)
snap.ConnectProxy.PreparedQueryEndpoints[uid] = resp.Nodes
case strings.HasPrefix(u.CorrelationID, svcChecksWatchIDPrefix):
resp, ok := u.Result.([]structs.CheckType)

View File

@ -48,12 +48,12 @@ func (s *handlerIngressGateway) initialize(ctx context.Context) (ConfigSnapshot,
return snap, err
}
snap.IngressGateway.WatchedDiscoveryChains = make(map[string]context.CancelFunc)
snap.IngressGateway.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain)
snap.IngressGateway.WatchedUpstreams = make(map[string]map[string]context.CancelFunc)
snap.IngressGateway.WatchedUpstreamEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
snap.IngressGateway.WatchedGateways = make(map[string]map[string]context.CancelFunc)
snap.IngressGateway.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
snap.IngressGateway.WatchedDiscoveryChains = make(map[UpstreamID]context.CancelFunc)
snap.IngressGateway.DiscoveryChain = make(map[UpstreamID]*structs.CompiledDiscoveryChain)
snap.IngressGateway.WatchedUpstreams = make(map[UpstreamID]map[string]context.CancelFunc)
snap.IngressGateway.WatchedUpstreamEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes)
snap.IngressGateway.WatchedGateways = make(map[UpstreamID]map[string]context.CancelFunc)
snap.IngressGateway.WatchedGatewayEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes)
snap.IngressGateway.Listeners = make(map[IngressListenerKey]structs.IngressListener)
return snap, nil
}
@ -102,13 +102,15 @@ func (s *handlerIngressGateway) handleUpdate(ctx context.Context, u cache.Update
// Update our upstreams and watches.
var hosts []string
watchedSvcs := make(map[string]struct{})
watchedSvcs := make(map[UpstreamID]struct{})
upstreamsMap := make(map[IngressListenerKey]structs.Upstreams)
for _, service := range services.Services {
u := makeUpstream(service)
uid := NewUpstreamID(&u)
watchOpts := discoveryChainWatchOpts{
id: u.Identifier(),
id: uid,
name: u.DestinationName,
namespace: u.DestinationNamespace,
partition: u.DestinationPartition,
@ -117,9 +119,9 @@ func (s *handlerIngressGateway) handleUpdate(ctx context.Context, u cache.Update
up := &handlerUpstreams{handlerState: s.handlerState}
err := up.watchDiscoveryChain(ctx, snap, watchOpts)
if err != nil {
return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
return fmt.Errorf("failed to watch discovery chain for %s: %v", uid, err)
}
watchedSvcs[u.Identifier()] = struct{}{}
watchedSvcs[uid] = struct{}{}
hosts = append(hosts, service.Hosts...)
@ -132,10 +134,10 @@ func (s *handlerIngressGateway) handleUpdate(ctx context.Context, u cache.Update
snap.IngressGateway.Hosts = hosts
snap.IngressGateway.HostsSet = true
for id, cancelFn := range snap.IngressGateway.WatchedDiscoveryChains {
if _, ok := watchedSvcs[id]; !ok {
for uid, cancelFn := range snap.IngressGateway.WatchedDiscoveryChains {
if _, ok := watchedSvcs[uid]; !ok {
cancelFn()
delete(snap.IngressGateway.WatchedDiscoveryChains, id)
delete(snap.IngressGateway.WatchedDiscoveryChains, uid)
}
}

View File

@ -187,6 +187,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
})
db := structs.NewServiceName("db", nil)
dbUID := NewUpstreamIDFromServiceName(db)
// Create test cases using some of the common data above.
tests := []*testcase_BasicLifecycle{
@ -214,28 +215,28 @@ func TestManager_BasicLifecycle(t *testing.T) {
ConnectProxy: configSnapshotConnectProxy{
ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{
Leaf: leaf,
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
db.String(): dbDefaultChain(),
DiscoveryChain: map[UpstreamID]*structs.CompiledDiscoveryChain{
dbUID: dbDefaultChain(),
},
WatchedDiscoveryChains: map[string]context.CancelFunc{},
WatchedDiscoveryChains: map[UpstreamID]context.CancelFunc{},
WatchedUpstreams: nil, // Clone() clears this out
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
db.String(): {
WatchedUpstreamEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{
dbUID: {
"db.default.default.dc1": TestUpstreamNodes(t, db.Name),
},
},
WatchedGateways: nil, // Clone() clears this out
WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{
db.String(): {},
WatchedGatewayEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{
dbUID: {},
},
UpstreamConfig: map[string]*structs.Upstream{
upstreams[0].Identifier(): &upstreams[0],
upstreams[1].Identifier(): &upstreams[1],
upstreams[2].Identifier(): &upstreams[2],
UpstreamConfig: map[UpstreamID]*structs.Upstream{
NewUpstreamID(&upstreams[0]): &upstreams[0],
NewUpstreamID(&upstreams[1]): &upstreams[1],
NewUpstreamID(&upstreams[2]): &upstreams[2],
},
PassthroughUpstreams: map[string]ServicePassthroughAddrs{},
PassthroughUpstreams: map[UpstreamID]ServicePassthroughAddrs{},
},
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{},
PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},
Intentions: TestIntentions().Matches[0],
IntentionsSet: true,
@ -271,29 +272,29 @@ func TestManager_BasicLifecycle(t *testing.T) {
ConnectProxy: configSnapshotConnectProxy{
ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{
Leaf: leaf,
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
db.String(): dbSplitChain(),
DiscoveryChain: map[UpstreamID]*structs.CompiledDiscoveryChain{
dbUID: dbSplitChain(),
},
WatchedDiscoveryChains: map[string]context.CancelFunc{},
WatchedDiscoveryChains: map[UpstreamID]context.CancelFunc{},
WatchedUpstreams: nil, // Clone() clears this out
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
db.String(): {
WatchedUpstreamEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{
dbUID: {
"v1.db.default.default.dc1": TestUpstreamNodes(t, db.Name),
"v2.db.default.default.dc1": TestUpstreamNodesAlternate(t),
},
},
WatchedGateways: nil, // Clone() clears this out
WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{
db.String(): {},
WatchedGatewayEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{
dbUID: {},
},
UpstreamConfig: map[string]*structs.Upstream{
upstreams[0].Identifier(): &upstreams[0],
upstreams[1].Identifier(): &upstreams[1],
upstreams[2].Identifier(): &upstreams[2],
UpstreamConfig: map[UpstreamID]*structs.Upstream{
NewUpstreamID(&upstreams[0]): &upstreams[0],
NewUpstreamID(&upstreams[1]): &upstreams[1],
NewUpstreamID(&upstreams[2]): &upstreams[2],
},
PassthroughUpstreams: map[string]ServicePassthroughAddrs{},
PassthroughUpstreams: map[UpstreamID]ServicePassthroughAddrs{},
},
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{},
PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},
Intentions: TestIntentions().Matches[0],
IntentionsSet: true,

130
agent/proxycfg/naming.go Normal file
View File

@ -0,0 +1,130 @@
package proxycfg
import (
"strings"
"github.com/hashicorp/consul/agent/structs"
)
type UpstreamID struct {
Type string
Name string
Datacenter string
structs.EnterpriseMeta
}
func NewUpstreamID(u *structs.Upstream) UpstreamID {
id := UpstreamID{
Type: u.DestinationType,
Name: u.DestinationName,
Datacenter: u.Datacenter,
EnterpriseMeta: structs.NewEnterpriseMetaWithPartition(
u.DestinationPartition,
u.DestinationNamespace,
),
}
id.normalize()
return id
}
func NewUpstreamIDFromServiceName(sn structs.ServiceName) UpstreamID {
id := UpstreamID{
Name: sn.Name,
EnterpriseMeta: sn.EnterpriseMeta,
}
id.normalize()
return id
}
func NewUpstreamIDFromServiceID(sid structs.ServiceID) UpstreamID {
id := UpstreamID{
Name: sid.ID,
EnterpriseMeta: sid.EnterpriseMeta,
}
id.normalize()
return id
}
func (u *UpstreamID) normalize() {
if u.Type == structs.UpstreamDestTypeService {
u.Type = ""
}
u.EnterpriseMeta.Normalize()
}
// String encodes the UpstreamID into a string for use in agent cache keys.
// You can decode it back again using UpstreamIDFromString.
func (u UpstreamID) String() string {
return UpstreamIDString(u.Type, u.Datacenter, u.Name, &u.EnterpriseMeta)
}
func (u UpstreamID) GoString() string {
return u.String()
}
func UpstreamIDFromString(input string) UpstreamID {
typ, dc, name, entMeta := ParseUpstreamIDString(input)
id := UpstreamID{
Type: typ,
Datacenter: dc,
Name: name,
EnterpriseMeta: *entMeta,
}
id.normalize()
return id
}
const upstreamTypePreparedQueryPrefix = structs.UpstreamDestTypePreparedQuery + ":"
func ParseUpstreamIDString(input string) (typ, dc, name string, meta *structs.EnterpriseMeta) {
if strings.HasPrefix(input, upstreamTypePreparedQueryPrefix) {
typ = structs.UpstreamDestTypePreparedQuery
input = strings.TrimPrefix(input, upstreamTypePreparedQueryPrefix)
}
idx := strings.LastIndex(input, "?dc=")
if idx != -1 {
dc = input[idx+4:]
input = input[0:idx]
}
name, meta = parseInnerUpstreamIDString(input)
return typ, dc, name, meta
}
// EnvoyID returns a string representation that uniquely identifies the
// upstream in a canonical but human readable way.
//
// This should be used for any situation where we generate identifiers in Envoy
// xDS structures for this upstream.
//
// This will ensure that generated identifiers for the same thing in OSS and
// Enterprise render the same and omit default namespaces and partitions.
func (u UpstreamID) EnvoyID() string {
name := u.enterpriseIdentifierPrefix() + u.Name
typ := u.Type
if u.Datacenter != "" {
name += "?dc=" + u.Datacenter
}
// Service is default type so never prefix it. This is more readable and long
// term it is the only type that matters so we can drop the prefix and have
// nicer naming in metrics etc.
if typ == "" || typ == structs.UpstreamDestTypeService {
return name
}
return typ + ":" + name
}
func UpstreamsToMap(us structs.Upstreams) map[UpstreamID]*structs.Upstream {
upstreamMap := make(map[UpstreamID]*structs.Upstream)
for i := range us {
u := us[i]
upstreamMap[NewUpstreamID(&u)] = &u
}
return upstreamMap
}

View File

@ -0,0 +1,30 @@
//go:build !consulent
// +build !consulent
package proxycfg
import (
"github.com/hashicorp/consul/agent/structs"
)
func UpstreamIDString(typ, dc, name string, _ *structs.EnterpriseMeta) string {
ret := name
if dc != "" {
ret += "?dc=" + dc
}
if typ == "" || typ == structs.UpstreamDestTypeService {
return ret
}
return typ + ":" + ret
}
func parseInnerUpstreamIDString(input string) (string, *structs.EnterpriseMeta) {
return input, structs.DefaultEnterpriseMetaInDefaultPartition()
}
func (u UpstreamID) enterpriseIdentifierPrefix() string {
return ""
}

View File

@ -0,0 +1,195 @@
package proxycfg
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
)
func TestUpstreamIDFromString(t *testing.T) {
type testcase struct {
id string
expect UpstreamID
}
run := func(t *testing.T, tc testcase) {
tc.expect.EnterpriseMeta.Normalize()
got := UpstreamIDFromString(tc.id)
require.Equal(t, tc.expect, got)
}
prefix := ""
if structs.DefaultEnterpriseMetaInDefaultPartition().PartitionOrEmpty() != "" {
prefix = "default/default/"
}
cases := map[string]testcase{
"prepared query": {
"prepared_query:" + prefix + "foo",
UpstreamID{
Type: structs.UpstreamDestTypePreparedQuery,
Name: "foo",
},
},
"prepared query dc": {
"prepared_query:" + prefix + "foo?dc=dc2",
UpstreamID{
Type: structs.UpstreamDestTypePreparedQuery,
Name: "foo",
Datacenter: "dc2",
},
},
"normal": {
prefix + "foo",
UpstreamID{
Name: "foo",
},
},
"normal dc": {
prefix + "foo?dc=dc2",
UpstreamID{
Name: "foo",
Datacenter: "dc2",
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
run(t, tc)
})
}
}
func TestUpstreamID_String(t *testing.T) {
type testcase struct {
u UpstreamID
expect string
}
run := func(t *testing.T, tc testcase) {
got := tc.u.String()
require.Equal(t, tc.expect, got)
}
prefix := ""
if structs.DefaultEnterpriseMetaInDefaultPartition().PartitionOrEmpty() != "" {
prefix = "default/default/"
}
cases := map[string]testcase{
"prepared query": {
UpstreamID{
Type: structs.UpstreamDestTypePreparedQuery,
Name: "foo",
},
"prepared_query:" + prefix + "foo",
},
"prepared query dc": {
UpstreamID{
Type: structs.UpstreamDestTypePreparedQuery,
Name: "foo",
Datacenter: "dc2",
},
"prepared_query:" + prefix + "foo?dc=dc2",
},
"normal implicit": {
UpstreamID{
Name: "foo",
},
prefix + "foo",
},
"normal implicit dc": {
UpstreamID{
Name: "foo",
Datacenter: "dc2",
},
prefix + "foo?dc=dc2",
},
"normal explicit": {
UpstreamID{
Type: structs.UpstreamDestTypeService,
Name: "foo",
},
prefix + "foo",
},
"normal explicit dc": {
UpstreamID{
Type: structs.UpstreamDestTypeService,
Name: "foo",
Datacenter: "dc2",
},
prefix + "foo?dc=dc2",
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
run(t, tc)
})
}
}
func TestUpstreamID_EnvoyID(t *testing.T) {
type testcase struct {
u UpstreamID
expect string
}
run := func(t *testing.T, tc testcase) {
got := tc.u.EnvoyID()
require.Equal(t, tc.expect, got)
}
cases := map[string]testcase{
"prepared query": {
UpstreamID{
Type: structs.UpstreamDestTypePreparedQuery,
Name: "foo",
},
"prepared_query:foo",
},
"prepared query dc": {
UpstreamID{
Type: structs.UpstreamDestTypePreparedQuery,
Name: "foo",
Datacenter: "dc2",
},
"prepared_query:foo?dc=dc2",
},
"normal implicit": {
UpstreamID{
Name: "foo",
},
"foo",
},
"normal implicit dc": {
UpstreamID{
Name: "foo",
Datacenter: "dc2",
},
"foo?dc=dc2",
},
"normal explicit": {
UpstreamID{
Type: structs.UpstreamDestTypeService,
Name: "foo",
},
"foo",
},
"normal explicit dc": {
UpstreamID{
Type: structs.UpstreamDestTypeService,
Name: "foo",
Datacenter: "dc2",
},
"foo?dc=dc2",
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
run(t, tc)
})
}
}

View File

@ -18,47 +18,47 @@ import (
type ConfigSnapshotUpstreams struct {
Leaf *structs.IssuedCert
// DiscoveryChain is a map of upstream.Identifier() ->
// CompiledDiscoveryChain's, and is used to determine what services could be
// targeted by this upstream. We then instantiate watches for those targets.
DiscoveryChain map[string]*structs.CompiledDiscoveryChain
// DiscoveryChain is a map of UpstreamID -> CompiledDiscoveryChain's, and
// is used to determine what services could be targeted by this upstream.
// We then instantiate watches for those targets.
DiscoveryChain map[UpstreamID]*structs.CompiledDiscoveryChain
// WatchedDiscoveryChains is a map of upstream.Identifier() -> CancelFunc's
// WatchedDiscoveryChains is a map of UpstreamID -> CancelFunc's
// in order to cancel any watches when the proxy's configuration is
// changed. Ingress gateways and transparent proxies need this because
// discovery chain watches are added and removed through the lifecycle
// of a single proxycfg state instance.
WatchedDiscoveryChains map[string]context.CancelFunc
WatchedDiscoveryChains map[UpstreamID]context.CancelFunc
// WatchedUpstreams is a map of upstream.Identifier() -> (map of TargetID ->
// WatchedUpstreams is a map of UpstreamID -> (map of TargetID ->
// CancelFunc's) in order to cancel any watches when the configuration is
// changed.
WatchedUpstreams map[string]map[string]context.CancelFunc
WatchedUpstreams map[UpstreamID]map[string]context.CancelFunc
// WatchedUpstreamEndpoints is a map of upstream.Identifier() -> (map of
// WatchedUpstreamEndpoints is a map of UpstreamID -> (map of
// TargetID -> CheckServiceNodes) and is used to determine the backing
// endpoints of an upstream.
WatchedUpstreamEndpoints map[string]map[string]structs.CheckServiceNodes
WatchedUpstreamEndpoints map[UpstreamID]map[string]structs.CheckServiceNodes
// WatchedGateways is a map of upstream.Identifier() -> (map of
// GatewayKey.String() -> CancelFunc) in order to cancel watches for mesh gateways
WatchedGateways map[string]map[string]context.CancelFunc
// WatchedGateways is a map of UpstreamID -> (map of GatewayKey.String() ->
// CancelFunc) in order to cancel watches for mesh gateways
WatchedGateways map[UpstreamID]map[string]context.CancelFunc
// WatchedGatewayEndpoints is a map of upstream.Identifier() -> (map of
// GatewayKey.String() -> CheckServiceNodes) and is used to determine the backing
// endpoints of a mesh gateway.
WatchedGatewayEndpoints map[string]map[string]structs.CheckServiceNodes
// WatchedGatewayEndpoints is a map of UpstreamID -> (map of
// GatewayKey.String() -> CheckServiceNodes) and is used to determine the
// backing endpoints of a mesh gateway.
WatchedGatewayEndpoints map[UpstreamID]map[string]structs.CheckServiceNodes
// UpstreamConfig is a map to an upstream's configuration.
UpstreamConfig map[string]*structs.Upstream
UpstreamConfig map[UpstreamID]*structs.Upstream
// PassthroughEndpoints is a map of: ServiceName -> ServicePassthroughAddrs.
PassthroughUpstreams map[string]ServicePassthroughAddrs
// PassthroughEndpoints is a map of: UpstreamID -> ServicePassthroughAddrs.
PassthroughUpstreams map[UpstreamID]ServicePassthroughAddrs
// IntentionUpstreams is a set of upstreams inferred from intentions.
// The keys are created with structs.ServiceName.String().
//
// This list only applies to proxies registered in 'transparent' mode.
IntentionUpstreams map[string]struct{}
IntentionUpstreams map[UpstreamID]struct{}
}
type GatewayKey struct {
@ -107,7 +107,7 @@ type configSnapshotConnectProxy struct {
ConfigSnapshotUpstreams
WatchedServiceChecks map[structs.ServiceID][]structs.CheckType // TODO: missing garbage collection
PreparedQueryEndpoints map[string]structs.CheckServiceNodes // DEPRECATED:see:WatchedUpstreamEndpoints
PreparedQueryEndpoints map[UpstreamID]structs.CheckServiceNodes // DEPRECATED:see:WatchedUpstreamEndpoints
// NOTE: Intentions stores a list of lists as returned by the Intentions
// Match RPC. So far we only use the first list as the list of matching
@ -371,8 +371,8 @@ type configSnapshotIngressGateway struct {
// the GatewayServices RPC to retrieve them.
Upstreams map[IngressListenerKey]structs.Upstreams
// UpstreamsSet is the unique set of upstream.Identifier() the gateway routes to.
UpstreamsSet map[string]struct{}
// UpstreamsSet is the unique set of UpstreamID the gateway routes to.
UpstreamsSet map[UpstreamID]struct{}
// Listeners is the original listener config from the ingress-gateway config
// entry to save us trying to pass fields through Upstreams

View File

@ -437,7 +437,7 @@ type gatewayWatchOpts struct {
source structs.QuerySource
token string
key GatewayKey
upstreamID string
upstreamID UpstreamID
}
func watchMeshGateway(ctx context.Context, opts gatewayWatchOpts) error {
@ -448,5 +448,5 @@ func watchMeshGateway(ctx context.Context, opts gatewayWatchOpts) error {
UseServiceKind: true,
Source: opts.source,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition(opts.key.Partition),
}, fmt.Sprintf("mesh-gateway:%s:%s", opts.key.String(), opts.upstreamID), opts.notifyCh)
}, fmt.Sprintf("mesh-gateway:%s:%s", opts.key.String(), opts.upstreamID.String()), opts.notifyCh)
}

View File

@ -381,8 +381,9 @@ func ingressConfigWatchEvent(gwTLS bool, mixedTLS bool) cache.UpdateEvent {
}
}
func upstreamIDForDC2(name string) string {
return fmt.Sprintf("%s?dc=dc2", name)
func upstreamIDForDC2(uid UpstreamID) UpstreamID {
uid.Datacenter = "dc2"
return uid
}
// This test is meant to exercise the various parts of the cache watching done by the state as
@ -410,6 +411,10 @@ func TestState_WatchesAndUpdates(t *testing.T) {
db = structs.NewServiceName("db", nil)
billing = structs.NewServiceName("billing", nil)
api = structs.NewServiceName("api", nil)
apiUID = NewUpstreamIDFromServiceName(api)
dbUID = NewUpstreamIDFromServiceName(db)
pqUID = UpstreamIDFromString("prepared_query:query")
)
rootWatchEvent := func() cache.UpdateEvent {
@ -501,8 +506,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
rootsWatchID: genVerifyRootsWatch("dc1"),
leafWatchID: genVerifyLeafWatch("web", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("web", "dc1"),
"upstream:prepared_query:query": genVerifyPreparedQueryWatch("query", "dc1"),
fmt.Sprintf("discovery-chain:%s", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
"upstream:" + pqUID.String(): genVerifyPreparedQueryWatch("query", "dc1"),
fmt.Sprintf("discovery-chain:%s", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "api",
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
@ -512,7 +517,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Mode: meshGatewayProxyConfigValue,
},
}),
fmt.Sprintf("discovery-chain:%s-failover-remote?dc=dc2", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
fmt.Sprintf("discovery-chain:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "api-failover-remote",
EvaluateInDatacenter: "dc2",
EvaluateInNamespace: "default",
@ -522,7 +527,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Mode: structs.MeshGatewayModeRemote,
},
}),
fmt.Sprintf("discovery-chain:%s-failover-local?dc=dc2", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
fmt.Sprintf("discovery-chain:%s-failover-local?dc=dc2", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "api-failover-local",
EvaluateInDatacenter: "dc2",
EvaluateInNamespace: "default",
@ -532,7 +537,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Mode: structs.MeshGatewayModeLocal,
},
}),
fmt.Sprintf("discovery-chain:%s-failover-direct?dc=dc2", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
fmt.Sprintf("discovery-chain:%s-failover-direct?dc=dc2", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "api-failover-direct",
EvaluateInDatacenter: "dc2",
EvaluateInNamespace: "default",
@ -542,7 +547,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Mode: structs.MeshGatewayModeNone,
},
}),
fmt.Sprintf("discovery-chain:%s-dc2", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
fmt.Sprintf("discovery-chain:%s-dc2", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "api-dc2",
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
@ -566,7 +571,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Err: nil,
},
{
CorrelationID: fmt.Sprintf("discovery-chain:%s", api.String()),
CorrelationID: fmt.Sprintf("discovery-chain:%s", apiUID.String()),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
@ -576,7 +581,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Err: nil,
},
{
CorrelationID: fmt.Sprintf("discovery-chain:%s-failover-remote?dc=dc2", api.String()),
CorrelationID: fmt.Sprintf("discovery-chain:%s-failover-remote?dc=dc2", apiUID.String()),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-remote", "default", "default", "dc2", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
@ -586,7 +591,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Err: nil,
},
{
CorrelationID: fmt.Sprintf("discovery-chain:%s-failover-local?dc=dc2", api.String()),
CorrelationID: fmt.Sprintf("discovery-chain:%s-failover-local?dc=dc2", apiUID.String()),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-local", "default", "default", "dc2", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
@ -596,7 +601,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Err: nil,
},
{
CorrelationID: fmt.Sprintf("discovery-chain:%s-failover-direct?dc=dc2", api.String()),
CorrelationID: fmt.Sprintf("discovery-chain:%s-failover-direct?dc=dc2", apiUID.String()),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-direct", "default", "default", "dc2", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
@ -606,7 +611,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Err: nil,
},
{
CorrelationID: fmt.Sprintf("discovery-chain:%s-dc2", api.String()),
CorrelationID: fmt.Sprintf("discovery-chain:%s-dc2", apiUID.String()),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "api-dc2", "default", "default", "dc1", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
@ -645,12 +650,12 @@ func TestState_WatchesAndUpdates(t *testing.T) {
stage1 := verificationStage{
requiredWatches: map[string]verifyWatchRequest{
fmt.Sprintf("upstream-target:api.default.default.dc1:%s", api.String()): genVerifyServiceWatch("api", "", "dc1", true),
fmt.Sprintf("upstream-target:api-failover-remote.default.default.dc2:%s-failover-remote?dc=dc2", api.String()): genVerifyServiceWatch("api-failover-remote", "", "dc2", true),
fmt.Sprintf("upstream-target:api-failover-local.default.default.dc2:%s-failover-local?dc=dc2", api.String()): genVerifyServiceWatch("api-failover-local", "", "dc2", true),
fmt.Sprintf("upstream-target:api-failover-direct.default.default.dc2:%s-failover-direct?dc=dc2", api.String()): genVerifyServiceWatch("api-failover-direct", "", "dc2", true),
fmt.Sprintf("mesh-gateway:dc2:%s-failover-remote?dc=dc2", api.String()): genVerifyGatewayWatch("dc2"),
fmt.Sprintf("mesh-gateway:dc1:%s-failover-local?dc=dc2", api.String()): genVerifyGatewayWatch("dc1"),
fmt.Sprintf("upstream-target:api.default.default.dc1:%s", apiUID.String()): genVerifyServiceWatch("api", "", "dc1", true),
fmt.Sprintf("upstream-target:api-failover-remote.default.default.dc2:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyServiceWatch("api-failover-remote", "", "dc2", true),
fmt.Sprintf("upstream-target:api-failover-local.default.default.dc2:%s-failover-local?dc=dc2", apiUID.String()): genVerifyServiceWatch("api-failover-local", "", "dc2", true),
fmt.Sprintf("upstream-target:api-failover-direct.default.default.dc2:%s-failover-direct?dc=dc2", apiUID.String()): genVerifyServiceWatch("api-failover-direct", "", "dc2", true),
fmt.Sprintf("mesh-gateway:dc2:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyGatewayWatch("dc2"),
fmt.Sprintf("mesh-gateway:dc1:%s-failover-local?dc=dc2", apiUID.String()): genVerifyGatewayWatch("dc1"),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid())
@ -673,7 +678,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}
if meshGatewayProxyConfigValue == structs.MeshGatewayModeLocal {
stage1.requiredWatches[fmt.Sprintf("mesh-gateway:dc1:%s-dc2", api.String())] = genVerifyGatewayWatch("dc1")
stage1.requiredWatches[fmt.Sprintf("mesh-gateway:dc1:%s-dc2", apiUID.String())] = genVerifyGatewayWatch("dc1")
}
return testCase{
@ -999,7 +1004,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
})
require.Len(t, snap.IngressGateway.WatchedDiscoveryChains, 1)
require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, api.String())
require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, apiUID)
},
},
{
@ -1020,7 +1025,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
{
requiredWatches: map[string]verifyWatchRequest{
"discovery-chain:" + api.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
"discovery-chain:" + apiUID.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "api",
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
@ -1030,7 +1035,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
events: []cache.UpdateEvent{
{
CorrelationID: "discovery-chain:" + api.String(),
CorrelationID: "discovery-chain:" + apiUID.String(),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul", nil),
},
@ -1039,16 +1044,16 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Len(t, snap.IngressGateway.WatchedUpstreams, 1)
require.Len(t, snap.IngressGateway.WatchedUpstreams[api.String()], 1)
require.Len(t, snap.IngressGateway.WatchedUpstreams[apiUID], 1)
},
},
{
requiredWatches: map[string]verifyWatchRequest{
"upstream-target:api.default.default.dc1:" + api.String(): genVerifyServiceWatch("api", "", "dc1", true),
"upstream-target:api.default.default.dc1:" + apiUID.String(): genVerifyServiceWatch("api", "", "dc1", true),
},
events: []cache.UpdateEvent{
{
CorrelationID: "upstream-target:api.default.default.dc1:" + api.String(),
CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
@ -1068,10 +1073,10 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Len(t, snap.IngressGateway.WatchedUpstreamEndpoints, 1)
require.Contains(t, snap.IngressGateway.WatchedUpstreamEndpoints, api.String())
require.Len(t, snap.IngressGateway.WatchedUpstreamEndpoints[api.String()], 1)
require.Contains(t, snap.IngressGateway.WatchedUpstreamEndpoints[api.String()], "api.default.default.dc1")
require.Equal(t, snap.IngressGateway.WatchedUpstreamEndpoints[api.String()]["api.default.default.dc1"],
require.Contains(t, snap.IngressGateway.WatchedUpstreamEndpoints, apiUID)
require.Len(t, snap.IngressGateway.WatchedUpstreamEndpoints[apiUID], 1)
require.Contains(t, snap.IngressGateway.WatchedUpstreamEndpoints[apiUID], "api.default.default.dc1")
require.Equal(t, snap.IngressGateway.WatchedUpstreamEndpoints[apiUID]["api.default.default.dc1"],
structs.CheckServiceNodes{
{
Node: &structs.Node{
@ -1135,7 +1140,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.Len(t, snap.IngressGateway.Hosts, 1)
require.Len(t, snap.IngressGateway.Upstreams, 1)
require.Len(t, snap.IngressGateway.WatchedDiscoveryChains, 1)
require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, api.String())
require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, apiUID)
},
},
{
@ -1220,7 +1225,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.Len(t, snap.IngressGateway.Hosts, 1)
require.Len(t, snap.IngressGateway.Upstreams, 1)
require.Len(t, snap.IngressGateway.WatchedDiscoveryChains, 1)
require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, api.String())
require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, apiUID)
},
},
{
@ -1682,8 +1687,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.True(t, snap.TerminatingGateway.IsEmpty())
require.False(t, snap.ConnectProxy.IsEmpty())
expectUpstreams := map[string]*structs.Upstream{
db.String(): {
expectUpstreams := map[UpstreamID]*structs.Upstream{
dbUID: {
DestinationName: "db",
DestinationNamespace: structs.IntentionDefaultNamespace,
DestinationPartition: structs.IntentionDefaultNamespace,
@ -1771,7 +1776,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.Len(t, snap.ConnectProxy.UpstreamConfig, 1)
wc := structs.NewServiceName(structs.WildcardSpecifier, structs.WildcardEnterpriseMetaInDefaultPartition())
require.Contains(t, snap.ConnectProxy.UpstreamConfig, wc.String())
wcUID := NewUpstreamIDFromServiceName(wc)
require.Contains(t, snap.ConnectProxy.UpstreamConfig, wcUID)
},
},
// Valid snapshot after roots, leaf, and intentions
@ -1833,16 +1839,16 @@ func TestState_WatchesAndUpdates(t *testing.T) {
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "should still be valid")
require.Equal(t, map[string]struct{}{db.String(): {}}, snap.ConnectProxy.IntentionUpstreams)
require.Equal(t, map[UpstreamID]struct{}{dbUID: {}}, snap.ConnectProxy.IntentionUpstreams)
// Should start watch for db's chain
require.Contains(t, snap.ConnectProxy.WatchedDiscoveryChains, db.String())
require.Contains(t, snap.ConnectProxy.WatchedDiscoveryChains, dbUID)
// Should not have results yet
require.Empty(t, snap.ConnectProxy.DiscoveryChain)
require.Len(t, snap.ConnectProxy.UpstreamConfig, 2)
cfg, ok := snap.ConnectProxy.UpstreamConfig[db.String()]
cfg, ok := snap.ConnectProxy.UpstreamConfig[dbUID]
require.True(t, ok)
// Upstream config should have been inherited from defaults under wildcard key
@ -1852,7 +1858,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// Discovery chain updates should be stored
{
requiredWatches: map[string]verifyWatchRequest{
"discovery-chain:" + db.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
"discovery-chain:" + dbUID.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "db",
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
@ -1864,7 +1870,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
events: []cache.UpdateEvent{
{
CorrelationID: "discovery-chain:" + db.String(),
CorrelationID: "discovery-chain:" + dbUID.String(),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", nil),
},
@ -1873,16 +1879,16 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams[db.String()], 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams[dbUID], 1)
},
},
{
requiredWatches: map[string]verifyWatchRequest{
"upstream-target:db.default.default.dc1:" + db.String(): genVerifyServiceWatch("db", "", "dc1", true),
"upstream-target:db.default.default.dc1:" + dbUID.String(): genVerifyServiceWatch("db", "", "dc1", true),
},
events: []cache.UpdateEvent{
{
CorrelationID: "upstream-target:db.default.default.dc1:" + db.String(),
CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
@ -1931,10 +1937,10 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, db.String())
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[db.String()], 1)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[db.String()], "db.default.default.dc1")
require.Equal(t, snap.ConnectProxy.WatchedUpstreamEndpoints[db.String()]["db.default.default.dc1"],
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, dbUID)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbUID], 1)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbUID], "db.default.default.dc1")
require.Equal(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc1"],
structs.CheckServiceNodes{
{
Node: &structs.Node{
@ -1980,8 +1986,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// The LAN service address is used below because transparent proxying
// does not support querying service nodes in other DCs, and the WAN address
// should not be used in DC-local calls.
require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[string]ServicePassthroughAddrs{
db.String(): {
require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]ServicePassthroughAddrs{
dbUID: {
SNI: connect.ServiceSNI("db", "", structs.IntentionDefaultNamespace, "", snap.Datacenter, snap.Roots.TrustDomain),
SpiffeID: connect.SpiffeIDService{
Host: snap.Roots.TrustDomain,
@ -2001,7 +2007,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// Discovery chain updates should be stored
{
requiredWatches: map[string]verifyWatchRequest{
"discovery-chain:" + db.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
"discovery-chain:" + dbUID.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "db",
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
@ -2013,7 +2019,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
events: []cache.UpdateEvent{
{
CorrelationID: "discovery-chain:" + db.String(),
CorrelationID: "discovery-chain:" + dbUID.String(),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", nil, &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
@ -2028,12 +2034,12 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams[db.String()], 2)
require.Len(t, snap.ConnectProxy.WatchedUpstreams[dbUID], 2)
// In transparent mode we watch the upstream's endpoints even if the upstream is not a target of its chain.
// This will happen in cases like redirects.
require.Contains(t, snap.ConnectProxy.WatchedUpstreams[db.String()], "db.default.default.dc1")
require.Contains(t, snap.ConnectProxy.WatchedUpstreams[db.String()], "mysql.default.default.dc1")
require.Contains(t, snap.ConnectProxy.WatchedUpstreams[dbUID], "db.default.default.dc1")
require.Contains(t, snap.ConnectProxy.WatchedUpstreams[dbUID], "mysql.default.default.dc1")
},
},
// Empty list of upstreams should clean everything up
@ -2110,7 +2116,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
"discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
"discovery-chain:" + upstreamIDForDC2(dbUID).String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "db",
EvaluateInDatacenter: "dc2",
EvaluateInNamespace: "default",
@ -2129,8 +2135,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.Len(t, snap.ConnectProxy.UpstreamConfig, 2)
wc := structs.NewServiceName(structs.WildcardSpecifier, structs.WildcardEnterpriseMetaInDefaultPartition())
require.Contains(t, snap.ConnectProxy.UpstreamConfig, wc.String())
require.Contains(t, snap.ConnectProxy.UpstreamConfig, upstreamIDForDC2(db.String()))
wcUID := NewUpstreamIDFromServiceName(wc)
require.Contains(t, snap.ConnectProxy.UpstreamConfig, wcUID)
require.Contains(t, snap.ConnectProxy.UpstreamConfig, upstreamIDForDC2(dbUID))
},
},
// Valid snapshot after roots, leaf, and intentions
@ -2172,7 +2179,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// Discovery chain updates should be stored
{
requiredWatches: map[string]verifyWatchRequest{
"discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
"discovery-chain:" + upstreamIDForDC2(dbUID).String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "db",
EvaluateInDatacenter: "dc2",
EvaluateInNamespace: "default",
@ -2183,7 +2190,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
events: []cache.UpdateEvent{
{
CorrelationID: "discovery-chain:" + upstreamIDForDC2(db.String()),
CorrelationID: "discovery-chain:" + upstreamIDForDC2(dbUID).String(),
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc2", "trustdomain.consul",
func(req *discoverychain.CompileRequest) {
@ -2195,9 +2202,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Len(t, snap.ConnectProxy.WatchedGateways, 1)
require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(db.String())], 1)
require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(dbUID)], 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(db.String())], 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(dbUID)], 1)
},
},
// Empty list of upstreams should only clean up implicit upstreams. The explicit upstream db should not
@ -2209,7 +2216,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
"discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
"discovery-chain:" + upstreamIDForDC2(dbUID).String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: "db",
EvaluateInDatacenter: "dc2",
EvaluateInNamespace: "default",
@ -2238,11 +2245,11 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// Explicit upstreams should not be deleted when the empty update event happens since that is
// for intention upstreams.
require.Len(t, snap.ConnectProxy.DiscoveryChain, 1)
require.Contains(t, snap.ConnectProxy.DiscoveryChain, upstreamIDForDC2(db.String()))
require.Contains(t, snap.ConnectProxy.DiscoveryChain, upstreamIDForDC2(dbUID))
require.Len(t, snap.ConnectProxy.WatchedGateways, 1)
require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(db.String())], 1)
require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(dbUID)], 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(db.String())], 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(dbUID)], 1)
},
},
},

View File

@ -696,18 +696,18 @@ func TestConfigSnapshot(t testing.T) *ConfigSnapshot {
ConnectProxy: configSnapshotConnectProxy{
ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{
Leaf: leaf,
UpstreamConfig: upstreams.ToMap(),
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
"db": dbChain,
UpstreamConfig: UpstreamsToMap(upstreams),
DiscoveryChain: map[UpstreamID]*structs.CompiledDiscoveryChain{
UpstreamIDFromString("db"): dbChain,
},
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {
WatchedUpstreamEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{
UpstreamIDFromString("db"): {
"db.default.default.dc1": TestUpstreamNodes(t, "db"),
},
},
},
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{
"prepared_query:geo-cache": TestPreparedQueryNodes(t, "geo-cache"),
PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{
UpstreamIDFromString("prepared_query:geo-cache"): TestPreparedQueryNodes(t, "geo-cache"),
},
Intentions: nil, // no intentions defined
IntentionsSet: true,
@ -822,8 +822,8 @@ func testConfigSnapshotDiscoveryChain(t testing.T, variation string, additionalE
ConfigSnapshotUpstreams: setupTestVariationConfigEntriesAndSnapshot(
t, variation, leaf, additionalEntries...,
),
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{
"prepared_query:geo-cache": TestPreparedQueryNodes(t, "geo-cache"),
PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{
UpstreamIDFromString("prepared_query:geo-cache"): TestPreparedQueryNodes(t, "geo-cache"),
},
Intentions: nil, // no intentions defined
IntentionsSet: true,
@ -1402,18 +1402,20 @@ func setupTestVariationConfigEntriesAndSnapshot(
dbChain := discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", compileSetup, entries...)
dbUID := UpstreamIDFromString("db")
upstreams := structs.TestUpstreams(t)
snap := ConfigSnapshotUpstreams{
Leaf: leaf,
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
"db": dbChain,
DiscoveryChain: map[UpstreamID]*structs.CompiledDiscoveryChain{
dbUID: dbChain,
},
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {
WatchedUpstreamEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{
dbUID: {
"db.default.default.dc1": TestUpstreamNodes(t, "db"),
},
},
UpstreamConfig: upstreams.ToMap(),
UpstreamConfig: UpstreamsToMap(upstreams),
}
switch variation {
@ -1422,61 +1424,61 @@ func setupTestVariationConfigEntriesAndSnapshot(
case "simple":
case "external-sni":
case "failover":
snap.WatchedUpstreamEndpoints["db"]["fail.default.default.dc1"] =
snap.WatchedUpstreamEndpoints[dbUID]["fail.default.default.dc1"] =
TestUpstreamNodesAlternate(t)
case "failover-through-remote-gateway-triggered":
snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc1"] =
snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc1"] =
TestUpstreamNodesInStatus(t, "critical")
fallthrough
case "failover-through-remote-gateway":
snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc2"] =
snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc2"] =
TestUpstreamNodesDC2(t)
snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{
"db": {
snap.WatchedGatewayEndpoints = map[UpstreamID]map[string]structs.CheckServiceNodes{
dbUID: {
"dc2": TestGatewayNodesDC2(t),
},
}
case "failover-through-double-remote-gateway-triggered":
snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc1"] =
snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc1"] =
TestUpstreamNodesInStatus(t, "critical")
snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc2"] =
snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc2"] =
TestUpstreamNodesInStatusDC2(t, "critical")
fallthrough
case "failover-through-double-remote-gateway":
snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc3"] = TestUpstreamNodesDC2(t)
snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{
"db": {
snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc3"] = TestUpstreamNodesDC2(t)
snap.WatchedGatewayEndpoints = map[UpstreamID]map[string]structs.CheckServiceNodes{
dbUID: {
"dc2": TestGatewayNodesDC2(t),
"dc3": TestGatewayNodesDC3(t),
},
}
case "failover-through-local-gateway-triggered":
snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc1"] =
snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc1"] =
TestUpstreamNodesInStatus(t, "critical")
fallthrough
case "failover-through-local-gateway":
snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc2"] =
snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc2"] =
TestUpstreamNodesDC2(t)
snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{
"db": {
snap.WatchedGatewayEndpoints = map[UpstreamID]map[string]structs.CheckServiceNodes{
dbUID: {
"dc1": TestGatewayNodesDC1(t),
},
}
case "failover-through-double-local-gateway-triggered":
snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc1"] =
snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc1"] =
TestUpstreamNodesInStatus(t, "critical")
snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc2"] =
snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc2"] =
TestUpstreamNodesInStatusDC2(t, "critical")
fallthrough
case "failover-through-double-local-gateway":
snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc3"] = TestUpstreamNodesDC2(t)
snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{
"db": {
snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc3"] = TestUpstreamNodesDC2(t)
snap.WatchedGatewayEndpoints = map[UpstreamID]map[string]structs.CheckServiceNodes{
dbUID: {
"dc1": TestGatewayNodesDC1(t),
},
}
case "splitter-with-resolver-redirect-multidc":
snap.WatchedUpstreamEndpoints["db"] = map[string]structs.CheckServiceNodes{
snap.WatchedUpstreamEndpoints[dbUID] = map[string]structs.CheckServiceNodes{
"v1.db.default.default.dc1": TestUpstreamNodes(t, "db"),
"v2.db.default.default.dc2": TestUpstreamNodesDC2(t),
}
@ -1484,10 +1486,10 @@ func setupTestVariationConfigEntriesAndSnapshot(
case "grpc-router":
case "chain-and-router":
case "http-multiple-services":
snap.WatchedUpstreamEndpoints["foo"] = map[string]structs.CheckServiceNodes{
snap.WatchedUpstreamEndpoints[UpstreamIDFromString("foo")] = map[string]structs.CheckServiceNodes{
"foo.default.default.dc1": TestUpstreamNodes(t, "foo"),
}
snap.WatchedUpstreamEndpoints["bar"] = map[string]structs.CheckServiceNodes{
snap.WatchedUpstreamEndpoints[UpstreamIDFromString("bar")] = map[string]structs.CheckServiceNodes{
"bar.default.default.dc1": TestUpstreamNodesAlternate(t),
}
case "lb-resolver":
@ -2147,9 +2149,9 @@ func TestConfigSnapshotIngress_MultipleListenersDuplicateService(t testing.T) *C
fooChain := discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
barChain := discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.IngressGateway.DiscoveryChain = map[string]*structs.CompiledDiscoveryChain{
"foo": fooChain,
"bar": barChain,
snap.IngressGateway.DiscoveryChain = map[UpstreamID]*structs.CompiledDiscoveryChain{
UpstreamIDFromString("foo"): fooChain,
UpstreamIDFromString("bar"): barChain,
}
return snap

View File

@ -42,34 +42,35 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
svc := strings.TrimPrefix(u.CorrelationID, "discovery-chain:")
uidString := strings.TrimPrefix(u.CorrelationID, "discovery-chain:")
uid := UpstreamIDFromString(uidString)
switch snap.Kind {
case structs.ServiceKindIngressGateway:
if _, ok := snap.IngressGateway.UpstreamsSet[svc]; !ok {
if _, ok := snap.IngressGateway.UpstreamsSet[uid]; !ok {
// Discovery chain is not associated with a known explicit or implicit upstream so it is purged/skipped.
// The associated watch was likely cancelled.
delete(upstreamsSnapshot.DiscoveryChain, svc)
s.logger.Trace("discovery-chain watch fired for unknown upstream", "upstream", svc)
delete(upstreamsSnapshot.DiscoveryChain, uid)
s.logger.Trace("discovery-chain watch fired for unknown upstream", "upstream", uid)
return nil
}
case structs.ServiceKindConnectProxy:
explicit := snap.ConnectProxy.UpstreamConfig[svc].HasLocalPortOrSocket()
if _, implicit := snap.ConnectProxy.IntentionUpstreams[svc]; !implicit && !explicit {
explicit := snap.ConnectProxy.UpstreamConfig[uid].HasLocalPortOrSocket()
if _, implicit := snap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit {
// Discovery chain is not associated with a known explicit or implicit upstream so it is purged/skipped.
// The associated watch was likely cancelled.
delete(upstreamsSnapshot.DiscoveryChain, svc)
s.logger.Trace("discovery-chain watch fired for unknown upstream", "upstream", svc)
delete(upstreamsSnapshot.DiscoveryChain, uid)
s.logger.Trace("discovery-chain watch fired for unknown upstream", "upstream", uid)
return nil
}
default:
return fmt.Errorf("discovery-chain watch fired for unsupported kind: %s", snap.Kind)
}
upstreamsSnapshot.DiscoveryChain[svc] = resp.Chain
upstreamsSnapshot.DiscoveryChain[uid] = resp.Chain
if err := s.resetWatchesFromChain(ctx, svc, resp.Chain, upstreamsSnapshot); err != nil {
if err := s.resetWatchesFromChain(ctx, uid, resp.Chain, upstreamsSnapshot); err != nil {
return err
}
@ -79,15 +80,17 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up
return fmt.Errorf("invalid type for response: %T", u.Result)
}
correlationID := strings.TrimPrefix(u.CorrelationID, "upstream-target:")
targetID, svc, ok := removeColonPrefix(correlationID)
targetID, uidString, ok := removeColonPrefix(correlationID)
if !ok {
return fmt.Errorf("invalid correlation id %q", u.CorrelationID)
}
if _, ok := upstreamsSnapshot.WatchedUpstreamEndpoints[svc]; !ok {
upstreamsSnapshot.WatchedUpstreamEndpoints[svc] = make(map[string]structs.CheckServiceNodes)
uid := UpstreamIDFromString(uidString)
if _, ok := upstreamsSnapshot.WatchedUpstreamEndpoints[uid]; !ok {
upstreamsSnapshot.WatchedUpstreamEndpoints[uid] = make(map[string]structs.CheckServiceNodes)
}
upstreamsSnapshot.WatchedUpstreamEndpoints[svc][targetID] = resp.Nodes
upstreamsSnapshot.WatchedUpstreamEndpoints[uid][targetID] = resp.Nodes
var passthroughAddrs map[string]ServicePassthroughAddrs
@ -121,8 +124,9 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up
Service: svc.Name,
}
if _, ok := upstreamsSnapshot.PassthroughUpstreams[svc.String()]; !ok {
upstreamsSnapshot.PassthroughUpstreams[svc.String()] = ServicePassthroughAddrs{
svcUID := NewUpstreamIDFromServiceName(svc)
if _, ok := upstreamsSnapshot.PassthroughUpstreams[svcUID]; !ok {
upstreamsSnapshot.PassthroughUpstreams[svcUID] = ServicePassthroughAddrs{
SNI: sni,
SpiffeID: spiffeID,
@ -136,7 +140,7 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up
isRemote := !structs.EqualPartitions(svc.PartitionOrDefault(), s.proxyID.PartitionOrDefault())
addr, _ := node.BestAddress(isRemote)
upstreamsSnapshot.PassthroughUpstreams[svc.String()].Addrs[addr] = struct{}{}
upstreamsSnapshot.PassthroughUpstreams[NewUpstreamIDFromServiceName(svc)].Addrs[addr] = struct{}{}
}
}
@ -146,14 +150,16 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up
return fmt.Errorf("invalid type for response: %T", u.Result)
}
correlationID := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")
key, svc, ok := removeColonPrefix(correlationID)
key, uidString, ok := removeColonPrefix(correlationID)
if !ok {
return fmt.Errorf("invalid correlation id %q", u.CorrelationID)
}
if _, ok = upstreamsSnapshot.WatchedGatewayEndpoints[svc]; !ok {
upstreamsSnapshot.WatchedGatewayEndpoints[svc] = make(map[string]structs.CheckServiceNodes)
uid := UpstreamIDFromString(uidString)
if _, ok = upstreamsSnapshot.WatchedGatewayEndpoints[uid]; !ok {
upstreamsSnapshot.WatchedGatewayEndpoints[uid] = make(map[string]structs.CheckServiceNodes)
}
upstreamsSnapshot.WatchedGatewayEndpoints[svc][key] = resp.Nodes
upstreamsSnapshot.WatchedGatewayEndpoints[uid][key] = resp.Nodes
default:
return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID)
@ -171,27 +177,27 @@ func removeColonPrefix(s string) (string, string, bool) {
func (s *handlerUpstreams) resetWatchesFromChain(
ctx context.Context,
id string,
uid UpstreamID,
chain *structs.CompiledDiscoveryChain,
snap *ConfigSnapshotUpstreams,
) error {
s.logger.Trace("resetting watches for discovery chain", "id", id)
s.logger.Trace("resetting watches for discovery chain", "id", uid)
if chain == nil {
return fmt.Errorf("not possible to arrive here with no discovery chain")
}
// Initialize relevant sub maps.
if _, ok := snap.WatchedUpstreams[id]; !ok {
snap.WatchedUpstreams[id] = make(map[string]context.CancelFunc)
if _, ok := snap.WatchedUpstreams[uid]; !ok {
snap.WatchedUpstreams[uid] = make(map[string]context.CancelFunc)
}
if _, ok := snap.WatchedUpstreamEndpoints[id]; !ok {
snap.WatchedUpstreamEndpoints[id] = make(map[string]structs.CheckServiceNodes)
if _, ok := snap.WatchedUpstreamEndpoints[uid]; !ok {
snap.WatchedUpstreamEndpoints[uid] = make(map[string]structs.CheckServiceNodes)
}
if _, ok := snap.WatchedGateways[id]; !ok {
snap.WatchedGateways[id] = make(map[string]context.CancelFunc)
if _, ok := snap.WatchedGateways[uid]; !ok {
snap.WatchedGateways[uid] = make(map[string]context.CancelFunc)
}
if _, ok := snap.WatchedGatewayEndpoints[id]; !ok {
snap.WatchedGatewayEndpoints[id] = make(map[string]structs.CheckServiceNodes)
if _, ok := snap.WatchedGatewayEndpoints[uid]; !ok {
snap.WatchedGatewayEndpoints[uid] = make(map[string]structs.CheckServiceNodes)
}
// We could invalidate this selectively based on a hash of the relevant
@ -199,14 +205,14 @@ func (s *handlerUpstreams) resetWatchesFromChain(
// upstream when the chain changes in any way.
//
// TODO(rb): content hash based add/remove
for targetID, cancelFn := range snap.WatchedUpstreams[id] {
for targetID, cancelFn := range snap.WatchedUpstreams[uid] {
s.logger.Trace("stopping watch of target",
"upstream", id,
"upstream", uid,
"chain", chain.ServiceName,
"target", targetID,
)
delete(snap.WatchedUpstreams[id], targetID)
delete(snap.WatchedUpstreamEndpoints[id], targetID)
delete(snap.WatchedUpstreams[uid], targetID)
delete(snap.WatchedUpstreamEndpoints[uid], targetID)
cancelFn()
}
@ -222,7 +228,7 @@ func (s *handlerUpstreams) resetWatchesFromChain(
}
opts := targetWatchOpts{
upstreamID: id,
upstreamID: uid,
chainID: target.ID,
service: target.Service,
filter: target.Subset.Filter,
@ -231,7 +237,7 @@ func (s *handlerUpstreams) resetWatchesFromChain(
}
err := s.watchUpstreamTarget(ctx, snap, opts)
if err != nil {
return fmt.Errorf("failed to watch target %q for upstream %q", target.ID, id)
return fmt.Errorf("failed to watch target %q for upstream %q", target.ID, uid)
}
// We'll get endpoints from the gateway query, but the health still has
@ -267,7 +273,7 @@ func (s *handlerUpstreams) resetWatchesFromChain(
chainEntMeta := structs.NewEnterpriseMetaWithPartition(chain.Partition, chain.Namespace)
opts := targetWatchOpts{
upstreamID: id,
upstreamID: uid,
chainID: chainID,
service: chain.ServiceName,
filter: "",
@ -276,18 +282,18 @@ func (s *handlerUpstreams) resetWatchesFromChain(
}
err := s.watchUpstreamTarget(ctx, snap, opts)
if err != nil {
return fmt.Errorf("failed to watch target %q for upstream %q", chainID, id)
return fmt.Errorf("failed to watch target %q for upstream %q", chainID, uid)
}
}
for key := range needGateways {
if _, ok := snap.WatchedGateways[id][key]; ok {
if _, ok := snap.WatchedGateways[uid][key]; ok {
continue
}
gwKey := gatewayKeyFromString(key)
s.logger.Trace("initializing watch of mesh gateway",
"upstream", id,
"upstream", uid,
"chain", chain.ServiceName,
"datacenter", gwKey.Datacenter,
"partition", gwKey.Partition,
@ -300,7 +306,7 @@ func (s *handlerUpstreams) resetWatchesFromChain(
source: *s.source,
token: s.token,
key: gwKey,
upstreamID: id,
upstreamID: uid,
}
err := watchMeshGateway(ctx, opts)
if err != nil {
@ -308,23 +314,23 @@ func (s *handlerUpstreams) resetWatchesFromChain(
return err
}
snap.WatchedGateways[id][key] = cancel
snap.WatchedGateways[uid][key] = cancel
}
for key, cancelFn := range snap.WatchedGateways[id] {
for key, cancelFn := range snap.WatchedGateways[uid] {
if _, ok := needGateways[key]; ok {
continue
}
gwKey := gatewayKeyFromString(key)
s.logger.Trace("stopping watch of mesh gateway",
"upstream", id,
"upstream", uid,
"chain", chain.ServiceName,
"datacenter", gwKey.Datacenter,
"partition", gwKey.Partition,
)
delete(snap.WatchedGateways[id], key)
delete(snap.WatchedGatewayEndpoints[id], key)
delete(snap.WatchedGateways[uid], key)
delete(snap.WatchedGatewayEndpoints[uid], key)
cancelFn()
}
@ -332,7 +338,7 @@ func (s *handlerUpstreams) resetWatchesFromChain(
}
type targetWatchOpts struct {
upstreamID string
upstreamID UpstreamID
chainID string
service string
filter string
@ -350,7 +356,7 @@ func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *Config
var finalMeta structs.EnterpriseMeta
finalMeta.Merge(opts.entMeta)
correlationID := "upstream-target:" + opts.chainID + ":" + opts.upstreamID
correlationID := "upstream-target:" + opts.chainID + ":" + opts.upstreamID.String()
ctx, cancel := context.WithCancel(ctx)
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
@ -378,7 +384,7 @@ func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *Config
}
type discoveryChainWatchOpts struct {
id string
id UpstreamID
name string
namespace string
partition string
@ -403,7 +409,7 @@ func (s *handlerUpstreams) watchDiscoveryChain(ctx context.Context, snap *Config
OverrideProtocol: opts.cfg.Protocol,
OverrideConnectTimeout: opts.cfg.ConnectTimeout(),
OverrideMeshGateway: opts.meshGateway,
}, "discovery-chain:"+opts.id, s.ch)
}, "discovery-chain:"+opts.id.String(), s.ch)
if err != nil {
cancel()
return err

View File

@ -314,15 +314,6 @@ func (us Upstreams) ToAPI() []api.Upstream {
return a
}
func (us Upstreams) ToMap() map[string]*Upstream {
upstreamMap := make(map[string]*Upstream)
for i := range us {
upstreamMap[us[i].Identifier()] = &us[i]
}
return upstreamMap
}
// UpstreamsFromAPI is a helper for converting api.Upstream to Upstream.
func UpstreamsFromAPI(us []api.Upstream) Upstreams {
a := make([]Upstream, len(us))
@ -551,24 +542,17 @@ func (k UpstreamKey) String() string {
)
}
// String implements Stringer by returning the Identifier.
func (u *Upstream) String() string {
return u.Identifier()
}
// Identifier returns a string representation that uniquely identifies the
// upstream in a canonical but human readable way.
func (us *Upstream) Identifier() string {
name := us.enterpriseIdentifierPrefix() + us.DestinationName
// String returns a representation of this upstream suitable for debugging
// purposes but nothing relies upon this format.
func (us *Upstream) String() string {
name := us.enterpriseStringPrefix() + us.DestinationName
typ := us.DestinationType
if us.Datacenter != "" {
name += "?dc=" + us.Datacenter
}
// Service is default type so never prefix it. This is more readable and long
// term it is the only type that matters so we can drop the prefix and have
// nicer naming in metrics etc.
// Service is default type so never prefix it.
if typ == "" || typ == UpstreamDestTypeService {
return name
}

View File

@ -13,6 +13,6 @@ func (us *Upstream) DestinationID() ServiceID {
}
}
func (us *Upstream) enterpriseIdentifierPrefix() string {
func (us *Upstream) enterpriseStringPrefix() string {
return ""
}

View File

@ -77,22 +77,22 @@ func (s *ResourceGenerator) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.C
clusters = append(clusters, passthroughs...)
}
for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[id]
for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid]
explicit := upstreamCfg.HasLocalPortOrSocket()
if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[id]; !implicit && !explicit {
if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit {
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
continue
}
chainEndpoints, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id]
chainEndpoints, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[uid]
if !ok {
// this should not happen
return nil, fmt.Errorf("no endpoint map for upstream %q", id)
return nil, fmt.Errorf("no endpoint map for upstream %q", uid)
}
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(id, upstreamCfg, chain, chainEndpoints, cfgSnap)
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(uid, upstreamCfg, chain, chainEndpoints, cfgSnap)
if err != nil {
return nil, err
}
@ -387,30 +387,30 @@ func (s *ResourceGenerator) injectGatewayServiceAddons(cfgSnap *proxycfg.ConfigS
func (s *ResourceGenerator) clustersFromSnapshotIngressGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
var clusters []proto.Message
createdClusters := make(map[string]bool)
createdClusters := make(map[proxycfg.UpstreamID]bool)
for _, upstreams := range cfgSnap.IngressGateway.Upstreams {
for _, u := range upstreams {
id := u.Identifier()
uid := proxycfg.NewUpstreamID(&u)
// If we've already created a cluster for this upstream, skip it. Multiple listeners may
// reference the same upstream, so we don't need to create duplicate clusters in that case.
if createdClusters[id] {
if createdClusters[uid] {
continue
}
chain, ok := cfgSnap.IngressGateway.DiscoveryChain[id]
chain, ok := cfgSnap.IngressGateway.DiscoveryChain[uid]
if !ok {
// this should not happen
return nil, fmt.Errorf("no discovery chain for upstream %q", id)
return nil, fmt.Errorf("no discovery chain for upstream %q", uid)
}
chainEndpoints, ok := cfgSnap.IngressGateway.WatchedUpstreamEndpoints[id]
chainEndpoints, ok := cfgSnap.IngressGateway.WatchedUpstreamEndpoints[uid]
if !ok {
// this should not happen
return nil, fmt.Errorf("no endpoint map for upstream %q", id)
return nil, fmt.Errorf("no endpoint map for upstream %q", uid)
}
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(id, &u, chain, chainEndpoints, cfgSnap)
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(uid, &u, chain, chainEndpoints, cfgSnap)
if err != nil {
return nil, err
}
@ -418,7 +418,7 @@ func (s *ResourceGenerator) clustersFromSnapshotIngressGateway(cfgSnap *proxycfg
for _, c := range upstreamClusters {
clusters = append(clusters, c)
}
createdClusters[id] = true
createdClusters[uid] = true
}
}
return clusters, nil
@ -481,6 +481,8 @@ func (s *ResourceGenerator) makeUpstreamClusterForPreparedQuery(upstream structs
var c *envoy_cluster_v3.Cluster
var err error
uid := proxycfg.NewUpstreamID(&upstream)
dc := upstream.Datacenter
if dc == "" {
dc = cfgSnap.Datacenter
@ -491,7 +493,7 @@ func (s *ResourceGenerator) makeUpstreamClusterForPreparedQuery(upstream structs
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
s.Logger.Warn("failed to parse", "upstream", upstream.Identifier(), "error", err)
s.Logger.Warn("failed to parse", "upstream", uid, "error", err)
}
if cfg.EnvoyClusterJSON != "" {
c, err = makeClusterFromUserConfig(cfg.EnvoyClusterJSON)
@ -524,7 +526,7 @@ func (s *ResourceGenerator) makeUpstreamClusterForPreparedQuery(upstream structs
}
}
endpoints := cfgSnap.ConnectProxy.PreparedQueryEndpoints[upstream.Identifier()]
endpoints := cfgSnap.ConnectProxy.PreparedQueryEndpoints[uid]
var (
spiffeIDs = make([]connect.SpiffeIDService, 0)
seen = make(map[string]struct{})
@ -571,14 +573,14 @@ func (s *ResourceGenerator) makeUpstreamClusterForPreparedQuery(upstream structs
}
func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
id string,
uid proxycfg.UpstreamID,
upstream *structs.Upstream,
chain *structs.CompiledDiscoveryChain,
chainEndpoints map[string]structs.CheckServiceNodes,
cfgSnap *proxycfg.ConfigSnapshot,
) ([]*envoy_cluster_v3.Cluster, error) {
if chain == nil {
return nil, fmt.Errorf("cannot create upstream cluster without discovery chain for %s", id)
return nil, fmt.Errorf("cannot create upstream cluster without discovery chain for %s", uid)
}
configMap := make(map[string]interface{})
@ -589,7 +591,7 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
s.Logger.Warn("failed to parse", "upstream", id,
s.Logger.Warn("failed to parse", "upstream", uid,
"error", err)
}
@ -604,7 +606,7 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
}
} else {
s.Logger.Warn("ignoring escape hatch setting, because a discovery chain is configured for",
"discovery chain", chain.ServiceName, "upstream", id,
"discovery chain", chain.ServiceName, "upstream", uid,
"envoy_cluster_json", chain.ServiceName)
}
}

View File

@ -69,8 +69,8 @@ func TestClustersFromSnapshot(t *testing.T) {
customAppClusterJSON(t, customClusterJSONOptions{
Name: "myservice",
})
snap.ConnectProxy.UpstreamConfig = map[string]*structs.Upstream{
"db": {
snap.ConnectProxy.UpstreamConfig = map[proxycfg.UpstreamID]*structs.Upstream{
UID("db"): {
// The local bind port is overridden by the escape hatch, but is required for explicit upstreams.
LocalBindPort: 9191,
Config: map[string]interface{}{
@ -669,15 +669,17 @@ func TestClustersFromSnapshot(t *testing.T) {
snap.Proxy.Mode = structs.ProxyModeTransparent
kafka := structs.NewServiceName("kafka", nil)
mongo := structs.NewServiceName("mongo", nil)
kafkaUID := proxycfg.NewUpstreamIDFromServiceName(kafka)
mongoUID := proxycfg.NewUpstreamIDFromServiceName(mongo)
snap.ConnectProxy.IntentionUpstreams = map[string]struct{}{
kafka.String(): {},
mongo.String(): {},
snap.ConnectProxy.IntentionUpstreams = map[proxycfg.UpstreamID]struct{}{
kafkaUID: {},
mongoUID: {},
}
// We add a passthrough cluster for each upstream service name
snap.ConnectProxy.PassthroughUpstreams = map[string]proxycfg.ServicePassthroughAddrs{
kafka.String(): {
snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]proxycfg.ServicePassthroughAddrs{
kafkaUID: {
SNI: "kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
SpiffeID: connect.SpiffeIDService{
Host: "e5b08d03-bfc3-c870-1833-baddb116e648.consul",
@ -689,7 +691,7 @@ func TestClustersFromSnapshot(t *testing.T) {
"9.9.9.9": {},
},
},
mongo.String(): {
mongoUID: {
SNI: "mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
SpiffeID: connect.SpiffeIDService{
Host: "e5b08d03-bfc3-c870-1833-baddb116e648.consul",
@ -705,8 +707,8 @@ func TestClustersFromSnapshot(t *testing.T) {
}
// There should still be a cluster for non-passthrough requests
snap.ConnectProxy.DiscoveryChain[mongo.String()] = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.ConnectProxy.WatchedUpstreamEndpoints[mongo.String()] = map[string]structs.CheckServiceNodes{
snap.ConnectProxy.DiscoveryChain[mongoUID] = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.ConnectProxy.WatchedUpstreamEndpoints[mongoUID] = map[string]structs.CheckServiceNodes{
"mongo.default.default.dc1": {
structs.CheckServiceNode{
Node: &structs.Node{
@ -923,3 +925,8 @@ func TestEnvoyLBConfig_InjectToCluster(t *testing.T) {
})
}
}
// UID is just a convenience function to aid in writing tests less verbosely.
func UID(input string) proxycfg.UpstreamID {
return proxycfg.UpstreamIDFromString(input)
}

View File

@ -153,9 +153,9 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
deleteAllButOneEndpoint := func(snap *proxycfg.ConfigSnapshot, svc, targetID string) {
snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[svc][targetID] =
snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[svc][targetID][0:1]
deleteAllButOneEndpoint := func(snap *proxycfg.ConfigSnapshot, uid proxycfg.UpstreamID, targetID string) {
snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID] =
snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID][0:1]
}
runStep(t, "avoid sending config for unsubscribed resource", func(t *testing.T) {
@ -169,7 +169,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
// now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for DB
snap = newTestSnapshot(t, snap, "")
deleteAllButOneEndpoint(snap, "db", "db.default.default.dc1")
deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1")
mgr.DeliverConfig(t, sid, snap)
// We never send an EDS reply about this change.
@ -206,7 +206,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
runStep(t, "simulate envoy NACKing an endpoint update", func(t *testing.T) {
// Trigger only an EDS update.
snap = newTestSnapshot(t, snap, "")
deleteAllButOneEndpoint(snap, "db", "db.default.default.dc1")
deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1")
mgr.DeliverConfig(t, sid, snap)
// Send envoy an EDS update.

View File

@ -47,22 +47,22 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
resources := make([]proto.Message, 0,
len(cfgSnap.ConnectProxy.PreparedQueryEndpoints)+len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints))
for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[id]
for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid]
explicit := upstreamCfg.HasLocalPortOrSocket()
if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[id]; !implicit && !explicit {
if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit {
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
continue
}
es := s.endpointsFromDiscoveryChain(
id,
uid,
chain,
cfgSnap.Locality,
upstreamCfg,
cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id],
cfgSnap.ConnectProxy.WatchedGatewayEndpoints[id],
cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[uid],
cfgSnap.ConnectProxy.WatchedGatewayEndpoints[uid],
)
resources = append(resources, es...)
}
@ -72,7 +72,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
continue
}
id := u.Identifier()
uid := proxycfg.NewUpstreamID(&u)
dc := u.Datacenter
if dc == "" {
@ -80,7 +80,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
}
clusterName := connect.UpstreamSNI(&u, "", dc, cfgSnap.Roots.TrustDomain)
endpoints, ok := cfgSnap.ConnectProxy.PreparedQueryEndpoints[id]
endpoints, ok := cfgSnap.ConnectProxy.PreparedQueryEndpoints[uid]
if ok {
la := makeLoadAssignment(
clusterName,
@ -318,27 +318,27 @@ func (s *ResourceGenerator) endpointsFromServicesAndResolvers(
func (s *ResourceGenerator) endpointsFromSnapshotIngressGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
var resources []proto.Message
createdClusters := make(map[string]bool)
createdClusters := make(map[proxycfg.UpstreamID]bool)
for _, upstreams := range cfgSnap.IngressGateway.Upstreams {
for _, u := range upstreams {
id := u.Identifier()
uid := proxycfg.NewUpstreamID(&u)
// If we've already created endpoints for this upstream, skip it. Multiple listeners may
// reference the same upstream, so we don't need to create duplicate endpoints in that case.
if createdClusters[id] {
if createdClusters[uid] {
continue
}
es := s.endpointsFromDiscoveryChain(
id,
cfgSnap.IngressGateway.DiscoveryChain[id],
uid,
cfgSnap.IngressGateway.DiscoveryChain[uid],
proxycfg.GatewayKey{Datacenter: cfgSnap.Datacenter, Partition: u.DestinationPartition},
&u,
cfgSnap.IngressGateway.WatchedUpstreamEndpoints[id],
cfgSnap.IngressGateway.WatchedGatewayEndpoints[id],
cfgSnap.IngressGateway.WatchedUpstreamEndpoints[uid],
cfgSnap.IngressGateway.WatchedGatewayEndpoints[uid],
)
resources = append(resources, es...)
createdClusters[id] = true
createdClusters[uid] = true
}
}
return resources, nil
@ -366,7 +366,7 @@ func makePipeEndpoint(path string) *envoy_endpoint_v3.LbEndpoint {
}
func (s *ResourceGenerator) endpointsFromDiscoveryChain(
id string,
uid proxycfg.UpstreamID,
chain *structs.CompiledDiscoveryChain,
gatewayKey proxycfg.GatewayKey,
upstream *structs.Upstream,
@ -387,7 +387,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
s.Logger.Warn("failed to parse", "upstream", id,
s.Logger.Warn("failed to parse", "upstream", uid,
"error", err)
}
@ -402,7 +402,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
}
} else {
s.Logger.Warn("ignoring escape hatch setting, because a discovery chain is configued for",
"discovery chain", chain.ServiceName, "upstream", id,
"discovery chain", chain.ServiceName, "upstream", uid,
"envoy_cluster_json", chain.ServiceName)
}
}

View File

@ -325,8 +325,8 @@ func TestEndpointsFromSnapshot(t *testing.T) {
customAppClusterJSON(t, customClusterJSONOptions{
Name: "myservice",
})
snap.ConnectProxy.UpstreamConfig = map[string]*structs.Upstream{
"db": {
snap.ConnectProxy.UpstreamConfig = map[proxycfg.UpstreamID]*structs.Upstream{
UID("db"): {
// The local bind port is overridden by the escape hatch, but is required for explicit upstreams.
LocalBindPort: 9191,
Config: map[string]interface{}{

View File

@ -93,16 +93,16 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
}
}
for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[id]
for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid]
explicit := upstreamCfg.HasLocalPortOrSocket()
if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[id]; !implicit && !explicit {
if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit {
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
continue
}
cfg := s.getAndModifyUpstreamConfigForListener(id, upstreamCfg, chain)
cfg := s.getAndModifyUpstreamConfigForListener(uid, upstreamCfg, chain)
// If escape hatch is present, create a listener from it and move on to the next
if cfg.EnvoyListenerJSON != "" {
@ -133,7 +133,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
// Generate the upstream listeners for when they are explicitly set with a local bind port or socket path
if upstreamCfg != nil && upstreamCfg.HasLocalPortOrSocket() {
filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{
routeName: id,
routeName: uid.EnvoyID(),
clusterName: clusterName,
filterName: filterName,
protocol: cfg.Protocol,
@ -143,7 +143,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
return nil, err
}
upstreamListener := makeListener(id, upstreamCfg, envoy_core_v3.TrafficDirection_OUTBOUND)
upstreamListener := makeListener(uid.EnvoyID(), upstreamCfg, envoy_core_v3.TrafficDirection_OUTBOUND)
upstreamListener.FilterChains = []*envoy_listener_v3.FilterChain{
filterChain,
}
@ -158,7 +158,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
// as we do for explicit upstreams above.
filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{
routeName: id,
routeName: uid.EnvoyID(),
clusterName: clusterName,
filterName: filterName,
protocol: cfg.Protocol,
@ -168,7 +168,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
return nil, err
}
endpoints := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id][chain.ID()]
endpoints := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[uid][chain.ID()]
uniqueAddrs := make(map[string]struct{})
// Match on the virtual IP for the upstream service (identified by the chain's ID).
@ -199,7 +199,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
}
if len(uniqueAddrs) > 2 {
s.Logger.Debug("detected multiple virtual IPs for an upstream, all will be used to match traffic",
"upstream", id, "ip_count", len(uniqueAddrs))
"upstream", uid, "ip_count", len(uniqueAddrs))
}
// For every potential address we collected, create the appropriate address prefix to match on.
@ -218,12 +218,11 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
// as opposed to via a virtual IP.
var passthroughChains []*envoy_listener_v3.FilterChain
for svc, passthrough := range cfgSnap.ConnectProxy.PassthroughUpstreams {
sn := structs.ServiceNameFromString(svc)
for uid, passthrough := range cfgSnap.ConnectProxy.PassthroughUpstreams {
u := structs.Upstream{
DestinationName: sn.Name,
DestinationNamespace: sn.NamespaceOrDefault(),
DestinationPartition: sn.PartitionOrDefault(),
DestinationName: uid.Name,
DestinationNamespace: uid.NamespaceOrDefault(),
DestinationPartition: uid.PartitionOrDefault(),
}
filterName := fmt.Sprintf("%s.%s.%s.%s", u.DestinationName, u.DestinationNamespace, u.DestinationPartition, cfgSnap.Datacenter)
@ -271,7 +270,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
}
// Looping over explicit upstreams is only needed for prepared queries because they do not have discovery chains
for id, u := range cfgSnap.ConnectProxy.UpstreamConfig {
for uid, u := range cfgSnap.ConnectProxy.UpstreamConfig {
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
continue
}
@ -280,7 +279,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
s.Logger.Warn("failed to parse", "upstream", u.Identifier(), "error", err)
s.Logger.Warn("failed to parse", "upstream", uid, "error", err)
}
// If escape hatch is present, create a listener from it and move on to the next
@ -288,7 +287,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
upstreamListener, err := makeListenerFromUserConfig(cfg.EnvoyListenerJSON)
if err != nil {
s.Logger.Error("failed to parse envoy_listener_json",
"upstream", u.Identifier(),
"upstream", uid,
"error", err)
continue
}
@ -296,13 +295,13 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
continue
}
upstreamListener := makeListener(id, u, envoy_core_v3.TrafficDirection_OUTBOUND)
upstreamListener := makeListener(uid.EnvoyID(), u, envoy_core_v3.TrafficDirection_OUTBOUND)
filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{
// TODO (SNI partition) add partition for upstream SNI
clusterName: connect.UpstreamSNI(u, "", cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain),
filterName: id,
routeName: id,
filterName: uid.EnvoyID(),
routeName: uid.EnvoyID(),
protocol: cfg.Protocol,
})
if err != nil {
@ -1289,7 +1288,11 @@ func simpleChainTarget(chain *structs.CompiledDiscoveryChain) (*structs.Discover
return chain.Targets[targetID], nil
}
func (s *ResourceGenerator) getAndModifyUpstreamConfigForListener(id string, u *structs.Upstream, chain *structs.CompiledDiscoveryChain) structs.UpstreamConfig {
func (s *ResourceGenerator) getAndModifyUpstreamConfigForListener(
uid proxycfg.UpstreamID,
u *structs.Upstream,
chain *structs.CompiledDiscoveryChain,
) structs.UpstreamConfig {
var (
cfg structs.UpstreamConfig
err error
@ -1304,7 +1307,7 @@ func (s *ResourceGenerator) getAndModifyUpstreamConfigForListener(id string, u *
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
s.Logger.Warn("failed to parse", "upstream", id, "error", err)
s.Logger.Warn("failed to parse", "upstream", uid, "error", err)
}
} else {
// Use NoDefaults here so that we can set the protocol to the chain
@ -1313,12 +1316,12 @@ func (s *ResourceGenerator) getAndModifyUpstreamConfigForListener(id string, u *
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
s.Logger.Warn("failed to parse", "upstream", id, "error", err)
s.Logger.Warn("failed to parse", "upstream", uid, "error", err)
}
if cfg.EnvoyListenerJSON != "" {
s.Logger.Warn("ignoring escape hatch setting because already configured for",
"discovery chain", chain.ServiceName, "upstream", id, "config", "envoy_listener_json")
"discovery chain", chain.ServiceName, "upstream", uid, "config", "envoy_listener_json")
// Remove from config struct so we don't use it later on
cfg.EnvoyListenerJSON = ""

View File

@ -2,9 +2,11 @@ package xds
import (
"fmt"
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
envoy_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/duration"
"github.com/golang/protobuf/ptypes/wrappers"
@ -33,15 +35,16 @@ func (s *ResourceGenerator) makeIngressGatewayListeners(address string, cfgSnap
// member, because this key/value pair is created only when a
// GatewayService is returned in the RPC
u := upstreams[0]
id := u.Identifier()
chain := cfgSnap.IngressGateway.DiscoveryChain[id]
uid := proxycfg.NewUpstreamID(&u)
chain := cfgSnap.IngressGateway.DiscoveryChain[uid]
if chain == nil {
// Wait until a chain is present in the snapshot.
continue
}
cfg := s.getAndModifyUpstreamConfigForListener(id, &u, chain)
cfg := s.getAndModifyUpstreamConfigForListener(uid, &u, chain)
// RDS, Envoy's Route Discovery Service, is only used for HTTP services with a customized discovery chain.
// TODO(freddy): Why can the protocol of the listener be overridden here?
@ -60,9 +63,9 @@ func (s *ResourceGenerator) makeIngressGatewayListeners(address string, cfgSnap
filterName := fmt.Sprintf("%s.%s.%s.%s", chain.ServiceName, chain.Namespace, chain.Partition, chain.Datacenter)
l := makePortListenerWithDefault(id, address, u.LocalBindPort, envoy_core_v3.TrafficDirection_OUTBOUND)
l := makePortListenerWithDefault(uid.EnvoyID(), address, u.LocalBindPort, envoy_core_v3.TrafficDirection_OUTBOUND)
filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{
routeName: id,
routeName: uid.EnvoyID(),
useRDS: useRDS,
clusterName: clusterName,
filterName: filterName,

View File

@ -72,6 +72,8 @@ func TestListenersFromSnapshot(t *testing.T) {
snap.Proxy.Upstreams[0].LocalBindPort = 0
snap.Proxy.Upstreams[0].LocalBindSocketPath = "/tmp/service-mesh/client-1/grpc-employee-server"
snap.Proxy.Upstreams[0].LocalBindSocketMode = "0640"
snap.ConnectProxy.UpstreamConfig = proxycfg.UpstreamsToMap(snap.Proxy.Upstreams)
},
},
{
@ -95,6 +97,8 @@ func TestListenersFromSnapshot(t *testing.T) {
create: proxycfg.TestConfigSnapshot,
setup: func(snap *proxycfg.ConfigSnapshot) {
snap.Proxy.Upstreams[0].Config["protocol"] = "http"
snap.ConnectProxy.UpstreamConfig = proxycfg.UpstreamsToMap(snap.Proxy.Upstreams)
},
},
{
@ -159,14 +163,21 @@ func TestListenersFromSnapshot(t *testing.T) {
create: proxycfg.TestConfigSnapshot,
setup: func(snap *proxycfg.ConfigSnapshot) {
for i := range snap.Proxy.Upstreams {
if snap.Proxy.Upstreams[i].DestinationName != "db" {
continue // only tweak the db upstream
}
if snap.Proxy.Upstreams[i].Config == nil {
snap.Proxy.Upstreams[i].Config = map[string]interface{}{}
}
uid := proxycfg.NewUpstreamID(&snap.Proxy.Upstreams[i])
snap.Proxy.Upstreams[i].Config["envoy_listener_json"] =
customListenerJSON(t, customListenerJSONOptions{
Name: snap.Proxy.Upstreams[i].Identifier() + ":custom-upstream",
Name: uid.EnvoyID() + ":custom-upstream",
})
}
snap.ConnectProxy.UpstreamConfig = proxycfg.UpstreamsToMap(snap.Proxy.Upstreams)
},
},
{
@ -174,14 +185,22 @@ func TestListenersFromSnapshot(t *testing.T) {
create: proxycfg.TestConfigSnapshotDiscoveryChainWithFailover,
setup: func(snap *proxycfg.ConfigSnapshot) {
for i := range snap.Proxy.Upstreams {
if snap.Proxy.Upstreams[i].DestinationName != "db" {
continue // only tweak the db upstream
}
if snap.Proxy.Upstreams[i].Config == nil {
snap.Proxy.Upstreams[i].Config = map[string]interface{}{}
}
uid := proxycfg.NewUpstreamID(&snap.Proxy.Upstreams[i])
snap.Proxy.Upstreams[i].Config["envoy_listener_json"] =
customListenerJSON(t, customListenerJSONOptions{
Name: snap.Proxy.Upstreams[i].Identifier() + ":custom-upstream",
Name: uid.EnvoyID() + ":custom-upstream",
})
}
snap.ConnectProxy.UpstreamConfig = proxycfg.UpstreamsToMap(snap.Proxy.Upstreams)
},
},
{
@ -901,7 +920,7 @@ func TestListenersFromSnapshot(t *testing.T) {
connect.TestClusterID+".consul",
nil,
)
snap.IngressGateway.DiscoveryChain["secure"] = secureChain
snap.IngressGateway.DiscoveryChain[UID("secure")] = secureChain
insecureChain := discoverychain.TestCompileConfigEntries(
t,
@ -912,7 +931,7 @@ func TestListenersFromSnapshot(t *testing.T) {
connect.TestClusterID+".consul",
nil,
)
snap.IngressGateway.DiscoveryChain["insecure"] = insecureChain
snap.IngressGateway.DiscoveryChain[UID("insecure")] = insecureChain
snap.IngressGateway.Listeners = map[proxycfg.IngressListenerKey]structs.IngressListener{
{Protocol: "tcp", Port: 8080}: {
@ -1084,12 +1103,13 @@ func TestListenersFromSnapshot(t *testing.T) {
// DiscoveryChain without an UpstreamConfig should yield a filter chain when in transparent proxy mode
google := structs.NewServiceName("google", nil)
snap.ConnectProxy.IntentionUpstreams = map[string]struct{}{
google.String(): {},
googleUID := proxycfg.NewUpstreamIDFromServiceName(google)
snap.ConnectProxy.IntentionUpstreams = map[proxycfg.UpstreamID]struct{}{
googleUID: {},
}
snap.ConnectProxy.DiscoveryChain[google.String()] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.ConnectProxy.DiscoveryChain[googleUID] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.ConnectProxy.WatchedUpstreamEndpoints[google.String()] = map[string]structs.CheckServiceNodes{
snap.ConnectProxy.WatchedUpstreamEndpoints[googleUID] = map[string]structs.CheckServiceNodes{
"google.default.default.dc1": {
structs.CheckServiceNode{
Node: &structs.Node{
@ -1126,7 +1146,7 @@ func TestListenersFromSnapshot(t *testing.T) {
}
// DiscoveryChains without endpoints do not get a filter chain because there are no addresses to match on.
snap.ConnectProxy.DiscoveryChain["no-endpoints"] = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.ConnectProxy.DiscoveryChain[UID("no-endpoints")] = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
},
},
{
@ -1144,11 +1164,12 @@ func TestListenersFromSnapshot(t *testing.T) {
// DiscoveryChain without an UpstreamConfig should yield a filter chain when in transparent proxy mode
google := structs.NewServiceName("google", nil)
snap.ConnectProxy.IntentionUpstreams = map[string]struct{}{
google.String(): {},
googleUID := proxycfg.NewUpstreamIDFromServiceName(google)
snap.ConnectProxy.IntentionUpstreams = map[proxycfg.UpstreamID]struct{}{
googleUID: {},
}
snap.ConnectProxy.DiscoveryChain[google.String()] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.ConnectProxy.WatchedUpstreamEndpoints[google.String()] = map[string]structs.CheckServiceNodes{
snap.ConnectProxy.DiscoveryChain[googleUID] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.ConnectProxy.WatchedUpstreamEndpoints[googleUID] = map[string]structs.CheckServiceNodes{
"google.default.default.dc1": {
structs.CheckServiceNode{
Node: &structs.Node{
@ -1168,7 +1189,7 @@ func TestListenersFromSnapshot(t *testing.T) {
}
// DiscoveryChains without endpoints do not get a filter chain because there are no addresses to match on.
snap.ConnectProxy.DiscoveryChain["no-endpoints"] = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.ConnectProxy.DiscoveryChain[UID("no-endpoints")] = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
},
},
{
@ -1178,24 +1199,26 @@ func TestListenersFromSnapshot(t *testing.T) {
snap.Proxy.Mode = structs.ProxyModeTransparent
kafka := structs.NewServiceName("kafka", nil)
mongo := structs.NewServiceName("mongo", nil)
kafkaUID := proxycfg.NewUpstreamIDFromServiceName(kafka)
mongoUID := proxycfg.NewUpstreamIDFromServiceName(mongo)
snap.ConnectProxy.IntentionUpstreams = map[string]struct{}{
kafka.String(): {},
mongo.String(): {},
snap.ConnectProxy.IntentionUpstreams = map[proxycfg.UpstreamID]struct{}{
kafkaUID: {},
mongoUID: {},
}
snap.ConnectProxy.DiscoveryChain[mongo.String()] = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.ConnectProxy.DiscoveryChain[kafka.String()] = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.ConnectProxy.DiscoveryChain[mongoUID] = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.ConnectProxy.DiscoveryChain[kafkaUID] = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
// We add a filter chains for each passthrough service name.
// The filter chain will route to a cluster with the same SNI name.
snap.ConnectProxy.PassthroughUpstreams = map[string]proxycfg.ServicePassthroughAddrs{
kafka.String(): {
snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]proxycfg.ServicePassthroughAddrs{
kafkaUID: {
SNI: "kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
Addrs: map[string]struct{}{
"9.9.9.9": {},
},
},
mongo.String(): {
mongoUID: {
SNI: "mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul",
Addrs: map[string]struct{}{
"10.10.10.10": {},
@ -1205,7 +1228,7 @@ func TestListenersFromSnapshot(t *testing.T) {
}
// There should still be a filter chain for mongo's virtual address
snap.ConnectProxy.WatchedUpstreamEndpoints[mongo.String()] = map[string]structs.CheckServiceNodes{
snap.ConnectProxy.WatchedUpstreamEndpoints[mongoUID] = map[string]structs.CheckServiceNodes{
"mongo.default.default.dc1": {
structs.CheckServiceNode{
Node: &structs.Node{
@ -1240,12 +1263,14 @@ func TestListenersFromSnapshot(t *testing.T) {
// DiscoveryChain without an UpstreamConfig should yield a filter chain when in transparent proxy mode
google := structs.NewServiceName("google", nil)
kafka := structs.NewServiceName("kafka", nil)
snap.ConnectProxy.IntentionUpstreams = map[string]struct{}{
google.String(): {},
kafka.String(): {},
googleUID := proxycfg.NewUpstreamIDFromServiceName(google)
kafkaUID := proxycfg.NewUpstreamIDFromServiceName(kafka)
snap.ConnectProxy.IntentionUpstreams = map[proxycfg.UpstreamID]struct{}{
googleUID: {},
kafkaUID: {},
}
snap.ConnectProxy.DiscoveryChain[google.String()] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.ConnectProxy.DiscoveryChain[kafka.String()] = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.ConnectProxy.DiscoveryChain[googleUID] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
snap.ConnectProxy.DiscoveryChain[kafkaUID] = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
tgate := structs.CheckServiceNode{
Node: &structs.Node{
@ -1264,10 +1289,10 @@ func TestListenersFromSnapshot(t *testing.T) {
},
},
}
snap.ConnectProxy.WatchedUpstreamEndpoints[google.String()] = map[string]structs.CheckServiceNodes{
snap.ConnectProxy.WatchedUpstreamEndpoints[googleUID] = map[string]structs.CheckServiceNodes{
"google.default.default.dc1": {tgate},
}
snap.ConnectProxy.WatchedUpstreamEndpoints[kafka.String()] = map[string]structs.CheckServiceNodes{
snap.ConnectProxy.WatchedUpstreamEndpoints[kafkaUID] = map[string]structs.CheckServiceNodes{
"kafka.default.default.dc1": {tgate},
}
},

View File

@ -48,24 +48,24 @@ func (s *ResourceGenerator) routesFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot)
// "routes" in the snapshot.
func (s *ResourceGenerator) routesForConnectProxy(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
var resources []proto.Message
for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
if chain.IsDefault() {
continue
}
explicit := cfgSnap.ConnectProxy.UpstreamConfig[id].HasLocalPortOrSocket()
if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[id]; !implicit && !explicit {
explicit := cfgSnap.ConnectProxy.UpstreamConfig[uid].HasLocalPortOrSocket()
if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit {
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
continue
}
virtualHost, err := makeUpstreamRouteForDiscoveryChain(id, chain, []string{"*"})
virtualHost, err := makeUpstreamRouteForDiscoveryChain(uid.EnvoyID(), chain, []string{"*"})
if err != nil {
return nil, err
}
route := &envoy_route_v3.RouteConfiguration{
Name: id,
Name: uid.EnvoyID(),
VirtualHosts: []*envoy_route_v3.VirtualHost{virtualHost},
// ValidateClusters defaults to true when defined statically and false
// when done via RDS. Re-set the reasonable value of true to prevent
@ -173,7 +173,7 @@ func makeNamedDefaultRouteWithLB(clusterName string, lb *structs.LoadBalancer, a
func (s *ResourceGenerator) routesForIngressGateway(
listeners map[proxycfg.IngressListenerKey]structs.IngressListener,
upstreams map[proxycfg.IngressListenerKey]structs.Upstreams,
chains map[string]*structs.CompiledDiscoveryChain,
chains map[proxycfg.UpstreamID]*structs.CompiledDiscoveryChain,
) ([]proto.Message, error) {
var result []proto.Message
@ -195,14 +195,14 @@ func (s *ResourceGenerator) routesForIngressGateway(
}
for _, u := range upstreams {
upstreamID := u.Identifier()
chain := chains[upstreamID]
uid := proxycfg.NewUpstreamID(&u)
chain := chains[uid]
if chain == nil {
continue
}
domains := generateUpstreamIngressDomains(listenerKey, u)
virtualHost, err := makeUpstreamRouteForDiscoveryChain(upstreamID, chain, domains)
virtualHost, err := makeUpstreamRouteForDiscoveryChain(uid.EnvoyID(), chain, domains)
if err != nil {
return nil, err
}

View File

@ -204,11 +204,11 @@ func TestRoutesFromSnapshot(t *testing.T) {
bazChain := discoverychain.TestCompileConfigEntries(t, "baz", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...)
quxChain := discoverychain.TestCompileConfigEntries(t, "qux", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...)
snap.IngressGateway.DiscoveryChain = map[string]*structs.CompiledDiscoveryChain{
"foo": fooChain,
"bar": barChain,
"baz": bazChain,
"qux": quxChain,
snap.IngressGateway.DiscoveryChain = map[proxycfg.UpstreamID]*structs.CompiledDiscoveryChain{
UID("foo"): fooChain,
UID("bar"): barChain,
UID("baz"): bazChain,
UID("qux"): quxChain,
}
},
},
@ -667,6 +667,9 @@ func setupIngressWithTwoHTTPServices(t *testing.T, o ingressSDSOpts) func(snap *
},
}
webUID := proxycfg.NewUpstreamID(&webUpstream)
fooUID := proxycfg.NewUpstreamID(&fooUpstream)
// Setup additional HTTP service on same listener with default router
snap.IngressGateway.Upstreams = map[proxycfg.IngressListenerKey]structs.Upstreams{
{Protocol: "http", Port: 9191}: {webUpstream, fooUpstream},
@ -775,7 +778,7 @@ func setupIngressWithTwoHTTPServices(t *testing.T, o ingressSDSOpts) func(snap *
o.entMetas["web"].PartitionOrDefault(), "dc1",
connect.TestClusterID+".consul", nil, entries...)
snap.IngressGateway.DiscoveryChain[webUpstream.Identifier()] = webChain
snap.IngressGateway.DiscoveryChain[fooUpstream.Identifier()] = fooChain
snap.IngressGateway.DiscoveryChain[webUID] = webChain
snap.IngressGateway.DiscoveryChain[fooUID] = fooChain
}
}

View File

@ -27,11 +27,11 @@
},
{
"@type": "type.googleapis.com/envoy.config.listener.v3.Listener",
"name": "prepared_query:geo-cache:custom-upstream",
"name": "prepared_query:geo-cache:127.10.10.10:8181",
"address": {
"socketAddress": {
"address": "11.11.11.11",
"portValue": 11111
"address": "127.10.10.10",
"portValue": 8181
}
},
"filterChains": [
@ -41,13 +41,14 @@
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "foo-stats",
"cluster": "random-cluster"
"statPrefix": "upstream.prepared_query_geo-cache",
"cluster": "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul"
}
}
]
}
]
],
"trafficDirection": "OUTBOUND"
},
{
"@type": "type.googleapis.com/envoy.config.listener.v3.Listener",

View File

@ -43,8 +43,8 @@ func newTestSnapshot(
additionalEntries ...structs.ConfigEntry,
) *proxycfg.ConfigSnapshot {
snap := proxycfg.TestConfigSnapshotDiscoveryChainDefaultWithEntries(t, additionalEntries...)
snap.ConnectProxy.PreparedQueryEndpoints = map[string]structs.CheckServiceNodes{
"prepared_query:geo-cache": proxycfg.TestPreparedQueryNodes(t, "geo-cache"),
snap.ConnectProxy.PreparedQueryEndpoints = map[proxycfg.UpstreamID]structs.CheckServiceNodes{
UID("prepared_query:geo-cache"): proxycfg.TestPreparedQueryNodes(t, "geo-cache"),
}
if prevSnap != nil {
snap.Roots = prevSnap.Roots
@ -53,7 +53,7 @@ func newTestSnapshot(
if dbServiceProtocol != "" {
// Simulate ServiceManager injection of protocol
snap.Proxy.Upstreams[0].Config["protocol"] = dbServiceProtocol
snap.ConnectProxy.ConfigSnapshotUpstreams.UpstreamConfig = snap.Proxy.Upstreams.ToMap()
snap.ConnectProxy.ConfigSnapshotUpstreams.UpstreamConfig = proxycfg.UpstreamsToMap(snap.Proxy.Upstreams)
}
return snap
}