mirror of
https://github.com/status-im/consul.git
synced 2025-01-26 05:29:55 +00:00
5fb9df1640
* Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
1119 lines
36 KiB
Go
1119 lines
36 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
||
// SPDX-License-Identifier: BUSL-1.1
|
||
|
||
package xds
|
||
|
||
import (
|
||
"crypto/sha256"
|
||
"encoding/hex"
|
||
"errors"
|
||
"fmt"
|
||
"strconv"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"github.com/armon/go-metrics"
|
||
envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
|
||
envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
|
||
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
|
||
"github.com/hashicorp/go-hclog"
|
||
goversion "github.com/hashicorp/go-version"
|
||
|
||
"google.golang.org/grpc/codes"
|
||
"google.golang.org/grpc/status"
|
||
"google.golang.org/protobuf/proto"
|
||
"google.golang.org/protobuf/types/known/anypb"
|
||
|
||
"github.com/hashicorp/consul/agent/envoyextensions"
|
||
external "github.com/hashicorp/consul/agent/grpc-external"
|
||
"github.com/hashicorp/consul/agent/grpc-external/limiter"
|
||
"github.com/hashicorp/consul/agent/proxycfg"
|
||
"github.com/hashicorp/consul/agent/structs"
|
||
"github.com/hashicorp/consul/agent/xds/extensionruntime"
|
||
"github.com/hashicorp/consul/envoyextensions/extensioncommon"
|
||
"github.com/hashicorp/consul/envoyextensions/xdscommon"
|
||
"github.com/hashicorp/consul/logging"
|
||
"github.com/hashicorp/consul/version"
|
||
)
|
||
|
||
// errOverwhelmed is the gRPC ResourceExhausted status returned to a proxy when
// its xDS stream is refused because the session limiter reports that capacity
// has been reached, or when an established stream is drained to rebalance load.
var errOverwhelmed = status.Error(codes.ResourceExhausted, "this server has too many xDS streams open, please try another")
|
||
|
||
// deltaRecvResponse classifies the outcome of handing one
// DeltaDiscoveryRequest to an xDSDeltaType via Recv.
type deltaRecvResponse int

