2018-10-03 18:18:55 +00:00
package xds
import (
"context"
"errors"
"sync/atomic"
2019-01-11 15:43:18 +00:00
"time"
2018-10-03 18:18:55 +00:00
2021-02-26 22:23:15 +00:00
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
2021-02-22 21:00:15 +00:00
2021-05-14 18:59:13 +00:00
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
2020-01-28 23:50:41 +00:00
"github.com/hashicorp/go-hclog"
2020-06-23 20:19:56 +00:00
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
2020-12-23 17:50:28 +00:00
"github.com/hashicorp/consul/acl"
2022-07-13 15:33:48 +00:00
external "github.com/hashicorp/consul/agent/grpc-external"
2022-09-09 14:02:01 +00:00
"github.com/hashicorp/consul/agent/grpc-external/limiter"
2020-12-23 17:50:28 +00:00
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
2022-03-08 19:37:24 +00:00
"github.com/hashicorp/consul/agent/xds/xdscommon"
2018-10-03 18:18:55 +00:00
)
2023-01-16 17:31:56 +00:00
var (
StatsGauges = [ ] prometheus . GaugeDefinition {
{
Name : [ ] string { "xds" , "server" , "streams" } ,
Help : "Measures the number of active xDS streams handled by the server split by protocol version." ,
} ,
{
Name : [ ] string { "xds" , "server" , "streamsUnauthenticated" } ,
Help : "Counts the number of active xDS streams handled by the server that are unauthenticated because ACLs are not enabled or ACL tokens were missing." ,
} ,
}
StatsCounters = [ ] prometheus . CounterDefinition {
{
Name : [ ] string { "xds" , "server" , "streamDrained" } ,
Help : "Counts the number of xDS streams that are drained when rebalancing the load between servers." ,
} ,
}
StatsSummaries = [ ] prometheus . SummaryDefinition {
{
Name : [ ] string { "xds" , "server" , "streamStart" } ,
Help : "Measures the time in milliseconds after an xDS stream is opened until xDS resources are first generated for the stream." ,
} ,
}
)
2022-10-12 19:17:58 +00:00
2018-10-03 18:18:55 +00:00
// ADSStream is a shorter way of referring to this thing...
2021-02-26 22:23:15 +00:00
type ADSStream = envoy_discovery_v3 . AggregatedDiscoveryService_StreamAggregatedResourcesServer
2018-10-03 18:18:55 +00:00
const (
// LocalAgentClusterName is the name we give the local agent "cluster" in
2019-04-29 16:27:57 +00:00
// Envoy config. Note that all cluster names may collide with service names
// since we want cluster names and service names to match to enable nice
// metrics correlation without massaging prefixes on cluster names.
//
// We should probably make this more unlikely to collied however changing it
// potentially breaks upgrade compatibility without restarting all Envoy's as
// it will no longer match their existing cluster name. Changing this will
// affect metrics output so could break dashboards (for local agent traffic).
//
// We should probably just make it configurable if anyone actually has
// services named "local_agent" in the future.
2018-10-03 18:18:55 +00:00
LocalAgentClusterName = "local_agent"
2019-01-11 15:43:18 +00:00
2021-03-17 19:40:49 +00:00
// OriginalDestinationClusterName is the name we give to the passthrough
// cluster which redirects transparently-proxied requests to their original
2021-06-09 20:34:17 +00:00
// destination outside the mesh. This cluster prevents Consul from blocking
// connections to destinations outside of the catalog when in transparent
// proxy mode.
2021-03-17 19:40:49 +00:00
OriginalDestinationClusterName = "original-destination"
2019-01-11 15:43:18 +00:00
// DefaultAuthCheckFrequency is the default value for
// Server.AuthCheckFrequency to use when the zero value is provided.
DefaultAuthCheckFrequency = 5 * time . Minute
2018-10-03 18:18:55 +00:00
)
// ACLResolverFunc is a shim to resolve ACLs. Since ACL enforcement is so far
// entirely agent-local and all uses private methods this allows a simple shim
// to be written in the agent package to allow resolving without tightly
// coupling this to the agent.
2018-10-19 16:04:07 +00:00
type ACLResolverFunc func ( id string ) ( acl . Authorizer , error )
2018-10-03 18:18:55 +00:00
2019-09-26 02:55:52 +00:00
// ConfigFetcher is the interface the agent needs to expose
// for the xDS server to fetch agent config, currently only one field is fetched
type ConfigFetcher interface {
AdvertiseAddrLAN ( ) string
}
2022-05-27 11:38:52 +00:00
// ProxyConfigSource is the interface xds.Server requires to consume proxy
// config updates.
type ProxyConfigSource interface {
2023-01-18 18:33:21 +00:00
Watch ( id structs . ServiceID , nodeName string , token string ) ( <- chan * proxycfg . ConfigSnapshot , limiter . SessionTerminatedChan , proxycfg . CancelFunc , error )
2022-09-09 14:02:01 +00:00
}
2020-08-27 17:20:58 +00:00
// Server represents a gRPC server that can handle xDS requests from Envoy. All
// of it's public members must be set before the gRPC server is started.
2018-10-03 18:18:55 +00:00
//
// A full description of the XDS protocol can be found at
2019-06-03 16:03:05 +00:00
// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol
2018-10-03 18:18:55 +00:00
type Server struct {
2023-01-18 18:33:21 +00:00
NodeName string
Logger hclog . Logger
CfgSrc ProxyConfigSource
ResolveToken ACLResolverFunc
CfgFetcher ConfigFetcher
2021-04-29 18:54:05 +00:00
2019-01-11 15:43:18 +00:00
// AuthCheckFrequency is how often we should re-check the credentials used
// during a long-lived gRPC Stream after it has been initially established.
// This is only used during idle periods of stream interactions (i.e. when
// there has been no recent DiscoveryRequest).
AuthCheckFrequency time . Duration
2021-02-26 22:23:15 +00:00
2021-09-21 14:58:56 +00:00
// ResourceMapMutateFn exclusively exists for testing purposes.
2022-03-08 19:37:24 +00:00
ResourceMapMutateFn func ( resourceMap * xdscommon . IndexedResources )
2021-09-21 14:58:56 +00:00
2022-12-08 19:46:42 +00:00
activeStreams * activeStreamCounters
2021-05-14 18:59:13 +00:00
}
2023-01-16 17:31:56 +00:00
// activeStreamCounters tracks various stream-related metrics.
// Requires that activeStreamCounters be a pointer field.
2021-05-14 18:59:13 +00:00
type activeStreamCounters struct {
2023-01-16 17:31:56 +00:00
xDSv3 atomic . Uint64
unauthenticated atomic . Uint64
2021-05-14 18:59:13 +00:00
}
2023-01-16 17:31:56 +00:00
func ( c * activeStreamCounters ) Increment ( ctx context . Context ) func ( ) {
// If no ACL token is found, increase the gauge.
o , _ := external . QueryOptionsFromContext ( ctx )
if o . Token == "" {
unauthn := c . unauthenticated . Add ( 1 )
metrics . SetGauge ( [ ] string { "xds" , "server" , "streamsUnauthenticated" } , float32 ( unauthn ) )
2021-05-14 18:59:13 +00:00
}
2023-01-16 17:31:56 +00:00
// Historically there had been a "v2" version.
labels := [ ] metrics . Label { { Name : "version" , Value : "v3" } }
count := c . xDSv3 . Add ( 1 )
2021-05-14 18:59:13 +00:00
metrics . SetGaugeWithLabels ( [ ] string { "xds" , "server" , "streams" } , float32 ( count ) , labels )
2023-01-16 17:31:56 +00:00
// This closure should be called in a defer to decrement the gauges after the stream is closed.
2021-05-14 18:59:13 +00:00
return func ( ) {
2023-01-16 17:31:56 +00:00
if o . Token == "" {
unauthn := c . unauthenticated . Add ( ^ uint64 ( 0 ) )
metrics . SetGauge ( [ ] string { "xds" , "server" , "streamsUnauthenticated" } , float32 ( unauthn ) )
}
count := c . xDSv3 . Add ( ^ uint64 ( 0 ) )
2021-05-14 18:59:13 +00:00
metrics . SetGaugeWithLabels ( [ ] string { "xds" , "server" , "streams" } , float32 ( count ) , labels )
}
2019-01-11 15:43:18 +00:00
}
2021-04-29 18:54:05 +00:00
func NewServer (
2022-05-27 11:38:52 +00:00
nodeName string ,
2021-04-29 18:54:05 +00:00
logger hclog . Logger ,
2022-05-27 11:38:52 +00:00
cfgMgr ProxyConfigSource ,
2021-04-29 18:54:05 +00:00
resolveToken ACLResolverFunc ,
cfgFetcher ConfigFetcher ,
) * Server {
return & Server {
2022-12-08 19:46:42 +00:00
NodeName : nodeName ,
Logger : logger ,
CfgSrc : cfgMgr ,
ResolveToken : resolveToken ,
CfgFetcher : cfgFetcher ,
AuthCheckFrequency : DefaultAuthCheckFrequency ,
activeStreams : & activeStreamCounters { } ,
2021-04-29 18:54:05 +00:00
}
}
2018-10-03 18:18:55 +00:00
// StreamAggregatedResources implements
2021-02-26 22:23:15 +00:00
// envoy_discovery_v3.AggregatedDiscoveryServiceServer. This is the ADS endpoint which is
2018-10-03 18:18:55 +00:00
// the only xDS API we directly support for now.
2021-04-29 18:54:05 +00:00
//
// Deprecated: use DeltaAggregatedResources instead
2018-10-03 18:18:55 +00:00
func ( s * Server ) StreamAggregatedResources ( stream ADSStream ) error {
2021-04-29 18:54:05 +00:00
return errors . New ( "not implemented" )
}
2022-03-22 12:40:24 +00:00
// Register the XDS server handlers to the given gRPC server.
func ( s * Server ) Register ( srv * grpc . Server ) {
2021-02-26 22:23:15 +00:00
envoy_discovery_v3 . RegisterAggregatedDiscoveryServiceServer ( srv , s )
2018-10-03 18:18:55 +00:00
}
2021-04-29 18:54:05 +00:00
2022-08-11 09:19:36 +00:00
func ( s * Server ) authenticate ( ctx context . Context ) ( acl . Authorizer , error ) {
2022-09-28 16:56:59 +00:00
options , err := external . QueryOptionsFromContext ( ctx )
if err != nil {
return nil , status . Errorf ( codes . Internal , "error fetching options from context: %v" , err )
}
authz , err := s . ResolveToken ( options . Token )
2022-08-11 09:19:36 +00:00
if acl . IsErrNotFound ( err ) {
return nil , status . Errorf ( codes . Unauthenticated , "unauthenticated: %v" , err )
} else if acl . IsErrPermissionDenied ( err ) {
return nil , status . Error ( codes . PermissionDenied , err . Error ( ) )
} else if err != nil {
return nil , status . Errorf ( codes . Internal , "error resolving acl token: %v" , err )
}
return authz , nil
}
2021-08-13 15:53:19 +00:00
// authorize the xDS request using the token stored in ctx. This authorization is
// a bit different from most interfaces. Instead of explicitly authorizing or
// filtering each piece of data in the response, the request is authorized
// by checking the token has `service:write` for the service ID of the destination
// service (for kind=ConnectProxy), or the gateway service (for other kinds).
// This authorization strategy requires that agent/proxycfg only fetches data
// using a token with the same permissions, and that it stores the data by
// proxy ID. We assume that any data in the snapshot was already filtered,
// which allows this authorization to be a shallow authorization check
// for all the data in a ConfigSnapshot.
func ( s * Server ) authorize ( ctx context . Context , cfgSnap * proxycfg . ConfigSnapshot ) error {
2021-04-29 18:54:05 +00:00
if cfgSnap == nil {
return status . Errorf ( codes . Unauthenticated , "unauthenticated: no config snapshot" )
}
2022-08-11 09:19:36 +00:00
authz , err := s . authenticate ( ctx )
if err != nil {
return err
2021-04-29 18:54:05 +00:00
}
var authzContext acl . AuthorizerContext
switch cfgSnap . Kind {
case structs . ServiceKindConnectProxy :
cfgSnap . ProxyID . EnterpriseMeta . FillAuthzContext ( & authzContext )
2022-03-11 02:48:27 +00:00
if err := authz . ToAllowAuthorizer ( ) . ServiceWriteAllowed ( cfgSnap . Proxy . DestinationServiceName , & authzContext ) ; err != nil {
return status . Errorf ( codes . PermissionDenied , err . Error ( ) )
2021-04-29 18:54:05 +00:00
}
case structs . ServiceKindMeshGateway , structs . ServiceKindTerminatingGateway , structs . ServiceKindIngressGateway :
cfgSnap . ProxyID . EnterpriseMeta . FillAuthzContext ( & authzContext )
2022-03-11 02:48:27 +00:00
if err := authz . ToAllowAuthorizer ( ) . ServiceWriteAllowed ( cfgSnap . Service , & authzContext ) ; err != nil {
return status . Errorf ( codes . PermissionDenied , err . Error ( ) )
2021-04-29 18:54:05 +00:00
}
default :
return status . Errorf ( codes . Internal , "Invalid service kind" )
}
// Authed OK!
return nil
}