diff --git a/.changelog/18439.txt b/.changelog/18439.txt new file mode 100644 index 0000000000..dd12738d5c --- /dev/null +++ b/.changelog/18439.txt @@ -0,0 +1,3 @@ +```release-note:feature +Support custom watches on the Consul Controller framework. +``` diff --git a/internal/controller/api.go b/internal/controller/api.go index 17dfa8ef0a..5c2fc2e782 100644 --- a/internal/controller/api.go +++ b/internal/controller/api.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/go-hclog" + "github.com/hashicorp/consul/agent/consul/controller/queue" "github.com/hashicorp/consul/internal/resource" "github.com/hashicorp/consul/proto-public/pbresource" ) @@ -46,6 +47,21 @@ func (c Controller) WithWatch(watchedType *pbresource.Type, mapper DependencyMap return c } +// WithCustomWatch adds a custom watch on the given dependency to the controller. Custom mapper +// will be called to map events produced by source to the controller's watched type. +func (c Controller) WithCustomWatch(source *Source, mapper CustomDependencyMapper) Controller { + if source == nil { + panic("source must not be nil") + } + + if mapper == nil { + panic("mapper must not be nil") + } + + c.customWatches = append(c.customWatches, customWatch{source, mapper}) + return c +} + // WithLogger changes the controller's logger. func (c Controller) WithLogger(logger hclog.Logger) Controller { if logger == nil { @@ -107,13 +123,14 @@ func (c Controller) backoff() (time.Duration, time.Duration) { // Use the builder methods in this package (starting with ForType) to construct // a controller, and then pass it to a Manager to be executed. type Controller struct { - managedType *pbresource.Type - reconciler Reconciler - logger hclog.Logger - watches []watch - baseBackoff time.Duration - maxBackoff time.Duration - placement Placement + managedType *pbresource.Type + reconciler Reconciler + logger hclog.Logger + watches []watch + customWatches []customWatch + baseBackoff time.Duration + maxBackoff time.Duration + placement Placement } type watch struct { @@ -121,6 +138,45 @@ type watch struct { mapper DependencyMapper } +// Watch is responsible for watching for custom events from source and adding them to +// the event queue. +func (s *Source) Watch(ctx context.Context, add func(e Event)) error { + for { + select { + case <-ctx.Done(): + return nil + case evt, ok := <-s.Source: + if !ok { + return nil + } + add(evt) + } + } +} + +// Source is used as a generic source of events. This can be used when events aren't coming from resources +// stored by the resource API. +type Source struct { + Source <-chan Event +} + +// Event captures an event in the system which the API can choose to respond to. +type Event struct { + Obj queue.ItemType +} + +// Key returns a string that will be used to de-duplicate items in the queue. +func (e Event) Key() string { + return e.Obj.Key() +} + +// customWatch represent a Watch on a custom Event source and a Mapper to map said +// Events into Requests that the controller can respond to. +type customWatch struct { + source *Source + mapper CustomDependencyMapper +} + // Request represents a request to reconcile the resource with the given ID. type Request struct { // ID of the resource that needs to be reconciled. diff --git a/internal/controller/api_test.go b/internal/controller/api_test.go index 215063d87c..40d3ec99be 100644 --- a/internal/controller/api_test.go +++ b/internal/controller/api_test.go @@ -25,9 +25,20 @@ func TestController_API(t *testing.T) { rec := newTestReconciler() client := svctest.RunResourceService(t, demo.RegisterTypes) + concertsChan := make(chan controller.Event) + defer close(concertsChan) + concertSource := &controller.Source{Source: concertsChan} + concertMapper := func(ctx context.Context, rt controller.Runtime, event controller.Event) ([]controller.Request, error) { + artistID := event.Obj.(*Concert).artistID + var requests []controller.Request + requests = append(requests, controller.Request{ID: artistID}) + return requests, nil + } + ctrl := controller. ForType(demo.TypeV2Artist). WithWatch(demo.TypeV2Album, controller.MapOwner). + WithCustomWatch(concertSource, concertMapper). WithBackoff(10*time.Millisecond, 100*time.Millisecond). WithReconciler(rec) @@ -69,6 +80,32 @@ func TestController_API(t *testing.T) { prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID) }) + t.Run("custom watched resource type", func(t *testing.T) { + res, err := demo.GenerateV2Artist() + require.NoError(t, err) + + rsp, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res}) + require.NoError(t, err) + + req := rec.wait(t) + prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID) + + rec.expectNoRequest(t, 500*time.Millisecond) + + concertsChan <- controller.Event{Obj: &Concert{name: "test-concert", artistID: rsp.Resource.Id}} + + watchedReq := rec.wait(t) + prototest.AssertDeepEqual(t, req.ID, watchedReq.ID) + + otherArtist, err := demo.GenerateV2Artist() + require.NoError(t, err) + + concertsChan <- controller.Event{Obj: &Concert{name: "test-concert", artistID: otherArtist.Id}} + + watchedReq = rec.wait(t) + prototest.AssertDeepEqual(t, otherArtist.Id, watchedReq.ID) + }) + t.Run("error retries", func(t *testing.T) { rec.failNext(errors.New("KABOOM")) @@ -266,3 +303,12 @@ func testContext(t *testing.T) context.Context { return ctx } + +type Concert struct { + name string + artistID *pbresource.ID +} + +func (c Concert) Key() string { + return c.name +} diff --git a/internal/controller/controller.go b/internal/controller/controller.go index f6a064853c..ac901d355b 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -40,20 +40,39 @@ func (c *controllerRunner) run(ctx context.Context) error { }) }) - for _, watch := range c.ctrl.watches { - watch := watch + for _, w := range c.ctrl.watches { mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl) - + watcher := w // Watched Type Events → Mapper Queue group.Go(func() error { - return c.watch(groupCtx, watch.watchedType, func(res *pbresource.Resource) { + return c.watch(groupCtx, watcher.watchedType, func(res *pbresource.Resource) { mapQueue.Add(mapperRequest{res: res}) }) }) // Mapper Queue → Mapper → Reconciliation Queue group.Go(func() error { - return c.runMapper(groupCtx, watch, mapQueue, recQueue) + return c.runMapper(groupCtx, watcher, mapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) { + return watcher.mapper(ctx, runtime, itemType.(mapperRequest).res) + }) + }) + } + + for _, cw := range c.ctrl.customWatches { + customMapQueue := runQueue[Event](groupCtx, c.ctrl) + watcher := cw + // Custom Events → Mapper Queue + group.Go(func() error { + return watcher.source.Watch(groupCtx, func(e Event) { + customMapQueue.Add(e) + }) + }) + + // Mapper Queue → Mapper → Reconciliation Queue + group.Go(func() error { + return c.runCustomMapper(groupCtx, watcher, customMapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) { + return watcher.mapper(ctx, runtime, itemType.(Event)) + }) }) } @@ -71,7 +90,7 @@ func runQueue[T queue.ItemType](ctx context.Context, ctrl Controller) queue.Work } func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add func(*pbresource.Resource)) error { - watch, err := c.client.WatchList(ctx, &pbresource.WatchListRequest{ + wl, err := c.client.WatchList(ctx, &pbresource.WatchListRequest{ Type: typ, Tenancy: &pbresource.Tenancy{ Partition: storage.Wildcard, @@ -85,7 +104,7 @@ func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add } for { - event, err := watch.Recv() + event, err := wl.Recv() if err != nil { c.logger.Warn("error received from watch", "error", err) return err @@ -99,6 +118,7 @@ func (c *controllerRunner) runMapper( w watch, from queue.WorkQueue[mapperRequest], to queue.WorkQueue[Request], + mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), ) error { logger := c.logger.With("watched_resource_type", resource.ToGVK(w.watchedType)) @@ -108,27 +128,36 @@ func (c *controllerRunner) runMapper( return nil } - var reqs []Request - err := c.handlePanic(func() error { - var err error - reqs, err = w.mapper(ctx, c.runtime(), item.res) - return err - }) - if err != nil { + if err := c.doMap(ctx, mapper, to, item, logger); err != nil { from.AddRateLimited(item) from.Done(item) continue } - for _, r := range reqs { - if !resource.EqualType(r.ID.Type, c.ctrl.managedType) { - logger.Error("dependency mapper returned request for a resource of the wrong type", - "type_expected", resource.ToGVK(c.ctrl.managedType), - "type_got", resource.ToGVK(r.ID.Type), - ) - continue - } - to.Add(r) + from.Forget(item) + from.Done(item) + } +} + +func (c *controllerRunner) runCustomMapper( + ctx context.Context, + cw customWatch, + from queue.WorkQueue[Event], + to queue.WorkQueue[Request], + mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), +) error { + logger := c.logger.With("watched_event", cw.source) + + for { + item, shutdown := from.Get() + if shutdown { + return nil + } + + if err := c.doMap(ctx, mapper, to, item, logger); err != nil { + from.AddRateLimited(item) + from.Done(item) + continue } from.Forget(item) @@ -136,6 +165,29 @@ func (c *controllerRunner) runMapper( } } +func (c *controllerRunner) doMap(ctx context.Context, mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), to queue.WorkQueue[Request], item queue.ItemType, logger hclog.Logger) error { + var reqs []Request + if err := c.handlePanic(func() error { + var err error + reqs, err = mapper(ctx, c.runtime(), item) + return err + }); err != nil { + return err + } + + for _, r := range reqs { + if !resource.EqualType(r.ID.Type, c.ctrl.managedType) { + logger.Error("dependency mapper returned request for a resource of the wrong type", + "type_expected", resource.ToGVK(c.ctrl.managedType), + "type_got", resource.ToGVK(r.ID.Type), + ) + continue + } + to.Add(r) + } + return nil +} + func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error { for { req, shutdown := queue.Get() diff --git a/internal/controller/dependency_mappers.go b/internal/controller/dependency_mappers.go index e66e4b50e5..1ac331ffaf 100644 --- a/internal/controller/dependency_mappers.go +++ b/internal/controller/dependency_mappers.go @@ -18,6 +18,14 @@ type DependencyMapper func( res *pbresource.Resource, ) ([]Request, error) +// CustomDependencyMapper is called when an Event occurs to determine which of the +// controller's managed resources need to be reconciled. +type CustomDependencyMapper func( + ctx context.Context, + rt Runtime, + event Event, +) ([]Request, error) + // MapOwner implements a DependencyMapper that returns the updated resource's owner. func MapOwner(_ context.Context, _ Runtime, res *pbresource.Resource) ([]Request, error) { var reqs []Request