consul/internal/controller/controller.go

203 lines
4.8 KiB
Go
Raw Normal View History

2023-05-09 14:25:55 +00:00
// Copyright (c) HashiCorp, Inc.
[COMPLIANCE] License changes (#18443) * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
2023-08-11 13:12:13 +00:00
// SPDX-License-Identifier: BUSL-1.1
2023-05-09 14:25:55 +00:00
2023-04-25 11:52:35 +00:00
package controller
import (
"context"
2023-05-09 14:25:55 +00:00
"errors"
"fmt"
"time"
2023-04-25 11:52:35 +00:00
"github.com/hashicorp/go-hclog"
2023-05-09 14:25:55 +00:00
"golang.org/x/sync/errgroup"
"github.com/hashicorp/consul/agent/consul/controller/queue"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/storage"
"github.com/hashicorp/consul/proto-public/pbresource"
2023-04-25 11:52:35 +00:00
)
// controllerRunner contains the actual implementation of running a controller
// including creating watches, calling the reconciler, handling retries, etc.
type controllerRunner struct {
ctrl Controller
2023-05-09 14:25:55 +00:00
client pbresource.ResourceServiceClient
2023-04-25 11:52:35 +00:00
logger hclog.Logger
}
func (c *controllerRunner) run(ctx context.Context) error {
c.logger.Debug("controller running")
defer c.logger.Debug("controller stopping")
2023-05-09 14:25:55 +00:00
group, groupCtx := errgroup.WithContext(ctx)
recQueue := runQueue[Request](groupCtx, c.ctrl)
// Managed Type Events → Reconciliation Queue
group.Go(func() error {
return c.watch(groupCtx, c.ctrl.managedType, func(res *pbresource.Resource) {
recQueue.Add(Request{ID: res.Id})
})
})
for _, watch := range c.ctrl.watches {
watch := watch
mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl)
2023-05-09 14:25:55 +00:00
// Watched Type Events → Mapper Queue
group.Go(func() error {
return c.watch(groupCtx, watch.watchedType, func(res *pbresource.Resource) {
mapQueue.Add(mapperRequest{res: res})
})
2023-05-09 14:25:55 +00:00
})
// Mapper Queue → Mapper → Reconciliation Queue
group.Go(func() error {
return c.runMapper(groupCtx, watch, mapQueue, recQueue)
})
}
// Reconciliation Queue → Reconciler
group.Go(func() error {
return c.runReconciler(groupCtx, recQueue)
})
return group.Wait()
}
// runQueue starts a work queue for items of type T by calling
// queue.RunWorkQueue with ctx and the pair of values returned by
// ctrl.backoff(), and returns that queue.
func runQueue[T queue.ItemType](ctx context.Context, ctrl Controller) queue.WorkQueue[T] {
	baseBackoff, maxBackoff := ctrl.backoff()
	return queue.RunWorkQueue[T](ctx, baseBackoff, maxBackoff)
}
// watch opens a WatchList stream for resources of the given type, using the
// wildcard value for partition, peer name and namespace, and calls add with
// the resource carried by every event it receives.
//
// It only returns on failure: either the stream could not be created (logged
// at error level) or receiving from it failed (logged at warn level). In both
// cases the underlying error is returned unchanged.
func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add func(*pbresource.Resource)) error {
	req := &pbresource.WatchListRequest{
		Type: typ,
		Tenancy: &pbresource.Tenancy{
			Partition: storage.Wildcard,
			PeerName:  storage.Wildcard,
			Namespace: storage.Wildcard,
		},
	}

	stream, err := c.client.WatchList(ctx, req)
	if err != nil {
		c.logger.Error("failed to create watch", "error", err)
		return err
	}

	for {
		event, recvErr := stream.Recv()
		if recvErr != nil {
			c.logger.Warn("error received from watch", "error", recvErr)
			return recvErr
		}
		add(event.Resource)
	}
}
func (c *controllerRunner) runMapper(
ctx context.Context,
w watch,
from queue.WorkQueue[mapperRequest],
2023-05-09 14:25:55 +00:00
to queue.WorkQueue[Request],
) error {
logger := c.logger.With("watched_resource_type", resource.ToGVK(w.watchedType))
for {
item, shutdown := from.Get()
2023-05-09 14:25:55 +00:00
if shutdown {
return nil
}
var reqs []Request
err := c.handlePanic(func() error {
var err error
reqs, err = w.mapper(ctx, c.runtime(), item.res)
2023-05-09 14:25:55 +00:00
return err
})
if err != nil {
from.AddRateLimited(item)
from.Done(item)
2023-05-09 14:25:55 +00:00
continue
}
for _, r := range reqs {
if !resource.EqualType(r.ID.Type, c.ctrl.managedType) {
2023-05-09 14:25:55 +00:00
logger.Error("dependency mapper returned request for a resource of the wrong type",
"type_expected", resource.ToGVK(c.ctrl.managedType),
"type_got", resource.ToGVK(r.ID.Type),
)
continue
}
to.Add(r)
}
from.Forget(item)
from.Done(item)
2023-05-09 14:25:55 +00:00
}
}
// runReconciler drains the reconciliation queue, calling the controller's
// reconciler for each request. It returns nil once the queue reports shutdown.
//
// After each call the request is handled according to the outcome:
//
//   - success: the request is forgotten by the queue;
//   - RequeueAfterError: the request is forgotten and re-added after the
//     duration carried by the error;
//   - any other error (including a recovered panic): the request is re-added
//     with rate limiting.
func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error {
	for {
		req, shutdown := queue.Get()
		if shutdown {
			return nil
		}

		c.logger.Trace("handling request", "request", req)

		// The reconciler is user-supplied code, so run it under handlePanic.
		err := c.handlePanic(func() error {
			return c.ctrl.reconciler.Reconcile(ctx, c.runtime(), req)
		})

		var requeueAfter RequeueAfterError
		switch {
		case err == nil:
			queue.Forget(req)
		case errors.As(err, &requeueAfter):
			queue.Forget(req)
			queue.AddAfter(req, time.Duration(requeueAfter))
		default:
			queue.AddRateLimited(req)
		}

		queue.Done(req)
	}
}
// handlePanic calls fn and returns its error. If fn panics, the panic is
// recovered, logged at error level together with a stack trace, and reported
// to the caller as an error instead of propagating.
func (c *controllerRunner) handlePanic(fn func() error) (err error) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}

		stack := hclog.Stacktrace()
		c.logger.Error("controller panic",
			"panic", r,
			"stack", stack,
		)

		// Overwrite the named result so the caller sees the panic as an error.
		err = fmt.Errorf("panic [recovered]: %v", r)
	}()

	return fn()
}
func (c *controllerRunner) runtime() Runtime {
return Runtime{
Client: c.client,
Logger: c.logger,
}
2023-04-25 11:52:35 +00:00
}
// mapperRequest is the item type stored in the mapper queues. It wraps the
// watched resource that is handed to a dependency mapper.
type mapperRequest struct {
	res *pbresource.Resource
}
// Key implements queue.ItemType. The returned string is built from the
// resource ID's type, tenancy (partition, peer name, namespace), name and uid,
// and is what the queue uses to de-duplicate requests.
func (i mapperRequest) Key() string {
	id := i.res.Id
	tenancy := id.Tenancy

	return fmt.Sprintf(
		"type=%q,part=%q,peer=%q,ns=%q,name=%q,uid=%q",
		resource.ToGVK(id.Type),
		tenancy.Partition,
		tenancy.PeerName,
		tenancy.Namespace,
		id.Name,
		id.Uid,
	)
}