const (
	// deltaRecvResponseNack: the request carried a response nonce together
	// with an error detail, i.e. the proxy rejected an earlier update.
	deltaRecvResponseNack deltaRecvResponse = iota

	// deltaRecvResponseAck: the request was handled and it was not the first
	// request seen for its type.
	deltaRecvResponseAck

	// deltaRecvNewSubscription: this was the first request seen for its type.
	deltaRecvNewSubscription

	// deltaRecvUnknownType: there is no handler for the request's type.
	deltaRecvUnknownType
)
|
||
|
||
// ADSDeltaStream is a shorter way of referring to this thing...
|
||
type ADSDeltaStream = envoy_discovery_v3.AggregatedDiscoveryService_DeltaAggregatedResourcesServer
|
||
|
||
// DeltaAggregatedResources implements envoy_discovery_v3.AggregatedDiscoveryServiceServer
|
||
func (s *Server) DeltaAggregatedResources(stream ADSDeltaStream) error {
|
||
defer s.activeStreams.Increment(stream.Context())()
|
||
|
||
// a channel for receiving incoming requests
|
||
reqCh := make(chan *envoy_discovery_v3.DeltaDiscoveryRequest)
|
||
reqStop := int32(0)
|
||
go func() {
|
||
for {
|
||
req, err := stream.Recv()
|
||
if atomic.LoadInt32(&reqStop) != 0 {
|
||
return
|
||
}
|
||
if err != nil {
|
||
s.Logger.Error("Error receiving new DeltaDiscoveryRequest; closing request channel", "error", err)
|
||
close(reqCh)
|
||
return
|
||
}
|
||
reqCh <- req
|
||
}
|
||
}()
|
||
|
||
err := s.processDelta(stream, reqCh)
|
||
if err != nil {
|
||
s.Logger.Error("Error handling ADS delta stream", "xdsVersion", "v3", "error", err)
|
||
}
|
||
|
||
// prevents writing to a closed channel if send failed on blocked recv
|
||
atomic.StoreInt32(&reqStop, 1)
|
||
|
||
return err
|
||
}
|
||
|
||
// States of the small state machine that processDelta runs for each stream.
const (
	// stateDeltaInit: the proxy config watch has not been started yet.
	stateDeltaInit int = iota

	// stateDeltaPendingInitialConfig: the watch is running but no config
	// snapshot has been received so far.
	stateDeltaPendingInitialConfig

	// stateDeltaRunning: a snapshot has arrived and updates are being sent.
	stateDeltaRunning
)
|
||
|
||
func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discovery_v3.DeltaDiscoveryRequest) error {
|
||
// Handle invalid ACL tokens up-front.
|
||
if _, err := s.authenticate(stream.Context()); err != nil {
|
||
return err
|
||
}
|
||
|
||
// Loop state
|
||
var (
|
||
cfgSnap *proxycfg.ConfigSnapshot
|
||
node *envoy_config_core_v3.Node
|
||
stateCh <-chan *proxycfg.ConfigSnapshot
|
||
drainCh limiter.SessionTerminatedChan
|
||
watchCancel func()
|
||
proxyID structs.ServiceID
|
||
nonce uint64 // xDS requires a unique nonce to correlate response/request pairs
|
||
ready bool // set to true after the first snapshot arrives
|
||
|
||
streamStartTime = time.Now()
|
||
streamStartOnce sync.Once
|
||
)
|
||
|
||
var (
|
||
// resourceMap is the SoTW we are incrementally attempting to sync to envoy.
|
||
//
|
||
// type => name => proto
|
||
resourceMap = xdscommon.EmptyIndexedResources()
|
||
|
||
// currentVersions is the xDS versioning represented by Resources.
|
||
//
|
||
// type => name => version (as consul knows right now)
|
||
currentVersions = make(map[string]map[string]string)
|
||
)
|
||
|
||
generator := NewResourceGenerator(
|
||
s.Logger.Named(logging.XDS).With("xdsVersion", "v3"),
|
||
s.CfgFetcher,
|
||
true,
|
||
)
|
||
|
||
// need to run a small state machine to get through initial authentication.
|
||
var state = stateDeltaInit
|
||
|
||
// Configure handlers for each type of request we currently care about.
|
||
handlers := map[string]*xDSDeltaType{
|
||
xdscommon.ListenerType: newDeltaType(generator, stream, xdscommon.ListenerType, func(kind structs.ServiceKind) bool {
|
||
// Ingress and API gateways are allowed to inform LDS of no listeners.
|
||
return cfgSnap.Kind == structs.ServiceKindIngressGateway ||
|
||
cfgSnap.Kind == structs.ServiceKindAPIGateway
|
||
}),
|
||
xdscommon.RouteType: newDeltaType(generator, stream, xdscommon.RouteType, func(kind structs.ServiceKind) bool {
|
||
// Ingress and API gateways are allowed to inform RDS of no routes.
|
||
return cfgSnap.Kind == structs.ServiceKindIngressGateway ||
|
||
cfgSnap.Kind == structs.ServiceKindAPIGateway
|
||
}),
|
||
xdscommon.ClusterType: newDeltaType(generator, stream, xdscommon.ClusterType, func(kind structs.ServiceKind) bool {
|
||
// Mesh, Ingress, API and Terminating gateways are allowed to inform CDS of no clusters.
|
||
return cfgSnap.Kind == structs.ServiceKindMeshGateway ||
|
||
cfgSnap.Kind == structs.ServiceKindTerminatingGateway ||
|
||
cfgSnap.Kind == structs.ServiceKindIngressGateway ||
|
||
cfgSnap.Kind == structs.ServiceKindAPIGateway
|
||
}),
|
||
xdscommon.EndpointType: newDeltaType(generator, stream, xdscommon.EndpointType, nil),
|
||
xdscommon.SecretType: newDeltaType(generator, stream, xdscommon.SecretType, nil), // TODO allowEmptyFn
|
||
}
|
||
|
||
// Endpoints are stored within a Cluster (and Routes
|
||
// are stored within a Listener) so whenever the
|
||
// enclosing resource is updated the inner resource
|
||
// list is cleared implicitly.
|
||
//
|
||
// When this happens we should update our local
|
||
// representation of envoy state to force an update.
|
||
//
|
||
// see: https://github.com/envoyproxy/envoy/issues/13009
|
||
handlers[xdscommon.ListenerType].deltaChild = &xDSDeltaChild{
|
||
childType: handlers[xdscommon.RouteType],
|
||
childrenNames: make(map[string][]string),
|
||
}
|
||
handlers[xdscommon.ClusterType].deltaChild = &xDSDeltaChild{
|
||
childType: handlers[xdscommon.EndpointType],
|
||
childrenNames: make(map[string][]string),
|
||
}
|
||
|
||
var authTimer <-chan time.Time
|
||
extendAuthTimer := func() {
|
||
authTimer = time.After(s.AuthCheckFrequency)
|
||
}
|
||
|
||
checkStreamACLs := func(cfgSnap *proxycfg.ConfigSnapshot) error {
|
||
return s.authorize(stream.Context(), cfgSnap)
|
||
}
|
||
|
||
for {
|
||
select {
|
||
case <-drainCh:
|
||
generator.Logger.Debug("draining stream to rebalance load")
|
||
metrics.IncrCounter([]string{"xds", "server", "streamDrained"}, 1)
|
||
return errOverwhelmed
|
||
case <-authTimer:
|
||
// It's been too long since a Discovery{Request,Response} so recheck ACLs.
|
||
if err := checkStreamACLs(cfgSnap); err != nil {
|
||
return err
|
||
}
|
||
extendAuthTimer()
|
||
|
||
case req, ok := <-reqCh:
|
||
if !ok {
|
||
// reqCh is closed when stream.Recv errors which is how we detect client
|
||
// going away. AFAICT the stream.Context() is only canceled once the
|
||
// RPC method returns which it can't until we return from this one so
|
||
// there's no point in blocking on that.
|
||
return nil
|
||
}
|
||
|
||
generator.logTraceRequest("Incremental xDS v3", req)
|
||
|
||
if req.TypeUrl == "" {
|
||
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
|
||
}
|
||
|
||
if node == nil && req.Node != nil {
|
||
node = req.Node
|
||
var err error
|
||
generator.ProxyFeatures, err = xdscommon.DetermineSupportedProxyFeatures(req.Node)
|
||
if err != nil {
|
||
return status.Errorf(codes.InvalidArgument, err.Error())
|
||
}
|
||
}
|
||
|
||
if handler, ok := handlers[req.TypeUrl]; ok {
|
||
switch handler.Recv(req, generator.ProxyFeatures) {
|
||
case deltaRecvNewSubscription:
|
||
generator.Logger.Trace("subscribing to type", "typeUrl", req.TypeUrl)
|
||
|
||
case deltaRecvResponseNack:
|
||
generator.Logger.Trace("got nack response for type", "typeUrl", req.TypeUrl)
|
||
|
||
// There is no reason to believe that generating new xDS resources from the same snapshot
|
||
// would lead to an ACK from Envoy. Instead we continue to the top of this for loop and wait
|
||
// for a new request or snapshot.
|
||
continue
|
||
}
|
||
}
|
||
|
||
case cs, ok := <-stateCh:
|
||
if !ok {
|
||
// stateCh is closed either when *we* cancel the watch (on-exit via defer)
|
||
// or by the proxycfg.Manager when an irrecoverable error is encountered
|
||
// such as the ACL token getting deleted.
|
||
//
|
||
// We know for sure that this is the latter case, because in the former we
|
||
// would've already exited this loop.
|
||
return status.Error(codes.Aborted, "xDS stream terminated due to an irrecoverable error, please try again")
|
||
}
|
||
cfgSnap = cs
|
||
|
||
newRes, err := generator.AllResourcesFromSnapshot(cfgSnap)
|
||
if err != nil {
|
||
return status.Errorf(codes.Unavailable, "failed to generate all xDS resources from the snapshot: %v", err)
|
||
}
|
||
|
||
// index and hash the xDS structures
|
||
newResourceMap := xdscommon.IndexResources(generator.Logger, newRes)
|
||
|
||
if s.ResourceMapMutateFn != nil {
|
||
s.ResourceMapMutateFn(newResourceMap)
|
||
}
|
||
|
||
if newResourceMap, err = s.applyEnvoyExtensions(newResourceMap, cfgSnap, node); err != nil {
|
||
// err is already the result of calling status.Errorf
|
||
return err
|
||
}
|
||
|
||
if err := populateChildIndexMap(newResourceMap); err != nil {
|
||
return status.Errorf(codes.Unavailable, "failed to index xDS resource versions: %v", err)
|
||
}
|
||
|
||
newVersions, err := computeResourceVersions(newResourceMap)
|
||
if err != nil {
|
||
return status.Errorf(codes.Unavailable, "failed to compute xDS resource versions: %v", err)
|
||
}
|
||
|
||
resourceMap = newResourceMap
|
||
currentVersions = newVersions
|
||
ready = true
|
||
}
|
||
|
||
// Trigger state machine
|
||
switch state {
|
||
case stateDeltaInit:
|
||
if node == nil {
|
||
// This can't happen (tm) since stateCh is nil until after the first req
|
||
// is received but lets not panic about it.
|
||
continue
|
||
}
|
||
|
||
nodeName := node.GetMetadata().GetFields()["node_name"].GetStringValue()
|
||
if nodeName == "" {
|
||
nodeName = s.NodeName
|
||
}
|
||
|
||
// Start authentication process, we need the proxyID
|
||
proxyID = structs.NewServiceID(node.Id, parseEnterpriseMeta(node))
|
||
|
||
// Start watching config for that proxy
|
||
var err error
|
||
options, err := external.QueryOptionsFromContext(stream.Context())
|
||
if err != nil {
|
||
return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err)
|
||
}
|
||
|
||
stateCh, drainCh, watchCancel, err = s.CfgSrc.Watch(proxyID, nodeName, options.Token)
|
||
switch {
|
||
case errors.Is(err, limiter.ErrCapacityReached):
|
||
return errOverwhelmed
|
||
case err != nil:
|
||
return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err)
|
||
}
|
||
// Note that in this case we _intend_ the defer to only be triggered when
|
||
// this whole process method ends (i.e. when streaming RPC aborts) not at
|
||
// the end of the current loop iteration. We have to do it in the loop
|
||
// here since we can't start watching until we get to this state in the
|
||
// state machine.
|
||
defer watchCancel()
|
||
|
||
generator.Logger = generator.Logger.With("service_id", proxyID.String()) // enhance future logs
|
||
|
||
generator.Logger.Trace("watching proxy, pending initial proxycfg snapshot for xDS")
|
||
|
||
// Now wait for the config so we can check ACL
|
||
state = stateDeltaPendingInitialConfig
|
||
case stateDeltaPendingInitialConfig:
|
||
if cfgSnap == nil {
|
||
// Nothing we can do until we get the initial config
|
||
continue
|
||
}
|
||
|
||
// Got config, try to authenticate next.
|
||
state = stateDeltaRunning
|
||
|
||
// Upgrade the logger
|
||
switch cfgSnap.Kind {
|
||
case structs.ServiceKindConnectProxy:
|
||
case structs.ServiceKindTerminatingGateway:
|
||
generator.Logger = generator.Logger.Named(logging.TerminatingGateway)
|
||
case structs.ServiceKindMeshGateway:
|
||
generator.Logger = generator.Logger.Named(logging.MeshGateway)
|
||
case structs.ServiceKindIngressGateway:
|
||
generator.Logger = generator.Logger.Named(logging.IngressGateway)
|
||
}
|
||
|
||
generator.Logger.Trace("Got initial config snapshot")
|
||
|
||
// Let's actually process the config we just got, or we'll miss responding
|
||
fallthrough
|
||
case stateDeltaRunning:
|
||
// Check ACLs on every Discovery{Request,Response}.
|
||
if err := checkStreamACLs(cfgSnap); err != nil {
|
||
return err
|
||
}
|
||
// For the first time through the state machine, this is when the
|
||
// timer is first started.
|
||
extendAuthTimer()
|
||
|
||
if !ready {
|
||
generator.Logger.Trace("Skipping delta computation because we haven't gotten a snapshot yet")
|
||
continue
|
||
}
|
||
|
||
generator.Logger.Trace("Invoking all xDS resource handlers and sending changed data if there are any")
|
||
|
||
streamStartOnce.Do(func() {
|
||
metrics.MeasureSince([]string{"xds", "server", "streamStart"}, streamStartTime)
|
||
})
|
||
|
||
for _, op := range xDSUpdateOrder {
|
||
if op.TypeUrl == xdscommon.ListenerType || op.TypeUrl == xdscommon.RouteType {
|
||
if clusterHandler := handlers[xdscommon.ClusterType]; clusterHandler.registered && len(clusterHandler.pendingUpdates) > 0 {
|
||
generator.Logger.Trace("Skipping delta computation for resource because there are dependent updates pending",
|
||
"typeUrl", op.TypeUrl, "dependent", xdscommon.ClusterType)
|
||
|
||
// Receiving an ACK from Envoy will unblock the select statement above,
|
||
// and re-trigger an attempt to send these skipped updates.
|
||
break
|
||
}
|
||
if endpointHandler := handlers[xdscommon.EndpointType]; endpointHandler.registered && len(endpointHandler.pendingUpdates) > 0 {
|
||
generator.Logger.Trace("Skipping delta computation for resource because there are dependent updates pending",
|
||
"typeUrl", op.TypeUrl, "dependent", xdscommon.EndpointType)
|
||
|
||
// Receiving an ACK from Envoy will unblock the select statement above,
|
||
// and re-trigger an attempt to send these skipped updates.
|
||
break
|
||
}
|
||
}
|
||
err, _ := handlers[op.TypeUrl].SendIfNew(
|
||
cfgSnap.Kind,
|
||
currentVersions[op.TypeUrl],
|
||
resourceMap,
|
||
&nonce,
|
||
op.Upsert,
|
||
op.Remove,
|
||
)
|
||
if err != nil {
|
||
return status.Errorf(codes.Unavailable,
|
||
"failed to send %sreply for type %q: %v",
|
||
op.errorLogNameReplyPrefix(),
|
||
op.TypeUrl, err)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
func (s *Server) applyEnvoyExtensions(resources *xdscommon.IndexedResources, cfgSnap *proxycfg.ConfigSnapshot, node *envoy_config_core_v3.Node) (*xdscommon.IndexedResources, error) {
|
||
var err error
|
||
envoyVersion := xdscommon.DetermineEnvoyVersionFromNode(node)
|
||
consulVersion, err := goversion.NewVersion(version.Version)
|
||
|
||
if err != nil {
|
||
return nil, status.Errorf(codes.InvalidArgument, "failed to parse Consul version")
|
||
}
|
||
|
||
serviceConfigs := extensionruntime.GetRuntimeConfigurations(cfgSnap)
|
||
for _, cfgs := range serviceConfigs {
|
||
for _, cfg := range cfgs {
|
||
resources, err = validateAndApplyEnvoyExtension(s.Logger, cfgSnap, resources, cfg, envoyVersion, consulVersion)
|
||
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
}
|
||
}
|
||
|
||
return resources, nil
|
||
}
|
||
|
||
func validateAndApplyEnvoyExtension(logger hclog.Logger, cfgSnap *proxycfg.ConfigSnapshot, resources *xdscommon.IndexedResources, runtimeConfig extensioncommon.RuntimeConfig, envoyVersion, consulVersion *goversion.Version) (*xdscommon.IndexedResources, error) {
|
||
logFn := logger.Warn
|
||
if runtimeConfig.EnvoyExtension.Required {
|
||
logFn = logger.Error
|
||
}
|
||
|
||
svc := runtimeConfig.ServiceName
|
||
|
||
errorParams := []interface{}{
|
||
"extension", runtimeConfig.EnvoyExtension.Name,
|
||
"service", svc.Name,
|
||
"namespace", svc.Namespace,
|
||
"partition", svc.Partition,
|
||
}
|
||
|
||
getMetricLabels := func(err error) []metrics.Label {
|
||
return []metrics.Label{
|
||
{Name: "extension", Value: runtimeConfig.EnvoyExtension.Name},
|
||
{Name: "version", Value: "builtin/" + version.Version},
|
||
{Name: "service", Value: cfgSnap.Service},
|
||
{Name: "partition", Value: cfgSnap.ProxyID.PartitionOrDefault()},
|
||
{Name: "namespace", Value: cfgSnap.ProxyID.NamespaceOrDefault()},
|
||
{Name: "error", Value: strconv.FormatBool(err != nil)},
|
||
}
|
||
}
|
||
|
||
ext := runtimeConfig.EnvoyExtension
|
||
|
||
if v := ext.EnvoyVersion; v != "" {
|
||
c, err := goversion.NewConstraint(v)
|
||
if err != nil {
|
||
logFn("failed to parse Envoy extension version constraint", errorParams...)
|
||
|
||
if ext.Required {
|
||
return nil, status.Errorf(codes.InvalidArgument, "failed to parse Envoy version constraint for extension %q for service %q", ext.Name, svc.Name)
|
||
}
|
||
return resources, nil
|
||
}
|
||
|
||
if !c.Check(envoyVersion) {
|
||
logger.Info("skipping envoy extension due to Envoy version constraint violation", errorParams...)
|
||
return resources, nil
|
||
}
|
||
}
|
||
|
||
if v := ext.ConsulVersion; v != "" {
|
||
c, err := goversion.NewConstraint(v)
|
||
if err != nil {
|
||
logFn("failed to parse Consul extension version constraint", errorParams...)
|
||
|
||
if ext.Required {
|
||
return nil, status.Errorf(codes.InvalidArgument, "failed to parse Consul version constraint for extension %q for service %q", ext.Name, svc.Name)
|
||
}
|
||
return resources, nil
|
||
}
|
||
|
||
if !c.Check(consulVersion) {
|
||
logger.Info("skipping envoy extension due to Consul version constraint violation", errorParams...)
|
||
return resources, nil
|
||
}
|
||
}
|
||
|
||
now := time.Now()
|
||
extender, err := envoyextensions.ConstructExtension(ext)
|
||
metrics.MeasureSinceWithLabels([]string{"envoy_extension", "validate_arguments"}, now, getMetricLabels(err))
|
||
if err != nil {
|
||
errorParams = append(errorParams, "error", err)
|
||
logFn("failed to construct extension", errorParams...)
|
||
|
||
if ext.Required {
|
||
return nil, status.Errorf(codes.InvalidArgument, "failed to construct extension %q for service %q", ext.Name, svc.Name)
|
||
}
|
||
|
||
return resources, nil
|
||
}
|
||
|
||
now = time.Now()
|
||
err = extender.Validate(&runtimeConfig)
|
||
metrics.MeasureSinceWithLabels([]string{"envoy_extension", "validate"}, now, getMetricLabels(err))
|
||
if err != nil {
|
||
errorParams = append(errorParams, "error", err)
|
||
logFn("failed to validate extension arguments", errorParams...)
|
||
|
||
if ext.Required {
|
||
return nil, status.Errorf(codes.InvalidArgument, "failed to validate arguments for extension %q for service %q", ext.Name, svc.Name)
|
||
}
|
||
|
||
return resources, nil
|
||
}
|
||
|
||
now = time.Now()
|
||
resources, err = applyEnvoyExtension(extender, resources, &runtimeConfig)
|
||
metrics.MeasureSinceWithLabels([]string{"envoy_extension", "extend"}, now, getMetricLabels(err))
|
||
if err != nil {
|
||
errorParams = append(errorParams, "error", err)
|
||
logFn("failed to apply envoy extension", errorParams...)
|
||
|
||
if ext.Required {
|
||
return nil, status.Errorf(codes.InvalidArgument, "failed to patch xDS resources in the %q extension: %v", ext.Name, err)
|
||
}
|
||
}
|
||
|
||
return resources, nil
|
||
}
|
||
|
||
// applyEnvoyExtension safely checks whether an extension can be applied, and if so attempts to apply it.
|
||
//
|
||
// applyEnvoyExtension makes a copy of the provided IndexedResources, then applies the given extension to them.
|
||
// The copy ensures against partial application if a non-required extension modifies a resource then fails at a later
|
||
// stage; this is necessary because IndexedResources and its proto messages are all passed by reference, and
|
||
// non-required extensions do not lead to a terminal failure in xDS updates.
|
||
//
|
||
// If the application is successful, the modified copy is returned. If not, the original and an error is returned.
|
||
// Returning resources in either case allows for applying extensions in a loop and reporting on non-required extension
|
||
// failures simultaneously.
|
||
func applyEnvoyExtension(extender extensioncommon.EnvoyExtender, resources *xdscommon.IndexedResources, runtimeConfig *extensioncommon.RuntimeConfig) (r *xdscommon.IndexedResources, e error) {
|
||
// Don't panic due to an extension misbehaving.
|
||
defer func() {
|
||
if err := recover(); err != nil {
|
||
r = resources
|
||
e = fmt.Errorf("attempt to apply Envoy extension %q caused an unexpected panic: %v",
|
||
runtimeConfig.EnvoyExtension.Name, err)
|
||
}
|
||
}()
|
||
|
||
// First check whether the extension is eligible for application in the current environment.
|
||
// Do this before copying indexed resources for the sake of efficiency.
|
||
if !extender.CanApply(runtimeConfig) {
|
||
return resources, nil
|
||
}
|
||
|
||
newResources, err := extender.Extend(xdscommon.Clone(resources), runtimeConfig)
|
||
if err != nil {
|
||
return resources, err
|
||
}
|
||
|
||
return newResources, nil
|
||
}
|
||
|
||
// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations
|
||
var xDSUpdateOrder = []xDSUpdateOperation{
|
||
// 1. SDS updates (if any) can be pushed here with no harm.
|
||
{TypeUrl: xdscommon.SecretType, Upsert: true},
|
||
// 2. CDS updates (if any) must always be pushed before the following types.
|
||
{TypeUrl: xdscommon.ClusterType, Upsert: true},
|
||
// 3. EDS updates (if any) must arrive after CDS updates for the respective clusters.
|
||
{TypeUrl: xdscommon.EndpointType, Upsert: true},
|
||
// 4. LDS updates must arrive after corresponding CDS/EDS updates.
|
||
{TypeUrl: xdscommon.ListenerType, Upsert: true, Remove: true},
|
||
// 5. RDS updates related to the newly added listeners must arrive after CDS/EDS/LDS updates.
|
||
{TypeUrl: xdscommon.RouteType, Upsert: true, Remove: true},
|
||
// 6. (NOT IMPLEMENTED YET IN CONSUL) VHDS updates (if any) related to the newly added RouteConfigurations must arrive after RDS updates.
|
||
// {},
|
||
// 7. Stale CDS clusters, related EDS endpoints (ones no longer being referenced) and SDS secrets can then be removed.
|
||
{TypeUrl: xdscommon.ClusterType, Remove: true},
|
||
{TypeUrl: xdscommon.EndpointType, Remove: true},
|
||
{TypeUrl: xdscommon.SecretType, Remove: true},
|
||
// xDS updates can be pushed independently if no new
|
||
// clusters/routes/listeners are added or if it’s acceptable to
|
||
// temporarily drop traffic during updates. Note that in case of
|
||
// LDS updates, the listeners will be warmed before they receive
|
||
// traffic, i.e. the dependent routes are fetched through RDS if
|
||
// configured. Clusters are warmed when adding/removing/updating
|
||
// clusters. On the other hand, routes are not warmed, i.e., the
|
||
// management plane must ensure that clusters referenced by a route
|
||
// are in place, before pushing the updates for a route.
|
||
}
|
||
|
||
// xDSUpdateOperation describes one step of an update pass: which resource
// type to act on and whether that step may upsert resources, remove them,
// or both.
type xDSUpdateOperation struct {
	TypeUrl string // xDS type URL the step applies to
	Upsert  bool   // the step may send new or changed resources
	Remove  bool   // the step may send removals
}

// errorLogNameReplyPrefix returns a short description of the operation kind
// ("upsert ", "remove " or "upsert/remove ", each with a trailing space) for
// embedding in error messages. It returns the empty string when neither flag
// is set.
func (op *xDSUpdateOperation) errorLogNameReplyPrefix() string {
	if op.Upsert {
		if op.Remove {
			return "upsert/remove "
		}
		return "upsert "
	}
	if op.Remove {
		return "remove "
	}
	return ""
}
|
||
|
||
type xDSDeltaChild struct {
|
||
// childType is a type that in Envoy is actually stored within this type.
|
||
// Upserts of THIS type should potentially trigger dependent named
|
||
// resources within the child to be re-configured.
|
||
childType *xDSDeltaType
|
||
|
||
// childrenNames is map of parent resource names to a list of associated child resource
|
||
// names.
|
||
childrenNames map[string][]string
|
||
}
|
||
|
||
type xDSDeltaType struct {
|
||
generator *ResourceGenerator
|
||
stream ADSDeltaStream
|
||
typeURL string
|
||
allowEmptyFn func(kind structs.ServiceKind) bool
|
||
|
||
// deltaChild contains data for an xDS child type if there is one.
|
||
// For example, endpoints are a child type of clusters.
|
||
deltaChild *xDSDeltaChild
|
||
|
||
// registered indicates if this type has been requested at least once by
|
||
// the proxy
|
||
registered bool
|
||
|
||
// wildcard indicates that this type was requested with no preference for
|
||
// specific resource names. subscribe/unsubscribe are ignored.
|
||
wildcard bool
|
||
|
||
// sentToEnvoyOnce is true after we've sent one response to envoy.
|
||
sentToEnvoyOnce bool
|
||
|
||
// subscriptions is the set of currently subscribed envoy resources.
|
||
// If wildcard == true, this will be empty.
|
||
subscriptions map[string]struct{}
|
||
|
||
// resourceVersions is the current view of CONFIRMED/ACKed updates to
|
||
// envoy's view of the loaded resources.
|
||
//
|
||
// name => version
|
||
resourceVersions map[string]string
|
||
|
||
// pendingUpdates is a set of un-ACKed updates to the 'resourceVersions'
|
||
// map. Once we get an ACK from envoy we'll update the resourceVersions map
|
||
// and strike the entry from this map.
|
||
//
|
||
// nonce -> name -> {version}
|
||
pendingUpdates map[string]map[string]PendingUpdate
|
||
}
|
||
|
||
func (t *xDSDeltaType) subscribed(name string) bool {
|
||
if t.wildcard {
|
||
return true
|
||
}
|
||
_, subscribed := t.subscriptions[name]
|
||
return subscribed
|
||
}
|
||
|
||
// PendingUpdate is a single resource change that has been sent to the proxy
// and is waiting to be acknowledged.
type PendingUpdate struct {
	// Remove marks the change as a removal; on ACK the resource is dropped
	// from the confirmed versions rather than updated.
	Remove bool

	// Version is the version recorded for the resource once the change is
	// ACKed (not used for removals).
	Version string
}
|
||
|
||
func newDeltaType(
|
||
generator *ResourceGenerator,
|
||
stream ADSDeltaStream,
|
||
typeUrl string,
|
||
allowEmptyFn func(kind structs.ServiceKind) bool,
|
||
) *xDSDeltaType {
|
||
return &xDSDeltaType{
|
||
generator: generator,
|
||
stream: stream,
|
||
typeURL: typeUrl,
|
||
allowEmptyFn: allowEmptyFn,
|
||
subscriptions: make(map[string]struct{}),
|
||
resourceVersions: make(map[string]string),
|
||
pendingUpdates: make(map[string]map[string]PendingUpdate),
|
||
}
|
||
}
|
||
|
||
// Recv handles new discovery requests from envoy.
|
||
//
|
||
// Returns true the first time a type receives a request.
|
||
func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf xdscommon.SupportedProxyFeatures) deltaRecvResponse {
|
||
if t == nil {
|
||
return deltaRecvUnknownType // not something we care about
|
||
}
|
||
logger := t.generator.Logger.With("typeUrl", t.typeURL)
|
||
|
||
registeredThisTime := false
|
||
if !t.registered {
|
||
// We are in the wildcard mode if the first request of a particular
|
||
// type has empty subscription list
|
||
t.wildcard = len(req.ResourceNamesSubscribe) == 0
|
||
t.registered = true
|
||
registeredThisTime = true
|
||
}
|
||
|
||
/*
|
||
DeltaDiscoveryRequest can be sent in the following situations:
|
||
|
||
Initial message in a xDS bidirectional gRPC stream.
|
||
|
||
As an ACK or NACK response to a previous DeltaDiscoveryResponse. In
|
||
this case the response_nonce is set to the nonce value in the Response.
|
||
ACK or NACK is determined by the absence or presence of error_detail.
|
||
|
||
Spontaneous DeltaDiscoveryRequests from the client. This can be done to
|
||
dynamically add or remove elements from the tracked resource_names set.
|
||
In this case response_nonce must be omitted.
|
||
|
||
*/
|
||
|
||
/*
|
||
DeltaDiscoveryRequest plays two independent roles. Any
|
||
DeltaDiscoveryRequest can be either or both of:
|
||
*/
|
||
|
||
if req.ResponseNonce != "" {
|
||
/*
|
||
[2] (N)ACKing an earlier resource update from the server (using
|
||
response_nonce, with presence of error_detail making it a NACK).
|
||
*/
|
||
if req.ErrorDetail == nil {
|
||
logger.Trace("got ok response from envoy proxy", "nonce", req.ResponseNonce)
|
||
t.ack(req.ResponseNonce)
|
||
} else {
|
||
logger.Error("got error response from envoy proxy", "nonce", req.ResponseNonce,
|
||
"error", status.ErrorProto(req.ErrorDetail))
|
||
t.nack(req.ResponseNonce)
|
||
return deltaRecvResponseNack
|
||
}
|
||
}
|
||
|
||
if registeredThisTime && len(req.InitialResourceVersions) > 0 {
|
||
/*
|
||
Additionally, the first message (for a given type_url) of a
|
||
reconnected gRPC stream has a third role:
|
||
|
||
[3] informing the server of the resources (and their versions) that
|
||
the client already possesses, using the initial_resource_versions
|
||
field.
|
||
*/
|
||
logger.Trace("setting initial resource versions for stream",
|
||
"resources", req.InitialResourceVersions)
|
||
t.resourceVersions = req.InitialResourceVersions
|
||
if !t.wildcard {
|
||
for k := range req.InitialResourceVersions {
|
||
t.subscriptions[k] = struct{}{}
|
||
}
|
||
}
|
||
}
|
||
|
||
if !t.wildcard {
|
||
/*
|
||
[1] informing the server of what resources the client has
|
||
gained/lost interest in (using resource_names_subscribe and
|
||
resource_names_unsubscribe), or
|
||
*/
|
||
for _, name := range req.ResourceNamesSubscribe {
|
||
// A resource_names_subscribe field may contain resource names that
|
||
// the server believes the client is already subscribed to, and
|
||
// furthermore has the most recent versions of. However, the server
|
||
// must still provide those resources in the response; due to
|
||
// implementation details hidden from the server, the client may
|
||
// have “forgotten” those resources despite apparently remaining
|
||
// subscribed.
|
||
//
|
||
// NOTE: the server must respond with all resources listed in
|
||
// resource_names_subscribe, even if it believes the client has the
|
||
// most recent version of them. The reason: the client may have
|
||
// dropped them, but then regained interest before it had a chance
|
||
// to send the unsubscribe message.
|
||
//
|
||
// We handle that here by ALWAYS wiping the version so the diff
|
||
// decides to send the value.
|
||
_, alreadySubscribed := t.subscriptions[name]
|
||
t.subscriptions[name] = struct{}{}
|
||
|
||
// Reset the tracked version so we force a reply.
|
||
if _, alreadyTracked := t.resourceVersions[name]; alreadyTracked {
|
||
t.resourceVersions[name] = ""
|
||
}
|
||
|
||
// Certain xDS types are children of other types, meaning that if Envoy subscribes to a parent.
|
||
// We MUST assume that if Envoy ever had data for the children of this parent, then the child's
|
||
// data is gone.
|
||
if t.deltaChild != nil && t.deltaChild.childType.registered {
|
||
for _, childName := range t.deltaChild.childrenNames[name] {
|
||
t.ensureChildResend(name, childName)
|
||
}
|
||
}
|
||
|
||
if alreadySubscribed {
|
||
logger.Trace("re-subscribing resource for stream", "resource", name)
|
||
} else {
|
||
logger.Trace("subscribing resource for stream", "resource", name)
|
||
}
|
||
}
|
||
|
||
for _, name := range req.ResourceNamesUnsubscribe {
|
||
if _, ok := t.subscriptions[name]; !ok {
|
||
continue
|
||
}
|
||
delete(t.subscriptions, name)
|
||
logger.Trace("unsubscribing resource for stream", "resource", name)
|
||
// NOTE: we'll let the normal differential comparison handle cleaning up resourceVersions
|
||
}
|
||
}
|
||
|
||
if registeredThisTime {
|
||
return deltaRecvNewSubscription
|
||
}
|
||
return deltaRecvResponseAck
|
||
}
|
||
|
||
func (t *xDSDeltaType) ack(nonce string) {
|
||
pending, ok := t.pendingUpdates[nonce]
|
||
if !ok {
|
||
return
|
||
}
|
||
|
||
for name, obj := range pending {
|
||
if obj.Remove {
|
||
delete(t.resourceVersions, name)
|
||
continue
|
||
}
|
||
|
||
t.resourceVersions[name] = obj.Version
|
||
}
|
||
t.sentToEnvoyOnce = true
|
||
delete(t.pendingUpdates, nonce)
|
||
}
|
||
|
||
func (t *xDSDeltaType) nack(nonce string) {
|
||
delete(t.pendingUpdates, nonce)
|
||
}
|
||
|
||
func (t *xDSDeltaType) SendIfNew(
|
||
kind structs.ServiceKind,
|
||
currentVersions map[string]string, // type => name => version (as consul knows right now)
|
||
resourceMap *xdscommon.IndexedResources,
|
||
nonce *uint64,
|
||
upsert, remove bool,
|
||
) (error, bool) {
|
||
if t == nil || !t.registered {
|
||
return nil, false
|
||
}
|
||
|
||
// Wait for Envoy to catch up with this delta type before sending something new.
|
||
if len(t.pendingUpdates) > 0 {
|
||
return nil, false
|
||
}
|
||
|
||
logger := t.generator.Logger.With("typeUrl", t.typeURL)
|
||
|
||
allowEmpty := t.allowEmptyFn != nil && t.allowEmptyFn(kind)
|
||
|
||
// Zero length resource responses should be ignored and are the result of no
|
||
// data yet. Notice that this caused a bug originally where we had zero
|
||
// healthy endpoints for an upstream that would cause Envoy to hang waiting
|
||
// for the EDS response. This is fixed though by ensuring we send an explicit
|
||
// empty LoadAssignment resource for the cluster rather than allowing junky
|
||
// empty resources.
|
||
if len(currentVersions) == 0 && !allowEmpty {
|
||
// Nothing to send yet
|
||
return nil, false
|
||
}
|
||
|
||
resp, updates, err := t.createDeltaResponse(currentVersions, resourceMap, upsert, remove)
|
||
if err != nil {
|
||
return err, false
|
||
}
|
||
|
||
if resp == nil {
|
||
return nil, false
|
||
}
|
||
|
||
*nonce++
|
||
resp.Nonce = fmt.Sprintf("%08x", *nonce)
|
||
|
||
t.generator.logTraceResponse("Incremental xDS v3", resp)
|
||
|
||
logger.Trace("sending response", "nonce", resp.Nonce)
|
||
if err := t.stream.Send(resp); err != nil {
|
||
return err, false
|
||
}
|
||
logger.Trace("sent response", "nonce", resp.Nonce)
|
||
|
||
// Certain xDS types are children of other types, meaning that if an update is pushed for a parent,
|
||
// we MUST send new data for all its children. Envoy will NOT re-subscribe to the child data upon
|
||
// receiving updates for the parent, so we need to handle this ourselves.
|
||
//
|
||
// Note that we do not check whether the deltaChild.childType is registered here, since we send
|
||
// parent types before child types, meaning that it's expected on first send of a parent that
|
||
// there are no subscriptions for the child type.
|
||
if t.deltaChild != nil {
|
||
for name := range updates {
|
||
if children, ok := resourceMap.ChildIndex[t.typeURL][name]; ok {
|
||
// Capture the relevant child resource names on this pending update so
|
||
// we can know the linked children if Envoy ever re-subscribes to the parent resource.
|
||
t.deltaChild.childrenNames[name] = children
|
||
|
||
for _, childName := range children {
|
||
t.ensureChildResend(name, childName)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
t.pendingUpdates[resp.Nonce] = updates
|
||
|
||
return nil, true
|
||
}
|
||
|
||
func (t *xDSDeltaType) createDeltaResponse(
|
||
currentVersions map[string]string, // name => version (as consul knows right now)
|
||
resourceMap *xdscommon.IndexedResources,
|
||
upsert, remove bool,
|
||
) (*envoy_discovery_v3.DeltaDiscoveryResponse, map[string]PendingUpdate, error) {
|
||
// compute difference
|
||
var (
|
||
hasRelevantUpdates = false
|
||
updates = make(map[string]PendingUpdate)
|
||
)
|
||
|
||
if t.wildcard {
|
||
// First find things that need updating or deleting
|
||
for name, envoyVers := range t.resourceVersions {
|
||
currVers, ok := currentVersions[name]
|
||
if !ok {
|
||
if remove {
|
||
hasRelevantUpdates = true
|
||
}
|
||
updates[name] = PendingUpdate{Remove: true}
|
||
} else if currVers != envoyVers {
|
||
if upsert {
|
||
hasRelevantUpdates = true
|
||
}
|
||
updates[name] = PendingUpdate{Version: currVers}
|
||
}
|
||
}
|
||
|
||
// Now find new things
|
||
for name, currVers := range currentVersions {
|
||
if _, known := t.resourceVersions[name]; known {
|
||
continue
|
||
}
|
||
if upsert {
|
||
hasRelevantUpdates = true
|
||
}
|
||
updates[name] = PendingUpdate{Version: currVers}
|
||
}
|
||
} else {
|
||
// First find things that need updating or deleting
|
||
|
||
// Walk the list of things currently stored in envoy
|
||
for name, envoyVers := range t.resourceVersions {
|
||
if t.subscribed(name) {
|
||
if currVers, ok := currentVersions[name]; ok {
|
||
if currVers != envoyVers {
|
||
if upsert {
|
||
hasRelevantUpdates = true
|
||
}
|
||
updates[name] = PendingUpdate{Version: currVers}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// Now find new things not in envoy yet
|
||
for name := range t.subscriptions {
|
||
if _, known := t.resourceVersions[name]; known {
|
||
continue
|
||
}
|
||
if currVers, ok := currentVersions[name]; ok {
|
||
updates[name] = PendingUpdate{Version: currVers}
|
||
if upsert {
|
||
hasRelevantUpdates = true
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
if !hasRelevantUpdates && t.sentToEnvoyOnce {
|
||
return nil, nil, nil
|
||
}
|
||
|
||
// now turn this into a disco response
|
||
resp := &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||
// TODO(rb): consider putting something in SystemVersionInfo?
|
||
TypeUrl: t.typeURL,
|
||
}
|
||
realUpdates := make(map[string]PendingUpdate)
|
||
for name, obj := range updates {
|
||
if obj.Remove {
|
||
if remove {
|
||
resp.RemovedResources = append(resp.RemovedResources, name)
|
||
realUpdates[name] = PendingUpdate{Remove: true}
|
||
}
|
||
} else if upsert {
|
||
resources, ok := resourceMap.Index[t.typeURL]
|
||
if !ok {
|
||
return nil, nil, fmt.Errorf("unknown type url: %s", t.typeURL)
|
||
}
|
||
res, ok := resources[name]
|
||
if !ok {
|
||
return nil, nil, fmt.Errorf("unknown name for type url %q: %s", t.typeURL, name)
|
||
}
|
||
any, err := anypb.New(res)
|
||
if err != nil {
|
||
return nil, nil, err
|
||
}
|
||
|
||
resp.Resources = append(resp.Resources, &envoy_discovery_v3.Resource{
|
||
Name: name,
|
||
Resource: any,
|
||
Version: obj.Version,
|
||
})
|
||
realUpdates[name] = obj
|
||
}
|
||
}
|
||
|
||
return resp, realUpdates, nil
|
||
}
|
||
|
||
func (t *xDSDeltaType) ensureChildResend(parentName, childName string) {
|
||
if _, exist := t.deltaChild.childType.resourceVersions[childName]; !exist {
|
||
return
|
||
}
|
||
if !t.subscribed(childName) {
|
||
return
|
||
}
|
||
|
||
t.generator.Logger.Trace(
|
||
"triggering implicit update of resource",
|
||
"typeUrl", t.typeURL,
|
||
"resource", parentName,
|
||
"childTypeUrl", t.deltaChild.childType.typeURL,
|
||
"childResource", childName,
|
||
)
|
||
|
||
// resourceVersions tracks the last known version for this childName that Envoy
|
||
// has ACKed. By setting this to empty it effectively tells us that Envoy does
|
||
// not have any data for that child, and we need to re-send.
|
||
t.deltaChild.childType.resourceVersions[childName] = ""
|
||
}
|
||
|
||
func computeResourceVersions(resourceMap *xdscommon.IndexedResources) (map[string]map[string]string, error) {
|
||
out := make(map[string]map[string]string)
|
||
for typeUrl, resources := range resourceMap.Index {
|
||
m, err := hashResourceMap(resources)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to hash resources for %q: %v", typeUrl, err)
|
||
}
|
||
out[typeUrl] = m
|
||
}
|
||
return out, nil
|
||
}
|
||
|
||
func populateChildIndexMap(resourceMap *xdscommon.IndexedResources) error {
|
||
// LDS and RDS have a more complicated relationship.
|
||
for name, res := range resourceMap.Index[xdscommon.ListenerType] {
|
||
listener := res.(*envoy_listener_v3.Listener)
|
||
rdsRouteNames, err := extractRdsResourceNames(listener)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
resourceMap.ChildIndex[xdscommon.ListenerType][name] = rdsRouteNames
|
||
}
|
||
|
||
// CDS and EDS share exact names.
|
||
for name := range resourceMap.Index[xdscommon.ClusterType] {
|
||
resourceMap.ChildIndex[xdscommon.ClusterType][name] = []string{name}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func hashResourceMap(resources map[string]proto.Message) (map[string]string, error) {
|
||
m := make(map[string]string)
|
||
for name, res := range resources {
|
||
h, err := hashResource(res)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to hash resource %q: %v", name, err)
|
||
}
|
||
m[name] = h
|
||
}
|
||
return m, nil
|
||
}
|
||
|
||
// hashResource will take a resource and create a SHA256 hash sum out of the marshaled bytes
|
||
func hashResource(res proto.Message) (string, error) {
|
||
h := sha256.New()
|
||
marshaller := proto.MarshalOptions{Deterministic: true}
|
||
|
||
data, err := marshaller.Marshal(res)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
h.Write(data)
|
||
|
||
return hex.EncodeToString(h.Sum(nil)), nil
|
||
}
|