controller: make the WorkQueue generic (#16982)

Dan Upton, 2023-05-05 15:38:22 +01:00
parent 4715a86358
commit 917afcf3c6
8 changed files with 126 additions and 114 deletions
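
Before the per-file diffs, a minimal sketch (not part of the commit) of what the change enables: the queue package now exposes WorkQueue[T] for any comparable item type, instantiated explicitly at the call site. The Task type below is hypothetical; any comparable type satisfies queue.ItemType.

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/hashicorp/consul/agent/consul/controller/queue"
    )

    // Task is a hypothetical comparable item type standing in for the
    // controller's Request struct.
    type Task struct{ ID string }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        // The item type is now a type parameter rather than a hard-coded Request.
        wq := queue.RunWorkQueue[Task](ctx, 5*time.Millisecond, time.Second)
        wq.Add(Task{ID: "example"})

        task, shutdown := wq.Get()
        if !shutdown {
            fmt.Println("got", task.ID)
            wq.Done(task)
        }
    }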

@@ -11,11 +11,13 @@ import (
     "sync/atomic"
     "time"
 
+    "github.com/hashicorp/go-hclog"
+    "golang.org/x/sync/errgroup"
+
+    "github.com/hashicorp/consul/agent/consul/controller/queue"
     "github.com/hashicorp/consul/agent/consul/state"
     "github.com/hashicorp/consul/agent/consul/stream"
     "github.com/hashicorp/consul/agent/structs"
-    "github.com/hashicorp/go-hclog"
-    "golang.org/x/sync/errgroup"
 )
 
 // much of this is a re-implementation of

@@ -53,7 +55,7 @@ type Controller interface {
     // WithQueueFactory allows a Controller to replace its underlying work queue
     // implementation. This is most useful for testing. This should only ever be called
     // prior to running Run.
-    WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue) Controller
+    WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) queue.WorkQueue[Request]) Controller
     // AddTrigger allows for triggering a reconciliation request when a
     // triggering function returns, when the passed in context is canceled
     // the trigger must return

@@ -79,11 +81,11 @@ type controller struct {
     // makeQueue is the factory used for creating the work queue, generally
     // this shouldn't be touched, but can be updated for testing purposes
-    makeQueue func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue
+    makeQueue func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) queue.WorkQueue[Request]
     // workers is the number of workers to use to process data
     workers int
     // work is the internal work queue that pending Requests are added to
-    work WorkQueue
+    work queue.WorkQueue[Request]
     // baseBackoff is the starting backoff time for the work queue's rate limiter
     baseBackoff time.Duration
     // maxBackoff is the maximum backoff time for the work queue's rate limiter

@@ -125,7 +127,7 @@ func New(publisher state.EventPublisher, reconciler Reconciler) Controller {
         workers:     1,
         baseBackoff: 5 * time.Millisecond,
         maxBackoff:  1000 * time.Second,
-        makeQueue:   RunWorkQueue,
+        makeQueue:   queue.RunWorkQueue[Request],
         started:     make(chan struct{}),
         triggers:    make(map[Request]func()),
         logger:      hclog.NewNullLogger(),

@@ -179,7 +181,7 @@ func (c *controller) WithLogger(logger hclog.Logger) Controller {
 // WithQueueFactory changes the initialization method for the Controller's work
 // queue, this is predominantly just used for testing. This should only ever be called
 // prior to running Start.
-func (c *controller) WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue) Controller {
+func (c *controller) WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) queue.WorkQueue[Request]) Controller {
     c.ensureNotRunning()
     c.makeQueue = fn
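
Note that the Controller interface itself still pins the item type to its own Request struct; only the queue machinery underneath is generic. A sketch (not from the commit, assuming it lives in the controller package) of wiring a custom factory through the new signature, as the tests below do with a counting wrapper:

    // Sketch only: inject a replacement work queue before Run is called.
    // publisher and reconciler are assumed to exist as in the tests.
    func newInstrumentedController(publisher state.EventPublisher, reconciler Reconciler) Controller {
        return New(publisher, reconciler).WithQueueFactory(
            func(ctx context.Context, base, max time.Duration) queue.WorkQueue[Request] {
                // Wrap or replace the real queue here for observability.
                return queue.RunWorkQueue[Request](ctx, base, max)
            },
        )
    }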

@@ -14,6 +14,7 @@ import (
     "github.com/hashicorp/go-memdb"
     "github.com/stretchr/testify/require"
 
+    "github.com/hashicorp/consul/agent/consul/controller/queue"
     "github.com/hashicorp/consul/agent/consul/fsm"
     "github.com/hashicorp/consul/agent/consul/state"
     "github.com/hashicorp/consul/agent/consul/stream"

@@ -144,13 +145,13 @@ func TestBasicController_Retry(t *testing.T) {
         StorageBackend: fsm.NullStorageBackend,
     }).State()
 
-    queueInitialized := make(chan *countingWorkQueue)
+    queueInitialized := make(chan *countingWorkQueue[Request])
     controller := New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
         Topic:   state.EventTopicIngressGateway,
         Subject: stream.SubjectWildcard,
     }).WithWorkers(-1).WithBackoff(1*time.Millisecond, 1*time.Millisecond)
-    go controller.WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue {
-        queue := newCountingWorkQueue(RunWorkQueue(ctx, baseBackoff, maxBackoff))
+    go controller.WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) queue.WorkQueue[Request] {
+        queue := newCountingWorkQueue(queue.RunWorkQueue[Request](ctx, baseBackoff, maxBackoff))
         queueInitialized <- queue
         return queue
     }).Run(ctx)

@@ -244,9 +245,9 @@ func TestBasicController_RunPanicAssertions(t *testing.T) {
     started := make(chan struct{})
     reconciler := newTestReconciler(false)
     publisher := stream.NewEventPublisher(0)
-    controller := New(publisher, reconciler).WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue {
+    controller := New(publisher, reconciler).WithQueueFactory(func(ctx context.Context, baseBackoff, maxBackoff time.Duration) queue.WorkQueue[Request] {
         close(started)
-        return RunWorkQueue(ctx, baseBackoff, maxBackoff)
+        return queue.RunWorkQueue[Request](ctx, baseBackoff, maxBackoff)
     })
     subscription := &stream.SubscribeRequest{
         Topic: state.EventTopicIngressGateway,

@@ -276,7 +277,7 @@ func TestBasicController_RunPanicAssertions(t *testing.T) {
         controller.WithWorkers(1)
     })
     require.Panics(t, func() {
-        controller.WithQueueFactory(RunWorkQueue)
+        controller.WithQueueFactory(queue.RunWorkQueue[Request])
     })
 }

@@ -1,7 +1,7 @@
 // Copyright (c) HashiCorp, Inc.
 // SPDX-License-Identifier: MPL-2.0
 
-package controller
+package queue
 
 import (
     "container/heap"

@@ -14,24 +14,24 @@ import (
 // DeferQueue is a generic priority queue implementation that
 // allows for deferring and later processing Requests.
-type DeferQueue interface {
+type DeferQueue[T ItemType] interface {
     // Defer defers processing a Request until a given time. When
     // the timeout is hit, the request will be processed by the
     // callback given in the Process loop. If the given context
     // is canceled, the item is not deferred.
-    Defer(ctx context.Context, item Request, until time.Time)
+    Defer(ctx context.Context, item T, until time.Time)
     // Process processes all items in the defer queue with the
     // given callback, blocking until the given context is canceled.
     // Callers should only ever call Process once, likely in a
     // long-lived goroutine.
-    Process(ctx context.Context, callback func(item Request))
+    Process(ctx context.Context, callback func(item T))
 }
 
 // deferredRequest is a wrapped Request with information about
 // when a retry should be attempted
-type deferredRequest struct {
+type deferredRequest[T ItemType] struct {
     enqueueAt time.Time
-    item      Request
+    item      T
     // index holds the index for the given heap entry so that if
     // the entry is updated the heap can be re-sorted
     index int

@@ -39,24 +39,24 @@ type deferredRequest struct {
 // deferQueue is a priority queue for deferring Requests for
 // future processing
-type deferQueue struct {
-    heap       *deferHeap
-    entries    map[Request]*deferredRequest
-    addChannel chan *deferredRequest
+type deferQueue[T ItemType] struct {
+    heap       *deferHeap[T]
+    entries    map[T]*deferredRequest[T]
+    addChannel chan *deferredRequest[T]
 
     heartbeat      *time.Ticker
     nextReadyTimer *time.Timer
 }
 
 // NewDeferQueue returns a priority queue for deferred Requests.
-func NewDeferQueue(tick time.Duration) DeferQueue {
-    dHeap := &deferHeap{}
+func NewDeferQueue[T ItemType](tick time.Duration) DeferQueue[T] {
+    dHeap := &deferHeap[T]{}
     heap.Init(dHeap)
 
-    return &deferQueue{
+    return &deferQueue[T]{
         heap:       dHeap,
-        entries:    make(map[Request]*deferredRequest),
-        addChannel: make(chan *deferredRequest),
+        entries:    make(map[T]*deferredRequest[T]),
+        addChannel: make(chan *deferredRequest[T]),
         heartbeat:  time.NewTicker(tick),
     }
 }

@@ -64,8 +64,8 @@ func NewDeferQueue(tick time.Duration) DeferQueue {
 // Defer defers the given Request until the given time in the future. If the
 // passed in context is canceled before the Request is deferred, then this
 // immediately returns.
-func (q *deferQueue) Defer(ctx context.Context, item Request, until time.Time) {
-    entry := &deferredRequest{
+func (q *deferQueue[T]) Defer(ctx context.Context, item T, until time.Time) {
+    entry := &deferredRequest[T]{
         enqueueAt: until,
         item:      item,
     }

@@ -77,7 +77,7 @@ func (q *deferQueue) Defer(ctx context.Context, item Request, until time.Time) {
 }
 
 // deferEntry adds a deferred request to the priority queue
-func (q *deferQueue) deferEntry(entry *deferredRequest) {
+func (q *deferQueue[T]) deferEntry(entry *deferredRequest[T]) {
     existing, exists := q.entries[entry.item]
     if exists {
         // insert or update the item deferral time

@@ -95,26 +95,26 @@ func (q *deferQueue) deferEntry(entry *deferredRequest) {
 // readyRequest returns a pointer to the next ready Request or
 // nil if no Requests are ready to be processed
-func (q *deferQueue) readyRequest() *Request {
+func (q *deferQueue[T]) readyRequest() *T {
     if q.heap.Len() == 0 {
         return nil
     }
 
     now := time.Now()
-    entry := q.heap.Peek().(*deferredRequest)
+    entry := q.heap.Peek().(*deferredRequest[T])
     if entry.enqueueAt.After(now) {
         return nil
     }
 
-    entry = heap.Pop(q.heap).(*deferredRequest)
+    entry = heap.Pop(q.heap).(*deferredRequest[T])
     delete(q.entries, entry.item)
     return &entry.item
 }
 
 // signalReady returns a timer signal to the next Request
 // that will be ready on the queue
-func (q *deferQueue) signalReady() <-chan time.Time {
+func (q *deferQueue[T]) signalReady() <-chan time.Time {
     if q.heap.Len() == 0 {
         return make(<-chan time.Time)
     }

@@ -123,7 +123,7 @@ func (q *deferQueue) signalReady() <-chan time.Time {
         q.nextReadyTimer.Stop()
     }
 
     now := time.Now()
-    entry := q.heap.Peek().(*deferredRequest)
+    entry := q.heap.Peek().(*deferredRequest[T])
     q.nextReadyTimer = time.NewTimer(entry.enqueueAt.Sub(now))
     return q.nextReadyTimer.C
 }

@@ -132,7 +132,7 @@ func (q *deferQueue) signalReady() <-chan time.Time {
 // given callback, blocking until the given context is canceled.
 // Callers should only ever call Process once, likely in a
 // long-lived goroutine.
-func (q *deferQueue) Process(ctx context.Context, callback func(item Request)) {
+func (q *deferQueue[T]) Process(ctx context.Context, callback func(item T)) {
     for {
         ready := q.readyRequest()
         if ready != nil {

@@ -156,7 +156,7 @@ func (q *deferQueue) Process(ctx context.Context, callback func(item Request)) {
         // continue the loop, which process ready items
         case entry := <-q.addChannel:
-            enqueueOrProcess := func(entry *deferredRequest) {
+            enqueueOrProcess := func(entry *deferredRequest[T]) {
                 now := time.Now()
                 if entry.enqueueAt.After(now) {
                     q.deferEntry(entry)

@@ -182,38 +182,38 @@ func (q *deferQueue) Process(ctx context.Context, callback func(item Request)) {
     }
 }
 
-var _ heap.Interface = &deferHeap{}
+var _ heap.Interface = &deferHeap[string]{}
 
 // deferHeap implements heap.Interface
-type deferHeap []*deferredRequest
+type deferHeap[T ItemType] []*deferredRequest[T]
 
 // Len returns the length of the heap.
-func (h deferHeap) Len() int {
+func (h deferHeap[T]) Len() int {
     return len(h)
 }
 
 // Less compares heap items for purposes of sorting.
-func (h deferHeap) Less(i, j int) bool {
+func (h deferHeap[T]) Less(i, j int) bool {
     return h[i].enqueueAt.Before(h[j].enqueueAt)
 }
 
 // Swap swaps two entries in the heap.
-func (h deferHeap) Swap(i, j int) {
+func (h deferHeap[T]) Swap(i, j int) {
     h[i], h[j] = h[j], h[i]
     h[i].index = i
     h[j].index = j
 }
 
 // Push pushes an entry onto the heap.
-func (h *deferHeap) Push(x interface{}) {
+func (h *deferHeap[T]) Push(x interface{}) {
     n := len(*h)
-    item := x.(*deferredRequest)
+    item := x.(*deferredRequest[T])
     item.index = n
     *h = append(*h, item)
 }
 
 // Pop pops an entry off the heap.
-func (h *deferHeap) Pop() interface{} {
+func (h *deferHeap[T]) Pop() interface{} {
     n := len(*h)
     item := (*h)[n-1]
     item.index = -1

@@ -222,6 +222,6 @@ func (h *deferHeap) Pop() interface{} {
 }
 
 // Peek returns the next item on the heap.
-func (h deferHeap) Peek() interface{} {
+func (h deferHeap[T]) Peek() interface{} {
     return h[0]
 }
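
As a usage illustration (not part of the commit), the now-generic DeferQueue holds items until their deadline and then hands them to the Process callback:

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/hashicorp/consul/agent/consul/controller/queue"
    )

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        // The tick controls how often the queue re-checks readiness.
        dq := queue.NewDeferQueue[string](50 * time.Millisecond)
        go dq.Process(ctx, func(item string) {
            fmt.Println("ready:", item)
        })

        // The item reaches the Process callback after roughly 100ms.
        dq.Defer(ctx, "hello", time.Now().Add(100*time.Millisecond))
        <-ctx.Done()
    }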

@@ -1,7 +1,7 @@
 // Copyright (c) HashiCorp, Inc.
 // SPDX-License-Identifier: MPL-2.0
 
-package controller
+package queue
 
 import (
     "context"

@@ -12,43 +12,46 @@ import (
 // much of this is a re-implementation of
 // https://github.com/kubernetes/client-go/blob/release-1.25/util/workqueue/queue.go
 
+// ItemType is the type constraint for items in the WorkQueue.
+type ItemType comparable
+
 // WorkQueue is an interface for a work queue with semantics to help with
 // retries and rate limiting.
-type WorkQueue interface {
+type WorkQueue[T ItemType] interface {
     // Get retrieves the next Request in the queue, blocking until a Request is
     // available, if shutdown is true, then the queue is shutting down and should
     // no longer be used by the caller.
-    Get() (item Request, shutdown bool)
+    Get() (item T, shutdown bool)
     // Add immediately adds a Request to the work queue.
-    Add(item Request)
+    Add(item T)
     // AddAfter adds a Request to the work queue after a given amount of time.
-    AddAfter(item Request, duration time.Duration)
+    AddAfter(item T, duration time.Duration)
     // AddRateLimited adds a Request to the work queue after the amount of time
     // specified by applying the queue's rate limiter.
-    AddRateLimited(item Request)
+    AddRateLimited(item T)
     // Forget signals the queue to reset the rate-limiting for the given Request.
-    Forget(item Request)
+    Forget(item T)
     // Done tells the work queue that the Request has been successfully processed
     // and can be deleted from the queue.
-    Done(item Request)
+    Done(item T)
 }
 
 // queue implements a rate-limited work queue
-type queue struct {
+type queue[T ItemType] struct {
     // queue holds an ordered list of Requests needing to be processed
-    queue []Request
+    queue []T
     // dirty holds the working set of all Requests, whether they are being
     // processed or not
-    dirty map[Request]struct{}
+    dirty map[T]struct{}
     // processing holds the set of current requests being processed
-    processing map[Request]struct{}
+    processing map[T]struct{}
     // deferred is an internal priority queue that tracks deferred
     // Requests
-    deferred DeferQueue
+    deferred DeferQueue[T]
     // ratelimiter is the internal rate-limiter for the queue
-    ratelimiter Limiter
+    ratelimiter Limiter[T]
     // cond synchronizes queue access and handles signalling for when
     // data is available in the queue

@@ -58,15 +61,15 @@ type queue struct {
     ctx context.Context
 }
 
-// RunWorkQueue returns a started WorkQueue that has per-Request exponential backoff rate-limiting.
+// RunWorkQueue returns a started WorkQueue that has per-item exponential backoff rate-limiting.
 // When the passed in context is canceled, the queue shuts down.
-func RunWorkQueue(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue {
-    q := &queue{
-        ratelimiter: NewRateLimiter(baseBackoff, maxBackoff),
-        dirty:       make(map[Request]struct{}),
-        processing:  make(map[Request]struct{}),
+func RunWorkQueue[T ItemType](ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue[T] {
+    q := &queue[T]{
+        ratelimiter: NewRateLimiter[T](baseBackoff, maxBackoff),
+        dirty:       make(map[T]struct{}),
+        processing:  make(map[T]struct{}),
         cond:        sync.NewCond(&sync.Mutex{}),
-        deferred:    NewDeferQueue(500 * time.Millisecond),
+        deferred:    NewDeferQueue[T](500 * time.Millisecond),
         ctx:         ctx,
     }
     go q.start()

@@ -75,8 +78,8 @@ func RunWorkQueue(ctx context.Context, baseBackoff, maxBackoff time.Duration) Wo
 }
 
 // start begins the asynchronous processing loop for the deferral queue
-func (q *queue) start() {
-    go q.deferred.Process(q.ctx, func(item Request) {
+func (q *queue[T]) start() {
+    go q.deferred.Process(q.ctx, func(item T) {
         q.Add(item)
     })

@@ -85,7 +88,7 @@ func (q *queue) start() {
 }
 
 // shuttingDown returns whether the queue is in the process of shutting down
-func (q *queue) shuttingDown() bool {
+func (q *queue[T]) shuttingDown() bool {
     select {
     case <-q.ctx.Done():
         return true

@@ -98,7 +101,7 @@ func (q *queue) shuttingDown() bool {
 // an item is available in the queue. If the returned shutdown parameter is true,
 // then the caller should stop using the queue. Any Requests returned by a call
 // to Get must be explicitly marked as processed via the Done method.
-func (q *queue) Get() (item Request, shutdown bool) {
+func (q *queue[T]) Get() (item T, shutdown bool) {
     q.cond.L.Lock()
     defer q.cond.L.Unlock()
     for len(q.queue) == 0 && !q.shuttingDown() {

@@ -106,7 +109,8 @@ func (q *queue) Get() (item Request, shutdown bool) {
     }
     if len(q.queue) == 0 {
         // We must be shutting down.
-        return Request{}, true
+        var zero T
+        return zero, true
     }
 
     item, q.queue = q.queue[0], q.queue[1:]

@@ -119,7 +123,7 @@ func (q *queue) Get() (item Request, shutdown bool) {
 // Add puts the given Request in the queue. If the Request is already in
 // the queue or the queue is stopping, then this is a no-op.
-func (q *queue) Add(item Request) {
+func (q *queue[T]) Add(item T) {
     q.cond.L.Lock()
     defer q.cond.L.Unlock()
     if q.shuttingDown() {

@@ -139,7 +143,7 @@ func (q *queue) Add(item Request) {
 }
 
 // AddAfter adds a Request to the work queue after a given amount of time.
-func (q *queue) AddAfter(item Request, duration time.Duration) {
+func (q *queue[T]) AddAfter(item T, duration time.Duration) {
     // don't add if we're already shutting down
     if q.shuttingDown() {
         return

@@ -156,18 +160,18 @@ func (q *queue) AddAfter(item Request, duration time.Duration) {
 // AddRateLimited adds the given Request to the queue after applying the
 // rate limiter to determine when the Request should next be processed.
-func (q *queue) AddRateLimited(item Request) {
+func (q *queue[T]) AddRateLimited(item T) {
     q.AddAfter(item, q.ratelimiter.NextRetry(item))
 }
 
 // Forget signals the queue to reset the rate-limiting for the given Request.
-func (q *queue) Forget(item Request) {
+func (q *queue[T]) Forget(item T) {
     q.ratelimiter.Forget(item)
 }
 
 // Done removes the item from the queue, if it has been marked dirty
 // again while being processed, it is re-added to the queue.
-func (q *queue) Done(item Request) {
+func (q *queue[T]) Done(item T) {
     q.cond.L.Lock()
     defer q.cond.L.Unlock()
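
The Get/Done contract documented above implies a worker loop of roughly the following shape. This is a sketch, not code from the commit: process is a hypothetical handler and Task a hypothetical comparable item type.

    // Task stands in for any comparable item type (queue.ItemType).
    type Task struct{ ID string }

    // work drains the queue until shutdown, honoring the queue contract:
    // every item returned by Get must eventually be marked Done.
    func work(wq queue.WorkQueue[Task], process func(Task) error) {
        for {
            item, shutdown := wq.Get()
            if shutdown {
                return
            }
            if err := process(item); err != nil {
                // requeue with per-item exponential backoff
                wq.AddRateLimited(item)
            } else {
                // success: reset the item's backoff state
                wq.Forget(item)
            }
            // mark the item as processed; if it was re-added while in
            // flight, the queue re-enqueues it
            wq.Done(item)
        }
    }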

@@ -1,7 +1,7 @@
 // Copyright (c) HashiCorp, Inc.
 // SPDX-License-Identifier: MPL-2.0
 
-package controller
+package queue
 
 import (
     "math"

@@ -14,18 +14,18 @@ import (
 // Limiter is an interface for a rate limiter that can limit
 // the number of retries processed in the work queue.
-type Limiter interface {
+type Limiter[T ItemType] interface {
     // NextRetry returns the remaining time until the queue should
     // reprocess a Request.
-    NextRetry(request Request) time.Duration
+    NextRetry(request T) time.Duration
     // Forget causes the Limiter to reset the backoff for the Request.
-    Forget(request Request)
+    Forget(request T)
 }
 
-var _ Limiter = &ratelimiter{}
+var _ Limiter[string] = &ratelimiter[string]{}
 
-type ratelimiter struct {
-    failures map[Request]int
+type ratelimiter[T ItemType] struct {
+    failures map[T]int
     base     time.Duration
     max      time.Duration
     mutex    sync.RWMutex

@@ -33,9 +33,9 @@ type ratelimiter struct {
 // NewRateLimiter returns a Limiter that does per-item exponential
 // backoff.
-func NewRateLimiter(base, max time.Duration) Limiter {
-    return &ratelimiter{
-        failures: make(map[Request]int),
+func NewRateLimiter[T ItemType](base, max time.Duration) Limiter[T] {
+    return &ratelimiter[T]{
+        failures: make(map[T]int),
         base:     base,
         max:      max,
     }

@@ -43,7 +43,7 @@ func NewRateLimiter(base, max time.Duration) Limiter {
 // NextRetry returns the remaining time until the queue should
 // reprocess a Request.
-func (r *ratelimiter) NextRetry(request Request) time.Duration {
+func (r *ratelimiter[T]) NextRetry(request T) time.Duration {
     r.mutex.RLock()
     defer r.mutex.RUnlock()

@@ -65,7 +65,7 @@ func (r *ratelimiter) NextRetry(request Request) time.Duration {
 }
 
 // Forget causes the Limiter to reset the backoff for the Request.
-func (r *ratelimiter) Forget(request Request) {
+func (r *ratelimiter[T]) Forget(request T) {
     r.mutex.Lock()
     defer r.mutex.Unlock()
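
A short sketch of the limiter's behavior, assuming (as the backoff test below suggests) that each NextRetry call records a failure and the delay grows as base * 2^failures, capped at max:

    package main

    import (
        "fmt"
        "time"

        "github.com/hashicorp/consul/agent/consul/controller/queue"
    )

    func main() {
        limiter := queue.NewRateLimiter[string](1*time.Millisecond, 1*time.Second)
        fmt.Println(limiter.NextRetry("item")) // 1ms
        fmt.Println(limiter.NextRetry("item")) // 2ms
        fmt.Println(limiter.NextRetry("item")) // 4ms

        limiter.Forget("item")                 // resets the failure count
        fmt.Println(limiter.NextRetry("item")) // back to the 1ms base
    }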

@@ -1,7 +1,7 @@
 // Copyright (c) HashiCorp, Inc.
 // SPDX-License-Identifier: MPL-2.0
 
-package controller
+package queue
 
 import (
     "testing"

@@ -10,10 +10,12 @@ import (
     "github.com/stretchr/testify/require"
 )
 
+type Request struct{ Kind string }
+
 func TestRateLimiter_Backoff(t *testing.T) {
     t.Parallel()
 
-    limiter := NewRateLimiter(1*time.Millisecond, 1*time.Second)
+    limiter := NewRateLimiter[Request](1*time.Millisecond, 1*time.Second)
     request := Request{Kind: "one"}
     require.Equal(t, 1*time.Millisecond, limiter.NextRetry(request))

@@ -33,7 +35,7 @@ func TestRateLimiter_Backoff(t *testing.T) {
 func TestRateLimiter_Overflow(t *testing.T) {
     t.Parallel()
 
-    limiter := NewRateLimiter(1*time.Millisecond, 1000*time.Second)
+    limiter := NewRateLimiter[Request](1*time.Millisecond, 1000*time.Second)
     request := Request{Kind: "one"}
 
     for i := 0; i < 5; i++ {

@@ -49,7 +51,7 @@ func TestRateLimiter_Overflow(t *testing.T) {
     // make sure we're capped at the passed in max backoff
     require.Equal(t, 1000*time.Second, limiter.NextRetry(overflow))
 
-    limiter = NewRateLimiter(1*time.Minute, 1000*time.Hour)
+    limiter = NewRateLimiter[Request](1*time.Minute, 1000*time.Hour)
 
     for i := 0; i < 2; i++ {
         limiter.NextRetry(request)

@@ -6,11 +6,13 @@ package controller
 import (
     "sync/atomic"
     "time"
+
+    "github.com/hashicorp/consul/agent/consul/controller/queue"
 )
 
-var _ WorkQueue = &countingWorkQueue{}
+var _ queue.WorkQueue[string] = &countingWorkQueue[string]{}
 
-type countingWorkQueue struct {
+type countingWorkQueue[T queue.ItemType] struct {
     getCounter            uint64
     addCounter            uint64
     addAfterCounter       uint64

@@ -18,16 +20,16 @@ type countingWorkQueue struct {
     forgetCounter         uint64
     doneCounter           uint64
 
-    inner WorkQueue
+    inner queue.WorkQueue[T]
 }
 
-func newCountingWorkQueue(inner WorkQueue) *countingWorkQueue {
-    return &countingWorkQueue{
+func newCountingWorkQueue[T queue.ItemType](inner queue.WorkQueue[T]) *countingWorkQueue[T] {
+    return &countingWorkQueue[T]{
         inner: inner,
     }
 }
 
-func (c *countingWorkQueue) reset() {
+func (c *countingWorkQueue[T]) reset() {
     atomic.StoreUint64(&c.getCounter, 0)
     atomic.StoreUint64(&c.addCounter, 0)
     atomic.StoreUint64(&c.addAfterCounter, 0)

@@ -36,61 +38,61 @@ func (c *countingWorkQueue) reset() {
     atomic.StoreUint64(&c.doneCounter, 0)
 }
 
-func (c *countingWorkQueue) requeues() uint64 {
+func (c *countingWorkQueue[T]) requeues() uint64 {
     return c.addAfters() + c.addRateLimiteds()
 }
 
-func (c *countingWorkQueue) Get() (item Request, shutdown bool) {
+func (c *countingWorkQueue[T]) Get() (item T, shutdown bool) {
     item, err := c.inner.Get()
     atomic.AddUint64(&c.getCounter, 1)
     return item, err
 }
 
-func (c *countingWorkQueue) gets() uint64 {
+func (c *countingWorkQueue[T]) gets() uint64 {
     return atomic.LoadUint64(&c.getCounter)
 }
 
-func (c *countingWorkQueue) Add(item Request) {
+func (c *countingWorkQueue[T]) Add(item T) {
     c.inner.Add(item)
     atomic.AddUint64(&c.addCounter, 1)
 }
 
-func (c *countingWorkQueue) adds() uint64 {
+func (c *countingWorkQueue[T]) adds() uint64 {
     return atomic.LoadUint64(&c.addCounter)
 }
 
-func (c *countingWorkQueue) AddAfter(item Request, duration time.Duration) {
+func (c *countingWorkQueue[T]) AddAfter(item T, duration time.Duration) {
     c.inner.AddAfter(item, duration)
     atomic.AddUint64(&c.addAfterCounter, 1)
 }
 
-func (c *countingWorkQueue) addAfters() uint64 {
+func (c *countingWorkQueue[T]) addAfters() uint64 {
     return atomic.LoadUint64(&c.addAfterCounter)
 }
 
-func (c *countingWorkQueue) AddRateLimited(item Request) {
+func (c *countingWorkQueue[T]) AddRateLimited(item T) {
     c.inner.AddRateLimited(item)
     atomic.AddUint64(&c.addRateLimitedCounter, 1)
 }
 
-func (c *countingWorkQueue) addRateLimiteds() uint64 {
+func (c *countingWorkQueue[T]) addRateLimiteds() uint64 {
     return atomic.LoadUint64(&c.addRateLimitedCounter)
 }
 
-func (c *countingWorkQueue) Forget(item Request) {
+func (c *countingWorkQueue[T]) Forget(item T) {
     c.inner.Forget(item)
     atomic.AddUint64(&c.forgetCounter, 1)
 }
 
-func (c *countingWorkQueue) forgets() uint64 {
+func (c *countingWorkQueue[T]) forgets() uint64 {
     return atomic.LoadUint64(&c.forgetCounter)
 }
 
-func (c *countingWorkQueue) Done(item Request) {
+func (c *countingWorkQueue[T]) Done(item T) {
     c.inner.Done(item)
     atomic.AddUint64(&c.doneCounter, 1)
 }
 
-func (c *countingWorkQueue) dones() uint64 {
+func (c *countingWorkQueue[T]) dones() uint64 {
     return atomic.LoadUint64(&c.doneCounter)
 }

@@ -17,6 +17,7 @@ import (
     "github.com/hashicorp/consul/acl"
     "github.com/hashicorp/consul/agent/consul/controller"
+    "github.com/hashicorp/consul/agent/consul/controller/queue"
     "github.com/hashicorp/consul/agent/consul/fsm"
     "github.com/hashicorp/consul/agent/consul/state"
     "github.com/hashicorp/consul/agent/consul/stream"

@@ -4096,7 +4097,7 @@ func (n *noopController) Subscribe(request *stream.SubscribeRequest, transformer
 func (n *noopController) WithBackoff(base, max time.Duration) controller.Controller { return n }
 func (n *noopController) WithLogger(logger hclog.Logger) controller.Controller      { return n }
 func (n *noopController) WithWorkers(i int) controller.Controller                   { return n }
-func (n *noopController) WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) controller.WorkQueue) controller.Controller {
+func (n *noopController) WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) queue.WorkQueue[controller.Request]) controller.Controller {
     return n
 }