mirror of
https://github.com/status-im/consul.git
synced 2025-01-11 14:24:39 +00:00
Move HCP Manager lifecycle management out of Link controller (#20401)
* Add function to get update channel for watching HCP Link * Add MonitorHCPLink function This function can be called in a goroutine to manage the lifecycle of the HCP manager. * Update HCP Manager config in link monitor before starting This updates HCPMonitorLink so it updates the HCP manager with an HCP client and management token when a Link is upserted. * Let MonitorHCPManager handle lifecycle instead of link controller * Remove cleanup from Link controller and move it to MonitorHCPLink Previously, the Link Controller was responsible for cleaning up the HCP-related files on the file system. This change makes it so MonitorHCPLink handles this cleanup. As a result, we are able to remove the PlacementEachServer placement strategy for the Link controller because it no longer needs to do this per-node cleanup. * Remove HCP Manager dependency from Link Controller The Link controller does not need to have HCP Manager as a dependency anymore, so this removes that dependency in order to simplify the design. * Add Linked prefix to Linked status variables This is in preparation for adding a new status type to the Link resource. * Add new "validated" status type to link resource The link resource controller will now set a "validated" status in addition to the "linked" status. This is needed so that other components (eg the HCP manager) know when the Link is ready to link with HCP. * Fix tests * Handle new 'EndOfSnapshot' WatchList event * Fix watch test * Remove unnecessary config from TestAgent_scadaProvider Since the Scada provider is now started on agent startup regardless of whether a cloud config is provided, this removes the cloud config override from the relevant test. This change is not exactly related to the changes from this PR, but rather is something small and sort of related that was noticed while working on this PR. * Simplify link watch test and remove sleep from link watch This updates the link watch test so that it uses more mocks and does not require setting up the infrastructure for the HCP Link controller. This also removes the time.Sleep delay in the link watcher loop in favor of an error counter. When we receive 10 consecutive errors, we shut down the link watcher loop. * Add better logging for link validation. Remove EndOfSnapshot test. * Refactor link monitor test into a table test * Add some clarifying comments to link monitor * Simplify link watch test * Test a bunch more errors cases in link monitor test * Use exponential backoff instead of errorCounter in LinkWatch * Move link watch and link monitor into a single goroutine called from server.go * Refactor HCP link watcher to use single go-routine. Previously, if the WatchClient errored, we would've never recovered because we never retry to create the stream. With this change, we have a single goroutine that runs for the life of the server agent and if the WatchClient stream ever errors, we retry the creation of the stream with an exponential backoff.
This commit is contained in:
parent
9d8f9a5470
commit
5fb6ab6a3a
@ -32,10 +32,6 @@ import (
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/google/tcpproxy"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/hcp-scada-provider/capability"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
@ -44,6 +40,11 @@ import (
|
||||
"google.golang.org/protobuf/encoding/protojson"
|
||||
"gopkg.in/square/go-jose.v2/jwt"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/hcp-scada-provider/capability"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/checks"
|
||||
@ -6343,12 +6344,6 @@ func TestAgent_scadaProvider(t *testing.T) {
|
||||
OverrideDeps: func(deps *BaseDeps) {
|
||||
deps.HCP.Provider = pvd
|
||||
},
|
||||
Overrides: `
|
||||
cloud {
|
||||
resource_id = "organization/0b9de9a3-8403-4ca6-aba8-fca752f42100/project/0b9de9a3-8403-4ca6-aba8-fca752f42100/consul.cluster/0b9de9a3-8403-4ca6-aba8-fca752f42100"
|
||||
client_id = "test"
|
||||
client_secret = "test"
|
||||
}`,
|
||||
}
|
||||
defer a.Shutdown()
|
||||
require.NoError(t, a.Start(t))
|
||||
|
@ -21,6 +21,11 @@ import (
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/fullstorydev/grpchan/inprocgrpc"
|
||||
"go.etcd.io/bbolt"
|
||||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/hashicorp/consul-net-rpc/net/rpc"
|
||||
"github.com/hashicorp/go-connlimit"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
@ -31,11 +36,6 @@ import (
|
||||
walmetrics "github.com/hashicorp/raft-wal/metrics"
|
||||
"github.com/hashicorp/raft-wal/verifier"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"go.etcd.io/bbolt"
|
||||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/hashicorp/consul-net-rpc/net/rpc"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/blockingquery"
|
||||
@ -53,6 +53,7 @@ import (
|
||||
"github.com/hashicorp/consul/agent/consul/xdscapacity"
|
||||
"github.com/hashicorp/consul/agent/grpc-external/services/peerstream"
|
||||
"github.com/hashicorp/consul/agent/hcp"
|
||||
"github.com/hashicorp/consul/agent/hcp/bootstrap"
|
||||
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
|
||||
logdrop "github.com/hashicorp/consul/agent/log-drop"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
@ -889,6 +890,23 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server,
|
||||
// to enable RPC forwarding.
|
||||
s.grpcLeaderForwarder = flat.LeaderForwarder
|
||||
|
||||
// Start watching HCP Link resource. This needs to be created after
|
||||
// the GRPC services are set up in order for the resource service client to
|
||||
// function. This uses the insecure grpc channel so that it doesn't need to
|
||||
// present a valid ACL token.
|
||||
go hcp.RunHCPLinkWatcher(
|
||||
&lib.StopChannelContext{StopCh: shutdownCh},
|
||||
logger.Named("hcp-link-watcher"),
|
||||
pbresource.NewResourceServiceClient(s.insecureSafeGRPCChan),
|
||||
hcp.HCPManagerLifecycleFn(
|
||||
s.hcpManager,
|
||||
hcpclient.NewClient,
|
||||
bootstrap.LoadManagementToken,
|
||||
flat.HCP.Config,
|
||||
flat.HCP.DataDir,
|
||||
),
|
||||
)
|
||||
|
||||
s.controllerManager = controller.NewManager(
|
||||
// Usage of the insecure + unsafe grpc chan is required for the controller
|
||||
// manager. It must be unauthorized so that controllers do not need to
|
||||
@ -990,13 +1008,13 @@ func isV1CatalogRequest(rpcName string) bool {
|
||||
}
|
||||
|
||||
func (s *Server) registerControllers(deps Deps, proxyUpdater ProxyUpdater) error {
|
||||
hcpctl.RegisterControllers(s.controllerManager, hcpctl.ControllerDependencies{
|
||||
ResourceApisEnabled: s.useV2Resources,
|
||||
HCPAllowV2ResourceApis: s.hcpAllowV2Resources,
|
||||
CloudConfig: deps.HCP.Config,
|
||||
DataDir: deps.HCP.DataDir,
|
||||
HCPManager: s.hcpManager,
|
||||
})
|
||||
hcpctl.RegisterControllers(
|
||||
s.controllerManager, hcpctl.ControllerDependencies{
|
||||
ResourceApisEnabled: s.useV2Resources,
|
||||
HCPAllowV2ResourceApis: s.hcpAllowV2Resources,
|
||||
CloudConfig: deps.HCP.Config,
|
||||
},
|
||||
)
|
||||
|
||||
// When not enabled, the v1 tenancy bridge is used by default.
|
||||
if s.useV2Tenancy {
|
||||
|
@ -18,17 +18,17 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/hcp/bootstrap/constants"
|
||||
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/lib/retry"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
SubDir = "hcp-config"
|
||||
|
||||
CAFileName = "server-tls-cas.pem"
|
||||
CertFileName = "server-tls-cert.pem"
|
||||
ConfigFileName = "server-config.json"
|
||||
@ -128,7 +128,7 @@ func persistAndProcessConfig(dataDir string, devMode bool, bsCfg *hcpclient.Boot
|
||||
}
|
||||
|
||||
// Create subdir if it's not already there.
|
||||
dir := filepath.Join(dataDir, SubDir)
|
||||
dir := filepath.Join(dataDir, constants.SubDir)
|
||||
if err := lib.EnsurePath(dir, true); err != nil {
|
||||
return "", fmt.Errorf("failed to ensure directory %q: %w", dir, err)
|
||||
}
|
||||
@ -273,7 +273,7 @@ func LoadPersistedBootstrapConfig(dataDir string, ui UI) (*RawBootstrapConfig, b
|
||||
return nil, false
|
||||
}
|
||||
|
||||
dir := filepath.Join(dataDir, SubDir)
|
||||
dir := filepath.Join(dataDir, constants.SubDir)
|
||||
|
||||
_, err := os.Stat(filepath.Join(dir, SuccessFileName))
|
||||
if os.IsNotExist(err) {
|
||||
@ -309,7 +309,7 @@ func LoadPersistedBootstrapConfig(dataDir string, ui UI) (*RawBootstrapConfig, b
|
||||
}
|
||||
|
||||
func loadBootstrapConfigJSON(dataDir string) (string, error) {
|
||||
filename := filepath.Join(dataDir, SubDir, ConfigFileName)
|
||||
filename := filepath.Join(dataDir, constants.SubDir, ConfigFileName)
|
||||
|
||||
_, err := os.Stat(filename)
|
||||
if os.IsNotExist(err) {
|
||||
@ -461,7 +461,7 @@ func ValidateTLSCerts(cert, key string, caCerts []string) error {
|
||||
// LoadManagementToken returns the management token, either by loading it from the persisted
|
||||
// token config file or by fetching it from HCP if the token file does not exist.
|
||||
func LoadManagementToken(ctx context.Context, logger hclog.Logger, client hcpclient.Client, dataDir string) (string, error) {
|
||||
hcpCfgDir := filepath.Join(dataDir, SubDir)
|
||||
hcpCfgDir := filepath.Join(dataDir, constants.SubDir)
|
||||
token, err := loadManagementToken(hcpCfgDir)
|
||||
|
||||
if err != nil {
|
||||
|
@ -11,15 +11,18 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/mitchellh/cli"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
|
||||
"github.com/hashicorp/consul/agent/hcp/bootstrap/constants"
|
||||
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/mitchellh/cli"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func Test_loadPersistedBootstrapConfig(t *testing.T) {
|
||||
@ -37,7 +40,7 @@ func Test_loadPersistedBootstrapConfig(t *testing.T) {
|
||||
run := func(t *testing.T, tc testCase) {
|
||||
dataDir := testutil.TempDir(t, "load-bootstrap-cfg")
|
||||
|
||||
dir := filepath.Join(dataDir, SubDir)
|
||||
dir := filepath.Join(dataDir, constants.SubDir)
|
||||
|
||||
// Do some common setup as if we received config from HCP and persisted it to disk.
|
||||
require.NoError(t, lib.EnsurePath(dir, true))
|
||||
@ -264,7 +267,7 @@ func TestLoadManagementToken(t *testing.T) {
|
||||
run := func(t *testing.T, tc testCase) {
|
||||
dataDir := testutil.TempDir(t, "load-management-token")
|
||||
|
||||
hcpCfgDir := filepath.Join(dataDir, SubDir)
|
||||
hcpCfgDir := filepath.Join(dataDir, constants.SubDir)
|
||||
if !tc.skipHCPConfigDir {
|
||||
err := os.Mkdir(hcpCfgDir, 0755)
|
||||
require.NoError(t, err)
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/hcp/bootstrap"
|
||||
"github.com/hashicorp/consul/agent/hcp/bootstrap/constants"
|
||||
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
|
||||
)
|
||||
|
||||
@ -161,7 +162,7 @@ func finalizeRuntimeConfig(rc *config.RuntimeConfig, cfg *bootstrap.RawBootstrap
|
||||
// validatePersistedConfig attempts to load persisted config to check for errors and basic validity.
|
||||
// Errors here will raise issues like referencing unsupported config fields.
|
||||
func validatePersistedConfig(dataDir string) error {
|
||||
filename := filepath.Join(dataDir, bootstrap.SubDir, bootstrap.ConfigFileName)
|
||||
filename := filepath.Join(dataDir, constants.SubDir, bootstrap.ConfigFileName)
|
||||
_, err := config.Load(config.LoadOpts{
|
||||
ConfigFiles: []string{filename},
|
||||
HCL: []string{
|
||||
|
@ -13,13 +13,15 @@ import (
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/mitchellh/cli"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/hcp"
|
||||
"github.com/hashicorp/consul/agent/hcp/bootstrap"
|
||||
"github.com/hashicorp/consul/agent/hcp/bootstrap/constants"
|
||||
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/mitchellh/cli"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBootstrapConfigLoader(t *testing.T) {
|
||||
@ -274,7 +276,7 @@ func TestLoadConfig_Persistence(t *testing.T) {
|
||||
|
||||
// New clusters should have received and persisted the whole suite of config.
|
||||
verifyFn: func(t *testing.T, rc *config.RuntimeConfig) {
|
||||
dir := filepath.Join(rc.DataDir, bootstrap.SubDir)
|
||||
dir := filepath.Join(rc.DataDir, constants.SubDir)
|
||||
|
||||
entries, err := os.ReadDir(dir)
|
||||
require.NoError(t, err)
|
||||
@ -310,7 +312,7 @@ func TestLoadConfig_Persistence(t *testing.T) {
|
||||
|
||||
// Existing clusters should have only received and persisted the management token.
|
||||
verifyFn: func(t *testing.T, rc *config.RuntimeConfig) {
|
||||
dir := filepath.Join(rc.DataDir, bootstrap.SubDir)
|
||||
dir := filepath.Join(rc.DataDir, constants.SubDir)
|
||||
|
||||
entries, err := os.ReadDir(dir)
|
||||
require.NoError(t, err)
|
||||
@ -347,7 +349,7 @@ func TestValidatePersistedConfig(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { os.RemoveAll(dataDir) })
|
||||
|
||||
dir := filepath.Join(dataDir, bootstrap.SubDir)
|
||||
dir := filepath.Join(dataDir, constants.SubDir)
|
||||
require.NoError(t, lib.EnsurePath(dir, true))
|
||||
|
||||
if tc.configContents != "" {
|
||||
|
9
agent/hcp/bootstrap/constants/constants.go
Normal file
9
agent/hcp/bootstrap/constants/constants.go
Normal file
@ -0,0 +1,9 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
// Package constants declares some constants for use in the HCP bootstrapping
|
||||
// process. It is in its own package with no other dependencies in order
|
||||
// to avoid a dependency cycle.
|
||||
package constants
|
||||
|
||||
const SubDir = "hcp-config"
|
68
agent/hcp/link_watch.go
Normal file
68
agent/hcp/link_watch.go
Normal file
@ -0,0 +1,68 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package hcp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
hcpctl "github.com/hashicorp/consul/internal/hcp"
|
||||
"github.com/hashicorp/consul/lib/retry"
|
||||
pbhcp "github.com/hashicorp/consul/proto-public/pbhcp/v2"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
type LinkEventHandler = func(context.Context, hclog.Logger, *pbresource.WatchEvent)
|
||||
|
||||
func handleLinkEvents(ctx context.Context, logger hclog.Logger, watchClient pbresource.ResourceService_WatchListClient, linkEventHandler LinkEventHandler) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Debug("context canceled, exiting")
|
||||
return
|
||||
default:
|
||||
watchEvent, err := watchClient.Recv()
|
||||
|
||||
if err != nil {
|
||||
logger.Error("error receiving link watch event", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
linkEventHandler(ctx, logger, watchEvent)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func RunHCPLinkWatcher(
|
||||
ctx context.Context, logger hclog.Logger, client pbresource.ResourceServiceClient, linkEventHandler LinkEventHandler,
|
||||
) {
|
||||
errorBackoff := &retry.Waiter{
|
||||
MinFailures: 10,
|
||||
MinWait: 0,
|
||||
MaxWait: 1 * time.Minute,
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Debug("context canceled, exiting")
|
||||
return
|
||||
default:
|
||||
watchClient, err := client.WatchList(
|
||||
ctx, &pbresource.WatchListRequest{
|
||||
Type: pbhcp.LinkType,
|
||||
NamePrefix: hcpctl.LinkName,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
logger.Error("failed to create watch on Link", "error", err)
|
||||
errorBackoff.Wait(ctx)
|
||||
continue
|
||||
}
|
||||
errorBackoff.Reset()
|
||||
handleLinkEvents(ctx, logger, watchClient, linkEventHandler)
|
||||
}
|
||||
}
|
||||
}
|
101
agent/hcp/link_watch_test.go
Normal file
101
agent/hcp/link_watch_test.go
Normal file
@ -0,0 +1,101 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package hcp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
mockpbresource "github.com/hashicorp/consul/grpcmocks/proto-public/pbresource"
|
||||
hcpctl "github.com/hashicorp/consul/internal/hcp"
|
||||
pbhcp "github.com/hashicorp/consul/proto-public/pbhcp/v2"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
// This tests that when we get a watch event from the Recv call, we get that same event on the
|
||||
// output channel, then we
|
||||
func TestLinkWatcher_Ok(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
||||
testWatchEvent := &pbresource.WatchEvent{}
|
||||
mockWatchListClient := mockpbresource.NewResourceService_WatchListClient(t)
|
||||
mockWatchListClient.EXPECT().Recv().Return(testWatchEvent, nil)
|
||||
|
||||
eventCh := make(chan *pbresource.WatchEvent)
|
||||
mockLinkHandler := func(_ context.Context, _ hclog.Logger, event *pbresource.WatchEvent) {
|
||||
eventCh <- event
|
||||
}
|
||||
|
||||
client := mockpbresource.NewResourceServiceClient(t)
|
||||
client.EXPECT().WatchList(mock.Anything, &pbresource.WatchListRequest{
|
||||
Type: pbhcp.LinkType,
|
||||
NamePrefix: hcpctl.LinkName,
|
||||
}).Return(mockWatchListClient, nil)
|
||||
|
||||
go RunHCPLinkWatcher(ctx, hclog.Default(), client, mockLinkHandler)
|
||||
|
||||
// Assert that the link handler is called with the testWatchEvent
|
||||
receivedWatchEvent := <-eventCh
|
||||
require.Equal(t, testWatchEvent, receivedWatchEvent)
|
||||
}
|
||||
|
||||
func TestLinkWatcher_RecvError(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
// Our mock WatchListClient will simulate 5 errors, then will cancel the context.
|
||||
// We expect RunHCPLinkWatcher to attempt to create the WatchListClient 6 times (initial attempt plus 5 retries)
|
||||
// before exiting due to context cancellation.
|
||||
mockWatchListClient := mockpbresource.NewResourceService_WatchListClient(t)
|
||||
numFailures := 5
|
||||
failures := 0
|
||||
mockWatchListClient.EXPECT().Recv().RunAndReturn(func() (*pbresource.WatchEvent, error) {
|
||||
if failures < numFailures {
|
||||
failures++
|
||||
return nil, errors.New("unexpectedError")
|
||||
}
|
||||
defer cancel()
|
||||
return &pbresource.WatchEvent{}, nil
|
||||
})
|
||||
|
||||
client := mockpbresource.NewResourceServiceClient(t)
|
||||
client.EXPECT().WatchList(mock.Anything, &pbresource.WatchListRequest{
|
||||
Type: pbhcp.LinkType,
|
||||
NamePrefix: hcpctl.LinkName,
|
||||
}).Return(mockWatchListClient, nil).Times(numFailures + 1)
|
||||
|
||||
RunHCPLinkWatcher(ctx, hclog.Default(), client, func(_ context.Context, _ hclog.Logger, _ *pbresource.WatchEvent) {})
|
||||
}
|
||||
|
||||
func TestLinkWatcher_WatchListError(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
// Our mock WatchList will simulate 5 errors, then will cancel the context.
|
||||
// We expect RunHCPLinkWatcher to attempt to create the WatchListClient 6 times (initial attempt plus 5 retries)
|
||||
// before exiting due to context cancellation.
|
||||
numFailures := 5
|
||||
failures := 0
|
||||
|
||||
client := mockpbresource.NewResourceServiceClient(t)
|
||||
client.EXPECT().WatchList(mock.Anything, &pbresource.WatchListRequest{
|
||||
Type: pbhcp.LinkType,
|
||||
NamePrefix: hcpctl.LinkName,
|
||||
}).RunAndReturn(func(_ context.Context, _ *pbresource.WatchListRequest, _ ...grpc.CallOption) (pbresource.ResourceService_WatchListClient, error) {
|
||||
if failures < numFailures {
|
||||
failures++
|
||||
return nil, errors.New("unexpectedError")
|
||||
}
|
||||
defer cancel()
|
||||
return mockpbresource.NewResourceService_WatchListClient(t), nil
|
||||
}).Times(numFailures + 1)
|
||||
|
||||
RunHCPLinkWatcher(ctx, hclog.Default(), client, func(_ context.Context, _ hclog.Logger, _ *pbresource.WatchEvent) {})
|
||||
}
|
@ -9,11 +9,12 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
|
||||
"github.com/hashicorp/consul/agent/hcp/config"
|
||||
"github.com/hashicorp/consul/agent/hcp/scada"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
var (
|
||||
|
107
agent/hcp/manager_lifecycle.go
Normal file
107
agent/hcp/manager_lifecycle.go
Normal file
@ -0,0 +1,107 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package hcp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/consul/agent/hcp/bootstrap/constants"
|
||||
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
|
||||
"github.com/hashicorp/consul/agent/hcp/config"
|
||||
hcpctl "github.com/hashicorp/consul/internal/hcp"
|
||||
pbhcp "github.com/hashicorp/consul/proto-public/pbhcp/v2"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
// HCPManagerLifecycleFn returns a LinkEventHandler function which will appropriately
|
||||
// Start and Stop the HCP Manager based on the Link event received. If a link is upserted,
|
||||
// the HCP Manager is started, and if a link is deleted, the HCP manager is stopped.
|
||||
func HCPManagerLifecycleFn(
|
||||
m Manager,
|
||||
hcpClientFn func(cfg config.CloudConfig) (hcpclient.Client, error),
|
||||
loadMgmtTokenFn func(
|
||||
ctx context.Context, logger hclog.Logger, hcpClient hcpclient.Client, dataDir string,
|
||||
) (string, error),
|
||||
cloudConfig config.CloudConfig,
|
||||
dataDir string,
|
||||
) LinkEventHandler {
|
||||
return func(ctx context.Context, logger hclog.Logger, watchEvent *pbresource.WatchEvent) {
|
||||
// This indicates that a Link was deleted
|
||||
if watchEvent.GetDelete() != nil {
|
||||
logger.Debug("HCP Link deleted, stopping HCP manager")
|
||||
|
||||
if dataDir != "" {
|
||||
hcpConfigDir := filepath.Join(dataDir, constants.SubDir)
|
||||
logger.Debug("deleting hcp-config dir", "dir", hcpConfigDir)
|
||||
err := os.RemoveAll(hcpConfigDir)
|
||||
if err != nil {
|
||||
logger.Error("failed to delete hcp-config dir", "dir", hcpConfigDir, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
err := m.Stop()
|
||||
if err != nil {
|
||||
logger.Error("error stopping HCP manager", "error", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// This indicates that a Link was either created or updated
|
||||
if watchEvent.GetUpsert() != nil {
|
||||
logger.Debug("HCP Link upserted, starting manager if not already started")
|
||||
|
||||
res := watchEvent.GetUpsert().GetResource()
|
||||
var link pbhcp.Link
|
||||
if err := res.GetData().UnmarshalTo(&link); err != nil {
|
||||
logger.Error("error unmarshalling link data", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
if validated, reason := hcpctl.IsValidated(res); !validated {
|
||||
logger.Debug("HCP Link not validated, not starting manager", "reason", reason)
|
||||
return
|
||||
}
|
||||
|
||||
// Update the HCP manager configuration with the link values
|
||||
// Merge the link data with the existing cloud config so that we only overwrite the
|
||||
// fields that are provided by the link. This ensures that:
|
||||
// 1. The HCP configuration (i.e., how to connect to HCP) is preserved
|
||||
// 2. The Consul agent's node ID and node name are preserved
|
||||
newCfg := config.CloudConfig{
|
||||
ResourceID: link.ResourceId,
|
||||
ClientID: link.ClientId,
|
||||
ClientSecret: link.ClientSecret,
|
||||
}
|
||||
mergedCfg := config.Merge(cloudConfig, newCfg)
|
||||
hcpClient, err := hcpClientFn(mergedCfg)
|
||||
if err != nil {
|
||||
logger.Error("error creating HCP client", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Load the management token if access is set to read-write. Read-only clusters
|
||||
// will not have a management token provided by HCP.
|
||||
var token string
|
||||
if link.GetAccessLevel() == pbhcp.AccessLevel_ACCESS_LEVEL_GLOBAL_READ_WRITE {
|
||||
token, err = loadMgmtTokenFn(ctx, logger, hcpClient, dataDir)
|
||||
if err != nil {
|
||||
logger.Error("error loading management token", "error", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
mergedCfg.ManagementToken = token
|
||||
m.UpdateConfig(hcpClient, mergedCfg)
|
||||
|
||||
err = m.Start(ctx)
|
||||
if err != nil {
|
||||
logger.Error("error starting HCP manager", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
236
agent/hcp/manager_lifecycle_test.go
Normal file
236
agent/hcp/manager_lifecycle_test.go
Normal file
@ -0,0 +1,236 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package hcp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/protobuf/types/known/anypb"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/consul/agent/hcp/bootstrap/constants"
|
||||
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
|
||||
"github.com/hashicorp/consul/agent/hcp/config"
|
||||
hcpctl "github.com/hashicorp/consul/internal/hcp"
|
||||
pbhcp "github.com/hashicorp/consul/proto-public/pbhcp/v2"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
)
|
||||
|
||||
func TestHCPManagerLifecycleFn(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
||||
logger := hclog.New(&hclog.LoggerOptions{Output: io.Discard})
|
||||
|
||||
mockHCPClient := hcpclient.NewMockClient(t)
|
||||
mockHcpClientFn := func(_ config.CloudConfig) (hcpclient.Client, error) {
|
||||
return mockHCPClient, nil
|
||||
}
|
||||
|
||||
mockLoadMgmtTokenFn := func(ctx context.Context, logger hclog.Logger, hcpClient hcpclient.Client, dataDir string) (string, error) {
|
||||
return "test-mgmt-token", nil
|
||||
}
|
||||
|
||||
dataDir := testutil.TempDir(t, "test-link-controller")
|
||||
err := os.Mkdir(filepath.Join(dataDir, constants.SubDir), os.ModeDir)
|
||||
require.NoError(t, err)
|
||||
existingCfg := config.CloudConfig{
|
||||
AuthURL: "test.com",
|
||||
}
|
||||
|
||||
type testCase struct {
|
||||
mutateLink func(*pbhcp.Link)
|
||||
mutateUpsertEvent func(*pbresource.WatchEvent_Upsert)
|
||||
applyMocksAndAssertions func(*testing.T, *MockManager, *pbhcp.Link)
|
||||
hcpClientFn func(config.CloudConfig) (hcpclient.Client, error)
|
||||
loadMgmtTokenFn func(context.Context, hclog.Logger, hcpclient.Client, string) (string, error)
|
||||
}
|
||||
|
||||
testCases := map[string]testCase{
|
||||
// HCP manager should be started when link is created and stopped when link is deleted
|
||||
"Ok": {
|
||||
applyMocksAndAssertions: func(t *testing.T, mgr *MockManager, link *pbhcp.Link) {
|
||||
mgr.EXPECT().Start(mock.Anything).Return(nil).Once()
|
||||
|
||||
expectedCfg := config.CloudConfig{
|
||||
ResourceID: link.ResourceId,
|
||||
ClientID: link.ClientId,
|
||||
ClientSecret: link.ClientSecret,
|
||||
AuthURL: "test.com",
|
||||
ManagementToken: "test-mgmt-token",
|
||||
}
|
||||
mgr.EXPECT().UpdateConfig(mockHCPClient, expectedCfg).Once()
|
||||
|
||||
mgr.EXPECT().Stop().Return(nil).Once()
|
||||
},
|
||||
},
|
||||
// HCP manager should not be updated with management token
|
||||
"ReadOnly": {
|
||||
mutateLink: func(link *pbhcp.Link) {
|
||||
link.AccessLevel = pbhcp.AccessLevel_ACCESS_LEVEL_GLOBAL_READ_ONLY
|
||||
},
|
||||
applyMocksAndAssertions: func(t *testing.T, mgr *MockManager, link *pbhcp.Link) {
|
||||
mgr.EXPECT().Start(mock.Anything).Return(nil).Once()
|
||||
|
||||
expectedCfg := config.CloudConfig{
|
||||
ResourceID: link.ResourceId,
|
||||
ClientID: link.ClientId,
|
||||
ClientSecret: link.ClientSecret,
|
||||
AuthURL: "test.com",
|
||||
ManagementToken: "",
|
||||
}
|
||||
mgr.EXPECT().UpdateConfig(mockHCPClient, expectedCfg).Once()
|
||||
|
||||
mgr.EXPECT().Stop().Return(nil).Once()
|
||||
},
|
||||
},
|
||||
// HCP manager should not be started or updated if link is not validated
|
||||
"ValidationError": {
|
||||
mutateUpsertEvent: func(upsert *pbresource.WatchEvent_Upsert) {
|
||||
upsert.Resource.Status = map[string]*pbresource.Status{
|
||||
hcpctl.StatusKey: {
|
||||
Conditions: []*pbresource.Condition{hcpctl.ConditionValidatedFailed},
|
||||
},
|
||||
}
|
||||
},
|
||||
applyMocksAndAssertions: func(t *testing.T, mgr *MockManager, link *pbhcp.Link) {
|
||||
mgr.AssertNotCalled(t, "Start", mock.Anything)
|
||||
mgr.AssertNotCalled(t, "UpdateConfig", mock.Anything, mock.Anything)
|
||||
mgr.EXPECT().Stop().Return(nil).Once()
|
||||
},
|
||||
},
|
||||
"Error_InvalidLink": {
|
||||
mutateUpsertEvent: func(upsert *pbresource.WatchEvent_Upsert) {
|
||||
upsert.Resource = nil
|
||||
},
|
||||
applyMocksAndAssertions: func(t *testing.T, mgr *MockManager, link *pbhcp.Link) {
|
||||
mgr.AssertNotCalled(t, "Start", mock.Anything)
|
||||
mgr.AssertNotCalled(t, "UpdateConfig", mock.Anything, mock.Anything)
|
||||
mgr.EXPECT().Stop().Return(nil).Once()
|
||||
},
|
||||
},
|
||||
"Error_HCPManagerStop": {
|
||||
applyMocksAndAssertions: func(t *testing.T, mgr *MockManager, link *pbhcp.Link) {
|
||||
mgr.EXPECT().Start(mock.Anything).Return(nil).Once()
|
||||
mgr.EXPECT().UpdateConfig(mock.Anything, mock.Anything).Return().Once()
|
||||
mgr.EXPECT().Stop().Return(errors.New("could not stop HCP manager")).Once()
|
||||
},
|
||||
},
|
||||
"Error_CreatingHCPClient": {
|
||||
applyMocksAndAssertions: func(t *testing.T, mgr *MockManager, link *pbhcp.Link) {
|
||||
mgr.AssertNotCalled(t, "Start", mock.Anything)
|
||||
mgr.AssertNotCalled(t, "UpdateConfig", mock.Anything, mock.Anything)
|
||||
mgr.EXPECT().Stop().Return(nil).Once()
|
||||
},
|
||||
hcpClientFn: func(_ config.CloudConfig) (hcpclient.Client, error) {
|
||||
return nil, errors.New("could not create HCP client")
|
||||
},
|
||||
},
|
||||
// This should result in the HCP manager not being started
|
||||
"Error_LoadMgmtToken": {
|
||||
applyMocksAndAssertions: func(t *testing.T, mgr *MockManager, link *pbhcp.Link) {
|
||||
mgr.AssertNotCalled(t, "Start", mock.Anything)
|
||||
mgr.AssertNotCalled(t, "UpdateConfig", mock.Anything, mock.Anything)
|
||||
mgr.EXPECT().Stop().Return(nil).Once()
|
||||
},
|
||||
loadMgmtTokenFn: func(ctx context.Context, logger hclog.Logger, hcpClient hcpclient.Client, dataDir string) (string, error) {
|
||||
return "", errors.New("could not load management token")
|
||||
},
|
||||
},
|
||||
"Error_HCPManagerStart": {
|
||||
applyMocksAndAssertions: func(t *testing.T, mgr *MockManager, link *pbhcp.Link) {
|
||||
mgr.EXPECT().Start(mock.Anything).Return(errors.New("could not start HCP manager")).Once()
|
||||
mgr.EXPECT().UpdateConfig(mock.Anything, mock.Anything).Return().Once()
|
||||
mgr.EXPECT().Stop().Return(nil).Once()
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, test := range testCases {
|
||||
t.Run(name, func(t2 *testing.T) {
|
||||
mgr := NewMockManager(t2)
|
||||
|
||||
// Set up a link
|
||||
link := pbhcp.Link{
|
||||
ResourceId: "abc",
|
||||
ClientId: "def",
|
||||
ClientSecret: "ghi",
|
||||
AccessLevel: pbhcp.AccessLevel_ACCESS_LEVEL_GLOBAL_READ_WRITE,
|
||||
}
|
||||
|
||||
if test.mutateLink != nil {
|
||||
test.mutateLink(&link)
|
||||
}
|
||||
|
||||
linkResource, err := anypb.New(&link)
|
||||
require.NoError(t2, err)
|
||||
|
||||
if test.applyMocksAndAssertions != nil {
|
||||
test.applyMocksAndAssertions(t2, mgr, &link)
|
||||
}
|
||||
|
||||
testHcpClientFn := mockHcpClientFn
|
||||
if test.hcpClientFn != nil {
|
||||
testHcpClientFn = test.hcpClientFn
|
||||
}
|
||||
|
||||
testLoadMgmtToken := mockLoadMgmtTokenFn
|
||||
if test.loadMgmtTokenFn != nil {
|
||||
testLoadMgmtToken = test.loadMgmtTokenFn
|
||||
}
|
||||
|
||||
updateManagerLifecycle := HCPManagerLifecycleFn(
|
||||
mgr, testHcpClientFn,
|
||||
testLoadMgmtToken, existingCfg, dataDir,
|
||||
)
|
||||
|
||||
upsertEvent := &pbresource.WatchEvent_Upsert{
|
||||
Resource: &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
Name: "global",
|
||||
Type: pbhcp.LinkType,
|
||||
},
|
||||
Status: map[string]*pbresource.Status{
|
||||
hcpctl.StatusKey: {
|
||||
Conditions: []*pbresource.Condition{hcpctl.ConditionValidatedSuccess},
|
||||
},
|
||||
},
|
||||
Data: linkResource,
|
||||
},
|
||||
}
|
||||
if test.mutateUpsertEvent != nil {
|
||||
test.mutateUpsertEvent(upsertEvent)
|
||||
}
|
||||
|
||||
// Handle upsert event
|
||||
updateManagerLifecycle(ctx, logger, &pbresource.WatchEvent{
|
||||
Event: &pbresource.WatchEvent_Upsert_{
|
||||
Upsert: upsertEvent,
|
||||
},
|
||||
})
|
||||
|
||||
// Handle delete event. This should stop HCP manager
|
||||
updateManagerLifecycle(ctx, logger, &pbresource.WatchEvent{
|
||||
Event: &pbresource.WatchEvent_Delete_{
|
||||
Delete: &pbresource.WatchEvent_Delete{},
|
||||
},
|
||||
})
|
||||
|
||||
// Ensure hcp-config directory is removed
|
||||
file := filepath.Join(dataDir, constants.SubDir)
|
||||
if _, err := os.Stat(file); err == nil || !os.IsNotExist(err) {
|
||||
require.Fail(t2, "should have removed hcp-config directory")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -9,16 +9,78 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
|
||||
"github.com/hashicorp/consul/agent/hcp/config"
|
||||
"github.com/hashicorp/consul/agent/hcp/scada"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/protobuf/types/known/anypb"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
|
||||
"github.com/hashicorp/consul/agent/hcp/config"
|
||||
"github.com/hashicorp/consul/agent/hcp/scada"
|
||||
hcpctl "github.com/hashicorp/consul/internal/hcp"
|
||||
pbhcp "github.com/hashicorp/consul/proto-public/pbhcp/v2"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
)
|
||||
|
||||
func TestManager_MonitorHCPLink(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
logger := hclog.New(&hclog.LoggerOptions{Output: io.Discard})
|
||||
|
||||
mgr := NewManager(
|
||||
ManagerConfig{
|
||||
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
||||
},
|
||||
)
|
||||
mockHCPClient := hcpclient.NewMockClient(t)
|
||||
mockHcpClientFn := func(_ config.CloudConfig) (hcpclient.Client, error) {
|
||||
return mockHCPClient, nil
|
||||
}
|
||||
loadMgmtTokenFn := func(ctx context.Context, logger hclog.Logger, hcpClient hcpclient.Client, dataDir string) (string, error) {
|
||||
return "test-mgmt-token", nil
|
||||
}
|
||||
|
||||
require.False(t, mgr.isRunning())
|
||||
updateManagerLifecycle := HCPManagerLifecycleFn(
|
||||
mgr, mockHcpClientFn,
|
||||
loadMgmtTokenFn, config.CloudConfig{}, "",
|
||||
)
|
||||
|
||||
// Set up a link
|
||||
link := pbhcp.Link{
|
||||
ResourceId: "abc",
|
||||
ClientId: "def",
|
||||
ClientSecret: "ghi",
|
||||
AccessLevel: pbhcp.AccessLevel_ACCESS_LEVEL_GLOBAL_READ_WRITE,
|
||||
}
|
||||
linkResource, err := anypb.New(&link)
|
||||
require.NoError(t, err)
|
||||
updateManagerLifecycle(ctx, logger, &pbresource.WatchEvent{
|
||||
Event: &pbresource.WatchEvent_Upsert_{
|
||||
Upsert: &pbresource.WatchEvent_Upsert{
|
||||
Resource: &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
Name: "global",
|
||||
Type: pbhcp.LinkType,
|
||||
},
|
||||
Status: map[string]*pbresource.Status{
|
||||
hcpctl.StatusKey: {
|
||||
Conditions: []*pbresource.Condition{hcpctl.ConditionValidatedSuccess},
|
||||
},
|
||||
},
|
||||
Data: linkResource,
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
// Validate that the HCP manager is started
|
||||
require.True(t, mgr.isRunning())
|
||||
}
|
||||
|
||||
func TestManager_Start(t *testing.T) {
|
||||
client := hcpclient.NewMockClient(t)
|
||||
statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) {
|
||||
@ -47,18 +109,22 @@ func TestManager_Start(t *testing.T) {
|
||||
scadaM.EXPECT().Start().Return(nil)
|
||||
|
||||
telemetryM := NewMockTelemetryProvider(t)
|
||||
telemetryM.EXPECT().Start(mock.Anything, &HCPProviderCfg{
|
||||
HCPClient: client,
|
||||
HCPConfig: &cloudCfg,
|
||||
}).Return(nil).Once()
|
||||
telemetryM.EXPECT().Start(
|
||||
mock.Anything, &HCPProviderCfg{
|
||||
HCPClient: client,
|
||||
HCPConfig: &cloudCfg,
|
||||
},
|
||||
).Return(nil).Once()
|
||||
|
||||
mgr := NewManager(ManagerConfig{
|
||||
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
||||
StatusFn: statusF,
|
||||
ManagementTokenUpserterFn: upsertManagementTokenF,
|
||||
SCADAProvider: scadaM,
|
||||
TelemetryProvider: telemetryM,
|
||||
})
|
||||
mgr := NewManager(
|
||||
ManagerConfig{
|
||||
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
||||
StatusFn: statusF,
|
||||
ManagementTokenUpserterFn: upsertManagementTokenF,
|
||||
SCADAProvider: scadaM,
|
||||
TelemetryProvider: telemetryM,
|
||||
},
|
||||
)
|
||||
mgr.testUpdateSent = updateCh
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
@ -97,10 +163,12 @@ func TestManager_StartMultipleTimes(t *testing.T) {
|
||||
ManagementToken: "fake-token",
|
||||
}
|
||||
|
||||
mgr := NewManager(ManagerConfig{
|
||||
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
||||
StatusFn: statusF,
|
||||
})
|
||||
mgr := NewManager(
|
||||
ManagerConfig{
|
||||
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
||||
StatusFn: statusF,
|
||||
},
|
||||
)
|
||||
|
||||
mgr.testUpdateSent = updateCh
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@ -144,12 +212,14 @@ func TestManager_UpdateConfig(t *testing.T) {
|
||||
NodeID: "node-1",
|
||||
}
|
||||
|
||||
mgr := NewManager(ManagerConfig{
|
||||
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
||||
StatusFn: statusF,
|
||||
CloudConfig: cloudCfg,
|
||||
Client: client,
|
||||
})
|
||||
mgr := NewManager(
|
||||
ManagerConfig{
|
||||
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
||||
StatusFn: statusF,
|
||||
CloudConfig: cloudCfg,
|
||||
Client: client,
|
||||
},
|
||||
)
|
||||
|
||||
mgr.testUpdateSent = updateCh
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@ -203,11 +273,13 @@ func TestManager_SendUpdate(t *testing.T) {
|
||||
|
||||
// Expect two calls, once during run startup and again when SendUpdate is called
|
||||
client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Twice()
|
||||
mgr := NewManager(ManagerConfig{
|
||||
Client: client,
|
||||
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
||||
StatusFn: statusF,
|
||||
})
|
||||
mgr := NewManager(
|
||||
ManagerConfig{
|
||||
Client: client,
|
||||
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
||||
StatusFn: statusF,
|
||||
},
|
||||
)
|
||||
mgr.testUpdateSent = updateCh
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@ -237,13 +309,15 @@ func TestManager_SendUpdate_Periodic(t *testing.T) {
|
||||
|
||||
// Expect two calls, once during run startup and again when SendUpdate is called
|
||||
client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Twice()
|
||||
mgr := NewManager(ManagerConfig{
|
||||
Client: client,
|
||||
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
||||
StatusFn: statusF,
|
||||
MaxInterval: time.Second,
|
||||
MinInterval: 100 * time.Millisecond,
|
||||
})
|
||||
mgr := NewManager(
|
||||
ManagerConfig{
|
||||
Client: client,
|
||||
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
|
||||
StatusFn: statusF,
|
||||
MaxInterval: time.Second,
|
||||
MinInterval: 100 * time.Millisecond,
|
||||
},
|
||||
)
|
||||
mgr.testUpdateSent = updateCh
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@ -305,18 +379,20 @@ func TestManager_Stop(t *testing.T) {
|
||||
telemetryM.EXPECT().Stop().Return().Once()
|
||||
|
||||
// Configure manager with all its dependencies
|
||||
mgr := NewManager(ManagerConfig{
|
||||
Logger: testutil.Logger(t),
|
||||
StatusFn: statusF,
|
||||
Client: client,
|
||||
ManagementTokenUpserterFn: upsertManagementTokenF,
|
||||
ManagementTokenDeleterFn: deleteManagementTokenF,
|
||||
SCADAProvider: scadaM,
|
||||
TelemetryProvider: telemetryM,
|
||||
CloudConfig: config.CloudConfig{
|
||||
ManagementToken: token,
|
||||
mgr := NewManager(
|
||||
ManagerConfig{
|
||||
Logger: testutil.Logger(t),
|
||||
StatusFn: statusF,
|
||||
Client: client,
|
||||
ManagementTokenUpserterFn: upsertManagementTokenF,
|
||||
ManagementTokenDeleterFn: deleteManagementTokenF,
|
||||
SCADAProvider: scadaM,
|
||||
TelemetryProvider: telemetryM,
|
||||
CloudConfig: config.CloudConfig{
|
||||
ManagementToken: token,
|
||||
},
|
||||
},
|
||||
})
|
||||
)
|
||||
mgr.testUpdateSent = updateCh
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
@ -6,6 +6,7 @@ package hcp
|
||||
import (
|
||||
"github.com/hashicorp/consul/internal/controller"
|
||||
"github.com/hashicorp/consul/internal/hcp/internal/controllers"
|
||||
"github.com/hashicorp/consul/internal/hcp/internal/controllers/link"
|
||||
"github.com/hashicorp/consul/internal/hcp/internal/types"
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
)
|
||||
@ -18,8 +19,16 @@ func RegisterTypes(r resource.Registry) {
|
||||
|
||||
type ControllerDependencies = controllers.Dependencies
|
||||
|
||||
var IsValidated = link.IsValidated
|
||||
var LinkName = types.LinkName
|
||||
|
||||
// RegisterControllers registers controllers for the catalog types with
|
||||
// the given controller Manager.
|
||||
func RegisterControllers(mgr *controller.Manager, deps ControllerDependencies) {
|
||||
controllers.Register(mgr, deps)
|
||||
}
|
||||
|
||||
// Needed for testing
|
||||
var StatusKey = link.StatusKey
|
||||
var ConditionValidatedSuccess = link.ConditionValidatedSuccess
|
||||
var ConditionValidatedFailed = link.ConditionValidatedFailed
|
||||
|
@ -8,18 +8,16 @@ import (
|
||||
"crypto/tls"
|
||||
"strings"
|
||||
|
||||
gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/anypb"
|
||||
|
||||
"github.com/hashicorp/consul/agent/hcp"
|
||||
"github.com/hashicorp/consul/agent/hcp/bootstrap"
|
||||
gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models"
|
||||
|
||||
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
|
||||
"github.com/hashicorp/consul/agent/hcp/config"
|
||||
"github.com/hashicorp/consul/internal/controller"
|
||||
"github.com/hashicorp/consul/internal/hcp/internal/types"
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
"github.com/hashicorp/consul/internal/storage"
|
||||
pbhcp "github.com/hashicorp/consul/proto-public/pbhcp/v2"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
@ -44,34 +42,28 @@ func LinkController(
|
||||
hcpAllowV2ResourceApis bool,
|
||||
hcpClientFn HCPClientFn,
|
||||
cfg config.CloudConfig,
|
||||
dataDir string,
|
||||
hcpManager hcp.Manager,
|
||||
) *controller.Controller {
|
||||
return controller.NewController("link", pbhcp.LinkType).
|
||||
// Placement is configured to each server so that the HCP manager is started
|
||||
// on each server. We plan to implement an alternative strategy to starting
|
||||
// the HCP manager so that the controller placement will eventually only be
|
||||
// on the leader.
|
||||
// https://hashicorp.atlassian.net/browse/CC-7364
|
||||
WithPlacement(controller.PlacementEachServer).
|
||||
WithInitializer(&linkInitializer{
|
||||
cloudConfig: cfg,
|
||||
}).
|
||||
WithReconciler(&linkReconciler{
|
||||
resourceApisEnabled: resourceApisEnabled,
|
||||
hcpAllowV2ResourceApis: hcpAllowV2ResourceApis,
|
||||
hcpClientFn: hcpClientFn,
|
||||
dataDir: dataDir,
|
||||
hcpManager: hcpManager,
|
||||
})
|
||||
WithInitializer(
|
||||
&linkInitializer{
|
||||
cloudConfig: cfg,
|
||||
},
|
||||
).
|
||||
WithReconciler(
|
||||
&linkReconciler{
|
||||
resourceApisEnabled: resourceApisEnabled,
|
||||
hcpAllowV2ResourceApis: hcpAllowV2ResourceApis,
|
||||
hcpClientFn: hcpClientFn,
|
||||
cloudConfig: cfg,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
type linkReconciler struct {
|
||||
resourceApisEnabled bool
|
||||
hcpAllowV2ResourceApis bool
|
||||
hcpClientFn HCPClientFn
|
||||
dataDir string
|
||||
hcpManager hcp.Manager
|
||||
cloudConfig config.CloudConfig
|
||||
}
|
||||
|
||||
func hcpAccessLevelToConsul(level *gnmmod.HashicorpCloudGlobalNetworkManager20220215ClusterConsulAccessLevel) pbhcp.AccessLevel {
|
||||
@ -102,7 +94,7 @@ func (r *linkReconciler) Reconcile(ctx context.Context, rt controller.Runtime, r
|
||||
switch {
|
||||
case status.Code(err) == codes.NotFound:
|
||||
rt.Logger.Trace("link has been deleted")
|
||||
return cleanup(rt, r.hcpManager, r.dataDir)
|
||||
return nil
|
||||
case err != nil:
|
||||
rt.Logger.Error("the resource service has returned an unexpected error", "error", err)
|
||||
return err
|
||||
@ -115,43 +107,25 @@ func (r *linkReconciler) Reconcile(ctx context.Context, rt controller.Runtime, r
|
||||
return err
|
||||
}
|
||||
|
||||
if err = addFinalizer(ctx, rt, res); err != nil {
|
||||
rt.Logger.Error("error adding finalizer to link resource", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if resource.IsMarkedForDeletion(res) {
|
||||
if err = cleanup(rt, r.hcpManager, r.dataDir); err != nil {
|
||||
rt.Logger.Error("error cleaning up link resource", "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
err := ensureDeleted(ctx, rt, res)
|
||||
if err != nil {
|
||||
rt.Logger.Error("error deleting link resource", "error", err)
|
||||
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Validation - Ensure V2 Resource APIs are not enabled unless hcpAllowV2ResourceApis flag is set
|
||||
var newStatus *pbresource.Status
|
||||
newStatus := &pbresource.Status{
|
||||
ObservedGeneration: res.Generation,
|
||||
Conditions: []*pbresource.Condition{},
|
||||
}
|
||||
defer writeStatusIfNotEqual(ctx, rt, res, newStatus)
|
||||
if r.resourceApisEnabled && !r.hcpAllowV2ResourceApis {
|
||||
newStatus = &pbresource.Status{
|
||||
ObservedGeneration: res.Generation,
|
||||
Conditions: []*pbresource.Condition{ConditionDisabled},
|
||||
}
|
||||
return writeStatusIfNotEqual(ctx, rt, res, newStatus)
|
||||
newStatus.Conditions = append(newStatus.Conditions, ConditionValidatedFailed)
|
||||
return nil
|
||||
} else {
|
||||
newStatus.Conditions = append(newStatus.Conditions, ConditionValidatedSuccess)
|
||||
}
|
||||
|
||||
// Merge the link data with the existing cloud config so that we only overwrite the
|
||||
// fields that are provided by the link. This ensures that:
|
||||
// 1. The HCP configuration (i.e., how to connect to HCP) is preserved
|
||||
// 2. The Consul agent's node ID and node name are preserved
|
||||
existingCfg := r.hcpManager.GetCloudConfig()
|
||||
newCfg := CloudConfigFromLink(&link)
|
||||
cfg := config.Merge(existingCfg, newCfg)
|
||||
cfg := config.Merge(r.cloudConfig, newCfg)
|
||||
hcpClient, err := r.hcpClientFn(cfg)
|
||||
if err != nil {
|
||||
rt.Logger.Error("error creating HCP client", "error", err)
|
||||
@ -162,7 +136,8 @@ func (r *linkReconciler) Reconcile(ctx context.Context, rt controller.Runtime, r
|
||||
cluster, err := hcpClient.GetCluster(ctx)
|
||||
if err != nil {
|
||||
rt.Logger.Error("error querying HCP for cluster", "error", err)
|
||||
linkingFailed(ctx, rt, res, err)
|
||||
condition := linkingFailedCondition(err)
|
||||
newStatus.Conditions = append(newStatus.Conditions, condition)
|
||||
return err
|
||||
}
|
||||
accessLevel := hcpAccessLevelToConsul(cluster.AccessLevel)
|
||||
@ -178,47 +153,23 @@ func (r *linkReconciler) Reconcile(ctx context.Context, rt controller.Runtime, r
|
||||
rt.Logger.Error("error marshalling link data", "error", err)
|
||||
return err
|
||||
}
|
||||
_, err = rt.Client.Write(ctx, &pbresource.WriteRequest{Resource: &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
Name: types.LinkName,
|
||||
Type: pbhcp.LinkType,
|
||||
},
|
||||
Metadata: res.Metadata,
|
||||
Data: updatedData,
|
||||
}})
|
||||
_, err = rt.Client.Write(
|
||||
ctx, &pbresource.WriteRequest{Resource: &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
Name: types.LinkName,
|
||||
Type: pbhcp.LinkType,
|
||||
},
|
||||
Metadata: res.Metadata,
|
||||
Data: updatedData,
|
||||
}},
|
||||
)
|
||||
if err != nil {
|
||||
rt.Logger.Error("error updating link", "error", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Load the management token if access is not set to read-only. Read-only clusters
|
||||
// will not have a management token provided by HCP.
|
||||
var token string
|
||||
if accessLevel != pbhcp.AccessLevel_ACCESS_LEVEL_GLOBAL_READ_ONLY {
|
||||
token, err = bootstrap.LoadManagementToken(ctx, rt.Logger, hcpClient, r.dataDir)
|
||||
if err != nil {
|
||||
linkingFailed(ctx, rt, res, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Update the HCP manager configuration with the link values
|
||||
cfg.ManagementToken = token
|
||||
r.hcpManager.UpdateConfig(hcpClient, cfg)
|
||||
|
||||
// Start the manager
|
||||
err = r.hcpManager.Start(ctx)
|
||||
if err != nil {
|
||||
rt.Logger.Error("error starting HCP manager", "error", err)
|
||||
linkingFailed(ctx, rt, res, err)
|
||||
return err
|
||||
}
|
||||
|
||||
newStatus = &pbresource.Status{
|
||||
ObservedGeneration: res.Generation,
|
||||
Conditions: []*pbresource.Condition{ConditionLinked(link.ResourceId)},
|
||||
}
|
||||
newStatus.Conditions = append(newStatus.Conditions, ConditionLinked(link.ResourceId))
|
||||
|
||||
return writeStatusIfNotEqual(ctx, rt, res, newStatus)
|
||||
}
|
||||
@ -233,17 +184,20 @@ func (i *linkInitializer) Initialize(ctx context.Context, rt controller.Runtime)
|
||||
}
|
||||
|
||||
// Construct a link resource to reflect the configuration
|
||||
data, err := anypb.New(&pbhcp.Link{
|
||||
ResourceId: i.cloudConfig.ResourceID,
|
||||
ClientId: i.cloudConfig.ClientID,
|
||||
ClientSecret: i.cloudConfig.ClientSecret,
|
||||
})
|
||||
data, err := anypb.New(
|
||||
&pbhcp.Link{
|
||||
ResourceId: i.cloudConfig.ResourceID,
|
||||
ClientId: i.cloudConfig.ClientID,
|
||||
ClientSecret: i.cloudConfig.ClientSecret,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create the link resource for a configuration-based link
|
||||
_, err = rt.Client.Write(ctx,
|
||||
_, err = rt.Client.Write(
|
||||
ctx,
|
||||
&pbresource.WriteRequest{
|
||||
Resource: &pbresource.Resource{
|
||||
Id: &pbresource.ID{
|
||||
|
@ -6,29 +6,23 @@ package link
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/go-uuid"
|
||||
gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models"
|
||||
|
||||
svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing"
|
||||
"github.com/hashicorp/consul/agent/hcp"
|
||||
"github.com/hashicorp/consul/agent/hcp/bootstrap"
|
||||
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
|
||||
"github.com/hashicorp/consul/agent/hcp/config"
|
||||
"github.com/hashicorp/consul/internal/controller"
|
||||
"github.com/hashicorp/consul/internal/hcp/internal/types"
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
rtest "github.com/hashicorp/consul/internal/resource/resourcetest"
|
||||
pbhcp "github.com/hashicorp/consul/proto-public/pbhcp/v2"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
)
|
||||
|
||||
type controllerSuite struct {
|
||||
@ -39,7 +33,6 @@ type controllerSuite struct {
|
||||
rt controller.Runtime
|
||||
|
||||
tenancies []*pbresource.Tenancy
|
||||
dataDir string
|
||||
}
|
||||
|
||||
func mockHcpClientFn(t *testing.T) (*hcpclient.MockClient, HCPClientFn) {
|
||||
@ -74,17 +67,6 @@ func TestLinkController(t *testing.T) {
|
||||
func (suite *controllerSuite) deleteResourceFunc(id *pbresource.ID) func() {
|
||||
return func() {
|
||||
suite.client.MustDelete(suite.T(), id)
|
||||
|
||||
// Ensure hcp-config directory is removed
|
||||
retry.Run(suite.T(), func(r *retry.R) {
|
||||
if suite.dataDir != "" {
|
||||
file := filepath.Join(suite.dataDir, bootstrap.SubDir)
|
||||
if _, err := os.Stat(file); !os.IsNotExist(err) {
|
||||
r.Fatalf("should have removed hcp-config directory")
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
suite.client.WaitForDeletion(suite.T(), id)
|
||||
}
|
||||
}
|
||||
@ -99,29 +81,11 @@ func (suite *controllerSuite) TestController_Ok() {
|
||||
AccessLevel: &readWrite,
|
||||
}, nil)
|
||||
|
||||
token, err := uuid.GenerateUUID()
|
||||
require.NoError(suite.T(), err)
|
||||
mockClient.EXPECT().FetchBootstrap(mock.Anything).
|
||||
Return(&hcpclient.BootstrapConfig{
|
||||
ManagementToken: token,
|
||||
ConsulConfig: "{}",
|
||||
}, nil).Once()
|
||||
|
||||
hcpMgr := hcp.NewMockManager(suite.T())
|
||||
hcpMgr.EXPECT().GetCloudConfig().Return(config.CloudConfig{})
|
||||
hcpMgr.EXPECT().UpdateConfig(mock.Anything, mock.Anything)
|
||||
hcpMgr.EXPECT().Start(mock.Anything).Return(nil)
|
||||
hcpMgr.EXPECT().Stop().Return(nil)
|
||||
|
||||
dataDir := testutil.TempDir(suite.T(), "test-link-controller")
|
||||
suite.dataDir = dataDir
|
||||
mgr.Register(LinkController(
|
||||
false,
|
||||
false,
|
||||
mockClientFn,
|
||||
config.CloudConfig{},
|
||||
dataDir,
|
||||
hcpMgr,
|
||||
))
|
||||
mgr.SetRaftLeader(true)
|
||||
go mgr.Run(suite.ctx)
|
||||
@ -138,11 +102,6 @@ func (suite *controllerSuite) TestController_Ok() {
|
||||
|
||||
suite.T().Cleanup(suite.deleteResourceFunc(link.Id))
|
||||
|
||||
// Ensure finalizer was added
|
||||
suite.client.WaitForResourceState(suite.T(), link.Id, func(t rtest.T, res *pbresource.Resource) {
|
||||
require.True(t, resource.HasFinalizer(res, StatusKey), "link resource does not have finalizer")
|
||||
})
|
||||
|
||||
suite.client.WaitForStatusCondition(suite.T(), link.Id, StatusKey, ConditionLinked(linkData.ResourceId))
|
||||
var updatedLink pbhcp.Link
|
||||
updatedLinkResource := suite.client.WaitForNewVersion(suite.T(), link.Id, link.Version)
|
||||
@ -168,22 +127,11 @@ func (suite *controllerSuite) TestController_Initialize() {
|
||||
ResourceID: types.GenerateTestResourceID(suite.T()),
|
||||
}
|
||||
|
||||
hcpMgr := hcp.NewMockManager(suite.T())
|
||||
hcpMgr.EXPECT().GetCloudConfig().Return(cloudCfg)
|
||||
hcpMgr.EXPECT().UpdateConfig(mock.Anything, mock.Anything)
|
||||
hcpMgr.EXPECT().Start(mock.Anything).Return(nil)
|
||||
hcpMgr.EXPECT().Stop().Return(nil)
|
||||
|
||||
dataDir := testutil.TempDir(suite.T(), "test-link-controller")
|
||||
suite.dataDir = dataDir
|
||||
|
||||
mgr.Register(LinkController(
|
||||
false,
|
||||
false,
|
||||
mockClientFn,
|
||||
cloudCfg,
|
||||
dataDir,
|
||||
hcpMgr,
|
||||
))
|
||||
mgr.SetRaftLeader(true)
|
||||
go mgr.Run(suite.ctx)
|
||||
@ -214,18 +162,12 @@ func (suite *controllerSuite) TestControllerResourceApisEnabled_LinkDisabled() {
|
||||
// Run the controller manager
|
||||
mgr := controller.NewManager(suite.client, suite.rt.Logger)
|
||||
_, mockClientFunc := mockHcpClientFn(suite.T())
|
||||
dataDir := testutil.TempDir(suite.T(), "test-link-controller")
|
||||
suite.dataDir = dataDir
|
||||
|
||||
hcpMgr := hcp.NewMockManager(suite.T())
|
||||
hcpMgr.EXPECT().Stop().Return(nil)
|
||||
mgr.Register(LinkController(
|
||||
true,
|
||||
false,
|
||||
mockClientFunc,
|
||||
config.CloudConfig{},
|
||||
dataDir,
|
||||
hcpMgr,
|
||||
))
|
||||
mgr.SetRaftLeader(true)
|
||||
go mgr.Run(suite.ctx)
|
||||
@ -241,7 +183,7 @@ func (suite *controllerSuite) TestControllerResourceApisEnabled_LinkDisabled() {
|
||||
|
||||
suite.T().Cleanup(suite.deleteResourceFunc(link.Id))
|
||||
|
||||
suite.client.WaitForStatusCondition(suite.T(), link.Id, StatusKey, ConditionDisabled)
|
||||
suite.client.WaitForStatusCondition(suite.T(), link.Id, StatusKey, ConditionValidatedFailed)
|
||||
}
|
||||
|
||||
func (suite *controllerSuite) TestControllerResourceApisEnabledWithOverride_LinkNotDisabled() {
|
||||
@ -252,29 +194,11 @@ func (suite *controllerSuite) TestControllerResourceApisEnabledWithOverride_Link
|
||||
HCPPortalURL: "http://test.com",
|
||||
}, nil)
|
||||
|
||||
token, err := uuid.GenerateUUID()
|
||||
require.NoError(suite.T(), err)
|
||||
mockClient.EXPECT().FetchBootstrap(mock.Anything).
|
||||
Return(&hcpclient.BootstrapConfig{
|
||||
ManagementToken: token,
|
||||
ConsulConfig: "{}",
|
||||
}, nil).Once()
|
||||
|
||||
dataDir := testutil.TempDir(suite.T(), "test-link-controller")
|
||||
suite.dataDir = dataDir
|
||||
hcpMgr := hcp.NewMockManager(suite.T())
|
||||
hcpMgr.EXPECT().GetCloudConfig().Return(config.CloudConfig{})
|
||||
hcpMgr.EXPECT().UpdateConfig(mock.Anything, mock.Anything)
|
||||
hcpMgr.EXPECT().Start(mock.Anything).Return(nil)
|
||||
hcpMgr.EXPECT().Stop().Return(nil)
|
||||
|
||||
mgr.Register(LinkController(
|
||||
true,
|
||||
true,
|
||||
mockClientFunc,
|
||||
config.CloudConfig{},
|
||||
dataDir,
|
||||
hcpMgr,
|
||||
))
|
||||
|
||||
mgr.SetRaftLeader(true)
|
||||
@ -292,6 +216,7 @@ func (suite *controllerSuite) TestControllerResourceApisEnabledWithOverride_Link
|
||||
suite.T().Cleanup(suite.deleteResourceFunc(link.Id))
|
||||
|
||||
suite.client.WaitForStatusCondition(suite.T(), link.Id, StatusKey, ConditionLinked(linkData.ResourceId))
|
||||
suite.client.WaitForStatusCondition(suite.T(), link.Id, StatusKey, ConditionValidatedSuccess)
|
||||
}
|
||||
|
||||
func (suite *controllerSuite) TestController_GetClusterError() {
|
||||
@ -321,20 +246,11 @@ func (suite *controllerSuite) TestController_GetClusterError() {
|
||||
mockClient, mockClientFunc := mockHcpClientFn(t)
|
||||
mockClient.EXPECT().GetCluster(mock.Anything).Return(nil, tc.expectErr)
|
||||
|
||||
dataDir := testutil.TempDir(t, "test-link-controller")
|
||||
suite.dataDir = dataDir
|
||||
|
||||
hcpMgr := hcp.NewMockManager(t)
|
||||
hcpMgr.EXPECT().GetCloudConfig().Return(config.CloudConfig{})
|
||||
hcpMgr.EXPECT().Stop().Return(nil)
|
||||
|
||||
mgr.Register(LinkController(
|
||||
true,
|
||||
true,
|
||||
mockClientFunc,
|
||||
config.CloudConfig{},
|
||||
dataDir,
|
||||
hcpMgr,
|
||||
))
|
||||
|
||||
mgr.SetRaftLeader(true)
|
||||
|
@ -1,75 +0,0 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package link
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/hashicorp/consul/agent/hcp"
|
||||
"github.com/hashicorp/consul/agent/hcp/bootstrap"
|
||||
"github.com/hashicorp/consul/internal/controller"
|
||||
"github.com/hashicorp/consul/internal/resource"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
func cleanup(rt controller.Runtime, hcpManager hcp.Manager, dataDir string) error {
|
||||
rt.Logger.Trace("cleaning up link resource")
|
||||
|
||||
rt.Logger.Debug("stopping HCP manager")
|
||||
hcpManager.Stop()
|
||||
|
||||
if dataDir != "" {
|
||||
hcpConfigDir := filepath.Join(dataDir, bootstrap.SubDir)
|
||||
rt.Logger.Debug("deleting hcp-config dir", "dir", hcpConfigDir)
|
||||
err := os.RemoveAll(hcpConfigDir)
|
||||
if err != nil {
|
||||
rt.Logger.Error("failed to delete hcp-config dir", "dir", hcpConfigDir, "err", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func addFinalizer(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) error {
|
||||
// The statusKey doubles as the finalizer name for the link resource
|
||||
if resource.HasFinalizer(res, StatusKey) {
|
||||
rt.Logger.Trace("already has finalizer")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Finalizer hasn't been written, so add it.
|
||||
resource.AddFinalizer(res, StatusKey)
|
||||
_, err := rt.Client.Write(ctx, &pbresource.WriteRequest{Resource: res})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rt.Logger.Trace("added finalizer")
|
||||
return err
|
||||
}
|
||||
|
||||
// ensureDeleted makes sure a link is finally deleted
|
||||
func ensureDeleted(ctx context.Context, rt controller.Runtime, res *pbresource.Resource) error {
|
||||
// Remove finalizer if present
|
||||
if resource.HasFinalizer(res, StatusKey) {
|
||||
resource.RemoveFinalizer(res, StatusKey)
|
||||
_, err := rt.Client.Write(ctx, &pbresource.WriteRequest{Resource: res})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rt.Logger.Trace("removed finalizer")
|
||||
}
|
||||
|
||||
// Finally, delete the link
|
||||
_, err := rt.Client.Delete(ctx, &pbresource.DeleteRequest{Id: res.Id})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Success
|
||||
rt.Logger.Trace("finally deleted")
|
||||
return nil
|
||||
}
|
@ -18,52 +18,71 @@ import (
|
||||
const (
|
||||
StatusKey = "consul.io/hcp/link"
|
||||
|
||||
StatusLinked = "linked"
|
||||
LinkedReason = "SUCCESS"
|
||||
FailedReason = "FAILED"
|
||||
DisabledReasonV2ResourcesUnsupported = "DISABLED_V2_RESOURCES_UNSUPPORTED"
|
||||
UnauthorizedReason = "UNAUTHORIZED"
|
||||
ForbiddenReason = "FORBIDDEN"
|
||||
// Statuses
|
||||
StatusLinked = "linked"
|
||||
StatusValidated = "validated"
|
||||
|
||||
LinkedSuccessReason = "SUCCESS"
|
||||
LinkedFailedReason = "FAILED"
|
||||
LinkedDisabledReasonV2ResourcesUnsupportedReason = "DISABLED_V2_RESOURCES_UNSUPPORTED"
|
||||
LinkedUnauthorizedReason = "UNAUTHORIZED"
|
||||
LinkedForbiddenReason = "FORBIDDEN"
|
||||
ValidatedSuccessReason = "SUCCESS"
|
||||
ValidatedFailedV2ResourcesReason = "V2_RESOURCES_UNSUPPORTED"
|
||||
|
||||
LinkedMessageFormat = "Successfully linked to cluster '%s'"
|
||||
FailedMessage = "Failed to link to HCP due to unexpected error"
|
||||
DisabledResourceAPIsEnabledMessage = "Link is disabled because resource-apis are enabled"
|
||||
UnauthorizedMessage = "Access denied, check client_id and client_secret"
|
||||
ForbiddenMessage = "Access denied, check the resource_id"
|
||||
ValidatedSuccessMessage = "Successfully validated link"
|
||||
ValidatedFailedV2ResourcesMessage = "Link is disabled because resource-apis are enabled"
|
||||
)
|
||||
|
||||
var (
|
||||
ConditionDisabled = &pbresource.Condition{
|
||||
Type: StatusLinked,
|
||||
State: pbresource.Condition_STATE_FALSE,
|
||||
Reason: DisabledReasonV2ResourcesUnsupported,
|
||||
Reason: LinkedDisabledReasonV2ResourcesUnsupportedReason,
|
||||
Message: DisabledResourceAPIsEnabledMessage,
|
||||
}
|
||||
ConditionFailed = &pbresource.Condition{
|
||||
Type: StatusLinked,
|
||||
State: pbresource.Condition_STATE_FALSE,
|
||||
Reason: FailedReason,
|
||||
Reason: LinkedFailedReason,
|
||||
Message: FailedMessage,
|
||||
}
|
||||
ConditionUnauthorized = &pbresource.Condition{
|
||||
Type: StatusLinked,
|
||||
State: pbresource.Condition_STATE_FALSE,
|
||||
Reason: UnauthorizedReason,
|
||||
Reason: LinkedUnauthorizedReason,
|
||||
Message: UnauthorizedMessage,
|
||||
}
|
||||
ConditionForbidden = &pbresource.Condition{
|
||||
Type: StatusLinked,
|
||||
State: pbresource.Condition_STATE_FALSE,
|
||||
Reason: ForbiddenReason,
|
||||
Reason: LinkedForbiddenReason,
|
||||
Message: ForbiddenMessage,
|
||||
}
|
||||
ConditionValidatedSuccess = &pbresource.Condition{
|
||||
Type: StatusValidated,
|
||||
State: pbresource.Condition_STATE_TRUE,
|
||||
Reason: ValidatedSuccessReason,
|
||||
Message: ValidatedSuccessMessage,
|
||||
}
|
||||
ConditionValidatedFailed = &pbresource.Condition{
|
||||
Type: StatusValidated,
|
||||
State: pbresource.Condition_STATE_FALSE,
|
||||
Reason: ValidatedFailedV2ResourcesReason,
|
||||
Message: ValidatedFailedV2ResourcesMessage,
|
||||
}
|
||||
)
|
||||
|
||||
func ConditionLinked(resourceId string) *pbresource.Condition {
|
||||
return &pbresource.Condition{
|
||||
Type: StatusLinked,
|
||||
State: pbresource.Condition_STATE_TRUE,
|
||||
Reason: LinkedReason,
|
||||
Reason: LinkedSuccessReason,
|
||||
Message: fmt.Sprintf(LinkedMessageFormat, resourceId),
|
||||
}
|
||||
}
|
||||
@ -72,39 +91,39 @@ func writeStatusIfNotEqual(ctx context.Context, rt controller.Runtime, res *pbre
|
||||
if resource.EqualStatus(res.Status[StatusKey], status, false) {
|
||||
return nil
|
||||
}
|
||||
_, err := rt.Client.WriteStatus(ctx, &pbresource.WriteStatusRequest{
|
||||
Id: res.Id,
|
||||
Key: StatusKey,
|
||||
Status: status,
|
||||
})
|
||||
_, err := rt.Client.WriteStatus(
|
||||
ctx, &pbresource.WriteStatusRequest{
|
||||
Id: res.Id,
|
||||
Key: StatusKey,
|
||||
Status: status,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
rt.Logger.Error("error writing link status", "error", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func linkingFailed(ctx context.Context, rt controller.Runtime, res *pbresource.Resource, err error) error {
|
||||
var condition *pbresource.Condition
|
||||
func linkingFailedCondition(err error) *pbresource.Condition {
|
||||
switch {
|
||||
case errors.Is(err, client.ErrUnauthorized):
|
||||
condition = ConditionUnauthorized
|
||||
return ConditionUnauthorized
|
||||
case errors.Is(err, client.ErrForbidden):
|
||||
condition = ConditionForbidden
|
||||
return ConditionForbidden
|
||||
default:
|
||||
condition = ConditionFailed
|
||||
return ConditionFailed
|
||||
}
|
||||
newStatus := &pbresource.Status{
|
||||
ObservedGeneration: res.Generation,
|
||||
Conditions: []*pbresource.Condition{condition},
|
||||
}
|
||||
|
||||
writeErr := writeStatusIfNotEqual(ctx, rt, res, newStatus)
|
||||
if writeErr != nil {
|
||||
rt.Logger.Error("error writing status", "error", writeErr)
|
||||
return writeErr
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func IsLinked(res *pbresource.Resource) (linked bool, reason string) {
|
||||
return isConditionTrue(res, StatusLinked)
|
||||
}
|
||||
|
||||
func IsValidated(res *pbresource.Resource) (linked bool, reason string) {
|
||||
return isConditionTrue(res, StatusValidated)
|
||||
}
|
||||
|
||||
func isConditionTrue(res *pbresource.Resource, statusType string) (bool, string) {
|
||||
if !resource.EqualType(res.GetId().GetType(), pbhcp.LinkType) {
|
||||
return false, "resource is not hcp.Link type"
|
||||
}
|
||||
@ -115,9 +134,9 @@ func IsLinked(res *pbresource.Resource) (linked bool, reason string) {
|
||||
}
|
||||
|
||||
for _, cond := range linkStatus.GetConditions() {
|
||||
if cond.Type == StatusLinked && cond.GetState() == pbresource.Condition_STATE_TRUE {
|
||||
if cond.Type == statusType && cond.GetState() == pbresource.Condition_STATE_TRUE {
|
||||
return true, ""
|
||||
}
|
||||
}
|
||||
return false, "link status does not include positive linked condition"
|
||||
return false, fmt.Sprintf("link status does not include positive %s condition", statusType)
|
||||
}
|
||||
|
@ -4,7 +4,6 @@
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/agent/hcp"
|
||||
"github.com/hashicorp/consul/agent/hcp/config"
|
||||
"github.com/hashicorp/consul/internal/controller"
|
||||
"github.com/hashicorp/consul/internal/hcp/internal/controllers/link"
|
||||
@ -15,19 +14,17 @@ type Dependencies struct {
|
||||
CloudConfig config.CloudConfig
|
||||
ResourceApisEnabled bool
|
||||
HCPAllowV2ResourceApis bool
|
||||
DataDir string
|
||||
HCPManager *hcp.HCPManager
|
||||
}
|
||||
|
||||
func Register(mgr *controller.Manager, deps Dependencies) {
|
||||
mgr.Register(link.LinkController(
|
||||
deps.ResourceApisEnabled,
|
||||
deps.HCPAllowV2ResourceApis,
|
||||
link.DefaultHCPClientFn,
|
||||
deps.CloudConfig,
|
||||
deps.DataDir,
|
||||
deps.HCPManager,
|
||||
))
|
||||
mgr.Register(
|
||||
link.LinkController(
|
||||
deps.ResourceApisEnabled,
|
||||
deps.HCPAllowV2ResourceApis,
|
||||
link.DefaultHCPClientFn,
|
||||
deps.CloudConfig,
|
||||
),
|
||||
)
|
||||
|
||||
mgr.Register(telemetrystate.TelemetryStateController(link.DefaultHCPClientFn))
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user