Matt Keeler 123bc95e1a
Add Common Controller Caching Infrastructure (#19767)
* Add Common Controller Caching Infrastructure
2023-12-13 10:06:39 -05:00

123 lines
3.2 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package resource
import (
"context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/storage"
"github.com/hashicorp/consul/proto-public/pbresource"
)
func (s *Server) List(ctx context.Context, req *pbresource.ListRequest) (*pbresource.ListResponse, error) {
reg, err := s.ensureListRequestValid(req)
if err != nil {
return nil, err
}
// v1 ACL subsystem is "wildcard" aware so just pass on through.
entMeta := v2TenancyToV1EntMeta(req.Tenancy)
token := tokenFromContext(ctx)
authz, authzContext, err := s.getAuthorizer(token, entMeta)
if err != nil {
return nil, err
}
// Check ACLs.
err = reg.ACLs.List(authz, authzContext)
switch {
case acl.IsErrPermissionDenied(err):
return nil, status.Error(codes.PermissionDenied, err.Error())
case err != nil:
return nil, status.Errorf(codes.Internal, "failed list acl: %v", err)
}
// Ensure we're defaulting correctly when request tenancy units are empty.
v1EntMetaToV2Tenancy(reg, entMeta, req.Tenancy)
resources, err := s.Backend.List(
ctx,
readConsistencyFrom(ctx),
storage.UnversionedTypeFrom(req.Type),
req.Tenancy,
req.NamePrefix,
)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed list: %v", err)
}
result := make([]*pbresource.Resource, 0)
for _, resource := range resources {
// Filter out non-matching GroupVersion.
if resource.Id.Type.GroupVersion != req.Type.GroupVersion {
continue
}
// Need to rebuild authorizer per resource since wildcard inputs may
// result in different tenancies. Consider caching per tenancy if this
// is deemed expensive.
entMeta = v2TenancyToV1EntMeta(resource.Id.Tenancy)
authz, authzContext, err = s.getAuthorizer(token, entMeta)
if err != nil {
return nil, err
}
// Filter out items that don't pass read ACLs.
err = reg.ACLs.Read(authz, authzContext, resource.Id, resource)
switch {
case acl.IsErrPermissionDenied(err):
continue
case err != nil:
return nil, status.Errorf(codes.Internal, "failed read acl: %v", err)
}
result = append(result, resource)
}
return &pbresource.ListResponse{Resources: result}, nil
}
func (s *Server) ensureListRequestValid(req *pbresource.ListRequest) (*resource.Registration, error) {
var field string
switch {
case req.Type == nil:
field = "type"
case req.Tenancy == nil:
field = "tenancy"
}
if field != "" {
return nil, status.Errorf(codes.InvalidArgument, "%s is required", field)
}
// Check type exists.
reg, err := s.resolveType(req.Type)
if err != nil {
return nil, err
}
if err = checkV2Tenancy(s.UseV2Tenancy, req.Type); err != nil {
return nil, err
}
if err := validateWildcardTenancy(req.Tenancy, req.NamePrefix); err != nil {
return nil, err
}
// Error when partition scoped and namespace not empty.
if reg.Scope == resource.ScopePartition && req.Tenancy.Namespace != "" && req.Tenancy.Namespace != storage.Wildcard {
return nil, status.Errorf(
codes.InvalidArgument,
"partition scoped type %s cannot have a namespace. got: %s",
resource.ToGVK(req.Type),
req.Tenancy.Namespace,
)
}
return reg, nil
}