From b716a9ef6baff76709bbe64d3d767c3973b72184 Mon Sep 17 00:00:00 2001
From: Semir Patel
Date: Tue, 13 Feb 2024 10:51:54 -0600
Subject: [PATCH] resource: reconcile managed types every ~8hrs (#20606)

---
 agent/consul/controller/controller.go      |   5 +-
 agent/consul/controller/queue/defer.go     |  16 +-
 agent/consul/controller/queue/queue.go     |  18 +-
 agent/consul/controller/queue_test.go      |   4 +-
 agent/consul/controller/reconciler_test.go |   1 +
 internal/controller/controller.go          |  26 ++-
 internal/controller/controller_test.go     | 170 +++++++++++++++++-
 internal/controller/runner.go              | 196 +++++++++++++--------
 8 files changed, 333 insertions(+), 103 deletions(-)

diff --git a/agent/consul/controller/controller.go b/agent/consul/controller/controller.go
index 1eccb7e7ab..29f864a7a6 100644
--- a/agent/consul/controller/controller.go
+++ b/agent/consul/controller/controller.go
@@ -11,9 +11,10 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/hashicorp/go-hclog"
 	"golang.org/x/sync/errgroup"
 
+	"github.com/hashicorp/go-hclog"
+
 	"github.com/hashicorp/consul/agent/consul/controller/queue"
 	"github.com/hashicorp/consul/agent/consul/state"
 	"github.com/hashicorp/consul/agent/consul/stream"
@@ -380,7 +381,7 @@ func (c *controller) reconcileHandler(ctx context.Context, req Request) {
 	var requeueAfter RequeueAfterError
 	if errors.As(err, &requeueAfter) {
 		c.work.Forget(req)
-		c.work.AddAfter(req, time.Duration(requeueAfter))
+		c.work.AddAfter(req, time.Duration(requeueAfter), false)
 		return
 	}
 
diff --git a/agent/consul/controller/queue/defer.go b/agent/consul/controller/queue/defer.go
index e9b8a9c3ad..6ba5d09aa9 100644
--- a/agent/consul/controller/queue/defer.go
+++ b/agent/consul/controller/queue/defer.go
@@ -18,8 +18,10 @@ type DeferQueue[T ItemType] interface {
 	// Defer defers processing a Request until a given time. When
 	// the timeout is hit, the request will be processed by the
 	// callback given in the Process loop. If the given context
-	// is canceled, the item is not deferred.
-	Defer(ctx context.Context, item T, until time.Time)
+	// is canceled, the item is not deferred. Override replaces
+	// any existing item regardless of the enqueue time when true.
+	Defer(ctx context.Context, item T, until time.Time, override bool)
+
 	// Process processes all items in the defer queue with the
 	// given callback, blocking until the given context is canceled.
 	// Callers should only ever call Process once, likely in a
@@ -32,6 +34,9 @@ type DeferQueue[T ItemType] interface {
 type deferredRequest[T ItemType] struct {
 	enqueueAt time.Time
 	item      T
+	// override replaces any existing item when true regardless
+	// of the enqueue time
+	override bool
 	// index holds the index for the given heap entry so that if
 	// the entry is updated the heap can be re-sorted
 	index int
@@ -64,10 +69,11 @@ func NewDeferQueue[T ItemType](tick time.Duration) DeferQueue[T] {
 // Defer defers the given Request until the given time in the future. If the
 // passed in context is canceled before the Request is deferred, then this
 // immediately returns.
-func (q *deferQueue[T]) Defer(ctx context.Context, item T, until time.Time) { +func (q *deferQueue[T]) Defer(ctx context.Context, item T, until time.Time, override bool) { entry := &deferredRequest[T]{ enqueueAt: until, item: item, + override: override, } select { @@ -79,9 +85,9 @@ func (q *deferQueue[T]) Defer(ctx context.Context, item T, until time.Time) { // deferEntry adds a deferred request to the priority queue func (q *deferQueue[T]) deferEntry(entry *deferredRequest[T]) { existing, exists := q.entries[entry.item.Key()] + // insert or update the item deferral time if exists { - // insert or update the item deferral time - if existing.enqueueAt.After(entry.enqueueAt) { + if entry.override || entry.enqueueAt.Before(existing.enqueueAt) { existing.enqueueAt = entry.enqueueAt heap.Fix(q.heap, existing.index) } diff --git a/agent/consul/controller/queue/queue.go b/agent/consul/controller/queue/queue.go index 92c624cc2a..ed26ca6ff5 100644 --- a/agent/consul/controller/queue/queue.go +++ b/agent/consul/controller/queue/queue.go @@ -27,8 +27,9 @@ type WorkQueue[T ItemType] interface { Get() (item T, shutdown bool) // Add immediately adds a Request to the work queue. Add(item T) - // AddAfter adds a Request to the work queue after a given amount of time. - AddAfter(item T, duration time.Duration) + // AddAfter adds a Request to the work queue after a given amount of time + // with the option to override any existing Request that may be scheduled. + AddAfter(item T, duration time.Duration, override bool) // AddRateLimited adds a Request to the work queue after the amount of time // specified by applying the queue's rate limiter. AddRateLimited(item T) @@ -41,10 +42,10 @@ type WorkQueue[T ItemType] interface { // queue implements a rate-limited work queue type queue[T ItemType] struct { - // queue holds an ordered list of Requests needing to be processed + // queue holds an ordered list of non-deferred Requests needing to be processed queue []T - // dirty holds the working set of all Requests, whether they are being + // dirty holds the working set of all non-deferred Requests, whether they are being // processed or not dirty map[string]struct{} // processing holds the set of current requests being processed @@ -145,8 +146,9 @@ func (q *queue[T]) Add(item T) { q.cond.Signal() } -// AddAfter adds a Request to the work queue after a given amount of time. -func (q *queue[T]) AddAfter(item T, duration time.Duration) { +// AddAfter adds a Request to the work queue after a given amount of time with +// the option to override any existing Request that may be scheduled. +func (q *queue[T]) AddAfter(item T, duration time.Duration, override bool) { // don't add if we're already shutting down if q.shuttingDown() { return @@ -158,13 +160,13 @@ func (q *queue[T]) AddAfter(item T, duration time.Duration) { return } - q.deferred.Defer(q.ctx, item, time.Now().Add(duration)) + q.deferred.Defer(q.ctx, item, time.Now().Add(duration), override) } // AddRateLimited adds the given Request to the queue after applying the // rate limiter to determine when the Request should next be processed. func (q *queue[T]) AddRateLimited(item T) { - q.AddAfter(item, q.ratelimiter.NextRetry(item)) + q.AddAfter(item, q.ratelimiter.NextRetry(item), false) } // Forget signals the queue to reset the rate-limiting for the given Request. 
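The queue changes above reduce to one rescheduling rule: when an item is already deferred, a new deferral for the same key replaces it if it carries override=true or targets an earlier time; otherwise the existing schedule wins. Below is a minimal, standalone Go sketch of that rule. It is illustrative only: the reschedule helper is hypothetical, and the real DeferQueue keys entries by Request and keeps them in a heap with context-aware insertion.

// Minimal sketch of the rescheduling rule added to deferEntry above. The
// real DeferQueue stores entries in a heap keyed by item; this only models
// the decision for a single existing entry.
package main

import (
	"fmt"
	"time"
)

// reschedule returns the time an already-deferred item should run at after a
// new deferral request arrives for the same key.
func reschedule(existing, incoming time.Time, override bool) time.Time {
	if override || incoming.Before(existing) {
		return incoming
	}
	return existing
}

func main() {
	now := time.Now()

	// A forced "reconcile the world" request is already deferred ~8h out.
	forced := now.Add(8 * time.Hour)

	// A rate-limited retry (override=false) lands earlier, so it wins on time.
	retry := now.Add(200 * time.Millisecond)
	fmt.Println(reschedule(forced, retry, false).Equal(retry)) // true

	// A new forced reconcile (override=true) replaces whatever is pending,
	// even though it is scheduled later.
	next := now.Add(8 * time.Hour)
	fmt.Println(reschedule(retry, next, true).Equal(next)) // true
}

In other words, rate-limited retries jump ahead of a pending forced reconcile, while a fresh forced reconcile always resets the schedule.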
diff --git a/agent/consul/controller/queue_test.go b/agent/consul/controller/queue_test.go index cb6f609829..b7aa08af65 100644 --- a/agent/consul/controller/queue_test.go +++ b/agent/consul/controller/queue_test.go @@ -59,8 +59,8 @@ func (c *countingWorkQueue[T]) adds() uint64 { return atomic.LoadUint64(&c.addCounter) } -func (c *countingWorkQueue[T]) AddAfter(item T, duration time.Duration) { - c.inner.AddAfter(item, duration) +func (c *countingWorkQueue[T]) AddAfter(item T, duration time.Duration, override bool) { + c.inner.AddAfter(item, duration, override) atomic.AddUint64(&c.addAfterCounter, 1) } diff --git a/agent/consul/controller/reconciler_test.go b/agent/consul/controller/reconciler_test.go index c3b8a450b1..5229e7d1d4 100644 --- a/agent/consul/controller/reconciler_test.go +++ b/agent/consul/controller/reconciler_test.go @@ -47,6 +47,7 @@ func (r *testReconciler) setResponse(err error) { func (r *testReconciler) step() { r.stepChan <- struct{}{} } + func (r *testReconciler) stepFor(duration time.Duration) { select { case r.stepChan <- struct{}{}: diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 8d4297e00c..c418854344 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -45,6 +45,12 @@ type Controller struct { logger hclog.Logger startCb RuntimeCallback stopCb RuntimeCallback + // forceReconcileEvery is the time to wait after a successful reconciliation + // before forcing a reconciliation. The net result is a reconciliation of + // the managed type on a regular interval. This ensures that the state of the + // world is continually reconciled, hence correct in the face of missed events + // or other issues. + forceReconcileEvery time.Duration } type RuntimeCallback func(context.Context, Runtime) @@ -53,7 +59,7 @@ type RuntimeCallback func(context.Context, Runtime) // Extra cache indexes may be provided as well and these indexes will be automatically managed. // Typically, further calls to other builder methods will be needed to fully configure // the controller such as using WithReconcile to define the the code that will be called -// when the managed resource needs reconcilation. +// when the managed resource needs reconciliation. func NewController(name string, managedType *pbresource.Type, indexes ...*index.Index) *Controller { w := &watch{ watchedType: managedType, @@ -65,10 +71,11 @@ func NewController(name string, managedType *pbresource.Type, indexes ...*index. } return &Controller{ - name: name, - managedTypeWatch: w, - watches: make(map[string]*watch), - queries: make(map[string]cache.Query), + name: name, + managedTypeWatch: w, + watches: make(map[string]*watch), + queries: make(map[string]cache.Query), + forceReconcileEvery: 8 * time.Hour, } } @@ -170,6 +177,15 @@ func (ctl *Controller) WithPlacement(placement Placement) *Controller { return ctl } +// WithForceReconcileEvery controls how often a resource gets periodically reconciled +// to ensure that the state of the world is correct (8 hours is the default). +// This exists for tests only and should not be customized by controller authors! +func (ctl *Controller) WithForceReconcileEvery(duration time.Duration) *Controller { + ctl.logger.Warn("WithForceReconcileEvery is for testing only and should not be set by controllers") + ctl.forceReconcileEvery = duration + return ctl +} + // buildCache will construct a controller Cache given the watches/indexes that have // been added to the controller. 
This is mainly to be used by the TestController and // Manager when setting up how things diff --git a/internal/controller/controller_test.go b/internal/controller/controller_test.go index edcb57d1c0..db1c49b505 100644 --- a/internal/controller/controller_test.go +++ b/internal/controller/controller_test.go @@ -266,6 +266,158 @@ func waitForAtomicBoolValue(t testutil.TestingTB, actual *atomic.Bool, expected }) } +func TestController_WithForceReconcileEvery_UpsertSuccess(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + t.Parallel() + + // Given a controller + // When the controller reconciles a resource due to an upsert and succeeds + // Then the controller manager should scheduled a forced reconcile after forceReconcileEvery + rec := newTestReconciler() + client := svctest.NewResourceServiceBuilder(). + WithRegisterFns(demo.RegisterTypes). + Run(t) + + // Create sizeable gap between reconcile #1 and forced reconcile #2 to ensure the delay occurs + forceReconcileEvery := 5 * time.Second + ctrl := controller. + NewController("artist", pbdemov2.ArtistType). + WithLogger(testutil.Logger(t)). + WithForceReconcileEvery(forceReconcileEvery). + WithReconciler(rec) + + mgr := controller.NewManager(client, testutil.Logger(t)) + mgr.Register(ctrl) + mgr.SetRaftLeader(true) + go mgr.Run(testContext(t)) + + res, err := demo.GenerateV2Artist() + require.NoError(t, err) + + rsp, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res}) + require.NoError(t, err) + + // Verify reconcile #1 happens immediately + _, req := rec.waitFor(t, time.Second) + prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID) + + // Verify no reconciles occur between reconcile #1 and forced reconcile #2. + // Remove a second for max jitter (20% of 5s) and one more second to be safe. + rec.expectNoRequest(t, forceReconcileEvery-time.Second-time.Second) + + // Verify forced reconcile #2 occurred (forceReconcileEvery - 1s - 1s + 3s > forceReconcileEvery) + _, req = rec.waitFor(t, time.Second*3) + prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID) +} + +func TestController_WithForceReconcileEvery_SkipOnReconcileError(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + t.Parallel() + + // Given a controller configured with a forceReconcileEvery duration + // When the controller reconciles a resource due to an upsert and returns an error + // Then the controller manager should not schedule a forced reconcile and allow + // the existing error handling to schedule a rate-limited retry + rec := newTestReconciler() + client := svctest.NewResourceServiceBuilder(). + WithRegisterFns(demo.RegisterTypes). + Run(t) + + // Large enough gap to test for a period of no-reconciles + forceReconcileEvery := 5 * time.Second + ctrl := controller. + NewController("artist", pbdemov2.ArtistType). + WithLogger(testutil.Logger(t)). + WithForceReconcileEvery(forceReconcileEvery). + WithReconciler(rec) + + mgr := controller.NewManager(client, testutil.Logger(t)) + mgr.Register(ctrl) + mgr.SetRaftLeader(true) + go mgr.Run(testContext(t)) + + res, err := demo.GenerateV2Artist() + require.NoError(t, err) + + // Setup reconcile #1 to fail + rec.failNext(errors.New("reconcile #1 error")) + rsp, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res}) + require.NoError(t, err) + + // Observe failed reconcile #1 + _, req := rec.wait(t) + prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID) + + // Observe successful (rate-limited retry) reconcile #2. 
By not failNext'ing it, + // we're expecting it now'ish. + _, req = rec.wait(t) + prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID) + + // Observe no forced reconcile for gap after last successful reconcile + // -1s for 20% jitter reduction + // -1s for just to be safe + rec.expectNoRequest(t, forceReconcileEvery-time.Second-time.Second) + + // Finally observe forced reconcile #3 up to 1 sec past (5-1-1+3) accumulated forceReconcileEvery delay + _, req = rec.waitFor(t, 3*time.Second) + prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID) +} + +func TestController_WithForceReconcileEvery_SkipOnDelete(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + t.Parallel() + // Given a controller configured with a forceReconcileEvery duration + // When the controller reconciles a resource due to a delete and succeeds + // Then the controller manager should not schedule a forced reconcile + rec := newTestReconciler() + client := svctest.NewResourceServiceBuilder(). + WithRegisterFns(demo.RegisterTypes). + Run(t) + + // Large enough gap to test for a period of no-reconciles + forceReconcileEvery := 5 * time.Second + ctrl := controller. + NewController("artist", pbdemov2.ArtistType). + WithLogger(testutil.Logger(t)). + WithForceReconcileEvery(forceReconcileEvery). + WithReconciler(rec) + + mgr := controller.NewManager(client, testutil.Logger(t)) + mgr.Register(ctrl) + mgr.SetRaftLeader(true) + go mgr.Run(testContext(t)) + + res, err := demo.GenerateV2Artist() + require.NoError(t, err) + + rsp, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res}) + require.NoError(t, err) + + // Account for reconcile #1 due to initial write + _, req := rec.wait(t) + prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID) + + // Perform a delete + _, err = client.Delete(testContext(t), &pbresource.DeleteRequest{Id: rsp.Resource.Id}) + require.NoError(t, err) + + // Account for the reconcile #2 due to the delete + _, req = rec.wait(t) + + // Account for the deferred forced reconcile #3 from the original write event since deferred + // reconciles don't seem to be de-duped against non-deferred reconciles. 
+ _, req = rec.waitFor(t, forceReconcileEvery) + + // Verify no further reconciles occur + rec.expectNoRequest(t, forceReconcileEvery) +} + func TestController_Placement(t *testing.T) { t.Parallel() @@ -493,8 +645,13 @@ func (r *testReconciler) Reconcile(_ context.Context, rt controller.Runtime, req } } -func (r *testReconciler) failNext(err error) { r.errors <- err } -func (r *testReconciler) panicNext(p any) { r.panics <- p } +func (r *testReconciler) failNext(err error) { + r.errors <- err +} + +func (r *testReconciler) panicNext(p any) { + r.panics <- p +} func (r *testReconciler) expectNoRequest(t *testing.T, duration time.Duration) { t.Helper() @@ -509,12 +666,17 @@ func (r *testReconciler) expectNoRequest(t *testing.T, duration time.Duration) { func (r *testReconciler) wait(t *testing.T) (controller.Runtime, controller.Request) { t.Helper() + return r.waitFor(t, 500*time.Millisecond) +} + +func (r *testReconciler) waitFor(t *testing.T, duration time.Duration) (controller.Runtime, controller.Request) { + t.Helper() var args requestArgs select { case args = <-r.calls: - case <-time.After(500 * time.Millisecond): - t.Fatal("Reconcile was not called after 500ms") + case <-time.After(duration): + t.Fatalf("Reconcile was not called after %v", duration) } return args.rt, args.req } diff --git a/internal/controller/runner.go b/internal/controller/runner.go index 7092654b71..6ec2d3568f 100644 --- a/internal/controller/runner.go +++ b/internal/controller/runner.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "math/rand" "time" "golang.org/x/sync/errgroup" @@ -66,34 +67,34 @@ func newControllerRunner(c *Controller, client pbresource.ResourceServiceClient, } } -func (c *controllerRunner) run(ctx context.Context) error { - c.logger.Debug("controller running") - defer c.logger.Debug("controller stopping") +func (cr *controllerRunner) run(ctx context.Context) error { + cr.logger.Debug("controller running") + defer cr.logger.Debug("controller stopping") // Initialize the controller if required - if c.ctrl.initializer != nil { - c.logger.Debug("controller initializing") - err := c.ctrl.initializer.Initialize(ctx, c.runtime(c.logger)) + if cr.ctrl.initializer != nil { + cr.logger.Debug("controller initializing") + err := cr.ctrl.initializer.Initialize(ctx, cr.runtime(cr.logger)) if err != nil { return err } - c.logger.Debug("controller initialized") + cr.logger.Debug("controller initialized") } - c.cache = c.ctrl.buildCache() + cr.cache = cr.ctrl.buildCache() defer func() { // once no longer running we should nil out the cache // so that we don't hold pointers to resources which may // become out of date in the future. - c.cache = nil + cr.cache = nil }() - if c.ctrl.startCb != nil { - c.ctrl.startCb(ctx, c.runtime(c.logger)) + if cr.ctrl.startCb != nil { + cr.ctrl.startCb(ctx, cr.runtime(cr.logger)) } - if c.ctrl.stopCb != nil { - defer c.ctrl.stopCb(ctx, c.runtime(c.logger)) + if cr.ctrl.stopCb != nil { + defer cr.ctrl.stopCb(ctx, cr.runtime(cr.logger)) } // Before we launch the reconciler or the dependency mappers, ensure the @@ -102,76 +103,76 @@ func (c *controllerRunner) run(ctx context.Context) error { // Without doing this the cache is unsafe for general use without causing // reconcile regressions in certain cases. 
{ - c.logger.Debug("priming caches") + cr.logger.Debug("priming caches") primeGroup, primeGroupCtx := errgroup.WithContext(ctx) // Managed Type Events primeGroup.Go(func() error { - return c.primeCache(primeGroupCtx, c.ctrl.managedTypeWatch.watchedType) + return cr.primeCache(primeGroupCtx, cr.ctrl.managedTypeWatch.watchedType) }) - for _, w := range c.ctrl.watches { + for _, w := range cr.ctrl.watches { watcher := w // Watched Type Events primeGroup.Go(func() error { - return c.primeCache(primeGroupCtx, watcher.watchedType) + return cr.primeCache(primeGroupCtx, watcher.watchedType) }) } if err := primeGroup.Wait(); err != nil { return err } - c.logger.Debug("priming caches complete") + cr.logger.Debug("priming caches complete") } group, groupCtx := errgroup.WithContext(ctx) - recQueue := runQueue[Request](groupCtx, c.ctrl) + recQueue := runQueue[Request](groupCtx, cr.ctrl) - // Managed Type Events → Reconciliation Queue + // Managed Type Events → Managed Type Reconciliation Queue group.Go(func() error { - return c.watch(groupCtx, c.ctrl.managedTypeWatch.watchedType, func(res *pbresource.Resource) { + return cr.watch(groupCtx, cr.ctrl.managedTypeWatch.watchedType, func(res *pbresource.Resource) { recQueue.Add(Request{ID: res.Id}) }) }) - for _, w := range c.ctrl.watches { - mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl) + for _, w := range cr.ctrl.watches { + mapQueue := runQueue[mapperRequest](groupCtx, cr.ctrl) watcher := w - // Watched Type Events → Mapper Queue + // Watched Type Events → Watched Type Mapper Queue group.Go(func() error { - return c.watch(groupCtx, watcher.watchedType, func(res *pbresource.Resource) { + return cr.watch(groupCtx, watcher.watchedType, func(res *pbresource.Resource) { mapQueue.Add(mapperRequest{res: res}) }) }) - // Mapper Queue → Mapper → Reconciliation Queue + // Watched Type Mapper Queue → Watched Type Mapper → Managed Type Reconciliation Queue group.Go(func() error { - return c.runMapper(groupCtx, watcher, mapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) { + return cr.runMapper(groupCtx, watcher, mapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) { return watcher.mapper(ctx, runtime, itemType.(mapperRequest).res) }) }) } - for _, cw := range c.ctrl.customWatches { - customMapQueue := runQueue[Event](groupCtx, c.ctrl) + for _, cw := range cr.ctrl.customWatches { + customMapQueue := runQueue[Event](groupCtx, cr.ctrl) watcher := cw - // Custom Events → Mapper Queue + // Custom Events → Custom Mapper Queue group.Go(func() error { return watcher.source.Watch(groupCtx, func(e Event) { customMapQueue.Add(e) }) }) - // Mapper Queue → Mapper → Reconciliation Queue + // Custom Mapper Queue → Custom Dependency Mapper → Managed Type Reconciliation Queue group.Go(func() error { - return c.runCustomMapper(groupCtx, watcher, customMapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) { + return cr.runCustomMapper(groupCtx, watcher, customMapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) { return watcher.mapper(ctx, runtime, itemType.(Event)) }) }) } - // Reconciliation Queue → Reconciler + // Managed Type Reconciliation Queue → Reconciler group.Go(func() error { - return c.runReconciler(groupCtx, recQueue) + return cr.runReconciler(groupCtx, recQueue) }) return group.Wait() @@ -182,8 +183,8 @@ func runQueue[T queue.ItemType](ctx 
context.Context, ctrl *Controller) queue.Wor return queue.RunWorkQueue[T](ctx, base, max) } -func (c *controllerRunner) primeCache(ctx context.Context, typ *pbresource.Type) error { - wl, err := c.watchClient.WatchList(ctx, &pbresource.WatchListRequest{ +func (cr *controllerRunner) primeCache(ctx context.Context, typ *pbresource.Type) error { + wl, err := cr.watchClient.WatchList(ctx, &pbresource.WatchListRequest{ Type: typ, Tenancy: &pbresource.Tenancy{ Partition: storage.Wildcard, @@ -191,36 +192,36 @@ func (c *controllerRunner) primeCache(ctx context.Context, typ *pbresource.Type) }, }) if err != nil { - c.handleInvalidControllerWatch(err) - c.logger.Error("failed to create cache priming watch", "error", err) + cr.handleInvalidControllerWatch(err) + cr.logger.Error("failed to create cache priming watch", "error", err) return err } for { event, err := wl.Recv() if err != nil { - c.handleInvalidControllerWatch(err) - c.logger.Warn("error received from cache priming watch", "error", err) + cr.handleInvalidControllerWatch(err) + cr.logger.Warn("error received from cache priming watch", "error", err) return err } switch { case event.GetUpsert() != nil: - c.cache.Insert(event.GetUpsert().Resource) + cr.cache.Insert(event.GetUpsert().Resource) case event.GetDelete() != nil: - c.cache.Delete(event.GetDelete().Resource) + cr.cache.Delete(event.GetDelete().Resource) case event.GetEndOfSnapshot() != nil: // This concludes the initial snapshot. The cache is primed. return nil default: - c.logger.Warn("skipping unexpected event type", "type", hclog.Fmt("%T", event.GetEvent())) + cr.logger.Warn("skipping unexpected event type", "type", hclog.Fmt("%T", event.GetEvent())) continue } } } -func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add func(*pbresource.Resource)) error { - wl, err := c.watchClient.WatchList(ctx, &pbresource.WatchListRequest{ +func (cr *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add func(*pbresource.Resource)) error { + wl, err := cr.watchClient.WatchList(ctx, &pbresource.WatchListRequest{ Type: typ, Tenancy: &pbresource.Tenancy{ Partition: storage.Wildcard, @@ -228,35 +229,35 @@ func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add }, }) if err != nil { - c.handleInvalidControllerWatch(err) - c.logger.Error("failed to create watch", "error", err) + cr.handleInvalidControllerWatch(err) + cr.logger.Error("failed to create watch", "error", err) return err } for { event, err := wl.Recv() if err != nil { - c.handleInvalidControllerWatch(err) - c.logger.Warn("error received from watch", "error", err) + cr.handleInvalidControllerWatch(err) + cr.logger.Warn("error received from watch", "error", err) return err } // Keep the cache up to date. There main reason to do this here is - // to ensure that any mapper/reconciliation queue deduping wont + // to ensure that any mapper/reconciliation queue deduping won't // hide events from being observed and updating the cache state. // Therefore we should do this before any queueing. 
var resource *pbresource.Resource switch { case event.GetUpsert() != nil: resource = event.GetUpsert().GetResource() - c.cache.Insert(resource) + cr.cache.Insert(resource) case event.GetDelete() != nil: resource = event.GetDelete().GetResource() - c.cache.Delete(resource) + cr.cache.Delete(resource) case event.GetEndOfSnapshot() != nil: continue // ignore default: - c.logger.Warn("skipping unexpected event type", "type", hclog.Fmt("%T", event.GetEvent())) + cr.logger.Warn("skipping unexpected event type", "type", hclog.Fmt("%T", event.GetEvent())) continue } @@ -272,14 +273,14 @@ func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add } } -func (c *controllerRunner) runMapper( +func (cr *controllerRunner) runMapper( ctx context.Context, w *watch, from queue.WorkQueue[mapperRequest], to queue.WorkQueue[Request], mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), ) error { - logger := c.logger.With("watched_resource_type", resource.ToGVK(w.watchedType)) + logger := cr.logger.With("watched_resource_type", resource.ToGVK(w.watchedType)) for { item, shutdown := from.Get() @@ -287,7 +288,7 @@ func (c *controllerRunner) runMapper( return nil } - if err := c.doMap(ctx, mapper, to, item, logger); err != nil { + if err := cr.doMap(ctx, mapper, to, item, logger); err != nil { from.AddRateLimited(item) from.Done(item) continue @@ -298,14 +299,14 @@ func (c *controllerRunner) runMapper( } } -func (c *controllerRunner) runCustomMapper( +func (cr *controllerRunner) runCustomMapper( ctx context.Context, cw customWatch, from queue.WorkQueue[Event], to queue.WorkQueue[Request], mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), ) error { - logger := c.logger.With("watched_event", cw.source) + logger := cr.logger.With("watched_event", cw.source) for { item, shutdown := from.Get() @@ -313,7 +314,7 @@ func (c *controllerRunner) runCustomMapper( return nil } - if err := c.doMap(ctx, mapper, to, item, logger); err != nil { + if err := cr.doMap(ctx, mapper, to, item, logger); err != nil { from.AddRateLimited(item) from.Done(item) continue @@ -324,20 +325,20 @@ func (c *controllerRunner) runCustomMapper( } } -func (c *controllerRunner) doMap(ctx context.Context, mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), to queue.WorkQueue[Request], item queue.ItemType, logger hclog.Logger) error { +func (cr *controllerRunner) doMap(ctx context.Context, mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), to queue.WorkQueue[Request], item queue.ItemType, logger hclog.Logger) error { var reqs []Request - if err := c.handlePanic(func() error { + if err := cr.handlePanic(func() error { var err error - reqs, err = mapper(ctx, c.runtime(logger.With("map-request-key", item.Key())), item) + reqs, err = mapper(ctx, cr.runtime(logger.With("map-request-key", item.Key())), item) return err }); err != nil { return err } for _, r := range reqs { - if !resource.EqualType(r.ID.Type, c.ctrl.managedTypeWatch.watchedType) { + if !resource.EqualType(r.ID.Type, cr.ctrl.managedTypeWatch.watchedType) { logger.Error("dependency mapper returned request for a resource of the wrong type", - "type_expected", resource.ToGVK(c.ctrl.managedTypeWatch.watchedType), + "type_expected", resource.ToGVK(cr.ctrl.managedTypeWatch.watchedType), "type_got", resource.ToGVK(r.ID.Type), ) continue @@ -347,24 +348,62 @@ func (c *controllerRunner) doMap(ctx context.Context, 
mapper func(ctx context.Co return nil } -func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error { +// maybeScheduleForcedReconcile makes sure that a "reconcile the world" happens periodically for the +// controller's managed type. +func (cr *controllerRunner) maybeScheduleForcedReconcile(queue queue.WorkQueue[Request], req Request) { + // In order to periodically "reconcile the world", we schedule a deferred reconcile request + // (aka forced reconcile) minus a sizeable random jitter to avoid a thundering herd. + // + // A few notes on how this integrates with existing non-"reconcile the world" requests: + // + // 1. Successful reconciles result in a new deferred "reconcile the world" request being scheduled. + // The net effect is that the managed type will be continually reconciled regardless of any updates. + // 2. Failed reconciles are re-queued with a rate limit and get added to the deferred reconcile queue. + // Any existing deferred "reconcile the world" request will be replaced by the rate-limited deferred + // request. + // 3. An existing deferred "reconcile the world" request can't be removed on the successful reconcile + // of a delete operation. We rely on controller idempotency to eventually process the deferred request + // as a no-op. + _, err := cr.runtimeClient.Read(context.Background(), &pbresource.ReadRequest{Id: req.ID}) + switch { + case err != nil && status.Code(err) == codes.NotFound: + // Resource was deleted -> nothing to force reconcile so do nothing + return + default: + // Reconcile of resource upsert was successful or we had an unexpected + // error. In either case, we should schedule a forced reconcile for completeness. + scheduleAt := reduceByRandomJitter(cr.ctrl.forceReconcileEvery) + queue.AddAfter(req, scheduleAt, true) + } +} + +// reduceByRandomJitter returns a duration reduced by a random amount up to 20%. 
+func reduceByRandomJitter(d time.Duration) time.Duration { + percent := rand.Float64() * 0.2 + reduction := time.Duration(float64(d) * percent) + return d - reduction +} + +func (cr *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error { for { req, shutdown := queue.Get() if shutdown { return nil } - c.logger.Trace("handling request", "request", req) - err := c.handlePanic(func() error { - return c.ctrl.reconciler.Reconcile(ctx, c.runtime(c.logger.With("resource-id", req.ID.String())), req) + cr.logger.Trace("handling request", "request", req) + err := cr.handlePanic(func() error { + return cr.ctrl.reconciler.Reconcile(ctx, cr.runtime(cr.logger.With("resource-id", req.ID.String())), req) }) if err == nil { queue.Forget(req) + cr.maybeScheduleForcedReconcile(queue, req) } else { + cr.logger.Trace("post-processing reconcile failure") var requeueAfter RequeueAfterError if errors.As(err, &requeueAfter) { queue.Forget(req) - queue.AddAfter(req, time.Duration(requeueAfter)) + queue.AddAfter(req, time.Duration(requeueAfter), false) } else { queue.AddRateLimited(req) } @@ -373,11 +412,11 @@ func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQu } } -func (c *controllerRunner) handlePanic(fn func() error) (err error) { +func (cr *controllerRunner) handlePanic(fn func() error) (err error) { defer func() { if r := recover(); r != nil { stack := hclog.Stacktrace() - c.logger.Error("controller panic", + cr.logger.Error("controller panic", "panic", r, "stack", stack, ) @@ -389,27 +428,30 @@ func (c *controllerRunner) handlePanic(fn func() error) (err error) { return fn() } -func (c *controllerRunner) runtime(logger hclog.Logger) Runtime { +func (cr *controllerRunner) runtime(logger hclog.Logger) Runtime { return Runtime{ // dependency mappers and controllers are always given the cloning client // so that they do not have to care about mutating values that they read // through the client. - Client: c.runtimeClient, + Client: cr.runtimeClient, Logger: logger, // ensure that resources queried via the cache get cloned so that the // dependency mapper or reconciler is free to modify them. - Cache: cache.NewCloningReadOnlyCache(c.cache), + Cache: cache.NewCloningReadOnlyCache(cr.cache), } } -func (c *controllerRunner) handleInvalidControllerWatch(err error) { +func (cr *controllerRunner) handleInvalidControllerWatch(err error) { st, ok := status.FromError(err) if ok && st.Code() == codes.InvalidArgument { - panic(fmt.Sprintf("controller %s attempted to initiate an invalid watch: %q. This is a bug within the controller.", c.ctrl.name, err.Error())) + panic(fmt.Sprintf("controller %s attempted to initiate an invalid watch: %q. This is a bug within the controller.", cr.ctrl.name, err.Error())) } } -type mapperRequest struct{ res *pbresource.Resource } +type mapperRequest struct { + // res is the resource that was watched and is being mapped. + res *pbresource.Resource +} // Key satisfies the queue.ItemType interface. It returns a string which will be // used to de-duplicate requests in the queue.
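Taken together, the runner.go changes work as follows: after every successful reconcile, the runner reads the resource back; a NotFound means the resource was deleted and nothing is re-armed, otherwise a deferred override=true request is scheduled at forceReconcileEvery reduced by up to 20% of random jitter. The standalone sketch below mirrors that flow under simplifying assumptions: reduceByRandomJitter matches the helper added in the patch, while scheduleForced is a hypothetical stand-in for maybeScheduleForcedReconcile that takes a deleted flag instead of issuing a Read RPC.

// Standalone sketch of the forced-reconcile scheduling introduced in
// runner.go. reduceByRandomJitter matches the patch; scheduleForced is a
// simplified, hypothetical stand-in for maybeScheduleForcedReconcile.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// reduceByRandomJitter returns the duration reduced by a random amount of up
// to 20%, so an 8h interval lands somewhere in (6.4h, 8h].
func reduceByRandomJitter(d time.Duration) time.Duration {
	percent := rand.Float64() * 0.2
	reduction := time.Duration(float64(d) * percent)
	return d - reduction
}

// scheduleForced returns the delay before the next forced reconcile, or
// false when none should be scheduled because the managed resource no
// longer exists (mirroring the NotFound check in the patch).
func scheduleForced(forceReconcileEvery time.Duration, deleted bool) (time.Duration, bool) {
	if deleted {
		return 0, false
	}
	return reduceByRandomJitter(forceReconcileEvery), true
}

func main() {
	every := 8 * time.Hour

	if delay, ok := scheduleForced(every, false); ok {
		fmt.Printf("upsert reconciled: forced reconcile in %s\n", delay)
	}

	if _, ok := scheduleForced(every, true); !ok {
		fmt.Println("delete reconciled: no forced reconcile scheduled")
	}
}

The jitter spreads the periodic "reconcile the world" requests out so that controllers managing many resources do not all wake up at the same instant, and the override=true flag ensures each successful reconcile resets the deferred schedule rather than leaving a stale one in place.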