resource: Make resource delete tenancy aware (#18476)

resource: Make resource delete tenancy awarae
This commit is contained in:
Semir Patel 2023-08-16 11:44:10 -05:00 committed by GitHub
parent 5ca8cd67e8
commit e6c1c479b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 140 additions and 101 deletions

View File

@ -27,17 +27,13 @@ import (
// - Errors with Aborted if the requested Version does not match the stored Version.
// - Errors with PermissionDenied if ACL check fails
func (s *Server) Delete(ctx context.Context, req *pbresource.DeleteRequest) (*pbresource.DeleteResponse, error) {
if err := validateDeleteRequest(req); err != nil {
return nil, err
}
reg, err := s.resolveType(req.Id.Type)
reg, err := s.validateDeleteRequest(req)
if err != nil {
return nil, err
}
// TODO(spatel): Refactor _ and entMeta in NET-4919
authz, authzContext, err := s.getAuthorizer(tokenFromContext(ctx), acl.DefaultEnterpriseMeta())
entMeta := v2TenancyToV1EntMeta(req.Id.Tenancy)
authz, authzContext, err := s.getAuthorizer(tokenFromContext(ctx), entMeta)
if err != nil {
return nil, err
}
@ -48,6 +44,10 @@ func (s *Server) Delete(ctx context.Context, req *pbresource.DeleteRequest) (*pb
if req.Version == "" || req.Id.Uid == "" {
consistency = storage.StrongConsistency
}
// Apply defaults when tenancy units empty.
v1EntMetaToV2Tenancy(reg, entMeta, req.Id.Tenancy)
existing, err := s.Backend.Read(ctx, consistency, req.Id)
switch {
case errors.Is(err, storage.ErrNotFound):
@ -144,15 +144,31 @@ func (s *Server) maybeCreateTombstone(ctx context.Context, deleteId *pbresource.
}
}
func validateDeleteRequest(req *pbresource.DeleteRequest) error {
func (s *Server) validateDeleteRequest(req *pbresource.DeleteRequest) (*resource.Registration, error) {
if req.Id == nil {
return status.Errorf(codes.InvalidArgument, "id is required")
return nil, status.Errorf(codes.InvalidArgument, "id is required")
}
if err := validateId(req.Id, "id"); err != nil {
return err
return nil, err
}
return nil
reg, err := s.resolveType(req.Id.Type)
if err != nil {
return nil, err
}
// Check scope
if reg.Scope == resource.ScopePartition && req.Id.Tenancy.Namespace != "" {
return nil, status.Errorf(
codes.InvalidArgument,
"partition scoped resource %s cannot have a namespace. got: %s",
resource.ToGVK(req.Id.Type),
req.Id.Tenancy.Namespace,
)
}
return reg, nil
}
// Maintains a deterministic mapping between a resource and it's tombstone's

View File

@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"github.com/hashicorp/consul/acl/resolver"
"github.com/hashicorp/consul/internal/resource"
@ -25,35 +26,36 @@ func TestDelete_InputValidation(t *testing.T) {
demo.RegisterTypes(server.Registry)
testCases := map[string]func(*pbresource.DeleteRequest){
"no id": func(req *pbresource.DeleteRequest) { req.Id = nil },
"no type": func(req *pbresource.DeleteRequest) { req.Id.Type = nil },
"no tenancy": func(req *pbresource.DeleteRequest) { req.Id.Tenancy = nil },
"no name": func(req *pbresource.DeleteRequest) { req.Id.Name = "" },
// TODO(spatel): Refactor tenancy as part of NET-4919
//
// clone necessary to not pollute DefaultTenancy
// "tenancy partition not default": func(req *pbresource.DeleteRequest) {
// req.Id.Tenancy = clone(req.Id.Tenancy)
// req.Id.Tenancy.Partition = ""
// },
// "tenancy namespace not default": func(req *pbresource.DeleteRequest) {
// req.Id.Tenancy = clone(req.Id.Tenancy)
// req.Id.Tenancy.Namespace = ""
// },
// "tenancy peername not local": func(req *pbresource.DeleteRequest) {
// req.Id.Tenancy = clone(req.Id.Tenancy)
// req.Id.Tenancy.PeerName = ""
// },
testCases := map[string]func(artistId, recordLabelId *pbresource.ID) *pbresource.ID{
"no id": func(artistId, recordLabelId *pbresource.ID) *pbresource.ID {
return nil
},
"no type": func(artistId, _ *pbresource.ID) *pbresource.ID {
artistId.Type = nil
return artistId
},
"no tenancy": func(artistId, _ *pbresource.ID) *pbresource.ID {
artistId.Tenancy = nil
return artistId
},
"no name": func(artistId, _ *pbresource.ID) *pbresource.ID {
artistId.Name = ""
return artistId
},
"partition scoped resource with namespace": func(_, recordLabelId *pbresource.ID) *pbresource.ID {
recordLabelId.Tenancy.Namespace = "ishouldnothaveanamespace"
return recordLabelId
},
}
for desc, modFn := range testCases {
t.Run(desc, func(t *testing.T) {
res, err := demo.GenerateV2Artist()
recordLabel, err := demo.GenerateV1RecordLabel("LoonyTunes")
require.NoError(t, err)
req := &pbresource.DeleteRequest{Id: res.Id, Version: ""}
modFn(req)
artist, err := demo.GenerateV2Artist()
require.NoError(t, err)
req := &pbresource.DeleteRequest{Id: modFn(artist.Id, recordLabel.Id), Version: ""}
_, err = client.Delete(testContext(t), req)
require.Error(t, err)
@ -125,34 +127,48 @@ func TestDelete_Success(t *testing.T) {
for desc, tc := range deleteTestCases() {
t.Run(desc, func(t *testing.T) {
server, client, ctx := testDeps(t)
demo.RegisterTypes(server.Registry)
artist, err := demo.GenerateV2Artist()
require.NoError(t, err)
for tenancyDesc, modFn := range tenancyCases() {
t.Run(tenancyDesc, func(t *testing.T) {
server, client, ctx := testDeps(t)
demo.RegisterTypes(server.Registry)
rsp, err := client.Write(ctx, &pbresource.WriteRequest{Resource: artist})
require.NoError(t, err)
artistId := clone(rsp.Resource.Id)
artist = rsp.Resource
recordLabel, err := demo.GenerateV1RecordLabel("LoonyTunes")
require.NoError(t, err)
recordLabel, err = server.Backend.WriteCAS(ctx, recordLabel)
require.NoError(t, err)
// delete
_, err = client.Delete(ctx, tc.deleteReqFn(artist))
require.NoError(t, err)
artist, err := demo.GenerateV2Artist()
require.NoError(t, err)
artist, err = server.Backend.WriteCAS(ctx, artist)
require.NoError(t, err)
// verify deleted
_, err = server.Backend.Read(ctx, storage.StrongConsistency, artistId)
require.Error(t, err)
require.ErrorIs(t, err, storage.ErrNotFound)
// Pick the resource to be deleted based on type's scope
deleteId := modFn(artist.Id, recordLabel.Id)
deleteReq := tc.deleteReqFn(recordLabel)
if proto.Equal(deleteId.Type, demo.TypeV2Artist) {
deleteReq = tc.deleteReqFn(artist)
}
// verify tombstone created
_, err = client.Read(ctx, &pbresource.ReadRequest{
Id: &pbresource.ID{
Name: tombstoneName(artistId),
Type: resource.TypeV1Tombstone,
Tenancy: artist.Id.Tenancy,
},
})
require.NoError(t, err)
// Delete
_, err = client.Delete(ctx, deleteReq)
require.NoError(t, err)
// Verify deleted
_, err = server.Backend.Read(ctx, storage.StrongConsistency, deleteId)
require.Error(t, err)
require.ErrorIs(t, err, storage.ErrNotFound)
// Verify tombstone created
_, err = client.Read(ctx, &pbresource.ReadRequest{
Id: &pbresource.ID{
Name: tombstoneName(deleteReq.Id),
Type: resource.TypeV1Tombstone,
Tenancy: deleteReq.Id.Tenancy,
},
})
require.NoError(t, err, "expected tombstome to be found")
})
}
})
}
}

View File

@ -5,7 +5,6 @@ package resource
import (
"context"
"strings"
"testing"
"github.com/stretchr/testify/mock"
@ -157,47 +156,7 @@ func TestRead_GroupVersionMismatch(t *testing.T) {
func TestRead_Success(t *testing.T) {
for desc, tc := range readTestCases() {
t.Run(desc, func(t *testing.T) {
tenancyCases := map[string]func(artistId, recordlabelId *pbresource.ID) *pbresource.ID{
"namespaced resource provides nonempty partition and namespace": func(artistId, recordLabelId *pbresource.ID) *pbresource.ID {
return artistId
},
"namespaced resource provides uppercase partition and namespace": func(artistId, _ *pbresource.ID) *pbresource.ID {
id := clone(artistId)
id.Tenancy.Partition = strings.ToUpper(artistId.Tenancy.Partition)
id.Tenancy.Namespace = strings.ToUpper(artistId.Tenancy.Namespace)
return id
},
"namespaced resource inherits tokens partition when empty": func(artistId, _ *pbresource.ID) *pbresource.ID {
id := clone(artistId)
id.Tenancy.Partition = ""
return id
},
"namespaced resource inherits tokens namespace when empty": func(artistId, _ *pbresource.ID) *pbresource.ID {
id := clone(artistId)
id.Tenancy.Namespace = ""
return id
},
"namespaced resource inherits tokens partition and namespace when empty": func(artistId, _ *pbresource.ID) *pbresource.ID {
id := clone(artistId)
id.Tenancy.Partition = ""
id.Tenancy.Namespace = ""
return id
},
"partitioned resource provides nonempty partition": func(_, recordLabelId *pbresource.ID) *pbresource.ID {
return recordLabelId
},
"partitioned resource provides uppercase partition": func(_, recordLabelId *pbresource.ID) *pbresource.ID {
id := clone(recordLabelId)
id.Tenancy.Partition = strings.ToUpper(recordLabelId.Tenancy.Partition)
return id
},
"partitioned resource inherits tokens partition when empty": func(_, recordLabelId *pbresource.ID) *pbresource.ID {
id := clone(recordLabelId)
id.Tenancy.Partition = ""
return id
},
}
for tenancyDesc, modFn := range tenancyCases {
for tenancyDesc, modFn := range tenancyCases() {
t.Run(tenancyDesc, func(t *testing.T) {
server := testServer(t)
demo.RegisterTypes(server.Registry)

View File

@ -6,6 +6,7 @@ package resource
import (
"context"
"fmt"
"strings"
"testing"
"github.com/stretchr/testify/mock"
@ -129,3 +130,50 @@ func modifyArtist(t *testing.T, res *pbresource.Resource) *pbresource.Resource {
res.Data = data
return res
}
// tenancyCases returns permutations of valid tenancy structs in a resource id to use as inputs.
// - the id is for a recordLabel when the resource is partition scoped
// - the id is for an artist when the resource is namespace scoped
func tenancyCases() map[string]func(artistId, recordlabelId *pbresource.ID) *pbresource.ID {
tenancyCases := map[string]func(artistId, recordlabelId *pbresource.ID) *pbresource.ID{
"namespaced resource provides nonempty partition and namespace": func(artistId, recordLabelId *pbresource.ID) *pbresource.ID {
return artistId
},
"namespaced resource provides uppercase partition and namespace": func(artistId, _ *pbresource.ID) *pbresource.ID {
id := clone(artistId)
id.Tenancy.Partition = strings.ToUpper(artistId.Tenancy.Partition)
id.Tenancy.Namespace = strings.ToUpper(artistId.Tenancy.Namespace)
return id
},
"namespaced resource inherits tokens partition when empty": func(artistId, _ *pbresource.ID) *pbresource.ID {
id := clone(artistId)
id.Tenancy.Partition = ""
return id
},
"namespaced resource inherits tokens namespace when empty": func(artistId, _ *pbresource.ID) *pbresource.ID {
id := clone(artistId)
id.Tenancy.Namespace = ""
return id
},
"namespaced resource inherits tokens partition and namespace when empty": func(artistId, _ *pbresource.ID) *pbresource.ID {
id := clone(artistId)
id.Tenancy.Partition = ""
id.Tenancy.Namespace = ""
return id
},
"partitioned resource provides nonempty partition": func(_, recordLabelId *pbresource.ID) *pbresource.ID {
return recordLabelId
},
"partitioned resource provides uppercase partition": func(_, recordLabelId *pbresource.ID) *pbresource.ID {
id := clone(recordLabelId)
id.Tenancy.Partition = strings.ToUpper(recordLabelId.Tenancy.Partition)
return id
},
"partitioned resource inherits tokens partition when empty": func(_, recordLabelId *pbresource.ID) *pbresource.ID {
id := clone(recordLabelId)
id.Tenancy.Partition = ""
return id
},
}
return tenancyCases
}