From cbea3d203c44db86a37920c43015e10e66ecfd97 Mon Sep 17 00:00:00 2001 From: freddygv Date: Thu, 27 Jan 2022 23:49:06 -0700 Subject: [PATCH] Fix race of upstreams with same passthrough ip Due to timing, a transparent proxy could have two upstreams to dial directly with the same address. For example: - The orders service can dial upstreams shipping and payment directly. - An instance of shipping at address 10.0.0.1 is deregistered. - Payments is scaled up and scheduled to have address 10.0.0.1. - The orders service receives the event for the new payments instance before seeing the deregistration for the shipping instance. At this point two upstreams have the same passthrough address and Envoy will reject the listener configuration. To disambiguate this commit considers the Raft index when storing passthrough addresses. In the example above, 10.0.0.1 would only be associated with the newer payments service instance. --- agent/consul/gateway_locator.go | 2 +- agent/proxycfg/connect_proxy.go | 8 +- agent/proxycfg/manager_test.go | 2 + agent/proxycfg/snapshot.go | 17 +++- agent/proxycfg/state.go | 2 +- agent/proxycfg/state_test.go | 143 +++++++++++++++++++++++++++++++- agent/proxycfg/upstreams.go | 40 +++++++-- agent/structs/structs.go | 6 +- agent/structs/structs_test.go | 58 ++++++++++++- agent/xds/clusters.go | 2 +- agent/xds/endpoints.go | 4 +- agent/xds/listeners.go | 12 +-- 12 files changed, 270 insertions(+), 26 deletions(-) diff --git a/agent/consul/gateway_locator.go b/agent/consul/gateway_locator.go index cac63692f7..ce6e390337 100644 --- a/agent/consul/gateway_locator.go +++ b/agent/consul/gateway_locator.go @@ -455,7 +455,7 @@ func retainGateways(full structs.CheckServiceNodes) structs.CheckServiceNodes { func renderGatewayAddrs(gateways structs.CheckServiceNodes, wan bool) []string { out := make([]string, 0, len(gateways)) for _, csn := range gateways { - addr, port := csn.BestAddress(wan) + _, addr, port := csn.BestAddress(wan) completeAddr := 
ipaddr.FormatAddressPort(addr, port) out = append(out, completeAddr) } diff --git a/agent/proxycfg/connect_proxy.go b/agent/proxycfg/connect_proxy.go index 12b7028d63..64ce9020c5 100644 --- a/agent/proxycfg/connect_proxy.go +++ b/agent/proxycfg/connect_proxy.go @@ -28,6 +28,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e snap.ConnectProxy.PreparedQueryEndpoints = make(map[UpstreamID]structs.CheckServiceNodes) snap.ConnectProxy.UpstreamConfig = make(map[UpstreamID]*structs.Upstream) snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]map[string]map[string]struct{}) + snap.ConnectProxy.PassthroughIndices = make(map[string]indexedTarget) // Watch for root changes err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{ @@ -326,11 +327,16 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv delete(snap.ConnectProxy.WatchedDiscoveryChains, uid) } } - for uid, _ := range snap.ConnectProxy.PassthroughUpstreams { + for uid := range snap.ConnectProxy.PassthroughUpstreams { if _, ok := seenUpstreams[uid]; !ok { delete(snap.ConnectProxy.PassthroughUpstreams, uid) } } + for addr, indexed := range snap.ConnectProxy.PassthroughIndices { + if _, ok := seenUpstreams[indexed.upstreamID]; !ok { + delete(snap.ConnectProxy.PassthroughIndices, addr) + } + } // These entries are intentionally handled separately from the WatchedDiscoveryChains above. // There have been situations where a discovery watch was cancelled, then fired. 
diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index c514794967..5ac37b793c 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -235,6 +235,7 @@ func TestManager_BasicLifecycle(t *testing.T) { NewUpstreamID(&upstreams[2]): &upstreams[2], }, PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{}, + PassthroughIndices: map[string]indexedTarget{}, }, PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{}, WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{}, @@ -293,6 +294,7 @@ func TestManager_BasicLifecycle(t *testing.T) { NewUpstreamID(&upstreams[2]): &upstreams[2], }, PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{}, + PassthroughIndices: map[string]indexedTarget{}, }, PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{}, WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{}, diff --git a/agent/proxycfg/snapshot.go b/agent/proxycfg/snapshot.go index 1b4e0f6bcb..98aafa2629 100644 --- a/agent/proxycfg/snapshot.go +++ b/agent/proxycfg/snapshot.go @@ -52,15 +52,30 @@ type ConfigSnapshotUpstreams struct { UpstreamConfig map[UpstreamID]*structs.Upstream // PassthroughEndpoints is a map of: UpstreamID -> (map of TargetID -> - // (set of IP addresses)). + // (set of IP addresses)). It contains the upstream endpoints that + // can be dialed directly by a transparent proxy. PassthroughUpstreams map[UpstreamID]map[string]map[string]struct{} + // PassthroughIndices is a map of: address -> indexedTarget. + // It is used to track the modify index associated with a passthrough address. + // Tracking this index helps break ties when a single address is shared by + // more than one upstream due to a race. + PassthroughIndices map[string]indexedTarget + // IntentionUpstreams is a set of upstreams inferred from intentions. // // This list only applies to proxies registered in 'transparent' mode. 
IntentionUpstreams map[UpstreamID]struct{} } +// indexedTarget is used to associate the Raft modify index of a resource +// with the corresponding upstream target. +type indexedTarget struct { + upstreamID UpstreamID + targetID string + idx uint64 +} + type GatewayKey struct { Datacenter string Partition string diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index f31c62b4e1..0c55f034a1 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -412,7 +412,7 @@ func hostnameEndpoints(logger hclog.Logger, localKey GatewayKey, nodes structs.C ) for _, n := range nodes { - addr, _ := n.BestAddress(!localKey.Matches(n.Node.Datacenter, n.Node.PartitionOrDefault())) + _, addr, _ := n.BestAddress(!localKey.Matches(n.Node.Datacenter, n.Node.PartitionOrDefault())) if net.ParseIP(addr) != nil { hasIP = true continue diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index 1e1a6269e0..c80e57f5dc 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -1909,12 +1909,18 @@ func TestState_WatchesAndUpdates(t *testing.T) { DialedDirectly: true, }, }, + RaftIndex: structs.RaftIndex{ + ModifyIndex: 12, + }, }, }, { Node: &structs.Node{ Node: "node2", Address: "10.0.0.2", + RaftIndex: structs.RaftIndex{ + ModifyIndex: 21, + }, }, Service: &structs.NodeService{ Kind: structs.ServiceKindConnectProxy, @@ -1960,12 +1966,18 @@ func TestState_WatchesAndUpdates(t *testing.T) { DialedDirectly: true, }, }, + RaftIndex: structs.RaftIndex{ + ModifyIndex: 12, + }, }, }, { Node: &structs.Node{ Node: "node2", Address: "10.0.0.2", + RaftIndex: structs.RaftIndex{ + ModifyIndex: 21, + }, }, Service: &structs.NodeService{ Kind: structs.ServiceKindConnectProxy, @@ -1992,6 +2004,18 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }, }) + require.Equal(t, snap.ConnectProxy.PassthroughIndices, map[string]indexedTarget{ + "10.0.0.2": { + upstreamID: dbUID, + targetID: "db.default.default.dc1", + idx: 21, + }, + "10.10.10.10": { 
+ upstreamID: dbUID, + targetID: "db.default.default.dc1", + idx: 12, + }, + }) }, }, // Discovery chain updates should be stored @@ -2043,6 +2067,9 @@ func TestState_WatchesAndUpdates(t *testing.T) { Node: &structs.Node{ Node: "node2", Address: "10.0.0.2", + RaftIndex: structs.RaftIndex{ + ModifyIndex: 21, + }, }, Service: &structs.NodeService{ Kind: structs.ServiceKindConnectProxy, @@ -2074,6 +2101,9 @@ func TestState_WatchesAndUpdates(t *testing.T) { Node: &structs.Node{ Node: "node2", Address: "10.0.0.2", + RaftIndex: structs.RaftIndex{ + ModifyIndex: 21, + }, }, Service: &structs.NodeService{ Kind: structs.ServiceKindConnectProxy, @@ -2096,10 +2126,120 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }, }) + require.Equal(t, snap.ConnectProxy.PassthroughIndices, map[string]indexedTarget{ + "10.0.0.2": { + upstreamID: dbUID, + targetID: "db.default.default.dc1", + idx: 21, + }, + }) }, }, - // Empty list of upstreams should clean everything up { + // Receive a new upstream target event with a conflicting passthrough address + events: []cache.UpdateEvent{ + { + CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(), + Result: &structs.IndexedCheckServiceNodes{ + Nodes: structs.CheckServiceNodes{ + { + Node: &structs.Node{ + Node: "node2", + }, + Service: &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "api-sidecar-proxy", + Service: "api-sidecar-proxy", + Address: "10.0.0.2", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "api", + TransparentProxy: structs.TransparentProxyConfig{ + DialedDirectly: true, + }, + }, + RaftIndex: structs.RaftIndex{ + ModifyIndex: 32, + }, + }, + }, + }, + }, + Err: nil, + }, + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 2) + require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, apiUID) + require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID], 1) + require.Contains(t, 
snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID], "api.default.default.dc1") + + // The endpoint and passthrough address for proxy1 should be gone. + require.Equal(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID]["api.default.default.dc1"], + structs.CheckServiceNodes{ + { + Node: &structs.Node{ + Node: "node2", + }, + Service: &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "api-sidecar-proxy", + Service: "api-sidecar-proxy", + Address: "10.0.0.2", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "api", + TransparentProxy: structs.TransparentProxyConfig{ + DialedDirectly: true, + }, + }, + RaftIndex: structs.RaftIndex{ + ModifyIndex: 32, + }, + }, + }, + }, + ) + require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]map[string]map[string]struct{}{ + apiUID: { + // This target has a higher index so the old passthrough address should be discarded. + "api.default.default.dc1": map[string]struct{}{ + "10.0.0.2": {}, + }, + }, + }) + require.Equal(t, snap.ConnectProxy.PassthroughIndices, map[string]indexedTarget{ + "10.0.0.2": { + upstreamID: apiUID, + targetID: "api.default.default.dc1", + idx: 32, + }, + }) + }, + }, + { + // Event with no nodes should clean up addrs + events: []cache.UpdateEvent{ + { + CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(), + Result: &structs.IndexedCheckServiceNodes{ + Nodes: structs.CheckServiceNodes{}, + }, + Err: nil, + }, + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 2) + require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, apiUID) + require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID], 1) + require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID], "api.default.default.dc1") + + // The endpoint and passthrough address for proxy1 should be gone. 
+ require.Empty(t, snap.ConnectProxy.WatchedUpstreamEndpoints[apiUID]["api.default.default.dc1"]) + require.Empty(t, snap.ConnectProxy.PassthroughUpstreams[apiUID]["api.default.default.dc1"]) + require.Empty(t, snap.ConnectProxy.PassthroughIndices) + }, + }, + { + // Empty list of upstreams should clean up map keys requiredWatches: map[string]verifyWatchRequest{ rootsWatchID: genVerifyRootsWatch("dc1"), intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID, @@ -2128,6 +2268,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.Empty(t, snap.ConnectProxy.DiscoveryChain) require.Empty(t, snap.ConnectProxy.IntentionUpstreams) require.Empty(t, snap.ConnectProxy.PassthroughUpstreams) + require.Empty(t, snap.ConnectProxy.PassthroughIndices) }, }, }, diff --git a/agent/proxycfg/upstreams.go b/agent/proxycfg/upstreams.go index 33b8015162..4cd406a42d 100644 --- a/agent/proxycfg/upstreams.go +++ b/agent/proxycfg/upstreams.go @@ -94,10 +94,17 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up return nil } - if _, ok := upstreamsSnapshot.PassthroughUpstreams[uid]; !ok { - upstreamsSnapshot.PassthroughUpstreams[uid] = make(map[string]map[string]struct{}) + // Clear out this target's existing passthrough upstreams and indices so that they can be repopulated below. 
+ if _, ok := upstreamsSnapshot.PassthroughUpstreams[uid]; ok { + for addr := range upstreamsSnapshot.PassthroughUpstreams[uid][targetID] { + if indexed := upstreamsSnapshot.PassthroughIndices[addr]; indexed.targetID == targetID && indexed.upstreamID == uid { + delete(upstreamsSnapshot.PassthroughIndices, addr) + } + } + upstreamsSnapshot.PassthroughUpstreams[uid][targetID] = make(map[string]struct{}) } - upstreamsSnapshot.PassthroughUpstreams[uid][targetID] = make(map[string]struct{}) + + passthroughs := make(map[string]struct{}) for _, node := range resp.Nodes { if !node.Service.Proxy.TransparentProxy.DialedDirectly { @@ -107,8 +114,31 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up // Make sure to use an external address when crossing partitions. // Datacenter is not considered because transparent proxies cannot dial other datacenters. isRemote := !structs.EqualPartitions(node.Node.PartitionOrDefault(), s.proxyID.PartitionOrDefault()) - addr, _ := node.BestAddress(isRemote) - upstreamsSnapshot.PassthroughUpstreams[uid][targetID][addr] = struct{}{} + csnIdx, addr, _ := node.BestAddress(isRemote) + + existing := upstreamsSnapshot.PassthroughIndices[addr] + if existing.idx > csnIdx { + // The last known instance with this address had a higher index so it takes precedence. + continue + } + + // The current instance has a higher Raft index so we ensure the passthrough address is only + // associated with this upstream target. Older associations are cleaned up as needed. 
+ delete(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID][existing.targetID], addr) + if len(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID][existing.targetID]) == 0 { + delete(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID], existing.targetID) + } + if len(upstreamsSnapshot.PassthroughUpstreams[existing.upstreamID]) == 0 { + delete(upstreamsSnapshot.PassthroughUpstreams, existing.upstreamID) + } + + upstreamsSnapshot.PassthroughIndices[addr] = indexedTarget{idx: csnIdx, upstreamID: uid, targetID: targetID} + passthroughs[addr] = struct{}{} + } + if len(passthroughs) > 0 { + upstreamsSnapshot.PassthroughUpstreams[uid] = map[string]map[string]struct{}{ + targetID: passthroughs, + } } case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"): diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 168c70efe1..e2e94b4387 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -1741,7 +1741,7 @@ type CheckServiceNode struct { Checks HealthChecks } -func (csn *CheckServiceNode) BestAddress(wan bool) (string, int) { +func (csn *CheckServiceNode) BestAddress(wan bool) (uint64, string, int) { // TODO (mesh-gateway) needs a test // best address // wan @@ -1754,12 +1754,14 @@ func (csn *CheckServiceNode) BestAddress(wan bool) (string, int) { // node addr addr, port := csn.Service.BestAddress(wan) + idx := csn.Service.ModifyIndex if addr == "" { addr = csn.Node.BestAddress(wan) + idx = csn.Node.ModifyIndex } - return addr, port + return idx, addr, port } func (csn *CheckServiceNode) CanRead(authz acl.Authorizer) acl.EnforcementDecision { diff --git a/agent/structs/structs_test.go b/agent/structs/structs_test.go index b4ea8e50c5..f87d568d43 100644 --- a/agent/structs/structs_test.go +++ b/agent/structs/structs_test.go @@ -2105,14 +2105,18 @@ func TestCheckServiceNode_BestAddress(t *testing.T) { input CheckServiceNode lanAddr string lanPort int + lanIdx uint64 wanAddr string wanPort int + wanIdx uint64 } nodeAddr 
:= "10.1.2.3" nodeWANAddr := "198.18.19.20" + nodeIdx := uint64(11) serviceAddr := "10.2.3.4" servicePort := 1234 + serviceIdx := uint64(22) serviceWANAddr := "198.19.20.21" serviceWANPort := 987 @@ -2121,15 +2125,23 @@ func TestCheckServiceNode_BestAddress(t *testing.T) { input: CheckServiceNode{ Node: &Node{ Address: nodeAddr, + RaftIndex: RaftIndex{ + ModifyIndex: nodeIdx, + }, }, Service: &NodeService{ Port: servicePort, + RaftIndex: RaftIndex{ + ModifyIndex: serviceIdx, + }, }, }, lanAddr: nodeAddr, + lanIdx: nodeIdx, lanPort: servicePort, wanAddr: nodeAddr, + wanIdx: nodeIdx, wanPort: servicePort, }, "node-wan-address": { @@ -2139,15 +2151,23 @@ func TestCheckServiceNode_BestAddress(t *testing.T) { TaggedAddresses: map[string]string{ "wan": nodeWANAddr, }, + RaftIndex: RaftIndex{ + ModifyIndex: nodeIdx, + }, }, Service: &NodeService{ Port: servicePort, + RaftIndex: RaftIndex{ + ModifyIndex: serviceIdx, + }, }, }, lanAddr: nodeAddr, + lanIdx: nodeIdx, lanPort: servicePort, wanAddr: nodeWANAddr, + wanIdx: nodeIdx, wanPort: servicePort, }, "service-address": { @@ -2158,16 +2178,24 @@ func TestCheckServiceNode_BestAddress(t *testing.T) { TaggedAddresses: map[string]string{ "wan": nodeWANAddr, }, + RaftIndex: RaftIndex{ + ModifyIndex: nodeIdx, + }, }, Service: &NodeService{ Address: serviceAddr, Port: servicePort, + RaftIndex: RaftIndex{ + ModifyIndex: serviceIdx, + }, }, }, lanAddr: serviceAddr, + lanIdx: serviceIdx, lanPort: servicePort, wanAddr: serviceAddr, + wanIdx: serviceIdx, wanPort: servicePort, }, "service-wan-address": { @@ -2178,6 +2206,9 @@ func TestCheckServiceNode_BestAddress(t *testing.T) { TaggedAddresses: map[string]string{ "wan": nodeWANAddr, }, + RaftIndex: RaftIndex{ + ModifyIndex: nodeIdx, + }, }, Service: &NodeService{ Address: serviceAddr, @@ -2188,12 +2219,17 @@ func TestCheckServiceNode_BestAddress(t *testing.T) { Port: serviceWANPort, }, }, + RaftIndex: RaftIndex{ + ModifyIndex: serviceIdx, + }, }, }, lanAddr: serviceAddr, + lanIdx: 
serviceIdx, lanPort: servicePort, wanAddr: serviceWANAddr, + wanIdx: serviceIdx, wanPort: serviceWANPort, }, "service-wan-address-default-port": { @@ -2204,6 +2240,9 @@ func TestCheckServiceNode_BestAddress(t *testing.T) { TaggedAddresses: map[string]string{ "wan": nodeWANAddr, }, + RaftIndex: RaftIndex{ + ModifyIndex: nodeIdx, + }, }, Service: &NodeService{ Address: serviceAddr, @@ -2214,12 +2253,17 @@ func TestCheckServiceNode_BestAddress(t *testing.T) { Port: 0, }, }, + RaftIndex: RaftIndex{ + ModifyIndex: serviceIdx, + }, }, }, lanAddr: serviceAddr, + lanIdx: serviceIdx, lanPort: servicePort, wanAddr: serviceWANAddr, + wanIdx: serviceIdx, wanPort: servicePort, }, "service-wan-address-node-lan": { @@ -2230,6 +2274,9 @@ func TestCheckServiceNode_BestAddress(t *testing.T) { TaggedAddresses: map[string]string{ "wan": nodeWANAddr, }, + RaftIndex: RaftIndex{ + ModifyIndex: nodeIdx, + }, }, Service: &NodeService{ Port: servicePort, @@ -2239,12 +2286,17 @@ func TestCheckServiceNode_BestAddress(t *testing.T) { Port: serviceWANPort, }, }, + RaftIndex: RaftIndex{ + ModifyIndex: serviceIdx, + }, }, }, lanAddr: nodeAddr, + lanIdx: nodeIdx, lanPort: servicePort, wanAddr: serviceWANAddr, + wanIdx: serviceIdx, wanPort: serviceWANPort, }, } @@ -2254,13 +2306,15 @@ func TestCheckServiceNode_BestAddress(t *testing.T) { tc := tc t.Run(name, func(t *testing.T) { - addr, port := tc.input.BestAddress(false) + idx, addr, port := tc.input.BestAddress(false) require.Equal(t, tc.lanAddr, addr) require.Equal(t, tc.lanPort, port) + require.Equal(t, tc.lanIdx, idx) - addr, port = tc.input.BestAddress(true) + idx, addr, port = tc.input.BestAddress(true) require.Equal(t, tc.wanAddr, addr) require.Equal(t, tc.wanPort, port) + require.Equal(t, tc.wanIdx, idx) }) } } diff --git a/agent/xds/clusters.go b/agent/xds/clusters.go index ab27bc9b5f..283df4125b 100644 --- a/agent/xds/clusters.go +++ b/agent/xds/clusters.go @@ -907,7 +907,7 @@ func (s *ResourceGenerator) makeGatewayCluster(snap 
*proxycfg.ConfigSnapshot, op fallback *envoy_endpoint_v3.LbEndpoint ) for i, e := range opts.hostnameEndpoints { - addr, port := e.BestAddress(opts.isRemote) + _, addr, port := e.BestAddress(opts.isRemote) uniqueHostnames[addr] = true health, weight := calculateEndpointHealthAndWeight(e, opts.onlyPassing) diff --git a/agent/xds/endpoints.go b/agent/xds/endpoints.go index 4742ea5264..9981dc9401 100644 --- a/agent/xds/endpoints.go +++ b/agent/xds/endpoints.go @@ -221,7 +221,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C for _, srv := range cfgSnap.MeshGateway.ConsulServers { clusterName := cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node) - addr, port := srv.BestAddress(false /*wan*/) + _, addr, port := srv.BestAddress(false /*wan*/) lbEndpoint := &envoy_endpoint_v3.LbEndpoint{ HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{ @@ -512,7 +512,7 @@ func makeLoadAssignment(clusterName string, endpointGroups []loadAssignmentEndpo for _, ep := range endpoints { // TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc? - addr, port := ep.BestAddress(!localKey.Matches(ep.Node.Datacenter, ep.Node.PartitionOrDefault())) + _, addr, port := ep.BestAddress(!localKey.Matches(ep.Node.Datacenter, ep.Node.PartitionOrDefault())) healthStatus, weight := calculateEndpointHealthAndWeight(ep, endpointGroup.OnlyPassing) if endpointGroup.OverrideHealth != envoy_core_v3.HealthStatus_UNKNOWN { diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index e7e87724b7..bf087780ad 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -218,20 +218,14 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. // as opposed to via a virtual IP. 
var passthroughChains []*envoy_listener_v3.FilterChain - for _, target := range cfgSnap.ConnectProxy.PassthroughUpstreams { - for tid, addrs := range target { + for _, targets := range cfgSnap.ConnectProxy.PassthroughUpstreams { + for tid, addrs := range targets { uid := proxycfg.NewUpstreamIDFromTargetID(tid) sni := connect.ServiceSNI( uid.Name, "", uid.NamespaceOrDefault(), uid.PartitionOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain) - u := structs.Upstream{ - DestinationName: uid.Name, - DestinationNamespace: uid.NamespaceOrDefault(), - DestinationPartition: uid.PartitionOrDefault(), - } - - filterName := fmt.Sprintf("%s.%s.%s.%s", u.DestinationName, u.DestinationNamespace, u.DestinationPartition, cfgSnap.Datacenter) + filterName := fmt.Sprintf("%s.%s.%s.%s", uid.Name, uid.NamespaceOrDefault(), uid.PartitionOrDefault(), cfgSnap.Datacenter) filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{ clusterName: "passthrough~" + sni,