mirror of
https://github.com/status-im/consul.git
synced 2025-01-09 13:26:07 +00:00
resource: retry non-CAS deletes automatically (#20292)
This commit is contained in:
parent
6188db4d20
commit
6d9e8fdd05
@ -50,56 +50,66 @@ func (s *Server) Delete(ctx context.Context, req *pbresource.DeleteRequest) (*pb
|
||||
// Apply defaults when tenancy units empty.
|
||||
v1EntMetaToV2Tenancy(reg, entMeta, req.Id.Tenancy)
|
||||
|
||||
existing, err := s.Backend.Read(ctx, consistency, req.Id)
|
||||
switch {
|
||||
case errors.Is(err, storage.ErrNotFound):
|
||||
// Deletes are idempotent so no-op when not found
|
||||
return &pbresource.DeleteResponse{}, nil
|
||||
case err != nil:
|
||||
return nil, status.Errorf(codes.Internal, "failed read: %v", err)
|
||||
}
|
||||
|
||||
// Check ACLs
|
||||
err = reg.ACLs.Write(authz, authzContext, existing)
|
||||
switch {
|
||||
case acl.IsErrPermissionDenied(err):
|
||||
return nil, status.Error(codes.PermissionDenied, err.Error())
|
||||
case err != nil:
|
||||
return nil, status.Errorf(codes.Internal, "failed write acl: %v", err)
|
||||
}
|
||||
|
||||
deleteVersion := req.Version
|
||||
deleteId := req.Id
|
||||
if deleteVersion == "" || deleteId.Uid == "" {
|
||||
deleteVersion = existing.Version
|
||||
deleteId = existing.Id
|
||||
}
|
||||
|
||||
// Check finalizers for a deferred delete
|
||||
if resource.HasFinalizers(existing) {
|
||||
if resource.IsMarkedForDeletion(existing) {
|
||||
// Delete previously requested and finalizers still present so nothing to do
|
||||
return &pbresource.DeleteResponse{}, nil
|
||||
// Only non-CAS deletes (version=="") are automatically retried.
|
||||
err = s.retryCAS(ctx, req.Version, func() error {
|
||||
existing, err := s.Backend.Read(ctx, consistency, req.Id)
|
||||
switch {
|
||||
case errors.Is(err, storage.ErrNotFound):
|
||||
// Deletes are idempotent so no-op when not found
|
||||
return nil
|
||||
case err != nil:
|
||||
return status.Errorf(codes.Internal, "failed read: %v", err)
|
||||
}
|
||||
|
||||
// Mark for deletion and let controllers that put finalizers in place do their
|
||||
// thing. Note we're passing in a clone of the recently read resource since
|
||||
// we've not crossed a network/serialization boundary since the read and we
|
||||
// don't want to mutate the in-mem reference.
|
||||
return s.markForDeletion(ctx, clone(existing))
|
||||
}
|
||||
// Check ACLs
|
||||
err = reg.ACLs.Write(authz, authzContext, existing)
|
||||
switch {
|
||||
case acl.IsErrPermissionDenied(err):
|
||||
return status.Error(codes.PermissionDenied, err.Error())
|
||||
case err != nil:
|
||||
return status.Errorf(codes.Internal, "failed write acl: %v", err)
|
||||
}
|
||||
|
||||
// Continue with an immediate delete
|
||||
if err := s.maybeCreateTombstone(ctx, deleteId); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
deleteVersion := req.Version
|
||||
deleteId := req.Id
|
||||
if deleteVersion == "" || deleteId.Uid == "" {
|
||||
deleteVersion = existing.Version
|
||||
deleteId = existing.Id
|
||||
}
|
||||
|
||||
// Check finalizers for a deferred delete
|
||||
if resource.HasFinalizers(existing) {
|
||||
if resource.IsMarkedForDeletion(existing) {
|
||||
// Delete previously requested and finalizers still present so nothing to do
|
||||
return nil
|
||||
}
|
||||
|
||||
// Mark for deletion and let controllers that put finalizers in place do their
|
||||
// thing. Note we're passing in a clone of the recently read resource since
|
||||
// we've not crossed a network/serialization boundary since the read and we
|
||||
// don't want to mutate the in-mem reference.
|
||||
_, err := s.markForDeletion(ctx, clone(existing))
|
||||
return err
|
||||
}
|
||||
|
||||
// Continue with an immediate delete
|
||||
if err := s.maybeCreateTombstone(ctx, deleteId); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.Backend.DeleteCAS(ctx, deleteId, deleteVersion)
|
||||
return err
|
||||
})
|
||||
|
||||
err = s.Backend.DeleteCAS(ctx, deleteId, deleteVersion)
|
||||
switch {
|
||||
case err == nil:
|
||||
return &pbresource.DeleteResponse{}, nil
|
||||
case errors.Is(err, storage.ErrCASFailure):
|
||||
return nil, status.Error(codes.Aborted, err.Error())
|
||||
case isGRPCStatusError(err):
|
||||
// Pass through gRPC errors from internal calls to resource service
|
||||
// endpoints (e.g. Write when marking for deletion).
|
||||
return nil, err
|
||||
default:
|
||||
return nil, status.Errorf(codes.Internal, "failed delete: %v", err)
|
||||
}
|
||||
|
@ -180,7 +180,7 @@ func TestDelete_ACLs(t *testing.T) {
|
||||
authz: AuthorizerFrom(t, demo.ArtistV1WritePolicy),
|
||||
assertErrFn: func(err error) {
|
||||
require.Error(t, err)
|
||||
require.Equal(t, codes.PermissionDenied.String(), status.Code(err).String())
|
||||
require.Equal(t, codes.PermissionDenied.String(), status.Code(err).String(), err)
|
||||
},
|
||||
},
|
||||
"delete allowed": {
|
||||
@ -289,6 +289,52 @@ func TestDelete_Success(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestDelete_NonCAS_Retry checks that a Delete request sent with an empty
// Version still succeeds when the resource is modified between the handler's
// backend read and its CAS delete.
func TestDelete_NonCAS_Retry(t *testing.T) {
|
||||
server := testServer(t)
|
||||
client := testClient(t, server)
|
||||
demo.RegisterTypes(server.Registry)
|
||||
|
||||
res, err := demo.GenerateV2Artist()
|
||||
require.NoError(t, err)
|
||||
|
||||
rsp1, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Simulate conflicting versions by blocking the RPC after it has read the
|
||||
// current version of the resource, but before it tries to do a CAS delete
|
||||
// based on that version.
|
||||
backend := &blockOnceBackend{
|
||||
Backend: server.Backend,
|
||||
|
||||
readCompletedCh: make(chan struct{}),
|
||||
blockCh: make(chan struct{}),
|
||||
}
|
||||
server.Backend = backend
|
||||
|
||||
// Issue the Delete from a goroutine because its first backend Read stays
// blocked until blockCh is closed further down.
deleteResultCh := make(chan error)
|
||||
go func() {
|
||||
_, err := client.Delete(testContext(t), &pbresource.DeleteRequest{Id: rsp1.Resource.Id, Version: ""})
|
||||
deleteResultCh <- err
|
||||
}()
|
||||
|
||||
// Wait for the read, to ensure the Delete in the goroutine above has read the
|
||||
// current version of the resource.
|
||||
<-backend.readCompletedCh
|
||||
|
||||
// Update the artist so that its version is different from the version read by Delete
|
||||
res = modifyArtist(t, rsp1.Resource)
|
||||
_, err = backend.WriteCAS(testContext(t), res)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Unblock the Delete by allowing the backend read to return and attempt a CAS delete.
|
||||
// The CAS delete should fail once, and then retry the backend read/delete cycle again
|
||||
// successfully.
|
||||
close(backend.blockCh)
|
||||
|
||||
// Check that the delete succeeded anyway because of a retry.
|
||||
require.NoError(t, <-deleteResultCh)
|
||||
}
|
||||
|
||||
func TestDelete_TombstoneDeletionDoesNotCreateNewTombstone(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
@ -5,7 +5,9 @@ package resource
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
@ -19,6 +21,7 @@ import (
|
||||
"github.com/hashicorp/consul/acl/resolver"
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
"github.com/hashicorp/consul/internal/storage"
|
||||
"github.com/hashicorp/consul/lib/retry"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
@ -297,4 +300,39 @@ func isTenancyMarkedForDeletion(reg *resource.Registration, tenancyBridge Tenanc
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// retryCAS retries the given operation with exponential backoff if the user
|
||||
// didn't provide a version. This is intended to hide failures when the user
|
||||
// isn't intentionally performing a CAS operation (all writes are, by design,
|
||||
// CAS operations at the storage backend layer).
|
||||
func (s *Server) retryCAS(ctx context.Context, vsn string, cas func() error) error {
|
||||
if vsn != "" {
|
||||
return cas()
|
||||
}
|
||||
|
||||
const maxAttempts = 5
|
||||
|
||||
// These parameters are fairly arbitrary, so if you find better ones then go
|
||||
// ahead and swap them out! In general, we want to wait long enough to smooth
|
||||
// over small amounts of storage replication lag, but not so long that we make
|
||||
// matters worse by holding onto load.
|
||||
backoff := &retry.Waiter{
|
||||
MinWait: 50 * time.Millisecond,
|
||||
MaxWait: 1 * time.Second,
|
||||
Jitter: retry.NewJitter(50),
|
||||
Factor: 75 * time.Millisecond,
|
||||
}
|
||||
|
||||
var err error
|
||||
for i := 1; i <= maxAttempts; i++ {
|
||||
if err = cas(); !errors.Is(err, storage.ErrCASFailure) {
|
||||
break
|
||||
}
|
||||
if backoff.Wait(ctx) != nil {
|
||||
break
|
||||
}
|
||||
s.Logger.Trace("retrying failed CAS operation", "failure_count", i)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// clone returns a copy of v produced by proto.Clone, asserted back to the
// caller's concrete message type T.
func clone[T proto.Message](v T) T {
	copied := proto.Clone(v)
	return copied.(T)
}
|
||||
|
@ -6,6 +6,7 @@ package resource_test
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
@ -23,6 +24,7 @@ import (
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
"github.com/hashicorp/consul/internal/resource/demo"
|
||||
"github.com/hashicorp/consul/internal/storage"
|
||||
"github.com/hashicorp/consul/internal/storage/inmem"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
pbdemov2 "github.com/hashicorp/consul/proto/private/pbdemo/v2"
|
||||
@ -283,4 +285,25 @@ func tenancyCases() map[string]func(artistId, recordlabelId *pbresource.ID) *pbr
|
||||
return tenancyCases
|
||||
}
|
||||
|
||||
// blockOnceBackend wraps a storage.Backend for tests. Its Read method blocks
// exactly one call so a test can run other operations while that call is held.
type blockOnceBackend struct {
|
||||
storage.Backend
|
||||
|
||||
// done is swapped from 0 to 1 atomically by the first Read call.
done uint32
|
||||
// readCompletedCh is closed by the blocked Read after the wrapped read returns.
readCompletedCh chan struct{}
|
||||
// blockCh holds the blocked Read until the test closes it.
blockCh chan struct{}
|
||||
}
|
||||
|
||||
func (b *blockOnceBackend) Read(ctx context.Context, consistency storage.ReadConsistency, id *pbresource.ID) (*pbresource.Resource, error) {
|
||||
res, err := b.Backend.Read(ctx, consistency, id)
|
||||
|
||||
// Block for exactly one call to Read. All subsequent calls (including those
|
||||
// concurrent to the blocked call) will return immediately.
|
||||
if atomic.CompareAndSwapUint32(&b.done, 0, 1) {
|
||||
close(b.readCompletedCh)
|
||||
<-b.blockCh
|
||||
}
|
||||
|
||||
return res, err
|
||||
}
|
||||
|
||||
// clone returns a copy of v produced by proto.Clone, asserted back to the
// caller's concrete message type T.
func clone[T proto.Message](v T) T {
	copied := proto.Clone(v)
	return copied.(T)
}
|
||||
|
@ -7,7 +7,6 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/oklog/ulid/v2"
|
||||
"golang.org/x/exp/maps"
|
||||
@ -18,7 +17,6 @@ import (
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
"github.com/hashicorp/consul/internal/storage"
|
||||
"github.com/hashicorp/consul/lib/retry"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
@ -253,41 +251,6 @@ func (s *Server) Write(ctx context.Context, req *pbresource.WriteRequest) (*pbre
|
||||
return &pbresource.WriteResponse{Resource: result}, nil
|
||||
}
|
||||
|
||||
// retryCAS retries the given operation with exponential backoff if the user
|
||||
// didn't provide a version. This is intended to hide failures when the user
|
||||
// isn't intentionally performing a CAS operation (all writes are, by design,
|
||||
// CAS operations at the storage backend layer).
|
||||
func (s *Server) retryCAS(ctx context.Context, vsn string, cas func() error) error {
|
||||
if vsn != "" {
|
||||
return cas()
|
||||
}
|
||||
|
||||
const maxAttempts = 5
|
||||
|
||||
// These parameters are fairly arbitrary, so if you find better ones then go
|
||||
// ahead and swap them out! In general, we want to wait long enough to smooth
|
||||
// over small amounts of storage replication lag, but not so long that we make
|
||||
// matters worse by holding onto load.
|
||||
backoff := &retry.Waiter{
|
||||
MinWait: 50 * time.Millisecond,
|
||||
MaxWait: 1 * time.Second,
|
||||
Jitter: retry.NewJitter(50),
|
||||
Factor: 75 * time.Millisecond,
|
||||
}
|
||||
|
||||
var err error
|
||||
for i := 1; i <= maxAttempts; i++ {
|
||||
if err = cas(); !errors.Is(err, storage.ErrCASFailure) {
|
||||
break
|
||||
}
|
||||
if backoff.Wait(ctx) != nil {
|
||||
break
|
||||
}
|
||||
s.Logger.Trace("retrying failed CAS operation", "failure_count", i)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Server) ensureWriteRequestValid(req *pbresource.WriteRequest) (*resource.Registration, error) {
|
||||
var field string
|
||||
switch {
|
||||
|
@ -518,8 +518,8 @@ func TestWriteStatus_NonCASUpdate_Retry(t *testing.T) {
|
||||
backend := &blockOnceBackend{
|
||||
Backend: server.Backend,
|
||||
|
||||
readCh: make(chan struct{}),
|
||||
blockCh: make(chan struct{}),
|
||||
readCompletedCh: make(chan struct{}),
|
||||
blockCh: make(chan struct{}),
|
||||
}
|
||||
server.Backend = backend
|
||||
|
||||
@ -534,7 +534,7 @@ func TestWriteStatus_NonCASUpdate_Retry(t *testing.T) {
|
||||
|
||||
// Wait for the read, to ensure the Write in the goroutine above has read the
|
||||
// current version of the resource.
|
||||
<-backend.readCh
|
||||
<-backend.readCompletedCh
|
||||
|
||||
// Update the resource.
|
||||
_, err = client.Write(testContext(t), &pbresource.WriteRequest{Resource: modifyArtist(t, res)})
|
||||
|
@ -6,7 +6,6 @@ package resource_test
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -23,7 +22,6 @@ import (
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
"github.com/hashicorp/consul/internal/resource/demo"
|
||||
rtest "github.com/hashicorp/consul/internal/resource/resourcetest"
|
||||
"github.com/hashicorp/consul/internal/storage"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
pbdemo "github.com/hashicorp/consul/proto/private/pbdemo/v1"
|
||||
pbdemov1 "github.com/hashicorp/consul/proto/private/pbdemo/v1"
|
||||
@ -753,8 +751,8 @@ func TestWrite_NonCASUpdate_Retry(t *testing.T) {
|
||||
backend := &blockOnceBackend{
|
||||
Backend: server.Backend,
|
||||
|
||||
readCh: make(chan struct{}),
|
||||
blockCh: make(chan struct{}),
|
||||
readCompletedCh: make(chan struct{}),
|
||||
blockCh: make(chan struct{}),
|
||||
}
|
||||
server.Backend = backend
|
||||
|
||||
@ -769,7 +767,7 @@ func TestWrite_NonCASUpdate_Retry(t *testing.T) {
|
||||
|
||||
// Wait for the read, to ensure the Write in the goroutine above has read the
|
||||
// current version of the resource.
|
||||
<-backend.readCh
|
||||
<-backend.readCompletedCh
|
||||
|
||||
// Update the resource.
|
||||
res = modifyArtist(t, rsp1.Resource)
|
||||
@ -896,27 +894,6 @@ func TestWrite_Owner_Uid(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
// blockOnceBackend wraps a storage.Backend for tests. Its Read method blocks
// exactly one call so a test can run other operations while that call is held.
type blockOnceBackend struct {
|
||||
storage.Backend
|
||||
|
||||
// done is swapped from 0 to 1 atomically by the first Read call.
done uint32
|
||||
// readCh is closed by the blocked Read after the wrapped read returns.
readCh chan struct{}
|
||||
// blockCh holds the blocked Read until the test closes it.
blockCh chan struct{}
|
||||
}
|
||||
|
||||
func (b *blockOnceBackend) Read(ctx context.Context, consistency storage.ReadConsistency, id *pbresource.ID) (*pbresource.Resource, error) {
|
||||
res, err := b.Backend.Read(ctx, consistency, id)
|
||||
|
||||
// Block for exactly one call to Read. All subsequent calls (including those
|
||||
// concurrent to the blocked call) will return immediately.
|
||||
if atomic.CompareAndSwapUint32(&b.done, 0, 1) {
|
||||
close(b.readCh)
|
||||
<-b.blockCh
|
||||
}
|
||||
|
||||
return res, err
|
||||
}
|
||||
|
||||
func TestEnsureFinalizerRemoved(t *testing.T) {
|
||||
type testCase struct {
|
||||
mod func(input, existing *pbresource.Resource)
|
||||
|
Loading…
x
Reference in New Issue
Block a user