// Mirror of https://github.com/status-im/consul.git
// (synced 2025-01-10 13:55:55 +00:00, commit d37572bd44).
// Commit: "This will aggregate all HealthStatus objects owned by the Node and
// update the status of the Node with an overall health."
// File: 245 lines, 6.8 KiB, Go.
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package controller

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/internal/resource"
	"github.com/hashicorp/consul/proto-public/pbresource"
)

// ForType begins building a Controller for the given resource type.
|
|
func ForType(managedType *pbresource.Type) Controller {
|
|
return Controller{managedType: managedType}
|
|
}
|
|
|
|
// WithReconciler changes the controller's reconciler.
|
|
func (c Controller) WithReconciler(reconciler Reconciler) Controller {
|
|
if reconciler == nil {
|
|
panic("reconciler must not be nil")
|
|
}
|
|
|
|
c.reconciler = reconciler
|
|
return c
|
|
}
|
|
|
|
// WithWatch adds a watch on the given type/dependency to the controller. mapper
|
|
// will be called to determine which resources must be reconciled as a result of
|
|
// a watched resource changing.
|
|
func (c Controller) WithWatch(watchedType *pbresource.Type, mapper DependencyMapper) Controller {
|
|
if watchedType == nil {
|
|
panic("watchedType must not be nil")
|
|
}
|
|
|
|
if mapper == nil {
|
|
panic("mapper must not be nil")
|
|
}
|
|
|
|
c.watches = append(c.watches, watch{watchedType, mapper})
|
|
return c
|
|
}
|
|
|
|
// WithLogger changes the controller's logger.
|
|
func (c Controller) WithLogger(logger hclog.Logger) Controller {
|
|
if logger == nil {
|
|
panic("logger must not be nil")
|
|
}
|
|
|
|
c.logger = logger
|
|
return c
|
|
}
|
|
|
|
// WithBackoff changes the base and maximum backoff values for the controller's
|
|
// retry rate limiter.
|
|
func (c Controller) WithBackoff(base, max time.Duration) Controller {
|
|
c.baseBackoff = base
|
|
c.maxBackoff = max
|
|
return c
|
|
}
|
|
|
|
// WithPlacement changes where and how many replicas of the controller will run.
|
|
// In the majority of cases, the default placement (one leader elected instance
|
|
// per cluster) is the most appropriate and you shouldn't need to override it.
|
|
func (c Controller) WithPlacement(placement Placement) Controller {
|
|
c.placement = placement
|
|
return c
|
|
}
|
|
|
|
// String returns a textual description of the controller, useful for debugging.
|
|
func (c Controller) String() string {
|
|
watchedTypes := make([]string, len(c.watches))
|
|
for idx, w := range c.watches {
|
|
watchedTypes[idx] = fmt.Sprintf("%q", resource.ToGVK(w.watchedType))
|
|
}
|
|
base, max := c.backoff()
|
|
return fmt.Sprintf(
|
|
"<Controller managed_type=%q, watched_types=[%s], backoff=<base=%q, max=%q>, placement=%q>",
|
|
resource.ToGVK(c.managedType),
|
|
strings.Join(watchedTypes, ", "),
|
|
base, max,
|
|
c.placement,
|
|
)
|
|
}
|
|
|
|
func (c Controller) backoff() (time.Duration, time.Duration) {
|
|
base := c.baseBackoff
|
|
if base == 0 {
|
|
base = 5 * time.Millisecond
|
|
}
|
|
max := c.maxBackoff
|
|
if max == 0 {
|
|
max = 1000 * time.Second
|
|
}
|
|
return base, max
|
|
}
|
|
|
|
// Controller runs a reconciliation loop to respond to changes in resources and
// their dependencies. It is heavily inspired by Kubernetes' controller pattern:
// https://kubernetes.io/docs/concepts/architecture/controller/
//
// Use the builder methods in this package (starting with ForType) to construct
// a controller, and then pass it to a Manager to be executed.
type Controller struct {
	// managedType is the resource type this controller reconciles (set by ForType).
	managedType *pbresource.Type

	// reconciler holds the controller's business logic (set by WithReconciler).
	reconciler Reconciler

	// logger is the controller's logger (set by WithLogger).
	logger hclog.Logger

	// watches lists watched dependency types and their mappers (added by WithWatch).
	watches []watch

	// baseBackoff/maxBackoff bound the retry rate limiter; zero values are
	// replaced with defaults by the backoff method.
	baseBackoff time.Duration
	maxBackoff time.Duration

	// placement determines where and how many replicas run (set by WithPlacement).
	placement Placement
}

