From b0e87dbe13d98aa617ccf58f844008c7d1bc1519 Mon Sep 17 00:00:00 2001 From: Melissa Kam <3768460+mkam@users.noreply.github.com> Date: Tue, 30 Jan 2024 09:40:36 -0600 Subject: [PATCH] [CC-7049] Stop the HCP manager when link is deleted (#20351) * Add Stop method to telemetry provider Stop the main loop of the provider and set the config to disabled. * Add interface for telemetry provider Added for easier testing. Also renamed Run to Start, which better fits with Stop. * Add Stop method to HCP manager * Add manager interface, rename implementation Add interface for easier testing, rename existing Manager to HCPManager. * Stop HCP manager in link Finalizer * Attempt to cleanup if resource has been deleted The link should be cleaned up by the finalizer, but there's an edge case in a multi-server setup where the link is fully deleted on one server before the other server reconciles. This will cover the case where the reconcile happens after the resource is deleted. * Add a delete mananagement token function Passes a function to the HCP manager that deletes the management token that was initially created by the manager. * Delete token as part of stopping the manager * Lock around disabling config, remove descriptions --- agent/consul/leader.go | 32 +++ agent/consul/server.go | 10 +- agent/consul/server_test.go | 11 + agent/hcp/manager.go | 90 ++++++-- agent/hcp/manager_test.go | 126 +++++++++-- agent/hcp/mock_Manager.go | 209 ++++++++++++++++++ agent/hcp/mock_TelemetryProvider.go | 115 ++++++++++ agent/hcp/telemetry_provider.go | 50 ++++- agent/hcp/telemetry_provider_test.go | 81 ++++++- .../internal/controllers/link/controller.go | 15 +- .../controllers/link/controller_test.go | 67 +++--- .../internal/controllers/link/finalizer.go | 13 +- internal/hcp/internal/controllers/register.go | 2 +- 13 files changed, 730 insertions(+), 91 deletions(-) create mode 100644 agent/hcp/mock_Manager.go create mode 100644 agent/hcp/mock_TelemetryProvider.go diff --git a/agent/consul/leader.go b/agent/consul/leader.go index d4ae8b1d73..53312c7fe5 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -635,6 +635,38 @@ func (s *Server) upsertManagementToken(name, secretID string) error { return nil } +func (s *Server) deleteManagementToken(secretId string) error { + state := s.fsm.State() + + // Fetch the token to get its accessor ID and to verify that it's a management token + _, token, err := state.ACLTokenGetBySecret(nil, secretId, nil) + if err != nil { + return fmt.Errorf("failed to get management token: %v", err) + } + + if token == nil { + // token is already deleted + return nil + } + + accessorID := token.AccessorID + if len(token.Policies) != 1 && token.Policies[0].ID != structs.ACLPolicyGlobalManagementID { + return fmt.Errorf("failed to delete management token: not a management token") + } + + // Delete the token + req := structs.ACLTokenBatchDeleteRequest{ + TokenIDs: []string{accessorID}, + } + if _, err := s.raftApply(structs.ACLTokenDeleteRequestType, &req); err != nil { + return fmt.Errorf("failed to delete management token: %v", err) + } + + s.logger.Info("deleted ACL token", "description", token.Description) + + return nil +} + func (s *Server) insertAnonymousToken() error { state := s.fsm.State() _, token, err := state.ACLTokenGetBySecret(nil, anonymousToken, nil) diff --git a/agent/consul/server.go b/agent/consul/server.go index 8f3ac60886..86eec4250d 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -457,7 +457,7 @@ type Server struct { xdsCapacityController *xdscapacity.Controller // hcpManager handles pushing server status updates to the HashiCorp Cloud Platform when enabled - hcpManager *hcp.Manager + hcpManager *hcp.HCPManager // embedded struct to hold all the enterprise specific data EnterpriseServer @@ -611,6 +611,14 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, } return nil }, + ManagementTokenDeleterFn: func(secretId string) error { + // Check the state of the server before attempting to delete the token.Otherwise, + // the delete will fail and log errors that do not require action from the user. + if s.config.ACLsEnabled && s.IsLeader() && s.InPrimaryDatacenter() { + return s.deleteManagementToken(secretId) + } + return nil + }, }) var recorder *middleware.RequestRecorder diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index be8b68f1d1..c7b51c79fe 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -2139,6 +2139,17 @@ func TestServer_hcpManager(t *testing.T) { require.NoError(r, err) require.NotNil(r, createdToken) }) + + // Stop the HCP manager + err = s1.hcpManager.Stop() + require.NoError(t, err) + + // Validate that the HCP token has been deleted as expected + retry.Run(t, func(r *retry.R) { + _, createdToken, err := s1.fsm.State().ACLTokenGetBySecret(nil, token, nil) + require.NoError(r, err) + require.Nil(r, createdToken) + }) } func TestServer_addServerTLSInfo(t *testing.T) { diff --git a/agent/hcp/manager.go b/agent/hcp/manager.go index 522f8a7f85..eebd87b1b7 100644 --- a/agent/hcp/manager.go +++ b/agent/hcp/manager.go @@ -5,6 +5,7 @@ package hcp import ( "context" + "reflect" "sync" "time" @@ -20,16 +21,19 @@ var ( defaultManagerMaxInterval = 75 * time.Minute ) +var _ Manager = (*HCPManager)(nil) + type ManagerConfig struct { Client hcpclient.Client CloudConfig config.CloudConfig SCADAProvider scada.Provider - TelemetryProvider *hcpProviderImpl + TelemetryProvider TelemetryProvider StatusFn StatusCallback // Idempotent function to upsert the HCP management token. This will be called periodically in // the manager's main loop. ManagementTokenUpserterFn ManagementTokenUpserter + ManagementTokenDeleterFn ManagementTokenDeleter MinInterval time.Duration MaxInterval time.Duration @@ -58,8 +62,17 @@ func (cfg *ManagerConfig) nextHeartbeat() time.Duration { type StatusCallback func(context.Context) (hcpclient.ServerStatus, error) type ManagementTokenUpserter func(name, secretId string) error +type ManagementTokenDeleter func(secretId string) error -type Manager struct { +//go:generate mockery --name Manager --with-expecter --inpackage +type Manager interface { + Start(context.Context) error + Stop() error + GetCloudConfig() config.CloudConfig + UpdateConfig(hcpclient.Client, config.CloudConfig) +} + +type HCPManager struct { logger hclog.Logger running bool @@ -69,14 +82,15 @@ type Manager struct { cfgMu sync.RWMutex updateCh chan struct{} + stopCh chan struct{} // testUpdateSent is set by unit tests to signal when the manager's status update has triggered testUpdateSent chan struct{} } // NewManager returns a Manager initialized with the given configuration. -func NewManager(cfg ManagerConfig) *Manager { - return &Manager{ +func NewManager(cfg ManagerConfig) *HCPManager { + return &HCPManager{ logger: cfg.Logger, cfg: cfg, @@ -86,7 +100,7 @@ func NewManager(cfg ManagerConfig) *Manager { // Start executes the logic for connecting to HCP and sending periodic server updates. If the // manager has been previously started, it will not start again. -func (m *Manager) Start(ctx context.Context) error { +func (m *HCPManager) Start(ctx context.Context) error { // Check if the manager has already started changed := m.setRunning(true) if !changed { @@ -117,6 +131,8 @@ func (m *Manager) Start(ctx context.Context) error { case <-ctx.Done(): m.setRunning(false) return nil + case <-m.stopCh: + return nil case <-m.updateCh: // empty the update chan if there is a queued update to prevent repeated update in main loop err = m.sendUpdate() if err != nil { @@ -158,6 +174,9 @@ func (m *Manager) Start(ctx context.Context) error { m.setRunning(false) return + case <-m.stopCh: + return + case <-m.updateCh: err = m.sendUpdate() @@ -170,7 +189,7 @@ func (m *Manager) Start(ctx context.Context) error { return err } -func (m *Manager) startSCADAProvider() error { +func (m *HCPManager) startSCADAProvider() error { provider := m.cfg.SCADAProvider if provider == nil { return nil @@ -197,12 +216,12 @@ func (m *Manager) startSCADAProvider() error { return nil } -func (m *Manager) startTelemetryProvider(ctx context.Context) error { - if m.cfg.TelemetryProvider == nil { +func (m *HCPManager) startTelemetryProvider(ctx context.Context) error { + if m.cfg.TelemetryProvider == nil || reflect.ValueOf(m.cfg.TelemetryProvider).IsNil() { return nil } - m.cfg.TelemetryProvider.Run(ctx, &HCPProviderCfg{ + m.cfg.TelemetryProvider.Start(ctx, &HCPProviderCfg{ HCPClient: m.cfg.Client, HCPConfig: &m.cfg.CloudConfig, }) @@ -210,14 +229,14 @@ func (m *Manager) startTelemetryProvider(ctx context.Context) error { return nil } -func (m *Manager) GetCloudConfig() config.CloudConfig { +func (m *HCPManager) GetCloudConfig() config.CloudConfig { m.cfgMu.RLock() defer m.cfgMu.RUnlock() return m.cfg.CloudConfig } -func (m *Manager) UpdateConfig(client hcpclient.Client, cloudCfg config.CloudConfig) { +func (m *HCPManager) UpdateConfig(client hcpclient.Client, cloudCfg config.CloudConfig) { m.cfgMu.Lock() // Save original values originalCfg := m.cfg.CloudConfig @@ -234,7 +253,7 @@ func (m *Manager) UpdateConfig(client hcpclient.Client, cloudCfg config.CloudCon } } -func (m *Manager) SendUpdate() { +func (m *HCPManager) SendUpdate() { m.logger.Debug("HCP triggering status update") select { case m.updateCh <- struct{}{}: @@ -252,7 +271,7 @@ func (m *Manager) SendUpdate() { // and a "isRetrying" state or something so that we attempt to send update, but // then fetch fresh info on each attempt to send so if we are already in a retry // backoff a new push is a no-op. -func (m *Manager) sendUpdate() error { +func (m *HCPManager) sendUpdate() error { m.cfgMu.RLock() cfg := m.cfg m.cfgMu.RUnlock() @@ -281,7 +300,7 @@ func (m *Manager) sendUpdate() error { return cfg.Client.PushServerStatus(ctx, &s) } -func (m *Manager) isRunning() bool { +func (m *HCPManager) isRunning() bool { m.runLock.RLock() defer m.runLock.RUnlock() return m.running @@ -290,7 +309,7 @@ func (m *Manager) isRunning() bool { // setRunning sets the running status of the manager to the given value. If the // given value is the same as the current running status, it returns false. If // current status is updated to the given status, it returns true. -func (m *Manager) setRunning(r bool) bool { +func (m *HCPManager) setRunning(r bool) bool { m.runLock.Lock() defer m.runLock.Unlock() @@ -298,6 +317,47 @@ func (m *Manager) setRunning(r bool) bool { return false } + // Initialize or close the stop channel depending what running status + // we're transitioning to. Channel must be initialized on start since + // a provider can be stopped and started multiple times. + if r { + m.stopCh = make(chan struct{}) + } else { + close(m.stopCh) + } + m.running = r return true } + +// Stop stops the manager's main loop that sends updates +// and stops the SCADA provider and telemetry provider. +func (m *HCPManager) Stop() error { + changed := m.setRunning(false) + if !changed { + m.logger.Trace("HCP manager already stopped") + return nil + } + m.logger.Info("HCP manager stopping") + + m.cfgMu.RLock() + defer m.cfgMu.RUnlock() + + if m.cfg.SCADAProvider != nil { + m.cfg.SCADAProvider.Stop() + } + + if m.cfg.TelemetryProvider != nil && !reflect.ValueOf(m.cfg.TelemetryProvider).IsNil() { + m.cfg.TelemetryProvider.Stop() + } + + if m.cfg.ManagementTokenDeleterFn != nil && m.cfg.CloudConfig.ManagementToken != "" { + err := m.cfg.ManagementTokenDeleterFn(m.cfg.CloudConfig.ManagementToken) + if err != nil { + return err + } + } + + m.logger.Info("HCP manager stopped") + return nil +} diff --git a/agent/hcp/manager_test.go b/agent/hcp/manager_test.go index 2c00c87521..0dd96bea64 100644 --- a/agent/hcp/manager_test.go +++ b/agent/hcp/manager_test.go @@ -4,6 +4,7 @@ package hcp import ( + "fmt" "io" "testing" "time" @@ -11,6 +12,7 @@ import ( hcpclient "github.com/hashicorp/consul/agent/hcp/client" "github.com/hashicorp/consul/agent/hcp/config" "github.com/hashicorp/consul/agent/hcp/scada" + "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -31,7 +33,7 @@ func TestManager_Start(t *testing.T) { client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Once() cloudCfg := config.CloudConfig{ - ResourceID: "organization/85702e73-8a3d-47dc-291c-379b783c5804/project/8c0547c0-10e8-1ea2-dffe-384bee8da634/hashicorp.consul.global-network-manager.cluster/test", + ResourceID: "resource-id", NodeID: "node-1", ManagementToken: "fake-token", } @@ -44,25 +46,18 @@ func TestManager_Start(t *testing.T) { ).Return().Once() scadaM.EXPECT().Start().Return(nil) - telemetryProvider := &hcpProviderImpl{ - httpCfg: &httpCfg{}, - logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), - cfg: defaultDisabledCfg(), - } - - mockTelemetryCfg, err := testTelemetryCfg(&testConfig{ - refreshInterval: 1 * time.Second, - }) - require.NoError(t, err) - client.EXPECT().FetchTelemetryConfig(mock.Anything).Return( - mockTelemetryCfg, nil).Maybe() + telemetryM := NewMockTelemetryProvider(t) + telemetryM.EXPECT().Start(mock.Anything, &HCPProviderCfg{ + HCPClient: client, + HCPConfig: &cloudCfg, + }).Return(nil).Once() mgr := NewManager(ManagerConfig{ Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), StatusFn: statusF, ManagementTokenUpserterFn: upsertManagementTokenF, SCADAProvider: scadaM, - TelemetryProvider: telemetryProvider, + TelemetryProvider: telemetryM, }) mgr.testUpdateSent = updateCh ctx, cancel := context.WithCancel(context.Background()) @@ -85,9 +80,6 @@ func TestManager_Start(t *testing.T) { // Make sure after manager has stopped no more statuses are pushed. cancel() client.AssertExpectations(t) - require.Equal(t, client, telemetryProvider.hcpClient) - require.NotNil(t, telemetryProvider.GetHeader()) - require.NotNil(t, telemetryProvider.GetHTTPClient()) } func TestManager_StartMultipleTimes(t *testing.T) { @@ -270,3 +262,103 @@ func TestManager_SendUpdate_Periodic(t *testing.T) { } client.AssertExpectations(t) } + +func TestManager_Stop(t *testing.T) { + client := hcpclient.NewMockClient(t) + + // Configure status functions called in sendUpdate + statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) { + return hcpclient.ServerStatus{ID: t.Name()}, nil + } + updateCh := make(chan struct{}, 1) + client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Twice() + + // Configure management token creation and cleanup + token := "test-token" + upsertManagementTokenCalled := make(chan struct{}, 1) + upsertManagementTokenF := func(name, secretID string) error { + upsertManagementTokenCalled <- struct{}{} + if secretID != token { + return fmt.Errorf("expected token %q, got %q", token, secretID) + } + return nil + } + deleteManagementTokenCalled := make(chan struct{}, 1) + deleteManagementTokenF := func(secretID string) error { + deleteManagementTokenCalled <- struct{}{} + if secretID != token { + return fmt.Errorf("expected token %q, got %q", token, secretID) + } + return nil + } + + // Configure the SCADA provider + scadaM := scada.NewMockProvider(t) + scadaM.EXPECT().UpdateHCPConfig(mock.Anything).Return(nil).Once() + scadaM.EXPECT().UpdateMeta(mock.Anything).Return().Once() + scadaM.EXPECT().Start().Return(nil).Once() + scadaM.EXPECT().Stop().Return(nil).Once() + + // Configure the telemetry provider + telemetryM := NewMockTelemetryProvider(t) + telemetryM.EXPECT().Start(mock.Anything, mock.Anything).Return(nil).Once() + telemetryM.EXPECT().Stop().Return().Once() + + // Configure manager with all its dependencies + mgr := NewManager(ManagerConfig{ + Logger: testutil.Logger(t), + StatusFn: statusF, + Client: client, + ManagementTokenUpserterFn: upsertManagementTokenF, + ManagementTokenDeleterFn: deleteManagementTokenF, + SCADAProvider: scadaM, + TelemetryProvider: telemetryM, + CloudConfig: config.CloudConfig{ + ManagementToken: token, + }, + }) + mgr.testUpdateSent = updateCh + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + // Start the manager + err := mgr.Start(ctx) + require.NoError(t, err) + select { + case <-updateCh: + case <-time.After(time.Second): + require.Fail(t, "manager did not send update in expected time") + } + select { + case <-upsertManagementTokenCalled: + case <-time.After(time.Second): + require.Fail(t, "manager did not create token in expected time") + } + + // Send an update to ensure the manager is running in its main loop + mgr.SendUpdate() + select { + case <-updateCh: + case <-time.After(time.Second): + require.Fail(t, "manager did not send update in expected time") + } + + // Stop the manager + err = mgr.Stop() + require.NoError(t, err) + + // Validate that the management token delete function is called + select { + case <-deleteManagementTokenCalled: + case <-time.After(time.Millisecond * 100): + require.Fail(t, "manager did not create token in expected time") + } + + // Send an update, expect no update since manager is stopped + mgr.SendUpdate() + select { + case <-updateCh: + require.Fail(t, "manager sent update after stopped") + case <-time.After(time.Second): + } +} diff --git a/agent/hcp/mock_Manager.go b/agent/hcp/mock_Manager.go new file mode 100644 index 0000000000..422d9034d8 --- /dev/null +++ b/agent/hcp/mock_Manager.go @@ -0,0 +1,209 @@ +// Code generated by mockery v2.38.0. DO NOT EDIT. + +package hcp + +import ( + client "github.com/hashicorp/consul/agent/hcp/client" + config "github.com/hashicorp/consul/agent/hcp/config" + + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// MockManager is an autogenerated mock type for the Manager type +type MockManager struct { + mock.Mock +} + +type MockManager_Expecter struct { + mock *mock.Mock +} + +func (_m *MockManager) EXPECT() *MockManager_Expecter { + return &MockManager_Expecter{mock: &_m.Mock} +} + +// GetCloudConfig provides a mock function with given fields: +func (_m *MockManager) GetCloudConfig() config.CloudConfig { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetCloudConfig") + } + + var r0 config.CloudConfig + if rf, ok := ret.Get(0).(func() config.CloudConfig); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(config.CloudConfig) + } + + return r0 +} + +// MockManager_GetCloudConfig_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCloudConfig' +type MockManager_GetCloudConfig_Call struct { + *mock.Call +} + +// GetCloudConfig is a helper method to define mock.On call +func (_e *MockManager_Expecter) GetCloudConfig() *MockManager_GetCloudConfig_Call { + return &MockManager_GetCloudConfig_Call{Call: _e.mock.On("GetCloudConfig")} +} + +func (_c *MockManager_GetCloudConfig_Call) Run(run func()) *MockManager_GetCloudConfig_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockManager_GetCloudConfig_Call) Return(_a0 config.CloudConfig) *MockManager_GetCloudConfig_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockManager_GetCloudConfig_Call) RunAndReturn(run func() config.CloudConfig) *MockManager_GetCloudConfig_Call { + _c.Call.Return(run) + return _c +} + +// Start provides a mock function with given fields: _a0 +func (_m *MockManager) Start(_a0 context.Context) error { + ret := _m.Called(_a0) + + if len(ret) == 0 { + panic("no return value specified for Start") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockManager_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start' +type MockManager_Start_Call struct { + *mock.Call +} + +// Start is a helper method to define mock.On call +// - _a0 context.Context +func (_e *MockManager_Expecter) Start(_a0 interface{}) *MockManager_Start_Call { + return &MockManager_Start_Call{Call: _e.mock.On("Start", _a0)} +} + +func (_c *MockManager_Start_Call) Run(run func(_a0 context.Context)) *MockManager_Start_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *MockManager_Start_Call) Return(_a0 error) *MockManager_Start_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockManager_Start_Call) RunAndReturn(run func(context.Context) error) *MockManager_Start_Call { + _c.Call.Return(run) + return _c +} + +// Stop provides a mock function with given fields: +func (_m *MockManager) Stop() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Stop") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockManager_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop' +type MockManager_Stop_Call struct { + *mock.Call +} + +// Stop is a helper method to define mock.On call +func (_e *MockManager_Expecter) Stop() *MockManager_Stop_Call { + return &MockManager_Stop_Call{Call: _e.mock.On("Stop")} +} + +func (_c *MockManager_Stop_Call) Run(run func()) *MockManager_Stop_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockManager_Stop_Call) Return(_a0 error) *MockManager_Stop_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockManager_Stop_Call) RunAndReturn(run func() error) *MockManager_Stop_Call { + _c.Call.Return(run) + return _c +} + +// UpdateConfig provides a mock function with given fields: _a0, _a1 +func (_m *MockManager) UpdateConfig(_a0 client.Client, _a1 config.CloudConfig) { + _m.Called(_a0, _a1) +} + +// MockManager_UpdateConfig_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateConfig' +type MockManager_UpdateConfig_Call struct { + *mock.Call +} + +// UpdateConfig is a helper method to define mock.On call +// - _a0 client.Client +// - _a1 config.CloudConfig +func (_e *MockManager_Expecter) UpdateConfig(_a0 interface{}, _a1 interface{}) *MockManager_UpdateConfig_Call { + return &MockManager_UpdateConfig_Call{Call: _e.mock.On("UpdateConfig", _a0, _a1)} +} + +func (_c *MockManager_UpdateConfig_Call) Run(run func(_a0 client.Client, _a1 config.CloudConfig)) *MockManager_UpdateConfig_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(client.Client), args[1].(config.CloudConfig)) + }) + return _c +} + +func (_c *MockManager_UpdateConfig_Call) Return() *MockManager_UpdateConfig_Call { + _c.Call.Return() + return _c +} + +func (_c *MockManager_UpdateConfig_Call) RunAndReturn(run func(client.Client, config.CloudConfig)) *MockManager_UpdateConfig_Call { + _c.Call.Return(run) + return _c +} + +// NewMockManager creates a new instance of MockManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockManager(t interface { + mock.TestingT + Cleanup(func()) +}) *MockManager { + mock := &MockManager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/agent/hcp/mock_TelemetryProvider.go b/agent/hcp/mock_TelemetryProvider.go new file mode 100644 index 0000000000..f654575f5b --- /dev/null +++ b/agent/hcp/mock_TelemetryProvider.go @@ -0,0 +1,115 @@ +// Code generated by mockery v2.38.0. DO NOT EDIT. + +package hcp + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// MockTelemetryProvider is an autogenerated mock type for the TelemetryProvider type +type MockTelemetryProvider struct { + mock.Mock +} + +type MockTelemetryProvider_Expecter struct { + mock *mock.Mock +} + +func (_m *MockTelemetryProvider) EXPECT() *MockTelemetryProvider_Expecter { + return &MockTelemetryProvider_Expecter{mock: &_m.Mock} +} + +// Start provides a mock function with given fields: ctx, c +func (_m *MockTelemetryProvider) Start(ctx context.Context, c *HCPProviderCfg) error { + ret := _m.Called(ctx, c) + + if len(ret) == 0 { + panic("no return value specified for Start") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *HCPProviderCfg) error); ok { + r0 = rf(ctx, c) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockTelemetryProvider_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start' +type MockTelemetryProvider_Start_Call struct { + *mock.Call +} + +// Start is a helper method to define mock.On call +// - ctx context.Context +// - c *HCPProviderCfg +func (_e *MockTelemetryProvider_Expecter) Start(ctx interface{}, c interface{}) *MockTelemetryProvider_Start_Call { + return &MockTelemetryProvider_Start_Call{Call: _e.mock.On("Start", ctx, c)} +} + +func (_c *MockTelemetryProvider_Start_Call) Run(run func(ctx context.Context, c *HCPProviderCfg)) *MockTelemetryProvider_Start_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*HCPProviderCfg)) + }) + return _c +} + +func (_c *MockTelemetryProvider_Start_Call) Return(_a0 error) *MockTelemetryProvider_Start_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockTelemetryProvider_Start_Call) RunAndReturn(run func(context.Context, *HCPProviderCfg) error) *MockTelemetryProvider_Start_Call { + _c.Call.Return(run) + return _c +} + +// Stop provides a mock function with given fields: +func (_m *MockTelemetryProvider) Stop() { + _m.Called() +} + +// MockTelemetryProvider_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop' +type MockTelemetryProvider_Stop_Call struct { + *mock.Call +} + +// Stop is a helper method to define mock.On call +func (_e *MockTelemetryProvider_Expecter) Stop() *MockTelemetryProvider_Stop_Call { + return &MockTelemetryProvider_Stop_Call{Call: _e.mock.On("Stop")} +} + +func (_c *MockTelemetryProvider_Stop_Call) Run(run func()) *MockTelemetryProvider_Stop_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockTelemetryProvider_Stop_Call) Return() *MockTelemetryProvider_Stop_Call { + _c.Call.Return() + return _c +} + +func (_c *MockTelemetryProvider_Stop_Call) RunAndReturn(run func()) *MockTelemetryProvider_Stop_Call { + _c.Call.Return(run) + return _c +} + +// NewMockTelemetryProvider creates a new instance of MockTelemetryProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockTelemetryProvider(t interface { + mock.TestingT + Cleanup(func()) +}) *MockTelemetryProvider { + mock := &MockTelemetryProvider{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/agent/hcp/telemetry_provider.go b/agent/hcp/telemetry_provider.go index bf68520e35..1ae8682df8 100644 --- a/agent/hcp/telemetry_provider.go +++ b/agent/hcp/telemetry_provider.go @@ -35,6 +35,7 @@ var ( ) // Ensure hcpProviderImpl implements telemetry provider interfaces. +var _ TelemetryProvider = &hcpProviderImpl{} var _ telemetry.ConfigProvider = &hcpProviderImpl{} var _ telemetry.EndpointProvider = &hcpProviderImpl{} var _ client.MetricsClientProvider = &hcpProviderImpl{} @@ -56,6 +57,9 @@ type hcpProviderImpl struct { // running indicates if the HCP telemetry config provider has been started running bool + // stopCh is used to signal that the telemetry config provider should stop running. + stopCh chan struct{} + // hcpClient is an authenticated client used to make HTTP requests to HCP. hcpClient client.Client @@ -94,6 +98,12 @@ type httpCfg struct { client *retryablehttp.Client } +//go:generate mockery --name TelemetryProvider --with-expecter --inpackage +type TelemetryProvider interface { + Start(ctx context.Context, c *HCPProviderCfg) error + Stop() +} + type HCPProviderCfg struct { HCPClient client.Client HCPConfig config.CloudConfigurer @@ -111,9 +121,9 @@ func NewHCPProvider(ctx context.Context) *hcpProviderImpl { return h } -// Run starts a process that continuously checks for updates to the telemetry configuration +// Start starts a process that continuously checks for updates to the telemetry configuration // by making a request to HCP. It only starts running if it's not already running. -func (h *hcpProviderImpl) Run(ctx context.Context, c *HCPProviderCfg) error { +func (h *hcpProviderImpl) Start(ctx context.Context, c *HCPProviderCfg) error { changed := h.setRunning(true) if !changed { // Provider is already running. @@ -139,7 +149,7 @@ func (h *hcpProviderImpl) run(ctx context.Context) error { // Try to initialize config once before starting periodic fetch. h.updateConfig(ctx) - ticker := time.NewTicker(h.cfg.refreshInterval) + ticker := time.NewTicker(h.getRefreshInterval()) defer ticker.Stop() for { select { @@ -149,6 +159,8 @@ func (h *hcpProviderImpl) run(ctx context.Context) error { } case <-ctx.Done(): return nil + case <-h.stopCh: + return nil } } } @@ -224,6 +236,13 @@ func (h *hcpProviderImpl) modifyDynamicCfg(newCfg *dynamicConfig) { metrics.IncrCounter(internalMetricRefreshSuccess, 1) } +func (h *hcpProviderImpl) getRefreshInterval() time.Duration { + h.rw.RLock() + defer h.rw.RUnlock() + + return h.cfg.refreshInterval +} + // GetEndpoint acquires a read lock to return endpoint configuration for consumers. func (h *hcpProviderImpl) GetEndpoint() *url.URL { h.rw.RLock() @@ -322,7 +341,32 @@ func (h *hcpProviderImpl) setRunning(r bool) bool { return false } + // Initialize or close the stop channel depending what running status + // we're transitioning to. Channel must be initialized on start since + // a provider can be stopped and started multiple times. + if r { + h.stopCh = make(chan struct{}) + } else { + close(h.stopCh) + } + h.running = r return true } + +// Stop acquires a write lock to mark the provider as not running and sends a stop signal to the +// main run loop. It also updates the provider with a disabled configuration. +func (h *hcpProviderImpl) Stop() { + changed := h.setRunning(false) + if !changed { + h.logger.Trace("telemetry config provider already stopped") + return + } + + h.rw.Lock() + h.cfg = defaultDisabledCfg() + h.rw.Unlock() + + h.logger.Debug("telemetry config provider stopped") +} diff --git a/agent/hcp/telemetry_provider_test.go b/agent/hcp/telemetry_provider_test.go index eccca99c04..6801b9271e 100644 --- a/agent/hcp/telemetry_provider_test.go +++ b/agent/hcp/telemetry_provider_test.go @@ -286,7 +286,7 @@ func TestTelemetryConfigProvider_UpdateConfig(t *testing.T) { } } -func TestTelemetryConfigProvider_Run(t *testing.T) { +func TestTelemetryConfigProvider_Start(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -311,22 +311,23 @@ func TestTelemetryConfigProvider_Run(t *testing.T) { mockHCPCfg := &config.MockCloudCfg{} // Run provider - go provider.Run(context.Background(), &HCPProviderCfg{ + go provider.Start(context.Background(), &HCPProviderCfg{ HCPClient: mockClient, HCPConfig: mockHCPCfg, }) - var count int + // Expect at least two update config calls to validate provider is running + // and has entered the main run loop select { case <-testUpdateConfigCh: - // Expect/wait for at least two update config calls - count++ - if count > 2 { - break - } case <-time.After(time.Second): require.Fail(t, "provider did not attempt to update config in expected time") } + select { + case <-testUpdateConfigCh: + case <-time.After(time.Millisecond * 500): + require.Fail(t, "provider did not attempt to update config in expected time") + } mockClient.AssertExpectations(t) } @@ -351,11 +352,11 @@ func TestTelemetryConfigProvider_MultipleRun(t *testing.T) { mockHCPCfg := &config.MockCloudCfg{} // Run provider twice in parallel - go provider.Run(context.Background(), &HCPProviderCfg{ + go provider.Start(context.Background(), &HCPProviderCfg{ HCPClient: mockClient, HCPConfig: mockHCPCfg, }) - go provider.Run(context.Background(), &HCPProviderCfg{ + go provider.Start(context.Background(), &HCPProviderCfg{ HCPClient: mockClient, HCPConfig: mockHCPCfg, }) @@ -374,7 +375,7 @@ func TestTelemetryConfigProvider_MultipleRun(t *testing.T) { } // Try calling run again, should not update again - provider.Run(context.Background(), &HCPProviderCfg{ + provider.Start(context.Background(), &HCPProviderCfg{ HCPClient: mockClient, HCPConfig: mockHCPCfg, }) @@ -435,6 +436,62 @@ func TestTelemetryConfigProvider_updateHTTPConfig(t *testing.T) { } } +func TestTelemetryConfigProvider_Stop(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + provider := NewHCPProvider(ctx) + + testUpdateConfigCh := make(chan struct{}, 1) + provider.testUpdateConfigCh = testUpdateConfigCh + + // Configure mocks + mockClient := client.NewMockClient(t) + mTelemetryCfg, err := testTelemetryCfg(&testConfig{ + endpoint: "http://test.com/v1/metrics", + filters: "test", + labels: map[string]string{ + "test_label": "123", + }, + refreshInterval: testRefreshInterval, + }) + require.NoError(t, err) + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mTelemetryCfg, nil) + mockHCPCfg := &config.MockCloudCfg{} + + // Run provider + provider.Start(context.Background(), &HCPProviderCfg{ + HCPClient: mockClient, + HCPConfig: mockHCPCfg, + }) + + // Wait for at least two update config calls to ensure provider is running + // and has entered the main run loop + select { + case <-testUpdateConfigCh: + case <-time.After(time.Second): + require.Fail(t, "provider did not attempt to update config in expected time") + } + select { + case <-testUpdateConfigCh: + case <-time.After(time.Millisecond * 500): + require.Fail(t, "provider did not attempt to update config in expected time") + } + + // Stop the provider + provider.Stop() + require.Equal(t, defaultDisabledCfg(), provider.cfg) + select { + case <-testUpdateConfigCh: + require.Fail(t, "provider should not attempt to update config after stop") + case <-time.After(time.Second): + // Success, no updates have happened after stopping + } + + mockClient.AssertExpectations(t) +} + // mockRaceClient is a mock HCP client that fetches TelemetryConfig. // The mock TelemetryConfig returned can be manually updated at any time. // It manages concurrent read/write access to config with a sync.RWMutex. @@ -504,7 +561,7 @@ func TestTelemetryConfigProvider_Race(t *testing.T) { // Start the provider goroutine, which fetches client TelemetryConfig every RefreshInterval. provider := NewHCPProvider(ctx) - err = provider.Run(context.Background(), &HCPProviderCfg{m, config.MockCloudCfg{}}) + err = provider.Start(context.Background(), &HCPProviderCfg{m, config.MockCloudCfg{}}) require.NoError(t, err) for count := 0; count < testRaceWriteSampleCount; count++ { diff --git a/internal/hcp/internal/controllers/link/controller.go b/internal/hcp/internal/controllers/link/controller.go index df690971b6..e39ae4cccc 100644 --- a/internal/hcp/internal/controllers/link/controller.go +++ b/internal/hcp/internal/controllers/link/controller.go @@ -44,7 +44,7 @@ func LinkController( hcpClientFn HCPClientFn, cfg config.CloudConfig, dataDir string, - hcpManager *hcp.Manager, + hcpManager hcp.Manager, ) *controller.Controller { return controller.NewController("link", pbhcp.LinkType). // Placement is configured to each server so that the HCP manager is started @@ -70,7 +70,7 @@ type linkReconciler struct { hcpAllowV2ResourceApis bool hcpClientFn HCPClientFn dataDir string - hcpManager *hcp.Manager + hcpManager hcp.Manager } func hcpAccessLevelToConsul(level *gnmmod.HashicorpCloudGlobalNetworkManager20220215ClusterConsulAccessLevel) pbhcp.AccessLevel { @@ -101,7 +101,7 @@ func (r *linkReconciler) Reconcile(ctx context.Context, rt controller.Runtime, r switch { case status.Code(err) == codes.NotFound: rt.Logger.Trace("link has been deleted") - return nil + return cleanup(rt, r.hcpManager, r.dataDir) case err != nil: rt.Logger.Error("the resource service has returned an unexpected error", "error", err) return err @@ -120,10 +120,17 @@ func (r *linkReconciler) Reconcile(ctx context.Context, rt controller.Runtime, r } if resource.IsMarkedForDeletion(res) { - if err = cleanup(ctx, rt, res, r.dataDir); err != nil { + if err = cleanup(rt, r.hcpManager, r.dataDir); err != nil { rt.Logger.Error("error cleaning up link resource", "error", err) return err } + + err := ensureDeleted(ctx, rt, res) + if err != nil { + rt.Logger.Error("error deleting link resource", "error", err) + + return err + } return nil } diff --git a/internal/hcp/internal/controllers/link/controller_test.go b/internal/hcp/internal/controllers/link/controller_test.go index da4dd188d4..7eadc149ac 100644 --- a/internal/hcp/internal/controllers/link/controller_test.go +++ b/internal/hcp/internal/controllers/link/controller_test.go @@ -10,7 +10,6 @@ import ( "path/filepath" "testing" - "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-uuid" gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models" "github.com/stretchr/testify/mock" @@ -108,15 +107,11 @@ func (suite *controllerSuite) TestController_Ok() { ConsulConfig: "{}", }, nil).Once() - statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) { - return hcpclient.ServerStatus{ID: suite.T().Name()}, nil - } - mockClient.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: suite.T().Name()}). - Return(nil).Once() - hcpMgr := hcp.NewManager(hcp.ManagerConfig{ - Logger: hclog.NewNullLogger(), - StatusFn: statusF, - }) + hcpMgr := hcp.NewMockManager(suite.T()) + hcpMgr.EXPECT().GetCloudConfig().Return(config.CloudConfig{}) + hcpMgr.EXPECT().UpdateConfig(mock.Anything, mock.Anything) + hcpMgr.EXPECT().Start(mock.Anything).Return(nil) + hcpMgr.EXPECT().Stop().Return(nil) dataDir := testutil.TempDir(suite.T(), "test-link-controller") suite.dataDir = dataDir @@ -173,15 +168,11 @@ func (suite *controllerSuite) TestController_Initialize() { ResourceID: types.GenerateTestResourceID(suite.T()), } - statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) { - return hcpclient.ServerStatus{ID: suite.T().Name()}, nil - } - mockClient.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: suite.T().Name()}). - Return(nil).Once() - hcpMgr := hcp.NewManager(hcp.ManagerConfig{ - Logger: hclog.NewNullLogger(), - StatusFn: statusF, - }) + hcpMgr := hcp.NewMockManager(suite.T()) + hcpMgr.EXPECT().GetCloudConfig().Return(cloudCfg) + hcpMgr.EXPECT().UpdateConfig(mock.Anything, mock.Anything) + hcpMgr.EXPECT().Start(mock.Anything).Return(nil) + hcpMgr.EXPECT().Stop().Return(nil) dataDir := testutil.TempDir(suite.T(), "test-link-controller") suite.dataDir = dataDir @@ -225,13 +216,16 @@ func (suite *controllerSuite) TestControllerResourceApisEnabled_LinkDisabled() { _, mockClientFunc := mockHcpClientFn(suite.T()) dataDir := testutil.TempDir(suite.T(), "test-link-controller") suite.dataDir = dataDir + + hcpMgr := hcp.NewMockManager(suite.T()) + hcpMgr.EXPECT().Stop().Return(nil) mgr.Register(LinkController( true, false, mockClientFunc, config.CloudConfig{}, dataDir, - hcp.NewManager(hcp.ManagerConfig{}), + hcpMgr, )) mgr.SetRaftLeader(true) go mgr.Run(suite.ctx) @@ -268,9 +262,11 @@ func (suite *controllerSuite) TestControllerResourceApisEnabledWithOverride_Link dataDir := testutil.TempDir(suite.T(), "test-link-controller") suite.dataDir = dataDir - hcpMgr := hcp.NewManager(hcp.ManagerConfig{ - Logger: hclog.NewNullLogger(), - }) + hcpMgr := hcp.NewMockManager(suite.T()) + hcpMgr.EXPECT().GetCloudConfig().Return(config.CloudConfig{}) + hcpMgr.EXPECT().UpdateConfig(mock.Anything, mock.Anything) + hcpMgr.EXPECT().Start(mock.Anything).Return(nil) + hcpMgr.EXPECT().Stop().Return(nil) mgr.Register(LinkController( true, @@ -322,35 +318,42 @@ func (suite *controllerSuite) TestController_GetClusterError() { suite.T().Run(name, func(t *testing.T) { // Run the controller manager mgr := controller.NewManager(suite.client, suite.rt.Logger) - mockClient, mockClientFunc := mockHcpClientFn(suite.T()) + mockClient, mockClientFunc := mockHcpClientFn(t) mockClient.EXPECT().GetCluster(mock.Anything).Return(nil, tc.expectErr) - dataDir := testutil.TempDir(suite.T(), "test-link-controller") + dataDir := testutil.TempDir(t, "test-link-controller") suite.dataDir = dataDir + + hcpMgr := hcp.NewMockManager(t) + hcpMgr.EXPECT().GetCloudConfig().Return(config.CloudConfig{}) + hcpMgr.EXPECT().Stop().Return(nil) + mgr.Register(LinkController( true, true, mockClientFunc, config.CloudConfig{}, dataDir, - hcp.NewManager(hcp.ManagerConfig{}), + hcpMgr, )) mgr.SetRaftLeader(true) - go mgr.Run(suite.ctx) + ctx, cancel := context.WithCancel(suite.ctx) + t.Cleanup(cancel) + go mgr.Run(ctx) linkData := &pbhcp.Link{ ClientId: "abc", ClientSecret: "abc", - ResourceId: types.GenerateTestResourceID(suite.T()), + ResourceId: types.GenerateTestResourceID(t), } link := rtest.Resource(pbhcp.LinkType, "global"). - WithData(suite.T(), linkData). - Write(suite.T(), suite.client) + WithData(t, linkData). + Write(t, suite.client) - suite.T().Cleanup(suite.deleteResourceFunc(link.Id)) + t.Cleanup(suite.deleteResourceFunc(link.Id)) - suite.client.WaitForStatusCondition(suite.T(), link.Id, StatusKey, tc.expectCondition) + suite.client.WaitForStatusCondition(t, link.Id, StatusKey, tc.expectCondition) }) } } diff --git a/internal/hcp/internal/controllers/link/finalizer.go b/internal/hcp/internal/controllers/link/finalizer.go index b29a7c70a4..1689779cac 100644 --- a/internal/hcp/internal/controllers/link/finalizer.go +++ b/internal/hcp/internal/controllers/link/finalizer.go @@ -8,28 +8,29 @@ import ( "os" "path/filepath" + "github.com/hashicorp/consul/agent/hcp" "github.com/hashicorp/consul/agent/hcp/bootstrap" "github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/resource" "github.com/hashicorp/consul/proto-public/pbresource" ) -func cleanup(ctx context.Context, rt controller.Runtime, res *pbresource.Resource, dataDir string) error { +func cleanup(rt controller.Runtime, hcpManager hcp.Manager, dataDir string) error { rt.Logger.Trace("cleaning up link resource") + + rt.Logger.Debug("stopping HCP manager") + hcpManager.Stop() + if dataDir != "" { hcpConfigDir := filepath.Join(dataDir, bootstrap.SubDir) rt.Logger.Debug("deleting hcp-config dir", "dir", hcpConfigDir) err := os.RemoveAll(hcpConfigDir) if err != nil { + rt.Logger.Error("failed to delete hcp-config dir", "dir", hcpConfigDir, "err", err) return err } } - err := ensureDeleted(ctx, rt, res) - if err != nil { - return err - } - return nil } diff --git a/internal/hcp/internal/controllers/register.go b/internal/hcp/internal/controllers/register.go index cce864e00b..8f66bb0db1 100644 --- a/internal/hcp/internal/controllers/register.go +++ b/internal/hcp/internal/controllers/register.go @@ -15,7 +15,7 @@ type Dependencies struct { ResourceApisEnabled bool HCPAllowV2ResourceApis bool DataDir string - HCPManager *hcp.Manager + HCPManager *hcp.HCPManager } func Register(mgr *controller.Manager, deps Dependencies) {