// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package xds

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/hashicorp/consul/envoyextensions/xdscommon"

	"github.com/armon/go-metrics"
	envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
	envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
	envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
	"github.com/hashicorp/consul/acl"
	"github.com/hashicorp/consul/agent/grpc-external/limiter"
	"github.com/hashicorp/consul/agent/proxycfg"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/envoyextensions/extensioncommon"
	"github.com/hashicorp/consul/sdk/testutil"
	"github.com/hashicorp/consul/sdk/testutil/retry"
	"github.com/hashicorp/consul/version"
	"github.com/hashicorp/go-hclog"
	goversion "github.com/hashicorp/go-version"
	"github.com/stretchr/testify/require"
	rpcstatus "google.golang.org/genproto/googleapis/rpc/status"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
)

// NOTE: For these tests, prefer not using xDS protobuf "factory" methods if
// possible to avoid using them to test themselves.
//
// Stick to very straightforward stuff in xds_protocol_helpers_test.go.

func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
	aclResolve := func(id string) (acl.Authorizer, error) {
		// Allow all
		return acl.RootAuthorizer("manage"), nil
	}
	scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
	mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy

	sid := structs.NewServiceID("web-sidecar-proxy", nil)

	// Register the proxy to create state needed to Watch() on
	mgr.RegisterProxy(t, sid)

	var snap *proxycfg.ConfigSnapshot

	testutil.RunStep(t, "initial setup", func(t *testing.T) {
		snap = newTestSnapshot(t, nil, "",
			func(ns *structs.NodeService) {
				// Add extension for local proxy.
				ns.Proxy.EnvoyExtensions = []structs.EnvoyExtension{
					{
						Name: api.BuiltinLuaExtension,
						Arguments: map[string]interface{}{
							"ProxyType": "connect-proxy",
							"Listener":  "inbound",
							"Script":    "x = 0",
						},
					},
				}
			})

		// Send initial cluster discover. We'll assume we are testing a partial
		// reconnect and include some initial resource versions that will be
		// cleaned up.
		envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
			InitialResourceVersions: mustMakeVersionMap(t,
				makeTestCluster(t, snap, "tcp:geo-cache"),
			),
		})

		// Check no response sent yet
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		requireProtocolVersionGauge(t, scenario, "v3", 1)

		// Deliver a new snapshot (tcp with one tcp upstream)
		mgr.DeliverConfig(t, sid, snap)
	})

	testutil.RunStep(t, "first sync", func(t *testing.T) {
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ClusterType,
			Nonce:   hexString(1),
			Resources: makeTestResources(t,
				makeTestCluster(t, snap, "tcp:local_app"),
				makeTestCluster(t, snap, "tcp:db"),
				// SAME_AS_INITIAL_VERSION: makeTestCluster(t, snap, "tcp:geo-cache"),
			),
		})

		// Envoy then tries to discover endpoints for those clusters.
		envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
			// We'll assume we are testing a partial "reconnect"
			InitialResourceVersions: mustMakeVersionMap(t,
				makeTestEndpoints(t, snap, "tcp:geo-cache"),
			),
			ResourceNamesSubscribe: []string{
				"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
				// "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
				//
				// Include "fake-endpoints" here to test subscribing to an unknown
				// thing and have consul tell us there's no data for it.
				"fake-endpoints",
			},
		})

		// We should get a response immediately since the config is already present in
		// the server for endpoints. Note that this should not be racy if the server
		// is behaving well since the Cluster send above should be blocked until we
		// deliver a new config version.
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.EndpointType,
			Nonce:   hexString(2),
			Resources: makeTestResources(t,
				makeTestEndpoints(t, snap, "tcp:db"),
				// SAME_AS_INITIAL_VERSION: makeTestEndpoints(t, snap, "tcp:geo-cache"),
				// SAME_AS_INITIAL_VERSION: "fake-endpoints",
			),
		})

		// After receiving the endpoints Envoy sends an ACK for the cluster
		envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)

		// We are caught up, so there should be nothing queued to send.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		// Envoy now sends listener request
		envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)

		// It also (in parallel) issues the endpoint ACK
		envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)

		// And should get a response immediately.
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ListenerType,
			Nonce:   hexString(3),
			Resources: makeTestResources(t,
				makeTestListener(t, snap, "tcp:public_listener"),
				makeTestListener(t, snap, "tcp:db"),
				makeTestListener(t, snap, "tcp:geo-cache"),
			),
		})

		// We are caught up, so there should be nothing queued to send.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		// ACKs the listener
		envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)

		// If Envoy re-subscribes to something even if there are no changes we send a
		// fresh copy.
		envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
			ResourceNamesSubscribe: []string{
				"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
			},
		})

		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.EndpointType,
			Nonce:   hexString(4),
			Resources: makeTestResources(t,
				makeTestEndpoints(t, snap, "tcp:geo-cache"),
			),
		})

		envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4)

		// We are caught up, so there should be nothing queued to send.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		requireExtensionMetrics(t, scenario, api.BuiltinLuaExtension, sid, nil)
	})

	deleteAllButOneEndpoint := func(snap *proxycfg.ConfigSnapshot, uid proxycfg.UpstreamID, targetID string) {
		snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID] =
			snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID][0:1]
	}

	testutil.RunStep(t, "avoid sending config for unsubscribed resource", func(t *testing.T) {
		envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
			ResourceNamesUnsubscribe: []string{
				"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
			},
		})

		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		// now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for db.
		snap = newTestSnapshot(t, snap, "", nil)
		deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1")
		mgr.DeliverConfig(t, sid, snap)

		// We never send an EDS reply about this change because Envoy is not subscribed to db anymore.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
	})

	testutil.RunStep(t, "restore endpoint subscription", func(t *testing.T) {
		// Restore db's deleted endpoints by generating a new snapshot.
		snap = newTestSnapshot(t, snap, "", nil)
		mgr.DeliverConfig(t, sid, snap)

		// We never send an EDS reply about this change because Envoy is still not subscribed to db.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		// When Envoy re-subscribes to db we send the endpoints for it.
		envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
			ResourceNamesSubscribe: []string{
				"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
			},
		})
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.EndpointType,
			Nonce:   hexString(5),
			Resources: makeTestResources(t,
				makeTestEndpoints(t, snap, "tcp:db"),
			),
		})

		envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 5)

		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
	})

	// NOTE: this has to be the last subtest since it kills the stream
	testutil.RunStep(t, "simulate an envoy error sending an update to envoy", func(t *testing.T) {
		// Force sends to fail
		envoy.SetSendErr(errors.New("test error"))

		// Trigger only an EDS update by deleting endpoints again.
		deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1")

		// We never send any replies about this change because we died.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
	})

	envoy.Close()
	select {
	case err := <-errCh:
		require.NoError(t, err)
	case <-time.After(50 * time.Millisecond):
		t.Fatalf("timed out waiting for handler to finish")
	}
}

func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
	aclResolve := func(id string) (acl.Authorizer, error) {
		// Allow all
		return acl.RootAuthorizer("manage"), nil
	}
	scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
	mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy

	sid := structs.NewServiceID("web-sidecar-proxy", nil)

	// Register the proxy to create state needed to Watch() on
	mgr.RegisterProxy(t, sid)

	var snap *proxycfg.ConfigSnapshot

	testutil.RunStep(t, "initial setup", func(t *testing.T) {
		snap = newTestSnapshot(t, nil, "", nil)

		// Plug in a bad port for the public listener
		snap.Port = 1

		// Send initial cluster discover.
		envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})

		// Check no response sent yet
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		requireProtocolVersionGauge(t, scenario, "v3", 1)

		// Deliver a new snapshot (tcp with one tcp upstream)
		mgr.DeliverConfig(t, sid, snap)
	})

	testutil.RunStep(t, "simulate Envoy NACKing initial listener", func(t *testing.T) {
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ClusterType,
			Nonce:   hexString(1),
			Resources: makeTestResources(t,
				makeTestCluster(t, snap, "tcp:local_app"),
				makeTestCluster(t, snap, "tcp:db"),
				makeTestCluster(t, snap, "tcp:geo-cache"),
			),
		})

		// Envoy then tries to discover endpoints for those clusters.
		envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
			ResourceNamesSubscribe: []string{
				"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
				"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
			},
		})

		// We should get a response immediately since the config is already present in
		// the server for endpoints. Note that this should not be racy if the server
		// is behaving well since the Cluster send above should be blocked until we
		// deliver a new config version.
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.EndpointType,
			Nonce:   hexString(2),
			Resources: makeTestResources(t,
				makeTestEndpoints(t, snap, "tcp:db"),
				makeTestEndpoints(t, snap, "tcp:geo-cache"),
			),
		})

		// After receiving the endpoints Envoy sends an ACK for the clusters
		envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)

		// We are caught up, so there should be nothing queued to send.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		// Envoy now sends listener request
		envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)

		// It also (in parallel) issues the endpoint ACK
		envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)

		// And should get a response immediately.
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ListenerType,
			Nonce:   hexString(3),
			Resources: makeTestResources(t,
				// Response contains public_listener with port that Envoy can't bind to
				makeTestListener(t, snap, "tcp:bad_public_listener"),
				makeTestListener(t, snap, "tcp:db"),
				makeTestListener(t, snap, "tcp:geo-cache"),
			),
		})

		// We are caught up, so there should be nothing queued to send.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		// Envoy NACKs the listener update due to the bad public listener
		envoy.SendDeltaReqNACK(t, xdscommon.ListenerType, 3, &rpcstatus.Status{})

		// Consul should not respond until a new snapshot is delivered
		// because the current snapshot is known to be bad.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
	})

	testutil.RunStep(t, "simulate envoy NACKing a listener update", func(t *testing.T) {
		// Correct the port and deliver a new snapshot
		snap.Port = 9999
		mgr.DeliverConfig(t, sid, snap)

		// And should send a response immediately.
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ListenerType,
			Nonce:   hexString(4),
			Resources: makeTestResources(t,
				// Send a public listener that Envoy will accept
				makeTestListener(t, snap, "tcp:public_listener"),
				makeTestListener(t, snap, "tcp:db"),
				makeTestListener(t, snap, "tcp:geo-cache"),
			),
		})

		// New listener is acked now
		envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4)

		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
	})

	envoy.Close()
	select {
	case err := <-errCh:
		require.NoError(t, err)
	case <-time.After(50 * time.Millisecond):
		t.Fatalf("timed out waiting for handler to finish")
	}
}

func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
	aclResolve := func(id string) (acl.Authorizer, error) {
		// Allow all
		return acl.RootAuthorizer("manage"), nil
	}
	scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
	mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy

	sid := structs.NewServiceID("web-sidecar-proxy", nil)

	// Register the proxy to create state needed to Watch() on
	mgr.RegisterProxy(t, sid)

	// Send initial cluster discover (empty payload)
	envoy.SendDeltaReq(t, xdscommon.ClusterType, nil)

	// Check no response sent yet
	assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

	// Deliver a new snapshot (tcp with one http upstream)
	snap := newTestSnapshot(t, nil, "http2", nil, &structs.ServiceConfigEntry{
		Kind:     structs.ServiceDefaults,
		Name:     "db",
		Protocol: "http2",
	})
	mgr.DeliverConfig(t, sid, snap)

	testutil.RunStep(t, "no-rds", func(t *testing.T) {
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ClusterType,
			Nonce:   hexString(1),
			Resources: makeTestResources(t,
				makeTestCluster(t, snap, "tcp:local_app"),
				makeTestCluster(t, snap, "http2:db"),
				makeTestCluster(t, snap, "tcp:geo-cache"),
			),
		})

		// Envoy then tries to discover endpoints for those clusters.
		envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
			ResourceNamesSubscribe: []string{
				"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
				"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
			},
		})

		// We should get a response immediately since the config is already present in
		// the server for endpoints. Note that this should not be racy if the server
		// is behaving well since the Cluster send above should be blocked until we
		// deliver a new config version.
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.EndpointType,
			Nonce:   hexString(2),
			Resources: makeTestResources(t,
				makeTestEndpoints(t, snap, "http2:db"),
				makeTestEndpoints(t, snap, "tcp:geo-cache"),
			),
		})

		// After receiving the endpoints Envoy sends an ACK for the clusters
		envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)

		// We are caught up, so there should be nothing queued to send.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		// Envoy now sends listener request
		envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)

		// It also (in parallel) issues the endpoint ACK
		envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)

		// And should get a response immediately.
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ListenerType,
			Nonce:   hexString(3),
			Resources: makeTestResources(t,
				makeTestListener(t, snap, "tcp:public_listener"),
				makeTestListener(t, snap, "http2:db"),
				makeTestListener(t, snap, "tcp:geo-cache"),
			),
		})

		// We are caught up, so there should be nothing queued to send.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		// ACKs the listener
		envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)

		// We are caught up, so there should be nothing queued to send.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
	})

	// -- reconfigure with a no-op discovery chain

	snap = newTestSnapshot(t, snap, "http2", nil, &structs.ServiceConfigEntry{
		Kind:     structs.ServiceDefaults,
		Name:     "db",
		Protocol: "http2",
	}, &structs.ServiceRouterConfigEntry{
		Kind:   structs.ServiceRouter,
		Name:   "db",
		Routes: nil,
	})
	mgr.DeliverConfig(t, sid, snap)

	testutil.RunStep(t, "with-rds", func(t *testing.T) {
		// Just the "db" listener sees a change
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ListenerType,
			Nonce:   hexString(4),
			Resources: makeTestResources(t,
				makeTestListener(t, snap, "http2:db:rds"),
			),
		})

		// We are caught up, so there should be nothing queued to send.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		// Envoy now sends routes request
		envoy.SendDeltaReq(t, xdscommon.RouteType, &envoy_discovery_v3.DeltaDiscoveryRequest{
			ResourceNamesSubscribe: []string{
				"db",
			},
		})

		// And should get a response immediately.
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.RouteType,
			Nonce:   hexString(5),
			Resources: makeTestResources(t,
				makeTestRoute(t, "http2:db"),
			),
		})

		// After receiving the routes, Envoy sends acks back for the listener and routes.
		envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 4)
		envoy.SendDeltaReqACK(t, xdscommon.RouteType, 5)

		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
	})

	envoy.Close()
	select {
	case err := <-errCh:
		require.NoError(t, err)
	case <-time.After(50 * time.Millisecond):
		t.Fatalf("timed out waiting for handler to finish")
	}
}

