Matt Keeler 326c0ecfbe
In-Memory gRPC (#19942)
* Implement In-Process gRPC for use by controller caching/indexing

This replaces the pipe-based listener implementation we were previously using. The new style CAN avoid cloning resources, which our controller caching/indexing takes advantage of to avoid duplicating resource objects in memory.

To maintain safety for controllers, and so that they can modify the data they get back from the cache and the resource service, the client presented in their runtime is wrapped with an autogenerated client that clones request and response messages as they pass through it.
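
This wrapping is roughly equivalent to the following hand-written sketch for a single unary method (the cloningClient type here is a hypothetical illustration; the real client is autogenerated to cover the whole ResourceServiceClient interface):

package sketch

import (
	"context"

	"google.golang.org/grpc"
	"google.golang.org/protobuf/proto"

	"github.com/hashicorp/consul/proto-public/pbresource"
)

// cloningClient illustrates the idea behind the autogenerated wrapper: clone
// requests before they reach the in-memory transport and clone responses
// before returning them, so callers can freely mutate either side.
type cloningClient struct {
	inner pbresource.ResourceServiceClient
}

func (c *cloningClient) Read(ctx context.Context, in *pbresource.ReadRequest, opts ...grpc.CallOption) (*pbresource.ReadResponse, error) {
	// Clone the request so the caller's copy is never shared with the server.
	in = proto.Clone(in).(*pbresource.ReadRequest)

	out, err := c.inner.Read(ctx, in, opts...)
	if err != nil {
		return nil, err
	}
	// Clone the response so the caller can modify it without touching
	// state shared with the cache or the resource service.
	return proto.Clone(out).(*pbresource.ReadResponse), nil
}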

Another sizable change in this PR is consolidating how server-specific gRPC services get registered and managed. Previously this was spread across a bunch of different methods and it was difficult to track down how gRPC services were registered. Now it's all in one place.
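
For illustration only, the consolidated pattern looks roughly like the sketch below (these names are hypothetical, not Consul's actual internals): each service is declared once and a single helper registers the whole list against a handler.

package sketch

import "google.golang.org/grpc"

// grpcService pairs a gRPC service descriptor with its implementation so
// every server-specific service can be declared in one list.
type grpcService struct {
	desc *grpc.ServiceDesc
	impl any
}

// registerAll wires every declared service into the given registrar
// (e.g. an internal or multiplexed handler) in one place.
func registerAll(registrar grpc.ServiceRegistrar, services []grpcService) {
	for _, svc := range services {
		registrar.RegisterService(svc.desc, svc.impl)
	}
}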

* Fix race in tests

* Ensure the resource service is registered to the multiplexed handler for forwarding from client agents

* Expose peer streaming on the internal handler
2024-01-12 11:54:07 -05:00

335 lines
10 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package controller

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/hashicorp/go-hclog"
	"golang.org/x/sync/errgroup"

	"github.com/hashicorp/consul/agent/consul/controller/queue"
	"github.com/hashicorp/consul/internal/controller/cache"
	"github.com/hashicorp/consul/internal/protoutil"
	"github.com/hashicorp/consul/internal/resource"
	"github.com/hashicorp/consul/internal/storage"
	"github.com/hashicorp/consul/proto-public/pbresource"
)

// Runtime contains the dependencies required by reconcilers.
type Runtime struct {
	Client pbresource.ResourceServiceClient
	Logger hclog.Logger
	Cache  cache.ReadOnlyCache
}

// controllerRunner contains the actual implementation of running a controller
// including creating watches, calling the reconciler, handling retries, etc.
type controllerRunner struct {
	ctrl *Controller

	// watchClient will be used by the controller infrastructure internally to
	// perform watches and maintain the cache. On servers, this client will use
	// the in-memory gRPC clients which DO NOT cause cloning of data returned by
	// the resource service. This is desirable so that our cache doesn't incur
	// the overhead of duplicating all resources that are watched. Generally
	// dependency mappers and reconcilers should not be given this client so
	// that they are free to modify the data they are returned.
	watchClient pbresource.ResourceServiceClient

	// runtimeClient will be used by dependency mappers and reconcilers to
	// access the resource service. On servers, this client will use the in-memory
	// gRPC client wrapped with the cloning client to force cloning of protobuf
	// messages as they pass through the client. This is desirable so that
	// controllers and their mappers are free to modify the data returned
	// to them without having to think about the fact that the data should
	// be immutable, as it is shared with the controller's cache as well as the
	// resource service itself.
	runtimeClient pbresource.ResourceServiceClient

	logger hclog.Logger
	cache  cache.Cache
}
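
// newControllerRunner constructs a runner for the given controller. The
// provided client is used directly for watches, while dependency mappers and
// reconcilers receive a cloning wrapper around the same client.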
func newControllerRunner(c *Controller, client pbresource.ResourceServiceClient, defaultLogger hclog.Logger) *controllerRunner {
	return &controllerRunner{
		ctrl:          c,
		watchClient:   client,
		runtimeClient: pbresource.NewCloningResourceServiceClient(client),
		logger:        c.buildLogger(defaultLogger),
		// Do not build the cache here. If we build/set it when the controller runs
		// then if a controller is restarted it will invalidate the previous cache automatically.
	}
}
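
// run executes the controller until ctx is canceled. It builds the cache,
// invokes the start/stop callbacks, and runs one goroutine per watch plus
// queue consumers, with all events funneling into the reconciliation queue.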
func (c *controllerRunner) run(ctx context.Context) error {
	c.logger.Debug("controller running")
	defer c.logger.Debug("controller stopping")

	c.cache = c.ctrl.buildCache()
	defer func() {
		// once no longer running we should nil out the cache
		// so that we don't hold pointers to resources which may
		// become out of date in the future.
		c.cache = nil
	}()

	if c.ctrl.startCb != nil {
		c.ctrl.startCb(ctx, c.runtime(c.logger))
	}

	if c.ctrl.stopCb != nil {
		defer c.ctrl.stopCb(ctx, c.runtime(c.logger))
	}

	group, groupCtx := errgroup.WithContext(ctx)
	recQueue := runQueue[Request](groupCtx, c.ctrl)

	// Managed Type Events → Reconciliation Queue
	group.Go(func() error {
		return c.watch(groupCtx, c.ctrl.managedTypeWatch.watchedType, func(res *pbresource.Resource) {
			recQueue.Add(Request{ID: res.Id})
		})
	})

	for _, w := range c.ctrl.watches {
		mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl)
		watcher := w

		// Watched Type Events → Mapper Queue
		group.Go(func() error {
			return c.watch(groupCtx, watcher.watchedType, func(res *pbresource.Resource) {
				mapQueue.Add(mapperRequest{res: res})
			})
		})

		// Mapper Queue → Mapper → Reconciliation Queue
		group.Go(func() error {
			return c.runMapper(groupCtx, watcher, mapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
				return watcher.mapper(ctx, runtime, itemType.(mapperRequest).res)
			})
		})
	}

	for _, cw := range c.ctrl.customWatches {
		customMapQueue := runQueue[Event](groupCtx, c.ctrl)
		watcher := cw

		// Custom Events → Mapper Queue
		group.Go(func() error {
			return watcher.source.Watch(groupCtx, func(e Event) {
				customMapQueue.Add(e)
			})
		})

		// Mapper Queue → Mapper → Reconciliation Queue
		group.Go(func() error {
			return c.runCustomMapper(groupCtx, watcher, customMapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
				return watcher.mapper(ctx, runtime, itemType.(Event))
			})
		})
	}

	// Reconciliation Queue → Reconciler
	group.Go(func() error {
		return c.runReconciler(groupCtx, recQueue)
	})

	return group.Wait()
}
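
// runQueue starts a work queue of the given item type using the controller's
// configured backoff for retries.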
func runQueue[T queue.ItemType](ctx context.Context, ctrl *Controller) queue.WorkQueue[T] {
	base, max := ctrl.backoff()
	return queue.RunWorkQueue[T](ctx, base, max)
}
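
// watch opens a wildcard-tenancy WatchList on the given type, applies each
// event to the cache, and then passes a clone of the resource to add.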
func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add func(*pbresource.Resource)) error {
	wl, err := c.watchClient.WatchList(ctx, &pbresource.WatchListRequest{
		Type: typ,
		Tenancy: &pbresource.Tenancy{
			Partition: storage.Wildcard,
			PeerName:  storage.Wildcard,
			Namespace: storage.Wildcard,
		},
	})
	if err != nil {
		c.logger.Error("failed to create watch", "error", err)
		return err
	}

	for {
		event, err := wl.Recv()
		if err != nil {
			c.logger.Warn("error received from watch", "error", err)
			return err
		}

		// Keep the cache up to date. The main reason to do this here is
		// to ensure that any mapper/reconciliation queue deduping won't
		// hide events from being observed and updating the cache state.
		// Therefore we should do this before any queueing.
		switch event.Operation {
		case pbresource.WatchEvent_OPERATION_UPSERT:
			c.cache.Insert(event.Resource)
		case pbresource.WatchEvent_OPERATION_DELETE:
			c.cache.Delete(event.Resource)
		}

		// Before adding the resource into the queue we must clone it.
		// While we don't want the cache to hold duplicate copies of all the
		// data, we do want downstream consumers like dependency mappers and
		// controllers to be able to freely modify the data they are given.
		// Therefore we clone the resource here to prevent any accidental
		// mutation of data held by the cache (and presumably by the resource
		// service, assuming that the regular client we were given is the
		// in-memory variant).
		add(protoutil.Clone(event.Resource))
	}
}
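
// runMapper consumes watched-type events from the mapper queue, invokes the
// dependency mapper for each, and feeds the resulting requests into the
// reconciliation queue, rate-limiting retries on failure.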
func (c *controllerRunner) runMapper(
	ctx context.Context,
	w *watch,
	from queue.WorkQueue[mapperRequest],
	to queue.WorkQueue[Request],
	mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error),
) error {
	logger := c.logger.With("watched_resource_type", resource.ToGVK(w.watchedType))

	for {
		item, shutdown := from.Get()
		if shutdown {
			return nil
		}

		if err := c.doMap(ctx, mapper, to, item, logger); err != nil {
			from.AddRateLimited(item)
			from.Done(item)
			continue
		}

		from.Forget(item)
		from.Done(item)
	}
}
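
// runCustomMapper is the custom-event equivalent of runMapper: it consumes
// events from the custom watch's queue and maps them into reconcile requests.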
func (c *controllerRunner) runCustomMapper(
	ctx context.Context,
	cw customWatch,
	from queue.WorkQueue[Event],
	to queue.WorkQueue[Request],
	mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error),
) error {
	logger := c.logger.With("watched_event", cw.source)

	for {
		item, shutdown := from.Get()
		if shutdown {
			return nil
		}

		if err := c.doMap(ctx, mapper, to, item, logger); err != nil {
			from.AddRateLimited(item)
			from.Done(item)
			continue
		}

		from.Forget(item)
		from.Done(item)
	}
}
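
// doMap runs the mapper inside a panic handler and enqueues each returned
// request, dropping (and logging) any whose type does not match the
// controller's managed type.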
func (c *controllerRunner) doMap(ctx context.Context, mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), to queue.WorkQueue[Request], item queue.ItemType, logger hclog.Logger) error {
	var reqs []Request
	if err := c.handlePanic(func() error {
		var err error
		reqs, err = mapper(ctx, c.runtime(logger.With("map-request-key", item.Key())), item)
		return err
	}); err != nil {
		return err
	}

	for _, r := range reqs {
		if !resource.EqualType(r.ID.Type, c.ctrl.managedTypeWatch.watchedType) {
			logger.Error("dependency mapper returned request for a resource of the wrong type",
				"type_expected", resource.ToGVK(c.ctrl.managedTypeWatch.watchedType),
				"type_got", resource.ToGVK(r.ID.Type),
			)
			continue
		}

		to.Add(r)
	}

	return nil
}
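
// runReconciler consumes reconcile requests and invokes the reconciler for
// each, honoring RequeueAfterError and rate-limiting other failures.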
func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error {
	for {
		req, shutdown := queue.Get()
		if shutdown {
			return nil
		}

		c.logger.Trace("handling request", "request", req)
		err := c.handlePanic(func() error {
			return c.ctrl.reconciler.Reconcile(ctx, c.runtime(c.logger.With("resource-id", req.ID.String())), req)
		})
		if err == nil {
			queue.Forget(req)
		} else {
			var requeueAfter RequeueAfterError
			if errors.As(err, &requeueAfter) {
				queue.Forget(req)
				queue.AddAfter(req, time.Duration(requeueAfter))
			} else {
				queue.AddRateLimited(req)
			}
		}

		queue.Done(req)
	}
}
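
// handlePanic invokes fn, recovering from any panic and converting it into a
// returned error after logging the panic value and a stack trace.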
func (c *controllerRunner) handlePanic(fn func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			stack := hclog.Stacktrace()
			c.logger.Error("controller panic",
				"panic", r,
				"stack", stack,
			)
			err = fmt.Errorf("panic [recovered]: %v", r)
			return
		}
	}()

	return fn()
}
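
// runtime constructs the Runtime handed to dependency mappers and
// reconcilers. Both the client and the cache view clone data on the way out.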
func (c *controllerRunner) runtime(logger hclog.Logger) Runtime {
	return Runtime{
		// dependency mappers and controllers are always given the cloning client
		// so that they do not have to care about mutating values that they read
		// through the client.
		Client: c.runtimeClient,
		Logger: logger,
		// ensure that resources queried via the cache get cloned so that the
		// dependency mapper or reconciler is free to modify them.
		Cache: cache.NewCloningReadOnlyCache(c.cache),
	}
}
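
// mapperRequest is the queue item type wrapping a watched resource that needs
// to be run through a dependency mapper.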
type mapperRequest struct{ res *pbresource.Resource }

// Key satisfies the queue.ItemType interface. It returns a string which will be
// used to de-duplicate requests in the queue.
func (i mapperRequest) Key() string {
	return fmt.Sprintf(
		"type=%q,part=%q,peer=%q,ns=%q,name=%q,uid=%q",
		resource.ToGVK(i.res.Id.Type),
		i.res.Id.Tenancy.Partition,
		i.res.Id.Tenancy.PeerName,
		i.res.Id.Tenancy.Namespace,
		i.res.Id.Name,
		i.res.Id.Uid,
	)
}