mirror of https://github.com/status-im/consul.git
optimized fetching services in exported service controller (#19695)
* optimized fetching services in exported service controller * added aliases for some complex types
This commit is contained in:
parent
58cc6eded4
commit
a28f4b7f37
|
@ -76,10 +76,24 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
|
|||
rt.Logger.Error("error getting partitioned exported services", "error", err)
|
||||
return err
|
||||
}
|
||||
//TODO: we are listing more services than required. this should be optimized
|
||||
oldComputedExportedService, err := getOldComputedExportedService(ctx, rt, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(exportedServices) == 0 && len(namespaceExportedServices) == 0 && len(partitionedExportedServices) == 0 {
|
||||
if oldComputedExportedService.GetResource() != nil {
|
||||
rt.Logger.Trace("deleting computed exported services")
|
||||
if err := deleteResource(ctx, rt, oldComputedExportedService.GetResource()); err != nil {
|
||||
rt.Logger.Error("error deleting computed exported service", "error", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
namespace := getNamespaceForServices(exportedServices, namespaceExportedServices, partitionedExportedServices)
|
||||
services, err := resource.ListDecodedResource[*pbcatalog.Service](ctx, rt.Client, &pbresource.ListRequest{
|
||||
Tenancy: &pbresource.Tenancy{
|
||||
Namespace: storage.Wildcard,
|
||||
Namespace: namespace,
|
||||
Partition: req.ID.Tenancy.Partition,
|
||||
PeerName: resource.DefaultPeerName,
|
||||
},
|
||||
|
@ -123,20 +137,11 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
|
|||
builder.track(svc.Id, pes.Data.Consumers)
|
||||
}
|
||||
}
|
||||
|
||||
oldComputedExportedService, err := getOldComputedExportedService(ctx, rt, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newComputedExportedService := builder.build()
|
||||
|
||||
if oldComputedExportedService.GetResource() != nil && newComputedExportedService == nil {
|
||||
rt.Logger.Trace("deleting computed exported services")
|
||||
_, err := rt.Client.Delete(ctx, &pbresource.DeleteRequest{
|
||||
Id: oldComputedExportedService.GetResource().GetId(),
|
||||
Version: oldComputedExportedService.GetResource().GetVersion(),
|
||||
})
|
||||
if err != nil {
|
||||
if err := deleteResource(ctx, rt, oldComputedExportedService.GetResource()); err != nil {
|
||||
rt.Logger.Error("error deleting computed exported service", "error", err)
|
||||
return err
|
||||
}
|
||||
|
@ -293,3 +298,43 @@ func sortRefValue(m map[resource.ReferenceKey]*serviceExports) []*serviceExports
|
|||
})
|
||||
return vals
|
||||
}
|
||||
|
||||
func getNamespaceForServices(exportedServices []*types.DecodedExportedServices, namespaceExportedServices []*types.DecodedNamespaceExportedServices, partitionedExportedServices []*types.DecodedPartitionExportedServices) string {
|
||||
if len(partitionedExportedServices) > 0 {
|
||||
return storage.Wildcard
|
||||
}
|
||||
resources := []*pbresource.Resource{}
|
||||
for _, exp := range exportedServices {
|
||||
resources = append(resources, exp.GetResource())
|
||||
}
|
||||
for _, exp := range namespaceExportedServices {
|
||||
resources = append(resources, exp.GetResource())
|
||||
}
|
||||
return getNamespace(resources)
|
||||
}
|
||||
|
||||
func getNamespace(resources []*pbresource.Resource) string {
|
||||
if len(resources) == 0 {
|
||||
// We shouldn't ever hit this.
|
||||
panic("resources cannot be empty")
|
||||
}
|
||||
|
||||
namespace := resources[0].Id.Tenancy.Namespace
|
||||
for _, res := range resources[1:] {
|
||||
if res.Id.Tenancy.Namespace != namespace {
|
||||
return storage.Wildcard
|
||||
}
|
||||
}
|
||||
return namespace
|
||||
}
|
||||
|
||||
func deleteResource(ctx context.Context, rt controller.Runtime, resource *pbresource.Resource) error {
|
||||
_, err := rt.Client.Delete(ctx, &pbresource.DeleteRequest{
|
||||
Id: resource.GetId(),
|
||||
Version: resource.GetVersion(),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -10,7 +10,6 @@ import (
|
|||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
svc "github.com/hashicorp/consul/agent/grpc-external/services/resource"
|
||||
svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing"
|
||||
|
@ -83,31 +82,19 @@ func (suite *controllerSuite) appendTenancyInfo(tenancy *pbresource.Tenancy) str
|
|||
return fmt.Sprintf("%s_Namespace_%s_Partition", tenancy.Namespace, tenancy.Partition)
|
||||
}
|
||||
|
||||
func removeService(consumers []*pbmulticluster.ComputedExportedService, ref *pbresource.Reference) []*pbmulticluster.ComputedExportedService {
|
||||
newConsumers := []*pbmulticluster.ComputedExportedService{}
|
||||
for _, consumer := range consumers {
|
||||
if !proto.Equal(consumer.TargetRef, ref) {
|
||||
newConsumers = append(newConsumers, consumer)
|
||||
}
|
||||
}
|
||||
return newConsumers
|
||||
}
|
||||
|
||||
func (suite *controllerSuite) getComputedExportedSvc(id *pbresource.ID) *pbmulticluster.ComputedExportedServices {
|
||||
computedExportedService := suite.client.RequireResourceExists(suite.T(), id)
|
||||
decodedComputedExportedService := rtest.MustDecode[*pbmulticluster.ComputedExportedServices](suite.T(), computedExportedService)
|
||||
return decodedComputedExportedService.Data
|
||||
}
|
||||
|
||||
var svc0, svc2, svc3, svc4, svc5 *pbresource.Resource
|
||||
|
||||
func (suite *controllerSuite) reconcileTest(tenancy *pbresource.Tenancy) {
|
||||
id := rtest.Resource(pbmulticluster.ComputedExportedServicesType, "global").WithTenancy(&pbresource.Tenancy{
|
||||
Partition: tenancy.Partition,
|
||||
}).ID()
|
||||
require.NotNil(suite.T(), id)
|
||||
|
||||
rtest.Resource(pbcatalog.ServiceType, "svc1").
|
||||
svc1 := rtest.Resource(pbcatalog.ServiceType, "svc1").
|
||||
WithData(suite.T(), &pbcatalog.Service{
|
||||
Ports: []*pbcatalog.ServicePort{
|
||||
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
|
||||
|
@ -137,7 +124,7 @@ func (suite *controllerSuite) reconcileTest(tenancy *pbresource.Tenancy) {
|
|||
expectedComputedExportedService := getExpectation(tenancy, suite.isEnterprise, 0)
|
||||
prototest.AssertDeepEqual(suite.T(), expectedComputedExportedService, actualComputedExportedService)
|
||||
|
||||
svc2 = rtest.Resource(pbcatalog.ServiceType, "svc2").
|
||||
rtest.Resource(pbcatalog.ServiceType, "svc2").
|
||||
WithData(suite.T(), &pbcatalog.Service{
|
||||
Ports: []*pbcatalog.ServicePort{
|
||||
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
|
||||
|
@ -146,7 +133,7 @@ func (suite *controllerSuite) reconcileTest(tenancy *pbresource.Tenancy) {
|
|||
WithTenancy(tenancy).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
svc0 = rtest.Resource(pbcatalog.ServiceType, "svc0").
|
||||
svc0 := rtest.Resource(pbcatalog.ServiceType, "svc0").
|
||||
WithData(suite.T(), &pbcatalog.Service{
|
||||
Ports: []*pbcatalog.ServicePort{
|
||||
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
|
||||
|
@ -172,7 +159,7 @@ func (suite *controllerSuite) reconcileTest(tenancy *pbresource.Tenancy) {
|
|||
expectedComputedExportedService = getExpectation(tenancy, suite.isEnterprise, 1)
|
||||
prototest.AssertDeepEqual(suite.T(), expectedComputedExportedService, actualComputedExportedService)
|
||||
|
||||
svc3 = rtest.Resource(pbcatalog.ServiceType, "svc3").
|
||||
svc3 := rtest.Resource(pbcatalog.ServiceType, "svc3").
|
||||
WithData(suite.T(), &pbcatalog.Service{
|
||||
Ports: []*pbcatalog.ServicePort{
|
||||
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
|
||||
|
@ -212,7 +199,7 @@ func (suite *controllerSuite) reconcileTest(tenancy *pbresource.Tenancy) {
|
|||
expectedComputedExportedService = getExpectation(tenancy, suite.isEnterprise, 4)
|
||||
prototest.AssertDeepEqual(suite.T(), expectedComputedExportedService, actualComputedExportedService)
|
||||
|
||||
svc4 = rtest.Resource(pbcatalog.ServiceType, "svc4").
|
||||
svc4 := rtest.Resource(pbcatalog.ServiceType, "svc4").
|
||||
WithData(suite.T(), &pbcatalog.Service{
|
||||
Ports: []*pbcatalog.ServicePort{
|
||||
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
|
||||
|
@ -238,7 +225,7 @@ func (suite *controllerSuite) reconcileTest(tenancy *pbresource.Tenancy) {
|
|||
expectedComputedExportedService = getExpectation(tenancy, suite.isEnterprise, 6)
|
||||
prototest.AssertDeepEqual(suite.T(), expectedComputedExportedService, actualComputedExportedService)
|
||||
|
||||
svc5 = rtest.Resource(pbcatalog.ServiceType, "svc5").
|
||||
rtest.Resource(pbcatalog.ServiceType, "svc5").
|
||||
WithData(suite.T(), &pbcatalog.Service{
|
||||
Ports: []*pbcatalog.ServicePort{
|
||||
{TargetPort: "http", Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
|
||||
|
@ -278,6 +265,37 @@ func (suite *controllerSuite) reconcileTest(tenancy *pbresource.Tenancy) {
|
|||
|
||||
suite.client.RequireResourceNotFound(suite.T(), id)
|
||||
|
||||
nameexpSvc1 := rtest.Resource(pbmulticluster.NamespaceExportedServicesType, "namesvc1").WithData(suite.T(), exportedNamespaceSvcData).WithTenancy(&pbresource.Tenancy{
|
||||
Partition: tenancy.Partition,
|
||||
Namespace: "app",
|
||||
}).Write(suite.T(), suite.client)
|
||||
require.NotNil(suite.T(), nameexpSvc1)
|
||||
err = suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: id})
|
||||
require.NoError(suite.T(), err)
|
||||
actualComputedExportedService = suite.getComputedExportedSvc(id)
|
||||
expectedComputedExportedService = getExpectation(&pbresource.Tenancy{
|
||||
Partition: tenancy.Partition,
|
||||
Namespace: "app",
|
||||
}, suite.isEnterprise, 10)
|
||||
prototest.AssertDeepEqual(suite.T(), expectedComputedExportedService, actualComputedExportedService)
|
||||
|
||||
expSvc1 := rtest.Resource(pbmulticluster.ExportedServicesType, "expsvc1").WithData(suite.T(), exportedSvcData).WithTenancy(tenancy).Write(suite.T(), suite.client)
|
||||
require.NotNil(suite.T(), expSvc1)
|
||||
|
||||
err = suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: id})
|
||||
require.NoError(suite.T(), err)
|
||||
actualComputedExportedService = suite.getComputedExportedSvc(id)
|
||||
expectedComputedExportedService = getExpectation(tenancy, suite.isEnterprise, 11)
|
||||
prototest.AssertDeepEqual(suite.T(), expectedComputedExportedService, actualComputedExportedService)
|
||||
|
||||
suite.client.MustDelete(suite.T(), svc0.Id)
|
||||
suite.client.MustDelete(suite.T(), svc1.Id)
|
||||
|
||||
err = suite.reconciler.Reconcile(suite.ctx, suite.rt, controller.Request{ID: id})
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
suite.client.RequireResourceNotFound(suite.T(), id)
|
||||
|
||||
}
|
||||
|
||||
func getExpectation(tenancy *pbresource.Tenancy, isEnterprise bool, testCase int) *pbmulticluster.ComputedExportedServices {
|
||||
|
@ -416,6 +434,15 @@ func getExpectation(tenancy *pbresource.Tenancy, isEnterprise bool, testCase int
|
|||
return makeCES(
|
||||
makeConsumer(svc1Ref, peer0Consumer, part0Consumer),
|
||||
)
|
||||
case 10:
|
||||
return makeCES(
|
||||
makeConsumer(svc0Ref, peer1Consumer),
|
||||
)
|
||||
case 11:
|
||||
return makeCES(
|
||||
makeConsumer(svc0Ref, peer1Consumer),
|
||||
makeConsumer(svc1Ref, peer0Consumer, part0Consumer),
|
||||
)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package types
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
pbmulticluster "github.com/hashicorp/consul/proto-public/pbmulticluster/v2beta1"
|
||||
)
|
||||
|
||||
type (
|
||||
DecodedExportedServices = resource.DecodedResource[*pbmulticluster.ExportedServices]
|
||||
DecodedNamespaceExportedServices = resource.DecodedResource[*pbmulticluster.NamespaceExportedServices]
|
||||
DecodedPartitionExportedServices = resource.DecodedResource[*pbmulticluster.PartitionExportedServices]
|
||||
)
|
Loading…
Reference in New Issue