mirror of
https://github.com/status-im/consul.git
synced 2025-01-26 13:40:20 +00:00
c3c836edae
* fix: update watch endpoint to default based on scope * test: additional test * refactor: rename list validate function * refactor: rename validate<Op>Request() -> ensure<Op>RequestValid() for consistency
249 lines
6.2 KiB
Go
249 lines
6.2 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package controller
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/hashicorp/go-hclog"
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
"github.com/hashicorp/consul/agent/consul/controller/queue"
|
|
"github.com/hashicorp/consul/internal/resource"
|
|
"github.com/hashicorp/consul/proto-public/pbresource"
|
|
)
|
|
|
|
// controllerRunner contains the actual implementation of running a controller
|
|
// including creating watches, calling the reconciler, handling retries, etc.
|
|
type controllerRunner struct {
|
|
ctrl Controller
|
|
client pbresource.ResourceServiceClient
|
|
logger hclog.Logger
|
|
}
|
|
|
|
func (c *controllerRunner) run(ctx context.Context) error {
|
|
c.logger.Debug("controller running")
|
|
defer c.logger.Debug("controller stopping")
|
|
|
|
group, groupCtx := errgroup.WithContext(ctx)
|
|
recQueue := runQueue[Request](groupCtx, c.ctrl)
|
|
|
|
// Managed Type Events → Reconciliation Queue
|
|
group.Go(func() error {
|
|
return c.watch(groupCtx, c.ctrl.managedType, func(res *pbresource.Resource) {
|
|
recQueue.Add(Request{ID: res.Id})
|
|
})
|
|
})
|
|
|
|
for _, w := range c.ctrl.watches {
|
|
mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl)
|
|
watcher := w
|
|
// Watched Type Events → Mapper Queue
|
|
group.Go(func() error {
|
|
return c.watch(groupCtx, watcher.watchedType, func(res *pbresource.Resource) {
|
|
mapQueue.Add(mapperRequest{res: res})
|
|
})
|
|
})
|
|
|
|
// Mapper Queue → Mapper → Reconciliation Queue
|
|
group.Go(func() error {
|
|
return c.runMapper(groupCtx, watcher, mapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
|
|
return watcher.mapper(ctx, runtime, itemType.(mapperRequest).res)
|
|
})
|
|
})
|
|
}
|
|
|
|
for _, cw := range c.ctrl.customWatches {
|
|
customMapQueue := runQueue[Event](groupCtx, c.ctrl)
|
|
watcher := cw
|
|
// Custom Events → Mapper Queue
|
|
group.Go(func() error {
|
|
return watcher.source.Watch(groupCtx, func(e Event) {
|
|
customMapQueue.Add(e)
|
|
})
|
|
})
|
|
|
|
// Mapper Queue → Mapper → Reconciliation Queue
|
|
group.Go(func() error {
|
|
return c.runCustomMapper(groupCtx, watcher, customMapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
|
|
return watcher.mapper(ctx, runtime, itemType.(Event))
|
|
})
|
|
})
|
|
}
|
|
|
|
// Reconciliation Queue → Reconciler
|
|
group.Go(func() error {
|
|
return c.runReconciler(groupCtx, recQueue)
|
|
})
|
|
|
|
return group.Wait()
|
|
}
|
|
|
|
func runQueue[T queue.ItemType](ctx context.Context, ctrl Controller) queue.WorkQueue[T] {
|
|
base, max := ctrl.backoff()
|
|
return queue.RunWorkQueue[T](ctx, base, max)
|
|
}
|
|
|
|
func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add func(*pbresource.Resource)) error {
|
|
wl, err := c.client.WatchList(ctx, &pbresource.WatchListRequest{
|
|
Type: typ,
|
|
})
|
|
if err != nil {
|
|
c.logger.Error("failed to create watch", "error", err)
|
|
return err
|
|
}
|
|
|
|
for {
|
|
event, err := wl.Recv()
|
|
if err != nil {
|
|
c.logger.Warn("error received from watch", "error", err)
|
|
return err
|
|
}
|
|
add(event.Resource)
|
|
}
|
|
}
|
|
|
|
func (c *controllerRunner) runMapper(
|
|
ctx context.Context,
|
|
w watch,
|
|
from queue.WorkQueue[mapperRequest],
|
|
to queue.WorkQueue[Request],
|
|
mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error),
|
|
) error {
|
|
logger := c.logger.With("watched_resource_type", resource.ToGVK(w.watchedType))
|
|
|
|
for {
|
|
item, shutdown := from.Get()
|
|
if shutdown {
|
|
return nil
|
|
}
|
|
|
|
if err := c.doMap(ctx, mapper, to, item, logger); err != nil {
|
|
from.AddRateLimited(item)
|
|
from.Done(item)
|
|
continue
|
|
}
|
|
|
|
from.Forget(item)
|
|
from.Done(item)
|
|
}
|
|
}
|
|
|
|
func (c *controllerRunner) runCustomMapper(
|
|
ctx context.Context,
|
|
cw customWatch,
|
|
from queue.WorkQueue[Event],
|
|
to queue.WorkQueue[Request],
|
|
mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error),
|
|
) error {
|
|
logger := c.logger.With("watched_event", cw.source)
|
|
|
|
for {
|
|
item, shutdown := from.Get()
|
|
if shutdown {
|
|
return nil
|
|
}
|
|
|
|
if err := c.doMap(ctx, mapper, to, item, logger); err != nil {
|
|
from.AddRateLimited(item)
|
|
from.Done(item)
|
|
continue
|
|
}
|
|
|
|
from.Forget(item)
|
|
from.Done(item)
|
|
}
|
|
}
|
|
|
|
func (c *controllerRunner) doMap(ctx context.Context, mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), to queue.WorkQueue[Request], item queue.ItemType, logger hclog.Logger) error {
|
|
var reqs []Request
|
|
if err := c.handlePanic(func() error {
|
|
var err error
|
|
reqs, err = mapper(ctx, c.runtime(), item)
|
|
return err
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, r := range reqs {
|
|
if !resource.EqualType(r.ID.Type, c.ctrl.managedType) {
|
|
logger.Error("dependency mapper returned request for a resource of the wrong type",
|
|
"type_expected", resource.ToGVK(c.ctrl.managedType),
|
|
"type_got", resource.ToGVK(r.ID.Type),
|
|
)
|
|
continue
|
|
}
|
|
to.Add(r)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error {
|
|
for {
|
|
req, shutdown := queue.Get()
|
|
if shutdown {
|
|
return nil
|
|
}
|
|
|
|
c.logger.Trace("handling request", "request", req)
|
|
err := c.handlePanic(func() error {
|
|
return c.ctrl.reconciler.Reconcile(ctx, c.runtime(), req)
|
|
})
|
|
if err == nil {
|
|
queue.Forget(req)
|
|
} else {
|
|
var requeueAfter RequeueAfterError
|
|
if errors.As(err, &requeueAfter) {
|
|
queue.Forget(req)
|
|
queue.AddAfter(req, time.Duration(requeueAfter))
|
|
} else {
|
|
queue.AddRateLimited(req)
|
|
}
|
|
}
|
|
queue.Done(req)
|
|
}
|
|
}
|
|
|
|
func (c *controllerRunner) handlePanic(fn func() error) (err error) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
stack := hclog.Stacktrace()
|
|
c.logger.Error("controller panic",
|
|
"panic", r,
|
|
"stack", stack,
|
|
)
|
|
err = fmt.Errorf("panic [recovered]: %v", r)
|
|
return
|
|
}
|
|
}()
|
|
|
|
return fn()
|
|
}
|
|
|
|
func (c *controllerRunner) runtime() Runtime {
|
|
return Runtime{
|
|
Client: c.client,
|
|
Logger: c.logger,
|
|
}
|
|
}
|
|
|
|
type mapperRequest struct{ res *pbresource.Resource }
|
|
|
|
// Key satisfies the queue.ItemType interface. It returns a string which will be
|
|
// used to de-duplicate requests in the queue.
|
|
func (i mapperRequest) Key() string {
|
|
return fmt.Sprintf(
|
|
"type=%q,part=%q,peer=%q,ns=%q,name=%q,uid=%q",
|
|
resource.ToGVK(i.res.Id.Type),
|
|
i.res.Id.Tenancy.Partition,
|
|
i.res.Id.Tenancy.PeerName,
|
|
i.res.Id.Tenancy.Namespace,
|
|
i.res.Id.Name,
|
|
i.res.Id.Uid,
|
|
)
|
|
}
|