From 3b9bb8d6f9f56c94db194f5507f898e64b913e05 Mon Sep 17 00:00:00 2001 From: Melissa Kam <3768460+mkam@users.noreply.github.com> Date: Mon, 29 Jan 2024 16:31:44 -0600 Subject: [PATCH] [CC-7044] Start HCP manager as part of link creation (#20312) * Check for ACL write permissions on write Link eventually will be creating a token, so require acl:write. * Convert Run to Start, only allow to start once * Always initialize HCP components at startup * Support for updating config and client * Pass HCP manager to controller * Start HCP manager in link resource Start as part of link creation rather than always starting. Update the HCP manager with values from the link before starting as well. * Fix metrics sink leaked goroutine * Remove the hardcoded disabled hostname prefix The HCP metrics sink will always be enabled, so the length of sinks will always be greater than zero. This also means that we will also always default to prefixing metrics with the hostname, which is what our documentation states is the expected behavior anyway. 
* Add changelog * Check and set running status in one method * Check for primary datacenter, add back test * Clarify merge reasoning, fix timing issue in test * Add comment about controller placement * Expand on breaking change, fix typo in changelog --- .changelog/20312.txt | 6 + agent/agent.go | 2 +- agent/agent_test.go | 5 - agent/config/builder.go | 4 +- agent/consul/server.go | 17 +- agent/consul/server_test.go | 24 ++- agent/hcp/config/config.go | 47 ++++++ agent/hcp/config/config_test.go | 82 ++++++++++ agent/hcp/deps.go | 11 +- agent/hcp/manager.go | 148 ++++++++++++------ agent/hcp/manager_test.go | 134 ++++++++++++++-- agent/hcp/telemetry/otel_sink.go | 4 + agent/hcp/telemetry_provider.go | 26 +-- agent/hcp/telemetry_provider_test.go | 57 +++++++ agent/setup.go | 25 ++- api/agent_test.go | 7 +- .../internal/controllers/link/controller.go | 51 ++++-- .../controllers/link/controller_test.go | 32 +++- internal/hcp/internal/controllers/register.go | 3 + internal/hcp/internal/types/link.go | 37 +++++ internal/hcp/internal/types/link_test.go | 6 +- lib/telemetry.go | 10 +- 22 files changed, 605 insertions(+), 133 deletions(-) create mode 100644 .changelog/20312.txt create mode 100644 agent/hcp/config/config_test.go diff --git a/.changelog/20312.txt b/.changelog/20312.txt new file mode 100644 index 0000000000..f42c4f17be --- /dev/null +++ b/.changelog/20312.txt @@ -0,0 +1,6 @@ +```release-note:feature +cloud: Adds new API/CLI to initiate and manage linking a Consul cluster to HCP Consul Central +``` +```release-note:breaking-change +telemetry: Adds fix to always use the value of `telemetry.disable_hostname` when determining whether to prefix gauge-type metrics with the hostname of the Consul agent. Previously, if only the default metric sink was enabled, this configuration was ignored and always treated as `true`, even though its default value is `false`. 
+``` \ No newline at end of file diff --git a/agent/agent.go b/agent/agent.go index cda7ed0290..8dc49e7da8 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1295,7 +1295,7 @@ func (a *Agent) listenHTTP() ([]apiServer, error) { } httpAddrs := a.config.HTTPAddrs - if a.config.IsCloudEnabled() { + if a.scadaProvider != nil { httpAddrs = append(httpAddrs, scada.CAPCoreAPI) } diff --git a/agent/agent_test.go b/agent/agent_test.go index 076ea75ce7..c70eb15338 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -37,7 +37,6 @@ import ( "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "golang.org/x/time/rate" @@ -6338,12 +6337,8 @@ func TestAgent_scadaProvider(t *testing.T) { require.NoError(t, err) defer require.NoError(t, l.Close()) - pvd.EXPECT().UpdateMeta(mock.Anything).Once() - pvd.EXPECT().Start().Return(nil).Once() pvd.EXPECT().Listen(scada.CAPCoreAPI.Capability()).Return(l, nil).Once() pvd.EXPECT().Stop().Return(nil).Once() - pvd.EXPECT().SessionStatus().Return("test") - pvd.EXPECT().UpdateHCPConfig(mock.Anything).Return(nil).Once() a := TestAgent{ OverrideDeps: func(deps *BaseDeps) { deps.HCP.Provider = pvd diff --git a/agent/config/builder.go b/agent/config/builder.go index dbb90f2ebf..3918daabd7 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -1112,8 +1112,8 @@ func (b *builder) build() (rt RuntimeConfig, err error) { LocalProxyConfigResyncInterval: 30 * time.Second, } - // host metrics are enabled by default if consul is configured with HashiCorp Cloud Platform integration - rt.Telemetry.EnableHostMetrics = boolValWithDefault(c.Telemetry.EnableHostMetrics, rt.IsCloudEnabled()) + // host metrics are enabled by default to support HashiCorp Cloud Platform integration + rt.Telemetry.EnableHostMetrics = boolValWithDefault(c.Telemetry.EnableHostMetrics, true) rt.TLS, err 
= b.buildTLSConfig(rt, c.TLS) if err != nil { diff --git a/agent/consul/server.go b/agent/consul/server.go index c61896600b..8f3ac60886 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -595,14 +595,15 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, }) s.hcpManager = hcp.NewManager(hcp.ManagerConfig{ - CloudConfig: s.config.Cloud, - Client: flat.HCP.Client, + CloudConfig: flat.HCP.Config, StatusFn: s.hcpServerStatus(flat), Logger: logger.Named("hcp_manager"), SCADAProvider: flat.HCP.Provider, TelemetryProvider: flat.HCP.TelemetryProvider, ManagementTokenUpserterFn: func(name, secretId string) error { - if s.IsLeader() { + // Check the state of the server before attempting to upsert the token. Otherwise, + // the upsert will fail and log errors that do not require action from the user. + if s.config.ACLsEnabled && s.IsLeader() && s.InPrimaryDatacenter() { // Idea for improvement: Upsert a token with a well-known accessorId here instead // of a randomly generated one. This would prevent any possible insertion collision between // this and the insertion that happens during the ACL initialization process (initializeACLs function) @@ -953,15 +954,6 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, // Start the metrics handlers. 
go s.updateMetrics() - // Now we are setup, configure the HCP manager - go func() { - err := s.hcpManager.Run(&lib.StopChannelContext{StopCh: shutdownCh}) - if err != nil { - logger.Error("error starting HCP manager, some HashiCorp Cloud Platform functionality has been disabled", - "error", err) - } - }() - err = s.runEnterpriseRateLimiterConfigEntryController() if err != nil { return nil, err @@ -995,6 +987,7 @@ func (s *Server) registerControllers(deps Deps, proxyUpdater ProxyUpdater) error HCPAllowV2ResourceApis: s.hcpAllowV2Resources, CloudConfig: deps.HCP.Config, DataDir: deps.HCP.DataDir, + HCPManager: s.hcpManager, }) // When not enabled, the v1 tenancy bridge is used by default. diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index fe58542d76..be8b68f1d1 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -39,6 +39,7 @@ import ( external "github.com/hashicorp/consul/agent/grpc-external" grpcmiddleware "github.com/hashicorp/consul/agent/grpc-middleware" hcpclient "github.com/hashicorp/consul/agent/hcp/client" + hcpconfig "github.com/hashicorp/consul/agent/hcp/config" "github.com/hashicorp/consul/agent/leafcert" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/rpc/middleware" @@ -2097,6 +2098,8 @@ func TestServer_Peering_LeadershipCheck(t *testing.T) { func TestServer_hcpManager(t *testing.T) { _, conf1 := testServerConfig(t) + + // Configure the server for the StatusFn conf1.BootstrapExpect = 1 conf1.RPCAdvertise = &net.TCPAddr{IP: []byte{127, 0, 0, 2}, Port: conf1.RPCAddr.Port} hcp1 := hcpclient.NewMockClient(t) @@ -2106,8 +2109,10 @@ func TestServer_hcpManager(t *testing.T) { require.Equal(t, status.LanAddress, "127.0.0.2") }).Call.Return(nil) + // Configure the server for the ManagementTokenUpserterFn + conf1.ACLsEnabled = true + deps1 := newDefaultDeps(t, conf1) - deps1.HCP.Client = hcp1 s1, err := newServerWithDeps(t, conf1, deps1) if err != nil { t.Fatalf("err: %v", err) @@ 
-2115,8 +2120,25 @@ func TestServer_hcpManager(t *testing.T) { defer s1.Shutdown() require.NotNil(t, s1.hcpManager) waitForLeaderEstablishment(t, s1) + + // Update the HCP manager and start it + token, err := uuid.GenerateUUID() + require.NoError(t, err) + s1.hcpManager.UpdateConfig(hcp1, hcpconfig.CloudConfig{ + ManagementToken: token, + }) + err = s1.hcpManager.Start(context.Background()) + require.NoError(t, err) + + // Validate that the server status pushed as expected hcp1.AssertExpectations(t) + // Validate that the HCP token has been created as expected + retry.Run(t, func(r *retry.R) { + _, createdToken, err := s1.fsm.State().ACLTokenGetBySecret(nil, token, nil) + require.NoError(r, err) + require.NotNil(r, createdToken) + }) } func TestServer_addServerTLSInfo(t *testing.T) { diff --git a/agent/hcp/config/config.go b/agent/hcp/config/config.go index 6a60f2892f..e91bfe079d 100644 --- a/agent/hcp/config/config.go +++ b/agent/hcp/config/config.go @@ -71,3 +71,50 @@ func (c *CloudConfig) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfi func (c *CloudConfig) IsConfigured() bool { return c.ResourceID != "" && c.ClientID != "" && c.ClientSecret != "" } + +// Merge returns a cloud configuration that combines the values of +// two configurations. 
+func Merge(o CloudConfig, n CloudConfig) CloudConfig { + c := o + if n.ResourceID != "" { + c.ResourceID = n.ResourceID + } + + if n.ClientID != "" { + c.ClientID = n.ClientID + } + + if n.ClientSecret != "" { + c.ClientSecret = n.ClientSecret + } + + if n.Hostname != "" { + c.Hostname = n.Hostname + } + + if n.AuthURL != "" { + c.AuthURL = n.AuthURL + } + + if n.ScadaAddress != "" { + c.ScadaAddress = n.ScadaAddress + } + + if n.ManagementToken != "" { + c.ManagementToken = n.ManagementToken + } + + if n.TLSConfig != nil { + c.TLSConfig = n.TLSConfig + } + + if n.NodeID != "" { + c.NodeID = n.NodeID + } + + if n.NodeName != "" { + c.NodeName = n.NodeName + } + + return c +} diff --git a/agent/hcp/config/config_test.go b/agent/hcp/config/config_test.go new file mode 100644 index 0000000000..ca07d4d94e --- /dev/null +++ b/agent/hcp/config/config_test.go @@ -0,0 +1,82 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package config + +import ( + "crypto/tls" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMerge(t *testing.T) { + oldCfg := CloudConfig{ + ResourceID: "old-resource-id", + ClientID: "old-client-id", + ClientSecret: "old-client-secret", + Hostname: "old-hostname", + AuthURL: "old-auth-url", + ScadaAddress: "old-scada-address", + ManagementToken: "old-token", + TLSConfig: &tls.Config{ + ServerName: "old-server-name", + }, + NodeID: "old-node-id", + NodeName: "old-node-name", + } + + newCfg := CloudConfig{ + ResourceID: "new-resource-id", + ClientID: "new-client-id", + ClientSecret: "new-client-secret", + Hostname: "new-hostname", + AuthURL: "new-auth-url", + ScadaAddress: "new-scada-address", + ManagementToken: "new-token", + TLSConfig: &tls.Config{ + ServerName: "new-server-name", + }, + NodeID: "new-node-id", + NodeName: "new-node-name", + } + + for name, tc := range map[string]struct { + newCfg CloudConfig + expectedCfg CloudConfig + }{ + "Empty": { + newCfg: CloudConfig{}, + expectedCfg: oldCfg, + }, + 
"All": { + newCfg: newCfg, + expectedCfg: newCfg, + }, + "Partial": { + newCfg: CloudConfig{ + ResourceID: newCfg.ResourceID, + ClientID: newCfg.ClientID, + ClientSecret: newCfg.ClientSecret, + ManagementToken: newCfg.ManagementToken, + }, + expectedCfg: CloudConfig{ + ResourceID: newCfg.ResourceID, + ClientID: newCfg.ClientID, + ClientSecret: newCfg.ClientSecret, + ManagementToken: newCfg.ManagementToken, + Hostname: oldCfg.Hostname, + AuthURL: oldCfg.AuthURL, + ScadaAddress: oldCfg.ScadaAddress, + TLSConfig: oldCfg.TLSConfig, + NodeID: oldCfg.NodeID, + NodeName: oldCfg.NodeName, + }, + }, + } { + t.Run(name, func(t *testing.T) { + merged := Merge(oldCfg, tc.newCfg) + require.Equal(t, tc.expectedCfg, merged) + }) + } +} diff --git a/agent/hcp/deps.go b/agent/hcp/deps.go index af00404c63..5027c88ac7 100644 --- a/agent/hcp/deps.go +++ b/agent/hcp/deps.go @@ -19,9 +19,8 @@ import ( // Deps contains the interfaces that the rest of Consul core depends on for HCP integration. type Deps struct { Config config.CloudConfig - Client client.Client Provider scada.Provider - Sink metrics.MetricSink + Sink metrics.ShutdownSink TelemetryProvider *hcpProviderImpl DataDir string } @@ -30,11 +29,6 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger, dataDir string) (Deps, ctx := context.Background() ctx = hclog.WithContext(ctx, logger) - hcpClient, err := client.NewClient(cfg) - if err != nil { - return Deps{}, fmt.Errorf("failed to init client: %w", err) - } - provider, err := scada.New(logger.Named("scada")) if err != nil { return Deps{}, fmt.Errorf("failed to init scada: %w", err) @@ -56,7 +50,6 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger, dataDir string) (Deps, return Deps{ Config: cfg, - Client: hcpClient, Provider: provider, Sink: sink, TelemetryProvider: metricsProvider, @@ -70,7 +63,7 @@ func sink( ctx context.Context, metricsClient telemetry.MetricsClient, cfgProvider *hcpProviderImpl, -) (metrics.MetricSink, error) { +) (metrics.ShutdownSink, 
error) { logger := hclog.FromContext(ctx) reader := telemetry.NewOTELReader(metricsClient, cfgProvider) diff --git a/agent/hcp/manager.go b/agent/hcp/manager.go index 84b9f5f7da..522f8a7f85 100644 --- a/agent/hcp/manager.go +++ b/agent/hcp/manager.go @@ -62,6 +62,9 @@ type ManagementTokenUpserter func(name, secretId string) error type Manager struct { logger hclog.Logger + running bool + runLock sync.RWMutex + cfg ManagerConfig cfgMu sync.RWMutex @@ -81,67 +84,90 @@ func NewManager(cfg ManagerConfig) *Manager { } } -// Run executes the Manager it's designed to be run in its own goroutine for -// the life of a server agent. It should be run even if HCP is not configured -// yet for servers since a config update might configure it later and -// UpdateConfig called. It will effectively do nothing if there are no HCP -// credentials set other than wait for some to be added. -func (m *Manager) Run(ctx context.Context) error { +// Start executes the logic for connecting to HCP and sending periodic server updates. If the +// manager has been previously started, it will not start again. 
+func (m *Manager) Start(ctx context.Context) error { + // Check if the manager has already started + changed := m.setRunning(true) + if !changed { + m.logger.Trace("HCP manager already started") + return nil + } + var err error - m.logger.Debug("HCP manager starting") + m.logger.Info("HCP manager starting") // Update and start the SCADA provider err = m.startSCADAProvider() if err != nil { m.logger.Error("failed to start scada provider", "error", err) + m.setRunning(false) return err } // Update and start the telemetry provider to enable the HCP metrics sink if err := m.startTelemetryProvider(ctx); err != nil { m.logger.Error("failed to update telemetry config provider", "error", err) + m.setRunning(false) return err } // immediately send initial update select { case <-ctx.Done(): + m.setRunning(false) return nil case <-m.updateCh: // empty the update chan if there is a queued update to prevent repeated update in main loop err = m.sendUpdate() + if err != nil { + m.setRunning(false) + return err + } default: err = m.sendUpdate() + if err != nil { + m.setRunning(false) + return err + } } // main loop - for { - // Check for configured management token from HCP and upsert it if found - if hcpManagement := m.cfg.CloudConfig.ManagementToken; len(hcpManagement) > 0 { - upsertTokenErr := m.cfg.ManagementTokenUpserterFn("HCP Management Token", hcpManagement) - if upsertTokenErr != nil { - m.logger.Error("failed to upsert HCP management token", "err", upsertTokenErr) + go func() { + for { + m.cfgMu.RLock() + cfg := m.cfg + m.cfgMu.RUnlock() + + // Check for configured management token from HCP and upsert it if found + if hcpManagement := cfg.CloudConfig.ManagementToken; len(hcpManagement) > 0 { + if cfg.ManagementTokenUpserterFn != nil { + upsertTokenErr := cfg.ManagementTokenUpserterFn("HCP Management Token", hcpManagement) + if upsertTokenErr != nil { + m.logger.Error("failed to upsert HCP management token", "err", upsertTokenErr) + } + } + } + + nextUpdate := 
cfg.nextHeartbeat() + if err != nil { + m.logger.Error("failed to send server status to HCP", "err", err, "next_heartbeat", nextUpdate.String()) + } + + select { + case <-ctx.Done(): + m.setRunning(false) + return + + case <-m.updateCh: + err = m.sendUpdate() + + case <-time.After(nextUpdate): + err = m.sendUpdate() } } + }() - m.cfgMu.RLock() - cfg := m.cfg - m.cfgMu.RUnlock() - nextUpdate := cfg.nextHeartbeat() - if err != nil { - m.logger.Error("failed to send server status to HCP", "err", err, "next_heartbeat", nextUpdate.String()) - } - - select { - case <-ctx.Done(): - return nil - - case <-m.updateCh: - err = m.sendUpdate() - - case <-time.After(nextUpdate): - err = m.sendUpdate() - } - } + return err } func (m *Manager) startSCADAProvider() error { @@ -184,21 +210,28 @@ func (m *Manager) startTelemetryProvider(ctx context.Context) error { return nil } -func (m *Manager) UpdateConfig(cfg ManagerConfig) { - m.cfgMu.Lock() - defer m.cfgMu.Unlock() - old := m.cfg - m.cfg = cfg - if old.enabled() || cfg.enabled() { - // Only log about this if cloud is actually configured or it would be - // confusing. We check both old and new in case we are disabling cloud or - // enabling it or just updating it. - m.logger.Info("updated HCP configuration") - } +func (m *Manager) GetCloudConfig() config.CloudConfig { + m.cfgMu.RLock() + defer m.cfgMu.RUnlock() - // Send a new status update since we might have just gotten connection details - // for the first time. 
- m.SendUpdate() + return m.cfg.CloudConfig +} + +func (m *Manager) UpdateConfig(client hcpclient.Client, cloudCfg config.CloudConfig) { + m.cfgMu.Lock() + // Save original values + originalCfg := m.cfg.CloudConfig + originalClient := m.cfg.Client + + // Update with new values + m.cfg.Client = client + m.cfg.CloudConfig = cloudCfg + m.cfgMu.Unlock() + + // Send update if already running and values were updated + if m.isRunning() && (originalClient != client || originalCfg != cloudCfg) { + m.SendUpdate() + } } func (m *Manager) SendUpdate() { @@ -245,5 +278,26 @@ func (m *Manager) sendUpdate() error { return err } - return m.cfg.Client.PushServerStatus(ctx, &s) + return cfg.Client.PushServerStatus(ctx, &s) +} + +func (m *Manager) isRunning() bool { + m.runLock.RLock() + defer m.runLock.RUnlock() + return m.running +} + +// setRunning sets the running status of the manager to the given value. If the +// given value is the same as the current running status, it returns false. If +// current status is updated to the given status, it returns true. 
+func (m *Manager) setRunning(r bool) bool { + m.runLock.Lock() + defer m.runLock.Unlock() + + if m.running == r { + return false + } + + m.running = r + return true } diff --git a/agent/hcp/manager_test.go b/agent/hcp/manager_test.go index e4195a6df3..2c00c87521 100644 --- a/agent/hcp/manager_test.go +++ b/agent/hcp/manager_test.go @@ -17,7 +17,7 @@ import ( "golang.org/x/net/context" ) -func TestManager_Run(t *testing.T) { +func TestManager_Start(t *testing.T) { client := hcpclient.NewMockClient(t) statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) { return hcpclient.ServerStatus{ID: t.Name()}, nil @@ -36,12 +36,12 @@ func TestManager_Run(t *testing.T) { ManagementToken: "fake-token", } scadaM := scada.NewMockProvider(t) - scadaM.EXPECT().UpdateHCPConfig(cloudCfg).Return(nil) + scadaM.EXPECT().UpdateHCPConfig(cloudCfg).Return(nil).Once() scadaM.EXPECT().UpdateMeta( map[string]string{ "consul_server_id": string(cloudCfg.NodeID), }, - ).Return() + ).Return().Once() scadaM.EXPECT().Start().Return(nil) telemetryProvider := &hcpProviderImpl{ @@ -58,11 +58,9 @@ func TestManager_Run(t *testing.T) { mockTelemetryCfg, nil).Maybe() mgr := NewManager(ManagerConfig{ - Client: client, Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), StatusFn: statusF, ManagementTokenUpserterFn: upsertManagementTokenF, - CloudConfig: cloudCfg, SCADAProvider: scadaM, TelemetryProvider: telemetryProvider, }) @@ -70,20 +68,138 @@ func TestManager_Run(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - go mgr.Run(ctx) + mgr.UpdateConfig(client, cloudCfg) + mgr.Start(ctx) select { case <-updateCh: case <-time.After(time.Second): require.Fail(t, "manager did not send update in expected time") } + select { + case <-upsertManagementTokenCalled: + case <-time.After(time.Second): + require.Fail(t, "manager did not upsert management token in expected time") + } + // Make sure after manager has stopped no more statuses are pushed. 
cancel() client.AssertExpectations(t) require.Equal(t, client, telemetryProvider.hcpClient) require.NotNil(t, telemetryProvider.GetHeader()) require.NotNil(t, telemetryProvider.GetHTTPClient()) - require.NotEmpty(t, upsertManagementTokenCalled, "upsert management token function not called") +} + +func TestManager_StartMultipleTimes(t *testing.T) { + client := hcpclient.NewMockClient(t) + statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) { + return hcpclient.ServerStatus{ID: t.Name()}, nil + } + + updateCh := make(chan struct{}, 1) + client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Once() + + cloudCfg := config.CloudConfig{ + ResourceID: "organization/85702e73-8a3d-47dc-291c-379b783c5804/project/8c0547c0-10e8-1ea2-dffe-384bee8da634/hashicorp.consul.global-network-manager.cluster/test", + NodeID: "node-1", + ManagementToken: "fake-token", + } + + mgr := NewManager(ManagerConfig{ + Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), + StatusFn: statusF, + }) + + mgr.testUpdateSent = updateCh + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + // Start the manager twice concurrently, expect only one update + mgr.UpdateConfig(client, cloudCfg) + go mgr.Start(ctx) + go mgr.Start(ctx) + select { + case <-updateCh: + case <-time.After(time.Second): + require.Fail(t, "manager did not send update in expected time") + } + + select { + case <-updateCh: + require.Fail(t, "manager sent an update when not expected") + case <-time.After(time.Second): + } + + // Try start the manager again, still don't expect an update since already running + mgr.Start(ctx) + select { + case <-updateCh: + require.Fail(t, "manager sent an update when not expected") + case <-time.After(time.Second): + } +} + +func TestManager_UpdateConfig(t *testing.T) { + client := hcpclient.NewMockClient(t) + statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) { + return 
hcpclient.ServerStatus{ID: t.Name()}, nil + } + + updateCh := make(chan struct{}, 1) + + cloudCfg := config.CloudConfig{ + ResourceID: "organization/85702e73-8a3d-47dc-291c-379b783c5804/project/8c0547c0-10e8-1ea2-dffe-384bee8da634/hashicorp.consul.global-network-manager.cluster/test", + NodeID: "node-1", + } + + mgr := NewManager(ManagerConfig{ + Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), + StatusFn: statusF, + CloudConfig: cloudCfg, + Client: client, + }) + + mgr.testUpdateSent = updateCh + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + // Start the manager, expect an initial status update + client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Once() + mgr.Start(ctx) + select { + case <-updateCh: + case <-time.After(time.Second): + require.Fail(t, "manager did not send update in expected time") + } + + // Update the cloud configuration, expect a status update + client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Once() + updatedCfg := cloudCfg + updatedCfg.ManagementToken = "token" + mgr.UpdateConfig(client, updatedCfg) + select { + case <-updateCh: + case <-time.After(time.Second): + require.Fail(t, "manager did not send update in expected time") + } + + // Update the client, expect a status update + updatedClient := hcpclient.NewMockClient(t) + updatedClient.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Once() + mgr.UpdateConfig(updatedClient, updatedCfg) + select { + case <-updateCh: + case <-time.After(time.Second): + require.Fail(t, "manager did not send update in expected time") + } + + // Update with the same values, don't expect a status update + mgr.UpdateConfig(updatedClient, updatedCfg) + select { + case <-updateCh: + require.Fail(t, "manager sent an update when not expected") + case <-time.After(time.Second): + } } func TestManager_SendUpdate(t *testing.T) { @@ 
-105,7 +221,7 @@ func TestManager_SendUpdate(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go mgr.Run(ctx) + mgr.Start(ctx) select { case <-updateCh: case <-time.After(time.Second): @@ -141,7 +257,7 @@ func TestManager_SendUpdate_Periodic(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go mgr.Run(ctx) + mgr.Start(ctx) select { case <-updateCh: case <-time.After(time.Second): diff --git a/agent/hcp/telemetry/otel_sink.go b/agent/hcp/telemetry/otel_sink.go index 12aae98201..ad31077404 100644 --- a/agent/hcp/telemetry/otel_sink.go +++ b/agent/hcp/telemetry/otel_sink.go @@ -136,6 +136,10 @@ func NewOTELSink(ctx context.Context, opts *OTELSinkOpts) (*OTELSink, error) { }, nil } +func (o *OTELSink) Shutdown() { + o.meterProvider.Shutdown(context.TODO()) +} + // SetGauge emits a Consul gauge metric. func (o *OTELSink) SetGauge(key []string, val float32) { o.SetGaugeWithLabels(key, val, nil) diff --git a/agent/hcp/telemetry_provider.go b/agent/hcp/telemetry_provider.go index 34a55ebc3b..bf68520e35 100644 --- a/agent/hcp/telemetry_provider.go +++ b/agent/hcp/telemetry_provider.go @@ -114,14 +114,12 @@ func NewHCPProvider(ctx context.Context) *hcpProviderImpl { // Run starts a process that continuously checks for updates to the telemetry configuration // by making a request to HCP. It only starts running if it's not already running. func (h *hcpProviderImpl) Run(ctx context.Context, c *HCPProviderCfg) error { - if h.isRunning() { + changed := h.setRunning(true) + if !changed { + // Provider is already running. return nil } - h.rw.Lock() - h.running = true - h.rw.Unlock() - // Update the provider with the HCP configurations h.hcpClient = c.HCPClient err := h.updateHTTPConfig(c.HCPConfig) @@ -313,10 +311,18 @@ func (h *hcpProviderImpl) GetHTTPClient() *retryablehttp.Client { return h.httpCfg.client } -// isRunning acquires a read lock to return whether the provider is running. 
-func (h *hcpProviderImpl) isRunning() bool { - h.rw.RLock() - defer h.rw.RUnlock() +// setRunning acquires a write lock to set whether the provider is running. +// If the given value is the same as the current running status, it returns +// false. If current status is updated to the given status, it returns true. +func (h *hcpProviderImpl) setRunning(r bool) bool { + h.rw.Lock() + defer h.rw.Unlock() - return h.running + if h.running == r { + return false + } + + h.running = r + + return true } diff --git a/agent/hcp/telemetry_provider_test.go b/agent/hcp/telemetry_provider_test.go index 03bb15b265..eccca99c04 100644 --- a/agent/hcp/telemetry_provider_test.go +++ b/agent/hcp/telemetry_provider_test.go @@ -331,6 +331,63 @@ func TestTelemetryConfigProvider_Run(t *testing.T) { mockClient.AssertExpectations(t) } +func TestTelemetryConfigProvider_MultipleRun(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + provider := NewHCPProvider(ctx) + + testUpdateConfigCh := make(chan struct{}, 1) + provider.testUpdateConfigCh = testUpdateConfigCh + + // Configure mocks + mockClient := client.NewMockClient(t) + mTelemetryCfg, err := testTelemetryCfg(&testConfig{ + refreshInterval: 30 * time.Minute, + }) + require.NoError(t, err) + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mTelemetryCfg, nil) + mockHCPCfg := &config.MockCloudCfg{} + + // Run provider twice in parallel + go provider.Run(context.Background(), &HCPProviderCfg{ + HCPClient: mockClient, + HCPConfig: mockHCPCfg, + }) + go provider.Run(context.Background(), &HCPProviderCfg{ + HCPClient: mockClient, + HCPConfig: mockHCPCfg, + }) + + // Expect only one update config call + select { + case <-testUpdateConfigCh: + case <-time.After(time.Second): + require.Fail(t, "provider did not attempt to update config in expected time") + } + + select { + case <-testUpdateConfigCh: + require.Fail(t, "provider unexpectedly updated config") + case 
<-time.After(time.Second): + } + + // Try calling run again, should not update again + provider.Run(context.Background(), &HCPProviderCfg{ + HCPClient: mockClient, + HCPConfig: mockHCPCfg, + }) + + select { + case <-testUpdateConfigCh: + require.Fail(t, "provider unexpectedly updated config") + case <-time.After(time.Second): + } + + mockClient.AssertExpectations(t) +} + func TestTelemetryConfigProvider_updateHTTPConfig(t *testing.T) { for name, test := range map[string]struct { wantErr string diff --git a/agent/setup.go b/agent/setup.go index 2c218ea13a..7dc8cdb9a7 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -137,21 +137,15 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hcl cfg.Telemetry.PrometheusOpts.SummaryDefinitions = summaries var extraSinks []metrics.MetricSink - if cfg.IsCloudEnabled() { - // This values is set late within newNodeIDFromConfig above - cfg.Cloud.NodeID = cfg.NodeID + // This values is set late within newNodeIDFromConfig above + cfg.Cloud.NodeID = cfg.NodeID - d.HCP, err = hcp.NewDeps(cfg.Cloud, d.Logger.Named("hcp"), cfg.DataDir) - if err != nil { - return d, err - } - if d.HCP.Sink != nil { - extraSinks = append(extraSinks, d.HCP.Sink) - } - } else { - d.HCP = hcp.Deps{ - DataDir: cfg.DataDir, - } + d.HCP, err = hcp.NewDeps(cfg.Cloud, d.Logger.Named("hcp"), cfg.DataDir) + if err != nil { + return d, err + } + if d.HCP.Sink != nil { + extraSinks = append(extraSinks, d.HCP.Sink) } d.MetricsConfig, err = lib.InitTelemetry(cfg.Telemetry, d.Logger, extraSinks...) 
@@ -277,6 +271,9 @@ func (bd BaseDeps) Close() { bd.AutoConfig.Stop() bd.LeafCertManager.Stop() bd.MetricsConfig.Cancel() + if bd.HCP.Sink != nil { + bd.HCP.Sink.Shutdown() + } for _, fn := range []func(){bd.deregisterBalancer, bd.deregisterResolver, bd.stopHostCollector} { if fn != nil { diff --git a/api/agent_test.go b/api/agent_test.go index 511d51607d..067564f04e 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -58,8 +58,13 @@ func TestAPI_AgentMetrics(t *testing.T) { if err != nil { r.Fatalf("err: %v", err) } + hostname, err := os.Hostname() + if err != nil { + r.Fatalf("error determining hostname: %v", err) + } + metricName := fmt.Sprintf("consul.%s.runtime.alloc_bytes", hostname) for _, g := range metrics.Gauges { - if g.Name == "consul.runtime.alloc_bytes" { + if g.Name == metricName { return } } diff --git a/internal/hcp/internal/controllers/link/controller.go b/internal/hcp/internal/controllers/link/controller.go index d7ad99ba4b..df690971b6 100644 --- a/internal/hcp/internal/controllers/link/controller.go +++ b/internal/hcp/internal/controllers/link/controller.go @@ -12,6 +12,7 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/anypb" + "github.com/hashicorp/consul/agent/hcp" "github.com/hashicorp/consul/agent/hcp/bootstrap" hcpclient "github.com/hashicorp/consul/agent/hcp/client" "github.com/hashicorp/consul/agent/hcp/config" @@ -27,14 +28,10 @@ import ( // This function type should be passed to a LinkController in order to tell it how to make a client from // a Link. For normal use, DefaultHCPClientFn should be used, but tests can substitute in a function that creates a // mock client. 
-type HCPClientFn func(link *pbhcp.Link) (hcpclient.Client, error) +type HCPClientFn func(config.CloudConfig) (hcpclient.Client, error) -var DefaultHCPClientFn HCPClientFn = func(link *pbhcp.Link) (hcpclient.Client, error) { - hcpClient, err := hcpclient.NewClient(config.CloudConfig{ - ResourceID: link.ResourceId, - ClientID: link.ClientId, - ClientSecret: link.ClientSecret, - }) +var DefaultHCPClientFn HCPClientFn = func(cfg config.CloudConfig) (hcpclient.Client, error) { + hcpClient, err := hcpclient.NewClient(cfg) if err != nil { return nil, err } @@ -47,8 +44,15 @@ func LinkController( hcpClientFn HCPClientFn, cfg config.CloudConfig, dataDir string, + hcpManager *hcp.Manager, ) *controller.Controller { return controller.NewController("link", pbhcp.LinkType). + // Placement is configured to each server so that the HCP manager is started + // on each server. We plan to implement an alternative strategy to starting + // the HCP manager so that the controller placement will eventually only be + // on the leader. + // https://hashicorp.atlassian.net/browse/CC-7364 + WithPlacement(controller.PlacementEachServer). WithInitializer(&linkInitializer{ cloudConfig: cfg, }). @@ -57,6 +61,7 @@ func LinkController( hcpAllowV2ResourceApis: hcpAllowV2ResourceApis, hcpClientFn: hcpClientFn, dataDir: dataDir, + hcpManager: hcpManager, }) } @@ -65,6 +70,7 @@ type linkReconciler struct { hcpAllowV2ResourceApis bool hcpClientFn HCPClientFn dataDir string + hcpManager *hcp.Manager } func hcpAccessLevelToConsul(level *gnmmod.HashicorpCloudGlobalNetworkManager20220215ClusterConsulAccessLevel) pbhcp.AccessLevel { @@ -131,7 +137,18 @@ func (r *linkReconciler) Reconcile(ctx context.Context, rt controller.Runtime, r return writeStatusIfNotEqual(ctx, rt, res, newStatus) } - hcpClient, err := r.hcpClientFn(&link) + // Merge the link data with the existing cloud config so that we only overwrite the + // fields that are provided by the link. This ensures that: + // 1. 
The HCP configuration (i.e., how to connect to HCP) is preserved + // 2. The Consul agent's node ID and node name are preserved + existingCfg := r.hcpManager.GetCloudConfig() + newCfg := config.CloudConfig{ + ResourceID: link.ResourceId, + ClientID: link.ClientId, + ClientSecret: link.ClientSecret, + } + cfg := config.Merge(existingCfg, newCfg) + hcpClient, err := r.hcpClientFn(cfg) if err != nil { rt.Logger.Error("error creating HCP client", "error", err) return err @@ -159,7 +176,7 @@ func (r *linkReconciler) Reconcile(ctx context.Context, rt controller.Runtime, r } _, err = rt.Client.Write(ctx, &pbresource.WriteRequest{Resource: &pbresource.Resource{ Id: &pbresource.ID{ - Name: "global", + Name: types.LinkName, Type: pbhcp.LinkType, }, Metadata: res.Metadata, @@ -173,13 +190,25 @@ func (r *linkReconciler) Reconcile(ctx context.Context, rt controller.Runtime, r // Load the management token if access is not set to read-only. Read-only clusters // will not have a management token provided by HCP. 
+ var token string if accessLevel != pbhcp.AccessLevel_ACCESS_LEVEL_GLOBAL_READ_ONLY { - _, err = bootstrap.LoadManagementToken(ctx, rt.Logger, hcpClient, r.dataDir) + token, err = bootstrap.LoadManagementToken(ctx, rt.Logger, hcpClient, r.dataDir) if err != nil { linkingFailed(ctx, rt, res, err) return err } - // TODO: Update the HCP manager with the loaded management token as part of CC-7044 + } + + // Update the HCP manager configuration with the link values + cfg.ManagementToken = token + r.hcpManager.UpdateConfig(hcpClient, cfg) + + // Start the manager + err = r.hcpManager.Start(ctx) + if err != nil { + rt.Logger.Error("error starting HCP manager", "error", err) + linkingFailed(ctx, rt, res, err) + return err } newStatus = &pbresource.Status{ diff --git a/internal/hcp/internal/controllers/link/controller_test.go b/internal/hcp/internal/controllers/link/controller_test.go index d4b911390e..da4dd188d4 100644 --- a/internal/hcp/internal/controllers/link/controller_test.go +++ b/internal/hcp/internal/controllers/link/controller_test.go @@ -10,6 +10,7 @@ import ( "path/filepath" "testing" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-uuid" gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models" "github.com/stretchr/testify/mock" @@ -17,6 +18,7 @@ import ( "github.com/stretchr/testify/suite" svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" + "github.com/hashicorp/consul/agent/hcp" "github.com/hashicorp/consul/agent/hcp/bootstrap" hcpclient "github.com/hashicorp/consul/agent/hcp/client" "github.com/hashicorp/consul/agent/hcp/config" @@ -44,7 +46,7 @@ type controllerSuite struct { func mockHcpClientFn(t *testing.T) (*hcpclient.MockClient, HCPClientFn) { mockClient := hcpclient.NewMockClient(t) - mockClientFunc := func(link *pbhcp.Link) (hcpclient.Client, error) { + mockClientFunc := func(config config.CloudConfig) (hcpclient.Client, error) { return mockClient, nil 
} @@ -106,6 +108,16 @@ func (suite *controllerSuite) TestController_Ok() { ConsulConfig: "{}", }, nil).Once() + statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) { + return hcpclient.ServerStatus{ID: suite.T().Name()}, nil + } + mockClient.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: suite.T().Name()}). + Return(nil).Once() + hcpMgr := hcp.NewManager(hcp.ManagerConfig{ + Logger: hclog.NewNullLogger(), + StatusFn: statusF, + }) + dataDir := testutil.TempDir(suite.T(), "test-link-controller") suite.dataDir = dataDir mgr.Register(LinkController( @@ -114,6 +126,7 @@ func (suite *controllerSuite) TestController_Ok() { mockClientFn, config.CloudConfig{}, dataDir, + hcpMgr, )) mgr.SetRaftLeader(true) go mgr.Run(suite.ctx) @@ -160,6 +173,16 @@ func (suite *controllerSuite) TestController_Initialize() { ResourceID: types.GenerateTestResourceID(suite.T()), } + statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) { + return hcpclient.ServerStatus{ID: suite.T().Name()}, nil + } + mockClient.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: suite.T().Name()}). 
+ Return(nil).Once() + hcpMgr := hcp.NewManager(hcp.ManagerConfig{ + Logger: hclog.NewNullLogger(), + StatusFn: statusF, + }) + dataDir := testutil.TempDir(suite.T(), "test-link-controller") suite.dataDir = dataDir @@ -169,6 +192,7 @@ func (suite *controllerSuite) TestController_Initialize() { mockClientFn, cloudCfg, dataDir, + hcpMgr, )) mgr.SetRaftLeader(true) go mgr.Run(suite.ctx) @@ -207,6 +231,7 @@ func (suite *controllerSuite) TestControllerResourceApisEnabled_LinkDisabled() { mockClientFunc, config.CloudConfig{}, dataDir, + hcp.NewManager(hcp.ManagerConfig{}), )) mgr.SetRaftLeader(true) go mgr.Run(suite.ctx) @@ -243,6 +268,9 @@ func (suite *controllerSuite) TestControllerResourceApisEnabledWithOverride_Link dataDir := testutil.TempDir(suite.T(), "test-link-controller") suite.dataDir = dataDir + hcpMgr := hcp.NewManager(hcp.ManagerConfig{ + Logger: hclog.NewNullLogger(), + }) mgr.Register(LinkController( true, @@ -250,6 +278,7 @@ func (suite *controllerSuite) TestControllerResourceApisEnabledWithOverride_Link mockClientFunc, config.CloudConfig{}, dataDir, + hcpMgr, )) mgr.SetRaftLeader(true) @@ -304,6 +333,7 @@ func (suite *controllerSuite) TestController_GetClusterError() { mockClientFunc, config.CloudConfig{}, dataDir, + hcp.NewManager(hcp.ManagerConfig{}), )) mgr.SetRaftLeader(true) diff --git a/internal/hcp/internal/controllers/register.go b/internal/hcp/internal/controllers/register.go index 6920fbecd5..cce864e00b 100644 --- a/internal/hcp/internal/controllers/register.go +++ b/internal/hcp/internal/controllers/register.go @@ -4,6 +4,7 @@ package controllers import ( + "github.com/hashicorp/consul/agent/hcp" "github.com/hashicorp/consul/agent/hcp/config" "github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/hcp/internal/controllers/link" @@ -14,6 +15,7 @@ type Dependencies struct { ResourceApisEnabled bool HCPAllowV2ResourceApis bool DataDir string + HCPManager *hcp.Manager } func Register(mgr *controller.Manager, deps 
Dependencies) { @@ -23,5 +25,6 @@ func Register(mgr *controller.Manager, deps Dependencies) { link.DefaultHCPClientFn, deps.CloudConfig, deps.DataDir, + deps.HCPManager, )) } diff --git a/internal/hcp/internal/types/link.go b/internal/hcp/internal/types/link.go index 80e0343089..6ab07c5240 100644 --- a/internal/hcp/internal/types/link.go +++ b/internal/hcp/internal/types/link.go @@ -6,8 +6,10 @@ package types import ( "errors" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/internal/resource" pbhcp "github.com/hashicorp/consul/proto-public/pbhcp/v2" + "github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/go-multierror" hcpresource "github.com/hashicorp/hcp-sdk-go/resource" ) @@ -31,9 +33,44 @@ func RegisterLink(r resource.Registry) { Proto: &pbhcp.Link{}, Scope: resource.ScopeCluster, Validate: ValidateLink, + ACLs: &resource.ACLHooks{ + Read: aclReadHookLink, + Write: aclWriteHookLink, + List: aclListHookLink, + }, }) } +func aclReadHookLink(authorizer acl.Authorizer, authzContext *acl.AuthorizerContext, _ *pbresource.ID, _ *pbresource.Resource) error { + err := authorizer.ToAllowAuthorizer().OperatorReadAllowed(authzContext) + if err != nil { + return err + } + return nil +} + +func aclWriteHookLink(authorizer acl.Authorizer, authzContext *acl.AuthorizerContext, _ *pbresource.Resource) error { + err := authorizer.ToAllowAuthorizer().OperatorWriteAllowed(authzContext) + if err != nil { + return err + } + + err = authorizer.ToAllowAuthorizer().ACLWriteAllowed(authzContext) + if err != nil { + return err + } + + return nil +} + +func aclListHookLink(authorizer acl.Authorizer, authzContext *acl.AuthorizerContext) error { + err := authorizer.ToAllowAuthorizer().OperatorReadAllowed(authzContext) + if err != nil { + return err + } + return nil +} + var ValidateLink = resource.DecodeAndValidate(validateLink) func validateLink(res *DecodedLink) error { diff --git a/internal/hcp/internal/types/link_test.go 
b/internal/hcp/internal/types/link_test.go index d53ec909cd..46f03a4a81 100644 --- a/internal/hcp/internal/types/link_test.go +++ b/internal/hcp/internal/types/link_test.go @@ -182,15 +182,15 @@ func TestLinkACLs(t *testing.T) { WriteOK: rtest.DENY, ListOK: rtest.DENY, }, - "link test read": { - Rules: `operator = "read"`, + "link test read and list": { + Rules: `{"operator": "read"}`, Res: link, ReadOK: rtest.ALLOW, WriteOK: rtest.DENY, ListOK: rtest.ALLOW, }, "link test write": { - Rules: `operator = "write"`, + Rules: `{"operator": "write", "acl": "write"}`, Res: link, ReadOK: rtest.ALLOW, WriteOK: rtest.ALLOW, diff --git a/lib/telemetry.go b/lib/telemetry.go index e06341eefb..bb6f4624ae 100644 --- a/lib/telemetry.go +++ b/lib/telemetry.go @@ -360,13 +360,9 @@ func configureSinks(cfg TelemetryConfig, memSink metrics.MetricSink, extraSinks } } - if len(sinks) > 0 { - sinks = append(sinks, memSink) - metrics.NewGlobal(metricsConf, sinks) - } else { - metricsConf.EnableHostname = false - metrics.NewGlobal(metricsConf, memSink) - } + sinks = append(sinks, memSink) + metrics.NewGlobal(metricsConf, sinks) + return sinks, errors }