mirror of https://github.com/status-im/consul.git
[NET-7375] Add Skeleton for APIGW ProxyStateTemplateBuilder (#20628)
* WIP * got empty state working and cleaned up additional code * Moved api gateway mapper to it's own file, removed mesh port usage for api gateway, allow gateway to reconcile without routes * Update internal/mesh/internal/controllers/gatewayproxy/controller.go Co-authored-by: Nathan Coleman <nathan.coleman@hashicorp.com> * Feedback from PR Review: - rename referencedTCPRoutes variable to be more descriptive - fetchers are in alphabetical order - move log statements to be more indicative of internal state --------- Co-authored-by: Nathan Coleman <nathan.coleman@hashicorp.com>
This commit is contained in:
parent
f5c2b408f7
commit
2ed67ba9ae
|
@ -0,0 +1,154 @@
|
|||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package builder
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"google.golang.org/protobuf/types/known/durationpb"
|
||||
|
||||
"github.com/hashicorp/consul/internal/mesh/internal/controllers/gatewayproxy/fetcher"
|
||||
"github.com/hashicorp/consul/internal/mesh/internal/types"
|
||||
pbauth "github.com/hashicorp/consul/proto-public/pbauth/v2beta1"
|
||||
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
|
||||
meshv2beta1 "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
|
||||
"github.com/hashicorp/consul/proto-public/pbmesh/v2beta1/pbproxystate"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
type apiGWProxyStateTemplateBuilder struct {
|
||||
workload *types.DecodedWorkload
|
||||
dataFetcher *fetcher.Fetcher
|
||||
dc string
|
||||
services []*pbcatalog.Service
|
||||
tcpRoutes []*meshv2beta1.TCPRoute
|
||||
apiGateway *meshv2beta1.APIGateway
|
||||
logger hclog.Logger
|
||||
trustDomain string
|
||||
}
|
||||
|
||||
func NewAPIGWProxyStateTemplateBuilder(workload *types.DecodedWorkload, services []*pbcatalog.Service, tcpRoutes []*meshv2beta1.TCPRoute, apiGateway *meshv2beta1.APIGateway, logger hclog.Logger, dataFetcher *fetcher.Fetcher, dc, trustDomain string) *apiGWProxyStateTemplateBuilder {
|
||||
return &apiGWProxyStateTemplateBuilder{
|
||||
workload: workload,
|
||||
dataFetcher: dataFetcher,
|
||||
services: services,
|
||||
tcpRoutes: tcpRoutes,
|
||||
apiGateway: apiGateway,
|
||||
dc: dc,
|
||||
logger: logger,
|
||||
trustDomain: trustDomain,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *apiGWProxyStateTemplateBuilder) identity() *pbresource.Reference {
|
||||
return &pbresource.Reference{
|
||||
Name: b.workload.Data.Identity,
|
||||
Tenancy: b.workload.Id.Tenancy,
|
||||
Type: pbauth.WorkloadIdentityType,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *apiGWProxyStateTemplateBuilder) listeners() []*pbproxystate.Listener {
|
||||
// TODO NET-7985
|
||||
var listeners []*pbproxystate.Listener
|
||||
|
||||
address := b.workload.Data.Addresses[0]
|
||||
for idx, portName := range address.Ports {
|
||||
|
||||
workloadPort, ok := b.workload.Data.Ports[portName]
|
||||
if !ok {
|
||||
b.logger.Trace("port does not exist for workload", "port name", portName)
|
||||
continue
|
||||
}
|
||||
listeners = append(listeners, b.listener(fmt.Sprintf("default-%d", idx), address, workloadPort.Port, pbproxystate.Direction_DIRECTION_INBOUND, b.routers()))
|
||||
}
|
||||
|
||||
b.logger.Trace("listeners for apigw pst", "listeners", listeners)
|
||||
return listeners
|
||||
}
|
||||
|
||||
func (b *apiGWProxyStateTemplateBuilder) listener(name string, address *pbcatalog.WorkloadAddress, port uint32, _ pbproxystate.Direction, routers []*pbproxystate.Router) *pbproxystate.Listener {
|
||||
// TODO NET-7985
|
||||
return &pbproxystate.Listener{
|
||||
Name: name,
|
||||
Direction: pbproxystate.Direction_DIRECTION_INBOUND,
|
||||
BindAddress: &pbproxystate.Listener_HostPort{
|
||||
HostPort: &pbproxystate.HostPortAddress{
|
||||
Host: address.Host,
|
||||
Port: port,
|
||||
},
|
||||
},
|
||||
Capabilities: []pbproxystate.Capability{
|
||||
pbproxystate.Capability_CAPABILITY_L4_TLS_INSPECTION,
|
||||
},
|
||||
DefaultRouter: &pbproxystate.Router{
|
||||
Destination: &pbproxystate.Router_L4{
|
||||
L4: &pbproxystate.L4Destination{
|
||||
Destination: &pbproxystate.L4Destination_Cluster{
|
||||
Cluster: &pbproxystate.DestinationCluster{
|
||||
Name: nullRouteClusterName,
|
||||
},
|
||||
},
|
||||
StatPrefix: "prefix",
|
||||
},
|
||||
},
|
||||
},
|
||||
Routers: routers,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *apiGWProxyStateTemplateBuilder) clusters() map[string]*pbproxystate.Cluster {
|
||||
clusters := map[string]*pbproxystate.Cluster{}
|
||||
|
||||
// Add null route cluster for any unmatched traffic
|
||||
clusters[nullRouteClusterName] = &pbproxystate.Cluster{
|
||||
Name: nullRouteClusterName,
|
||||
Group: &pbproxystate.Cluster_EndpointGroup{
|
||||
EndpointGroup: &pbproxystate.EndpointGroup{
|
||||
Group: &pbproxystate.EndpointGroup_Static{
|
||||
Static: &pbproxystate.StaticEndpointGroup{
|
||||
Config: &pbproxystate.StaticEndpointGroupConfig{
|
||||
ConnectTimeout: durationpb.New(10 * time.Second),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Protocol: pbproxystate.Protocol_PROTOCOL_TCP,
|
||||
}
|
||||
|
||||
// TODO NET-7984
|
||||
return clusters
|
||||
}
|
||||
|
||||
func (b *apiGWProxyStateTemplateBuilder) routers() []*pbproxystate.Router {
|
||||
return []*pbproxystate.Router{}
|
||||
}
|
||||
|
||||
func (b *apiGWProxyStateTemplateBuilder) routes() map[string]*pbproxystate.Route {
|
||||
// TODO NET-7986
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *apiGWProxyStateTemplateBuilder) Build() *meshv2beta1.ProxyStateTemplate {
|
||||
return &meshv2beta1.ProxyStateTemplate{
|
||||
ProxyState: &meshv2beta1.ProxyState{
|
||||
Identity: b.identity(),
|
||||
Listeners: b.listeners(),
|
||||
Clusters: b.clusters(),
|
||||
Routes: b.routes(),
|
||||
},
|
||||
RequiredEndpoints: b.requiredEndpoints(),
|
||||
RequiredLeafCertificates: make(map[string]*pbproxystate.LeafCertificateRef),
|
||||
RequiredTrustBundles: make(map[string]*pbproxystate.TrustBundleRef),
|
||||
}
|
||||
}
|
||||
|
||||
func (b *apiGWProxyStateTemplateBuilder) requiredEndpoints() map[string]*pbproxystate.EndpointRef {
|
||||
requiredEndpoints := make(map[string]*pbproxystate.EndpointRef)
|
||||
|
||||
return requiredEndpoints
|
||||
}
|
|
@ -28,7 +28,7 @@ const (
|
|||
nullRouteClusterName = "null_route_cluster"
|
||||
)
|
||||
|
||||
type proxyStateTemplateBuilder struct {
|
||||
type meshGWProxyStateTemplateBuilder struct {
|
||||
workload *types.DecodedWorkload
|
||||
dataFetcher *fetcher.Fetcher
|
||||
dc string
|
||||
|
@ -38,8 +38,8 @@ type proxyStateTemplateBuilder struct {
|
|||
remoteGatewayIDs []*pbresource.ID
|
||||
}
|
||||
|
||||
func NewProxyStateTemplateBuilder(workload *types.DecodedWorkload, exportedServices []*pbmulticluster.ComputedExportedService, logger hclog.Logger, dataFetcher *fetcher.Fetcher, dc, trustDomain string, remoteGatewayIDs []*pbresource.ID) *proxyStateTemplateBuilder {
|
||||
return &proxyStateTemplateBuilder{
|
||||
func NewMeshGWProxyStateTemplateBuilder(workload *types.DecodedWorkload, exportedServices []*pbmulticluster.ComputedExportedService, logger hclog.Logger, dataFetcher *fetcher.Fetcher, dc, trustDomain string, remoteGatewayIDs []*pbresource.ID) *meshGWProxyStateTemplateBuilder {
|
||||
return &meshGWProxyStateTemplateBuilder{
|
||||
workload: workload,
|
||||
dataFetcher: dataFetcher,
|
||||
dc: dc,
|
||||
|
@ -50,7 +50,7 @@ func NewProxyStateTemplateBuilder(workload *types.DecodedWorkload, exportedServi
|
|||
}
|
||||
}
|
||||
|
||||
func (b *proxyStateTemplateBuilder) identity() *pbresource.Reference {
|
||||
func (b *meshGWProxyStateTemplateBuilder) identity() *pbresource.Reference {
|
||||
return &pbresource.Reference{
|
||||
Name: b.workload.Data.Identity,
|
||||
Tenancy: b.workload.Id.Tenancy,
|
||||
|
@ -58,7 +58,7 @@ func (b *proxyStateTemplateBuilder) identity() *pbresource.Reference {
|
|||
}
|
||||
}
|
||||
|
||||
func (b *proxyStateTemplateBuilder) listeners() []*pbproxystate.Listener {
|
||||
func (b *meshGWProxyStateTemplateBuilder) listeners() []*pbproxystate.Listener {
|
||||
var listeners []*pbproxystate.Listener
|
||||
var address *pbcatalog.WorkloadAddress
|
||||
|
||||
|
@ -107,7 +107,7 @@ func (b *proxyStateTemplateBuilder) listeners() []*pbproxystate.Listener {
|
|||
// meshListener constructs a pbproxystate.Listener that receives outgoing
|
||||
// traffic from the local partition where the mesh gateway mode is "local". This
|
||||
// traffic will be sent to a mesh gateway in a remote partition.
|
||||
func (b *proxyStateTemplateBuilder) meshListener(address *pbcatalog.WorkloadAddress, port uint32) *pbproxystate.Listener {
|
||||
func (b *meshGWProxyStateTemplateBuilder) meshListener(address *pbcatalog.WorkloadAddress, port uint32) *pbproxystate.Listener {
|
||||
return b.listener("mesh_listener", address, port, pbproxystate.Direction_DIRECTION_OUTBOUND, b.meshRouters())
|
||||
}
|
||||
|
||||
|
@ -115,11 +115,11 @@ func (b *proxyStateTemplateBuilder) meshListener(address *pbcatalog.WorkloadAddr
|
|||
// traffic from the public internet, either from a mesh gateway in a remote partition
|
||||
// where the mesh gateway mode is "local" or from a service in a remote partition
|
||||
// where the mesh gateway mode is "remote".
|
||||
func (b *proxyStateTemplateBuilder) wanListener(address *pbcatalog.WorkloadAddress, port uint32) *pbproxystate.Listener {
|
||||
func (b *meshGWProxyStateTemplateBuilder) wanListener(address *pbcatalog.WorkloadAddress, port uint32) *pbproxystate.Listener {
|
||||
return b.listener("wan_listener", address, port, pbproxystate.Direction_DIRECTION_INBOUND, b.wanRouters())
|
||||
}
|
||||
|
||||
func (b *proxyStateTemplateBuilder) listener(name string, address *pbcatalog.WorkloadAddress, port uint32, direction pbproxystate.Direction, routers []*pbproxystate.Router) *pbproxystate.Listener {
|
||||
func (b *meshGWProxyStateTemplateBuilder) listener(name string, address *pbcatalog.WorkloadAddress, port uint32, direction pbproxystate.Direction, routers []*pbproxystate.Router) *pbproxystate.Listener {
|
||||
return &pbproxystate.Listener{
|
||||
Name: name,
|
||||
Direction: direction,
|
||||
|
@ -152,7 +152,7 @@ func (b *proxyStateTemplateBuilder) listener(name string, address *pbcatalog.Wor
|
|||
// a pbproxystate.Router matching the partition + datacenter of the SNI to the target
|
||||
// cluster. Traffic flowing through this router originates in the local partition where
|
||||
// the mesh gateway mode is "local".
|
||||
func (b *proxyStateTemplateBuilder) meshRouters() []*pbproxystate.Router {
|
||||
func (b *meshGWProxyStateTemplateBuilder) meshRouters() []*pbproxystate.Router {
|
||||
var routers []*pbproxystate.Router
|
||||
|
||||
for _, remoteGatewayID := range b.remoteGatewayIDs {
|
||||
|
@ -192,7 +192,7 @@ func (b *proxyStateTemplateBuilder) meshRouters() []*pbproxystate.Router {
|
|||
// a pbproxystate.Router matching the SNI to the target cluster. Traffic flowing through this
|
||||
// router originates from a mesh gateway in a remote partition where the mesh gateway mode is
|
||||
// "local" or from a service in a remote partition where the mesh gateway mode is "remote".
|
||||
func (b *proxyStateTemplateBuilder) wanRouters() []*pbproxystate.Router {
|
||||
func (b *meshGWProxyStateTemplateBuilder) wanRouters() []*pbproxystate.Router {
|
||||
var routers []*pbproxystate.Router
|
||||
|
||||
for _, exportedService := range b.exportedServices {
|
||||
|
@ -235,7 +235,7 @@ func (b *proxyStateTemplateBuilder) wanRouters() []*pbproxystate.Router {
|
|||
return routers
|
||||
}
|
||||
|
||||
func (b *proxyStateTemplateBuilder) clusters() map[string]*pbproxystate.Cluster {
|
||||
func (b *meshGWProxyStateTemplateBuilder) clusters() map[string]*pbproxystate.Cluster {
|
||||
clusters := map[string]*pbproxystate.Cluster{}
|
||||
|
||||
// Clusters handling incoming traffic from a remote partition
|
||||
|
@ -316,12 +316,12 @@ func (b *proxyStateTemplateBuilder) clusters() map[string]*pbproxystate.Cluster
|
|||
return clusters
|
||||
}
|
||||
|
||||
func (b *proxyStateTemplateBuilder) routes() map[string]*pbproxystate.Route {
|
||||
func (b *meshGWProxyStateTemplateBuilder) routes() map[string]*pbproxystate.Route {
|
||||
// TODO NET-6428
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *proxyStateTemplateBuilder) Build() *meshv2beta1.ProxyStateTemplate {
|
||||
func (b *meshGWProxyStateTemplateBuilder) Build() *meshv2beta1.ProxyStateTemplate {
|
||||
return &meshv2beta1.ProxyStateTemplate{
|
||||
ProxyState: &meshv2beta1.ProxyState{
|
||||
Identity: b.identity(),
|
||||
|
@ -337,7 +337,7 @@ func (b *proxyStateTemplateBuilder) Build() *meshv2beta1.ProxyStateTemplate {
|
|||
|
||||
// requiredEndpoints loops through the consumers for each exported service
|
||||
// and adds a pbproxystate.EndpointRef to be hydrated for each cluster.
|
||||
func (b *proxyStateTemplateBuilder) requiredEndpoints() map[string]*pbproxystate.EndpointRef {
|
||||
func (b *meshGWProxyStateTemplateBuilder) requiredEndpoints() map[string]*pbproxystate.EndpointRef {
|
||||
requiredEndpoints := make(map[string]*pbproxystate.EndpointRef)
|
||||
|
||||
// Endpoints for clusters handling incoming traffic from another partition
|
||||
|
@ -399,11 +399,11 @@ func (b *proxyStateTemplateBuilder) requiredEndpoints() map[string]*pbproxystate
|
|||
// clusterNameForExportedService generates a cluster name for a given service
|
||||
// that is being exported from the local partition to a remote partition. This
|
||||
// partition may reside in the same datacenter or in a remote datacenter.
|
||||
func (b *proxyStateTemplateBuilder) clusterNameForExportedService(serviceRef *pbresource.Reference, consumer *pbmulticluster.ComputedExportedServiceConsumer, port string) string {
|
||||
func (b *meshGWProxyStateTemplateBuilder) clusterNameForExportedService(serviceRef *pbresource.Reference, consumer *pbmulticluster.ComputedExportedServiceConsumer, port string) string {
|
||||
return fmt.Sprintf("%s.%s", port, b.sniForExportedService(serviceRef, consumer))
|
||||
}
|
||||
|
||||
func (b *proxyStateTemplateBuilder) sniForExportedService(serviceRef *pbresource.Reference, consumer *pbmulticluster.ComputedExportedServiceConsumer) string {
|
||||
func (b *meshGWProxyStateTemplateBuilder) sniForExportedService(serviceRef *pbresource.Reference, consumer *pbmulticluster.ComputedExportedServiceConsumer) string {
|
||||
switch consumer.Tenancy.(type) {
|
||||
case *pbmulticluster.ComputedExportedServiceConsumer_Partition:
|
||||
return connect.ServiceSNI(serviceRef.Name, "", serviceRef.Tenancy.Namespace, serviceRef.Tenancy.Partition, b.dc, b.trustDomain)
|
||||
|
@ -417,7 +417,7 @@ func (b *proxyStateTemplateBuilder) sniForExportedService(serviceRef *pbresource
|
|||
// clusterNameForRemoteGateway generates a cluster name for a given remote mesh
|
||||
// gateway. This will be used to route traffic from the local partition to the mesh
|
||||
// gateway for a remote partition.
|
||||
func (b *proxyStateTemplateBuilder) clusterNameForRemoteGateway(remoteGatewayID *pbresource.ID) string {
|
||||
func (b *meshGWProxyStateTemplateBuilder) clusterNameForRemoteGateway(remoteGatewayID *pbresource.ID) string {
|
||||
return connect.GatewaySNI(b.dc, remoteGatewayID.Tenancy.Partition, b.trustDomain)
|
||||
}
|
||||
|
|
@ -197,7 +197,7 @@ func (suite *proxyStateTemplateBuilderSuite) TestProxyStateTemplateBuilder_Build
|
|||
"without address ports": suite.workloadWithOutAddressPorts,
|
||||
} {
|
||||
testutil.RunStep(suite.T(), name, func(t *testing.T) {
|
||||
builder := NewProxyStateTemplateBuilder(workload, suite.exportedServicesPeerData.Data.Services, logger, f, dc, trustDomain, nil)
|
||||
builder := NewMeshGWProxyStateTemplateBuilder(workload, suite.exportedServicesPeerData.Data.Services, logger, f, dc, trustDomain, nil)
|
||||
expectedProxyStateTemplate := &pbmesh.ProxyStateTemplate{
|
||||
ProxyState: &pbmesh.ProxyState{
|
||||
Identity: &pbresource.Reference{
|
|
@ -40,6 +40,7 @@ func Controller(trustDomainFetcher sidecarproxy.TrustDomainFetcher, dc string, d
|
|||
WithWatch(pbcatalog.WorkloadType, dependency.ReplaceType(pbmesh.ProxyStateTemplateType)).
|
||||
WithWatch(pbmesh.ComputedProxyConfigurationType, dependency.ReplaceType(pbmesh.ProxyStateTemplateType)).
|
||||
WithWatch(pbmulticluster.ComputedExportedServicesType, mapper.AllMeshGatewayWorkloadsInPartition).
|
||||
WithWatch(pbmesh.TCPRouteType, mapper.APIGatewaysInParentRefs).
|
||||
WithReconciler(&reconciler{
|
||||
dc: dc,
|
||||
defaultAllow: defaultAllow,
|
||||
|
@ -85,14 +86,120 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
|
|||
return r.reconcileMeshGatewayProxyState(ctx, dataFetcher, workload, rt, req)
|
||||
case apigateways.GatewayKind:
|
||||
rt.Logger.Trace("workload is a api-gateway; reconciling", "workload", workloadID, "workloadData", workload.Data)
|
||||
// TODO: NET-735 -- implement api-gateway reconciliation
|
||||
return nil
|
||||
return r.reconcileAPIGatewayProxyState(ctx, dataFetcher, workload, rt, req)
|
||||
default:
|
||||
rt.Logger.Trace("workload is not a gateway; skipping reconciliation", "workload", workloadID)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (r *reconciler) reconcileAPIGatewayProxyState(ctx context.Context, dataFetcher *fetcher.Fetcher, workload *resource.DecodedResource[*pbcatalog.Workload], rt controller.Runtime, req controller.Request) error {
|
||||
proxyStateTemplate, err := dataFetcher.FetchProxyStateTemplate(ctx, req.ID)
|
||||
if err != nil {
|
||||
rt.Logger.Error("error reading proxy state template", "error", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if proxyStateTemplate == nil {
|
||||
req.ID.Uid = ""
|
||||
rt.Logger.Trace("proxy state template for this gateway doesn't yet exist; generating a new one")
|
||||
}
|
||||
|
||||
gwID := &pbresource.ID{
|
||||
Name: workload.Data.Identity,
|
||||
Type: pbmesh.APIGatewayType,
|
||||
Tenancy: workload.Id.Tenancy,
|
||||
}
|
||||
|
||||
apiGateway, err := dataFetcher.FetchAPIGateway(ctx, gwID)
|
||||
if err != nil {
|
||||
rt.Logger.Error("error reading the associated api gateway", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if apiGateway == nil {
|
||||
rt.Logger.Trace("api gateway doesn't exist; skipping reconciliation", "apiGatewayID", gwID)
|
||||
return nil
|
||||
}
|
||||
|
||||
allTCPRoutes, err := dataFetcher.FetchAllTCPRoutes(ctx, req.ID.Tenancy)
|
||||
if err != nil {
|
||||
rt.Logger.Error("error reading the associated tcp routes", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if len(allTCPRoutes) == 0 {
|
||||
rt.Logger.Trace("no tcp routes found for this gateway", "apiGatewayID", gwID)
|
||||
}
|
||||
|
||||
services := make([]*pbcatalog.Service, 0)
|
||||
|
||||
tcpRoutesReferencingGateway := make([]*pbmesh.TCPRoute, 0)
|
||||
for _, tcpRoute := range allTCPRoutes {
|
||||
for _, parentRef := range tcpRoute.Data.ParentRefs {
|
||||
if resource.EqualReference(parentRef.Ref, resource.ReferenceFromReferenceOrID(apiGateway.Id)) {
|
||||
tcpRoutesReferencingGateway = append(tcpRoutesReferencingGateway, tcpRoute.Data)
|
||||
for _, rule := range tcpRoute.Data.Rules {
|
||||
for _, backendRef := range rule.BackendRefs {
|
||||
service, err := dataFetcher.FetchService(ctx, resource.IDFromReference(backendRef.BackendRef.Ref))
|
||||
if err != nil {
|
||||
rt.Logger.Error("error reading the associated service", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if service == nil {
|
||||
rt.Logger.Error("service was nil", "serviceID", resource.IDFromReference(backendRef.BackendRef.Ref))
|
||||
return nil
|
||||
}
|
||||
services = append(services, service.Data)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trustDomain, err := r.getTrustDomain()
|
||||
if err != nil {
|
||||
rt.Logger.Error("error fetching trust domain to compute proxy state template", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
gw := apiGateway.Data
|
||||
|
||||
newPST := builder.NewAPIGWProxyStateTemplateBuilder(workload, services, tcpRoutesReferencingGateway, gw, rt.Logger, dataFetcher, r.dc, trustDomain).Build()
|
||||
|
||||
proxyTemplateData, err := anypb.New(newPST)
|
||||
if err != nil {
|
||||
rt.Logger.Error("error creating proxy state template data", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// If we're not creating a new PST and the generated one matches the existing one, nothing to do
|
||||
if proxyStateTemplate != nil && proto.Equal(proxyStateTemplate.Data, newPST) {
|
||||
rt.Logger.Trace("no changes to existing proxy state template")
|
||||
return nil
|
||||
}
|
||||
|
||||
rt.Logger.Trace("updating proxy state template")
|
||||
|
||||
// Write the created/updated ProxyStateTemplate with Workload as owner
|
||||
_, err = rt.Client.Write(ctx, &pbresource.WriteRequest{
|
||||
Resource: &pbresource.Resource{
|
||||
Id: req.ID,
|
||||
Metadata: map[string]string{"gateway-kind": workload.Metadata["gateway-kind"]},
|
||||
Owner: workload.Resource.Id,
|
||||
Data: proxyTemplateData,
|
||||
},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
rt.Logger.Error("error writing proxy state template", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *reconciler) reconcileMeshGatewayProxyState(ctx context.Context, dataFetcher *fetcher.Fetcher, workload *resource.DecodedResource[*pbcatalog.Workload], rt controller.Runtime, req controller.Request) error {
|
||||
proxyStateTemplate, err := dataFetcher.FetchProxyStateTemplate(ctx, req.ID)
|
||||
if err != nil {
|
||||
|
@ -146,14 +253,13 @@ func (r *reconciler) reconcileMeshGatewayProxyState(ctx context.Context, dataFet
|
|||
return err
|
||||
}
|
||||
|
||||
newPST := builder.NewProxyStateTemplateBuilder(workload, exportedServices, rt.Logger, dataFetcher, r.dc, trustDomain, remoteGatewayIDs).Build()
|
||||
newPST := builder.NewMeshGWProxyStateTemplateBuilder(workload, exportedServices, rt.Logger, dataFetcher, r.dc, trustDomain, remoteGatewayIDs).Build()
|
||||
|
||||
proxyTemplateData, err := anypb.New(newPST)
|
||||
if err != nil {
|
||||
rt.Logger.Error("error creating proxy state template data", "error", err)
|
||||
return err
|
||||
}
|
||||
rt.Logger.Trace("updating proxy state template")
|
||||
|
||||
// If we're not creating a new PST and the generated one matches the existing one, nothing to do
|
||||
if proxyStateTemplate != nil && proto.Equal(proxyStateTemplate.Data, newPST) {
|
||||
|
@ -161,6 +267,8 @@ func (r *reconciler) reconcileMeshGatewayProxyState(ctx context.Context, dataFet
|
|||
return nil
|
||||
}
|
||||
|
||||
rt.Logger.Trace("updating proxy state template")
|
||||
|
||||
// Write the created/updated ProxyStateTemplate with MeshGateway owner
|
||||
_, err = rt.Client.Write(ctx, &pbresource.WriteRequest{
|
||||
Resource: &pbresource.Resource{
|
||||
|
|
|
@ -27,6 +27,46 @@ func New(client pbresource.ResourceServiceClient) *Fetcher {
|
|||
}
|
||||
}
|
||||
|
||||
// FetchAllTCPRoutes fetches all the tcp routes.
|
||||
// TODO: in the future this will not be necessary as we'll use the computed gateway routes.
|
||||
func (f *Fetcher) FetchAllTCPRoutes(ctx context.Context, tenancy *pbresource.Tenancy) ([]*types.DecodedTCPRoute, error) {
|
||||
dec, err := resource.ListDecodedResource[*pbmesh.TCPRoute](ctx, f.client, &pbresource.ListRequest{
|
||||
Type: pbmesh.TCPRouteType,
|
||||
Tenancy: tenancy,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return dec, nil
|
||||
}
|
||||
|
||||
// FetchAPIGateway fetches a service resource from the resource service.
|
||||
// This will panic if the type field in the ID argument is not a APIGateway type.
|
||||
func (f *Fetcher) FetchAPIGateway(ctx context.Context, id *pbresource.ID) (*types.DecodedAPIGateway, error) {
|
||||
assertResourceType(pbmesh.APIGatewayType, id.Type)
|
||||
|
||||
dec, err := resource.GetDecodedResource[*pbmesh.APIGateway](ctx, f.client, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return dec, nil
|
||||
}
|
||||
|
||||
// FetchComputedExportedServices fetches a service resource from the resource service.
|
||||
// This will panic if the type field in the ID argument is not a ComputedExportedServices type.
|
||||
func (f *Fetcher) FetchComputedExportedServices(ctx context.Context, id *pbresource.ID) (*types.DecodedComputedExportedServices, error) {
|
||||
assertResourceType(pbmulticluster.ComputedExportedServicesType, id.Type)
|
||||
|
||||
dec, err := resource.GetDecodedResource[*pbmulticluster.ComputedExportedServices](ctx, f.client, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return dec, nil
|
||||
}
|
||||
|
||||
// FetchMeshGateway fetches a service resource from the resource service.
|
||||
// This will panic if the type field in the ID argument is not a MeshGateway type.
|
||||
func (f *Fetcher) FetchMeshGateway(ctx context.Context, id *pbresource.ID) (*types.DecodedMeshGateway, error) {
|
||||
|
@ -66,32 +106,6 @@ func (f *Fetcher) FetchProxyStateTemplate(ctx context.Context, id *pbresource.ID
|
|||
return dec, nil
|
||||
}
|
||||
|
||||
// FetchWorkload fetches a service resource from the resource service.
|
||||
// This will panic if the type field in the ID argument is not a Workload type.
|
||||
func (f *Fetcher) FetchWorkload(ctx context.Context, id *pbresource.ID) (*types.DecodedWorkload, error) {
|
||||
assertResourceType(pbcatalog.WorkloadType, id.Type)
|
||||
|
||||
dec, err := resource.GetDecodedResource[*pbcatalog.Workload](ctx, f.client, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return dec, nil
|
||||
}
|
||||
|
||||
// FetchComputedExportedServices fetches a service resource from the resource service.
|
||||
// This will panic if the type field in the ID argument is not a ComputedExportedServices type.
|
||||
func (f *Fetcher) FetchComputedExportedServices(ctx context.Context, id *pbresource.ID) (*types.DecodedComputedExportedServices, error) {
|
||||
assertResourceType(pbmulticluster.ComputedExportedServicesType, id.Type)
|
||||
|
||||
dec, err := resource.GetDecodedResource[*pbmulticluster.ComputedExportedServices](ctx, f.client, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return dec, nil
|
||||
}
|
||||
|
||||
// FetchService fetches a service resource from the resource service.
|
||||
// This will panic if the type field in the ID argument is not a Service type.
|
||||
func (f *Fetcher) FetchService(ctx context.Context, id *pbresource.ID) (*types.DecodedService, error) {
|
||||
|
@ -116,6 +130,32 @@ func (f *Fetcher) FetchServiceEndpoints(ctx context.Context, id *pbresource.ID)
|
|||
return dec, nil
|
||||
}
|
||||
|
||||
// FetchTCPRoute fetches all the tcp routes.
|
||||
// TODO: in the future this will not be necessary as we'll use the computed gateway routes.
|
||||
func (f *Fetcher) FetchTCPRoute(ctx context.Context, id *pbresource.ID) (*types.DecodedTCPRoute, error) {
|
||||
assertResourceType(pbmesh.TCPRouteType, id.Type)
|
||||
|
||||
dec, err := resource.GetDecodedResource[*pbmesh.TCPRoute](ctx, f.client, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return dec, nil
|
||||
}
|
||||
|
||||
// FetchWorkload fetches a service resource from the resource service.
|
||||
// This will panic if the type field in the ID argument is not a Workload type.
|
||||
func (f *Fetcher) FetchWorkload(ctx context.Context, id *pbresource.ID) (*types.DecodedWorkload, error) {
|
||||
assertResourceType(pbcatalog.WorkloadType, id.Type)
|
||||
|
||||
dec, err := resource.GetDecodedResource[*pbcatalog.Workload](ctx, f.client, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return dec, nil
|
||||
}
|
||||
|
||||
// this is a helper function to ensure that the resource type we are querying for is the type we expect
|
||||
func assertResourceType(expected, actual *pbresource.Type) {
|
||||
if !proto.Equal(expected, actual) {
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package mapper
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/hashicorp/consul/internal/controller"
|
||||
"github.com/hashicorp/consul/internal/mesh/internal/controllers/gatewayproxy/fetcher"
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
|
||||
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
var APIGatewaysInParentRefs = func(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) {
|
||||
fetcher := fetcher.New(rt.Client)
|
||||
|
||||
requests := make([]controller.Request, 0)
|
||||
|
||||
route, err := fetcher.FetchTCPRoute(ctx, res.Id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if route == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
for _, parentRef := range route.Data.GetParentRefs() {
|
||||
if !resource.EqualType(parentRef.Ref.Type, pbmesh.APIGatewayType) {
|
||||
rt.Logger.Trace("parent reference type is not supported", "type", parentRef.Ref.Type)
|
||||
continue
|
||||
}
|
||||
|
||||
endpointsID := resource.ReplaceType(pbcatalog.ServiceEndpointsType, resource.IDFromReference(parentRef.Ref))
|
||||
endpoints, err := fetcher.FetchServiceEndpoints(ctx, endpointsID)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if endpoints == nil || endpoints.Data == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, endpoint := range endpoints.Data.Endpoints {
|
||||
requests = append(requests, controller.Request{
|
||||
ID: resource.ReplaceType(pbmesh.ProxyStateTemplateType, endpoint.TargetRef),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return requests, nil
|
||||
}
|
|
@ -197,6 +197,13 @@ func mapXRouteToComputedRoutes[T types.XRouteData](res *pbresource.Resource, m *
|
|||
|
||||
route := dec.Data
|
||||
|
||||
// we should ignore xRoutes that have a parentRef to an APIGateway
|
||||
for _, ref := range route.GetParentRefs() {
|
||||
if resource.EqualType(pbmesh.APIGatewayType, ref.Ref.Type) {
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
m.TrackXRoute(res.Id, route)
|
||||
|
||||
refs := parentRefSliceToRefSlice(route.GetParentRefs())
|
||||
|
|
Loading…
Reference in New Issue