func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) {
	// This illustrates a scenario related to https://github.com/hashicorp/consul/issues/10563

	aclResolve := func(id string) (acl.Authorizer, error) {
		// Allow all
		return acl.RootAuthorizer("manage"), nil
	}
	scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
	server, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy

	// This mutateFn causes any endpoint with a name containing "geo-cache" to be
	// omitted from the response while the hack is active.
	var slowHackDisabled uint32
	server.ResourceMapMutateFn = func(resourceMap *xdscommon.IndexedResources) {
		if atomic.LoadUint32(&slowHackDisabled) == 1 {
			return
		}
		if em, ok := resourceMap.Index[xdscommon.EndpointType]; ok {
			for k := range em {
				if strings.Contains(k, "geo-cache") {
					delete(em, k)
				}
			}
		}
	}

	sid := structs.NewServiceID("web-sidecar-proxy", nil)

	// Register the proxy to create state needed to Watch() on
	mgr.RegisterProxy(t, sid)

	var snap *proxycfg.ConfigSnapshot
	testutil.RunStep(t, "get into initial state", func(t *testing.T) {
		snap = newTestSnapshot(t, nil, "", nil)

		// Send initial cluster discover.
		envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})

		// Check no response sent yet
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		requireProtocolVersionGauge(t, scenario, "v3", 1)

		// Deliver a new snapshot (tcp with one tcp upstream)
		mgr.DeliverConfig(t, sid, snap)

		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ClusterType,
			Nonce:   hexString(1),
			Resources: makeTestResources(t,
				makeTestCluster(t, snap, "tcp:local_app"),
				makeTestCluster(t, snap, "tcp:db"),
				makeTestCluster(t, snap, "tcp:geo-cache"),
			),
		})

		// Envoy then tries to discover endpoints for those clusters.
		envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
			ResourceNamesSubscribe: []string{
				"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
				"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
			},
		})

		// We should get a response immediately since the config is already present in
		// the server for endpoints. Note that this should not be racy if the server
		// is behaving well since the Cluster send above should be blocked until we
		// deliver a new config version.
		//
		// NOTE: we do NOT return back geo-cache yet
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.EndpointType,
			Nonce:   hexString(2),
			Resources: makeTestResources(t,
				makeTestEndpoints(t, snap, "tcp:db"),
				// makeTestEndpoints(t, snap, "tcp:geo-cache"),
			),
		})

		// After receiving the endpoints Envoy sends an ACK for the clusters.
		// Envoy aims to wait to receive endpoints before ACKing clusters,
		// but because it received an update for at least one of the clusters it cares about
		// then it will ACK despite not having received an update for all clusters.
		// This behavior was observed against Envoy v1.21 and v1.23.
		envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)

		// We are caught up, so there should be nothing queued to send.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		// Envoy now sends listener request
		envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)

		// It also (in parallel) issues the endpoint ACK
		envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)

		// And should get a response immediately.
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ListenerType,
			Nonce:   hexString(3),
			Resources: makeTestResources(t,
				makeTestListener(t, snap, "tcp:public_listener"),
				makeTestListener(t, snap, "tcp:db"),
				makeTestListener(t, snap, "tcp:geo-cache"),
			),
		})

		// We are caught up, so there should be nothing queued to send.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		// ACKs the listener
		envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
	})

	// Disable hack. Need to wait for one more event to wake up the loop.
	atomic.StoreUint32(&slowHackDisabled, 1)

	testutil.RunStep(t, "delayed endpoint update finally comes in", func(t *testing.T) {
		// Trigger the xds.Server select{} to wake up and notice our hack is disabled.
		// The actual contents of this change are irrelevant.
		snap = newTestSnapshot(t, snap, "", nil)
		mgr.DeliverConfig(t, sid, snap)

		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.EndpointType,
			Nonce:   hexString(4),
			Resources: makeTestResources(t,
				makeTestEndpoints(t, snap, "tcp:geo-cache"),
			),
		})

		// We are caught up, so there should be nothing queued to send.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		// It also (in parallel) issues the endpoint ACK
		envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4)

	})

	envoy.Close()
	select {
	case err := <-errCh:
		require.NoError(t, err)
	case <-time.After(50 * time.Millisecond):
		t.Fatalf("timed out waiting for handler to finish")
	}
}

func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints(t *testing.T) {
	aclResolve := func(id string) (acl.Authorizer, error) {
		// Allow all
		return acl.RootAuthorizer("manage"), nil
	}
	scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
	mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy

	sid := structs.NewServiceID("web-sidecar-proxy", nil)

	// Register the proxy to create state needed to Watch() on
	mgr.RegisterProxy(t, sid)

	var snap *proxycfg.ConfigSnapshot
	testutil.RunStep(t, "get into initial state", func(t *testing.T) {
		snap = newTestSnapshot(t, nil, "", nil)

		// Send initial cluster discover.
		envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})

		// Check no response sent yet
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		requireProtocolVersionGauge(t, scenario, "v3", 1)

		// Deliver a new snapshot (tcp with one tcp upstream)
		mgr.DeliverConfig(t, sid, snap)

		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ClusterType,
			Nonce:   hexString(1),
			Resources: makeTestResources(t,
				makeTestCluster(t, snap, "tcp:local_app"),
				makeTestCluster(t, snap, "tcp:db"),
				makeTestCluster(t, snap, "tcp:geo-cache"),
			),
		})

		// Envoy then tries to discover endpoints for those clusters.
		envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
			ResourceNamesSubscribe: []string{
				"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
				"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
			},
		})

		// We should get a response immediately since the config is already present in
		// the server for endpoints. Note that this should not be racy if the server
		// is behaving well since the Cluster send above should be blocked until we
		// deliver a new config version.
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.EndpointType,
			Nonce:   hexString(2),
			Resources: makeTestResources(t,
				makeTestEndpoints(t, snap, "tcp:db"),
				makeTestEndpoints(t, snap, "tcp:geo-cache"),
			),
		})

		// After receiving the endpoints Envoy sends an ACK for the clusters
		envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)

		// We are caught up, so there should be nothing queued to send.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		// Envoy now sends listener request
		envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)

		// It also (in parallel) issues the endpoint ACK
		envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)

		// And should get a response immediately.
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ListenerType,
			Nonce:   hexString(3),
			Resources: makeTestResources(t,
				makeTestListener(t, snap, "tcp:public_listener"),
				makeTestListener(t, snap, "tcp:db"),
				makeTestListener(t, snap, "tcp:geo-cache"),
			),
		})

		// We are caught up, so there should be nothing queued to send.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		// ACKs the listener
		envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
	})

	testutil.RunStep(t, "trigger cluster update needing implicit endpoint replacements", func(t *testing.T) {
		// Update the snapshot in a way that causes a single cluster update.
		snap = newTestSnapshot(t, snap, "", nil, &structs.ServiceResolverConfigEntry{
			Kind:           structs.ServiceResolver,
			Name:           "db",
			ConnectTimeout: 1337 * time.Second,
		})
		mgr.DeliverConfig(t, sid, snap)

		// The cluster is updated
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ClusterType,
			Nonce:   hexString(4),
			Resources: makeTestResources(t,
				// SAME makeTestCluster(t, snap, "tcp:local_app"),
				makeTestCluster(t, snap, "tcp:db:timeout"),
				// SAME makeTestCluster(t, snap, "tcp:geo-cache"),
			),
		})

		// And we re-send the endpoints for the updated cluster after getting the
		// ACK for the cluster.
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.EndpointType,
			Nonce:   hexString(5),
			Resources: makeTestResources(t,
				makeTestEndpoints(t, snap, "tcp:db"),
				// SAME makeTestEndpoints(t, snap, "tcp:geo-cache"),
			),
		})

		// Envoy then ACK's the clusters and the endpoints.
		envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 4)
		envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 5)

		// We are caught up, so there should be nothing queued to send.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
	})

	envoy.Close()
	select {
	case err := <-errCh:
		require.NoError(t, err)
	case <-time.After(50 * time.Millisecond):
		t.Fatalf("timed out waiting for handler to finish")
	}
}

