diff --git a/.changelog/20312.txt b/.changelog/20312.txt new file mode 100644 index 0000000000..f42c4f17be --- /dev/null +++ b/.changelog/20312.txt @@ -0,0 +1,6 @@ +```release-note:feature +cloud: Adds new API/CLI to initiate and manage linking a Consul cluster to HCP Consul Central +``` +```release-note:breaking-change +telemetry: Adds fix to always use the value of `telemetry.disable_hostname` when determining whether to prefix gauge-type metrics with the hostname of the Consul agent. Previously, if only the default metric sink was enabled, this configuration was ignored and always treated as `true`, even though its default value is `false`. +``` \ No newline at end of file diff --git a/agent/agent.go b/agent/agent.go index cda7ed0290..8dc49e7da8 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1295,7 +1295,7 @@ func (a *Agent) listenHTTP() ([]apiServer, error) { } httpAddrs := a.config.HTTPAddrs - if a.config.IsCloudEnabled() { + if a.scadaProvider != nil { httpAddrs = append(httpAddrs, scada.CAPCoreAPI) } diff --git a/agent/agent_test.go b/agent/agent_test.go index 076ea75ce7..c70eb15338 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -37,7 +37,6 @@ import ( "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "golang.org/x/time/rate" @@ -6338,12 +6337,8 @@ func TestAgent_scadaProvider(t *testing.T) { require.NoError(t, err) defer require.NoError(t, l.Close()) - pvd.EXPECT().UpdateMeta(mock.Anything).Once() - pvd.EXPECT().Start().Return(nil).Once() pvd.EXPECT().Listen(scada.CAPCoreAPI.Capability()).Return(l, nil).Once() pvd.EXPECT().Stop().Return(nil).Once() - pvd.EXPECT().SessionStatus().Return("test") - pvd.EXPECT().UpdateHCPConfig(mock.Anything).Return(nil).Once() a := TestAgent{ OverrideDeps: func(deps *BaseDeps) { deps.HCP.Provider = pvd diff --git a/agent/config/builder.go b/agent/config/builder.go index dbb90f2ebf..3918daabd7 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -1112,8 +1112,8 @@ func (b *builder) build() (rt RuntimeConfig, err error) { LocalProxyConfigResyncInterval: 30 * time.Second, } - // host metrics are enabled by default if consul is configured with HashiCorp Cloud Platform integration - rt.Telemetry.EnableHostMetrics = boolValWithDefault(c.Telemetry.EnableHostMetrics, rt.IsCloudEnabled()) + // host metrics are enabled by default to support HashiCorp Cloud Platform integration + rt.Telemetry.EnableHostMetrics = boolValWithDefault(c.Telemetry.EnableHostMetrics, true) rt.TLS, err = b.buildTLSConfig(rt, c.TLS) if err != nil { diff --git a/agent/consul/server.go b/agent/consul/server.go index c61896600b..8f3ac60886 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -595,14 +595,15 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, }) s.hcpManager = hcp.NewManager(hcp.ManagerConfig{ - CloudConfig: s.config.Cloud, - Client: flat.HCP.Client, + CloudConfig: flat.HCP.Config, StatusFn: s.hcpServerStatus(flat), Logger: logger.Named("hcp_manager"), SCADAProvider: flat.HCP.Provider, TelemetryProvider: flat.HCP.TelemetryProvider, ManagementTokenUpserterFn: func(name, secretId string) error { - if s.IsLeader() { + // Check the state of the server before attempting to upsert the token. Otherwise, + // the upsert will fail and log errors that do not require action from the user. + if s.config.ACLsEnabled && s.IsLeader() && s.InPrimaryDatacenter() { // Idea for improvement: Upsert a token with a well-known accessorId here instead // of a randomly generated one. This would prevent any possible insertion collision between // this and the insertion that happens during the ACL initialization process (initializeACLs function) @@ -953,15 +954,6 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, // Start the metrics handlers. go s.updateMetrics() - // Now we are setup, configure the HCP manager - go func() { - err := s.hcpManager.Run(&lib.StopChannelContext{StopCh: shutdownCh}) - if err != nil { - logger.Error("error starting HCP manager, some HashiCorp Cloud Platform functionality has been disabled", - "error", err) - } - }() - err = s.runEnterpriseRateLimiterConfigEntryController() if err != nil { return nil, err @@ -995,6 +987,7 @@ func (s *Server) registerControllers(deps Deps, proxyUpdater ProxyUpdater) error HCPAllowV2ResourceApis: s.hcpAllowV2Resources, CloudConfig: deps.HCP.Config, DataDir: deps.HCP.DataDir, + HCPManager: s.hcpManager, }) // When not enabled, the v1 tenancy bridge is used by default. diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index fe58542d76..be8b68f1d1 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -39,6 +39,7 @@ import ( external "github.com/hashicorp/consul/agent/grpc-external" grpcmiddleware "github.com/hashicorp/consul/agent/grpc-middleware" hcpclient "github.com/hashicorp/consul/agent/hcp/client" + hcpconfig "github.com/hashicorp/consul/agent/hcp/config" "github.com/hashicorp/consul/agent/leafcert" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/rpc/middleware" @@ -2097,6 +2098,8 @@ func TestServer_Peering_LeadershipCheck(t *testing.T) { func TestServer_hcpManager(t *testing.T) { _, conf1 := testServerConfig(t) + + // Configure the server for the StatusFn conf1.BootstrapExpect = 1 conf1.RPCAdvertise = &net.TCPAddr{IP: []byte{127, 0, 0, 2}, Port: conf1.RPCAddr.Port} hcp1 := hcpclient.NewMockClient(t) @@ -2106,8 +2109,10 @@ func TestServer_hcpManager(t *testing.T) { require.Equal(t, status.LanAddress, "127.0.0.2") }).Call.Return(nil) + // Configure the server for the ManagementTokenUpserterFn + conf1.ACLsEnabled = true + deps1 := newDefaultDeps(t, conf1) - deps1.HCP.Client = hcp1 s1, err := newServerWithDeps(t, conf1, deps1) if err != nil { t.Fatalf("err: %v", err) @@ -2115,8 +2120,25 @@ func TestServer_hcpManager(t *testing.T) { defer s1.Shutdown() require.NotNil(t, s1.hcpManager) waitForLeaderEstablishment(t, s1) + + // Update the HCP manager and start it + token, err := uuid.GenerateUUID() + require.NoError(t, err) + s1.hcpManager.UpdateConfig(hcp1, hcpconfig.CloudConfig{ + ManagementToken: token, + }) + err = s1.hcpManager.Start(context.Background()) + require.NoError(t, err) + + // Validate that the server status pushed as expected hcp1.AssertExpectations(t) + // Validate that the HCP token has been created as expected + retry.Run(t, func(r *retry.R) { + _, createdToken, err := s1.fsm.State().ACLTokenGetBySecret(nil, token, nil) + require.NoError(r, err) + require.NotNil(r, createdToken) + }) } func TestServer_addServerTLSInfo(t *testing.T) { diff --git a/agent/hcp/config/config.go b/agent/hcp/config/config.go index 6a60f2892f..e91bfe079d 100644 --- a/agent/hcp/config/config.go +++ b/agent/hcp/config/config.go @@ -71,3 +71,50 @@ func (c *CloudConfig) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfi func (c *CloudConfig) IsConfigured() bool { return c.ResourceID != "" && c.ClientID != "" && c.ClientSecret != "" } + +// Merge returns a cloud configuration that is the combined the values of +// two configurations. +func Merge(o CloudConfig, n CloudConfig) CloudConfig { + c := o + if n.ResourceID != "" { + c.ResourceID = n.ResourceID + } + + if n.ClientID != "" { + c.ClientID = n.ClientID + } + + if n.ClientSecret != "" { + c.ClientSecret = n.ClientSecret + } + + if n.Hostname != "" { + c.Hostname = n.Hostname + } + + if n.AuthURL != "" { + c.AuthURL = n.AuthURL + } + + if n.ScadaAddress != "" { + c.ScadaAddress = n.ScadaAddress + } + + if n.ManagementToken != "" { + c.ManagementToken = n.ManagementToken + } + + if n.TLSConfig != nil { + c.TLSConfig = n.TLSConfig + } + + if n.NodeID != "" { + c.NodeID = n.NodeID + } + + if n.NodeName != "" { + c.NodeName = n.NodeName + } + + return c +} diff --git a/agent/hcp/config/config_test.go b/agent/hcp/config/config_test.go new file mode 100644 index 0000000000..ca07d4d94e --- /dev/null +++ b/agent/hcp/config/config_test.go @@ -0,0 +1,82 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package config + +import ( + "crypto/tls" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMerge(t *testing.T) { + oldCfg := CloudConfig{ + ResourceID: "old-resource-id", + ClientID: "old-client-id", + ClientSecret: "old-client-secret", + Hostname: "old-hostname", + AuthURL: "old-auth-url", + ScadaAddress: "old-scada-address", + ManagementToken: "old-token", + TLSConfig: &tls.Config{ + ServerName: "old-server-name", + }, + NodeID: "old-node-id", + NodeName: "old-node-name", + } + + newCfg := CloudConfig{ + ResourceID: "new-resource-id", + ClientID: "new-client-id", + ClientSecret: "new-client-secret", + Hostname: "new-hostname", + AuthURL: "new-auth-url", + ScadaAddress: "new-scada-address", + ManagementToken: "new-token", + TLSConfig: &tls.Config{ + ServerName: "new-server-name", + }, + NodeID: "new-node-id", + NodeName: "new-node-name", + } + + for name, tc := range map[string]struct { + newCfg CloudConfig + expectedCfg CloudConfig + }{ + "Empty": { + newCfg: CloudConfig{}, + expectedCfg: oldCfg, + }, + "All": { + newCfg: newCfg, + expectedCfg: newCfg, + }, + "Partial": { + newCfg: CloudConfig{ + ResourceID: newCfg.ResourceID, + ClientID: newCfg.ClientID, + ClientSecret: newCfg.ClientSecret, + ManagementToken: newCfg.ManagementToken, + }, + expectedCfg: CloudConfig{ + ResourceID: newCfg.ResourceID, + ClientID: newCfg.ClientID, + ClientSecret: newCfg.ClientSecret, + ManagementToken: newCfg.ManagementToken, + Hostname: oldCfg.Hostname, + AuthURL: oldCfg.AuthURL, + ScadaAddress: oldCfg.ScadaAddress, + TLSConfig: oldCfg.TLSConfig, + NodeID: oldCfg.NodeID, + NodeName: oldCfg.NodeName, + }, + }, + } { + t.Run(name, func(t *testing.T) { + merged := Merge(oldCfg, tc.newCfg) + require.Equal(t, tc.expectedCfg, merged) + }) + } +} diff --git a/agent/hcp/deps.go b/agent/hcp/deps.go index af00404c63..5027c88ac7 100644 --- a/agent/hcp/deps.go +++ b/agent/hcp/deps.go @@ -19,9 +19,8 @@ import ( // Deps contains the interfaces that the rest of Consul core depends on for HCP integration. type Deps struct { Config config.CloudConfig - Client client.Client Provider scada.Provider - Sink metrics.MetricSink + Sink metrics.ShutdownSink TelemetryProvider *hcpProviderImpl DataDir string } @@ -30,11 +29,6 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger, dataDir string) (Deps, ctx := context.Background() ctx = hclog.WithContext(ctx, logger) - hcpClient, err := client.NewClient(cfg) - if err != nil { - return Deps{}, fmt.Errorf("failed to init client: %w", err) - } - provider, err := scada.New(logger.Named("scada")) if err != nil { return Deps{}, fmt.Errorf("failed to init scada: %w", err) @@ -56,7 +50,6 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger, dataDir string) (Deps, return Deps{ Config: cfg, - Client: hcpClient, Provider: provider, Sink: sink, TelemetryProvider: metricsProvider, @@ -70,7 +63,7 @@ func sink( ctx context.Context, metricsClient telemetry.MetricsClient, cfgProvider *hcpProviderImpl, -) (metrics.MetricSink, error) { +) (metrics.ShutdownSink, error) { logger := hclog.FromContext(ctx) reader := telemetry.NewOTELReader(metricsClient, cfgProvider) diff --git a/agent/hcp/manager.go b/agent/hcp/manager.go index 84b9f5f7da..522f8a7f85 100644 --- a/agent/hcp/manager.go +++ b/agent/hcp/manager.go @@ -62,6 +62,9 @@ type ManagementTokenUpserter func(name, secretId string) error type Manager struct { logger hclog.Logger + running bool + runLock sync.RWMutex + cfg ManagerConfig cfgMu sync.RWMutex @@ -81,67 +84,90 @@ func NewManager(cfg ManagerConfig) *Manager { } } -// Run executes the Manager it's designed to be run in its own goroutine for -// the life of a server agent. It should be run even if HCP is not configured -// yet for servers since a config update might configure it later and -// UpdateConfig called. It will effectively do nothing if there are no HCP -// credentials set other than wait for some to be added. -func (m *Manager) Run(ctx context.Context) error { +// Start executes the logic for connecting to HCP and sending periodic server updates. If the +// manager has been previously started, it will not start again. +func (m *Manager) Start(ctx context.Context) error { + // Check if the manager has already started + changed := m.setRunning(true) + if !changed { + m.logger.Trace("HCP manager already started") + return nil + } + var err error - m.logger.Debug("HCP manager starting") + m.logger.Info("HCP manager starting") // Update and start the SCADA provider err = m.startSCADAProvider() if err != nil { m.logger.Error("failed to start scada provider", "error", err) + m.setRunning(false) return err } // Update and start the telemetry provider to enable the HCP metrics sink if err := m.startTelemetryProvider(ctx); err != nil { m.logger.Error("failed to update telemetry config provider", "error", err) + m.setRunning(false) return err } // immediately send initial update select { case <-ctx.Done(): + m.setRunning(false) return nil case <-m.updateCh: // empty the update chan if there is a queued update to prevent repeated update in main loop err = m.sendUpdate() + if err != nil { + m.setRunning(false) + return err + } default: err = m.sendUpdate() + if err != nil { + m.setRunning(false) + return err + } } // main loop - for { - // Check for configured management token from HCP and upsert it if found - if hcpManagement := m.cfg.CloudConfig.ManagementToken; len(hcpManagement) > 0 { - upsertTokenErr := m.cfg.ManagementTokenUpserterFn("HCP Management Token", hcpManagement) - if upsertTokenErr != nil { - m.logger.Error("failed to upsert HCP management token", "err", upsertTokenErr) + go func() { + for { + m.cfgMu.RLock() + cfg := m.cfg + m.cfgMu.RUnlock() + + // Check for configured management token from HCP and upsert it if found + if hcpManagement := cfg.CloudConfig.ManagementToken; len(hcpManagement) > 0 { + if cfg.ManagementTokenUpserterFn != nil { + upsertTokenErr := cfg.ManagementTokenUpserterFn("HCP Management Token", hcpManagement) + if upsertTokenErr != nil { + m.logger.Error("failed to upsert HCP management token", "err", upsertTokenErr) + } + } + } + + nextUpdate := cfg.nextHeartbeat() + if err != nil { + m.logger.Error("failed to send server status to HCP", "err", err, "next_heartbeat", nextUpdate.String()) + } + + select { + case <-ctx.Done(): + m.setRunning(false) + return + + case <-m.updateCh: + err = m.sendUpdate() + + case <-time.After(nextUpdate): + err = m.sendUpdate() } } + }() - m.cfgMu.RLock() - cfg := m.cfg - m.cfgMu.RUnlock() - nextUpdate := cfg.nextHeartbeat() - if err != nil { - m.logger.Error("failed to send server status to HCP", "err", err, "next_heartbeat", nextUpdate.String()) - } - - select { - case <-ctx.Done(): - return nil - - case <-m.updateCh: - err = m.sendUpdate() - - case <-time.After(nextUpdate): - err = m.sendUpdate() - } - } + return err } func (m *Manager) startSCADAProvider() error { @@ -184,21 +210,28 @@ func (m *Manager) startTelemetryProvider(ctx context.Context) error { return nil } -func (m *Manager) UpdateConfig(cfg ManagerConfig) { - m.cfgMu.Lock() - defer m.cfgMu.Unlock() - old := m.cfg - m.cfg = cfg - if old.enabled() || cfg.enabled() { - // Only log about this if cloud is actually configured or it would be - // confusing. We check both old and new in case we are disabling cloud or - // enabling it or just updating it. - m.logger.Info("updated HCP configuration") - } +func (m *Manager) GetCloudConfig() config.CloudConfig { + m.cfgMu.RLock() + defer m.cfgMu.RUnlock() - // Send a new status update since we might have just gotten connection details - // for the first time. - m.SendUpdate() + return m.cfg.CloudConfig +} + +func (m *Manager) UpdateConfig(client hcpclient.Client, cloudCfg config.CloudConfig) { + m.cfgMu.Lock() + // Save original values + originalCfg := m.cfg.CloudConfig + originalClient := m.cfg.Client + + // Update with new values + m.cfg.Client = client + m.cfg.CloudConfig = cloudCfg + m.cfgMu.Unlock() + + // Send update if already running and values were updated + if m.isRunning() && (originalClient != client || originalCfg != cloudCfg) { + m.SendUpdate() + } } func (m *Manager) SendUpdate() { @@ -245,5 +278,26 @@ func (m *Manager) sendUpdate() error { return err } - return m.cfg.Client.PushServerStatus(ctx, &s) + return cfg.Client.PushServerStatus(ctx, &s) +} + +func (m *Manager) isRunning() bool { + m.runLock.RLock() + defer m.runLock.RUnlock() + return m.running +} + +// setRunning sets the running status of the manager to the given value. If the +// given value is the same as the current running status, it returns false. If +// current status is updated to the given status, it returns true. +func (m *Manager) setRunning(r bool) bool { + m.runLock.Lock() + defer m.runLock.Unlock() + + if m.running == r { + return false + } + + m.running = r + return true } diff --git a/agent/hcp/manager_test.go b/agent/hcp/manager_test.go index e4195a6df3..2c00c87521 100644 --- a/agent/hcp/manager_test.go +++ b/agent/hcp/manager_test.go @@ -17,7 +17,7 @@ import ( "golang.org/x/net/context" ) -func TestManager_Run(t *testing.T) { +func TestManager_Start(t *testing.T) { client := hcpclient.NewMockClient(t) statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) { return hcpclient.ServerStatus{ID: t.Name()}, nil @@ -36,12 +36,12 @@ func TestManager_Run(t *testing.T) { ManagementToken: "fake-token", } scadaM := scada.NewMockProvider(t) - scadaM.EXPECT().UpdateHCPConfig(cloudCfg).Return(nil) + scadaM.EXPECT().UpdateHCPConfig(cloudCfg).Return(nil).Once() scadaM.EXPECT().UpdateMeta( map[string]string{ "consul_server_id": string(cloudCfg.NodeID), }, - ).Return() + ).Return().Once() scadaM.EXPECT().Start().Return(nil) telemetryProvider := &hcpProviderImpl{ @@ -58,11 +58,9 @@ func TestManager_Run(t *testing.T) { mockTelemetryCfg, nil).Maybe() mgr := NewManager(ManagerConfig{ - Client: client, Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), StatusFn: statusF, ManagementTokenUpserterFn: upsertManagementTokenF, - CloudConfig: cloudCfg, SCADAProvider: scadaM, TelemetryProvider: telemetryProvider, }) @@ -70,20 +68,138 @@ func TestManager_Run(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - go mgr.Run(ctx) + mgr.UpdateConfig(client, cloudCfg) + mgr.Start(ctx) select { case <-updateCh: case <-time.After(time.Second): require.Fail(t, "manager did not send update in expected time") } + select { + case <-upsertManagementTokenCalled: + case <-time.After(time.Second): + require.Fail(t, "manager did not upsert management token in expected time") + } + // Make sure after manager has stopped no more statuses are pushed. cancel() client.AssertExpectations(t) require.Equal(t, client, telemetryProvider.hcpClient) require.NotNil(t, telemetryProvider.GetHeader()) require.NotNil(t, telemetryProvider.GetHTTPClient()) - require.NotEmpty(t, upsertManagementTokenCalled, "upsert management token function not called") +} + +func TestManager_StartMultipleTimes(t *testing.T) { + client := hcpclient.NewMockClient(t) + statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) { + return hcpclient.ServerStatus{ID: t.Name()}, nil + } + + updateCh := make(chan struct{}, 1) + client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Once() + + cloudCfg := config.CloudConfig{ + ResourceID: "organization/85702e73-8a3d-47dc-291c-379b783c5804/project/8c0547c0-10e8-1ea2-dffe-384bee8da634/hashicorp.consul.global-network-manager.cluster/test", + NodeID: "node-1", + ManagementToken: "fake-token", + } + + mgr := NewManager(ManagerConfig{ + Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), + StatusFn: statusF, + }) + + mgr.testUpdateSent = updateCh + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + // Start the manager twice concurrently, expect only one update + mgr.UpdateConfig(client, cloudCfg) + go mgr.Start(ctx) + go mgr.Start(ctx) + select { + case <-updateCh: + case <-time.After(time.Second): + require.Fail(t, "manager did not send update in expected time") + } + + select { + case <-updateCh: + require.Fail(t, "manager sent an update when not expected") + case <-time.After(time.Second): + } + + // Try start the manager again, still don't expect an update since already running + mgr.Start(ctx) + select { + case <-updateCh: + require.Fail(t, "manager sent an update when not expected") + case <-time.After(time.Second): + } +} + +func TestManager_UpdateConfig(t *testing.T) { + client := hcpclient.NewMockClient(t) + statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) { + return hcpclient.ServerStatus{ID: t.Name()}, nil + } + + updateCh := make(chan struct{}, 1) + + cloudCfg := config.CloudConfig{ + ResourceID: "organization/85702e73-8a3d-47dc-291c-379b783c5804/project/8c0547c0-10e8-1ea2-dffe-384bee8da634/hashicorp.consul.global-network-manager.cluster/test", + NodeID: "node-1", + } + + mgr := NewManager(ManagerConfig{ + Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}), + StatusFn: statusF, + CloudConfig: cloudCfg, + Client: client, + }) + + mgr.testUpdateSent = updateCh + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + // Start the manager, expect an initial status update + client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Once() + mgr.Start(ctx) + select { + case <-updateCh: + case <-time.After(time.Second): + require.Fail(t, "manager did not send update in expected time") + } + + // Update the cloud configuration, expect a status update + client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Once() + updatedCfg := cloudCfg + updatedCfg.ManagementToken = "token" + mgr.UpdateConfig(client, updatedCfg) + select { + case <-updateCh: + case <-time.After(time.Second): + require.Fail(t, "manager did not send update in expected time") + } + + // Update the client, expect a status update + updatedClient := hcpclient.NewMockClient(t) + updatedClient.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Once() + mgr.UpdateConfig(updatedClient, updatedCfg) + select { + case <-updateCh: + case <-time.After(time.Second): + require.Fail(t, "manager did not send update in expected time") + } + + // Update with the same values, don't expect a status update + mgr.UpdateConfig(updatedClient, updatedCfg) + select { + case <-updateCh: + require.Fail(t, "manager sent an update when not expected") + case <-time.After(time.Second): + } } func TestManager_SendUpdate(t *testing.T) { @@ -105,7 +221,7 @@ func TestManager_SendUpdate(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go mgr.Run(ctx) + mgr.Start(ctx) select { case <-updateCh: case <-time.After(time.Second): @@ -141,7 +257,7 @@ func TestManager_SendUpdate_Periodic(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go mgr.Run(ctx) + mgr.Start(ctx) select { case <-updateCh: case <-time.After(time.Second): diff --git a/agent/hcp/telemetry/otel_sink.go b/agent/hcp/telemetry/otel_sink.go index 12aae98201..ad31077404 100644 --- a/agent/hcp/telemetry/otel_sink.go +++ b/agent/hcp/telemetry/otel_sink.go @@ -136,6 +136,10 @@ func NewOTELSink(ctx context.Context, opts *OTELSinkOpts) (*OTELSink, error) { }, nil } +func (o *OTELSink) Shutdown() { + o.meterProvider.Shutdown(context.TODO()) +} + // SetGauge emits a Consul gauge metric. func (o *OTELSink) SetGauge(key []string, val float32) { o.SetGaugeWithLabels(key, val, nil) diff --git a/agent/hcp/telemetry_provider.go b/agent/hcp/telemetry_provider.go index 34a55ebc3b..bf68520e35 100644 --- a/agent/hcp/telemetry_provider.go +++ b/agent/hcp/telemetry_provider.go @@ -114,14 +114,12 @@ func NewHCPProvider(ctx context.Context) *hcpProviderImpl { // Run starts a process that continuously checks for updates to the telemetry configuration // by making a request to HCP. It only starts running if it's not already running. func (h *hcpProviderImpl) Run(ctx context.Context, c *HCPProviderCfg) error { - if h.isRunning() { + changed := h.setRunning(true) + if !changed { + // Provider is already running. return nil } - h.rw.Lock() - h.running = true - h.rw.Unlock() - // Update the provider with the HCP configurations h.hcpClient = c.HCPClient err := h.updateHTTPConfig(c.HCPConfig) @@ -313,10 +311,18 @@ func (h *hcpProviderImpl) GetHTTPClient() *retryablehttp.Client { return h.httpCfg.client } -// isRunning acquires a read lock to return whether the provider is running. -func (h *hcpProviderImpl) isRunning() bool { - h.rw.RLock() - defer h.rw.RUnlock() +// setRunning acquires a write lock to set whether the provider is running. +// If the given value is the same as the current running status, it returns +// false. If current status is updated to the given status, it returns true. +func (h *hcpProviderImpl) setRunning(r bool) bool { + h.rw.Lock() + defer h.rw.Unlock() - return h.running + if h.running == r { + return false + } + + h.running = r + + return true } diff --git a/agent/hcp/telemetry_provider_test.go b/agent/hcp/telemetry_provider_test.go index 03bb15b265..eccca99c04 100644 --- a/agent/hcp/telemetry_provider_test.go +++ b/agent/hcp/telemetry_provider_test.go @@ -331,6 +331,63 @@ func TestTelemetryConfigProvider_Run(t *testing.T) { mockClient.AssertExpectations(t) } +func TestTelemetryConfigProvider_MultipleRun(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + provider := NewHCPProvider(ctx) + + testUpdateConfigCh := make(chan struct{}, 1) + provider.testUpdateConfigCh = testUpdateConfigCh + + // Configure mocks + mockClient := client.NewMockClient(t) + mTelemetryCfg, err := testTelemetryCfg(&testConfig{ + refreshInterval: 30 * time.Minute, + }) + require.NoError(t, err) + mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mTelemetryCfg, nil) + mockHCPCfg := &config.MockCloudCfg{} + + // Run provider twice in parallel + go provider.Run(context.Background(), &HCPProviderCfg{ + HCPClient: mockClient, + HCPConfig: mockHCPCfg, + }) + go provider.Run(context.Background(), &HCPProviderCfg{ + HCPClient: mockClient, + HCPConfig: mockHCPCfg, + }) + + // Expect only one update config call + select { + case <-testUpdateConfigCh: + case <-time.After(time.Second): + require.Fail(t, "provider did not attempt to update config in expected time") + } + + select { + case <-testUpdateConfigCh: + require.Fail(t, "provider unexpectedly updated config") + case <-time.After(time.Second): + } + + // Try calling run again, should not update again + provider.Run(context.Background(), &HCPProviderCfg{ + HCPClient: mockClient, + HCPConfig: mockHCPCfg, + }) + + select { + case <-testUpdateConfigCh: + require.Fail(t, "provider unexpectedly updated config") + case <-time.After(time.Second): + } + + mockClient.AssertExpectations(t) +} + func TestTelemetryConfigProvider_updateHTTPConfig(t *testing.T) { for name, test := range map[string]struct { wantErr string diff --git a/agent/setup.go b/agent/setup.go index 2c218ea13a..7dc8cdb9a7 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -137,21 +137,15 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hcl cfg.Telemetry.PrometheusOpts.SummaryDefinitions = summaries var extraSinks []metrics.MetricSink - if cfg.IsCloudEnabled() { - // This values is set late within newNodeIDFromConfig above - cfg.Cloud.NodeID = cfg.NodeID + // This values is set late within newNodeIDFromConfig above + cfg.Cloud.NodeID = cfg.NodeID - d.HCP, err = hcp.NewDeps(cfg.Cloud, d.Logger.Named("hcp"), cfg.DataDir) - if err != nil { - return d, err - } - if d.HCP.Sink != nil { - extraSinks = append(extraSinks, d.HCP.Sink) - } - } else { - d.HCP = hcp.Deps{ - DataDir: cfg.DataDir, - } + d.HCP, err = hcp.NewDeps(cfg.Cloud, d.Logger.Named("hcp"), cfg.DataDir) + if err != nil { + return d, err + } + if d.HCP.Sink != nil { + extraSinks = append(extraSinks, d.HCP.Sink) } d.MetricsConfig, err = lib.InitTelemetry(cfg.Telemetry, d.Logger, extraSinks...) @@ -277,6 +271,9 @@ func (bd BaseDeps) Close() { bd.AutoConfig.Stop() bd.LeafCertManager.Stop() bd.MetricsConfig.Cancel() + if bd.HCP.Sink != nil { + bd.HCP.Sink.Shutdown() + } for _, fn := range []func(){bd.deregisterBalancer, bd.deregisterResolver, bd.stopHostCollector} { if fn != nil { diff --git a/api/agent_test.go b/api/agent_test.go index 511d51607d..067564f04e 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -58,8 +58,13 @@ func TestAPI_AgentMetrics(t *testing.T) { if err != nil { r.Fatalf("err: %v", err) } + hostname, err := os.Hostname() + if err != nil { + r.Fatalf("error determining hostname: %v", err) + } + metricName := fmt.Sprintf("consul.%s.runtime.alloc_bytes", hostname) for _, g := range metrics.Gauges { - if g.Name == "consul.runtime.alloc_bytes" { + if g.Name == metricName { return } } diff --git a/internal/hcp/internal/controllers/link/controller.go b/internal/hcp/internal/controllers/link/controller.go index d7ad99ba4b..df690971b6 100644 --- a/internal/hcp/internal/controllers/link/controller.go +++ b/internal/hcp/internal/controllers/link/controller.go @@ -12,6 +12,7 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/anypb" + "github.com/hashicorp/consul/agent/hcp" "github.com/hashicorp/consul/agent/hcp/bootstrap" hcpclient "github.com/hashicorp/consul/agent/hcp/client" "github.com/hashicorp/consul/agent/hcp/config" @@ -27,14 +28,10 @@ import ( // This function type should be passed to a LinkController in order to tell it how to make a client from // a Link. For normal use, DefaultHCPClientFn should be used, but tests can substitute in a function that creates a // mock client. -type HCPClientFn func(link *pbhcp.Link) (hcpclient.Client, error) +type HCPClientFn func(config.CloudConfig) (hcpclient.Client, error) -var DefaultHCPClientFn HCPClientFn = func(link *pbhcp.Link) (hcpclient.Client, error) { - hcpClient, err := hcpclient.NewClient(config.CloudConfig{ - ResourceID: link.ResourceId, - ClientID: link.ClientId, - ClientSecret: link.ClientSecret, - }) +var DefaultHCPClientFn HCPClientFn = func(cfg config.CloudConfig) (hcpclient.Client, error) { + hcpClient, err := hcpclient.NewClient(cfg) if err != nil { return nil, err } @@ -47,8 +44,15 @@ func LinkController( hcpClientFn HCPClientFn, cfg config.CloudConfig, dataDir string, + hcpManager *hcp.Manager, ) *controller.Controller { return controller.NewController("link", pbhcp.LinkType). + // Placement is configured to each server so that the HCP manager is started + // on each server. We plan to implement an alternative strategy to starting + // the HCP manager so that the controller placement will eventually only be + // on the leader. + // https://hashicorp.atlassian.net/browse/CC-7364 + WithPlacement(controller.PlacementEachServer). WithInitializer(&linkInitializer{ cloudConfig: cfg, }). @@ -57,6 +61,7 @@ func LinkController( hcpAllowV2ResourceApis: hcpAllowV2ResourceApis, hcpClientFn: hcpClientFn, dataDir: dataDir, + hcpManager: hcpManager, }) } @@ -65,6 +70,7 @@ type linkReconciler struct { hcpAllowV2ResourceApis bool hcpClientFn HCPClientFn dataDir string + hcpManager *hcp.Manager } func hcpAccessLevelToConsul(level *gnmmod.HashicorpCloudGlobalNetworkManager20220215ClusterConsulAccessLevel) pbhcp.AccessLevel { @@ -131,7 +137,18 @@ func (r *linkReconciler) Reconcile(ctx context.Context, rt controller.Runtime, r return writeStatusIfNotEqual(ctx, rt, res, newStatus) } - hcpClient, err := r.hcpClientFn(&link) + // Merge the link data with the existing cloud config so that we only overwrite the + // fields that are provided by the link. This ensures that: + // 1. The HCP configuration (i.e., how to connect to HCP) is preserved + // 2. The Consul agent's node ID and node name are preserved + existingCfg := r.hcpManager.GetCloudConfig() + newCfg := config.CloudConfig{ + ResourceID: link.ResourceId, + ClientID: link.ClientId, + ClientSecret: link.ClientSecret, + } + cfg := config.Merge(existingCfg, newCfg) + hcpClient, err := r.hcpClientFn(cfg) if err != nil { rt.Logger.Error("error creating HCP client", "error", err) return err @@ -159,7 +176,7 @@ func (r *linkReconciler) Reconcile(ctx context.Context, rt controller.Runtime, r } _, err = rt.Client.Write(ctx, &pbresource.WriteRequest{Resource: &pbresource.Resource{ Id: &pbresource.ID{ - Name: "global", + Name: types.LinkName, Type: pbhcp.LinkType, }, Metadata: res.Metadata, @@ -173,13 +190,25 @@ func (r *linkReconciler) Reconcile(ctx context.Context, rt controller.Runtime, r // Load the management token if access is not set to read-only. Read-only clusters // will not have a management token provided by HCP. + var token string if accessLevel != pbhcp.AccessLevel_ACCESS_LEVEL_GLOBAL_READ_ONLY { - _, err = bootstrap.LoadManagementToken(ctx, rt.Logger, hcpClient, r.dataDir) + token, err = bootstrap.LoadManagementToken(ctx, rt.Logger, hcpClient, r.dataDir) if err != nil { linkingFailed(ctx, rt, res, err) return err } - // TODO: Update the HCP manager with the loaded management token as part of CC-7044 + } + + // Update the HCP manager configuration with the link values + cfg.ManagementToken = token + r.hcpManager.UpdateConfig(hcpClient, cfg) + + // Start the manager + err = r.hcpManager.Start(ctx) + if err != nil { + rt.Logger.Error("error starting HCP manager", "error", err) + linkingFailed(ctx, rt, res, err) + return err } newStatus = &pbresource.Status{ diff --git a/internal/hcp/internal/controllers/link/controller_test.go b/internal/hcp/internal/controllers/link/controller_test.go index d4b911390e..da4dd188d4 100644 --- a/internal/hcp/internal/controllers/link/controller_test.go +++ b/internal/hcp/internal/controllers/link/controller_test.go @@ -10,6 +10,7 @@ import ( "path/filepath" "testing" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-uuid" gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models" "github.com/stretchr/testify/mock" @@ -17,6 +18,7 @@ import ( "github.com/stretchr/testify/suite" svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing" + "github.com/hashicorp/consul/agent/hcp" "github.com/hashicorp/consul/agent/hcp/bootstrap" hcpclient "github.com/hashicorp/consul/agent/hcp/client" "github.com/hashicorp/consul/agent/hcp/config" @@ -44,7 +46,7 @@ type controllerSuite struct { func mockHcpClientFn(t *testing.T) (*hcpclient.MockClient, HCPClientFn) { mockClient := hcpclient.NewMockClient(t) - mockClientFunc := func(link *pbhcp.Link) (hcpclient.Client, error) { + mockClientFunc := func(config config.CloudConfig) (hcpclient.Client, error) { return mockClient, nil } @@ -106,6 +108,16 @@ func (suite *controllerSuite) TestController_Ok() { ConsulConfig: "{}", }, nil).Once() + statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) { + return hcpclient.ServerStatus{ID: suite.T().Name()}, nil + } + mockClient.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: suite.T().Name()}). + Return(nil).Once() + hcpMgr := hcp.NewManager(hcp.ManagerConfig{ + Logger: hclog.NewNullLogger(), + StatusFn: statusF, + }) + dataDir := testutil.TempDir(suite.T(), "test-link-controller") suite.dataDir = dataDir mgr.Register(LinkController( @@ -114,6 +126,7 @@ func (suite *controllerSuite) TestController_Ok() { mockClientFn, config.CloudConfig{}, dataDir, + hcpMgr, )) mgr.SetRaftLeader(true) go mgr.Run(suite.ctx) @@ -160,6 +173,16 @@ func (suite *controllerSuite) TestController_Initialize() { ResourceID: types.GenerateTestResourceID(suite.T()), } + statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) { + return hcpclient.ServerStatus{ID: suite.T().Name()}, nil + } + mockClient.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: suite.T().Name()}). + Return(nil).Once() + hcpMgr := hcp.NewManager(hcp.ManagerConfig{ + Logger: hclog.NewNullLogger(), + StatusFn: statusF, + }) + dataDir := testutil.TempDir(suite.T(), "test-link-controller") suite.dataDir = dataDir @@ -169,6 +192,7 @@ func (suite *controllerSuite) TestController_Initialize() { mockClientFn, cloudCfg, dataDir, + hcpMgr, )) mgr.SetRaftLeader(true) go mgr.Run(suite.ctx) @@ -207,6 +231,7 @@ func (suite *controllerSuite) TestControllerResourceApisEnabled_LinkDisabled() { mockClientFunc, config.CloudConfig{}, dataDir, + hcp.NewManager(hcp.ManagerConfig{}), )) mgr.SetRaftLeader(true) go mgr.Run(suite.ctx) @@ -243,6 +268,9 @@ func (suite *controllerSuite) TestControllerResourceApisEnabledWithOverride_Link dataDir := testutil.TempDir(suite.T(), "test-link-controller") suite.dataDir = dataDir + hcpMgr := hcp.NewManager(hcp.ManagerConfig{ + Logger: hclog.NewNullLogger(), + }) mgr.Register(LinkController( true, @@ -250,6 +278,7 @@ func (suite *controllerSuite) TestControllerResourceApisEnabledWithOverride_Link mockClientFunc, config.CloudConfig{}, dataDir, + hcpMgr, )) mgr.SetRaftLeader(true) @@ -304,6 +333,7 @@ func (suite *controllerSuite) TestController_GetClusterError() { mockClientFunc, config.CloudConfig{}, dataDir, + hcp.NewManager(hcp.ManagerConfig{}), )) mgr.SetRaftLeader(true) diff --git a/internal/hcp/internal/controllers/register.go b/internal/hcp/internal/controllers/register.go index 6920fbecd5..cce864e00b 100644 --- a/internal/hcp/internal/controllers/register.go +++ b/internal/hcp/internal/controllers/register.go @@ -4,6 +4,7 @@ package controllers import ( + "github.com/hashicorp/consul/agent/hcp" "github.com/hashicorp/consul/agent/hcp/config" "github.com/hashicorp/consul/internal/controller" "github.com/hashicorp/consul/internal/hcp/internal/controllers/link" @@ -14,6 +15,7 @@ type Dependencies struct { ResourceApisEnabled bool HCPAllowV2ResourceApis bool DataDir string + HCPManager *hcp.Manager } func Register(mgr *controller.Manager, deps Dependencies) { @@ -23,5 +25,6 @@ func Register(mgr *controller.Manager, deps Dependencies) { link.DefaultHCPClientFn, deps.CloudConfig, deps.DataDir, + deps.HCPManager, )) } diff --git a/internal/hcp/internal/types/link.go b/internal/hcp/internal/types/link.go index 80e0343089..6ab07c5240 100644 --- a/internal/hcp/internal/types/link.go +++ b/internal/hcp/internal/types/link.go @@ -6,8 +6,10 @@ package types import ( "errors" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/internal/resource" pbhcp "github.com/hashicorp/consul/proto-public/pbhcp/v2" + "github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/go-multierror" hcpresource "github.com/hashicorp/hcp-sdk-go/resource" ) @@ -31,9 +33,44 @@ func RegisterLink(r resource.Registry) { Proto: &pbhcp.Link{}, Scope: resource.ScopeCluster, Validate: ValidateLink, + ACLs: &resource.ACLHooks{ + Read: aclReadHookLink, + Write: aclWriteHookLink, + List: aclListHookLink, + }, }) } +func aclReadHookLink(authorizer acl.Authorizer, authzContext *acl.AuthorizerContext, _ *pbresource.ID, _ *pbresource.Resource) error { + err := authorizer.ToAllowAuthorizer().OperatorReadAllowed(authzContext) + if err != nil { + return err + } + return nil +} + +func aclWriteHookLink(authorizer acl.Authorizer, authzContext *acl.AuthorizerContext, _ *pbresource.Resource) error { + err := authorizer.ToAllowAuthorizer().OperatorWriteAllowed(authzContext) + if err != nil { + return err + } + + err = authorizer.ToAllowAuthorizer().ACLWriteAllowed(authzContext) + if err != nil { + return err + } + + return nil +} + +func aclListHookLink(authorizer acl.Authorizer, authzContext *acl.AuthorizerContext) error { + err := authorizer.ToAllowAuthorizer().OperatorReadAllowed(authzContext) + if err != nil { + return err + } + return nil +} + var ValidateLink = resource.DecodeAndValidate(validateLink) func validateLink(res *DecodedLink) error { diff --git a/internal/hcp/internal/types/link_test.go b/internal/hcp/internal/types/link_test.go index d53ec909cd..46f03a4a81 100644 --- a/internal/hcp/internal/types/link_test.go +++ b/internal/hcp/internal/types/link_test.go @@ -182,15 +182,15 @@ func TestLinkACLs(t *testing.T) { WriteOK: rtest.DENY, ListOK: rtest.DENY, }, - "link test read": { - Rules: `operator = "read"`, + "link test read and list": { + Rules: `{"operator": "read"}`, Res: link, ReadOK: rtest.ALLOW, WriteOK: rtest.DENY, ListOK: rtest.ALLOW, }, "link test write": { - Rules: `operator = "write"`, + Rules: `{"operator": "write", "acl": "write"}`, Res: link, ReadOK: rtest.ALLOW, WriteOK: rtest.ALLOW, diff --git a/lib/telemetry.go b/lib/telemetry.go index e06341eefb..bb6f4624ae 100644 --- a/lib/telemetry.go +++ b/lib/telemetry.go @@ -360,13 +360,9 @@ func configureSinks(cfg TelemetryConfig, memSink metrics.MetricSink, extraSinks } } - if len(sinks) > 0 { - sinks = append(sinks, memSink) - metrics.NewGlobal(metricsConf, sinks) - } else { - metricsConf.EnableHostname = false - metrics.NewGlobal(metricsConf, memSink) - } + sinks = append(sinks, memSink) + metrics.NewGlobal(metricsConf, sinks) + return sinks, errors }