consul/agent/xds/testing.go
R.B. Boyer d2c0945f52
xds: fix for delta xDS reconnect bug in LDS/CDS (#12174)
When a wildcard xDS type (LDS/CDS/SRDS) reconnects from a delta xDS stream,
prior to envoy `1.19.0` it would populate the `ResourceNamesSubscribe` field
with the full list of currently subscribed items, instead of simply omitting it
to infer that it wanted everything (which is what wildcard mode means).

This upstream issue was filed in envoyproxy/envoy#16063 and fixed in
envoyproxy/envoy#16153 which went out in Envoy `1.19.0` and is fixed in later
versions (later refactored in envoyproxy/envoy#16855).

This PR conditionally forces LDS/CDS to be wildcard-only even when the
connected Envoy requests a non-wildcard subscription, but only does so on
versions prior to `1.19.0`, as we should not need to do this on later versions.

This fixes the failure case as described here: #11833 (comment)

Co-authored-by: Huan Wang <fredwanghuan@gmail.com>
2022-01-25 11:24:27 -06:00

274 lines
6.1 KiB
Go

package xds
import (
"context"
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"github.com/mitchellh/go-testing-interface"
status "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"github.com/hashicorp/consul/agent/xds/proxysupport"
)
// TestADSDeltaStream is a channel-backed stand-in for
// discovery.AggregatedDiscoveryService_DeltaAggregatedResourcesServer so the
// ADS handler can be exercised in tests without a real gRPC stream.
type TestADSDeltaStream struct {
	stubGrpcServerStream

	// sendCh receives every response handed to Send.
	sendCh chan *envoy_discovery_v3.DeltaDiscoveryResponse

	// recvCh supplies the requests handed out by Recv.
	recvCh chan *envoy_discovery_v3.DeltaDiscoveryRequest

	// mu guards sendErr.
	mu sync.Mutex

	// sendErr, when non-nil, is returned by Send instead of delivering the
	// response to sendCh.
	sendErr error
}

// Compile-time check that the mock satisfies ADSDeltaStream.
var _ ADSDeltaStream = (*TestADSDeltaStream)(nil)
// NewTestADSDeltaStream builds a TestADSDeltaStream whose Context method
// reports ctx. Both the send and receive channels are created with a buffer
// of one element.
func NewTestADSDeltaStream(t testing.T, ctx context.Context) *TestADSDeltaStream {
	return &TestADSDeltaStream{
		stubGrpcServerStream: stubGrpcServerStream{ctx: ctx},
		sendCh:               make(chan *envoy_discovery_v3.DeltaDiscoveryResponse, 1),
		recvCh:               make(chan *envoy_discovery_v3.DeltaDiscoveryRequest, 1),
	}
}
// Send implements ADSDeltaStream.
//
// If an error was installed with SetSendErr it is returned and the response is
// dropped; otherwise the response is pushed onto sendCh, blocking until there
// is room.
func (s *TestADSDeltaStream) Send(r *envoy_discovery_v3.DeltaDiscoveryResponse) error {
	// Snapshot the injected error under the lock, but release the lock before
	// the (potentially blocking) channel send.
	injected := func() error {
		s.mu.Lock()
		defer s.mu.Unlock()
		return s.sendErr
	}()
	if injected != nil {
		return injected
	}

	s.sendCh <- r
	return nil
}
// SetSendErr installs the error that Send will return from now on. Passing nil
// restores normal delivery to sendCh.
func (s *TestADSDeltaStream) SetSendErr(err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.sendErr = err
}
// Recv implements ADSDeltaStream.
//
// It blocks until a value arrives on recvCh. A nil value — which is also what
// a closed channel yields — is reported as io.EOF.
func (s *TestADSDeltaStream) Recv() (*envoy_discovery_v3.DeltaDiscoveryRequest, error) {
	if req := <-s.recvCh; req != nil {
		return req, nil
	}
	return nil, io.EOF
}
// TestEnvoy is a helper to simulate Envoy ADS requests.
type TestEnvoy struct {
	// mu is held by sendDeltaReq and Close.
	mu sync.Mutex

	// ctx is the stream context; cancel is invoked by Close.
	ctx    context.Context
	cancel func()

	proxyID, token string

	// EnvoyVersion is the "major.minor.patch" version advertised in the node
	// of each request. When empty, sendDeltaReq falls back to
	// proxysupport.EnvoyVersions[0].
	EnvoyVersion string

	deltaStream *TestADSDeltaStream // Incremental v3
}
// NewTestEnvoy creates a TestEnvoy instance for the given proxy ID, backed by
// a fresh cancellable context and a new TestADSDeltaStream.
func NewTestEnvoy(t testing.T, proxyID, token string) *TestEnvoy {
	ctx, cancel := context.WithCancel(context.Background())

	// A non-empty token is attached to the context as incoming metadata, the
	// same way gRPC attaches metadata to call and stream contexts.
	if len(token) > 0 {
		md := metadata.Pairs("x-consul-token", token)
		ctx = metadata.NewIncomingContext(ctx, md)
	}

	envoy := &TestEnvoy{
		ctx:     ctx,
		cancel:  cancel,
		proxyID: proxyID,
		token:   token,
	}
	envoy.deltaStream = NewTestADSDeltaStream(t, ctx)
	return envoy
}
// hexString renders v as lowercase hexadecimal, zero-padded to at least eight
// digits. Zero is special-cased to the empty string.
func hexString(v uint64) string {
	switch v {
	case 0:
		return ""
	default:
		return fmt.Sprintf("%08x", v)
	}
}
// stringToEnvoyVersion parses a "major.minor.patch" string into an Envoy
// SemanticVersion.
//
// It returns (nil, false) unless vs consists of exactly three dot-separated
// components, each of which is a base-10 unsigned integer that fits in 32
// bits. Components are parsed with ParseUint rather than Atoi so that negative
// or oversized values are rejected instead of silently wrapping when stored in
// the uint32 version fields.
func stringToEnvoyVersion(vs string) (*envoy_type_v3.SemanticVersion, bool) {
	parts := strings.Split(vs, ".")
	if len(parts) != 3 {
		return nil, false
	}

	var nums [3]uint32
	for i, part := range parts {
		n, err := strconv.ParseUint(part, 10, 32)
		if err != nil {
			return nil, false
		}
		nums[i] = uint32(n)
	}

	return &envoy_type_v3.SemanticVersion{
		MajorNumber: nums[0],
		MinorNumber: nums[1],
		Patch:       nums[2],
	}, true
}
// SetSendErr forwards err to the underlying delta stream's SetSendErr, so that
// the stream's Send returns it.
func (e *TestEnvoy) SetSendErr(err error) {
	stream := e.deltaStream
	stream.SetSendErr(err)
}
// SendDeltaReq sends a delta request of the given type from the simulated
// Envoy, without a response nonce. req may be nil.
//
// NOTE: the input request is mutated before sending by injecting the node.
func (e *TestEnvoy) SendDeltaReq(t testing.T, typeURL string, req *envoy_discovery_v3.DeltaDiscoveryRequest) {
	var noNonce *uint64
	e.sendDeltaReq(t, typeURL, noNonce, req)
}
// SendDeltaReqACK sends an otherwise empty delta request of the given type
// that carries nonce as its response nonce.
func (e *TestEnvoy) SendDeltaReqACK(t testing.T, typeURL string, nonce uint64) {
	ackNonce := &nonce
	e.sendDeltaReq(t, typeURL, ackNonce, nil)
}
// SendDeltaReqNACK sends a delta request of the given type that carries nonce
// as its response nonce and errorDetail as its ErrorDetail.
func (e *TestEnvoy) SendDeltaReqNACK(t testing.T, typeURL string, nonce uint64, errorDetail *status.Status) {
	nack := &envoy_discovery_v3.DeltaDiscoveryRequest{ErrorDetail: errorDetail}
	e.sendDeltaReq(t, typeURL, &nonce, nack)
}
// sendDeltaReq fills in the common request fields and pushes the request onto
// the delta stream's receive channel, from which Recv hands it out.
//
// A nil req is replaced by an empty request. When nonce is non-nil the
// request's ResponseNonce is set to hexString(*nonce) (which is empty for a
// zero nonce). TypeUrl and Node are always overwritten; the node advertises
// e.EnvoyVersion, or proxysupport.EnvoyVersions[0] when that field is empty.
//
// The test is failed when the TestEnvoy has already been closed, when the
// version string cannot be parsed, or when the channel send does not complete
// within 50ms.
func (e *TestEnvoy) sendDeltaReq(
	t testing.T,
	typeURL string,
	nonce *uint64,
	req *envoy_discovery_v3.DeltaDiscoveryRequest, // optional
) {
	e.mu.Lock()
	defer e.mu.Unlock()

	// Close sets deltaStream to nil; report that clearly rather than
	// dereferencing nil below.
	if e.deltaStream == nil {
		t.Fatalf("cannot send delta request: test envoy is closed")
		return
	}

	stringVersion := e.EnvoyVersion
	if stringVersion == "" {
		stringVersion = proxysupport.EnvoyVersions[0]
	}

	ev, valid := stringToEnvoyVersion(stringVersion)
	if !valid {
		// Fatalf (not Fatal) so that the %s verb is actually expanded.
		t.Fatalf("envoy version is not valid: %s", stringVersion)
	}

	if req == nil {
		req = &envoy_discovery_v3.DeltaDiscoveryRequest{}
	}
	if nonce != nil {
		req.ResponseNonce = hexString(*nonce)
	}
	req.TypeUrl = typeURL

	req.Node = &envoy_core_v3.Node{
		Id:            e.proxyID,
		Cluster:       e.proxyID,
		UserAgentName: "envoy",
		UserAgentVersionType: &envoy_core_v3.Node_UserAgentBuildVersion{
			UserAgentBuildVersion: &envoy_core_v3.BuildVersion{
				Version: ev,
			},
		},
	}

	// Bound the send so a full channel fails the test instead of hanging it.
	select {
	case e.deltaStream.recvCh <- req:
	case <-time.After(50 * time.Millisecond):
		t.Fatalf("send to delta stream blocked for too long")
	}
}
// Close closes the client and cancels its request context. It always returns
// nil.
func (e *TestEnvoy) Close() error {
	e.mu.Lock()
	defer e.mu.Unlock()

	// Closing the receive channel unblocks Recv, simulating the receive error
	// seen when a client disconnects. Dropping the stream reference afterwards
	// keeps a repeated Close from closing the channel twice.
	if stream := e.deltaStream; stream != nil && stream.recvCh != nil {
		close(stream.recvCh)
		e.deltaStream = nil
	}

	if cancel := e.cancel; cancel != nil {
		cancel()
	}
	return nil
}
// stubGrpcServerStream is a minimal grpc.ServerStream for tests. The methods
// declared on it are no-ops, apart from Context, which returns ctx.
type stubGrpcServerStream struct {
	// ctx is the value reported by Context.
	ctx context.Context

	grpc.ServerStream
}

// Compile-time check that the stub satisfies grpc.ServerStream.
var _ grpc.ServerStream = (*stubGrpcServerStream)(nil)
// SetHeader implements grpc.ServerStream as part of ADSDeltaStream. The
// metadata is ignored and nil is always returned.
func (*stubGrpcServerStream) SetHeader(_ metadata.MD) error { return nil }
// SendHeader implements grpc.ServerStream as part of ADSDeltaStream. The
// metadata is ignored and nil is always returned.
func (*stubGrpcServerStream) SendHeader(_ metadata.MD) error { return nil }
// SetTrailer implements grpc.ServerStream as part of ADSDeltaStream. It does
// nothing.
func (*stubGrpcServerStream) SetTrailer(_ metadata.MD) {}
// Context implements grpc.ServerStream as part of ADSDeltaStream by returning
// the context stored on the stub.
func (stub *stubGrpcServerStream) Context() context.Context { return stub.ctx }
// SendMsg implements grpc.ServerStream as part of ADSDeltaStream. The message
// is discarded and nil is always returned.
func (*stubGrpcServerStream) SendMsg(m interface{}) error { return nil }
// RecvMsg implements grpc.ServerStream as part of ADSDeltaStream. Nothing is
// written into m and nil is always returned.
func (*stubGrpcServerStream) RecvMsg(m interface{}) error { return nil }