func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangeBeforeEndpointAck(t *testing.T) {
	// This test ensures that the following race condition does not block indefinitely:
	// - Send update endpoints
	// - Send update cluster
	// - Recv ACK endpoints
	// - Recv ACK cluster
	// Prior to a bug fix, this would have resulted in the endpoints NOT existing in Envoy. This occurred because
	// the cluster update implicitly clears the endpoints in Envoy, but we would never re-send the endpoint data
	// to compensate for the loss because we would incorrectly ACK the invalid old endpoint hash. Since the
	// endpoint's hash did not actually change, they would not be resent.
	aclResolve := func(id string) (acl.Authorizer, error) {
		// Allow all
		return acl.RootAuthorizer("manage"), nil
	}
	scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
	mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy

	sid := structs.NewServiceID("web-sidecar-proxy", nil)

	// Register the proxy to create state needed to Watch() on
	mgr.RegisterProxy(t, sid)

	var snap *proxycfg.ConfigSnapshot
	testutil.RunStep(t, "initial setup", func(t *testing.T) {
		snap = newTestSnapshot(t, nil, "", nil)

		// Send initial cluster discover.
		envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})

		// Check no response sent yet
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		requireProtocolVersionGauge(t, scenario, "v3", 1)

		// Deliver a new snapshot (tcp with one tcp upstream)
		mgr.DeliverConfig(t, sid, snap)

		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ClusterType,
			Nonce:   hexString(1),
			Resources: makeTestResources(t,
				makeTestCluster(t, snap, "tcp:local_app"),
				makeTestCluster(t, snap, "tcp:db"),
				makeTestCluster(t, snap, "tcp:geo-cache"),
			),
		})
	})

	var newSnap *proxycfg.ConfigSnapshot
	testutil.RunStep(t, "resend cluster immediately", func(t *testing.T) {
		// Deliver updated snapshot with new CA roots and leaf certificate. This will not be
		// sent to Envoy until the initial set of cluster message is ACKed.
		newSnap = newTestSnapshot(t, nil, "", nil)
		mgr.DeliverConfig(t, sid, newSnap)

		// Envoy then tries to discover endpoints for clusters.
		envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
			ResourceNamesSubscribe: []string{
				"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
				"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
			},
		})

		// We should get a response immediately since the config is already present in
		// the server for endpoints. Note that this should not be racy if the server
		// is behaving well since the Cluster send above should be blocked until we
		// deliver a new config version.
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.EndpointType,
			Nonce:   hexString(2),
			Resources: makeTestResources(t,
				makeTestEndpoints(t, snap, "tcp:db"),
				makeTestEndpoints(t, snap, "tcp:geo-cache"),
			),
		})

		// After receiving the endpoints Envoy sends an ACK for the clusters
		envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)

		// The updated cluster snapshot with new certificates is sent immediately
		// after the first is ACKed.
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ClusterType,
			Nonce:   hexString(3),
			Resources: makeTestResources(t,
				// SAME makeTestCluster(t, snap, "tcp:local_app"),
				makeTestCluster(t, newSnap, "tcp:db"),
				makeTestCluster(t, newSnap, "tcp:geo-cache"),
			),
		})
	})

	testutil.RunStep(t, "resend endpoints", func(t *testing.T) {
		// Envoy requests listeners because it has received endpoints. We won't send listeners
		// until Envoy ACKs the second cluster update.
		envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)

		// Envoy ACKs the endpoints from the first cluster update.
		envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)

		// Resend endpoints because the clusters changed.
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.EndpointType,
			Nonce:   hexString(4),
			Resources: makeTestResources(t,
				makeTestEndpoints(t, newSnap, "tcp:db"),
				makeTestEndpoints(t, newSnap, "tcp:geo-cache"),
			),
		})

		// Envoy ACKs the new cluster and endpoints.
		envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 3)
		envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4)

		// Listeners are sent after the cluster and endpoints are ACKed.
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ListenerType,
			Nonce:   hexString(5),
			Resources: makeTestResources(t,
				makeTestListener(t, newSnap, "tcp:public_listener"),
				makeTestListener(t, newSnap, "tcp:db"),
				makeTestListener(t, newSnap, "tcp:geo-cache"),
			),
		})

		// We are caught up, so there should be nothing queued to send.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		// ACKs the listener
		envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 5)
	})

	envoy.Close()
	select {
	case err := <-errCh:
		require.NoError(t, err)
	case <-time.After(50 * time.Millisecond):
		t.Fatalf("timed out waiting for handler to finish")
	}
}

func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChangesImpactRoutes(t *testing.T) {
	aclResolve := func(id string) (acl.Authorizer, error) {
		// Allow all
		return acl.RootAuthorizer("manage"), nil
	}
	scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
	mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy

	sid := structs.NewServiceID("web-sidecar-proxy", nil)

	// Register the proxy to create state needed to Watch() on
	mgr.RegisterProxy(t, sid)

	var snap *proxycfg.ConfigSnapshot

	testutil.RunStep(t, "get into initial state", func(t *testing.T) {
		// Send initial cluster discover (empty payload)
		envoy.SendDeltaReq(t, xdscommon.ClusterType, nil)

		// Check no response sent yet
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		// Deliver a new snapshot (tcp with one http upstream with no-op disco chain)
		snap = newTestSnapshot(t, nil, "http2", nil, &structs.ServiceConfigEntry{
			Kind:     structs.ServiceDefaults,
			Name:     "db",
			Protocol: "http2",
		}, &structs.ServiceRouterConfigEntry{
			Kind:   structs.ServiceRouter,
			Name:   "db",
			Routes: nil,
		})
		mgr.DeliverConfig(t, sid, snap)

		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ClusterType,
			Nonce:   hexString(1),
			Resources: makeTestResources(t,
				makeTestCluster(t, snap, "tcp:local_app"),
				makeTestCluster(t, snap, "http2:db"),
				makeTestCluster(t, snap, "tcp:geo-cache"),
			),
		})

		// Envoy then tries to discover endpoints for those clusters.
		envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
			ResourceNamesSubscribe: []string{
				"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
				"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
			},
		})

		// We should get a response immediately since the config is already present in
		// the server for endpoints. Note that this should not be racy if the server
		// is behaving well since the Cluster send above should be blocked until we
		// deliver a new config version.
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.EndpointType,
			Nonce:   hexString(2),
			Resources: makeTestResources(t,
				makeTestEndpoints(t, snap, "http2:db"),
				makeTestEndpoints(t, snap, "tcp:geo-cache"),
			),
		})

		// After receiving the endpoints Envoy sends an ACK for the clusters
		envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)

		// We are caught up, so there should be nothing queued to send.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		// Envoy now sends listener request
		envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)

		// It also (in parallel) issues the endpoint ACK
		envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)

		// And should get a response immediately.
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ListenerType,
			Nonce:   hexString(3),
			Resources: makeTestResources(t,
				makeTestListener(t, snap, "tcp:public_listener"),
				makeTestListener(t, snap, "http2:db:rds"),
				makeTestListener(t, snap, "tcp:geo-cache"),
			),
		})

		// We are caught up, so there should be nothing queued to send.
		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

		// Envoy now sends routes request
		envoy.SendDeltaReq(t, xdscommon.RouteType, &envoy_discovery_v3.DeltaDiscoveryRequest{
			ResourceNamesSubscribe: []string{
				"db",
			},
		})

		// And should get a response immediately.
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.RouteType,
			Nonce:   hexString(4),
			Resources: makeTestResources(t,
				makeTestRoute(t, "http2:db"),
			),
		})

		// After receiving the routes, Envoy sends acks back for the listener and routes.
		envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
		envoy.SendDeltaReqACK(t, xdscommon.RouteType, 4)

		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
	})

	testutil.RunStep(t, "trigger listener update needing implicit route replacements", func(t *testing.T) {
		// Update the snapshot in a way that causes a single listener update.
		//
		// Downgrade from http2 to http
		snap = newTestSnapshot(t, snap, "http", nil, &structs.ServiceConfigEntry{
			Kind:     structs.ServiceDefaults,
			Name:     "db",
			Protocol: "http",
		}, &structs.ServiceRouterConfigEntry{
			Kind:   structs.ServiceRouter,
			Name:   "db",
			Routes: nil,
		})
		mgr.DeliverConfig(t, sid, snap)

		// db cluster is refreshed (unrelated to the test scenario other than it's required)
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ClusterType,
			Nonce:   hexString(5),
			Resources: makeTestResources(t,
				makeTestCluster(t, snap, "http:db"),
			),
		})

		envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 5)

		// The behaviors of Cluster updates triggering re-sends of Endpoint updates
		// tested in TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints
		// triggers here. It is not explicitly under test, but we have to get past
		// this exchange to get to the part we care about.

		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.EndpointType,
			Nonce:   hexString(6),
			Resources: makeTestResources(t,
				makeTestEndpoints(t, snap, "http:db"),
			),
		})

		envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 6)

		// the listener is updated
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ListenerType,
			Nonce:   hexString(7),
			Resources: makeTestResources(t,
				makeTestListener(t, snap, "http:db:rds"),
			),
		})

		// THE ACTUAL THING WE CARE ABOUT: replaced route config
		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.RouteType,
			Nonce:   hexString(8),
			Resources: makeTestResources(t,
				makeTestRoute(t, "http2:db"),
			),
		})

		// After receiving the routes, Envoy sends acks back for the listener and routes.
		envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 7)
		envoy.SendDeltaReqACK(t, xdscommon.RouteType, 8)

		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
	})

	envoy.Close()
	select {
	case err := <-errCh:
		require.NoError(t, err)
	case <-time.After(50 * time.Millisecond):
		t.Fatalf("timed out waiting for handler to finish")
	}
}

