mirror of https://github.com/status-im/consul.git
[NET-6426] Modify Reconcile Loop for Mesh Gateway Resources to Correctly Write Proxy State Template (#20085)
This commit is contained in:
parent
3b111277ad
commit
c6c2d8bf82
|
@ -4,6 +4,9 @@
|
|||
package builder
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/consul/internal/mesh/internal/controllers/gatewayproxy/fetcher"
|
||||
"github.com/hashicorp/consul/internal/mesh/internal/types"
|
||||
pbauth "github.com/hashicorp/consul/proto-public/pbauth/v2beta1"
|
||||
meshv2beta1 "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
|
||||
|
@ -12,12 +15,22 @@ import (
|
|||
)
|
||||
|
||||
type proxyStateTemplateBuilder struct {
|
||||
workload *types.DecodedWorkload
|
||||
workload *types.DecodedWorkload
|
||||
dataFetcher *fetcher.Fetcher
|
||||
dc string
|
||||
exportedServices *types.DecodedComputedExportedServices
|
||||
logger hclog.Logger
|
||||
trustDomain string
|
||||
}
|
||||
|
||||
func NewProxyStateTemplateBuilder(workload *types.DecodedWorkload) *proxyStateTemplateBuilder {
|
||||
func NewProxyStateTemplateBuilder(workload *types.DecodedWorkload, exportedServices *types.DecodedComputedExportedServices, logger hclog.Logger, dataFetcher *fetcher.Fetcher, dc, trustDomain string) *proxyStateTemplateBuilder {
|
||||
return &proxyStateTemplateBuilder{
|
||||
workload: workload,
|
||||
workload: workload,
|
||||
dataFetcher: dataFetcher,
|
||||
dc: dc,
|
||||
exportedServices: exportedServices,
|
||||
logger: logger,
|
||||
trustDomain: trustDomain,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -13,32 +13,41 @@ import (
|
|||
"github.com/hashicorp/consul/internal/controller/dependency"
|
||||
"github.com/hashicorp/consul/internal/mesh/internal/controllers/gatewayproxy/builder"
|
||||
"github.com/hashicorp/consul/internal/mesh/internal/controllers/gatewayproxy/fetcher"
|
||||
"github.com/hashicorp/consul/internal/mesh/internal/controllers/sidecarproxy"
|
||||
"github.com/hashicorp/consul/internal/mesh/internal/controllers/sidecarproxy/cache"
|
||||
"github.com/hashicorp/consul/internal/mesh/internal/types"
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
|
||||
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
|
||||
pbmulticluster "github.com/hashicorp/consul/proto-public/pbmulticluster/v2beta1"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
// ControllerName is the name for this controller. It's used for logging or status keys.
|
||||
const ControllerName = "consul.io/gateway-proxy-controller"
|
||||
const ControllerName = "consul.io/gateway-proxy"
|
||||
|
||||
// Controller is responsible for triggering reconciler for watched resources
|
||||
func Controller(cache *cache.Cache) *controller.Controller {
|
||||
func Controller(cache *cache.Cache, trustDomainFetcher sidecarproxy.TrustDomainFetcher, dc string, defaultAllow bool) *controller.Controller {
|
||||
// TODO NET-7016 Use caching functionality in NewController being implemented at time of writing
|
||||
// TODO NET-7017 Add the host of other types we should watch
|
||||
return controller.NewController(ControllerName, pbmesh.ProxyStateTemplateType).
|
||||
WithWatch(pbcatalog.WorkloadType, dependency.ReplaceType(pbmesh.ProxyStateTemplateType)).
|
||||
WithWatch(pbmesh.ComputedProxyConfigurationType, dependency.ReplaceType(pbmesh.ProxyStateTemplateType)).
|
||||
WithReconciler(&reconciler{
|
||||
cache: cache,
|
||||
cache: cache,
|
||||
dc: dc,
|
||||
defaultAllow: defaultAllow,
|
||||
getTrustDomain: trustDomainFetcher,
|
||||
})
|
||||
}
|
||||
|
||||
// reconciler is responsible for managing the ProxyStateTemplate for all
|
||||
// gateway types: mesh, api (future) and terminating (future).
|
||||
type reconciler struct {
|
||||
cache *cache.Cache
|
||||
cache *cache.Cache
|
||||
dc string
|
||||
defaultAllow bool
|
||||
getTrustDomain sidecarproxy.TrustDomainFetcher
|
||||
}
|
||||
|
||||
// Reconcile is responsible for creating and updating the pbmesh.ProxyStateTemplate
|
||||
|
@ -60,9 +69,8 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
|
|||
}
|
||||
|
||||
if workload == nil {
|
||||
// If workload has been deleted, then return as ProxyStateTemplate should be cleaned up
|
||||
// by the garbage collector because of the owner reference.
|
||||
rt.Logger.Trace("workload doesn't exist; skipping reconciliation", "workload", workloadID)
|
||||
// Workload no longer exists, let garbage collector clean up
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -104,7 +112,27 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
|
|||
rt.Logger.Trace("proxy state template for this gateway doesn't yet exist; generating a new one")
|
||||
}
|
||||
|
||||
newPST := builder.NewProxyStateTemplateBuilder(workload).Build()
|
||||
exportedServicesID := &pbresource.ID{
|
||||
Name: "global",
|
||||
Tenancy: &pbresource.Tenancy{
|
||||
Partition: req.ID.Tenancy.Partition,
|
||||
},
|
||||
Type: pbmulticluster.ExportedServicesType,
|
||||
}
|
||||
|
||||
exportedServices, err := dataFetcher.FetchExportedServices(ctx, exportedServicesID)
|
||||
if err != nil {
|
||||
rt.Logger.Error("error reading the associated exported services", "error", err)
|
||||
exportedServices = &types.DecodedComputedExportedServices{}
|
||||
}
|
||||
|
||||
trustDomain, err := r.getTrustDomain()
|
||||
if err != nil {
|
||||
rt.Logger.Error("error fetching trust domain to compute proxy state template", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
newPST := builder.NewProxyStateTemplateBuilder(workload, exportedServices, rt.Logger, dataFetcher, r.dc, trustDomain).Build()
|
||||
|
||||
proxyTemplateData, err := anypb.New(newPST)
|
||||
if err != nil {
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/hashicorp/consul/internal/resource"
|
||||
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
|
||||
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
|
||||
pbmulticluster "github.com/hashicorp/consul/proto-public/pbmulticluster/v2beta1"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
|
@ -34,7 +35,7 @@ func (f *Fetcher) FetchMeshGateway(ctx context.Context, id *pbresource.ID) (*typ
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
return dec, err
|
||||
return dec, nil
|
||||
}
|
||||
|
||||
func (f *Fetcher) FetchProxyStateTemplate(ctx context.Context, id *pbresource.ID) (*types.DecodedProxyStateTemplate, error) {
|
||||
|
@ -45,7 +46,7 @@ func (f *Fetcher) FetchProxyStateTemplate(ctx context.Context, id *pbresource.ID
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
return dec, err
|
||||
return dec, nil
|
||||
}
|
||||
|
||||
func (f *Fetcher) FetchWorkload(ctx context.Context, id *pbresource.ID) (*types.DecodedWorkload, error) {
|
||||
|
@ -56,5 +57,27 @@ func (f *Fetcher) FetchWorkload(ctx context.Context, id *pbresource.ID) (*types.
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
return dec, err
|
||||
return dec, nil
|
||||
}
|
||||
|
||||
func (f *Fetcher) FetchExportedServices(ctx context.Context, id *pbresource.ID) (*types.DecodedComputedExportedServices, error) {
|
||||
dec, err := resource.GetDecodedResource[*pbmulticluster.ComputedExportedServices](ctx, f.client, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if dec == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return dec, nil
|
||||
}
|
||||
|
||||
func (f *Fetcher) FetchService(ctx context.Context, id *pbresource.ID) (*types.DecodedService, error) {
|
||||
dec, err := resource.GetDecodedResource[*pbcatalog.Service](ctx, f.client, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if dec == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return dec, nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,250 @@
|
|||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package fetcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing"
|
||||
"github.com/hashicorp/consul/internal/catalog"
|
||||
"github.com/hashicorp/consul/internal/controller"
|
||||
"github.com/hashicorp/consul/internal/mesh/internal/controllers/sidecarproxy/cache"
|
||||
"github.com/hashicorp/consul/internal/mesh/internal/types"
|
||||
"github.com/hashicorp/consul/internal/multicluster"
|
||||
"github.com/hashicorp/consul/internal/resource/resourcetest"
|
||||
pbauth "github.com/hashicorp/consul/proto-public/pbauth/v2beta1"
|
||||
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
|
||||
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
|
||||
pbmulticluster "github.com/hashicorp/consul/proto-public/pbmulticluster/v2beta1"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
)
|
||||
|
||||
type dataFetcherSuite struct {
|
||||
suite.Suite
|
||||
|
||||
ctx context.Context
|
||||
client pbresource.ResourceServiceClient
|
||||
resourceClient *resourcetest.Client
|
||||
rt controller.Runtime
|
||||
|
||||
apiService *pbresource.Resource
|
||||
meshGateway *pbresource.Resource
|
||||
proxyStateTemplate *pbresource.Resource
|
||||
workload *pbresource.Resource
|
||||
exportedServices *pbresource.Resource
|
||||
|
||||
tenancies []*pbresource.Tenancy
|
||||
}
|
||||
|
||||
func (suite *dataFetcherSuite) SetupTest() {
|
||||
suite.ctx = testutil.TestContext(suite.T())
|
||||
suite.tenancies = resourcetest.TestTenancies()
|
||||
suite.client = svctest.NewResourceServiceBuilder().
|
||||
WithRegisterFns(types.Register, catalog.RegisterTypes, multicluster.RegisterTypes).
|
||||
WithTenancies(suite.tenancies...).
|
||||
Run(suite.T())
|
||||
suite.resourceClient = resourcetest.NewClient(suite.client)
|
||||
suite.rt = controller.Runtime{
|
||||
Client: suite.client,
|
||||
Logger: testutil.Logger(suite.T()),
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *dataFetcherSuite) setupWithTenancy(tenancy *pbresource.Tenancy) {
|
||||
suite.apiService = resourcetest.Resource(pbcatalog.ServiceType, "api-1").
|
||||
WithTenancy(tenancy).
|
||||
WithData(suite.T(), &pbcatalog.Service{
|
||||
Ports: []*pbcatalog.ServicePort{
|
||||
{TargetPort: "tcp", VirtualPort: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP},
|
||||
{TargetPort: "mesh", VirtualPort: 20000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH},
|
||||
},
|
||||
},
|
||||
).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
suite.meshGateway = resourcetest.Resource(pbmesh.MeshGatewayType, "mesh-gateway-1").
|
||||
WithData(suite.T(), &pbmesh.MeshGateway{
|
||||
GatewayClassName: "gateway-class-1",
|
||||
}).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
suite.proxyStateTemplate = resourcetest.Resource(pbmesh.ProxyStateTemplateType, "proxy-state-template-1").
|
||||
WithTenancy(tenancy).
|
||||
WithData(suite.T(), &pbmesh.ProxyStateTemplate{}).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
identityID := resourcetest.Resource(pbauth.WorkloadIdentityType, "workload-identity-abc").
|
||||
WithTenancy(tenancy).ID()
|
||||
|
||||
suite.workload = resourcetest.Resource(pbcatalog.WorkloadType, "service-workload-abc").
|
||||
WithTenancy(tenancy).
|
||||
WithData(suite.T(), &pbcatalog.Workload{
|
||||
Identity: identityID.Name,
|
||||
Ports: map[string]*pbcatalog.WorkloadPort{
|
||||
"foo": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
|
||||
},
|
||||
Addresses: []*pbcatalog.WorkloadAddress{
|
||||
{
|
||||
Host: "10.0.0.1",
|
||||
Ports: []string{"foo"},
|
||||
},
|
||||
},
|
||||
}).Write(suite.T(), suite.client)
|
||||
|
||||
suite.exportedServices = resourcetest.Resource(pbmulticluster.ComputedExportedServicesType, "global").
|
||||
WithData(suite.T(), &pbmulticluster.ComputedExportedServices{}).
|
||||
Write(suite.T(), suite.client)
|
||||
}
|
||||
|
||||
func (suite *dataFetcherSuite) TestFetcher_FetchMeshGateway() {
|
||||
suite.runTestCaseWithTenancies(func(_ *pbresource.Tenancy) {
|
||||
c := cache.New()
|
||||
|
||||
f := Fetcher{
|
||||
cache: c,
|
||||
client: suite.client,
|
||||
}
|
||||
|
||||
testutil.RunStep(suite.T(), "mesh gateway does not exist", func(t *testing.T) {
|
||||
nonExistantID := resourcetest.Resource(pbmesh.MeshGatewayType, "not-found").ID()
|
||||
gtw, err := f.FetchMeshGateway(suite.ctx, nonExistantID)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, gtw)
|
||||
})
|
||||
|
||||
testutil.RunStep(suite.T(), "mesh gateway exists", func(t *testing.T) {
|
||||
gtw, err := f.FetchMeshGateway(suite.ctx, suite.meshGateway.Id)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, gtw)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (suite *dataFetcherSuite) TestFetcher_FetchProxyStateTemplate() {
|
||||
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
|
||||
c := cache.New()
|
||||
|
||||
f := Fetcher{
|
||||
cache: c,
|
||||
client: suite.client,
|
||||
}
|
||||
|
||||
testutil.RunStep(suite.T(), "proxy state template does not exist", func(t *testing.T) {
|
||||
nonExistantID := resourcetest.Resource(pbmesh.ProxyStateTemplateType, "not-found").WithTenancy(tenancy).ID()
|
||||
tmpl, err := f.FetchProxyStateTemplate(suite.ctx, nonExistantID)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, tmpl)
|
||||
})
|
||||
|
||||
testutil.RunStep(suite.T(), "proxy state template exists", func(t *testing.T) {
|
||||
tmpl, err := f.FetchProxyStateTemplate(suite.ctx, suite.proxyStateTemplate.Id)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, tmpl)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (suite *dataFetcherSuite) TestFetcher_FetchWorkload() {
|
||||
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
|
||||
c := cache.New()
|
||||
|
||||
f := Fetcher{
|
||||
cache: c,
|
||||
client: suite.client,
|
||||
}
|
||||
|
||||
testutil.RunStep(suite.T(), "workload does not exist", func(t *testing.T) {
|
||||
nonExistantID := resourcetest.Resource(pbcatalog.WorkloadType, "not-found").WithTenancy(tenancy).ID()
|
||||
tmpl, err := f.FetchWorkload(suite.ctx, nonExistantID)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, tmpl)
|
||||
})
|
||||
|
||||
testutil.RunStep(suite.T(), "workload exists", func(t *testing.T) {
|
||||
tmpl, err := f.FetchWorkload(suite.ctx, suite.workload.Id)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, tmpl)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (suite *dataFetcherSuite) TestFetcher_FetchExportedServices() {
|
||||
suite.runTestCaseWithTenancies(func(_ *pbresource.Tenancy) {
|
||||
c := cache.New()
|
||||
|
||||
f := Fetcher{
|
||||
cache: c,
|
||||
client: suite.client,
|
||||
}
|
||||
|
||||
testutil.RunStep(suite.T(), "exported services do not exist", func(t *testing.T) {
|
||||
nonExistantID := resourcetest.Resource(pbmulticluster.ComputedExportedServicesType, "not-found").ID()
|
||||
svcs, err := f.FetchExportedServices(suite.ctx, nonExistantID)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, svcs)
|
||||
})
|
||||
|
||||
testutil.RunStep(suite.T(), "workload exists", func(t *testing.T) {
|
||||
svcs, err := f.FetchExportedServices(suite.ctx, suite.exportedServices.Id)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, svcs)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (suite *dataFetcherSuite) TestFetcher_FetchService() {
|
||||
suite.runTestCaseWithTenancies(func(tenancy *pbresource.Tenancy) {
|
||||
c := cache.New()
|
||||
|
||||
f := Fetcher{
|
||||
cache: c,
|
||||
client: suite.client,
|
||||
}
|
||||
|
||||
testutil.RunStep(suite.T(), "service does not exist", func(t *testing.T) {
|
||||
nonExistantID := resourcetest.Resource(pbcatalog.ServiceType, "not-found").WithTenancy(tenancy).ID()
|
||||
svc, err := f.FetchService(suite.ctx, nonExistantID)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, svc)
|
||||
})
|
||||
|
||||
testutil.RunStep(suite.T(), "service exists", func(t *testing.T) {
|
||||
svc, err := f.FetchService(suite.ctx, suite.apiService.Id)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, svc)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestDataFetcher(t *testing.T) {
|
||||
suite.Run(t, new(dataFetcherSuite))
|
||||
}
|
||||
|
||||
func (suite *dataFetcherSuite) appendTenancyInfo(tenancy *pbresource.Tenancy) string {
|
||||
return fmt.Sprintf("%s_Namespace_%s_Partition", tenancy.Namespace, tenancy.Partition)
|
||||
}
|
||||
|
||||
func (suite *dataFetcherSuite) cleanUpNodes() {
|
||||
suite.resourceClient.MustDelete(suite.T(), suite.apiService.Id)
|
||||
suite.resourceClient.MustDelete(suite.T(), suite.meshGateway.Id)
|
||||
suite.resourceClient.MustDelete(suite.T(), suite.proxyStateTemplate.Id)
|
||||
}
|
||||
|
||||
func (suite *dataFetcherSuite) runTestCaseWithTenancies(t func(*pbresource.Tenancy)) {
|
||||
for _, tenancy := range suite.tenancies {
|
||||
suite.Run(suite.appendTenancyInfo(tenancy), func() {
|
||||
suite.setupWithTenancy(tenancy)
|
||||
suite.T().Cleanup(func() {
|
||||
suite.cleanUpNodes()
|
||||
})
|
||||
t(tenancy)
|
||||
})
|
||||
}
|
||||
}
|
|
@ -48,7 +48,7 @@ func Register(mgr *controller.Manager, deps Dependencies) {
|
|||
sidecarproxy.Controller(cache.New(), deps.TrustDomainFetcher, deps.LocalDatacenter, deps.DefaultAllow),
|
||||
)
|
||||
|
||||
mgr.Register(gatewayproxy.Controller(cache.New()))
|
||||
mgr.Register(gatewayproxy.Controller(cache.New(), deps.TrustDomainFetcher, deps.LocalDatacenter, deps.DefaultAllow))
|
||||
|
||||
mgr.Register(routes.Controller())
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
pbauth "github.com/hashicorp/consul/proto-public/pbauth/v2beta1"
|
||||
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
|
||||
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
|
||||
pbmulticluster "github.com/hashicorp/consul/proto-public/pbmulticluster/v2beta1"
|
||||
)
|
||||
|
||||
type (
|
||||
|
@ -28,4 +29,5 @@ type (
|
|||
DecodedComputedDestinations = resource.DecodedResource[*pbmesh.ComputedExplicitDestinations]
|
||||
DecodedProxyStateTemplate = resource.DecodedResource[*pbmesh.ProxyStateTemplate]
|
||||
DecodedMeshGateway = resource.DecodedResource[*pbmesh.MeshGateway]
|
||||
DecodedComputedExportedServices = resource.DecodedResource[*pbmulticluster.ComputedExportedServices]
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue