consul/agent/xds/server_test.go
R.B. Boyer 6a39b47448 Support Incremental xDS mode (#9855)
This adds support for the Incremental xDS protocol when using xDS v3. This is best reviewed commit-by-commit and will not be squashed when merged.

Union of all commit messages follows to give an overarching summary:

xds: exclusively support incremental xDS when using xDS v3

Attempts to use SoTW via v3 will fail, much like attempts to use incremental via v2 will fail.
Work around a strange older envoy behavior involving empty CDS responses over incremental xDS.
xds: various cleanups and refactors that don't strictly concern the addition of incremental xDS support

Dissolve the connectionInfo struct in favor of per-connection ResourceGenerators instead.
Do a better job of ensuring the xds code uses a well configured logger that accurately describes the connected client.
xds: pull out checkStreamACLs method in advance of a later commit

xds: rewrite SoTW xDS protocol tests to use protobufs rather than hand-rolled json strings

In the test we very lightly reuse some of the more boring protobuf construction helper code that is also technically under test. The important thing of the protocol tests is testing the protocol. The actual inputs and outputs are largely already handled by the xds golden output tests now so these protocol tests don't have to do double-duty.

This also updates the SoTW protocol test to exclusively use xDS v2 which is the only variant of SoTW that will be supported in Consul 1.10.

xds: default xds.Server.AuthCheckFrequency at use-time instead of construction-time
2021-04-29 18:54:53 +00:00

792 lines
25 KiB
Go

package xds
import (
"sync/atomic"
"testing"
"time"
envoy_api_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
)
// NOTE: For these tests, prefer not using xDS protobuf "factory" methods if
// possible to avoid using them to test themselves.
//
// Stick to very straightforward stuff in xds_protocol_helpers_test.go.
func TestServer_StreamAggregatedResources_v2_BasicProtocol_TCP(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)
// Send initial cluster discover (empty payload)
envoy.SendReq(t, ClusterType_v2, 0, 0)
// Check no response sent yet
assertChanBlocked(t, envoy.stream.sendCh)
// Deliver a new snapshot
snap := newTestSnapshot(t, nil, "")
mgr.DeliverConfig(t, sid, snap)
expectClusterResponse := func(v, n uint64) *envoy_api_v2.DiscoveryResponse {
return &envoy_api_v2.DiscoveryResponse{
VersionInfo: hexString(v),
TypeUrl: ClusterType_v2,
Nonce: hexString(n),
Resources: makeTestResources_v2(t,
makeTestCluster_v2(t, snap, "tcp:local_app"),
makeTestCluster_v2(t, snap, "tcp:db"),
makeTestCluster_v2(t, snap, "tcp:geo-cache"),
),
}
}
expectEndpointResponse := func(v, n uint64) *envoy_api_v2.DiscoveryResponse {
return &envoy_api_v2.DiscoveryResponse{
VersionInfo: hexString(v),
TypeUrl: EndpointType_v2,
Nonce: hexString(n),
Resources: makeTestResources_v2(t,
makeTestEndpoints_v2(t, snap, "tcp:db"),
makeTestEndpoints_v2(t, snap, "tcp:geo-cache"),
),
}
}
expectListenerResponse := func(v, n uint64) *envoy_api_v2.DiscoveryResponse {
return &envoy_api_v2.DiscoveryResponse{
VersionInfo: hexString(v),
TypeUrl: ListenerType_v2,
Nonce: hexString(n),
Resources: makeTestResources_v2(t,
makeTestListener_v2(t, snap, "tcp:public_listener"),
makeTestListener_v2(t, snap, "tcp:db"),
makeTestListener_v2(t, snap, "tcp:geo-cache"),
),
}
}
assertResponseSent(t, envoy.stream.sendCh, expectClusterResponse(1, 1))
// Envoy then tries to discover endpoints for those clusters. Technically it
// includes the cluster names in the ResourceNames field but we ignore that
// completely for now so not bothering to simulate that.
envoy.SendReq(t, EndpointType_v2, 0, 0)
// It also (in parallel) issues the next cluster request (which acts as an ACK
// of the version we sent)
envoy.SendReq(t, ClusterType_v2, 1, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertResponseSent(t, envoy.stream.sendCh, expectEndpointResponse(1, 2))
// And no other response yet
assertChanBlocked(t, envoy.stream.sendCh)
// Envoy now sends listener request along with next endpoint one
envoy.SendReq(t, ListenerType_v2, 0, 0)
envoy.SendReq(t, EndpointType_v2, 1, 2)
// And should get a response immediately.
assertResponseSent(t, envoy.stream.sendCh, expectListenerResponse(1, 3))
// Now send Route request along with next listener one
envoy.SendReq(t, RouteType_v2, 0, 0)
envoy.SendReq(t, ListenerType_v2, 1, 3)
// We don't serve routes yet so this should block with no response
assertChanBlocked(t, envoy.stream.sendCh)
// WOOP! Envoy now has full connect config. Lets verify that if we update it,
// all the responses get resent with the new version. We don't actually want
// to change everything because that's tedious - our implementation will
// actually resend all blocked types on the new "version" anyway since it
// doesn't know _what_ changed. We could do something trivial but let's
// simulate a leaf cert expiring and being rotated.
snap.ConnectProxy.Leaf = proxycfg.TestLeafForCA(t, snap.Roots.Roots[0])
mgr.DeliverConfig(t, sid, snap)
// All 3 response that have something to return should return with new version
// note that the ordering is not deterministic in general. Trying to make this
// test order-agnostic though is a massive pain because we
// don't know the order the nonces will be assigned. For now we rely and
// require our implementation to always deliver updates in a specific order
// which is reasonable anyway to ensure consistency of the config Envoy sees.
assertResponseSent(t, envoy.stream.sendCh, expectClusterResponse(2, 4))
assertResponseSent(t, envoy.stream.sendCh, expectEndpointResponse(2, 5))
assertResponseSent(t, envoy.stream.sendCh, expectListenerResponse(2, 6))
// Let's pretend that Envoy doesn't like that new listener config. It will ACK
// all the others (same version) but NACK the listener. This is the most
// subtle part of xDS and the server implementation so I'll elaborate. A full
// description of the protocol can be found at
// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol
// Envoy delays making a followup request for a type until after it has
// processed and applied the last response. The next request then will include
// the nonce in the last response which acknowledges _receiving_ and handling
// that response. It also includes the currently applied version. If all is
// good and it successfully applies the config, then the version in the next
// response will be the same version just sent. This is considered to be an
// ACK of that version for that type. If envoy fails to apply the config for
// some reason, it will still acknowledge that it received it (still return
// the responses nonce), but will show the previous version it's still using.
// This is considered a NACK. It's important that the server pay attention to
// the _nonce_ and not the version when deciding what to send otherwise a bad
// version that can't be applied in Envoy will cause a busy loop.
//
// In this case we are simulating that Envoy failed to apply the Listener
// response but did apply the other types so all get the new nonces, but
// listener stays on v1.
envoy.SendReq(t, ClusterType_v2, 2, 4)
envoy.SendReq(t, EndpointType_v2, 2, 5)
envoy.SendReq(t, ListenerType_v2, 1, 6)
// Even though we nacked, we should still NOT get then v2 listeners
// redelivered since nothing has changed.
assertChanBlocked(t, envoy.stream.sendCh)
// Change config again and make sure it's delivered to everyone!
snap.ConnectProxy.Leaf = proxycfg.TestLeafForCA(t, snap.Roots.Roots[0])
mgr.DeliverConfig(t, sid, snap)
assertResponseSent(t, envoy.stream.sendCh, expectClusterResponse(3, 7))
assertResponseSent(t, envoy.stream.sendCh, expectEndpointResponse(3, 8))
assertResponseSent(t, envoy.stream.sendCh, expectListenerResponse(3, 9))
envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}
func TestServer_StreamAggregatedResources_v2_BasicProtocol_HTTP(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)
// Send initial cluster discover (empty payload)
envoy.SendReq(t, ClusterType_v2, 0, 0)
// Check no response sent yet
assertChanBlocked(t, envoy.stream.sendCh)
// Deliver a new snapshot
// Deliver a new snapshot (tcp with one http upstream)
snap := newTestSnapshot(t, nil, "http2", &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "db",
Protocol: "http2",
})
mgr.DeliverConfig(t, sid, snap)
expectClusterResponse := func(v, n uint64) *envoy_api_v2.DiscoveryResponse {
return &envoy_api_v2.DiscoveryResponse{
VersionInfo: hexString(v),
TypeUrl: ClusterType_v2,
Nonce: hexString(n),
Resources: makeTestResources_v2(t,
makeTestCluster_v2(t, snap, "tcp:local_app"),
makeTestCluster_v2(t, snap, "http2:db"),
makeTestCluster_v2(t, snap, "tcp:geo-cache"),
),
}
}
expectEndpointResponse := func(v, n uint64) *envoy_api_v2.DiscoveryResponse {
return &envoy_api_v2.DiscoveryResponse{
VersionInfo: hexString(v),
TypeUrl: EndpointType_v2,
Nonce: hexString(n),
Resources: makeTestResources_v2(t,
makeTestEndpoints_v2(t, snap, "http2:db"),
makeTestEndpoints_v2(t, snap, "tcp:geo-cache"),
),
}
}
expectListenerResponse := func(v, n uint64) *envoy_api_v2.DiscoveryResponse {
return &envoy_api_v2.DiscoveryResponse{
VersionInfo: hexString(v),
TypeUrl: ListenerType_v2,
Nonce: hexString(n),
Resources: makeTestResources_v2(t,
makeTestListener_v2(t, snap, "tcp:public_listener"),
makeTestListener_v2(t, snap, "http2:db"),
makeTestListener_v2(t, snap, "tcp:geo-cache"),
),
}
}
runStep(t, "no-rds", func(t *testing.T) {
// REQ: clusters
envoy.SendReq(t, ClusterType_v2, 0, 0)
// RESP: clusters
assertResponseSent(t, envoy.stream.sendCh, expectClusterResponse(1, 1))
assertChanBlocked(t, envoy.stream.sendCh)
// REQ: endpoints
envoy.SendReq(t, EndpointType_v2, 0, 0)
// ACK: clusters
envoy.SendReq(t, ClusterType_v2, 1, 1)
// RESP: endpoints
assertResponseSent(t, envoy.stream.sendCh, expectEndpointResponse(1, 2))
assertChanBlocked(t, envoy.stream.sendCh)
// REQ: listeners
envoy.SendReq(t, ListenerType_v2, 0, 0)
// ACK: endpoints
envoy.SendReq(t, EndpointType_v2, 1, 2)
// RESP: listeners
assertResponseSent(t, envoy.stream.sendCh, expectListenerResponse(1, 3))
assertChanBlocked(t, envoy.stream.sendCh)
// ACK: listeners
envoy.SendReq(t, ListenerType_v2, 1, 3)
assertChanBlocked(t, envoy.stream.sendCh)
})
// -- reconfigure with a no-op discovery chain
snap = newTestSnapshot(t, snap, "http2", &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "db",
Protocol: "http2",
}, &structs.ServiceRouterConfigEntry{
Kind: structs.ServiceRouter,
Name: "db",
Routes: nil,
})
mgr.DeliverConfig(t, sid, snap)
// update this test helper to reflect the RDS-linked listener
expectListenerResponse = func(v, n uint64) *envoy_api_v2.DiscoveryResponse {
return &envoy_api_v2.DiscoveryResponse{
VersionInfo: hexString(v),
TypeUrl: ListenerType_v2,
Nonce: hexString(n),
Resources: makeTestResources_v2(t,
makeTestListener_v2(t, snap, "tcp:public_listener"),
makeTestListener_v2(t, snap, "http2:db:rds"),
makeTestListener_v2(t, snap, "tcp:geo-cache"),
),
}
}
runStep(t, "with-rds", func(t *testing.T) {
// RESP: listeners (but also a stray update of the other registered types)
assertResponseSent(t, envoy.stream.sendCh, expectClusterResponse(2, 4))
assertResponseSent(t, envoy.stream.sendCh, expectEndpointResponse(2, 5))
assertResponseSent(t, envoy.stream.sendCh, expectListenerResponse(2, 6))
assertChanBlocked(t, envoy.stream.sendCh)
// ACK: listeners (but also stray ACKs of the other registered types)
envoy.SendReq(t, ClusterType_v2, 2, 4)
envoy.SendReq(t, EndpointType_v2, 2, 5)
envoy.SendReq(t, ListenerType_v2, 2, 6)
// REQ: routes
envoy.SendReq(t, RouteType_v2, 0, 0)
// RESP: routes
assertResponseSent(t, envoy.stream.sendCh, &envoy_api_v2.DiscoveryResponse{
VersionInfo: hexString(2),
TypeUrl: RouteType_v2,
Nonce: hexString(7),
Resources: makeTestResources_v2(t,
makeTestRoute_v2(t, "http2:db"),
),
})
assertChanBlocked(t, envoy.stream.sendCh)
// ACK: routes
envoy.SendReq(t, RouteType_v2, 2, 7)
})
envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}
func TestServer_StreamAggregatedResources_v2_ACLEnforcement(t *testing.T) {
tests := []struct {
name string
defaultDeny bool
acl string
token string
wantDenied bool
cfgSnap *proxycfg.ConfigSnapshot
}{
// Note that although we've stubbed actual ACL checks in the testManager
// ConnectAuthorize mock, by asserting against specific reason strings here
// even in the happy case which can't match the default one returned by the
// mock we are implicitly validating that the implementation used the
// correct token from the context.
{
name: "no ACLs configured",
defaultDeny: false,
wantDenied: false,
},
{
name: "default deny, no token",
defaultDeny: true,
wantDenied: true,
},
{
name: "default deny, write token",
defaultDeny: true,
acl: `service "web" { policy = "write" }`,
token: "service-write-on-web",
wantDenied: false,
},
{
name: "default deny, read token",
defaultDeny: true,
acl: `service "web" { policy = "read" }`,
token: "service-write-on-web",
wantDenied: true,
},
{
name: "default deny, write token on different service",
defaultDeny: true,
acl: `service "not-web" { policy = "write" }`,
token: "service-write-on-not-web",
wantDenied: true,
},
{
name: "ingress default deny, write token on different service",
defaultDeny: true,
acl: `service "not-ingress" { policy = "write" }`,
token: "service-write-on-not-ingress",
wantDenied: true,
cfgSnap: proxycfg.TestConfigSnapshotIngressGateway(t),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) {
if !tt.defaultDeny {
// Allow all
return acl.RootAuthorizer("allow"), nil
}
if tt.acl == "" {
// No token and defaultDeny is denied
return acl.RootAuthorizer("deny"), nil
}
// Ensure the correct token was passed
require.Equal(t, tt.token, id)
// Parse the ACL and enforce it
policy, err := acl.NewPolicyFromSource("", 0, tt.acl, acl.SyntaxLegacy, nil, nil)
require.NoError(t, err)
return acl.NewPolicyAuthorizerWithDefaults(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil)
}
scenario := newTestServerScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)
// Deliver a new snapshot
snap := tt.cfgSnap
if snap == nil {
snap = newTestSnapshot(t, nil, "")
}
mgr.DeliverConfig(t, sid, snap)
// Send initial listener discover, in real life Envoy always sends cluster
// first but it doesn't really matter and listener has a response that
// includes the token in the ext rbac filter so lets us test more stuff.
envoy.SendReq(t, ListenerType_v2, 0, 0)
if !tt.wantDenied {
assertResponseSent(t, envoy.stream.sendCh, &envoy_api_v2.DiscoveryResponse{
VersionInfo: hexString(1),
TypeUrl: ListenerType_v2,
Nonce: hexString(1),
Resources: makeTestResources_v2(t,
makeTestListener_v2(t, snap, "tcp:public_listener"),
makeTestListener_v2(t, snap, "tcp:db"),
makeTestListener_v2(t, snap, "tcp:geo-cache"),
),
})
// Close the client stream since all is well. We _don't_ do this in the
// expected error case because we want to verify the error closes the
// stream from server side.
envoy.Close()
}
select {
case err := <-errCh:
if tt.wantDenied {
require.Error(t, err)
require.Contains(t, err.Error(), "permission denied")
mgr.AssertWatchCancelled(t, sid)
} else {
require.NoError(t, err)
}
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
})
}
}
func TestServer_StreamAggregatedResources_v2_ACLTokenDeleted_StreamTerminatedDuringDiscoveryRequest(t *testing.T) {
aclRules := `service "web" { policy = "write" }`
token := "service-write-on-web"
policy, err := acl.NewPolicyFromSource("", 0, aclRules, acl.SyntaxLegacy, nil, nil)
require.NoError(t, err)
var validToken atomic.Value
validToken.Store(token)
aclResolve := func(id string) (acl.Authorizer, error) {
if token := validToken.Load(); token == nil || id != token.(string) {
return nil, acl.ErrNotFound
}
return acl.NewPolicyAuthorizerWithDefaults(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil)
}
scenario := newTestServerScenario(t, aclResolve, "web-sidecar-proxy", token,
1*time.Hour, // make sure this doesn't kick in
)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
getError := func() (gotErr error, ok bool) {
select {
case err := <-errCh:
return err, true
default:
return nil, false
}
}
sid := structs.NewServiceID("web-sidecar-proxy", nil)
// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)
// Send initial cluster discover (OK)
envoy.SendReq(t, ClusterType_v2, 0, 0)
{
err, ok := getError()
require.NoError(t, err)
require.False(t, ok)
}
// Check no response sent yet
assertChanBlocked(t, envoy.stream.sendCh)
{
err, ok := getError()
require.NoError(t, err)
require.False(t, ok)
}
// Deliver a new snapshot
snap := newTestSnapshot(t, nil, "")
mgr.DeliverConfig(t, sid, snap)
assertResponseSent(t, envoy.stream.sendCh, &envoy_api_v2.DiscoveryResponse{
VersionInfo: hexString(1),
TypeUrl: ClusterType_v2,
Nonce: hexString(1),
Resources: makeTestResources_v2(t,
makeTestCluster_v2(t, snap, "tcp:local_app"),
makeTestCluster_v2(t, snap, "tcp:db"),
makeTestCluster_v2(t, snap, "tcp:geo-cache"),
),
})
// Now nuke the ACL token.
validToken.Store("")
// It also (in parallel) issues the next cluster request (which acts as an ACK
// of the version we sent)
envoy.SendReq(t, ClusterType_v2, 1, 1)
select {
case err := <-errCh:
require.Error(t, err)
gerr, ok := status.FromError(err)
require.Truef(t, ok, "not a grpc status error: type='%T' value=%v", err, err)
require.Equal(t, codes.Unauthenticated, gerr.Code())
require.Equal(t, "unauthenticated: ACL not found", gerr.Message())
mgr.AssertWatchCancelled(t, sid)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}
func TestServer_StreamAggregatedResources_v2_ACLTokenDeleted_StreamTerminatedInBackground(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
aclRules := `service "web" { policy = "write" }`
token := "service-write-on-web"
policy, err := acl.NewPolicyFromSource("", 0, aclRules, acl.SyntaxLegacy, nil, nil)
require.NoError(t, err)
var validToken atomic.Value
validToken.Store(token)
aclResolve := func(id string) (acl.Authorizer, error) {
if token := validToken.Load(); token == nil || id != token.(string) {
return nil, acl.ErrNotFound
}
return acl.NewPolicyAuthorizerWithDefaults(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil)
}
scenario := newTestServerScenario(t, aclResolve, "web-sidecar-proxy", token,
100*time.Millisecond, // Make this short.
)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
getError := func() (gotErr error, ok bool) {
select {
case err := <-errCh:
return err, true
default:
return nil, false
}
}
sid := structs.NewServiceID("web-sidecar-proxy", nil)
// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)
// Send initial cluster discover (OK)
envoy.SendReq(t, ClusterType_v2, 0, 0)
{
err, ok := getError()
require.NoError(t, err)
require.False(t, ok)
}
// Check no response sent yet
assertChanBlocked(t, envoy.stream.sendCh)
{
err, ok := getError()
require.NoError(t, err)
require.False(t, ok)
}
// Deliver a new snapshot
snap := newTestSnapshot(t, nil, "")
mgr.DeliverConfig(t, sid, snap)
assertResponseSent(t, envoy.stream.sendCh, &envoy_api_v2.DiscoveryResponse{
VersionInfo: hexString(1),
TypeUrl: ClusterType_v2,
Nonce: hexString(1),
Resources: makeTestResources_v2(t,
makeTestCluster_v2(t, snap, "tcp:local_app"),
makeTestCluster_v2(t, snap, "tcp:db"),
makeTestCluster_v2(t, snap, "tcp:geo-cache"),
),
})
// It also (in parallel) issues the next cluster request (which acts as an ACK
// of the version we sent)
envoy.SendReq(t, ClusterType_v2, 1, 1)
// Check no response sent yet
assertChanBlocked(t, envoy.stream.sendCh)
{
err, ok := getError()
require.NoError(t, err)
require.False(t, ok)
}
// Now nuke the ACL token while there's no activity.
validToken.Store("")
select {
case err := <-errCh:
require.Error(t, err)
gerr, ok := status.FromError(err)
require.Truef(t, ok, "not a grpc status error: type='%T' value=%v", err, err)
require.Equal(t, codes.Unauthenticated, gerr.Code())
require.Equal(t, "unauthenticated: ACL not found", gerr.Message())
mgr.AssertWatchCancelled(t, sid)
case <-time.After(200 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}
func TestServer_StreamAggregatedResources_v2_IngressEmptyResponse(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerScenario(t, aclResolve, "ingress-gateway", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("ingress-gateway", nil)
// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)
// Send initial cluster discover
envoy.SendReq(t, ClusterType_v2, 0, 0)
// Check no response sent yet
assertChanBlocked(t, envoy.stream.sendCh)
// Deliver a new snapshot with no services
snap := proxycfg.TestConfigSnapshotIngressGatewayNoServices(t)
mgr.DeliverConfig(t, sid, snap)
emptyClusterResp := &envoy_api_v2.DiscoveryResponse{
VersionInfo: hexString(1),
TypeUrl: ClusterType_v2,
Nonce: hexString(1),
}
emptyListenerResp := &envoy_api_v2.DiscoveryResponse{
VersionInfo: hexString(1),
TypeUrl: ListenerType_v2,
Nonce: hexString(2),
}
emptyRouteResp := &envoy_api_v2.DiscoveryResponse{
VersionInfo: hexString(1),
TypeUrl: RouteType_v2,
Nonce: hexString(3),
}
assertResponseSent(t, envoy.stream.sendCh, emptyClusterResp)
// Send initial listener discover
envoy.SendReq(t, ListenerType_v2, 0, 0)
assertResponseSent(t, envoy.stream.sendCh, emptyListenerResp)
envoy.SendReq(t, RouteType_v2, 0, 0)
assertResponseSent(t, envoy.stream.sendCh, emptyRouteResp)
envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}
func assertChanBlocked(t *testing.T, ch chan *envoy_api_v2.DiscoveryResponse) {
t.Helper()
select {
case r := <-ch:
t.Fatalf("chan should block but received: %v", r)
case <-time.After(10 * time.Millisecond):
return
}
}
func assertResponseSent(t *testing.T, ch chan *envoy_api_v2.DiscoveryResponse, want *envoy_api_v2.DiscoveryResponse) {
t.Helper()
select {
case got := <-ch:
assertResponse(t, got, want)
case <-time.After(50 * time.Millisecond):
t.Fatalf("no response received after 50ms")
}
}
// assertResponse is a helper to test a envoy.DiscoveryResponse matches the
// expected value. We use JSON during comparison here because the responses use protobuf
// Any type which includes binary protobuf encoding.
func assertResponse(t *testing.T, got, want *envoy_api_v2.DiscoveryResponse) {
t.Helper()
gotJSON := protoToJSON(t, got)
wantJSON := protoToJSON(t, want)
require.JSONEqf(t, wantJSON, gotJSON, "got:\n%s", gotJSON)
}
func makeTestResources_v2(t *testing.T, resources ...proto.Message) []*any.Any {
var ret []*any.Any
for _, res := range resources {
any, err := ptypes.MarshalAny(res)
require.NoError(t, err)
ret = append(ret, any)
}
return ret
}
func makeTestListener_v2(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName string) *envoy_api_v2.Listener {
v3 := makeTestListener(t, snap, fixtureName)
v2, err := convertListenerToV2(v3)
require.NoError(t, err)
return v2
}
func makeTestCluster_v2(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName string) *envoy_api_v2.Cluster {
v3 := makeTestCluster(t, snap, fixtureName)
v2, err := convertClusterToV2(v3)
require.NoError(t, err)
return v2
}
func makeTestEndpoints_v2(t *testing.T, snap *proxycfg.ConfigSnapshot, fixtureName string) *envoy_api_v2.ClusterLoadAssignment {
v3 := makeTestEndpoints(t, snap, fixtureName)
v2, err := convertClusterLoadAssignmentToV2(v3)
require.NoError(t, err)
return v2
}
func makeTestRoute_v2(t *testing.T, fixtureName string) *envoy_api_v2.RouteConfiguration {
v3 := makeTestRoute(t, fixtureName)
v2, err := convertRouteConfigurationToV2(v3)
require.NoError(t, err)
return v2
}