resource: reconcile managed types every ~8hrs (#20606)

This commit is contained in:
Semir Patel 2024-02-13 10:51:54 -06:00 committed by GitHub
parent 35f1173689
commit b716a9ef6b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 333 additions and 103 deletions

View File

@ -11,9 +11,10 @@ import (
"sync/atomic"
"time"
"github.com/hashicorp/go-hclog"
"golang.org/x/sync/errgroup"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/consul/controller/queue"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
@ -380,7 +381,7 @@ func (c *controller) reconcileHandler(ctx context.Context, req Request) {
var requeueAfter RequeueAfterError
if errors.As(err, &requeueAfter) {
c.work.Forget(req)
c.work.AddAfter(req, time.Duration(requeueAfter))
c.work.AddAfter(req, time.Duration(requeueAfter), false)
return
}

View File

@ -18,8 +18,10 @@ type DeferQueue[T ItemType] interface {
// Defer defers processing a Request until a given time. When
// the timeout is hit, the request will be processed by the
// callback given in the Process loop. If the given context
// is canceled, the item is not deferred.
Defer(ctx context.Context, item T, until time.Time)
// is canceled, the item is not deferred. Override replaces
// any existing item regardless of the enqueue time when true.
Defer(ctx context.Context, item T, until time.Time, override bool)
// Process processes all items in the defer queue with the
// given callback, blocking until the given context is canceled.
// Callers should only ever call Process once, likely in a
@ -32,6 +34,9 @@ type DeferQueue[T ItemType] interface {
type deferredRequest[T ItemType] struct {
enqueueAt time.Time
item T
// override replaces any existing item when true regardless
// of the enqueue time
override bool
// index holds the index for the given heap entry so that if
// the entry is updated the heap can be re-sorted
index int
@ -64,10 +69,11 @@ func NewDeferQueue[T ItemType](tick time.Duration) DeferQueue[T] {
// Defer defers the given Request until the given time in the future. If the
// passed in context is canceled before the Request is deferred, then this
// immediately returns.
func (q *deferQueue[T]) Defer(ctx context.Context, item T, until time.Time) {
func (q *deferQueue[T]) Defer(ctx context.Context, item T, until time.Time, override bool) {
entry := &deferredRequest[T]{
enqueueAt: until,
item: item,
override: override,
}
select {
@ -79,9 +85,9 @@ func (q *deferQueue[T]) Defer(ctx context.Context, item T, until time.Time) {
// deferEntry adds a deferred request to the priority queue
func (q *deferQueue[T]) deferEntry(entry *deferredRequest[T]) {
existing, exists := q.entries[entry.item.Key()]
if exists {
// insert or update the item deferral time
if existing.enqueueAt.After(entry.enqueueAt) {
if exists {
if entry.override || entry.enqueueAt.Before(existing.enqueueAt) {
existing.enqueueAt = entry.enqueueAt
heap.Fix(q.heap, existing.index)
}

View File

@ -27,8 +27,9 @@ type WorkQueue[T ItemType] interface {
Get() (item T, shutdown bool)
// Add immediately adds a Request to the work queue.
Add(item T)
// AddAfter adds a Request to the work queue after a given amount of time.
AddAfter(item T, duration time.Duration)
// AddAfter adds a Request to the work queue after a given amount of time
// with the option to override any existing Request that may be scheduled.
AddAfter(item T, duration time.Duration, override bool)
// AddRateLimited adds a Request to the work queue after the amount of time
// specified by applying the queue's rate limiter.
AddRateLimited(item T)
@ -41,10 +42,10 @@ type WorkQueue[T ItemType] interface {
// queue implements a rate-limited work queue
type queue[T ItemType] struct {
// queue holds an ordered list of Requests needing to be processed
// queue holds an ordered list of non-deferred Requests needing to be processed
queue []T
// dirty holds the working set of all Requests, whether they are being
// dirty holds the working set of all non-deferred Requests, whether they are being
// processed or not
dirty map[string]struct{}
// processing holds the set of current requests being processed
@ -145,8 +146,9 @@ func (q *queue[T]) Add(item T) {
q.cond.Signal()
}
// AddAfter adds a Request to the work queue after a given amount of time.
func (q *queue[T]) AddAfter(item T, duration time.Duration) {
// AddAfter adds a Request to the work queue after a given amount of time with
// the option to override any existing Request that may be scheduled.
func (q *queue[T]) AddAfter(item T, duration time.Duration, override bool) {
// don't add if we're already shutting down
if q.shuttingDown() {
return
@ -158,13 +160,13 @@ func (q *queue[T]) AddAfter(item T, duration time.Duration) {
return
}
q.deferred.Defer(q.ctx, item, time.Now().Add(duration))
q.deferred.Defer(q.ctx, item, time.Now().Add(duration), override)
}
// AddRateLimited adds the given Request to the queue after applying the
// rate limiter to determine when the Request should next be processed.
func (q *queue[T]) AddRateLimited(item T) {
q.AddAfter(item, q.ratelimiter.NextRetry(item))
q.AddAfter(item, q.ratelimiter.NextRetry(item), false)
}
// Forget signals the queue to reset the rate-limiting for the given Request.

View File

@ -59,8 +59,8 @@ func (c *countingWorkQueue[T]) adds() uint64 {
return atomic.LoadUint64(&c.addCounter)
}
func (c *countingWorkQueue[T]) AddAfter(item T, duration time.Duration) {
c.inner.AddAfter(item, duration)
func (c *countingWorkQueue[T]) AddAfter(item T, duration time.Duration, override bool) {
c.inner.AddAfter(item, duration, override)
atomic.AddUint64(&c.addAfterCounter, 1)
}

View File

@ -47,6 +47,7 @@ func (r *testReconciler) setResponse(err error) {
func (r *testReconciler) step() {
r.stepChan <- struct{}{}
}
func (r *testReconciler) stepFor(duration time.Duration) {
select {
case r.stepChan <- struct{}{}:

View File

@ -45,6 +45,12 @@ type Controller struct {
logger hclog.Logger
startCb RuntimeCallback
stopCb RuntimeCallback
// forceReconcileEvery is the time to wait after a successful reconciliation
// before forcing a reconciliation. The net result is a reconciliation of
// the managed type on a regular interval. This ensures that the state of the
// world is continually reconciled, hence correct in the face of missed events
// or other issues.
forceReconcileEvery time.Duration
}
type RuntimeCallback func(context.Context, Runtime)
@ -53,7 +59,7 @@ type RuntimeCallback func(context.Context, Runtime)
// Extra cache indexes may be provided as well and these indexes will be automatically managed.
// Typically, further calls to other builder methods will be needed to fully configure
// the controller such as using WithReconcile to define the the code that will be called
// when the managed resource needs reconcilation.
// when the managed resource needs reconciliation.
func NewController(name string, managedType *pbresource.Type, indexes ...*index.Index) *Controller {
w := &watch{
watchedType: managedType,
@ -69,6 +75,7 @@ func NewController(name string, managedType *pbresource.Type, indexes ...*index.
managedTypeWatch: w,
watches: make(map[string]*watch),
queries: make(map[string]cache.Query),
forceReconcileEvery: 8 * time.Hour,
}
}
@ -170,6 +177,15 @@ func (ctl *Controller) WithPlacement(placement Placement) *Controller {
return ctl
}
// WithForceReconcileEvery controls how often a resource gets periodically reconciled
// to ensure that the state of the world is correct (8 hours is the default).
// This exists for tests only and should not be customized by controller authors!
func (ctl *Controller) WithForceReconcileEvery(duration time.Duration) *Controller {
ctl.logger.Warn("WithForceReconcileEvery is for testing only and should not be set by controllers")
ctl.forceReconcileEvery = duration
return ctl
}
// buildCache will construct a controller Cache given the watches/indexes that have
// been added to the controller. This is mainly to be used by the TestController and
// Manager when setting up how things

View File

@ -266,6 +266,158 @@ func waitForAtomicBoolValue(t testutil.TestingTB, actual *atomic.Bool, expected
})
}
func TestController_WithForceReconcileEvery_UpsertSuccess(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
// Given a controller
// When the controller reconciles a resource due to an upsert and succeeds
// Then the controller manager should scheduled a forced reconcile after forceReconcileEvery
rec := newTestReconciler()
client := svctest.NewResourceServiceBuilder().
WithRegisterFns(demo.RegisterTypes).
Run(t)
// Create sizeable gap between reconcile #1 and forced reconcile #2 to ensure the delay occurs
forceReconcileEvery := 5 * time.Second
ctrl := controller.
NewController("artist", pbdemov2.ArtistType).
WithLogger(testutil.Logger(t)).
WithForceReconcileEvery(forceReconcileEvery).
WithReconciler(rec)
mgr := controller.NewManager(client, testutil.Logger(t))
mgr.Register(ctrl)
mgr.SetRaftLeader(true)
go mgr.Run(testContext(t))
res, err := demo.GenerateV2Artist()
require.NoError(t, err)
rsp, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res})
require.NoError(t, err)
// Verify reconcile #1 happens immediately
_, req := rec.waitFor(t, time.Second)
prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID)
// Verify no reconciles occur between reconcile #1 and forced reconcile #2.
// Remove a second for max jitter (20% of 5s) and one more second to be safe.
rec.expectNoRequest(t, forceReconcileEvery-time.Second-time.Second)
// Verify forced reconcile #2 occurred (forceReconcileEvery - 1s - 1s + 3s > forceReconcileEvery)
_, req = rec.waitFor(t, time.Second*3)
prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID)
}
func TestController_WithForceReconcileEvery_SkipOnReconcileError(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
// Given a controller configured with a forceReconcileEvery duration
// When the controller reconciles a resource due to an upsert and returns an error
// Then the controller manager should not schedule a forced reconcile and allow
// the existing error handling to schedule a rate-limited retry
rec := newTestReconciler()
client := svctest.NewResourceServiceBuilder().
WithRegisterFns(demo.RegisterTypes).
Run(t)
// Large enough gap to test for a period of no-reconciles
forceReconcileEvery := 5 * time.Second
ctrl := controller.
NewController("artist", pbdemov2.ArtistType).
WithLogger(testutil.Logger(t)).
WithForceReconcileEvery(forceReconcileEvery).
WithReconciler(rec)
mgr := controller.NewManager(client, testutil.Logger(t))
mgr.Register(ctrl)
mgr.SetRaftLeader(true)
go mgr.Run(testContext(t))
res, err := demo.GenerateV2Artist()
require.NoError(t, err)
// Setup reconcile #1 to fail
rec.failNext(errors.New("reconcile #1 error"))
rsp, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res})
require.NoError(t, err)
// Observe failed reconcile #1
_, req := rec.wait(t)
prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID)
// Observe successful (rate-limited retry) reconcile #2. By not failNext'ing it,
// we're expecting it now'ish.
_, req = rec.wait(t)
prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID)
// Observe no forced reconcile for gap after last successful reconcile
// -1s for 20% jitter reduction
// -1s for just to be safe
rec.expectNoRequest(t, forceReconcileEvery-time.Second-time.Second)
// Finally observe forced reconcile #3 up to 1 sec past (5-1-1+3) accumulated forceReconcileEvery delay
_, req = rec.waitFor(t, 3*time.Second)
prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID)
}
func TestController_WithForceReconcileEvery_SkipOnDelete(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
// Given a controller configured with a forceReconcileEvery duration
// When the controller reconciles a resource due to a delete and succeeds
// Then the controller manager should not schedule a forced reconcile
rec := newTestReconciler()
client := svctest.NewResourceServiceBuilder().
WithRegisterFns(demo.RegisterTypes).
Run(t)
// Large enough gap to test for a period of no-reconciles
forceReconcileEvery := 5 * time.Second
ctrl := controller.
NewController("artist", pbdemov2.ArtistType).
WithLogger(testutil.Logger(t)).
WithForceReconcileEvery(forceReconcileEvery).
WithReconciler(rec)
mgr := controller.NewManager(client, testutil.Logger(t))
mgr.Register(ctrl)
mgr.SetRaftLeader(true)
go mgr.Run(testContext(t))
res, err := demo.GenerateV2Artist()
require.NoError(t, err)
rsp, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res})
require.NoError(t, err)
// Account for reconcile #1 due to initial write
_, req := rec.wait(t)
prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID)
// Perform a delete
_, err = client.Delete(testContext(t), &pbresource.DeleteRequest{Id: rsp.Resource.Id})
require.NoError(t, err)
// Account for the reconcile #2 due to the delete
_, req = rec.wait(t)
// Account for the deferred forced reconcile #3 from the original write event since deferred
// reconciles don't seem to be de-duped against non-deferred reconciles.
_, req = rec.waitFor(t, forceReconcileEvery)
// Verify no further reconciles occur
rec.expectNoRequest(t, forceReconcileEvery)
}
func TestController_Placement(t *testing.T) {
t.Parallel()
@ -493,8 +645,13 @@ func (r *testReconciler) Reconcile(_ context.Context, rt controller.Runtime, req
}
}
func (r *testReconciler) failNext(err error) { r.errors <- err }
func (r *testReconciler) panicNext(p any) { r.panics <- p }
func (r *testReconciler) failNext(err error) {
r.errors <- err
}
func (r *testReconciler) panicNext(p any) {
r.panics <- p
}
func (r *testReconciler) expectNoRequest(t *testing.T, duration time.Duration) {
t.Helper()
@ -509,12 +666,17 @@ func (r *testReconciler) expectNoRequest(t *testing.T, duration time.Duration) {
func (r *testReconciler) wait(t *testing.T) (controller.Runtime, controller.Request) {
t.Helper()
return r.waitFor(t, 500*time.Millisecond)
}
func (r *testReconciler) waitFor(t *testing.T, duration time.Duration) (controller.Runtime, controller.Request) {
t.Helper()
var args requestArgs
select {
case args = <-r.calls:
case <-time.After(500 * time.Millisecond):
t.Fatal("Reconcile was not called after 500ms")
case <-time.After(duration):
t.Fatalf("Reconcile was not called after %v", duration)
}
return args.rt, args.req
}

View File

@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"time"
"golang.org/x/sync/errgroup"
@ -66,34 +67,34 @@ func newControllerRunner(c *Controller, client pbresource.ResourceServiceClient,
}
}
func (c *controllerRunner) run(ctx context.Context) error {
c.logger.Debug("controller running")
defer c.logger.Debug("controller stopping")
func (cr *controllerRunner) run(ctx context.Context) error {
cr.logger.Debug("controller running")
defer cr.logger.Debug("controller stopping")
// Initialize the controller if required
if c.ctrl.initializer != nil {
c.logger.Debug("controller initializing")
err := c.ctrl.initializer.Initialize(ctx, c.runtime(c.logger))
if cr.ctrl.initializer != nil {
cr.logger.Debug("controller initializing")
err := cr.ctrl.initializer.Initialize(ctx, cr.runtime(cr.logger))
if err != nil {
return err
}
c.logger.Debug("controller initialized")
cr.logger.Debug("controller initialized")
}
c.cache = c.ctrl.buildCache()
cr.cache = cr.ctrl.buildCache()
defer func() {
// once no longer running we should nil out the cache
// so that we don't hold pointers to resources which may
// become out of date in the future.
c.cache = nil
cr.cache = nil
}()
if c.ctrl.startCb != nil {
c.ctrl.startCb(ctx, c.runtime(c.logger))
if cr.ctrl.startCb != nil {
cr.ctrl.startCb(ctx, cr.runtime(cr.logger))
}
if c.ctrl.stopCb != nil {
defer c.ctrl.stopCb(ctx, c.runtime(c.logger))
if cr.ctrl.stopCb != nil {
defer cr.ctrl.stopCb(ctx, cr.runtime(cr.logger))
}
// Before we launch the reconciler or the dependency mappers, ensure the
@ -102,76 +103,76 @@ func (c *controllerRunner) run(ctx context.Context) error {
// Without doing this the cache is unsafe for general use without causing
// reconcile regressions in certain cases.
{
c.logger.Debug("priming caches")
cr.logger.Debug("priming caches")
primeGroup, primeGroupCtx := errgroup.WithContext(ctx)
// Managed Type Events
primeGroup.Go(func() error {
return c.primeCache(primeGroupCtx, c.ctrl.managedTypeWatch.watchedType)
return cr.primeCache(primeGroupCtx, cr.ctrl.managedTypeWatch.watchedType)
})
for _, w := range c.ctrl.watches {
for _, w := range cr.ctrl.watches {
watcher := w
// Watched Type Events
primeGroup.Go(func() error {
return c.primeCache(primeGroupCtx, watcher.watchedType)
return cr.primeCache(primeGroupCtx, watcher.watchedType)
})
}
if err := primeGroup.Wait(); err != nil {
return err
}
c.logger.Debug("priming caches complete")
cr.logger.Debug("priming caches complete")
}
group, groupCtx := errgroup.WithContext(ctx)
recQueue := runQueue[Request](groupCtx, c.ctrl)
recQueue := runQueue[Request](groupCtx, cr.ctrl)
// Managed Type Events → Reconciliation Queue
// Managed Type Events → Managed Type Reconciliation Queue
group.Go(func() error {
return c.watch(groupCtx, c.ctrl.managedTypeWatch.watchedType, func(res *pbresource.Resource) {
return cr.watch(groupCtx, cr.ctrl.managedTypeWatch.watchedType, func(res *pbresource.Resource) {
recQueue.Add(Request{ID: res.Id})
})
})
for _, w := range c.ctrl.watches {
mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl)
for _, w := range cr.ctrl.watches {
mapQueue := runQueue[mapperRequest](groupCtx, cr.ctrl)
watcher := w
// Watched Type Events → Mapper Queue
// Watched Type Events → Watched Type Mapper Queue
group.Go(func() error {
return c.watch(groupCtx, watcher.watchedType, func(res *pbresource.Resource) {
return cr.watch(groupCtx, watcher.watchedType, func(res *pbresource.Resource) {
mapQueue.Add(mapperRequest{res: res})
})
})
// Mapper Queue → Mapper → Reconciliation Queue
// Watched Type Mapper Queue → Watched Type Mapper → Managed Type Reconciliation Queue
group.Go(func() error {
return c.runMapper(groupCtx, watcher, mapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
return cr.runMapper(groupCtx, watcher, mapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
return watcher.mapper(ctx, runtime, itemType.(mapperRequest).res)
})
})
}
for _, cw := range c.ctrl.customWatches {
customMapQueue := runQueue[Event](groupCtx, c.ctrl)
for _, cw := range cr.ctrl.customWatches {
customMapQueue := runQueue[Event](groupCtx, cr.ctrl)
watcher := cw
// Custom Events → Mapper Queue
// Custom Events → Custom Mapper Queue
group.Go(func() error {
return watcher.source.Watch(groupCtx, func(e Event) {
customMapQueue.Add(e)
})
})
// Mapper Queue → Mapper → Reconciliation Queue
// Custom Mapper Queue → Custom Dependency Mapper → Managed Type Reconciliation Queue
group.Go(func() error {
return c.runCustomMapper(groupCtx, watcher, customMapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
return cr.runCustomMapper(groupCtx, watcher, customMapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
return watcher.mapper(ctx, runtime, itemType.(Event))
})
})
}
// Reconciliation Queue → Reconciler
// Managed Type Reconciliation Queue → Reconciler
group.Go(func() error {
return c.runReconciler(groupCtx, recQueue)
return cr.runReconciler(groupCtx, recQueue)
})
return group.Wait()
@ -182,8 +183,8 @@ func runQueue[T queue.ItemType](ctx context.Context, ctrl *Controller) queue.Wor
return queue.RunWorkQueue[T](ctx, base, max)
}
func (c *controllerRunner) primeCache(ctx context.Context, typ *pbresource.Type) error {
wl, err := c.watchClient.WatchList(ctx, &pbresource.WatchListRequest{
func (cr *controllerRunner) primeCache(ctx context.Context, typ *pbresource.Type) error {
wl, err := cr.watchClient.WatchList(ctx, &pbresource.WatchListRequest{
Type: typ,
Tenancy: &pbresource.Tenancy{
Partition: storage.Wildcard,
@ -191,36 +192,36 @@ func (c *controllerRunner) primeCache(ctx context.Context, typ *pbresource.Type)
},
})
if err != nil {
c.handleInvalidControllerWatch(err)
c.logger.Error("failed to create cache priming watch", "error", err)
cr.handleInvalidControllerWatch(err)
cr.logger.Error("failed to create cache priming watch", "error", err)
return err
}
for {
event, err := wl.Recv()
if err != nil {
c.handleInvalidControllerWatch(err)
c.logger.Warn("error received from cache priming watch", "error", err)
cr.handleInvalidControllerWatch(err)
cr.logger.Warn("error received from cache priming watch", "error", err)
return err
}
switch {
case event.GetUpsert() != nil:
c.cache.Insert(event.GetUpsert().Resource)
cr.cache.Insert(event.GetUpsert().Resource)
case event.GetDelete() != nil:
c.cache.Delete(event.GetDelete().Resource)
cr.cache.Delete(event.GetDelete().Resource)
case event.GetEndOfSnapshot() != nil:
// This concludes the initial snapshot. The cache is primed.
return nil
default:
c.logger.Warn("skipping unexpected event type", "type", hclog.Fmt("%T", event.GetEvent()))
cr.logger.Warn("skipping unexpected event type", "type", hclog.Fmt("%T", event.GetEvent()))
continue
}
}
}
func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add func(*pbresource.Resource)) error {
wl, err := c.watchClient.WatchList(ctx, &pbresource.WatchListRequest{
func (cr *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add func(*pbresource.Resource)) error {
wl, err := cr.watchClient.WatchList(ctx, &pbresource.WatchListRequest{
Type: typ,
Tenancy: &pbresource.Tenancy{
Partition: storage.Wildcard,
@ -228,35 +229,35 @@ func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add
},
})
if err != nil {
c.handleInvalidControllerWatch(err)
c.logger.Error("failed to create watch", "error", err)
cr.handleInvalidControllerWatch(err)
cr.logger.Error("failed to create watch", "error", err)
return err
}
for {
event, err := wl.Recv()
if err != nil {
c.handleInvalidControllerWatch(err)
c.logger.Warn("error received from watch", "error", err)
cr.handleInvalidControllerWatch(err)
cr.logger.Warn("error received from watch", "error", err)
return err
}
// Keep the cache up to date. There main reason to do this here is
// to ensure that any mapper/reconciliation queue deduping wont
// to ensure that any mapper/reconciliation queue deduping won't
// hide events from being observed and updating the cache state.
// Therefore we should do this before any queueing.
var resource *pbresource.Resource
switch {
case event.GetUpsert() != nil:
resource = event.GetUpsert().GetResource()
c.cache.Insert(resource)
cr.cache.Insert(resource)
case event.GetDelete() != nil:
resource = event.GetDelete().GetResource()
c.cache.Delete(resource)
cr.cache.Delete(resource)
case event.GetEndOfSnapshot() != nil:
continue // ignore
default:
c.logger.Warn("skipping unexpected event type", "type", hclog.Fmt("%T", event.GetEvent()))
cr.logger.Warn("skipping unexpected event type", "type", hclog.Fmt("%T", event.GetEvent()))
continue
}
@ -272,14 +273,14 @@ func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add
}
}
func (c *controllerRunner) runMapper(
func (cr *controllerRunner) runMapper(
ctx context.Context,
w *watch,
from queue.WorkQueue[mapperRequest],
to queue.WorkQueue[Request],
mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error),
) error {
logger := c.logger.With("watched_resource_type", resource.ToGVK(w.watchedType))
logger := cr.logger.With("watched_resource_type", resource.ToGVK(w.watchedType))
for {
item, shutdown := from.Get()
@ -287,7 +288,7 @@ func (c *controllerRunner) runMapper(
return nil
}
if err := c.doMap(ctx, mapper, to, item, logger); err != nil {
if err := cr.doMap(ctx, mapper, to, item, logger); err != nil {
from.AddRateLimited(item)
from.Done(item)
continue
@ -298,14 +299,14 @@ func (c *controllerRunner) runMapper(
}
}
func (c *controllerRunner) runCustomMapper(
func (cr *controllerRunner) runCustomMapper(
ctx context.Context,
cw customWatch,
from queue.WorkQueue[Event],
to queue.WorkQueue[Request],
mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error),
) error {
logger := c.logger.With("watched_event", cw.source)
logger := cr.logger.With("watched_event", cw.source)
for {
item, shutdown := from.Get()
@ -313,7 +314,7 @@ func (c *controllerRunner) runCustomMapper(
return nil
}
if err := c.doMap(ctx, mapper, to, item, logger); err != nil {
if err := cr.doMap(ctx, mapper, to, item, logger); err != nil {
from.AddRateLimited(item)
from.Done(item)
continue
@ -324,20 +325,20 @@ func (c *controllerRunner) runCustomMapper(
}
}
func (c *controllerRunner) doMap(ctx context.Context, mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), to queue.WorkQueue[Request], item queue.ItemType, logger hclog.Logger) error {
func (cr *controllerRunner) doMap(ctx context.Context, mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), to queue.WorkQueue[Request], item queue.ItemType, logger hclog.Logger) error {
var reqs []Request
if err := c.handlePanic(func() error {
if err := cr.handlePanic(func() error {
var err error
reqs, err = mapper(ctx, c.runtime(logger.With("map-request-key", item.Key())), item)
reqs, err = mapper(ctx, cr.runtime(logger.With("map-request-key", item.Key())), item)
return err
}); err != nil {
return err
}
for _, r := range reqs {
if !resource.EqualType(r.ID.Type, c.ctrl.managedTypeWatch.watchedType) {
if !resource.EqualType(r.ID.Type, cr.ctrl.managedTypeWatch.watchedType) {
logger.Error("dependency mapper returned request for a resource of the wrong type",
"type_expected", resource.ToGVK(c.ctrl.managedTypeWatch.watchedType),
"type_expected", resource.ToGVK(cr.ctrl.managedTypeWatch.watchedType),
"type_got", resource.ToGVK(r.ID.Type),
)
continue
@ -347,24 +348,62 @@ func (c *controllerRunner) doMap(ctx context.Context, mapper func(ctx context.Co
return nil
}
func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error {
// maybeScheduleForcedReconcile makes sure that a "reconcile the world" happens periodically for the
// controller's managed type.
func (cr *controllerRunner) maybeScheduleForcedReconcile(queue queue.WorkQueue[Request], req Request) {
// In order to periodically "reconcile the world", we schedule a deferred reconcile request
// (aka forced reconcile) minus a sizeable random jitter to avoid a thundering herd.
//
// A few notes on how this integrates with existing non-"reconcile the world" requests:
//
// 1. Successful reconciles result in a new deferred "reconcile the world" request being scheduled.
// The net effect is that the managed type will be continually reconciled regardless of any updates.
// 2. Failed reconciles are re-queued with a rate limit and get added to the deferred reconcile queue.
// Any existing deferred "reconcile the world" request will be replaced by the rate-limited deferred
// request.
// 3. An existing deferred "reconcile the world" request can't be removed on the successful reconcile
// of a delete operation. We rely on controller idempotency to eventually process the deferred request
// as a no-op.
_, err := cr.runtimeClient.Read(context.Background(), &pbresource.ReadRequest{Id: req.ID})
switch {
case err != nil && status.Code(err) == codes.NotFound:
// Resource was deleted -> nothing to force reconcile so do nothing
return
default:
// Reconcile of resource upsert was successful or we had an unexpected
// error. In either case, we should schedule a forced reconcile for completeness.
scheduleAt := reduceByRandomJitter(cr.ctrl.forceReconcileEvery)
queue.AddAfter(req, scheduleAt, true)
}
}
// reduceByRandomJitter returns a duration reduced by a random amount up to 20%.
func reduceByRandomJitter(d time.Duration) time.Duration {
percent := rand.Float64() * 0.2
reduction := time.Duration(float64(d) * percent)
return d - reduction
}
func (cr *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error {
for {
req, shutdown := queue.Get()
if shutdown {
return nil
}
c.logger.Trace("handling request", "request", req)
err := c.handlePanic(func() error {
return c.ctrl.reconciler.Reconcile(ctx, c.runtime(c.logger.With("resource-id", req.ID.String())), req)
cr.logger.Trace("handling request", "request", req)
err := cr.handlePanic(func() error {
return cr.ctrl.reconciler.Reconcile(ctx, cr.runtime(cr.logger.With("resource-id", req.ID.String())), req)
})
if err == nil {
queue.Forget(req)
cr.maybeScheduleForcedReconcile(queue, req)
} else {
cr.logger.Trace("post-processing reconcile failure")
var requeueAfter RequeueAfterError
if errors.As(err, &requeueAfter) {
queue.Forget(req)
queue.AddAfter(req, time.Duration(requeueAfter))
queue.AddAfter(req, time.Duration(requeueAfter), false)
} else {
queue.AddRateLimited(req)
}
@ -373,11 +412,11 @@ func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQu
}
}
func (c *controllerRunner) handlePanic(fn func() error) (err error) {
func (cr *controllerRunner) handlePanic(fn func() error) (err error) {
defer func() {
if r := recover(); r != nil {
stack := hclog.Stacktrace()
c.logger.Error("controller panic",
cr.logger.Error("controller panic",
"panic", r,
"stack", stack,
)
@ -389,27 +428,30 @@ func (c *controllerRunner) handlePanic(fn func() error) (err error) {
return fn()
}
func (c *controllerRunner) runtime(logger hclog.Logger) Runtime {
func (cr *controllerRunner) runtime(logger hclog.Logger) Runtime {
return Runtime{
// dependency mappers and controllers are always given the cloning client
// so that they do not have to care about mutating values that they read
// through the client.
Client: c.runtimeClient,
Client: cr.runtimeClient,
Logger: logger,
// ensure that resources queried via the cache get cloned so that the
// dependency mapper or reconciler is free to modify them.
Cache: cache.NewCloningReadOnlyCache(c.cache),
Cache: cache.NewCloningReadOnlyCache(cr.cache),
}
}
func (c *controllerRunner) handleInvalidControllerWatch(err error) {
func (cr *controllerRunner) handleInvalidControllerWatch(err error) {
st, ok := status.FromError(err)
if ok && st.Code() == codes.InvalidArgument {
panic(fmt.Sprintf("controller %s attempted to initiate an invalid watch: %q. This is a bug within the controller.", c.ctrl.name, err.Error()))
panic(fmt.Sprintf("controller %s attempted to initiate an invalid watch: %q. This is a bug within the controller.", cr.ctrl.name, err.Error()))
}
}
type mapperRequest struct{ res *pbresource.Resource }
type mapperRequest struct {
// res is the resource that was watched and is being mapped.
res *pbresource.Resource
}
// Key satisfies the queue.ItemType interface. It returns a string which will be
// used to de-duplicate requests in the queue.