Make an xdscommon package that will be shared between Consul and Envoy plugins

This commit is contained in:
Eric 2022-03-08 14:37:24 -05:00
parent abfcde1bc6
commit f5c9fa6fa6
9 changed files with 194 additions and 179 deletions

View File

@ -16,6 +16,7 @@ import (
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/proxysupport"
"github.com/hashicorp/consul/agent/xds/xdscommon"
"github.com/hashicorp/consul/sdk/testutil"
)
@ -491,7 +492,7 @@ func TestClustersFromSnapshot(t *testing.T) {
return clusters[i].(*envoy_cluster_v3.Cluster).Name < clusters[j].(*envoy_cluster_v3.Cluster).Name
})
r, err := createResponse(ClusterType, "00000001", "00000001", clusters)
r, err := createResponse(xdscommon.ClusterType, "00000001", "00000001", clusters)
require.NoError(t, err)
t.Run("current", func(t *testing.T) {

View File

@ -23,6 +23,7 @@ import (
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/xdscommon"
"github.com/hashicorp/consul/logging"
)
@ -93,7 +94,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
// resourceMap is the SoTW we are incrementally attempting to sync to envoy.
//
// type => name => proto
resourceMap = emptyIndexedResources()
resourceMap = xdscommon.EmptyIndexedResources()
// currentVersions is the the xDS versioning represented by Resources.
//
@ -113,20 +114,20 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
// Configure handlers for each type of request we currently care about.
handlers := map[string]*xDSDeltaType{
ListenerType: newDeltaType(generator, stream, ListenerType, func(kind structs.ServiceKind) bool {
xdscommon.ListenerType: newDeltaType(generator, stream, xdscommon.ListenerType, func(kind structs.ServiceKind) bool {
return cfgSnap.Kind == structs.ServiceKindIngressGateway
}),
RouteType: newDeltaType(generator, stream, RouteType, func(kind structs.ServiceKind) bool {
xdscommon.RouteType: newDeltaType(generator, stream, xdscommon.RouteType, func(kind structs.ServiceKind) bool {
return cfgSnap.Kind == structs.ServiceKindIngressGateway
}),
ClusterType: newDeltaType(generator, stream, ClusterType, func(kind structs.ServiceKind) bool {
xdscommon.ClusterType: newDeltaType(generator, stream, xdscommon.ClusterType, func(kind structs.ServiceKind) bool {
// Mesh, Ingress, and Terminating gateways are allowed to inform CDS of
// no clusters.
return cfgSnap.Kind == structs.ServiceKindMeshGateway ||
cfgSnap.Kind == structs.ServiceKindTerminatingGateway ||
cfgSnap.Kind == structs.ServiceKindIngressGateway
}),
EndpointType: newDeltaType(generator, stream, EndpointType, nil),
xdscommon.EndpointType: newDeltaType(generator, stream, xdscommon.EndpointType, nil),
}
// Endpoints are stored within a Cluster (and Routes
@ -138,8 +139,8 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
// representation of envoy state to force an update.
//
// see: https://github.com/envoyproxy/envoy/issues/13009
handlers[ListenerType].childType = handlers[RouteType]
handlers[ClusterType].childType = handlers[EndpointType]
handlers[xdscommon.ListenerType].childType = handlers[xdscommon.RouteType]
handlers[xdscommon.ClusterType].childType = handlers[xdscommon.EndpointType]
var authTimer <-chan time.Time
extendAuthTimer := func() {
@ -332,18 +333,18 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
var xDSUpdateOrder = []xDSUpdateOperation{
// 1. CDS updates (if any) must always be pushed first.
{TypeUrl: ClusterType, Upsert: true},
{TypeUrl: xdscommon.ClusterType, Upsert: true},
// 2. EDS updates (if any) must arrive after CDS updates for the respective clusters.
{TypeUrl: EndpointType, Upsert: true},
{TypeUrl: xdscommon.EndpointType, Upsert: true},
// 3. LDS updates must arrive after corresponding CDS/EDS updates.
{TypeUrl: ListenerType, Upsert: true, Remove: true},
{TypeUrl: xdscommon.ListenerType, Upsert: true, Remove: true},
// 4. RDS updates related to the newly added listeners must arrive after CDS/EDS/LDS updates.
{TypeUrl: RouteType, Upsert: true, Remove: true},
{TypeUrl: xdscommon.RouteType, Upsert: true, Remove: true},
// 5. (NOT IMPLEMENTED YET IN CONSUL) VHDS updates (if any) related to the newly added RouteConfigurations must arrive after RDS updates.
// {},
// 6. Stale CDS clusters and related EDS endpoints (ones no longer being referenced) can then be removed.
{TypeUrl: ClusterType, Remove: true},
{TypeUrl: EndpointType, Remove: true},
{TypeUrl: xdscommon.ClusterType, Remove: true},
{TypeUrl: xdscommon.EndpointType, Remove: true},
// xDS updates can be pushed independently if no new
// clusters/routes/listeners are added or if its acceptable to
// temporarily drop traffic during updates. Note that in case of
@ -464,7 +465,7 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf su
if sf.ForceLDSandCDSToAlwaysUseWildcardsOnReconnect {
switch t.typeURL {
case ListenerType, ClusterType:
case xdscommon.ListenerType, xdscommon.ClusterType:
if !t.wildcard {
t.wildcard = true
logger.Trace("fixing Envoy bug fixed in 1.19.0 by inferring wildcard mode for type")
@ -628,7 +629,7 @@ func (t *xDSDeltaType) nack(nonce string) {
func (t *xDSDeltaType) SendIfNew(
kind structs.ServiceKind,
currentVersions map[string]string, // type => name => version (as consul knows right now)
resourceMap *IndexedResources,
resourceMap *xdscommon.IndexedResources,
nonce *uint64,
upsert, remove bool,
) (error, bool) {
@ -688,7 +689,7 @@ func (t *xDSDeltaType) SendIfNew(
func (t *xDSDeltaType) createDeltaResponse(
currentVersions map[string]string, // name => version (as consul knows right now)
resourceMap *IndexedResources,
resourceMap *xdscommon.IndexedResources,
upsert, remove bool,
) (*envoy_discovery_v3.DeltaDiscoveryResponse, map[string]PendingUpdate, error) {
// compute difference
@ -797,7 +798,7 @@ func (t *xDSDeltaType) createDeltaResponse(
return resp, realUpdates, nil
}
func computeResourceVersions(resourceMap *IndexedResources) (map[string]map[string]string, error) {
func computeResourceVersions(resourceMap *xdscommon.IndexedResources) (map[string]map[string]string, error) {
out := make(map[string]map[string]string)
for typeUrl, resources := range resourceMap.Index {
m, err := hashResourceMap(resources)
@ -809,52 +810,27 @@ func computeResourceVersions(resourceMap *IndexedResources) (map[string]map[stri
return out, nil
}
type IndexedResources struct {
// Index is a map of typeURL => resourceName => resource
Index map[string]map[string]proto.Message
// ChildIndex is a map of typeURL => parentResourceName => list of
// childResourceNames. This only applies if the child and parent do not
// share a name.
ChildIndex map[string]map[string][]string
}
func emptyIndexedResources() *IndexedResources {
return &IndexedResources{
Index: map[string]map[string]proto.Message{
ListenerType: make(map[string]proto.Message),
RouteType: make(map[string]proto.Message),
ClusterType: make(map[string]proto.Message),
EndpointType: make(map[string]proto.Message),
},
ChildIndex: map[string]map[string][]string{
ListenerType: make(map[string][]string),
ClusterType: make(map[string][]string),
},
}
}
func populateChildIndexMap(resourceMap *IndexedResources) error {
func populateChildIndexMap(resourceMap *xdscommon.IndexedResources) error {
// LDS and RDS have a more complicated relationship.
for name, res := range resourceMap.Index[ListenerType] {
for name, res := range resourceMap.Index[xdscommon.ListenerType] {
listener := res.(*envoy_listener_v3.Listener)
rdsRouteNames, err := extractRdsResourceNames(listener)
if err != nil {
return err
}
resourceMap.ChildIndex[ListenerType][name] = rdsRouteNames
resourceMap.ChildIndex[xdscommon.ListenerType][name] = rdsRouteNames
}
// CDS and EDS share exact names.
for name := range resourceMap.Index[ClusterType] {
resourceMap.ChildIndex[ClusterType][name] = []string{name}
for name := range resourceMap.Index[xdscommon.ClusterType] {
resourceMap.ChildIndex[xdscommon.ClusterType][name] = []string{name}
}
return nil
}
func indexResources(logger hclog.Logger, resources map[string][]proto.Message) *IndexedResources {
data := emptyIndexedResources()
func indexResources(logger hclog.Logger, resources map[string][]proto.Message) *xdscommon.IndexedResources {
data := xdscommon.EmptyIndexedResources()
for typeURL, typeRes := range resources {
for _, res := range typeRes {

View File

@ -18,6 +18,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/xdscommon"
)
// NOTE: For these tests, prefer not using xDS protobuf "factory" methods if
@ -46,7 +47,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
// Send initial cluster discover. We'll assume we are testing a partial
// reconnect and include some initial resource versions that will be
// cleaned up.
envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
InitialResourceVersions: mustMakeVersionMap(t,
makeTestCluster(t, snap, "tcp:geo-cache"),
),
@ -63,7 +64,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
runStep(t, "first sync", func(t *testing.T) {
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
@ -73,7 +74,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
})
// Envoy then tries to discover endpoints for those clusters.
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
// We'll assume we are testing a partial "reconnect"
InitialResourceVersions: mustMakeVersionMap(t,
makeTestEndpoints(t, snap, "tcp:geo-cache"),
@ -89,14 +90,14 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, ClusterType, 1)
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
@ -109,14 +110,14 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
envoy.SendDeltaReq(t, ListenerType, nil)
envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, EndpointType, 2)
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
TypeUrl: xdscommon.ListenerType,
Nonce: hexString(3),
Resources: makeTestResources(t,
makeTestListener(t, snap, "tcp:public_listener"),
@ -129,25 +130,25 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener
envoy.SendDeltaReqACK(t, ListenerType, 3)
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
// If we re-subscribe to something even if there are no changes we get a
// fresh copy.
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
},
})
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(4),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})
envoy.SendDeltaReqACK(t, EndpointType, 4)
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4)
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
@ -159,7 +160,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
}
runStep(t, "avoid sending config for unsubscribed resource", func(t *testing.T) {
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesUnsubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
},
@ -185,20 +186,20 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// and fix the subscription
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
},
})
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(5),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
),
})
envoy.SendDeltaReqACK(t, EndpointType, 5)
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 5)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
@ -247,7 +248,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
snap.Port = 1
// Send initial cluster discover.
envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})
envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
@ -260,7 +261,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
runStep(t, "first sync", func(t *testing.T) {
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
@ -270,7 +271,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
})
// Envoy then tries to discover endpoints for those clusters.
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
@ -278,14 +279,14 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, ClusterType, 1)
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
@ -297,14 +298,14 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
envoy.SendDeltaReq(t, ListenerType, nil)
envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, EndpointType, 2)
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
TypeUrl: xdscommon.ListenerType,
Nonce: hexString(3),
Resources: makeTestResources(t,
// Response contains public_listener with port that Envoy can't bind to
@ -318,7 +319,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// NACKs the listener update due to the bad public listener
envoy.SendDeltaReqNACK(t, ListenerType, 3, &rpcstatus.Status{})
envoy.SendDeltaReqNACK(t, xdscommon.ListenerType, 3, &rpcstatus.Status{})
// Consul should not respond until a new snapshot is delivered
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
@ -331,7 +332,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
// And should send a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
TypeUrl: xdscommon.ListenerType,
Nonce: hexString(4),
Resources: makeTestResources(t,
// Send a public listener that Envoy will accept
@ -342,7 +343,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
})
// New listener is acked now
envoy.SendDeltaReqACK(t, EndpointType, 4)
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
@ -370,7 +371,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
mgr.RegisterProxy(t, sid)
// Send initial cluster discover (empty payload)
envoy.SendDeltaReq(t, ClusterType, nil)
envoy.SendDeltaReq(t, xdscommon.ClusterType, nil)
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
@ -385,7 +386,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
runStep(t, "no-rds", func(t *testing.T) {
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
@ -395,7 +396,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
})
// Envoy then tries to discover endpoints for those clusters.
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
@ -403,14 +404,14 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, ClusterType, 1)
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "http2:db"),
@ -422,14 +423,14 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
envoy.SendDeltaReq(t, ListenerType, nil)
envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, EndpointType, 2)
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
TypeUrl: xdscommon.ListenerType,
Nonce: hexString(3),
Resources: makeTestResources(t,
makeTestListener(t, snap, "tcp:public_listener"),
@ -442,7 +443,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener
envoy.SendDeltaReqACK(t, ListenerType, 3)
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
@ -464,7 +465,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
runStep(t, "with-rds", func(t *testing.T) {
// Just the "db" listener sees a change
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
TypeUrl: xdscommon.ListenerType,
Nonce: hexString(4),
Resources: makeTestResources(t,
makeTestListener(t, snap, "http2:db:rds"),
@ -475,25 +476,25 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends routes request
envoy.SendDeltaReq(t, RouteType, &envoy_discovery_v3.DeltaDiscoveryRequest{
envoy.SendDeltaReq(t, xdscommon.RouteType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db",
},
})
// ACKs the listener
envoy.SendDeltaReqACK(t, ListenerType, 4)
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 4)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: RouteType,
TypeUrl: xdscommon.RouteType,
Nonce: hexString(5),
Resources: makeTestResources(t,
makeTestRoute(t, "http2:db"),
),
})
envoy.SendDeltaReqACK(t, RouteType, 5)
envoy.SendDeltaReqACK(t, xdscommon.RouteType, 5)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
@ -520,11 +521,11 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
// This mutateFn causes any endpoint with a name containing "geo-cache" to be
// omitted from the response while the hack is active.
var slowHackDisabled uint32
server.ResourceMapMutateFn = func(resourceMap *IndexedResources) {
server.ResourceMapMutateFn = func(resourceMap *xdscommon.IndexedResources) {
if atomic.LoadUint32(&slowHackDisabled) == 1 {
return
}
if em, ok := resourceMap.Index[EndpointType]; ok {
if em, ok := resourceMap.Index[xdscommon.EndpointType]; ok {
for k := range em {
if strings.Contains(k, "geo-cache") {
delete(em, k)
@ -543,7 +544,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
snap = newTestSnapshot(t, nil, "")
// Send initial cluster discover.
envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})
envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
@ -554,7 +555,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
mgr.DeliverConfig(t, sid, snap)
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
@ -564,7 +565,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
})
// Envoy then tries to discover endpoints for those clusters.
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
@ -572,7 +573,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, ClusterType, 1)
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
@ -581,7 +582,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
//
// NOTE: we do NOT return back geo-cache yet
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
@ -593,14 +594,14 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
envoy.SendDeltaReq(t, ListenerType, nil)
envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, EndpointType, 2)
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
TypeUrl: xdscommon.ListenerType,
Nonce: hexString(3),
Resources: makeTestResources(t,
makeTestListener(t, snap, "tcp:public_listener"),
@ -613,7 +614,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener
envoy.SendDeltaReqACK(t, ListenerType, 3)
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
})
// Disable hack. Need to wait for one more event to wake up the loop.
@ -626,7 +627,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
mgr.DeliverConfig(t, sid, snap)
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(4),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:geo-cache"),
@ -637,7 +638,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, EndpointType, 4)
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4)
})
@ -674,7 +675,7 @@ func TestServer_DeltaAggregatedResources_v3_GetAllClusterAfterConsulRestarted(t
// This is to simulate the discovery request call from envoy after disconnected from consul ads stream.
//
// We need to force it to be an older version of envoy so that the logic shifts.
envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"local_app",
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
@ -697,7 +698,7 @@ func TestServer_DeltaAggregatedResources_v3_GetAllClusterAfterConsulRestarted(t
mgr.DeliverConfig(t, sid, snap)
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
@ -734,7 +735,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
snap = newTestSnapshot(t, nil, "")
// Send initial cluster discover.
envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})
envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
@ -745,7 +746,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
mgr.DeliverConfig(t, sid, snap)
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
@ -755,7 +756,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
})
// Envoy then tries to discover endpoints for those clusters.
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
@ -763,14 +764,14 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, ClusterType, 1)
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
@ -782,14 +783,14 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
envoy.SendDeltaReq(t, ListenerType, nil)
envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, EndpointType, 2)
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
TypeUrl: xdscommon.ListenerType,
Nonce: hexString(3),
Resources: makeTestResources(t,
makeTestListener(t, snap, "tcp:public_listener"),
@ -802,7 +803,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener
envoy.SendDeltaReqACK(t, ListenerType, 3)
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
})
runStep(t, "trigger cluster update needing implicit endpoint replacements", func(t *testing.T) {
@ -816,7 +817,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
// The cluster is updated
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(4),
Resources: makeTestResources(t,
// SAME makeTestCluster(t, snap, "tcp:local_app"),
@ -825,19 +826,19 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
),
})
envoy.SendDeltaReqACK(t, ClusterType, 4)
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 4)
// And we re-send the endpoints for the updated cluster after getting the
// ACK for the cluster.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(5),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
// SAME makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})
envoy.SendDeltaReqACK(t, EndpointType, 5)
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 5)
// And no other response yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
@ -869,7 +870,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
runStep(t, "get into initial state", func(t *testing.T) {
// Send initial cluster discover (empty payload)
envoy.SendDeltaReq(t, ClusterType, nil)
envoy.SendDeltaReq(t, xdscommon.ClusterType, nil)
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
@ -887,7 +888,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
mgr.DeliverConfig(t, sid, snap)
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
@ -897,7 +898,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
})
// Envoy then tries to discover endpoints for those clusters.
envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
@ -905,14 +906,14 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
})
// It also (in parallel) issues the cluster ACK
envoy.SendDeltaReqACK(t, ClusterType, 1)
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "http2:db"),
@ -924,14 +925,14 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends listener request
envoy.SendDeltaReq(t, ListenerType, nil)
envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)
// It also (in parallel) issues the endpoint ACK
envoy.SendDeltaReqACK(t, EndpointType, 2)
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
TypeUrl: xdscommon.ListenerType,
Nonce: hexString(3),
Resources: makeTestResources(t,
makeTestListener(t, snap, "tcp:public_listener"),
@ -944,25 +945,25 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// Envoy now sends routes request
envoy.SendDeltaReq(t, RouteType, &envoy_discovery_v3.DeltaDiscoveryRequest{
envoy.SendDeltaReq(t, xdscommon.RouteType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db",
},
})
// ACKs the listener
envoy.SendDeltaReqACK(t, ListenerType, 3)
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
// And should get a response immediately.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: RouteType,
TypeUrl: xdscommon.RouteType,
Nonce: hexString(4),
Resources: makeTestResources(t,
makeTestRoute(t, "http2:db"),
),
})
envoy.SendDeltaReqACK(t, RouteType, 4)
envoy.SendDeltaReqACK(t, xdscommon.RouteType, 4)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
@ -984,14 +985,14 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
// db cluster is refreshed (unrelated to the test scenario other than it's required)
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(5),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "http:db"),
),
})
envoy.SendDeltaReqACK(t, ClusterType, 5)
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 5)
// The behaviors of Cluster updates triggering re-sends of Endpoint updates
// tested in TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints
@ -999,18 +1000,18 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
// this exchange to get to the part we care about.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: EndpointType,
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(6),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "http:db"),
),
})
envoy.SendDeltaReqACK(t, EndpointType, 6)
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 6)
// the listener is updated
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
TypeUrl: xdscommon.ListenerType,
Nonce: hexString(7),
Resources: makeTestResources(t,
makeTestListener(t, snap, "http:db:rds"),
@ -1018,18 +1019,18 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
})
// ACKs the listener
envoy.SendDeltaReqACK(t, ListenerType, 7)
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 7)
// THE ACTUAL THING WE CARE ABOUT: replaced route config
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: RouteType,
TypeUrl: xdscommon.RouteType,
Nonce: hexString(8),
Resources: makeTestResources(t,
makeTestRoute(t, "http2:db"),
),
})
envoy.SendDeltaReqACK(t, RouteType, 8)
envoy.SendDeltaReqACK(t, xdscommon.RouteType, 8)
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
})
@ -1134,11 +1135,11 @@ func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) {
// Send initial listener discover, in real life Envoy always sends cluster
// first but it doesn't really matter and listener has a response that
// includes the token in the ext rbac filter so lets us test more stuff.
envoy.SendDeltaReq(t, ListenerType, nil)
envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)
if !tt.wantDenied {
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
TypeUrl: xdscommon.ListenerType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestListener(t, snap, "tcp:public_listener"),
@ -1208,7 +1209,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedDuri
mgr.RegisterProxy(t, sid)
// Send initial cluster discover (OK)
envoy.SendDeltaReq(t, ClusterType, nil)
envoy.SendDeltaReq(t, xdscommon.ClusterType, nil)
{
err, ok := getError()
require.NoError(t, err)
@ -1228,7 +1229,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedDuri
mgr.DeliverConfig(t, sid, snap)
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
@ -1239,7 +1240,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedDuri
// It also (in parallel) issues the next cluster request (which acts as an ACK
// of the version we sent)
envoy.SendDeltaReq(t, ClusterType, nil)
envoy.SendDeltaReq(t, xdscommon.ClusterType, nil)
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
@ -1306,7 +1307,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedInBa
mgr.RegisterProxy(t, sid)
// Send initial cluster discover (OK)
envoy.SendDeltaReq(t, ClusterType, nil)
envoy.SendDeltaReq(t, xdscommon.ClusterType, nil)
{
err, ok := getError()
require.NoError(t, err)
@ -1326,7 +1327,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedInBa
mgr.DeliverConfig(t, sid, snap)
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
@ -1337,7 +1338,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedInBa
// It also (in parallel) issues the next cluster request (which acts as an ACK
// of the version we sent)
envoy.SendDeltaReq(t, ClusterType, nil)
envoy.SendDeltaReq(t, xdscommon.ClusterType, nil)
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
@ -1378,7 +1379,7 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) {
mgr.RegisterProxy(t, sid)
// Send initial cluster discover
envoy.SendDeltaReq(t, ClusterType, nil)
envoy.SendDeltaReq(t, xdscommon.ClusterType, nil)
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
@ -1388,25 +1389,25 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) {
mgr.DeliverConfig(t, sid, snap)
// REQ: clusters
envoy.SendDeltaReq(t, ClusterType, nil)
envoy.SendDeltaReq(t, xdscommon.ClusterType, nil)
// RESP: cluster
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
})
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACK: clusters
envoy.SendDeltaReqACK(t, ClusterType, 1)
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// REQ: listeners
envoy.SendDeltaReq(t, ListenerType, nil)
envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)
// RESP: listeners
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ListenerType,
TypeUrl: xdscommon.ListenerType,
Nonce: hexString(2),
})

