mirror of
https://github.com/status-im/consul.git
synced 2025-01-22 03:29:43 +00:00
xds: Remove APIGateway ToIngress function (#17453)
* xds generation for routes api gateway * Update gateway.go * move buildHttpRoute into xds package * Update agent/consul/discoverychain/gateway.go * remove unneeded function * convert http route code to only run for http protocol to future proof code path * Update agent/consul/discoverychain/gateway.go Co-authored-by: Mike Morris <mikemorris@users.noreply.github.com> * fix tests, clean up http check logic * clean up todo * Fix casing in docstring * Fix import block, adjust docstrings * Rename func * Consolidate docstring onto single line * Remove ToIngress() conversion for APIGW, which generates its own xDS now * update name and comment * use constant value * use constant * rename readyUpstreams to readyListeners to better communicate what that function is doing --------- Co-authored-by: Mike Morris <mikemorris@users.noreply.github.com> Co-authored-by: Nathan Coleman <nathan.coleman@hashicorp.com>
This commit is contained in:
parent
127eba6d61
commit
b147323fb0
@ -378,23 +378,6 @@ func (o *configSnapshotAPIGateway) DeepCopy() *configSnapshotAPIGateway {
|
||||
cp.Listeners[k2] = cp_Listeners_v2
|
||||
}
|
||||
}
|
||||
if o.ListenerCertificates != nil {
|
||||
cp.ListenerCertificates = make(map[IngressListenerKey][]structs.InlineCertificateConfigEntry, len(o.ListenerCertificates))
|
||||
for k2, v2 := range o.ListenerCertificates {
|
||||
var cp_ListenerCertificates_v2 []structs.InlineCertificateConfigEntry
|
||||
if v2 != nil {
|
||||
cp_ListenerCertificates_v2 = make([]structs.InlineCertificateConfigEntry, len(v2))
|
||||
copy(cp_ListenerCertificates_v2, v2)
|
||||
for i3 := range v2 {
|
||||
{
|
||||
retV := v2[i3].DeepCopy()
|
||||
cp_ListenerCertificates_v2[i3] = *retV
|
||||
}
|
||||
}
|
||||
}
|
||||
cp.ListenerCertificates[k2] = cp_ListenerCertificates_v2
|
||||
}
|
||||
}
|
||||
if o.BoundListeners != nil {
|
||||
cp.BoundListeners = make(map[string]structs.BoundAPIGatewayListener, len(o.BoundListeners))
|
||||
for k2, v2 := range o.BoundListeners {
|
||||
|
@ -735,109 +735,10 @@ type configSnapshotAPIGateway struct {
|
||||
// Listeners is the original listener config from the api-gateway config
|
||||
// entry to save us trying to pass fields through Upstreams
|
||||
Listeners map[string]structs.APIGatewayListener
|
||||
// this acts as an intermediary for inlining certificates
|
||||
// FUTURE(nathancoleman) Remove when ToIngress is removed
|
||||
ListenerCertificates map[IngressListenerKey][]structs.InlineCertificateConfigEntry
|
||||
|
||||
BoundListeners map[string]structs.BoundAPIGatewayListener
|
||||
}
|
||||
|
||||
// ToIngress converts a configSnapshotAPIGateway to a configSnapshotIngressGateway.
|
||||
// This is temporary, for the sake of re-using existing codepaths when integrating
|
||||
// Consul API Gateway into Consul core.
|
||||
//
|
||||
// FUTURE(nathancoleman): Remove when API gateways have custom snapshot generation
|
||||
func (c *configSnapshotAPIGateway) ToIngress(datacenter string) (configSnapshotIngressGateway, error) {
|
||||
// Convert API Gateway Listeners to Ingress Listeners.
|
||||
ingressListeners := make(map[IngressListenerKey]structs.IngressListener, len(c.Listeners))
|
||||
ingressUpstreams := make(map[IngressListenerKey]structs.Upstreams, len(c.Listeners))
|
||||
synthesizedChains := map[UpstreamID]*structs.CompiledDiscoveryChain{}
|
||||
watchedUpstreamEndpoints := make(map[UpstreamID]map[string]structs.CheckServiceNodes)
|
||||
watchedGatewayEndpoints := make(map[UpstreamID]map[string]structs.CheckServiceNodes)
|
||||
|
||||
// reset the cached certificates
|
||||
c.ListenerCertificates = make(map[IngressListenerKey][]structs.InlineCertificateConfigEntry)
|
||||
|
||||
for name, listener := range c.Listeners {
|
||||
boundListener, ok := c.BoundListeners[name]
|
||||
if !ok {
|
||||
// Skip any listeners that don't have a bound listener. Once the bound listener is created, this will be run again.
|
||||
continue
|
||||
}
|
||||
|
||||
if !c.GatewayConfig.ListenerIsReady(name) {
|
||||
// skip any listeners that might be in an invalid state
|
||||
continue
|
||||
}
|
||||
|
||||
ingressListener := structs.IngressListener{
|
||||
Port: listener.Port,
|
||||
Protocol: string(listener.Protocol),
|
||||
}
|
||||
|
||||
// Create a synthesized discovery chain for each service.
|
||||
services, upstreams, compiled, err := c.synthesizeChains(datacenter, listener, boundListener)
|
||||
if err != nil {
|
||||
return configSnapshotIngressGateway{}, err
|
||||
}
|
||||
|
||||
if len(upstreams) == 0 {
|
||||
// skip if we can't construct any upstreams
|
||||
continue
|
||||
}
|
||||
|
||||
ingressListener.Services = services
|
||||
for i, service := range services {
|
||||
id := NewUpstreamIDFromServiceName(structs.NewServiceName(service.Name, &service.EnterpriseMeta))
|
||||
upstreamEndpoints := make(map[string]structs.CheckServiceNodes)
|
||||
gatewayEndpoints := make(map[string]structs.CheckServiceNodes)
|
||||
|
||||
// add the watched endpoints and gateway endpoints under the new upstream
|
||||
for _, endpoints := range c.WatchedUpstreamEndpoints {
|
||||
for targetID, endpoint := range endpoints {
|
||||
upstreamEndpoints[targetID] = endpoint
|
||||
}
|
||||
}
|
||||
for _, endpoints := range c.WatchedGatewayEndpoints {
|
||||
for targetID, endpoint := range endpoints {
|
||||
gatewayEndpoints[targetID] = endpoint
|
||||
}
|
||||
}
|
||||
|
||||
synthesizedChains[id] = compiled[i]
|
||||
watchedUpstreamEndpoints[id] = upstreamEndpoints
|
||||
watchedGatewayEndpoints[id] = gatewayEndpoints
|
||||
}
|
||||
|
||||
key := IngressListenerKey{
|
||||
Port: listener.Port,
|
||||
Protocol: string(listener.Protocol),
|
||||
}
|
||||
|
||||
// Configure TLS for the ingress listener
|
||||
tls, err := c.toIngressTLS(key, listener, boundListener)
|
||||
if err != nil {
|
||||
return configSnapshotIngressGateway{}, err
|
||||
}
|
||||
|
||||
ingressListener.TLS = tls
|
||||
ingressListeners[key] = ingressListener
|
||||
ingressUpstreams[key] = upstreams
|
||||
}
|
||||
|
||||
snapshotUpstreams := c.DeepCopy().ConfigSnapshotUpstreams
|
||||
snapshotUpstreams.DiscoveryChain = synthesizedChains
|
||||
snapshotUpstreams.WatchedUpstreamEndpoints = watchedUpstreamEndpoints
|
||||
snapshotUpstreams.WatchedGatewayEndpoints = watchedGatewayEndpoints
|
||||
|
||||
return configSnapshotIngressGateway{
|
||||
Upstreams: ingressUpstreams,
|
||||
ConfigSnapshotUpstreams: snapshotUpstreams,
|
||||
GatewayConfigLoaded: true,
|
||||
Listeners: ingressListeners,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *configSnapshotAPIGateway) synthesizeChains(datacenter string, listener structs.APIGatewayListener, boundListener structs.BoundAPIGatewayListener) ([]structs.IngressService, structs.Upstreams, []*structs.CompiledDiscoveryChain, error) {
|
||||
chains := []*structs.CompiledDiscoveryChain{}
|
||||
trustDomain := ""
|
||||
@ -914,27 +815,6 @@ DOMAIN_LOOP:
|
||||
return services, upstreams, compiled, err
|
||||
}
|
||||
|
||||
func (c *configSnapshotAPIGateway) toIngressTLS(key IngressListenerKey, listener structs.APIGatewayListener, bound structs.BoundAPIGatewayListener) (*structs.GatewayTLSConfig, error) {
|
||||
if len(listener.TLS.Certificates) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
for _, certRef := range bound.Certificates {
|
||||
cert, ok := c.Certificates.Get(certRef)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
c.ListenerCertificates[key] = append(c.ListenerCertificates[key], *cert)
|
||||
}
|
||||
|
||||
return &structs.GatewayTLSConfig{
|
||||
Enabled: true,
|
||||
TLSMinVersion: listener.TLS.MinVersion,
|
||||
TLSMaxVersion: listener.TLS.MaxVersion,
|
||||
CipherSuites: listener.TLS.CipherSuites,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type configSnapshotIngressGateway struct {
|
||||
ConfigSnapshotUpstreams
|
||||
|
||||
|
@ -11,10 +11,7 @@ import (
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
fuzz "github.com/google/gofuzz"
|
||||
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/proto/private/pbpeering"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestConfigSnapshot_Clone(t *testing.T) {
|
||||
@ -56,39 +53,3 @@ func TestConfigSnapshot_Clone(t *testing.T) {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
func TestAPIGatewaySnapshotToIngressGatewaySnapshot(t *testing.T) {
|
||||
cases := map[string]struct {
|
||||
apiGatewaySnapshot *configSnapshotAPIGateway
|
||||
expected configSnapshotIngressGateway
|
||||
}{
|
||||
"default": {
|
||||
apiGatewaySnapshot: &configSnapshotAPIGateway{
|
||||
Listeners: map[string]structs.APIGatewayListener{},
|
||||
},
|
||||
expected: configSnapshotIngressGateway{
|
||||
GatewayConfigLoaded: true,
|
||||
ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{
|
||||
PeerUpstreamEndpoints: watch.NewMap[UpstreamID, structs.CheckServiceNodes](),
|
||||
WatchedLocalGWEndpoints: watch.NewMap[string, structs.CheckServiceNodes](),
|
||||
WatchedGatewayEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{},
|
||||
WatchedUpstreamEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{},
|
||||
UpstreamPeerTrustBundles: watch.NewMap[string, *pbpeering.PeeringTrustBundle](),
|
||||
DiscoveryChain: map[UpstreamID]*structs.CompiledDiscoveryChain{},
|
||||
},
|
||||
Listeners: map[IngressListenerKey]structs.IngressListener{},
|
||||
Defaults: structs.IngressServiceConfig{},
|
||||
Upstreams: map[IngressListenerKey]structs.Upstreams{},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
actual, err := tc.apiGatewaySnapshot.ToIngress("dc1")
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, tc.expected, actual)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -813,10 +813,10 @@ func (s *ResourceGenerator) clustersFromSnapshotIngressGateway(cfgSnap *proxycfg
|
||||
func (s *ResourceGenerator) clustersFromSnapshotAPIGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
||||
var clusters []proto.Message
|
||||
createdClusters := make(map[proxycfg.UpstreamID]bool)
|
||||
readyUpstreamsList := getReadyUpstreams(cfgSnap)
|
||||
readyListeners := getReadyListeners(cfgSnap)
|
||||
|
||||
for _, readyUpstreams := range readyUpstreamsList {
|
||||
for _, upstream := range readyUpstreams.upstreams {
|
||||
for _, readyListener := range readyListeners {
|
||||
for _, upstream := range readyListener.upstreams {
|
||||
uid := proxycfg.NewUpstreamID(&upstream)
|
||||
|
||||
// If we've already created a cluster for this upstream, skip it. Multiple listeners may
|
||||
|
@ -521,69 +521,14 @@ func (s *ResourceGenerator) endpointsFromSnapshotIngressGateway(cfgSnap *proxycf
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
// helper struct to persist upstream parent information when ready upstream list is built out
|
||||
type readyUpstreams struct {
|
||||
listenerKey proxycfg.APIGatewayListenerKey
|
||||
listenerCfg structs.APIGatewayListener
|
||||
boundListenerCfg structs.BoundAPIGatewayListener
|
||||
routeReference structs.ResourceReference
|
||||
upstreams []structs.Upstream
|
||||
}
|
||||
|
||||
// getReadyUpstreams returns a map containing the list of upstreams for each listener that is ready
|
||||
func getReadyUpstreams(cfgSnap *proxycfg.ConfigSnapshot) map[string]readyUpstreams {
|
||||
|
||||
ready := map[string]readyUpstreams{}
|
||||
for _, l := range cfgSnap.APIGateway.Listeners {
|
||||
// Only include upstreams for listeners that are ready
|
||||
if !cfgSnap.APIGateway.GatewayConfig.ListenerIsReady(l.Name) {
|
||||
continue
|
||||
}
|
||||
|
||||
// For each route bound to the listener
|
||||
boundListener := cfgSnap.APIGateway.BoundListeners[l.Name]
|
||||
for _, routeRef := range boundListener.Routes {
|
||||
// Get all upstreams for the route
|
||||
routeUpstreams, ok := cfgSnap.APIGateway.Upstreams[routeRef]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// Filter to upstreams that attach to this specific listener since
|
||||
// a route can bind to + have upstreams for multiple listeners
|
||||
listenerKey := proxycfg.APIGatewayListenerKeyFromListener(l)
|
||||
routeUpstreamsForListener, ok := routeUpstreams[listenerKey]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, upstream := range routeUpstreamsForListener {
|
||||
// Insert or update readyUpstreams for the listener to include this upstream
|
||||
r, ok := ready[l.Name]
|
||||
if !ok {
|
||||
r = readyUpstreams{
|
||||
listenerKey: listenerKey,
|
||||
listenerCfg: l,
|
||||
boundListenerCfg: boundListener,
|
||||
routeReference: routeRef,
|
||||
}
|
||||
}
|
||||
r.upstreams = append(r.upstreams, upstream)
|
||||
ready[l.Name] = r
|
||||
}
|
||||
}
|
||||
}
|
||||
return ready
|
||||
}
|
||||
|
||||
func (s *ResourceGenerator) endpointsFromSnapshotAPIGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
||||
var resources []proto.Message
|
||||
createdClusters := make(map[proxycfg.UpstreamID]struct{})
|
||||
|
||||
readyUpstreamsList := getReadyUpstreams(cfgSnap)
|
||||
readyListeners := getReadyListeners(cfgSnap)
|
||||
|
||||
for _, readyUpstreams := range readyUpstreamsList {
|
||||
for _, u := range readyUpstreams.upstreams {
|
||||
for _, readyListener := range readyListeners {
|
||||
for _, u := range readyListener.upstreams {
|
||||
uid := proxycfg.NewUpstreamID(&u)
|
||||
|
||||
// If we've already created endpoints for this upstream, skip it. Multiple listeners may
|
||||
|
@ -20,12 +20,12 @@ import (
|
||||
func (s *ResourceGenerator) makeAPIGatewayListeners(address string, cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
||||
var resources []proto.Message
|
||||
|
||||
readyUpstreamsList := getReadyUpstreams(cfgSnap)
|
||||
readyListeners := getReadyListeners(cfgSnap)
|
||||
|
||||
for _, readyUpstreams := range readyUpstreamsList {
|
||||
listenerCfg := readyUpstreams.listenerCfg
|
||||
listenerKey := readyUpstreams.listenerKey
|
||||
boundListener := readyUpstreams.boundListenerCfg
|
||||
for _, readyListener := range readyListeners {
|
||||
listenerCfg := readyListener.listenerCfg
|
||||
listenerKey := readyListener.listenerKey
|
||||
boundListener := readyListener.boundListenerCfg
|
||||
|
||||
var certs []structs.InlineCertificateConfigEntry
|
||||
for _, certRef := range boundListener.Certificates {
|
||||
@ -49,7 +49,7 @@ func (s *ResourceGenerator) makeAPIGatewayListeners(address string, cfgSnap *pro
|
||||
// We rely on the invariant of upstreams slice always having at least 1
|
||||
// member, because this key/value pair is created only when a
|
||||
// GatewayService is returned in the RPC
|
||||
u := readyUpstreams.upstreams[0]
|
||||
u := readyListener.upstreams[0]
|
||||
uid := proxycfg.NewUpstreamID(&u)
|
||||
|
||||
chain := cfgSnap.APIGateway.DiscoveryChain[uid]
|
||||
@ -172,7 +172,7 @@ func (s *ResourceGenerator) makeAPIGatewayListeners(address string, cfgSnap *pro
|
||||
|
||||
// See if there are other services that didn't have specific SNI-matching
|
||||
// filter chains. If so add a default filterchain to serve them.
|
||||
if len(sniFilterChains) < len(readyUpstreams.upstreams) && !isAPIGatewayWithTLS {
|
||||
if len(sniFilterChains) < len(readyListener.upstreams) && !isAPIGatewayWithTLS {
|
||||
defaultFilter, err := makeListenerFilter(filterOpts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -197,6 +197,61 @@ func (s *ResourceGenerator) makeAPIGatewayListeners(address string, cfgSnap *pro
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
// helper struct to persist upstream parent information when ready upstream list is built out
|
||||
type readyListener struct {
|
||||
listenerKey proxycfg.APIGatewayListenerKey
|
||||
listenerCfg structs.APIGatewayListener
|
||||
boundListenerCfg structs.BoundAPIGatewayListener
|
||||
routeReference structs.ResourceReference
|
||||
upstreams []structs.Upstream
|
||||
}
|
||||
|
||||
// getReadyListeners returns a map containing the list of upstreams for each listener that is ready
|
||||
func getReadyListeners(cfgSnap *proxycfg.ConfigSnapshot) map[string]readyListener {
|
||||
|
||||
ready := map[string]readyListener{}
|
||||
for _, l := range cfgSnap.APIGateway.Listeners {
|
||||
// Only include upstreams for listeners that are ready
|
||||
if !cfgSnap.APIGateway.GatewayConfig.ListenerIsReady(l.Name) {
|
||||
continue
|
||||
}
|
||||
|
||||
// For each route bound to the listener
|
||||
boundListener := cfgSnap.APIGateway.BoundListeners[l.Name]
|
||||
for _, routeRef := range boundListener.Routes {
|
||||
// Get all upstreams for the route
|
||||
routeUpstreams, ok := cfgSnap.APIGateway.Upstreams[routeRef]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// Filter to upstreams that attach to this specific listener since
|
||||
// a route can bind to + have upstreams for multiple listeners
|
||||
listenerKey := proxycfg.APIGatewayListenerKeyFromListener(l)
|
||||
routeUpstreamsForListener, ok := routeUpstreams[listenerKey]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, upstream := range routeUpstreamsForListener {
|
||||
// Insert or update readyListener for the listener to include this upstream
|
||||
r, ok := ready[l.Name]
|
||||
if !ok {
|
||||
r = readyListener{
|
||||
listenerKey: listenerKey,
|
||||
listenerCfg: l,
|
||||
boundListenerCfg: boundListener,
|
||||
routeReference: routeRef,
|
||||
}
|
||||
}
|
||||
r.upstreams = append(r.upstreams, upstream)
|
||||
ready[l.Name] = r
|
||||
}
|
||||
}
|
||||
}
|
||||
return ready
|
||||
}
|
||||
|
||||
func makeDownstreamTLSContextFromSnapshotAPIListenerConfig(cfgSnap *proxycfg.ConfigSnapshot, listenerCfg structs.APIGatewayListener) (*envoy_tls_v3.DownstreamTlsContext, error) {
|
||||
var downstreamContext *envoy_tls_v3.DownstreamTlsContext
|
||||
|
||||
|
@ -425,12 +425,11 @@ func (s *ResourceGenerator) routesForIngressGateway(cfgSnap *proxycfg.ConfigSnap
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// routesForAPIGateway returns the xDS API representation of the
|
||||
// "routes" in the snapshot.
|
||||
// routesForAPIGateway returns the xDS API representation of the "routes" in the snapshot.
|
||||
func (s *ResourceGenerator) routesForAPIGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
||||
var result []proto.Message
|
||||
|
||||
readyUpstreamsList := getReadyUpstreams(cfgSnap)
|
||||
readyUpstreamsList := getReadyListeners(cfgSnap)
|
||||
|
||||
for _, readyUpstreams := range readyUpstreamsList {
|
||||
listenerCfg := readyUpstreams.listenerCfg
|
||||
@ -478,7 +477,7 @@ func (s *ResourceGenerator) routesForAPIGateway(cfgSnap *proxycfg.ConfigSnapshot
|
||||
return nil, err
|
||||
}
|
||||
|
||||
injectHeaderManipToVirtualHostAPIGateway(&reformatedRoute, virtualHost)
|
||||
addHeaderFiltersToVirtualHost(&reformatedRoute, virtualHost)
|
||||
|
||||
defaultRoute.VirtualHosts = append(defaultRoute.VirtualHosts, virtualHost)
|
||||
}
|
||||
@ -1098,7 +1097,7 @@ func injectHeaderManipToRoute(dest *structs.ServiceRouteDestination, r *envoy_ro
|
||||
return nil
|
||||
}
|
||||
|
||||
func injectHeaderManipToVirtualHostAPIGateway(dest *structs.HTTPRouteConfigEntry, vh *envoy_route_v3.VirtualHost) {
|
||||
func addHeaderFiltersToVirtualHost(dest *structs.HTTPRouteConfigEntry, vh *envoy_route_v3.VirtualHost) {
|
||||
for _, rule := range dest.Rules {
|
||||
for _, header := range rule.Filters.Headers {
|
||||
vh.RequestHeadersToAdd = append(vh.RequestHeadersToAdd, makeHeadersValueOptions(header.Add, true)...)
|
||||
|
Loading…
x
Reference in New Issue
Block a user