mirror of https://github.com/status-im/consul.git
xds: ensure that dependent xDS resources are reconfigured during primary type warming (#10381)
Updates to a cluster will clear the associated endpoints, and updates to a listener will clear the associated routes. Update the incremental xDS logic to account for this implicit cleanup so that we can finish warming the clusters and listeners. Fixes #10379
This commit is contained in:
parent
ffb13f35f1
commit
848ad8535b
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
xds: (beta-only) ensure that dependent xDS resources are reconfigured during primary type warming
|
||||
```
|
|
@ -120,6 +120,18 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
|
|||
EndpointType: newDeltaType(generator, stream, EndpointType, nil),
|
||||
}
|
||||
|
||||
// Endpoints are stored within a Cluster (and Routes
|
||||
// are stored within a Listener) so whenever the
|
||||
// enclosing resource is updated the inner resource
|
||||
// list is cleared implicitly.
|
||||
//
|
||||
// When this happens we should update our local
|
||||
// representation of envoy state to force an update.
|
||||
//
|
||||
// see: https://github.com/envoyproxy/envoy/issues/13009
|
||||
handlers[ListenerType].childType = handlers[RouteType]
|
||||
handlers[ClusterType].childType = handlers[EndpointType]
|
||||
|
||||
var authTimer <-chan time.Time
|
||||
extendAuthTimer := func() {
|
||||
authTimer = time.After(s.AuthCheckFrequency)
|
||||
|
@ -177,6 +189,10 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
|
|||
// index and hash the xDS structures
|
||||
newResourceMap := indexResources(generator.Logger, newRes)
|
||||
|
||||
if err := populateChildIndexMap(newResourceMap); err != nil {
|
||||
return status.Errorf(codes.Unavailable, "failed to index xDS resource versions: %v", err)
|
||||
}
|
||||
|
||||
newVersions, err := computeResourceVersions(newResourceMap)
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Unavailable, "failed to compute xDS resource versions: %v", err)
|
||||
|
@ -352,6 +368,11 @@ type xDSDeltaType struct {
|
|||
typeURL string
|
||||
allowEmptyFn func(kind structs.ServiceKind) bool
|
||||
|
||||
// childType is a type that in Envoy is actually stored within this type.
|
||||
// Upserts of THIS type should potentially trigger dependent named
|
||||
// resources within the child to be re-configured.
|
||||
childType *xDSDeltaType
|
||||
|
||||
// registered indicates if this type has been requested at least once by
|
||||
// the proxy
|
||||
registered bool
|
||||
|
@ -373,8 +394,13 @@ type xDSDeltaType struct {
|
|||
// map. Once we get an ACK from envoy we'll update the resourceVersions map
|
||||
// and strike the entry from this map.
|
||||
//
|
||||
// nonce -> name -> version
|
||||
pendingUpdates map[string]map[string]string
|
||||
// nonce -> name -> {version}
|
||||
pendingUpdates map[string]map[string]PendingUpdate
|
||||
}
|
||||
|
||||
type PendingUpdate struct {
|
||||
Version string
|
||||
ChildResources []string // optional
|
||||
}
|
||||
|
||||
func newDeltaType(
|
||||
|
@ -389,7 +415,7 @@ func newDeltaType(
|
|||
typeURL: typeUrl,
|
||||
allowEmptyFn: allowEmptyFn,
|
||||
resourceVersions: make(map[string]string),
|
||||
pendingUpdates: make(map[string]map[string]string),
|
||||
pendingUpdates: make(map[string]map[string]PendingUpdate),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -511,11 +537,28 @@ func (t *xDSDeltaType) ack(nonce string) {
|
|||
return
|
||||
}
|
||||
|
||||
for name, version := range pending {
|
||||
if version == "" {
|
||||
for name, obj := range pending {
|
||||
if obj.Version == "" {
|
||||
delete(t.resourceVersions, name)
|
||||
} else {
|
||||
t.resourceVersions[name] = version
|
||||
t.resourceVersions[name] = obj.Version
|
||||
}
|
||||
if t.childType != nil && obj.Version != "" {
|
||||
// This branch only matters on UPDATE, since we already have
|
||||
// mechanisms to clean up orphaned resources.
|
||||
for _, childName := range obj.ChildResources {
|
||||
if _, exist := t.childType.resourceVersions[childName]; exist {
|
||||
t.generator.Logger.Trace(
|
||||
"triggering implicit update of resource",
|
||||
"typeUrl", t.typeURL,
|
||||
"resource", name,
|
||||
"childTypeUrl", t.childType.typeURL,
|
||||
"childResource", childName,
|
||||
)
|
||||
// Basically manifest this as a re-subscribe
|
||||
t.childType.resourceVersions[childName] = ""
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
t.sentToEnvoyOnce = true
|
||||
|
@ -529,7 +572,7 @@ func (t *xDSDeltaType) nack(nonce string) {
|
|||
func (t *xDSDeltaType) SendIfNew(
|
||||
kind structs.ServiceKind,
|
||||
currentVersions map[string]string, // type => name => version (as consul knows right now)
|
||||
resourceMap IndexedResources,
|
||||
resourceMap *IndexedResources,
|
||||
nonce *uint64,
|
||||
upsert, remove bool,
|
||||
) (error, bool) {
|
||||
|
@ -571,6 +614,17 @@ func (t *xDSDeltaType) SendIfNew(
|
|||
}
|
||||
logger.Trace("sent response", "nonce", resp.Nonce)
|
||||
|
||||
if t.childType != nil {
|
||||
// Capture the relevant child resource names on this pending update so
|
||||
// we can properly clean up the linked children when this change is
|
||||
// ACKed.
|
||||
for name, obj := range updates {
|
||||
if children, ok := resourceMap.ChildIndex[t.typeURL][name]; ok {
|
||||
obj.ChildResources = children
|
||||
updates[name] = obj
|
||||
}
|
||||
}
|
||||
}
|
||||
t.pendingUpdates[resp.Nonce] = updates
|
||||
|
||||
return nil, true
|
||||
|
@ -578,13 +632,13 @@ func (t *xDSDeltaType) SendIfNew(
|
|||
|
||||
func (t *xDSDeltaType) createDeltaResponse(
|
||||
currentVersions map[string]string, // name => version (as consul knows right now)
|
||||
resourceMap IndexedResources,
|
||||
resourceMap *IndexedResources,
|
||||
upsert, remove bool,
|
||||
) (*envoy_discovery_v3.DeltaDiscoveryResponse, map[string]string, error) {
|
||||
) (*envoy_discovery_v3.DeltaDiscoveryResponse, map[string]PendingUpdate, error) {
|
||||
// compute difference
|
||||
var (
|
||||
hasRelevantUpdates = false
|
||||
updates = make(map[string]string)
|
||||
updates = make(map[string]PendingUpdate)
|
||||
)
|
||||
// First find things that need updating or deleting
|
||||
for name, envoyVers := range t.resourceVersions {
|
||||
|
@ -593,12 +647,12 @@ func (t *xDSDeltaType) createDeltaResponse(
|
|||
if remove {
|
||||
hasRelevantUpdates = true
|
||||
}
|
||||
updates[name] = ""
|
||||
updates[name] = PendingUpdate{Version: ""}
|
||||
} else if currVers != envoyVers {
|
||||
if upsert {
|
||||
hasRelevantUpdates = true
|
||||
}
|
||||
updates[name] = currVers
|
||||
updates[name] = PendingUpdate{Version: currVers}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -606,7 +660,7 @@ func (t *xDSDeltaType) createDeltaResponse(
|
|||
if t.wildcard {
|
||||
for name, currVers := range currentVersions {
|
||||
if _, ok := t.resourceVersions[name]; !ok {
|
||||
updates[name] = currVers
|
||||
updates[name] = PendingUpdate{Version: currVers}
|
||||
if upsert {
|
||||
hasRelevantUpdates = true
|
||||
}
|
||||
|
@ -623,15 +677,15 @@ func (t *xDSDeltaType) createDeltaResponse(
|
|||
// TODO(rb): consider putting something in SystemVersionInfo?
|
||||
TypeUrl: t.typeURL,
|
||||
}
|
||||
realUpdates := make(map[string]string)
|
||||
for name, vers := range updates {
|
||||
if vers == "" {
|
||||
realUpdates := make(map[string]PendingUpdate)
|
||||
for name, obj := range updates {
|
||||
if obj.Version == "" {
|
||||
if remove {
|
||||
resp.RemovedResources = append(resp.RemovedResources, name)
|
||||
realUpdates[name] = ""
|
||||
realUpdates[name] = PendingUpdate{Version: ""}
|
||||
}
|
||||
} else if upsert {
|
||||
resources, ok := resourceMap[t.typeURL]
|
||||
resources, ok := resourceMap.Index[t.typeURL]
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("unknown type url: %s", t.typeURL)
|
||||
}
|
||||
|
@ -647,18 +701,18 @@ func (t *xDSDeltaType) createDeltaResponse(
|
|||
resp.Resources = append(resp.Resources, &envoy_discovery_v3.Resource{
|
||||
Name: name,
|
||||
Resource: any,
|
||||
Version: vers,
|
||||
Version: obj.Version,
|
||||
})
|
||||
realUpdates[name] = vers
|
||||
realUpdates[name] = obj
|
||||
}
|
||||
}
|
||||
|
||||
return resp, realUpdates, nil
|
||||
}
|
||||
|
||||
func computeResourceVersions(resourceMap IndexedResources) (map[string]map[string]string, error) {
|
||||
func computeResourceVersions(resourceMap *IndexedResources) (map[string]map[string]string, error) {
|
||||
out := make(map[string]map[string]string)
|
||||
for typeUrl, resources := range resourceMap {
|
||||
for typeUrl, resources := range resourceMap.Index {
|
||||
m, err := hashResourceMap(resources)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to hash resources for %q: %v", typeUrl, err)
|
||||
|
@ -668,18 +722,51 @@ func computeResourceVersions(resourceMap IndexedResources) (map[string]map[strin
|
|||
return out, nil
|
||||
}
|
||||
|
||||
type IndexedResources map[string]map[string]proto.Message
|
||||
type IndexedResources struct {
|
||||
// Index is a map of typeURL => resourceName => resource
|
||||
Index map[string]map[string]proto.Message
|
||||
|
||||
func emptyIndexedResources() IndexedResources {
|
||||
return map[string]map[string]proto.Message{
|
||||
ListenerType: make(map[string]proto.Message),
|
||||
RouteType: make(map[string]proto.Message),
|
||||
ClusterType: make(map[string]proto.Message),
|
||||
EndpointType: make(map[string]proto.Message),
|
||||
// ChildIndex is a map of typeURL => parentResourceName => list of
|
||||
// childResourceNames. This only applies if the child and parent do not
|
||||
// share a name.
|
||||
ChildIndex map[string]map[string][]string
|
||||
}
|
||||
|
||||
func emptyIndexedResources() *IndexedResources {
|
||||
return &IndexedResources{
|
||||
Index: map[string]map[string]proto.Message{
|
||||
ListenerType: make(map[string]proto.Message),
|
||||
RouteType: make(map[string]proto.Message),
|
||||
ClusterType: make(map[string]proto.Message),
|
||||
EndpointType: make(map[string]proto.Message),
|
||||
},
|
||||
ChildIndex: map[string]map[string][]string{
|
||||
ListenerType: make(map[string][]string),
|
||||
ClusterType: make(map[string][]string),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func indexResources(logger hclog.Logger, resources map[string][]proto.Message) IndexedResources {
|
||||
func populateChildIndexMap(resourceMap *IndexedResources) error {
|
||||
// LDS and RDS have a more complicated relationship.
|
||||
for name, res := range resourceMap.Index[ListenerType] {
|
||||
listener := res.(*envoy_listener_v3.Listener)
|
||||
rdsRouteNames, err := extractRdsResourceNames(listener)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resourceMap.ChildIndex[ListenerType][name] = rdsRouteNames
|
||||
}
|
||||
|
||||
// CDS and EDS share exact names.
|
||||
for name := range resourceMap.Index[ClusterType] {
|
||||
resourceMap.ChildIndex[ClusterType][name] = []string{name}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func indexResources(logger hclog.Logger, resources map[string][]proto.Message) *IndexedResources {
|
||||
data := emptyIndexedResources()
|
||||
|
||||
for typeURL, typeRes := range resources {
|
||||
|
@ -688,7 +775,7 @@ func indexResources(logger hclog.Logger, resources map[string][]proto.Message) I
|
|||
if name == "" {
|
||||
logger.Warn("skipping unexpected xDS type found in delta snapshot", "typeURL", typeURL)
|
||||
} else {
|
||||
data[typeURL][name] = res
|
||||
data.Index[typeURL][name] = res
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -327,6 +327,333 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints(t *testing.T) {
|
||||
aclResolve := func(id string) (acl.Authorizer, error) {
|
||||
// Allow all
|
||||
return acl.RootAuthorizer("manage"), nil
|
||||
}
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
|
||||
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
|
||||
|
||||
sid := structs.NewServiceID("web-sidecar-proxy", nil)
|
||||
|
||||
// Register the proxy to create state needed to Watch() on
|
||||
mgr.RegisterProxy(t, sid)
|
||||
|
||||
var snap *proxycfg.ConfigSnapshot
|
||||
runStep(t, "get into initial state", func(t *testing.T) {
|
||||
snap = newTestSnapshot(t, nil, "")
|
||||
|
||||
// Send initial cluster discover.
|
||||
envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})
|
||||
|
||||
// Check no response sent yet
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
requireProtocolVersionGauge(t, scenario, "v3", 1)
|
||||
|
||||
// Deliver a new snapshot (tcp with one tcp upstream)
|
||||
mgr.DeliverConfig(t, sid, snap)
|
||||
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: ClusterType,
|
||||
Nonce: hexString(1),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestCluster(t, snap, "tcp:local_app"),
|
||||
makeTestCluster(t, snap, "tcp:db"),
|
||||
makeTestCluster(t, snap, "tcp:geo-cache"),
|
||||
),
|
||||
})
|
||||
|
||||
// Envoy then tries to discover endpoints for those clusters.
|
||||
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
|
||||
ResourceNamesSubscribe: []string{
|
||||
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
|
||||
},
|
||||
})
|
||||
|
||||
// It also (in parallel) issues the cluster ACK
|
||||
envoy.SendDeltaReqACK(t, ClusterType, 1, true, nil)
|
||||
|
||||
// We should get a response immediately since the config is already present in
|
||||
// the server for endpoints. Note that this should not be racy if the server
|
||||
// is behaving well since the Cluster send above should be blocked until we
|
||||
// deliver a new config version.
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: EndpointType,
|
||||
Nonce: hexString(2),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestEndpoints(t, snap, "tcp:db"),
|
||||
makeTestEndpoints(t, snap, "tcp:geo-cache"),
|
||||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// Envoy now sends listener request
|
||||
envoy.SendDeltaReq(t, ListenerType, nil)
|
||||
|
||||
// It also (in parallel) issues the endpoint ACK
|
||||
envoy.SendDeltaReqACK(t, EndpointType, 2, true, nil)
|
||||
|
||||
// And should get a response immediately.
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: ListenerType,
|
||||
Nonce: hexString(3),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestListener(t, snap, "tcp:public_listener"),
|
||||
makeTestListener(t, snap, "tcp:db"),
|
||||
makeTestListener(t, snap, "tcp:geo-cache"),
|
||||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// ACKs the listener
|
||||
envoy.SendDeltaReqACK(t, ListenerType, 3, true, nil)
|
||||
})
|
||||
|
||||
runStep(t, "trigger cluster update needing implicit endpoint replacements", func(t *testing.T) {
|
||||
// Update the snapshot in a way that causes a single cluster update.
|
||||
snap = newTestSnapshot(t, snap, "", &structs.ServiceResolverConfigEntry{
|
||||
Kind: structs.ServiceResolver,
|
||||
Name: "db",
|
||||
ConnectTimeout: 1337 * time.Second,
|
||||
})
|
||||
mgr.DeliverConfig(t, sid, snap)
|
||||
|
||||
// The cluster is updated
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: ClusterType,
|
||||
Nonce: hexString(4),
|
||||
Resources: makeTestResources(t,
|
||||
// SAME makeTestCluster(t, snap, "tcp:local_app"),
|
||||
makeTestCluster(t, snap, "tcp:db:timeout"),
|
||||
// SAME makeTestCluster(t, snap, "tcp:geo-cache"),
|
||||
),
|
||||
})
|
||||
|
||||
envoy.SendDeltaReqACK(t, ClusterType, 4, true, nil)
|
||||
|
||||
// And we re-send the endpoints for the updated cluster after getting the
|
||||
// ACK for the cluster.
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: EndpointType,
|
||||
Nonce: hexString(5),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestEndpoints(t, snap, "tcp:db"),
|
||||
// SAME makeTestEndpoints(t, snap, "tcp:geo-cache"),
|
||||
),
|
||||
})
|
||||
envoy.SendDeltaReqACK(t, EndpointType, 5, true, nil)
|
||||
|
||||
// And no other response yet
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
})
|
||||
|
||||
envoy.Close()
|
||||
select {
|
||||
case err := <-errCh:
|
||||
require.NoError(t, err)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
t.Fatalf("timed out waiting for handler to finish")
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChangesImpactRoutes(t *testing.T) {
|
||||
aclResolve := func(id string) (acl.Authorizer, error) {
|
||||
// Allow all
|
||||
return acl.RootAuthorizer("manage"), nil
|
||||
}
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
|
||||
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
|
||||
|
||||
sid := structs.NewServiceID("web-sidecar-proxy", nil)
|
||||
|
||||
// Register the proxy to create state needed to Watch() on
|
||||
mgr.RegisterProxy(t, sid)
|
||||
|
||||
var snap *proxycfg.ConfigSnapshot
|
||||
|
||||
runStep(t, "get into initial state", func(t *testing.T) {
|
||||
// Send initial cluster discover (empty payload)
|
||||
envoy.SendDeltaReq(t, ClusterType, nil)
|
||||
|
||||
// Check no response sent yet
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// Deliver a new snapshot (tcp with one http upstream with no-op disco chain)
|
||||
snap = newTestSnapshot(t, nil, "http2", &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "db",
|
||||
Protocol: "http2",
|
||||
}, &structs.ServiceRouterConfigEntry{
|
||||
Kind: structs.ServiceRouter,
|
||||
Name: "db",
|
||||
Routes: nil,
|
||||
})
|
||||
mgr.DeliverConfig(t, sid, snap)
|
||||
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: ClusterType,
|
||||
Nonce: hexString(1),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestCluster(t, snap, "tcp:local_app"),
|
||||
makeTestCluster(t, snap, "http2:db"),
|
||||
makeTestCluster(t, snap, "tcp:geo-cache"),
|
||||
),
|
||||
})
|
||||
|
||||
// Envoy then tries to discover endpoints for those clusters.
|
||||
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
|
||||
ResourceNamesSubscribe: []string{
|
||||
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
|
||||
},
|
||||
})
|
||||
|
||||
// It also (in parallel) issues the cluster ACK
|
||||
envoy.SendDeltaReqACK(t, ClusterType, 1, true, nil)
|
||||
|
||||
// We should get a response immediately since the config is already present in
|
||||
// the server for endpoints. Note that this should not be racy if the server
|
||||
// is behaving well since the Cluster send above should be blocked until we
|
||||
// deliver a new config version.
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: EndpointType,
|
||||
Nonce: hexString(2),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestEndpoints(t, snap, "http2:db"),
|
||||
makeTestEndpoints(t, snap, "tcp:geo-cache"),
|
||||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// Envoy now sends listener request
|
||||
envoy.SendDeltaReq(t, ListenerType, nil)
|
||||
|
||||
// It also (in parallel) issues the endpoint ACK
|
||||
envoy.SendDeltaReqACK(t, EndpointType, 2, true, nil)
|
||||
|
||||
// And should get a response immediately.
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: ListenerType,
|
||||
Nonce: hexString(3),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestListener(t, snap, "tcp:public_listener"),
|
||||
makeTestListener(t, snap, "http2:db:rds"),
|
||||
makeTestListener(t, snap, "tcp:geo-cache"),
|
||||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// Envoy now sends routes request
|
||||
envoy.SendDeltaReq(t, RouteType, &envoy_discovery_v3.DeltaDiscoveryRequest{
|
||||
ResourceNamesSubscribe: []string{
|
||||
"db",
|
||||
},
|
||||
})
|
||||
|
||||
// ACKs the listener
|
||||
envoy.SendDeltaReqACK(t, ListenerType, 3, true, nil)
|
||||
|
||||
// And should get a response immediately.
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: RouteType,
|
||||
Nonce: hexString(4),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestRoute(t, "http2:db"),
|
||||
),
|
||||
})
|
||||
|
||||
envoy.SendDeltaReqACK(t, RouteType, 4, true, nil)
|
||||
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
})
|
||||
|
||||
runStep(t, "trigger listener update needing implicit route replacements", func(t *testing.T) {
|
||||
// Update the snapshot in a way that causes a single listener update.
|
||||
//
|
||||
// Downgrade from http2 to http
|
||||
snap = newTestSnapshot(t, snap, "http", &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "db",
|
||||
Protocol: "http",
|
||||
}, &structs.ServiceRouterConfigEntry{
|
||||
Kind: structs.ServiceRouter,
|
||||
Name: "db",
|
||||
Routes: nil,
|
||||
})
|
||||
mgr.DeliverConfig(t, sid, snap)
|
||||
|
||||
// db cluster is refreshed (unrelated to the test scenario other than it's required)
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: ClusterType,
|
||||
Nonce: hexString(5),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestCluster(t, snap, "http:db"),
|
||||
),
|
||||
})
|
||||
|
||||
// the listener is updated
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: ListenerType,
|
||||
Nonce: hexString(6),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestListener(t, snap, "http:db:rds"),
|
||||
),
|
||||
})
|
||||
|
||||
envoy.SendDeltaReqACK(t, ClusterType, 5, true, nil)
|
||||
|
||||
// ACKs the listener
|
||||
envoy.SendDeltaReqACK(t, ListenerType, 6, true, nil)
|
||||
|
||||
// The behaviors of Cluster updates triggering re-sends of Endpoint updates
|
||||
// tested in TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints
|
||||
// triggers here. It is not explicitly under test, but we have to get past
|
||||
// this exchange to get to the part we care about.
|
||||
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: EndpointType,
|
||||
Nonce: hexString(7),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestEndpoints(t, snap, "http:db"),
|
||||
),
|
||||
})
|
||||
|
||||
envoy.SendDeltaReqACK(t, EndpointType, 7, true, nil)
|
||||
|
||||
// THE ACTUAL THING WE CARE ABOUT: replaced route config
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: RouteType,
|
||||
Nonce: hexString(8),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestRoute(t, "http2:db"),
|
||||
),
|
||||
})
|
||||
|
||||
envoy.SendDeltaReqACK(t, RouteType, 8, true, nil)
|
||||
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
})
|
||||
|
||||
envoy.Close()
|
||||
select {
|
||||
case err := <-errCh:
|
||||
require.NoError(t, err)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
t.Fatalf("timed out waiting for handler to finish")
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
|
|
@ -663,6 +663,51 @@ const (
|
|||
httpConnectionManagerNewName = "envoy.filters.network.http_connection_manager"
|
||||
)
|
||||
|
||||
func extractRdsResourceNames(listener *envoy_listener_v3.Listener) ([]string, error) {
|
||||
var found []string
|
||||
|
||||
for chainIdx, chain := range listener.FilterChains {
|
||||
for filterIdx, filter := range chain.Filters {
|
||||
if filter.Name != httpConnectionManagerNewName {
|
||||
continue
|
||||
}
|
||||
|
||||
tc, ok := filter.ConfigType.(*envoy_listener_v3.Filter_TypedConfig)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf(
|
||||
"filter chain %d has a %q filter %d with an unsupported config type: %T",
|
||||
chainIdx,
|
||||
filter.Name,
|
||||
filterIdx,
|
||||
filter.ConfigType,
|
||||
)
|
||||
}
|
||||
|
||||
var hcm envoy_http_v3.HttpConnectionManager
|
||||
if err := ptypes.UnmarshalAny(tc.TypedConfig, &hcm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if hcm.RouteSpecifier == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
rds, ok := hcm.RouteSpecifier.(*envoy_http_v3.HttpConnectionManager_Rds)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if rds.Rds == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
found = append(found, rds.Rds.RouteConfigName)
|
||||
}
|
||||
}
|
||||
|
||||
return found, nil
|
||||
}
|
||||
|
||||
// Locate the existing http connect manager L4 filter and inject our RBAC filter at the top.
|
||||
func injectHTTPFilterOnFilterChains(
|
||||
listener *envoy_listener_v3.Listener,
|
||||
|
|
|
@ -391,6 +391,24 @@ func makeTestCluster(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName st
|
|||
ConnectTimeout: ptypes.DurationProto(5 * time.Second),
|
||||
TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"),
|
||||
}
|
||||
case "tcp:db:timeout":
|
||||
return &envoy_cluster_v3.Cluster{
|
||||
Name: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{
|
||||
Type: envoy_cluster_v3.Cluster_EDS,
|
||||
},
|
||||
EdsClusterConfig: &envoy_cluster_v3.Cluster_EdsClusterConfig{
|
||||
EdsConfig: xdsNewADSConfig(),
|
||||
},
|
||||
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{},
|
||||
OutlierDetection: &envoy_cluster_v3.OutlierDetection{},
|
||||
AltStatName: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||
CommonLbConfig: &envoy_cluster_v3.Cluster_CommonLbConfig{
|
||||
HealthyPanicThreshold: &envoy_type_v3.Percent{Value: 0},
|
||||
},
|
||||
ConnectTimeout: ptypes.DurationProto(1337 * time.Second),
|
||||
TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"),
|
||||
}
|
||||
case "http2:db":
|
||||
return &envoy_cluster_v3.Cluster{
|
||||
Name: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||
|
@ -410,6 +428,25 @@ func makeTestCluster(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName st
|
|||
TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"),
|
||||
Http2ProtocolOptions: &envoy_core_v3.Http2ProtocolOptions{},
|
||||
}
|
||||
case "http:db":
|
||||
return &envoy_cluster_v3.Cluster{
|
||||
Name: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{
|
||||
Type: envoy_cluster_v3.Cluster_EDS,
|
||||
},
|
||||
EdsClusterConfig: &envoy_cluster_v3.Cluster_EdsClusterConfig{
|
||||
EdsConfig: xdsNewADSConfig(),
|
||||
},
|
||||
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{},
|
||||
OutlierDetection: &envoy_cluster_v3.OutlierDetection{},
|
||||
AltStatName: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||
CommonLbConfig: &envoy_cluster_v3.Cluster_CommonLbConfig{
|
||||
HealthyPanicThreshold: &envoy_type_v3.Percent{Value: 0},
|
||||
},
|
||||
ConnectTimeout: ptypes.DurationProto(5 * time.Second),
|
||||
TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"),
|
||||
// HttpProtocolOptions: &envoy_core_v3.Http1ProtocolOptions{},
|
||||
}
|
||||
case "tcp:geo-cache":
|
||||
return &envoy_cluster_v3.Cluster{
|
||||
Name: "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
|
||||
|
@ -444,7 +481,7 @@ func makeTestEndpoints(t *testing.T, _ *proxycfg.ConfigSnapshot, fixtureName str
|
|||
},
|
||||
},
|
||||
}
|
||||
case "http2:db":
|
||||
case "http2:db", "http:db":
|
||||
return &envoy_endpoint_v3.ClusterLoadAssignment{
|
||||
ClusterName: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||
Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{
|
||||
|
@ -570,6 +607,34 @@ func makeTestListener(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName s
|
|||
},
|
||||
},
|
||||
}
|
||||
case "http:db:rds":
|
||||
return &envoy_listener_v3.Listener{
|
||||
Name: "db:127.0.0.1:9191",
|
||||
Address: makeAddress("127.0.0.1", 9191),
|
||||
TrafficDirection: envoy_core_v3.TrafficDirection_OUTBOUND,
|
||||
FilterChains: []*envoy_listener_v3.FilterChain{
|
||||
{
|
||||
Filters: []*envoy_listener_v3.Filter{
|
||||
xdsNewFilter(t, "envoy.filters.network.http_connection_manager", &envoy_http_v3.HttpConnectionManager{
|
||||
HttpFilters: []*envoy_http_v3.HttpFilter{
|
||||
{Name: "envoy.filters.http.router"},
|
||||
},
|
||||
RouteSpecifier: &envoy_http_v3.HttpConnectionManager_Rds{
|
||||
Rds: &envoy_http_v3.Rds{
|
||||
RouteConfigName: "db",
|
||||
ConfigSource: xdsNewADSConfig(),
|
||||
},
|
||||
},
|
||||
StatPrefix: "upstream.db.default.dc1",
|
||||
Tracing: &envoy_http_v3.HttpConnectionManager_Tracing{
|
||||
RandomSampling: &envoy_type_v3.Percent{Value: 0},
|
||||
},
|
||||
// HttpProtocolOptions: &envoy_core_v3.Http1ProtocolOptions{},
|
||||
}),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
case "tcp:geo-cache":
|
||||
return &envoy_listener_v3.Listener{
|
||||
Name: "prepared_query:geo-cache:127.10.10.10:8181",
|
||||
|
@ -596,7 +661,7 @@ func makeTestListener(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName s
|
|||
|
||||
func makeTestRoute(t *testing.T, fixtureName string) *envoy_route_v3.RouteConfiguration {
|
||||
switch fixtureName {
|
||||
case "http2:db":
|
||||
case "http2:db", "http:db":
|
||||
return &envoy_route_v3.RouteConfiguration{
|
||||
Name: "db",
|
||||
ValidateClusters: makeBoolValue(true),
|
||||
|
|
Loading…
Reference in New Issue