View File

@ -15,6 +15,7 @@ import (
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/proxysupport"
"github.com/hashicorp/consul/agent/xds/xdscommon"
"github.com/hashicorp/consul/sdk/testutil"
)
@ -512,7 +513,7 @@ func TestEndpointsFromSnapshot(t *testing.T) {
sort.Slice(endpoints, func(i, j int) bool {
return endpoints[i].(*envoy_endpoint_v3.ClusterLoadAssignment).ClusterName < endpoints[j].(*envoy_endpoint_v3.ClusterLoadAssignment).ClusterName
})
r, err := createResponse(EndpointType, "00000001", "00000001", endpoints)
r, err := createResponse(xdscommon.EndpointType, "00000001", "00000001", endpoints)
require.NoError(t, err)
t.Run("current", func(t *testing.T) {

View File

@ -17,6 +17,7 @@ import (
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/proxysupport"
"github.com/hashicorp/consul/agent/xds/xdscommon"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/types"
)
@ -667,7 +668,7 @@ func TestListenersFromSnapshot(t *testing.T) {
return listeners[i].(*envoy_listener_v3.Listener).Name < listeners[j].(*envoy_listener_v3.Listener).Name
})
r, err := createResponse(ListenerType, "00000001", "00000001", listeners)
r, err := createResponse(xdscommon.ListenerType, "00000001", "00000001", listeners)
require.NoError(t, err)
t.Run("current", func(t *testing.T) {

View File

@ -7,6 +7,7 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/xds/xdscommon"
)
// ResourceGenerator is associated with a single gRPC stream and creates xDS
@ -36,7 +37,7 @@ func newResourceGenerator(
func (g *ResourceGenerator) allResourcesFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) (map[string][]proto.Message, error) {
all := make(map[string][]proto.Message)
for _, typeUrl := range []string{ListenerType, RouteType, ClusterType, EndpointType} {
for _, typeUrl := range []string{xdscommon.ListenerType, xdscommon.RouteType, xdscommon.ClusterType, xdscommon.EndpointType} {
res, err := g.resourcesFromSnapshot(typeUrl, cfgSnap)
if err != nil {
return nil, fmt.Errorf("failed to generate xDS resources for %q: %v", typeUrl, err)
@ -48,13 +49,13 @@ func (g *ResourceGenerator) allResourcesFromSnapshot(cfgSnap *proxycfg.ConfigSna
func (g *ResourceGenerator) resourcesFromSnapshot(typeUrl string, cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
switch typeUrl {
case ListenerType:
case xdscommon.ListenerType:
return g.listenersFromSnapshot(cfgSnap)
case RouteType:
case xdscommon.RouteType:
return g.routesFromSnapshot(cfgSnap)
case ClusterType:
case xdscommon.ClusterType:
return g.clustersFromSnapshot(cfgSnap)
case EndpointType:
case xdscommon.EndpointType:
return g.endpointsFromSnapshot(cfgSnap)
default:
return nil, fmt.Errorf("unknown typeUrl: %s", typeUrl)

View File

@ -15,6 +15,7 @@ import (
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/proxysupport"
"github.com/hashicorp/consul/agent/xds/xdscommon"
"github.com/hashicorp/consul/sdk/testutil"
)
@ -201,7 +202,7 @@ func TestRoutesFromSnapshot(t *testing.T) {
sort.Slice(routes, func(i, j int) bool {
return routes[i].(*envoy_route_v3.RouteConfiguration).Name < routes[j].(*envoy_route_v3.RouteConfiguration).Name
})
r, err := createResponse(RouteType, "00000001", "00000001", routes)
r, err := createResponse(xdscommon.RouteType, "00000001", "00000001", routes)
require.NoError(t, err)
t.Run("current", func(t *testing.T) {

View File

@ -23,6 +23,7 @@ import (
agentgrpc "github.com/hashicorp/consul/agent/grpc"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/xdscommon"
"github.com/hashicorp/consul/tlsutil"
)
@ -37,23 +38,6 @@ var StatsGauges = []prometheus.GaugeDefinition{
type ADSStream = envoy_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer
const (
// Resource types in xDS v3. These are copied from
// envoyproxy/go-control-plane/pkg/resource/v3/resource.go since we don't need any of
// the rest of that package.
apiTypePrefix = "type.googleapis.com/"
// EndpointType is the TypeURL for Endpoint discovery responses.
EndpointType = apiTypePrefix + "envoy.config.endpoint.v3.ClusterLoadAssignment"
// ClusterType is the TypeURL for Cluster discovery responses.
ClusterType = apiTypePrefix + "envoy.config.cluster.v3.Cluster"
// RouteType is the TypeURL for Route discovery responses.
RouteType = apiTypePrefix + "envoy.config.route.v3.RouteConfiguration"
// ListenerType is the TypeURL for Listener discovery responses.
ListenerType = apiTypePrefix + "envoy.config.listener.v3.Listener"
// PublicListenerName is the name we give the public listener in Envoy config.
PublicListenerName = "public_listener"
@ -145,7 +129,7 @@ type Server struct {
AuthCheckFrequency time.Duration
// ResourceMapMutateFn exclusively exists for testing purposes.
ResourceMapMutateFn func(resourceMap *IndexedResources)
ResourceMapMutateFn func(resourceMap *xdscommon.IndexedResources)
activeStreams *activeStreamCounters
}

View File

@ -0,0 +1,49 @@
package xdscommon
import (
"github.com/golang/protobuf/proto"
)
const (
// Resource types in xDS v3. These are copied from
// envoyproxy/go-control-plane/pkg/resource/v3/resource.go since we don't need any of
// the rest of that package.
apiTypePrefix = "type.googleapis.com/"
// EndpointType is the TypeURL for Endpoint discovery responses.
EndpointType = apiTypePrefix + "envoy.config.endpoint.v3.ClusterLoadAssignment"
// ClusterType is the TypeURL for Cluster discovery responses.
ClusterType = apiTypePrefix + "envoy.config.cluster.v3.Cluster"
// RouteType is the TypeURL for Route discovery responses.
RouteType = apiTypePrefix + "envoy.config.route.v3.RouteConfiguration"
// ListenerType is the TypeURL for Listener discovery responses.
ListenerType = apiTypePrefix + "envoy.config.listener.v3.Listener"
)
type IndexedResources struct {
// Index is a map of typeURL => resourceName => resource
Index map[string]map[string]proto.Message
// ChildIndex is a map of typeURL => parentResourceName => list of
// childResourceNames. This only applies if the child and parent do not
// share a name.
ChildIndex map[string]map[string][]string
}
func EmptyIndexedResources() *IndexedResources {
return &IndexedResources{
Index: map[string]map[string]proto.Message{
ListenerType: make(map[string]proto.Message),
RouteType: make(map[string]proto.Message),
ClusterType: make(map[string]proto.Message),
EndpointType: make(map[string]proto.Message),
},
ChildIndex: map[string]map[string][]string{
ListenerType: make(map[string][]string),
ClusterType: make(map[string][]string),
},
}
}