package xds import ( "context" "errors" "fmt" "log" "sync/atomic" "time" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2" envoyauthz "github.com/envoyproxy/go-control-plane/envoy/service/auth/v2alpha" envoydisco "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" "github.com/gogo/googleapis/google/rpc" "github.com/gogo/protobuf/proto" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" ) // ADSStream is a shorter way of referring to this thing... type ADSStream = envoydisco.AggregatedDiscoveryService_StreamAggregatedResourcesServer const ( // Resource types in xDS v2. These are copied from // envoyproxy/go-control-plane/pkg/cache/resource.go since we don't need any of // the rest of that package. typePrefix = "type.googleapis.com/envoy.api.v2." // EndpointType is the TypeURL for Endpoint discovery responses. EndpointType = typePrefix + "ClusterLoadAssignment" // ClusterType is the TypeURL for Cluster discovery responses. ClusterType = typePrefix + "Cluster" // RouteType is the TypeURL for Route discovery responses. RouteType = typePrefix + "RouteConfiguration" // ListenerType is the TypeURL for Listener discovery responses. ListenerType = typePrefix + "Listener" // PublicListenerName is the name we give the public listener in Envoy config. PublicListenerName = "public_listener" // LocalAppClusterName is the name we give the local application "cluster" in // Envoy config. LocalAppClusterName = "local_app" // LocalAgentClusterName is the name we give the local agent "cluster" in // Envoy config. LocalAgentClusterName = "local_agent" // DefaultAuthCheckFrequency is the default value for // Server.AuthCheckFrequency to use when the zero value is provided. DefaultAuthCheckFrequency = 5 * time.Minute ) // ACLResolverFunc is a shim to resolve ACLs. Since ACL enforcement is so far // entirely agent-local and all uses private methods this allows a simple shim // to be written in the agent package to allow resolving without tightly // coupling this to the agent. type ACLResolverFunc func(id string) (acl.Authorizer, error) // ConnectAuthz is the interface the agent needs to expose to be able to re-use // the authorization logic between both APIs. type ConnectAuthz interface { // ConnectAuthorize is implemented by Agent.ConnectAuthorize ConnectAuthorize(token string, req *structs.ConnectAuthorizeRequest) (authz bool, reason string, m *cache.ResultMeta, err error) } // ConfigManager is the interface xds.Server requires to consume proxy config // updates. It's satisfied normally by the agent's proxycfg.Manager, but allows // easier testing without several layers of mocked cache, local state and // proxycfg.Manager. type ConfigManager interface { Watch(proxyID string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc) } // Server represents a gRPC server that can handle both XDS and ext_authz // requests from Envoy. All of it's public members must be set before the gRPC // server is started. // // A full description of the XDS protocol can be found at // https://github.com/envoyproxy/data-plane-api/blob/master/XDS_PROTOCOL.md type Server struct { Logger *log.Logger CfgMgr ConfigManager Authz ConnectAuthz ResolveToken ACLResolverFunc // AuthCheckFrequency is how often we should re-check the credentials used // during a long-lived gRPC Stream after it has been initially established. // This is only used during idle periods of stream interactions (i.e. when // there has been no recent DiscoveryRequest). AuthCheckFrequency time.Duration } // Initialize will finish configuring the Server for first use. func (s *Server) Initialize() { if s.AuthCheckFrequency == 0 { s.AuthCheckFrequency = DefaultAuthCheckFrequency } } // StreamAggregatedResources implements // envoydisco.AggregatedDiscoveryServiceServer. This is the ADS endpoint which is // the only xDS API we directly support for now. func (s *Server) StreamAggregatedResources(stream ADSStream) error { // a channel for receiving incoming requests reqCh := make(chan *envoy.DiscoveryRequest) reqStop := int32(0) go func() { for { req, err := stream.Recv() if atomic.LoadInt32(&reqStop) != 0 { return } if err != nil { close(reqCh) return } reqCh <- req } }() err := s.process(stream, reqCh) if err != nil { s.Logger.Printf("[DEBUG] Error handling ADS stream: %s", err) } // prevents writing to a closed channel if send failed on blocked recv atomic.StoreInt32(&reqStop, 1) return err } const ( stateInit int = iota statePendingInitialConfig stateRunning ) func (s *Server) process(stream ADSStream, reqCh <-chan *envoy.DiscoveryRequest) error { // xDS requires a unique nonce to correlate response/request pairs var nonce uint64 // xDS works with versions of configs. Internally we don't have a consistent // version. We could just hash the config since versions don't have to be // ordered as far as I can tell, but it's cheaper just to increment a counter // every time we observe a new config since the upstream proxycfg package only // delivers updates when there are actual changes. var configVersion uint64 // Loop state var cfgSnap *proxycfg.ConfigSnapshot var req *envoy.DiscoveryRequest var ok bool var stateCh <-chan *proxycfg.ConfigSnapshot var watchCancel func() var proxyID string // need to run a small state machine to get through initial authentication. var state = stateInit // Configure handlers for each type of request handlers := map[string]*xDSType{ EndpointType: &xDSType{ typeURL: EndpointType, resources: endpointsFromSnapshot, stream: stream, }, ClusterType: &xDSType{ typeURL: ClusterType, resources: clustersFromSnapshot, stream: stream, }, RouteType: &xDSType{ typeURL: RouteType, resources: routesFromSnapshot, stream: stream, }, ListenerType: &xDSType{ typeURL: ListenerType, resources: listenersFromSnapshot, stream: stream, }, } var authTimer <-chan time.Time extendAuthTimer := func() { authTimer = time.After(s.AuthCheckFrequency) } checkStreamACLs := func(cfgSnap *proxycfg.ConfigSnapshot) error { if cfgSnap == nil { return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot") } token := tokenFromStream(stream) rule, err := s.ResolveToken(token) if acl.IsErrNotFound(err) { return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err) } else if acl.IsErrPermissionDenied(err) { return status.Errorf(codes.PermissionDenied, "permission denied: %v", err) } else if err != nil { return err } if rule != nil && !rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, nil) { return status.Errorf(codes.PermissionDenied, "permission denied") } // Authed OK! return nil } for { select { case <-authTimer: // It's been too long since a Discovery{Request,Response} so recheck ACLs. if err := checkStreamACLs(cfgSnap); err != nil { return err } extendAuthTimer() case req, ok = <-reqCh: if !ok { // reqCh is closed when stream.Recv errors which is how we detect client // going away. AFAICT the stream.Context() is only cancelled once the // RPC method returns which it can't until we return from this one so // there's no point in blocking on that. return nil } if req.TypeUrl == "" { return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") } if handler, ok := handlers[req.TypeUrl]; ok { handler.Recv(req) } case cfgSnap = <-stateCh: // We got a new config, update the version counter configVersion++ } // Trigger state machine switch state { case stateInit: if req == nil { // This can't happen (tm) since stateCh is nil until after the first req // is received but lets not panic about it. continue } // Start authentication process, we need the proxyID proxyID = req.Node.Id // Start watching config for that proxy stateCh, watchCancel = s.CfgMgr.Watch(proxyID) // Note that in this case we _intend_ the defer to only be triggered when // this whole process method ends (i.e. when streaming RPC aborts) not at // the end of the current loop iteration. We have to do it in the loop // here since we can't start watching until we get to this state in the // state machine. defer watchCancel() // Now wait for the config so we can check ACL state = statePendingInitialConfig case statePendingInitialConfig: if cfgSnap == nil { // Nothing we can do until we get the initial config continue } // Got config, try to authenticate next. state = stateRunning // Lets actually process the config we just got or we'll mis responding fallthrough case stateRunning: // Check ACLs on every Discovery{Request,Response}. if err := checkStreamACLs(cfgSnap); err != nil { return err } // For the first time through the state machine, this is when the // timer is first started. extendAuthTimer() // See if any handlers need to have the current (possibly new) config // sent. Note the order here is actually significant so we can't just // range the map which has no determined order. It's important because: // // 1. Envoy needs to see a consistent snapshot to avoid potentially // dropping traffic due to inconsistencies. This is the // main win of ADS after all - we get to control this order. // 2. Non-determinsic order of complex protobuf responses which are // compared for non-exact JSON equivalence makes the tests uber-messy // to handle for _, typeURL := range []string{ClusterType, EndpointType, RouteType, ListenerType} { handler := handlers[typeURL] if err := handler.SendIfNew(cfgSnap, configVersion, &nonce); err != nil { return err } } } } } type xDSType struct { typeURL string stream ADSStream req *envoy.DiscoveryRequest lastNonce string // lastVersion is the version that was last sent to the proxy. It is needed // because we don't want to send the same version more than once. // req.VersionInfo may be an older version than the most recent once sent in // two cases: 1) if the ACK wasn't received yet and `req` still points to the // previous request we already responded to and 2) if the proxy rejected the // last version we sent with a Nack then req.VersionInfo will be the older // version it's hanging on to. lastVersion uint64 resources func(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) } func (t *xDSType) Recv(req *envoy.DiscoveryRequest) { if t.lastNonce == "" || t.lastNonce == req.GetResponseNonce() { t.req = req } } func (t *xDSType) SendIfNew(cfgSnap *proxycfg.ConfigSnapshot, version uint64, nonce *uint64) error { if t.req == nil { return nil } if t.lastVersion >= version { // Already sent this version return nil } resources, err := t.resources(cfgSnap, tokenFromStream(t.stream)) if err != nil { return err } if resources == nil || len(resources) == 0 { // Nothing to send yet return nil } // Note we only increment nonce when we actually send - not important for // correctness but makes tests much simpler when we skip a type like Routes // with nothing to send. *nonce++ nonceStr := fmt.Sprintf("%08x", *nonce) versionStr := fmt.Sprintf("%08x", version) resp, err := createResponse(t.typeURL, versionStr, nonceStr, resources) if err != nil { return err } err = t.stream.Send(resp) if err != nil { return err } t.lastVersion = version t.lastNonce = nonceStr return nil } func tokenFromStream(stream ADSStream) string { return tokenFromContext(stream.Context()) } func tokenFromContext(ctx context.Context) string { md, ok := metadata.FromIncomingContext(ctx) if !ok { return "" } toks, ok := md["x-consul-token"] if ok && len(toks) > 0 { return toks[0] } return "" } // IncrementalAggregatedResources implements envoydisco.AggregatedDiscoveryServiceServer func (s *Server) IncrementalAggregatedResources(_ envoydisco.AggregatedDiscoveryService_IncrementalAggregatedResourcesServer) error { return errors.New("not implemented") } func deniedResponse(reason string) (*envoyauthz.CheckResponse, error) { return &envoyauthz.CheckResponse{ Status: &rpc.Status{ Code: int32(rpc.PERMISSION_DENIED), Message: "Denied: " + reason, }, }, nil } // Check implements envoyauthz.AuthorizationServer. func (s *Server) Check(ctx context.Context, r *envoyauthz.CheckRequest) (*envoyauthz.CheckResponse, error) { // Sanity checks if r.Attributes == nil || r.Attributes.Source == nil || r.Attributes.Destination == nil { return nil, status.Error(codes.InvalidArgument, "source and destination attributes are required") } if r.Attributes.Source.Principal == "" || r.Attributes.Destination.Principal == "" { return nil, status.Error(codes.InvalidArgument, "source and destination Principal are required") } // Parse destination to know the target service dest, err := connect.ParseCertURIFromString(r.Attributes.Destination.Principal) if err != nil { // Treat this as an auth error since Envoy has sent something it considers // valid, it's just not an identity we trust. return deniedResponse("Destination Principal is not a valid Connect identity") } destID, ok := dest.(*connect.SpiffeIDService) if !ok { return deniedResponse("Destination Principal is not a valid Service identity") } // For now we don't validate the trust domain of the _destination_ at all - // the HTTP Authorize endpoint just accepts a target _service_ and it's // implicit that the request is for the correct cluster. We might want to // reconsider this later but plumbing in additional machinery to check the // clusterID here is not really necessary for now unless Envoys are badly // configured. Our threat model _requires_ correctly configured and well // behaved proxies given that they have ACLs to fetch certs and so can do // whatever they want including not authorizing traffic at all or routing it // do a different service than they auth'd against. // Create an authz request req := &structs.ConnectAuthorizeRequest{ Target: destID.Service, ClientCertURI: r.Attributes.Source.Principal, // TODO(banks): need Envoy to support sending cert serial/hash to enforce // revocation later. } token := tokenFromContext(ctx) authed, reason, _, err := s.Authz.ConnectAuthorize(token, req) if err != nil { if err == acl.ErrPermissionDenied { return nil, status.Error(codes.PermissionDenied, err.Error()) } return nil, status.Error(codes.Internal, err.Error()) } if !authed { return deniedResponse(reason) } return &envoyauthz.CheckResponse{ Status: &rpc.Status{ Code: int32(rpc.OK), Message: "ALLOWED: " + reason, }, }, nil } // GRPCServer returns a server instance that can handle XDS and ext_authz // requests. func (s *Server) GRPCServer(certFile, keyFile string) (*grpc.Server, error) { opts := []grpc.ServerOption{ grpc.MaxConcurrentStreams(2048), } if certFile != "" && keyFile != "" { creds, err := credentials.NewServerTLSFromFile(certFile, keyFile) if err != nil { return nil, err } opts = append(opts, grpc.Creds(creds)) } srv := grpc.NewServer(opts...) envoydisco.RegisterAggregatedDiscoveryServiceServer(srv, s) envoyauthz.RegisterAuthorizationServer(srv, s) return srv, nil }