func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) {
	tests := []struct {
		name        string
		defaultDeny bool
		acl         string
		token       string
		wantDenied  bool
		cfgSnap     *proxycfg.ConfigSnapshot
	}{
		// Note that although we've stubbed actual ACL checks in the testManager
		// ConnectAuthorize mock, by asserting against specific reason strings here
		// even in the happy case which can't match the default one returned by the
		// mock we are implicitly validating that the implementation used the
		// correct token from the context.
		{
			name:        "no ACLs configured",
			defaultDeny: false,
			wantDenied:  false,
		},
		{
			name:        "default deny, no token",
			defaultDeny: true,
			wantDenied:  true,
		},
		{
			name:        "default deny, write token",
			defaultDeny: true,
			acl:         `service "web" { policy = "write" }`,
			token:       "service-write-on-web",
			wantDenied:  false,
		},
		{
			name:        "default deny, read token",
			defaultDeny: true,
			acl:         `service "web" { policy = "read" }`,
			token:       "service-write-on-web",
			wantDenied:  true,
		},
		{
			name:        "default deny, write token on different service",
			defaultDeny: true,
			acl:         `service "not-web" { policy = "write" }`,
			token:       "service-write-on-not-web",
			wantDenied:  true,
		},
		{
			name:        "ingress default deny, write token on different service",
			defaultDeny: true,
			acl:         `service "not-ingress" { policy = "write" }`,
			token:       "service-write-on-not-ingress",
			wantDenied:  true,
			cfgSnap:     proxycfg.TestConfigSnapshotIngressGateway(t, true, "tcp", "default", nil, nil, nil),
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			var stopped bool
			lock := &sync.RWMutex{}

			defer func() {
				lock.Lock()
				stopped = true
				lock.Unlock()
			}()

			// aclResolve may be called in a goroutine even after a
			// testcase tt returns. Capture the variable as tc so the
			// values don't swap in the next iteration.
			tc := tt
			aclResolve := func(id string) (acl.Authorizer, error) {
				if !tc.defaultDeny {
					// Allow all
					return acl.RootAuthorizer("allow"), nil
				}
				if tc.acl == "" {
					// No token and defaultDeny is denied
					return acl.RootAuthorizer("deny"), nil
				}

				lock.RLock()
				defer lock.RUnlock()

				if stopped {
					return acl.DenyAll().ToAllowAuthorizer(), nil
				}

				// Ensure the correct token was passed
				require.Equal(t, tc.token, id)
				// Parse the ACL and enforce it
				policy, err := acl.NewPolicyFromSource(tc.acl, nil, nil)
				require.NoError(t, err)
				return acl.NewPolicyAuthorizerWithDefaults(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil)
			}

			scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0)
			mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy

			sid := structs.NewServiceID("web-sidecar-proxy", nil)
			// Register the proxy to create state needed to Watch() on
			mgr.RegisterProxy(t, sid)

			// Deliver a new snapshot
			snap := tt.cfgSnap
			if snap == nil {
				snap = newTestSnapshot(t, nil, "", nil)
			}
			mgr.DeliverConfig(t, sid, snap)

			// Send initial listener discover, in real life Envoy always sends cluster
			// first but it doesn't really matter and listener has a response that
			// includes the token in the ext rbac filter so lets us test more stuff.
			envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)

			// If there is no token, check that we increment the gauge
			if tt.token == "" {
				retry.Run(t, func(r *retry.R) {
					data := scenario.sink.Data()
					require.Len(r, data, 1)

					item := data[0]
					val, ok := item.Gauges["consul.xds.test.xds.server.streamsUnauthenticated"]
					require.True(r, ok)
					require.Equal(r, float32(1), val.Value)
				})
			}

			if !tt.wantDenied {
				assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
					TypeUrl: xdscommon.ListenerType,
					Nonce:   hexString(1),
					Resources: makeTestResources(t,
						makeTestListener(t, snap, "tcp:public_listener"),
						makeTestListener(t, snap, "tcp:db"),
						makeTestListener(t, snap, "tcp:geo-cache"),
					),
				})
				// Close the client stream since all is well. We _don't_ do this in the
				// expected error case because we want to verify the error closes the
				// stream from server side.
				envoy.Close()
			}

			select {
			case err := <-errCh:
				if tt.wantDenied {
					require.Error(t, err)
					status, ok := status.FromError(err)
					require.True(t, ok)
					require.Equal(t, codes.PermissionDenied, status.Code())
					require.Contains(t, err.Error(), "Permission denied")
					mgr.AssertWatchCancelled(t, sid)
				} else {
					require.NoError(t, err)
				}
			case <-time.After(50 * time.Millisecond):
				t.Fatalf("timed out waiting for handler to finish")
			}

			// If there is no token, check that we decrement the gauge
			if tt.token == "" {
				retry.Run(t, func(r *retry.R) {
					data := scenario.sink.Data()
					require.Len(r, data, 1)

					item := data[0]
					val, ok := item.Gauges["consul.xds.test.xds.server.streamsUnauthenticated"]
					require.True(r, ok)
					require.Equal(r, float32(0), val.Value)
				})
			}
		})
	}
}

func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedDuringDiscoveryRequest(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	aclRules := `service "web" { policy = "write" }`
	token := "service-write-on-web"

	policy, err := acl.NewPolicyFromSource(aclRules, nil, nil)
	require.NoError(t, err)

	var validToken atomic.Value
	validToken.Store(token)

	aclResolve := func(id string) (acl.Authorizer, error) {
		if token := validToken.Load(); token == nil || id != token.(string) {
			return nil, acl.ErrNotFound
		}

		return acl.NewPolicyAuthorizerWithDefaults(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil)
	}
	scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token,
		100*time.Millisecond, // Make this short.
	)
	mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy

	getError := func() (gotErr error, ok bool) {
		select {
		case err := <-errCh:
			return err, true
		default:
			return nil, false
		}
	}

	sid := structs.NewServiceID("web-sidecar-proxy", nil)
	// Register the proxy to create state needed to Watch() on
	mgr.RegisterProxy(t, sid)

	// Send initial cluster discover (OK)
	envoy.SendDeltaReq(t, xdscommon.ClusterType, nil)
	{
		err, ok := getError()
		require.NoError(t, err)
		require.False(t, ok)
	}

	// Check no response sent yet
	assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
	{
		err, ok := getError()
		require.NoError(t, err)
		require.False(t, ok)
	}

	// Deliver a new snapshot
	snap := newTestSnapshot(t, nil, "", nil)
	mgr.DeliverConfig(t, sid, snap)

	assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
		TypeUrl: xdscommon.ClusterType,
		Nonce:   hexString(1),
		Resources: makeTestResources(t,
			makeTestCluster(t, snap, "tcp:local_app"),
			makeTestCluster(t, snap, "tcp:db"),
			makeTestCluster(t, snap, "tcp:geo-cache"),
		),
	})

	// It also (in parallel) issues the next cluster request (which acts as an ACK
	// of the version we sent)
	envoy.SendDeltaReq(t, xdscommon.ClusterType, nil)

	// Check no response sent yet
	assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
	{
		err, ok := getError()
		require.NoError(t, err)
		require.False(t, ok)
	}

	// Now nuke the ACL token while there's no activity.
	validToken.Store("")

	select {
	case err := <-errCh:
		require.Error(t, err)
		gerr, ok := status.FromError(err)
		require.Truef(t, ok, "not a grpc status error: type='%T' value=%v", err, err)
		require.Equal(t, codes.Unauthenticated, gerr.Code())
		require.Equal(t, "unauthenticated: ACL not found", gerr.Message())

		mgr.AssertWatchCancelled(t, sid)
	case <-time.After(200 * time.Millisecond):
		t.Fatalf("timed out waiting for handler to finish")
	}
}

func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedInBackground(t *testing.T) {
	if testing.Short() {
		t.Skip("too slow for testing.Short")
	}

	aclRules := `service "web" { policy = "write" }`
	token := "service-write-on-web"

	policy, err := acl.NewPolicyFromSource(aclRules, nil, nil)
	require.NoError(t, err)

	var validToken atomic.Value
	validToken.Store(token)

	aclResolve := func(id string) (acl.Authorizer, error) {
		if token := validToken.Load(); token == nil || id != token.(string) {
			return nil, acl.ErrNotFound
		}

		return acl.NewPolicyAuthorizerWithDefaults(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil)
	}
	scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token,
		100*time.Millisecond, // Make this short.
	)
	mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy

	getError := func() (gotErr error, ok bool) {
		select {
		case err := <-errCh:
			return err, true
		default:
			return nil, false
		}
	}

	sid := structs.NewServiceID("web-sidecar-proxy", nil)
	// Register the proxy to create state needed to Watch() on
	mgr.RegisterProxy(t, sid)

	// Send initial cluster discover (OK)
	envoy.SendDeltaReq(t, xdscommon.ClusterType, nil)
	{
		err, ok := getError()
		require.NoError(t, err)
		require.False(t, ok)
	}

	// Check no response sent yet
	assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
	{
		err, ok := getError()
		require.NoError(t, err)
		require.False(t, ok)
	}

	// Deliver a new snapshot
	snap := newTestSnapshot(t, nil, "", nil)
	mgr.DeliverConfig(t, sid, snap)

	assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
		TypeUrl: xdscommon.ClusterType,
		Nonce:   hexString(1),
		Resources: makeTestResources(t,
			makeTestCluster(t, snap, "tcp:local_app"),
			makeTestCluster(t, snap, "tcp:db"),
			makeTestCluster(t, snap, "tcp:geo-cache"),
		),
	})

	// It also (in parallel) issues the next cluster request (which acts as an ACK
	// of the version we sent)
	envoy.SendDeltaReq(t, xdscommon.ClusterType, nil)

	// Check no response sent yet
	assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
	{
		err, ok := getError()
		require.NoError(t, err)
		require.False(t, ok)
	}

	// Now nuke the ACL token while there's no activity.
	validToken.Store("")

	select {
	case err := <-errCh:
		require.Error(t, err)
		gerr, ok := status.FromError(err)
		require.Truef(t, ok, "not a grpc status error: type='%T' value=%v", err, err)
		require.Equal(t, codes.Unauthenticated, gerr.Code())
		require.Equal(t, "unauthenticated: ACL not found", gerr.Message())

		mgr.AssertWatchCancelled(t, sid)
	case <-time.After(200 * time.Millisecond):
		t.Fatalf("timed out waiting for handler to finish")
	}
}

func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) {
	aclResolve := func(id string) (acl.Authorizer, error) {
		// Allow all
		return acl.RootAuthorizer("manage"), nil
	}
	scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0)
	mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy

	sid := structs.NewServiceID("ingress-gateway", nil)

	// Register the proxy to create state needed to Watch() on
	mgr.RegisterProxy(t, sid)

	// Send initial cluster discover
	envoy.SendDeltaReq(t, xdscommon.ClusterType, nil)

	// Check no response sent yet
	assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

	// Deliver a new snapshot with no services
	snap := proxycfg.TestConfigSnapshotIngressGateway(t, false, "tcp", "default", nil, nil, nil)
	mgr.DeliverConfig(t, sid, snap)

	// REQ: clusters
	envoy.SendDeltaReq(t, xdscommon.ClusterType, nil)

	// RESP: cluster
	assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
		TypeUrl: xdscommon.ClusterType,
		Nonce:   hexString(1),
	})

	assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

	// ACK: clusters
	envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)

	// REQ: listeners
	envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)

	// RESP: listeners
	assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
		TypeUrl: xdscommon.ListenerType,
		Nonce:   hexString(2),
	})

	assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

	envoy.Close()
	select {
	case err := <-errCh:
		require.NoError(t, err)
	case <-time.After(50 * time.Millisecond):
		t.Fatalf("timed out waiting for handler to finish")
	}
}

func TestServer_DeltaAggregatedResources_v3_CapacityReached(t *testing.T) {
	aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil }

	scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
	mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy

	sid := structs.NewServiceID("web-sidecar-proxy", nil)

	mgr.RegisterProxy(t, sid)
	mgr.DrainStreams(sid)

	snap := newTestSnapshot(t, nil, "", nil)

	envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
		InitialResourceVersions: mustMakeVersionMap(t,
			makeTestCluster(t, snap, "tcp:geo-cache"),
		),
	})

	select {
	case err := <-errCh:
		require.Error(t, err)
		require.Equal(t, codes.ResourceExhausted.String(), status.Code(err).String())
	case <-time.After(50 * time.Millisecond):
		t.Fatalf("timed out waiting for handler to finish")
	}
}

func TestServer_DeltaAggregatedResources_v3_CfgSrcTerminated(t *testing.T) {
	aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil }

	scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
	mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy

	sid := structs.NewServiceID("web-sidecar-proxy", nil)

	mgr.RegisterProxy(t, sid)

	snap := newTestSnapshot(t, nil, "", nil)
	envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
		InitialResourceVersions: mustMakeVersionMap(t,
			makeTestCluster(t, snap, "tcp:geo-cache"),
		),
	})
	mgr.CfgSrcTerminate(sid)

	select {
	case err := <-errCh:
		require.Error(t, err)
		require.Equal(t, codes.Internal.String(), status.Code(err).String())
		require.Equal(t, errConfigSyncError, err)
	case <-time.After(50 * time.Millisecond):
		t.Fatalf("timed out waiting for handler to finish")
	}
}

type capacityReachedLimiter struct{}

func (capacityReachedLimiter) BeginSession() (limiter.Session, error) {
	return nil, limiter.ErrCapacityReached
}

func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) {
	aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil }
	scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
	mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy

	sid := structs.NewServiceID("web-sidecar-proxy", nil)

	mgr.RegisterProxy(t, sid)

	testutil.RunStep(t, "successful request/response", func(t *testing.T) {
		snap := newTestSnapshot(t, nil, "", nil)

		envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
			InitialResourceVersions: mustMakeVersionMap(t,
				makeTestCluster(t, snap, "tcp:geo-cache"),
			),
		})

		mgr.DeliverConfig(t, sid, snap)

		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
			TypeUrl: xdscommon.ClusterType,
			Nonce:   hexString(1),
			Resources: makeTestResources(t,
				makeTestCluster(t, snap, "tcp:local_app"),
				makeTestCluster(t, snap, "tcp:db"),
			),
		})
	})

	testutil.RunStep(t, "terminate limiter session", func(t *testing.T) {
		mgr.DrainStreams(sid)

		select {
		case err := <-errCh:
			require.Error(t, err)
			require.Equal(t, codes.ResourceExhausted.String(), status.Code(err).String())
		case <-time.After(50 * time.Millisecond):
			t.Fatalf("timed out waiting for handler to finish")
		}
	})

	testutil.RunStep(t, "check drain counter incremented", func(t *testing.T) {
		data := scenario.sink.Data()
		require.Len(t, data, 1)

		item := data[0]
		require.Len(t, item.Counters, 1)

		val, ok := item.Counters["consul.xds.test.xds.server.streamDrained"]
		require.True(t, ok)
		require.Equal(t, 1, val.Count)
	})

	testutil.RunStep(t, "check streamStart metric recorded", func(t *testing.T) {
		data := scenario.sink.Data()
		require.Len(t, data, 1)

		item := data[0]
		require.Len(t, item.Samples, 1)

		val, ok := item.Samples["consul.xds.test.xds.server.streamStart"]
		require.True(t, ok)
		require.Equal(t, 1, val.Count)
	})
}

func assertDeltaChanBlocked(t *testing.T, ch chan *envoy_discovery_v3.DeltaDiscoveryResponse) {
	t.Helper()
	select {
	case r := <-ch:
		t.Fatalf("chan should block but received: %v", r)
	case <-time.After(10 * time.Millisecond):
		return
	}
}

