mirror of
https://github.com/status-im/consul.git
synced 2025-01-12 14:55:02 +00:00
catalog: add metadata filtering to refine workload selectors (#19198)
This implements the Filter field on pbcatalog.WorkloadSelector to be a post-fetch in-memory filter using the https://github.com/hashicorp/go-bexpr expression language to filter resources based on their envelope metadata fields. All existing usages of WorkloadSelector should be able to make use of the filter.
This commit is contained in:
parent
f0e4897736
commit
99f7a1219e
@ -101,3 +101,12 @@ func NewFailoverPolicyMapper() FailoverPolicyMapper {
|
||||
func ValidateLocalServiceRefNoSection(ref *pbresource.Reference, wrapErr func(error) error) error {
|
||||
return types.ValidateLocalServiceRefNoSection(ref, wrapErr)
|
||||
}
|
||||
|
||||
// ValidateSelector ensures that the selector has at least one exact or prefix
|
||||
// match constraint, and that if a filter is present it is valid.
|
||||
//
|
||||
// The selector can be nil, and have zero exact/prefix matches if allowEmpty is
|
||||
// set to true.
|
||||
func ValidateSelector(sel *pbcatalog.WorkloadSelector, allowEmpty bool) error {
|
||||
return types.ValidateSelector(sel, allowEmpty)
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ package endpoints
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
@ -169,6 +170,14 @@ func gatherWorkloadsForService(ctx context.Context, rt controller.Runtime, svc *
|
||||
workloadNames[rsp.Resource.Id.Name] = struct{}{}
|
||||
}
|
||||
|
||||
if sel.GetFilter() != "" && len(workloads) > 0 {
|
||||
var err error
|
||||
workloads, err = resource.FilterResourcesByMetadata(workloads, sel.GetFilter())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error filtering results by metadata: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Sorting ensures deterministic output. This will help for testing but
|
||||
// the real reason to do this is so we will be able to diff the set of
|
||||
// workloads endpoints to determine if we need to update them.
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"github.com/stretchr/testify/suite"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing"
|
||||
"github.com/hashicorp/consul/internal/catalog/internal/types"
|
||||
@ -30,14 +31,16 @@ type reconciliationDataSuite struct {
|
||||
client pbresource.ResourceServiceClient
|
||||
rt controller.Runtime
|
||||
|
||||
apiServiceData *pbcatalog.Service
|
||||
apiService *pbresource.Resource
|
||||
apiEndpoints *pbresource.Resource
|
||||
api1Workload *pbresource.Resource
|
||||
api2Workload *pbresource.Resource
|
||||
api123Workload *pbresource.Resource
|
||||
web1Workload *pbresource.Resource
|
||||
web2Workload *pbresource.Resource
|
||||
apiServiceData *pbcatalog.Service
|
||||
apiService *pbresource.Resource
|
||||
apiServiceSubsetData *pbcatalog.Service
|
||||
apiServiceSubset *pbresource.Resource
|
||||
apiEndpoints *pbresource.Resource
|
||||
api1Workload *pbresource.Resource
|
||||
api2Workload *pbresource.Resource
|
||||
api123Workload *pbresource.Resource
|
||||
web1Workload *pbresource.Resource
|
||||
web2Workload *pbresource.Resource
|
||||
}
|
||||
|
||||
func (suite *reconciliationDataSuite) SetupTest() {
|
||||
@ -62,12 +65,19 @@ func (suite *reconciliationDataSuite) SetupTest() {
|
||||
},
|
||||
},
|
||||
}
|
||||
suite.apiServiceSubsetData = proto.Clone(suite.apiServiceData).(*pbcatalog.Service)
|
||||
suite.apiServiceSubsetData.Workloads.Filter = "(zim in metadata) and (metadata.zim matches `^g.`)"
|
||||
|
||||
suite.apiService = rtest.Resource(pbcatalog.ServiceType, "api").
|
||||
WithData(suite.T(), suite.apiServiceData).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
suite.apiServiceSubset = rtest.Resource(pbcatalog.ServiceType, "api-subset").
|
||||
WithData(suite.T(), suite.apiServiceSubsetData).
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
suite.api1Workload = rtest.Resource(pbcatalog.WorkloadType, "api-1").
|
||||
WithMeta("zim", "dib").
|
||||
WithData(suite.T(), &pbcatalog.Workload{
|
||||
Addresses: []*pbcatalog.WorkloadAddress{
|
||||
{Host: "127.0.0.1"},
|
||||
@ -92,6 +102,7 @@ func (suite *reconciliationDataSuite) SetupTest() {
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
suite.api123Workload = rtest.Resource(pbcatalog.WorkloadType, "api-123").
|
||||
WithMeta("zim", "gir").
|
||||
WithData(suite.T(), &pbcatalog.Workload{
|
||||
Addresses: []*pbcatalog.WorkloadAddress{
|
||||
{Host: "127.0.0.1"},
|
||||
@ -104,6 +115,7 @@ func (suite *reconciliationDataSuite) SetupTest() {
|
||||
Write(suite.T(), suite.client)
|
||||
|
||||
suite.web1Workload = rtest.Resource(pbcatalog.WorkloadType, "web-1").
|
||||
WithMeta("zim", "gaz").
|
||||
WithData(suite.T(), &pbcatalog.Workload{
|
||||
Addresses: []*pbcatalog.WorkloadAddress{
|
||||
{Host: "127.0.0.1"},
|
||||
@ -259,6 +271,20 @@ func (suite *reconciliationDataSuite) TestGetWorkloadData() {
|
||||
prototest.AssertDeepEqual(suite.T(), suite.web2Workload, data[4].resource)
|
||||
}
|
||||
|
||||
func (suite *reconciliationDataSuite) TestGetWorkloadDataWithFilter() {
|
||||
// This is like TestGetWorkloadData except it exercises the post-read
|
||||
// filter on the selector.
|
||||
data, err := getWorkloadData(suite.ctx, suite.rt, &serviceData{
|
||||
resource: suite.apiServiceSubset,
|
||||
service: suite.apiServiceSubsetData,
|
||||
})
|
||||
|
||||
require.NoError(suite.T(), err)
|
||||
require.Len(suite.T(), data, 2)
|
||||
prototest.AssertDeepEqual(suite.T(), suite.api123Workload, data[0].resource)
|
||||
prototest.AssertDeepEqual(suite.T(), suite.web1Workload, data[1].resource)
|
||||
}
|
||||
|
||||
func TestReconciliationData(t *testing.T) {
|
||||
suite.Run(t, new(reconciliationDataSuite))
|
||||
}
|
||||
|
@ -32,7 +32,7 @@ func ValidateDNSPolicy(res *pbresource.Resource) error {
|
||||
var err error
|
||||
// Ensure that this resource isn't useless and is attempting to
|
||||
// select at least one workload.
|
||||
if selErr := validateSelector(policy.Workloads, false); selErr != nil {
|
||||
if selErr := ValidateSelector(policy.Workloads, false); selErr != nil {
|
||||
err = multierror.Append(err, resource.ErrInvalidField{
|
||||
Name: "workloads",
|
||||
Wrapped: selErr,
|
||||
|
@ -30,7 +30,7 @@ func ValidateHealthChecks(res *pbresource.Resource) error {
|
||||
var err error
|
||||
|
||||
// Validate the workload selector
|
||||
if selErr := validateSelector(checks.Workloads, false); selErr != nil {
|
||||
if selErr := ValidateSelector(checks.Workloads, false); selErr != nil {
|
||||
err = multierror.Append(err, resource.ErrInvalidField{
|
||||
Name: "workloads",
|
||||
Wrapped: selErr,
|
||||
|
@ -61,7 +61,7 @@ func ValidateService(res *pbresource.Resource) error {
|
||||
// ServiceEndpoints objects for this service such as when desiring to
|
||||
// configure endpoint information for external services that are not
|
||||
// registered as workloads
|
||||
if selErr := validateSelector(service.Workloads, true); selErr != nil {
|
||||
if selErr := ValidateSelector(service.Workloads, true); selErr != nil {
|
||||
err = multierror.Append(err, resource.ErrInvalidField{
|
||||
Name: "workloads",
|
||||
Wrapped: selErr,
|
||||
|
@ -78,7 +78,7 @@ func validateWorkloadHost(host string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateSelector(sel *pbcatalog.WorkloadSelector, allowEmpty bool) error {
|
||||
func ValidateSelector(sel *pbcatalog.WorkloadSelector, allowEmpty bool) error {
|
||||
if sel == nil {
|
||||
if allowEmpty {
|
||||
return nil
|
||||
@ -88,14 +88,20 @@ func validateSelector(sel *pbcatalog.WorkloadSelector, allowEmpty bool) error {
|
||||
}
|
||||
|
||||
if len(sel.Names) == 0 && len(sel.Prefixes) == 0 {
|
||||
if allowEmpty {
|
||||
return nil
|
||||
if !allowEmpty {
|
||||
return resource.ErrEmpty
|
||||
}
|
||||
|
||||
return resource.ErrEmpty
|
||||
if sel.Filter != "" {
|
||||
return resource.ErrInvalidField{
|
||||
Name: "filter",
|
||||
Wrapped: errors.New("filter cannot be set unless there is a name or prefix selector"),
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var err error
|
||||
var merr error
|
||||
|
||||
// Validate that all the exact match names are non-empty. This is
|
||||
// mostly for the sake of not admitting values that should always
|
||||
@ -103,7 +109,7 @@ func validateSelector(sel *pbcatalog.WorkloadSelector, allowEmpty bool) error {
|
||||
// This is because workloads must have non-empty names.
|
||||
for idx, name := range sel.Names {
|
||||
if name == "" {
|
||||
err = multierror.Append(err, resource.ErrInvalidListElement{
|
||||
merr = multierror.Append(merr, resource.ErrInvalidListElement{
|
||||
Name: "names",
|
||||
Index: idx,
|
||||
Wrapped: resource.ErrEmpty,
|
||||
@ -111,7 +117,14 @@ func validateSelector(sel *pbcatalog.WorkloadSelector, allowEmpty bool) error {
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
if err := resource.ValidateMetadataFilter(sel.GetFilter()); err != nil {
|
||||
merr = multierror.Append(merr, resource.ErrInvalidField{
|
||||
Name: "filter",
|
||||
Wrapped: err,
|
||||
})
|
||||
}
|
||||
|
||||
return merr
|
||||
}
|
||||
|
||||
func validateIPAddress(ip string) error {
|
||||
|
@ -4,6 +4,7 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
@ -281,11 +282,49 @@ func TestValidateSelector(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
"filter-with-empty-query": {
|
||||
selector: &pbcatalog.WorkloadSelector{
|
||||
Filter: "garbage.value == zzz",
|
||||
},
|
||||
allowEmpty: true,
|
||||
err: resource.ErrInvalidField{
|
||||
Name: "filter",
|
||||
Wrapped: errors.New(
|
||||
`filter cannot be set unless there is a name or prefix selector`,
|
||||
),
|
||||
},
|
||||
},
|
||||
"bad-filter": {
|
||||
selector: &pbcatalog.WorkloadSelector{
|
||||
Prefixes: []string{"foo", "bar"},
|
||||
Filter: "garbage.value == zzz",
|
||||
},
|
||||
allowEmpty: false,
|
||||
err: &multierror.Error{
|
||||
Errors: []error{
|
||||
resource.ErrInvalidField{
|
||||
Name: "filter",
|
||||
Wrapped: fmt.Errorf(
|
||||
`filter "garbage.value == zzz" is invalid: %w`,
|
||||
errors.New(`Selector "garbage" is not valid`),
|
||||
),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"good-filter": {
|
||||
selector: &pbcatalog.WorkloadSelector{
|
||||
Prefixes: []string{"foo", "bar"},
|
||||
Filter: "metadata.zone == west1",
|
||||
},
|
||||
allowEmpty: false,
|
||||
err: nil,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tcase := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
err := validateSelector(tcase.selector, tcase.allowEmpty)
|
||||
err := ValidateSelector(tcase.selector, tcase.allowEmpty)
|
||||
if tcase.err == nil {
|
||||
require.NoError(t, err)
|
||||
} else {
|
||||
|
@ -92,7 +92,7 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
|
||||
destinationIDs := r.mapper.DestinationsForWorkload(req.ID)
|
||||
rt.Logger.Trace("cached destinations IDs", "ids", destinationIDs)
|
||||
|
||||
decodedDestinations, err := r.fetchDestinations(ctx, rt.Client, destinationIDs)
|
||||
decodedDestinations, err := r.fetchDestinations(ctx, rt.Client, destinationIDs, workload)
|
||||
if err != nil {
|
||||
rt.Logger.Error("error fetching mapper", "error", err)
|
||||
return err
|
||||
@ -241,8 +241,9 @@ func validate(
|
||||
func (r *reconciler) fetchDestinations(
|
||||
ctx context.Context,
|
||||
client pbresource.ResourceServiceClient,
|
||||
destinationIDs []*pbresource.ID) ([]*types.DecodedDestinations, error) {
|
||||
|
||||
destinationIDs []*pbresource.ID,
|
||||
workload *types.DecodedWorkload,
|
||||
) ([]*types.DecodedDestinations, error) {
|
||||
// Sort all configs alphabetically.
|
||||
sort.Slice(destinationIDs, func(i, j int) bool {
|
||||
return destinationIDs[i].GetName() < destinationIDs[j].GetName()
|
||||
@ -259,6 +260,17 @@ func (r *reconciler) fetchDestinations(
|
||||
r.mapper.UntrackDestinations(id)
|
||||
continue
|
||||
}
|
||||
|
||||
if res.Data.Workloads.Filter != "" {
|
||||
match, err := resource.FilterMatchesResourceMetadata(workload.Resource, res.Data.Workloads.Filter)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error checking selector filters: %w", err)
|
||||
}
|
||||
if !match {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
decoded = append(decoded, res)
|
||||
}
|
||||
|
||||
|
@ -5,6 +5,7 @@ package proxyconfiguration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"google.golang.org/protobuf/proto"
|
||||
"google.golang.org/protobuf/types/known/anypb"
|
||||
@ -86,7 +87,7 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
|
||||
proxyCfgIDs := r.proxyConfigMapper.IDsForWorkload(req.ID)
|
||||
rt.Logger.Trace("cached proxy cfg IDs", "ids", proxyCfgIDs)
|
||||
|
||||
decodedProxyCfgs, err := r.fetchProxyConfigs(ctx, rt.Client, proxyCfgIDs)
|
||||
decodedProxyCfgs, err := r.fetchProxyConfigs(ctx, rt.Client, proxyCfgIDs, workload)
|
||||
if err != nil {
|
||||
rt.Logger.Error("error fetching proxy configurations", "error", err)
|
||||
return err
|
||||
@ -154,8 +155,9 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
|
||||
func (r *reconciler) fetchProxyConfigs(
|
||||
ctx context.Context,
|
||||
client pbresource.ResourceServiceClient,
|
||||
proxyCfgIds []*pbresource.ID) ([]*types.DecodedProxyConfiguration, error) {
|
||||
|
||||
proxyCfgIds []*pbresource.ID,
|
||||
workload *types.DecodedWorkload,
|
||||
) ([]*types.DecodedProxyConfiguration, error) {
|
||||
var decoded []*types.DecodedProxyConfiguration
|
||||
for _, id := range proxyCfgIds {
|
||||
res, err := resource.GetDecodedResource[*pbmesh.ProxyConfiguration](ctx, client, id)
|
||||
@ -167,6 +169,17 @@ func (r *reconciler) fetchProxyConfigs(
|
||||
r.proxyConfigMapper.UntrackID(id)
|
||||
continue
|
||||
}
|
||||
|
||||
if res.Data.Workloads.Filter != "" {
|
||||
match, err := resource.FilterMatchesResourceMetadata(workload.Resource, res.Data.Workloads.Filter)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error checking selector filters: %w", err)
|
||||
}
|
||||
if !match {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
decoded = append(decoded, res)
|
||||
}
|
||||
|
||||
|
@ -73,6 +73,14 @@ func ValidateDestinations(res *pbresource.Resource) error {
|
||||
|
||||
var merr error
|
||||
|
||||
// Validate the workload selector
|
||||
if selErr := catalog.ValidateSelector(destinations.Workloads, false); selErr != nil {
|
||||
merr = multierror.Append(merr, resource.ErrInvalidField{
|
||||
Name: "workloads",
|
||||
Wrapped: selErr,
|
||||
})
|
||||
}
|
||||
|
||||
for i, dest := range destinations.Destinations {
|
||||
wrapDestErr := func(err error) error {
|
||||
return resource.ErrInvalidListElement{
|
||||
@ -97,7 +105,5 @@ func ValidateDestinations(res *pbresource.Resource) error {
|
||||
// TODO(v2): validate ListenAddr
|
||||
}
|
||||
|
||||
// TODO(v2): validate workload selectors
|
||||
|
||||
return merr
|
||||
}
|
||||
|
@ -4,8 +4,12 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/go-multierror"
|
||||
|
||||
"github.com/hashicorp/consul/internal/catalog"
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
func RegisterUpstreamsConfiguration(r resource.Registry) {
|
||||
@ -13,6 +17,26 @@ func RegisterUpstreamsConfiguration(r resource.Registry) {
|
||||
Type: pbmesh.DestinationsConfigurationType,
|
||||
Proto: &pbmesh.DestinationsConfiguration{},
|
||||
Scope: resource.ScopeNamespace,
|
||||
Validate: nil,
|
||||
Validate: ValidateDestinationsConfiguration,
|
||||
})
|
||||
}
|
||||
|
||||
func ValidateDestinationsConfiguration(res *pbresource.Resource) error {
|
||||
var cfg pbmesh.DestinationsConfiguration
|
||||
|
||||
if err := res.Data.UnmarshalTo(&cfg); err != nil {
|
||||
return resource.NewErrDataParse(&cfg, err)
|
||||
}
|
||||
|
||||
var merr error
|
||||
|
||||
// Validate the workload selector
|
||||
if selErr := catalog.ValidateSelector(cfg.Workloads, false); selErr != nil {
|
||||
merr = multierror.Append(merr, resource.ErrInvalidField{
|
||||
Name: "workloads",
|
||||
Wrapped: selErr,
|
||||
})
|
||||
}
|
||||
|
||||
return merr
|
||||
}
|
||||
|
@ -0,0 +1,80 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package types
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
"github.com/hashicorp/consul/internal/resource/resourcetest"
|
||||
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
|
||||
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
|
||||
"github.com/hashicorp/consul/proto/private/prototest"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
)
|
||||
|
||||
func TestValidateDestinationsConfiguration(t *testing.T) {
|
||||
type testcase struct {
|
||||
data *pbmesh.DestinationsConfiguration
|
||||
expectErr string
|
||||
}
|
||||
|
||||
run := func(t *testing.T, tc testcase) {
|
||||
res := resourcetest.Resource(pbmesh.DestinationsConfigurationType, "api").
|
||||
WithTenancy(resource.DefaultNamespacedTenancy()).
|
||||
WithData(t, tc.data).
|
||||
Build()
|
||||
|
||||
err := ValidateDestinationsConfiguration(res)
|
||||
|
||||
// Verify that validate didn't actually change the object.
|
||||
got := resourcetest.MustDecode[*pbmesh.DestinationsConfiguration](t, res)
|
||||
prototest.AssertDeepEqual(t, tc.data, got.Data)
|
||||
|
||||
if tc.expectErr == "" {
|
||||
require.NoError(t, err)
|
||||
} else {
|
||||
testutil.RequireErrorContains(t, err, tc.expectErr)
|
||||
}
|
||||
}
|
||||
|
||||
cases := map[string]testcase{
|
||||
// emptiness
|
||||
"empty": {
|
||||
data: &pbmesh.DestinationsConfiguration{},
|
||||
expectErr: `invalid "workloads" field: cannot be empty`,
|
||||
},
|
||||
"empty selector": {
|
||||
data: &pbmesh.DestinationsConfiguration{
|
||||
Workloads: &pbcatalog.WorkloadSelector{},
|
||||
},
|
||||
expectErr: `invalid "workloads" field: cannot be empty`,
|
||||
},
|
||||
"bad selector": {
|
||||
data: &pbmesh.DestinationsConfiguration{
|
||||
Workloads: &pbcatalog.WorkloadSelector{
|
||||
Names: []string{"blah"},
|
||||
Filter: "garbage.foo == bar",
|
||||
},
|
||||
},
|
||||
expectErr: `invalid "filter" field: filter "garbage.foo == bar" is invalid: Selector "garbage" is not valid`,
|
||||
},
|
||||
"good selector": {
|
||||
data: &pbmesh.DestinationsConfiguration{
|
||||
Workloads: &pbcatalog.WorkloadSelector{
|
||||
Names: []string{"blah"},
|
||||
Filter: "metadata.foo == bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
run(t, tc)
|
||||
})
|
||||
}
|
||||
}
|
@ -123,11 +123,30 @@ func TestValidateUpstreams(t *testing.T) {
|
||||
cases := map[string]testcase{
|
||||
// emptiness
|
||||
"empty": {
|
||||
data: &pbmesh.Destinations{},
|
||||
data: &pbmesh.Destinations{},
|
||||
expectErr: `invalid "workloads" field: cannot be empty`,
|
||||
},
|
||||
"empty selector": {
|
||||
data: &pbmesh.Destinations{
|
||||
Workloads: &pbcatalog.WorkloadSelector{},
|
||||
},
|
||||
expectErr: `invalid "workloads" field: cannot be empty`,
|
||||
},
|
||||
"bad selector": {
|
||||
data: &pbmesh.Destinations{
|
||||
Workloads: &pbcatalog.WorkloadSelector{
|
||||
Names: []string{"blah"},
|
||||
Filter: "garbage.foo == bar",
|
||||
},
|
||||
},
|
||||
expectErr: `invalid "filter" field: filter "garbage.foo == bar" is invalid: Selector "garbage" is not valid`,
|
||||
},
|
||||
"dest/nil ref": {
|
||||
skipMutate: true,
|
||||
data: &pbmesh.Destinations{
|
||||
Workloads: &pbcatalog.WorkloadSelector{
|
||||
Names: []string{"blah"},
|
||||
},
|
||||
Destinations: []*pbmesh.Destination{
|
||||
{DestinationRef: nil},
|
||||
},
|
||||
@ -137,6 +156,9 @@ func TestValidateUpstreams(t *testing.T) {
|
||||
"dest/bad type": {
|
||||
skipMutate: true,
|
||||
data: &pbmesh.Destinations{
|
||||
Workloads: &pbcatalog.WorkloadSelector{
|
||||
Names: []string{"blah"},
|
||||
},
|
||||
Destinations: []*pbmesh.Destination{
|
||||
{DestinationRef: newRefWithTenancy(pbcatalog.WorkloadType, "default.default", "api")},
|
||||
},
|
||||
@ -146,6 +168,9 @@ func TestValidateUpstreams(t *testing.T) {
|
||||
"dest/nil tenancy": {
|
||||
skipMutate: true,
|
||||
data: &pbmesh.Destinations{
|
||||
Workloads: &pbcatalog.WorkloadSelector{
|
||||
Names: []string{"blah"},
|
||||
},
|
||||
Destinations: []*pbmesh.Destination{
|
||||
{DestinationRef: &pbresource.Reference{Type: pbcatalog.ServiceType, Name: "api"}},
|
||||
},
|
||||
@ -155,6 +180,9 @@ func TestValidateUpstreams(t *testing.T) {
|
||||
"dest/bad dest tenancy/partition": {
|
||||
skipMutate: true,
|
||||
data: &pbmesh.Destinations{
|
||||
Workloads: &pbcatalog.WorkloadSelector{
|
||||
Names: []string{"blah"},
|
||||
},
|
||||
Destinations: []*pbmesh.Destination{
|
||||
{DestinationRef: newRefWithTenancy(pbcatalog.ServiceType, ".bar", "api")},
|
||||
},
|
||||
@ -164,6 +192,9 @@ func TestValidateUpstreams(t *testing.T) {
|
||||
"dest/bad dest tenancy/namespace": {
|
||||
skipMutate: true,
|
||||
data: &pbmesh.Destinations{
|
||||
Workloads: &pbcatalog.WorkloadSelector{
|
||||
Names: []string{"blah"},
|
||||
},
|
||||
Destinations: []*pbmesh.Destination{
|
||||
{DestinationRef: newRefWithTenancy(pbcatalog.ServiceType, "foo", "api")},
|
||||
},
|
||||
@ -173,6 +204,9 @@ func TestValidateUpstreams(t *testing.T) {
|
||||
"dest/bad dest tenancy/peer_name": {
|
||||
skipMutate: true,
|
||||
data: &pbmesh.Destinations{
|
||||
Workloads: &pbcatalog.WorkloadSelector{
|
||||
Names: []string{"blah"},
|
||||
},
|
||||
Destinations: []*pbmesh.Destination{
|
||||
{DestinationRef: resourcetest.Resource(pbcatalog.ServiceType, "api").
|
||||
WithTenancy(&pbresource.Tenancy{Partition: "foo", Namespace: "bar"}).
|
||||
@ -183,6 +217,22 @@ func TestValidateUpstreams(t *testing.T) {
|
||||
},
|
||||
"normal": {
|
||||
data: &pbmesh.Destinations{
|
||||
Workloads: &pbcatalog.WorkloadSelector{
|
||||
Names: []string{"blah"},
|
||||
},
|
||||
Destinations: []*pbmesh.Destination{
|
||||
{DestinationRef: newRefWithTenancy(pbcatalog.ServiceType, "foo.bar", "api")},
|
||||
{DestinationRef: newRefWithTenancy(pbcatalog.ServiceType, "foo.zim", "api")},
|
||||
{DestinationRef: newRefWithTenancy(pbcatalog.ServiceType, "gir.zim", "api")},
|
||||
},
|
||||
},
|
||||
},
|
||||
"normal with selector": {
|
||||
data: &pbmesh.Destinations{
|
||||
Workloads: &pbcatalog.WorkloadSelector{
|
||||
Names: []string{"blah"},
|
||||
Filter: "metadata.foo == bar",
|
||||
},
|
||||
Destinations: []*pbmesh.Destination{
|
||||
{DestinationRef: newRefWithTenancy(pbcatalog.ServiceType, "foo.bar", "api")},
|
||||
{DestinationRef: newRefWithTenancy(pbcatalog.ServiceType, "foo.zim", "api")},
|
||||
|
@ -4,6 +4,9 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/go-multierror"
|
||||
|
||||
"github.com/hashicorp/consul/internal/catalog"
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
@ -12,12 +15,11 @@ import (
|
||||
|
||||
func RegisterProxyConfiguration(r resource.Registry) {
|
||||
r.Register(resource.Registration{
|
||||
Type: pbmesh.ProxyConfigurationType,
|
||||
Proto: &pbmesh.ProxyConfiguration{},
|
||||
Scope: resource.ScopeNamespace,
|
||||
// TODO(rb): add validation for proxy configuration
|
||||
Validate: nil,
|
||||
Type: pbmesh.ProxyConfigurationType,
|
||||
Proto: &pbmesh.ProxyConfiguration{},
|
||||
Scope: resource.ScopeNamespace,
|
||||
Mutate: MutateProxyConfiguration,
|
||||
Validate: ValidateProxyConfiguration,
|
||||
})
|
||||
}
|
||||
|
||||
@ -49,3 +51,25 @@ func MutateProxyConfiguration(res *pbresource.Resource) error {
|
||||
|
||||
return res.Data.MarshalFrom(&proxyCfg)
|
||||
}
|
||||
|
||||
func ValidateProxyConfiguration(res *pbresource.Resource) error {
|
||||
var cfg pbmesh.ProxyConfiguration
|
||||
|
||||
if err := res.Data.UnmarshalTo(&cfg); err != nil {
|
||||
return resource.NewErrDataParse(&cfg, err)
|
||||
}
|
||||
|
||||
var merr error
|
||||
|
||||
// Validate the workload selector
|
||||
if selErr := catalog.ValidateSelector(cfg.Workloads, false); selErr != nil {
|
||||
merr = multierror.Append(merr, resource.ErrInvalidField{
|
||||
Name: "workloads",
|
||||
Wrapped: selErr,
|
||||
})
|
||||
}
|
||||
|
||||
// TODO(rb): add more validation for proxy configuration
|
||||
|
||||
return merr
|
||||
}
|
||||
|
@ -8,10 +8,13 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
"github.com/hashicorp/consul/internal/resource/resourcetest"
|
||||
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
|
||||
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
|
||||
"github.com/hashicorp/consul/proto/private/prototest"
|
||||
"github.com/hashicorp/consul/sdk/iptables"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
)
|
||||
|
||||
func TestMutateProxyConfiguration(t *testing.T) {
|
||||
@ -82,3 +85,74 @@ func TestMutateProxyConfiguration(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateProxyConfiguration(t *testing.T) {
|
||||
type testcase struct {
|
||||
data *pbmesh.ProxyConfiguration
|
||||
expectErr string
|
||||
}
|
||||
|
||||
run := func(t *testing.T, tc testcase) {
|
||||
res := resourcetest.Resource(pbmesh.ProxyConfigurationType, "api").
|
||||
WithTenancy(resource.DefaultNamespacedTenancy()).
|
||||
WithData(t, tc.data).
|
||||
Build()
|
||||
|
||||
// Ensure things are properly mutated and updated in the inputs.
|
||||
err := MutateProxyConfiguration(res)
|
||||
require.NoError(t, err)
|
||||
{
|
||||
mutated := resourcetest.MustDecode[*pbmesh.ProxyConfiguration](t, res)
|
||||
tc.data = mutated.Data
|
||||
}
|
||||
|
||||
err = ValidateProxyConfiguration(res)
|
||||
|
||||
// Verify that validate didn't actually change the object.
|
||||
got := resourcetest.MustDecode[*pbmesh.ProxyConfiguration](t, res)
|
||||
prototest.AssertDeepEqual(t, tc.data, got.Data)
|
||||
|
||||
if tc.expectErr == "" {
|
||||
require.NoError(t, err)
|
||||
} else {
|
||||
testutil.RequireErrorContains(t, err, tc.expectErr)
|
||||
}
|
||||
}
|
||||
|
||||
cases := map[string]testcase{
|
||||
// emptiness
|
||||
"empty": {
|
||||
data: &pbmesh.ProxyConfiguration{},
|
||||
expectErr: `invalid "workloads" field: cannot be empty`,
|
||||
},
|
||||
"empty selector": {
|
||||
data: &pbmesh.ProxyConfiguration{
|
||||
Workloads: &pbcatalog.WorkloadSelector{},
|
||||
},
|
||||
expectErr: `invalid "workloads" field: cannot be empty`,
|
||||
},
|
||||
"bad selector": {
|
||||
data: &pbmesh.ProxyConfiguration{
|
||||
Workloads: &pbcatalog.WorkloadSelector{
|
||||
Names: []string{"blah"},
|
||||
Filter: "garbage.foo == bar",
|
||||
},
|
||||
},
|
||||
expectErr: `invalid "filter" field: filter "garbage.foo == bar" is invalid: Selector "garbage" is not valid`,
|
||||
},
|
||||
"good selector": {
|
||||
data: &pbmesh.ProxyConfiguration{
|
||||
Workloads: &pbcatalog.WorkloadSelector{
|
||||
Names: []string{"blah"},
|
||||
Filter: "metadata.foo == bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
run(t, tc)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
105
internal/resource/filter.go
Normal file
105
internal/resource/filter.go
Normal file
@ -0,0 +1,105 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package resource
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/go-bexpr"
|
||||
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
// FilterResourcesByMetadata will use the provided go-bexpr based filter to
|
||||
// retain matching items from the provided slice.
|
||||
//
|
||||
// The only variables usable in the expressions are the metadata keys prefixed
|
||||
// by "metadata."
|
||||
//
|
||||
// If no filter is provided, then this does nothing and returns the input.
|
||||
func FilterResourcesByMetadata(resources []*pbresource.Resource, filter string) ([]*pbresource.Resource, error) {
|
||||
if filter == "" || len(resources) == 0 {
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
eval, err := createMetadataFilterEvaluator(filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
filtered := make([]*pbresource.Resource, 0, len(resources))
|
||||
for _, res := range resources {
|
||||
vars := &metadataFilterFieldDetails{
|
||||
Meta: res.Metadata,
|
||||
}
|
||||
match, err := eval.Evaluate(vars)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if match {
|
||||
filtered = append(filtered, res)
|
||||
}
|
||||
}
|
||||
if len(filtered) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
return filtered, nil
|
||||
}
|
||||
|
||||
// FilterMatchesResourceMetadata will use the provided go-bexpr based filter to
|
||||
// determine if the provided resource matches.
|
||||
//
|
||||
// The only variables usable in the expressions are the metadata keys prefixed
|
||||
// by "metadata."
|
||||
//
|
||||
// If no filter is provided, then this returns true.
|
||||
func FilterMatchesResourceMetadata(res *pbresource.Resource, filter string) (bool, error) {
|
||||
if res == nil {
|
||||
return false, nil
|
||||
} else if filter == "" {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
eval, err := createMetadataFilterEvaluator(filter)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
vars := &metadataFilterFieldDetails{
|
||||
Meta: res.Metadata,
|
||||
}
|
||||
match, err := eval.Evaluate(vars)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return match, nil
|
||||
}
|
||||
|
||||
// ValidateMetadataFilter will validate that the provided filter is going to be
|
||||
// a valid input to the FilterResourcesByMetadata function.
|
||||
//
|
||||
// This is best called from a Validate hook.
|
||||
func ValidateMetadataFilter(filter string) error {
|
||||
if filter == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err := createMetadataFilterEvaluator(filter)
|
||||
return err
|
||||
}
|
||||
|
||||
func createMetadataFilterEvaluator(filter string) (*bexpr.Evaluator, error) {
|
||||
sampleVars := &metadataFilterFieldDetails{
|
||||
Meta: make(map[string]string),
|
||||
}
|
||||
eval, err := bexpr.CreateEvaluatorForType(filter, nil, sampleVars)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("filter %q is invalid: %w", filter, err)
|
||||
}
|
||||
return eval, nil
|
||||
}
|
||||
|
||||
type metadataFilterFieldDetails struct {
|
||||
Meta map[string]string `bexpr:"metadata"`
|
||||
}
|
195
internal/resource/filter_test.go
Normal file
195
internal/resource/filter_test.go
Normal file
@ -0,0 +1,195 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package resource
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
"github.com/hashicorp/consul/proto/private/prototest"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
)
|
||||
|
||||
func TestFilterResourcesByMetadata(t *testing.T) {
|
||||
type testcase struct {
|
||||
in []*pbresource.Resource
|
||||
filter string
|
||||
expect []*pbresource.Resource
|
||||
expectErr string
|
||||
}
|
||||
|
||||
create := func(name string, kvs ...string) *pbresource.Resource {
|
||||
require.True(t, len(kvs)%2 == 0)
|
||||
|
||||
meta := make(map[string]string)
|
||||
for i := 0; i < len(kvs); i += 2 {
|
||||
meta[kvs[i]] = kvs[i+1]
|
||||
}
|
||||
|
||||
return &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
Name: name,
|
||||
},
|
||||
Metadata: meta,
|
||||
}
|
||||
}
|
||||
|
||||
run := func(t *testing.T, tc testcase) {
|
||||
got, err := FilterResourcesByMetadata(tc.in, tc.filter)
|
||||
if tc.expectErr != "" {
|
||||
require.Error(t, err)
|
||||
testutil.RequireErrorContains(t, err, tc.expectErr)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
prototest.AssertDeepEqual(t, tc.expect, got)
|
||||
}
|
||||
}
|
||||
|
||||
cases := map[string]testcase{
|
||||
"nil input": {},
|
||||
"no filter": {
|
||||
in: []*pbresource.Resource{
|
||||
create("one"),
|
||||
create("two"),
|
||||
create("three"),
|
||||
create("four"),
|
||||
},
|
||||
filter: "",
|
||||
expect: []*pbresource.Resource{
|
||||
create("one"),
|
||||
create("two"),
|
||||
create("three"),
|
||||
create("four"),
|
||||
},
|
||||
},
|
||||
"bad filter": {
|
||||
in: []*pbresource.Resource{
|
||||
create("one"),
|
||||
create("two"),
|
||||
create("three"),
|
||||
create("four"),
|
||||
},
|
||||
filter: "garbage.value == zzz",
|
||||
expectErr: `Selector "garbage" is not valid`,
|
||||
},
|
||||
"filter everything out": {
|
||||
in: []*pbresource.Resource{
|
||||
create("one"),
|
||||
create("two"),
|
||||
create("three"),
|
||||
create("four"),
|
||||
},
|
||||
filter: "metadata.foo == bar",
|
||||
},
|
||||
"filter simply": {
|
||||
in: []*pbresource.Resource{
|
||||
create("one", "foo", "bar"),
|
||||
create("two", "foo", "baz"),
|
||||
create("three", "zim", "gir"),
|
||||
create("four", "zim", "gaz", "foo", "bar"),
|
||||
},
|
||||
filter: "metadata.foo == bar",
|
||||
expect: []*pbresource.Resource{
|
||||
create("one", "foo", "bar"),
|
||||
create("four", "zim", "gaz", "foo", "bar"),
|
||||
},
|
||||
},
|
||||
"filter prefix": {
|
||||
in: []*pbresource.Resource{
|
||||
create("one", "foo", "bar"),
|
||||
create("two", "foo", "baz"),
|
||||
create("three", "zim", "gir"),
|
||||
create("four", "zim", "gaz", "foo", "bar"),
|
||||
create("four", "zim", "zzz"),
|
||||
},
|
||||
filter: "(zim in metadata) and (metadata.zim matches `^g.`)",
|
||||
expect: []*pbresource.Resource{
|
||||
create("three", "zim", "gir"),
|
||||
create("four", "zim", "gaz", "foo", "bar"),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
run(t, tc)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilterMatchesResourceMetadata(t *testing.T) {
|
||||
type testcase struct {
|
||||
res *pbresource.Resource
|
||||
filter string
|
||||
expect bool
|
||||
expectErr string
|
||||
}
|
||||
|
||||
create := func(name string, kvs ...string) *pbresource.Resource {
|
||||
require.True(t, len(kvs)%2 == 0)
|
||||
|
||||
meta := make(map[string]string)
|
||||
for i := 0; i < len(kvs); i += 2 {
|
||||
meta[kvs[i]] = kvs[i+1]
|
||||
}
|
||||
|
||||
return &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
Name: name,
|
||||
},
|
||||
Metadata: meta,
|
||||
}
|
||||
}
|
||||
|
||||
run := func(t *testing.T, tc testcase) {
|
||||
got, err := FilterMatchesResourceMetadata(tc.res, tc.filter)
|
||||
if tc.expectErr != "" {
|
||||
require.Error(t, err)
|
||||
testutil.RequireErrorContains(t, err, tc.expectErr)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expect, got)
|
||||
}
|
||||
}
|
||||
|
||||
cases := map[string]testcase{
|
||||
"nil input": {},
|
||||
"no filter": {
|
||||
res: create("one"),
|
||||
filter: "",
|
||||
expect: true,
|
||||
},
|
||||
"bad filter": {
|
||||
res: create("one"),
|
||||
filter: "garbage.value == zzz",
|
||||
expectErr: `Selector "garbage" is not valid`,
|
||||
},
|
||||
"no match": {
|
||||
res: create("one"),
|
||||
filter: "metadata.foo == bar",
|
||||
},
|
||||
"match simply": {
|
||||
res: create("one", "foo", "bar"),
|
||||
filter: "metadata.foo == bar",
|
||||
expect: true,
|
||||
},
|
||||
"match via prefix": {
|
||||
res: create("four", "zim", "gaz", "foo", "bar"),
|
||||
filter: "(zim in metadata) and (metadata.zim matches `^g.`)",
|
||||
expect: true,
|
||||
},
|
||||
"no match via prefix": {
|
||||
res: create("four", "zim", "zzz", "foo", "bar"),
|
||||
filter: "(zim in metadata) and (metadata.zim matches `^g.`)",
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
run(t, tc)
|
||||
})
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user