// Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: BUSL-1.1 package xds import ( "crypto/sha256" "encoding/hex" "errors" "fmt" "strconv" "sync" "sync/atomic" "time" "github.com/armon/go-metrics" envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/hashicorp/go-hclog" goversion "github.com/hashicorp/go-version" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" "github.com/hashicorp/consul/agent/envoyextensions" external "github.com/hashicorp/consul/agent/grpc-external" "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/xds/configfetcher" "github.com/hashicorp/consul/agent/xds/extensionruntime" "github.com/hashicorp/consul/agent/xdsv2" "github.com/hashicorp/consul/envoyextensions/extensioncommon" "github.com/hashicorp/consul/envoyextensions/xdscommon" proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot" proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker" "github.com/hashicorp/consul/logging" pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1" "github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/version" ) var errOverwhelmed = status.Error(codes.ResourceExhausted, "this server has too many xDS streams open, please try another") type deltaRecvResponse int const ( deltaRecvResponseNack deltaRecvResponse = iota deltaRecvResponseAck deltaRecvNewSubscription deltaRecvUnknownType ) // ADSDeltaStream is a shorter way of referring to this thing... type ADSDeltaStream = envoy_discovery_v3.AggregatedDiscoveryService_DeltaAggregatedResourcesServer // DeltaAggregatedResources implements envoy_discovery_v3.AggregatedDiscoveryServiceServer func (s *Server) DeltaAggregatedResources(stream ADSDeltaStream) error { defer s.activeStreams.Increment(stream.Context())() // a channel for receiving incoming requests reqCh := make(chan *envoy_discovery_v3.DeltaDiscoveryRequest) reqStop := int32(0) go func() { for { req, err := stream.Recv() if atomic.LoadInt32(&reqStop) != 0 { return } if err != nil { s.Logger.Error("Error receiving new DeltaDiscoveryRequest; closing request channel", "error", err) close(reqCh) return } reqCh <- req } }() err := s.processDelta(stream, reqCh) if err != nil { s.Logger.Error("Error handling ADS delta stream", "xdsVersion", "v3", "error", err) } // prevents writing to a closed channel if send failed on blocked recv atomic.StoreInt32(&reqStop, 1) return err } // getEnvoyConfiguration is a utility function that instantiates the proper // Envoy resource generator based on whether it was passed a ConfigSource or // ProxyState implementation of the ProxySnapshot interface and returns the // generated Envoy configuration. func getEnvoyConfiguration(proxySnapshot proxysnapshot.ProxySnapshot, logger hclog.Logger, cfgFetcher configfetcher.ConfigFetcher) (map[string][]proto.Message, error) { switch proxySnapshot.(type) { case *proxycfg.ConfigSnapshot: logger.Trace("ProxySnapshot update channel received a ProxySnapshot of type ConfigSnapshot", "proxySnapshot", proxySnapshot, ) generator := NewResourceGenerator( logger, cfgFetcher, true, ) c := proxySnapshot.(*proxycfg.ConfigSnapshot) logger.Trace("ConfigSnapshot", c) return generator.AllResourcesFromSnapshot(c) case *proxytracker.ProxyState: logger.Trace("ProxySnapshot update channel received a ProxySnapshot of type ProxyState", "proxySnapshot", proxySnapshot, ) generator := xdsv2.NewResourceGenerator( logger, ) c := proxySnapshot.(*proxytracker.ProxyState) logger.Trace("ProxyState", c) resources, err := generator.AllResourcesFromIR(c) if err != nil { logger.Error("error generating resources from proxy state template", "err", err) return nil, err } logger.Trace("generated resources from proxy state template", "resources", resources) return resources, nil default: return nil, errors.New("proxysnapshot must be of type ProxyState or ConfigSnapshot") } } const ( stateDeltaInit int = iota stateDeltaPendingInitialConfig stateDeltaRunning ) func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discovery_v3.DeltaDiscoveryRequest) error { // Handle invalid ACL tokens up-front. if _, err := s.authenticate(stream.Context()); err != nil { return err } // Loop state var ( proxySnapshot proxysnapshot.ProxySnapshot node *envoy_config_core_v3.Node stateCh <-chan proxysnapshot.ProxySnapshot drainCh limiter.SessionTerminatedChan watchCancel func() nonce uint64 // xDS requires a unique nonce to correlate response/request pairs ready bool // set to true after the first snapshot arrives streamStartTime = time.Now() streamStartOnce sync.Once ) var ( // resourceMap is the SoTW we are incrementally attempting to sync to envoy. // // type => name => proto resourceMap = xdscommon.EmptyIndexedResources() // currentVersions is the xDS versioning represented by Resources. // // type => name => version (as consul knows right now) currentVersions = make(map[string]map[string]string) ) logger := s.Logger.Named(logging.XDS).With("xdsVersion", "v3") // need to run a small state machine to get through initial authentication. var state = stateDeltaInit // Configure handlers for each type of request we currently care about. handlers := map[string]*xDSDeltaType{ xdscommon.ListenerType: newDeltaType(logger, stream, xdscommon.ListenerType, func() bool { return proxySnapshot.AllowEmptyListeners() }), xdscommon.RouteType: newDeltaType(logger, stream, xdscommon.RouteType, func() bool { return proxySnapshot.AllowEmptyRoutes() }), xdscommon.ClusterType: newDeltaType(logger, stream, xdscommon.ClusterType, func() bool { return proxySnapshot.AllowEmptyClusters() }), xdscommon.EndpointType: newDeltaType(logger, stream, xdscommon.EndpointType, nil), xdscommon.SecretType: newDeltaType(logger, stream, xdscommon.SecretType, nil), // TODO allowEmptyFn } // Endpoints are stored within a Cluster (and Routes // are stored within a Listener) so whenever the // enclosing resource is updated the inner resource // list is cleared implicitly. // // When this happens we should update our local // representation of envoy state to force an update. // // see: https://github.com/envoyproxy/envoy/issues/13009 handlers[xdscommon.ListenerType].deltaChild = &xDSDeltaChild{ childType: handlers[xdscommon.RouteType], childrenNames: make(map[string][]string), } handlers[xdscommon.ClusterType].deltaChild = &xDSDeltaChild{ childType: handlers[xdscommon.EndpointType], childrenNames: make(map[string][]string), } var authTimer <-chan time.Time extendAuthTimer := func() { authTimer = time.After(s.AuthCheckFrequency) } checkStreamACLs := func(proxySnap proxysnapshot.ProxySnapshot) error { return s.authorize(stream.Context(), proxySnap) } for { select { case <-drainCh: logger.Debug("draining stream to rebalance load") metrics.IncrCounter([]string{"xds", "server", "streamDrained"}, 1) return errOverwhelmed case <-authTimer: // It's been too long since a Discovery{Request,Response} so recheck ACLs. if err := checkStreamACLs(proxySnapshot); err != nil { return err } extendAuthTimer() case req, ok := <-reqCh: if !ok { // reqCh is closed when stream.Recv errors which is how we detect client // going away. AFAICT the stream.Context() is only canceled once the // RPC method returns which it can't until we return from this one so // there's no point in blocking on that. return nil } logTraceRequest(logger, "Incremental xDS v3", req) if req.TypeUrl == "" { return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") } var proxyFeatures xdscommon.SupportedProxyFeatures if node == nil && req.Node != nil { node = req.Node var err error proxyFeatures, err = xdscommon.DetermineSupportedProxyFeatures(req.Node) if err != nil { return status.Errorf(codes.InvalidArgument, err.Error()) } } if handler, ok := handlers[req.TypeUrl]; ok { switch handler.Recv(req, proxyFeatures) { case deltaRecvNewSubscription: logger.Trace("subscribing to type", "typeUrl", req.TypeUrl) case deltaRecvResponseNack: logger.Trace("got nack response for type", "typeUrl", req.TypeUrl) // There is no reason to believe that generating new xDS resources from the same snapshot // would lead to an ACK from Envoy. Instead we continue to the top of this for loop and wait // for a new request or snapshot. continue } } case cs, ok := <-stateCh: if !ok { // stateCh is closed either when *we* cancel the watch (on-exit via defer) // or by the proxycfg.Manager when an irrecoverable error is encountered // such as the ACL token getting deleted. // // We know for sure that this is the latter case, because in the former we // would've already exited this loop. return status.Error(codes.Aborted, "xDS stream terminated due to an irrecoverable error, please try again") } proxySnapshot = cs newRes, err := getEnvoyConfiguration(proxySnapshot, logger, s.CfgFetcher) if err != nil { return status.Errorf(codes.Unavailable, "failed to generate all xDS resources from the snapshot: %v", err) } // index and hash the xDS structures newResourceMap := xdscommon.IndexResources(logger, newRes) if s.ResourceMapMutateFn != nil { s.ResourceMapMutateFn(newResourceMap) } if newResourceMap, err = s.applyEnvoyExtensions(newResourceMap, proxySnapshot, node); err != nil { // err is already the result of calling status.Errorf return err } if err := populateChildIndexMap(newResourceMap); err != nil { return status.Errorf(codes.Unavailable, "failed to index xDS resource versions: %v", err) } newVersions, err := computeResourceVersions(newResourceMap) if err != nil { return status.Errorf(codes.Unavailable, "failed to compute xDS resource versions: %v", err) } resourceMap = newResourceMap currentVersions = newVersions ready = true } // Trigger state machine switch state { case stateDeltaInit: if node == nil { // This can't happen (tm) since stateCh is nil until after the first req // is received but lets not panic about it. continue } nodeName := node.GetMetadata().GetFields()["node_name"].GetStringValue() if nodeName == "" { nodeName = s.NodeName } // Start authentication process, we need the proxyID proxyID := newResourceIDFromEnvoyNode(node) // Start watching config for that proxy var err error options, err := external.QueryOptionsFromContext(stream.Context()) if err != nil { return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err) } stateCh, drainCh, watchCancel, err = s.ProxyWatcher.Watch(proxyID, nodeName, options.Token) switch { case errors.Is(err, limiter.ErrCapacityReached): return errOverwhelmed case err != nil: return status.Errorf(codes.Internal, "failed to watch proxy: %s", err) } // Note that in this case we _intend_ the defer to only be triggered when // this whole process method ends (i.e. when streaming RPC aborts) not at // the end of the current loop iteration. We have to do it in the loop // here since we can't start watching until we get to this state in the // state machine. defer watchCancel() logger = logger.With("service_id", proxyID.Name) // enhance future logs logger.Trace("watching proxy, pending initial proxycfg snapshot for xDS") // Now wait for the config so we can check ACL state = stateDeltaPendingInitialConfig case stateDeltaPendingInitialConfig: if proxySnapshot == nil { // Nothing we can do until we get the initial config continue } // Got config, try to authenticate next. state = stateDeltaRunning // Upgrade the logger loggerName := proxySnapshot.LoggerName() if loggerName != "" { logger = logger.Named(loggerName) } logger.Trace("Got initial config snapshot") // Let's actually process the config we just got, or we'll miss responding fallthrough case stateDeltaRunning: // Check ACLs on every Discovery{Request,Response}. if err := checkStreamACLs(proxySnapshot); err != nil { return err } // For the first time through the state machine, this is when the // timer is first started. extendAuthTimer() if !ready { logger.Trace("Skipping delta computation because we haven't gotten a snapshot yet") continue } logger.Trace("Invoking all xDS resource handlers and sending changed data if there are any") streamStartOnce.Do(func() { metrics.MeasureSince([]string{"xds", "server", "streamStart"}, streamStartTime) }) for _, op := range xDSUpdateOrder { if op.TypeUrl == xdscommon.ListenerType || op.TypeUrl == xdscommon.RouteType { if clusterHandler := handlers[xdscommon.ClusterType]; clusterHandler.registered && len(clusterHandler.pendingUpdates) > 0 { logger.Trace("Skipping delta computation for resource because there are dependent updates pending", "typeUrl", op.TypeUrl, "dependent", xdscommon.ClusterType) // Receiving an ACK from Envoy will unblock the select statement above, // and re-trigger an attempt to send these skipped updates. break } if endpointHandler := handlers[xdscommon.EndpointType]; endpointHandler.registered && len(endpointHandler.pendingUpdates) > 0 { logger.Trace("Skipping delta computation for resource because there are dependent updates pending", "typeUrl", op.TypeUrl, "dependent", xdscommon.EndpointType) // Receiving an ACK from Envoy will unblock the select statement above, // and re-trigger an attempt to send these skipped updates. break } } err, _ := handlers[op.TypeUrl].SendIfNew(currentVersions[op.TypeUrl], resourceMap, &nonce, op.Upsert, op.Remove) if err != nil { return status.Errorf(codes.Unavailable, "failed to send %sreply for type %q: %v", op.errorLogNameReplyPrefix(), op.TypeUrl, err) } } } } } // newResourceIDFromEnvoyNode is a utility function that allows creating a // Resource ID from an Envoy proxy node so that existing delta calls can easily // use ProxyWatcher interface arguments for Watch(). func newResourceIDFromEnvoyNode(node *envoy_config_core_v3.Node) *pbresource.ID { entMeta := parseEnterpriseMeta(node) return &pbresource.ID{ Name: node.Id, Tenancy: &pbresource.Tenancy{ Namespace: entMeta.NamespaceOrDefault(), Partition: entMeta.PartitionOrDefault(), }, Type: pbmesh.ProxyStateTemplateType, } } func (s *Server) applyEnvoyExtensions(resources *xdscommon.IndexedResources, proxySnapshot proxysnapshot.ProxySnapshot, node *envoy_config_core_v3.Node) (*xdscommon.IndexedResources, error) { // TODO(proxystate) // This is a workaround for now as envoy extensions are not yet supported with ProxyState. // For now, we cast to proxycfg.ConfigSnapshot and no-op if it's the pbmesh.ProxyState type. var snapshot *proxycfg.ConfigSnapshot switch proxySnapshot.(type) { //TODO(proxystate): implement envoy extensions for ProxyState case *proxytracker.ProxyState: return resources, nil case *proxycfg.ConfigSnapshot: snapshot = proxySnapshot.(*proxycfg.ConfigSnapshot) default: return nil, status.Errorf(codes.InvalidArgument, "unsupported config snapshot type to apply envoy extensions to %T", proxySnapshot) } var err error envoyVersion := xdscommon.DetermineEnvoyVersionFromNode(node) consulVersion, err := goversion.NewVersion(version.Version) if err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to parse Consul version") } serviceConfigs := extensionruntime.GetRuntimeConfigurations(snapshot) for _, cfgs := range serviceConfigs { for _, cfg := range cfgs { resources, err = validateAndApplyEnvoyExtension(s.Logger, snapshot, resources, cfg, envoyVersion, consulVersion) if err != nil { return nil, err } } } return resources, nil } func validateAndApplyEnvoyExtension(logger hclog.Logger, cfgSnap *proxycfg.ConfigSnapshot, resources *xdscommon.IndexedResources, runtimeConfig extensioncommon.RuntimeConfig, envoyVersion, consulVersion *goversion.Version) (*xdscommon.IndexedResources, error) { logFn := logger.Warn if runtimeConfig.EnvoyExtension.Required { logFn = logger.Error } svc := runtimeConfig.ServiceName errorParams := []interface{}{ "extension", runtimeConfig.EnvoyExtension.Name, "service", svc.Name, "namespace", svc.Namespace, "partition", svc.Partition, } getMetricLabels := func(err error) []metrics.Label { return []metrics.Label{ {Name: "extension", Value: runtimeConfig.EnvoyExtension.Name}, {Name: "version", Value: "builtin/" + version.Version}, {Name: "service", Value: cfgSnap.Service}, {Name: "partition", Value: cfgSnap.ProxyID.PartitionOrDefault()}, {Name: "namespace", Value: cfgSnap.ProxyID.NamespaceOrDefault()}, {Name: "error", Value: strconv.FormatBool(err != nil)}, } } ext := runtimeConfig.EnvoyExtension if v := ext.EnvoyVersion; v != "" { c, err := goversion.NewConstraint(v) if err != nil { logFn("failed to parse Envoy extension version constraint", errorParams...) if ext.Required { return nil, status.Errorf(codes.InvalidArgument, "failed to parse Envoy version constraint for extension %q for service %q", ext.Name, svc.Name) } return resources, nil } if !c.Check(envoyVersion) { logger.Info("skipping envoy extension due to Envoy version constraint violation", errorParams...) return resources, nil } } if v := ext.ConsulVersion; v != "" { c, err := goversion.NewConstraint(v) if err != nil { logFn("failed to parse Consul extension version constraint", errorParams...) if ext.Required { return nil, status.Errorf(codes.InvalidArgument, "failed to parse Consul version constraint for extension %q for service %q", ext.Name, svc.Name) } return resources, nil } if !c.Check(consulVersion) { logger.Info("skipping envoy extension due to Consul version constraint violation", errorParams...) return resources, nil } } now := time.Now() extender, err := envoyextensions.ConstructExtension(ext) metrics.MeasureSinceWithLabels([]string{"envoy_extension", "validate_arguments"}, now, getMetricLabels(err)) if err != nil { errorParams = append(errorParams, "error", err) logFn("failed to construct extension", errorParams...) if ext.Required { return nil, status.Errorf(codes.InvalidArgument, "failed to construct extension %q for service %q", ext.Name, svc.Name) } return resources, nil } now = time.Now() err = extender.Validate(&runtimeConfig) metrics.MeasureSinceWithLabels([]string{"envoy_extension", "validate"}, now, getMetricLabels(err)) if err != nil { errorParams = append(errorParams, "error", err) logFn("failed to validate extension arguments", errorParams...) if ext.Required { return nil, status.Errorf(codes.InvalidArgument, "failed to validate arguments for extension %q for service %q", ext.Name, svc.Name) } return resources, nil } now = time.Now() resources, err = applyEnvoyExtension(extender, resources, &runtimeConfig) metrics.MeasureSinceWithLabels([]string{"envoy_extension", "extend"}, now, getMetricLabels(err)) if err != nil { errorParams = append(errorParams, "error", err) logFn("failed to apply envoy extension", errorParams...) if ext.Required { return nil, status.Errorf(codes.InvalidArgument, "failed to patch xDS resources in the %q extension: %v", ext.Name, err) } } return resources, nil } // applyEnvoyExtension safely checks whether an extension can be applied, and if so attempts to apply it. // // applyEnvoyExtension makes a copy of the provided IndexedResources, then applies the given extension to them. // The copy ensures against partial application if a non-required extension modifies a resource then fails at a later // stage; this is necessary because IndexedResources and its proto messages are all passed by reference, and // non-required extensions do not lead to a terminal failure in xDS updates. // // If the application is successful, the modified copy is returned. If not, the original and an error is returned. // Returning resources in either case allows for applying extensions in a loop and reporting on non-required extension // failures simultaneously. func applyEnvoyExtension(extender extensioncommon.EnvoyExtender, resources *xdscommon.IndexedResources, runtimeConfig *extensioncommon.RuntimeConfig) (r *xdscommon.IndexedResources, e error) { // Don't panic due to an extension misbehaving. defer func() { if err := recover(); err != nil { r = resources e = fmt.Errorf("attempt to apply Envoy extension %q caused an unexpected panic: %v", runtimeConfig.EnvoyExtension.Name, err) } }() // First check whether the extension is eligible for application in the current environment. // Do this before copying indexed resources for the sake of efficiency. if !extender.CanApply(runtimeConfig) { return resources, nil } newResources, err := extender.Extend(xdscommon.Clone(resources), runtimeConfig) if err != nil { return resources, err } return newResources, nil } // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations var xDSUpdateOrder = []xDSUpdateOperation{ // 1. SDS updates (if any) can be pushed here with no harm. {TypeUrl: xdscommon.SecretType, Upsert: true}, // 2. CDS updates (if any) must always be pushed before the following types. {TypeUrl: xdscommon.ClusterType, Upsert: true}, // 3. EDS updates (if any) must arrive after CDS updates for the respective clusters. {TypeUrl: xdscommon.EndpointType, Upsert: true}, // 4. LDS updates must arrive after corresponding CDS/EDS updates. {TypeUrl: xdscommon.ListenerType, Upsert: true, Remove: true}, // 5. RDS updates related to the newly added listeners must arrive after CDS/EDS/LDS updates. {TypeUrl: xdscommon.RouteType, Upsert: true, Remove: true}, // 6. (NOT IMPLEMENTED YET IN CONSUL) VHDS updates (if any) related to the newly added RouteConfigurations must arrive after RDS updates. // {}, // 7. Stale CDS clusters, related EDS endpoints (ones no longer being referenced) and SDS secrets can then be removed. {TypeUrl: xdscommon.ClusterType, Remove: true}, {TypeUrl: xdscommon.EndpointType, Remove: true}, {TypeUrl: xdscommon.SecretType, Remove: true}, // xDS updates can be pushed independently if no new // clusters/routes/listeners are added or if it’s acceptable to // temporarily drop traffic during updates. Note that in case of // LDS updates, the listeners will be warmed before they receive // traffic, i.e. the dependent routes are fetched through RDS if // configured. Clusters are warmed when adding/removing/updating // clusters. On the other hand, routes are not warmed, i.e., the // management plane must ensure that clusters referenced by a route // are in place, before pushing the updates for a route. } type xDSUpdateOperation struct { TypeUrl string Upsert bool Remove bool } func (op *xDSUpdateOperation) errorLogNameReplyPrefix() string { switch { case op.Upsert && op.Remove: return "upsert/remove " case op.Upsert: return "upsert " case op.Remove: return "remove " default: return "" } } type xDSDeltaChild struct { // childType is a type that in Envoy is actually stored within this type. // Upserts of THIS type should potentially trigger dependent named // resources within the child to be re-configured. childType *xDSDeltaType // childrenNames is map of parent resource names to a list of associated child resource // names. childrenNames map[string][]string } type xDSDeltaType struct { logger hclog.Logger stream ADSDeltaStream typeURL string allowEmptyFn func() bool // deltaChild contains data for an xDS child type if there is one. // For example, endpoints are a child type of clusters. deltaChild *xDSDeltaChild // registered indicates if this type has been requested at least once by // the proxy registered bool // wildcard indicates that this type was requested with no preference for // specific resource names. subscribe/unsubscribe are ignored. wildcard bool // sentToEnvoyOnce is true after we've sent one response to envoy. sentToEnvoyOnce bool // subscriptions is the set of currently subscribed envoy resources. // If wildcard == true, this will be empty. subscriptions map[string]struct{} // resourceVersions is the current view of CONFIRMED/ACKed updates to // envoy's view of the loaded resources. // // name => version resourceVersions map[string]string // pendingUpdates is a set of un-ACKed updates to the 'resourceVersions' // map. Once we get an ACK from envoy we'll update the resourceVersions map // and strike the entry from this map. // // nonce -> name -> {version} pendingUpdates map[string]map[string]PendingUpdate } func (t *xDSDeltaType) subscribed(name string) bool { if t.wildcard { return true } _, subscribed := t.subscriptions[name] return subscribed } type PendingUpdate struct { Remove bool Version string } func newDeltaType( logger hclog.Logger, stream ADSDeltaStream, typeUrl string, allowEmptyFn func() bool, ) *xDSDeltaType { return &xDSDeltaType{ logger: logger, stream: stream, typeURL: typeUrl, allowEmptyFn: allowEmptyFn, subscriptions: make(map[string]struct{}), resourceVersions: make(map[string]string), pendingUpdates: make(map[string]map[string]PendingUpdate), } } // Recv handles new discovery requests from envoy. // // Returns true the first time a type receives a request. func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf xdscommon.SupportedProxyFeatures) deltaRecvResponse { if t == nil { return deltaRecvUnknownType // not something we care about } registeredThisTime := false if !t.registered { // We are in the wildcard mode if the first request of a particular // type has empty subscription list t.wildcard = len(req.ResourceNamesSubscribe) == 0 t.registered = true registeredThisTime = true } /* DeltaDiscoveryRequest can be sent in the following situations: Initial message in a xDS bidirectional gRPC stream. As an ACK or NACK response to a previous DeltaDiscoveryResponse. In this case the response_nonce is set to the nonce value in the Response. ACK or NACK is determined by the absence or presence of error_detail. Spontaneous DeltaDiscoveryRequests from the client. This can be done to dynamically add or remove elements from the tracked resource_names set. In this case response_nonce must be omitted. */ /* DeltaDiscoveryRequest plays two independent roles. Any DeltaDiscoveryRequest can be either or both of: */ if req.ResponseNonce != "" { /* [2] (N)ACKing an earlier resource update from the server (using response_nonce, with presence of error_detail making it a NACK). */ if req.ErrorDetail == nil { t.logger.Trace("got ok response from envoy proxy", "nonce", req.ResponseNonce) t.ack(req.ResponseNonce) } else { t.logger.Error("got error response from envoy proxy", "nonce", req.ResponseNonce, "error", status.ErrorProto(req.ErrorDetail)) t.nack(req.ResponseNonce) return deltaRecvResponseNack } } if registeredThisTime && len(req.InitialResourceVersions) > 0 { /* Additionally, the first message (for a given type_url) of a reconnected gRPC stream has a third role: [3] informing the server of the resources (and their versions) that the client already possesses, using the initial_resource_versions field. */ t.logger.Trace("setting initial resource versions for stream", "resources", req.InitialResourceVersions) t.resourceVersions = req.InitialResourceVersions if !t.wildcard { for k := range req.InitialResourceVersions { t.subscriptions[k] = struct{}{} } } } if !t.wildcard { /* [1] informing the server of what resources the client has gained/lost interest in (using resource_names_subscribe and resource_names_unsubscribe), or */ for _, name := range req.ResourceNamesSubscribe { // A resource_names_subscribe field may contain resource names that // the server believes the client is already subscribed to, and // furthermore has the most recent versions of. However, the server // must still provide those resources in the response; due to // implementation details hidden from the server, the client may // have “forgotten” those resources despite apparently remaining // subscribed. // // NOTE: the server must respond with all resources listed in // resource_names_subscribe, even if it believes the client has the // most recent version of them. The reason: the client may have // dropped them, but then regained interest before it had a chance // to send the unsubscribe message. // // We handle that here by ALWAYS wiping the version so the diff // decides to send the value. _, alreadySubscribed := t.subscriptions[name] t.subscriptions[name] = struct{}{} // Reset the tracked version so we force a reply. if _, alreadyTracked := t.resourceVersions[name]; alreadyTracked { t.resourceVersions[name] = "" } // Certain xDS types are children of other types, meaning that if Envoy subscribes to a parent. // We MUST assume that if Envoy ever had data for the children of this parent, then the child's // data is gone. if t.deltaChild != nil && t.deltaChild.childType.registered { for _, childName := range t.deltaChild.childrenNames[name] { t.ensureChildResend(name, childName) } } if alreadySubscribed { t.logger.Trace("re-subscribing resource for stream", "resource", name) } else { t.logger.Trace("subscribing resource for stream", "resource", name) } } for _, name := range req.ResourceNamesUnsubscribe { if _, ok := t.subscriptions[name]; !ok { continue } delete(t.subscriptions, name) t.logger.Trace("unsubscribing resource for stream", "resource", name) // NOTE: we'll let the normal differential comparison handle cleaning up resourceVersions } } if registeredThisTime { return deltaRecvNewSubscription } return deltaRecvResponseAck } func (t *xDSDeltaType) ack(nonce string) { pending, ok := t.pendingUpdates[nonce] if !ok { return } for name, obj := range pending { if obj.Remove { delete(t.resourceVersions, name) continue } t.resourceVersions[name] = obj.Version } t.sentToEnvoyOnce = true delete(t.pendingUpdates, nonce) } func (t *xDSDeltaType) nack(nonce string) { delete(t.pendingUpdates, nonce) } func (t *xDSDeltaType) SendIfNew( currentVersions map[string]string, // type => name => version (as consul knows right now) resourceMap *xdscommon.IndexedResources, nonce *uint64, upsert, remove bool, ) (error, bool) { if t == nil || !t.registered { return nil, false } // Wait for Envoy to catch up with this delta type before sending something new. if len(t.pendingUpdates) > 0 { return nil, false } logger := t.logger.With("typeUrl", t.typeURL) allowEmpty := t.allowEmptyFn != nil && t.allowEmptyFn() // Zero length resource responses should be ignored and are the result of no // data yet. Notice that this caused a bug originally where we had zero // healthy endpoints for an upstream that would cause Envoy to hang waiting // for the EDS response. This is fixed though by ensuring we send an explicit // empty LoadAssignment resource for the cluster rather than allowing junky // empty resources. if len(currentVersions) == 0 && !allowEmpty { // Nothing to send yet return nil, false } resp, updates, err := t.createDeltaResponse(currentVersions, resourceMap, upsert, remove) if err != nil { return err, false } if resp == nil { return nil, false } *nonce++ resp.Nonce = fmt.Sprintf("%08x", *nonce) logTraceResponse(t.logger, "Incremental xDS v3", resp) logger.Trace("sending response", "nonce", resp.Nonce) if err := t.stream.Send(resp); err != nil { return err, false } logger.Trace("sent response", "nonce", resp.Nonce) // Certain xDS types are children of other types, meaning that if an update is pushed for a parent, // we MUST send new data for all its children. Envoy will NOT re-subscribe to the child data upon // receiving updates for the parent, so we need to handle this ourselves. // // Note that we do not check whether the deltaChild.childType is registered here, since we send // parent types before child types, meaning that it's expected on first send of a parent that // there are no subscriptions for the child type. if t.deltaChild != nil { for name := range updates { if children, ok := resourceMap.ChildIndex[t.typeURL][name]; ok { // Capture the relevant child resource names on this pending update so // we can know the linked children if Envoy ever re-subscribes to the parent resource. t.deltaChild.childrenNames[name] = children for _, childName := range children { t.ensureChildResend(name, childName) } } } } t.pendingUpdates[resp.Nonce] = updates return nil, true } func (t *xDSDeltaType) createDeltaResponse( currentVersions map[string]string, // name => version (as consul knows right now) resourceMap *xdscommon.IndexedResources, upsert, remove bool, ) (*envoy_discovery_v3.DeltaDiscoveryResponse, map[string]PendingUpdate, error) { // compute difference var ( hasRelevantUpdates = false updates = make(map[string]PendingUpdate) ) if t.wildcard { // First find things that need updating or deleting for name, envoyVers := range t.resourceVersions { currVers, ok := currentVersions[name] if !ok { if remove { hasRelevantUpdates = true } updates[name] = PendingUpdate{Remove: true} } else if currVers != envoyVers { if upsert { hasRelevantUpdates = true } updates[name] = PendingUpdate{Version: currVers} } } // Now find new things for name, currVers := range currentVersions { if _, known := t.resourceVersions[name]; known { continue } if upsert { hasRelevantUpdates = true } updates[name] = PendingUpdate{Version: currVers} } } else { // First find things that need updating or deleting // Walk the list of things currently stored in envoy for name, envoyVers := range t.resourceVersions { if t.subscribed(name) { if currVers, ok := currentVersions[name]; ok { if currVers != envoyVers { if upsert { hasRelevantUpdates = true } updates[name] = PendingUpdate{Version: currVers} } } } } // Now find new things not in envoy yet for name := range t.subscriptions { if _, known := t.resourceVersions[name]; known { continue } if currVers, ok := currentVersions[name]; ok { updates[name] = PendingUpdate{Version: currVers} if upsert { hasRelevantUpdates = true } } } } if !hasRelevantUpdates && t.sentToEnvoyOnce { return nil, nil, nil } // now turn this into a disco response resp := &envoy_discovery_v3.DeltaDiscoveryResponse{ // TODO(rb): consider putting something in SystemVersionInfo? TypeUrl: t.typeURL, } realUpdates := make(map[string]PendingUpdate) for name, obj := range updates { if obj.Remove { if remove { resp.RemovedResources = append(resp.RemovedResources, name) realUpdates[name] = PendingUpdate{Remove: true} } } else if upsert { resources, ok := resourceMap.Index[t.typeURL] if !ok { return nil, nil, fmt.Errorf("unknown type url: %s", t.typeURL) } res, ok := resources[name] if !ok { return nil, nil, fmt.Errorf("unknown name for type url %q: %s", t.typeURL, name) } any, err := anypb.New(res) if err != nil { return nil, nil, err } resp.Resources = append(resp.Resources, &envoy_discovery_v3.Resource{ Name: name, Resource: any, Version: obj.Version, }) realUpdates[name] = obj } } return resp, realUpdates, nil } func (t *xDSDeltaType) ensureChildResend(parentName, childName string) { if _, exist := t.deltaChild.childType.resourceVersions[childName]; !exist { return } if !t.subscribed(childName) { return } t.logger.Trace( "triggering implicit update of resource", "typeUrl", t.typeURL, "resource", parentName, "childTypeUrl", t.deltaChild.childType.typeURL, "childResource", childName, ) // resourceVersions tracks the last known version for this childName that Envoy // has ACKed. By setting this to empty it effectively tells us that Envoy does // not have any data for that child, and we need to re-send. t.deltaChild.childType.resourceVersions[childName] = "" } func computeResourceVersions(resourceMap *xdscommon.IndexedResources) (map[string]map[string]string, error) { out := make(map[string]map[string]string) for typeUrl, resources := range resourceMap.Index { m, err := hashResourceMap(resources) if err != nil { return nil, fmt.Errorf("failed to hash resources for %q: %v", typeUrl, err) } out[typeUrl] = m } return out, nil } func populateChildIndexMap(resourceMap *xdscommon.IndexedResources) error { // LDS and RDS have a more complicated relationship. for name, res := range resourceMap.Index[xdscommon.ListenerType] { listener := res.(*envoy_listener_v3.Listener) rdsRouteNames, err := extractRdsResourceNames(listener) if err != nil { return err } resourceMap.ChildIndex[xdscommon.ListenerType][name] = rdsRouteNames } // CDS and EDS share exact names. for name := range resourceMap.Index[xdscommon.ClusterType] { resourceMap.ChildIndex[xdscommon.ClusterType][name] = []string{name} } return nil } func hashResourceMap(resources map[string]proto.Message) (map[string]string, error) { m := make(map[string]string) for name, res := range resources { h, err := hashResource(res) if err != nil { return nil, fmt.Errorf("failed to hash resource %q: %v", name, err) } m[name] = h } return m, nil } // hashResource will take a resource and create a SHA256 hash sum out of the marshaled bytes func hashResource(res proto.Message) (string, error) { h := sha256.New() marshaller := proto.MarshalOptions{Deterministic: true} data, err := marshaller.Marshal(res) if err != nil { return "", err } h.Write(data) return hex.EncodeToString(h.Sum(nil)), nil }