mirror of https://github.com/status-im/consul.git
149 lines
4.1 KiB
Go
149 lines
4.1 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package reaper
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/hashicorp/consul/internal/controller"
|
|
"github.com/hashicorp/consul/internal/resource"
|
|
"github.com/hashicorp/consul/proto-public/pbresource"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
const (
	// statusKeyReaperController is the status key under which this controller
	// records its progress on a tombstone resource.
	statusKeyReaperController = "consul.io/reaper-controller"

	// secondPassDelay is how long to wait after the first pass has completed
	// before the second pass is run.
	secondPassDelay = 30 * time.Second

	// conditionTypeFirstPassCompleted is the type of the condition written to
	// the tombstone's status once the first pass has finished.
	conditionTypeFirstPassCompleted = "FirstPassCompleted"
)
|
|
|
|
// RegisterControllers registers controllers for the tombstone type.
|
|
func RegisterControllers(mgr *controller.Manager) {
|
|
mgr.Register(reaperController())
|
|
}
|
|
|
|
func reaperController() controller.Controller {
|
|
return controller.ForType(resource.TypeV1Tombstone).
|
|
WithReconciler(newReconciler())
|
|
}
|
|
|
|
func newReconciler() *tombstoneReconciler {
|
|
return &tombstoneReconciler{
|
|
timeNow: time.Now,
|
|
}
|
|
}
|
|
|
|
// tombstoneReconciler reconciles tombstone resources by deleting the
// resources owned by the tombstone's owner.
type tombstoneReconciler struct {
	// timeNow supplies the current time. It is a shim so tests can
	// substitute their own clock.
	timeNow func() time.Time
}
|
|
|
|
// Deletes all owned (child) resources of an owner (parent) resource.
|
|
//
|
|
// The reconciliation for tombstones is split into two passes.
|
|
// The first pass attempts to delete child resources created before the owner resource was deleted.
|
|
// The second pass is run after a reasonable delay to delete child resources that may have been
|
|
// created during or after the completion of the first pass.
|
|
func (r *tombstoneReconciler) Reconcile(ctx context.Context, rt controller.Runtime, req controller.Request) error {
|
|
rsp, err := rt.Client.Read(ctx, &pbresource.ReadRequest{Id: req.ID})
|
|
switch {
|
|
case status.Code(err) == codes.NotFound:
|
|
// tombstone not found. nothing to do
|
|
return nil
|
|
case err != nil:
|
|
// retry later
|
|
return err
|
|
}
|
|
res := rsp.Resource
|
|
|
|
var tombstone pbresource.Tombstone
|
|
if err := res.Data.UnmarshalTo(&tombstone); err != nil {
|
|
return err
|
|
}
|
|
|
|
firstPassCompletedOnEntry := isFirstPassCompleted(res)
|
|
|
|
// Corner case:
|
|
// Check secondPassDelay has elasped since first pass in cases where queued
|
|
// reconciliation requests are lost between the first and second pass
|
|
// (e.g. controller relocated to a different server due to raft leadership
|
|
// change).
|
|
if firstPassCompletedOnEntry && !r.secondPassDelayElapsed(res.Status[statusKeyReaperController]) {
|
|
return controller.RequeueAfter(secondPassDelay)
|
|
}
|
|
|
|
// Retrieve owner's children
|
|
listRsp, err := rt.Client.ListByOwner(ctx, &pbresource.ListByOwnerRequest{Owner: tombstone.Owner})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Attempt to delete each child
|
|
for _, child := range listRsp.Resources {
|
|
_, err := rt.Client.Delete(ctx, &pbresource.DeleteRequest{Id: child.Id})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if firstPassCompletedOnEntry {
|
|
// we just did the second pass -> delete tombstone
|
|
_, err := rt.Client.Delete(ctx, &pbresource.DeleteRequest{Id: res.Id})
|
|
if err != nil {
|
|
// tombstone deletion failed, just retry
|
|
return err
|
|
}
|
|
// tombstone delete succeeded and reconciliation complete
|
|
return nil
|
|
} else {
|
|
// we just did the first pass -> queue up the second pass
|
|
_, err = rt.Client.WriteStatus(ctx, &pbresource.WriteStatusRequest{
|
|
Id: res.Id,
|
|
Key: statusKeyReaperController,
|
|
Status: &pbresource.Status{
|
|
ObservedGeneration: res.Generation,
|
|
Conditions: []*pbresource.Condition{
|
|
{
|
|
Type: conditionTypeFirstPassCompleted,
|
|
State: pbresource.Condition_STATE_TRUE,
|
|
Reason: "Success",
|
|
Message: "First pass of child resource deletion completed",
|
|
},
|
|
},
|
|
},
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return controller.RequeueAfter(secondPassDelay)
|
|
}
|
|
}
|
|
|
|
func (r *tombstoneReconciler) secondPassDelayElapsed(status *pbresource.Status) bool {
|
|
firstPassTime := status.UpdatedAt.AsTime()
|
|
return firstPassTime.Add(secondPassDelay).Before(r.timeNow())
|
|
}
|
|
|
|
func isFirstPassCompleted(res *pbresource.Resource) bool {
|
|
if res.Status == nil {
|
|
return false
|
|
}
|
|
|
|
status, ok := res.Status[statusKeyReaperController]
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
// First time through, first and second pass ahead of us
|
|
if len(status.Conditions) == 0 {
|
|
return false
|
|
}
|
|
|
|
// Single condition "FirstPassCompleted"
|
|
condition := status.Conditions[0]
|
|
return condition.State == pbresource.Condition_STATE_TRUE
|
|
}
|