Support custom watches on controller (#18439)

* Support custom watches on controller
* refactor mapper methods
This commit is contained in:
Ashwin Venkatesh 2023-08-17 16:34:18 -04:00 committed by GitHub
parent 92cfb4a07e
commit 97b41d946f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 195 additions and 30 deletions

3
.changelog/18439.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:feature
Support custom watches on the Consul Controller framework.
```

View File

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/consul/controller/queue"
"github.com/hashicorp/consul/internal/resource" "github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/proto-public/pbresource"
) )
@ -46,6 +47,21 @@ func (c Controller) WithWatch(watchedType *pbresource.Type, mapper DependencyMap
return c return c
} }
// WithCustomWatch adds a custom watch on the given dependency to the controller. Custom mapper
// will be called to map events produced by source to the controller's watched type.
func (c Controller) WithCustomWatch(source *Source, mapper CustomDependencyMapper) Controller {
if source == nil {
panic("source must not be nil")
}
if mapper == nil {
panic("mapper must not be nil")
}
c.customWatches = append(c.customWatches, customWatch{source, mapper})
return c
}
// WithLogger changes the controller's logger. // WithLogger changes the controller's logger.
func (c Controller) WithLogger(logger hclog.Logger) Controller { func (c Controller) WithLogger(logger hclog.Logger) Controller {
if logger == nil { if logger == nil {
@ -111,6 +127,7 @@ type Controller struct {
reconciler Reconciler reconciler Reconciler
logger hclog.Logger logger hclog.Logger
watches []watch watches []watch
customWatches []customWatch
baseBackoff time.Duration baseBackoff time.Duration
maxBackoff time.Duration maxBackoff time.Duration
placement Placement placement Placement
@ -121,6 +138,45 @@ type watch struct {
mapper DependencyMapper mapper DependencyMapper
} }
// Watch is responsible for watching for custom events from source and adding them to
// the event queue.
func (s *Source) Watch(ctx context.Context, add func(e Event)) error {
for {
select {
case <-ctx.Done():
return nil
case evt, ok := <-s.Source:
if !ok {
return nil
}
add(evt)
}
}
}
// Source is used as a generic source of events. This can be used when events aren't coming from resources
// stored by the resource API.
type Source struct {
Source <-chan Event
}
// Event captures an event in the system which the API can choose to respond to.
type Event struct {
Obj queue.ItemType
}
// Key returns a string that will be used to de-duplicate items in the queue.
func (e Event) Key() string {
return e.Obj.Key()
}
// customWatch represent a Watch on a custom Event source and a Mapper to map said
// Events into Requests that the controller can respond to.
type customWatch struct {
source *Source
mapper CustomDependencyMapper
}
// Request represents a request to reconcile the resource with the given ID. // Request represents a request to reconcile the resource with the given ID.
type Request struct { type Request struct {
// ID of the resource that needs to be reconciled. // ID of the resource that needs to be reconciled.

View File

@ -25,9 +25,20 @@ func TestController_API(t *testing.T) {
rec := newTestReconciler() rec := newTestReconciler()
client := svctest.RunResourceService(t, demo.RegisterTypes) client := svctest.RunResourceService(t, demo.RegisterTypes)
concertsChan := make(chan controller.Event)
defer close(concertsChan)
concertSource := &controller.Source{Source: concertsChan}
concertMapper := func(ctx context.Context, rt controller.Runtime, event controller.Event) ([]controller.Request, error) {
artistID := event.Obj.(*Concert).artistID
var requests []controller.Request
requests = append(requests, controller.Request{ID: artistID})
return requests, nil
}
ctrl := controller. ctrl := controller.
ForType(demo.TypeV2Artist). ForType(demo.TypeV2Artist).
WithWatch(demo.TypeV2Album, controller.MapOwner). WithWatch(demo.TypeV2Album, controller.MapOwner).
WithCustomWatch(concertSource, concertMapper).
WithBackoff(10*time.Millisecond, 100*time.Millisecond). WithBackoff(10*time.Millisecond, 100*time.Millisecond).
WithReconciler(rec) WithReconciler(rec)
@ -69,6 +80,32 @@ func TestController_API(t *testing.T) {
prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID) prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID)
}) })
t.Run("custom watched resource type", func(t *testing.T) {
res, err := demo.GenerateV2Artist()
require.NoError(t, err)
rsp, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res})
require.NoError(t, err)
req := rec.wait(t)
prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID)
rec.expectNoRequest(t, 500*time.Millisecond)
concertsChan <- controller.Event{Obj: &Concert{name: "test-concert", artistID: rsp.Resource.Id}}
watchedReq := rec.wait(t)
prototest.AssertDeepEqual(t, req.ID, watchedReq.ID)
otherArtist, err := demo.GenerateV2Artist()
require.NoError(t, err)
concertsChan <- controller.Event{Obj: &Concert{name: "test-concert", artistID: otherArtist.Id}}
watchedReq = rec.wait(t)
prototest.AssertDeepEqual(t, otherArtist.Id, watchedReq.ID)
})
t.Run("error retries", func(t *testing.T) { t.Run("error retries", func(t *testing.T) {
rec.failNext(errors.New("KABOOM")) rec.failNext(errors.New("KABOOM"))
@ -266,3 +303,12 @@ func testContext(t *testing.T) context.Context {
return ctx return ctx
} }
type Concert struct {
name string
artistID *pbresource.ID
}
func (c Concert) Key() string {
return c.name
}

View File

@ -40,20 +40,39 @@ func (c *controllerRunner) run(ctx context.Context) error {
}) })
}) })
for _, watch := range c.ctrl.watches { for _, w := range c.ctrl.watches {
watch := watch
mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl) mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl)
watcher := w
// Watched Type Events → Mapper Queue // Watched Type Events → Mapper Queue
group.Go(func() error { group.Go(func() error {
return c.watch(groupCtx, watch.watchedType, func(res *pbresource.Resource) { return c.watch(groupCtx, watcher.watchedType, func(res *pbresource.Resource) {
mapQueue.Add(mapperRequest{res: res}) mapQueue.Add(mapperRequest{res: res})
}) })
}) })
// Mapper Queue → Mapper → Reconciliation Queue // Mapper Queue → Mapper → Reconciliation Queue
group.Go(func() error { group.Go(func() error {
return c.runMapper(groupCtx, watch, mapQueue, recQueue) return c.runMapper(groupCtx, watcher, mapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
return watcher.mapper(ctx, runtime, itemType.(mapperRequest).res)
})
})
}
for _, cw := range c.ctrl.customWatches {
customMapQueue := runQueue[Event](groupCtx, c.ctrl)
watcher := cw
// Custom Events → Mapper Queue
group.Go(func() error {
return watcher.source.Watch(groupCtx, func(e Event) {
customMapQueue.Add(e)
})
})
// Mapper Queue → Mapper → Reconciliation Queue
group.Go(func() error {
return c.runCustomMapper(groupCtx, watcher, customMapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
return watcher.mapper(ctx, runtime, itemType.(Event))
})
}) })
} }
@ -71,7 +90,7 @@ func runQueue[T queue.ItemType](ctx context.Context, ctrl Controller) queue.Work
} }
func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add func(*pbresource.Resource)) error { func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add func(*pbresource.Resource)) error {
watch, err := c.client.WatchList(ctx, &pbresource.WatchListRequest{ wl, err := c.client.WatchList(ctx, &pbresource.WatchListRequest{
Type: typ, Type: typ,
Tenancy: &pbresource.Tenancy{ Tenancy: &pbresource.Tenancy{
Partition: storage.Wildcard, Partition: storage.Wildcard,
@ -85,7 +104,7 @@ func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add
} }
for { for {
event, err := watch.Recv() event, err := wl.Recv()
if err != nil { if err != nil {
c.logger.Warn("error received from watch", "error", err) c.logger.Warn("error received from watch", "error", err)
return err return err
@ -99,6 +118,7 @@ func (c *controllerRunner) runMapper(
w watch, w watch,
from queue.WorkQueue[mapperRequest], from queue.WorkQueue[mapperRequest],
to queue.WorkQueue[Request], to queue.WorkQueue[Request],
mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error),
) error { ) error {
logger := c.logger.With("watched_resource_type", resource.ToGVK(w.watchedType)) logger := c.logger.With("watched_resource_type", resource.ToGVK(w.watchedType))
@ -108,18 +128,53 @@ func (c *controllerRunner) runMapper(
return nil return nil
} }
var reqs []Request if err := c.doMap(ctx, mapper, to, item, logger); err != nil {
err := c.handlePanic(func() error {
var err error
reqs, err = w.mapper(ctx, c.runtime(), item.res)
return err
})
if err != nil {
from.AddRateLimited(item) from.AddRateLimited(item)
from.Done(item) from.Done(item)
continue continue
} }
from.Forget(item)
from.Done(item)
}
}
func (c *controllerRunner) runCustomMapper(
ctx context.Context,
cw customWatch,
from queue.WorkQueue[Event],
to queue.WorkQueue[Request],
mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error),
) error {
logger := c.logger.With("watched_event", cw.source)
for {
item, shutdown := from.Get()
if shutdown {
return nil
}
if err := c.doMap(ctx, mapper, to, item, logger); err != nil {
from.AddRateLimited(item)
from.Done(item)
continue
}
from.Forget(item)
from.Done(item)
}
}
func (c *controllerRunner) doMap(ctx context.Context, mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), to queue.WorkQueue[Request], item queue.ItemType, logger hclog.Logger) error {
var reqs []Request
if err := c.handlePanic(func() error {
var err error
reqs, err = mapper(ctx, c.runtime(), item)
return err
}); err != nil {
return err
}
for _, r := range reqs { for _, r := range reqs {
if !resource.EqualType(r.ID.Type, c.ctrl.managedType) { if !resource.EqualType(r.ID.Type, c.ctrl.managedType) {
logger.Error("dependency mapper returned request for a resource of the wrong type", logger.Error("dependency mapper returned request for a resource of the wrong type",
@ -130,10 +185,7 @@ func (c *controllerRunner) runMapper(
} }
to.Add(r) to.Add(r)
} }
return nil
from.Forget(item)
from.Done(item)
}
} }
func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error { func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error {

View File

@ -18,6 +18,14 @@ type DependencyMapper func(
res *pbresource.Resource, res *pbresource.Resource,
) ([]Request, error) ) ([]Request, error)
// CustomDependencyMapper is called when an Event occurs to determine which of the
// controller's managed resources need to be reconciled.
type CustomDependencyMapper func(
ctx context.Context,
rt Runtime,
event Event,
) ([]Request, error)
// MapOwner implements a DependencyMapper that returns the updated resource's owner. // MapOwner implements a DependencyMapper that returns the updated resource's owner.
func MapOwner(_ context.Context, _ Runtime, res *pbresource.Resource) ([]Request, error) { func MapOwner(_ context.Context, _ Runtime, res *pbresource.Resource) ([]Request, error) {
var reqs []Request var reqs []Request