From f72774618d65b353ee5c205139d1f79d6fdc2c41 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Mon, 14 Jun 2021 17:20:27 -0500 Subject: [PATCH] xds: ensure that dependent xDS resources are reconfigured during primary type warming (#10381) Updates to a cluster will clear the associated endpoints, and updates to a listener will clear the associated routes. Update the incremental xDS logic to account for this implicit cleanup so that we can finish warming the clusters and listeners. Fixes #10379 --- .changelog/10381.txt | 3 + agent/xds/delta.go | 149 ++++++++--- agent/xds/delta_test.go | 327 +++++++++++++++++++++++++ agent/xds/listeners.go | 45 ++++ agent/xds/xds_protocol_helpers_test.go | 69 +++++- 5 files changed, 560 insertions(+), 33 deletions(-) create mode 100644 .changelog/10381.txt diff --git a/.changelog/10381.txt b/.changelog/10381.txt new file mode 100644 index 0000000000..0eb45f6047 --- /dev/null +++ b/.changelog/10381.txt @@ -0,0 +1,3 @@ +```release-note:bug +xds: (beta-only) ensure that dependent xDS resources are reconfigured during primary type warming +``` diff --git a/agent/xds/delta.go b/agent/xds/delta.go index f5fac86720..762ea31c48 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -120,6 +120,18 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove EndpointType: newDeltaType(generator, stream, EndpointType, nil), } + // Endpoints are stored within a Cluster (and Routes + // are stored within a Listener) so whenever the + // enclosing resource is updated the inner resource + // list is cleared implicitly. + // + // When this happens we should update our local + // representation of envoy state to force an update. + // + // see: https://github.com/envoyproxy/envoy/issues/13009 + handlers[ListenerType].childType = handlers[RouteType] + handlers[ClusterType].childType = handlers[EndpointType] + var authTimer <-chan time.Time extendAuthTimer := func() { authTimer = time.After(s.AuthCheckFrequency) @@ -177,6 +189,10 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove // index and hash the xDS structures newResourceMap := indexResources(generator.Logger, newRes) + if err := populateChildIndexMap(newResourceMap); err != nil { + return status.Errorf(codes.Unavailable, "failed to index xDS resource versions: %v", err) + } + newVersions, err := computeResourceVersions(newResourceMap) if err != nil { return status.Errorf(codes.Unavailable, "failed to compute xDS resource versions: %v", err) @@ -352,6 +368,11 @@ type xDSDeltaType struct { typeURL string allowEmptyFn func(kind structs.ServiceKind) bool + // childType is a type that in Envoy is actually stored within this type. + // Upserts of THIS type should potentially trigger dependent named + // resources within the child to be re-configured. + childType *xDSDeltaType + // registered indicates if this type has been requested at least once by // the proxy registered bool @@ -373,8 +394,13 @@ type xDSDeltaType struct { // map. Once we get an ACK from envoy we'll update the resourceVersions map // and strike the entry from this map. // - // nonce -> name -> version - pendingUpdates map[string]map[string]string + // nonce -> name -> {version} + pendingUpdates map[string]map[string]PendingUpdate +} + +type PendingUpdate struct { + Version string + ChildResources []string // optional } func newDeltaType( @@ -389,7 +415,7 @@ func newDeltaType( typeURL: typeUrl, allowEmptyFn: allowEmptyFn, resourceVersions: make(map[string]string), - pendingUpdates: make(map[string]map[string]string), + pendingUpdates: make(map[string]map[string]PendingUpdate), } } @@ -511,11 +537,28 @@ func (t *xDSDeltaType) ack(nonce string) { return } - for name, version := range pending { - if version == "" { + for name, obj := range pending { + if obj.Version == "" { delete(t.resourceVersions, name) } else { - t.resourceVersions[name] = version + t.resourceVersions[name] = obj.Version + } + if t.childType != nil && obj.Version != "" { + // This branch only matters on UPDATE, since we already have + // mechanisms to clean up orphaned resources. + for _, childName := range obj.ChildResources { + if _, exist := t.childType.resourceVersions[childName]; exist { + t.generator.Logger.Trace( + "triggering implicit update of resource", + "typeUrl", t.typeURL, + "resource", name, + "childTypeUrl", t.childType.typeURL, + "childResource", childName, + ) + // Basically manifest this as a re-subscribe + t.childType.resourceVersions[childName] = "" + } + } } } t.sentToEnvoyOnce = true @@ -529,7 +572,7 @@ func (t *xDSDeltaType) nack(nonce string) { func (t *xDSDeltaType) SendIfNew( kind structs.ServiceKind, currentVersions map[string]string, // type => name => version (as consul knows right now) - resourceMap IndexedResources, + resourceMap *IndexedResources, nonce *uint64, upsert, remove bool, ) (error, bool) { @@ -571,6 +614,17 @@ func (t *xDSDeltaType) SendIfNew( } logger.Trace("sent response", "nonce", resp.Nonce) + if t.childType != nil { + // Capture the relevant child resource names on this pending update so + // we can properly clean up the linked children when this change is + // ACKed. + for name, obj := range updates { + if children, ok := resourceMap.ChildIndex[t.typeURL][name]; ok { + obj.ChildResources = children + updates[name] = obj + } + } + } t.pendingUpdates[resp.Nonce] = updates return nil, true @@ -578,13 +632,13 @@ func (t *xDSDeltaType) SendIfNew( func (t *xDSDeltaType) createDeltaResponse( currentVersions map[string]string, // name => version (as consul knows right now) - resourceMap IndexedResources, + resourceMap *IndexedResources, upsert, remove bool, -) (*envoy_discovery_v3.DeltaDiscoveryResponse, map[string]string, error) { +) (*envoy_discovery_v3.DeltaDiscoveryResponse, map[string]PendingUpdate, error) { // compute difference var ( hasRelevantUpdates = false - updates = make(map[string]string) + updates = make(map[string]PendingUpdate) ) // First find things that need updating or deleting for name, envoyVers := range t.resourceVersions { @@ -593,12 +647,12 @@ func (t *xDSDeltaType) createDeltaResponse( if remove { hasRelevantUpdates = true } - updates[name] = "" + updates[name] = PendingUpdate{Version: ""} } else if currVers != envoyVers { if upsert { hasRelevantUpdates = true } - updates[name] = currVers + updates[name] = PendingUpdate{Version: currVers} } } @@ -606,7 +660,7 @@ func (t *xDSDeltaType) createDeltaResponse( if t.wildcard { for name, currVers := range currentVersions { if _, ok := t.resourceVersions[name]; !ok { - updates[name] = currVers + updates[name] = PendingUpdate{Version: currVers} if upsert { hasRelevantUpdates = true } @@ -623,15 +677,15 @@ func (t *xDSDeltaType) createDeltaResponse( // TODO(rb): consider putting something in SystemVersionInfo? TypeUrl: t.typeURL, } - realUpdates := make(map[string]string) - for name, vers := range updates { - if vers == "" { + realUpdates := make(map[string]PendingUpdate) + for name, obj := range updates { + if obj.Version == "" { if remove { resp.RemovedResources = append(resp.RemovedResources, name) - realUpdates[name] = "" + realUpdates[name] = PendingUpdate{Version: ""} } } else if upsert { - resources, ok := resourceMap[t.typeURL] + resources, ok := resourceMap.Index[t.typeURL] if !ok { return nil, nil, fmt.Errorf("unknown type url: %s", t.typeURL) } @@ -647,18 +701,18 @@ func (t *xDSDeltaType) createDeltaResponse( resp.Resources = append(resp.Resources, &envoy_discovery_v3.Resource{ Name: name, Resource: any, - Version: vers, + Version: obj.Version, }) - realUpdates[name] = vers + realUpdates[name] = obj } } return resp, realUpdates, nil } -func computeResourceVersions(resourceMap IndexedResources) (map[string]map[string]string, error) { +func computeResourceVersions(resourceMap *IndexedResources) (map[string]map[string]string, error) { out := make(map[string]map[string]string) - for typeUrl, resources := range resourceMap { + for typeUrl, resources := range resourceMap.Index { m, err := hashResourceMap(resources) if err != nil { return nil, fmt.Errorf("failed to hash resources for %q: %v", typeUrl, err) @@ -668,18 +722,51 @@ func computeResourceVersions(resourceMap IndexedResources) (map[string]map[strin return out, nil } -type IndexedResources map[string]map[string]proto.Message +type IndexedResources struct { + // Index is a map of typeURL => resourceName => resource + Index map[string]map[string]proto.Message -func emptyIndexedResources() IndexedResources { - return map[string]map[string]proto.Message{ - ListenerType: make(map[string]proto.Message), - RouteType: make(map[string]proto.Message), - ClusterType: make(map[string]proto.Message), - EndpointType: make(map[string]proto.Message), + // ChildIndex is a map of typeURL => parentResourceName => list of + // childResourceNames. This only applies if the child and parent do not + // share a name. + ChildIndex map[string]map[string][]string +} + +func emptyIndexedResources() *IndexedResources { + return &IndexedResources{ + Index: map[string]map[string]proto.Message{ + ListenerType: make(map[string]proto.Message), + RouteType: make(map[string]proto.Message), + ClusterType: make(map[string]proto.Message), + EndpointType: make(map[string]proto.Message), + }, + ChildIndex: map[string]map[string][]string{ + ListenerType: make(map[string][]string), + ClusterType: make(map[string][]string), + }, } } -func indexResources(logger hclog.Logger, resources map[string][]proto.Message) IndexedResources { +func populateChildIndexMap(resourceMap *IndexedResources) error { + // LDS and RDS have a more complicated relationship. + for name, res := range resourceMap.Index[ListenerType] { + listener := res.(*envoy_listener_v3.Listener) + rdsRouteNames, err := extractRdsResourceNames(listener) + if err != nil { + return err + } + resourceMap.ChildIndex[ListenerType][name] = rdsRouteNames + } + + // CDS and EDS share exact names. + for name := range resourceMap.Index[ClusterType] { + resourceMap.ChildIndex[ClusterType][name] = []string{name} + } + + return nil +} + +func indexResources(logger hclog.Logger, resources map[string][]proto.Message) *IndexedResources { data := emptyIndexedResources() for typeURL, typeRes := range resources { @@ -688,7 +775,7 @@ func indexResources(logger hclog.Logger, resources map[string][]proto.Message) I if name == "" { logger.Warn("skipping unexpected xDS type found in delta snapshot", "typeURL", typeURL) } else { - data[typeURL][name] = res + data.Index[typeURL][name] = res } } } diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index 56c43b60c0..638e2ff437 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -327,6 +327,333 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { } } +func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints(t *testing.T) { + aclResolve := func(id string) (acl.Authorizer, error) { + // Allow all + return acl.RootAuthorizer("manage"), nil + } + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) + mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy + + sid := structs.NewServiceID("web-sidecar-proxy", nil) + + // Register the proxy to create state needed to Watch() on + mgr.RegisterProxy(t, sid) + + var snap *proxycfg.ConfigSnapshot + runStep(t, "get into initial state", func(t *testing.T) { + snap = newTestSnapshot(t, nil, "") + + // Send initial cluster discover. + envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{}) + + // Check no response sent yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + requireProtocolVersionGauge(t, scenario, "v3", 1) + + // Deliver a new snapshot (tcp with one tcp upstream) + mgr.DeliverConfig(t, sid, snap) + + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: ClusterType, + Nonce: hexString(1), + Resources: makeTestResources(t, + makeTestCluster(t, snap, "tcp:local_app"), + makeTestCluster(t, snap, "tcp:db"), + makeTestCluster(t, snap, "tcp:geo-cache"), + ), + }) + + // Envoy then tries to discover endpoints for those clusters. + envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + ResourceNamesSubscribe: []string{ + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", + }, + }) + + // It also (in parallel) issues the cluster ACK + envoy.SendDeltaReqACK(t, ClusterType, 1, true, nil) + + // We should get a response immediately since the config is already present in + // the server for endpoints. Note that this should not be racy if the server + // is behaving well since the Cluster send above should be blocked until we + // deliver a new config version. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: EndpointType, + Nonce: hexString(2), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "tcp:db"), + makeTestEndpoints(t, snap, "tcp:geo-cache"), + ), + }) + + // And no other response yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // Envoy now sends listener request + envoy.SendDeltaReq(t, ListenerType, nil) + + // It also (in parallel) issues the endpoint ACK + envoy.SendDeltaReqACK(t, EndpointType, 2, true, nil) + + // And should get a response immediately. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: ListenerType, + Nonce: hexString(3), + Resources: makeTestResources(t, + makeTestListener(t, snap, "tcp:public_listener"), + makeTestListener(t, snap, "tcp:db"), + makeTestListener(t, snap, "tcp:geo-cache"), + ), + }) + + // And no other response yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // ACKs the listener + envoy.SendDeltaReqACK(t, ListenerType, 3, true, nil) + }) + + runStep(t, "trigger cluster update needing implicit endpoint replacements", func(t *testing.T) { + // Update the snapshot in a way that causes a single cluster update. + snap = newTestSnapshot(t, snap, "", &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "db", + ConnectTimeout: 1337 * time.Second, + }) + mgr.DeliverConfig(t, sid, snap) + + // The cluster is updated + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: ClusterType, + Nonce: hexString(4), + Resources: makeTestResources(t, + // SAME makeTestCluster(t, snap, "tcp:local_app"), + makeTestCluster(t, snap, "tcp:db:timeout"), + // SAME makeTestCluster(t, snap, "tcp:geo-cache"), + ), + }) + + envoy.SendDeltaReqACK(t, ClusterType, 4, true, nil) + + // And we re-send the endpoints for the updated cluster after getting the + // ACK for the cluster. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: EndpointType, + Nonce: hexString(5), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "tcp:db"), + // SAME makeTestEndpoints(t, snap, "tcp:geo-cache"), + ), + }) + envoy.SendDeltaReqACK(t, EndpointType, 5, true, nil) + + // And no other response yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + }) + + envoy.Close() + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(50 * time.Millisecond): + t.Fatalf("timed out waiting for handler to finish") + } +} + +func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChangesImpactRoutes(t *testing.T) { + aclResolve := func(id string) (acl.Authorizer, error) { + // Allow all + return acl.RootAuthorizer("manage"), nil + } + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) + mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy + + sid := structs.NewServiceID("web-sidecar-proxy", nil) + + // Register the proxy to create state needed to Watch() on + mgr.RegisterProxy(t, sid) + + var snap *proxycfg.ConfigSnapshot + + runStep(t, "get into initial state", func(t *testing.T) { + // Send initial cluster discover (empty payload) + envoy.SendDeltaReq(t, ClusterType, nil) + + // Check no response sent yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // Deliver a new snapshot (tcp with one http upstream with no-op disco chain) + snap = newTestSnapshot(t, nil, "http2", &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "db", + Protocol: "http2", + }, &structs.ServiceRouterConfigEntry{ + Kind: structs.ServiceRouter, + Name: "db", + Routes: nil, + }) + mgr.DeliverConfig(t, sid, snap) + + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: ClusterType, + Nonce: hexString(1), + Resources: makeTestResources(t, + makeTestCluster(t, snap, "tcp:local_app"), + makeTestCluster(t, snap, "http2:db"), + makeTestCluster(t, snap, "tcp:geo-cache"), + ), + }) + + // Envoy then tries to discover endpoints for those clusters. + envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + ResourceNamesSubscribe: []string{ + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", + }, + }) + + // It also (in parallel) issues the cluster ACK + envoy.SendDeltaReqACK(t, ClusterType, 1, true, nil) + + // We should get a response immediately since the config is already present in + // the server for endpoints. Note that this should not be racy if the server + // is behaving well since the Cluster send above should be blocked until we + // deliver a new config version. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: EndpointType, + Nonce: hexString(2), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "http2:db"), + makeTestEndpoints(t, snap, "tcp:geo-cache"), + ), + }) + + // And no other response yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // Envoy now sends listener request + envoy.SendDeltaReq(t, ListenerType, nil) + + // It also (in parallel) issues the endpoint ACK + envoy.SendDeltaReqACK(t, EndpointType, 2, true, nil) + + // And should get a response immediately. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: ListenerType, + Nonce: hexString(3), + Resources: makeTestResources(t, + makeTestListener(t, snap, "tcp:public_listener"), + makeTestListener(t, snap, "http2:db:rds"), + makeTestListener(t, snap, "tcp:geo-cache"), + ), + }) + + // And no other response yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + // Envoy now sends routes request + envoy.SendDeltaReq(t, RouteType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + ResourceNamesSubscribe: []string{ + "db", + }, + }) + + // ACKs the listener + envoy.SendDeltaReqACK(t, ListenerType, 3, true, nil) + + // And should get a response immediately. + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: RouteType, + Nonce: hexString(4), + Resources: makeTestResources(t, + makeTestRoute(t, "http2:db"), + ), + }) + + envoy.SendDeltaReqACK(t, RouteType, 4, true, nil) + + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + }) + + runStep(t, "trigger listener update needing implicit route replacements", func(t *testing.T) { + // Update the snapshot in a way that causes a single listener update. + // + // Downgrade from http2 to http + snap = newTestSnapshot(t, snap, "http", &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "db", + Protocol: "http", + }, &structs.ServiceRouterConfigEntry{ + Kind: structs.ServiceRouter, + Name: "db", + Routes: nil, + }) + mgr.DeliverConfig(t, sid, snap) + + // db cluster is refreshed (unrelated to the test scenario other than it's required) + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: ClusterType, + Nonce: hexString(5), + Resources: makeTestResources(t, + makeTestCluster(t, snap, "http:db"), + ), + }) + + // the listener is updated + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: ListenerType, + Nonce: hexString(6), + Resources: makeTestResources(t, + makeTestListener(t, snap, "http:db:rds"), + ), + }) + + envoy.SendDeltaReqACK(t, ClusterType, 5, true, nil) + + // ACKs the listener + envoy.SendDeltaReqACK(t, ListenerType, 6, true, nil) + + // The behaviors of Cluster updates triggering re-sends of Endpoint updates + // tested in TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints + // triggers here. It is not explicitly under test, but we have to get past + // this exchange to get to the part we care about. + + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: EndpointType, + Nonce: hexString(7), + Resources: makeTestResources(t, + makeTestEndpoints(t, snap, "http:db"), + ), + }) + + envoy.SendDeltaReqACK(t, EndpointType, 7, true, nil) + + // THE ACTUAL THING WE CARE ABOUT: replaced route config + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: RouteType, + Nonce: hexString(8), + Resources: makeTestResources(t, + makeTestRoute(t, "http2:db"), + ), + }) + + envoy.SendDeltaReqACK(t, RouteType, 8, true, nil) + + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + }) + + envoy.Close() + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(50 * time.Millisecond): + t.Fatalf("timed out waiting for handler to finish") + } +} + func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) { tests := []struct { name string diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index c4d5eb0090..1cd54b3426 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -663,6 +663,51 @@ const ( httpConnectionManagerNewName = "envoy.filters.network.http_connection_manager" ) +func extractRdsResourceNames(listener *envoy_listener_v3.Listener) ([]string, error) { + var found []string + + for chainIdx, chain := range listener.FilterChains { + for filterIdx, filter := range chain.Filters { + if filter.Name != httpConnectionManagerNewName { + continue + } + + tc, ok := filter.ConfigType.(*envoy_listener_v3.Filter_TypedConfig) + if !ok { + return nil, fmt.Errorf( + "filter chain %d has a %q filter %d with an unsupported config type: %T", + chainIdx, + filter.Name, + filterIdx, + filter.ConfigType, + ) + } + + var hcm envoy_http_v3.HttpConnectionManager + if err := ptypes.UnmarshalAny(tc.TypedConfig, &hcm); err != nil { + return nil, err + } + + if hcm.RouteSpecifier == nil { + continue + } + + rds, ok := hcm.RouteSpecifier.(*envoy_http_v3.HttpConnectionManager_Rds) + if !ok { + continue + } + + if rds.Rds == nil { + continue + } + + found = append(found, rds.Rds.RouteConfigName) + } + } + + return found, nil +} + // Locate the existing http connect manager L4 filter and inject our RBAC filter at the top. func injectHTTPFilterOnFilterChains( listener *envoy_listener_v3.Listener, diff --git a/agent/xds/xds_protocol_helpers_test.go b/agent/xds/xds_protocol_helpers_test.go index ed8608e01b..b518359a92 100644 --- a/agent/xds/xds_protocol_helpers_test.go +++ b/agent/xds/xds_protocol_helpers_test.go @@ -391,6 +391,24 @@ func makeTestCluster(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName st ConnectTimeout: ptypes.DurationProto(5 * time.Second), TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"), } + case "tcp:db:timeout": + return &envoy_cluster_v3.Cluster{ + Name: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{ + Type: envoy_cluster_v3.Cluster_EDS, + }, + EdsClusterConfig: &envoy_cluster_v3.Cluster_EdsClusterConfig{ + EdsConfig: xdsNewADSConfig(), + }, + CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{}, + OutlierDetection: &envoy_cluster_v3.OutlierDetection{}, + AltStatName: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + CommonLbConfig: &envoy_cluster_v3.Cluster_CommonLbConfig{ + HealthyPanicThreshold: &envoy_type_v3.Percent{Value: 0}, + }, + ConnectTimeout: ptypes.DurationProto(1337 * time.Second), + TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"), + } case "http2:db": return &envoy_cluster_v3.Cluster{ Name: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", @@ -410,6 +428,25 @@ func makeTestCluster(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName st TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"), Http2ProtocolOptions: &envoy_core_v3.Http2ProtocolOptions{}, } + case "http:db": + return &envoy_cluster_v3.Cluster{ + Name: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{ + Type: envoy_cluster_v3.Cluster_EDS, + }, + EdsClusterConfig: &envoy_cluster_v3.Cluster_EdsClusterConfig{ + EdsConfig: xdsNewADSConfig(), + }, + CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{}, + OutlierDetection: &envoy_cluster_v3.OutlierDetection{}, + AltStatName: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + CommonLbConfig: &envoy_cluster_v3.Cluster_CommonLbConfig{ + HealthyPanicThreshold: &envoy_type_v3.Percent{Value: 0}, + }, + ConnectTimeout: ptypes.DurationProto(5 * time.Second), + TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"), + // HttpProtocolOptions: &envoy_core_v3.Http1ProtocolOptions{}, + } case "tcp:geo-cache": return &envoy_cluster_v3.Cluster{ Name: "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", @@ -444,7 +481,7 @@ func makeTestEndpoints(t *testing.T, _ *proxycfg.ConfigSnapshot, fixtureName str }, }, } - case "http2:db": + case "http2:db", "http:db": return &envoy_endpoint_v3.ClusterLoadAssignment{ ClusterName: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{ @@ -570,6 +607,34 @@ func makeTestListener(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName s }, }, } + case "http:db:rds": + return &envoy_listener_v3.Listener{ + Name: "db:127.0.0.1:9191", + Address: makeAddress("127.0.0.1", 9191), + TrafficDirection: envoy_core_v3.TrafficDirection_OUTBOUND, + FilterChains: []*envoy_listener_v3.FilterChain{ + { + Filters: []*envoy_listener_v3.Filter{ + xdsNewFilter(t, "envoy.filters.network.http_connection_manager", &envoy_http_v3.HttpConnectionManager{ + HttpFilters: []*envoy_http_v3.HttpFilter{ + {Name: "envoy.filters.http.router"}, + }, + RouteSpecifier: &envoy_http_v3.HttpConnectionManager_Rds{ + Rds: &envoy_http_v3.Rds{ + RouteConfigName: "db", + ConfigSource: xdsNewADSConfig(), + }, + }, + StatPrefix: "upstream.db.default.dc1", + Tracing: &envoy_http_v3.HttpConnectionManager_Tracing{ + RandomSampling: &envoy_type_v3.Percent{Value: 0}, + }, + // HttpProtocolOptions: &envoy_core_v3.Http1ProtocolOptions{}, + }), + }, + }, + }, + } case "tcp:geo-cache": return &envoy_listener_v3.Listener{ Name: "prepared_query:geo-cache:127.10.10.10:8181", @@ -596,7 +661,7 @@ func makeTestListener(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName s func makeTestRoute(t *testing.T, fixtureName string) *envoy_route_v3.RouteConfiguration { switch fixtureName { - case "http2:db": + case "http2:db", "http:db": return &envoy_route_v3.RouteConfiguration{ Name: "db", ValidateClusters: makeBoolValue(true),