mirror of
https://github.com/status-im/consul.git
synced 2025-01-09 21:35:52 +00:00
xds: generate endpoints directly from API gateway snapshot (#17390)
* endpoints xds cluster configuration * resources test fix * fix reversion in resources_test * Update agent/proxycfg/api_gateway.go Co-authored-by: John Maguire <john.maguire@hashicorp.com> * gofmt * Modify getReadyUpstreams to filter upstreams by listener (#17410) Each listener would previously have all upstreams from any route that bound to the listener. This is problematic when a route bound to one listener also binds to other listeners and so includes upstreams for multiple listeners. The list for a given listener would then wind up including upstreams for other listeners. * Update agent/proxycfg/api_gateway.go Co-authored-by: Nathan Coleman <nathan.coleman@hashicorp.com> * Restore import blocking * Skip to next route if route has no upstreams * cleanup * change set from bool to empty struct --------- Co-authored-by: John Maguire <john.maguire@hashicorp.com> Co-authored-by: Nathan Coleman <nathan.coleman@hashicorp.com>
This commit is contained in:
parent
1d6a0c8f21
commit
134aac7c26
@ -125,10 +125,12 @@ func (h *handlerAPIGateway) handleUpdate(ctx context.Context, u UpdateEvent, sna
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
return (*handlerUpstreams)(h).handleUpdateUpstreams(ctx, u, snap)
|
if err := (*handlerUpstreams)(h).handleUpdateUpstreams(ctx, u, snap); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return h.recompileDiscoveryChains(snap)
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleRootCAUpdate responds to changes in the watched root CA for a gateway
|
// handleRootCAUpdate responds to changes in the watched root CA for a gateway
|
||||||
@ -308,7 +310,6 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat
|
|||||||
DestinationNamespace: service.NamespaceOrDefault(),
|
DestinationNamespace: service.NamespaceOrDefault(),
|
||||||
DestinationPartition: service.PartitionOrDefault(),
|
DestinationPartition: service.PartitionOrDefault(),
|
||||||
LocalBindPort: listener.Port,
|
LocalBindPort: listener.Port,
|
||||||
// TODO IngressHosts: g.Hosts,
|
|
||||||
// Pass the protocol that was configured on the listener in order
|
// Pass the protocol that was configured on the listener in order
|
||||||
// to force that protocol on the Envoy listener.
|
// to force that protocol on the Envoy listener.
|
||||||
Config: map[string]interface{}{
|
Config: map[string]interface{}{
|
||||||
@ -316,7 +317,7 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
listenerKey := APIGatewayListenerKey{Protocol: string(listener.Protocol), Port: listener.Port}
|
listenerKey := APIGatewayListenerKeyFromListener(listener)
|
||||||
upstreams[listenerKey] = append(upstreams[listenerKey], upstream)
|
upstreams[listenerKey] = append(upstreams[listenerKey], upstream)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -370,7 +371,7 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
listenerKey := APIGatewayListenerKey{Protocol: string(listener.Protocol), Port: listener.Port}
|
listenerKey := APIGatewayListenerKeyFromListener(listener)
|
||||||
upstreams[listenerKey] = append(upstreams[listenerKey], upstream)
|
upstreams[listenerKey] = append(upstreams[listenerKey], upstream)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -420,6 +421,45 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *handlerAPIGateway) recompileDiscoveryChains(snap *ConfigSnapshot) error {
|
||||||
|
synthesizedChains := map[UpstreamID]*structs.CompiledDiscoveryChain{}
|
||||||
|
|
||||||
|
for name, listener := range snap.APIGateway.Listeners {
|
||||||
|
boundListener, ok := snap.APIGateway.BoundListeners[name]
|
||||||
|
if !(ok && snap.APIGateway.GatewayConfig.ListenerIsReady(name)) {
|
||||||
|
// Skip any listeners that don't have a bound listener. Once the bound listener is created, this will be run again.
|
||||||
|
// skip any listeners that might be in an invalid state
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a synthesized discovery chain for each service.
|
||||||
|
services, upstreams, compiled, err := snap.APIGateway.synthesizeChains(h.source.Datacenter, listener, boundListener)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(upstreams) == 0 {
|
||||||
|
// skip if we can't construct any upstreams
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, service := range services {
|
||||||
|
id := NewUpstreamIDFromServiceName(structs.NewServiceName(service.Name, &service.EnterpriseMeta))
|
||||||
|
if compiled[i].ServiceName != service.Name {
|
||||||
|
return fmt.Errorf("Compiled Discovery chain for %s does not match service %s", compiled[i].ServiceName, id)
|
||||||
|
}
|
||||||
|
synthesizedChains[id] = compiled[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merge in additional discovery chains
|
||||||
|
for id, chain := range synthesizedChains {
|
||||||
|
snap.APIGateway.DiscoveryChain[id] = chain
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// referenceIsForListener returns whether the provided structs.ResourceReference
|
// referenceIsForListener returns whether the provided structs.ResourceReference
|
||||||
// targets the provided structs.APIGatewayListener. For this to be true, the kind
|
// targets the provided structs.APIGatewayListener. For this to be true, the kind
|
||||||
// and name must match the structs.APIGatewayConfigEntry containing the listener,
|
// and name must match the structs.APIGatewayConfigEntry containing the listener,
|
||||||
|
@ -984,6 +984,10 @@ func (c *configSnapshotIngressGateway) isEmpty() bool {
|
|||||||
|
|
||||||
type APIGatewayListenerKey = IngressListenerKey
|
type APIGatewayListenerKey = IngressListenerKey
|
||||||
|
|
||||||
|
func APIGatewayListenerKeyFromListener(l structs.APIGatewayListener) APIGatewayListenerKey {
|
||||||
|
return APIGatewayListenerKey{Protocol: string(l.Protocol), Port: l.Port}
|
||||||
|
}
|
||||||
|
|
||||||
type IngressListenerKey struct {
|
type IngressListenerKey struct {
|
||||||
Protocol string
|
Protocol string
|
||||||
Port int
|
Port int
|
||||||
|
@ -41,13 +41,7 @@ func (s *ResourceGenerator) endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapsh
|
|||||||
case structs.ServiceKindIngressGateway:
|
case structs.ServiceKindIngressGateway:
|
||||||
return s.endpointsFromSnapshotIngressGateway(cfgSnap)
|
return s.endpointsFromSnapshotIngressGateway(cfgSnap)
|
||||||
case structs.ServiceKindAPIGateway:
|
case structs.ServiceKindAPIGateway:
|
||||||
// TODO Find a cleaner solution, can't currently pass unexported property types
|
return s.endpointsFromSnapshotAPIGateway(cfgSnap)
|
||||||
var err error
|
|
||||||
cfgSnap.IngressGateway, err = cfgSnap.APIGateway.ToIngress(cfgSnap.Datacenter)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return s.endpointsFromSnapshotIngressGateway(cfgSnap)
|
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
|
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
|
||||||
}
|
}
|
||||||
@ -527,6 +521,98 @@ func (s *ResourceGenerator) endpointsFromSnapshotIngressGateway(cfgSnap *proxycf
|
|||||||
return resources, nil
|
return resources, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// helper struct to persist upstream parent information when ready upstream list is built out
|
||||||
|
type readyUpstreams struct {
|
||||||
|
listenerKey proxycfg.APIGatewayListenerKey
|
||||||
|
listenerCfg structs.APIGatewayListener
|
||||||
|
boundListenerCfg structs.BoundAPIGatewayListener
|
||||||
|
routeReference structs.ResourceReference
|
||||||
|
upstreams []structs.Upstream
|
||||||
|
}
|
||||||
|
|
||||||
|
// getReadyUpstreams returns a map containing the list of upstreams for each listener that is ready
|
||||||
|
func getReadyUpstreams(cfgSnap *proxycfg.ConfigSnapshot) map[string]readyUpstreams {
|
||||||
|
|
||||||
|
ready := map[string]readyUpstreams{}
|
||||||
|
for _, l := range cfgSnap.APIGateway.Listeners {
|
||||||
|
// Only include upstreams for listeners that are ready
|
||||||
|
if !cfgSnap.APIGateway.GatewayConfig.ListenerIsReady(l.Name) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// For each route bound to the listener
|
||||||
|
boundListener := cfgSnap.APIGateway.BoundListeners[l.Name]
|
||||||
|
for _, routeRef := range boundListener.Routes {
|
||||||
|
// Get all upstreams for the route
|
||||||
|
routeUpstreams, ok := cfgSnap.APIGateway.Upstreams[routeRef]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Filter to upstreams that attach to this specific listener since
|
||||||
|
// a route can bind to + have upstreams for multiple listeners
|
||||||
|
listenerKey := proxycfg.APIGatewayListenerKeyFromListener(l)
|
||||||
|
routeUpstreamsForListener, ok := routeUpstreams[listenerKey]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, upstream := range routeUpstreamsForListener {
|
||||||
|
// Insert or update readyUpstreams for the listener to include this upstream
|
||||||
|
r, ok := ready[l.Name]
|
||||||
|
if !ok {
|
||||||
|
r = readyUpstreams{
|
||||||
|
listenerKey: listenerKey,
|
||||||
|
listenerCfg: l,
|
||||||
|
boundListenerCfg: boundListener,
|
||||||
|
routeReference: routeRef,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
r.upstreams = append(r.upstreams, upstream)
|
||||||
|
ready[l.Name] = r
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ready
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ResourceGenerator) endpointsFromSnapshotAPIGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
||||||
|
var resources []proto.Message
|
||||||
|
createdClusters := make(map[proxycfg.UpstreamID]struct{})
|
||||||
|
|
||||||
|
readyUpstreamsList := getReadyUpstreams(cfgSnap)
|
||||||
|
|
||||||
|
for _, readyUpstreams := range readyUpstreamsList {
|
||||||
|
for _, u := range readyUpstreams.upstreams {
|
||||||
|
uid := proxycfg.NewUpstreamID(&u)
|
||||||
|
|
||||||
|
// If we've already created endpoints for this upstream, skip it. Multiple listeners may
|
||||||
|
// reference the same upstream, so we don't need to create duplicate endpoints in that case.
|
||||||
|
_, ok := createdClusters[uid]
|
||||||
|
if ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
endpoints, err := s.endpointsFromDiscoveryChain(
|
||||||
|
uid,
|
||||||
|
cfgSnap.APIGateway.DiscoveryChain[uid],
|
||||||
|
cfgSnap,
|
||||||
|
proxycfg.GatewayKey{Datacenter: cfgSnap.Datacenter, Partition: u.DestinationPartition},
|
||||||
|
u.Config,
|
||||||
|
cfgSnap.APIGateway.WatchedUpstreamEndpoints[uid],
|
||||||
|
cfgSnap.APIGateway.WatchedGatewayEndpoints[uid],
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
resources = append(resources, endpoints...)
|
||||||
|
createdClusters[uid] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return resources, nil
|
||||||
|
}
|
||||||
|
|
||||||
// used in clusters.go
|
// used in clusters.go
|
||||||
func makeEndpoint(host string, port int) *envoy_endpoint_v3.LbEndpoint {
|
func makeEndpoint(host string, port int) *envoy_endpoint_v3.LbEndpoint {
|
||||||
return &envoy_endpoint_v3.LbEndpoint{
|
return &envoy_endpoint_v3.LbEndpoint{
|
||||||
@ -628,6 +714,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
|
|||||||
|
|
||||||
var escapeHatchCluster *envoy_cluster_v3.Cluster
|
var escapeHatchCluster *envoy_cluster_v3.Cluster
|
||||||
if !forMeshGateway {
|
if !forMeshGateway {
|
||||||
|
|
||||||
cfg, err := structs.ParseUpstreamConfigNoDefaults(upstreamConfigMap)
|
cfg, err := structs.ParseUpstreamConfigNoDefaults(upstreamConfigMap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Don't hard fail on a config typo, just warn. The parse func returns
|
// Don't hard fail on a config typo, just warn. The parse func returns
|
||||||
|
@ -365,13 +365,20 @@ func getAPIGatewayGoldenTestCases(t *testing.T) []goldenTestCase {
|
|||||||
}},
|
}},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}, []structs.BoundRoute{
|
},
|
||||||
|
[]structs.BoundRoute{
|
||||||
&structs.TCPRouteConfigEntry{
|
&structs.TCPRouteConfigEntry{
|
||||||
Kind: structs.TCPRoute,
|
Kind: structs.TCPRoute,
|
||||||
Name: "route",
|
Name: "route",
|
||||||
Services: []structs.TCPService{{
|
Services: []structs.TCPService{{
|
||||||
Name: "service",
|
Name: "service",
|
||||||
}},
|
}},
|
||||||
|
Parents: []structs.ResourceReference{
|
||||||
|
{
|
||||||
|
Kind: structs.APIGateway,
|
||||||
|
Name: "api-gateway",
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}, []structs.InlineCertificateConfigEntry{{
|
}, []structs.InlineCertificateConfigEntry{{
|
||||||
Kind: structs.InlineCertificate,
|
Kind: structs.InlineCertificate,
|
||||||
@ -410,6 +417,12 @@ func getAPIGatewayGoldenTestCases(t *testing.T) []goldenTestCase {
|
|||||||
Name: "service",
|
Name: "service",
|
||||||
}},
|
}},
|
||||||
}},
|
}},
|
||||||
|
Parents: []structs.ResourceReference{
|
||||||
|
{
|
||||||
|
Kind: structs.APIGateway,
|
||||||
|
Name: "api-gateway",
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}, nil, []proxycfg.UpdateEvent{{
|
}, nil, []proxycfg.UpdateEvent{{
|
||||||
CorrelationID: "discovery-chain:" + serviceUID.String(),
|
CorrelationID: "discovery-chain:" + serviceUID.String(),
|
||||||
|
@ -6,7 +6,7 @@ load helpers
|
|||||||
retry_default curl -f -s localhost:20000/stats -o /dev/null
|
retry_default curl -f -s localhost:20000/stats -o /dev/null
|
||||||
}
|
}
|
||||||
|
|
||||||
@test "api gateway should have be accepted and not conflicted" {
|
@test "api gateway should have been accepted and not conflicted" {
|
||||||
assert_config_entry_status Accepted True Accepted primary api-gateway api-gateway
|
assert_config_entry_status Accepted True Accepted primary api-gateway api-gateway
|
||||||
assert_config_entry_status Conflicted False NoConflict primary api-gateway api-gateway
|
assert_config_entry_status Conflicted False NoConflict primary api-gateway api-gateway
|
||||||
}
|
}
|
||||||
|
@ -6,7 +6,7 @@ load helpers
|
|||||||
retry_default curl -f -s localhost:20000/stats -o /dev/null
|
retry_default curl -f -s localhost:20000/stats -o /dev/null
|
||||||
}
|
}
|
||||||
|
|
||||||
@test "api gateway should have be accepted and not conflicted" {
|
@test "api gateway should have been accepted and not conflicted" {
|
||||||
assert_config_entry_status Accepted True Accepted primary api-gateway api-gateway
|
assert_config_entry_status Accepted True Accepted primary api-gateway api-gateway
|
||||||
assert_config_entry_status Conflicted False NoConflict primary api-gateway api-gateway
|
assert_config_entry_status Conflicted False NoConflict primary api-gateway api-gateway
|
||||||
}
|
}
|
||||||
|
@ -6,7 +6,7 @@ load helpers
|
|||||||
retry_default curl -f -s localhost:20000/stats -o /dev/null
|
retry_default curl -f -s localhost:20000/stats -o /dev/null
|
||||||
}
|
}
|
||||||
|
|
||||||
@test "api gateway should have be accepted and not conflicted" {
|
@test "api gateway should have been accepted and not conflicted" {
|
||||||
assert_config_entry_status Accepted True Accepted primary api-gateway api-gateway
|
assert_config_entry_status Accepted True Accepted primary api-gateway api-gateway
|
||||||
assert_config_entry_status Conflicted False NoConflict primary api-gateway api-gateway
|
assert_config_entry_status Conflicted False NoConflict primary api-gateway api-gateway
|
||||||
}
|
}
|
||||||
|
@ -6,7 +6,7 @@ load helpers
|
|||||||
retry_default curl -f -s localhost:20000/stats -o /dev/null
|
retry_default curl -f -s localhost:20000/stats -o /dev/null
|
||||||
}
|
}
|
||||||
|
|
||||||
@test "api gateway should have be accepted and not conflicted" {
|
@test "api gateway should have been accepted and not conflicted" {
|
||||||
assert_config_entry_status Accepted True Accepted primary api-gateway api-gateway
|
assert_config_entry_status Accepted True Accepted primary api-gateway api-gateway
|
||||||
assert_config_entry_status Conflicted False NoConflict primary api-gateway api-gateway
|
assert_config_entry_status Conflicted False NoConflict primary api-gateway api-gateway
|
||||||
}
|
}
|
||||||
|
@ -6,7 +6,7 @@ load helpers
|
|||||||
retry_default curl -f -s localhost:20000/stats -o /dev/null
|
retry_default curl -f -s localhost:20000/stats -o /dev/null
|
||||||
}
|
}
|
||||||
|
|
||||||
@test "api gateway should have be accepted and not conflicted" {
|
@test "api gateway should have been accepted and not conflicted" {
|
||||||
assert_config_entry_status Accepted True Accepted primary api-gateway api-gateway
|
assert_config_entry_status Accepted True Accepted primary api-gateway api-gateway
|
||||||
assert_config_entry_status Conflicted False NoConflict primary api-gateway api-gateway
|
assert_config_entry_status Conflicted False NoConflict primary api-gateway api-gateway
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user