mirror of
https://github.com/status-im/consul.git
synced 2025-01-21 19:20:41 +00:00
[CC-7031] Add initialization support to resource controllers (#20138)
* Add Initializer to the controller The Initializer adds support for running any required initialization steps when the controller is first started. * Implement HCP Link initializer The link initializer will create a Link resource if the cloud configuration has been set. * Simplify retry logic and testing * Remove internal retry, replace with logging logic
This commit is contained in:
parent
0a261682cd
commit
98c9702ba3
@ -2570,7 +2570,9 @@ func validateAutoConfigAuthorizer(rt RuntimeConfig) error {
|
||||
|
||||
func (b *builder) cloudConfigVal(v Config) hcpconfig.CloudConfig {
|
||||
val := hcpconfig.CloudConfig{
|
||||
ResourceID: os.Getenv("HCP_RESOURCE_ID"),
|
||||
ResourceID: os.Getenv("HCP_RESOURCE_ID"),
|
||||
ClientID: os.Getenv("HCP_CLIENT_ID"),
|
||||
ClientSecret: os.Getenv("HCP_CLIENT_SECRET"),
|
||||
}
|
||||
// Node id might get overridden in setup.go:142
|
||||
nodeID := stringVal(v.NodeID)
|
||||
@ -2581,8 +2583,6 @@ func (b *builder) cloudConfigVal(v Config) hcpconfig.CloudConfig {
|
||||
return val
|
||||
}
|
||||
|
||||
val.ClientID = stringVal(v.Cloud.ClientID)
|
||||
val.ClientSecret = stringVal(v.Cloud.ClientSecret)
|
||||
val.AuthURL = stringVal(v.Cloud.AuthURL)
|
||||
val.Hostname = stringVal(v.Cloud.Hostname)
|
||||
val.ScadaAddress = stringVal(v.Cloud.ScadaAddress)
|
||||
@ -2590,6 +2590,15 @@ func (b *builder) cloudConfigVal(v Config) hcpconfig.CloudConfig {
|
||||
if resourceID := stringVal(v.Cloud.ResourceID); resourceID != "" {
|
||||
val.ResourceID = resourceID
|
||||
}
|
||||
|
||||
if clientID := stringVal(v.Cloud.ClientID); clientID != "" {
|
||||
val.ClientID = clientID
|
||||
}
|
||||
|
||||
if clientSecret := stringVal(v.Cloud.ClientSecret); clientSecret != "" {
|
||||
val.ClientSecret = clientSecret
|
||||
}
|
||||
|
||||
return val
|
||||
}
|
||||
|
||||
|
@ -971,6 +971,7 @@ func (s *Server) registerControllers(deps Deps, proxyUpdater ProxyUpdater) error
|
||||
hcpctl.RegisterControllers(s.controllerManager, hcpctl.ControllerDependencies{
|
||||
ResourceApisEnabled: s.useV2Resources,
|
||||
HCPAllowV2ResourceApis: s.hcpAllowV2Resources,
|
||||
CloudConfig: deps.HCP.Config,
|
||||
})
|
||||
|
||||
// When not enabled, the v1 tenancy bridge is used by default.
|
||||
|
@ -65,3 +65,9 @@ func (c *CloudConfig) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfi
|
||||
opts = append(opts, hcpcfg.FromEnv(), hcpcfg.WithoutBrowserLogin())
|
||||
return hcpcfg.NewHCPConfig(opts...)
|
||||
}
|
||||
|
||||
// IsConfigured returns whether the cloud configuration has been set either
|
||||
// in the configuration file or via environment variables.
|
||||
func (c *CloudConfig) IsConfigured() bool {
|
||||
return c.ResourceID != "" && c.ClientID != "" && c.ClientSecret != ""
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
|
||||
// Deps contains the interfaces that the rest of Consul core depends on for HCP integration.
|
||||
type Deps struct {
|
||||
Config config.CloudConfig
|
||||
Client client.Client
|
||||
Provider scada.Provider
|
||||
Sink metrics.MetricSink
|
||||
@ -53,6 +54,7 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (Deps, error) {
|
||||
}
|
||||
|
||||
return Deps{
|
||||
Config: cfg,
|
||||
Client: hcpClient,
|
||||
Provider: provider,
|
||||
Sink: sink,
|
||||
|
@ -12,6 +12,7 @@ A controller consists of several parts:
|
||||
2. **Additional watched types** - These are additional types a controller may care about in addition to the main watched type.
|
||||
3. **Additional custom watches** - These are the watches for things that aren't resources in Consul.
|
||||
4. **Reconciler** - This is the instance that's responsible for reconciling requests whenever there's an event for the main watched type or for any of the watched types.
|
||||
5. **Initializer** - This is responsible for anything that needs to be executed when the controller is started.
|
||||
|
||||
A basic controller setup could look like this:
|
||||
|
||||
|
@ -425,6 +425,59 @@ func barController() controller.Controller {
|
||||
|
||||
[`controller.PlacementEachServer`]: https://pkg.go.dev/github.com/hashicorp/consul/internal/controller#PlacementEachServer
|
||||
|
||||
### Initializer
|
||||
|
||||
If your controller needs to execute setup steps when the controller
|
||||
first starts and before any resources are reconciled, you can add an
|
||||
Initializer.
|
||||
|
||||
If the controller has an Initializer, it will not start unless the
|
||||
Initialize method is successful. The controller does not have retry
|
||||
logic for the initialize method specifically, but the controller
|
||||
is restarted on error. When restarted, the controller will attempt
|
||||
to execute the initialization again.
|
||||
|
||||
The example below has the controller creating a default resource as
|
||||
part of initialization.
|
||||
|
||||
```Go
|
||||
package foo
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/hashicorp/consul/internal/controller"
|
||||
pbv1alpha1 "github.com/hashicorp/consul/proto-public/pbfoo/v1alpha1"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
func barController() controller.Controller {
|
||||
return controller.ForType(pbv1alpha1.BarType).
|
||||
WithReconciler(barReconciler{}).
|
||||
WithInitializer(barInitializer{})
|
||||
}
|
||||
|
||||
type barInitializer struct{}
|
||||
|
||||
func (barInitializer) Initialize(ctx context.Context, rt controller.Runtime) error {
|
||||
_, err := rt.Client.Write(ctx,
|
||||
&pbresource.WriteRequest{
|
||||
Resource: &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
Name: "default",
|
||||
Type: pbv1alpha1.BarType,
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
## Ownership & Cascading Deletion
|
||||
|
||||
The resource service implements a lightweight `1:N` ownership model where, on
|
||||
|
@ -33,6 +33,7 @@ type DependencyMapper func(
|
||||
type Controller struct {
|
||||
name string
|
||||
reconciler Reconciler
|
||||
initializer Initializer
|
||||
managedTypeWatch *watch
|
||||
watches map[string]*watch
|
||||
queries map[string]cache.Query
|
||||
@ -309,3 +310,15 @@ func (r Request) Key() string {
|
||||
r.ID.Uid,
|
||||
)
|
||||
}
|
||||
|
||||
// Initializer implements the business logic that is executed when the
|
||||
// controller is first started.
|
||||
type Initializer interface {
|
||||
Initialize(ctx context.Context, rt Runtime) error
|
||||
}
|
||||
|
||||
// WithInitializer changes the controller's initializer.
|
||||
func (c *Controller) WithInitializer(initializer Initializer) *Controller {
|
||||
c.initializer = initializer
|
||||
return c
|
||||
}
|
||||
|
@ -50,6 +50,8 @@ func TestController_API(t *testing.T) {
|
||||
})
|
||||
|
||||
rec := newTestReconciler()
|
||||
expectedInitAttempts := 2 // testing retries
|
||||
init := newTestInitializer(expectedInitAttempts)
|
||||
client := svctest.NewResourceServiceBuilder().
|
||||
WithRegisterFns(demo.RegisterTypes).
|
||||
Run(t)
|
||||
@ -70,13 +72,20 @@ func TestController_API(t *testing.T) {
|
||||
WithQuery("some-query", errQuery).
|
||||
WithCustomWatch(concertSource, concertMapper).
|
||||
WithBackoff(10*time.Millisecond, 100*time.Millisecond).
|
||||
WithReconciler(rec)
|
||||
WithReconciler(rec).
|
||||
WithInitializer(init)
|
||||
|
||||
mgr := controller.NewManager(client, testutil.Logger(t))
|
||||
mgr.Register(ctrl)
|
||||
mgr.SetRaftLeader(true)
|
||||
go mgr.Run(testContext(t))
|
||||
|
||||
t.Run("initialize", func(t *testing.T) {
|
||||
for i := 0; i < expectedInitAttempts; i++ {
|
||||
init.wait(t)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("managed resource type", func(t *testing.T) {
|
||||
res, err := demo.GenerateV2Artist()
|
||||
require.NoError(t, err)
|
||||
@ -493,3 +502,43 @@ type Concert struct {
|
||||
func (c Concert) Key() string {
|
||||
return c.name
|
||||
}
|
||||
|
||||
func newTestInitializer(errorCount int) *testInitializer {
|
||||
return &testInitializer{
|
||||
calls: make(chan error, 1),
|
||||
expectedAttempts: errorCount,
|
||||
}
|
||||
}
|
||||
|
||||
type testInitializer struct {
|
||||
expectedAttempts int // number of times the initializer should run to test retries
|
||||
attempts int // running count of times initialize is called
|
||||
calls chan error
|
||||
}
|
||||
|
||||
func (i *testInitializer) Initialize(_ context.Context, _ controller.Runtime) error {
|
||||
i.attempts++
|
||||
if i.attempts < i.expectedAttempts {
|
||||
// Return an error to cause a retry
|
||||
err := errors.New("initialization error")
|
||||
i.calls <- err
|
||||
return err
|
||||
} else {
|
||||
i.calls <- nil
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (i *testInitializer) wait(t *testing.T) {
|
||||
t.Helper()
|
||||
select {
|
||||
case err := <-i.calls:
|
||||
if err == nil {
|
||||
// Initialize did not error, no more calls should be expected
|
||||
close(i.calls)
|
||||
}
|
||||
return
|
||||
case <-time.After(1000 * time.Millisecond):
|
||||
t.Fatal("Initialize was not called after 1000ms")
|
||||
}
|
||||
}
|
||||
|
@ -67,6 +67,16 @@ func (c *controllerRunner) run(ctx context.Context) error {
|
||||
c.logger.Debug("controller running")
|
||||
defer c.logger.Debug("controller stopping")
|
||||
|
||||
// Initialize the controller if required
|
||||
if c.ctrl.initializer != nil {
|
||||
c.logger.Debug("controller initializing")
|
||||
err := c.ctrl.initializer.Initialize(ctx, c.runtime(c.logger))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.logger.Debug("controller initialized")
|
||||
}
|
||||
|
||||
c.cache = c.ctrl.buildCache()
|
||||
defer func() {
|
||||
// once no longer running we should nil out the cache
|
||||
|
@ -5,17 +5,22 @@ package link
|
||||
|
||||
import (
|
||||
"context"
|
||||
gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models"
|
||||
"strings"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/anypb"
|
||||
|
||||
gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models"
|
||||
|
||||
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
|
||||
"github.com/hashicorp/consul/agent/hcp/config"
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
"github.com/hashicorp/consul/internal/storage"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/hashicorp/consul/internal/controller"
|
||||
"github.com/hashicorp/consul/internal/hcp/internal/types"
|
||||
pbhcp "github.com/hashicorp/consul/proto-public/pbhcp/v2"
|
||||
)
|
||||
|
||||
@ -37,8 +42,11 @@ var DefaultHCPClientFn HCPClientFn = func(link *pbhcp.Link) (hcpclient.Client, e
|
||||
return hcpClient, nil
|
||||
}
|
||||
|
||||
func LinkController(resourceApisEnabled bool, hcpAllowV2ResourceApis bool, hcpClientFn HCPClientFn) *controller.Controller {
|
||||
func LinkController(resourceApisEnabled bool, hcpAllowV2ResourceApis bool, hcpClientFn HCPClientFn, cfg config.CloudConfig) *controller.Controller {
|
||||
return controller.NewController("link", pbhcp.LinkType).
|
||||
WithInitializer(&linkInitializer{
|
||||
cloudConfig: cfg,
|
||||
}).
|
||||
WithReconciler(&linkReconciler{
|
||||
resourceApisEnabled: resourceApisEnabled,
|
||||
hcpAllowV2ResourceApis: hcpAllowV2ResourceApis,
|
||||
@ -169,3 +177,52 @@ func (r *linkReconciler) Reconcile(ctx context.Context, rt controller.Runtime, r
|
||||
|
||||
return r.writeStatusIfNotEqual(ctx, rt, res, newStatus)
|
||||
}
|
||||
|
||||
type linkInitializer struct {
|
||||
cloudConfig config.CloudConfig
|
||||
}
|
||||
|
||||
func (i *linkInitializer) Initialize(ctx context.Context, rt controller.Runtime) error {
|
||||
if !i.cloudConfig.IsConfigured() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Construct a link resource to reflect the configuration
|
||||
data, err := anypb.New(&pbhcp.Link{
|
||||
ResourceId: i.cloudConfig.ResourceID,
|
||||
ClientId: i.cloudConfig.ClientID,
|
||||
ClientSecret: i.cloudConfig.ClientSecret,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the link resource for a configuration-based link
|
||||
_, err = rt.Client.Write(ctx,
|
||||
&pbresource.WriteRequest{
|
||||
Resource: &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
Name: types.LinkName,
|
||||
Type: pbhcp.LinkType,
|
||||
},
|
||||
Metadata: map[string]string{
|
||||
types.MetadataSourceKey: types.MetadataSourceConfig,
|
||||
},
|
||||
Data: data,
|
||||
},
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), storage.ErrWrongUid.Error()) ||
|
||||
strings.Contains(err.Error(), "leader unknown") {
|
||||
// If the error is likely ignorable and could eventually resolve itself,
|
||||
// log it as TRACE rather than ERROR.
|
||||
rt.Logger.Trace("error initializing controller", "error", err)
|
||||
} else {
|
||||
rt.Logger.Error("error initializing controller", "error", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -6,14 +6,17 @@ package link
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"testing"
|
||||
|
||||
gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models"
|
||||
|
||||
svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing"
|
||||
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
|
||||
"github.com/hashicorp/consul/agent/hcp/config"
|
||||
"github.com/hashicorp/consul/internal/controller"
|
||||
"github.com/hashicorp/consul/internal/hcp/internal/types"
|
||||
"github.com/hashicorp/consul/internal/resource/resourcetest"
|
||||
@ -78,7 +81,7 @@ func (suite *controllerSuite) TestController_Ok() {
|
||||
HCPPortalURL: "http://test.com",
|
||||
AccessLevel: &readOnly,
|
||||
}, nil)
|
||||
mgr.Register(LinkController(false, false, mockClientFn))
|
||||
mgr.Register(LinkController(false, false, mockClientFn, config.CloudConfig{}))
|
||||
mgr.SetRaftLeader(true)
|
||||
go mgr.Run(suite.ctx)
|
||||
|
||||
@ -102,11 +105,54 @@ func (suite *controllerSuite) TestController_Ok() {
|
||||
require.Equal(suite.T(), pbhcp.AccessLevel_ACCESS_LEVEL_GLOBAL_READ_ONLY, updatedLink.AccessLevel)
|
||||
}
|
||||
|
||||
func (suite *controllerSuite) TestController_Initialize() {
|
||||
// Run the controller manager with a configured link
|
||||
mgr := controller.NewManager(suite.client, suite.rt.Logger)
|
||||
|
||||
mockClient, mockClientFn := mockHcpClientFn(suite.T())
|
||||
readWrite := gnmmod.HashicorpCloudGlobalNetworkManager20220215ClusterConsulAccessLevelCONSULACCESSLEVELGLOBALREADWRITE
|
||||
mockClient.EXPECT().GetCluster(mock.Anything).Return(&hcpclient.Cluster{
|
||||
HCPPortalURL: "http://test.com",
|
||||
AccessLevel: &readWrite,
|
||||
}, nil)
|
||||
|
||||
cloudCfg := config.CloudConfig{
|
||||
ClientID: "client-id-abc",
|
||||
ClientSecret: "client-secret-abc",
|
||||
ResourceID: "resource-id-abc",
|
||||
}
|
||||
|
||||
mgr.Register(LinkController(false, false, mockClientFn, cloudCfg))
|
||||
mgr.SetRaftLeader(true)
|
||||
go mgr.Run(suite.ctx)
|
||||
|
||||
// Wait for link to be created by initializer
|
||||
id := &pbresource.ID{
|
||||
Type: pbhcp.LinkType,
|
||||
Name: types.LinkName,
|
||||
}
|
||||
suite.T().Cleanup(suite.deleteResourceFunc(id))
|
||||
r := suite.client.WaitForResourceExists(suite.T(), id)
|
||||
|
||||
// Check that created link has expected values
|
||||
var link pbhcp.Link
|
||||
err := r.Data.UnmarshalTo(&link)
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
require.Equal(suite.T(), cloudCfg.ResourceID, link.ResourceId)
|
||||
require.Equal(suite.T(), cloudCfg.ClientID, link.ClientId)
|
||||
require.Equal(suite.T(), cloudCfg.ClientSecret, link.ClientSecret)
|
||||
require.Equal(suite.T(), types.MetadataSourceConfig, r.Metadata[types.MetadataSourceKey])
|
||||
|
||||
// Wait for link to be connected successfully
|
||||
suite.client.WaitForStatusCondition(suite.T(), id, StatusKey, ConditionLinked(link.ResourceId))
|
||||
}
|
||||
|
||||
func (suite *controllerSuite) TestControllerResourceApisEnabled_LinkDisabled() {
|
||||
// Run the controller manager
|
||||
mgr := controller.NewManager(suite.client, suite.rt.Logger)
|
||||
_, mockClientFunc := mockHcpClientFn(suite.T())
|
||||
mgr.Register(LinkController(true, false, mockClientFunc))
|
||||
mgr.Register(LinkController(true, false, mockClientFunc, config.CloudConfig{}))
|
||||
mgr.SetRaftLeader(true)
|
||||
go mgr.Run(suite.ctx)
|
||||
|
||||
@ -132,7 +178,7 @@ func (suite *controllerSuite) TestControllerResourceApisEnabledWithOverride_Link
|
||||
HCPPortalURL: "http://test.com",
|
||||
}, nil)
|
||||
|
||||
mgr.Register(LinkController(true, true, mockClientFunc))
|
||||
mgr.Register(LinkController(true, true, mockClientFunc, config.CloudConfig{}))
|
||||
mgr.SetRaftLeader(true)
|
||||
go mgr.Run(suite.ctx)
|
||||
|
||||
@ -156,7 +202,7 @@ func (suite *controllerSuite) TestController_GetClusterError() {
|
||||
mockClient, mockClientFunc := mockHcpClientFn(suite.T())
|
||||
mockClient.EXPECT().GetCluster(mock.Anything).Return(nil, fmt.Errorf("error"))
|
||||
|
||||
mgr.Register(LinkController(true, true, mockClientFunc))
|
||||
mgr.Register(LinkController(true, true, mockClientFunc, config.CloudConfig{}))
|
||||
mgr.SetRaftLeader(true)
|
||||
go mgr.Run(suite.ctx)
|
||||
|
||||
|
@ -5,11 +5,13 @@ package controllers
|
||||
|
||||
import (
|
||||
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
|
||||
"github.com/hashicorp/consul/agent/hcp/config"
|
||||
"github.com/hashicorp/consul/internal/controller"
|
||||
"github.com/hashicorp/consul/internal/hcp/internal/controllers/link"
|
||||
)
|
||||
|
||||
type Dependencies struct {
|
||||
CloudConfig config.CloudConfig
|
||||
ResourceApisEnabled bool
|
||||
HCPAllowV2ResourceApis bool
|
||||
HCPClient hcpclient.Client
|
||||
@ -20,5 +22,6 @@ func Register(mgr *controller.Manager, deps Dependencies) {
|
||||
deps.ResourceApisEnabled,
|
||||
deps.HCPAllowV2ResourceApis,
|
||||
link.DefaultHCPClientFn,
|
||||
deps.CloudConfig,
|
||||
))
|
||||
}
|
||||
|
@ -13,6 +13,12 @@ import (
|
||||
|
||||
type DecodedLink = resource.DecodedResource[*pbhcp.Link]
|
||||
|
||||
const (
|
||||
LinkName = "global"
|
||||
MetadataSourceKey = "source"
|
||||
MetadataSourceConfig = "config"
|
||||
)
|
||||
|
||||
var (
|
||||
linkConfigurationNameError = errors.New("only a single Link resource is allowed and it must be named global")
|
||||
)
|
||||
@ -31,7 +37,7 @@ var ValidateLink = resource.DecodeAndValidate(validateLink)
|
||||
func validateLink(res *DecodedLink) error {
|
||||
var err error
|
||||
|
||||
if res.Id.Name != "global" {
|
||||
if res.Id.Name != LinkName {
|
||||
err = multierror.Append(err, resource.ErrInvalidField{
|
||||
Name: "name",
|
||||
Wrapped: linkConfigurationNameError,
|
||||
|
Loading…
x
Reference in New Issue
Block a user