func assertDeltaResponseSent(t *testing.T, ch chan *envoy_discovery_v3.DeltaDiscoveryResponse, want *envoy_discovery_v3.DeltaDiscoveryResponse) {
	t.Helper()
	select {
	case got := <-ch:
		assertDeltaResponse(t, got, want)
	case <-time.After(50 * time.Millisecond):
		t.Fatalf("no response received after 50ms")
	}
}

// assertDeltaResponse is a helper to test a envoy.DeltaDiscoveryResponse matches the
// expected value. We use JSON during comparison here because the responses use protobuf
// Any type which includes binary protobuf encoding.
func assertDeltaResponse(t *testing.T, got, want *envoy_discovery_v3.DeltaDiscoveryResponse) {
	t.Helper()

	gotJSON := protoToSortedJSON(t, got)
	wantJSON := protoToSortedJSON(t, want)
	require.JSONEqf(t, wantJSON, gotJSON, "got:\n%s", gotJSON)
}

func mustMakeVersionMap(t *testing.T, resources ...proto.Message) map[string]string {
	m := make(map[string]string)
	for _, res := range resources {
		name := xdscommon.GetResourceName(res)
		m[name] = mustHashResource(t, res)
	}
	return m
}

func requireExtensionMetrics(
	t *testing.T,
	scenario *testServerScenario,
	extName string,
	sid structs.ServiceID,
	err error,
) {
	data := scenario.sink.Data()
	require.Len(t, data, 1)
	item := data[0]

	expectLabels := []metrics.Label{
		{Name: "extension", Value: extName},
		{Name: "version", Value: "builtin/" + version.Version},
		{Name: "service", Value: sid.ID},
		{Name: "partition", Value: sid.PartitionOrDefault()},
		{Name: "namespace", Value: sid.NamespaceOrDefault()},
		{Name: "error", Value: strconv.FormatBool(err != nil)},
	}

	for _, s := range []string{
		"consul.xds.test.envoy_extension.validate_arguments;",
		"consul.xds.test.envoy_extension.validate;",
		"consul.xds.test.envoy_extension.extend;",
	} {
		foundLabel := false
		for k, v := range item.Samples {
			if strings.HasPrefix(k, s) {
				foundLabel = true
				require.ElementsMatch(t, expectLabels, v.Labels)
			}
		}
		require.True(t, foundLabel)
	}
}

func Test_validateAndApplyEnvoyExtension_Validations(t *testing.T) {
	type testCase struct {
		name          string
		runtimeConfig extensioncommon.RuntimeConfig
		err           bool
		errString     string
	}

	envoyVersion, _ := goversion.NewVersion("1.25.0")
	consulVersion, _ := goversion.NewVersion("1.16.0")

	svc := api.CompoundServiceName{
		Name:      "s1",
		Partition: "ap1",
		Namespace: "ns1",
	}

	makeRuntimeConfig := func(required bool, consulVersion string, envoyVersion string, args map[string]interface{}) extensioncommon.RuntimeConfig {
		if args == nil {
			args = map[string]interface{}{
				"ARN": "arn:aws:lambda:us-east-1:111111111111:function:lambda-1234",
			}

		}
		return extensioncommon.RuntimeConfig{
			EnvoyExtension: api.EnvoyExtension{
				Name:          api.BuiltinAWSLambdaExtension,
				Required:      required,
				ConsulVersion: consulVersion,
				EnvoyVersion:  envoyVersion,
				Arguments:     args,
			},
			ServiceName: svc,
		}
	}

	cases := []testCase{
		{
			name:          "invalid consul version constraint - required",
			runtimeConfig: makeRuntimeConfig(true, "bad", ">= 1.0", nil),
			err:           true,
			errString:     "failed to parse Consul version constraint for extension",
		},
		{
			name:          "invalid consul version constraint - not required",
			runtimeConfig: makeRuntimeConfig(false, "bad", ">= 1.0", nil),
			err:           false,
		},
		{
			name:          "invalid envoy version constraint - required",
			runtimeConfig: makeRuntimeConfig(true, ">= 1.0", "bad", nil),
			err:           true,
			errString:     "failed to parse Envoy version constraint for extension",
		},
		{
			name:          "invalid envoy version constraint - not required",
			runtimeConfig: makeRuntimeConfig(false, ">= 1.0", "bad", nil),
			err:           false,
		},
		{
			name:          "no envoy version constraint match",
			runtimeConfig: makeRuntimeConfig(false, "", ">= 2.0.0", nil),
			err:           false,
		},
		{
			name:          "no consul version constraint match",
			runtimeConfig: makeRuntimeConfig(false, ">= 2.0.0", "", nil),
			err:           false,
		},
		{
			name:          "invalid extension arguments - required",
			runtimeConfig: makeRuntimeConfig(true, ">= 1.15.0", ">= 1.25.0", map[string]interface{}{"bad": "args"}),
			err:           true,
			errString:     "failed to construct extension",
		},
		{
			name:          "invalid extension arguments - not required",
			runtimeConfig: makeRuntimeConfig(false, ">= 1.15.0", ">= 1.25.0", map[string]interface{}{"bad": "args"}),
			err:           false,
		},
		{
			name:          "valid everything - no resources and required",
			runtimeConfig: makeRuntimeConfig(true, ">= 1.15.0", ">= 1.25.0", nil),
			err:           true,
			errString:     "failed to patch xDS resources in",
		},
		{
			name:          "valid everything",
			runtimeConfig: makeRuntimeConfig(false, ">= 1.15.0", ">= 1.25.0", nil),
			err:           false,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			snap := proxycfg.ConfigSnapshot{
				ProxyID: proxycfg.ProxyID{
					ServiceID: structs.NewServiceID("s1", nil),
				},
			}
			resources, err := validateAndApplyEnvoyExtension(hclog.NewNullLogger(), &snap, nil, tc.runtimeConfig, envoyVersion, consulVersion)
			if tc.err {
				require.Error(t, err)
				require.Contains(t, err.Error(), tc.errString)
			} else {
				require.NoError(t, err)
				require.Nil(t, resources)
			}
		})
	}
}

func Test_applyEnvoyExtension_CanApply(t *testing.T) {
	type testCase struct {
		name     string
		canApply bool
	}

	cases := []testCase{
		{
			name:     "cannot apply: is not applied",
			canApply: false,
		},
		{
			name:     "can apply: is applied",
			canApply: true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			extender := extensioncommon.BasicEnvoyExtender{
				Extension: &maybeCanApplyExtension{
					canApply: tc.canApply,
				},
			}
			config := &extensioncommon.RuntimeConfig{
				Kind:                  api.ServiceKindConnectProxy,
				ServiceName:           api.CompoundServiceName{Name: "api"},
				Upstreams:             map[api.CompoundServiceName]*extensioncommon.UpstreamData{},
				IsSourcedFromUpstream: false,
				EnvoyExtension: api.EnvoyExtension{
					Name:     "maybeCanApplyExtension",
					Required: false,
				},
			}
			listener := &envoy_listener_v3.Listener{
				Name:                  xdscommon.OutboundListenerName,
				IgnoreGlobalConnLimit: false,
			}
			indexedResources := xdscommon.IndexResources(testutil.Logger(t), map[string][]proto.Message{
				xdscommon.ListenerType: {
					listener,
				},
			})

			result, err := applyEnvoyExtension(&extender, indexedResources, config)
			require.NoError(t, err)
			resultListener := result.Index[xdscommon.ListenerType][xdscommon.OutboundListenerName].(*envoy_listener_v3.Listener)
			require.Equal(t, tc.canApply, resultListener.IgnoreGlobalConnLimit)
		})
	}
}

