mirror of https://github.com/status-im/consul.git
xds: fix representation of incremental xDS subscriptions (#10987)
Fixes #10563 The `resourceVersion` map was doing two jobs prior to this PR. The first job was to track what version of every resource we know envoy currently has. The second was to track subscriptions to those resources (by way of the empty string for a version). This mostly works out fine, but occasionally leads to consul removing a resource and accidentally (effectively) unsubscribing at the same time. The fix separates these two jobs. When all of the resources for a subscription are removed we continue to track the subscription until envoy explicitly unsubscribes
This commit is contained in:
parent
377c3a9b0b
commit
b2d17ac448
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
xds: fixed a bug where Envoy sidecars could enter a state where they failed to receive xds updates from Consul
|
||||
```
|
|
@ -189,6 +189,10 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
|
|||
// index and hash the xDS structures
|
||||
newResourceMap := indexResources(generator.Logger, newRes)
|
||||
|
||||
if s.ResourceMapMutateFn != nil {
|
||||
s.ResourceMapMutateFn(newResourceMap)
|
||||
}
|
||||
|
||||
if err := populateChildIndexMap(newResourceMap); err != nil {
|
||||
return status.Errorf(codes.Unavailable, "failed to index xDS resource versions: %v", err)
|
||||
}
|
||||
|
@ -384,6 +388,10 @@ type xDSDeltaType struct {
|
|||
// sentToEnvoyOnce is true after we've sent one response to envoy.
|
||||
sentToEnvoyOnce bool
|
||||
|
||||
// subscriptions is the set of currently subscribed envoy resources.
|
||||
// If wildcard == true, this will be empty.
|
||||
subscriptions map[string]struct{}
|
||||
|
||||
// resourceVersions is the current view of CONFIRMED/ACKed updates to
|
||||
// envoy's view of the loaded resources.
|
||||
//
|
||||
|
@ -398,7 +406,16 @@ type xDSDeltaType struct {
|
|||
pendingUpdates map[string]map[string]PendingUpdate
|
||||
}
|
||||
|
||||
func (t *xDSDeltaType) subscribed(name string) bool {
|
||||
if t.wildcard {
|
||||
return true
|
||||
}
|
||||
_, subscribed := t.subscriptions[name]
|
||||
return subscribed
|
||||
}
|
||||
|
||||
type PendingUpdate struct {
|
||||
Remove bool
|
||||
Version string
|
||||
ChildResources []string // optional
|
||||
}
|
||||
|
@ -414,6 +431,7 @@ func newDeltaType(
|
|||
stream: stream,
|
||||
typeURL: typeUrl,
|
||||
allowEmptyFn: allowEmptyFn,
|
||||
subscriptions: make(map[string]struct{}),
|
||||
resourceVersions: make(map[string]string),
|
||||
pendingUpdates: make(map[string]map[string]PendingUpdate),
|
||||
}
|
||||
|
@ -484,6 +502,11 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool
|
|||
logger.Trace("setting initial resource versions for stream",
|
||||
"resources", req.InitialResourceVersions)
|
||||
t.resourceVersions = req.InitialResourceVersions
|
||||
if !t.wildcard {
|
||||
for k := range req.InitialResourceVersions {
|
||||
t.subscriptions[k] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !t.wildcard {
|
||||
|
@ -509,8 +532,13 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool
|
|||
//
|
||||
// We handle that here by ALWAYS wiping the version so the diff
|
||||
// decides to send the value.
|
||||
_, alreadySubscribed := t.resourceVersions[name]
|
||||
t.resourceVersions[name] = "" // start with no version
|
||||
_, alreadySubscribed := t.subscriptions[name]
|
||||
t.subscriptions[name] = struct{}{}
|
||||
|
||||
// Reset the tracked version so we force a reply.
|
||||
if _, alreadyTracked := t.resourceVersions[name]; alreadyTracked {
|
||||
t.resourceVersions[name] = ""
|
||||
}
|
||||
|
||||
if alreadySubscribed {
|
||||
logger.Trace("re-subscribing resource for stream", "resource", name)
|
||||
|
@ -520,11 +548,12 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool
|
|||
}
|
||||
|
||||
for _, name := range req.ResourceNamesUnsubscribe {
|
||||
if _, ok := t.resourceVersions[name]; !ok {
|
||||
if _, ok := t.subscriptions[name]; !ok {
|
||||
continue
|
||||
}
|
||||
delete(t.resourceVersions, name)
|
||||
delete(t.subscriptions, name)
|
||||
logger.Trace("unsubscribing resource for stream", "resource", name)
|
||||
// NOTE: we'll let the normal differential comparison handle cleaning up resourceVersions
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -538,26 +567,31 @@ func (t *xDSDeltaType) ack(nonce string) {
|
|||
}
|
||||
|
||||
for name, obj := range pending {
|
||||
if obj.Version == "" {
|
||||
if obj.Remove {
|
||||
delete(t.resourceVersions, name)
|
||||
} else {
|
||||
t.resourceVersions[name] = obj.Version
|
||||
continue
|
||||
}
|
||||
if t.childType != nil && obj.Version != "" {
|
||||
|
||||
t.resourceVersions[name] = obj.Version
|
||||
if t.childType != nil {
|
||||
// This branch only matters on UPDATE, since we already have
|
||||
// mechanisms to clean up orphaned resources.
|
||||
for _, childName := range obj.ChildResources {
|
||||
if _, exist := t.childType.resourceVersions[childName]; exist {
|
||||
t.generator.Logger.Trace(
|
||||
"triggering implicit update of resource",
|
||||
"typeUrl", t.typeURL,
|
||||
"resource", name,
|
||||
"childTypeUrl", t.childType.typeURL,
|
||||
"childResource", childName,
|
||||
)
|
||||
// Basically manifest this as a re-subscribe
|
||||
t.childType.resourceVersions[childName] = ""
|
||||
if _, exist := t.childType.resourceVersions[childName]; !exist {
|
||||
continue
|
||||
}
|
||||
if !t.subscribed(childName) {
|
||||
continue
|
||||
}
|
||||
t.generator.Logger.Trace(
|
||||
"triggering implicit update of resource",
|
||||
"typeUrl", t.typeURL,
|
||||
"resource", name,
|
||||
"childTypeUrl", t.childType.typeURL,
|
||||
"childResource", childName,
|
||||
)
|
||||
// Basically manifest this as a re-subscribe/re-sync
|
||||
t.childType.resourceVersions[childName] = ""
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -640,26 +674,57 @@ func (t *xDSDeltaType) createDeltaResponse(
|
|||
hasRelevantUpdates = false
|
||||
updates = make(map[string]PendingUpdate)
|
||||
)
|
||||
// First find things that need updating or deleting
|
||||
for name, envoyVers := range t.resourceVersions {
|
||||
currVers, ok := currentVersions[name]
|
||||
if !ok {
|
||||
if remove {
|
||||
hasRelevantUpdates = true
|
||||
|
||||
if t.wildcard {
|
||||
// First find things that need updating or deleting
|
||||
for name, envoyVers := range t.resourceVersions {
|
||||
currVers, ok := currentVersions[name]
|
||||
if !ok {
|
||||
if remove {
|
||||
hasRelevantUpdates = true
|
||||
}
|
||||
updates[name] = PendingUpdate{Remove: true}
|
||||
} else if currVers != envoyVers {
|
||||
if upsert {
|
||||
hasRelevantUpdates = true
|
||||
}
|
||||
updates[name] = PendingUpdate{Version: currVers}
|
||||
}
|
||||
}
|
||||
|
||||
// Now find new things
|
||||
for name, currVers := range currentVersions {
|
||||
if _, known := t.resourceVersions[name]; known {
|
||||
continue
|
||||
}
|
||||
updates[name] = PendingUpdate{Version: ""}
|
||||
} else if currVers != envoyVers {
|
||||
if upsert {
|
||||
hasRelevantUpdates = true
|
||||
}
|
||||
updates[name] = PendingUpdate{Version: currVers}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// First find things that need updating or deleting
|
||||
|
||||
// Now find new things
|
||||
if t.wildcard {
|
||||
for name, currVers := range currentVersions {
|
||||
if _, ok := t.resourceVersions[name]; !ok {
|
||||
// Walk the list of things currently stored in envoy
|
||||
for name, envoyVers := range t.resourceVersions {
|
||||
if t.subscribed(name) {
|
||||
if currVers, ok := currentVersions[name]; ok {
|
||||
if currVers != envoyVers {
|
||||
if upsert {
|
||||
hasRelevantUpdates = true
|
||||
}
|
||||
updates[name] = PendingUpdate{Version: currVers}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Now find new things not in envoy yet
|
||||
for name := range t.subscriptions {
|
||||
if _, known := t.resourceVersions[name]; known {
|
||||
continue
|
||||
}
|
||||
if currVers, ok := currentVersions[name]; ok {
|
||||
updates[name] = PendingUpdate{Version: currVers}
|
||||
if upsert {
|
||||
hasRelevantUpdates = true
|
||||
|
@ -679,10 +744,10 @@ func (t *xDSDeltaType) createDeltaResponse(
|
|||
}
|
||||
realUpdates := make(map[string]PendingUpdate)
|
||||
for name, obj := range updates {
|
||||
if obj.Version == "" {
|
||||
if obj.Remove {
|
||||
if remove {
|
||||
resp.RemovedResources = append(resp.RemovedResources, name)
|
||||
realUpdates[name] = PendingUpdate{Version: ""}
|
||||
realUpdates[name] = PendingUpdate{Remove: true}
|
||||
}
|
||||
} else if upsert {
|
||||
resources, ok := resourceMap.Index[t.typeURL]
|
||||
|
|
|
@ -2,6 +2,7 @@ package xds
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -100,6 +101,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
|||
Resources: makeTestResources(t,
|
||||
makeTestEndpoints(t, snap, "tcp:db"),
|
||||
// SAME_AS_INITIAL_VERSION: makeTestEndpoints(t, snap, "tcp:geo-cache"),
|
||||
// SAME_AS_INITIAL_VERSION: "fake-endpoints",
|
||||
),
|
||||
})
|
||||
|
||||
|
@ -123,24 +125,12 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
|||
),
|
||||
})
|
||||
|
||||
// cleanup unused resources now that we've created/updated relevant things
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: EndpointType,
|
||||
Nonce: hexString(4),
|
||||
RemovedResources: []string{
|
||||
"fake-endpoints", // correcting the errant subscription
|
||||
},
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// ACKs the listener
|
||||
envoy.SendDeltaReqACK(t, ListenerType, 3)
|
||||
|
||||
// ACK the endpoint removal
|
||||
envoy.SendDeltaReqACK(t, EndpointType, 4)
|
||||
|
||||
// If we re-subscribe to something even if there are no changes we get a
|
||||
// fresh copy.
|
||||
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
|
||||
|
@ -151,13 +141,13 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
|||
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: EndpointType,
|
||||
Nonce: hexString(5),
|
||||
Nonce: hexString(4),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestEndpoints(t, snap, "tcp:geo-cache"),
|
||||
),
|
||||
})
|
||||
|
||||
envoy.SendDeltaReqACK(t, EndpointType, 5)
|
||||
envoy.SendDeltaReqACK(t, EndpointType, 4)
|
||||
|
||||
// And no other response yet
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
@ -202,13 +192,13 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
|||
})
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: EndpointType,
|
||||
Nonce: hexString(6),
|
||||
Nonce: hexString(5),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestEndpoints(t, snap, "tcp:db"),
|
||||
),
|
||||
})
|
||||
|
||||
envoy.SendDeltaReqACK(t, EndpointType, 6)
|
||||
envoy.SendDeltaReqACK(t, EndpointType, 5)
|
||||
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
})
|
||||
|
@ -220,6 +210,17 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
|||
mgr.DeliverConfig(t, sid, snap)
|
||||
|
||||
// Send envoy an EDS update.
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: EndpointType,
|
||||
Nonce: hexString(6),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestEndpoints(t, snap, "tcp:db[0]"),
|
||||
),
|
||||
})
|
||||
|
||||
envoy.SendDeltaReqNACK(t, EndpointType, 6, &rpcstatus.Status{})
|
||||
|
||||
// Send it again.
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: EndpointType,
|
||||
Nonce: hexString(7),
|
||||
|
@ -228,18 +229,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
|||
),
|
||||
})
|
||||
|
||||
envoy.SendDeltaReqNACK(t, EndpointType, 7, &rpcstatus.Status{})
|
||||
|
||||
// Send it again.
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: EndpointType,
|
||||
Nonce: hexString(8),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestEndpoints(t, snap, "tcp:db[0]"),
|
||||
),
|
||||
})
|
||||
|
||||
envoy.SendDeltaReqACK(t, EndpointType, 8)
|
||||
envoy.SendDeltaReqACK(t, EndpointType, 7)
|
||||
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
})
|
||||
|
@ -421,6 +411,149 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) {
|
||||
// This illustrates a scenario related to https://github.com/hashicorp/consul/issues/10563
|
||||
|
||||
aclResolve := func(id string) (acl.Authorizer, error) {
|
||||
// Allow all
|
||||
return acl.RootAuthorizer("manage"), nil
|
||||
}
|
||||
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
|
||||
server, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy
|
||||
|
||||
// This mutateFn causes any endpoint with a name containing "geo-cache" to be
|
||||
// omitted from the response while the hack is active.
|
||||
var slowHackDisabled uint32
|
||||
server.ResourceMapMutateFn = func(resourceMap *IndexedResources) {
|
||||
if atomic.LoadUint32(&slowHackDisabled) == 1 {
|
||||
return
|
||||
}
|
||||
if em, ok := resourceMap.Index[EndpointType]; ok {
|
||||
for k := range em {
|
||||
if strings.Contains(k, "geo-cache") {
|
||||
delete(em, k)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sid := structs.NewServiceID("web-sidecar-proxy", nil)
|
||||
|
||||
// Register the proxy to create state needed to Watch() on
|
||||
mgr.RegisterProxy(t, sid)
|
||||
|
||||
var snap *proxycfg.ConfigSnapshot
|
||||
runStep(t, "get into initial state", func(t *testing.T) {
|
||||
snap = newTestSnapshot(t, nil, "")
|
||||
|
||||
// Send initial cluster discover.
|
||||
envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})
|
||||
|
||||
// Check no response sent yet
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
requireProtocolVersionGauge(t, scenario, "v3", 1)
|
||||
|
||||
// Deliver a new snapshot (tcp with one tcp upstream)
|
||||
mgr.DeliverConfig(t, sid, snap)
|
||||
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: ClusterType,
|
||||
Nonce: hexString(1),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestCluster(t, snap, "tcp:local_app"),
|
||||
makeTestCluster(t, snap, "tcp:db"),
|
||||
makeTestCluster(t, snap, "tcp:geo-cache"),
|
||||
),
|
||||
})
|
||||
|
||||
// Envoy then tries to discover endpoints for those clusters.
|
||||
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
|
||||
ResourceNamesSubscribe: []string{
|
||||
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
|
||||
},
|
||||
})
|
||||
|
||||
// It also (in parallel) issues the cluster ACK
|
||||
envoy.SendDeltaReqACK(t, ClusterType, 1)
|
||||
|
||||
// We should get a response immediately since the config is already present in
|
||||
// the server for endpoints. Note that this should not be racy if the server
|
||||
// is behaving well since the Cluster send above should be blocked until we
|
||||
// deliver a new config version.
|
||||
//
|
||||
// NOTE: we do NOT return back geo-cache yet
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: EndpointType,
|
||||
Nonce: hexString(2),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestEndpoints(t, snap, "tcp:db"),
|
||||
// makeTestEndpoints(t, snap, "tcp:geo-cache"),
|
||||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// Envoy now sends listener request
|
||||
envoy.SendDeltaReq(t, ListenerType, nil)
|
||||
|
||||
// It also (in parallel) issues the endpoint ACK
|
||||
envoy.SendDeltaReqACK(t, EndpointType, 2)
|
||||
|
||||
// And should get a response immediately.
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: ListenerType,
|
||||
Nonce: hexString(3),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestListener(t, snap, "tcp:public_listener"),
|
||||
makeTestListener(t, snap, "tcp:db"),
|
||||
makeTestListener(t, snap, "tcp:geo-cache"),
|
||||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// ACKs the listener
|
||||
envoy.SendDeltaReqACK(t, ListenerType, 3)
|
||||
})
|
||||
|
||||
// Disable hack. Need to wait for one more event to wake up the loop.
|
||||
atomic.StoreUint32(&slowHackDisabled, 1)
|
||||
|
||||
runStep(t, "delayed endpoint update finally comes in", func(t *testing.T) {
|
||||
// Trigger the xds.Server select{} to wake up and notice our hack is disabled.
|
||||
// The actual contents of this change are irrelevant.
|
||||
snap = newTestSnapshot(t, snap, "")
|
||||
mgr.DeliverConfig(t, sid, snap)
|
||||
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: EndpointType,
|
||||
Nonce: hexString(4),
|
||||
Resources: makeTestResources(t,
|
||||
makeTestEndpoints(t, snap, "tcp:geo-cache"),
|
||||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// It also (in parallel) issues the endpoint ACK
|
||||
envoy.SendDeltaReqACK(t, EndpointType, 4)
|
||||
|
||||
})
|
||||
|
||||
envoy.Close()
|
||||
select {
|
||||
case err := <-errCh:
|
||||
require.NoError(t, err)
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
t.Fatalf("timed out waiting for handler to finish")
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints(t *testing.T) {
|
||||
aclResolve := func(id string) (acl.Authorizer, error) {
|
||||
// Allow all
|
||||
|
|
|
@ -152,6 +152,9 @@ type Server struct {
|
|||
|
||||
DisableV2Protocol bool
|
||||
|
||||
// ResourceMapMutateFn exclusively exists for testing purposes.
|
||||
ResourceMapMutateFn func(resourceMap *IndexedResources)
|
||||
|
||||
activeStreams *activeStreamCounters
|
||||
}
|
||||
|
||||
|
|
|
@ -177,7 +177,9 @@ func newTestServerScenarioInner(
|
|||
nil, /*checkFetcher HTTPCheckFetcher*/
|
||||
nil, /*cfgFetcher ConfigFetcher*/
|
||||
)
|
||||
s.AuthCheckFrequency = authCheckFrequency
|
||||
if authCheckFrequency > 0 {
|
||||
s.AuthCheckFrequency = authCheckFrequency
|
||||
}
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
|
|
Loading…
Reference in New Issue