Prevent xDS tight loop on cfg errors (#12195)

This commit is contained in:
Freddy 2022-02-10 15:37:36 -07:00 committed by GitHub
parent ce478330f2
commit 378a7258e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 185 additions and 40 deletions

3
.changelog/12195.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
xds: prevents tight loop where the Consul client agent would repeatedly re-send config that Envoy has rejected.
```

View File

@ -26,6 +26,15 @@ import (
"github.com/hashicorp/consul/logging"
)
type deltaRecvResponse int
const (
deltaRecvResponseNack deltaRecvResponse = iota
deltaRecvResponseAck
deltaRecvNewSubscription
deltaRecvUnknownType
)
// ADSDeltaStream is a shorter way of referring to this thing...
type ADSDeltaStream = envoy_discovery_v3.AggregatedDiscoveryService_DeltaAggregatedResourcesServer
@ -175,8 +184,17 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
}
if handler, ok := handlers[req.TypeUrl]; ok {
if handler.Recv(req, generator.ProxyFeatures) {
switch handler.Recv(req, generator.ProxyFeatures) {
case deltaRecvNewSubscription:
generator.Logger.Trace("subscribing to type", "typeUrl", req.TypeUrl)
case deltaRecvResponseNack:
generator.Logger.Trace("got nack response for type", "typeUrl", req.TypeUrl)
// There is no reason to believe that generating new xDS resources from the same snapshot
// would lead to an ACK from Envoy. Instead we continue to the top of this for loop and wait
// for a new request or snapshot.
continue
}
}
@ -430,9 +448,9 @@ func newDeltaType(
// Recv handles new discovery requests from envoy.
//
// Returns true the first time a type receives a request.
func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf supportedProxyFeatures) bool {
func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf supportedProxyFeatures) deltaRecvResponse {
if t == nil {
return false // not something we care about
return deltaRecvUnknownType // not something we care about
}
logger := t.generator.Logger.With("typeUrl", t.typeURL)
@ -487,6 +505,7 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf su
logger.Error("got error response from envoy proxy", "nonce", req.ResponseNonce,
"error", status.ErrorProto(req.ErrorDetail))
t.nack(req.ResponseNonce)
return deltaRecvResponseNack
}
}
@ -557,7 +576,10 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf su
}
}
return registeredThisTime
if registeredThisTime {
return deltaRecvNewSubscription
}
return deltaRecvResponseAck
}
func (t *xDSDeltaType) ack(nonce string) {

View File

@ -203,37 +203,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
runStep(t, "simulate envoy NACKing an endpoint update", func(t *testing.T) {
// Trigger only an EDS update.
snap = newTestSnapshot(t, snap, "")
deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1")
mgr.DeliverConfig(t, sid, snap)
// Send envoy an EDS update.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(6),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db[0]"),
),
})
envoy.SendDeltaReqNACK(t, EndpointType, 6, &rpcstatus.Status{})
// Send it again.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(7),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db[0]"),
),
})
envoy.SendDeltaReqACK(t, EndpointType, 7)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
// NOTE: this has to be the last subtest since it kills the stream
runStep(t, "simulate an envoy error sending an update to envoy", func(t *testing.T) {
// Force sends to fail
@ -250,11 +219,138 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
envoy.Close()
select {
case err := <-errCh:
// Envoy died.
expect := status.Errorf(codes.Unavailable,
"failed to send upsert reply for type %q: test error",
EndpointType)
require.EqualError(t, err, expect.Error())
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}
func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)
var snap *proxycfg.ConfigSnapshot
runStep(t, "initial setup", func(t *testing.T) {
snap = newTestSnapshot(t, nil, "")
// Plug in a bad port for the public listener
snap.Port = 1
// Send initial cluster discover.
envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
requireProtocolVersionGauge(t, scenario, "v3", 1)
// Deliver a new snapshot (tcp with one tcp upstream)
mgr.DeliverConfig(t, sid, snap)
})
runStep(t, "first sync", func(t *testing.T) {
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "tcp:db"),
makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
// Envoy then tries to discover endpoints for those clusters.
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
},
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, ClusterType, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
envoy.SendDeltaReq(t, ListenerType, nil)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, EndpointType, 2)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
Nonce: hexString(3),
Resources: makeTestResources(t,
// Response contains public_listener with port that Envoy can't bind to
makeTestListener(t, snap, "tcp:bad_public_listener"),
makeTestListener(t, snap, "tcp:db"),
makeTestListener(t, snap, "tcp:geo-cache"),
),
})
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// NACKs the listener update due to the bad public listener
envoy.SendDeltaReqNACK(t, ListenerType, 3, &rpcstatus.Status{})
// Consul should not respond until a new snapshot is delivered
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
runStep(t, "simulate envoy NACKing a listener update", func(t *testing.T) {
// Correct the port and deliver a new snapshot
snap.Port = 9999
mgr.DeliverConfig(t, sid, snap)
// And should send a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
Nonce: hexString(4),
Resources: makeTestResources(t,
// Send a public listener that Envoy will accept
makeTestListener(t, snap, "tcp:public_listener"),
makeTestListener(t, snap, "tcp:db"),
makeTestListener(t, snap, "tcp:geo-cache"),
),
})
// New listener is acked now
envoy.SendDeltaReqACK(t, EndpointType, 4)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}

View File

@ -533,6 +533,30 @@ func makeTestEndpoints(t *testing.T, _ *proxycfg.ConfigSnapshot, fixtureName str
func makeTestListener(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName string) *envoy_listener_v3.Listener {
switch fixtureName {
case "tcp:bad_public_listener":
return &envoy_listener_v3.Listener{
// Envoy can't bind to port 1
Name: "public_listener:0.0.0.0:1",
Address: makeAddress("0.0.0.0", 1),
TrafficDirection: envoy_core_v3.TrafficDirection_INBOUND,
FilterChains: []*envoy_listener_v3.FilterChain{
{
TransportSocket: xdsNewPublicTransportSocket(t, snap),
Filters: []*envoy_listener_v3.Filter{
xdsNewFilter(t, "envoy.filters.network.rbac", &envoy_network_rbac_v3.RBAC{
Rules: &envoy_rbac_v3.RBAC{},
StatPrefix: "connect_authz",
}),
xdsNewFilter(t, "envoy.filters.network.tcp_proxy", &envoy_tcp_proxy_v3.TcpProxy{
ClusterSpecifier: &envoy_tcp_proxy_v3.TcpProxy_Cluster{
Cluster: "local_app",
},
StatPrefix: "public_listener",
}),
},
},
},
}
case "tcp:public_listener":
return &envoy_listener_v3.Listener{
Name: "public_listener:0.0.0.0:9999",