From 98c9702ba3e1b873324dc1ed284c2d1c43293967 Mon Sep 17 00:00:00 2001 From: Melissa Kam <3768460+mkam@users.noreply.github.com> Date: Fri, 19 Jan 2024 11:47:48 -0600 Subject: [PATCH] [CC-7031] Add initialization support to resource controllers (#20138) * Add Initializer to the controller The Initializer adds support for running any required initialization steps when the controller is first started. * Implement HCP Link initializer The link initializer will create a Link resource if the cloud configuration has been set. * Simplify retry logic and testing * Remove internal retry, replace with logging logic --- agent/config/builder.go | 15 ++++- agent/consul/server.go | 1 + agent/hcp/config/config.go | 6 ++ agent/hcp/deps.go | 2 + .../controller-architecture/controllers.md | 1 + .../controller-architecture/guide.md | 53 +++++++++++++++ internal/controller/controller.go | 13 ++++ internal/controller/controller_test.go | 51 ++++++++++++++- internal/controller/runner.go | 10 +++ .../internal/controllers/link/controller.go | 65 +++++++++++++++++-- .../controllers/link/controller_test.go | 58 +++++++++++++++-- internal/hcp/internal/controllers/register.go | 3 + internal/hcp/internal/types/link.go | 8 ++- 13 files changed, 271 insertions(+), 15 deletions(-) diff --git a/agent/config/builder.go b/agent/config/builder.go index b1b4cb4740..307f22320b 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -2570,7 +2570,9 @@ func validateAutoConfigAuthorizer(rt RuntimeConfig) error { func (b *builder) cloudConfigVal(v Config) hcpconfig.CloudConfig { val := hcpconfig.CloudConfig{ - ResourceID: os.Getenv("HCP_RESOURCE_ID"), + ResourceID: os.Getenv("HCP_RESOURCE_ID"), + ClientID: os.Getenv("HCP_CLIENT_ID"), + ClientSecret: os.Getenv("HCP_CLIENT_SECRET"), } // Node id might get overridden in setup.go:142 nodeID := stringVal(v.NodeID) @@ -2581,8 +2583,6 @@ func (b *builder) cloudConfigVal(v Config) hcpconfig.CloudConfig { return val } - val.ClientID = stringVal(v.Cloud.ClientID) - val.ClientSecret = stringVal(v.Cloud.ClientSecret) val.AuthURL = stringVal(v.Cloud.AuthURL) val.Hostname = stringVal(v.Cloud.Hostname) val.ScadaAddress = stringVal(v.Cloud.ScadaAddress) @@ -2590,6 +2590,15 @@ func (b *builder) cloudConfigVal(v Config) hcpconfig.CloudConfig { if resourceID := stringVal(v.Cloud.ResourceID); resourceID != "" { val.ResourceID = resourceID } + + if clientID := stringVal(v.Cloud.ClientID); clientID != "" { + val.ClientID = clientID + } + + if clientSecret := stringVal(v.Cloud.ClientSecret); clientSecret != "" { + val.ClientSecret = clientSecret + } + return val } diff --git a/agent/consul/server.go b/agent/consul/server.go index cd31ca2807..1415d7194c 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -971,6 +971,7 @@ func (s *Server) registerControllers(deps Deps, proxyUpdater ProxyUpdater) error hcpctl.RegisterControllers(s.controllerManager, hcpctl.ControllerDependencies{ ResourceApisEnabled: s.useV2Resources, HCPAllowV2ResourceApis: s.hcpAllowV2Resources, + CloudConfig: deps.HCP.Config, }) // When not enabled, the v1 tenancy bridge is used by default. diff --git a/agent/hcp/config/config.go b/agent/hcp/config/config.go index e501992aad..6a60f2892f 100644 --- a/agent/hcp/config/config.go +++ b/agent/hcp/config/config.go @@ -65,3 +65,9 @@ func (c *CloudConfig) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfi opts = append(opts, hcpcfg.FromEnv(), hcpcfg.WithoutBrowserLogin()) return hcpcfg.NewHCPConfig(opts...) } + +// IsConfigured returns whether the cloud configuration has been set either +// in the configuration file or via environment variables. +func (c *CloudConfig) IsConfigured() bool { + return c.ResourceID != "" && c.ClientID != "" && c.ClientSecret != "" +} diff --git a/agent/hcp/deps.go b/agent/hcp/deps.go index 5c770f959e..90189da3cb 100644 --- a/agent/hcp/deps.go +++ b/agent/hcp/deps.go @@ -18,6 +18,7 @@ import ( // Deps contains the interfaces that the rest of Consul core depends on for HCP integration. type Deps struct { + Config config.CloudConfig Client client.Client Provider scada.Provider Sink metrics.MetricSink @@ -53,6 +54,7 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (Deps, error) { } return Deps{ + Config: cfg, Client: hcpClient, Provider: provider, Sink: sink, diff --git a/docs/v2-architecture/controller-architecture/controllers.md b/docs/v2-architecture/controller-architecture/controllers.md index 60dd74cdf9..b0fe269630 100644 --- a/docs/v2-architecture/controller-architecture/controllers.md +++ b/docs/v2-architecture/controller-architecture/controllers.md @@ -12,6 +12,7 @@ A controller consists of several parts: 2. **Additional watched types** - These are additional types a controller may care about in addition to the main watched type. 3. **Additional custom watches** - These are the watches for things that aren't resources in Consul. 4. **Reconciler** - This is the instance that's responsible for reconciling requests whenever there's an event for the main watched type or for any of the watched types. +5. **Initializer** - This is responsible for anything that needs to be executed when the controller is started. A basic controller setup could look like this: diff --git a/docs/v2-architecture/controller-architecture/guide.md b/docs/v2-architecture/controller-architecture/guide.md index 969106364e..fdb907371c 100644 --- a/docs/v2-architecture/controller-architecture/guide.md +++ b/docs/v2-architecture/controller-architecture/guide.md @@ -425,6 +425,59 @@ func barController() controller.Controller { [`controller.PlacementEachServer`]: https://pkg.go.dev/github.com/hashicorp/consul/internal/controller#PlacementEachServer +### Initializer + +If your controller needs to execute setup steps when the controller +first starts and before any resources are reconciled, you can add an +Initializer. + +If the controller has an Initializer, it will not start unless the +Initialize method is successful. The controller does not have retry +logic for the initialize method specifically, but the controller +is restarted on error. When restarted, the controller will attempt +to execute the initialization again. + +The example below has the controller creating a default resource as +part of initialization. + +```Go +package foo + +import ( + "context" + + "github.com/hashicorp/consul/internal/controller" + pbv1alpha1 "github.com/hashicorp/consul/proto-public/pbfoo/v1alpha1" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +func barController() controller.Controller { + return controller.ForType(pbv1alpha1.BarType). + WithReconciler(barReconciler{}). + WithInitializer(barInitializer{}) +} + +type barInitializer struct{} + +func (barInitializer) Initialize(ctx context.Context, rt controller.Runtime) error { + _, err := rt.Client.Write(ctx, + &pbresource.WriteRequest{ + Resource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: "default", + Type: pbv1alpha1.BarType, + }, + }, + }, + ) + if err != nil { + return err + } + + return nil +} +``` + ## Ownership & Cascading Deletion The resource service implements a lightweight `1:N` ownership model where, on diff --git a/internal/controller/controller.go b/internal/controller/controller.go index d6ed1671e7..56a1392433 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -33,6 +33,7 @@ type DependencyMapper func( type Controller struct { name string reconciler Reconciler + initializer Initializer managedTypeWatch *watch watches map[string]*watch queries map[string]cache.Query @@ -309,3 +310,15 @@ func (r Request) Key() string { r.ID.Uid, ) } + +// Initializer implements the business logic that is executed when the +// controller is first started. +type Initializer interface { + Initialize(ctx context.Context, rt Runtime) error +} + +// WithInitializer changes the controller's initializer. +func (c *Controller) WithInitializer(initializer Initializer) *Controller { + c.initializer = initializer + return c +} diff --git a/internal/controller/controller_test.go b/internal/controller/controller_test.go index 548a0d8edb..992e45f8e0 100644 --- a/internal/controller/controller_test.go +++ b/internal/controller/controller_test.go @@ -50,6 +50,8 @@ func TestController_API(t *testing.T) { }) rec := newTestReconciler() + expectedInitAttempts := 2 // testing retries + init := newTestInitializer(expectedInitAttempts) client := svctest.NewResourceServiceBuilder(). WithRegisterFns(demo.RegisterTypes). Run(t) @@ -70,13 +72,20 @@ func TestController_API(t *testing.T) { WithQuery("some-query", errQuery). WithCustomWatch(concertSource, concertMapper). WithBackoff(10*time.Millisecond, 100*time.Millisecond). - WithReconciler(rec) + WithReconciler(rec). + WithInitializer(init) mgr := controller.NewManager(client, testutil.Logger(t)) mgr.Register(ctrl) mgr.SetRaftLeader(true) go mgr.Run(testContext(t)) + t.Run("initialize", func(t *testing.T) { + for i := 0; i < expectedInitAttempts; i++ { + init.wait(t) + } + }) + t.Run("managed resource type", func(t *testing.T) { res, err := demo.GenerateV2Artist() require.NoError(t, err) @@ -493,3 +502,43 @@ type Concert struct { func (c Concert) Key() string { return c.name } + +func newTestInitializer(errorCount int) *testInitializer { + return &testInitializer{ + calls: make(chan error, 1), + expectedAttempts: errorCount, + } +} + +type testInitializer struct { + expectedAttempts int // number of times the initializer should run to test retries + attempts int // running count of times initialize is called + calls chan error +} + +func (i *testInitializer) Initialize(_ context.Context, _ controller.Runtime) error { + i.attempts++ + if i.attempts < i.expectedAttempts { + // Return an error to cause a retry + err := errors.New("initialization error") + i.calls <- err + return err + } else { + i.calls <- nil + return nil + } +} + +func (i *testInitializer) wait(t *testing.T) { + t.Helper() + select { + case err := <-i.calls: + if err == nil { + // Initialize did not error, no more calls should be expected + close(i.calls) + } + return + case <-time.After(1000 * time.Millisecond): + t.Fatal("Initialize was not called after 1000ms") + } +} diff --git a/internal/controller/runner.go b/internal/controller/runner.go index 1c60a6f15e..5a761689d0 100644 --- a/internal/controller/runner.go +++ b/internal/controller/runner.go @@ -67,6 +67,16 @@ func (c *controllerRunner) run(ctx context.Context) error { c.logger.Debug("controller running") defer c.logger.Debug("controller stopping") + // Initialize the controller if required + if c.ctrl.initializer != nil { + c.logger.Debug("controller initializing") + err := c.ctrl.initializer.Initialize(ctx, c.runtime(c.logger)) + if err != nil { + return err + } + c.logger.Debug("controller initialized") + } + c.cache = c.ctrl.buildCache() defer func() { // once no longer running we should nil out the cache diff --git a/internal/hcp/internal/controllers/link/controller.go b/internal/hcp/internal/controllers/link/controller.go index b1b607f600..b263c5e417 100644 --- a/internal/hcp/internal/controllers/link/controller.go +++ b/internal/hcp/internal/controllers/link/controller.go @@ -5,17 +5,22 @@ package link import ( "context" - gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models" + "strings" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/anypb" + gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models" + hcpclient "github.com/hashicorp/consul/agent/hcp/client" "github.com/hashicorp/consul/agent/hcp/config" "github.com/hashicorp/consul/internal/resource" + "github.com/hashicorp/consul/internal/storage" "github.com/hashicorp/consul/proto-public/pbresource" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "github.com/hashicorp/consul/internal/controller" + "github.com/hashicorp/consul/internal/hcp/internal/types" pbhcp "github.com/hashicorp/consul/proto-public/pbhcp/v2" ) @@ -37,8 +42,11 @@ var DefaultHCPClientFn HCPClientFn = func(link *pbhcp.Link) (hcpclient.Client, e return hcpClient, nil } -func LinkController(resourceApisEnabled bool, hcpAllowV2ResourceApis bool, hcpClientFn HCPClientFn) *controller.Controller { +func LinkController(resourceApisEnabled bool, hcpAllowV2ResourceApis bool, hcpClientFn HCPClientFn, cfg config.CloudConfig) *controller.Controller { return controller.NewController("link", pbhcp.LinkType). + WithInitializer(&linkInitializer{ + cloudConfig: cfg, + }). WithReconciler(&linkReconciler{ resourceApisEnabled: resourceApisEnabled, hcpAllowV2ResourceApis: hcpAllowV2ResourceApis, @@ -169,3 +177,52 @@ func (r *linkReconciler) Reconcile(ctx context.Context, rt controller.Runtime, r return r.writeStatusIfNotEqual(ctx, rt, res, newStatus) } + +type linkInitializer struct { + cloudConfig config.CloudConfig +} + +func (i *linkInitializer) Initialize(ctx context.Context, rt controller.Runtime) error { + if !i.cloudConfig.IsConfigured() { + return nil + } + + // Construct a link resource to reflect the configuration + data, err := anypb.New(&pbhcp.Link{ + ResourceId: i.cloudConfig.ResourceID, + ClientId: i.cloudConfig.ClientID, + ClientSecret: i.cloudConfig.ClientSecret, + }) + if err != nil { + return err + } + + // Create the link resource for a configuration-based link + _, err = rt.Client.Write(ctx, + &pbresource.WriteRequest{ + Resource: &pbresource.Resource{ + Id: &pbresource.ID{ + Name: types.LinkName, + Type: pbhcp.LinkType, + }, + Metadata: map[string]string{ + types.MetadataSourceKey: types.MetadataSourceConfig, + }, + Data: data, + }, + }, + ) + if err != nil { + if strings.Contains(err.Error(), storage.ErrWrongUid.Error()) || + strings.Contains(err.Error(), "leader unknown") { + // If the error is likely ignorable and could eventually resolve itself, + // log it as TRACE rather than ERROR. + rt.Logger.Trace("error initializing controller", "error", err) + } else { + rt.Logger.Error("error initializing controller", "error", err) + } + return err + } + + return nil +} diff --git a/internal/hcp/internal/controllers/link/controller_test.go b/internal/hcp/internal/controllers/link/controller_test.go index e7c0553c3e..8601cd9984 100644 --- a/internal/hcp/internal/controllers/link/controller_test.go +++ b/internal/hcp/internal/controllers/link/controller_test.go @@ -6,14 +6,17 @@ package link import ( "context" "fmt" - gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models" + "testing" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "testing" + + gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models" svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" hcpclient "github.com/hashicorp/consul/agent/hcp/client" + "github.com/hashicorp/consul/agent/hcp/config" "github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/hcp/internal/types" "github.com/hashicorp/consul/internal/resource/resourcetest" @@ -78,7 +81,7 @@ func (suite *controllerSuite) TestController_Ok() { HCPPortalURL: "http://test.com", AccessLevel: &readOnly, }, nil) - mgr.Register(LinkController(false, false, mockClientFn)) + mgr.Register(LinkController(false, false, mockClientFn, config.CloudConfig{})) mgr.SetRaftLeader(true) go mgr.Run(suite.ctx) @@ -102,11 +105,54 @@ func (suite *controllerSuite) TestController_Ok() { require.Equal(suite.T(), pbhcp.AccessLevel_ACCESS_LEVEL_GLOBAL_READ_ONLY, updatedLink.AccessLevel) } +func (suite *controllerSuite) TestController_Initialize() { + // Run the controller manager with a configured link + mgr := controller.NewManager(suite.client, suite.rt.Logger) + + mockClient, mockClientFn := mockHcpClientFn(suite.T()) + readWrite := gnmmod.HashicorpCloudGlobalNetworkManager20220215ClusterConsulAccessLevelCONSULACCESSLEVELGLOBALREADWRITE + mockClient.EXPECT().GetCluster(mock.Anything).Return(&hcpclient.Cluster{ + HCPPortalURL: "http://test.com", + AccessLevel: &readWrite, + }, nil) + + cloudCfg := config.CloudConfig{ + ClientID: "client-id-abc", + ClientSecret: "client-secret-abc", + ResourceID: "resource-id-abc", + } + + mgr.Register(LinkController(false, false, mockClientFn, cloudCfg)) + mgr.SetRaftLeader(true) + go mgr.Run(suite.ctx) + + // Wait for link to be created by initializer + id := &pbresource.ID{ + Type: pbhcp.LinkType, + Name: types.LinkName, + } + suite.T().Cleanup(suite.deleteResourceFunc(id)) + r := suite.client.WaitForResourceExists(suite.T(), id) + + // Check that created link has expected values + var link pbhcp.Link + err := r.Data.UnmarshalTo(&link) + require.NoError(suite.T(), err) + + require.Equal(suite.T(), cloudCfg.ResourceID, link.ResourceId) + require.Equal(suite.T(), cloudCfg.ClientID, link.ClientId) + require.Equal(suite.T(), cloudCfg.ClientSecret, link.ClientSecret) + require.Equal(suite.T(), types.MetadataSourceConfig, r.Metadata[types.MetadataSourceKey]) + + // Wait for link to be connected successfully + suite.client.WaitForStatusCondition(suite.T(), id, StatusKey, ConditionLinked(link.ResourceId)) +} + func (suite *controllerSuite) TestControllerResourceApisEnabled_LinkDisabled() { // Run the controller manager mgr := controller.NewManager(suite.client, suite.rt.Logger) _, mockClientFunc := mockHcpClientFn(suite.T()) - mgr.Register(LinkController(true, false, mockClientFunc)) + mgr.Register(LinkController(true, false, mockClientFunc, config.CloudConfig{})) mgr.SetRaftLeader(true) go mgr.Run(suite.ctx) @@ -132,7 +178,7 @@ func (suite *controllerSuite) TestControllerResourceApisEnabledWithOverride_Link HCPPortalURL: "http://test.com", }, nil) - mgr.Register(LinkController(true, true, mockClientFunc)) + mgr.Register(LinkController(true, true, mockClientFunc, config.CloudConfig{})) mgr.SetRaftLeader(true) go mgr.Run(suite.ctx) @@ -156,7 +202,7 @@ func (suite *controllerSuite) TestController_GetClusterError() { mockClient, mockClientFunc := mockHcpClientFn(suite.T()) mockClient.EXPECT().GetCluster(mock.Anything).Return(nil, fmt.Errorf("error")) - mgr.Register(LinkController(true, true, mockClientFunc)) + mgr.Register(LinkController(true, true, mockClientFunc, config.CloudConfig{})) mgr.SetRaftLeader(true) go mgr.Run(suite.ctx) diff --git a/internal/hcp/internal/controllers/register.go b/internal/hcp/internal/controllers/register.go index c56e45eafe..30cc678866 100644 --- a/internal/hcp/internal/controllers/register.go +++ b/internal/hcp/internal/controllers/register.go @@ -5,11 +5,13 @@ package controllers import ( hcpclient "github.com/hashicorp/consul/agent/hcp/client" + "github.com/hashicorp/consul/agent/hcp/config" "github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/hcp/internal/controllers/link" ) type Dependencies struct { + CloudConfig config.CloudConfig ResourceApisEnabled bool HCPAllowV2ResourceApis bool HCPClient hcpclient.Client @@ -20,5 +22,6 @@ func Register(mgr *controller.Manager, deps Dependencies) { deps.ResourceApisEnabled, deps.HCPAllowV2ResourceApis, link.DefaultHCPClientFn, + deps.CloudConfig, )) } diff --git a/internal/hcp/internal/types/link.go b/internal/hcp/internal/types/link.go index cd8bfe39a2..ca8432ffed 100644 --- a/internal/hcp/internal/types/link.go +++ b/internal/hcp/internal/types/link.go @@ -13,6 +13,12 @@ import ( type DecodedLink = resource.DecodedResource[*pbhcp.Link] +const ( + LinkName = "global" + MetadataSourceKey = "source" + MetadataSourceConfig = "config" +) + var ( linkConfigurationNameError = errors.New("only a single Link resource is allowed and it must be named global") ) @@ -31,7 +37,7 @@ var ValidateLink = resource.DecodeAndValidate(validateLink) func validateLink(res *DecodedLink) error { var err error - if res.Id.Name != "global" { + if res.Id.Name != LinkName { err = multierror.Append(err, resource.ErrInvalidField{ Name: "name", Wrapped: linkConfigurationNameError,