consul/internal/controller/controller.go
Matt Keeler 326c0ecfbe
In-Memory gRPC (#19942)
* Implement In-Process gRPC for use by controller caching/indexing

This replaces the pipe base listener implementation we were previously using. The new style CAN avoid cloning resources which our controller caching/indexing is taking advantage of to not duplicate resource objects in memory.

To maintain safety for controllers and for them to be able to modify data they get back from the cache and the resource service, the client they are presented in their runtime will be wrapped with an autogenerated client which clones request and response messages as they pass through the client.

Another sizable change in this PR is to consolidate how server specific gRPC services get registered and managed. Before this was in a bunch of different methods and it was difficult to track down how gRPC services were registered. Now its all in one place.

* Fix race in tests

* Ensure the resource service is registered to the multiplexed handler for forwarding from client agents

* Expose peer streaming on the internal handler
2024-01-12 11:54:07 -05:00

312 lines
9.4 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package controller
import (
"context"
"fmt"
"strings"
"time"
"github.com/hashicorp/consul/internal/controller/cache"
"github.com/hashicorp/consul/internal/controller/cache/index"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/go-hclog"
)
// DependencyMapper is called when a dependency watched via WithWatch is changed
// to determine which of the controller's managed resources need to be reconciled.
type DependencyMapper func(
ctx context.Context,
rt Runtime,
res *pbresource.Resource,
) ([]Request, error)
// Controller runs a reconciliation loop to respond to changes in resources and
// their dependencies. It is heavily inspired by Kubernetes' controller pattern:
// https://kubernetes.io/docs/concepts/architecture/controller/
//
// Use the builder methods in this package (starting with NewController) to construct
// a controller, and then pass it to a Manager to be executed.
type Controller struct {
name string
reconciler Reconciler
managedTypeWatch *watch
watches map[string]*watch
queries map[string]cache.Query
customWatches []customWatch
placement Placement
baseBackoff time.Duration
maxBackoff time.Duration
logger hclog.Logger
startCb RuntimeCallback
stopCb RuntimeCallback
}
type RuntimeCallback func(context.Context, Runtime)
// NewController creates a controller that is setup to watched the managed type.
// Extra cache indexes may be provided as well and these indexes will be automatically managed.
// Typically, further calls to other builder methods will be needed to fully configure
// the controller such as using WithReconcile to define the the code that will be called
// when the managed resource needs reconcilation.
func NewController(name string, managedType *pbresource.Type, indexes ...*index.Index) *Controller {
w := &watch{
watchedType: managedType,
indexes: make(map[string]*index.Index),
}
for _, idx := range indexes {
w.addIndex(idx)
}
return &Controller{
name: name,
managedTypeWatch: w,
watches: make(map[string]*watch),
queries: make(map[string]cache.Query),
}
}
// WithNotifyStart registers a callback to be run when the controller is being started.
// This happens prior to watches being started and with a fresh cache.
func (ctl *Controller) WithNotifyStart(start RuntimeCallback) *Controller {
ctl.startCb = start
return ctl
}
// WithNotifyStop registers a callback to be run when the controller has been stopped.
// This happens after all the watches and mapper/reconcile queues have been stopped. The
// cache will contain everything that was present when we started stopping watches.
func (ctl *Controller) WithNotifyStop(stop RuntimeCallback) *Controller {
ctl.stopCb = stop
return ctl
}
// WithReconciler changes the controller's reconciler.
func (ctl *Controller) WithReconciler(reconciler Reconciler) *Controller {
if reconciler == nil {
panic("reconciler must not be nil")
}
ctl.reconciler = reconciler
return ctl
}
// WithWatch enables watching of the specified resource type and mapping it to the managed type
// via the provided DependencyMapper. Extra cache indexes to calculate on the watched type
// may also be provided.
func (ctl *Controller) WithWatch(watchedType *pbresource.Type, mapper DependencyMapper, indexes ...*index.Index) *Controller {
key := resource.ToGVK(watchedType)
_, alreadyWatched := ctl.watches[key]
if alreadyWatched {
panic(fmt.Sprintf("resource type %q already has a configured watch", key))
}
w := newWatch(watchedType, mapper)
for _, idx := range indexes {
w.addIndex(idx)
}
ctl.watches[key] = w
return ctl
}
// WithQuery will add a named query to the controllers cache for usage during reconcile or in dependency mappers
func (ctl *Controller) WithQuery(queryName string, fn cache.Query) *Controller {
_, duplicate := ctl.queries[queryName]
if duplicate {
panic(fmt.Sprintf("a predefined cache query with name %q already exists", queryName))
}
ctl.queries[queryName] = fn
return ctl
}
// WithCustomWatch adds a new custom watch. Custom watches do not affect the controller cache.
func (ctl *Controller) WithCustomWatch(source *Source, mapper CustomDependencyMapper) *Controller {
if source == nil {
panic("source must not be nil")
}
if mapper == nil {
panic("mapper must not be nil")
}
ctl.customWatches = append(ctl.customWatches, customWatch{source, mapper})
return ctl
}
// WithLogger changes the controller's logger.
func (ctl *Controller) WithLogger(logger hclog.Logger) *Controller {
if logger == nil {
panic("logger must not be nil")
}
ctl.logger = logger
return ctl
}
// WithBackoff changes the base and maximum backoff values for the controller's
// retry rate limiter.
func (ctl *Controller) WithBackoff(base, max time.Duration) *Controller {
ctl.baseBackoff = base
ctl.maxBackoff = max
return ctl
}
// WithPlacement changes where and how many replicas of the controller will run.
// In the majority of cases, the default placement (one leader elected instance
// per cluster) is the most appropriate and you shouldn't need to override it.
func (ctl *Controller) WithPlacement(placement Placement) *Controller {
ctl.placement = placement
return ctl
}
// buildCache will construct a controller Cache given the watches/indexes that have
// been added to the controller. This is mainly to be used by the TestController and
// Manager when setting up how things
func (ctl *Controller) buildCache() cache.Cache {
c := cache.New()
addWatchToCache(c, ctl.managedTypeWatch)
for _, w := range ctl.watches {
addWatchToCache(c, w)
}
for name, query := range ctl.queries {
if err := c.AddQuery(name, query); err != nil {
panic(err)
}
}
return c
}
// String returns a textual description of the controller, useful for debugging.
func (ctl *Controller) String() string {
watchedTypes := make([]string, 0, len(ctl.watches))
for watchedType := range ctl.watches {
watchedTypes = append(watchedTypes, watchedType)
}
base, max := ctl.backoff()
return fmt.Sprintf(
"<Controller managed_type=%s, watched_types=[%s], backoff=<base=%s, max=%s>, placement=%s>",
resource.ToGVK(ctl.managedTypeWatch.watchedType),
strings.Join(watchedTypes, ", "),
base, max,
ctl.placement,
)
}
func (ctl *Controller) backoff() (time.Duration, time.Duration) {
base := ctl.baseBackoff
if base == 0 {
base = 5 * time.Millisecond
}
max := ctl.maxBackoff
if max == 0 {
max = 1000 * time.Second
}
return base, max
}
func (ctl *Controller) buildLogger(defaultLogger hclog.Logger) hclog.Logger {
logger := defaultLogger
if ctl.logger != nil {
logger = ctl.logger
}
return logger.With("controller", ctl.name, "managed_type", resource.ToGVK(ctl.managedTypeWatch.watchedType))
}
func addWatchToCache(c cache.Cache, w *watch) {
c.AddType(w.watchedType)
for _, index := range w.indexes {
if err := c.AddIndex(w.watchedType, index); err != nil {
panic(err)
}
}
}
// Placement determines where and how many replicas of the controller will run.
type Placement int
const (
// PlacementSingleton ensures there is a single, leader-elected, instance of
// the controller running in the cluster at any time. It's the default and is
// suitable for most use-cases.
PlacementSingleton Placement = iota
// PlacementEachServer ensures there is a replica of the controller running on
// each server in the cluster. It is useful for cases where the controller is
// responsible for applying some configuration resource to the server whenever
// it changes (e.g. rate-limit configuration). Generally, controllers in this
// placement mode should not modify resources.
PlacementEachServer
)
// String satisfies the fmt.Stringer interface.
func (p Placement) String() string {
switch p {
case PlacementSingleton:
return "singleton"
case PlacementEachServer:
return "each-server"
}
panic(fmt.Sprintf("unknown placement %d", p))
}
// Reconciler implements the business logic of a controller.
type Reconciler interface {
// Reconcile the resource identified by req.ID.
Reconcile(ctx context.Context, rt Runtime, req Request) error
}
// RequeueAfterError is an error that allows a Reconciler to override the
// exponential backoff behavior of the Controller, rather than applying
// the backoff algorithm, returning a RequeueAfterError will cause the
// Controller to reschedule the Request at a given time in the future.
type RequeueAfterError time.Duration
// Error implements the error interface.
func (r RequeueAfterError) Error() string {
return fmt.Sprintf("requeue at %s", time.Duration(r))
}
// RequeueAfter constructs a RequeueAfterError with the given duration
// setting.
func RequeueAfter(after time.Duration) error {
return RequeueAfterError(after)
}
// RequeueNow constructs a RequeueAfterError that reschedules the Request
// immediately.
func RequeueNow() error {
return RequeueAfterError(0)
}
// Request represents a request to reconcile the resource with the given ID.
type Request struct {
// ID of the resource that needs to be reconciled.
ID *pbresource.ID
}
// Key satisfies the queue.ItemType interface. It returns a string which will be
// used to de-duplicate requests in the queue.
func (r Request) Key() string {
return fmt.Sprintf(
"part=%q,peer=%q,ns=%q,name=%q,uid=%q",
r.ID.Tenancy.Partition,
r.ID.Tenancy.PeerName,
r.ID.Tenancy.Namespace,
r.ID.Name,
r.ID.Uid,
)
}