func Test_applyEnvoyExtension_PartialApplicationDisallowed(t *testing.T) {
	type testCase struct {
		name            string
		fail            bool
		returnOnFailure bool
	}

	cases := []testCase{
		{
			name:            "failure: returns nothing",
			fail:            true,
			returnOnFailure: false,
		},
		// Not expected, but cover to be sure.
		{
			name:            "failure: returns values",
			fail:            true,
			returnOnFailure: true,
		},
		// Ensure that under normal circumstances, the extension would succeed in
		// modifying resources.
		{
			name: "success: resources modified",
			fail: false,
		},
	}

	for _, tc := range cases {
		for _, indexType := range []string{
			xdscommon.ListenerType,
			xdscommon.ClusterType,
		} {
			typeShortName := indexType[strings.LastIndex(indexType, ".")+1:]
			t.Run(fmt.Sprintf("%s: %s", tc.name, typeShortName), func(t *testing.T) {
				extender := extensioncommon.BasicEnvoyExtender{
					Extension: &partialFailureExtension{
						returnOnFailure: tc.returnOnFailure,
						// Alternate which resource fails so that we can test for
						// partial modification independent of patch order.
						failListener: tc.fail && indexType == xdscommon.ListenerType,
						failCluster:  tc.fail && indexType == xdscommon.ClusterType,
					},
				}
				config := &extensioncommon.RuntimeConfig{
					Kind:                  api.ServiceKindConnectProxy,
					ServiceName:           api.CompoundServiceName{Name: "api"},
					Upstreams:             map[api.CompoundServiceName]*extensioncommon.UpstreamData{},
					IsSourcedFromUpstream: false,
					EnvoyExtension: api.EnvoyExtension{
						Name:     "partialFailureExtension",
						Required: false,
					},
				}
				cluster := &envoy_cluster_v3.Cluster{
					Name:          xdscommon.LocalAppClusterName,
					RespectDnsTtl: false,
				}
				listener := &envoy_listener_v3.Listener{
					Name:                  xdscommon.OutboundListenerName,
					IgnoreGlobalConnLimit: false,
				}
				indexedResources := xdscommon.IndexResources(testutil.Logger(t), map[string][]proto.Message{
					xdscommon.ClusterType: {
						cluster,
					},
					xdscommon.ListenerType: {
						listener,
					},
				})

				result, err := applyEnvoyExtension(&extender, indexedResources, config)
				if tc.fail {
					require.Error(t, err)
				} else {
					require.NoError(t, err)
				}

				resultListener := result.Index[xdscommon.ListenerType][xdscommon.OutboundListenerName].(*envoy_listener_v3.Listener)
				resultCluster := result.Index[xdscommon.ClusterType][xdscommon.LocalAppClusterName].(*envoy_cluster_v3.Cluster)
				require.Equal(t, !tc.fail, resultListener.IgnoreGlobalConnLimit)
				require.Equal(t, !tc.fail, resultCluster.RespectDnsTtl)

				// Regardless of success, original values should not be modified.
				originalListener := indexedResources.Index[xdscommon.ListenerType][xdscommon.OutboundListenerName].(*envoy_listener_v3.Listener)
				originalCluster := indexedResources.Index[xdscommon.ClusterType][xdscommon.LocalAppClusterName].(*envoy_cluster_v3.Cluster)
				require.False(t, originalListener.IgnoreGlobalConnLimit)
				require.False(t, originalCluster.RespectDnsTtl)
			})
		}
	}
}

func Test_applyEnvoyExtension_HandlesPanics(t *testing.T) {
	type testCase struct {
		name            string
		panicOnCanApply bool
		panicOnPatch    bool
	}

	cases := []testCase{
		{
			name:            "panic: CanApply",
			panicOnCanApply: true,
		},
		{
			name:         "panic: Extend",
			panicOnPatch: true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			extension := &maybePanicExtension{
				panicOnCanApply: tc.panicOnCanApply,
				panicOnPatch:    tc.panicOnPatch,
			}
			extender := extensioncommon.BasicEnvoyExtender{
				Extension: extension,
			}
			config := &extensioncommon.RuntimeConfig{
				Kind:                  api.ServiceKindConnectProxy,
				ServiceName:           api.CompoundServiceName{Name: "api"},
				Upstreams:             map[api.CompoundServiceName]*extensioncommon.UpstreamData{},
				IsSourcedFromUpstream: false,
				EnvoyExtension: api.EnvoyExtension{
					Name:     "maybePanicExtension",
					Required: false,
				},
			}
			listener := &envoy_listener_v3.Listener{
				Name:                  xdscommon.OutboundListenerName,
				IgnoreGlobalConnLimit: false,
			}
			indexedResources := xdscommon.IndexResources(testutil.Logger(t), map[string][]proto.Message{
				xdscommon.ListenerType: {
					listener,
				},
			})

			_, err := applyEnvoyExtension(&extender, indexedResources, config)

			// We did not panic, good.
			// First assert our test is valid by forcing a panic, then check the error message that was returned.
			if tc.panicOnCanApply {
				require.PanicsWithError(t, "this is an expected failure in CanApply", func() {
					extension.CanApply(config)
				})
				require.ErrorContains(t, err, "attempt to apply Envoy extension \"maybePanicExtension\" caused an unexpected panic: this is an expected failure in CanApply")
			}
			if tc.panicOnPatch {
				require.PanicsWithError(t, "this is an expected failure in PatchListener", func() {
					_, _, _ = extension.PatchListener(config.GetListenerPayload(listener))
				})
				require.ErrorContains(t, err, "attempt to apply Envoy extension \"maybePanicExtension\" caused an unexpected panic: this is an expected failure in PatchListener")
			}
		})
	}
}

type maybeCanApplyExtension struct {
	extensioncommon.BasicExtensionAdapter
	canApply bool
}

var _ extensioncommon.BasicExtension = (*maybeCanApplyExtension)(nil)

func (m *maybeCanApplyExtension) CanApply(_ *extensioncommon.RuntimeConfig) bool {
	return m.canApply
}

func (m *maybeCanApplyExtension) PatchListener(payload extensioncommon.ListenerPayload) (*envoy_listener_v3.Listener, bool, error) {
	payload.Message.IgnoreGlobalConnLimit = true
	return payload.Message, true, nil
}

type partialFailureExtension struct {
	extensioncommon.BasicExtensionAdapter
	returnOnFailure bool
	failCluster     bool
	failListener    bool
}

var _ extensioncommon.BasicExtension = (*partialFailureExtension)(nil)

func (p *partialFailureExtension) CanApply(_ *extensioncommon.RuntimeConfig) bool {
	return true
}

func (p *partialFailureExtension) PatchListener(payload extensioncommon.ListenerPayload) (*envoy_listener_v3.Listener, bool, error) {
	// Modify original input message
	payload.Message.IgnoreGlobalConnLimit = true

	err := fmt.Errorf("oops - listener patch failed")
	if !p.failListener {
		err = nil
	}

	returnMsg := payload.Message
	if err != nil && !p.returnOnFailure {
		returnMsg = nil
	}

	patched := err == nil || p.returnOnFailure

	return returnMsg, patched, err
}

func (p *partialFailureExtension) PatchCluster(payload extensioncommon.ClusterPayload) (*envoy_cluster_v3.Cluster, bool, error) {
	// Modify original input message
	payload.Message.RespectDnsTtl = true

	err := fmt.Errorf("oops - cluster patch failed")
	if !p.failCluster {
		err = nil
	}

	returnMsg := payload.Message
	if err != nil && !p.returnOnFailure {
		returnMsg = nil
	}

	patched := err == nil || p.returnOnFailure

	return returnMsg, patched, err
}

type maybePanicExtension struct {
	extensioncommon.BasicExtensionAdapter
	panicOnCanApply bool
	panicOnPatch    bool
}

var _ extensioncommon.BasicExtension = (*maybePanicExtension)(nil)

func (m *maybePanicExtension) CanApply(_ *extensioncommon.RuntimeConfig) bool {
	if m.panicOnCanApply {
		panic(fmt.Errorf("this is an expected failure in CanApply"))
	}
	return true
}

func (m *maybePanicExtension) PatchListener(payload extensioncommon.ListenerPayload) (*envoy_listener_v3.Listener, bool, error) {
	if m.panicOnPatch {
		panic(fmt.Errorf("this is an expected failure in PatchListener"))
	}
	payload.Message.IgnoreGlobalConnLimit = true
	return payload.Message, true, nil
}