mirror of
https://github.com/status-im/consul.git
synced 2025-02-01 08:27:20 +00:00
xds: ensure that dependent xDS resources are reconfigured during primary type warming (#10381)
Updates to a cluster will clear the associated endpoints, and updates to a listener will clear the associated routes. Update the incremental xDS logic to account for this implicit cleanup so that we can finish warming the clusters and listeners. Fixes #10379
This commit is contained in:
parent
645e406ca0
commit
f72774618d
3
.changelog/10381.txt
Normal file
3
.changelog/10381.txt
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
```release-note:bug
|
||||||
|
xds: (beta-only) ensure that dependent xDS resources are reconfigured during primary type warming
|
||||||
|
```
|
@ -120,6 +120,18 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
|
|||||||
EndpointType: newDeltaType(generator, stream, EndpointType, nil),
|
EndpointType: newDeltaType(generator, stream, EndpointType, nil),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Endpoints are stored within a Cluster (and Routes
|
||||||
|
// are stored within a Listener) so whenever the
|
||||||
|
// enclosing resource is updated the inner resource
|
||||||
|
// list is cleared implicitly.
|
||||||
|
//
|
||||||
|
// When this happens we should update our local
|
||||||
|
// representation of envoy state to force an update.
|
||||||
|
//
|
||||||
|
// see: https://github.com/envoyproxy/envoy/issues/13009
|
||||||
|
handlers[ListenerType].childType = handlers[RouteType]
|
||||||
|
handlers[ClusterType].childType = handlers[EndpointType]
|
||||||
|
|
||||||
var authTimer <-chan time.Time
|
var authTimer <-chan time.Time
|
||||||
extendAuthTimer := func() {
|
extendAuthTimer := func() {
|
||||||
authTimer = time.After(s.AuthCheckFrequency)
|
authTimer = time.After(s.AuthCheckFrequency)
|
||||||
@ -177,6 +189,10 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
|
|||||||
// index and hash the xDS structures
|
// index and hash the xDS structures
|
||||||
newResourceMap := indexResources(generator.Logger, newRes)
|
newResourceMap := indexResources(generator.Logger, newRes)
|
||||||
|
|
||||||
|
if err := populateChildIndexMap(newResourceMap); err != nil {
|
||||||
|
return status.Errorf(codes.Unavailable, "failed to index xDS resource versions: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
newVersions, err := computeResourceVersions(newResourceMap)
|
newVersions, err := computeResourceVersions(newResourceMap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return status.Errorf(codes.Unavailable, "failed to compute xDS resource versions: %v", err)
|
return status.Errorf(codes.Unavailable, "failed to compute xDS resource versions: %v", err)
|
||||||
@ -352,6 +368,11 @@ type xDSDeltaType struct {
|
|||||||
typeURL string
|
typeURL string
|
||||||
allowEmptyFn func(kind structs.ServiceKind) bool
|
allowEmptyFn func(kind structs.ServiceKind) bool
|
||||||
|
|
||||||
|
// childType is a type that in Envoy is actually stored within this type.
|
||||||
|
// Upserts of THIS type should potentially trigger dependent named
|
||||||
|
// resources within the child to be re-configured.
|
||||||
|
childType *xDSDeltaType
|
||||||
|
|
||||||
// registered indicates if this type has been requested at least once by
|
// registered indicates if this type has been requested at least once by
|
||||||
// the proxy
|
// the proxy
|
||||||
registered bool
|
registered bool
|
||||||
@ -373,8 +394,13 @@ type xDSDeltaType struct {
|
|||||||
// map. Once we get an ACK from envoy we'll update the resourceVersions map
|
// map. Once we get an ACK from envoy we'll update the resourceVersions map
|
||||||
// and strike the entry from this map.
|
// and strike the entry from this map.
|
||||||
//
|
//
|
||||||
// nonce -> name -> version
|
// nonce -> name -> {version}
|
||||||
pendingUpdates map[string]map[string]string
|
pendingUpdates map[string]map[string]PendingUpdate
|
||||||
|
}
|
||||||
|
|
||||||
|
type PendingUpdate struct {
|
||||||
|
Version string
|
||||||
|
ChildResources []string // optional
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDeltaType(
|
func newDeltaType(
|
||||||
@ -389,7 +415,7 @@ func newDeltaType(
|
|||||||
typeURL: typeUrl,
|
typeURL: typeUrl,
|
||||||
allowEmptyFn: allowEmptyFn,
|
allowEmptyFn: allowEmptyFn,
|
||||||
resourceVersions: make(map[string]string),
|
resourceVersions: make(map[string]string),
|
||||||
pendingUpdates: make(map[string]map[string]string),
|
pendingUpdates: make(map[string]map[string]PendingUpdate),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -511,11 +537,28 @@ func (t *xDSDeltaType) ack(nonce string) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for name, version := range pending {
|
for name, obj := range pending {
|
||||||
if version == "" {
|
if obj.Version == "" {
|
||||||
delete(t.resourceVersions, name)
|
delete(t.resourceVersions, name)
|
||||||
} else {
|
} else {
|
||||||
t.resourceVersions[name] = version
|
t.resourceVersions[name] = obj.Version
|
||||||
|
}
|
||||||
|
if t.childType != nil && obj.Version != "" {
|
||||||
|
// This branch only matters on UPDATE, since we already have
|
||||||
|
// mechanisms to clean up orphaned resources.
|
||||||
|
for _, childName := range obj.ChildResources {
|
||||||
|
if _, exist := t.childType.resourceVersions[childName]; exist {
|
||||||
|
t.generator.Logger.Trace(
|
||||||
|
"triggering implicit update of resource",
|
||||||
|
"typeUrl", t.typeURL,
|
||||||
|
"resource", name,
|
||||||
|
"childTypeUrl", t.childType.typeURL,
|
||||||
|
"childResource", childName,
|
||||||
|
)
|
||||||
|
// Basically manifest this as a re-subscribe
|
||||||
|
t.childType.resourceVersions[childName] = ""
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
t.sentToEnvoyOnce = true
|
t.sentToEnvoyOnce = true
|
||||||
@ -529,7 +572,7 @@ func (t *xDSDeltaType) nack(nonce string) {
|
|||||||
func (t *xDSDeltaType) SendIfNew(
|
func (t *xDSDeltaType) SendIfNew(
|
||||||
kind structs.ServiceKind,
|
kind structs.ServiceKind,
|
||||||
currentVersions map[string]string, // type => name => version (as consul knows right now)
|
currentVersions map[string]string, // type => name => version (as consul knows right now)
|
||||||
resourceMap IndexedResources,
|
resourceMap *IndexedResources,
|
||||||
nonce *uint64,
|
nonce *uint64,
|
||||||
upsert, remove bool,
|
upsert, remove bool,
|
||||||
) (error, bool) {
|
) (error, bool) {
|
||||||
@ -571,6 +614,17 @@ func (t *xDSDeltaType) SendIfNew(
|
|||||||
}
|
}
|
||||||
logger.Trace("sent response", "nonce", resp.Nonce)
|
logger.Trace("sent response", "nonce", resp.Nonce)
|
||||||
|
|
||||||
|
if t.childType != nil {
|
||||||
|
// Capture the relevant child resource names on this pending update so
|
||||||
|
// we can properly clean up the linked children when this change is
|
||||||
|
// ACKed.
|
||||||
|
for name, obj := range updates {
|
||||||
|
if children, ok := resourceMap.ChildIndex[t.typeURL][name]; ok {
|
||||||
|
obj.ChildResources = children
|
||||||
|
updates[name] = obj
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
t.pendingUpdates[resp.Nonce] = updates
|
t.pendingUpdates[resp.Nonce] = updates
|
||||||
|
|
||||||
return nil, true
|
return nil, true
|
||||||
@ -578,13 +632,13 @@ func (t *xDSDeltaType) SendIfNew(
|
|||||||
|
|
||||||
func (t *xDSDeltaType) createDeltaResponse(
|
func (t *xDSDeltaType) createDeltaResponse(
|
||||||
currentVersions map[string]string, // name => version (as consul knows right now)
|
currentVersions map[string]string, // name => version (as consul knows right now)
|
||||||
resourceMap IndexedResources,
|
resourceMap *IndexedResources,
|
||||||
upsert, remove bool,
|
upsert, remove bool,
|
||||||
) (*envoy_discovery_v3.DeltaDiscoveryResponse, map[string]string, error) {
|
) (*envoy_discovery_v3.DeltaDiscoveryResponse, map[string]PendingUpdate, error) {
|
||||||
// compute difference
|
// compute difference
|
||||||
var (
|
var (
|
||||||
hasRelevantUpdates = false
|
hasRelevantUpdates = false
|
||||||
updates = make(map[string]string)
|
updates = make(map[string]PendingUpdate)
|
||||||
)
|
)
|
||||||
// First find things that need updating or deleting
|
// First find things that need updating or deleting
|
||||||
for name, envoyVers := range t.resourceVersions {
|
for name, envoyVers := range t.resourceVersions {
|
||||||
@ -593,12 +647,12 @@ func (t *xDSDeltaType) createDeltaResponse(
|
|||||||
if remove {
|
if remove {
|
||||||
hasRelevantUpdates = true
|
hasRelevantUpdates = true
|
||||||
}
|
}
|
||||||
updates[name] = ""
|
updates[name] = PendingUpdate{Version: ""}
|
||||||
} else if currVers != envoyVers {
|
} else if currVers != envoyVers {
|
||||||
if upsert {
|
if upsert {
|
||||||
hasRelevantUpdates = true
|
hasRelevantUpdates = true
|
||||||
}
|
}
|
||||||
updates[name] = currVers
|
updates[name] = PendingUpdate{Version: currVers}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -606,7 +660,7 @@ func (t *xDSDeltaType) createDeltaResponse(
|
|||||||
if t.wildcard {
|
if t.wildcard {
|
||||||
for name, currVers := range currentVersions {
|
for name, currVers := range currentVersions {
|
||||||
if _, ok := t.resourceVersions[name]; !ok {
|
if _, ok := t.resourceVersions[name]; !ok {
|
||||||
updates[name] = currVers
|
updates[name] = PendingUpdate{Version: currVers}
|
||||||
if upsert {
|
if upsert {
|
||||||
hasRelevantUpdates = true
|
hasRelevantUpdates = true
|
||||||
}
|
}
|
||||||
@ -623,15 +677,15 @@ func (t *xDSDeltaType) createDeltaResponse(
|
|||||||
// TODO(rb): consider putting something in SystemVersionInfo?
|
// TODO(rb): consider putting something in SystemVersionInfo?
|
||||||
TypeUrl: t.typeURL,
|
TypeUrl: t.typeURL,
|
||||||
}
|
}
|
||||||
realUpdates := make(map[string]string)
|
realUpdates := make(map[string]PendingUpdate)
|
||||||
for name, vers := range updates {
|
for name, obj := range updates {
|
||||||
if vers == "" {
|
if obj.Version == "" {
|
||||||
if remove {
|
if remove {
|
||||||
resp.RemovedResources = append(resp.RemovedResources, name)
|
resp.RemovedResources = append(resp.RemovedResources, name)
|
||||||
realUpdates[name] = ""
|
realUpdates[name] = PendingUpdate{Version: ""}
|
||||||
}
|
}
|
||||||
} else if upsert {
|
} else if upsert {
|
||||||
resources, ok := resourceMap[t.typeURL]
|
resources, ok := resourceMap.Index[t.typeURL]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, nil, fmt.Errorf("unknown type url: %s", t.typeURL)
|
return nil, nil, fmt.Errorf("unknown type url: %s", t.typeURL)
|
||||||
}
|
}
|
||||||
@ -647,18 +701,18 @@ func (t *xDSDeltaType) createDeltaResponse(
|
|||||||
resp.Resources = append(resp.Resources, &envoy_discovery_v3.Resource{
|
resp.Resources = append(resp.Resources, &envoy_discovery_v3.Resource{
|
||||||
Name: name,
|
Name: name,
|
||||||
Resource: any,
|
Resource: any,
|
||||||
Version: vers,
|
Version: obj.Version,
|
||||||
})
|
})
|
||||||
realUpdates[name] = vers
|
realUpdates[name] = obj
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return resp, realUpdates, nil
|
return resp, realUpdates, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func computeResourceVersions(resourceMap IndexedResources) (map[string]map[string]string, error) {
|
func computeResourceVersions(resourceMap *IndexedResources) (map[string]map[string]string, error) {
|
||||||
out := make(map[string]map[string]string)
|
out := make(map[string]map[string]string)
|
||||||
for typeUrl, resources := range resourceMap {
|
for typeUrl, resources := range resourceMap.Index {
|
||||||
m, err := hashResourceMap(resources)
|
m, err := hashResourceMap(resources)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to hash resources for %q: %v", typeUrl, err)
|
return nil, fmt.Errorf("failed to hash resources for %q: %v", typeUrl, err)
|
||||||
@ -668,18 +722,51 @@ func computeResourceVersions(resourceMap IndexedResources) (map[string]map[strin
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type IndexedResources map[string]map[string]proto.Message
|
type IndexedResources struct {
|
||||||
|
// Index is a map of typeURL => resourceName => resource
|
||||||
|
Index map[string]map[string]proto.Message
|
||||||
|
|
||||||
func emptyIndexedResources() IndexedResources {
|
// ChildIndex is a map of typeURL => parentResourceName => list of
|
||||||
return map[string]map[string]proto.Message{
|
// childResourceNames. This only applies if the child and parent do not
|
||||||
ListenerType: make(map[string]proto.Message),
|
// share a name.
|
||||||
RouteType: make(map[string]proto.Message),
|
ChildIndex map[string]map[string][]string
|
||||||
ClusterType: make(map[string]proto.Message),
|
}
|
||||||
EndpointType: make(map[string]proto.Message),
|
|
||||||
|
func emptyIndexedResources() *IndexedResources {
|
||||||
|
return &IndexedResources{
|
||||||
|
Index: map[string]map[string]proto.Message{
|
||||||
|
ListenerType: make(map[string]proto.Message),
|
||||||
|
RouteType: make(map[string]proto.Message),
|
||||||
|
ClusterType: make(map[string]proto.Message),
|
||||||
|
EndpointType: make(map[string]proto.Message),
|
||||||
|
},
|
||||||
|
ChildIndex: map[string]map[string][]string{
|
||||||
|
ListenerType: make(map[string][]string),
|
||||||
|
ClusterType: make(map[string][]string),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func indexResources(logger hclog.Logger, resources map[string][]proto.Message) IndexedResources {
|
func populateChildIndexMap(resourceMap *IndexedResources) error {
|
||||||
|
// LDS and RDS have a more complicated relationship.
|
||||||
|
for name, res := range resourceMap.Index[ListenerType] {
|
||||||
|
listener := res.(*envoy_listener_v3.Listener)
|
||||||
|
rdsRouteNames, err := extractRdsResourceNames(listener)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
resourceMap.ChildIndex[ListenerType][name] = rdsRouteNames
|
||||||
|
}
|
||||||
|
|
||||||
|
// CDS and EDS share exact names.
|
||||||
|
for name := range resourceMap.Index[ClusterType] {
|
||||||
|
resourceMap.ChildIndex[ClusterType][name] = []string{name}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func indexResources(logger hclog.Logger, resources map[string][]proto.Message) *IndexedResources {
|
||||||
data := emptyIndexedResources()
|
data := emptyIndexedResources()
|
||||||
|
|
||||||
for typeURL, typeRes := range resources {
|
for typeURL, typeRes := range resources {
|
||||||
@ -688,7 +775,7 @@ func indexResources(logger hclog.Logger, resources map[string][]proto.Message) I
|
|||||||
if name == "" {
|
if name == "" {
|
||||||
logger.Warn("skipping unexpected xDS type found in delta snapshot", "typeURL", typeURL)
|
logger.Warn("skipping unexpected xDS type found in delta snapshot", "typeURL", typeURL)
|
||||||
} else {
|
} else {
|
||||||
data[typeURL][name] = res
|
data.Index[typeURL][name] = res
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -327,6 +327,333 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints(t *testing.T) {
|
||||||
|
aclResolve := func(id string) (acl.Authorizer, error) {
|
||||||
|
// Allow all
|
||||||
|
return acl.RootAuthorizer("manage"), nil
|
||||||
|
}
|
||||||
|
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
|
||||||
|
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
|
||||||
|
|
||||||
|
sid := structs.NewServiceID("web-sidecar-proxy", nil)
|
||||||
|
|
||||||
|
// Register the proxy to create state needed to Watch() on
|
||||||
|
mgr.RegisterProxy(t, sid)
|
||||||
|
|
||||||
|
var snap *proxycfg.ConfigSnapshot
|
||||||
|
runStep(t, "get into initial state", func(t *testing.T) {
|
||||||
|
snap = newTestSnapshot(t, nil, "")
|
||||||
|
|
||||||
|
// Send initial cluster discover.
|
||||||
|
envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})
|
||||||
|
|
||||||
|
// Check no response sent yet
|
||||||
|
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||||
|
|
||||||
|
requireProtocolVersionGauge(t, scenario, "v3", 1)
|
||||||
|
|
||||||
|
// Deliver a new snapshot (tcp with one tcp upstream)
|
||||||
|
mgr.DeliverConfig(t, sid, snap)
|
||||||
|
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: ClusterType,
|
||||||
|
Nonce: hexString(1),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
makeTestCluster(t, snap, "tcp:local_app"),
|
||||||
|
makeTestCluster(t, snap, "tcp:db"),
|
||||||
|
makeTestCluster(t, snap, "tcp:geo-cache"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Envoy then tries to discover endpoints for those clusters.
|
||||||
|
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
|
||||||
|
ResourceNamesSubscribe: []string{
|
||||||
|
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||||
|
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// It also (in parallel) issues the cluster ACK
|
||||||
|
envoy.SendDeltaReqACK(t, ClusterType, 1, true, nil)
|
||||||
|
|
||||||
|
// We should get a response immediately since the config is already present in
|
||||||
|
// the server for endpoints. Note that this should not be racy if the server
|
||||||
|
// is behaving well since the Cluster send above should be blocked until we
|
||||||
|
// deliver a new config version.
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: EndpointType,
|
||||||
|
Nonce: hexString(2),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
makeTestEndpoints(t, snap, "tcp:db"),
|
||||||
|
makeTestEndpoints(t, snap, "tcp:geo-cache"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
// And no other response yet
|
||||||
|
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||||
|
|
||||||
|
// Envoy now sends listener request
|
||||||
|
envoy.SendDeltaReq(t, ListenerType, nil)
|
||||||
|
|
||||||
|
// It also (in parallel) issues the endpoint ACK
|
||||||
|
envoy.SendDeltaReqACK(t, EndpointType, 2, true, nil)
|
||||||
|
|
||||||
|
// And should get a response immediately.
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: ListenerType,
|
||||||
|
Nonce: hexString(3),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
makeTestListener(t, snap, "tcp:public_listener"),
|
||||||
|
makeTestListener(t, snap, "tcp:db"),
|
||||||
|
makeTestListener(t, snap, "tcp:geo-cache"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
// And no other response yet
|
||||||
|
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||||
|
|
||||||
|
// ACKs the listener
|
||||||
|
envoy.SendDeltaReqACK(t, ListenerType, 3, true, nil)
|
||||||
|
})
|
||||||
|
|
||||||
|
runStep(t, "trigger cluster update needing implicit endpoint replacements", func(t *testing.T) {
|
||||||
|
// Update the snapshot in a way that causes a single cluster update.
|
||||||
|
snap = newTestSnapshot(t, snap, "", &structs.ServiceResolverConfigEntry{
|
||||||
|
Kind: structs.ServiceResolver,
|
||||||
|
Name: "db",
|
||||||
|
ConnectTimeout: 1337 * time.Second,
|
||||||
|
})
|
||||||
|
mgr.DeliverConfig(t, sid, snap)
|
||||||
|
|
||||||
|
// The cluster is updated
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: ClusterType,
|
||||||
|
Nonce: hexString(4),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
// SAME makeTestCluster(t, snap, "tcp:local_app"),
|
||||||
|
makeTestCluster(t, snap, "tcp:db:timeout"),
|
||||||
|
// SAME makeTestCluster(t, snap, "tcp:geo-cache"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
envoy.SendDeltaReqACK(t, ClusterType, 4, true, nil)
|
||||||
|
|
||||||
|
// And we re-send the endpoints for the updated cluster after getting the
|
||||||
|
// ACK for the cluster.
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: EndpointType,
|
||||||
|
Nonce: hexString(5),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
makeTestEndpoints(t, snap, "tcp:db"),
|
||||||
|
// SAME makeTestEndpoints(t, snap, "tcp:geo-cache"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
envoy.SendDeltaReqACK(t, EndpointType, 5, true, nil)
|
||||||
|
|
||||||
|
// And no other response yet
|
||||||
|
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||||
|
})
|
||||||
|
|
||||||
|
envoy.Close()
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
require.NoError(t, err)
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
t.Fatalf("timed out waiting for handler to finish")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChangesImpactRoutes(t *testing.T) {
|
||||||
|
aclResolve := func(id string) (acl.Authorizer, error) {
|
||||||
|
// Allow all
|
||||||
|
return acl.RootAuthorizer("manage"), nil
|
||||||
|
}
|
||||||
|
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
|
||||||
|
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
|
||||||
|
|
||||||
|
sid := structs.NewServiceID("web-sidecar-proxy", nil)
|
||||||
|
|
||||||
|
// Register the proxy to create state needed to Watch() on
|
||||||
|
mgr.RegisterProxy(t, sid)
|
||||||
|
|
||||||
|
var snap *proxycfg.ConfigSnapshot
|
||||||
|
|
||||||
|
runStep(t, "get into initial state", func(t *testing.T) {
|
||||||
|
// Send initial cluster discover (empty payload)
|
||||||
|
envoy.SendDeltaReq(t, ClusterType, nil)
|
||||||
|
|
||||||
|
// Check no response sent yet
|
||||||
|
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||||
|
|
||||||
|
// Deliver a new snapshot (tcp with one http upstream with no-op disco chain)
|
||||||
|
snap = newTestSnapshot(t, nil, "http2", &structs.ServiceConfigEntry{
|
||||||
|
Kind: structs.ServiceDefaults,
|
||||||
|
Name: "db",
|
||||||
|
Protocol: "http2",
|
||||||
|
}, &structs.ServiceRouterConfigEntry{
|
||||||
|
Kind: structs.ServiceRouter,
|
||||||
|
Name: "db",
|
||||||
|
Routes: nil,
|
||||||
|
})
|
||||||
|
mgr.DeliverConfig(t, sid, snap)
|
||||||
|
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: ClusterType,
|
||||||
|
Nonce: hexString(1),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
makeTestCluster(t, snap, "tcp:local_app"),
|
||||||
|
makeTestCluster(t, snap, "http2:db"),
|
||||||
|
makeTestCluster(t, snap, "tcp:geo-cache"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Envoy then tries to discover endpoints for those clusters.
|
||||||
|
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
|
||||||
|
ResourceNamesSubscribe: []string{
|
||||||
|
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||||
|
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// It also (in parallel) issues the cluster ACK
|
||||||
|
envoy.SendDeltaReqACK(t, ClusterType, 1, true, nil)
|
||||||
|
|
||||||
|
// We should get a response immediately since the config is already present in
|
||||||
|
// the server for endpoints. Note that this should not be racy if the server
|
||||||
|
// is behaving well since the Cluster send above should be blocked until we
|
||||||
|
// deliver a new config version.
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: EndpointType,
|
||||||
|
Nonce: hexString(2),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
makeTestEndpoints(t, snap, "http2:db"),
|
||||||
|
makeTestEndpoints(t, snap, "tcp:geo-cache"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
// And no other response yet
|
||||||
|
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||||
|
|
||||||
|
// Envoy now sends listener request
|
||||||
|
envoy.SendDeltaReq(t, ListenerType, nil)
|
||||||
|
|
||||||
|
// It also (in parallel) issues the endpoint ACK
|
||||||
|
envoy.SendDeltaReqACK(t, EndpointType, 2, true, nil)
|
||||||
|
|
||||||
|
// And should get a response immediately.
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: ListenerType,
|
||||||
|
Nonce: hexString(3),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
makeTestListener(t, snap, "tcp:public_listener"),
|
||||||
|
makeTestListener(t, snap, "http2:db:rds"),
|
||||||
|
makeTestListener(t, snap, "tcp:geo-cache"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
// And no other response yet
|
||||||
|
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||||
|
|
||||||
|
// Envoy now sends routes request
|
||||||
|
envoy.SendDeltaReq(t, RouteType, &envoy_discovery_v3.DeltaDiscoveryRequest{
|
||||||
|
ResourceNamesSubscribe: []string{
|
||||||
|
"db",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// ACKs the listener
|
||||||
|
envoy.SendDeltaReqACK(t, ListenerType, 3, true, nil)
|
||||||
|
|
||||||
|
// And should get a response immediately.
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: RouteType,
|
||||||
|
Nonce: hexString(4),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
makeTestRoute(t, "http2:db"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
envoy.SendDeltaReqACK(t, RouteType, 4, true, nil)
|
||||||
|
|
||||||
|
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||||
|
})
|
||||||
|
|
||||||
|
runStep(t, "trigger listener update needing implicit route replacements", func(t *testing.T) {
|
||||||
|
// Update the snapshot in a way that causes a single listener update.
|
||||||
|
//
|
||||||
|
// Downgrade from http2 to http
|
||||||
|
snap = newTestSnapshot(t, snap, "http", &structs.ServiceConfigEntry{
|
||||||
|
Kind: structs.ServiceDefaults,
|
||||||
|
Name: "db",
|
||||||
|
Protocol: "http",
|
||||||
|
}, &structs.ServiceRouterConfigEntry{
|
||||||
|
Kind: structs.ServiceRouter,
|
||||||
|
Name: "db",
|
||||||
|
Routes: nil,
|
||||||
|
})
|
||||||
|
mgr.DeliverConfig(t, sid, snap)
|
||||||
|
|
||||||
|
// db cluster is refreshed (unrelated to the test scenario other than it's required)
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: ClusterType,
|
||||||
|
Nonce: hexString(5),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
makeTestCluster(t, snap, "http:db"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
// the listener is updated
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: ListenerType,
|
||||||
|
Nonce: hexString(6),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
makeTestListener(t, snap, "http:db:rds"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
envoy.SendDeltaReqACK(t, ClusterType, 5, true, nil)
|
||||||
|
|
||||||
|
// ACKs the listener
|
||||||
|
envoy.SendDeltaReqACK(t, ListenerType, 6, true, nil)
|
||||||
|
|
||||||
|
// The behaviors of Cluster updates triggering re-sends of Endpoint updates
|
||||||
|
// tested in TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints
|
||||||
|
// triggers here. It is not explicitly under test, but we have to get past
|
||||||
|
// this exchange to get to the part we care about.
|
||||||
|
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: EndpointType,
|
||||||
|
Nonce: hexString(7),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
makeTestEndpoints(t, snap, "http:db"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
envoy.SendDeltaReqACK(t, EndpointType, 7, true, nil)
|
||||||
|
|
||||||
|
// THE ACTUAL THING WE CARE ABOUT: replaced route config
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: RouteType,
|
||||||
|
Nonce: hexString(8),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
makeTestRoute(t, "http2:db"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
envoy.SendDeltaReqACK(t, RouteType, 8, true, nil)
|
||||||
|
|
||||||
|
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||||
|
})
|
||||||
|
|
||||||
|
envoy.Close()
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
require.NoError(t, err)
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
t.Fatalf("timed out waiting for handler to finish")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) {
|
func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
|
@ -663,6 +663,51 @@ const (
|
|||||||
httpConnectionManagerNewName = "envoy.filters.network.http_connection_manager"
|
httpConnectionManagerNewName = "envoy.filters.network.http_connection_manager"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func extractRdsResourceNames(listener *envoy_listener_v3.Listener) ([]string, error) {
|
||||||
|
var found []string
|
||||||
|
|
||||||
|
for chainIdx, chain := range listener.FilterChains {
|
||||||
|
for filterIdx, filter := range chain.Filters {
|
||||||
|
if filter.Name != httpConnectionManagerNewName {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
tc, ok := filter.ConfigType.(*envoy_listener_v3.Filter_TypedConfig)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf(
|
||||||
|
"filter chain %d has a %q filter %d with an unsupported config type: %T",
|
||||||
|
chainIdx,
|
||||||
|
filter.Name,
|
||||||
|
filterIdx,
|
||||||
|
filter.ConfigType,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
var hcm envoy_http_v3.HttpConnectionManager
|
||||||
|
if err := ptypes.UnmarshalAny(tc.TypedConfig, &hcm); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if hcm.RouteSpecifier == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
rds, ok := hcm.RouteSpecifier.(*envoy_http_v3.HttpConnectionManager_Rds)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if rds.Rds == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
found = append(found, rds.Rds.RouteConfigName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return found, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Locate the existing http connect manager L4 filter and inject our RBAC filter at the top.
|
// Locate the existing http connect manager L4 filter and inject our RBAC filter at the top.
|
||||||
func injectHTTPFilterOnFilterChains(
|
func injectHTTPFilterOnFilterChains(
|
||||||
listener *envoy_listener_v3.Listener,
|
listener *envoy_listener_v3.Listener,
|
||||||
|
@ -391,6 +391,24 @@ func makeTestCluster(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName st
|
|||||||
ConnectTimeout: ptypes.DurationProto(5 * time.Second),
|
ConnectTimeout: ptypes.DurationProto(5 * time.Second),
|
||||||
TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"),
|
TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"),
|
||||||
}
|
}
|
||||||
|
case "tcp:db:timeout":
|
||||||
|
return &envoy_cluster_v3.Cluster{
|
||||||
|
Name: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||||
|
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{
|
||||||
|
Type: envoy_cluster_v3.Cluster_EDS,
|
||||||
|
},
|
||||||
|
EdsClusterConfig: &envoy_cluster_v3.Cluster_EdsClusterConfig{
|
||||||
|
EdsConfig: xdsNewADSConfig(),
|
||||||
|
},
|
||||||
|
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{},
|
||||||
|
OutlierDetection: &envoy_cluster_v3.OutlierDetection{},
|
||||||
|
AltStatName: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||||
|
CommonLbConfig: &envoy_cluster_v3.Cluster_CommonLbConfig{
|
||||||
|
HealthyPanicThreshold: &envoy_type_v3.Percent{Value: 0},
|
||||||
|
},
|
||||||
|
ConnectTimeout: ptypes.DurationProto(1337 * time.Second),
|
||||||
|
TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"),
|
||||||
|
}
|
||||||
case "http2:db":
|
case "http2:db":
|
||||||
return &envoy_cluster_v3.Cluster{
|
return &envoy_cluster_v3.Cluster{
|
||||||
Name: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
Name: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||||
@ -410,6 +428,25 @@ func makeTestCluster(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName st
|
|||||||
TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"),
|
TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"),
|
||||||
Http2ProtocolOptions: &envoy_core_v3.Http2ProtocolOptions{},
|
Http2ProtocolOptions: &envoy_core_v3.Http2ProtocolOptions{},
|
||||||
}
|
}
|
||||||
|
case "http:db":
|
||||||
|
return &envoy_cluster_v3.Cluster{
|
||||||
|
Name: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||||
|
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{
|
||||||
|
Type: envoy_cluster_v3.Cluster_EDS,
|
||||||
|
},
|
||||||
|
EdsClusterConfig: &envoy_cluster_v3.Cluster_EdsClusterConfig{
|
||||||
|
EdsConfig: xdsNewADSConfig(),
|
||||||
|
},
|
||||||
|
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{},
|
||||||
|
OutlierDetection: &envoy_cluster_v3.OutlierDetection{},
|
||||||
|
AltStatName: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||||
|
CommonLbConfig: &envoy_cluster_v3.Cluster_CommonLbConfig{
|
||||||
|
HealthyPanicThreshold: &envoy_type_v3.Percent{Value: 0},
|
||||||
|
},
|
||||||
|
ConnectTimeout: ptypes.DurationProto(5 * time.Second),
|
||||||
|
TransportSocket: xdsNewUpstreamTransportSocket(t, snap, "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"),
|
||||||
|
// HttpProtocolOptions: &envoy_core_v3.Http1ProtocolOptions{},
|
||||||
|
}
|
||||||
case "tcp:geo-cache":
|
case "tcp:geo-cache":
|
||||||
return &envoy_cluster_v3.Cluster{
|
return &envoy_cluster_v3.Cluster{
|
||||||
Name: "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
|
Name: "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
|
||||||
@ -444,7 +481,7 @@ func makeTestEndpoints(t *testing.T, _ *proxycfg.ConfigSnapshot, fixtureName str
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
case "http2:db":
|
case "http2:db", "http:db":
|
||||||
return &envoy_endpoint_v3.ClusterLoadAssignment{
|
return &envoy_endpoint_v3.ClusterLoadAssignment{
|
||||||
ClusterName: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
ClusterName: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||||
Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{
|
Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{
|
||||||
@ -570,6 +607,34 @@ func makeTestListener(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName s
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
case "http:db:rds":
|
||||||
|
return &envoy_listener_v3.Listener{
|
||||||
|
Name: "db:127.0.0.1:9191",
|
||||||
|
Address: makeAddress("127.0.0.1", 9191),
|
||||||
|
TrafficDirection: envoy_core_v3.TrafficDirection_OUTBOUND,
|
||||||
|
FilterChains: []*envoy_listener_v3.FilterChain{
|
||||||
|
{
|
||||||
|
Filters: []*envoy_listener_v3.Filter{
|
||||||
|
xdsNewFilter(t, "envoy.filters.network.http_connection_manager", &envoy_http_v3.HttpConnectionManager{
|
||||||
|
HttpFilters: []*envoy_http_v3.HttpFilter{
|
||||||
|
{Name: "envoy.filters.http.router"},
|
||||||
|
},
|
||||||
|
RouteSpecifier: &envoy_http_v3.HttpConnectionManager_Rds{
|
||||||
|
Rds: &envoy_http_v3.Rds{
|
||||||
|
RouteConfigName: "db",
|
||||||
|
ConfigSource: xdsNewADSConfig(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
StatPrefix: "upstream.db.default.dc1",
|
||||||
|
Tracing: &envoy_http_v3.HttpConnectionManager_Tracing{
|
||||||
|
RandomSampling: &envoy_type_v3.Percent{Value: 0},
|
||||||
|
},
|
||||||
|
// HttpProtocolOptions: &envoy_core_v3.Http1ProtocolOptions{},
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
case "tcp:geo-cache":
|
case "tcp:geo-cache":
|
||||||
return &envoy_listener_v3.Listener{
|
return &envoy_listener_v3.Listener{
|
||||||
Name: "prepared_query:geo-cache:127.10.10.10:8181",
|
Name: "prepared_query:geo-cache:127.10.10.10:8181",
|
||||||
@ -596,7 +661,7 @@ func makeTestListener(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName s
|
|||||||
|
|
||||||
func makeTestRoute(t *testing.T, fixtureName string) *envoy_route_v3.RouteConfiguration {
|
func makeTestRoute(t *testing.T, fixtureName string) *envoy_route_v3.RouteConfiguration {
|
||||||
switch fixtureName {
|
switch fixtureName {
|
||||||
case "http2:db":
|
case "http2:db", "http:db":
|
||||||
return &envoy_route_v3.RouteConfiguration{
|
return &envoy_route_v3.RouteConfiguration{
|
||||||
Name: "db",
|
Name: "db",
|
||||||
ValidateClusters: makeBoolValue(true),
|
ValidateClusters: makeBoolValue(true),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user