consul/vendor/github.com/envoyproxy/go-control-plane/pkg/server/server.go

378 lines
11 KiB
Go

// Copyright 2018 Envoyproxy Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package server provides an implementation of a streaming xDS server.
package server
import (
"context"
"errors"
"strconv"
"sync/atomic"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/envoyproxy/go-control-plane/envoy/api/v2"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
"github.com/envoyproxy/go-control-plane/pkg/cache"
)
// Server is a collection of handlers for streaming discovery requests.
type Server interface {
v2.EndpointDiscoveryServiceServer
v2.ClusterDiscoveryServiceServer
v2.RouteDiscoveryServiceServer
v2.ListenerDiscoveryServiceServer
discovery.AggregatedDiscoveryServiceServer
// Fetch is the universal fetch method.
Fetch(context.Context, *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error)
}
// Callbacks is a collection of callbacks inserted into the server operation.
// The callbacks are invoked synchronously.
type Callbacks interface {
// OnStreamOpen is called once an xDS stream is open with a stream ID and the type URL (or "" for ADS).
OnStreamOpen(int64, string)
// OnStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
OnStreamClosed(int64)
// OnStreamRequest is called once a request is received on a stream.
OnStreamRequest(int64, *v2.DiscoveryRequest)
// OnStreamResponse is called immediately prior to sending a response on a stream.
OnStreamResponse(int64, *v2.DiscoveryRequest, *v2.DiscoveryResponse)
// OnFetchRequest is called for each Fetch request
OnFetchRequest(*v2.DiscoveryRequest)
// OnFetchResponse is called immediately prior to sending a response.
OnFetchResponse(*v2.DiscoveryRequest, *v2.DiscoveryResponse)
}
// NewServer creates handlers from a config watcher and an optional logger.
func NewServer(config cache.Cache, callbacks Callbacks) Server {
return &server{cache: config, callbacks: callbacks}
}
type server struct {
cache cache.Cache
callbacks Callbacks
// streamCount for counting bi-di streams
streamCount int64
}
type stream interface {
Send(*v2.DiscoveryResponse) error
Recv() (*v2.DiscoveryRequest, error)
}
// watches for all xDS resource types
type watches struct {
endpoints chan cache.Response
clusters chan cache.Response
routes chan cache.Response
listeners chan cache.Response
endpointCancel func()
clusterCancel func()
routeCancel func()
listenerCancel func()
endpointNonce string
clusterNonce string
routeNonce string
listenerNonce string
}
// Cancel all watches
func (values watches) Cancel() {
if values.endpointCancel != nil {
values.endpointCancel()
}
if values.clusterCancel != nil {
values.clusterCancel()
}
if values.routeCancel != nil {
values.routeCancel()
}
if values.listenerCancel != nil {
values.listenerCancel()
}
}
func createResponse(resp *cache.Response, typeURL string) (*v2.DiscoveryResponse, error) {
if resp == nil {
return nil, errors.New("missing response")
}
resources := make([]types.Any, len(resp.Resources))
for i := 0; i < len(resp.Resources); i++ {
data, err := proto.Marshal(resp.Resources[i])
if err != nil {
return nil, err
}
resources[i] = types.Any{
TypeUrl: typeURL,
Value: data,
}
}
out := &v2.DiscoveryResponse{
VersionInfo: resp.Version,
Resources: resources,
TypeUrl: typeURL,
}
return out, nil
}
// process handles a bi-di stream request
func (s *server) process(stream stream, reqCh <-chan *v2.DiscoveryRequest, defaultTypeURL string) error {
// increment stream count
streamID := atomic.AddInt64(&s.streamCount, 1)
// unique nonce generator for req-resp pairs per xDS stream; the server
// ignores stale nonces. nonce is only modified within send() function.
var streamNonce int64
// a collection of watches per request type
var values watches
defer func() {
values.Cancel()
if s.callbacks != nil {
s.callbacks.OnStreamClosed(streamID)
}
}()
// sends a response by serializing to protobuf Any
send := func(resp cache.Response, typeURL string) (string, error) {
out, err := createResponse(&resp, typeURL)
if err != nil {
return "", err
}
// increment nonce
streamNonce = streamNonce + 1
out.Nonce = strconv.FormatInt(streamNonce, 10)
if s.callbacks != nil {
s.callbacks.OnStreamResponse(streamID, &resp.Request, out)
}
return out.Nonce, stream.Send(out)
}
if s.callbacks != nil {
s.callbacks.OnStreamOpen(streamID, defaultTypeURL)
}
for {
select {
// config watcher can send the requested resources types in any order
case resp, more := <-values.endpoints:
if !more {
return status.Errorf(codes.Unavailable, "endpoints watch failed")
}
nonce, err := send(resp, cache.EndpointType)
if err != nil {
return err
}
values.endpointNonce = nonce
case resp, more := <-values.clusters:
if !more {
return status.Errorf(codes.Unavailable, "clusters watch failed")
}
nonce, err := send(resp, cache.ClusterType)
if err != nil {
return err
}
values.clusterNonce = nonce
case resp, more := <-values.routes:
if !more {
return status.Errorf(codes.Unavailable, "routes watch failed")
}
nonce, err := send(resp, cache.RouteType)
if err != nil {
return err
}
values.routeNonce = nonce
case resp, more := <-values.listeners:
if !more {
return status.Errorf(codes.Unavailable, "listeners watch failed")
}
nonce, err := send(resp, cache.ListenerType)
if err != nil {
return err
}
values.listenerNonce = nonce
case req, more := <-reqCh:
// input stream ended or errored out
if !more {
return nil
}
if req == nil {
return status.Errorf(codes.Unavailable, "empty request")
}
// nonces can be reused across streams; we verify nonce only if nonce is not initialized
nonce := req.GetResponseNonce()
// type URL is required for ADS but is implicit for xDS
if defaultTypeURL == cache.AnyType {
if req.TypeUrl == "" {
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
}
} else if req.TypeUrl == "" {
req.TypeUrl = defaultTypeURL
}
if s.callbacks != nil {
s.callbacks.OnStreamRequest(streamID, req)
}
// cancel existing watches to (re-)request a newer version
switch {
case req.TypeUrl == cache.EndpointType && (values.endpointNonce == "" || values.endpointNonce == nonce):
if values.endpointCancel != nil {
values.endpointCancel()
}
values.endpoints, values.endpointCancel = s.cache.CreateWatch(*req)
case req.TypeUrl == cache.ClusterType && (values.clusterNonce == "" || values.clusterNonce == nonce):
if values.clusterCancel != nil {
values.clusterCancel()
}
values.clusters, values.clusterCancel = s.cache.CreateWatch(*req)
case req.TypeUrl == cache.RouteType && (values.routeNonce == "" || values.routeNonce == nonce):
if values.routeCancel != nil {
values.routeCancel()
}
values.routes, values.routeCancel = s.cache.CreateWatch(*req)
case req.TypeUrl == cache.ListenerType && (values.listenerNonce == "" || values.listenerNonce == nonce):
if values.listenerCancel != nil {
values.listenerCancel()
}
values.listeners, values.listenerCancel = s.cache.CreateWatch(*req)
}
}
}
}
// handler converts a blocking read call to channels and initiates stream processing
func (s *server) handler(stream stream, typeURL string) error {
// a channel for receiving incoming requests
reqCh := make(chan *v2.DiscoveryRequest)
reqStop := int32(0)
go func() {
for {
req, err := stream.Recv()
if atomic.LoadInt32(&reqStop) != 0 {
return
}
if err != nil {
close(reqCh)
return
}
reqCh <- req
}
}()
err := s.process(stream, reqCh, typeURL)
// prevents writing to a closed channel if send failed on blocked recv
// TODO(kuat) figure out how to unblock recv through gRPC API
atomic.StoreInt32(&reqStop, 1)
return err
}
func (s *server) StreamAggregatedResources(stream discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
return s.handler(stream, cache.AnyType)
}
func (s *server) StreamEndpoints(stream v2.EndpointDiscoveryService_StreamEndpointsServer) error {
return s.handler(stream, cache.EndpointType)
}
func (s *server) StreamClusters(stream v2.ClusterDiscoveryService_StreamClustersServer) error {
return s.handler(stream, cache.ClusterType)
}
func (s *server) StreamRoutes(stream v2.RouteDiscoveryService_StreamRoutesServer) error {
return s.handler(stream, cache.RouteType)
}
func (s *server) StreamListeners(stream v2.ListenerDiscoveryService_StreamListenersServer) error {
return s.handler(stream, cache.ListenerType)
}
// Fetch is the universal fetch method.
func (s *server) Fetch(ctx context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) {
if s.callbacks != nil {
s.callbacks.OnFetchRequest(req)
}
resp, err := s.cache.Fetch(ctx, *req)
if err != nil {
return nil, err
}
out, err := createResponse(resp, req.TypeUrl)
if s.callbacks != nil {
s.callbacks.OnFetchResponse(req, out)
}
return out, err
}
func (s *server) FetchEndpoints(ctx context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) {
if req == nil {
return nil, status.Errorf(codes.Unavailable, "empty request")
}
req.TypeUrl = cache.EndpointType
return s.Fetch(ctx, req)
}
func (s *server) FetchClusters(ctx context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) {
if req == nil {
return nil, status.Errorf(codes.Unavailable, "empty request")
}
req.TypeUrl = cache.ClusterType
return s.Fetch(ctx, req)
}
func (s *server) FetchRoutes(ctx context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) {
if req == nil {
return nil, status.Errorf(codes.Unavailable, "empty request")
}
req.TypeUrl = cache.RouteType
return s.Fetch(ctx, req)
}
func (s *server) FetchListeners(ctx context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) {
if req == nil {
return nil, status.Errorf(codes.Unavailable, "empty request")
}
req.TypeUrl = cache.ListenerType
return s.Fetch(ctx, req)
}
func (s *server) IncrementalAggregatedResources(_ discovery.AggregatedDiscoveryService_IncrementalAggregatedResourcesServer) error {
return errors.New("not implemented")
}
func (s *server) IncrementalClusters(_ v2.ClusterDiscoveryService_IncrementalClustersServer) error {
return errors.New("not implemented")
}
func (s *server) IncrementalRoutes(_ v2.RouteDiscoveryService_IncrementalRoutesServer) error {
return errors.New("not implemented")
}