// watch pairs a watched resource type with the DependencyMapper used to
// translate changes to resources of that type into reconcile Requests.
type watch struct {
	watchedType *pbresource.Type
	mapper DependencyMapper
}

// Request represents a request to reconcile the resource with the given ID.
// Its Key method is used to de-duplicate requests in the queue.
type Request struct {
	// ID of the resource that needs to be reconciled.
	ID *pbresource.ID
}

// Key satisfies the queue.ItemType interface. It returns a string which will be
|
|
// used to de-duplicate requests in the queue.
|
|
func (r Request) Key() string {
|
|
return fmt.Sprintf(
|
|
"part=%q,peer=%q,ns=%q,name=%q,uid=%q",
|
|
r.ID.Tenancy.Partition,
|
|
r.ID.Tenancy.PeerName,
|
|
r.ID.Tenancy.Namespace,
|
|
r.ID.Name,
|
|
r.ID.Uid,
|
|
)
|
|
}
|
|
|
|
// Runtime contains the dependencies required by reconcilers.
type Runtime struct {
	// Client is the resource service client used to read and write resources.
	Client pbresource.ResourceServiceClient

	// Logger is the logger the reconciler should use.
	Logger hclog.Logger
}

// Reconciler implements the business logic of a controller.
type Reconciler interface {
	// Reconcile the resource identified by req.ID. Returning an error triggers
	// a retry; returning a RequeueAfterError instead reschedules the request at
	// a specific time rather than with exponential backoff (see RequeueAfterError).
	Reconcile(ctx context.Context, rt Runtime, req Request) error
}

// DependencyMapper is called when a dependency watched via WithWatch is changed
// to determine which of the controller's managed resources need to be reconciled.
// MapOwner and MapOwnerFiltered are ready-made implementations for the common
// case of mapping a resource to its owner.
type DependencyMapper func(
	ctx context.Context,
	rt Runtime,
	res *pbresource.Resource,
) ([]Request, error)

// MapOwner implements a DependencyMapper that returns the updated resource's owner.
|
|
func MapOwner(_ context.Context, _ Runtime, res *pbresource.Resource) ([]Request, error) {
|
|
var reqs []Request
|
|
if res.Owner != nil {
|
|
reqs = append(reqs, Request{ID: res.Owner})
|
|
}
|
|
return reqs, nil
|
|
}
|
|
|
|
func MapOwnerFiltered(filter *pbresource.Type) DependencyMapper {
|
|
return func(_ context.Context, _ Runtime, res *pbresource.Resource) ([]Request, error) {
|
|
if res.Owner == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
ownerType := res.Owner.GetType()
|
|
if ownerType.Group != filter.Group {
|
|
return nil, nil
|
|
}
|
|
|
|
if ownerType.GroupVersion != filter.GroupVersion {
|
|
return nil, nil
|
|
}
|
|
|
|
if ownerType.Kind != filter.Kind {
|
|
return nil, nil
|
|
}
|
|
|
|
return []Request{{ID: res.Owner}}, nil
|
|
}
|
|
}
|
|
|
|
// Placement determines where and how many replicas of the controller will run.
type Placement int

const (
	// PlacementSingleton ensures there is a single, leader-elected, instance of
	// the controller running in the cluster at any time. It's the default and is
	// suitable for most use-cases.
	PlacementSingleton Placement = iota

	// PlacementEachServer ensures there is a replica of the controller running on
	// each server in the cluster. It is useful for cases where the controller is
	// responsible for applying some configuration resource to the server whenever
	// it changes (e.g. rate-limit configuration). Generally, controllers in this
	// placement mode should not modify resources.
	PlacementEachServer
)

// String satisfies the fmt.Stringer interface. It panics on an unrecognized
// placement value, which indicates a programmer error.
func (p Placement) String() string {
	switch p {
	case PlacementSingleton:
		return "singleton"
	case PlacementEachServer:
		return "each-server"
	default:
		panic(fmt.Sprintf("unknown placement %d", p))
	}
}

// RequeueAfterError is an error that allows a Reconciler to override the
// exponential backoff behavior of the Controller. Rather than applying the
// backoff algorithm, returning a RequeueAfterError causes the Controller to
// reschedule the Request at the given time in the future.
type RequeueAfterError time.Duration

// Error implements the error interface.
func (r RequeueAfterError) Error() string {
	return fmt.Sprintf("requeue at %s", time.Duration(r))
}

// RequeueAfter constructs a RequeueAfterError with the given duration setting.
func RequeueAfter(after time.Duration) error {
	return RequeueAfterError(after)
}

// RequeueNow constructs a RequeueAfterError that reschedules the Request
// immediately.
func RequeueNow() error {
	return RequeueAfter(0)
}