From 439a7fce2d872630cc23341e7b2ae368ac6a4ca4 Mon Sep 17 00:00:00 2001 From: Freddy Date: Thu, 15 Apr 2021 13:54:40 -0600 Subject: [PATCH] Split Upstream.Identifier() so non-empty namespace is always prepended in ent (#10031) --- agent/proxycfg/manager_test.go | 14 +-- agent/proxycfg/state_test.go | 107 ++++++++++++---------- agent/structs/connect_proxy_config.go | 22 ----- agent/structs/connect_proxy_config_oss.go | 22 +++++ agent/xds/clusters.go | 4 +- 5 files changed, 91 insertions(+), 78 deletions(-) diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index 2c9fa40714..c4496b6d3f 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -190,6 +190,8 @@ func TestManager_BasicLifecycle(t *testing.T) { EnterpriseMeta: *structs.DefaultEnterpriseMeta(), }) + db := structs.NewServiceName("db", nil) + // Create test cases using some of the common data above. tests := []*testcase_BasicLifecycle{ { @@ -217,18 +219,18 @@ func TestManager_BasicLifecycle(t *testing.T) { ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{ Leaf: leaf, DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{ - "db": dbDefaultChain(), + db.String(): dbDefaultChain(), }, WatchedDiscoveryChains: map[string]context.CancelFunc{}, WatchedUpstreams: nil, // Clone() clears this out WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{ - "db": { + db.String(): { "db.default.dc1": TestUpstreamNodes(t), }, }, WatchedGateways: nil, // Clone() clears this out WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{ - "db": {}, + db.String(): {}, }, UpstreamConfig: map[string]*structs.Upstream{ upstreams[0].Identifier(): &upstreams[0], @@ -271,19 +273,19 @@ func TestManager_BasicLifecycle(t *testing.T) { ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{ Leaf: leaf, DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{ - "db": dbSplitChain(), + db.String(): dbSplitChain(), }, WatchedDiscoveryChains: map[string]context.CancelFunc{}, WatchedUpstreams: nil, // Clone() clears this out WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{ - "db": { + db.String(): { "v1.db.default.dc1": TestUpstreamNodes(t), "v2.db.default.dc1": TestUpstreamNodesAlternate(t), }, }, WatchedGateways: nil, // Clone() clears this out WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{ - "db": {}, + db.String(): {}, }, UpstreamConfig: map[string]*structs.Upstream{ upstreams[0].Identifier(): &upstreams[0], diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index 7ffc6f23db..85728ab706 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -381,6 +381,13 @@ func TestState_WatchesAndUpdates(t *testing.T) { indexedRoots, issuedCert := TestCerts(t) + // Used to account for differences in OSS/ent implementations of ServiceID.String() + var ( + db = structs.NewServiceName("db", nil) + billing = structs.NewServiceName("billing", nil) + api = structs.NewServiceName("api", nil) + ) + rootWatchEvent := func() cache.UpdateEvent { return cache.UpdateEvent{ CorrelationID: rootsWatchID, @@ -471,7 +478,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { leafWatchID: genVerifyLeafWatch("web", "dc1"), intentionsWatchID: genVerifyIntentionWatch("web", "dc1"), "upstream:prepared_query:query": genVerifyPreparedQueryWatch("query", "dc1"), - "discovery-chain:api": genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + fmt.Sprintf("discovery-chain:%s", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api", EvaluateInDatacenter: "dc1", EvaluateInNamespace: "default", @@ -480,7 +487,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Mode: meshGatewayProxyConfigValue, }, }), - "discovery-chain:api-failover-remote?dc=dc2": genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + fmt.Sprintf("discovery-chain:%s-failover-remote?dc=dc2", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api-failover-remote", EvaluateInDatacenter: "dc2", EvaluateInNamespace: "default", @@ -489,7 +496,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Mode: structs.MeshGatewayModeRemote, }, }), - "discovery-chain:api-failover-local?dc=dc2": genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + fmt.Sprintf("discovery-chain:%s-failover-local?dc=dc2", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api-failover-local", EvaluateInDatacenter: "dc2", EvaluateInNamespace: "default", @@ -498,7 +505,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Mode: structs.MeshGatewayModeLocal, }, }), - "discovery-chain:api-failover-direct?dc=dc2": genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + fmt.Sprintf("discovery-chain:%s-failover-direct?dc=dc2", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api-failover-direct", EvaluateInDatacenter: "dc2", EvaluateInNamespace: "default", @@ -507,7 +514,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Mode: structs.MeshGatewayModeNone, }, }), - "discovery-chain:api-dc2": genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + fmt.Sprintf("discovery-chain:%s-dc2", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api-dc2", EvaluateInDatacenter: "dc1", EvaluateInNamespace: "default", @@ -649,15 +656,6 @@ func TestState_WatchesAndUpdates(t *testing.T) { } } - // Used in terminating-gateway cases to account for differences in OSS/ent implementations of ServiceID.String() - db := structs.NewServiceName("db", nil) - dbStr := db.String() - - billing := structs.NewServiceName("billing", nil) - - api := structs.NewServiceName("api", nil) - apiStr := api.String() - dbIxnMatch := &structs.IndexedIntentionMatches{ Matches: []structs.Intentions{ []*structs.Intention{ @@ -973,7 +971,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }) require.Len(t, snap.IngressGateway.WatchedDiscoveryChains, 1) - require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, "api") + require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, api.String()) }, }, { @@ -994,7 +992,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, { requiredWatches: map[string]verifyWatchRequest{ - "discovery-chain:api": genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + "discovery-chain:" + api.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api", EvaluateInDatacenter: "dc1", EvaluateInNamespace: "default", @@ -1003,7 +1001,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, events: []cache.UpdateEvent{ { - CorrelationID: "discovery-chain:api", + CorrelationID: "discovery-chain:" + api.String(), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "dc1", "trustdomain.consul", "dc1", nil), }, @@ -1012,16 +1010,16 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.Len(t, snap.IngressGateway.WatchedUpstreams, 1) - require.Len(t, snap.IngressGateway.WatchedUpstreams["api"], 1) + require.Len(t, snap.IngressGateway.WatchedUpstreams[api.String()], 1) }, }, { requiredWatches: map[string]verifyWatchRequest{ - "upstream-target:api.default.dc1:api": genVerifyServiceWatch("api", "", "dc1", true), + "upstream-target:api.default.dc1:" + api.String(): genVerifyServiceWatch("api", "", "dc1", true), }, events: []cache.UpdateEvent{ { - CorrelationID: "upstream-target:api.default.dc1:api", + CorrelationID: "upstream-target:api.default.dc1:" + api.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: structs.CheckServiceNodes{ { @@ -1041,10 +1039,10 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.Len(t, snap.IngressGateway.WatchedUpstreamEndpoints, 1) - require.Contains(t, snap.IngressGateway.WatchedUpstreamEndpoints, "api") - require.Len(t, snap.IngressGateway.WatchedUpstreamEndpoints["api"], 1) - require.Contains(t, snap.IngressGateway.WatchedUpstreamEndpoints["api"], "api.default.dc1") - require.Equal(t, snap.IngressGateway.WatchedUpstreamEndpoints["api"]["api.default.dc1"], + require.Contains(t, snap.IngressGateway.WatchedUpstreamEndpoints, api.String()) + require.Len(t, snap.IngressGateway.WatchedUpstreamEndpoints[api.String()], 1) + require.Contains(t, snap.IngressGateway.WatchedUpstreamEndpoints[api.String()], "api.default.dc1") + require.Equal(t, snap.IngressGateway.WatchedUpstreamEndpoints[api.String()]["api.default.dc1"], structs.CheckServiceNodes{ { Node: &structs.Node{ @@ -1108,7 +1106,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.Len(t, snap.IngressGateway.Hosts, 1) require.Len(t, snap.IngressGateway.Upstreams, 1) require.Len(t, snap.IngressGateway.WatchedDiscoveryChains, 1) - require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, "api") + require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, api.String()) }, }, { @@ -1272,11 +1270,11 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, { requiredWatches: map[string]verifyWatchRequest{ - "external-service:" + dbStr: genVerifyServiceWatch("db", "", "dc1", false), + "external-service:" + db.String(): genVerifyServiceWatch("db", "", "dc1", false), }, events: []cache.UpdateEvent{ { - CorrelationID: "external-service:" + dbStr, + CorrelationID: "external-service:" + db.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: structs.CheckServiceNodes{ { @@ -1317,11 +1315,11 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, { requiredWatches: map[string]verifyWatchRequest{ - "external-service:" + apiStr: genVerifyServiceWatch("api", "", "dc1", false), + "external-service:" + api.String(): genVerifyServiceWatch("api", "", "dc1", false), }, events: []cache.UpdateEvent{ { - CorrelationID: "external-service:" + apiStr, + CorrelationID: "external-service:" + api.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: structs.CheckServiceNodes{ { @@ -1410,11 +1408,11 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, { requiredWatches: map[string]verifyWatchRequest{ - "service-leaf:" + dbStr: genVerifyLeafWatch("db", "dc1"), + "service-leaf:" + db.String(): genVerifyLeafWatch("db", "dc1"), }, events: []cache.UpdateEvent{ { - CorrelationID: "service-leaf:" + dbStr, + CorrelationID: "service-leaf:" + db.String(), Result: issuedCert, Err: nil, }, @@ -1428,11 +1426,11 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, { requiredWatches: map[string]verifyWatchRequest{ - serviceIntentionsIDPrefix + dbStr: genVerifyIntentionWatch("db", "dc1"), + serviceIntentionsIDPrefix + db.String(): genVerifyIntentionWatch("db", "dc1"), }, events: []cache.UpdateEvent{ { - CorrelationID: serviceIntentionsIDPrefix + dbStr, + CorrelationID: serviceIntentionsIDPrefix + db.String(), Result: dbIxnMatch, Err: nil, }, @@ -1449,11 +1447,11 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, { requiredWatches: map[string]verifyWatchRequest{ - serviceConfigIDPrefix + dbStr: genVerifyResolvedConfigWatch("db", "dc1"), + serviceConfigIDPrefix + db.String(): genVerifyResolvedConfigWatch("db", "dc1"), }, events: []cache.UpdateEvent{ { - CorrelationID: serviceConfigIDPrefix + dbStr, + CorrelationID: serviceConfigIDPrefix + db.String(), Result: dbConfig, Err: nil, }, @@ -1468,11 +1466,11 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, { requiredWatches: map[string]verifyWatchRequest{ - "service-resolver:" + dbStr: genVerifyResolverWatch("db", "dc1", structs.ServiceResolver), + "service-resolver:" + db.String(): genVerifyResolverWatch("db", "dc1", structs.ServiceResolver), }, events: []cache.UpdateEvent{ { - CorrelationID: "service-resolver:" + dbStr, + CorrelationID: "service-resolver:" + db.String(), Result: dbResolver, Err: nil, }, @@ -1542,6 +1540,11 @@ func TestState_WatchesAndUpdates(t *testing.T) { Proxy: structs.ConnectProxyConfig{ DestinationServiceName: "api", Mode: structs.ProxyModeTransparent, + Upstreams: structs.Upstreams{ + { + DestinationName: "db", + }, + }, }, }, sourceDC: "dc1", @@ -1557,10 +1560,18 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid") - require.True(t, snap.ConnectProxy.IsEmpty()) require.True(t, snap.MeshGateway.IsEmpty()) require.True(t, snap.IngressGateway.IsEmpty()) require.True(t, snap.TerminatingGateway.IsEmpty()) + + require.False(t, snap.ConnectProxy.IsEmpty()) + expectUpstreams := map[string]*structs.Upstream{ + db.String(): { + DestinationName: "db", + DestinationNamespace: structs.IntentionDefaultNamespace, + }, + } + require.Equal(t, expectUpstreams, snap.ConnectProxy.UpstreamConfig) }, }, { @@ -1707,7 +1718,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.True(t, snap.Valid(), "should still be valid") // Should start watch for db's chain - require.Contains(t, snap.ConnectProxy.WatchedDiscoveryChains, dbStr) + require.Contains(t, snap.ConnectProxy.WatchedDiscoveryChains, db.String()) // Should not have results yet require.Empty(t, snap.ConnectProxy.DiscoveryChain) @@ -1723,7 +1734,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { // Discovery chain updates should be stored { requiredWatches: map[string]verifyWatchRequest{ - "discovery-chain:" + dbStr: genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + "discovery-chain:" + db.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "db", EvaluateInDatacenter: "dc1", EvaluateInNamespace: "default", @@ -1734,7 +1745,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, events: []cache.UpdateEvent{ { - CorrelationID: "discovery-chain:" + dbStr, + CorrelationID: "discovery-chain:" + db.String(), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "dc1", "trustdomain.consul", "dc1", nil), }, @@ -1743,16 +1754,16 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1) - require.Len(t, snap.ConnectProxy.WatchedUpstreams[dbStr], 1) + require.Len(t, snap.ConnectProxy.WatchedUpstreams[db.String()], 1) }, }, { requiredWatches: map[string]verifyWatchRequest{ - "upstream-target:db.default.dc1:" + dbStr: genVerifyServiceWatch("db", "", "dc1", true), + "upstream-target:db.default.dc1:" + db.String(): genVerifyServiceWatch("db", "", "dc1", true), }, events: []cache.UpdateEvent{ { - CorrelationID: "upstream-target:db.default.dc1:" + dbStr, + CorrelationID: "upstream-target:db.default.dc1:" + db.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: structs.CheckServiceNodes{ { @@ -1772,10 +1783,10 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1) - require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, dbStr) - require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbStr], 1) - require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbStr], "db.default.dc1") - require.Equal(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbStr]["db.default.dc1"], + require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, db.String()) + require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[db.String()], 1) + require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[db.String()], "db.default.dc1") + require.Equal(t, snap.ConnectProxy.WatchedUpstreamEndpoints[db.String()]["db.default.dc1"], structs.CheckServiceNodes{ { Node: &structs.Node{ diff --git a/agent/structs/connect_proxy_config.go b/agent/structs/connect_proxy_config.go index 7fa431d811..db4026aba4 100644 --- a/agent/structs/connect_proxy_config.go +++ b/agent/structs/connect_proxy_config.go @@ -436,28 +436,6 @@ func (k UpstreamKey) String() string { ) } -// Identifier returns a string representation that uniquely identifies the -// upstream in a canonical but human readable way. -func (u *Upstream) Identifier() string { - name := u.DestinationName - typ := u.DestinationType - - if typ != UpstreamDestTypePreparedQuery && u.DestinationNamespace != "" && u.DestinationNamespace != IntentionDefaultNamespace { - name = u.DestinationNamespace + "/" + u.DestinationName - } - if u.Datacenter != "" { - name += "?dc=" + u.Datacenter - } - - // Service is default type so never prefix it. This is more readable and long - // term it is the only type that matters so we can drop the prefix and have - // nicer naming in metrics etc. - if typ == "" || typ == UpstreamDestTypeService { - return name - } - return typ + ":" + name -} - // String implements Stringer by returning the Identifier. func (u *Upstream) String() string { return u.Identifier() diff --git a/agent/structs/connect_proxy_config_oss.go b/agent/structs/connect_proxy_config_oss.go index 990c235bd7..9ce5e26793 100644 --- a/agent/structs/connect_proxy_config_oss.go +++ b/agent/structs/connect_proxy_config_oss.go @@ -11,3 +11,25 @@ func (us *Upstream) DestinationID() ServiceID { ID: us.DestinationName, } } + +// Identifier returns a string representation that uniquely identifies the +// upstream in a canonical but human readable way. +func (us *Upstream) Identifier() string { + name := us.DestinationName + typ := us.DestinationType + + if typ != UpstreamDestTypePreparedQuery && us.DestinationNamespace != "" && us.DestinationNamespace != IntentionDefaultNamespace { + name = us.DestinationNamespace + "/" + us.DestinationName + } + if us.Datacenter != "" { + name += "?dc=" + us.Datacenter + } + + // Service is default type so never prefix it. This is more readable and long + // term it is the only type that matters so we can drop the prefix and have + // nicer naming in metrics etc. + if typ == "" || typ == UpstreamDestTypeService { + return name + } + return typ + ":" + name +} diff --git a/agent/xds/clusters.go b/agent/xds/clusters.go index ff36277da5..95c936e5e4 100644 --- a/agent/xds/clusters.go +++ b/agent/xds/clusters.go @@ -472,7 +472,7 @@ func (s *Server) makeUpstreamClustersForDiscoveryChain( if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns // default config if there is an error so it's safe to continue. - s.Logger.Warn("failed to parse", "upstream", upstream.Identifier(), + s.Logger.Warn("failed to parse", "upstream", id, "error", err) } @@ -487,7 +487,7 @@ func (s *Server) makeUpstreamClustersForDiscoveryChain( } } else { s.Logger.Warn("ignoring escape hatch setting, because a discovery chain is configured for", - "discovery chain", chain.ServiceName, "upstream", upstream.Identifier(), + "discovery chain", chain.ServiceName, "upstream", id, "envoy_cluster_json", chain.ServiceName) } }