mirror of https://github.com/status-im/consul.git
Fix xDS missing endpoint race condition. (#19866)
This fixes the following race condition: - Send update endpoints - Send update cluster - Recv ACK endpoints - Recv ACK cluster Prior to this fix, it would have resulted in the endpoints NOT existing in Envoy. This occurred because the cluster update implicitly clears the endpoints in Envoy, but we would never re-send the endpoint data to compensate for the loss, because we would incorrectly ACK the invalid old endpoint hash. Since the endpoint's hash did not actually change, they would not be resent. The fix for this is to effectively clear out the invalid pending ACKs for child resources whenever the parent changes. This ensures that we do not store the child's hash as accepted when the race occurs. An escape-hatch environment variable `XDS_PROTOCOL_LEGACY_CHILD_RESEND` was added so that users can revert back to the old legacy behavior in the event that this produces unknown side-effects. Visit the following thread for some extra context on why certainty around these race conditions is difficult: https://github.com/envoyproxy/envoy/issues/13009 This bug report and fix was mostly implemented by @ksmiley with some minor tweaks. Co-authored-by: Keith Smiley <ksmiley@salesforce.com>
This commit is contained in:
parent
0ca070b301
commit
0ac958f27b
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
xds: ensure child resources are re-sent to Envoy when the parent is updated even if the child already has pending updates.
|
||||||
|
```
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
@ -43,6 +44,11 @@ import (
|
||||||
|
|
||||||
var errOverwhelmed = status.Error(codes.ResourceExhausted, "this server has too many xDS streams open, please try another")
|
var errOverwhelmed = status.Error(codes.ResourceExhausted, "this server has too many xDS streams open, please try another")
|
||||||
|
|
||||||
|
// xdsProtocolLegacyChildResend enables the legacy behavior for the `ensureChildResend` function.
|
||||||
|
// This environment variable exists as an escape hatch so that users can disable the behavior, if needed.
|
||||||
|
// Ideally, this is a flag we can remove in 1.19+
|
||||||
|
var xdsProtocolLegacyChildResend = (os.Getenv("XDS_PROTOCOL_LEGACY_CHILD_RESEND") != "")
|
||||||
|
|
||||||
type deltaRecvResponse int
|
type deltaRecvResponse int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -1080,13 +1086,9 @@ func (t *xDSDeltaType) createDeltaResponse(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *xDSDeltaType) ensureChildResend(parentName, childName string) {
|
func (t *xDSDeltaType) ensureChildResend(parentName, childName string) {
|
||||||
if _, exist := t.deltaChild.childType.resourceVersions[childName]; !exist {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !t.subscribed(childName) {
|
if !t.subscribed(childName) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
t.logger.Trace(
|
t.logger.Trace(
|
||||||
"triggering implicit update of resource",
|
"triggering implicit update of resource",
|
||||||
"typeUrl", t.typeURL,
|
"typeUrl", t.typeURL,
|
||||||
|
@ -1094,11 +1096,41 @@ func (t *xDSDeltaType) ensureChildResend(parentName, childName string) {
|
||||||
"childTypeUrl", t.deltaChild.childType.typeURL,
|
"childTypeUrl", t.deltaChild.childType.typeURL,
|
||||||
"childResource", childName,
|
"childResource", childName,
|
||||||
)
|
)
|
||||||
|
|
||||||
// resourceVersions tracks the last known version for this childName that Envoy
|
// resourceVersions tracks the last known version for this childName that Envoy
|
||||||
// has ACKed. By setting this to empty it effectively tells us that Envoy does
|
// has ACKed. By setting this to empty it effectively tells us that Envoy does
|
||||||
// not have any data for that child, and we need to re-send.
|
// not have any data for that child, and we need to re-send.
|
||||||
t.deltaChild.childType.resourceVersions[childName] = ""
|
if _, exist := t.deltaChild.childType.resourceVersions[childName]; exist {
|
||||||
|
t.deltaChild.childType.resourceVersions[childName] = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
if xdsProtocolLegacyChildResend {
|
||||||
|
return
|
||||||
|
// TODO: This legacy behavior can be removed in 1.19, provided there are no outstanding issues.
|
||||||
|
//
|
||||||
|
// In this legacy mode, there is a confirmed race condition:
|
||||||
|
// - Send update endpoints
|
||||||
|
// - Send update cluster
|
||||||
|
// - Recv ACK endpoints
|
||||||
|
// - Recv ACK cluster
|
||||||
|
//
|
||||||
|
// When this situation happens, Envoy wipes the child endpoints when the cluster is updated,
|
||||||
|
// but it would never receive new ones. The endpoints would not be resent, because their hash
|
||||||
|
// never changed since the previous ACK.
|
||||||
|
//
|
||||||
|
// Due to ambiguity with the Envoy protocol [https://github.com/envoyproxy/envoy/issues/13009],
|
||||||
|
// it's difficult to state with certainty that no other unexpected side-effects are possible.
|
||||||
|
// This legacy escape hatch is left in-place in case some other complex race condition crops up.
|
||||||
|
//
|
||||||
|
// Longer-term, we should modify the hash of children to include the parent hash so that this
|
||||||
|
// behavior is implicitly handled, rather than being an edge case.
|
||||||
|
}
|
||||||
|
|
||||||
|
// pendingUpdates can contain newer versions that have been sent to Envoy but
|
||||||
|
// that we haven't processed an ACK for yet. These need to be cleared out, too,
|
||||||
|
// so that they aren't moved to resourceVersions by ack()
|
||||||
|
for nonce := range t.deltaChild.childType.pendingUpdates {
|
||||||
|
delete(t.deltaChild.childType.pendingUpdates[nonce], childName)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func computeResourceVersions(resourceMap *xdscommon.IndexedResources) (map[string]map[string]string, error) {
|
func computeResourceVersions(resourceMap *xdscommon.IndexedResources) (map[string]map[string]string, error) {
|
||||||
|
|
|
@ -6,7 +6,6 @@ package xds
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/hashicorp/consul/envoyextensions/xdscommon"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -14,6 +13,8 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/envoyextensions/xdscommon"
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
|
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
|
||||||
envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
|
envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
|
||||||
|
@ -821,6 +822,147 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangeBeforeEndpointAck(t *testing.T) {
|
||||||
|
// This test ensures that the following race condition does not block indefinitely:
|
||||||
|
// - Send update endpoints
|
||||||
|
// - Send update cluster
|
||||||
|
// - Recv ACK endpoints
|
||||||
|
// - Recv ACK cluster
|
||||||
|
// Prior to a bug fix, this would have resulted in the endpoints NOT existing in Envoy. This occurred because
|
||||||
|
// the cluster update implicitly clears the endpoints in Envoy, but we would never re-send the endpoint data
|
||||||
|
// to compensate for the loss because we would incorrectly ACK the invalid old endpoint hash. Since the
|
||||||
|
// endpoint's hash did not actually change, they would not be resent.
|
||||||
|
aclResolve := func(id string) (acl.Authorizer, error) {
|
||||||
|
// Allow all
|
||||||
|
return acl.RootAuthorizer("manage"), nil
|
||||||
|
}
|
||||||
|
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
|
||||||
|
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
|
||||||
|
|
||||||
|
sid := structs.NewServiceID("web-sidecar-proxy", nil)
|
||||||
|
|
||||||
|
// Register the proxy to create state needed to Watch() on
|
||||||
|
mgr.RegisterProxy(t, sid)
|
||||||
|
|
||||||
|
var snap *proxycfg.ConfigSnapshot
|
||||||
|
testutil.RunStep(t, "initial setup", func(t *testing.T) {
|
||||||
|
snap = newTestSnapshot(t, nil, "", nil)
|
||||||
|
|
||||||
|
// Send initial cluster discover.
|
||||||
|
envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})
|
||||||
|
|
||||||
|
// Check no response sent yet
|
||||||
|
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||||
|
|
||||||
|
requireProtocolVersionGauge(t, scenario, "v3", 1)
|
||||||
|
|
||||||
|
// Deliver a new snapshot (tcp with one tcp upstream)
|
||||||
|
mgr.DeliverConfig(t, sid, snap)
|
||||||
|
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: xdscommon.ClusterType,
|
||||||
|
Nonce: hexString(1),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
makeTestCluster(t, snap, "tcp:local_app"),
|
||||||
|
makeTestCluster(t, snap, "tcp:db"),
|
||||||
|
makeTestCluster(t, snap, "tcp:geo-cache"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
var newSnap *proxycfg.ConfigSnapshot
|
||||||
|
testutil.RunStep(t, "resend cluster immediately", func(t *testing.T) {
|
||||||
|
// Deliver updated snapshot with new CA roots and leaf certificate. This will not be
|
||||||
|
// sent to Envoy until the initial set of cluster message is ACKed.
|
||||||
|
newSnap = newTestSnapshot(t, nil, "", nil)
|
||||||
|
mgr.DeliverConfig(t, sid, newSnap)
|
||||||
|
|
||||||
|
// Envoy then tries to discover endpoints for clusters.
|
||||||
|
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
|
||||||
|
ResourceNamesSubscribe: []string{
|
||||||
|
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||||
|
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// We should get a response immediately since the config is already present in
|
||||||
|
// the server for endpoints. Note that this should not be racy if the server
|
||||||
|
// is behaving well since the Cluster send above should be blocked until we
|
||||||
|
// deliver a new config version.
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: xdscommon.EndpointType,
|
||||||
|
Nonce: hexString(2),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
makeTestEndpoints(t, snap, "tcp:db"),
|
||||||
|
makeTestEndpoints(t, snap, "tcp:geo-cache"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
// After receiving the endpoints Envoy sends an ACK for the clusters
|
||||||
|
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
|
||||||
|
|
||||||
|
// The updated cluster snapshot with new certificates is sent immediately
|
||||||
|
// after the first is ACKed.
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: xdscommon.ClusterType,
|
||||||
|
Nonce: hexString(3),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
// SAME makeTestCluster(t, snap, "tcp:local_app"),
|
||||||
|
makeTestCluster(t, newSnap, "tcp:db"),
|
||||||
|
makeTestCluster(t, newSnap, "tcp:geo-cache"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
testutil.RunStep(t, "resend endpoints", func(t *testing.T) {
|
||||||
|
// Envoy requests listeners because it has received endpoints. We won't send listeners
|
||||||
|
// until Envoy ACKs the second cluster update.
|
||||||
|
envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)
|
||||||
|
|
||||||
|
// Envoy ACKs the endpoints from the first cluster update.
|
||||||
|
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)
|
||||||
|
|
||||||
|
// Resend endpoints because the clusters changed.
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: xdscommon.EndpointType,
|
||||||
|
Nonce: hexString(4),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
makeTestEndpoints(t, newSnap, "tcp:db"),
|
||||||
|
makeTestEndpoints(t, newSnap, "tcp:geo-cache"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Envoy ACKs the new cluster and endpoints.
|
||||||
|
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 3)
|
||||||
|
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4)
|
||||||
|
|
||||||
|
// Listeners are sent after the cluster and endpoints are ACKed.
|
||||||
|
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||||
|
TypeUrl: xdscommon.ListenerType,
|
||||||
|
Nonce: hexString(5),
|
||||||
|
Resources: makeTestResources(t,
|
||||||
|
makeTestListener(t, newSnap, "tcp:public_listener"),
|
||||||
|
makeTestListener(t, newSnap, "tcp:db"),
|
||||||
|
makeTestListener(t, newSnap, "tcp:geo-cache"),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
// We are caught up, so there should be nothing queued to send.
|
||||||
|
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||||
|
|
||||||
|
// ACKs the listener
|
||||||
|
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 5)
|
||||||
|
})
|
||||||
|
|
||||||
|
envoy.Close()
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
require.NoError(t, err)
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
t.Fatalf("timed out waiting for handler to finish")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChangesImpactRoutes(t *testing.T) {
|
func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChangesImpactRoutes(t *testing.T) {
|
||||||
aclResolve := func(id string) (acl.Authorizer, error) {
|
aclResolve := func(id string) (acl.Authorizer, error) {
|
||||||
// Allow all
|
// Allow all
|
||||||
|
|
Loading…
Reference in New Issue