xds: adding more delta protocol tests (#10398)

Fixes #10125
This commit is contained in:
R.B. Boyer 2021-06-15 15:21:07 -05:00 committed by GitHub
parent 3ee66b2e9a
commit 80c39f1083
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 283 additions and 138 deletions

View File

@ -1,6 +1,7 @@
package xds
import (
"errors"
"sync/atomic"
"testing"
"time"
@ -9,6 +10,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/require"
rpcstatus "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -17,16 +19,6 @@ import (
"github.com/hashicorp/consul/agent/structs"
)
/* TODO: Test scenarios
- initial resource versions
- removing resources
- nack
- unsubscribe
- error during handling causing retry
*/
// NOTE: For these tests, prefer not using xDS protobuf "factory" methods if
// possible to avoid using them to test themselves.
//
@ -45,132 +37,228 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)
snap := newTestSnapshot(t, nil, "")
var snap *proxycfg.ConfigSnapshot
// Send initial cluster discover. We'll assume we are testing a partial
// reconnect and include some initial resource versions that will be
// cleaned up.
envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
InitialResourceVersions: mustMakeVersionMap(t,
makeTestCluster(t, snap, "tcp:geo-cache"),
),
runStep(t, "initial setup", func(t *testing.T) {
snap = newTestSnapshot(t, nil, "")
// Send initial cluster discover. We'll assume we are testing a partial
// reconnect and include some initial resource versions that will be
// cleaned up.
envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
InitialResourceVersions: mustMakeVersionMap(t,
makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
requireProtocolVersionGauge(t, scenario, "v3", 1)
// Deliver a new snapshot (tcp with one tcp upstream)
mgr.DeliverConfig(t, sid, snap)
})
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
runStep(t, "first sync", func(t *testing.T) {
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "tcp:db"),
// SAME_AS_INITIAL_VERSION: makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
requireProtocolVersionGauge(t, scenario, "v3", 1)
// Envoy then tries to discover endpoints for those clusters.
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
// We'll assume we are testing a partial "reconnect"
InitialResourceVersions: mustMakeVersionMap(t,
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
// "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
//
// Include "fake-endpoints" here to test subscribing to an unknown
// thing and have consul tell us there's no data for it.
"fake-endpoints",
},
})
// Deliver a new snapshot (tcp with one tcp upstream)
mgr.DeliverConfig(t, sid, snap)
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, ClusterType, 1)
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "tcp:db"),
// SAME_AS_INITIAL_VERSION: makeTestCluster(t, snap, "tcp:geo-cache"),
),
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
// SAME_AS_INITIAL_VERSION: makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
envoy.SendDeltaReq(t, ListenerType, nil)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, EndpointType, 2)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
Nonce: hexString(3),
Resources: makeTestResources(t,
makeTestListener(t, snap, "tcp:public_listener"),
makeTestListener(t, snap, "tcp:db"),
makeTestListener(t, snap, "tcp:geo-cache"),
),
})
// cleanup unused resources now that we've created/updated relevant things
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(4),
RemovedResources: []string{
"fake-endpoints", // correcting the errant subscription
},
})
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener
envoy.SendDeltaReqACK(t, ListenerType, 3)
// ACK the endpoint removal
envoy.SendDeltaReqACK(t, EndpointType, 4)
// If we re-subscribe to something even if there are no changes we get a
// fresh copy.
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
},
})
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(5),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})
envoy.SendDeltaReqACK(t, EndpointType, 5)
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
// Envoy then tries to discover endpoints for those clusters.
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
// We'll assume we are testing a partial "reconnect"
InitialResourceVersions: mustMakeVersionMap(t,
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
// "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
//
// Include "fake-endpoints" here to test subscribing to an unknown
// thing and have consul tell us there's no data for it.
"fake-endpoints",
},
deleteAllButOneEndpoint := func(snap *proxycfg.ConfigSnapshot, svc, targetID string) {
snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[svc][targetID] =
snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[svc][targetID][0:1]
}
runStep(t, "avoid sending config for unsubscribed resource", func(t *testing.T) {
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesUnsubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
},
})
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for DB
snap = newTestSnapshot(t, snap, "")
deleteAllButOneEndpoint(snap, "db", "db.default.dc1")
mgr.DeliverConfig(t, sid, snap)
// We never send an EDS reply about this change.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, ClusterType, 1, true, nil)
runStep(t, "restore endpoint subscription", func(t *testing.T) {
// Fix the snapshot
snap = newTestSnapshot(t, snap, "")
mgr.DeliverConfig(t, sid, snap)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
// SAME_AS_INITIAL_VERSION: makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
// and fix the subscription
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
},
})
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(6),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
),
})
envoy.SendDeltaReqACK(t, EndpointType, 6)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
runStep(t, "simulate envoy NACKing an endpoint update", func(t *testing.T) {
// Trigger only an EDS update.
snap = newTestSnapshot(t, snap, "")
deleteAllButOneEndpoint(snap, "db", "db.default.dc1")
mgr.DeliverConfig(t, sid, snap)
// Envoy now sends listener request
envoy.SendDeltaReq(t, ListenerType, nil)
// Send envoy an EDS update.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(7),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db[0]"),
),
})
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, EndpointType, 2, true, nil)
envoy.SendDeltaReqNACK(t, EndpointType, 7, &rpcstatus.Status{})
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
Nonce: hexString(3),
Resources: makeTestResources(t,
makeTestListener(t, snap, "tcp:public_listener"),
makeTestListener(t, snap, "tcp:db"),
makeTestListener(t, snap, "tcp:geo-cache"),
),
// Send it again.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(8),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db[0]"),
),
})
envoy.SendDeltaReqACK(t, EndpointType, 8)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
// cleanup unused resources now that we've created/updated relevant things
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(4),
RemovedResources: []string{
"fake-endpoints", // correcting the errant subscription
},
// NOTE: this has to be the last subtest since it kills the stream
runStep(t, "simulate an envoy error sending an update to envoy", func(t *testing.T) {
// Force sends to fail
envoy.SetSendErr(errors.New("test error"))
// Trigger only an EDS update (flipping BACK to 2 endpoints in the LBassignment)
snap = newTestSnapshot(t, snap, "")
mgr.DeliverConfig(t, sid, snap)
})
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener
envoy.SendDeltaReqACK(t, ListenerType, 3, true, nil)
// ACK the endpoint removal
envoy.SendDeltaReqACK(t, EndpointType, 4, true, nil)
// If we re-subscribe to something even if there are no changes we get a
// fresh copy.
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
},
})
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
Nonce: hexString(5),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})
envoy.SendDeltaReqACK(t, EndpointType, 5, true, nil)
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// TODO(rb): test NACK
envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
// Envoy died.
expect := status.Errorf(codes.Unavailable,
"failed to send upsert reply for type %q: test error",
EndpointType)
require.EqualError(t, err, expect.Error())
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
@ -223,7 +311,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, ClusterType, 1, true, nil)
envoy.SendDeltaReqACK(t, ClusterType, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
@ -245,7 +333,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
envoy.SendDeltaReq(t, ListenerType, nil)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, EndpointType, 2, true, nil)
envoy.SendDeltaReqACK(t, EndpointType, 2)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
@ -262,7 +350,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener
envoy.SendDeltaReqACK(t, ListenerType, 3, true, nil)
envoy.SendDeltaReqACK(t, ListenerType, 3)
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
@ -302,7 +390,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
})
// ACKs the listener
envoy.SendDeltaReqACK(t, ListenerType, 4, true, nil)
envoy.SendDeltaReqACK(t, ListenerType, 4)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
@ -313,7 +401,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
),
})
envoy.SendDeltaReqACK(t, RouteType, 5, true, nil)
envoy.SendDeltaReqACK(t, RouteType, 5)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
@ -374,7 +462,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, ClusterType, 1, true, nil)
envoy.SendDeltaReqACK(t, ClusterType, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
@ -396,7 +484,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
envoy.SendDeltaReq(t, ListenerType, nil)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, EndpointType, 2, true, nil)
envoy.SendDeltaReqACK(t, EndpointType, 2)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
@ -413,7 +501,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener
envoy.SendDeltaReqACK(t, ListenerType, 3, true, nil)
envoy.SendDeltaReqACK(t, ListenerType, 3)
})
runStep(t, "trigger cluster update needing implicit endpoint replacements", func(t *testing.T) {
@ -436,7 +524,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
),
})
envoy.SendDeltaReqACK(t, ClusterType, 4, true, nil)
envoy.SendDeltaReqACK(t, ClusterType, 4)
// And we re-send the endpoints for the updated cluster after getting the
// ACK for the cluster.
@ -448,7 +536,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
// SAME makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})
envoy.SendDeltaReqACK(t, EndpointType, 5, true, nil)
envoy.SendDeltaReqACK(t, EndpointType, 5)
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
@ -516,7 +604,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, ClusterType, 1, true, nil)
envoy.SendDeltaReqACK(t, ClusterType, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
@ -538,7 +626,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
envoy.SendDeltaReq(t, ListenerType, nil)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, EndpointType, 2, true, nil)
envoy.SendDeltaReqACK(t, EndpointType, 2)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
@ -562,7 +650,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
})
// ACKs the listener
envoy.SendDeltaReqACK(t, ListenerType, 3, true, nil)
envoy.SendDeltaReqACK(t, ListenerType, 3)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
@ -573,7 +661,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
),
})
envoy.SendDeltaReqACK(t, RouteType, 4, true, nil)
envoy.SendDeltaReqACK(t, RouteType, 4)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
@ -611,10 +699,10 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
),
})
envoy.SendDeltaReqACK(t, ClusterType, 5, true, nil)
envoy.SendDeltaReqACK(t, ClusterType, 5)
// ACKs the listener
envoy.SendDeltaReqACK(t, ListenerType, 6, true, nil)
envoy.SendDeltaReqACK(t, ListenerType, 6)
// The behaviors of Cluster updates triggering re-sends of Endpoint updates
// tested in TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints
@ -629,7 +717,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
),
})
envoy.SendDeltaReqACK(t, EndpointType, 7, true, nil)
envoy.SendDeltaReqACK(t, EndpointType, 7)
// THE ACTUAL THING WE CARE ABOUT: replaced route config
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
@ -640,7 +728,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
),
})
envoy.SendDeltaReqACK(t, RouteType, 8, true, nil)
envoy.SendDeltaReqACK(t, RouteType, 8)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
@ -1010,7 +1098,7 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) {
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACK: clusters
envoy.SendDeltaReqACK(t, ClusterType, 1, true, nil)
envoy.SendDeltaReqACK(t, ClusterType, 1)
// REQ: listeners
envoy.SendDeltaReq(t, ListenerType, nil)

View File

@ -30,6 +30,9 @@ type TestADSDeltaStream struct {
stubGrpcServerStream
sendCh chan *envoy_discovery_v3.DeltaDiscoveryResponse
recvCh chan *envoy_discovery_v3.DeltaDiscoveryRequest
mu sync.Mutex
sendErr error
}
var _ ADSDeltaStream = (*TestADSDeltaStream)(nil)
@ -45,10 +48,24 @@ func NewTestADSDeltaStream(t testing.T, ctx context.Context) *TestADSDeltaStream
// Send implements ADSDeltaStream
func (s *TestADSDeltaStream) Send(r *envoy_discovery_v3.DeltaDiscoveryResponse) error {
s.mu.Lock()
err := s.sendErr
s.mu.Unlock()
if err != nil {
return err
}
s.sendCh <- r
return nil
}
func (s *TestADSDeltaStream) SetSendErr(err error) {
s.mu.Lock()
s.sendErr = err
s.mu.Unlock()
}
// Recv implements ADSDeltaStream
func (s *TestADSDeltaStream) Recv() (*envoy_discovery_v3.DeltaDiscoveryRequest, error) {
r := <-s.recvCh
@ -65,6 +82,9 @@ type TestADSStream struct {
stubGrpcServerStream
sendCh chan *envoy_api_v2.DiscoveryResponse
recvCh chan *envoy_api_v2.DiscoveryRequest
mu sync.Mutex
sendErr error
}
// NewTestADSStream makes a new TestADSStream
@ -79,10 +99,24 @@ func NewTestADSStream(t testing.T, ctx context.Context) *TestADSStream {
// Send implements ADSStream
func (s *TestADSStream) Send(r *envoy_api_v2.DiscoveryResponse) error {
s.mu.Lock()
err := s.sendErr
s.mu.Unlock()
if err != nil {
return err
}
s.sendCh <- r
return nil
}
func (s *TestADSStream) SetSendErr(err error) {
s.mu.Lock()
s.sendErr = err
s.mu.Unlock()
}
// Recv implements ADSStream
func (s *TestADSStream) Recv() (*envoy_api_v2.DiscoveryRequest, error) {
r := <-s.recvCh
@ -197,6 +231,11 @@ func (e *TestEnvoy) SendReq(t testing.T, typeURL string, version, nonce uint64)
}
}
func (e *TestEnvoy) SetSendErr(err error) {
e.stream.SetSendErr(err)
e.deltaStream.SetSendErr(err)
}
// SendDeltaReq sends a delta request from the test server.
//
// NOTE: the input request is mutated before sending by injecting the node.
@ -207,19 +246,26 @@ func (e *TestEnvoy) SendDeltaReq(
) {
e.sendDeltaReq(t, typeURL, nil, req)
}
func (e *TestEnvoy) SendDeltaReqACK(
t testing.T,
typeURL string,
nonce uint64,
ack bool,
) {
e.sendDeltaReq(t, typeURL, &nonce, nil)
}
func (e *TestEnvoy) SendDeltaReqNACK(
t testing.T,
typeURL string,
nonce uint64,
errorDetail *status.Status,
) {
req := &envoy_discovery_v3.DeltaDiscoveryRequest{}
if !ack {
req.ErrorDetail = errorDetail
}
e.sendDeltaReq(t, typeURL, &nonce, req)
e.sendDeltaReq(t, typeURL, &nonce, &envoy_discovery_v3.DeltaDiscoveryRequest{
ErrorDetail: errorDetail,
})
}
func (e *TestEnvoy) sendDeltaReq(
t testing.T,
typeURL string,

View File

@ -481,6 +481,17 @@ func makeTestEndpoints(t *testing.T, _ *proxycfg.ConfigSnapshot, fixtureName str
},
},
}
case "tcp:db[0]":
return &envoy_endpoint_v3.ClusterLoadAssignment{
ClusterName: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{
{
LbEndpoints: []*envoy_endpoint_v3.LbEndpoint{
xdsNewEndpointWithHealth("10.10.1.1", 8080, envoy_core_v3.HealthStatus_HEALTHY, 1),
},
},
},
}
case "http2:db", "http:db":
return &envoy_endpoint_v3.ClusterLoadAssignment{
ClusterName: "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",