mirror of https://github.com/status-im/consul.git
[NET-7414] Reconcile PST for mesh gateway workloads on change to ComputedExportedServices (#20271)
* Reconcile ProxyStateTemplate on change to ComputedExportedServices * gofmt changeset --------- Co-authored-by: NiniOak <anita.akaeze@hashicorp.com>
This commit is contained in:
parent
57bad0df85
commit
45d645471b
|
@ -55,6 +55,7 @@ flowchart TD
|
||||||
mesh/v2beta1/proxystatetemplate --> mesh/v2beta1/computedexplicitdestinations
|
mesh/v2beta1/proxystatetemplate --> mesh/v2beta1/computedexplicitdestinations
|
||||||
mesh/v2beta1/proxystatetemplate --> mesh/v2beta1/computedproxyconfiguration
|
mesh/v2beta1/proxystatetemplate --> mesh/v2beta1/computedproxyconfiguration
|
||||||
mesh/v2beta1/proxystatetemplate --> mesh/v2beta1/computedroutes
|
mesh/v2beta1/proxystatetemplate --> mesh/v2beta1/computedroutes
|
||||||
|
mesh/v2beta1/proxystatetemplate --> multicluster/v2/computedexportedservices
|
||||||
mesh/v2beta1/tcproute
|
mesh/v2beta1/tcproute
|
||||||
multicluster/v2/computedexportedservices --> catalog/v2beta1/service
|
multicluster/v2/computedexportedservices --> catalog/v2beta1/service
|
||||||
multicluster/v2/computedexportedservices --> multicluster/v2/exportedservices
|
multicluster/v2/computedexportedservices --> multicluster/v2/exportedservices
|
||||||
|
|
|
@ -9,11 +9,13 @@ import (
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
"google.golang.org/protobuf/types/known/anypb"
|
"google.golang.org/protobuf/types/known/anypb"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/internal/controller"
|
"github.com/hashicorp/consul/internal/controller"
|
||||||
"github.com/hashicorp/consul/internal/controller/dependency"
|
"github.com/hashicorp/consul/internal/controller/dependency"
|
||||||
"github.com/hashicorp/consul/internal/mesh/internal/controllers/apigateways"
|
"github.com/hashicorp/consul/internal/mesh/internal/controllers/apigateways"
|
||||||
"github.com/hashicorp/consul/internal/mesh/internal/controllers/gatewayproxy/builder"
|
"github.com/hashicorp/consul/internal/mesh/internal/controllers/gatewayproxy/builder"
|
||||||
"github.com/hashicorp/consul/internal/mesh/internal/controllers/gatewayproxy/fetcher"
|
"github.com/hashicorp/consul/internal/mesh/internal/controllers/gatewayproxy/fetcher"
|
||||||
|
"github.com/hashicorp/consul/internal/mesh/internal/controllers/gatewayproxy/mapper"
|
||||||
"github.com/hashicorp/consul/internal/mesh/internal/controllers/meshgateways"
|
"github.com/hashicorp/consul/internal/mesh/internal/controllers/meshgateways"
|
||||||
"github.com/hashicorp/consul/internal/mesh/internal/controllers/sidecarproxy"
|
"github.com/hashicorp/consul/internal/mesh/internal/controllers/sidecarproxy"
|
||||||
"github.com/hashicorp/consul/internal/mesh/internal/controllers/sidecarproxy/cache"
|
"github.com/hashicorp/consul/internal/mesh/internal/controllers/sidecarproxy/cache"
|
||||||
|
@ -34,9 +36,11 @@ const (
|
||||||
func Controller(cache *cache.Cache, trustDomainFetcher sidecarproxy.TrustDomainFetcher, dc string, defaultAllow bool) *controller.Controller {
|
func Controller(cache *cache.Cache, trustDomainFetcher sidecarproxy.TrustDomainFetcher, dc string, defaultAllow bool) *controller.Controller {
|
||||||
// TODO NET-7016 Use caching functionality in NewController being implemented at time of writing
|
// TODO NET-7016 Use caching functionality in NewController being implemented at time of writing
|
||||||
// TODO NET-7017 Add the host of other types we should watch
|
// TODO NET-7017 Add the host of other types we should watch
|
||||||
|
// TODO NET-7565: Add watch for serviceTypes across partitions
|
||||||
return controller.NewController(ControllerName, pbmesh.ProxyStateTemplateType).
|
return controller.NewController(ControllerName, pbmesh.ProxyStateTemplateType).
|
||||||
WithWatch(pbcatalog.WorkloadType, dependency.ReplaceType(pbmesh.ProxyStateTemplateType)).
|
WithWatch(pbcatalog.WorkloadType, dependency.ReplaceType(pbmesh.ProxyStateTemplateType)).
|
||||||
WithWatch(pbmesh.ComputedProxyConfigurationType, dependency.ReplaceType(pbmesh.ProxyStateTemplateType)).
|
WithWatch(pbmesh.ComputedProxyConfigurationType, dependency.ReplaceType(pbmesh.ProxyStateTemplateType)).
|
||||||
|
WithWatch(pbmulticluster.ComputedExportedServicesType, mapper.AllMeshGatewayWorkloadsInPartition).
|
||||||
WithReconciler(&reconciler{
|
WithReconciler(&reconciler{
|
||||||
cache: cache,
|
cache: cache,
|
||||||
dc: dc,
|
dc: dc,
|
||||||
|
@ -124,7 +128,9 @@ func (r *reconciler) reconcileMeshGatewayProxyState(ctx context.Context, dataFet
|
||||||
}
|
}
|
||||||
|
|
||||||
// This covers any incoming requests from inside my partition to services outside my partition
|
// This covers any incoming requests from inside my partition to services outside my partition
|
||||||
meshGateways, err := dataFetcher.FetchMeshGateways(ctx)
|
meshGateways, err := dataFetcher.FetchMeshGateways(ctx, &pbresource.Tenancy{
|
||||||
|
Partition: acl.WildcardPartitionName,
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
rt.Logger.Warn("error reading the associated mesh gateways", "error", err)
|
rt.Logger.Warn("error reading the associated mesh gateways", "error", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
|
||||||
"github.com/hashicorp/consul/internal/mesh/internal/controllers/sidecarproxy/cache"
|
"github.com/hashicorp/consul/internal/mesh/internal/controllers/sidecarproxy/cache"
|
||||||
"github.com/hashicorp/consul/internal/mesh/internal/types"
|
"github.com/hashicorp/consul/internal/mesh/internal/types"
|
||||||
"github.com/hashicorp/consul/internal/resource"
|
"github.com/hashicorp/consul/internal/resource"
|
||||||
|
@ -38,18 +37,13 @@ func (f *Fetcher) FetchMeshGateway(ctx context.Context, id *pbresource.ID) (*typ
|
||||||
dec, err := resource.GetDecodedResource[*pbmesh.MeshGateway](ctx, f.client, id)
|
dec, err := resource.GetDecodedResource[*pbmesh.MeshGateway](ctx, f.client, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if dec == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return dec, nil
|
return dec, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// FetchMeshGateways fetches all MeshGateway resources known to the local server.
|
// FetchMeshGateways fetches all MeshGateway resources known to the local server.
|
||||||
func (f *Fetcher) FetchMeshGateways(ctx context.Context) ([]*types.DecodedMeshGateway, error) {
|
func (f *Fetcher) FetchMeshGateways(ctx context.Context, tenancy *pbresource.Tenancy) ([]*types.DecodedMeshGateway, error) {
|
||||||
tenancy := resource.DefaultClusteredTenancy()
|
|
||||||
tenancy.Partition = acl.WildcardPartitionName
|
|
||||||
|
|
||||||
dec, err := resource.ListDecodedResource[*pbmesh.MeshGateway](ctx, f.client, &pbresource.ListRequest{
|
dec, err := resource.ListDecodedResource[*pbmesh.MeshGateway](ctx, f.client, &pbresource.ListRequest{
|
||||||
Type: pbmesh.MeshGatewayType,
|
Type: pbmesh.MeshGatewayType,
|
||||||
Tenancy: tenancy,
|
Tenancy: tenancy,
|
||||||
|
@ -69,8 +63,6 @@ func (f *Fetcher) FetchProxyStateTemplate(ctx context.Context, id *pbresource.ID
|
||||||
dec, err := resource.GetDecodedResource[*pbmesh.ProxyStateTemplate](ctx, f.client, id)
|
dec, err := resource.GetDecodedResource[*pbmesh.ProxyStateTemplate](ctx, f.client, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if dec == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return dec, nil
|
return dec, nil
|
||||||
|
@ -84,8 +76,6 @@ func (f *Fetcher) FetchWorkload(ctx context.Context, id *pbresource.ID) (*types.
|
||||||
dec, err := resource.GetDecodedResource[*pbcatalog.Workload](ctx, f.client, id)
|
dec, err := resource.GetDecodedResource[*pbcatalog.Workload](ctx, f.client, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if dec == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return dec, nil
|
return dec, nil
|
||||||
|
@ -99,8 +89,6 @@ func (f *Fetcher) FetchComputedExportedServices(ctx context.Context, id *pbresou
|
||||||
dec, err := resource.GetDecodedResource[*pbmulticluster.ComputedExportedServices](ctx, f.client, id)
|
dec, err := resource.GetDecodedResource[*pbmulticluster.ComputedExportedServices](ctx, f.client, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if dec == nil {
|
|
||||||
return nil, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return dec, nil
|
return dec, nil
|
||||||
|
@ -114,8 +102,17 @@ func (f *Fetcher) FetchService(ctx context.Context, id *pbresource.ID) (*types.D
|
||||||
dec, err := resource.GetDecodedResource[*pbcatalog.Service](ctx, f.client, id)
|
dec, err := resource.GetDecodedResource[*pbcatalog.Service](ctx, f.client, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if dec == nil {
|
}
|
||||||
return nil, nil
|
|
||||||
|
return dec, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Fetcher) FetchServiceEndpoints(ctx context.Context, id *pbresource.ID) (*types.DecodedServiceEndpoints, error) {
|
||||||
|
assertResourceType(pbcatalog.ServiceEndpointsType, id.Type)
|
||||||
|
|
||||||
|
dec, err := resource.GetDecodedResource[*pbcatalog.ServiceEndpoints](ctx, f.client, id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return dec, nil
|
return dec, nil
|
||||||
|
|
|
@ -0,0 +1,51 @@
|
||||||
|
// Copyright (c) HashiCorp, Inc.
|
||||||
|
// SPDX-License-Identifier: BUSL-1.1
|
||||||
|
|
||||||
|
package mapper
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/internal/controller"
|
||||||
|
"github.com/hashicorp/consul/internal/mesh/internal/controllers/gatewayproxy/fetcher"
|
||||||
|
"github.com/hashicorp/consul/internal/resource"
|
||||||
|
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
|
||||||
|
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
|
||||||
|
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||||
|
)
|
||||||
|
|
||||||
|
// AllMeshGatewayWorkloadsInPartition returns one controller.Request for each Workload
|
||||||
|
// selected by a MeshGateway in the partition of the Resource.
|
||||||
|
var AllMeshGatewayWorkloadsInPartition = func(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) ([]controller.Request, error) {
|
||||||
|
fetcher := fetcher.New(rt.Client, nil)
|
||||||
|
|
||||||
|
gateways, err := fetcher.FetchMeshGateways(ctx, &pbresource.Tenancy{
|
||||||
|
Partition: res.Id.Tenancy.Partition,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var requests []controller.Request
|
||||||
|
|
||||||
|
for _, gateway := range gateways {
|
||||||
|
endpointsID := resource.ReplaceType(pbcatalog.ServiceEndpointsType, gateway.Id)
|
||||||
|
|
||||||
|
endpoints, err := fetcher.FetchServiceEndpoints(ctx, endpointsID)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if endpoints == nil || endpoints.Data == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, endpoint := range endpoints.Data.Endpoints {
|
||||||
|
requests = append(requests, controller.Request{
|
||||||
|
ID: resource.ReplaceType(pbmesh.ProxyStateTemplateType, endpoint.TargetRef),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return requests, nil
|
||||||
|
}
|
Loading…
Reference in New Issue