diff --git a/agent/consul/controller/controller.go b/agent/consul/controller/controller.go
new file mode 100644
index 0000000000..03a95e2122
--- /dev/null
+++ b/agent/consul/controller/controller.go
@@ -0,0 +1,289 @@
+package controller
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync/atomic"
+	"time"
+
+	"github.com/hashicorp/consul/agent/consul/state"
+	"github.com/hashicorp/consul/agent/consul/stream"
+	"github.com/hashicorp/consul/agent/structs"
+	"golang.org/x/sync/errgroup"
+)
+
+// much of this is a re-implementation of
+// https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.13/pkg/internal/controller/controller.go
+
+// Transformer is a function that takes one type of config entry that has changed
+// and transforms that into a set of reconciliation requests to enqueue.
+type Transformer func(entry structs.ConfigEntry) []Request
+
+// Controller subscribes to a set of watched resources from the
+// state store and delegates processing them to a given Reconciler.
+// If a Reconciler errors while processing a Request, then the
+// Controller handles rescheduling the Request to be re-processed.
+type Controller interface {
+	// Run begins the Controller's main processing loop. When the given
+	// context is canceled, the Controller stops processing any remaining work.
+	// The Run function should only ever be called once.
+	Run(ctx context.Context) error
+	// Subscribe tells the controller to subscribe to updates for config entries based
+	// on the given request. Optional transformation functions can also be passed in
+	// to Subscribe, allowing a controller to map a config entry to a different type of
+	// request under the hood (e.g. watching a dependency and triggering a Reconcile on
+	// the dependent resource). This should only ever be called prior to calling Run.
+	Subscribe(request *stream.SubscribeRequest, transformers ...Transformer) Controller
+	// WithBackoff changes the base and maximum backoff values for the Controller's
+	// Request retry rate limiter. This should only ever be called prior to
+	// calling Run.
+	WithBackoff(base, max time.Duration) Controller
+	// WithWorkers sets the number of worker goroutines used to process the queue;
+	// this defaults to 1 goroutine.
+	WithWorkers(i int) Controller
+	// WithQueueFactory allows a Controller to replace its underlying work queue
+	// implementation. This is most useful for testing. This should only ever be
+	// called prior to calling Run.
+	WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue) Controller
+}
+
+var _ Controller = &controller{}
+
+type subscription struct {
+	request      *stream.SubscribeRequest
+	transformers []Transformer
+}
+
+// controller implements the Controller interface
+type controller struct {
+	// reconciler is the Reconciler that processes all subscribed
+	// Requests
+	reconciler Reconciler
+
+	// makeQueue is the factory used for creating the work queue; generally
+	// this shouldn't be touched, but can be updated for testing purposes
+	makeQueue func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue
+	// workers is the number of worker goroutines used to process the queue
+	workers int
+	// work is the internal work queue that pending Requests are added to
+	work WorkQueue
+	// baseBackoff is the starting backoff time for the work queue's rate limiter
+	baseBackoff time.Duration
+	// maxBackoff is the maximum backoff time for the work queue's rate limiter
+	maxBackoff time.Duration
+
+	// subscriptions is a list of subscription requests for retrieving configuration entries
+	subscriptions []subscription
+	// publisher is the event publisher that should be subscribed to for any updates
+	publisher state.EventPublisher
+
+	// running ensures that we are only calling Run a single time
+	running int32
+}
+
+// New returns a new Controller associated with the given event publisher and reconciler.
+func New(publisher state.EventPublisher, reconciler Reconciler) Controller {
+	return &controller{
+		reconciler:  reconciler,
+		publisher:   publisher,
+		workers:     1,
+		baseBackoff: 5 * time.Millisecond,
+		maxBackoff:  1000 * time.Second,
+		makeQueue:   RunWorkQueue,
+	}
+}
+
+// Subscribe tells the controller to subscribe to updates for config entries of the
+// given kind and with the associated enterprise metadata. This should only ever be
+// called prior to calling Run.
+func (c *controller) Subscribe(request *stream.SubscribeRequest, transformers ...Transformer) Controller {
+	c.ensureNotRunning()
+
+	c.subscriptions = append(c.subscriptions, subscription{
+		request:      request,
+		transformers: transformers,
+	})
+	return c
+}
+
+// WithBackoff changes the base and maximum backoff values for the Controller's
+// Request retry rate limiter. This should only ever be called prior to
+// calling Run.
+func (c *controller) WithBackoff(base, max time.Duration) Controller {
+	c.ensureNotRunning()
+
+	c.baseBackoff = base
+	c.maxBackoff = max
+	return c
+}
+
+// WithWorkers sets the number of worker goroutines used to process the queue;
+// this defaults to 1 goroutine.
+func (c *controller) WithWorkers(i int) Controller {
+	c.ensureNotRunning()
+
+	if i <= 0 {
+		i = 1
+	}
+	c.workers = i
+	return c
+}
+
+// WithQueueFactory changes the initialization method for the Controller's work
+// queue; this is predominantly just used for testing. This should only ever be
+// called prior to calling Run.
+func (c *controller) WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue) Controller {
+	c.ensureNotRunning()
+
+	c.makeQueue = fn
+	return c
+}
+
+// ensureNotRunning makes sure we aren't trying to reconfigure an already
+// running controller; it panics if Run has already been invoked
+func (c *controller) ensureNotRunning() {
+	if atomic.LoadInt32(&c.running) == 1 {
+		panic("cannot configure controller once Run is called")
+	}
+}
+
+// Run begins the Controller's main processing loop. When the given
+// context is canceled, the Controller stops processing any remaining work.
+// The Run function should only ever be called once; calling it multiple
+// times will result in a panic.
+func (c *controller) Run(ctx context.Context) error {
+	if !atomic.CompareAndSwapInt32(&c.running, 0, 1) {
+		panic("Run cannot be called more than once")
+	}
+
+	group, groupCtx := errgroup.WithContext(ctx)
+
+	// set up our queue
+	c.work = c.makeQueue(groupCtx, c.baseBackoff, c.maxBackoff)
+
+	for _, sub := range c.subscriptions {
+		// store a reference for the closure
+		sub := sub
+		group.Go(func() error {
+			var index uint64
+
+			subscription, err := c.publisher.Subscribe(sub.request)
+			if err != nil {
+				return err
+			}
+			defer subscription.Unsubscribe()
+
+			for {
+				event, err := subscription.Next(ctx)
+				switch {
+				case errors.Is(err, context.Canceled):
+					return nil
+				case err != nil:
+					return err
+				}
+
+				if event.IsFramingEvent() {
+					continue
+				}
+
+				if event.Index <= index {
+					continue
+				}
+
+				index = event.Index
+
+				if err := c.processEvent(sub, event); err != nil {
+					return err
+				}
+			}
+		})
+	}
+
+	for i := 0; i < c.workers; i++ {
+		group.Go(func() error {
+			for {
+				request, shutdown := c.work.Get()
+				if shutdown {
+					// Stop working
					return nil
+				}
+				c.reconcileHandler(groupCtx, request)
+				// Done is called here because it is required to be called
+				// when we've finished processing each request
+				c.work.Done(request)
+			}
+		})
+	}
+
+	<-groupCtx.Done()
+	return nil
+}
+
+func (c *controller) processEvent(sub subscription, event stream.Event) error {
+	switch payload := event.Payload.(type) {
+	case state.EventPayloadConfigEntry:
+		c.enqueueEntry(payload.Value, sub.transformers...)
+		return nil
+	case *stream.PayloadEvents:
+		for _, event := range payload.Items {
+			if err := c.processEvent(sub, event); err != nil {
+				return err
+			}
+		}
+		return nil
+	default:
+		return fmt.Errorf("unhandled event type: %T", payload)
+	}
+}
+
+// enqueueEntry adds the given entry to the work queue. If given
+// one or more transformation functions, it will enqueue all of the resulting
+// reconciliation requests returned from each Transformer.
+func (c *controller) enqueueEntry(entry structs.ConfigEntry, transformers ...Transformer) { + if len(transformers) == 0 { + c.work.Add(Request{ + Kind: entry.GetKind(), + Name: entry.GetName(), + Meta: entry.GetEnterpriseMeta(), + }) + } else { + for _, fn := range transformers { + for _, request := range fn(entry) { + c.work.Add(request) + } + } + } +} + +// reconcile wraps the reconciler in a panic handler +func (c *controller) reconcile(ctx context.Context, req Request) (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic [recovered]: %v", r) + return + } + }() + return c.reconciler.Reconcile(ctx, req) +} + +// reconcileHandler invokes the reconciler and looks at its return value +// to determine whether the request should be rescheduled +func (c *controller) reconcileHandler(ctx context.Context, req Request) { + if err := c.reconcile(ctx, req); err != nil { + // handle the case where we're specifically told to requeue later + var requeueAfter RequeueAfterError + if errors.As(err, &requeueAfter) { + c.work.Forget(req) + c.work.AddAfter(req, time.Duration(requeueAfter)) + return + } + + // fallback to rate limit ourselves + c.work.AddRateLimited(req) + return + } + + // if no error then Forget this request so it is not retried + c.work.Forget(req) +} diff --git a/agent/consul/controller/controller_test.go b/agent/consul/controller/controller_test.go new file mode 100644 index 0000000000..d71b7403a7 --- /dev/null +++ b/agent/consul/controller/controller_test.go @@ -0,0 +1,273 @@ +package controller + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/hashicorp/consul/agent/consul/fsm" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/require" +) + +func TestBasicController(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + reconciler := newTestReconciler(false) + + publisher := stream.NewEventPublisher(1 * time.Millisecond) + go publisher.Run(ctx) + + // get the store through the FSM since the publisher handlers get registered through it + store := fsm.NewFromDeps(fsm.Deps{ + Logger: hclog.New(nil), + NewStateStore: func() *state.Store { + return state.NewStateStoreWithEventPublisher(nil, publisher) + }, + Publisher: publisher, + }).State() + + for i := 0; i < 200; i++ { + entryIndex := uint64(i + 1) + name := fmt.Sprintf("foo-%d", i) + require.NoError(t, store.EnsureConfigEntry(entryIndex, &structs.IngressGatewayConfigEntry{ + Kind: structs.IngressGateway, + Name: name, + })) + } + + go New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{ + Topic: state.EventTopicIngressGateway, + Subject: stream.SubjectWildcard, + }).WithWorkers(10).Run(ctx) + + received := []string{} +LOOP: + for { + select { + case request := <-reconciler.received: + require.Equal(t, structs.IngressGateway, request.Kind) + received = append(received, request.Name) + if len(received) == 200 { + break LOOP + } + case <-ctx.Done(): + break LOOP + } + } + + // since we only modified each entry once, we should have exactly 200 reconcliation calls + require.Len(t, received, 200) + for i := 0; i < 200; i++ { + require.Contains(t, received, fmt.Sprintf("foo-%d", i)) + } +} + +func TestBasicController_Transform(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + 
defer cancel() + + reconciler := newTestReconciler(false) + + publisher := stream.NewEventPublisher(0) + go publisher.Run(ctx) + + // get the store through the FSM since the publisher handlers get registered through it + store := fsm.NewFromDeps(fsm.Deps{ + Logger: hclog.New(nil), + NewStateStore: func() *state.Store { + return state.NewStateStoreWithEventPublisher(nil, publisher) + }, + Publisher: publisher, + }).State() + + go New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{ + Topic: state.EventTopicIngressGateway, + Subject: stream.SubjectWildcard, + }, func(entry structs.ConfigEntry) []Request { + return []Request{{ + Kind: "foo", + Name: "bar", + }} + }).Run(ctx) + + require.NoError(t, store.EnsureConfigEntry(1, &structs.IngressGatewayConfigEntry{ + Kind: structs.IngressGateway, + Name: "test", + })) + + select { + case request := <-reconciler.received: + require.Equal(t, "foo", request.Kind) + require.Equal(t, "bar", request.Name) + case <-ctx.Done(): + t.Fatal("stopped reconciler before event received") + } +} + +func TestBasicController_Retry(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + reconciler := newTestReconciler(true) + defer reconciler.stop() + + publisher := stream.NewEventPublisher(0) + go publisher.Run(ctx) + + // get the store through the FSM since the publisher handlers get registered through it + store := fsm.NewFromDeps(fsm.Deps{ + Logger: hclog.New(nil), + NewStateStore: func() *state.Store { + return state.NewStateStoreWithEventPublisher(nil, publisher) + }, + Publisher: publisher, + }).State() + + queueInitialized := make(chan *countingWorkQueue) + controller := New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{ + Topic: state.EventTopicIngressGateway, + Subject: stream.SubjectWildcard, + }).WithWorkers(-1).WithBackoff(1*time.Millisecond, 1*time.Millisecond) + go controller.WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue { + queue := newCountingWorkQueue(RunWorkQueue(ctx, baseBackoff, maxBackoff)) + queueInitialized <- queue + return queue + }).Run(ctx) + + queue := <-queueInitialized + + ensureCalled := func(request chan Request, name string) bool { + // give a short window for our counters to update + defer time.Sleep(10 * time.Millisecond) + select { + case req := <-request: + require.Equal(t, structs.IngressGateway, req.Kind) + require.Equal(t, name, req.Name) + return true + case <-time.After(10 * time.Millisecond): + return false + } + } + + // check to make sure we are called once + queue.reset() + require.NoError(t, store.EnsureConfigEntry(1, &structs.IngressGatewayConfigEntry{ + Kind: structs.IngressGateway, + Name: "foo-1", + })) + require.False(t, ensureCalled(reconciler.received, "foo-1")) + require.EqualValues(t, 0, queue.dones()) + require.EqualValues(t, 0, queue.requeues()) + reconciler.step() + require.True(t, ensureCalled(reconciler.received, "foo-1")) + require.EqualValues(t, 1, queue.dones()) + require.EqualValues(t, 0, queue.requeues()) + + // check that we requeue when an arbitrary error occurs + queue.reset() + reconciler.setResponse(errors.New("error")) + require.NoError(t, store.EnsureConfigEntry(2, &structs.IngressGatewayConfigEntry{ + Kind: structs.IngressGateway, + Name: "foo-2", + })) + require.False(t, ensureCalled(reconciler.received, "foo-2")) + require.EqualValues(t, 0, queue.dones()) + require.EqualValues(t, 0, queue.requeues()) + require.EqualValues(t, 0, queue.addRateLimiteds()) + 
reconciler.step() + // check we're processed the first time and re-queued + require.True(t, ensureCalled(reconciler.received, "foo-2")) + require.EqualValues(t, 1, queue.dones()) + require.EqualValues(t, 1, queue.requeues()) + require.EqualValues(t, 1, queue.addRateLimiteds()) + // now make sure we succeed + reconciler.setResponse(nil) + reconciler.step() + require.True(t, ensureCalled(reconciler.received, "foo-2")) + require.EqualValues(t, 2, queue.dones()) + require.EqualValues(t, 1, queue.requeues()) + require.EqualValues(t, 1, queue.addRateLimiteds()) + + // check that we requeue at a given rate when using a RequeueAfterError + queue.reset() + reconciler.setResponse(RequeueNow()) + require.NoError(t, store.EnsureConfigEntry(3, &structs.IngressGatewayConfigEntry{ + Kind: structs.IngressGateway, + Name: "foo-3", + })) + require.False(t, ensureCalled(reconciler.received, "foo-3")) + require.EqualValues(t, 0, queue.dones()) + require.EqualValues(t, 0, queue.requeues()) + require.EqualValues(t, 0, queue.addRateLimiteds()) + reconciler.step() + // check we're processed the first time and re-queued + require.True(t, ensureCalled(reconciler.received, "foo-3")) + require.EqualValues(t, 1, queue.dones()) + require.EqualValues(t, 1, queue.requeues()) + require.EqualValues(t, 1, queue.addAfters()) + // now make sure we succeed + reconciler.setResponse(nil) + reconciler.step() + require.True(t, ensureCalled(reconciler.received, "foo-3")) + require.EqualValues(t, 2, queue.dones()) + require.EqualValues(t, 1, queue.requeues()) + require.EqualValues(t, 1, queue.addAfters()) +} + +func TestBasicController_RunPanicAssertions(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + started := make(chan struct{}) + reconciler := newTestReconciler(false) + publisher := stream.NewEventPublisher(0) + controller := New(publisher, reconciler).WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue { + close(started) + return RunWorkQueue(ctx, baseBackoff, maxBackoff) + }) + subscription := &stream.SubscribeRequest{ + Topic: state.EventTopicIngressGateway, + Subject: stream.SubjectWildcard, + } + + // kick off the controller + go controller.Subscribe(subscription).Run(ctx) + + // wait to make sure the following assertions don't + // get run before the above goroutine is spawned + <-started + + // make sure we can't call Run again + require.Panics(t, func() { + controller.Run(ctx) + }) + + // make sure all of our configuration methods panic + require.Panics(t, func() { + controller.Subscribe(subscription) + }) + require.Panics(t, func() { + controller.WithBackoff(1, 10) + }) + require.Panics(t, func() { + controller.WithWorkers(1) + }) + require.Panics(t, func() { + controller.WithQueueFactory(RunWorkQueue) + }) +} diff --git a/agent/consul/controller/defer.go b/agent/consul/controller/defer.go new file mode 100644 index 0000000000..398373b0bd --- /dev/null +++ b/agent/consul/controller/defer.go @@ -0,0 +1,224 @@ +package controller + +import ( + "container/heap" + "context" + "time" +) + +// much of this is a re-implementation of +// https://github.com/kubernetes/client-go/blob/release-1.25/util/workqueue/delaying_queue.go + +// DeferQueue is a generic priority queue implementation that +// allows for deferring and later processing Requests. +type DeferQueue interface { + // Defer defers processing a Request until a given time. 
When + // the timeout is hit, the request will be processed by the + // callback given in the Process loop. If the given context + // is canceled, the item is not deferred. + Defer(ctx context.Context, item Request, until time.Time) + // Process processes all items in the defer queue with the + // given callback, blocking until the given context is canceled. + // Callers should only ever call Process once, likely in a + // long-lived goroutine. + Process(ctx context.Context, callback func(item Request)) +} + +// deferredRequest is a wrapped Request with information about +// when a retry should be attempted +type deferredRequest struct { + enqueueAt time.Time + item Request + // index holds the index for the given heap entry so that if + // the entry is updated the heap can be re-sorted + index int +} + +// deferQueue is a priority queue for deferring Requests for +// future processing +type deferQueue struct { + heap *deferHeap + entries map[Request]*deferredRequest + + addChannel chan *deferredRequest + heartbeat *time.Ticker + nextReadyTimer *time.Timer +} + +// NewDeferQueue returns a priority queue for deferred Requests. +func NewDeferQueue(tick time.Duration) DeferQueue { + dHeap := &deferHeap{} + heap.Init(dHeap) + + return &deferQueue{ + heap: dHeap, + entries: make(map[Request]*deferredRequest), + addChannel: make(chan *deferredRequest), + heartbeat: time.NewTicker(tick), + } +} + +// Defer defers the given Request until the given time in the future. If the +// passed in context is canceled before the Request is deferred, then this +// immediately returns. +func (q *deferQueue) Defer(ctx context.Context, item Request, until time.Time) { + entry := &deferredRequest{ + enqueueAt: until, + item: item, + } + + select { + case <-ctx.Done(): + case q.addChannel <- entry: + } +} + +// deferEntry adds a deferred request to the priority queue +func (q *deferQueue) deferEntry(entry *deferredRequest) { + existing, exists := q.entries[entry.item] + if exists { + // insert or update the item deferral time + if existing.enqueueAt.After(entry.enqueueAt) { + existing.enqueueAt = entry.enqueueAt + heap.Fix(q.heap, existing.index) + } + + return + } + + heap.Push(q.heap, entry) + q.entries[entry.item] = entry +} + +// readyRequest returns a pointer to the next ready Request or +// nil if no Requests are ready to be processed +func (q *deferQueue) readyRequest() *Request { + if q.heap.Len() == 0 { + return nil + } + + now := time.Now() + + entry := q.heap.Peek().(*deferredRequest) + if entry.enqueueAt.After(now) { + return nil + } + + entry = heap.Pop(q.heap).(*deferredRequest) + delete(q.entries, entry.item) + return &entry.item +} + +// signalReady returns a timer signal to the next Request +// that will be ready on the queue +func (q *deferQueue) signalReady() <-chan time.Time { + if q.heap.Len() == 0 { + return make(<-chan time.Time) + } + + if q.nextReadyTimer != nil { + q.nextReadyTimer.Stop() + } + now := time.Now() + entry := q.heap.Peek().(*deferredRequest) + q.nextReadyTimer = time.NewTimer(entry.enqueueAt.Sub(now)) + return q.nextReadyTimer.C +} + +// Process processes all items in the defer queue with the +// given callback, blocking until the given context is canceled. +// Callers should only ever call Process once, likely in a +// long-lived goroutine. 
+func (q *deferQueue) Process(ctx context.Context, callback func(item Request)) { + for { + ready := q.readyRequest() + if ready != nil { + callback(*ready) + } + + signalReady := q.signalReady() + + select { + case <-ctx.Done(): + if q.nextReadyTimer != nil { + q.nextReadyTimer.Stop() + } + q.heartbeat.Stop() + return + + case <-q.heartbeat.C: + // continue the loop, which process ready items + + case <-signalReady: + // continue the loop, which process ready items + + case entry := <-q.addChannel: + enqueueOrProcess := func(entry *deferredRequest) { + now := time.Now() + if entry.enqueueAt.After(now) { + q.deferEntry(entry) + } else { + // fast-path, process immediately if we don't need to defer + callback(entry.item) + } + } + + enqueueOrProcess(entry) + + // drain the add channel before we do anything else + drained := false + for !drained { + select { + case entry := <-q.addChannel: + enqueueOrProcess(entry) + default: + drained = true + } + } + } + } +} + +var _ heap.Interface = &deferHeap{} + +// deferHeap implements heap.Interface +type deferHeap []*deferredRequest + +// Len returns the length of the heap. +func (h deferHeap) Len() int { + return len(h) +} + +// Less compares heap items for purposes of sorting. +func (h deferHeap) Less(i, j int) bool { + return h[i].enqueueAt.Before(h[j].enqueueAt) +} + +// Swap swaps two entries in the heap. +func (h deferHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] + h[i].index = i + h[j].index = j +} + +// Push pushes an entry onto the heap. +func (h *deferHeap) Push(x interface{}) { + n := len(*h) + item := x.(*deferredRequest) + item.index = n + *h = append(*h, item) +} + +// Pop pops an entry off the heap. +func (h *deferHeap) Pop() interface{} { + n := len(*h) + item := (*h)[n-1] + item.index = -1 + *h = (*h)[0:(n - 1)] + return item +} + +// Peek returns the next item on the heap. +func (h deferHeap) Peek() interface{} { + return h[0] +} diff --git a/agent/consul/controller/doc.go b/agent/consul/controller/doc.go new file mode 100644 index 0000000000..ca40814e3c --- /dev/null +++ b/agent/consul/controller/doc.go @@ -0,0 +1,10 @@ +// Package controller contains a re-implementation of the Kubernetes +// [controller-runtime](https://github.com/kubernetes-sigs/controller-runtime) +// with the core using Consul's event publishing pipeline rather than +// Kubernetes' client list/watch APIs. +// +// Generally this package enables defining asynchronous control loops +// meant to be run on a Consul cluster's leader that reconcile derived state +// in config entries that might be dependent on multiple sources. + +package controller diff --git a/agent/consul/controller/queue.go b/agent/consul/controller/queue.go new file mode 100644 index 0000000000..d86c5c671b --- /dev/null +++ b/agent/consul/controller/queue.go @@ -0,0 +1,176 @@ +package controller + +import ( + "context" + "sync" + "time" +) + +// much of this is a re-implementation of +// https://github.com/kubernetes/client-go/blob/release-1.25/util/workqueue/queue.go + +// WorkQueue is an interface for a work queue with semantics to help with +// retries and rate limiting. +type WorkQueue interface { + // Get retrieves the next Request in the queue, blocking until a Request is + // available, if shutdown is true, then the queue is shutting down and should + // no longer be used by the caller. + Get() (item Request, shutdown bool) + // Add immediately adds a Request to the work queue. + Add(item Request) + // AddAfter adds a Request to the work queue after a given amount of time. 
+ AddAfter(item Request, duration time.Duration) + // AddRateLimited adds a Request to the work queue after the amount of time + // specified by applying the queue's rate limiter. + AddRateLimited(item Request) + // Forget signals the queue to reset the rate-limiting for the given Request. + Forget(item Request) + // Done tells the work queue that the Request has been successfully processed + // and can be deleted from the queue. + Done(item Request) +} + +// queue implements a rate-limited work queue +type queue struct { + // queue holds an ordered list of Requests needing to be processed + queue []Request + + // dirty holds the working set of all Requests, whether they are being + // processed or not + dirty map[Request]struct{} + // processing holds the set of current requests being processed + processing map[Request]struct{} + + // deferred is an internal priority queue that tracks deferred + // Requests + deferred DeferQueue + // ratelimiter is the internal rate-limiter for the queue + ratelimiter Limiter + + // cond synchronizes queue access and handles signalling for when + // data is available in the queue + cond *sync.Cond + + // ctx is the top-level context that, when canceled, shuts down the queue + ctx context.Context +} + +// RunWorkQueue returns a started WorkQueue that has per-Request exponential backoff rate-limiting. +// When the passed in context is canceled, the queue shuts down. +func RunWorkQueue(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue { + q := &queue{ + ratelimiter: NewRateLimiter(baseBackoff, maxBackoff), + dirty: make(map[Request]struct{}), + processing: make(map[Request]struct{}), + cond: sync.NewCond(&sync.Mutex{}), + deferred: NewDeferQueue(500 * time.Millisecond), + ctx: ctx, + } + go q.start() + + return q +} + +// start begins the asynchronous processing loop for the deferral queue +func (q *queue) start() { + go q.deferred.Process(q.ctx, func(item Request) { + q.Add(item) + }) + + <-q.ctx.Done() + q.cond.Broadcast() +} + +// shuttingDown returns whether the queue is in the process of shutting down +func (q *queue) shuttingDown() bool { + select { + case <-q.ctx.Done(): + return true + default: + return false + } +} + +// Get returns the next Request to be processed by the caller, blocking until +// an item is available in the queue. If the returned shutdown parameter is true, +// then the caller should stop using the queue. Any Requests returned by a call +// to Get must be explicitly marked as processed via the Done method. +func (q *queue) Get() (item Request, shutdown bool) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + for len(q.queue) == 0 && !q.shuttingDown() { + q.cond.Wait() + } + if len(q.queue) == 0 { + // We must be shutting down. + return Request{}, true + } + + item, q.queue = q.queue[0], q.queue[1:] + + q.processing[item] = struct{}{} + delete(q.dirty, item) + + return item, false +} + +// Add puts the given Request in the queue. If the Request is already in +// the queue or the queue is stopping, then this is a no-op. +func (q *queue) Add(item Request) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + if q.shuttingDown() { + return + } + if _, ok := q.dirty[item]; ok { + return + } + + q.dirty[item] = struct{}{} + if _, ok := q.processing[item]; ok { + return + } + + q.queue = append(q.queue, item) + q.cond.Signal() +} + +// AddAfter adds a Request to the work queue after a given amount of time. 
+func (q *queue) AddAfter(item Request, duration time.Duration) { + // don't add if we're already shutting down + if q.shuttingDown() { + return + } + + // immediately add if there is no delay + if duration <= 0 { + q.Add(item) + return + } + + q.deferred.Defer(q.ctx, item, time.Now().Add(duration)) +} + +// AddRateLimited adds the given Request to the queue after applying the +// rate limiter to determine when the Request should next be processed. +func (q *queue) AddRateLimited(item Request) { + q.AddAfter(item, q.ratelimiter.NextRetry(item)) +} + +// Forget signals the queue to reset the rate-limiting for the given Request. +func (q *queue) Forget(item Request) { + q.ratelimiter.Forget(item) +} + +// Done removes the item from the queue, if it has been marked dirty +// again while being processed, it is re-added to the queue. +func (q *queue) Done(item Request) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + delete(q.processing, item) + if _, ok := q.dirty[item]; ok { + q.queue = append(q.queue, item) + q.cond.Signal() + } +} diff --git a/agent/consul/controller/queue_test.go b/agent/consul/controller/queue_test.go new file mode 100644 index 0000000000..8bfc00312b --- /dev/null +++ b/agent/consul/controller/queue_test.go @@ -0,0 +1,93 @@ +package controller + +import ( + "sync/atomic" + "time" +) + +var _ WorkQueue = &countingWorkQueue{} + +type countingWorkQueue struct { + getCounter uint64 + addCounter uint64 + addAfterCounter uint64 + addRateLimitedCounter uint64 + forgetCounter uint64 + doneCounter uint64 + + inner WorkQueue +} + +func newCountingWorkQueue(inner WorkQueue) *countingWorkQueue { + return &countingWorkQueue{ + inner: inner, + } +} + +func (c *countingWorkQueue) reset() { + atomic.StoreUint64(&c.getCounter, 0) + atomic.StoreUint64(&c.addCounter, 0) + atomic.StoreUint64(&c.addAfterCounter, 0) + atomic.StoreUint64(&c.addRateLimitedCounter, 0) + atomic.StoreUint64(&c.forgetCounter, 0) + atomic.StoreUint64(&c.doneCounter, 0) +} + +func (c *countingWorkQueue) requeues() uint64 { + return c.addAfters() + c.addRateLimiteds() +} + +func (c *countingWorkQueue) Get() (item Request, shutdown bool) { + item, err := c.inner.Get() + atomic.AddUint64(&c.getCounter, 1) + return item, err +} + +func (c *countingWorkQueue) gets() uint64 { + return atomic.LoadUint64(&c.getCounter) +} + +func (c *countingWorkQueue) Add(item Request) { + c.inner.Add(item) + atomic.AddUint64(&c.addCounter, 1) +} + +func (c *countingWorkQueue) adds() uint64 { + return atomic.LoadUint64(&c.addCounter) +} + +func (c *countingWorkQueue) AddAfter(item Request, duration time.Duration) { + c.inner.AddAfter(item, duration) + atomic.AddUint64(&c.addAfterCounter, 1) +} + +func (c *countingWorkQueue) addAfters() uint64 { + return atomic.LoadUint64(&c.addAfterCounter) +} + +func (c *countingWorkQueue) AddRateLimited(item Request) { + c.inner.AddRateLimited(item) + atomic.AddUint64(&c.addRateLimitedCounter, 1) +} + +func (c *countingWorkQueue) addRateLimiteds() uint64 { + return atomic.LoadUint64(&c.addRateLimitedCounter) +} + +func (c *countingWorkQueue) Forget(item Request) { + c.inner.Forget(item) + atomic.AddUint64(&c.forgetCounter, 1) +} + +func (c *countingWorkQueue) forgets() uint64 { + return atomic.LoadUint64(&c.forgetCounter) +} + +func (c *countingWorkQueue) Done(item Request) { + c.inner.Done(item) + atomic.AddUint64(&c.doneCounter, 1) +} + +func (c *countingWorkQueue) dones() uint64 { + return atomic.LoadUint64(&c.doneCounter) +} diff --git a/agent/consul/controller/rate.go 
b/agent/consul/controller/rate.go
new file mode 100644
index 0000000000..829b39705d
--- /dev/null
+++ b/agent/consul/controller/rate.go
@@ -0,0 +1,70 @@
+package controller
+
+import (
+	"math"
+	"sync"
+	"time"
+)
+
+// much of this is a re-implementation of:
+// https://github.com/kubernetes/client-go/blob/release-1.25/util/workqueue/default_rate_limiters.go
+
+// Limiter is an interface for a rate limiter that controls how quickly
+// retried Requests are reprocessed by the work queue.
+type Limiter interface {
+	// NextRetry returns the remaining time until the queue should
+	// reprocess a Request.
+	NextRetry(request Request) time.Duration
+	// Forget causes the Limiter to reset the backoff for the Request.
+	Forget(request Request)
+}
+
+var _ Limiter = &ratelimiter{}
+
+type ratelimiter struct {
+	failures map[Request]int
+	base     time.Duration
+	max      time.Duration
+	mutex    sync.RWMutex
+}
+
+// NewRateLimiter returns a Limiter that does per-item exponential
+// backoff.
+func NewRateLimiter(base, max time.Duration) Limiter {
+	return &ratelimiter{
+		failures: make(map[Request]int),
+		base:     base,
+		max:      max,
+	}
+}
+
+// NextRetry returns the remaining time until the queue should
+// reprocess a Request.
+func (r *ratelimiter) NextRetry(request Request) time.Duration {
+	// take the write lock since we mutate the failures map and NextRetry
+	// can be called concurrently from multiple workers
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
+
+	exponent := r.failures[request]
+	r.failures[request] = r.failures[request] + 1
+
+	backoff := float64(r.base.Nanoseconds()) * math.Pow(2, float64(exponent))
+	// make sure we don't overflow time.Duration
+	if backoff > math.MaxInt64 {
+		return r.max
+	}
+
+	calculated := time.Duration(backoff)
+	if calculated > r.max {
+		return r.max
+	}
+
+	return calculated
+}
+
+// Forget causes the Limiter to reset the backoff for the Request.
+func (r *ratelimiter) Forget(request Request) {
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
+
+	delete(r.failures, request)
+}
diff --git a/agent/consul/controller/rate_test.go b/agent/consul/controller/rate_test.go
new file mode 100644
index 0000000000..8d128eee11
--- /dev/null
+++ b/agent/consul/controller/rate_test.go
@@ -0,0 +1,62 @@
+package controller
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestRateLimiter_Backoff(t *testing.T) {
+	t.Parallel()
+
+	limiter := NewRateLimiter(1*time.Millisecond, 1*time.Second)
+
+	request := Request{Kind: "one"}
+	require.Equal(t, 1*time.Millisecond, limiter.NextRetry(request))
+	require.Equal(t, 2*time.Millisecond, limiter.NextRetry(request))
+	require.Equal(t, 4*time.Millisecond, limiter.NextRetry(request))
+	require.Equal(t, 8*time.Millisecond, limiter.NextRetry(request))
+	require.Equal(t, 16*time.Millisecond, limiter.NextRetry(request))
+
+	requestTwo := Request{Kind: "two"}
+	require.Equal(t, 1*time.Millisecond, limiter.NextRetry(requestTwo))
+	require.Equal(t, 2*time.Millisecond, limiter.NextRetry(requestTwo))
+
+	limiter.Forget(request)
+	require.Equal(t, 1*time.Millisecond, limiter.NextRetry(request))
+}
+
+func TestRateLimiter_Overflow(t *testing.T) {
+	t.Parallel()
+
+	limiter := NewRateLimiter(1*time.Millisecond, 1000*time.Second)
+
+	request := Request{Kind: "one"}
+	for i := 0; i < 5; i++ {
+		limiter.NextRetry(request)
+	}
+	// ensure we have a normally incrementing exponential backoff
+	require.Equal(t, 32*time.Millisecond, limiter.NextRetry(request))
+
+	overflow := Request{Kind: "overflow"}
+	for i := 0; i < 1000; i++ {
+		limiter.NextRetry(overflow)
+	}
+	// make sure we're capped at the passed in max backoff
+	require.Equal(t, 1000*time.Second,
limiter.NextRetry(overflow)) + + limiter = NewRateLimiter(1*time.Minute, 1000*time.Hour) + + for i := 0; i < 2; i++ { + limiter.NextRetry(request) + } + // ensure we have a normally incrementing exponential backoff + require.Equal(t, 4*time.Minute, limiter.NextRetry(request)) + + for i := 0; i < 1000; i++ { + limiter.NextRetry(overflow) + } + // make sure we're capped at the passed in max backoff + require.Equal(t, 1000*time.Hour, limiter.NextRetry(overflow)) +} diff --git a/agent/consul/controller/reconciler.go b/agent/consul/controller/reconciler.go new file mode 100644 index 0000000000..530c17ccbd --- /dev/null +++ b/agent/consul/controller/reconciler.go @@ -0,0 +1,51 @@ +package controller + +import ( + "context" + "fmt" + "time" + + "github.com/hashicorp/consul/acl" +) + +// Request contains the information necessary to reconcile a config entry. +// This includes only the information required to uniquely identify the +// config entry. +type Request struct { + Kind string + Name string + Meta *acl.EnterpriseMeta +} + +// RequeueAfterError is an error that allows a Reconciler to override the +// exponential backoff behavior of the Controller, rather than applying +// the backoff algorithm, returning a RequeueAfterError will cause the +// Controller to reschedule the Request at a given time in the future. +type RequeueAfterError time.Duration + +// Error implements the error interface. +func (r RequeueAfterError) Error() string { + return fmt.Sprintf("requeue at %s", time.Duration(r)) +} + +// RequeueAfter constructs a RequeueAfterError with the given duration +// setting. +func RequeueAfter(after time.Duration) error { + return RequeueAfterError(after) +} + +// RequeueNow constructs a RequeueAfterError that reschedules the Request +// immediately. +func RequeueNow() error { + return RequeueAfterError(0) +} + +// Reconciler is the main implementation interface for Controllers. A Reconciler +// receives any change notifications for config entries that the controller is subscribed +// to and processes them with its Reconcile function. +type Reconciler interface { + // Reconcile performs a reconciliation on the config entry referred to by the Request. + // The Controller will requeue the Request to be processed again if an error is non-nil. + // If no error is returned, the Request will be removed from the working queue. 
+ Reconcile(context.Context, Request) error +} diff --git a/agent/consul/controller/reconciler_test.go b/agent/consul/controller/reconciler_test.go new file mode 100644 index 0000000000..d2f0567533 --- /dev/null +++ b/agent/consul/controller/reconciler_test.go @@ -0,0 +1,61 @@ +package controller + +import ( + "context" + "sync" +) + +type testReconciler struct { + received chan Request + response error + mutex sync.Mutex + stepChan chan struct{} + stopChan chan struct{} + ctx context.Context +} + +func (r *testReconciler) Reconcile(ctx context.Context, req Request) error { + if r.stepChan != nil { + select { + case <-r.stopChan: + return nil + case <-r.stepChan: + } + } + + select { + case <-r.stopChan: + return nil + case r.received <- req: + } + + r.mutex.Lock() + defer r.mutex.Unlock() + return r.response +} + +func (r *testReconciler) setResponse(err error) { + r.mutex.Lock() + defer r.mutex.Unlock() + r.response = err +} + +func (r *testReconciler) step() { + r.stepChan <- struct{}{} +} + +func (r *testReconciler) stop() { + close(r.stopChan) +} + +func newTestReconciler(stepping bool) *testReconciler { + r := &testReconciler{ + received: make(chan Request, 1000), + stopChan: make(chan struct{}), + } + if stepping { + r.stepChan = make(chan struct{}) + } + + return r +}
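
Usage sketch, for review context only (not part of the patch): how a consumer of this package might wire a controller up on the leader, using only the API introduced above. The reconciler type, package placement, worker count, and backoff values are illustrative assumptions; the topic/subject wiring mirrors the tests.

// Package placement and names below are hypothetical; only the controller,
// state, stream, and structs APIs shown in this diff are assumed.
package leader

import (
	"context"
	"time"

	"github.com/hashicorp/consul/agent/consul/controller"
	"github.com/hashicorp/consul/agent/consul/state"
	"github.com/hashicorp/consul/agent/consul/stream"
	"github.com/hashicorp/consul/agent/structs"
)

// gatewayReconciler is an illustrative Reconciler implementation.
type gatewayReconciler struct{}

// Reconcile recomputes whatever derived state depends on the config entry
// identified by req. Returning nil marks the Request as done, returning an
// ordinary error re-enqueues it with exponential backoff, and returning
// RequeueAfter overrides the backoff with a fixed delay.
func (r *gatewayReconciler) Reconcile(ctx context.Context, req controller.Request) error {
	if err := computeDerivedState(ctx, req); err != nil {
		return controller.RequeueAfter(5 * time.Second)
	}
	return nil
}

// computeDerivedState is a stand-in for real reconciliation work.
func computeDerivedState(ctx context.Context, req controller.Request) error {
	return nil
}

// runGatewayController subscribes the reconciler to ingress-gateway updates
// and blocks until ctx is canceled.
func runGatewayController(ctx context.Context, publisher state.EventPublisher) error {
	return controller.New(publisher, &gatewayReconciler{}).
		Subscribe(
			&stream.SubscribeRequest{
				Topic:   state.EventTopicIngressGateway,
				Subject: stream.SubjectWildcard,
			},
			// optional Transformer: fan a changed entry out into the
			// Request(s) that actually need reconciling
			func(entry structs.ConfigEntry) []controller.Request {
				return []controller.Request{{
					Kind: entry.GetKind(),
					Name: entry.GetName(),
					Meta: entry.GetEnterpriseMeta(),
				}}
			},
		).
		WithWorkers(4).
		WithBackoff(5*time.Millisecond, 30*time.Second).
		Run(ctx)
}

Because Subscribe, WithBackoff, WithWorkers, and WithQueueFactory all panic once Run has been invoked, the entire chain has to be configured before Run is called.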